diff --git a/bumble/device.py b/bumble/device.py index 9ae502e4..369040f2 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -437,6 +437,34 @@ class AdvertisingType(IntEnum): ) +# ----------------------------------------------------------------------------- +@dataclass +class LegacyAdvertiser: + advertising_type: AdvertisingType + own_address_type: OwnAddressType + auto_restart: bool + advertising_data: Optional[bytes] + scan_response_data: Optional[bytes] + + +# ----------------------------------------------------------------------------- +@dataclass +class ExtendedAdvertiser(CompositeEventEmitter): + device: Device + handle: int + advertising_properties: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties + own_address_type: OwnAddressType + auto_restart: bool + advertising_data: Optional[bytes] + scan_response_data: Optional[bytes] + + def __post_init__(self) -> None: + super().__init__() + + async def stop(self) -> None: + await self.device.stop_extended_advertising(self.handle) + + # ----------------------------------------------------------------------------- class LePhyOptions: # Coded PHY preference @@ -658,6 +686,9 @@ class Connection(CompositeEventEmitter): gatt_client: gatt_client.Client pairing_peer_io_capability: Optional[int] pairing_peer_authentication_requirements: Optional[int] + advertiser_after_disconnection: Union[ + LegacyAdvertiser, ExtendedAdvertiser, None + ] = None @composite_listener class Listener: @@ -1063,7 +1094,8 @@ class Device(CompositeEventEmitter): ] advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator] config: DeviceConfiguration - extended_advertising_handles: Set[int] + legacy_advertiser: Optional[LegacyAdvertiser] + extended_advertisers: Dict[int, ExtendedAdvertiser] sco_links: Dict[int, ScoLink] cis_links: Dict[int, CisLink] _pending_cis: Dict[int, Tuple[int, int]] @@ -1141,10 +1173,7 @@ class Device(CompositeEventEmitter): self._host = None self.powered_on = False - self.advertising = False - self.advertising_type = None self.auto_restart_inquiry = True - self.auto_restart_advertising = False self.command_timeout = 10 # seconds self.gatt_server = gatt_server.Server(self) self.sdp_server = sdp.Server(self) @@ -1168,10 +1197,10 @@ class Device(CompositeEventEmitter): self.classic_pending_accepts = { Address.ANY: [] } # Futures, by BD address OR [Futures] for Address.ANY - self.extended_advertising_handles = set() + self.legacy_advertiser = None + self.extended_advertisers = {} # Own address type cache - self.advertising_own_address_type = None self.connect_own_address_type = None # Use the initial config or a default @@ -1579,6 +1608,7 @@ class Device(CompositeEventEmitter): return self.host.supports_le_feature(feature_map[phy]) + @deprecated("Please use start_legacy_advertising.") async def start_advertising( self, advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, @@ -1586,15 +1616,49 @@ class Device(CompositeEventEmitter): own_address_type: int = OwnAddressType.RANDOM, auto_restart: bool = False, ) -> None: + await self.start_legacy_advertising( + advertising_type=advertising_type, + target=target, + own_address_type=OwnAddressType(own_address_type), + auto_restart=auto_restart, + ) + + async def start_legacy_advertising( + self, + advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, + target: Optional[Address] = None, + own_address_type: OwnAddressType = OwnAddressType.RANDOM, + auto_restart: bool = False, + advertising_data: Optional[bytes] = None, + scan_response_data: Optional[bytes] = None, + ) -> LegacyAdvertiser: + """Starts an legacy advertisement. + + Args: + advertising_type: Advertising type passed to HCI_LE_Set_Advertising_Parameters_Command. + target: Directed advertising target. Directed type should be set in advertising_type arg. + own_address_type: own address type to use in the advertising. + auto_restart: whether the advertisement will be restarted after disconnection. + scan_response_data: raw scan response. + advertising_data: raw advertising data. + + Returns: + LegacyAdvertiser object containing the metadata of advertisement. + """ + if self.extended_advertisers: + logger.warning( + 'Trying to start Legacy and Extended Advertising at the same time!' + ) + # If we're advertising, stop first - if self.advertising: + if self.legacy_advertiser: await self.stop_advertising() # Set/update the advertising data if the advertising type allows it if advertising_type.has_data: await self.send_command( HCI_LE_Set_Advertising_Data_Command( - advertising_data=self.advertising_data + advertising_data=advertising_data or self.advertising_data or b'' ), check_result=True, ) @@ -1603,7 +1667,9 @@ class Device(CompositeEventEmitter): if advertising_type.is_scannable: await self.send_command( HCI_LE_Set_Scan_Response_Data_Command( - scan_response_data=self.scan_response_data + scan_response_data=scan_response_data + or self.scan_response_data + or b'' ), check_result=True, ) @@ -1640,45 +1706,56 @@ class Device(CompositeEventEmitter): check_result=True, ) - self.advertising_type = advertising_type - self.advertising_own_address_type = own_address_type - self.advertising = True - self.auto_restart_advertising = auto_restart + self.legacy_advertiser = LegacyAdvertiser( + advertising_type=advertising_type, + own_address_type=own_address_type, + auto_restart=auto_restart, + advertising_data=advertising_data, + scan_response_data=scan_response_data, + ) + return self.legacy_advertiser + @deprecated("Please use stop_legacy_advertising.") async def stop_advertising(self) -> None: + await self.stop_legacy_advertising() + + async def stop_legacy_advertising(self) -> None: # Disable advertising - if self.advertising: + if self.legacy_advertiser: await self.send_command( HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0), check_result=True, ) - self.advertising_type = None - self.advertising_own_address_type = None - self.advertising = False - self.auto_restart_advertising = False + self.legacy_advertiser = None @experimental('Extended Advertising is still experimental - Might be changed soon.') async def start_extended_advertising( self, advertising_properties: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties = HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING, target: Address = Address.ANY, - own_address_type: int = OwnAddressType.RANDOM, - scan_response: Optional[bytes] = None, + own_address_type: OwnAddressType = OwnAddressType.RANDOM, + auto_restart: bool = True, advertising_data: Optional[bytes] = None, - ) -> int: + scan_response_data: Optional[bytes] = None, + ) -> ExtendedAdvertiser: """Starts an extended advertising set. Args: advertising_properties: Properties to pass in HCI_LE_Set_Extended_Advertising_Parameters_Command target: Directed advertising target. Directed property should be set in advertising_properties arg. own_address_type: own address type to use in the advertising. - scan_response: raw scan response. When a non-none value is set, HCI_LE_Set_Extended_Scan_Response_Data_Command will be sent. + auto_restart: whether the advertisement will be restarted after disconnection. advertising_data: raw advertising data. When a non-none value is set, HCI_LE_Set_Advertising_Set_Random_Address_Command will be sent. + scan_response_data: raw scan response. When a non-none value is set, HCI_LE_Set_Extended_Scan_Response_Data_Command will be sent. Returns: - Handle of the new advertising set. + ExtendedAdvertiser object containing the metadata of advertisement. """ + if self.legacy_advertiser: + logger.warning( + 'Trying to start Legacy and Extended Advertising at the same time!' + ) adv_handle = -1 # Find a free handle @@ -1686,7 +1763,7 @@ class Device(CompositeEventEmitter): DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE, DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1, ): - if i not in self.extended_advertising_handles: + if i not in self.extended_advertisers.keys(): adv_handle = i break @@ -1733,13 +1810,13 @@ class Device(CompositeEventEmitter): ) # Set the scan response if present - if scan_response is not None: + if scan_response_data is not None: await self.send_command( HCI_LE_Set_Extended_Scan_Response_Data_Command( advertising_handle=adv_handle, operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, fragment_preference=0x01, # Should not fragment - scan_response_data=scan_response, + scan_response_data=scan_response_data, ), check_result=True, ) @@ -1774,8 +1851,16 @@ class Device(CompositeEventEmitter): ) raise error - self.extended_advertising_handles.add(adv_handle) - return adv_handle + advertiser = self.extended_advertisers[adv_handle] = ExtendedAdvertiser( + device=self, + handle=adv_handle, + advertising_properties=advertising_properties, + own_address_type=own_address_type, + auto_restart=auto_restart, + advertising_data=advertising_data, + scan_response_data=scan_response_data, + ) + return advertiser @experimental('Extended Advertising is still experimental - Might be changed soon.') async def stop_extended_advertising(self, adv_handle: int) -> None: @@ -1799,11 +1884,11 @@ class Device(CompositeEventEmitter): HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle), check_result=True, ) - self.extended_advertising_handles.remove(adv_handle) + del self.extended_advertisers[adv_handle] @property def is_advertising(self): - return self.advertising + return self.legacy_advertiser or self.extended_advertisers async def start_scanning( self, @@ -3144,13 +3229,17 @@ class Device(CompositeEventEmitter): # Guess which own address type is used for this connection. # This logic is somewhat correct but may need to be improved # when multiple advertising are run simultaneously. + advertiser = None if self.connect_own_address_type is not None: own_address_type = self.connect_own_address_type + elif self.legacy_advertiser: + own_address_type = self.legacy_advertiser.own_address_type + # Store advertiser for restarting - it's only required for legacy, since + # extended advertisement produces HCI_Advertising_Set_Terminated. + advertiser = self.legacy_advertiser else: - own_address_type = self.advertising_own_address_type - - # We are no longer advertising - self.advertising = False + # For extended advertisement, determining own address type later. + own_address_type = OwnAddressType.RANDOM if own_address_type in ( OwnAddressType.PUBLIC, @@ -3172,6 +3261,7 @@ class Device(CompositeEventEmitter): connection_parameters, ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY), ) + connection.advertiser_after_disconnection = advertiser self.connections[connection_handle] = connection # If supported, read which PHY we're connected with before @@ -3203,10 +3293,10 @@ class Device(CompositeEventEmitter): # For directed advertising, this means a timeout if ( transport == BT_LE_TRANSPORT - and self.advertising - and self.advertising_type.is_directed + and self.legacy_advertiser + and self.legacy_advertiser.advertising_type.is_directed ): - self.advertising = False + self.legacy_advertiser = None # Notify listeners error = core.ConnectionError( @@ -3268,16 +3358,28 @@ class Device(CompositeEventEmitter): self.gatt_server.on_disconnection(connection) # Restart advertising if auto-restart is enabled - if self.auto_restart_advertising: + if advertiser := connection.advertiser_after_disconnection: logger.debug('restarting advertising') - self.abort_on( - 'flush', - self.start_advertising( - advertising_type=self.advertising_type, # type: ignore[arg-type] - own_address_type=self.advertising_own_address_type, # type: ignore[arg-type] - auto_restart=True, - ), - ) + if isinstance(advertiser, LegacyAdvertiser): + self.abort_on( + 'flush', + self.start_legacy_advertising( + advertising_type=advertiser.advertising_type, + own_address_type=advertiser.own_address_type, + auto_restart=True, + ), + ) + elif isinstance(advertiser, ExtendedAdvertiser): + self.abort_on( + 'flush', + self.start_extended_advertising( + advertising_properties=advertiser.advertising_properties, + own_address_type=advertiser.own_address_type, + advertising_data=advertiser.advertising_data, + scan_response_data=advertiser.scan_response_data, + auto_restart=True, + ), + ) elif sco_link := self.sco_links.pop(connection_handle, None): sco_link.emit('disconnection', reason) elif cis_link := self.cis_links.pop(connection_handle, None): @@ -3600,6 +3702,29 @@ class Device(CompositeEventEmitter): if sco_link := self.sco_links.get(sco_handle, None): sco_link.emit('pdu', packet) + # [LE only] + @host_event_handler + @experimental('Only for testing') + def on_advertising_set_termination( + self, + status: int, + advertising_handle: int, + connection_handle: int, + ) -> None: + if status == HCI_SUCCESS: + connection = self.lookup_connection(connection_handle) + if advertiser := self.extended_advertisers.pop(advertising_handle, None): + if connection: + connection.advertiser_after_disconnection = advertiser + if advertiser.own_address_type in ( + OwnAddressType.PUBLIC, + OwnAddressType.RESOLVABLE_OR_PUBLIC, + ): + connection.self_address = self.public_address + else: + connection.self_address = self.random_address + advertiser.emit('termination', status) + # [LE only] @host_event_handler @with_connection_from_handle diff --git a/bumble/hci.py b/bumble/hci.py index a28246ab..936b6816 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -1963,25 +1963,15 @@ Address.ANY_RANDOM = Address(b"\x00\x00\x00\x00\x00\x00", Address.RANDOM_DEVICE_ # ----------------------------------------------------------------------------- -class OwnAddressType: +class OwnAddressType(enum.IntEnum): PUBLIC = 0 RANDOM = 1 RESOLVABLE_OR_PUBLIC = 2 RESOLVABLE_OR_RANDOM = 3 - TYPE_NAMES = { - PUBLIC: 'PUBLIC', - RANDOM: 'RANDOM', - RESOLVABLE_OR_PUBLIC: 'RESOLVABLE_OR_PUBLIC', - RESOLVABLE_OR_RANDOM: 'RESOLVABLE_OR_RANDOM', - } - - @staticmethod - def type_name(type_id): - return name_or_number(OwnAddressType.TYPE_NAMES, type_id) - - # pylint: disable-next=unnecessary-lambda - TYPE_SPEC = {'size': 1, 'mapper': lambda x: OwnAddressType.type_name(x)} + @classmethod + def type_spec(cls): + return {'size': 1, 'mapper': lambda x: OwnAddressType(x).name} # ----------------------------------------------------------------------------- @@ -3374,7 +3364,7 @@ class HCI_LE_Set_Random_Address_Command(HCI_Command): ), }, ), - ('own_address_type', OwnAddressType.TYPE_SPEC), + ('own_address_type', OwnAddressType.type_spec()), ('peer_address_type', Address.ADDRESS_TYPE_SPEC), ('peer_address', Address.parse_address_preceded_by_type), ('advertising_channel_map', 1), @@ -3467,7 +3457,7 @@ class HCI_LE_Set_Advertising_Enable_Command(HCI_Command): ('le_scan_type', 1), ('le_scan_interval', 2), ('le_scan_window', 2), - ('own_address_type', OwnAddressType.TYPE_SPEC), + ('own_address_type', OwnAddressType.type_spec()), ('scanning_filter_policy', 1), ] ) @@ -3506,7 +3496,7 @@ class HCI_LE_Set_Scan_Enable_Command(HCI_Command): ('initiator_filter_policy', 1), ('peer_address_type', Address.ADDRESS_TYPE_SPEC), ('peer_address', Address.parse_address_preceded_by_type), - ('own_address_type', OwnAddressType.TYPE_SPEC), + ('own_address_type', OwnAddressType.type_spec()), ('connection_interval_min', 2), ('connection_interval_max', 2), ('max_latency', 2), @@ -3913,7 +3903,7 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command): ), }, ), - ('own_address_type', OwnAddressType.TYPE_SPEC), + ('own_address_type', OwnAddressType.type_spec()), ('peer_address_type', Address.ADDRESS_TYPE_SPEC), ('peer_address', Address.parse_address_preceded_by_type), ('advertising_filter_policy', 1), @@ -4309,7 +4299,7 @@ class HCI_LE_Extended_Create_Connection_Command(HCI_Command): ('initiator_filter_policy:', self.initiator_filter_policy), ( 'own_address_type: ', - OwnAddressType.type_name(self.own_address_type), + OwnAddressType(self.own_address_type).name, ), ( 'peer_address_type: ', @@ -5190,6 +5180,21 @@ HCI_LE_Meta_Event.subevent_classes[ ] = HCI_LE_Extended_Advertising_Report_Event +# ----------------------------------------------------------------------------- +@HCI_LE_Meta_Event.event( + [ + ('status', 1), + ('advertising_handle', 1), + ('connection_handle', 2), + ('number_completed_extended_advertising_events', 1), + ] +) +class HCI_LE_Advertising_Set_Terminated_Event(HCI_LE_Meta_Event): + ''' + See Bluetooth spec @ 7.7.65.18 LE Advertising Set Terminated Event + ''' + + # ----------------------------------------------------------------------------- @HCI_LE_Meta_Event.event([('connection_handle', 2), ('channel_selection_algorithm', 1)]) class HCI_LE_Channel_Selection_Algorithm_Event(HCI_LE_Meta_Event): diff --git a/bumble/host.py b/bumble/host.py index b06ceba4..3ae2280b 100644 --- a/bumble/host.py +++ b/bumble/host.py @@ -721,6 +721,14 @@ class Host(AbortableEventEmitter): def on_hci_le_extended_advertising_report_event(self, event): self.on_hci_le_advertising_report_event(event) + def on_hci_le_advertising_set_terminated_event(self, event): + self.emit( + 'advertising_set_termination', + event.status, + event.advertising_handle, + event.connection_handle, + ) + def on_hci_le_cis_request_event(self, event): self.emit( 'cis_request',