add unit test

This commit is contained in:
Gilles Boccon-Gibod
2026-06-07 12:30:38 +02:00
parent b6a21fa3c6
commit 65c4f9a698
2 changed files with 66 additions and 2 deletions
+2 -1
View File
@@ -413,7 +413,8 @@ class PacketSplitter:
if (bytes_needed := self.header_size - len(self.packet)) > 0:
self.packet += data[:bytes_needed]
data = data[bytes_needed:]
continue
if len(self.packet) < self.header_size:
continue
packet_length = self.header_size + int.from_bytes(
self.packet[self.length_offset : self.length_offset + self.length_size],
+64 -1
View File
@@ -24,7 +24,7 @@ import sys
import pytest
from bumble import controller, device, hci, link, transport
from bumble.transport import common
from bumble.transport import common, usb
# -----------------------------------------------------------------------------
@@ -252,6 +252,69 @@ async def test_open_transport_with_metadata(spec):
await controller_transport.close()
# -----------------------------------------------------------------------------
def test_packet_splitter_complete():
emitted = []
splitter = usb.AclPacketSplitter(emitted.append)
packet = bytes([0x01, 0x00, 0x04, 0x00, 0x11, 0x22, 0x33, 0x44])
splitter.feed(packet)
assert emitted == [packet]
def test_packet_splitter_chunks():
emitted = []
splitter = usb.AclPacketSplitter(emitted.append)
packet = bytes([0x01, 0x00, 0x04, 0x00, 0x11, 0x22, 0x33, 0x44])
splitter.feed(packet[:4])
assert emitted == []
splitter.feed(packet[4:])
assert emitted == [packet]
def test_packet_splitter_multiple():
emitted = []
splitter = usb.AclPacketSplitter(emitted.append)
packet1 = bytes([0x01, 0x00, 0x04, 0x00, 0x11, 0x22, 0x33, 0x44])
packet2 = bytes([0x02, 0x00, 0x02, 0x00, 0x55, 0x66])
splitter.feed(packet1 + packet2)
assert emitted == [packet1, packet2]
def test_packet_splitter_partial():
emitted = []
splitter = usb.AclPacketSplitter(emitted.append)
packet1 = bytes([0x01, 0x00, 0x04, 0x00, 0x11, 0x22, 0x33, 0x44])
packet2 = bytes([0x02, 0x00, 0x02, 0x00, 0x55, 0x66])
splitter.feed(packet1 + packet2[:4])
assert emitted == [packet1]
splitter.feed(packet2[4:])
assert emitted == [packet1, packet2]
def test_packet_splitter_empty_payload():
emitted = []
splitter = usb.AclPacketSplitter(emitted.append)
packet = bytes([0x01, 0x00, 0x00, 0x00])
splitter.feed(packet)
assert emitted == [packet]
def test_sco_packet_splitter():
emitted = []
splitter = usb.ScoPacketSplitter(emitted.append)
packet = bytes([0x01, 0x00, 0x03, 0x11, 0x22, 0x33])
splitter.feed(packet)
assert emitted == [packet]
def test_event_packet_splitter():
emitted = []
splitter = usb.EventPacketSplitter(emitted.append)
packet = bytes([0x04, 0x02, 0x11, 0x22])
splitter.feed(packet)
assert emitted == [packet]
# -----------------------------------------------------------------------------
if __name__ == '__main__':
test_parser()