Compare commits

...

34 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
48685c8587 improve vendor event support 2024-11-23 08:55:50 -08:00
Gilles Boccon-Gibod
8d908288c8 Merge pull request #583 from google/gbg/more-gatt-tests
regression test for GATT unsubscription
2024-11-15 10:19:20 -08:00
Gilles Boccon-Gibod
a06394ad4a Merge pull request #582 from google/gbg/580
fix #580
2024-11-04 13:03:15 -08:00
Gilles Boccon-Gibod
a1414c2b5b add unsubscribe test 2024-11-03 19:08:27 -08:00
Gilles Boccon-Gibod
b2864dac2d fix #580 2024-11-02 10:29:40 -07:00
Gilles Boccon-Gibod
b78f895143 Merge pull request #579 from jmdietrich-gcx/unsubscribe_characteristic_in_gatt_client
Remove characteristic in GATT Client unsubscribe() if it's the last subscriber
2024-10-31 04:07:02 -07:00
zxzxwu
c4e9726828 Merge pull request #581 from zxzxwu/context
[BAP] Add missing Unspecified context type
2024-10-31 11:04:25 +00:00
Gilles Boccon-Gibod
d4b8e8348a Merge pull request #574 from google/gbg/update-python-versions
remove test for deprecated Python 3.8 and add 3.13
2024-10-31 03:44:01 -07:00
Josh Wu
19debaa52e [BAP] Add missing Unspecified context type 2024-10-31 18:11:40 +08:00
Jan-Marcel Dietrich
73fe564321 Remove characteristic in GATT Client unsubscribe() if it's the last subscriber
GATT Client's subscribe() adds the characteristic itself as subscriber.
Therefore the characteristic has to be removed in unsubscribe(), if it's
the last subscriber. Otherwise the clean up does not work correctly and
the CCCD never is set back to 0 in the remote device.
2024-10-30 07:34:22 +01:00
Gilles Boccon-Gibod
a00abd65b3 fix some linter warnings 2024-10-28 12:30:37 -07:00
Gilles Boccon-Gibod
f169ceaebb update linter and type checker 2024-10-28 12:30:32 -07:00
Gilles Boccon-Gibod
528af0d338 remove test for deprecated Python 3.8 and add 3.13 2024-10-28 12:29:21 -07:00
Gilles Boccon-Gibod
4b25eed869 Merge pull request #570 from google/gbg/bench-mobly-snippets
bench mobly snippets
2024-10-28 10:25:28 -07:00
Gilles Boccon-Gibod
fcd6bd7136 address PR comments 2024-10-28 10:13:55 -07:00
Gilles Boccon-Gibod
32642c5d7c Merge pull request #576 from google/gbg/netsim-device-info
update to new netsim proto with DeviceInfo
2024-10-25 04:43:00 -07:00
Gilles Boccon-Gibod
ff8b0c375d add support for netsim device info variant 2024-10-25 04:37:30 -07:00
Gilles Boccon-Gibod
ae0228aeb8 Merge pull request #578 from jmdietrich-gcx/add_missing_parameter_to_att_execute_write
Add missing parameter 'flags' to ATT_Execute_Write_Request PDU
2024-10-25 02:57:24 -07:00
Jan-Marcel Dietrich
5d2dac18c8 Add missing parameter 'flags' to ATT_Execute_Write_Request PDU
Bluetooth spec @ Vol 3, Part F - 3.4.6.3 Table 3.36 shows that the
ATT_EXECUTE_WRITE_REQ PDU contains the parameter 'Flags' with size 1
octet, which allows to cancel all prepared writes (0x00) or to
immediately write all pending prepared values (0x01).
2024-10-24 15:08:10 +02:00
zxzxwu
d03fc14cfd Merge pull request #573 from ypomortsev/yegor
HFP: Fix reading multiple AT commands from a single data packet
2024-10-23 13:23:58 +08:00
Gilles Boccon-Gibod
ad7ce79bc4 use all caps for device kind 2024-10-22 16:30:46 -07:00
Yegor Pomortsev
c6bf27fd2c Fix test_hf_batched_response 2024-10-22 12:41:17 -07:00
Gilles Boccon-Gibod
7584daa3f9 update to new netsim proto with DeviceInfo 2024-10-22 11:48:42 -07:00
Yegor Pomortsev
654030e789 Add tests for batched HFP commands/responses; reformat 2024-10-21 16:32:20 -07:00
Gilles Boccon-Gibod
1de7d2cd6f Merge pull request #571 from google/gbg/a2dp-player
a2dp player
2024-10-19 07:40:43 -07:00
Yegor Pomortsev
e1714c16cc HFP: Fix reading multiple AT commands from a single data packet
The `data` received in `_read_at` may have multiple commands.

This fixes `execute_command` timing out when waiting for an `OK`
response when it is in the same data buffer, e.g. during SLC
initialization: b'\r\n+BRSF: 3904\r\n\r\nOK\r\n'
2024-10-18 13:21:24 -07:00
William Escande
23f46b36b3 HAP: wait for pairing event (#551) 2024-10-10 11:34:44 -07:00
Gilles Boccon-Gibod
2bed50b353 add mobly to dev deps 2024-10-09 21:22:35 -07:00
Gilles Boccon-Gibod
1fe3778a74 adjust mypy excludes 2024-10-08 22:02:43 -07:00
Gilles Boccon-Gibod
5e31bcf23d add mobly example 2024-10-04 18:17:56 -07:00
Gilles Boccon-Gibod
fe429cb2eb wip 2024-10-04 18:13:31 -07:00
Gilles Boccon-Gibod
c91695c23a wip 2024-10-04 18:13:31 -07:00
Gilles Boccon-Gibod
55f99e6887 wip 2024-10-04 18:13:31 -07:00
Gilles Boccon-Gibod
b190069f48 add snippets lib 2024-10-04 18:13:31 -07:00
89 changed files with 3067 additions and 554 deletions

View File

@@ -16,7 +16,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
fail-fast: false
steps:

View File

@@ -16,7 +16,7 @@ jobs:
strategy:
matrix:
os: ['ubuntu-latest', 'macos-latest', 'windows-latest']
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
fail-fast: false
steps:
@@ -46,7 +46,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [ "3.8", "3.9", "3.10", "3.11", "3.12" ]
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
rust-version: [ "1.76.0", "stable" ]
fail-fast: false
steps:

View File

@@ -19,6 +19,7 @@ import asyncio
import enum
import logging
import os
import statistics
import struct
import time
@@ -194,17 +195,19 @@ def make_sdp_records(channel):
}
def log_stats(title, stats):
def log_stats(title, stats, precision=2):
stats_min = min(stats)
stats_max = max(stats)
stats_avg = sum(stats) / len(stats)
stats_avg = statistics.mean(stats)
stats_stdev = statistics.stdev(stats) if len(stats) >= 2 else 0
logging.info(
color(
(
f'### {title} stats: '
f'min={stats_min:.2f}, '
f'max={stats_max:.2f}, '
f'average={stats_avg:.2f}'
f'min={stats_min:.{precision}f}, '
f'max={stats_max:.{precision}f}, '
f'average={stats_avg:.{precision}f}, '
f'stdev={stats_stdev:.{precision}f}'
),
'cyan',
)
@@ -448,9 +451,9 @@ class Ping:
self.repeat_delay = repeat_delay
self.pace = pace
self.done = asyncio.Event()
self.current_packet_index = 0
self.ping_sent_time = 0.0
self.latencies = []
self.ping_times = []
self.rtts = []
self.next_expected_packet_index = 0
self.min_stats = []
self.max_stats = []
self.avg_stats = []
@@ -465,6 +468,7 @@ class Ping:
for run in range(self.repeat + 1):
self.done.clear()
self.ping_times = []
if run > 0 and self.repeat and self.repeat_delay:
logging.info(color(f'*** Repeat delay: {self.repeat_delay}', 'green'))
@@ -477,60 +481,57 @@ class Ping:
logging.info(color('=== Sending RESET', 'magenta'))
await self.packet_io.send_packet(bytes([PacketType.RESET]))
self.current_packet_index = 0
self.latencies = []
await self.send_next_ping()
packet_interval = self.pace / 1000
start_time = time.time()
self.next_expected_packet_index = 0
for i in range(self.tx_packet_count):
target_time = start_time + (i * packet_interval)
now = time.time()
if now < target_time:
await asyncio.sleep(target_time - now)
packet = struct.pack(
'>bbI',
PacketType.SEQUENCE,
(PACKET_FLAG_LAST if i == self.tx_packet_count - 1 else 0),
i,
) + bytes(self.tx_packet_size - 6)
logging.info(color(f'Sending packet {i}', 'yellow'))
self.ping_times.append(time.time())
await self.packet_io.send_packet(packet)
await self.done.wait()
min_latency = min(self.latencies)
max_latency = max(self.latencies)
avg_latency = sum(self.latencies) / len(self.latencies)
min_rtt = min(self.rtts)
max_rtt = max(self.rtts)
avg_rtt = statistics.mean(self.rtts)
stdev_rtt = statistics.stdev(self.rtts)
logging.info(
color(
'@@@ Latencies: '
f'min={min_latency:.2f}, '
f'max={max_latency:.2f}, '
f'average={avg_latency:.2f}'
'@@@ RTTs: '
f'min={min_rtt:.2f}, '
f'max={max_rtt:.2f}, '
f'average={avg_rtt:.2f}, '
f'stdev={stdev_rtt:.2f}'
)
)
self.min_stats.append(min_latency)
self.max_stats.append(max_latency)
self.avg_stats.append(avg_latency)
self.min_stats.append(min_rtt)
self.max_stats.append(max_rtt)
self.avg_stats.append(avg_rtt)
run_counter = f'[{run + 1} of {self.repeat + 1}]' if self.repeat else ''
logging.info(color(f'=== {run_counter} Done!', 'magenta'))
if self.repeat:
log_stats('Min Latency', self.min_stats)
log_stats('Max Latency', self.max_stats)
log_stats('Average Latency', self.avg_stats)
log_stats('Min RTT', self.min_stats)
log_stats('Max RTT', self.max_stats)
log_stats('Average RTT', self.avg_stats)
if self.repeat:
logging.info(color('--- End of runs', 'blue'))
async def send_next_ping(self):
if self.pace:
await asyncio.sleep(self.pace / 1000)
packet = struct.pack(
'>bbI',
PacketType.SEQUENCE,
(
PACKET_FLAG_LAST
if self.current_packet_index == self.tx_packet_count - 1
else 0
),
self.current_packet_index,
) + bytes(self.tx_packet_size - 6)
logging.info(color(f'Sending packet {self.current_packet_index}', 'yellow'))
self.ping_sent_time = time.time()
await self.packet_io.send_packet(packet)
def on_packet_received(self, packet):
elapsed = time.time() - self.ping_sent_time
try:
packet_type, packet_data = parse_packet(packet)
except ValueError:
@@ -542,21 +543,23 @@ class Ping:
return
if packet_type == PacketType.ACK:
latency = elapsed * 1000
self.latencies.append(latency)
elapsed = time.time() - self.ping_times[packet_index]
rtt = elapsed * 1000
self.rtts.append(rtt)
logging.info(
color(
f'<<< Received ACK [{packet_index}], latency={latency:.2f}ms',
f'<<< Received ACK [{packet_index}], RTT={rtt:.2f}ms',
'green',
)
)
if packet_index == self.current_packet_index:
self.current_packet_index += 1
if packet_index == self.next_expected_packet_index:
self.next_expected_packet_index += 1
else:
logging.info(
color(
f'!!! Unexpected packet, expected {self.current_packet_index} '
f'!!! Unexpected packet, '
f'expected {self.next_expected_packet_index} '
f'but received {packet_index}'
)
)
@@ -565,8 +568,6 @@ class Ping:
self.done.set()
return
AsyncRunner.spawn(self.send_next_ping())
# -----------------------------------------------------------------------------
# Pong
@@ -583,8 +584,11 @@ class Pong:
def reset(self):
self.expected_packet_index = 0
self.receive_times = []
def on_packet_received(self, packet):
self.receive_times.append(time.time())
try:
packet_type, packet_data = parse_packet(packet)
except ValueError:
@@ -599,10 +603,16 @@ class Pong:
packet_flags, packet_index = parse_packet_sequence(packet_data)
except ValueError:
return
interval = (
self.receive_times[-1] - self.receive_times[-2]
if len(self.receive_times) >= 2
else 0
)
logging.info(
color(
f'<<< Received packet {packet_index}: '
f'flags=0x{packet_flags:02X}, {len(packet)} bytes',
f'flags=0x{packet_flags:02X}, {len(packet)} bytes, '
f'interval={interval:.4f}',
'green',
)
)
@@ -623,8 +633,35 @@ class Pong:
)
)
if packet_flags & PACKET_FLAG_LAST and not self.linger:
self.done.set()
if packet_flags & PACKET_FLAG_LAST:
if len(self.receive_times) >= 3:
# Show basic stats
intervals = [
self.receive_times[i + 1] - self.receive_times[i]
for i in range(len(self.receive_times) - 1)
]
log_stats('Packet intervals', intervals, 3)
# Show a histogram
bin_count = 20
bins = [0] * bin_count
interval_min = min(intervals)
interval_max = max(intervals)
interval_range = interval_max - interval_min
bin_thresholds = [
interval_min + i * (interval_range / bin_count)
for i in range(bin_count)
]
for interval in intervals:
for i in reversed(range(bin_count)):
if interval >= bin_thresholds[i]:
bins[i] += 1
break
for i in range(bin_count):
logging.info(f'@@@ >= {bin_thresholds[i]:.4f}: {bins[i]}')
if not self.linger:
self.done.set()
async def run(self):
await self.done.wait()
@@ -942,9 +979,12 @@ class RfcommClient(StreamedPacketIO):
channel = await bumble.rfcomm.find_rfcomm_channel_with_uuid(
connection, self.uuid
)
logging.info(color(f'@@@ Channel number = {channel}', 'cyan'))
if channel == 0:
logging.info(color('!!! No RFComm service with this UUID found', 'red'))
if channel:
logging.info(color(f'@@@ Channel number = {channel}', 'cyan'))
else:
logging.warning(
color('!!! No RFComm service with this UUID found', 'red')
)
await connection.disconnect()
return
@@ -1054,6 +1094,8 @@ class RfcommServer(StreamedPacketIO):
if self.credits_threshold is not None:
dlc.rx_credits_threshold = self.credits_threshold
self.ready.set()
async def drain(self):
assert self.dlc
await self.dlc.drain()
@@ -1068,7 +1110,7 @@ class Central(Connection.Listener):
transport,
peripheral_address,
classic,
role_factory,
scenario_factory,
mode_factory,
connection_interval,
phy,
@@ -1081,7 +1123,7 @@ class Central(Connection.Listener):
self.transport = transport
self.peripheral_address = peripheral_address
self.classic = classic
self.role_factory = role_factory
self.scenario_factory = scenario_factory
self.mode_factory = mode_factory
self.authenticate = authenticate
self.encrypt = encrypt or authenticate
@@ -1134,7 +1176,7 @@ class Central(Connection.Listener):
DEFAULT_CENTRAL_NAME, central_address, hci_source, hci_sink
)
mode = self.mode_factory(self.device)
role = self.role_factory(mode)
scenario = self.scenario_factory(mode)
self.device.classic_enabled = self.classic
# Set up a pairing config factory with minimal requirements.
@@ -1215,7 +1257,7 @@ class Central(Connection.Listener):
await mode.on_connection(self.connection)
await role.run()
await scenario.run()
await asyncio.sleep(DEFAULT_LINGER_TIME)
await self.connection.disconnect()
@@ -1246,7 +1288,7 @@ class Peripheral(Device.Listener, Connection.Listener):
def __init__(
self,
transport,
role_factory,
scenario_factory,
mode_factory,
classic,
extended_data_length,
@@ -1254,11 +1296,11 @@ class Peripheral(Device.Listener, Connection.Listener):
):
self.transport = transport
self.classic = classic
self.role_factory = role_factory
self.scenario_factory = scenario_factory
self.mode_factory = mode_factory
self.extended_data_length = extended_data_length
self.role_switch = role_switch
self.role = None
self.scenario = None
self.mode = None
self.device = None
self.connection = None
@@ -1278,7 +1320,7 @@ class Peripheral(Device.Listener, Connection.Listener):
)
self.device.listener = self
self.mode = self.mode_factory(self.device)
self.role = self.role_factory(self.mode)
self.scenario = self.scenario_factory(self.mode)
self.device.classic_enabled = self.classic
# Set up a pairing config factory with minimal requirements.
@@ -1315,7 +1357,7 @@ class Peripheral(Device.Listener, Connection.Listener):
print_connection(self.connection)
await self.mode.on_connection(self.connection)
await self.role.run()
await self.scenario.run()
await asyncio.sleep(DEFAULT_LINGER_TIME)
def on_connection(self, connection):
@@ -1344,7 +1386,7 @@ class Peripheral(Device.Listener, Connection.Listener):
def on_disconnection(self, reason):
logging.info(color(f'!!! Disconnection: reason={reason}', 'red'))
self.connection = None
self.role.reset()
self.scenario.reset()
if self.classic:
AsyncRunner.spawn(self.device.set_discoverable(True))
@@ -1426,13 +1468,13 @@ def create_mode_factory(ctx, default_mode):
# -----------------------------------------------------------------------------
def create_role_factory(ctx, default_role):
role = ctx.obj['role']
if role is None:
role = default_role
def create_scenario_factory(ctx, default_scenario):
scenario = ctx.obj['scenario']
if scenario is None:
scenarion = default_scenario
def create_role(packet_io):
if role == 'sender':
def create_scenario(packet_io):
if scenario == 'send':
return Sender(
packet_io,
start_delay=ctx.obj['start_delay'],
@@ -1443,10 +1485,10 @@ def create_role_factory(ctx, default_role):
packet_count=ctx.obj['packet_count'],
)
if role == 'receiver':
if scenario == 'receive':
return Receiver(packet_io, ctx.obj['linger'])
if role == 'ping':
if scenario == 'ping':
return Ping(
packet_io,
start_delay=ctx.obj['start_delay'],
@@ -1457,12 +1499,12 @@ def create_role_factory(ctx, default_role):
packet_count=ctx.obj['packet_count'],
)
if role == 'pong':
if scenario == 'pong':
return Pong(packet_io, ctx.obj['linger'])
raise ValueError('invalid role')
raise ValueError('invalid scenario')
return create_role
return create_scenario
# -----------------------------------------------------------------------------
@@ -1470,7 +1512,7 @@ def create_role_factory(ctx, default_role):
# -----------------------------------------------------------------------------
@click.group()
@click.option('--device-config', metavar='FILENAME', help='Device configuration file')
@click.option('--role', type=click.Choice(['sender', 'receiver', 'ping', 'pong']))
@click.option('--scenario', type=click.Choice(['send', 'receive', 'ping', 'pong']))
@click.option(
'--mode',
type=click.Choice(
@@ -1503,7 +1545,7 @@ def create_role_factory(ctx, default_role):
'--rfcomm-channel',
type=int,
default=DEFAULT_RFCOMM_CHANNEL,
help='RFComm channel to use',
help='RFComm channel to use (specify 0 for channel discovery via SDP)',
)
@click.option(
'--rfcomm-uuid',
@@ -1565,7 +1607,7 @@ def create_role_factory(ctx, default_role):
metavar='SIZE',
type=click.IntRange(8, 8192),
default=500,
help='Packet size (client or ping role)',
help='Packet size (send or ping scenario)',
)
@click.option(
'--packet-count',
@@ -1573,7 +1615,7 @@ def create_role_factory(ctx, default_role):
metavar='COUNT',
type=int,
default=10,
help='Packet count (client or ping role)',
help='Packet count (send or ping scenario)',
)
@click.option(
'--start-delay',
@@ -1581,7 +1623,7 @@ def create_role_factory(ctx, default_role):
metavar='SECONDS',
type=int,
default=1,
help='Start delay (client or ping role)',
help='Start delay (send or ping scenario)',
)
@click.option(
'--repeat',
@@ -1589,7 +1631,7 @@ def create_role_factory(ctx, default_role):
type=int,
default=0,
help=(
'Repeat the run N times (client and ping roles)'
'Repeat the run N times (send and ping scenario)'
'(0, which is the fault, to run just once) '
),
)
@@ -1613,13 +1655,13 @@ def create_role_factory(ctx, default_role):
@click.option(
'--linger',
is_flag=True,
help="Don't exit at the end of a run (server and pong roles)",
help="Don't exit at the end of a run (receive and pong scenarios)",
)
@click.pass_context
def bench(
ctx,
device_config,
role,
scenario,
mode,
att_mtu,
extended_data_length,
@@ -1645,7 +1687,7 @@ def bench(
):
ctx.ensure_object(dict)
ctx.obj['device_config'] = device_config
ctx.obj['role'] = role
ctx.obj['scenario'] = scenario
ctx.obj['mode'] = mode
ctx.obj['att_mtu'] = att_mtu
ctx.obj['rfcomm_channel'] = rfcomm_channel
@@ -1699,7 +1741,7 @@ def central(
ctx, transport, peripheral_address, connection_interval, phy, authenticate, encrypt
):
"""Run as a central (initiates the connection)"""
role_factory = create_role_factory(ctx, 'sender')
scenario_factory = create_scenario_factory(ctx, 'send')
mode_factory = create_mode_factory(ctx, 'gatt-client')
classic = ctx.obj['classic']
@@ -1708,7 +1750,7 @@ def central(
transport,
peripheral_address,
classic,
role_factory,
scenario_factory,
mode_factory,
connection_interval,
phy,
@@ -1726,13 +1768,13 @@ def central(
@click.pass_context
def peripheral(ctx, transport):
"""Run as a peripheral (waits for a connection)"""
role_factory = create_role_factory(ctx, 'receiver')
scenario_factory = create_scenario_factory(ctx, 'receive')
mode_factory = create_mode_factory(ctx, 'gatt-server')
async def run_peripheral():
await Peripheral(
transport,
role_factory,
scenario_factory,
mode_factory,
ctx.obj['classic'],
ctx.obj['extended_data_length'],
@@ -1743,7 +1785,11 @@ def peripheral(ctx, transport):
def main():
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
logging.basicConfig(
level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper(),
format="[%(asctime)s.%(msecs)03d] %(levelname)s:%(name)s:%(message)s",
datefmt="%H:%M:%S",
)
bench()

View File

@@ -237,6 +237,7 @@ class ClientBridge:
address: str,
tcp_host: str,
tcp_port: int,
authenticate: bool,
encrypt: bool,
):
self.channel = channel
@@ -245,6 +246,7 @@ class ClientBridge:
self.address = address
self.tcp_host = tcp_host
self.tcp_port = tcp_port
self.authenticate = authenticate
self.encrypt = encrypt
self.device: Optional[Device] = None
self.connection: Optional[Connection] = None
@@ -274,6 +276,11 @@ class ClientBridge:
print(color(f"@@@ Bluetooth connection: {self.connection}", "blue"))
self.connection.on("disconnection", self.on_disconnection)
if self.authenticate:
print(color("@@@ Authenticating Bluetooth connection", "blue"))
await self.connection.authenticate()
print(color("@@@ Bluetooth connection authenticated", "blue"))
if self.encrypt:
print(color("@@@ Encrypting Bluetooth connection", "blue"))
await self.connection.encrypt()
@@ -491,8 +498,9 @@ def server(context, tcp_host, tcp_port):
@click.argument("bluetooth-address")
@click.option("--tcp-host", help="TCP host", default="_")
@click.option("--tcp-port", help="TCP port", default=DEFAULT_CLIENT_TCP_PORT)
@click.option("--authenticate", is_flag=True, help="Authenticate the connection")
@click.option("--encrypt", is_flag=True, help="Encrypt the connection")
def client(context, bluetooth_address, tcp_host, tcp_port, encrypt):
def client(context, bluetooth_address, tcp_host, tcp_port, authenticate, encrypt):
bridge = ClientBridge(
context.obj["channel"],
context.obj["uuid"],
@@ -500,6 +508,7 @@ def client(context, bluetooth_address, tcp_host, tcp_port, encrypt):
bluetooth_address,
tcp_host,
tcp_port,
authenticate,
encrypt,
)
asyncio.run(run(context.obj["device_config"], context.obj["hci_transport"], bridge))

View File

@@ -144,18 +144,18 @@ class Printer:
help='Format of the input file',
)
@click.option(
'--vendors',
'--vendor',
type=click.Choice(['android', 'zephyr']),
multiple=True,
help='Support vendor-specific commands (list one or more)',
)
@click.argument('filename')
# pylint: disable=redefined-builtin
def main(format, vendors, filename):
for vendor in vendors:
if vendor == 'android':
def main(format, vendor, filename):
for vendor_name in vendor:
if vendor_name == 'android':
import bumble.vendor.android.hci
elif vendor == 'zephyr':
elif vendor_name == 'zephyr':
import bumble.vendor.zephyr.hci
input = open(filename, 'rb')
@@ -180,7 +180,7 @@ def main(format, vendors, filename):
else:
printer.print(color("[TRUNCATED]", "red"))
except Exception as error:
logger.exception()
logger.exception('')
print(color(f'!!! {error}', 'red'))

View File

@@ -710,7 +710,7 @@ class ATT_Prepare_Write_Response(ATT_PDU):
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
@ATT_PDU.subclass([("flags", 1)])
class ATT_Execute_Write_Request(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.6.3 Execute Write Request

View File

@@ -134,6 +134,8 @@ class Frame:
opcode_offset = 3
elif subunit_id == 6:
raise core.InvalidPacketError("reserved subunit ID")
else:
raise core.InvalidPacketError("invalid subunit ID")
opcode = Frame.OperationCode(data[opcode_offset])
operands = data[opcode_offset + 1 :]

View File

@@ -20,6 +20,8 @@ Common types for drivers.
# -----------------------------------------------------------------------------
import abc
from bumble import core
# -----------------------------------------------------------------------------
# Classes

View File

@@ -11,18 +11,33 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Support for Intel USB controllers.
Loosely based on the Fuchsia OS implementation.
"""
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import collections
import dataclasses
import logging
import os
import pathlib
import platform
import struct
from typing import Any, Deque, Optional, TYPE_CHECKING
from bumble import core
from bumble.drivers import common
from bumble.hci import (
hci_vendor_command_op_code, # type: ignore
HCI_Command,
HCI_Reset_Command,
)
from bumble import hci
from bumble import utils
if TYPE_CHECKING:
from bumble.host import Host
# -----------------------------------------------------------------------------
# Logging
@@ -34,39 +49,328 @@ logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
INTEL_USB_PRODUCTS = {
# Intel AX210
(0x8087, 0x0032),
# Intel BE200
(0x8087, 0x0036),
(0x8087, 0x0032), # AX210
(0x8087, 0x0036), # BE200
}
INTEL_FW_IMAGE_NAMES = [
"ibt-0040-0041",
"ibt-0040-1020",
"ibt-0040-1050",
"ibt-0040-2120",
"ibt-0040-4150",
"ibt-0041-0041",
"ibt-0180-0041",
"ibt-0180-1050",
"ibt-0180-4150",
"ibt-0291-0291",
"ibt-1040-0041",
"ibt-1040-1020",
"ibt-1040-1050",
"ibt-1040-2120",
"ibt-1040-4150",
]
INTEL_FIRMWARE_DIR_ENV = "BUMBLE_INTEL_FIRMWARE_DIR"
INTEL_LINUX_FIRMWARE_DIR = "/lib/firmware/intel"
_MAX_FRAGMENT_SIZE = 252
_POST_RESET_DELAY = 0.2
# -----------------------------------------------------------------------------
# HCI Commands
# -----------------------------------------------------------------------------
HCI_INTEL_DDC_CONFIG_WRITE_COMMAND = hci_vendor_command_op_code(0xFC8B) # type: ignore
HCI_INTEL_DDC_CONFIG_WRITE_PAYLOAD = [0x03, 0xE4, 0x02, 0x00]
HCI_INTEL_WRITE_DEVICE_CONFIG_COMMAND = hci.hci_vendor_command_op_code(0x008B)
HCI_INTEL_READ_VERSION_COMMAND = hci.hci_vendor_command_op_code(0x0005)
HCI_INTEL_RESET_COMMAND = hci.hci_vendor_command_op_code(0x0001)
HCI_INTEL_SECURE_SEND_COMMAND = hci.hci_vendor_command_op_code(0x0009)
HCI_INTEL_WRITE_BOOT_PARAMS_COMMAND = hci.hci_vendor_command_op_code(0x000E)
HCI_Command.register_commands(globals())
hci.HCI_Command.register_commands(globals())
@HCI_Command.command( # type: ignore
fields=[("params", "*")],
@hci.HCI_Command.command(
fields=[
("param0", 1),
],
return_parameters_fields=[
("params", "*"),
("status", hci.STATUS_SPEC),
("tlv", "*"),
],
)
class Hci_Intel_DDC_Config_Write_Command(HCI_Command):
class HCI_Intel_Read_Version_Command(hci.HCI_Command):
pass
@hci.HCI_Command.command(
fields=[("data_type", 1), ("data", "*")],
return_parameters_fields=[
("status", 1),
],
)
class Hci_Intel_Secure_Send_Command(hci.HCI_Command):
pass
@hci.HCI_Command.command(
fields=[
("reset_type", 1),
("patch_enable", 1),
("ddc_reload", 1),
("boot_option", 1),
("boot_address", 4),
],
return_parameters_fields=[
("data", "*"),
],
)
class HCI_Intel_Reset_Command(hci.HCI_Command):
pass
@hci.HCI_Command.command(
fields=[("data", "*")],
return_parameters_fields=[
("status", hci.STATUS_SPEC),
("params", "*"),
],
)
class Hci_Intel_Write_Device_Config_Command(hci.HCI_Command):
pass
# -----------------------------------------------------------------------------
# Functions
# -----------------------------------------------------------------------------
def intel_firmware_dir() -> pathlib.Path:
"""
Returns:
A path to a subdir of the project data dir for Intel firmware.
The directory is created if it doesn't exist.
"""
from bumble.drivers import project_data_dir
p = project_data_dir() / "firmware" / "intel"
p.mkdir(parents=True, exist_ok=True)
return p
def _find_binary_path(file_name: str) -> pathlib.Path | None:
# First check if an environment variable is set
if INTEL_FIRMWARE_DIR_ENV in os.environ:
if (
path := pathlib.Path(os.environ[INTEL_FIRMWARE_DIR_ENV]) / file_name
).is_file():
logger.debug(f"{file_name} found in env dir")
return path
# When the environment variable is set, don't look elsewhere
return None
# Then, look where the firmware download tool writes by default
if (path := intel_firmware_dir() / file_name).is_file():
logger.debug(f"{file_name} found in project data dir")
return path
# Then, look in the package's driver directory
if (path := pathlib.Path(__file__).parent / "intel_fw" / file_name).is_file():
logger.debug(f"{file_name} found in package dir")
return path
# On Linux, check the system's FW directory
if (
platform.system() == "Linux"
and (path := pathlib.Path(INTEL_LINUX_FIRMWARE_DIR) / file_name).is_file()
):
logger.debug(f"{file_name} found in Linux system FW dir")
return path
# Finally look in the current directory
if (path := pathlib.Path.cwd() / file_name).is_file():
logger.debug(f"{file_name} found in CWD")
return path
return None
def _parse_tlv(data: bytes) -> list[tuple[ValueType, Any]]:
result: list[tuple[ValueType, Any]] = []
while len(data) >= 2:
value_type = ValueType(data[0])
value_length = data[1]
value = data[2 : 2 + value_length]
typed_value: Any
if value_type == ValueType.END:
break
if value_type in (ValueType.CNVI, ValueType.CNVR):
(v,) = struct.unpack("<I", value)
typed_value = (
(((v >> 0) & 0xF) << 12)
| (((v >> 4) & 0xF) << 0)
| (((v >> 8) & 0xF) << 4)
| (((v >> 24) & 0xF) << 8)
)
elif value_type == ValueType.HARDWARE_INFO:
(v,) = struct.unpack("<I", value)
typed_value = HardwareInfo(
HardwarePlatform((v >> 8) & 0xFF), HardwareVariant((v >> 16) & 0x3F)
)
elif value_type in (
ValueType.USB_VENDOR_ID,
ValueType.USB_PRODUCT_ID,
ValueType.DEVICE_REVISION,
):
(typed_value,) = struct.unpack("<H", value)
elif value_type == ValueType.CURRENT_MODE_OF_OPERATION:
typed_value = ModeOfOperation(value[0])
elif value_type in (
ValueType.BUILD_TYPE,
ValueType.BUILD_NUMBER,
ValueType.SECURE_BOOT,
ValueType.OTP_LOCK,
ValueType.API_LOCK,
ValueType.DEBUG_LOCK,
ValueType.SECURE_BOOT_ENGINE_TYPE,
):
typed_value = value[0]
elif value_type == ValueType.TIMESTAMP:
typed_value = Timestamp(value[0], value[1])
elif value_type == ValueType.FIRMWARE_BUILD:
typed_value = FirmwareBuild(value[0], Timestamp(value[1], value[2]))
elif value_type == ValueType.BLUETOOTH_ADDRESS:
typed_value = hci.Address(
value, address_type=hci.Address.PUBLIC_DEVICE_ADDRESS
)
else:
typed_value = value
result.append((value_type, typed_value))
data = data[2 + value_length :]
return result
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
class DriverError(core.BaseBumbleError):
def __init__(self, message: str) -> None:
super().__init__(message)
self.message = message
def __str__(self) -> str:
return f"IntelDriverError({self.message})"
class ValueType(utils.OpenIntEnum):
END = 0x00
CNVI = 0x10
CNVR = 0x11
HARDWARE_INFO = 0x12
DEVICE_REVISION = 0x16
CURRENT_MODE_OF_OPERATION = 0x1C
USB_VENDOR_ID = 0x17
USB_PRODUCT_ID = 0x18
TIMESTAMP = 0x1D
BUILD_TYPE = 0x1E
BUILD_NUMBER = 0x1F
SECURE_BOOT = 0x28
OTP_LOCK = 0x2A
API_LOCK = 0x2B
DEBUG_LOCK = 0x2C
FIRMWARE_BUILD = 0x2D
SECURE_BOOT_ENGINE_TYPE = 0x2F
BLUETOOTH_ADDRESS = 0x30
class HardwarePlatform(utils.OpenIntEnum):
INTEL_37 = 0x37
class HardwareVariant(utils.OpenIntEnum):
# This is a just a partial list.
# Add other constants here as new hardware is encountered and tested.
TYPHOON_PEAK = 0x17
GALE_PEAK = 0x1C
@dataclasses.dataclass
class HardwareInfo:
platform: HardwarePlatform
variant: HardwareVariant
@dataclasses.dataclass
class Timestamp:
week: int
year: int
@dataclasses.dataclass
class FirmwareBuild:
build_number: int
timestamp: Timestamp
class ModeOfOperation(utils.OpenIntEnum):
BOOTLOADER = 0x01
INTERMEDIATE = 0x02
OPERATIONAL = 0x03
class SecureBootEngineType(utils.OpenIntEnum):
RSA = 0x00
ECDSA = 0x01
@dataclasses.dataclass
class BootParams:
css_header_offset: int
css_header_size: int
pki_offset: int
pki_size: int
sig_offset: int
sig_size: int
write_offset: int
_BOOT_PARAMS = {
SecureBootEngineType.RSA: BootParams(0, 128, 128, 256, 388, 256, 964),
SecureBootEngineType.ECDSA: BootParams(644, 128, 772, 96, 868, 96, 964),
}
class Driver(common.Driver):
def __init__(self, host):
def __init__(self, host: Host) -> None:
self.host = host
self.max_in_flight_firmware_load_commands = 1
self.pending_firmware_load_commands: Deque[hci.HCI_Command] = (
collections.deque()
)
self.can_send_firmware_load_command = asyncio.Event()
self.can_send_firmware_load_command.set()
self.firmware_load_complete = asyncio.Event()
self.reset_complete = asyncio.Event()
# Parse configuration options from the driver name.
self.ddc_addon: Optional[bytes] = None
self.ddc_override: Optional[bytes] = None
driver = host.hci_metadata.get("driver")
if driver is not None and driver.startswith("intel/"):
for key, value in [
key_eq_value.split(":") for key_eq_value in driver[6:].split("+")
]:
if key == "ddc_addon":
self.ddc_addon = bytes.fromhex(value)
elif key == "ddc_override":
self.ddc_override = bytes.fromhex(value)
@staticmethod
def check(host):
def check(host: Host) -> bool:
driver = host.hci_metadata.get("driver")
if driver == "intel":
if driver == "intel" or driver is not None and driver.startswith("intel/"):
return True
vendor_id = host.hci_metadata.get("vendor_id")
@@ -85,18 +389,283 @@ class Driver(common.Driver):
return True
@classmethod
async def for_host(cls, host, force=False): # type: ignore
async def for_host(cls, host: Host, force: bool = False):
# Only instantiate this driver if explicitly selected
if not force and not cls.check(host):
return None
return cls(host)
async def init_controller(self):
def on_packet(self, packet: bytes) -> None:
"""Handler for event packets that are received from an ACL channel"""
event = hci.HCI_Event.from_bytes(packet)
if not isinstance(event, hci.HCI_Command_Complete_Event):
self.host.on_hci_event_packet(event)
return
if not event.return_parameters == hci.HCI_SUCCESS:
raise DriverError("HCI_Command_Complete_Event error")
if self.max_in_flight_firmware_load_commands != event.num_hci_command_packets:
logger.debug(
"max_in_flight_firmware_load_commands update: "
f"{event.num_hci_command_packets}"
)
self.max_in_flight_firmware_load_commands = event.num_hci_command_packets
logger.debug(f"event: {event}")
self.pending_firmware_load_commands.popleft()
in_flight = len(self.pending_firmware_load_commands)
logger.debug(f"event received, {in_flight} still in flight")
if in_flight < self.max_in_flight_firmware_load_commands:
self.can_send_firmware_load_command.set()
async def send_firmware_load_command(self, command: hci.HCI_Command) -> None:
# Wait until we can send.
await self.can_send_firmware_load_command.wait()
# Send the command and adjust counters.
self.host.send_hci_packet(command)
self.pending_firmware_load_commands.append(command)
in_flight = len(self.pending_firmware_load_commands)
if in_flight >= self.max_in_flight_firmware_load_commands:
logger.debug(f"max commands in flight reached [{in_flight}]")
self.can_send_firmware_load_command.clear()
async def send_firmware_data(self, data_type: int, data: bytes) -> None:
while data:
fragment_size = min(len(data), _MAX_FRAGMENT_SIZE)
fragment = data[:fragment_size]
data = data[fragment_size:]
await self.send_firmware_load_command(
Hci_Intel_Secure_Send_Command(data_type=data_type, data=fragment)
)
async def load_firmware(self) -> None:
self.host.ready = True
await self.host.send_command(HCI_Reset_Command(), check_result=True)
await self.host.send_command(
Hci_Intel_DDC_Config_Write_Command(
params=HCI_INTEL_DDC_CONFIG_WRITE_PAYLOAD
device_info = await self.read_device_info()
logger.debug(
"device info: \n%s",
"\n".join(
[
f" {value_type.name}: {value}"
for value_type, value in device_info.items()
]
),
)
# Check if the firmware is already loaded.
if (
device_info.get(ValueType.CURRENT_MODE_OF_OPERATION)
== ModeOfOperation.OPERATIONAL
):
logger.debug("firmware already loaded")
return
# We only support some platforms and variants.
hardware_info = device_info.get(ValueType.HARDWARE_INFO)
if hardware_info is None:
raise DriverError("hardware info missing")
if hardware_info.platform != HardwarePlatform.INTEL_37:
raise DriverError("hardware platform not supported")
if hardware_info.variant not in (
HardwareVariant.TYPHOON_PEAK,
HardwareVariant.GALE_PEAK,
):
raise DriverError("hardware variant not supported")
# Compute the firmware name.
if ValueType.CNVI not in device_info or ValueType.CNVR not in device_info:
raise DriverError("insufficient device info, missing CNVI or CNVR")
firmware_base_name = (
"ibt-"
f"{device_info[ValueType.CNVI]:04X}-"
f"{device_info[ValueType.CNVR]:04X}"
)
logger.debug(f"FW base name: {firmware_base_name}")
firmware_name = f"{firmware_base_name}.sfi"
firmware_path = _find_binary_path(firmware_name)
if not firmware_path:
logger.warning(f"Firmware file {firmware_name} not found")
logger.warning("See https://google.github.io/bumble/drivers/intel.html")
return None
logger.debug(f"loading firmware from {firmware_path}")
firmware_image = firmware_path.read_bytes()
engine_type = device_info.get(ValueType.SECURE_BOOT_ENGINE_TYPE)
if engine_type is None:
raise DriverError("secure boot engine type missing")
if engine_type not in _BOOT_PARAMS:
raise DriverError("secure boot engine type not supported")
boot_params = _BOOT_PARAMS[engine_type]
if len(firmware_image) < boot_params.write_offset:
raise DriverError("firmware image too small")
# Register to receive vendor events.
def on_vendor_event(event: hci.HCI_Vendor_Event):
logger.debug(f"vendor event: {event}")
event_type = event.parameters[0]
if event_type == 0x02:
# Boot event
logger.debug("boot complete")
self.reset_complete.set()
elif event_type == 0x06:
# Firmware load event
logger.debug("download complete")
self.firmware_load_complete.set()
else:
logger.debug(f"ignoring vendor event type {event_type}")
self.host.on("vendor_event", on_vendor_event)
# We need to temporarily intercept packets from the controller,
# because they are formatted as HCI event packets but are received
# on the ACL channel, so the host parser would get confused.
saved_on_packet = self.host.on_packet
self.host.on_packet = self.on_packet # type: ignore
self.firmware_load_complete.clear()
# Send the CSS header
data = firmware_image[
boot_params.css_header_offset : boot_params.css_header_offset
+ boot_params.css_header_size
]
await self.send_firmware_data(0x00, data)
# Send the PKI header
data = firmware_image[
boot_params.pki_offset : boot_params.pki_offset + boot_params.pki_size
]
await self.send_firmware_data(0x03, data)
# Send the Signature header
data = firmware_image[
boot_params.sig_offset : boot_params.sig_offset + boot_params.sig_size
]
await self.send_firmware_data(0x02, data)
# Send the rest of the image.
# The payload consists of command objects, which are sent when they add up
# to a multiple of 4 bytes.
boot_address = 0
offset = boot_params.write_offset
fragment_size = 0
while offset + 3 < len(firmware_image):
(command_opcode,) = struct.unpack_from(
"<H", firmware_image, offset + fragment_size
)
command_size = firmware_image[offset + fragment_size + 2]
if command_opcode == HCI_INTEL_WRITE_BOOT_PARAMS_COMMAND:
(boot_address,) = struct.unpack_from(
"<I", firmware_image, offset + fragment_size + 3
)
logger.debug(
"found HCI_INTEL_WRITE_BOOT_PARAMS_COMMAND, "
f"boot_address={boot_address}"
)
fragment_size += 3 + command_size
if fragment_size % 4 == 0:
await self.send_firmware_data(
0x01, firmware_image[offset : offset + fragment_size]
)
logger.debug(f"sent {fragment_size} bytes")
offset += fragment_size
fragment_size = 0
# Wait for the firmware loading to be complete.
logger.debug("waiting for firmware to be loaded")
await self.firmware_load_complete.wait()
logger.debug("firmware loaded")
# Restore the original packet handler.
self.host.on_packet = saved_on_packet # type: ignore
# Reset
self.reset_complete.clear()
self.host.send_hci_packet(
HCI_Intel_Reset_Command(
reset_type=0x00,
patch_enable=0x01,
ddc_reload=0x00,
boot_option=0x01,
boot_address=boot_address,
)
)
logger.debug("waiting for reset completion")
await self.reset_complete.wait()
logger.debug("reset complete")
# Load the device config if there is one.
if self.ddc_override:
logger.debug("loading overridden DDC")
await self.load_device_config(self.ddc_override)
else:
ddc_name = f"{firmware_base_name}.ddc"
ddc_path = _find_binary_path(ddc_name)
if ddc_path:
logger.debug(f"loading DDC from {ddc_path}")
ddc_data = ddc_path.read_bytes()
await self.load_device_config(ddc_data)
if self.ddc_addon:
logger.debug("loading DDC addon")
await self.load_device_config(self.ddc_addon)
async def load_device_config(self, ddc_data: bytes) -> None:
while ddc_data:
ddc_len = 1 + ddc_data[0]
ddc_payload = ddc_data[:ddc_len]
await self.host.send_command(
Hci_Intel_Write_Device_Config_Command(data=ddc_payload)
)
ddc_data = ddc_data[ddc_len:]
async def reboot_bootloader(self) -> None:
self.host.send_hci_packet(
HCI_Intel_Reset_Command(
reset_type=0x01,
patch_enable=0x01,
ddc_reload=0x01,
boot_option=0x00,
boot_address=0,
)
)
await asyncio.sleep(_POST_RESET_DELAY)
async def read_device_info(self) -> dict[ValueType, Any]:
self.host.ready = True
response = await self.host.send_command(hci.HCI_Reset_Command())
if not (
isinstance(response, hci.HCI_Command_Complete_Event)
and response.return_parameters
in (hci.HCI_UNKNOWN_HCI_COMMAND_ERROR, hci.HCI_SUCCESS)
):
# When the controller is in operational mode, the response is a
# successful response.
# When the controller is in bootloader mode,
# HCI_UNKNOWN_HCI_COMMAND_ERROR is the expected response. Anything
# else is a failure.
logger.warning(f"unexpected response: {response}")
raise DriverError("unexpected HCI response")
# Read the firmware version.
response = await self.host.send_command(
HCI_Intel_Read_Version_Command(param0=0xFF)
)
if not isinstance(response, hci.HCI_Command_Complete_Event):
raise DriverError("unexpected HCI response")
if response.return_parameters.status != 0: # type: ignore
raise DriverError("HCI_Intel_Read_Version_Command error")
tlvs = _parse_tlv(response.return_parameters.tlv) # type: ignore
# Convert the list to a dict. That's Ok here because we only expect each type
# to appear just once.
return dict(tlvs)
async def init_controller(self):
await self.load_firmware()

View File

@@ -898,6 +898,12 @@ class Client:
) and subscriber in subscribers:
subscribers.remove(subscriber)
# The characteristic itself is added as subscriber. If it is the
# last remaining subscriber, we remove it, such that the clean up
# works correctly. Otherwise the CCCD never is set back to 0.
if len(subscribers) == 1 and characteristic in subscribers:
subscribers.remove(characteristic)
# Cleanup if we removed the last one
if not subscribers:
del subscriber_set[characteristic.handle]

View File

@@ -4996,6 +4996,7 @@ class HCI_Event(HCI_Packet):
hci_packet_type = HCI_EVENT_PACKET
event_names: Dict[int, str] = {}
event_classes: Dict[int, Type[HCI_Event]] = {}
vendor_factory: Optional[Callable[[bytes], Optional[HCI_Event]]] = None
@staticmethod
def event(fields=()):
@@ -5053,37 +5054,41 @@ class HCI_Event(HCI_Packet):
return event_class
@staticmethod
def from_bytes(packet: bytes) -> HCI_Event:
@classmethod
def from_bytes(cls, packet: bytes) -> HCI_Event:
event_code = packet[1]
length = packet[2]
parameters = packet[3:]
if len(parameters) != length:
raise InvalidPacketError('invalid packet length')
cls: Any
subclass: Any
if event_code == HCI_LE_META_EVENT:
# We do this dispatch here and not in the subclass in order to avoid call
# loops
subevent_code = parameters[0]
cls = HCI_LE_Meta_Event.subevent_classes.get(subevent_code)
if cls is None:
subclass = HCI_LE_Meta_Event.subevent_classes.get(subevent_code)
if subclass is None:
# No class registered, just use a generic class instance
return HCI_LE_Meta_Event(subevent_code, parameters)
elif event_code == HCI_VENDOR_EVENT:
subevent_code = parameters[0]
cls = HCI_Vendor_Event.subevent_classes.get(subevent_code)
if cls is None:
# No class registered, just use a generic class instance
return HCI_Vendor_Event(subevent_code, parameters)
# Invoke all the registered factories to see if any of them can handle
# the event
if cls.vendor_factory:
if event := cls.vendor_factory(parameters):
return event
# No factory, or the factory could not create an instance,
# return a generic vendor event
return HCI_Event(event_code, parameters)
else:
cls = HCI_Event.event_classes.get(event_code)
if cls is None:
subclass = HCI_Event.event_classes.get(event_code)
if subclass is None:
# No class registered, just use a generic class instance
return HCI_Event(event_code, parameters)
# Invoke the factory to create a new instance
return cls.from_parameters(parameters) # type: ignore
return subclass.from_parameters(parameters) # type: ignore
@classmethod
def from_parameters(cls, parameters):
@@ -5129,11 +5134,11 @@ HCI_Event.register_events(globals())
# -----------------------------------------------------------------------------
class HCI_Extended_Event(HCI_Event):
'''
HCI_Event subclass for events that has a subevent code.
HCI_Event subclass for events that have a subevent code.
'''
subevent_names: Dict[int, str] = {}
subevent_classes: Dict[int, Type[HCI_Extended_Event]]
subevent_classes: Dict[int, Type[HCI_Extended_Event]] = {}
@classmethod
def event(cls, fields=()):
@@ -5184,7 +5189,22 @@ class HCI_Extended_Event(HCI_Event):
cls.subevent_names.update(cls.subevent_map(symbols))
@classmethod
def from_parameters(cls, parameters):
def subclass_from_parameters(
cls, parameters: bytes
) -> Optional[HCI_Extended_Event]:
"""
Factory method that parses the subevent code, finds a registered subclass,
and creates an instance if found.
"""
subevent_code = parameters[0]
if subclass := cls.subevent_classes.get(subevent_code):
return subclass.from_parameters(parameters)
return None
@classmethod
def from_parameters(cls, parameters: bytes) -> HCI_Extended_Event:
"""Factory method for subclasses (the subevent code has already been parsed)"""
self = cls.__new__(cls)
HCI_Extended_Event.__init__(self, self.subevent_code, parameters)
if fields := getattr(self, 'fields', None):
@@ -5225,12 +5245,6 @@ class HCI_LE_Meta_Event(HCI_Extended_Event):
HCI_LE_Meta_Event.register_subevents(globals())
# -----------------------------------------------------------------------------
class HCI_Vendor_Event(HCI_Extended_Event):
event_code: int = HCI_VENDOR_EVENT
subevent_classes = {}
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event(
[
@@ -6104,8 +6118,9 @@ class HCI_Command_Complete_Event(HCI_Event):
See Bluetooth spec @ 7.7.14 Command Complete Event
'''
return_parameters = b''
num_hci_command_packets: int
command_opcode: int
return_parameters = b''
def map_return_parameters(self, return_parameters):
'''Map simple 'status' return parameters to their named constant form'''
@@ -6641,6 +6656,14 @@ class HCI_Remote_Host_Supported_Features_Notification_Event(HCI_Event):
'''
# -----------------------------------------------------------------------------
@HCI_Event.event([('data', "*")])
class HCI_Vendor_Event(HCI_Event):
'''
See Bluetooth spec @ 5.4.4 HCI Event packet
'''
# -----------------------------------------------------------------------------
class HCI_AclDataPacket(HCI_Packet):
'''

View File

@@ -795,29 +795,32 @@ class HfProtocol(pyee.EventEmitter):
# Append to the read buffer.
self.read_buffer.extend(data)
# Locate header and trailer.
header = self.read_buffer.find(b'\r\n')
trailer = self.read_buffer.find(b'\r\n', header + 2)
if header == -1 or trailer == -1:
return
while self.read_buffer:
# Locate header and trailer.
header = self.read_buffer.find(b'\r\n')
trailer = self.read_buffer.find(b'\r\n', header + 2)
if header == -1 or trailer == -1:
return
# Isolate the AT response code and parameters.
raw_response = self.read_buffer[header + 2 : trailer]
response = AtResponse.parse_from(raw_response)
logger.debug(f"<<< {raw_response.decode()}")
# Isolate the AT response code and parameters.
raw_response = self.read_buffer[header + 2 : trailer]
response = AtResponse.parse_from(raw_response)
logger.debug(f"<<< {raw_response.decode()}")
# Consume the response bytes.
self.read_buffer = self.read_buffer[trailer + 2 :]
# Consume the response bytes.
self.read_buffer = self.read_buffer[trailer + 2 :]
# Forward the received code to the correct queue.
if self.command_lock.locked() and (
response.code in STATUS_CODES or response.code in RESPONSE_CODES
):
self.response_queue.put_nowait(response)
elif response.code in UNSOLICITED_CODES:
self.unsolicited_queue.put_nowait(response)
else:
logger.warning(f"dropping unexpected response with code '{response.code}'")
# Forward the received code to the correct queue.
if self.command_lock.locked() and (
response.code in STATUS_CODES or response.code in RESPONSE_CODES
):
self.response_queue.put_nowait(response)
elif response.code in UNSOLICITED_CODES:
self.unsolicited_queue.put_nowait(response)
else:
logger.warning(
f"dropping unexpected response with code '{response.code}'"
)
async def execute_command(
self,
@@ -1244,31 +1247,32 @@ class AgProtocol(pyee.EventEmitter):
# Append to the read buffer.
self.read_buffer.extend(data)
# Locate the trailer.
trailer = self.read_buffer.find(b'\r')
if trailer == -1:
return
while self.read_buffer:
# Locate the trailer.
trailer = self.read_buffer.find(b'\r')
if trailer == -1:
return
# Isolate the AT response code and parameters.
raw_command = self.read_buffer[:trailer]
command = AtCommand.parse_from(raw_command)
logger.debug(f"<<< {raw_command.decode()}")
# Isolate the AT response code and parameters.
raw_command = self.read_buffer[:trailer]
command = AtCommand.parse_from(raw_command)
logger.debug(f"<<< {raw_command.decode()}")
# Consume the response bytes.
self.read_buffer = self.read_buffer[trailer + 1 :]
# Consume the response bytes.
self.read_buffer = self.read_buffer[trailer + 1 :]
if command.sub_code == AtCommand.SubCode.TEST:
handler_name = f'_on_{command.code.lower()}_test'
elif command.sub_code == AtCommand.SubCode.READ:
handler_name = f'_on_{command.code.lower()}_read'
else:
handler_name = f'_on_{command.code.lower()}'
if command.sub_code == AtCommand.SubCode.TEST:
handler_name = f'_on_{command.code.lower()}_test'
elif command.sub_code == AtCommand.SubCode.READ:
handler_name = f'_on_{command.code.lower()}_read'
else:
handler_name = f'_on_{command.code.lower()}'
if handler := getattr(self, handler_name, None):
handler(*command.parameters)
else:
logger.warning('Handler %s not found', handler_name)
self.send_response('ERROR')
if handler := getattr(self, handler_name, None):
handler(*command.parameters)
else:
logger.warning('Handler %s not found', handler_name)
self.send_response('ERROR')
def send_response(self, response: str) -> None:
"""Sends an AT response."""

View File

@@ -552,7 +552,7 @@ class Host(AbortableEventEmitter):
return response
except Exception as error:
logger.warning(
logger.exception(
f'{color("!!! Exception while sending command:", "red")} {error}'
)
raise error
@@ -1248,3 +1248,6 @@ class Host(AbortableEventEmitter):
event.connection_handle,
int.from_bytes(event.le_features, 'little'),
)
def on_hci_vendor_event(self, event):
self.emit('vendor_event', event)

View File

@@ -1911,6 +1911,7 @@ class ChannelManager:
data = sum(1 << cid for cid in self.fixed_channels).to_bytes(8, 'little')
else:
result = L2CAP_Information_Response.NOT_SUPPORTED
data = b''
self.send_control_frame(
connection,

View File

@@ -122,6 +122,8 @@ class LocalLink:
elif transport == BT_BR_EDR_TRANSPORT:
destination_controller = self.find_classic_controller(destination_address)
source_address = sender_controller.public_address
else:
raise ValueError("unsupported transport type")
if destination_controller is not None:
destination_controller.on_link_acl_data(source_address, transport, data)

View File

@@ -102,6 +102,7 @@ class ContextType(enum.IntFlag):
# fmt: off
PROHIBITED = 0x0000
UNSPECIFIED = 0x0001
CONVERSATIONAL = 0x0002
MEDIA = 0x0004
GAME = 0x0008
@@ -350,6 +351,7 @@ class CodecSpecificCapabilities:
supported_max_codec_frames_per_sdu = value
# It is expected here that if some fields are missing, an error should be raised.
# pylint: disable=possibly-used-before-assignment,used-before-assignment
return CodecSpecificCapabilities(
supported_sampling_frequencies=supported_sampling_frequencies,
supported_frame_durations=supported_frame_durations,
@@ -426,6 +428,7 @@ class CodecSpecificConfiguration:
codec_frames_per_sdu = value
# It is expected here that if some fields are missing, an error should be raised.
# pylint: disable=possibly-used-before-assignment,used-before-assignment
return CodecSpecificConfiguration(
sampling_frequency=sampling_frequency,
frame_duration=frame_duration,

View File

@@ -25,7 +25,7 @@ from bumble.utils import AsyncRunner, OpenIntEnum
from bumble.hci import Address
from dataclasses import dataclass, field
import logging
from typing import Dict, List, Optional, Set, Union
from typing import Any, Dict, List, Optional, Set, Union
# -----------------------------------------------------------------------------
@@ -271,24 +271,12 @@ class HearingAccessService(gatt.TemplateService):
def on_disconnection(_reason) -> None:
self.currently_connected_clients.remove(connection)
# TODO Should we filter on device bonded && device is HAP ?
self.currently_connected_clients.add(connection)
if (
connection.peer_address
not in self.preset_changed_operations_history_per_device
):
self.preset_changed_operations_history_per_device[
connection.peer_address
] = []
return
@connection.on('pairing') # type: ignore
def on_pairing(*_: Any) -> None:
self.on_incoming_paired_connection(connection)
async def on_connection_async() -> None:
# Send all the PresetChangedOperation that occur when not connected
await self._preset_changed_operation(connection)
# Update the active preset index if needed
await self.notify_active_preset_for_connection(connection)
connection.abort_on('disconnection', on_connection_async())
if connection.peer_resolvable_address:
self.on_incoming_paired_connection(connection)
self.hearing_aid_features_characteristic = gatt.Characteristic(
uuid=gatt.GATT_HEARING_AID_FEATURES_CHARACTERISTIC,
@@ -325,6 +313,27 @@ class HearingAccessService(gatt.TemplateService):
]
)
def on_incoming_paired_connection(self, connection: Connection):
'''Setup initial operations to handle a remote bonded HAP device'''
# TODO Should we filter on HAP device only ?
self.currently_connected_clients.add(connection)
if (
connection.peer_address
not in self.preset_changed_operations_history_per_device
):
self.preset_changed_operations_history_per_device[
connection.peer_address
] = []
return
async def on_connection_async() -> None:
# Send all the PresetChangedOperation that occur when not connected
await self._preset_changed_operation(connection)
# Update the active preset index if needed
await self.notify_active_preset_for_connection(connection)
connection.abort_on('disconnection', on_connection_async())
def _on_read_active_preset_index(
self, __connection__: Optional[Connection]
) -> bytes:

View File

@@ -434,6 +434,8 @@ class DataElement:
if size != 1:
raise InvalidArgumentError('boolean must be 1 byte')
size_index = 0
else:
raise RuntimeError("internal error - self.type not supported")
self.bytes = bytes([self.type << 3 | size_index]) + size_bytes + data
return self.bytes

View File

@@ -1839,7 +1839,7 @@ class Session:
if self.is_initiator:
if self.pairing_method == PairingMethod.OOB:
self.send_pairing_random_command()
else:
elif self.pairing_method == PairingMethod.PASSKEY:
self.send_pairing_confirm_command()
else:
if self.pairing_method == PairingMethod.PASSKEY:

View File

@@ -20,12 +20,14 @@ import atexit
import logging
import os
import pathlib
import platform
import sys
from typing import Dict, Optional
import grpc.aio
from .common import (
import bumble
from bumble.transport.common import (
ParserSource,
PumpedTransport,
PumpedPacketSource,
@@ -36,15 +38,15 @@ from .common import (
)
# pylint: disable=no-name-in-module
from .grpc_protobuf.packet_streamer_pb2_grpc import (
from .grpc_protobuf.netsim.packet_streamer_pb2_grpc import (
PacketStreamerStub,
PacketStreamerServicer,
add_PacketStreamerServicer_to_server,
)
from .grpc_protobuf.packet_streamer_pb2 import PacketRequest, PacketResponse
from .grpc_protobuf.hci_packet_pb2 import HCIPacket
from .grpc_protobuf.startup_pb2 import Chip, ChipInfo
from .grpc_protobuf.common_pb2 import ChipKind
from .grpc_protobuf.netsim.packet_streamer_pb2 import PacketRequest, PacketResponse
from .grpc_protobuf.netsim.hci_packet_pb2 import HCIPacket
from .grpc_protobuf.netsim.startup_pb2 import Chip, ChipInfo, DeviceInfo
from .grpc_protobuf.netsim.common_pb2 import ChipKind
# -----------------------------------------------------------------------------
@@ -58,6 +60,7 @@ logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
DEFAULT_NAME = 'bumble0'
DEFAULT_MANUFACTURER = 'Bumble'
DEFAULT_VARIANT = ''
# -----------------------------------------------------------------------------
@@ -199,7 +202,6 @@ async def open_android_netsim_controller_transport(
data = (
bytes([request.hci_packet.packet_type]) + request.hci_packet.packet
)
logger.debug(f'<<< PACKET: {data.hex()}')
self.on_data_received(data)
async def send_packet(self, data):
@@ -253,7 +255,7 @@ async def open_android_netsim_controller_transport(
# Check that we don't already have a device
if self.device:
logger.debug('busy, already serving a device')
logger.debug('Busy, already serving a device')
return PacketResponse(error='Busy')
# Instantiate a new device
@@ -312,16 +314,24 @@ async def open_android_netsim_host_transport_with_channel(
):
# Wrapper for I/O operations
class HciDevice:
def __init__(self, name, manufacturer, hci_device):
def __init__(self, name, variant, manufacturer, hci_device):
self.name = name
self.variant = variant
self.manufacturer = manufacturer
self.hci_device = hci_device
async def start(self): # Send the startup info
chip_info = ChipInfo(
device_info = DeviceInfo(
name=self.name,
chip=Chip(kind=ChipKind.BLUETOOTH, manufacturer=self.manufacturer),
kind='BUMBLE',
version=bumble.__version__,
sdk_version=platform.python_version(),
build_id=platform.platform(),
arch=platform.machine(),
variant=self.variant,
)
chip = Chip(kind=ChipKind.BLUETOOTH, manufacturer=self.manufacturer)
chip_info = ChipInfo(name=self.name, chip=chip, device_info=device_info)
logger.debug(f'Sending chip info to netsim: {chip_info}')
await self.hci_device.write(PacketRequest(initial_info=chip_info))
@@ -349,12 +359,16 @@ async def open_android_netsim_host_transport_with_channel(
)
name = DEFAULT_NAME if options is None else options.get('name', DEFAULT_NAME)
variant = (
DEFAULT_VARIANT if options is None else options.get('variant', DEFAULT_VARIANT)
)
manufacturer = DEFAULT_MANUFACTURER
# Connect as a host
service = PacketStreamerStub(channel)
hci_device = HciDevice(
name=name,
variant=variant,
manufacturer=manufacturer,
hci_device=service.StreamPackets(),
)
@@ -404,6 +418,9 @@ async def open_android_netsim_transport(spec: Optional[str]) -> Transport:
The "chip" name, used to identify the "chip" instance. This
may be useful when several clients are connected, since each needs to use a
different name.
variant=<variant>
The device info variant field, which may be used to convey a device or
application type (ex: "virtual-speaker", or "keyboard")
In `controller` mode:
The <host>:<port> part is required. <host> may be the address of a local network

View File

@@ -370,11 +370,13 @@ class PumpedPacketSource(ParserSource):
self.parser.feed_data(packet)
except asyncio.CancelledError:
logger.debug('source pump task done')
self.terminated.set_result(None)
if not self.terminated.done():
self.terminated.set_result(None)
break
except Exception as error:
logger.warning(f'exception while waiting for packet: {error}')
self.terminated.set_exception(error)
if not self.terminated.done():
self.terminated.set_exception(error)
break
self.pump_task = asyncio.create_task(pump_packets())

View File

@@ -1,28 +0,0 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: hci_packet.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10hci_packet.proto\x12\rnetsim.packet\"\xb2\x01\n\tHCIPacket\x12\x38\n\x0bpacket_type\x18\x01 \x01(\x0e\x32#.netsim.packet.HCIPacket.PacketType\x12\x0e\n\x06packet\x18\x02 \x01(\x0c\"[\n\nPacketType\x12\x1a\n\x16HCI_PACKET_UNSPECIFIED\x10\x00\x12\x0b\n\x07\x43OMMAND\x10\x01\x12\x07\n\x03\x41\x43L\x10\x02\x12\x07\n\x03SCO\x10\x03\x12\t\n\x05\x45VENT\x10\x04\x12\x07\n\x03ISO\x10\x05\x42J\n\x1f\x63om.android.emulation.bluetoothP\x01\xf8\x01\x01\xa2\x02\x03\x41\x45\x42\xaa\x02\x1b\x41ndroid.Emulation.Bluetoothb\x06proto3')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'hci_packet_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
DESCRIPTOR._serialized_options = b'\n\037com.android.emulation.bluetoothP\001\370\001\001\242\002\003AEB\252\002\033Android.Emulation.Bluetooth'
_HCIPACKET._serialized_start=36
_HCIPACKET._serialized_end=214
_HCIPACKET_PACKETTYPE._serialized_start=123
_HCIPACKET_PACKETTYPE._serialized_end=214
# @@protoc_insertion_point(module_scope)

View File

@@ -1,11 +1,12 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: common.proto
# source: netsim/common.proto
# Protobuf Python Version: 4.25.1
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
@@ -13,13 +14,13 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x63ommon.proto\x12\rnetsim.common*=\n\x08\x43hipKind\x12\x0f\n\x0bUNSPECIFIED\x10\x00\x12\r\n\tBLUETOOTH\x10\x01\x12\x08\n\x04WIFI\x10\x02\x12\x07\n\x03UWB\x10\x03\x62\x06proto3')
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x13netsim/common.proto\x12\rnetsim.common*S\n\x08\x43hipKind\x12\x0f\n\x0bUNSPECIFIED\x10\x00\x12\r\n\tBLUETOOTH\x10\x01\x12\x08\n\x04WIFI\x10\x02\x12\x07\n\x03UWB\x10\x03\x12\x14\n\x10\x42LUETOOTH_BEACON\x10\x04\x62\x06proto3')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'common_pb2', globals())
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'netsim.common_pb2', _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_CHIPKIND._serialized_start=31
_CHIPKIND._serialized_end=92
_globals['_CHIPKIND']._serialized_start=38
_globals['_CHIPKIND']._serialized_end=121
# @@protoc_insertion_point(module_scope)

View File

@@ -2,11 +2,17 @@ from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper
from google.protobuf import descriptor as _descriptor
from typing import ClassVar as _ClassVar
BLUETOOTH: ChipKind
DESCRIPTOR: _descriptor.FileDescriptor
UNSPECIFIED: ChipKind
UWB: ChipKind
WIFI: ChipKind
class ChipKind(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = []
__slots__ = ()
UNSPECIFIED: _ClassVar[ChipKind]
BLUETOOTH: _ClassVar[ChipKind]
WIFI: _ClassVar[ChipKind]
UWB: _ClassVar[ChipKind]
BLUETOOTH_BEACON: _ClassVar[ChipKind]
UNSPECIFIED: ChipKind
BLUETOOTH: ChipKind
WIFI: ChipKind
UWB: ChipKind
BLUETOOTH_BEACON: ChipKind

View File

@@ -0,0 +1,29 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: netsim/hci_packet.proto
# Protobuf Python Version: 4.25.1
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x17netsim/hci_packet.proto\x12\rnetsim.packet\"\xb2\x01\n\tHCIPacket\x12\x38\n\x0bpacket_type\x18\x01 \x01(\x0e\x32#.netsim.packet.HCIPacket.PacketType\x12\x0e\n\x06packet\x18\x02 \x01(\x0c\"[\n\nPacketType\x12\x1a\n\x16HCI_PACKET_UNSPECIFIED\x10\x00\x12\x0b\n\x07\x43OMMAND\x10\x01\x12\x07\n\x03\x41\x43L\x10\x02\x12\x07\n\x03SCO\x10\x03\x12\t\n\x05\x45VENT\x10\x04\x12\x07\n\x03ISO\x10\x05\x42J\n\x1f\x63om.android.emulation.bluetoothP\x01\xf8\x01\x01\xa2\x02\x03\x41\x45\x42\xaa\x02\x1b\x41ndroid.Emulation.Bluetoothb\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'netsim.hci_packet_pb2', _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
_globals['DESCRIPTOR']._options = None
_globals['DESCRIPTOR']._serialized_options = b'\n\037com.android.emulation.bluetoothP\001\370\001\001\242\002\003AEB\252\002\033Android.Emulation.Bluetooth'
_globals['_HCIPACKET']._serialized_start=43
_globals['_HCIPACKET']._serialized_end=221
_globals['_HCIPACKET_PACKETTYPE']._serialized_start=130
_globals['_HCIPACKET_PACKETTYPE']._serialized_end=221
# @@protoc_insertion_point(module_scope)

View File

@@ -6,17 +6,23 @@ from typing import ClassVar as _ClassVar, Optional as _Optional, Union as _Union
DESCRIPTOR: _descriptor.FileDescriptor
class HCIPacket(_message.Message):
__slots__ = ["packet", "packet_type"]
__slots__ = ("packet_type", "packet")
class PacketType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = []
ACL: HCIPacket.PacketType
COMMAND: HCIPacket.PacketType
EVENT: HCIPacket.PacketType
__slots__ = ()
HCI_PACKET_UNSPECIFIED: _ClassVar[HCIPacket.PacketType]
COMMAND: _ClassVar[HCIPacket.PacketType]
ACL: _ClassVar[HCIPacket.PacketType]
SCO: _ClassVar[HCIPacket.PacketType]
EVENT: _ClassVar[HCIPacket.PacketType]
ISO: _ClassVar[HCIPacket.PacketType]
HCI_PACKET_UNSPECIFIED: HCIPacket.PacketType
ISO: HCIPacket.PacketType
PACKET_FIELD_NUMBER: _ClassVar[int]
PACKET_TYPE_FIELD_NUMBER: _ClassVar[int]
COMMAND: HCIPacket.PacketType
ACL: HCIPacket.PacketType
SCO: HCIPacket.PacketType
packet: bytes
EVENT: HCIPacket.PacketType
ISO: HCIPacket.PacketType
PACKET_TYPE_FIELD_NUMBER: _ClassVar[int]
PACKET_FIELD_NUMBER: _ClassVar[int]
packet_type: HCIPacket.PacketType
packet: bytes
def __init__(self, packet_type: _Optional[_Union[HCIPacket.PacketType, str]] = ..., packet: _Optional[bytes] = ...) -> None: ...

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,238 @@
from bumble.transport.grpc_protobuf.netsim import common_pb2 as _common_pb2
from google.protobuf import timestamp_pb2 as _timestamp_pb2
from bumble.transport.grpc_protobuf.rootcanal import configuration_pb2 as _configuration_pb2
from google.protobuf.internal import containers as _containers
from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union
DESCRIPTOR: _descriptor.FileDescriptor
class PhyKind(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = ()
NONE: _ClassVar[PhyKind]
BLUETOOTH_CLASSIC: _ClassVar[PhyKind]
BLUETOOTH_LOW_ENERGY: _ClassVar[PhyKind]
WIFI: _ClassVar[PhyKind]
UWB: _ClassVar[PhyKind]
WIFI_RTT: _ClassVar[PhyKind]
NONE: PhyKind
BLUETOOTH_CLASSIC: PhyKind
BLUETOOTH_LOW_ENERGY: PhyKind
WIFI: PhyKind
UWB: PhyKind
WIFI_RTT: PhyKind
class Position(_message.Message):
__slots__ = ("x", "y", "z")
X_FIELD_NUMBER: _ClassVar[int]
Y_FIELD_NUMBER: _ClassVar[int]
Z_FIELD_NUMBER: _ClassVar[int]
x: float
y: float
z: float
def __init__(self, x: _Optional[float] = ..., y: _Optional[float] = ..., z: _Optional[float] = ...) -> None: ...
class Orientation(_message.Message):
__slots__ = ("yaw", "pitch", "roll")
YAW_FIELD_NUMBER: _ClassVar[int]
PITCH_FIELD_NUMBER: _ClassVar[int]
ROLL_FIELD_NUMBER: _ClassVar[int]
yaw: float
pitch: float
roll: float
def __init__(self, yaw: _Optional[float] = ..., pitch: _Optional[float] = ..., roll: _Optional[float] = ...) -> None: ...
class Chip(_message.Message):
__slots__ = ("kind", "id", "name", "manufacturer", "product_name", "bt", "ble_beacon", "uwb", "wifi", "offset")
class Radio(_message.Message):
__slots__ = ("state", "range", "tx_count", "rx_count")
STATE_FIELD_NUMBER: _ClassVar[int]
RANGE_FIELD_NUMBER: _ClassVar[int]
TX_COUNT_FIELD_NUMBER: _ClassVar[int]
RX_COUNT_FIELD_NUMBER: _ClassVar[int]
state: bool
range: float
tx_count: int
rx_count: int
def __init__(self, state: bool = ..., range: _Optional[float] = ..., tx_count: _Optional[int] = ..., rx_count: _Optional[int] = ...) -> None: ...
class Bluetooth(_message.Message):
__slots__ = ("low_energy", "classic", "address", "bt_properties")
LOW_ENERGY_FIELD_NUMBER: _ClassVar[int]
CLASSIC_FIELD_NUMBER: _ClassVar[int]
ADDRESS_FIELD_NUMBER: _ClassVar[int]
BT_PROPERTIES_FIELD_NUMBER: _ClassVar[int]
low_energy: Chip.Radio
classic: Chip.Radio
address: str
bt_properties: _configuration_pb2.Controller
def __init__(self, low_energy: _Optional[_Union[Chip.Radio, _Mapping]] = ..., classic: _Optional[_Union[Chip.Radio, _Mapping]] = ..., address: _Optional[str] = ..., bt_properties: _Optional[_Union[_configuration_pb2.Controller, _Mapping]] = ...) -> None: ...
class BleBeacon(_message.Message):
__slots__ = ("bt", "address", "settings", "adv_data", "scan_response")
class AdvertiseSettings(_message.Message):
__slots__ = ("advertise_mode", "milliseconds", "tx_power_level", "dbm", "scannable", "timeout")
class AdvertiseMode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = ()
LOW_POWER: _ClassVar[Chip.BleBeacon.AdvertiseSettings.AdvertiseMode]
BALANCED: _ClassVar[Chip.BleBeacon.AdvertiseSettings.AdvertiseMode]
LOW_LATENCY: _ClassVar[Chip.BleBeacon.AdvertiseSettings.AdvertiseMode]
LOW_POWER: Chip.BleBeacon.AdvertiseSettings.AdvertiseMode
BALANCED: Chip.BleBeacon.AdvertiseSettings.AdvertiseMode
LOW_LATENCY: Chip.BleBeacon.AdvertiseSettings.AdvertiseMode
class AdvertiseTxPower(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = ()
ULTRA_LOW: _ClassVar[Chip.BleBeacon.AdvertiseSettings.AdvertiseTxPower]
LOW: _ClassVar[Chip.BleBeacon.AdvertiseSettings.AdvertiseTxPower]
MEDIUM: _ClassVar[Chip.BleBeacon.AdvertiseSettings.AdvertiseTxPower]
HIGH: _ClassVar[Chip.BleBeacon.AdvertiseSettings.AdvertiseTxPower]
ULTRA_LOW: Chip.BleBeacon.AdvertiseSettings.AdvertiseTxPower
LOW: Chip.BleBeacon.AdvertiseSettings.AdvertiseTxPower
MEDIUM: Chip.BleBeacon.AdvertiseSettings.AdvertiseTxPower
HIGH: Chip.BleBeacon.AdvertiseSettings.AdvertiseTxPower
ADVERTISE_MODE_FIELD_NUMBER: _ClassVar[int]
MILLISECONDS_FIELD_NUMBER: _ClassVar[int]
TX_POWER_LEVEL_FIELD_NUMBER: _ClassVar[int]
DBM_FIELD_NUMBER: _ClassVar[int]
SCANNABLE_FIELD_NUMBER: _ClassVar[int]
TIMEOUT_FIELD_NUMBER: _ClassVar[int]
advertise_mode: Chip.BleBeacon.AdvertiseSettings.AdvertiseMode
milliseconds: int
tx_power_level: Chip.BleBeacon.AdvertiseSettings.AdvertiseTxPower
dbm: int
scannable: bool
timeout: int
def __init__(self, advertise_mode: _Optional[_Union[Chip.BleBeacon.AdvertiseSettings.AdvertiseMode, str]] = ..., milliseconds: _Optional[int] = ..., tx_power_level: _Optional[_Union[Chip.BleBeacon.AdvertiseSettings.AdvertiseTxPower, str]] = ..., dbm: _Optional[int] = ..., scannable: bool = ..., timeout: _Optional[int] = ...) -> None: ...
class AdvertiseData(_message.Message):
__slots__ = ("include_device_name", "include_tx_power_level", "manufacturer_data", "services")
class Service(_message.Message):
__slots__ = ("uuid", "data")
UUID_FIELD_NUMBER: _ClassVar[int]
DATA_FIELD_NUMBER: _ClassVar[int]
uuid: str
data: bytes
def __init__(self, uuid: _Optional[str] = ..., data: _Optional[bytes] = ...) -> None: ...
INCLUDE_DEVICE_NAME_FIELD_NUMBER: _ClassVar[int]
INCLUDE_TX_POWER_LEVEL_FIELD_NUMBER: _ClassVar[int]
MANUFACTURER_DATA_FIELD_NUMBER: _ClassVar[int]
SERVICES_FIELD_NUMBER: _ClassVar[int]
include_device_name: bool
include_tx_power_level: bool
manufacturer_data: bytes
services: _containers.RepeatedCompositeFieldContainer[Chip.BleBeacon.AdvertiseData.Service]
def __init__(self, include_device_name: bool = ..., include_tx_power_level: bool = ..., manufacturer_data: _Optional[bytes] = ..., services: _Optional[_Iterable[_Union[Chip.BleBeacon.AdvertiseData.Service, _Mapping]]] = ...) -> None: ...
BT_FIELD_NUMBER: _ClassVar[int]
ADDRESS_FIELD_NUMBER: _ClassVar[int]
SETTINGS_FIELD_NUMBER: _ClassVar[int]
ADV_DATA_FIELD_NUMBER: _ClassVar[int]
SCAN_RESPONSE_FIELD_NUMBER: _ClassVar[int]
bt: Chip.Bluetooth
address: str
settings: Chip.BleBeacon.AdvertiseSettings
adv_data: Chip.BleBeacon.AdvertiseData
scan_response: Chip.BleBeacon.AdvertiseData
def __init__(self, bt: _Optional[_Union[Chip.Bluetooth, _Mapping]] = ..., address: _Optional[str] = ..., settings: _Optional[_Union[Chip.BleBeacon.AdvertiseSettings, _Mapping]] = ..., adv_data: _Optional[_Union[Chip.BleBeacon.AdvertiseData, _Mapping]] = ..., scan_response: _Optional[_Union[Chip.BleBeacon.AdvertiseData, _Mapping]] = ...) -> None: ...
KIND_FIELD_NUMBER: _ClassVar[int]
ID_FIELD_NUMBER: _ClassVar[int]
NAME_FIELD_NUMBER: _ClassVar[int]
MANUFACTURER_FIELD_NUMBER: _ClassVar[int]
PRODUCT_NAME_FIELD_NUMBER: _ClassVar[int]
BT_FIELD_NUMBER: _ClassVar[int]
BLE_BEACON_FIELD_NUMBER: _ClassVar[int]
UWB_FIELD_NUMBER: _ClassVar[int]
WIFI_FIELD_NUMBER: _ClassVar[int]
OFFSET_FIELD_NUMBER: _ClassVar[int]
kind: _common_pb2.ChipKind
id: int
name: str
manufacturer: str
product_name: str
bt: Chip.Bluetooth
ble_beacon: Chip.BleBeacon
uwb: Chip.Radio
wifi: Chip.Radio
offset: Position
def __init__(self, kind: _Optional[_Union[_common_pb2.ChipKind, str]] = ..., id: _Optional[int] = ..., name: _Optional[str] = ..., manufacturer: _Optional[str] = ..., product_name: _Optional[str] = ..., bt: _Optional[_Union[Chip.Bluetooth, _Mapping]] = ..., ble_beacon: _Optional[_Union[Chip.BleBeacon, _Mapping]] = ..., uwb: _Optional[_Union[Chip.Radio, _Mapping]] = ..., wifi: _Optional[_Union[Chip.Radio, _Mapping]] = ..., offset: _Optional[_Union[Position, _Mapping]] = ...) -> None: ...
class ChipCreate(_message.Message):
__slots__ = ("kind", "address", "name", "manufacturer", "product_name", "ble_beacon", "bt_properties")
class BleBeaconCreate(_message.Message):
__slots__ = ("address", "settings", "adv_data", "scan_response")
ADDRESS_FIELD_NUMBER: _ClassVar[int]
SETTINGS_FIELD_NUMBER: _ClassVar[int]
ADV_DATA_FIELD_NUMBER: _ClassVar[int]
SCAN_RESPONSE_FIELD_NUMBER: _ClassVar[int]
address: str
settings: Chip.BleBeacon.AdvertiseSettings
adv_data: Chip.BleBeacon.AdvertiseData
scan_response: Chip.BleBeacon.AdvertiseData
def __init__(self, address: _Optional[str] = ..., settings: _Optional[_Union[Chip.BleBeacon.AdvertiseSettings, _Mapping]] = ..., adv_data: _Optional[_Union[Chip.BleBeacon.AdvertiseData, _Mapping]] = ..., scan_response: _Optional[_Union[Chip.BleBeacon.AdvertiseData, _Mapping]] = ...) -> None: ...
KIND_FIELD_NUMBER: _ClassVar[int]
ADDRESS_FIELD_NUMBER: _ClassVar[int]
NAME_FIELD_NUMBER: _ClassVar[int]
MANUFACTURER_FIELD_NUMBER: _ClassVar[int]
PRODUCT_NAME_FIELD_NUMBER: _ClassVar[int]
BLE_BEACON_FIELD_NUMBER: _ClassVar[int]
BT_PROPERTIES_FIELD_NUMBER: _ClassVar[int]
kind: _common_pb2.ChipKind
address: str
name: str
manufacturer: str
product_name: str
ble_beacon: ChipCreate.BleBeaconCreate
bt_properties: _configuration_pb2.Controller
def __init__(self, kind: _Optional[_Union[_common_pb2.ChipKind, str]] = ..., address: _Optional[str] = ..., name: _Optional[str] = ..., manufacturer: _Optional[str] = ..., product_name: _Optional[str] = ..., ble_beacon: _Optional[_Union[ChipCreate.BleBeaconCreate, _Mapping]] = ..., bt_properties: _Optional[_Union[_configuration_pb2.Controller, _Mapping]] = ...) -> None: ...
class Device(_message.Message):
__slots__ = ("id", "name", "visible", "position", "orientation", "chips")
ID_FIELD_NUMBER: _ClassVar[int]
NAME_FIELD_NUMBER: _ClassVar[int]
VISIBLE_FIELD_NUMBER: _ClassVar[int]
POSITION_FIELD_NUMBER: _ClassVar[int]
ORIENTATION_FIELD_NUMBER: _ClassVar[int]
CHIPS_FIELD_NUMBER: _ClassVar[int]
id: int
name: str
visible: bool
position: Position
orientation: Orientation
chips: _containers.RepeatedCompositeFieldContainer[Chip]
def __init__(self, id: _Optional[int] = ..., name: _Optional[str] = ..., visible: bool = ..., position: _Optional[_Union[Position, _Mapping]] = ..., orientation: _Optional[_Union[Orientation, _Mapping]] = ..., chips: _Optional[_Iterable[_Union[Chip, _Mapping]]] = ...) -> None: ...
class DeviceCreate(_message.Message):
__slots__ = ("name", "position", "orientation", "chips")
NAME_FIELD_NUMBER: _ClassVar[int]
POSITION_FIELD_NUMBER: _ClassVar[int]
ORIENTATION_FIELD_NUMBER: _ClassVar[int]
CHIPS_FIELD_NUMBER: _ClassVar[int]
name: str
position: Position
orientation: Orientation
chips: _containers.RepeatedCompositeFieldContainer[ChipCreate]
def __init__(self, name: _Optional[str] = ..., position: _Optional[_Union[Position, _Mapping]] = ..., orientation: _Optional[_Union[Orientation, _Mapping]] = ..., chips: _Optional[_Iterable[_Union[ChipCreate, _Mapping]]] = ...) -> None: ...
class Scene(_message.Message):
__slots__ = ("devices",)
DEVICES_FIELD_NUMBER: _ClassVar[int]
devices: _containers.RepeatedCompositeFieldContainer[Device]
def __init__(self, devices: _Optional[_Iterable[_Union[Device, _Mapping]]] = ...) -> None: ...
class Capture(_message.Message):
__slots__ = ("id", "chip_kind", "device_name", "state", "size", "records", "timestamp", "valid")
ID_FIELD_NUMBER: _ClassVar[int]
CHIP_KIND_FIELD_NUMBER: _ClassVar[int]
DEVICE_NAME_FIELD_NUMBER: _ClassVar[int]
STATE_FIELD_NUMBER: _ClassVar[int]
SIZE_FIELD_NUMBER: _ClassVar[int]
RECORDS_FIELD_NUMBER: _ClassVar[int]
TIMESTAMP_FIELD_NUMBER: _ClassVar[int]
VALID_FIELD_NUMBER: _ClassVar[int]
id: int
chip_kind: _common_pb2.ChipKind
device_name: str
state: bool
size: int
records: int
timestamp: _timestamp_pb2.Timestamp
valid: bool
def __init__(self, id: _Optional[int] = ..., chip_kind: _Optional[_Union[_common_pb2.ChipKind, str]] = ..., device_name: _Optional[str] = ..., state: bool = ..., size: _Optional[int] = ..., records: _Optional[int] = ..., timestamp: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ..., valid: bool = ...) -> None: ...

View File

@@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: netsim/packet_streamer.proto
# Protobuf Python Version: 4.25.1
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
from bumble.transport.grpc_protobuf.netsim import hci_packet_pb2 as netsim_dot_hci__packet__pb2
from bumble.transport.grpc_protobuf.netsim import startup_pb2 as netsim_dot_startup__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1cnetsim/packet_streamer.proto\x12\rnetsim.packet\x1a\x17netsim/hci_packet.proto\x1a\x14netsim/startup.proto\"\x93\x01\n\rPacketRequest\x12\x30\n\x0cinitial_info\x18\x01 \x01(\x0b\x32\x18.netsim.startup.ChipInfoH\x00\x12.\n\nhci_packet\x18\x02 \x01(\x0b\x32\x18.netsim.packet.HCIPacketH\x00\x12\x10\n\x06packet\x18\x03 \x01(\x0cH\x00\x42\x0e\n\x0crequest_type\"t\n\x0ePacketResponse\x12\x0f\n\x05\x65rror\x18\x01 \x01(\tH\x00\x12.\n\nhci_packet\x18\x02 \x01(\x0b\x32\x18.netsim.packet.HCIPacketH\x00\x12\x10\n\x06packet\x18\x03 \x01(\x0cH\x00\x42\x0f\n\rresponse_type2b\n\x0ePacketStreamer\x12P\n\rStreamPackets\x12\x1c.netsim.packet.PacketRequest\x1a\x1d.netsim.packet.PacketResponse(\x01\x30\x01\x62\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'netsim.packet_streamer_pb2', _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_globals['_PACKETREQUEST']._serialized_start=95
_globals['_PACKETREQUEST']._serialized_end=242
_globals['_PACKETRESPONSE']._serialized_start=244
_globals['_PACKETRESPONSE']._serialized_end=360
_globals['_PACKETSTREAMER']._serialized_start=362
_globals['_PACKETSTREAMER']._serialized_end=460
# @@protoc_insertion_point(module_scope)

View File

@@ -1,5 +1,5 @@
from . import hci_packet_pb2 as _hci_packet_pb2
from . import startup_pb2 as _startup_pb2
from bumble.transport.grpc_protobuf.netsim import hci_packet_pb2 as _hci_packet_pb2
from bumble.transport.grpc_protobuf.netsim import startup_pb2 as _startup_pb2
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Mapping as _Mapping, Optional as _Optional, Union as _Union
@@ -7,17 +7,17 @@ from typing import ClassVar as _ClassVar, Mapping as _Mapping, Optional as _Opti
DESCRIPTOR: _descriptor.FileDescriptor
class PacketRequest(_message.Message):
__slots__ = ["hci_packet", "initial_info", "packet"]
HCI_PACKET_FIELD_NUMBER: _ClassVar[int]
__slots__ = ("initial_info", "hci_packet", "packet")
INITIAL_INFO_FIELD_NUMBER: _ClassVar[int]
HCI_PACKET_FIELD_NUMBER: _ClassVar[int]
PACKET_FIELD_NUMBER: _ClassVar[int]
hci_packet: _hci_packet_pb2.HCIPacket
initial_info: _startup_pb2.ChipInfo
hci_packet: _hci_packet_pb2.HCIPacket
packet: bytes
def __init__(self, initial_info: _Optional[_Union[_startup_pb2.ChipInfo, _Mapping]] = ..., hci_packet: _Optional[_Union[_hci_packet_pb2.HCIPacket, _Mapping]] = ..., packet: _Optional[bytes] = ...) -> None: ...
class PacketResponse(_message.Message):
__slots__ = ["error", "hci_packet", "packet"]
__slots__ = ("error", "hci_packet", "packet")
ERROR_FIELD_NUMBER: _ClassVar[int]
HCI_PACKET_FIELD_NUMBER: _ClassVar[int]
PACKET_FIELD_NUMBER: _ClassVar[int]

View File

@@ -2,7 +2,7 @@
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
from . import packet_streamer_pb2 as packet__streamer__pb2
from bumble.transport.grpc_protobuf.netsim import packet_streamer_pb2 as netsim_dot_packet__streamer__pb2
class PacketStreamerStub(object):
@@ -30,8 +30,8 @@ class PacketStreamerStub(object):
"""
self.StreamPackets = channel.stream_stream(
'/netsim.packet.PacketStreamer/StreamPackets',
request_serializer=packet__streamer__pb2.PacketRequest.SerializeToString,
response_deserializer=packet__streamer__pb2.PacketResponse.FromString,
request_serializer=netsim_dot_packet__streamer__pb2.PacketRequest.SerializeToString,
response_deserializer=netsim_dot_packet__streamer__pb2.PacketResponse.FromString,
)
@@ -64,8 +64,8 @@ def add_PacketStreamerServicer_to_server(servicer, server):
rpc_method_handlers = {
'StreamPackets': grpc.stream_stream_rpc_method_handler(
servicer.StreamPackets,
request_deserializer=packet__streamer__pb2.PacketRequest.FromString,
response_serializer=packet__streamer__pb2.PacketResponse.SerializeToString,
request_deserializer=netsim_dot_packet__streamer__pb2.PacketRequest.FromString,
response_serializer=netsim_dot_packet__streamer__pb2.PacketResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
@@ -103,7 +103,7 @@ class PacketStreamer(object):
timeout=None,
metadata=None):
return grpc.experimental.stream_stream(request_iterator, target, '/netsim.packet.PacketStreamer/StreamPackets',
packet__streamer__pb2.PacketRequest.SerializeToString,
packet__streamer__pb2.PacketResponse.FromString,
netsim_dot_packet__streamer__pb2.PacketRequest.SerializeToString,
netsim_dot_packet__streamer__pb2.PacketResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

View File

@@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: netsim/startup.proto
# Protobuf Python Version: 4.25.1
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
from bumble.transport.grpc_protobuf.netsim import common_pb2 as netsim_dot_common__pb2
from bumble.transport.grpc_protobuf.netsim import model_pb2 as netsim_dot_model__pb2
from bumble.transport.grpc_protobuf.rootcanal import configuration_pb2 as rootcanal_dot_configuration__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x14netsim/startup.proto\x12\x0enetsim.startup\x1a\x13netsim/common.proto\x1a\x12netsim/model.proto\x1a\x1drootcanal/configuration.proto\"\xb4\x01\n\x0bStartupInfo\x12\x33\n\x07\x64\x65vices\x18\x01 \x03(\x0b\x32\".netsim.startup.StartupInfo.Device\x1ap\n\x06\x44\x65vice\x12\x10\n\x04name\x18\x01 \x01(\tB\x02\x18\x01\x12#\n\x05\x63hips\x18\x02 \x03(\x0b\x32\x14.netsim.startup.Chip\x12/\n\x0b\x64\x65vice_info\x18\x03 \x01(\x0b\x32\x1a.netsim.startup.DeviceInfo\"q\n\x08\x43hipInfo\x12\x10\n\x04name\x18\x01 \x01(\tB\x02\x18\x01\x12\"\n\x04\x63hip\x18\x02 \x01(\x0b\x32\x14.netsim.startup.Chip\x12/\n\x0b\x64\x65vice_info\x18\x03 \x01(\x0b\x32\x1a.netsim.startup.DeviceInfo\"\x7f\n\nDeviceInfo\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0c\n\x04kind\x18\x02 \x01(\t\x12\x0f\n\x07version\x18\x03 \x01(\t\x12\x13\n\x0bsdk_version\x18\x04 \x01(\t\x12\x10\n\x08\x62uild_id\x18\x05 \x01(\t\x12\x0f\n\x07variant\x18\x06 \x01(\t\x12\x0c\n\x04\x61rch\x18\x07 \x01(\t\"\x9b\x02\n\x04\x43hip\x12%\n\x04kind\x18\x01 \x01(\x0e\x32\x17.netsim.common.ChipKind\x12\n\n\x02id\x18\x02 \x01(\t\x12\x14\n\x0cmanufacturer\x18\x03 \x01(\t\x12\x14\n\x0cproduct_name\x18\x04 \x01(\t\x12\r\n\x05\x66\x64_in\x18\x05 \x01(\x05\x12\x0e\n\x06\x66\x64_out\x18\x06 \x01(\x05\x12\x10\n\x08loopback\x18\x07 \x01(\x08\x12:\n\rbt_properties\x18\x08 \x01(\x0b\x32#.rootcanal.configuration.Controller\x12\x0f\n\x07\x61\x64\x64ress\x18\t \x01(\t\x12+\n\x06offset\x18\n \x01(\x0b\x32\x16.netsim.model.PositionH\x00\x88\x01\x01\x42\t\n\x07_offsetb\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'netsim.startup_pb2', _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_globals['_STARTUPINFO_DEVICE'].fields_by_name['name']._options = None
_globals['_STARTUPINFO_DEVICE'].fields_by_name['name']._serialized_options = b'\030\001'
_globals['_CHIPINFO'].fields_by_name['name']._options = None
_globals['_CHIPINFO'].fields_by_name['name']._serialized_options = b'\030\001'
_globals['_STARTUPINFO']._serialized_start=113
_globals['_STARTUPINFO']._serialized_end=293
_globals['_STARTUPINFO_DEVICE']._serialized_start=181
_globals['_STARTUPINFO_DEVICE']._serialized_end=293
_globals['_CHIPINFO']._serialized_start=295
_globals['_CHIPINFO']._serialized_end=408
_globals['_DEVICEINFO']._serialized_start=410
_globals['_DEVICEINFO']._serialized_end=537
_globals['_CHIP']._serialized_start=540
_globals['_CHIP']._serialized_end=823
# @@protoc_insertion_point(module_scope)

View File

@@ -0,0 +1,76 @@
from bumble.transport.grpc_protobuf.netsim import common_pb2 as _common_pb2
from bumble.transport.grpc_protobuf.netsim import model_pb2 as _model_pb2
from bumble.transport.grpc_protobuf.rootcanal import configuration_pb2 as _configuration_pb2
from google.protobuf.internal import containers as _containers
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union
DESCRIPTOR: _descriptor.FileDescriptor
class StartupInfo(_message.Message):
__slots__ = ("devices",)
class Device(_message.Message):
__slots__ = ("name", "chips", "device_info")
NAME_FIELD_NUMBER: _ClassVar[int]
CHIPS_FIELD_NUMBER: _ClassVar[int]
DEVICE_INFO_FIELD_NUMBER: _ClassVar[int]
name: str
chips: _containers.RepeatedCompositeFieldContainer[Chip]
device_info: DeviceInfo
def __init__(self, name: _Optional[str] = ..., chips: _Optional[_Iterable[_Union[Chip, _Mapping]]] = ..., device_info: _Optional[_Union[DeviceInfo, _Mapping]] = ...) -> None: ...
DEVICES_FIELD_NUMBER: _ClassVar[int]
devices: _containers.RepeatedCompositeFieldContainer[StartupInfo.Device]
def __init__(self, devices: _Optional[_Iterable[_Union[StartupInfo.Device, _Mapping]]] = ...) -> None: ...
class ChipInfo(_message.Message):
__slots__ = ("name", "chip", "device_info")
NAME_FIELD_NUMBER: _ClassVar[int]
CHIP_FIELD_NUMBER: _ClassVar[int]
DEVICE_INFO_FIELD_NUMBER: _ClassVar[int]
name: str
chip: Chip
device_info: DeviceInfo
def __init__(self, name: _Optional[str] = ..., chip: _Optional[_Union[Chip, _Mapping]] = ..., device_info: _Optional[_Union[DeviceInfo, _Mapping]] = ...) -> None: ...
class DeviceInfo(_message.Message):
__slots__ = ("name", "kind", "version", "sdk_version", "build_id", "variant", "arch")
NAME_FIELD_NUMBER: _ClassVar[int]
KIND_FIELD_NUMBER: _ClassVar[int]
VERSION_FIELD_NUMBER: _ClassVar[int]
SDK_VERSION_FIELD_NUMBER: _ClassVar[int]
BUILD_ID_FIELD_NUMBER: _ClassVar[int]
VARIANT_FIELD_NUMBER: _ClassVar[int]
ARCH_FIELD_NUMBER: _ClassVar[int]
name: str
kind: str
version: str
sdk_version: str
build_id: str
variant: str
arch: str
def __init__(self, name: _Optional[str] = ..., kind: _Optional[str] = ..., version: _Optional[str] = ..., sdk_version: _Optional[str] = ..., build_id: _Optional[str] = ..., variant: _Optional[str] = ..., arch: _Optional[str] = ...) -> None: ...
class Chip(_message.Message):
__slots__ = ("kind", "id", "manufacturer", "product_name", "fd_in", "fd_out", "loopback", "bt_properties", "address", "offset")
KIND_FIELD_NUMBER: _ClassVar[int]
ID_FIELD_NUMBER: _ClassVar[int]
MANUFACTURER_FIELD_NUMBER: _ClassVar[int]
PRODUCT_NAME_FIELD_NUMBER: _ClassVar[int]
FD_IN_FIELD_NUMBER: _ClassVar[int]
FD_OUT_FIELD_NUMBER: _ClassVar[int]
LOOPBACK_FIELD_NUMBER: _ClassVar[int]
BT_PROPERTIES_FIELD_NUMBER: _ClassVar[int]
ADDRESS_FIELD_NUMBER: _ClassVar[int]
OFFSET_FIELD_NUMBER: _ClassVar[int]
kind: _common_pb2.ChipKind
id: str
manufacturer: str
product_name: str
fd_in: int
fd_out: int
loopback: bool
bt_properties: _configuration_pb2.Controller
address: str
offset: _model_pb2.Position
def __init__(self, kind: _Optional[_Union[_common_pb2.ChipKind, str]] = ..., id: _Optional[str] = ..., manufacturer: _Optional[str] = ..., product_name: _Optional[str] = ..., fd_in: _Optional[int] = ..., fd_out: _Optional[int] = ..., loopback: bool = ..., bt_properties: _Optional[_Union[_configuration_pb2.Controller, _Mapping]] = ..., address: _Optional[str] = ..., offset: _Optional[_Union[_model_pb2.Position, _Mapping]] = ...) -> None: ...

View File

@@ -0,0 +1,4 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

View File

@@ -1,31 +0,0 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: packet_streamer.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
from . import hci_packet_pb2 as hci__packet__pb2
from . import startup_pb2 as startup__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x15packet_streamer.proto\x12\rnetsim.packet\x1a\x10hci_packet.proto\x1a\rstartup.proto\"\x93\x01\n\rPacketRequest\x12\x30\n\x0cinitial_info\x18\x01 \x01(\x0b\x32\x18.netsim.startup.ChipInfoH\x00\x12.\n\nhci_packet\x18\x02 \x01(\x0b\x32\x18.netsim.packet.HCIPacketH\x00\x12\x10\n\x06packet\x18\x03 \x01(\x0cH\x00\x42\x0e\n\x0crequest_type\"t\n\x0ePacketResponse\x12\x0f\n\x05\x65rror\x18\x01 \x01(\tH\x00\x12.\n\nhci_packet\x18\x02 \x01(\x0b\x32\x18.netsim.packet.HCIPacketH\x00\x12\x10\n\x06packet\x18\x03 \x01(\x0cH\x00\x42\x0f\n\rresponse_type2b\n\x0ePacketStreamer\x12P\n\rStreamPackets\x12\x1c.netsim.packet.PacketRequest\x1a\x1d.netsim.packet.PacketResponse(\x01\x30\x01\x62\x06proto3')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'packet_streamer_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_PACKETREQUEST._serialized_start=74
_PACKETREQUEST._serialized_end=221
_PACKETRESPONSE._serialized_start=223
_PACKETRESPONSE._serialized_end=339
_PACKETSTREAMER._serialized_start=341
_PACKETSTREAMER._serialized_end=439
# @@protoc_insertion_point(module_scope)

View File

@@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: rootcanal/configuration.proto
# Protobuf Python Version: 4.25.1
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1drootcanal/configuration.proto\x12\x17rootcanal.configuration\"\xbc\x01\n\x12\x43ontrollerFeatures\x12\x1f\n\x17le_extended_advertising\x18\x01 \x01(\x08\x12\x1f\n\x17le_periodic_advertising\x18\x02 \x01(\x08\x12\x12\n\nll_privacy\x18\x03 \x01(\x08\x12\x11\n\tle_2m_phy\x18\x04 \x01(\x08\x12\x14\n\x0cle_coded_phy\x18\x05 \x01(\x08\x12\'\n\x1fle_connected_isochronous_stream\x18\x06 \x01(\x08\"\x8d\x01\n\x10\x43ontrollerQuirks\x12\x30\n(send_acl_data_before_connection_complete\x18\x01 \x01(\x08\x12\"\n\x1ahas_default_random_address\x18\x02 \x01(\x08\x12#\n\x1bhardware_error_before_reset\x18\x03 \x01(\x08\".\n\x0eVendorFeatures\x12\x0b\n\x03\x63sr\x18\x01 \x01(\x08\x12\x0f\n\x07\x61ndroid\x18\x02 \x01(\x08\"\x8a\x02\n\nController\x12\x39\n\x06preset\x18\x01 \x01(\x0e\x32).rootcanal.configuration.ControllerPreset\x12=\n\x08\x66\x65\x61tures\x18\x02 \x01(\x0b\x32+.rootcanal.configuration.ControllerFeatures\x12\x39\n\x06quirks\x18\x03 \x01(\x0b\x32).rootcanal.configuration.ControllerQuirks\x12\x0e\n\x06strict\x18\x04 \x01(\x08\x12\x37\n\x06vendor\x18\x05 \x01(\x0b\x32\'.rootcanal.configuration.VendorFeatures\"Y\n\tTcpServer\x12\x10\n\x08tcp_port\x18\x01 \x02(\x05\x12:\n\rconfiguration\x18\x02 \x01(\x0b\x32#.rootcanal.configuration.Controller\"G\n\rConfiguration\x12\x36\n\ntcp_server\x18\x01 \x03(\x0b\x32\".rootcanal.configuration.TcpServer*H\n\x10\x43ontrollerPreset\x12\x0b\n\x07\x44\x45\x46\x41ULT\x10\x00\x12\x0f\n\x0bLAIRD_BL654\x10\x01\x12\x16\n\x12\x43SR_RCK_PTS_DONGLE\x10\x02\x42\x02H\x02')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'rootcanal.configuration_pb2', _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
_globals['DESCRIPTOR']._options = None
_globals['DESCRIPTOR']._serialized_options = b'H\002'
_globals['_CONTROLLERPRESET']._serialized_start=874
_globals['_CONTROLLERPRESET']._serialized_end=946
_globals['_CONTROLLERFEATURES']._serialized_start=59
_globals['_CONTROLLERFEATURES']._serialized_end=247
_globals['_CONTROLLERQUIRKS']._serialized_start=250
_globals['_CONTROLLERQUIRKS']._serialized_end=391
_globals['_VENDORFEATURES']._serialized_start=393
_globals['_VENDORFEATURES']._serialized_end=439
_globals['_CONTROLLER']._serialized_start=442
_globals['_CONTROLLER']._serialized_end=708
_globals['_TCPSERVER']._serialized_start=710
_globals['_TCPSERVER']._serialized_end=799
_globals['_CONFIGURATION']._serialized_start=801
_globals['_CONFIGURATION']._serialized_end=872
# @@protoc_insertion_point(module_scope)

View File

@@ -0,0 +1,78 @@
from google.protobuf.internal import containers as _containers
from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union
DESCRIPTOR: _descriptor.FileDescriptor
class ControllerPreset(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = ()
DEFAULT: _ClassVar[ControllerPreset]
LAIRD_BL654: _ClassVar[ControllerPreset]
CSR_RCK_PTS_DONGLE: _ClassVar[ControllerPreset]
DEFAULT: ControllerPreset
LAIRD_BL654: ControllerPreset
CSR_RCK_PTS_DONGLE: ControllerPreset
class ControllerFeatures(_message.Message):
__slots__ = ("le_extended_advertising", "le_periodic_advertising", "ll_privacy", "le_2m_phy", "le_coded_phy", "le_connected_isochronous_stream")
LE_EXTENDED_ADVERTISING_FIELD_NUMBER: _ClassVar[int]
LE_PERIODIC_ADVERTISING_FIELD_NUMBER: _ClassVar[int]
LL_PRIVACY_FIELD_NUMBER: _ClassVar[int]
LE_2M_PHY_FIELD_NUMBER: _ClassVar[int]
LE_CODED_PHY_FIELD_NUMBER: _ClassVar[int]
LE_CONNECTED_ISOCHRONOUS_STREAM_FIELD_NUMBER: _ClassVar[int]
le_extended_advertising: bool
le_periodic_advertising: bool
ll_privacy: bool
le_2m_phy: bool
le_coded_phy: bool
le_connected_isochronous_stream: bool
def __init__(self, le_extended_advertising: bool = ..., le_periodic_advertising: bool = ..., ll_privacy: bool = ..., le_2m_phy: bool = ..., le_coded_phy: bool = ..., le_connected_isochronous_stream: bool = ...) -> None: ...
class ControllerQuirks(_message.Message):
__slots__ = ("send_acl_data_before_connection_complete", "has_default_random_address", "hardware_error_before_reset")
SEND_ACL_DATA_BEFORE_CONNECTION_COMPLETE_FIELD_NUMBER: _ClassVar[int]
HAS_DEFAULT_RANDOM_ADDRESS_FIELD_NUMBER: _ClassVar[int]
HARDWARE_ERROR_BEFORE_RESET_FIELD_NUMBER: _ClassVar[int]
send_acl_data_before_connection_complete: bool
has_default_random_address: bool
hardware_error_before_reset: bool
def __init__(self, send_acl_data_before_connection_complete: bool = ..., has_default_random_address: bool = ..., hardware_error_before_reset: bool = ...) -> None: ...
class VendorFeatures(_message.Message):
__slots__ = ("csr", "android")
CSR_FIELD_NUMBER: _ClassVar[int]
ANDROID_FIELD_NUMBER: _ClassVar[int]
csr: bool
android: bool
def __init__(self, csr: bool = ..., android: bool = ...) -> None: ...
class Controller(_message.Message):
__slots__ = ("preset", "features", "quirks", "strict", "vendor")
PRESET_FIELD_NUMBER: _ClassVar[int]
FEATURES_FIELD_NUMBER: _ClassVar[int]
QUIRKS_FIELD_NUMBER: _ClassVar[int]
STRICT_FIELD_NUMBER: _ClassVar[int]
VENDOR_FIELD_NUMBER: _ClassVar[int]
preset: ControllerPreset
features: ControllerFeatures
quirks: ControllerQuirks
strict: bool
vendor: VendorFeatures
def __init__(self, preset: _Optional[_Union[ControllerPreset, str]] = ..., features: _Optional[_Union[ControllerFeatures, _Mapping]] = ..., quirks: _Optional[_Union[ControllerQuirks, _Mapping]] = ..., strict: bool = ..., vendor: _Optional[_Union[VendorFeatures, _Mapping]] = ...) -> None: ...
class TcpServer(_message.Message):
__slots__ = ("tcp_port", "configuration")
TCP_PORT_FIELD_NUMBER: _ClassVar[int]
CONFIGURATION_FIELD_NUMBER: _ClassVar[int]
tcp_port: int
configuration: Controller
def __init__(self, tcp_port: _Optional[int] = ..., configuration: _Optional[_Union[Controller, _Mapping]] = ...) -> None: ...
class Configuration(_message.Message):
__slots__ = ("tcp_server",)
TCP_SERVER_FIELD_NUMBER: _ClassVar[int]
tcp_server: _containers.RepeatedCompositeFieldContainer[TcpServer]
def __init__(self, tcp_server: _Optional[_Iterable[_Union[TcpServer, _Mapping]]] = ...) -> None: ...

View File

@@ -0,0 +1,4 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

View File

@@ -1,32 +0,0 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: startup.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
from . import common_pb2 as common__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\rstartup.proto\x12\x0enetsim.startup\x1a\x0c\x63ommon.proto\"\x7f\n\x0bStartupInfo\x12\x33\n\x07\x64\x65vices\x18\x01 \x03(\x0b\x32\".netsim.startup.StartupInfo.Device\x1a;\n\x06\x44\x65vice\x12\x0c\n\x04name\x18\x01 \x01(\t\x12#\n\x05\x63hips\x18\x02 \x03(\x0b\x32\x14.netsim.startup.Chip\"<\n\x08\x43hipInfo\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\"\n\x04\x63hip\x18\x02 \x01(\x0b\x32\x14.netsim.startup.Chip\"\x96\x01\n\x04\x43hip\x12%\n\x04kind\x18\x01 \x01(\x0e\x32\x17.netsim.common.ChipKind\x12\n\n\x02id\x18\x02 \x01(\t\x12\x14\n\x0cmanufacturer\x18\x03 \x01(\t\x12\x14\n\x0cproduct_name\x18\x04 \x01(\t\x12\r\n\x05\x66\x64_in\x18\x05 \x01(\x05\x12\x0e\n\x06\x66\x64_out\x18\x06 \x01(\x05\x12\x10\n\x08loopback\x18\x07 \x01(\x08\x62\x06proto3')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'startup_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_STARTUPINFO._serialized_start=47
_STARTUPINFO._serialized_end=174
_STARTUPINFO_DEVICE._serialized_start=115
_STARTUPINFO_DEVICE._serialized_end=174
_CHIPINFO._serialized_start=176
_CHIPINFO._serialized_end=236
_CHIP._serialized_start=239
_CHIP._serialized_end=389
# @@protoc_insertion_point(module_scope)

View File

@@ -1,46 +0,0 @@
from . import common_pb2 as _common_pb2
from google.protobuf.internal import containers as _containers
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union
DESCRIPTOR: _descriptor.FileDescriptor
class Chip(_message.Message):
__slots__ = ["fd_in", "fd_out", "id", "kind", "loopback", "manufacturer", "product_name"]
FD_IN_FIELD_NUMBER: _ClassVar[int]
FD_OUT_FIELD_NUMBER: _ClassVar[int]
ID_FIELD_NUMBER: _ClassVar[int]
KIND_FIELD_NUMBER: _ClassVar[int]
LOOPBACK_FIELD_NUMBER: _ClassVar[int]
MANUFACTURER_FIELD_NUMBER: _ClassVar[int]
PRODUCT_NAME_FIELD_NUMBER: _ClassVar[int]
fd_in: int
fd_out: int
id: str
kind: _common_pb2.ChipKind
loopback: bool
manufacturer: str
product_name: str
def __init__(self, kind: _Optional[_Union[_common_pb2.ChipKind, str]] = ..., id: _Optional[str] = ..., manufacturer: _Optional[str] = ..., product_name: _Optional[str] = ..., fd_in: _Optional[int] = ..., fd_out: _Optional[int] = ..., loopback: bool = ...) -> None: ...
class ChipInfo(_message.Message):
__slots__ = ["chip", "name"]
CHIP_FIELD_NUMBER: _ClassVar[int]
NAME_FIELD_NUMBER: _ClassVar[int]
chip: Chip
name: str
def __init__(self, name: _Optional[str] = ..., chip: _Optional[_Union[Chip, _Mapping]] = ...) -> None: ...
class StartupInfo(_message.Message):
__slots__ = ["devices"]
class Device(_message.Message):
__slots__ = ["chips", "name"]
CHIPS_FIELD_NUMBER: _ClassVar[int]
NAME_FIELD_NUMBER: _ClassVar[int]
chips: _containers.RepeatedCompositeFieldContainer[Chip]
name: str
def __init__(self, name: _Optional[str] = ..., chips: _Optional[_Iterable[_Union[Chip, _Mapping]]] = ...) -> None: ...
DEVICES_FIELD_NUMBER: _ClassVar[int]
devices: _containers.RepeatedCompositeFieldContainer[StartupInfo.Device]
def __init__(self, devices: _Optional[_Iterable[_Union[StartupInfo.Device, _Mapping]]] = ...) -> None: ...

View File

@@ -149,7 +149,10 @@ async def open_usb_transport(spec: str) -> Transport:
if status != usb1.TRANSFER_COMPLETED:
logger.warning(
color(f'!!! OUT transfer not completed: status={status}', 'red')
color(
f'!!! OUT transfer not completed: status={status}',
'red',
)
)
async def process_queue(self):
@@ -275,7 +278,10 @@ async def open_usb_transport(spec: str) -> Transport:
)
else:
logger.warning(
color(f'!!! IN transfer not completed: status={status}', 'red')
color(
f'!!! IN[{packet_type}] transfer not completed: status={status}',
'red',
)
)
self.loop.call_soon_threadsafe(self.on_transport_lost)

View File

@@ -16,6 +16,7 @@
# Imports
# -----------------------------------------------------------------------------
import struct
from typing import Dict, Optional, Type
from bumble.hci import (
name_or_number,
@@ -24,7 +25,9 @@ from bumble.hci import (
HCI_Constant,
HCI_Object,
HCI_Command,
HCI_Vendor_Event,
HCI_Event,
HCI_Extended_Event,
HCI_VENDOR_EVENT,
STATUS_SPEC,
)
@@ -48,7 +51,6 @@ HCI_DYNAMIC_AUDIO_BUFFER_COMMAND = hci_vendor_command_op_code(0x15F)
HCI_BLUETOOTH_QUALITY_REPORT_EVENT = 0x58
HCI_Command.register_commands(globals())
HCI_Vendor_Event.register_subevents(globals())
# -----------------------------------------------------------------------------
@@ -279,7 +281,29 @@ class HCI_Dynamic_Audio_Buffer_Command(HCI_Command):
# -----------------------------------------------------------------------------
@HCI_Vendor_Event.event(
class HCI_Android_Vendor_Event(HCI_Extended_Event):
event_code: int = HCI_VENDOR_EVENT
subevent_classes: Dict[int, Type[HCI_Extended_Event]] = {}
@classmethod
def subclass_from_parameters(
cls, parameters: bytes
) -> Optional[HCI_Extended_Event]:
subevent_code = parameters[0]
if subevent_code == HCI_BLUETOOTH_QUALITY_REPORT_EVENT:
quality_report_id = parameters[1]
if quality_report_id in (0x01, 0x02, 0x03, 0x04, 0x07, 0x08, 0x09):
return HCI_Bluetooth_Quality_Report_Event.from_parameters(parameters)
return None
HCI_Android_Vendor_Event.register_subevents(globals())
HCI_Event.vendor_factory = HCI_Android_Vendor_Event.subclass_from_parameters
# -----------------------------------------------------------------------------
@HCI_Extended_Event.event(
fields=[
('quality_report_id', 1),
('packet_types', 1),
@@ -308,10 +332,11 @@ class HCI_Dynamic_Audio_Buffer_Command(HCI_Command):
('tx_last_subevent_packets', 4),
('crc_error_packets', 4),
('rx_duplicate_packets', 4),
('rx_unreceived_packets', 4),
('vendor_specific_parameters', '*'),
]
)
class HCI_Bluetooth_Quality_Report_Event(HCI_Vendor_Event):
class HCI_Bluetooth_Quality_Report_Event(HCI_Android_Vendor_Event):
# pylint: disable=line-too-long
'''
See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#bluetooth-quality-report-sub-event

View File

@@ -11,32 +11,44 @@ Usage: bumble-bench [OPTIONS] COMMAND [ARGS]...
Options:
--device-config FILENAME Device configuration file
--role [sender|receiver|ping|pong]
--scenario [send|receive|ping|pong]
--mode [gatt-client|gatt-server|l2cap-client|l2cap-server|rfcomm-client|rfcomm-server]
--att-mtu MTU GATT MTU (gatt-client mode) [23<=x<=517]
--extended-data-length TEXT Request a data length upon connection,
specified as tx_octets/tx_time
--rfcomm-channel INTEGER RFComm channel to use
--role-switch [central|peripheral]
Request role switch upon connection (central
or peripheral)
--rfcomm-channel INTEGER RFComm channel to use (specify 0 for channel
discovery via SDP)
--rfcomm-uuid TEXT RFComm service UUID to use (ignored if
--rfcomm-channel is not 0)
--rfcomm-l2cap-mtu INTEGER RFComm L2CAP MTU
--rfcomm-max-frame-size INTEGER
RFComm maximum frame size
--rfcomm-initial-credits INTEGER
RFComm initial credits
--rfcomm-max-credits INTEGER RFComm max credits
--rfcomm-credits-threshold INTEGER
RFComm credits threshold
--l2cap-psm INTEGER L2CAP PSM to use
--l2cap-mtu INTEGER L2CAP MTU to use
--l2cap-mps INTEGER L2CAP MPS to use
--l2cap-max-credits INTEGER L2CAP maximum number of credits allowed for
the peer
-s, --packet-size SIZE Packet size (client or ping role)
[8<=x<=4096]
-c, --packet-count COUNT Packet count (client or ping role)
-sd, --start-delay SECONDS Start delay (client or ping role)
--repeat N Repeat the run N times (client and ping
roles)(0, which is the fault, to run just
-s, --packet-size SIZE Packet size (send or ping scenario)
[8<=x<=8192]
-c, --packet-count COUNT Packet count (send or ping scenario)
-sd, --start-delay SECONDS Start delay (send or ping scenario)
--repeat N Repeat the run N times (send and ping
scenario)(0, which is the fault, to run just
once)
--repeat-delay SECONDS Delay, in seconds, between repeats
--pace MILLISECONDS Wait N milliseconds between packets (0,
which is the fault, to send as fast as
possible)
--linger Don't exit at the end of a run (server and
pong roles)
--linger Don't exit at the end of a run (receive and
pong scenarios)
--help Show this message and exit.
Commands:
@@ -71,19 +83,19 @@ using the ``--peripheral`` option. The address will be printed by the Peripheral
it starts.
Independently of whether the device is the Central or Peripheral, each device selects a
``mode`` and and ``role`` to run as. The ``mode`` and ``role`` of the Central and Peripheral
``mode`` and and ``scenario`` to run as. The ``mode`` and ``scenario`` of the Central and Peripheral
must be compatible.
Device 1 mode | Device 2 mode
Device 1 scenario | Device 2 scenario
------------------|------------------
``gatt-client`` | ``gatt-server``
``l2cap-client`` | ``l2cap-server``
``rfcomm-client`` | ``rfcomm-server``
Device 1 role | Device 2 role
--------------|--------------
``sender`` | ``receiver``
``ping`` | ``pong``
Device 1 scenario | Device 2 scenario
------------------|--------------
``send`` | ``receive``
``ping`` | ``pong``
# Examples
@@ -92,7 +104,7 @@ In the following examples, we have two USB Bluetooth controllers, one on `usb:0`
the other on `usb:1`, and two consoles/terminals. We will run a command in each.
!!! example "GATT Throughput"
Using the default mode and role for the Central and Peripheral.
Using the default mode and scenario for the Central and Peripheral.
In the first console/terminal:
```
@@ -137,12 +149,12 @@ the other on `usb:1`, and two consoles/terminals. We will run a command in each.
!!! example "Ping/Pong Latency"
In the first console/terminal:
```
$ bumble-bench --role pong peripheral usb:0
$ bumble-bench --scenario pong peripheral usb:0
```
In the second console/terminal:
```
$ bumble-bench --role ping central usb:1
$ bumble-bench --scenario ping central usb:1
```
!!! example "Reversed modes with GATT and custom connection interval"
@@ -167,13 +179,13 @@ the other on `usb:1`, and two consoles/terminals. We will run a command in each.
$ bumble-bench --mode l2cap-server central --phy 2m usb:1
```
!!! example "Reversed roles with L2CAP"
!!! example "Reversed scenarios with L2CAP"
In the first console/terminal:
```
$ bumble-bench --mode l2cap-client --role sender peripheral usb:0
$ bumble-bench --mode l2cap-client --scenario send peripheral usb:0
```
In the second console/terminal:
```
$ bumble-bench --mode l2cap-server --role receiver central usb:1
$ bumble-bench --mode l2cap-server --scenario receive central usb:1
```

View File

@@ -16,4 +16,5 @@ USB vendor ID and product ID.
Drivers included in the module are:
* [Realtek](realtek.md): Loading of Firmware and Config for Realtek USB dongles.
* [Realtek](realtek.md): Loading of Firmware and Config for Realtek USB dongles.
* [Intel](intel.md): Loading of Firmware and Config for Intel USB controllers.

View File

@@ -0,0 +1,73 @@
INTEL DRIVER
==============
This driver supports loading firmware images and optional config data to
Intel USB controllers.
A number of USB dongles are supported, but likely not all.
The initial implementation has been tested on BE200 and AX210 controllers.
When using a USB controller, the USB product ID and vendor ID are used
to find whether a matching set of firmware image and config data
is needed for that specific model. If a match exists, the driver will try
load the firmware image and, if needed, config data.
Alternatively, the metadata property ``driver=intel`` may be specified in a transport
name to force that driver to be used (ex: ``usb:[driver=intel]0`` instead of just
``usb:0`` for the first USB device).
The driver will look for the firmware and config files by name, in order, in:
* The directory specified by the environment variable `BUMBLE_INTEL_FIRMWARE_DIR`
if set.
* The directory `<package-dir>/drivers/intel_fw` where `<package-dir>` is the directory
where the `bumble` package is installed.
* The current directory.
It is also possible to override or extend the config data with parameters passed via the
transport name. The driver name `intel` may be suffixed with `/<param:value>[+<param:value>]...`
The supported params are:
* `ddc_addon`: configuration data to add to the data loaded from the config data file
* `ddc_override`: configuration data to use instead of the data loaded from the config data file.
With both `dcc_addon` and `dcc_override`, the param value is a hex-encoded byte array containing
the config data (same format as the config file).
Example transport name:
`usb:[driver=intel/dcc_addon:03E40200]0`
Obtaining Firmware Images and Config Data
-----------------------------------------
Firmware images and config data may be obtained from a variety of online
sources.
To facilitate finding a downloading the, the utility program `bumble-intel-fw-download`
may be used.
```
Usage: bumble-intel-fw-download [OPTIONS]
Download Intel firmware images and configs.
Options:
--output-dir TEXT Output directory where the files will be saved.
Defaults to the OS-specific app data dir, which the
driver will check when trying to find firmware
--source [linux-kernel] [default: linux-kernel]
--single TEXT Only download a single image set, by its base name
--force Overwrite files if they already exist
--help Show this message and exit.
```
Utility
-------
The `bumble-intel-util` utility may be used to interact with an Intel USB controller.
```
Usage: bumble-intel-util [OPTIONS] COMMAND [ARGS]...
Options:
--help Show this message and exit.
Commands:
bootloader Reboot in bootloader mode.
info Get the firmware info.
load Load a firmware image.
```

View File

@@ -3,9 +3,7 @@ GETTING STARTED WITH BUMBLE
# Prerequisites
You need Python 3.8 or above. Python >= 3.9 is recommended, but 3.8 should be sufficient if
necessary (there may be some optional functionality that will not work on some platforms with
python 3.8).
You need Python 3.9 or above.
Visit the [Python site](https://www.python.org/) for instructions on how to install Python
for your platform.
Throughout the documentation, when shell commands are shown, it is assumed that you can

View File

@@ -31,7 +31,7 @@ Some of the configurations that may be useful:
See the [use cases page](use_cases/index.md) for more use cases.
The project is implemented in Python (Python >= 3.8 is required). A number of APIs for functionality that is inherently I/O bound is implemented in terms of python coroutines with async IO. This means that all of the concurrent tasks run in the same thread, which makes everything much simpler and more predictable.
The project is implemented in Python (Python >= 3.9 is required). A number of APIs for functionality that is inherently I/O bound is implemented in terms of python coroutines with async IO. This means that all of the concurrent tasks run in the same thread, which makes everything much simpler and more predictable.
![layers](images/bumble_layers.svg)

View File

@@ -1,7 +1,7 @@
PLATFORMS
=========
Most of the code included in the project should run on any platform that supports Python >= 3.8. Not all features are supported on all platforms (for example, USB dongle support is only available on platforms where the python USB library is functional).
Most of the code included in the project should run on any platform that supports Python >= 3.9. Not all features are supported on all platforms (for example, USB dongle support is only available on platforms where the python USB library is functional).
For platform-specific information, see the following pages:

View File

@@ -4,6 +4,6 @@ channels:
- conda-forge
dependencies:
- pip=23
- python=3.8
- python=3.9
- pip:
- --editable .[development,documentation,test]

View File

@@ -0,0 +1,47 @@
from mobly import base_test
from mobly import test_runner
from mobly.controllers import android_device
class OneDeviceBenchTest(base_test.BaseTestClass):
def setup_class(self):
self.ads = self.register_controller(android_device)
self.dut = self.ads[0]
self.dut.load_snippet("bench", "com.github.google.bumble.btbench")
def test_rfcomm_client_ping(self):
runner = self.dut.bench.runRfcommClient(
"ping", "DC:E5:5B:E5:51:2C", 100, 970, 100
)
print("### Initial status:", runner)
final_status = self.dut.bench.waitForRunnerCompletion(runner["id"])
print("### Final status:", final_status)
def test_rfcomm_client_send(self):
runner = self.dut.bench.runRfcommClient(
"send", "DC:E5:5B:E5:51:2C", 100, 970, 0
)
print("### Initial status:", runner)
final_status = self.dut.bench.waitForRunnerCompletion(runner["id"])
print("### Final status:", final_status)
def test_l2cap_client_ping(self):
runner = self.dut.bench.runL2capClient(
"ping", "4B:2A:67:76:2B:E3", 128, True, 100, 970, 100
)
print("### Initial status:", runner)
final_status = self.dut.bench.waitForRunnerCompletion(runner["id"])
print("### Final status:", final_status)
def test_l2cap_client_send(self):
runner = self.dut.bench.runL2capClient(
"send", "7E:90:D0:F2:7A:11", 131, True, 100, 970, 0
)
print("### Initial status:", runner)
final_status = self.dut.bench.waitForRunnerCompletion(runner["id"])
print("### Final status:", final_status)
if __name__ == "__main__":
test_runner.main()

View File

@@ -0,0 +1,9 @@
TestBeds:
- Name: BenchTestBed
Controllers:
AndroidDevice:
- serial: 37211FDJG000DJ
local_bt_address: 94:45:60:5E:03:B0
- serial: 23071FDEE001F7
local_bt_address: DC:E5:5B:E5:51:2C

View File

@@ -0,0 +1,38 @@
import time
from mobly import base_test
from mobly import test_runner
from mobly.controllers import android_device
class TwoDevicesBenchTest(base_test.BaseTestClass):
def setup_class(self):
self.ads = self.register_controller(android_device)
self.dut1 = self.ads[0]
self.dut1.load_snippet("bench", "com.github.google.bumble.btbench")
self.dut2 = self.ads[1]
self.dut2.load_snippet("bench", "com.github.google.bumble.btbench")
def test_rfcomm_client_send_receive(self):
print("### Starting Receiver")
receiver = self.dut2.bench.runRfcommServer("receive")
receiver_id = receiver["id"]
print("--- Receiver status:", receiver)
while not receiver["model"]["running"]:
print("--- Waiting for Receiver to be running...")
time.sleep(1)
receiver = self.dut2.bench.getRunner(receiver_id)
print("### Starting Sender")
sender = self.dut1.bench.runRfcommClient(
"send", "DC:E5:5B:E5:51:2C", 100, 970, 100
)
print("--- Sender status:", sender)
print("--- Waiting for Sender to complete...")
sender_result = self.dut1.bench.waitForRunnerCompletion(sender["id"])
print("--- Sender result:", sender_result)
if __name__ == "__main__":
test_runner.main()

View File

@@ -64,6 +64,7 @@ async def main() -> None:
[(AdvertisingData.COMPLETE_LOCAL_NAME, "Bumble 2".encode("utf-8"))]
)
# pylint: disable=possibly-used-before-assignment
if device.host.number_of_supported_advertising_sets >= 2:
set2 = await device.create_advertising_set(
random_address=Address("F0:F0:F0:F0:F0:F1"),

View File

@@ -57,6 +57,9 @@ def on_dlc(dlc: rfcomm.DLC, configuration: hfp.HfConfiguration):
esco_parameters = hfp.ESCO_PARAMETERS[
hfp.DefaultCodecParameters.ESCO_CVSD_S4
]
else:
raise RuntimeError("unknown active codec")
connection.abort_on(
'disconnection',
connection.device.send_command(

View File

@@ -60,6 +60,8 @@ dependencies {
implementation(libs.ui.graphics)
implementation(libs.ui.tooling.preview)
implementation(libs.material3)
implementation(libs.mobly.snippet)
implementation(libs.androidx.core)
testImplementation(libs.junit)
androidTestImplementation(libs.androidx.test.ext.junit)
androidTestImplementation(libs.espresso.core)

View File

@@ -23,6 +23,9 @@
android:supportsRtl="true"
android:theme="@style/Theme.BTBench"
>
<meta-data
android:name="mobly-snippets"
android:value="com.github.google.bumble.btbench.AutomationSnippet"/>
<activity
android:name=".MainActivity"
android:exported="true"
@@ -35,5 +38,7 @@
</activity>
<!-- <profileable android:shell="true"/>-->
</application>
</manifest>
<instrumentation
android:name="com.google.android.mobly.snippet.SnippetRunner"
android:targetPackage="com.github.google.bumble.btbench" />
</manifest>

View File

@@ -0,0 +1,289 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.github.google.bumble.btbench;
import android.bluetooth.BluetoothAdapter;
import android.bluetooth.BluetoothManager;
import android.content.Context;
import androidx.test.core.app.ApplicationProvider;
import com.google.android.mobly.snippet.Snippet;
import com.google.android.mobly.snippet.rpc.Rpc;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import java.security.InvalidParameterException;
import java.util.ArrayList;
import java.util.UUID;
class Runner {
public UUID mId;
private final Mode mMode;
private final String mModeName;
private final String mScenario;
private final AppViewModel mModel;
Runner(Mode mode, String modeName, String scenario, AppViewModel model) {
this.mId = UUID.randomUUID();
this.mMode = mode;
this.mModeName = modeName;
this.mScenario = scenario;
this.mModel = model;
}
public JSONObject toJson() throws JSONException {
JSONObject result = new JSONObject();
result.put("id", mId.toString());
result.put("mode", mModeName);
result.put("scenario", mScenario);
result.put("model", AutomationSnippet.modelToJson(mModel));
return result;
}
public void stop() {
mModel.abort();
}
public void waitForCompletion() {
mMode.waitForCompletion();
}
}
public class AutomationSnippet implements Snippet {
private static final String TAG = "btbench.snippet";
private final BluetoothAdapter mBluetoothAdapter;
private final Context mContext;
private final ArrayList<Runner> mRunners = new ArrayList<>();
public AutomationSnippet() {
mContext = ApplicationProvider.getApplicationContext();
BluetoothManager bluetoothManager = mContext.getSystemService(BluetoothManager.class);
mBluetoothAdapter = bluetoothManager.getAdapter();
if (mBluetoothAdapter == null) {
throw new RuntimeException("bluetooth not supported");
}
}
private Runner runScenario(AppViewModel model, String mode, String scenario) {
Mode runnable;
switch (mode) {
case "rfcomm-client":
runnable = new RfcommClient(model, mBluetoothAdapter,
(PacketIO packetIO) -> createIoClient(model, scenario,
packetIO));
break;
case "rfcomm-server":
runnable = new RfcommServer(model, mBluetoothAdapter,
(PacketIO packetIO) -> createIoClient(model, scenario,
packetIO));
break;
case "l2cap-client":
runnable = new L2capClient(model, mBluetoothAdapter, mContext,
(PacketIO packetIO) -> createIoClient(model, scenario,
packetIO));
break;
case "l2cap-server":
runnable = new L2capServer(model, mBluetoothAdapter,
(PacketIO packetIO) -> createIoClient(model, scenario,
packetIO));
break;
default:
return null;
}
runnable.run();
Runner runner = new Runner(runnable, mode, scenario, model);
mRunners.add(runner);
return runner;
}
private IoClient createIoClient(AppViewModel model, String scenario, PacketIO packetIO) {
switch (scenario) {
case "send":
return new Sender(model, packetIO);
case "receive":
return new Receiver(model, packetIO);
case "ping":
return new Pinger(model, packetIO);
case "pong":
return new Ponger(model, packetIO);
default:
return null;
}
}
public static JSONObject modelToJson(AppViewModel model) throws JSONException {
JSONObject result = new JSONObject();
result.put("status", model.getStatus());
result.put("running", model.getRunning());
result.put("l2cap_psm", model.getL2capPsm());
if (model.getStatus().equals("OK")) {
JSONObject stats = new JSONObject();
result.put("stats", stats);
stats.put("throughput", model.getThroughput());
JSONObject rttStats = new JSONObject();
stats.put("rtt", rttStats);
rttStats.put("compound", model.getStats());
} else {
result.put("last_error", model.getLastError());
}
return result;
}
private Runner findRunner(String runnerId) {
for (Runner runner : mRunners) {
if (runner.mId.toString().equals(runnerId)) {
return runner;
}
}
return null;
}
@Rpc(description = "Run a scenario in RFComm Client mode")
public JSONObject runRfcommClient(String scenario, String peerBluetoothAddress, int packetCount,
int packetSize, int packetInterval) throws JSONException {
assert (mBluetoothAdapter != null);
// We only support "send" and "ping" for this mode for now
if (!(scenario.equals("send") || scenario.equals("ping"))) {
throw new InvalidParameterException("only 'send' and 'ping' are supported for this mode");
}
AppViewModel model = new AppViewModel();
model.setPeerBluetoothAddress(peerBluetoothAddress);
model.setSenderPacketCount(packetCount);
model.setSenderPacketSize(packetSize);
model.setSenderPacketInterval(packetInterval);
Runner runner = runScenario(model, "rfcomm-client", scenario);
assert runner != null;
return runner.toJson();
}
@Rpc(description = "Run a scenario in RFComm Server mode")
public JSONObject runRfcommServer(String scenario) throws JSONException {
assert (mBluetoothAdapter != null);
// We only support "receive" and "pong" for this mode for now
if (!(scenario.equals("receive") || scenario.equals("pong"))) {
throw new InvalidParameterException("only 'receive' and 'pong' are supported for this mode");
}
AppViewModel model = new AppViewModel();
Runner runner = runScenario(model, "rfcomm-server", scenario);
assert runner != null;
return runner.toJson();
}
@Rpc(description = "Run a scenario in L2CAP Client mode")
public JSONObject runL2capClient(String scenario, String peerBluetoothAddress, int psm,
boolean use_2m_phy, int packetCount, int packetSize,
int packetInterval) throws JSONException {
assert (mBluetoothAdapter != null);
// We only support "send" and "ping" for this mode for now
if (!(scenario.equals("send") || scenario.equals("ping"))) {
throw new InvalidParameterException("only 'send' and 'ping' are supported for this mode");
}
AppViewModel model = new AppViewModel();
model.setPeerBluetoothAddress(peerBluetoothAddress);
model.setL2capPsm(psm);
model.setUse2mPhy(use_2m_phy);
model.setSenderPacketCount(packetCount);
model.setSenderPacketSize(packetSize);
model.setSenderPacketInterval(packetInterval);
Runner runner = runScenario(model, "l2cap-client", scenario);
assert runner != null;
return runner.toJson();
}
@Rpc(description = "Run a scenario in L2CAP Server mode")
public JSONObject runL2capServer(String scenario) throws JSONException {
assert (mBluetoothAdapter != null);
// We only support "receive" and "pong" for this mode for now
if (!(scenario.equals("receive") || scenario.equals("pong"))) {
throw new InvalidParameterException("only 'receive' and 'pong' are supported for this mode");
}
AppViewModel model = new AppViewModel();
Runner runner = runScenario(model, "l2cap-server", scenario);
assert runner != null;
return runner.toJson();
}
@Rpc(description = "Stop a Runner")
public JSONObject stopRunner(String runnerId) throws JSONException {
Runner runner = findRunner(runnerId);
if (runner == null) {
return new JSONObject();
}
runner.stop();
return runner.toJson();
}
@Rpc(description = "Wait for a Runner to complete")
public JSONObject waitForRunnerCompletion(String runnerId) throws JSONException {
Runner runner = findRunner(runnerId);
if (runner == null) {
return new JSONObject();
}
runner.waitForCompletion();
return runner.toJson();
}
@Rpc(description = "Get a Runner by ID")
public JSONObject getRunner(String runnerId) throws JSONException {
Runner runner = findRunner(runnerId);
if (runner == null) {
return new JSONObject();
}
return runner.toJson();
}
@Rpc(description = "Get all Runners")
public JSONObject getRunners() throws JSONException {
JSONObject result = new JSONObject();
JSONArray runners = new JSONArray();
result.put("runners", runners);
for (Runner runner: mRunners) {
runners.put(runner.toJson());
}
return result;
}
@Override
public void shutdown() {
}
}

View File

@@ -0,0 +1,20 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.github.google.bumble.btbench
interface IoClient {
fun run()
fun abort()
}

View File

@@ -29,10 +29,13 @@ private val Log = Logger.getLogger("btbench.l2cap-client")
class L2capClient(
private val viewModel: AppViewModel,
private val bluetoothAdapter: BluetoothAdapter,
private val context: Context
) {
private val context: Context,
private val createIoClient: (packetIo: PacketIO) -> IoClient
) : Mode {
private var socketClient: SocketClient? = null
@SuppressLint("MissingPermission")
fun run() {
override fun run() {
viewModel.running = true
val addressIsPublic = viewModel.peerBluetoothAddress.endsWith("/P")
val address = viewModel.peerBluetoothAddress.take(17)
@@ -75,6 +78,7 @@ class L2capClient(
) {
if (gatt != null && newState == BluetoothProfile.STATE_CONNECTED) {
if (viewModel.use2mPhy) {
Log.info("requesting 2M PHY")
gatt.setPreferredPhy(
BluetoothDevice.PHY_LE_2M_MASK,
BluetoothDevice.PHY_LE_2M_MASK,
@@ -95,7 +99,11 @@ class L2capClient(
val socket = remoteDevice.createInsecureL2capChannel(viewModel.l2capPsm)
val client = SocketClient(viewModel, socket)
client.run()
socketClient = SocketClient(viewModel, socket, createIoClient)
socketClient!!.run()
}
}
override fun waitForCompletion() {
socketClient?.waitForCompletion()
}
}

View File

@@ -27,11 +27,17 @@ import kotlin.concurrent.thread
private val Log = Logger.getLogger("btbench.l2cap-server")
class L2capServer(private val viewModel: AppViewModel, private val bluetoothAdapter: BluetoothAdapter) {
class L2capServer(
private val viewModel: AppViewModel,
private val bluetoothAdapter: BluetoothAdapter,
private val createIoClient: (packetIo: PacketIO) -> IoClient
) : Mode {
private var socketServer: SocketServer? = null
@SuppressLint("MissingPermission")
fun run() {
override fun run() {
// Advertise so that the peer can find us and connect.
val callback = object: AdvertiseCallback() {
val callback = object : AdvertiseCallback() {
override fun onStartFailure(errorCode: Int) {
Log.warning("failed to start advertising: $errorCode")
}
@@ -55,7 +61,14 @@ class L2capServer(private val viewModel: AppViewModel, private val bluetoothAdap
viewModel.l2capPsm = serverSocket.psm
Log.info("psm = $serverSocket.psm")
val server = SocketServer(viewModel, serverSocket)
server.run({ advertiser.stopAdvertising(callback) }, { advertiser.startAdvertising(advertiseSettings, advertiseData, scanData, callback) })
socketServer = SocketServer(viewModel, serverSocket, createIoClient)
socketServer!!.run(
{ advertiser.stopAdvertising(callback) },
{ advertiser.startAdvertising(advertiseSettings, advertiseData, scanData, callback) }
)
}
}
override fun waitForCompletion() {
socketServer?.waitForCompletion()
}
}

View File

@@ -34,12 +34,15 @@ import androidx.compose.foundation.layout.fillMaxSize
import androidx.compose.foundation.layout.fillMaxWidth
import androidx.compose.foundation.layout.padding
import androidx.compose.foundation.rememberScrollState
import androidx.compose.foundation.selection.selectable
import androidx.compose.foundation.selection.selectableGroup
import androidx.compose.foundation.text.KeyboardActions
import androidx.compose.foundation.text.KeyboardOptions
import androidx.compose.foundation.verticalScroll
import androidx.compose.material3.Button
import androidx.compose.material3.Divider
import androidx.compose.material3.MaterialTheme
import androidx.compose.material3.RadioButton
import androidx.compose.material3.Slider
import androidx.compose.material3.Surface
import androidx.compose.material3.Switch
@@ -54,6 +57,7 @@ import androidx.compose.ui.focus.FocusRequester
import androidx.compose.ui.focus.focusRequester
import androidx.compose.ui.platform.LocalFocusManager
import androidx.compose.ui.platform.LocalSoftwareKeyboardController
import androidx.compose.ui.semantics.Role
import androidx.compose.ui.text.font.FontWeight
import androidx.compose.ui.text.input.ImeAction
import androidx.compose.ui.text.input.KeyboardType
@@ -69,6 +73,9 @@ private val Log = Logger.getLogger("bumble.main-activity")
const val PEER_BLUETOOTH_ADDRESS_PREF_KEY = "peer_bluetooth_address"
const val SENDER_PACKET_COUNT_PREF_KEY = "sender_packet_count"
const val SENDER_PACKET_SIZE_PREF_KEY = "sender_packet_size"
const val SENDER_PACKET_INTERVAL_PREF_KEY = "sender_packet_interval"
const val SCENARIO_PREF_KEY = "scenario"
const val MODE_PREF_KEY = "mode"
class MainActivity : ComponentActivity() {
private val appViewModel = AppViewModel()
@@ -139,10 +146,7 @@ class MainActivity : ComponentActivity() {
MainView(
appViewModel,
::becomeDiscoverable,
::runRfcommClient,
::runRfcommServer,
::runL2capClient,
::runL2capServer,
::runScenario
)
}
@@ -159,37 +163,54 @@ class MainActivity : ComponentActivity() {
if (packetSize > 0) {
appViewModel.senderPacketSize = packetSize
}
val packetInterval = intent.getIntExtra("packet-interval", 0)
if (packetInterval > 0) {
appViewModel.senderPacketInterval = packetInterval
}
appViewModel.updateSenderPacketSizeSlider()
intent.getStringExtra("scenario")?.let {
when (it) {
"send" -> appViewModel.scenario = SEND_SCENARIO
"receive" -> appViewModel.scenario = RECEIVE_SCENARIO
"ping" -> appViewModel.scenario = PING_SCENARIO
"pong" -> appViewModel.scenario = PONG_SCENARIO
}
}
intent.getStringExtra("mode")?.let {
when (it) {
"rfcomm-client" -> appViewModel.mode = RFCOMM_CLIENT_MODE
"rfcomm-server" -> appViewModel.mode = RFCOMM_SERVER_MODE
"l2cap-client" -> appViewModel.mode = L2CAP_CLIENT_MODE
"l2cap-server" -> appViewModel.mode = L2CAP_SERVER_MODE
}
}
intent.getStringExtra("autostart")?.let {
when (it) {
"rfcomm-client" -> runRfcommClient()
"rfcomm-server" -> runRfcommServer()
"l2cap-client" -> runL2capClient()
"l2cap-server" -> runL2capServer()
"run-scenario" -> runScenario()
"scan-start" -> runScan(true)
"stop-start" -> runScan(false)
}
}
}
private fun runRfcommClient() {
val rfcommClient = bluetoothAdapter?.let { RfcommClient(appViewModel, it) }
rfcommClient?.run()
}
private fun runScenario() {
if (bluetoothAdapter == null) {
return
}
private fun runRfcommServer() {
val rfcommServer = bluetoothAdapter?.let { RfcommServer(appViewModel, it) }
rfcommServer?.run()
}
private fun runL2capClient() {
val l2capClient = bluetoothAdapter?.let { L2capClient(appViewModel, it, baseContext) }
l2capClient?.run()
}
private fun runL2capServer() {
val l2capServer = bluetoothAdapter?.let { L2capServer(appViewModel, it) }
l2capServer?.run()
val runner = when (appViewModel.mode) {
RFCOMM_CLIENT_MODE -> RfcommClient(appViewModel, bluetoothAdapter!!, ::createIoClient)
RFCOMM_SERVER_MODE -> RfcommServer(appViewModel, bluetoothAdapter!!, ::createIoClient)
L2CAP_CLIENT_MODE -> L2capClient(
appViewModel,
bluetoothAdapter!!,
baseContext,
::createIoClient
)
L2CAP_SERVER_MODE -> L2capServer(appViewModel, bluetoothAdapter!!, ::createIoClient)
else -> throw IllegalStateException()
}
runner.run()
}
private fun runScan(startScan: Boolean) {
@@ -197,6 +218,17 @@ class MainActivity : ComponentActivity() {
scan?.run(startScan)
}
private fun createIoClient(packetIo: PacketIO): IoClient {
return when (appViewModel.scenario) {
SEND_SCENARIO -> Sender(appViewModel, packetIo)
RECEIVE_SCENARIO -> Receiver(appViewModel, packetIo)
PING_SCENARIO -> Pinger(appViewModel, packetIo)
PONG_SCENARIO -> Ponger(appViewModel, packetIo)
else -> throw IllegalStateException()
}
}
@SuppressLint("MissingPermission")
fun becomeDiscoverable() {
val discoverableIntent = Intent(BluetoothAdapter.ACTION_REQUEST_DISCOVERABLE)
@@ -210,10 +242,7 @@ class MainActivity : ComponentActivity() {
fun MainView(
appViewModel: AppViewModel,
becomeDiscoverable: () -> Unit,
runRfcommClient: () -> Unit,
runRfcommServer: () -> Unit,
runL2capClient: () -> Unit,
runL2capServer: () -> Unit,
runScenario: () -> Unit,
) {
BTBenchTheme {
val scrollState = rememberScrollState()
@@ -239,7 +268,9 @@ fun MainView(
Text(text = "Peer Bluetooth Address")
},
value = appViewModel.peerBluetoothAddress,
modifier = Modifier.fillMaxWidth().focusRequester(focusRequester),
modifier = Modifier
.fillMaxWidth()
.focusRequester(focusRequester),
keyboardOptions = KeyboardOptions.Default.copy(
keyboardType = KeyboardType.Ascii, imeAction = ImeAction.Done
),
@@ -249,14 +280,18 @@ fun MainView(
keyboardActions = KeyboardActions(onDone = {
keyboardController?.hide()
focusManager.clearFocus()
})
}),
enabled = (appViewModel.mode == RFCOMM_CLIENT_MODE) or (appViewModel.mode == L2CAP_CLIENT_MODE)
)
Divider()
TextField(label = {
Text(text = "L2CAP PSM")
},
TextField(
label = {
Text(text = "L2CAP PSM")
},
value = appViewModel.l2capPsm.toString(),
modifier = Modifier.fillMaxWidth().focusRequester(focusRequester),
modifier = Modifier
.fillMaxWidth()
.focusRequester(focusRequester),
keyboardOptions = KeyboardOptions.Default.copy(
keyboardType = KeyboardType.Number, imeAction = ImeAction.Done
),
@@ -271,7 +306,8 @@ fun MainView(
keyboardActions = KeyboardActions(onDone = {
keyboardController?.hide()
focusManager.clearFocus()
})
}),
enabled = (appViewModel.mode == L2CAP_CLIENT_MODE)
)
Divider()
Slider(
@@ -290,6 +326,32 @@ fun MainView(
)
Text(text = "Packet Size: " + appViewModel.senderPacketSize.toString())
Divider()
TextField(
label = {
Text(text = "Packet Interval (ms)")
},
value = appViewModel.senderPacketInterval.toString(),
modifier = Modifier
.fillMaxWidth()
.focusRequester(focusRequester),
keyboardOptions = KeyboardOptions.Default.copy(
keyboardType = KeyboardType.Number, imeAction = ImeAction.Done
),
onValueChange = {
if (it.isNotEmpty()) {
val interval = it.toIntOrNull()
if (interval != null) {
appViewModel.updateSenderPacketInterval(interval)
}
}
},
keyboardActions = KeyboardActions(onDone = {
keyboardController?.hide()
focusManager.clearFocus()
}),
enabled = (appViewModel.scenario == PING_SCENARIO)
)
Divider()
ActionButton(
text = "Become Discoverable", onClick = becomeDiscoverable, true
)
@@ -300,25 +362,78 @@ fun MainView(
Text(text = "2M PHY")
Spacer(modifier = Modifier.padding(start = 8.dp))
Switch(
enabled = (appViewModel.mode == L2CAP_CLIENT_MODE || appViewModel.mode == L2CAP_SERVER_MODE),
checked = appViewModel.use2mPhy,
onCheckedChange = { appViewModel.use2mPhy = it }
)
}
Row {
ActionButton(
text = "RFCOMM Client", onClick = runRfcommClient, !appViewModel.running
)
ActionButton(
text = "RFCOMM Server", onClick = runRfcommServer, !appViewModel.running
)
Column(Modifier.selectableGroup()) {
listOf(
RFCOMM_CLIENT_MODE,
RFCOMM_SERVER_MODE,
L2CAP_CLIENT_MODE,
L2CAP_SERVER_MODE
).forEach { text ->
Row(
Modifier
.selectable(
selected = (text == appViewModel.mode),
onClick = { appViewModel.updateMode(text) },
role = Role.RadioButton
)
.padding(horizontal = 16.dp),
verticalAlignment = Alignment.CenterVertically
) {
RadioButton(
selected = (text == appViewModel.mode),
onClick = null
)
Text(
text = text,
style = MaterialTheme.typography.bodyLarge,
modifier = Modifier.padding(start = 16.dp)
)
}
}
}
Column(Modifier.selectableGroup()) {
listOf(
SEND_SCENARIO,
RECEIVE_SCENARIO,
PING_SCENARIO,
PONG_SCENARIO
).forEach { text ->
Row(
Modifier
.selectable(
selected = (text == appViewModel.scenario),
onClick = { appViewModel.updateScenario(text) },
role = Role.RadioButton
)
.padding(horizontal = 16.dp),
verticalAlignment = Alignment.CenterVertically
) {
RadioButton(
selected = (text == appViewModel.scenario),
onClick = null
)
Text(
text = text,
style = MaterialTheme.typography.bodyLarge,
modifier = Modifier.padding(start = 16.dp)
)
}
}
}
}
Row {
ActionButton(
text = "L2CAP Client", onClick = runL2capClient, !appViewModel.running
text = "Start", onClick = runScenario, enabled = !appViewModel.running
)
ActionButton(
text = "L2CAP Server", onClick = runL2capServer, !appViewModel.running
text = "Stop", onClick = appViewModel::abort, enabled = appViewModel.running
)
}
Divider()
@@ -328,6 +443,12 @@ fun MainView(
Text(
text = if (appViewModel.rxPhy != 0 || appViewModel.txPhy != 0) "PHY: tx=${appViewModel.txPhy}, rx=${appViewModel.rxPhy}" else ""
)
Text(
text = "Status: ${appViewModel.status}"
)
Text(
text = "Last Error: ${appViewModel.lastError}"
)
Text(
text = "Packets Sent: ${appViewModel.packetsSent}"
)
@@ -337,9 +458,8 @@ fun MainView(
Text(
text = "Throughput: ${appViewModel.throughput}"
)
Divider()
ActionButton(
text = "Abort", onClick = appViewModel::abort, appViewModel.running
Text(
text = "Stats: ${appViewModel.stats}"
)
}
}
@@ -351,4 +471,4 @@ fun ActionButton(text: String, onClick: () -> Unit, enabled: Boolean) {
Button(onClick = onClick, enabled = enabled) {
Text(text = text)
}
}
}

View File

@@ -0,0 +1,20 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.github.google.bumble.btbench
interface Mode {
fun run()
fun waitForCompletion()
}

View File

@@ -27,10 +27,25 @@ val DEFAULT_RFCOMM_UUID: UUID = UUID.fromString("E6D55659-C8B4-4B85-96BB-B1143AF
const val DEFAULT_PEER_BLUETOOTH_ADDRESS = "AA:BB:CC:DD:EE:FF"
const val DEFAULT_SENDER_PACKET_COUNT = 100
const val DEFAULT_SENDER_PACKET_SIZE = 1024
const val DEFAULT_SENDER_PACKET_INTERVAL = 100
const val DEFAULT_PSM = 128
const val L2CAP_CLIENT_MODE = "L2CAP Client"
const val L2CAP_SERVER_MODE = "L2CAP Server"
const val RFCOMM_CLIENT_MODE = "RFCOMM Client"
const val RFCOMM_SERVER_MODE = "RFCOMM Server"
const val SEND_SCENARIO = "Send"
const val RECEIVE_SCENARIO = "Receive"
const val PING_SCENARIO = "Ping"
const val PONG_SCENARIO = "Pong"
class AppViewModel : ViewModel() {
private var preferences: SharedPreferences? = null
var status by mutableStateOf("")
var lastError by mutableStateOf("")
var mode by mutableStateOf(RFCOMM_SERVER_MODE)
var scenario by mutableStateOf(RECEIVE_SCENARIO)
var peerBluetoothAddress by mutableStateOf(DEFAULT_PEER_BLUETOOTH_ADDRESS)
var l2capPsm by mutableIntStateOf(DEFAULT_PSM)
var use2mPhy by mutableStateOf(true)
@@ -41,9 +56,11 @@ class AppViewModel : ViewModel() {
var senderPacketSizeSlider by mutableFloatStateOf(0.0F)
var senderPacketCount by mutableIntStateOf(DEFAULT_SENDER_PACKET_COUNT)
var senderPacketSize by mutableIntStateOf(DEFAULT_SENDER_PACKET_SIZE)
var senderPacketInterval by mutableIntStateOf(DEFAULT_SENDER_PACKET_INTERVAL)
var packetsSent by mutableIntStateOf(0)
var packetsReceived by mutableIntStateOf(0)
var throughput by mutableIntStateOf(0)
var stats by mutableStateOf("")
var running by mutableStateOf(false)
var aborter: (() -> Unit)? = null
@@ -66,6 +83,21 @@ class AppViewModel : ViewModel() {
senderPacketSize = savedSenderPacketSize
}
updateSenderPacketSizeSlider()
val savedSenderPacketInterval = preferences.getInt(SENDER_PACKET_INTERVAL_PREF_KEY, -1)
if (savedSenderPacketInterval != -1) {
senderPacketInterval = savedSenderPacketInterval
}
val savedMode = preferences.getString(MODE_PREF_KEY, null)
if (savedMode != null) {
mode = savedMode
}
val savedScenario = preferences.getString(SCENARIO_PREF_KEY, null)
if (savedScenario != null) {
scenario = savedScenario
}
}
fun updatePeerBluetoothAddress(peerBluetoothAddress: String) {
@@ -164,6 +196,42 @@ class AppViewModel : ViewModel() {
}
}
fun updateSenderPacketInterval(senderPacketInterval: Int) {
this.senderPacketInterval = senderPacketInterval
with(preferences!!.edit()) {
putInt(SENDER_PACKET_INTERVAL_PREF_KEY, senderPacketInterval)
apply()
}
}
fun updateScenario(scenario: String) {
this.scenario = scenario
with(preferences!!.edit()) {
putString(SCENARIO_PREF_KEY, scenario)
apply()
}
}
fun updateMode(mode: String) {
this.mode = mode
with(preferences!!.edit()) {
putString(MODE_PREF_KEY, mode)
apply()
}
}
fun clear() {
status = ""
lastError = ""
mtu = 0
rxPhy = 0
txPhy = 0
packetsSent = 0
packetsReceived = 0
throughput = 0
stats = ""
}
fun abort() {
aborter?.let { it() }
}

View File

@@ -74,13 +74,13 @@ abstract class PacketSink {
fun onPacket(packet: Packet) {
when (packet) {
is ResetPacket -> onResetPacket()
is AckPacket -> onAckPacket()
is AckPacket -> onAckPacket(packet)
is SequencePacket -> onSequencePacket(packet)
}
}
abstract fun onResetPacket()
abstract fun onAckPacket()
abstract fun onAckPacket(packet: AckPacket)
abstract fun onSequencePacket(packet: SequencePacket)
}
@@ -175,4 +175,4 @@ class SocketDataSource(
} while (true)
Log.info("end of stream")
}
}
}

View File

@@ -0,0 +1,104 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.github.google.bumble.btbench
import java.util.concurrent.Semaphore
import java.util.logging.Logger
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.TimeSource
private const val DEFAULT_STARTUP_DELAY = 3000
private val Log = Logger.getLogger("btbench.pinger")
class Pinger(private val viewModel: AppViewModel, private val packetIO: PacketIO) : IoClient,
PacketSink() {
private val pingTimes: ArrayList<TimeSource.Monotonic.ValueTimeMark> = ArrayList()
private val rtts: ArrayList<Long> = ArrayList()
private val done = Semaphore(0)
init {
packetIO.packetSink = this
}
override fun run() {
viewModel.clear()
Log.info("startup delay: $DEFAULT_STARTUP_DELAY")
Thread.sleep(DEFAULT_STARTUP_DELAY.toLong());
Log.info("running")
Log.info("sending reset")
packetIO.sendPacket(ResetPacket())
val packetCount = viewModel.senderPacketCount
val packetSize = viewModel.senderPacketSize
val startTime = TimeSource.Monotonic.markNow()
for (i in 0..<packetCount) {
val now = TimeSource.Monotonic.markNow()
val targetTime = startTime + (i * viewModel.senderPacketInterval).milliseconds
val delay = targetTime - now
if (delay.isPositive()) {
Log.info("sleeping ${delay.inWholeMilliseconds} ms")
Thread.sleep(delay.inWholeMilliseconds)
}
pingTimes.add(TimeSource.Monotonic.markNow())
packetIO.sendPacket(
SequencePacket(
if (i < packetCount - 1) 0 else Packet.LAST_FLAG,
i,
ByteArray(packetSize - 6)
)
)
viewModel.packetsSent = i + 1
}
// Wait for the last ACK
Log.info("waiting for last ACK")
done.acquire()
Log.info("got last ACK")
}
override fun abort() {
done.release()
}
override fun onResetPacket() {
}
override fun onAckPacket(packet: AckPacket) {
val now = TimeSource.Monotonic.markNow()
viewModel.packetsReceived += 1
if (packet.sequenceNumber < pingTimes.size) {
val rtt = (now - pingTimes[packet.sequenceNumber]).inWholeMilliseconds
rtts.add(rtt)
Log.info("received ACK ${packet.sequenceNumber}, RTT=$rtt")
} else {
Log.warning("received ACK with unexpected sequence ${packet.sequenceNumber}")
}
if (packet.flags and Packet.LAST_FLAG != 0) {
Log.info("last packet received")
val stats = "RTTs: min=${rtts.min()}, max=${rtts.max()}, avg=${rtts.sum() / rtts.size}"
Log.info(stats)
viewModel.stats = stats
done.release()
}
}
override fun onSequencePacket(packet: SequencePacket) {
}
}

View File

@@ -0,0 +1,62 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.github.google.bumble.btbench
import java.util.logging.Logger
import kotlin.time.TimeSource
private val Log = Logger.getLogger("btbench.receiver")
class Ponger(private val viewModel: AppViewModel, private val packetIO: PacketIO) : IoClient, PacketSink() {
private var startTime: TimeSource.Monotonic.ValueTimeMark = TimeSource.Monotonic.markNow()
private var lastPacketTime: TimeSource.Monotonic.ValueTimeMark = TimeSource.Monotonic.markNow()
private var expectedSequenceNumber: Int = 0
init {
packetIO.packetSink = this
}
override fun run() {
viewModel.clear()
}
override fun abort() {}
override fun onResetPacket() {
startTime = TimeSource.Monotonic.markNow()
lastPacketTime = startTime
expectedSequenceNumber = 0
viewModel.packetsSent = 0
viewModel.packetsReceived = 0
viewModel.stats = ""
}
override fun onAckPacket(packet: AckPacket) {
}
override fun onSequencePacket(packet: SequencePacket) {
val now = TimeSource.Monotonic.markNow()
lastPacketTime = now
viewModel.packetsReceived += 1
if (packet.sequenceNumber != expectedSequenceNumber) {
Log.warning("unexpected packet sequence number (expected ${expectedSequenceNumber}, got ${packet.sequenceNumber})")
}
expectedSequenceNumber += 1
packetIO.sendPacket(AckPacket(packet.flags, packet.sequenceNumber))
viewModel.packetsSent += 1
}
}

View File

@@ -20,7 +20,7 @@ import kotlin.time.TimeSource
private val Log = Logger.getLogger("btbench.receiver")
class Receiver(private val viewModel: AppViewModel, private val packetIO: PacketIO) : PacketSink() {
class Receiver(private val viewModel: AppViewModel, private val packetIO: PacketIO) : IoClient, PacketSink() {
private var startTime: TimeSource.Monotonic.ValueTimeMark = TimeSource.Monotonic.markNow()
private var lastPacketTime: TimeSource.Monotonic.ValueTimeMark = TimeSource.Monotonic.markNow()
private var bytesReceived = 0
@@ -29,6 +29,12 @@ class Receiver(private val viewModel: AppViewModel, private val packetIO: Packet
packetIO.packetSink = this
}
override fun run() {
viewModel.clear()
}
override fun abort() {}
override fun onResetPacket() {
startTime = TimeSource.Monotonic.markNow()
lastPacketTime = startTime
@@ -36,9 +42,10 @@ class Receiver(private val viewModel: AppViewModel, private val packetIO: Packet
viewModel.throughput = 0
viewModel.packetsSent = 0
viewModel.packetsReceived = 0
viewModel.stats = ""
}
override fun onAckPacket() {
override fun onAckPacket(packet: AckPacket) {
}

View File

@@ -16,22 +16,30 @@ package com.github.google.bumble.btbench
import android.annotation.SuppressLint
import android.bluetooth.BluetoothAdapter
import java.io.IOException
import java.util.logging.Logger
import kotlin.concurrent.thread
private val Log = Logger.getLogger("btbench.rfcomm-client")
class RfcommClient(private val viewModel: AppViewModel, val bluetoothAdapter: BluetoothAdapter) {
class RfcommClient(
private val viewModel: AppViewModel,
private val bluetoothAdapter: BluetoothAdapter,
private val createIoClient: (packetIo: PacketIO) -> IoClient
) : Mode {
private var socketClient: SocketClient? = null
@SuppressLint("MissingPermission")
fun run() {
override fun run() {
val address = viewModel.peerBluetoothAddress.take(17)
val remoteDevice = bluetoothAdapter.getRemoteDevice(address)
val socket = remoteDevice.createInsecureRfcommSocketToServiceRecord(
DEFAULT_RFCOMM_UUID
)
val client = SocketClient(viewModel, socket)
client.run()
socketClient = SocketClient(viewModel, socket, createIoClient)
socketClient!!.run()
}
override fun waitForCompletion() {
socketClient?.waitForCompletion()
}
}

View File

@@ -16,20 +16,27 @@ package com.github.google.bumble.btbench
import android.annotation.SuppressLint
import android.bluetooth.BluetoothAdapter
import java.io.IOException
import java.util.logging.Logger
import kotlin.concurrent.thread
private val Log = Logger.getLogger("btbench.rfcomm-server")
class RfcommServer(private val viewModel: AppViewModel, val bluetoothAdapter: BluetoothAdapter) {
class RfcommServer(
private val viewModel: AppViewModel,
private val bluetoothAdapter: BluetoothAdapter,
private val createIoClient: (packetIo: PacketIO) -> IoClient
) : Mode {
private var socketServer: SocketServer? = null
@SuppressLint("MissingPermission")
fun run() {
override fun run() {
val serverSocket = bluetoothAdapter.listenUsingInsecureRfcommWithServiceRecord(
"BumbleBench", DEFAULT_RFCOMM_UUID
)
val server = SocketServer(viewModel, serverSocket)
server.run({}, {})
socketServer = SocketServer(viewModel, serverSocket, createIoClient)
socketServer!!.run({}, {})
}
}
override fun waitForCompletion() {
socketServer?.waitForCompletion()
}
}

View File

@@ -35,4 +35,4 @@ class Scan(val bluetoothAdapter: BluetoothAdapter) {
bluetoothLeScanner?.stopScan(scanCallback)
}
}
}
}

View File

@@ -19,9 +19,12 @@ import java.util.logging.Logger
import kotlin.time.DurationUnit
import kotlin.time.TimeSource
private const val DEFAULT_STARTUP_DELAY = 3000
private val Log = Logger.getLogger("btbench.sender")
class Sender(private val viewModel: AppViewModel, private val packetIO: PacketIO) : PacketSink() {
class Sender(private val viewModel: AppViewModel, private val packetIO: PacketIO) : IoClient,
PacketSink() {
private var startTime: TimeSource.Monotonic.ValueTimeMark = TimeSource.Monotonic.markNow()
private var bytesSent = 0
private val done = Semaphore(0)
@@ -30,10 +33,12 @@ class Sender(private val viewModel: AppViewModel, private val packetIO: PacketIO
packetIO.packetSink = this
}
fun run() {
viewModel.packetsSent = 0
viewModel.packetsReceived = 0
viewModel.throughput = 0
override fun run() {
viewModel.clear()
Log.info("startup delay: $DEFAULT_STARTUP_DELAY")
Thread.sleep(DEFAULT_STARTUP_DELAY.toLong());
Log.info("running")
Log.info("sending reset")
packetIO.sendPacket(ResetPacket())
@@ -63,14 +68,14 @@ class Sender(private val viewModel: AppViewModel, private val packetIO: PacketIO
Log.info("got ACK")
}
fun abort() {
override fun abort() {
done.release()
}
override fun onResetPacket() {
}
override fun onAckPacket() {
override fun onAckPacket(packet: AckPacket) {
Log.info("received ACK")
val elapsed = TimeSource.Monotonic.markNow() - startTime
val throughput = (bytesSent / elapsed.toDouble(DurationUnit.SECONDS)).toInt()
@@ -81,4 +86,4 @@ class Sender(private val viewModel: AppViewModel, private val packetIO: PacketIO
override fun onSequencePacket(packet: SequencePacket) {
}
}
}

View File

@@ -22,16 +22,20 @@ import kotlin.concurrent.thread
private val Log = Logger.getLogger("btbench.socket-client")
private const val DEFAULT_STARTUP_DELAY = 3000
class SocketClient(
private val viewModel: AppViewModel,
private val socket: BluetoothSocket,
private val createIoClient: (packetIo: PacketIO) -> IoClient
) {
private var clientThread: Thread? = null
class SocketClient(private val viewModel: AppViewModel, private val socket: BluetoothSocket) {
@SuppressLint("MissingPermission")
fun run() {
viewModel.running = true
val socketDataSink = SocketDataSink(socket)
val streamIO = StreamedPacketIO(socketDataSink)
val socketDataSource = SocketDataSource(socket, streamIO::onData)
val sender = Sender(viewModel, streamIO)
val ioClient = createIoClient(streamIO)
fun cleanup() {
socket.close()
@@ -39,9 +43,9 @@ class SocketClient(private val viewModel: AppViewModel, private val socket: Blue
viewModel.running = false
}
thread(name = "SocketClient") {
clientThread = thread(name = "SocketClient") {
viewModel.aborter = {
sender.abort()
ioClient.abort()
socket.close()
}
Log.info("connecting to remote")
@@ -49,27 +53,37 @@ class SocketClient(private val viewModel: AppViewModel, private val socket: Blue
socket.connect()
} catch (error: IOException) {
Log.warning("connection failed")
viewModel.status = "ABORTED"
viewModel.lastError = "CONNECTION_FAILED"
cleanup()
return@thread
}
Log.info("connected")
thread {
val sourceThread = thread {
socketDataSource.receive()
socket.close()
sender.abort()
ioClient.abort()
}
Log.info("Startup delay: $DEFAULT_STARTUP_DELAY")
Thread.sleep(DEFAULT_STARTUP_DELAY.toLong());
Log.info("Starting to send")
try {
sender.run()
ioClient.run()
socket.close()
viewModel.status = "OK"
} catch (error: IOException) {
Log.info("run ended abruptly")
viewModel.status = "ABORTED"
viewModel.lastError = "IO_ERROR"
}
Log.info("waiting for source thread to finish")
sourceThread.join()
cleanup()
}
}
}
fun waitForCompletion() {
clientThread?.join()
}
}

View File

@@ -21,7 +21,13 @@ import kotlin.concurrent.thread
private val Log = Logger.getLogger("btbench.socket-server")
class SocketServer(private val viewModel: AppViewModel, private val serverSocket: BluetoothServerSocket) {
class SocketServer(
private val viewModel: AppViewModel,
private val serverSocket: BluetoothServerSocket,
private val createIoClient: (packetIo: PacketIO) -> IoClient
) {
private var serverThread: Thread? = null
fun run(onConnected: () -> Unit, onDisconnected: () -> Unit) {
var aborted = false
viewModel.running = true
@@ -31,7 +37,7 @@ class SocketServer(private val viewModel: AppViewModel, private val serverSocket
viewModel.running = false
}
thread(name = "SocketServer") {
serverThread = thread(name = "SocketServer") {
while (!aborted) {
viewModel.aborter = {
serverSocket.close()
@@ -46,6 +52,8 @@ class SocketServer(private val viewModel: AppViewModel, private val serverSocket
return@thread
}
Log.info("got connection from ${socket.remoteDevice.address}")
Log.info("maxReceivePacketSize=${socket.maxReceivePacketSize}")
Log.info("maxTransmitPacketSize=${socket.maxTransmitPacketSize}")
onConnected()
viewModel.aborter = {
@@ -57,11 +65,22 @@ class SocketServer(private val viewModel: AppViewModel, private val serverSocket
val socketDataSink = SocketDataSink(socket)
val streamIO = StreamedPacketIO(socketDataSink)
val socketDataSource = SocketDataSource(socket, streamIO::onData)
val receiver = Receiver(viewModel, streamIO)
val ioThread = thread(name = "IoClient") {
val ioClient = createIoClient(streamIO)
ioClient.run()
}
socketDataSource.receive()
socket.close()
ioThread.join()
}
cleanup()
}
}
}
fun waitForCompletion() {
serverThread?.join()
}
}

View File

@@ -1,5 +1,5 @@
[versions]
agp = "8.2.0"
agp = "8.4.0"
kotlin = "1.9.0"
core-ktx = "1.12.0"
junit = "4.13.2"
@@ -8,6 +8,8 @@ espresso-core = "3.5.1"
lifecycle-runtime-ktx = "2.6.2"
activity-compose = "1.7.2"
compose-bom = "2023.08.00"
mobly-snippet = "1.4.0"
core = "1.6.1"
[libraries]
core-ktx = { group = "androidx.core", name = "core-ktx", version.ref = "core-ktx" }
@@ -24,6 +26,8 @@ ui-tooling-preview = { group = "androidx.compose.ui", name = "ui-tooling-preview
ui-test-manifest = { group = "androidx.compose.ui", name = "ui-test-manifest" }
ui-test-junit4 = { group = "androidx.compose.ui", name = "ui-test-junit4" }
material3 = { group = "androidx.compose.material3", name = "material3" }
mobly-snippet = { group = "com.google.android.mobly", name = "mobly-snippet-lib", version.ref = "mobly.snippet" }
androidx-core = { group = "androidx.test", name = "core", version.ref = "core" }
[plugins]
androidApplication = { id = "com.android.application", version.ref = "agp" }

View File

@@ -1,6 +1,6 @@
#Wed Oct 25 07:40:52 PDT 2023
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.4-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.6-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists

View File

@@ -23,6 +23,7 @@ public class HciProxy {
HciHal hciHal = HciHal.create(new HciHalCallback() {
@Override
public void onPacket(HciPacket.Type type, byte[] packet) {
Log.d(TAG, String.format("CONTROLLER->HOST: type=%s, size=%d", type, packet.length));
mServer.sendPacket(type, packet);
switch (type) {
@@ -83,7 +84,7 @@ public class HciProxy {
@Override
public void onPacket(HciPacket.Type type, byte[] packet) {
Log.d(TAG, String.format("onPacket: type=%s, size=%d", type, packet.length));
Log.d(TAG, String.format("HOST->CONTROLLER: type=%s, size=%d", type, packet.length));
hciHal.sendPacket(type, packet);
switch (type) {

View File

@@ -45,6 +45,7 @@ ignore="pandora" # FIXME: pylint does not support stubs yet:
[tool.pylint.typecheck]
signature-mutators="AsyncRunner.run_in_task"
disable="not-callable"
[tool.black]
skip-string-normalization = true
@@ -55,7 +56,7 @@ extend-exclude = '''
'''
[tool.mypy]
exclude = ['bumble/transport/grpc_protobuf']
exclude = ['bumble/transport/grpc_protobuf', 'examples/mobly/bench']
[[tool.mypy.overrides]]
module = "bumble.transport.grpc_protobuf.*"

View File

@@ -1,14 +1,23 @@
# Invoke this script with an argument pointing to where the AOSP `tools/netsim/src/proto` is
# Invoke this script with two arguments:
# Arg 1: directory path where the AOSP `tools/netsim/proto` directory is located
# Arg 2: directory path where the RootCanal `proto/rootcanal` directory is located
PROTOC_OUT=bumble/transport/grpc_protobuf
proto_files=(common.proto packet_streamer.proto hci_packet.proto startup.proto)
for proto_file in "${proto_files[@]}"
netsim_proto_files=(netsim/common.proto netsim/packet_streamer.proto netsim/hci_packet.proto netsim/startup.proto netsim/model.proto)
for proto_file in "${netsim_proto_files[@]}"
do
python -m grpc_tools.protoc -I$1 --proto_path=bumble/transport --python_out=$PROTOC_OUT --pyi_out=$PROTOC_OUT --grpc_python_out=$PROTOC_OUT $1/$proto_file
python -m grpc_tools.protoc -I$1 -I$2 --proto_path=bumble/transport --python_out=$PROTOC_OUT --pyi_out=$PROTOC_OUT --grpc_python_out=$PROTOC_OUT $1/$proto_file
done
python_files=(packet_streamer_pb2_grpc.py packet_streamer_pb2.py hci_packet_pb2.py startup_pb2.py)
rootcanal_proto_files=(rootcanal/configuration.proto)
for proto_file in "${rootcanal_proto_files[@]}"
do
python -m grpc_tools.protoc -I$1 -I$2 --proto_path=bumble/transport --python_out=$PROTOC_OUT --pyi_out=$PROTOC_OUT --grpc_python_out=$PROTOC_OUT $2/$proto_file
done
python_files=(netsim/*.py netsim/*.pyi)
for python_file in "${python_files[@]}"
do
sed -i 's/^import .*_pb2 as/from . \0/' $PROTOC_OUT/$python_file
sed -i '' 's/^from netsim/from bumble.transport.grpc_protobuf.netsim/' $PROTOC_OUT/$python_file
sed -i '' 's/^from rootcanal/from bumble.transport.grpc_protobuf.rootcanal/' $PROTOC_OUT/$python_file
done

View File

@@ -75,6 +75,8 @@ console_scripts =
bumble-pandora-server = bumble.apps.pandora_server:main
bumble-rtk-util = bumble.tools.rtk_util:main
bumble-rtk-fw-download = bumble.tools.rtk_fw_download:main
bumble-intel-util = bumble.tools.intel_util:main
bumble-intel-fw-download = bumble.tools.intel_fw_download:main
[options.package_data]
* = py.typed, *.pyi
@@ -91,9 +93,10 @@ development =
black == 24.3
grpcio-tools >= 1.62.1
invoke >= 1.7.3
mypy == 1.10.0
mobly >= 1.12.2
mypy == 1.12.0
nox >= 2022
pylint == 3.1.0
pylint == 3.3.1
pyyaml >= 6.0
types-appdirs >= 1.4.3
types-invoke >= 1.7.3

View File

@@ -851,7 +851,12 @@ async def test_unsubscribe():
await async_barrier()
mock1.assert_called_once_with(ANY, True, False)
await c2.subscribe()
assert len(server.gatt_server.subscribers) == 1
def callback(_):
pass
await c2.subscribe(callback)
await async_barrier()
mock2.assert_called_once_with(ANY, True, False)
@@ -861,10 +866,16 @@ async def test_unsubscribe():
mock1.assert_called_once_with(ANY, False, False)
mock2.reset_mock()
await c2.unsubscribe()
await c2.unsubscribe(callback)
await async_barrier()
mock2.assert_called_once_with(ANY, False, False)
# All CCCDs should be zeros now
assert list(server.gatt_server.subscribers.values())[0] == {
c1.handle: bytes([0, 0]),
c2.handle: bytes([0, 0]),
}
mock1.reset_mock()
await c1.unsubscribe()
await async_barrier()

View File

@@ -25,6 +25,7 @@ import sys
from bumble import att, device
from bumble.profiles import hap
from .test_utils import TwoDevices
from bumble.keys import PairingKeys
# -----------------------------------------------------------------------------
# Logging
@@ -86,6 +87,10 @@ async def hap_client():
devices.connections[0].encryption = 1 # type: ignore
devices.connections[1].encryption = 1 # type: ignore
devices[0].on_pairing(
devices.connections[0], devices.connections[0].peer_address, PairingKeys(), True
)
peer = device.Peer(devices.connections[1]) # type: ignore
hap_client = await peer.discover_service_and_create_proxy(
hap.HearingAccessServiceProxy

View File

@@ -569,6 +569,37 @@ async def test_sco_setup():
await asyncio.gather(*sco_disconnection_futures)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_hf_batched_response(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
ag.dlc.write(b'\r\n+BIND: (1,2)\r\n\r\nOK\r\n')
await hf.execute_command("AT+BIND=?", response_type=hfp.AtResponseType.SINGLE)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ag_batched_commands(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
answer_future = asyncio.get_running_loop().create_future()
ag.on('answer', lambda: answer_future.set_result(None))
hang_up_future = asyncio.get_running_loop().create_future()
ag.on('hang_up', lambda: hang_up_future.set_result(None))
hf.dlc.write(b'ATA\rAT+CHUP\r')
await answer_future
await hang_up_future
# -----------------------------------------------------------------------------
async def run():
await test_slc()

130
tools/intel_fw_download.py Normal file
View File

@@ -0,0 +1,130 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
import pathlib
import urllib.request
import urllib.error
import click
from bumble.colors import color
from bumble.drivers import intel
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
LINUX_KERNEL_GIT_SOURCE = "https://git.kernel.org/pub/scm/linux/kernel/git/firmware/linux-firmware.git/plain/intel"
# -----------------------------------------------------------------------------
# Functions
# -----------------------------------------------------------------------------
def download_file(base_url, name):
url = f"{base_url}/{name}"
with urllib.request.urlopen(url) as file:
data = file.read()
print(f"Downloaded {name}: {len(data)} bytes")
return data
# -----------------------------------------------------------------------------
@click.command
@click.option(
"--output-dir",
default="",
help="Output directory where the files will be saved. Defaults to the OS-specific"
"app data dir, which the driver will check when trying to find firmware",
show_default=True,
)
@click.option(
"--source",
type=click.Choice(["linux-kernel"]),
default="linux-kernel",
show_default=True,
)
@click.option("--single", help="Only download a single image set, by its base name")
@click.option("--force", is_flag=True, help="Overwrite files if they already exist")
def main(output_dir, source, single, force):
"""Download Intel firmware images and configs."""
# Check that the output dir exists
if output_dir == '':
output_dir = intel.intel_firmware_dir()
else:
output_dir = pathlib.Path(output_dir)
if not output_dir.is_dir():
print("Output dir does not exist or is not a directory")
return
base_url = {
"linux-kernel": LINUX_KERNEL_GIT_SOURCE,
}[source]
print("Downloading")
print(color("FROM:", "green"), base_url)
print(color("TO:", "green"), output_dir)
if single:
images = [(f"{single}.sfi", f"{single}.ddc")]
else:
images = [
(f"{base_name}.sfi", f"{base_name}.ddc")
for base_name in intel.INTEL_FW_IMAGE_NAMES
]
for fw_name, config_name in images:
print(color("---", "yellow"))
fw_image_out = output_dir / fw_name
if not force and fw_image_out.exists():
print(color(f"{fw_image_out} already exists, skipping", "red"))
continue
if config_name:
config_image_out = output_dir / config_name
if not force and config_image_out.exists():
print(color("f{config_image_out} already exists, skipping", "red"))
continue
try:
fw_image = download_file(base_url, fw_name)
except urllib.error.HTTPError as error:
print(f"Failed to download {fw_name}: {error}")
continue
config_image = None
if config_name:
try:
config_image = download_file(base_url, config_name)
except urllib.error.HTTPError as error:
print(f"Failed to download {config_name}: {error}")
continue
fw_image_out.write_bytes(fw_image)
if config_image:
config_image_out.write_bytes(config_image)
# -----------------------------------------------------------------------------
if __name__ == '__main__':
main()

154
tools/intel_util.py Normal file
View File

@@ -0,0 +1,154 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
import asyncio
import os
from typing import Any, Optional
import click
from bumble.colors import color
from bumble import transport
from bumble.drivers import intel
from bumble.host import Host
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
def print_device_info(device_info: dict[intel.ValueType, Any]) -> None:
if (mode := device_info.get(intel.ValueType.CURRENT_MODE_OF_OPERATION)) is not None:
print(
color("MODE:", "yellow"),
mode.name,
)
print(color("DETAILS:", "yellow"))
for key, value in device_info.items():
print(f" {color(key.name, 'green')}: {value}")
# -----------------------------------------------------------------------------
async def get_driver(host: Host, force: bool) -> Optional[intel.Driver]:
# Create a driver
driver = await intel.Driver.for_host(host, force)
if driver is None:
print("Device does not appear to be an Intel device")
return None
return driver
# -----------------------------------------------------------------------------
async def do_info(usb_transport, force):
async with await transport.open_transport(usb_transport) as (
hci_source,
hci_sink,
):
host = Host(hci_source, hci_sink)
driver = await get_driver(host, force)
if driver is None:
return
# Get and print the device info
print_device_info(await driver.read_device_info())
# -----------------------------------------------------------------------------
async def do_load(usb_transport: str, force: bool) -> None:
async with await transport.open_transport(usb_transport) as (
hci_source,
hci_sink,
):
host = Host(hci_source, hci_sink)
driver = await get_driver(host, force)
if driver is None:
return
# Reboot in bootloader mode
await driver.load_firmware()
# Get and print the device info
print_device_info(await driver.read_device_info())
# -----------------------------------------------------------------------------
async def do_bootloader(usb_transport: str, force: bool) -> None:
async with await transport.open_transport(usb_transport) as (
hci_source,
hci_sink,
):
host = Host(hci_source, hci_sink)
driver = await get_driver(host, force)
if driver is None:
return
# Reboot in bootloader mode
await driver.reboot_bootloader()
# -----------------------------------------------------------------------------
@click.group()
def main():
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
@main.command
@click.argument("usb_transport")
@click.option(
"--force",
is_flag=True,
default=False,
help="Try to get the device info even if the USB info doesn't match",
)
def info(usb_transport, force):
"""Get the firmware info."""
asyncio.run(do_info(usb_transport, force))
@main.command
@click.argument("usb_transport")
@click.option(
"--force",
is_flag=True,
default=False,
help="Load even if the USB info doesn't match",
)
def load(usb_transport, force):
"""Load a firmware image."""
asyncio.run(do_load(usb_transport, force))
@main.command
@click.argument("usb_transport")
@click.option(
"--force",
is_flag=True,
default=False,
help="Attempt to reboot event if the USB info doesn't match",
)
def bootloader(usb_transport, force):
"""Reboot in bootloader mode."""
asyncio.run(do_bootloader(usb_transport, force))
# -----------------------------------------------------------------------------
if __name__ == '__main__':
main()