Add advertiser classes and handle adv set terminated events

* Convert hci.OwnAddressType to enum
* Add LegacyAdvertiser and ExtendedAdvertiser classes
* Rename start/stop_advertising() => start/stop_legacy_advertising()
* Handle HCI_Advertising_Set_Terminated
* Properly restart advertisement on disconnection
This commit is contained in:
Josh Wu
2023-12-06 21:02:30 +08:00
parent 88b4cbdf1a
commit 72ac75a98d
3 changed files with 203 additions and 65 deletions

View File

@@ -437,6 +437,34 @@ class AdvertisingType(IntEnum):
) )
# -----------------------------------------------------------------------------
@dataclass
class LegacyAdvertiser:
advertising_type: AdvertisingType
own_address_type: OwnAddressType
auto_restart: bool
advertising_data: Optional[bytes]
scan_response_data: Optional[bytes]
# -----------------------------------------------------------------------------
@dataclass
class ExtendedAdvertiser(CompositeEventEmitter):
device: Device
handle: int
advertising_properties: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties
own_address_type: OwnAddressType
auto_restart: bool
advertising_data: Optional[bytes]
scan_response_data: Optional[bytes]
def __post_init__(self) -> None:
super().__init__()
async def stop(self) -> None:
await self.device.stop_extended_advertising(self.handle)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class LePhyOptions: class LePhyOptions:
# Coded PHY preference # Coded PHY preference
@@ -658,6 +686,9 @@ class Connection(CompositeEventEmitter):
gatt_client: gatt_client.Client gatt_client: gatt_client.Client
pairing_peer_io_capability: Optional[int] pairing_peer_io_capability: Optional[int]
pairing_peer_authentication_requirements: Optional[int] pairing_peer_authentication_requirements: Optional[int]
advertiser_after_disconnection: Union[
LegacyAdvertiser, ExtendedAdvertiser, None
] = None
@composite_listener @composite_listener
class Listener: class Listener:
@@ -1063,7 +1094,8 @@ class Device(CompositeEventEmitter):
] ]
advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator] advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator]
config: DeviceConfiguration config: DeviceConfiguration
extended_advertising_handles: Set[int] legacy_advertiser: Optional[LegacyAdvertiser]
extended_advertisers: Dict[int, ExtendedAdvertiser]
sco_links: Dict[int, ScoLink] sco_links: Dict[int, ScoLink]
cis_links: Dict[int, CisLink] cis_links: Dict[int, CisLink]
_pending_cis: Dict[int, Tuple[int, int]] _pending_cis: Dict[int, Tuple[int, int]]
@@ -1141,10 +1173,7 @@ class Device(CompositeEventEmitter):
self._host = None self._host = None
self.powered_on = False self.powered_on = False
self.advertising = False
self.advertising_type = None
self.auto_restart_inquiry = True self.auto_restart_inquiry = True
self.auto_restart_advertising = False
self.command_timeout = 10 # seconds self.command_timeout = 10 # seconds
self.gatt_server = gatt_server.Server(self) self.gatt_server = gatt_server.Server(self)
self.sdp_server = sdp.Server(self) self.sdp_server = sdp.Server(self)
@@ -1168,10 +1197,10 @@ class Device(CompositeEventEmitter):
self.classic_pending_accepts = { self.classic_pending_accepts = {
Address.ANY: [] Address.ANY: []
} # Futures, by BD address OR [Futures] for Address.ANY } # Futures, by BD address OR [Futures] for Address.ANY
self.extended_advertising_handles = set() self.legacy_advertiser = None
self.extended_advertisers = {}
# Own address type cache # Own address type cache
self.advertising_own_address_type = None
self.connect_own_address_type = None self.connect_own_address_type = None
# Use the initial config or a default # Use the initial config or a default
@@ -1579,6 +1608,7 @@ class Device(CompositeEventEmitter):
return self.host.supports_le_feature(feature_map[phy]) return self.host.supports_le_feature(feature_map[phy])
@deprecated("Please use start_legacy_advertising.")
async def start_advertising( async def start_advertising(
self, self,
advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
@@ -1586,15 +1616,49 @@ class Device(CompositeEventEmitter):
own_address_type: int = OwnAddressType.RANDOM, own_address_type: int = OwnAddressType.RANDOM,
auto_restart: bool = False, auto_restart: bool = False,
) -> None: ) -> None:
await self.start_legacy_advertising(
advertising_type=advertising_type,
target=target,
own_address_type=OwnAddressType(own_address_type),
auto_restart=auto_restart,
)
async def start_legacy_advertising(
self,
advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
target: Optional[Address] = None,
own_address_type: OwnAddressType = OwnAddressType.RANDOM,
auto_restart: bool = False,
advertising_data: Optional[bytes] = None,
scan_response_data: Optional[bytes] = None,
) -> LegacyAdvertiser:
"""Starts an legacy advertisement.
Args:
advertising_type: Advertising type passed to HCI_LE_Set_Advertising_Parameters_Command.
target: Directed advertising target. Directed type should be set in advertising_type arg.
own_address_type: own address type to use in the advertising.
auto_restart: whether the advertisement will be restarted after disconnection.
scan_response_data: raw scan response.
advertising_data: raw advertising data.
Returns:
LegacyAdvertiser object containing the metadata of advertisement.
"""
if self.extended_advertisers:
logger.warning(
'Trying to start Legacy and Extended Advertising at the same time!'
)
# If we're advertising, stop first # If we're advertising, stop first
if self.advertising: if self.legacy_advertiser:
await self.stop_advertising() await self.stop_advertising()
# Set/update the advertising data if the advertising type allows it # Set/update the advertising data if the advertising type allows it
if advertising_type.has_data: if advertising_type.has_data:
await self.send_command( await self.send_command(
HCI_LE_Set_Advertising_Data_Command( HCI_LE_Set_Advertising_Data_Command(
advertising_data=self.advertising_data advertising_data=advertising_data or self.advertising_data or b''
), ),
check_result=True, check_result=True,
) )
@@ -1603,7 +1667,9 @@ class Device(CompositeEventEmitter):
if advertising_type.is_scannable: if advertising_type.is_scannable:
await self.send_command( await self.send_command(
HCI_LE_Set_Scan_Response_Data_Command( HCI_LE_Set_Scan_Response_Data_Command(
scan_response_data=self.scan_response_data scan_response_data=scan_response_data
or self.scan_response_data
or b''
), ),
check_result=True, check_result=True,
) )
@@ -1640,45 +1706,56 @@ class Device(CompositeEventEmitter):
check_result=True, check_result=True,
) )
self.advertising_type = advertising_type self.legacy_advertiser = LegacyAdvertiser(
self.advertising_own_address_type = own_address_type advertising_type=advertising_type,
self.advertising = True own_address_type=own_address_type,
self.auto_restart_advertising = auto_restart auto_restart=auto_restart,
advertising_data=advertising_data,
scan_response_data=scan_response_data,
)
return self.legacy_advertiser
@deprecated("Please use stop_legacy_advertising.")
async def stop_advertising(self) -> None: async def stop_advertising(self) -> None:
await self.stop_legacy_advertising()
async def stop_legacy_advertising(self) -> None:
# Disable advertising # Disable advertising
if self.advertising: if self.legacy_advertiser:
await self.send_command( await self.send_command(
HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0), HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0),
check_result=True, check_result=True,
) )
self.advertising_type = None self.legacy_advertiser = None
self.advertising_own_address_type = None
self.advertising = False
self.auto_restart_advertising = False
@experimental('Extended Advertising is still experimental - Might be changed soon.') @experimental('Extended Advertising is still experimental - Might be changed soon.')
async def start_extended_advertising( async def start_extended_advertising(
self, self,
advertising_properties: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties = HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING, advertising_properties: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties = HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING,
target: Address = Address.ANY, target: Address = Address.ANY,
own_address_type: int = OwnAddressType.RANDOM, own_address_type: OwnAddressType = OwnAddressType.RANDOM,
scan_response: Optional[bytes] = None, auto_restart: bool = True,
advertising_data: Optional[bytes] = None, advertising_data: Optional[bytes] = None,
) -> int: scan_response_data: Optional[bytes] = None,
) -> ExtendedAdvertiser:
"""Starts an extended advertising set. """Starts an extended advertising set.
Args: Args:
advertising_properties: Properties to pass in HCI_LE_Set_Extended_Advertising_Parameters_Command advertising_properties: Properties to pass in HCI_LE_Set_Extended_Advertising_Parameters_Command
target: Directed advertising target. Directed property should be set in advertising_properties arg. target: Directed advertising target. Directed property should be set in advertising_properties arg.
own_address_type: own address type to use in the advertising. own_address_type: own address type to use in the advertising.
scan_response: raw scan response. When a non-none value is set, HCI_LE_Set_Extended_Scan_Response_Data_Command will be sent. auto_restart: whether the advertisement will be restarted after disconnection.
advertising_data: raw advertising data. When a non-none value is set, HCI_LE_Set_Advertising_Set_Random_Address_Command will be sent. advertising_data: raw advertising data. When a non-none value is set, HCI_LE_Set_Advertising_Set_Random_Address_Command will be sent.
scan_response_data: raw scan response. When a non-none value is set, HCI_LE_Set_Extended_Scan_Response_Data_Command will be sent.
Returns: Returns:
Handle of the new advertising set. ExtendedAdvertiser object containing the metadata of advertisement.
""" """
if self.legacy_advertiser:
logger.warning(
'Trying to start Legacy and Extended Advertising at the same time!'
)
adv_handle = -1 adv_handle = -1
# Find a free handle # Find a free handle
@@ -1686,7 +1763,7 @@ class Device(CompositeEventEmitter):
DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE, DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE,
DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1, DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1,
): ):
if i not in self.extended_advertising_handles: if i not in self.extended_advertisers.keys():
adv_handle = i adv_handle = i
break break
@@ -1733,13 +1810,13 @@ class Device(CompositeEventEmitter):
) )
# Set the scan response if present # Set the scan response if present
if scan_response is not None: if scan_response_data is not None:
await self.send_command( await self.send_command(
HCI_LE_Set_Extended_Scan_Response_Data_Command( HCI_LE_Set_Extended_Scan_Response_Data_Command(
advertising_handle=adv_handle, advertising_handle=adv_handle,
operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
fragment_preference=0x01, # Should not fragment fragment_preference=0x01, # Should not fragment
scan_response_data=scan_response, scan_response_data=scan_response_data,
), ),
check_result=True, check_result=True,
) )
@@ -1774,8 +1851,16 @@ class Device(CompositeEventEmitter):
) )
raise error raise error
self.extended_advertising_handles.add(adv_handle) advertiser = self.extended_advertisers[adv_handle] = ExtendedAdvertiser(
return adv_handle device=self,
handle=adv_handle,
advertising_properties=advertising_properties,
own_address_type=own_address_type,
auto_restart=auto_restart,
advertising_data=advertising_data,
scan_response_data=scan_response_data,
)
return advertiser
@experimental('Extended Advertising is still experimental - Might be changed soon.') @experimental('Extended Advertising is still experimental - Might be changed soon.')
async def stop_extended_advertising(self, adv_handle: int) -> None: async def stop_extended_advertising(self, adv_handle: int) -> None:
@@ -1799,11 +1884,11 @@ class Device(CompositeEventEmitter):
HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle), HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle),
check_result=True, check_result=True,
) )
self.extended_advertising_handles.remove(adv_handle) del self.extended_advertisers[adv_handle]
@property @property
def is_advertising(self): def is_advertising(self):
return self.advertising return self.legacy_advertiser or self.extended_advertisers
async def start_scanning( async def start_scanning(
self, self,
@@ -3144,13 +3229,17 @@ class Device(CompositeEventEmitter):
# Guess which own address type is used for this connection. # Guess which own address type is used for this connection.
# This logic is somewhat correct but may need to be improved # This logic is somewhat correct but may need to be improved
# when multiple advertising are run simultaneously. # when multiple advertising are run simultaneously.
advertiser = None
if self.connect_own_address_type is not None: if self.connect_own_address_type is not None:
own_address_type = self.connect_own_address_type own_address_type = self.connect_own_address_type
elif self.legacy_advertiser:
own_address_type = self.legacy_advertiser.own_address_type
# Store advertiser for restarting - it's only required for legacy, since
# extended advertisement produces HCI_Advertising_Set_Terminated.
advertiser = self.legacy_advertiser
else: else:
own_address_type = self.advertising_own_address_type # For extended advertisement, determining own address type later.
own_address_type = OwnAddressType.RANDOM
# We are no longer advertising
self.advertising = False
if own_address_type in ( if own_address_type in (
OwnAddressType.PUBLIC, OwnAddressType.PUBLIC,
@@ -3172,6 +3261,7 @@ class Device(CompositeEventEmitter):
connection_parameters, connection_parameters,
ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY), ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY),
) )
connection.advertiser_after_disconnection = advertiser
self.connections[connection_handle] = connection self.connections[connection_handle] = connection
# If supported, read which PHY we're connected with before # If supported, read which PHY we're connected with before
@@ -3203,10 +3293,10 @@ class Device(CompositeEventEmitter):
# For directed advertising, this means a timeout # For directed advertising, this means a timeout
if ( if (
transport == BT_LE_TRANSPORT transport == BT_LE_TRANSPORT
and self.advertising and self.legacy_advertiser
and self.advertising_type.is_directed and self.legacy_advertiser.advertising_type.is_directed
): ):
self.advertising = False self.legacy_advertiser = None
# Notify listeners # Notify listeners
error = core.ConnectionError( error = core.ConnectionError(
@@ -3268,16 +3358,28 @@ class Device(CompositeEventEmitter):
self.gatt_server.on_disconnection(connection) self.gatt_server.on_disconnection(connection)
# Restart advertising if auto-restart is enabled # Restart advertising if auto-restart is enabled
if self.auto_restart_advertising: if advertiser := connection.advertiser_after_disconnection:
logger.debug('restarting advertising') logger.debug('restarting advertising')
self.abort_on( if isinstance(advertiser, LegacyAdvertiser):
'flush', self.abort_on(
self.start_advertising( 'flush',
advertising_type=self.advertising_type, # type: ignore[arg-type] self.start_legacy_advertising(
own_address_type=self.advertising_own_address_type, # type: ignore[arg-type] advertising_type=advertiser.advertising_type,
auto_restart=True, own_address_type=advertiser.own_address_type,
), auto_restart=True,
) ),
)
elif isinstance(advertiser, ExtendedAdvertiser):
self.abort_on(
'flush',
self.start_extended_advertising(
advertising_properties=advertiser.advertising_properties,
own_address_type=advertiser.own_address_type,
advertising_data=advertiser.advertising_data,
scan_response_data=advertiser.scan_response_data,
auto_restart=True,
),
)
elif sco_link := self.sco_links.pop(connection_handle, None): elif sco_link := self.sco_links.pop(connection_handle, None):
sco_link.emit('disconnection', reason) sco_link.emit('disconnection', reason)
elif cis_link := self.cis_links.pop(connection_handle, None): elif cis_link := self.cis_links.pop(connection_handle, None):
@@ -3600,6 +3702,29 @@ class Device(CompositeEventEmitter):
if sco_link := self.sco_links.get(sco_handle, None): if sco_link := self.sco_links.get(sco_handle, None):
sco_link.emit('pdu', packet) sco_link.emit('pdu', packet)
# [LE only]
@host_event_handler
@experimental('Only for testing')
def on_advertising_set_termination(
self,
status: int,
advertising_handle: int,
connection_handle: int,
) -> None:
if status == HCI_SUCCESS:
connection = self.lookup_connection(connection_handle)
if advertiser := self.extended_advertisers.pop(advertising_handle, None):
if connection:
connection.advertiser_after_disconnection = advertiser
if advertiser.own_address_type in (
OwnAddressType.PUBLIC,
OwnAddressType.RESOLVABLE_OR_PUBLIC,
):
connection.self_address = self.public_address
else:
connection.self_address = self.random_address
advertiser.emit('termination', status)
# [LE only] # [LE only]
@host_event_handler @host_event_handler
@with_connection_from_handle @with_connection_from_handle

View File

@@ -1963,25 +1963,15 @@ Address.ANY_RANDOM = Address(b"\x00\x00\x00\x00\x00\x00", Address.RANDOM_DEVICE_
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class OwnAddressType: class OwnAddressType(enum.IntEnum):
PUBLIC = 0 PUBLIC = 0
RANDOM = 1 RANDOM = 1
RESOLVABLE_OR_PUBLIC = 2 RESOLVABLE_OR_PUBLIC = 2
RESOLVABLE_OR_RANDOM = 3 RESOLVABLE_OR_RANDOM = 3
TYPE_NAMES = { @classmethod
PUBLIC: 'PUBLIC', def type_spec(cls):
RANDOM: 'RANDOM', return {'size': 1, 'mapper': lambda x: OwnAddressType(x).name}
RESOLVABLE_OR_PUBLIC: 'RESOLVABLE_OR_PUBLIC',
RESOLVABLE_OR_RANDOM: 'RESOLVABLE_OR_RANDOM',
}
@staticmethod
def type_name(type_id):
return name_or_number(OwnAddressType.TYPE_NAMES, type_id)
# pylint: disable-next=unnecessary-lambda
TYPE_SPEC = {'size': 1, 'mapper': lambda x: OwnAddressType.type_name(x)}
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -3374,7 +3364,7 @@ class HCI_LE_Set_Random_Address_Command(HCI_Command):
), ),
}, },
), ),
('own_address_type', OwnAddressType.TYPE_SPEC), ('own_address_type', OwnAddressType.type_spec()),
('peer_address_type', Address.ADDRESS_TYPE_SPEC), ('peer_address_type', Address.ADDRESS_TYPE_SPEC),
('peer_address', Address.parse_address_preceded_by_type), ('peer_address', Address.parse_address_preceded_by_type),
('advertising_channel_map', 1), ('advertising_channel_map', 1),
@@ -3467,7 +3457,7 @@ class HCI_LE_Set_Advertising_Enable_Command(HCI_Command):
('le_scan_type', 1), ('le_scan_type', 1),
('le_scan_interval', 2), ('le_scan_interval', 2),
('le_scan_window', 2), ('le_scan_window', 2),
('own_address_type', OwnAddressType.TYPE_SPEC), ('own_address_type', OwnAddressType.type_spec()),
('scanning_filter_policy', 1), ('scanning_filter_policy', 1),
] ]
) )
@@ -3506,7 +3496,7 @@ class HCI_LE_Set_Scan_Enable_Command(HCI_Command):
('initiator_filter_policy', 1), ('initiator_filter_policy', 1),
('peer_address_type', Address.ADDRESS_TYPE_SPEC), ('peer_address_type', Address.ADDRESS_TYPE_SPEC),
('peer_address', Address.parse_address_preceded_by_type), ('peer_address', Address.parse_address_preceded_by_type),
('own_address_type', OwnAddressType.TYPE_SPEC), ('own_address_type', OwnAddressType.type_spec()),
('connection_interval_min', 2), ('connection_interval_min', 2),
('connection_interval_max', 2), ('connection_interval_max', 2),
('max_latency', 2), ('max_latency', 2),
@@ -3913,7 +3903,7 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command):
), ),
}, },
), ),
('own_address_type', OwnAddressType.TYPE_SPEC), ('own_address_type', OwnAddressType.type_spec()),
('peer_address_type', Address.ADDRESS_TYPE_SPEC), ('peer_address_type', Address.ADDRESS_TYPE_SPEC),
('peer_address', Address.parse_address_preceded_by_type), ('peer_address', Address.parse_address_preceded_by_type),
('advertising_filter_policy', 1), ('advertising_filter_policy', 1),
@@ -4309,7 +4299,7 @@ class HCI_LE_Extended_Create_Connection_Command(HCI_Command):
('initiator_filter_policy:', self.initiator_filter_policy), ('initiator_filter_policy:', self.initiator_filter_policy),
( (
'own_address_type: ', 'own_address_type: ',
OwnAddressType.type_name(self.own_address_type), OwnAddressType(self.own_address_type).name,
), ),
( (
'peer_address_type: ', 'peer_address_type: ',
@@ -5190,6 +5180,21 @@ HCI_LE_Meta_Event.subevent_classes[
] = HCI_LE_Extended_Advertising_Report_Event ] = HCI_LE_Extended_Advertising_Report_Event
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event(
[
('status', 1),
('advertising_handle', 1),
('connection_handle', 2),
('number_completed_extended_advertising_events', 1),
]
)
class HCI_LE_Advertising_Set_Terminated_Event(HCI_LE_Meta_Event):
'''
See Bluetooth spec @ 7.7.65.18 LE Advertising Set Terminated Event
'''
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event([('connection_handle', 2), ('channel_selection_algorithm', 1)]) @HCI_LE_Meta_Event.event([('connection_handle', 2), ('channel_selection_algorithm', 1)])
class HCI_LE_Channel_Selection_Algorithm_Event(HCI_LE_Meta_Event): class HCI_LE_Channel_Selection_Algorithm_Event(HCI_LE_Meta_Event):

View File

@@ -721,6 +721,14 @@ class Host(AbortableEventEmitter):
def on_hci_le_extended_advertising_report_event(self, event): def on_hci_le_extended_advertising_report_event(self, event):
self.on_hci_le_advertising_report_event(event) self.on_hci_le_advertising_report_event(event)
def on_hci_le_advertising_set_terminated_event(self, event):
self.emit(
'advertising_set_termination',
event.status,
event.advertising_handle,
event.connection_handle,
)
def on_hci_le_cis_request_event(self, event): def on_hci_le_cis_request_event(self, event):
self.emit( self.emit(
'cis_request', 'cis_request',