Compare commits

..

3 Commits

Author SHA1 Message Date
Josh Wu 72d821b1f6 Merge pull request #928 from zxzxwu/avdtp
AVDTP: Avoid explicit in_use management
2026-05-26 16:33:08 +08:00
Josh Wu afe064b4ea AVDTP: Make local stream endpoint in_use dyanmic property 2026-05-22 15:58:11 +08:00
Josh Wu 8d0cef70c2 AVDTP: Add keyword argument to long __init__ 2026-05-20 16:19:06 +08:00
13 changed files with 604 additions and 1276 deletions
+1 -1
View File
@@ -489,7 +489,7 @@ class Sender:
flags=(
Packet.PacketFlags.LAST
if tx_i == self.tx_packet_count - 1
else Packet.PacketFlags(0)
else 0
),
sequence=tx_i,
timestamp=int((time.time() - self.start_time) * 1000000),
-12
View File
@@ -45,10 +45,8 @@ from bumble.hci import (
HCI_Read_Local_Supported_Codecs_Command,
HCI_Read_Local_Supported_Codecs_V2_Command,
HCI_Read_Local_Version_Information_Command,
HCI_Read_Voice_Setting_Command,
LeFeature,
SpecificationVersion,
VoiceSetting,
map_null_terminated_utf8_string,
)
from bumble.host import Host
@@ -216,16 +214,6 @@ async def get_codecs_info(host: Host) -> None:
if not response2.vendor_specific_codec_ids:
print(' No Vendor-specific codecs')
if host.supports_command(HCI_Read_Voice_Setting_Command.op_code):
response3 = await host.send_sync_command(HCI_Read_Voice_Setting_Command())
voice_setting = VoiceSetting.from_int(response3.voice_setting)
print(color('Voice Setting:', 'yellow'))
print(f' Air Coding Format: {voice_setting.air_coding_format.name}')
print(f' Linear PCM Bit Position: {voice_setting.linear_pcm_bit_position}')
print(f' Input Sample Size: {voice_setting.input_sample_size.name}')
print(f' Input Data Format: {voice_setting.input_data_format.name}')
print(f' Input Coding Format: {voice_setting.input_coding_format.name}')
# -----------------------------------------------------------------------------
async def async_main(
+41 -151
View File
@@ -16,8 +16,6 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import statistics
import struct
import time
import click
@@ -27,9 +25,7 @@ from bumble.colors import color
from bumble.hci import (
HCI_READ_LOOPBACK_MODE_COMMAND,
HCI_WRITE_LOOPBACK_MODE_COMMAND,
Address,
HCI_Read_Loopback_Mode_Command,
HCI_SynchronousDataPacket,
HCI_Write_Loopback_Mode_Command,
LoopbackMode,
)
@@ -40,121 +36,55 @@ from bumble.transport import open_transport
class Loopback:
"""Send and receive ACL data packets in local loopback mode"""
def __init__(
self,
packet_size: int,
packet_count: int,
connection_type: str,
mode: str,
interval: int,
transport: str,
):
def __init__(self, packet_size: int, packet_count: int, transport: str):
self.transport = transport
self.packet_size = packet_size
self.packet_count = packet_count
self.connection_handle: int | None = None
self.connection_type = connection_type
self.connection_event = asyncio.Event()
self.mode = mode
self.interval = interval
self.done = asyncio.Event()
self.expected_counter = 0
self.expected_cid = 0
self.bytes_received = 0
self.start_timestamp = 0.0
self.last_timestamp = 0.0
self.send_timestamps: list[float] = []
self.rtts: list[float] = []
def on_connection(self, connection_handle: int, *args):
"""Retrieve connection handle from new connection event"""
if not self.connection_event.is_set():
# The first connection handle is of type ACL,
# subsequent connections are of type SCO
if self.connection_type == "sco" and self.connection_handle is None:
self.connection_handle = connection_handle
return
# save first connection handle for ACL
# subsequent connections are SCO
self.connection_handle = connection_handle
self.connection_event.set()
def on_sco_connection(
self,
address: Address,
connection_handle: int,
link_type,
rx_packet_length: int,
tx_packet_length: int,
air_mode: int,
) -> None:
self.on_connection(connection_handle)
def on_packet(self, connection_handle: int, packet: bytes):
def on_l2cap_pdu(self, connection_handle: int, cid: int, pdu: bytes):
"""Calculate packet receive speed"""
now = time.time()
(counter,) = struct.unpack_from("H", packet, 0)
rtt = now - self.send_timestamps[counter]
self.rtts.append(rtt)
print(f'<<< Received packet {counter}: {len(packet)} bytes, RTT={rtt:.4f}')
print(f'<<< Received packet {cid}: {len(pdu)} bytes')
assert connection_handle == self.connection_handle
assert counter == self.expected_counter
self.expected_counter += 1
if counter == 0:
assert cid == self.expected_cid
self.expected_cid += 1
if cid == 0:
self.start_timestamp = now
else:
elapsed_since_start = now - self.start_timestamp
elapsed_since_last = now - self.last_timestamp
self.bytes_received += len(packet)
instant_rx_speed = len(packet) / elapsed_since_last
self.bytes_received += len(pdu)
instant_rx_speed = len(pdu) / elapsed_since_last
average_rx_speed = self.bytes_received / elapsed_since_start
if self.mode == 'throughput':
print(
color(
f'@@@ RX speed: instant={instant_rx_speed:.4f},'
f' average={average_rx_speed:.4f},',
'cyan',
)
print(
color(
f'@@@ RX speed: instant={instant_rx_speed:.4f},'
f' average={average_rx_speed:.4f}',
'cyan',
)
)
self.last_timestamp = now
if self.expected_counter == self.packet_count:
if self.expected_cid == self.packet_count:
print(color('@@@ Received last packet', 'green'))
self.done.set()
def on_l2cap_pdu(self, connection_handle: int, cid: int, pdu: bytes):
self.on_packet(connection_handle, pdu)
def on_sco_packet(self, connection_handle: int, packet) -> None:
self.on_packet(connection_handle, packet)
async def send_acl_packet(self, host: Host, packet: bytes) -> None:
assert self.connection_handle
host.send_l2cap_pdu(self.connection_handle, 0, packet)
async def send_sco_packet(self, host: Host, packet: bytes) -> None:
assert self.connection_handle
host.send_hci_packet(
HCI_SynchronousDataPacket(
connection_handle=self.connection_handle,
packet_status=HCI_SynchronousDataPacket.Status.CORRECTLY_RECEIVED_DATA,
data_total_length=len(packet),
data=packet,
)
)
async def send_loop(self, host: Host, sender) -> None:
for counter in range(0, self.packet_count):
print(
color(
f'>>> Sending {self.connection_type.upper()} '
f'packet {counter}: {self.packet_size} bytes',
'yellow',
)
)
self.send_timestamps.append(time.time())
await sender(host, struct.pack("H", counter) + bytes(self.packet_size - 2))
await asyncio.sleep(self.interval / 1000 if self.mode == "rtt" else 0)
async def run(self) -> None:
"""Run a loopback throughput test"""
print(color('>>> Connecting to HCI...', 'green'))
@@ -196,11 +126,8 @@ class Loopback:
return
# set event callbacks
host.on('classic_connection', self.on_connection)
host.on('le_connection', self.on_connection)
host.on('sco_connection', self.on_sco_connection)
host.on('connection', self.on_connection)
host.on('l2cap_pdu', self.on_l2cap_pdu)
host.on('sco_packet', self.on_sco_packet)
loopback_mode = LoopbackMode.LOCAL
@@ -221,37 +148,32 @@ class Loopback:
print(color('=== Start sending', 'magenta'))
start_time = time.time()
if self.connection_type == "acl":
sender = self.send_acl_packet
elif self.connection_type == "sco":
sender = self.send_sco_packet
else:
raise ValueError(f'Unknown connection type: {self.connection_type}')
await self.send_loop(host, sender)
bytes_sent = 0
for cid in range(0, self.packet_count):
# using the cid as an incremental index
host.send_l2cap_pdu(
self.connection_handle, cid, bytes(self.packet_size)
)
print(
color(
f'>>> Sending packet {cid}: {self.packet_size} bytes', 'yellow'
)
)
bytes_sent += self.packet_size # don't count L2CAP or HCI header sizes
await asyncio.sleep(0) # yield to allow packet receive
await self.done.wait()
print(color('=== Done!', 'magenta'))
bytes_sent = self.packet_size * self.packet_count
elapsed = time.time() - start_time
average_tx_speed = bytes_sent / elapsed
if self.mode == 'throughput':
print(
color(
f'@@@ TX speed: average={average_tx_speed:.4f} '
f'({bytes_sent} bytes in {elapsed:.2f} seconds)',
'green',
)
)
if self.mode == 'rtt':
print(
color(
f'RTTs: min={min(self.rtts):.4f}, '
f'max={max(self.rtts):.4f}, '
f'avg={statistics.mean(self.rtts):.4f}',
'blue',
)
print(
color(
f'@@@ TX speed: average={average_tx_speed:.4f} ({bytes_sent} bytes'
f' in {elapsed:.2f} seconds)',
'green',
)
)
# -----------------------------------------------------------------------------
@@ -272,43 +194,11 @@ class Loopback:
default=10,
help='Packet count',
)
@click.option(
'--connection-type',
'-t',
metavar='TYPE',
type=click.Choice(['acl', 'sco']),
default='acl',
help='Connection type',
)
@click.option(
'--mode',
'-m',
metavar='MODE',
type=click.Choice(['throughput', 'rtt']),
default='throughput',
help='Test mode',
)
@click.option(
'--interval',
type=int,
default=100,
help='Inter-packet interval (ms) [RTT mode only]',
)
@click.argument('transport')
def main(packet_size, packet_count, connection_type, mode, interval, transport):
def main(packet_size, packet_count, transport):
bumble.logging.setup_basic_logging()
if connection_type == "sco" and packet_size > 255:
print("ERROR: the maximum packet size for SCO is 255")
return
async def run():
loopback = Loopback(
packet_size, packet_count, connection_type, mode, interval, transport
)
await loopback.run()
asyncio.run(run())
loopback = Loopback(packet_size, packet_count, transport)
asyncio.run(loopback.run())
# -----------------------------------------------------------------------------
+1 -6
View File
@@ -111,14 +111,9 @@ def show_device_details(device):
if (endpoint.getAddress() & USB_ENDPOINT_IN == 0)
else 'IN'
)
endpoint_details = (
f', Max Packet Size = {endpoint.getMaxPacketSize()}'
if endpoint_type == 'ISOCHRONOUS'
else ''
)
print(
f' Endpoint 0x{endpoint.getAddress():02X}: '
f'{endpoint_type} {endpoint_direction}{endpoint_details}'
f'{endpoint_type} {endpoint_direction}'
)
+41 -24
View File
@@ -17,6 +17,7 @@
# -----------------------------------------------------------------------------
from __future__ import annotations
import abc
import asyncio
import enum
import logging
@@ -1946,9 +1947,6 @@ class Stream:
await self.rtp_channel.disconnect()
self.rtp_channel = None
# Release the endpoint
self.local_endpoint.in_use = 0
self.change_state(State.IDLE)
async def on_set_configuration_command(
@@ -2039,7 +2037,6 @@ class Stream:
if self.rtp_channel is None:
# No channel to release, we're done
self.local_endpoint.in_use = 0
self.change_state(State.IDLE)
else:
# TODO: set a timer as we wait for the RTP channel to be closed
@@ -2051,7 +2048,6 @@ class Stream:
await self.local_endpoint.on_abort_command()
if self.rtp_channel is None:
# No need to wait
self.local_endpoint.in_use = 0
self.change_state(State.IDLE)
else:
# Wait for the RTP channel to be closed
@@ -2074,7 +2070,6 @@ class Stream:
def on_l2cap_channel_close(self) -> None:
logger.debug(color('<<< stream channel closed', 'magenta'))
self.local_endpoint.on_rtp_channel_close()
self.local_endpoint.in_use = 0
self.rtp_channel = None
if self.state in (State.CLOSING, State.ABORTING):
@@ -2099,7 +2094,6 @@ class Stream:
self.state = State.IDLE
local_endpoint.stream = self
local_endpoint.in_use = 1
def __str__(self) -> str:
return (
@@ -2109,14 +2103,16 @@ class Stream:
# -----------------------------------------------------------------------------
@dataclass
class StreamEndPoint:
class StreamEndPoint(abc.ABC):
seid: int
media_type: MediaType
tsep: StreamEndPointType
in_use: int
capabilities: Iterable[ServiceCapabilities]
@property
def in_use(self) -> int:
raise NotImplementedError()
# -----------------------------------------------------------------------------
class StreamEndPointProxy:
@@ -2156,14 +2152,30 @@ class DiscoveredStreamEndPoint(StreamEndPoint, StreamEndPointProxy):
in_use: int,
capabilities: Iterable[ServiceCapabilities],
) -> None:
StreamEndPoint.__init__(self, seid, media_type, tsep, in_use, capabilities)
StreamEndPointProxy.__init__(self, protocol, seid)
# StreamEndPoint attributes
self.seid = seid
self.media_type = media_type
self.tsep = tsep
self._in_use = in_use
self.capabilities = capabilities
StreamEndPointProxy.__init__(self, protocol=protocol, seid=seid)
@property
def in_use(self) -> int:
return self._in_use
# -----------------------------------------------------------------------------
class LocalStreamEndPoint(StreamEndPoint, utils.EventEmitter):
stream: Stream | None
@property
def in_use(self) -> int:
if self.stream and self.stream.state != State.IDLE:
return 1
return 0
EVENT_CONFIGURATION = "configuration"
EVENT_OPEN = "open"
EVENT_START = "start"
@@ -2186,8 +2198,13 @@ class LocalStreamEndPoint(StreamEndPoint, utils.EventEmitter):
capabilities: Iterable[ServiceCapabilities],
configuration: Iterable[ServiceCapabilities] | None = None,
):
StreamEndPoint.__init__(self, seid, media_type, tsep, 0, capabilities)
utils.EventEmitter.__init__(self)
# StreamEndPoint attributes
self.seid = seid
self.media_type = media_type
self.tsep = tsep
self.capabilities = capabilities
self.protocol = protocol
self.configuration = configuration if configuration is not None else []
self.stream = None
@@ -2273,12 +2290,12 @@ class LocalSource(LocalStreamEndPoint):
codec_capabilities,
] + list(other_capabilities)
super().__init__(
protocol,
seid,
codec_capabilities.media_type,
AVDTP_TSEP_SRC,
capabilities,
capabilities,
protocol=protocol,
seid=seid,
media_type=codec_capabilities.media_type,
tsep=AVDTP_TSEP_SRC,
capabilities=capabilities,
configuration=capabilities,
)
self.packet_pump = packet_pump
@@ -2317,11 +2334,11 @@ class LocalSink(LocalStreamEndPoint):
codec_capabilities,
]
super().__init__(
protocol,
seid,
codec_capabilities.media_type,
AVDTP_TSEP_SNK,
capabilities,
protocol=protocol,
seid=seid,
media_type=codec_capabilities.media_type,
tsep=AVDTP_TSEP_SNK,
capabilities=capabilities,
)
def on_rtp_channel_open(self) -> None:
+5 -22
View File
@@ -1423,9 +1423,6 @@ class ScoLink(utils.CompositeEventEmitter):
acl_connection: Connection
handle: int
link_type: int
rx_packet_length: int
tx_packet_length: int
air_mode: hci.CodecID
sink: Callable[[hci.HCI_SynchronousDataPacket], Any] | None = None
EVENT_DISCONNECTION: ClassVar[str] = "disconnection"
@@ -6052,7 +6049,7 @@ class Device(utils.CompositeEventEmitter):
def on_connection_request(
self, bd_addr: hci.Address, class_of_device: int, link_type: int
):
logger.debug(f'*** Connection request: {bd_addr} link_type={link_type}')
logger.debug(f'*** Connection request: {bd_addr}')
# Handle SCO request.
if link_type in (
@@ -6062,7 +6059,6 @@ class Device(utils.CompositeEventEmitter):
if connection := self.find_connection_by_bd_addr(
bd_addr, transport=PhysicalTransport.BR_EDR
):
connection.emit(self.EVENT_SCO_REQUEST, link_type)
self.emit(self.EVENT_SCO_REQUEST, connection, link_type)
else:
logger.error(f'SCO request from a non-connected device {bd_addr}')
@@ -6422,7 +6418,8 @@ class Device(utils.CompositeEventEmitter):
logger.warning('peer name is not valid UTF-8')
if connection:
connection.emit(connection.EVENT_REMOTE_NAME_FAILURE, error)
self.emit(self.EVENT_REMOTE_NAME_FAILURE, address, error)
else:
self.emit(self.EVENT_REMOTE_NAME_FAILURE, address, error)
# [Classic only]
@host_event_handler
@@ -6439,13 +6436,7 @@ class Device(utils.CompositeEventEmitter):
@with_connection_from_address
@utils.experimental('Only for testing.')
def on_sco_connection(
self,
acl_connection: Connection,
sco_handle: int,
link_type: int,
rx_packet_length: int,
tx_packet_length: int,
air_mode: int,
self, acl_connection: Connection, sco_handle: int, link_type: int
) -> None:
logger.debug(
f'*** SCO connected: {acl_connection.peer_address}, '
@@ -6457,11 +6448,7 @@ class Device(utils.CompositeEventEmitter):
acl_connection=acl_connection,
handle=sco_handle,
link_type=link_type,
rx_packet_length=rx_packet_length,
tx_packet_length=tx_packet_length,
air_mode=hci.CodecID(air_mode),
)
acl_connection.emit(self.EVENT_SCO_CONNECTION, sco_link)
self.emit(self.EVENT_SCO_CONNECTION, sco_link)
# [Classic only]
@@ -6472,8 +6459,7 @@ class Device(utils.CompositeEventEmitter):
self, acl_connection: Connection, status: int
) -> None:
logger.debug(f'*** SCO connection failure: {acl_connection.peer_address}***')
acl_connection.emit(self.EVENT_SCO_CONNECTION_FAILURE, status)
self.emit(self.EVENT_SCO_CONNECTION_FAILURE, status)
self.emit(self.EVENT_SCO_CONNECTION_FAILURE)
# [Classic only]
@host_event_handler
@@ -6936,18 +6922,15 @@ class Device(utils.CompositeEventEmitter):
@with_connection_from_address
def on_classic_pairing(self, connection: Connection) -> None:
connection.emit(connection.EVENT_CLASSIC_PAIRING)
self.emit(connection.EVENT_CLASSIC_PAIRING, connection)
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_classic_pairing_failure(self, connection: Connection, status: int) -> None:
connection.emit(connection.EVENT_CLASSIC_PAIRING_FAILURE, status)
self.emit(connection.EVENT_CLASSIC_PAIRING_FAILURE, connection, status)
def on_pairing_start(self, connection: Connection) -> None:
connection.emit(connection.EVENT_PAIRING_START)
self.emit(connection.EVENT_PAIRING_START, connection)
def on_pairing(
self,
+28 -121
View File
@@ -1721,15 +1721,6 @@ class CodecID(SpecableEnum):
VENDOR_SPECIFIC = 0xFF
# From Bluetooth Assigned Numbers, 2.10 PCM_Data_Format
class PcmDataFormat(SpecableEnum):
NA = 0x00
ONES_COMPLEMENT = 0x01
TWOS_COMPLEMENT = 0x02
SIGN_MAGNITUDE = 0x03
UNSIGNED = 0x04
@dataclasses.dataclass(frozen=True)
class CodingFormat:
codec_id: CodecID
@@ -1738,7 +1729,7 @@ class CodingFormat:
@classmethod
def parse_from_bytes(cls, data: bytes, offset: int) -> tuple[int, CodingFormat]:
codec_id, company_id, vendor_specific_codec_id = struct.unpack_from(
(codec_id, company_id, vendor_specific_codec_id) = struct.unpack_from(
'<BHH', data, offset
)
return offset + 5, cls(
@@ -1757,61 +1748,6 @@ class CodingFormat:
)
@dataclasses.dataclass(frozen=True)
class VoiceSetting:
class AirCodingFormat(enum.IntEnum):
CVSD = 0
U_LAW = 1
A_LAW = 2
TRANSPARENT_DATA = 3
class InputSampleSize(enum.IntEnum):
SIZE_8_BITS = 0
SIZE_16_BITS = 1
class InputDataFormat(enum.IntEnum):
ONES_COMPLEMENT = 0
TWOS_COMPLEMENT = 1
SIGN_AND_MAGNITUDE = 2
UNSIGNED = 3
class InputCodingFormat(enum.IntEnum):
LINEAR = 0
U_LAW = 1
A_LAW = 2
RESERVED = 3
air_coding_format: AirCodingFormat = AirCodingFormat.CVSD
linear_pcm_bit_position: int = 0
input_sample_size: InputSampleSize = InputSampleSize.SIZE_8_BITS
input_data_format: InputDataFormat = InputDataFormat.ONES_COMPLEMENT
input_coding_format: InputCodingFormat = InputCodingFormat.LINEAR
@classmethod
def from_int(cls, value: int) -> VoiceSetting:
air_coding_format = cls.AirCodingFormat(value & 0b11)
linear_pcm_bit_position = (value >> 2) & 0b111
input_sample_size = cls.InputSampleSize((value >> 5) & 0b1)
input_data_format = cls.InputDataFormat((value >> 6) & 0b11)
input_coding_format = cls.InputCodingFormat((value >> 8) & 0b11)
return cls(
air_coding_format=air_coding_format,
linear_pcm_bit_position=linear_pcm_bit_position,
input_sample_size=input_sample_size,
input_data_format=input_data_format,
input_coding_format=input_coding_format,
)
def __int__(self) -> int:
return (
self.air_coding_format
| (self.linear_pcm_bit_position << 2)
| (self.input_sample_size << 5)
| (self.input_data_format << 6)
| (self.input_coding_format << 8)
)
# -----------------------------------------------------------------------------
class HCI_Constant:
@staticmethod
@@ -2072,7 +2008,7 @@ class HCI_Object:
)
continue
field_name, field_type = object_field
(field_name, field_type) = object_field
result += HCI_Object.serialize_field(hci_object[field_name], field_type)
return bytes(result)
@@ -2950,23 +2886,6 @@ class HCI_Read_Clock_Offset_Command(HCI_AsyncCommand):
connection_handle: int = field(metadata=metadata(2))
# -----------------------------------------------------------------------------
@HCI_Command.command
@dataclasses.dataclass
class HCI_Accept_Synchronous_Connection_Request_Command(HCI_AsyncCommand):
'''
See Bluetooth spec @ 7.1.27 Accept Synchronous Connection Request Command
'''
bd_addr: Address = field(metadata=metadata(Address.parse_address))
transmit_bandwidth: int = field(metadata=metadata(4))
receive_bandwidth: int = field(metadata=metadata(4))
max_latency: int = field(metadata=metadata(2))
voice_setting: int = field(metadata=metadata(2))
retransmission_effort: int = field(metadata=metadata(1))
packet_type: int = field(metadata=metadata(2))
# -----------------------------------------------------------------------------
@HCI_Command.command
@dataclasses.dataclass
@@ -3115,8 +3034,8 @@ class HCI_Enhanced_Setup_Synchronous_Connection_Command(HCI_AsyncCommand):
output_coding_format: int = field(metadata=metadata(CodingFormat.parse_from_bytes))
input_coded_data_size: int = field(metadata=metadata(2))
output_coded_data_size: int = field(metadata=metadata(2))
input_pcm_data_format: int = field(metadata=PcmDataFormat.type_metadata(1))
output_pcm_data_format: int = field(metadata=PcmDataFormat.type_metadata(1))
input_pcm_data_format: int = field(metadata=metadata(1))
output_pcm_data_format: int = field(metadata=metadata(1))
input_pcm_sample_payload_msb_position: int = field(metadata=metadata(1))
output_pcm_sample_payload_msb_position: int = field(metadata=metadata(1))
input_data_path: int = field(metadata=metadata(1))
@@ -3127,6 +3046,13 @@ class HCI_Enhanced_Setup_Synchronous_Connection_Command(HCI_AsyncCommand):
packet_type: int = field(metadata=metadata(2))
retransmission_effort: int = field(metadata=metadata(1))
class PcmDataFormat(SpecableEnum):
NA = 0x00
ONES_COMPLEMENT = 0x01
TWOS_COMPLEMENT = 0x02
SIGN_MAGNITUDE = 0x03
UNSIGNED = 0x04
class DataPath(SpecableEnum):
HCI = 0x00
PCM = 0x01
@@ -3173,8 +3099,8 @@ class HCI_Enhanced_Accept_Synchronous_Connection_Request_Command(HCI_AsyncComman
output_coding_format: int = field(metadata=metadata(CodingFormat.parse_from_bytes))
input_coded_data_size: int = field(metadata=metadata(2))
output_coded_data_size: int = field(metadata=metadata(2))
input_pcm_data_format: int = field(metadata=PcmDataFormat.type_metadata(1))
output_pcm_data_format: int = field(metadata=PcmDataFormat.type_metadata(1))
input_pcm_data_format: int = field(metadata=metadata(1))
output_pcm_data_format: int = field(metadata=metadata(1))
input_pcm_sample_payload_msb_position: int = field(metadata=metadata(1))
output_pcm_sample_payload_msb_position: int = field(metadata=metadata(1))
input_data_path: int = field(metadata=metadata(1))
@@ -4018,23 +3944,6 @@ class HCI_Read_Local_OOB_Extended_Data_Command(
'''
# -----------------------------------------------------------------------------
@HCI_SyncCommand.sync_command(HCI_StatusReturnParameters)
@dataclasses.dataclass
class HCI_Configure_Data_Path_Command(HCI_SyncCommand[HCI_StatusReturnParameters]):
'''
See Bluetooth spec @ 7.3.101 Configure Data Path Command
'''
class DataPathDirection(SpecableEnum):
INPUT = 0x00
OUTPUT = 0x01
data_path_direction: DataPathDirection = field(metadata=metadata(1))
data_path_id: int = field(metadata=metadata(1))
vendor_specific_config: bytes = field(metadata=metadata('*'))
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class HCI_Read_Local_Version_Information_ReturnParameters(HCI_StatusReturnParameters):
@@ -7425,7 +7334,7 @@ class HCI_Connection_Complete_Event(HCI_Event):
status: int = field(metadata=metadata(STATUS_SPEC))
connection_handle: int = field(metadata=metadata(2))
bd_addr: Address = field(metadata=metadata(Address.parse_address))
link_type: LinkType = field(metadata=LinkType.type_metadata(1))
link_type: int = field(metadata=LinkType.type_metadata(1))
encryption_enabled: int = field(metadata=metadata(1))
@@ -7821,6 +7730,12 @@ class HCI_Synchronous_Connection_Complete_Event(HCI_Event):
SCO = 0x00
ESCO = 0x02
class AirMode(SpecableEnum):
U_LAW_LOG = 0x00
A_LAW_LOG_AIR_MORE = 0x01
CVSD = 0x02
TRANSPARENT_DATA = 0x03
status: int = field(metadata=metadata(STATUS_SPEC))
connection_handle: int = field(metadata=metadata(2))
bd_addr: Address = field(metadata=metadata(Address.parse_address))
@@ -7829,7 +7744,7 @@ class HCI_Synchronous_Connection_Complete_Event(HCI_Event):
retransmission_window: int = field(metadata=metadata(1))
rx_packet_length: int = field(metadata=metadata(2))
tx_packet_length: int = field(metadata=metadata(2))
air_mode: int = field(metadata=CodecID.type_metadata(1))
air_mode: int = field(metadata=AirMode.type_metadata(1))
# -----------------------------------------------------------------------------
@@ -8061,9 +7976,7 @@ class HCI_AclDataPacket(HCI_Packet):
bc_flag = (h >> 14) & 3
data = packet[5:]
if len(data) != data_total_length:
raise InvalidPacketError(
f'invalid packet length {len(data)} != {data_total_length}'
)
raise InvalidPacketError('invalid packet length')
return cls(
connection_handle=connection_handle,
pb_flag=pb_flag,
@@ -8096,16 +8009,10 @@ class HCI_SynchronousDataPacket(HCI_Packet):
See Bluetooth spec @ 5.4.3 HCI SCO Data Packets
'''
class Status(enum.IntEnum):
CORRECTLY_RECEIVED_DATA = 0b00
POSSIBLY_INVALID_DATA = 0b01
NO_DATA = 0b10
DATA_PARTIALLY_LOST = 0b11
hci_packet_type = HCI_SYNCHRONOUS_DATA_PACKET
connection_handle: int
packet_status: Status
packet_status: int
data_total_length: int
data: bytes
@@ -8114,7 +8021,7 @@ class HCI_SynchronousDataPacket(HCI_Packet):
# Read the header
h, data_total_length = struct.unpack_from('<HB', packet, 1)
connection_handle = h & 0xFFF
packet_status = cls.Status((h >> 12) & 0b11)
packet_status = (h >> 12) & 0b11
data = packet[4:]
if len(data) != data_total_length:
raise InvalidPacketError(
@@ -8138,7 +8045,7 @@ class HCI_SynchronousDataPacket(HCI_Packet):
return (
f'{color("SCO", "blue")}: '
f'handle=0x{self.connection_handle:04x}, '
f'ps={self.packet_status.name}, '
f'ps={self.packet_status}, '
f'data_total_length={self.data_total_length}, '
f'data={self.data.hex()}'
)
@@ -8166,8 +8073,8 @@ class HCI_IsoDataPacket(HCI_Packet):
def __post_init__(self) -> None:
self.ts_flag = self.time_stamp is not None
@classmethod
def from_bytes(cls, packet: bytes) -> HCI_IsoDataPacket:
@staticmethod
def from_bytes(packet: bytes) -> HCI_IsoDataPacket:
time_stamp: int | None = None
packet_sequence_number: int | None = None
iso_sdu_length: int | None = None
@@ -8196,7 +8103,7 @@ class HCI_IsoDataPacket(HCI_Packet):
pos += 4
iso_sdu_fragment = packet[pos:]
return cls(
return HCI_IsoDataPacket(
connection_handle=connection_handle,
pb_flag=pb_flag,
ts_flag=ts_flag,
+19 -16
View File
@@ -44,7 +44,6 @@ from bumble.hci import (
CodecID,
CodingFormat,
HCI_Enhanced_Setup_Synchronous_Connection_Command,
PcmDataFormat,
)
# -----------------------------------------------------------------------------
@@ -178,7 +177,7 @@ class AgFeature(enum.IntFlag):
VOICE_RECOGNITION_TEXT = 0x2000
class AudioCodec(utils.OpenIntEnum):
class AudioCodec(enum.IntEnum):
"""
Audio Codec IDs (normative).
@@ -190,7 +189,7 @@ class AudioCodec(utils.OpenIntEnum):
LC3_SWB = 0x03 # Support for LC3-SWB audio codec
class HfIndicator(utils.OpenIntEnum):
class HfIndicator(enum.IntEnum):
"""
HF Indicators (normative).
@@ -219,7 +218,7 @@ class CallHoldOperation(enum.Enum):
)
class ResponseHoldStatus(utils.OpenIntEnum):
class ResponseHoldStatus(enum.IntEnum):
"""
Response Hold status (normative).
@@ -247,7 +246,7 @@ class AgIndicator(enum.Enum):
BATTERY_CHARGE = 'battchg'
class CallSetupAgIndicator(utils.OpenIntEnum):
class CallSetupAgIndicator(enum.IntEnum):
"""
Values for the Call Setup AG indicator (normative).
@@ -260,7 +259,7 @@ class CallSetupAgIndicator(utils.OpenIntEnum):
REMOTE_ALERTED = 3 # Remote party alerted in an outgoing call
class CallHeldAgIndicator(utils.OpenIntEnum):
class CallHeldAgIndicator(enum.IntEnum):
"""
Values for the Call Held AG indicator (normative).
@@ -274,7 +273,7 @@ class CallHeldAgIndicator(utils.OpenIntEnum):
CALL_ON_HOLD_NO_ACTIVE_CALL = 2 # Call on hold, no active call
class CallInfoDirection(utils.OpenIntEnum):
class CallInfoDirection(enum.IntEnum):
"""
Call Info direction (normative).
@@ -285,7 +284,7 @@ class CallInfoDirection(utils.OpenIntEnum):
MOBILE_TERMINATED_CALL = 1
class CallInfoStatus(utils.OpenIntEnum):
class CallInfoStatus(enum.IntEnum):
"""
Call Info status (normative).
@@ -300,7 +299,7 @@ class CallInfoStatus(utils.OpenIntEnum):
WAITING = 5
class CallInfoMode(utils.OpenIntEnum):
class CallInfoMode(enum.IntEnum):
"""
Call Info mode (normative).
@@ -313,7 +312,7 @@ class CallInfoMode(utils.OpenIntEnum):
UNKNOWN = 9
class CallInfoMultiParty(utils.OpenIntEnum):
class CallInfoMultiParty(enum.IntEnum):
"""
Call Info Multi-Party state (normative).
@@ -400,7 +399,7 @@ class CallLineIdentification:
)
class VoiceRecognitionState(utils.OpenIntEnum):
class VoiceRecognitionState(enum.IntEnum):
"""
vrec values provided in AT+BVRA command.
@@ -413,7 +412,7 @@ class VoiceRecognitionState(utils.OpenIntEnum):
ENHANCED_READY = 2
class CmeError(utils.OpenIntEnum):
class CmeError(enum.IntEnum):
"""
CME ERROR codes (partial listed).
@@ -1607,7 +1606,7 @@ class AgProtocol(utils.EventEmitter):
# -----------------------------------------------------------------------------
class ProfileVersion(utils.OpenIntEnum):
class ProfileVersion(enum.IntEnum):
"""
Profile version (normative).
@@ -1955,8 +1954,12 @@ class EscoParameters:
output_coding_format: CodingFormat = CodingFormat(CodecID.LINEAR_PCM)
input_coded_data_size: int = 16
output_coded_data_size: int = 16
input_pcm_data_format: PcmDataFormat = PcmDataFormat.TWOS_COMPLEMENT
output_pcm_data_format: PcmDataFormat = PcmDataFormat.TWOS_COMPLEMENT
input_pcm_data_format: (
HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat
) = HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat.TWOS_COMPLEMENT
output_pcm_data_format: (
HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat
) = HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat.TWOS_COMPLEMENT
input_pcm_sample_payload_msb_position: int = 0
output_pcm_sample_payload_msb_position: int = 0
input_data_path: HCI_Enhanced_Setup_Synchronous_Connection_Command.DataPath = (
@@ -2055,7 +2058,6 @@ _ESCO_PARAMETERS_MSBC_T1 = EscoParameters(
max_latency=0x0008,
packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV3
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV3
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5
@@ -2071,6 +2073,7 @@ _ESCO_PARAMETERS_MSBC_T2 = EscoParameters(
max_latency=0x000D,
packet_type=(
HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV3
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV3
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5
+3 -25
View File
@@ -686,8 +686,6 @@ class Host(utils.EventEmitter):
self.pending_response, timeout=response_timeout
)
return response
except asyncio.TimeoutError:
raise
except Exception:
logger.exception(color("!!! Exception while sending command:", "red"))
raise
@@ -866,7 +864,7 @@ class Host(utils.EventEmitter):
self.send_hci_packet(
hci.HCI_SynchronousDataPacket(
connection_handle=connection_handle,
packet_status=hci.HCI_SynchronousDataPacket.Status.CORRECTLY_RECEIVED_DATA,
packet_status=0,
data_total_length=len(sdu),
data=sdu,
)
@@ -1178,28 +1176,11 @@ class Host(utils.EventEmitter):
def on_hci_connection_complete_event(
self, event: hci.HCI_Connection_Complete_Event
):
if event.link_type == hci.HCI_Connection_Complete_Event.LinkType.SCO:
# Pass this on to the synchronous connection handler
forwarded_event = hci.HCI_Synchronous_Connection_Complete_Event(
status=event.status,
connection_handle=event.connection_handle,
bd_addr=event.bd_addr,
link_type=event.link_type,
transmission_interval=0,
retransmission_window=0,
rx_packet_length=0,
tx_packet_length=0,
air_mode=0,
)
self.on_hci_synchronous_connection_complete_event(forwarded_event)
return
if event.status == hci.HCI_SUCCESS:
# Create/update the connection
logger.debug(
f'### BR/EDR ACL CONNECTION: [0x{event.connection_handle:04X}] '
f'{event.bd_addr} '
f'{event.link_type.name}'
f'### BR/EDR CONNECTION: [0x{event.connection_handle:04X}] '
f'{event.bd_addr}'
)
connection = self.connections.get(event.connection_handle)
@@ -1599,9 +1580,6 @@ class Host(utils.EventEmitter):
event.bd_addr,
event.connection_handle,
event.link_type,
event.rx_packet_length,
event.tx_packet_length,
event.air_mode,
)
else:
logger.debug(f'### SCO CONNECTION FAILED: {event.status}')
+1 -1
View File
@@ -110,7 +110,7 @@ RFCOMM_DEFAULT_L2CAP_MTU = 2048
RFCOMM_DEFAULT_INITIAL_CREDITS = 7
RFCOMM_DEFAULT_MAX_CREDITS = 32
RFCOMM_DEFAULT_CREDIT_THRESHOLD = RFCOMM_DEFAULT_MAX_CREDITS // 2
RFCOMM_DEFAULT_MAX_FRAME_SIZE = 1000
RFCOMM_DEFAULT_MAX_FRAME_SIZE = 2000
RFCOMM_DYNAMIC_CHANNEL_NUMBER_START = 1
RFCOMM_DYNAMIC_CHANNEL_NUMBER_END = 30
+353 -653
View File
File diff suppressed because it is too large Load Diff
+111 -242
View File
@@ -20,119 +20,17 @@ import contextlib
import functools
import json
import sys
import wave
import websockets.asyncio.server
import bumble.logging
from bumble import hci, hfp, rfcomm
from bumble.device import Connection, Device, ScoLink
from bumble.device import Connection, Device
from bumble.hfp import HfProtocol
from bumble.transport import open_transport
# -----------------------------------------------------------------------------
ws: websockets.asyncio.server.ServerConnection | None = None
hf_protocol: HfProtocol | None = None
input_wav: wave.Wave_read | None = None
output_wav: wave.Wave_write | None = None
# -----------------------------------------------------------------------------
def on_audio_packet(packet: hci.HCI_SynchronousDataPacket) -> None:
if (
packet.packet_status
!= hci.HCI_SynchronousDataPacket.Status.CORRECTLY_RECEIVED_DATA
):
print('!!! discarding packet with status ', packet.packet_status.name)
return
frame_count = len(packet.data) // 2
print(f">>> received {frame_count} PCM samples")
if output_wav:
# Save the PCM audio to the output
output_wav.writeframes(packet.data)
if input_wav and hf_protocol:
# Send PCM audio from the input, same amount as what was received
while not (pcm_data := input_wav.readframes(frame_count)):
input_wav.setpos(0) # Loop
print(f">>> sending {frame_count} PCM samples")
hf_protocol.dlc.multiplexer.l2cap_channel.connection.device.host.send_sco_sdu(
connection_handle=packet.connection_handle,
sdu=pcm_data,
)
# -----------------------------------------------------------------------------
def on_sco_connection(link: ScoLink) -> None:
print('### SCO connection established:', link)
if link.air_mode == hci.CodecID.TRANSPARENT:
print("@@@ The controller does not encode/decode voice")
return
link.sink = on_audio_packet
# -----------------------------------------------------------------------------
def on_sco_request(
link_type: int, connection: Connection, protocol: HfProtocol
) -> None:
if link_type == hci.HCI_Connection_Complete_Event.LinkType.SCO:
esco_parameters = hfp.ESCO_PARAMETERS[hfp.DefaultCodecParameters.SCO_CVSD_D1]
elif protocol.active_codec == hfp.AudioCodec.MSBC:
esco_parameters = hfp.ESCO_PARAMETERS[hfp.DefaultCodecParameters.ESCO_MSBC_T2]
elif protocol.active_codec == hfp.AudioCodec.CVSD:
esco_parameters = hfp.ESCO_PARAMETERS[hfp.DefaultCodecParameters.ESCO_CVSD_S4]
else:
raise RuntimeError("unknown active codec")
if connection.device.host.supports_command(
hci.HCI_ENHANCED_ACCEPT_SYNCHRONOUS_CONNECTION_REQUEST_COMMAND
):
connection.cancel_on_disconnection(
connection.device.send_async_command(
hci.HCI_Enhanced_Accept_Synchronous_Connection_Request_Command(
bd_addr=connection.peer_address, **esco_parameters.asdict()
)
)
)
elif connection.device.host.supports_command(
hci.HCI_ACCEPT_SYNCHRONOUS_CONNECTION_REQUEST_COMMAND
):
connection.cancel_on_disconnection(
connection.device.send_async_command(
hci.HCI_Accept_Synchronous_Connection_Request_Command(
bd_addr=connection.peer_address,
transmit_bandwidth=esco_parameters.transmit_bandwidth,
receive_bandwidth=esco_parameters.receive_bandwidth,
max_latency=esco_parameters.max_latency,
voice_setting=int(
hci.VoiceSetting(
input_sample_size=hci.VoiceSetting.InputSampleSize.SIZE_16_BITS,
input_data_format=hci.VoiceSetting.InputDataFormat.TWOS_COMPLEMENT,
)
),
retransmission_effort=esco_parameters.retransmission_effort,
packet_type=esco_parameters.packet_type,
)
)
)
else:
print('!!! no supported command for SCO connection request')
return
global output_wav
if output_wav:
output_wav.setnchannels(1)
output_wav.setsampwidth(2)
match protocol.active_codec:
case hfp.AudioCodec.CVSD:
output_wav.setframerate(8000)
case hfp.AudioCodec.MSBC:
output_wav.setframerate(16000)
connection.on('sco_connection', on_sco_connection)
# -----------------------------------------------------------------------------
@@ -142,163 +40,134 @@ def on_dlc(dlc: rfcomm.DLC, configuration: hfp.HfConfiguration):
hf_protocol = HfProtocol(dlc, configuration)
asyncio.create_task(hf_protocol.run())
connection = dlc.multiplexer.l2cap_channel.connection
handler = functools.partial(
on_sco_request,
connection=connection,
protocol=hf_protocol,
)
connection.on('sco_request', handler)
def on_sco_request(connection: Connection, link_type: int, protocol: HfProtocol):
if connection == protocol.dlc.multiplexer.l2cap_channel.connection:
if link_type == hci.HCI_Connection_Complete_Event.LinkType.SCO:
esco_parameters = hfp.ESCO_PARAMETERS[
hfp.DefaultCodecParameters.SCO_CVSD_D1
]
elif protocol.active_codec == hfp.AudioCodec.MSBC:
esco_parameters = hfp.ESCO_PARAMETERS[
hfp.DefaultCodecParameters.ESCO_MSBC_T2
]
elif protocol.active_codec == hfp.AudioCodec.CVSD:
esco_parameters = hfp.ESCO_PARAMETERS[
hfp.DefaultCodecParameters.ESCO_CVSD_S4
]
else:
raise RuntimeError("unknown active codec")
connection.cancel_on_disconnection(
connection.device.send_command(
hci.HCI_Enhanced_Accept_Synchronous_Connection_Request_Command(
bd_addr=connection.peer_address, **esco_parameters.asdict()
)
)
)
handler = functools.partial(on_sco_request, protocol=hf_protocol)
dlc.multiplexer.l2cap_channel.connection.device.on('sco_request', handler)
dlc.multiplexer.l2cap_channel.once(
'close',
lambda: connection.remove_listener('sco_request', handler),
lambda: dlc.multiplexer.l2cap_channel.connection.device.remove_listener(
'sco_request', handler
),
)
def on_ag_indicator(indicator):
global ws
if ws:
asyncio.create_task(ws.send(str(indicator)))
hf_protocol.on('ag_indicator', on_ag_indicator)
hf_protocol.on('codec_negotiation', on_codec_negotiation)
# -----------------------------------------------------------------------------
def on_ag_indicator(indicator):
global ws
if ws:
asyncio.create_task(ws.send(str(indicator)))
# -----------------------------------------------------------------------------
def on_codec_negotiation(codec: hfp.AudioCodec):
print(f'### Negotiated codec: {codec.name}')
# -----------------------------------------------------------------------------
async def run(device: Device, codec: str | None) -> None:
if codec is None:
supported_audio_codecs = [hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC]
else:
if codec == 'cvsd':
supported_audio_codecs = [hfp.AudioCodec.CVSD]
elif codec == 'msbc':
supported_audio_codecs = [hfp.AudioCodec.MSBC]
else:
print('Unknown codec: ', codec)
return
# Hands-Free profile configuration.
# TODO: load configuration from file.
configuration = hfp.HfConfiguration(
supported_hf_features=[
hfp.HfFeature.THREE_WAY_CALLING,
hfp.HfFeature.REMOTE_VOLUME_CONTROL,
hfp.HfFeature.ENHANCED_CALL_STATUS,
hfp.HfFeature.ENHANCED_CALL_CONTROL,
hfp.HfFeature.CODEC_NEGOTIATION,
hfp.HfFeature.HF_INDICATORS,
hfp.HfFeature.ESCO_S4_SETTINGS_SUPPORTED,
],
supported_hf_indicators=[
hfp.HfIndicator.BATTERY_LEVEL,
],
supported_audio_codecs=supported_audio_codecs,
)
# Create and register a server
rfcomm_server = rfcomm.Server(device)
# Listen for incoming DLC connections
channel_number = rfcomm_server.listen(lambda dlc: on_dlc(dlc, configuration))
print(f'### Listening for connection on channel {channel_number}')
# Advertise the HFP RFComm channel in the SDP
device.sdp_service_records = {
0x00010001: hfp.make_hf_sdp_records(0x00010001, channel_number, configuration)
}
# Let's go!
await device.power_on()
# Start being discoverable and connectable
await device.set_discoverable(True)
await device.set_connectable(True)
# Start the UI websocket server to offer a few buttons and input boxes
async def serve(websocket: websockets.asyncio.server.ServerConnection):
global ws
ws = websocket
async for message in websocket:
with contextlib.suppress(websockets.exceptions.ConnectionClosedOK):
print('Received: ', str(message))
parsed = json.loads(message)
message_type = parsed['type']
if message_type == 'at_command':
if hf_protocol is not None:
response = str(
await hf_protocol.execute_command(
parsed['command'],
response_type=hfp.AtResponseType.MULTIPLE,
)
)
await websocket.send(response)
elif message_type == 'query_call':
if hf_protocol:
response = str(await hf_protocol.query_current_calls())
await websocket.send(response)
await websockets.asyncio.server.serve(serve, 'localhost', 8989)
await asyncio.get_running_loop().create_future() # run forever
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_hfp_handsfree.py <device-config> <transport-spec> '
'[codec] [input] [output]'
)
print('example: run_hfp_handsfree.py classic2.json usb:0')
print('Usage: run_classic_hfp.py <device-config> <transport-spec>')
print('example: run_classic_hfp.py classic2.json usb:04b4:f901')
return
device_config = sys.argv[1]
transport_spec = sys.argv[2]
print('<<< connecting to HCI...')
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< connected')
codec: str | None = None
if len(sys.argv) >= 4:
codec = sys.argv[3]
# Hands-Free profile configuration.
# TODO: load configuration from file.
configuration = hfp.HfConfiguration(
supported_hf_features=[
hfp.HfFeature.THREE_WAY_CALLING,
hfp.HfFeature.REMOTE_VOLUME_CONTROL,
hfp.HfFeature.ENHANCED_CALL_STATUS,
hfp.HfFeature.ENHANCED_CALL_CONTROL,
hfp.HfFeature.CODEC_NEGOTIATION,
hfp.HfFeature.HF_INDICATORS,
hfp.HfFeature.ESCO_S4_SETTINGS_SUPPORTED,
],
supported_hf_indicators=[
hfp.HfIndicator.BATTERY_LEVEL,
],
supported_audio_codecs=[
hfp.AudioCodec.CVSD,
hfp.AudioCodec.MSBC,
],
)
input_file_name: str | None = None
if len(sys.argv) >= 5:
input_file_name = sys.argv[4]
# Create a device
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
output_file_name: str | None = None
if len(sys.argv) >= 6:
output_file_name = sys.argv[5]
# Create and register a server
rfcomm_server = rfcomm.Server(device)
global input_wav, output_wav
input_cm: contextlib.AbstractContextManager[wave.Wave_read | None] = (
wave.open(input_file_name, "rb")
if input_file_name
else contextlib.nullcontext(None)
)
output_cm: contextlib.AbstractContextManager[wave.Wave_write | None] = (
wave.open(output_file_name, "wb")
if output_file_name
else contextlib.nullcontext(None)
)
with input_cm as input_wav, output_cm as output_wav:
if input_wav and input_wav.getnchannels() != 1:
print("Mono input required")
return
if input_wav and input_wav.getsampwidth() != 2:
print("16-bit input required")
return
# Listen for incoming DLC connections
channel_number = rfcomm_server.listen(lambda dlc: on_dlc(dlc, configuration))
print(f'### Listening for connection on channel {channel_number}')
async with await open_transport(transport_spec) as transport:
device = Device.from_config_file_with_hci(
device_config, transport.source, transport.sink
# Advertise the HFP RFComm channel in the SDP
device.sdp_service_records = {
0x00010001: hfp.make_hf_sdp_records(
0x00010001, channel_number, configuration
)
device.classic_enabled = True
await run(device, codec)
}
# Let's go!
await device.power_on()
# Start being discoverable and connectable
await device.set_discoverable(True)
await device.set_connectable(True)
# Start the UI websocket server to offer a few buttons and input boxes
async def serve(websocket: websockets.asyncio.server.ServerConnection):
global ws
ws = websocket
async for message in websocket:
with contextlib.suppress(websockets.exceptions.ConnectionClosedOK):
print('Received: ', str(message))
parsed = json.loads(message)
message_type = parsed['type']
if message_type == 'at_command':
if hf_protocol is not None:
response = str(
await hf_protocol.execute_command(
parsed['command'],
response_type=hfp.AtResponseType.MULTIPLE,
)
)
await websocket.send(response)
elif message_type == 'query_call':
if hf_protocol:
response = str(await hf_protocol.query_current_calls())
await websocket.send(response)
await websockets.asyncio.server.serve(serve, 'localhost', 8989)
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
-2
View File
@@ -170,9 +170,7 @@ def format_code(ctx, check=False, diff=False):
@task
def check_types(ctx):
checklist = ["apps", "bumble", "examples", "tests", "tasks.py"]
print(">>> Running the type checker...")
try:
print("+++ Checking with mypy...")
ctx.run(f"mypy {' '.join(checklist)}")
except UnexpectedExit as exc:
print("Please check your code against the mypy messages.")