From 852c933c920662a1e86a530287f1be4d90aab4e7 Mon Sep 17 00:00:00 2001 From: Gilles Boccon-Gibod Date: Fri, 6 Jan 2023 20:22:46 -0800 Subject: [PATCH] wip (+4 squashed commits) Squashed commits: [d29a350] wip [7f541ed] wip [1e2902e] basic working version [14b497a] wip --- bumble/drivers/__init__.py | 68 +++++ bumble/drivers/rtk.py | 604 +++++++++++++++++++++++++++++++++++++ bumble/hci.py | 4 +- bumble/host.py | 20 +- bumble/transport/usb.py | 9 +- setup.cfg | 2 +- utils/rtk_util.py | 159 ++++++++++ 7 files changed, 859 insertions(+), 7 deletions(-) create mode 100644 bumble/drivers/__init__.py create mode 100644 bumble/drivers/rtk.py create mode 100644 utils/rtk_util.py diff --git a/bumble/drivers/__init__.py b/bumble/drivers/__init__.py new file mode 100644 index 0000000..6a61ca8 --- /dev/null +++ b/bumble/drivers/__init__.py @@ -0,0 +1,68 @@ +# Copyright 2021-2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Drivers that can be used to customize the interaction between a host and a controller, +like loading firmware after a cold start. +""" + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import abc +import logging +from . import rtk + + +# ----------------------------------------------------------------------------- +# Logging +# ----------------------------------------------------------------------------- +logger = logging.getLogger(__name__) + + +# ----------------------------------------------------------------------------- +# Classes +# ----------------------------------------------------------------------------- +class Driver(abc.ABC): + """Base class for drivers.""" + + @staticmethod + async def for_host(_host): + """Return a driver instance for a host. + + Args: + host: Host object for which a driver should be created. + + Returns: + A Driver instance if a driver should be instantiated for this host, or + None if no driver instance of this class is needed. + """ + return None + + @abc.abstractmethod + async def init_controller(self): + """Initialize the controller.""" + + +# ----------------------------------------------------------------------------- +# Functions +# ----------------------------------------------------------------------------- +async def get_driver_for_host(host): + """Probe all known diver classes until one returns a valid instance for a host, + or none is found. + """ + if (driver := await rtk.Driver.for_host(host)): + logger.debug("Instantiated RTK driver") + return driver + + return None diff --git a/bumble/drivers/rtk.py b/bumble/drivers/rtk.py new file mode 100644 index 0000000..8a1f1f9 --- /dev/null +++ b/bumble/drivers/rtk.py @@ -0,0 +1,604 @@ +# Copyright 2021-2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Support for Realtek USB dongles. +Based on various online bits of information, including the Linux kernel. +(see `drivers/bluetooth/btrtl.c`) +""" + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +from dataclasses import dataclass +import asyncio +import enum +import logging +import math +import os +import pathlib +import struct +import weakref + + +from bumble.hci import ( + hci_command_op_code, + STATUS_SPEC, + HCI_SUCCESS, + HCI_COMMAND_NAMES, + HCI_Command, + HCI_Reset_Command, + HCI_Read_Local_Version_Information_Command, +) + + +# ----------------------------------------------------------------------------- +# Logging +# ----------------------------------------------------------------------------- +logger = logging.getLogger(__name__) + + +# ----------------------------------------------------------------------------- +# Constants +# ----------------------------------------------------------------------------- +RTK_ROM_LMP_8723A = 0x1200 +RTK_ROM_LMP_8723B = 0x8723 +RTK_ROM_LMP_8821A = 0x8821 +RTK_ROM_LMP_8761A = 0x8761 +RTK_ROM_LMP_8822B = 0x8822 +RTK_ROM_LMP_8852A = 0x8852 +RTK_CONFIG_MAGIC = 0x8723AB55 + +RTK_EPATCH_SIGNATURE = b"Realtech" + +RTK_FRAGMENT_LENGTH = 252 + +RTK_FIRMWARE_DIR_ENV = "BUMBLE_RTK_FIRMWARE_DIR" + + +class RtlProjectId(enum.IntEnum): + PROJECT_ID_8723A = 0 + PROJECT_ID_8723B = 1 + PROJECT_ID_8821A = 2 + PROJECT_ID_8761A = 3 + PROJECT_ID_8822B = 8 + PROJECT_ID_8723D = 9 + PROJECT_ID_8821C = 10 + PROJECT_ID_8822C = 13 + PROJECT_ID_8761B = 14 + PROJECT_ID_8852A = 18 + PROJECT_ID_8852B = 20 + PROJECT_ID_8852C = 25 + + +RTK_PROJECT_ID_TO_ROM = { + 0: RTK_ROM_LMP_8723A, + 1: RTK_ROM_LMP_8723B, + 2: RTK_ROM_LMP_8821A, + 3: RTK_ROM_LMP_8761A, + 8: RTK_ROM_LMP_8822B, + 9: RTK_ROM_LMP_8723B, + 10: RTK_ROM_LMP_8821A, + 13: RTK_ROM_LMP_8822B, + 14: RTK_ROM_LMP_8761A, + 18: RTK_ROM_LMP_8852A, + 20: RTK_ROM_LMP_8852A, + 25: RTK_ROM_LMP_8852A, +} + +# List of USB (VendorID, ProductID) for Realtek-based devices. +RTK_USB_PRODUCTS = { + # Realtek 8723AE + (0x0930, 0x021D), + (0x13D3, 0x3394), + # Realtek 8723BE + (0x0489, 0xE085), + (0x0489, 0xE08B), + (0x04F2, 0xB49F), + (0x13D3, 0x3410), + (0x13D3, 0x3416), + (0x13D3, 0x3459), + (0x13D3, 0x3494), + # Realtek 8723BU + (0x7392, 0xA611), + # Realtek 8723DE + (0x0BDA, 0xB009), + (0x2FF8, 0xB011), + # Realtek 8761BUV + (0x2357, 0x0604), + (0x0B05, 0x190E), + (0x2550, 0x8761), + (0x0BDA, 0x8771), + (0x7392, 0xC611), + (0x2B89, 0x8761), + (0x2230, 0x0016), + # Realtek 8821AE + (0x0B05, 0x17DC), + (0x13D3, 0x3414), + (0x13D3, 0x3458), + (0x13D3, 0x3461), + (0x13D3, 0x3462), + # Realtek 8822BE + (0x13D3, 0x3526), + (0x0B05, 0x185C), + # Realtek 8822CE + (0x04CA, 0x4005), + (0x04C5, 0x161F), + (0x0B05, 0x18EF), + (0x13D3, 0x3548), + (0x13D3, 0x3549), + (0x13D3, 0x3553), + (0x13D3, 0x3555), + (0x2FF8, 0x3051), + (0x1358, 0xC123), + (0x0BDA, 0xC123), + (0x0CB5, 0xC547), +} + +# ----------------------------------------------------------------------------- +# HCI Commands +# ----------------------------------------------------------------------------- +HCI_RTK_READ_ROM_VERSION_COMMAND = hci_command_op_code(0x3F, 0x6D) +HCI_COMMAND_NAMES[HCI_RTK_READ_ROM_VERSION_COMMAND] = "HCI_RTK_READ_ROM_VERSION_COMMAND" + + +@HCI_Command.command(return_parameters_fields=[("status", STATUS_SPEC), ("version", 1)]) +class HCI_RTK_Read_ROM_Version_Command(HCI_Command): + pass + + +HCI_RTK_DOWNLOAD_COMMAND = hci_command_op_code(0x3F, 0x20) +HCI_COMMAND_NAMES[HCI_RTK_DOWNLOAD_COMMAND] = "HCI_RTK_DOWNLOAD_COMMAND" + + +@HCI_Command.command( + fields=[("index", 1), ("payload", RTK_FRAGMENT_LENGTH)], + return_parameters_fields=[("status", STATUS_SPEC), ("index", 1)], +) +class HCI_RTK_Download_Command(HCI_Command): + pass + + +HCI_RTK_DROP_FIRMWARE_COMMAND = hci_command_op_code(0x3F, 0x66) +HCI_COMMAND_NAMES[HCI_RTK_DROP_FIRMWARE_COMMAND] = "HCI_RTK_DROP_FIRMWARE_COMMAND" + + +@HCI_Command.command() +class HCI_RTK_Drop_Firmware_Command(HCI_Command): + pass + + +# ----------------------------------------------------------------------------- +class Firmware: + def __init__(self, firmware): + extension_sig = bytes([0x51, 0x04, 0xFD, 0x77]) + + if not firmware.startswith(RTK_EPATCH_SIGNATURE): + raise ValueError("Firmware does not start with epatch signature") + + if not firmware.endswith(extension_sig): + raise ValueError("Firmware does not end with extension sig") + + # The firmware should start with a 14 byte header. + epatch_header_size = 14 + if len(firmware) < epatch_header_size: + raise ValueError("Firmware too short") + + # Look for the "project ID", starting from the end. + offset = len(firmware) - len(extension_sig) + project_id = -1 + while offset >= epatch_header_size: + length, opcode = firmware[offset - 2 : offset] + offset -= 2 + + if opcode == 0xFF: + # End + break + + if length == 0: + raise ValueError("Invalid 0-length instruction") + + if opcode == 0 and length == 1: + project_id = firmware[offset - 1] + break + + offset -= length + + if project_id < 0: + raise ValueError("Project ID not found") + + self.project_id = project_id + + # Read the patch tables info. + self.version, num_patches = struct.unpack("... (16 bits each) + # ... (16 bits each) + # ... (32 bits each) + if epatch_header_size + 8 * num_patches > len(firmware): + raise ValueError("Firmware too short") + chip_id_table_offset = epatch_header_size + patch_length_table_offset = chip_id_table_offset + 2 * num_patches + patch_offset_table_offset = chip_id_table_offset + 4 * num_patches + for patch_index in range(num_patches): + chip_id_offset = chip_id_table_offset + 2 * patch_index + (chip_id,) = struct.unpack_from(" len(firmware): + raise ValueError("Firmware too short") + + # Get the SVN version for the patch + (svn_version,) = struct.unpack_from( + "= 0x80: + download_index += 1 + if fragment_index == fragment_count - 1: + download_index |= 0x80 # End marker. + fragment_offset = fragment_index * RTK_FRAGMENT_LENGTH + fragment = payload[fragment_offset : fragment_offset + RTK_FRAGMENT_LENGTH] + logger.debug(f"downloading fragment {fragment_index}") + await self.host.send_command( + HCI_RTK_Download_Command( + index=download_index, payload=fragment, check_result=True + ) + ) + + logger.debug("download complete!") + + # Read the version again + response = await self.host.send_command( + HCI_RTK_Read_ROM_Version_Command(), check_result=True + ) + if response.return_parameters.status != HCI_SUCCESS: + logger.warning("can't get ROM version") + else: + rom_version = response.return_parameters.version + logger.debug(f"ROM version after download: {rom_version:04X}") + + async def download_firmware(self): + if self.driver_info.rom == RTK_ROM_LMP_8723A: + return await self.download_for_rtl8723a() + + if self.driver_info.rom in ( + RTK_ROM_LMP_8723B, + RTK_ROM_LMP_8821A, + RTK_ROM_LMP_8761A, + RTK_ROM_LMP_8822B, + RTK_ROM_LMP_8852A, + ): + return await self.download_for_rtl8723b() + + raise ValueError("ROM not supported") + + async def init_controller(self): + await self.download_firmware() + await self.host.send_command(HCI_Reset_Command(), check_result=True) + logger.info(f"loaded FW image {self.driver_info.fw_name}") diff --git a/bumble/hci.py b/bumble/hci.py index 97ec0cb..cba207d 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -1641,9 +1641,11 @@ class HCI_Object: # Get the value for the field value = hci_object[key] - # Map the value if needed + # Check if there's a matching mapper passed if value_mappers: value_mapper = value_mappers.get(key, value_mapper) + + # Map the value if we have a mapper if value_mapper is not None: value = value_mapper(value) diff --git a/bumble/host.py b/bumble/host.py index a33efc8..cd76c69 100644 --- a/bumble/host.py +++ b/bumble/host.py @@ -23,6 +23,7 @@ import struct from bumble.colors import color from bumble.l2cap import L2CAP_PDU from bumble.snoop import Snooper +from bumble import drivers from typing import Optional @@ -116,6 +117,7 @@ class Host(AbortableEventEmitter): super().__init__() self.hci_sink = None + self.hci_metadata = None self.ready = False # True when we can accept incoming packets self.reset_done = False self.connections = {} # Connections, by connection handle @@ -141,6 +143,9 @@ class Host(AbortableEventEmitter): # Connect to the source and sink if specified if controller_source: controller_source.set_packet_sink(self) + self.hci_metadata = getattr( + controller_source, 'metadata', self.hci_metadata + ) if controller_sink: self.set_packet_sink(controller_sink) @@ -170,7 +175,7 @@ class Host(AbortableEventEmitter): self.emit('flush') self.command_semaphore.release() - async def reset(self): + async def reset(self, raw=False): if self.ready: self.ready = False await self.flush() @@ -178,6 +183,15 @@ class Host(AbortableEventEmitter): await self.send_command(HCI_Reset_Command(), check_result=True) self.ready = True + # Instantiate and init a driver for the host if needed. + # NOTE: we don't keep a reference to the driver here, because we don't + # currently have a need for the driver later on. But if the driver interface + # evolves, it may be required, then, to store a reference to the driver in + # an object property. + if not raw: + if (driver := await drivers.get_driver_for_host(self)): + await driver.init_controller() + response = await self.send_command( HCI_Read_Local_Supported_Commands_Command(), check_result=True ) @@ -298,7 +312,7 @@ class Host(AbortableEventEmitter): if self.snooper: self.snooper.snoop(bytes(packet), Snooper.Direction.HOST_TO_CONTROLLER) - self.hci_sink.on_packet(packet.to_bytes()) + self.hci_sink.on_packet(bytes(packet)) async def send_command(self, command, check_result=False): logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {command}') @@ -350,7 +364,7 @@ class Host(AbortableEventEmitter): asyncio.create_task(send_command(command)) def send_l2cap_pdu(self, connection_handle, cid, pdu): - l2cap_pdu = L2CAP_PDU(cid, pdu).to_bytes() + l2cap_pdu = bytes(L2CAP_PDU(cid, pdu)) # Send the data to the controller via ACL packets bytes_remaining = len(l2cap_pdu) diff --git a/bumble/transport/usb.py b/bumble/transport/usb.py index 68c5a6f..13cad60 100644 --- a/bumble/transport/usb.py +++ b/bumble/transport/usb.py @@ -206,10 +206,11 @@ async def open_usb_transport(spec): logger.debug('OUT transfer likely already completed') class UsbPacketSource(asyncio.Protocol, ParserSource): - def __init__(self, context, device, acl_in, events_in): + def __init__(self, context, device, metadata, acl_in, events_in): super().__init__() self.context = context self.device = device + self.metadata = metadata self.acl_in = acl_in self.events_in = events_in self.loop = asyncio.get_running_loop() @@ -510,6 +511,10 @@ async def open_usb_transport(spec): f'events_in=0x{events_in:02X}, ' ) + device_metadata = { + 'vendor_id': found.getVendorID(), + 'product_id': found.getProductID(), + } device = found.open() # Auto-detach the kernel driver if supported @@ -535,7 +540,7 @@ async def open_usb_transport(spec): except usb1.USBError: logger.warning('failed to set configuration') - source = UsbPacketSource(context, device, acl_in, events_in) + source = UsbPacketSource(context, device, device_metadata, acl_in, events_in) sink = UsbPacketSink(device, acl_out) return UsbTransport(context, device, interface, setting, source, sink) except usb1.USBError as error: diff --git a/setup.cfg b/setup.cfg index 2132438..ac13d9c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -24,7 +24,7 @@ url = https://github.com/google/bumble [options] python_requires = >=3.8 -packages = bumble, bumble.transport, bumble.profiles, bumble.apps, bumble.apps.link_relay, bumble.pandora +packages = bumble, bumble.transport, bumble.drivers, bumble.profiles, bumble.apps, bumble.apps.link_relay, bumble.pandora package_dir = bumble = bumble bumble.apps = apps diff --git a/utils/rtk_util.py b/utils/rtk_util.py new file mode 100644 index 0000000..bd4f223 --- /dev/null +++ b/utils/rtk_util.py @@ -0,0 +1,159 @@ +# Copyright 2021-2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import logging +import asyncio +import os + +import click + +from bumble import transport +from bumble.host import Host +from bumble.drivers import rtk + +# ----------------------------------------------------------------------------- +# Logging +# ----------------------------------------------------------------------------- +logger = logging.getLogger(__name__) + + +# ----------------------------------------------------------------------------- +def do_parse(firmware_path): + with open(firmware_path, 'rb') as firmware_file: + firmware_data = firmware_file.read() + firmware = rtk.Firmware(firmware_data) + print(f'Firmware: version=0x{firmware.version:08X} project_id=0x{firmware.project_id:04X}') + for patch in firmware.patches: + print( + f" Patch: chip_id=0x{patch[0]:04X}, " + f"{len(patch[1])} bytes, " + f"SVN Version={patch[2]:08X}" + ) + + +# ----------------------------------------------------------------------------- +async def do_load(usb_transport, force): + async with await transport.open_transport_or_link(usb_transport) as ( + hci_source, + hci_sink, + ): + # Create a host to communicate with the device + host = Host(hci_source, hci_sink) + await host.reset(raw=True) + + # Get the driver. + driver = await rtk.Driver.for_host(host, force) + if driver is None: + if not force: + print("Firmware already loaded or no supported driver for this device.") + return + + await driver.download_firmware() + + +# ----------------------------------------------------------------------------- +async def do_drop(usb_transport): + async with await transport.open_transport_or_link(usb_transport) as ( + hci_source, + hci_sink, + ): + # Create a host to communicate with the device + host = Host(hci_source, hci_sink) + await host.reset(raw=True) + + # Tell the device to reset/drop any loaded patch + await rtk.Driver.drop_firmware(host) + + +# ----------------------------------------------------------------------------- +async def do_info(usb_transport, force): + async with await transport.open_transport(usb_transport) as ( + hci_source, + hci_sink, + ): + # Create a host to communicate with the device + host = Host(hci_source, hci_sink) + await host.reset(raw=True) + + # Check if this is a supported device. + if not force and not rtk.Driver.check(host): + print("USB device not supported by this RTK driver") + return + + # Get the driver info. + driver_info = await rtk.Driver.driver_info_for_host(host) + if driver_info: + print( + "Driver:\n" + f" ROM: {driver_info.rom:04X}\n" + f" Firmware: {driver_info.fw_name}\n" + f" Config: {driver_info.config_name}\n" + ) + else: + print("Firmware already loaded or no supported driver for this device.") + + +# ----------------------------------------------------------------------------- +@click.group() +def main(): + pass + + +@main.command +@click.argument("firmware_path") +def parse(firmware_path): + """Parse a firmware image.""" + do_parse(firmware_path) + + +@main.command +@click.argument("usb_transport") +@click.option( + "--force", + is_flag=True, + default=False, + help="Load even if the USB info doesn't match", +) +def load(usb_transport, force): + """Load a firmware image into the USB dongle.""" + asyncio.run(do_load(usb_transport, force)) + + +@main.command +@click.argument("usb_transport") +def drop(usb_transport): + """Drop a firmware image from the USB dongle.""" + asyncio.run(do_drop(usb_transport)) + + +@main.command +@click.argument("usb_transport") +@click.option( + "--force", + is_flag=True, + default=False, + help="Try to get the device info even if the USB info doesn't match", +) +def info(usb_transport, force): + """Get the firmware info from a USB dongle.""" + asyncio.run(do_info(usb_transport, force)) + + +# ----------------------------------------------------------------------------- +if __name__ == '__main__': + logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) + main()