Compare commits

...

6 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod 98ed772e8a address PR comments and add some typing 2023-12-11 17:52:04 -08:00
Gilles Boccon-Gibod b083cc99ad fix spec parsing 2023-12-08 18:57:02 -08:00
Gilles Boccon-Gibod 62a8ced447 support drivers that can't use reset directly. 2023-12-08 17:28:57 -08:00
zxzxwu 698d947d85 Merge pull request #366 from zxzxwu/extadv
Add advertiser classes and handle adv set terminated events
2023-12-08 09:52:42 +08:00
Josh Wu ff6528d2bf Add Advertising unit tests 2023-12-08 01:38:01 +08:00
Josh Wu 72ac75a98d Add advertiser classes and handle adv set terminated events
* Convert hci.OwnAddressType to enum
* Add LegacyAdvertiser and ExtendedAdvertiser classes
* Rename start/stop_advertising() => start/stop_legacy_advertising()
* Handle HCI_Advertising_Set_Terminated
* Properly restart advertisement on disconnection
2023-12-07 15:51:51 +08:00
13 changed files with 557 additions and 148 deletions
+180 -46
View File
@@ -437,6 +437,38 @@ class AdvertisingType(IntEnum):
)
# -----------------------------------------------------------------------------
@dataclass
class LegacyAdvertiser:
device: Device
advertising_type: AdvertisingType
own_address_type: OwnAddressType
auto_restart: bool
advertising_data: Optional[bytes]
scan_response_data: Optional[bytes]
async def stop(self) -> None:
await self.device.stop_legacy_advertising()
# -----------------------------------------------------------------------------
@dataclass
class ExtendedAdvertiser(CompositeEventEmitter):
device: Device
handle: int
advertising_properties: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties
own_address_type: OwnAddressType
auto_restart: bool
advertising_data: Optional[bytes]
scan_response_data: Optional[bytes]
def __post_init__(self) -> None:
super().__init__()
async def stop(self) -> None:
await self.device.stop_extended_advertising(self.handle)
# -----------------------------------------------------------------------------
class LePhyOptions:
# Coded PHY preference
@@ -658,6 +690,9 @@ class Connection(CompositeEventEmitter):
gatt_client: gatt_client.Client
pairing_peer_io_capability: Optional[int]
pairing_peer_authentication_requirements: Optional[int]
advertiser_after_disconnection: Union[
LegacyAdvertiser, ExtendedAdvertiser, None
] = None
@composite_listener
class Listener:
@@ -1063,7 +1098,8 @@ class Device(CompositeEventEmitter):
]
advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator]
config: DeviceConfiguration
extended_advertising_handles: Set[int]
legacy_advertiser: Optional[LegacyAdvertiser]
extended_advertisers: Dict[int, ExtendedAdvertiser]
sco_links: Dict[int, ScoLink]
cis_links: Dict[int, CisLink]
_pending_cis: Dict[int, Tuple[int, int]]
@@ -1141,10 +1177,7 @@ class Device(CompositeEventEmitter):
self._host = None
self.powered_on = False
self.advertising = False
self.advertising_type = None
self.auto_restart_inquiry = True
self.auto_restart_advertising = False
self.command_timeout = 10 # seconds
self.gatt_server = gatt_server.Server(self)
self.sdp_server = sdp.Server(self)
@@ -1168,10 +1201,10 @@ class Device(CompositeEventEmitter):
self.classic_pending_accepts = {
Address.ANY: []
} # Futures, by BD address OR [Futures] for Address.ANY
self.extended_advertising_handles = set()
self.legacy_advertiser = None
self.extended_advertisers = {}
# Own address type cache
self.advertising_own_address_type = None
self.connect_own_address_type = None
# Use the initial config or a default
@@ -1579,6 +1612,7 @@ class Device(CompositeEventEmitter):
return self.host.supports_le_feature(feature_map[phy])
@deprecated("Please use start_legacy_advertising.")
async def start_advertising(
self,
advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
@@ -1586,15 +1620,49 @@ class Device(CompositeEventEmitter):
own_address_type: int = OwnAddressType.RANDOM,
auto_restart: bool = False,
) -> None:
await self.start_legacy_advertising(
advertising_type=advertising_type,
target=target,
own_address_type=OwnAddressType(own_address_type),
auto_restart=auto_restart,
)
async def start_legacy_advertising(
self,
advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
target: Optional[Address] = None,
own_address_type: OwnAddressType = OwnAddressType.RANDOM,
auto_restart: bool = False,
advertising_data: Optional[bytes] = None,
scan_response_data: Optional[bytes] = None,
) -> LegacyAdvertiser:
"""Starts an legacy advertisement.
Args:
advertising_type: Advertising type passed to HCI_LE_Set_Advertising_Parameters_Command.
target: Directed advertising target. Directed type should be set in advertising_type arg.
own_address_type: own address type to use in the advertising.
auto_restart: whether the advertisement will be restarted after disconnection.
scan_response_data: raw scan response.
advertising_data: raw advertising data.
Returns:
LegacyAdvertiser object containing the metadata of advertisement.
"""
if self.extended_advertisers:
logger.warning(
'Trying to start Legacy and Extended Advertising at the same time!'
)
# If we're advertising, stop first
if self.advertising:
if self.legacy_advertiser:
await self.stop_advertising()
# Set/update the advertising data if the advertising type allows it
if advertising_type.has_data:
await self.send_command(
HCI_LE_Set_Advertising_Data_Command(
advertising_data=self.advertising_data
advertising_data=advertising_data or self.advertising_data or b''
),
check_result=True,
)
@@ -1603,7 +1671,9 @@ class Device(CompositeEventEmitter):
if advertising_type.is_scannable:
await self.send_command(
HCI_LE_Set_Scan_Response_Data_Command(
scan_response_data=self.scan_response_data
scan_response_data=scan_response_data
or self.scan_response_data
or b''
),
check_result=True,
)
@@ -1640,45 +1710,57 @@ class Device(CompositeEventEmitter):
check_result=True,
)
self.advertising_type = advertising_type
self.advertising_own_address_type = own_address_type
self.advertising = True
self.auto_restart_advertising = auto_restart
self.legacy_advertiser = LegacyAdvertiser(
device=self,
advertising_type=advertising_type,
own_address_type=own_address_type,
auto_restart=auto_restart,
advertising_data=advertising_data,
scan_response_data=scan_response_data,
)
return self.legacy_advertiser
@deprecated("Please use stop_legacy_advertising.")
async def stop_advertising(self) -> None:
await self.stop_legacy_advertising()
async def stop_legacy_advertising(self) -> None:
# Disable advertising
if self.advertising:
if self.legacy_advertiser:
await self.send_command(
HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0),
check_result=True,
)
self.advertising_type = None
self.advertising_own_address_type = None
self.advertising = False
self.auto_restart_advertising = False
self.legacy_advertiser = None
@experimental('Extended Advertising is still experimental - Might be changed soon.')
async def start_extended_advertising(
self,
advertising_properties: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties = HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING,
target: Address = Address.ANY,
own_address_type: int = OwnAddressType.RANDOM,
scan_response: Optional[bytes] = None,
own_address_type: OwnAddressType = OwnAddressType.RANDOM,
auto_restart: bool = True,
advertising_data: Optional[bytes] = None,
) -> int:
scan_response_data: Optional[bytes] = None,
) -> ExtendedAdvertiser:
"""Starts an extended advertising set.
Args:
advertising_properties: Properties to pass in HCI_LE_Set_Extended_Advertising_Parameters_Command
target: Directed advertising target. Directed property should be set in advertising_properties arg.
own_address_type: own address type to use in the advertising.
scan_response: raw scan response. When a non-none value is set, HCI_LE_Set_Extended_Scan_Response_Data_Command will be sent.
auto_restart: whether the advertisement will be restarted after disconnection.
advertising_data: raw advertising data. When a non-none value is set, HCI_LE_Set_Advertising_Set_Random_Address_Command will be sent.
scan_response_data: raw scan response. When a non-none value is set, HCI_LE_Set_Extended_Scan_Response_Data_Command will be sent.
Returns:
Handle of the new advertising set.
ExtendedAdvertiser object containing the metadata of advertisement.
"""
if self.legacy_advertiser:
logger.warning(
'Trying to start Legacy and Extended Advertising at the same time!'
)
adv_handle = -1
# Find a free handle
@@ -1686,7 +1768,7 @@ class Device(CompositeEventEmitter):
DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE,
DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1,
):
if i not in self.extended_advertising_handles:
if i not in self.extended_advertisers:
adv_handle = i
break
@@ -1733,13 +1815,13 @@ class Device(CompositeEventEmitter):
)
# Set the scan response if present
if scan_response is not None:
if scan_response_data is not None:
await self.send_command(
HCI_LE_Set_Extended_Scan_Response_Data_Command(
advertising_handle=adv_handle,
operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
fragment_preference=0x01, # Should not fragment
scan_response_data=scan_response,
scan_response_data=scan_response_data,
),
check_result=True,
)
@@ -1774,8 +1856,16 @@ class Device(CompositeEventEmitter):
)
raise error
self.extended_advertising_handles.add(adv_handle)
return adv_handle
advertiser = self.extended_advertisers[adv_handle] = ExtendedAdvertiser(
device=self,
handle=adv_handle,
advertising_properties=advertising_properties,
own_address_type=own_address_type,
auto_restart=auto_restart,
advertising_data=advertising_data,
scan_response_data=scan_response_data,
)
return advertiser
@experimental('Extended Advertising is still experimental - Might be changed soon.')
async def stop_extended_advertising(self, adv_handle: int) -> None:
@@ -1799,11 +1889,11 @@ class Device(CompositeEventEmitter):
HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle),
check_result=True,
)
self.extended_advertising_handles.remove(adv_handle)
del self.extended_advertisers[adv_handle]
@property
def is_advertising(self):
return self.advertising
return self.legacy_advertiser or self.extended_advertisers
async def start_scanning(
self,
@@ -3144,13 +3234,18 @@ class Device(CompositeEventEmitter):
# Guess which own address type is used for this connection.
# This logic is somewhat correct but may need to be improved
# when multiple advertising are run simultaneously.
advertiser = None
if self.connect_own_address_type is not None:
own_address_type = self.connect_own_address_type
elif self.legacy_advertiser:
own_address_type = self.legacy_advertiser.own_address_type
# Store advertiser for restarting - it's only required for legacy, since
# extended advertisement produces HCI_Advertising_Set_Terminated.
if self.legacy_advertiser.auto_restart:
advertiser = self.legacy_advertiser
else:
own_address_type = self.advertising_own_address_type
# We are no longer advertising
self.advertising = False
# For extended advertisement, determining own address type later.
own_address_type = OwnAddressType.RANDOM
if own_address_type in (
OwnAddressType.PUBLIC,
@@ -3172,6 +3267,7 @@ class Device(CompositeEventEmitter):
connection_parameters,
ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY),
)
connection.advertiser_after_disconnection = advertiser
self.connections[connection_handle] = connection
# If supported, read which PHY we're connected with before
@@ -3203,10 +3299,10 @@ class Device(CompositeEventEmitter):
# For directed advertising, this means a timeout
if (
transport == BT_LE_TRANSPORT
and self.advertising
and self.advertising_type.is_directed
and self.legacy_advertiser
and self.legacy_advertiser.advertising_type.is_directed
):
self.advertising = False
self.legacy_advertiser = None
# Notify listeners
error = core.ConnectionError(
@@ -3268,16 +3364,30 @@ class Device(CompositeEventEmitter):
self.gatt_server.on_disconnection(connection)
# Restart advertising if auto-restart is enabled
if self.auto_restart_advertising:
if advertiser := connection.advertiser_after_disconnection:
logger.debug('restarting advertising')
self.abort_on(
'flush',
self.start_advertising(
advertising_type=self.advertising_type, # type: ignore[arg-type]
own_address_type=self.advertising_own_address_type, # type: ignore[arg-type]
auto_restart=True,
),
)
if isinstance(advertiser, LegacyAdvertiser):
self.abort_on(
'flush',
self.start_legacy_advertising(
advertising_type=advertiser.advertising_type,
own_address_type=advertiser.own_address_type,
advertising_data=advertiser.advertising_data,
scan_response_data=advertiser.scan_response_data,
auto_restart=True,
),
)
elif isinstance(advertiser, ExtendedAdvertiser):
self.abort_on(
'flush',
self.start_extended_advertising(
advertising_properties=advertiser.advertising_properties,
own_address_type=advertiser.own_address_type,
advertising_data=advertiser.advertising_data,
scan_response_data=advertiser.scan_response_data,
auto_restart=True,
),
)
elif sco_link := self.sco_links.pop(connection_handle, None):
sco_link.emit('disconnection', reason)
elif cis_link := self.cis_links.pop(connection_handle, None):
@@ -3600,6 +3710,30 @@ class Device(CompositeEventEmitter):
if sco_link := self.sco_links.get(sco_handle, None):
sco_link.emit('pdu', packet)
# [LE only]
@host_event_handler
@experimental('Only for testing')
def on_advertising_set_termination(
self,
status: int,
advertising_handle: int,
connection_handle: int,
) -> None:
if status == HCI_SUCCESS:
connection = self.lookup_connection(connection_handle)
if advertiser := self.extended_advertisers.pop(advertising_handle, None):
if connection:
if advertiser.auto_restart:
connection.advertiser_after_disconnection = advertiser
if advertiser.own_address_type in (
OwnAddressType.PUBLIC,
OwnAddressType.RESOLVABLE_OR_PUBLIC,
):
connection.self_address = self.public_address
else:
connection.self_address = self.random_address
advertiser.emit('termination', status)
# [LE only]
@host_event_handler
@with_connection_from_handle
+28 -32
View File
@@ -19,12 +19,17 @@ like loading firmware after a cold start.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import abc
from __future__ import annotations
import logging
import pathlib
import platform
from . import rtk
from typing import Dict, Iterable, Optional, Type, TYPE_CHECKING
from . import rtk
from .common import Driver
if TYPE_CHECKING:
from bumble.host import Host
# -----------------------------------------------------------------------------
# Logging
@@ -32,40 +37,31 @@ from . import rtk
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
class Driver(abc.ABC):
"""Base class for drivers."""
@staticmethod
async def for_host(_host):
"""Return a driver instance for a host.
Args:
host: Host object for which a driver should be created.
Returns:
A Driver instance if a driver should be instantiated for this host, or
None if no driver instance of this class is needed.
"""
return None
@abc.abstractmethod
async def init_controller(self):
"""Initialize the controller."""
# -----------------------------------------------------------------------------
# Functions
# -----------------------------------------------------------------------------
async def get_driver_for_host(host):
"""Probe all known diver classes until one returns a valid instance for a host,
or none is found.
async def get_driver_for_host(host: Host) -> Optional[Driver]:
"""Probe diver classes until one returns a valid instance for a host, or none is
found.
If a "driver" HCI metadata entry is present, only that driver class will be probed.
"""
if driver := await rtk.Driver.for_host(host):
logger.debug("Instantiated RTK driver")
return driver
driver_classes: Dict[str, Type[Driver]] = {"rtk": rtk.Driver}
probe_list: Iterable[str]
if driver_name := host.hci_metadata.get("driver"):
# Only probe a single driver
probe_list = [driver_name]
else:
# Probe all drivers
probe_list = driver_classes.keys()
for driver_name in probe_list:
if driver_class := driver_classes.get(driver_name):
logger.debug(f"Probing driver class: {driver_name}")
if driver := await driver_class.for_host(host):
logger.debug(f"Instantiated {driver_name} driver")
return driver
else:
logger.debug(f"Skipping unknown driver class: {driver_name}")
return None
+45
View File
@@ -0,0 +1,45 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Common types for drivers.
"""
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import abc
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
class Driver(abc.ABC):
"""Base class for drivers."""
@staticmethod
async def for_host(_host):
"""Return a driver instance for a host.
Args:
host: Host object for which a driver should be created.
Returns:
A Driver instance if a driver should be instantiated for this host, or
None if no driver instance of this class is needed.
"""
return None
@abc.abstractmethod
async def init_controller(self):
"""Initialize the controller."""
+11 -4
View File
@@ -41,7 +41,7 @@ from bumble.hci import (
HCI_Reset_Command,
HCI_Read_Local_Version_Information_Command,
)
from bumble.drivers import common
# -----------------------------------------------------------------------------
# Logging
@@ -285,7 +285,7 @@ class Firmware:
)
class Driver:
class Driver(common.Driver):
@dataclass
class DriverInfo:
rom: int
@@ -470,8 +470,12 @@ class Driver:
logger.debug("USB metadata not found")
return False
vendor_id = host.hci_metadata.get("vendor_id", None)
product_id = host.hci_metadata.get("product_id", None)
if host.hci_metadata.get('driver') == 'rtk':
# Forced driver
return True
vendor_id = host.hci_metadata.get("vendor_id")
product_id = host.hci_metadata.get("product_id")
if vendor_id is None or product_id is None:
logger.debug("USB metadata not sufficient")
return False
@@ -486,6 +490,9 @@ class Driver:
@classmethod
async def driver_info_for_host(cls, host):
await host.send_command(HCI_Reset_Command(), check_result=True)
host.ready = True # Needed to let the host know the controller is ready.
response = await host.send_command(
HCI_Read_Local_Version_Information_Command(), check_result=True
)
+24 -19
View File
@@ -1963,25 +1963,15 @@ Address.ANY_RANDOM = Address(b"\x00\x00\x00\x00\x00\x00", Address.RANDOM_DEVICE_
# -----------------------------------------------------------------------------
class OwnAddressType:
class OwnAddressType(enum.IntEnum):
PUBLIC = 0
RANDOM = 1
RESOLVABLE_OR_PUBLIC = 2
RESOLVABLE_OR_RANDOM = 3
TYPE_NAMES = {
PUBLIC: 'PUBLIC',
RANDOM: 'RANDOM',
RESOLVABLE_OR_PUBLIC: 'RESOLVABLE_OR_PUBLIC',
RESOLVABLE_OR_RANDOM: 'RESOLVABLE_OR_RANDOM',
}
@staticmethod
def type_name(type_id):
return name_or_number(OwnAddressType.TYPE_NAMES, type_id)
# pylint: disable-next=unnecessary-lambda
TYPE_SPEC = {'size': 1, 'mapper': lambda x: OwnAddressType.type_name(x)}
@classmethod
def type_spec(cls):
return {'size': 1, 'mapper': lambda x: OwnAddressType(x).name}
# -----------------------------------------------------------------------------
@@ -3374,7 +3364,7 @@ class HCI_LE_Set_Random_Address_Command(HCI_Command):
),
},
),
('own_address_type', OwnAddressType.TYPE_SPEC),
('own_address_type', OwnAddressType.type_spec()),
('peer_address_type', Address.ADDRESS_TYPE_SPEC),
('peer_address', Address.parse_address_preceded_by_type),
('advertising_channel_map', 1),
@@ -3467,7 +3457,7 @@ class HCI_LE_Set_Advertising_Enable_Command(HCI_Command):
('le_scan_type', 1),
('le_scan_interval', 2),
('le_scan_window', 2),
('own_address_type', OwnAddressType.TYPE_SPEC),
('own_address_type', OwnAddressType.type_spec()),
('scanning_filter_policy', 1),
]
)
@@ -3506,7 +3496,7 @@ class HCI_LE_Set_Scan_Enable_Command(HCI_Command):
('initiator_filter_policy', 1),
('peer_address_type', Address.ADDRESS_TYPE_SPEC),
('peer_address', Address.parse_address_preceded_by_type),
('own_address_type', OwnAddressType.TYPE_SPEC),
('own_address_type', OwnAddressType.type_spec()),
('connection_interval_min', 2),
('connection_interval_max', 2),
('max_latency', 2),
@@ -3913,7 +3903,7 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command):
),
},
),
('own_address_type', OwnAddressType.TYPE_SPEC),
('own_address_type', OwnAddressType.type_spec()),
('peer_address_type', Address.ADDRESS_TYPE_SPEC),
('peer_address', Address.parse_address_preceded_by_type),
('advertising_filter_policy', 1),
@@ -4309,7 +4299,7 @@ class HCI_LE_Extended_Create_Connection_Command(HCI_Command):
('initiator_filter_policy:', self.initiator_filter_policy),
(
'own_address_type: ',
OwnAddressType.type_name(self.own_address_type),
OwnAddressType(self.own_address_type).name,
),
(
'peer_address_type: ',
@@ -5190,6 +5180,21 @@ HCI_LE_Meta_Event.subevent_classes[
] = HCI_LE_Extended_Advertising_Report_Event
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event(
[
('status', 1),
('advertising_handle', 1),
('connection_handle', 2),
('number_completed_extended_advertising_events', 1),
]
)
class HCI_LE_Advertising_Set_Terminated_Event(HCI_LE_Meta_Event):
'''
See Bluetooth spec @ 7.7.65.18 LE Advertising Set Terminated Event
'''
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event([('connection_handle', 2), ('channel_selection_algorithm', 1)])
class HCI_LE_Channel_Selection_Algorithm_Event(HCI_LE_Meta_Event):
+29 -17
View File
@@ -21,7 +21,7 @@ import collections
import logging
import struct
from typing import Optional, TYPE_CHECKING, Dict, Callable, Awaitable, cast
from typing import Any, Awaitable, Callable, Dict, Optional, Union, cast, TYPE_CHECKING
from bumble.colors import color
from bumble.l2cap import L2CAP_PDU
@@ -124,7 +124,8 @@ class Connection:
class Host(AbortableEventEmitter):
connections: Dict[int, Connection]
acl_packet_queue: collections.deque[HCI_AclDataPacket]
hci_sink: TransportSink
hci_sink: Optional[TransportSink] = None
hci_metadata: Dict[str, Any]
long_term_key_provider: Optional[
Callable[[int, bytes, int], Awaitable[Optional[bytes]]]
]
@@ -137,9 +138,8 @@ class Host(AbortableEventEmitter):
) -> None:
super().__init__()
self.hci_metadata = None
self.hci_metadata = {}
self.ready = False # True when we can accept incoming packets
self.reset_done = False
self.connections = {} # Connections, by connection handle
self.pending_command = None
self.pending_response = None
@@ -162,10 +162,7 @@ class Host(AbortableEventEmitter):
# Connect to the source and sink if specified
if controller_source:
controller_source.set_packet_sink(self)
self.hci_metadata = getattr(
controller_source, 'metadata', self.hci_metadata
)
self.set_packet_source(controller_source)
if controller_sink:
self.set_packet_sink(controller_sink)
@@ -200,17 +197,21 @@ class Host(AbortableEventEmitter):
self.ready = False
await self.flush()
await self.send_command(HCI_Reset_Command(), check_result=True)
self.ready = True
# Instantiate and init a driver for the host if needed.
# NOTE: we don't keep a reference to the driver here, because we don't
# currently have a need for the driver later on. But if the driver interface
# evolves, it may be required, then, to store a reference to the driver in
# an object property.
reset_needed = True
if driver_factory is not None:
if driver := await driver_factory(self):
await driver.init_controller()
reset_needed = False
# Send a reset command unless a driver has already done so.
if reset_needed:
await self.send_command(HCI_Reset_Command(), check_result=True)
self.ready = True
response = await self.send_command(
HCI_Read_Local_Supported_Commands_Command(), check_result=True
@@ -313,25 +314,28 @@ class Host(AbortableEventEmitter):
)
)
self.reset_done = True
@property
def controller(self) -> TransportSink:
def controller(self) -> Optional[TransportSink]:
return self.hci_sink
@controller.setter
def controller(self, controller):
def controller(self, controller) -> None:
self.set_packet_sink(controller)
if controller:
controller.set_packet_sink(self)
def set_packet_sink(self, sink: TransportSink) -> None:
def set_packet_sink(self, sink: Optional[TransportSink]) -> None:
self.hci_sink = sink
def set_packet_source(self, source: TransportSource) -> None:
source.set_packet_sink(self)
self.hci_metadata = getattr(source, 'metadata', self.hci_metadata)
def send_hci_packet(self, packet: HCI_Packet) -> None:
if self.snooper:
self.snooper.snoop(bytes(packet), Snooper.Direction.HOST_TO_CONTROLLER)
self.hci_sink.on_packet(bytes(packet))
if self.hci_sink:
self.hci_sink.on_packet(bytes(packet))
async def send_command(self, command, check_result=False):
logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {command}')
@@ -721,6 +725,14 @@ class Host(AbortableEventEmitter):
def on_hci_le_extended_advertising_report_event(self, event):
self.on_hci_le_advertising_report_event(event)
def on_hci_le_advertising_set_terminated_event(self, event):
self.emit(
'advertising_set_termination',
event.status,
event.advertising_handle,
event.connection_handle,
)
def on_hci_le_cis_request_event(self, event):
self.emit(
'cis_request',
+49 -21
View File
@@ -18,6 +18,7 @@
from contextlib import asynccontextmanager
import logging
import os
from typing import Optional
from .common import Transport, AsyncPipeSink, SnoopingTransport
from ..snoop import create_snooper
@@ -52,8 +53,16 @@ def _wrap_transport(transport: Transport) -> Transport:
async def open_transport(name: str) -> Transport:
"""
Open a transport by name.
The name must be <type>:<parameters>
Where <parameters> depend on the type (and may be empty for some types).
The name must be <type>:<metadata><parameters>
Where <parameters> depend on the type (and may be empty for some types), and
<metadata> is either omitted, or a ,-separated list of <key>=<value> pairs,
enclosed in [].
If there are not metadata or parameter, the : after the <type> may be omitted.
Examples:
* usb:0
* usb:[driver=rtk]0
* android-netsim
The supported types are:
* serial
* udp
@@ -71,87 +80,106 @@ async def open_transport(name: str) -> Transport:
* android-netsim
"""
return _wrap_transport(await _open_transport(name))
scheme, *tail = name.split(':', 1)
spec = tail[0] if tail else None
if spec:
# Metadata may precede the spec
if spec.startswith('['):
metadata_str, *tail = spec[1:].split(']')
spec = tail[0] if tail else None
metadata = dict([entry.split('=') for entry in metadata_str.split(',')])
else:
metadata = None
transport = await _open_transport(scheme, spec)
if metadata:
transport.source.metadata = { # type: ignore[attr-defined]
**metadata,
**getattr(transport.source, 'metadata', {}),
}
# pylint: disable=line-too-long
logger.debug(f'HCI metadata: {transport.source.metadata}') # type: ignore[attr-defined]
return _wrap_transport(transport)
# -----------------------------------------------------------------------------
async def _open_transport(name: str) -> Transport:
async def _open_transport(scheme: str, spec: Optional[str]) -> Transport:
# pylint: disable=import-outside-toplevel
# pylint: disable=too-many-return-statements
scheme, *spec = name.split(':', 1)
if scheme == 'serial' and spec:
from .serial import open_serial_transport
return await open_serial_transport(spec[0])
return await open_serial_transport(spec)
if scheme == 'udp' and spec:
from .udp import open_udp_transport
return await open_udp_transport(spec[0])
return await open_udp_transport(spec)
if scheme == 'tcp-client' and spec:
from .tcp_client import open_tcp_client_transport
return await open_tcp_client_transport(spec[0])
return await open_tcp_client_transport(spec)
if scheme == 'tcp-server' and spec:
from .tcp_server import open_tcp_server_transport
return await open_tcp_server_transport(spec[0])
return await open_tcp_server_transport(spec)
if scheme == 'ws-client' and spec:
from .ws_client import open_ws_client_transport
return await open_ws_client_transport(spec[0])
return await open_ws_client_transport(spec)
if scheme == 'ws-server' and spec:
from .ws_server import open_ws_server_transport
return await open_ws_server_transport(spec[0])
return await open_ws_server_transport(spec)
if scheme == 'pty':
from .pty import open_pty_transport
return await open_pty_transport(spec[0] if spec else None)
return await open_pty_transport(spec)
if scheme == 'file':
from .file import open_file_transport
assert spec is not None
return await open_file_transport(spec[0])
return await open_file_transport(spec)
if scheme == 'vhci':
from .vhci import open_vhci_transport
return await open_vhci_transport(spec[0] if spec else None)
return await open_vhci_transport(spec)
if scheme == 'hci-socket':
from .hci_socket import open_hci_socket_transport
return await open_hci_socket_transport(spec[0] if spec else None)
return await open_hci_socket_transport(spec)
if scheme == 'usb':
from .usb import open_usb_transport
assert spec is not None
return await open_usb_transport(spec[0])
assert spec
return await open_usb_transport(spec)
if scheme == 'pyusb':
from .pyusb import open_pyusb_transport
assert spec is not None
return await open_pyusb_transport(spec[0])
assert spec
return await open_pyusb_transport(spec)
if scheme == 'android-emulator':
from .android_emulator import open_android_emulator_transport
return await open_android_emulator_transport(spec[0] if spec else None)
return await open_android_emulator_transport(spec)
if scheme == 'android-netsim':
from .android_netsim import open_android_netsim_transport
return await open_android_netsim_transport(spec[0] if spec else None)
return await open_android_netsim_transport(spec)
raise ValueError('unknown transport scheme')
+1 -1
View File
@@ -69,7 +69,7 @@ async def open_android_emulator_transport(spec: Optional[str]) -> Transport:
mode = 'host'
server_host = 'localhost'
server_port = '8554'
if spec is not None:
if spec:
params = spec.split(',')
for param in params:
if param.startswith('mode='):
+1 -1
View File
@@ -21,7 +21,7 @@ import struct
import asyncio
import logging
import io
from typing import ContextManager, Tuple, Optional, Protocol, Dict
from typing import Any, ContextManager, Tuple, Optional, Protocol, Dict
from bumble import hci
from bumble.colors import color
+1 -4
View File
@@ -59,10 +59,7 @@ async def open_hci_socket_transport(spec: Optional[str]) -> Transport:
) from error
# Compute the adapter index
if spec is None:
adapter_index = 0
else:
adapter_index = int(spec)
adapter_index = int(spec) if spec else 0
# Bind the socket
# NOTE: since Python doesn't support binding with the required address format (yet),
+9
View File
@@ -5,6 +5,15 @@ Some Bluetooth controllers require a driver to function properly.
This may include, for instance, loading a Firmware image or patch,
loading a configuration.
By default, drivers will be automatically probed to determine if they should be
used with particular HCI controller.
When the transport for an HCI controller is instantiated from a transport name,
a driver may also be forced by specifying ``driver=<driver-name>`` in the optional
metadata portion of the transport name. For example,
``usb:[driver=-rtk]0`` indicates that the ``rtk`` driver should be used with the
first USB device, even if a normal probe would not have selected it based on the
USB vendor ID and product ID.
Drivers included in the module are:
* [Realtek](realtek.md): Loading of Firmware and Config for Realtek USB dongles.
+5 -2
View File
@@ -1,13 +1,16 @@
REALTEK DRIVER
==============
This driver supports loading firmware images and optional config data to
This driver supports loading firmware images and optional config data to
USB dongles with a Realtek chipset.
A number of USB dongles are supported, but likely not all.
When using a USB dongle, the USB product ID and manufacturer ID are used
When using a USB dongle, the USB product ID and vendor ID are used
to find whether a matching set of firmware image and config data
is needed for that specific model. If a match exists, the driver will try
load the firmware image and, if needed, config data.
Alternatively, the metadata property ``driver=rtk`` may be specified in a transport
name to force that driver to be used (ex: ``usb:[driver=rtk]0`` instead of just
``usb:0`` for the first USB device).
The driver will look for those files by name, in order, in:
* The directory specified by the environment variable `BUMBLE_RTK_FIRMWARE_DIR`
+174 -1
View File
@@ -20,8 +20,14 @@ import logging
import os
from types import LambdaType
import pytest
from unittest import mock
from bumble.core import BT_BR_EDR_TRANSPORT
from bumble.core import (
BT_BR_EDR_TRANSPORT,
BT_LE_TRANSPORT,
BT_PERIPHERAL_ROLE,
ConnectionParameters,
)
from bumble.device import Connection, Device
from bumble.host import Host
from bumble.hci import (
@@ -30,6 +36,7 @@ from bumble.hci import (
HCI_CREATE_CONNECTION_COMMAND,
HCI_SUCCESS,
Address,
OwnAddressType,
HCI_Command_Complete_Event,
HCI_Command_Status_Event,
HCI_Connection_Complete_Event,
@@ -232,6 +239,172 @@ async def test_flush():
pass
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_legacy_advertising():
device = Device(host=mock.AsyncMock(Host))
# Start advertising
advertiser = await device.start_legacy_advertising()
assert device.legacy_advertiser
# Stop advertising
await advertiser.stop()
assert not device.legacy_advertiser
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'own_address_type,',
(OwnAddressType.PUBLIC, OwnAddressType.RANDOM),
)
@pytest.mark.asyncio
async def test_legacy_advertising_connection(own_address_type):
device = Device(host=mock.AsyncMock(Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
# Start advertising
advertiser = await device.start_legacy_advertising()
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
peer_address,
BT_PERIPHERAL_ROLE,
ConnectionParameters(0, 0, 0),
)
if own_address_type == OwnAddressType.PUBLIC:
assert device.lookup_connection(0x0001).self_address == device.public_address
else:
assert device.lookup_connection(0x0001).self_address == device.random_address
# For unknown reason, read_phy() in on_connection() would be killed at the end of
# test, so we force scheduling here to avoid an warning.
await asyncio.sleep(0.0001)
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'auto_restart,',
(True, False),
)
@pytest.mark.asyncio
async def test_legacy_advertising_disconnection(auto_restart):
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_legacy_advertising(auto_restart=auto_restart)
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
peer_address,
BT_PERIPHERAL_ROLE,
ConnectionParameters(0, 0, 0),
)
device.start_legacy_advertising = mock.AsyncMock()
device.on_disconnection(0x0001, 0)
if auto_restart:
device.start_legacy_advertising.assert_called_with(
advertising_type=advertiser.advertising_type,
own_address_type=advertiser.own_address_type,
auto_restart=advertiser.auto_restart,
advertising_data=advertiser.advertising_data,
scan_response_data=advertiser.scan_response_data,
)
else:
device.start_legacy_advertising.assert_not_called()
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_extended_advertising():
device = Device(host=mock.AsyncMock(Host))
# Start advertising
advertiser = await device.start_extended_advertising()
assert device.extended_advertisers
# Stop advertising
await advertiser.stop()
assert not device.extended_advertisers
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'own_address_type,',
(OwnAddressType.PUBLIC, OwnAddressType.RANDOM),
)
@pytest.mark.asyncio
async def test_extended_advertising_connection(own_address_type):
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_extended_advertising(
own_address_type=own_address_type
)
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
peer_address,
BT_PERIPHERAL_ROLE,
ConnectionParameters(0, 0, 0),
)
device.on_advertising_set_termination(
HCI_SUCCESS,
advertiser.handle,
0x0001,
)
if own_address_type == OwnAddressType.PUBLIC:
assert device.lookup_connection(0x0001).self_address == device.public_address
else:
assert device.lookup_connection(0x0001).self_address == device.random_address
# For unknown reason, read_phy() in on_connection() would be killed at the end of
# test, so we force scheduling here to avoid an warning.
await asyncio.sleep(0.0001)
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'auto_restart,',
(True, False),
)
@pytest.mark.asyncio
async def test_extended_advertising_disconnection(auto_restart):
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_extended_advertising(auto_restart=auto_restart)
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
peer_address,
BT_PERIPHERAL_ROLE,
ConnectionParameters(0, 0, 0),
)
device.on_advertising_set_termination(
HCI_SUCCESS,
advertiser.handle,
0x0001,
)
device.start_extended_advertising = mock.AsyncMock()
device.on_disconnection(0x0001, 0)
if auto_restart:
device.start_extended_advertising.assert_called_with(
advertising_properties=advertiser.advertising_properties,
own_address_type=advertiser.own_address_type,
auto_restart=advertiser.auto_restart,
advertising_data=advertiser.advertising_data,
scan_response_data=advertiser.scan_response_data,
)
else:
device.start_extended_advertising.assert_not_called()
# -----------------------------------------------------------------------------
def test_gatt_services_with_gas():
device = Device(host=Host(None, None))