Files
bumble_mirror/bumble/l2cap.py
2025-08-27 14:15:08 +08:00

2204 lines
79 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import dataclasses
import enum
import logging
import struct
from collections import deque
from collections.abc import Sequence
from typing import (
TYPE_CHECKING,
Any,
Callable,
ClassVar,
Iterable,
Optional,
SupportsBytes,
TypeVar,
Union,
)
from bumble import hci, utils
from bumble.colors import color
from bumble.core import (
InvalidArgumentError,
InvalidPacketError,
InvalidStateError,
OutOfResourcesError,
ProtocolError,
)
if TYPE_CHECKING:
from bumble.device import Connection
from bumble.host import Host
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
# fmt: off
# pylint: disable=line-too-long
L2CAP_SIGNALING_CID = 0x01
L2CAP_LE_SIGNALING_CID = 0x05
L2CAP_MIN_LE_MTU = 23
L2CAP_MIN_BR_EDR_MTU = 48
L2CAP_MAX_BR_EDR_MTU = 65535
L2CAP_DEFAULT_MTU = 2048 # Default value for the MTU we are willing to accept
L2CAP_DEFAULT_CONNECTIONLESS_MTU = 1024
# See Bluetooth spec @ Vol 3, Part A - Table 2.1: CID name space on ACL-U, ASB-U, and AMP-U logical links
L2CAP_ACL_U_DYNAMIC_CID_RANGE_START = 0x0040
L2CAP_ACL_U_DYNAMIC_CID_RANGE_END = 0xFFFF
# See Bluetooth spec @ Vol 3, Part A - Table 2.2: CID name space on LE-U logical link
L2CAP_LE_U_DYNAMIC_CID_RANGE_START = 0x0040
L2CAP_LE_U_DYNAMIC_CID_RANGE_END = 0x007F
# PSM Range - See Bluetooth spec @ Vol 3, Part A / Table 4.5: PSM ranges and usage
L2CAP_PSM_DYNAMIC_RANGE_START = 0x1001
L2CAP_PSM_DYNAMIC_RANGE_END = 0xFFFF
# LE PSM Ranges - See Bluetooth spec @ Vol 3, Part A / Table 4.19: LE Credit Based Connection Request LE_PSM ranges
L2CAP_LE_PSM_DYNAMIC_RANGE_START = 0x0080
L2CAP_LE_PSM_DYNAMIC_RANGE_END = 0x00FF
class CommandCode(hci.SpecableEnum):
L2CAP_COMMAND_REJECT = 0x01
L2CAP_CONNECTION_REQUEST = 0x02
L2CAP_CONNECTION_RESPONSE = 0x03
L2CAP_CONFIGURE_REQUEST = 0x04
L2CAP_CONFIGURE_RESPONSE = 0x05
L2CAP_DISCONNECTION_REQUEST = 0x06
L2CAP_DISCONNECTION_RESPONSE = 0x07
L2CAP_ECHO_REQUEST = 0x08
L2CAP_ECHO_RESPONSE = 0x09
L2CAP_INFORMATION_REQUEST = 0x0A
L2CAP_INFORMATION_RESPONSE = 0x0B
L2CAP_CREATE_CHANNEL_REQUEST = 0x0C
L2CAP_CREATE_CHANNEL_RESPONSE = 0x0D
L2CAP_MOVE_CHANNEL_REQUEST = 0x0E
L2CAP_MOVE_CHANNEL_RESPONSE = 0x0F
L2CAP_MOVE_CHANNEL_CONFIRMATION = 0x10
L2CAP_MOVE_CHANNEL_CONFIRMATION_RESPONSE = 0x11
L2CAP_CONNECTION_PARAMETER_UPDATE_REQUEST = 0x12
L2CAP_CONNECTION_PARAMETER_UPDATE_RESPONSE = 0x13
L2CAP_LE_CREDIT_BASED_CONNECTION_REQUEST = 0x14
L2CAP_LE_CREDIT_BASED_CONNECTION_RESPONSE = 0x15
L2CAP_LE_FLOW_CONTROL_CREDIT = 0x16
L2CAP_CREDIT_BASED_CONNECTION_REQUEST = 0x17
L2CAP_CREDIT_BASED_CONNECTION_RESPONSE = 0x18
L2CAP_CREDIT_BASED_RECONFIGURE_REQUEST = 0x19
L2CAP_CREDIT_BASED_RECONFIGURE_RESPONSE = 0x1A
L2CAP_CONNECTION_PARAMETERS_ACCEPTED_RESULT = 0x0000
L2CAP_CONNECTION_PARAMETERS_REJECTED_RESULT = 0x0001
L2CAP_COMMAND_NOT_UNDERSTOOD_REASON = 0x0000
L2CAP_SIGNALING_MTU_EXCEEDED_REASON = 0x0001
L2CAP_INVALID_CID_IN_REQUEST_REASON = 0x0002
L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_CREDITS = 65535
L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU = 23
L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MTU = 65535
L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS = 23
L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS = 65533
L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU = 2048
L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS = 2048
L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS = 256
L2CAP_MAXIMUM_TRANSMISSION_UNIT_CONFIGURATION_OPTION_TYPE = 0x01
L2CAP_MTU_CONFIGURATION_PARAMETER_TYPE = 0x01
# fmt: on
# pylint: enable=line-too-long
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
# pylint: disable=invalid-name
@dataclasses.dataclass
class ClassicChannelSpec:
psm: Optional[int] = None
mtu: int = L2CAP_DEFAULT_MTU
@dataclasses.dataclass
class LeCreditBasedChannelSpec:
psm: Optional[int] = None
mtu: int = L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU
mps: int = L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS
max_credits: int = L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS
def __post_init__(self):
if (
self.max_credits < 1
or self.max_credits > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_CREDITS
):
raise InvalidArgumentError('max credits out of range')
if (
self.mtu < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU
or self.mtu > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MTU
):
raise InvalidArgumentError('MTU out of range')
if (
self.mps < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS
or self.mps > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS
):
raise InvalidArgumentError('MPS out of range')
class L2CAP_PDU:
'''
See Bluetooth spec @ Vol 3, Part A - 3 DATA PACKET FORMAT
'''
@staticmethod
def from_bytes(data: bytes) -> L2CAP_PDU:
# Check parameters
if len(data) < 4:
raise InvalidPacketError('not enough data for L2CAP header')
_, l2cap_pdu_cid = struct.unpack_from('<HH', data, 0)
l2cap_pdu_payload = data[4:]
return L2CAP_PDU(l2cap_pdu_cid, l2cap_pdu_payload)
def __bytes__(self) -> bytes:
header = struct.pack('<HH', len(self.payload), self.cid)
return header + self.payload
def __init__(self, cid: int, payload: bytes) -> None:
self.cid = cid
self.payload = payload
def __str__(self) -> str:
return f'{color("L2CAP", "green")} [CID={self.cid}]: {self.payload.hex()}'
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class L2CAP_Control_Frame:
'''
See Bluetooth spec @ Vol 3, Part A - 4 SIGNALING PACKET FORMATS
'''
classes: ClassVar[dict[int, type[L2CAP_Control_Frame]]] = {}
fields: ClassVar[hci.Fields] = ()
code: int = dataclasses.field(default=0, init=False)
name: str = dataclasses.field(default='', init=False)
_payload: Optional[bytes] = dataclasses.field(default=None, init=False)
identifier: int
@classmethod
def from_bytes(cls, pdu: bytes) -> L2CAP_Control_Frame:
code, identifier, length = struct.unpack_from("<BBH", pdu)
subclass = L2CAP_Control_Frame.classes.get(code)
if subclass is None:
instance = L2CAP_Control_Frame(identifier=identifier)
instance.payload = pdu[4:]
instance.code = CommandCode(code)
instance.name = instance.code.name
return instance
frame = subclass(
**hci.HCI_Object.dict_from_bytes(pdu, 4, subclass.fields),
identifier=identifier,
)
frame.identifier = identifier
frame.payload = pdu[4:]
if length != len(frame.payload):
logger.warning(
color(
f'!!! length mismatch: expected {length} but got {len(frame.payload)}',
'red',
)
)
return frame
@staticmethod
def decode_configuration_options(data: bytes) -> list[tuple[int, bytes]]:
options = []
while len(data) >= 2:
value_type = data[0]
length = data[1]
value = data[2 : 2 + length]
data = data[2 + length :]
options.append((value_type, value))
return options
@staticmethod
def encode_configuration_options(options: list[tuple[int, bytes]]) -> bytes:
return b''.join(
[bytes([option[0], len(option[1])]) + option[1] for option in options]
)
_ControlFrame = TypeVar('_ControlFrame', bound='L2CAP_Control_Frame')
@classmethod
def subclass(cls, subclass: type[_ControlFrame]) -> type[_ControlFrame]:
subclass.name = subclass.__name__.upper()
subclass.code = CommandCode[subclass.name]
subclass.fields = hci.HCI_Object.fields_from_dataclass(subclass)
# Register a factory for this class
L2CAP_Control_Frame.classes[subclass.code] = subclass
return subclass
@property
def payload(self) -> bytes:
if self._payload is None:
self._payload = hci.HCI_Object.dict_to_bytes(self.__dict__, self.fields)
return self._payload
@payload.setter
def payload(self, payload: bytes) -> None:
self._payload = payload
def __bytes__(self) -> bytes:
return (
struct.pack('<BBH', self.code, self.identifier, len(self.payload))
+ self.payload
)
def __str__(self) -> str:
result = f'{color(self.name, "yellow")} [ID={self.identifier}]'
if fields := getattr(self, 'fields', None):
result += ':\n' + hci.HCI_Object.format_fields(self.__dict__, fields, ' ')
else:
if len(self.payload) > 1:
result += f': {self.payload.hex()}'
return result
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass
@dataclasses.dataclass
class L2CAP_Command_Reject(L2CAP_Control_Frame):
'''
See Bluetooth spec @ Vol 3, Part A - 4.1 COMMAND REJECT
'''
class Reason(hci.SpecableEnum):
COMMAND_NOT_UNDERSTOOD = 0x0000
SIGNALING_MTU_EXCEEDED = 0x0001
INVALID_CID_IN_REQUEST = 0x0002
reason: int = dataclasses.field(metadata=Reason.type_metadata(2))
data: bytes = dataclasses.field(metadata=hci.metadata('*'))
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass
@dataclasses.dataclass
class L2CAP_Connection_Request(L2CAP_Control_Frame):
'''
See Bluetooth spec @ Vol 3, Part A - 4.2 CONNECTION REQUEST
'''
@staticmethod
def parse_psm(data: bytes, offset: int = 0) -> tuple[int, int]:
psm_length = 2
psm = data[offset] | data[offset + 1] << 8
# The PSM field extends until the first even octet (inclusive)
while data[offset + psm_length - 1] % 2 == 1:
psm |= data[offset + psm_length] << (8 * psm_length)
psm_length += 1
return offset + psm_length, psm
@staticmethod
def serialize_psm(psm: int) -> bytes:
serialized = struct.pack('<H', psm & 0xFFFF)
psm >>= 16
while psm:
serialized += bytes([psm & 0xFF])
psm >>= 8
return serialized
psm: int = dataclasses.field(
metadata=hci.metadata(
{
'parser': lambda data, offset: L2CAP_Connection_Request.parse_psm(
data, offset
),
'serializer': lambda value: L2CAP_Connection_Request.serialize_psm(
value
),
}
)
)
source_cid: int = dataclasses.field(metadata=hci.metadata(2))
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass
@dataclasses.dataclass
class L2CAP_Connection_Response(L2CAP_Control_Frame):
'''
See Bluetooth spec @ Vol 3, Part A - 4.3 CONNECTION RESPONSE
'''
class Result(hci.SpecableEnum):
CONNECTION_SUCCESSFUL = 0x0000
CONNECTION_PENDING = 0x0001
CONNECTION_REFUSED_PSM_NOT_SUPPORTED = 0x0002
CONNECTION_REFUSED_SECURITY_BLOCK = 0x0003
CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE = 0x0004
CONNECTION_REFUSED_INVALID_SOURCE_CID = 0x0006
CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED = 0x0007
CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS = 0x000B
destination_cid: int = dataclasses.field(metadata=hci.metadata(2))
source_cid: int = dataclasses.field(metadata=hci.metadata(2))
result: int = dataclasses.field(metadata=Result.type_metadata(2))
status: int = dataclasses.field(metadata=hci.metadata(2))
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass
@dataclasses.dataclass
class L2CAP_Configure_Request(L2CAP_Control_Frame):
'''
See Bluetooth spec @ Vol 3, Part A - 4.4 CONFIGURATION REQUEST
'''
destination_cid: int = dataclasses.field(metadata=hci.metadata(2))
flags: int = dataclasses.field(metadata=hci.metadata(2))
options: bytes = dataclasses.field(metadata=hci.metadata('*'))
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass
@dataclasses.dataclass
class L2CAP_Configure_Response(L2CAP_Control_Frame):
'''
See Bluetooth spec @ Vol 3, Part A - 4.5 CONFIGURATION RESPONSE
'''
class Result(hci.SpecableEnum):
SUCCESS = 0x0000
FAILURE_UNACCEPTABLE_PARAMETERS = 0x0001
FAILURE_REJECTED = 0x0002
FAILURE_UNKNOWN_OPTIONS = 0x0003
PENDING = 0x0004
FAILURE_FLOW_SPEC_REJECTED = 0x0005
source_cid: int = dataclasses.field(metadata=hci.metadata(2))
flags: int = dataclasses.field(metadata=hci.metadata(2))
result: int = dataclasses.field(metadata=Result.type_metadata(2))
options: bytes = dataclasses.field(metadata=hci.metadata('*'))
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass
@dataclasses.dataclass
class L2CAP_Disconnection_Request(L2CAP_Control_Frame):
'''
See Bluetooth spec @ Vol 3, Part A - 4.6 DISCONNECTION REQUEST
'''
destination_cid: int = dataclasses.field(metadata=hci.metadata(2))
source_cid: int = dataclasses.field(metadata=hci.metadata(2))
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass
@dataclasses.dataclass
class L2CAP_Disconnection_Response(L2CAP_Control_Frame):
'''
See Bluetooth spec @ Vol 3, Part A - 4.7 DISCONNECTION RESPONSE
'''
destination_cid: int = dataclasses.field(metadata=hci.metadata(2))
source_cid: int = dataclasses.field(metadata=hci.metadata(2))
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass
@dataclasses.dataclass
class L2CAP_Echo_Request(L2CAP_Control_Frame):
'''
See Bluetooth spec @ Vol 3, Part A - 4.8 ECHO REQUEST
'''
data: bytes = dataclasses.field(metadata=hci.metadata('*'))
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass
@dataclasses.dataclass
class L2CAP_Echo_Response(L2CAP_Control_Frame):
'''
See Bluetooth spec @ Vol 3, Part A - 4.9 ECHO RESPONSE
'''
data: bytes = dataclasses.field(metadata=hci.metadata('*'))
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass
@dataclasses.dataclass
class L2CAP_Information_Request(L2CAP_Control_Frame):
'''
See Bluetooth spec @ Vol 3, Part A - 4.10 INFORMATION REQUEST
'''
class InfoType(hci.SpecableEnum):
CONNECTIONLESS_MTU = 0x0001
EXTENDED_FEATURES_SUPPORTED = 0x0002
FIXED_CHANNELS_SUPPORTED = 0x0003
EXTENDED_FEATURE_FLOW_MODE_CONTROL = 0x0001
EXTENDED_FEATURE_RETRANSMISSION_MODE = 0x0002
EXTENDED_FEATURE_BIDIRECTIONAL_QOS = 0x0004
EXTENDED_FEATURE_ENHANCED_RETRANSMISSION_MODE = 0x0008
EXTENDED_FEATURE_STREAMING_MODE = 0x0010
EXTENDED_FEATURE_FCS_OPTION = 0x0020
EXTENDED_FEATURE_EXTENDED_FLOW_SPEC = 0x0040
EXTENDED_FEATURE_FIXED_CHANNELS = 0x0080
EXTENDED_FEATURE_EXTENDED_WINDOW_SIZE = 0x0100
EXTENDED_FEATURE_UNICAST_CONNECTIONLESS_DATA = 0x0200
EXTENDED_FEATURE_ENHANCED_CREDIT_BASE_FLOW_CONTROL = 0x0400
info_type: int = dataclasses.field(metadata=InfoType.type_metadata(2))
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass
@dataclasses.dataclass
class L2CAP_Information_Response(L2CAP_Control_Frame):
'''
See Bluetooth spec @ Vol 3, Part A - 4.11 INFORMATION RESPONSE
'''
class Result(hci.SpecableEnum):
SUCCESS = 0x00
NOT_SUPPORTED = 0x01
info_type: int = dataclasses.field(
metadata=L2CAP_Information_Request.InfoType.type_metadata(2)
)
result: int = dataclasses.field(metadata=Result.type_metadata(2))
data: bytes = dataclasses.field(metadata=hci.metadata('*'))
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass
@dataclasses.dataclass
class L2CAP_Connection_Parameter_Update_Request(L2CAP_Control_Frame):
'''
See Bluetooth spec @ Vol 3, Part A - 4.20 CONNECTION PARAMETER UPDATE REQUEST
'''
interval_min: int = dataclasses.field(metadata=hci.metadata(2))
interval_max: int = dataclasses.field(metadata=hci.metadata(2))
latency: int = dataclasses.field(metadata=hci.metadata(2))
timeout: int = dataclasses.field(metadata=hci.metadata(2))
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass
@dataclasses.dataclass
class L2CAP_Connection_Parameter_Update_Response(L2CAP_Control_Frame):
'''
See Bluetooth spec @ Vol 3, Part A - 4.21 CONNECTION PARAMETER UPDATE RESPONSE
'''
result: int = dataclasses.field(metadata=hci.metadata(2))
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass
@dataclasses.dataclass
class L2CAP_LE_Credit_Based_Connection_Request(L2CAP_Control_Frame):
'''
See Bluetooth spec @ Vol 3, Part A - 4.22 LE CREDIT BASED CONNECTION REQUEST
(CODE 0x14)
'''
le_psm: int = dataclasses.field(metadata=hci.metadata(2))
source_cid: int = dataclasses.field(metadata=hci.metadata(2))
mtu: int = dataclasses.field(metadata=hci.metadata(2))
mps: int = dataclasses.field(metadata=hci.metadata(2))
initial_credits: int = dataclasses.field(metadata=hci.metadata(2))
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass
@dataclasses.dataclass
class L2CAP_LE_Credit_Based_Connection_Response(L2CAP_Control_Frame):
'''
See Bluetooth spec @ Vol 3, Part A - 4.23 LE CREDIT BASED CONNECTION RESPONSE
(CODE 0x15)
'''
class Result(hci.SpecableEnum):
CONNECTION_SUCCESSFUL = 0x0000
CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED = 0x0002
CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE = 0x0004
CONNECTION_REFUSED_INSUFFICIENT_AUTHENTICATION = 0x0005
CONNECTION_REFUSED_INSUFFICIENT_AUTHORIZATION = 0x0006
CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION_KEY_SIZE = 0x0007
CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION = 0x0008
CONNECTION_REFUSED_INVALID_SOURCE_CID = 0x0009
CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED = 0x000A
CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS = 0x000B
destination_cid: int = dataclasses.field(metadata=hci.metadata(2))
mtu: int = dataclasses.field(metadata=hci.metadata(2))
mps: int = dataclasses.field(metadata=hci.metadata(2))
initial_credits: int = dataclasses.field(metadata=hci.metadata(2))
result: int = dataclasses.field(metadata=Result.type_metadata(2))
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass
@dataclasses.dataclass
class L2CAP_LE_Flow_Control_Credit(L2CAP_Control_Frame):
'''
See Bluetooth spec @ Vol 3, Part A - 4.24 LE FLOW CONTROL CREDIT (CODE 0x16)
'''
cid: int = dataclasses.field(metadata=hci.metadata(2))
credits: int = dataclasses.field(metadata=hci.metadata(2))
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass
@dataclasses.dataclass
class L2CAP_Credit_Based_Connection_Request(L2CAP_Control_Frame):
'''
See Bluetooth spec @ Vol 3, Part A - 4.25 L2CAP_CREDIT_BASED_CONNECTION_REQ (0x17).
'''
@classmethod
def parse_cid_list(cls, data: bytes, offset: int) -> tuple[int, list[int]]:
count = (len(data) - offset) // 2
return len(data), list(struct.unpack_from("<" + ("H" * count), data, offset))
@classmethod
def serialize_cid_list(cls, cids: Sequence[int]) -> bytes:
return b"".join([struct.pack("<H", cid) for cid in cids])
CID_METADATA: ClassVar[dict[str, Any]] = hci.metadata(
{
'parser': lambda data, offset: L2CAP_Credit_Based_Connection_Request.parse_cid_list(
data, offset
),
'serializer': lambda value: L2CAP_Credit_Based_Connection_Request.serialize_cid_list(
value
),
}
)
spsm: int = dataclasses.field(metadata=hci.metadata(2))
mtu: int = dataclasses.field(metadata=hci.metadata(2))
mps: int = dataclasses.field(metadata=hci.metadata(2))
initial_credits: int = dataclasses.field(metadata=hci.metadata(2))
source_cid: Sequence[int] = dataclasses.field(metadata=CID_METADATA)
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass
@dataclasses.dataclass
class L2CAP_Credit_Based_Connection_Response(L2CAP_Control_Frame):
'''
See Bluetooth spec @ Vol 3, Part A - 4.26 L2CAP_CREDIT_BASED_CONNECTION_RSP (0x18).
'''
class Result(hci.SpecableEnum):
ALL_CONNECTIONS_SUCCESSFUL = 0x0000
ALL_CONNECTIONS_REFUSED_SPSM_NOT_SUPPORTED = 0x0002
SOME_CONNECTIONS_REFUSED_INSUFFICIENT_RESOURCES_AVAILABLE = 0x0004
ALL_CONNECTIONS_REFUSED_INSUFFICIENT_AUTHENTICATION = 0x0005
ALL_CONNECTIONS_REFUSED_INSUFFICIENT_AUTHORIZATION = 0x0006
ALL_CONNECTIONS_REFUSED_ENCRYPTION_KEY_SIZE_TOO_SHORT = 0x0007
ALL_CONNECTIONS_REFUSED_INSUFFICIENT_ENCRYPTION = 0x0008
SOME_CONNECTIONS_REFUSED_INVALID_SOURCE_CID = 0x0009
SOME_CONNECTIONS_REFUSED_SOURCE_CID_ALREADY_ALLOCATED = 0x000A
ALL_CONNECTIONS_REFUSED_UNACCEPTABLE_PARAMETERS = 0x000B
ALL_CONNECTIONS_REFUSED_INVALID_PARAMETERS = 0x000C
ALL_CONNECTIONS_PENDING_NO_FURTHER_INFORMATION_AVAILABLE = 0x000D
ALL_CONNECTIONS_PENDING_AUTHENTICATION_PENDING = 0x000E
ALL_CONNECTIONS_PENDING_AUTHORIZATION_PENDING = 0x000F
mtu: int = dataclasses.field(metadata=hci.metadata(2))
mps: int = dataclasses.field(metadata=hci.metadata(2))
initial_credits: int = dataclasses.field(metadata=hci.metadata(2))
result: int = dataclasses.field(metadata=Result.type_metadata(2))
destination_cid: Sequence[int] = dataclasses.field(
metadata=L2CAP_Credit_Based_Connection_Request.CID_METADATA
)
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass
@dataclasses.dataclass
class L2CAP_Credit_Based_Reconfigure_Request(L2CAP_Control_Frame):
'''
See Bluetooth spec @ Vol 3, Part A - 4.27 L2CAP_CREDIT_BASED_RECONFIGURE_REQ (0x19).
'''
mtu: int = dataclasses.field(metadata=hci.metadata(2))
mps: int = dataclasses.field(metadata=hci.metadata(2))
destination_cid: Sequence[int] = dataclasses.field(
metadata=L2CAP_Credit_Based_Connection_Request.CID_METADATA
)
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass
@dataclasses.dataclass
class L2CAP_Credit_Based_Reconfigure_Response(L2CAP_Control_Frame):
'''
See Bluetooth spec @ Vol 3, Part A - 4.28 L2CAP_CREDIT_BASED_RECONFIGURE_RSP (0x1A).
'''
class Result(hci.SpecableEnum):
RECONFIGURATION_SUCCESSFUL = 0x0000
RECONFIGURATION_FAILED_REDUCTION_IN_SIZE_OF_MTU_NOT_ALLOWED = 0x0001
RECONFIGURATION_FAILED_REDUCTION_IN_SIZE_OF_MPS_NOT_ALLOWED_FOR_MORE_THAN_ONE_CHANNEL_AT_A_TIME = (
0x0002
)
RECONFIGURATION_FAILED_ONE_OR_MORE_DESTINATION_CIDS_INVALID = 0x0003
RECONFIGURATION_FAILED_OTHER_UNACCEPTABLE_PARAMETERS = 0x0004
result: int = dataclasses.field(metadata=Result.type_metadata(2))
# -----------------------------------------------------------------------------
class ClassicChannel(utils.EventEmitter):
class State(enum.IntEnum):
# States
CLOSED = 0x00
WAIT_CONNECT = 0x01
WAIT_CONNECT_RSP = 0x02
OPEN = 0x03
WAIT_DISCONNECT = 0x04
WAIT_CREATE = 0x05
WAIT_CREATE_RSP = 0x06
WAIT_MOVE = 0x07
WAIT_MOVE_RSP = 0x08
WAIT_MOVE_CONFIRM = 0x09
WAIT_CONFIRM_RSP = 0x0A
# CONFIG substates
WAIT_CONFIG = 0x10
WAIT_SEND_CONFIG = 0x11
WAIT_CONFIG_REQ_RSP = 0x12
WAIT_CONFIG_RSP = 0x13
WAIT_CONFIG_REQ = 0x14
WAIT_IND_FINAL_RSP = 0x15
WAIT_FINAL_RSP = 0x16
WAIT_CONTROL_IND = 0x17
EVENT_OPEN = "open"
EVENT_CLOSE = "close"
connection_result: Optional[asyncio.Future[None]]
disconnection_result: Optional[asyncio.Future[None]]
response: Optional[asyncio.Future[bytes]]
sink: Optional[Callable[[bytes], Any]]
state: State
connection: Connection
mtu: int
peer_mtu: int
def __init__(
self,
manager: ChannelManager,
connection: Connection,
signaling_cid: int,
psm: int,
source_cid: int,
mtu: int,
) -> None:
super().__init__()
self.manager = manager
self.connection = connection
self.signaling_cid = signaling_cid
self.state = self.State.CLOSED
self.mtu = mtu
self.peer_mtu = L2CAP_MIN_BR_EDR_MTU
self.psm = psm
self.source_cid = source_cid
self.destination_cid = 0
self.connection_result = None
self.disconnection_result = None
self.sink = None
def _change_state(self, new_state: State) -> None:
logger.debug(f'{self} state change -> {color(new_state.name, "cyan")}')
self.state = new_state
def send_pdu(self, pdu: Union[SupportsBytes, bytes]) -> None:
if self.state != self.State.OPEN:
raise InvalidStateError('channel not open')
self.manager.send_pdu(self.connection, self.destination_cid, pdu)
def send_control_frame(self, frame: L2CAP_Control_Frame) -> None:
self.manager.send_control_frame(self.connection, self.signaling_cid, frame)
def on_pdu(self, pdu: bytes) -> None:
if self.sink:
# pylint: disable=not-callable
self.sink(pdu)
else:
logger.warning(
color('received pdu without a pending request or sink', 'red')
)
async def connect(self) -> None:
if self.state != self.State.CLOSED:
raise InvalidStateError('invalid state')
# Check that we can start a new connection
if self.connection_result:
raise InvalidStateError('connection already pending')
self._change_state(self.State.WAIT_CONNECT_RSP)
self.send_control_frame(
L2CAP_Connection_Request(
identifier=self.manager.next_identifier(self.connection),
psm=self.psm,
source_cid=self.source_cid,
)
)
# Create a future to wait for the state machine to get to a success or error
# state
self.connection_result = asyncio.get_running_loop().create_future()
# Wait for the connection to succeed or fail
try:
return await self.connection.cancel_on_disconnection(self.connection_result)
finally:
self.connection_result = None
async def disconnect(self) -> None:
if self.state != self.State.OPEN:
raise InvalidStateError('invalid state')
self._change_state(self.State.WAIT_DISCONNECT)
self.send_control_frame(
L2CAP_Disconnection_Request(
identifier=self.manager.next_identifier(self.connection),
destination_cid=self.destination_cid,
source_cid=self.source_cid,
)
)
# Create a future to wait for the state machine to get to a success or error
# state
self.disconnection_result = asyncio.get_running_loop().create_future()
return await self.disconnection_result
def abort(self) -> None:
if self.state == self.State.OPEN:
self._change_state(self.State.CLOSED)
self.emit(self.EVENT_CLOSE)
def send_configure_request(self) -> None:
options = L2CAP_Control_Frame.encode_configuration_options(
[
(
L2CAP_MAXIMUM_TRANSMISSION_UNIT_CONFIGURATION_OPTION_TYPE,
struct.pack('<H', self.mtu),
)
]
)
self.send_control_frame(
L2CAP_Configure_Request(
identifier=self.manager.next_identifier(self.connection),
destination_cid=self.destination_cid,
flags=0x0000,
options=options,
)
)
def on_connection_request(self, request: L2CAP_Connection_Request) -> None:
self.destination_cid = request.source_cid
self._change_state(self.State.WAIT_CONNECT)
self.send_control_frame(
L2CAP_Connection_Response(
identifier=request.identifier,
destination_cid=self.source_cid,
source_cid=self.destination_cid,
result=L2CAP_Connection_Response.Result.CONNECTION_SUCCESSFUL,
status=0x0000,
)
)
self._change_state(self.State.WAIT_CONFIG)
self.send_configure_request()
self._change_state(self.State.WAIT_CONFIG_REQ_RSP)
def on_connection_response(self, response: L2CAP_Connection_Response):
if self.state != self.State.WAIT_CONNECT_RSP:
logger.warning(color('invalid state', 'red'))
return
if response.result == L2CAP_Connection_Response.Result.CONNECTION_SUCCESSFUL:
self.destination_cid = response.destination_cid
self._change_state(self.State.WAIT_CONFIG)
self.send_configure_request()
self._change_state(self.State.WAIT_CONFIG_REQ_RSP)
elif response.result == L2CAP_Connection_Response.Result.CONNECTION_PENDING:
pass
else:
self._change_state(self.State.CLOSED)
if self.connection_result:
self.connection_result.set_exception(
ProtocolError(
response.result,
'l2cap',
L2CAP_Connection_Response.Result(response.result).name,
)
)
self.connection_result = None
def on_configure_request(self, request: L2CAP_Configure_Request) -> None:
if self.state not in (
self.State.WAIT_CONFIG,
self.State.WAIT_CONFIG_REQ,
self.State.WAIT_CONFIG_REQ_RSP,
):
logger.warning(color('invalid state', 'red'))
return
# Decode the options
options = L2CAP_Control_Frame.decode_configuration_options(request.options)
for option in options:
if option[0] == L2CAP_MTU_CONFIGURATION_PARAMETER_TYPE:
self.peer_mtu = struct.unpack('<H', option[1])[0]
logger.debug(f'peer MTU = {self.peer_mtu}')
self.send_control_frame(
L2CAP_Configure_Response(
identifier=request.identifier,
source_cid=self.destination_cid,
flags=0x0000,
result=L2CAP_Configure_Response.Result.SUCCESS,
options=request.options, # TODO: don't accept everything blindly
)
)
if self.state == self.State.WAIT_CONFIG:
self._change_state(self.State.WAIT_SEND_CONFIG)
self.send_configure_request()
self._change_state(self.State.WAIT_CONFIG_RSP)
elif self.state == self.State.WAIT_CONFIG_REQ:
self._change_state(self.State.OPEN)
if self.connection_result:
self.connection_result.set_result(None)
self.connection_result = None
self.emit(self.EVENT_OPEN)
elif self.state == self.State.WAIT_CONFIG_REQ_RSP:
self._change_state(self.State.WAIT_CONFIG_RSP)
def on_configure_response(self, response: L2CAP_Configure_Response) -> None:
if response.result == L2CAP_Configure_Response.Result.SUCCESS:
if self.state == self.State.WAIT_CONFIG_REQ_RSP:
self._change_state(self.State.WAIT_CONFIG_REQ)
elif self.state in (
self.State.WAIT_CONFIG_RSP,
self.State.WAIT_CONTROL_IND,
):
self._change_state(self.State.OPEN)
if self.connection_result:
self.connection_result.set_result(None)
self.connection_result = None
self.emit(self.EVENT_OPEN)
else:
logger.warning(color('invalid state', 'red'))
elif (
response.result
== L2CAP_Configure_Response.Result.FAILURE_UNACCEPTABLE_PARAMETERS
):
# Re-configure with what's suggested in the response
self.send_control_frame(
L2CAP_Configure_Request(
identifier=self.manager.next_identifier(self.connection),
destination_cid=self.destination_cid,
flags=0x0000,
options=response.options,
)
)
else:
logger.warning(
color(
'!!! configuration rejected: '
f'{L2CAP_Configure_Response.Result(response.result).name}',
'red',
)
)
# TODO: decide how to fail gracefully
def on_disconnection_request(self, request: L2CAP_Disconnection_Request) -> None:
if self.state in (self.State.OPEN, self.State.WAIT_DISCONNECT):
self.send_control_frame(
L2CAP_Disconnection_Response(
identifier=request.identifier,
destination_cid=request.destination_cid,
source_cid=request.source_cid,
)
)
self._change_state(self.State.CLOSED)
self.emit(self.EVENT_CLOSE)
self.manager.on_channel_closed(self)
else:
logger.warning(color('invalid state', 'red'))
def on_disconnection_response(self, response: L2CAP_Disconnection_Response) -> None:
if self.state != self.State.WAIT_DISCONNECT:
logger.warning(color('invalid state', 'red'))
return
if (
response.destination_cid != self.destination_cid
or response.source_cid != self.source_cid
):
logger.warning('unexpected source or destination CID')
return
self._change_state(self.State.CLOSED)
if self.disconnection_result:
self.disconnection_result.set_result(None)
self.disconnection_result = None
self.emit(self.EVENT_CLOSE)
self.manager.on_channel_closed(self)
def __str__(self) -> str:
return (
f'Channel({self.source_cid}->{self.destination_cid}, '
f'PSM={self.psm}, '
f'MTU={self.mtu}/{self.peer_mtu}, '
f'state={self.state.name})'
)
# -----------------------------------------------------------------------------
class LeCreditBasedChannel(utils.EventEmitter):
"""
LE Credit-based Connection Oriented Channel
"""
class State(enum.IntEnum):
INIT = 0
CONNECTED = 1
CONNECTING = 2
DISCONNECTING = 3
DISCONNECTED = 4
CONNECTION_ERROR = 5
out_queue: deque[bytes]
connection_result: Optional[asyncio.Future[LeCreditBasedChannel]]
disconnection_result: Optional[asyncio.Future[None]]
in_sdu: Optional[bytes]
out_sdu: Optional[bytes]
state: State
connection: Connection
sink: Optional[Callable[[bytes], Any]]
EVENT_OPEN = "open"
EVENT_CLOSE = "close"
def __init__(
self,
manager: ChannelManager,
connection: Connection,
le_psm: int,
source_cid: int,
destination_cid: int,
mtu: int,
mps: int,
credits: int, # pylint: disable=redefined-builtin
peer_mtu: int,
peer_mps: int,
peer_credits: int,
connected: bool,
) -> None:
super().__init__()
self.manager = manager
self.connection = connection
self.le_psm = le_psm
self.source_cid = source_cid
self.destination_cid = destination_cid
self.mtu = mtu
self.mps = mps
self.credits = credits
self.peer_mtu = peer_mtu
self.peer_mps = peer_mps
self.peer_credits = peer_credits
self.peer_max_credits = self.peer_credits
self.peer_credits_threshold = self.peer_max_credits // 2
self.in_sdu = None
self.in_sdu_length = 0
self.out_queue = deque()
self.out_sdu = None
self.sink = None
self.connected = False
self.connection_result = None
self.disconnection_result = None
self.drained = asyncio.Event()
self.drained.set()
if connected:
self.state = self.State.CONNECTED
else:
self.state = self.State.INIT
def _change_state(self, new_state: State) -> None:
logger.debug(f'{self} state change -> {color(new_state.name, "cyan")}')
self.state = new_state
if new_state == self.State.CONNECTED:
self.emit(self.EVENT_OPEN)
elif new_state == self.State.DISCONNECTED:
self.emit(self.EVENT_CLOSE)
def send_pdu(self, pdu: Union[SupportsBytes, bytes]) -> None:
self.manager.send_pdu(self.connection, self.destination_cid, pdu)
def send_control_frame(self, frame: L2CAP_Control_Frame) -> None:
self.manager.send_control_frame(self.connection, L2CAP_LE_SIGNALING_CID, frame)
async def connect(self) -> LeCreditBasedChannel:
# Check that we're in the right state
if self.state != self.State.INIT:
raise InvalidStateError('not in a connectable state')
# Check that we can start a new connection
identifier = self.manager.next_identifier(self.connection)
if identifier in self.manager.le_coc_requests:
raise InvalidStateError('too many concurrent connection requests')
self._change_state(self.State.CONNECTING)
request = L2CAP_LE_Credit_Based_Connection_Request(
identifier=identifier,
le_psm=self.le_psm,
source_cid=self.source_cid,
mtu=self.mtu,
mps=self.mps,
initial_credits=self.peer_credits,
)
self.manager.le_coc_requests[identifier] = request
self.send_control_frame(request)
# Create a future to wait for the response
self.connection_result = asyncio.get_running_loop().create_future()
# Wait for the connection to succeed or fail
return await self.connection_result
async def disconnect(self) -> None:
# Check that we're connected
if self.state != self.State.CONNECTED:
raise InvalidStateError('not connected')
self._change_state(self.State.DISCONNECTING)
self.flush_output()
self.send_control_frame(
L2CAP_Disconnection_Request(
identifier=self.manager.next_identifier(self.connection),
destination_cid=self.destination_cid,
source_cid=self.source_cid,
)
)
# Create a future to wait for the state machine to get to a success or error
# state
self.disconnection_result = asyncio.get_running_loop().create_future()
return await self.disconnection_result
def abort(self) -> None:
if self.state == self.State.CONNECTED:
self._change_state(self.State.DISCONNECTED)
def on_pdu(self, pdu: bytes) -> None:
if self.sink is None:
logger.warning('received pdu without a sink')
return
if self.state != self.State.CONNECTED:
logger.warning('received PDU while not connected, dropping')
# Manage the peer credits
if self.peer_credits == 0:
logger.warning('received LE frame when peer out of credits')
else:
self.peer_credits -= 1
if self.peer_credits <= self.peer_credits_threshold:
# The credits fell below the threshold, replenish them to the max
self.send_control_frame(
L2CAP_LE_Flow_Control_Credit(
identifier=self.manager.next_identifier(self.connection),
cid=self.source_cid,
credits=self.peer_max_credits - self.peer_credits,
)
)
self.peer_credits = self.peer_max_credits
# Check if this starts a new SDU
if self.in_sdu is None:
# Start a new SDU
self.in_sdu = pdu
else:
# Continue an SDU
self.in_sdu += pdu
# Check if the SDU is complete
if self.in_sdu_length == 0:
# We don't know the size yet, check if we have received the header to
# compute it
if len(self.in_sdu) >= 2:
self.in_sdu_length = struct.unpack_from('<H', self.in_sdu, 0)[0]
if self.in_sdu_length == 0:
# We'll compute it later
return
if len(self.in_sdu) < 2 + self.in_sdu_length:
# Not complete yet
logger.debug(
f'SDU: {len(self.in_sdu) - 2} of {self.in_sdu_length} bytes received'
)
return
if len(self.in_sdu) != 2 + self.in_sdu_length:
# Overflow
logger.warning(
f'SDU overflow: sdu_length={self.in_sdu_length}, '
f'received {len(self.in_sdu) - 2}'
)
# TODO: we should disconnect
self.in_sdu = None
self.in_sdu_length = 0
return
# Send the SDU to the sink
logger.debug(f'SDU complete: 2+{len(self.in_sdu) - 2} bytes')
self.sink(self.in_sdu[2:]) # pylint: disable=not-callable
# Prepare for a new SDU
self.in_sdu = None
self.in_sdu_length = 0
def on_connection_response(
self, response: L2CAP_LE_Credit_Based_Connection_Response
) -> None:
# Look for a matching pending response result
if self.connection_result is None:
logger.warning(
f'received unexpected connection response (id={response.identifier})'
)
return
if (
response.result
== L2CAP_LE_Credit_Based_Connection_Response.Result.CONNECTION_SUCCESSFUL
):
self.destination_cid = response.destination_cid
self.peer_mtu = response.mtu
self.peer_mps = response.mps
self.credits = response.initial_credits
self.connected = True
self.connection_result.set_result(self)
self._change_state(self.State.CONNECTED)
else:
self.connection_result.set_exception(
ProtocolError(
response.result,
'l2cap',
L2CAP_LE_Credit_Based_Connection_Response.Result(
response.result
).name,
)
)
self._change_state(self.State.CONNECTION_ERROR)
# Cleanup
self.connection_result = None
def on_credits(self, credits: int) -> None: # pylint: disable=redefined-builtin
self.credits += credits
logger.debug(f'received {credits} credits, total = {self.credits}')
# Try to send more data if we have any queued up
self.process_output()
def on_disconnection_request(self, request: L2CAP_Disconnection_Request) -> None:
self.send_control_frame(
L2CAP_Disconnection_Response(
identifier=request.identifier,
destination_cid=request.destination_cid,
source_cid=request.source_cid,
)
)
self._change_state(self.State.DISCONNECTED)
self.flush_output()
def on_disconnection_response(self, response: L2CAP_Disconnection_Response) -> None:
if self.state != self.State.DISCONNECTING:
logger.warning(color('invalid state', 'red'))
return
if (
response.destination_cid != self.destination_cid
or response.source_cid != self.source_cid
):
logger.warning('unexpected source or destination CID')
return
self._change_state(self.State.DISCONNECTED)
if self.disconnection_result:
self.disconnection_result.set_result(None)
self.disconnection_result = None
def flush_output(self) -> None:
self.out_queue.clear()
self.out_sdu = None
def process_output(self) -> None:
while self.credits > 0:
if self.out_sdu is not None:
# Finish the current SDU
packet = self.out_sdu[: self.peer_mps]
self.send_pdu(packet)
self.credits -= 1
logger.debug(f'sent {len(packet)} bytes, {self.credits} credits left')
if len(packet) == len(self.out_sdu):
# We sent everything
self.out_sdu = None
else:
# Keep what's still left to send
self.out_sdu = self.out_sdu[len(packet) :]
continue
if self.out_queue:
# Create the next SDU (2 bytes header plus up to MTU bytes payload)
logger.debug(
f'assembling SDU from {len(self.out_queue)} packets in output queue'
)
payload = b''
while self.out_queue and len(payload) < self.peer_mtu:
# We can add more data to the payload
chunk = self.out_queue[0][: self.peer_mtu - len(payload)]
payload += chunk
self.out_queue[0] = self.out_queue[0][len(chunk) :]
if len(self.out_queue[0]) == 0:
# We consumed the entire buffer, remove it
self.out_queue.popleft()
logger.debug(
f'packet completed, {len(self.out_queue)} left in queue'
)
# Construct the SDU with its header
assert len(payload) != 0
logger.debug(f'SDU complete: {len(payload)} payload bytes')
self.out_sdu = struct.pack('<H', len(payload)) + payload
else:
# Nothing left to send for now
self.drained.set()
return
def write(self, data: bytes) -> None:
if self.state != self.State.CONNECTED:
logger.warning('not connected, dropping data')
return
# Queue the data
self.out_queue.append(data)
self.drained.clear()
logger.debug(
f'{len(data)} bytes packet queued, {len(self.out_queue)} packets in queue'
)
# Send what we can
self.process_output()
async def drain(self) -> None:
await self.drained.wait()
def pause_reading(self) -> None:
# TODO: not implemented yet
pass
def resume_reading(self) -> None:
# TODO: not implemented yet
pass
def __str__(self) -> str:
return (
f'CoC({self.source_cid}->{self.destination_cid}, '
f'State={self.state.name}, '
f'PSM={self.le_psm}, '
f'MTU={self.mtu}/{self.peer_mtu}, '
f'MPS={self.mps}/{self.peer_mps}, '
f'credits={self.credits}/{self.peer_credits})'
)
# -----------------------------------------------------------------------------
class ClassicChannelServer(utils.EventEmitter):
EVENT_CONNECTION = "connection"
def __init__(
self,
manager: ChannelManager,
psm: int,
handler: Optional[Callable[[ClassicChannel], Any]],
mtu: int,
) -> None:
super().__init__()
self.manager = manager
self.handler = handler
self.psm = psm
self.mtu = mtu
def on_connection(self, channel: ClassicChannel) -> None:
self.emit(self.EVENT_CONNECTION, channel)
if self.handler:
self.handler(channel)
def close(self) -> None:
if self.psm in self.manager.servers:
del self.manager.servers[self.psm]
# -----------------------------------------------------------------------------
class LeCreditBasedChannelServer(utils.EventEmitter):
EVENT_CONNECTION = "connection"
def __init__(
self,
manager: ChannelManager,
psm: int,
handler: Optional[Callable[[LeCreditBasedChannel], Any]],
max_credits: int,
mtu: int,
mps: int,
) -> None:
super().__init__()
self.manager = manager
self.handler = handler
self.psm = psm
self.max_credits = max_credits
self.mtu = mtu
self.mps = mps
def on_connection(self, channel: LeCreditBasedChannel) -> None:
self.emit(self.EVENT_CONNECTION, channel)
if self.handler:
self.handler(channel)
def close(self) -> None:
if self.psm in self.manager.le_coc_servers:
del self.manager.le_coc_servers[self.psm]
# -----------------------------------------------------------------------------
class ChannelManager:
identifiers: dict[int, int]
channels: dict[int, dict[int, Union[ClassicChannel, LeCreditBasedChannel]]]
servers: dict[int, ClassicChannelServer]
le_coc_channels: dict[int, dict[int, LeCreditBasedChannel]]
le_coc_servers: dict[int, LeCreditBasedChannelServer]
le_coc_requests: dict[int, L2CAP_LE_Credit_Based_Connection_Request]
fixed_channels: dict[int, Optional[Callable[[int, bytes], Any]]]
_host: Optional[Host]
connection_parameters_update_response: Optional[asyncio.Future[int]]
def __init__(
self,
extended_features: Iterable[int] = (),
connectionless_mtu: int = L2CAP_DEFAULT_CONNECTIONLESS_MTU,
) -> None:
self._host = None
self.identifiers = {} # Incrementing identifier values by connection
self.channels = {} # All channels, mapped by connection and source cid
self.fixed_channels = { # Fixed channel handlers, mapped by cid
L2CAP_SIGNALING_CID: None,
L2CAP_LE_SIGNALING_CID: None,
}
self.servers = {} # Servers accepting connections, by PSM
self.le_coc_channels = (
{}
) # LE CoC channels, mapped by connection and destination cid
self.le_coc_servers = {} # LE CoC - Servers accepting connections, by PSM
self.le_coc_requests = {} # LE CoC connection requests, by identifier
self.extended_features = extended_features
self.connectionless_mtu = connectionless_mtu
self.connection_parameters_update_response = None
@property
def host(self) -> Host:
assert self._host
return self._host
@host.setter
def host(self, host: Host) -> None:
if self._host is not None:
self._host.remove_listener('disconnection', self.on_disconnection)
self._host = host
if host is not None:
host.on('disconnection', self.on_disconnection)
def find_channel(self, connection_handle: int, cid: int):
if connection_channels := self.channels.get(connection_handle):
return connection_channels.get(cid)
return None
def find_le_coc_channel(self, connection_handle: int, cid: int):
if connection_channels := self.le_coc_channels.get(connection_handle):
return connection_channels.get(cid)
return None
@staticmethod
def find_free_br_edr_cid(channels: Iterable[int]) -> int:
# Pick the smallest valid CID that's not already in the list
# (not necessarily the most efficient algorithm, but the list of CID is
# very small in practice)
for cid in range(
L2CAP_ACL_U_DYNAMIC_CID_RANGE_START, L2CAP_ACL_U_DYNAMIC_CID_RANGE_END + 1
):
if cid not in channels:
return cid
raise OutOfResourcesError('no free CID available')
@staticmethod
def find_free_le_cid(channels: Iterable[int]) -> int:
# Pick the smallest valid CID that's not already in the list
# (not necessarily the most efficient algorithm, but the list of CID is
# very small in practice)
for cid in range(
L2CAP_LE_U_DYNAMIC_CID_RANGE_START, L2CAP_LE_U_DYNAMIC_CID_RANGE_END + 1
):
if cid not in channels:
return cid
raise OutOfResourcesError('no free CID')
def next_identifier(self, connection: Connection) -> int:
identifier = (self.identifiers.setdefault(connection.handle, 0) + 1) % 256
# 0x00 is an invalid ID (BT Core Spec, Vol 3, Part A, Sect 4
if identifier == 0:
identifier = 1
self.identifiers[connection.handle] = identifier
return identifier
def register_fixed_channel(
self, cid: int, handler: Callable[[int, bytes], Any]
) -> None:
self.fixed_channels[cid] = handler
def deregister_fixed_channel(self, cid: int) -> None:
if cid in self.fixed_channels:
del self.fixed_channels[cid]
def create_classic_server(
self,
spec: ClassicChannelSpec,
handler: Optional[Callable[[ClassicChannel], Any]] = None,
) -> ClassicChannelServer:
if not spec.psm:
# Find a free PSM
for candidate in range(
L2CAP_PSM_DYNAMIC_RANGE_START, L2CAP_PSM_DYNAMIC_RANGE_END + 1, 2
):
if (candidate >> 8) % 2 == 1:
continue
if candidate in self.servers:
continue
spec.psm = candidate
break
else:
raise InvalidStateError('no free PSM')
else:
# Check that the PSM isn't already in use
if spec.psm in self.servers:
raise InvalidArgumentError('PSM already in use')
# Check that the PSM is valid
if spec.psm % 2 == 0:
raise InvalidArgumentError('invalid PSM (not odd)')
check = spec.psm >> 8
while check:
if check % 2 != 0:
raise InvalidArgumentError('invalid PSM')
check >>= 8
self.servers[spec.psm] = ClassicChannelServer(self, spec.psm, handler, spec.mtu)
return self.servers[spec.psm]
def create_le_credit_based_server(
self,
spec: LeCreditBasedChannelSpec,
handler: Optional[Callable[[LeCreditBasedChannel], Any]] = None,
) -> LeCreditBasedChannelServer:
if not spec.psm:
# Find a free PSM
for candidate in range(
L2CAP_LE_PSM_DYNAMIC_RANGE_START, L2CAP_LE_PSM_DYNAMIC_RANGE_END + 1
):
if candidate in self.le_coc_servers:
continue
spec.psm = candidate
break
else:
raise InvalidStateError('no free PSM')
else:
# Check that the PSM isn't already in use
if spec.psm in self.le_coc_servers:
raise InvalidArgumentError('PSM already in use')
self.le_coc_servers[spec.psm] = LeCreditBasedChannelServer(
self,
spec.psm,
handler,
max_credits=spec.max_credits,
mtu=spec.mtu,
mps=spec.mps,
)
return self.le_coc_servers[spec.psm]
def on_disconnection(self, connection_handle: int, _reason: int) -> None:
logger.debug(f'disconnection from {connection_handle}, cleaning up channels')
if connection_handle in self.channels:
for _, channel in self.channels[connection_handle].items():
channel.abort()
del self.channels[connection_handle]
if connection_handle in self.le_coc_channels:
for _, channel in self.le_coc_channels[connection_handle].items():
channel.abort()
del self.le_coc_channels[connection_handle]
if connection_handle in self.identifiers:
del self.identifiers[connection_handle]
def send_pdu(self, connection, cid: int, pdu: Union[SupportsBytes, bytes]) -> None:
pdu_str = pdu.hex() if isinstance(pdu, bytes) else str(pdu)
pdu_bytes = bytes(pdu)
logger.debug(
f'{color(">>> Sending L2CAP PDU", "blue")} '
f'on connection [0x{connection.handle:04X}] (CID={cid}) '
f'{connection.peer_address}: {len(pdu_bytes)} bytes, {pdu_str}'
)
self.host.send_l2cap_pdu(connection.handle, cid, pdu_bytes)
def on_pdu(self, connection: Connection, cid: int, pdu: bytes) -> None:
if cid in (L2CAP_SIGNALING_CID, L2CAP_LE_SIGNALING_CID):
# Parse the L2CAP payload into a Control Frame object
control_frame = L2CAP_Control_Frame.from_bytes(pdu)
self.on_control_frame(connection, cid, control_frame)
elif cid in self.fixed_channels:
handler = self.fixed_channels[cid]
assert handler is not None
handler(connection.handle, pdu)
else:
if (channel := self.find_channel(connection.handle, cid)) is None:
logger.warning(
color(
f'channel not found for 0x{connection.handle:04X}:{cid}', 'red'
)
)
return
channel.on_pdu(pdu)
def send_control_frame(
self, connection: Connection, cid: int, control_frame: L2CAP_Control_Frame
) -> None:
logger.debug(
f'{color(">>> Sending L2CAP Signaling Control Frame", "blue")} '
f'on connection [0x{connection.handle:04X}] (CID={cid}) '
f'{connection.peer_address}:\n{control_frame}'
)
self.host.send_l2cap_pdu(connection.handle, cid, bytes(control_frame))
def on_control_frame(
self, connection: Connection, cid: int, control_frame: L2CAP_Control_Frame
) -> None:
logger.debug(
f'{color("<<< Received L2CAP Signaling Control Frame", "green")} '
f'on connection [0x{connection.handle:04X}] (CID={cid}) '
f'{connection.peer_address}:\n{control_frame}'
)
# Find the handler method
handler_name = f'on_{control_frame.name.lower()}'
handler = getattr(self, handler_name, None)
if handler:
try:
handler(connection, cid, control_frame)
except Exception:
logger.exception(color("!!! Exception in handler:", "red"))
self.send_control_frame(
connection,
cid,
L2CAP_Command_Reject(
identifier=control_frame.identifier,
reason=L2CAP_COMMAND_NOT_UNDERSTOOD_REASON,
data=b'',
),
)
raise
else:
logger.error(color('Channel Manager command not handled???', 'red'))
self.send_control_frame(
connection,
cid,
L2CAP_Command_Reject(
identifier=control_frame.identifier,
reason=L2CAP_COMMAND_NOT_UNDERSTOOD_REASON,
data=b'',
),
)
def on_l2cap_command_reject(
self, _connection: Connection, _cid: int, packet: L2CAP_Command_Reject
) -> None:
logger.warning(f'{color("!!! Command rejected:", "red")} {packet.reason}')
def on_l2cap_connection_request(
self, connection: Connection, cid: int, request: L2CAP_Connection_Request
) -> None:
# Check if there's a server for this PSM
server = self.servers.get(request.psm)
if server:
# Find a free CID for this new channel
connection_channels = self.channels.setdefault(connection.handle, {})
source_cid = self.find_free_br_edr_cid(connection_channels)
if source_cid is None: # Should never happen!
self.send_control_frame(
connection,
cid,
L2CAP_Connection_Response(
identifier=request.identifier,
destination_cid=request.source_cid,
source_cid=0,
# pylint: disable=line-too-long
result=L2CAP_Connection_Response.Result.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE,
status=0x0000,
),
)
return
# Create a new channel
logger.debug(
f'creating server channel with cid={source_cid} for psm {request.psm}'
)
channel = ClassicChannel(
self, connection, cid, request.psm, source_cid, server.mtu
)
connection_channels[source_cid] = channel
# Notify
server.on_connection(channel)
channel.on_connection_request(request)
else:
logger.warning(
f'No server for connection 0x{connection.handle:04X} '
f'on PSM {request.psm}'
)
self.send_control_frame(
connection,
cid,
L2CAP_Connection_Response(
identifier=request.identifier,
destination_cid=request.source_cid,
source_cid=0,
# pylint: disable=line-too-long
result=L2CAP_Connection_Response.Result.CONNECTION_REFUSED_PSM_NOT_SUPPORTED,
status=0x0000,
),
)
def on_l2cap_connection_response(
self,
connection: Connection,
cid: int,
response: L2CAP_Connection_Response,
) -> None:
if (
channel := self.find_channel(connection.handle, response.source_cid)
) is None:
logger.warning(
color(
f'channel {response.source_cid} not found for '
f'0x{connection.handle:04X}:{cid}',
'red',
)
)
return
channel.on_connection_response(response)
def on_l2cap_configure_request(
self, connection: Connection, cid: int, request: L2CAP_Configure_Request
) -> None:
if (
channel := self.find_channel(connection.handle, request.destination_cid)
) is None:
logger.warning(
color(
f'channel {request.destination_cid} not found for '
f'0x{connection.handle:04X}:{cid}',
'red',
)
)
return
channel.on_configure_request(request)
def on_l2cap_configure_response(
self, connection: Connection, cid: int, response: L2CAP_Configure_Response
) -> None:
if (
channel := self.find_channel(connection.handle, response.source_cid)
) is None:
logger.warning(
color(
f'channel {response.source_cid} not found for '
f'0x{connection.handle:04X}:{cid}',
'red',
)
)
return
channel.on_configure_response(response)
def on_l2cap_disconnection_request(
self, connection: Connection, cid: int, request: L2CAP_Disconnection_Request
) -> None:
if (
channel := self.find_channel(connection.handle, request.destination_cid)
) is None:
logger.warning(
color(
f'channel {request.destination_cid} not found for '
f'0x{connection.handle:04X}:{cid}',
'red',
)
)
return
channel.on_disconnection_request(request)
def on_l2cap_disconnection_response(
self, connection: Connection, cid: int, response: L2CAP_Disconnection_Response
) -> None:
if (
channel := self.find_channel(connection.handle, response.source_cid)
) is None:
logger.warning(
color(
f'channel {response.source_cid} not found for '
f'0x{connection.handle:04X}:{cid}',
'red',
)
)
return
channel.on_disconnection_response(response)
def on_l2cap_echo_request(
self, connection: Connection, cid: int, request: L2CAP_Echo_Request
) -> None:
logger.debug(f'<<< Echo request: data={request.data.hex()}')
self.send_control_frame(
connection,
cid,
L2CAP_Echo_Response(identifier=request.identifier, data=request.data),
)
def on_l2cap_echo_response(
self, _connection: Connection, _cid: int, response: L2CAP_Echo_Response
) -> None:
logger.debug(f'<<< Echo response: data={response.data.hex()}')
# TODO notify listeners
def on_l2cap_information_request(
self, connection: Connection, cid: int, request: L2CAP_Information_Request
) -> None:
if request.info_type == L2CAP_Information_Request.InfoType.CONNECTIONLESS_MTU:
result = L2CAP_Information_Response.Result.SUCCESS
data = self.connectionless_mtu.to_bytes(2, 'little')
elif (
request.info_type
== L2CAP_Information_Request.InfoType.EXTENDED_FEATURES_SUPPORTED
):
result = L2CAP_Information_Response.Result.SUCCESS
data = sum(self.extended_features).to_bytes(4, 'little')
elif (
request.info_type
== L2CAP_Information_Request.InfoType.FIXED_CHANNELS_SUPPORTED
):
result = L2CAP_Information_Response.Result.SUCCESS
data = sum(1 << cid for cid in self.fixed_channels).to_bytes(8, 'little')
else:
result = L2CAP_Information_Response.Result.NOT_SUPPORTED
data = b''
self.send_control_frame(
connection,
cid,
L2CAP_Information_Response(
identifier=request.identifier,
info_type=request.info_type,
result=result,
data=data,
),
)
def on_l2cap_connection_parameter_update_request(
self,
connection: Connection,
cid: int,
request: L2CAP_Connection_Parameter_Update_Request,
):
if connection.role == hci.Role.CENTRAL:
self.send_control_frame(
connection,
cid,
L2CAP_Connection_Parameter_Update_Response(
identifier=request.identifier,
result=L2CAP_CONNECTION_PARAMETERS_ACCEPTED_RESULT,
),
)
self.host.send_command_sync(
hci.HCI_LE_Connection_Update_Command(
connection_handle=connection.handle,
connection_interval_min=request.interval_min,
connection_interval_max=request.interval_max,
max_latency=request.latency,
supervision_timeout=request.timeout,
min_ce_length=0,
max_ce_length=0,
)
)
else:
self.send_control_frame(
connection,
cid,
L2CAP_Connection_Parameter_Update_Response(
identifier=request.identifier,
result=L2CAP_CONNECTION_PARAMETERS_REJECTED_RESULT,
),
)
async def update_connection_parameters(
self,
connection: Connection,
interval_min: int,
interval_max: int,
latency: int,
timeout: int,
) -> int:
# Check that there isn't already a request pending
if self.connection_parameters_update_response:
raise InvalidStateError('request already pending')
self.connection_parameters_update_response = (
asyncio.get_running_loop().create_future()
)
self.send_control_frame(
connection,
L2CAP_LE_SIGNALING_CID,
L2CAP_Connection_Parameter_Update_Request(
identifier=self.next_identifier(connection),
interval_min=interval_min,
interval_max=interval_max,
latency=latency,
timeout=timeout,
),
)
return await self.connection_parameters_update_response
def on_l2cap_connection_parameter_update_response(
self,
connection: Connection,
cid: int,
response: L2CAP_Connection_Parameter_Update_Response,
) -> None:
if self.connection_parameters_update_response:
self.connection_parameters_update_response.set_result(response.result)
self.connection_parameters_update_response = None
else:
logger.warning(
color(
'received l2cap_connection_parameter_update_response without a pending request',
'red',
)
)
def on_l2cap_le_credit_based_connection_request(
self,
connection: Connection,
cid: int,
request: L2CAP_LE_Credit_Based_Connection_Request,
) -> None:
if request.le_psm in self.le_coc_servers:
server = self.le_coc_servers[request.le_psm]
# Check that the CID isn't already used
le_connection_channels = self.le_coc_channels.setdefault(
connection.handle, {}
)
if request.source_cid in le_connection_channels:
logger.warning(f'source CID {request.source_cid} already in use')
self.send_control_frame(
connection,
cid,
L2CAP_LE_Credit_Based_Connection_Response(
identifier=request.identifier,
destination_cid=0,
mtu=server.mtu,
mps=server.mps,
initial_credits=0,
# pylint: disable=line-too-long
result=L2CAP_LE_Credit_Based_Connection_Response.Result.CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED,
),
)
return
# Find a free CID for this new channel
connection_channels = self.channels.setdefault(connection.handle, {})
source_cid = self.find_free_le_cid(connection_channels)
if source_cid is None: # Should never happen!
self.send_control_frame(
connection,
cid,
L2CAP_LE_Credit_Based_Connection_Response(
identifier=request.identifier,
destination_cid=0,
mtu=server.mtu,
mps=server.mps,
initial_credits=0,
# pylint: disable=line-too-long
result=L2CAP_LE_Credit_Based_Connection_Response.Result.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE,
),
)
return
# Create a new channel
logger.debug(
f'creating LE CoC server channel with cid={source_cid} for psm '
f'{request.le_psm}'
)
channel = LeCreditBasedChannel(
self,
connection,
request.le_psm,
source_cid,
request.source_cid,
server.mtu,
server.mps,
request.initial_credits,
request.mtu,
request.mps,
server.max_credits,
True,
)
connection_channels[source_cid] = channel
le_connection_channels[request.source_cid] = channel
# Respond
self.send_control_frame(
connection,
cid,
L2CAP_LE_Credit_Based_Connection_Response(
identifier=request.identifier,
destination_cid=source_cid,
mtu=server.mtu,
mps=server.mps,
initial_credits=server.max_credits,
# pylint: disable=line-too-long
result=L2CAP_LE_Credit_Based_Connection_Response.Result.CONNECTION_SUCCESSFUL,
),
)
# Notify
server.on_connection(channel)
else:
logger.info(
f'No LE server for connection 0x{connection.handle:04X} '
f'on PSM {request.le_psm}'
)
self.send_control_frame(
connection,
cid,
L2CAP_LE_Credit_Based_Connection_Response(
identifier=request.identifier,
destination_cid=0,
mtu=L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU,
mps=L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS,
initial_credits=0,
# pylint: disable=line-too-long
result=L2CAP_LE_Credit_Based_Connection_Response.Result.CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED,
),
)
def on_l2cap_le_credit_based_connection_response(
self,
connection: Connection,
_cid: int,
response: L2CAP_LE_Credit_Based_Connection_Response,
) -> None:
# Find the pending request by identifier
request = self.le_coc_requests.get(response.identifier)
if request is None:
logger.warning(color('!!! received response for unknown request', 'red'))
return
del self.le_coc_requests[response.identifier]
# Find the channel for this request
channel = self.find_channel(connection.handle, request.source_cid)
if channel is None:
logger.warning(
color(
'received connection response for an unknown channel '
f'(cid={request.source_cid})',
'red',
)
)
return
# Process the response
channel.on_connection_response(response)
def on_l2cap_le_flow_control_credit(
self, connection: Connection, _cid: int, credit: L2CAP_LE_Flow_Control_Credit
) -> None:
channel = self.find_le_coc_channel(connection.handle, credit.cid)
if channel is None:
logger.warning(f'received credits for an unknown channel (cid={credit.cid}')
return
channel.on_credits(credit.credits)
def on_channel_closed(self, channel: ClassicChannel) -> None:
connection_channels = self.channels.get(channel.connection.handle)
if connection_channels:
if channel.source_cid in connection_channels:
del connection_channels[channel.source_cid]
async def create_le_credit_based_channel(
self,
connection: Connection,
spec: LeCreditBasedChannelSpec,
) -> LeCreditBasedChannel:
# Find a free CID for the new channel
connection_channels = self.channels.setdefault(connection.handle, {})
source_cid = self.find_free_le_cid(connection_channels)
if source_cid is None: # Should never happen!
raise OutOfResourcesError('all CIDs already in use')
if spec.psm is None:
raise InvalidArgumentError('PSM cannot be None')
# Create the channel
logger.debug(f'creating coc channel with cid={source_cid} for psm {spec.psm}')
channel = LeCreditBasedChannel(
manager=self,
connection=connection,
le_psm=spec.psm,
source_cid=source_cid,
destination_cid=0,
mtu=spec.mtu,
mps=spec.mps,
credits=0,
peer_mtu=0,
peer_mps=0,
peer_credits=spec.max_credits,
connected=False,
)
connection_channels[source_cid] = channel
# Connect
try:
await channel.connect()
except Exception:
logger.exception('connection failed')
del connection_channels[source_cid]
raise
# Remember the channel by source CID and destination CID
le_connection_channels = self.le_coc_channels.setdefault(connection.handle, {})
le_connection_channels[channel.destination_cid] = channel
return channel
async def create_classic_channel(
self, connection: Connection, spec: ClassicChannelSpec
) -> ClassicChannel:
# NOTE: this implementation hard-codes BR/EDR
# Find a free CID for a new channel
connection_channels = self.channels.setdefault(connection.handle, {})
source_cid = self.find_free_br_edr_cid(connection_channels)
if source_cid is None: # Should never happen!
raise OutOfResourcesError('all CIDs already in use')
if spec.psm is None:
raise InvalidArgumentError('PSM cannot be None')
# Create the channel
logger.debug(
f'creating client channel with cid={source_cid} for psm {spec.psm}'
)
channel = ClassicChannel(
self,
connection,
L2CAP_SIGNALING_CID,
spec.psm,
source_cid,
spec.mtu,
)
connection_channels[source_cid] = channel
# Connect
try:
await channel.connect()
except BaseException as e:
del connection_channels[source_cid]
raise e
return channel