From db5e52f1dfcdfa825f3691e846b4e008afd136f2 Mon Sep 17 00:00:00 2001 From: Gilles Boccon-Gibod Date: Tue, 20 Sep 2022 22:25:40 -0700 Subject: [PATCH] add support for alternate settings --- bumble/transport/usb.py | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/bumble/transport/usb.py b/bumble/transport/usb.py index f5887c8..80a8871 100644 --- a/bumble/transport/usb.py +++ b/bumble/transport/usb.py @@ -251,7 +251,7 @@ async def open_usb_transport(spec): await self.event_loop_done class UsbTransport(Transport): - def __init__(self, context, device, interface, source, sink): + def __init__(self, context, device, interface, setting, source, sink): super().__init__(source, sink) self.context = context self.device = device @@ -260,6 +260,10 @@ async def open_usb_transport(spec): # Get exclusive access device.claimInterface(interface) + # Set the alternate setting if not the default + if setting != 0: + device.setInterfaceAltSetting(interface, setting) + # The source and sink can now start source.start() sink.start() @@ -317,13 +321,20 @@ async def open_usb_transport(spec): logger.debug(f'USB Device: {found}') - # Look for an interface and setting with the right types of endpoints + # Look for the first interface with the right class and endpoints def find_endpoints(device): for (configuration_index, configuration) in enumerate(device): interface = None for interface in configuration: setting = None for setting in interface: + if ( + setting.getClass() != USB_DEVICE_CLASS_WIRELESS_CONTROLLER or + setting.getSubClass() != USB_DEVICE_SUBCLASS_RF_CONTROLLER or + setting.getProtocol() != USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER + ): + continue + events_in = None acl_in = None acl_out = None @@ -341,18 +352,25 @@ async def open_usb_transport(spec): # Return if we found all 3 endpoints if acl_in is not None and acl_out is not None and events_in is not None: - return (configuration_index + 1, setting.getNumber(), acl_in, acl_out, events_in) + return ( + configuration_index + 1, + setting.getNumber(), + setting.getAlternateSetting(), + acl_in, + acl_out, + events_in + ) else: logger.debug(f'skipping configuration {configuration_index + 1} / interface {setting.getNumber()}') endpoints = find_endpoints(found) if endpoints is None: - logger.warning('no compatible interface found') raise ValueError('no compatible interface found for device') - (configuration, interface, acl_in, acl_out, events_in) = endpoints + (configuration, interface, setting, acl_in, acl_out, events_in) = endpoints logger.debug( f'selected endpoints: configuration={configuration}, ' f'interface={interface}, ' + f'setting={setting}, ' f'acl_in=0x{acl_in:02X}, ' f'acl_out=0x{acl_out:02X}, ' f'events_in=0x{events_in:02X}, ' @@ -385,7 +403,7 @@ async def open_usb_transport(spec): source = UsbPacketSource(context, device, acl_in, events_in) sink = UsbPacketSink(device, acl_out) - return UsbTransport(context, device, interface, source, sink) + return UsbTransport(context, device, interface, setting, source, sink) except usb1.USBError as error: logger.warning(color(f'!!! failed to open USB device: {error}', 'red')) context.close()