From 65c4f9a698f10c518d88a799c2d46100600eb604 Mon Sep 17 00:00:00 2001 From: Gilles Boccon-Gibod Date: Sun, 7 Jun 2026 12:30:38 +0200 Subject: [PATCH] add unit test --- bumble/transport/usb.py | 3 +- tests/transport_test.py | 65 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/bumble/transport/usb.py b/bumble/transport/usb.py index bb787d9..dc058d1 100644 --- a/bumble/transport/usb.py +++ b/bumble/transport/usb.py @@ -413,7 +413,8 @@ class PacketSplitter: if (bytes_needed := self.header_size - len(self.packet)) > 0: self.packet += data[:bytes_needed] data = data[bytes_needed:] - continue + if len(self.packet) < self.header_size: + continue packet_length = self.header_size + int.from_bytes( self.packet[self.length_offset : self.length_offset + self.length_size], diff --git a/tests/transport_test.py b/tests/transport_test.py index bf7d8d4..c7c3c1c 100644 --- a/tests/transport_test.py +++ b/tests/transport_test.py @@ -24,7 +24,7 @@ import sys import pytest from bumble import controller, device, hci, link, transport -from bumble.transport import common +from bumble.transport import common, usb # ----------------------------------------------------------------------------- @@ -252,6 +252,69 @@ async def test_open_transport_with_metadata(spec): await controller_transport.close() +# ----------------------------------------------------------------------------- +def test_packet_splitter_complete(): + emitted = [] + splitter = usb.AclPacketSplitter(emitted.append) + packet = bytes([0x01, 0x00, 0x04, 0x00, 0x11, 0x22, 0x33, 0x44]) + splitter.feed(packet) + assert emitted == [packet] + + +def test_packet_splitter_chunks(): + emitted = [] + splitter = usb.AclPacketSplitter(emitted.append) + packet = bytes([0x01, 0x00, 0x04, 0x00, 0x11, 0x22, 0x33, 0x44]) + splitter.feed(packet[:4]) + assert emitted == [] + splitter.feed(packet[4:]) + assert emitted == [packet] + + +def test_packet_splitter_multiple(): + emitted = [] + splitter = usb.AclPacketSplitter(emitted.append) + packet1 = bytes([0x01, 0x00, 0x04, 0x00, 0x11, 0x22, 0x33, 0x44]) + packet2 = bytes([0x02, 0x00, 0x02, 0x00, 0x55, 0x66]) + splitter.feed(packet1 + packet2) + assert emitted == [packet1, packet2] + + +def test_packet_splitter_partial(): + emitted = [] + splitter = usb.AclPacketSplitter(emitted.append) + packet1 = bytes([0x01, 0x00, 0x04, 0x00, 0x11, 0x22, 0x33, 0x44]) + packet2 = bytes([0x02, 0x00, 0x02, 0x00, 0x55, 0x66]) + splitter.feed(packet1 + packet2[:4]) + assert emitted == [packet1] + splitter.feed(packet2[4:]) + assert emitted == [packet1, packet2] + + +def test_packet_splitter_empty_payload(): + emitted = [] + splitter = usb.AclPacketSplitter(emitted.append) + packet = bytes([0x01, 0x00, 0x00, 0x00]) + splitter.feed(packet) + assert emitted == [packet] + + +def test_sco_packet_splitter(): + emitted = [] + splitter = usb.ScoPacketSplitter(emitted.append) + packet = bytes([0x01, 0x00, 0x03, 0x11, 0x22, 0x33]) + splitter.feed(packet) + assert emitted == [packet] + + +def test_event_packet_splitter(): + emitted = [] + splitter = usb.EventPacketSplitter(emitted.append) + packet = bytes([0x04, 0x02, 0x11, 0x22]) + splitter.feed(packet) + assert emitted == [packet] + + # ----------------------------------------------------------------------------- if __name__ == '__main__': test_parser()