add rfcomm options and fix l2cap mtu negotiation

This commit is contained in:
Gilles Boccon-Gibod
2024-02-02 15:17:27 -08:00
parent 6d91e7e79b
commit a877283360
7 changed files with 99 additions and 22 deletions

View File

@@ -1470,10 +1470,10 @@ class Protocol(EventEmitter):
f'[{transaction_label}] {message}'
)
max_fragment_size = (
self.l2cap_channel.mtu - 3
self.l2cap_channel.peer_mtu - 3
) # Enough space for a 3-byte start packet header
payload = message.payload
if len(payload) + 2 <= self.l2cap_channel.mtu:
if len(payload) + 2 <= self.l2cap_channel.peer_mtu:
# Fits in a single packet
packet_type = self.PacketType.SINGLE_PACKET
else:

View File

@@ -416,7 +416,7 @@ class Device(HID):
data = bytearray()
data.append(report_id)
data.extend(ret.data)
if len(data) < self.l2cap_ctrl_channel.mtu: # type: ignore[union-attr]
if len(data) < self.l2cap_ctrl_channel.peer_mtu: # type: ignore[union-attr]
self.send_control_data(report_type=report_type, data=data)
else:
self.send_handshake_message(Message.Handshake.ERR_INVALID_PARAMETER)

View File

@@ -173,7 +173,7 @@ L2CAP_MTU_CONFIGURATION_PARAMETER_TYPE = 0x01
@dataclasses.dataclass
class ClassicChannelSpec:
psm: Optional[int] = None
mtu: int = L2CAP_MIN_BR_EDR_MTU
mtu: int = L2CAP_DEFAULT_MTU
@dataclasses.dataclass
@@ -749,6 +749,8 @@ class ClassicChannel(EventEmitter):
sink: Optional[Callable[[bytes], Any]]
state: State
connection: Connection
mtu: int
peer_mtu: int
def __init__(
self,
@@ -765,6 +767,7 @@ class ClassicChannel(EventEmitter):
self.signaling_cid = signaling_cid
self.state = self.State.CLOSED
self.mtu = mtu
self.peer_mtu = L2CAP_MIN_BR_EDR_MTU
self.psm = psm
self.source_cid = source_cid
self.destination_cid = 0
@@ -861,7 +864,7 @@ class ClassicChannel(EventEmitter):
[
(
L2CAP_MAXIMUM_TRANSMISSION_UNIT_CONFIGURATION_OPTION_TYPE,
struct.pack('<H', L2CAP_DEFAULT_MTU),
struct.pack('<H', self.mtu),
)
]
)
@@ -926,8 +929,8 @@ class ClassicChannel(EventEmitter):
options = L2CAP_Control_Frame.decode_configuration_options(request.options)
for option in options:
if option[0] == L2CAP_MTU_CONFIGURATION_PARAMETER_TYPE:
self.mtu = struct.unpack('<H', option[1])[0]
logger.debug(f'MTU = {self.mtu}')
self.peer_mtu = struct.unpack('<H', option[1])[0]
logger.debug(f'peer MTU = {self.peer_mtu}')
self.send_control_frame(
L2CAP_Configure_Response(
@@ -1026,7 +1029,7 @@ class ClassicChannel(EventEmitter):
return (
f'Channel({self.source_cid}->{self.destination_cid}, '
f'PSM={self.psm}, '
f'MTU={self.mtu}, '
f'MTU={self.mtu}/{self.peer_mtu}, '
f'state={self.state.name})'
)

View File

@@ -104,6 +104,7 @@ CRC_TABLE = bytes([
0XBA, 0X2B, 0X59, 0XC8, 0XBD, 0X2C, 0X5E, 0XCF
])
RFCOMM_DEFAULT_L2CAP_MTU = 2048
RFCOMM_DEFAULT_WINDOW_SIZE = 7
RFCOMM_DEFAULT_MAX_FRAME_SIZE = 2000
@@ -473,7 +474,7 @@ class DLC(EventEmitter):
# Compute the MTU
max_overhead = 4 + 1 # header with 2-byte length + fcs
self.mtu = min(
max_frame_size, self.multiplexer.l2cap_channel.mtu - max_overhead
max_frame_size, self.multiplexer.l2cap_channel.peer_mtu - max_overhead
)
def change_state(self, new_state: State) -> None:
@@ -908,8 +909,11 @@ class Client:
multiplexer: Optional[Multiplexer]
l2cap_channel: Optional[l2cap.ClassicChannel]
def __init__(self, connection: Connection) -> None:
def __init__(
self, connection: Connection, l2cap_mtu: int = RFCOMM_DEFAULT_L2CAP_MTU
) -> None:
self.connection = connection
self.l2cap_mtu = l2cap_mtu
self.l2cap_channel = None
self.multiplexer = None
@@ -917,7 +921,7 @@ class Client:
# Create a new L2CAP connection
try:
self.l2cap_channel = await self.connection.create_l2cap_channel(
spec=l2cap.ClassicChannelSpec(RFCOMM_PSM)
spec=l2cap.ClassicChannelSpec(psm=RFCOMM_PSM, mtu=self.l2cap_mtu)
)
except ProtocolError as error:
logger.warning(f'L2CAP connection failed: {error}')
@@ -955,7 +959,9 @@ class Client:
class Server(EventEmitter):
acceptors: Dict[int, Callable[[DLC], None]]
def __init__(self, device: Device) -> None:
def __init__(
self, device: Device, l2cap_mtu: int = RFCOMM_DEFAULT_L2CAP_MTU
) -> None:
super().__init__()
self.device = device
self.multiplexer = None
@@ -963,7 +969,8 @@ class Server(EventEmitter):
# Register ourselves with the L2CAP channel manager
self.l2cap_server = device.create_l2cap_server(
spec=l2cap.ClassicChannelSpec(psm=RFCOMM_PSM), handler=self.on_connection
spec=l2cap.ClassicChannelSpec(psm=RFCOMM_PSM, mtu=l2cap_mtu),
handler=self.on_connection,
)
def listen(self, acceptor: Callable[[DLC], None], channel: int = 0) -> int: