diff --git a/apps/bench.py b/apps/bench.py index 6e6df3a0..67299dbc 100644 --- a/apps/bench.py +++ b/apps/bench.py @@ -199,7 +199,7 @@ def log_stats(title, stats, precision=2): stats_min = min(stats) stats_max = max(stats) stats_avg = statistics.mean(stats) - stats_stdev = statistics.stdev(stats) + stats_stdev = statistics.stdev(stats) if len(stats) >= 2 else 0 logging.info( color( ( @@ -468,6 +468,7 @@ class Ping: for run in range(self.repeat + 1): self.done.clear() + self.ping_times = [] if run > 0 and self.repeat and self.repeat_delay: logging.info(color(f'*** Repeat delay: {self.repeat_delay}', 'green')) diff --git a/apps/show.py b/apps/show.py index 97640a37..8c386817 100644 --- a/apps/show.py +++ b/apps/show.py @@ -144,18 +144,18 @@ class Printer: help='Format of the input file', ) @click.option( - '--vendors', + '--vendor', type=click.Choice(['android', 'zephyr']), multiple=True, help='Support vendor-specific commands (list one or more)', ) @click.argument('filename') # pylint: disable=redefined-builtin -def main(format, vendors, filename): - for vendor in vendors: - if vendor == 'android': +def main(format, vendor, filename): + for vendor_name in vendor: + if vendor_name == 'android': import bumble.vendor.android.hci - elif vendor == 'zephyr': + elif vendor_name == 'zephyr': import bumble.vendor.zephyr.hci input = open(filename, 'rb') @@ -180,7 +180,7 @@ def main(format, vendors, filename): else: printer.print(color("[TRUNCATED]", "red")) except Exception as error: - logger.exception() + logger.exception('') print(color(f'!!! {error}', 'red')) diff --git a/bumble/drivers/common.py b/bumble/drivers/common.py index a4c0427c..42a545c4 100644 --- a/bumble/drivers/common.py +++ b/bumble/drivers/common.py @@ -20,6 +20,8 @@ Common types for drivers. # ----------------------------------------------------------------------------- import abc +from bumble import core + # ----------------------------------------------------------------------------- # Classes diff --git a/bumble/drivers/intel.py b/bumble/drivers/intel.py index e613c1e5..cdb7e72e 100644 --- a/bumble/drivers/intel.py +++ b/bumble/drivers/intel.py @@ -11,18 +11,33 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +""" +Support for Intel USB controllers. +Loosely based on the Fuchsia OS implementation. +""" # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- +from __future__ import annotations +import asyncio +import collections +import dataclasses import logging +import os +import pathlib +import platform +import struct +from typing import Any, Deque, Optional, TYPE_CHECKING +from bumble import core from bumble.drivers import common -from bumble.hci import ( - hci_vendor_command_op_code, # type: ignore - HCI_Command, - HCI_Reset_Command, -) +from bumble import hci +from bumble import utils + +if TYPE_CHECKING: + from bumble.host import Host + # ----------------------------------------------------------------------------- # Logging @@ -34,39 +49,328 @@ logger = logging.getLogger(__name__) # ----------------------------------------------------------------------------- INTEL_USB_PRODUCTS = { - # Intel AX210 - (0x8087, 0x0032), - # Intel BE200 - (0x8087, 0x0036), + (0x8087, 0x0032), # AX210 + (0x8087, 0x0036), # BE200 } +INTEL_FW_IMAGE_NAMES = [ + "ibt-0040-0041", + "ibt-0040-1020", + "ibt-0040-1050", + "ibt-0040-2120", + "ibt-0040-4150", + "ibt-0041-0041", + "ibt-0180-0041", + "ibt-0180-1050", + "ibt-0180-4150", + "ibt-0291-0291", + "ibt-1040-0041", + "ibt-1040-1020", + "ibt-1040-1050", + "ibt-1040-2120", + "ibt-1040-4150", +] + +INTEL_FIRMWARE_DIR_ENV = "BUMBLE_INTEL_FIRMWARE_DIR" +INTEL_LINUX_FIRMWARE_DIR = "/lib/firmware/intel" + +_MAX_FRAGMENT_SIZE = 252 +_POST_RESET_DELAY = 0.2 + # ----------------------------------------------------------------------------- # HCI Commands # ----------------------------------------------------------------------------- -HCI_INTEL_DDC_CONFIG_WRITE_COMMAND = hci_vendor_command_op_code(0xFC8B) # type: ignore -HCI_INTEL_DDC_CONFIG_WRITE_PAYLOAD = [0x03, 0xE4, 0x02, 0x00] +HCI_INTEL_WRITE_DEVICE_CONFIG_COMMAND = hci.hci_vendor_command_op_code(0x008B) +HCI_INTEL_READ_VERSION_COMMAND = hci.hci_vendor_command_op_code(0x0005) +HCI_INTEL_RESET_COMMAND = hci.hci_vendor_command_op_code(0x0001) +HCI_INTEL_SECURE_SEND_COMMAND = hci.hci_vendor_command_op_code(0x0009) +HCI_INTEL_WRITE_BOOT_PARAMS_COMMAND = hci.hci_vendor_command_op_code(0x000E) -HCI_Command.register_commands(globals()) +hci.HCI_Command.register_commands(globals()) -@HCI_Command.command( # type: ignore - fields=[("params", "*")], +@hci.HCI_Command.command( + fields=[ + ("param0", 1), + ], return_parameters_fields=[ - ("params", "*"), + ("status", hci.STATUS_SPEC), + ("tlv", "*"), ], ) -class Hci_Intel_DDC_Config_Write_Command(HCI_Command): +class HCI_Intel_Read_Version_Command(hci.HCI_Command): pass +@hci.HCI_Command.command( + fields=[("data_type", 1), ("data", "*")], + return_parameters_fields=[ + ("status", 1), + ], +) +class Hci_Intel_Secure_Send_Command(hci.HCI_Command): + pass + + +@hci.HCI_Command.command( + fields=[ + ("reset_type", 1), + ("patch_enable", 1), + ("ddc_reload", 1), + ("boot_option", 1), + ("boot_address", 4), + ], + return_parameters_fields=[ + ("data", "*"), + ], +) +class HCI_Intel_Reset_Command(hci.HCI_Command): + pass + + +@hci.HCI_Command.command( + fields=[("data", "*")], + return_parameters_fields=[ + ("status", hci.STATUS_SPEC), + ("params", "*"), + ], +) +class Hci_Intel_Write_Device_Config_Command(hci.HCI_Command): + pass + + +# ----------------------------------------------------------------------------- +# Functions +# ----------------------------------------------------------------------------- +def intel_firmware_dir() -> pathlib.Path: + """ + Returns: + A path to a subdir of the project data dir for Intel firmware. + The directory is created if it doesn't exist. + """ + from bumble.drivers import project_data_dir + + p = project_data_dir() / "firmware" / "intel" + p.mkdir(parents=True, exist_ok=True) + return p + + +def _find_binary_path(file_name: str) -> pathlib.Path | None: + # First check if an environment variable is set + if INTEL_FIRMWARE_DIR_ENV in os.environ: + if ( + path := pathlib.Path(os.environ[INTEL_FIRMWARE_DIR_ENV]) / file_name + ).is_file(): + logger.debug(f"{file_name} found in env dir") + return path + + # When the environment variable is set, don't look elsewhere + return None + + # Then, look where the firmware download tool writes by default + if (path := intel_firmware_dir() / file_name).is_file(): + logger.debug(f"{file_name} found in project data dir") + return path + + # Then, look in the package's driver directory + if (path := pathlib.Path(__file__).parent / "intel_fw" / file_name).is_file(): + logger.debug(f"{file_name} found in package dir") + return path + + # On Linux, check the system's FW directory + if ( + platform.system() == "Linux" + and (path := pathlib.Path(INTEL_LINUX_FIRMWARE_DIR) / file_name).is_file() + ): + logger.debug(f"{file_name} found in Linux system FW dir") + return path + + # Finally look in the current directory + if (path := pathlib.Path.cwd() / file_name).is_file(): + logger.debug(f"{file_name} found in CWD") + return path + + return None + + +def _parse_tlv(data: bytes) -> list[tuple[ValueType, Any]]: + result: list[tuple[ValueType, Any]] = [] + while len(data) >= 2: + value_type = ValueType(data[0]) + value_length = data[1] + value = data[2 : 2 + value_length] + typed_value: Any + + if value_type == ValueType.END: + break + + if value_type in (ValueType.CNVI, ValueType.CNVR): + (v,) = struct.unpack("> 0) & 0xF) << 12) + | (((v >> 4) & 0xF) << 0) + | (((v >> 8) & 0xF) << 4) + | (((v >> 24) & 0xF) << 8) + ) + elif value_type == ValueType.HARDWARE_INFO: + (v,) = struct.unpack("> 8) & 0xFF), HardwareVariant((v >> 16) & 0x3F) + ) + elif value_type in ( + ValueType.USB_VENDOR_ID, + ValueType.USB_PRODUCT_ID, + ValueType.DEVICE_REVISION, + ): + (typed_value,) = struct.unpack(" None: + super().__init__(message) + self.message = message + + def __str__(self) -> str: + return f"IntelDriverError({self.message})" + + +class ValueType(utils.OpenIntEnum): + END = 0x00 + CNVI = 0x10 + CNVR = 0x11 + HARDWARE_INFO = 0x12 + DEVICE_REVISION = 0x16 + CURRENT_MODE_OF_OPERATION = 0x1C + USB_VENDOR_ID = 0x17 + USB_PRODUCT_ID = 0x18 + TIMESTAMP = 0x1D + BUILD_TYPE = 0x1E + BUILD_NUMBER = 0x1F + SECURE_BOOT = 0x28 + OTP_LOCK = 0x2A + API_LOCK = 0x2B + DEBUG_LOCK = 0x2C + FIRMWARE_BUILD = 0x2D + SECURE_BOOT_ENGINE_TYPE = 0x2F + BLUETOOTH_ADDRESS = 0x30 + + +class HardwarePlatform(utils.OpenIntEnum): + INTEL_37 = 0x37 + + +class HardwareVariant(utils.OpenIntEnum): + # This is a just a partial list. + # Add other constants here as new hardware is encountered and tested. + TYPHOON_PEAK = 0x17 + GALE_PEAK = 0x1C + + +@dataclasses.dataclass +class HardwareInfo: + platform: HardwarePlatform + variant: HardwareVariant + + +@dataclasses.dataclass +class Timestamp: + week: int + year: int + + +@dataclasses.dataclass +class FirmwareBuild: + build_number: int + timestamp: Timestamp + + +class ModeOfOperation(utils.OpenIntEnum): + BOOTLOADER = 0x01 + INTERMEDIATE = 0x02 + OPERATIONAL = 0x03 + + +class SecureBootEngineType(utils.OpenIntEnum): + RSA = 0x00 + ECDSA = 0x01 + + +@dataclasses.dataclass +class BootParams: + css_header_offset: int + css_header_size: int + pki_offset: int + pki_size: int + sig_offset: int + sig_size: int + write_offset: int + + +_BOOT_PARAMS = { + SecureBootEngineType.RSA: BootParams(0, 128, 128, 256, 388, 256, 964), + SecureBootEngineType.ECDSA: BootParams(644, 128, 772, 96, 868, 96, 964), +} + + class Driver(common.Driver): - def __init__(self, host): + def __init__(self, host: Host) -> None: self.host = host + self.max_in_flight_firmware_load_commands = 1 + self.pending_firmware_load_commands: Deque[hci.HCI_Command] = ( + collections.deque() + ) + self.can_send_firmware_load_command = asyncio.Event() + self.can_send_firmware_load_command.set() + self.firmware_load_complete = asyncio.Event() + self.reset_complete = asyncio.Event() + + # Parse configuration options from the driver name. + self.ddc_addon: Optional[bytes] = None + self.ddc_override: Optional[bytes] = None + driver = host.hci_metadata.get("driver") + if driver is not None and driver.startswith("intel/"): + for key, value in [ + key_eq_value.split(":") for key_eq_value in driver[6:].split("+") + ]: + if key == "ddc_addon": + self.ddc_addon = bytes.fromhex(value) + elif key == "ddc_override": + self.ddc_override = bytes.fromhex(value) @staticmethod - def check(host): + def check(host: Host) -> bool: driver = host.hci_metadata.get("driver") - if driver == "intel": + if driver == "intel" or driver is not None and driver.startswith("intel/"): return True vendor_id = host.hci_metadata.get("vendor_id") @@ -85,18 +389,283 @@ class Driver(common.Driver): return True @classmethod - async def for_host(cls, host, force=False): # type: ignore + async def for_host(cls, host: Host, force: bool = False): # Only instantiate this driver if explicitly selected if not force and not cls.check(host): return None return cls(host) - async def init_controller(self): + def on_packet(self, packet: bytes) -> None: + """Handler for event packets that are received from an ACL channel""" + event = hci.HCI_Event.from_bytes(packet) + + if not isinstance(event, hci.HCI_Command_Complete_Event): + self.host.on_hci_event_packet(event) + return + + if not event.return_parameters == hci.HCI_SUCCESS: + raise DriverError("HCI_Command_Complete_Event error") + + if self.max_in_flight_firmware_load_commands != event.num_hci_command_packets: + logger.debug( + "max_in_flight_firmware_load_commands update: " + f"{event.num_hci_command_packets}" + ) + self.max_in_flight_firmware_load_commands = event.num_hci_command_packets + logger.debug(f"event: {event}") + self.pending_firmware_load_commands.popleft() + in_flight = len(self.pending_firmware_load_commands) + logger.debug(f"event received, {in_flight} still in flight") + if in_flight < self.max_in_flight_firmware_load_commands: + self.can_send_firmware_load_command.set() + + async def send_firmware_load_command(self, command: hci.HCI_Command) -> None: + # Wait until we can send. + await self.can_send_firmware_load_command.wait() + + # Send the command and adjust counters. + self.host.send_hci_packet(command) + self.pending_firmware_load_commands.append(command) + in_flight = len(self.pending_firmware_load_commands) + if in_flight >= self.max_in_flight_firmware_load_commands: + logger.debug(f"max commands in flight reached [{in_flight}]") + self.can_send_firmware_load_command.clear() + + async def send_firmware_data(self, data_type: int, data: bytes) -> None: + while data: + fragment_size = min(len(data), _MAX_FRAGMENT_SIZE) + fragment = data[:fragment_size] + data = data[fragment_size:] + + await self.send_firmware_load_command( + Hci_Intel_Secure_Send_Command(data_type=data_type, data=fragment) + ) + + async def load_firmware(self) -> None: self.host.ready = True - await self.host.send_command(HCI_Reset_Command(), check_result=True) - await self.host.send_command( - Hci_Intel_DDC_Config_Write_Command( - params=HCI_INTEL_DDC_CONFIG_WRITE_PAYLOAD + device_info = await self.read_device_info() + logger.debug( + "device info: \n%s", + "\n".join( + [ + f" {value_type.name}: {value}" + for value_type, value in device_info.items() + ] + ), + ) + + # Check if the firmware is already loaded. + if ( + device_info.get(ValueType.CURRENT_MODE_OF_OPERATION) + == ModeOfOperation.OPERATIONAL + ): + logger.debug("firmware already loaded") + return + + # We only support some platforms and variants. + hardware_info = device_info.get(ValueType.HARDWARE_INFO) + if hardware_info is None: + raise DriverError("hardware info missing") + if hardware_info.platform != HardwarePlatform.INTEL_37: + raise DriverError("hardware platform not supported") + if hardware_info.variant not in ( + HardwareVariant.TYPHOON_PEAK, + HardwareVariant.GALE_PEAK, + ): + raise DriverError("hardware variant not supported") + + # Compute the firmware name. + if ValueType.CNVI not in device_info or ValueType.CNVR not in device_info: + raise DriverError("insufficient device info, missing CNVI or CNVR") + + firmware_base_name = ( + "ibt-" + f"{device_info[ValueType.CNVI]:04X}-" + f"{device_info[ValueType.CNVR]:04X}" + ) + logger.debug(f"FW base name: {firmware_base_name}") + + firmware_name = f"{firmware_base_name}.sfi" + firmware_path = _find_binary_path(firmware_name) + if not firmware_path: + logger.warning(f"Firmware file {firmware_name} not found") + logger.warning("See https://google.github.io/bumble/drivers/intel.html") + return None + logger.debug(f"loading firmware from {firmware_path}") + firmware_image = firmware_path.read_bytes() + + engine_type = device_info.get(ValueType.SECURE_BOOT_ENGINE_TYPE) + if engine_type is None: + raise DriverError("secure boot engine type missing") + if engine_type not in _BOOT_PARAMS: + raise DriverError("secure boot engine type not supported") + + boot_params = _BOOT_PARAMS[engine_type] + if len(firmware_image) < boot_params.write_offset: + raise DriverError("firmware image too small") + + # Register to receive vendor events. + def on_vendor_event(event: hci.HCI_Vendor_Event): + logger.debug(f"vendor event: {event}") + event_type = event.parameters[0] + if event_type == 0x02: + # Boot event + logger.debug("boot complete") + self.reset_complete.set() + elif event_type == 0x06: + # Firmware load event + logger.debug("download complete") + self.firmware_load_complete.set() + else: + logger.debug(f"ignoring vendor event type {event_type}") + + self.host.on("vendor_event", on_vendor_event) + + # We need to temporarily intercept packets from the controller, + # because they are formatted as HCI event packets but are received + # on the ACL channel, so the host parser would get confused. + saved_on_packet = self.host.on_packet + self.host.on_packet = self.on_packet # type: ignore + self.firmware_load_complete.clear() + + # Send the CSS header + data = firmware_image[ + boot_params.css_header_offset : boot_params.css_header_offset + + boot_params.css_header_size + ] + await self.send_firmware_data(0x00, data) + + # Send the PKI header + data = firmware_image[ + boot_params.pki_offset : boot_params.pki_offset + boot_params.pki_size + ] + await self.send_firmware_data(0x03, data) + + # Send the Signature header + data = firmware_image[ + boot_params.sig_offset : boot_params.sig_offset + boot_params.sig_size + ] + await self.send_firmware_data(0x02, data) + + # Send the rest of the image. + # The payload consists of command objects, which are sent when they add up + # to a multiple of 4 bytes. + boot_address = 0 + offset = boot_params.write_offset + fragment_size = 0 + while offset + 3 < len(firmware_image): + (command_opcode,) = struct.unpack_from( + " None: + while ddc_data: + ddc_len = 1 + ddc_data[0] + ddc_payload = ddc_data[:ddc_len] + await self.host.send_command( + Hci_Intel_Write_Device_Config_Command(data=ddc_payload) + ) + ddc_data = ddc_data[ddc_len:] + + async def reboot_bootloader(self) -> None: + self.host.send_hci_packet( + HCI_Intel_Reset_Command( + reset_type=0x01, + patch_enable=0x01, + ddc_reload=0x01, + boot_option=0x00, + boot_address=0, + ) + ) + await asyncio.sleep(_POST_RESET_DELAY) + + async def read_device_info(self) -> dict[ValueType, Any]: + self.host.ready = True + response = await self.host.send_command(hci.HCI_Reset_Command()) + if not ( + isinstance(response, hci.HCI_Command_Complete_Event) + and response.return_parameters + in (hci.HCI_UNKNOWN_HCI_COMMAND_ERROR, hci.HCI_SUCCESS) + ): + # When the controller is in operational mode, the response is a + # successful response. + # When the controller is in bootloader mode, + # HCI_UNKNOWN_HCI_COMMAND_ERROR is the expected response. Anything + # else is a failure. + logger.warning(f"unexpected response: {response}") + raise DriverError("unexpected HCI response") + + # Read the firmware version. + response = await self.host.send_command( + HCI_Intel_Read_Version_Command(param0=0xFF) + ) + if not isinstance(response, hci.HCI_Command_Complete_Event): + raise DriverError("unexpected HCI response") + + if response.return_parameters.status != 0: # type: ignore + raise DriverError("HCI_Intel_Read_Version_Command error") + + tlvs = _parse_tlv(response.return_parameters.tlv) # type: ignore + + # Convert the list to a dict. That's Ok here because we only expect each type + # to appear just once. + return dict(tlvs) + + async def init_controller(self): + await self.load_firmware() diff --git a/bumble/hci.py b/bumble/hci.py index 1f1aa2ab..24f91fa4 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -5068,6 +5068,7 @@ class HCI_Event(HCI_Packet): hci_packet_type = HCI_EVENT_PACKET event_names: Dict[int, str] = {} event_classes: Dict[int, Type[HCI_Event]] = {} + vendor_factory: Optional[Callable[[bytes], Optional[HCI_Event]]] = None @staticmethod def event(fields=()): @@ -5125,37 +5126,41 @@ class HCI_Event(HCI_Packet): return event_class - @staticmethod - def from_bytes(packet: bytes) -> HCI_Event: + @classmethod + def from_bytes(cls, packet: bytes) -> HCI_Event: event_code = packet[1] length = packet[2] parameters = packet[3:] if len(parameters) != length: raise InvalidPacketError('invalid packet length') - cls: Any + subclass: Any if event_code == HCI_LE_META_EVENT: # We do this dispatch here and not in the subclass in order to avoid call # loops subevent_code = parameters[0] - cls = HCI_LE_Meta_Event.subevent_classes.get(subevent_code) - if cls is None: + subclass = HCI_LE_Meta_Event.subevent_classes.get(subevent_code) + if subclass is None: # No class registered, just use a generic class instance return HCI_LE_Meta_Event(subevent_code, parameters) elif event_code == HCI_VENDOR_EVENT: - subevent_code = parameters[0] - cls = HCI_Vendor_Event.subevent_classes.get(subevent_code) - if cls is None: - # No class registered, just use a generic class instance - return HCI_Vendor_Event(subevent_code, parameters) + # Invoke all the registered factories to see if any of them can handle + # the event + if cls.vendor_factory: + if event := cls.vendor_factory(parameters): + return event + + # No factory, or the factory could not create an instance, + # return a generic vendor event + return HCI_Event(event_code, parameters) else: - cls = HCI_Event.event_classes.get(event_code) - if cls is None: + subclass = HCI_Event.event_classes.get(event_code) + if subclass is None: # No class registered, just use a generic class instance return HCI_Event(event_code, parameters) # Invoke the factory to create a new instance - return cls.from_parameters(parameters) # type: ignore + return subclass.from_parameters(parameters) # type: ignore @classmethod def from_parameters(cls, parameters): @@ -5198,11 +5203,11 @@ HCI_Event.register_events(globals()) # ----------------------------------------------------------------------------- class HCI_Extended_Event(HCI_Event): ''' - HCI_Event subclass for events that has a subevent code. + HCI_Event subclass for events that have a subevent code. ''' subevent_names: Dict[int, str] = {} - subevent_classes: Dict[int, Type[HCI_Extended_Event]] + subevent_classes: Dict[int, Type[HCI_Extended_Event]] = {} @classmethod def event(cls, fields=()): @@ -5253,7 +5258,22 @@ class HCI_Extended_Event(HCI_Event): cls.subevent_names.update(cls.subevent_map(symbols)) @classmethod - def from_parameters(cls, parameters): + def subclass_from_parameters( + cls, parameters: bytes + ) -> Optional[HCI_Extended_Event]: + """ + Factory method that parses the subevent code, finds a registered subclass, + and creates an instance if found. + """ + subevent_code = parameters[0] + if subclass := cls.subevent_classes.get(subevent_code): + return subclass.from_parameters(parameters) + + return None + + @classmethod + def from_parameters(cls, parameters: bytes) -> HCI_Extended_Event: + """Factory method for subclasses (the subevent code has already been parsed)""" self = cls.__new__(cls) HCI_Extended_Event.__init__(self, self.subevent_code, parameters) if fields := getattr(self, 'fields', None): @@ -5294,12 +5314,6 @@ class HCI_LE_Meta_Event(HCI_Extended_Event): HCI_LE_Meta_Event.register_subevents(globals()) -# ----------------------------------------------------------------------------- -class HCI_Vendor_Event(HCI_Extended_Event): - event_code: int = HCI_VENDOR_EVENT - subevent_classes = {} - - # ----------------------------------------------------------------------------- @HCI_LE_Meta_Event.event( [ @@ -6173,8 +6187,9 @@ class HCI_Command_Complete_Event(HCI_Event): See Bluetooth spec @ 7.7.14 Command Complete Event ''' - return_parameters = b'' + num_hci_command_packets: int command_opcode: int + return_parameters = b'' def map_return_parameters(self, return_parameters): '''Map simple 'status' return parameters to their named constant form''' @@ -6710,6 +6725,14 @@ class HCI_Remote_Host_Supported_Features_Notification_Event(HCI_Event): ''' +# ----------------------------------------------------------------------------- +@HCI_Event.event([('data', "*")]) +class HCI_Vendor_Event(HCI_Event): + ''' + See Bluetooth spec @ 5.4.4 HCI Event packet + ''' + + # ----------------------------------------------------------------------------- class HCI_AclDataPacket(HCI_Packet): ''' diff --git a/bumble/host.py b/bumble/host.py index 7f4146d9..57d05fa6 100644 --- a/bumble/host.py +++ b/bumble/host.py @@ -552,7 +552,7 @@ class Host(AbortableEventEmitter): return response except Exception as error: - logger.warning( + logger.exception( f'{color("!!! Exception while sending command:", "red")} {error}' ) raise error @@ -1248,3 +1248,6 @@ class Host(AbortableEventEmitter): event.connection_handle, int.from_bytes(event.le_features, 'little'), ) + + def on_hci_vendor_event(self, event): + self.emit('vendor_event', event) diff --git a/bumble/transport/usb.py b/bumble/transport/usb.py index 0b865cfa..8bfd8b61 100644 --- a/bumble/transport/usb.py +++ b/bumble/transport/usb.py @@ -149,7 +149,10 @@ async def open_usb_transport(spec: str) -> Transport: if status != usb1.TRANSFER_COMPLETED: logger.warning( - color(f'!!! OUT transfer not completed: status={status}', 'red') + color( + f'!!! OUT transfer not completed: status={status}', + 'red', + ) ) async def process_queue(self): @@ -275,7 +278,10 @@ async def open_usb_transport(spec: str) -> Transport: ) else: logger.warning( - color(f'!!! IN transfer not completed: status={status}', 'red') + color( + f'!!! IN[{packet_type}] transfer not completed: status={status}', + 'red', + ) ) self.loop.call_soon_threadsafe(self.on_transport_lost) diff --git a/bumble/vendor/android/hci.py b/bumble/vendor/android/hci.py index c411ecf3..0aaa23ae 100644 --- a/bumble/vendor/android/hci.py +++ b/bumble/vendor/android/hci.py @@ -16,6 +16,7 @@ # Imports # ----------------------------------------------------------------------------- import struct +from typing import Dict, Optional, Type from bumble.hci import ( name_or_number, @@ -24,7 +25,9 @@ from bumble.hci import ( HCI_Constant, HCI_Object, HCI_Command, - HCI_Vendor_Event, + HCI_Event, + HCI_Extended_Event, + HCI_VENDOR_EVENT, STATUS_SPEC, ) @@ -48,7 +51,6 @@ HCI_DYNAMIC_AUDIO_BUFFER_COMMAND = hci_vendor_command_op_code(0x15F) HCI_BLUETOOTH_QUALITY_REPORT_EVENT = 0x58 HCI_Command.register_commands(globals()) -HCI_Vendor_Event.register_subevents(globals()) # ----------------------------------------------------------------------------- @@ -279,7 +281,29 @@ class HCI_Dynamic_Audio_Buffer_Command(HCI_Command): # ----------------------------------------------------------------------------- -@HCI_Vendor_Event.event( +class HCI_Android_Vendor_Event(HCI_Extended_Event): + event_code: int = HCI_VENDOR_EVENT + subevent_classes: Dict[int, Type[HCI_Extended_Event]] = {} + + @classmethod + def subclass_from_parameters( + cls, parameters: bytes + ) -> Optional[HCI_Extended_Event]: + subevent_code = parameters[0] + if subevent_code == HCI_BLUETOOTH_QUALITY_REPORT_EVENT: + quality_report_id = parameters[1] + if quality_report_id in (0x01, 0x02, 0x03, 0x04, 0x07, 0x08, 0x09): + return HCI_Bluetooth_Quality_Report_Event.from_parameters(parameters) + + return None + + +HCI_Android_Vendor_Event.register_subevents(globals()) +HCI_Event.vendor_factory = HCI_Android_Vendor_Event.subclass_from_parameters + + +# ----------------------------------------------------------------------------- +@HCI_Extended_Event.event( fields=[ ('quality_report_id', 1), ('packet_types', 1), @@ -308,10 +332,11 @@ class HCI_Dynamic_Audio_Buffer_Command(HCI_Command): ('tx_last_subevent_packets', 4), ('crc_error_packets', 4), ('rx_duplicate_packets', 4), + ('rx_unreceived_packets', 4), ('vendor_specific_parameters', '*'), ] ) -class HCI_Bluetooth_Quality_Report_Event(HCI_Vendor_Event): +class HCI_Bluetooth_Quality_Report_Event(HCI_Android_Vendor_Event): # pylint: disable=line-too-long ''' See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#bluetooth-quality-report-sub-event diff --git a/docs/mkdocs/src/drivers/index.md b/docs/mkdocs/src/drivers/index.md index aa5f0a17..e741126d 100644 --- a/docs/mkdocs/src/drivers/index.md +++ b/docs/mkdocs/src/drivers/index.md @@ -16,4 +16,5 @@ USB vendor ID and product ID. Drivers included in the module are: - * [Realtek](realtek.md): Loading of Firmware and Config for Realtek USB dongles. \ No newline at end of file + * [Realtek](realtek.md): Loading of Firmware and Config for Realtek USB dongles. + * [Intel](intel.md): Loading of Firmware and Config for Intel USB controllers. \ No newline at end of file diff --git a/docs/mkdocs/src/drivers/intel.md b/docs/mkdocs/src/drivers/intel.md new file mode 100644 index 00000000..372b6c5e --- /dev/null +++ b/docs/mkdocs/src/drivers/intel.md @@ -0,0 +1,73 @@ +INTEL DRIVER +============== + +This driver supports loading firmware images and optional config data to +Intel USB controllers. +A number of USB dongles are supported, but likely not all. +The initial implementation has been tested on BE200 and AX210 controllers. +When using a USB controller, the USB product ID and vendor ID are used +to find whether a matching set of firmware image and config data +is needed for that specific model. If a match exists, the driver will try +load the firmware image and, if needed, config data. +Alternatively, the metadata property ``driver=intel`` may be specified in a transport +name to force that driver to be used (ex: ``usb:[driver=intel]0`` instead of just +``usb:0`` for the first USB device). +The driver will look for the firmware and config files by name, in order, in: + + * The directory specified by the environment variable `BUMBLE_INTEL_FIRMWARE_DIR` + if set. + * The directory `/drivers/intel_fw` where `` is the directory + where the `bumble` package is installed. + * The current directory. + +It is also possible to override or extend the config data with parameters passed via the +transport name. The driver name `intel` may be suffixed with `/[+]...` +The supported params are: + * `ddc_addon`: configuration data to add to the data loaded from the config data file + * `ddc_override`: configuration data to use instead of the data loaded from the config data file. + +With both `dcc_addon` and `dcc_override`, the param value is a hex-encoded byte array containing +the config data (same format as the config file). +Example transport name: +`usb:[driver=intel/dcc_addon:03E40200]0` + + +Obtaining Firmware Images and Config Data +----------------------------------------- + +Firmware images and config data may be obtained from a variety of online +sources. +To facilitate finding a downloading the, the utility program `bumble-intel-fw-download` +may be used. + +``` +Usage: bumble-intel-fw-download [OPTIONS] + + Download Intel firmware images and configs. + +Options: + --output-dir TEXT Output directory where the files will be saved. + Defaults to the OS-specific app data dir, which the + driver will check when trying to find firmware + --source [linux-kernel] [default: linux-kernel] + --single TEXT Only download a single image set, by its base name + --force Overwrite files if they already exist + --help Show this message and exit. +``` + +Utility +------- + +The `bumble-intel-util` utility may be used to interact with an Intel USB controller. + +``` +Usage: bumble-intel-util [OPTIONS] COMMAND [ARGS]... + +Options: + --help Show this message and exit. + +Commands: + bootloader Reboot in bootloader mode. + info Get the firmware info. + load Load a firmware image. +``` \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index caa6c40a..73c359a1 100644 --- a/setup.cfg +++ b/setup.cfg @@ -75,6 +75,8 @@ console_scripts = bumble-pandora-server = bumble.apps.pandora_server:main bumble-rtk-util = bumble.tools.rtk_util:main bumble-rtk-fw-download = bumble.tools.rtk_fw_download:main + bumble-intel-util = bumble.tools.intel_util:main + bumble-intel-fw-download = bumble.tools.intel_fw_download:main [options.package_data] * = py.typed, *.pyi diff --git a/tools/intel_fw_download.py b/tools/intel_fw_download.py new file mode 100644 index 00000000..b34c314b --- /dev/null +++ b/tools/intel_fw_download.py @@ -0,0 +1,130 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import logging +import pathlib +import urllib.request +import urllib.error + +import click + +from bumble.colors import color +from bumble.drivers import intel + + +# ----------------------------------------------------------------------------- +# Logging +# ----------------------------------------------------------------------------- +logger = logging.getLogger(__name__) + + +# ----------------------------------------------------------------------------- +# Constants +# ----------------------------------------------------------------------------- +LINUX_KERNEL_GIT_SOURCE = "https://git.kernel.org/pub/scm/linux/kernel/git/firmware/linux-firmware.git/plain/intel" + + +# ----------------------------------------------------------------------------- +# Functions +# ----------------------------------------------------------------------------- +def download_file(base_url, name): + url = f"{base_url}/{name}" + with urllib.request.urlopen(url) as file: + data = file.read() + print(f"Downloaded {name}: {len(data)} bytes") + return data + + +# ----------------------------------------------------------------------------- +@click.command +@click.option( + "--output-dir", + default="", + help="Output directory where the files will be saved. Defaults to the OS-specific" + "app data dir, which the driver will check when trying to find firmware", + show_default=True, +) +@click.option( + "--source", + type=click.Choice(["linux-kernel"]), + default="linux-kernel", + show_default=True, +) +@click.option("--single", help="Only download a single image set, by its base name") +@click.option("--force", is_flag=True, help="Overwrite files if they already exist") +def main(output_dir, source, single, force): + """Download Intel firmware images and configs.""" + + # Check that the output dir exists + if output_dir == '': + output_dir = intel.intel_firmware_dir() + else: + output_dir = pathlib.Path(output_dir) + if not output_dir.is_dir(): + print("Output dir does not exist or is not a directory") + return + + base_url = { + "linux-kernel": LINUX_KERNEL_GIT_SOURCE, + }[source] + + print("Downloading") + print(color("FROM:", "green"), base_url) + print(color("TO:", "green"), output_dir) + + if single: + images = [(f"{single}.sfi", f"{single}.ddc")] + else: + images = [ + (f"{base_name}.sfi", f"{base_name}.ddc") + for base_name in intel.INTEL_FW_IMAGE_NAMES + ] + + for fw_name, config_name in images: + print(color("---", "yellow")) + fw_image_out = output_dir / fw_name + if not force and fw_image_out.exists(): + print(color(f"{fw_image_out} already exists, skipping", "red")) + continue + if config_name: + config_image_out = output_dir / config_name + if not force and config_image_out.exists(): + print(color("f{config_image_out} already exists, skipping", "red")) + continue + + try: + fw_image = download_file(base_url, fw_name) + except urllib.error.HTTPError as error: + print(f"Failed to download {fw_name}: {error}") + continue + + config_image = None + if config_name: + try: + config_image = download_file(base_url, config_name) + except urllib.error.HTTPError as error: + print(f"Failed to download {config_name}: {error}") + continue + + fw_image_out.write_bytes(fw_image) + if config_image: + config_image_out.write_bytes(config_image) + + +# ----------------------------------------------------------------------------- +if __name__ == '__main__': + main() diff --git a/tools/intel_util.py b/tools/intel_util.py new file mode 100644 index 00000000..0333a010 --- /dev/null +++ b/tools/intel_util.py @@ -0,0 +1,154 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import logging +import asyncio +import os +from typing import Any, Optional + +import click + +from bumble.colors import color +from bumble import transport +from bumble.drivers import intel +from bumble.host import Host + +# ----------------------------------------------------------------------------- +# Logging +# ----------------------------------------------------------------------------- +logger = logging.getLogger(__name__) + + +# ----------------------------------------------------------------------------- +def print_device_info(device_info: dict[intel.ValueType, Any]) -> None: + if (mode := device_info.get(intel.ValueType.CURRENT_MODE_OF_OPERATION)) is not None: + print( + color("MODE:", "yellow"), + mode.name, + ) + print(color("DETAILS:", "yellow")) + for key, value in device_info.items(): + print(f" {color(key.name, 'green')}: {value}") + + +# ----------------------------------------------------------------------------- +async def get_driver(host: Host, force: bool) -> Optional[intel.Driver]: + # Create a driver + driver = await intel.Driver.for_host(host, force) + if driver is None: + print("Device does not appear to be an Intel device") + return None + + return driver + + +# ----------------------------------------------------------------------------- +async def do_info(usb_transport, force): + async with await transport.open_transport(usb_transport) as ( + hci_source, + hci_sink, + ): + host = Host(hci_source, hci_sink) + driver = await get_driver(host, force) + if driver is None: + return + + # Get and print the device info + print_device_info(await driver.read_device_info()) + + +# ----------------------------------------------------------------------------- +async def do_load(usb_transport: str, force: bool) -> None: + async with await transport.open_transport(usb_transport) as ( + hci_source, + hci_sink, + ): + host = Host(hci_source, hci_sink) + driver = await get_driver(host, force) + if driver is None: + return + + # Reboot in bootloader mode + await driver.load_firmware() + + # Get and print the device info + print_device_info(await driver.read_device_info()) + + +# ----------------------------------------------------------------------------- +async def do_bootloader(usb_transport: str, force: bool) -> None: + async with await transport.open_transport(usb_transport) as ( + hci_source, + hci_sink, + ): + host = Host(hci_source, hci_sink) + driver = await get_driver(host, force) + if driver is None: + return + + # Reboot in bootloader mode + await driver.reboot_bootloader() + + +# ----------------------------------------------------------------------------- +@click.group() +def main(): + logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) + + +@main.command +@click.argument("usb_transport") +@click.option( + "--force", + is_flag=True, + default=False, + help="Try to get the device info even if the USB info doesn't match", +) +def info(usb_transport, force): + """Get the firmware info.""" + asyncio.run(do_info(usb_transport, force)) + + +@main.command +@click.argument("usb_transport") +@click.option( + "--force", + is_flag=True, + default=False, + help="Load even if the USB info doesn't match", +) +def load(usb_transport, force): + """Load a firmware image.""" + asyncio.run(do_load(usb_transport, force)) + + +@main.command +@click.argument("usb_transport") +@click.option( + "--force", + is_flag=True, + default=False, + help="Attempt to reboot event if the USB info doesn't match", +) +def bootloader(usb_transport, force): + """Reboot in bootloader mode.""" + asyncio.run(do_bootloader(usb_transport, force)) + + +# ----------------------------------------------------------------------------- +if __name__ == '__main__': + main()