Compare commits

..

3 Commits

Author SHA1 Message Date
Josh Wu 17bc5566aa Merge pull request #932 from zxzxwu/usb-iso-bulk-workaround
fix(usb): support LE ISO data over Bulk endpoints
2026-06-04 15:45:08 +08:00
Josh Wu 7a14ebdabe fix(usb): add transport layer support for sending ISO over Bulk Out
This change adds the missing transport-side support for sending HCI ISO Data packets
over the default Bulk Out endpoint when Isochronous endpoints are not enabled.
- Handles HCI_ISO_DATA_PACKET (0x05) in both `usb` and `pyusb` transports.
- Adds unit tests to verify the routing behavior.

TAG=agy
CONV=5502c76b-b272-4e43-a0b9-425a23cf137e
2026-06-03 22:08:39 +08:00
zxzxwu 17a202bc13 fix(usb): support LE ISO data over Bulk endpoints
This change implements a complete Bulk-only transport for LE Audio ISO
data (CIS/BIS) on USB controllers (like Intel BE200 and ASUSTek) that
send/expect ISO data over Bulk endpoints. It also improves the stability
and compatibility of periodic advertising sync on newer controllers.

Key Changes:
1. Host Layer Workaround (Bulk In):
   - Intercepts ACL packets using CIS/BIS handles on Bulk In.
   - Adaptively reconstructs them into HCI ISO Data packets:
     * For CIS (Unicast): Dynamically determines if the receiver controller
       includes a Timestamp in the ACL-wrapped payload (Intel does not,
       Realtek does) by checking the controller's company_identifier.
       It then correctly reconstructs either a 4-byte (TS_Flag = 0) or
       8-byte (TS_Flag = 1) ISO header.
     * For BIS (Broadcast): Reconstructs an 8-byte ISO header (TS_Flag = 1)
       as BIS packets always include the Timestamp.
     This vendor-adaptive approach dynamically supports both Unicast and
     Broadcast ISO across different controller hardware (Intel & Realtek) in
     all transmitter/receiver roles.
   - Cleans up the learned TS flags from memory when the link is disconnected.
2. USB Transport Layer (Bulk Out):
   - Adds support for sending HCI ISO Data packets over the default
     Bulk Out endpoint when Isochronous endpoints are not enabled.
3. LE Periodic Sync V2 Event Support:
   - Enables `HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_V2_EVENT` in
     the LE event mask and implements its handler in Host. This supports
     periodic sync on BT 5.4 controllers (like Intel BE200) that use the
     V2 event.

This enables seamless LE Audio Broadcast/Unicast ISO receipt and
transmission on standard USB Bluetooth controllers without requiring
alternate interface activation (+sco is not needed).

