better packet queue logic

This commit is contained in:
Gilles Boccon-Gibod
2024-07-17 17:48:26 -07:00
parent 5aae44b610
commit 6a51166af7

View File

@@ -15,10 +15,10 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import logging
import threading
import collections
import ctypes
import platform
@@ -114,13 +114,17 @@ async def open_usb_transport(spec: str) -> Transport:
self.device = device
self.acl_out = acl_out
self.acl_out_transfer = device.getTransfer()
self.packets = collections.deque() # Queue of packets waiting to be sent
self.acl_out_transfer_ready = asyncio.Semaphore(1)
self.packets: asyncio.Queue[bytes] = (
asyncio.Queue()
) # Queue of packets waiting to be sent
self.loop = asyncio.get_running_loop()
self.queue_task = None
self.cancel_done = self.loop.create_future()
self.closed = False
def start(self):
pass
self.queue_task = asyncio.create_task(self.process_queue())
def on_packet(self, packet):
# Ignore packets if we're closed
@@ -132,62 +136,64 @@ async def open_usb_transport(spec: str) -> Transport:
return
# Queue the packet
self.packets.append(packet)
if len(self.packets) == 1:
# The queue was previously empty, re-prime the pump
self.process_queue()
self.packets.put_nowait(packet)
def transfer_callback(self, transfer):
self.acl_out_transfer_ready.release()
status = transfer.getStatus()
# pylint: disable=no-member
if status == usb1.TRANSFER_COMPLETED:
self.loop.call_soon_threadsafe(self.on_packet_sent)
elif status == usb1.TRANSFER_CANCELLED:
if status == usb1.TRANSFER_CANCELLED:
self.loop.call_soon_threadsafe(self.cancel_done.set_result, None)
else:
return
if status != usb1.TRANSFER_COMPLETED:
logger.warning(
color(f'!!! OUT transfer not completed: status={status}', 'red')
)
def on_packet_sent(self):
if self.packets:
self.packets.popleft()
self.process_queue()
async def process_queue(self):
while True:
# Wait for a packet to transfer.
packet = await self.packets.get()
def process_queue(self):
if len(self.packets) == 0:
return # Nothing to do
# Wait until we can start a transfer.
await self.acl_out_transfer_ready.acquire()
packet = self.packets[0]
packet_type = packet[0]
if packet_type == hci.HCI_ACL_DATA_PACKET:
self.acl_out_transfer.setBulk(
self.acl_out, packet[1:], callback=self.transfer_callback
)
self.acl_out_transfer.submit()
elif packet_type == hci.HCI_COMMAND_PACKET:
self.acl_out_transfer.setControl(
USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS,
0,
0,
0,
packet[1:],
callback=self.transfer_callback,
)
self.acl_out_transfer.submit()
else:
logger.warning(color(f'unsupported packet type {packet_type}', 'red'))
# Transfer the packet.
packet_type = packet[0]
if packet_type == hci.HCI_ACL_DATA_PACKET:
self.acl_out_transfer.setBulk(
self.acl_out, packet[1:], callback=self.transfer_callback
)
self.acl_out_transfer.submit()
elif packet_type == hci.HCI_COMMAND_PACKET:
self.acl_out_transfer.setControl(
USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS,
0,
0,
0,
packet[1:],
callback=self.transfer_callback,
)
self.acl_out_transfer.submit()
else:
logger.warning(
color(f'unsupported packet type {packet_type}', 'red')
)
def close(self):
self.closed = True
if self.queue_task:
self.queue_task.cancel()
async def terminate(self):
if not self.closed:
self.close()
# Empty the packet queue so that we don't send any more data
self.packets.clear()
while not self.packets.empty():
self.packets.get_nowait()
# If we have a transfer in flight, cancel it
if self.acl_out_transfer.isSubmitted():