diff --git a/bumble/controller.py b/bumble/controller.py index db6334b5..547bceaf 100644 --- a/bumble/controller.py +++ b/bumble/controller.py @@ -28,40 +28,6 @@ from typing import TYPE_CHECKING, Any, Optional, Union from bumble import hci from bumble.colors import color from bumble.core import PhysicalTransport -from bumble.hci import ( - HCI_ACL_DATA_PACKET, - HCI_COMMAND_DISALLOWED_ERROR, - HCI_COMMAND_PACKET, - HCI_COMMAND_STATUS_PENDING, - HCI_CONTROLLER_BUSY_ERROR, - HCI_EVENT_PACKET, - HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR, - HCI_LE_1M_PHY, - HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, - HCI_SUCCESS, - HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR, - HCI_UNKNOWN_HCI_COMMAND_ERROR, - HCI_VERSION_BLUETOOTH_CORE_5_0, - Address, - HCI_AclDataPacket, - HCI_AclDataPacketAssembler, - HCI_Command_Complete_Event, - HCI_Command_Status_Event, - HCI_Connection_Complete_Event, - HCI_Connection_Request_Event, - HCI_Disconnection_Complete_Event, - HCI_Encryption_Change_Event, - HCI_LE_Advertising_Report_Event, - HCI_LE_CIS_Established_Event, - HCI_LE_CIS_Request_Event, - HCI_LE_Connection_Complete_Event, - HCI_LE_Read_Remote_Features_Complete_Event, - HCI_Number_Of_Completed_Packets_Event, - HCI_Packet, - HCI_Role_Change_Event, - HCI_Synchronous_Connection_Complete_Event, - Role, -) if TYPE_CHECKING: from bumble.link import LocalLink @@ -95,19 +61,19 @@ class CisLink: class Connection: controller: Controller handle: int - role: Role - peer_address: Address + role: hci.Role + peer_address: hci.Address link: Any transport: int link_type: int def __post_init__(self) -> None: - self.assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu) + self.assembler = hci.HCI_AclDataPacketAssembler(self.on_acl_pdu) def on_hci_acl_data_packet(self, packet: hci.HCI_AclDataPacket) -> None: self.assembler.feed_packet(packet) self.controller.send_hci_packet( - HCI_Number_Of_Completed_Packets_Event( + hci.HCI_Number_Of_Completed_Packets_Event( connection_handles=[self.handle], num_completed_packets=[1] ) ) @@ -124,18 +90,18 @@ class Controller: hci_sink: Optional[TransportSink] = None central_connections: dict[ - Address, Connection + hci.Address, Connection ] # Connections where this controller is the central peripheral_connections: dict[ - Address, Connection + hci.Address, Connection ] # Connections where this controller is the peripheral - classic_connections: dict[Address, Connection] # Connections in BR/EDR + classic_connections: dict[hci.Address, Connection] # Connections in BR/EDR central_cis_links: dict[int, CisLink] # CIS links by handle peripheral_cis_links: dict[int, CisLink] # CIS links by handle - hci_version: int = HCI_VERSION_BLUETOOTH_CORE_5_0 + hci_version: int = hci.HCI_VERSION_BLUETOOTH_CORE_5_0 hci_revision: int = 0 - lmp_version: int = HCI_VERSION_BLUETOOTH_CORE_5_0 + lmp_version: int = hci.HCI_VERSION_BLUETOOTH_CORE_5_0 lmp_subversion: int = 0 lmp_features: bytes = bytes.fromhex( '0000000060000000' @@ -174,7 +140,7 @@ class Controller: le_scan_interval: int = 0x10 le_scan_window: int = 0x10 le_scan_enable: int = 0 - le_scan_own_address_type: int = Address.RANDOM_DEVICE_ADDRESS + le_scan_own_address_type: int = hci.Address.RANDOM_DEVICE_ADDRESS le_scanning_filter_policy: int = 0 le_scan_response_data: Optional[bytes] = None le_address_resolution: bool = False @@ -186,7 +152,7 @@ class Controller: advertising_timer_handle: Optional[asyncio.Handle] = None classic_scan_enable: int = 0 - _random_address: 'Address' = Address('00:00:00:00:00:00') + _random_address: hci.Address = hci.Address('00:00:00:00:00:00') def __init__( self, @@ -194,7 +160,7 @@ class Controller: host_source=None, host_sink: Optional[TransportSink] = None, link: Optional[LocalLink] = None, - public_address: Optional[Union[bytes, str, Address]] = None, + public_address: Optional[Union[bytes, str, hci.Address]] = None, ) -> None: self.name = name self.link = link @@ -209,14 +175,14 @@ class Controller: 'rx_phys': 0, } - if isinstance(public_address, Address): + if isinstance(public_address, hci.Address): self._public_address = public_address elif public_address is not None: - self._public_address = Address( - public_address, Address.PUBLIC_DEVICE_ADDRESS + self._public_address = hci.Address( + public_address, hci.Address.PUBLIC_DEVICE_ADDRESS ) else: - self._public_address = Address('00:00:00:00:00:00') + self._public_address = hci.Address('00:00:00:00:00:00') # Set the source and sink interfaces if host_source: @@ -250,23 +216,23 @@ class Controller: self.hci_sink = sink @property - def public_address(self) -> Address: + def public_address(self) -> hci.Address: return self._public_address @public_address.setter - def public_address(self, address: Union[Address, str]) -> None: + def public_address(self, address: Union[hci.Address, str]) -> None: if isinstance(address, str): - address = Address(address) + address = hci.Address(address) self._public_address = address @property - def random_address(self) -> Address: + def random_address(self) -> hci.Address: return self._random_address @random_address.setter - def random_address(self, address: Union[Address, str]) -> None: + def random_address(self, address: Union[hci.Address, str]) -> None: if isinstance(address, str): - address = Address(address) + address = hci.Address(address) self._random_address = address logger.debug(f'new random address: {address}') @@ -275,9 +241,9 @@ class Controller: # Packet Sink protocol (packets coming from the host via HCI) def on_packet(self, packet: bytes) -> None: - self.on_hci_packet(HCI_Packet.from_bytes(packet)) + self.on_hci_packet(hci.HCI_Packet.from_bytes(packet)) - def on_hci_packet(self, packet: HCI_Packet) -> None: + def on_hci_packet(self, packet: hci.HCI_Packet) -> None: logger.debug( f'{color("<<<", "blue")} [{self.name}] ' f'{color("HOST -> CONTROLLER", "blue")}: {packet}' @@ -299,14 +265,14 @@ class Controller: result: Optional[bytes] = handler(command) if isinstance(result, bytes): self.send_hci_packet( - HCI_Command_Complete_Event( + hci.HCI_Command_Complete_Event( num_hci_command_packets=1, command_opcode=command.op_code, return_parameters=result, ) ) - def on_hci_event_packet(self, _event: HCI_Packet) -> None: + def on_hci_event_packet(self, _event: hci.HCI_Packet) -> None: logger.warning('!!! unexpected event packet') def on_hci_acl_data_packet(self, packet: hci.HCI_AclDataPacket) -> None: @@ -321,7 +287,7 @@ class Controller: # Pass the packet to the connection connection.on_hci_acl_data_packet(packet) - def send_hci_packet(self, packet: HCI_Packet) -> None: + def send_hci_packet(self, packet: hci.HCI_Packet) -> None: logger.debug( f'{color(">>>", "green")} [{self.name}] ' f'{color("CONTROLLER -> HOST", "green")}: {packet}' @@ -357,13 +323,15 @@ class Controller: handle = max_handle + 1 return handle - def find_le_connection_by_address(self, address: Address) -> Optional[Connection]: + def find_le_connection_by_address( + self, address: hci.Address + ) -> Optional[Connection]: return self.central_connections.get(address) or self.peripheral_connections.get( address ) def find_classic_connection_by_address( - self, address: Address + self, address: hci.Address ) -> Optional[Connection]: return self.classic_connections.get(address) @@ -400,7 +368,7 @@ class Controller: handle ) - def on_link_central_connected(self, central_address: Address) -> None: + def on_link_central_connected(self, central_address: hci.Address) -> None: ''' Called when an incoming connection occurs from a central on the link ''' @@ -414,19 +382,19 @@ class Controller: connection = Connection( controller=self, handle=connection_handle, - role=Role.PERIPHERAL, + role=hci.Role.PERIPHERAL, peer_address=peer_address, link=self.link, transport=PhysicalTransport.LE, - link_type=HCI_Connection_Complete_Event.LinkType.ACL, + link_type=hci.HCI_Connection_Complete_Event.LinkType.ACL, ) self.peripheral_connections[peer_address] = connection logger.debug(f'New PERIPHERAL connection handle: 0x{connection_handle:04X}') # Then say that the connection has completed self.send_hci_packet( - HCI_LE_Connection_Complete_Event( - status=HCI_SUCCESS, + hci.HCI_LE_Connection_Complete_Event( + status=hci.HCI_SUCCESS, connection_handle=connection.handle, role=connection.role, peer_address_type=peer_address_type, @@ -438,7 +406,7 @@ class Controller: ) ) - def on_link_disconnected(self, peer_address: Address, reason: int) -> None: + def on_link_disconnected(self, peer_address: hci.Address, reason: int) -> None: ''' Called when an active disconnection occurs from a peer ''' @@ -446,8 +414,8 @@ class Controller: # Send a disconnection complete event if connection := self.peripheral_connections.get(peer_address): self.send_hci_packet( - HCI_Disconnection_Complete_Event( - status=HCI_SUCCESS, + hci.HCI_Disconnection_Complete_Event( + status=hci.HCI_SUCCESS, connection_handle=connection.handle, reason=reason, ) @@ -457,8 +425,8 @@ class Controller: del self.peripheral_connections[peer_address] elif connection := self.central_connections.get(peer_address): self.send_hci_packet( - HCI_Disconnection_Complete_Event( - status=HCI_SUCCESS, + hci.HCI_Disconnection_Complete_Event( + status=hci.HCI_SUCCESS, connection_handle=connection.handle, reason=reason, ) @@ -478,7 +446,7 @@ class Controller: Called by the link when a connection has been made or has failed to be made ''' - if status == HCI_SUCCESS: + if status == hci.HCI_SUCCESS: # Allocate (or reuse) a connection handle peer_address = le_create_connection_command.peer_address connection = self.central_connections.get(peer_address) @@ -487,11 +455,11 @@ class Controller: connection = Connection( controller=self, handle=connection_handle, - role=Role.CENTRAL, + role=hci.Role.CENTRAL, peer_address=peer_address, link=self.link, transport=PhysicalTransport.LE, - link_type=HCI_Connection_Complete_Event.LinkType.ACL, + link_type=hci.HCI_Connection_Complete_Event.LinkType.ACL, ) self.central_connections[peer_address] = connection logger.debug( @@ -503,10 +471,10 @@ class Controller: # Say that the connection has completed self.send_hci_packet( # pylint: disable=line-too-long - HCI_LE_Connection_Complete_Event( + hci.HCI_LE_Connection_Complete_Event( status=status, connection_handle=connection.handle if connection else 0, - role=Role.CENTRAL, + role=hci.Role.CENTRAL, peer_address_type=le_create_connection_command.peer_address_type, peer_address=le_create_connection_command.peer_address, connection_interval=le_create_connection_command.connection_interval_min, @@ -525,7 +493,7 @@ class Controller: # Send a disconnection complete event self.send_hci_packet( - HCI_Disconnection_Complete_Event( + hci.HCI_Disconnection_Complete_Event( status=status, connection_handle=disconnection_command.connection_handle, reason=disconnection_command.reason, @@ -545,18 +513,18 @@ class Controller: del self.peripheral_connections[connection.peer_address] def on_link_encrypted( - self, peer_address: Address, _rand: int, _ediv: int, _ltk: bytes + self, peer_address: hci.Address, _rand: bytes, _ediv: int, _ltk: bytes ) -> None: # For now, just setup the encryption without asking the host if connection := self.find_le_connection_by_address(peer_address): self.send_hci_packet( - HCI_Encryption_Change_Event( + hci.HCI_Encryption_Change_Event( status=0, connection_handle=connection.handle, encryption_enabled=1 ) ) def on_link_acl_data( - self, sender_address: Address, transport: PhysicalTransport, data: bytes + self, sender_address: hci.Address, transport: PhysicalTransport, data: bytes ) -> None: # Look for the connection to which this data belongs if transport == PhysicalTransport.LE: @@ -569,36 +537,38 @@ class Controller: # Send the data to the host # TODO: should fragment - acl_packet = HCI_AclDataPacket(connection.handle, 2, 0, len(data), data) + acl_packet = hci.HCI_AclDataPacket(connection.handle, 2, 0, len(data), data) self.send_hci_packet(acl_packet) - def on_link_advertising_data(self, sender_address: Address, data: bytes) -> None: + def on_link_advertising_data( + self, sender_address: hci.Address, data: bytes + ) -> None: # Ignore if we're not scanning if self.le_scan_enable == 0: return # Send a scan report - report = HCI_LE_Advertising_Report_Event.Report( - event_type=HCI_LE_Advertising_Report_Event.EventType.ADV_IND, + report = hci.HCI_LE_Advertising_Report_Event.Report( + event_type=hci.HCI_LE_Advertising_Report_Event.EventType.ADV_IND, address_type=sender_address.address_type, address=sender_address, data=data, rssi=-50, ) - self.send_hci_packet(HCI_LE_Advertising_Report_Event([report])) + self.send_hci_packet(hci.HCI_LE_Advertising_Report_Event([report])) # Simulate a scan response - report = HCI_LE_Advertising_Report_Event.Report( - event_type=HCI_LE_Advertising_Report_Event.EventType.SCAN_RSP, + report = hci.HCI_LE_Advertising_Report_Event.Report( + event_type=hci.HCI_LE_Advertising_Report_Event.EventType.SCAN_RSP, address_type=sender_address.address_type, address=sender_address, data=data, rssi=-50, ) - self.send_hci_packet(HCI_LE_Advertising_Report_Event([report])) + self.send_hci_packet(hci.HCI_LE_Advertising_Report_Event([report])) def on_link_cis_request( - self, central_address: Address, cig_id: int, cis_id: int + self, central_address: hci.Address, cig_id: int, cis_id: int ) -> None: ''' Called when an incoming CIS request occurs from a central on the link @@ -616,7 +586,7 @@ class Controller: self.peripheral_cis_links[pending_cis_link.handle] = pending_cis_link self.send_hci_packet( - HCI_LE_CIS_Request_Event( + hci.HCI_LE_CIS_Request_Event( acl_connection_handle=connection.handle, cis_connection_handle=pending_cis_link.handle, cig_id=cig_id, @@ -638,8 +608,8 @@ class Controller: ) self.send_hci_packet( - HCI_LE_CIS_Established_Event( - status=HCI_SUCCESS, + hci.HCI_LE_CIS_Established_Event( + status=hci.HCI_SUCCESS, connection_handle=cis_link.handle, # CIS parameters are ignored. cig_sync_delay=0, @@ -682,16 +652,16 @@ class Controller: ), None, ): - # Keep central CIS on disconnection. They should be removed by HCI_LE_Remove_CIG_Command. + # Keep central CIS on disconnection. They should be removed by hci.HCI_LE_Remove_CIG_Command. cis_link.acl_connection = None else: return self.send_hci_packet( - HCI_Disconnection_Complete_Event( - status=HCI_SUCCESS, + hci.HCI_Disconnection_Complete_Event( + status=hci.HCI_SUCCESS, connection_handle=cis_link.handle, - reason=HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, + reason=hci.HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, ) ) @@ -700,10 +670,10 @@ class Controller: ############################################################ def on_classic_connection_request( - self, peer_address: Address, link_type: int + self, peer_address: hci.Address, link_type: int ) -> None: self.send_hci_packet( - HCI_Connection_Request_Event( + hci.HCI_Connection_Request_Event( bd_addr=peer_address, class_of_device=0, link_type=link_type, @@ -711,9 +681,9 @@ class Controller: ) def on_classic_connection_complete( - self, peer_address: Address, status: int + self, peer_address: hci.Address, status: int ) -> None: - if status == HCI_SUCCESS: + if status == hci.HCI_SUCCESS: # Allocate (or reuse) a connection handle peer_address = peer_address connection = self.classic_connections.get(peer_address) @@ -722,12 +692,12 @@ class Controller: connection = Connection( controller=self, handle=connection_handle, - # Role doesn't matter in Classic because they are managed by HCI_Role_Change and HCI_Role_Discovery - role=Role.CENTRAL, + # hci.Role doesn't matter in Classic because they are managed by hci.HCI_Role_Change and hci.HCI_Role_Discovery + role=hci.Role.CENTRAL, peer_address=peer_address, link=self.link, transport=PhysicalTransport.BR_EDR, - link_type=HCI_Connection_Complete_Event.LinkType.ACL, + link_type=hci.HCI_Connection_Complete_Event.LinkType.ACL, ) self.classic_connections[peer_address] = connection logger.debug( @@ -736,32 +706,32 @@ class Controller: else: connection_handle = connection.handle self.send_hci_packet( - HCI_Connection_Complete_Event( + hci.HCI_Connection_Complete_Event( status=status, connection_handle=connection_handle, bd_addr=peer_address, encryption_enabled=False, - link_type=HCI_Connection_Complete_Event.LinkType.ACL, + link_type=hci.HCI_Connection_Complete_Event.LinkType.ACL, ) ) else: connection = None self.send_hci_packet( - HCI_Connection_Complete_Event( + hci.HCI_Connection_Complete_Event( status=status, connection_handle=0, bd_addr=peer_address, encryption_enabled=False, - link_type=HCI_Connection_Complete_Event.LinkType.ACL, + link_type=hci.HCI_Connection_Complete_Event.LinkType.ACL, ) ) - def on_classic_disconnected(self, peer_address: Address, reason: int) -> None: + def on_classic_disconnected(self, peer_address: hci.Address, reason: int) -> None: # Send a disconnection complete event if connection := self.classic_connections.get(peer_address): self.send_hci_packet( - HCI_Disconnection_Complete_Event( - status=HCI_SUCCESS, + hci.HCI_Disconnection_Complete_Event( + status=hci.HCI_SUCCESS, connection_handle=connection.handle, reason=reason, ) @@ -772,26 +742,26 @@ class Controller: else: logger.warning(f'!!! No classic connection found for {peer_address}') - def on_classic_role_change(self, peer_address: Address, new_role: int) -> None: + def on_classic_role_change(self, peer_address: hci.Address, new_role: int) -> None: self.send_hci_packet( - HCI_Role_Change_Event( - status=HCI_SUCCESS, + hci.HCI_Role_Change_Event( + status=hci.HCI_SUCCESS, bd_addr=peer_address, new_role=new_role, ) ) def on_classic_sco_connection_complete( - self, peer_address: Address, status: int, link_type: int + self, peer_address: hci.Address, status: int, link_type: int ) -> None: - if status == HCI_SUCCESS: + if status == hci.HCI_SUCCESS: # Allocate (or reuse) a connection handle connection_handle = self.allocate_connection_handle() connection = Connection( controller=self, handle=connection_handle, - # Role doesn't matter in SCO. - role=Role.CENTRAL, + # hci.Role doesn't matter in SCO. + role=hci.Role.CENTRAL, peer_address=peer_address, link=self.link, transport=PhysicalTransport.BR_EDR, @@ -803,7 +773,7 @@ class Controller: connection_handle = 0 self.send_hci_packet( - HCI_Synchronous_Connection_Complete_Event( + hci.HCI_Synchronous_Connection_Complete_Event( status=status, connection_handle=connection_handle, bd_addr=peer_address, @@ -853,7 +823,7 @@ class Controller: ############################################################ def on_hci_command(self, command: hci.HCI_Command) -> Optional[bytes]: logger.warning(color(f'--- Unsupported command {command}', 'red')) - return bytes([HCI_UNKNOWN_HCI_COMMAND_ERROR]) + return bytes([hci.HCI_UNKNOWN_HCI_COMMAND_ERROR]) def on_hci_create_connection_command( self, command: hci.HCI_Create_Connection_Command @@ -869,8 +839,8 @@ class Controller: # Check that we don't already have a pending connection if self.link.get_pending_connection(): self.send_hci_packet( - HCI_Command_Status_Event( - status=HCI_CONTROLLER_BUSY_ERROR, + hci.HCI_Command_Status_Event( + status=hci.HCI_CONTROLLER_BUSY_ERROR, num_hci_command_packets=1, command_opcode=command.op_code, ) @@ -881,8 +851,8 @@ class Controller: # Say that the connection is pending self.send_hci_packet( - HCI_Command_Status_Event( - status=HCI_COMMAND_STATUS_PENDING, + hci.HCI_Command_Status_Event( + status=hci.HCI_COMMAND_STATUS_PENDING, num_hci_command_packets=1, command_opcode=command.op_code, ) @@ -897,8 +867,8 @@ class Controller: ''' # First, say that the disconnection is pending self.send_hci_packet( - HCI_Command_Status_Event( - status=HCI_COMMAND_STATUS_PENDING, + hci.HCI_Command_Status_Event( + status=hci.HCI_COMMAND_STATUS_PENDING, num_hci_command_packets=1, command_opcode=command.op_code, ) @@ -927,7 +897,7 @@ class Controller: self.link.classic_disconnect( self, connection.peer_address, - HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, + hci.HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, ) else: # Remove the connection @@ -956,8 +926,8 @@ class Controller: if self.link is None: return None self.send_hci_packet( - HCI_Command_Status_Event( - status=HCI_SUCCESS, + hci.HCI_Command_Status_Event( + status=hci.HCI_SUCCESS, num_hci_command_packets=1, command_opcode=command.op_code, ) @@ -981,8 +951,8 @@ class Controller: ) ): self.send_hci_packet( - HCI_Command_Status_Event( - status=HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR, + hci.HCI_Command_Status_Event( + status=hci.HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR, num_hci_command_packets=1, command_opcode=command.op_code, ) @@ -990,14 +960,16 @@ class Controller: return None self.send_hci_packet( - HCI_Command_Status_Event( - status=HCI_SUCCESS, + hci.HCI_Command_Status_Event( + status=hci.HCI_SUCCESS, num_hci_command_packets=1, command_opcode=command.op_code, ) ) self.link.classic_sco_connect( - self, connection.peer_address, HCI_Connection_Complete_Event.LinkType.ESCO + self, + connection.peer_address, + hci.HCI_Connection_Complete_Event.LinkType.ESCO, ) return None @@ -1013,8 +985,8 @@ class Controller: if not (connection := self.find_classic_connection_by_address(command.bd_addr)): self.send_hci_packet( - HCI_Command_Status_Event( - status=HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR, + hci.HCI_Command_Status_Event( + status=hci.HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR, num_hci_command_packets=1, command_opcode=command.op_code, ) @@ -1022,14 +994,16 @@ class Controller: return None self.send_hci_packet( - HCI_Command_Status_Event( - status=HCI_SUCCESS, + hci.HCI_Command_Status_Event( + status=hci.HCI_SUCCESS, num_hci_command_packets=1, command_opcode=command.op_code, ) ) self.link.classic_accept_sco_connection( - self, connection.peer_address, HCI_Connection_Complete_Event.LinkType.ESCO + self, + connection.peer_address, + hci.HCI_Connection_Complete_Event.LinkType.ESCO, ) return None @@ -1051,14 +1025,14 @@ class Controller: self.send_hci_packet( hci.HCI_Command_Status_Event( - status=HCI_SUCCESS, + status=hci.HCI_SUCCESS, num_hci_command_packets=1, command_opcode=command.op_code, ) ) self.send_hci_packet( hci.HCI_Mode_Change_Event( - status=HCI_SUCCESS, + status=hci.HCI_SUCCESS, connection_handle=command.connection_handle, current_mode=hci.HCI_Mode_Change_Event.Mode.SNIFF, interval=2, @@ -1085,14 +1059,14 @@ class Controller: self.send_hci_packet( hci.HCI_Command_Status_Event( - status=HCI_SUCCESS, + status=hci.HCI_SUCCESS, num_hci_command_packets=1, command_opcode=command.op_code, ) ) self.send_hci_packet( hci.HCI_Mode_Change_Event( - status=HCI_SUCCESS, + status=hci.HCI_SUCCESS, connection_handle=command.connection_handle, current_mode=hci.HCI_Mode_Change_Event.Mode.ACTIVE, interval=2, @@ -1104,14 +1078,14 @@ class Controller: self, command: hci.HCI_Switch_Role_Command ) -> Optional[bytes]: ''' - See Bluetooth spec Vol 4, Part E - 7.2.8 Switch Role command + See Bluetooth spec Vol 4, Part E - 7.2.8 Switch hci.Role command ''' if self.link is None: return None self.send_hci_packet( - HCI_Command_Status_Event( - status=HCI_SUCCESS, + hci.HCI_Command_Status_Event( + status=hci.HCI_SUCCESS, num_hci_command_packets=1, command_opcode=command.op_code, ) @@ -1128,14 +1102,14 @@ class Controller: self.event_mask = int.from_bytes( command.event_mask, byteorder='little', signed=False ) - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_reset_command(self, _command: hci.HCI_Reset_Command) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.3.2 Reset Command ''' # TODO: cleanup what needs to be reset - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_write_local_name_command( self, command: hci.HCI_Write_Local_Name_Command @@ -1152,7 +1126,7 @@ class Controller: self.local_name = str(local_name, 'utf-8') except UnicodeDecodeError: pass - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_read_local_name_command( self, _command: hci.HCI_Read_Local_Name_Command @@ -1164,7 +1138,7 @@ class Controller: if len(local_name) < 248: local_name = local_name + bytes(248 - len(local_name)) - return bytes([HCI_SUCCESS]) + local_name + return bytes([hci.HCI_SUCCESS]) + local_name def on_hci_read_class_of_device_command( self, _command: hci.HCI_Read_Class_Of_Device_Command @@ -1172,7 +1146,7 @@ class Controller: ''' See Bluetooth spec Vol 4, Part E - 7.3.25 Read Class of Device Command ''' - return bytes([HCI_SUCCESS, 0, 0, 0]) + return bytes([hci.HCI_SUCCESS, 0, 0, 0]) def on_hci_write_class_of_device_command( self, _command: hci.HCI_Write_Class_Of_Device_Command @@ -1180,7 +1154,7 @@ class Controller: ''' See Bluetooth spec Vol 4, Part E - 7.3.26 Write Class of Device Command ''' - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_read_synchronous_flow_control_enable_command( self, _command: hci.HCI_Read_Synchronous_Flow_Control_Enable_Command @@ -1193,7 +1167,7 @@ class Controller: ret = 1 else: ret = 0 - return bytes([HCI_SUCCESS, ret]) + return bytes([hci.HCI_SUCCESS, ret]) def on_hci_write_synchronous_flow_control_enable_command( self, command: hci.HCI_Write_Synchronous_Flow_Control_Enable_Command @@ -1202,13 +1176,13 @@ class Controller: See Bluetooth spec Vol 4, Part E - 7.3.37 Write Synchronous Flow Control Enable Command ''' - ret = HCI_SUCCESS + ret = hci.HCI_SUCCESS if command.synchronous_flow_control_enable == 1: self.sync_flow_control = True elif command.synchronous_flow_control_enable == 0: self.sync_flow_control = False else: - ret = HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR + ret = hci.HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR return bytes([ret]) def on_hci_set_controller_to_host_flow_control_command( @@ -1220,7 +1194,7 @@ class Controller: ''' # For now we just accept the command but ignore the values. # TODO: respect the passed in values. - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_host_buffer_size_command( self, _command: hci.HCI_Host_Buffer_Size_Command @@ -1230,7 +1204,7 @@ class Controller: ''' # For now we just accept the command but ignore the values. # TODO: respect the passed in values. - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_write_extended_inquiry_response_command( self, _command: hci.HCI_Write_Extended_Inquiry_Response_Command @@ -1239,7 +1213,7 @@ class Controller: See Bluetooth spec Vol 4, Part E - 7.3.56 Write Extended Inquiry Response Command ''' - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_write_simple_pairing_mode_command( self, _command: hci.HCI_Write_Simple_Pairing_Mode_Command @@ -1247,7 +1221,7 @@ class Controller: ''' See Bluetooth spec Vol 4, Part E - 7.3.59 Write Simple Pairing Mode Command ''' - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_set_event_mask_page_2_command( self, command: hci.HCI_Set_Event_Mask_Page_2_Command @@ -1258,7 +1232,7 @@ class Controller: self.event_mask_page_2 = int.from_bytes( command.event_mask_page_2, byteorder='little', signed=False ) - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_read_le_host_support_command( self, _command: hci.HCI_Read_LE_Host_Support_Command @@ -1266,7 +1240,7 @@ class Controller: ''' See Bluetooth spec Vol 4, Part E - 7.3.78 Write LE Host Support Command ''' - return bytes([HCI_SUCCESS, 1, 0]) + return bytes([hci.HCI_SUCCESS, 1, 0]) def on_hci_write_le_host_support_command( self, _command: hci.HCI_Write_LE_Host_Support_Command @@ -1275,7 +1249,7 @@ class Controller: See Bluetooth spec Vol 4, Part E - 7.3.79 Write LE Host Support Command ''' # TODO / Just ignore for now - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_write_authenticated_payload_timeout_command( self, command: hci.HCI_Write_Authenticated_Payload_Timeout_Command @@ -1285,7 +1259,7 @@ class Controller: Command ''' # TODO - return struct.pack(' len(self.lmp_features): - return bytes([HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR]) + return bytes([hci.HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR]) return ( bytes( [ # Status - HCI_SUCCESS, + hci.HCI_SUCCESS, # Page number command.page_number, # Max page number @@ -1350,7 +1324,7 @@ class Controller: ''' return struct.pack( '= command.subrate_max ): - return bytes([HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR]) + return bytes([hci.HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR]) - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_le_subrate_request_command( self, command: hci.HCI_LE_Subrate_Request_Command @@ -1398,7 +1372,7 @@ class Controller: or command.subrate_max < command.subrate_min or command.continuation_number >= command.subrate_max ): - return bytes([HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR]) + return bytes([hci.HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR]) self.send_hci_packet( hci.HCI_Command_Status_Event( @@ -1429,7 +1403,7 @@ class Controller: self.le_event_mask = int.from_bytes( command.le_event_mask, byteorder='little', signed=False ) - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_le_read_buffer_size_command( self, _command: hci.HCI_LE_Read_Buffer_Size_Command @@ -1439,7 +1413,7 @@ class Controller: ''' return struct.pack( ' Optional[bytes]: ''' - See Bluetooth spec Vol 4, Part E - 7.8.4 LE Set Random Address Command + See Bluetooth spec Vol 4, Part E - 7.8.4 LE Set Random hci.Address Command ''' self.random_address = command.random_address - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_le_set_advertising_parameters_command( self, command: hci.HCI_LE_Set_Advertising_Parameters_Command @@ -1484,7 +1458,7 @@ class Controller: See Bluetooth spec Vol 4, Part E - 7.8.5 LE Set Advertising Parameters Command ''' self.advertising_parameters = command - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_le_read_advertising_physical_channel_tx_power_command( self, _command: hci.HCI_LE_Read_Advertising_Physical_Channel_Tx_Power_Command @@ -1493,7 +1467,7 @@ class Controller: See Bluetooth spec Vol 4, Part E - 7.8.6 LE Read Advertising Physical Channel Tx Power Command ''' - return bytes([HCI_SUCCESS, self.advertising_channel_tx_power]) + return bytes([hci.HCI_SUCCESS, self.advertising_channel_tx_power]) def on_hci_le_set_advertising_data_command( self, command: hci.HCI_LE_Set_Advertising_Data_Command @@ -1502,7 +1476,7 @@ class Controller: See Bluetooth spec Vol 4, Part E - 7.8.7 LE Set Advertising Data Command ''' self.advertising_data = command.advertising_data - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_le_set_scan_response_data_command( self, command: hci.HCI_LE_Set_Scan_Response_Data_Command @@ -1511,7 +1485,7 @@ class Controller: See Bluetooth spec Vol 4, Part E - 7.8.8 LE Set Scan Response Data Command ''' self.le_scan_response_data = command.scan_response_data - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_le_set_advertising_enable_command( self, command: hci.HCI_LE_Set_Advertising_Enable_Command @@ -1524,7 +1498,7 @@ class Controller: else: self.stop_advertising() - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_le_set_scan_parameters_command( self, command: hci.HCI_LE_Set_Scan_Parameters_Command @@ -1533,14 +1507,14 @@ class Controller: See Bluetooth spec Vol 4, Part E - 7.8.10 LE Set Scan Parameters Command ''' if self.le_scan_enable: - return bytes([HCI_COMMAND_DISALLOWED_ERROR]) + return bytes([hci.HCI_COMMAND_DISALLOWED_ERROR]) self.le_scan_type = command.le_scan_type self.le_scan_interval = command.le_scan_interval self.le_scan_window = command.le_scan_window self.le_scan_own_address_type = hci.AddressType(command.own_address_type) self.le_scanning_filter_policy = command.scanning_filter_policy - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_le_set_scan_enable_command( self, command: hci.HCI_LE_Set_Scan_Enable_Command @@ -1550,7 +1524,7 @@ class Controller: ''' self.le_scan_enable = bool(command.le_scan_enable) self.filter_duplicates = bool(command.filter_duplicates) - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_le_create_connection_command( self, command: hci.HCI_LE_Create_Connection_Command @@ -1567,8 +1541,8 @@ class Controller: # Check that we don't already have a pending connection if self.link.get_pending_connection(): self.send_hci_packet( - HCI_Command_Status_Event( - status=HCI_COMMAND_DISALLOWED_ERROR, + hci.HCI_Command_Status_Event( + status=hci.HCI_COMMAND_DISALLOWED_ERROR, num_hci_command_packets=1, command_opcode=command.op_code, ) @@ -1580,8 +1554,8 @@ class Controller: # Say that the connection is pending self.send_hci_packet( - HCI_Command_Status_Event( - status=HCI_COMMAND_STATUS_PENDING, + hci.HCI_Command_Status_Event( + status=hci.HCI_COMMAND_STATUS_PENDING, num_hci_command_packets=1, command_opcode=command.op_code, ) @@ -1594,7 +1568,7 @@ class Controller: ''' See Bluetooth spec Vol 4, Part E - 7.8.13 LE Create Connection Cancel Command ''' - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_le_read_filter_accept_list_size_command( self, _command: hci.HCI_LE_Read_Filter_Accept_List_Size_Command @@ -1603,7 +1577,7 @@ class Controller: See Bluetooth spec Vol 4, Part E - 7.8.14 LE Read Filter Accept List Size Command ''' - return bytes([HCI_SUCCESS, self.filter_accept_list_size]) + return bytes([hci.HCI_SUCCESS, self.filter_accept_list_size]) def on_hci_le_clear_filter_accept_list_command( self, _command: hci.HCI_LE_Clear_Filter_Accept_List_Command @@ -1611,7 +1585,7 @@ class Controller: ''' See Bluetooth spec Vol 4, Part E - 7.8.15 LE Clear Filter Accept List Command ''' - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_le_add_device_to_filter_accept_list_command( self, _command: hci.HCI_LE_Add_Device_To_Filter_Accept_List_Command @@ -1620,7 +1594,7 @@ class Controller: See Bluetooth spec Vol 4, Part E - 7.8.16 LE Add Device To Filter Accept List Command ''' - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_le_remove_device_from_filter_accept_list_command( self, _command: hci.HCI_LE_Remove_Device_From_Filter_Accept_List_Command @@ -1629,7 +1603,7 @@ class Controller: See Bluetooth spec Vol 4, Part E - 7.8.17 LE Remove Device From Filter Accept List Command ''' - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_write_scan_enable_command( self, command: hci.HCI_Write_Scan_Enable_Command @@ -1638,7 +1612,7 @@ class Controller: See Bluetooth spec Vol 4, Part E - 7.3.18 Write Scan Enable Command ''' self.classic_scan_enable = command.scan_enable - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_le_read_remote_features_command( self, command: hci.HCI_LE_Read_Remote_Features_Command @@ -1651,8 +1625,8 @@ class Controller: if not self.find_connection_by_handle(handle): self.send_hci_packet( - HCI_Command_Status_Event( - status=HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR, + hci.HCI_Command_Status_Event( + status=hci.HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR, num_hci_command_packets=1, command_opcode=command.op_code, ) @@ -1661,8 +1635,8 @@ class Controller: # First, say that the command is pending self.send_hci_packet( - HCI_Command_Status_Event( - status=HCI_COMMAND_STATUS_PENDING, + hci.HCI_Command_Status_Event( + status=hci.HCI_COMMAND_STATUS_PENDING, num_hci_command_packets=1, command_opcode=command.op_code, ) @@ -1670,8 +1644,8 @@ class Controller: # Then send the remote features self.send_hci_packet( - HCI_LE_Read_Remote_Features_Complete_Event( - status=HCI_SUCCESS, + hci.HCI_LE_Read_Remote_Features_Complete_Event( + status=hci.HCI_SUCCESS, connection_handle=handle, le_features=bytes.fromhex('dd40000000000000'), ) @@ -1684,7 +1658,7 @@ class Controller: ''' See Bluetooth spec Vol 4, Part E - 7.8.23 LE Rand Command ''' - return bytes([HCI_SUCCESS]) + struct.pack('Q', random.randint(0, 1 << 64)) + return bytes([hci.HCI_SUCCESS]) + struct.pack('Q', random.randint(0, 1 << 64)) def on_hci_le_enable_encryption_command( self, command: hci.HCI_LE_Enable_Encryption_Command @@ -1702,7 +1676,7 @@ class Controller: ) ): logger.warning('connection not found') - return bytes([HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR]) + return bytes([hci.HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR]) # Notify that the connection is now encrypted self.link.on_connection_encrypted( @@ -1714,8 +1688,8 @@ class Controller: ) self.send_hci_packet( - HCI_Command_Status_Event( - status=HCI_COMMAND_STATUS_PENDING, + hci.HCI_Command_Status_Event( + status=hci.HCI_COMMAND_STATUS_PENDING, num_hci_command_packets=1, command_opcode=command.op_code, ) @@ -1729,7 +1703,7 @@ class Controller: ''' See Bluetooth spec Vol 4, Part E - 7.8.27 LE Read Supported States Command ''' - return bytes([HCI_SUCCESS]) + self.le_states + return bytes([hci.HCI_SUCCESS]) + self.le_states def on_hci_le_read_suggested_default_data_length_command( self, _command: hci.HCI_LE_Read_Suggested_Default_Data_Length_Command @@ -1740,7 +1714,7 @@ class Controller: ''' return struct.pack( ' Optional[bytes]: ''' - See Bluetooth spec Vol 4, Part E - 7.8.44 LE Set Address Resolution Enable + See Bluetooth spec Vol 4, Part E - 7.8.44 LE Set hci.Address Resolution Enable Command ''' - ret = HCI_SUCCESS + ret = hci.HCI_SUCCESS if command.address_resolution_enable == 1: self.le_address_resolution = True elif command.address_resolution_enable == 0: self.le_address_resolution = False else: - ret = HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR + ret = hci.HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR return bytes([ret]) def on_hci_le_set_resolvable_private_address_timeout_command( self, command: hci.HCI_LE_Set_Resolvable_Private_Address_Timeout_Command ) -> Optional[bytes]: ''' - See Bluetooth spec Vol 4, Part E - 7.8.45 LE Set Resolvable Private Address + See Bluetooth spec Vol 4, Part E - 7.8.45 LE Set Resolvable Private hci.Address Timeout Command ''' self.le_rpa_timeout = command.rpa_timeout - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_le_read_maximum_data_length_command( self, _command: hci.HCI_LE_Read_Maximum_Data_Length_Command @@ -1825,7 +1799,7 @@ class Controller: ''' return struct.pack( ' Optional[bytes]: ''' - See Bluetooth spec Vol 4, Part E - 7.8.52 LE Set Advertising Set Random Address + See Bluetooth spec Vol 4, Part E - 7.8.52 LE Set Advertising Set Random hci.Address Command ''' - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_le_set_extended_advertising_parameters_command( self, _command: hci.HCI_LE_Set_Extended_Advertising_Parameters_Command @@ -1873,7 +1847,7 @@ class Controller: See Bluetooth spec Vol 4, Part E - 7.8.53 LE Set Extended Advertising Parameters Command ''' - return bytes([HCI_SUCCESS, 0]) + return bytes([hci.HCI_SUCCESS, 0]) def on_hci_le_set_extended_advertising_data_command( self, _command: hci.HCI_LE_Set_Extended_Advertising_Data_Command @@ -1882,7 +1856,7 @@ class Controller: See Bluetooth spec Vol 4, Part E - 7.8.54 LE Set Extended Advertising Data Command ''' - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_le_set_extended_scan_response_data_command( self, _command: hci.HCI_LE_Set_Extended_Scan_Response_Data_Command @@ -1891,7 +1865,7 @@ class Controller: See Bluetooth spec Vol 4, Part E - 7.8.55 LE Set Extended Scan Response Data Command ''' - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_le_set_extended_advertising_enable_command( self, _command: hci.HCI_LE_Set_Extended_Advertising_Enable_Command @@ -1900,7 +1874,7 @@ class Controller: See Bluetooth spec Vol 4, Part E - 7.8.56 LE Set Extended Advertising Enable Command ''' - return bytes([HCI_SUCCESS]) + return bytes([hci.HCI_SUCCESS]) def on_hci_le_read_maximum_advertising_data_length_command( self, _command: hci.HCI_LE_Read_Maximum_Advertising_Data_Length_Command @@ -1909,7 +1883,7 @@ class Controller: See Bluetooth spec Vol 4, Part E - 7.8.57 LE Read Maximum Advertising Data Length Command ''' - return struct.pack(' controller.Controller | None: for controller in self.controllers: if controller.random_address == address: return controller return None def find_classic_controller( - self, address: Address + self, address: hci.Address ) -> Optional[controller.Controller]: for controller in self.controllers: if controller.public_address == address: @@ -100,13 +85,19 @@ class LocalLink: def on_address_changed(self, controller): pass - def send_advertising_data(self, sender_address, data): + def send_advertising_data(self, sender_address: hci.Address, data: bytes): # Send the advertising data to all controllers, except the sender for controller in self.controllers: if controller.random_address != sender_address: controller.on_link_advertising_data(sender_address, data) - def send_acl_data(self, sender_controller, destination_address, transport, data): + def send_acl_data( + self, + sender_controller: controller.Controller, + destination_address: hci.Address, + transport: core.PhysicalTransport, + data: bytes, + ): # Send the data to the first controller with a matching address if transport == core.PhysicalTransport.LE: destination_controller = self.find_controller(destination_address) @@ -120,7 +111,7 @@ class LocalLink: if destination_controller is not None: destination_controller.on_link_acl_data(source_address, transport, data) - def on_connection_complete(self): + def on_connection_complete(self) -> None: # Check that we expect this call if not self.pending_connection: logger.warning('on_connection_complete with no pending connection') @@ -139,17 +130,21 @@ class LocalLink: le_create_connection_command.peer_address ): central_controller.on_link_peripheral_connection_complete( - le_create_connection_command, HCI_SUCCESS + le_create_connection_command, hci.HCI_SUCCESS ) peripheral_controller.on_link_central_connected(central_address) return # No peripheral found central_controller.on_link_peripheral_connection_complete( - le_create_connection_command, HCI_CONNECTION_ACCEPT_TIMEOUT_ERROR + le_create_connection_command, hci.HCI_CONNECTION_ACCEPT_TIMEOUT_ERROR ) - def connect(self, central_address, le_create_connection_command): + def connect( + self, + central_address: hci.Address, + le_create_connection_command: hci.HCI_LE_Create_Connection_Command, + ): logger.debug( f'$$$ CONNECTION {central_address} -> ' f'{le_create_connection_command.peer_address}' @@ -158,7 +153,10 @@ class LocalLink: asyncio.get_running_loop().call_soon(self.on_connection_complete) def on_disconnection_complete( - self, initiating_address, target_address, disconnect_command + self, + initiating_address: hci.Address, + target_address: hci.Address, + disconnect_command: hci.HCI_Disconnect_Command, ): # Find the controller that initiated the disconnection if not (initiating_controller := self.find_controller(initiating_address)): @@ -172,20 +170,32 @@ class LocalLink: ) initiating_controller.on_link_disconnection_complete( - disconnect_command, HCI_SUCCESS + disconnect_command, hci.HCI_SUCCESS ) - def disconnect(self, initiating_address, target_address, disconnect_command): + def disconnect( + self, + initiating_address: hci.Address, + target_address: hci.Address, + disconnect_command: hci.HCI_Disconnect_Command, + ): logger.debug( f'$$$ DISCONNECTION {initiating_address} -> ' f'{target_address}: reason = {disconnect_command.reason}' ) - args = [initiating_address, target_address, disconnect_command] - asyncio.get_running_loop().call_soon(self.on_disconnection_complete, *args) + asyncio.get_running_loop().call_soon( + lambda: self.on_disconnection_complete( + initiating_address, target_address, disconnect_command + ) + ) - # pylint: disable=too-many-arguments def on_connection_encrypted( - self, central_address, peripheral_address, rand, ediv, ltk + self, + central_address: hci.Address, + peripheral_address: hci.Address, + rand: bytes, + ediv: int, + ltk: bytes, ): logger.debug(f'*** ENCRYPTION {central_address} -> {peripheral_address}') @@ -198,7 +208,7 @@ class LocalLink: def create_cis( self, central_controller: controller.Controller, - peripheral_address: Address, + peripheral_address: hci.Address, cig_id: int, cis_id: int, ) -> None: @@ -216,7 +226,7 @@ class LocalLink: def accept_cis( self, peripheral_controller: controller.Controller, - central_address: Address, + central_address: hci.Address, cig_id: int, cis_id: int, ) -> None: @@ -224,17 +234,16 @@ class LocalLink: f'$$$ CIS Accept {peripheral_controller.random_address} -> {central_address}' ) if central_controller := self.find_controller(central_address): - asyncio.get_running_loop().call_soon( - central_controller.on_link_cis_established, cig_id, cis_id - ) - asyncio.get_running_loop().call_soon( + loop = asyncio.get_running_loop() + loop.call_soon(central_controller.on_link_cis_established, cig_id, cis_id) + loop.call_soon( peripheral_controller.on_link_cis_established, cig_id, cis_id ) def disconnect_cis( self, initiator_controller: controller.Controller, - peer_address: Address, + peer_address: hci.Address, cig_id: int, cis_id: int, ) -> None: @@ -242,36 +251,42 @@ class LocalLink: f'$$$ CIS Disconnect {initiator_controller.random_address} -> {peer_address}' ) if peer_controller := self.find_controller(peer_address): - asyncio.get_running_loop().call_soon( + loop = asyncio.get_running_loop() + loop.call_soon( initiator_controller.on_link_cis_disconnected, cig_id, cis_id ) - asyncio.get_running_loop().call_soon( - peer_controller.on_link_cis_disconnected, cig_id, cis_id - ) + loop.call_soon(peer_controller.on_link_cis_disconnected, cig_id, cis_id) ############################################################ # Classic handlers ############################################################ - def classic_connect(self, initiator_controller, responder_address): + def classic_connect( + self, + initiator_controller: controller.Controller, + responder_address: hci.Address, + ): logger.debug( f'[Classic] {initiator_controller.public_address} connects to {responder_address}' ) responder_controller = self.find_classic_controller(responder_address) if responder_controller is None: initiator_controller.on_classic_connection_complete( - responder_address, HCI_PAGE_TIMEOUT_ERROR + responder_address, hci.HCI_PAGE_TIMEOUT_ERROR ) return self.pending_classic_connection = (initiator_controller, responder_controller) responder_controller.on_classic_connection_request( initiator_controller.public_address, - HCI_Connection_Complete_Event.LinkType.ACL, + hci.HCI_Connection_Complete_Event.LinkType.ACL, ) def classic_accept_connection( - self, responder_controller, initiator_address, responder_role + self, + responder_controller: controller.Controller, + initiator_address: hci.Address, + responder_role: int, ): logger.debug( f'[Classic] {responder_controller.public_address} accepts to connect {initiator_address}' @@ -279,55 +294,64 @@ class LocalLink: initiator_controller = self.find_classic_controller(initiator_address) if initiator_controller is None: responder_controller.on_classic_connection_complete( - responder_controller.public_address, HCI_PAGE_TIMEOUT_ERROR + responder_controller.public_address, hci.HCI_PAGE_TIMEOUT_ERROR ) return - async def task(): - if responder_role != Role.PERIPHERAL: + def connection_complete() -> None: + if responder_role != hci.Role.PERIPHERAL: initiator_controller.on_classic_role_change( responder_controller.public_address, int(not (responder_role)) ) initiator_controller.on_classic_connection_complete( - responder_controller.public_address, HCI_SUCCESS + responder_controller.public_address, hci.HCI_SUCCESS ) - asyncio.create_task(task()) responder_controller.on_classic_role_change( initiator_controller.public_address, responder_role ) responder_controller.on_classic_connection_complete( - initiator_controller.public_address, HCI_SUCCESS + initiator_controller.public_address, hci.HCI_SUCCESS ) self.pending_classic_connection = None + asyncio.get_running_loop().call_soon(connection_complete) - def classic_disconnect(self, initiator_controller, responder_address, reason): + def classic_disconnect( + self, + initiator_controller: controller.Controller, + responder_address: hci.Address, + reason: int, + ): logger.debug( f'[Classic] {initiator_controller.public_address} disconnects {responder_address}' ) responder_controller = self.find_classic_controller(responder_address) + assert responder_controller - async def task(): - initiator_controller.on_classic_disconnected(responder_address, reason) - - asyncio.create_task(task()) + asyncio.get_running_loop().call_soon( + lambda: initiator_controller.on_classic_disconnected( + responder_address, reason + ) + ) responder_controller.on_classic_disconnected( initiator_controller.public_address, reason ) def classic_switch_role( - self, initiator_controller, responder_address, initiator_new_role + self, + initiator_controller: controller.Controller, + responder_address: hci.Address, + initiator_new_role: int, ): responder_controller = self.find_classic_controller(responder_address) if responder_controller is None: return - async def task(): - initiator_controller.on_classic_role_change( + asyncio.get_running_loop().call_soon( + lambda: initiator_controller.on_classic_role_change( responder_address, initiator_new_role ) - - asyncio.create_task(task()) + ) responder_controller.on_classic_role_change( initiator_controller.public_address, int(not (initiator_new_role)) ) @@ -335,7 +359,7 @@ class LocalLink: def classic_sco_connect( self, initiator_controller: controller.Controller, - responder_address: Address, + responder_address: hci.Address, link_type: int, ): logger.debug( @@ -353,7 +377,7 @@ class LocalLink: def classic_accept_sco_connection( self, responder_controller: controller.Controller, - initiator_address: Address, + initiator_address: hci.Address, link_type: int, ): logger.debug( @@ -363,17 +387,16 @@ class LocalLink: if initiator_controller is None: responder_controller.on_classic_sco_connection_complete( responder_controller.public_address, - HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR, + hci.HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR, link_type, ) return - async def task(): - initiator_controller.on_classic_sco_connection_complete( - responder_controller.public_address, HCI_SUCCESS, link_type + asyncio.get_running_loop().call_soon( + lambda: initiator_controller.on_classic_sco_connection_complete( + responder_controller.public_address, hci.HCI_SUCCESS, link_type ) - - asyncio.create_task(task()) - responder_controller.on_classic_sco_connection_complete( - initiator_controller.public_address, HCI_SUCCESS, link_type + ) + responder_controller.on_classic_sco_connection_complete( + initiator_controller.public_address, hci.HCI_SUCCESS, link_type )