Compare commits

...

12 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod 00cd8fbdd0 compatibility with recent host ACL property changes 2024-01-05 12:17:09 -08:00
Michael Mogenson c48e3f5e9c Merge pull request #393 from mogenson/controller-loopback
apps: Add a controller loopback throughput test app
2024-01-05 13:13:30 -05:00
Michael Mogenson d6bbc1145a apps: Add a controller loopback throughput test app
Add a command line utility to open a transport to a BT controller, put
the controller into local loopback mode, and send and receive ACL data
packets. Record the time it takes to send and receive all packets and
calculate a throughput measurement in kB/s.

This utility is usefull for characterizing the speed of a transport to a
BT controller (such as a TCP socket or serial port) without having to
deal with a connected peer or the variability of over the air
transmissions.

The transport CLI argument is required. The packet size and packet
count arguments are optional. They default to the same values as the
bumble-bench app.
2024-01-05 10:01:24 -05:00
zxzxwu e2fec67bd9 Merge pull request #390 from zxzxwu/csip
CSIP: Encrypted SIRK implementation
2024-01-04 13:28:23 +08:00
Josh Wu 88cb3b2a4d IWYU in CSIP 2024-01-04 13:22:09 +08:00
zxzxwu 9ebb03be46 Merge pull request #389 from zxzxwu/gitignore
.gitignore: Add venv directories
2024-01-04 12:54:30 +08:00
Gilles Boccon-Gibod 80d84af76c Merge pull request #392 from google/gbg/l2cap-drain
l2cap & rfcomm drain support
2024-01-03 09:59:36 -08:00
Gilles Boccon-Gibod 8f4721758f fix typo 2024-01-03 09:53:17 -08:00
Gilles Boccon-Gibod 8864af4acd format 2024-01-02 11:35:11 -08:00
Gilles Boccon-Gibod 8980fb8cc7 add drain support and a few tool options 2024-01-02 11:07:52 -08:00
Josh Wu 2c5f3472a9 CSIP: Encrypted SIRK implementation 2023-12-30 16:06:42 +08:00
Josh Wu f18277ac78 Ignore venv directories 2023-12-30 14:23:35 +08:00
13 changed files with 698 additions and 127 deletions
+2
View File
@@ -10,3 +10,5 @@ __pycache__
bumble/_version.py bumble/_version.py
.vscode/launch.json .vscode/launch.json
/.idea /.idea
venv/
.venv/
+1
View File
@@ -22,6 +22,7 @@
"cmac", "cmac",
"CONNECTIONLESS", "CONNECTIONLESS",
"csip", "csip",
"csis",
"csrcs", "csrcs",
"CVSD", "CVSD",
"datagram", "datagram",
+300 -66
View File
@@ -80,10 +80,10 @@ SPEED_TX_UUID = 'E789C754-41A1-45F4-A948-A0A1A90DBA53'
SPEED_RX_UUID = '016A2CC7-E14B-4819-935F-1F56EAE4098D' SPEED_RX_UUID = '016A2CC7-E14B-4819-935F-1F56EAE4098D'
DEFAULT_RFCOMM_UUID = 'E6D55659-C8B4-4B85-96BB-B1143AF6D3AE' DEFAULT_RFCOMM_UUID = 'E6D55659-C8B4-4B85-96BB-B1143AF6D3AE'
DEFAULT_L2CAP_PSM = 1234 DEFAULT_L2CAP_PSM = 128
DEFAULT_L2CAP_MAX_CREDITS = 128 DEFAULT_L2CAP_MAX_CREDITS = 128
DEFAULT_L2CAP_MTU = 1024 DEFAULT_L2CAP_MTU = 1024
DEFAULT_L2CAP_MPS = 1022 DEFAULT_L2CAP_MPS = 1024
DEFAULT_LINGER_TIME = 1.0 DEFAULT_LINGER_TIME = 1.0
DEFAULT_POST_CONNECTION_WAIT_TIME = 1.0 DEFAULT_POST_CONNECTION_WAIT_TIME = 1.0
@@ -240,6 +240,23 @@ async def find_rfcomm_channel_with_uuid(connection: Connection, uuid: str) -> in
return 0 return 0
def log_stats(title, stats):
stats_min = min(stats)
stats_max = max(stats)
stats_avg = sum(stats) / len(stats)
logging.info(
color(
(
f'### {title} stats: '
f'min={stats_min:.2f}, '
f'max={stats_max:.2f}, '
f'average={stats_avg:.2f}'
),
'cyan',
)
)
class PacketType(enum.IntEnum): class PacketType(enum.IntEnum):
RESET = 0 RESET = 0
SEQUENCE = 1 SEQUENCE = 1
@@ -253,14 +270,27 @@ PACKET_FLAG_LAST = 1
# Sender # Sender
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class Sender: class Sender:
def __init__(self, packet_io, start_delay, packet_size, packet_count): def __init__(
self,
packet_io,
start_delay,
repeat,
repeat_delay,
pace,
packet_size,
packet_count,
):
self.tx_start_delay = start_delay self.tx_start_delay = start_delay
self.tx_packet_size = packet_size self.tx_packet_size = packet_size
self.tx_packet_count = packet_count self.tx_packet_count = packet_count
self.packet_io = packet_io self.packet_io = packet_io
self.packet_io.packet_listener = self self.packet_io.packet_listener = self
self.repeat = repeat
self.repeat_delay = repeat_delay
self.pace = pace
self.start_time = 0 self.start_time = 0
self.bytes_sent = 0 self.bytes_sent = 0
self.stats = []
self.done = asyncio.Event() self.done = asyncio.Event()
def reset(self): def reset(self):
@@ -271,27 +301,57 @@ class Sender:
await self.packet_io.ready.wait() await self.packet_io.ready.wait()
logging.info(color('--- Go!', 'blue')) logging.info(color('--- Go!', 'blue'))
if self.tx_start_delay: for run in range(self.repeat + 1):
logging.info(color(f'*** Startup delay: {self.tx_start_delay}', 'blue')) self.done.clear()
await asyncio.sleep(self.tx_start_delay)
logging.info(color('=== Sending RESET', 'magenta')) if run > 0 and self.repeat and self.repeat_delay:
await self.packet_io.send_packet(bytes([PacketType.RESET])) logging.info(color(f'*** Repeat delay: {self.repeat_delay}', 'green'))
self.start_time = time.time() await asyncio.sleep(self.repeat_delay)
for tx_i in range(self.tx_packet_count):
packet_flags = PACKET_FLAG_LAST if tx_i == self.tx_packet_count - 1 else 0
packet = struct.pack(
'>bbI',
PacketType.SEQUENCE,
packet_flags,
tx_i,
) + bytes(self.tx_packet_size - 6)
logging.info(color(f'Sending packet {tx_i}: {len(packet)} bytes', 'yellow'))
self.bytes_sent += len(packet)
await self.packet_io.send_packet(packet)
await self.done.wait() if self.tx_start_delay:
logging.info(color('=== Done!', 'magenta')) logging.info(color(f'*** Startup delay: {self.tx_start_delay}', 'blue'))
await asyncio.sleep(self.tx_start_delay)
logging.info(color('=== Sending RESET', 'magenta'))
await self.packet_io.send_packet(bytes([PacketType.RESET]))
self.start_time = time.time()
self.bytes_sent = 0
for tx_i in range(self.tx_packet_count):
packet_flags = (
PACKET_FLAG_LAST if tx_i == self.tx_packet_count - 1 else 0
)
packet = struct.pack(
'>bbI',
PacketType.SEQUENCE,
packet_flags,
tx_i,
) + bytes(self.tx_packet_size - 6 - self.packet_io.overhead_size)
logging.info(
color(
f'Sending packet {tx_i}: {self.tx_packet_size} bytes', 'yellow'
)
)
self.bytes_sent += len(packet)
await self.packet_io.send_packet(packet)
if self.pace is None:
continue
if self.pace > 0:
await asyncio.sleep(self.pace / 1000)
else:
await self.packet_io.drain()
await self.done.wait()
run_counter = f'[{run + 1} of {self.repeat + 1}]' if self.repeat else ''
logging.info(color(f'=== {run_counter} Done!', 'magenta'))
if self.repeat:
log_stats('Run', self.stats)
if self.repeat:
logging.info(color('--- End of runs', 'blue'))
def on_packet_received(self, packet): def on_packet_received(self, packet):
try: try:
@@ -302,6 +362,7 @@ class Sender:
if packet_type == PacketType.ACK: if packet_type == PacketType.ACK:
elapsed = time.time() - self.start_time elapsed = time.time() - self.start_time
average_tx_speed = self.bytes_sent / elapsed average_tx_speed = self.bytes_sent / elapsed
self.stats.append(average_tx_speed)
logging.info( logging.info(
color( color(
f'@@@ Received ACK. Speed: average={average_tx_speed:.4f}' f'@@@ Received ACK. Speed: average={average_tx_speed:.4f}'
@@ -320,17 +381,17 @@ class Receiver:
start_timestamp: float start_timestamp: float
last_timestamp: float last_timestamp: float
def __init__(self, packet_io): def __init__(self, packet_io, linger):
self.reset() self.reset()
self.packet_io = packet_io self.packet_io = packet_io
self.packet_io.packet_listener = self self.packet_io.packet_listener = self
self.linger = linger
self.done = asyncio.Event() self.done = asyncio.Event()
def reset(self): def reset(self):
self.expected_packet_index = 0 self.expected_packet_index = 0
self.start_timestamp = 0.0 self.measurements = [(time.time(), 0)]
self.last_timestamp = 0.0 self.total_bytes_received = 0
self.bytes_received = 0
def on_packet_received(self, packet): def on_packet_received(self, packet):
try: try:
@@ -338,12 +399,9 @@ class Receiver:
except ValueError: except ValueError:
return return
now = time.time()
if packet_type == PacketType.RESET: if packet_type == PacketType.RESET:
logging.info(color('=== Received RESET', 'magenta')) logging.info(color('=== Received RESET', 'magenta'))
self.reset() self.reset()
self.start_timestamp = now
return return
try: try:
@@ -352,7 +410,8 @@ class Receiver:
return return
logging.info( logging.info(
f'<<< Received packet {packet_index}: ' f'<<< Received packet {packet_index}: '
f'flags=0x{packet_flags:02X}, {len(packet)} bytes' f'flags=0x{packet_flags:02X}, '
f'{len(packet) + self.packet_io.overhead_size} bytes'
) )
if packet_index != self.expected_packet_index: if packet_index != self.expected_packet_index:
@@ -363,19 +422,27 @@ class Receiver:
) )
) )
elapsed_since_start = now - self.start_timestamp now = time.time()
elapsed_since_last = now - self.last_timestamp elapsed_since_start = now - self.measurements[0][0]
self.bytes_received += len(packet) elapsed_since_last = now - self.measurements[-1][0]
self.measurements.append((now, len(packet)))
self.total_bytes_received += len(packet)
instant_rx_speed = len(packet) / elapsed_since_last instant_rx_speed = len(packet) / elapsed_since_last
average_rx_speed = self.bytes_received / elapsed_since_start average_rx_speed = self.total_bytes_received / elapsed_since_start
window = self.measurements[-64:]
windowed_rx_speed = sum(measurement[1] for measurement in window[1:]) / (
window[-1][0] - window[0][0]
)
logging.info( logging.info(
color( color(
f'Speed: instant={instant_rx_speed:.4f}, average={average_rx_speed:.4f}', 'Speed: '
f'instant={instant_rx_speed:.4f}, '
f'windowed={windowed_rx_speed:.4f}, '
f'average={average_rx_speed:.4f}',
'yellow', 'yellow',
) )
) )
self.last_timestamp = now
self.expected_packet_index = packet_index + 1 self.expected_packet_index = packet_index + 1
if packet_flags & PACKET_FLAG_LAST: if packet_flags & PACKET_FLAG_LAST:
@@ -385,7 +452,8 @@ class Receiver:
) )
) )
logging.info(color('@@@ Received last packet', 'green')) logging.info(color('@@@ Received last packet', 'green'))
self.done.set() if not self.linger:
self.done.set()
async def run(self): async def run(self):
await self.done.wait() await self.done.wait()
@@ -396,16 +464,31 @@ class Receiver:
# Ping # Ping
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class Ping: class Ping:
def __init__(self, packet_io, start_delay, packet_size, packet_count): def __init__(
self,
packet_io,
start_delay,
repeat,
repeat_delay,
pace,
packet_size,
packet_count,
):
self.tx_start_delay = start_delay self.tx_start_delay = start_delay
self.tx_packet_size = packet_size self.tx_packet_size = packet_size
self.tx_packet_count = packet_count self.tx_packet_count = packet_count
self.packet_io = packet_io self.packet_io = packet_io
self.packet_io.packet_listener = self self.packet_io.packet_listener = self
self.repeat = repeat
self.repeat_delay = repeat_delay
self.pace = pace
self.done = asyncio.Event() self.done = asyncio.Event()
self.current_packet_index = 0 self.current_packet_index = 0
self.ping_sent_time = 0.0 self.ping_sent_time = 0.0
self.latencies = [] self.latencies = []
self.min_stats = []
self.max_stats = []
self.avg_stats = []
def reset(self): def reset(self):
pass pass
@@ -415,21 +498,56 @@ class Ping:
await self.packet_io.ready.wait() await self.packet_io.ready.wait()
logging.info(color('--- Go!', 'blue')) logging.info(color('--- Go!', 'blue'))
if self.tx_start_delay: for run in range(self.repeat + 1):
logging.info(color(f'*** Startup delay: {self.tx_start_delay}', 'blue')) self.done.clear()
await asyncio.sleep(self.tx_start_delay)
logging.info(color('=== Sending RESET', 'magenta')) if run > 0 and self.repeat and self.repeat_delay:
await self.packet_io.send_packet(bytes([PacketType.RESET])) logging.info(color(f'*** Repeat delay: {self.repeat_delay}', 'green'))
await asyncio.sleep(self.repeat_delay)
await self.send_next_ping() if self.tx_start_delay:
logging.info(color(f'*** Startup delay: {self.tx_start_delay}', 'blue'))
await asyncio.sleep(self.tx_start_delay)
await self.done.wait() logging.info(color('=== Sending RESET', 'magenta'))
average_latency = sum(self.latencies) / len(self.latencies) await self.packet_io.send_packet(bytes([PacketType.RESET]))
logging.info(color(f'@@@ Average latency: {average_latency:.2f}'))
logging.info(color('=== Done!', 'magenta')) self.current_packet_index = 0
await self.send_next_ping()
await self.done.wait()
min_latency = min(self.latencies)
max_latency = max(self.latencies)
avg_latency = sum(self.latencies) / len(self.latencies)
logging.info(
color(
'@@@ Latencies: '
f'min={min_latency:.2f}, '
f'max={max_latency:.2f}, '
f'average={avg_latency:.2f}'
)
)
self.min_stats.append(min_latency)
self.max_stats.append(max_latency)
self.avg_stats.append(avg_latency)
run_counter = f'[{run + 1} of {self.repeat + 1}]' if self.repeat else ''
logging.info(color(f'=== {run_counter} Done!', 'magenta'))
if self.repeat:
log_stats('Min Latency', self.min_stats)
log_stats('Max Latency', self.max_stats)
log_stats('Average Latency', self.avg_stats)
if self.repeat:
logging.info(color('--- End of runs', 'blue'))
async def send_next_ping(self): async def send_next_ping(self):
if self.pace:
await asyncio.sleep(self.pace / 1000)
packet = struct.pack( packet = struct.pack(
'>bbI', '>bbI',
PacketType.SEQUENCE, PacketType.SEQUENCE,
@@ -488,10 +606,11 @@ class Ping:
class Pong: class Pong:
expected_packet_index: int expected_packet_index: int
def __init__(self, packet_io): def __init__(self, packet_io, linger):
self.reset() self.reset()
self.packet_io = packet_io self.packet_io = packet_io
self.packet_io.packet_listener = self self.packet_io.packet_listener = self
self.linger = linger
self.done = asyncio.Event() self.done = asyncio.Event()
def reset(self): def reset(self):
@@ -536,7 +655,7 @@ class Pong:
) )
) )
if packet_flags & PACKET_FLAG_LAST: if packet_flags & PACKET_FLAG_LAST and not self.linger:
self.done.set() self.done.set()
async def run(self): async def run(self):
@@ -554,6 +673,7 @@ class GattClient:
self.speed_tx = None self.speed_tx = None
self.packet_listener = None self.packet_listener = None
self.ready = asyncio.Event() self.ready = asyncio.Event()
self.overhead_size = 0
async def on_connection(self, connection): async def on_connection(self, connection):
peer = Peer(connection) peer = Peer(connection)
@@ -603,6 +723,9 @@ class GattClient:
async def send_packet(self, packet): async def send_packet(self, packet):
await self.speed_tx.write_value(packet) await self.speed_tx.write_value(packet)
async def drain(self):
pass
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# GattServer # GattServer
@@ -612,6 +735,7 @@ class GattServer:
self.device = device self.device = device
self.packet_listener = None self.packet_listener = None
self.ready = asyncio.Event() self.ready = asyncio.Event()
self.overhead_size = 0
# Setup the GATT service # Setup the GATT service
self.speed_tx = Characteristic( self.speed_tx = Characteristic(
@@ -653,6 +777,9 @@ class GattServer:
async def send_packet(self, packet): async def send_packet(self, packet):
await self.device.notify_subscribers(self.speed_rx, packet) await self.device.notify_subscribers(self.speed_rx, packet)
async def drain(self):
pass
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# StreamedPacketIO # StreamedPacketIO
@@ -664,6 +791,7 @@ class StreamedPacketIO:
self.rx_packet = b'' self.rx_packet = b''
self.rx_packet_header = b'' self.rx_packet_header = b''
self.rx_packet_need = 0 self.rx_packet_need = 0
self.overhead_size = 2
def on_packet(self, packet): def on_packet(self, packet):
while packet: while packet:
@@ -715,6 +843,7 @@ class L2capClient(StreamedPacketIO):
self.max_credits = max_credits self.max_credits = max_credits
self.mtu = mtu self.mtu = mtu
self.mps = mps self.mps = mps
self.l2cap_channel = None
self.ready = asyncio.Event() self.ready = asyncio.Event()
async def on_connection(self, connection: Connection) -> None: async def on_connection(self, connection: Connection) -> None:
@@ -736,9 +865,10 @@ class L2capClient(StreamedPacketIO):
logging.info(color(f'!!! Connection failed: {error}', 'red')) logging.info(color(f'!!! Connection failed: {error}', 'red'))
return return
l2cap_channel.sink = self.on_packet
l2cap_channel.on('close', self.on_l2cap_close)
self.io_sink = l2cap_channel.write self.io_sink = l2cap_channel.write
self.l2cap_channel = l2cap_channel
l2cap_channel.on('close', self.on_l2cap_close)
l2cap_channel.sink = self.on_packet
self.ready.set() self.ready.set()
@@ -748,6 +878,10 @@ class L2capClient(StreamedPacketIO):
def on_l2cap_close(self): def on_l2cap_close(self):
logging.info(color('*** L2CAP channel closed', 'red')) logging.info(color('*** L2CAP channel closed', 'red'))
async def drain(self):
assert self.l2cap_channel
await self.l2cap_channel.drain()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# L2capServer # L2capServer
@@ -786,6 +920,7 @@ class L2capServer(StreamedPacketIO):
logging.info(color(f'*** L2CAP channel: {l2cap_channel}', 'cyan')) logging.info(color(f'*** L2CAP channel: {l2cap_channel}', 'cyan'))
self.io_sink = l2cap_channel.write self.io_sink = l2cap_channel.write
self.l2cap_channel = l2cap_channel
l2cap_channel.on('close', self.on_l2cap_close) l2cap_channel.on('close', self.on_l2cap_close)
l2cap_channel.sink = self.on_packet l2cap_channel.sink = self.on_packet
@@ -795,6 +930,10 @@ class L2capServer(StreamedPacketIO):
logging.info(color('*** L2CAP channel closed', 'red')) logging.info(color('*** L2CAP channel closed', 'red'))
self.l2cap_channel = None self.l2cap_channel = None
async def drain(self):
assert self.l2cap_channel
await self.l2cap_channel.drain()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# RfcommClient # RfcommClient
@@ -805,6 +944,7 @@ class RfcommClient(StreamedPacketIO):
self.device = device self.device = device
self.channel = channel self.channel = channel
self.uuid = uuid self.uuid = uuid
self.rfcomm_session = None
self.ready = asyncio.Event() self.ready = asyncio.Event()
async def on_connection(self, connection): async def on_connection(self, connection):
@@ -840,12 +980,17 @@ class RfcommClient(StreamedPacketIO):
rfcomm_session.sink = self.on_packet rfcomm_session.sink = self.on_packet
self.io_sink = rfcomm_session.write self.io_sink = rfcomm_session.write
self.rfcomm_session = rfcomm_session
self.ready.set() self.ready.set()
def on_disconnection(self, _): def on_disconnection(self, _):
pass pass
async def drain(self):
assert self.rfcomm_session
await self.rfcomm_session.drain()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# RfcommServer # RfcommServer
@@ -853,6 +998,7 @@ class RfcommClient(StreamedPacketIO):
class RfcommServer(StreamedPacketIO): class RfcommServer(StreamedPacketIO):
def __init__(self, device, channel): def __init__(self, device, channel):
super().__init__() super().__init__()
self.dlc = None
self.ready = asyncio.Event() self.ready = asyncio.Event()
# Create and register a server # Create and register a server
@@ -881,6 +1027,11 @@ class RfcommServer(StreamedPacketIO):
logging.info(color(f'*** DLC connected: {dlc}', 'blue')) logging.info(color(f'*** DLC connected: {dlc}', 'blue'))
dlc.sink = self.on_packet dlc.sink = self.on_packet
self.io_sink = dlc.write self.io_sink = dlc.write
self.dlc = dlc
async def drain(self):
assert self.dlc
await self.dlc.drain()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -1030,6 +1181,7 @@ class Central(Connection.Listener):
await role.run() await role.run()
await asyncio.sleep(DEFAULT_LINGER_TIME) await asyncio.sleep(DEFAULT_LINGER_TIME)
await self.connection.disconnect()
def on_disconnection(self, reason): def on_disconnection(self, reason):
logging.info(color(f'!!! Disconnection: reason={reason}', 'red')) logging.info(color(f'!!! Disconnection: reason={reason}', 'red'))
@@ -1120,12 +1272,8 @@ class Peripheral(Device.Listener, Connection.Listener):
# Stop being discoverable and connectable # Stop being discoverable and connectable
if self.classic: if self.classic:
AsyncRunner.spawn(self.device.set_discoverable(False))
async def stop_being_discoverable_connectable(): AsyncRunner.spawn(self.device.set_connectable(False))
await self.device.set_discoverable(False)
await self.device.set_connectable(False)
AsyncRunner.spawn(stop_being_discoverable_connectable())
# Request a new data length if needed # Request a new data length if needed
if self.extended_data_length: if self.extended_data_length:
@@ -1141,6 +1289,10 @@ class Peripheral(Device.Listener, Connection.Listener):
self.connection = None self.connection = None
self.role.reset() self.role.reset()
if self.classic:
AsyncRunner.spawn(self.device.set_discoverable(True))
AsyncRunner.spawn(self.device.set_connectable(True))
def on_connection_parameters_update(self): def on_connection_parameters_update(self):
print_connection(self.connection) print_connection(self.connection)
@@ -1168,10 +1320,22 @@ def create_mode_factory(ctx, default_mode):
return GattServer(device) return GattServer(device)
if mode == 'l2cap-client': if mode == 'l2cap-client':
return L2capClient(device, psm=ctx.obj['l2cap_psm']) return L2capClient(
device,
psm=ctx.obj['l2cap_psm'],
mtu=ctx.obj['l2cap_mtu'],
mps=ctx.obj['l2cap_mps'],
max_credits=ctx.obj['l2cap_max_credits'],
)
if mode == 'l2cap-server': if mode == 'l2cap-server':
return L2capServer(device, psm=ctx.obj['l2cap_psm']) return L2capServer(
device,
psm=ctx.obj['l2cap_psm'],
mtu=ctx.obj['l2cap_mtu'],
mps=ctx.obj['l2cap_mps'],
max_credits=ctx.obj['l2cap_max_credits'],
)
if mode == 'rfcomm-client': if mode == 'rfcomm-client':
return RfcommClient( return RfcommClient(
@@ -1197,23 +1361,29 @@ def create_role_factory(ctx, default_role):
return Sender( return Sender(
packet_io, packet_io,
start_delay=ctx.obj['start_delay'], start_delay=ctx.obj['start_delay'],
repeat=ctx.obj['repeat'],
repeat_delay=ctx.obj['repeat_delay'],
pace=ctx.obj['pace'],
packet_size=ctx.obj['packet_size'], packet_size=ctx.obj['packet_size'],
packet_count=ctx.obj['packet_count'], packet_count=ctx.obj['packet_count'],
) )
if role == 'receiver': if role == 'receiver':
return Receiver(packet_io) return Receiver(packet_io, ctx.obj['linger'])
if role == 'ping': if role == 'ping':
return Ping( return Ping(
packet_io, packet_io,
start_delay=ctx.obj['start_delay'], start_delay=ctx.obj['start_delay'],
repeat=ctx.obj['repeat'],
repeat_delay=ctx.obj['repeat_delay'],
pace=ctx.obj['pace'],
packet_size=ctx.obj['packet_size'], packet_size=ctx.obj['packet_size'],
packet_count=ctx.obj['packet_count'], packet_count=ctx.obj['packet_count'],
) )
if role == 'pong': if role == 'pong':
return Pong(packet_io) return Pong(packet_io, ctx.obj['linger'])
raise ValueError('invalid role') raise ValueError('invalid role')
@@ -1258,7 +1428,7 @@ def create_role_factory(ctx, default_role):
@click.option( @click.option(
'--rfcomm-uuid', '--rfcomm-uuid',
default=DEFAULT_RFCOMM_UUID, default=DEFAULT_RFCOMM_UUID,
help='RFComm service UUID to use (ignored is --rfcomm-channel is not 0)', help='RFComm service UUID to use (ignored if --rfcomm-channel is not 0)',
) )
@click.option( @click.option(
'--l2cap-psm', '--l2cap-psm',
@@ -1266,13 +1436,31 @@ def create_role_factory(ctx, default_role):
default=DEFAULT_L2CAP_PSM, default=DEFAULT_L2CAP_PSM,
help='L2CAP PSM to use', help='L2CAP PSM to use',
) )
@click.option(
'--l2cap-mtu',
type=int,
default=DEFAULT_L2CAP_MTU,
help='L2CAP MTU to use',
)
@click.option(
'--l2cap-mps',
type=int,
default=DEFAULT_L2CAP_MPS,
help='L2CAP MPS to use',
)
@click.option(
'--l2cap-max-credits',
type=int,
default=DEFAULT_L2CAP_MAX_CREDITS,
help='L2CAP maximum number of credits allowed for the peer',
)
@click.option( @click.option(
'--packet-size', '--packet-size',
'-s', '-s',
metavar='SIZE', metavar='SIZE',
type=click.IntRange(8, 4096), type=click.IntRange(8, 4096),
default=500, default=500,
help='Packet size (server role)', help='Packet size (client or ping role)',
) )
@click.option( @click.option(
'--packet-count', '--packet-count',
@@ -1280,7 +1468,7 @@ def create_role_factory(ctx, default_role):
metavar='COUNT', metavar='COUNT',
type=int, type=int,
default=10, default=10,
help='Packet count (server role)', help='Packet count (client or ping role)',
) )
@click.option( @click.option(
'--start-delay', '--start-delay',
@@ -1288,7 +1476,39 @@ def create_role_factory(ctx, default_role):
metavar='SECONDS', metavar='SECONDS',
type=int, type=int,
default=1, default=1,
help='Start delay (server role)', help='Start delay (client or ping role)',
)
@click.option(
'--repeat',
metavar='N',
type=int,
default=0,
help=(
'Repeat the run N times (client and ping roles)'
'(0, which is the fault, to run just once) '
),
)
@click.option(
'--repeat-delay',
metavar='SECONDS',
type=int,
default=1,
help=('Delay, in seconds, between repeats'),
)
@click.option(
'--pace',
metavar='MILLISECONDS',
type=int,
default=0,
help=(
'Wait N milliseconds between packets '
'(0, which is the fault, to send as fast as possible) '
),
)
@click.option(
'--linger',
is_flag=True,
help="Don't exit at the end of a run (server and pong roles)",
) )
@click.pass_context @click.pass_context
def bench( def bench(
@@ -1301,9 +1521,16 @@ def bench(
packet_size, packet_size,
packet_count, packet_count,
start_delay, start_delay,
repeat,
repeat_delay,
pace,
linger,
rfcomm_channel, rfcomm_channel,
rfcomm_uuid, rfcomm_uuid,
l2cap_psm, l2cap_psm,
l2cap_mtu,
l2cap_mps,
l2cap_max_credits,
): ):
ctx.ensure_object(dict) ctx.ensure_object(dict)
ctx.obj['device_config'] = device_config ctx.obj['device_config'] = device_config
@@ -1313,9 +1540,16 @@ def bench(
ctx.obj['rfcomm_channel'] = rfcomm_channel ctx.obj['rfcomm_channel'] = rfcomm_channel
ctx.obj['rfcomm_uuid'] = rfcomm_uuid ctx.obj['rfcomm_uuid'] = rfcomm_uuid
ctx.obj['l2cap_psm'] = l2cap_psm ctx.obj['l2cap_psm'] = l2cap_psm
ctx.obj['l2cap_mtu'] = l2cap_mtu
ctx.obj['l2cap_mps'] = l2cap_mps
ctx.obj['l2cap_max_credits'] = l2cap_max_credits
ctx.obj['packet_size'] = packet_size ctx.obj['packet_size'] = packet_size
ctx.obj['packet_count'] = packet_count ctx.obj['packet_count'] = packet_count
ctx.obj['start_delay'] = start_delay ctx.obj['start_delay'] = start_delay
ctx.obj['repeat'] = repeat
ctx.obj['repeat_delay'] = repeat_delay
ctx.obj['pace'] = pace
ctx.obj['linger'] = linger
ctx.obj['extended_data_length'] = ( ctx.obj['extended_data_length'] = (
[int(x) for x in extended_data_length.split('/')] [int(x) for x in extended_data_length.split('/')]
+200
View File
@@ -0,0 +1,200 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
import time
from typing import Optional
from bumble.colors import color
from bumble.hci import (
HCI_READ_LOOPBACK_MODE_COMMAND,
HCI_Read_Loopback_Mode_Command,
HCI_WRITE_LOOPBACK_MODE_COMMAND,
HCI_Write_Loopback_Mode_Command,
LoopbackMode,
)
from bumble.host import Host
from bumble.transport import open_transport_or_link
import click
class Loopback:
"""Send and receive ACL data packets in local loopback mode"""
def __init__(self, packet_size: int, packet_count: int, transport: str):
self.transport = transport
self.packet_size = packet_size
self.packet_count = packet_count
self.connection_handle: Optional[int] = None
self.connection_event = asyncio.Event()
self.done = asyncio.Event()
self.expected_cid = 0
self.bytes_received = 0
self.start_timestamp = 0.0
self.last_timestamp = 0.0
def on_connection(self, connection_handle: int, *args):
"""Retrieve connection handle from new connection event"""
if not self.connection_event.is_set():
# save first connection handle for ACL
# subsequent connections are SCO
self.connection_handle = connection_handle
self.connection_event.set()
def on_l2cap_pdu(self, connection_handle: int, cid: int, pdu: bytes):
"""Calculate packet receive speed"""
now = time.time()
print(f'<<< Received packet {cid}: {len(pdu)} bytes')
assert connection_handle == self.connection_handle
assert cid == self.expected_cid
self.expected_cid += 1
if cid == 0:
self.start_timestamp = now
else:
elapsed_since_start = now - self.start_timestamp
elapsed_since_last = now - self.last_timestamp
self.bytes_received += len(pdu)
instant_rx_speed = len(pdu) / elapsed_since_last
average_rx_speed = self.bytes_received / elapsed_since_start
print(
color(
f'@@@ RX speed: instant={instant_rx_speed:.4f},'
f' average={average_rx_speed:.4f}',
'cyan',
)
)
self.last_timestamp = now
if self.expected_cid == self.packet_count:
print(color('@@@ Received last packet', 'green'))
self.done.set()
async def run(self):
"""Run a loopback throughput test"""
print(color('>>> Connecting to HCI...', 'green'))
async with await open_transport_or_link(self.transport) as (
hci_source,
hci_sink,
):
print(color('>>> Connected', 'green'))
host = Host(hci_source, hci_sink)
await host.reset()
# make sure data can fit in one l2cap pdu
l2cap_header_size = 4
max_packet_size = host.acl_packet_queue.max_packet_size - l2cap_header_size
if self.packet_size > max_packet_size:
print(
color(
f'!!! Packet size ({self.packet_size}) larger than max supported'
f' size ({max_packet_size})',
'red',
)
)
return
if not host.supports_command(
HCI_WRITE_LOOPBACK_MODE_COMMAND
) or not host.supports_command(HCI_READ_LOOPBACK_MODE_COMMAND):
print(color('!!! Loopback mode not supported', 'red'))
return
# set event callbacks
host.on('connection', self.on_connection)
host.on('l2cap_pdu', self.on_l2cap_pdu)
loopback_mode = LoopbackMode.LOCAL
print(color('### Setting loopback mode', 'blue'))
await host.send_command(
HCI_Write_Loopback_Mode_Command(loopback_mode=LoopbackMode.LOCAL),
check_result=True,
)
print(color('### Checking loopback mode', 'blue'))
response = await host.send_command(
HCI_Read_Loopback_Mode_Command(), check_result=True
)
if response.return_parameters.loopback_mode != loopback_mode:
print(color('!!! Loopback mode mismatch', 'red'))
return
await self.connection_event.wait()
print(color('### Connected', 'cyan'))
print(color('=== Start sending', 'magenta'))
start_time = time.time()
bytes_sent = 0
for cid in range(0, self.packet_count):
# using the cid as an incremental index
host.send_l2cap_pdu(
self.connection_handle, cid, bytes(self.packet_size)
)
print(
color(
f'>>> Sending packet {cid}: {self.packet_size} bytes', 'yellow'
)
)
bytes_sent += self.packet_size # don't count L2CAP or HCI header sizes
await asyncio.sleep(0) # yield to allow packet receive
await self.done.wait()
print(color('=== Done!', 'magenta'))
elapsed = time.time() - start_time
average_tx_speed = bytes_sent / elapsed
print(
color(
f'@@@ TX speed: average={average_tx_speed:.4f} ({bytes_sent} bytes'
f' in {elapsed:.2f} seconds)',
'green',
)
)
# -----------------------------------------------------------------------------
@click.command()
@click.option(
'--packet-size',
'-s',
metavar='SIZE',
type=click.IntRange(8, 4096),
default=500,
help='Packet size',
)
@click.option(
'--packet-count',
'-c',
metavar='COUNT',
type=int,
default=10,
help='Packet count',
)
@click.argument('transport')
def main(packet_size, packet_count, transport):
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
loopback = Loopback(packet_size, packet_count, transport)
asyncio.run(loopback.run())
# -----------------------------------------------------------------------------
if __name__ == '__main__':
main()
+32 -24
View File
@@ -49,14 +49,16 @@ class ServerBridge:
self.tcp_port = tcp_port self.tcp_port = tcp_port
async def start(self, device: Device) -> None: async def start(self, device: Device) -> None:
# Listen for incoming L2CAP CoC connections # Listen for incoming L2CAP channel connections
device.create_l2cap_server( device.create_l2cap_server(
spec=l2cap.LeCreditBasedChannelSpec( spec=l2cap.LeCreditBasedChannelSpec(
psm=self.psm, mtu=self.mtu, mps=self.mps, max_credits=self.max_credits psm=self.psm, mtu=self.mtu, mps=self.mps, max_credits=self.max_credits
), ),
handler=self.on_coc, handler=self.on_channel,
)
print(
color(f'### Listening for channel connection on PSM {self.psm}', 'yellow')
) )
print(color(f'### Listening for CoC connection on PSM {self.psm}', 'yellow'))
def on_ble_connection(connection): def on_ble_connection(connection):
def on_ble_disconnection(reason): def on_ble_disconnection(reason):
@@ -73,7 +75,7 @@ class ServerBridge:
await device.start_advertising(auto_restart=True) await device.start_advertising(auto_restart=True)
# Called when a new L2CAP connection is established # Called when a new L2CAP connection is established
def on_coc(self, l2cap_channel): def on_channel(self, l2cap_channel):
print(color('*** L2CAP channel:', 'cyan'), l2cap_channel) print(color('*** L2CAP channel:', 'cyan'), l2cap_channel)
class Pipe: class Pipe:
@@ -83,7 +85,7 @@ class ServerBridge:
self.l2cap_channel = l2cap_channel self.l2cap_channel = l2cap_channel
l2cap_channel.on('close', self.on_l2cap_close) l2cap_channel.on('close', self.on_l2cap_close)
l2cap_channel.sink = self.on_coc_sdu l2cap_channel.sink = self.on_channel_sdu
async def connect_to_tcp(self): async def connect_to_tcp(self):
# Connect to the TCP server # Connect to the TCP server
@@ -128,7 +130,7 @@ class ServerBridge:
if self.tcp_transport is not None: if self.tcp_transport is not None:
self.tcp_transport.close() self.tcp_transport.close()
def on_coc_sdu(self, sdu): def on_channel_sdu(self, sdu):
print(color(f'<<< [L2CAP SDU]: {len(sdu)} bytes', 'cyan')) print(color(f'<<< [L2CAP SDU]: {len(sdu)} bytes', 'cyan'))
if self.tcp_transport is None: if self.tcp_transport is None:
print(color('!!! TCP socket not open, dropping', 'red')) print(color('!!! TCP socket not open, dropping', 'red'))
@@ -183,7 +185,7 @@ class ClientBridge:
peer_name = writer.get_extra_info('peer_name') peer_name = writer.get_extra_info('peer_name')
print(color(f'<<< TCP connection from {peer_name}', 'magenta')) print(color(f'<<< TCP connection from {peer_name}', 'magenta'))
def on_coc_sdu(sdu): def on_channel_sdu(sdu):
print(color(f'<<< [L2CAP SDU]: {len(sdu)} bytes', 'cyan')) print(color(f'<<< [L2CAP SDU]: {len(sdu)} bytes', 'cyan'))
l2cap_to_tcp_pipe.write(sdu) l2cap_to_tcp_pipe.write(sdu)
@@ -209,7 +211,7 @@ class ClientBridge:
writer.close() writer.close()
return return
l2cap_channel.sink = on_coc_sdu l2cap_channel.sink = on_channel_sdu
l2cap_channel.on('close', on_l2cap_close) l2cap_channel.on('close', on_l2cap_close)
# Start a flow control pipe from L2CAP to TCP # Start a flow control pipe from L2CAP to TCP
@@ -274,23 +276,29 @@ async def run(device_config, hci_transport, bridge):
@click.pass_context @click.pass_context
@click.option('--device-config', help='Device configuration file', required=True) @click.option('--device-config', help='Device configuration file', required=True)
@click.option('--hci-transport', help='HCI transport', required=True) @click.option('--hci-transport', help='HCI transport', required=True)
@click.option('--psm', help='PSM for L2CAP CoC', type=int, default=1234) @click.option('--psm', help='PSM for L2CAP', type=int, default=1234)
@click.option( @click.option(
'--l2cap-coc-max-credits', '--l2cap-max-credits',
help='Maximum L2CAP CoC Credits', help='Maximum L2CAP Credits',
type=click.IntRange(1, 65535), type=click.IntRange(1, 65535),
default=128, default=128,
) )
@click.option( @click.option(
'--l2cap-coc-mtu', '--l2cap-mtu',
help='L2CAP CoC MTU', help='L2CAP MTU',
type=click.IntRange(23, 65535), type=click.IntRange(
default=1022, l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU,
l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MTU,
),
default=1024,
) )
@click.option( @click.option(
'--l2cap-coc-mps', '--l2cap-mps',
help='L2CAP CoC MPS', help='L2CAP MPS',
type=click.IntRange(23, 65533), type=click.IntRange(
l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS,
l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS,
),
default=1024, default=1024,
) )
def cli( def cli(
@@ -298,17 +306,17 @@ def cli(
device_config, device_config,
hci_transport, hci_transport,
psm, psm,
l2cap_coc_max_credits, l2cap_max_credits,
l2cap_coc_mtu, l2cap_mtu,
l2cap_coc_mps, l2cap_mps,
): ):
context.ensure_object(dict) context.ensure_object(dict)
context.obj['device_config'] = device_config context.obj['device_config'] = device_config
context.obj['hci_transport'] = hci_transport context.obj['hci_transport'] = hci_transport
context.obj['psm'] = psm context.obj['psm'] = psm
context.obj['max_credits'] = l2cap_coc_max_credits context.obj['max_credits'] = l2cap_max_credits
context.obj['mtu'] = l2cap_coc_mtu context.obj['mtu'] = l2cap_mtu
context.obj['mps'] = l2cap_coc_mps context.obj['mps'] = l2cap_mps
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
+32
View File
@@ -2026,6 +2026,17 @@ class OwnAddressType(enum.IntEnum):
return {'size': 1, 'mapper': lambda x: OwnAddressType(x).name} return {'size': 1, 'mapper': lambda x: OwnAddressType(x).name}
# -----------------------------------------------------------------------------
class LoopbackMode(enum.IntEnum):
DISABLED = 0
LOCAL = 1
REMOTE = 2
@classmethod
def type_spec(cls):
return {'size': 1, 'mapper': lambda x: LoopbackMode(x).name}
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class HCI_Packet: class HCI_Packet:
''' '''
@@ -3352,6 +3363,27 @@ class HCI_Read_Encryption_Key_Size_Command(HCI_Command):
''' '''
# -----------------------------------------------------------------------------
@HCI_Command.command(
return_parameters_fields=[
('status', STATUS_SPEC),
('loopback_mode', LoopbackMode.type_spec()),
],
)
class HCI_Read_Loopback_Mode_Command(HCI_Command):
'''
See Bluetooth spec @ 7.6.1 Read Loopback Mode Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command([('loopback_mode', 1)])
class HCI_Write_Loopback_Mode_Command(HCI_Command):
'''
See Bluetooth spec @ 7.6.2 Write Loopback Mode Command
'''
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_Command.command([('le_event_mask', 8)]) @HCI_Command.command([('le_event_mask', 8)])
class HCI_LE_Set_Event_Mask_Command(HCI_Command): class HCI_LE_Set_Event_Mask_Command(HCI_Command):
+10 -5
View File
@@ -149,10 +149,11 @@ L2CAP_INVALID_CID_IN_REQUEST_REASON = 0x0002
L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_CREDITS = 65535 L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_CREDITS = 65535
L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU = 23 L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU = 23
L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MTU = 65535
L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS = 23 L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS = 23
L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS = 65533 L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS = 65533
L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU = 2048 L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU = 2048
L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS = 2046 L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS = 2048
L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS = 256 L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS = 256
L2CAP_MAXIMUM_TRANSMISSION_UNIT_CONFIGURATION_OPTION_TYPE = 0x01 L2CAP_MAXIMUM_TRANSMISSION_UNIT_CONFIGURATION_OPTION_TYPE = 0x01
@@ -188,8 +189,11 @@ class LeCreditBasedChannelSpec:
or self.max_credits > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_CREDITS or self.max_credits > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_CREDITS
): ):
raise ValueError('max credits out of range') raise ValueError('max credits out of range')
if self.mtu < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU: if (
raise ValueError('MTU too small') self.mtu < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU
or self.mtu > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MTU
):
raise ValueError('MTU out of range')
if ( if (
self.mps < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS self.mps < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS
or self.mps > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS or self.mps > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS
@@ -1644,12 +1648,13 @@ class ChannelManager:
def send_pdu(self, connection, cid: int, pdu: Union[SupportsBytes, bytes]) -> None: def send_pdu(self, connection, cid: int, pdu: Union[SupportsBytes, bytes]) -> None:
pdu_str = pdu.hex() if isinstance(pdu, bytes) else str(pdu) pdu_str = pdu.hex() if isinstance(pdu, bytes) else str(pdu)
pdu_bytes = bytes(pdu)
logger.debug( logger.debug(
f'{color(">>> Sending L2CAP PDU", "blue")} ' f'{color(">>> Sending L2CAP PDU", "blue")} '
f'on connection [0x{connection.handle:04X}] (CID={cid}) ' f'on connection [0x{connection.handle:04X}] (CID={cid}) '
f'{connection.peer_address}: {pdu_str}' f'{connection.peer_address}: {len(pdu_bytes)} bytes, {pdu_str}'
) )
self.host.send_l2cap_pdu(connection.handle, cid, bytes(pdu)) self.host.send_l2cap_pdu(connection.handle, cid, pdu_bytes)
def on_pdu(self, connection: Connection, cid: int, pdu: bytes) -> None: def on_pdu(self, connection: Connection, cid: int, pdu: bytes) -> None:
if cid in (L2CAP_SIGNALING_CID, L2CAP_LE_SIGNALING_CID): if cid in (L2CAP_SIGNALING_CID, L2CAP_LE_SIGNALING_CID):
+60 -8
View File
@@ -19,7 +19,7 @@
from __future__ import annotations from __future__ import annotations
import enum import enum
import struct import struct
from typing import Optional from typing import Optional, Tuple
from bumble import core from bumble import core
from bumble import crypto from bumble import crypto
@@ -31,6 +31,9 @@ from bumble import gatt_client
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Constants # Constants
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
SET_IDENTITY_RESOLVING_KEY_LENGTH = 16
class SirkType(enum.IntEnum): class SirkType(enum.IntEnum):
'''Coordinated Set Identification Service - 5.1 Set Identity Resolving Key.''' '''Coordinated Set Identification Service - 5.1 Set Identity Resolving Key.'''
@@ -66,6 +69,10 @@ def k1(n: bytes, salt: bytes, p: bytes) -> bytes:
def sef(k: bytes, r: bytes) -> bytes: def sef(k: bytes, r: bytes) -> bytes:
''' '''
Coordinated Set Identification Service - 4.5 SIRK encryption function sef. Coordinated Set Identification Service - 4.5 SIRK encryption function sef.
SIRK decryption function sdf shares the same algorithm. The only difference is that argument r is:
* Plaintext in encryption
* Cipher in decryption
''' '''
return crypto.xor(k1(k, s1(b'SIRKenc'[::-1]), b'csis'[::-1]), r) return crypto.xor(k1(k, s1(b'SIRKenc'[::-1]), b'csis'[::-1]), r)
@@ -105,6 +112,11 @@ class CoordinatedSetIdentificationService(gatt.TemplateService):
set_member_lock: Optional[MemberLock] = None, set_member_lock: Optional[MemberLock] = None,
set_member_rank: Optional[int] = None, set_member_rank: Optional[int] = None,
) -> None: ) -> None:
if len(set_identity_resolving_key) != SET_IDENTITY_RESOLVING_KEY_LENGTH:
raise ValueError(
f'Invalid SIRK length {len(set_identity_resolving_key)}, expected {SET_IDENTITY_RESOLVING_KEY_LENGTH}'
)
characteristics = [] characteristics = []
self.set_identity_resolving_key = set_identity_resolving_key self.set_identity_resolving_key = set_identity_resolving_key
@@ -113,7 +125,7 @@ class CoordinatedSetIdentificationService(gatt.TemplateService):
uuid=gatt.GATT_SET_IDENTITY_RESOLVING_KEY_CHARACTERISTIC, uuid=gatt.GATT_SET_IDENTITY_RESOLVING_KEY_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY, | gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE, permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=gatt.CharacteristicValue(read=self.on_sirk_read), value=gatt.CharacteristicValue(read=self.on_sirk_read),
) )
characteristics.append(self.set_identity_resolving_key_characteristic) characteristics.append(self.set_identity_resolving_key_characteristic)
@@ -123,7 +135,7 @@ class CoordinatedSetIdentificationService(gatt.TemplateService):
uuid=gatt.GATT_COORDINATED_SET_SIZE_CHARACTERISTIC, uuid=gatt.GATT_COORDINATED_SET_SIZE_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY, | gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE, permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=struct.pack('B', coordinated_set_size), value=struct.pack('B', coordinated_set_size),
) )
characteristics.append(self.coordinated_set_size_characteristic) characteristics.append(self.coordinated_set_size_characteristic)
@@ -134,7 +146,7 @@ class CoordinatedSetIdentificationService(gatt.TemplateService):
properties=gatt.Characteristic.Properties.READ properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY | gatt.Characteristic.Properties.NOTIFY
| gatt.Characteristic.Properties.WRITE, | gatt.Characteristic.Properties.WRITE,
permissions=gatt.Characteristic.Permissions.READABLE permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION
| gatt.Characteristic.Permissions.WRITEABLE, | gatt.Characteristic.Permissions.WRITEABLE,
value=struct.pack('B', set_member_lock), value=struct.pack('B', set_member_lock),
) )
@@ -145,18 +157,32 @@ class CoordinatedSetIdentificationService(gatt.TemplateService):
uuid=gatt.GATT_SET_MEMBER_RANK_CHARACTERISTIC, uuid=gatt.GATT_SET_MEMBER_RANK_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY, | gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE, permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=struct.pack('B', set_member_rank), value=struct.pack('B', set_member_rank),
) )
characteristics.append(self.set_member_rank_characteristic) characteristics.append(self.set_member_rank_characteristic)
super().__init__(characteristics) super().__init__(characteristics)
def on_sirk_read(self, _connection: Optional[device.Connection]) -> bytes: async def on_sirk_read(self, connection: Optional[device.Connection]) -> bytes:
if self.set_identity_resolving_key_type == SirkType.PLAINTEXT: if self.set_identity_resolving_key_type == SirkType.PLAINTEXT:
return bytes([SirkType.PLAINTEXT]) + self.set_identity_resolving_key sirk_bytes = self.set_identity_resolving_key
else: else:
raise NotImplementedError('TODO: Pending async Characteristic read.') assert connection
if connection.transport == core.BT_LE_TRANSPORT:
key = await connection.device.get_long_term_key(
connection_handle=connection.handle, rand=b'', ediv=0
)
else:
key = await connection.device.get_link_key(connection.peer_address)
if not key:
raise RuntimeError('LTK or LinkKey is not present')
sirk_bytes = sef(key, self.set_identity_resolving_key)
return bytes([self.set_identity_resolving_key_type]) + sirk_bytes
def get_advertising_data(self) -> bytes: def get_advertising_data(self) -> bytes:
return bytes( return bytes(
@@ -203,3 +229,29 @@ class CoordinatedSetIdentificationProxy(gatt_client.ProfileServiceProxy):
gatt.GATT_SET_MEMBER_RANK_CHARACTERISTIC gatt.GATT_SET_MEMBER_RANK_CHARACTERISTIC
): ):
self.set_member_rank = characteristics[0] self.set_member_rank = characteristics[0]
async def read_set_identity_resolving_key(self) -> Tuple[SirkType, bytes]:
'''Reads SIRK and decrypts if encrypted.'''
response = await self.set_identity_resolving_key.read_value()
if len(response) != SET_IDENTITY_RESOLVING_KEY_LENGTH + 1:
raise RuntimeError('Invalid SIRK value')
sirk_type = SirkType(response[0])
if sirk_type == SirkType.PLAINTEXT:
sirk = response[1:]
else:
connection = self.service_proxy.client.connection
device = connection.device
if connection.transport == core.BT_LE_TRANSPORT:
key = await device.get_long_term_key(
connection_handle=connection.handle, rand=b'', ediv=0
)
else:
key = await device.get_link_key(connection.peer_address)
if not key:
raise RuntimeError('LTK or LinkKey is not present')
sirk = sef(key, response[1:])
return (sirk_type, sirk)
+7 -3
View File
@@ -454,6 +454,8 @@ class DLC(EventEmitter):
self.c_r = 1 if self.role == Multiplexer.Role.INITIATOR else 0 self.c_r = 1 if self.role == Multiplexer.Role.INITIATOR else 0
self.sink = None self.sink = None
self.connection_result = None self.connection_result = None
self.drained = asyncio.Event()
self.drained.set()
# Compute the MTU # Compute the MTU
max_overhead = 4 + 1 # header with 2-byte length + fcs max_overhead = 4 + 1 # header with 2-byte length + fcs
@@ -633,6 +635,8 @@ class DLC(EventEmitter):
) )
rx_credits_needed = 0 rx_credits_needed = 0
if not self.tx_buffer:
self.drained.set()
# Stream protocol # Stream protocol
def write(self, data: Union[bytes, str]) -> None: def write(self, data: Union[bytes, str]) -> None:
@@ -645,11 +649,11 @@ class DLC(EventEmitter):
raise ValueError('write only accept bytes or strings') raise ValueError('write only accept bytes or strings')
self.tx_buffer += data self.tx_buffer += data
self.drained.clear()
self.process_tx() self.process_tx()
def drain(self) -> None: async def drain(self) -> None:
# TODO await self.drained.wait()
pass
def __str__(self) -> str: def __str__(self) -> str:
return f'DLC(dlci={self.dlci},state={self.state.name})' return f'DLC(dlci={self.dlci},state={self.state.name})'
+30 -9
View File
@@ -7,16 +7,36 @@ throughput and/or latency between two devices.
# General Usage # General Usage
``` ```
Usage: bench.py [OPTIONS] COMMAND [ARGS]... Usage: bumble-bench [OPTIONS] COMMAND [ARGS]...
Options: Options:
--device-config FILENAME Device configuration file --device-config FILENAME Device configuration file
--role [sender|receiver|ping|pong] --role [sender|receiver|ping|pong]
--mode [gatt-client|gatt-server|l2cap-client|l2cap-server|rfcomm-client|rfcomm-server] --mode [gatt-client|gatt-server|l2cap-client|l2cap-server|rfcomm-client|rfcomm-server]
--att-mtu MTU GATT MTU (gatt-client mode) [23<=x<=517] --att-mtu MTU GATT MTU (gatt-client mode) [23<=x<=517]
-s, --packet-size SIZE Packet size (server role) [8<=x<=4096] --extended-data-length TEXT Request a data length upon connection,
-c, --packet-count COUNT Packet count (server role) specified as tx_octets/tx_time
-sd, --start-delay SECONDS Start delay (server role) --rfcomm-channel INTEGER RFComm channel to use
--rfcomm-uuid TEXT RFComm service UUID to use (ignored if
--rfcomm-channel is not 0)
--l2cap-psm INTEGER L2CAP PSM to use
--l2cap-mtu INTEGER L2CAP MTU to use
--l2cap-mps INTEGER L2CAP MPS to use
--l2cap-max-credits INTEGER L2CAP maximum number of credits allowed for
the peer
-s, --packet-size SIZE Packet size (client or ping role)
[8<=x<=4096]
-c, --packet-count COUNT Packet count (client or ping role)
-sd, --start-delay SECONDS Start delay (client or ping role)
--repeat N Repeat the run N times (client and ping
roles)(0, which is the fault, to run just
once)
--repeat-delay SECONDS Delay, in seconds, between repeats
--pace MILLISECONDS Wait N milliseconds between packets (0,
which is the fault, to send as fast as
possible)
--linger Don't exit at the end of a run (server and
pong roles)
--help Show this message and exit. --help Show this message and exit.
Commands: Commands:
@@ -35,17 +55,18 @@ Options:
--connection-interval, --ci CONNECTION_INTERVAL --connection-interval, --ci CONNECTION_INTERVAL
Connection interval (in ms) Connection interval (in ms)
--phy [1m|2m|coded] PHY to use --phy [1m|2m|coded] PHY to use
--authenticate Authenticate (RFComm only)
--encrypt Encrypt the connection (RFComm only)
--help Show this message and exit. --help Show this message and exit.
``` ```
To test once device against another, one of the two devices must be running
To test once device against another, one of the two devices must be running
the ``peripheral`` command and the other the ``central`` command. The device the ``peripheral`` command and the other the ``central`` command. The device
running the ``peripheral`` command will accept connections from the device running the ``peripheral`` command will accept connections from the device
running the ``central`` command. running the ``central`` command.
When using Bluetooth LE (all modes except for ``rfcomm-server`` and ``rfcomm-client``utils), When using Bluetooth LE (all modes except for ``rfcomm-server`` and ``rfcomm-client``utils),
the default addresses configured in the tool should be sufficient. But when using the default addresses configured in the tool should be sufficient. But when using
Bluetooth Classic, the address of the Peripheral must be specified on the Central Bluetooth Classic, the address of the Peripheral must be specified on the Central
using the ``--peripheral`` option. The address will be printed by the Peripheral when using the ``--peripheral`` option. The address will be printed by the Peripheral when
it starts. it starts.
@@ -83,7 +104,7 @@ the other on `usb:1`, and two consoles/terminals. We will run a command in each.
$ bumble-bench central usb:1 $ bumble-bench central usb:1
``` ```
In this default configuration, the Central runs a Sender, as a GATT client, In this default configuration, the Central runs a Sender, as a GATT client,
connecting to the Peripheral running a Receiver, as a GATT server. connecting to the Peripheral running a Receiver, as a GATT server.
!!! example "L2CAP Throughput" !!! example "L2CAP Throughput"
@@ -74,11 +74,13 @@ class L2capClient(
gatt: BluetoothGatt?, status: Int, newState: Int gatt: BluetoothGatt?, status: Int, newState: Int
) { ) {
if (gatt != null && newState == BluetoothProfile.STATE_CONNECTED) { if (gatt != null && newState == BluetoothProfile.STATE_CONNECTED) {
gatt.setPreferredPhy( if (viewModel.use2mPhy) {
BluetoothDevice.PHY_LE_2M_MASK, gatt.setPreferredPhy(
BluetoothDevice.PHY_LE_2M_MASK, BluetoothDevice.PHY_LE_2M_MASK,
BluetoothDevice.PHY_OPTION_NO_PREFERRED BluetoothDevice.PHY_LE_2M_MASK,
) BluetoothDevice.PHY_OPTION_NO_PREFERRED
)
}
gatt.readPhy() gatt.readPhy()
// Request an MTU update, even though we don't use GATT, because Android // Request an MTU update, even though we don't use GATT, because Android
@@ -27,11 +27,12 @@ val DEFAULT_RFCOMM_UUID: UUID = UUID.fromString("E6D55659-C8B4-4B85-96BB-B1143AF
const val DEFAULT_PEER_BLUETOOTH_ADDRESS = "AA:BB:CC:DD:EE:FF" const val DEFAULT_PEER_BLUETOOTH_ADDRESS = "AA:BB:CC:DD:EE:FF"
const val DEFAULT_SENDER_PACKET_COUNT = 100 const val DEFAULT_SENDER_PACKET_COUNT = 100
const val DEFAULT_SENDER_PACKET_SIZE = 1024 const val DEFAULT_SENDER_PACKET_SIZE = 1024
const val DEFAULT_PSM = 128
class AppViewModel : ViewModel() { class AppViewModel : ViewModel() {
private var preferences: SharedPreferences? = null private var preferences: SharedPreferences? = null
var peerBluetoothAddress by mutableStateOf(DEFAULT_PEER_BLUETOOTH_ADDRESS) var peerBluetoothAddress by mutableStateOf(DEFAULT_PEER_BLUETOOTH_ADDRESS)
var l2capPsm by mutableIntStateOf(0) var l2capPsm by mutableIntStateOf(DEFAULT_PSM)
var use2mPhy by mutableStateOf(true) var use2mPhy by mutableStateOf(true)
var mtu by mutableIntStateOf(0) var mtu by mutableIntStateOf(0)
var rxPhy by mutableIntStateOf(0) var rxPhy by mutableIntStateOf(0)
+15 -6
View File
@@ -20,6 +20,7 @@ import os
import pytest import pytest
import struct import struct
import logging import logging
from unittest import mock
from bumble import device from bumble import device
from bumble.profiles import csip from bumble.profiles import csip
@@ -68,14 +69,18 @@ def test_sef():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_csis(): @pytest.mark.parametrize(
'sirk_type,', [(csip.SirkType.ENCRYPTED), (csip.SirkType.PLAINTEXT)]
)
async def test_csis(sirk_type):
SIRK = bytes.fromhex('2f62c8ae41867d1bb619e788a2605faa') SIRK = bytes.fromhex('2f62c8ae41867d1bb619e788a2605faa')
LTK = bytes.fromhex('2f62c8ae41867d1bb619e788a2605faa')
devices = TwoDevices() devices = TwoDevices()
devices[0].add_service( devices[0].add_service(
csip.CoordinatedSetIdentificationService( csip.CoordinatedSetIdentificationService(
set_identity_resolving_key=SIRK, set_identity_resolving_key=SIRK,
set_identity_resolving_key_type=csip.SirkType.PLAINTEXT, set_identity_resolving_key_type=sirk_type,
coordinated_set_size=2, coordinated_set_size=2,
set_member_lock=csip.MemberLock.UNLOCKED, set_member_lock=csip.MemberLock.UNLOCKED,
set_member_rank=0, set_member_rank=0,
@@ -83,15 +88,19 @@ async def test_csis():
) )
await devices.setup_connection() await devices.setup_connection()
# Mock encryption.
devices.connections[0].encryption = 1
devices.connections[1].encryption = 1
devices[0].get_long_term_key = mock.AsyncMock(return_value=LTK)
devices[1].get_long_term_key = mock.AsyncMock(return_value=LTK)
peer = device.Peer(devices.connections[1]) peer = device.Peer(devices.connections[1])
csis_client = await peer.discover_service_and_create_proxy( csis_client = await peer.discover_service_and_create_proxy(
csip.CoordinatedSetIdentificationProxy csip.CoordinatedSetIdentificationProxy
) )
assert ( assert await csis_client.read_set_identity_resolving_key() == (sirk_type, SIRK)
await csis_client.set_identity_resolving_key.read_value()
== bytes([csip.SirkType.PLAINTEXT]) + SIRK
)
assert await csis_client.coordinated_set_size.read_value() == struct.pack('B', 2) assert await csis_client.coordinated_set_size.read_value() == struct.pack('B', 2)
assert await csis_client.set_member_lock.read_value() == struct.pack( assert await csis_client.set_member_lock.read_value() == struct.pack(
'B', csip.MemberLock.UNLOCKED 'B', csip.MemberLock.UNLOCKED