L2CAP: Enhanced Retransmission Mode

This commit is contained in:
Josh Wu
2025-09-17 01:24:18 +08:00
parent bae6c1df97
commit 57e05781ad
4 changed files with 627 additions and 49 deletions

View File

@@ -342,6 +342,39 @@ async def test_mtu():
assert client_channel.peer_mtu == 345
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_enhanced_retransmission_channel():
devices = TwoDevices()
await devices.setup_connection()
server_channels = asyncio.Queue[l2cap.ClassicChannel]()
server = devices.devices[1].create_l2cap_server(
spec=l2cap.ClassicChannelSpec(), handler=server_channels.put_nowait
)
client_channel = await devices.connections[0].create_l2cap_channel(
spec=l2cap.ClassicChannelSpec(
server.psm, mode=l2cap.TransmissionMode.ENHANCED_RETRANSMISSION
)
)
server_channel = await server_channels.get()
assert isinstance(client_channel.processor, l2cap.EnhancedRetransmissionProcessor)
assert isinstance(server_channel.processor, l2cap.EnhancedRetransmissionProcessor)
sinks = [asyncio.Queue[bytes]() for _ in range(2)]
server_channel.sink = sinks[0].put_nowait
client_channel.sink = sinks[1].put_nowait
for _ in range(128):
server_channel.write(b'123')
for _ in range(128):
assert (await sinks[1].get()) == b'123'
for _ in range(128):
client_channel.write(b'456')
for _ in range(128):
assert (await sinks[0].get()) == b'456'
# -----------------------------------------------------------------------------
async def run():
test_helpers()