Merge pull request #597 from google/gbg/intel-hci

intel hci
This commit is contained in:
Gilles Boccon-Gibod
2024-11-29 10:41:10 -08:00
committed by GitHub
13 changed files with 1052 additions and 63 deletions

View File

@@ -199,7 +199,7 @@ def log_stats(title, stats, precision=2):
stats_min = min(stats) stats_min = min(stats)
stats_max = max(stats) stats_max = max(stats)
stats_avg = statistics.mean(stats) stats_avg = statistics.mean(stats)
stats_stdev = statistics.stdev(stats) stats_stdev = statistics.stdev(stats) if len(stats) >= 2 else 0
logging.info( logging.info(
color( color(
( (
@@ -468,6 +468,7 @@ class Ping:
for run in range(self.repeat + 1): for run in range(self.repeat + 1):
self.done.clear() self.done.clear()
self.ping_times = []
if run > 0 and self.repeat and self.repeat_delay: if run > 0 and self.repeat and self.repeat_delay:
logging.info(color(f'*** Repeat delay: {self.repeat_delay}', 'green')) logging.info(color(f'*** Repeat delay: {self.repeat_delay}', 'green'))

View File

@@ -144,18 +144,18 @@ class Printer:
help='Format of the input file', help='Format of the input file',
) )
@click.option( @click.option(
'--vendors', '--vendor',
type=click.Choice(['android', 'zephyr']), type=click.Choice(['android', 'zephyr']),
multiple=True, multiple=True,
help='Support vendor-specific commands (list one or more)', help='Support vendor-specific commands (list one or more)',
) )
@click.argument('filename') @click.argument('filename')
# pylint: disable=redefined-builtin # pylint: disable=redefined-builtin
def main(format, vendors, filename): def main(format, vendor, filename):
for vendor in vendors: for vendor_name in vendor:
if vendor == 'android': if vendor_name == 'android':
import bumble.vendor.android.hci import bumble.vendor.android.hci
elif vendor == 'zephyr': elif vendor_name == 'zephyr':
import bumble.vendor.zephyr.hci import bumble.vendor.zephyr.hci
input = open(filename, 'rb') input = open(filename, 'rb')
@@ -180,7 +180,7 @@ def main(format, vendors, filename):
else: else:
printer.print(color("[TRUNCATED]", "red")) printer.print(color("[TRUNCATED]", "red"))
except Exception as error: except Exception as error:
logger.exception() logger.exception('')
print(color(f'!!! {error}', 'red')) print(color(f'!!! {error}', 'red'))

View File

@@ -20,6 +20,8 @@ Common types for drivers.
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
import abc import abc
from bumble import core
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Classes # Classes

View File

@@ -11,18 +11,33 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
"""
Support for Intel USB controllers.
Loosely based on the Fuchsia OS implementation.
"""
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import collections
import dataclasses
import logging import logging
import os
import pathlib
import platform
import struct
from typing import Any, Deque, Optional, TYPE_CHECKING
from bumble import core
from bumble.drivers import common from bumble.drivers import common
from bumble.hci import ( from bumble import hci
hci_vendor_command_op_code, # type: ignore from bumble import utils
HCI_Command,
HCI_Reset_Command, if TYPE_CHECKING:
) from bumble.host import Host
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Logging # Logging
@@ -34,39 +49,328 @@ logger = logging.getLogger(__name__)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
INTEL_USB_PRODUCTS = { INTEL_USB_PRODUCTS = {
# Intel AX210 (0x8087, 0x0032), # AX210
(0x8087, 0x0032), (0x8087, 0x0036), # BE200
# Intel BE200
(0x8087, 0x0036),
} }
INTEL_FW_IMAGE_NAMES = [
"ibt-0040-0041",
"ibt-0040-1020",
"ibt-0040-1050",
"ibt-0040-2120",
"ibt-0040-4150",
"ibt-0041-0041",
"ibt-0180-0041",
"ibt-0180-1050",
"ibt-0180-4150",
"ibt-0291-0291",
"ibt-1040-0041",
"ibt-1040-1020",
"ibt-1040-1050",
"ibt-1040-2120",
"ibt-1040-4150",
]
INTEL_FIRMWARE_DIR_ENV = "BUMBLE_INTEL_FIRMWARE_DIR"
INTEL_LINUX_FIRMWARE_DIR = "/lib/firmware/intel"
_MAX_FRAGMENT_SIZE = 252
_POST_RESET_DELAY = 0.2
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# HCI Commands # HCI Commands
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
HCI_INTEL_DDC_CONFIG_WRITE_COMMAND = hci_vendor_command_op_code(0xFC8B) # type: ignore HCI_INTEL_WRITE_DEVICE_CONFIG_COMMAND = hci.hci_vendor_command_op_code(0x008B)
HCI_INTEL_DDC_CONFIG_WRITE_PAYLOAD = [0x03, 0xE4, 0x02, 0x00] HCI_INTEL_READ_VERSION_COMMAND = hci.hci_vendor_command_op_code(0x0005)
HCI_INTEL_RESET_COMMAND = hci.hci_vendor_command_op_code(0x0001)
HCI_INTEL_SECURE_SEND_COMMAND = hci.hci_vendor_command_op_code(0x0009)
HCI_INTEL_WRITE_BOOT_PARAMS_COMMAND = hci.hci_vendor_command_op_code(0x000E)
HCI_Command.register_commands(globals()) hci.HCI_Command.register_commands(globals())
@HCI_Command.command( # type: ignore @hci.HCI_Command.command(
fields=[("params", "*")], fields=[
("param0", 1),
],
return_parameters_fields=[ return_parameters_fields=[
("params", "*"), ("status", hci.STATUS_SPEC),
("tlv", "*"),
], ],
) )
class Hci_Intel_DDC_Config_Write_Command(HCI_Command): class HCI_Intel_Read_Version_Command(hci.HCI_Command):
pass pass
@hci.HCI_Command.command(
fields=[("data_type", 1), ("data", "*")],
return_parameters_fields=[
("status", 1),
],
)
class Hci_Intel_Secure_Send_Command(hci.HCI_Command):
pass
@hci.HCI_Command.command(
fields=[
("reset_type", 1),
("patch_enable", 1),
("ddc_reload", 1),
("boot_option", 1),
("boot_address", 4),
],
return_parameters_fields=[
("data", "*"),
],
)
class HCI_Intel_Reset_Command(hci.HCI_Command):
pass
@hci.HCI_Command.command(
fields=[("data", "*")],
return_parameters_fields=[
("status", hci.STATUS_SPEC),
("params", "*"),
],
)
class Hci_Intel_Write_Device_Config_Command(hci.HCI_Command):
pass
# -----------------------------------------------------------------------------
# Functions
# -----------------------------------------------------------------------------
def intel_firmware_dir() -> pathlib.Path:
"""
Returns:
A path to a subdir of the project data dir for Intel firmware.
The directory is created if it doesn't exist.
"""
from bumble.drivers import project_data_dir
p = project_data_dir() / "firmware" / "intel"
p.mkdir(parents=True, exist_ok=True)
return p
def _find_binary_path(file_name: str) -> pathlib.Path | None:
# First check if an environment variable is set
if INTEL_FIRMWARE_DIR_ENV in os.environ:
if (
path := pathlib.Path(os.environ[INTEL_FIRMWARE_DIR_ENV]) / file_name
).is_file():
logger.debug(f"{file_name} found in env dir")
return path
# When the environment variable is set, don't look elsewhere
return None
# Then, look where the firmware download tool writes by default
if (path := intel_firmware_dir() / file_name).is_file():
logger.debug(f"{file_name} found in project data dir")
return path
# Then, look in the package's driver directory
if (path := pathlib.Path(__file__).parent / "intel_fw" / file_name).is_file():
logger.debug(f"{file_name} found in package dir")
return path
# On Linux, check the system's FW directory
if (
platform.system() == "Linux"
and (path := pathlib.Path(INTEL_LINUX_FIRMWARE_DIR) / file_name).is_file()
):
logger.debug(f"{file_name} found in Linux system FW dir")
return path
# Finally look in the current directory
if (path := pathlib.Path.cwd() / file_name).is_file():
logger.debug(f"{file_name} found in CWD")
return path
return None
def _parse_tlv(data: bytes) -> list[tuple[ValueType, Any]]:
result: list[tuple[ValueType, Any]] = []
while len(data) >= 2:
value_type = ValueType(data[0])
value_length = data[1]
value = data[2 : 2 + value_length]
typed_value: Any
if value_type == ValueType.END:
break
if value_type in (ValueType.CNVI, ValueType.CNVR):
(v,) = struct.unpack("<I", value)
typed_value = (
(((v >> 0) & 0xF) << 12)
| (((v >> 4) & 0xF) << 0)
| (((v >> 8) & 0xF) << 4)
| (((v >> 24) & 0xF) << 8)
)
elif value_type == ValueType.HARDWARE_INFO:
(v,) = struct.unpack("<I", value)
typed_value = HardwareInfo(
HardwarePlatform((v >> 8) & 0xFF), HardwareVariant((v >> 16) & 0x3F)
)
elif value_type in (
ValueType.USB_VENDOR_ID,
ValueType.USB_PRODUCT_ID,
ValueType.DEVICE_REVISION,
):
(typed_value,) = struct.unpack("<H", value)
elif value_type == ValueType.CURRENT_MODE_OF_OPERATION:
typed_value = ModeOfOperation(value[0])
elif value_type in (
ValueType.BUILD_TYPE,
ValueType.BUILD_NUMBER,
ValueType.SECURE_BOOT,
ValueType.OTP_LOCK,
ValueType.API_LOCK,
ValueType.DEBUG_LOCK,
ValueType.SECURE_BOOT_ENGINE_TYPE,
):
typed_value = value[0]
elif value_type == ValueType.TIMESTAMP:
typed_value = Timestamp(value[0], value[1])
elif value_type == ValueType.FIRMWARE_BUILD:
typed_value = FirmwareBuild(value[0], Timestamp(value[1], value[2]))
elif value_type == ValueType.BLUETOOTH_ADDRESS:
typed_value = hci.Address(
value, address_type=hci.Address.PUBLIC_DEVICE_ADDRESS
)
else:
typed_value = value
result.append((value_type, typed_value))
data = data[2 + value_length :]
return result
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
class DriverError(core.BaseBumbleError):
def __init__(self, message: str) -> None:
super().__init__(message)
self.message = message
def __str__(self) -> str:
return f"IntelDriverError({self.message})"
class ValueType(utils.OpenIntEnum):
END = 0x00
CNVI = 0x10
CNVR = 0x11
HARDWARE_INFO = 0x12
DEVICE_REVISION = 0x16
CURRENT_MODE_OF_OPERATION = 0x1C
USB_VENDOR_ID = 0x17
USB_PRODUCT_ID = 0x18
TIMESTAMP = 0x1D
BUILD_TYPE = 0x1E
BUILD_NUMBER = 0x1F
SECURE_BOOT = 0x28
OTP_LOCK = 0x2A
API_LOCK = 0x2B
DEBUG_LOCK = 0x2C
FIRMWARE_BUILD = 0x2D
SECURE_BOOT_ENGINE_TYPE = 0x2F
BLUETOOTH_ADDRESS = 0x30
class HardwarePlatform(utils.OpenIntEnum):
INTEL_37 = 0x37
class HardwareVariant(utils.OpenIntEnum):
# This is a just a partial list.
# Add other constants here as new hardware is encountered and tested.
TYPHOON_PEAK = 0x17
GALE_PEAK = 0x1C
@dataclasses.dataclass
class HardwareInfo:
platform: HardwarePlatform
variant: HardwareVariant
@dataclasses.dataclass
class Timestamp:
week: int
year: int
@dataclasses.dataclass
class FirmwareBuild:
build_number: int
timestamp: Timestamp
class ModeOfOperation(utils.OpenIntEnum):
BOOTLOADER = 0x01
INTERMEDIATE = 0x02
OPERATIONAL = 0x03
class SecureBootEngineType(utils.OpenIntEnum):
RSA = 0x00
ECDSA = 0x01
@dataclasses.dataclass
class BootParams:
css_header_offset: int
css_header_size: int
pki_offset: int
pki_size: int
sig_offset: int
sig_size: int
write_offset: int
_BOOT_PARAMS = {
SecureBootEngineType.RSA: BootParams(0, 128, 128, 256, 388, 256, 964),
SecureBootEngineType.ECDSA: BootParams(644, 128, 772, 96, 868, 96, 964),
}
class Driver(common.Driver): class Driver(common.Driver):
def __init__(self, host): def __init__(self, host: Host) -> None:
self.host = host self.host = host
self.max_in_flight_firmware_load_commands = 1
self.pending_firmware_load_commands: Deque[hci.HCI_Command] = (
collections.deque()
)
self.can_send_firmware_load_command = asyncio.Event()
self.can_send_firmware_load_command.set()
self.firmware_load_complete = asyncio.Event()
self.reset_complete = asyncio.Event()
# Parse configuration options from the driver name.
self.ddc_addon: Optional[bytes] = None
self.ddc_override: Optional[bytes] = None
driver = host.hci_metadata.get("driver")
if driver is not None and driver.startswith("intel/"):
for key, value in [
key_eq_value.split(":") for key_eq_value in driver[6:].split("+")
]:
if key == "ddc_addon":
self.ddc_addon = bytes.fromhex(value)
elif key == "ddc_override":
self.ddc_override = bytes.fromhex(value)
@staticmethod @staticmethod
def check(host): def check(host: Host) -> bool:
driver = host.hci_metadata.get("driver") driver = host.hci_metadata.get("driver")
if driver == "intel": if driver == "intel" or driver is not None and driver.startswith("intel/"):
return True return True
vendor_id = host.hci_metadata.get("vendor_id") vendor_id = host.hci_metadata.get("vendor_id")
@@ -85,18 +389,283 @@ class Driver(common.Driver):
return True return True
@classmethod @classmethod
async def for_host(cls, host, force=False): # type: ignore async def for_host(cls, host: Host, force: bool = False):
# Only instantiate this driver if explicitly selected # Only instantiate this driver if explicitly selected
if not force and not cls.check(host): if not force and not cls.check(host):
return None return None
return cls(host) return cls(host)
async def init_controller(self): def on_packet(self, packet: bytes) -> None:
"""Handler for event packets that are received from an ACL channel"""
event = hci.HCI_Event.from_bytes(packet)
if not isinstance(event, hci.HCI_Command_Complete_Event):
self.host.on_hci_event_packet(event)
return
if not event.return_parameters == hci.HCI_SUCCESS:
raise DriverError("HCI_Command_Complete_Event error")
if self.max_in_flight_firmware_load_commands != event.num_hci_command_packets:
logger.debug(
"max_in_flight_firmware_load_commands update: "
f"{event.num_hci_command_packets}"
)
self.max_in_flight_firmware_load_commands = event.num_hci_command_packets
logger.debug(f"event: {event}")
self.pending_firmware_load_commands.popleft()
in_flight = len(self.pending_firmware_load_commands)
logger.debug(f"event received, {in_flight} still in flight")
if in_flight < self.max_in_flight_firmware_load_commands:
self.can_send_firmware_load_command.set()
async def send_firmware_load_command(self, command: hci.HCI_Command) -> None:
# Wait until we can send.
await self.can_send_firmware_load_command.wait()
# Send the command and adjust counters.
self.host.send_hci_packet(command)
self.pending_firmware_load_commands.append(command)
in_flight = len(self.pending_firmware_load_commands)
if in_flight >= self.max_in_flight_firmware_load_commands:
logger.debug(f"max commands in flight reached [{in_flight}]")
self.can_send_firmware_load_command.clear()
async def send_firmware_data(self, data_type: int, data: bytes) -> None:
while data:
fragment_size = min(len(data), _MAX_FRAGMENT_SIZE)
fragment = data[:fragment_size]
data = data[fragment_size:]
await self.send_firmware_load_command(
Hci_Intel_Secure_Send_Command(data_type=data_type, data=fragment)
)
async def load_firmware(self) -> None:
self.host.ready = True self.host.ready = True
await self.host.send_command(HCI_Reset_Command(), check_result=True) device_info = await self.read_device_info()
await self.host.send_command( logger.debug(
Hci_Intel_DDC_Config_Write_Command( "device info: \n%s",
params=HCI_INTEL_DDC_CONFIG_WRITE_PAYLOAD "\n".join(
[
f" {value_type.name}: {value}"
for value_type, value in device_info.items()
]
),
)
# Check if the firmware is already loaded.
if (
device_info.get(ValueType.CURRENT_MODE_OF_OPERATION)
== ModeOfOperation.OPERATIONAL
):
logger.debug("firmware already loaded")
return
# We only support some platforms and variants.
hardware_info = device_info.get(ValueType.HARDWARE_INFO)
if hardware_info is None:
raise DriverError("hardware info missing")
if hardware_info.platform != HardwarePlatform.INTEL_37:
raise DriverError("hardware platform not supported")
if hardware_info.variant not in (
HardwareVariant.TYPHOON_PEAK,
HardwareVariant.GALE_PEAK,
):
raise DriverError("hardware variant not supported")
# Compute the firmware name.
if ValueType.CNVI not in device_info or ValueType.CNVR not in device_info:
raise DriverError("insufficient device info, missing CNVI or CNVR")
firmware_base_name = (
"ibt-"
f"{device_info[ValueType.CNVI]:04X}-"
f"{device_info[ValueType.CNVR]:04X}"
)
logger.debug(f"FW base name: {firmware_base_name}")
firmware_name = f"{firmware_base_name}.sfi"
firmware_path = _find_binary_path(firmware_name)
if not firmware_path:
logger.warning(f"Firmware file {firmware_name} not found")
logger.warning("See https://google.github.io/bumble/drivers/intel.html")
return None
logger.debug(f"loading firmware from {firmware_path}")
firmware_image = firmware_path.read_bytes()
engine_type = device_info.get(ValueType.SECURE_BOOT_ENGINE_TYPE)
if engine_type is None:
raise DriverError("secure boot engine type missing")
if engine_type not in _BOOT_PARAMS:
raise DriverError("secure boot engine type not supported")
boot_params = _BOOT_PARAMS[engine_type]
if len(firmware_image) < boot_params.write_offset:
raise DriverError("firmware image too small")
# Register to receive vendor events.
def on_vendor_event(event: hci.HCI_Vendor_Event):
logger.debug(f"vendor event: {event}")
event_type = event.parameters[0]
if event_type == 0x02:
# Boot event
logger.debug("boot complete")
self.reset_complete.set()
elif event_type == 0x06:
# Firmware load event
logger.debug("download complete")
self.firmware_load_complete.set()
else:
logger.debug(f"ignoring vendor event type {event_type}")
self.host.on("vendor_event", on_vendor_event)
# We need to temporarily intercept packets from the controller,
# because they are formatted as HCI event packets but are received
# on the ACL channel, so the host parser would get confused.
saved_on_packet = self.host.on_packet
self.host.on_packet = self.on_packet # type: ignore
self.firmware_load_complete.clear()
# Send the CSS header
data = firmware_image[
boot_params.css_header_offset : boot_params.css_header_offset
+ boot_params.css_header_size
]
await self.send_firmware_data(0x00, data)
# Send the PKI header
data = firmware_image[
boot_params.pki_offset : boot_params.pki_offset + boot_params.pki_size
]
await self.send_firmware_data(0x03, data)
# Send the Signature header
data = firmware_image[
boot_params.sig_offset : boot_params.sig_offset + boot_params.sig_size
]
await self.send_firmware_data(0x02, data)
# Send the rest of the image.
# The payload consists of command objects, which are sent when they add up
# to a multiple of 4 bytes.
boot_address = 0
offset = boot_params.write_offset
fragment_size = 0
while offset + 3 < len(firmware_image):
(command_opcode,) = struct.unpack_from(
"<H", firmware_image, offset + fragment_size
)
command_size = firmware_image[offset + fragment_size + 2]
if command_opcode == HCI_INTEL_WRITE_BOOT_PARAMS_COMMAND:
(boot_address,) = struct.unpack_from(
"<I", firmware_image, offset + fragment_size + 3
)
logger.debug(
"found HCI_INTEL_WRITE_BOOT_PARAMS_COMMAND, "
f"boot_address={boot_address}"
)
fragment_size += 3 + command_size
if fragment_size % 4 == 0:
await self.send_firmware_data(
0x01, firmware_image[offset : offset + fragment_size]
)
logger.debug(f"sent {fragment_size} bytes")
offset += fragment_size
fragment_size = 0
# Wait for the firmware loading to be complete.
logger.debug("waiting for firmware to be loaded")
await self.firmware_load_complete.wait()
logger.debug("firmware loaded")
# Restore the original packet handler.
self.host.on_packet = saved_on_packet # type: ignore
# Reset
self.reset_complete.clear()
self.host.send_hci_packet(
HCI_Intel_Reset_Command(
reset_type=0x00,
patch_enable=0x01,
ddc_reload=0x00,
boot_option=0x01,
boot_address=boot_address,
) )
) )
logger.debug("waiting for reset completion")
await self.reset_complete.wait()
logger.debug("reset complete")
# Load the device config if there is one.
if self.ddc_override:
logger.debug("loading overridden DDC")
await self.load_device_config(self.ddc_override)
else:
ddc_name = f"{firmware_base_name}.ddc"
ddc_path = _find_binary_path(ddc_name)
if ddc_path:
logger.debug(f"loading DDC from {ddc_path}")
ddc_data = ddc_path.read_bytes()
await self.load_device_config(ddc_data)
if self.ddc_addon:
logger.debug("loading DDC addon")
await self.load_device_config(self.ddc_addon)
async def load_device_config(self, ddc_data: bytes) -> None:
while ddc_data:
ddc_len = 1 + ddc_data[0]
ddc_payload = ddc_data[:ddc_len]
await self.host.send_command(
Hci_Intel_Write_Device_Config_Command(data=ddc_payload)
)
ddc_data = ddc_data[ddc_len:]
async def reboot_bootloader(self) -> None:
self.host.send_hci_packet(
HCI_Intel_Reset_Command(
reset_type=0x01,
patch_enable=0x01,
ddc_reload=0x01,
boot_option=0x00,
boot_address=0,
)
)
await asyncio.sleep(_POST_RESET_DELAY)
async def read_device_info(self) -> dict[ValueType, Any]:
self.host.ready = True
response = await self.host.send_command(hci.HCI_Reset_Command())
if not (
isinstance(response, hci.HCI_Command_Complete_Event)
and response.return_parameters
in (hci.HCI_UNKNOWN_HCI_COMMAND_ERROR, hci.HCI_SUCCESS)
):
# When the controller is in operational mode, the response is a
# successful response.
# When the controller is in bootloader mode,
# HCI_UNKNOWN_HCI_COMMAND_ERROR is the expected response. Anything
# else is a failure.
logger.warning(f"unexpected response: {response}")
raise DriverError("unexpected HCI response")
# Read the firmware version.
response = await self.host.send_command(
HCI_Intel_Read_Version_Command(param0=0xFF)
)
if not isinstance(response, hci.HCI_Command_Complete_Event):
raise DriverError("unexpected HCI response")
if response.return_parameters.status != 0: # type: ignore
raise DriverError("HCI_Intel_Read_Version_Command error")
tlvs = _parse_tlv(response.return_parameters.tlv) # type: ignore
# Convert the list to a dict. That's Ok here because we only expect each type
# to appear just once.
return dict(tlvs)
async def init_controller(self):
await self.load_firmware()

View File

@@ -5068,6 +5068,7 @@ class HCI_Event(HCI_Packet):
hci_packet_type = HCI_EVENT_PACKET hci_packet_type = HCI_EVENT_PACKET
event_names: Dict[int, str] = {} event_names: Dict[int, str] = {}
event_classes: Dict[int, Type[HCI_Event]] = {} event_classes: Dict[int, Type[HCI_Event]] = {}
vendor_factory: Optional[Callable[[bytes], Optional[HCI_Event]]] = None
@staticmethod @staticmethod
def event(fields=()): def event(fields=()):
@@ -5125,37 +5126,41 @@ class HCI_Event(HCI_Packet):
return event_class return event_class
@staticmethod @classmethod
def from_bytes(packet: bytes) -> HCI_Event: def from_bytes(cls, packet: bytes) -> HCI_Event:
event_code = packet[1] event_code = packet[1]
length = packet[2] length = packet[2]
parameters = packet[3:] parameters = packet[3:]
if len(parameters) != length: if len(parameters) != length:
raise InvalidPacketError('invalid packet length') raise InvalidPacketError('invalid packet length')
cls: Any subclass: Any
if event_code == HCI_LE_META_EVENT: if event_code == HCI_LE_META_EVENT:
# We do this dispatch here and not in the subclass in order to avoid call # We do this dispatch here and not in the subclass in order to avoid call
# loops # loops
subevent_code = parameters[0] subevent_code = parameters[0]
cls = HCI_LE_Meta_Event.subevent_classes.get(subevent_code) subclass = HCI_LE_Meta_Event.subevent_classes.get(subevent_code)
if cls is None: if subclass is None:
# No class registered, just use a generic class instance # No class registered, just use a generic class instance
return HCI_LE_Meta_Event(subevent_code, parameters) return HCI_LE_Meta_Event(subevent_code, parameters)
elif event_code == HCI_VENDOR_EVENT: elif event_code == HCI_VENDOR_EVENT:
subevent_code = parameters[0] # Invoke all the registered factories to see if any of them can handle
cls = HCI_Vendor_Event.subevent_classes.get(subevent_code) # the event
if cls is None: if cls.vendor_factory:
# No class registered, just use a generic class instance if event := cls.vendor_factory(parameters):
return HCI_Vendor_Event(subevent_code, parameters) return event
# No factory, or the factory could not create an instance,
# return a generic vendor event
return HCI_Event(event_code, parameters)
else: else:
cls = HCI_Event.event_classes.get(event_code) subclass = HCI_Event.event_classes.get(event_code)
if cls is None: if subclass is None:
# No class registered, just use a generic class instance # No class registered, just use a generic class instance
return HCI_Event(event_code, parameters) return HCI_Event(event_code, parameters)
# Invoke the factory to create a new instance # Invoke the factory to create a new instance
return cls.from_parameters(parameters) # type: ignore return subclass.from_parameters(parameters) # type: ignore
@classmethod @classmethod
def from_parameters(cls, parameters): def from_parameters(cls, parameters):
@@ -5198,11 +5203,11 @@ HCI_Event.register_events(globals())
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class HCI_Extended_Event(HCI_Event): class HCI_Extended_Event(HCI_Event):
''' '''
HCI_Event subclass for events that has a subevent code. HCI_Event subclass for events that have a subevent code.
''' '''
subevent_names: Dict[int, str] = {} subevent_names: Dict[int, str] = {}
subevent_classes: Dict[int, Type[HCI_Extended_Event]] subevent_classes: Dict[int, Type[HCI_Extended_Event]] = {}
@classmethod @classmethod
def event(cls, fields=()): def event(cls, fields=()):
@@ -5253,7 +5258,22 @@ class HCI_Extended_Event(HCI_Event):
cls.subevent_names.update(cls.subevent_map(symbols)) cls.subevent_names.update(cls.subevent_map(symbols))
@classmethod @classmethod
def from_parameters(cls, parameters): def subclass_from_parameters(
cls, parameters: bytes
) -> Optional[HCI_Extended_Event]:
"""
Factory method that parses the subevent code, finds a registered subclass,
and creates an instance if found.
"""
subevent_code = parameters[0]
if subclass := cls.subevent_classes.get(subevent_code):
return subclass.from_parameters(parameters)
return None
@classmethod
def from_parameters(cls, parameters: bytes) -> HCI_Extended_Event:
"""Factory method for subclasses (the subevent code has already been parsed)"""
self = cls.__new__(cls) self = cls.__new__(cls)
HCI_Extended_Event.__init__(self, self.subevent_code, parameters) HCI_Extended_Event.__init__(self, self.subevent_code, parameters)
if fields := getattr(self, 'fields', None): if fields := getattr(self, 'fields', None):
@@ -5294,12 +5314,6 @@ class HCI_LE_Meta_Event(HCI_Extended_Event):
HCI_LE_Meta_Event.register_subevents(globals()) HCI_LE_Meta_Event.register_subevents(globals())
# -----------------------------------------------------------------------------
class HCI_Vendor_Event(HCI_Extended_Event):
event_code: int = HCI_VENDOR_EVENT
subevent_classes = {}
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event( @HCI_LE_Meta_Event.event(
[ [
@@ -6173,8 +6187,9 @@ class HCI_Command_Complete_Event(HCI_Event):
See Bluetooth spec @ 7.7.14 Command Complete Event See Bluetooth spec @ 7.7.14 Command Complete Event
''' '''
return_parameters = b'' num_hci_command_packets: int
command_opcode: int command_opcode: int
return_parameters = b''
def map_return_parameters(self, return_parameters): def map_return_parameters(self, return_parameters):
'''Map simple 'status' return parameters to their named constant form''' '''Map simple 'status' return parameters to their named constant form'''
@@ -6710,6 +6725,14 @@ class HCI_Remote_Host_Supported_Features_Notification_Event(HCI_Event):
''' '''
# -----------------------------------------------------------------------------
@HCI_Event.event([('data', "*")])
class HCI_Vendor_Event(HCI_Event):
'''
See Bluetooth spec @ 5.4.4 HCI Event packet
'''
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class HCI_AclDataPacket(HCI_Packet): class HCI_AclDataPacket(HCI_Packet):
''' '''

View File

@@ -552,7 +552,7 @@ class Host(AbortableEventEmitter):
return response return response
except Exception as error: except Exception as error:
logger.warning( logger.exception(
f'{color("!!! Exception while sending command:", "red")} {error}' f'{color("!!! Exception while sending command:", "red")} {error}'
) )
raise error raise error
@@ -1248,3 +1248,6 @@ class Host(AbortableEventEmitter):
event.connection_handle, event.connection_handle,
int.from_bytes(event.le_features, 'little'), int.from_bytes(event.le_features, 'little'),
) )
def on_hci_vendor_event(self, event):
self.emit('vendor_event', event)

View File

@@ -149,7 +149,10 @@ async def open_usb_transport(spec: str) -> Transport:
if status != usb1.TRANSFER_COMPLETED: if status != usb1.TRANSFER_COMPLETED:
logger.warning( logger.warning(
color(f'!!! OUT transfer not completed: status={status}', 'red') color(
f'!!! OUT transfer not completed: status={status}',
'red',
)
) )
async def process_queue(self): async def process_queue(self):
@@ -275,7 +278,10 @@ async def open_usb_transport(spec: str) -> Transport:
) )
else: else:
logger.warning( logger.warning(
color(f'!!! IN transfer not completed: status={status}', 'red') color(
f'!!! IN[{packet_type}] transfer not completed: status={status}',
'red',
)
) )
self.loop.call_soon_threadsafe(self.on_transport_lost) self.loop.call_soon_threadsafe(self.on_transport_lost)

View File

@@ -16,6 +16,7 @@
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
import struct import struct
from typing import Dict, Optional, Type
from bumble.hci import ( from bumble.hci import (
name_or_number, name_or_number,
@@ -24,7 +25,9 @@ from bumble.hci import (
HCI_Constant, HCI_Constant,
HCI_Object, HCI_Object,
HCI_Command, HCI_Command,
HCI_Vendor_Event, HCI_Event,
HCI_Extended_Event,
HCI_VENDOR_EVENT,
STATUS_SPEC, STATUS_SPEC,
) )
@@ -48,7 +51,6 @@ HCI_DYNAMIC_AUDIO_BUFFER_COMMAND = hci_vendor_command_op_code(0x15F)
HCI_BLUETOOTH_QUALITY_REPORT_EVENT = 0x58 HCI_BLUETOOTH_QUALITY_REPORT_EVENT = 0x58
HCI_Command.register_commands(globals()) HCI_Command.register_commands(globals())
HCI_Vendor_Event.register_subevents(globals())
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -279,7 +281,29 @@ class HCI_Dynamic_Audio_Buffer_Command(HCI_Command):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_Vendor_Event.event( class HCI_Android_Vendor_Event(HCI_Extended_Event):
event_code: int = HCI_VENDOR_EVENT
subevent_classes: Dict[int, Type[HCI_Extended_Event]] = {}
@classmethod
def subclass_from_parameters(
cls, parameters: bytes
) -> Optional[HCI_Extended_Event]:
subevent_code = parameters[0]
if subevent_code == HCI_BLUETOOTH_QUALITY_REPORT_EVENT:
quality_report_id = parameters[1]
if quality_report_id in (0x01, 0x02, 0x03, 0x04, 0x07, 0x08, 0x09):
return HCI_Bluetooth_Quality_Report_Event.from_parameters(parameters)
return None
HCI_Android_Vendor_Event.register_subevents(globals())
HCI_Event.vendor_factory = HCI_Android_Vendor_Event.subclass_from_parameters
# -----------------------------------------------------------------------------
@HCI_Extended_Event.event(
fields=[ fields=[
('quality_report_id', 1), ('quality_report_id', 1),
('packet_types', 1), ('packet_types', 1),
@@ -308,10 +332,11 @@ class HCI_Dynamic_Audio_Buffer_Command(HCI_Command):
('tx_last_subevent_packets', 4), ('tx_last_subevent_packets', 4),
('crc_error_packets', 4), ('crc_error_packets', 4),
('rx_duplicate_packets', 4), ('rx_duplicate_packets', 4),
('rx_unreceived_packets', 4),
('vendor_specific_parameters', '*'), ('vendor_specific_parameters', '*'),
] ]
) )
class HCI_Bluetooth_Quality_Report_Event(HCI_Vendor_Event): class HCI_Bluetooth_Quality_Report_Event(HCI_Android_Vendor_Event):
# pylint: disable=line-too-long # pylint: disable=line-too-long
''' '''
See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#bluetooth-quality-report-sub-event See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#bluetooth-quality-report-sub-event

View File

@@ -16,4 +16,5 @@ USB vendor ID and product ID.
Drivers included in the module are: Drivers included in the module are:
* [Realtek](realtek.md): Loading of Firmware and Config for Realtek USB dongles. * [Realtek](realtek.md): Loading of Firmware and Config for Realtek USB dongles.
* [Intel](intel.md): Loading of Firmware and Config for Intel USB controllers.

View File

@@ -0,0 +1,73 @@
INTEL DRIVER
==============
This driver supports loading firmware images and optional config data to
Intel USB controllers.
A number of USB dongles are supported, but likely not all.
The initial implementation has been tested on BE200 and AX210 controllers.
When using a USB controller, the USB product ID and vendor ID are used
to find whether a matching set of firmware image and config data
is needed for that specific model. If a match exists, the driver will try
load the firmware image and, if needed, config data.
Alternatively, the metadata property ``driver=intel`` may be specified in a transport
name to force that driver to be used (ex: ``usb:[driver=intel]0`` instead of just
``usb:0`` for the first USB device).
The driver will look for the firmware and config files by name, in order, in:
* The directory specified by the environment variable `BUMBLE_INTEL_FIRMWARE_DIR`
if set.
* The directory `<package-dir>/drivers/intel_fw` where `<package-dir>` is the directory
where the `bumble` package is installed.
* The current directory.
It is also possible to override or extend the config data with parameters passed via the
transport name. The driver name `intel` may be suffixed with `/<param:value>[+<param:value>]...`
The supported params are:
* `ddc_addon`: configuration data to add to the data loaded from the config data file
* `ddc_override`: configuration data to use instead of the data loaded from the config data file.
With both `dcc_addon` and `dcc_override`, the param value is a hex-encoded byte array containing
the config data (same format as the config file).
Example transport name:
`usb:[driver=intel/dcc_addon:03E40200]0`
Obtaining Firmware Images and Config Data
-----------------------------------------
Firmware images and config data may be obtained from a variety of online
sources.
To facilitate finding a downloading the, the utility program `bumble-intel-fw-download`
may be used.
```
Usage: bumble-intel-fw-download [OPTIONS]
Download Intel firmware images and configs.
Options:
--output-dir TEXT Output directory where the files will be saved.
Defaults to the OS-specific app data dir, which the
driver will check when trying to find firmware
--source [linux-kernel] [default: linux-kernel]
--single TEXT Only download a single image set, by its base name
--force Overwrite files if they already exist
--help Show this message and exit.
```
Utility
-------
The `bumble-intel-util` utility may be used to interact with an Intel USB controller.
```
Usage: bumble-intel-util [OPTIONS] COMMAND [ARGS]...
Options:
--help Show this message and exit.
Commands:
bootloader Reboot in bootloader mode.
info Get the firmware info.
load Load a firmware image.
```

View File

@@ -75,6 +75,8 @@ console_scripts =
bumble-pandora-server = bumble.apps.pandora_server:main bumble-pandora-server = bumble.apps.pandora_server:main
bumble-rtk-util = bumble.tools.rtk_util:main bumble-rtk-util = bumble.tools.rtk_util:main
bumble-rtk-fw-download = bumble.tools.rtk_fw_download:main bumble-rtk-fw-download = bumble.tools.rtk_fw_download:main
bumble-intel-util = bumble.tools.intel_util:main
bumble-intel-fw-download = bumble.tools.intel_fw_download:main
[options.package_data] [options.package_data]
* = py.typed, *.pyi * = py.typed, *.pyi

130
tools/intel_fw_download.py Normal file
View File

@@ -0,0 +1,130 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
import pathlib
import urllib.request
import urllib.error
import click
from bumble.colors import color
from bumble.drivers import intel
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
LINUX_KERNEL_GIT_SOURCE = "https://git.kernel.org/pub/scm/linux/kernel/git/firmware/linux-firmware.git/plain/intel"
# -----------------------------------------------------------------------------
# Functions
# -----------------------------------------------------------------------------
def download_file(base_url, name):
url = f"{base_url}/{name}"
with urllib.request.urlopen(url) as file:
data = file.read()
print(f"Downloaded {name}: {len(data)} bytes")
return data
# -----------------------------------------------------------------------------
@click.command
@click.option(
"--output-dir",
default="",
help="Output directory where the files will be saved. Defaults to the OS-specific"
"app data dir, which the driver will check when trying to find firmware",
show_default=True,
)
@click.option(
"--source",
type=click.Choice(["linux-kernel"]),
default="linux-kernel",
show_default=True,
)
@click.option("--single", help="Only download a single image set, by its base name")
@click.option("--force", is_flag=True, help="Overwrite files if they already exist")
def main(output_dir, source, single, force):
"""Download Intel firmware images and configs."""
# Check that the output dir exists
if output_dir == '':
output_dir = intel.intel_firmware_dir()
else:
output_dir = pathlib.Path(output_dir)
if not output_dir.is_dir():
print("Output dir does not exist or is not a directory")
return
base_url = {
"linux-kernel": LINUX_KERNEL_GIT_SOURCE,
}[source]
print("Downloading")
print(color("FROM:", "green"), base_url)
print(color("TO:", "green"), output_dir)
if single:
images = [(f"{single}.sfi", f"{single}.ddc")]
else:
images = [
(f"{base_name}.sfi", f"{base_name}.ddc")
for base_name in intel.INTEL_FW_IMAGE_NAMES
]
for fw_name, config_name in images:
print(color("---", "yellow"))
fw_image_out = output_dir / fw_name
if not force and fw_image_out.exists():
print(color(f"{fw_image_out} already exists, skipping", "red"))
continue
if config_name:
config_image_out = output_dir / config_name
if not force and config_image_out.exists():
print(color("f{config_image_out} already exists, skipping", "red"))
continue
try:
fw_image = download_file(base_url, fw_name)
except urllib.error.HTTPError as error:
print(f"Failed to download {fw_name}: {error}")
continue
config_image = None
if config_name:
try:
config_image = download_file(base_url, config_name)
except urllib.error.HTTPError as error:
print(f"Failed to download {config_name}: {error}")
continue
fw_image_out.write_bytes(fw_image)
if config_image:
config_image_out.write_bytes(config_image)
# -----------------------------------------------------------------------------
if __name__ == '__main__':
main()

154
tools/intel_util.py Normal file
View File

@@ -0,0 +1,154 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
import asyncio
import os
from typing import Any, Optional
import click
from bumble.colors import color
from bumble import transport
from bumble.drivers import intel
from bumble.host import Host
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
def print_device_info(device_info: dict[intel.ValueType, Any]) -> None:
if (mode := device_info.get(intel.ValueType.CURRENT_MODE_OF_OPERATION)) is not None:
print(
color("MODE:", "yellow"),
mode.name,
)
print(color("DETAILS:", "yellow"))
for key, value in device_info.items():
print(f" {color(key.name, 'green')}: {value}")
# -----------------------------------------------------------------------------
async def get_driver(host: Host, force: bool) -> Optional[intel.Driver]:
# Create a driver
driver = await intel.Driver.for_host(host, force)
if driver is None:
print("Device does not appear to be an Intel device")
return None
return driver
# -----------------------------------------------------------------------------
async def do_info(usb_transport, force):
async with await transport.open_transport(usb_transport) as (
hci_source,
hci_sink,
):
host = Host(hci_source, hci_sink)
driver = await get_driver(host, force)
if driver is None:
return
# Get and print the device info
print_device_info(await driver.read_device_info())
# -----------------------------------------------------------------------------
async def do_load(usb_transport: str, force: bool) -> None:
async with await transport.open_transport(usb_transport) as (
hci_source,
hci_sink,
):
host = Host(hci_source, hci_sink)
driver = await get_driver(host, force)
if driver is None:
return
# Reboot in bootloader mode
await driver.load_firmware()
# Get and print the device info
print_device_info(await driver.read_device_info())
# -----------------------------------------------------------------------------
async def do_bootloader(usb_transport: str, force: bool) -> None:
async with await transport.open_transport(usb_transport) as (
hci_source,
hci_sink,
):
host = Host(hci_source, hci_sink)
driver = await get_driver(host, force)
if driver is None:
return
# Reboot in bootloader mode
await driver.reboot_bootloader()
# -----------------------------------------------------------------------------
@click.group()
def main():
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
@main.command
@click.argument("usb_transport")
@click.option(
"--force",
is_flag=True,
default=False,
help="Try to get the device info even if the USB info doesn't match",
)
def info(usb_transport, force):
"""Get the firmware info."""
asyncio.run(do_info(usb_transport, force))
@main.command
@click.argument("usb_transport")
@click.option(
"--force",
is_flag=True,
default=False,
help="Load even if the USB info doesn't match",
)
def load(usb_transport, force):
"""Load a firmware image."""
asyncio.run(do_load(usb_transport, force))
@main.command
@click.argument("usb_transport")
@click.option(
"--force",
is_flag=True,
default=False,
help="Attempt to reboot event if the USB info doesn't match",
)
def bootloader(usb_transport, force):
"""Reboot in bootloader mode."""
asyncio.run(do_bootloader(usb_transport, force))
# -----------------------------------------------------------------------------
if __name__ == '__main__':
main()