Merge pull request #593 from zxzxwu/periodic

Support Periodic Advertising
This commit is contained in:
zxzxwu
2024-11-19 17:22:37 +08:00
committed by GitHub
4 changed files with 235 additions and 34 deletions

View File

@@ -557,8 +557,15 @@ class AdvertisingParameters:
# -----------------------------------------------------------------------------
@dataclass
class PeriodicAdvertisingParameters:
# TODO implement this class
pass
periodic_advertising_interval_min: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL
periodic_advertising_interval_max: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL
periodic_advertising_properties: (
hci.HCI_LE_Set_Periodic_Advertising_Parameters_Command.Properties
) = field(
default_factory=lambda: hci.HCI_LE_Set_Periodic_Advertising_Parameters_Command.Properties(
0
)
)
# -----------------------------------------------------------------------------
@@ -575,6 +582,7 @@ class AdvertisingSet(EventEmitter):
periodic_advertising_data: bytes
selected_tx_power: int = 0
enabled: bool = False
periodic_enabled: bool = False
def __post_init__(self) -> None:
super().__init__()
@@ -603,7 +611,7 @@ class AdvertisingSet(EventEmitter):
int(advertising_parameters.primary_advertising_interval_min / 0.625)
),
primary_advertising_interval_max=(
int(advertising_parameters.primary_advertising_interval_min / 0.625)
int(advertising_parameters.primary_advertising_interval_max / 0.625)
),
primary_advertising_channel_map=int(
advertising_parameters.primary_advertising_channel_map
@@ -671,10 +679,26 @@ class AdvertisingSet(EventEmitter):
async def set_periodic_advertising_parameters(
self, advertising_parameters: PeriodicAdvertisingParameters
) -> None:
await self.device.send_command(
hci.HCI_LE_Set_Periodic_Advertising_Parameters_Command(
advertising_handle=self.advertising_handle,
periodic_advertising_interval_min=advertising_parameters.periodic_advertising_interval_min,
periodic_advertising_interval_max=advertising_parameters.periodic_advertising_interval_max,
periodic_advertising_properties=advertising_parameters.periodic_advertising_properties,
),
check_result=True,
)
self.periodic_advertising_parameters = advertising_parameters
async def set_periodic_advertising_data(self, advertising_data: bytes) -> None:
# TODO: send command
await self.device.send_command(
hci.HCI_LE_Set_Periodic_Advertising_Data_Command(
advertising_handle=self.advertising_handle,
operation=hci.HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
advertising_data=advertising_data,
),
check_result=True,
)
self.periodic_advertising_data = advertising_data
async def set_random_address(self, random_address: hci.Address) -> None:
@@ -712,17 +736,6 @@ class AdvertisingSet(EventEmitter):
self.emit('start')
async def start_periodic(self, include_adi: bool = False) -> None:
await self.device.send_command(
hci.HCI_LE_Set_Periodic_Advertising_Enable_Command(
enable=1 | (2 if include_adi else 0),
advertising_handles=self.advertising_handle,
),
check_result=True,
)
self.emit('start_periodic')
async def stop(self) -> None:
await self.device.send_command(
hci.HCI_LE_Set_Extended_Advertising_Enable_Command(
@@ -737,14 +750,31 @@ class AdvertisingSet(EventEmitter):
self.emit('stop')
async def stop_periodic(self) -> None:
async def start_periodic(self, include_adi: bool = False) -> None:
if self.periodic_enabled:
return
await self.device.send_command(
hci.HCI_LE_Set_Periodic_Advertising_Enable_Command(
enable=0,
advertising_handles=self.advertising_handle,
enable=1 | (2 if include_adi else 0),
advertising_handle=self.advertising_handle,
),
check_result=True,
)
self.periodic_enabled = True
self.emit('start_periodic')
async def stop_periodic(self) -> None:
if not self.periodic_enabled:
return
await self.device.send_command(
hci.HCI_LE_Set_Periodic_Advertising_Enable_Command(
enable=0,
advertising_handle=self.advertising_handle,
),
check_result=True,
)
self.periodic_enabled = False
self.emit('stop_periodic')
@@ -2464,14 +2494,27 @@ class Device(CompositeEventEmitter):
if advertising_parameters is None:
advertising_parameters = AdvertisingParameters()
if periodic_advertising_data and periodic_advertising_parameters is None:
periodic_advertising_parameters = PeriodicAdvertisingParameters()
if (
not advertising_parameters.advertising_event_properties.is_legacy
and advertising_data
and scan_response_data
):
raise InvalidArgumentError(
"Extended advertisements can't have both data and scan \
response data"
"Extended advertisements can't have both data and scan response data"
)
if periodic_advertising_parameters and (
advertising_parameters.advertising_event_properties.is_connectable
or advertising_parameters.advertising_event_properties.is_scannable
or advertising_parameters.advertising_event_properties.is_anonymous
or advertising_parameters.advertising_event_properties.is_legacy
):
raise InvalidArgumentError(
"Periodic advertising set cannot be connectable, scannable, anonymous,"
"or legacy"
)
# Allocate a new handle
@@ -2526,12 +2569,14 @@ class Device(CompositeEventEmitter):
await advertising_set.set_scan_response_data(scan_response_data)
if periodic_advertising_parameters:
# TODO: call LE Set Periodic Advertising Parameters command
raise NotImplementedError('periodic advertising not yet supported')
await advertising_set.set_periodic_advertising_parameters(
periodic_advertising_parameters
)
if periodic_advertising_data:
# TODO: call LE Set Periodic Advertising Data command
raise NotImplementedError('periodic advertising not yet supported')
await advertising_set.set_periodic_advertising_data(
periodic_advertising_data
)
except hci.HCI_Error as error:
# Remove the advertising set so that it doesn't stay dangling in the