diff --git a/bumble/l2cap.py b/bumble/l2cap.py index 085e0360..fbf5a3bc 100644 --- a/bumble/l2cap.py +++ b/bumble/l2cap.py @@ -23,6 +23,7 @@ import logging import struct from collections import deque +from collections.abc import Sequence from typing import ( Optional, Callable, @@ -112,6 +113,10 @@ class CommandCode(hci.SpecableEnum): L2CAP_LE_CREDIT_BASED_CONNECTION_REQUEST = 0x14 L2CAP_LE_CREDIT_BASED_CONNECTION_RESPONSE = 0x15 L2CAP_LE_FLOW_CONTROL_CREDIT = 0x16 + L2CAP_CREDIT_BASED_CONNECTION_REQUEST = 0x17 + L2CAP_CREDIT_BASED_CONNECTION_RESPONSE = 0x18 + L2CAP_CREDIT_BASED_RECONFIGURE_REQUEST = 0x19 + L2CAP_CREDIT_BASED_RECONFIGURE_RESPONSE = 0x1A L2CAP_CONNECTION_PARAMETERS_ACCEPTED_RESULT = 0x0000 L2CAP_CONNECTION_PARAMETERS_REJECTED_RESULT = 0x0001 @@ -595,6 +600,109 @@ class L2CAP_LE_Flow_Control_Credit(L2CAP_Control_Frame): credits: int = dataclasses.field(metadata=hci.metadata(2)) +# ----------------------------------------------------------------------------- +@L2CAP_Control_Frame.subclass +@dataclasses.dataclass +class L2CAP_Credit_Based_Connection_Request(L2CAP_Control_Frame): + ''' + See Bluetooth spec @ Vol 3, Part A - 4.25 L2CAP_CREDIT_BASED_CONNECTION_REQ (0x17). + ''' + + @classmethod + def parse_cid_list(cls, data: bytes, offset: int) -> tuple[int, list[int]]: + count = (len(data) - offset) // 2 + return len(data), list(struct.unpack_from("<" + ("H" * count), data, offset)) + + @classmethod + def serialize_cid_list(cls, cids: Sequence[int]) -> bytes: + return b"".join([struct.pack(" None: + frame = l2cap.L2CAP_Credit_Based_Connection_Request( + identifier=1, spsm=2, mtu=3, mps=4, initial_credits=5, source_cid=[6, 7, 8] + ) + + parsed = l2cap.L2CAP_Control_Frame.from_bytes(bytes(frame)) + assert parsed == frame + + +# ----------------------------------------------------------------------------- +def test_l2cap_credit_based_connection_response() -> None: + frame = l2cap.L2CAP_Credit_Based_Connection_Response( + identifier=1, + mtu=2, + mps=3, + initial_credits=4, + result=l2cap.L2CAP_Credit_Based_Connection_Response.Result.ALL_CONNECTIONS_PENDING_AUTHENTICATION_PENDING, + destination_cid=[6, 7, 8], + ) + + parsed = l2cap.L2CAP_Control_Frame.from_bytes(bytes(frame)) + assert parsed == frame + + +# ----------------------------------------------------------------------------- +def test_l2cap_credit_based_reconfigure_request() -> None: + frame = l2cap.L2CAP_Credit_Based_Reconfigure_Request( + identifier=1, + mtu=2, + mps=3, + destination_cid=[6, 7, 8], + ) + + parsed = l2cap.L2CAP_Control_Frame.from_bytes(bytes(frame)) + assert parsed == frame + + +# ----------------------------------------------------------------------------- +def test_l2cap_credit_based_reconfigure_response() -> None: + frame = l2cap.L2CAP_Credit_Based_Reconfigure_Response( + identifier=1, + result=l2cap.L2CAP_Credit_Based_Reconfigure_Response.Result.RECONFIGURATION_FAILED_OTHER_UNACCEPTABLE_PARAMETERS, + ) + + parsed = l2cap.L2CAP_Control_Frame.from_bytes(bytes(frame)) + assert parsed == frame + + # ----------------------------------------------------------------------------- def test_unimplemented_control_frame(): frame = l2cap.L2CAP_Control_Frame(identifier=1)