forked from auracaster/bumble_mirror
Fix ISO packet issues
This commit is contained in:
@@ -1986,6 +1986,9 @@ class HCI_Packet:
|
||||
if packet_type == HCI_EVENT_PACKET:
|
||||
return HCI_Event.from_bytes(packet)
|
||||
|
||||
if packet_type == HCI_ISO_DATA_PACKET:
|
||||
return HCI_IsoDataPacket.from_bytes(packet)
|
||||
|
||||
return HCI_CustomPacket(packet)
|
||||
|
||||
def __init__(self, name):
|
||||
@@ -6098,7 +6101,7 @@ class HCI_IsoDataPacket(HCI_Packet):
|
||||
if ts_flag:
|
||||
if not should_include_sdu_info:
|
||||
logger.warn(f'Timestamp included when pb_flag={bin(pb_flag)}')
|
||||
time_stamp, _ = struct.unpack_from('<I', packet, pos)
|
||||
time_stamp, *_ = struct.unpack_from('<I', packet, pos)
|
||||
pos += 4
|
||||
|
||||
if should_include_sdu_info:
|
||||
@@ -6165,7 +6168,7 @@ class HCI_IsoDataPacket(HCI_Packet):
|
||||
self.packet_sequence_number,
|
||||
self.iso_sdu_length | self.packet_status_flag << 14,
|
||||
]
|
||||
return struct.pack(fmt, args) + self.iso_sdu_fragment
|
||||
return struct.pack(fmt, *args) + self.iso_sdu_fragment
|
||||
|
||||
def __str__(self) -> str:
|
||||
return (
|
||||
|
||||
@@ -42,6 +42,7 @@ HCI_PACKET_INFO: Dict[int, Tuple[int, int, str]] = {
|
||||
hci.HCI_ACL_DATA_PACKET: (2, 2, 'H'),
|
||||
hci.HCI_SYNCHRONOUS_DATA_PACKET: (1, 2, 'B'),
|
||||
hci.HCI_EVENT_PACKET: (1, 1, 'B'),
|
||||
hci.HCI_ISO_DATA_PACKET: (2, 2, 'H'),
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -32,6 +32,7 @@ from bumble.hci import (
|
||||
HCI_CustomPacket,
|
||||
HCI_Disconnect_Command,
|
||||
HCI_Event,
|
||||
HCI_IsoDataPacket,
|
||||
HCI_LE_Add_Device_To_Filter_Accept_List_Command,
|
||||
HCI_LE_Advertising_Report_Event,
|
||||
HCI_LE_Channel_Selection_Algorithm_Event,
|
||||
@@ -486,6 +487,29 @@ def test_custom():
|
||||
assert packet.payload == data
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_iso_data_packet():
|
||||
data = bytes.fromhex(
|
||||
'05616044002ac9f0a193003c00e83b477b00eba8d41dc018bf1a980f0290afe1e7c37652096697'
|
||||
'52b6a535a8df61e22931ef5a36281bc77ed6a3206d984bcdabee6be831c699cb50e2'
|
||||
)
|
||||
packet = HCI_IsoDataPacket.from_bytes(data)
|
||||
assert packet.connection_handle == 0x0061
|
||||
assert packet.packet_status_flag == 0
|
||||
assert packet.pb_flag == 0x02
|
||||
assert packet.ts_flag == 0x01
|
||||
assert packet.data_total_length == 68
|
||||
assert packet.time_stamp == 2716911914
|
||||
assert packet.packet_sequence_number == 147
|
||||
assert packet.iso_sdu_length == 60
|
||||
assert packet.iso_sdu_fragment == bytes.fromhex(
|
||||
'e83b477b00eba8d41dc018bf1a980f0290afe1e7c3765209669752b6a535a8df61e22931ef5a3'
|
||||
'6281bc77ed6a3206d984bcdabee6be831c699cb50e2'
|
||||
)
|
||||
|
||||
assert packet.to_bytes() == data
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def run_test_events():
|
||||
test_HCI_Event()
|
||||
@@ -524,6 +548,7 @@ def run_test_commands():
|
||||
test_HCI_LE_Set_Default_PHY_Command()
|
||||
test_HCI_LE_Set_Extended_Scan_Parameters_Command()
|
||||
test_HCI_LE_Set_Extended_Advertising_Enable_Command()
|
||||
test_HCI_LE_Setup_ISO_Data_Path_Command()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -532,3 +557,4 @@ if __name__ == '__main__':
|
||||
run_test_commands()
|
||||
test_address()
|
||||
test_custom()
|
||||
test_iso_data_packet()
|
||||
|
||||
Reference in New Issue
Block a user