forked from auracaster/bumble_mirror
Merge pull request #751 from zxzxwu/l2cap
Add L2CAP Credit Based packets definitions (0x17-0x1A)
This commit is contained in:
108
bumble/l2cap.py
108
bumble/l2cap.py
@@ -23,6 +23,7 @@ import logging
|
|||||||
import struct
|
import struct
|
||||||
|
|
||||||
from collections import deque
|
from collections import deque
|
||||||
|
from collections.abc import Sequence
|
||||||
from typing import (
|
from typing import (
|
||||||
Optional,
|
Optional,
|
||||||
Callable,
|
Callable,
|
||||||
@@ -112,6 +113,10 @@ class CommandCode(hci.SpecableEnum):
|
|||||||
L2CAP_LE_CREDIT_BASED_CONNECTION_REQUEST = 0x14
|
L2CAP_LE_CREDIT_BASED_CONNECTION_REQUEST = 0x14
|
||||||
L2CAP_LE_CREDIT_BASED_CONNECTION_RESPONSE = 0x15
|
L2CAP_LE_CREDIT_BASED_CONNECTION_RESPONSE = 0x15
|
||||||
L2CAP_LE_FLOW_CONTROL_CREDIT = 0x16
|
L2CAP_LE_FLOW_CONTROL_CREDIT = 0x16
|
||||||
|
L2CAP_CREDIT_BASED_CONNECTION_REQUEST = 0x17
|
||||||
|
L2CAP_CREDIT_BASED_CONNECTION_RESPONSE = 0x18
|
||||||
|
L2CAP_CREDIT_BASED_RECONFIGURE_REQUEST = 0x19
|
||||||
|
L2CAP_CREDIT_BASED_RECONFIGURE_RESPONSE = 0x1A
|
||||||
|
|
||||||
L2CAP_CONNECTION_PARAMETERS_ACCEPTED_RESULT = 0x0000
|
L2CAP_CONNECTION_PARAMETERS_ACCEPTED_RESULT = 0x0000
|
||||||
L2CAP_CONNECTION_PARAMETERS_REJECTED_RESULT = 0x0001
|
L2CAP_CONNECTION_PARAMETERS_REJECTED_RESULT = 0x0001
|
||||||
@@ -595,6 +600,109 @@ class L2CAP_LE_Flow_Control_Credit(L2CAP_Control_Frame):
|
|||||||
credits: int = dataclasses.field(metadata=hci.metadata(2))
|
credits: int = dataclasses.field(metadata=hci.metadata(2))
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
@L2CAP_Control_Frame.subclass
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class L2CAP_Credit_Based_Connection_Request(L2CAP_Control_Frame):
|
||||||
|
'''
|
||||||
|
See Bluetooth spec @ Vol 3, Part A - 4.25 L2CAP_CREDIT_BASED_CONNECTION_REQ (0x17).
|
||||||
|
'''
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def parse_cid_list(cls, data: bytes, offset: int) -> tuple[int, list[int]]:
|
||||||
|
count = (len(data) - offset) // 2
|
||||||
|
return len(data), list(struct.unpack_from("<" + ("H" * count), data, offset))
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def serialize_cid_list(cls, cids: Sequence[int]) -> bytes:
|
||||||
|
return b"".join([struct.pack("<H", cid) for cid in cids])
|
||||||
|
|
||||||
|
CID_METADATA: ClassVar[dict[str, Any]] = hci.metadata(
|
||||||
|
{
|
||||||
|
'parser': lambda data, offset: L2CAP_Credit_Based_Connection_Request.parse_cid_list(
|
||||||
|
data, offset
|
||||||
|
),
|
||||||
|
'serializer': lambda value: L2CAP_Credit_Based_Connection_Request.serialize_cid_list(
|
||||||
|
value
|
||||||
|
),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
spsm: int = dataclasses.field(metadata=hci.metadata(2))
|
||||||
|
mtu: int = dataclasses.field(metadata=hci.metadata(2))
|
||||||
|
mps: int = dataclasses.field(metadata=hci.metadata(2))
|
||||||
|
initial_credits: int = dataclasses.field(metadata=hci.metadata(2))
|
||||||
|
source_cid: Sequence[int] = dataclasses.field(metadata=CID_METADATA)
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
@L2CAP_Control_Frame.subclass
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class L2CAP_Credit_Based_Connection_Response(L2CAP_Control_Frame):
|
||||||
|
'''
|
||||||
|
See Bluetooth spec @ Vol 3, Part A - 4.26 L2CAP_CREDIT_BASED_CONNECTION_RSP (0x18).
|
||||||
|
'''
|
||||||
|
|
||||||
|
class Result(hci.SpecableEnum):
|
||||||
|
ALL_CONNECTIONS_SUCCESSFUL = 0x0000
|
||||||
|
ALL_CONNECTIONS_REFUSED_SPSM_NOT_SUPPORTED = 0x0002
|
||||||
|
SOME_CONNECTIONS_REFUSED_INSUFFICIENT_RESOURCES_AVAILABLE = 0x0004
|
||||||
|
ALL_CONNECTIONS_REFUSED_INSUFFICIENT_AUTHENTICATION = 0x0005
|
||||||
|
ALL_CONNECTIONS_REFUSED_INSUFFICIENT_AUTHORIZATION = 0x0006
|
||||||
|
ALL_CONNECTIONS_REFUSED_ENCRYPTION_KEY_SIZE_TOO_SHORT = 0x0007
|
||||||
|
ALL_CONNECTIONS_REFUSED_INSUFFICIENT_ENCRYPTION = 0x0008
|
||||||
|
SOME_CONNECTIONS_REFUSED_INVALID_SOURCE_CID = 0x0009
|
||||||
|
SOME_CONNECTIONS_REFUSED_SOURCE_CID_ALREADY_ALLOCATED = 0x000A
|
||||||
|
ALL_CONNECTIONS_REFUSED_UNACCEPTABLE_PARAMETERS = 0x000B
|
||||||
|
ALL_CONNECTIONS_REFUSED_INVALID_PARAMETERS = 0x000C
|
||||||
|
ALL_CONNECTIONS_PENDING_NO_FURTHER_INFORMATION_AVAILABLE = 0x000D
|
||||||
|
ALL_CONNECTIONS_PENDING_AUTHENTICATION_PENDING = 0x000E
|
||||||
|
ALL_CONNECTIONS_PENDING_AUTHORIZATION_PENDING = 0x000F
|
||||||
|
|
||||||
|
mtu: int = dataclasses.field(metadata=hci.metadata(2))
|
||||||
|
mps: int = dataclasses.field(metadata=hci.metadata(2))
|
||||||
|
initial_credits: int = dataclasses.field(metadata=hci.metadata(2))
|
||||||
|
result: int = dataclasses.field(metadata=Result.type_metadata(2))
|
||||||
|
destination_cid: Sequence[int] = dataclasses.field(
|
||||||
|
metadata=L2CAP_Credit_Based_Connection_Request.CID_METADATA
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
@L2CAP_Control_Frame.subclass
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class L2CAP_Credit_Based_Reconfigure_Request(L2CAP_Control_Frame):
|
||||||
|
'''
|
||||||
|
See Bluetooth spec @ Vol 3, Part A - 4.27 L2CAP_CREDIT_BASED_RECONFIGURE_REQ (0x19).
|
||||||
|
'''
|
||||||
|
|
||||||
|
mtu: int = dataclasses.field(metadata=hci.metadata(2))
|
||||||
|
mps: int = dataclasses.field(metadata=hci.metadata(2))
|
||||||
|
destination_cid: Sequence[int] = dataclasses.field(
|
||||||
|
metadata=L2CAP_Credit_Based_Connection_Request.CID_METADATA
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
@L2CAP_Control_Frame.subclass
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class L2CAP_Credit_Based_Reconfigure_Response(L2CAP_Control_Frame):
|
||||||
|
'''
|
||||||
|
See Bluetooth spec @ Vol 3, Part A - 4.28 L2CAP_CREDIT_BASED_RECONFIGURE_RSP (0x1A).
|
||||||
|
'''
|
||||||
|
|
||||||
|
class Result(hci.SpecableEnum):
|
||||||
|
RECONFIGURATION_SUCCESSFUL = 0x0000
|
||||||
|
RECONFIGURATION_FAILED_REDUCTION_IN_SIZE_OF_MTU_NOT_ALLOWED = 0x0001
|
||||||
|
RECONFIGURATION_FAILED_REDUCTION_IN_SIZE_OF_MPS_NOT_ALLOWED_FOR_MORE_THAN_ONE_CHANNEL_AT_A_TIME = (
|
||||||
|
0x0002
|
||||||
|
)
|
||||||
|
RECONFIGURATION_FAILED_ONE_OR_MORE_DESTINATION_CIDS_INVALID = 0x0003
|
||||||
|
RECONFIGURATION_FAILED_OTHER_UNACCEPTABLE_PARAMETERS = 0x0004
|
||||||
|
|
||||||
|
result: int = dataclasses.field(metadata=Result.type_metadata(2))
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
class ClassicChannel(utils.EventEmitter):
|
class ClassicChannel(utils.EventEmitter):
|
||||||
class State(enum.IntEnum):
|
class State(enum.IntEnum):
|
||||||
|
|||||||
@@ -73,6 +73,55 @@ def test_helpers():
|
|||||||
assert srq.identifier == rq.identifier
|
assert srq.identifier == rq.identifier
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
def test_l2cap_credit_based_connection_request() -> None:
|
||||||
|
frame = l2cap.L2CAP_Credit_Based_Connection_Request(
|
||||||
|
identifier=1, spsm=2, mtu=3, mps=4, initial_credits=5, source_cid=[6, 7, 8]
|
||||||
|
)
|
||||||
|
|
||||||
|
parsed = l2cap.L2CAP_Control_Frame.from_bytes(bytes(frame))
|
||||||
|
assert parsed == frame
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
def test_l2cap_credit_based_connection_response() -> None:
|
||||||
|
frame = l2cap.L2CAP_Credit_Based_Connection_Response(
|
||||||
|
identifier=1,
|
||||||
|
mtu=2,
|
||||||
|
mps=3,
|
||||||
|
initial_credits=4,
|
||||||
|
result=l2cap.L2CAP_Credit_Based_Connection_Response.Result.ALL_CONNECTIONS_PENDING_AUTHENTICATION_PENDING,
|
||||||
|
destination_cid=[6, 7, 8],
|
||||||
|
)
|
||||||
|
|
||||||
|
parsed = l2cap.L2CAP_Control_Frame.from_bytes(bytes(frame))
|
||||||
|
assert parsed == frame
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
def test_l2cap_credit_based_reconfigure_request() -> None:
|
||||||
|
frame = l2cap.L2CAP_Credit_Based_Reconfigure_Request(
|
||||||
|
identifier=1,
|
||||||
|
mtu=2,
|
||||||
|
mps=3,
|
||||||
|
destination_cid=[6, 7, 8],
|
||||||
|
)
|
||||||
|
|
||||||
|
parsed = l2cap.L2CAP_Control_Frame.from_bytes(bytes(frame))
|
||||||
|
assert parsed == frame
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
def test_l2cap_credit_based_reconfigure_response() -> None:
|
||||||
|
frame = l2cap.L2CAP_Credit_Based_Reconfigure_Response(
|
||||||
|
identifier=1,
|
||||||
|
result=l2cap.L2CAP_Credit_Based_Reconfigure_Response.Result.RECONFIGURATION_FAILED_OTHER_UNACCEPTABLE_PARAMETERS,
|
||||||
|
)
|
||||||
|
|
||||||
|
parsed = l2cap.L2CAP_Control_Frame.from_bytes(bytes(frame))
|
||||||
|
assert parsed == frame
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def test_unimplemented_control_frame():
|
def test_unimplemented_control_frame():
|
||||||
frame = l2cap.L2CAP_Control_Frame(identifier=1)
|
frame = l2cap.L2CAP_Control_Frame(identifier=1)
|
||||||
|
|||||||
Reference in New Issue
Block a user