Merge pull request #499 from google/gbg/rfcomm-bridge

rfcomm bridge app
This commit is contained in:
Gilles Boccon-Gibod
2024-06-05 11:18:13 -07:00
committed by GitHub
8 changed files with 748 additions and 77 deletions

View File

@@ -899,14 +899,26 @@ class L2capServer(StreamedPacketIO):
# RfcommClient # RfcommClient
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class RfcommClient(StreamedPacketIO): class RfcommClient(StreamedPacketIO):
def __init__(self, device, channel, uuid, l2cap_mtu, max_frame_size, window_size): def __init__(
self,
device,
channel,
uuid,
l2cap_mtu,
max_frame_size,
initial_credits,
max_credits,
credits_threshold,
):
super().__init__() super().__init__()
self.device = device self.device = device
self.channel = channel self.channel = channel
self.uuid = uuid self.uuid = uuid
self.l2cap_mtu = l2cap_mtu self.l2cap_mtu = l2cap_mtu
self.max_frame_size = max_frame_size self.max_frame_size = max_frame_size
self.window_size = window_size self.initial_credits = initial_credits
self.max_credits = max_credits
self.credits_threshold = credits_threshold
self.rfcomm_session = None self.rfcomm_session = None
self.ready = asyncio.Event() self.ready = asyncio.Event()
@@ -940,12 +952,17 @@ class RfcommClient(StreamedPacketIO):
logging.info(color(f'### Opening session for channel {channel}...', 'yellow')) logging.info(color(f'### Opening session for channel {channel}...', 'yellow'))
try: try:
dlc_options = {} dlc_options = {}
if self.max_frame_size: if self.max_frame_size is not None:
dlc_options['max_frame_size'] = self.max_frame_size dlc_options['max_frame_size'] = self.max_frame_size
if self.window_size: if self.initial_credits is not None:
dlc_options['window_size'] = self.window_size dlc_options['initial_credits'] = self.initial_credits
rfcomm_session = await rfcomm_mux.open_dlc(channel, **dlc_options) rfcomm_session = await rfcomm_mux.open_dlc(channel, **dlc_options)
logging.info(color(f'### Session open: {rfcomm_session}', 'yellow')) logging.info(color(f'### Session open: {rfcomm_session}', 'yellow'))
if self.max_credits is not None:
rfcomm_session.rx_max_credits = self.max_credits
if self.credits_threshold is not None:
rfcomm_session.rx_credits_threshold = self.credits_threshold
except bumble.core.ConnectionError as error: except bumble.core.ConnectionError as error:
logging.info(color(f'!!! Session open failed: {error}', 'red')) logging.info(color(f'!!! Session open failed: {error}', 'red'))
await rfcomm_mux.disconnect() await rfcomm_mux.disconnect()
@@ -969,8 +986,19 @@ class RfcommClient(StreamedPacketIO):
# RfcommServer # RfcommServer
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class RfcommServer(StreamedPacketIO): class RfcommServer(StreamedPacketIO):
def __init__(self, device, channel, l2cap_mtu): def __init__(
self,
device,
channel,
l2cap_mtu,
max_frame_size,
initial_credits,
max_credits,
credits_threshold,
):
super().__init__() super().__init__()
self.max_credits = max_credits
self.credits_threshold = credits_threshold
self.dlc = None self.dlc = None
self.ready = asyncio.Event() self.ready = asyncio.Event()
@@ -981,7 +1009,12 @@ class RfcommServer(StreamedPacketIO):
rfcomm_server = bumble.rfcomm.Server(device, **server_options) rfcomm_server = bumble.rfcomm.Server(device, **server_options)
# Listen for incoming DLC connections # Listen for incoming DLC connections
channel_number = rfcomm_server.listen(self.on_dlc, channel) dlc_options = {}
if max_frame_size is not None:
dlc_options['max_frame_size'] = max_frame_size
if initial_credits is not None:
dlc_options['initial_credits'] = initial_credits
channel_number = rfcomm_server.listen(self.on_dlc, channel, **dlc_options)
# Setup the SDP to advertise this channel # Setup the SDP to advertise this channel
device.sdp_service_records = make_sdp_records(channel_number) device.sdp_service_records = make_sdp_records(channel_number)
@@ -1004,6 +1037,10 @@ class RfcommServer(StreamedPacketIO):
dlc.sink = self.on_packet dlc.sink = self.on_packet
self.io_sink = dlc.write self.io_sink = dlc.write
self.dlc = dlc self.dlc = dlc
if self.max_credits is not None:
dlc.rx_max_credits = self.max_credits
if self.credits_threshold is not None:
dlc.rx_credits_threshold = self.credits_threshold
async def drain(self): async def drain(self):
assert self.dlc assert self.dlc
@@ -1321,7 +1358,9 @@ def create_mode_factory(ctx, default_mode):
uuid=ctx.obj['rfcomm_uuid'], uuid=ctx.obj['rfcomm_uuid'],
l2cap_mtu=ctx.obj['rfcomm_l2cap_mtu'], l2cap_mtu=ctx.obj['rfcomm_l2cap_mtu'],
max_frame_size=ctx.obj['rfcomm_max_frame_size'], max_frame_size=ctx.obj['rfcomm_max_frame_size'],
window_size=ctx.obj['rfcomm_window_size'], initial_credits=ctx.obj['rfcomm_initial_credits'],
max_credits=ctx.obj['rfcomm_max_credits'],
credits_threshold=ctx.obj['rfcomm_credits_threshold'],
) )
if mode == 'rfcomm-server': if mode == 'rfcomm-server':
@@ -1329,6 +1368,10 @@ def create_mode_factory(ctx, default_mode):
device, device,
channel=ctx.obj['rfcomm_channel'], channel=ctx.obj['rfcomm_channel'],
l2cap_mtu=ctx.obj['rfcomm_l2cap_mtu'], l2cap_mtu=ctx.obj['rfcomm_l2cap_mtu'],
max_frame_size=ctx.obj['rfcomm_max_frame_size'],
initial_credits=ctx.obj['rfcomm_initial_credits'],
max_credits=ctx.obj['rfcomm_max_credits'],
credits_threshold=ctx.obj['rfcomm_credits_threshold'],
) )
raise ValueError('invalid mode') raise ValueError('invalid mode')
@@ -1427,9 +1470,19 @@ def create_role_factory(ctx, default_role):
help='RFComm maximum frame size', help='RFComm maximum frame size',
) )
@click.option( @click.option(
'--rfcomm-window-size', '--rfcomm-initial-credits',
type=int, type=int,
help='RFComm window size', help='RFComm initial credits',
)
@click.option(
'--rfcomm-max-credits',
type=int,
help='RFComm max credits',
)
@click.option(
'--rfcomm-credits-threshold',
type=int,
help='RFComm credits threshold',
) )
@click.option( @click.option(
'--l2cap-psm', '--l2cap-psm',
@@ -1530,7 +1583,9 @@ def bench(
rfcomm_uuid, rfcomm_uuid,
rfcomm_l2cap_mtu, rfcomm_l2cap_mtu,
rfcomm_max_frame_size, rfcomm_max_frame_size,
rfcomm_window_size, rfcomm_initial_credits,
rfcomm_max_credits,
rfcomm_credits_threshold,
l2cap_psm, l2cap_psm,
l2cap_mtu, l2cap_mtu,
l2cap_mps, l2cap_mps,
@@ -1545,7 +1600,9 @@ def bench(
ctx.obj['rfcomm_uuid'] = rfcomm_uuid ctx.obj['rfcomm_uuid'] = rfcomm_uuid
ctx.obj['rfcomm_l2cap_mtu'] = rfcomm_l2cap_mtu ctx.obj['rfcomm_l2cap_mtu'] = rfcomm_l2cap_mtu
ctx.obj['rfcomm_max_frame_size'] = rfcomm_max_frame_size ctx.obj['rfcomm_max_frame_size'] = rfcomm_max_frame_size
ctx.obj['rfcomm_window_size'] = rfcomm_window_size ctx.obj['rfcomm_initial_credits'] = rfcomm_initial_credits
ctx.obj['rfcomm_max_credits'] = rfcomm_max_credits
ctx.obj['rfcomm_credits_threshold'] = rfcomm_credits_threshold
ctx.obj['l2cap_psm'] = l2cap_psm ctx.obj['l2cap_psm'] = l2cap_psm
ctx.obj['l2cap_mtu'] = l2cap_mtu ctx.obj['l2cap_mtu'] = l2cap_mtu
ctx.obj['l2cap_mps'] = l2cap_mps ctx.obj['l2cap_mps'] = l2cap_mps

511
apps/rfcomm_bridge.py Normal file
View File

@@ -0,0 +1,511 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
import time
from typing import Optional
import click
from bumble.colors import color
from bumble.device import Device, DeviceConfiguration, Connection
from bumble import core
from bumble import hci
from bumble import rfcomm
from bumble import transport
from bumble import utils
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
DEFAULT_RFCOMM_UUID = "E6D55659-C8B4-4B85-96BB-B1143AF6D3AE"
DEFAULT_MTU = 4096
DEFAULT_CLIENT_TCP_PORT = 9544
DEFAULT_SERVER_TCP_PORT = 9545
TRACE_MAX_SIZE = 48
# -----------------------------------------------------------------------------
class Tracer:
"""
Trace data buffers transmitted from one endpoint to another, with stats.
"""
def __init__(self, channel_name: str) -> None:
self.channel_name = channel_name
self.last_ts: float = 0.0
def trace_data(self, data: bytes) -> None:
now = time.time()
elapsed_s = now - self.last_ts if self.last_ts else 0
elapsed_ms = int(elapsed_s * 1000)
instant_throughput_kbps = ((len(data) / elapsed_s) / 1000) if elapsed_s else 0.0
hex_str = data[:TRACE_MAX_SIZE].hex() + (
"..." if len(data) > TRACE_MAX_SIZE else ""
)
print(
f"[{self.channel_name}] {len(data):4} bytes "
f"(+{elapsed_ms:4}ms, {instant_throughput_kbps: 7.2f}kB/s) "
f" {hex_str}"
)
self.last_ts = now
# -----------------------------------------------------------------------------
class ServerBridge:
"""
RFCOMM server bridge: waits for a peer to connect an RFCOMM channel.
The RFCOMM channel may be associated with a UUID published in an SDP service
description, or simply be on a system-assigned channel number.
When the connection is made, the bridge connects a TCP socket to a remote host and
bridges the data in both directions, with flow control.
When the RFCOMM channel is closed, the bridge disconnects the TCP socket
and waits for a new channel to be connected.
"""
READ_CHUNK_SIZE = 4096
def __init__(
self, channel: int, uuid: str, trace: bool, tcp_host: str, tcp_port: int
) -> None:
self.device: Optional[Device] = None
self.channel = channel
self.uuid = uuid
self.tcp_host = tcp_host
self.tcp_port = tcp_port
self.rfcomm_channel: Optional[rfcomm.DLC] = None
self.tcp_tracer: Optional[Tracer]
self.rfcomm_tracer: Optional[Tracer]
if trace:
self.tcp_tracer = Tracer(color("RFCOMM->TCP", "cyan"))
self.rfcomm_tracer = Tracer(color("TCP->RFCOMM", "magenta"))
else:
self.rfcomm_tracer = None
self.tcp_tracer = None
async def start(self, device: Device) -> None:
self.device = device
# Create and register a server
rfcomm_server = rfcomm.Server(self.device)
# Listen for incoming DLC connections
self.channel = rfcomm_server.listen(self.on_rfcomm_channel, self.channel)
# Setup the SDP to advertise this channel
service_record_handle = 0x00010001
self.device.sdp_service_records = {
service_record_handle: rfcomm.make_service_sdp_records(
service_record_handle, self.channel, core.UUID(self.uuid)
)
}
# We're ready for a connection
self.device.on("connection", self.on_connection)
await self.set_available(True)
print(
color(
(
f"### Listening for RFCOMM connection on {device.public_address}, "
f"channel {self.channel}"
),
"yellow",
)
)
async def set_available(self, available: bool):
# Become discoverable and connectable
assert self.device
await self.device.set_connectable(available)
await self.device.set_discoverable(available)
def on_connection(self, connection):
print(color(f"@@@ Bluetooth connection: {connection}", "blue"))
connection.on("disconnection", self.on_disconnection)
# Don't accept new connections until we're disconnected
utils.AsyncRunner.spawn(self.set_available(False))
def on_disconnection(self, reason: int):
print(
color("@@@ Bluetooth disconnection:", "red"),
hci.HCI_Constant.error_name(reason),
)
# We're ready for a new connection
utils.AsyncRunner.spawn(self.set_available(True))
# Called when an RFCOMM channel is established
@utils.AsyncRunner.run_in_task()
async def on_rfcomm_channel(self, rfcomm_channel):
print(color("*** RFCOMM channel:", "cyan"), rfcomm_channel)
# Connect to the TCP server
print(
color(
f"### Connecting to TCP {self.tcp_host}:{self.tcp_port}",
"yellow",
)
)
try:
reader, writer = await asyncio.open_connection(self.tcp_host, self.tcp_port)
except OSError:
print(color("!!! Connection failed", "red"))
await rfcomm_channel.disconnect()
return
# Pipe data from RFCOMM to TCP
def on_rfcomm_channel_closed():
print(color("*** RFCOMM channel closed", "cyan"))
writer.close()
def write_rfcomm_data(data):
if self.rfcomm_tracer:
self.rfcomm_tracer.trace_data(data)
writer.write(data)
rfcomm_channel.sink = write_rfcomm_data
rfcomm_channel.on("close", on_rfcomm_channel_closed)
# Pipe data from TCP to RFCOMM
while True:
try:
data = await reader.read(self.READ_CHUNK_SIZE)
if len(data) == 0:
print(color("### TCP end of stream", "yellow"))
if rfcomm_channel.state == rfcomm.DLC.State.CONNECTED:
await rfcomm_channel.disconnect()
return
if self.tcp_tracer:
self.tcp_tracer.trace_data(data)
rfcomm_channel.write(data)
await rfcomm_channel.drain()
except Exception as error:
print(f"!!! Exception: {error}")
break
writer.close()
await writer.wait_closed()
print(color("~~~ Bye bye", "magenta"))
# -----------------------------------------------------------------------------
class ClientBridge:
"""
RFCOMM client bridge: connects to a BR/EDR device, then waits for an inbound
TCP connection on a specified port number. When a TCP client connects, an
RFCOMM connection to the device is established, and the data is bridged in both
directions, with flow control.
When the TCP connection is closed by the client, the RFCOMM channel is
disconnected, but the connection to the device remains, ready for a new TCP client
to connect.
"""
READ_CHUNK_SIZE = 4096
def __init__(
self,
channel: int,
uuid: str,
trace: bool,
address: str,
tcp_host: str,
tcp_port: int,
encrypt: bool,
):
self.channel = channel
self.uuid = uuid
self.trace = trace
self.address = address
self.tcp_host = tcp_host
self.tcp_port = tcp_port
self.encrypt = encrypt
self.device: Optional[Device] = None
self.connection: Optional[Connection] = None
self.rfcomm_client: Optional[rfcomm.Client]
self.rfcomm_mux: Optional[rfcomm.Multiplexer]
self.tcp_connected: bool = False
self.tcp_tracer: Optional[Tracer]
self.rfcomm_tracer: Optional[Tracer]
if trace:
self.tcp_tracer = Tracer(color("RFCOMM->TCP", "cyan"))
self.rfcomm_tracer = Tracer(color("TCP->RFCOMM", "magenta"))
else:
self.rfcomm_tracer = None
self.tcp_tracer = None
async def connect(self) -> None:
if self.connection:
return
print(color(f"@@@ Connecting to Bluetooth {self.address}", "blue"))
assert self.device
self.connection = await self.device.connect(
self.address, transport=core.BT_BR_EDR_TRANSPORT
)
print(color(f"@@@ Bluetooth connection: {self.connection}", "blue"))
self.connection.on("disconnection", self.on_disconnection)
if self.encrypt:
print(color("@@@ Encrypting Bluetooth connection", "blue"))
await self.connection.encrypt()
print(color("@@@ Bluetooth connection encrypted", "blue"))
self.rfcomm_client = rfcomm.Client(self.connection)
try:
self.rfcomm_mux = await self.rfcomm_client.start()
except BaseException as e:
print(color("!!! Failed to setup RFCOMM connection", "red"), e)
raise
async def start(self, device: Device) -> None:
self.device = device
await device.set_connectable(False)
await device.set_discoverable(False)
# Called when a TCP connection is established
async def on_tcp_connection(reader, writer):
print(color("<<< TCP connection", "magenta"))
if self.tcp_connected:
print(
color("!!! TCP connection already active, rejecting new one", "red")
)
writer.close()
return
self.tcp_connected = True
try:
await self.pipe(reader, writer)
except BaseException as error:
print(color("!!! Exception while piping data:", "red"), error)
return
finally:
writer.close()
await writer.wait_closed()
self.tcp_connected = False
await asyncio.start_server(
on_tcp_connection,
host=self.tcp_host if self.tcp_host != "_" else None,
port=self.tcp_port,
)
print(
color(
f"### Listening for TCP connections on port {self.tcp_port}", "magenta"
)
)
async def pipe(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
# Resolve the channel number from the UUID if needed
if self.channel == 0:
await self.connect()
assert self.connection
channel = await rfcomm.find_rfcomm_channel_with_uuid(
self.connection, self.uuid
)
if channel:
print(color(f"### Found RFCOMM channel {channel}", "yellow"))
else:
print(color(f"!!! RFCOMM channel with UUID {self.uuid} not found"))
return
else:
channel = self.channel
# Connect a new RFCOMM channel
await self.connect()
assert self.rfcomm_mux
print(color(f"*** Opening RFCOMM channel {channel}", "green"))
try:
rfcomm_channel = await self.rfcomm_mux.open_dlc(channel)
print(color(f"*** RFCOMM channel open: {rfcomm_channel}", "green"))
except Exception as error:
print(color(f"!!! RFCOMM open failed: {error}", "red"))
return
# Pipe data from RFCOMM to TCP
def on_rfcomm_channel_closed():
print(color("*** RFCOMM channel closed", "green"))
def write_rfcomm_data(data):
if self.trace:
self.rfcomm_tracer.trace_data(data)
writer.write(data)
rfcomm_channel.on("close", on_rfcomm_channel_closed)
rfcomm_channel.sink = write_rfcomm_data
# Pipe data from TCP to RFCOMM
while True:
try:
data = await reader.read(self.READ_CHUNK_SIZE)
if len(data) == 0:
print(color("### TCP end of stream", "yellow"))
if rfcomm_channel.state == rfcomm.DLC.State.CONNECTED:
await rfcomm_channel.disconnect()
self.tcp_connected = False
return
if self.tcp_tracer:
self.tcp_tracer.trace_data(data)
rfcomm_channel.write(data)
await rfcomm_channel.drain()
except Exception as error:
print(f"!!! Exception: {error}")
break
print(color("~~~ Bye bye", "magenta"))
def on_disconnection(self, reason: int) -> None:
print(
color("@@@ Bluetooth disconnection:", "red"),
hci.HCI_Constant.error_name(reason),
)
self.connection = None
# -----------------------------------------------------------------------------
async def run(device_config, hci_transport, bridge):
print("<<< connecting to HCI...")
async with await transport.open_transport_or_link(hci_transport) as (
hci_source,
hci_sink,
):
print("<<< connected")
if device_config:
device = Device.from_config_file_with_hci(
device_config, hci_source, hci_sink
)
else:
device = Device.from_config_with_hci(
DeviceConfiguration(), hci_source, hci_sink
)
device.classic_enabled = True
# Let's go
await device.power_on()
try:
await bridge.start(device)
# Wait until the transport terminates
await hci_source.wait_for_termination()
except core.ConnectionError as error:
print(color(f"!!! Bluetooth connection failed: {error}", "red"))
except Exception as error:
print(f"Exception while running bridge: {error}")
# -----------------------------------------------------------------------------
@click.group()
@click.pass_context
@click.option(
"--device-config",
metavar="CONFIG_FILE",
help="Device configuration file",
)
@click.option(
"--hci-transport", metavar="TRANSPORT_NAME", help="HCI transport", required=True
)
@click.option("--trace", is_flag=True, help="Trace bridged data to stdout")
@click.option(
"--channel",
metavar="CHANNEL_NUMER",
help="RFCOMM channel number",
type=int,
default=0,
)
@click.option(
"--uuid",
metavar="UUID",
help="UUID for the RFCOMM channel",
default=DEFAULT_RFCOMM_UUID,
)
def cli(
context,
device_config,
hci_transport,
trace,
channel,
uuid,
):
context.ensure_object(dict)
context.obj["device_config"] = device_config
context.obj["hci_transport"] = hci_transport
context.obj["trace"] = trace
context.obj["channel"] = channel
context.obj["uuid"] = uuid
# -----------------------------------------------------------------------------
@cli.command()
@click.pass_context
@click.option("--tcp-host", help="TCP host", default="localhost")
@click.option("--tcp-port", help="TCP port", default=DEFAULT_SERVER_TCP_PORT)
def server(context, tcp_host, tcp_port):
bridge = ServerBridge(
context.obj["channel"],
context.obj["uuid"],
context.obj["trace"],
tcp_host,
tcp_port,
)
asyncio.run(run(context.obj["device_config"], context.obj["hci_transport"], bridge))
# -----------------------------------------------------------------------------
@cli.command()
@click.pass_context
@click.argument("bluetooth-address")
@click.option("--tcp-host", help="TCP host", default="_")
@click.option("--tcp-port", help="TCP port", default=DEFAULT_CLIENT_TCP_PORT)
@click.option("--encrypt", is_flag=True, help="Encrypt the connection")
def client(context, bluetooth_address, tcp_host, tcp_port, encrypt):
bridge = ClientBridge(
context.obj["channel"],
context.obj["uuid"],
context.obj["trace"],
bluetooth_address,
tcp_host,
tcp_port,
encrypt,
)
asyncio.run(run(context.obj["device_config"], context.obj["hci_transport"], bridge))
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get("BUMBLE_LOGLEVEL", "WARNING").upper())
if __name__ == "__main__":
cli(obj={}) # pylint: disable=no-value-for-parameter

View File

@@ -70,6 +70,7 @@ L2CAP_LE_SIGNALING_CID = 0x05
L2CAP_MIN_LE_MTU = 23 L2CAP_MIN_LE_MTU = 23
L2CAP_MIN_BR_EDR_MTU = 48 L2CAP_MIN_BR_EDR_MTU = 48
L2CAP_MAX_BR_EDR_MTU = 65535
L2CAP_DEFAULT_MTU = 2048 # Default value for the MTU we are willing to accept L2CAP_DEFAULT_MTU = 2048 # Default value for the MTU we are willing to accept
@@ -832,7 +833,9 @@ class ClassicChannel(EventEmitter):
# Wait for the connection to succeed or fail # Wait for the connection to succeed or fail
try: try:
return await self.connection_result return await self.connection.abort_on(
'disconnection', self.connection_result
)
finally: finally:
self.connection_result = None self.connection_result = None
@@ -2225,7 +2228,7 @@ class ChannelManager:
# Connect # Connect
try: try:
await channel.connect() await channel.connect()
except Exception as e: except BaseException as e:
del connection_channels[source_cid] del connection_channels[source_cid]
raise e raise e

View File

@@ -106,9 +106,11 @@ CRC_TABLE = bytes([
0XBA, 0X2B, 0X59, 0XC8, 0XBD, 0X2C, 0X5E, 0XCF 0XBA, 0X2B, 0X59, 0XC8, 0XBD, 0X2C, 0X5E, 0XCF
]) ])
RFCOMM_DEFAULT_L2CAP_MTU = 2048 RFCOMM_DEFAULT_L2CAP_MTU = 2048
RFCOMM_DEFAULT_WINDOW_SIZE = 7 RFCOMM_DEFAULT_INITIAL_CREDITS = 7
RFCOMM_DEFAULT_MAX_FRAME_SIZE = 2000 RFCOMM_DEFAULT_MAX_CREDITS = 32
RFCOMM_DEFAULT_CREDIT_THRESHOLD = RFCOMM_DEFAULT_MAX_CREDITS // 2
RFCOMM_DEFAULT_MAX_FRAME_SIZE = 2000
RFCOMM_DYNAMIC_CHANNEL_NUMBER_START = 1 RFCOMM_DYNAMIC_CHANNEL_NUMBER_START = 1
RFCOMM_DYNAMIC_CHANNEL_NUMBER_END = 30 RFCOMM_DYNAMIC_CHANNEL_NUMBER_END = 30
@@ -365,12 +367,12 @@ class RFCOMM_MCC_PN:
ack_timer: int ack_timer: int
max_frame_size: int max_frame_size: int
max_retransmissions: int max_retransmissions: int
window_size: int initial_credits: int
def __post_init__(self) -> None: def __post_init__(self) -> None:
if self.window_size < 1 or self.window_size > 7: if self.initial_credits < 1 or self.initial_credits > 7:
logger.warning( logger.warning(
f'Error Recovery Window size {self.window_size} is out of range [1, 7].' f'Initial credits {self.initial_credits} is out of range [1, 7].'
) )
@staticmethod @staticmethod
@@ -382,7 +384,7 @@ class RFCOMM_MCC_PN:
ack_timer=data[3], ack_timer=data[3],
max_frame_size=data[4] | data[5] << 8, max_frame_size=data[4] | data[5] << 8,
max_retransmissions=data[6], max_retransmissions=data[6],
window_size=data[7] & 0x07, initial_credits=data[7] & 0x07,
) )
def __bytes__(self) -> bytes: def __bytes__(self) -> bytes:
@@ -396,7 +398,7 @@ class RFCOMM_MCC_PN:
(self.max_frame_size >> 8) & 0xFF, (self.max_frame_size >> 8) & 0xFF,
self.max_retransmissions & 0xFF, self.max_retransmissions & 0xFF,
# Only 3 bits are meaningful. # Only 3 bits are meaningful.
self.window_size & 0x07, self.initial_credits & 0x07,
] ]
) )
@@ -446,40 +448,43 @@ class DLC(EventEmitter):
DISCONNECTED = 0x04 DISCONNECTED = 0x04
RESET = 0x05 RESET = 0x05
connection_result: Optional[asyncio.Future]
_sink: Optional[Callable[[bytes], None]]
_enqueued_rx_packets: collections.deque[bytes]
def __init__( def __init__(
self, self,
multiplexer: Multiplexer, multiplexer: Multiplexer,
dlci: int, dlci: int,
max_frame_size: int, tx_max_frame_size: int,
window_size: int, tx_initial_credits: int,
rx_max_frame_size: int,
rx_initial_credits: int,
) -> None: ) -> None:
super().__init__() super().__init__()
self.multiplexer = multiplexer self.multiplexer = multiplexer
self.dlci = dlci self.dlci = dlci
self.max_frame_size = max_frame_size self.rx_max_frame_size = rx_max_frame_size
self.window_size = window_size self.rx_initial_credits = rx_initial_credits
self.rx_credits = window_size self.rx_max_credits = RFCOMM_DEFAULT_MAX_CREDITS
self.rx_threshold = window_size // 2 self.rx_credits = rx_initial_credits
self.tx_credits = window_size self.rx_credits_threshold = RFCOMM_DEFAULT_CREDIT_THRESHOLD
self.tx_max_frame_size = tx_max_frame_size
self.tx_credits = tx_initial_credits
self.tx_buffer = b'' self.tx_buffer = b''
self.state = DLC.State.INIT self.state = DLC.State.INIT
self.role = multiplexer.role self.role = multiplexer.role
self.c_r = 1 if self.role == Multiplexer.Role.INITIATOR else 0 self.c_r = 1 if self.role == Multiplexer.Role.INITIATOR else 0
self.connection_result = None self.connection_result: Optional[asyncio.Future] = None
self.disconnection_result: Optional[asyncio.Future] = None
self.drained = asyncio.Event() self.drained = asyncio.Event()
self.drained.set() self.drained.set()
# Queued packets when sink is not set. # Queued packets when sink is not set.
self._enqueued_rx_packets = collections.deque(maxlen=DEFAULT_RX_QUEUE_SIZE) self._enqueued_rx_packets: collections.deque[bytes] = collections.deque(
self._sink = None maxlen=DEFAULT_RX_QUEUE_SIZE
)
self._sink: Optional[Callable[[bytes], None]] = None
# Compute the MTU # Compute the MTU
max_overhead = 4 + 1 # header with 2-byte length + fcs max_overhead = 4 + 1 # header with 2-byte length + fcs
self.mtu = min( self.mtu = min(
max_frame_size, self.multiplexer.l2cap_channel.peer_mtu - max_overhead tx_max_frame_size, self.multiplexer.l2cap_channel.peer_mtu - max_overhead
) )
@property @property
@@ -525,20 +530,35 @@ class DLC(EventEmitter):
self.emit('open') self.emit('open')
def on_ua_frame(self, _frame: RFCOMM_Frame) -> None: def on_ua_frame(self, _frame: RFCOMM_Frame) -> None:
if self.state != DLC.State.CONNECTING: if self.state == DLC.State.CONNECTING:
# Exchange the modem status with the peer
msc = RFCOMM_MCC_MSC(dlci=self.dlci, fc=0, rtc=1, rtr=1, ic=0, dv=1)
mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.MSC, c_r=1, data=bytes(msc))
logger.debug(f'>>> MCC MSC Command: {msc}')
self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc))
self.change_state(DLC.State.CONNECTED)
if self.connection_result:
self.connection_result.set_result(None)
self.connection_result = None
self.multiplexer.on_dlc_open_complete(self)
elif self.state == DLC.State.DISCONNECTING:
self.change_state(DLC.State.DISCONNECTED)
if self.disconnection_result:
self.disconnection_result.set_result(None)
self.disconnection_result = None
self.multiplexer.on_dlc_disconnection(self)
self.emit('close')
else:
logger.warning( logger.warning(
color('!!! received SABM when not in CONNECTING state', 'red') color(
(
'!!! received UA frame when not in '
'CONNECTING or DISCONNECTING state'
),
'red',
)
) )
return
# Exchange the modem status with the peer
msc = RFCOMM_MCC_MSC(dlci=self.dlci, fc=0, rtc=1, rtr=1, ic=0, dv=1)
mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.MSC, c_r=1, data=bytes(msc))
logger.debug(f'>>> MCC MSC Command: {msc}')
self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc))
self.change_state(DLC.State.CONNECTED)
self.multiplexer.on_dlc_open_complete(self)
def on_dm_frame(self, frame: RFCOMM_Frame) -> None: def on_dm_frame(self, frame: RFCOMM_Frame) -> None:
# TODO: handle all states # TODO: handle all states
@@ -609,6 +629,19 @@ class DLC(EventEmitter):
self.connection_result = asyncio.get_running_loop().create_future() self.connection_result = asyncio.get_running_loop().create_future()
self.send_frame(RFCOMM_Frame.sabm(c_r=self.c_r, dlci=self.dlci)) self.send_frame(RFCOMM_Frame.sabm(c_r=self.c_r, dlci=self.dlci))
async def disconnect(self) -> None:
if self.state != DLC.State.CONNECTED:
raise InvalidStateError('invalid state')
self.disconnection_result = asyncio.get_running_loop().create_future()
self.change_state(DLC.State.DISCONNECTING)
self.send_frame(
RFCOMM_Frame.disc(
c_r=1 if self.role == Multiplexer.Role.INITIATOR else 0, dlci=self.dlci
)
)
await self.disconnection_result
def accept(self) -> None: def accept(self) -> None:
if self.state != DLC.State.INIT: if self.state != DLC.State.INIT:
raise InvalidStateError('invalid state') raise InvalidStateError('invalid state')
@@ -618,9 +651,9 @@ class DLC(EventEmitter):
cl=0xE0, cl=0xE0,
priority=7, priority=7,
ack_timer=0, ack_timer=0,
max_frame_size=self.max_frame_size, max_frame_size=self.rx_max_frame_size,
max_retransmissions=0, max_retransmissions=0,
window_size=self.window_size, initial_credits=self.rx_initial_credits,
) )
mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.PN, c_r=0, data=bytes(pn)) mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.PN, c_r=0, data=bytes(pn))
logger.debug(f'>>> PN Response: {pn}') logger.debug(f'>>> PN Response: {pn}')
@@ -628,8 +661,8 @@ class DLC(EventEmitter):
self.change_state(DLC.State.CONNECTING) self.change_state(DLC.State.CONNECTING)
def rx_credits_needed(self) -> int: def rx_credits_needed(self) -> int:
if self.rx_credits <= self.rx_threshold: if self.rx_credits <= self.rx_credits_threshold:
return self.window_size - self.rx_credits return self.rx_max_credits - self.rx_credits
return 0 return 0
@@ -689,6 +722,17 @@ class DLC(EventEmitter):
async def drain(self) -> None: async def drain(self) -> None:
await self.drained.wait() await self.drained.wait()
def abort(self) -> None:
logger.debug(f'aborting DLC: {self}')
if self.connection_result:
self.connection_result.cancel()
self.connection_result = None
if self.disconnection_result:
self.disconnection_result.cancel()
self.disconnection_result = None
self.change_state(DLC.State.RESET)
self.emit('close')
def __str__(self) -> str: def __str__(self) -> str:
return f'DLC(dlci={self.dlci},state={self.state.name})' return f'DLC(dlci={self.dlci},state={self.state.name})'
@@ -711,7 +755,7 @@ class Multiplexer(EventEmitter):
connection_result: Optional[asyncio.Future] connection_result: Optional[asyncio.Future]
disconnection_result: Optional[asyncio.Future] disconnection_result: Optional[asyncio.Future]
open_result: Optional[asyncio.Future] open_result: Optional[asyncio.Future]
acceptor: Optional[Callable[[int], bool]] acceptor: Optional[Callable[[int], Optional[Tuple[int, int]]]]
dlcs: Dict[int, DLC] dlcs: Dict[int, DLC]
def __init__(self, l2cap_channel: l2cap.ClassicChannel, role: Role) -> None: def __init__(self, l2cap_channel: l2cap.ClassicChannel, role: Role) -> None:
@@ -723,11 +767,15 @@ class Multiplexer(EventEmitter):
self.connection_result = None self.connection_result = None
self.disconnection_result = None self.disconnection_result = None
self.open_result = None self.open_result = None
self.open_pn: Optional[RFCOMM_MCC_PN] = None
self.open_rx_max_credits = 0
self.acceptor = None self.acceptor = None
# Become a sink for the L2CAP channel # Become a sink for the L2CAP channel
l2cap_channel.sink = self.on_pdu l2cap_channel.sink = self.on_pdu
l2cap_channel.on('close', self.on_l2cap_channel_close)
def change_state(self, new_state: State) -> None: def change_state(self, new_state: State) -> None:
logger.debug(f'{self} state change -> {color(new_state.name, "cyan")}') logger.debug(f'{self} state change -> {color(new_state.name, "cyan")}')
self.state = new_state self.state = new_state
@@ -791,6 +839,7 @@ class Multiplexer(EventEmitter):
'rfcomm', 'rfcomm',
) )
) )
self.open_result = None
else: else:
logger.warning(f'unexpected state for DM: {self}') logger.warning(f'unexpected state for DM: {self}')
@@ -828,9 +877,16 @@ class Multiplexer(EventEmitter):
else: else:
if self.acceptor: if self.acceptor:
channel_number = pn.dlci >> 1 channel_number = pn.dlci >> 1
if self.acceptor(channel_number): if dlc_params := self.acceptor(channel_number):
# Create a new DLC # Create a new DLC
dlc = DLC(self, pn.dlci, pn.max_frame_size, pn.window_size) dlc = DLC(
self,
dlci=pn.dlci,
tx_max_frame_size=pn.max_frame_size,
tx_initial_credits=pn.initial_credits,
rx_max_frame_size=dlc_params[0],
rx_initial_credits=dlc_params[1],
)
self.dlcs[pn.dlci] = dlc self.dlcs[pn.dlci] = dlc
# Re-emit the handshake completion event # Re-emit the handshake completion event
@@ -848,8 +904,17 @@ class Multiplexer(EventEmitter):
# Response # Response
logger.debug(f'>>> PN Response: {pn}') logger.debug(f'>>> PN Response: {pn}')
if self.state == Multiplexer.State.OPENING: if self.state == Multiplexer.State.OPENING:
dlc = DLC(self, pn.dlci, pn.max_frame_size, pn.window_size) assert self.open_pn
dlc = DLC(
self,
dlci=pn.dlci,
tx_max_frame_size=pn.max_frame_size,
tx_initial_credits=pn.initial_credits,
rx_max_frame_size=self.open_pn.max_frame_size,
rx_initial_credits=self.open_pn.initial_credits,
)
self.dlcs[pn.dlci] = dlc self.dlcs[pn.dlci] = dlc
self.open_pn = None
dlc.connect() dlc.connect()
else: else:
logger.warning('ignoring PN response') logger.warning('ignoring PN response')
@@ -887,7 +952,7 @@ class Multiplexer(EventEmitter):
self, self,
channel: int, channel: int,
max_frame_size: int = RFCOMM_DEFAULT_MAX_FRAME_SIZE, max_frame_size: int = RFCOMM_DEFAULT_MAX_FRAME_SIZE,
window_size: int = RFCOMM_DEFAULT_WINDOW_SIZE, initial_credits: int = RFCOMM_DEFAULT_INITIAL_CREDITS,
) -> DLC: ) -> DLC:
if self.state != Multiplexer.State.CONNECTED: if self.state != Multiplexer.State.CONNECTED:
if self.state == Multiplexer.State.OPENING: if self.state == Multiplexer.State.OPENING:
@@ -895,17 +960,19 @@ class Multiplexer(EventEmitter):
raise InvalidStateError('not connected') raise InvalidStateError('not connected')
pn = RFCOMM_MCC_PN( self.open_pn = RFCOMM_MCC_PN(
dlci=channel << 1, dlci=channel << 1,
cl=0xF0, cl=0xF0,
priority=7, priority=7,
ack_timer=0, ack_timer=0,
max_frame_size=max_frame_size, max_frame_size=max_frame_size,
max_retransmissions=0, max_retransmissions=0,
window_size=window_size, initial_credits=initial_credits,
) )
mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.PN, c_r=1, data=bytes(pn)) mcc = RFCOMM_Frame.make_mcc(
logger.debug(f'>>> Sending MCC: {pn}') mcc_type=MccType.PN, c_r=1, data=bytes(self.open_pn)
)
logger.debug(f'>>> Sending MCC: {self.open_pn}')
self.open_result = asyncio.get_running_loop().create_future() self.open_result = asyncio.get_running_loop().create_future()
self.change_state(Multiplexer.State.OPENING) self.change_state(Multiplexer.State.OPENING)
self.send_frame( self.send_frame(
@@ -915,15 +982,31 @@ class Multiplexer(EventEmitter):
information=mcc, information=mcc,
) )
) )
result = await self.open_result return await self.open_result
self.open_result = None
return result
def on_dlc_open_complete(self, dlc: DLC) -> None: def on_dlc_open_complete(self, dlc: DLC) -> None:
logger.debug(f'DLC [{dlc.dlci}] open complete') logger.debug(f'DLC [{dlc.dlci}] open complete')
self.change_state(Multiplexer.State.CONNECTED) self.change_state(Multiplexer.State.CONNECTED)
if self.open_result: if self.open_result:
self.open_result.set_result(dlc) self.open_result.set_result(dlc)
self.open_result = None
def on_dlc_disconnection(self, dlc: DLC) -> None:
logger.debug(f'DLC [{dlc.dlci}] disconnection')
self.dlcs.pop(dlc.dlci, None)
def on_l2cap_channel_close(self) -> None:
logger.debug('L2CAP channel closed, cleaning up')
if self.open_result:
self.open_result.cancel()
self.open_result = None
if self.disconnection_result:
self.disconnection_result.cancel()
self.disconnection_result = None
for dlc in self.dlcs.values():
dlc.abort()
def __str__(self) -> str: def __str__(self) -> str:
return f'Multiplexer(state={self.state.name})' return f'Multiplexer(state={self.state.name})'
@@ -982,15 +1065,13 @@ class Client:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class Server(EventEmitter): class Server(EventEmitter):
acceptors: Dict[int, Callable[[DLC], None]]
def __init__( def __init__(
self, device: Device, l2cap_mtu: int = RFCOMM_DEFAULT_L2CAP_MTU self, device: Device, l2cap_mtu: int = RFCOMM_DEFAULT_L2CAP_MTU
) -> None: ) -> None:
super().__init__() super().__init__()
self.device = device self.device = device
self.multiplexer = None self.acceptors: Dict[int, Callable[[DLC], None]] = {}
self.acceptors = {} self.dlc_configs: Dict[int, Tuple[int, int]] = {}
# Register ourselves with the L2CAP channel manager # Register ourselves with the L2CAP channel manager
self.l2cap_server = device.create_l2cap_server( self.l2cap_server = device.create_l2cap_server(
@@ -998,7 +1079,13 @@ class Server(EventEmitter):
handler=self.on_connection, handler=self.on_connection,
) )
def listen(self, acceptor: Callable[[DLC], None], channel: int = 0) -> int: def listen(
self,
acceptor: Callable[[DLC], None],
channel: int = 0,
max_frame_size: int = RFCOMM_DEFAULT_MAX_FRAME_SIZE,
initial_credits: int = RFCOMM_DEFAULT_INITIAL_CREDITS,
) -> int:
if channel: if channel:
if channel in self.acceptors: if channel in self.acceptors:
# Busy # Busy
@@ -1018,6 +1105,8 @@ class Server(EventEmitter):
return 0 return 0
self.acceptors[channel] = acceptor self.acceptors[channel] = acceptor
self.dlc_configs[channel] = (max_frame_size, initial_credits)
return channel return channel
def on_connection(self, l2cap_channel: l2cap.ClassicChannel) -> None: def on_connection(self, l2cap_channel: l2cap.ClassicChannel) -> None:
@@ -1035,15 +1124,14 @@ class Server(EventEmitter):
# Notify # Notify
self.emit('start', multiplexer) self.emit('start', multiplexer)
def accept_dlc(self, channel_number: int) -> bool: def accept_dlc(self, channel_number: int) -> Optional[Tuple[int, int]]:
return channel_number in self.acceptors return self.dlc_configs.get(channel_number)
def on_dlc(self, dlc: DLC) -> None: def on_dlc(self, dlc: DLC) -> None:
logger.debug(f'@@@ new DLC connected: {dlc}') logger.debug(f'@@@ new DLC connected: {dlc}')
# Let the acceptor know # Let the acceptor know
acceptor = self.acceptors.get(dlc.dlci >> 1) if acceptor := self.acceptors.get(dlc.dlci >> 1):
if acceptor:
acceptor(dlc) acceptor(dlc)
def __enter__(self) -> Self: def __enter__(self) -> Self:

View File

@@ -997,7 +997,7 @@ class Server:
try: try:
handler(sdp_pdu) handler(sdp_pdu)
except Exception as error: except Exception as error:
logger.warning(f'{color("!!! Exception in handler:", "red")} {error}') logger.exception(f'{color("!!! Exception in handler:", "red")} {error}')
self.send_response( self.send_response(
SDP_ErrorResponse( SDP_ErrorResponse(
transaction_id=sdp_pdu.transaction_id, transaction_id=sdp_pdu.transaction_id,

View File

@@ -56,13 +56,19 @@ class SocketClient(private val viewModel: AppViewModel, private val socket: Blue
thread { thread {
socketDataSource.receive() socketDataSource.receive()
socket.close()
sender.abort()
} }
Log.info("Startup delay: $DEFAULT_STARTUP_DELAY") Log.info("Startup delay: $DEFAULT_STARTUP_DELAY")
Thread.sleep(DEFAULT_STARTUP_DELAY.toLong()); Thread.sleep(DEFAULT_STARTUP_DELAY.toLong());
Log.info("Starting to send") Log.info("Starting to send")
sender.run() try {
sender.run()
} catch (error: IOException) {
Log.info("run ended abruptly")
}
cleanup() cleanup()
} }
} }

View File

@@ -62,6 +62,7 @@ console_scripts =
bumble-gatt-dump = bumble.apps.gatt_dump:main bumble-gatt-dump = bumble.apps.gatt_dump:main
bumble-hci-bridge = bumble.apps.hci_bridge:main bumble-hci-bridge = bumble.apps.hci_bridge:main
bumble-l2cap-bridge = bumble.apps.l2cap_bridge:main bumble-l2cap-bridge = bumble.apps.l2cap_bridge:main
bumble-rfcomm-bridge = bumble.apps.rfcomm_bridge:main
bumble-pair = bumble.apps.pair:main bumble-pair = bumble.apps.pair:main
bumble-scan = bumble.apps.scan:main bumble-scan = bumble.apps.scan:main
bumble-show = bumble.apps.show:main bumble-show = bumble.apps.show:main

View File

@@ -62,7 +62,7 @@ def test_frames():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_basic_connection() -> None: async def test_connection_and_disconnection() -> None:
devices = test_utils.TwoDevices() devices = test_utils.TwoDevices()
await devices.setup_connection() await devices.setup_connection()
@@ -83,6 +83,11 @@ async def test_basic_connection() -> None:
dlcs[1].write(b'Lorem ipsum dolor sit amet') dlcs[1].write(b'Lorem ipsum dolor sit amet')
assert await queues[0].get() == b'Lorem ipsum dolor sit amet' assert await queues[0].get() == b'Lorem ipsum dolor sit amet'
closed = asyncio.Event()
dlcs[1].on('close', closed.set)
await dlcs[1].disconnect()
await closed.wait()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio