diff --git a/apps/bench.py b/apps/bench.py index e2200c10..7d815d25 100644 --- a/apps/bench.py +++ b/apps/bench.py @@ -23,6 +23,7 @@ import os import statistics import struct import time +from typing import Optional import click @@ -75,6 +76,7 @@ DEFAULT_CENTRAL_ADDRESS = 'F0:F0:F0:F0:F0:F0' DEFAULT_CENTRAL_NAME = 'Speed Central' DEFAULT_PERIPHERAL_ADDRESS = 'F1:F1:F1:F1:F1:F1' DEFAULT_PERIPHERAL_NAME = 'Speed Peripheral' +DEFAULT_ADVERTISING_INTERVAL = 100 SPEED_SERVICE_UUID = '50DB505C-8AC4-4738-8448-3B1D9CC09CC5' SPEED_TX_UUID = 'E789C754-41A1-45F4-A948-A0A1A90DBA53' @@ -197,6 +199,51 @@ async def switch_roles(connection, role): logging.info(f'{color("### Role switch failed:", "red")} {error}') +async def pre_power_on(device: Device, classic: bool) -> None: + device.classic_enabled = classic + + # Set up a pairing config factory with minimal requirements. + device.config.keystore = "JsonKeyStore" + device.pairing_config_factory = lambda _: PairingConfig( + sc=False, mitm=False, bonding=False + ) + + +async def post_power_on( + device: Device, + le_scan: Optional[tuple[int, int]], + le_advertise: Optional[int], + classic_page_scan: bool, + classic_inquiry_scan: bool, +) -> None: + if classic_page_scan: + logging.info(color("*** Enabling page scan", "blue")) + await device.set_connectable(True) + if classic_inquiry_scan: + logging.info(color("*** Enabling inquiry scan", "blue")) + await device.set_discoverable(True) + + if le_scan: + scan_window, scan_interval = le_scan + logging.info( + color( + f"*** Starting LE scanning [{scan_window}ms/{scan_interval}ms]", + "blue", + ) + ) + await device.start_scanning( + scan_interval=scan_interval, scan_window=scan_window + ) + + if le_advertise: + logging.info(color(f"*** Starting LE advertising [{le_advertise}ms]", "blue")) + await device.start_advertising( + advertising_interval_min=le_advertise, + advertising_interval_max=le_advertise, + auto_restart=True, + ) + + # ----------------------------------------------------------------------------- # Packet # ----------------------------------------------------------------------------- @@ -1194,6 +1241,10 @@ class Central(Connection.Listener): encrypt, extended_data_length, role_switch, + le_scan, + le_advertise, + classic_page_scan, + classic_inquiry_scan, ): super().__init__() self.transport = transport @@ -1205,6 +1256,10 @@ class Central(Connection.Listener): self.encrypt = encrypt or authenticate self.extended_data_length = extended_data_length self.role_switch = role_switch + self.le_scan = le_scan + self.le_advertise = le_advertise + self.classic_page_scan = classic_page_scan + self.classic_inquiry_scan = classic_inquiry_scan self.device = None self.connection = None @@ -1253,19 +1308,16 @@ class Central(Connection.Listener): ) mode = self.mode_factory(self.device) scenario = self.scenario_factory(mode) - self.device.classic_enabled = self.classic - - # Set up a pairing config factory with minimal requirements. - self.device.config.keystore = "JsonKeyStore" - self.device.pairing_config_factory = lambda _: PairingConfig( - sc=False, mitm=False, bonding=False - ) + await pre_power_on(self.device, self.classic) await self.device.power_on() - - if self.classic: - await self.device.set_discoverable(False) - await self.device.set_connectable(False) + await post_power_on( + self.device, + self.le_scan, + self.le_advertise, + self.classic_page_scan, + self.classic_inquiry_scan, + ) logging.info( color(f'### Connecting to {self.peripheral_address}...', 'cyan') @@ -1378,6 +1430,10 @@ class Peripheral(Device.Listener, Connection.Listener): classic, extended_data_length, role_switch, + le_scan, + le_advertise, + classic_page_scan, + classic_inquiry_scan, ): self.transport = transport self.classic = classic @@ -1385,12 +1441,20 @@ class Peripheral(Device.Listener, Connection.Listener): self.mode_factory = mode_factory self.extended_data_length = extended_data_length self.role_switch = role_switch + self.le_scan = le_scan + self.classic_page_scan = classic_page_scan + self.classic_inquiry_scan = classic_inquiry_scan self.scenario = None self.mode = None self.device = None self.connection = None self.connected = asyncio.Event() + if le_advertise: + self.le_advertise = le_advertise + else: + self.le_advertise = 0 if classic else DEFAULT_ADVERTISING_INTERVAL + async def run(self): logging.info(color('>>> Connecting to HCI...', 'green')) async with await open_transport_or_link(self.transport) as ( @@ -1406,21 +1470,16 @@ class Peripheral(Device.Listener, Connection.Listener): self.device.listener = self self.mode = self.mode_factory(self.device) self.scenario = self.scenario_factory(self.mode) - self.device.classic_enabled = self.classic - - # Set up a pairing config factory with minimal requirements. - self.device.config.keystore = "JsonKeyStore" - self.device.pairing_config_factory = lambda _: PairingConfig( - sc=False, mitm=False, bonding=False - ) + await pre_power_on(self.device, self.classic) await self.device.power_on() - - if self.classic: - await self.device.set_discoverable(True) - await self.device.set_connectable(True) - else: - await self.device.start_advertising(auto_restart=True) + await post_power_on( + self.device, + self.le_scan, + self.le_advertise, + self.classic or self.classic_page_scan, + self.classic or self.classic_inquiry_scan, + ) if self.classic: logging.info( @@ -1451,10 +1510,14 @@ class Peripheral(Device.Listener, Connection.Listener): self.connection = connection self.connected.set() - # Stop being discoverable and connectable + # Stop being discoverable and connectable if possible if self.classic: - AsyncRunner.spawn(self.device.set_discoverable(False)) - AsyncRunner.spawn(self.device.set_connectable(False)) + if not self.classic_inquiry_scan: + logging.info(color("*** Stopping inquiry scan", "blue")) + AsyncRunner.spawn(self.device.set_discoverable(False)) + if not self.classic_page_scan: + logging.info(color("*** Stopping page scan", "blue")) + AsyncRunner.spawn(self.device.set_connectable(False)) # Request a new data length if needed if not self.classic and self.extended_data_length: @@ -1475,7 +1538,9 @@ class Peripheral(Device.Listener, Connection.Listener): self.scenario.reset() if self.classic: + logging.info(color("*** Enabling inquiry scan", "blue")) AsyncRunner.spawn(self.device.set_discoverable(True)) + logging.info(color("*** Enabling page scan", "blue")) AsyncRunner.spawn(self.device.set_connectable(True)) def on_connection_parameters_update(self): @@ -1621,6 +1686,7 @@ def create_scenario_factory(ctx, default_scenario): ) @click.option( '--extended-data-length', + metavar='/', help='Request a data length upon connection, specified as tx_octets/tx_time', ) @click.option( @@ -1628,6 +1694,26 @@ def create_scenario_factory(ctx, default_scenario): type=click.Choice(['central', 'peripheral']), help='Request role switch upon connection (central or peripheral)', ) +@click.option( + '--le-scan', + metavar='/', + help='Perform an LE scan with a given window and interval (milliseconds)', +) +@click.option( + '--le-advertise', + metavar='', + help='Advertise with a given interval (milliseconds)', +) +@click.option( + '--classic-page-scan', + is_flag=True, + help='Enable Classic page scanning', +) +@click.option( + '--classic-inquiry-scan', + is_flag=True, + help='Enable Classic enquiry scanning', +) @click.option( '--rfcomm-channel', type=int, @@ -1753,6 +1839,10 @@ def bench( att_mtu, extended_data_length, role_switch, + le_scan, + le_advertise, + classic_page_scan, + classic_inquiry_scan, packet_size, packet_count, start_delay, @@ -1801,6 +1891,10 @@ def bench( else None ) ctx.obj['role_switch'] = role_switch + ctx.obj['le_scan'] = [float(x) for x in le_scan.split('/')] if le_scan else None + ctx.obj['le_advertise'] = float(le_advertise) if le_advertise else None + ctx.obj['classic_page_scan'] = classic_page_scan + ctx.obj['classic_inquiry_scan'] = classic_inquiry_scan ctx.obj['classic'] = mode in ('rfcomm-client', 'rfcomm-server') @@ -1845,6 +1939,10 @@ def central( encrypt or authenticate, ctx.obj['extended_data_length'], ctx.obj['role_switch'], + ctx.obj['le_scan'], + ctx.obj['le_advertise'], + ctx.obj['classic_page_scan'], + ctx.obj['classic_inquiry_scan'], ).run() asyncio.run(run_central()) @@ -1866,6 +1964,10 @@ def peripheral(ctx, transport): ctx.obj['classic'], ctx.obj['extended_data_length'], ctx.obj['role_switch'], + ctx.obj['le_scan'], + ctx.obj['le_advertise'], + ctx.obj['classic_page_scan'], + ctx.obj['classic_inquiry_scan'], ).run() asyncio.run(run_peripheral()) diff --git a/bumble/device.py b/bumble/device.py index f92271d7..da407e41 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -99,9 +99,9 @@ logger = logging.getLogger(__name__) # fmt: off # pylint: disable=line-too-long -DEVICE_MIN_SCAN_INTERVAL = 25 +DEVICE_MIN_SCAN_INTERVAL = 2.5 DEVICE_MAX_SCAN_INTERVAL = 10240 -DEVICE_MIN_SCAN_WINDOW = 25 +DEVICE_MIN_SCAN_WINDOW = 2.5 DEVICE_MAX_SCAN_WINDOW = 10240 DEVICE_MIN_LE_RSSI = -127 DEVICE_MAX_LE_RSSI = 20