mirror of
https://github.com/google/bumble.git
synced 2026-04-16 00:25:31 +00:00
1164 lines
40 KiB
Python
1164 lines
40 KiB
Python
# Copyright 2021-2022 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# https://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Imports
|
|
# -----------------------------------------------------------------------------
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import collections
|
|
import dataclasses
|
|
import enum
|
|
import logging
|
|
from collections.abc import Callable
|
|
from typing import TYPE_CHECKING
|
|
|
|
from typing_extensions import Self
|
|
|
|
from bumble import core, l2cap, sdp, utils
|
|
from bumble.colors import color
|
|
from bumble.core import (
|
|
BT_L2CAP_PROTOCOL_ID,
|
|
BT_RFCOMM_PROTOCOL_ID,
|
|
UUID,
|
|
InvalidArgumentError,
|
|
InvalidPacketError,
|
|
InvalidStateError,
|
|
PhysicalTransport,
|
|
ProtocolError,
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
from bumble.device import Connection, Device
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Logging
|
|
# -----------------------------------------------------------------------------
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Constants
|
|
# -----------------------------------------------------------------------------
|
|
# fmt: off
|
|
|
|
RFCOMM_PSM = 0x0003
|
|
DEFAULT_RX_QUEUE_SIZE = 32
|
|
|
|
class FrameType(enum.IntEnum):
|
|
SABM = 0x2F # Control field [1,1,1,1,_,1,0,0] LSB-first
|
|
UA = 0x63 # Control field [0,1,1,0,_,0,1,1] LSB-first
|
|
DM = 0x0F # Control field [1,1,1,1,_,0,0,0] LSB-first
|
|
DISC = 0x43 # Control field [0,1,0,_,0,0,1,1] LSB-first
|
|
UIH = 0xEF # Control field [1,1,1,_,1,1,1,1] LSB-first
|
|
UI = 0x03 # Control field [0,0,0,_,0,0,1,1] LSB-first
|
|
|
|
class MccType(enum.IntEnum):
|
|
PN = 0x20
|
|
MSC = 0x38
|
|
|
|
|
|
# FCS CRC
|
|
CRC_TABLE = bytes([
|
|
0X00, 0X91, 0XE3, 0X72, 0X07, 0X96, 0XE4, 0X75,
|
|
0X0E, 0X9F, 0XED, 0X7C, 0X09, 0X98, 0XEA, 0X7B,
|
|
0X1C, 0X8D, 0XFF, 0X6E, 0X1B, 0X8A, 0XF8, 0X69,
|
|
0X12, 0X83, 0XF1, 0X60, 0X15, 0X84, 0XF6, 0X67,
|
|
0X38, 0XA9, 0XDB, 0X4A, 0X3F, 0XAE, 0XDC, 0X4D,
|
|
0X36, 0XA7, 0XD5, 0X44, 0X31, 0XA0, 0XD2, 0X43,
|
|
0X24, 0XB5, 0XC7, 0X56, 0X23, 0XB2, 0XC0, 0X51,
|
|
0X2A, 0XBB, 0XC9, 0X58, 0X2D, 0XBC, 0XCE, 0X5F,
|
|
0X70, 0XE1, 0X93, 0X02, 0X77, 0XE6, 0X94, 0X05,
|
|
0X7E, 0XEF, 0X9D, 0X0C, 0X79, 0XE8, 0X9A, 0X0B,
|
|
0X6C, 0XFD, 0X8F, 0X1E, 0X6B, 0XFA, 0X88, 0X19,
|
|
0X62, 0XF3, 0X81, 0X10, 0X65, 0XF4, 0X86, 0X17,
|
|
0X48, 0XD9, 0XAB, 0X3A, 0X4F, 0XDE, 0XAC, 0X3D,
|
|
0X46, 0XD7, 0XA5, 0X34, 0X41, 0XD0, 0XA2, 0X33,
|
|
0X54, 0XC5, 0XB7, 0X26, 0X53, 0XC2, 0XB0, 0X21,
|
|
0X5A, 0XCB, 0XB9, 0X28, 0X5D, 0XCC, 0XBE, 0X2F,
|
|
0XE0, 0X71, 0X03, 0X92, 0XE7, 0X76, 0X04, 0X95,
|
|
0XEE, 0X7F, 0X0D, 0X9C, 0XE9, 0X78, 0X0A, 0X9B,
|
|
0XFC, 0X6D, 0X1F, 0X8E, 0XFB, 0X6A, 0X18, 0X89,
|
|
0XF2, 0X63, 0X11, 0X80, 0XF5, 0X64, 0X16, 0X87,
|
|
0XD8, 0X49, 0X3B, 0XAA, 0XDF, 0X4E, 0X3C, 0XAD,
|
|
0XD6, 0X47, 0X35, 0XA4, 0XD1, 0X40, 0X32, 0XA3,
|
|
0XC4, 0X55, 0X27, 0XB6, 0XC3, 0X52, 0X20, 0XB1,
|
|
0XCA, 0X5B, 0X29, 0XB8, 0XCD, 0X5C, 0X2E, 0XBF,
|
|
0X90, 0X01, 0X73, 0XE2, 0X97, 0X06, 0X74, 0XE5,
|
|
0X9E, 0X0F, 0X7D, 0XEC, 0X99, 0X08, 0X7A, 0XEB,
|
|
0X8C, 0X1D, 0X6F, 0XFE, 0X8B, 0X1A, 0X68, 0XF9,
|
|
0X82, 0X13, 0X61, 0XF0, 0X85, 0X14, 0X66, 0XF7,
|
|
0XA8, 0X39, 0X4B, 0XDA, 0XAF, 0X3E, 0X4C, 0XDD,
|
|
0XA6, 0X37, 0X45, 0XD4, 0XA1, 0X30, 0X42, 0XD3,
|
|
0XB4, 0X25, 0X57, 0XC6, 0XB3, 0X22, 0X50, 0XC1,
|
|
0XBA, 0X2B, 0X59, 0XC8, 0XBD, 0X2C, 0X5E, 0XCF
|
|
])
|
|
|
|
RFCOMM_DEFAULT_L2CAP_MTU = 2048
|
|
RFCOMM_DEFAULT_INITIAL_CREDITS = 7
|
|
RFCOMM_DEFAULT_MAX_CREDITS = 32
|
|
RFCOMM_DEFAULT_CREDIT_THRESHOLD = RFCOMM_DEFAULT_MAX_CREDITS // 2
|
|
RFCOMM_DEFAULT_MAX_FRAME_SIZE = 2000
|
|
|
|
RFCOMM_DYNAMIC_CHANNEL_NUMBER_START = 1
|
|
RFCOMM_DYNAMIC_CHANNEL_NUMBER_END = 30
|
|
|
|
# fmt: on
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
def make_service_sdp_records(
|
|
service_record_handle: int, channel: int, uuid: UUID | None = None
|
|
) -> list[sdp.ServiceAttribute]:
|
|
"""
|
|
Create SDP records for an RFComm service given a channel number and an
|
|
optional UUID. A Service Class Attribute is included only if the UUID is not None.
|
|
"""
|
|
records = [
|
|
sdp.ServiceAttribute(
|
|
sdp.SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
|
|
sdp.DataElement.unsigned_integer_32(service_record_handle),
|
|
),
|
|
sdp.ServiceAttribute(
|
|
sdp.SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
|
|
sdp.DataElement.sequence(
|
|
[sdp.DataElement.uuid(sdp.SDP_PUBLIC_BROWSE_ROOT)]
|
|
),
|
|
),
|
|
sdp.ServiceAttribute(
|
|
sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
|
sdp.DataElement.sequence(
|
|
[
|
|
sdp.DataElement.sequence(
|
|
[sdp.DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]
|
|
),
|
|
sdp.DataElement.sequence(
|
|
[
|
|
sdp.DataElement.uuid(BT_RFCOMM_PROTOCOL_ID),
|
|
sdp.DataElement.unsigned_integer_8(channel),
|
|
]
|
|
),
|
|
]
|
|
),
|
|
),
|
|
]
|
|
|
|
if uuid:
|
|
records.append(
|
|
sdp.ServiceAttribute(
|
|
sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
|
|
sdp.DataElement.sequence([sdp.DataElement.uuid(uuid)]),
|
|
)
|
|
)
|
|
|
|
return records
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
async def find_rfcomm_channels(connection: Connection) -> dict[int, list[UUID]]:
|
|
"""Searches all RFCOMM channels and their associated UUID from SDP service records.
|
|
|
|
Args:
|
|
connection: ACL connection to make SDP search.
|
|
|
|
Returns:
|
|
Dictionary mapping from channel number to service class UUID list.
|
|
"""
|
|
results = {}
|
|
async with sdp.Client(connection) as sdp_client:
|
|
search_result = await sdp_client.search_attributes(
|
|
uuids=[core.BT_RFCOMM_PROTOCOL_ID],
|
|
attribute_ids=[
|
|
sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
|
sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
|
|
],
|
|
)
|
|
for attribute_lists in search_result:
|
|
service_classes: list[UUID] = []
|
|
channel: int | None = None
|
|
for attribute in attribute_lists:
|
|
# The layout is [[L2CAP_PROTOCOL], [RFCOMM_PROTOCOL, RFCOMM_CHANNEL]].
|
|
if attribute.id == sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID:
|
|
protocol_descriptor_list = attribute.value.value
|
|
channel = protocol_descriptor_list[1].value[1].value
|
|
elif attribute.id == sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID:
|
|
service_class_id_list = attribute.value.value
|
|
service_classes = [
|
|
service_class.value for service_class in service_class_id_list
|
|
]
|
|
if not service_classes or not channel:
|
|
logger.warning(f"Bad result {attribute_lists}.")
|
|
else:
|
|
results[channel] = service_classes
|
|
return results
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
async def find_rfcomm_channel_with_uuid(
|
|
connection: Connection, uuid: str | UUID
|
|
) -> int | None:
|
|
"""Searches an RFCOMM channel associated with given UUID from service records.
|
|
|
|
Args:
|
|
connection: ACL connection to make SDP search.
|
|
uuid: UUID of service record to search for.
|
|
|
|
Returns:
|
|
RFCOMM channel number if found, otherwise None.
|
|
"""
|
|
if isinstance(uuid, str):
|
|
uuid = UUID(uuid)
|
|
return next(
|
|
(
|
|
channel
|
|
for channel, class_id_list in (
|
|
await find_rfcomm_channels(connection)
|
|
).items()
|
|
if uuid in class_id_list
|
|
),
|
|
None,
|
|
)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
def compute_fcs(buffer: bytes) -> int:
|
|
result = 0xFF
|
|
for byte in buffer:
|
|
result = CRC_TABLE[result ^ byte]
|
|
return 0xFF - result
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
class RFCOMM_Frame:
|
|
def __init__(
|
|
self,
|
|
frame_type: FrameType,
|
|
c_r: int,
|
|
dlci: int,
|
|
p_f: int,
|
|
information: bytes = b'',
|
|
with_credits: bool = False,
|
|
) -> None:
|
|
self.type = frame_type
|
|
self.c_r = c_r
|
|
self.dlci = dlci
|
|
self.p_f = p_f
|
|
self.information = information
|
|
length = len(information)
|
|
if with_credits:
|
|
length -= 1
|
|
if length > 0x7F:
|
|
# 2-byte length indicator
|
|
self.length = bytes([(length & 0x7F) << 1, (length >> 7) & 0xFF])
|
|
else:
|
|
# 1-byte length indicator
|
|
self.length = bytes([(length << 1) | 1])
|
|
self.address = (dlci << 2) | (c_r << 1) | 1
|
|
self.control = frame_type | (p_f << 4)
|
|
if frame_type == FrameType.UIH:
|
|
self.fcs = compute_fcs(bytes([self.address, self.control]))
|
|
else:
|
|
self.fcs = compute_fcs(bytes([self.address, self.control]) + self.length)
|
|
|
|
@staticmethod
|
|
def parse_mcc(data) -> tuple[int, bool, bytes]:
|
|
mcc_type = data[0] >> 2
|
|
c_r = bool((data[0] >> 1) & 1)
|
|
length = data[1]
|
|
if data[1] & 1:
|
|
length >>= 1
|
|
value = data[2:]
|
|
else:
|
|
length = (data[3] << 7) & (length >> 1)
|
|
value = data[3 : 3 + length]
|
|
|
|
return (mcc_type, c_r, value)
|
|
|
|
@staticmethod
|
|
def make_mcc(mcc_type: int, c_r: int, data: bytes) -> bytes:
|
|
return (
|
|
bytes([(mcc_type << 2 | c_r << 1 | 1) & 0xFF, (len(data) & 0x7F) << 1 | 1])
|
|
+ data
|
|
)
|
|
|
|
@staticmethod
|
|
def sabm(c_r: int, dlci: int):
|
|
return RFCOMM_Frame(FrameType.SABM, c_r, dlci, 1)
|
|
|
|
@staticmethod
|
|
def ua(c_r: int, dlci: int):
|
|
return RFCOMM_Frame(FrameType.UA, c_r, dlci, 1)
|
|
|
|
@staticmethod
|
|
def dm(c_r: int, dlci: int):
|
|
return RFCOMM_Frame(FrameType.DM, c_r, dlci, 1)
|
|
|
|
@staticmethod
|
|
def disc(c_r: int, dlci: int):
|
|
return RFCOMM_Frame(FrameType.DISC, c_r, dlci, 1)
|
|
|
|
@staticmethod
|
|
def uih(c_r: int, dlci: int, information: bytes, p_f: int = 0):
|
|
return RFCOMM_Frame(
|
|
FrameType.UIH, c_r, dlci, p_f, information, with_credits=(p_f == 1)
|
|
)
|
|
|
|
@staticmethod
|
|
def from_bytes(data: bytes) -> RFCOMM_Frame:
|
|
# Extract fields
|
|
dlci = (data[0] >> 2) & 0x3F
|
|
c_r = (data[0] >> 1) & 0x01
|
|
frame_type = FrameType(data[1] & 0xEF)
|
|
p_f = (data[1] >> 4) & 0x01
|
|
length = data[2]
|
|
if length & 0x01:
|
|
length >>= 1
|
|
information = data[3:-1]
|
|
else:
|
|
length = (data[3] << 7) & (length >> 1)
|
|
information = data[4:-1]
|
|
fcs = data[-1]
|
|
|
|
# Construct the frame and check the CRC
|
|
frame = RFCOMM_Frame(frame_type, c_r, dlci, p_f, information)
|
|
if frame.fcs != fcs:
|
|
logger.warning(f'FCS mismatch: got {fcs:02X}, expected {frame.fcs:02X}')
|
|
raise InvalidPacketError('fcs mismatch')
|
|
|
|
return frame
|
|
|
|
def __bytes__(self) -> bytes:
|
|
return (
|
|
bytes([self.address, self.control])
|
|
+ self.length
|
|
+ self.information
|
|
+ bytes([self.fcs])
|
|
)
|
|
|
|
def __str__(self) -> str:
|
|
return (
|
|
f'{color(self.type.name, "yellow")}'
|
|
f'(c/r={self.c_r},'
|
|
f'dlci={self.dlci},'
|
|
f'p/f={self.p_f},'
|
|
f'length={len(self.information)},'
|
|
f'fcs=0x{self.fcs:02X})'
|
|
)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
@dataclasses.dataclass
|
|
class RFCOMM_MCC_PN:
|
|
dlci: int
|
|
cl: int
|
|
priority: int
|
|
ack_timer: int
|
|
max_frame_size: int
|
|
max_retransmissions: int
|
|
initial_credits: int
|
|
|
|
def __post_init__(self) -> None:
|
|
if self.initial_credits < 1 or self.initial_credits > 7:
|
|
logger.warning(
|
|
f'Initial credits {self.initial_credits} is out of range [1, 7].'
|
|
)
|
|
|
|
@staticmethod
|
|
def from_bytes(data: bytes) -> RFCOMM_MCC_PN:
|
|
return RFCOMM_MCC_PN(
|
|
dlci=data[0],
|
|
cl=data[1],
|
|
priority=data[2],
|
|
ack_timer=data[3],
|
|
max_frame_size=data[4] | data[5] << 8,
|
|
max_retransmissions=data[6],
|
|
initial_credits=data[7] & 0x07,
|
|
)
|
|
|
|
def __bytes__(self) -> bytes:
|
|
return bytes(
|
|
[
|
|
self.dlci & 0xFF,
|
|
self.cl & 0xFF,
|
|
self.priority & 0xFF,
|
|
self.ack_timer & 0xFF,
|
|
self.max_frame_size & 0xFF,
|
|
(self.max_frame_size >> 8) & 0xFF,
|
|
self.max_retransmissions & 0xFF,
|
|
# Only 3 bits are meaningful.
|
|
self.initial_credits & 0x07,
|
|
]
|
|
)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
@dataclasses.dataclass
|
|
class RFCOMM_MCC_MSC:
|
|
dlci: int
|
|
fc: int
|
|
rtc: int
|
|
rtr: int
|
|
ic: int
|
|
dv: int
|
|
|
|
@staticmethod
|
|
def from_bytes(data: bytes) -> RFCOMM_MCC_MSC:
|
|
return RFCOMM_MCC_MSC(
|
|
dlci=data[0] >> 2,
|
|
fc=data[1] >> 1 & 1,
|
|
rtc=data[1] >> 2 & 1,
|
|
rtr=data[1] >> 3 & 1,
|
|
ic=data[1] >> 6 & 1,
|
|
dv=data[1] >> 7 & 1,
|
|
)
|
|
|
|
def __bytes__(self) -> bytes:
|
|
return bytes(
|
|
[
|
|
(self.dlci << 2) | 3,
|
|
1
|
|
| self.fc << 1
|
|
| self.rtc << 2
|
|
| self.rtr << 3
|
|
| self.ic << 6
|
|
| self.dv << 7,
|
|
]
|
|
)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
class DLC(utils.EventEmitter):
|
|
EVENT_OPEN = "open"
|
|
EVENT_CLOSE = "close"
|
|
|
|
class State(enum.IntEnum):
|
|
INIT = 0x00
|
|
CONNECTING = 0x01
|
|
CONNECTED = 0x02
|
|
DISCONNECTING = 0x03
|
|
DISCONNECTED = 0x04
|
|
RESET = 0x05
|
|
|
|
def __init__(
|
|
self,
|
|
multiplexer: Multiplexer,
|
|
dlci: int,
|
|
tx_max_frame_size: int,
|
|
tx_initial_credits: int,
|
|
rx_max_frame_size: int,
|
|
rx_initial_credits: int,
|
|
) -> None:
|
|
super().__init__()
|
|
self.multiplexer = multiplexer
|
|
self.dlci = dlci
|
|
self.rx_max_frame_size = rx_max_frame_size
|
|
self.rx_initial_credits = rx_initial_credits
|
|
self.rx_max_credits = RFCOMM_DEFAULT_MAX_CREDITS
|
|
self.rx_credits = rx_initial_credits
|
|
self.rx_credits_threshold = RFCOMM_DEFAULT_CREDIT_THRESHOLD
|
|
self.tx_max_frame_size = tx_max_frame_size
|
|
self.tx_credits = tx_initial_credits
|
|
self.tx_buffer = b''
|
|
self.state = DLC.State.INIT
|
|
self.role = multiplexer.role
|
|
self.c_r = 1 if self.role == Multiplexer.Role.INITIATOR else 0
|
|
self.connection_result: asyncio.Future | None = None
|
|
self.disconnection_result: asyncio.Future | None = None
|
|
self.drained = asyncio.Event()
|
|
self.drained.set()
|
|
# Queued packets when sink is not set.
|
|
self._enqueued_rx_packets: collections.deque[bytes] = collections.deque(
|
|
maxlen=DEFAULT_RX_QUEUE_SIZE
|
|
)
|
|
self._sink: Callable[[bytes], None] | None = None
|
|
|
|
# Compute the MTU
|
|
max_overhead = 4 + 1 # header with 2-byte length + fcs
|
|
self.mtu = min(
|
|
tx_max_frame_size, self.multiplexer.l2cap_channel.peer_mtu - max_overhead
|
|
)
|
|
|
|
@property
|
|
def sink(self) -> Callable[[bytes], None] | None:
|
|
return self._sink
|
|
|
|
@sink.setter
|
|
def sink(self, sink: Callable[[bytes], None] | None) -> None:
|
|
self._sink = sink
|
|
# Dump queued packets to sink
|
|
if sink:
|
|
for packet in self._enqueued_rx_packets:
|
|
sink(packet) # pylint: disable=not-callable
|
|
self._enqueued_rx_packets.clear()
|
|
|
|
def change_state(self, new_state: State) -> None:
|
|
logger.debug(f'{self} state change -> {color(new_state.name, "magenta")}')
|
|
self.state = new_state
|
|
|
|
def send_frame(self, frame: RFCOMM_Frame) -> None:
|
|
self.multiplexer.send_frame(frame)
|
|
|
|
def on_frame(self, frame: RFCOMM_Frame) -> None:
|
|
handler = getattr(self, f'on_{frame.type.name}_frame'.lower())
|
|
handler(frame)
|
|
|
|
def on_sabm_frame(self, _frame: RFCOMM_Frame) -> None:
|
|
if self.state != DLC.State.CONNECTING:
|
|
logger.warning(
|
|
color('!!! received SABM when not in CONNECTING state', 'red')
|
|
)
|
|
return
|
|
|
|
self.send_frame(RFCOMM_Frame.ua(c_r=1 - self.c_r, dlci=self.dlci))
|
|
|
|
# Exchange the modem status with the peer
|
|
msc = RFCOMM_MCC_MSC(dlci=self.dlci, fc=0, rtc=1, rtr=1, ic=0, dv=1)
|
|
mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.MSC, c_r=1, data=bytes(msc))
|
|
logger.debug(f'>>> MCC MSC Command: {msc}')
|
|
self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc))
|
|
|
|
self.change_state(DLC.State.CONNECTED)
|
|
self.emit(self.EVENT_OPEN)
|
|
|
|
def on_ua_frame(self, _frame: RFCOMM_Frame) -> None:
|
|
if self.state == DLC.State.CONNECTING:
|
|
# Exchange the modem status with the peer
|
|
msc = RFCOMM_MCC_MSC(dlci=self.dlci, fc=0, rtc=1, rtr=1, ic=0, dv=1)
|
|
mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.MSC, c_r=1, data=bytes(msc))
|
|
logger.debug(f'>>> MCC MSC Command: {msc}')
|
|
self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc))
|
|
|
|
self.change_state(DLC.State.CONNECTED)
|
|
if self.connection_result:
|
|
self.connection_result.set_result(None)
|
|
self.connection_result = None
|
|
self.multiplexer.on_dlc_open_complete(self)
|
|
elif self.state == DLC.State.DISCONNECTING:
|
|
self.change_state(DLC.State.DISCONNECTED)
|
|
if self.disconnection_result:
|
|
self.disconnection_result.set_result(None)
|
|
self.disconnection_result = None
|
|
self.multiplexer.on_dlc_disconnection(self)
|
|
self.emit(self.EVENT_CLOSE)
|
|
else:
|
|
logger.warning(
|
|
color(
|
|
(
|
|
'!!! received UA frame when not in '
|
|
'CONNECTING or DISCONNECTING state'
|
|
),
|
|
'red',
|
|
)
|
|
)
|
|
|
|
def on_dm_frame(self, frame: RFCOMM_Frame) -> None:
|
|
# TODO: handle all states
|
|
pass
|
|
|
|
def on_disc_frame(self, _frame: RFCOMM_Frame) -> None:
|
|
# TODO: handle all states
|
|
self.send_frame(RFCOMM_Frame.ua(c_r=1 - self.c_r, dlci=self.dlci))
|
|
|
|
def on_uih_frame(self, frame: RFCOMM_Frame) -> None:
|
|
data = frame.information
|
|
if frame.p_f == 1:
|
|
# With credits
|
|
received_credits = frame.information[0]
|
|
self.tx_credits += received_credits
|
|
|
|
logger.debug(
|
|
f'<<< Credits [{self.dlci}]: '
|
|
f'received {received_credits}, total={self.tx_credits}'
|
|
)
|
|
data = data[1:]
|
|
|
|
logger.debug(
|
|
f'{color("<<< Data", "yellow")} '
|
|
f'[{self.dlci}] {len(data)} bytes, '
|
|
f'rx_credits={self.rx_credits}: {data.hex()}'
|
|
)
|
|
if data:
|
|
if self._sink:
|
|
self._sink(data) # pylint: disable=not-callable
|
|
else:
|
|
self._enqueued_rx_packets.append(data)
|
|
if (
|
|
self._enqueued_rx_packets.maxlen
|
|
and len(self._enqueued_rx_packets) >= self._enqueued_rx_packets.maxlen
|
|
):
|
|
logger.warning(f'DLC [{self.dlci}] received packet queue is full')
|
|
|
|
# Update the credits
|
|
if self.rx_credits > 0:
|
|
self.rx_credits -= 1
|
|
else:
|
|
logger.warning(color('!!! received frame with no rx credits', 'red'))
|
|
|
|
# Check if there's anything to send (including credits)
|
|
self.process_tx()
|
|
|
|
def on_ui_frame(self, frame: RFCOMM_Frame) -> None:
|
|
pass
|
|
|
|
def on_mcc_msc(self, c_r: bool, msc: RFCOMM_MCC_MSC) -> None:
|
|
if c_r:
|
|
# Command
|
|
logger.debug(f'<<< MCC MSC Command: {msc}')
|
|
msc = RFCOMM_MCC_MSC(dlci=self.dlci, fc=0, rtc=1, rtr=1, ic=0, dv=1)
|
|
mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.MSC, c_r=0, data=bytes(msc))
|
|
logger.debug(f'>>> MCC MSC Response: {msc}')
|
|
self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc))
|
|
else:
|
|
# Response
|
|
logger.debug(f'<<< MCC MSC Response: {msc}')
|
|
|
|
def connect(self) -> None:
|
|
if self.state != DLC.State.INIT:
|
|
raise InvalidStateError('invalid state')
|
|
|
|
self.change_state(DLC.State.CONNECTING)
|
|
self.connection_result = asyncio.get_running_loop().create_future()
|
|
self.send_frame(RFCOMM_Frame.sabm(c_r=self.c_r, dlci=self.dlci))
|
|
|
|
async def disconnect(self) -> None:
|
|
if self.state != DLC.State.CONNECTED:
|
|
raise InvalidStateError('invalid state')
|
|
|
|
self.disconnection_result = asyncio.get_running_loop().create_future()
|
|
self.change_state(DLC.State.DISCONNECTING)
|
|
self.send_frame(
|
|
RFCOMM_Frame.disc(
|
|
c_r=1 if self.role == Multiplexer.Role.INITIATOR else 0, dlci=self.dlci
|
|
)
|
|
)
|
|
await self.disconnection_result
|
|
|
|
def accept(self) -> None:
|
|
if self.state != DLC.State.INIT:
|
|
raise InvalidStateError('invalid state')
|
|
|
|
pn = RFCOMM_MCC_PN(
|
|
dlci=self.dlci,
|
|
cl=0xE0,
|
|
priority=7,
|
|
ack_timer=0,
|
|
max_frame_size=self.rx_max_frame_size,
|
|
max_retransmissions=0,
|
|
initial_credits=self.rx_initial_credits,
|
|
)
|
|
mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.PN, c_r=0, data=bytes(pn))
|
|
logger.debug(f'>>> PN Response: {pn}')
|
|
self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc))
|
|
self.change_state(DLC.State.CONNECTING)
|
|
|
|
def rx_credits_needed(self) -> int:
|
|
if self.rx_credits <= self.rx_credits_threshold:
|
|
return self.rx_max_credits - self.rx_credits
|
|
|
|
return 0
|
|
|
|
def process_tx(self) -> None:
|
|
# Send anything we can (or an empty frame if we need to send rx credits)
|
|
rx_credits_needed = self.rx_credits_needed()
|
|
while (self.tx_buffer and self.tx_credits > 0) or rx_credits_needed > 0:
|
|
# Get the next chunk, up to MTU size
|
|
if rx_credits_needed > 0:
|
|
chunk = bytes([rx_credits_needed])
|
|
self.rx_credits += rx_credits_needed
|
|
if self.tx_buffer and self.tx_credits > 0:
|
|
chunk += self.tx_buffer[: self.mtu - 1]
|
|
self.tx_buffer = self.tx_buffer[len(chunk) - 1 :]
|
|
tx_credit_spent = True
|
|
else:
|
|
tx_credit_spent = False
|
|
else:
|
|
chunk = self.tx_buffer[: self.mtu]
|
|
self.tx_buffer = self.tx_buffer[len(chunk) :]
|
|
tx_credit_spent = True
|
|
|
|
# Update the tx credits
|
|
# (no tx credit spent for empty frames that only contain rx credits)
|
|
if tx_credit_spent:
|
|
self.tx_credits -= 1
|
|
|
|
# Send the frame
|
|
logger.debug(
|
|
f'>>> sending {len(chunk)} bytes with {rx_credits_needed} credits, '
|
|
f'rx_credits={self.rx_credits}, '
|
|
f'tx_credits={self.tx_credits}'
|
|
)
|
|
self.send_frame(
|
|
RFCOMM_Frame.uih(
|
|
c_r=self.c_r,
|
|
dlci=self.dlci,
|
|
information=chunk,
|
|
p_f=1 if rx_credits_needed > 0 else 0,
|
|
)
|
|
)
|
|
|
|
rx_credits_needed = 0
|
|
if not self.tx_buffer:
|
|
self.drained.set()
|
|
|
|
# Stream protocol
|
|
def write(self, data: bytes | str) -> None:
|
|
# We can only send bytes
|
|
if not isinstance(data, bytes):
|
|
if isinstance(data, str):
|
|
# Automatically convert strings to bytes using UTF-8
|
|
data = data.encode('utf-8')
|
|
else:
|
|
raise InvalidArgumentError('write only accept bytes or strings')
|
|
|
|
self.tx_buffer += data
|
|
self.drained.clear()
|
|
self.process_tx()
|
|
|
|
async def drain(self) -> None:
|
|
await self.drained.wait()
|
|
|
|
def abort(self) -> None:
|
|
logger.debug(f'aborting DLC: {self}')
|
|
if self.connection_result:
|
|
self.connection_result.cancel()
|
|
self.connection_result = None
|
|
if self.disconnection_result:
|
|
self.disconnection_result.cancel()
|
|
self.disconnection_result = None
|
|
self.change_state(DLC.State.RESET)
|
|
self.emit(self.EVENT_CLOSE)
|
|
|
|
def __str__(self) -> str:
|
|
return (
|
|
f'DLC(dlci={self.dlci}, '
|
|
f'state={self.state.name}, '
|
|
f'rx_max_frame_size={self.rx_max_frame_size}, '
|
|
f'rx_credits={self.rx_credits}, '
|
|
f'rx_max_credits={self.rx_max_credits}, '
|
|
f'tx_max_frame_size={self.tx_max_frame_size}, '
|
|
f'tx_credits={self.tx_credits}'
|
|
')'
|
|
)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
class Multiplexer(utils.EventEmitter):
|
|
class Role(enum.IntEnum):
|
|
INITIATOR = 0x00
|
|
RESPONDER = 0x01
|
|
|
|
class State(enum.IntEnum):
|
|
INIT = 0x00
|
|
CONNECTING = 0x01
|
|
CONNECTED = 0x02
|
|
OPENING = 0x03
|
|
DISCONNECTING = 0x04
|
|
DISCONNECTED = 0x05
|
|
RESET = 0x06
|
|
|
|
EVENT_DLC = "dlc"
|
|
|
|
connection_result: asyncio.Future | None
|
|
disconnection_result: asyncio.Future | None
|
|
open_result: asyncio.Future | None
|
|
acceptor: Callable[[int], tuple[int, int] | None] | None
|
|
dlcs: dict[int, DLC]
|
|
|
|
def __init__(self, l2cap_channel: l2cap.ClassicChannel, role: Role) -> None:
|
|
super().__init__()
|
|
self.role = role
|
|
self.l2cap_channel = l2cap_channel
|
|
self.state = Multiplexer.State.INIT
|
|
self.dlcs = {} # DLCs, by DLCI
|
|
self.connection_result = None
|
|
self.disconnection_result = None
|
|
self.open_result = None
|
|
self.open_pn: RFCOMM_MCC_PN | None = None
|
|
self.open_rx_max_credits = 0
|
|
self.acceptor = None
|
|
|
|
# Become a sink for the L2CAP channel
|
|
l2cap_channel.sink = self.on_pdu
|
|
|
|
l2cap_channel.on(l2cap_channel.EVENT_CLOSE, self.on_l2cap_channel_close)
|
|
|
|
def change_state(self, new_state: State) -> None:
|
|
logger.debug(f'{self} state change -> {color(new_state.name, "cyan")}')
|
|
self.state = new_state
|
|
|
|
def send_frame(self, frame: RFCOMM_Frame) -> None:
|
|
logger.debug(f'>>> Multiplexer sending {frame}')
|
|
self.l2cap_channel.write(bytes(frame))
|
|
|
|
def on_pdu(self, pdu: bytes) -> None:
|
|
frame = RFCOMM_Frame.from_bytes(pdu)
|
|
logger.debug(f'<<< Multiplexer received {frame}')
|
|
|
|
# Dispatch to this multiplexer or to a dlc, depending on the address
|
|
if frame.dlci == 0:
|
|
self.on_frame(frame)
|
|
else:
|
|
if frame.type == FrameType.DM:
|
|
# DM responses are for a DLCI, but since we only create the dlc when we
|
|
# receive a PN response (because we need the parameters), we handle DM
|
|
# frames at the Multiplexer level
|
|
self.on_dm_frame(frame)
|
|
else:
|
|
dlc = self.dlcs.get(frame.dlci)
|
|
if dlc is None:
|
|
logger.warning(f'no dlc for DLCI {frame.dlci}')
|
|
return
|
|
dlc.on_frame(frame)
|
|
|
|
def on_frame(self, frame: RFCOMM_Frame) -> None:
|
|
handler = getattr(self, f'on_{frame.type.name}_frame'.lower())
|
|
handler(frame)
|
|
|
|
def on_sabm_frame(self, _frame: RFCOMM_Frame) -> None:
|
|
if self.state != Multiplexer.State.INIT:
|
|
logger.debug('not in INIT state, ignoring SABM')
|
|
return
|
|
self.change_state(Multiplexer.State.CONNECTED)
|
|
self.send_frame(RFCOMM_Frame.ua(c_r=1, dlci=0))
|
|
|
|
def on_ua_frame(self, _frame: RFCOMM_Frame) -> None:
|
|
if self.state == Multiplexer.State.CONNECTING:
|
|
self.change_state(Multiplexer.State.CONNECTED)
|
|
if self.connection_result:
|
|
self.connection_result.set_result(0)
|
|
self.connection_result = None
|
|
elif self.state == Multiplexer.State.DISCONNECTING:
|
|
self.change_state(Multiplexer.State.DISCONNECTED)
|
|
if self.disconnection_result:
|
|
self.disconnection_result.set_result(None)
|
|
self.disconnection_result = None
|
|
|
|
def on_dm_frame(self, _frame: RFCOMM_Frame) -> None:
|
|
if self.state == Multiplexer.State.OPENING:
|
|
self.change_state(Multiplexer.State.CONNECTED)
|
|
if self.open_result:
|
|
self.open_result.set_exception(
|
|
core.ConnectionError(
|
|
core.ConnectionError.CONNECTION_REFUSED,
|
|
PhysicalTransport.BR_EDR,
|
|
self.l2cap_channel.connection.peer_address,
|
|
'rfcomm',
|
|
)
|
|
)
|
|
self.open_result = None
|
|
else:
|
|
logger.warning(f'unexpected state for DM: {self}')
|
|
|
|
def on_disc_frame(self, _frame: RFCOMM_Frame) -> None:
|
|
self.change_state(Multiplexer.State.DISCONNECTED)
|
|
self.send_frame(
|
|
RFCOMM_Frame.ua(
|
|
c_r=0 if self.role == Multiplexer.Role.INITIATOR else 1, dlci=0
|
|
)
|
|
)
|
|
|
|
def on_uih_frame(self, frame: RFCOMM_Frame) -> None:
|
|
(mcc_type, c_r, value) = RFCOMM_Frame.parse_mcc(frame.information)
|
|
|
|
if mcc_type == MccType.PN:
|
|
pn = RFCOMM_MCC_PN.from_bytes(value)
|
|
self.on_mcc_pn(c_r, pn)
|
|
elif mcc_type == MccType.MSC:
|
|
mcs = RFCOMM_MCC_MSC.from_bytes(value)
|
|
self.on_mcc_msc(c_r, mcs)
|
|
|
|
def on_ui_frame(self, frame: RFCOMM_Frame) -> None:
|
|
pass
|
|
|
|
def on_mcc_pn(self, c_r: bool, pn: RFCOMM_MCC_PN) -> None:
|
|
if c_r:
|
|
# Command
|
|
logger.debug(f'<<< PN Command: {pn}')
|
|
|
|
# Check with the multiplexer if there's an acceptor for this channel
|
|
if pn.dlci & 1:
|
|
# Not expected, this is an initiator-side number
|
|
# TODO: error out
|
|
logger.warning(f'invalid DLCI: {pn.dlci}')
|
|
else:
|
|
if self.acceptor:
|
|
channel_number = pn.dlci >> 1
|
|
if dlc_params := self.acceptor(channel_number):
|
|
# Create a new DLC
|
|
dlc = DLC(
|
|
self,
|
|
dlci=pn.dlci,
|
|
tx_max_frame_size=pn.max_frame_size,
|
|
tx_initial_credits=pn.initial_credits,
|
|
rx_max_frame_size=dlc_params[0],
|
|
rx_initial_credits=dlc_params[1],
|
|
)
|
|
self.dlcs[pn.dlci] = dlc
|
|
|
|
# Re-emit the handshake completion event
|
|
dlc.on(dlc.EVENT_OPEN, lambda: self.emit(self.EVENT_DLC, dlc))
|
|
|
|
# Respond to complete the handshake
|
|
dlc.accept()
|
|
else:
|
|
# No acceptor, we're in Disconnected Mode
|
|
self.send_frame(RFCOMM_Frame.dm(c_r=1, dlci=pn.dlci))
|
|
else:
|
|
# No acceptor?? shouldn't happen
|
|
logger.warning(color('!!! no acceptor registered', 'red'))
|
|
else:
|
|
# Response
|
|
logger.debug(f'>>> PN Response: {pn}')
|
|
if self.state == Multiplexer.State.OPENING:
|
|
assert self.open_pn
|
|
dlc = DLC(
|
|
self,
|
|
dlci=pn.dlci,
|
|
tx_max_frame_size=pn.max_frame_size,
|
|
tx_initial_credits=pn.initial_credits,
|
|
rx_max_frame_size=self.open_pn.max_frame_size,
|
|
rx_initial_credits=self.open_pn.initial_credits,
|
|
)
|
|
self.dlcs[pn.dlci] = dlc
|
|
self.open_pn = None
|
|
dlc.connect()
|
|
else:
|
|
logger.warning('ignoring PN response')
|
|
|
|
def on_mcc_msc(self, c_r: bool, msc: RFCOMM_MCC_MSC) -> None:
|
|
dlc = self.dlcs.get(msc.dlci)
|
|
if dlc is None:
|
|
logger.warning(f'no dlc for DLCI {msc.dlci}')
|
|
return
|
|
dlc.on_mcc_msc(c_r, msc)
|
|
|
|
async def connect(self) -> None:
|
|
if self.state != Multiplexer.State.INIT:
|
|
raise InvalidStateError('invalid state')
|
|
|
|
self.change_state(Multiplexer.State.CONNECTING)
|
|
self.connection_result = asyncio.get_running_loop().create_future()
|
|
self.send_frame(RFCOMM_Frame.sabm(c_r=1, dlci=0))
|
|
return await self.connection_result
|
|
|
|
async def disconnect(self) -> None:
|
|
if self.state != Multiplexer.State.CONNECTED:
|
|
return
|
|
|
|
self.disconnection_result = asyncio.get_running_loop().create_future()
|
|
self.change_state(Multiplexer.State.DISCONNECTING)
|
|
self.send_frame(
|
|
RFCOMM_Frame.disc(
|
|
c_r=1 if self.role == Multiplexer.Role.INITIATOR else 0, dlci=0
|
|
)
|
|
)
|
|
await self.disconnection_result
|
|
|
|
async def open_dlc(
|
|
self,
|
|
channel: int,
|
|
max_frame_size: int = RFCOMM_DEFAULT_MAX_FRAME_SIZE,
|
|
initial_credits: int = RFCOMM_DEFAULT_INITIAL_CREDITS,
|
|
) -> DLC:
|
|
if self.state != Multiplexer.State.CONNECTED:
|
|
if self.state == Multiplexer.State.OPENING:
|
|
raise InvalidStateError('open already in progress')
|
|
|
|
raise InvalidStateError('not connected')
|
|
|
|
self.open_pn = RFCOMM_MCC_PN(
|
|
dlci=channel << 1,
|
|
cl=0xF0,
|
|
priority=7,
|
|
ack_timer=0,
|
|
max_frame_size=max_frame_size,
|
|
max_retransmissions=0,
|
|
initial_credits=initial_credits,
|
|
)
|
|
mcc = RFCOMM_Frame.make_mcc(
|
|
mcc_type=MccType.PN, c_r=1, data=bytes(self.open_pn)
|
|
)
|
|
logger.debug(f'>>> Sending MCC: {self.open_pn}')
|
|
self.open_result = asyncio.get_running_loop().create_future()
|
|
self.change_state(Multiplexer.State.OPENING)
|
|
self.send_frame(
|
|
RFCOMM_Frame.uih(
|
|
c_r=1 if self.role == Multiplexer.Role.INITIATOR else 0,
|
|
dlci=0,
|
|
information=mcc,
|
|
)
|
|
)
|
|
return await self.open_result
|
|
|
|
def on_dlc_open_complete(self, dlc: DLC) -> None:
|
|
logger.debug(f'DLC [{dlc.dlci}] open complete')
|
|
|
|
self.change_state(Multiplexer.State.CONNECTED)
|
|
|
|
if self.open_result:
|
|
self.open_result.set_result(dlc)
|
|
self.open_result = None
|
|
|
|
def on_dlc_disconnection(self, dlc: DLC) -> None:
|
|
logger.debug(f'DLC [{dlc.dlci}] disconnection')
|
|
self.dlcs.pop(dlc.dlci, None)
|
|
|
|
def on_l2cap_channel_close(self) -> None:
|
|
logger.debug('L2CAP channel closed, cleaning up')
|
|
if self.open_result:
|
|
self.open_result.cancel()
|
|
self.open_result = None
|
|
if self.disconnection_result:
|
|
self.disconnection_result.cancel()
|
|
self.disconnection_result = None
|
|
for dlc in self.dlcs.values():
|
|
dlc.abort()
|
|
|
|
def __str__(self) -> str:
|
|
return f'Multiplexer(state={self.state.name})'
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
class Client:
|
|
multiplexer: Multiplexer | None
|
|
l2cap_channel: l2cap.ClassicChannel | None
|
|
|
|
def __init__(
|
|
self, connection: Connection, l2cap_mtu: int = RFCOMM_DEFAULT_L2CAP_MTU
|
|
) -> None:
|
|
self.connection = connection
|
|
self.l2cap_mtu = l2cap_mtu
|
|
self.l2cap_channel = None
|
|
self.multiplexer = None
|
|
|
|
async def start(self) -> Multiplexer:
|
|
# Create a new L2CAP connection
|
|
try:
|
|
self.l2cap_channel = await self.connection.create_l2cap_channel(
|
|
spec=l2cap.ClassicChannelSpec(psm=RFCOMM_PSM, mtu=self.l2cap_mtu)
|
|
)
|
|
except ProtocolError:
|
|
logger.exception('L2CAP connection failed')
|
|
raise
|
|
|
|
assert self.l2cap_channel is not None
|
|
# Create a multiplexer to manage DLCs with the server
|
|
self.multiplexer = Multiplexer(self.l2cap_channel, Multiplexer.Role.INITIATOR)
|
|
|
|
# Connect the multiplexer
|
|
await self.multiplexer.connect()
|
|
|
|
return self.multiplexer
|
|
|
|
async def shutdown(self) -> None:
|
|
if self.multiplexer is None:
|
|
return
|
|
# Disconnect the multiplexer
|
|
await self.multiplexer.disconnect()
|
|
self.multiplexer = None
|
|
|
|
# Close the L2CAP channel
|
|
if self.l2cap_channel:
|
|
await self.l2cap_channel.disconnect()
|
|
self.l2cap_channel = None
|
|
|
|
async def __aenter__(self) -> Multiplexer:
|
|
return await self.start()
|
|
|
|
async def __aexit__(self, *args) -> None:
|
|
await self.shutdown()
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
class Server(utils.EventEmitter):
|
|
EVENT_START = "start"
|
|
|
|
def __init__(
|
|
self, device: Device, l2cap_mtu: int = RFCOMM_DEFAULT_L2CAP_MTU
|
|
) -> None:
|
|
super().__init__()
|
|
self.device = device
|
|
self.acceptors: dict[int, Callable[[DLC], None]] = {}
|
|
self.dlc_configs: dict[int, tuple[int, int]] = {}
|
|
|
|
# Register ourselves with the L2CAP channel manager
|
|
self.l2cap_server = device.create_l2cap_server(
|
|
spec=l2cap.ClassicChannelSpec(psm=RFCOMM_PSM, mtu=l2cap_mtu),
|
|
handler=self.on_connection,
|
|
)
|
|
|
|
def listen(
|
|
self,
|
|
acceptor: Callable[[DLC], None],
|
|
channel: int = 0,
|
|
max_frame_size: int = RFCOMM_DEFAULT_MAX_FRAME_SIZE,
|
|
initial_credits: int = RFCOMM_DEFAULT_INITIAL_CREDITS,
|
|
) -> int:
|
|
if channel:
|
|
if channel in self.acceptors:
|
|
# Busy
|
|
return 0
|
|
else:
|
|
# Find a free channel number
|
|
for candidate in range(
|
|
RFCOMM_DYNAMIC_CHANNEL_NUMBER_START,
|
|
RFCOMM_DYNAMIC_CHANNEL_NUMBER_END + 1,
|
|
):
|
|
if candidate not in self.acceptors:
|
|
channel = candidate
|
|
break
|
|
|
|
if channel == 0:
|
|
# All channels used...
|
|
return 0
|
|
|
|
self.acceptors[channel] = acceptor
|
|
self.dlc_configs[channel] = (max_frame_size, initial_credits)
|
|
|
|
return channel
|
|
|
|
def on_connection(self, l2cap_channel: l2cap.ClassicChannel) -> None:
|
|
logger.debug(f'+++ new L2CAP connection: {l2cap_channel}')
|
|
l2cap_channel.on(
|
|
l2cap_channel.EVENT_OPEN, lambda: self.on_l2cap_channel_open(l2cap_channel)
|
|
)
|
|
|
|
def on_l2cap_channel_open(self, l2cap_channel: l2cap.ClassicChannel) -> None:
|
|
logger.debug(f'$$$ L2CAP channel open: {l2cap_channel}')
|
|
|
|
# Create a new multiplexer for the channel
|
|
multiplexer = Multiplexer(l2cap_channel, Multiplexer.Role.RESPONDER)
|
|
multiplexer.acceptor = self.accept_dlc
|
|
multiplexer.on(multiplexer.EVENT_DLC, self.on_dlc)
|
|
|
|
# Notify
|
|
self.emit(self.EVENT_START, multiplexer)
|
|
|
|
def accept_dlc(self, channel_number: int) -> tuple[int, int] | None:
|
|
return self.dlc_configs.get(channel_number)
|
|
|
|
def on_dlc(self, dlc: DLC) -> None:
|
|
logger.debug(f'@@@ new DLC connected: {dlc}')
|
|
|
|
# Let the acceptor know
|
|
if acceptor := self.acceptors.get(dlc.dlci >> 1):
|
|
acceptor(dlc)
|
|
|
|
def __enter__(self) -> Self:
|
|
return self
|
|
|
|
def __exit__(self, *args) -> None:
|
|
self.l2cap_server.close()
|