minor cleanup of the internals of the usb transport implementation

This commit is contained in:
Gilles Boccon-Gibod
2023-02-03 18:45:45 -08:00
parent 0667e83919
commit a8ec1b0949

View File

@@ -113,7 +113,7 @@ async def open_usb_transport(spec: str) -> Transport:
def __init__(self, device, acl_out): def __init__(self, device, acl_out):
self.device = device self.device = device
self.acl_out = acl_out self.acl_out = acl_out
self.transfer = device.getTransfer() self.acl_out_transfer = device.getTransfer()
self.packets = collections.deque() # Queue of packets waiting to be sent self.packets = collections.deque() # Queue of packets waiting to be sent
self.loop = asyncio.get_running_loop() self.loop = asyncio.get_running_loop()
self.cancel_done = self.loop.create_future() self.cancel_done = self.loop.create_future()
@@ -137,21 +137,20 @@ async def open_usb_transport(spec: str) -> Transport:
# The queue was previously empty, re-prime the pump # The queue was previously empty, re-prime the pump
self.process_queue() self.process_queue()
def on_packet_sent(self, transfer): def transfer_callback(self, transfer):
status = transfer.getStatus() status = transfer.getStatus()
# logger.debug(f'<<< USB out transfer callback: status={status}')
# pylint: disable=no-member # pylint: disable=no-member
if status == usb1.TRANSFER_COMPLETED: if status == usb1.TRANSFER_COMPLETED:
self.loop.call_soon_threadsafe(self.on_packet_sent_) self.loop.call_soon_threadsafe(self.on_packet_sent)
elif status == usb1.TRANSFER_CANCELLED: elif status == usb1.TRANSFER_CANCELLED:
self.loop.call_soon_threadsafe(self.cancel_done.set_result, None) self.loop.call_soon_threadsafe(self.cancel_done.set_result, None)
else: else:
logger.warning( logger.warning(
color(f'!!! out transfer not completed: status={status}', 'red') color(f'!!! OUT transfer not completed: status={status}', 'red')
) )
def on_packet_sent_(self): def on_packet_sent(self):
if self.packets: if self.packets:
self.packets.popleft() self.packets.popleft()
self.process_queue() self.process_queue()
@@ -163,22 +162,20 @@ async def open_usb_transport(spec: str) -> Transport:
packet = self.packets[0] packet = self.packets[0]
packet_type = packet[0] packet_type = packet[0]
if packet_type == hci.HCI_ACL_DATA_PACKET: if packet_type == hci.HCI_ACL_DATA_PACKET:
self.transfer.setBulk( self.acl_out_transfer.setBulk(
self.acl_out, packet[1:], callback=self.on_packet_sent self.acl_out, packet[1:], callback=self.transfer_callback
) )
logger.debug('submit ACL') self.acl_out_transfer.submit()
self.transfer.submit()
elif packet_type == hci.HCI_COMMAND_PACKET: elif packet_type == hci.HCI_COMMAND_PACKET:
self.transfer.setControl( self.acl_out_transfer.setControl(
USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS, USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS,
0, 0,
0, 0,
0, 0,
packet[1:], packet[1:],
callback=self.on_packet_sent, callback=self.transfer_callback,
) )
logger.debug('submit COMMAND') self.acl_out_transfer.submit()
self.transfer.submit()
else: else:
logger.warning(color(f'unsupported packet type {packet_type}', 'red')) logger.warning(color(f'unsupported packet type {packet_type}', 'red'))
@@ -193,11 +190,11 @@ async def open_usb_transport(spec: str) -> Transport:
self.packets.clear() self.packets.clear()
# If we have a transfer in flight, cancel it # If we have a transfer in flight, cancel it
if self.transfer.isSubmitted(): if self.acl_out_transfer.isSubmitted():
# Try to cancel the transfer, but that may fail because it may have # Try to cancel the transfer, but that may fail because it may have
# already completed # already completed
try: try:
self.transfer.cancel() self.acl_out_transfer.cancel()
logger.debug('waiting for OUT transfer cancellation to be done...') logger.debug('waiting for OUT transfer cancellation to be done...')
await self.cancel_done await self.cancel_done
@@ -206,27 +203,22 @@ async def open_usb_transport(spec: str) -> Transport:
logger.debug('OUT transfer likely already completed') logger.debug('OUT transfer likely already completed')
class UsbPacketSource(asyncio.Protocol, ParserSource): class UsbPacketSource(asyncio.Protocol, ParserSource):
def __init__(self, context, device, metadata, acl_in, events_in): def __init__(self, device, metadata, acl_in, events_in):
super().__init__() super().__init__()
self.context = context
self.device = device self.device = device
self.metadata = metadata self.metadata = metadata
self.acl_in = acl_in self.acl_in = acl_in
self.acl_in_transfer = None
self.events_in = events_in self.events_in = events_in
self.events_in_transfer = None
self.loop = asyncio.get_running_loop() self.loop = asyncio.get_running_loop()
self.queue = asyncio.Queue() self.queue = asyncio.Queue()
self.dequeue_task = None self.dequeue_task = None
self.closed = False
self.event_loop_done = self.loop.create_future()
self.cancel_done = { self.cancel_done = {
hci.HCI_EVENT_PACKET: self.loop.create_future(), hci.HCI_EVENT_PACKET: self.loop.create_future(),
hci.HCI_ACL_DATA_PACKET: self.loop.create_future(), hci.HCI_ACL_DATA_PACKET: self.loop.create_future(),
} }
self.events_in_transfer = None self.closed = False
self.acl_in_transfer = None
# Create a thread to process events
self.event_thread = threading.Thread(target=self.run)
def start(self): def start(self):
# Set up transfer objects for input # Set up transfer objects for input
@@ -234,7 +226,7 @@ async def open_usb_transport(spec: str) -> Transport:
self.events_in_transfer.setInterrupt( self.events_in_transfer.setInterrupt(
self.events_in, self.events_in,
READ_SIZE, READ_SIZE,
callback=self.on_packet_received, callback=self.transfer_callback,
user_data=hci.HCI_EVENT_PACKET, user_data=hci.HCI_EVENT_PACKET,
) )
self.events_in_transfer.submit() self.events_in_transfer.submit()
@@ -243,22 +235,23 @@ async def open_usb_transport(spec: str) -> Transport:
self.acl_in_transfer.setBulk( self.acl_in_transfer.setBulk(
self.acl_in, self.acl_in,
READ_SIZE, READ_SIZE,
callback=self.on_packet_received, callback=self.transfer_callback,
user_data=hci.HCI_ACL_DATA_PACKET, user_data=hci.HCI_ACL_DATA_PACKET,
) )
self.acl_in_transfer.submit() self.acl_in_transfer.submit()
self.dequeue_task = self.loop.create_task(self.dequeue()) self.dequeue_task = self.loop.create_task(self.dequeue())
self.event_thread.start()
def on_packet_received(self, transfer): @property
def usb_transfer_submitted(self):
return (
self.events_in_transfer.isSubmitted()
or self.acl_in_transfer.isSubmitted()
)
def transfer_callback(self, transfer):
packet_type = transfer.getUserData() packet_type = transfer.getUserData()
status = transfer.getStatus() status = transfer.getStatus()
# logger.debug(
# f'<<< USB IN transfer callback: status={status} '
# f'packet_type={packet_type} '
# f'length={transfer.getActualLength()}'
# )
# pylint: disable=no-member # pylint: disable=no-member
if status == usb1.TRANSFER_COMPLETED: if status == usb1.TRANSFER_COMPLETED:
@@ -267,19 +260,18 @@ async def open_usb_transport(spec: str) -> Transport:
+ transfer.getBuffer()[: transfer.getActualLength()] + transfer.getBuffer()[: transfer.getActualLength()]
) )
self.loop.call_soon_threadsafe(self.queue.put_nowait, packet) self.loop.call_soon_threadsafe(self.queue.put_nowait, packet)
# Re-submit the transfer so we can receive more data
transfer.submit()
elif status == usb1.TRANSFER_CANCELLED: elif status == usb1.TRANSFER_CANCELLED:
self.loop.call_soon_threadsafe( self.loop.call_soon_threadsafe(
self.cancel_done[packet_type].set_result, None self.cancel_done[packet_type].set_result, None
) )
return
else: else:
logger.warning( logger.warning(
color(f'!!! transfer not completed: status={status}', 'red') color(f'!!! IN transfer not completed: status={status}', 'red')
) )
# Re-submit the transfer so we can receive more data
transfer.submit()
async def dequeue(self): async def dequeue(self):
while not self.closed: while not self.closed:
try: try:
@@ -288,21 +280,6 @@ async def open_usb_transport(spec: str) -> Transport:
return return
self.parser.feed_data(packet) self.parser.feed_data(packet)
def run(self):
logger.debug('starting USB event loop')
while (
self.events_in_transfer.isSubmitted()
or self.acl_in_transfer.isSubmitted()
):
# pylint: disable=no-member
try:
self.context.handleEvents()
except usb1.USBErrorInterrupted:
pass
logger.debug('USB event loop done')
self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None)
def close(self): def close(self):
self.closed = True self.closed = True
@@ -331,15 +308,14 @@ async def open_usb_transport(spec: str) -> Transport:
f'IN[{packet_type}] transfer likely already completed' f'IN[{packet_type}] transfer likely already completed'
) )
# Wait for the thread to terminate
await self.event_loop_done
class UsbTransport(Transport): class UsbTransport(Transport):
def __init__(self, context, device, interface, setting, source, sink): def __init__(self, context, device, interface, setting, source, sink):
super().__init__(source, sink) super().__init__(source, sink)
self.context = context self.context = context
self.device = device self.device = device
self.interface = interface self.interface = interface
self.loop = asyncio.get_running_loop()
self.event_loop_done = self.loop.create_future()
# Get exclusive access # Get exclusive access
device.claimInterface(interface) device.claimInterface(interface)
@@ -352,6 +328,22 @@ async def open_usb_transport(spec: str) -> Transport:
source.start() source.start()
sink.start() sink.start()
# Create a thread to process events
self.event_thread = threading.Thread(target=self.run)
self.event_thread.start()
def run(self):
logger.debug('starting USB event loop')
while self.source.usb_transfer_submitted:
# pylint: disable=no-member
try:
self.context.handleEvents()
except usb1.USBErrorInterrupted:
pass
logger.debug('USB event loop done')
self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None)
async def close(self): async def close(self):
self.source.close() self.source.close()
self.sink.close() self.sink.close()
@@ -361,6 +353,9 @@ async def open_usb_transport(spec: str) -> Transport:
self.device.close() self.device.close()
self.context.close() self.context.close()
# Wait for the thread to terminate
await self.event_loop_done
# Find the device according to the spec moniker # Find the device according to the spec moniker
load_libusb() load_libusb()
context = usb1.USBContext() context = usb1.USBContext()
@@ -540,7 +535,7 @@ async def open_usb_transport(spec: str) -> Transport:
except usb1.USBError: except usb1.USBError:
logger.warning('failed to set configuration') logger.warning('failed to set configuration')
source = UsbPacketSource(context, device, device_metadata, acl_in, events_in) source = UsbPacketSource(device, device_metadata, acl_in, events_in)
sink = UsbPacketSink(device, acl_out) sink = UsbPacketSink(device, acl_out)
return UsbTransport(context, device, interface, setting, source, sink) return UsbTransport(context, device, interface, setting, source, sink)
except usb1.USBError as error: except usb1.USBError as error: