add drain support and a few tool options

This commit is contained in:
Gilles Boccon-Gibod
2024-01-02 11:07:52 -08:00
parent 6810865670
commit 8980fb8cc7
7 changed files with 384 additions and 112 deletions

View File

@@ -80,10 +80,10 @@ SPEED_TX_UUID = 'E789C754-41A1-45F4-A948-A0A1A90DBA53'
SPEED_RX_UUID = '016A2CC7-E14B-4819-935F-1F56EAE4098D'
DEFAULT_RFCOMM_UUID = 'E6D55659-C8B4-4B85-96BB-B1143AF6D3AE'
DEFAULT_L2CAP_PSM = 1234
DEFAULT_L2CAP_PSM = 128
DEFAULT_L2CAP_MAX_CREDITS = 128
DEFAULT_L2CAP_MTU = 1024
DEFAULT_L2CAP_MPS = 1022
DEFAULT_L2CAP_MPS = 1024
DEFAULT_LINGER_TIME = 1.0
DEFAULT_POST_CONNECTION_WAIT_TIME = 1.0
@@ -240,6 +240,23 @@ async def find_rfcomm_channel_with_uuid(connection: Connection, uuid: str) -> in
return 0
def log_stats(title, stats):
stats_min = min(stats)
stats_max = max(stats)
stats_avg = sum(stats) / len(stats)
logging.info(
color(
(
f'### {title} stats: '
f'min={stats_min:.2f}, '
f'max={stats_max:.2f}, '
f'average={stats_avg:.2f}'
),
'cyan',
)
)
class PacketType(enum.IntEnum):
RESET = 0
SEQUENCE = 1
@@ -253,14 +270,27 @@ PACKET_FLAG_LAST = 1
# Sender
# -----------------------------------------------------------------------------
class Sender:
def __init__(self, packet_io, start_delay, packet_size, packet_count):
def __init__(
self,
packet_io,
start_delay,
repeat,
repeat_delay,
pace,
packet_size,
packet_count,
):
self.tx_start_delay = start_delay
self.tx_packet_size = packet_size
self.tx_packet_count = packet_count
self.packet_io = packet_io
self.packet_io.packet_listener = self
self.repeat = repeat
self.repeat_delay = repeat_delay
self.pace = pace
self.start_time = 0
self.bytes_sent = 0
self.stats = []
self.done = asyncio.Event()
def reset(self):
@@ -271,27 +301,57 @@ class Sender:
await self.packet_io.ready.wait()
logging.info(color('--- Go!', 'blue'))
if self.tx_start_delay:
logging.info(color(f'*** Startup delay: {self.tx_start_delay}', 'blue'))
await asyncio.sleep(self.tx_start_delay)
for run in range(self.repeat + 1):
self.done.clear()
logging.info(color('=== Sending RESET', 'magenta'))
await self.packet_io.send_packet(bytes([PacketType.RESET]))
self.start_time = time.time()
for tx_i in range(self.tx_packet_count):
packet_flags = PACKET_FLAG_LAST if tx_i == self.tx_packet_count - 1 else 0
packet = struct.pack(
'>bbI',
PacketType.SEQUENCE,
packet_flags,
tx_i,
) + bytes(self.tx_packet_size - 6)
logging.info(color(f'Sending packet {tx_i}: {len(packet)} bytes', 'yellow'))
self.bytes_sent += len(packet)
await self.packet_io.send_packet(packet)
if run > 0 and self.repeat and self.repeat_delay:
logging.info(color(f'*** Repeat delay: {self.repeat_delay}', 'green'))
await asyncio.sleep(self.repeat_delay)
await self.done.wait()
logging.info(color('=== Done!', 'magenta'))
if self.tx_start_delay:
logging.info(color(f'*** Startup delay: {self.tx_start_delay}', 'blue'))
await asyncio.sleep(self.tx_start_delay)
logging.info(color('=== Sending RESET', 'magenta'))
await self.packet_io.send_packet(bytes([PacketType.RESET]))
self.start_time = time.time()
self.bytes_sent = 0
for tx_i in range(self.tx_packet_count):
packet_flags = (
PACKET_FLAG_LAST if tx_i == self.tx_packet_count - 1 else 0
)
packet = struct.pack(
'>bbI',
PacketType.SEQUENCE,
packet_flags,
tx_i,
) + bytes(self.tx_packet_size - 6 - self.packet_io.overhead_size)
logging.info(
color(
f'Sending packet {tx_i}: {self.tx_packet_size} bytes', 'yellow'
)
)
self.bytes_sent += len(packet)
await self.packet_io.send_packet(packet)
if self.pace is None:
continue
if self.pace > 0:
await asyncio.sleep(self.pace / 1000)
else:
await self.packet_io.drain()
await self.done.wait()
run_counter = f'[{run + 1} of {self.repeat + 1}]' if self.repeat else ''
logging.info(color(f'=== {run_counter} Done!', 'magenta'))
if self.repeat:
log_stats('Run', self.stats)
if self.repeat:
logging.info(color('--- End of runs', 'blue'))
def on_packet_received(self, packet):
try:
@@ -302,6 +362,7 @@ class Sender:
if packet_type == PacketType.ACK:
elapsed = time.time() - self.start_time
average_tx_speed = self.bytes_sent / elapsed
self.stats.append(average_tx_speed)
logging.info(
color(
f'@@@ Received ACK. Speed: average={average_tx_speed:.4f}'
@@ -320,17 +381,17 @@ class Receiver:
start_timestamp: float
last_timestamp: float
def __init__(self, packet_io):
def __init__(self, packet_io, linger):
self.reset()
self.packet_io = packet_io
self.packet_io.packet_listener = self
self.linger = linger
self.done = asyncio.Event()
def reset(self):
self.expected_packet_index = 0
self.start_timestamp = 0.0
self.last_timestamp = 0.0
self.bytes_received = 0
self.measurements = [(time.time(), 0)]
self.total_bytes_received = 0
def on_packet_received(self, packet):
try:
@@ -338,12 +399,9 @@ class Receiver:
except ValueError:
return
now = time.time()
if packet_type == PacketType.RESET:
logging.info(color('=== Received RESET', 'magenta'))
self.reset()
self.start_timestamp = now
return
try:
@@ -352,7 +410,8 @@ class Receiver:
return
logging.info(
f'<<< Received packet {packet_index}: '
f'flags=0x{packet_flags:02X}, {len(packet)} bytes'
f'flags=0x{packet_flags:02X}, '
f'{len(packet) + self.packet_io.overhead_size} bytes'
)
if packet_index != self.expected_packet_index:
@@ -363,19 +422,27 @@ class Receiver:
)
)
elapsed_since_start = now - self.start_timestamp
elapsed_since_last = now - self.last_timestamp
self.bytes_received += len(packet)
now = time.time()
elapsed_since_start = now - self.measurements[0][0]
elapsed_since_last = now - self.measurements[-1][0]
self.measurements.append((now, len(packet)))
self.total_bytes_received += len(packet)
instant_rx_speed = len(packet) / elapsed_since_last
average_rx_speed = self.bytes_received / elapsed_since_start
average_rx_speed = self.total_bytes_received / elapsed_since_start
window = self.measurements[-64:]
windowed_rx_speed = sum(measurement[1] for measurement in window[1:]) / (
window[-1][0] - window[0][0]
)
logging.info(
color(
f'Speed: instant={instant_rx_speed:.4f}, average={average_rx_speed:.4f}',
'Speed: '
f'instant={instant_rx_speed:.4f}, '
f'windowed={windowed_rx_speed:.4f}, '
f'average={average_rx_speed:.4f}',
'yellow',
)
)
self.last_timestamp = now
self.expected_packet_index = packet_index + 1
if packet_flags & PACKET_FLAG_LAST:
@@ -385,7 +452,8 @@ class Receiver:
)
)
logging.info(color('@@@ Received last packet', 'green'))
self.done.set()
if not self.linger:
self.done.set()
async def run(self):
await self.done.wait()
@@ -396,16 +464,31 @@ class Receiver:
# Ping
# -----------------------------------------------------------------------------
class Ping:
def __init__(self, packet_io, start_delay, packet_size, packet_count):
def __init__(
self,
packet_io,
start_delay,
repeat,
repeat_delay,
pace,
packet_size,
packet_count,
):
self.tx_start_delay = start_delay
self.tx_packet_size = packet_size
self.tx_packet_count = packet_count
self.packet_io = packet_io
self.packet_io.packet_listener = self
self.repeat = repeat
self.repeat_delay = repeat_delay
self.pace = pace
self.done = asyncio.Event()
self.current_packet_index = 0
self.ping_sent_time = 0.0
self.latencies = []
self.min_stats = []
self.max_stats = []
self.avg_stats = []
def reset(self):
pass
@@ -415,21 +498,53 @@ class Ping:
await self.packet_io.ready.wait()
logging.info(color('--- Go!', 'blue'))
if self.tx_start_delay:
logging.info(color(f'*** Startup delay: {self.tx_start_delay}', 'blue'))
await asyncio.sleep(self.tx_start_delay)
for run in range(self.repeat + 1):
self.done.clear()
logging.info(color('=== Sending RESET', 'magenta'))
await self.packet_io.send_packet(bytes([PacketType.RESET]))
if run > 0 and self.repeat and self.repeat_delay:
logging.info(color(f'*** Repeat delay: {self.repeat_delay}', 'green'))
await asyncio.sleep(self.repeat_delay)
await self.send_next_ping()
if self.tx_start_delay:
logging.info(color(f'*** Startup delay: {self.tx_start_delay}', 'blue'))
await asyncio.sleep(self.tx_start_delay)
await self.done.wait()
average_latency = sum(self.latencies) / len(self.latencies)
logging.info(color(f'@@@ Average latency: {average_latency:.2f}'))
logging.info(color('=== Done!', 'magenta'))
logging.info(color('=== Sending RESET', 'magenta'))
await self.packet_io.send_packet(bytes([PacketType.RESET]))
self.current_packet_index = 0
await self.send_next_ping()
await self.done.wait()
min_latency = min(self.latencies)
max_latency = max(self.latencies)
avg_latency = sum(self.latencies) / len(self.latencies)
logging.info(color(
'@@@ Latencies: '
f'min={min_latency:.2f}, '
f'max={max_latency:.2f}, '
f'average={avg_latency:.2f}'))
self.min_stats.append(min_latency)
self.max_stats.append(max_latency)
self.avg_stats.append(avg_latency)
run_counter = f'[{run + 1} of {self.repeat + 1}]' if self.repeat else ''
logging.info(color(f'=== {run_counter} Done!', 'magenta'))
if self.repeat:
log_stats('Min Latency', self.min_stats)
log_stats('Max Latency', self.max_stats)
log_stats('Average Latency', self.avg_stats)
if self.repeat:
logging.info(color('--- End of runs', 'blue'))
async def send_next_ping(self):
if self.pace:
await asyncio.sleep(self.pace / 1000)
packet = struct.pack(
'>bbI',
PacketType.SEQUENCE,
@@ -488,10 +603,11 @@ class Ping:
class Pong:
expected_packet_index: int
def __init__(self, packet_io):
def __init__(self, packet_io, linger):
self.reset()
self.packet_io = packet_io
self.packet_io.packet_listener = self
self.linger = linger
self.done = asyncio.Event()
def reset(self):
@@ -536,7 +652,7 @@ class Pong:
)
)
if packet_flags & PACKET_FLAG_LAST:
if packet_flags & PACKET_FLAG_LAST and not self.linger:
self.done.set()
async def run(self):
@@ -554,6 +670,7 @@ class GattClient:
self.speed_tx = None
self.packet_listener = None
self.ready = asyncio.Event()
self.overhead_size = 0
async def on_connection(self, connection):
peer = Peer(connection)
@@ -603,6 +720,9 @@ class GattClient:
async def send_packet(self, packet):
await self.speed_tx.write_value(packet)
async def drain(self):
pass
# -----------------------------------------------------------------------------
# GattServer
@@ -612,6 +732,7 @@ class GattServer:
self.device = device
self.packet_listener = None
self.ready = asyncio.Event()
self.overhead_size = 0
# Setup the GATT service
self.speed_tx = Characteristic(
@@ -653,6 +774,9 @@ class GattServer:
async def send_packet(self, packet):
await self.device.notify_subscribers(self.speed_rx, packet)
async def drain(self):
pass
# -----------------------------------------------------------------------------
# StreamedPacketIO
@@ -664,6 +788,7 @@ class StreamedPacketIO:
self.rx_packet = b''
self.rx_packet_header = b''
self.rx_packet_need = 0
self.overhead_size = 2
def on_packet(self, packet):
while packet:
@@ -715,6 +840,7 @@ class L2capClient(StreamedPacketIO):
self.max_credits = max_credits
self.mtu = mtu
self.mps = mps
self.l2cap_channel = None
self.ready = asyncio.Event()
async def on_connection(self, connection: Connection) -> None:
@@ -736,9 +862,10 @@ class L2capClient(StreamedPacketIO):
logging.info(color(f'!!! Connection failed: {error}', 'red'))
return
l2cap_channel.sink = self.on_packet
l2cap_channel.on('close', self.on_l2cap_close)
self.io_sink = l2cap_channel.write
self.l2cap_channel = l2cap_channel
l2cap_channel.on('close', self.on_l2cap_close)
l2cap_channel.sink = self.on_packet
self.ready.set()
@@ -748,6 +875,10 @@ class L2capClient(StreamedPacketIO):
def on_l2cap_close(self):
logging.info(color('*** L2CAP channel closed', 'red'))
async def drain(self):
assert self.l2cap_channel
await self.l2cap_channel.drain()
# -----------------------------------------------------------------------------
# L2capServer
@@ -786,6 +917,7 @@ class L2capServer(StreamedPacketIO):
logging.info(color(f'*** L2CAP channel: {l2cap_channel}', 'cyan'))
self.io_sink = l2cap_channel.write
self.l2cap_channel = l2cap_channel
l2cap_channel.on('close', self.on_l2cap_close)
l2cap_channel.sink = self.on_packet
@@ -795,6 +927,10 @@ class L2capServer(StreamedPacketIO):
logging.info(color('*** L2CAP channel closed', 'red'))
self.l2cap_channel = None
async def drain(self):
assert self.l2cap_channel
await self.l2cap_channel.drain()
# -----------------------------------------------------------------------------
# RfcommClient
@@ -805,6 +941,7 @@ class RfcommClient(StreamedPacketIO):
self.device = device
self.channel = channel
self.uuid = uuid
self.rfcomm_session = None
self.ready = asyncio.Event()
async def on_connection(self, connection):
@@ -840,12 +977,17 @@ class RfcommClient(StreamedPacketIO):
rfcomm_session.sink = self.on_packet
self.io_sink = rfcomm_session.write
self.rfcomm_session = rfcomm_session
self.ready.set()
def on_disconnection(self, _):
pass
async def drain(self):
assert self.rfcomm_session
await self.rfcomm_session.drain()
# -----------------------------------------------------------------------------
# RfcommServer
@@ -853,6 +995,7 @@ class RfcommClient(StreamedPacketIO):
class RfcommServer(StreamedPacketIO):
def __init__(self, device, channel):
super().__init__()
self.dlc = None
self.ready = asyncio.Event()
# Create and register a server
@@ -881,6 +1024,11 @@ class RfcommServer(StreamedPacketIO):
logging.info(color(f'*** DLC connected: {dlc}', 'blue'))
dlc.sink = self.on_packet
self.io_sink = dlc.write
self.dlc = dlc
async def drain(self):
assert self.dlc
await self.dlc.drain()
# -----------------------------------------------------------------------------
@@ -1030,6 +1178,7 @@ class Central(Connection.Listener):
await role.run()
await asyncio.sleep(DEFAULT_LINGER_TIME)
await self.connection.disconnect()
def on_disconnection(self, reason):
logging.info(color(f'!!! Disconnection: reason={reason}', 'red'))
@@ -1120,12 +1269,8 @@ class Peripheral(Device.Listener, Connection.Listener):
# Stop being discoverable and connectable
if self.classic:
async def stop_being_discoverable_connectable():
await self.device.set_discoverable(False)
await self.device.set_connectable(False)
AsyncRunner.spawn(stop_being_discoverable_connectable())
AsyncRunner.spawn(self.device.set_discoverable(False))
AsyncRunner.spawn(self.device.set_connectable(False))
# Request a new data length if needed
if self.extended_data_length:
@@ -1141,6 +1286,10 @@ class Peripheral(Device.Listener, Connection.Listener):
self.connection = None
self.role.reset()
if self.classic:
AsyncRunner.spawn(self.device.set_discoverable(True))
AsyncRunner.spawn(self.device.set_connectable(True))
def on_connection_parameters_update(self):
print_connection(self.connection)
@@ -1168,10 +1317,22 @@ def create_mode_factory(ctx, default_mode):
return GattServer(device)
if mode == 'l2cap-client':
return L2capClient(device, psm=ctx.obj['l2cap_psm'])
return L2capClient(
device,
psm=ctx.obj['l2cap_psm'],
mtu=ctx.obj['l2cap_mtu'],
mps=ctx.obj['l2cap_mps'],
max_credits=ctx.obj['l2cap_max_credits'],
)
if mode == 'l2cap-server':
return L2capServer(device, psm=ctx.obj['l2cap_psm'])
return L2capServer(
device,
psm=ctx.obj['l2cap_psm'],
mtu=ctx.obj['l2cap_mtu'],
mps=ctx.obj['l2cap_mps'],
max_credits=ctx.obj['l2cap_max_credits'],
)
if mode == 'rfcomm-client':
return RfcommClient(
@@ -1197,23 +1358,29 @@ def create_role_factory(ctx, default_role):
return Sender(
packet_io,
start_delay=ctx.obj['start_delay'],
repeat=ctx.obj['repeat'],
repeat_delay=ctx.obj['repeat_delay'],
pace=ctx.obj['pace'],
packet_size=ctx.obj['packet_size'],
packet_count=ctx.obj['packet_count'],
)
if role == 'receiver':
return Receiver(packet_io)
return Receiver(packet_io, ctx.obj['linger'])
if role == 'ping':
return Ping(
packet_io,
start_delay=ctx.obj['start_delay'],
repeat=ctx.obj['repeat'],
repeat_delay=ctx.obj['repeat_delay'],
pace=ctx.obj['pace'],
packet_size=ctx.obj['packet_size'],
packet_count=ctx.obj['packet_count'],
)
if role == 'pong':
return Pong(packet_io)
return Pong(packet_io, ctx.obj['linger'])
raise ValueError('invalid role')
@@ -1266,13 +1433,31 @@ def create_role_factory(ctx, default_role):
default=DEFAULT_L2CAP_PSM,
help='L2CAP PSM to use',
)
@click.option(
'--l2cap-mtu',
type=int,
default=DEFAULT_L2CAP_MTU,
help='L2CAP MTU to use',
)
@click.option(
'--l2cap-mps',
type=int,
default=DEFAULT_L2CAP_MPS,
help='L2CAP MPS to use',
)
@click.option(
'--l2cap-max-credits',
type=int,
default=DEFAULT_L2CAP_MAX_CREDITS,
help='L2CAP maximum number of credits allowed for the peer',
)
@click.option(
'--packet-size',
'-s',
metavar='SIZE',
type=click.IntRange(8, 4096),
default=500,
help='Packet size (server role)',
help='Packet size (client or ping role)',
)
@click.option(
'--packet-count',
@@ -1280,7 +1465,7 @@ def create_role_factory(ctx, default_role):
metavar='COUNT',
type=int,
default=10,
help='Packet count (server role)',
help='Packet count (client or ping role)',
)
@click.option(
'--start-delay',
@@ -1288,7 +1473,39 @@ def create_role_factory(ctx, default_role):
metavar='SECONDS',
type=int,
default=1,
help='Start delay (server role)',
help='Start delay (client or ping role)',
)
@click.option(
'--repeat',
metavar='N',
type=int,
default=0,
help=(
'Repeat the run N times (client and ping roles)'
'(0, which is the fault, to run just once) '
),
)
@click.option(
'--repeat-delay',
metavar='SECONDS',
type=int,
default=1,
help=('Delay, in seconds, between repeats'),
)
@click.option(
'--pace',
metavar='MILLISECONDS',
type=int,
default=0,
help=(
'Wait N milliseconds between packets '
'(0, which is the fault, to send as fast as possible) '
),
)
@click.option(
'--linger',
is_flag=True,
help="Don't exit at the end of a run (server and pong roles)",
)
@click.pass_context
def bench(
@@ -1301,9 +1518,16 @@ def bench(
packet_size,
packet_count,
start_delay,
repeat,
repeat_delay,
pace,
linger,
rfcomm_channel,
rfcomm_uuid,
l2cap_psm,
l2cap_mtu,
l2cap_mps,
l2cap_max_credits,
):
ctx.ensure_object(dict)
ctx.obj['device_config'] = device_config
@@ -1313,9 +1537,16 @@ def bench(
ctx.obj['rfcomm_channel'] = rfcomm_channel
ctx.obj['rfcomm_uuid'] = rfcomm_uuid
ctx.obj['l2cap_psm'] = l2cap_psm
ctx.obj['l2cap_mtu'] = l2cap_mtu
ctx.obj['l2cap_mps'] = l2cap_mps
ctx.obj['l2cap_max_credits'] = l2cap_max_credits
ctx.obj['packet_size'] = packet_size
ctx.obj['packet_count'] = packet_count
ctx.obj['start_delay'] = start_delay
ctx.obj['repeat'] = repeat
ctx.obj['repeat_delay'] = repeat_delay
ctx.obj['pace'] = pace
ctx.obj['linger'] = linger
ctx.obj['extended_data_length'] = (
[int(x) for x in extended_data_length.split('/')]

View File

@@ -49,14 +49,16 @@ class ServerBridge:
self.tcp_port = tcp_port
async def start(self, device: Device) -> None:
# Listen for incoming L2CAP CoC connections
# Listen for incoming L2CAP channel connections
device.create_l2cap_server(
spec=l2cap.LeCreditBasedChannelSpec(
psm=self.psm, mtu=self.mtu, mps=self.mps, max_credits=self.max_credits
),
handler=self.on_coc,
handler=self.on_channel,
)
print(
color(f'### Listening for channel connection on PSM {self.psm}', 'yellow')
)
print(color(f'### Listening for CoC connection on PSM {self.psm}', 'yellow'))
def on_ble_connection(connection):
def on_ble_disconnection(reason):
@@ -73,7 +75,7 @@ class ServerBridge:
await device.start_advertising(auto_restart=True)
# Called when a new L2CAP connection is established
def on_coc(self, l2cap_channel):
def on_channel(self, l2cap_channel):
print(color('*** L2CAP channel:', 'cyan'), l2cap_channel)
class Pipe:
@@ -83,7 +85,7 @@ class ServerBridge:
self.l2cap_channel = l2cap_channel
l2cap_channel.on('close', self.on_l2cap_close)
l2cap_channel.sink = self.on_coc_sdu
l2cap_channel.sink = self.on_channel_sdu
async def connect_to_tcp(self):
# Connect to the TCP server
@@ -128,7 +130,7 @@ class ServerBridge:
if self.tcp_transport is not None:
self.tcp_transport.close()
def on_coc_sdu(self, sdu):
def on_channel_sdu(self, sdu):
print(color(f'<<< [L2CAP SDU]: {len(sdu)} bytes', 'cyan'))
if self.tcp_transport is None:
print(color('!!! TCP socket not open, dropping', 'red'))
@@ -183,7 +185,7 @@ class ClientBridge:
peer_name = writer.get_extra_info('peer_name')
print(color(f'<<< TCP connection from {peer_name}', 'magenta'))
def on_coc_sdu(sdu):
def on_channel_sdu(sdu):
print(color(f'<<< [L2CAP SDU]: {len(sdu)} bytes', 'cyan'))
l2cap_to_tcp_pipe.write(sdu)
@@ -209,7 +211,7 @@ class ClientBridge:
writer.close()
return
l2cap_channel.sink = on_coc_sdu
l2cap_channel.sink = on_channel_sdu
l2cap_channel.on('close', on_l2cap_close)
# Start a flow control pipe from L2CAP to TCP
@@ -274,23 +276,29 @@ async def run(device_config, hci_transport, bridge):
@click.pass_context
@click.option('--device-config', help='Device configuration file', required=True)
@click.option('--hci-transport', help='HCI transport', required=True)
@click.option('--psm', help='PSM for L2CAP CoC', type=int, default=1234)
@click.option('--psm', help='PSM for L2CAP', type=int, default=1234)
@click.option(
'--l2cap-coc-max-credits',
help='Maximum L2CAP CoC Credits',
'--l2cap-max-credits',
help='Maximum L2CAP Credits',
type=click.IntRange(1, 65535),
default=128,
)
@click.option(
'--l2cap-coc-mtu',
help='L2CAP CoC MTU',
type=click.IntRange(23, 65535),
default=1022,
'--l2cap-mtu',
help='L2CAP MTU',
type=click.IntRange(
l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU,
l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MTU,
),
default=1024,
)
@click.option(
'--l2cap-coc-mps',
help='L2CAP CoC MPS',
type=click.IntRange(23, 65533),
'--l2cap-mps',
help='L2CAP MPS',
type=click.IntRange(
l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS,
l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS,
),
default=1024,
)
def cli(
@@ -298,17 +306,17 @@ def cli(
device_config,
hci_transport,
psm,
l2cap_coc_max_credits,
l2cap_coc_mtu,
l2cap_coc_mps,
l2cap_max_credits,
l2cap_mtu,
l2cap_mps,
):
context.ensure_object(dict)
context.obj['device_config'] = device_config
context.obj['hci_transport'] = hci_transport
context.obj['psm'] = psm
context.obj['max_credits'] = l2cap_coc_max_credits
context.obj['mtu'] = l2cap_coc_mtu
context.obj['mps'] = l2cap_coc_mps
context.obj['max_credits'] = l2cap_max_credits
context.obj['mtu'] = l2cap_mtu
context.obj['mps'] = l2cap_mps
# -----------------------------------------------------------------------------

View File

@@ -149,10 +149,11 @@ L2CAP_INVALID_CID_IN_REQUEST_REASON = 0x0002
L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_CREDITS = 65535
L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU = 23
L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MTU = 65535
L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS = 23
L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS = 65533
L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU = 2048
L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS = 2046
L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS = 2048
L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS = 256
L2CAP_MAXIMUM_TRANSMISSION_UNIT_CONFIGURATION_OPTION_TYPE = 0x01
@@ -188,8 +189,11 @@ class LeCreditBasedChannelSpec:
or self.max_credits > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_CREDITS
):
raise ValueError('max credits out of range')
if self.mtu < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU:
raise ValueError('MTU too small')
if (
self.mtu < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU
or self.mtu > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MTU
):
raise ValueError('MTU out of range')
if (
self.mps < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS
or self.mps > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS
@@ -1644,12 +1648,13 @@ class ChannelManager:
def send_pdu(self, connection, cid: int, pdu: Union[SupportsBytes, bytes]) -> None:
pdu_str = pdu.hex() if isinstance(pdu, bytes) else str(pdu)
pdu_bytes = bytes(pdu)
logger.debug(
f'{color(">>> Sending L2CAP PDU", "blue")} '
f'on connection [0x{connection.handle:04X}] (CID={cid}) '
f'{connection.peer_address}: {pdu_str}'
f'{connection.peer_address}: {len(pdu_bytes)} bytes, {pdu_str}'
)
self.host.send_l2cap_pdu(connection.handle, cid, bytes(pdu))
self.host.send_l2cap_pdu(connection.handle, cid, pdu_bytes)
def on_pdu(self, connection: Connection, cid: int, pdu: bytes) -> None:
if cid in (L2CAP_SIGNALING_CID, L2CAP_LE_SIGNALING_CID):

View File

@@ -454,6 +454,8 @@ class DLC(EventEmitter):
self.c_r = 1 if self.role == Multiplexer.Role.INITIATOR else 0
self.sink = None
self.connection_result = None
self.drained = asyncio.Event()
self.drained.set()
# Compute the MTU
max_overhead = 4 + 1 # header with 2-byte length + fcs
@@ -633,6 +635,8 @@ class DLC(EventEmitter):
)
rx_credits_needed = 0
if not self.tx_buffer:
self.drained.set()
# Stream protocol
def write(self, data: Union[bytes, str]) -> None:
@@ -645,11 +649,11 @@ class DLC(EventEmitter):
raise ValueError('write only accept bytes or strings')
self.tx_buffer += data
self.drained.clear()
self.process_tx()
def drain(self) -> None:
# TODO
pass
async def drain(self) -> None:
await self.drained.wait()
def __str__(self) -> str:
return f'DLC(dlci={self.dlci},state={self.state.name})'

View File

@@ -7,16 +7,36 @@ throughput and/or latency between two devices.
# General Usage
```
Usage: bench.py [OPTIONS] COMMAND [ARGS]...
Usage: bumble-bench [OPTIONS] COMMAND [ARGS]...
Options:
--device-config FILENAME Device configuration file
--role [sender|receiver|ping|pong]
--mode [gatt-client|gatt-server|l2cap-client|l2cap-server|rfcomm-client|rfcomm-server]
--att-mtu MTU GATT MTU (gatt-client mode) [23<=x<=517]
-s, --packet-size SIZE Packet size (server role) [8<=x<=4096]
-c, --packet-count COUNT Packet count (server role)
-sd, --start-delay SECONDS Start delay (server role)
--extended-data-length TEXT Request a data length upon connection,
specified as tx_octets/tx_time
--rfcomm-channel INTEGER RFComm channel to use
--rfcomm-uuid TEXT RFComm service UUID to use (ignored is
--rfcomm-channel is not 0)
--l2cap-psm INTEGER L2CAP PSM to use
--l2cap-mtu INTEGER L2CAP MTU to use
--l2cap-mps INTEGER L2CAP MPS to use
--l2cap-max-credits INTEGER L2CAP maximum number of credits allowed for
the peer
-s, --packet-size SIZE Packet size (client or ping role)
[8<=x<=4096]
-c, --packet-count COUNT Packet count (client or ping role)
-sd, --start-delay SECONDS Start delay (client or ping role)
--repeat N Repeat the run N times (client and ping
roles)(0, which is the fault, to run just
once)
--repeat-delay SECONDS Delay, in seconds, between repeats
--pace MILLISECONDS Wait N milliseconds between packets (0,
which is the fault, to send as fast as
possible)
--linger Don't exit at the end of a run (server and
pong roles)
--help Show this message and exit.
Commands:
@@ -35,17 +55,18 @@ Options:
--connection-interval, --ci CONNECTION_INTERVAL
Connection interval (in ms)
--phy [1m|2m|coded] PHY to use
--authenticate Authenticate (RFComm only)
--encrypt Encrypt the connection (RFComm only)
--help Show this message and exit.
```
To test once device against another, one of the two devices must be running
To test once device against another, one of the two devices must be running
the ``peripheral`` command and the other the ``central`` command. The device
running the ``peripheral`` command will accept connections from the device
running the ``central`` command.
When using Bluetooth LE (all modes except for ``rfcomm-server`` and ``rfcomm-client``utils),
the default addresses configured in the tool should be sufficient. But when using
Bluetooth Classic, the address of the Peripheral must be specified on the Central
the default addresses configured in the tool should be sufficient. But when using
Bluetooth Classic, the address of the Peripheral must be specified on the Central
using the ``--peripheral`` option. The address will be printed by the Peripheral when
it starts.
@@ -83,7 +104,7 @@ the other on `usb:1`, and two consoles/terminals. We will run a command in each.
$ bumble-bench central usb:1
```
In this default configuration, the Central runs a Sender, as a GATT client,
In this default configuration, the Central runs a Sender, as a GATT client,
connecting to the Peripheral running a Receiver, as a GATT server.
!!! example "L2CAP Throughput"

View File

@@ -74,11 +74,13 @@ class L2capClient(
gatt: BluetoothGatt?, status: Int, newState: Int
) {
if (gatt != null && newState == BluetoothProfile.STATE_CONNECTED) {
gatt.setPreferredPhy(
BluetoothDevice.PHY_LE_2M_MASK,
BluetoothDevice.PHY_LE_2M_MASK,
BluetoothDevice.PHY_OPTION_NO_PREFERRED
)
if (viewModel.use2mPhy) {
gatt.setPreferredPhy(
BluetoothDevice.PHY_LE_2M_MASK,
BluetoothDevice.PHY_LE_2M_MASK,
BluetoothDevice.PHY_OPTION_NO_PREFERRED
)
}
gatt.readPhy()
// Request an MTU update, even though we don't use GATT, because Android

View File

@@ -27,11 +27,12 @@ val DEFAULT_RFCOMM_UUID: UUID = UUID.fromString("E6D55659-C8B4-4B85-96BB-B1143AF
const val DEFAULT_PEER_BLUETOOTH_ADDRESS = "AA:BB:CC:DD:EE:FF"
const val DEFAULT_SENDER_PACKET_COUNT = 100
const val DEFAULT_SENDER_PACKET_SIZE = 1024
const val DEFAULT_PSM = 128
class AppViewModel : ViewModel() {
private var preferences: SharedPreferences? = null
var peerBluetoothAddress by mutableStateOf(DEFAULT_PEER_BLUETOOTH_ADDRESS)
var l2capPsm by mutableIntStateOf(0)
var l2capPsm by mutableIntStateOf(DEFAULT_PSM)
var use2mPhy by mutableStateOf(true)
var mtu by mutableIntStateOf(0)
var rxPhy by mutableIntStateOf(0)