Add support for Extended Advertising

This commit is contained in:
Josh Wu
2023-11-17 00:29:26 +08:00
parent 9bf2e03354
commit a9628f73e3
4 changed files with 267 additions and 73 deletions

View File

@@ -32,6 +32,7 @@ from typing import (
Optional, Optional,
Tuple, Tuple,
Type, Type,
Set,
Union, Union,
cast, cast,
overload, overload,
@@ -99,14 +100,20 @@ from .hci import (
HCI_LE_Extended_Create_Connection_Command, HCI_LE_Extended_Create_Connection_Command,
HCI_LE_Rand_Command, HCI_LE_Rand_Command,
HCI_LE_Read_PHY_Command, HCI_LE_Read_PHY_Command,
HCI_LE_Remove_Advertising_Set_Command,
HCI_LE_Set_Address_Resolution_Enable_Command, HCI_LE_Set_Address_Resolution_Enable_Command,
HCI_LE_Set_Advertising_Data_Command, HCI_LE_Set_Advertising_Data_Command,
HCI_LE_Set_Advertising_Enable_Command, HCI_LE_Set_Advertising_Enable_Command,
HCI_LE_Set_Advertising_Parameters_Command, HCI_LE_Set_Advertising_Parameters_Command,
HCI_LE_Set_Advertising_Set_Random_Address_Command,
HCI_LE_Set_Data_Length_Command, HCI_LE_Set_Data_Length_Command,
HCI_LE_Set_Default_PHY_Command, HCI_LE_Set_Default_PHY_Command,
HCI_LE_Set_Extended_Scan_Enable_Command, HCI_LE_Set_Extended_Scan_Enable_Command,
HCI_LE_Set_Extended_Scan_Parameters_Command, HCI_LE_Set_Extended_Scan_Parameters_Command,
HCI_LE_Set_Extended_Scan_Response_Data_Command,
HCI_LE_Set_Extended_Advertising_Data_Command,
HCI_LE_Set_Extended_Advertising_Enable_Command,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
HCI_LE_Set_PHY_Command, HCI_LE_Set_PHY_Command,
HCI_LE_Set_Random_Address_Command, HCI_LE_Set_Random_Address_Command,
HCI_LE_Set_Scan_Enable_Command, HCI_LE_Set_Scan_Enable_Command,
@@ -155,6 +162,7 @@ from .utils import (
setup_event_forwarding, setup_event_forwarding,
composite_listener, composite_listener,
deprecated, deprecated,
experimental,
) )
from .keys import ( from .keys import (
KeyStore, KeyStore,
@@ -189,6 +197,8 @@ DEVICE_MIN_SCAN_WINDOW = 25
DEVICE_MAX_SCAN_WINDOW = 10240 DEVICE_MAX_SCAN_WINDOW = 10240
DEVICE_MIN_LE_RSSI = -127 DEVICE_MIN_LE_RSSI = -127
DEVICE_MAX_LE_RSSI = 20 DEVICE_MAX_LE_RSSI = 20
DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE = 0x00
DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE = 0xEF
DEVICE_DEFAULT_ADDRESS = '00:00:00:00:00:00' DEVICE_DEFAULT_ADDRESS = '00:00:00:00:00:00'
DEVICE_DEFAULT_ADVERTISING_INTERVAL = 1000 # ms DEVICE_DEFAULT_ADVERTISING_INTERVAL = 1000 # ms
@@ -960,6 +970,7 @@ class Device(CompositeEventEmitter):
] ]
advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator] advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator]
config: DeviceConfiguration config: DeviceConfiguration
extended_advertising_handles: Set[int]
@composite_listener @composite_listener
class Listener: class Listener:
@@ -1058,6 +1069,7 @@ class Device(CompositeEventEmitter):
self.classic_pending_accepts = { self.classic_pending_accepts = {
Address.ANY: [] Address.ANY: []
} # Futures, by BD address OR [Futures] for Address.ANY } # Futures, by BD address OR [Futures] for Address.ANY
self.extended_advertising_handles = set()
# Own address type cache # Own address type cache
self.advertising_own_address_type = None self.advertising_own_address_type = None
@@ -1536,6 +1548,149 @@ class Device(CompositeEventEmitter):
self.advertising = False self.advertising = False
self.auto_restart_advertising = False self.auto_restart_advertising = False
@experimental('Extended Advertising is still experimental - Might be changed soon.')
async def start_extended_advertising(
self,
advertising_properties: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties = HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING,
target: Address = Address.ANY,
own_address_type: int = OwnAddressType.RANDOM,
scan_response: Optional[bytes] = None,
advertising_data: Optional[bytes] = None,
) -> int:
"""Starts an extended advertising set.
Args:
advertising_properties: Properties to pass in HCI_LE_Set_Extended_Advertising_Parameters_Command
target: Directed advertising target. Directed property should be set in advertising_properties arg.
own_address_type: own address type to use in the advertising.
scan_response: raw scan response. When a non-none value is set, HCI_LE_Set_Extended_Scan_Response_Data_Command will be sent.
advertising_data: raw advertising data. When a non-none value is set, HCI_LE_Set_Advertising_Set_Random_Address_Command will be sent.
Returns:
Handle of the new advertising set.
"""
adv_handle = -1
# Find a free handle
for i in range(
DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE,
DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1,
):
if i not in self.extended_advertising_handles:
adv_handle = i
break
if adv_handle == -1:
raise InvalidStateError('No available advertising set.')
try:
# Set the advertising parameters
await self.send_command(
HCI_LE_Set_Extended_Advertising_Parameters_Command(
advertising_handle=adv_handle,
advertising_event_properties=advertising_properties,
primary_advertising_interval_min=self.advertising_interval_min,
primary_advertising_interval_max=self.advertising_interval_max,
primary_advertising_channel_map=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_37
| HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_38
| HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_39
),
own_address_type=own_address_type,
peer_address_type=target.address_type,
peer_address=target,
advertising_tx_power=7,
advertising_filter_policy=0,
primary_advertising_phy=1, # LE 1M
secondary_advertising_max_skip=0,
secondary_advertising_phy=1, # LE 1M
advertising_sid=0,
scan_request_notification_enable=0,
), # type: ignore[call-arg]
check_result=True,
)
# Set the advertising data if present
if advertising_data is not None:
await self.send_command(
HCI_LE_Set_Extended_Advertising_Data_Command(
advertising_handle=adv_handle,
operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
fragment_preference=0x01, # Should not fragment
advertising_data=advertising_data,
), # type: ignore[call-arg]
check_result=True,
)
# Set the scan response if present
if scan_response is not None:
await self.send_command(
HCI_LE_Set_Extended_Scan_Response_Data_Command(
advertising_handle=adv_handle,
operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
fragment_preference=0x01, # Should not fragment
scan_response_data=scan_response,
), # type: ignore[call-arg]
check_result=True,
)
if own_address_type in (
OwnAddressType.RANDOM,
OwnAddressType.RESOLVABLE_OR_RANDOM,
):
await self.send_command(
HCI_LE_Set_Advertising_Set_Random_Address_Command(
advertising_handle=adv_handle,
random_address=self.random_address,
), # type: ignore[call-arg]
check_result=True,
)
# Enable advertising
await self.send_command(
HCI_LE_Set_Extended_Advertising_Enable_Command(
enable=1,
advertising_handles=[adv_handle],
durations=[0], # Forever
max_extended_advertising_events=[0], # Infinite
), # type: ignore[call-arg]
check_result=True,
)
except HCI_Error as error:
# When any step fails, cleanup the advertising handle.
await self.send_command(
HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle), # type: ignore[call-arg]
check_result=False,
)
raise error
self.extended_advertising_handles.add(adv_handle)
return adv_handle
@experimental('Extended Advertising is still experimental - Might be changed soon.')
async def stop_extended_advertising(self, adv_handle: int) -> None:
"""Stops an extended advertising set.
Args:
adv_handle: Handle of the advertising set to stop.
"""
# Disable advertising
await self.send_command(
HCI_LE_Set_Extended_Advertising_Enable_Command(
enable=0,
advertising_handles=[adv_handle],
durations=[0],
max_extended_advertising_events=[0],
), # type: ignore[call-arg]
check_result=True,
)
# Remove advertising set
await self.send_command(
HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle), # type: ignore[call-arg]
check_result=True,
)
self.extended_advertising_handles.remove(adv_handle)
@property @property
def is_advertising(self): def is_advertising(self):
return self.advertising return self.advertising

View File

@@ -3829,9 +3829,9 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command):
'advertising_event_properties', 'advertising_event_properties',
{ {
'size': 2, 'size': 2,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.advertising_properties_string( 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties(
x x
), ).name,
}, },
), ),
('primary_advertising_interval_min', 3), ('primary_advertising_interval_min', 3),
@@ -3840,9 +3840,9 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command):
'primary_advertising_channel_map', 'primary_advertising_channel_map',
{ {
'size': 1, 'size': 1,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.channel_map_string( 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap(
x x
), ).name,
}, },
), ),
('own_address_type', OwnAddressType.TYPE_SPEC), ('own_address_type', OwnAddressType.TYPE_SPEC),
@@ -3863,38 +3863,19 @@ class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command):
See Bluetooth spec @ 7.8.53 LE Set Extended Advertising Parameters Command See Bluetooth spec @ 7.8.53 LE Set Extended Advertising Parameters Command
''' '''
CONNECTABLE_ADVERTISING = 0 class AdvertisingProperties(enum.IntFlag):
SCANNABLE_ADVERTISING = 1 CONNECTABLE_ADVERTISING = 1 << 0
DIRECTED_ADVERTISING = 2 SCANNABLE_ADVERTISING = 1 << 1
HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING = 3 DIRECTED_ADVERTISING = 1 << 2
USE_LEGACY_ADVERTISING_PDUS = 4 HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING = 1 << 3
ANONYMOUS_ADVERTISING = 5 USE_LEGACY_ADVERTISING_PDUS = 1 << 4
INCLUDE_TX_POWER = 6 ANONYMOUS_ADVERTISING = 1 << 5
INCLUDE_TX_POWER = 1 << 6
ADVERTISING_PROPERTIES_NAMES = ( class ChannelMap(enum.IntFlag):
'CONNECTABLE_ADVERTISING', CHANNEL_37 = 1 << 0
'SCANNABLE_ADVERTISING', CHANNEL_38 = 1 << 1
'DIRECTED_ADVERTISING', CHANNEL_39 = 1 << 2
'HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING',
'USE_LEGACY_ADVERTISING_PDUS',
'ANONYMOUS_ADVERTISING',
'INCLUDE_TX_POWER',
)
CHANNEL_37 = 0
CHANNEL_38 = 1
CHANNEL_39 = 2
CHANNEL_NAMES = ('37', '38', '39')
@classmethod
def advertising_properties_string(cls, properties):
# pylint: disable=line-too-long
return f'[{",".join(bit_flags_to_strings(properties, cls.ADVERTISING_PROPERTIES_NAMES))}]'
@classmethod
def channel_map_string(cls, channel_map):
return f'[{",".join(bit_flags_to_strings(channel_map, cls.CHANNEL_NAMES))}]'
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -3906,9 +3887,9 @@ class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command):
'operation', 'operation',
{ {
'size': 1, 'size': 1,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.operation_name( 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.Operation(
x x
), ).name,
}, },
), ),
('fragment_preference', 1), ('fragment_preference', 1),
@@ -3926,24 +3907,13 @@ class HCI_LE_Set_Extended_Advertising_Data_Command(HCI_Command):
See Bluetooth spec @ 7.8.54 LE Set Extended Advertising Data Command See Bluetooth spec @ 7.8.54 LE Set Extended Advertising Data Command
''' '''
class Operation(enum.IntEnum):
INTERMEDIATE_FRAGMENT = 0x00 INTERMEDIATE_FRAGMENT = 0x00
FIRST_FRAGMENT = 0x01 FIRST_FRAGMENT = 0x01
LAST_FRAGMENT = 0x02 LAST_FRAGMENT = 0x02
COMPLETE_DATA = 0x03 COMPLETE_DATA = 0x03
UNCHANGED_DATA = 0x04 UNCHANGED_DATA = 0x04
OPERATION_NAMES = {
INTERMEDIATE_FRAGMENT: 'INTERMEDIATE_FRAGMENT',
FIRST_FRAGMENT: 'FIRST_FRAGMENT',
LAST_FRAGMENT: 'LAST_FRAGMENT',
COMPLETE_DATA: 'COMPLETE_DATA',
UNCHANGED_DATA: 'UNCHANGED_DATA',
}
@classmethod
def operation_name(cls, operation):
return name_or_number(cls.OPERATION_NAMES, operation)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_Command.command( @HCI_Command.command(
@@ -3954,9 +3924,9 @@ class HCI_LE_Set_Extended_Advertising_Data_Command(HCI_Command):
'operation', 'operation',
{ {
'size': 1, 'size': 1,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.operation_name( 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.Operation(
x x
), ).name,
}, },
), ),
('fragment_preference', 1), ('fragment_preference', 1),
@@ -3974,22 +3944,6 @@ class HCI_LE_Set_Extended_Scan_Response_Data_Command(HCI_Command):
See Bluetooth spec @ 7.8.55 LE Set Extended Scan Response Data Command See Bluetooth spec @ 7.8.55 LE Set Extended Scan Response Data Command
''' '''
INTERMEDIATE_FRAGMENT = 0x00
FIRST_FRAGMENT = 0x01
LAST_FRAGMENT = 0x02
COMPLETE_DATA = 0x03
OPERATION_NAMES = {
INTERMEDIATE_FRAGMENT: 'INTERMEDIATE_FRAGMENT',
FIRST_FRAGMENT: 'FIRST_FRAGMENT',
LAST_FRAGMENT: 'LAST_FRAGMENT',
COMPLETE_DATA: 'COMPLETE_DATA',
}
@classmethod
def operation_name(cls, operation):
return name_or_number(cls.OPERATION_NAMES, operation)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_Command.command( @HCI_Command.command(

View File

@@ -432,7 +432,7 @@ def wrap_async(function):
def deprecated(msg: str): def deprecated(msg: str):
""" """
Throw deprecation warning before execution Throw deprecation warning before execution.
""" """
def wrapper(function): def wrapper(function):
@@ -444,3 +444,19 @@ def deprecated(msg: str):
return inner return inner
return wrapper return wrapper
def experimental(msg: str):
"""
Throws a future warning before execution.
"""
def wrapper(function):
@wraps(function)
def inner(*args, **kwargs):
warnings.warn(msg, FutureWarning)
return function(*args, **kwargs)
return inner
return wrapper

View File

@@ -0,0 +1,69 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
from bumble.device import AdvertisingType, Device
from bumble.hci import Address, HCI_LE_Set_Extended_Advertising_Parameters_Command
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_extended_advertiser.py <config-file> <transport-spec> [type] [address]'
)
print('example: run_extended_advertiser.py device1.json usb:0')
return
if len(sys.argv) >= 4:
advertising_properties = (
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties(
int(sys.argv[3])
)
)
else:
advertising_properties = (
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
)
if len(sys.argv) >= 5:
target = Address(sys.argv[4])
else:
target = Address.ANY
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
await device.power_on()
await device.start_extended_advertising(
advertising_properties=advertising_properties, target=target
)
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())