Support Periodic Advertising

This commit is contained in:
Josh Wu
2024-11-16 03:38:44 +08:00
parent 5e959d638e
commit bbcd14dbf0
4 changed files with 235 additions and 34 deletions

View File

@@ -1543,6 +1543,41 @@ class Controller:
} }
return bytes([HCI_SUCCESS]) return bytes([HCI_SUCCESS])
def on_hci_le_set_advertising_set_random_address_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.52 LE Set Advertising Set Random Address
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_extended_advertising_parameters_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.53 LE Set Extended Advertising Parameters
Command
'''
return bytes([HCI_SUCCESS, 0])
def on_hci_le_set_extended_advertising_data_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.54 LE Set Extended Advertising Data
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_extended_scan_response_data_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.55 LE Set Extended Scan Response Data
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_extended_advertising_enable_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.56 LE Set Extended Advertising Enable
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_read_maximum_advertising_data_length_command(self, _command): def on_hci_le_read_maximum_advertising_data_length_command(self, _command):
''' '''
See Bluetooth spec Vol 4, Part E - 7.8.57 LE Read Maximum Advertising Data See Bluetooth spec Vol 4, Part E - 7.8.57 LE Read Maximum Advertising Data
@@ -1557,6 +1592,27 @@ class Controller:
''' '''
return struct.pack('<BB', HCI_SUCCESS, 0xF0) return struct.pack('<BB', HCI_SUCCESS, 0xF0)
def on_hci_le_set_periodic_advertising_parameters_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.61 LE Set Periodic Advertising Parameters
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_periodic_advertising_data_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.62 LE Set Periodic Advertising Data
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_periodic_advertising_enable_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.63 LE Set Periodic Advertising Enable
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_read_transmit_power_command(self, _command): def on_hci_le_read_transmit_power_command(self, _command):
''' '''
See Bluetooth spec Vol 4, Part E - 7.8.74 LE Read Transmit Power Command See Bluetooth spec Vol 4, Part E - 7.8.74 LE Read Transmit Power Command

View File

@@ -557,8 +557,15 @@ class AdvertisingParameters:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@dataclass @dataclass
class PeriodicAdvertisingParameters: class PeriodicAdvertisingParameters:
# TODO implement this class periodic_advertising_interval_min: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL
pass periodic_advertising_interval_max: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL
periodic_advertising_properties: (
hci.HCI_LE_Set_Periodic_Advertising_Parameters_Command.Properties
) = field(
default_factory=lambda: hci.HCI_LE_Set_Periodic_Advertising_Parameters_Command.Properties(
0
)
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -575,6 +582,7 @@ class AdvertisingSet(EventEmitter):
periodic_advertising_data: bytes periodic_advertising_data: bytes
selected_tx_power: int = 0 selected_tx_power: int = 0
enabled: bool = False enabled: bool = False
periodic_enabled: bool = False
def __post_init__(self) -> None: def __post_init__(self) -> None:
super().__init__() super().__init__()
@@ -603,7 +611,7 @@ class AdvertisingSet(EventEmitter):
int(advertising_parameters.primary_advertising_interval_min / 0.625) int(advertising_parameters.primary_advertising_interval_min / 0.625)
), ),
primary_advertising_interval_max=( primary_advertising_interval_max=(
int(advertising_parameters.primary_advertising_interval_min / 0.625) int(advertising_parameters.primary_advertising_interval_max / 0.625)
), ),
primary_advertising_channel_map=int( primary_advertising_channel_map=int(
advertising_parameters.primary_advertising_channel_map advertising_parameters.primary_advertising_channel_map
@@ -671,10 +679,26 @@ class AdvertisingSet(EventEmitter):
async def set_periodic_advertising_parameters( async def set_periodic_advertising_parameters(
self, advertising_parameters: PeriodicAdvertisingParameters self, advertising_parameters: PeriodicAdvertisingParameters
) -> None: ) -> None:
await self.device.send_command(
hci.HCI_LE_Set_Periodic_Advertising_Parameters_Command(
advertising_handle=self.advertising_handle,
periodic_advertising_interval_min=advertising_parameters.periodic_advertising_interval_min,
periodic_advertising_interval_max=advertising_parameters.periodic_advertising_interval_max,
periodic_advertising_properties=advertising_parameters.periodic_advertising_properties,
),
check_result=True,
)
self.periodic_advertising_parameters = advertising_parameters self.periodic_advertising_parameters = advertising_parameters
async def set_periodic_advertising_data(self, advertising_data: bytes) -> None: async def set_periodic_advertising_data(self, advertising_data: bytes) -> None:
# TODO: send command await self.device.send_command(
hci.HCI_LE_Set_Periodic_Advertising_Data_Command(
advertising_handle=self.advertising_handle,
operation=hci.HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
advertising_data=advertising_data,
),
check_result=True,
)
self.periodic_advertising_data = advertising_data self.periodic_advertising_data = advertising_data
async def set_random_address(self, random_address: hci.Address) -> None: async def set_random_address(self, random_address: hci.Address) -> None:
@@ -712,17 +736,6 @@ class AdvertisingSet(EventEmitter):
self.emit('start') self.emit('start')
async def start_periodic(self, include_adi: bool = False) -> None:
await self.device.send_command(
hci.HCI_LE_Set_Periodic_Advertising_Enable_Command(
enable=1 | (2 if include_adi else 0),
advertising_handles=self.advertising_handle,
),
check_result=True,
)
self.emit('start_periodic')
async def stop(self) -> None: async def stop(self) -> None:
await self.device.send_command( await self.device.send_command(
hci.HCI_LE_Set_Extended_Advertising_Enable_Command( hci.HCI_LE_Set_Extended_Advertising_Enable_Command(
@@ -737,14 +750,31 @@ class AdvertisingSet(EventEmitter):
self.emit('stop') self.emit('stop')
async def stop_periodic(self) -> None: async def start_periodic(self, include_adi: bool = False) -> None:
if self.periodic_enabled:
return
await self.device.send_command( await self.device.send_command(
hci.HCI_LE_Set_Periodic_Advertising_Enable_Command( hci.HCI_LE_Set_Periodic_Advertising_Enable_Command(
enable=0, enable=1 | (2 if include_adi else 0),
advertising_handles=self.advertising_handle, advertising_handle=self.advertising_handle,
), ),
check_result=True, check_result=True,
) )
self.periodic_enabled = True
self.emit('start_periodic')
async def stop_periodic(self) -> None:
if not self.periodic_enabled:
return
await self.device.send_command(
hci.HCI_LE_Set_Periodic_Advertising_Enable_Command(
enable=0,
advertising_handle=self.advertising_handle,
),
check_result=True,
)
self.periodic_enabled = False
self.emit('stop_periodic') self.emit('stop_periodic')
@@ -2460,14 +2490,27 @@ class Device(CompositeEventEmitter):
if advertising_parameters is None: if advertising_parameters is None:
advertising_parameters = AdvertisingParameters() advertising_parameters = AdvertisingParameters()
if periodic_advertising_data and periodic_advertising_parameters is None:
periodic_advertising_parameters = PeriodicAdvertisingParameters()
if ( if (
not advertising_parameters.advertising_event_properties.is_legacy not advertising_parameters.advertising_event_properties.is_legacy
and advertising_data and advertising_data
and scan_response_data and scan_response_data
): ):
raise InvalidArgumentError( raise InvalidArgumentError(
"Extended advertisements can't have both data and scan \ "Extended advertisements can't have both data and scan response data"
response data" )
if periodic_advertising_parameters and (
advertising_parameters.advertising_event_properties.is_connectable
or advertising_parameters.advertising_event_properties.is_scannable
or advertising_parameters.advertising_event_properties.is_anonymous
or advertising_parameters.advertising_event_properties.is_legacy
):
raise InvalidArgumentError(
"Periodic advertising set cannot be connectable, scannable, anonymous,"
"or legacy"
) )
# Allocate a new handle # Allocate a new handle
@@ -2522,12 +2565,14 @@ class Device(CompositeEventEmitter):
await advertising_set.set_scan_response_data(scan_response_data) await advertising_set.set_scan_response_data(scan_response_data)
if periodic_advertising_parameters: if periodic_advertising_parameters:
# TODO: call LE Set Periodic Advertising Parameters command await advertising_set.set_periodic_advertising_parameters(
raise NotImplementedError('periodic advertising not yet supported') periodic_advertising_parameters
)
if periodic_advertising_data: if periodic_advertising_data:
# TODO: call LE Set Periodic Advertising Data command await advertising_set.set_periodic_advertising_data(
raise NotImplementedError('periodic advertising not yet supported') periodic_advertising_data
)
except hci.HCI_Error as error: except hci.HCI_Error as error:
# Remove the advertising set so that it doesn't stay dangling in the # Remove the advertising set so that it doesn't stay dangling in the

View File

@@ -4331,6 +4331,61 @@ class HCI_LE_Clear_Advertising_Sets_Command(HCI_Command):
''' '''
# -----------------------------------------------------------------------------
@HCI_Command.command(
[
('advertising_handle', 1),
('periodic_advertising_interval_min', 2),
('periodic_advertising_interval_max', 2),
('periodic_advertising_properties', 2),
]
)
class HCI_LE_Set_Periodic_Advertising_Parameters_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.61 LE Set Periodic Advertising Parameters command
'''
class Properties(enum.IntFlag):
INCLUDE_TX_POWER = 1 << 6
advertising_handle: int
periodic_advertising_interval_min: int
periodic_advertising_interval_max: int
periodic_advertising_properties: int
# -----------------------------------------------------------------------------
@HCI_Command.command(
[
('advertising_handle', 1),
(
'operation',
{
'size': 1,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.Operation(
x
).name,
},
),
(
'advertising_data',
{
'parser': HCI_Object.parse_length_prefixed_bytes,
'serializer': HCI_Object.serialize_length_prefixed_bytes,
},
),
]
)
class HCI_LE_Set_Periodic_Advertising_Data_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.62 LE Set Periodic Advertising Data command
'''
advertising_handle: int
operation: int
advertising_data: bytes
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_Command.command([('enable', 1), ('advertising_handle', 1)]) @HCI_Command.command([('enable', 1), ('advertising_handle', 1)])
class HCI_LE_Set_Periodic_Advertising_Enable_Command(HCI_Command): class HCI_LE_Set_Periodic_Advertising_Enable_Command(HCI_Command):

View File

@@ -19,9 +19,7 @@ import asyncio
import functools import functools
import logging import logging
import os import os
from types import LambdaType
import pytest import pytest
from unittest import mock
from bumble.core import ( from bumble.core import (
BT_BR_EDR_TRANSPORT, BT_BR_EDR_TRANSPORT,
@@ -29,7 +27,13 @@ from bumble.core import (
BT_PERIPHERAL_ROLE, BT_PERIPHERAL_ROLE,
ConnectionParameters, ConnectionParameters,
) )
from bumble.device import AdvertisingParameters, Connection, Device from bumble.device import (
AdvertisingEventProperties,
AdvertisingParameters,
Connection,
Device,
PeriodicAdvertisingParameters,
)
from bumble.host import AclPacketQueue, Host from bumble.host import AclPacketQueue, Host
from bumble.hci import ( from bumble.hci import (
HCI_ACCEPT_CONNECTION_REQUEST_COMMAND, HCI_ACCEPT_CONNECTION_REQUEST_COMMAND,
@@ -265,7 +269,8 @@ async def test_flush():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_legacy_advertising(): async def test_legacy_advertising():
device = Device(host=mock.AsyncMock(Host)) device = TwoDevices()[0]
await device.power_on()
# Start advertising # Start advertising
await device.start_advertising() await device.start_advertising()
@@ -283,7 +288,10 @@ async def test_legacy_advertising():
) )
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_legacy_advertising_disconnection(auto_restart): async def test_legacy_advertising_disconnection(auto_restart):
device = Device(host=mock.AsyncMock(spec=Host)) devices = TwoDevices()
device = devices[0]
devices.controllers[0].le_features = bytes.fromhex('ffffffffffffffff')
await device.power_on()
peer_address = Address('F0:F1:F2:F3:F4:F5') peer_address = Address('F0:F1:F2:F3:F4:F5')
await device.start_advertising(auto_restart=auto_restart) await device.start_advertising(auto_restart=auto_restart)
device.on_connection( device.on_connection(
@@ -305,6 +313,11 @@ async def test_legacy_advertising_disconnection(auto_restart):
await async_barrier() await async_barrier()
if auto_restart: if auto_restart:
assert device.legacy_advertising_set
started = asyncio.Event()
if not device.is_advertising:
device.legacy_advertising_set.once('start', started.set)
await asyncio.wait_for(started.wait(), _TIMEOUT)
assert device.is_advertising assert device.is_advertising
else: else:
assert not device.is_advertising assert not device.is_advertising
@@ -313,7 +326,8 @@ async def test_legacy_advertising_disconnection(auto_restart):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_extended_advertising(): async def test_extended_advertising():
device = Device(host=mock.AsyncMock(Host)) device = TwoDevices()[0]
await device.power_on()
# Start advertising # Start advertising
advertising_set = await device.create_advertising_set() advertising_set = await device.create_advertising_set()
@@ -332,7 +346,8 @@ async def test_extended_advertising():
) )
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_extended_advertising_connection(own_address_type): async def test_extended_advertising_connection(own_address_type):
device = Device(host=mock.AsyncMock(spec=Host)) device = TwoDevices()[0]
await device.power_on()
peer_address = Address('F0:F1:F2:F3:F4:F5') peer_address = Address('F0:F1:F2:F3:F4:F5')
advertising_set = await device.create_advertising_set( advertising_set = await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(own_address_type=own_address_type) advertising_parameters=AdvertisingParameters(own_address_type=own_address_type)
@@ -368,8 +383,10 @@ async def test_extended_advertising_connection(own_address_type):
) )
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_extended_advertising_connection_out_of_order(own_address_type): async def test_extended_advertising_connection_out_of_order(own_address_type):
device = Device(host=mock.AsyncMock(spec=Host)) devices = TwoDevices()
peer_address = Address('F0:F1:F2:F3:F4:F5') device = devices[0]
devices.controllers[0].le_features = bytes.fromhex('ffffffffffffffff')
await device.power_on()
advertising_set = await device.create_advertising_set( advertising_set = await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(own_address_type=own_address_type) advertising_parameters=AdvertisingParameters(own_address_type=own_address_type)
) )
@@ -382,7 +399,7 @@ async def test_extended_advertising_connection_out_of_order(own_address_type):
device.on_connection( device.on_connection(
0x0001, 0x0001,
BT_LE_TRANSPORT, BT_LE_TRANSPORT,
peer_address, Address('F0:F1:F2:F3:F4:F5'),
None, None,
None, None,
BT_PERIPHERAL_ROLE, BT_PERIPHERAL_ROLE,
@@ -397,6 +414,34 @@ async def test_extended_advertising_connection_out_of_order(own_address_type):
await async_barrier() await async_barrier()
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_periodic_advertising():
device = TwoDevices()[0]
await device.power_on()
# Start advertising
advertising_set = await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(
advertising_event_properties=AdvertisingEventProperties(
is_connectable=False
)
),
advertising_data=b'123',
periodic_advertising_parameters=PeriodicAdvertisingParameters(),
periodic_advertising_data=b'abc',
)
assert device.extended_advertising_sets
assert advertising_set.enabled
assert not advertising_set.periodic_enabled
await advertising_set.start_periodic()
assert advertising_set.periodic_enabled
await advertising_set.stop_periodic()
assert not advertising_set.periodic_enabled
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_get_remote_le_features(): async def test_get_remote_le_features():