Merge pull request #936 from google/gbg/usb-transport-packet-splitter

usb transport packet splitter
This commit is contained in:
Gilles Boccon-Gibod
2026-06-07 12:52:53 +02:00
committed by GitHub
2 changed files with 142 additions and 31 deletions
+64 -1
View File
@@ -24,7 +24,7 @@ import sys
import pytest
from bumble import controller, device, hci, link, transport
from bumble.transport import common
from bumble.transport import common, usb
# -----------------------------------------------------------------------------
@@ -252,6 +252,69 @@ async def test_open_transport_with_metadata(spec):
await controller_transport.close()
# -----------------------------------------------------------------------------
def test_packet_splitter_complete():
emitted = []
splitter = usb.AclPacketSplitter(emitted.append)
packet = bytes([0x01, 0x00, 0x04, 0x00, 0x11, 0x22, 0x33, 0x44])
splitter.feed(packet)
assert emitted == [packet]
def test_packet_splitter_chunks():
emitted = []
splitter = usb.AclPacketSplitter(emitted.append)
packet = bytes([0x01, 0x00, 0x04, 0x00, 0x11, 0x22, 0x33, 0x44])
splitter.feed(packet[:4])
assert emitted == []
splitter.feed(packet[4:])
assert emitted == [packet]
def test_packet_splitter_multiple():
emitted = []
splitter = usb.AclPacketSplitter(emitted.append)
packet1 = bytes([0x01, 0x00, 0x04, 0x00, 0x11, 0x22, 0x33, 0x44])
packet2 = bytes([0x02, 0x00, 0x02, 0x00, 0x55, 0x66])
splitter.feed(packet1 + packet2)
assert emitted == [packet1, packet2]
def test_packet_splitter_partial():
emitted = []
splitter = usb.AclPacketSplitter(emitted.append)
packet1 = bytes([0x01, 0x00, 0x04, 0x00, 0x11, 0x22, 0x33, 0x44])
packet2 = bytes([0x02, 0x00, 0x02, 0x00, 0x55, 0x66])
splitter.feed(packet1 + packet2[:4])
assert emitted == [packet1]
splitter.feed(packet2[4:])
assert emitted == [packet1, packet2]
def test_packet_splitter_empty_payload():
emitted = []
splitter = usb.AclPacketSplitter(emitted.append)
packet = bytes([0x01, 0x00, 0x00, 0x00])
splitter.feed(packet)
assert emitted == [packet]
def test_sco_packet_splitter():
emitted = []
splitter = usb.ScoPacketSplitter(emitted.append)
packet = bytes([0x01, 0x00, 0x03, 0x11, 0x22, 0x33])
splitter.feed(packet)
assert emitted == [packet]
def test_event_packet_splitter():
emitted = []
splitter = usb.EventPacketSplitter(emitted.append)
packet = bytes([0x04, 0x02, 0x11, 0x22])
splitter.feed(packet)
assert emitted == [packet]
# -----------------------------------------------------------------------------
if __name__ == '__main__':
test_parser()