add advertise and scan options to the bench app

This commit is contained in:
Gilles Boccon-Gibod
2025-05-20 17:15:43 -04:00
parent 3b399ea1a2
commit 775b2d5d7f
2 changed files with 131 additions and 29 deletions

View File

@@ -23,6 +23,7 @@ import os
import statistics
import struct
import time
from typing import Optional
import click
@@ -75,6 +76,7 @@ DEFAULT_CENTRAL_ADDRESS = 'F0:F0:F0:F0:F0:F0'
DEFAULT_CENTRAL_NAME = 'Speed Central'
DEFAULT_PERIPHERAL_ADDRESS = 'F1:F1:F1:F1:F1:F1'
DEFAULT_PERIPHERAL_NAME = 'Speed Peripheral'
DEFAULT_ADVERTISING_INTERVAL = 100
SPEED_SERVICE_UUID = '50DB505C-8AC4-4738-8448-3B1D9CC09CC5'
SPEED_TX_UUID = 'E789C754-41A1-45F4-A948-A0A1A90DBA53'
@@ -197,6 +199,51 @@ async def switch_roles(connection, role):
logging.info(f'{color("### Role switch failed:", "red")} {error}')
async def pre_power_on(device: Device, classic: bool) -> None:
device.classic_enabled = classic
# Set up a pairing config factory with minimal requirements.
device.config.keystore = "JsonKeyStore"
device.pairing_config_factory = lambda _: PairingConfig(
sc=False, mitm=False, bonding=False
)
async def post_power_on(
device: Device,
le_scan: Optional[tuple[int, int]],
le_advertise: Optional[int],
classic_page_scan: bool,
classic_inquiry_scan: bool,
) -> None:
if classic_page_scan:
logging.info(color("*** Enabling page scan", "blue"))
await device.set_connectable(True)
if classic_inquiry_scan:
logging.info(color("*** Enabling inquiry scan", "blue"))
await device.set_discoverable(True)
if le_scan:
scan_window, scan_interval = le_scan
logging.info(
color(
f"*** Starting LE scanning [{scan_window}ms/{scan_interval}ms]",
"blue",
)
)
await device.start_scanning(
scan_interval=scan_interval, scan_window=scan_window
)
if le_advertise:
logging.info(color(f"*** Starting LE advertising [{le_advertise}ms]", "blue"))
await device.start_advertising(
advertising_interval_min=le_advertise,
advertising_interval_max=le_advertise,
auto_restart=True,
)
# -----------------------------------------------------------------------------
# Packet
# -----------------------------------------------------------------------------
@@ -1194,6 +1241,10 @@ class Central(Connection.Listener):
encrypt,
extended_data_length,
role_switch,
le_scan,
le_advertise,
classic_page_scan,
classic_inquiry_scan,
):
super().__init__()
self.transport = transport
@@ -1205,6 +1256,10 @@ class Central(Connection.Listener):
self.encrypt = encrypt or authenticate
self.extended_data_length = extended_data_length
self.role_switch = role_switch
self.le_scan = le_scan
self.le_advertise = le_advertise
self.classic_page_scan = classic_page_scan
self.classic_inquiry_scan = classic_inquiry_scan
self.device = None
self.connection = None
@@ -1253,19 +1308,16 @@ class Central(Connection.Listener):
)
mode = self.mode_factory(self.device)
scenario = self.scenario_factory(mode)
self.device.classic_enabled = self.classic
# Set up a pairing config factory with minimal requirements.
self.device.config.keystore = "JsonKeyStore"
self.device.pairing_config_factory = lambda _: PairingConfig(
sc=False, mitm=False, bonding=False
)
await pre_power_on(self.device, self.classic)
await self.device.power_on()
if self.classic:
await self.device.set_discoverable(False)
await self.device.set_connectable(False)
await post_power_on(
self.device,
self.le_scan,
self.le_advertise,
self.classic_page_scan,
self.classic_inquiry_scan,
)
logging.info(
color(f'### Connecting to {self.peripheral_address}...', 'cyan')
@@ -1378,6 +1430,10 @@ class Peripheral(Device.Listener, Connection.Listener):
classic,
extended_data_length,
role_switch,
le_scan,
le_advertise,
classic_page_scan,
classic_inquiry_scan,
):
self.transport = transport
self.classic = classic
@@ -1385,12 +1441,20 @@ class Peripheral(Device.Listener, Connection.Listener):
self.mode_factory = mode_factory
self.extended_data_length = extended_data_length
self.role_switch = role_switch
self.le_scan = le_scan
self.classic_page_scan = classic_page_scan
self.classic_inquiry_scan = classic_inquiry_scan
self.scenario = None
self.mode = None
self.device = None
self.connection = None
self.connected = asyncio.Event()
if le_advertise:
self.le_advertise = le_advertise
else:
self.le_advertise = 0 if classic else DEFAULT_ADVERTISING_INTERVAL
async def run(self):
logging.info(color('>>> Connecting to HCI...', 'green'))
async with await open_transport_or_link(self.transport) as (
@@ -1406,21 +1470,16 @@ class Peripheral(Device.Listener, Connection.Listener):
self.device.listener = self
self.mode = self.mode_factory(self.device)
self.scenario = self.scenario_factory(self.mode)
self.device.classic_enabled = self.classic
# Set up a pairing config factory with minimal requirements.
self.device.config.keystore = "JsonKeyStore"
self.device.pairing_config_factory = lambda _: PairingConfig(
sc=False, mitm=False, bonding=False
)
await pre_power_on(self.device, self.classic)
await self.device.power_on()
if self.classic:
await self.device.set_discoverable(True)
await self.device.set_connectable(True)
else:
await self.device.start_advertising(auto_restart=True)
await post_power_on(
self.device,
self.le_scan,
self.le_advertise,
self.classic or self.classic_page_scan,
self.classic or self.classic_inquiry_scan,
)
if self.classic:
logging.info(
@@ -1451,10 +1510,14 @@ class Peripheral(Device.Listener, Connection.Listener):
self.connection = connection
self.connected.set()
# Stop being discoverable and connectable
# Stop being discoverable and connectable if possible
if self.classic:
AsyncRunner.spawn(self.device.set_discoverable(False))
AsyncRunner.spawn(self.device.set_connectable(False))
if not self.classic_inquiry_scan:
logging.info(color("*** Stopping inquiry scan", "blue"))
AsyncRunner.spawn(self.device.set_discoverable(False))
if not self.classic_page_scan:
logging.info(color("*** Stopping page scan", "blue"))
AsyncRunner.spawn(self.device.set_connectable(False))
# Request a new data length if needed
if not self.classic and self.extended_data_length:
@@ -1475,7 +1538,9 @@ class Peripheral(Device.Listener, Connection.Listener):
self.scenario.reset()
if self.classic:
logging.info(color("*** Enabling inquiry scan", "blue"))
AsyncRunner.spawn(self.device.set_discoverable(True))
logging.info(color("*** Enabling page scan", "blue"))
AsyncRunner.spawn(self.device.set_connectable(True))
def on_connection_parameters_update(self):
@@ -1621,6 +1686,7 @@ def create_scenario_factory(ctx, default_scenario):
)
@click.option(
'--extended-data-length',
metavar='<TX-OCTETS>/<TX-TIME>',
help='Request a data length upon connection, specified as tx_octets/tx_time',
)
@click.option(
@@ -1628,6 +1694,26 @@ def create_scenario_factory(ctx, default_scenario):
type=click.Choice(['central', 'peripheral']),
help='Request role switch upon connection (central or peripheral)',
)
@click.option(
'--le-scan',
metavar='<WINDOW>/<INTERVAL>',
help='Perform an LE scan with a given window and interval (milliseconds)',
)
@click.option(
'--le-advertise',
metavar='<INTERVAL>',
help='Advertise with a given interval (milliseconds)',
)
@click.option(
'--classic-page-scan',
is_flag=True,
help='Enable Classic page scanning',
)
@click.option(
'--classic-inquiry-scan',
is_flag=True,
help='Enable Classic enquiry scanning',
)
@click.option(
'--rfcomm-channel',
type=int,
@@ -1753,6 +1839,10 @@ def bench(
att_mtu,
extended_data_length,
role_switch,
le_scan,
le_advertise,
classic_page_scan,
classic_inquiry_scan,
packet_size,
packet_count,
start_delay,
@@ -1801,6 +1891,10 @@ def bench(
else None
)
ctx.obj['role_switch'] = role_switch
ctx.obj['le_scan'] = [float(x) for x in le_scan.split('/')] if le_scan else None
ctx.obj['le_advertise'] = float(le_advertise) if le_advertise else None
ctx.obj['classic_page_scan'] = classic_page_scan
ctx.obj['classic_inquiry_scan'] = classic_inquiry_scan
ctx.obj['classic'] = mode in ('rfcomm-client', 'rfcomm-server')
@@ -1845,6 +1939,10 @@ def central(
encrypt or authenticate,
ctx.obj['extended_data_length'],
ctx.obj['role_switch'],
ctx.obj['le_scan'],
ctx.obj['le_advertise'],
ctx.obj['classic_page_scan'],
ctx.obj['classic_inquiry_scan'],
).run()
asyncio.run(run_central())
@@ -1866,6 +1964,10 @@ def peripheral(ctx, transport):
ctx.obj['classic'],
ctx.obj['extended_data_length'],
ctx.obj['role_switch'],
ctx.obj['le_scan'],
ctx.obj['le_advertise'],
ctx.obj['classic_page_scan'],
ctx.obj['classic_inquiry_scan'],
).run()
asyncio.run(run_peripheral())

View File

@@ -99,9 +99,9 @@ logger = logging.getLogger(__name__)
# fmt: off
# pylint: disable=line-too-long
DEVICE_MIN_SCAN_INTERVAL = 25
DEVICE_MIN_SCAN_INTERVAL = 2.5
DEVICE_MAX_SCAN_INTERVAL = 10240
DEVICE_MIN_SCAN_WINDOW = 25
DEVICE_MIN_SCAN_WINDOW = 2.5
DEVICE_MAX_SCAN_WINDOW = 10240
DEVICE_MIN_LE_RSSI = -127
DEVICE_MAX_LE_RSSI = 20