From 0568cead47356972f3c82a1ecbf0c042e6331809 Mon Sep 17 00:00:00 2001 From: Gilles Boccon-Gibod Date: Thu, 25 Aug 2022 16:04:52 -0700 Subject: [PATCH] add usb_probe tool and improve compatibility with older/non-compliant devices --- apps/usb_probe.py | 157 ++++++++++++++++++++++++++++++++++++++++ bumble/device.py | 9 ++- bumble/host.py | 70 +++++++++++------- bumble/transport/usb.py | 61 ++++++++++------ 4 files changed, 241 insertions(+), 56 deletions(-) create mode 100644 apps/usb_probe.py diff --git a/apps/usb_probe.py b/apps/usb_probe.py new file mode 100644 index 0000000..9cded2b --- /dev/null +++ b/apps/usb_probe.py @@ -0,0 +1,157 @@ +# Copyright 2021-2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# This tool lists all the USB devices, with details about each device. +# For each device, the different possible Bumble transport strings that can +# refer to it are listed. If the device is known to be a Bluetooth HCI device, +# its identifier is printed in reverse colors, and the transport names in cyan color. +# For other devices, regardless of their type, the transport names are printed +# in red. Whether that device is actually a Bluetooth device or not depends on +# whether it is a Bluetooth device that uses a non-standard Class, or some other +# type of device (there's no way to tell). +# ----------------------------------------------------------------------------- + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import os +import logging +import usb1 +from colors import color + + +# ----------------------------------------------------------------------------- +# Constants +# ----------------------------------------------------------------------------- +USB_DEVICE_CLASS_WIRELESS_CONTROLLER = 0xE0 +USB_DEVICE_SUBCLASS_RF_CONTROLLER = 0x01 +USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER = 0x01 + +USB_DEVICE_CLASSES = { + 0x00: 'Device', + 0x01: 'Audio', + 0x02: 'Communications and CDC Control', + 0x03: 'Human Interface Device', + 0x05: 'Physical', + 0x06: 'Still Imaging', + 0x07: 'Printer', + 0x08: 'Mass Storage', + 0x09: 'Hub', + 0x0A: 'CDC Data', + 0x0B: 'Smart Card', + 0x0D: 'Content Security', + 0x0E: 'Video', + 0x0F: 'Personal Healthcare', + 0x10: 'Audio/Video', + 0x11: 'Billboard', + 0x12: 'USB Type-C Bridge', + 0x3C: 'I3C', + 0xDC: 'Diagnostic', + USB_DEVICE_CLASS_WIRELESS_CONTROLLER: ( + 'Wireless Controller', + { + 0x01: { + 0x01: 'Bluetooth', + 0x02: 'UWB', + 0x03: 'Remote NDIS', + 0x04: 'Bluetooth AMP' + } + } + ), + 0xEF: 'Miscellaneous', + 0xFE: 'Application Specific', + 0xFF: 'Vendor Specific' +} + + +# ----------------------------------------------------------------------------- +def main(): + logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper()) + + with usb1.USBContext() as context: + bluetooth_device_count = 0 + devices = {} + + for device in context.getDeviceIterator(skip_on_error=True): + device_class = device.getDeviceClass() + device_subclass = device.getDeviceSubClass() + device_protocol = device.getDeviceProtocol() + + device_id = (device.getVendorID(), device.getProductID()) + + device_is_bluetooth_hci = ( + device_class == USB_DEVICE_CLASS_WIRELESS_CONTROLLER and + device_subclass == USB_DEVICE_SUBCLASS_RF_CONTROLLER and + device_protocol == USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER + ) + + device_class_details = '' + device_class_info = USB_DEVICE_CLASSES.get(device_class) + if device_class_info is not None: + if type(device_class_info) is tuple: + device_class = device_class_info[0] + device_subclass_info = device_class_info[1].get(device_subclass) + if device_subclass_info: + device_class_details = f' [{device_subclass_info.get(device_protocol)}]' + else: + device_class = device_class_info + + if device_is_bluetooth_hci: + bluetooth_device_count += 1 + fg_color = 'black' + bg_color = 'yellow' + else: + fg_color = 'yellow' + bg_color = 'black' + + # Compute the different ways this can be referenced as a Bumble transport + bumble_transport_names = [] + basic_transport_name = f'usb:{device.getVendorID():04X}:{device.getProductID():04X}' + + if device_is_bluetooth_hci: + bumble_transport_names.append(f'usb:{bluetooth_device_count - 1}') + + serial_number_collision = False + if device_id in devices: + for device_serial in devices[device_id]: + if device_serial == device.getSerialNumber(): + serial_number_collision = True + + if device_id not in devices: + bumble_transport_names.append(basic_transport_name) + else: + bumble_transport_names.append(f'{basic_transport_name}#{len(devices[device_id])}') + + if device.getSerialNumber() and not serial_number_collision: + bumble_transport_names.append(f'{basic_transport_name}/{device.getSerialNumber()}') + + print(color(f'ID {device.getVendorID():04X}:{device.getProductID():04X}', fg=fg_color, bg=bg_color)) + if bumble_transport_names: + print(color(' Bumble Transport Names:', 'blue'), ' or '.join(color(x, 'cyan' if device_is_bluetooth_hci else 'red') for x in bumble_transport_names)) + print(color(' Bus/Device: ', 'green'), f'{device.getBusNumber():03}/{device.getDeviceAddress():03}') + if device.getSerialNumber(): + print(color(' Serial: ', 'green'), device.getSerialNumber()) + print(color(' Class: ', 'green'), device_class) + print(color(' Subclass/Protocol: ', 'green'), f'{device_subclass}/{device_protocol}{device_class_details}') + print(color(' Manufacturer: ', 'green'), device.getManufacturer()) + print(color(' Product: ', 'green'), device.getProduct()) + print() + + devices.setdefault(device_id, []).append(device.getSerialNumber()) + + +# ----------------------------------------------------------------------------- +if __name__ == '__main__': + main() diff --git a/bumble/device.py b/bumble/device.py index cb04e7c..3a08445 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -582,11 +582,12 @@ class Device(CompositeEventEmitter): logger.debug(color(f'BD_ADDR: {response.return_parameters.bd_addr}', 'yellow')) self.public_address = response.return_parameters.bd_addr + if self.host.supports_command(HCI_WRITE_LE_HOST_SUPPORT_COMMAND): + await self.send_command(HCI_Write_LE_Host_Support_Command( + le_supported_host = int(self.le_enabled), + simultaneous_le_host = int(self.le_simultaneous_enabled), + )) - await self.send_command(HCI_Write_LE_Host_Support_Command( - le_supported_host = int(self.le_enabled), - simultaneous_le_host = int(self.le_simultaneous_enabled), - )) if self.le_enabled: # Set the controller address await self.send_command(HCI_LE_Set_Random_Address_Command( diff --git a/bumble/host.py b/bumble/host.py index 7175412..2d7c979 100644 --- a/bumble/host.py +++ b/bumble/host.py @@ -75,7 +75,7 @@ class Host(EventEmitter): self.hc_total_num_acl_data_packets = HOST_HC_TOTAL_NUM_ACL_DATA_PACKETS self.acl_packet_queue = collections.deque() self.acl_packets_in_flight = 0 - self.local_version = None + self.local_version = HCI_VERSION_BLUETOOTH_CORE_4_0 self.local_supported_commands = bytes(64) self.local_le_features = 0 self.command_semaphore = asyncio.Semaphore(1) @@ -93,17 +93,18 @@ class Host(EventEmitter): await self.send_command(HCI_Reset_Command()) self.ready = True - await self.send_command(HCI_Set_Event_Mask_Command(event_mask = bytes.fromhex('FFFFFFFFFFFFFFFF'))) - await self.send_command(HCI_LE_Set_Event_Mask_Command(le_event_mask = bytes.fromhex('FFFFF00000000000'))) - response = await self.send_command(HCI_Read_Local_Supported_Commands_Command()) if response.return_parameters.status == HCI_SUCCESS: self.local_supported_commands = response.return_parameters.supported_commands else: logger.warn(f'HCI_Read_Local_Supported_Commands_Command failed: {response.return_parameters.status}') - if self.supports_command(HCI_WRITE_LE_HOST_SUPPORT_COMMAND): - await self.send_command(HCI_Write_LE_Host_Support_Command(le_supported_host = 1, simultaneous_le_host = 0)) + if self.supports_command(HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND): + response = await self.send_command(HCI_LE_Read_Local_Supported_Features_Command()) + if response.return_parameters.status == HCI_SUCCESS: + self.local_le_features = struct.unpack(' 0 and self.acl_packets_in_flight < self.hc_total_num_le_acl_data_packets: packet = self.acl_packet_queue.pop() self.send_hci_packet(packet) diff --git a/bumble/transport/usb.py b/bumble/transport/usb.py index dfc169f..65a3690 100644 --- a/bumble/transport/usb.py +++ b/bumble/transport/usb.py @@ -37,16 +37,20 @@ async def open_usb_transport(spec): ''' Open a USB transport. The parameter string has this syntax: - either or :[/] + either or + : or + :/] or + :# With as the 0-based index to select amongst all the devices that appear to be supporting Bluetooth HCI (0 being the first one), or Where and are the vendor ID and product ID in hexadecimal. The - / suffix max be specified when more than one device with the same - vendor and product identifiers are present. + / suffix or # suffix max be specified when more than one device with + the same vendor and product identifiers are present. Examples: 0 --> the first BT USB dongle 04b4:f901 --> the BT USB dongle with vendor=04b4 and product=f901 + 04b4:f901#2 --> the third USB device with vendor=04b4 and product=f901 04b4:f901/00E04C239987 --> the BT USB dongle with vendor=04b4 and product=f901 and serial number 00E04C239987 ''' @@ -190,7 +194,7 @@ async def open_usb_transport(spec): def on_packet_received(self, transfer): packet_type = transfer.getUserData() status = transfer.getStatus() - # logger.debug(f'<<< USB IN transfer callback: status={status} packet_type={packet_type}') + # logger.debug(f'<<< USB IN transfer callback: status={status} packet_type={packet_type} length={transfer.getActualLength()}') if status == usb1.TRANSFER_COMPLETED: packet = bytes([packet_type]) + transfer.getBuffer()[:transfer.getActualLength()] @@ -271,19 +275,25 @@ async def open_usb_transport(spec): found = None if ':' in spec: vendor_id, product_id = spec.split(':') + serial_number = None + device_index = 0 if '/' in product_id: product_id, serial_number = product_id.split('/') - for device in context.getDeviceIterator(skip_on_error=True): - if ( - device.getVendorID() == int(vendor_id, 16) and - device.getProductID() == int(product_id, 16) and - device.getSerialNumber() == serial_number - ): + elif '#' in product_id: + product_id, device_index_str = product_id.split('#') + device_index = int(device_index_str) + + for device in context.getDeviceIterator(skip_on_error=True): + if ( + device.getVendorID() == int(vendor_id, 16) and + device.getProductID() == int(product_id, 16) and + serial_number is None or device.getSerialNumber() == serial_number + ): + if device_index == 0: found = device break - device.close() - else: - found = context.getByVendorIDAndProductID(int(vendor_id, 16), int(product_id, 16), skip_on_error=True) + device_index -= 1 + device.close() else: device_index = int(spec) for device in context.getDeviceIterator(skip_on_error=True): @@ -305,17 +315,6 @@ async def open_usb_transport(spec): logger.debug(f'USB Device: {found}') device = found.open() - # Set the configuration if needed - try: - configuration = device.getConfiguration() - logger.debug(f'current configuration = {configuration}') - except usb1.USBError: - try: - logger.debug('setting configuration 1') - device.setConfiguration(1) - except usb1.USBError: - logger.debug('failed to set configuration 1') - # Use the first interface interface = 0 @@ -328,6 +327,20 @@ async def open_usb_transport(spec): except usb1.USBError: pass + # Set the configuration if needed + try: + configuration = device.getConfiguration() + logger.debug(f'current configuration = {configuration}') + except usb1.USBError: + configuration = 0 + + if configuration != 1: + try: + logger.debug('setting configuration 1') + device.setConfiguration(1) + except usb1.USBError: + logger.warning('failed to set configuration 1') + source = UsbPacketSource(context, device) sink = UsbPacketSink(device) return UsbTransport(context, device, interface, source, sink)