Compare commits

...

26 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod 54f2981267 fix uninitialized variable 2024-01-16 16:49:06 -08:00
Charlie Boutier bb025514e7 PandoraHost: compute advertising_interval_max with interval_range 2024-01-12 14:36:22 -08:00
Charlie Boutier e228597269 Pandora host: support advertising interval in advertise 2024-01-12 14:36:22 -08:00
Gilles Boccon-Gibod 95b0d6c6f2 Merge pull request #398 from google/gbg/rfcomm-no-sink
update credits even without a sink
2024-01-11 13:18:15 -08:00
zxzxwu 46ceea7ecd Merge pull request #391 from zxzxwu/remote_feature
LE read remote features
2024-01-10 15:51:32 +08:00
Gilles Boccon-Gibod 30f89d5739 simplify 2024-01-09 18:01:34 -08:00
Gilles Boccon-Gibod 481cf40831 update credits even without a sink 2024-01-09 17:58:52 -08:00
Josh Wu eff05afb7a LE read remote features 2024-01-09 11:30:08 +08:00
zxzxwu d8e6700611 Merge pull request #383 from zxzxwu/controller
Controller: SCO implementation
2024-01-09 09:39:13 +08:00
Gilles Boccon-Gibod 56eb5a933b Merge pull request #394 from google/gbg/hci-latency
add support for HCI latency probing
2024-01-08 09:21:00 -08:00
Gilles Boccon-Gibod caacc0c133 Merge pull request #395 from google/gbg/loopback-quick-fix
compatibility with recent host ACL property changes
2024-01-08 09:20:45 -08:00
Gilles Boccon-Gibod 5f377c024b format 2024-01-05 12:26:54 -08:00
Gilles Boccon-Gibod 00cd8fbdd0 compatibility with recent host ACL property changes 2024-01-05 12:17:09 -08:00
Gilles Boccon-Gibod aeeff18428 add support for HCI latency probing 2024-01-05 10:26:04 -08:00
Michael Mogenson c48e3f5e9c Merge pull request #393 from mogenson/controller-loopback
apps: Add a controller loopback throughput test app
2024-01-05 13:13:30 -05:00
Michael Mogenson d6bbc1145a apps: Add a controller loopback throughput test app
Add a command line utility to open a transport to a BT controller, put
the controller into local loopback mode, and send and receive ACL data
packets. Record the time it takes to send and receive all packets and
calculate a throughput measurement in kB/s.

This utility is usefull for characterizing the speed of a transport to a
BT controller (such as a TCP socket or serial port) without having to
deal with a connected peer or the variability of over the air
transmissions.

