From 456cb59b48356b1c1e82caa8b293a6321047a0af Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Fri, 26 Sep 2025 21:59:56 +0800 Subject: [PATCH] L2CAP: FCS Implementation --- bumble/device.py | 2 + bumble/host.py | 29 +- bumble/l2cap.py | 341 +++++++++++------- bumble/utils.py | 17 + ...n_l2cap_server.py => run_classic_l2cap.py} | 54 ++- tests/l2cap_test.py | 62 +++- 6 files changed, 335 insertions(+), 170 deletions(-) rename examples/{run_l2cap_server.py => run_classic_l2cap.py} (57%) diff --git a/bumble/device.py b/bumble/device.py index afc01fc0..069cea5d 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -2071,6 +2071,8 @@ class DeviceConfiguration: enhanced_retransmission_supported: bool = False l2cap_extended_features: Sequence[int] = ( l2cap.L2CAP_Information_Request.ExtendedFeatures.FIXED_CHANNELS, + l2cap.L2CAP_Information_Request.ExtendedFeatures.FCS_OPTION, + l2cap.L2CAP_Information_Request.ExtendedFeatures.ENHANCED_RETRANSMISSION_MODE, ) def __post_init__(self) -> None: diff --git a/bumble/host.py b/bumble/host.py index bc8a7920..b470eee1 100644 --- a/bumble/host.py +++ b/bumble/host.py @@ -707,7 +707,7 @@ class Host(utils.EventEmitter): asyncio.create_task(send_command(command)) - def send_l2cap_pdu(self, connection_handle: int, cid: int, pdu: bytes) -> None: + def send_acl_sdu(self, connection_handle: int, sdu: bytes) -> None: if not (connection := self.connections.get(connection_handle)): logger.warning(f'connection 0x{connection_handle:04X} not found') return @@ -718,27 +718,24 @@ class Host(utils.EventEmitter): ) return - # Create a PDU - l2cap_pdu = bytes(L2CAP_PDU(cid, pdu)) - # Send the data to the controller via ACL packets - bytes_remaining = len(l2cap_pdu) - offset = 0 - pb_flag = 0 - while bytes_remaining: - data_total_length = min(bytes_remaining, packet_queue.max_packet_size) + max_packet_size = packet_queue.max_packet_size + for offset in range(0, len(sdu), max_packet_size): + pdu = sdu[offset : offset + max_packet_size] acl_packet = hci.HCI_AclDataPacket( connection_handle=connection_handle, - pb_flag=pb_flag, + pb_flag=1 if offset > 0 else 0, bc_flag=0, - data_total_length=data_total_length, - data=l2cap_pdu[offset : offset + data_total_length], + data_total_length=len(pdu), + data=pdu, + ) + logger.debug( + '>>> ACL packet enqueue: (Handle=0x%04X) %s', connection_handle, pdu ) - logger.debug(f'>>> ACL packet enqueue: (CID={cid}) {acl_packet}') packet_queue.enqueue(acl_packet, connection_handle) - pb_flag = 1 - offset += data_total_length - bytes_remaining -= data_total_length + + def send_l2cap_pdu(self, connection_handle: int, cid: int, pdu: bytes) -> None: + self.send_acl_sdu(connection_handle, bytes(L2CAP_PDU(cid, pdu))) def get_data_packet_queue(self, connection_handle: int) -> DataPacketQueue | None: if connection := self.connections.get(connection_handle): diff --git a/bumble/l2cap.py b/bumble/l2cap.py index 387d9fae..72b39f7e 100644 --- a/bumble/l2cap.py +++ b/bumble/l2cap.py @@ -23,18 +23,8 @@ import enum import logging import struct from collections import deque -from collections.abc import Sequence -from typing import ( - TYPE_CHECKING, - Any, - Callable, - ClassVar, - Iterable, - Optional, - SupportsBytes, - TypeVar, - Union, -) +from collections.abc import Callable, Iterable, Sequence +from typing import TYPE_CHECKING, Any, ClassVar, Optional, SupportsBytes, TypeVar, Union from typing_extensions import override @@ -73,8 +63,8 @@ L2CAP_MAX_BR_EDR_MTU = 65535 L2CAP_DEFAULT_MTU = 2048 # Default value for the MTU we are willing to accept L2CAP_DEFAULT_MPS = 1010 # Default value for the MPS we are willing to accept -DEFAULT_TX_WINDOW_SIZE = 10 -DEFAULT_MAX_RETRANSMISSION = 10 +DEFAULT_TX_WINDOW_SIZE = 63 +DEFAULT_MAX_RETRANSMISSION = 1 DEFAULT_RETRANSMISSION_TIMEOUT = 2.0 DEFAULT_MONITOR_TIMEOUT = 12.0 @@ -160,6 +150,11 @@ class TransmissionMode(utils.OpenIntEnum): # pylint: disable=invalid-name +class L2capError(ProtocolError): + def __init__(self, error_code, error_name='', details=''): + super().__init__(error_code, 'L2CAP', error_name, details) + + @dataclasses.dataclass class ClassicChannelSpec: '''Spec of L2CAP Channel over Classic Transport. @@ -177,16 +172,18 @@ class ClassicChannelSpec: monitor_timeout: The interval at which S-frames should be transmitted on the return channel when no frames are received on the forward channel. mode: The transmission mode to use. + fcs_enabled: Whether to enable FCS (Frame Check Sequence). ''' psm: Optional[int] = None mtu: int = L2CAP_DEFAULT_MTU - mps: int = L2CAP_DEFAULT_MTU + mps: int = L2CAP_DEFAULT_MPS tx_window_size: int = DEFAULT_TX_WINDOW_SIZE max_retransmission: int = DEFAULT_MAX_RETRANSMISSION retransmission_timeout: float = DEFAULT_RETRANSMISSION_TIMEOUT monitor_timeout: float = DEFAULT_MONITOR_TIMEOUT mode: TransmissionMode = TransmissionMode.BASIC + fcs_enabled: bool = False @dataclasses.dataclass @@ -219,20 +216,29 @@ class L2CAP_PDU: See Bluetooth spec @ Vol 3, Part A - 3 DATA PACKET FORMAT ''' - @staticmethod - def from_bytes(data: bytes) -> L2CAP_PDU: + @classmethod + def from_bytes(cls, data: bytes) -> L2CAP_PDU: # Check parameters if len(data) < 4: raise InvalidPacketError('not enough data for L2CAP header') - _, l2cap_pdu_cid = struct.unpack_from(' bytes: - header = struct.pack(' bytes: + length = len(self.payload) + if with_fcs: + length += 2 + header = struct.pack(' None: self.cid = cid @@ -865,7 +871,7 @@ class L2CAP_Credit_Based_Reconfigure_Response(L2CAP_Control_Frame): # ----------------------------------------------------------------------------- -class BaseProcessor: +class Processor: def __init__(self, channel: ClassicChannel) -> None: self.channel = channel @@ -877,7 +883,7 @@ class BaseProcessor: # TODO: Handle retransmission -class EnhancedRetransmissionProcessor(BaseProcessor): +class EnhancedRetransmissionProcessor(Processor): MAX_SEQ_NUM = 64 @@ -916,24 +922,30 @@ class EnhancedRetransmissionProcessor(BaseProcessor): @classmethod def _num_frames_between(cls, low: int, high: int) -> int: if high < low: - high += cls.MAX_SEQ_NUM + 1 + high += cls.MAX_SEQ_NUM return high - low - def __init__(self, channel: ClassicChannel): + def __init__( + self, + channel: ClassicChannel, + peer_tx_window_size: int = DEFAULT_TX_WINDOW_SIZE, + peer_max_retransmission: int = DEFAULT_MAX_RETRANSMISSION, + peer_mps: int = L2CAP_DEFAULT_MPS, + ): spec = channel.spec self.mps = spec.mps - self.peer_mps = 0 - self.tx_window_size = spec.tx_window_size + self.peer_mps = peer_mps + self.peer_tx_window_size = peer_tx_window_size self._pending_pdus = [] self.monitor_timeout = spec.monitor_timeout self.channel = channel self.retransmission_timeout = spec.retransmission_timeout - self.max_retransmission = spec.max_retransmission + self.peer_max_retransmission = peer_max_retransmission def _monitor(self) -> None: if ( - self.max_retransmission <= 0 - or self._num_receiver_ready_polls_sent < self.max_retransmission + self.peer_max_retransmission <= 0 + or self._num_receiver_ready_polls_sent < self.peer_max_retransmission ): self._send_receiver_ready_poll() self._start_monitor() @@ -1028,7 +1040,7 @@ class EnhancedRetransmissionProcessor(BaseProcessor): return for pdu in self._pending_pdus: - if self._num_unacked_frames >= self.tx_window_size: + if self._num_unacked_frames >= self.peer_tx_window_size: return self._send_pdu(pdu) self._last_tx_seq = pdu.tx_seq @@ -1107,7 +1119,7 @@ class ClassicChannel(utils.EventEmitter): connection: Connection mtu: int peer_mtu: int - processor: BaseProcessor + processor: Processor def __init__( self, @@ -1131,12 +1143,15 @@ class ClassicChannel(utils.EventEmitter): self.connection_result = None self.disconnection_result = None self.sink = None + self.fcs_enabled = spec.fcs_enabled self.spec = spec - if spec.mode == TransmissionMode.BASIC: - self.processor = BaseProcessor(self) - elif spec.mode == TransmissionMode.ENHANCED_RETRANSMISSION: - self.processor = EnhancedRetransmissionProcessor(self) - else: + self.mode = spec.mode + # Configure mode-specific processor later on configure request. + self.processor = Processor(self) + if self.mode not in ( + TransmissionMode.BASIC, + TransmissionMode.ENHANCED_RETRANSMISSION, + ): raise InvalidArgumentError(f"Mode {spec.mode} is not supported") def _change_state(self, new_state: State) -> None: @@ -1149,12 +1164,17 @@ class ClassicChannel(utils.EventEmitter): def send_pdu(self, pdu: Union[SupportsBytes, bytes]) -> None: if self.state != self.State.OPEN: raise InvalidStateError('channel not open') - self.manager.send_pdu(self.connection, self.destination_cid, pdu) + self.manager.send_pdu( + self.connection, self.destination_cid, pdu, self.fcs_enabled + ) def send_control_frame(self, frame: L2CAP_Control_Frame) -> None: self.manager.send_control_frame(self.connection, self.signaling_cid, frame) def on_pdu(self, pdu: bytes) -> None: + if self.fcs_enabled: + # Drop FCS. + pdu = pdu[:-2] self.processor.on_pdu(pdu) def on_sdu(self, sdu: bytes) -> None: @@ -1193,10 +1213,8 @@ class ClassicChannel(utils.EventEmitter): finally: self.connection_result = None - async def disconnect(self) -> None: - if self.state != self.State.OPEN: - raise InvalidStateError('invalid state') - + def _disconnect_sync(self) -> None: + """For internal sync disconnection.""" self._change_state(self.State.WAIT_DISCONNECT) self.send_control_frame( L2CAP_Disconnection_Request( @@ -1209,7 +1227,21 @@ class ClassicChannel(utils.EventEmitter): # Create a future to wait for the state machine to get to a success or error # state self.disconnection_result = asyncio.get_running_loop().create_future() - return await self.disconnection_result + + def _abort_connection_result(self, message: str = 'Connection failure') -> None: + # Cancel pending connection result. + if self.connection_result and not self.connection_result.done(): + self.connection_result.set_exception( + L2capError(error_code=0, error_name=message) + ) + + async def disconnect(self) -> None: + if self.state != self.State.OPEN: + raise InvalidStateError('invalid state') + + self._disconnect_sync() + if self.disconnection_result: + return await self.disconnection_result def abort(self) -> None: if self.state == self.State.OPEN: @@ -1223,21 +1255,28 @@ class ClassicChannel(utils.EventEmitter): struct.pack(' None: - if self.state in (self.State.OPEN, self.State.WAIT_DISCONNECT): - self.send_control_frame( - L2CAP_Disconnection_Response( - identifier=request.identifier, - destination_cid=request.destination_cid, - source_cid=request.source_cid, - ) + self.send_control_frame( + L2CAP_Disconnection_Response( + identifier=request.identifier, + destination_cid=request.destination_cid, + source_cid=request.source_cid, ) - self._change_state(self.State.CLOSED) - self.emit(self.EVENT_CLOSE) - self.manager.on_channel_closed(self) - else: - logger.warning(color('invalid state', 'red')) + ) + self._abort_connection_result() + self._change_state(self.State.CLOSED) + self.emit(self.EVENT_CLOSE) + self.manager.on_channel_closed(self) def on_disconnection_response(self, response: L2CAP_Disconnection_Response) -> None: - if self.state != self.State.WAIT_DISCONNECT: - logger.warning(color('invalid state', 'red')) - return - if ( response.destination_cid != self.destination_cid or response.source_cid != self.source_cid @@ -1702,9 +1760,8 @@ class LeCreditBasedChannel(utils.EventEmitter): self._change_state(self.State.CONNECTED) else: self.connection_result.set_exception( - ProtocolError( + L2capError( response.result, - 'l2cap', L2CAP_LE_Credit_Based_Connection_Response.Result( response.result ).name, @@ -2075,7 +2132,13 @@ class ChannelManager: if connection_handle in self.identifiers: del self.identifiers[connection_handle] - def send_pdu(self, connection, cid: int, pdu: Union[SupportsBytes, bytes]) -> None: + def send_pdu( + self, + connection: Connection, + cid: int, + pdu: Union[SupportsBytes, bytes], + with_fcs: bool = False, + ) -> None: pdu_str = pdu.hex() if isinstance(pdu, bytes) else str(pdu) pdu_bytes = bytes(pdu) logger.debug( @@ -2083,7 +2146,9 @@ class ChannelManager: f'on connection [0x{connection.handle:04X}] (CID={cid}) ' f'{connection.peer_address}: {len(pdu_bytes)} bytes, {pdu_str}' ) - self.host.send_l2cap_pdu(connection.handle, cid, pdu_bytes) + self.host.send_acl_sdu( + connection.handle, L2CAP_PDU(cid, bytes(pdu)).to_bytes(with_fcs=with_fcs) + ) def on_pdu(self, connection: Connection, cid: int, pdu: bytes) -> None: if cid in (L2CAP_SIGNALING_CID, L2CAP_LE_SIGNALING_CID): @@ -2660,7 +2725,27 @@ class ChannelManager: try: await channel.connect() except BaseException as e: - del connection_channels[source_cid] + connection_channels.pop(source_cid, None) raise e return channel + + @classmethod + def make_mode_processor( + self, + channel: ClassicChannel, + mode: TransmissionMode, + peer_tx_window_size: int, + peer_max_retransmission: int, + peer_retransmission_timeout: int, + peer_monitor_timeout: int, + peer_mps: int, + ) -> Processor: + del peer_retransmission_timeout, peer_monitor_timeout # Unused. + if mode == TransmissionMode.BASIC: + return Processor(channel) + elif mode == TransmissionMode.ENHANCED_RETRANSMISSION: + return EnhancedRetransmissionProcessor( + channel, peer_tx_window_size, peer_max_retransmission, peer_mps + ) + raise InvalidArgumentError("Mode %s is not implemented", mode.name) diff --git a/bumble/utils.py b/bumble/utils.py index fcb7429b..17bb6c3a 100644 --- a/bumble/utils.py +++ b/bumble/utils.py @@ -533,3 +533,20 @@ class IntConvertible(Protocol): def __init__(self, value: int) -> None: ... def __int__(self) -> int: ... + + +# ----------------------------------------------------------------------------- +def crc_16(data: bytes) -> int: + """Calculate CRC-16-IBM of given data. + + Polynomial = x^16 + x^15 + x^2 + 1 = 0x8005 or 0xA001(Reversed) + """ + crc = 0x0000 + for byte in data: + crc ^= byte + for _ in range(8): + if (crc & 0x0001) > 0: + crc = (crc >> 1) ^ 0xA001 + else: + crc = crc >> 1 + return crc diff --git a/examples/run_l2cap_server.py b/examples/run_classic_l2cap.py similarity index 57% rename from examples/run_l2cap_server.py rename to examples/run_classic_l2cap.py index e8b09039..b780065a 100644 --- a/examples/run_l2cap_server.py +++ b/examples/run_classic_l2cap.py @@ -12,35 +12,42 @@ # See the License for the specific language governing permissions and # limitations under the License. + # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- +from __future__ import annotations + +import argparse import asyncio import sys -from typing import Optional - import bumble.logging +from bumble import core, l2cap from bumble.device import Device -from bumble import l2cap from bumble.transport import open_transport # ----------------------------------------------------------------------------- -async def main() -> None: +async def main( + config_file: str, transport: str, mode: int, peer_address: str, psm: int +) -> None: print('<<< connecting to HCI...') - async with await open_transport(sys.argv[2]) as hci_transport: + async with await open_transport(transport) as hci_transport: print('<<< connected') # Create a device device = Device.from_config_file_with_hci( - sys.argv[1], hci_transport.source, hci_transport.sink + config_file, hci_transport.source, hci_transport.sink ) device.classic_enabled = True device.l2cap_channel_manager.extended_features.add( l2cap.L2CAP_Information_Request.ExtendedFeatures.ENHANCED_RETRANSMISSION_MODE ) + device.l2cap_channel_manager.extended_features.add( + l2cap.L2CAP_Information_Request.ExtendedFeatures.FCS_OPTION + ) # Start the controller await device.power_on() @@ -49,7 +56,7 @@ async def main() -> None: await device.set_discoverable(True) await device.set_connectable(True) - channels: list[l2cap.ClassicChannel] = [] + active_channel: l2cap.ClassicChannel | None = None def on_connection(channel: l2cap.ClassicChannel): @@ -57,25 +64,44 @@ async def main() -> None: print(f'<<< {sdu.decode()}') channel.sink = on_sdu - if channels: - channels.clear() - channels.append(channel) + nonlocal active_channel + active_channel = channel server = device.create_l2cap_server( spec=l2cap.ClassicChannelSpec( - mode=l2cap.TransmissionMode.ENHANCED_RETRANSMISSION + mode=l2cap.TransmissionMode(mode), psm=psm if psm else None ), handler=on_connection, ) print(f'Listen L2CAP on channel {server.psm}') + if peer_address: + connection = await device.connect( + peer_address, transport=core.PhysicalTransport.BR_EDR + ) + channel = await connection.create_l2cap_channel( + spec=l2cap.ClassicChannelSpec( + mode=l2cap.TransmissionMode(mode), psm=psm + ) + ) + active_channel = channel + while sdu := await asyncio.to_thread(lambda: input('>>> ')): - if channels: - channels[0].write(sdu.encode()) + if active_channel: + active_channel.write(sdu.encode()) await hci_transport.source.terminated # ----------------------------------------------------------------------------- bumble.logging.setup_basic_logging('INFO') -asyncio.run(main()) +parser = argparse.ArgumentParser() +parser.add_argument('config') +parser.add_argument('transport') +parser.add_argument('-p', '--peer_address', default='') +parser.add_argument( + '-m', '--mode', default=l2cap.TransmissionMode.ENHANCED_RETRANSMISSION +) +parser.add_argument('--psm', default=0) +args = parser.parse_args(sys.argv[1:]) +asyncio.run(main(args.config, args.transport, args.mode, args.peer_address, args.psm)) diff --git a/tests/l2cap_test.py b/tests/l2cap_test.py index 45b94213..bfc566ef 100644 --- a/tests/l2cap_test.py +++ b/tests/l2cap_test.py @@ -19,6 +19,7 @@ import asyncio import logging import os import random +import struct import pytest @@ -344,13 +345,16 @@ async def test_mtu(): # ----------------------------------------------------------------------------- @pytest.mark.asyncio -async def test_enhanced_retransmission_channel(): +async def test_enhanced_retransmission_mode(): devices = TwoDevices() await devices.setup_connection() server_channels = asyncio.Queue[l2cap.ClassicChannel]() server = devices.devices[1].create_l2cap_server( - spec=l2cap.ClassicChannelSpec(), handler=server_channels.put_nowait + spec=l2cap.ClassicChannelSpec( + mode=l2cap.TransmissionMode.ENHANCED_RETRANSMISSION + ), + handler=server_channels.put_nowait, ) client_channel = await devices.connections[0].create_l2cap_channel( spec=l2cap.ClassicChannelSpec( @@ -358,21 +362,55 @@ async def test_enhanced_retransmission_channel(): ) ) server_channel = await server_channels.get() - assert isinstance(client_channel.processor, l2cap.EnhancedRetransmissionProcessor) - assert isinstance(server_channel.processor, l2cap.EnhancedRetransmissionProcessor) sinks = [asyncio.Queue[bytes]() for _ in range(2)] server_channel.sink = sinks[0].put_nowait client_channel.sink = sinks[1].put_nowait - for _ in range(128): - server_channel.write(b'123') - for _ in range(128): - assert (await sinks[1].get()) == b'123' - for _ in range(128): - client_channel.write(b'456') - for _ in range(128): - assert (await sinks[0].get()) == b'456' + for i in range(128): + server_channel.write(struct.pack('