mirror of
https://github.com/google/bumble.git
synced 2026-04-16 00:25:31 +00:00
202 lines
6.5 KiB
Python
202 lines
6.5 KiB
Python
# Copyright 2026 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# https://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Imports
|
|
# -----------------------------------------------------------------------------
|
|
import asyncio
|
|
import sys
|
|
from collections.abc import Callable
|
|
|
|
import bumble.logging
|
|
from bumble.core import BaseError
|
|
from bumble.device import Connection, Device
|
|
from bumble.hci import Address, LeFeatureMask
|
|
from bumble.transport import open_transport
|
|
|
|
# -----------------------------------------------------------------------------
|
|
DEFAULT_CENTRAL_ADDRESS = Address("F0:F0:F0:F0:F0:F0")
|
|
DEFAULT_PERIPHERAL_ADDRESS = Address("F1:F1:F1:F1:F1:F1")
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
async def run_as_central(
    device: Device,
    scenario: Callable | None,
) -> None:
    """Connect to the default peripheral address and optionally run a scenario.

    Args:
        device: Device used to initiate the connection.
        scenario: Optional callable that is awaited with the new connection,
            one second after the connection completes. Skipped when ``None``.

    The coroutine finishes by awaiting a future that nothing in this function
    resolves, so it does not return on its own.
    """
    print(f'=== Connecting to {DEFAULT_PERIPHERAL_ADDRESS}...')
    link = await device.connect(DEFAULT_PERIPHERAL_ADDRESS)
    print("=== Connected")

    if scenario is not None:
        # Short pause between connecting and starting the scenario.
        await asyncio.sleep(1)
        await scenario(link)

    # Park here: this future is never completed by this function.
    never_done = asyncio.get_running_loop().create_future()
    await never_done
|
|
|
|
|
|
async def run_as_peripheral(device: Device, scenario: Callable | None) -> None:
    """Start advertising and optionally run a scenario on each connection.

    Args:
        device: Device that advertises (with ``auto_restart=True``).
        scenario: Optional callable awaited with the connection, one second
            after a ``Device.EVENT_CONNECTION`` event fires. When ``None``,
            no listener is registered by this function.

    The coroutine finishes by awaiting a future that nothing in this function
    resolves, so it does not return on its own.
    """
    print(f'=== Advertising as {DEFAULT_PERIPHERAL_ADDRESS}...')
    await device.start_advertising(auto_restart=True)

    if scenario is not None:

        async def run_scenario(connection: Connection) -> None:
            # Guarded by the enclosing `if`; kept so the closure sees a
            # non-None callable.
            assert scenario is not None
            await asyncio.sleep(1)
            await scenario(connection)

        device.on(Device.EVENT_CONNECTION, run_scenario)

    # Park here: this future is never completed by this function.
    never_done = asyncio.get_running_loop().create_future()
    await never_done
|
|
|
|
|
|
async def _attempt(label: str, request: Callable) -> None:
    """Print `label`, await `request()`, then pause; report BaseError failures."""
    try:
        print(label)
        # `request` is invoked here (not by the caller) so the call itself
        # happens after the label is printed and inside the try block.
        await request()
        await asyncio.sleep(3)
    except BaseError as error:
        print(f"Error: {error}")


async def change_parameters(
    connection: Connection,
    parameter_request_procedure_supported: bool,
    subrating_supported: bool,
    shorter_connection_intervals_supported: bool,
) -> None:
    """Issue a fixed sequence of connection update requests on `connection`.

    Each request is only attempted when its corresponding flag is true. Every
    attempt prints the call being made, awaits it, and then sleeps 3 seconds;
    a ``BaseError`` raised by an attempt is printed and does not stop the
    remaining attempts.

    Args:
        connection: Connection on which the update methods are called.
        parameter_request_procedure_supported: Gate for ``update_parameters``.
        subrating_supported: Gate for ``update_subrate``.
        shorter_connection_intervals_supported: Gate for the two
            ``update_parameters_with_subrate`` calls.
    """
    if parameter_request_procedure_supported:
        await _attempt(
            ">>> update_parameters(7.5, 200, 0, 4000)",
            lambda: connection.update_parameters(7.5, 200, 0, 4000),
        )

    if subrating_supported:
        await _attempt(
            ">>> update_subrate(1, 2, 2, 1, 4000)",
            lambda: connection.update_subrate(1, 2, 2, 1, 4000),
        )

    if shorter_connection_intervals_supported:
        await _attempt(
            ">>> update_parameters_with_subrate(7.5, 200, 1, 1, 0, 0, 4000, 5, 1000)",
            lambda: connection.update_parameters_with_subrate(
                7.5, 200, 1, 1, 0, 0, 4000, 5, 1000
            ),
        )
        await _attempt(
            ">>> update_parameters_with_subrate(0.750, 5, 1, 1, 0, 0, 4000, 0.125, 1000)",
            lambda: connection.update_parameters_with_subrate(
                0.750, 5, 1, 1, 0, 0, 4000, 0.125, 1000
            ),
        )

    print(">>> done")