The transport CLI argument is required. The packet size and packet
count arguments are optional. They default to the same values as the
bumble-bench app.
2024-01-05 10:01:24 -05:00
zxzxwu e2fec67bd9 Merge pull request #390 from zxzxwu/csip
CSIP: Encrypted SIRK implementation
2024-01-04 13:28:23 +08:00
Josh Wu 88cb3b2a4d IWYU in CSIP 2024-01-04 13:22:09 +08:00
zxzxwu 9ebb03be46 Merge pull request #389 from zxzxwu/gitignore
.gitignore: Add venv directories
2024-01-04 12:54:30 +08:00
Gilles Boccon-Gibod 80d84af76c Merge pull request #392 from google/gbg/l2cap-drain
l2cap & rfcomm drain support
2024-01-03 09:59:36 -08:00
Gilles Boccon-Gibod 8f4721758f fix typo 2024-01-03 09:53:17 -08:00
Gilles Boccon-Gibod 8864af4acd format 2024-01-02 11:35:11 -08:00
Gilles Boccon-Gibod 8980fb8cc7 add drain support and a few tool options 2024-01-02 11:07:52 -08:00
Josh Wu 2c5f3472a9 CSIP: Encrypted SIRK implementation 2023-12-30 16:06:42 +08:00
Josh Wu f18277ac78 Ignore venv directories 2023-12-30 14:23:35 +08:00
Josh Wu 8d46bc04d2 Controller: SCO implementation 2023-12-30 14:22:58 +08:00
27 changed files with 1185 additions and 247 deletions
+2
View File
@@ -10,3 +10,5 @@ __pycache__
bumble/_version.py
.vscode/launch.json
/.idea
venv/
.venv/
+2
View File
@@ -22,6 +22,7 @@
"cmac",
"CONNECTIONLESS",
"csip",
"csis",
"csrcs",
"CVSD",
"datagram",
@@ -32,6 +33,7 @@
"dhkey",
"diversifier",
"endianness",
"ESCO",
"Fitbit",
"GATTLINK",
"HANDSFREE",
+301 -66
View File
@@ -80,10 +80,10 @@ SPEED_TX_UUID = 'E789C754-41A1-45F4-A948-A0A1A90DBA53'
SPEED_RX_UUID = '016A2CC7-E14B-4819-935F-1F56EAE4098D'
DEFAULT_RFCOMM_UUID = 'E6D55659-C8B4-4B85-96BB-B1143AF6D3AE'
DEFAULT_L2CAP_PSM = 1234
DEFAULT_L2CAP_PSM = 128
DEFAULT_L2CAP_MAX_CREDITS = 128
DEFAULT_L2CAP_MTU = 1024
DEFAULT_L2CAP_MPS = 1022
DEFAULT_L2CAP_MPS = 1024
DEFAULT_LINGER_TIME = 1.0
DEFAULT_POST_CONNECTION_WAIT_TIME = 1.0
@@ -240,6 +240,23 @@ async def find_rfcomm_channel_with_uuid(connection: Connection, uuid: str) -> in
return 0
def log_stats(title, stats):
stats_min = min(stats)
stats_max = max(stats)
stats_avg = sum(stats) / len(stats)
logging.info(
color(
(
f'### {title} stats: '
f'min={stats_min:.2f}, '
f'max={stats_max:.2f}, '
f'average={stats_avg:.2f}'
),
'cyan',
)
)
class PacketType(enum.IntEnum):
RESET = 0
SEQUENCE = 1
@@ -253,14 +270,27 @@ PACKET_FLAG_LAST = 1
# Sender
# -----------------------------------------------------------------------------
class Sender:
def __init__(self, packet_io, start_delay, packet_size, packet_count):
def __init__(
self,
packet_io,
start_delay,
repeat,
repeat_delay,
pace,
packet_size,
packet_count,
):
self.tx_start_delay = start_delay
self.tx_packet_size = packet_size
self.tx_packet_count = packet_count
self.packet_io = packet_io
self.packet_io.packet_listener = self
self.repeat = repeat
self.repeat_delay = repeat_delay
self.pace = pace
self.start_time = 0
self.bytes_sent = 0
self.stats = []
self.done = asyncio.Event()
def reset(self):
@@ -271,27 +301,57 @@ class Sender:
await self.packet_io.ready.wait()
logging.info(color('--- Go!', 'blue'))
if self.tx_start_delay:
logging.info(color(f'*** Startup delay: {self.tx_start_delay}', 'blue'))
await asyncio.sleep(self.tx_start_delay)
for run in range(self.repeat + 1):
self.done.clear()
logging.info(color('=== Sending RESET', 'magenta'))
await self.packet_io.send_packet(bytes([PacketType.RESET]))
self.start_time = time.time()
for tx_i in range(self.tx_packet_count):
packet_flags = PACKET_FLAG_LAST if tx_i == self.tx_packet_count - 1 else 0
packet = struct.pack(
'>bbI',
PacketType.SEQUENCE,
packet_flags,
tx_i,
) + bytes(self.tx_packet_size - 6)
logging.info(color(f'Sending packet {tx_i}: {len(packet)} bytes', 'yellow'))
self.bytes_sent += len(packet)
await self.packet_io.send_packet(packet)
if run > 0 and self.repeat and self.repeat_delay:
logging.info(color(f'*** Repeat delay: {self.repeat_delay}', 'green'))
await asyncio.sleep(self.repeat_delay)
await self.done.wait()
logging.info(color('=== Done!', 'magenta'))
if self.tx_start_delay:
logging.info(color(f'*** Startup delay: {self.tx_start_delay}', 'blue'))
await asyncio.sleep(self.tx_start_delay)
logging.info(color('=== Sending RESET', 'magenta'))
await self.packet_io.send_packet(bytes([PacketType.RESET]))
self.start_time = time.time()
self.bytes_sent = 0
for tx_i in range(self.tx_packet_count):
packet_flags = (
PACKET_FLAG_LAST if tx_i == self.tx_packet_count - 1 else 0
)
packet = struct.pack(
'>bbI',
PacketType.SEQUENCE,
packet_flags,
tx_i,
) + bytes(self.tx_packet_size - 6 - self.packet_io.overhead_size)
logging.info(
color(
f'Sending packet {tx_i}: {self.tx_packet_size} bytes', 'yellow'
)
)
self.bytes_sent += len(packet)
await self.packet_io.send_packet(packet)
if self.pace is None:
continue
if self.pace > 0:
await asyncio.sleep(self.pace / 1000)
else:
await self.packet_io.drain()
await self.done.wait()
run_counter = f'[{run + 1} of {self.repeat + 1}]' if self.repeat else ''
logging.info(color(f'=== {run_counter} Done!', 'magenta'))
if self.repeat:
log_stats('Run', self.stats)
if self.repeat:
logging.info(color('--- End of runs', 'blue'))
def on_packet_received(self, packet):
try:
@@ -302,6 +362,7 @@ class Sender:
if packet_type == PacketType.ACK:
elapsed = time.time() - self.start_time
average_tx_speed = self.bytes_sent / elapsed
self.stats.append(average_tx_speed)
logging.info(
color(
f'@@@ Received ACK. Speed: average={average_tx_speed:.4f}'
@@ -320,17 +381,17 @@ class Receiver:
start_timestamp: float
last_timestamp: float
def __init__(self, packet_io):
def __init__(self, packet_io, linger):
self.reset()
self.packet_io = packet_io
self.packet_io.packet_listener = self
self.linger = linger
self.done = asyncio.Event()
def reset(self):
self.expected_packet_index = 0
self.start_timestamp = 0.0
self.last_timestamp = 0.0
self.bytes_received = 0
self.measurements = [(time.time(), 0)]
self.total_bytes_received = 0
def on_packet_received(self, packet):
try:
@@ -338,12 +399,9 @@ class Receiver:
except ValueError:
return
now = time.time()
if packet_type == PacketType.RESET:
logging.info(color('=== Received RESET', 'magenta'))
self.reset()
self.start_timestamp = now
return
try:
@@ -352,7 +410,8 @@ class Receiver:
return
logging.info(
f'<<< Received packet {packet_index}: '
f'flags=0x{packet_flags:02X}, {len(packet)} bytes'
f'flags=0x{packet_flags:02X}, '
f'{len(packet) + self.packet_io.overhead_size} bytes'
)
if packet_index != self.expected_packet_index:
@@ -363,19 +422,27 @@ class Receiver:
)
)
elapsed_since_start = now - self.start_timestamp
elapsed_since_last = now - self.last_timestamp
self.bytes_received += len(packet)
now = time.time()
elapsed_since_start = now - self.measurements[0][0]
elapsed_since_last = now - self.measurements[-1][0]
self.measurements.append((now, len(packet)))
self.total_bytes_received += len(packet)
instant_rx_speed = len(packet) / elapsed_since_last
average_rx_speed = self.bytes_received / elapsed_since_start
average_rx_speed = self.total_bytes_received / elapsed_since_start
window = self.measurements[-64:]
windowed_rx_speed = sum(measurement[1] for measurement in window[1:]) / (
window[-1][0] - window[0][0]
)
logging.info(
color(
f'Speed: instant={instant_rx_speed:.4f}, average={average_rx_speed:.4f}',
'Speed: '
f'instant={instant_rx_speed:.4f}, '
f'windowed={windowed_rx_speed:.4f}, '
f'average={average_rx_speed:.4f}',
'yellow',
)
)
self.last_timestamp = now
self.expected_packet_index = packet_index + 1
if packet_flags & PACKET_FLAG_LAST:
@@ -385,7 +452,8 @@ class Receiver:
)
)
logging.info(color('@@@ Received last packet', 'green'))
self.done.set()
if not self.linger:
self.done.set()
async def run(self):
await self.done.wait()
@@ -396,16 +464,31 @@ class Receiver:
# Ping
# -----------------------------------------------------------------------------
class Ping:
def __init__(self, packet_io, start_delay, packet_size, packet_count):
def __init__(
self,
packet_io,
start_delay,
repeat,
repeat_delay,
pace,
packet_size,
packet_count,
):
self.tx_start_delay = start_delay
self.tx_packet_size = packet_size
self.tx_packet_count = packet_count
self.packet_io = packet_io
self.packet_io.packet_listener = self
self.repeat = repeat
self.repeat_delay = repeat_delay
self.pace = pace
self.done = asyncio.Event()
self.current_packet_index = 0
self.ping_sent_time = 0.0
self.latencies = []
self.min_stats = []
self.max_stats = []
self.avg_stats = []
def reset(self):
pass
@@ -415,21 +498,57 @@ class Ping:
await self.packet_io.ready.wait()
logging.info(color('--- Go!', 'blue'))
if self.tx_start_delay:
logging.info(color(f'*** Startup delay: {self.tx_start_delay}', 'blue'))
await asyncio.sleep(self.tx_start_delay)
for run in range(self.repeat + 1):
self.done.clear()
logging.info(color('=== Sending RESET', 'magenta'))
await self.packet_io.send_packet(bytes([PacketType.RESET]))
if run > 0 and self.repeat and self.repeat_delay:
logging.info(color(f'*** Repeat delay: {self.repeat_delay}', 'green'))
await asyncio.sleep(self.repeat_delay)
await self.send_next_ping()
if self.tx_start_delay:
logging.info(color(f'*** Startup delay: {self.tx_start_delay}', 'blue'))
await asyncio.sleep(self.tx_start_delay)
await self.done.wait()
average_latency = sum(self.latencies) / len(self.latencies)
logging.info(color(f'@@@ Average latency: {average_latency:.2f}'))
logging.info(color('=== Done!', 'magenta'))
logging.info(color('=== Sending RESET', 'magenta'))
await self.packet_io.send_packet(bytes([PacketType.RESET]))
self.current_packet_index = 0
self.latencies = []
await self.send_next_ping()
await self.done.wait()
min_latency = min(self.latencies)
max_latency = max(self.latencies)
avg_latency = sum(self.latencies) / len(self.latencies)
logging.info(
color(
'@@@ Latencies: '
f'min={min_latency:.2f}, '
f'max={max_latency:.2f}, '
f'average={avg_latency:.2f}'
)
)
self.min_stats.append(min_latency)
self.max_stats.append(max_latency)
self.avg_stats.append(avg_latency)
run_counter = f'[{run + 1} of {self.repeat + 1}]' if self.repeat else ''
logging.info(color(f'=== {run_counter} Done!', 'magenta'))
if self.repeat:
log_stats('Min Latency', self.min_stats)
log_stats('Max Latency', self.max_stats)
log_stats('Average Latency', self.avg_stats)
if self.repeat:
logging.info(color('--- End of runs', 'blue'))
async def send_next_ping(self):
if self.pace:
await asyncio.sleep(self.pace / 1000)
packet = struct.pack(
'>bbI',
PacketType.SEQUENCE,
@@ -488,10 +607,11 @@ class Ping:
class Pong:
expected_packet_index: int
def __init__(self, packet_io):
def __init__(self, packet_io, linger):
self.reset()
self.packet_io = packet_io
self.packet_io.packet_listener = self
self.linger = linger
self.done = asyncio.Event()
def reset(self):
@@ -536,7 +656,7 @@ class Pong:
)
)
if packet_flags & PACKET_FLAG_LAST:
if packet_flags & PACKET_FLAG_LAST and not self.linger:
self.done.set()
async def run(self):
@@ -554,6 +674,7 @@ class GattClient:
self.speed_tx = None
self.packet_listener = None
self.ready = asyncio.Event()
self.overhead_size = 0
async def on_connection(self, connection):
peer = Peer(connection)
@@ -603,6 +724,9 @@ class GattClient:
async def send_packet(self, packet):
await self.speed_tx.write_value(packet)
async def drain(self):
pass
# -----------------------------------------------------------------------------
# GattServer
@@ -612,6 +736,7 @@ class GattServer:
self.device = device
self.packet_listener = None
self.ready = asyncio.Event()
self.overhead_size = 0
# Setup the GATT service
self.speed_tx = Characteristic(
@@ -653,6 +778,9 @@ class GattServer:
async def send_packet(self, packet):
await self.device.notify_subscribers(self.speed_rx, packet)
async def drain(self):
pass
# -----------------------------------------------------------------------------
# StreamedPacketIO
@@ -664,6 +792,7 @@ class StreamedPacketIO:
self.rx_packet = b''
self.rx_packet_header = b''
self.rx_packet_need = 0
self.overhead_size = 2
def on_packet(self, packet):
while packet:
@@ -715,6 +844,7 @@ class L2capClient(StreamedPacketIO):
self.max_credits = max_credits
self.mtu = mtu
self.mps = mps
self.l2cap_channel = None
self.ready = asyncio.Event()
async def on_connection(self, connection: Connection) -> None:
@@ -736,9 +866,10 @@ class L2capClient(StreamedPacketIO):
logging.info(color(f'!!! Connection failed: {error}', 'red'))
return
l2cap_channel.sink = self.on_packet
l2cap_channel.on('close', self.on_l2cap_close)
self.io_sink = l2cap_channel.write
self.l2cap_channel = l2cap_channel
l2cap_channel.on('close', self.on_l2cap_close)
l2cap_channel.sink = self.on_packet
self.ready.set()
@@ -748,6 +879,10 @@ class L2capClient(StreamedPacketIO):
def on_l2cap_close(self):
logging.info(color('*** L2CAP channel closed', 'red'))
async def drain(self):
assert self.l2cap_channel
await self.l2cap_channel.drain()
# -----------------------------------------------------------------------------
# L2capServer
@@ -786,6 +921,7 @@ class L2capServer(StreamedPacketIO):
logging.info(color(f'*** L2CAP channel: {l2cap_channel}', 'cyan'))
self.io_sink = l2cap_channel.write
self.l2cap_channel = l2cap_channel
l2cap_channel.on('close', self.on_l2cap_close)
l2cap_channel.sink = self.on_packet
@@ -795,6 +931,10 @@ class L2capServer(StreamedPacketIO):
logging.info(color('*** L2CAP channel closed', 'red'))
self.l2cap_channel = None
async def drain(self):
assert self.l2cap_channel
await self.l2cap_channel.drain()
# -----------------------------------------------------------------------------
# RfcommClient
@@ -805,6 +945,7 @@ class RfcommClient(StreamedPacketIO):
self.device = device
self.channel = channel
self.uuid = uuid
self.rfcomm_session = None
self.ready = asyncio.Event()
async def on_connection(self, connection):
@@ -840,12 +981,17 @@ class RfcommClient(StreamedPacketIO):
rfcomm_session.sink = self.on_packet
self.io_sink = rfcomm_session.write
self.rfcomm_session = rfcomm_session
self.ready.set()
def on_disconnection(self, _):
pass
async def drain(self):
assert self.rfcomm_session
await self.rfcomm_session.drain()
# -----------------------------------------------------------------------------
# RfcommServer
@@ -853,6 +999,7 @@ class RfcommClient(StreamedPacketIO):
class RfcommServer(StreamedPacketIO):
def __init__(self, device, channel):
super().__init__()
self.dlc = None
self.ready = asyncio.Event()
# Create and register a server
@@ -881,6 +1028,11 @@ class RfcommServer(StreamedPacketIO):
logging.info(color(f'*** DLC connected: {dlc}', 'blue'))
dlc.sink = self.on_packet
self.io_sink = dlc.write
self.dlc = dlc
async def drain(self):
assert self.dlc
await self.dlc.drain()
# -----------------------------------------------------------------------------
@@ -1030,6 +1182,7 @@ class Central(Connection.Listener):
await role.run()
await asyncio.sleep(DEFAULT_LINGER_TIME)
await self.connection.disconnect()
def on_disconnection(self, reason):
logging.info(color(f'!!! Disconnection: reason={reason}', 'red'))
@@ -1120,12 +1273,8 @@ class Peripheral(Device.Listener, Connection.Listener):
# Stop being discoverable and connectable
if self.classic:
async def stop_being_discoverable_connectable():
await self.device.set_discoverable(False)
await self.device.set_connectable(False)
AsyncRunner.spawn(stop_being_discoverable_connectable())
AsyncRunner.spawn(self.device.set_discoverable(False))
AsyncRunner.spawn(self.device.set_connectable(False))
# Request a new data length if needed
if self.extended_data_length:
@@ -1141,6 +1290,10 @@ class Peripheral(Device.Listener, Connection.Listener):
self.connection = None
self.role.reset()
if self.classic:
AsyncRunner.spawn(self.device.set_discoverable(True))
AsyncRunner.spawn(self.device.set_connectable(True))
def on_connection_parameters_update(self):
print_connection(self.connection)
@@ -1168,10 +1321,22 @@ def create_mode_factory(ctx, default_mode):
return GattServer(device)
if mode == 'l2cap-client':
return L2capClient(device, psm=ctx.obj['l2cap_psm'])
return L2capClient(
device,
psm=ctx.obj['l2cap_psm'],
mtu=ctx.obj['l2cap_mtu'],
mps=ctx.obj['l2cap_mps'],
max_credits=ctx.obj['l2cap_max_credits'],
)
if mode == 'l2cap-server':
return L2capServer(device, psm=ctx.obj['l2cap_psm'])
return L2capServer(
device,
psm=ctx.obj['l2cap_psm'],
mtu=ctx.obj['l2cap_mtu'],
mps=ctx.obj['l2cap_mps'],
max_credits=ctx.obj['l2cap_max_credits'],
)
if mode == 'rfcomm-client':
return RfcommClient(
@@ -1197,23 +1362,29 @@ def create_role_factory(ctx, default_role):
return Sender(
packet_io,
start_delay=ctx.obj['start_delay'],
repeat=ctx.obj['repeat'],
repeat_delay=ctx.obj['repeat_delay'],
pace=ctx.obj['pace'],
packet_size=ctx.obj['packet_size'],
packet_count=ctx.obj['packet_count'],
)
if role == 'receiver':
return Receiver(packet_io)
return Receiver(packet_io, ctx.obj['linger'])
if role == 'ping':
return Ping(
packet_io,
start_delay=ctx.obj['start_delay'],
repeat=ctx.obj['repeat'],
repeat_delay=ctx.obj['repeat_delay'],
pace=ctx.obj['pace'],
packet_size=ctx.obj['packet_size'],
packet_count=ctx.obj['packet_count'],
)
if role == 'pong':
return Pong(packet_io)
return Pong(packet_io, ctx.obj['linger'])
raise ValueError('invalid role')
@@ -1258,7 +1429,7 @@ def create_role_factory(ctx, default_role):
@click.option(
'--rfcomm-uuid',
default=DEFAULT_RFCOMM_UUID,
help='RFComm service UUID to use (ignored is --rfcomm-channel is not 0)',
help='RFComm service UUID to use (ignored if --rfcomm-channel is not 0)',
)
@click.option(
'--l2cap-psm',
@@ -1266,13 +1437,31 @@ def create_role_factory(ctx, default_role):
default=DEFAULT_L2CAP_PSM,
help='L2CAP PSM to use',
)
@click.option(
'--l2cap-mtu',
type=int,
default=DEFAULT_L2CAP_MTU,
help='L2CAP MTU to use',
)
@click.option(
'--l2cap-mps',
type=int,
default=DEFAULT_L2CAP_MPS,
help='L2CAP MPS to use',
)
@click.option(
'--l2cap-max-credits',
type=int,
default=DEFAULT_L2CAP_MAX_CREDITS,
help='L2CAP maximum number of credits allowed for the peer',
)
@click.option(
'--packet-size',
'-s',
metavar='SIZE',
type=click.IntRange(8, 4096),
default=500,
help='Packet size (server role)',
help='Packet size (client or ping role)',
)
@click.option(
'--packet-count',
@@ -1280,7 +1469,7 @@ def create_role_factory(ctx, default_role):
metavar='COUNT',
type=int,
default=10,
help='Packet count (server role)',
help='Packet count (client or ping role)',
)
@click.option(
'--start-delay',
@@ -1288,7 +1477,39 @@ def create_role_factory(ctx, default_role):
metavar='SECONDS',
type=int,
default=1,
help='Start delay (server role)',
help='Start delay (client or ping role)',
)
@click.option(
'--repeat',
metavar='N',
type=int,
default=0,
help=(
'Repeat the run N times (client and ping roles)'
'(0, which is the fault, to run just once) '
),
)
@click.option(
'--repeat-delay',
metavar='SECONDS',
type=int,
default=1,
help=('Delay, in seconds, between repeats'),
)
@click.option(
'--pace',
metavar='MILLISECONDS',
type=int,
default=0,
help=(
'Wait N milliseconds between packets '
'(0, which is the fault, to send as fast as possible) '
),
)
@click.option(
'--linger',
is_flag=True,
help="Don't exit at the end of a run (server and pong roles)",
)
@click.pass_context
def bench(
@@ -1301,9 +1522,16 @@ def bench(
packet_size,
packet_count,
start_delay,
repeat,
repeat_delay,
pace,
linger,
rfcomm_channel,
rfcomm_uuid,
l2cap_psm,
l2cap_mtu,
l2cap_mps,
l2cap_max_credits,
):
ctx.ensure_object(dict)
ctx.obj['device_config'] = device_config
@@ -1313,9 +1541,16 @@ def bench(
ctx.obj['rfcomm_channel'] = rfcomm_channel
ctx.obj['rfcomm_uuid'] = rfcomm_uuid
ctx.obj['l2cap_psm'] = l2cap_psm
ctx.obj['l2cap_mtu'] = l2cap_mtu
ctx.obj['l2cap_mps'] = l2cap_mps
ctx.obj['l2cap_max_credits'] = l2cap_max_credits
ctx.obj['packet_size'] = packet_size
ctx.obj['packet_count'] = packet_count
ctx.obj['start_delay'] = start_delay
ctx.obj['repeat'] = repeat
ctx.obj['repeat_delay'] = repeat_delay
ctx.obj['pace'] = pace
ctx.obj['linger'] = linger
ctx.obj['extended_data_length'] = (
[int(x) for x in extended_data_length.split('/')]
+33 -7
View File
@@ -18,15 +18,17 @@
import asyncio
import os
import logging
import click
from bumble.company_ids import COMPANY_IDENTIFIERS
import time
import click
from bumble.company_ids import COMPANY_IDENTIFIERS
from bumble.colors import color
from bumble.core import name_or_number
from bumble.hci import (
map_null_terminated_utf8_string,
LeFeatureMask,
HCI_SUCCESS,
HCI_LE_SUPPORTED_FEATURES_NAMES,
HCI_VERSION_NAMES,
LMP_VERSION_NAMES,
HCI_Command,
@@ -48,6 +50,7 @@ from bumble.hci import (
HCI_LE_Read_Maximum_Advertising_Data_Length_Command,
HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
HCI_LE_Read_Suggested_Default_Data_Length_Command,
HCI_Read_Local_Version_Information_Command,
)
from bumble.host import Host
from bumble.transport import open_transport_or_link
@@ -137,7 +140,7 @@ async def get_le_info(host: Host) -> None:
print(color('LE Features:', 'yellow'))
for feature in host.supported_le_features:
print(' ', name_or_number(HCI_LE_SUPPORTED_FEATURES_NAMES, feature))
print(LeFeatureMask(feature).name)
# -----------------------------------------------------------------------------
@@ -166,7 +169,7 @@ async def get_acl_flow_control_info(host: Host) -> None:
# -----------------------------------------------------------------------------
async def async_main(transport):
async def async_main(latency_probes, transport):
print('<<< connecting to HCI...')
async with await open_transport_or_link(transport) as (hci_source, hci_sink):
print('<<< connected')
@@ -174,6 +177,23 @@ async def async_main(transport):
host = Host(hci_source, hci_sink)
await host.reset()
# Measure the latency if requested
latencies = []
if latency_probes:
for _ in range(latency_probes):
start = time.time()
await host.send_command(HCI_Read_Local_Version_Information_Command())
latencies.append(1000 * (time.time() - start))
print(
color('HCI Command Latency:', 'yellow'),
(
f'min={min(latencies):.2f}, '
f'max={max(latencies):.2f}, '
f'average={sum(latencies)/len(latencies):.2f}'
),
'\n',
)
# Print version
print(color('Version:', 'yellow'))
print(
@@ -209,10 +229,16 @@ async def async_main(transport):
# -----------------------------------------------------------------------------
@click.command()
@click.option(
'--latency-probes',
metavar='N',
type=int,
help='Send N commands to measure HCI transport latency statistics',
)
@click.argument('transport')
def main(transport):
def main(latency_probes, transport):
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
asyncio.run(async_main(transport))
asyncio.run(async_main(latency_probes, transport))
# -----------------------------------------------------------------------------
+200
View File
@@ -0,0 +1,200 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
import time
from typing import Optional
from bumble.colors import color
from bumble.hci import (
HCI_READ_LOOPBACK_MODE_COMMAND,
HCI_Read_Loopback_Mode_Command,
HCI_WRITE_LOOPBACK_MODE_COMMAND,
HCI_Write_Loopback_Mode_Command,
LoopbackMode,
)
from bumble.host import Host
from bumble.transport import open_transport_or_link
import click
class Loopback:
"""Send and receive ACL data packets in local loopback mode"""
def __init__(self, packet_size: int, packet_count: int, transport: str):
self.transport = transport
self.packet_size = packet_size
self.packet_count = packet_count
self.connection_handle: Optional[int] = None
self.connection_event = asyncio.Event()
self.done = asyncio.Event()
self.expected_cid = 0
self.bytes_received = 0
self.start_timestamp = 0.0
self.last_timestamp = 0.0
def on_connection(self, connection_handle: int, *args):
"""Retrieve connection handle from new connection event"""
if not self.connection_event.is_set():
# save first connection handle for ACL
# subsequent connections are SCO
self.connection_handle = connection_handle
self.connection_event.set()
def on_l2cap_pdu(self, connection_handle: int, cid: int, pdu: bytes):
"""Calculate packet receive speed"""
now = time.time()
print(f'<<< Received packet {cid}: {len(pdu)} bytes')
assert connection_handle == self.connection_handle
assert cid == self.expected_cid
self.expected_cid += 1
if cid == 0:
self.start_timestamp = now
else:
elapsed_since_start = now - self.start_timestamp
elapsed_since_last = now - self.last_timestamp
self.bytes_received += len(pdu)
instant_rx_speed = len(pdu) / elapsed_since_last
average_rx_speed = self.bytes_received / elapsed_since_start
print(
color(
f'@@@ RX speed: instant={instant_rx_speed:.4f},'
f' average={average_rx_speed:.4f}',
'cyan',
)
)
self.last_timestamp = now
if self.expected_cid == self.packet_count:
print(color('@@@ Received last packet', 'green'))
self.done.set()
async def run(self):
"""Run a loopback throughput test"""
print(color('>>> Connecting to HCI...', 'green'))
async with await open_transport_or_link(self.transport) as (
hci_source,
hci_sink,
):
print(color('>>> Connected', 'green'))
host = Host(hci_source, hci_sink)
await host.reset()
# make sure data can fit in one l2cap pdu
l2cap_header_size = 4
max_packet_size = host.acl_packet_queue.max_packet_size - l2cap_header_size
if self.packet_size > max_packet_size:
print(
color(
f'!!! Packet size ({self.packet_size}) larger than max supported'
f' size ({max_packet_size})',
'red',
)
)
return
if not host.supports_command(
HCI_WRITE_LOOPBACK_MODE_COMMAND
) or not host.supports_command(HCI_READ_LOOPBACK_MODE_COMMAND):
print(color('!!! Loopback mode not supported', 'red'))
return
# set event callbacks
host.on('connection', self.on_connection)
host.on('l2cap_pdu', self.on_l2cap_pdu)
loopback_mode = LoopbackMode.LOCAL
print(color('### Setting loopback mode', 'blue'))
await host.send_command(
HCI_Write_Loopback_Mode_Command(loopback_mode=LoopbackMode.LOCAL),
check_result=True,
)
print(color('### Checking loopback mode', 'blue'))
response = await host.send_command(
HCI_Read_Loopback_Mode_Command(), check_result=True
)
if response.return_parameters.loopback_mode != loopback_mode:
print(color('!!! Loopback mode mismatch', 'red'))
return
await self.connection_event.wait()
print(color('### Connected', 'cyan'))
print(color('=== Start sending', 'magenta'))
start_time = time.time()
bytes_sent = 0
for cid in range(0, self.packet_count):
# using the cid as an incremental index
host.send_l2cap_pdu(
self.connection_handle, cid, bytes(self.packet_size)
)
print(
color(
f'>>> Sending packet {cid}: {self.packet_size} bytes', 'yellow'
)
)
bytes_sent += self.packet_size # don't count L2CAP or HCI header sizes
await asyncio.sleep(0) # yield to allow packet receive
await self.done.wait()
print(color('=== Done!', 'magenta'))
elapsed = time.time() - start_time
average_tx_speed = bytes_sent / elapsed
print(
color(
f'@@@ TX speed: average={average_tx_speed:.4f} ({bytes_sent} bytes'
f' in {elapsed:.2f} seconds)',
'green',
)
)
# -----------------------------------------------------------------------------
@click.command()
@click.option(
'--packet-size',
'-s',
metavar='SIZE',
type=click.IntRange(8, 4096),
default=500,
help='Packet size',
)
@click.option(
'--packet-count',
'-c',
metavar='COUNT',
type=int,
default=10,
help='Packet count',
)
@click.argument('transport')
def main(packet_size, packet_count, transport):
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
loopback = Loopback(packet_size, packet_count, transport)
asyncio.run(loopback.run())
# -----------------------------------------------------------------------------
if __name__ == '__main__':
main()
+32 -24
View File
@@ -49,14 +49,16 @@ class ServerBridge:
self.tcp_port = tcp_port
async def start(self, device: Device) -> None:
# Listen for incoming L2CAP CoC connections
# Listen for incoming L2CAP channel connections
device.create_l2cap_server(
spec=l2cap.LeCreditBasedChannelSpec(
psm=self.psm, mtu=self.mtu, mps=self.mps, max_credits=self.max_credits
),
handler=self.on_coc,
handler=self.on_channel,
)
print(
color(f'### Listening for channel connection on PSM {self.psm}', 'yellow')
)
print(color(f'### Listening for CoC connection on PSM {self.psm}', 'yellow'))
def on_ble_connection(connection):
def on_ble_disconnection(reason):
@@ -73,7 +75,7 @@ class ServerBridge:
await device.start_advertising(auto_restart=True)
# Called when a new L2CAP connection is established
def on_coc(self, l2cap_channel):
def on_channel(self, l2cap_channel):
print(color('*** L2CAP channel:', 'cyan'), l2cap_channel)
class Pipe:
@@ -83,7 +85,7 @@ class ServerBridge:
self.l2cap_channel = l2cap_channel
l2cap_channel.on('close', self.on_l2cap_close)
l2cap_channel.sink = self.on_coc_sdu
l2cap_channel.sink = self.on_channel_sdu
async def connect_to_tcp(self):
# Connect to the TCP server
@@ -128,7 +130,7 @@ class ServerBridge:
if self.tcp_transport is not None:
self.tcp_transport.close()
def on_coc_sdu(self, sdu):
def on_channel_sdu(self, sdu):
print(color(f'<<< [L2CAP SDU]: {len(sdu)} bytes', 'cyan'))
if self.tcp_transport is None:
print(color('!!! TCP socket not open, dropping', 'red'))
@@ -183,7 +185,7 @@ class ClientBridge:
peer_name = writer.get_extra_info('peer_name')
print(color(f'<<< TCP connection from {peer_name}', 'magenta'))
def on_coc_sdu(sdu):
def on_channel_sdu(sdu):
print(color(f'<<< [L2CAP SDU]: {len(sdu)} bytes', 'cyan'))
l2cap_to_tcp_pipe.write(sdu)
@@ -209,7 +211,7 @@ class ClientBridge:
writer.close()
return
l2cap_channel.sink = on_coc_sdu
l2cap_channel.sink = on_channel_sdu
l2cap_channel.on('close', on_l2cap_close)
# Start a flow control pipe from L2CAP to TCP
@@ -274,23 +276,29 @@ async def run(device_config, hci_transport, bridge):
@click.pass_context
@click.option('--device-config', help='Device configuration file', required=True)
@click.option('--hci-transport', help='HCI transport', required=True)
@click.option('--psm', help='PSM for L2CAP CoC', type=int, default=1234)
@click.option('--psm', help='PSM for L2CAP', type=int, default=1234)
@click.option(
'--l2cap-coc-max-credits',
help='Maximum L2CAP CoC Credits',
'--l2cap-max-credits',
help='Maximum L2CAP Credits',
type=click.IntRange(1, 65535),
default=128,
)
@click.option(
'--l2cap-coc-mtu',
help='L2CAP CoC MTU',
type=click.IntRange(23, 65535),
default=1022,
'--l2cap-mtu',
help='L2CAP MTU',
type=click.IntRange(
l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU,
l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MTU,
),
default=1024,
)
@click.option(
'--l2cap-coc-mps',
help='L2CAP CoC MPS',
type=click.IntRange(23, 65533),
'--l2cap-mps',
help='L2CAP MPS',
type=click.IntRange(
l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS,
l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS,
),
default=1024,
)
def cli(
@@ -298,17 +306,17 @@ def cli(
device_config,
hci_transport,
psm,
l2cap_coc_max_credits,
l2cap_coc_mtu,
l2cap_coc_mps,
l2cap_max_credits,
l2cap_mtu,
l2cap_mps,
):
context.ensure_object(dict)
context.obj['device_config'] = device_config
context.obj['hci_transport'] = hci_transport
context.obj['psm'] = psm
context.obj['max_credits'] = l2cap_coc_max_credits
context.obj['mtu'] = l2cap_coc_mtu
context.obj['mps'] = l2cap_coc_mps
context.obj['max_credits'] = l2cap_max_credits
context.obj['mtu'] = l2cap_mtu
context.obj['mps'] = l2cap_mps
# -----------------------------------------------------------------------------
+144 -24
View File
@@ -19,6 +19,7 @@ from __future__ import annotations
import logging
import asyncio
import dataclasses
import itertools
import random
import struct
@@ -42,6 +43,7 @@ from bumble.hci import (
HCI_LE_1M_PHY,
HCI_SUCCESS,
HCI_UNKNOWN_HCI_COMMAND_ERROR,
HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR,
HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
HCI_VERSION_BLUETOOTH_CORE_5_0,
Address,
@@ -53,6 +55,7 @@ from bumble.hci import (
HCI_Connection_Request_Event,
HCI_Disconnection_Complete_Event,
HCI_Encryption_Change_Event,
HCI_Synchronous_Connection_Complete_Event,
HCI_LE_Advertising_Report_Event,
HCI_LE_Connection_Complete_Event,
HCI_LE_Read_Remote_Features_Complete_Event,
@@ -60,10 +63,11 @@ from bumble.hci import (
HCI_Packet,
HCI_Role_Change_Event,
)
from typing import Optional, Union, Dict, TYPE_CHECKING
from typing import Optional, Union, Dict, Any, TYPE_CHECKING
if TYPE_CHECKING:
from bumble.transport.common import TransportSink, TransportSource
from bumble.link import LocalLink
from bumble.transport.common import TransportSink
# -----------------------------------------------------------------------------
# Logging
@@ -79,15 +83,18 @@ class DataObject:
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class Connection:
def __init__(self, controller, handle, role, peer_address, link, transport):
self.controller = controller
self.handle = handle
self.role = role
self.peer_address = peer_address
self.link = link
controller: Controller
handle: int
role: int
peer_address: Address
link: Any
transport: int
link_type: int
def __post_init__(self):
self.assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.transport = transport
def on_hci_acl_data_packet(self, packet):
self.assembler.feed_packet(packet)
@@ -106,10 +113,10 @@ class Connection:
class Controller:
def __init__(
self,
name,
name: str,
host_source=None,
host_sink: Optional[TransportSink] = None,
link=None,
link: Optional[LocalLink] = None,
public_address: Optional[Union[bytes, str, Address]] = None,
):
self.name = name
@@ -359,12 +366,13 @@ class Controller:
if connection is None:
connection_handle = self.allocate_connection_handle()
connection = Connection(
self,
connection_handle,
BT_PERIPHERAL_ROLE,
peer_address,
self.link,
BT_LE_TRANSPORT,
controller=self,
handle=connection_handle,
role=BT_PERIPHERAL_ROLE,
peer_address=peer_address,
link=self.link,
transport=BT_LE_TRANSPORT,
link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
)
self.peripheral_connections[peer_address] = connection
logger.debug(f'New PERIPHERAL connection handle: 0x{connection_handle:04X}')
@@ -418,12 +426,13 @@ class Controller:
if connection is None:
connection_handle = self.allocate_connection_handle()
connection = Connection(
self,
connection_handle,
BT_CENTRAL_ROLE,
peer_address,
self.link,
BT_LE_TRANSPORT,
controller=self,
handle=connection_handle,
role=BT_CENTRAL_ROLE,
peer_address=peer_address,
link=self.link,
transport=BT_LE_TRANSPORT,
link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
)
self.central_connections[peer_address] = connection
logger.debug(
@@ -568,6 +577,7 @@ class Controller:
peer_address=peer_address,
link=self.link,
transport=BT_BR_EDR_TRANSPORT,
link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
)
self.classic_connections[peer_address] = connection
logger.debug(
@@ -621,6 +631,42 @@ class Controller:
)
)
def on_classic_sco_connection_complete(
self, peer_address: Address, status: int, link_type: int
):
if status == HCI_SUCCESS:
# Allocate (or reuse) a connection handle
connection_handle = self.allocate_connection_handle()
connection = Connection(
controller=self,
handle=connection_handle,
# Role doesn't matter in SCO.
role=BT_CENTRAL_ROLE,
peer_address=peer_address,
link=self.link,
transport=BT_BR_EDR_TRANSPORT,
link_type=link_type,
)
self.classic_connections[peer_address] = connection
logger.debug(f'New SCO connection handle: 0x{connection_handle:04X}')
else:
connection_handle = 0
self.send_hci_packet(
HCI_Synchronous_Connection_Complete_Event(
status=status,
connection_handle=connection_handle,
bd_addr=peer_address,
link_type=link_type,
# TODO: Provide SCO connection parameters.
transmission_interval=0,
retransmission_window=0,
rx_packet_length=0,
tx_packet_length=0,
air_mode=0,
)
)
############################################################
# Advertising support
############################################################
@@ -740,6 +786,68 @@ class Controller:
)
self.link.classic_accept_connection(self, command.bd_addr, command.role)
def on_hci_enhanced_setup_synchronous_connection_command(self, command):
'''
See Bluetooth spec Vol 4, Part E - 7.1.45 Enhanced Setup Synchronous Connection command
'''
if self.link is None:
return
if not (
connection := self.find_classic_connection_by_handle(
command.connection_handle
)
):
self.send_hci_packet(
HCI_Command_Status_Event(
status=HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
return
self.send_hci_packet(
HCI_Command_Status_Event(
status=HCI_SUCCESS,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
self.link.classic_sco_connect(
self, connection.peer_address, HCI_Connection_Complete_Event.ESCO_LINK_TYPE
)
def on_hci_enhanced_accept_synchronous_connection_request_command(self, command):
'''
See Bluetooth spec Vol 4, Part E - 7.1.46 Enhanced Accept Synchronous Connection Request command
'''
if self.link is None:
return
if not (connection := self.find_classic_connection_by_address(command.bd_addr)):
self.send_hci_packet(
HCI_Command_Status_Event(
status=HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
return
self.send_hci_packet(
HCI_Command_Status_Event(
status=HCI_SUCCESS,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
self.link.classic_accept_sco_connection(
self, connection.peer_address, HCI_Connection_Complete_Event.ESCO_LINK_TYPE
)
def on_hci_switch_role_command(self, command):
'''
See Bluetooth spec Vol 4, Part E - 7.2.8 Switch Role command
@@ -1104,6 +1212,18 @@ class Controller:
See Bluetooth spec Vol 4, Part E - 7.8.21 LE Read Remote Features Command
'''
handle = command.connection_handle
if not self.find_connection_by_handle(handle):
self.send_hci_packet(
HCI_Command_Status_Event(
status=HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
return
# First, say that the command is pending
self.send_hci_packet(
HCI_Command_Status_Event(
@@ -1117,7 +1237,7 @@ class Controller:
self.send_hci_packet(
HCI_LE_Read_Remote_Features_Complete_Event(
status=HCI_SUCCESS,
connection_handle=0,
connection_handle=handle,
le_features=bytes.fromhex('dd40000000000000'),
)
)
+64 -19
View File
@@ -49,7 +49,6 @@ from .hci import (
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
HCI_CENTRAL_ROLE,
HCI_COMMAND_STATUS_PENDING,
HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE,
HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR,
HCI_DISPLAY_YES_NO_IO_CAPABILITY,
HCI_DISPLAY_ONLY_IO_CAPABILITY,
@@ -60,12 +59,8 @@ from .hci import (
HCI_LE_1M_PHY,
HCI_LE_1M_PHY_BIT,
HCI_LE_2M_PHY,
HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE,
HCI_LE_CLEAR_RESOLVING_LIST_COMMAND,
HCI_LE_CODED_PHY,
HCI_LE_CODED_PHY_BIT,
HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE,
HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE,
HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND,
HCI_LE_RAND_COMMAND,
HCI_LE_READ_PHY_COMMAND,
@@ -86,7 +81,7 @@ from .hci import (
HCI_Constant,
HCI_Create_Connection_Cancel_Command,
HCI_Create_Connection_Command,
HCI_Create_Connection_Command,
HCI_Connection_Complete_Event,
HCI_Disconnect_Command,
HCI_Encryption_Change_Event,
HCI_Error,
@@ -107,6 +102,7 @@ from .hci import (
HCI_LE_Extended_Create_Connection_Command,
HCI_LE_Rand_Command,
HCI_LE_Read_PHY_Command,
HCI_LE_Read_Remote_Features_Command,
HCI_LE_Reject_CIS_Request_Command,
HCI_LE_Remove_Advertising_Set_Command,
HCI_LE_Set_Address_Resolution_Enable_Command,
@@ -152,6 +148,7 @@ from .hci import (
HCI_Write_Secure_Connections_Host_Support_Command,
HCI_Write_Simple_Pairing_Mode_Command,
OwnAddressType,
LeFeatureMask,
phy_list_to_bits,
)
from .host import Host
@@ -682,6 +679,7 @@ class Connection(CompositeEventEmitter):
self_address: Address
peer_address: Address
peer_resolvable_address: Optional[Address]
peer_le_features: Optional[LeFeatureMask]
role: int
encryption: int
authenticated: bool
@@ -758,6 +756,7 @@ class Connection(CompositeEventEmitter):
) # By default, use the device's shared server
self.pairing_peer_io_capability = None
self.pairing_peer_authentication_requirements = None
self.peer_le_features = None
# [Classic only]
@classmethod
@@ -906,6 +905,15 @@ class Connection(CompositeEventEmitter):
async def request_remote_name(self):
return await self.device.request_remote_name(self)
async def get_remote_le_features(self) -> LeFeatureMask:
"""[LE Only] Reads remote LE supported features.
Returns:
LE features supported by the remote device.
"""
self.peer_le_features = await self.device.get_remote_le_features(self)
return self.peer_le_features
async def __aenter__(self):
return self
@@ -1538,9 +1546,7 @@ class Device(CompositeEventEmitter):
if self.cis_enabled:
await self.send_command(
HCI_LE_Set_Host_Feature_Command(
bit_number=(
HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE
),
bit_number=LeFeatureMask.CONNECTED_ISOCHRONOUS_STREAM,
bit_value=1,
)
)
@@ -1596,21 +1602,21 @@ class Device(CompositeEventEmitter):
)
)
def supports_le_feature(self, feature):
return self.host.supports_le_feature(feature)
def supports_le_features(self, feature: LeFeatureMask) -> bool:
return self.host.supports_le_features(feature)
def supports_le_phy(self, phy):
if phy == HCI_LE_1M_PHY:
return True
feature_map = {
HCI_LE_2M_PHY: HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE,
HCI_LE_CODED_PHY: HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE,
HCI_LE_2M_PHY: LeFeatureMask.LE_2M_PHY,
HCI_LE_CODED_PHY: LeFeatureMask.LE_CODED_PHY,
}
if phy not in feature_map:
raise ValueError('invalid PHY')
return self.host.supports_le_feature(feature_map[phy])
return self.host.supports_le_features(feature_map[phy])
@deprecated("Please use start_legacy_advertising.")
async def start_advertising(
@@ -1920,8 +1926,8 @@ class Device(CompositeEventEmitter):
self.advertisement_accumulators = {}
# Enable scanning
if not legacy and self.supports_le_feature(
HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE
if not legacy and self.supports_le_features(
LeFeatureMask.LE_EXTENDED_ADVERTISING
):
# Set the scanning parameters
scan_type = (
@@ -1939,7 +1945,7 @@ class Device(CompositeEventEmitter):
scanning_phys_bits |= 1 << HCI_LE_1M_PHY_BIT
scanning_phy_count += 1
if HCI_LE_CODED_PHY in scanning_phys:
if self.supports_le_feature(HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE):
if self.supports_le_features(LeFeatureMask.LE_CODED_PHY):
scanning_phys_bits |= 1 << HCI_LE_CODED_PHY_BIT
scanning_phy_count += 1
@@ -2000,7 +2006,7 @@ class Device(CompositeEventEmitter):
async def stop_scanning(self) -> None:
# Disable scanning
if self.supports_le_feature(HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE):
if self.supports_le_features(LeFeatureMask.LE_EXTENDED_ADVERTISING):
await self.send_command(
HCI_LE_Set_Extended_Scan_Enable_Command(
enable=0, filter_duplicates=0, duration=0, period=0
@@ -3142,6 +3148,32 @@ class Device(CompositeEventEmitter):
)
raise HCI_StatusError(result)
async def get_remote_le_features(self, connection: Connection) -> LeFeatureMask:
"""[LE Only] Reads remote LE supported features.
Args:
handle: connection handle to read LE features.
Returns:
LE features supported by the remote device.
"""
with closing(EventWatcher()) as watcher:
read_feature_future: asyncio.Future[
LeFeatureMask
] = asyncio.get_running_loop().create_future()
def on_le_remote_features(handle: int, features: int):
if handle == connection.handle:
read_feature_future.set_result(LeFeatureMask(features))
watcher.on(self.host, 'le_remote_features', on_le_remote_features)
await self.send_command(
HCI_LE_Read_Remote_Features_Command(
connection_handle=connection.handle
),
)
return await read_feature_future
@host_event_handler
def on_flush(self):
self.emit('flush')
@@ -3319,8 +3351,21 @@ class Device(CompositeEventEmitter):
def on_connection_request(self, bd_addr, class_of_device, link_type):
logger.debug(f'*** Connection request: {bd_addr}')
# Handle SCO request.
if link_type in (
HCI_Connection_Complete_Event.SCO_LINK_TYPE,
HCI_Connection_Complete_Event.ESCO_LINK_TYPE,
):
if connection := self.find_connection_by_bd_addr(
bd_addr, transport=BT_BR_EDR_TRANSPORT
):
self.emit('sco_request', connection, link_type)
else:
logger.error(f'SCO request from a non-connected device {bd_addr}')
return
# match a pending future using `bd_addr`
if bd_addr in self.classic_pending_accepts:
elif bd_addr in self.classic_pending_accepts:
future, *_ = self.classic_pending_accepts.pop(bd_addr)
future.set_result((bd_addr, class_of_device, link_type))
+82 -50
View File
@@ -1360,55 +1360,51 @@ HCI_SUPPORTED_COMMANDS_FLAGS = (
# LE Supported Features
# See Bluetooth spec @ Vol 6, Part B, 4.6 FEATURE SUPPORT
HCI_LE_ENCRYPTION_LE_SUPPORTED_FEATURE = 0
HCI_CONNECTION_PARAMETERS_REQUEST_PROCEDURE_LE_SUPPORTED_FEATURE = 1
HCI_EXTENDED_REJECT_INDICATION_LE_SUPPORTED_FEATURE = 2
HCI_PERIPHERAL_INITIATED_FEATURE_EXCHANGE_LE_SUPPORTED_FEATURE = 3
HCI_LE_PING_LE_SUPPORTED_FEATURE = 4
HCI_LE_DATA_PACKET_LENGTH_EXTENSION_LE_SUPPORTED_FEATURE = 5
HCI_LL_PRIVACY_LE_SUPPORTED_FEATURE = 6
HCI_EXTENDED_SCANNER_FILTER_POLICIES_LE_SUPPORTED_FEATURE = 7
HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE = 8
HCI_STABLE_MODULATION_INDEX_TRANSMITTER_LE_SUPPORTED_FEATURE = 9
HCI_STABLE_MODULATION_INDEX_RECEIVER_LE_SUPPORTED_FEATURE = 10
HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE = 11
HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE = 12
HCI_LE_PERIODIC_ADVERTISING_LE_SUPPORTED_FEATURE = 13
HCI_CHANNEL_SELECTION_ALGORITHM_2_LE_SUPPORTED_FEATURE = 14
HCI_LE_POWER_CLASS_1_LE_SUPPORTED_FEATURE = 15
HCI_MINIMUM_NUMBER_OF_USED_CHANNELS_PROCEDURE_LE_SUPPORTED_FEATURE = 16
HCI_CONNECTION_CTE_REQUEST_LE_SUPPORTED_FEATURE = 17
HCI_CONNECTION_CTE_RESPONSE_LE_SUPPORTED_FEATURE = 18
HCI_CONNECTIONLESS_CTE_TRANSMITTER_LE_SUPPORTED_FEATURE = 19
HCI_CONNECTIONLESS_CTR_RECEIVER_LE_SUPPORTED_FEATURE = 20
HCI_ANTENNA_SWITCHING_DURING_CTE_TRANSMISSION_LE_SUPPORTED_FEATURE = 21
HCI_ANTENNA_SWITCHING_DURING_CTE_RECEPTION_LE_SUPPORTED_FEATURE = 22
HCI_RECEIVING_CONSTANT_TONE_EXTENSIONS_LE_SUPPORTED_FEATURE = 23
HCI_PERIODIC_ADVERTISING_SYNC_TRANSFER_SENDER_LE_SUPPORTED_FEATURE = 24
HCI_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECIPIENT_LE_SUPPORTED_FEATURE = 25
HCI_SLEEP_CLOCK_ACCURACY_UPDATES_LE_SUPPORTED_FEATURE = 26
HCI_REMOTE_PUBLIC_KEY_VALIDATION_LE_SUPPORTED_FEATURE = 27
HCI_CONNECTED_ISOCHRONOUS_STREAM_CENTRAL_LE_SUPPORTED_FEATURE = 28
HCI_CONNECTED_ISOCHRONOUS_STREAM_PERIPHERAL_LE_SUPPORTED_FEATURE = 29
HCI_ISOCHRONOUS_BROADCASTER_LE_SUPPORTED_FEATURE = 30
HCI_SYNCHRONIZED_RECEIVER_LE_SUPPORTED_FEATURE = 31
HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE = 32
HCI_LE_POWER_CONTROL_REQUEST_LE_SUPPORTED_FEATURE = 33
HCI_LE_POWER_CONTROL_REQUEST_DUP_LE_SUPPORTED_FEATURE = 34
HCI_LE_PATH_LOSS_MONITORING_LE_SUPPORTED_FEATURE = 35
HCI_PERIODIC_ADVERTISING_ADI_SUPPORT_LE_SUPPORTED_FEATURE = 36
HCI_CONNECTION_SUBRATING_LE_SUPPORTED_FEATURE = 37
HCI_CONNECTION_SUBRATING_HOST_SUPPORT_LE_SUPPORTED_FEATURE = 38
HCI_CHANNEL_CLASSIFICATION_LE_SUPPORTED_FEATURE = 39
HCI_ADVERTISING_CODING_SELECTION_LE_SUPPORTED_FEATURE = 40
HCI_ADVERTISING_CODING_SELECTION_HOST_SUPPORT_LE_SUPPORTED_FEATURE = 41
HCI_PERIODIC_ADVERTISING_WITH_RESPONSES_ADVERTISER_LE_SUPPORTED_FEATURE = 43
HCI_PERIODIC_ADVERTISING_WITH_RESPONSES_SCANNER_LE_SUPPORTED_FEATURE = 44
HCI_LE_SUPPORTED_FEATURES_NAMES = {
flag: feature_name for (feature_name, flag) in globals().items()
if feature_name.startswith('HCI_') and feature_name.endswith('_LE_SUPPORTED_FEATURE')
}
class LeFeatureMask(enum.IntFlag):
LE_ENCRYPTION = 1 << 0
CONNECTION_PARAMETERS_REQUEST_PROCEDURE = 1 << 1
EXTENDED_REJECT_INDICATION = 1 << 2
PERIPHERAL_INITIATED_FEATURE_EXCHANGE = 1 << 3
LE_PING = 1 << 4
LE_DATA_PACKET_LENGTH_EXTENSION = 1 << 5
LL_PRIVACY = 1 << 6
EXTENDED_SCANNER_FILTER_POLICIES = 1 << 7
LE_2M_PHY = 1 << 8
STABLE_MODULATION_INDEX_TRANSMITTER = 1 << 9
STABLE_MODULATION_INDEX_RECEIVER = 1 << 10
LE_CODED_PHY = 1 << 11
LE_EXTENDED_ADVERTISING = 1 << 12
LE_PERIODIC_ADVERTISING = 1 << 13
CHANNEL_SELECTION_ALGORITHM_2 = 1 << 14
LE_POWER_CLASS_1 = 1 << 15
MINIMUM_NUMBER_OF_USED_CHANNELS_PROCEDURE = 1 << 16
CONNECTION_CTE_REQUEST = 1 << 17
CONNECTION_CTE_RESPONSE = 1 << 18
CONNECTIONLESS_CTE_TRANSMITTER = 1 << 19
CONNECTIONLESS_CTR_RECEIVER = 1 << 20
ANTENNA_SWITCHING_DURING_CTE_TRANSMISSION = 1 << 21
ANTENNA_SWITCHING_DURING_CTE_RECEPTION = 1 << 22
RECEIVING_CONSTANT_TONE_EXTENSIONS = 1 << 23
PERIODIC_ADVERTISING_SYNC_TRANSFER_SENDER = 1 << 24
PERIODIC_ADVERTISING_SYNC_TRANSFER_RECIPIENT = 1 << 25
SLEEP_CLOCK_ACCURACY_UPDATES = 1 << 26
REMOTE_PUBLIC_KEY_VALIDATION = 1 << 27
CONNECTED_ISOCHRONOUS_STREAM_CENTRAL = 1 << 28
CONNECTED_ISOCHRONOUS_STREAM_PERIPHERAL = 1 << 29
ISOCHRONOUS_BROADCASTER = 1 << 30
SYNCHRONIZED_RECEIVER = 1 << 31
CONNECTED_ISOCHRONOUS_STREAM = 1 << 32
LE_POWER_CONTROL_REQUEST = 1 << 33
LE_POWER_CONTROL_REQUEST_DUP = 1 << 34
LE_PATH_LOSS_MONITORING = 1 << 35
PERIODIC_ADVERTISING_ADI_SUPPORT = 1 << 36
CONNECTION_SUBRATING = 1 << 37
CONNECTION_SUBRATING_HOST_SUPPORT = 1 << 38
CHANNEL_CLASSIFICATION = 1 << 39
ADVERTISING_CODING_SELECTION = 1 << 40
ADVERTISING_CODING_SELECTION_HOST_SUPPORT = 1 << 41
PERIODIC_ADVERTISING_WITH_RESPONSES_ADVERTISER = 1 << 43
PERIODIC_ADVERTISING_WITH_RESPONSES_SCANNER = 1 << 44
# fmt: on
@@ -2026,6 +2022,17 @@ class OwnAddressType(enum.IntEnum):
return {'size': 1, 'mapper': lambda x: OwnAddressType(x).name}
# -----------------------------------------------------------------------------
class LoopbackMode(enum.IntEnum):
DISABLED = 0
LOCAL = 1
REMOTE = 2
@classmethod
def type_spec(cls):
return {'size': 1, 'mapper': lambda x: LoopbackMode(x).name}
# -----------------------------------------------------------------------------
class HCI_Packet:
'''
@@ -3352,6 +3359,27 @@ class HCI_Read_Encryption_Key_Size_Command(HCI_Command):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
return_parameters_fields=[
('status', STATUS_SPEC),
('loopback_mode', LoopbackMode.type_spec()),
],
)
class HCI_Read_Loopback_Mode_Command(HCI_Command):
'''
See Bluetooth spec @ 7.6.1 Read Loopback Mode Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command([('loopback_mode', 1)])
class HCI_Write_Loopback_Mode_Command(HCI_Command):
'''
See Bluetooth spec @ 7.6.2 Write Loopback Mode Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command([('le_event_mask', 8)])
class HCI_LE_Set_Event_Mask_Command(HCI_Command):
@@ -4733,7 +4761,11 @@ class HCI_Event(HCI_Packet):
HCI_Object.init_from_bytes(self, parameters, 0, fields)
return self
def __init__(self, event_code, parameters=None, **kwargs):
def __init__(self, event_code=-1, parameters=None, **kwargs):
# Since the legacy implementation relies on an __init__ injector, typing always
# complains that positional argument event_code is not passed, so here sets a
# default value to allow building derived HCI_Event without event_code.
assert event_code != -1
super().__init__(HCI_Event.event_name(event_code))
if (fields := getattr(self, 'fields', None)) and kwargs:
HCI_Object.init_from_fields(self, fields, kwargs)
+3 -2
View File
@@ -18,7 +18,7 @@
from __future__ import annotations
from collections.abc import Callable, MutableMapping
from typing import cast, Any
from typing import cast, Any, Optional
import logging
from bumble import avdtp
@@ -69,7 +69,7 @@ PSM_NAMES = {
class PacketTracer:
class AclStream:
psms: MutableMapping[int, int]
peer: PacketTracer.AclStream
peer: Optional[PacketTracer.AclStream]
avdtp_assemblers: MutableMapping[int, avdtp.MessageAssembler]
def __init__(self, analyzer: PacketTracer.Analyzer) -> None:
@@ -77,6 +77,7 @@ class PacketTracer:
self.packet_assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.avdtp_assemblers = {} # AVDTP assemblers, by source_cid
self.psms = {} # PSM, by source_cid
self.peer = None
# pylint: disable=too-many-nested-blocks
def on_acl_pdu(self, pdu: bytes) -> None:
+15 -2
View File
@@ -70,6 +70,7 @@ from .hci import (
HCI_Reset_Command,
HCI_Set_Event_Mask_Command,
HCI_SynchronousDataPacket,
LeFeatureMask,
)
from .core import (
BT_BR_EDR_TRANSPORT,
@@ -487,8 +488,8 @@ class Host(AbortableEventEmitter):
return commands
def supports_le_feature(self, feature):
return (self.local_le_features & (1 << feature)) != 0
def supports_le_features(self, feature: LeFeatureMask) -> bool:
return (self.local_le_features & feature) == feature
@property
def supported_le_features(self):
@@ -1033,3 +1034,15 @@ class Host(AbortableEventEmitter):
event.bd_addr,
event.host_supported_features,
)
def on_hci_le_read_remote_features_complete_event(self, event):
if event.status != HCI_SUCCESS:
self.emit(
'le_remote_features_failure', event.connection_handle, event.status
)
else:
self.emit(
'le_remote_features',
event.connection_handle,
int.from_bytes(event.le_features, 'little'),
)
+10 -5
View File
@@ -149,10 +149,11 @@ L2CAP_INVALID_CID_IN_REQUEST_REASON = 0x0002
L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_CREDITS = 65535
L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU = 23
L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MTU = 65535
L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS = 23
L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS = 65533
L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU = 2048
L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS = 2046
L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS = 2048
L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS = 256
L2CAP_MAXIMUM_TRANSMISSION_UNIT_CONFIGURATION_OPTION_TYPE = 0x01
@@ -188,8 +189,11 @@ class LeCreditBasedChannelSpec:
or self.max_credits > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_CREDITS
):
raise ValueError('max credits out of range')
if self.mtu < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU:
raise ValueError('MTU too small')
if (
self.mtu < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU
or self.mtu > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MTU
):
raise ValueError('MTU out of range')
if (
self.mps < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS
or self.mps > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS
@@ -1644,12 +1648,13 @@ class ChannelManager:
def send_pdu(self, connection, cid: int, pdu: Union[SupportsBytes, bytes]) -> None:
pdu_str = pdu.hex() if isinstance(pdu, bytes) else str(pdu)
pdu_bytes = bytes(pdu)
logger.debug(
f'{color(">>> Sending L2CAP PDU", "blue")} '
f'on connection [0x{connection.handle:04X}] (CID={cid}) '
f'{connection.peer_address}: {pdu_str}'
f'{connection.peer_address}: {len(pdu_bytes)} bytes, {pdu_str}'
)
self.host.send_l2cap_pdu(connection.handle, cid, bytes(pdu))
self.host.send_l2cap_pdu(connection.handle, cid, pdu_bytes)
def on_pdu(self, connection: Connection, cid: int, pdu: bytes) -> None:
if cid in (L2CAP_SIGNALING_CID, L2CAP_LE_SIGNALING_CID):
+55 -1
View File
@@ -26,9 +26,13 @@ from bumble.hci import (
HCI_SUCCESS,
HCI_CONNECTION_ACCEPT_TIMEOUT_ERROR,
HCI_CONNECTION_TIMEOUT_ERROR,
HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR,
HCI_PAGE_TIMEOUT_ERROR,
HCI_Connection_Complete_Event,
)
from bumble import controller
from typing import Optional, Set
# -----------------------------------------------------------------------------
# Logging
@@ -57,6 +61,8 @@ class LocalLink:
Link bus for controllers to communicate with each other
'''
controllers: Set[controller.Controller]
def __init__(self):
self.controllers = set()
self.pending_connection = None
@@ -79,7 +85,9 @@ class LocalLink:
return controller
return None
def find_classic_controller(self, address):
def find_classic_controller(
self, address: Address
) -> Optional[controller.Controller]:
for controller in self.controllers:
if controller.public_address == address:
return controller
@@ -271,6 +279,52 @@ class LocalLink:
initiator_controller.public_address, int(not (initiator_new_role))
)
def classic_sco_connect(
self,
initiator_controller: controller.Controller,
responder_address: Address,
link_type: int,
):
logger.debug(
f'[Classic] {initiator_controller.public_address} connects SCO to {responder_address}'
)
responder_controller = self.find_classic_controller(responder_address)
# Initiator controller should handle it.
assert responder_controller
responder_controller.on_classic_connection_request(
initiator_controller.public_address,
link_type,
)
def classic_accept_sco_connection(
self,
responder_controller: controller.Controller,
initiator_address: Address,
link_type: int,
):
logger.debug(
f'[Classic] {responder_controller.public_address} accepts to connect SCO {initiator_address}'
)
initiator_controller = self.find_classic_controller(initiator_address)
if initiator_controller is None:
responder_controller.on_classic_sco_connection_complete(
responder_controller.public_address,
HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR,
link_type,
)
return
async def task():
initiator_controller.on_classic_sco_connection_complete(
responder_controller.public_address, HCI_SUCCESS, link_type
)
asyncio.create_task(task())
responder_controller.on_classic_sco_connection_complete(
initiator_controller.public_address, HCI_SUCCESS, link_type
)
# -----------------------------------------------------------------------------
class RemoteLink:
+5 -4
View File
@@ -285,10 +285,11 @@ class HostService(HostServicer):
raise NotImplementedError(
"TODO: add support for extended advertising in Bumble"
)
if request.interval:
raise NotImplementedError("TODO: add support for `request.interval`")
if request.interval_range:
raise NotImplementedError("TODO: add support for `request.interval_range`")
if advertising_interval := request.interval:
self.device.config.advertising_interval_min = int(advertising_interval)
self.device.config.advertising_interval_max = int(advertising_interval)
if interval_range := request.interval_range:
self.device.config.advertising_interval_max += int(interval_range)
if request.primary_phy:
raise NotImplementedError("TODO: add support for `request.primary_phy`")
if request.secondary_phy:
+60 -8
View File
@@ -19,7 +19,7 @@
from __future__ import annotations
import enum
import struct
from typing import Optional
from typing import Optional, Tuple
from bumble import core
from bumble import crypto
@@ -31,6 +31,9 @@ from bumble import gatt_client
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
SET_IDENTITY_RESOLVING_KEY_LENGTH = 16
class SirkType(enum.IntEnum):
'''Coordinated Set Identification Service - 5.1 Set Identity Resolving Key.'''
@@ -66,6 +69,10 @@ def k1(n: bytes, salt: bytes, p: bytes) -> bytes:
def sef(k: bytes, r: bytes) -> bytes:
'''
Coordinated Set Identification Service - 4.5 SIRK encryption function sef.
SIRK decryption function sdf shares the same algorithm. The only difference is that argument r is:
* Plaintext in encryption
* Cipher in decryption
'''
return crypto.xor(k1(k, s1(b'SIRKenc'[::-1]), b'csis'[::-1]), r)
@@ -105,6 +112,11 @@ class CoordinatedSetIdentificationService(gatt.TemplateService):
set_member_lock: Optional[MemberLock] = None,
set_member_rank: Optional[int] = None,
) -> None:
if len(set_identity_resolving_key) != SET_IDENTITY_RESOLVING_KEY_LENGTH:
raise ValueError(
f'Invalid SIRK length {len(set_identity_resolving_key)}, expected {SET_IDENTITY_RESOLVING_KEY_LENGTH}'
)
characteristics = []
self.set_identity_resolving_key = set_identity_resolving_key
@@ -113,7 +125,7 @@ class CoordinatedSetIdentificationService(gatt.TemplateService):
uuid=gatt.GATT_SET_IDENTITY_RESOLVING_KEY_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=gatt.CharacteristicValue(read=self.on_sirk_read),
)
characteristics.append(self.set_identity_resolving_key_characteristic)
@@ -123,7 +135,7 @@ class CoordinatedSetIdentificationService(gatt.TemplateService):
uuid=gatt.GATT_COORDINATED_SET_SIZE_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=struct.pack('B', coordinated_set_size),
)
characteristics.append(self.coordinated_set_size_characteristic)
@@ -134,7 +146,7 @@ class CoordinatedSetIdentificationService(gatt.TemplateService):
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY
| gatt.Characteristic.Properties.WRITE,
permissions=gatt.Characteristic.Permissions.READABLE
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION
| gatt.Characteristic.Permissions.WRITEABLE,
value=struct.pack('B', set_member_lock),
)
@@ -145,18 +157,32 @@ class CoordinatedSetIdentificationService(gatt.TemplateService):
uuid=gatt.GATT_SET_MEMBER_RANK_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=struct.pack('B', set_member_rank),
)
characteristics.append(self.set_member_rank_characteristic)
super().__init__(characteristics)
def on_sirk_read(self, _connection: Optional[device.Connection]) -> bytes:
async def on_sirk_read(self, connection: Optional[device.Connection]) -> bytes:
if self.set_identity_resolving_key_type == SirkType.PLAINTEXT:
return bytes([SirkType.PLAINTEXT]) + self.set_identity_resolving_key
sirk_bytes = self.set_identity_resolving_key
else:
raise NotImplementedError('TODO: Pending async Characteristic read.')
assert connection
if connection.transport == core.BT_LE_TRANSPORT:
key = await connection.device.get_long_term_key(
connection_handle=connection.handle, rand=b'', ediv=0
)
else:
key = await connection.device.get_link_key(connection.peer_address)
if not key:
raise RuntimeError('LTK or LinkKey is not present')
sirk_bytes = sef(key, self.set_identity_resolving_key)
return bytes([self.set_identity_resolving_key_type]) + sirk_bytes
def get_advertising_data(self) -> bytes:
return bytes(
@@ -203,3 +229,29 @@ class CoordinatedSetIdentificationProxy(gatt_client.ProfileServiceProxy):
gatt.GATT_SET_MEMBER_RANK_CHARACTERISTIC
):
self.set_member_rank = characteristics[0]
async def read_set_identity_resolving_key(self) -> Tuple[SirkType, bytes]:
'''Reads SIRK and decrypts if encrypted.'''
response = await self.set_identity_resolving_key.read_value()
if len(response) != SET_IDENTITY_RESOLVING_KEY_LENGTH + 1:
raise RuntimeError('Invalid SIRK value')
sirk_type = SirkType(response[0])
if sirk_type == SirkType.PLAINTEXT:
sirk = response[1:]
else:
connection = self.service_proxy.client.connection
device = connection.device
if connection.transport == core.BT_LE_TRANSPORT:
key = await device.get_long_term_key(
connection_handle=connection.handle, rand=b'', ediv=0
)
else:
key = await device.get_link_key(connection.peer_address)
if not key:
raise RuntimeError('LTK or LinkKey is not present')
sirk = sef(key, response[1:])
return (sirk_type, sirk)
+10 -5
View File
@@ -454,6 +454,8 @@ class DLC(EventEmitter):
self.c_r = 1 if self.role == Multiplexer.Role.INITIATOR else 0
self.sink = None
self.connection_result = None
self.drained = asyncio.Event()
self.drained.set()
# Compute the MTU
max_overhead = 4 + 1 # header with 2-byte length + fcs
@@ -536,8 +538,9 @@ class DLC(EventEmitter):
f'[{self.dlci}] {len(data)} bytes, '
f'rx_credits={self.rx_credits}: {data.hex()}'
)
if len(data) and self.sink:
self.sink(data) # pylint: disable=not-callable
if data:
if self.sink:
self.sink(data) # pylint: disable=not-callable
# Update the credits
if self.rx_credits > 0:
@@ -633,6 +636,8 @@ class DLC(EventEmitter):
)
rx_credits_needed = 0
if not self.tx_buffer:
self.drained.set()
# Stream protocol
def write(self, data: Union[bytes, str]) -> None:
@@ -645,11 +650,11 @@ class DLC(EventEmitter):
raise ValueError('write only accept bytes or strings')
self.tx_buffer += data
self.drained.clear()
self.process_tx()
def drain(self) -> None:
# TODO
pass
async def drain(self) -> None:
await self.drained.wait()
def __str__(self) -> str:
return f'DLC(dlci={self.dlci},state={self.state.name})'
+3 -3
View File
@@ -82,14 +82,13 @@ async def open_transport(name: str) -> Transport:
scheme, *tail = name.split(':', 1)
spec = tail[0] if tail else None
metadata = None
if spec:
# Metadata may precede the spec
if spec.startswith('['):
metadata_str, *tail = spec[1:].split(']')
spec = tail[0] if tail else None
metadata = dict([entry.split('=') for entry in metadata_str.split(',')])
else:
metadata = None
transport = await _open_transport(scheme, spec)
if metadata:
@@ -198,12 +197,13 @@ async def open_transport_or_link(name: str) -> Transport:
"""
if name.startswith('link-relay:'):
logger.warning('Link Relay has been deprecated.')
from ..controller import Controller
from ..link import RemoteLink # lazy import
link = RemoteLink(name[11:])
await link.wait_until_connected()
controller = Controller('remote', link=link)
controller = Controller('remote', link=link) # type:ignore[arg-type]
class LinkTransport(Transport):
async def close(self):
+30 -9
View File
@@ -7,16 +7,36 @@ throughput and/or latency between two devices.
# General Usage
```
Usage: bench.py [OPTIONS] COMMAND [ARGS]...
Usage: bumble-bench [OPTIONS] COMMAND [ARGS]...
Options:
--device-config FILENAME Device configuration file
--role [sender|receiver|ping|pong]
--mode [gatt-client|gatt-server|l2cap-client|l2cap-server|rfcomm-client|rfcomm-server]
--att-mtu MTU GATT MTU (gatt-client mode) [23<=x<=517]
-s, --packet-size SIZE Packet size (server role) [8<=x<=4096]
-c, --packet-count COUNT Packet count (server role)
-sd, --start-delay SECONDS Start delay (server role)
--extended-data-length TEXT Request a data length upon connection,
specified as tx_octets/tx_time
--rfcomm-channel INTEGER RFComm channel to use
--rfcomm-uuid TEXT RFComm service UUID to use (ignored if
--rfcomm-channel is not 0)
--l2cap-psm INTEGER L2CAP PSM to use
--l2cap-mtu INTEGER L2CAP MTU to use
--l2cap-mps INTEGER L2CAP MPS to use
--l2cap-max-credits INTEGER L2CAP maximum number of credits allowed for
the peer
-s, --packet-size SIZE Packet size (client or ping role)
[8<=x<=4096]
-c, --packet-count COUNT Packet count (client or ping role)
-sd, --start-delay SECONDS Start delay (client or ping role)
--repeat N Repeat the run N times (client and ping
roles)(0, which is the fault, to run just
once)
--repeat-delay SECONDS Delay, in seconds, between repeats
--pace MILLISECONDS Wait N milliseconds between packets (0,
which is the fault, to send as fast as
possible)
--linger Don't exit at the end of a run (server and
pong roles)
--help Show this message and exit.
Commands:
@@ -35,17 +55,18 @@ Options:
--connection-interval, --ci CONNECTION_INTERVAL
Connection interval (in ms)
--phy [1m|2m|coded] PHY to use
--authenticate Authenticate (RFComm only)
--encrypt Encrypt the connection (RFComm only)
--help Show this message and exit.
```
To test once device against another, one of the two devices must be running
To test once device against another, one of the two devices must be running
the ``peripheral`` command and the other the ``central`` command. The device
running the ``peripheral`` command will accept connections from the device
running the ``central`` command.
When using Bluetooth LE (all modes except for ``rfcomm-server`` and ``rfcomm-client``utils),
the default addresses configured in the tool should be sufficient. But when using
Bluetooth Classic, the address of the Peripheral must be specified on the Central
the default addresses configured in the tool should be sufficient. But when using
Bluetooth Classic, the address of the Peripheral must be specified on the Central
using the ``--peripheral`` option. The address will be printed by the Peripheral when
it starts.
@@ -83,7 +104,7 @@ the other on `usb:1`, and two consoles/terminals. We will run a command in each.
$ bumble-bench central usb:1
```
In this default configuration, the Central runs a Sender, as a GATT client,
In this default configuration, the Central runs a Sender, as a GATT client,
connecting to the Peripheral running a Receiver, as a GATT server.
!!! example "L2CAP Throughput"
@@ -74,11 +74,13 @@ class L2capClient(
gatt: BluetoothGatt?, status: Int, newState: Int
) {
if (gatt != null && newState == BluetoothProfile.STATE_CONNECTED) {
gatt.setPreferredPhy(
BluetoothDevice.PHY_LE_2M_MASK,
BluetoothDevice.PHY_LE_2M_MASK,
BluetoothDevice.PHY_OPTION_NO_PREFERRED
)
if (viewModel.use2mPhy) {
gatt.setPreferredPhy(
BluetoothDevice.PHY_LE_2M_MASK,
BluetoothDevice.PHY_LE_2M_MASK,
BluetoothDevice.PHY_OPTION_NO_PREFERRED
)
}
gatt.readPhy()
// Request an MTU update, even though we don't use GATT, because Android
@@ -27,11 +27,12 @@ val DEFAULT_RFCOMM_UUID: UUID = UUID.fromString("E6D55659-C8B4-4B85-96BB-B1143AF
const val DEFAULT_PEER_BLUETOOTH_ADDRESS = "AA:BB:CC:DD:EE:FF"
const val DEFAULT_SENDER_PACKET_COUNT = 100
const val DEFAULT_SENDER_PACKET_SIZE = 1024
const val DEFAULT_PSM = 128
class AppViewModel : ViewModel() {
private var preferences: SharedPreferences? = null
var peerBluetoothAddress by mutableStateOf(DEFAULT_PEER_BLUETOOTH_ADDRESS)
var l2capPsm by mutableIntStateOf(0)
var l2capPsm by mutableIntStateOf(DEFAULT_PSM)
var use2mPhy by mutableStateOf(true)
var mtu by mutableIntStateOf(0)
var rxPhy by mutableIntStateOf(0)
@@ -10,7 +10,7 @@ android {
defaultConfig {
applicationId = "com.github.google.bumble.remotehci"
minSdk = 26
minSdk = 29
targetSdk = 33
versionCode = 1
versionName = "1.0"
@@ -4,6 +4,7 @@ import android.hardware.bluetooth.V1_0.Status;
import android.os.IBinder;
import android.os.RemoteException;
import android.os.ServiceManager;
import android.os.Trace;
import android.util.Log;
import java.util.ArrayList;
@@ -53,6 +54,7 @@ class HciHidlHal extends android.hardware.bluetooth.V1_0.IBluetoothHciCallbacks.
private final android.hardware.bluetooth.V1_0.IBluetoothHci mHciService;
private final HciHalCallback mHciCallbacks;
private int mInitializationStatus = -1;
private final boolean mTracingEnabled = Trace.isEnabled();
public static HciHidlHal create(HciHalCallback hciCallbacks) {
@@ -89,6 +91,7 @@ class HciHidlHal extends android.hardware.bluetooth.V1_0.IBluetoothHciCallbacks.
}
// Map the status code.
Log.d(TAG, "Initialization status = " + mInitializationStatus);
switch (mInitializationStatus) {
case android.hardware.bluetooth.V1_0.Status.SUCCESS:
return Status.SUCCESS;
@@ -108,6 +111,10 @@ class HciHidlHal extends android.hardware.bluetooth.V1_0.IBluetoothHciCallbacks.
public void sendPacket(HciPacket.Type type, byte[] packet) {
ArrayList<Byte> data = HciPacket.byteArrayToList(packet);
if (mTracingEnabled) {
Trace.beginAsyncSection("SEND_PACKET_TO_HAL", 1);
}
try {
switch (type) {
case COMMAND:
@@ -125,6 +132,10 @@ class HciHidlHal extends android.hardware.bluetooth.V1_0.IBluetoothHciCallbacks.
} catch (RemoteException error) {
Log.w(TAG, "failed to forward packet: " + error);
}
if (mTracingEnabled) {
Trace.endAsyncSection("SEND_PACKET_TO_HAL", 1);
}
}
@Override
@@ -157,6 +168,7 @@ class HciAidlHal extends android.hardware.bluetooth.IBluetoothHciCallbacks.Stub
private final android.hardware.bluetooth.IBluetoothHci mHciService;
private final HciHalCallback mHciCallbacks;
private int mInitializationStatus = android.hardware.bluetooth.Status.SUCCESS;
private final boolean mTracingEnabled = Trace.isEnabled();
public static HciAidlHal create(HciHalCallback hciCallbacks) {
IBinder binder = ServiceManager.getService("android.hardware.bluetooth.IBluetoothHci/default");
@@ -187,6 +199,7 @@ class HciAidlHal extends android.hardware.bluetooth.IBluetoothHciCallbacks.Stub
}
// Map the status code.
Log.d(TAG, "Initialization status = " + mInitializationStatus);
switch (mInitializationStatus) {
case android.hardware.bluetooth.Status.SUCCESS:
return Status.SUCCESS;
@@ -208,6 +221,10 @@ class HciAidlHal extends android.hardware.bluetooth.IBluetoothHciCallbacks.Stub
// HciHal methods.
@Override
public void sendPacket(HciPacket.Type type, byte[] packet) {
if (mTracingEnabled) {
Trace.beginAsyncSection("SEND_PACKET_TO_HAL", 1);
}
try {
switch (type) {
case COMMAND:
@@ -229,6 +246,10 @@ class HciAidlHal extends android.hardware.bluetooth.IBluetoothHciCallbacks.Stub
} catch (RemoteException error) {
Log.w(TAG, "failed to forward packet: " + error);
}
if (mTracingEnabled) {
Trace.endAsyncSection("SEND_PACKET_TO_HAL", 1);
}
}
// IBluetoothHciCallbacks methods.
@@ -1,5 +1,6 @@
package com.github.google.bumble.remotehci;
import android.os.Trace;
import android.util.Log;
import java.io.IOException;
@@ -15,6 +16,7 @@ public class HciServer {
private final int mPort;
private final Listener mListener;
private OutputStream mOutputStream;
private final boolean mTracingEnabled = Trace.isEnabled();
public interface Listener extends HciParser.Sink {
void onHostConnectionState(boolean connected);
@@ -27,6 +29,8 @@ public class HciServer {
}
public void run() throws IOException {
Log.i(TAG, "Tracing enabled: " + mTracingEnabled);
for (;;) {
try {
loop();
@@ -73,6 +77,10 @@ public class HciServer {
}
public void sendPacket(HciPacket.Type type, byte[] packet) {
if (mTracingEnabled) {
Trace.beginAsyncSection("SEND_PACKET_FROM_HAL", 2);
}
// Create a combined data buffer so we can write it out in a single call.
byte[] data = new byte[packet.length + 1];
data[0] = type.value;
@@ -89,5 +97,9 @@ public class HciServer {
Log.d(TAG, "no client, dropping packet");
}
}
if (mTracingEnabled) {
Trace.endAsyncSection("SEND_PACKET_FROM_HAL", 2);
}
}
}
+15 -6
View File
@@ -20,6 +20,7 @@ import os
import pytest
import struct
import logging
from unittest import mock
from bumble import device
from bumble.profiles import csip
@@ -68,14 +69,18 @@ def test_sef():
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_csis():
@pytest.mark.parametrize(
'sirk_type,', [(csip.SirkType.ENCRYPTED), (csip.SirkType.PLAINTEXT)]
)
async def test_csis(sirk_type):
SIRK = bytes.fromhex('2f62c8ae41867d1bb619e788a2605faa')
LTK = bytes.fromhex('2f62c8ae41867d1bb619e788a2605faa')
devices = TwoDevices()
devices[0].add_service(
csip.CoordinatedSetIdentificationService(
set_identity_resolving_key=SIRK,
set_identity_resolving_key_type=csip.SirkType.PLAINTEXT,
set_identity_resolving_key_type=sirk_type,
coordinated_set_size=2,
set_member_lock=csip.MemberLock.UNLOCKED,
set_member_rank=0,
@@ -83,15 +88,19 @@ async def test_csis():
)
await devices.setup_connection()
# Mock encryption.
devices.connections[0].encryption = 1
devices.connections[1].encryption = 1
devices[0].get_long_term_key = mock.AsyncMock(return_value=LTK)
devices[1].get_long_term_key = mock.AsyncMock(return_value=LTK)
peer = device.Peer(devices.connections[1])
csis_client = await peer.discover_service_and_create_proxy(
csip.CoordinatedSetIdentificationProxy
)
assert (
await csis_client.set_identity_resolving_key.read_value()
== bytes([csip.SirkType.PLAINTEXT]) + SIRK
)
assert await csis_client.read_set_identity_resolving_key() == (sirk_type, SIRK)
assert await csis_client.coordinated_set_size.read_value() == struct.pack('B', 2)
assert await csis_client.set_member_lock.read_value() == struct.pack(
'B', csip.MemberLock.UNLOCKED
+11
View File
@@ -50,6 +50,8 @@ from bumble.gatt import (
GATT_APPEARANCE_CHARACTERISTIC,
)
from .test_utils import TwoDevices
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
@@ -412,6 +414,15 @@ async def test_extended_advertising_disconnection(auto_restart):
device.start_extended_advertising.assert_not_called()
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_remote_le_features():
devices = TwoDevices()
await devices.setup_connection()
assert (await devices.connections[0].get_remote_le_features()) is not None
# -----------------------------------------------------------------------------
def test_gatt_services_with_gas():
device = Device(host=Host(None, None))
+60 -1
View File
@@ -23,9 +23,11 @@ import pytest
from typing import Tuple
from .test_utils import TwoDevices
from bumble import core
from bumble import device
from bumble import hfp
from bumble import rfcomm
from bumble import hci
# -----------------------------------------------------------------------------
# Logging
@@ -87,6 +89,63 @@ async def test_slc():
ag_task.cancel()
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_sco_setup():
devices = TwoDevices()
# Enable Classic connections
devices[0].classic_enabled = True
devices[1].classic_enabled = True
# Start
await devices[0].power_on()
await devices[1].power_on()
connections = await asyncio.gather(
devices[0].connect(
devices[1].public_address, transport=core.BT_BR_EDR_TRANSPORT
),
devices[1].accept(devices[0].public_address),
)
def on_sco_request(_connection: device.Connection, _link_type: int):
connections[1].abort_on(
'disconnection',
devices[1].send_command(
hci.HCI_Enhanced_Accept_Synchronous_Connection_Request_Command(
bd_addr=connections[1].peer_address,
**hfp.ESCO_PARAMETERS[
hfp.DefaultCodecParameters.ESCO_CVSD_S1
].asdict(),
)
),
)
devices[1].on('sco_request', on_sco_request)
sco_connections = [
asyncio.get_running_loop().create_future(),
asyncio.get_running_loop().create_future(),
]
devices[0].on(
'sco_connection', lambda sco_link: sco_connections[0].set_result(sco_link)
)
devices[1].on(
'sco_connection', lambda sco_link: sco_connections[1].set_result(sco_link)
)
await devices[0].send_command(
hci.HCI_Enhanced_Setup_Synchronous_Connection_Command(
connection_handle=connections[0].handle,
**hfp.ESCO_PARAMETERS[hfp.DefaultCodecParameters.ESCO_CVSD_S1].asdict(),
)
)
await asyncio.gather(*sco_connections)
# -----------------------------------------------------------------------------
async def run():
await test_slc()
+5 -4
View File
@@ -29,17 +29,18 @@ class TwoDevices:
self.connections = [None, None]
self.link = LocalLink()
addresses = ['F0:F1:F2:F3:F4:F5', 'F5:F4:F3:F2:F1:F0']
self.controllers = [
Controller('C1', link=self.link),
Controller('C2', link=self.link),
Controller('C1', link=self.link, public_address=addresses[0]),
Controller('C2', link=self.link, public_address=addresses[1]),
]
self.devices = [
Device(
address=Address('F0:F1:F2:F3:F4:F5'),
address=Address(addresses[0]),
host=Host(self.controllers[0], AsyncPipeSink(self.controllers[0])),
),
Device(
address=Address('F5:F4:F3:F2:F1:F0'),
address=Address(addresses[1]),
host=Host(self.controllers[1], AsyncPipeSink(self.controllers[1])),
),
]