Compare commits

...

19 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod 98ed772e8a address PR comments and add some typing 2023-12-11 17:52:04 -08:00
Gilles Boccon-Gibod b083cc99ad fix spec parsing 2023-12-08 18:57:02 -08:00
Gilles Boccon-Gibod 62a8ced447 support drivers that can't use reset directly. 2023-12-08 17:28:57 -08:00
zxzxwu 698d947d85 Merge pull request #366 from zxzxwu/extadv
Add advertiser classes and handle adv set terminated events
2023-12-08 09:52:42 +08:00
Josh Wu ff6528d2bf Add Advertising unit tests 2023-12-08 01:38:01 +08:00
Josh Wu 72ac75a98d Add advertiser classes and handle adv set terminated events
* Convert hci.OwnAddressType to enum
* Add LegacyAdvertiser and ExtendedAdvertiser classes
* Rename start/stop_advertising() => start/stop_legacy_advertising()
* Handle HCI_Advertising_Set_Terminated
* Properly restart advertisement on disconnection
2023-12-07 15:51:51 +08:00
zxzxwu 88b4cbdf1a Merge pull request #364 from zxzxwu/iso
Fix ISO packet issues
2023-12-05 00:41:56 +08:00
Josh Wu d6afbc6f4e Fix ISO packet issues 2023-12-04 20:31:11 +08:00
Gilles Boccon-Gibod fc90de3e7b Merge pull request #351 from google/dependabot/cargo/rust/openssl-0.10.60
Bump openssl from 0.10.57 to 0.10.60 in /rust
2023-12-04 00:41:27 -08:00
Gilles Boccon-Gibod 847c2ef114 Merge pull request #362 from google/gbg/more-le-features-constants
a few more HCI constants from the spec
2023-12-04 00:38:02 -08:00
Gilles Boccon-Gibod a0bf0c1f4d Merge pull request #363 from google/gbg/android-remote-proxy-cli
android remote proxy cli
2023-12-04 00:37:49 -08:00
Gilles Boccon-Gibod 843466c822 a few more constants from the spec 2023-12-03 17:16:25 -08:00
zxzxwu 3adcc8be09 Merge pull request #360 from zxzxwu/hci
Remove # type: ignore[call-arg] in HCI_Command builders
2023-12-03 19:18:04 +08:00
zxzxwu c853d56302 Merge pull request #361 from zxzxwu/hci-bug
Fix typo
2023-12-03 04:22:59 +08:00
Josh Wu dc97be5b35 Fix typo 2023-12-02 23:42:21 +08:00
zxzxwu 73dbdfff9f Merge pull request #356 from zxzxwu/bap
Add Published Audio Capabilities Service
2023-12-02 23:34:57 +08:00
Josh Wu dff14e1258 Add Published Audio Capabilities Service 2023-12-02 23:16:37 +08:00
Josh Wu 10a3833893 Remove # type: ignore[call-arg] in HCI_Command builders 2023-12-02 19:18:54 +08:00
dependabot[bot] 7eb493990f Bump openssl from 0.10.57 to 0.10.60 in /rust
Bumps [openssl](https://github.com/sfackler/rust-openssl) from 0.10.57 to 0.10.60.
- [Release notes](https://github.com/sfackler/rust-openssl/releases)
- [Commits](https://github.com/sfackler/rust-openssl/compare/openssl-v0.10.57...openssl-v0.10.60)

---
updated-dependencies:
- dependency-name: openssl
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-11-28 21:43:18 +00:00
21 changed files with 1509 additions and 258 deletions
+234 -104
View File
@@ -437,6 +437,38 @@ class AdvertisingType(IntEnum):
) )
# -----------------------------------------------------------------------------
@dataclass
class LegacyAdvertiser:
device: Device
advertising_type: AdvertisingType
own_address_type: OwnAddressType
auto_restart: bool
advertising_data: Optional[bytes]
scan_response_data: Optional[bytes]
async def stop(self) -> None:
await self.device.stop_legacy_advertising()
# -----------------------------------------------------------------------------
@dataclass
class ExtendedAdvertiser(CompositeEventEmitter):
device: Device
handle: int
advertising_properties: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties
own_address_type: OwnAddressType
auto_restart: bool
advertising_data: Optional[bytes]
scan_response_data: Optional[bytes]
def __post_init__(self) -> None:
super().__init__()
async def stop(self) -> None:
await self.device.stop_extended_advertising(self.handle)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class LePhyOptions: class LePhyOptions:
# Coded PHY preference # Coded PHY preference
@@ -658,6 +690,9 @@ class Connection(CompositeEventEmitter):
gatt_client: gatt_client.Client gatt_client: gatt_client.Client
pairing_peer_io_capability: Optional[int] pairing_peer_io_capability: Optional[int]
pairing_peer_authentication_requirements: Optional[int] pairing_peer_authentication_requirements: Optional[int]
advertiser_after_disconnection: Union[
LegacyAdvertiser, ExtendedAdvertiser, None
] = None
@composite_listener @composite_listener
class Listener: class Listener:
@@ -1063,7 +1098,8 @@ class Device(CompositeEventEmitter):
] ]
advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator] advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator]
config: DeviceConfiguration config: DeviceConfiguration
extended_advertising_handles: Set[int] legacy_advertiser: Optional[LegacyAdvertiser]
extended_advertisers: Dict[int, ExtendedAdvertiser]
sco_links: Dict[int, ScoLink] sco_links: Dict[int, ScoLink]
cis_links: Dict[int, CisLink] cis_links: Dict[int, CisLink]
_pending_cis: Dict[int, Tuple[int, int]] _pending_cis: Dict[int, Tuple[int, int]]
@@ -1141,10 +1177,7 @@ class Device(CompositeEventEmitter):
self._host = None self._host = None
self.powered_on = False self.powered_on = False
self.advertising = False
self.advertising_type = None
self.auto_restart_inquiry = True self.auto_restart_inquiry = True
self.auto_restart_advertising = False
self.command_timeout = 10 # seconds self.command_timeout = 10 # seconds
self.gatt_server = gatt_server.Server(self) self.gatt_server = gatt_server.Server(self)
self.sdp_server = sdp.Server(self) self.sdp_server = sdp.Server(self)
@@ -1168,10 +1201,10 @@ class Device(CompositeEventEmitter):
self.classic_pending_accepts = { self.classic_pending_accepts = {
Address.ANY: [] Address.ANY: []
} # Futures, by BD address OR [Futures] for Address.ANY } # Futures, by BD address OR [Futures] for Address.ANY
self.extended_advertising_handles = set() self.legacy_advertiser = None
self.extended_advertisers = {}
# Own address type cache # Own address type cache
self.advertising_own_address_type = None
self.connect_own_address_type = None self.connect_own_address_type = None
# Use the initial config or a default # Use the initial config or a default
@@ -1432,7 +1465,7 @@ class Device(CompositeEventEmitter):
await self.host.reset() await self.host.reset()
# Try to get the public address from the controller # Try to get the public address from the controller
response = await self.send_command(HCI_Read_BD_ADDR_Command()) # type: ignore[call-arg] response = await self.send_command(HCI_Read_BD_ADDR_Command())
if response.return_parameters.status == HCI_SUCCESS: if response.return_parameters.status == HCI_SUCCESS:
logger.debug( logger.debug(
color(f'BD_ADDR: {response.return_parameters.bd_addr}', 'yellow') color(f'BD_ADDR: {response.return_parameters.bd_addr}', 'yellow')
@@ -1455,7 +1488,7 @@ class Device(CompositeEventEmitter):
HCI_Write_LE_Host_Support_Command( HCI_Write_LE_Host_Support_Command(
le_supported_host=int(self.le_enabled), le_supported_host=int(self.le_enabled),
simultaneous_le_host=int(self.le_simultaneous_enabled), simultaneous_le_host=int(self.le_simultaneous_enabled),
) # type: ignore[call-arg] )
) )
if self.le_enabled: if self.le_enabled:
@@ -1465,7 +1498,7 @@ class Device(CompositeEventEmitter):
if self.host.supports_command(HCI_LE_RAND_COMMAND): if self.host.supports_command(HCI_LE_RAND_COMMAND):
# Get 8 random bytes # Get 8 random bytes
response = await self.send_command( response = await self.send_command(
HCI_LE_Rand_Command(), check_result=True # type: ignore[call-arg] HCI_LE_Rand_Command(), check_result=True
) )
# Ensure the address bytes can be a static random address # Ensure the address bytes can be a static random address
@@ -1486,7 +1519,7 @@ class Device(CompositeEventEmitter):
await self.send_command( await self.send_command(
HCI_LE_Set_Random_Address_Command( HCI_LE_Set_Random_Address_Command(
random_address=self.random_address random_address=self.random_address
), # type: ignore[call-arg] ),
check_result=True, check_result=True,
) )
@@ -1499,12 +1532,12 @@ class Device(CompositeEventEmitter):
await self.send_command( await self.send_command(
HCI_LE_Set_Address_Resolution_Enable_Command( HCI_LE_Set_Address_Resolution_Enable_Command(
address_resolution_enable=1 address_resolution_enable=1
) # type: ignore[call-arg] )
) )
if self.cis_enabled: if self.cis_enabled:
await self.send_command( await self.send_command(
HCI_LE_Set_Host_Feature_Command( # type: ignore[call-arg] HCI_LE_Set_Host_Feature_Command(
bit_number=( bit_number=(
HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE
), ),
@@ -1514,20 +1547,20 @@ class Device(CompositeEventEmitter):
if self.classic_enabled: if self.classic_enabled:
await self.send_command( await self.send_command(
HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8')) # type: ignore[call-arg] HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8'))
) )
await self.send_command( await self.send_command(
HCI_Write_Class_Of_Device_Command(class_of_device=self.class_of_device) # type: ignore[call-arg] HCI_Write_Class_Of_Device_Command(class_of_device=self.class_of_device)
) )
await self.send_command( await self.send_command(
HCI_Write_Simple_Pairing_Mode_Command( HCI_Write_Simple_Pairing_Mode_Command(
simple_pairing_mode=int(self.classic_ssp_enabled) simple_pairing_mode=int(self.classic_ssp_enabled)
) # type: ignore[call-arg] )
) )
await self.send_command( await self.send_command(
HCI_Write_Secure_Connections_Host_Support_Command( HCI_Write_Secure_Connections_Host_Support_Command(
secure_connections_host_support=int(self.classic_sc_enabled) secure_connections_host_support=int(self.classic_sc_enabled)
) # type: ignore[call-arg] )
) )
await self.set_connectable(self.connectable) await self.set_connectable(self.connectable)
await self.set_discoverable(self.discoverable) await self.set_discoverable(self.discoverable)
@@ -1551,7 +1584,7 @@ class Device(CompositeEventEmitter):
self.address_resolver = smp.AddressResolver(resolving_keys) self.address_resolver = smp.AddressResolver(resolving_keys)
if self.address_resolution_offload: if self.address_resolution_offload:
await self.send_command(HCI_LE_Clear_Resolving_List_Command()) # type: ignore[call-arg] await self.send_command(HCI_LE_Clear_Resolving_List_Command())
for irk, address in resolving_keys: for irk, address in resolving_keys:
await self.send_command( await self.send_command(
@@ -1560,7 +1593,7 @@ class Device(CompositeEventEmitter):
peer_identity_address=address, peer_identity_address=address,
peer_irk=irk, peer_irk=irk,
local_irk=self.irk, local_irk=self.irk,
) # type: ignore[call-arg] )
) )
def supports_le_feature(self, feature): def supports_le_feature(self, feature):
@@ -1579,6 +1612,7 @@ class Device(CompositeEventEmitter):
return self.host.supports_le_feature(feature_map[phy]) return self.host.supports_le_feature(feature_map[phy])
@deprecated("Please use start_legacy_advertising.")
async def start_advertising( async def start_advertising(
self, self,
advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
@@ -1586,16 +1620,50 @@ class Device(CompositeEventEmitter):
own_address_type: int = OwnAddressType.RANDOM, own_address_type: int = OwnAddressType.RANDOM,
auto_restart: bool = False, auto_restart: bool = False,
) -> None: ) -> None:
await self.start_legacy_advertising(
advertising_type=advertising_type,
target=target,
own_address_type=OwnAddressType(own_address_type),
auto_restart=auto_restart,
)
async def start_legacy_advertising(
self,
advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
target: Optional[Address] = None,
own_address_type: OwnAddressType = OwnAddressType.RANDOM,
auto_restart: bool = False,
advertising_data: Optional[bytes] = None,
scan_response_data: Optional[bytes] = None,
) -> LegacyAdvertiser:
"""Starts an legacy advertisement.
Args:
advertising_type: Advertising type passed to HCI_LE_Set_Advertising_Parameters_Command.
target: Directed advertising target. Directed type should be set in advertising_type arg.
own_address_type: own address type to use in the advertising.
auto_restart: whether the advertisement will be restarted after disconnection.
scan_response_data: raw scan response.
advertising_data: raw advertising data.
Returns:
LegacyAdvertiser object containing the metadata of advertisement.
"""
if self.extended_advertisers:
logger.warning(
'Trying to start Legacy and Extended Advertising at the same time!'
)
# If we're advertising, stop first # If we're advertising, stop first
if self.advertising: if self.legacy_advertiser:
await self.stop_advertising() await self.stop_advertising()
# Set/update the advertising data if the advertising type allows it # Set/update the advertising data if the advertising type allows it
if advertising_type.has_data: if advertising_type.has_data:
await self.send_command( await self.send_command(
HCI_LE_Set_Advertising_Data_Command( HCI_LE_Set_Advertising_Data_Command(
advertising_data=self.advertising_data advertising_data=advertising_data or self.advertising_data or b''
), # type: ignore[call-arg] ),
check_result=True, check_result=True,
) )
@@ -1603,8 +1671,10 @@ class Device(CompositeEventEmitter):
if advertising_type.is_scannable: if advertising_type.is_scannable:
await self.send_command( await self.send_command(
HCI_LE_Set_Scan_Response_Data_Command( HCI_LE_Set_Scan_Response_Data_Command(
scan_response_data=self.scan_response_data scan_response_data=scan_response_data
), # type: ignore[call-arg] or self.scan_response_data
or b''
),
check_result=True, check_result=True,
) )
@@ -1630,55 +1700,67 @@ class Device(CompositeEventEmitter):
peer_address=peer_address, peer_address=peer_address,
advertising_channel_map=7, advertising_channel_map=7,
advertising_filter_policy=0, advertising_filter_policy=0,
), # type: ignore[call-arg] ),
check_result=True, check_result=True,
) )
# Enable advertising # Enable advertising
await self.send_command( await self.send_command(
HCI_LE_Set_Advertising_Enable_Command(advertising_enable=1), # type: ignore[call-arg] HCI_LE_Set_Advertising_Enable_Command(advertising_enable=1),
check_result=True, check_result=True,
) )
self.advertising_type = advertising_type self.legacy_advertiser = LegacyAdvertiser(
self.advertising_own_address_type = own_address_type device=self,
self.advertising = True advertising_type=advertising_type,
self.auto_restart_advertising = auto_restart own_address_type=own_address_type,
auto_restart=auto_restart,
advertising_data=advertising_data,
scan_response_data=scan_response_data,
)
return self.legacy_advertiser
@deprecated("Please use stop_legacy_advertising.")
async def stop_advertising(self) -> None: async def stop_advertising(self) -> None:
await self.stop_legacy_advertising()
async def stop_legacy_advertising(self) -> None:
# Disable advertising # Disable advertising
if self.advertising: if self.legacy_advertiser:
await self.send_command( await self.send_command(
HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0), # type: ignore[call-arg] HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0),
check_result=True, check_result=True,
) )
self.advertising_type = None self.legacy_advertiser = None
self.advertising_own_address_type = None
self.advertising = False
self.auto_restart_advertising = False
@experimental('Extended Advertising is still experimental - Might be changed soon.') @experimental('Extended Advertising is still experimental - Might be changed soon.')
async def start_extended_advertising( async def start_extended_advertising(
self, self,
advertising_properties: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties = HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING, advertising_properties: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties = HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING,
target: Address = Address.ANY, target: Address = Address.ANY,
own_address_type: int = OwnAddressType.RANDOM, own_address_type: OwnAddressType = OwnAddressType.RANDOM,
scan_response: Optional[bytes] = None, auto_restart: bool = True,
advertising_data: Optional[bytes] = None, advertising_data: Optional[bytes] = None,
) -> int: scan_response_data: Optional[bytes] = None,
) -> ExtendedAdvertiser:
"""Starts an extended advertising set. """Starts an extended advertising set.
Args: Args:
advertising_properties: Properties to pass in HCI_LE_Set_Extended_Advertising_Parameters_Command advertising_properties: Properties to pass in HCI_LE_Set_Extended_Advertising_Parameters_Command
target: Directed advertising target. Directed property should be set in advertising_properties arg. target: Directed advertising target. Directed property should be set in advertising_properties arg.
own_address_type: own address type to use in the advertising. own_address_type: own address type to use in the advertising.
scan_response: raw scan response. When a non-none value is set, HCI_LE_Set_Extended_Scan_Response_Data_Command will be sent. auto_restart: whether the advertisement will be restarted after disconnection.
advertising_data: raw advertising data. When a non-none value is set, HCI_LE_Set_Advertising_Set_Random_Address_Command will be sent. advertising_data: raw advertising data. When a non-none value is set, HCI_LE_Set_Advertising_Set_Random_Address_Command will be sent.
scan_response_data: raw scan response. When a non-none value is set, HCI_LE_Set_Extended_Scan_Response_Data_Command will be sent.
Returns: Returns:
Handle of the new advertising set. ExtendedAdvertiser object containing the metadata of advertisement.
""" """
if self.legacy_advertiser:
logger.warning(
'Trying to start Legacy and Extended Advertising at the same time!'
)
adv_handle = -1 adv_handle = -1
# Find a free handle # Find a free handle
@@ -1686,7 +1768,7 @@ class Device(CompositeEventEmitter):
DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE, DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE,
DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1, DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1,
): ):
if i not in self.extended_advertising_handles: if i not in self.extended_advertisers:
adv_handle = i adv_handle = i
break break
@@ -1716,7 +1798,7 @@ class Device(CompositeEventEmitter):
secondary_advertising_phy=1, # LE 1M secondary_advertising_phy=1, # LE 1M
advertising_sid=0, advertising_sid=0,
scan_request_notification_enable=0, scan_request_notification_enable=0,
), # type: ignore[call-arg] ),
check_result=True, check_result=True,
) )
@@ -1728,19 +1810,19 @@ class Device(CompositeEventEmitter):
operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
fragment_preference=0x01, # Should not fragment fragment_preference=0x01, # Should not fragment
advertising_data=advertising_data, advertising_data=advertising_data,
), # type: ignore[call-arg] ),
check_result=True, check_result=True,
) )
# Set the scan response if present # Set the scan response if present
if scan_response is not None: if scan_response_data is not None:
await self.send_command( await self.send_command(
HCI_LE_Set_Extended_Scan_Response_Data_Command( HCI_LE_Set_Extended_Scan_Response_Data_Command(
advertising_handle=adv_handle, advertising_handle=adv_handle,
operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
fragment_preference=0x01, # Should not fragment fragment_preference=0x01, # Should not fragment
scan_response_data=scan_response, scan_response_data=scan_response_data,
), # type: ignore[call-arg] ),
check_result=True, check_result=True,
) )
@@ -1752,7 +1834,7 @@ class Device(CompositeEventEmitter):
HCI_LE_Set_Advertising_Set_Random_Address_Command( HCI_LE_Set_Advertising_Set_Random_Address_Command(
advertising_handle=adv_handle, advertising_handle=adv_handle,
random_address=self.random_address, random_address=self.random_address,
), # type: ignore[call-arg] ),
check_result=True, check_result=True,
) )
@@ -1763,19 +1845,27 @@ class Device(CompositeEventEmitter):
advertising_handles=[adv_handle], advertising_handles=[adv_handle],
durations=[0], # Forever durations=[0], # Forever
max_extended_advertising_events=[0], # Infinite max_extended_advertising_events=[0], # Infinite
), # type: ignore[call-arg] ),
check_result=True, check_result=True,
) )
except HCI_Error as error: except HCI_Error as error:
# When any step fails, cleanup the advertising handle. # When any step fails, cleanup the advertising handle.
await self.send_command( await self.send_command(
HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle), # type: ignore[call-arg] HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle),
check_result=False, check_result=False,
) )
raise error raise error
self.extended_advertising_handles.add(adv_handle) advertiser = self.extended_advertisers[adv_handle] = ExtendedAdvertiser(
return adv_handle device=self,
handle=adv_handle,
advertising_properties=advertising_properties,
own_address_type=own_address_type,
auto_restart=auto_restart,
advertising_data=advertising_data,
scan_response_data=scan_response_data,
)
return advertiser
@experimental('Extended Advertising is still experimental - Might be changed soon.') @experimental('Extended Advertising is still experimental - Might be changed soon.')
async def stop_extended_advertising(self, adv_handle: int) -> None: async def stop_extended_advertising(self, adv_handle: int) -> None:
@@ -1791,19 +1881,19 @@ class Device(CompositeEventEmitter):
advertising_handles=[adv_handle], advertising_handles=[adv_handle],
durations=[0], durations=[0],
max_extended_advertising_events=[0], max_extended_advertising_events=[0],
), # type: ignore[call-arg] ),
check_result=True, check_result=True,
) )
# Remove advertising set # Remove advertising set
await self.send_command( await self.send_command(
HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle), # type: ignore[call-arg] HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle),
check_result=True, check_result=True,
) )
self.extended_advertising_handles.remove(adv_handle) del self.extended_advertisers[adv_handle]
@property @property
def is_advertising(self): def is_advertising(self):
return self.advertising return self.legacy_advertiser or self.extended_advertisers
async def start_scanning( async def start_scanning(
self, self,
@@ -1864,7 +1954,7 @@ class Device(CompositeEventEmitter):
scan_types=[scan_type] * scanning_phy_count, scan_types=[scan_type] * scanning_phy_count,
scan_intervals=[int(scan_window / 0.625)] * scanning_phy_count, scan_intervals=[int(scan_window / 0.625)] * scanning_phy_count,
scan_windows=[int(scan_window / 0.625)] * scanning_phy_count, scan_windows=[int(scan_window / 0.625)] * scanning_phy_count,
), # type: ignore[call-arg] ),
check_result=True, check_result=True,
) )
@@ -1875,7 +1965,7 @@ class Device(CompositeEventEmitter):
filter_duplicates=1 if filter_duplicates else 0, filter_duplicates=1 if filter_duplicates else 0,
duration=0, # TODO allow other values duration=0, # TODO allow other values
period=0, # TODO allow other values period=0, # TODO allow other values
), # type: ignore[call-arg] ),
check_result=True, check_result=True,
) )
else: else:
@@ -1893,7 +1983,7 @@ class Device(CompositeEventEmitter):
le_scan_window=int(scan_window / 0.625), le_scan_window=int(scan_window / 0.625),
own_address_type=own_address_type, own_address_type=own_address_type,
scanning_filter_policy=HCI_LE_Set_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY, scanning_filter_policy=HCI_LE_Set_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY,
), # type: ignore[call-arg] ),
check_result=True, check_result=True,
) )
@@ -1901,7 +1991,7 @@ class Device(CompositeEventEmitter):
await self.send_command( await self.send_command(
HCI_LE_Set_Scan_Enable_Command( HCI_LE_Set_Scan_Enable_Command(
le_scan_enable=1, filter_duplicates=1 if filter_duplicates else 0 le_scan_enable=1, filter_duplicates=1 if filter_duplicates else 0
), # type: ignore[call-arg] ),
check_result=True, check_result=True,
) )
@@ -1914,12 +2004,12 @@ class Device(CompositeEventEmitter):
await self.send_command( await self.send_command(
HCI_LE_Set_Extended_Scan_Enable_Command( HCI_LE_Set_Extended_Scan_Enable_Command(
enable=0, filter_duplicates=0, duration=0, period=0 enable=0, filter_duplicates=0, duration=0, period=0
), # type: ignore[call-arg] ),
check_result=True, check_result=True,
) )
else: else:
await self.send_command( await self.send_command(
HCI_LE_Set_Scan_Enable_Command(le_scan_enable=0, filter_duplicates=0), # type: ignore[call-arg] HCI_LE_Set_Scan_Enable_Command(le_scan_enable=0, filter_duplicates=0),
check_result=True, check_result=True,
) )
@@ -1939,7 +2029,7 @@ class Device(CompositeEventEmitter):
async def start_discovery(self, auto_restart: bool = True) -> None: async def start_discovery(self, auto_restart: bool = True) -> None:
await self.send_command( await self.send_command(
HCI_Write_Inquiry_Mode_Command(inquiry_mode=HCI_EXTENDED_INQUIRY_MODE), # type: ignore[call-arg] HCI_Write_Inquiry_Mode_Command(inquiry_mode=HCI_EXTENDED_INQUIRY_MODE),
check_result=True, check_result=True,
) )
@@ -1948,7 +2038,7 @@ class Device(CompositeEventEmitter):
lap=HCI_GENERAL_INQUIRY_LAP, lap=HCI_GENERAL_INQUIRY_LAP,
inquiry_length=DEVICE_DEFAULT_INQUIRY_LENGTH, inquiry_length=DEVICE_DEFAULT_INQUIRY_LENGTH,
num_responses=0, # Unlimited number of responses. num_responses=0, # Unlimited number of responses.
) # type: ignore[call-arg] )
) )
if response.status != HCI_Command_Status_Event.PENDING: if response.status != HCI_Command_Status_Event.PENDING:
self.discovering = False self.discovering = False
@@ -1959,7 +2049,7 @@ class Device(CompositeEventEmitter):
async def stop_discovery(self) -> None: async def stop_discovery(self) -> None:
if self.discovering: if self.discovering:
await self.send_command(HCI_Inquiry_Cancel_Command(), check_result=True) # type: ignore[call-arg] await self.send_command(HCI_Inquiry_Cancel_Command(), check_result=True)
self.auto_restart_inquiry = True self.auto_restart_inquiry = True
self.discovering = False self.discovering = False
@@ -2007,7 +2097,7 @@ class Device(CompositeEventEmitter):
await self.send_command( await self.send_command(
HCI_Write_Extended_Inquiry_Response_Command( HCI_Write_Extended_Inquiry_Response_Command(
fec_required=0, extended_inquiry_response=self.inquiry_response fec_required=0, extended_inquiry_response=self.inquiry_response
), # type: ignore[call-arg] ),
check_result=True, check_result=True,
) )
await self.set_scan_enable( await self.set_scan_enable(
@@ -2196,7 +2286,7 @@ class Device(CompositeEventEmitter):
supervision_timeouts=supervision_timeouts, supervision_timeouts=supervision_timeouts,
min_ce_lengths=min_ce_lengths, min_ce_lengths=min_ce_lengths,
max_ce_lengths=max_ce_lengths, max_ce_lengths=max_ce_lengths,
) # type: ignore[call-arg] )
) )
else: else:
if HCI_LE_1M_PHY not in connection_parameters_preferences: if HCI_LE_1M_PHY not in connection_parameters_preferences:
@@ -2225,7 +2315,7 @@ class Device(CompositeEventEmitter):
supervision_timeout=int(prefs.supervision_timeout / 10), supervision_timeout=int(prefs.supervision_timeout / 10),
min_ce_length=int(prefs.min_ce_length / 0.625), min_ce_length=int(prefs.min_ce_length / 0.625),
max_ce_length=int(prefs.max_ce_length / 0.625), max_ce_length=int(prefs.max_ce_length / 0.625),
) # type: ignore[call-arg] )
) )
else: else:
# Save pending connection # Save pending connection
@@ -2242,7 +2332,7 @@ class Device(CompositeEventEmitter):
clock_offset=0x0000, clock_offset=0x0000,
allow_role_switch=0x01, allow_role_switch=0x01,
reserved=0, reserved=0,
) # type: ignore[call-arg] )
) )
if result.status != HCI_Command_Status_Event.PENDING: if result.status != HCI_Command_Status_Event.PENDING:
@@ -2261,10 +2351,10 @@ class Device(CompositeEventEmitter):
) )
except asyncio.TimeoutError: except asyncio.TimeoutError:
if transport == BT_LE_TRANSPORT: if transport == BT_LE_TRANSPORT:
await self.send_command(HCI_LE_Create_Connection_Cancel_Command()) # type: ignore[call-arg] await self.send_command(HCI_LE_Create_Connection_Cancel_Command())
else: else:
await self.send_command( await self.send_command(
HCI_Create_Connection_Cancel_Command(bd_addr=peer_address) # type: ignore[call-arg] HCI_Create_Connection_Cancel_Command(bd_addr=peer_address)
) )
try: try:
@@ -2378,7 +2468,7 @@ class Device(CompositeEventEmitter):
try: try:
# Accept connection request # Accept connection request
await self.send_command( await self.send_command(
HCI_Accept_Connection_Request_Command(bd_addr=peer_address, role=role) # type: ignore[call-arg] HCI_Accept_Connection_Request_Command(bd_addr=peer_address, role=role)
) )
# Wait for connection complete # Wait for connection complete
@@ -2445,7 +2535,7 @@ class Device(CompositeEventEmitter):
# Request a disconnection # Request a disconnection
result = await self.send_command( result = await self.send_command(
HCI_Disconnect_Command(connection_handle=connection.handle, reason=reason) # type: ignore[call-arg] HCI_Disconnect_Command(connection_handle=connection.handle, reason=reason)
) )
try: try:
@@ -2476,7 +2566,7 @@ class Device(CompositeEventEmitter):
connection_handle=connection.handle, connection_handle=connection.handle,
tx_octets=tx_octets, tx_octets=tx_octets,
tx_time=tx_time, tx_time=tx_time,
), # type: ignore[call-arg] ),
check_result=True, check_result=True,
) )
@@ -2522,7 +2612,7 @@ class Device(CompositeEventEmitter):
supervision_timeout=supervision_timeout, supervision_timeout=supervision_timeout,
min_ce_length=min_ce_length, min_ce_length=min_ce_length,
max_ce_length=max_ce_length, max_ce_length=max_ce_length,
) # type: ignore[call-arg] )
) )
if result.status != HCI_Command_Status_Event.PENDING: if result.status != HCI_Command_Status_Event.PENDING:
raise HCI_StatusError(result) raise HCI_StatusError(result)
@@ -2850,7 +2940,7 @@ class Device(CompositeEventEmitter):
try: try:
result = await self.send_command( result = await self.send_command(
HCI_Switch_Role_Command(bd_addr=connection.peer_address, role=role) # type: ignore[call-arg] HCI_Switch_Role_Command(bd_addr=connection.peer_address, role=role)
) )
if result.status != HCI_COMMAND_STATUS_PENDING: if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warning( logger.warning(
@@ -2892,7 +2982,7 @@ class Device(CompositeEventEmitter):
page_scan_repetition_mode=HCI_Remote_Name_Request_Command.R2, page_scan_repetition_mode=HCI_Remote_Name_Request_Command.R2,
reserved=0, reserved=0,
clock_offset=0, # TODO investigate non-0 values clock_offset=0, # TODO investigate non-0 values
) # type: ignore[call-arg] )
) )
if result.status != HCI_COMMAND_STATUS_PENDING: if result.status != HCI_COMMAND_STATUS_PENDING:
@@ -2938,7 +3028,7 @@ class Device(CompositeEventEmitter):
num_cis = len(cis_id) num_cis = len(cis_id)
response = await self.send_command( response = await self.send_command(
HCI_LE_Set_CIG_Parameters_Command( # type: ignore[call-arg] HCI_LE_Set_CIG_Parameters_Command(
cig_id=cig_id, cig_id=cig_id,
sdu_interval_c_to_p=sdu_interval[0], sdu_interval_c_to_p=sdu_interval[0],
sdu_interval_p_to_c=sdu_interval[1], sdu_interval_p_to_c=sdu_interval[1],
@@ -2982,7 +3072,7 @@ class Device(CompositeEventEmitter):
) )
result = await self.send_command( result = await self.send_command(
HCI_LE_Create_CIS_Command( # type: ignore[call-arg] HCI_LE_Create_CIS_Command(
cis_connection_handle=[p[0] for p in cis_acl_pairs], cis_connection_handle=[p[0] for p in cis_acl_pairs],
acl_connection_handle=[p[1] for p in cis_acl_pairs], acl_connection_handle=[p[1] for p in cis_acl_pairs],
), ),
@@ -3015,9 +3105,7 @@ class Device(CompositeEventEmitter):
@experimental('Only for testing.') @experimental('Only for testing.')
async def accept_cis_request(self, handle: int) -> CisLink: async def accept_cis_request(self, handle: int) -> CisLink:
result = await self.send_command( result = await self.send_command(
HCI_LE_Accept_CIS_Request_Command( # type: ignore[call-arg] HCI_LE_Accept_CIS_Request_Command(connection_handle=handle),
connection_handle=handle
),
) )
if result.status != HCI_COMMAND_STATUS_PENDING: if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warning( logger.warning(
@@ -3045,9 +3133,7 @@ class Device(CompositeEventEmitter):
reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
) -> None: ) -> None:
result = await self.send_command( result = await self.send_command(
HCI_LE_Reject_CIS_Request_Command( # type: ignore[call-arg] HCI_LE_Reject_CIS_Request_Command(connection_handle=handle, reason=reason),
connection_handle=handle, reason=reason
),
) )
if result.status != HCI_COMMAND_STATUS_PENDING: if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warning( logger.warning(
@@ -3148,13 +3234,18 @@ class Device(CompositeEventEmitter):
# Guess which own address type is used for this connection. # Guess which own address type is used for this connection.
# This logic is somewhat correct but may need to be improved # This logic is somewhat correct but may need to be improved
# when multiple advertising are run simultaneously. # when multiple advertising are run simultaneously.
advertiser = None
if self.connect_own_address_type is not None: if self.connect_own_address_type is not None:
own_address_type = self.connect_own_address_type own_address_type = self.connect_own_address_type
elif self.legacy_advertiser:
own_address_type = self.legacy_advertiser.own_address_type
# Store advertiser for restarting - it's only required for legacy, since
# extended advertisement produces HCI_Advertising_Set_Terminated.
if self.legacy_advertiser.auto_restart:
advertiser = self.legacy_advertiser
else: else:
own_address_type = self.advertising_own_address_type # For extended advertisement, determining own address type later.
own_address_type = OwnAddressType.RANDOM
# We are no longer advertising
self.advertising = False
if own_address_type in ( if own_address_type in (
OwnAddressType.PUBLIC, OwnAddressType.PUBLIC,
@@ -3176,6 +3267,7 @@ class Device(CompositeEventEmitter):
connection_parameters, connection_parameters,
ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY), ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY),
) )
connection.advertiser_after_disconnection = advertiser
self.connections[connection_handle] = connection self.connections[connection_handle] = connection
# If supported, read which PHY we're connected with before # If supported, read which PHY we're connected with before
@@ -3207,10 +3299,10 @@ class Device(CompositeEventEmitter):
# For directed advertising, this means a timeout # For directed advertising, this means a timeout
if ( if (
transport == BT_LE_TRANSPORT transport == BT_LE_TRANSPORT
and self.advertising and self.legacy_advertiser
and self.advertising_type.is_directed and self.legacy_advertiser.advertising_type.is_directed
): ):
self.advertising = False self.legacy_advertiser = None
# Notify listeners # Notify listeners
error = core.ConnectionError( error = core.ConnectionError(
@@ -3272,16 +3364,30 @@ class Device(CompositeEventEmitter):
self.gatt_server.on_disconnection(connection) self.gatt_server.on_disconnection(connection)
# Restart advertising if auto-restart is enabled # Restart advertising if auto-restart is enabled
if self.auto_restart_advertising: if advertiser := connection.advertiser_after_disconnection:
logger.debug('restarting advertising') logger.debug('restarting advertising')
self.abort_on( if isinstance(advertiser, LegacyAdvertiser):
'flush', self.abort_on(
self.start_advertising( 'flush',
advertising_type=self.advertising_type, # type: ignore[arg-type] self.start_legacy_advertising(
own_address_type=self.advertising_own_address_type, # type: ignore[arg-type] advertising_type=advertiser.advertising_type,
auto_restart=True, own_address_type=advertiser.own_address_type,
), advertising_data=advertiser.advertising_data,
) scan_response_data=advertiser.scan_response_data,
auto_restart=True,
),
)
elif isinstance(advertiser, ExtendedAdvertiser):
self.abort_on(
'flush',
self.start_extended_advertising(
advertising_properties=advertiser.advertising_properties,
own_address_type=advertiser.own_address_type,
advertising_data=advertiser.advertising_data,
scan_response_data=advertiser.scan_response_data,
auto_restart=True,
),
)
elif sco_link := self.sco_links.pop(connection_handle, None): elif sco_link := self.sco_links.pop(connection_handle, None):
sco_link.emit('disconnection', reason) sco_link.emit('disconnection', reason)
elif cis_link := self.cis_links.pop(connection_handle, None): elif cis_link := self.cis_links.pop(connection_handle, None):
@@ -3439,7 +3545,7 @@ class Device(CompositeEventEmitter):
try: try:
if await connection.abort_on('disconnection', method()): if await connection.abort_on('disconnection', method()):
await self.host.send_command( await self.host.send_command(
HCI_User_Confirmation_Request_Reply_Command( # type: ignore[call-arg] HCI_User_Confirmation_Request_Reply_Command(
bd_addr=connection.peer_address bd_addr=connection.peer_address
) )
) )
@@ -3448,7 +3554,7 @@ class Device(CompositeEventEmitter):
logger.warning(f'exception while confirming: {error}') logger.warning(f'exception while confirming: {error}')
await self.host.send_command( await self.host.send_command(
HCI_User_Confirmation_Request_Negative_Reply_Command( # type: ignore[call-arg] HCI_User_Confirmation_Request_Negative_Reply_Command(
bd_addr=connection.peer_address bd_addr=connection.peer_address
) )
) )
@@ -3469,7 +3575,7 @@ class Device(CompositeEventEmitter):
) )
if number is not None: if number is not None:
await self.host.send_command( await self.host.send_command(
HCI_User_Passkey_Request_Reply_Command( # type: ignore[call-arg] HCI_User_Passkey_Request_Reply_Command(
bd_addr=connection.peer_address, numeric_value=number bd_addr=connection.peer_address, numeric_value=number
) )
) )
@@ -3478,7 +3584,7 @@ class Device(CompositeEventEmitter):
logger.warning(f'exception while asking for pass-key: {error}') logger.warning(f'exception while asking for pass-key: {error}')
await self.host.send_command( await self.host.send_command(
HCI_User_Passkey_Request_Negative_Reply_Command( # type: ignore[call-arg] HCI_User_Passkey_Request_Negative_Reply_Command(
bd_addr=connection.peer_address bd_addr=connection.peer_address
) )
) )
@@ -3604,6 +3710,30 @@ class Device(CompositeEventEmitter):
if sco_link := self.sco_links.get(sco_handle, None): if sco_link := self.sco_links.get(sco_handle, None):
sco_link.emit('pdu', packet) sco_link.emit('pdu', packet)
# [LE only]
@host_event_handler
@experimental('Only for testing')
def on_advertising_set_termination(
self,
status: int,
advertising_handle: int,
connection_handle: int,
) -> None:
if status == HCI_SUCCESS:
connection = self.lookup_connection(connection_handle)
if advertiser := self.extended_advertisers.pop(advertising_handle, None):
if connection:
if advertiser.auto_restart:
connection.advertiser_after_disconnection = advertiser
if advertiser.own_address_type in (
OwnAddressType.PUBLIC,
OwnAddressType.RESOLVABLE_OR_PUBLIC,
):
connection.self_address = self.public_address
else:
connection.self_address = self.random_address
advertiser.emit('termination', status)
# [LE only] # [LE only]
@host_event_handler @host_event_handler
@with_connection_from_handle @with_connection_from_handle
+28 -32
View File
@@ -19,12 +19,17 @@ like loading firmware after a cold start.
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
import abc from __future__ import annotations
import logging import logging
import pathlib import pathlib
import platform import platform
from . import rtk from typing import Dict, Iterable, Optional, Type, TYPE_CHECKING
from . import rtk
from .common import Driver
if TYPE_CHECKING:
from bumble.host import Host
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Logging # Logging
@@ -32,40 +37,31 @@ from . import rtk
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
class Driver(abc.ABC):
"""Base class for drivers."""
@staticmethod
async def for_host(_host):
"""Return a driver instance for a host.
Args:
host: Host object for which a driver should be created.
Returns:
A Driver instance if a driver should be instantiated for this host, or
None if no driver instance of this class is needed.
"""
return None
@abc.abstractmethod
async def init_controller(self):
"""Initialize the controller."""
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Functions # Functions
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def get_driver_for_host(host): async def get_driver_for_host(host: Host) -> Optional[Driver]:
"""Probe all known diver classes until one returns a valid instance for a host, """Probe diver classes until one returns a valid instance for a host, or none is
or none is found. found.
If a "driver" HCI metadata entry is present, only that driver class will be probed.
""" """
if driver := await rtk.Driver.for_host(host): driver_classes: Dict[str, Type[Driver]] = {"rtk": rtk.Driver}
logger.debug("Instantiated RTK driver") probe_list: Iterable[str]
return driver if driver_name := host.hci_metadata.get("driver"):
# Only probe a single driver
probe_list = [driver_name]
else:
# Probe all drivers
probe_list = driver_classes.keys()
for driver_name in probe_list:
if driver_class := driver_classes.get(driver_name):
logger.debug(f"Probing driver class: {driver_name}")
if driver := await driver_class.for_host(host):
logger.debug(f"Instantiated {driver_name} driver")
return driver
else:
logger.debug(f"Skipping unknown driver class: {driver_name}")
return None return None
+45
View File
@@ -0,0 +1,45 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Common types for drivers.
"""
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import abc
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
class Driver(abc.ABC):
"""Base class for drivers."""
@staticmethod
async def for_host(_host):
"""Return a driver instance for a host.
Args:
host: Host object for which a driver should be created.
Returns:
A Driver instance if a driver should be instantiated for this host, or
None if no driver instance of this class is needed.
"""
return None
@abc.abstractmethod
async def init_controller(self):
"""Initialize the controller."""
+11 -4
View File
@@ -41,7 +41,7 @@ from bumble.hci import (
HCI_Reset_Command, HCI_Reset_Command,
HCI_Read_Local_Version_Information_Command, HCI_Read_Local_Version_Information_Command,
) )
from bumble.drivers import common
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Logging # Logging
@@ -285,7 +285,7 @@ class Firmware:
) )
class Driver: class Driver(common.Driver):
@dataclass @dataclass
class DriverInfo: class DriverInfo:
rom: int rom: int
@@ -470,8 +470,12 @@ class Driver:
logger.debug("USB metadata not found") logger.debug("USB metadata not found")
return False return False
vendor_id = host.hci_metadata.get("vendor_id", None) if host.hci_metadata.get('driver') == 'rtk':
product_id = host.hci_metadata.get("product_id", None) # Forced driver
return True
vendor_id = host.hci_metadata.get("vendor_id")
product_id = host.hci_metadata.get("product_id")
if vendor_id is None or product_id is None: if vendor_id is None or product_id is None:
logger.debug("USB metadata not sufficient") logger.debug("USB metadata not sufficient")
return False return False
@@ -486,6 +490,9 @@ class Driver:
@classmethod @classmethod
async def driver_info_for_host(cls, host): async def driver_info_for_host(cls, host):
await host.send_command(HCI_Reset_Command(), check_result=True)
host.ready = True # Needed to let the host know the controller is ready.
response = await host.send_command( response = await host.send_command(
HCI_Read_Local_Version_Information_Command(), check_result=True HCI_Read_Local_Version_Information_Command(), check_result=True
) )
+99 -64
View File
@@ -561,6 +561,12 @@ HCI_LE_TRANSMITTER_TEST_V4_COMMAND = hci_c
HCI_LE_SET_DATA_RELATED_ADDRESS_CHANGES_COMMAND = hci_command_op_code(0x08, 0x007C) HCI_LE_SET_DATA_RELATED_ADDRESS_CHANGES_COMMAND = hci_command_op_code(0x08, 0x007C)
HCI_LE_SET_DEFAULT_SUBRATE_COMMAND = hci_command_op_code(0x08, 0x007D) HCI_LE_SET_DEFAULT_SUBRATE_COMMAND = hci_command_op_code(0x08, 0x007D)
HCI_LE_SUBRATE_REQUEST_COMMAND = hci_command_op_code(0x08, 0x007E) HCI_LE_SUBRATE_REQUEST_COMMAND = hci_command_op_code(0x08, 0x007E)
HCI_LE_SET_EXTENDED_ADVERTISING_PARAMETERS_V2_COMMAND = hci_command_op_code(0x08, 0x007F)
HCI_LE_SET_PERIODIC_ADVERTISING_SUBEVENT_DATA_COMMAND = hci_command_op_code(0x08, 0x0082)
HCI_LE_SET_PERIODIC_ADVERTISING_RESPONSE_DATA_COMMAND = hci_command_op_code(0x08, 0x0083)
HCI_LE_SET_PERIODIC_SYNC_SUBEVENT_COMMAND = hci_command_op_code(0x08, 0x0084)
HCI_LE_EXTENDED_CREATE_CONNECTION_V2_COMMAND = hci_command_op_code(0x08, 0x0085)
HCI_LE_SET_PERIODIC_ADVERTISING_PARAMETERS_V2_COMMAND = hci_command_op_code(0x08, 0x0086)
# HCI Error Codes # HCI Error Codes
@@ -1317,56 +1323,72 @@ HCI_SUPPORTED_COMMANDS_FLAGS = (
( (
HCI_LE_SET_DEFAULT_SUBRATE_COMMAND, HCI_LE_SET_DEFAULT_SUBRATE_COMMAND,
HCI_LE_SUBRATE_REQUEST_COMMAND, HCI_LE_SUBRATE_REQUEST_COMMAND,
HCI_LE_SET_EXTENDED_ADVERTISING_PARAMETERS_V2_COMMAND,
None,
None,
HCI_LE_SET_PERIODIC_ADVERTISING_SUBEVENT_DATA_COMMAND,
HCI_LE_SET_PERIODIC_ADVERTISING_RESPONSE_DATA_COMMAND,
HCI_LE_SET_PERIODIC_SYNC_SUBEVENT_COMMAND
),
# Octet 47
(
HCI_LE_EXTENDED_CREATE_CONNECTION_V2_COMMAND,
HCI_LE_SET_PERIODIC_ADVERTISING_PARAMETERS_V2_COMMAND,
None,
None, None,
None, None,
None, None,
None, None,
None, None,
None
) )
) )
# LE Supported Features # LE Supported Features
HCI_LE_ENCRYPTION_LE_SUPPORTED_FEATURE = 0 # See Bluetooth spec @ Vol 6, Part B, 4.6 FEATURE SUPPORT
HCI_CONNECTION_PARAMETERS_REQUEST_PROCEDURE_LE_SUPPORTED_FEATURE = 1 HCI_LE_ENCRYPTION_LE_SUPPORTED_FEATURE = 0
HCI_EXTENDED_REJECT_INDICATION_LE_SUPPORTED_FEATURE = 2 HCI_CONNECTION_PARAMETERS_REQUEST_PROCEDURE_LE_SUPPORTED_FEATURE = 1
HCI_PERIPHERAL_INITIATED_FEATURE_EXCHANGE_LE_SUPPORTED_FEATURE = 3 HCI_EXTENDED_REJECT_INDICATION_LE_SUPPORTED_FEATURE = 2
HCI_LE_PING_LE_SUPPORTED_FEATURE = 4 HCI_PERIPHERAL_INITIATED_FEATURE_EXCHANGE_LE_SUPPORTED_FEATURE = 3
HCI_LE_DATA_PACKET_LENGTH_EXTENSION_LE_SUPPORTED_FEATURE = 5 HCI_LE_PING_LE_SUPPORTED_FEATURE = 4
HCI_LL_PRIVACY_LE_SUPPORTED_FEATURE = 6 HCI_LE_DATA_PACKET_LENGTH_EXTENSION_LE_SUPPORTED_FEATURE = 5
HCI_EXTENDED_SCANNER_FILTER_POLICIES_LE_SUPPORTED_FEATURE = 7 HCI_LL_PRIVACY_LE_SUPPORTED_FEATURE = 6
HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE = 8 HCI_EXTENDED_SCANNER_FILTER_POLICIES_LE_SUPPORTED_FEATURE = 7
HCI_STABLE_MODULATION_INDEX_TRANSMITTER_LE_SUPPORTED_FEATURE = 9 HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE = 8
HCI_STABLE_MODULATION_INDEX_RECEIVER_LE_SUPPORTED_FEATURE = 10 HCI_STABLE_MODULATION_INDEX_TRANSMITTER_LE_SUPPORTED_FEATURE = 9
HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE = 11 HCI_STABLE_MODULATION_INDEX_RECEIVER_LE_SUPPORTED_FEATURE = 10
HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE = 12 HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE = 11
HCI_LE_PERIODIC_ADVERTISING_LE_SUPPORTED_FEATURE = 13 HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE = 12
HCI_CHANNEL_SELECTION_ALGORITHM_2_LE_SUPPORTED_FEATURE = 14 HCI_LE_PERIODIC_ADVERTISING_LE_SUPPORTED_FEATURE = 13
HCI_LE_POWER_CLASS_1_LE_SUPPORTED_FEATURE = 15 HCI_CHANNEL_SELECTION_ALGORITHM_2_LE_SUPPORTED_FEATURE = 14
HCI_MINIMUM_NUMBER_OF_USED_CHANNELS_PROCEDURE_LE_SUPPORTED_FEATURE = 16 HCI_LE_POWER_CLASS_1_LE_SUPPORTED_FEATURE = 15
HCI_CONNECTION_CTE_REQUEST_LE_SUPPORTED_FEATURE = 17 HCI_MINIMUM_NUMBER_OF_USED_CHANNELS_PROCEDURE_LE_SUPPORTED_FEATURE = 16
HCI_CONNECTION_CTE_RESPONSE_LE_SUPPORTED_FEATURE = 18 HCI_CONNECTION_CTE_REQUEST_LE_SUPPORTED_FEATURE = 17
HCI_CONNECTIONLESS_CTE_TRANSMITTER_LE_SUPPORTED_FEATURE = 19 HCI_CONNECTION_CTE_RESPONSE_LE_SUPPORTED_FEATURE = 18
HCI_CONNECTIONLESS_CTR_RECEIVER_LE_SUPPORTED_FEATURE = 20 HCI_CONNECTIONLESS_CTE_TRANSMITTER_LE_SUPPORTED_FEATURE = 19
HCI_ANTENNA_SWITCHING_DURING_CTE_TRANSMISSION_LE_SUPPORTED_FEATURE = 21 HCI_CONNECTIONLESS_CTR_RECEIVER_LE_SUPPORTED_FEATURE = 20
HCI_ANTENNA_SWITCHING_DURING_CTE_RECEPTION_LE_SUPPORTED_FEATURE = 22 HCI_ANTENNA_SWITCHING_DURING_CTE_TRANSMISSION_LE_SUPPORTED_FEATURE = 21
HCI_RECEIVING_CONSTANT_TONE_EXTENSIONS_LE_SUPPORTED_FEATURE = 23 HCI_ANTENNA_SWITCHING_DURING_CTE_RECEPTION_LE_SUPPORTED_FEATURE = 22
HCI_PERIODIC_ADVERTISING_SYNC_TRANSFER_SENDER_LE_SUPPORTED_FEATURE = 24 HCI_RECEIVING_CONSTANT_TONE_EXTENSIONS_LE_SUPPORTED_FEATURE = 23
HCI_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECIPIENT_LE_SUPPORTED_FEATURE = 25 HCI_PERIODIC_ADVERTISING_SYNC_TRANSFER_SENDER_LE_SUPPORTED_FEATURE = 24
HCI_SLEEP_CLOCK_ACCURACY_UPDATES_LE_SUPPORTED_FEATURE = 26 HCI_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECIPIENT_LE_SUPPORTED_FEATURE = 25
HCI_REMOTE_PUBLIC_KEY_VALIDATION_LE_SUPPORTED_FEATURE = 27 HCI_SLEEP_CLOCK_ACCURACY_UPDATES_LE_SUPPORTED_FEATURE = 26
HCI_CONNECTED_ISOCHRONOUS_STREAM_CENTRAL_LE_SUPPORTED_FEATURE = 28 HCI_REMOTE_PUBLIC_KEY_VALIDATION_LE_SUPPORTED_FEATURE = 27
HCI_CONNECTED_ISOCHRONOUS_STREAM_PERIPHERAL_LE_SUPPORTED_FEATURE = 29 HCI_CONNECTED_ISOCHRONOUS_STREAM_CENTRAL_LE_SUPPORTED_FEATURE = 28
HCI_ISOCHRONOUS_BROADCASTER_LE_SUPPORTED_FEATURE = 30 HCI_CONNECTED_ISOCHRONOUS_STREAM_PERIPHERAL_LE_SUPPORTED_FEATURE = 29
HCI_SYNCHRONIZED_RECEIVER_LE_SUPPORTED_FEATURE = 31 HCI_ISOCHRONOUS_BROADCASTER_LE_SUPPORTED_FEATURE = 30
HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE = 32 HCI_SYNCHRONIZED_RECEIVER_LE_SUPPORTED_FEATURE = 31
HCI_LE_POWER_CONTROL_REQUEST_LE_SUPPORTED_FEATURE = 33 HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE = 32
HCI_LE_POWER_CONTROL_REQUEST_DUP_LE_SUPPORTED_FEATURE = 34 HCI_LE_POWER_CONTROL_REQUEST_LE_SUPPORTED_FEATURE = 33
HCI_LE_PATH_LOSS_MONITORING_LE_SUPPORTED_FEATURE = 35 HCI_LE_POWER_CONTROL_REQUEST_DUP_LE_SUPPORTED_FEATURE = 34
HCI_PERIODIC_ADVERTISING_ADI_SUPPORT_LE_SUPPORTED_FEATURE = 36 HCI_LE_PATH_LOSS_MONITORING_LE_SUPPORTED_FEATURE = 35
HCI_CONNECTION_SUBRATING_LE_SUPPORTED_FEATURE = 37 HCI_PERIODIC_ADVERTISING_ADI_SUPPORT_LE_SUPPORTED_FEATURE = 36
HCI_CONNECTION_SUBRATING_HOST_SUPPORT_LE_SUPPORTED_FEATURE = 38 HCI_CONNECTION_SUBRATING_LE_SUPPORTED_FEATURE = 37
HCI_CHANNEL_CLASSIFICATION_LE_SUPPORTED_FEATURE = 39 HCI_CONNECTION_SUBRATING_HOST_SUPPORT_LE_SUPPORTED_FEATURE = 38
HCI_CHANNEL_CLASSIFICATION_LE_SUPPORTED_FEATURE = 39
HCI_ADVERTISING_CODING_SELECTION_LE_SUPPORTED_FEATURE = 40
HCI_ADVERTISING_CODING_SELECTION_HOST_SUPPORT_LE_SUPPORTED_FEATURE = 41
HCI_PERIODIC_ADVERTISING_WITH_RESPONSES_ADVERTISER_LE_SUPPORTED_FEATURE = 43
HCI_PERIODIC_ADVERTISING_WITH_RESPONSES_SCANNER_LE_SUPPORTED_FEATURE = 44
HCI_LE_SUPPORTED_FEATURES_NAMES = { HCI_LE_SUPPORTED_FEATURES_NAMES = {
flag: feature_name for (feature_name, flag) in globals().items() flag: feature_name for (feature_name, flag) in globals().items()
@@ -1629,7 +1651,7 @@ class HCI_Object:
field_bytes = bytes(field_value) field_bytes = bytes(field_value)
elif field_type == 'v': elif field_type == 'v':
# Variable-length bytes field, with 1-byte length at the beginning # Variable-length bytes field, with 1-byte length at the beginning
field_bytes = bytes(field_bytes) field_bytes = bytes(field_value)
field_length = len(field_bytes) field_length = len(field_bytes)
field_bytes = bytes([field_length]) + field_bytes field_bytes = bytes([field_length]) + field_bytes
elif isinstance(field_value, (bytes, bytearray)) or hasattr( elif isinstance(field_value, (bytes, bytearray)) or hasattr(
@@ -1941,25 +1963,15 @@ Address.ANY_RANDOM = Address(b"\x00\x00\x00\x00\x00\x00", Address.RANDOM_DEVICE_
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class OwnAddressType: class OwnAddressType(enum.IntEnum):
PUBLIC = 0 PUBLIC = 0
RANDOM = 1 RANDOM = 1
RESOLVABLE_OR_PUBLIC = 2 RESOLVABLE_OR_PUBLIC = 2
RESOLVABLE_OR_RANDOM = 3 RESOLVABLE_OR_RANDOM = 3
TYPE_NAMES = { @classmethod
PUBLIC: 'PUBLIC', def type_spec(cls):
RANDOM: 'RANDOM', return {'size': 1, 'mapper': lambda x: OwnAddressType(x).name}
RESOLVABLE_OR_PUBLIC: 'RESOLVABLE_OR_PUBLIC',
RESOLVABLE_OR_RANDOM: 'RESOLVABLE_OR_RANDOM',
}
@staticmethod
def type_name(type_id):
return name_or_number(OwnAddressType.TYPE_NAMES, type_id)
# pylint: disable-next=unnecessary-lambda
TYPE_SPEC = {'size': 1, 'mapper': lambda x: OwnAddressType.type_name(x)}
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -1986,6 +1998,9 @@ class HCI_Packet:
if packet_type == HCI_EVENT_PACKET: if packet_type == HCI_EVENT_PACKET:
return HCI_Event.from_bytes(packet) return HCI_Event.from_bytes(packet)
if packet_type == HCI_ISO_DATA_PACKET:
return HCI_IsoDataPacket.from_bytes(packet)
return HCI_CustomPacket(packet) return HCI_CustomPacket(packet)
def __init__(self, name): def __init__(self, name):
@@ -2018,6 +2033,7 @@ class HCI_Command(HCI_Packet):
hci_packet_type = HCI_COMMAND_PACKET hci_packet_type = HCI_COMMAND_PACKET
command_names: Dict[int, str] = {} command_names: Dict[int, str] = {}
command_classes: Dict[int, Type[HCI_Command]] = {} command_classes: Dict[int, Type[HCI_Command]] = {}
op_code: int
@staticmethod @staticmethod
def command(fields=(), return_parameters_fields=()): def command(fields=(), return_parameters_fields=()):
@@ -2103,7 +2119,11 @@ class HCI_Command(HCI_Packet):
return_parameters.fields = cls.return_parameters_fields return_parameters.fields = cls.return_parameters_fields
return return_parameters return return_parameters
def __init__(self, op_code, parameters=None, **kwargs): def __init__(self, op_code=-1, parameters=None, **kwargs):
# Since the legacy implementation relies on an __init__ injector, typing always
# complains that positional argument op_code is not passed, so here sets a
# default value to allow building derived HCI_Command without op_code.
assert op_code != -1
super().__init__(HCI_Command.command_name(op_code)) super().__init__(HCI_Command.command_name(op_code))
if (fields := getattr(self, 'fields', None)) and kwargs: if (fields := getattr(self, 'fields', None)) and kwargs:
HCI_Object.init_from_fields(self, fields, kwargs) HCI_Object.init_from_fields(self, fields, kwargs)
@@ -3344,7 +3364,7 @@ class HCI_LE_Set_Random_Address_Command(HCI_Command):
), ),
}, },
), ),
('own_address_type', OwnAddressType.TYPE_SPEC), ('own_address_type', OwnAddressType.type_spec()),
('peer_address_type', Address.ADDRESS_TYPE_SPEC), ('peer_address_type', Address.ADDRESS_TYPE_SPEC),
('peer_address', Address.parse_address_preceded_by_type), ('peer_address', Address.parse_address_preceded_by_type),
('advertising_channel_map', 1), ('advertising_channel_map', 1),
@@ -3437,7 +3457,7 @@ class HCI_LE_Set_Advertising_Enable_Command(HCI_Command):
('le_scan_type', 1), ('le_scan_type', 1),
('le_scan_interval', 2), ('le_scan_interval', 2),
('le_scan_window', 2), ('le_scan_window', 2),
('own_address_type', OwnAddressType.TYPE_SPEC), ('own_address_type', OwnAddressType.type_spec()),
('scanning_filter_policy', 1), ('scanning_filter_policy', 1),
] ]
) )
@@ -3476,7 +3496,7 @@ class HCI_LE_Set_Scan_Enable_Command(HCI_Command):
('initiator_filter_policy', 1), ('initiator_filter_policy', 1),
('peer_address_type', Address.ADDRESS_TYPE_SPEC), ('peer_address_type', Address.ADDRESS_TYPE_SPEC),
('peer_address', Address.parse_address_preceded_by_type), ('peer_address', Address.parse_address_preceded_by_type),
('own_address_type', OwnAddressType.TYPE_SPEC), ('own_address_type', OwnAddressType.type_spec()),
('connection_interval_min', 2), ('connection_interval_min', 2),
('connection_interval_max', 2), ('connection_interval_max', 2),
('max_latency', 2), ('max_latency', 2),
@@ -3883,7 +3903,7 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command):
), ),
}, },
), ),
('own_address_type', OwnAddressType.TYPE_SPEC), ('own_address_type', OwnAddressType.type_spec()),
('peer_address_type', Address.ADDRESS_TYPE_SPEC), ('peer_address_type', Address.ADDRESS_TYPE_SPEC),
('peer_address', Address.parse_address_preceded_by_type), ('peer_address', Address.parse_address_preceded_by_type),
('advertising_filter_policy', 1), ('advertising_filter_policy', 1),
@@ -4279,7 +4299,7 @@ class HCI_LE_Extended_Create_Connection_Command(HCI_Command):
('initiator_filter_policy:', self.initiator_filter_policy), ('initiator_filter_policy:', self.initiator_filter_policy),
( (
'own_address_type: ', 'own_address_type: ',
OwnAddressType.type_name(self.own_address_type), OwnAddressType(self.own_address_type).name,
), ),
( (
'peer_address_type: ', 'peer_address_type: ',
@@ -5160,6 +5180,21 @@ HCI_LE_Meta_Event.subevent_classes[
] = HCI_LE_Extended_Advertising_Report_Event ] = HCI_LE_Extended_Advertising_Report_Event
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event(
[
('status', 1),
('advertising_handle', 1),
('connection_handle', 2),
('number_completed_extended_advertising_events', 1),
]
)
class HCI_LE_Advertising_Set_Terminated_Event(HCI_LE_Meta_Event):
'''
See Bluetooth spec @ 7.7.65.18 LE Advertising Set Terminated Event
'''
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event([('connection_handle', 2), ('channel_selection_algorithm', 1)]) @HCI_LE_Meta_Event.event([('connection_handle', 2), ('channel_selection_algorithm', 1)])
class HCI_LE_Channel_Selection_Algorithm_Event(HCI_LE_Meta_Event): class HCI_LE_Channel_Selection_Algorithm_Event(HCI_LE_Meta_Event):
@@ -6093,7 +6128,7 @@ class HCI_IsoDataPacket(HCI_Packet):
if ts_flag: if ts_flag:
if not should_include_sdu_info: if not should_include_sdu_info:
logger.warn(f'Timestamp included when pb_flag={bin(pb_flag)}') logger.warn(f'Timestamp included when pb_flag={bin(pb_flag)}')
time_stamp, _ = struct.unpack_from('<I', packet, pos) time_stamp, *_ = struct.unpack_from('<I', packet, pos)
pos += 4 pos += 4
if should_include_sdu_info: if should_include_sdu_info:
@@ -6160,7 +6195,7 @@ class HCI_IsoDataPacket(HCI_Packet):
self.packet_sequence_number, self.packet_sequence_number,
self.iso_sdu_length | self.packet_status_flag << 14, self.iso_sdu_length | self.packet_status_flag << 14,
] ]
return struct.pack(fmt, args) + self.iso_sdu_fragment return struct.pack(fmt, *args) + self.iso_sdu_fragment
def __str__(self) -> str: def __str__(self) -> str:
return ( return (
+29 -17
View File
@@ -21,7 +21,7 @@ import collections
import logging import logging
import struct import struct
from typing import Optional, TYPE_CHECKING, Dict, Callable, Awaitable, cast from typing import Any, Awaitable, Callable, Dict, Optional, Union, cast, TYPE_CHECKING
from bumble.colors import color from bumble.colors import color
from bumble.l2cap import L2CAP_PDU from bumble.l2cap import L2CAP_PDU
@@ -124,7 +124,8 @@ class Connection:
class Host(AbortableEventEmitter): class Host(AbortableEventEmitter):
connections: Dict[int, Connection] connections: Dict[int, Connection]
acl_packet_queue: collections.deque[HCI_AclDataPacket] acl_packet_queue: collections.deque[HCI_AclDataPacket]
hci_sink: TransportSink hci_sink: Optional[TransportSink] = None
hci_metadata: Dict[str, Any]
long_term_key_provider: Optional[ long_term_key_provider: Optional[
Callable[[int, bytes, int], Awaitable[Optional[bytes]]] Callable[[int, bytes, int], Awaitable[Optional[bytes]]]
] ]
@@ -137,9 +138,8 @@ class Host(AbortableEventEmitter):
) -> None: ) -> None:
super().__init__() super().__init__()
self.hci_metadata = None self.hci_metadata = {}
self.ready = False # True when we can accept incoming packets self.ready = False # True when we can accept incoming packets
self.reset_done = False
self.connections = {} # Connections, by connection handle self.connections = {} # Connections, by connection handle
self.pending_command = None self.pending_command = None
self.pending_response = None self.pending_response = None
@@ -162,10 +162,7 @@ class Host(AbortableEventEmitter):
# Connect to the source and sink if specified # Connect to the source and sink if specified
if controller_source: if controller_source:
controller_source.set_packet_sink(self) self.set_packet_source(controller_source)
self.hci_metadata = getattr(
controller_source, 'metadata', self.hci_metadata
)
if controller_sink: if controller_sink:
self.set_packet_sink(controller_sink) self.set_packet_sink(controller_sink)
@@ -200,17 +197,21 @@ class Host(AbortableEventEmitter):
self.ready = False self.ready = False
await self.flush() await self.flush()
await self.send_command(HCI_Reset_Command(), check_result=True)
self.ready = True
# Instantiate and init a driver for the host if needed. # Instantiate and init a driver for the host if needed.
# NOTE: we don't keep a reference to the driver here, because we don't # NOTE: we don't keep a reference to the driver here, because we don't
# currently have a need for the driver later on. But if the driver interface # currently have a need for the driver later on. But if the driver interface
# evolves, it may be required, then, to store a reference to the driver in # evolves, it may be required, then, to store a reference to the driver in
# an object property. # an object property.
reset_needed = True
if driver_factory is not None: if driver_factory is not None:
if driver := await driver_factory(self): if driver := await driver_factory(self):
await driver.init_controller() await driver.init_controller()
reset_needed = False
# Send a reset command unless a driver has already done so.
if reset_needed:
await self.send_command(HCI_Reset_Command(), check_result=True)
self.ready = True
response = await self.send_command( response = await self.send_command(
HCI_Read_Local_Supported_Commands_Command(), check_result=True HCI_Read_Local_Supported_Commands_Command(), check_result=True
@@ -313,25 +314,28 @@ class Host(AbortableEventEmitter):
) )
) )
self.reset_done = True
@property @property
def controller(self) -> TransportSink: def controller(self) -> Optional[TransportSink]:
return self.hci_sink return self.hci_sink
@controller.setter @controller.setter
def controller(self, controller): def controller(self, controller) -> None:
self.set_packet_sink(controller) self.set_packet_sink(controller)
if controller: if controller:
controller.set_packet_sink(self) controller.set_packet_sink(self)
def set_packet_sink(self, sink: TransportSink) -> None: def set_packet_sink(self, sink: Optional[TransportSink]) -> None:
self.hci_sink = sink self.hci_sink = sink
def set_packet_source(self, source: TransportSource) -> None:
source.set_packet_sink(self)
self.hci_metadata = getattr(source, 'metadata', self.hci_metadata)
def send_hci_packet(self, packet: HCI_Packet) -> None: def send_hci_packet(self, packet: HCI_Packet) -> None:
if self.snooper: if self.snooper:
self.snooper.snoop(bytes(packet), Snooper.Direction.HOST_TO_CONTROLLER) self.snooper.snoop(bytes(packet), Snooper.Direction.HOST_TO_CONTROLLER)
self.hci_sink.on_packet(bytes(packet)) if self.hci_sink:
self.hci_sink.on_packet(bytes(packet))
async def send_command(self, command, check_result=False): async def send_command(self, command, check_result=False):
logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {command}') logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {command}')
@@ -721,6 +725,14 @@ class Host(AbortableEventEmitter):
def on_hci_le_extended_advertising_report_event(self, event): def on_hci_le_extended_advertising_report_event(self, event):
self.on_hci_le_advertising_report_event(event) self.on_hci_le_advertising_report_event(event)
def on_hci_le_advertising_set_terminated_event(self, event):
self.emit(
'advertising_set_termination',
event.status,
event.advertising_handle,
event.connection_handle,
)
def on_hci_le_cis_request_event(self, event): def on_hci_le_cis_request_event(self, event):
self.emit( self.emit(
'cis_request', 'cis_request',
+1 -1
View File
@@ -1926,7 +1926,7 @@ class ChannelManager:
supervision_timeout=request.timeout, supervision_timeout=request.timeout,
min_ce_length=0, min_ce_length=0,
max_ce_length=0, max_ce_length=0,
) # type: ignore[call-arg] )
) )
else: else:
self.send_control_frame( self.send_control_frame(
+496
View File
@@ -0,0 +1,496 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
from collections.abc import Sequence
import dataclasses
import enum
import struct
import functools
from typing import Optional, List, Union
from bumble import hci
from bumble import gatt
from bumble import gatt_client
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
class AudioLocation(enum.IntFlag):
'''Bluetooth Assigned Numbers, Section 6.12.1 - Audio Location'''
# fmt: off
NOT_ALLOWED = 0x00000000
FRONT_LEFT = 0x00000001
FRONT_RIGHT = 0x00000002
FRONT_CENTER = 0x00000004
LOW_FREQUENCY_EFFECTS_1 = 0x00000008
BACK_LEFT = 0x00000010
BACK_RIGHT = 0x00000020
FRONT_LEFT_OF_CENTER = 0x00000040
FRONT_RIGHT_OF_CENTER = 0x00000080
BACK_CENTER = 0x00000100
LOW_FREQUENCY_EFFECTS_2 = 0x00000200
SIDE_LEFT = 0x00000400
SIDE_RIGHT = 0x00000800
TOP_FRONT_LEFT = 0x00001000
TOP_FRONT_RIGHT = 0x00002000
TOP_FRONT_CENTER = 0x00004000
TOP_CENTER = 0x00008000
TOP_BACK_LEFT = 0x00010000
TOP_BACK_RIGHT = 0x00020000
TOP_SIDE_LEFT = 0x00040000
TOP_SIDE_RIGHT = 0x00080000
TOP_BACK_CENTER = 0x00100000
BOTTOM_FRONT_CENTER = 0x00200000
BOTTOM_FRONT_LEFT = 0x00400000
BOTTOM_FRONT_RIGHT = 0x00800000
FRONT_LEFT_WIDE = 0x01000000
FRONT_RIGHT_WIDE = 0x02000000
LEFT_SURROUND = 0x04000000
RIGHT_SURROUND = 0x08000000
class AudioInputType(enum.IntEnum):
'''Bluetooth Assigned Numbers, Section 6.12.2 - Audio Input Type'''
# fmt: off
UNSPECIFIED = 0x00
BLUETOOTH = 0x01
MICROPHONE = 0x02
ANALOG = 0x03
DIGITAL = 0x04
RADIO = 0x05
STREAMING = 0x06
AMBIENT = 0x07
class ContextType(enum.IntFlag):
'''Bluetooth Assigned Numbers, Section 6.12.3 - Context Type'''
# fmt: off
PROHIBITED = 0x0000
CONVERSATIONAL = 0x0002
MEDIA = 0x0004
GAME = 0x0008
INSTRUCTIONAL = 0x0010
VOICE_ASSISTANTS = 0x0020
LIVE = 0x0040
SOUND_EFFECTS = 0x0080
NOTIFICATIONS = 0x0100
RINGTONE = 0x0200
ALERTS = 0x0400
EMERGENCY_ALARM = 0x0800
class SamplingFrequency(enum.IntEnum):
'''Bluetooth Assigned Numbers, Section 6.12.5.1 - Sampling Frequency'''
# fmt: off
FREQ_8000 = 0x01
FREQ_11025 = 0x02
FREQ_16000 = 0x03
FREQ_22050 = 0x04
FREQ_24000 = 0x05
FREQ_32000 = 0x06
FREQ_44100 = 0x07
FREQ_48000 = 0x08
FREQ_88200 = 0x09
FREQ_96000 = 0x0A
FREQ_176400 = 0x0B
FREQ_192000 = 0x0C
FREQ_384000 = 0x0D
# fmt: on
@classmethod
def from_hz(cls, frequency: int) -> SamplingFrequency:
return {
8000: SamplingFrequency.FREQ_8000,
11025: SamplingFrequency.FREQ_11025,
16000: SamplingFrequency.FREQ_16000,
22050: SamplingFrequency.FREQ_22050,
24000: SamplingFrequency.FREQ_24000,
32000: SamplingFrequency.FREQ_32000,
44100: SamplingFrequency.FREQ_44100,
48000: SamplingFrequency.FREQ_48000,
88200: SamplingFrequency.FREQ_88200,
96000: SamplingFrequency.FREQ_96000,
176400: SamplingFrequency.FREQ_176400,
192000: SamplingFrequency.FREQ_192000,
384000: SamplingFrequency.FREQ_384000,
}[frequency]
@property
def hz(self) -> int:
return {
SamplingFrequency.FREQ_8000: 8000,
SamplingFrequency.FREQ_11025: 11025,
SamplingFrequency.FREQ_16000: 16000,
SamplingFrequency.FREQ_22050: 22050,
SamplingFrequency.FREQ_24000: 24000,
SamplingFrequency.FREQ_32000: 32000,
SamplingFrequency.FREQ_44100: 44100,
SamplingFrequency.FREQ_48000: 48000,
SamplingFrequency.FREQ_88200: 88200,
SamplingFrequency.FREQ_96000: 96000,
SamplingFrequency.FREQ_176400: 176400,
SamplingFrequency.FREQ_192000: 192000,
SamplingFrequency.FREQ_384000: 384000,
}[self]
class SupportedSamplingFrequency(enum.IntFlag):
'''Bluetooth Assigned Numbers, Section 6.12.4.1 - Sample Frequency'''
# fmt: off
FREQ_8000 = 1 << (SamplingFrequency.FREQ_8000 - 1)
FREQ_11025 = 1 << (SamplingFrequency.FREQ_11025 - 1)
FREQ_16000 = 1 << (SamplingFrequency.FREQ_16000 - 1)
FREQ_22050 = 1 << (SamplingFrequency.FREQ_22050 - 1)
FREQ_24000 = 1 << (SamplingFrequency.FREQ_24000 - 1)
FREQ_32000 = 1 << (SamplingFrequency.FREQ_32000 - 1)
FREQ_44100 = 1 << (SamplingFrequency.FREQ_44100 - 1)
FREQ_48000 = 1 << (SamplingFrequency.FREQ_48000 - 1)
FREQ_88200 = 1 << (SamplingFrequency.FREQ_88200 - 1)
FREQ_96000 = 1 << (SamplingFrequency.FREQ_96000 - 1)
FREQ_176400 = 1 << (SamplingFrequency.FREQ_176400 - 1)
FREQ_192000 = 1 << (SamplingFrequency.FREQ_192000 - 1)
FREQ_384000 = 1 << (SamplingFrequency.FREQ_384000 - 1)
# fmt: on
@classmethod
def from_hz(cls, frequencies: Sequence[int]) -> SupportedSamplingFrequency:
MAPPING = {
8000: SupportedSamplingFrequency.FREQ_8000,
11025: SupportedSamplingFrequency.FREQ_11025,
16000: SupportedSamplingFrequency.FREQ_16000,
22050: SupportedSamplingFrequency.FREQ_22050,
24000: SupportedSamplingFrequency.FREQ_24000,
32000: SupportedSamplingFrequency.FREQ_32000,
44100: SupportedSamplingFrequency.FREQ_44100,
48000: SupportedSamplingFrequency.FREQ_48000,
88200: SupportedSamplingFrequency.FREQ_88200,
96000: SupportedSamplingFrequency.FREQ_96000,
176400: SupportedSamplingFrequency.FREQ_176400,
192000: SupportedSamplingFrequency.FREQ_192000,
384000: SupportedSamplingFrequency.FREQ_384000,
}
return functools.reduce(
lambda x, y: x | MAPPING[y],
frequencies,
cls(0),
)
class FrameDuration(enum.IntEnum):
'''Bluetooth Assigned Numbers, Section 6.12.5.2 - Frame Duration'''
# fmt: off
DURATION_7500_US = 0x00
DURATION_10000_US = 0x01
class SupportedFrameDuration(enum.IntFlag):
'''Bluetooth Assigned Numbers, Section 6.12.4.2 - Frame Duration'''
# fmt: off
DURATION_7500_US_SUPPORTED = 0b0001
DURATION_10000_US_SUPPORTED = 0b0010
DURATION_7500_US_PREFERRED = 0b0001
DURATION_10000_US_PREFERRED = 0b0010
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
def bits_to_channel_counts(data: int) -> List[int]:
pos = 0
counts = []
while data != 0:
# Bit 0 = count 1
# Bit 1 = count 2, and so on
pos += 1
if data & 1:
counts.append(pos)
data >>= 1
return counts
def channel_counts_to_bits(counts: Sequence[int]) -> int:
return sum(set([1 << (count - 1) for count in counts]))
# -----------------------------------------------------------------------------
# Structures
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class CodecSpecificCapabilities:
'''See:
* Bluetooth Assigned Numbers, 6.12.4 - Codec Specific Capabilities LTV Structures
* Basic Audio Profile, 4.3.1 - Codec_Specific_Capabilities LTV requirements
'''
class Type(enum.IntEnum):
# fmt: off
SAMPLING_FREQUENCY = 0x01
FRAME_DURATION = 0x02
AUDIO_CHANNEL_COUNT = 0x03
OCTETS_PER_FRAME = 0x04
CODEC_FRAMES_PER_SDU = 0x05
supported_sampling_frequencies: SupportedSamplingFrequency
supported_frame_durations: SupportedFrameDuration
supported_audio_channel_counts: Sequence[int]
min_octets_per_codec_frame: int
max_octets_per_codec_frame: int
supported_max_codec_frames_per_sdu: int
@classmethod
def from_bytes(cls, data: bytes) -> CodecSpecificCapabilities:
offset = 0
# Allowed default values.
supported_audio_channel_counts = [1]
supported_max_codec_frames_per_sdu = 1
while offset < len(data):
length, type = struct.unpack_from('BB', data, offset)
offset += 2
value = int.from_bytes(data[offset : offset + length - 1], 'little')
offset += length - 1
if type == CodecSpecificCapabilities.Type.SAMPLING_FREQUENCY:
supported_sampling_frequencies = SupportedSamplingFrequency(value)
elif type == CodecSpecificCapabilities.Type.FRAME_DURATION:
supported_frame_durations = SupportedFrameDuration(value)
elif type == CodecSpecificCapabilities.Type.AUDIO_CHANNEL_COUNT:
supported_audio_channel_counts = bits_to_channel_counts(value)
elif type == CodecSpecificCapabilities.Type.OCTETS_PER_FRAME:
min_octets_per_sample = value & 0xFFFF
max_octets_per_sample = value >> 16
elif type == CodecSpecificCapabilities.Type.CODEC_FRAMES_PER_SDU:
supported_max_codec_frames_per_sdu = value
# It is expected here that if some fields are missing, an error should be raised.
return CodecSpecificCapabilities(
supported_sampling_frequencies=supported_sampling_frequencies,
supported_frame_durations=supported_frame_durations,
supported_audio_channel_counts=supported_audio_channel_counts,
min_octets_per_codec_frame=min_octets_per_sample,
max_octets_per_codec_frame=max_octets_per_sample,
supported_max_codec_frames_per_sdu=supported_max_codec_frames_per_sdu,
)
def __bytes__(self) -> bytes:
return struct.pack(
'<BBHBBBBBBBBHHBBB',
3,
CodecSpecificCapabilities.Type.SAMPLING_FREQUENCY,
self.supported_sampling_frequencies,
2,
CodecSpecificCapabilities.Type.FRAME_DURATION,
self.supported_frame_durations,
2,
CodecSpecificCapabilities.Type.AUDIO_CHANNEL_COUNT,
channel_counts_to_bits(self.supported_audio_channel_counts),
5,
CodecSpecificCapabilities.Type.OCTETS_PER_FRAME,
self.min_octets_per_codec_frame,
self.max_octets_per_codec_frame,
2,
CodecSpecificCapabilities.Type.CODEC_FRAMES_PER_SDU,
self.supported_max_codec_frames_per_sdu,
)
@dataclasses.dataclass
class PacRecord:
coding_format: hci.CodingFormat
codec_specific_capabilities: Union[CodecSpecificCapabilities, bytes]
# TODO: Parse Metadata
metadata: bytes = b''
@classmethod
def from_bytes(cls, data: bytes) -> PacRecord:
offset, coding_format = hci.CodingFormat.parse_from_bytes(data, 0)
codec_specific_capabilities_size = data[offset]
offset += 1
codec_specific_capabilities_bytes = data[
offset : offset + codec_specific_capabilities_size
]
offset += codec_specific_capabilities_size
metadata_size = data[offset]
metadata = data[offset : offset + metadata_size]
codec_specific_capabilities: Union[CodecSpecificCapabilities, bytes]
if coding_format.codec_id == hci.CodecID.VENDOR_SPECIFIC:
codec_specific_capabilities = codec_specific_capabilities_bytes
else:
codec_specific_capabilities = CodecSpecificCapabilities.from_bytes(
codec_specific_capabilities_bytes
)
return PacRecord(
coding_format=coding_format,
codec_specific_capabilities=codec_specific_capabilities,
metadata=metadata,
)
def __bytes__(self) -> bytes:
capabilities_bytes = bytes(self.codec_specific_capabilities)
return (
bytes(self.coding_format)
+ bytes([len(capabilities_bytes)])
+ capabilities_bytes
+ bytes([len(self.metadata)])
+ self.metadata
)
# -----------------------------------------------------------------------------
# Server
# -----------------------------------------------------------------------------
class PublishedAudioCapabilitiesService(gatt.TemplateService):
UUID = gatt.GATT_PUBLISHED_AUDIO_CAPABILITIES_SERVICE
sink_pac: Optional[gatt.Characteristic]
sink_audio_locations: Optional[gatt.Characteristic]
source_pac: Optional[gatt.Characteristic]
source_audio_locations: Optional[gatt.Characteristic]
available_audio_contexts: gatt.Characteristic
supported_audio_contexts: gatt.Characteristic
def __init__(
self,
supported_source_context: ContextType,
supported_sink_context: ContextType,
available_source_context: ContextType,
available_sink_context: ContextType,
sink_pac: Sequence[PacRecord] = [],
sink_audio_locations: Optional[AudioLocation] = None,
source_pac: Sequence[PacRecord] = [],
source_audio_locations: Optional[AudioLocation] = None,
) -> None:
characteristics = []
self.supported_audio_contexts = gatt.Characteristic(
uuid=gatt.GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READABLE,
value=struct.pack('<HH', supported_sink_context, supported_source_context),
)
characteristics.append(self.supported_audio_contexts)
self.available_audio_contexts = gatt.Characteristic(
uuid=gatt.GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
value=struct.pack('<HH', available_sink_context, available_source_context),
)
characteristics.append(self.available_audio_contexts)
if sink_pac:
self.sink_pac = gatt.Characteristic(
uuid=gatt.GATT_SINK_PAC_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READABLE,
value=bytes([len(sink_pac)]) + b''.join(map(bytes, sink_pac)),
)
characteristics.append(self.sink_pac)
if sink_audio_locations is not None:
self.sink_audio_locations = gatt.Characteristic(
uuid=gatt.GATT_SINK_AUDIO_LOCATION_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READABLE,
value=struct.pack('<I', sink_audio_locations),
)
characteristics.append(self.sink_audio_locations)
if source_pac:
self.source_pac = gatt.Characteristic(
uuid=gatt.GATT_SOURCE_PAC_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READABLE,
value=bytes([len(source_pac)]) + b''.join(map(bytes, source_pac)),
)
characteristics.append(self.source_pac)
if source_audio_locations is not None:
self.source_audio_locations = gatt.Characteristic(
uuid=gatt.GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READABLE,
value=struct.pack('<I', source_audio_locations),
)
characteristics.append(self.source_audio_locations)
super().__init__(characteristics)
# -----------------------------------------------------------------------------
# Client
# -----------------------------------------------------------------------------
class PublishedAudioCapabilitiesServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = PublishedAudioCapabilitiesService
sink_pac: Optional[gatt_client.CharacteristicProxy] = None
sink_audio_locations: Optional[gatt_client.CharacteristicProxy] = None
source_pac: Optional[gatt_client.CharacteristicProxy] = None
source_audio_locations: Optional[gatt_client.CharacteristicProxy] = None
available_audio_contexts: gatt_client.CharacteristicProxy
supported_audio_contexts: gatt_client.CharacteristicProxy
def __init__(self, service_proxy: gatt_client.ServiceProxy):
self.service_proxy = service_proxy
self.available_audio_contexts = service_proxy.get_characteristics_by_uuid(
gatt.GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC
)[0]
self.supported_audio_contexts = service_proxy.get_characteristics_by_uuid(
gatt.GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC
)[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SINK_PAC_CHARACTERISTIC
):
self.sink_pac = characteristics[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SOURCE_PAC_CHARACTERISTIC
):
self.source_pac = characteristics[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SINK_AUDIO_LOCATION_CHARACTERISTIC
):
self.sink_audio_locations = characteristics[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC
):
self.source_audio_locations = characteristics[0]
+1 -1
View File
@@ -1090,7 +1090,7 @@ class Session:
# We can now encrypt the connection with the short term key, so that we can # We can now encrypt the connection with the short term key, so that we can
# distribute the long term and/or other keys over an encrypted connection # distribute the long term and/or other keys over an encrypted connection
self.manager.device.host.send_command_sync( self.manager.device.host.send_command_sync(
HCI_LE_Enable_Encryption_Command( # type: ignore[call-arg] HCI_LE_Enable_Encryption_Command(
connection_handle=self.connection.handle, connection_handle=self.connection.handle,
random_number=bytes(8), random_number=bytes(8),
encrypted_diversifier=0, encrypted_diversifier=0,
+49 -21
View File
@@ -18,6 +18,7 @@
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
import logging import logging
import os import os
from typing import Optional
from .common import Transport, AsyncPipeSink, SnoopingTransport from .common import Transport, AsyncPipeSink, SnoopingTransport
from ..snoop import create_snooper from ..snoop import create_snooper
@@ -52,8 +53,16 @@ def _wrap_transport(transport: Transport) -> Transport:
async def open_transport(name: str) -> Transport: async def open_transport(name: str) -> Transport:
""" """
Open a transport by name. Open a transport by name.
The name must be <type>:<parameters> The name must be <type>:<metadata><parameters>
Where <parameters> depend on the type (and may be empty for some types). Where <parameters> depend on the type (and may be empty for some types), and
<metadata> is either omitted, or a ,-separated list of <key>=<value> pairs,
enclosed in [].
If there are not metadata or parameter, the : after the <type> may be omitted.
Examples:
* usb:0
* usb:[driver=rtk]0
* android-netsim
The supported types are: The supported types are:
* serial * serial
* udp * udp
@@ -71,87 +80,106 @@ async def open_transport(name: str) -> Transport:
* android-netsim * android-netsim
""" """
return _wrap_transport(await _open_transport(name)) scheme, *tail = name.split(':', 1)
spec = tail[0] if tail else None
if spec:
# Metadata may precede the spec
if spec.startswith('['):
metadata_str, *tail = spec[1:].split(']')
spec = tail[0] if tail else None
metadata = dict([entry.split('=') for entry in metadata_str.split(',')])
else:
metadata = None
transport = await _open_transport(scheme, spec)
if metadata:
transport.source.metadata = { # type: ignore[attr-defined]
**metadata,
**getattr(transport.source, 'metadata', {}),
}
# pylint: disable=line-too-long
logger.debug(f'HCI metadata: {transport.source.metadata}') # type: ignore[attr-defined]
return _wrap_transport(transport)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def _open_transport(name: str) -> Transport: async def _open_transport(scheme: str, spec: Optional[str]) -> Transport:
# pylint: disable=import-outside-toplevel # pylint: disable=import-outside-toplevel
# pylint: disable=too-many-return-statements # pylint: disable=too-many-return-statements
scheme, *spec = name.split(':', 1)
if scheme == 'serial' and spec: if scheme == 'serial' and spec:
from .serial import open_serial_transport from .serial import open_serial_transport
return await open_serial_transport(spec[0]) return await open_serial_transport(spec)
if scheme == 'udp' and spec: if scheme == 'udp' and spec:
from .udp import open_udp_transport from .udp import open_udp_transport
return await open_udp_transport(spec[0]) return await open_udp_transport(spec)
if scheme == 'tcp-client' and spec: if scheme == 'tcp-client' and spec:
from .tcp_client import open_tcp_client_transport from .tcp_client import open_tcp_client_transport
return await open_tcp_client_transport(spec[0]) return await open_tcp_client_transport(spec)
if scheme == 'tcp-server' and spec: if scheme == 'tcp-server' and spec:
from .tcp_server import open_tcp_server_transport from .tcp_server import open_tcp_server_transport
return await open_tcp_server_transport(spec[0]) return await open_tcp_server_transport(spec)
if scheme == 'ws-client' and spec: if scheme == 'ws-client' and spec:
from .ws_client import open_ws_client_transport from .ws_client import open_ws_client_transport
return await open_ws_client_transport(spec[0]) return await open_ws_client_transport(spec)
if scheme == 'ws-server' and spec: if scheme == 'ws-server' and spec:
from .ws_server import open_ws_server_transport from .ws_server import open_ws_server_transport
return await open_ws_server_transport(spec[0]) return await open_ws_server_transport(spec)
if scheme == 'pty': if scheme == 'pty':
from .pty import open_pty_transport from .pty import open_pty_transport
return await open_pty_transport(spec[0] if spec else None) return await open_pty_transport(spec)
if scheme == 'file': if scheme == 'file':
from .file import open_file_transport from .file import open_file_transport
assert spec is not None assert spec is not None
return await open_file_transport(spec[0]) return await open_file_transport(spec)
if scheme == 'vhci': if scheme == 'vhci':
from .vhci import open_vhci_transport from .vhci import open_vhci_transport
return await open_vhci_transport(spec[0] if spec else None) return await open_vhci_transport(spec)
if scheme == 'hci-socket': if scheme == 'hci-socket':
from .hci_socket import open_hci_socket_transport from .hci_socket import open_hci_socket_transport
return await open_hci_socket_transport(spec[0] if spec else None) return await open_hci_socket_transport(spec)
if scheme == 'usb': if scheme == 'usb':
from .usb import open_usb_transport from .usb import open_usb_transport
assert spec is not None assert spec
return await open_usb_transport(spec[0]) return await open_usb_transport(spec)
if scheme == 'pyusb': if scheme == 'pyusb':
from .pyusb import open_pyusb_transport from .pyusb import open_pyusb_transport
assert spec is not None assert spec
return await open_pyusb_transport(spec[0]) return await open_pyusb_transport(spec)
if scheme == 'android-emulator': if scheme == 'android-emulator':
from .android_emulator import open_android_emulator_transport from .android_emulator import open_android_emulator_transport
return await open_android_emulator_transport(spec[0] if spec else None) return await open_android_emulator_transport(spec)
if scheme == 'android-netsim': if scheme == 'android-netsim':
from .android_netsim import open_android_netsim_transport from .android_netsim import open_android_netsim_transport
return await open_android_netsim_transport(spec[0] if spec else None) return await open_android_netsim_transport(spec)
raise ValueError('unknown transport scheme') raise ValueError('unknown transport scheme')
+1 -1
View File
@@ -69,7 +69,7 @@ async def open_android_emulator_transport(spec: Optional[str]) -> Transport:
mode = 'host' mode = 'host'
server_host = 'localhost' server_host = 'localhost'
server_port = '8554' server_port = '8554'
if spec is not None: if spec:
params = spec.split(',') params = spec.split(',')
for param in params: for param in params:
if param.startswith('mode='): if param.startswith('mode='):
+2 -1
View File
@@ -21,7 +21,7 @@ import struct
import asyncio import asyncio
import logging import logging
import io import io
from typing import ContextManager, Tuple, Optional, Protocol, Dict from typing import Any, ContextManager, Tuple, Optional, Protocol, Dict
from bumble import hci from bumble import hci
from bumble.colors import color from bumble.colors import color
@@ -42,6 +42,7 @@ HCI_PACKET_INFO: Dict[int, Tuple[int, int, str]] = {
hci.HCI_ACL_DATA_PACKET: (2, 2, 'H'), hci.HCI_ACL_DATA_PACKET: (2, 2, 'H'),
hci.HCI_SYNCHRONOUS_DATA_PACKET: (1, 2, 'B'), hci.HCI_SYNCHRONOUS_DATA_PACKET: (1, 2, 'B'),
hci.HCI_EVENT_PACKET: (1, 1, 'B'), hci.HCI_EVENT_PACKET: (1, 1, 'B'),
hci.HCI_ISO_DATA_PACKET: (2, 2, 'H'),
} }
+1 -4
View File
@@ -59,10 +59,7 @@ async def open_hci_socket_transport(spec: Optional[str]) -> Transport:
) from error ) from error
# Compute the adapter index # Compute the adapter index
if spec is None: adapter_index = int(spec) if spec else 0
adapter_index = 0
else:
adapter_index = int(spec)
# Bind the socket # Bind the socket
# NOTE: since Python doesn't support binding with the required address format (yet), # NOTE: since Python doesn't support binding with the required address format (yet),
+9
View File
@@ -5,6 +5,15 @@ Some Bluetooth controllers require a driver to function properly.
This may include, for instance, loading a Firmware image or patch, This may include, for instance, loading a Firmware image or patch,
loading a configuration. loading a configuration.
By default, drivers will be automatically probed to determine if they should be
used with particular HCI controller.
When the transport for an HCI controller is instantiated from a transport name,
a driver may also be forced by specifying ``driver=<driver-name>`` in the optional
metadata portion of the transport name. For example,
``usb:[driver=-rtk]0`` indicates that the ``rtk`` driver should be used with the
first USB device, even if a normal probe would not have selected it based on the
USB vendor ID and product ID.
Drivers included in the module are: Drivers included in the module are:
* [Realtek](realtek.md): Loading of Firmware and Config for Realtek USB dongles. * [Realtek](realtek.md): Loading of Firmware and Config for Realtek USB dongles.
+5 -2
View File
@@ -1,13 +1,16 @@
REALTEK DRIVER REALTEK DRIVER
============== ==============
This driver supports loading firmware images and optional config data to This driver supports loading firmware images and optional config data to
USB dongles with a Realtek chipset. USB dongles with a Realtek chipset.
A number of USB dongles are supported, but likely not all. A number of USB dongles are supported, but likely not all.
When using a USB dongle, the USB product ID and manufacturer ID are used When using a USB dongle, the USB product ID and vendor ID are used
to find whether a matching set of firmware image and config data to find whether a matching set of firmware image and config data
is needed for that specific model. If a match exists, the driver will try is needed for that specific model. If a match exists, the driver will try
load the firmware image and, if needed, config data. load the firmware image and, if needed, config data.
Alternatively, the metadata property ``driver=rtk`` may be specified in a transport
name to force that driver to be used (ex: ``usb:[driver=rtk]0`` instead of just
``usb:0`` for the first USB device).
The driver will look for those files by name, in order, in: The driver will look for those files by name, in order, in:
* The directory specified by the environment variable `BUMBLE_RTK_FIRMWARE_DIR` * The directory specified by the environment variable `BUMBLE_RTK_FIRMWARE_DIR`
-1
View File
@@ -73,7 +73,6 @@ async def main() -> None:
HCI_Enhanced_Setup_Synchronous_Connection_Command( HCI_Enhanced_Setup_Synchronous_Connection_Command(
connection_handle=connections[0].handle, connection_handle=connections[0].handle,
**ESCO_PARAMETERS[DefaultCodecParameters.ESCO_CVSD_S3].asdict(), **ESCO_PARAMETERS[DefaultCodecParameters.ESCO_CVSD_S3].asdict(),
# type: ignore[call-args]
) )
) )
+134
View File
@@ -0,0 +1,134 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
from bumble.core import AdvertisingData
from bumble.device import Device
from bumble.hci import (
CodecID,
CodingFormat,
OwnAddressType,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
)
from bumble.profiles.bap import (
CodecSpecificCapabilities,
ContextType,
AudioLocation,
SupportedSamplingFrequency,
SupportedFrameDuration,
PacRecord,
PublishedAudioCapabilitiesService,
)
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print('Usage: run_cig_setup.py <config-file>' '<transport-spec-for-device>')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.cis_enabled = True
await device.power_on()
device.add_service(
PublishedAudioCapabilitiesService(
supported_source_context=ContextType.PROHIBITED,
available_source_context=ContextType.PROHIBITED,
supported_sink_context=ContextType.MEDIA,
available_sink_context=ContextType.MEDIA,
sink_audio_locations=(
AudioLocation.FRONT_LEFT | AudioLocation.FRONT_RIGHT
),
sink_pac=[
# Codec Capability Setting 16_2
PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=CodecSpecificCapabilities(
supported_sampling_frequencies=(
SupportedSamplingFrequency.FREQ_16000
),
supported_frame_durations=(
SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_counts=[1],
min_octets_per_codec_frame=40,
max_octets_per_codec_frame=40,
supported_max_codec_frames_per_sdu=1,
),
),
# Codec Capability Setting 24_2
PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=CodecSpecificCapabilities(
supported_sampling_frequencies=(
SupportedSamplingFrequency.FREQ_24000
),
supported_frame_durations=(
SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_counts=[1],
min_octets_per_codec_frame=60,
max_octets_per_codec_frame=60,
supported_max_codec_frames_per_sdu=1,
),
),
],
)
)
advertising_data = bytes(
AdvertisingData(
[
(
AdvertisingData.COMPLETE_LOCAL_NAME,
bytes('Bumble LE Audio', 'utf-8'),
),
(
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
bytes(PublishedAudioCapabilitiesService.UUID),
),
]
)
)
await device.start_extended_advertising(
advertising_properties=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
),
own_address_type=OwnAddressType.RANDOM,
advertising_data=advertising_data,
)
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())
+4 -4
View File
@@ -1073,9 +1073,9 @@ checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
[[package]] [[package]]
name = "openssl" name = "openssl"
version = "0.10.57" version = "0.10.60"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bac25ee399abb46215765b1cb35bc0212377e58a061560d8b29b024fd0430e7c" checksum = "79a4c6c3a2b158f7f8f2a2fc5a969fa3a068df6fc9dbb4a43845436e3af7c800"
dependencies = [ dependencies = [
"bitflags 2.4.0", "bitflags 2.4.0",
"cfg-if", "cfg-if",
@@ -1105,9 +1105,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]] [[package]]
name = "openssl-sys" name = "openssl-sys"
version = "0.9.92" version = "0.9.96"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db7e971c2c2bba161b2d2fdf37080177eff520b3bc044787c7f1f5f9e78d869b" checksum = "3812c071ba60da8b5677cc12bcb1d42989a65553772897a7e0355545a819838f"
dependencies = [ dependencies = [
"cc", "cc",
"libc", "libc",
+151
View File
@@ -0,0 +1,151 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import os
import pytest
import logging
from bumble import device
from bumble.hci import CodecID, CodingFormat
from bumble.profiles.bap import (
AudioLocation,
SupportedFrameDuration,
SupportedSamplingFrequency,
CodecSpecificCapabilities,
ContextType,
PacRecord,
PublishedAudioCapabilitiesService,
PublishedAudioCapabilitiesServiceProxy,
)
from .test_utils import TwoDevices
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
def test_codec_specific_capabilities() -> None:
SAMPLE_FREQUENCY = SupportedSamplingFrequency.FREQ_16000
FRAME_SURATION = SupportedFrameDuration.DURATION_10000_US_SUPPORTED
AUDIO_CHANNEL_COUNTS = [1]
cap = CodecSpecificCapabilities(
supported_sampling_frequencies=SAMPLE_FREQUENCY,
supported_frame_durations=FRAME_SURATION,
supported_audio_channel_counts=AUDIO_CHANNEL_COUNTS,
min_octets_per_codec_frame=40,
max_octets_per_codec_frame=40,
supported_max_codec_frames_per_sdu=1,
)
assert CodecSpecificCapabilities.from_bytes(bytes(cap)) == cap
# -----------------------------------------------------------------------------
def test_pac_record() -> None:
SAMPLE_FREQUENCY = SupportedSamplingFrequency.FREQ_16000
FRAME_SURATION = SupportedFrameDuration.DURATION_10000_US_SUPPORTED
AUDIO_CHANNEL_COUNTS = [1]
cap = CodecSpecificCapabilities(
supported_sampling_frequencies=SAMPLE_FREQUENCY,
supported_frame_durations=FRAME_SURATION,
supported_audio_channel_counts=AUDIO_CHANNEL_COUNTS,
min_octets_per_codec_frame=40,
max_octets_per_codec_frame=40,
supported_max_codec_frames_per_sdu=1,
)
pac_record = PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=cap,
metadata=b'',
)
assert PacRecord.from_bytes(bytes(pac_record)) == pac_record
# -----------------------------------------------------------------------------
def test_vendor_specific_pac_record() -> None:
# Vendor-Specific codec, Google, ID=0xFFFF. No capabilities and metadata.
RAW_DATA = bytes.fromhex('ffe000ffff0000')
assert bytes(PacRecord.from_bytes(RAW_DATA)) == RAW_DATA
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_pacs():
devices = TwoDevices()
devices[0].add_service(
PublishedAudioCapabilitiesService(
supported_sink_context=ContextType.MEDIA,
available_sink_context=ContextType.MEDIA,
supported_source_context=0,
available_source_context=0,
sink_pac=[
# Codec Capability Setting 16_2
PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=CodecSpecificCapabilities(
supported_sampling_frequencies=(
SupportedSamplingFrequency.FREQ_16000
),
supported_frame_durations=(
SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_counts=[1],
min_octets_per_codec_frame=40,
max_octets_per_codec_frame=40,
supported_max_codec_frames_per_sdu=1,
),
),
# Codec Capability Setting 24_2
PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=CodecSpecificCapabilities(
supported_sampling_frequencies=(
SupportedSamplingFrequency.FREQ_24000
),
supported_frame_durations=(
SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_counts=[1],
min_octets_per_codec_frame=60,
max_octets_per_codec_frame=60,
supported_max_codec_frames_per_sdu=1,
),
),
],
sink_audio_locations=AudioLocation.FRONT_LEFT | AudioLocation.FRONT_RIGHT,
)
)
await devices.setup_connection()
peer = device.Peer(devices.connections[1])
pacs_client = await peer.discover_service_and_create_proxy(
PublishedAudioCapabilitiesServiceProxy
)
# -----------------------------------------------------------------------------
async def run():
await test_pacs()
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(run())
+174 -1
View File
@@ -20,8 +20,14 @@ import logging
import os import os
from types import LambdaType from types import LambdaType
import pytest import pytest
from unittest import mock
from bumble.core import BT_BR_EDR_TRANSPORT from bumble.core import (
BT_BR_EDR_TRANSPORT,
BT_LE_TRANSPORT,
BT_PERIPHERAL_ROLE,
ConnectionParameters,
)
from bumble.device import Connection, Device from bumble.device import Connection, Device
from bumble.host import Host from bumble.host import Host
from bumble.hci import ( from bumble.hci import (
@@ -30,6 +36,7 @@ from bumble.hci import (
HCI_CREATE_CONNECTION_COMMAND, HCI_CREATE_CONNECTION_COMMAND,
HCI_SUCCESS, HCI_SUCCESS,
Address, Address,
OwnAddressType,
HCI_Command_Complete_Event, HCI_Command_Complete_Event,
HCI_Command_Status_Event, HCI_Command_Status_Event,
HCI_Connection_Complete_Event, HCI_Connection_Complete_Event,
@@ -232,6 +239,172 @@ async def test_flush():
pass pass
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_legacy_advertising():
device = Device(host=mock.AsyncMock(Host))
# Start advertising
advertiser = await device.start_legacy_advertising()
assert device.legacy_advertiser
# Stop advertising
await advertiser.stop()
assert not device.legacy_advertiser
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'own_address_type,',
(OwnAddressType.PUBLIC, OwnAddressType.RANDOM),
)
@pytest.mark.asyncio
async def test_legacy_advertising_connection(own_address_type):
device = Device(host=mock.AsyncMock(Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
# Start advertising
advertiser = await device.start_legacy_advertising()
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
peer_address,
BT_PERIPHERAL_ROLE,
ConnectionParameters(0, 0, 0),
)
if own_address_type == OwnAddressType.PUBLIC:
assert device.lookup_connection(0x0001).self_address == device.public_address
else:
assert device.lookup_connection(0x0001).self_address == device.random_address
# For unknown reason, read_phy() in on_connection() would be killed at the end of
# test, so we force scheduling here to avoid an warning.
await asyncio.sleep(0.0001)
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'auto_restart,',
(True, False),
)
@pytest.mark.asyncio
async def test_legacy_advertising_disconnection(auto_restart):
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_legacy_advertising(auto_restart=auto_restart)
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
peer_address,
BT_PERIPHERAL_ROLE,
ConnectionParameters(0, 0, 0),
)
device.start_legacy_advertising = mock.AsyncMock()
device.on_disconnection(0x0001, 0)
if auto_restart:
device.start_legacy_advertising.assert_called_with(
advertising_type=advertiser.advertising_type,
own_address_type=advertiser.own_address_type,
auto_restart=advertiser.auto_restart,
advertising_data=advertiser.advertising_data,
scan_response_data=advertiser.scan_response_data,
)
else:
device.start_legacy_advertising.assert_not_called()
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_extended_advertising():
device = Device(host=mock.AsyncMock(Host))
# Start advertising
advertiser = await device.start_extended_advertising()
assert device.extended_advertisers
# Stop advertising
await advertiser.stop()
assert not device.extended_advertisers
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'own_address_type,',
(OwnAddressType.PUBLIC, OwnAddressType.RANDOM),
)
@pytest.mark.asyncio
async def test_extended_advertising_connection(own_address_type):
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_extended_advertising(
own_address_type=own_address_type
)
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
peer_address,
BT_PERIPHERAL_ROLE,
ConnectionParameters(0, 0, 0),
)
device.on_advertising_set_termination(
HCI_SUCCESS,
advertiser.handle,
0x0001,
)
if own_address_type == OwnAddressType.PUBLIC:
assert device.lookup_connection(0x0001).self_address == device.public_address
else:
assert device.lookup_connection(0x0001).self_address == device.random_address
# For unknown reason, read_phy() in on_connection() would be killed at the end of
# test, so we force scheduling here to avoid an warning.
await asyncio.sleep(0.0001)
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'auto_restart,',
(True, False),
)
@pytest.mark.asyncio
async def test_extended_advertising_disconnection(auto_restart):
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_extended_advertising(auto_restart=auto_restart)
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
peer_address,
BT_PERIPHERAL_ROLE,
ConnectionParameters(0, 0, 0),
)
device.on_advertising_set_termination(
HCI_SUCCESS,
advertiser.handle,
0x0001,
)
device.start_extended_advertising = mock.AsyncMock()
device.on_disconnection(0x0001, 0)
if auto_restart:
device.start_extended_advertising.assert_called_with(
advertising_properties=advertiser.advertising_properties,
own_address_type=advertiser.own_address_type,
auto_restart=advertiser.auto_restart,
advertising_data=advertiser.advertising_data,
scan_response_data=advertiser.scan_response_data,
)
else:
device.start_extended_advertising.assert_not_called()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_gatt_services_with_gas(): def test_gatt_services_with_gas():
device = Device(host=Host(None, None)) device = Device(host=Host(None, None))
+35
View File
@@ -32,6 +32,7 @@ from bumble.hci import (
HCI_CustomPacket, HCI_CustomPacket,
HCI_Disconnect_Command, HCI_Disconnect_Command,
HCI_Event, HCI_Event,
HCI_IsoDataPacket,
HCI_LE_Add_Device_To_Filter_Accept_List_Command, HCI_LE_Add_Device_To_Filter_Accept_List_Command,
HCI_LE_Advertising_Report_Event, HCI_LE_Advertising_Report_Event,
HCI_LE_Channel_Selection_Algorithm_Event, HCI_LE_Channel_Selection_Algorithm_Event,
@@ -53,6 +54,7 @@ from bumble.hci import (
HCI_LE_Set_Random_Address_Command, HCI_LE_Set_Random_Address_Command,
HCI_LE_Set_Scan_Enable_Command, HCI_LE_Set_Scan_Enable_Command,
HCI_LE_Set_Scan_Parameters_Command, HCI_LE_Set_Scan_Parameters_Command,
HCI_LE_Setup_ISO_Data_Path_Command,
HCI_Number_Of_Completed_Packets_Event, HCI_Number_Of_Completed_Packets_Event,
HCI_Packet, HCI_Packet,
HCI_PIN_Code_Request_Reply_Command, HCI_PIN_Code_Request_Reply_Command,
@@ -455,6 +457,14 @@ def test_HCI_LE_Setup_ISO_Data_Path_Command():
assert command.controller_delay == 0 assert command.controller_delay == 0
assert command.codec_configuration == b'' assert command.codec_configuration == b''
command = HCI_LE_Setup_ISO_Data_Path_Command(
connection_handle=0x0060,
data_path_direction=0x00,
data_path_id=0x01,
codec_id=CodingFormat(CodecID.TRANSPARENT),
controller_delay=0x00,
codec_configuration=b'',
)
basic_check(command) basic_check(command)
@@ -477,6 +487,29 @@ def test_custom():
assert packet.payload == data assert packet.payload == data
# -----------------------------------------------------------------------------
def test_iso_data_packet():
data = bytes.fromhex(
'05616044002ac9f0a193003c00e83b477b00eba8d41dc018bf1a980f0290afe1e7c37652096697'
'52b6a535a8df61e22931ef5a36281bc77ed6a3206d984bcdabee6be831c699cb50e2'
)
packet = HCI_IsoDataPacket.from_bytes(data)
assert packet.connection_handle == 0x0061
assert packet.packet_status_flag == 0
assert packet.pb_flag == 0x02
assert packet.ts_flag == 0x01
assert packet.data_total_length == 68
assert packet.time_stamp == 2716911914
assert packet.packet_sequence_number == 147
assert packet.iso_sdu_length == 60
assert packet.iso_sdu_fragment == bytes.fromhex(
'e83b477b00eba8d41dc018bf1a980f0290afe1e7c3765209669752b6a535a8df61e22931ef5a3'
'6281bc77ed6a3206d984bcdabee6be831c699cb50e2'
)
assert packet.to_bytes() == data
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def run_test_events(): def run_test_events():
test_HCI_Event() test_HCI_Event()
@@ -515,6 +548,7 @@ def run_test_commands():
test_HCI_LE_Set_Default_PHY_Command() test_HCI_LE_Set_Default_PHY_Command()
test_HCI_LE_Set_Extended_Scan_Parameters_Command() test_HCI_LE_Set_Extended_Scan_Parameters_Command()
test_HCI_LE_Set_Extended_Advertising_Enable_Command() test_HCI_LE_Set_Extended_Advertising_Enable_Command()
test_HCI_LE_Setup_ISO_Data_Path_Command()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -523,3 +557,4 @@ if __name__ == '__main__':
run_test_commands() run_test_commands()
test_address() test_address()
test_custom() test_custom()
test_iso_data_packet()