diff --git a/bumble/controller.py b/bumble/controller.py index 9f12a40..c9bdae7 100644 --- a/bumble/controller.py +++ b/bumble/controller.py @@ -394,7 +394,7 @@ class Controller: peer_address=peer_address, link=self.link, transport=PhysicalTransport.LE, - link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE, + link_type=HCI_Connection_Complete_Event.LinkType.ACL, ) self.peripheral_connections[peer_address] = connection logger.debug(f'New PERIPHERAL connection handle: 0x{connection_handle:04X}') @@ -454,7 +454,7 @@ class Controller: peer_address=peer_address, link=self.link, transport=PhysicalTransport.LE, - link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE, + link_type=HCI_Connection_Complete_Event.LinkType.ACL, ) self.central_connections[peer_address] = connection logger.debug( @@ -695,7 +695,7 @@ class Controller: peer_address=peer_address, link=self.link, transport=PhysicalTransport.BR_EDR, - link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE, + link_type=HCI_Connection_Complete_Event.LinkType.ACL, ) self.classic_connections[peer_address] = connection logger.debug( @@ -709,7 +709,7 @@ class Controller: connection_handle=connection_handle, bd_addr=peer_address, encryption_enabled=False, - link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE, + link_type=HCI_Connection_Complete_Event.LinkType.ACL, ) ) else: @@ -720,7 +720,7 @@ class Controller: connection_handle=0, bd_addr=peer_address, encryption_enabled=False, - link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE, + link_type=HCI_Connection_Complete_Event.LinkType.ACL, ) ) @@ -945,7 +945,7 @@ class Controller: ) ) self.link.classic_sco_connect( - self, connection.peer_address, HCI_Connection_Complete_Event.ESCO_LINK_TYPE + self, connection.peer_address, HCI_Connection_Complete_Event.LinkType.ESCO ) def on_hci_enhanced_accept_synchronous_connection_request_command(self, command): @@ -974,7 +974,7 @@ class Controller: ) ) self.link.classic_accept_sco_connection( - self, connection.peer_address, HCI_Connection_Complete_Event.ESCO_LINK_TYPE + self, connection.peer_address, HCI_Connection_Complete_Event.LinkType.ESCO ) def on_hci_switch_role_command(self, command): diff --git a/bumble/device.py b/bumble/device.py index 24732a4..a92af56 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -5084,8 +5084,8 @@ class Device(utils.CompositeEventEmitter): # Store the keys in the key store if self.keystore: authenticated = key_type in ( - hci.HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE, - hci.HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE, + hci.LinkKeyType.AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192, + hci.LinkKeyType.AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256, ) pairing_keys = PairingKeys( link_key=PairingKeys.Key(value=link_key, authenticated=authenticated), @@ -5532,8 +5532,8 @@ class Device(utils.CompositeEventEmitter): # Handle SCO request. if link_type in ( - hci.HCI_Connection_Complete_Event.SCO_LINK_TYPE, - hci.HCI_Connection_Complete_Event.ESCO_LINK_TYPE, + hci.HCI_Connection_Complete_Event.LinkType.SCO, + hci.HCI_Connection_Complete_Event.LinkType.ESCO, ): if connection := self.find_connection_by_bd_addr( bd_addr, transport=PhysicalTransport.BR_EDR @@ -5641,7 +5641,7 @@ class Device(utils.CompositeEventEmitter): # [Classic only] @host_event_handler @with_connection_from_address - def on_authentication_io_capability_request(self, connection): + def on_authentication_io_capability_request(self, connection: Connection): # Ask what the pairing config should be for this connection pairing_config = self.pairing_config_factory(connection) @@ -5649,13 +5649,13 @@ class Device(utils.CompositeEventEmitter): authentication_requirements = ( # No Bonding ( - hci.HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS, - hci.HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS, + hci.AuthenticationRequirements.MITM_NOT_REQUIRED_NO_BONDING, + hci.AuthenticationRequirements.MITM_REQUIRED_NO_BONDING, ), # General Bonding ( - hci.HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS, - hci.HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS, + hci.AuthenticationRequirements.MITM_NOT_REQUIRED_GENERAL_BONDING, + hci.AuthenticationRequirements.MITM_REQUIRED_GENERAL_BONDING, ), )[1 if pairing_config.bonding else 0][1 if pairing_config.mitm else 0] @@ -5710,30 +5710,30 @@ class Device(utils.CompositeEventEmitter): raise UnreachableError() # See Bluetooth spec @ Vol 3, Part C 5.2.2.6 - methods = { - hci.HCI_DISPLAY_ONLY_IO_CAPABILITY: { - hci.HCI_DISPLAY_ONLY_IO_CAPABILITY: display_auto_confirm, - hci.HCI_DISPLAY_YES_NO_IO_CAPABILITY: display_confirm, - hci.HCI_KEYBOARD_ONLY_IO_CAPABILITY: na, - hci.HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm, + methods: dict[int, dict[int, Callable[[], Awaitable[bool]]]] = { + hci.IoCapability.DISPLAY_ONLY: { + hci.IoCapability.DISPLAY_ONLY: display_auto_confirm, + hci.IoCapability.DISPLAY_YES_NO: display_confirm, + hci.IoCapability.KEYBOARD_ONLY: na, + hci.IoCapability.NO_INPUT_NO_OUTPUT: auto_confirm, }, - hci.HCI_DISPLAY_YES_NO_IO_CAPABILITY: { - hci.HCI_DISPLAY_ONLY_IO_CAPABILITY: display_auto_confirm, - hci.HCI_DISPLAY_YES_NO_IO_CAPABILITY: display_confirm, - hci.HCI_KEYBOARD_ONLY_IO_CAPABILITY: na, - hci.HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm, + hci.IoCapability.DISPLAY_YES_NO: { + hci.IoCapability.DISPLAY_ONLY: display_auto_confirm, + hci.IoCapability.DISPLAY_YES_NO: display_confirm, + hci.IoCapability.KEYBOARD_ONLY: na, + hci.IoCapability.NO_INPUT_NO_OUTPUT: auto_confirm, }, - hci.HCI_KEYBOARD_ONLY_IO_CAPABILITY: { - hci.HCI_DISPLAY_ONLY_IO_CAPABILITY: na, - hci.HCI_DISPLAY_YES_NO_IO_CAPABILITY: na, - hci.HCI_KEYBOARD_ONLY_IO_CAPABILITY: na, - hci.HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm, + hci.IoCapability.KEYBOARD_ONLY: { + hci.IoCapability.DISPLAY_ONLY: na, + hci.IoCapability.DISPLAY_YES_NO: na, + hci.IoCapability.KEYBOARD_ONLY: na, + hci.IoCapability.NO_INPUT_NO_OUTPUT: auto_confirm, }, - hci.HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: { - hci.HCI_DISPLAY_ONLY_IO_CAPABILITY: confirm, - hci.HCI_DISPLAY_YES_NO_IO_CAPABILITY: confirm, - hci.HCI_KEYBOARD_ONLY_IO_CAPABILITY: auto_confirm, - hci.HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm, + hci.IoCapability.NO_INPUT_NO_OUTPUT: { + hci.IoCapability.DISPLAY_ONLY: confirm, + hci.IoCapability.DISPLAY_YES_NO: confirm, + hci.IoCapability.KEYBOARD_ONLY: auto_confirm, + hci.IoCapability.NO_INPUT_NO_OUTPUT: auto_confirm, }, } @@ -5799,7 +5799,7 @@ class Device(utils.CompositeEventEmitter): io_capability = pairing_config.delegate.classic_io_capability # Respond - if io_capability == hci.HCI_KEYBOARD_ONLY_IO_CAPABILITY: + if io_capability == hci.IoCapability.KEYBOARD_ONLY: # Ask the user to enter a string async def get_pin_code(): pin_code = await connection.cancel_on_disconnection( @@ -5979,7 +5979,7 @@ class Device(utils.CompositeEventEmitter): @host_event_handler @with_connection_from_handle def on_connection_encryption_change( - self, connection, encryption, encryption_key_size + self, connection: Connection, encryption: int, encryption_key_size: int ): logger.debug( f'*** Connection Encryption Change: [0x{connection.handle:04X}] ' @@ -5992,14 +5992,14 @@ class Device(utils.CompositeEventEmitter): if ( not connection.authenticated and connection.transport == PhysicalTransport.BR_EDR - and encryption == hci.HCI_Encryption_Change_Event.AES_CCM + and encryption == hci.HCI_Encryption_Change_Event.Enabled.AES_CCM ): connection.authenticated = True connection.sc = True if ( not connection.authenticated and connection.transport == PhysicalTransport.LE - and encryption == hci.HCI_Encryption_Change_Event.E0_OR_AES_CCM + and encryption == hci.HCI_Encryption_Change_Event.Enabled.E0_OR_AES_CCM ): connection.authenticated = True connection.sc = True diff --git a/bumble/hci.py b/bumble/hci.py index 7f0afe2..a954a04 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -880,57 +880,32 @@ HCI_R1_PAGE_SCAN_REPETITION_MODE = 0x01 HCI_R2_PAGE_SCAN_REPETITION_MODE = 0x02 # IO Capability -HCI_DISPLAY_ONLY_IO_CAPABILITY = 0x00 -HCI_DISPLAY_YES_NO_IO_CAPABILITY = 0x01 -HCI_KEYBOARD_ONLY_IO_CAPABILITY = 0x02 -HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY = 0x03 - -HCI_IO_CAPABILITY_NAMES = { - HCI_DISPLAY_ONLY_IO_CAPABILITY: 'HCI_DISPLAY_ONLY_IO_CAPABILITY', - HCI_DISPLAY_YES_NO_IO_CAPABILITY: 'HCI_DISPLAY_YES_NO_IO_CAPABILITY', - HCI_KEYBOARD_ONLY_IO_CAPABILITY: 'HCI_KEYBOARD_ONLY_IO_CAPABILITY', - HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: 'HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY' -} +class IoCapability(SpecableEnum): + DISPLAY_ONLY = 0x00 + DISPLAY_YES_NO = 0x01 + KEYBOARD_ONLY = 0x02 + NO_INPUT_NO_OUTPUT = 0x03 # Authentication Requirements -HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS = 0x00 -HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS = 0x01 -HCI_MITM_NOT_REQUIRED_DEDICATED_BONDING_AUTHENTICATION_REQUIREMENTS = 0x02 -HCI_MITM_REQUIRED_DEDICATED_BONDING_AUTHENTICATION_REQUIREMENTS = 0x03 -HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS = 0x04 -HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS = 0x05 - -HCI_AUTHENTICATION_REQUIREMENTS_NAMES = { - HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS', - HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS', - HCI_MITM_NOT_REQUIRED_DEDICATED_BONDING_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_NOT_REQUIRED_DEDICATED_BONDING_AUTHENTICATION_REQUIREMENTS', - HCI_MITM_REQUIRED_DEDICATED_BONDING_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_REQUIRED_DEDICATED_BONDING_AUTHENTICATION_REQUIREMENTS', - HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS', - HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS' -} +class AuthenticationRequirements(SpecableEnum): + MITM_NOT_REQUIRED_NO_BONDING = 0x00 + MITM_REQUIRED_NO_BONDING = 0x01 + MITM_NOT_REQUIRED_DEDICATED_BONDING = 0x02 + MITM_REQUIRED_DEDICATED_BONDING = 0x03 + MITM_NOT_REQUIRED_GENERAL_BONDING = 0x04 + MITM_REQUIRED_GENERAL_BONDING = 0x05 # Link Key Types -HCI_COMBINATION_KEY_TYPE = 0X00 -HCI_LOCAL_UNIT_KEY_TYPE = 0X01 -HCI_REMOTE_UNIT_KEY_TYPE = 0X02 -HCI_DEBUG_COMBINATION_KEY_TYPE = 0X03 -HCI_UNAUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE = 0X04 -HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE = 0X05 -HCI_CHANGED_COMBINATION_KEY_TYPE = 0X06 -HCI_UNAUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE = 0X07 -HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE = 0X08 - -HCI_LINK_TYPE_NAMES = { - HCI_COMBINATION_KEY_TYPE: 'HCI_COMBINATION_KEY_TYPE', - HCI_LOCAL_UNIT_KEY_TYPE: 'HCI_LOCAL_UNIT_KEY_TYPE', - HCI_REMOTE_UNIT_KEY_TYPE: 'HCI_REMOTE_UNIT_KEY_TYPE', - HCI_DEBUG_COMBINATION_KEY_TYPE: 'HCI_DEBUG_COMBINATION_KEY_TYPE', - HCI_UNAUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE: 'HCI_UNAUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE', - HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE: 'HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE', - HCI_CHANGED_COMBINATION_KEY_TYPE: 'HCI_CHANGED_COMBINATION_KEY_TYPE', - HCI_UNAUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE: 'HCI_UNAUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE', - HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE: 'HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE' -} +class LinkKeyType(SpecableEnum): + COMBINATION_KEY = 0X00 + LOCAL_UNIT_KEY = 0X01 + REMOTE_UNIT_KEY = 0X02 + DEBUG_COMBINATION_KEY = 0X03 + UNAUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192 = 0X04 + AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192 = 0X05 + CHANGED_COMBINATION_KEY = 0X06 + UNAUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256 = 0X07 + AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256 = 0X08 # Address types class AddressType(SpecableEnum): @@ -1578,6 +1553,7 @@ CS_SYNC_PHY_SPEC = {'size': 1, 'mapper': lambda x: CsSyncPhy(x).name} CS_SYNC_PHY_SUPPORTED_SPEC = {'size': 1, 'mapper': lambda x: CsSyncPhySupported(x).name} RTT_TYPE_SPEC = {'size': 1, 'mapper': lambda x: RttType(x).name} CS_SNR_SPEC = {'size': 1, 'mapper': lambda x: CsSnr(x).name} +COD_SPEC = {'size': 3, 'mapper': map_class_of_device} class CodecID(SpecableEnum): @@ -1643,18 +1619,16 @@ class HCI_Constant: return HCI_INQUIRY_LAP_NAMES.get(lap, f'0x{lap:06X}') @staticmethod - def io_capability_name(io_capability): - return HCI_IO_CAPABILITY_NAMES.get(io_capability, f'0x{io_capability:02X}') + def io_capability_name(io_capability: int) -> str: + return IoCapability(io_capability).name @staticmethod - def authentication_requirements_name(authentication_requirements): - return HCI_AUTHENTICATION_REQUIREMENTS_NAMES.get( - authentication_requirements, f'0x{authentication_requirements:02X}' - ) + def authentication_requirements_name(authentication_requirements: int) -> str: + return AuthenticationRequirements(authentication_requirements).name @staticmethod - def link_key_type_name(link_key_type): - return HCI_LINK_TYPE_NAMES.get(link_key_type, f'0x{link_key_type:02X}') + def link_key_type_name(link_key_type: int) -> str: + return LinkKeyType(link_key_type).name # ----------------------------------------------------------------------------- @@ -2666,11 +2640,11 @@ class HCI_Reject_Synchronous_Connection_Request_Command(HCI_Command): @HCI_Command.command( fields=[ ('bd_addr', Address.parse_address), - ('io_capability', {'size': 1, 'mapper': HCI_Constant.io_capability_name}), + ('io_capability', IoCapability.type_spec(1)), ('oob_data_present', 1), ( 'authentication_requirements', - {'size': 1, 'mapper': HCI_Constant.authentication_requirements_name}, + AuthenticationRequirements.type_spec(1), ), ], ) @@ -3267,12 +3241,12 @@ class HCI_Read_Class_Of_Device_Command(HCI_Command): return_parameters_fields = [ ('status', STATUS_SPEC), - ('class_of_device', {'size': 3, 'mapper': map_class_of_device}), + ('class_of_device', COD_SPEC), ] # ----------------------------------------------------------------------------- -@HCI_Command.command([('class_of_device', {'size': 3, 'mapper': map_class_of_device})]) +@HCI_Command.command([('class_of_device', COD_SPEC)]) class HCI_Write_Class_Of_Device_Command(HCI_Command): ''' See Bluetooth spec @ 7.3.26 Write Class of Device Command @@ -4232,8 +4206,8 @@ class HCI_LE_Read_PHY_Command(HCI_Command): return_parameters_fields = [ ('status', STATUS_SPEC), ('connection_handle', 2), - ('tx_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}), - ('rx_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}), + ('tx_phy', Phy.type_spec(1)), + ('rx_phy', Phy.type_spec(1)), ] @@ -4367,9 +4341,9 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command): ('peer_address', Address.parse_address_preceded_by_type), ('advertising_filter_policy', 1), ('advertising_tx_power', 1), - ('primary_advertising_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}), + ('primary_advertising_phy', Phy.type_spec(1)), ('secondary_advertising_max_skip', 1), - ('secondary_advertising_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}), + ('secondary_advertising_phy', Phy.type_spec(1)), ('advertising_sid', 1), ('scan_request_notification_enable', 1), ], @@ -5630,39 +5604,25 @@ class HCI_Event(HCI_Packet): vendor_factories: list[Callable[[bytes], Optional[HCI_Event]]] = [] event_code: int = -1 fields: Fields = () - parameters: bytes = b'' + _parameters: bytes = b'' - @staticmethod - def event(fields: Optional[Fields] = ()): + _Event = TypeVar("_Event", bound="HCI_Event") + + @classmethod + def event(cls, subclass: type[_Event]) -> type[_Event]: ''' Decorator used to declare and register subclasses ''' - Event = TypeVar("Event", bound=HCI_Event) + subclass.name = subclass.__name__.upper() + subclass.event_code = key_with_value(subclass.event_names, subclass.name) + subclass.fields = HCI_Object.fields_from_dataclass(subclass) + if subclass.event_code is None: + raise KeyError(f'event {subclass.name} not found in event_names') - def inner(cls: type[Event]) -> type[Event]: - cls.name = cls.__name__.upper() - cls.event_code = key_with_value(cls.event_names, cls.name) - if cls.event_code is None: - raise KeyError(f'event {cls.name} not found in event_names') - if fields is not None: - cls.fields = fields - - # Register a factory for this class - HCI_Event.event_classes[cls.event_code] = cls - - return cls - - return inner - - @classmethod - def dataclass_event(cls, subclass): - # TODO: Move this to __post_init__ when all packets become dataclasses. - subclass.parameters = functools.cached_property( - lambda self: HCI_Object.dict_to_bytes(self.__dict__, self.fields) - ) - subclass.parameters.__set_name__(subclass, 'parameters') - return HCI_Event.event(HCI_Object.fields_from_dataclass(subclass))(subclass) + # Register a factory for this class + cls.event_classes[subclass.event_code] = subclass + return subclass @staticmethod def event_map(symbols: dict[str, Any]) -> dict[int, str]: @@ -5747,9 +5707,9 @@ class HCI_Event(HCI_Packet): @classmethod def from_parameters(cls, parameters: bytes) -> Self: - if dataclasses.is_dataclass(cls): - return cls(**HCI_Object.dict_from_bytes(parameters, 0, cls.fields)) - return cls(parameters, **HCI_Object.dict_from_bytes(parameters, 0, cls.fields)) + event = cls(**HCI_Object.dict_from_bytes(parameters, 0, cls.fields)) + event.parameters = parameters + return event def __init__( self, @@ -5767,8 +5727,18 @@ class HCI_Event(HCI_Packet): parameters = HCI_Object.dict_to_bytes(kwargs, self.fields) self.parameters = parameters or b'' + @property + def parameters(self) -> bytes: + if not self._parameters: + self._parameters = HCI_Object.dict_to_bytes(self.__dict__, self.fields) + return self._parameters + + @parameters.setter + def parameters(self, parameters: bytes): + self._parameters = parameters + def __bytes__(self) -> bytes: - parameters = b'' if self.parameters is None else self.parameters + parameters = self.parameters return bytes([HCI_EVENT_PACKET, self.event_code, len(parameters)]) + parameters def __str__(self): @@ -6152,12 +6122,8 @@ class HCI_LE_Extended_Advertising_Report_Event(HCI_LE_Meta_Event): address: Address = field( metadata=metadata(Address.parse_address_preceded_by_type) ) - primary_phy: int = field( - metadata=metadata({'size': 1, 'mapper': HCI_Constant.le_phy_name}) - ) - secondary_phy: int = field( - metadata=metadata({'size': 1, 'mapper': HCI_Constant.le_phy_name}) - ) + primary_phy: int = field(metadata=metadata(Phy.type_spec(1))) + secondary_phy: int = field(metadata=metadata(Phy.type_spec(1))) advertising_sid: int = field(metadata=metadata(1)) tx_power: int = field(metadata=metadata(1)) rssi: int = field(metadata=metadata(-1)) @@ -6663,7 +6629,7 @@ class HCI_LE_CS_Test_End_Complete_Event(HCI_LE_Meta_Event): # ----------------------------------------------------------------------------- -@HCI_Event.dataclass_event +@HCI_Event.event @dataclasses.dataclass class HCI_Inquiry_Complete_Event(HCI_Event): ''' @@ -6674,231 +6640,163 @@ class HCI_Inquiry_Complete_Event(HCI_Event): # ----------------------------------------------------------------------------- -@HCI_Event.event( - [ - [ - ('bd_addr', Address.parse_address), - ('page_scan_repetition_mode', 1), - ('reserved', 1), - ('reserved', 1), - ('class_of_device', {'size': 3, 'mapper': map_class_of_device}), - ('clock_offset', 2), - ] - ] -) +@HCI_Event.event +@dataclasses.dataclass class HCI_Inquiry_Result_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.2 Inquiry Result Event ''' - bd_addr: list[Address] - page_scan_repetition_mode: list[int] - class_of_device: list[int] - clock_offset: list[int] + bd_addr: Sequence[Address] = field( + metadata=metadata(Address.parse_address, list_begin=True) + ) + page_scan_repetition_mode: Sequence[int] = field(metadata=metadata(1)) + reserved_0: Sequence[int] = field(metadata=metadata(1)) + reserved_1: Sequence[int] = field(metadata=metadata(1)) + class_of_device: Sequence[int] = field(metadata=metadata(COD_SPEC)) + clock_offset: Sequence[int] = field(metadata=metadata(2, list_end=True)) # ----------------------------------------------------------------------------- -@HCI_Event.event( - [ - ('status', STATUS_SPEC), - ('connection_handle', 2), - ('bd_addr', Address.parse_address), - ( - 'link_type', - { - 'size': 1, - # pylint: disable-next=unnecessary-lambda - 'mapper': lambda x: HCI_Connection_Complete_Event.link_type_name(x), - }, - ), - ('encryption_enabled', 1), - ] -) +@HCI_Event.event +@dataclasses.dataclass class HCI_Connection_Complete_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.3 Connection Complete Event ''' - SCO_LINK_TYPE = 0x00 - ACL_LINK_TYPE = 0x01 - ESCO_LINK_TYPE = 0x02 + class LinkType(SpecableEnum): + SCO = 0x00 + ACL = 0x01 + ESCO = 0x02 - LINK_TYPE_NAMES = { - SCO_LINK_TYPE: 'SCO', - ACL_LINK_TYPE: 'ACL', - ESCO_LINK_TYPE: 'eSCO', - } - - @staticmethod - def link_type_name(link_type): - return name_or_number(HCI_Connection_Complete_Event.LINK_TYPE_NAMES, link_type) + status: int = field(metadata=metadata(STATUS_SPEC)) + connection_handle: int = field(metadata=metadata(2)) + bd_addr: Address = field(metadata=metadata(Address.parse_address)) + link_type: int = field(metadata=LinkType.type_metadata(1)) + encryption_enabled: int = field(metadata=metadata(1)) # ----------------------------------------------------------------------------- -@HCI_Event.event( - [ - ('bd_addr', Address.parse_address), - ('class_of_device', 3), - ( - 'link_type', - { - 'size': 1, - # pylint: disable-next=unnecessary-lambda - 'mapper': lambda x: HCI_Connection_Complete_Event.link_type_name(x), - }, - ), - ] -) +@HCI_Event.event +@dataclasses.dataclass class HCI_Connection_Request_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.4 Connection Request Event ''' + bd_addr: Address = field(metadata=metadata(Address.parse_address)) + class_of_device: int = field(metadata=metadata(3)) + link_type: int = field( + metadata=HCI_Connection_Complete_Event.LinkType.type_metadata(1) + ) + # ----------------------------------------------------------------------------- -@HCI_Event.event( - [ - ('status', STATUS_SPEC), - ('connection_handle', 2), - ('reason', {'size': 1, 'mapper': HCI_Constant.error_name}), - ] -) +@HCI_Event.event +@dataclasses.dataclass class HCI_Disconnection_Complete_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.5 Disconnection Complete Event ''' - status: int - connection_handle: int - reason: int + status: int = field(metadata=metadata(STATUS_SPEC)) + connection_handle: int = field(metadata=metadata(2)) + reason: int = field(metadata=metadata(STATUS_SPEC)) # ----------------------------------------------------------------------------- -@HCI_Event.event([('status', STATUS_SPEC), ('connection_handle', 2)]) +@HCI_Event.event +@dataclasses.dataclass class HCI_Authentication_Complete_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.6 Authentication Complete Event ''' + status: int = field(metadata=metadata(STATUS_SPEC)) + connection_handle: int = field(metadata=metadata(2)) + # ----------------------------------------------------------------------------- -@HCI_Event.event( - [ - ('status', STATUS_SPEC), - ('bd_addr', Address.parse_address), - ('remote_name', {'size': 248, 'mapper': map_null_terminated_utf8_string}), - ] -) +@HCI_Event.event +@dataclasses.dataclass class HCI_Remote_Name_Request_Complete_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.7 Remote Name Request Complete Event ''' + status: int = field(metadata=metadata(STATUS_SPEC)) + bd_addr: Address = field(metadata=metadata(Address.parse_address)) + remote_name: int = field( + metadata=metadata({'size': 248, 'mapper': map_null_terminated_utf8_string}) + ) + # ----------------------------------------------------------------------------- -@HCI_Event.event( - [ - ('status', STATUS_SPEC), - ('connection_handle', 2), - ( - 'encryption_enabled', - { - 'size': 1, - # pylint: disable-next=unnecessary-lambda - 'mapper': lambda x: HCI_Encryption_Change_Event.encryption_enabled_name( - x - ), - }, - ), - ] -) +@HCI_Event.event +@dataclasses.dataclass class HCI_Encryption_Change_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.8 Encryption Change Event ''' - OFF = 0x00 - E0_OR_AES_CCM = 0x01 - AES_CCM = 0x02 + class Enabled(SpecableEnum): + OFF = 0x00 + E0_OR_AES_CCM = 0x01 + AES_CCM = 0x02 - ENCRYPTION_ENABLED_NAMES = { - OFF: 'OFF', - E0_OR_AES_CCM: 'E0_OR_AES_CCM', - AES_CCM: 'AES_CCM', - } - - @staticmethod - def encryption_enabled_name(encryption_enabled): - return name_or_number( - HCI_Encryption_Change_Event.ENCRYPTION_ENABLED_NAMES, encryption_enabled - ) + status: int = field(metadata=metadata(STATUS_SPEC)) + connection_handle: int = field(metadata=metadata(2)) + encryption_enabled: int = field(metadata=Enabled.type_metadata(1)) # ----------------------------------------------------------------------------- -@HCI_Event.event( - [ - ('status', STATUS_SPEC), - ('connection_handle', 2), - ( - 'encryption_enabled', - { - 'size': 1, - # pylint: disable-next=unnecessary-lambda - 'mapper': lambda x: HCI_Encryption_Change_Event.encryption_enabled_name( - x - ), - }, - ), - ('encryption_key_size', 1), - ] -) +@HCI_Event.event +@dataclasses.dataclass class HCI_Encryption_Change_V2_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.8 Encryption Change Event ''' + status: int = field(metadata=metadata(STATUS_SPEC)) + connection_handle: int = field(metadata=metadata(2)) + encryption_enabled: int = field( + metadata=HCI_Encryption_Change_Event.Enabled.type_metadata(1) + ) + encryption_key_size: int = field(metadata=metadata(1)) + # ----------------------------------------------------------------------------- -@HCI_Event.event( - [('status', STATUS_SPEC), ('connection_handle', 2), ('lmp_features', 8)] -) +@HCI_Event.event +@dataclasses.dataclass class HCI_Read_Remote_Supported_Features_Complete_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.11 Read Remote Supported Features Complete Event ''' + status: int = field(metadata=metadata(STATUS_SPEC)) + connection_handle: int = field(metadata=metadata(2)) + lmp_features: bytes = field(metadata=metadata(8)) + # ----------------------------------------------------------------------------- -@HCI_Event.event( - [ - ('status', STATUS_SPEC), - ('connection_handle', 2), - ('version', 1), - ('manufacturer_name', 2), - ('subversion', 2), - ] -) +@HCI_Event.event +@dataclasses.dataclass class HCI_Read_Remote_Version_Information_Complete_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.12 Read Remote Version Information Complete Event ''' + status: int = field(metadata=metadata(STATUS_SPEC)) + connection_handle: int = field(metadata=metadata(2)) + version: int = field(metadata=metadata(1)) + manufacturer_name: int = field(metadata=metadata(2)) + subversion: int = field(metadata=metadata(2)) + # ----------------------------------------------------------------------------- -@HCI_Event.event( - [ - ('status', STATUS_SPEC), - ('connection_handle', 2), - ('unused', 1), - ( - 'service_type', - { - 'size': 1, - 'mapper': lambda x: HCI_QOS_Setup_Complete_Event.ServiceType(x).name, - }, - ), - ] -) +@HCI_Event.event +@dataclasses.dataclass class HCI_QOS_Setup_Complete_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.13 QoS Setup Complete Event @@ -6909,23 +6807,25 @@ class HCI_QOS_Setup_Complete_Event(HCI_Event): BEST_EFFORT_AVAILABLE = 0x01 GUARANTEED_AVAILABLE = 0x02 + status: int = field(metadata=metadata(STATUS_SPEC)) + connection_handle: int = field(metadata=metadata(2)) + unused: int = field(metadata=metadata(1)) + service_type: int = field(metadata=ServiceType.type_metadata(1)) + # ----------------------------------------------------------------------------- -@HCI_Event.event( - [ - ('num_hci_command_packets', 1), - ('command_opcode', {'size': 2, 'mapper': HCI_Command.command_name}), - ('return_parameters', '*'), - ] -) +@HCI_Event.event +@dataclasses.dataclass class HCI_Command_Complete_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.14 Command Complete Event ''' - num_hci_command_packets: int - command_opcode: int - return_parameters = b'' + num_hci_command_packets: int = field(metadata=metadata(1)) + command_opcode: int = field( + metadata=metadata({'size': 2, 'mapper': HCI_Command.command_name}) + ) + return_parameters: Union[bytes, HCI_Object, int] = field(metadata=metadata("*")) def map_return_parameters(self, return_parameters): '''Map simple 'status' return parameters to their named constant form''' @@ -6940,13 +6840,10 @@ class HCI_Command_Complete_Event(HCI_Event): return return_parameters - @staticmethod - def from_parameters(parameters): - event = HCI_Command_Complete_Event(parameters) - HCI_Object.init_from_bytes( - event, parameters, 0, HCI_Command_Complete_Event.fields - ) - + @classmethod + def from_parameters(cls, parameters: bytes) -> Self: + event = cls(**HCI_Object.dict_from_bytes(parameters, 0, cls.fields)) + event.parameters = parameters # Parse the return parameters if ( isinstance(event.return_parameters, bytes) @@ -6956,10 +6853,12 @@ class HCI_Command_Complete_Event(HCI_Event): # convert it to an integer event.return_parameters = event.return_parameters[0] else: - cls = HCI_Command.command_classes.get(event.command_opcode) - if cls: + subclass = HCI_Command.command_classes.get(event.command_opcode) + if subclass: # Try to parse the return parameters bytes into an object. - return_parameters = cls.parse_return_parameters(event.return_parameters) + return_parameters = subclass.parse_return_parameters( + event.return_parameters + ) if return_parameters is not None: event.return_parameters = return_parameters @@ -6975,17 +6874,8 @@ class HCI_Command_Complete_Event(HCI_Event): # ----------------------------------------------------------------------------- -@HCI_Event.event( - [ - ( - 'status', - # pylint: disable-next=unnecessary-lambda - {'size': 1, 'mapper': lambda x: HCI_Command_Status_Event.status_name(x)}, - ), - ('num_hci_command_packets', 1), - ('command_opcode', {'size': 2, 'mapper': HCI_Command.command_name}), - ] -) +@HCI_Event.event +@dataclasses.dataclass class HCI_Command_Status_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.15 Command Complete Event @@ -7000,416 +6890,412 @@ class HCI_Command_Status_Event(HCI_Event): return HCI_Constant.error_name(status) + status: int = field( + metadata=metadata( + {'size': 1, 'mapper': lambda x: HCI_Command_Status_Event.status_name(x)} + ) + ) + num_hci_command_packets: int = field(metadata=metadata(1)) + command_opcode: int = field( + metadata=metadata({'size': 2, 'mapper': HCI_Command.command_name}) + ) + # ----------------------------------------------------------------------------- -@HCI_Event.event( - [ - ('status', STATUS_SPEC), - ('bd_addr', Address.parse_address), - ('new_role', {'size': 1, 'mapper': HCI_Constant.role_name}), - ] -) +@HCI_Event.event +@dataclasses.dataclass class HCI_Role_Change_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.18 Role Change Event ''' + status: int = field(metadata=metadata(STATUS_SPEC)) + bd_addr: Address = field(metadata=metadata(Address.parse_address)) + new_role: int = field(metadata=Role.type_metadata(1)) + # ----------------------------------------------------------------------------- -@HCI_Event.event( - [ - [ - ('connection_handles', 2), - ('num_completed_packets', 2), - ] - ] -) +@HCI_Event.event +@dataclasses.dataclass class HCI_Number_Of_Completed_Packets_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.19 Number Of Completed Packets Event ''' - connection_handles: list[int] - num_completed_packets: list[int] + connection_handles: Sequence[int] = field(metadata=metadata(2, list_begin=True)) + num_completed_packets: Sequence[int] = field(metadata=metadata(2, list_end=True)) # ----------------------------------------------------------------------------- -@HCI_Event.event( - [ - ('status', STATUS_SPEC), - ('connection_handle', 2), - ( - 'current_mode', - # pylint: disable-next=unnecessary-lambda - {'size': 1, 'mapper': lambda x: HCI_Mode_Change_Event.mode_name(x)}, - ), - ('interval', 2), - ] -) +@HCI_Event.event +@dataclasses.dataclass class HCI_Mode_Change_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.20 Mode Change Event ''' - ACTIVE_MODE = 0x00 - HOLD_MODE = 0x01 - SNIFF_MODE = 0x02 + class Mode(SpecableEnum): + ACTIVE = 0x00 + HOLD = 0x01 + SNIFF = 0x02 - MODE_NAMES = { - ACTIVE_MODE: 'ACTIVE_MODE', - HOLD_MODE: 'HOLD_MODE', - SNIFF_MODE: 'SNIFF_MODE', - } - - @staticmethod - def mode_name(mode): - return name_or_number(HCI_Mode_Change_Event.MODE_NAMES, mode) + status: int = field(metadata=metadata(STATUS_SPEC)) + connection_handle: int = field(metadata=metadata(2)) + current_mode: int = field(metadata=Mode.type_metadata(1)) + interval: int = field(metadata=metadata(2)) # ----------------------------------------------------------------------------- -@HCI_Event.event([('bd_addr', Address.parse_address)]) +@HCI_Event.event +@dataclasses.dataclass class HCI_PIN_Code_Request_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.22 PIN Code Request Event ''' + bd_addr: Address = field(metadata=metadata(Address.parse_address)) + # ----------------------------------------------------------------------------- -@HCI_Event.event([('bd_addr', Address.parse_address)]) +@HCI_Event.event +@dataclasses.dataclass class HCI_Link_Key_Request_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.24 7.7.23 Link Key Request Event ''' + bd_addr: Address = field(metadata=metadata(Address.parse_address)) + # ----------------------------------------------------------------------------- -@HCI_Event.event( - [ - ('bd_addr', Address.parse_address), - ('link_key', 16), - ('key_type', {'size': 1, 'mapper': HCI_Constant.link_key_type_name}), - ] -) +@HCI_Event.event +@dataclasses.dataclass class HCI_Link_Key_Notification_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.24 Link Key Notification Event ''' + bd_addr: Address = field(metadata=metadata(Address.parse_address)) + link_key: bytes = field(metadata=metadata(16)) + key_type: int = field(metadata=LinkKeyType.type_metadata(1)) + # ----------------------------------------------------------------------------- -@HCI_Event.event([('connection_handle', 2), ('lmp_max_slots', 1)]) +@HCI_Event.event +@dataclasses.dataclass class HCI_Max_Slots_Change_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.27 Max Slots Change Event ''' + connection_handle: int = field(metadata=metadata(2)) + lmp_max_slots: int = field(metadata=metadata(1)) + # ----------------------------------------------------------------------------- -@HCI_Event.event( - [('status', STATUS_SPEC), ('connection_handle', 2), ('clock_offset', 2)] -) +@HCI_Event.event +@dataclasses.dataclass class HCI_Read_Clock_Offset_Complete_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.28 Read Clock Offset Complete Event ''' + status: int = field(metadata=metadata(STATUS_SPEC)) + connection_handle: int = field(metadata=metadata(2)) + clock_offset: int = field(metadata=metadata(2)) + # ----------------------------------------------------------------------------- -@HCI_Event.event( - [('status', STATUS_SPEC), ('connection_handle', 2), ('packet_type', 2)] -) +@HCI_Event.event +@dataclasses.dataclass class HCI_Connection_Packet_Type_Changed_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.29 Connection Packet Type Changed Event ''' + status: int = field(metadata=metadata(STATUS_SPEC)) + connection_handle: int = field(metadata=metadata(2)) + packet_type: int = field(metadata=metadata(2)) + # ----------------------------------------------------------------------------- -@HCI_Event.event([('bd_addr', Address.parse_address), ('page_scan_repetition_mode', 1)]) +@HCI_Event.event +@dataclasses.dataclass class HCI_Page_Scan_Repetition_Mode_Change_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.31 Page Scan Repetition Mode Change Event ''' + bd_addr: Address = field(metadata=metadata(Address.parse_address)) + page_scan_repetition_mode: int = field(metadata=metadata(1)) + # ----------------------------------------------------------------------------- -@HCI_Event.event( - [ - [ - ('bd_addr', Address.parse_address), - ('page_scan_repetition_mode', 1), - ('reserved', 1), - ('class_of_device', {'size': 3, 'mapper': map_class_of_device}), - ('clock_offset', 2), - ('rssi', -1), - ] - ] -) +@HCI_Event.event +@dataclasses.dataclass class HCI_Inquiry_Result_With_RSSI_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.33 Inquiry Result with RSSI Event ''' - bd_addr: list[Address] - page_scan_repetition_mode: list[int] - class_of_device: list[int] - clock_offset: list[int] - rssi: list[int] + bd_addr: Sequence[Address] = field( + metadata=metadata(Address.parse_address, list_begin=True) + ) + page_scan_repetition_mode: Sequence[int] = field(metadata=metadata(1)) + reserved: Sequence[int] = field(metadata=metadata(1)) + class_of_device: Sequence[int] = field(metadata=metadata(COD_SPEC)) + clock_offset: Sequence[int] = field(metadata=metadata(2)) + rssi: Sequence[int] = field(metadata=metadata(-1, list_end=True)) # ----------------------------------------------------------------------------- -@HCI_Event.event( - [ - ('status', STATUS_SPEC), - ('connection_handle', 2), - ('page_number', 1), - ('maximum_page_number', 1), - ('extended_lmp_features', 8), - ] -) +@HCI_Event.event +@dataclasses.dataclass class HCI_Read_Remote_Extended_Features_Complete_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.34 Read Remote Extended Features Complete Event ''' + status: int = field(metadata=metadata(STATUS_SPEC)) + connection_handle: int = field(metadata=metadata(2)) + page_number: int = field(metadata=metadata(1)) + maximum_page_number: int = field(metadata=metadata(1)) + extended_lmp_features: bytes = field(metadata=metadata(8)) + # ----------------------------------------------------------------------------- -@HCI_Event.event( - # pylint: disable=line-too-long - [ - ('status', STATUS_SPEC), - ('connection_handle', 2), - ('bd_addr', Address.parse_address), - ( - 'link_type', - { - 'size': 1, - # pylint: disable-next=unnecessary-lambda - 'mapper': lambda x: HCI_Synchronous_Connection_Complete_Event.link_type_name( - x - ), - }, - ), - ('transmission_interval', 1), - ('retransmission_window', 1), - ('rx_packet_length', 2), - ('tx_packet_length', 2), - ( - 'air_mode', - { - 'size': 1, - # pylint: disable-next=unnecessary-lambda - 'mapper': lambda x: HCI_Synchronous_Connection_Complete_Event.air_mode_name( - x - ), - }, - ), - ] -) +@HCI_Event.event +@dataclasses.dataclass class HCI_Synchronous_Connection_Complete_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.35 Synchronous Connection Complete Event ''' - SCO_CONNECTION_LINK_TYPE = 0x00 - ESCO_CONNECTION_LINK_TYPE = 0x02 + class LinkType(SpecableEnum): + SCO = 0x00 + ESCO = 0x02 - LINK_TYPE_NAMES = { - SCO_CONNECTION_LINK_TYPE: 'SCO', - ESCO_CONNECTION_LINK_TYPE: 'eSCO', - } + class AirMode(SpecableEnum): + U_LAW_LOG = 0x00 + A_LAW_LOG_AIR_MORE = 0x01 + CVSD = 0x02 + TRANSPARENT_DATA = 0x03 - U_LAW_LOG_AIR_MODE = 0x00 - A_LAW_LOG_AIR_MORE = 0x01 - CVSD_AIR_MODE = 0x02 - TRANSPARENT_DATA_AIR_MODE = 0x03 - - AIR_MODE_NAMES = { - U_LAW_LOG_AIR_MODE: 'u-law log', - A_LAW_LOG_AIR_MORE: 'A-law log', - CVSD_AIR_MODE: 'CVSD', - TRANSPARENT_DATA_AIR_MODE: 'Transparent Data', - } - - @staticmethod - def link_type_name(link_type): - return name_or_number( - HCI_Synchronous_Connection_Complete_Event.LINK_TYPE_NAMES, link_type - ) - - @staticmethod - def air_mode_name(air_mode): - return name_or_number( - HCI_Synchronous_Connection_Complete_Event.AIR_MODE_NAMES, air_mode - ) + status: int = field(metadata=metadata(STATUS_SPEC)) + connection_handle: int = field(metadata=metadata(2)) + bd_addr: Address = field(metadata=metadata(Address.parse_address)) + link_type: int = field(metadata=LinkType.type_metadata(1)) + transmission_interval: int = field(metadata=metadata(1)) + retransmission_window: int = field(metadata=metadata(1)) + rx_packet_length: int = field(metadata=metadata(2)) + tx_packet_length: int = field(metadata=metadata(2)) + air_mode: int = field(metadata=AirMode.type_metadata(1)) # ----------------------------------------------------------------------------- -@HCI_Event.event( - [ - ('status', STATUS_SPEC), - ('connection_handle', 2), - ('transmission_interval', 1), - ('retransmission_window', 1), - ('rx_packet_length', 2), - ('tx_packet_length', 2), - ] -) +@HCI_Event.event +@dataclasses.dataclass class HCI_Synchronous_Connection_Changed_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.36 Synchronous Connection Changed Event ''' + status: int = field(metadata=metadata(STATUS_SPEC)) + connection_handle: int = field(metadata=metadata(2)) + transmission_interval: int = field(metadata=metadata(1)) + retransmission_window: int = field(metadata=metadata(1)) + rx_packet_length: int = field(metadata=metadata(2)) + tx_packet_length: int = field(metadata=metadata(2)) + # ----------------------------------------------------------------------------- -@HCI_Event.event( - [ - ('status', STATUS_SPEC), - ('connection_handle', 2), - ('max_tx_latency', 2), - ('max_rx_latency', 2), - ('min_remote_timeout', 2), - ('min_local_timeout', 2), - ] -) +@HCI_Event.event +@dataclasses.dataclass class HCI_Sniff_Subrating_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.37 Sniff Subrating Event ''' + status: int = field(metadata=metadata(STATUS_SPEC)) + connection_handle: int = field(metadata=metadata(2)) + max_tx_latency: int = field(metadata=metadata(2)) + max_rx_latency: int = field(metadata=metadata(2)) + min_remote_timeout: int = field(metadata=metadata(2)) + min_local_timeout: int = field(metadata=metadata(2)) + # ----------------------------------------------------------------------------- -@HCI_Event.event( - [ - ('num_responses', 1), - ('bd_addr', Address.parse_address), - ('page_scan_repetition_mode', 1), - ('reserved', 1), - ('class_of_device', {'size': 3, 'mapper': map_class_of_device}), - ('clock_offset', 2), - ('rssi', -1), - ('extended_inquiry_response', 240), - ] -) +@HCI_Event.event +@dataclasses.dataclass class HCI_Extended_Inquiry_Result_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.38 Extended Inquiry Result Event ''' + num_responses: int = field(metadata=metadata(1)) + bd_addr: Address = field(metadata=metadata(Address.parse_address)) + page_scan_repetition_mode: int = field(metadata=metadata(1)) + reserved: int = field(metadata=metadata(1)) + class_of_device: int = field(metadata=metadata(COD_SPEC)) + clock_offset: int = field(metadata=metadata(2)) + rssi: int = field(metadata=metadata(-1)) + extended_inquiry_response: bytes = field(metadata=metadata(240)) + # ----------------------------------------------------------------------------- -@HCI_Event.event([('status', STATUS_SPEC), ('connection_handle', 2)]) +@HCI_Event.event +@dataclasses.dataclass class HCI_Encryption_Key_Refresh_Complete_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.39 Encryption Key Refresh Complete Event ''' + status: int = field(metadata=metadata(STATUS_SPEC)) + connection_handle: int = field(metadata=metadata(2)) + # ----------------------------------------------------------------------------- -@HCI_Event.event([('bd_addr', Address.parse_address)]) +@HCI_Event.event +@dataclasses.dataclass class HCI_IO_Capability_Request_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.40 IO Capability Request Event ''' + bd_addr: Address = field(metadata=metadata(Address.parse_address)) + # ----------------------------------------------------------------------------- -@HCI_Event.event( - [ - ('bd_addr', Address.parse_address), - ('io_capability', {'size': 1, 'mapper': HCI_Constant.io_capability_name}), - ('oob_data_present', 1), - ( - 'authentication_requirements', - {'size': 1, 'mapper': HCI_Constant.authentication_requirements_name}, - ), - ] -) +@HCI_Event.event +@dataclasses.dataclass class HCI_IO_Capability_Response_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.41 IO Capability Response Event ''' + bd_addr: Address = field(metadata=metadata(Address.parse_address)) + io_capability: int = field(metadata=IoCapability.type_metadata(1)) + oob_data_present: int = field(metadata=metadata(1)) + authentication_requirements: int = field( + metadata=AuthenticationRequirements.type_metadata(1) + ) + # ----------------------------------------------------------------------------- -@HCI_Event.event([('bd_addr', Address.parse_address), ('numeric_value', 4)]) +@HCI_Event.event +@dataclasses.dataclass class HCI_User_Confirmation_Request_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.42 User Confirmation Request Event ''' + bd_addr: Address = field(metadata=metadata(Address.parse_address)) + numeric_value: int = field(metadata=metadata(4)) + # ----------------------------------------------------------------------------- -@HCI_Event.event([('bd_addr', Address.parse_address)]) +@HCI_Event.event +@dataclasses.dataclass class HCI_User_Passkey_Request_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.43 User Passkey Request Event ''' + bd_addr: Address = field(metadata=metadata(Address.parse_address)) + # ----------------------------------------------------------------------------- -@HCI_Event.event([('bd_addr', Address.parse_address)]) +@HCI_Event.event +@dataclasses.dataclass class HCI_Remote_OOB_Data_Request_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.44 Remote OOB Data Request Event ''' + bd_addr: Address = field(metadata=metadata(Address.parse_address)) + # ----------------------------------------------------------------------------- -@HCI_Event.event([('status', STATUS_SPEC), ('bd_addr', Address.parse_address)]) +@HCI_Event.event +@dataclasses.dataclass class HCI_Simple_Pairing_Complete_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.45 Simple Pairing Complete Event ''' + status: int = field(metadata=metadata(STATUS_SPEC)) + bd_addr: Address = field(metadata=metadata(Address.parse_address)) + # ----------------------------------------------------------------------------- -@HCI_Event.event([('connection_handle', 2), ('link_supervision_timeout', 2)]) +@HCI_Event.event +@dataclasses.dataclass class HCI_Link_Supervision_Timeout_Changed_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.46 Link Supervision Timeout Changed Event ''' + connection_handle: int = field(metadata=metadata(2)) + link_supervision_timeout: int = field(metadata=metadata(2)) + # ----------------------------------------------------------------------------- -@HCI_Event.event([('handle', 2)]) +@HCI_Event.event +@dataclasses.dataclass class HCI_Enhanced_Flush_Complete_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.47 Enhanced Flush Complete Event ''' + handle: int = field(metadata=metadata(2)) + # ----------------------------------------------------------------------------- -@HCI_Event.event([('bd_addr', Address.parse_address), ('passkey', 4)]) +@HCI_Event.event +@dataclasses.dataclass class HCI_User_Passkey_Notification_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.48 User Passkey Notification Event ''' + bd_addr: Address = field(metadata=metadata(Address.parse_address)) + passkey: int = field(metadata=metadata(4)) + # ----------------------------------------------------------------------------- -@HCI_Event.event([('bd_addr', Address.parse_address), ('notification_type', 1)]) +@HCI_Event.event +@dataclasses.dataclass class HCI_Keypress_Notification_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.49 Keypress Notification Event ''' + bd_addr: Address = field(metadata=metadata(Address.parse_address)) + notification_type: int = field(metadata=metadata(1)) + # ----------------------------------------------------------------------------- -@HCI_Event.event([('bd_addr', Address.parse_address), ('host_supported_features', 8)]) +@HCI_Event.event +@dataclasses.dataclass class HCI_Remote_Host_Supported_Features_Notification_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.50 Remote Host Supported Features Notification Event ''' + bd_addr: Address = field(metadata=metadata(Address.parse_address)) + host_supported_features: bytes = field(metadata=metadata(8)) + # ----------------------------------------------------------------------------- -@HCI_Event.event([('data', "*")]) +@HCI_Event.event +@dataclasses.dataclass class HCI_Vendor_Event(HCI_Event): ''' See Bluetooth spec @ 5.4.4 HCI Event packet ''' + data: bytes = field(metadata=metadata("*")) + # ----------------------------------------------------------------------------- class HCI_AclDataPacket(HCI_Packet): diff --git a/bumble/host.py b/bumble/host.py index a1f9719..400816a 100644 --- a/bumble/host.py +++ b/bumble/host.py @@ -835,8 +835,8 @@ class Host(utils.EventEmitter): def on_packet(self, packet: bytes) -> None: try: hci_packet = hci.HCI_Packet.from_bytes(packet) - except Exception as error: - logger.warning(f'!!! error parsing packet from bytes: {error}') + except Exception: + logger.exception('!!! error parsing packet from bytes') return if self.ready or ( @@ -1392,7 +1392,7 @@ class Host(utils.EventEmitter): event.status, ) - def on_hci_encryption_change_event(self, event): + def on_hci_encryption_change_event(self, event: hci.HCI_Encryption_Change_Event): # Notify the client if event.status == hci.HCI_SUCCESS: self.emit( @@ -1406,7 +1406,9 @@ class Host(utils.EventEmitter): 'connection_encryption_failure', event.connection_handle, event.status ) - def on_hci_encryption_change_v2_event(self, event): + def on_hci_encryption_change_v2_event( + self, event: hci.HCI_Encryption_Change_V2_Event + ): # Notify the client if event.status == hci.HCI_SUCCESS: self.emit( diff --git a/bumble/link.py b/bumble/link.py index a2d19e4..06e291e 100644 --- a/bumble/link.py +++ b/bumble/link.py @@ -274,7 +274,7 @@ class LocalLink: responder_controller.on_classic_connection_request( initiator_controller.public_address, - HCI_Connection_Complete_Event.ACL_LINK_TYPE, + HCI_Connection_Complete_Event.LinkType.ACL, ) def classic_accept_connection( diff --git a/bumble/pairing.py b/bumble/pairing.py index 7d777df..9e6ec09 100644 --- a/bumble/pairing.py +++ b/bumble/pairing.py @@ -21,13 +21,7 @@ from dataclasses import dataclass import secrets from typing import Optional -from bumble.hci import ( - Address, - HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY, - HCI_DISPLAY_ONLY_IO_CAPABILITY, - HCI_DISPLAY_YES_NO_IO_CAPABILITY, - HCI_KEYBOARD_ONLY_IO_CAPABILITY, -) +from bumble import hci from bumble.smp import ( SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY, SMP_KEYBOARD_ONLY_IO_CAPABILITY, @@ -50,7 +44,7 @@ from bumble.core import AdvertisingData, LeRole class OobData: """OOB data that can be sent from one device to another.""" - address: Optional[Address] = None + address: Optional[hci.Address] = None role: Optional[LeRole] = None shared_data: Optional[OobSharedData] = None legacy_context: Optional[OobLegacyContext] = None @@ -62,7 +56,7 @@ class OobData: shared_data_r: Optional[bytes] = None for ad_type, ad_data in ad.ad_structures: if ad_type == AdvertisingData.LE_BLUETOOTH_DEVICE_ADDRESS: - instance.address = Address(ad_data) + instance.address = hci.Address(ad_data) elif ad_type == AdvertisingData.LE_ROLE: instance.role = LeRole(ad_data[0]) elif ad_type == AdvertisingData.LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE: @@ -130,11 +124,11 @@ class PairingDelegate: # Default mapping from abstract to Classic I/O capabilities. # Subclasses may override this if they prefer a different mapping. CLASSIC_IO_CAPABILITIES_MAP = { - NO_OUTPUT_NO_INPUT: HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY, - KEYBOARD_INPUT_ONLY: HCI_KEYBOARD_ONLY_IO_CAPABILITY, - DISPLAY_OUTPUT_ONLY: HCI_DISPLAY_ONLY_IO_CAPABILITY, - DISPLAY_OUTPUT_AND_YES_NO_INPUT: HCI_DISPLAY_YES_NO_IO_CAPABILITY, - DISPLAY_OUTPUT_AND_KEYBOARD_INPUT: HCI_DISPLAY_YES_NO_IO_CAPABILITY, + NO_OUTPUT_NO_INPUT: hci.IoCapability.NO_INPUT_NO_OUTPUT, + KEYBOARD_INPUT_ONLY: hci.IoCapability.KEYBOARD_ONLY, + DISPLAY_OUTPUT_ONLY: hci.IoCapability.DISPLAY_ONLY, + DISPLAY_OUTPUT_AND_YES_NO_INPUT: hci.IoCapability.DISPLAY_YES_NO, + DISPLAY_OUTPUT_AND_KEYBOARD_INPUT: hci.IoCapability.DISPLAY_YES_NO, } io_capability: IoCapability @@ -160,7 +154,7 @@ class PairingDelegate: # pylint: disable=line-too-long return self.CLASSIC_IO_CAPABILITIES_MAP.get( - self.io_capability, HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY + self.io_capability, hci.IoCapability.NO_INPUT_NO_OUTPUT ) @property @@ -237,8 +231,8 @@ class PairingConfig: """Configuration for the Pairing protocol.""" class AddressType(enum.IntEnum): - PUBLIC = Address.PUBLIC_DEVICE_ADDRESS - RANDOM = Address.RANDOM_DEVICE_ADDRESS + PUBLIC = hci.Address.PUBLIC_DEVICE_ADDRESS + RANDOM = hci.Address.RANDOM_DEVICE_ADDRESS @dataclass class OobConfig: diff --git a/bumble/pandora/security.py b/bumble/pandora/security.py index dc81141..e1ae610 100644 --- a/bumble/pandora/security.py +++ b/bumble/pandora/security.py @@ -244,16 +244,16 @@ class SecurityService(SecurityServicer): and connection.authenticated and link_key_type in ( - hci.HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE, - hci.HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE, + hci.LinkKeyType.AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192, + hci.LinkKeyType.AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256, ) ) if level == LEVEL4: return ( - connection.encryption == hci.HCI_Encryption_Change_Event.AES_CCM + connection.encryption == hci.HCI_Encryption_Change_Event.Enabled.AES_CCM and connection.authenticated and link_key_type - == hci.HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE + == hci.LinkKeyType.AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256 ) raise InvalidArgumentError(f"Unexpected level {level}") diff --git a/examples/run_hfp_handsfree.py b/examples/run_hfp_handsfree.py index 331ca7b..16f420a 100644 --- a/examples/run_hfp_handsfree.py +++ b/examples/run_hfp_handsfree.py @@ -46,7 +46,7 @@ def on_dlc(dlc: rfcomm.DLC, configuration: hfp.HfConfiguration): def on_sco_request(connection: Connection, link_type: int, protocol: HfProtocol): if connection == protocol.dlc.multiplexer.l2cap_channel.connection: - if link_type == hci.HCI_Connection_Complete_Event.SCO_LINK_TYPE: + if link_type == hci.HCI_Connection_Complete_Event.LinkType.SCO: esco_parameters = hfp.ESCO_PARAMETERS[ hfp.DefaultCodecParameters.SCO_CVSD_D1 ] diff --git a/tests/device_test.py b/tests/device_test.py index 36e9415..3e390a8 100644 --- a/tests/device_test.py +++ b/tests/device_test.py @@ -125,7 +125,7 @@ async def test_device_connect_parallel(): HCI_Connection_Request_Event( bd_addr=d0.public_address, class_of_device=0, - link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE, + link_type=HCI_Connection_Complete_Event.LinkType.ACL, ) ) @@ -145,7 +145,7 @@ async def test_device_connect_parallel(): HCI_Connection_Request_Event( bd_addr=d0.public_address, class_of_device=0, - link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE, + link_type=HCI_Connection_Complete_Event.LinkType.ACL, ) ) @@ -168,7 +168,7 @@ async def test_device_connect_parallel(): status=HCI_SUCCESS, connection_handle=0x100, bd_addr=d0.public_address, - link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE, + link_type=HCI_Connection_Complete_Event.LinkType.ACL, encryption_enabled=True, ) ) @@ -178,7 +178,7 @@ async def test_device_connect_parallel(): status=HCI_SUCCESS, connection_handle=0x100, bd_addr=d1.public_address, - link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE, + link_type=HCI_Connection_Complete_Event.LinkType.ACL, encryption_enabled=True, ) ) @@ -202,7 +202,7 @@ async def test_device_connect_parallel(): status=HCI_SUCCESS, connection_handle=0x101, bd_addr=d0.public_address, - link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE, + link_type=HCI_Connection_Complete_Event.LinkType.ACL, encryption_enabled=True, ) ) @@ -212,7 +212,7 @@ async def test_device_connect_parallel(): status=HCI_SUCCESS, connection_handle=0x101, bd_addr=d2.public_address, - link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE, + link_type=HCI_Connection_Complete_Event.LinkType.ACL, encryption_enabled=True, ) )