diff --git a/bumble/hci.py b/bumble/hci.py index 45cd7eb..bf58ee0 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -21,7 +21,7 @@ import enum import functools import logging import struct -from typing import Any, Dict, Callable, Optional, Type, Union +from typing import Any, Dict, Callable, Optional, Type, Union, List from .colors import color from .core import ( @@ -149,6 +149,7 @@ HCI_COMMAND_PACKET = 0x01 HCI_ACL_DATA_PACKET = 0x02 HCI_SYNCHRONOUS_DATA_PACKET = 0x03 HCI_EVENT_PACKET = 0x04 +HCI_ISO_DATA_PACKET = 0x05 # HCI Event Codes HCI_INQUIRY_COMPLETE_EVENT = 0x01 @@ -4386,6 +4387,158 @@ class HCI_LE_Set_Host_Feature_Command(HCI_Command): ''' +# ----------------------------------------------------------------------------- +@HCI_Command.command( + fields=[ + ('cig_id', 1), + ('sdu_interval_c_to_p', 3), + ('sdu_interval_p_to_c', 3), + ('worst_case_sca', 1), + ('packing', 1), + ('framing', 1), + ('max_transport_latency_c_to_p', 2), + ('max_transport_latency_p_to_c', 2), + [ + ('cis_id', 1), + ('max_sdu_c_to_p', 2), + ('max_sdu_p_to_c', 2), + ('phy_c_to_p', 1), + ('phy_p_to_c', 1), + ('rtn_c_to_p', 1), + ('rtn_p_to_c', 1), + ], + ], + return_parameters_fields=[ + ('status', STATUS_SPEC), + ('cig_id', 1), + [('connection_handle', 2)], + ], +) +class HCI_LE_Set_CIG_Parameters_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.97 LE Set CIG Parameters Command + ''' + + cig_id: int + sdu_interval_c_to_p: int + sdu_interval_p_to_c: int + worst_case_sca: int + packing: int + framing: int + max_transport_latency_c_to_p: int + max_transport_latency_p_to_c: int + cis_id: List[int] + max_sdu_c_to_p: List[int] + max_sdu_p_to_c: List[int] + phy_c_to_p: List[int] + phy_p_to_c: List[int] + rtn_c_to_p: List[int] + rtn_p_to_c: List[int] + + +# ----------------------------------------------------------------------------- +@HCI_Command.command( + fields=[ + [ + ('cis_connection_handle', 2), + ('acl_connection_handle', 2), + ], + ], +) +class HCI_LE_Create_CIS_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.99 LE Create CIS command + ''' + + cis_connection_handle: List[int] + acl_connection_handle: List[int] + + +# ----------------------------------------------------------------------------- +@HCI_Command.command( + fields=[('cig_id', 1)], + return_parameters_fields=[('status', STATUS_SPEC), ('cig_id', 1)], +) +class HCI_LE_Remove_CIG_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.100 LE Remove CIG command + ''' + + cig_id: int + + +# ----------------------------------------------------------------------------- +@HCI_Command.command( + fields=[('connection_handle', 2)], +) +class HCI_LE_Accept_CIS_Request_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.101 LE Accept CIS Request command + ''' + + connection_handle: int + + +# ----------------------------------------------------------------------------- +@HCI_Command.command( + fields=[('connection_handle', 2)], +) +class HCI_LE_Reject_CIS_Request_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.102 LE Reject CIS Request command + ''' + + connection_handle: int + + +# ----------------------------------------------------------------------------- +@HCI_Command.command( + fields=[ + ('connection_handle', 2), + ('data_path_direction', 1), + ('data_path_id', 1), + ('codec_id', 5), + ('controller_delay', 3), + ('codec_configuration', '*'), + ], + return_parameters_fields=[ + ('status', STATUS_SPEC), + ('connection_handle', 2), + ], +) +class HCI_LE_Setup_ISO_Data_Path_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.109 LE Setup ISO Data Path command + ''' + + connection_handle: int + data_path_direction: int + data_path_id: int + codec_id: int + controller_delay: int + codec_configuration: int + + +# ----------------------------------------------------------------------------- +@HCI_Command.command( + fields=[ + ('connection_handle', 2), + ('data_path_direction', 1), + ], + return_parameters_fields=[ + ('status', STATUS_SPEC), + ('connection_handle', 2), + ], +) +class HCI_LE_Remove_ISO_Data_Path_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.110 LE Remove ISO Data Path command + ''' + + connection_handle: int + data_path_direction: int + + # ----------------------------------------------------------------------------- # HCI Events # ----------------------------------------------------------------------------- @@ -5005,6 +5158,48 @@ class HCI_LE_Channel_Selection_Algorithm_Event(HCI_LE_Meta_Event): ''' +# ----------------------------------------------------------------------------- +@HCI_LE_Meta_Event.event( + [ + ('status', STATUS_SPEC), + ('connection_handle', 2), + ('cig_sync_delay', 3), + ('cis_sync_delay', 3), + ('transport_latency_c_to_p', 3), + ('transport_latency_p_to_c', 3), + ('phy_c_to_p', 1), + ('phy_p_to_c', 1), + ('nse', 1), + ('bn_c_to_p', 1), + ('bn_p_to_c', 1), + ('ft_c_to_p', 1), + ('ft_p_to_c', 1), + ('max_pdu_c_to_p', 2), + ('max_pdu_p_to_c', 2), + ('iso_interval', 2), + ] +) +class HCI_LE_CIS_Established_Event(HCI_LE_Meta_Event): + ''' + See Bluetooth spec @ 7.7.65.25 LE CIS Established Event + ''' + + +# ----------------------------------------------------------------------------- +@HCI_LE_Meta_Event.event( + [ + ('acl_connection_handle', 2), + ('cis_connection_handle', 2), + ('cig_id', 1), + ('cis_id', 1), + ] +) +class HCI_LE_CIS_Request_Event(HCI_LE_Meta_Event): + ''' + See Bluetooth spec @ 7.7.65.26 LE CIS Request Event + ''' + + # ----------------------------------------------------------------------------- @HCI_Event.event([('status', STATUS_SPEC)]) class HCI_Inquiry_Complete_Event(HCI_Event): @@ -5815,18 +6010,17 @@ class HCI_SynchronousDataPacket(HCI_Packet): h, data_total_length = struct.unpack_from('> 12) & 0b11 - rfu = (h >> 14) & 0b11 data = packet[4:] if len(data) != data_total_length: raise ValueError( f'invalid packet length {len(data)} != {data_total_length}' ) return HCI_SynchronousDataPacket( - connection_handle, packet_status, rfu, data_total_length, data + connection_handle, packet_status, data_total_length, data ) def to_bytes(self) -> bytes: - h = (self.packet_status << 12) | (self.rfu << 14) | self.connection_handle + h = (self.packet_status << 12) | self.connection_handle return ( struct.pack(' None: self.connection_handle = connection_handle self.packet_status = packet_status - self.rfu = rfu self.data_total_length = data_total_length self.data = data @@ -5853,12 +6045,119 @@ class HCI_SynchronousDataPacket(HCI_Packet): return ( f'{color("SCO", "blue")}: ' f'handle=0x{self.connection_handle:04x}, ' - f'ps={self.packet_status}, rfu={self.rfu}, ' + f'ps={self.packet_status}, ' f'data_total_length={self.data_total_length}, ' f'data={self.data.hex()}' ) +# ----------------------------------------------------------------------------- +class HCI_IsoDataPacket(HCI_Packet): + ''' + See Bluetooth spec @ 5.4.5 HCI ISO Data Packets + ''' + + hci_packet_type = HCI_ISO_DATA_PACKET + + @staticmethod + def from_bytes(packet: bytes) -> HCI_IsoDataPacket: + time_stamp: Optional[int] = None + packet_sequence_number: Optional[int] = None + iso_sdu_length: Optional[int] = None + packet_status_flag: Optional[int] = None + + pos = 1 + pdu_info, data_total_length = struct.unpack_from('> 12) & 0b11 + ts_flag = (pdu_info >> 14) & 0b01 + pos += 4 + + # pb_flag in (0b00, 0b10) but faster + should_include_sdu_info = not (pb_flag & 0b01) + + if ts_flag: + if not should_include_sdu_info: + logger.warn(f'Timestamp included when pb_flag={bin(pb_flag)}') + time_stamp, _ = struct.unpack_from('> 14 + pos += 4 + + iso_sdu_fragment = packet[pos:] + return HCI_IsoDataPacket( + connection_handle=connection_handle, + pb_flag=pb_flag, + ts_flag=ts_flag, + data_total_length=data_total_length, + time_stamp=time_stamp, + packet_sequence_number=packet_sequence_number, + iso_sdu_length=iso_sdu_length, + packet_status_flag=packet_status_flag, + iso_sdu_fragment=iso_sdu_fragment, + ) + + def __init__( + self, + connection_handle: int, + pb_flag: int, + ts_flag: int, + data_total_length: int, + time_stamp: Optional[int], + packet_sequence_number: Optional[int], + iso_sdu_length: Optional[int], + packet_status_flag: Optional[int], + iso_sdu_fragment: bytes, + ) -> None: + self.connection_handle = connection_handle + self.pb_flag = pb_flag + self.ts_flag = ts_flag + self.data_total_length = data_total_length + self.time_stamp = time_stamp + self.packet_sequence_number = packet_sequence_number + self.iso_sdu_length = iso_sdu_length + self.packet_status_flag = packet_status_flag + self.iso_sdu_fragment = iso_sdu_fragment + + def __bytes__(self) -> bytes: + return self.to_bytes() + + def to_bytes(self) -> bytes: + fmt = ' str: + return ( + f'{color("ISO", "blue")}: ' + f'handle=0x{self.connection_handle:04x}, ' + f'ps={self.packet_status_flag}, ' + f'data_total_length={self.data_total_length}, ' + f'sdu={self.iso_sdu_fragment.hex()}' + ) + + # ----------------------------------------------------------------------------- class HCI_AclDataPacketAssembler: current_data: Optional[bytes]