diff --git a/bumble/controller.py b/bumble/controller.py index 4f68795..27bae2b 100644 --- a/bumble/controller.py +++ b/bumble/controller.py @@ -56,6 +56,61 @@ class CisLink: data_paths: set[int] = dataclasses.field(default_factory=set) +# ----------------------------------------------------------------------------- +@dataclasses.dataclass +class AdvertisingSet: + controller: Controller + handle: int + parameters: Optional[hci.HCI_LE_Set_Extended_Advertising_Parameters_Command] = None + data: bytearray = dataclasses.field(default_factory=bytearray) + scan_response_data: bytearray = dataclasses.field(default_factory=bytearray) + enabled: bool = False + timer_handle: Optional[asyncio.Handle] = None + random_address: Optional[hci.Address] = None + + def _on_extended_advertising_timer_fired(self) -> None: + if not self.enabled: + return + + self.send_extended_advertising_data() + + interval = ( + self.parameters.primary_advertising_interval_min * 0.625 / 1000.0 + if self.parameters + else 1.0 + ) + self.timer_handle = asyncio.get_running_loop().call_later( + interval, self._on_extended_advertising_timer_fired + ) + + def start(self) -> None: + self.enabled = True + asyncio.get_running_loop().call_soon(self._on_extended_advertising_timer_fired) + + def stop(self) -> None: + self.enabled = False + if timer_handle := self.timer_handle: + timer_handle.cancel() + self.timer_handle = None + + def send_extended_advertising_data(self) -> None: + if self.controller.link: + address = self.random_address or self.random_address + + properties = ( + self.parameters.advertising_event_properties if self.parameters else 0 + ) + + self.controller.link.send_extended_advertising_data( + address, bytes(self.data), properties + ) + + if self.scan_response_data: + self.controller.link.send_extended_advertising_data( + address, self.scan_response_data, properties | 0x08 + ) + + # ----------------------------------------------------------------------------- @dataclasses.dataclass class ScoLink: @@ -109,6 +164,7 @@ class Controller: sco_links: dict[hci.Address, ScoLink] # SCO links by address central_cis_links: dict[int, CisLink] # CIS links by handle peripheral_cis_links: dict[int, CisLink] # CIS links by handle + advertising_sets: dict[int, AdvertisingSet] # Advertising sets by handle hci_version: int = hci.HCI_VERSION_BLUETOOTH_CORE_5_0 hci_revision: int = 0 @@ -150,7 +206,7 @@ class Controller: le_scan_type: int = 0 le_scan_interval: int = 0x10 le_scan_window: int = 0x10 - le_scan_enable: int = 0 + le_scan_enable: bool = False le_scan_own_address_type: int = hci.Address.RANDOM_DEVICE_ADDRESS le_scanning_filter_policy: int = 0 le_scan_response_data: Optional[bytes] = None @@ -183,6 +239,7 @@ class Controller: self.classic_pending_commands = {} self.central_cis_links = {} self.peripheral_cis_links = {} + self.advertising_sets = {} self.default_phy = { 'all_phys': 0, 'tx_phys': 0, @@ -329,7 +386,9 @@ class Controller: ) ) return next( - handle for handle in range(0xEFF + 1) if handle not in current_handles + handle + for handle in range(0x0001, 0xEFF + 1) + if handle not in current_handles ) def find_le_connection_by_address( @@ -383,7 +442,9 @@ class Controller: handle ) - def on_link_central_connected(self, central_address: hci.Address) -> None: + def on_link_central_connected( + self, central_address: hci.Address, local_address: Optional[hci.Address] = None + ) -> None: ''' Called when an incoming connection occurs from a central on the link ''' @@ -421,6 +482,49 @@ class Controller: ) ) + if local_address: + for handle, adv_set in self.advertising_sets.items(): + set_address = ( + adv_set.random_address + if adv_set.random_address + else self.random_address + ) + # Check if address matches. + # Note: local_address passed from Link is what central connected to. + # If set uses Public address, local_address should match self.public_address. + # But set_address above is random_address or self.random_address. + # We need to handle Public address case. + + # If set parameters say Own_Address_Type is Public, set_address logic above is wrong? + # The set itself doesn't store Own_Address_Type, it is in parameters. + + use_public = False + if adv_set.parameters and adv_set.parameters.own_address_type in ( + hci.OwnAddressType.PUBLIC, + hci.OwnAddressType.RESOLVABLE_OR_PUBLIC, + ): + use_public = True + + matched = False + if use_public: + if self.public_address == local_address: + matched = True + else: + if set_address == local_address: + matched = True + + if matched and adv_set.enabled: + self.send_hci_packet( + hci.HCI_LE_Advertising_Set_Terminated_Event( + status=hci.HCI_SUCCESS, + advertising_handle=handle, + connection_handle=connection.handle, + num_completed_extended_advertising_events=0, + ) + ) + adv_set.stop() + break + def on_link_disconnected(self, peer_address: hci.Address, reason: int) -> None: ''' Called when an active disconnection occurs from a peer @@ -454,7 +558,10 @@ class Controller: def on_link_peripheral_connection_complete( self, - le_create_connection_command: hci.HCI_LE_Create_Connection_Command, + le_create_connection_command: Union[ + hci.HCI_LE_Create_Connection_Command, + hci.HCI_LE_Extended_Create_Connection_Command, + ], status: int, ) -> None: ''' @@ -483,6 +590,17 @@ class Controller: else: connection = None + if isinstance( + le_create_connection_command, hci.HCI_LE_Extended_Create_Connection_Command + ): + interval = le_create_connection_command.connection_interval_mins[0] + latency = le_create_connection_command.max_latencies[0] + timeout = le_create_connection_command.supervision_timeouts[0] + else: + interval = le_create_connection_command.connection_interval_min + latency = le_create_connection_command.max_latency + timeout = le_create_connection_command.supervision_timeout + # Say that the connection has completed self.send_hci_packet( # pylint: disable=line-too-long @@ -492,9 +610,9 @@ class Controller: role=hci.Role.CENTRAL, peer_address_type=le_create_connection_command.peer_address_type, peer_address=le_create_connection_command.peer_address, - connection_interval=le_create_connection_command.connection_interval_min, - peripheral_latency=le_create_connection_command.max_latency, - supervision_timeout=le_create_connection_command.supervision_timeout, + connection_interval=interval, + peripheral_latency=latency, + supervision_timeout=timeout, central_clock_accuracy=0, ) ) @@ -559,7 +677,7 @@ class Controller: self, sender_address: hci.Address, data: bytes ) -> None: # Ignore if we're not scanning - if self.le_scan_enable == 0: + if not self.le_scan_enable: return # Send a scan report @@ -955,12 +1073,39 @@ class Controller: self.advertising_timer_handle = None def send_advertising_data(self) -> None: - if self.link and self.advertising_data: - self.link.send_advertising_data(self.random_address, self.advertising_data) + if self.link: + self.link.send_advertising_data( + self.random_address, self.advertising_data or b'' + ) @property def is_advertising(self) -> bool: - return self.advertising_timer_handle is not None + return self.advertising_timer_handle is not None or any( + s.enabled for s in self.advertising_sets.values() + ) + + def on_link_extended_advertising_data( + self, sender_address: hci.Address, data: bytes, properties: int + ) -> None: + if not self.le_scan_enable: + return + + # Send extended advertising report + report = hci.HCI_LE_Extended_Advertising_Report_Event.Report( + event_type=properties, + address_type=sender_address.address_type, + address=sender_address, + primary_phy=hci.HCI_LE_1M_PHY, + secondary_phy=hci.HCI_LE_1M_PHY, + advertising_sid=0, + tx_power=127, + rssi=-50, + periodic_advertising_interval=0, + direct_address_type=0, + direct_address=hci.Address('00:00:00:00:00:00'), + data=data, + ) + self.send_hci_packet(hci.HCI_LE_Extended_Advertising_Report_Event([report])) ############################################################ # HCI handlers @@ -1872,6 +2017,37 @@ class Controller: ''' return bytes([hci.HCI_SUCCESS]) + def on_hci_le_extended_create_connection_command( + self, command: hci.HCI_LE_Extended_Create_Connection_Command + ) -> Optional[bytes]: + ''' + See Bluetooth spec Vol 4, Part E - 7.8.66 LE Extended Create Connection Command + ''' + if not self.link: + return None + + # Check pending + if self.link.get_pending_connection(): + self.send_hci_packet( + hci.HCI_Command_Status_Event( + status=hci.HCI_COMMAND_DISALLOWED_ERROR, + num_hci_command_packets=1, + command_opcode=command.op_code, + ) + ) + return None + + self.link.connect(self.random_address, command) + + self.send_hci_packet( + hci.HCI_Command_Status_Event( + status=hci.HCI_COMMAND_STATUS_PENDING, + num_hci_command_packets=1, + command_opcode=command.op_code, + ) + ) + return None + def on_hci_le_read_filter_accept_list_size_command( self, _command: hci.HCI_LE_Read_Filter_Accept_List_Size_Command ) -> Optional[bytes]: @@ -2134,48 +2310,125 @@ class Controller: return bytes([hci.HCI_SUCCESS]) def on_hci_le_set_advertising_set_random_address_command( - self, _command: hci.HCI_LE_Set_Advertising_Set_Random_Address_Command + self, command: hci.HCI_LE_Set_Advertising_Set_Random_Address_Command ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.52 LE Set Advertising Set Random hci.Address Command ''' + handle = command.advertising_handle + if handle not in self.advertising_sets: + self.advertising_sets[handle] = AdvertisingSet( + controller=self, handle=handle + ) + self.advertising_sets[handle].random_address = command.random_address return bytes([hci.HCI_SUCCESS]) def on_hci_le_set_extended_advertising_parameters_command( - self, _command: hci.HCI_LE_Set_Extended_Advertising_Parameters_Command + self, command: hci.HCI_LE_Set_Extended_Advertising_Parameters_Command ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.53 LE Set Extended Advertising Parameters Command ''' + handle = command.advertising_handle + if handle not in self.advertising_sets: + self.advertising_sets[handle] = AdvertisingSet( + controller=self, handle=handle + ) + + self.advertising_sets[handle].parameters = command return bytes([hci.HCI_SUCCESS, 0]) def on_hci_le_set_extended_advertising_data_command( - self, _command: hci.HCI_LE_Set_Extended_Advertising_Data_Command + self, command: hci.HCI_LE_Set_Extended_Advertising_Data_Command ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.54 LE Set Extended Advertising Data Command ''' + handle = command.advertising_handle + if not (adv_set := self.advertising_sets.get(handle)): + return bytes([hci.HCI_UNKNOWN_ADVERTISING_IDENTIFIER_ERROR]) + + if command.operation in ( + hci.HCI_LE_Set_Extended_Advertising_Data_Command.Operation.FIRST_FRAGMENT, + hci.HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, + ): + adv_set.data = bytearray(command.advertising_data) + elif command.operation in ( + hci.HCI_LE_Set_Extended_Advertising_Data_Command.Operation.INTERMEDIATE_FRAGMENT, + hci.HCI_LE_Set_Extended_Advertising_Data_Command.Operation.LAST_FRAGMENT, + ): + adv_set.data.extend(command.advertising_data) + return bytes([hci.HCI_SUCCESS]) def on_hci_le_set_extended_scan_response_data_command( - self, _command: hci.HCI_LE_Set_Extended_Scan_Response_Data_Command + self, command: hci.HCI_LE_Set_Extended_Scan_Response_Data_Command ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.55 LE Set Extended Scan Response Data Command ''' + handle = command.advertising_handle + if not (adv_set := self.advertising_sets.get(handle)): + return bytes([hci.HCI_UNKNOWN_ADVERTISING_IDENTIFIER_ERROR]) + + if command.operation in ( + hci.HCI_LE_Set_Extended_Advertising_Data_Command.Operation.FIRST_FRAGMENT, + hci.HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, + ): + adv_set.scan_response_data = bytearray(command.scan_response_data) + elif command.operation in ( + hci.HCI_LE_Set_Extended_Advertising_Data_Command.Operation.INTERMEDIATE_FRAGMENT, + hci.HCI_LE_Set_Extended_Advertising_Data_Command.Operation.LAST_FRAGMENT, + ): + adv_set.scan_response_data.extend(command.scan_response_data) + return bytes([hci.HCI_SUCCESS]) def on_hci_le_set_extended_advertising_enable_command( - self, _command: hci.HCI_LE_Set_Extended_Advertising_Enable_Command + self, command: hci.HCI_LE_Set_Extended_Advertising_Enable_Command ) -> Optional[bytes]: ''' See Bluetooth spec Vol 4, Part E - 7.8.56 LE Set Extended Advertising Enable Command ''' + if command.enable: + for handle in command.advertising_handles: + if advertising_set := self.advertising_sets.get(handle): + advertising_set.start() + else: + if not command.advertising_handles: + for advertising_set in self.advertising_sets.values(): + advertising_set.stop() + else: + for handle in command.advertising_handles: + if advertising_set := self.advertising_sets.get(handle): + advertising_set.stop() + return bytes([hci.HCI_SUCCESS]) + + def on_hci_le_remove_advertising_set_command( + self, command: hci.HCI_LE_Remove_Advertising_Set_Command + ) -> Optional[bytes]: + ''' + See Bluetooth spec Vol 4, Part E - 7.8.59 LE Remove Advertising Set Command + ''' + handle = command.advertising_handle + if advertising_set := self.advertising_sets.pop(handle, None): + advertising_set.stop() + return bytes([hci.HCI_SUCCESS]) + + def on_hci_le_clear_advertising_sets_command( + self, _command: hci.HCI_LE_Clear_Advertising_Sets_Command + ) -> Optional[bytes]: + ''' + See Bluetooth spec Vol 4, Part E - 7.8.60 LE Clear Advertising Sets Command + ''' + for advertising_set in self.advertising_sets.values(): + advertising_set.stop() + self.advertising_sets.clear() return bytes([hci.HCI_SUCCESS]) def on_hci_le_read_maximum_advertising_data_length_command( diff --git a/bumble/link.py b/bumble/link.py index 4c4e168..74dea7d 100644 --- a/bumble/link.py +++ b/bumble/link.py @@ -65,6 +65,11 @@ class LocalLink: for controller in self.controllers: if controller.random_address == address: return controller + if controller.public_address == address: + return controller + for advertising_set in controller.advertising_sets.values(): + if advertising_set.random_address == address: + return controller return None def find_classic_controller( @@ -91,6 +96,17 @@ class LocalLink: if controller.random_address != sender_address: controller.on_link_advertising_data(sender_address, data) + def send_extended_advertising_data( + self, sender_address: hci.Address, data: bytes, properties: int = 0 + ): + # Send the advertising data to all controllers, except the sender + sender_controller = self.find_controller(sender_address) + for controller in self.controllers: + if controller != sender_controller: + controller.on_link_extended_advertising_data( + sender_address, data, properties + ) + def send_acl_data( self, sender_controller: controller.Controller, @@ -136,7 +152,9 @@ class LocalLink: central_controller.on_link_peripheral_connection_complete( le_create_connection_command, hci.HCI_SUCCESS ) - peripheral_controller.on_link_central_connected(central_address) + peripheral_controller.on_link_central_connected( + central_address, le_create_connection_command.peer_address + ) return # No peripheral found @@ -147,7 +165,10 @@ class LocalLink: def connect( self, central_address: hci.Address, - le_create_connection_command: hci.HCI_LE_Create_Connection_Command, + le_create_connection_command: ( + hci.HCI_LE_Create_Connection_Command + | hci.HCI_LE_Extended_Create_Connection_Command + ), ): logger.debug( f'$$$ CONNECTION {central_address} -> ' diff --git a/tests/device_test.py b/tests/device_test.py index 3fd3d20..995e25a 100644 --- a/tests/device_test.py +++ b/tests/device_test.py @@ -284,52 +284,49 @@ async def test_legacy_advertising(): @pytest.mark.asyncio async def test_legacy_advertising_disconnection(auto_restart): devices = TwoDevices() - device = devices[0] - devices.controllers[0].le_features = bytes.fromhex('ffffffffffffffff') - await device.power_on() - peer_address = Address('F0:F1:F2:F3:F4:F5') - await device.start_advertising(auto_restart=auto_restart) - device.on_le_connection( - 0x0001, - peer_address, - None, - None, - Role.PERIPHERAL, - 0, - 0, - 0, - ) + for controller in devices.controllers: + controller.le_features = bytes.fromhex('ffffffffffffffff') + for dev in devices: + await dev.power_on() + await devices[0].start_advertising(auto_restart=auto_restart) + connecion = await devices[1].connect(devices[0].random_address) - device.on_advertising_set_termination( - HCI_SUCCESS, device.legacy_advertising_set.advertising_handle, 0x0001, 0 - ) + await connecion.disconnect() - device.on_disconnection(0x0001, 0) await async_barrier() await async_barrier() if auto_restart: - assert device.legacy_advertising_set + assert devices[0].legacy_advertising_set started = asyncio.Event() - if not device.is_advertising: - device.legacy_advertising_set.once('start', started.set) + if not devices[0].is_advertising: + devices[0].legacy_advertising_set.once('start', started.set) await asyncio.wait_for(started.wait(), _TIMEOUT) - assert device.is_advertising + assert devices[0].is_advertising else: - assert not device.is_advertising + assert not devices[0].is_advertising # ----------------------------------------------------------------------------- @pytest.mark.asyncio -async def test_extended_advertising(): - device = TwoDevices()[0] - await device.power_on() +async def test_advertising_and_scanning(): + devices = TwoDevices() + for dev in devices: + await dev.power_on() + + # Start scanning + advertisements = asyncio.Queue[device.Advertisement]() + devices[1].on(devices[1].EVENT_ADVERTISEMENT, advertisements.put_nowait) + await devices[1].start_scanning() # Start advertising - advertising_set = await device.create_advertising_set() - assert device.extended_advertising_sets + advertising_set = await devices[0].create_advertising_set(advertising_data=b'123') + assert devices[0].extended_advertising_sets assert advertising_set.enabled + advertisement = await asyncio.wait_for(advertisements.get(), _TIMEOUT) + assert advertisement.data_bytes == b'123' + # Stop advertising await advertising_set.stop() assert not advertising_set.enabled @@ -342,33 +339,30 @@ async def test_extended_advertising(): ) @pytest.mark.asyncio async def test_extended_advertising_connection(own_address_type): - device = TwoDevices()[0] - await device.power_on() - peer_address = Address('F0:F1:F2:F3:F4:F5') - advertising_set = await device.create_advertising_set( + devices = TwoDevices() + for dev in devices: + await dev.power_on() + advertising_set = await devices[0].create_advertising_set( advertising_parameters=AdvertisingParameters(own_address_type=own_address_type) ) - device.on_le_connection( - 0x0001, - peer_address, - None, - None, - Role.PERIPHERAL, - 0, - 0, - 0, - ) - device.on_advertising_set_termination( - HCI_SUCCESS, - advertising_set.advertising_handle, - 0x0001, - 0, + await asyncio.wait_for( + devices[1].connect(advertising_set.random_address or devices[0].public_address), + _TIMEOUT, ) + # Advertising set should be terminated after connected. + assert not advertising_set.enabled + if own_address_type == OwnAddressType.PUBLIC: - assert device.lookup_connection(0x0001).self_address == device.public_address + assert ( + devices[0].lookup_connection(0x0001).self_address + == devices[0].public_address + ) else: - assert device.lookup_connection(0x0001).self_address == device.random_address + assert ( + devices[0].lookup_connection(0x0001).self_address + == devices[0].random_address + ) await async_barrier()