|
|
|
|
|
|
def on_connection(connection: Connection) -> None:
    """Announce a new connection and register change listeners on it.

    Two listeners are attached to `connection`: one prints the peer LE
    features when ``EVENT_LE_REMOTE_FEATURES_CHANGE`` fires, the other prints
    the connection parameters when ``EVENT_CONNECTION_PARAMETERS_UPDATE``
    fires.

    Args:
        connection: The connection that was just established.
    """
    print(f"+++ Connection established: {connection}")

    def report_remote_features() -> None:
        print(f'... LE Remote Features change: {connection.peer_le_features.name}')

    def report_parameters() -> None:
        print(f'... LE Connection Parameters change: {connection.parameters}')

    connection.on(connection.EVENT_LE_REMOTE_FEATURES_CHANGE, report_remote_features)
    connection.on(connection.EVENT_CONNECTION_PARAMETERS_UPDATE, report_parameters)
|
|
|
|
|
|
async def main() -> None:
    """Run the connection-updates example from command-line arguments.

    Expects three arguments after the program name:
    ``<transport-spec> central|peripheral initiator|responder``.
    When fewer are given, the usage text is printed and the coroutine returns
    without opening the transport.

    Otherwise it opens the HCI transport, creates and powers on a device,
    prints which of the three LE features used by this example the device
    reports as supported, and then runs as central (when the role argument is
    ``"central"``) or as peripheral (any other value). The parameter-change
    scenario is only run from this side when the direction argument is
    ``"initiator"``.
    """
    # All three positional arguments are read below (sys.argv[1..3]), so
    # require program name + 3 arguments before doing anything else.
    if len(sys.argv) < 4:
        print(
            'Usage: run_connection_updates.py <transport-spec> '
            'central|peripheral initiator|responder'
        )
        return

    print('<<< connecting to HCI...')
    async with await open_transport(sys.argv[1]) as hci_transport:
        print('<<< connected')

        role = sys.argv[2]
        direction = sys.argv[3]
        device = Device.with_hci(
            role,
            (
                DEFAULT_CENTRAL_ADDRESS
                if role == "central"
                else DEFAULT_PERIPHERAL_ADDRESS
            ),
            hci_transport.source,
            hci_transport.sink,
        )
        # Opt in to both features before powering the device on.
        device.le_subrate_enabled = True
        device.le_shorter_connection_intervals_enabled = True
        await device.power_on()

        parameter_request_procedure_supported = device.supports_le_features(
            LeFeatureMask.CONNECTION_PARAMETERS_REQUEST_PROCEDURE
        )
        print(
            "Parameters Request Procedure supported: "
            f"{parameter_request_procedure_supported}"
        )

        subrating_supported = device.supports_le_features(
            LeFeatureMask.CONNECTION_SUBRATING
        )
        print(f"Subrating supported: {subrating_supported}")

        shorter_connection_intervals_supported = device.supports_le_features(
            LeFeatureMask.SHORTER_CONNECTION_INTERVALS
        )
        print(
            "Shorter Connection Intervals supported: "
            f"{shorter_connection_intervals_supported}"
        )

        # Log every new connection and its subsequent changes.
        device.on(Device.EVENT_CONNECTION, on_connection)

        async def run(connection: Connection) -> None:
            # Bind the feature flags computed above to the scenario.
            await change_parameters(
                connection,
                parameter_request_procedure_supported,
                subrating_supported,
                shorter_connection_intervals_supported,
            )

        # Only the initiator side drives the parameter changes.
        scenario = run if direction == "initiator" else None

        if role == "central":
            await run_as_central(device, scenario)
        else:
            await run_as_peripheral(device, scenario)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Script entry point: set up basic logging with the 'DEBUG' level, then drive
# the main() coroutine to completion on a fresh event loop.
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
|