diff --git a/.vscode/settings.json b/.vscode/settings.json index 4011e64..b564a38 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -23,6 +23,7 @@ "CONNECTIONLESS", "csip", "csrcs", + "CVSD", "datagram", "DATALINK", "delayreport", @@ -40,6 +41,7 @@ "libc", "libusb", "MITM", + "MSBC", "NDIS", "netsim", "NONBLOCK", diff --git a/bumble/hci.py b/bumble/hci.py index f978644..376a940 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -17,6 +17,7 @@ # ----------------------------------------------------------------------------- from __future__ import annotations import collections +import dataclasses import enum import functools import logging @@ -1382,6 +1383,45 @@ HCI_LE_SUPPORTED_FEATURES_NAMES = { STATUS_SPEC = {'size': 1, 'mapper': lambda x: HCI_Constant.status_name(x)} +class CodecID(enum.IntEnum): + # fmt: off + U_LOG = 0x00 + A_LOG = 0x01 + CVSD = 0x02 + TRANSPARENT = 0x03 + LINEAR_PCM = 0x04 + MSBC = 0x05 + LC3 = 0x06 + G729A = 0x07 + VENDOR_SPECIFIC = 0xFF + + +@dataclasses.dataclass(frozen=True) +class CodingFormat: + codec_id: CodecID + company_id: int = 0 + vendor_specific_codec_id: int = 0 + + @classmethod + def parse_from_bytes(cls, data: bytes, offset: int): + (codec_id, company_id, vendor_specific_codec_id) = struct.unpack_from( + ' bytes: + return struct.pack( + ' bytes: + return self.to_bytes() + + # ----------------------------------------------------------------------------- class HCI_Constant: @staticmethod @@ -1888,6 +1928,7 @@ Address.NIL = Address(b"\xff\xff\xff\xff\xff\xff", Address.PUBLIC_DEVICE_ADDRESS Address.ANY = Address(b"\x00\x00\x00\x00\x00\x00", Address.PUBLIC_DEVICE_ADDRESS) Address.ANY_RANDOM = Address(b"\x00\x00\x00\x00\x00\x00", Address.RANDOM_DEVICE_ADDRESS) + # ----------------------------------------------------------------------------- class OwnAddressType: PUBLIC = 0 @@ -2445,14 +2486,14 @@ class HCI_IO_Capability_Request_Negative_Reply_Command(HCI_Command): ('connection_handle', 2), ('transmit_bandwidth', 4), ('receive_bandwidth', 4), - ('transmit_coding_format', 5), - ('receive_coding_format', 5), + ('transmit_coding_format', CodingFormat.parse_from_bytes), + ('receive_coding_format', CodingFormat.parse_from_bytes), ('transmit_codec_frame_size', 2), ('receive_codec_frame_size', 2), ('input_bandwidth', 4), ('output_bandwidth', 4), - ('input_coding_format', 5), - ('output_coding_format', 5), + ('input_coding_format', CodingFormat.parse_from_bytes), + ('output_coding_format', CodingFormat.parse_from_bytes), ('input_coded_data_size', 2), ('output_coded_data_size', 2), ('input_pcm_data_format', 1), @@ -2473,22 +2514,6 @@ class HCI_Enhanced_Setup_Synchronous_Connection_Command(HCI_Command): See Bluetooth spec @ 7.1.45 Enhanced Setup Synchronous Connection Command ''' - class CodingFormat(enum.IntEnum): - U_LOG = 0x00 - A_LOG = 0x01 - CVSD = 0x02 - TRANSPARENT = 0x03 - PCM = 0x04 - MSBC = 0x05 - LC3 = 0x06 - G729A = 0x07 - - def to_bytes(self): - return self.value.to_bytes(5, 'little') - - def __bytes__(self): - return self.to_bytes() - class PcmDataFormat(enum.IntEnum): NA = 0x00 ONES_COMPLEMENT = 0x01 @@ -2525,14 +2550,14 @@ class HCI_Enhanced_Setup_Synchronous_Connection_Command(HCI_Command): ('bd_addr', Address.parse_address), ('transmit_bandwidth', 4), ('receive_bandwidth', 4), - ('transmit_coding_format', 5), - ('receive_coding_format', 5), + ('transmit_coding_format', CodingFormat.parse_from_bytes), + ('receive_coding_format', CodingFormat.parse_from_bytes), ('transmit_codec_frame_size', 2), ('receive_codec_frame_size', 2), ('input_bandwidth', 4), ('output_bandwidth', 4), - ('input_coding_format', 5), - ('output_coding_format', 5), + ('input_coding_format', CodingFormat.parse_from_bytes), + ('output_coding_format', CodingFormat.parse_from_bytes), ('input_coded_data_size', 2), ('output_coded_data_size', 2), ('input_pcm_data_format', 1), @@ -4471,7 +4496,7 @@ class HCI_LE_Reject_CIS_Request_Command(HCI_Command): ('connection_handle', 2), ('data_path_direction', 1), ('data_path_id', 1), - ('codec_id', 5), + ('codec_id', CodingFormat.parse_from_bytes), ('controller_delay', 3), ('codec_configuration', '*'), ], @@ -4488,7 +4513,7 @@ class HCI_LE_Setup_ISO_Data_Path_Command(HCI_Command): connection_handle: int data_path_direction: int data_path_id: int - codec_id: int + codec_id: CodingFormat controller_delay: int codec_configuration: int diff --git a/bumble/hfp.py b/bumble/hfp.py index a655b8f..2079e32 100644 --- a/bumble/hfp.py +++ b/bumble/hfp.py @@ -22,7 +22,7 @@ import dataclasses import enum import traceback import warnings -from typing import Dict, List, Union, Set, TYPE_CHECKING +from typing import Dict, List, Union, Set, Any, TYPE_CHECKING from . import at from . import rfcomm @@ -35,7 +35,11 @@ from bumble.core import ( BT_L2CAP_PROTOCOL_ID, BT_RFCOMM_PROTOCOL_ID, ) -from bumble.hci import HCI_Enhanced_Setup_Synchronous_Connection_Command +from bumble.hci import ( + HCI_Enhanced_Setup_Synchronous_Connection_Command, + CodingFormat, + CodecID, +) from bumble.sdp import ( DataElement, ServiceAttribute, @@ -66,6 +70,7 @@ class HfpProtocolError(ProtocolError): # Protocol Support # ----------------------------------------------------------------------------- + # ----------------------------------------------------------------------------- class HfpProtocol: dlc: rfcomm.DLC @@ -842,19 +847,15 @@ class DefaultCodecParameters(enum.IntEnum): @dataclasses.dataclass class EscoParameters: # Codec specific - transmit_coding_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat - receive_coding_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat + transmit_coding_format: CodingFormat + receive_coding_format: CodingFormat packet_type: HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType retransmission_effort: HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort max_latency: int # Common - input_coding_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat = ( - HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.PCM - ) - output_coding_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat = ( - HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.PCM - ) + input_coding_format: CodingFormat = CodingFormat(CodecID.LINEAR_PCM) + output_coding_format: CodingFormat = CodingFormat(CodecID.LINEAR_PCM) input_coded_data_size: int = 16 output_coded_data_size: int = 16 input_pcm_data_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat = ( @@ -880,26 +881,31 @@ class EscoParameters: transmit_codec_frame_size: int = 60 receive_codec_frame_size: int = 60 + def asdict(self) -> Dict[str, Any]: + # dataclasses.asdict() will recursively deep-copy the entire object, + # which is expensive and breaks CodingFormat object, so let it simply copy here. + return self.__dict__ + _ESCO_PARAMETERS_CVSD_D0 = EscoParameters( - transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD, - receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD, + transmit_coding_format=CodingFormat(CodecID.CVSD), + receive_coding_format=CodingFormat(CodecID.CVSD), max_latency=0xFFFF, packet_type=HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.HV1, retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.NO_RETRANSMISSION, ) _ESCO_PARAMETERS_CVSD_D1 = EscoParameters( - transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD, - receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD, + transmit_coding_format=CodingFormat(CodecID.CVSD), + receive_coding_format=CodingFormat(CodecID.CVSD), max_latency=0xFFFF, packet_type=HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.HV3, retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.NO_RETRANSMISSION, ) _ESCO_PARAMETERS_CVSD_S1 = EscoParameters( - transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD, - receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD, + transmit_coding_format=CodingFormat(CodecID.CVSD), + receive_coding_format=CodingFormat(CodecID.CVSD), max_latency=0x0007, packet_type=( HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3 @@ -912,8 +918,8 @@ _ESCO_PARAMETERS_CVSD_S1 = EscoParameters( ) _ESCO_PARAMETERS_CVSD_S2 = EscoParameters( - transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD, - receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD, + transmit_coding_format=CodingFormat(CodecID.CVSD), + receive_coding_format=CodingFormat(CodecID.CVSD), max_latency=0x0007, packet_type=( HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3 @@ -925,8 +931,8 @@ _ESCO_PARAMETERS_CVSD_S2 = EscoParameters( ) _ESCO_PARAMETERS_CVSD_S3 = EscoParameters( - transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD, - receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD, + transmit_coding_format=CodingFormat(CodecID.CVSD), + receive_coding_format=CodingFormat(CodecID.CVSD), max_latency=0x000A, packet_type=( HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3 @@ -938,8 +944,8 @@ _ESCO_PARAMETERS_CVSD_S3 = EscoParameters( ) _ESCO_PARAMETERS_CVSD_S4 = EscoParameters( - transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD, - receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.CVSD, + transmit_coding_format=CodingFormat(CodecID.CVSD), + receive_coding_format=CodingFormat(CodecID.CVSD), max_latency=0x000C, packet_type=( HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3 @@ -951,8 +957,8 @@ _ESCO_PARAMETERS_CVSD_S4 = EscoParameters( ) _ESCO_PARAMETERS_MSBC_T1 = EscoParameters( - transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.MSBC, - receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.MSBC, + transmit_coding_format=CodingFormat(CodecID.MSBC), + receive_coding_format=CodingFormat(CodecID.MSBC), max_latency=0x0008, packet_type=( HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3 @@ -966,8 +972,8 @@ _ESCO_PARAMETERS_MSBC_T1 = EscoParameters( ) _ESCO_PARAMETERS_MSBC_T2 = EscoParameters( - transmit_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.MSBC, - receive_coding_format=HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.MSBC, + transmit_coding_format=CodingFormat(CodecID.MSBC), + receive_coding_format=CodingFormat(CodecID.MSBC), max_latency=0x000D, packet_type=( HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3 diff --git a/examples/run_esco_connection.py b/examples/run_esco_connection.py index a136360..0ad34c4 100644 --- a/examples/run_esco_connection.py +++ b/examples/run_esco_connection.py @@ -72,7 +72,7 @@ async def main() -> None: await devices[0].send_command( HCI_Enhanced_Setup_Synchronous_Connection_Command( connection_handle=connections[0].handle, - **dataclasses.asdict(ESCO_PARAMETERS[DefaultCodecParameters.ESCO_CVSD_S3]) + **ESCO_PARAMETERS[DefaultCodecParameters.ESCO_CVSD_S3].asdict(), # type: ignore[call-args] ) ) diff --git a/tests/hci_test.py b/tests/hci_test.py index c648592..12f611f 100644 --- a/tests/hci_test.py +++ b/tests/hci_test.py @@ -24,6 +24,8 @@ from bumble.hci import ( HCI_RESET_COMMAND, HCI_SUCCESS, Address, + CodingFormat, + CodecID, HCI_Command, HCI_Command_Complete_Event, HCI_Command_Status_Event, @@ -442,6 +444,19 @@ def test_HCI_LE_Set_Extended_Advertising_Enable_Command(): basic_check(command) +# ----------------------------------------------------------------------------- +def test_HCI_LE_Setup_ISO_Data_Path_Command(): + command = HCI_Packet.from_bytes(bytes.fromhex('016e200d60000001030000000000000000')) + + assert command.connection_handle == 0x0060 + assert command.data_path_direction == 0x00 + assert command.data_path_id == 0x01 + assert command.codec_id == CodingFormat(CodecID.TRANSPARENT) + assert command.controller_delay == 0 + + basic_check(command) + + # ----------------------------------------------------------------------------- def test_address(): a = Address('C4:F2:17:1A:1D:BB')