Merge pull request #38 from google/gbg/usb-interface-discovery

add support for dynamic discovery of USB endpoint addresses
This commit is contained in:
Gilles Boccon-Gibod
2022-09-21 11:40:13 -07:00
committed by GitHub

View File

@@ -56,18 +56,19 @@ async def open_usb_transport(spec):
USB_RECIPIENT_DEVICE = 0x00 USB_RECIPIENT_DEVICE = 0x00
USB_REQUEST_TYPE_CLASS = 0x01 << 5 USB_REQUEST_TYPE_CLASS = 0x01 << 5
USB_ENDPOINT_EVENTS_IN = 0x81
USB_ENDPOINT_ACL_IN = 0x82
USB_ENDPOINT_ACL_OUT = 0x02
USB_DEVICE_CLASS_WIRELESS_CONTROLLER = 0xE0 USB_DEVICE_CLASS_WIRELESS_CONTROLLER = 0xE0
USB_DEVICE_SUBCLASS_RF_CONTROLLER = 0x01 USB_DEVICE_SUBCLASS_RF_CONTROLLER = 0x01
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER = 0x01 USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER = 0x01
USB_ENDPOINT_TRANSFER_TYPE_BULK = 0x02
USB_ENDPOINT_TRANSFER_TYPE_INTERRUPT = 0x03
USB_ENDPOINT_IN = 0x80
READ_SIZE = 1024 READ_SIZE = 1024
class UsbPacketSink: class UsbPacketSink:
def __init__(self, device): def __init__(self, device, acl_out):
self.device = device self.device = device
self.acl_out = acl_out
self.transfer = device.getTransfer() self.transfer = device.getTransfer()
self.packets = collections.deque() # Queue of packets waiting to be sent self.packets = collections.deque() # Queue of packets waiting to be sent
self.loop = asyncio.get_running_loop() self.loop = asyncio.get_running_loop()
@@ -116,7 +117,7 @@ async def open_usb_transport(spec):
packet_type = packet[0] packet_type = packet[0]
if packet_type == hci.HCI_ACL_DATA_PACKET: if packet_type == hci.HCI_ACL_DATA_PACKET:
self.transfer.setBulk( self.transfer.setBulk(
USB_ENDPOINT_ACL_OUT, self.acl_out,
packet[1:], packet[1:],
callback=self.on_packet_sent callback=self.on_packet_sent
) )
@@ -152,10 +153,12 @@ async def open_usb_transport(spec):
logger.debug('OUT transfer likely already completed') logger.debug('OUT transfer likely already completed')
class UsbPacketSource(asyncio.Protocol, ParserSource): class UsbPacketSource(asyncio.Protocol, ParserSource):
def __init__(self, context, device): def __init__(self, context, device, acl_in, events_in):
super().__init__() super().__init__()
self.context = context self.context = context
self.device = device self.device = device
self.acl_in = acl_in
self.events_in = events_in
self.loop = asyncio.get_running_loop() self.loop = asyncio.get_running_loop()
self.queue = asyncio.Queue() self.queue = asyncio.Queue()
self.closed = False self.closed = False
@@ -172,7 +175,7 @@ async def open_usb_transport(spec):
# Set up transfer objects for input # Set up transfer objects for input
self.events_in_transfer = device.getTransfer() self.events_in_transfer = device.getTransfer()
self.events_in_transfer.setInterrupt( self.events_in_transfer.setInterrupt(
USB_ENDPOINT_EVENTS_IN, self.events_in,
READ_SIZE, READ_SIZE,
callback=self.on_packet_received, callback=self.on_packet_received,
user_data=hci.HCI_EVENT_PACKET user_data=hci.HCI_EVENT_PACKET
@@ -181,7 +184,7 @@ async def open_usb_transport(spec):
self.acl_in_transfer = device.getTransfer() self.acl_in_transfer = device.getTransfer()
self.acl_in_transfer.setBulk( self.acl_in_transfer.setBulk(
USB_ENDPOINT_ACL_IN, self.acl_in,
READ_SIZE, READ_SIZE,
callback=self.on_packet_received, callback=self.on_packet_received,
user_data=hci.HCI_ACL_DATA_PACKET user_data=hci.HCI_ACL_DATA_PACKET
@@ -248,7 +251,7 @@ async def open_usb_transport(spec):
await self.event_loop_done await self.event_loop_done
class UsbTransport(Transport): class UsbTransport(Transport):
def __init__(self, context, device, interface, source, sink): def __init__(self, context, device, interface, setting, source, sink):
super().__init__(source, sink) super().__init__(source, sink)
self.context = context self.context = context
self.device = device self.device = device
@@ -257,6 +260,10 @@ async def open_usb_transport(spec):
# Get exclusive access # Get exclusive access
device.claimInterface(interface) device.claimInterface(interface)
# Set the alternate setting if not the default
if setting != 0:
device.setInterfaceAltSetting(interface, setting)
# The source and sink can now start # The source and sink can now start
source.start() source.start()
sink.start() sink.start()
@@ -313,10 +320,63 @@ async def open_usb_transport(spec):
raise ValueError('device not found') raise ValueError('device not found')
logger.debug(f'USB Device: {found}') logger.debug(f'USB Device: {found}')
device = found.open()
# Use the first interface # Look for the first interface with the right class and endpoints
interface = 0 def find_endpoints(device):
for (configuration_index, configuration) in enumerate(device):
interface = None
for interface in configuration:
setting = None
for setting in interface:
if (
setting.getClass() != USB_DEVICE_CLASS_WIRELESS_CONTROLLER or
setting.getSubClass() != USB_DEVICE_SUBCLASS_RF_CONTROLLER or
setting.getProtocol() != USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
):
continue
events_in = None
acl_in = None
acl_out = None
for endpoint in setting:
attributes = endpoint.getAttributes()
address = endpoint.getAddress()
if attributes & 0x03 == USB_ENDPOINT_TRANSFER_TYPE_BULK:
if address & USB_ENDPOINT_IN and acl_in is None:
acl_in = address
elif acl_out is None:
acl_out = address
elif attributes & 0x03 == USB_ENDPOINT_TRANSFER_TYPE_INTERRUPT:
if address & USB_ENDPOINT_IN and events_in is None:
events_in = address
# Return if we found all 3 endpoints
if acl_in is not None and acl_out is not None and events_in is not None:
return (
configuration_index + 1,
setting.getNumber(),
setting.getAlternateSetting(),
acl_in,
acl_out,
events_in
)
else:
logger.debug(f'skipping configuration {configuration_index + 1} / interface {setting.getNumber()}')
endpoints = find_endpoints(found)
if endpoints is None:
raise ValueError('no compatible interface found for device')
(configuration, interface, setting, acl_in, acl_out, events_in) = endpoints
logger.debug(
f'selected endpoints: configuration={configuration}, '
f'interface={interface}, '
f'setting={setting}, '
f'acl_in=0x{acl_in:02X}, '
f'acl_out=0x{acl_out:02X}, '
f'events_in=0x{events_in:02X}, '
)
device = found.open()
# Detach the kernel driver if supported and needed # Detach the kernel driver if supported and needed
if usb1.hasCapability(usb1.CAP_SUPPORTS_DETACH_KERNEL_DRIVER): if usb1.hasCapability(usb1.CAP_SUPPORTS_DETACH_KERNEL_DRIVER):
@@ -329,21 +389,21 @@ async def open_usb_transport(spec):
# Set the configuration if needed # Set the configuration if needed
try: try:
configuration = device.getConfiguration() current_configuration = device.getConfiguration()
logger.debug(f'current configuration = {configuration}') logger.debug(f'current configuration = {current_configuration}')
except usb1.USBError: except usb1.USBError:
configuration = 0 current_configuration = 0
if configuration != 1: if current_configuration != configuration:
try: try:
logger.debug('setting configuration 1') logger.debug(f'setting configuration {configuration}')
device.setConfiguration(1) device.setConfiguration(configuration)
except usb1.USBError: except usb1.USBError:
logger.warning('failed to set configuration 1') logger.warning('failed to set configuration')
source = UsbPacketSource(context, device) source = UsbPacketSource(context, device, acl_in, events_in)
sink = UsbPacketSink(device) sink = UsbPacketSink(device, acl_out)
return UsbTransport(context, device, interface, source, sink) return UsbTransport(context, device, interface, setting, source, sink)
except usb1.USBError as error: except usb1.USBError as error:
logger.warning(color(f'!!! failed to open USB device: {error}', 'red')) logger.warning(color(f'!!! failed to open USB device: {error}', 'red'))
context.close() context.close()