Compare commits

...

13 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod 3266d16cf1 Merge pull request #936 from google/gbg/usb-transport-packet-splitter
usb transport packet splitter
2026-06-07 12:52:53 +02:00
Gilles Boccon-Gibod 65c4f9a698 add unit test 2026-06-07 12:30:38 +02:00
Josh Wu 17bc5566aa Merge pull request #932 from zxzxwu/usb-iso-bulk-workaround
fix(usb): support LE ISO data over Bulk endpoints
2026-06-04 15:45:08 +08:00
Gilles Boccon-Gibod b6a21fa3c6 use multiple in transfers for isochronous endpoints 2026-06-03 21:23:34 +02:00
Josh Wu 7a14ebdabe fix(usb): add transport layer support for sending ISO over Bulk Out
This change adds the missing transport-side support for sending HCI ISO Data packets
over the default Bulk Out endpoint when Isochronous endpoints are not enabled.
- Handles HCI_ISO_DATA_PACKET (0x05) in both `usb` and `pyusb` transports.
- Adds unit tests to verify the routing behavior.

TAG=agy
CONV=5502c76b-b272-4e43-a0b9-425a23cf137e
2026-06-03 22:08:39 +08:00
Gilles Boccon-Gibod e44eaf2147 implement packet splitters 2026-06-03 15:55:41 +02:00
zxzxwu 17a202bc13 fix(usb): support LE ISO data over Bulk endpoints
This change implements a complete Bulk-only transport for LE Audio ISO
data (CIS/BIS) on USB controllers (like Intel BE200 and ASUSTek) that
send/expect ISO data over Bulk endpoints. It also improves the stability
and compatibility of periodic advertising sync on newer controllers.

Key Changes:
1. Host Layer Workaround (Bulk In):
   - Intercepts ACL packets using CIS/BIS handles on Bulk In.
   - Adaptively reconstructs them into HCI ISO Data packets:
     * For CIS (Unicast): Dynamically determines if the receiver controller
       includes a Timestamp in the ACL-wrapped payload (Intel does not,
       Realtek does) by checking the controller's company_identifier.
       It then correctly reconstructs either a 4-byte (TS_Flag = 0) or
       8-byte (TS_Flag = 1) ISO header.
     * For BIS (Broadcast): Reconstructs an 8-byte ISO header (TS_Flag = 1)
       as BIS packets always include the Timestamp.
     This vendor-adaptive approach dynamically supports both Unicast and
     Broadcast ISO across different controller hardware (Intel & Realtek) in
     all transmitter/receiver roles.
   - Cleans up the learned TS flags from memory when the link is disconnected.
2. USB Transport Layer (Bulk Out):
   - Adds support for sending HCI ISO Data packets over the default
     Bulk Out endpoint when Isochronous endpoints are not enabled.
3. LE Periodic Sync V2 Event Support:
   - Enables `HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_V2_EVENT` in
     the LE event mask and implements its handler in Host. This supports
     periodic sync on BT 5.4 controllers (like Intel BE200) that use the
     V2 event.

This enables seamless LE Audio Broadcast/Unicast ISO receipt and
transmission on standard USB Bluetooth controllers without requiring
alternate interface activation (+sco is not needed).

TAG=agy
CONV=8b9a01f7-32cb-4a83-9300-23c4b688d861
2026-06-02 16:23:03 +08:00
Gilles Boccon-Gibod ef634953f0 Merge pull request #896 from google/gbg/usb-hci-sco
add basic support for SCO packets over USB
2026-06-01 18:34:59 +02:00
Gilles Boccon-Gibod 71672ec64f address PR comments 2026-06-01 18:17:23 +02:00
Gilles Boccon-Gibod 5ee2d80ce4 multiple packets per transfer 2026-05-29 08:47:28 +02:00
Gilles Boccon-Gibod 9b2e345a1e fix types 2026-05-22 18:02:47 +02:00
Gilles Boccon-Gibod f9bd3084b9 revert libusb-package version change 2026-05-22 18:02:47 +02:00
Gilles Boccon-Gibod 808ea1abeb add basic support for SCO packets over USB 2026-05-22 18:02:47 +02:00
15 changed files with 1575 additions and 564 deletions
+1 -1
View File
@@ -489,7 +489,7 @@ class Sender:
flags=( flags=(
Packet.PacketFlags.LAST Packet.PacketFlags.LAST
if tx_i == self.tx_packet_count - 1 if tx_i == self.tx_packet_count - 1
else 0 else Packet.PacketFlags(0)
), ),
sequence=tx_i, sequence=tx_i,
timestamp=int((time.time() - self.start_time) * 1000000), timestamp=int((time.time() - self.start_time) * 1000000),
+12
View File
@@ -45,8 +45,10 @@ from bumble.hci import (
HCI_Read_Local_Supported_Codecs_Command, HCI_Read_Local_Supported_Codecs_Command,
HCI_Read_Local_Supported_Codecs_V2_Command, HCI_Read_Local_Supported_Codecs_V2_Command,
HCI_Read_Local_Version_Information_Command, HCI_Read_Local_Version_Information_Command,
HCI_Read_Voice_Setting_Command,
LeFeature, LeFeature,
SpecificationVersion, SpecificationVersion,
VoiceSetting,
map_null_terminated_utf8_string, map_null_terminated_utf8_string,
) )
from bumble.host import Host from bumble.host import Host
@@ -214,6 +216,16 @@ async def get_codecs_info(host: Host) -> None:
if not response2.vendor_specific_codec_ids: if not response2.vendor_specific_codec_ids:
print(' No Vendor-specific codecs') print(' No Vendor-specific codecs')
if host.supports_command(HCI_Read_Voice_Setting_Command.op_code):
response3 = await host.send_sync_command(HCI_Read_Voice_Setting_Command())
voice_setting = VoiceSetting.from_int(response3.voice_setting)
print(color('Voice Setting:', 'yellow'))
print(f' Air Coding Format: {voice_setting.air_coding_format.name}')
print(f' Linear PCM Bit Position: {voice_setting.linear_pcm_bit_position}')
print(f' Input Sample Size: {voice_setting.input_sample_size.name}')
print(f' Input Data Format: {voice_setting.input_data_format.name}')
print(f' Input Coding Format: {voice_setting.input_coding_format.name}')
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def async_main( async def async_main(
+142 -32
View File
@@ -16,6 +16,8 @@
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
import asyncio import asyncio
import statistics
import struct
import time import time
import click import click
@@ -25,7 +27,9 @@ from bumble.colors import color
from bumble.hci import ( from bumble.hci import (
HCI_READ_LOOPBACK_MODE_COMMAND, HCI_READ_LOOPBACK_MODE_COMMAND,
HCI_WRITE_LOOPBACK_MODE_COMMAND, HCI_WRITE_LOOPBACK_MODE_COMMAND,
Address,
HCI_Read_Loopback_Mode_Command, HCI_Read_Loopback_Mode_Command,
HCI_SynchronousDataPacket,
HCI_Write_Loopback_Mode_Command, HCI_Write_Loopback_Mode_Command,
LoopbackMode, LoopbackMode,
) )
@@ -36,55 +40,121 @@ from bumble.transport import open_transport
class Loopback: class Loopback:
"""Send and receive ACL data packets in local loopback mode""" """Send and receive ACL data packets in local loopback mode"""
def __init__(self, packet_size: int, packet_count: int, transport: str): def __init__(
self,
packet_size: int,
packet_count: int,
connection_type: str,
mode: str,
interval: int,
transport: str,
):
self.transport = transport self.transport = transport
self.packet_size = packet_size self.packet_size = packet_size
self.packet_count = packet_count self.packet_count = packet_count
self.connection_handle: int | None = None self.connection_handle: int | None = None
self.connection_type = connection_type
self.connection_event = asyncio.Event() self.connection_event = asyncio.Event()
self.mode = mode
self.interval = interval
self.done = asyncio.Event() self.done = asyncio.Event()
self.expected_cid = 0 self.expected_counter = 0
self.bytes_received = 0 self.bytes_received = 0
self.start_timestamp = 0.0 self.start_timestamp = 0.0
self.last_timestamp = 0.0 self.last_timestamp = 0.0
self.send_timestamps: list[float] = []
self.rtts: list[float] = []
def on_connection(self, connection_handle: int, *args): def on_connection(self, connection_handle: int, *args):
"""Retrieve connection handle from new connection event""" """Retrieve connection handle from new connection event"""
if not self.connection_event.is_set(): if not self.connection_event.is_set():
# save first connection handle for ACL # The first connection handle is of type ACL,
# subsequent connections are SCO # subsequent connections are of type SCO
if self.connection_type == "sco" and self.connection_handle is None:
self.connection_handle = connection_handle
return
self.connection_handle = connection_handle self.connection_handle = connection_handle
self.connection_event.set() self.connection_event.set()
def on_l2cap_pdu(self, connection_handle: int, cid: int, pdu: bytes): def on_sco_connection(
self,
address: Address,
connection_handle: int,
link_type,
rx_packet_length: int,
tx_packet_length: int,
air_mode: int,
) -> None:
self.on_connection(connection_handle)
def on_packet(self, connection_handle: int, packet: bytes):
"""Calculate packet receive speed""" """Calculate packet receive speed"""
now = time.time() now = time.time()
print(f'<<< Received packet {cid}: {len(pdu)} bytes') (counter,) = struct.unpack_from("H", packet, 0)
rtt = now - self.send_timestamps[counter]
self.rtts.append(rtt)
print(f'<<< Received packet {counter}: {len(packet)} bytes, RTT={rtt:.4f}')
assert connection_handle == self.connection_handle assert connection_handle == self.connection_handle
assert cid == self.expected_cid assert counter == self.expected_counter
self.expected_cid += 1 self.expected_counter += 1
if cid == 0: if counter == 0:
self.start_timestamp = now self.start_timestamp = now
else: else:
elapsed_since_start = now - self.start_timestamp elapsed_since_start = now - self.start_timestamp
elapsed_since_last = now - self.last_timestamp elapsed_since_last = now - self.last_timestamp
self.bytes_received += len(pdu) self.bytes_received += len(packet)
instant_rx_speed = len(pdu) / elapsed_since_last instant_rx_speed = len(packet) / elapsed_since_last
average_rx_speed = self.bytes_received / elapsed_since_start average_rx_speed = self.bytes_received / elapsed_since_start
if self.mode == 'throughput':
print( print(
color( color(
f'@@@ RX speed: instant={instant_rx_speed:.4f},' f'@@@ RX speed: instant={instant_rx_speed:.4f},'
f' average={average_rx_speed:.4f}', f' average={average_rx_speed:.4f},',
'cyan', 'cyan',
) )
) )
self.last_timestamp = now self.last_timestamp = now
if self.expected_cid == self.packet_count: if self.expected_counter == self.packet_count:
print(color('@@@ Received last packet', 'green')) print(color('@@@ Received last packet', 'green'))
self.done.set() self.done.set()
def on_l2cap_pdu(self, connection_handle: int, cid: int, pdu: bytes):
self.on_packet(connection_handle, pdu)
def on_sco_packet(self, connection_handle: int, packet) -> None:
self.on_packet(connection_handle, packet)
async def send_acl_packet(self, host: Host, packet: bytes) -> None:
assert self.connection_handle
host.send_l2cap_pdu(self.connection_handle, 0, packet)
async def send_sco_packet(self, host: Host, packet: bytes) -> None:
assert self.connection_handle
host.send_hci_packet(
HCI_SynchronousDataPacket(
connection_handle=self.connection_handle,
packet_status=HCI_SynchronousDataPacket.Status.CORRECTLY_RECEIVED_DATA,
data_total_length=len(packet),
data=packet,
)
)
async def send_loop(self, host: Host, sender) -> None:
for counter in range(0, self.packet_count):
print(
color(
f'>>> Sending {self.connection_type.upper()} '
f'packet {counter}: {self.packet_size} bytes',
'yellow',
)
)
self.send_timestamps.append(time.time())
await sender(host, struct.pack("H", counter) + bytes(self.packet_size - 2))
await asyncio.sleep(self.interval / 1000 if self.mode == "rtt" else 0)
async def run(self) -> None: async def run(self) -> None:
"""Run a loopback throughput test""" """Run a loopback throughput test"""
print(color('>>> Connecting to HCI...', 'green')) print(color('>>> Connecting to HCI...', 'green'))
@@ -126,8 +196,11 @@ class Loopback:
return return
# set event callbacks # set event callbacks
host.on('connection', self.on_connection) host.on('classic_connection', self.on_connection)
host.on('le_connection', self.on_connection)
host.on('sco_connection', self.on_sco_connection)
host.on('l2cap_pdu', self.on_l2cap_pdu) host.on('l2cap_pdu', self.on_l2cap_pdu)
host.on('sco_packet', self.on_sco_packet)
loopback_mode = LoopbackMode.LOCAL loopback_mode = LoopbackMode.LOCAL
@@ -148,32 +221,37 @@ class Loopback:
print(color('=== Start sending', 'magenta')) print(color('=== Start sending', 'magenta'))
start_time = time.time() start_time = time.time()
bytes_sent = 0 if self.connection_type == "acl":
for cid in range(0, self.packet_count): sender = self.send_acl_packet
# using the cid as an incremental index elif self.connection_type == "sco":
host.send_l2cap_pdu( sender = self.send_sco_packet
self.connection_handle, cid, bytes(self.packet_size) else:
) raise ValueError(f'Unknown connection type: {self.connection_type}')
print( await self.send_loop(host, sender)
color(
f'>>> Sending packet {cid}: {self.packet_size} bytes', 'yellow'
)
)
bytes_sent += self.packet_size # don't count L2CAP or HCI header sizes
await asyncio.sleep(0) # yield to allow packet receive
await self.done.wait() await self.done.wait()
print(color('=== Done!', 'magenta')) print(color('=== Done!', 'magenta'))
bytes_sent = self.packet_size * self.packet_count
elapsed = time.time() - start_time elapsed = time.time() - start_time
average_tx_speed = bytes_sent / elapsed average_tx_speed = bytes_sent / elapsed
if self.mode == 'throughput':
print( print(
color( color(
f'@@@ TX speed: average={average_tx_speed:.4f} ({bytes_sent} bytes' f'@@@ TX speed: average={average_tx_speed:.4f} '
f' in {elapsed:.2f} seconds)', f'({bytes_sent} bytes in {elapsed:.2f} seconds)',
'green', 'green',
) )
) )
if self.mode == 'rtt':
print(
color(
f'RTTs: min={min(self.rtts):.4f}, '
f'max={max(self.rtts):.4f}, '
f'avg={statistics.mean(self.rtts):.4f}',
'blue',
)
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -194,11 +272,43 @@ class Loopback:
default=10, default=10,
help='Packet count', help='Packet count',
) )
@click.option(
'--connection-type',
'-t',
metavar='TYPE',
type=click.Choice(['acl', 'sco']),
default='acl',
help='Connection type',
)
@click.option(
'--mode',
'-m',
metavar='MODE',
type=click.Choice(['throughput', 'rtt']),
default='throughput',
help='Test mode',
)
@click.option(
'--interval',
type=int,
default=100,
help='Inter-packet interval (ms) [RTT mode only]',
)
@click.argument('transport') @click.argument('transport')
def main(packet_size, packet_count, transport): def main(packet_size, packet_count, connection_type, mode, interval, transport):
bumble.logging.setup_basic_logging() bumble.logging.setup_basic_logging()
loopback = Loopback(packet_size, packet_count, transport)
asyncio.run(loopback.run()) if connection_type == "sco" and packet_size > 255:
print("ERROR: the maximum packet size for SCO is 255")
return
async def run():
loopback = Loopback(
packet_size, packet_count, connection_type, mode, interval, transport
)
await loopback.run()
asyncio.run(run())
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
+6 -1
View File
@@ -111,9 +111,14 @@ def show_device_details(device):
if (endpoint.getAddress() & USB_ENDPOINT_IN == 0) if (endpoint.getAddress() & USB_ENDPOINT_IN == 0)
else 'IN' else 'IN'
) )
endpoint_details = (
f', Max Packet Size = {endpoint.getMaxPacketSize()}'
if endpoint_type == 'ISOCHRONOUS'
else ''
)
print( print(
f' Endpoint 0x{endpoint.getAddress():02X}: ' f' Endpoint 0x{endpoint.getAddress():02X}: '
f'{endpoint_type} {endpoint_direction}' f'{endpoint_type} {endpoint_direction}{endpoint_details}'
) )
+21 -4
View File
@@ -1423,6 +1423,9 @@ class ScoLink(utils.CompositeEventEmitter):
acl_connection: Connection acl_connection: Connection
handle: int handle: int
link_type: int link_type: int
rx_packet_length: int
tx_packet_length: int
air_mode: hci.CodecID
sink: Callable[[hci.HCI_SynchronousDataPacket], Any] | None = None sink: Callable[[hci.HCI_SynchronousDataPacket], Any] | None = None
EVENT_DISCONNECTION: ClassVar[str] = "disconnection" EVENT_DISCONNECTION: ClassVar[str] = "disconnection"
@@ -6049,7 +6052,7 @@ class Device(utils.CompositeEventEmitter):
def on_connection_request( def on_connection_request(
self, bd_addr: hci.Address, class_of_device: int, link_type: int self, bd_addr: hci.Address, class_of_device: int, link_type: int
): ):
logger.debug(f'*** Connection request: {bd_addr}') logger.debug(f'*** Connection request: {bd_addr} link_type={link_type}')
# Handle SCO request. # Handle SCO request.
if link_type in ( if link_type in (
@@ -6059,6 +6062,7 @@ class Device(utils.CompositeEventEmitter):
if connection := self.find_connection_by_bd_addr( if connection := self.find_connection_by_bd_addr(
bd_addr, transport=PhysicalTransport.BR_EDR bd_addr, transport=PhysicalTransport.BR_EDR
): ):
connection.emit(self.EVENT_SCO_REQUEST, link_type)
self.emit(self.EVENT_SCO_REQUEST, connection, link_type) self.emit(self.EVENT_SCO_REQUEST, connection, link_type)
else: else:
logger.error(f'SCO request from a non-connected device {bd_addr}') logger.error(f'SCO request from a non-connected device {bd_addr}')
@@ -6418,7 +6422,6 @@ class Device(utils.CompositeEventEmitter):
logger.warning('peer name is not valid UTF-8') logger.warning('peer name is not valid UTF-8')
if connection: if connection:
connection.emit(connection.EVENT_REMOTE_NAME_FAILURE, error) connection.emit(connection.EVENT_REMOTE_NAME_FAILURE, error)
else:
self.emit(self.EVENT_REMOTE_NAME_FAILURE, address, error) self.emit(self.EVENT_REMOTE_NAME_FAILURE, address, error)
# [Classic only] # [Classic only]
@@ -6436,7 +6439,13 @@ class Device(utils.CompositeEventEmitter):
@with_connection_from_address @with_connection_from_address
@utils.experimental('Only for testing.') @utils.experimental('Only for testing.')
def on_sco_connection( def on_sco_connection(
self, acl_connection: Connection, sco_handle: int, link_type: int self,
acl_connection: Connection,
sco_handle: int,
link_type: int,
rx_packet_length: int,
tx_packet_length: int,
air_mode: int,
) -> None: ) -> None:
logger.debug( logger.debug(
f'*** SCO connected: {acl_connection.peer_address}, ' f'*** SCO connected: {acl_connection.peer_address}, '
@@ -6448,7 +6457,11 @@ class Device(utils.CompositeEventEmitter):
acl_connection=acl_connection, acl_connection=acl_connection,
handle=sco_handle, handle=sco_handle,
link_type=link_type, link_type=link_type,
rx_packet_length=rx_packet_length,
tx_packet_length=tx_packet_length,
air_mode=hci.CodecID(air_mode),
) )
acl_connection.emit(self.EVENT_SCO_CONNECTION, sco_link)
self.emit(self.EVENT_SCO_CONNECTION, sco_link) self.emit(self.EVENT_SCO_CONNECTION, sco_link)
# [Classic only] # [Classic only]
@@ -6459,7 +6472,8 @@ class Device(utils.CompositeEventEmitter):
self, acl_connection: Connection, status: int self, acl_connection: Connection, status: int
) -> None: ) -> None:
logger.debug(f'*** SCO connection failure: {acl_connection.peer_address}***') logger.debug(f'*** SCO connection failure: {acl_connection.peer_address}***')
self.emit(self.EVENT_SCO_CONNECTION_FAILURE) acl_connection.emit(self.EVENT_SCO_CONNECTION_FAILURE, status)
self.emit(self.EVENT_SCO_CONNECTION_FAILURE, status)
# [Classic only] # [Classic only]
@host_event_handler @host_event_handler
@@ -6922,15 +6936,18 @@ class Device(utils.CompositeEventEmitter):
@with_connection_from_address @with_connection_from_address
def on_classic_pairing(self, connection: Connection) -> None: def on_classic_pairing(self, connection: Connection) -> None:
connection.emit(connection.EVENT_CLASSIC_PAIRING) connection.emit(connection.EVENT_CLASSIC_PAIRING)
self.emit(connection.EVENT_CLASSIC_PAIRING, connection)
# [Classic only] # [Classic only]
@host_event_handler @host_event_handler
@with_connection_from_address @with_connection_from_address
def on_classic_pairing_failure(self, connection: Connection, status: int) -> None: def on_classic_pairing_failure(self, connection: Connection, status: int) -> None:
connection.emit(connection.EVENT_CLASSIC_PAIRING_FAILURE, status) connection.emit(connection.EVENT_CLASSIC_PAIRING_FAILURE, status)
self.emit(connection.EVENT_CLASSIC_PAIRING_FAILURE, connection, status)
def on_pairing_start(self, connection: Connection) -> None: def on_pairing_start(self, connection: Connection) -> None:
connection.emit(connection.EVENT_PAIRING_START) connection.emit(connection.EVENT_PAIRING_START)
self.emit(connection.EVENT_PAIRING_START, connection)
def on_pairing( def on_pairing(
self, self,
+121 -28
View File
@@ -1721,6 +1721,15 @@ class CodecID(SpecableEnum):
VENDOR_SPECIFIC = 0xFF VENDOR_SPECIFIC = 0xFF
# From Bluetooth Assigned Numbers, 2.10 PCM_Data_Format
class PcmDataFormat(SpecableEnum):
NA = 0x00
ONES_COMPLEMENT = 0x01
TWOS_COMPLEMENT = 0x02
SIGN_MAGNITUDE = 0x03
UNSIGNED = 0x04
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class CodingFormat: class CodingFormat:
codec_id: CodecID codec_id: CodecID
@@ -1729,7 +1738,7 @@ class CodingFormat:
@classmethod @classmethod
def parse_from_bytes(cls, data: bytes, offset: int) -> tuple[int, CodingFormat]: def parse_from_bytes(cls, data: bytes, offset: int) -> tuple[int, CodingFormat]:
(codec_id, company_id, vendor_specific_codec_id) = struct.unpack_from( codec_id, company_id, vendor_specific_codec_id = struct.unpack_from(
'<BHH', data, offset '<BHH', data, offset
) )
return offset + 5, cls( return offset + 5, cls(
@@ -1748,6 +1757,61 @@ class CodingFormat:
) )
@dataclasses.dataclass(frozen=True)
class VoiceSetting:
class AirCodingFormat(enum.IntEnum):
CVSD = 0
U_LAW = 1
A_LAW = 2
TRANSPARENT_DATA = 3
class InputSampleSize(enum.IntEnum):
SIZE_8_BITS = 0
SIZE_16_BITS = 1
class InputDataFormat(enum.IntEnum):
ONES_COMPLEMENT = 0
TWOS_COMPLEMENT = 1
SIGN_AND_MAGNITUDE = 2
UNSIGNED = 3
class InputCodingFormat(enum.IntEnum):
LINEAR = 0
U_LAW = 1
A_LAW = 2
RESERVED = 3
air_coding_format: AirCodingFormat = AirCodingFormat.CVSD
linear_pcm_bit_position: int = 0
input_sample_size: InputSampleSize = InputSampleSize.SIZE_8_BITS
input_data_format: InputDataFormat = InputDataFormat.ONES_COMPLEMENT
input_coding_format: InputCodingFormat = InputCodingFormat.LINEAR
@classmethod
def from_int(cls, value: int) -> VoiceSetting:
air_coding_format = cls.AirCodingFormat(value & 0b11)
linear_pcm_bit_position = (value >> 2) & 0b111
input_sample_size = cls.InputSampleSize((value >> 5) & 0b1)
input_data_format = cls.InputDataFormat((value >> 6) & 0b11)
input_coding_format = cls.InputCodingFormat((value >> 8) & 0b11)
return cls(
air_coding_format=air_coding_format,
linear_pcm_bit_position=linear_pcm_bit_position,
input_sample_size=input_sample_size,
input_data_format=input_data_format,
input_coding_format=input_coding_format,
)
def __int__(self) -> int:
return (
self.air_coding_format
| (self.linear_pcm_bit_position << 2)
| (self.input_sample_size << 5)
| (self.input_data_format << 6)
| (self.input_coding_format << 8)
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class HCI_Constant: class HCI_Constant:
@staticmethod @staticmethod
@@ -2008,7 +2072,7 @@ class HCI_Object:
) )
continue continue
(field_name, field_type) = object_field field_name, field_type = object_field
result += HCI_Object.serialize_field(hci_object[field_name], field_type) result += HCI_Object.serialize_field(hci_object[field_name], field_type)
return bytes(result) return bytes(result)
@@ -2886,6 +2950,23 @@ class HCI_Read_Clock_Offset_Command(HCI_AsyncCommand):
connection_handle: int = field(metadata=metadata(2)) connection_handle: int = field(metadata=metadata(2))
# -----------------------------------------------------------------------------
@HCI_Command.command
@dataclasses.dataclass
class HCI_Accept_Synchronous_Connection_Request_Command(HCI_AsyncCommand):
'''
See Bluetooth spec @ 7.1.27 Accept Synchronous Connection Request Command
'''
bd_addr: Address = field(metadata=metadata(Address.parse_address))
transmit_bandwidth: int = field(metadata=metadata(4))
receive_bandwidth: int = field(metadata=metadata(4))
max_latency: int = field(metadata=metadata(2))
voice_setting: int = field(metadata=metadata(2))
retransmission_effort: int = field(metadata=metadata(1))
packet_type: int = field(metadata=metadata(2))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_Command.command @HCI_Command.command
@dataclasses.dataclass @dataclasses.dataclass
@@ -3034,8 +3115,8 @@ class HCI_Enhanced_Setup_Synchronous_Connection_Command(HCI_AsyncCommand):
output_coding_format: int = field(metadata=metadata(CodingFormat.parse_from_bytes)) output_coding_format: int = field(metadata=metadata(CodingFormat.parse_from_bytes))
input_coded_data_size: int = field(metadata=metadata(2)) input_coded_data_size: int = field(metadata=metadata(2))
output_coded_data_size: int = field(metadata=metadata(2)) output_coded_data_size: int = field(metadata=metadata(2))
input_pcm_data_format: int = field(metadata=metadata(1)) input_pcm_data_format: int = field(metadata=PcmDataFormat.type_metadata(1))
output_pcm_data_format: int = field(metadata=metadata(1)) output_pcm_data_format: int = field(metadata=PcmDataFormat.type_metadata(1))
input_pcm_sample_payload_msb_position: int = field(metadata=metadata(1)) input_pcm_sample_payload_msb_position: int = field(metadata=metadata(1))
output_pcm_sample_payload_msb_position: int = field(metadata=metadata(1)) output_pcm_sample_payload_msb_position: int = field(metadata=metadata(1))
input_data_path: int = field(metadata=metadata(1)) input_data_path: int = field(metadata=metadata(1))
@@ -3046,13 +3127,6 @@ class HCI_Enhanced_Setup_Synchronous_Connection_Command(HCI_AsyncCommand):
packet_type: int = field(metadata=metadata(2)) packet_type: int = field(metadata=metadata(2))
retransmission_effort: int = field(metadata=metadata(1)) retransmission_effort: int = field(metadata=metadata(1))
class PcmDataFormat(SpecableEnum):
NA = 0x00
ONES_COMPLEMENT = 0x01
TWOS_COMPLEMENT = 0x02
SIGN_MAGNITUDE = 0x03
UNSIGNED = 0x04
class DataPath(SpecableEnum): class DataPath(SpecableEnum):
HCI = 0x00 HCI = 0x00
PCM = 0x01 PCM = 0x01
@@ -3099,8 +3173,8 @@ class HCI_Enhanced_Accept_Synchronous_Connection_Request_Command(HCI_AsyncComman
output_coding_format: int = field(metadata=metadata(CodingFormat.parse_from_bytes)) output_coding_format: int = field(metadata=metadata(CodingFormat.parse_from_bytes))
input_coded_data_size: int = field(metadata=metadata(2)) input_coded_data_size: int = field(metadata=metadata(2))
output_coded_data_size: int = field(metadata=metadata(2)) output_coded_data_size: int = field(metadata=metadata(2))
input_pcm_data_format: int = field(metadata=metadata(1)) input_pcm_data_format: int = field(metadata=PcmDataFormat.type_metadata(1))
output_pcm_data_format: int = field(metadata=metadata(1)) output_pcm_data_format: int = field(metadata=PcmDataFormat.type_metadata(1))
input_pcm_sample_payload_msb_position: int = field(metadata=metadata(1)) input_pcm_sample_payload_msb_position: int = field(metadata=metadata(1))
output_pcm_sample_payload_msb_position: int = field(metadata=metadata(1)) output_pcm_sample_payload_msb_position: int = field(metadata=metadata(1))
input_data_path: int = field(metadata=metadata(1)) input_data_path: int = field(metadata=metadata(1))
@@ -3944,6 +4018,23 @@ class HCI_Read_Local_OOB_Extended_Data_Command(
''' '''
# -----------------------------------------------------------------------------
@HCI_SyncCommand.sync_command(HCI_StatusReturnParameters)
@dataclasses.dataclass
class HCI_Configure_Data_Path_Command(HCI_SyncCommand[HCI_StatusReturnParameters]):
'''
See Bluetooth spec @ 7.3.101 Configure Data Path Command
'''
class DataPathDirection(SpecableEnum):
INPUT = 0x00
OUTPUT = 0x01
data_path_direction: DataPathDirection = field(metadata=metadata(1))
data_path_id: int = field(metadata=metadata(1))
vendor_specific_config: bytes = field(metadata=metadata('*'))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@dataclasses.dataclass @dataclasses.dataclass
class HCI_Read_Local_Version_Information_ReturnParameters(HCI_StatusReturnParameters): class HCI_Read_Local_Version_Information_ReturnParameters(HCI_StatusReturnParameters):
@@ -7334,7 +7425,7 @@ class HCI_Connection_Complete_Event(HCI_Event):
status: int = field(metadata=metadata(STATUS_SPEC)) status: int = field(metadata=metadata(STATUS_SPEC))
connection_handle: int = field(metadata=metadata(2)) connection_handle: int = field(metadata=metadata(2))
bd_addr: Address = field(metadata=metadata(Address.parse_address)) bd_addr: Address = field(metadata=metadata(Address.parse_address))
link_type: int = field(metadata=LinkType.type_metadata(1)) link_type: LinkType = field(metadata=LinkType.type_metadata(1))
encryption_enabled: int = field(metadata=metadata(1)) encryption_enabled: int = field(metadata=metadata(1))
@@ -7730,12 +7821,6 @@ class HCI_Synchronous_Connection_Complete_Event(HCI_Event):
SCO = 0x00 SCO = 0x00
ESCO = 0x02 ESCO = 0x02
class AirMode(SpecableEnum):
U_LAW_LOG = 0x00
A_LAW_LOG_AIR_MORE = 0x01
CVSD = 0x02
TRANSPARENT_DATA = 0x03
status: int = field(metadata=metadata(STATUS_SPEC)) status: int = field(metadata=metadata(STATUS_SPEC))
connection_handle: int = field(metadata=metadata(2)) connection_handle: int = field(metadata=metadata(2))
bd_addr: Address = field(metadata=metadata(Address.parse_address)) bd_addr: Address = field(metadata=metadata(Address.parse_address))
@@ -7744,7 +7829,7 @@ class HCI_Synchronous_Connection_Complete_Event(HCI_Event):
retransmission_window: int = field(metadata=metadata(1)) retransmission_window: int = field(metadata=metadata(1))
rx_packet_length: int = field(metadata=metadata(2)) rx_packet_length: int = field(metadata=metadata(2))
tx_packet_length: int = field(metadata=metadata(2)) tx_packet_length: int = field(metadata=metadata(2))
air_mode: int = field(metadata=AirMode.type_metadata(1)) air_mode: int = field(metadata=CodecID.type_metadata(1))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -7976,7 +8061,9 @@ class HCI_AclDataPacket(HCI_Packet):
bc_flag = (h >> 14) & 3 bc_flag = (h >> 14) & 3
data = packet[5:] data = packet[5:]
if len(data) != data_total_length: if len(data) != data_total_length:
raise InvalidPacketError('invalid packet length') raise InvalidPacketError(
f'invalid packet length {len(data)} != {data_total_length}'
)
return cls( return cls(
connection_handle=connection_handle, connection_handle=connection_handle,
pb_flag=pb_flag, pb_flag=pb_flag,
@@ -8009,10 +8096,16 @@ class HCI_SynchronousDataPacket(HCI_Packet):
See Bluetooth spec @ 5.4.3 HCI SCO Data Packets See Bluetooth spec @ 5.4.3 HCI SCO Data Packets
''' '''
class Status(enum.IntEnum):
CORRECTLY_RECEIVED_DATA = 0b00
POSSIBLY_INVALID_DATA = 0b01
NO_DATA = 0b10
DATA_PARTIALLY_LOST = 0b11
hci_packet_type = HCI_SYNCHRONOUS_DATA_PACKET hci_packet_type = HCI_SYNCHRONOUS_DATA_PACKET
connection_handle: int connection_handle: int
packet_status: int packet_status: Status
data_total_length: int data_total_length: int
data: bytes data: bytes
@@ -8021,7 +8114,7 @@ class HCI_SynchronousDataPacket(HCI_Packet):
# Read the header # Read the header
h, data_total_length = struct.unpack_from('<HB', packet, 1) h, data_total_length = struct.unpack_from('<HB', packet, 1)
connection_handle = h & 0xFFF connection_handle = h & 0xFFF
packet_status = (h >> 12) & 0b11 packet_status = cls.Status((h >> 12) & 0b11)
data = packet[4:] data = packet[4:]
if len(data) != data_total_length: if len(data) != data_total_length:
raise InvalidPacketError( raise InvalidPacketError(
@@ -8045,7 +8138,7 @@ class HCI_SynchronousDataPacket(HCI_Packet):
return ( return (
f'{color("SCO", "blue")}: ' f'{color("SCO", "blue")}: '
f'handle=0x{self.connection_handle:04x}, ' f'handle=0x{self.connection_handle:04x}, '
f'ps={self.packet_status}, ' f'ps={self.packet_status.name}, '
f'data_total_length={self.data_total_length}, ' f'data_total_length={self.data_total_length}, '
f'data={self.data.hex()}' f'data={self.data.hex()}'
) )
@@ -8073,8 +8166,8 @@ class HCI_IsoDataPacket(HCI_Packet):
def __post_init__(self) -> None: def __post_init__(self) -> None:
self.ts_flag = self.time_stamp is not None self.ts_flag = self.time_stamp is not None
@staticmethod @classmethod
def from_bytes(packet: bytes) -> HCI_IsoDataPacket: def from_bytes(cls, packet: bytes) -> HCI_IsoDataPacket:
time_stamp: int | None = None time_stamp: int | None = None
packet_sequence_number: int | None = None packet_sequence_number: int | None = None
iso_sdu_length: int | None = None iso_sdu_length: int | None = None
@@ -8103,7 +8196,7 @@ class HCI_IsoDataPacket(HCI_Packet):
pos += 4 pos += 4
iso_sdu_fragment = packet[pos:] iso_sdu_fragment = packet[pos:]
return HCI_IsoDataPacket( return cls(
connection_handle=connection_handle, connection_handle=connection_handle,
pb_flag=pb_flag, pb_flag=pb_flag,
ts_flag=ts_flag, ts_flag=ts_flag,
+16 -19
View File
@@ -44,6 +44,7 @@ from bumble.hci import (
CodecID, CodecID,
CodingFormat, CodingFormat,
HCI_Enhanced_Setup_Synchronous_Connection_Command, HCI_Enhanced_Setup_Synchronous_Connection_Command,
PcmDataFormat,
) )
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -177,7 +178,7 @@ class AgFeature(enum.IntFlag):
VOICE_RECOGNITION_TEXT = 0x2000 VOICE_RECOGNITION_TEXT = 0x2000
class AudioCodec(enum.IntEnum): class AudioCodec(utils.OpenIntEnum):
""" """
Audio Codec IDs (normative). Audio Codec IDs (normative).
@@ -189,7 +190,7 @@ class AudioCodec(enum.IntEnum):
LC3_SWB = 0x03 # Support for LC3-SWB audio codec LC3_SWB = 0x03 # Support for LC3-SWB audio codec
class HfIndicator(enum.IntEnum): class HfIndicator(utils.OpenIntEnum):
""" """
HF Indicators (normative). HF Indicators (normative).
@@ -218,7 +219,7 @@ class CallHoldOperation(enum.Enum):
) )
class ResponseHoldStatus(enum.IntEnum): class ResponseHoldStatus(utils.OpenIntEnum):
""" """
Response Hold status (normative). Response Hold status (normative).
@@ -246,7 +247,7 @@ class AgIndicator(enum.Enum):
BATTERY_CHARGE = 'battchg' BATTERY_CHARGE = 'battchg'
class CallSetupAgIndicator(enum.IntEnum): class CallSetupAgIndicator(utils.OpenIntEnum):
""" """
Values for the Call Setup AG indicator (normative). Values for the Call Setup AG indicator (normative).
@@ -259,7 +260,7 @@ class CallSetupAgIndicator(enum.IntEnum):
REMOTE_ALERTED = 3 # Remote party alerted in an outgoing call REMOTE_ALERTED = 3 # Remote party alerted in an outgoing call
class CallHeldAgIndicator(enum.IntEnum): class CallHeldAgIndicator(utils.OpenIntEnum):
""" """
Values for the Call Held AG indicator (normative). Values for the Call Held AG indicator (normative).
@@ -273,7 +274,7 @@ class CallHeldAgIndicator(enum.IntEnum):
CALL_ON_HOLD_NO_ACTIVE_CALL = 2 # Call on hold, no active call CALL_ON_HOLD_NO_ACTIVE_CALL = 2 # Call on hold, no active call
class CallInfoDirection(enum.IntEnum): class CallInfoDirection(utils.OpenIntEnum):
""" """
Call Info direction (normative). Call Info direction (normative).
@@ -284,7 +285,7 @@ class CallInfoDirection(enum.IntEnum):
MOBILE_TERMINATED_CALL = 1 MOBILE_TERMINATED_CALL = 1
class CallInfoStatus(enum.IntEnum): class CallInfoStatus(utils.OpenIntEnum):
""" """
Call Info status (normative). Call Info status (normative).
@@ -299,7 +300,7 @@ class CallInfoStatus(enum.IntEnum):
WAITING = 5 WAITING = 5
class CallInfoMode(enum.IntEnum): class CallInfoMode(utils.OpenIntEnum):
""" """
Call Info mode (normative). Call Info mode (normative).
@@ -312,7 +313,7 @@ class CallInfoMode(enum.IntEnum):
UNKNOWN = 9 UNKNOWN = 9
class CallInfoMultiParty(enum.IntEnum): class CallInfoMultiParty(utils.OpenIntEnum):
""" """
Call Info Multi-Party state (normative). Call Info Multi-Party state (normative).
@@ -399,7 +400,7 @@ class CallLineIdentification:
) )
class VoiceRecognitionState(enum.IntEnum): class VoiceRecognitionState(utils.OpenIntEnum):
""" """
vrec values provided in AT+BVRA command. vrec values provided in AT+BVRA command.
@@ -412,7 +413,7 @@ class VoiceRecognitionState(enum.IntEnum):
ENHANCED_READY = 2 ENHANCED_READY = 2
class CmeError(enum.IntEnum): class CmeError(utils.OpenIntEnum):
""" """
CME ERROR codes (partial listed). CME ERROR codes (partial listed).
@@ -1606,7 +1607,7 @@ class AgProtocol(utils.EventEmitter):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class ProfileVersion(enum.IntEnum): class ProfileVersion(utils.OpenIntEnum):
""" """
Profile version (normative). Profile version (normative).
@@ -1954,12 +1955,8 @@ class EscoParameters:
output_coding_format: CodingFormat = CodingFormat(CodecID.LINEAR_PCM) output_coding_format: CodingFormat = CodingFormat(CodecID.LINEAR_PCM)
input_coded_data_size: int = 16 input_coded_data_size: int = 16
output_coded_data_size: int = 16 output_coded_data_size: int = 16
input_pcm_data_format: ( input_pcm_data_format: PcmDataFormat = PcmDataFormat.TWOS_COMPLEMENT
HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat output_pcm_data_format: PcmDataFormat = PcmDataFormat.TWOS_COMPLEMENT
) = HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat.TWOS_COMPLEMENT
output_pcm_data_format: (
HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat
) = HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat.TWOS_COMPLEMENT
input_pcm_sample_payload_msb_position: int = 0 input_pcm_sample_payload_msb_position: int = 0
output_pcm_sample_payload_msb_position: int = 0 output_pcm_sample_payload_msb_position: int = 0
input_data_path: HCI_Enhanced_Setup_Synchronous_Connection_Command.DataPath = ( input_data_path: HCI_Enhanced_Setup_Synchronous_Connection_Command.DataPath = (
@@ -2058,6 +2055,7 @@ _ESCO_PARAMETERS_MSBC_T1 = EscoParameters(
max_latency=0x0008, max_latency=0x0008,
packet_type=( packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3 HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV3
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV3 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV3
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5
@@ -2073,7 +2071,6 @@ _ESCO_PARAMETERS_MSBC_T2 = EscoParameters(
max_latency=0x000D, max_latency=0x000D,
packet_type=( packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3 HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV3
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV3 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV3
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5
+118 -2
View File
@@ -247,6 +247,7 @@ class Host(utils.EventEmitter):
bis_links: dict[int, IsoLink] bis_links: dict[int, IsoLink]
sco_links: dict[int, ScoLink] sco_links: dict[int, ScoLink]
bigs: dict[int, set[int]] bigs: dict[int, set[int]]
link_ts_flags: dict[int, int]
acl_packet_queue: DataPacketQueue | None = None acl_packet_queue: DataPacketQueue | None = None
le_acl_packet_queue: DataPacketQueue | None = None le_acl_packet_queue: DataPacketQueue | None = None
iso_packet_queue: DataPacketQueue | None = None iso_packet_queue: DataPacketQueue | None = None
@@ -269,6 +270,7 @@ class Host(utils.EventEmitter):
self.bis_links = {} # BIS links, by connection handle self.bis_links = {} # BIS links, by connection handle
self.sco_links = {} # SCO links, by connection handle self.sco_links = {} # SCO links, by connection handle
self.bigs = {} # BIG Handle to BIS Handles self.bigs = {} # BIG Handle to BIS Handles
self.link_ts_flags = {} # TS_Flag for ISO links, by handle
self.pending_command: hci.HCI_SyncCommand | hci.HCI_AsyncCommand | None = None self.pending_command: hci.HCI_SyncCommand | hci.HCI_AsyncCommand | None = None
self.pending_response: ( self.pending_response: (
asyncio.Future[ asyncio.Future[
@@ -486,6 +488,7 @@ class Host(utils.EventEmitter):
hci.HCI_LE_PHY_UPDATE_COMPLETE_EVENT, hci.HCI_LE_PHY_UPDATE_COMPLETE_EVENT,
hci.HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT, hci.HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT, hci.HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT,
hci.HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_V2_EVENT,
hci.HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT, hci.HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT, hci.HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT,
hci.HCI_LE_SCAN_TIMEOUT_EVENT, hci.HCI_LE_SCAN_TIMEOUT_EVENT,
@@ -686,6 +689,8 @@ class Host(utils.EventEmitter):
self.pending_response, timeout=response_timeout self.pending_response, timeout=response_timeout
) )
return response return response
except asyncio.TimeoutError:
raise
except Exception: except Exception:
logger.exception(color("!!! Exception while sending command:", "red")) logger.exception(color("!!! Exception while sending command:", "red"))
raise raise
@@ -864,7 +869,7 @@ class Host(utils.EventEmitter):
self.send_hci_packet( self.send_hci_packet(
hci.HCI_SynchronousDataPacket( hci.HCI_SynchronousDataPacket(
connection_handle=connection_handle, connection_handle=connection_handle,
packet_status=0, packet_status=hci.HCI_SynchronousDataPacket.Status.CORRECTLY_RECEIVED_DATA,
data_total_length=len(sdu), data_total_length=len(sdu),
data=sdu, data=sdu,
) )
@@ -1026,6 +1031,82 @@ class Host(utils.EventEmitter):
# Look for the connection to which this data belongs # Look for the connection to which this data belongs
if connection := self.connections.get(packet.connection_handle): if connection := self.connections.get(packet.connection_handle):
connection.on_hci_acl_data_packet(packet) connection.on_hci_acl_data_packet(packet)
return
# WORKAROUND: Some controllers (e.g. Intel BE200) send ISO data wrapped in ACL packets
# using the CIS handle.
is_cis = packet.connection_handle in self.cis_links
is_bis = packet.connection_handle in self.bis_links
if is_cis or is_bis:
logger.debug(
f"Received ISO data wrapped in ACL packet for handle 0x{packet.connection_handle:04X}"
)
payload = packet.data
ts_flag = self.link_ts_flags.get(packet.connection_handle)
if ts_flag is None:
# Learn TS flag from the first packet on this link
if is_bis:
# BIS packets always have Timestamp according to spec
ts_flag = 1
elif len(payload) < 8:
# Too short to have 8-byte header (TS), must be No TS
ts_flag = 0
else:
psn_no_ts = int.from_bytes(payload[0:2], 'little')
psn_has_ts = int.from_bytes(payload[4:6], 'little')
if psn_has_ts == 0:
ts_flag = 1
elif psn_no_ts == 0:
ts_flag = 0
else:
# Fallback heuristic
ts_flag = 1 if psn_has_ts < psn_no_ts else 0
self.link_ts_flags[packet.connection_handle] = ts_flag
logger.info(
f"Learned TS_Flag = {ts_flag} for handle 0x{packet.connection_handle:04X}"
)
if ts_flag:
header_size = 8
sdu_length_offset = 6
else:
header_size = 4
sdu_length_offset = 2
pb_flag = 0b10
if len(payload) >= header_size:
sdu_length = int.from_bytes(
payload[sdu_length_offset : sdu_length_offset + 2], 'little'
)
if sdu_length == len(payload) - header_size:
pb_flag = 0b10 # Complete SDU
else:
pb_flag = 0b00 # First fragment
else:
pb_flag = 0b01 # Continuation
ts_flag = 0
# Reconstruct the raw ISO packet (excluding packet indicator 0x05)
pdu_info = packet.connection_handle | (pb_flag << 12) | (ts_flag << 14)
header = bytes(
[
pdu_info & 0xFF,
(pdu_info >> 8) & 0xFF,
len(payload) & 0xFF,
(len(payload) >> 8) & 0xFF,
]
)
raw_iso_packet = header + payload
try:
iso_packet = hci.HCI_IsoDataPacket.from_bytes(
bytes([hci.HCI_ISO_DATA_PACKET]) + raw_iso_packet
)
self.on_hci_iso_data_packet(iso_packet)
except Exception as e:
logger.warning(f"Failed to reconstruct ISO packet from ACL: {e}")
def on_hci_sco_data_packet(self, packet: hci.HCI_SynchronousDataPacket) -> None: def on_hci_sco_data_packet(self, packet: hci.HCI_SynchronousDataPacket) -> None:
# Experimental # Experimental
@@ -1176,11 +1257,28 @@ class Host(utils.EventEmitter):
def on_hci_connection_complete_event( def on_hci_connection_complete_event(
self, event: hci.HCI_Connection_Complete_Event self, event: hci.HCI_Connection_Complete_Event
): ):
if event.link_type == hci.HCI_Connection_Complete_Event.LinkType.SCO:
# Pass this on to the synchronous connection handler
forwarded_event = hci.HCI_Synchronous_Connection_Complete_Event(
status=event.status,
connection_handle=event.connection_handle,
bd_addr=event.bd_addr,
link_type=event.link_type,
transmission_interval=0,
retransmission_window=0,
rx_packet_length=0,
tx_packet_length=0,
air_mode=0,
)
self.on_hci_synchronous_connection_complete_event(forwarded_event)
return
if event.status == hci.HCI_SUCCESS: if event.status == hci.HCI_SUCCESS:
# Create/update the connection # Create/update the connection
logger.debug( logger.debug(
f'### BR/EDR CONNECTION: [0x{event.connection_handle:04X}] ' f'### BR/EDR ACL CONNECTION: [0x{event.connection_handle:04X}] '
f'{event.bd_addr} ' f'{event.bd_addr} '
f'{event.link_type.name}'
) )
connection = self.connections.get(event.connection_handle) connection = self.connections.get(event.connection_handle)
@@ -1232,6 +1330,7 @@ class Host(utils.EventEmitter):
self.emit('disconnection', handle, event.reason) self.emit('disconnection', handle, event.reason)
# Remove the handle reference # Remove the handle reference
self.link_ts_flags.pop(handle, None)
_ = ( _ = (
self.connections.pop(handle, 0) self.connections.pop(handle, 0)
or self.cis_links.pop(handle, 0) or self.cis_links.pop(handle, 0)
@@ -1352,6 +1451,20 @@ class Host(utils.EventEmitter):
event.advertiser_clock_accuracy, event.advertiser_clock_accuracy,
) )
def on_hci_le_periodic_advertising_sync_established_v2_event(
self, event: hci.HCI_LE_Periodic_Advertising_Sync_Established_V2_Event
):
self.emit(
'periodic_advertising_sync_establishment',
event.status,
event.sync_handle,
event.advertising_sid,
event.advertiser_address,
event.advertiser_phy,
event.periodic_advertising_interval,
event.advertiser_clock_accuracy,
)
def on_hci_le_periodic_advertising_sync_lost_event( def on_hci_le_periodic_advertising_sync_lost_event(
self, event: hci.HCI_LE_Periodic_Advertising_Sync_Lost_Event self, event: hci.HCI_LE_Periodic_Advertising_Sync_Lost_Event
): ):
@@ -1580,6 +1693,9 @@ class Host(utils.EventEmitter):
event.bd_addr, event.bd_addr,
event.connection_handle, event.connection_handle,
event.link_type, event.link_type,
event.rx_packet_length,
event.tx_packet_length,
event.air_mode,
) )
else: else:
logger.debug(f'### SCO CONNECTION FAILED: {event.status}') logger.debug(f'### SCO CONNECTION FAILED: {event.status}')
+1 -1
View File
@@ -110,7 +110,7 @@ RFCOMM_DEFAULT_L2CAP_MTU = 2048
RFCOMM_DEFAULT_INITIAL_CREDITS = 7 RFCOMM_DEFAULT_INITIAL_CREDITS = 7
RFCOMM_DEFAULT_MAX_CREDITS = 32 RFCOMM_DEFAULT_MAX_CREDITS = 32
RFCOMM_DEFAULT_CREDIT_THRESHOLD = RFCOMM_DEFAULT_MAX_CREDITS // 2 RFCOMM_DEFAULT_CREDIT_THRESHOLD = RFCOMM_DEFAULT_MAX_CREDITS // 2
RFCOMM_DEFAULT_MAX_FRAME_SIZE = 2000 RFCOMM_DEFAULT_MAX_FRAME_SIZE = 1000
RFCOMM_DYNAMIC_CHANNEL_NUMBER_START = 1 RFCOMM_DYNAMIC_CHANNEL_NUMBER_START = 1
RFCOMM_DYNAMIC_CHANNEL_NUMBER_END = 30 RFCOMM_DYNAMIC_CHANNEL_NUMBER_END = 30
+3
View File
@@ -104,6 +104,9 @@ async def open_pyusb_transport(spec: str) -> Transport:
0, 0,
packet[1:], packet[1:],
) )
elif packet_type == hci.HCI_ISO_DATA_PACKET:
# Workaround: Send ISO packets over Bulk Out
self.device.write(USB_ENDPOINT_ACL_OUT, packet[1:])
else: else:
logger.warning( logger.warning(
color(f'unsupported packet type {packet_type}', 'red') color(f'unsupported packet type {packet_type}', 'red')
+557 -190
View File
@@ -22,6 +22,8 @@ import ctypes
import logging import logging
import platform import platform
import threading import threading
from collections.abc import Callable
from typing import Any
import usb1 import usb1
@@ -35,6 +37,32 @@ from bumble.transport.common import BaseSource, Transport, TransportInitError
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
# pylint: disable=invalid-name
USB_RECIPIENT_DEVICE = 0x00
USB_REQUEST_TYPE_CLASS = 0x01 << 5
USB_DEVICE_CLASS_DEVICE = 0x00
USB_DEVICE_CLASS_WIRELESS_CONTROLLER = 0xE0
USB_DEVICE_SUBCLASS_RF_CONTROLLER = 0x01
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER = 0x01
USB_ENDPOINT_TRANSFER_TYPE_ISOCHRONOUS = 0x01
USB_ENDPOINT_TRANSFER_TYPE_BULK = 0x02
USB_ENDPOINT_TRANSFER_TYPE_INTERRUPT = 0x03
USB_ENDPOINT_IN = 0x80
USB_BT_HCI_CLASS_TUPLE = (
USB_DEVICE_CLASS_WIRELESS_CONTROLLER,
USB_DEVICE_SUBCLASS_RF_CONTROLLER,
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER,
)
MAX_SCO_PACKET_SIZE = 1024
MAX_SCO_IN_PACKETS = 128
NUMBER_OF_SCO_IN_TRANSFERS = 2
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def load_libusb(): def load_libusb():
''' '''
@@ -60,65 +88,166 @@ def load_libusb():
usb1.loadLibrary(libusb_dll) usb1.loadLibrary(libusb_dll)
async def open_usb_transport(spec: str) -> Transport: def find_endpoints(device, forced_mode, sco_alternate=None):
''' '''Look for the interfaces with the right class and endpoints'''
Open a USB transport. # pylint: disable-next=too-many-nested-blocks
The moniker string has this syntax: for configuration_index, configuration in enumerate(device):
either <index> or # Select the interface and endpoints for ACL
<vendor>:<product> or acl_interface = None
<vendor>:<product>/<serial-number>] or bulk_in = None
<vendor>:<product>#<index> bulk_out = None
With <index> as the 0-based index to select amongst all the devices that appear interrupt_in = None
to be supporting Bluetooth HCI (0 being the first one), or for interface in configuration:
Where <vendor> and <product> are the vendor ID and product ID in hexadecimal. The for setting in interface:
/<serial-number> suffix or #<index> suffix max be specified when more than one if acl_interface is not None:
device with the same vendor and product identifiers are present. continue
In addition, if the moniker ends with the symbol "!", the device will be used in if (
"forced" mode: not forced_mode
the first USB interface of the device will be used, regardless of the interface and (
class/subclass. setting.getClass(),
This may be useful for some devices that use a custom class/subclass but may setting.getSubClass(),
nonetheless work as-is. setting.getProtocol(),
)
!= USB_BT_HCI_CLASS_TUPLE
):
continue
Examples: for endpoint in setting:
0 --> the first BT USB dongle attributes = endpoint.getAttributes()
04b4:f901 --> the BT USB dongle with vendor=04b4 and product=f901 address = endpoint.getAddress()
04b4:f901#2 --> the third USB device with vendor=04b4 and product=f901 if attributes & 0x03 == USB_ENDPOINT_TRANSFER_TYPE_BULK:
04b4:f901/00E04C239987 --> the BT USB dongle with vendor=04b4 and product=f901 and if address & USB_ENDPOINT_IN:
serial number 00E04C239987 if bulk_in is None:
usb:0B05:17CB! --> the BT USB dongle vendor=0B05 and product=17CB, in "forced" mode. bulk_in = endpoint
''' else:
if bulk_out is None:
bulk_out = endpoint
elif attributes & 0x03 == USB_ENDPOINT_TRANSFER_TYPE_INTERRUPT:
if address & USB_ENDPOINT_IN and interrupt_in is None:
interrupt_in = endpoint
# pylint: disable=invalid-name # Only keep complete sets (endpoints that should be under the
USB_RECIPIENT_DEVICE = 0x00 # same interface)
USB_REQUEST_TYPE_CLASS = 0x01 << 5 if (
USB_DEVICE_CLASS_DEVICE = 0x00 bulk_in is not None
USB_DEVICE_CLASS_WIRELESS_CONTROLLER = 0xE0 and bulk_out is not None
USB_DEVICE_SUBCLASS_RF_CONTROLLER = 0x01 and interrupt_in is not None
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER = 0x01 ):
USB_ENDPOINT_TRANSFER_TYPE_BULK = 0x02 acl_interface = setting
USB_ENDPOINT_TRANSFER_TYPE_INTERRUPT = 0x03
USB_ENDPOINT_IN = 0x80
USB_BT_HCI_CLASS_TUPLE = ( # Select the interface and endpoints for SCO
USB_DEVICE_CLASS_WIRELESS_CONTROLLER, sco_interface = None
USB_DEVICE_SUBCLASS_RF_CONTROLLER, max_packet_size = (0, 0)
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER, isochronous_in = None
isochronous_out = None
for interface in configuration:
if sco_interface is not None:
continue
if sco_alternate is None:
continue
for setting in interface:
if (
not forced_mode
and (
setting.getClass(),
setting.getSubClass(),
setting.getProtocol(),
)
!= USB_BT_HCI_CLASS_TUPLE
):
continue
if (
sco_alternate != 0
and setting.getAlternateSetting() != sco_alternate
):
continue
isochronous_in = None
isochronous_out = None
for endpoint in setting:
if (
endpoint.getAttributes() & 0x03
== USB_ENDPOINT_TRANSFER_TYPE_ISOCHRONOUS
):
if endpoint.getMaxPacketSize() > 0:
if endpoint.getAddress() & USB_ENDPOINT_IN:
if (
isochronous_in is None
or endpoint.getMaxPacketSize()
> (isochronous_in.getMaxPacketSize())
):
isochronous_in = endpoint
else:
if (
isochronous_out is None
or endpoint.getMaxPacketSize()
> (isochronous_out.getMaxPacketSize())
):
isochronous_out = endpoint
if isochronous_in is not None and isochronous_out is not None:
if (
sco_interface is None
or sco_alternate == 0
and (
isochronous_in.getMaxPacketSize(),
isochronous_out.getMaxPacketSize(),
)
> max_packet_size
):
sco_interface = setting
max_packet_size = (
isochronous_in.getMaxPacketSize(),
isochronous_out.getMaxPacketSize(),
) )
READ_SIZE = 4096 # Return if we found at least a compatible ACL interface
if acl_interface is not None:
return (
configuration_index + 1,
acl_interface,
sco_interface,
interrupt_in,
bulk_in,
isochronous_in,
bulk_out,
isochronous_out,
)
logger.debug(f'skipping configuration {configuration_index + 1}')
return None
class UsbPacketSink: class UsbPacketSink:
def __init__(self, device, acl_out): def __init__(self, device, bulk_out, isochronous_out) -> None:
self.device = device self.device = device
self.acl_out = acl_out self.bulk_out = bulk_out
self.acl_out_transfer = device.getTransfer() self.isochronous_out = isochronous_out
self.acl_out_transfer_ready = asyncio.Semaphore(1) self.bulk_or_control_out_transfer = device.getTransfer()
self.packets = asyncio.Queue[bytes]() # Queue of packets waiting to be sent self.isochronous_out_transfer = (
device.getTransfer(
iso_packets=(
MAX_SCO_PACKET_SIZE // isochronous_out.getMaxPacketSize()
if isochronous_out.getMaxPacketSize()
else 1
)
)
if isochronous_out is not None
else None
)
self.out_transfer_ready = asyncio.Semaphore(1)
self.packets: asyncio.Queue[bytes] = (
asyncio.Queue()
) # Queue of packets waiting to be sent
self.loop = asyncio.get_running_loop() self.loop = asyncio.get_running_loop()
self.queue_task = None self.queue_task = None
self.cancel_done = self.loop.create_future()
self.closed = False self.closed = False
def start(self): def start(self):
@@ -137,13 +266,10 @@ async def open_usb_transport(spec: str) -> Transport:
self.packets.put_nowait(packet) self.packets.put_nowait(packet)
def transfer_callback(self, transfer): def transfer_callback(self, transfer):
self.loop.call_soon_threadsafe(self.acl_out_transfer_ready.release) self.loop.call_soon_threadsafe(self.out_transfer_ready.release)
status = transfer.getStatus() status = transfer.getStatus()
# pylint: disable=no-member logger.debug(f"OUT CALLBACK: {status}")
if status == usb1.TRANSFER_CANCELLED:
self.loop.call_soon_threadsafe(self.cancel_done.set_result, None)
return
if status != usb1.TRANSFER_COMPLETED: if status != usb1.TRANSFER_COMPLETED:
logger.warning( logger.warning(
@@ -154,106 +280,263 @@ async def open_usb_transport(spec: str) -> Transport:
) )
async def process_queue(self): async def process_queue(self):
while True: while not self.closed:
# Wait for a packet to transfer. # Wait for a packet to transfer.
packet = await self.packets.get() packet = await self.packets.get()
# Wait until we can start a transfer. # Wait until we can start a transfer.
await self.acl_out_transfer_ready.acquire() await self.out_transfer_ready.acquire()
# Transfer the packet. # Transfer the packet.
packet_type = packet[0] packet_type = packet[0]
packet_payload = packet[1:]
submitted = False
try:
if packet_type == hci.HCI_ACL_DATA_PACKET: if packet_type == hci.HCI_ACL_DATA_PACKET:
self.acl_out_transfer.setBulk( self.bulk_or_control_out_transfer.setBulk(
self.acl_out, packet[1:], callback=self.transfer_callback self.bulk_out.getAddress(),
packet_payload,
callback=self.transfer_callback,
) )
self.acl_out_transfer.submit() self.bulk_or_control_out_transfer.submit()
submitted = True
elif packet_type == hci.HCI_COMMAND_PACKET: elif packet_type == hci.HCI_COMMAND_PACKET:
self.acl_out_transfer.setControl( self.bulk_or_control_out_transfer.setControl(
USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS, USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS,
0, 0,
0, 0,
0, 0,
packet[1:], packet_payload,
callback=self.transfer_callback, callback=self.transfer_callback,
) )
self.acl_out_transfer.submit() self.bulk_or_control_out_transfer.submit()
submitted = True
elif packet_type == hci.HCI_SYNCHRONOUS_DATA_PACKET:
if self.isochronous_out_transfer is None:
logger.warning(
color('isochronous packets not supported', 'red')
)
self.out_transfer_ready.release()
continue
# Setup a list of packet lengths, each up to the max packet size
iso_max_packet_size = self.isochronous_out.getMaxPacketSize()
iso_packet_count = (
len(packet_payload) + iso_max_packet_size - 1
) // iso_max_packet_size
iso_packet_lengths = [iso_max_packet_size] * (iso_packet_count - 1)
iso_packet_lengths.append(
len(packet_payload) - sum(iso_packet_lengths)
)
# Set up and submit the isochronous transfer
self.isochronous_out_transfer.setIsochronous(
self.isochronous_out.getAddress(),
packet_payload,
callback=self.transfer_callback,
iso_transfer_length_list=iso_packet_lengths,
)
self.isochronous_out_transfer.submit()
submitted = True
elif packet_type == hci.HCI_ISO_DATA_PACKET:
if self.isochronous_out_transfer is None:
# Workaround: Send ISO packets over Bulk Out when Isochronous endpoints are not enabled
self.bulk_or_control_out_transfer.setBulk(
self.bulk_out.getAddress(),
packet_payload,
callback=self.transfer_callback,
)
self.bulk_or_control_out_transfer.submit()
submitted = True
else:
logger.warning(
color(
'ISO packets over Isochronous endpoints not supported yet',
'red',
)
)
self.out_transfer_ready.release()
continue
else: else:
logger.warning( logger.warning(
color(f'unsupported packet type {packet_type}', 'red') color(f'unsupported packet type {packet_type}', 'red')
) )
except Exception as error:
logger.warning(f'!!! exception while submitting transfer: {error}')
if not submitted:
self.out_transfer_ready.release()
def close(self): def close(self):
self.closed = True self.closed = True
if self.queue_task:
self.queue_task.cancel()
async def terminate(self): async def terminate(self):
if not self.closed:
self.close() self.close()
if self.queue_task:
self.queue_task.cancel()
# Empty the packet queue so that we don't send any more data # Empty the packet queue so that we don't send any more data
while not self.packets.empty(): while not self.packets.empty():
self.packets.get_nowait() self.packets.get_nowait()
# If we have a transfer in flight, cancel it # If we have transfers in flight, cancel them
if self.acl_out_transfer.isSubmitted(): for transfer in (
self.bulk_or_control_out_transfer,
self.isochronous_out_transfer,
):
if transfer is None:
continue
if transfer.isSubmitted():
# Try to cancel the transfer, but that may fail because it may have # Try to cancel the transfer, but that may fail because it may have
# already completed # already completed
try: try:
self.acl_out_transfer.cancel() transfer.cancel()
logger.debug('waiting for OUT transfer cancellation to be done...') logger.debug('waiting for OUT transfer cancellation to be done...')
await self.cancel_done await self.out_transfer_ready.acquire()
logger.debug('OUT transfer cancellation done') logger.debug('OUT transfer cancellation done')
except usb1.USBError: except usb1.USBError as error:
logger.debug('OUT transfer likely already completed') logger.debug(f'OUT transfer likely already completed ({error})')
try:
transfer.close()
except usb1.USBError as error:
logger.warning(f'failed to close transfer ({error})')
READ_SIZE = 4096
class PacketSplitter:
"""Splitter than can parse a byte stream and extract packets that consist of a
header and a body, where the header includes an n-byte 'length' field at a
certain offset.
Extracted packets are emitted by calling a function passed to the constructor,
with the full packet (header + body) as argument.
"""
def __init__(
self, length_offset: int, length_size: int, emit: Callable[[bytes], Any]
) -> None:
self.emit = emit
self.packet = b''
self.length_offset = length_offset
self.length_size = length_size
self.header_size = length_offset + length_size
def feed(self, data: bytes) -> None:
while data:
# Accumulate until we have a complete header
if (bytes_needed := self.header_size - len(self.packet)) > 0:
self.packet += data[:bytes_needed]
data = data[bytes_needed:]
if len(self.packet) < self.header_size:
continue
packet_length = self.header_size + int.from_bytes(
self.packet[self.length_offset : self.length_offset + self.length_size],
'little',
)
bytes_needed = packet_length - len(self.packet)
self.packet += data[:bytes_needed]
data = data[bytes_needed:]
if len(self.packet) == packet_length:
# Packet complete
self.emit(self.packet)
self.packet = b''
class ScoPacketSplitter(PacketSplitter):
def __init__(self, emit: Callable[[bytes], Any]) -> None:
# The length field is 1 byte at offset 2 in the HCI SCO packet header
super().__init__(length_offset=2, length_size=1, emit=emit)
class EventPacketSplitter(PacketSplitter):
def __init__(self, emit: Callable[[bytes], Any]) -> None:
# The length field is 1 byte at offset 1 in the HCI Event packet header
super().__init__(length_offset=1, length_size=1, emit=emit)
class AclPacketSplitter(PacketSplitter):
def __init__(self, emit: Callable[[bytes], Any]) -> None:
# The length field is 2 bytes at offset 2 in the HCI ACL packet header
super().__init__(length_offset=2, length_size=2, emit=emit)
class UsbPacketSource(asyncio.Protocol, BaseSource): class UsbPacketSource(asyncio.Protocol, BaseSource):
def __init__(self, device, metadata, acl_in, events_in): def __init__(self, device, metadata, interrupt_in, bulk_in, isochronous_in):
super().__init__() super().__init__()
self.device = device self.device = device
self.metadata = metadata self.metadata = metadata
self.acl_in = acl_in self.interrupt_in = interrupt_in
self.acl_in_transfer = None self.interrupt_in_transfer = None
self.events_in = events_in self.bulk_in = bulk_in
self.events_in_transfer = None self.bulk_in_transfer = None
self.isochronous_in = isochronous_in
self.isochronous_in_transfers = []
self.loop = asyncio.get_running_loop() self.loop = asyncio.get_running_loop()
self.queue = asyncio.Queue() self.queue = asyncio.Queue()
self.dequeue_task = None self.dequeue_task = None
self.cancel_done = { self.done = {}
hci.HCI_EVENT_PACKET: self.loop.create_future(), self.splitters = {
hci.HCI_ACL_DATA_PACKET: self.loop.create_future(), hci.HCI_EVENT_PACKET: EventPacketSplitter(
lambda packet: self.queue_packet(hci.HCI_EVENT_PACKET, packet)
),
hci.HCI_ACL_DATA_PACKET: AclPacketSplitter(
lambda packet: self.queue_packet(hci.HCI_ACL_DATA_PACKET, packet)
),
hci.HCI_SYNCHRONOUS_DATA_PACKET: ScoPacketSplitter(
lambda packet: self.queue_packet(
hci.HCI_SYNCHRONOUS_DATA_PACKET, packet
)
),
} }
self.closed = False self.closed = False
self.lock = threading.Lock()
def start(self): def start(self):
# Set up transfer objects for input # Set up transfer objects for input
self.events_in_transfer = device.getTransfer() self.interrupt_in_transfer = self.device.getTransfer()
self.events_in_transfer.setInterrupt( self.interrupt_in_transfer.setInterrupt(
self.events_in, self.interrupt_in.getAddress(),
READ_SIZE, READ_SIZE,
callback=self.transfer_callback, callback=self.transfer_callback,
user_data=hci.HCI_EVENT_PACKET, user_data=hci.HCI_EVENT_PACKET,
) )
self.events_in_transfer.submit() self.done[self.interrupt_in_transfer] = asyncio.Event()
self.interrupt_in_transfer.submit()
self.acl_in_transfer = device.getTransfer() self.bulk_in_transfer = self.device.getTransfer()
self.acl_in_transfer.setBulk( self.bulk_in_transfer.setBulk(
self.acl_in, self.bulk_in.getAddress(),
READ_SIZE, READ_SIZE,
callback=self.transfer_callback, callback=self.transfer_callback,
user_data=hci.HCI_ACL_DATA_PACKET, user_data=hci.HCI_ACL_DATA_PACKET,
) )
self.acl_in_transfer.submit() self.done[self.bulk_in_transfer] = asyncio.Event()
self.bulk_in_transfer.submit()
if self.isochronous_in is not None:
for _ in range(NUMBER_OF_SCO_IN_TRANSFERS):
transfer = self.device.getTransfer(iso_packets=MAX_SCO_IN_PACKETS)
transfer.setIsochronous(
self.isochronous_in.getAddress(),
MAX_SCO_IN_PACKETS * self.isochronous_in.getMaxPacketSize(),
callback=self.transfer_callback,
user_data=hci.HCI_SYNCHRONOUS_DATA_PACKET,
)
self.isochronous_in_transfers.append(transfer)
self.done[transfer] = asyncio.Event()
transfer.submit()
self.dequeue_task = self.loop.create_task(self.dequeue()) self.dequeue_task = self.loop.create_task(self.dequeue())
@property def queue_packet(self, packet_type: int, packet_data: bytes) -> None:
def usb_transfer_submitted(self): self.loop.call_soon_threadsafe(
return ( self.queue.put_nowait, bytes([packet_type]) + packet_data
self.events_in_transfer.isSubmitted()
or self.acl_in_transfer.isSubmitted()
) )
def transfer_callback(self, transfer): def transfer_callback(self, transfer):
@@ -261,26 +544,53 @@ async def open_usb_transport(spec: str) -> Transport:
status = transfer.getStatus() status = transfer.getStatus()
# pylint: disable=no-member # pylint: disable=no-member
if status == usb1.TRANSFER_COMPLETED: if (
packet = ( packet_type != hci.HCI_SYNCHRONOUS_DATA_PACKET
bytes([packet_type]) or transfer.getActualLength()
+ transfer.getBuffer()[: transfer.getActualLength()] or status != usb1.TRANSFER_COMPLETED
):
logger.debug(
f"IN[{packet_type}] CALLBACK: status={status}, length={transfer.getActualLength()}"
)
if status == usb1.TRANSFER_COMPLETED:
with self.lock:
if self.closed:
logger.debug("packet source closed, discarding transfer")
elif (splitter := self.splitters.get(packet_type)) is None:
logger.warning(f'no splitter for packet type {packet_type}')
else:
if packet_type == hci.HCI_SYNCHRONOUS_DATA_PACKET:
for iso_status, iso_buffer in transfer.iterISO():
if not iso_buffer:
continue
if iso_status:
logger.warning(f"ISO packet status error: {iso_status}")
continue
logger.debug(
"### SCO packet: %d %s",
len(iso_buffer),
iso_buffer.hex(),
)
splitter.feed(iso_buffer)
else:
splitter.feed(
transfer.getBuffer()[: transfer.getActualLength()]
) )
self.loop.call_soon_threadsafe(self.queue.put_nowait, packet)
# Re-submit the transfer so we can receive more data # Re-submit the transfer so we can receive more data
try:
transfer.submit() transfer.submit()
except usb1.USBError as error:
logger.warning(f"Failed to re-submit transfer: {error}")
self.loop.call_soon_threadsafe(self.on_transport_lost)
elif status == usb1.TRANSFER_CANCELLED: elif status == usb1.TRANSFER_CANCELLED:
self.loop.call_soon_threadsafe( logger.debug(f"IN[{packet_type}] transfer canceled")
self.cancel_done[packet_type].set_result, None self.loop.call_soon_threadsafe(self.done[transfer].set)
)
else: else:
logger.warning( logger.warning(
color( color(f'!!! IN[{packet_type}] transfer not completed', 'red')
f'!!! IN[{packet_type}] transfer not completed: status={status}',
'red',
)
) )
self.loop.call_soon_threadsafe(self.done[transfer].set)
self.loop.call_soon_threadsafe(self.on_transport_lost) self.loop.call_soon_threadsafe(self.on_transport_lost)
async def dequeue(self): async def dequeue(self):
@@ -293,72 +603,107 @@ async def open_usb_transport(spec: str) -> Transport:
try: try:
self.sink.on_packet(packet) self.sink.on_packet(packet)
except Exception: except Exception:
logger.exception( logger.exception(color('!!! Exception in sink.on_packet', 'red'))
color('!!! Exception in sink.on_packet', 'red')
)
def close(self): def close(self):
with self.lock:
self.closed = True self.closed = True
async def terminate(self): async def terminate(self):
if not self.closed:
self.close() self.close()
if self.dequeue_task:
self.dequeue_task.cancel() self.dequeue_task.cancel()
# Cancel the transfers # Cancel the transfers
for transfer in (self.events_in_transfer, self.acl_in_transfer): for transfer in (
self.interrupt_in_transfer,
self.bulk_in_transfer,
*self.isochronous_in_transfers,
):
if transfer is None:
continue
if transfer.isSubmitted(): if transfer.isSubmitted():
# Try to cancel the transfer, but that may fail because it may have # Try to cancel the transfer, but that may fail because it may
# already completed # have already completed
packet_type = transfer.getUserData() packet_type = transfer.getUserData()
assert isinstance(packet_type, int)
try: try:
transfer.cancel() transfer.cancel()
logger.debug( logger.debug(
f'waiting for IN[{packet_type}] transfer cancellation ' f'waiting for IN[{packet_type}] transfer cancellation '
'to be done...' 'to be done...'
) )
await self.cancel_done[packet_type] await self.done[transfer].wait()
logger.debug(f'IN[{packet_type}] transfer cancellation done') logger.debug(f'IN[{packet_type}] transfer cancellation done')
except usb1.USBError: except usb1.USBError as error:
logger.debug( logger.debug(
f'IN[{packet_type}] transfer likely already completed ' f'IN[{packet_type}] transfer likely already completed '
f'({error})'
) )
class UsbTransport(Transport): class UsbTransport(Transport):
def __init__(self, context, device, interface, setting, source, sink): def __init__(self, context, device, acl_interface, sco_interface, source, sink):
super().__init__(source, sink) super().__init__(source, sink)
self.context = context self.context = context
self.device = device self.device = device
self.interface = interface self.acl_interface = acl_interface
self.sco_interface = sco_interface
self.loop = asyncio.get_running_loop() self.loop = asyncio.get_running_loop()
self.event_loop_done = self.loop.create_future() self.event_loop_done = self.loop.create_future()
self.event_loop_should_exit = False
self.lock = threading.Lock()
# Get exclusive access # Get exclusive access
device.claimInterface(interface) device.claimInterface(acl_interface.getNumber())
if sco_interface is not None:
device.claimInterface(sco_interface.getNumber())
# Set the alternate setting if not the default # Set the alternate setting if not the default
if setting != 0: if acl_interface.getAlternateSetting() != 0:
device.setInterfaceAltSetting(interface, setting) logger.debug(
f'setting ACL interface {acl_interface.getNumber()} '
f'altsetting {acl_interface.getAlternateSetting()}'
)
device.setInterfaceAltSetting(
acl_interface.getNumber(), acl_interface.getAlternateSetting()
)
if sco_interface is not None and sco_interface.getAlternateSetting() != 0:
logger.debug(
f'setting SCO interface {sco_interface.getNumber()} '
f'altsetting {sco_interface.getAlternateSetting()}'
)
device.setInterfaceAltSetting(
sco_interface.getNumber(), sco_interface.getAlternateSetting()
)
# The source and sink can now start # The source and sink can now start
source.start() source.start()
sink.start() sink.start()
# Create a thread to process events # Create a thread to process events
self.event_thread = threading.Thread(target=self.run) self.event_thread = threading.Thread(target=self.run, daemon=True)
self.event_thread.start() self.event_thread.start()
def run(self): def run(self):
logger.debug('starting USB event loop') logger.debug('starting USB event loop')
while self.source.usb_transfer_submitted: while True:
with self.lock:
if self.event_loop_should_exit:
logger.debug("USB event loop exit requested")
break
# pylint: disable=no-member # pylint: disable=no-member
try: try:
self.context.handleEvents() self.context.handleEvents()
except usb1.USBErrorInterrupted: except usb1.USBErrorInterrupted:
pass pass
except Exception as error:
logger.warning(f'!!! Exception while handling events: {error}')
logger.debug('USB event loop done') logger.debug('ending USB event loop')
self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None) self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None)
async def close(self): async def close(self):
@@ -366,12 +711,60 @@ async def open_usb_transport(spec: str) -> Transport:
self.sink.close() self.sink.close()
await self.source.terminate() await self.source.terminate()
await self.sink.terminate() await self.sink.terminate()
self.device.releaseInterface(self.interface)
# We no longer need the event loop to run
with self.lock:
self.event_loop_should_exit = True
self.context.interruptEventHandler()
self.device.releaseInterface(self.acl_interface.getNumber())
if self.sco_interface:
self.device.releaseInterface(self.sco_interface.getNumber())
self.device.close() self.device.close()
self.context.close() self.context.close()
# Wait for the thread to terminate # Wait for the thread to terminate
logger.debug("waiting for USB event loop to be done...")
await self.event_loop_done await self.event_loop_done
logger.debug("USB event loop done")
async def open_usb_transport(spec: str) -> Transport:
'''
Open a USB transport.
The moniker string has this syntax:
either <index> or
<vendor>:<product> or
<vendor>:<product>/<serial-number>] or
<vendor>:<product>#<index>
With <index> as the 0-based index to select amongst all the devices that appear
to be supporting Bluetooth HCI (0 being the first one), or
Where <vendor> and <product> are the vendor ID and product ID in hexadecimal. The
/<serial-number> suffix or #<index> suffix max be specified when more than one
device with the same vendor and product identifiers are present.
Opotionally, the moniker may include a +sco=<alternate> suffix to enable SCO/eSCO
and specify the alternate setting to use for SCO/eSCO transfers, with 0 meaning an
automatic selection.
In addition, if the moniker ends with the symbol "!", the device will be used in
"forced" mode:
the first USB interface of the device will be used, regardless of the interface
class/subclass.
This may be useful for some devices that use a custom class/subclass but may
nonetheless work as-is.
Examples:
0 --> the first BT USB dongle
04b4:f901 --> the BT USB dongle with vendor=04b4 and product=f901
04b4:f901#2 --> the third USB device with vendor=04b4 and product=f901
04b4:f901/00E04C239987 --> the BT USB dongle with vendor=04b4 and product=f901 and
serial number 00E04C239987
0B05:17CB! --> the BT USB dongle with vendor=0B05 and product=17CB, in "forced"
mode.
0+sco=0 --> the first BT USB dongle, with SCO enabled using auto-selection.
0+sco=5 --> the first BT USB dongle, with SCO enabled using alternate setting 5.
'''
# Find the device according to the spec moniker # Find the device according to the spec moniker
load_libusb() load_libusb()
@@ -379,6 +772,7 @@ async def open_usb_transport(spec: str) -> Transport:
context.open() context.open()
try: try:
found = None found = None
device = None
if spec.endswith('!'): if spec.endswith('!'):
spec = spec[:-1] spec = spec[:-1]
@@ -386,6 +780,12 @@ async def open_usb_transport(spec: str) -> Transport:
else: else:
forced_mode = False forced_mode = False
if '+sco=' in spec:
spec, sco_alternate_str = spec.split('+sco=')
sco_alternate = int(sco_alternate_str)
else:
sco_alternate = None
if ':' in spec: if ':' in spec:
vendor_id, product_id = spec.split(':') vendor_id, product_id = spec.split(':')
serial_number = None serial_number = None
@@ -461,76 +861,41 @@ async def open_usb_transport(spec: str) -> Transport:
logger.debug(f'USB Device: {found}') logger.debug(f'USB Device: {found}')
# Look for the first interface with the right class and endpoints assert device is not None
def find_endpoints(device): endpoints = find_endpoints(device, forced_mode, sco_alternate)
# pylint: disable-next=too-many-nested-blocks
for configuration_index, configuration in enumerate(device):
interface = None
for interface in configuration:
setting = None
for setting in interface:
if (
not forced_mode
and (
setting.getClass(),
setting.getSubClass(),
setting.getProtocol(),
)
!= USB_BT_HCI_CLASS_TUPLE
):
continue
events_in = None
acl_in = None
acl_out = None
for endpoint in setting:
attributes = endpoint.getAttributes()
address = endpoint.getAddress()
if attributes & 0x03 == USB_ENDPOINT_TRANSFER_TYPE_BULK:
if address & USB_ENDPOINT_IN and acl_in is None:
acl_in = address
elif acl_out is None:
acl_out = address
elif (
attributes & 0x03
== USB_ENDPOINT_TRANSFER_TYPE_INTERRUPT
):
if address & USB_ENDPOINT_IN and events_in is None:
events_in = address
# Return if we found all 3 endpoints
if (
acl_in is not None
and acl_out is not None
and events_in is not None
):
return (
configuration_index + 1,
setting.getNumber(),
setting.getAlternateSetting(),
acl_in,
acl_out,
events_in,
)
logger.debug(
f'skipping configuration {configuration_index + 1} / '
f'interface {setting.getNumber()}'
)
return None
endpoints = find_endpoints(found)
if endpoints is None: if endpoints is None:
raise TransportInitError('no compatible interface found for device') raise TransportInitError('no compatible interface found for device')
(configuration, interface, setting, acl_in, acl_out, events_in) = endpoints (
configuration,
acl_interface,
sco_interface,
interrupt_in,
bulk_in,
isochronous_in,
bulk_out,
isochronous_out,
) = endpoints
acl_interface_info = (
f'acl_interface={acl_interface.getNumber()}/'
f'{acl_interface.getAlternateSetting()}'
)
sco_interface_info = (
'<none>'
if sco_interface is None
else (
f'sco_interface={sco_interface.getNumber()}/'
f'{sco_interface.getAlternateSetting()}'
)
)
logger.debug( logger.debug(
f'selected endpoints: configuration={configuration}, ' f'selected endpoints: configuration={configuration}, '
f'interface={interface}, ' f'acl_interface={acl_interface_info}, '
f'setting={setting}, ' f'sco_interface={sco_interface_info}, '
f'acl_in=0x{acl_in:02X}, ' f'interrupt_in=0x{interrupt_in.getAddress():02X}, '
f'acl_out=0x{acl_out:02X}, ' f'bulk_in=0x{bulk_in.getAddress():02X}, '
f'events_in=0x{events_in:02X}, ' f'bulk_out=0x{bulk_out.getAddress():02X}, '
f'isochronous_in=0x{isochronous_in.getAddress() if isochronous_in else 0:02X}, '
f'isochronous_out=0x{isochronous_out.getAddress() if isochronous_out else 0:02X}'
) )
device_metadata = { device_metadata = {
@@ -562,9 +927,11 @@ async def open_usb_transport(spec: str) -> Transport:
except usb1.USBError: except usb1.USBError:
logger.warning('failed to set configuration') logger.warning('failed to set configuration')
source = UsbPacketSource(device, device_metadata, acl_in, events_in) source = UsbPacketSource(
sink = UsbPacketSink(device, acl_out) device, device_metadata, interrupt_in, bulk_in, isochronous_in
return UsbTransport(context, device, interface, setting, source, sink) )
sink = UsbPacketSink(device, bulk_out, isochronous_out)
return UsbTransport(context, device, acl_interface, sco_interface, source, sink)
except usb1.USBError as error: except usb1.USBError as error:
logger.warning(color(f'!!! failed to open USB device: {error}', 'red')) logger.warning(color(f'!!! failed to open USB device: {error}', 'red'))
context.close() context.close()
+184 -53
View File
@@ -20,17 +20,119 @@ import contextlib
import functools import functools
import json import json
import sys import sys
import wave
import websockets.asyncio.server import websockets.asyncio.server
import bumble.logging import bumble.logging
from bumble import hci, hfp, rfcomm from bumble import hci, hfp, rfcomm
from bumble.device import Connection, Device from bumble.device import Connection, Device, ScoLink
from bumble.hfp import HfProtocol from bumble.hfp import HfProtocol
from bumble.transport import open_transport from bumble.transport import open_transport
# -----------------------------------------------------------------------------
ws: websockets.asyncio.server.ServerConnection | None = None ws: websockets.asyncio.server.ServerConnection | None = None
hf_protocol: HfProtocol | None = None hf_protocol: HfProtocol | None = None
input_wav: wave.Wave_read | None = None
output_wav: wave.Wave_write | None = None
# -----------------------------------------------------------------------------
def on_audio_packet(packet: hci.HCI_SynchronousDataPacket) -> None:
if (
packet.packet_status
!= hci.HCI_SynchronousDataPacket.Status.CORRECTLY_RECEIVED_DATA
):
print('!!! discarding packet with status ', packet.packet_status.name)
return
frame_count = len(packet.data) // 2
print(f">>> received {frame_count} PCM samples")
if output_wav:
# Save the PCM audio to the output
output_wav.writeframes(packet.data)
if input_wav and hf_protocol:
# Send PCM audio from the input, same amount as what was received
while not (pcm_data := input_wav.readframes(frame_count)):
input_wav.setpos(0) # Loop
print(f">>> sending {frame_count} PCM samples")
hf_protocol.dlc.multiplexer.l2cap_channel.connection.device.host.send_sco_sdu(
connection_handle=packet.connection_handle,
sdu=pcm_data,
)
# -----------------------------------------------------------------------------
def on_sco_connection(link: ScoLink) -> None:
print('### SCO connection established:', link)
if link.air_mode == hci.CodecID.TRANSPARENT:
print("@@@ The controller does not encode/decode voice")
return
link.sink = on_audio_packet
# -----------------------------------------------------------------------------
def on_sco_request(
link_type: int, connection: Connection, protocol: HfProtocol
) -> None:
if link_type == hci.HCI_Connection_Complete_Event.LinkType.SCO:
esco_parameters = hfp.ESCO_PARAMETERS[hfp.DefaultCodecParameters.SCO_CVSD_D1]
elif protocol.active_codec == hfp.AudioCodec.MSBC:
esco_parameters = hfp.ESCO_PARAMETERS[hfp.DefaultCodecParameters.ESCO_MSBC_T2]
elif protocol.active_codec == hfp.AudioCodec.CVSD:
esco_parameters = hfp.ESCO_PARAMETERS[hfp.DefaultCodecParameters.ESCO_CVSD_S4]
else:
raise RuntimeError("unknown active codec")
if connection.device.host.supports_command(
hci.HCI_ENHANCED_ACCEPT_SYNCHRONOUS_CONNECTION_REQUEST_COMMAND
):
connection.cancel_on_disconnection(
connection.device.send_async_command(
hci.HCI_Enhanced_Accept_Synchronous_Connection_Request_Command(
bd_addr=connection.peer_address, **esco_parameters.asdict()
)
)
)
elif connection.device.host.supports_command(
hci.HCI_ACCEPT_SYNCHRONOUS_CONNECTION_REQUEST_COMMAND
):
connection.cancel_on_disconnection(
connection.device.send_async_command(
hci.HCI_Accept_Synchronous_Connection_Request_Command(
bd_addr=connection.peer_address,
transmit_bandwidth=esco_parameters.transmit_bandwidth,
receive_bandwidth=esco_parameters.receive_bandwidth,
max_latency=esco_parameters.max_latency,
voice_setting=int(
hci.VoiceSetting(
input_sample_size=hci.VoiceSetting.InputSampleSize.SIZE_16_BITS,
input_data_format=hci.VoiceSetting.InputDataFormat.TWOS_COMPLEMENT,
)
),
retransmission_effort=esco_parameters.retransmission_effort,
packet_type=esco_parameters.packet_type,
)
)
)
else:
print('!!! no supported command for SCO connection request')
return
global output_wav
if output_wav:
output_wav.setnchannels(1)
output_wav.setsampwidth(2)
match protocol.active_codec:
case hfp.AudioCodec.CVSD:
output_wav.setframerate(8000)
case hfp.AudioCodec.MSBC:
output_wav.setframerate(16000)
connection.on('sco_connection', on_sco_connection)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -40,59 +142,47 @@ def on_dlc(dlc: rfcomm.DLC, configuration: hfp.HfConfiguration):
hf_protocol = HfProtocol(dlc, configuration) hf_protocol = HfProtocol(dlc, configuration)
asyncio.create_task(hf_protocol.run()) asyncio.create_task(hf_protocol.run())
def on_sco_request(connection: Connection, link_type: int, protocol: HfProtocol): connection = dlc.multiplexer.l2cap_channel.connection
if connection == protocol.dlc.multiplexer.l2cap_channel.connection: handler = functools.partial(
if link_type == hci.HCI_Connection_Complete_Event.LinkType.SCO: on_sco_request,
esco_parameters = hfp.ESCO_PARAMETERS[ connection=connection,
hfp.DefaultCodecParameters.SCO_CVSD_D1 protocol=hf_protocol,
]
elif protocol.active_codec == hfp.AudioCodec.MSBC:
esco_parameters = hfp.ESCO_PARAMETERS[
hfp.DefaultCodecParameters.ESCO_MSBC_T2
]
elif protocol.active_codec == hfp.AudioCodec.CVSD:
esco_parameters = hfp.ESCO_PARAMETERS[
hfp.DefaultCodecParameters.ESCO_CVSD_S4
]
else:
raise RuntimeError("unknown active codec")
connection.cancel_on_disconnection(
connection.device.send_command(
hci.HCI_Enhanced_Accept_Synchronous_Connection_Request_Command(
bd_addr=connection.peer_address, **esco_parameters.asdict()
) )
) connection.on('sco_request', handler)
)
handler = functools.partial(on_sco_request, protocol=hf_protocol)
dlc.multiplexer.l2cap_channel.connection.device.on('sco_request', handler)
dlc.multiplexer.l2cap_channel.once( dlc.multiplexer.l2cap_channel.once(
'close', 'close',
lambda: dlc.multiplexer.l2cap_channel.connection.device.remove_listener( lambda: connection.remove_listener('sco_request', handler),
'sco_request', handler
),
) )
hf_protocol.on('ag_indicator', on_ag_indicator)
hf_protocol.on('codec_negotiation', on_codec_negotiation)
# -----------------------------------------------------------------------------
def on_ag_indicator(indicator): def on_ag_indicator(indicator):
global ws global ws
if ws: if ws:
asyncio.create_task(ws.send(str(indicator))) asyncio.create_task(ws.send(str(indicator)))
hf_protocol.on('ag_indicator', on_ag_indicator)
# -----------------------------------------------------------------------------
def on_codec_negotiation(codec: hfp.AudioCodec):
print(f'### Negotiated codec: {codec.name}')
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main() -> None: async def run(device: Device, codec: str | None) -> None:
if len(sys.argv) < 3: if codec is None:
print('Usage: run_classic_hfp.py <device-config> <transport-spec>') supported_audio_codecs = [hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC]
print('example: run_classic_hfp.py classic2.json usb:04b4:f901') else:
if codec == 'cvsd':
supported_audio_codecs = [hfp.AudioCodec.CVSD]
elif codec == 'msbc':
supported_audio_codecs = [hfp.AudioCodec.MSBC]
else:
print('Unknown codec: ', codec)
return return
print('<<< connecting to HCI...')
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< connected')
# Hands-Free profile configuration. # Hands-Free profile configuration.
# TODO: load configuration from file. # TODO: load configuration from file.
configuration = hfp.HfConfiguration( configuration = hfp.HfConfiguration(
@@ -108,18 +198,9 @@ async def main() -> None:
supported_hf_indicators=[ supported_hf_indicators=[
hfp.HfIndicator.BATTERY_LEVEL, hfp.HfIndicator.BATTERY_LEVEL,
], ],
supported_audio_codecs=[ supported_audio_codecs=supported_audio_codecs,
hfp.AudioCodec.CVSD,
hfp.AudioCodec.MSBC,
],
) )
# Create a device
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
# Create and register a server # Create and register a server
rfcomm_server = rfcomm.Server(device) rfcomm_server = rfcomm.Server(device)
@@ -129,9 +210,7 @@ async def main() -> None:
# Advertise the HFP RFComm channel in the SDP # Advertise the HFP RFComm channel in the SDP
device.sdp_service_records = { device.sdp_service_records = {
0x00010001: hfp.make_hf_sdp_records( 0x00010001: hfp.make_hf_sdp_records(0x00010001, channel_number, configuration)
0x00010001, channel_number, configuration
)
} }
# Let's go! # Let's go!
@@ -167,7 +246,59 @@ async def main() -> None:
await websockets.asyncio.server.serve(serve, 'localhost', 8989) await websockets.asyncio.server.serve(serve, 'localhost', 8989)
await hci_transport.source.terminated await asyncio.get_running_loop().create_future() # run forever
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_hfp_handsfree.py <device-config> <transport-spec> '
'[codec] [input] [output]'
)
print('example: run_hfp_handsfree.py classic2.json usb:0')
return
device_config = sys.argv[1]
transport_spec = sys.argv[2]
codec: str | None = None
if len(sys.argv) >= 4:
codec = sys.argv[3]
input_file_name: str | None = None
if len(sys.argv) >= 5:
input_file_name = sys.argv[4]
output_file_name: str | None = None
if len(sys.argv) >= 6:
output_file_name = sys.argv[5]
global input_wav, output_wav
input_cm: contextlib.AbstractContextManager[wave.Wave_read | None] = (
wave.open(input_file_name, "rb")
if input_file_name
else contextlib.nullcontext(None)
)
output_cm: contextlib.AbstractContextManager[wave.Wave_write | None] = (
wave.open(output_file_name, "wb")
if output_file_name
else contextlib.nullcontext(None)
)
with input_cm as input_wav, output_cm as output_wav:
if input_wav and input_wav.getnchannels() != 1:
print("Mono input required")
return
if input_wav and input_wav.getsampwidth() != 2:
print("16-bit input required")
return
async with await open_transport(transport_spec) as transport:
device = Device.from_config_file_with_hci(
device_config, transport.source, transport.sink
)
device.classic_enabled = True
await run(device, codec)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
+2
View File
@@ -170,7 +170,9 @@ def format_code(ctx, check=False, diff=False):
@task @task
def check_types(ctx): def check_types(ctx):
checklist = ["apps", "bumble", "examples", "tests", "tasks.py"] checklist = ["apps", "bumble", "examples", "tests", "tasks.py"]
print(">>> Running the type checker...")
try: try:
print("+++ Checking with mypy...")
ctx.run(f"mypy {' '.join(checklist)}") ctx.run(f"mypy {' '.join(checklist)}")
except UnexpectedExit as exc: except UnexpectedExit as exc:
print("Please check your code against the mypy messages.") print("Please check your code against the mypy messages.")
+64 -1
View File
@@ -24,7 +24,7 @@ import sys
import pytest import pytest
from bumble import controller, device, hci, link, transport from bumble import controller, device, hci, link, transport
from bumble.transport import common from bumble.transport import common, usb
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -252,6 +252,69 @@ async def test_open_transport_with_metadata(spec):
await controller_transport.close() await controller_transport.close()
# -----------------------------------------------------------------------------
def test_packet_splitter_complete():
emitted = []
splitter = usb.AclPacketSplitter(emitted.append)
packet = bytes([0x01, 0x00, 0x04, 0x00, 0x11, 0x22, 0x33, 0x44])
splitter.feed(packet)
assert emitted == [packet]
def test_packet_splitter_chunks():
emitted = []
splitter = usb.AclPacketSplitter(emitted.append)
packet = bytes([0x01, 0x00, 0x04, 0x00, 0x11, 0x22, 0x33, 0x44])
splitter.feed(packet[:4])
assert emitted == []
splitter.feed(packet[4:])
assert emitted == [packet]
def test_packet_splitter_multiple():
emitted = []
splitter = usb.AclPacketSplitter(emitted.append)
packet1 = bytes([0x01, 0x00, 0x04, 0x00, 0x11, 0x22, 0x33, 0x44])
packet2 = bytes([0x02, 0x00, 0x02, 0x00, 0x55, 0x66])
splitter.feed(packet1 + packet2)
assert emitted == [packet1, packet2]
def test_packet_splitter_partial():
emitted = []
splitter = usb.AclPacketSplitter(emitted.append)
packet1 = bytes([0x01, 0x00, 0x04, 0x00, 0x11, 0x22, 0x33, 0x44])
packet2 = bytes([0x02, 0x00, 0x02, 0x00, 0x55, 0x66])
splitter.feed(packet1 + packet2[:4])
assert emitted == [packet1]
splitter.feed(packet2[4:])
assert emitted == [packet1, packet2]
def test_packet_splitter_empty_payload():
emitted = []
splitter = usb.AclPacketSplitter(emitted.append)
packet = bytes([0x01, 0x00, 0x00, 0x00])
splitter.feed(packet)
assert emitted == [packet]
def test_sco_packet_splitter():
emitted = []
splitter = usb.ScoPacketSplitter(emitted.append)
packet = bytes([0x01, 0x00, 0x03, 0x11, 0x22, 0x33])
splitter.feed(packet)
assert emitted == [packet]
def test_event_packet_splitter():
emitted = []
splitter = usb.EventPacketSplitter(emitted.append)
packet = bytes([0x04, 0x02, 0x11, 0x22])
splitter.feed(packet)
assert emitted == [packet]
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
if __name__ == '__main__': if __name__ == '__main__':
test_parser() test_parser()
+95
View File
@@ -0,0 +1,95 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from unittest import mock
import pytest
from bumble import hci
from bumble.transport import usb
@pytest.mark.asyncio
async def test_usb_packet_sink_iso_routing():
# Mock usb1 device and endpoints
mock_device = mock.Mock()
mock_bulk_out = mock.Mock()
mock_bulk_out.getAddress.return_value = 0x02
# Scenario 1: Isochronous endpoints are not enabled (isochronous_out is None)
mock_transfer = mock.Mock()
mock_device.getTransfer.return_value = mock_transfer
sink = usb.UsbPacketSink(mock_device, mock_bulk_out, isochronous_out=None)
sink.start()
# Send HCI_ISO_DATA_PACKET
iso_packet = bytes([hci.HCI_ISO_DATA_PACKET, 0x01, 0x02, 0x03])
sink.on_packet(iso_packet)
# Yield control to let the queue processor run
await asyncio.sleep(0.01)
# Verify it was sent via bulk transfer
mock_transfer.setBulk.assert_called_once_with(
0x02,
bytes([0x01, 0x02, 0x03]),
callback=sink.transfer_callback,
)
mock_transfer.submit.assert_called_once()
if sink.queue_task:
sink.queue_task.cancel()
try:
await sink.queue_task
except asyncio.CancelledError:
pass
@pytest.mark.asyncio
async def test_usb_packet_sink_iso_routing_with_iso_endpoint():
# Mock usb1 device and endpoints
mock_device = mock.Mock()
mock_bulk_out = mock.Mock()
mock_bulk_out.getAddress.return_value = 0x02
mock_iso_out = mock.Mock()
mock_iso_out.getMaxPacketSize.return_value = 64
# Scenario 2: Isochronous endpoints are enabled
mock_transfer_bulk = mock.Mock()
mock_transfer_iso = mock.Mock()
# getTransfer is called twice: once for bulk_or_control and once for isochronous
mock_device.getTransfer.side_effect = [mock_transfer_bulk, mock_transfer_iso]
sink = usb.UsbPacketSink(mock_device, mock_bulk_out, isochronous_out=mock_iso_out)
sink.start()
# Send HCI_ISO_DATA_PACKET
iso_packet = bytes([hci.HCI_ISO_DATA_PACKET, 0x01, 0x02, 0x03])
sink.on_packet(iso_packet)
# Yield control to let the queue processor run
await asyncio.sleep(0.01)
# Verify it was NOT sent via bulk transfer
mock_transfer_bulk.setBulk.assert_not_called()
if sink.queue_task:
sink.queue_task.cancel()
try:
await sink.queue_task
except asyncio.CancelledError:
pass