diff --git a/bumble/controller.py b/bumble/controller.py index 983692f..1153ec0 100644 --- a/bumble/controller.py +++ b/bumble/controller.py @@ -101,10 +101,10 @@ class Connection: transport: int link_type: int - def __post_init__(self): + def __post_init__(self) -> None: self.assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu) - def on_hci_acl_data_packet(self, packet): + def on_hci_acl_data_packet(self, packet: hci.HCI_AclDataPacket) -> None: self.assembler.feed_packet(packet) self.controller.send_hci_packet( HCI_Number_Of_Completed_Packets_Event( @@ -112,15 +112,81 @@ class Connection: ) ) - def on_acl_pdu(self, data): + def on_acl_pdu(self, pdu: bytes) -> None: if self.link: self.link.send_acl_data( - self.controller, self.peer_address, self.transport, data + self.controller, self.peer_address, self.transport, pdu ) # ----------------------------------------------------------------------------- class Controller: + hci_sink: Optional[TransportSink] = None + + central_connections: dict[ + Address, Connection + ] # Connections where this controller is the central + peripheral_connections: dict[ + Address, Connection + ] # Connections where this controller is the peripheral + classic_connections: dict[Address, Connection] # Connections in BR/EDR + central_cis_links: dict[int, CisLink] # CIS links by handle + peripheral_cis_links: dict[int, CisLink] # CIS links by handle + + hci_version: int = HCI_VERSION_BLUETOOTH_CORE_5_0 + hci_revision: int = 0 + lmp_version: int = HCI_VERSION_BLUETOOTH_CORE_5_0 + lmp_subversion: int = 0 + lmp_features: bytes = bytes.fromhex( + '0000000060000000' + ) # BR/EDR Not Supported, LE Supported (Controller) + manufacturer_name: int = 0xFFFF + acl_data_packet_length: int = 27 + total_num_acl_data_packets: int = 64 + le_acl_data_packet_length: int = 27 + total_num_le_acl_data_packets: int = 64 + iso_data_packet_length: int = 960 + total_num_iso_data_packets: int = 64 + event_mask: int = 0 + event_mask_page_2: int = 0 + supported_commands: bytes = bytes.fromhex( + '2000800000c000000000e4000000a822000000000000040000f7ffff7f000000' + '30f0f9ff01008004002000000000000000000000000000000000000000000000' + ) + le_event_mask: int = 0 + advertising_parameters: Optional[hci.HCI_LE_Set_Advertising_Parameters_Command] = ( + None + ) + le_features: bytes = bytes.fromhex('ff49010000000000') + le_states: bytes = bytes.fromhex('ffff3fffff030000') + advertising_channel_tx_power: int = 0 + filter_accept_list_size: int = 8 + filter_duplicates: bool = False + resolving_list_size: int = 8 + supported_max_tx_octets: int = 27 + supported_max_tx_time: int = 10000 + supported_max_rx_octets: int = 27 + supported_max_rx_time: int = 10000 + suggested_max_tx_octets: int = 27 + suggested_max_tx_time: int = 0x0148 + default_phy: dict[str, int] + le_scan_type: int = 0 + le_scan_interval: int = 0x10 + le_scan_window: int = 0x10 + le_scan_enable: int = 0 + le_scan_own_address_type: int = Address.RANDOM_DEVICE_ADDRESS + le_scanning_filter_policy: int = 0 + le_scan_response_data: Optional[bytes] = None + le_address_resolution: bool = False + le_rpa_timeout: int = 0 + sync_flow_control: bool = False + local_name: str = 'Bumble' + advertising_interval: int = 2000 + advertising_data: Optional[bytes] = None + advertising_timer_handle: Optional[asyncio.Handle] = None + + _random_address: 'Address' = Address('00:00:00:00:00:00') + def __init__( self, name: str, @@ -128,75 +194,20 @@ class Controller: host_sink: Optional[TransportSink] = None, link: Optional[LocalLink] = None, public_address: Optional[Union[bytes, str, Address]] = None, - ): + ) -> None: self.name = name - self.hci_sink = None self.link = link + self.central_connections = {} + self.peripheral_connections = {} + self.classic_connections = {} + self.central_cis_links = {} + self.peripheral_cis_links = {} + self.default_phy = { + 'all_phys': 0, + 'tx_phys': 0, + 'rx_phys': 0, + } - self.central_connections: dict[Address, Connection] = ( - {} - ) # Connections where this controller is the central - self.peripheral_connections: dict[Address, Connection] = ( - {} - ) # Connections where this controller is the peripheral - self.classic_connections: dict[Address, Connection] = ( - {} - ) # Connections in BR/EDR - self.central_cis_links: dict[int, CisLink] = {} # CIS links by handle - self.peripheral_cis_links: dict[int, CisLink] = {} # CIS links by handle - - self.hci_version = HCI_VERSION_BLUETOOTH_CORE_5_0 - self.hci_revision = 0 - self.lmp_version = HCI_VERSION_BLUETOOTH_CORE_5_0 - self.lmp_subversion = 0 - self.lmp_features = bytes.fromhex( - '0000000060000000' - ) # BR/EDR Not Supported, LE Supported (Controller) - self.manufacturer_name = 0xFFFF - self.acl_data_packet_length = 27 - self.total_num_acl_data_packets = 64 - self.le_acl_data_packet_length = 27 - self.total_num_le_acl_data_packets = 64 - self.iso_data_packet_length = 960 - self.total_num_iso_data_packets = 64 - self.event_mask = 0 - self.event_mask_page_2 = 0 - self.supported_commands = bytes.fromhex( - '2000800000c000000000e4000000a822000000000000040000f7ffff7f000000' - '30f0f9ff01008004002000000000000000000000000000000000000000000000' - ) - self.le_event_mask = 0 - self.advertising_parameters = None - self.le_features = bytes.fromhex('ff49010000000000') - self.le_states = bytes.fromhex('ffff3fffff030000') - self.advertising_channel_tx_power = 0 - self.filter_accept_list_size = 8 - self.filter_duplicates = False - self.resolving_list_size = 8 - self.supported_max_tx_octets = 27 - self.supported_max_tx_time = 10000 # microseconds - self.supported_max_rx_octets = 27 - self.supported_max_rx_time = 10000 # microseconds - self.suggested_max_tx_octets = 27 - self.suggested_max_tx_time = 0x0148 # microseconds - self.default_phy = bytes([0, 0, 0]) - self.le_scan_type = 0 - self.le_scan_interval = 0x10 - self.le_scan_window = 0x10 - self.le_scan_enable = 0 - self.le_scan_own_address_type = Address.RANDOM_DEVICE_ADDRESS - self.le_scanning_filter_policy = 0 - self.le_scan_response_data = None - self.le_address_resolution = False - self.le_rpa_timeout = 0 - self.sync_flow_control = False - self.local_name = 'Bumble' - - self.advertising_interval = 2000 # Fixed for now - self.advertising_data = None - self.advertising_timer_handle = None - - self._random_address = Address('00:00:00:00:00:00') if isinstance(public_address, Address): self._public_address = public_address elif public_address is not None: @@ -215,44 +226,44 @@ class Controller: if link: link.add_controller(self) - self.terminated = asyncio.get_running_loop().create_future() + self.terminated: asyncio.Future[Any] = ( + asyncio.get_running_loop().create_future() + ) @property - def host(self): + def host(self) -> Optional[TransportSink]: return self.hci_sink @host.setter - def host(self, host): + def host(self, host: Optional[TransportSink]) -> None: ''' Sets the host (sink) for this controller, and set this controller as the controller (sink) for the host ''' self.set_packet_sink(host) - if host: - host.controller = self - def set_packet_sink(self, sink): + def set_packet_sink(self, sink: Optional[TransportSink]) -> None: ''' Method from the Packet Source interface ''' self.hci_sink = sink @property - def public_address(self): + def public_address(self) -> Address: return self._public_address @public_address.setter - def public_address(self, address): + def public_address(self, address: Union[Address, str]) -> None: if isinstance(address, str): address = Address(address) self._public_address = address @property - def random_address(self): + def random_address(self) -> Address: return self._random_address @random_address.setter - def random_address(self, address): + def random_address(self, address: Union[Address, str]) -> None: if isinstance(address, str): address = Address(address) self._random_address = address @@ -262,29 +273,29 @@ class Controller: self.link.on_address_changed(self) # Packet Sink protocol (packets coming from the host via HCI) - def on_packet(self, packet): + def on_packet(self, packet: bytes) -> None: self.on_hci_packet(HCI_Packet.from_bytes(packet)) - def on_hci_packet(self, packet): + def on_hci_packet(self, packet: HCI_Packet) -> None: logger.debug( f'{color("<<<", "blue")} [{self.name}] ' f'{color("HOST -> CONTROLLER", "blue")}: {packet}' ) # If the packet is a command, invoke the handler for this packet - if packet.hci_packet_type == HCI_COMMAND_PACKET: + if isinstance(packet, hci.HCI_Command): self.on_hci_command_packet(packet) - elif packet.hci_packet_type == HCI_EVENT_PACKET: - self.on_hci_event_packet(packet) - elif packet.hci_packet_type == HCI_ACL_DATA_PACKET: + elif isinstance(packet, hci.HCI_AclDataPacket): self.on_hci_acl_data_packet(packet) + elif isinstance(packet, hci.HCI_Event): + self.on_hci_event_packet(packet) else: logger.warning(f'!!! unknown packet type {packet.hci_packet_type}') - def on_hci_command_packet(self, command): + def on_hci_command_packet(self, command: hci.HCI_Command) -> None: handler_name = f'on_{command.name.lower()}' handler = getattr(self, handler_name, self.on_hci_command) - result = handler(command) + result: Optional[bytes] = handler(command) if isinstance(result, bytes): self.send_hci_packet( HCI_Command_Complete_Event( @@ -294,10 +305,10 @@ class Controller: ) ) - def on_hci_event_packet(self, _event): + def on_hci_event_packet(self, _event: HCI_Packet) -> None: logger.warning('!!! unexpected event packet') - def on_hci_acl_data_packet(self, packet): + def on_hci_acl_data_packet(self, packet: hci.HCI_AclDataPacket) -> None: # Look for the connection to which this data belongs connection = self.find_connection_by_handle(packet.connection_handle) if connection is None: @@ -309,7 +320,7 @@ class Controller: # Pass the packet to the connection connection.on_hci_acl_data_packet(packet) - def send_hci_packet(self, packet): + def send_hci_packet(self, packet: HCI_Packet) -> None: logger.debug( f'{color(">>>", "green")} [{self.name}] ' f'{color("CONTROLLER -> HOST", "green")}: {packet}' @@ -318,7 +329,7 @@ class Controller: self.host.on_packet(bytes(packet)) # This method allows the controller to emulate the same API as a transport source - async def wait_for_termination(self): + async def wait_for_termination(self) -> None: await self.terminated ############################################################ @@ -345,15 +356,17 @@ class Controller: handle = max_handle + 1 return handle - def find_le_connection_by_address(self, address): + def find_le_connection_by_address(self, address: Address) -> Optional[Connection]: return self.central_connections.get(address) or self.peripheral_connections.get( address ) - def find_classic_connection_by_address(self, address): + def find_classic_connection_by_address( + self, address: Address + ) -> Optional[Connection]: return self.classic_connections.get(address) - def find_connection_by_handle(self, handle): + def find_connection_by_handle(self, handle: int) -> Optional[Connection]: for connection in itertools.chain( self.central_connections.values(), self.peripheral_connections.values(), @@ -363,19 +376,19 @@ class Controller: return connection return None - def find_central_connection_by_handle(self, handle): + def find_central_connection_by_handle(self, handle: int) -> Optional[Connection]: for connection in self.central_connections.values(): if connection.handle == handle: return connection return None - def find_peripheral_connection_by_handle(self, handle): + def find_peripheral_connection_by_handle(self, handle: int) -> Optional[Connection]: for connection in self.peripheral_connections.values(): if connection.handle == handle: return connection return None - def find_classic_connection_by_handle(self, handle): + def find_classic_connection_by_handle(self, handle: int) -> Optional[Connection]: for connection in self.classic_connections.values(): if connection.handle == handle: return connection @@ -386,7 +399,7 @@ class Controller: handle ) - def on_link_central_connected(self, central_address): + def on_link_central_connected(self, central_address: Address) -> None: ''' Called when an incoming connection occurs from a central on the link ''' @@ -424,7 +437,7 @@ class Controller: ) ) - def on_link_disconnected(self, peer_address, reason): + def on_link_disconnected(self, peer_address: Address, reason: int) -> None: ''' Called when an active disconnection occurs from a peer ''' @@ -456,8 +469,10 @@ class Controller: logger.warning(f'!!! No peripheral connection found for {peer_address}') def on_link_peripheral_connection_complete( - self, le_create_connection_command, status - ): + self, + le_create_connection_command: hci.HCI_LE_Create_Connection_Command, + status: int, + ) -> None: ''' Called by the link when a connection has been made or has failed to be made ''' @@ -500,7 +515,9 @@ class Controller: ) ) - def on_link_disconnection_complete(self, disconnection_command, status): + def on_link_disconnection_complete( + self, disconnection_command: hci.HCI_Disconnect_Command, status: int + ) -> None: ''' Called when a disconnection has been completed ''' @@ -526,7 +543,9 @@ class Controller: logger.debug(f'PERIPHERAL Connection removed: {connection}') del self.peripheral_connections[connection.peer_address] - def on_link_encrypted(self, peer_address, _rand, _ediv, _ltk): + def on_link_encrypted( + self, peer_address: Address, _rand: int, _ediv: int, _ltk: bytes + ) -> None: # For now, just setup the encryption without asking the host if connection := self.find_le_connection_by_address(peer_address): self.send_hci_packet( @@ -535,7 +554,9 @@ class Controller: ) ) - def on_link_acl_data(self, sender_address, transport, data): + def on_link_acl_data( + self, sender_address: Address, transport: PhysicalTransport, data: bytes + ) -> None: # Look for the connection to which this data belongs if transport == PhysicalTransport.LE: connection = self.find_le_connection_by_address(sender_address) @@ -550,7 +571,7 @@ class Controller: acl_packet = HCI_AclDataPacket(connection.handle, 2, 0, len(data), data) self.send_hci_packet(acl_packet) - def on_link_advertising_data(self, sender_address: Address, data: bytes): + def on_link_advertising_data(self, sender_address: Address, data: bytes) -> None: # Ignore if we're not scanning if self.le_scan_enable == 0: return @@ -677,7 +698,9 @@ class Controller: # Classic link connections ############################################################ - def on_classic_connection_request(self, peer_address, link_type): + def on_classic_connection_request( + self, peer_address: Address, link_type: int + ) -> None: self.send_hci_packet( HCI_Connection_Request_Event( bd_addr=peer_address, @@ -686,7 +709,9 @@ class Controller: ) ) - def on_classic_connection_complete(self, peer_address, status): + def on_classic_connection_complete( + self, peer_address: Address, status: int + ) -> None: if status == HCI_SUCCESS: # Allocate (or reuse) a connection handle peer_address = peer_address @@ -730,7 +755,7 @@ class Controller: ) ) - def on_classic_disconnected(self, peer_address, reason): + def on_classic_disconnected(self, peer_address: Address, reason: int) -> None: # Send a disconnection complete event if connection := self.classic_connections.get(peer_address): self.send_hci_packet( @@ -746,7 +771,7 @@ class Controller: else: logger.warning(f'!!! No classic connection found for {peer_address}') - def on_classic_role_change(self, peer_address, new_role): + def on_classic_role_change(self, peer_address: Address, new_role: int) -> None: self.send_hci_packet( HCI_Role_Change_Event( status=HCI_SUCCESS, @@ -757,7 +782,7 @@ class Controller: def on_classic_sco_connection_complete( self, peer_address: Address, status: int, link_type: int - ): + ) -> None: if status == HCI_SUCCESS: # Allocate (or reuse) a connection handle connection_handle = self.allocate_connection_handle() @@ -794,13 +819,13 @@ class Controller: ############################################################ # Advertising support ############################################################ - def on_advertising_timer_fired(self): + def on_advertising_timer_fired(self) -> None: self.send_advertising_data() self.advertising_timer_handle = asyncio.get_running_loop().call_later( self.advertising_interval / 1000.0, self.on_advertising_timer_fired ) - def start_advertising(self): + def start_advertising(self) -> None: # Stop any ongoing advertising before we start again self.stop_advertising() @@ -809,33 +834,35 @@ class Controller: self.on_advertising_timer_fired ) - def stop_advertising(self): + def stop_advertising(self) -> None: if self.advertising_timer_handle is not None: self.advertising_timer_handle.cancel() self.advertising_timer_handle = None - def send_advertising_data(self): + def send_advertising_data(self) -> None: if self.link and self.advertising_data: self.link.send_advertising_data(self.random_address, self.advertising_data) @property - def is_advertising(self): + def is_advertising(self) -> bool: return self.advertising_timer_handle is not None ############################################################ # HCI handlers ############################################################ - def on_hci_command(self, command): + def on_hci_command(self, command: hci.HCI_Command) -> Optional[bytes]: logger.warning(color(f'--- Unsupported command {command}', 'red')) return bytes([HCI_UNKNOWN_HCI_COMMAND_ERROR]) - def on_hci_create_connection_command(self, command): + def on_hci_create_connection_command( + self, command: hci.HCI_Create_Connection_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.1.5 Create Connection command ''' if self.link is None: - return + return None logger.debug(f'Connection request to {command.bd_addr}') # Check that we don't already have a pending connection @@ -847,7 +874,7 @@ class Controller: command_opcode=command.op_code, ) ) - return + return None self.link.classic_connect(self, command.bd_addr) @@ -859,8 +886,11 @@ class Controller: command_opcode=command.op_code, ) ) + return None - def on_hci_disconnect_command(self, command): + def on_hci_disconnect_command( + self, command: hci.HCI_Disconnect_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.1.6 Disconnect Command ''' @@ -904,7 +934,7 @@ class Controller: elif cis_link := ( self.central_cis_links.get(handle) or self.peripheral_cis_links.get(handle) ): - if self.link: + if self.link and cis_link.acl_connection: self.link.disconnect_cis( initiator_controller=self, peer_address=cis_link.acl_connection.peer_address, @@ -913,13 +943,17 @@ class Controller: ) # Spec requires handle to be kept after disconnection. - def on_hci_accept_connection_request_command(self, command): + return None + + def on_hci_accept_connection_request_command( + self, command: hci.HCI_Accept_Connection_Request_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.1.8 Accept Connection Request command ''' if self.link is None: - return + return None self.send_hci_packet( HCI_Command_Status_Event( status=HCI_SUCCESS, @@ -928,14 +962,17 @@ class Controller: ) ) self.link.classic_accept_connection(self, command.bd_addr, command.role) + return None - def on_hci_enhanced_setup_synchronous_connection_command(self, command): + def on_hci_enhanced_setup_synchronous_connection_command( + self, command: hci.HCI_Enhanced_Setup_Synchronous_Connection_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.1.45 Enhanced Setup Synchronous Connection command ''' if self.link is None: - return + return None if not ( connection := self.find_classic_connection_by_handle( @@ -949,7 +986,7 @@ class Controller: command_opcode=command.op_code, ) ) - return + return None self.send_hci_packet( HCI_Command_Status_Event( @@ -961,14 +998,17 @@ class Controller: self.link.classic_sco_connect( self, connection.peer_address, HCI_Connection_Complete_Event.LinkType.ESCO ) + return None - def on_hci_enhanced_accept_synchronous_connection_request_command(self, command): + def on_hci_enhanced_accept_synchronous_connection_request_command( + self, command: hci.HCI_Enhanced_Accept_Synchronous_Connection_Request_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.1.46 Enhanced Accept Synchronous Connection Request command ''' if self.link is None: - return + return None if not (connection := self.find_classic_connection_by_address(command.bd_addr)): self.send_hci_packet( @@ -978,7 +1018,7 @@ class Controller: command_opcode=command.op_code, ) ) - return + return None self.send_hci_packet( HCI_Command_Status_Event( @@ -990,8 +1030,11 @@ class Controller: self.link.classic_accept_sco_connection( self, connection.peer_address, HCI_Connection_Complete_Event.LinkType.ESCO ) + return None - def on_hci_sniff_mode_command(self, command: hci.HCI_Sniff_Mode_Command): + def on_hci_sniff_mode_command( + self, command: hci.HCI_Sniff_Mode_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.2.2 Sniff Mode command ''' @@ -1003,7 +1046,7 @@ class Controller: command_opcode=command.op_code, ) ) - return + return None self.send_hci_packet( hci.HCI_Command_Status_Event( @@ -1020,8 +1063,11 @@ class Controller: interval=2, ) ) + return None - def on_hci_exit_sniff_mode_command(self, command: hci.HCI_Exit_Sniff_Mode_Command): + def on_hci_exit_sniff_mode_command( + self, command: hci.HCI_Exit_Sniff_Mode_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.2.3 Exit Sniff Mode command ''' @@ -1034,7 +1080,7 @@ class Controller: command_opcode=command.op_code, ) ) - return + return None self.send_hci_packet( hci.HCI_Command_Status_Event( @@ -1051,14 +1097,17 @@ class Controller: interval=2, ) ) + return None - def on_hci_switch_role_command(self, command: hci.HCI_Switch_Role_Command): + def on_hci_switch_role_command( + self, command: hci.HCI_Switch_Role_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.2.8 Switch Role command ''' if self.link is None: - return + return None self.send_hci_packet( HCI_Command_Status_Event( status=HCI_SUCCESS, @@ -1067,22 +1116,29 @@ class Controller: ) ) self.link.classic_switch_role(self, command.bd_addr, command.role) + return None - def on_hci_set_event_mask_command(self, command): + def on_hci_set_event_mask_command( + self, command: hci.HCI_Set_Event_Mask_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.3.1 Set Event Mask Command ''' - self.event_mask = command.event_mask + self.event_mask = int.from_bytes( + command.event_mask, byteorder='little', signed=False + ) return bytes([HCI_SUCCESS]) - def on_hci_reset_command(self, _command): + def on_hci_reset_command(self, _command: hci.HCI_Reset_Command) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.3.2 Reset Command ''' # TODO: cleanup what needs to be reset return bytes([HCI_SUCCESS]) - def on_hci_write_local_name_command(self, command): + def on_hci_write_local_name_command( + self, command: hci.HCI_Write_Local_Name_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.3.11 Write Local Name Command ''' @@ -1097,7 +1153,9 @@ class Controller: pass return bytes([HCI_SUCCESS]) - def on_hci_read_local_name_command(self, _command): + def on_hci_read_local_name_command( + self, _command: hci.HCI_Read_Local_Name_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.3.12 Read Local Name Command ''' @@ -1107,19 +1165,25 @@ class Controller: return bytes([HCI_SUCCESS]) + local_name - def on_hci_read_class_of_device_command(self, _command): + def on_hci_read_class_of_device_command( + self, _command: hci.HCI_Read_Class_Of_Device_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.3.25 Read Class of Device Command ''' return bytes([HCI_SUCCESS, 0, 0, 0]) - def on_hci_write_class_of_device_command(self, _command): + def on_hci_write_class_of_device_command( + self, _command: hci.HCI_Write_Class_Of_Device_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.3.26 Write Class of Device Command ''' return bytes([HCI_SUCCESS]) - def on_hci_read_synchronous_flow_control_enable_command(self, _command): + def on_hci_read_synchronous_flow_control_enable_command( + self, _command: hci.HCI_Read_Synchronous_Flow_Control_Enable_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.3.36 Read Synchronous Flow Control Enable Command @@ -1130,7 +1194,9 @@ class Controller: ret = 0 return bytes([HCI_SUCCESS, ret]) - def on_hci_write_synchronous_flow_control_enable_command(self, command): + def on_hci_write_synchronous_flow_control_enable_command( + self, command: hci.HCI_Write_Synchronous_Flow_Control_Enable_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.3.37 Write Synchronous Flow Control Enable Command @@ -1144,7 +1210,9 @@ class Controller: ret = HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR return bytes([ret]) - def on_hci_set_controller_to_host_flow_control_command(self, _command): + def on_hci_set_controller_to_host_flow_control_command( + self, _command: hci.HCI_Set_Controller_To_Host_Flow_Control_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.3.38 Set Controller To Host Flow Control Command @@ -1153,7 +1221,9 @@ class Controller: # TODO: respect the passed in values. return bytes([HCI_SUCCESS]) - def on_hci_host_buffer_size_command(self, _command): + def on_hci_host_buffer_size_command( + self, _command: hci.HCI_Host_Buffer_Size_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.3.39 Host Buffer Size Command ''' @@ -1161,40 +1231,54 @@ class Controller: # TODO: respect the passed in values. return bytes([HCI_SUCCESS]) - def on_hci_write_extended_inquiry_response_command(self, _command): + def on_hci_write_extended_inquiry_response_command( + self, _command: hci.HCI_Write_Extended_Inquiry_Response_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.3.56 Write Extended Inquiry Response Command ''' return bytes([HCI_SUCCESS]) - def on_hci_write_simple_pairing_mode_command(self, _command): + def on_hci_write_simple_pairing_mode_command( + self, _command: hci.HCI_Write_Simple_Pairing_Mode_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.3.59 Write Simple Pairing Mode Command ''' return bytes([HCI_SUCCESS]) - def on_hci_set_event_mask_page_2_command(self, command): + def on_hci_set_event_mask_page_2_command( + self, command: hci.HCI_Set_Event_Mask_Page_2_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.3.69 Set Event Mask Page 2 Command ''' - self.event_mask_page_2 = command.event_mask_page_2 + self.event_mask_page_2 = int.from_bytes( + command.event_mask_page_2, byteorder='little', signed=False + ) return bytes([HCI_SUCCESS]) - def on_hci_read_le_host_support_command(self, _command): + def on_hci_read_le_host_support_command( + self, _command: hci.HCI_Read_LE_Host_Support_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.3.78 Write LE Host Support Command ''' return bytes([HCI_SUCCESS, 1, 0]) - def on_hci_write_le_host_support_command(self, _command): + def on_hci_write_le_host_support_command( + self, _command: hci.HCI_Write_LE_Host_Support_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.3.79 Write LE Host Support Command ''' # TODO / Just ignore for now return bytes([HCI_SUCCESS]) - def on_hci_write_authenticated_payload_timeout_command(self, command): + def on_hci_write_authenticated_payload_timeout_command( + self, command: hci.HCI_Write_Authenticated_Payload_Timeout_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.3.94 Write Authenticated Payload Timeout Command @@ -1202,7 +1286,9 @@ class Controller: # TODO return struct.pack(' Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.4.1 Read Local Version Information Command ''' @@ -1216,19 +1302,25 @@ class Controller: self.lmp_subversion, ) - def on_hci_read_local_supported_commands_command(self, _command): + def on_hci_read_local_supported_commands_command( + self, _command: hci.HCI_Read_Local_Supported_Commands_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.4.2 Read Local Supported Commands Command ''' return bytes([HCI_SUCCESS]) + self.supported_commands - def on_hci_read_local_supported_features_command(self, _command): + def on_hci_read_local_supported_features_command( + self, _command: hci.HCI_Read_Local_Supported_Features_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.4.3 Read Local Supported Features Command ''' return bytes([HCI_SUCCESS]) + self.lmp_features[:8] - def on_hci_read_local_extended_features_command(self, command): + def on_hci_read_local_extended_features_command( + self, command: hci.HCI_Read_Local_Extended_Features_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.4.4 Read Local Extended Features Command ''' @@ -1249,7 +1341,9 @@ class Controller: + self.lmp_features[command.page_number * 8 : (command.page_number + 1) * 8] ) - def on_hci_read_buffer_size_command(self, _command): + def on_hci_read_buffer_size_command( + self, _command: hci.HCI_Read_Buffer_Size_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.4.5 Read Buffer Size Command ''' @@ -1262,7 +1356,9 @@ class Controller: 0, ) - def on_hci_read_bd_addr_command(self, _command): + def on_hci_read_bd_addr_command( + self, _command: hci.HCI_Read_BD_ADDR_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.4.6 Read BD_ADDR Command ''' @@ -1275,7 +1371,7 @@ class Controller: def on_hci_le_set_default_subrate_command( self, command: hci.HCI_LE_Set_Default_Subrate_Command - ): + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 6, Part E - 7.8.123 LE Set Event Mask Command ''' @@ -1291,7 +1387,7 @@ class Controller: def on_hci_le_subrate_request_command( self, command: hci.HCI_LE_Subrate_Request_Command - ): + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 6, Part E - 7.8.124 LE Subrate Request command ''' @@ -1323,14 +1419,20 @@ class Controller: ) return None - def on_hci_le_set_event_mask_command(self, command): + def on_hci_le_set_event_mask_command( + self, command: hci.HCI_LE_Set_Event_Mask_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.1 LE Set Event Mask Command ''' - self.le_event_mask = command.le_event_mask + self.le_event_mask = int.from_bytes( + command.le_event_mask, byteorder='little', signed=False + ) return bytes([HCI_SUCCESS]) - def on_hci_le_read_buffer_size_command(self, _command): + def on_hci_le_read_buffer_size_command( + self, _command: hci.HCI_LE_Read_Buffer_Size_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.2 LE Read Buffer Size Command ''' @@ -1341,7 +1443,9 @@ class Controller: self.total_num_le_acl_data_packets, ) - def on_hci_le_read_buffer_size_v2_command(self, _command): + def on_hci_le_read_buffer_size_v2_command( + self, _command: hci.HCI_LE_Read_Buffer_Size_V2_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.2 LE Read Buffer Size Command ''' @@ -1354,49 +1458,63 @@ class Controller: self.total_num_iso_data_packets, ) - def on_hci_le_read_local_supported_features_command(self, _command): + def on_hci_le_read_local_supported_features_command( + self, _command: hci.HCI_LE_Read_Local_Supported_Features_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.3 LE Read Local Supported Features Command ''' return bytes([HCI_SUCCESS]) + self.le_features - def on_hci_le_set_random_address_command(self, command): + def on_hci_le_set_random_address_command( + self, command: hci.HCI_LE_Set_Random_Address_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.4 LE Set Random Address Command ''' self.random_address = command.random_address return bytes([HCI_SUCCESS]) - def on_hci_le_set_advertising_parameters_command(self, command): + def on_hci_le_set_advertising_parameters_command( + self, command: hci.HCI_LE_Set_Advertising_Parameters_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.5 LE Set Advertising Parameters Command ''' self.advertising_parameters = command return bytes([HCI_SUCCESS]) - def on_hci_le_read_advertising_physical_channel_tx_power_command(self, _command): + def on_hci_le_read_advertising_physical_channel_tx_power_command( + self, _command: hci.HCI_LE_Read_Advertising_Physical_Channel_Tx_Power_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.6 LE Read Advertising Physical Channel Tx Power Command ''' return bytes([HCI_SUCCESS, self.advertising_channel_tx_power]) - def on_hci_le_set_advertising_data_command(self, command): + def on_hci_le_set_advertising_data_command( + self, command: hci.HCI_LE_Set_Advertising_Data_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.7 LE Set Advertising Data Command ''' self.advertising_data = command.advertising_data return bytes([HCI_SUCCESS]) - def on_hci_le_set_scan_response_data_command(self, command): + def on_hci_le_set_scan_response_data_command( + self, command: hci.HCI_LE_Set_Scan_Response_Data_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.8 LE Set Scan Response Data Command ''' self.le_scan_response_data = command.scan_response_data return bytes([HCI_SUCCESS]) - def on_hci_le_set_advertising_enable_command(self, command): + def on_hci_le_set_advertising_enable_command( + self, command: hci.HCI_LE_Set_Advertising_Enable_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.9 LE Set Advertising Enable Command ''' @@ -1407,7 +1525,9 @@ class Controller: return bytes([HCI_SUCCESS]) - def on_hci_le_set_scan_parameters_command(self, command): + def on_hci_le_set_scan_parameters_command( + self, command: hci.HCI_LE_Set_Scan_Parameters_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.10 LE Set Scan Parameters Command ''' @@ -1417,25 +1537,29 @@ class Controller: self.le_scan_type = command.le_scan_type self.le_scan_interval = command.le_scan_interval self.le_scan_window = command.le_scan_window - self.le_scan_own_address_type = command.own_address_type + self.le_scan_own_address_type = hci.AddressType(command.own_address_type) self.le_scanning_filter_policy = command.scanning_filter_policy return bytes([HCI_SUCCESS]) - def on_hci_le_set_scan_enable_command(self, command): + def on_hci_le_set_scan_enable_command( + self, command: hci.HCI_LE_Set_Scan_Enable_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.11 LE Set Scan Enable Command ''' - self.le_scan_enable = command.le_scan_enable - self.filter_duplicates = command.filter_duplicates + self.le_scan_enable = bool(command.le_scan_enable) + self.filter_duplicates = bool(command.filter_duplicates) return bytes([HCI_SUCCESS]) - def on_hci_le_create_connection_command(self, command): + def on_hci_le_create_connection_command( + self, command: hci.HCI_LE_Create_Connection_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.12 LE Create Connection Command ''' if not self.link: - return + return None logger.debug(f'Connection request to {command.peer_address}') @@ -1448,7 +1572,7 @@ class Controller: command_opcode=command.op_code, ) ) - return + return None # Initiate the connection self.link.connect(self.random_address, command) @@ -1461,41 +1585,54 @@ class Controller: command_opcode=command.op_code, ) ) + return None - def on_hci_le_create_connection_cancel_command(self, _command): + def on_hci_le_create_connection_cancel_command( + self, _command: hci.HCI_LE_Create_Connection_Cancel_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.13 LE Create Connection Cancel Command ''' return bytes([HCI_SUCCESS]) - def on_hci_le_read_filter_accept_list_size_command(self, _command): + def on_hci_le_read_filter_accept_list_size_command( + self, _command: hci.HCI_LE_Read_Filter_Accept_List_Size_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.14 LE Read Filter Accept List Size Command ''' return bytes([HCI_SUCCESS, self.filter_accept_list_size]) - def on_hci_le_clear_filter_accept_list_command(self, _command): + def on_hci_le_clear_filter_accept_list_command( + self, _command: hci.HCI_LE_Clear_Filter_Accept_List_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.15 LE Clear Filter Accept List Command ''' return bytes([HCI_SUCCESS]) - def on_hci_le_add_device_to_filter_accept_list_command(self, _command): + def on_hci_le_add_device_to_filter_accept_list_command( + self, _command: hci.HCI_LE_Add_Device_To_Filter_Accept_List_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.16 LE Add Device To Filter Accept List Command ''' return bytes([HCI_SUCCESS]) - def on_hci_le_remove_device_from_filter_accept_list_command(self, _command): + def on_hci_le_remove_device_from_filter_accept_list_command( + self, _command: hci.HCI_LE_Remove_Device_From_Filter_Accept_List_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.17 LE Remove Device From Filter Accept List Command ''' return bytes([HCI_SUCCESS]) - def on_hci_le_read_remote_features_command(self, command): + def on_hci_le_read_remote_features_command( + self, command: hci.HCI_LE_Read_Remote_Features_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.21 LE Read Remote Features Command ''' @@ -1510,7 +1647,7 @@ class Controller: command_opcode=command.op_code, ) ) - return + return None # First, say that the command is pending self.send_hci_packet( @@ -1529,17 +1666,24 @@ class Controller: le_features=bytes.fromhex('dd40000000000000'), ) ) + return None - def on_hci_le_rand_command(self, _command): + def on_hci_le_rand_command( + self, _command: hci.HCI_LE_Rand_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.23 LE Rand Command ''' return bytes([HCI_SUCCESS]) + struct.pack('Q', random.randint(0, 1 << 64)) - def on_hci_le_enable_encryption_command(self, command): + def on_hci_le_enable_encryption_command( + self, command: hci.HCI_LE_Enable_Encryption_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.24 LE Enable Encryption Command ''' + if not self.link: + return None # Check the parameters if not ( @@ -1569,13 +1713,17 @@ class Controller: return None - def on_hci_le_read_supported_states_command(self, _command): + def on_hci_le_read_supported_states_command( + self, _command: hci.HCI_LE_Read_Supported_States_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.27 LE Read Supported States Command ''' return bytes([HCI_SUCCESS]) + self.le_states - def on_hci_le_read_suggested_default_data_length_command(self, _command): + def on_hci_le_read_suggested_default_data_length_command( + self, _command: hci.HCI_LE_Read_Suggested_Default_Data_Length_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.34 LE Read Suggested Default Data Length Command @@ -1587,7 +1735,9 @@ class Controller: self.suggested_max_tx_time, ) - def on_hci_le_write_suggested_default_data_length_command(self, command): + def on_hci_le_write_suggested_default_data_length_command( + self, command: hci.HCI_LE_Write_Suggested_Default_Data_Length_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.35 LE Write Suggested Default Data Length Command @@ -1597,33 +1747,43 @@ class Controller: ) return bytes([HCI_SUCCESS]) - def on_hci_le_read_local_p_256_public_key_command(self, _command): + def on_hci_le_read_local_p_256_public_key_command( + self, _command: hci.HCI_LE_Read_Local_P_256_Public_Key_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.36 LE Read P-256 Public Key Command ''' # TODO create key and send HCI_LE_Read_Local_P-256_Public_Key_Complete event return bytes([HCI_SUCCESS]) - def on_hci_le_add_device_to_resolving_list_command(self, _command): + def on_hci_le_add_device_to_resolving_list_command( + self, _command: hci.HCI_LE_Add_Device_To_Resolving_List_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.38 LE Add Device To Resolving List Command ''' return bytes([HCI_SUCCESS]) - def on_hci_le_clear_resolving_list_command(self, _command): + def on_hci_le_clear_resolving_list_command( + self, _command: hci.HCI_LE_Clear_Resolving_List_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.40 LE Clear Resolving List Command ''' return bytes([HCI_SUCCESS]) - def on_hci_le_read_resolving_list_size_command(self, _command): + def on_hci_le_read_resolving_list_size_command( + self, _command: hci.HCI_LE_Read_Resolving_List_Size_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.41 LE Read Resolving List Size Command ''' return bytes([HCI_SUCCESS, self.resolving_list_size]) - def on_hci_le_set_address_resolution_enable_command(self, command): + def on_hci_le_set_address_resolution_enable_command( + self, command: hci.HCI_LE_Set_Address_Resolution_Enable_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.44 LE Set Address Resolution Enable Command @@ -1637,7 +1797,9 @@ class Controller: ret = HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR return bytes([ret]) - def on_hci_le_set_resolvable_private_address_timeout_command(self, command): + def on_hci_le_set_resolvable_private_address_timeout_command( + self, command: hci.HCI_LE_Set_Resolvable_Private_Address_Timeout_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.45 LE Set Resolvable Private Address Timeout Command @@ -1645,7 +1807,9 @@ class Controller: self.le_rpa_timeout = command.rpa_timeout return bytes([HCI_SUCCESS]) - def on_hci_le_read_maximum_data_length_command(self, _command): + def on_hci_le_read_maximum_data_length_command( + self, _command: hci.HCI_LE_Read_Maximum_Data_Length_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.46 LE Read Maximum Data Length Command ''' @@ -1658,7 +1822,9 @@ class Controller: self.supported_max_rx_time, ) - def on_hci_le_read_phy_command(self, command): + def on_hci_le_read_phy_command( + self, command: hci.HCI_LE_Read_PHY_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.47 LE Read PHY Command ''' @@ -1670,94 +1836,118 @@ class Controller: HCI_LE_1M_PHY, ) - def on_hci_le_set_default_phy_command(self, command): + def on_hci_le_set_default_phy_command( + self, command: hci.HCI_LE_Set_Default_PHY_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.48 LE Set Default PHY Command ''' - self.default_phy = { - 'all_phys': command.all_phys, - 'tx_phys': command.tx_phys, - 'rx_phys': command.rx_phys, - } + self.default_phy['all_phys'] = command.all_phys + self.default_phy['tx_phys'] = command.tx_phys + self.default_phy['rx_phys'] = command.rx_phys return bytes([HCI_SUCCESS]) - def on_hci_le_set_advertising_set_random_address_command(self, _command): + def on_hci_le_set_advertising_set_random_address_command( + self, _command: hci.HCI_LE_Set_Advertising_Set_Random_Address_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.52 LE Set Advertising Set Random Address Command ''' return bytes([HCI_SUCCESS]) - def on_hci_le_set_extended_advertising_parameters_command(self, _command): + def on_hci_le_set_extended_advertising_parameters_command( + self, _command: hci.HCI_LE_Set_Extended_Advertising_Parameters_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.53 LE Set Extended Advertising Parameters Command ''' return bytes([HCI_SUCCESS, 0]) - def on_hci_le_set_extended_advertising_data_command(self, _command): + def on_hci_le_set_extended_advertising_data_command( + self, _command: hci.HCI_LE_Set_Extended_Advertising_Data_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.54 LE Set Extended Advertising Data Command ''' return bytes([HCI_SUCCESS]) - def on_hci_le_set_extended_scan_response_data_command(self, _command): + def on_hci_le_set_extended_scan_response_data_command( + self, _command: hci.HCI_LE_Set_Extended_Scan_Response_Data_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.55 LE Set Extended Scan Response Data Command ''' return bytes([HCI_SUCCESS]) - def on_hci_le_set_extended_advertising_enable_command(self, _command): + def on_hci_le_set_extended_advertising_enable_command( + self, _command: hci.HCI_LE_Set_Extended_Advertising_Enable_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.56 LE Set Extended Advertising Enable Command ''' return bytes([HCI_SUCCESS]) - def on_hci_le_read_maximum_advertising_data_length_command(self, _command): + def on_hci_le_read_maximum_advertising_data_length_command( + self, _command: hci.HCI_LE_Read_Maximum_Advertising_Data_Length_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.57 LE Read Maximum Advertising Data Length Command ''' return struct.pack(' Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.58 LE Read Number of Supported Advertising Set Command ''' return struct.pack(' Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.61 LE Set Periodic Advertising Parameters Command ''' return bytes([HCI_SUCCESS]) - def on_hci_le_set_periodic_advertising_data_command(self, _command): + def on_hci_le_set_periodic_advertising_data_command( + self, _command: hci.HCI_LE_Set_Periodic_Advertising_Data_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.62 LE Set Periodic Advertising Data Command ''' return bytes([HCI_SUCCESS]) - def on_hci_le_set_periodic_advertising_enable_command(self, _command): + def on_hci_le_set_periodic_advertising_enable_command( + self, _command: hci.HCI_LE_Set_Periodic_Advertising_Enable_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.63 LE Set Periodic Advertising Enable Command ''' return bytes([HCI_SUCCESS]) - def on_hci_le_read_transmit_power_command(self, _command): + def on_hci_le_read_transmit_power_command( + self, _command: hci.HCI_LE_Read_Transmit_Power_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.74 LE Read Transmit Power Command ''' return struct.pack(' Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.97 LE Set CIG Parameter Command ''' @@ -1781,12 +1971,14 @@ class Controller: ' Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.99 LE Create CIS Command ''' if not self.link: - return + return None for cis_handle, acl_handle in zip( command.cis_connection_handle, command.acl_connection_handle @@ -1815,8 +2007,11 @@ class Controller: command_opcode=command.op_code, ) ) + return None - def on_hci_le_remove_cig_command(self, command): + def on_hci_le_remove_cig_command( + self, command: hci.HCI_LE_Remove_CIG_Command + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.100 LE Remove CIG Command ''' @@ -1831,12 +2026,14 @@ class Controller: return struct.pack(' Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.101 LE Accept CIS Request Command ''' if not self.link: - return + return None if not ( pending_cis_link := self.peripheral_cis_links.get(command.connection_handle) @@ -1859,10 +2056,11 @@ class Controller: command_opcode=command.op_code, ) ) + return None def on_hci_le_setup_iso_data_path_command( self, command: hci.HCI_LE_Setup_ISO_Data_Path_Command - ) -> bytes: + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.109 LE Setup ISO Data Path Command ''' @@ -1883,7 +2081,7 @@ class Controller: def on_hci_le_remove_iso_data_path_command( self, command: hci.HCI_LE_Remove_ISO_Data_Path_Command - ) -> bytes: + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.110 LE Remove ISO Data Path Command ''' @@ -1909,7 +2107,7 @@ class Controller: def on_hci_le_set_host_feature_command( self, _command: hci.HCI_LE_Set_Host_Feature_Command - ): + ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.115 LE Set Host Feature command ''' diff --git a/bumble/hci.py b/bumble/hci.py index 9943ff0..0bb7586 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -3441,6 +3441,17 @@ class HCI_Write_Synchronous_Flow_Control_Enable_Command(HCI_Command): synchronous_flow_control_enable: int = field(metadata=metadata(1)) +# ----------------------------------------------------------------------------- +@HCI_Command.command +@dataclasses.dataclass +class HCI_Set_Controller_To_Host_Flow_Control_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.3.38 Set Controller To Host Flow Control command + ''' + + flow_control_enable: int = field(metadata=metadata(1)) + + # ----------------------------------------------------------------------------- @HCI_Command.command @dataclasses.dataclass @@ -4338,6 +4349,15 @@ class HCI_LE_Write_Suggested_Default_Data_Length_Command(HCI_Command): suggested_max_tx_time: int = field(metadata=metadata(2)) +# ----------------------------------------------------------------------------- +@HCI_Command.command +@dataclasses.dataclass +class HCI_LE_Read_Local_P_256_Public_Key_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.36 LE LE Read Local P-256 Public Key command + ''' + + # ----------------------------------------------------------------------------- @HCI_Command.command @dataclasses.dataclass @@ -4365,6 +4385,15 @@ class HCI_LE_Clear_Resolving_List_Command(HCI_Command): ''' +# ----------------------------------------------------------------------------- +@HCI_Command.command +@dataclasses.dataclass +class HCI_LE_Read_Resolving_List_Size_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.41 LE Read Resolving List Size command + ''' + + # ----------------------------------------------------------------------------- @HCI_Command.command @dataclasses.dataclass @@ -5028,6 +5057,15 @@ class HCI_LE_Periodic_Advertising_Terminate_Sync_Command(HCI_Command): sync_handle: int = field(metadata=metadata(2)) +# ----------------------------------------------------------------------------- +@HCI_Command.command +@dataclasses.dataclass +class HCI_LE_Read_Transmit_Power_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.74 LE Read Transmit Power command + ''' + + # ----------------------------------------------------------------------------- @HCI_Command.command @dataclasses.dataclass diff --git a/examples/run_controller.py b/examples/run_controller.py index 2b7f743..d25d32f 100644 --- a/examples/run_controller.py +++ b/examples/run_controller.py @@ -19,6 +19,7 @@ import asyncio import sys import bumble.logging +from bumble import hci from bumble.controller import Controller from bumble.device import Device from bumble.gatt import ( @@ -61,7 +62,7 @@ async def main() -> None: host_sink=hci_transport.sink, link=link, ) - controller1.random_address = sys.argv[1] + controller1.random_address = hci.Address(sys.argv[1]) # Create a second controller using the same link controller2 = Controller('C2', link=link)