This commit is contained in:
Gilles Boccon-Gibod
2023-12-26 14:47:50 -08:00
parent d3273ffa8c
commit bad9ce272c
7 changed files with 47 additions and 23 deletions

View File

@@ -664,6 +664,7 @@ class AdvertisingSet(EventEmitter):
self.advertising_data = advertising_data self.advertising_data = advertising_data
async def set_scan_response_data(self, scan_response_data: bytes) -> None: async def set_scan_response_data(self, scan_response_data: bytes) -> None:
# pylint: disable=line-too-long
if ( if (
scan_response_data scan_response_data
and not self.advertising_parameters.advertising_event_properties.is_scannable and not self.advertising_parameters.advertising_event_properties.is_scannable
@@ -674,7 +675,6 @@ class AdvertisingSet(EventEmitter):
) )
return return
# pylint: disable=line-too-long
await self.device.send_command( await self.device.send_command(
HCI_LE_Set_Extended_Scan_Response_Data_Command( HCI_LE_Set_Extended_Scan_Response_Data_Command(
advertising_handle=self.advertising_handle, advertising_handle=self.advertising_handle,
@@ -2016,6 +2016,7 @@ class Device(CompositeEventEmitter):
if self.supports_le_extended_advertising: if self.supports_le_extended_advertising:
# Use extended advertising commands with legacy PDUs. # Use extended advertising commands with legacy PDUs.
self.legacy_advertising_set = await self.create_advertising_set( self.legacy_advertising_set = await self.create_advertising_set(
auto_start=True,
auto_restart=auto_restart, auto_restart=auto_restart,
random_address=self.random_address, random_address=self.random_address,
advertising_parameters=AdvertisingParameters( advertising_parameters=AdvertisingParameters(
@@ -2036,8 +2037,6 @@ class Device(CompositeEventEmitter):
self.scan_response_data if advertising_type.is_scannable else b'' self.scan_response_data if advertising_type.is_scannable else b''
), ),
) )
await self.legacy_advertising_set.start()
else: else:
# Use legacy commands. # Use legacy commands.
self.legacy_advertiser = LegacyAdvertiser( self.legacy_advertiser = LegacyAdvertiser(
@@ -2068,8 +2067,37 @@ class Device(CompositeEventEmitter):
scan_response_data: bytes = b'', scan_response_data: bytes = b'',
periodic_advertising_parameters: Optional[PeriodicAdvertisingParameters] = None, periodic_advertising_parameters: Optional[PeriodicAdvertisingParameters] = None,
periodic_advertising_data: bytes = b'', periodic_advertising_data: bytes = b'',
auto_start: bool = True,
auto_restart: bool = False, auto_restart: bool = False,
) -> AdvertisingSet: ) -> AdvertisingSet:
"""
Create an advertising set.
This method allows the creation of advertising sets for controllers that
support extended advertising.
Args:
advertising_parameters:
The parameters to use for this set. If None, default parameters are used.
random_address:
The random address to use (only relevant when the parameters specify that
own_address_type is random).
advertising_data:
Initial value for the set's advertising data.
scan_response_data:
Initial value for the set's scan response data.
periodic_advertising_parameters:
The parameters to use for periodic advertising (if needed).
periodic_advertising_data:
Initial value for the set's periodic advertising data.
auto_start:
True if the set should be automatically started upon creation.
auto_restart:
True if the set should be automatically restated after a disconnection.
Returns:
An AdvertisingSet instance.
"""
# Allocate a new handle # Allocate a new handle
try: try:
advertising_handle = next( advertising_handle = next(
@@ -2145,6 +2173,15 @@ class Device(CompositeEventEmitter):
# Remember the set. # Remember the set.
self.extended_advertising_sets[advertising_handle] = advertising_set self.extended_advertising_sets[advertising_handle] = advertising_set
# Try to start the set if requested.
if auto_start:
try:
await advertising_set.start()
except Exception as error:
logger.exception(f'failed to start advertising set: {error}')
await advertising_set.remove()
raise
return advertising_set return advertising_set
@property @property

View File

@@ -63,7 +63,6 @@ async def main() -> None:
await asyncio.gather(*[device.power_on() for device in devices]) await asyncio.gather(*[device.power_on() for device in devices])
advertising_set = await devices[0].create_advertising_set() advertising_set = await devices[0].create_advertising_set()
await advertising_set.start()
connection = await devices[1].connect( connection = await devices[1].connect(
devices[0].public_address, own_address_type=OwnAddressType.PUBLIC devices[0].public_address, own_address_type=OwnAddressType.PUBLIC

View File

@@ -98,13 +98,7 @@ async def main() -> None:
) )
+ csis.get_advertising_data() + csis.get_advertising_data()
) )
await device.start_extended_advertising( await device.create_advertising_set(advertising_data=advertising_data)
advertising_properties=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
),
own_address_type=OwnAddressType.RANDOM,
advertising_data=advertising_data,
)
await asyncio.gather( await asyncio.gather(
*[hci_transport.source.terminated for hci_transport in hci_transports] *[hci_transport.source.terminated for hci_transport in hci_transports]

View File

@@ -59,14 +59,12 @@ async def main() -> None:
sys.argv[1], hci_transport.source, hci_transport.sink sys.argv[1], hci_transport.source, hci_transport.sink
) )
await device.power_on() await device.power_on()
await ( await device.create_advertising_set(
await device.create_advertising_set( advertising_parameters=AdvertisingParameters(
advertising_parameters=AdvertisingParameters( advertising_event_properties=advertising_properties,
advertising_event_properties=advertising_properties, peer_address=peer_address,
peer_address=peer_address,
)
) )
).start() )
await hci_transport.source.terminated await hci_transport.source.terminated

View File

@@ -59,7 +59,6 @@ async def main() -> None:
advertising_data=bytes(advertising_data1), advertising_data=bytes(advertising_data1),
) )
print("Selected TX power 1:", set1.selected_tx_power) print("Selected TX power 1:", set1.selected_tx_power)
await set1.start()
advertising_data2 = AdvertisingData( advertising_data2 = AdvertisingData(
[(AdvertisingData.COMPLETE_LOCAL_NAME, "Bumble 2".encode("utf-8"))] [(AdvertisingData.COMPLETE_LOCAL_NAME, "Bumble 2".encode("utf-8"))]
@@ -70,6 +69,7 @@ async def main() -> None:
random_address=Address("F0:F0:F0:F0:F0:F1"), random_address=Address("F0:F0:F0:F0:F0:F1"),
advertising_parameters=AdvertisingParameters(), advertising_parameters=AdvertisingParameters(),
advertising_data=bytes(advertising_data2), advertising_data=bytes(advertising_data2),
auto_start=False,
auto_restart=True, auto_restart=True,
) )
print("Selected TX power 2:", set2.selected_tx_power) print("Selected TX power 2:", set2.selected_tx_power)
@@ -90,7 +90,6 @@ async def main() -> None:
scan_response_data=bytes(scan_response_data3), scan_response_data=bytes(scan_response_data3),
) )
print("Selected TX power 3:", set2.selected_tx_power) print("Selected TX power 3:", set2.selected_tx_power)
await set3.start()
await hci_transport.source.terminated await hci_transport.source.terminated

View File

@@ -181,7 +181,6 @@ async def main() -> None:
advertising_set = await device.create_advertising_set( advertising_set = await device.create_advertising_set(
advertising_data=advertising_data, advertising_data=advertising_data,
) )
await advertising_set.start()
await hci_transport.source.terminated await hci_transport.source.terminated

View File

@@ -327,7 +327,6 @@ async def test_extended_advertising():
# Start advertising # Start advertising
advertising_set = await device.create_advertising_set() advertising_set = await device.create_advertising_set()
await advertising_set.start()
assert device.extended_advertising_sets assert device.extended_advertising_sets
assert advertising_set.enabled assert advertising_set.enabled
@@ -348,7 +347,6 @@ async def test_extended_advertising_connection(own_address_type):
advertising_set = await device.create_advertising_set( advertising_set = await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(own_address_type=own_address_type) advertising_parameters=AdvertisingParameters(own_address_type=own_address_type)
) )
await advertising_set.start()
device.on_connection( device.on_connection(
0x0001, 0x0001,
BT_LE_TRANSPORT, BT_LE_TRANSPORT,