Merge pull request #779 from zxzxwu/l2cap

L2CAP Enhanced Retransmission mode
This commit is contained in:
zxzxwu
2025-12-03 21:57:48 +08:00
committed by GitHub
6 changed files with 857 additions and 114 deletions

View File

@@ -19,6 +19,7 @@ import asyncio
import logging
import os
import random
import struct
import pytest
@@ -342,6 +343,76 @@ async def test_mtu():
assert client_channel.peer_mtu == 345
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_enhanced_retransmission_mode():
devices = TwoDevices()
await devices.setup_connection()
server_channels = asyncio.Queue[l2cap.ClassicChannel]()
server = devices.devices[1].create_l2cap_server(
spec=l2cap.ClassicChannelSpec(
mode=l2cap.TransmissionMode.ENHANCED_RETRANSMISSION
),
handler=server_channels.put_nowait,
)
client_channel = await devices.connections[0].create_l2cap_channel(
spec=l2cap.ClassicChannelSpec(
server.psm, mode=l2cap.TransmissionMode.ENHANCED_RETRANSMISSION
)
)
server_channel = await server_channels.get()
sinks = [asyncio.Queue[bytes]() for _ in range(2)]
server_channel.sink = sinks[0].put_nowait
client_channel.sink = sinks[1].put_nowait
for i in range(128):
server_channel.write(struct.pack('<I', i))
for i in range(128):
assert (await sinks[1].get()) == struct.pack('<I', i)
for i in range(128):
client_channel.write(struct.pack('<I', i))
for i in range(128):
assert (await sinks[0].get()) == struct.pack('<I', i)
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'server_mode, client_mode',
[
(l2cap.TransmissionMode.BASIC, l2cap.TransmissionMode.ENHANCED_RETRANSMISSION),
(l2cap.TransmissionMode.ENHANCED_RETRANSMISSION, l2cap.TransmissionMode.BASIC),
],
)
@pytest.mark.asyncio
async def test_mode_mismatching(server_mode, client_mode):
devices = TwoDevices()
await devices.setup_connection()
server = devices.devices[1].create_l2cap_server(
spec=l2cap.ClassicChannelSpec(mode=server_mode)
)
with pytest.raises(l2cap.L2capError):
await devices.connections[0].create_l2cap_channel(
spec=l2cap.ClassicChannelSpec(psm=server.psm, mode=client_mode)
)
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'cid, payload, expected',
[
(0x0040, '020000010203040506070809', '0E0040000200000102030405060708093861'),
(0x0040, '0101', '040040000101D414'),
],
)
def test_fcs(cid: int, payload: str, expected: str):
'''Core Spec 6.1, Vol 3, Part A, 3.3.5. Frame Check Sequence.'''
pdu = l2cap.L2CAP_PDU(cid, bytes.fromhex(payload))
assert pdu.to_bytes(with_fcs=True) == bytes.fromhex(expected)
# -----------------------------------------------------------------------------
async def run():
test_helpers()