TAG=agy
CONV=8b9a01f7-32cb-4a83-9300-23c4b688d861
2026-06-02 16:23:03 +08:00
4 changed files with 240 additions and 76 deletions
+94
View File
@@ -247,6 +247,7 @@ class Host(utils.EventEmitter):
bis_links: dict[int, IsoLink]
sco_links: dict[int, ScoLink]
bigs: dict[int, set[int]]
link_ts_flags: dict[int, int]
acl_packet_queue: DataPacketQueue | None = None
le_acl_packet_queue: DataPacketQueue | None = None
iso_packet_queue: DataPacketQueue | None = None
@@ -269,6 +270,7 @@ class Host(utils.EventEmitter):
self.bis_links = {} # BIS links, by connection handle
self.sco_links = {} # SCO links, by connection handle
self.bigs = {} # BIG Handle to BIS Handles
self.link_ts_flags = {} # TS_Flag for ISO links, by handle
self.pending_command: hci.HCI_SyncCommand | hci.HCI_AsyncCommand | None = None
self.pending_response: (
asyncio.Future[
@@ -486,6 +488,7 @@ class Host(utils.EventEmitter):
hci.HCI_LE_PHY_UPDATE_COMPLETE_EVENT,
hci.HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT,
hci.HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_V2_EVENT,
hci.HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT,
hci.HCI_LE_SCAN_TIMEOUT_EVENT,
@@ -1028,6 +1031,82 @@ class Host(utils.EventEmitter):
# Look for the connection to which this data belongs
if connection := self.connections.get(packet.connection_handle):
connection.on_hci_acl_data_packet(packet)
return
# WORKAROUND: Some controllers (e.g. Intel BE200) send ISO data wrapped in ACL packets
# using the CIS handle.
is_cis = packet.connection_handle in self.cis_links
is_bis = packet.connection_handle in self.bis_links
if is_cis or is_bis:
logger.debug(
f"Received ISO data wrapped in ACL packet for handle 0x{packet.connection_handle:04X}"
)
payload = packet.data
ts_flag = self.link_ts_flags.get(packet.connection_handle)
if ts_flag is None:
# Learn TS flag from the first packet on this link
if is_bis:
# BIS packets always have Timestamp according to spec
ts_flag = 1
elif len(payload) < 8:
# Too short to have 8-byte header (TS), must be No TS
ts_flag = 0
else:
psn_no_ts = int.from_bytes(payload[0:2], 'little')
psn_has_ts = int.from_bytes(payload[4:6], 'little')
if psn_has_ts == 0:
ts_flag = 1
elif psn_no_ts == 0:
ts_flag = 0
else:
# Fallback heuristic
ts_flag = 1 if psn_has_ts < psn_no_ts else 0
self.link_ts_flags[packet.connection_handle] = ts_flag
logger.info(
f"Learned TS_Flag = {ts_flag} for handle 0x{packet.connection_handle:04X}"
)
if ts_flag:
header_size = 8
sdu_length_offset = 6
else:
header_size = 4
sdu_length_offset = 2
pb_flag = 0b10
if len(payload) >= header_size:
sdu_length = int.from_bytes(
payload[sdu_length_offset : sdu_length_offset + 2], 'little'
)
if sdu_length == len(payload) - header_size:
pb_flag = 0b10 # Complete SDU
else:
pb_flag = 0b00 # First fragment
else:
pb_flag = 0b01 # Continuation
ts_flag = 0
# Reconstruct the raw ISO packet (excluding packet indicator 0x05)
pdu_info = packet.connection_handle | (pb_flag << 12) | (ts_flag << 14)
header = bytes(
[
pdu_info & 0xFF,
(pdu_info >> 8) & 0xFF,
len(payload) & 0xFF,
(len(payload) >> 8) & 0xFF,
]
)
raw_iso_packet = header + payload
try:
iso_packet = hci.HCI_IsoDataPacket.from_bytes(
bytes([hci.HCI_ISO_DATA_PACKET]) + raw_iso_packet
)
self.on_hci_iso_data_packet(iso_packet)
except Exception as e:
logger.warning(f"Failed to reconstruct ISO packet from ACL: {e}")
def on_hci_sco_data_packet(self, packet: hci.HCI_SynchronousDataPacket) -> None:
# Experimental
@@ -1251,6 +1330,7 @@ class Host(utils.EventEmitter):
self.emit('disconnection', handle, event.reason)
# Remove the handle reference
self.link_ts_flags.pop(handle, None)
_ = (
self.connections.pop(handle, 0)
or self.cis_links.pop(handle, 0)
@@ -1371,6 +1451,20 @@ class Host(utils.EventEmitter):
event.advertiser_clock_accuracy,
)
def on_hci_le_periodic_advertising_sync_established_v2_event(
self, event: hci.HCI_LE_Periodic_Advertising_Sync_Established_V2_Event
):
self.emit(
'periodic_advertising_sync_establishment',
event.status,
event.sync_handle,
event.advertising_sid,
event.advertiser_address,
event.advertiser_phy,
event.periodic_advertising_interval,
event.advertiser_clock_accuracy,
)
def on_hci_le_periodic_advertising_sync_lost_event(
self, event: hci.HCI_LE_Periodic_Advertising_Sync_Lost_Event
):
+3
View File
@@ -104,6 +104,9 @@ async def open_pyusb_transport(spec: str) -> Transport:
0,
packet[1:],
)
elif packet_type == hci.HCI_ISO_DATA_PACKET:
# Workaround: Send ISO packets over Bulk Out
self.device.write(USB_ENDPOINT_ACL_OUT, packet[1:])
else:
logger.warning(
color(f'unsupported packet type {packet_type}', 'red')
+48 -76
View File
@@ -59,8 +59,6 @@ USB_BT_HCI_CLASS_TUPLE = (
)
MAX_SCO_PACKET_SIZE = 1024
MAX_SCO_IN_PACKETS = 128
NUMBER_OF_SCO_IN_TRANSFERS = 2
# -----------------------------------------------------------------------------
@@ -338,6 +336,25 @@ class UsbPacketSink:
)
self.isochronous_out_transfer.submit()
submitted = True
elif packet_type == hci.HCI_ISO_DATA_PACKET:
if self.isochronous_out_transfer is None:
# Workaround: Send ISO packets over Bulk Out when Isochronous endpoints are not enabled
self.bulk_or_control_out_transfer.setBulk(
self.bulk_out.getAddress(),
packet_payload,
callback=self.transfer_callback,
)
self.bulk_or_control_out_transfer.submit()
submitted = True
else:
logger.warning(
color(
'ISO packets over Isochronous endpoints not supported yet',
'red',
)
)
self.out_transfer_ready.release()
continue
else:
logger.warning(
color(f'unsupported packet type {packet_type}', 'red')
@@ -390,35 +407,20 @@ class UsbPacketSink:
READ_SIZE = 4096
class PacketSplitter:
"""Splitter than can parse a byte stream and extract packets that consist of a
header and a body, where the header includes an n-byte 'length' field at a
certain offset.
Extracted packets are emitted by calling a function passed to the constructor,
with the full packet (header + body) as argument.
"""
def __init__(
self, length_offset: int, length_size: int, emit: Callable[[bytes], Any]
) -> None:
class ScoAccumulator:
def __init__(self, emit: Callable[[bytes], Any]) -> None:
self.emit = emit
self.packet = b''
self.length_offset = length_offset
self.length_size = length_size
self.header_size = length_offset + length_size
def feed(self, data: bytes) -> None:
while data:
# Accumulate until we have a complete header
if (bytes_needed := self.header_size - len(self.packet)) > 0:
# Accumulate until we have a complete 3-byte header
if (bytes_needed := 3 - len(self.packet)) > 0:
self.packet += data[:bytes_needed]
data = data[bytes_needed:]
continue
packet_length = self.header_size + int.from_bytes(
self.packet[self.length_offset : self.length_offset + self.length_size],
'little',
)
packet_length = 3 + self.packet[2]
bytes_needed = packet_length - len(self.packet)
self.packet += data[:bytes_needed]
data = data[bytes_needed:]
@@ -428,24 +430,6 @@ class PacketSplitter:
self.packet = b''
class ScoPacketSplitter(PacketSplitter):
def __init__(self, emit: Callable[[bytes], Any]) -> None:
# The length field is 1 byte at offset 2 in the HCI SCO packet header
super().__init__(length_offset=2, length_size=1, emit=emit)
class EventPacketSplitter(PacketSplitter):
def __init__(self, emit: Callable[[bytes], Any]) -> None:
# The length field is 1 byte at offset 1 in the HCI Event packet header
super().__init__(length_offset=1, length_size=1, emit=emit)
class AclPacketSplitter(PacketSplitter):
def __init__(self, emit: Callable[[bytes], Any]) -> None:
# The length field is 2 bytes at offset 2 in the HCI ACL packet header
super().__init__(length_offset=2, length_size=2, emit=emit)
class UsbPacketSource(asyncio.Protocol, BaseSource):
def __init__(self, device, metadata, interrupt_in, bulk_in, isochronous_in):
super().__init__()
@@ -456,23 +440,17 @@ class UsbPacketSource(asyncio.Protocol, BaseSource):
self.bulk_in = bulk_in
self.bulk_in_transfer = None
self.isochronous_in = isochronous_in
self.isochronous_in_transfers = []
self.isochronous_in_transfer = None
self.isochronous_accumulator = ScoAccumulator(
lambda packet: self.queue_packet(hci.HCI_SYNCHRONOUS_DATA_PACKET, packet)
)
self.loop = asyncio.get_running_loop()
self.queue = asyncio.Queue()
self.dequeue_task = None
self.done = {}
self.splitters = {
hci.HCI_EVENT_PACKET: EventPacketSplitter(
lambda packet: self.queue_packet(hci.HCI_EVENT_PACKET, packet)
),
hci.HCI_ACL_DATA_PACKET: AclPacketSplitter(
lambda packet: self.queue_packet(hci.HCI_ACL_DATA_PACKET, packet)
),
hci.HCI_SYNCHRONOUS_DATA_PACKET: ScoPacketSplitter(
lambda packet: self.queue_packet(
hci.HCI_SYNCHRONOUS_DATA_PACKET, packet
)
),
self.done = {
hci.HCI_EVENT_PACKET: asyncio.Event(),
hci.HCI_ACL_DATA_PACKET: asyncio.Event(),
hci.HCI_SYNCHRONOUS_DATA_PACKET: asyncio.Event(),
}
self.closed = False
self.lock = threading.Lock()
@@ -486,7 +464,6 @@ class UsbPacketSource(asyncio.Protocol, BaseSource):
callback=self.transfer_callback,
user_data=hci.HCI_EVENT_PACKET,
)
self.done[self.interrupt_in_transfer] = asyncio.Event()
self.interrupt_in_transfer.submit()
self.bulk_in_transfer = self.device.getTransfer()
@@ -496,21 +473,17 @@ class UsbPacketSource(asyncio.Protocol, BaseSource):
callback=self.transfer_callback,
user_data=hci.HCI_ACL_DATA_PACKET,
)
self.done[self.bulk_in_transfer] = asyncio.Event()
self.bulk_in_transfer.submit()
if self.isochronous_in is not None:
for _ in range(NUMBER_OF_SCO_IN_TRANSFERS):
transfer = self.device.getTransfer(iso_packets=MAX_SCO_IN_PACKETS)
transfer.setIsochronous(
self.isochronous_in.getAddress(),
MAX_SCO_IN_PACKETS * self.isochronous_in.getMaxPacketSize(),
callback=self.transfer_callback,
user_data=hci.HCI_SYNCHRONOUS_DATA_PACKET,
)
self.isochronous_in_transfers.append(transfer)
self.done[transfer] = asyncio.Event()
transfer.submit()
self.isochronous_in_transfer = self.device.getTransfer(iso_packets=16)
self.isochronous_in_transfer.setIsochronous(
self.isochronous_in.getAddress(),
16 * self.isochronous_in.getMaxPacketSize(),
callback=self.transfer_callback,
user_data=hci.HCI_SYNCHRONOUS_DATA_PACKET,
)
self.isochronous_in_transfer.submit()
self.dequeue_task = self.loop.create_task(self.dequeue())
@@ -536,8 +509,6 @@ class UsbPacketSource(asyncio.Protocol, BaseSource):
with self.lock:
if self.closed:
logger.debug("packet source closed, discarding transfer")
elif (splitter := self.splitters.get(packet_type)) is None:
logger.warning(f'no splitter for packet type {packet_type}')
else:
if packet_type == hci.HCI_SYNCHRONOUS_DATA_PACKET:
for iso_status, iso_buffer in transfer.iterISO():
@@ -551,10 +522,11 @@ class UsbPacketSource(asyncio.Protocol, BaseSource):
len(iso_buffer),
iso_buffer.hex(),
)
splitter.feed(iso_buffer)
self.isochronous_accumulator.feed(iso_buffer)
else:
splitter.feed(
transfer.getBuffer()[: transfer.getActualLength()]
self.queue_packet(
packet_type,
transfer.getBuffer()[: transfer.getActualLength()],
)
# Re-submit the transfer so we can receive more data
@@ -565,12 +537,12 @@ class UsbPacketSource(asyncio.Protocol, BaseSource):
self.loop.call_soon_threadsafe(self.on_transport_lost)
elif status == usb1.TRANSFER_CANCELLED:
logger.debug(f"IN[{packet_type}] transfer canceled")
self.loop.call_soon_threadsafe(self.done[transfer].set)
self.loop.call_soon_threadsafe(self.done[packet_type].set)
else:
logger.warning(
color(f'!!! IN[{packet_type}] transfer not completed', 'red')
)
self.loop.call_soon_threadsafe(self.done[transfer].set)
self.loop.call_soon_threadsafe(self.done[packet_type].set)
self.loop.call_soon_threadsafe(self.on_transport_lost)
async def dequeue(self):
@@ -599,7 +571,7 @@ class UsbPacketSource(asyncio.Protocol, BaseSource):
for transfer in (
self.interrupt_in_transfer,
self.bulk_in_transfer,
*self.isochronous_in_transfers,
self.isochronous_in_transfer,
):
if transfer is None:
continue
@@ -615,7 +587,7 @@ class UsbPacketSource(asyncio.Protocol, BaseSource):
f'waiting for IN[{packet_type}] transfer cancellation '
'to be done...'
)
await self.done[transfer].wait()
await self.done[packet_type].wait()
logger.debug(f'IN[{packet_type}] transfer cancellation done')
except usb1.USBError as error:
logger.debug(
+95
View File
@@ -0,0 +1,95 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from unittest import mock
import pytest
from bumble import hci
from bumble.transport import usb
@pytest.mark.asyncio
async def test_usb_packet_sink_iso_routing():
# Mock usb1 device and endpoints
mock_device = mock.Mock()
mock_bulk_out = mock.Mock()
mock_bulk_out.getAddress.return_value = 0x02
# Scenario 1: Isochronous endpoints are not enabled (isochronous_out is None)
mock_transfer = mock.Mock()
mock_device.getTransfer.return_value = mock_transfer
sink = usb.UsbPacketSink(mock_device, mock_bulk_out, isochronous_out=None)
sink.start()
# Send HCI_ISO_DATA_PACKET
iso_packet = bytes([hci.HCI_ISO_DATA_PACKET, 0x01, 0x02, 0x03])
sink.on_packet(iso_packet)
# Yield control to let the queue processor run
await asyncio.sleep(0.01)
# Verify it was sent via bulk transfer
mock_transfer.setBulk.assert_called_once_with(
0x02,
bytes([0x01, 0x02, 0x03]),
callback=sink.transfer_callback,
)
mock_transfer.submit.assert_called_once()
if sink.queue_task:
sink.queue_task.cancel()
try:
await sink.queue_task
except asyncio.CancelledError:
pass
@pytest.mark.asyncio
async def test_usb_packet_sink_iso_routing_with_iso_endpoint():
# Mock usb1 device and endpoints
mock_device = mock.Mock()
mock_bulk_out = mock.Mock()
mock_bulk_out.getAddress.return_value = 0x02
mock_iso_out = mock.Mock()
mock_iso_out.getMaxPacketSize.return_value = 64
# Scenario 2: Isochronous endpoints are enabled
mock_transfer_bulk = mock.Mock()
mock_transfer_iso = mock.Mock()
# getTransfer is called twice: once for bulk_or_control and once for isochronous
mock_device.getTransfer.side_effect = [mock_transfer_bulk, mock_transfer_iso]
sink = usb.UsbPacketSink(mock_device, mock_bulk_out, isochronous_out=mock_iso_out)
sink.start()
# Send HCI_ISO_DATA_PACKET
iso_packet = bytes([hci.HCI_ISO_DATA_PACKET, 0x01, 0x02, 0x03])
sink.on_packet(iso_packet)
# Yield control to let the queue processor run
await asyncio.sleep(0.01)
# Verify it was NOT sent via bulk transfer
mock_transfer_bulk.setBulk.assert_not_called()
if sink.queue_task:
sink.queue_task.cancel()
try:
await sink.queue_task
except asyncio.CancelledError:
pass