diff --git a/bumble/helpers.py b/bumble/helpers.py index 80a376e..0a304d2 100644 --- a/bumble/helpers.py +++ b/bumble/helpers.py @@ -34,9 +34,8 @@ from bumble.att import ATT_CID, ATT_PDU from bumble.smp import SMP_CID, SMP_Command from bumble.core import name_or_number from bumble.l2cap import ( + CommandCode, L2CAP_PDU, - L2CAP_CONNECTION_REQUEST, - L2CAP_CONNECTION_RESPONSE, L2CAP_SIGNALING_CID, L2CAP_LE_SIGNALING_CID, L2CAP_Control_Frame, @@ -106,14 +105,14 @@ class PacketTracer: self.analyzer.emit(control_frame) # Check if this signals a new channel - if control_frame.code == L2CAP_CONNECTION_REQUEST: + if control_frame.code == CommandCode.L2CAP_CONNECTION_REQUEST: connection_request = cast(L2CAP_Connection_Request, control_frame) self.psms[connection_request.source_cid] = connection_request.psm - elif control_frame.code == L2CAP_CONNECTION_RESPONSE: + elif control_frame.code == CommandCode.L2CAP_CONNECTION_RESPONSE: connection_response = cast(L2CAP_Connection_Response, control_frame) if ( connection_response.result - == L2CAP_Connection_Response.CONNECTION_SUCCESSFUL + == L2CAP_Connection_Response.Result.CONNECTION_SUCCESSFUL ): if self.peer and ( psm := self.peer.psms.get(connection_response.source_cid) diff --git a/bumble/l2cap.py b/bumble/l2cap.py index eac65d6..856b7f3 100644 --- a/bumble/l2cap.py +++ b/bumble/l2cap.py @@ -30,10 +30,13 @@ from typing import ( Union, Iterable, SupportsBytes, + TypeVar, + ClassVar, TYPE_CHECKING, ) from bumble import utils +from bumble import hci from bumble.colors import color from bumble.core import ( InvalidStateError, @@ -42,13 +45,6 @@ from bumble.core import ( OutOfResourcesError, ProtocolError, ) -from bumble.hci import ( - HCI_LE_Connection_Update_Command, - HCI_Object, - Role, - key_with_value, - name_or_number, -) if TYPE_CHECKING: from bumble.device import Connection @@ -93,54 +89,29 @@ L2CAP_PSM_DYNAMIC_RANGE_END = 0xFFFF L2CAP_LE_PSM_DYNAMIC_RANGE_START = 0x0080 L2CAP_LE_PSM_DYNAMIC_RANGE_END = 0x00FF -# Frame types -L2CAP_COMMAND_REJECT = 0x01 -L2CAP_CONNECTION_REQUEST = 0x02 -L2CAP_CONNECTION_RESPONSE = 0x03 -L2CAP_CONFIGURE_REQUEST = 0x04 -L2CAP_CONFIGURE_RESPONSE = 0x05 -L2CAP_DISCONNECTION_REQUEST = 0x06 -L2CAP_DISCONNECTION_RESPONSE = 0x07 -L2CAP_ECHO_REQUEST = 0x08 -L2CAP_ECHO_RESPONSE = 0x09 -L2CAP_INFORMATION_REQUEST = 0x0A -L2CAP_INFORMATION_RESPONSE = 0x0B -L2CAP_CREATE_CHANNEL_REQUEST = 0x0C -L2CAP_CREATE_CHANNEL_RESPONSE = 0x0D -L2CAP_MOVE_CHANNEL_REQUEST = 0x0E -L2CAP_MOVE_CHANNEL_RESPONSE = 0x0F -L2CAP_MOVE_CHANNEL_CONFIRMATION = 0x10 -L2CAP_MOVE_CHANNEL_CONFIRMATION_RESPONSE = 0x11 -L2CAP_CONNECTION_PARAMETER_UPDATE_REQUEST = 0x12 -L2CAP_CONNECTION_PARAMETER_UPDATE_RESPONSE = 0x13 -L2CAP_LE_CREDIT_BASED_CONNECTION_REQUEST = 0x14 -L2CAP_LE_CREDIT_BASED_CONNECTION_RESPONSE = 0x15 -L2CAP_LE_FLOW_CONTROL_CREDIT = 0x16 - -L2CAP_CONTROL_FRAME_NAMES = { - L2CAP_COMMAND_REJECT: 'L2CAP_COMMAND_REJECT', - L2CAP_CONNECTION_REQUEST: 'L2CAP_CONNECTION_REQUEST', - L2CAP_CONNECTION_RESPONSE: 'L2CAP_CONNECTION_RESPONSE', - L2CAP_CONFIGURE_REQUEST: 'L2CAP_CONFIGURE_REQUEST', - L2CAP_CONFIGURE_RESPONSE: 'L2CAP_CONFIGURE_RESPONSE', - L2CAP_DISCONNECTION_REQUEST: 'L2CAP_DISCONNECTION_REQUEST', - L2CAP_DISCONNECTION_RESPONSE: 'L2CAP_DISCONNECTION_RESPONSE', - L2CAP_ECHO_REQUEST: 'L2CAP_ECHO_REQUEST', - L2CAP_ECHO_RESPONSE: 'L2CAP_ECHO_RESPONSE', - L2CAP_INFORMATION_REQUEST: 'L2CAP_INFORMATION_REQUEST', - L2CAP_INFORMATION_RESPONSE: 'L2CAP_INFORMATION_RESPONSE', - L2CAP_CREATE_CHANNEL_REQUEST: 'L2CAP_CREATE_CHANNEL_REQUEST', - L2CAP_CREATE_CHANNEL_RESPONSE: 'L2CAP_CREATE_CHANNEL_RESPONSE', - L2CAP_MOVE_CHANNEL_REQUEST: 'L2CAP_MOVE_CHANNEL_REQUEST', - L2CAP_MOVE_CHANNEL_RESPONSE: 'L2CAP_MOVE_CHANNEL_RESPONSE', - L2CAP_MOVE_CHANNEL_CONFIRMATION: 'L2CAP_MOVE_CHANNEL_CONFIRMATION', - L2CAP_MOVE_CHANNEL_CONFIRMATION_RESPONSE: 'L2CAP_MOVE_CHANNEL_CONFIRMATION_RESPONSE', - L2CAP_CONNECTION_PARAMETER_UPDATE_REQUEST: 'L2CAP_CONNECTION_PARAMETER_UPDATE_REQUEST', - L2CAP_CONNECTION_PARAMETER_UPDATE_RESPONSE: 'L2CAP_CONNECTION_PARAMETER_UPDATE_RESPONSE', - L2CAP_LE_CREDIT_BASED_CONNECTION_REQUEST: 'L2CAP_LE_CREDIT_BASED_CONNECTION_REQUEST', - L2CAP_LE_CREDIT_BASED_CONNECTION_RESPONSE: 'L2CAP_LE_CREDIT_BASED_CONNECTION_RESPONSE', - L2CAP_LE_FLOW_CONTROL_CREDIT: 'L2CAP_LE_FLOW_CONTROL_CREDIT' -} +class CommandCode(hci.SpecableEnum): + L2CAP_COMMAND_REJECT = 0x01 + L2CAP_CONNECTION_REQUEST = 0x02 + L2CAP_CONNECTION_RESPONSE = 0x03 + L2CAP_CONFIGURE_REQUEST = 0x04 + L2CAP_CONFIGURE_RESPONSE = 0x05 + L2CAP_DISCONNECTION_REQUEST = 0x06 + L2CAP_DISCONNECTION_RESPONSE = 0x07 + L2CAP_ECHO_REQUEST = 0x08 + L2CAP_ECHO_RESPONSE = 0x09 + L2CAP_INFORMATION_REQUEST = 0x0A + L2CAP_INFORMATION_RESPONSE = 0x0B + L2CAP_CREATE_CHANNEL_REQUEST = 0x0C + L2CAP_CREATE_CHANNEL_RESPONSE = 0x0D + L2CAP_MOVE_CHANNEL_REQUEST = 0x0E + L2CAP_MOVE_CHANNEL_RESPONSE = 0x0F + L2CAP_MOVE_CHANNEL_CONFIRMATION = 0x10 + L2CAP_MOVE_CHANNEL_CONFIRMATION_RESPONSE = 0x11 + L2CAP_CONNECTION_PARAMETER_UPDATE_REQUEST = 0x12 + L2CAP_CONNECTION_PARAMETER_UPDATE_RESPONSE = 0x13 + L2CAP_LE_CREDIT_BASED_CONNECTION_REQUEST = 0x14 + L2CAP_LE_CREDIT_BASED_CONNECTION_RESPONSE = 0x15 + L2CAP_LE_FLOW_CONTROL_CREDIT = 0x16 L2CAP_CONNECTION_PARAMETERS_ACCEPTED_RESULT = 0x0000 L2CAP_CONNECTION_PARAMETERS_REJECTED_RESULT = 0x0001 @@ -232,43 +203,44 @@ class L2CAP_PDU: # ----------------------------------------------------------------------------- +@dataclasses.dataclass class L2CAP_Control_Frame: ''' See Bluetooth spec @ Vol 3, Part A - 4 SIGNALING PACKET FORMATS ''' - classes: dict[int, type[L2CAP_Control_Frame]] = {} - code = 0 - name: str + classes: ClassVar[dict[int, type[L2CAP_Control_Frame]]] = {} + fields: ClassVar[hci.Fields] = () + code: int = dataclasses.field(default=0, init=False) + name: str = dataclasses.field(default='', init=False) + _data: Optional[bytes] = dataclasses.field(default=None, init=False) - @staticmethod - def from_bytes(pdu: bytes) -> L2CAP_Control_Frame: - code = pdu[0] + identifier: int - cls = L2CAP_Control_Frame.classes.get(code) - if cls is None: + @classmethod + def from_bytes(cls, pdu: bytes) -> L2CAP_Control_Frame: + code, identifier, length = struct.unpack_from(" str: - return name_or_number(L2CAP_CONTROL_FRAME_NAMES, code) + return frame @staticmethod def decode_configuration_options(data: bytes) -> list[tuple[int, bytes]]: @@ -288,112 +260,84 @@ class L2CAP_Control_Frame: [bytes([option[0], len(option[1])]) + option[1] for option in options] ) - @staticmethod - def subclass(fields): - def inner(cls): - cls.name = cls.__name__.upper() - cls.code = key_with_value(L2CAP_CONTROL_FRAME_NAMES, cls.name) - if cls.code is None: - raise KeyError( - f'Control Frame name {cls.name} ' - 'not found in L2CAP_CONTROL_FRAME_NAMES' - ) - cls.fields = fields + _ControlFrame = TypeVar('_ControlFrame', bound='L2CAP_Control_Frame') - # Register a factory for this class - L2CAP_Control_Frame.classes[cls.code] = cls + @classmethod + def subclass(cls, subclass: type[_ControlFrame]) -> type[_ControlFrame]: + subclass.name = subclass.__name__.upper() + subclass.code = CommandCode[subclass.name] + subclass.fields = hci.HCI_Object.fields_from_dataclass(subclass) - return cls + # Register a factory for this class + L2CAP_Control_Frame.classes[subclass.code] = subclass - return inner + return subclass - def __init__(self, pdu=None, **kwargs) -> None: + def __init__(self, pdu: Optional[bytes] = None, **kwargs) -> None: self.identifier = kwargs.get('identifier', 0) - if hasattr(self, 'fields'): + if self.fields: if kwargs: - HCI_Object.init_from_fields(self, self.fields, kwargs) + hci.HCI_Object.init_from_fields(self, self.fields, kwargs) if pdu is None: - data = HCI_Object.dict_to_bytes(kwargs, self.fields) + data = hci.HCI_Object.dict_to_bytes(kwargs, self.fields) pdu = ( bytes([self.code, self.identifier]) + struct.pack(' bytes: + if self._data is None: + self._data = hci.HCI_Object.dict_to_bytes(self.__dict__, self.fields) + return self._data + + @data.setter + def data(self, parameters: bytes) -> None: + self._data = parameters def __bytes__(self) -> bytes: - return self.pdu + return ( + struct.pack(' str: result = f'{color(self.name, "yellow")} [ID={self.identifier}]' if fields := getattr(self, 'fields', None): - result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, ' ') + result += ':\n' + hci.HCI_Object.format_fields(self.__dict__, fields, ' ') else: - if len(self.pdu) > 1: - result += f': {self.pdu.hex()}' + if len(self.data) > 1: + result += f': {self.data.hex()}' return result # ----------------------------------------------------------------------------- -@L2CAP_Control_Frame.subclass( - # pylint: disable=unnecessary-lambda - [ - ( - 'reason', - {'size': 2, 'mapper': lambda x: L2CAP_Command_Reject.reason_name(x)}, - ), - ('data', '*'), - ] -) +@L2CAP_Control_Frame.subclass +@dataclasses.dataclass class L2CAP_Command_Reject(L2CAP_Control_Frame): ''' See Bluetooth spec @ Vol 3, Part A - 4.1 COMMAND REJECT ''' - COMMAND_NOT_UNDERSTOOD = 0x0000 - SIGNALING_MTU_EXCEEDED = 0x0001 - INVALID_CID_IN_REQUEST = 0x0002 + class Reason(hci.SpecableEnum): + COMMAND_NOT_UNDERSTOOD = 0x0000 + SIGNALING_MTU_EXCEEDED = 0x0001 + INVALID_CID_IN_REQUEST = 0x0002 - REASON_NAMES = { - COMMAND_NOT_UNDERSTOOD: 'COMMAND_NOT_UNDERSTOOD', - SIGNALING_MTU_EXCEEDED: 'SIGNALING_MTU_EXCEEDED', - INVALID_CID_IN_REQUEST: 'INVALID_CID_IN_REQUEST', - } - - @staticmethod - def reason_name(reason: int) -> str: - return name_or_number(L2CAP_Command_Reject.REASON_NAMES, reason) + reason: int = dataclasses.field(metadata=Reason.type_metadata(2)) + data: bytes = dataclasses.field(metadata=hci.metadata('*')) # ----------------------------------------------------------------------------- -@L2CAP_Control_Frame.subclass( - # pylint: disable=unnecessary-lambda - [ - ( - 'psm', - { - 'parser': lambda data, offset: L2CAP_Connection_Request.parse_psm( - data, offset - ), - 'serializer': lambda value: L2CAP_Connection_Request.serialize_psm( - value - ), - }, - ), - ('source_cid', 2), - ] -) +@L2CAP_Control_Frame.subclass +@dataclasses.dataclass class L2CAP_Connection_Request(L2CAP_Control_Frame): ''' See Bluetooth spec @ Vol 3, Part A - 4.2 CONNECTION REQUEST ''' - psm: int - source_cid: int - @staticmethod def parse_psm(data: bytes, offset: int = 0) -> tuple[int, int]: psm_length = 2 @@ -416,156 +360,138 @@ class L2CAP_Connection_Request(L2CAP_Control_Frame): return serialized + psm: int = dataclasses.field( + metadata=hci.metadata( + { + 'parser': lambda data, offset: L2CAP_Connection_Request.parse_psm( + data, offset + ), + 'serializer': lambda value: L2CAP_Connection_Request.serialize_psm( + value + ), + } + ) + ) + source_cid: int = dataclasses.field(metadata=hci.metadata(2)) + # ----------------------------------------------------------------------------- -@L2CAP_Control_Frame.subclass( - # pylint: disable=unnecessary-lambda - [ - ('destination_cid', 2), - ('source_cid', 2), - ( - 'result', - {'size': 2, 'mapper': lambda x: L2CAP_Connection_Response.result_name(x)}, - ), - ('status', 2), - ] -) +@L2CAP_Control_Frame.subclass +@dataclasses.dataclass class L2CAP_Connection_Response(L2CAP_Control_Frame): ''' See Bluetooth spec @ Vol 3, Part A - 4.3 CONNECTION RESPONSE ''' - source_cid: int - destination_cid: int - status: int - result: int + class Result(hci.SpecableEnum): + CONNECTION_SUCCESSFUL = 0x0000 + CONNECTION_PENDING = 0x0001 + CONNECTION_REFUSED_PSM_NOT_SUPPORTED = 0x0002 + CONNECTION_REFUSED_SECURITY_BLOCK = 0x0003 + CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE = 0x0004 + CONNECTION_REFUSED_INVALID_SOURCE_CID = 0x0006 + CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED = 0x0007 + CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS = 0x000B - CONNECTION_SUCCESSFUL = 0x0000 - CONNECTION_PENDING = 0x0001 - CONNECTION_REFUSED_PSM_NOT_SUPPORTED = 0x0002 - CONNECTION_REFUSED_SECURITY_BLOCK = 0x0003 - CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE = 0x0004 - CONNECTION_REFUSED_INVALID_SOURCE_CID = 0x0006 - CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED = 0x0007 - CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS = 0x000B - - # pylint: disable=line-too-long - RESULT_NAMES = { - CONNECTION_SUCCESSFUL: 'CONNECTION_SUCCESSFUL', - CONNECTION_PENDING: 'CONNECTION_PENDING', - CONNECTION_REFUSED_PSM_NOT_SUPPORTED: 'CONNECTION_REFUSED_PSM_NOT_SUPPORTED', - CONNECTION_REFUSED_SECURITY_BLOCK: 'CONNECTION_REFUSED_SECURITY_BLOCK', - CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE: 'CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE', - CONNECTION_REFUSED_INVALID_SOURCE_CID: 'CONNECTION_REFUSED_INVALID_SOURCE_CID', - CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED: 'CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED', - CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS: 'CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS', - } - - @staticmethod - def result_name(result: int) -> str: - return name_or_number(L2CAP_Connection_Response.RESULT_NAMES, result) + destination_cid: int = dataclasses.field(metadata=hci.metadata(2)) + source_cid: int = dataclasses.field(metadata=hci.metadata(2)) + result: int = dataclasses.field(metadata=Result.type_metadata(2)) + status: int = dataclasses.field(metadata=hci.metadata(2)) # ----------------------------------------------------------------------------- -@L2CAP_Control_Frame.subclass([('destination_cid', 2), ('flags', 2), ('options', '*')]) +@L2CAP_Control_Frame.subclass +@dataclasses.dataclass class L2CAP_Configure_Request(L2CAP_Control_Frame): ''' See Bluetooth spec @ Vol 3, Part A - 4.4 CONFIGURATION REQUEST ''' + destination_cid: int = dataclasses.field(metadata=hci.metadata(2)) + flags: int = dataclasses.field(metadata=hci.metadata(2)) + options: bytes = dataclasses.field(metadata=hci.metadata('*')) + # ----------------------------------------------------------------------------- -@L2CAP_Control_Frame.subclass( - # pylint: disable=unnecessary-lambda - [ - ('source_cid', 2), - ('flags', 2), - ( - 'result', - {'size': 2, 'mapper': lambda x: L2CAP_Configure_Response.result_name(x)}, - ), - ('options', '*'), - ] -) +@L2CAP_Control_Frame.subclass +@dataclasses.dataclass class L2CAP_Configure_Response(L2CAP_Control_Frame): ''' See Bluetooth spec @ Vol 3, Part A - 4.5 CONFIGURATION RESPONSE ''' - SUCCESS = 0x0000 - FAILURE_UNACCEPTABLE_PARAMETERS = 0x0001 - FAILURE_REJECTED = 0x0002 - FAILURE_UNKNOWN_OPTIONS = 0x0003 - PENDING = 0x0004 - FAILURE_FLOW_SPEC_REJECTED = 0x0005 + class Result(hci.SpecableEnum): + SUCCESS = 0x0000 + FAILURE_UNACCEPTABLE_PARAMETERS = 0x0001 + FAILURE_REJECTED = 0x0002 + FAILURE_UNKNOWN_OPTIONS = 0x0003 + PENDING = 0x0004 + FAILURE_FLOW_SPEC_REJECTED = 0x0005 - RESULT_NAMES = { - SUCCESS: 'SUCCESS', - FAILURE_UNACCEPTABLE_PARAMETERS: 'FAILURE_UNACCEPTABLE_PARAMETERS', - FAILURE_REJECTED: 'FAILURE_REJECTED', - FAILURE_UNKNOWN_OPTIONS: 'FAILURE_UNKNOWN_OPTIONS', - PENDING: 'PENDING', - FAILURE_FLOW_SPEC_REJECTED: 'FAILURE_FLOW_SPEC_REJECTED', - } - - @staticmethod - def result_name(result: int) -> str: - return name_or_number(L2CAP_Configure_Response.RESULT_NAMES, result) + source_cid: int = dataclasses.field(metadata=hci.metadata(2)) + flags: int = dataclasses.field(metadata=hci.metadata(2)) + result: int = dataclasses.field(metadata=Result.type_metadata(2)) + options: bytes = dataclasses.field(metadata=hci.metadata('*')) # ----------------------------------------------------------------------------- -@L2CAP_Control_Frame.subclass([('destination_cid', 2), ('source_cid', 2)]) +@L2CAP_Control_Frame.subclass +@dataclasses.dataclass class L2CAP_Disconnection_Request(L2CAP_Control_Frame): ''' See Bluetooth spec @ Vol 3, Part A - 4.6 DISCONNECTION REQUEST ''' + destination_cid: int = dataclasses.field(metadata=hci.metadata(2)) + source_cid: int = dataclasses.field(metadata=hci.metadata(2)) + # ----------------------------------------------------------------------------- -@L2CAP_Control_Frame.subclass([('destination_cid', 2), ('source_cid', 2)]) +@L2CAP_Control_Frame.subclass +@dataclasses.dataclass class L2CAP_Disconnection_Response(L2CAP_Control_Frame): ''' See Bluetooth spec @ Vol 3, Part A - 4.7 DISCONNECTION RESPONSE ''' + destination_cid: int = dataclasses.field(metadata=hci.metadata(2)) + source_cid: int = dataclasses.field(metadata=hci.metadata(2)) + # ----------------------------------------------------------------------------- -@L2CAP_Control_Frame.subclass([('data', '*')]) +@L2CAP_Control_Frame.subclass +@dataclasses.dataclass class L2CAP_Echo_Request(L2CAP_Control_Frame): ''' See Bluetooth spec @ Vol 3, Part A - 4.8 ECHO REQUEST ''' + data: bytes = dataclasses.field(metadata=hci.metadata('*')) + # ----------------------------------------------------------------------------- -@L2CAP_Control_Frame.subclass([('data', '*')]) +@L2CAP_Control_Frame.subclass +@dataclasses.dataclass class L2CAP_Echo_Response(L2CAP_Control_Frame): ''' See Bluetooth spec @ Vol 3, Part A - 4.9 ECHO RESPONSE ''' + data: bytes = dataclasses.field(metadata=hci.metadata('*')) + # ----------------------------------------------------------------------------- -@L2CAP_Control_Frame.subclass( - [ - ( - 'info_type', - { - 'size': 2, - # pylint: disable-next=unnecessary-lambda - 'mapper': lambda x: L2CAP_Information_Request.info_type_name(x), - }, - ) - ] -) +@L2CAP_Control_Frame.subclass +@dataclasses.dataclass class L2CAP_Information_Request(L2CAP_Control_Frame): ''' See Bluetooth spec @ Vol 3, Part A - 4.10 INFORMATION REQUEST ''' - CONNECTIONLESS_MTU = 0x0001 - EXTENDED_FEATURES_SUPPORTED = 0x0002 - FIXED_CHANNELS_SUPPORTED = 0x0003 + class InfoType(hci.SpecableEnum): + CONNECTIONLESS_MTU = 0x0001 + EXTENDED_FEATURES_SUPPORTED = 0x0002 + FIXED_CHANNELS_SUPPORTED = 0x0003 EXTENDED_FEATURE_FLOW_MODE_CONTROL = 0x0001 EXTENDED_FEATURE_RETRANSMISSION_MODE = 0x0002 @@ -579,139 +505,108 @@ class L2CAP_Information_Request(L2CAP_Control_Frame): EXTENDED_FEATURE_UNICAST_CONNECTIONLESS_DATA = 0x0200 EXTENDED_FEATURE_ENHANCED_CREDIT_BASE_FLOW_CONTROL = 0x0400 - INFO_TYPE_NAMES = { - CONNECTIONLESS_MTU: 'CONNECTIONLESS_MTU', - EXTENDED_FEATURES_SUPPORTED: 'EXTENDED_FEATURES_SUPPORTED', - FIXED_CHANNELS_SUPPORTED: 'FIXED_CHANNELS_SUPPORTED', - } - - @staticmethod - def info_type_name(info_type: int) -> str: - return name_or_number(L2CAP_Information_Request.INFO_TYPE_NAMES, info_type) + info_type: int = dataclasses.field(metadata=InfoType.type_metadata(2)) # ----------------------------------------------------------------------------- -@L2CAP_Control_Frame.subclass( - [ - ('info_type', {'size': 2, 'mapper': L2CAP_Information_Request.info_type_name}), - ( - 'result', - # pylint: disable-next=unnecessary-lambda - {'size': 2, 'mapper': lambda x: L2CAP_Information_Response.result_name(x)}, - ), - ('data', '*'), - ] -) +@L2CAP_Control_Frame.subclass +@dataclasses.dataclass class L2CAP_Information_Response(L2CAP_Control_Frame): ''' See Bluetooth spec @ Vol 3, Part A - 4.11 INFORMATION RESPONSE ''' - SUCCESS = 0x00 - NOT_SUPPORTED = 0x01 + class Result(hci.SpecableEnum): + SUCCESS = 0x00 + NOT_SUPPORTED = 0x01 - RESULT_NAMES = {SUCCESS: 'SUCCESS', NOT_SUPPORTED: 'NOT_SUPPORTED'} - - @staticmethod - def result_name(result: int) -> str: - return name_or_number(L2CAP_Information_Response.RESULT_NAMES, result) + info_type: int = dataclasses.field( + metadata=L2CAP_Information_Request.InfoType.type_metadata(2) + ) + result: int = dataclasses.field(metadata=Result.type_metadata(2)) + data: bytes = dataclasses.field(metadata=hci.metadata('*')) # ----------------------------------------------------------------------------- -@L2CAP_Control_Frame.subclass( - [('interval_min', 2), ('interval_max', 2), ('latency', 2), ('timeout', 2)] -) +@L2CAP_Control_Frame.subclass +@dataclasses.dataclass class L2CAP_Connection_Parameter_Update_Request(L2CAP_Control_Frame): ''' See Bluetooth spec @ Vol 3, Part A - 4.20 CONNECTION PARAMETER UPDATE REQUEST ''' + interval_min: int = dataclasses.field(metadata=hci.metadata(2)) + interval_max: int = dataclasses.field(metadata=hci.metadata(2)) + latency: int = dataclasses.field(metadata=hci.metadata(2)) + timeout: int = dataclasses.field(metadata=hci.metadata(2)) + # ----------------------------------------------------------------------------- -@L2CAP_Control_Frame.subclass([('result', 2)]) +@L2CAP_Control_Frame.subclass +@dataclasses.dataclass class L2CAP_Connection_Parameter_Update_Response(L2CAP_Control_Frame): ''' See Bluetooth spec @ Vol 3, Part A - 4.21 CONNECTION PARAMETER UPDATE RESPONSE ''' + result: int = dataclasses.field(metadata=hci.metadata(2)) + # ----------------------------------------------------------------------------- -@L2CAP_Control_Frame.subclass( - [('le_psm', 2), ('source_cid', 2), ('mtu', 2), ('mps', 2), ('initial_credits', 2)] -) +@L2CAP_Control_Frame.subclass +@dataclasses.dataclass class L2CAP_LE_Credit_Based_Connection_Request(L2CAP_Control_Frame): ''' See Bluetooth spec @ Vol 3, Part A - 4.22 LE CREDIT BASED CONNECTION REQUEST (CODE 0x14) ''' - source_cid: int + le_psm: int = dataclasses.field(metadata=hci.metadata(2)) + source_cid: int = dataclasses.field(metadata=hci.metadata(2)) + mtu: int = dataclasses.field(metadata=hci.metadata(2)) + mps: int = dataclasses.field(metadata=hci.metadata(2)) + initial_credits: int = dataclasses.field(metadata=hci.metadata(2)) # ----------------------------------------------------------------------------- -@L2CAP_Control_Frame.subclass( - # pylint: disable=unnecessary-lambda,line-too-long - [ - ('destination_cid', 2), - ('mtu', 2), - ('mps', 2), - ('initial_credits', 2), - ( - 'result', - { - 'size': 2, - 'mapper': lambda x: L2CAP_LE_Credit_Based_Connection_Response.result_name( - x - ), - }, - ), - ] -) +@L2CAP_Control_Frame.subclass +@dataclasses.dataclass class L2CAP_LE_Credit_Based_Connection_Response(L2CAP_Control_Frame): ''' See Bluetooth spec @ Vol 3, Part A - 4.23 LE CREDIT BASED CONNECTION RESPONSE (CODE 0x15) ''' - CONNECTION_SUCCESSFUL = 0x0000 - CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED = 0x0002 - CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE = 0x0004 - CONNECTION_REFUSED_INSUFFICIENT_AUTHENTICATION = 0x0005 - CONNECTION_REFUSED_INSUFFICIENT_AUTHORIZATION = 0x0006 - CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION_KEY_SIZE = 0x0007 - CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION = 0x0008 - CONNECTION_REFUSED_INVALID_SOURCE_CID = 0x0009 - CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED = 0x000A - CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS = 0x000B + class Result(hci.SpecableEnum): + CONNECTION_SUCCESSFUL = 0x0000 + CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED = 0x0002 + CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE = 0x0004 + CONNECTION_REFUSED_INSUFFICIENT_AUTHENTICATION = 0x0005 + CONNECTION_REFUSED_INSUFFICIENT_AUTHORIZATION = 0x0006 + CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION_KEY_SIZE = 0x0007 + CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION = 0x0008 + CONNECTION_REFUSED_INVALID_SOURCE_CID = 0x0009 + CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED = 0x000A + CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS = 0x000B - # pylint: disable=line-too-long - RESULT_NAMES = { - CONNECTION_SUCCESSFUL: 'CONNECTION_SUCCESSFUL', - CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED: 'CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED', - CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE: 'CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE', - CONNECTION_REFUSED_INSUFFICIENT_AUTHENTICATION: 'CONNECTION_REFUSED_INSUFFICIENT_AUTHENTICATION', - CONNECTION_REFUSED_INSUFFICIENT_AUTHORIZATION: 'CONNECTION_REFUSED_INSUFFICIENT_AUTHORIZATION', - CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION_KEY_SIZE: 'CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION_KEY_SIZE', - CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION: 'CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION', - CONNECTION_REFUSED_INVALID_SOURCE_CID: 'CONNECTION_REFUSED_INVALID_SOURCE_CID', - CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED: 'CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED', - CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS: 'CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS', - } - - @staticmethod - def result_name(result: int) -> str: - return name_or_number( - L2CAP_LE_Credit_Based_Connection_Response.RESULT_NAMES, result - ) + destination_cid: int = dataclasses.field(metadata=hci.metadata(2)) + mtu: int = dataclasses.field(metadata=hci.metadata(2)) + mps: int = dataclasses.field(metadata=hci.metadata(2)) + initial_credits: int = dataclasses.field(metadata=hci.metadata(2)) + result: int = dataclasses.field(metadata=Result.type_metadata(2)) # ----------------------------------------------------------------------------- -@L2CAP_Control_Frame.subclass([('cid', 2), ('credits', 2)]) +@L2CAP_Control_Frame.subclass +@dataclasses.dataclass class L2CAP_LE_Flow_Control_Credit(L2CAP_Control_Frame): ''' See Bluetooth spec @ Vol 3, Part A - 4.24 LE FLOW CONTROL CREDIT (CODE 0x16) ''' + cid: int = dataclasses.field(metadata=hci.metadata(2)) + credits: int = dataclasses.field(metadata=hci.metadata(2)) + # ----------------------------------------------------------------------------- class ClassicChannel(utils.EventEmitter): @@ -863,7 +758,7 @@ class ClassicChannel(utils.EventEmitter): ) ) - def on_connection_request(self, request) -> None: + def on_connection_request(self, request: L2CAP_Connection_Request) -> None: self.destination_cid = request.source_cid self._change_state(self.State.WAIT_CONNECT) self.send_control_frame( @@ -871,7 +766,7 @@ class ClassicChannel(utils.EventEmitter): identifier=request.identifier, destination_cid=self.source_cid, source_cid=self.destination_cid, - result=L2CAP_Connection_Response.CONNECTION_SUCCESSFUL, + result=L2CAP_Connection_Response.Result.CONNECTION_SUCCESSFUL, status=0x0000, ) ) @@ -879,30 +774,31 @@ class ClassicChannel(utils.EventEmitter): self.send_configure_request() self._change_state(self.State.WAIT_CONFIG_REQ_RSP) - def on_connection_response(self, response): + def on_connection_response(self, response: L2CAP_Connection_Response): if self.state != self.State.WAIT_CONNECT_RSP: logger.warning(color('invalid state', 'red')) return - if response.result == L2CAP_Connection_Response.CONNECTION_SUCCESSFUL: + if response.result == L2CAP_Connection_Response.Result.CONNECTION_SUCCESSFUL: self.destination_cid = response.destination_cid self._change_state(self.State.WAIT_CONFIG) self.send_configure_request() self._change_state(self.State.WAIT_CONFIG_REQ_RSP) - elif response.result == L2CAP_Connection_Response.CONNECTION_PENDING: + elif response.result == L2CAP_Connection_Response.Result.CONNECTION_PENDING: pass else: self._change_state(self.State.CLOSED) - self.connection_result.set_exception( - ProtocolError( - response.result, - 'l2cap', - L2CAP_Connection_Response.result_name(response.result), + if self.connection_result: + self.connection_result.set_exception( + ProtocolError( + response.result, + 'l2cap', + L2CAP_Connection_Response.Result(response.result).name, + ) ) - ) - self.connection_result = None + self.connection_result = None - def on_configure_request(self, request) -> None: + def on_configure_request(self, request: L2CAP_Configure_Request) -> None: if self.state not in ( self.State.WAIT_CONFIG, self.State.WAIT_CONFIG_REQ, @@ -923,7 +819,7 @@ class ClassicChannel(utils.EventEmitter): identifier=request.identifier, source_cid=self.destination_cid, flags=0x0000, - result=L2CAP_Configure_Response.SUCCESS, + result=L2CAP_Configure_Response.Result.SUCCESS, options=request.options, # TODO: don't accept everything blindly ) ) @@ -940,8 +836,8 @@ class ClassicChannel(utils.EventEmitter): elif self.state == self.State.WAIT_CONFIG_REQ_RSP: self._change_state(self.State.WAIT_CONFIG_RSP) - def on_configure_response(self, response) -> None: - if response.result == L2CAP_Configure_Response.SUCCESS: + def on_configure_response(self, response: L2CAP_Configure_Response) -> None: + if response.result == L2CAP_Configure_Response.Result.SUCCESS: if self.state == self.State.WAIT_CONFIG_REQ_RSP: self._change_state(self.State.WAIT_CONFIG_REQ) elif self.state in ( @@ -956,7 +852,8 @@ class ClassicChannel(utils.EventEmitter): else: logger.warning(color('invalid state', 'red')) elif ( - response.result == L2CAP_Configure_Response.FAILURE_UNACCEPTABLE_PARAMETERS + response.result + == L2CAP_Configure_Response.Result.FAILURE_UNACCEPTABLE_PARAMETERS ): # Re-configure with what's suggested in the response self.send_control_frame( @@ -971,13 +868,13 @@ class ClassicChannel(utils.EventEmitter): logger.warning( color( '!!! configuration rejected: ' - f'{L2CAP_Configure_Response.result_name(response.result)}', + f'{L2CAP_Configure_Response.Result(response.result).name}', 'red', ) ) # TODO: decide how to fail gracefully - def on_disconnection_request(self, request) -> None: + def on_disconnection_request(self, request: L2CAP_Disconnection_Request) -> None: if self.state in (self.State.OPEN, self.State.WAIT_DISCONNECT): self.send_control_frame( L2CAP_Disconnection_Response( @@ -992,7 +889,7 @@ class ClassicChannel(utils.EventEmitter): else: logger.warning(color('invalid state', 'red')) - def on_disconnection_response(self, response) -> None: + def on_disconnection_response(self, response: L2CAP_Disconnection_Response) -> None: if self.state != self.State.WAIT_DISCONNECT: logger.warning(color('invalid state', 'red')) return @@ -1225,7 +1122,9 @@ class LeCreditBasedChannel(utils.EventEmitter): self.in_sdu = None self.in_sdu_length = 0 - def on_connection_response(self, response) -> None: + def on_connection_response( + self, response: L2CAP_LE_Credit_Based_Connection_Response + ) -> None: # Look for a matching pending response result if self.connection_result is None: logger.warning( @@ -1235,7 +1134,7 @@ class LeCreditBasedChannel(utils.EventEmitter): if ( response.result - == L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_SUCCESSFUL + == L2CAP_LE_Credit_Based_Connection_Response.Result.CONNECTION_SUCCESSFUL ): self.destination_cid = response.destination_cid self.peer_mtu = response.mtu @@ -1249,9 +1148,9 @@ class LeCreditBasedChannel(utils.EventEmitter): ProtocolError( response.result, 'l2cap', - L2CAP_LE_Credit_Based_Connection_Response.result_name( + L2CAP_LE_Credit_Based_Connection_Response.Result( response.result - ), + ).name, ) ) self._change_state(self.State.CONNECTION_ERROR) @@ -1266,7 +1165,7 @@ class LeCreditBasedChannel(utils.EventEmitter): # Try to send more data if we have any queued up self.process_output() - def on_disconnection_request(self, request) -> None: + def on_disconnection_request(self, request: L2CAP_Disconnection_Request) -> None: self.send_control_frame( L2CAP_Disconnection_Response( identifier=request.identifier, @@ -1277,7 +1176,7 @@ class LeCreditBasedChannel(utils.EventEmitter): self._change_state(self.State.DISCONNECTED) self.flush_output() - def on_disconnection_response(self, response) -> None: + def on_disconnection_response(self, response: L2CAP_Disconnection_Response) -> None: if self.state != self.State.DISCONNECTING: logger.warning(color('invalid state', 'red')) return @@ -1726,12 +1625,12 @@ class ChannelManager: ) def on_l2cap_command_reject( - self, _connection: Connection, _cid: int, packet + self, _connection: Connection, _cid: int, packet: L2CAP_Command_Reject ) -> None: logger.warning(f'{color("!!! Command rejected:", "red")} {packet.reason}') def on_l2cap_connection_request( - self, connection: Connection, cid: int, request + self, connection: Connection, cid: int, request: L2CAP_Connection_Request ) -> None: # Check if there's a server for this PSM server = self.servers.get(request.psm) @@ -1748,7 +1647,7 @@ class ChannelManager: destination_cid=request.source_cid, source_cid=0, # pylint: disable=line-too-long - result=L2CAP_Connection_Response.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE, + result=L2CAP_Connection_Response.Result.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE, status=0x0000, ), ) @@ -1779,13 +1678,16 @@ class ChannelManager: destination_cid=request.source_cid, source_cid=0, # pylint: disable=line-too-long - result=L2CAP_Connection_Response.CONNECTION_REFUSED_PSM_NOT_SUPPORTED, + result=L2CAP_Connection_Response.Result.CONNECTION_REFUSED_PSM_NOT_SUPPORTED, status=0x0000, ), ) def on_l2cap_connection_response( - self, connection: Connection, cid: int, response + self, + connection: Connection, + cid: int, + response: L2CAP_Connection_Response, ) -> None: if ( channel := self.find_channel(connection.handle, response.source_cid) @@ -1802,7 +1704,7 @@ class ChannelManager: channel.on_connection_response(response) def on_l2cap_configure_request( - self, connection: Connection, cid: int, request + self, connection: Connection, cid: int, request: L2CAP_Configure_Request ) -> None: if ( channel := self.find_channel(connection.handle, request.destination_cid) @@ -1819,7 +1721,7 @@ class ChannelManager: channel.on_configure_request(request) def on_l2cap_configure_response( - self, connection: Connection, cid: int, response + self, connection: Connection, cid: int, response: L2CAP_Configure_Response ) -> None: if ( channel := self.find_channel(connection.handle, response.source_cid) @@ -1836,7 +1738,7 @@ class ChannelManager: channel.on_configure_response(response) def on_l2cap_disconnection_request( - self, connection: Connection, cid: int, request + self, connection: Connection, cid: int, request: L2CAP_Disconnection_Request ) -> None: if ( channel := self.find_channel(connection.handle, request.destination_cid) @@ -1853,7 +1755,7 @@ class ChannelManager: channel.on_disconnection_request(request) def on_l2cap_disconnection_response( - self, connection: Connection, cid: int, response + self, connection: Connection, cid: int, response: L2CAP_Disconnection_Response ) -> None: if ( channel := self.find_channel(connection.handle, response.source_cid) @@ -1869,7 +1771,9 @@ class ChannelManager: channel.on_disconnection_response(response) - def on_l2cap_echo_request(self, connection: Connection, cid: int, request) -> None: + def on_l2cap_echo_request( + self, connection: Connection, cid: int, request: L2CAP_Echo_Request + ) -> None: logger.debug(f'<<< Echo request: data={request.data.hex()}') self.send_control_frame( connection, @@ -1878,25 +1782,31 @@ class ChannelManager: ) def on_l2cap_echo_response( - self, _connection: Connection, _cid: int, response + self, _connection: Connection, _cid: int, response: L2CAP_Echo_Response ) -> None: logger.debug(f'<<< Echo response: data={response.data.hex()}') # TODO notify listeners def on_l2cap_information_request( - self, connection: Connection, cid: int, request + self, connection: Connection, cid: int, request: L2CAP_Information_Request ) -> None: - if request.info_type == L2CAP_Information_Request.CONNECTIONLESS_MTU: - result = L2CAP_Information_Response.SUCCESS + if request.info_type == L2CAP_Information_Request.InfoType.CONNECTIONLESS_MTU: + result = L2CAP_Information_Response.Result.SUCCESS data = self.connectionless_mtu.to_bytes(2, 'little') - elif request.info_type == L2CAP_Information_Request.EXTENDED_FEATURES_SUPPORTED: - result = L2CAP_Information_Response.SUCCESS + elif ( + request.info_type + == L2CAP_Information_Request.InfoType.EXTENDED_FEATURES_SUPPORTED + ): + result = L2CAP_Information_Response.Result.SUCCESS data = sum(self.extended_features).to_bytes(4, 'little') - elif request.info_type == L2CAP_Information_Request.FIXED_CHANNELS_SUPPORTED: - result = L2CAP_Information_Response.SUCCESS + elif ( + request.info_type + == L2CAP_Information_Request.InfoType.FIXED_CHANNELS_SUPPORTED + ): + result = L2CAP_Information_Response.Result.SUCCESS data = sum(1 << cid for cid in self.fixed_channels).to_bytes(8, 'little') else: - result = L2CAP_Information_Response.NOT_SUPPORTED + result = L2CAP_Information_Response.Result.NOT_SUPPORTED data = b'' self.send_control_frame( @@ -1911,9 +1821,12 @@ class ChannelManager: ) def on_l2cap_connection_parameter_update_request( - self, connection: Connection, cid: int, request + self, + connection: Connection, + cid: int, + request: L2CAP_Connection_Parameter_Update_Request, ): - if connection.role == Role.CENTRAL: + if connection.role == hci.Role.CENTRAL: self.send_control_frame( connection, cid, @@ -1923,7 +1836,7 @@ class ChannelManager: ), ) self.host.send_command_sync( - HCI_LE_Connection_Update_Command( + hci.HCI_LE_Connection_Update_Command( connection_handle=connection.handle, connection_interval_min=request.interval_min, connection_interval_max=request.interval_max, @@ -1961,6 +1874,7 @@ class ChannelManager: connection, L2CAP_LE_SIGNALING_CID, L2CAP_Connection_Parameter_Update_Request( + identifier=self.next_identifier(connection), interval_min=interval_min, interval_max=interval_max, latency=latency, @@ -1970,7 +1884,10 @@ class ChannelManager: return await self.connection_parameters_update_response def on_l2cap_connection_parameter_update_response( - self, connection: Connection, cid: int, response + self, + connection: Connection, + cid: int, + response: L2CAP_Connection_Parameter_Update_Response, ) -> None: if self.connection_parameters_update_response: self.connection_parameters_update_response.set_result(response.result) @@ -1984,7 +1901,10 @@ class ChannelManager: ) def on_l2cap_le_credit_based_connection_request( - self, connection: Connection, cid: int, request + self, + connection: Connection, + cid: int, + request: L2CAP_LE_Credit_Based_Connection_Request, ) -> None: if request.le_psm in self.le_coc_servers: server = self.le_coc_servers[request.le_psm] @@ -2005,7 +1925,7 @@ class ChannelManager: mps=server.mps, initial_credits=0, # pylint: disable=line-too-long - result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED, + result=L2CAP_LE_Credit_Based_Connection_Response.Result.CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED, ), ) return @@ -2024,7 +1944,7 @@ class ChannelManager: mps=server.mps, initial_credits=0, # pylint: disable=line-too-long - result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE, + result=L2CAP_LE_Credit_Based_Connection_Response.Result.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE, ), ) return @@ -2062,7 +1982,7 @@ class ChannelManager: mps=server.mps, initial_credits=server.max_credits, # pylint: disable=line-too-long - result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_SUCCESSFUL, + result=L2CAP_LE_Credit_Based_Connection_Response.Result.CONNECTION_SUCCESSFUL, ), ) @@ -2083,12 +2003,15 @@ class ChannelManager: mps=L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS, initial_credits=0, # pylint: disable=line-too-long - result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED, + result=L2CAP_LE_Credit_Based_Connection_Response.Result.CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED, ), ) def on_l2cap_le_credit_based_connection_response( - self, connection: Connection, _cid: int, response + self, + connection: Connection, + _cid: int, + response: L2CAP_LE_Credit_Based_Connection_Response, ) -> None: # Find the pending request by identifier request = self.le_coc_requests.get(response.identifier) @@ -2113,7 +2036,7 @@ class ChannelManager: channel.on_connection_response(response) def on_l2cap_le_flow_control_credit( - self, connection: Connection, _cid: int, credit + self, connection: Connection, _cid: int, credit: L2CAP_LE_Flow_Control_Credit ) -> None: channel = self.find_le_coc_channel(connection.handle, credit.cid) if channel is None: diff --git a/tests/l2cap_test.py b/tests/l2cap_test.py index 6323ddf..3ece29a 100644 --- a/tests/l2cap_test.py +++ b/tests/l2cap_test.py @@ -68,11 +68,13 @@ def test_helpers(): assert offset == 4 assert psm == 0x242311 - rq = L2CAP_Connection_Request(psm=0x01, source_cid=0x44) + rq = L2CAP_Connection_Request(psm=0x01, source_cid=0x44, identifier=0x88) brq = bytes(rq) srq = L2CAP_Connection_Request.from_bytes(brq) + assert isinstance(srq, L2CAP_Connection_Request) assert srq.psm == rq.psm assert srq.source_cid == rq.source_cid + assert srq.identifier == rq.identifier # -----------------------------------------------------------------------------