Merge pull request #120 from google/gbg/usb-cleanup

minor cleanup of the internals of the usb transport implementation
This commit is contained in:
Gilles Boccon-Gibod
2023-11-22 17:18:23 -08:00
committed by GitHub

View File

@@ -24,9 +24,10 @@ import platform
import usb1 import usb1
from .common import Transport, ParserSource from bumble.transport.common import Transport, ParserSource
from .. import hci from bumble import hci
from ..colors import color from bumble.colors import color
from bumble.utils import AsyncRunner
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -113,7 +114,7 @@ async def open_usb_transport(spec: str) -> Transport:
def __init__(self, device, acl_out): def __init__(self, device, acl_out):
self.device = device self.device = device
self.acl_out = acl_out self.acl_out = acl_out
self.transfer = device.getTransfer() self.acl_out_transfer = device.getTransfer()
self.packets = collections.deque() # Queue of packets waiting to be sent self.packets = collections.deque() # Queue of packets waiting to be sent
self.loop = asyncio.get_running_loop() self.loop = asyncio.get_running_loop()
self.cancel_done = self.loop.create_future() self.cancel_done = self.loop.create_future()
@@ -137,21 +138,20 @@ async def open_usb_transport(spec: str) -> Transport:
# The queue was previously empty, re-prime the pump # The queue was previously empty, re-prime the pump
self.process_queue() self.process_queue()
def on_packet_sent(self, transfer): def transfer_callback(self, transfer):
status = transfer.getStatus() status = transfer.getStatus()
# logger.debug(f'<<< USB out transfer callback: status={status}')
# pylint: disable=no-member # pylint: disable=no-member
if status == usb1.TRANSFER_COMPLETED: if status == usb1.TRANSFER_COMPLETED:
self.loop.call_soon_threadsafe(self.on_packet_sent_) self.loop.call_soon_threadsafe(self.on_packet_sent)
elif status == usb1.TRANSFER_CANCELLED: elif status == usb1.TRANSFER_CANCELLED:
self.loop.call_soon_threadsafe(self.cancel_done.set_result, None) self.loop.call_soon_threadsafe(self.cancel_done.set_result, None)
else: else:
logger.warning( logger.warning(
color(f'!!! out transfer not completed: status={status}', 'red') color(f'!!! OUT transfer not completed: status={status}', 'red')
) )
def on_packet_sent_(self): def on_packet_sent(self):
if self.packets: if self.packets:
self.packets.popleft() self.packets.popleft()
self.process_queue() self.process_queue()
@@ -163,22 +163,20 @@ async def open_usb_transport(spec: str) -> Transport:
packet = self.packets[0] packet = self.packets[0]
packet_type = packet[0] packet_type = packet[0]
if packet_type == hci.HCI_ACL_DATA_PACKET: if packet_type == hci.HCI_ACL_DATA_PACKET:
self.transfer.setBulk( self.acl_out_transfer.setBulk(
self.acl_out, packet[1:], callback=self.on_packet_sent self.acl_out, packet[1:], callback=self.transfer_callback
) )
logger.debug('submit ACL') self.acl_out_transfer.submit()
self.transfer.submit()
elif packet_type == hci.HCI_COMMAND_PACKET: elif packet_type == hci.HCI_COMMAND_PACKET:
self.transfer.setControl( self.acl_out_transfer.setControl(
USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS, USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS,
0, 0,
0, 0,
0, 0,
packet[1:], packet[1:],
callback=self.on_packet_sent, callback=self.transfer_callback,
) )
logger.debug('submit COMMAND') self.acl_out_transfer.submit()
self.transfer.submit()
else: else:
logger.warning(color(f'unsupported packet type {packet_type}', 'red')) logger.warning(color(f'unsupported packet type {packet_type}', 'red'))
@@ -193,11 +191,11 @@ async def open_usb_transport(spec: str) -> Transport:
self.packets.clear() self.packets.clear()
# If we have a transfer in flight, cancel it # If we have a transfer in flight, cancel it
if self.transfer.isSubmitted(): if self.acl_out_transfer.isSubmitted():
# Try to cancel the transfer, but that may fail because it may have # Try to cancel the transfer, but that may fail because it may have
# already completed # already completed
try: try:
self.transfer.cancel() self.acl_out_transfer.cancel()
logger.debug('waiting for OUT transfer cancellation to be done...') logger.debug('waiting for OUT transfer cancellation to be done...')
await self.cancel_done await self.cancel_done
@@ -206,27 +204,22 @@ async def open_usb_transport(spec: str) -> Transport:
logger.debug('OUT transfer likely already completed') logger.debug('OUT transfer likely already completed')
class UsbPacketSource(asyncio.Protocol, ParserSource): class UsbPacketSource(asyncio.Protocol, ParserSource):
def __init__(self, context, device, metadata, acl_in, events_in): def __init__(self, device, metadata, acl_in, events_in):
super().__init__() super().__init__()
self.context = context
self.device = device self.device = device
self.metadata = metadata self.metadata = metadata
self.acl_in = acl_in self.acl_in = acl_in
self.acl_in_transfer = None
self.events_in = events_in self.events_in = events_in
self.events_in_transfer = None
self.loop = asyncio.get_running_loop() self.loop = asyncio.get_running_loop()
self.queue = asyncio.Queue() self.queue = asyncio.Queue()
self.dequeue_task = None self.dequeue_task = None
self.closed = False
self.event_loop_done = self.loop.create_future()
self.cancel_done = { self.cancel_done = {
hci.HCI_EVENT_PACKET: self.loop.create_future(), hci.HCI_EVENT_PACKET: self.loop.create_future(),
hci.HCI_ACL_DATA_PACKET: self.loop.create_future(), hci.HCI_ACL_DATA_PACKET: self.loop.create_future(),
} }
self.events_in_transfer = None self.closed = False
self.acl_in_transfer = None
# Create a thread to process events
self.event_thread = threading.Thread(target=self.run)
def start(self): def start(self):
# Set up transfer objects for input # Set up transfer objects for input
@@ -234,7 +227,7 @@ async def open_usb_transport(spec: str) -> Transport:
self.events_in_transfer.setInterrupt( self.events_in_transfer.setInterrupt(
self.events_in, self.events_in,
READ_SIZE, READ_SIZE,
callback=self.on_packet_received, callback=self.transfer_callback,
user_data=hci.HCI_EVENT_PACKET, user_data=hci.HCI_EVENT_PACKET,
) )
self.events_in_transfer.submit() self.events_in_transfer.submit()
@@ -243,22 +236,23 @@ async def open_usb_transport(spec: str) -> Transport:
self.acl_in_transfer.setBulk( self.acl_in_transfer.setBulk(
self.acl_in, self.acl_in,
READ_SIZE, READ_SIZE,
callback=self.on_packet_received, callback=self.transfer_callback,
user_data=hci.HCI_ACL_DATA_PACKET, user_data=hci.HCI_ACL_DATA_PACKET,
) )
self.acl_in_transfer.submit() self.acl_in_transfer.submit()
self.dequeue_task = self.loop.create_task(self.dequeue()) self.dequeue_task = self.loop.create_task(self.dequeue())
self.event_thread.start()
def on_packet_received(self, transfer): @property
def usb_transfer_submitted(self):
return (
self.events_in_transfer.isSubmitted()
or self.acl_in_transfer.isSubmitted()
)
def transfer_callback(self, transfer):
packet_type = transfer.getUserData() packet_type = transfer.getUserData()
status = transfer.getStatus() status = transfer.getStatus()
# logger.debug(
# f'<<< USB IN transfer callback: status={status} '
# f'packet_type={packet_type} '
# f'length={transfer.getActualLength()}'
# )
# pylint: disable=no-member # pylint: disable=no-member
if status == usb1.TRANSFER_COMPLETED: if status == usb1.TRANSFER_COMPLETED:
@@ -267,18 +261,18 @@ async def open_usb_transport(spec: str) -> Transport:
+ transfer.getBuffer()[: transfer.getActualLength()] + transfer.getBuffer()[: transfer.getActualLength()]
) )
self.loop.call_soon_threadsafe(self.queue.put_nowait, packet) self.loop.call_soon_threadsafe(self.queue.put_nowait, packet)
# Re-submit the transfer so we can receive more data
transfer.submit()
elif status == usb1.TRANSFER_CANCELLED: elif status == usb1.TRANSFER_CANCELLED:
self.loop.call_soon_threadsafe( self.loop.call_soon_threadsafe(
self.cancel_done[packet_type].set_result, None self.cancel_done[packet_type].set_result, None
) )
return
else: else:
logger.warning( logger.warning(
color(f'!!! transfer not completed: status={status}', 'red') color(f'!!! IN transfer not completed: status={status}', 'red')
) )
self.loop.call_soon_threadsafe(self.on_transport_lost)
# Re-submit the transfer so we can receive more data
transfer.submit()
async def dequeue(self): async def dequeue(self):
while not self.closed: while not self.closed:
@@ -288,21 +282,6 @@ async def open_usb_transport(spec: str) -> Transport:
return return
self.parser.feed_data(packet) self.parser.feed_data(packet)
def run(self):
logger.debug('starting USB event loop')
while (
self.events_in_transfer.isSubmitted()
or self.acl_in_transfer.isSubmitted()
):
# pylint: disable=no-member
try:
self.context.handleEvents()
except usb1.USBErrorInterrupted:
pass
logger.debug('USB event loop done')
self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None)
def close(self): def close(self):
self.closed = True self.closed = True
@@ -331,15 +310,14 @@ async def open_usb_transport(spec: str) -> Transport:
f'IN[{packet_type}] transfer likely already completed' f'IN[{packet_type}] transfer likely already completed'
) )
# Wait for the thread to terminate
await self.event_loop_done
class UsbTransport(Transport): class UsbTransport(Transport):
def __init__(self, context, device, interface, setting, source, sink): def __init__(self, context, device, interface, setting, source, sink):
super().__init__(source, sink) super().__init__(source, sink)
self.context = context self.context = context
self.device = device self.device = device
self.interface = interface self.interface = interface
self.loop = asyncio.get_running_loop()
self.event_loop_done = self.loop.create_future()
# Get exclusive access # Get exclusive access
device.claimInterface(interface) device.claimInterface(interface)
@@ -352,6 +330,22 @@ async def open_usb_transport(spec: str) -> Transport:
source.start() source.start()
sink.start() sink.start()
# Create a thread to process events
self.event_thread = threading.Thread(target=self.run)
self.event_thread.start()
def run(self):
logger.debug('starting USB event loop')
while self.source.usb_transfer_submitted:
# pylint: disable=no-member
try:
self.context.handleEvents()
except usb1.USBErrorInterrupted:
pass
logger.debug('USB event loop done')
self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None)
async def close(self): async def close(self):
self.source.close() self.source.close()
self.sink.close() self.sink.close()
@@ -361,6 +355,9 @@ async def open_usb_transport(spec: str) -> Transport:
self.device.close() self.device.close()
self.context.close() self.context.close()
# Wait for the thread to terminate
await self.event_loop_done
# Find the device according to the spec moniker # Find the device according to the spec moniker
load_libusb() load_libusb()
context = usb1.USBContext() context = usb1.USBContext()
@@ -540,7 +537,7 @@ async def open_usb_transport(spec: str) -> Transport:
except usb1.USBError: except usb1.USBError:
logger.warning('failed to set configuration') logger.warning('failed to set configuration')
source = UsbPacketSource(context, device, device_metadata, acl_in, events_in) source = UsbPacketSource(device, device_metadata, acl_in, events_in)
sink = UsbPacketSink(device, acl_out) sink = UsbPacketSink(device, acl_out)
return UsbTransport(context, device, interface, setting, source, sink) return UsbTransport(context, device, interface, setting, source, sink)
except usb1.USBError as error: except usb1.USBError as error: