Compare commits

...

31 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
98ed772e8a address PR comments and add some typing 2023-12-11 17:52:04 -08:00
Gilles Boccon-Gibod
b083cc99ad fix spec parsing 2023-12-08 18:57:02 -08:00
Gilles Boccon-Gibod
62a8ced447 support drivers that can't use reset directly. 2023-12-08 17:28:57 -08:00
zxzxwu
698d947d85 Merge pull request #366 from zxzxwu/extadv
Add advertiser classes and handle adv set terminated events
2023-12-08 09:52:42 +08:00
Josh Wu
ff6528d2bf Add Advertising unit tests 2023-12-08 01:38:01 +08:00
Josh Wu
72ac75a98d Add advertiser classes and handle adv set terminated events
* Convert hci.OwnAddressType to enum
* Add LegacyAdvertiser and ExtendedAdvertiser classes
* Rename start/stop_advertising() => start/stop_legacy_advertising()
* Handle HCI_Advertising_Set_Terminated
* Properly restart advertisement on disconnection
2023-12-07 15:51:51 +08:00
zxzxwu
88b4cbdf1a Merge pull request #364 from zxzxwu/iso
Fix ISO packet issues
2023-12-05 00:41:56 +08:00
Josh Wu
d6afbc6f4e Fix ISO packet issues 2023-12-04 20:31:11 +08:00
Gilles Boccon-Gibod
fc90de3e7b Merge pull request #351 from google/dependabot/cargo/rust/openssl-0.10.60
Bump openssl from 0.10.57 to 0.10.60 in /rust
2023-12-04 00:41:27 -08:00
Gilles Boccon-Gibod
847c2ef114 Merge pull request #362 from google/gbg/more-le-features-constants
a few more HCI constants from the spec
2023-12-04 00:38:02 -08:00
Gilles Boccon-Gibod
a0bf0c1f4d Merge pull request #363 from google/gbg/android-remote-proxy-cli
android remote proxy cli
2023-12-04 00:37:49 -08:00
Gilles Boccon-Gibod
8400ff0802 shared usage printer 2023-12-04 00:37:28 -08:00
Gilles Boccon-Gibod
0ed6aa230b address PR comment 2023-12-04 00:32:04 -08:00
Gilles Boccon-Gibod
72d5360af9 keep projects compatible with Android Studio Hedgehog 2023-12-03 18:06:54 -08:00
Gilles Boccon-Gibod
ac3961e763 add doc 2023-12-03 17:50:42 -08:00
Gilles Boccon-Gibod
843466c822 a few more constants from the spec 2023-12-03 17:16:25 -08:00
Gilles Boccon-Gibod
8385035400 add CLI support 2023-12-03 16:35:14 -08:00
zxzxwu
3adcc8be09 Merge pull request #360 from zxzxwu/hci
Remove # type: ignore[call-arg] in HCI_Command builders
2023-12-03 19:18:04 +08:00
zxzxwu
c853d56302 Merge pull request #361 from zxzxwu/hci-bug
Fix typo
2023-12-03 04:22:59 +08:00
Josh Wu
dc97be5b35 Fix typo 2023-12-02 23:42:21 +08:00
zxzxwu
73dbdfff9f Merge pull request #356 from zxzxwu/bap
Add Published Audio Capabilities Service
2023-12-02 23:34:57 +08:00
Josh Wu
dff14e1258 Add Published Audio Capabilities Service 2023-12-02 23:16:37 +08:00
Josh Wu
10a3833893 Remove # type: ignore[call-arg] in HCI_Command builders 2023-12-02 19:18:54 +08:00
zxzxwu
247cb89332 Merge pull request #358 from zxzxwu/coding2
Add variable-length bytes field
2023-12-01 03:26:38 +08:00
Josh Wu
3fc71a0266 Add variable-length bytes field 2023-12-01 03:16:52 +08:00
zxzxwu
392dcc3a05 Merge pull request #357 from zxzxwu/coding
Refactor CodingFormat
2023-12-01 03:15:33 +08:00
Josh Wu
f27015d1b7 Refactor CodingFormat
As CodingFormat is now used by HFP and LEA, and vendor specific codecs
are introduced, this object needs to provide more information.
2023-12-01 02:58:09 +08:00
zxzxwu
86a19b41aa Merge pull request #344 from zxzxwu/cis
CIS and SCO responder support
2023-11-30 21:00:55 +08:00
Josh Wu
40ae661ee5 More SCO support and warnings and typo fix 2023-11-30 12:59:43 +08:00
Josh Wu
c5def93bb8 CIS and SCO responder support 2023-11-30 12:16:40 +08:00
dependabot[bot]
7eb493990f Bump openssl from 0.10.57 to 0.10.60 in /rust
Bumps [openssl](https://github.com/sfackler/rust-openssl) from 0.10.57 to 0.10.60.
- [Release notes](https://github.com/sfackler/rust-openssl/releases)
- [Commits](https://github.com/sfackler/rust-openssl/compare/openssl-v0.10.57...openssl-v0.10.60)

---
updated-dependencies:
- dependency-name: openssl
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-11-28 21:43:18 +00:00
29 changed files with 2299 additions and 335 deletions

View File

@@ -23,6 +23,7 @@
"CONNECTIONLESS",
"csip",
"csrcs",
"CVSD",
"datagram",
"DATALINK",
"delayreport",
@@ -40,6 +41,7 @@
"libc",
"libusb",
"MITM",
"MSBC",
"NDIS",
"netsim",
"NONBLOCK",

File diff suppressed because it is too large Load Diff

View File

@@ -19,12 +19,17 @@ like loading firmware after a cold start.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import abc
from __future__ import annotations
import logging
import pathlib
import platform
from . import rtk
from typing import Dict, Iterable, Optional, Type, TYPE_CHECKING
from . import rtk
from .common import Driver
if TYPE_CHECKING:
from bumble.host import Host
# -----------------------------------------------------------------------------
# Logging
@@ -32,40 +37,31 @@ from . import rtk
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
class Driver(abc.ABC):
"""Base class for drivers."""
@staticmethod
async def for_host(_host):
"""Return a driver instance for a host.
Args:
host: Host object for which a driver should be created.
Returns:
A Driver instance if a driver should be instantiated for this host, or
None if no driver instance of this class is needed.
"""
return None
@abc.abstractmethod
async def init_controller(self):
"""Initialize the controller."""
# -----------------------------------------------------------------------------
# Functions
# -----------------------------------------------------------------------------
async def get_driver_for_host(host):
"""Probe all known diver classes until one returns a valid instance for a host,
or none is found.
async def get_driver_for_host(host: Host) -> Optional[Driver]:
"""Probe diver classes until one returns a valid instance for a host, or none is
found.
If a "driver" HCI metadata entry is present, only that driver class will be probed.
"""
if driver := await rtk.Driver.for_host(host):
logger.debug("Instantiated RTK driver")
return driver
driver_classes: Dict[str, Type[Driver]] = {"rtk": rtk.Driver}
probe_list: Iterable[str]
if driver_name := host.hci_metadata.get("driver"):
# Only probe a single driver
probe_list = [driver_name]
else:
# Probe all drivers
probe_list = driver_classes.keys()
for driver_name in probe_list:
if driver_class := driver_classes.get(driver_name):
logger.debug(f"Probing driver class: {driver_name}")
if driver := await driver_class.for_host(host):
logger.debug(f"Instantiated {driver_name} driver")
return driver
else:
logger.debug(f"Skipping unknown driver class: {driver_name}")
return None

45
bumble/drivers/common.py Normal file
View File

@@ -0,0 +1,45 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Common types for drivers.
"""
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import abc
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
class Driver(abc.ABC):
"""Base class for drivers."""
@staticmethod
async def for_host(_host):
"""Return a driver instance for a host.
Args:
host: Host object for which a driver should be created.
Returns:
A Driver instance if a driver should be instantiated for this host, or
None if no driver instance of this class is needed.
"""
return None
@abc.abstractmethod
async def init_controller(self):
"""Initialize the controller."""

View File

@@ -41,7 +41,7 @@ from bumble.hci import (
HCI_Reset_Command,
HCI_Read_Local_Version_Information_Command,
)
from bumble.drivers import common
# -----------------------------------------------------------------------------
# Logging
@@ -285,7 +285,7 @@ class Firmware:
)
class Driver:
class Driver(common.Driver):
@dataclass
class DriverInfo:
rom: int
@@ -470,8 +470,12 @@ class Driver:
logger.debug("USB metadata not found")
return False
vendor_id = host.hci_metadata.get("vendor_id", None)
product_id = host.hci_metadata.get("product_id", None)
if host.hci_metadata.get('driver') == 'rtk':
# Forced driver
return True
vendor_id = host.hci_metadata.get("vendor_id")
product_id = host.hci_metadata.get("product_id")
if vendor_id is None or product_id is None:
logger.debug("USB metadata not sufficient")
return False
@@ -486,6 +490,9 @@ class Driver:
@classmethod
async def driver_info_for_host(cls, host):
await host.send_command(HCI_Reset_Command(), check_result=True)
host.ready = True # Needed to let the host know the controller is ready.
response = await host.send_command(
HCI_Read_Local_Version_Information_Command(), check_result=True
)

View File

@@ -17,6 +17,7 @@
# -----------------------------------------------------------------------------
from __future__ import annotations
import collections
import dataclasses
import enum
import functools
import logging
@@ -560,6 +561,12 @@ HCI_LE_TRANSMITTER_TEST_V4_COMMAND = hci_c
HCI_LE_SET_DATA_RELATED_ADDRESS_CHANGES_COMMAND = hci_command_op_code(0x08, 0x007C)
HCI_LE_SET_DEFAULT_SUBRATE_COMMAND = hci_command_op_code(0x08, 0x007D)
HCI_LE_SUBRATE_REQUEST_COMMAND = hci_command_op_code(0x08, 0x007E)
HCI_LE_SET_EXTENDED_ADVERTISING_PARAMETERS_V2_COMMAND = hci_command_op_code(0x08, 0x007F)
HCI_LE_SET_PERIODIC_ADVERTISING_SUBEVENT_DATA_COMMAND = hci_command_op_code(0x08, 0x0082)
HCI_LE_SET_PERIODIC_ADVERTISING_RESPONSE_DATA_COMMAND = hci_command_op_code(0x08, 0x0083)
HCI_LE_SET_PERIODIC_SYNC_SUBEVENT_COMMAND = hci_command_op_code(0x08, 0x0084)
HCI_LE_EXTENDED_CREATE_CONNECTION_V2_COMMAND = hci_command_op_code(0x08, 0x0085)
HCI_LE_SET_PERIODIC_ADVERTISING_PARAMETERS_V2_COMMAND = hci_command_op_code(0x08, 0x0086)
# HCI Error Codes
@@ -1316,56 +1323,72 @@ HCI_SUPPORTED_COMMANDS_FLAGS = (
(
HCI_LE_SET_DEFAULT_SUBRATE_COMMAND,
HCI_LE_SUBRATE_REQUEST_COMMAND,
HCI_LE_SET_EXTENDED_ADVERTISING_PARAMETERS_V2_COMMAND,
None,
None,
HCI_LE_SET_PERIODIC_ADVERTISING_SUBEVENT_DATA_COMMAND,
HCI_LE_SET_PERIODIC_ADVERTISING_RESPONSE_DATA_COMMAND,
HCI_LE_SET_PERIODIC_SYNC_SUBEVENT_COMMAND
),
# Octet 47
(
HCI_LE_EXTENDED_CREATE_CONNECTION_V2_COMMAND,
HCI_LE_SET_PERIODIC_ADVERTISING_PARAMETERS_V2_COMMAND,
None,
None,
None,
None,
None,
None,
None
)
)
# LE Supported Features
HCI_LE_ENCRYPTION_LE_SUPPORTED_FEATURE = 0
HCI_CONNECTION_PARAMETERS_REQUEST_PROCEDURE_LE_SUPPORTED_FEATURE = 1
HCI_EXTENDED_REJECT_INDICATION_LE_SUPPORTED_FEATURE = 2
HCI_PERIPHERAL_INITIATED_FEATURE_EXCHANGE_LE_SUPPORTED_FEATURE = 3
HCI_LE_PING_LE_SUPPORTED_FEATURE = 4
HCI_LE_DATA_PACKET_LENGTH_EXTENSION_LE_SUPPORTED_FEATURE = 5
HCI_LL_PRIVACY_LE_SUPPORTED_FEATURE = 6
HCI_EXTENDED_SCANNER_FILTER_POLICIES_LE_SUPPORTED_FEATURE = 7
HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE = 8
HCI_STABLE_MODULATION_INDEX_TRANSMITTER_LE_SUPPORTED_FEATURE = 9
HCI_STABLE_MODULATION_INDEX_RECEIVER_LE_SUPPORTED_FEATURE = 10
HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE = 11
HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE = 12
HCI_LE_PERIODIC_ADVERTISING_LE_SUPPORTED_FEATURE = 13
HCI_CHANNEL_SELECTION_ALGORITHM_2_LE_SUPPORTED_FEATURE = 14
HCI_LE_POWER_CLASS_1_LE_SUPPORTED_FEATURE = 15
HCI_MINIMUM_NUMBER_OF_USED_CHANNELS_PROCEDURE_LE_SUPPORTED_FEATURE = 16
HCI_CONNECTION_CTE_REQUEST_LE_SUPPORTED_FEATURE = 17
HCI_CONNECTION_CTE_RESPONSE_LE_SUPPORTED_FEATURE = 18
HCI_CONNECTIONLESS_CTE_TRANSMITTER_LE_SUPPORTED_FEATURE = 19
HCI_CONNECTIONLESS_CTR_RECEIVER_LE_SUPPORTED_FEATURE = 20
HCI_ANTENNA_SWITCHING_DURING_CTE_TRANSMISSION_LE_SUPPORTED_FEATURE = 21
HCI_ANTENNA_SWITCHING_DURING_CTE_RECEPTION_LE_SUPPORTED_FEATURE = 22
HCI_RECEIVING_CONSTANT_TONE_EXTENSIONS_LE_SUPPORTED_FEATURE = 23
HCI_PERIODIC_ADVERTISING_SYNC_TRANSFER_SENDER_LE_SUPPORTED_FEATURE = 24
HCI_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECIPIENT_LE_SUPPORTED_FEATURE = 25
HCI_SLEEP_CLOCK_ACCURACY_UPDATES_LE_SUPPORTED_FEATURE = 26
HCI_REMOTE_PUBLIC_KEY_VALIDATION_LE_SUPPORTED_FEATURE = 27
HCI_CONNECTED_ISOCHRONOUS_STREAM_CENTRAL_LE_SUPPORTED_FEATURE = 28
HCI_CONNECTED_ISOCHRONOUS_STREAM_PERIPHERAL_LE_SUPPORTED_FEATURE = 29
HCI_ISOCHRONOUS_BROADCASTER_LE_SUPPORTED_FEATURE = 30
HCI_SYNCHRONIZED_RECEIVER_LE_SUPPORTED_FEATURE = 31
HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE = 32
HCI_LE_POWER_CONTROL_REQUEST_LE_SUPPORTED_FEATURE = 33
HCI_LE_POWER_CONTROL_REQUEST_DUP_LE_SUPPORTED_FEATURE = 34
HCI_LE_PATH_LOSS_MONITORING_LE_SUPPORTED_FEATURE = 35
HCI_PERIODIC_ADVERTISING_ADI_SUPPORT_LE_SUPPORTED_FEATURE = 36
HCI_CONNECTION_SUBRATING_LE_SUPPORTED_FEATURE = 37
HCI_CONNECTION_SUBRATING_HOST_SUPPORT_LE_SUPPORTED_FEATURE = 38
HCI_CHANNEL_CLASSIFICATION_LE_SUPPORTED_FEATURE = 39
# See Bluetooth spec @ Vol 6, Part B, 4.6 FEATURE SUPPORT
HCI_LE_ENCRYPTION_LE_SUPPORTED_FEATURE = 0
HCI_CONNECTION_PARAMETERS_REQUEST_PROCEDURE_LE_SUPPORTED_FEATURE = 1
HCI_EXTENDED_REJECT_INDICATION_LE_SUPPORTED_FEATURE = 2
HCI_PERIPHERAL_INITIATED_FEATURE_EXCHANGE_LE_SUPPORTED_FEATURE = 3
HCI_LE_PING_LE_SUPPORTED_FEATURE = 4
HCI_LE_DATA_PACKET_LENGTH_EXTENSION_LE_SUPPORTED_FEATURE = 5
HCI_LL_PRIVACY_LE_SUPPORTED_FEATURE = 6
HCI_EXTENDED_SCANNER_FILTER_POLICIES_LE_SUPPORTED_FEATURE = 7
HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE = 8
HCI_STABLE_MODULATION_INDEX_TRANSMITTER_LE_SUPPORTED_FEATURE = 9
HCI_STABLE_MODULATION_INDEX_RECEIVER_LE_SUPPORTED_FEATURE = 10
HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE = 11
HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE = 12
HCI_LE_PERIODIC_ADVERTISING_LE_SUPPORTED_FEATURE = 13
HCI_CHANNEL_SELECTION_ALGORITHM_2_LE_SUPPORTED_FEATURE = 14
HCI_LE_POWER_CLASS_1_LE_SUPPORTED_FEATURE = 15
HCI_MINIMUM_NUMBER_OF_USED_CHANNELS_PROCEDURE_LE_SUPPORTED_FEATURE = 16
HCI_CONNECTION_CTE_REQUEST_LE_SUPPORTED_FEATURE = 17
HCI_CONNECTION_CTE_RESPONSE_LE_SUPPORTED_FEATURE = 18
HCI_CONNECTIONLESS_CTE_TRANSMITTER_LE_SUPPORTED_FEATURE = 19
HCI_CONNECTIONLESS_CTR_RECEIVER_LE_SUPPORTED_FEATURE = 20
HCI_ANTENNA_SWITCHING_DURING_CTE_TRANSMISSION_LE_SUPPORTED_FEATURE = 21
HCI_ANTENNA_SWITCHING_DURING_CTE_RECEPTION_LE_SUPPORTED_FEATURE = 22
HCI_RECEIVING_CONSTANT_TONE_EXTENSIONS_LE_SUPPORTED_FEATURE = 23
HCI_PERIODIC_ADVERTISING_SYNC_TRANSFER_SENDER_LE_SUPPORTED_FEATURE = 24
HCI_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECIPIENT_LE_SUPPORTED_FEATURE = 25
HCI_SLEEP_CLOCK_ACCURACY_UPDATES_LE_SUPPORTED_FEATURE = 26
HCI_REMOTE_PUBLIC_KEY_VALIDATION_LE_SUPPORTED_FEATURE = 27
HCI_CONNECTED_ISOCHRONOUS_STREAM_CENTRAL_LE_SUPPORTED_FEATURE = 28
HCI_CONNECTED_ISOCHRONOUS_STREAM_PERIPHERAL_LE_SUPPORTED_FEATURE = 29
HCI_ISOCHRONOUS_BROADCASTER_LE_SUPPORTED_FEATURE = 30
HCI_SYNCHRONIZED_RECEIVER_LE_SUPPORTED_FEATURE = 31
HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE = 32
HCI_LE_POWER_CONTROL_REQUEST_LE_SUPPORTED_FEATURE = 33
HCI_LE_POWER_CONTROL_REQUEST_DUP_LE_SUPPORTED_FEATURE = 34
HCI_LE_PATH_LOSS_MONITORING_LE_SUPPORTED_FEATURE = 35
HCI_PERIODIC_ADVERTISING_ADI_SUPPORT_LE_SUPPORTED_FEATURE = 36
HCI_CONNECTION_SUBRATING_LE_SUPPORTED_FEATURE = 37
HCI_CONNECTION_SUBRATING_HOST_SUPPORT_LE_SUPPORTED_FEATURE = 38
HCI_CHANNEL_CLASSIFICATION_LE_SUPPORTED_FEATURE = 39
HCI_ADVERTISING_CODING_SELECTION_LE_SUPPORTED_FEATURE = 40
HCI_ADVERTISING_CODING_SELECTION_HOST_SUPPORT_LE_SUPPORTED_FEATURE = 41
HCI_PERIODIC_ADVERTISING_WITH_RESPONSES_ADVERTISER_LE_SUPPORTED_FEATURE = 43
HCI_PERIODIC_ADVERTISING_WITH_RESPONSES_SCANNER_LE_SUPPORTED_FEATURE = 44
HCI_LE_SUPPORTED_FEATURES_NAMES = {
flag: feature_name for (feature_name, flag) in globals().items()
@@ -1382,6 +1405,45 @@ HCI_LE_SUPPORTED_FEATURES_NAMES = {
STATUS_SPEC = {'size': 1, 'mapper': lambda x: HCI_Constant.status_name(x)}
class CodecID(enum.IntEnum):
# fmt: off
U_LOG = 0x00
A_LOG = 0x01
CVSD = 0x02
TRANSPARENT = 0x03
LINEAR_PCM = 0x04
MSBC = 0x05
LC3 = 0x06
G729A = 0x07
VENDOR_SPECIFIC = 0xFF
@dataclasses.dataclass(frozen=True)
class CodingFormat:
codec_id: CodecID
company_id: int = 0
vendor_specific_codec_id: int = 0
@classmethod
def parse_from_bytes(cls, data: bytes, offset: int):
(codec_id, company_id, vendor_specific_codec_id) = struct.unpack_from(
'<BHH', data, offset
)
return offset + 5, cls(
codec_id=CodecID(codec_id),
company_id=company_id,
vendor_specific_codec_id=vendor_specific_codec_id,
)
def to_bytes(self) -> bytes:
return struct.pack(
'<BHH', self.codec_id, self.company_id, self.vendor_specific_codec_id
)
def __bytes__(self) -> bytes:
return self.to_bytes()
# -----------------------------------------------------------------------------
class HCI_Constant:
@staticmethod
@@ -1477,6 +1539,12 @@ class HCI_Object:
# The rest of the bytes
field_value = data[offset:]
return (field_value, len(field_value))
if field_type == 'v':
# Variable-length bytes field, with 1-byte length at the beginning
field_length = data[offset]
offset += 1
field_value = data[offset : offset + field_length]
return (field_value, field_length + 1)
if field_type == 1:
# 8-bit unsigned
return (data[offset], 1)
@@ -1581,6 +1649,11 @@ class HCI_Object:
raise ValueError('value too large for *-typed field')
else:
field_bytes = bytes(field_value)
elif field_type == 'v':
# Variable-length bytes field, with 1-byte length at the beginning
field_bytes = bytes(field_value)
field_length = len(field_bytes)
field_bytes = bytes([field_length]) + field_bytes
elif isinstance(field_value, (bytes, bytearray)) or hasattr(
field_value, 'to_bytes'
):
@@ -1888,26 +1961,17 @@ Address.NIL = Address(b"\xff\xff\xff\xff\xff\xff", Address.PUBLIC_DEVICE_ADDRESS
Address.ANY = Address(b"\x00\x00\x00\x00\x00\x00", Address.PUBLIC_DEVICE_ADDRESS)
Address.ANY_RANDOM = Address(b"\x00\x00\x00\x00\x00\x00", Address.RANDOM_DEVICE_ADDRESS)
# -----------------------------------------------------------------------------
class OwnAddressType:
class OwnAddressType(enum.IntEnum):
PUBLIC = 0
RANDOM = 1
RESOLVABLE_OR_PUBLIC = 2
RESOLVABLE_OR_RANDOM = 3
TYPE_NAMES = {
PUBLIC: 'PUBLIC',
RANDOM: 'RANDOM',
RESOLVABLE_OR_PUBLIC: 'RESOLVABLE_OR_PUBLIC',
RESOLVABLE_OR_RANDOM: 'RESOLVABLE_OR_RANDOM',
}
@staticmethod
def type_name(type_id):
return name_or_number(OwnAddressType.TYPE_NAMES, type_id)
# pylint: disable-next=unnecessary-lambda
TYPE_SPEC = {'size': 1, 'mapper': lambda x: OwnAddressType.type_name(x)}
@classmethod
def type_spec(cls):
return {'size': 1, 'mapper': lambda x: OwnAddressType(x).name}
# -----------------------------------------------------------------------------
@@ -1934,6 +1998,9 @@ class HCI_Packet:
if packet_type == HCI_EVENT_PACKET:
return HCI_Event.from_bytes(packet)
if packet_type == HCI_ISO_DATA_PACKET:
return HCI_IsoDataPacket.from_bytes(packet)
return HCI_CustomPacket(packet)
def __init__(self, name):
@@ -1966,6 +2033,7 @@ class HCI_Command(HCI_Packet):
hci_packet_type = HCI_COMMAND_PACKET
command_names: Dict[int, str] = {}
command_classes: Dict[int, Type[HCI_Command]] = {}
op_code: int
@staticmethod
def command(fields=(), return_parameters_fields=()):
@@ -2051,7 +2119,11 @@ class HCI_Command(HCI_Packet):
return_parameters.fields = cls.return_parameters_fields
return return_parameters
def __init__(self, op_code, parameters=None, **kwargs):
def __init__(self, op_code=-1, parameters=None, **kwargs):
# Since the legacy implementation relies on an __init__ injector, typing always
# complains that positional argument op_code is not passed, so here sets a
# default value to allow building derived HCI_Command without op_code.
assert op_code != -1
super().__init__(HCI_Command.command_name(op_code))
if (fields := getattr(self, 'fields', None)) and kwargs:
HCI_Object.init_from_fields(self, fields, kwargs)
@@ -2445,14 +2517,14 @@ class HCI_IO_Capability_Request_Negative_Reply_Command(HCI_Command):
('connection_handle', 2),
('transmit_bandwidth', 4),
('receive_bandwidth', 4),
('transmit_coding_format', 5),
('receive_coding_format', 5),
('transmit_coding_format', CodingFormat.parse_from_bytes),
('receive_coding_format', CodingFormat.parse_from_bytes),
('transmit_codec_frame_size', 2),
('receive_codec_frame_size', 2),
('input_bandwidth', 4),
('output_bandwidth', 4),
('input_coding_format', 5),
('output_coding_format', 5),
('input_coding_format', CodingFormat.parse_from_bytes),
('output_coding_format', CodingFormat.parse_from_bytes),
('input_coded_data_size', 2),
('output_coded_data_size', 2),
('input_pcm_data_format', 1),
@@ -2473,22 +2545,6 @@ class HCI_Enhanced_Setup_Synchronous_Connection_Command(HCI_Command):
See Bluetooth spec @ 7.1.45 Enhanced Setup Synchronous Connection Command
'''
class CodingFormat(enum.IntEnum):
U_LOG = 0x00
A_LOG = 0x01
CVSD = 0x02
TRANSPARENT = 0x03
PCM = 0x04
MSBC = 0x05
LC3 = 0x06
G729A = 0x07
def to_bytes(self):
return self.value.to_bytes(5, 'little')
def __bytes__(self):
return self.to_bytes()
class PcmDataFormat(enum.IntEnum):
NA = 0x00
ONES_COMPLEMENT = 0x01
@@ -2525,14 +2581,14 @@ class HCI_Enhanced_Setup_Synchronous_Connection_Command(HCI_Command):
('bd_addr', Address.parse_address),
('transmit_bandwidth', 4),
('receive_bandwidth', 4),
('transmit_coding_format', 5),
('receive_coding_format', 5),
('transmit_coding_format', CodingFormat.parse_from_bytes),
('receive_coding_format', CodingFormat.parse_from_bytes),
('transmit_codec_frame_size', 2),
('receive_codec_frame_size', 2),
('input_bandwidth', 4),
('output_bandwidth', 4),
('input_coding_format', 5),
('output_coding_format', 5),
('input_coding_format', CodingFormat.parse_from_bytes),
('output_coding_format', CodingFormat.parse_from_bytes),
('input_coded_data_size', 2),
('output_coded_data_size', 2),
('input_pcm_data_format', 1),
@@ -3308,7 +3364,7 @@ class HCI_LE_Set_Random_Address_Command(HCI_Command):
),
},
),
('own_address_type', OwnAddressType.TYPE_SPEC),
('own_address_type', OwnAddressType.type_spec()),
('peer_address_type', Address.ADDRESS_TYPE_SPEC),
('peer_address', Address.parse_address_preceded_by_type),
('advertising_channel_map', 1),
@@ -3401,7 +3457,7 @@ class HCI_LE_Set_Advertising_Enable_Command(HCI_Command):
('le_scan_type', 1),
('le_scan_interval', 2),
('le_scan_window', 2),
('own_address_type', OwnAddressType.TYPE_SPEC),
('own_address_type', OwnAddressType.type_spec()),
('scanning_filter_policy', 1),
]
)
@@ -3440,7 +3496,7 @@ class HCI_LE_Set_Scan_Enable_Command(HCI_Command):
('initiator_filter_policy', 1),
('peer_address_type', Address.ADDRESS_TYPE_SPEC),
('peer_address', Address.parse_address_preceded_by_type),
('own_address_type', OwnAddressType.TYPE_SPEC),
('own_address_type', OwnAddressType.type_spec()),
('connection_interval_min', 2),
('connection_interval_max', 2),
('max_latency', 2),
@@ -3847,7 +3903,7 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command):
),
},
),
('own_address_type', OwnAddressType.TYPE_SPEC),
('own_address_type', OwnAddressType.type_spec()),
('peer_address_type', Address.ADDRESS_TYPE_SPEC),
('peer_address', Address.parse_address_preceded_by_type),
('advertising_filter_policy', 1),
@@ -4243,7 +4299,7 @@ class HCI_LE_Extended_Create_Connection_Command(HCI_Command):
('initiator_filter_policy:', self.initiator_filter_policy),
(
'own_address_type: ',
OwnAddressType.type_name(self.own_address_type),
OwnAddressType(self.own_address_type).name,
),
(
'peer_address_type: ',
@@ -4451,7 +4507,10 @@ class HCI_LE_Accept_CIS_Request_Command(HCI_Command):
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[('connection_handle', 2)],
fields=[
('connection_handle', 2),
('reason', {'size': 1, 'mapper': HCI_Constant.error_name}),
],
)
class HCI_LE_Reject_CIS_Request_Command(HCI_Command):
'''
@@ -4459,6 +4518,7 @@ class HCI_LE_Reject_CIS_Request_Command(HCI_Command):
'''
connection_handle: int
reason: int
# -----------------------------------------------------------------------------
@@ -4467,9 +4527,9 @@ class HCI_LE_Reject_CIS_Request_Command(HCI_Command):
('connection_handle', 2),
('data_path_direction', 1),
('data_path_id', 1),
('codec_id', 5),
('codec_id', CodingFormat.parse_from_bytes),
('controller_delay', 3),
('codec_configuration', '*'),
('codec_configuration', 'v'),
],
return_parameters_fields=[
('status', STATUS_SPEC),
@@ -4484,9 +4544,9 @@ class HCI_LE_Setup_ISO_Data_Path_Command(HCI_Command):
connection_handle: int
data_path_direction: int
data_path_id: int
codec_id: int
codec_id: CodingFormat
controller_delay: int
codec_configuration: int
codec_configuration: bytes
# -----------------------------------------------------------------------------
@@ -5120,6 +5180,21 @@ HCI_LE_Meta_Event.subevent_classes[
] = HCI_LE_Extended_Advertising_Report_Event
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event(
[
('status', 1),
('advertising_handle', 1),
('connection_handle', 2),
('number_completed_extended_advertising_events', 1),
]
)
class HCI_LE_Advertising_Set_Terminated_Event(HCI_LE_Meta_Event):
'''
See Bluetooth spec @ 7.7.65.18 LE Advertising Set Terminated Event
'''
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event([('connection_handle', 2), ('channel_selection_algorithm', 1)])
class HCI_LE_Channel_Selection_Algorithm_Event(HCI_LE_Meta_Event):
@@ -6053,7 +6128,7 @@ class HCI_IsoDataPacket(HCI_Packet):
if ts_flag:
if not should_include_sdu_info:
logger.warn(f'Timestamp included when pb_flag={bin(pb_flag)}')
time_stamp, _ = struct.unpack_from('<I', packet, pos)
time_stamp, *_ = struct.unpack_from('<I', packet, pos)
pos += 4
if should_include_sdu_info:
@@ -6120,7 +6195,7 @@ class HCI_IsoDataPacket(HCI_Packet):
self.packet_sequence_number,
self.iso_sdu_length | self.packet_status_flag << 14,
]
return struct.pack(fmt, args) + self.iso_sdu_fragment
return struct.pack(fmt, *args) + self.iso_sdu_fragment
def __str__(self) -> str:
return (

View File

@@ -22,7 +22,7 @@ import dataclasses
import enum
import traceback
import warnings
from typing import Dict, List, Union, Set, TYPE_CHECKING
from typing import Dict, List, Union, Set, Any, TYPE_CHECKING
from . import at
from . import rfcomm
@@ -35,7 +35,11 @@ from bumble.core import (
BT_L2CAP_PROTOCOL_ID,
BT_RFCOMM_PROTOCOL_ID,
)
from bumble.hci import HCI_Enhanced_Setup_Synchronous_Connection_Command
from bumble.hci import (
HCI_Enhanced_Setup_Synchronous_Connection_Command,
CodingFormat,
CodecID,
)
from bumble.sdp import (
DataElement,
ServiceAttribute,
@@ -66,6 +70,7 @@ class HfpProtocolError(ProtocolError):
# Protocol Support
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
class HfpProtocol:
dlc: rfcomm.DLC
@@ -842,19 +847,15 @@ class DefaultCodecParameters(enum.IntEnum):
@dataclasses.dataclass
class EscoParameters:
# Codec specific
transmit_coding_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat
receive_coding_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat
transmit_coding_format: CodingFormat
receive_coding_format: CodingFormat
packet_type: HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType
retransmission_effort: HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort
max_latency: int
# Common
input_coding_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat = (
HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.TRANSPARENT
)
output_coding_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat = (
HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.TRANSPARENT
)
input_coding_format: CodingFormat = CodingFormat(CodecID.LINEAR_PCM)
output_coding_format: CodingFormat = CodingFormat(CodecID.LINEAR_PCM)
input_coded_data_size: int = 16
output_coded_data_size: int = 16
input_pcm_data_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat = (
@@ -880,26 +881,31 @@ class EscoParameters:
transmit_codec_frame_size: int = 60
receive_codec_frame_size: int = 60
def asdict(self) -> Dict[str, Any]:
# dataclasses.asdict() will recursively deep-copy the entire object,
# which is expensive and breaks CodingFormat object, so let it simply copy here.
return self.__dict__
_ESCO_PARAMETERS_CVSD_D0 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0xFFFF,
packet_type=HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.HV1,
retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.NO_RETRANSMISSION,
)
_ESCO_PARAMETERS_CVSD_D1 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0xFFFF,
packet_type=HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.HV3,
retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.NO_RETRANSMISSION,
)
_ESCO_PARAMETERS_CVSD_S1 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0x0007,
packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
@@ -912,8 +918,8 @@ _ESCO_PARAMETERS_CVSD_S1 = EscoParameters(
)
_ESCO_PARAMETERS_CVSD_S2 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0x0007,
packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
@@ -925,8 +931,8 @@ _ESCO_PARAMETERS_CVSD_S2 = EscoParameters(
)
_ESCO_PARAMETERS_CVSD_S3 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0x000A,
packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
@@ -938,8 +944,8 @@ _ESCO_PARAMETERS_CVSD_S3 = EscoParameters(
)
_ESCO_PARAMETERS_CVSD_S4 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD,
transmit_coding_format=CodingFormat(CodecID.CVSD),
receive_coding_format=CodingFormat(CodecID.CVSD),
max_latency=0x000C,
packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
@@ -951,8 +957,8 @@ _ESCO_PARAMETERS_CVSD_S4 = EscoParameters(
)
_ESCO_PARAMETERS_MSBC_T1 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.MSBC,
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.MSBC,
transmit_coding_format=CodingFormat(CodecID.MSBC),
receive_coding_format=CodingFormat(CodecID.MSBC),
max_latency=0x0008,
packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
@@ -960,12 +966,14 @@ _ESCO_PARAMETERS_MSBC_T1 = EscoParameters(
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5
),
input_bandwidth=32000,
output_bandwidth=32000,
retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_QUALITY,
)
_ESCO_PARAMETERS_MSBC_T2 = EscoParameters(
transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.MSBC,
receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.MSBC,
transmit_coding_format=CodingFormat(CodecID.MSBC),
receive_coding_format=CodingFormat(CodecID.MSBC),
max_latency=0x000D,
packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
@@ -974,10 +982,12 @@ _ESCO_PARAMETERS_MSBC_T2 = EscoParameters(
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5
),
input_bandwidth=32000,
output_bandwidth=32000,
retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_QUALITY,
)
ESCO_PERAMETERS = {
ESCO_PARAMETERS = {
DefaultCodecParameters.SCO_CVSD_D0: _ESCO_PARAMETERS_CVSD_D0,
DefaultCodecParameters.SCO_CVSD_D1: _ESCO_PARAMETERS_CVSD_D1,
DefaultCodecParameters.ESCO_CVSD_S1: _ESCO_PARAMETERS_CVSD_S1,

View File

@@ -21,7 +21,7 @@ import collections
import logging
import struct
from typing import Optional, TYPE_CHECKING, Dict, Callable, Awaitable, cast
from typing import Any, Awaitable, Callable, Dict, Optional, Union, cast, TYPE_CHECKING
from bumble.colors import color
from bumble.l2cap import L2CAP_PDU
@@ -32,8 +32,8 @@ from .hci import (
Address,
HCI_ACL_DATA_PACKET,
HCI_COMMAND_PACKET,
HCI_COMMAND_COMPLETE_EVENT,
HCI_EVENT_PACKET,
HCI_ISO_DATA_PACKET,
HCI_LE_READ_BUFFER_SIZE_COMMAND,
HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND,
HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
@@ -52,6 +52,7 @@ from .hci import (
HCI_Constant,
HCI_Error,
HCI_Event,
HCI_IsoDataPacket,
HCI_LE_Long_Term_Key_Request_Negative_Reply_Command,
HCI_LE_Long_Term_Key_Request_Reply_Command,
HCI_LE_Read_Buffer_Size_Command,
@@ -75,7 +76,6 @@ from .core import (
BT_LE_TRANSPORT,
ConnectionPHY,
ConnectionParameters,
InvalidStateError,
)
from .utils import AbortableEventEmitter
from .transport.common import TransportLostError
@@ -124,7 +124,8 @@ class Connection:
class Host(AbortableEventEmitter):
connections: Dict[int, Connection]
acl_packet_queue: collections.deque[HCI_AclDataPacket]
hci_sink: TransportSink
hci_sink: Optional[TransportSink] = None
hci_metadata: Dict[str, Any]
long_term_key_provider: Optional[
Callable[[int, bytes, int], Awaitable[Optional[bytes]]]
]
@@ -137,9 +138,8 @@ class Host(AbortableEventEmitter):
) -> None:
super().__init__()
self.hci_metadata = None
self.hci_metadata = {}
self.ready = False # True when we can accept incoming packets
self.reset_done = False
self.connections = {} # Connections, by connection handle
self.pending_command = None
self.pending_response = None
@@ -162,10 +162,7 @@ class Host(AbortableEventEmitter):
# Connect to the source and sink if specified
if controller_source:
controller_source.set_packet_sink(self)
self.hci_metadata = getattr(
controller_source, 'metadata', self.hci_metadata
)
self.set_packet_source(controller_source)
if controller_sink:
self.set_packet_sink(controller_sink)
@@ -200,17 +197,21 @@ class Host(AbortableEventEmitter):
self.ready = False
await self.flush()
await self.send_command(HCI_Reset_Command(), check_result=True)
self.ready = True
# Instantiate and init a driver for the host if needed.
# NOTE: we don't keep a reference to the driver here, because we don't
# currently have a need for the driver later on. But if the driver interface
# evolves, it may be required, then, to store a reference to the driver in
# an object property.
reset_needed = True
if driver_factory is not None:
if driver := await driver_factory(self):
await driver.init_controller()
reset_needed = False
# Send a reset command unless a driver has already done so.
if reset_needed:
await self.send_command(HCI_Reset_Command(), check_result=True)
self.ready = True
response = await self.send_command(
HCI_Read_Local_Supported_Commands_Command(), check_result=True
@@ -243,7 +244,7 @@ class Host(AbortableEventEmitter):
# understand
le_event_mask = bytes.fromhex('1F00000000000000')
else:
le_event_mask = bytes.fromhex('FFFFF00000000000')
le_event_mask = bytes.fromhex('FFFFFFFF00000000')
await self.send_command(
HCI_LE_Set_Event_Mask_Command(le_event_mask=le_event_mask)
@@ -313,25 +314,28 @@ class Host(AbortableEventEmitter):
)
)
self.reset_done = True
@property
def controller(self) -> TransportSink:
def controller(self) -> Optional[TransportSink]:
return self.hci_sink
@controller.setter
def controller(self, controller):
def controller(self, controller) -> None:
self.set_packet_sink(controller)
if controller:
controller.set_packet_sink(self)
def set_packet_sink(self, sink: TransportSink) -> None:
def set_packet_sink(self, sink: Optional[TransportSink]) -> None:
self.hci_sink = sink
def set_packet_source(self, source: TransportSource) -> None:
source.set_packet_sink(self)
self.hci_metadata = getattr(source, 'metadata', self.hci_metadata)
def send_hci_packet(self, packet: HCI_Packet) -> None:
if self.snooper:
self.snooper.snoop(bytes(packet), Snooper.Direction.HOST_TO_CONTROLLER)
self.hci_sink.on_packet(bytes(packet))
if self.hci_sink:
self.hci_sink.on_packet(bytes(packet))
async def send_command(self, command, check_result=False):
logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {command}')
@@ -495,6 +499,8 @@ class Host(AbortableEventEmitter):
self.on_hci_acl_data_packet(cast(HCI_AclDataPacket, packet))
elif packet.hci_packet_type == HCI_SYNCHRONOUS_DATA_PACKET:
self.on_hci_sco_data_packet(cast(HCI_SynchronousDataPacket, packet))
elif packet.hci_packet_type == HCI_ISO_DATA_PACKET:
self.on_hci_iso_data_packet(cast(HCI_IsoDataPacket, packet))
else:
logger.warning(f'!!! unknown packet type {packet.hci_packet_type}')
@@ -515,6 +521,10 @@ class Host(AbortableEventEmitter):
# Experimental
self.emit('sco_packet', packet.connection_handle, packet)
def on_hci_iso_data_packet(self, packet: HCI_IsoDataPacket) -> None:
# Experimental
self.emit('iso_packet', packet.connection_handle, packet)
def on_l2cap_pdu(self, connection: Connection, cid: int, pdu: bytes) -> None:
self.emit('l2cap_pdu', connection.handle, cid, pdu)
@@ -715,6 +725,32 @@ class Host(AbortableEventEmitter):
def on_hci_le_extended_advertising_report_event(self, event):
self.on_hci_le_advertising_report_event(event)
def on_hci_le_advertising_set_terminated_event(self, event):
self.emit(
'advertising_set_termination',
event.status,
event.advertising_handle,
event.connection_handle,
)
def on_hci_le_cis_request_event(self, event):
self.emit(
'cis_request',
event.acl_connection_handle,
event.cis_connection_handle,
event.cig_id,
event.cis_id,
)
def on_hci_le_cis_established_event(self, event):
# The remaining parameters are unused for now.
if event.status == HCI_SUCCESS:
self.emit('cis_establishment', event.connection_handle)
else:
self.emit(
'cis_establishment_failure', event.connection_handle, event.status
)
def on_hci_le_remote_connection_parameter_request_event(self, event):
if event.connection_handle not in self.connections:
logger.warning('!!! REMOTE CONNECTION PARAMETER REQUEST: unknown handle')

View File

@@ -1926,7 +1926,7 @@ class ChannelManager:
supervision_timeout=request.timeout,
min_ce_length=0,
max_ce_length=0,
) # type: ignore[call-arg]
)
)
else:
self.send_control_frame(

496
bumble/profiles/bap.py Normal file
View File

@@ -0,0 +1,496 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
from collections.abc import Sequence
import dataclasses
import enum
import struct
import functools
from typing import Optional, List, Union
from bumble import hci
from bumble import gatt
from bumble import gatt_client
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
class AudioLocation(enum.IntFlag):
'''Bluetooth Assigned Numbers, Section 6.12.1 - Audio Location'''
# fmt: off
NOT_ALLOWED = 0x00000000
FRONT_LEFT = 0x00000001
FRONT_RIGHT = 0x00000002
FRONT_CENTER = 0x00000004
LOW_FREQUENCY_EFFECTS_1 = 0x00000008
BACK_LEFT = 0x00000010
BACK_RIGHT = 0x00000020
FRONT_LEFT_OF_CENTER = 0x00000040
FRONT_RIGHT_OF_CENTER = 0x00000080
BACK_CENTER = 0x00000100
LOW_FREQUENCY_EFFECTS_2 = 0x00000200
SIDE_LEFT = 0x00000400
SIDE_RIGHT = 0x00000800
TOP_FRONT_LEFT = 0x00001000
TOP_FRONT_RIGHT = 0x00002000
TOP_FRONT_CENTER = 0x00004000
TOP_CENTER = 0x00008000
TOP_BACK_LEFT = 0x00010000
TOP_BACK_RIGHT = 0x00020000
TOP_SIDE_LEFT = 0x00040000
TOP_SIDE_RIGHT = 0x00080000
TOP_BACK_CENTER = 0x00100000
BOTTOM_FRONT_CENTER = 0x00200000
BOTTOM_FRONT_LEFT = 0x00400000
BOTTOM_FRONT_RIGHT = 0x00800000
FRONT_LEFT_WIDE = 0x01000000
FRONT_RIGHT_WIDE = 0x02000000
LEFT_SURROUND = 0x04000000
RIGHT_SURROUND = 0x08000000
class AudioInputType(enum.IntEnum):
'''Bluetooth Assigned Numbers, Section 6.12.2 - Audio Input Type'''
# fmt: off
UNSPECIFIED = 0x00
BLUETOOTH = 0x01
MICROPHONE = 0x02
ANALOG = 0x03
DIGITAL = 0x04
RADIO = 0x05
STREAMING = 0x06
AMBIENT = 0x07
class ContextType(enum.IntFlag):
'''Bluetooth Assigned Numbers, Section 6.12.3 - Context Type'''
# fmt: off
PROHIBITED = 0x0000
CONVERSATIONAL = 0x0002
MEDIA = 0x0004
GAME = 0x0008
INSTRUCTIONAL = 0x0010
VOICE_ASSISTANTS = 0x0020
LIVE = 0x0040
SOUND_EFFECTS = 0x0080
NOTIFICATIONS = 0x0100
RINGTONE = 0x0200
ALERTS = 0x0400
EMERGENCY_ALARM = 0x0800
class SamplingFrequency(enum.IntEnum):
'''Bluetooth Assigned Numbers, Section 6.12.5.1 - Sampling Frequency'''
# fmt: off
FREQ_8000 = 0x01
FREQ_11025 = 0x02
FREQ_16000 = 0x03
FREQ_22050 = 0x04
FREQ_24000 = 0x05
FREQ_32000 = 0x06
FREQ_44100 = 0x07
FREQ_48000 = 0x08
FREQ_88200 = 0x09
FREQ_96000 = 0x0A
FREQ_176400 = 0x0B
FREQ_192000 = 0x0C
FREQ_384000 = 0x0D
# fmt: on
@classmethod
def from_hz(cls, frequency: int) -> SamplingFrequency:
return {
8000: SamplingFrequency.FREQ_8000,
11025: SamplingFrequency.FREQ_11025,
16000: SamplingFrequency.FREQ_16000,
22050: SamplingFrequency.FREQ_22050,
24000: SamplingFrequency.FREQ_24000,
32000: SamplingFrequency.FREQ_32000,
44100: SamplingFrequency.FREQ_44100,
48000: SamplingFrequency.FREQ_48000,
88200: SamplingFrequency.FREQ_88200,
96000: SamplingFrequency.FREQ_96000,
176400: SamplingFrequency.FREQ_176400,
192000: SamplingFrequency.FREQ_192000,
384000: SamplingFrequency.FREQ_384000,
}[frequency]
@property
def hz(self) -> int:
return {
SamplingFrequency.FREQ_8000: 8000,
SamplingFrequency.FREQ_11025: 11025,
SamplingFrequency.FREQ_16000: 16000,
SamplingFrequency.FREQ_22050: 22050,
SamplingFrequency.FREQ_24000: 24000,
SamplingFrequency.FREQ_32000: 32000,
SamplingFrequency.FREQ_44100: 44100,
SamplingFrequency.FREQ_48000: 48000,
SamplingFrequency.FREQ_88200: 88200,
SamplingFrequency.FREQ_96000: 96000,
SamplingFrequency.FREQ_176400: 176400,
SamplingFrequency.FREQ_192000: 192000,
SamplingFrequency.FREQ_384000: 384000,
}[self]
class SupportedSamplingFrequency(enum.IntFlag):
'''Bluetooth Assigned Numbers, Section 6.12.4.1 - Sample Frequency'''
# fmt: off
FREQ_8000 = 1 << (SamplingFrequency.FREQ_8000 - 1)
FREQ_11025 = 1 << (SamplingFrequency.FREQ_11025 - 1)
FREQ_16000 = 1 << (SamplingFrequency.FREQ_16000 - 1)
FREQ_22050 = 1 << (SamplingFrequency.FREQ_22050 - 1)
FREQ_24000 = 1 << (SamplingFrequency.FREQ_24000 - 1)
FREQ_32000 = 1 << (SamplingFrequency.FREQ_32000 - 1)
FREQ_44100 = 1 << (SamplingFrequency.FREQ_44100 - 1)
FREQ_48000 = 1 << (SamplingFrequency.FREQ_48000 - 1)
FREQ_88200 = 1 << (SamplingFrequency.FREQ_88200 - 1)
FREQ_96000 = 1 << (SamplingFrequency.FREQ_96000 - 1)
FREQ_176400 = 1 << (SamplingFrequency.FREQ_176400 - 1)
FREQ_192000 = 1 << (SamplingFrequency.FREQ_192000 - 1)
FREQ_384000 = 1 << (SamplingFrequency.FREQ_384000 - 1)
# fmt: on
@classmethod
def from_hz(cls, frequencies: Sequence[int]) -> SupportedSamplingFrequency:
MAPPING = {
8000: SupportedSamplingFrequency.FREQ_8000,
11025: SupportedSamplingFrequency.FREQ_11025,
16000: SupportedSamplingFrequency.FREQ_16000,
22050: SupportedSamplingFrequency.FREQ_22050,
24000: SupportedSamplingFrequency.FREQ_24000,
32000: SupportedSamplingFrequency.FREQ_32000,
44100: SupportedSamplingFrequency.FREQ_44100,
48000: SupportedSamplingFrequency.FREQ_48000,
88200: SupportedSamplingFrequency.FREQ_88200,
96000: SupportedSamplingFrequency.FREQ_96000,
176400: SupportedSamplingFrequency.FREQ_176400,
192000: SupportedSamplingFrequency.FREQ_192000,
384000: SupportedSamplingFrequency.FREQ_384000,
}
return functools.reduce(
lambda x, y: x | MAPPING[y],
frequencies,
cls(0),
)
class FrameDuration(enum.IntEnum):
'''Bluetooth Assigned Numbers, Section 6.12.5.2 - Frame Duration'''
# fmt: off
DURATION_7500_US = 0x00
DURATION_10000_US = 0x01
class SupportedFrameDuration(enum.IntFlag):
'''Bluetooth Assigned Numbers, Section 6.12.4.2 - Frame Duration'''
# fmt: off
DURATION_7500_US_SUPPORTED = 0b0001
DURATION_10000_US_SUPPORTED = 0b0010
DURATION_7500_US_PREFERRED = 0b0001
DURATION_10000_US_PREFERRED = 0b0010
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
def bits_to_channel_counts(data: int) -> List[int]:
pos = 0
counts = []
while data != 0:
# Bit 0 = count 1
# Bit 1 = count 2, and so on
pos += 1
if data & 1:
counts.append(pos)
data >>= 1
return counts
def channel_counts_to_bits(counts: Sequence[int]) -> int:
return sum(set([1 << (count - 1) for count in counts]))
# -----------------------------------------------------------------------------
# Structures
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class CodecSpecificCapabilities:
'''See:
* Bluetooth Assigned Numbers, 6.12.4 - Codec Specific Capabilities LTV Structures
* Basic Audio Profile, 4.3.1 - Codec_Specific_Capabilities LTV requirements
'''
class Type(enum.IntEnum):
# fmt: off
SAMPLING_FREQUENCY = 0x01
FRAME_DURATION = 0x02
AUDIO_CHANNEL_COUNT = 0x03
OCTETS_PER_FRAME = 0x04
CODEC_FRAMES_PER_SDU = 0x05
supported_sampling_frequencies: SupportedSamplingFrequency
supported_frame_durations: SupportedFrameDuration
supported_audio_channel_counts: Sequence[int]
min_octets_per_codec_frame: int
max_octets_per_codec_frame: int
supported_max_codec_frames_per_sdu: int
@classmethod
def from_bytes(cls, data: bytes) -> CodecSpecificCapabilities:
offset = 0
# Allowed default values.
supported_audio_channel_counts = [1]
supported_max_codec_frames_per_sdu = 1
while offset < len(data):
length, type = struct.unpack_from('BB', data, offset)
offset += 2
value = int.from_bytes(data[offset : offset + length - 1], 'little')
offset += length - 1
if type == CodecSpecificCapabilities.Type.SAMPLING_FREQUENCY:
supported_sampling_frequencies = SupportedSamplingFrequency(value)
elif type == CodecSpecificCapabilities.Type.FRAME_DURATION:
supported_frame_durations = SupportedFrameDuration(value)
elif type == CodecSpecificCapabilities.Type.AUDIO_CHANNEL_COUNT:
supported_audio_channel_counts = bits_to_channel_counts(value)
elif type == CodecSpecificCapabilities.Type.OCTETS_PER_FRAME:
min_octets_per_sample = value & 0xFFFF
max_octets_per_sample = value >> 16
elif type == CodecSpecificCapabilities.Type.CODEC_FRAMES_PER_SDU:
supported_max_codec_frames_per_sdu = value
# It is expected here that if some fields are missing, an error should be raised.
return CodecSpecificCapabilities(
supported_sampling_frequencies=supported_sampling_frequencies,
supported_frame_durations=supported_frame_durations,
supported_audio_channel_counts=supported_audio_channel_counts,
min_octets_per_codec_frame=min_octets_per_sample,
max_octets_per_codec_frame=max_octets_per_sample,
supported_max_codec_frames_per_sdu=supported_max_codec_frames_per_sdu,
)
def __bytes__(self) -> bytes:
return struct.pack(
'<BBHBBBBBBBBHHBBB',
3,
CodecSpecificCapabilities.Type.SAMPLING_FREQUENCY,
self.supported_sampling_frequencies,
2,
CodecSpecificCapabilities.Type.FRAME_DURATION,
self.supported_frame_durations,
2,
CodecSpecificCapabilities.Type.AUDIO_CHANNEL_COUNT,
channel_counts_to_bits(self.supported_audio_channel_counts),
5,
CodecSpecificCapabilities.Type.OCTETS_PER_FRAME,
self.min_octets_per_codec_frame,
self.max_octets_per_codec_frame,
2,
CodecSpecificCapabilities.Type.CODEC_FRAMES_PER_SDU,
self.supported_max_codec_frames_per_sdu,
)
@dataclasses.dataclass
class PacRecord:
coding_format: hci.CodingFormat
codec_specific_capabilities: Union[CodecSpecificCapabilities, bytes]
# TODO: Parse Metadata
metadata: bytes = b''
@classmethod
def from_bytes(cls, data: bytes) -> PacRecord:
offset, coding_format = hci.CodingFormat.parse_from_bytes(data, 0)
codec_specific_capabilities_size = data[offset]
offset += 1
codec_specific_capabilities_bytes = data[
offset : offset + codec_specific_capabilities_size
]
offset += codec_specific_capabilities_size
metadata_size = data[offset]
metadata = data[offset : offset + metadata_size]
codec_specific_capabilities: Union[CodecSpecificCapabilities, bytes]
if coding_format.codec_id == hci.CodecID.VENDOR_SPECIFIC:
codec_specific_capabilities = codec_specific_capabilities_bytes
else:
codec_specific_capabilities = CodecSpecificCapabilities.from_bytes(
codec_specific_capabilities_bytes
)
return PacRecord(
coding_format=coding_format,
codec_specific_capabilities=codec_specific_capabilities,
metadata=metadata,
)
def __bytes__(self) -> bytes:
capabilities_bytes = bytes(self.codec_specific_capabilities)
return (
bytes(self.coding_format)
+ bytes([len(capabilities_bytes)])
+ capabilities_bytes
+ bytes([len(self.metadata)])
+ self.metadata
)
# -----------------------------------------------------------------------------
# Server
# -----------------------------------------------------------------------------
class PublishedAudioCapabilitiesService(gatt.TemplateService):
UUID = gatt.GATT_PUBLISHED_AUDIO_CAPABILITIES_SERVICE
sink_pac: Optional[gatt.Characteristic]
sink_audio_locations: Optional[gatt.Characteristic]
source_pac: Optional[gatt.Characteristic]
source_audio_locations: Optional[gatt.Characteristic]
available_audio_contexts: gatt.Characteristic
supported_audio_contexts: gatt.Characteristic
def __init__(
self,
supported_source_context: ContextType,
supported_sink_context: ContextType,
available_source_context: ContextType,
available_sink_context: ContextType,
sink_pac: Sequence[PacRecord] = [],
sink_audio_locations: Optional[AudioLocation] = None,
source_pac: Sequence[PacRecord] = [],
source_audio_locations: Optional[AudioLocation] = None,
) -> None:
characteristics = []
self.supported_audio_contexts = gatt.Characteristic(
uuid=gatt.GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READABLE,
value=struct.pack('<HH', supported_sink_context, supported_source_context),
)
characteristics.append(self.supported_audio_contexts)
self.available_audio_contexts = gatt.Characteristic(
uuid=gatt.GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
value=struct.pack('<HH', available_sink_context, available_source_context),
)
characteristics.append(self.available_audio_contexts)
if sink_pac:
self.sink_pac = gatt.Characteristic(
uuid=gatt.GATT_SINK_PAC_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READABLE,
value=bytes([len(sink_pac)]) + b''.join(map(bytes, sink_pac)),
)
characteristics.append(self.sink_pac)
if sink_audio_locations is not None:
self.sink_audio_locations = gatt.Characteristic(
uuid=gatt.GATT_SINK_AUDIO_LOCATION_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READABLE,
value=struct.pack('<I', sink_audio_locations),
)
characteristics.append(self.sink_audio_locations)
if source_pac:
self.source_pac = gatt.Characteristic(
uuid=gatt.GATT_SOURCE_PAC_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READABLE,
value=bytes([len(source_pac)]) + b''.join(map(bytes, source_pac)),
)
characteristics.append(self.source_pac)
if source_audio_locations is not None:
self.source_audio_locations = gatt.Characteristic(
uuid=gatt.GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READABLE,
value=struct.pack('<I', source_audio_locations),
)
characteristics.append(self.source_audio_locations)
super().__init__(characteristics)
# -----------------------------------------------------------------------------
# Client
# -----------------------------------------------------------------------------
class PublishedAudioCapabilitiesServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = PublishedAudioCapabilitiesService
sink_pac: Optional[gatt_client.CharacteristicProxy] = None
sink_audio_locations: Optional[gatt_client.CharacteristicProxy] = None
source_pac: Optional[gatt_client.CharacteristicProxy] = None
source_audio_locations: Optional[gatt_client.CharacteristicProxy] = None
available_audio_contexts: gatt_client.CharacteristicProxy
supported_audio_contexts: gatt_client.CharacteristicProxy
def __init__(self, service_proxy: gatt_client.ServiceProxy):
self.service_proxy = service_proxy
self.available_audio_contexts = service_proxy.get_characteristics_by_uuid(
gatt.GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC
)[0]
self.supported_audio_contexts = service_proxy.get_characteristics_by_uuid(
gatt.GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC
)[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SINK_PAC_CHARACTERISTIC
):
self.sink_pac = characteristics[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SOURCE_PAC_CHARACTERISTIC
):
self.source_pac = characteristics[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SINK_AUDIO_LOCATION_CHARACTERISTIC
):
self.sink_audio_locations = characteristics[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC
):
self.source_audio_locations = characteristics[0]

View File

@@ -1090,7 +1090,7 @@ class Session:
# We can now encrypt the connection with the short term key, so that we can
# distribute the long term and/or other keys over an encrypted connection
self.manager.device.host.send_command_sync(
HCI_LE_Enable_Encryption_Command( # type: ignore[call-arg]
HCI_LE_Enable_Encryption_Command(
connection_handle=self.connection.handle,
random_number=bytes(8),
encrypted_diversifier=0,

View File

@@ -18,6 +18,7 @@
from contextlib import asynccontextmanager
import logging
import os
from typing import Optional
from .common import Transport, AsyncPipeSink, SnoopingTransport
from ..snoop import create_snooper
@@ -52,8 +53,16 @@ def _wrap_transport(transport: Transport) -> Transport:
async def open_transport(name: str) -> Transport:
"""
Open a transport by name.
The name must be <type>:<parameters>
Where <parameters> depend on the type (and may be empty for some types).
The name must be <type>:<metadata><parameters>
Where <parameters> depend on the type (and may be empty for some types), and
<metadata> is either omitted, or a ,-separated list of <key>=<value> pairs,
enclosed in [].
If there are not metadata or parameter, the : after the <type> may be omitted.
Examples:
* usb:0
* usb:[driver=rtk]0
* android-netsim
The supported types are:
* serial
* udp
@@ -71,87 +80,106 @@ async def open_transport(name: str) -> Transport:
* android-netsim
"""
return _wrap_transport(await _open_transport(name))
scheme, *tail = name.split(':', 1)
spec = tail[0] if tail else None
if spec:
# Metadata may precede the spec
if spec.startswith('['):
metadata_str, *tail = spec[1:].split(']')
spec = tail[0] if tail else None
metadata = dict([entry.split('=') for entry in metadata_str.split(',')])
else:
metadata = None
transport = await _open_transport(scheme, spec)
if metadata:
transport.source.metadata = { # type: ignore[attr-defined]
**metadata,
**getattr(transport.source, 'metadata', {}),
}
# pylint: disable=line-too-long
logger.debug(f'HCI metadata: {transport.source.metadata}') # type: ignore[attr-defined]
return _wrap_transport(transport)
# -----------------------------------------------------------------------------
async def _open_transport(name: str) -> Transport:
async def _open_transport(scheme: str, spec: Optional[str]) -> Transport:
# pylint: disable=import-outside-toplevel
# pylint: disable=too-many-return-statements
scheme, *spec = name.split(':', 1)
if scheme == 'serial' and spec:
from .serial import open_serial_transport
return await open_serial_transport(spec[0])
return await open_serial_transport(spec)
if scheme == 'udp' and spec:
from .udp import open_udp_transport
return await open_udp_transport(spec[0])
return await open_udp_transport(spec)
if scheme == 'tcp-client' and spec:
from .tcp_client import open_tcp_client_transport
return await open_tcp_client_transport(spec[0])
return await open_tcp_client_transport(spec)
if scheme == 'tcp-server' and spec:
from .tcp_server import open_tcp_server_transport
return await open_tcp_server_transport(spec[0])
return await open_tcp_server_transport(spec)
if scheme == 'ws-client' and spec:
from .ws_client import open_ws_client_transport
return await open_ws_client_transport(spec[0])
return await open_ws_client_transport(spec)
if scheme == 'ws-server' and spec:
from .ws_server import open_ws_server_transport
return await open_ws_server_transport(spec[0])
return await open_ws_server_transport(spec)
if scheme == 'pty':
from .pty import open_pty_transport
return await open_pty_transport(spec[0] if spec else None)
return await open_pty_transport(spec)
if scheme == 'file':
from .file import open_file_transport
assert spec is not None
return await open_file_transport(spec[0])
return await open_file_transport(spec)
if scheme == 'vhci':
from .vhci import open_vhci_transport
return await open_vhci_transport(spec[0] if spec else None)
return await open_vhci_transport(spec)
if scheme == 'hci-socket':
from .hci_socket import open_hci_socket_transport
return await open_hci_socket_transport(spec[0] if spec else None)
return await open_hci_socket_transport(spec)
if scheme == 'usb':
from .usb import open_usb_transport
assert spec is not None
return await open_usb_transport(spec[0])
assert spec
return await open_usb_transport(spec)
if scheme == 'pyusb':
from .pyusb import open_pyusb_transport
assert spec is not None
return await open_pyusb_transport(spec[0])
assert spec
return await open_pyusb_transport(spec)
if scheme == 'android-emulator':
from .android_emulator import open_android_emulator_transport
return await open_android_emulator_transport(spec[0] if spec else None)
return await open_android_emulator_transport(spec)
if scheme == 'android-netsim':
from .android_netsim import open_android_netsim_transport
return await open_android_netsim_transport(spec[0] if spec else None)
return await open_android_netsim_transport(spec)
raise ValueError('unknown transport scheme')

View File

@@ -69,7 +69,7 @@ async def open_android_emulator_transport(spec: Optional[str]) -> Transport:
mode = 'host'
server_host = 'localhost'
server_port = '8554'
if spec is not None:
if spec:
params = spec.split(',')
for param in params:
if param.startswith('mode='):

View File

@@ -21,7 +21,7 @@ import struct
import asyncio
import logging
import io
from typing import ContextManager, Tuple, Optional, Protocol, Dict
from typing import Any, ContextManager, Tuple, Optional, Protocol, Dict
from bumble import hci
from bumble.colors import color
@@ -42,6 +42,7 @@ HCI_PACKET_INFO: Dict[int, Tuple[int, int, str]] = {
hci.HCI_ACL_DATA_PACKET: (2, 2, 'H'),
hci.HCI_SYNCHRONOUS_DATA_PACKET: (1, 2, 'B'),
hci.HCI_EVENT_PACKET: (1, 1, 'B'),
hci.HCI_ISO_DATA_PACKET: (2, 2, 'H'),
}

View File

@@ -59,10 +59,7 @@ async def open_hci_socket_transport(spec: Optional[str]) -> Transport:
) from error
# Compute the adapter index
if spec is None:
adapter_index = 0
else:
adapter_index = int(spec)
adapter_index = int(spec) if spec else 0
# Bind the socket
# NOTE: since Python doesn't support binding with the required address format (yet),

View File

@@ -5,6 +5,15 @@ Some Bluetooth controllers require a driver to function properly.
This may include, for instance, loading a Firmware image or patch,
loading a configuration.
By default, drivers will be automatically probed to determine if they should be
used with particular HCI controller.
When the transport for an HCI controller is instantiated from a transport name,
a driver may also be forced by specifying ``driver=<driver-name>`` in the optional
metadata portion of the transport name. For example,
``usb:[driver=-rtk]0`` indicates that the ``rtk`` driver should be used with the
first USB device, even if a normal probe would not have selected it based on the
USB vendor ID and product ID.
Drivers included in the module are:
* [Realtek](realtek.md): Loading of Firmware and Config for Realtek USB dongles.

View File

@@ -1,13 +1,16 @@
REALTEK DRIVER
==============
This driver supports loading firmware images and optional config data to
This driver supports loading firmware images and optional config data to
USB dongles with a Realtek chipset.
A number of USB dongles are supported, but likely not all.
When using a USB dongle, the USB product ID and manufacturer ID are used
When using a USB dongle, the USB product ID and vendor ID are used
to find whether a matching set of firmware image and config data
is needed for that specific model. If a match exists, the driver will try
load the firmware image and, if needed, config data.
Alternatively, the metadata property ``driver=rtk`` may be specified in a transport
name to force that driver to be used (ex: ``usb:[driver=rtk]0`` instead of just
``usb:0`` for the first USB device).
The driver will look for those files by name, in order, in:
* The directory specified by the environment variable `BUMBLE_RTK_FIRMWARE_DIR`

View File

@@ -1,19 +1,19 @@
ANDROID REMOTE HCI APP
======================
This application allows using an android phone's built-in Bluetooth controller with
This application allows using an android phone's built-in Bluetooth controller with
a Bumble host stack running outside the phone (typically a development laptop or desktop).
The app runs an HCI proxy between a TCP socket on the "outside" and the Bluetooth HCI HAL
on the "inside". (See [this page](https://source.android.com/docs/core/connect/bluetooth) for a high level
on the "inside". (See [this page](https://source.android.com/docs/core/connect/bluetooth) for a high level
description of the Android Bluetooth HCI HAL).
The HCI packets received on the TCP socket are forwarded to the phone's controller, and the
The HCI packets received on the TCP socket are forwarded to the phone's controller, and the
packets coming from the controller are forwarded to the TCP socket.
Building
--------
You can build the app by running `./gradlew build` (use `gradlew.bat` on Windows) from the `RemoteHCI` top level directory.
You can build the app by running `./gradlew build` (use `gradlew.bat` on Windows) from the `extras/android/RemoteHCI` top level directory.
You can also build with Android Studio: open the `RemoteHCI` project. You can build and/or debug from there.
If the build succeeds, you can find the app APKs (debug and release) at:
@@ -25,9 +25,23 @@ If the build succeeds, you can find the app APKs (debug and release) at:
Running
-------
!!! note
In the following examples, it is assumed that shell commands are executed while in the
app's root directory, `extras/android/RemoteHCI`. If you are in a different directory,
adjust the relative paths accordingly.
### Preconditions
When the proxy starts (tapping the "Start" button in the app's main activity), it will try to
bind to the Bluetooth HAL. This requires disabling SELinux temporarily, and being the only HAL client.
When the proxy starts (tapping the "Start" button in the app's main activity, or running the proxy
from an `adb shell` command line), it will try to bind to the Bluetooth HAL.
This requires that there is no other HAL client, and requires certain privileges.
For running as a regular app, this requires disabling SELinux temporarily.
For running as a command-line executable, this just requires a root shell.
#### Root Shell
!!! tip "Restart `adb` as root"
```bash
$ adb root
```
#### Disabling SELinux
Binding to the Bluetooth HCI HAL requires certain SELinux permissions that can't simply be changed
@@ -56,8 +70,8 @@ development phone).
This state will also reset to the normal SELinux enforcement when you reboot.
#### Stopping the bluetooth process
Since the Bluetooth HAL service can only accept one client, and that in normal conditions
that client is the Android's bluetooth stack, it is required to first shut down the
Since the Bluetooth HAL service can only accept one client, and that in normal conditions
that client is the Android's bluetooth stack, it is required to first shut down the
Android bluetooth stack process.
!!! tip "Checking if the Bluetooth process is running"
@@ -79,7 +93,33 @@ Airplane Mode, then rebooting. The bluetooth process should, in theory, not rest
$ adb shell cmd bluetooth_manager disable
```
### Starting the app
### Running as a command line app
You push the built APK to a temporary location on the phone's filesystem, then launch the command
line executable with an `adb shell` command.
!!! tip "Pushing the executable"
```bash
$ adb push app/build/outputs/apk/release/app-release-unsigned.apk /data/local/tmp/remotehci.apk
```
Do this every time you rebuild. Alternatively, you can push the `debug` APK instead:
```bash
$ adb push app/build/outputs/apk/debug/app-debug.apk /data/local/tmp/remotehci.apk
```
!!! tip "Start the proxy from the command line"
```bash
adb shell "CLASSPATH=/data/local/tmp/remotehci.apk app_process /system/bin com.github.google.bumble.remotehci.CommandLineInterface"
```
This will run the proxy, listening on the default TCP port.
If you want a different port, pass it as a command line parameter
!!! tip "Start the proxy from the command line with a specific TCP port"
```bash
adb shell "CLASSPATH=/data/local/tmp/remotehci.apk app_process /system/bin com.github.google.bumble.remotehci.CommandLineInterface 12345"
```
### Running as a normal app
You can start the app from the Android launcher, from Android Studio, or with `adb`
#### Launching from the launcher
@@ -103,11 +143,11 @@ automatically start the proxy, and/or set the port number.
#### Selecting a TCP port
The RemoteHCI app's main activity has a "TCP Port" setting where you can change the port on
which the proxy is accepting connections. If the default value isn't suitable, you can
which the proxy is accepting connections. If the default value isn't suitable, you can
change it there (you can also use the special value 0 to let the OS assign a port number for you).
### Connecting to the proxy
To connect the Bumble stack to the proxy, you need to be able to reach the phone's network
To connect the Bumble stack to the proxy, you need to be able to reach the phone's network
stack. This can be done over the phone's WiFi connection, or, alternatively, using an `adb`
TCP forward (which should be faster than over WiFi).
@@ -116,7 +156,7 @@ TCP forward (which should be faster than over WiFi).
```bash
$ adb forward tcp:<outside-port> tcp:<inside-port>
```
Where ``<outside-port>`` is the port number for a listening socket on your laptop or
Where ``<outside-port>`` is the port number for a listening socket on your laptop or
desktop machine, and <inside-port> is the TCP port selected in the app's user interface.
Those two ports may be the same, of course.
For example, with the default TCP port 9993:
@@ -125,7 +165,7 @@ TCP forward (which should be faster than over WiFi).
```
Once you've ensured that you can reach the proxy's TCP port on the phone, either directly or
via an `adb` forward, you can then use it as a Bumble transport, using the transport name:
via an `adb` forward, you can then use it as a Bumble transport, using the transport name:
``tcp-client:<host>:<port>`` syntax.
!!! example "Connecting a Bumble client"

5
examples/leaudio.json Normal file
View File

@@ -0,0 +1,5 @@
{
"name": "Bumble-LEA",
"keystore": "JsonKeyStore",
"advertising_interval": 100
}

107
examples/run_cig_setup.py Normal file
View File

@@ -0,0 +1,107 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
from bumble.device import (
Device,
Connection,
)
from bumble.hci import (
OwnAddressType,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
)
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_cig_setup.py <config-file>'
'<transport-spec-for-device-1> <transport-spec-for-device-2>'
)
print(
'example: run_cig_setup.py device1.json'
'tcp-client:127.0.0.1:6402 tcp-client:127.0.0.1:6402'
)
return
print('<<< connecting to HCI...')
hci_transports = await asyncio.gather(
open_transport_or_link(sys.argv[2]), open_transport_or_link(sys.argv[3])
)
print('<<< connected')
devices = [
Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
for hci_transport in hci_transports
]
devices[0].cis_enabled = True
devices[1].cis_enabled = True
await asyncio.gather(*[device.power_on() for device in devices])
await devices[0].start_extended_advertising(
advertising_properties=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
),
own_address_type=OwnAddressType.PUBLIC,
)
connection = await devices[1].connect(
devices[0].public_address, own_address_type=OwnAddressType.PUBLIC
)
cid_ids = [2, 3]
cis_handles = await devices[1].setup_cig(
cig_id=1,
cis_id=cid_ids,
sdu_interval=(10000, 0),
framing=0,
max_sdu=(120, 0),
retransmission_number=13,
max_transport_latency=(100, 0),
)
def on_cis_request(
connection: Connection, cis_handle: int, _cig_id: int, _cis_id: int
):
connection.abort_on('disconnection', devices[0].accept_cis_request(cis_handle))
devices[0].on('cis_request', on_cis_request)
cis_links = await devices[1].create_cis(
[(cis, connection.handle) for cis in cis_handles]
)
for cis_link in cis_links:
await cis_link.disconnect()
await asyncio.gather(
*[hci_transport.source.terminated for hci_transport in hci_transports]
)
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -0,0 +1,86 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import dataclasses
import logging
import sys
import os
from bumble.core import BT_BR_EDR_TRANSPORT
from bumble.device import Device, ScoLink
from bumble.hci import HCI_Enhanced_Setup_Synchronous_Connection_Command
from bumble.hfp import DefaultCodecParameters, ESCO_PARAMETERS
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_esco_connection.py <config-file>'
'<transport-spec-for-device-1> <transport-spec-for-device-2>'
)
print(
'example: run_esco_connection.py classic1.json'
'tcp-client:127.0.0.1:6402 tcp-client:127.0.0.1:6402'
)
return
print('<<< connecting to HCI...')
hci_transports = await asyncio.gather(
open_transport_or_link(sys.argv[2]), open_transport_or_link(sys.argv[3])
)
print('<<< connected')
devices = [
Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
for hci_transport in hci_transports
]
devices[0].classic_enabled = True
devices[1].classic_enabled = True
await asyncio.gather(*[device.power_on() for device in devices])
connections = await asyncio.gather(
devices[0].accept(devices[1].public_address),
devices[1].connect(devices[0].public_address, transport=BT_BR_EDR_TRANSPORT),
)
def on_sco(sco_link: ScoLink):
connections[0].abort_on('disconnection', sco_link.disconnect())
devices[0].once('sco_connection', on_sco)
await devices[0].send_command(
HCI_Enhanced_Setup_Synchronous_Connection_Command(
connection_handle=connections[0].handle,
**ESCO_PARAMETERS[DefaultCodecParameters.ESCO_CVSD_S3].asdict(),
)
)
await asyncio.gather(
*[hci_transport.source.terminated for hci_transport in hci_transports]
)
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -0,0 +1,134 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
from bumble.core import AdvertisingData
from bumble.device import Device
from bumble.hci import (
CodecID,
CodingFormat,
OwnAddressType,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
)
from bumble.profiles.bap import (
CodecSpecificCapabilities,
ContextType,
AudioLocation,
SupportedSamplingFrequency,
SupportedFrameDuration,
PacRecord,
PublishedAudioCapabilitiesService,
)
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print('Usage: run_cig_setup.py <config-file>' '<transport-spec-for-device>')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.cis_enabled = True
await device.power_on()
device.add_service(
PublishedAudioCapabilitiesService(
supported_source_context=ContextType.PROHIBITED,
available_source_context=ContextType.PROHIBITED,
supported_sink_context=ContextType.MEDIA,
available_sink_context=ContextType.MEDIA,
sink_audio_locations=(
AudioLocation.FRONT_LEFT | AudioLocation.FRONT_RIGHT
),
sink_pac=[
# Codec Capability Setting 16_2
PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=CodecSpecificCapabilities(
supported_sampling_frequencies=(
SupportedSamplingFrequency.FREQ_16000
),
supported_frame_durations=(
SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_counts=[1],
min_octets_per_codec_frame=40,
max_octets_per_codec_frame=40,
supported_max_codec_frames_per_sdu=1,
),
),
# Codec Capability Setting 24_2
PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=CodecSpecificCapabilities(
supported_sampling_frequencies=(
SupportedSamplingFrequency.FREQ_24000
),
supported_frame_durations=(
SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_counts=[1],
min_octets_per_codec_frame=60,
max_octets_per_codec_frame=60,
supported_max_codec_frames_per_sdu=1,
),
),
],
)
)
advertising_data = bytes(
AdvertisingData(
[
(
AdvertisingData.COMPLETE_LOCAL_NAME,
bytes('Bumble LE Audio', 'utf-8'),
),
(
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
bytes(PublishedAudioCapabilitiesService.UUID),
),
]
)
)
await device.start_extended_advertising(
advertising_properties=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
),
own_address_type=OwnAddressType.RANDOM,
advertising_data=advertising_data,
)
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -1,5 +1,5 @@
[versions]
agp = "8.3.0-alpha11"
agp = "8.2.0"
kotlin = "1.9.0"
core-ktx = "1.12.0"
junit = "4.13.2"

View File

@@ -0,0 +1,57 @@
package com.github.google.bumble.remotehci
import java.io.IOException
class CommandLineInterface {
companion object {
fun printUsage() {
System.out.println("usage: <launch-command> [-h|--help] [<tcp-port>]")
}
@JvmStatic fun main(args: Array<String>) {
System.out.println("Starting proxy")
var tcpPort = DEFAULT_TCP_PORT
if (args.isNotEmpty()) {
if (args[0] == "-h" || args[0] == "--help") {
printUsage()
return
}
try {
tcpPort = args[0].toInt()
} catch (error: NumberFormatException) {
System.out.println("ERROR: invalid TCP port argument")
printUsage()
return
}
}
try {
val hciProxy = HciProxy(tcpPort, object : HciProxy.Listener {
override fun onHostConnectionState(connected: Boolean) {
}
override fun onHciPacketCountChange(
commandPacketsReceived: Int,
aclPacketsReceived: Int,
scoPacketsReceived: Int,
eventPacketsSent: Int,
aclPacketsSent: Int,
scoPacketsSent: Int
) {
}
override fun onMessage(message: String?) {
System.out.println(message)
}
})
hciProxy.run()
} catch (error: IOException) {
System.err.println("Exception while running HCI Server: $error")
} catch (error: HciProxy.HalException) {
System.err.println("HAL exception: ${error.message}")
}
}
}
}

View File

@@ -1,5 +1,5 @@
[versions]
agp = "8.3.0-alpha11"
agp = "8.2.0"
kotlin = "1.8.10"
core-ktx = "1.9.0"
junit = "4.13.2"

8
rust/Cargo.lock generated
View File

@@ -1073,9 +1073,9 @@ checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
[[package]]
name = "openssl"
version = "0.10.57"
version = "0.10.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bac25ee399abb46215765b1cb35bc0212377e58a061560d8b29b024fd0430e7c"
checksum = "79a4c6c3a2b158f7f8f2a2fc5a969fa3a068df6fc9dbb4a43845436e3af7c800"
dependencies = [
"bitflags 2.4.0",
"cfg-if",
@@ -1105,9 +1105,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-sys"
version = "0.9.92"
version = "0.9.96"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db7e971c2c2bba161b2d2fdf37080177eff520b3bc044787c7f1f5f9e78d869b"
checksum = "3812c071ba60da8b5677cc12bcb1d42989a65553772897a7e0355545a819838f"
dependencies = [
"cc",
"libc",

151
tests/bap_test.py Normal file
View File

@@ -0,0 +1,151 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import os
import pytest
import logging
from bumble import device
from bumble.hci import CodecID, CodingFormat
from bumble.profiles.bap import (
AudioLocation,
SupportedFrameDuration,
SupportedSamplingFrequency,
CodecSpecificCapabilities,
ContextType,
PacRecord,
PublishedAudioCapabilitiesService,
PublishedAudioCapabilitiesServiceProxy,
)
from .test_utils import TwoDevices
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
def test_codec_specific_capabilities() -> None:
SAMPLE_FREQUENCY = SupportedSamplingFrequency.FREQ_16000
FRAME_SURATION = SupportedFrameDuration.DURATION_10000_US_SUPPORTED
AUDIO_CHANNEL_COUNTS = [1]
cap = CodecSpecificCapabilities(
supported_sampling_frequencies=SAMPLE_FREQUENCY,
supported_frame_durations=FRAME_SURATION,
supported_audio_channel_counts=AUDIO_CHANNEL_COUNTS,
min_octets_per_codec_frame=40,
max_octets_per_codec_frame=40,
supported_max_codec_frames_per_sdu=1,
)
assert CodecSpecificCapabilities.from_bytes(bytes(cap)) == cap
# -----------------------------------------------------------------------------
def test_pac_record() -> None:
SAMPLE_FREQUENCY = SupportedSamplingFrequency.FREQ_16000
FRAME_SURATION = SupportedFrameDuration.DURATION_10000_US_SUPPORTED
AUDIO_CHANNEL_COUNTS = [1]
cap = CodecSpecificCapabilities(
supported_sampling_frequencies=SAMPLE_FREQUENCY,
supported_frame_durations=FRAME_SURATION,
supported_audio_channel_counts=AUDIO_CHANNEL_COUNTS,
min_octets_per_codec_frame=40,
max_octets_per_codec_frame=40,
supported_max_codec_frames_per_sdu=1,
)
pac_record = PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=cap,
metadata=b'',
)
assert PacRecord.from_bytes(bytes(pac_record)) == pac_record
# -----------------------------------------------------------------------------
def test_vendor_specific_pac_record() -> None:
# Vendor-Specific codec, Google, ID=0xFFFF. No capabilities and metadata.
RAW_DATA = bytes.fromhex('ffe000ffff0000')
assert bytes(PacRecord.from_bytes(RAW_DATA)) == RAW_DATA
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_pacs():
devices = TwoDevices()
devices[0].add_service(
PublishedAudioCapabilitiesService(
supported_sink_context=ContextType.MEDIA,
available_sink_context=ContextType.MEDIA,
supported_source_context=0,
available_source_context=0,
sink_pac=[
# Codec Capability Setting 16_2
PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=CodecSpecificCapabilities(
supported_sampling_frequencies=(
SupportedSamplingFrequency.FREQ_16000
),
supported_frame_durations=(
SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_counts=[1],
min_octets_per_codec_frame=40,
max_octets_per_codec_frame=40,
supported_max_codec_frames_per_sdu=1,
),
),
# Codec Capability Setting 24_2
PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=CodecSpecificCapabilities(
supported_sampling_frequencies=(
SupportedSamplingFrequency.FREQ_24000
),
supported_frame_durations=(
SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_counts=[1],
min_octets_per_codec_frame=60,
max_octets_per_codec_frame=60,
supported_max_codec_frames_per_sdu=1,
),
),
],
sink_audio_locations=AudioLocation.FRONT_LEFT | AudioLocation.FRONT_RIGHT,
)
)
await devices.setup_connection()
peer = device.Peer(devices.connections[1])
pacs_client = await peer.discover_service_and_create_proxy(
PublishedAudioCapabilitiesServiceProxy
)
# -----------------------------------------------------------------------------
async def run():
await test_pacs()
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(run())

View File

@@ -20,8 +20,14 @@ import logging
import os
from types import LambdaType
import pytest
from unittest import mock
from bumble.core import BT_BR_EDR_TRANSPORT
from bumble.core import (
BT_BR_EDR_TRANSPORT,
BT_LE_TRANSPORT,
BT_PERIPHERAL_ROLE,
ConnectionParameters,
)
from bumble.device import Connection, Device
from bumble.host import Host
from bumble.hci import (
@@ -30,6 +36,7 @@ from bumble.hci import (
HCI_CREATE_CONNECTION_COMMAND,
HCI_SUCCESS,
Address,
OwnAddressType,
HCI_Command_Complete_Event,
HCI_Command_Status_Event,
HCI_Connection_Complete_Event,
@@ -232,6 +239,172 @@ async def test_flush():
pass
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_legacy_advertising():
device = Device(host=mock.AsyncMock(Host))
# Start advertising
advertiser = await device.start_legacy_advertising()
assert device.legacy_advertiser
# Stop advertising
await advertiser.stop()
assert not device.legacy_advertiser
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'own_address_type,',
(OwnAddressType.PUBLIC, OwnAddressType.RANDOM),
)
@pytest.mark.asyncio
async def test_legacy_advertising_connection(own_address_type):
device = Device(host=mock.AsyncMock(Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
# Start advertising
advertiser = await device.start_legacy_advertising()
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
peer_address,
BT_PERIPHERAL_ROLE,
ConnectionParameters(0, 0, 0),
)
if own_address_type == OwnAddressType.PUBLIC:
assert device.lookup_connection(0x0001).self_address == device.public_address
else:
assert device.lookup_connection(0x0001).self_address == device.random_address
# For unknown reason, read_phy() in on_connection() would be killed at the end of
# test, so we force scheduling here to avoid an warning.
await asyncio.sleep(0.0001)
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'auto_restart,',
(True, False),
)
@pytest.mark.asyncio
async def test_legacy_advertising_disconnection(auto_restart):
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_legacy_advertising(auto_restart=auto_restart)
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
peer_address,
BT_PERIPHERAL_ROLE,
ConnectionParameters(0, 0, 0),
)
device.start_legacy_advertising = mock.AsyncMock()
device.on_disconnection(0x0001, 0)
if auto_restart:
device.start_legacy_advertising.assert_called_with(
advertising_type=advertiser.advertising_type,
own_address_type=advertiser.own_address_type,
auto_restart=advertiser.auto_restart,
advertising_data=advertiser.advertising_data,
scan_response_data=advertiser.scan_response_data,
)
else:
device.start_legacy_advertising.assert_not_called()
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_extended_advertising():
device = Device(host=mock.AsyncMock(Host))
# Start advertising
advertiser = await device.start_extended_advertising()
assert device.extended_advertisers
# Stop advertising
await advertiser.stop()
assert not device.extended_advertisers
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'own_address_type,',
(OwnAddressType.PUBLIC, OwnAddressType.RANDOM),
)
@pytest.mark.asyncio
async def test_extended_advertising_connection(own_address_type):
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_extended_advertising(
own_address_type=own_address_type
)
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
peer_address,
BT_PERIPHERAL_ROLE,
ConnectionParameters(0, 0, 0),
)
device.on_advertising_set_termination(
HCI_SUCCESS,
advertiser.handle,
0x0001,
)
if own_address_type == OwnAddressType.PUBLIC:
assert device.lookup_connection(0x0001).self_address == device.public_address
else:
assert device.lookup_connection(0x0001).self_address == device.random_address
# For unknown reason, read_phy() in on_connection() would be killed at the end of
# test, so we force scheduling here to avoid an warning.
await asyncio.sleep(0.0001)
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'auto_restart,',
(True, False),
)
@pytest.mark.asyncio
async def test_extended_advertising_disconnection(auto_restart):
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_extended_advertising(auto_restart=auto_restart)
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
peer_address,
BT_PERIPHERAL_ROLE,
ConnectionParameters(0, 0, 0),
)
device.on_advertising_set_termination(
HCI_SUCCESS,
advertiser.handle,
0x0001,
)
device.start_extended_advertising = mock.AsyncMock()
device.on_disconnection(0x0001, 0)
if auto_restart:
device.start_extended_advertising.assert_called_with(
advertising_properties=advertiser.advertising_properties,
own_address_type=advertiser.own_address_type,
auto_restart=advertiser.auto_restart,
advertising_data=advertiser.advertising_data,
scan_response_data=advertiser.scan_response_data,
)
else:
device.start_extended_advertising.assert_not_called()
# -----------------------------------------------------------------------------
def test_gatt_services_with_gas():
device = Device(host=Host(None, None))

View File

@@ -24,12 +24,15 @@ from bumble.hci import (
HCI_RESET_COMMAND,
HCI_SUCCESS,
Address,
CodingFormat,
CodecID,
HCI_Command,
HCI_Command_Complete_Event,
HCI_Command_Status_Event,
HCI_CustomPacket,
HCI_Disconnect_Command,
HCI_Event,
HCI_IsoDataPacket,
HCI_LE_Add_Device_To_Filter_Accept_List_Command,
HCI_LE_Advertising_Report_Event,
HCI_LE_Channel_Selection_Algorithm_Event,
@@ -51,6 +54,7 @@ from bumble.hci import (
HCI_LE_Set_Random_Address_Command,
HCI_LE_Set_Scan_Enable_Command,
HCI_LE_Set_Scan_Parameters_Command,
HCI_LE_Setup_ISO_Data_Path_Command,
HCI_Number_Of_Completed_Packets_Event,
HCI_Packet,
HCI_PIN_Code_Request_Reply_Command,
@@ -442,6 +446,28 @@ def test_HCI_LE_Set_Extended_Advertising_Enable_Command():
basic_check(command)
# -----------------------------------------------------------------------------
def test_HCI_LE_Setup_ISO_Data_Path_Command():
command = HCI_Packet.from_bytes(bytes.fromhex('016e200d60000001030000000000000000'))
assert command.connection_handle == 0x0060
assert command.data_path_direction == 0x00
assert command.data_path_id == 0x01
assert command.codec_id == CodingFormat(CodecID.TRANSPARENT)
assert command.controller_delay == 0
assert command.codec_configuration == b''
command = HCI_LE_Setup_ISO_Data_Path_Command(
connection_handle=0x0060,
data_path_direction=0x00,
data_path_id=0x01,
codec_id=CodingFormat(CodecID.TRANSPARENT),
controller_delay=0x00,
codec_configuration=b'',
)
basic_check(command)
# -----------------------------------------------------------------------------
def test_address():
a = Address('C4:F2:17:1A:1D:BB')
@@ -461,6 +487,29 @@ def test_custom():
assert packet.payload == data
# -----------------------------------------------------------------------------
def test_iso_data_packet():
data = bytes.fromhex(
'05616044002ac9f0a193003c00e83b477b00eba8d41dc018bf1a980f0290afe1e7c37652096697'
'52b6a535a8df61e22931ef5a36281bc77ed6a3206d984bcdabee6be831c699cb50e2'
)
packet = HCI_IsoDataPacket.from_bytes(data)
assert packet.connection_handle == 0x0061
assert packet.packet_status_flag == 0
assert packet.pb_flag == 0x02
assert packet.ts_flag == 0x01
assert packet.data_total_length == 68
assert packet.time_stamp == 2716911914
assert packet.packet_sequence_number == 147
assert packet.iso_sdu_length == 60
assert packet.iso_sdu_fragment == bytes.fromhex(
'e83b477b00eba8d41dc018bf1a980f0290afe1e7c3765209669752b6a535a8df61e22931ef5a3'
'6281bc77ed6a3206d984bcdabee6be831c699cb50e2'
)
assert packet.to_bytes() == data
# -----------------------------------------------------------------------------
def run_test_events():
test_HCI_Event()
@@ -499,6 +548,7 @@ def run_test_commands():
test_HCI_LE_Set_Default_PHY_Command()
test_HCI_LE_Set_Extended_Scan_Parameters_Command()
test_HCI_LE_Set_Extended_Advertising_Enable_Command()
test_HCI_LE_Setup_ISO_Data_Path_Command()
# -----------------------------------------------------------------------------
@@ -507,3 +557,4 @@ if __name__ == '__main__':
run_test_commands()
test_address()
test_custom()
test_iso_data_packet()