mirror of
https://github.com/google/bumble.git
synced 2026-05-31 07:27:01 +00:00
add support for alternate settings
This commit is contained in:
+24
-6
@@ -251,7 +251,7 @@ async def open_usb_transport(spec):
|
||||
await self.event_loop_done
|
||||
|
||||
class UsbTransport(Transport):
|
||||
def __init__(self, context, device, interface, source, sink):
|
||||
def __init__(self, context, device, interface, setting, source, sink):
|
||||
super().__init__(source, sink)
|
||||
self.context = context
|
||||
self.device = device
|
||||
@@ -260,6 +260,10 @@ async def open_usb_transport(spec):
|
||||
# Get exclusive access
|
||||
device.claimInterface(interface)
|
||||
|
||||
# Set the alternate setting if not the default
|
||||
if setting != 0:
|
||||
device.setInterfaceAltSetting(interface, setting)
|
||||
|
||||
# The source and sink can now start
|
||||
source.start()
|
||||
sink.start()
|
||||
@@ -317,13 +321,20 @@ async def open_usb_transport(spec):
|
||||
|
||||
logger.debug(f'USB Device: {found}')
|
||||
|
||||
# Look for an interface and setting with the right types of endpoints
|
||||
# Look for the first interface with the right class and endpoints
|
||||
def find_endpoints(device):
|
||||
for (configuration_index, configuration) in enumerate(device):
|
||||
interface = None
|
||||
for interface in configuration:
|
||||
setting = None
|
||||
for setting in interface:
|
||||
if (
|
||||
setting.getClass() != USB_DEVICE_CLASS_WIRELESS_CONTROLLER or
|
||||
setting.getSubClass() != USB_DEVICE_SUBCLASS_RF_CONTROLLER or
|
||||
setting.getProtocol() != USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
|
||||
):
|
||||
continue
|
||||
|
||||
events_in = None
|
||||
acl_in = None
|
||||
acl_out = None
|
||||
@@ -341,18 +352,25 @@ async def open_usb_transport(spec):
|
||||
|
||||
# Return if we found all 3 endpoints
|
||||
if acl_in is not None and acl_out is not None and events_in is not None:
|
||||
return (configuration_index + 1, setting.getNumber(), acl_in, acl_out, events_in)
|
||||
return (
|
||||
configuration_index + 1,
|
||||
setting.getNumber(),
|
||||
setting.getAlternateSetting(),
|
||||
acl_in,
|
||||
acl_out,
|
||||
events_in
|
||||
)
|
||||
else:
|
||||
logger.debug(f'skipping configuration {configuration_index + 1} / interface {setting.getNumber()}')
|
||||
|
||||
endpoints = find_endpoints(found)
|
||||
if endpoints is None:
|
||||
logger.warning('no compatible interface found')
|
||||
raise ValueError('no compatible interface found for device')
|
||||
(configuration, interface, acl_in, acl_out, events_in) = endpoints
|
||||
(configuration, interface, setting, acl_in, acl_out, events_in) = endpoints
|
||||
logger.debug(
|
||||
f'selected endpoints: configuration={configuration}, '
|
||||
f'interface={interface}, '
|
||||
f'setting={setting}, '
|
||||
f'acl_in=0x{acl_in:02X}, '
|
||||
f'acl_out=0x{acl_out:02X}, '
|
||||
f'events_in=0x{events_in:02X}, '
|
||||
@@ -385,7 +403,7 @@ async def open_usb_transport(spec):
|
||||
|
||||
source = UsbPacketSource(context, device, acl_in, events_in)
|
||||
sink = UsbPacketSink(device, acl_out)
|
||||
return UsbTransport(context, device, interface, source, sink)
|
||||
return UsbTransport(context, device, interface, setting, source, sink)
|
||||
except usb1.USBError as error:
|
||||
logger.warning(color(f'!!! failed to open USB device: {error}', 'red'))
|
||||
context.close()
|
||||
|
||||
Reference in New Issue
Block a user