Compare commits

..

2 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod 48685c8587 improve vendor event support 2024-11-23 08:55:50 -08:00
Gilles Boccon-Gibod 8d908288c8 Merge pull request #583 from google/gbg/more-gatt-tests
regression test for GATT unsubscription
2024-11-15 10:19:20 -08:00
38 changed files with 1720 additions and 999 deletions
+13 -22
View File
@@ -60,7 +60,7 @@ AURACAST_DEFAULT_ATT_MTU = 256
class BroadcastScanner(pyee.EventEmitter):
@dataclasses.dataclass
class Broadcast(pyee.EventEmitter):
name: str | None
name: str
sync: bumble.device.PeriodicAdvertisingSync
rssi: int = 0
public_broadcast_announcement: Optional[
@@ -135,8 +135,7 @@ class BroadcastScanner(pyee.EventEmitter):
self.sync.advertiser_address,
color(self.sync.state.name, 'green'),
)
if self.name is not None:
print(f' {color("Name", "cyan")}: {self.name}')
print(f' {color("Name", "cyan")}: {self.name}')
if self.appearance:
print(f' {color("Appearance", "cyan")}: {str(self.appearance)}')
print(f' {color("RSSI", "cyan")}: {self.rssi}')
@@ -175,7 +174,7 @@ class BroadcastScanner(pyee.EventEmitter):
print(color(' Codec ID:', 'yellow'))
print(
color(' Coding Format: ', 'green'),
subgroup.codec_id.codec_id.name,
subgroup.codec_id.coding_format.name,
)
print(
color(' Company ID: ', 'green'),
@@ -275,24 +274,13 @@ class BroadcastScanner(pyee.EventEmitter):
await self.device.stop_scanning()
def on_advertisement(self, advertisement: bumble.device.Advertisement) -> None:
if not (
ads := advertisement.data.get_all(
bumble.core.AdvertisingData.SERVICE_DATA_16_BIT_UUID
if (
broadcast_name := advertisement.data.get(
bumble.core.AdvertisingData.BROADCAST_NAME
)
) or not (
any(
ad
for ad in ads
if isinstance(ad, tuple)
and ad[0] == bumble.gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE
)
):
) is None:
return
broadcast_name = advertisement.data.get(
bumble.core.AdvertisingData.BROADCAST_NAME
)
assert isinstance(broadcast_name, str) or broadcast_name is None
assert isinstance(broadcast_name, str)
if broadcast := self.broadcasts.get(advertisement.address):
broadcast.update(advertisement)
@@ -303,7 +291,7 @@ class BroadcastScanner(pyee.EventEmitter):
)
async def on_new_broadcast(
self, name: str | None, advertisement: bumble.device.Advertisement
self, name: str, advertisement: bumble.device.Advertisement
) -> None:
periodic_advertising_sync = await self.device.create_periodic_advertising_sync(
advertiser_address=advertisement.address,
@@ -311,7 +299,10 @@ class BroadcastScanner(pyee.EventEmitter):
sync_timeout=self.sync_timeout,
filter_duplicates=self.filter_duplicates,
)
broadcast = self.Broadcast(name, periodic_advertising_sync)
broadcast = self.Broadcast(
name,
periodic_advertising_sync,
)
broadcast.update(advertisement)
self.broadcasts[advertisement.address] = broadcast
periodic_advertising_sync.on('loss', lambda: self.on_broadcast_loss(broadcast))
+2 -1
View File
@@ -199,7 +199,7 @@ def log_stats(title, stats, precision=2):
stats_min = min(stats)
stats_max = max(stats)
stats_avg = statistics.mean(stats)
stats_stdev = statistics.stdev(stats)
stats_stdev = statistics.stdev(stats) if len(stats) >= 2 else 0
logging.info(
color(
(
@@ -468,6 +468,7 @@ class Ping:
for run in range(self.repeat + 1):
self.done.clear()
self.ping_times = []
if run > 0 and self.repeat and self.repeat_delay:
logging.info(color(f'*** Repeat delay: {self.repeat_delay}', 'green'))
+1 -1
View File
@@ -83,7 +83,7 @@ async def async_main():
return_parameters=bytes([hci.HCI_SUCCESS]),
)
# Return a packet with 'respond to sender' set to True
return (bytes(response), True)
return (response.to_bytes(), True)
return None
+6 -24
View File
@@ -486,12 +486,7 @@ class Speaker:
def on_pdu(pdu: HCI_IsoDataPacket, ase: ascs.AseStateMachine):
codec_config = ase.codec_specific_configuration
if (
not isinstance(codec_config, bap.CodecSpecificConfiguration)
or codec_config.frame_duration is None
or codec_config.audio_channel_allocation is None
):
return
assert isinstance(codec_config, bap.CodecSpecificConfiguration)
pcm = decode(
codec_config.frame_duration.us,
codec_config.audio_channel_allocation.channel_count,
@@ -500,17 +495,11 @@ class Speaker:
self.device.abort_on('disconnection', self.ui_server.send_audio(pcm))
def on_ase_state_change(ase: ascs.AseStateMachine) -> None:
codec_config = ase.codec_specific_configuration
if ase.state == ascs.AseStateMachine.State.STREAMING:
codec_config = ase.codec_specific_configuration
assert isinstance(codec_config, bap.CodecSpecificConfiguration)
assert ase.cis_link
if ase.role == ascs.AudioRole.SOURCE:
if (
not isinstance(codec_config, bap.CodecSpecificConfiguration)
or ase.cis_link is None
or codec_config.octets_per_codec_frame is None
or codec_config.frame_duration is None
or codec_config.codec_frames_per_sdu is None
):
return
ase.cis_link.abort_on(
'disconnection',
lc3_source_task(
@@ -525,17 +514,10 @@ class Speaker:
),
)
else:
if not ase.cis_link:
return
ase.cis_link.sink = functools.partial(on_pdu, ase=ase)
elif ase.state == ascs.AseStateMachine.State.CODEC_CONFIGURED:
if (
not isinstance(codec_config, bap.CodecSpecificConfiguration)
or codec_config.sampling_frequency is None
or codec_config.frame_duration is None
or codec_config.audio_channel_allocation is None
):
return
codec_config = ase.codec_specific_configuration
assert isinstance(codec_config, bap.CodecSpecificConfiguration)
if ase.role == ascs.AudioRole.SOURCE:
setup_encoders(
codec_config.sampling_frequency.hz,
+6 -6
View File
@@ -144,18 +144,18 @@ class Printer:
help='Format of the input file',
)
@click.option(
'--vendors',
'--vendor',
type=click.Choice(['android', 'zephyr']),
multiple=True,
help='Support vendor-specific commands (list one or more)',
)
@click.argument('filename')
# pylint: disable=redefined-builtin
def main(format, vendors, filename):
for vendor in vendors:
if vendor == 'android':
def main(format, vendor, filename):
for vendor_name in vendor:
if vendor_name == 'android':
import bumble.vendor.android.hci
elif vendor == 'zephyr':
elif vendor_name == 'zephyr':
import bumble.vendor.zephyr.hci
input = open(filename, 'rb')
@@ -180,7 +180,7 @@ def main(format, vendors, filename):
else:
printer.print(color("[TRUNCATED]", "red"))
except Exception as error:
logger.exception()
logger.exception('')
print(color(f'!!! {error}', 'red'))
+4 -1
View File
@@ -291,6 +291,9 @@ class ATT_PDU:
def init_from_bytes(self, pdu, offset):
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def to_bytes(self):
return self.pdu
@property
def is_command(self):
return ((self.op_code >> 6) & 1) == 1
@@ -300,7 +303,7 @@ class ATT_PDU:
return ((self.op_code >> 7) & 1) == 1
def __bytes__(self):
return self.pdu
return self.to_bytes()
def __str__(self):
result = color(self.name, 'yellow')
+2 -58
View File
@@ -314,7 +314,7 @@ class Controller:
f'{color("CONTROLLER -> HOST", "green")}: {packet}'
)
if self.host:
self.host.on_packet(bytes(packet))
self.host.on_packet(packet.to_bytes())
# This method allows the controller to emulate the same API as a transport source
async def wait_for_termination(self):
@@ -1192,7 +1192,7 @@ class Controller:
See Bluetooth spec Vol 4, Part E - 7.4.6 Read BD_ADDR Command
'''
bd_addr = (
bytes(self._public_address)
self._public_address.to_bytes()
if self._public_address is not None
else bytes(6)
)
@@ -1543,41 +1543,6 @@ class Controller:
}
return bytes([HCI_SUCCESS])
def on_hci_le_set_advertising_set_random_address_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.52 LE Set Advertising Set Random Address
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_extended_advertising_parameters_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.53 LE Set Extended Advertising Parameters
Command
'''
return bytes([HCI_SUCCESS, 0])
def on_hci_le_set_extended_advertising_data_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.54 LE Set Extended Advertising Data
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_extended_scan_response_data_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.55 LE Set Extended Scan Response Data
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_extended_advertising_enable_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.56 LE Set Extended Advertising Enable
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_read_maximum_advertising_data_length_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.57 LE Read Maximum Advertising Data
@@ -1592,27 +1557,6 @@ class Controller:
'''
return struct.pack('<BB', HCI_SUCCESS, 0xF0)
def on_hci_le_set_periodic_advertising_parameters_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.61 LE Set Periodic Advertising Parameters
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_periodic_advertising_data_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.62 LE Set Periodic Advertising Data
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_periodic_advertising_enable_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.63 LE Set Periodic Advertising Enable
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_read_transmit_power_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.74 LE Read Transmit Power Command
+494 -454
View File
File diff suppressed because it is too large Load Diff
+2
View File
@@ -20,6 +20,8 @@ Common types for drivers.
# -----------------------------------------------------------------------------
import abc
from bumble import core
# -----------------------------------------------------------------------------
# Classes
+594 -25
View File
@@ -11,18 +11,33 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Support for Intel USB controllers.
Loosely based on the Fuchsia OS implementation.
"""
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import collections
import dataclasses
import logging
import os
import pathlib
import platform
import struct
from typing import Any, Deque, Optional, TYPE_CHECKING
from bumble import core
from bumble.drivers import common
from bumble.hci import (
hci_vendor_command_op_code, # type: ignore
HCI_Command,
HCI_Reset_Command,
)
from bumble import hci
from bumble import utils
if TYPE_CHECKING:
from bumble.host import Host
# -----------------------------------------------------------------------------
# Logging
@@ -34,39 +49,328 @@ logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
INTEL_USB_PRODUCTS = {
# Intel AX210
(0x8087, 0x0032),
# Intel BE200
(0x8087, 0x0036),
(0x8087, 0x0032), # AX210
(0x8087, 0x0036), # BE200
}
INTEL_FW_IMAGE_NAMES = [
"ibt-0040-0041",
"ibt-0040-1020",
"ibt-0040-1050",
"ibt-0040-2120",
"ibt-0040-4150",
"ibt-0041-0041",
"ibt-0180-0041",
"ibt-0180-1050",
"ibt-0180-4150",
"ibt-0291-0291",
"ibt-1040-0041",
"ibt-1040-1020",
"ibt-1040-1050",
"ibt-1040-2120",
"ibt-1040-4150",
]
INTEL_FIRMWARE_DIR_ENV = "BUMBLE_INTEL_FIRMWARE_DIR"
INTEL_LINUX_FIRMWARE_DIR = "/lib/firmware/intel"
_MAX_FRAGMENT_SIZE = 252
_POST_RESET_DELAY = 0.2
# -----------------------------------------------------------------------------
# HCI Commands
# -----------------------------------------------------------------------------
HCI_INTEL_DDC_CONFIG_WRITE_COMMAND = hci_vendor_command_op_code(0xFC8B) # type: ignore
HCI_INTEL_DDC_CONFIG_WRITE_PAYLOAD = [0x03, 0xE4, 0x02, 0x00]
HCI_INTEL_WRITE_DEVICE_CONFIG_COMMAND = hci.hci_vendor_command_op_code(0x008B)
HCI_INTEL_READ_VERSION_COMMAND = hci.hci_vendor_command_op_code(0x0005)
HCI_INTEL_RESET_COMMAND = hci.hci_vendor_command_op_code(0x0001)
HCI_INTEL_SECURE_SEND_COMMAND = hci.hci_vendor_command_op_code(0x0009)
HCI_INTEL_WRITE_BOOT_PARAMS_COMMAND = hci.hci_vendor_command_op_code(0x000E)
HCI_Command.register_commands(globals())
hci.HCI_Command.register_commands(globals())
@HCI_Command.command( # type: ignore
fields=[("params", "*")],
@hci.HCI_Command.command(
fields=[
("param0", 1),
],
return_parameters_fields=[
("params", "*"),
("status", hci.STATUS_SPEC),
("tlv", "*"),
],
)
class Hci_Intel_DDC_Config_Write_Command(HCI_Command):
class HCI_Intel_Read_Version_Command(hci.HCI_Command):
pass
@hci.HCI_Command.command(
fields=[("data_type", 1), ("data", "*")],
return_parameters_fields=[
("status", 1),
],
)
class Hci_Intel_Secure_Send_Command(hci.HCI_Command):
pass
@hci.HCI_Command.command(
fields=[
("reset_type", 1),
("patch_enable", 1),
("ddc_reload", 1),
("boot_option", 1),
("boot_address", 4),
],
return_parameters_fields=[
("data", "*"),
],
)
class HCI_Intel_Reset_Command(hci.HCI_Command):
pass
@hci.HCI_Command.command(
fields=[("data", "*")],
return_parameters_fields=[
("status", hci.STATUS_SPEC),
("params", "*"),
],
)
class Hci_Intel_Write_Device_Config_Command(hci.HCI_Command):
pass
# -----------------------------------------------------------------------------
# Functions
# -----------------------------------------------------------------------------
def intel_firmware_dir() -> pathlib.Path:
"""
Returns:
A path to a subdir of the project data dir for Intel firmware.
The directory is created if it doesn't exist.
"""
from bumble.drivers import project_data_dir
p = project_data_dir() / "firmware" / "intel"
p.mkdir(parents=True, exist_ok=True)
return p
def _find_binary_path(file_name: str) -> pathlib.Path | None:
# First check if an environment variable is set
if INTEL_FIRMWARE_DIR_ENV in os.environ:
if (
path := pathlib.Path(os.environ[INTEL_FIRMWARE_DIR_ENV]) / file_name
).is_file():
logger.debug(f"{file_name} found in env dir")
return path
# When the environment variable is set, don't look elsewhere
return None
# Then, look where the firmware download tool writes by default
if (path := intel_firmware_dir() / file_name).is_file():
logger.debug(f"{file_name} found in project data dir")
return path
# Then, look in the package's driver directory
if (path := pathlib.Path(__file__).parent / "intel_fw" / file_name).is_file():
logger.debug(f"{file_name} found in package dir")
return path
# On Linux, check the system's FW directory
if (
platform.system() == "Linux"
and (path := pathlib.Path(INTEL_LINUX_FIRMWARE_DIR) / file_name).is_file()
):
logger.debug(f"{file_name} found in Linux system FW dir")
return path
# Finally look in the current directory
if (path := pathlib.Path.cwd() / file_name).is_file():
logger.debug(f"{file_name} found in CWD")
return path
return None
def _parse_tlv(data: bytes) -> list[tuple[ValueType, Any]]:
result: list[tuple[ValueType, Any]] = []
while len(data) >= 2:
value_type = ValueType(data[0])
value_length = data[1]
value = data[2 : 2 + value_length]
typed_value: Any
if value_type == ValueType.END:
break
if value_type in (ValueType.CNVI, ValueType.CNVR):
(v,) = struct.unpack("<I", value)
typed_value = (
(((v >> 0) & 0xF) << 12)
| (((v >> 4) & 0xF) << 0)
| (((v >> 8) & 0xF) << 4)
| (((v >> 24) & 0xF) << 8)
)
elif value_type == ValueType.HARDWARE_INFO:
(v,) = struct.unpack("<I", value)
typed_value = HardwareInfo(
HardwarePlatform((v >> 8) & 0xFF), HardwareVariant((v >> 16) & 0x3F)
)
elif value_type in (
ValueType.USB_VENDOR_ID,
ValueType.USB_PRODUCT_ID,
ValueType.DEVICE_REVISION,
):
(typed_value,) = struct.unpack("<H", value)
elif value_type == ValueType.CURRENT_MODE_OF_OPERATION:
typed_value = ModeOfOperation(value[0])
elif value_type in (
ValueType.BUILD_TYPE,
ValueType.BUILD_NUMBER,
ValueType.SECURE_BOOT,
ValueType.OTP_LOCK,
ValueType.API_LOCK,
ValueType.DEBUG_LOCK,
ValueType.SECURE_BOOT_ENGINE_TYPE,
):
typed_value = value[0]
elif value_type == ValueType.TIMESTAMP:
typed_value = Timestamp(value[0], value[1])
elif value_type == ValueType.FIRMWARE_BUILD:
typed_value = FirmwareBuild(value[0], Timestamp(value[1], value[2]))
elif value_type == ValueType.BLUETOOTH_ADDRESS:
typed_value = hci.Address(
value, address_type=hci.Address.PUBLIC_DEVICE_ADDRESS
)
else:
typed_value = value
result.append((value_type, typed_value))
data = data[2 + value_length :]
return result
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
class DriverError(core.BaseBumbleError):
def __init__(self, message: str) -> None:
super().__init__(message)
self.message = message
def __str__(self) -> str:
return f"IntelDriverError({self.message})"
class ValueType(utils.OpenIntEnum):
END = 0x00
CNVI = 0x10
CNVR = 0x11
HARDWARE_INFO = 0x12
DEVICE_REVISION = 0x16
CURRENT_MODE_OF_OPERATION = 0x1C
USB_VENDOR_ID = 0x17
USB_PRODUCT_ID = 0x18
TIMESTAMP = 0x1D
BUILD_TYPE = 0x1E
BUILD_NUMBER = 0x1F
SECURE_BOOT = 0x28
OTP_LOCK = 0x2A
API_LOCK = 0x2B
DEBUG_LOCK = 0x2C
FIRMWARE_BUILD = 0x2D
SECURE_BOOT_ENGINE_TYPE = 0x2F
BLUETOOTH_ADDRESS = 0x30
class HardwarePlatform(utils.OpenIntEnum):
INTEL_37 = 0x37
class HardwareVariant(utils.OpenIntEnum):
# This is a just a partial list.
# Add other constants here as new hardware is encountered and tested.
TYPHOON_PEAK = 0x17
GALE_PEAK = 0x1C
@dataclasses.dataclass
class HardwareInfo:
platform: HardwarePlatform
variant: HardwareVariant
@dataclasses.dataclass
class Timestamp:
week: int
year: int
@dataclasses.dataclass
class FirmwareBuild:
build_number: int
timestamp: Timestamp
class ModeOfOperation(utils.OpenIntEnum):
BOOTLOADER = 0x01
INTERMEDIATE = 0x02
OPERATIONAL = 0x03
class SecureBootEngineType(utils.OpenIntEnum):
RSA = 0x00
ECDSA = 0x01
@dataclasses.dataclass
class BootParams:
css_header_offset: int
css_header_size: int
pki_offset: int
pki_size: int
sig_offset: int
sig_size: int
write_offset: int
_BOOT_PARAMS = {
SecureBootEngineType.RSA: BootParams(0, 128, 128, 256, 388, 256, 964),
SecureBootEngineType.ECDSA: BootParams(644, 128, 772, 96, 868, 96, 964),
}
class Driver(common.Driver):
def __init__(self, host):
def __init__(self, host: Host) -> None:
self.host = host
self.max_in_flight_firmware_load_commands = 1
self.pending_firmware_load_commands: Deque[hci.HCI_Command] = (
collections.deque()
)
self.can_send_firmware_load_command = asyncio.Event()
self.can_send_firmware_load_command.set()
self.firmware_load_complete = asyncio.Event()
self.reset_complete = asyncio.Event()
# Parse configuration options from the driver name.
self.ddc_addon: Optional[bytes] = None
self.ddc_override: Optional[bytes] = None
driver = host.hci_metadata.get("driver")
if driver is not None and driver.startswith("intel/"):
for key, value in [
key_eq_value.split(":") for key_eq_value in driver[6:].split("+")
]:
if key == "ddc_addon":
self.ddc_addon = bytes.fromhex(value)
elif key == "ddc_override":
self.ddc_override = bytes.fromhex(value)
@staticmethod
def check(host):
def check(host: Host) -> bool:
driver = host.hci_metadata.get("driver")
if driver == "intel":
if driver == "intel" or driver is not None and driver.startswith("intel/"):
return True
vendor_id = host.hci_metadata.get("vendor_id")
@@ -85,18 +389,283 @@ class Driver(common.Driver):
return True
@classmethod
async def for_host(cls, host, force=False): # type: ignore
async def for_host(cls, host: Host, force: bool = False):
# Only instantiate this driver if explicitly selected
if not force and not cls.check(host):
return None
return cls(host)
async def init_controller(self):
def on_packet(self, packet: bytes) -> None:
"""Handler for event packets that are received from an ACL channel"""
event = hci.HCI_Event.from_bytes(packet)
if not isinstance(event, hci.HCI_Command_Complete_Event):
self.host.on_hci_event_packet(event)
return
if not event.return_parameters == hci.HCI_SUCCESS:
raise DriverError("HCI_Command_Complete_Event error")
if self.max_in_flight_firmware_load_commands != event.num_hci_command_packets:
logger.debug(
"max_in_flight_firmware_load_commands update: "
f"{event.num_hci_command_packets}"
)
self.max_in_flight_firmware_load_commands = event.num_hci_command_packets
logger.debug(f"event: {event}")
self.pending_firmware_load_commands.popleft()
in_flight = len(self.pending_firmware_load_commands)
logger.debug(f"event received, {in_flight} still in flight")
if in_flight < self.max_in_flight_firmware_load_commands:
self.can_send_firmware_load_command.set()
async def send_firmware_load_command(self, command: hci.HCI_Command) -> None:
# Wait until we can send.
await self.can_send_firmware_load_command.wait()
# Send the command and adjust counters.
self.host.send_hci_packet(command)
self.pending_firmware_load_commands.append(command)
in_flight = len(self.pending_firmware_load_commands)
if in_flight >= self.max_in_flight_firmware_load_commands:
logger.debug(f"max commands in flight reached [{in_flight}]")
self.can_send_firmware_load_command.clear()
async def send_firmware_data(self, data_type: int, data: bytes) -> None:
while data:
fragment_size = min(len(data), _MAX_FRAGMENT_SIZE)
fragment = data[:fragment_size]
data = data[fragment_size:]
await self.send_firmware_load_command(
Hci_Intel_Secure_Send_Command(data_type=data_type, data=fragment)
)
async def load_firmware(self) -> None:
self.host.ready = True
await self.host.send_command(HCI_Reset_Command(), check_result=True)
await self.host.send_command(
Hci_Intel_DDC_Config_Write_Command(
params=HCI_INTEL_DDC_CONFIG_WRITE_PAYLOAD
device_info = await self.read_device_info()
logger.debug(
"device info: \n%s",
"\n".join(
[
f" {value_type.name}: {value}"
for value_type, value in device_info.items()
]
),
)
# Check if the firmware is already loaded.
if (
device_info.get(ValueType.CURRENT_MODE_OF_OPERATION)
== ModeOfOperation.OPERATIONAL
):
logger.debug("firmware already loaded")
return
# We only support some platforms and variants.
hardware_info = device_info.get(ValueType.HARDWARE_INFO)
if hardware_info is None:
raise DriverError("hardware info missing")
if hardware_info.platform != HardwarePlatform.INTEL_37:
raise DriverError("hardware platform not supported")
if hardware_info.variant not in (
HardwareVariant.TYPHOON_PEAK,
HardwareVariant.GALE_PEAK,
):
raise DriverError("hardware variant not supported")
# Compute the firmware name.
if ValueType.CNVI not in device_info or ValueType.CNVR not in device_info:
raise DriverError("insufficient device info, missing CNVI or CNVR")
firmware_base_name = (
"ibt-"
f"{device_info[ValueType.CNVI]:04X}-"
f"{device_info[ValueType.CNVR]:04X}"
)
logger.debug(f"FW base name: {firmware_base_name}")
firmware_name = f"{firmware_base_name}.sfi"
firmware_path = _find_binary_path(firmware_name)
if not firmware_path:
logger.warning(f"Firmware file {firmware_name} not found")
logger.warning("See https://google.github.io/bumble/drivers/intel.html")
return None
logger.debug(f"loading firmware from {firmware_path}")
firmware_image = firmware_path.read_bytes()
engine_type = device_info.get(ValueType.SECURE_BOOT_ENGINE_TYPE)
if engine_type is None:
raise DriverError("secure boot engine type missing")
if engine_type not in _BOOT_PARAMS:
raise DriverError("secure boot engine type not supported")
boot_params = _BOOT_PARAMS[engine_type]
if len(firmware_image) < boot_params.write_offset:
raise DriverError("firmware image too small")
# Register to receive vendor events.
def on_vendor_event(event: hci.HCI_Vendor_Event):
logger.debug(f"vendor event: {event}")
event_type = event.parameters[0]
if event_type == 0x02:
# Boot event
logger.debug("boot complete")
self.reset_complete.set()
elif event_type == 0x06:
# Firmware load event
logger.debug("download complete")
self.firmware_load_complete.set()
else:
logger.debug(f"ignoring vendor event type {event_type}")
self.host.on("vendor_event", on_vendor_event)
# We need to temporarily intercept packets from the controller,
# because they are formatted as HCI event packets but are received
# on the ACL channel, so the host parser would get confused.
saved_on_packet = self.host.on_packet
self.host.on_packet = self.on_packet # type: ignore
self.firmware_load_complete.clear()
# Send the CSS header
data = firmware_image[
boot_params.css_header_offset : boot_params.css_header_offset
+ boot_params.css_header_size
]
await self.send_firmware_data(0x00, data)
# Send the PKI header
data = firmware_image[
boot_params.pki_offset : boot_params.pki_offset + boot_params.pki_size
]
await self.send_firmware_data(0x03, data)
# Send the Signature header
data = firmware_image[
boot_params.sig_offset : boot_params.sig_offset + boot_params.sig_size
]
await self.send_firmware_data(0x02, data)
# Send the rest of the image.
# The payload consists of command objects, which are sent when they add up
# to a multiple of 4 bytes.
boot_address = 0
offset = boot_params.write_offset
fragment_size = 0
while offset + 3 < len(firmware_image):
(command_opcode,) = struct.unpack_from(
"<H", firmware_image, offset + fragment_size
)
command_size = firmware_image[offset + fragment_size + 2]
if command_opcode == HCI_INTEL_WRITE_BOOT_PARAMS_COMMAND:
(boot_address,) = struct.unpack_from(
"<I", firmware_image, offset + fragment_size + 3
)
logger.debug(
"found HCI_INTEL_WRITE_BOOT_PARAMS_COMMAND, "
f"boot_address={boot_address}"
)
fragment_size += 3 + command_size
if fragment_size % 4 == 0:
await self.send_firmware_data(
0x01, firmware_image[offset : offset + fragment_size]
)
logger.debug(f"sent {fragment_size} bytes")
offset += fragment_size
fragment_size = 0
# Wait for the firmware loading to be complete.
logger.debug("waiting for firmware to be loaded")
await self.firmware_load_complete.wait()
logger.debug("firmware loaded")
# Restore the original packet handler.
self.host.on_packet = saved_on_packet # type: ignore
# Reset
self.reset_complete.clear()
self.host.send_hci_packet(
HCI_Intel_Reset_Command(
reset_type=0x00,
patch_enable=0x01,
ddc_reload=0x00,
boot_option=0x01,
boot_address=boot_address,
)
)
logger.debug("waiting for reset completion")
await self.reset_complete.wait()
logger.debug("reset complete")
# Load the device config if there is one.
if self.ddc_override:
logger.debug("loading overridden DDC")
await self.load_device_config(self.ddc_override)
else:
ddc_name = f"{firmware_base_name}.ddc"
ddc_path = _find_binary_path(ddc_name)
if ddc_path:
logger.debug(f"loading DDC from {ddc_path}")
ddc_data = ddc_path.read_bytes()
await self.load_device_config(ddc_data)
if self.ddc_addon:
logger.debug("loading DDC addon")
await self.load_device_config(self.ddc_addon)
async def load_device_config(self, ddc_data: bytes) -> None:
while ddc_data:
ddc_len = 1 + ddc_data[0]
ddc_payload = ddc_data[:ddc_len]
await self.host.send_command(
Hci_Intel_Write_Device_Config_Command(data=ddc_payload)
)
ddc_data = ddc_data[ddc_len:]
async def reboot_bootloader(self) -> None:
self.host.send_hci_packet(
HCI_Intel_Reset_Command(
reset_type=0x01,
patch_enable=0x01,
ddc_reload=0x01,
boot_option=0x00,
boot_address=0,
)
)
await asyncio.sleep(_POST_RESET_DELAY)
async def read_device_info(self) -> dict[ValueType, Any]:
self.host.ready = True
response = await self.host.send_command(hci.HCI_Reset_Command())
if not (
isinstance(response, hci.HCI_Command_Complete_Event)
and response.return_parameters
in (hci.HCI_UNKNOWN_HCI_COMMAND_ERROR, hci.HCI_SUCCESS)
):
# When the controller is in operational mode, the response is a
# successful response.
# When the controller is in bootloader mode,
# HCI_UNKNOWN_HCI_COMMAND_ERROR is the expected response. Anything
# else is a failure.
logger.warning(f"unexpected response: {response}")
raise DriverError("unexpected HCI response")
# Read the firmware version.
response = await self.host.send_command(
HCI_Intel_Read_Version_Command(param0=0xFF)
)
if not isinstance(response, hci.HCI_Command_Complete_Event):
raise DriverError("unexpected HCI response")
if response.return_parameters.status != 0: # type: ignore
raise DriverError("HCI_Intel_Read_Version_Command error")
tlvs = _parse_tlv(response.return_parameters.tlv) # type: ignore
# Convert the list to a dict. That's Ok here because we only expect each type
# to appear just once.
return dict(tlvs)
async def init_controller(self):
await self.load_firmware()
+1 -1
View File
@@ -410,7 +410,7 @@ class IncludedServiceDeclaration(Attribute):
def __init__(self, service: Service) -> None:
declaration_bytes = struct.pack(
'<HH2s', service.handle, service.end_group_handle, bytes(service.uuid)
'<HH2s', service.handle, service.end_group_handle, service.uuid.to_bytes()
)
super().__init__(
GATT_INCLUDE_ATTRIBUTE_TYPE, Attribute.READABLE, declaration_bytes
+3 -3
View File
@@ -292,7 +292,7 @@ class Client:
logger.debug(
f'GATT Command from client: [0x{self.connection.handle:04X}] {command}'
)
self.send_gatt_pdu(bytes(command))
self.send_gatt_pdu(command.to_bytes())
async def send_request(self, request: ATT_PDU):
logger.debug(
@@ -310,7 +310,7 @@ class Client:
self.pending_request = request
try:
self.send_gatt_pdu(bytes(request))
self.send_gatt_pdu(request.to_bytes())
response = await asyncio.wait_for(
self.pending_response, GATT_REQUEST_TIMEOUT
)
@@ -328,7 +328,7 @@ class Client:
f'GATT Confirmation from client: [0x{self.connection.handle:04X}] '
f'{confirmation}'
)
self.send_gatt_pdu(bytes(confirmation))
self.send_gatt_pdu(confirmation.to_bytes())
async def request_mtu(self, mtu: int) -> int:
# Check the range
+2 -2
View File
@@ -353,7 +353,7 @@ class Server(EventEmitter):
logger.debug(
f'GATT Response from server: [0x{connection.handle:04X}] {response}'
)
self.send_gatt_pdu(connection.handle, bytes(response))
self.send_gatt_pdu(connection.handle, response.to_bytes())
async def notify_subscriber(
self,
@@ -450,7 +450,7 @@ class Server(EventEmitter):
)
try:
self.send_gatt_pdu(connection.handle, bytes(indication))
self.send_gatt_pdu(connection.handle, indication.to_bytes())
await asyncio.wait_for(pending_confirmation, GATT_REQUEST_TIMEOUT)
except asyncio.TimeoutError as error:
logger.warning(color('!!! GATT Indicate timeout', 'red'))
+79 -116
View File
@@ -915,8 +915,6 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_READ_CURRENT_IAC_LAP_COMMAND : 1 << (11*8+3),
HCI_WRITE_CURRENT_IAC_LAP_COMMAND : 1 << (11*8+4),
HCI_SET_AFH_HOST_CHANNEL_CLASSIFICATION_COMMAND : 1 << (12*8+1),
HCI_LE_CS_READ_REMOTE_FAE_TABLE_COMMAND : 1 << (12*8+2),
HCI_LE_CS_WRITE_CACHED_REMOTE_FAE_TABLE_COMMAND : 1 << (12*8+3),
HCI_READ_INQUIRY_SCAN_TYPE_COMMAND : 1 << (12*8+4),
HCI_WRITE_INQUIRY_SCAN_TYPE_COMMAND : 1 << (12*8+5),
HCI_READ_INQUIRY_MODE_COMMAND : 1 << (12*8+6),
@@ -942,8 +940,6 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_SETUP_SYNCHRONOUS_CONNECTION_COMMAND : 1 << (16*8+3),
HCI_ACCEPT_SYNCHRONOUS_CONNECTION_REQUEST_COMMAND : 1 << (16*8+4),
HCI_REJECT_SYNCHRONOUS_CONNECTION_REQUEST_COMMAND : 1 << (16*8+5),
HCI_LE_CS_CREATE_CONFIG_COMMAND : 1 << (16*8+6),
HCI_LE_CS_REMOVE_CONFIG_COMMAND : 1 << (16*8+7),
HCI_READ_EXTENDED_INQUIRY_RESPONSE_COMMAND : 1 << (17*8+0),
HCI_WRITE_EXTENDED_INQUIRY_RESPONSE_COMMAND : 1 << (17*8+1),
HCI_REFRESH_ENCRYPTION_KEY_COMMAND : 1 << (17*8+2),
@@ -967,20 +963,13 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_SEND_KEYPRESS_NOTIFICATION_COMMAND : 1 << (20*8+2),
HCI_IO_CAPABILITY_REQUEST_NEGATIVE_REPLY_COMMAND : 1 << (20*8+3),
HCI_READ_ENCRYPTION_KEY_SIZE_COMMAND : 1 << (20*8+4),
HCI_LE_CS_READ_LOCAL_SUPPORTED_CAPABILITIES_COMMAND : 1 << (20*8+5),
HCI_LE_CS_READ_REMOTE_SUPPORTED_CAPABILITIES_COMMAND : 1 << (20*8+6),
HCI_LE_CS_WRITE_CACHED_REMOTE_SUPPORTED_CAPABILITIES : 1 << (20*8+7),
HCI_SET_EVENT_MASK_PAGE_2_COMMAND : 1 << (22*8+2),
HCI_READ_FLOW_CONTROL_MODE_COMMAND : 1 << (23*8+0),
HCI_WRITE_FLOW_CONTROL_MODE_COMMAND : 1 << (23*8+1),
HCI_READ_DATA_BLOCK_SIZE_COMMAND : 1 << (23*8+2),
HCI_LE_CS_TEST_COMMAND : 1 << (23*8+3),
HCI_LE_CS_TEST_END_COMMAND : 1 << (23*8+4),
HCI_READ_ENHANCED_TRANSMIT_POWER_LEVEL_COMMAND : 1 << (24*8+0),
HCI_LE_CS_SECURITY_ENABLE_COMMAND : 1 << (24*8+1),
HCI_READ_LE_HOST_SUPPORT_COMMAND : 1 << (24*8+5),
HCI_WRITE_LE_HOST_SUPPORT_COMMAND : 1 << (24*8+6),
HCI_LE_CS_SET_DEFAULT_SETTINGS_COMMAND : 1 << (24*8+7),
HCI_LE_SET_EVENT_MASK_COMMAND : 1 << (25*8+0),
HCI_LE_READ_BUFFER_SIZE_COMMAND : 1 << (25*8+1),
HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND : 1 << (25*8+2),
@@ -1011,10 +1000,6 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_LE_RECEIVER_TEST_COMMAND : 1 << (28*8+4),
HCI_LE_TRANSMITTER_TEST_COMMAND : 1 << (28*8+5),
HCI_LE_TEST_END_COMMAND : 1 << (28*8+6),
HCI_LE_ENABLE_MONITORING_ADVERTISERS_COMMAND : 1 << (28*8+7),
HCI_LE_CS_SET_CHANNEL_CLASSIFICATION_COMMAND : 1 << (29*8+0),
HCI_LE_CS_SET_PROCEDURE_PARAMETERS_COMMAND : 1 << (29*8+1),
HCI_LE_CS_PROCEDURE_ENABLE_COMMAND : 1 << (29*8+2),
HCI_ENHANCED_SETUP_SYNCHRONOUS_CONNECTION_COMMAND : 1 << (29*8+3),
HCI_ENHANCED_ACCEPT_SYNCHRONOUS_CONNECTION_REQUEST_COMMAND : 1 << (29*8+4),
HCI_READ_LOCAL_SUPPORTED_CODECS_COMMAND : 1 << (29*8+5),
@@ -1151,21 +1136,11 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_LE_SET_DEFAULT_SUBRATE_COMMAND : 1 << (46*8+0),
HCI_LE_SUBRATE_REQUEST_COMMAND : 1 << (46*8+1),
HCI_LE_SET_EXTENDED_ADVERTISING_PARAMETERS_V2_COMMAND : 1 << (46*8+2),
HCI_LE_SET_DECISION_DATA_COMMAND : 1 << (46*8+3),
HCI_LE_SET_DECISION_INSTRUCTIONS_COMMAND : 1 << (46*8+4),
HCI_LE_SET_PERIODIC_ADVERTISING_SUBEVENT_DATA_COMMAND : 1 << (46*8+5),
HCI_LE_SET_PERIODIC_ADVERTISING_RESPONSE_DATA_COMMAND : 1 << (46*8+6),
HCI_LE_SET_PERIODIC_SYNC_SUBEVENT_COMMAND : 1 << (46*8+7),
HCI_LE_EXTENDED_CREATE_CONNECTION_V2_COMMAND : 1 << (47*8+0),
HCI_LE_SET_PERIODIC_ADVERTISING_PARAMETERS_V2_COMMAND : 1 << (47*8+1),
HCI_LE_READ_ALL_LOCAL_SUPPORTED_FEATURES_COMMAND : 1 << (47*8+2),
HCI_LE_READ_ALL_REMOTE_FEATURES_COMMAND : 1 << (47*8+3),
HCI_LE_SET_HOST_FEATURE_V2_COMMAND : 1 << (47*8+4),
HCI_LE_ADD_DEVICE_TO_MONITORED_ADVERTISERS_LIST_COMMAND : 1 << (47*8+5),
HCI_LE_REMOVE_DEVICE_FROM_MONITORED_ADVERTISERS_LIST_COMMAND : 1 << (47*8+6),
HCI_LE_CLEAR_MONITORED_ADVERTISERS_LIST_COMMAND : 1 << (47*8+7),
HCI_LE_READ_MONITORED_ADVERTISERS_LIST_SIZE_COMMAND : 1 << (48*8+0),
HCI_LE_FRAME_SPACE_UPDATE_COMMAND : 1 << (48*8+1),
}
# LE Supported Features
@@ -1482,7 +1457,7 @@ class CodingFormat:
vendor_specific_codec_id: int = 0
@classmethod
def parse_from_bytes(cls, data: bytes, offset: int) -> tuple[int, CodingFormat]:
def parse_from_bytes(cls, data: bytes, offset: int):
(codec_id, company_id, vendor_specific_codec_id) = struct.unpack_from(
'<BHH', data, offset
)
@@ -1492,15 +1467,14 @@ class CodingFormat:
vendor_specific_codec_id=vendor_specific_codec_id,
)
@classmethod
def from_bytes(cls, data: bytes) -> CodingFormat:
return cls.parse_from_bytes(data, 0)[1]
def __bytes__(self) -> bytes:
def to_bytes(self) -> bytes:
return struct.pack(
'<BHH', self.codec_id, self.company_id, self.vendor_specific_codec_id
)
def __bytes__(self) -> bytes:
return self.to_bytes()
# -----------------------------------------------------------------------------
class HCI_Constant:
@@ -1717,7 +1691,7 @@ class HCI_Object:
field_length = len(field_bytes)
field_bytes = bytes([field_length]) + field_bytes
elif isinstance(field_value, (bytes, bytearray)) or hasattr(
field_value, '__bytes__'
field_value, 'to_bytes'
):
field_bytes = bytes(field_value)
if isinstance(field_type, int) and 4 < field_type <= 256:
@@ -1762,7 +1736,7 @@ class HCI_Object:
def from_bytes(cls, data, offset, fields):
return cls(fields, **cls.dict_from_bytes(data, offset, fields))
def __bytes__(self):
def to_bytes(self):
return HCI_Object.dict_to_bytes(self.__dict__, self.fields)
@staticmethod
@@ -1857,6 +1831,9 @@ class HCI_Object:
for field_name, field_value in field_strings
)
def __bytes__(self):
return self.to_bytes()
def __init__(self, fields, **kwargs):
self.fields = fields
self.init_from_fields(self, fields, kwargs)
@@ -2031,6 +2008,9 @@ class Address:
def is_static(self):
return self.is_random and (self.address_bytes[5] >> 6 == 3)
def to_bytes(self):
return self.address_bytes
def to_string(self, with_type_qualifier=True):
'''
String representation of the address, MSB first, with an optional type
@@ -2042,7 +2022,7 @@ class Address:
return result + '/P'
def __bytes__(self):
return self.address_bytes
return self.to_bytes()
def __hash__(self):
return hash(self.address_bytes)
@@ -2248,13 +2228,16 @@ class HCI_Command(HCI_Packet):
self.op_code = op_code
self.parameters = parameters
def __bytes__(self):
def to_bytes(self):
parameters = b'' if self.parameters is None else self.parameters
return (
struct.pack('<BHB', HCI_COMMAND_PACKET, self.op_code, len(parameters))
+ parameters
)
def __bytes__(self):
return self.to_bytes()
def __str__(self):
result = color(self.name, 'green')
if fields := getattr(self, 'fields', None):
@@ -4319,61 +4302,6 @@ class HCI_LE_Clear_Advertising_Sets_Command(HCI_Command):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
[
('advertising_handle', 1),
('periodic_advertising_interval_min', 2),
('periodic_advertising_interval_max', 2),
('periodic_advertising_properties', 2),
]
)
class HCI_LE_Set_Periodic_Advertising_Parameters_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.61 LE Set Periodic Advertising Parameters command
'''
class Properties(enum.IntFlag):
INCLUDE_TX_POWER = 1 << 6
advertising_handle: int
periodic_advertising_interval_min: int
periodic_advertising_interval_max: int
periodic_advertising_properties: int
# -----------------------------------------------------------------------------
@HCI_Command.command(
[
('advertising_handle', 1),
(
'operation',
{
'size': 1,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.Operation(
x
).name,
},
),
(
'advertising_data',
{
'parser': HCI_Object.parse_length_prefixed_bytes,
'serializer': HCI_Object.serialize_length_prefixed_bytes,
},
),
]
)
class HCI_LE_Set_Periodic_Advertising_Data_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.62 LE Set Periodic Advertising Data command
'''
advertising_handle: int
operation: int
advertising_data: bytes
# -----------------------------------------------------------------------------
@HCI_Command.command([('enable', 1), ('advertising_handle', 1)])
class HCI_LE_Set_Periodic_Advertising_Enable_Command(HCI_Command):
@@ -5068,6 +4996,7 @@ class HCI_Event(HCI_Packet):
hci_packet_type = HCI_EVENT_PACKET
event_names: Dict[int, str] = {}
event_classes: Dict[int, Type[HCI_Event]] = {}
vendor_factory: Optional[Callable[[bytes], Optional[HCI_Event]]] = None
@staticmethod
def event(fields=()):
@@ -5125,37 +5054,41 @@ class HCI_Event(HCI_Packet):
return event_class
@staticmethod
def from_bytes(packet: bytes) -> HCI_Event:
@classmethod
def from_bytes(cls, packet: bytes) -> HCI_Event:
event_code = packet[1]
length = packet[2]
parameters = packet[3:]
if len(parameters) != length:
raise InvalidPacketError('invalid packet length')
cls: Any
subclass: Any
if event_code == HCI_LE_META_EVENT:
# We do this dispatch here and not in the subclass in order to avoid call
# loops
subevent_code = parameters[0]
cls = HCI_LE_Meta_Event.subevent_classes.get(subevent_code)
if cls is None:
subclass = HCI_LE_Meta_Event.subevent_classes.get(subevent_code)
if subclass is None:
# No class registered, just use a generic class instance
return HCI_LE_Meta_Event(subevent_code, parameters)
elif event_code == HCI_VENDOR_EVENT:
subevent_code = parameters[0]
cls = HCI_Vendor_Event.subevent_classes.get(subevent_code)
if cls is None:
# No class registered, just use a generic class instance
return HCI_Vendor_Event(subevent_code, parameters)
# Invoke all the registered factories to see if any of them can handle
# the event
if cls.vendor_factory:
if event := cls.vendor_factory(parameters):
return event
# No factory, or the factory could not create an instance,
# return a generic vendor event
return HCI_Event(event_code, parameters)
else:
cls = HCI_Event.event_classes.get(event_code)
if cls is None:
subclass = HCI_Event.event_classes.get(event_code)
if subclass is None:
# No class registered, just use a generic class instance
return HCI_Event(event_code, parameters)
# Invoke the factory to create a new instance
return cls.from_parameters(parameters) # type: ignore
return subclass.from_parameters(parameters) # type: ignore
@classmethod
def from_parameters(cls, parameters):
@@ -5178,10 +5111,13 @@ class HCI_Event(HCI_Packet):
self.event_code = event_code
self.parameters = parameters
def __bytes__(self):
def to_bytes(self):
parameters = b'' if self.parameters is None else self.parameters
return bytes([HCI_EVENT_PACKET, self.event_code, len(parameters)]) + parameters
def __bytes__(self):
return self.to_bytes()
def __str__(self):
result = color(self.name, 'magenta')
if fields := getattr(self, 'fields', None):
@@ -5198,11 +5134,11 @@ HCI_Event.register_events(globals())
# -----------------------------------------------------------------------------
class HCI_Extended_Event(HCI_Event):
'''
HCI_Event subclass for events that has a subevent code.
HCI_Event subclass for events that have a subevent code.
'''
subevent_names: Dict[int, str] = {}
subevent_classes: Dict[int, Type[HCI_Extended_Event]]
subevent_classes: Dict[int, Type[HCI_Extended_Event]] = {}
@classmethod
def event(cls, fields=()):
@@ -5253,7 +5189,22 @@ class HCI_Extended_Event(HCI_Event):
cls.subevent_names.update(cls.subevent_map(symbols))
@classmethod
def from_parameters(cls, parameters):
def subclass_from_parameters(
cls, parameters: bytes
) -> Optional[HCI_Extended_Event]:
"""
Factory method that parses the subevent code, finds a registered subclass,
and creates an instance if found.
"""
subevent_code = parameters[0]
if subclass := cls.subevent_classes.get(subevent_code):
return subclass.from_parameters(parameters)
return None
@classmethod
def from_parameters(cls, parameters: bytes) -> HCI_Extended_Event:
"""Factory method for subclasses (the subevent code has already been parsed)"""
self = cls.__new__(cls)
HCI_Extended_Event.__init__(self, self.subevent_code, parameters)
if fields := getattr(self, 'fields', None):
@@ -5294,12 +5245,6 @@ class HCI_LE_Meta_Event(HCI_Extended_Event):
HCI_LE_Meta_Event.register_subevents(globals())
# -----------------------------------------------------------------------------
class HCI_Vendor_Event(HCI_Extended_Event):
event_code: int = HCI_VENDOR_EVENT
subevent_classes = {}
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event(
[
@@ -6173,8 +6118,9 @@ class HCI_Command_Complete_Event(HCI_Event):
See Bluetooth spec @ 7.7.14 Command Complete Event
'''
return_parameters = b''
num_hci_command_packets: int
command_opcode: int
return_parameters = b''
def map_return_parameters(self, return_parameters):
'''Map simple 'status' return parameters to their named constant form'''
@@ -6710,6 +6656,14 @@ class HCI_Remote_Host_Supported_Features_Notification_Event(HCI_Event):
'''
# -----------------------------------------------------------------------------
@HCI_Event.event([('data', "*")])
class HCI_Vendor_Event(HCI_Event):
'''
See Bluetooth spec @ 5.4.4 HCI Event packet
'''
# -----------------------------------------------------------------------------
class HCI_AclDataPacket(HCI_Packet):
'''
@@ -6732,7 +6686,7 @@ class HCI_AclDataPacket(HCI_Packet):
connection_handle, pb_flag, bc_flag, data_total_length, data
)
def __bytes__(self):
def to_bytes(self):
h = (self.pb_flag << 12) | (self.bc_flag << 14) | self.connection_handle
return (
struct.pack('<BHH', HCI_ACL_DATA_PACKET, h, self.data_total_length)
@@ -6746,6 +6700,9 @@ class HCI_AclDataPacket(HCI_Packet):
self.data_total_length = data_total_length
self.data = data
def __bytes__(self):
return self.to_bytes()
def __str__(self):
return (
f'{color("ACL", "blue")}: '
@@ -6779,7 +6736,7 @@ class HCI_SynchronousDataPacket(HCI_Packet):
connection_handle, packet_status, data_total_length, data
)
def __bytes__(self) -> bytes:
def to_bytes(self) -> bytes:
h = (self.packet_status << 12) | self.connection_handle
return (
struct.pack('<BHB', HCI_SYNCHRONOUS_DATA_PACKET, h, self.data_total_length)
@@ -6798,6 +6755,9 @@ class HCI_SynchronousDataPacket(HCI_Packet):
self.data_total_length = data_total_length
self.data = data
def __bytes__(self) -> bytes:
return self.to_bytes()
def __str__(self) -> str:
return (
f'{color("SCO", "blue")}: '
@@ -6870,6 +6830,9 @@ class HCI_IsoDataPacket(HCI_Packet):
)
def __bytes__(self) -> bytes:
return self.to_bytes()
def to_bytes(self) -> bytes:
fmt = '<BHH'
args = [
HCI_ISO_DATA_PACKET,
+17 -20
View File
@@ -141,7 +141,7 @@ class HfFeature(enum.IntFlag):
"""
HF supported features (AT+BRSF=) (normative).
Hands-Free Profile v1.9, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
"""
EC_NR = 0x001 # Echo Cancel & Noise reduction
@@ -155,14 +155,14 @@ class HfFeature(enum.IntFlag):
HF_INDICATORS = 0x100
ESCO_S4_SETTINGS_SUPPORTED = 0x200
ENHANCED_VOICE_RECOGNITION_STATUS = 0x400
VOICE_RECOGNITION_TEXT = 0x800
VOICE_RECOGNITION_TEST = 0x800
class AgFeature(enum.IntFlag):
"""
AG supported features (+BRSF:) (normative).
Hands-Free Profile v1.9, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
"""
THREE_WAY_CALLING = 0x001
@@ -178,7 +178,7 @@ class AgFeature(enum.IntFlag):
HF_INDICATORS = 0x400
ESCO_S4_SETTINGS_SUPPORTED = 0x800
ENHANCED_VOICE_RECOGNITION_STATUS = 0x1000
VOICE_RECOGNITION_TEXT = 0x2000
VOICE_RECOGNITION_TEST = 0x2000
class AudioCodec(enum.IntEnum):
@@ -1390,7 +1390,6 @@ class AgProtocol(pyee.EventEmitter):
def _on_bac(self, *args) -> None:
self.supported_audio_codecs = [AudioCodec(int(value)) for value in args]
self.emit('supported_audio_codecs', self.supported_audio_codecs)
self.send_ok()
def _on_bcs(self, codec: bytes) -> None:
@@ -1619,7 +1618,7 @@ class ProfileVersion(enum.IntEnum):
"""
Profile version (normative).
Hands-Free Profile v1.8, 6.3 SDP Interoperability Requirements.
Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements.
"""
V1_5 = 0x0105
@@ -1633,7 +1632,7 @@ class HfSdpFeature(enum.IntFlag):
"""
HF supported features (normative).
Hands-Free Profile v1.9, 6.3 SDP Interoperability Requirements.
Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements.
"""
EC_NR = 0x01 # Echo Cancel & Noise reduction
@@ -1641,17 +1640,16 @@ class HfSdpFeature(enum.IntFlag):
CLI_PRESENTATION_CAPABILITY = 0x04
VOICE_RECOGNITION_ACTIVATION = 0x08
REMOTE_VOLUME_CONTROL = 0x10
WIDE_BAND_SPEECH = 0x20
WIDE_BAND = 0x20 # Wide band speech
ENHANCED_VOICE_RECOGNITION_STATUS = 0x40
VOICE_RECOGNITION_TEXT = 0x80
SUPER_WIDE_BAND = 0x100
VOICE_RECOGNITION_TEST = 0x80
class AgSdpFeature(enum.IntFlag):
"""
AG supported features (normative).
Hands-Free Profile v1.9, 6.3 SDP Interoperability Requirements.
Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements.
"""
THREE_WAY_CALLING = 0x01
@@ -1659,10 +1657,9 @@ class AgSdpFeature(enum.IntFlag):
VOICE_RECOGNITION_FUNCTION = 0x04
IN_BAND_RING_TONE_CAPABILITY = 0x08
VOICE_TAG = 0x10 # Attach a number to voice tag
WIDE_BAND_SPEECH = 0x20
WIDE_BAND = 0x20 # Wide band speech
ENHANCED_VOICE_RECOGNITION_STATUS = 0x40
VOICE_RECOGNITION_TEXT = 0x80
SUPER_WIDE_BAND_SPEED_SPEECH = 0x100
VOICE_RECOGNITION_TEST = 0x80
def make_hf_sdp_records(
@@ -1695,11 +1692,11 @@ def make_hf_sdp_records(
in configuration.supported_hf_features
):
hf_supported_features |= HfSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS
if HfFeature.VOICE_RECOGNITION_TEXT in configuration.supported_hf_features:
hf_supported_features |= HfSdpFeature.VOICE_RECOGNITION_TEXT
if HfFeature.VOICE_RECOGNITION_TEST in configuration.supported_hf_features:
hf_supported_features |= HfSdpFeature.VOICE_RECOGNITION_TEST
if AudioCodec.MSBC in configuration.supported_audio_codecs:
hf_supported_features |= HfSdpFeature.WIDE_BAND_SPEECH
hf_supported_features |= HfSdpFeature.WIDE_BAND
return [
sdp.ServiceAttribute(
@@ -1775,14 +1772,14 @@ def make_ag_sdp_records(
in configuration.supported_ag_features
):
ag_supported_features |= AgSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS
if AgFeature.VOICE_RECOGNITION_TEXT in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_TEXT
if AgFeature.VOICE_RECOGNITION_TEST in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_TEST
if AgFeature.IN_BAND_RING_TONE_CAPABILITY in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY
if AgFeature.VOICE_RECOGNITION_FUNCTION in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_FUNCTION
if AudioCodec.MSBC in configuration.supported_audio_codecs:
ag_supported_features |= AgSdpFeature.WIDE_BAND_SPEECH
ag_supported_features |= AgSdpFeature.WIDE_BAND
return [
sdp.ServiceAttribute(
+5 -2
View File
@@ -199,7 +199,7 @@ class Host(AbortableEventEmitter):
check_address_type: bool = False,
) -> Optional[Connection]:
for connection in self.connections.values():
if bytes(connection.peer_address) == bytes(bd_addr):
if connection.peer_address.to_bytes() == bd_addr.to_bytes():
if (
check_address_type
and connection.peer_address.address_type != bd_addr.address_type
@@ -552,7 +552,7 @@ class Host(AbortableEventEmitter):
return response
except Exception as error:
logger.warning(
logger.exception(
f'{color("!!! Exception while sending command:", "red")} {error}'
)
raise error
@@ -1248,3 +1248,6 @@ class Host(AbortableEventEmitter):
event.connection_handle,
int.from_bytes(event.le_features, 'little'),
)
def on_hci_vendor_event(self, event):
self.emit('vendor_event', event)
+8 -2
View File
@@ -225,7 +225,7 @@ class L2CAP_PDU:
return L2CAP_PDU(l2cap_pdu_cid, l2cap_pdu_payload)
def __bytes__(self) -> bytes:
def to_bytes(self) -> bytes:
header = struct.pack('<HH', len(self.payload), self.cid)
return header + self.payload
@@ -233,6 +233,9 @@ class L2CAP_PDU:
self.cid = cid
self.payload = payload
def __bytes__(self) -> bytes:
return self.to_bytes()
def __str__(self) -> str:
return f'{color("L2CAP", "green")} [CID={self.cid}]: {self.payload.hex()}'
@@ -330,9 +333,12 @@ class L2CAP_Control_Frame:
def init_from_bytes(self, pdu, offset):
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def __bytes__(self) -> bytes:
def to_bytes(self) -> bytes:
return self.pdu
def __bytes__(self) -> bytes:
return self.to_bytes()
def __str__(self) -> str:
result = f'{color(self.name, "yellow")} [ID={self.identifier}]'
if fields := getattr(self, 'fields', None):
+1 -1
View File
@@ -39,6 +39,7 @@ from bumble.device import (
AdvertisingEventProperties,
AdvertisingType,
Device,
Phy,
)
from bumble.gatt import Service
from bumble.hci import (
@@ -46,7 +47,6 @@ from bumble.hci import (
HCI_PAGE_TIMEOUT_ERROR,
HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
Address,
Phy,
)
from google.protobuf import any_pb2 # pytype: disable=pyi-error
from google.protobuf import empty_pb2 # pytype: disable=pyi-error
+3 -3
View File
@@ -95,7 +95,7 @@ class AudioInputStatus(OpenIntEnum):
Cf. 3.4 Audio Input Status
'''
INACTIVE = 0x00
INATIVE = 0x00
ACTIVE = 0x01
@@ -104,7 +104,7 @@ class AudioInputControlPointOpCode(OpenIntEnum):
Cf. 3.5.1 Audio Input Control Point procedure requirements
'''
SET_GAIN_SETTING = 0x01
SET_GAIN_SETTING = 0x00
UNMUTE = 0x02
MUTE = 0x03
SET_MANUAL_GAIN_MODE = 0x04
@@ -239,7 +239,7 @@ class AudioInputControlPoint:
or gain_settings_operand
> self.gain_settings_properties.gain_settings_maximum
):
logger.error("gain_settings value out of range")
logger.error("gain_seetings value out of range")
raise ATT_Error(ErrorCode.VALUE_OUT_OF_RANGE)
if self.audio_input_state.gain_settings != gain_settings_operand:
+42 -114
View File
@@ -265,7 +265,7 @@ class UnicastServerAdvertisingData:
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
struct.pack(
'<2sBIB',
bytes(gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE),
gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE.to_bytes(),
self.announcement_type,
self.available_audio_contexts,
len(self.metadata),
@@ -398,21 +398,18 @@ class CodecSpecificConfiguration:
OCTETS_PER_FRAME = 0x04
CODEC_FRAMES_PER_SDU = 0x05
sampling_frequency: SamplingFrequency | None = None
frame_duration: FrameDuration | None = None
audio_channel_allocation: AudioLocation | None = None
octets_per_codec_frame: int | None = None
codec_frames_per_sdu: int | None = None
sampling_frequency: SamplingFrequency
frame_duration: FrameDuration
audio_channel_allocation: AudioLocation
octets_per_codec_frame: int
codec_frames_per_sdu: int
@classmethod
def from_bytes(cls, data: bytes) -> CodecSpecificConfiguration:
offset = 0
sampling_frequency: SamplingFrequency | None = None
frame_duration: FrameDuration | None = None
audio_channel_allocation: AudioLocation | None = None
octets_per_codec_frame: int | None = None
codec_frames_per_sdu: int | None = None
# Allowed default values.
audio_channel_allocation = AudioLocation.NOT_ALLOWED
codec_frames_per_sdu = 1
while offset < len(data):
length, type = struct.unpack_from('BB', data, offset)
offset += 2
@@ -430,6 +427,8 @@ class CodecSpecificConfiguration:
elif type == CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU:
codec_frames_per_sdu = value
# It is expected here that if some fields are missing, an error should be raised.
# pylint: disable=possibly-used-before-assignment,used-before-assignment
return CodecSpecificConfiguration(
sampling_frequency=sampling_frequency,
frame_duration=frame_duration,
@@ -439,43 +438,23 @@ class CodecSpecificConfiguration:
)
def __bytes__(self) -> bytes:
return b''.join(
[
struct.pack(fmt, length, tag, value)
for fmt, length, tag, value in [
(
'<BBB',
2,
CodecSpecificConfiguration.Type.SAMPLING_FREQUENCY,
self.sampling_frequency,
),
(
'<BBB',
2,
CodecSpecificConfiguration.Type.FRAME_DURATION,
self.frame_duration,
),
(
'<BBI',
5,
CodecSpecificConfiguration.Type.AUDIO_CHANNEL_ALLOCATION,
self.audio_channel_allocation,
),
(
'<BBH',
3,
CodecSpecificConfiguration.Type.OCTETS_PER_FRAME,
self.octets_per_codec_frame,
),
(
'<BBB',
2,
CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU,
self.codec_frames_per_sdu,
),
]
if value is not None
]
return struct.pack(
'<BBBBBBBBIBBHBBB',
2,
CodecSpecificConfiguration.Type.SAMPLING_FREQUENCY,
self.sampling_frequency,
2,
CodecSpecificConfiguration.Type.FRAME_DURATION,
self.frame_duration,
5,
CodecSpecificConfiguration.Type.AUDIO_CHANNEL_ALLOCATION,
self.audio_channel_allocation,
3,
CodecSpecificConfiguration.Type.OCTETS_PER_FRAME,
self.octets_per_codec_frame,
2,
CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU,
self.codec_frames_per_sdu,
)
@@ -487,24 +466,6 @@ class BroadcastAudioAnnouncement:
def from_bytes(cls, data: bytes) -> Self:
return cls(int.from_bytes(data[:3], 'little'))
def __bytes__(self) -> bytes:
return self.broadcast_id.to_bytes(3, 'little')
def get_advertising_data(self) -> bytes:
return bytes(
core.AdvertisingData(
[
(
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
(
bytes(gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE)
+ bytes(self)
),
)
]
)
)
@dataclasses.dataclass
class BasicAudioAnnouncement:
@@ -513,37 +474,26 @@ class BasicAudioAnnouncement:
index: int
codec_specific_configuration: CodecSpecificConfiguration
def __bytes__(self) -> bytes:
codec_specific_configuration_bytes = bytes(
self.codec_specific_configuration
)
return (
bytes([self.index, len(codec_specific_configuration_bytes)])
+ codec_specific_configuration_bytes
)
@dataclasses.dataclass
class CodecInfo:
coding_format: hci.CodecID
company_id: int
vendor_specific_codec_id: int
@classmethod
def from_bytes(cls, data: bytes) -> Self:
coding_format = hci.CodecID(data[0])
company_id = int.from_bytes(data[1:3], 'little')
vendor_specific_codec_id = int.from_bytes(data[3:5], 'little')
return cls(coding_format, company_id, vendor_specific_codec_id)
@dataclasses.dataclass
class Subgroup:
codec_id: hci.CodingFormat
codec_id: BasicAudioAnnouncement.CodecInfo
codec_specific_configuration: CodecSpecificConfiguration
metadata: le_audio.Metadata
bis: List[BasicAudioAnnouncement.BIS]
def __bytes__(self) -> bytes:
metadata_bytes = bytes(self.metadata)
codec_specific_configuration_bytes = bytes(
self.codec_specific_configuration
)
return (
bytes([len(self.bis)])
+ bytes(self.codec_id)
+ bytes([len(codec_specific_configuration_bytes)])
+ codec_specific_configuration_bytes
+ bytes([len(metadata_bytes)])
+ metadata_bytes
+ b''.join(map(bytes, self.bis))
)
presentation_delay: int
subgroups: List[BasicAudioAnnouncement.Subgroup]
@@ -555,7 +505,7 @@ class BasicAudioAnnouncement:
for _ in range(data[3]):
num_bis = data[offset]
offset += 1
codec_id = hci.CodingFormat.from_bytes(data[offset : offset + 5])
codec_id = cls.CodecInfo.from_bytes(data[offset : offset + 5])
offset += 5
codec_specific_configuration_length = data[offset]
offset += 1
@@ -599,25 +549,3 @@ class BasicAudioAnnouncement:
)
return cls(presentation_delay, subgroups)
def __bytes__(self) -> bytes:
return (
self.presentation_delay.to_bytes(3, 'little')
+ bytes([len(self.subgroups)])
+ b''.join(map(bytes, self.subgroups))
)
def get_advertising_data(self) -> bytes:
return bytes(
core.AdvertisingData(
[
(
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
(
bytes(gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE)
+ bytes(self)
),
)
]
)
)
+7 -1
View File
@@ -344,6 +344,9 @@ class DataElement:
] # Keep a copy so we can re-serialize to an exact replica
return result
def to_bytes(self):
return bytes(self)
def __bytes__(self):
# Return early if we have a cache
if self.bytes:
@@ -620,9 +623,12 @@ class SDP_PDU:
def init_from_bytes(self, pdu, offset):
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def __bytes__(self):
def to_bytes(self):
return self.pdu
def __bytes__(self):
return self.to_bytes()
def __str__(self):
result = f'{color(self.name, "blue")} [TID={self.transaction_id}]'
if fields := getattr(self, 'fields', None):
+5 -2
View File
@@ -298,9 +298,12 @@ class SMP_Command:
def init_from_bytes(self, pdu: bytes, offset: int) -> None:
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def __bytes__(self):
def to_bytes(self):
return self.pdu
def __bytes__(self):
return self.to_bytes()
def __str__(self):
result = color(self.name, 'yellow')
if fields := getattr(self, 'fields', None):
@@ -1946,7 +1949,7 @@ class Manager(EventEmitter):
f'{connection.peer_address}: {command}'
)
cid = SMP_BR_CID if connection.transport == BT_BR_EDR_TRANSPORT else SMP_CID
connection.send_l2cap_pdu(cid, bytes(command))
connection.send_l2cap_pdu(cid, command.to_bytes())
def on_smp_security_request_command(
self, connection: Connection, request: SMP_Security_Request_Command
+8 -2
View File
@@ -149,7 +149,10 @@ async def open_usb_transport(spec: str) -> Transport:
if status != usb1.TRANSFER_COMPLETED:
logger.warning(
color(f'!!! OUT transfer not completed: status={status}', 'red')
color(
f'!!! OUT transfer not completed: status={status}',
'red',
)
)
async def process_queue(self):
@@ -275,7 +278,10 @@ async def open_usb_transport(spec: str) -> Transport:
)
else:
logger.warning(
color(f'!!! IN transfer not completed: status={status}', 'red')
color(
f'!!! IN[{packet_type}] transfer not completed: status={status}',
'red',
)
)
self.loop.call_soon_threadsafe(self.on_transport_lost)
+29 -4
View File
@@ -16,6 +16,7 @@
# Imports
# -----------------------------------------------------------------------------
import struct
from typing import Dict, Optional, Type
from bumble.hci import (
name_or_number,
@@ -24,7 +25,9 @@ from bumble.hci import (
HCI_Constant,
HCI_Object,
HCI_Command,
HCI_Vendor_Event,
HCI_Event,
HCI_Extended_Event,
HCI_VENDOR_EVENT,
STATUS_SPEC,
)
@@ -48,7 +51,6 @@ HCI_DYNAMIC_AUDIO_BUFFER_COMMAND = hci_vendor_command_op_code(0x15F)
HCI_BLUETOOTH_QUALITY_REPORT_EVENT = 0x58
HCI_Command.register_commands(globals())
HCI_Vendor_Event.register_subevents(globals())
# -----------------------------------------------------------------------------
@@ -279,7 +281,29 @@ class HCI_Dynamic_Audio_Buffer_Command(HCI_Command):
# -----------------------------------------------------------------------------
@HCI_Vendor_Event.event(
class HCI_Android_Vendor_Event(HCI_Extended_Event):
event_code: int = HCI_VENDOR_EVENT
subevent_classes: Dict[int, Type[HCI_Extended_Event]] = {}
@classmethod
def subclass_from_parameters(
cls, parameters: bytes
) -> Optional[HCI_Extended_Event]:
subevent_code = parameters[0]
if subevent_code == HCI_BLUETOOTH_QUALITY_REPORT_EVENT:
quality_report_id = parameters[1]
if quality_report_id in (0x01, 0x02, 0x03, 0x04, 0x07, 0x08, 0x09):
return HCI_Bluetooth_Quality_Report_Event.from_parameters(parameters)
return None
HCI_Android_Vendor_Event.register_subevents(globals())
HCI_Event.vendor_factory = HCI_Android_Vendor_Event.subclass_from_parameters
# -----------------------------------------------------------------------------
@HCI_Extended_Event.event(
fields=[
('quality_report_id', 1),
('packet_types', 1),
@@ -308,10 +332,11 @@ class HCI_Dynamic_Audio_Buffer_Command(HCI_Command):
('tx_last_subevent_packets', 4),
('crc_error_packets', 4),
('rx_duplicate_packets', 4),
('rx_unreceived_packets', 4),
('vendor_specific_parameters', '*'),
]
)
class HCI_Bluetooth_Quality_Report_Event(HCI_Vendor_Event):
class HCI_Bluetooth_Quality_Report_Event(HCI_Android_Vendor_Event):
# pylint: disable=line-too-long
'''
See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#bluetooth-quality-report-sub-event
+2 -1
View File
@@ -16,4 +16,5 @@ USB vendor ID and product ID.
Drivers included in the module are:
* [Realtek](realtek.md): Loading of Firmware and Config for Realtek USB dongles.
* [Realtek](realtek.md): Loading of Firmware and Config for Realtek USB dongles.
* [Intel](intel.md): Loading of Firmware and Config for Intel USB controllers.
+73
View File
@@ -0,0 +1,73 @@
INTEL DRIVER
==============
This driver supports loading firmware images and optional config data to
Intel USB controllers.
A number of USB dongles are supported, but likely not all.
The initial implementation has been tested on BE200 and AX210 controllers.
When using a USB controller, the USB product ID and vendor ID are used
to find whether a matching set of firmware image and config data
is needed for that specific model. If a match exists, the driver will try
load the firmware image and, if needed, config data.
Alternatively, the metadata property ``driver=intel`` may be specified in a transport
name to force that driver to be used (ex: ``usb:[driver=intel]0`` instead of just
``usb:0`` for the first USB device).
The driver will look for the firmware and config files by name, in order, in:
* The directory specified by the environment variable `BUMBLE_INTEL_FIRMWARE_DIR`
if set.
* The directory `<package-dir>/drivers/intel_fw` where `<package-dir>` is the directory
where the `bumble` package is installed.
* The current directory.
It is also possible to override or extend the config data with parameters passed via the
transport name. The driver name `intel` may be suffixed with `/<param:value>[+<param:value>]...`
The supported params are:
* `ddc_addon`: configuration data to add to the data loaded from the config data file
* `ddc_override`: configuration data to use instead of the data loaded from the config data file.
With both `dcc_addon` and `dcc_override`, the param value is a hex-encoded byte array containing
the config data (same format as the config file).
Example transport name:
`usb:[driver=intel/dcc_addon:03E40200]0`
Obtaining Firmware Images and Config Data
-----------------------------------------
Firmware images and config data may be obtained from a variety of online
sources.
To facilitate finding a downloading the, the utility program `bumble-intel-fw-download`
may be used.
```
Usage: bumble-intel-fw-download [OPTIONS]
Download Intel firmware images and configs.
Options:
--output-dir TEXT Output directory where the files will be saved.
Defaults to the OS-specific app data dir, which the
driver will check when trying to find firmware
--source [linux-kernel] [default: linux-kernel]
--single TEXT Only download a single image set, by its base name
--force Overwrite files if they already exist
--help Show this message and exit.
```
Utility
-------
The `bumble-intel-util` utility may be used to interact with an Intel USB controller.
```
Usage: bumble-intel-util [OPTIONS] COMMAND [ARGS]...
Options:
--help Show this message and exit.
Commands:
bootloader Reboot in bootloader mode.
info Get the firmware info.
load Load a firmware image.
```
+3 -8
View File
@@ -21,10 +21,10 @@ import sys
import os
import io
import logging
from typing import Iterable, Optional
import websockets
from typing import Optional
import bumble.core
from bumble.device import Device, ScoLink
from bumble.transport import open_transport_or_link
@@ -82,10 +82,6 @@ def on_microphone_volume(level: int):
send_message(type='microphone_volume', level=level)
def on_supported_audio_codecs(codecs: Iterable[hfp.AudioCodec]):
send_message(type='supported_audio_codecs', codecs=[codec.name for codec in codecs])
def on_sco_state_change(codec: int):
if codec == hfp.AudioCodec.CVSD:
sample_rate = 8000
@@ -211,7 +207,6 @@ async def main() -> None:
ag_protocol = hfp.AgProtocol(dlc, configuration)
ag_protocol.on('speaker_volume', on_speaker_volume)
ag_protocol.on('microphone_volume', on_microphone_volume)
ag_protocol.on('supported_audio_codecs', on_supported_audio_codecs)
on_hfp_state_change(True)
dlc.multiplexer.l2cap_channel.on(
'close', lambda: on_hfp_state_change(False)
@@ -246,7 +241,7 @@ async def main() -> None:
# Pick the first one
channel, version, hf_sdp_features = hfp_record
print(f'HF version: {version}')
print(f'HF features: {hf_sdp_features.name}')
print(f'HF features: {hf_sdp_features}')
# Request authentication
print('*** Authenticating...')
+1 -7
View File
@@ -161,13 +161,7 @@ async def main() -> None:
else:
file_output = open(f'{datetime.datetime.now().isoformat()}.lc3', 'wb')
codec_configuration = ase.codec_specific_configuration
if (
not isinstance(codec_configuration, CodecSpecificConfiguration)
or codec_configuration.sampling_frequency is None
or codec_configuration.audio_channel_allocation is None
or codec_configuration.frame_duration is None
):
return
assert isinstance(codec_configuration, CodecSpecificConfiguration)
# Write a LC3 header.
file_output.write(
bytes([0x1C, 0xCC]) # Header.
+1 -1
View File
@@ -80,7 +80,7 @@ impl Address {
/// Creates a new [Address] object.
pub fn new(address: &str, address_type: AddressType) -> PyResult<Self> {
Python::with_gil(|py| {
PyModule::import(py, intern!(py, "bumble.hci"))?
PyModule::import(py, intern!(py, "bumble.device"))?
.getattr(intern!(py, "Address"))?
.call1((address, address_type))
.map(|any| Self(any.into()))
+3 -1
View File
@@ -51,7 +51,7 @@ install_requires =
pyserial-asyncio >= 0.5; platform_system!='Emscripten'
pyserial >= 3.5; platform_system!='Emscripten'
pyusb >= 1.2; platform_system!='Emscripten'
websockets == 13.1; platform_system!='Emscripten'
websockets >= 12.0; platform_system!='Emscripten'
[options.entry_points]
console_scripts =
@@ -75,6 +75,8 @@ console_scripts =
bumble-pandora-server = bumble.apps.pandora_server:main
bumble-rtk-util = bumble.tools.rtk_util:main
bumble-rtk-fw-download = bumble.tools.rtk_fw_download:main
bumble-intel-util = bumble.tools.intel_util:main
bumble-intel-fw-download = bumble.tools.intel_fw_download:main
[options.package_data]
* = py.typed, *.pyi
-52
View File
@@ -39,8 +39,6 @@ from bumble.profiles.ascs import (
)
from bumble.profiles.bap import (
AudioLocation,
BasicAudioAnnouncement,
BroadcastAudioAnnouncement,
SupportedFrameDuration,
SupportedSamplingFrequency,
SamplingFrequency,
@@ -202,56 +200,6 @@ def test_codec_specific_configuration() -> None:
assert CodecSpecificConfiguration.from_bytes(bytes(config)) == config
# -----------------------------------------------------------------------------
def test_broadcast_audio_announcement() -> None:
broadcast_audio_announcement = BroadcastAudioAnnouncement(123456)
assert (
BroadcastAudioAnnouncement.from_bytes(bytes(broadcast_audio_announcement))
== broadcast_audio_announcement
)
# -----------------------------------------------------------------------------
def test_basic_audio_announcement() -> None:
basic_audio_announcement = BasicAudioAnnouncement(
presentation_delay=40000,
subgroups=[
BasicAudioAnnouncement.Subgroup(
codec_id=CodingFormat(codec_id=CodecID.LC3),
codec_specific_configuration=CodecSpecificConfiguration(
sampling_frequency=SamplingFrequency.FREQ_48000,
frame_duration=FrameDuration.DURATION_10000_US,
octets_per_codec_frame=100,
),
metadata=Metadata(
[
Metadata.Entry(tag=Metadata.Tag.LANGUAGE, data=b'eng'),
Metadata.Entry(tag=Metadata.Tag.PROGRAM_INFO, data=b'Disco'),
]
),
bis=[
BasicAudioAnnouncement.BIS(
index=0,
codec_specific_configuration=CodecSpecificConfiguration(
audio_channel_allocation=AudioLocation.FRONT_LEFT
),
),
BasicAudioAnnouncement.BIS(
index=1,
codec_specific_configuration=CodecSpecificConfiguration(
audio_channel_allocation=AudioLocation.FRONT_RIGHT
),
),
],
)
],
)
assert (
BasicAudioAnnouncement.from_bytes(bytes(basic_audio_announcement))
== basic_audio_announcement
)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_pacs():
+10 -55
View File
@@ -19,7 +19,9 @@ import asyncio
import functools
import logging
import os
from types import LambdaType
import pytest
from unittest import mock
from bumble.core import (
BT_BR_EDR_TRANSPORT,
@@ -27,13 +29,7 @@ from bumble.core import (
BT_PERIPHERAL_ROLE,
ConnectionParameters,
)
from bumble.device import (
AdvertisingEventProperties,
AdvertisingParameters,
Connection,
Device,
PeriodicAdvertisingParameters,
)
from bumble.device import AdvertisingParameters, Connection, Device
from bumble.host import AclPacketQueue, Host
from bumble.hci import (
HCI_ACCEPT_CONNECTION_REQUEST_COMMAND,
@@ -269,8 +265,7 @@ async def test_flush():
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_legacy_advertising():
device = TwoDevices()[0]
await device.power_on()
device = Device(host=mock.AsyncMock(Host))
# Start advertising
await device.start_advertising()
@@ -288,10 +283,7 @@ async def test_legacy_advertising():
)
@pytest.mark.asyncio
async def test_legacy_advertising_disconnection(auto_restart):
devices = TwoDevices()
device = devices[0]
devices.controllers[0].le_features = bytes.fromhex('ffffffffffffffff')
await device.power_on()
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
await device.start_advertising(auto_restart=auto_restart)
device.on_connection(
@@ -313,11 +305,6 @@ async def test_legacy_advertising_disconnection(auto_restart):
await async_barrier()
if auto_restart:
assert device.legacy_advertising_set
started = asyncio.Event()
if not device.is_advertising:
device.legacy_advertising_set.once('start', started.set)
await asyncio.wait_for(started.wait(), _TIMEOUT)
assert device.is_advertising
else:
assert not device.is_advertising
@@ -326,8 +313,7 @@ async def test_legacy_advertising_disconnection(auto_restart):
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_extended_advertising():
device = TwoDevices()[0]
await device.power_on()
device = Device(host=mock.AsyncMock(Host))
# Start advertising
advertising_set = await device.create_advertising_set()
@@ -346,8 +332,7 @@ async def test_extended_advertising():
)
@pytest.mark.asyncio
async def test_extended_advertising_connection(own_address_type):
device = TwoDevices()[0]
await device.power_on()
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertising_set = await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(own_address_type=own_address_type)
@@ -383,10 +368,8 @@ async def test_extended_advertising_connection(own_address_type):
)
@pytest.mark.asyncio
async def test_extended_advertising_connection_out_of_order(own_address_type):
devices = TwoDevices()
device = devices[0]
devices.controllers[0].le_features = bytes.fromhex('ffffffffffffffff')
await device.power_on()
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertising_set = await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(own_address_type=own_address_type)
)
@@ -399,7 +382,7 @@ async def test_extended_advertising_connection_out_of_order(own_address_type):
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
Address('F0:F1:F2:F3:F4:F5'),
peer_address,
None,
None,
BT_PERIPHERAL_ROLE,
@@ -414,34 +397,6 @@ async def test_extended_advertising_connection_out_of_order(own_address_type):
await async_barrier()
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_periodic_advertising():
device = TwoDevices()[0]
await device.power_on()
# Start advertising
advertising_set = await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(
advertising_event_properties=AdvertisingEventProperties(
is_connectable=False
)
),
advertising_data=b'123',
periodic_advertising_parameters=PeriodicAdvertisingParameters(),
periodic_advertising_data=b'abc',
)
assert device.extended_advertising_sets
assert advertising_set.enabled
assert not advertising_set.periodic_enabled
await advertising_set.start_periodic()
assert advertising_set.periodic_enabled
await advertising_set.stop_periodic()
assert not advertising_set.periodic_enabled
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_remote_le_features():
+2 -2
View File
@@ -57,7 +57,7 @@ from .test_utils import async_barrier
# -----------------------------------------------------------------------------
def basic_check(x):
pdu = bytes(x)
pdu = x.to_bytes()
parsed = ATT_PDU.from_bytes(pdu)
x_str = str(x)
parsed_str = str(parsed)
@@ -74,7 +74,7 @@ def test_UUID():
assert str(u) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
v = UUID(str(u))
assert str(v) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
w = UUID.from_bytes(bytes(v))
w = UUID.from_bytes(v.to_bytes())
assert str(w) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
u1 = UUID.from_16_bits(0x1234)
+4 -4
View File
@@ -75,13 +75,13 @@ from bumble.hci import (
def basic_check(x):
packet = bytes(x)
packet = x.to_bytes()
print(packet.hex())
parsed = HCI_Packet.from_bytes(packet)
x_str = str(x)
parsed_str = str(parsed)
print(x_str)
parsed_bytes = bytes(parsed)
parsed_bytes = parsed.to_bytes()
assert x_str == parsed_str
assert packet == parsed_bytes
@@ -188,7 +188,7 @@ def test_HCI_Command_Complete_Event():
return_parameters=bytes([7]),
)
basic_check(event)
event = HCI_Packet.from_bytes(bytes(event))
event = HCI_Packet.from_bytes(event.to_bytes())
assert event.return_parameters == 7
# With a simple status as an integer status
@@ -562,7 +562,7 @@ def test_iso_data_packet():
'6281bc77ed6a3206d984bcdabee6be831c699cb50e2'
)
assert bytes(packet) == data
assert packet.to_bytes() == data
# -----------------------------------------------------------------------------
+2 -2
View File
@@ -61,7 +61,7 @@ def _default_hf_configuration() -> hfp.HfConfiguration:
# -----------------------------------------------------------------------------
def _default_hf_sdp_features() -> hfp.HfSdpFeature:
return (
hfp.HfSdpFeature.WIDE_BAND_SPEECH
hfp.HfSdpFeature.WIDE_BAND
| hfp.HfSdpFeature.THREE_WAY_CALLING
| hfp.HfSdpFeature.CLI_PRESENTATION_CAPABILITY
)
@@ -108,7 +108,7 @@ def _default_ag_configuration() -> hfp.AgConfiguration:
# -----------------------------------------------------------------------------
def _default_ag_sdp_features() -> hfp.AgSdpFeature:
return (
hfp.AgSdpFeature.WIDE_BAND_SPEECH
hfp.AgSdpFeature.WIDE_BAND
| hfp.AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY
| hfp.AgSdpFeature.THREE_WAY_CALLING
)
+1 -1
View File
@@ -240,7 +240,7 @@ async def test_self_gatt():
result = await peer.discover_included_services(result[0])
assert len(result) == 2
# Service UUID is only present when the UUID is 16-bit Bluetooth UUID
assert bytes(result[1].uuid) == bytes(s3.uuid)
assert result[1].uuid.to_bytes() == s3.uuid.to_bytes()
# -----------------------------------------------------------------------------
+130
View File
@@ -0,0 +1,130 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
import pathlib
import urllib.request
import urllib.error
import click
from bumble.colors import color
from bumble.drivers import intel
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
LINUX_KERNEL_GIT_SOURCE = "https://git.kernel.org/pub/scm/linux/kernel/git/firmware/linux-firmware.git/plain/intel"
# -----------------------------------------------------------------------------
# Functions
# -----------------------------------------------------------------------------
def download_file(base_url, name):
url = f"{base_url}/{name}"
with urllib.request.urlopen(url) as file:
data = file.read()
print(f"Downloaded {name}: {len(data)} bytes")
return data
# -----------------------------------------------------------------------------
@click.command
@click.option(
"--output-dir",
default="",
help="Output directory where the files will be saved. Defaults to the OS-specific"
"app data dir, which the driver will check when trying to find firmware",
show_default=True,
)
@click.option(
"--source",
type=click.Choice(["linux-kernel"]),
default="linux-kernel",
show_default=True,
)
@click.option("--single", help="Only download a single image set, by its base name")
@click.option("--force", is_flag=True, help="Overwrite files if they already exist")
def main(output_dir, source, single, force):
"""Download Intel firmware images and configs."""
# Check that the output dir exists
if output_dir == '':
output_dir = intel.intel_firmware_dir()
else:
output_dir = pathlib.Path(output_dir)
if not output_dir.is_dir():
print("Output dir does not exist or is not a directory")
return
base_url = {
"linux-kernel": LINUX_KERNEL_GIT_SOURCE,
}[source]
print("Downloading")
print(color("FROM:", "green"), base_url)
print(color("TO:", "green"), output_dir)
if single:
images = [(f"{single}.sfi", f"{single}.ddc")]
else:
images = [
(f"{base_name}.sfi", f"{base_name}.ddc")
for base_name in intel.INTEL_FW_IMAGE_NAMES
]
for fw_name, config_name in images:
print(color("---", "yellow"))
fw_image_out = output_dir / fw_name
if not force and fw_image_out.exists():
print(color(f"{fw_image_out} already exists, skipping", "red"))
continue
if config_name:
config_image_out = output_dir / config_name
if not force and config_image_out.exists():
print(color("f{config_image_out} already exists, skipping", "red"))
continue
try:
fw_image = download_file(base_url, fw_name)
except urllib.error.HTTPError as error:
print(f"Failed to download {fw_name}: {error}")
continue
config_image = None
if config_name:
try:
config_image = download_file(base_url, config_name)
except urllib.error.HTTPError as error:
print(f"Failed to download {config_name}: {error}")
continue
fw_image_out.write_bytes(fw_image)
if config_image:
config_image_out.write_bytes(config_image)
# -----------------------------------------------------------------------------
if __name__ == '__main__':
main()
+154
View File
@@ -0,0 +1,154 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
import asyncio
import os
from typing import Any, Optional
import click
from bumble.colors import color
from bumble import transport
from bumble.drivers import intel
from bumble.host import Host
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
def print_device_info(device_info: dict[intel.ValueType, Any]) -> None:
if (mode := device_info.get(intel.ValueType.CURRENT_MODE_OF_OPERATION)) is not None:
print(
color("MODE:", "yellow"),
mode.name,
)
print(color("DETAILS:", "yellow"))
for key, value in device_info.items():
print(f" {color(key.name, 'green')}: {value}")
# -----------------------------------------------------------------------------
async def get_driver(host: Host, force: bool) -> Optional[intel.Driver]:
# Create a driver
driver = await intel.Driver.for_host(host, force)
if driver is None:
print("Device does not appear to be an Intel device")
return None
return driver
# -----------------------------------------------------------------------------
async def do_info(usb_transport, force):
async with await transport.open_transport(usb_transport) as (
hci_source,
hci_sink,
):
host = Host(hci_source, hci_sink)
driver = await get_driver(host, force)
if driver is None:
return
# Get and print the device info
print_device_info(await driver.read_device_info())
# -----------------------------------------------------------------------------
async def do_load(usb_transport: str, force: bool) -> None:
async with await transport.open_transport(usb_transport) as (
hci_source,
hci_sink,
):
host = Host(hci_source, hci_sink)
driver = await get_driver(host, force)
if driver is None:
return
# Reboot in bootloader mode
await driver.load_firmware()
# Get and print the device info
print_device_info(await driver.read_device_info())
# -----------------------------------------------------------------------------
async def do_bootloader(usb_transport: str, force: bool) -> None:
async with await transport.open_transport(usb_transport) as (
hci_source,
hci_sink,
):
host = Host(hci_source, hci_sink)
driver = await get_driver(host, force)
if driver is None:
return
# Reboot in bootloader mode
await driver.reboot_bootloader()
# -----------------------------------------------------------------------------
@click.group()
def main():
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
@main.command
@click.argument("usb_transport")
@click.option(
"--force",
is_flag=True,
default=False,
help="Try to get the device info even if the USB info doesn't match",
)
def info(usb_transport, force):
"""Get the firmware info."""
asyncio.run(do_info(usb_transport, force))
@main.command
@click.argument("usb_transport")
@click.option(
"--force",
is_flag=True,
default=False,
help="Load even if the USB info doesn't match",
)
def load(usb_transport, force):
"""Load a firmware image."""
asyncio.run(do_load(usb_transport, force))
@main.command
@click.argument("usb_transport")
@click.option(
"--force",
is_flag=True,
default=False,
help="Attempt to reboot event if the USB info doesn't match",
)
def bootloader(usb_transport, force):
"""Reboot in bootloader mode."""
asyncio.run(do_bootloader(usb_transport, force))
# -----------------------------------------------------------------------------
if __name__ == '__main__':
main()