Merge pull request #649 from google/gbg/async-read-phy

make connection phy async
This commit is contained in:
Gilles Boccon-Gibod
2025-02-18 07:25:06 -08:00
committed by GitHub
5 changed files with 56 additions and 74 deletions

View File

@@ -104,15 +104,16 @@ def le_phy_name(phy_id):
)
def print_connection_phy(phy):
logging.info(
color('@@@ PHY: ', 'yellow') + f'TX:{le_phy_name(phy.tx_phy)}/'
f'RX:{le_phy_name(phy.rx_phy)}'
)
def print_connection(connection):
params = []
if connection.transport == BT_LE_TRANSPORT:
params.append(
'PHY='
f'TX:{le_phy_name(connection.phy.tx_phy)}/'
f'RX:{le_phy_name(connection.phy.rx_phy)}'
)
params.append(
'DL=('
f'TX:{connection.data_length[0]}/{connection.data_length[1]},'
@@ -1288,6 +1289,8 @@ class Central(Connection.Listener):
logging.info(color('### Connected', 'cyan'))
self.connection.listener = self
print_connection(self.connection)
phy = await self.connection.get_phy()
print_connection_phy(phy)
# Switch roles if needed.
if self.role_switch:
@@ -1345,8 +1348,8 @@ class Central(Connection.Listener):
def on_connection_parameters_update(self):
print_connection(self.connection)
def on_connection_phy_update(self):
print_connection(self.connection)
def on_connection_phy_update(self, phy):
print_connection_phy(phy)
def on_connection_att_mtu_update(self):
print_connection(self.connection)
@@ -1472,8 +1475,8 @@ class Peripheral(Device.Listener, Connection.Listener):
def on_connection_parameters_update(self):
print_connection(self.connection)
def on_connection_phy_update(self):
print_connection(self.connection)
def on_connection_phy_update(self, phy):
print_connection_phy(phy)
def on_connection_att_mtu_update(self):
print_connection(self.connection)

View File

@@ -22,7 +22,6 @@
import asyncio
import logging
import os
import random
import re
import humanize
from typing import Optional, Union
@@ -57,7 +56,13 @@ from bumble import __version__
import bumble.core
from bumble import colors
from bumble.core import UUID, AdvertisingData, BT_LE_TRANSPORT
from bumble.device import ConnectionParametersPreferences, Device, Connection, Peer
from bumble.device import (
ConnectionParametersPreferences,
ConnectionPHY,
Device,
Connection,
Peer,
)
from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link
from bumble.gatt import Characteristic, Service, CharacteristicDeclaration, Descriptor
@@ -125,6 +130,7 @@ def parse_phys(phys):
# -----------------------------------------------------------------------------
class ConsoleApp:
connected_peer: Optional[Peer]
connection_phy: Optional[ConnectionPHY]
def __init__(self):
self.known_addresses = set()
@@ -132,6 +138,7 @@ class ConsoleApp:
self.known_local_attributes = []
self.device = None
self.connected_peer = None
self.connection_phy = None
self.top_tab = 'device'
self.monitor_rssi = False
self.connection_rssi = None
@@ -332,10 +339,10 @@ class ConsoleApp:
f'{connection.parameters.peripheral_latency}/'
f'{connection.parameters.supervision_timeout}'
)
if connection.transport == BT_LE_TRANSPORT:
if self.connection_phy is not None:
phy_state = (
f' RX={le_phy_name(connection.phy.rx_phy)}/'
f'TX={le_phy_name(connection.phy.tx_phy)}'
f' RX={le_phy_name(self.connection_phy.rx_phy)}/'
f'TX={le_phy_name(self.connection_phy.tx_phy)}'
)
else:
phy_state = ''
@@ -654,11 +661,12 @@ class ConsoleApp:
self.append_to_output('connecting...')
try:
await self.device.connect(
connection = await self.device.connect(
params[0],
connection_parameters_preferences=connection_parameters_preferences,
timeout=DEFAULT_CONNECTION_TIMEOUT,
)
self.connection_phy = await connection.get_phy()
self.top_tab = 'services'
except bumble.core.TimeoutError:
self.show_error('connection timed out')
@@ -838,8 +846,8 @@ class ConsoleApp:
phy = await self.connected_peer.connection.get_phy()
self.append_to_output(
f'PHY: RX={HCI_Constant.le_phy_name(phy[0])}, '
f'TX={HCI_Constant.le_phy_name(phy[1])}'
f'PHY: RX={HCI_Constant.le_phy_name(phy.rx_phy)}, '
f'TX={HCI_Constant.le_phy_name(phy.tx_phy)}'
)
async def do_request_mtu(self, params):
@@ -1076,10 +1084,9 @@ class DeviceListener(Device.Listener, Connection.Listener):
f'{self.app.connected_peer.connection.parameters}'
)
def on_connection_phy_update(self):
self.app.append_to_output(
f'connection phy update: {self.app.connected_peer.connection.phy}'
)
def on_connection_phy_update(self, phy):
self.app.connection_phy = phy
self.app.append_to_output(f'connection phy update: {phy}')
def on_connection_att_mtu_update(self):
self.app.append_to_output(

View File

@@ -1586,7 +1586,7 @@ class Connection(CompositeEventEmitter):
def on_connection_data_length_change(self):
pass
def on_connection_phy_update(self):
def on_connection_phy_update(self, phy):
pass
def on_connection_phy_update_failure(self, error):
@@ -1612,7 +1612,6 @@ class Connection(CompositeEventEmitter):
peer_resolvable_address,
role,
parameters,
phy,
):
super().__init__()
self.device = device
@@ -1629,7 +1628,6 @@ class Connection(CompositeEventEmitter):
self.authenticated = False
self.sc = False
self.link_key_type = None
self.phy = phy
self.att_mtu = ATT_DEFAULT_MTU
self.data_length = DEVICE_DEFAULT_DATA_LENGTH
self.gatt_client = None # Per-connection client
@@ -1658,7 +1656,6 @@ class Connection(CompositeEventEmitter):
None,
role,
None,
None,
)
# [Classic only]
@@ -1774,12 +1771,12 @@ class Connection(CompositeEventEmitter):
async def set_phy(self, tx_phys=None, rx_phys=None, phy_options=None):
return await self.device.set_connection_phy(self, tx_phys, rx_phys, phy_options)
async def get_phy(self) -> ConnectionPHY:
return await self.device.get_connection_phy(self)
async def get_rssi(self):
return await self.device.get_connection_rssi(self)
async def get_phy(self):
return await self.device.get_connection_phy(self)
async def transfer_periodic_sync(
self, sync_handle: int, service_data: int = 0
) -> None:
@@ -3937,12 +3934,14 @@ class Device(CompositeEventEmitter):
)
return result.return_parameters.rssi
async def get_connection_phy(self, connection):
async def get_connection_phy(self, connection: Connection) -> ConnectionPHY:
result = await self.send_command(
hci.HCI_LE_Read_PHY_Command(connection_handle=connection.handle),
check_result=True,
)
return (result.return_parameters.tx_phy, result.return_parameters.rx_phy)
return ConnectionPHY(
result.return_parameters.tx_phy, result.return_parameters.rx_phy
)
async def set_connection_phy(
self, connection, tx_phys=None, rx_phys=None, phy_options=None
@@ -5101,29 +5100,6 @@ class Device(CompositeEventEmitter):
lambda _: self.abort_on('flush', advertising_set.start()),
)
self._emit_le_connection(connection)
def _emit_le_connection(self, connection: Connection) -> None:
# If supported, read which PHY we're connected with before
# notifying listeners of the new connection.
if self.host.supports_command(hci.HCI_LE_READ_PHY_COMMAND):
async def read_phy():
result = await self.send_command(
hci.HCI_LE_Read_PHY_Command(connection_handle=connection.handle),
check_result=True,
)
connection.phy = ConnectionPHY(
result.return_parameters.tx_phy, result.return_parameters.rx_phy
)
# Emit an event to notify listeners of the new connection
self.emit('connection', connection)
# Do so asynchronously to not block the current event handler
connection.abort_on('disconnection', read_phy())
return
self.emit('connection', connection)
@host_event_handler
@@ -5222,7 +5198,6 @@ class Device(CompositeEventEmitter):
peer_resolvable_address,
role,
connection_parameters,
ConnectionPHY(hci.HCI_LE_1M_PHY, hci.HCI_LE_1M_PHY),
)
self.connections[connection_handle] = connection
@@ -5238,7 +5213,7 @@ class Device(CompositeEventEmitter):
if role == hci.HCI_CENTRAL_ROLE or not self.supports_le_extended_advertising:
# We can emit now, we have all the info we need
self._emit_le_connection(connection)
self.emit('connection', connection)
return
if role == hci.HCI_PERIPHERAL_ROLE and self.supports_le_extended_advertising:
@@ -5792,14 +5767,13 @@ class Device(CompositeEventEmitter):
@host_event_handler
@with_connection_from_handle
def on_connection_phy_update(self, connection, connection_phy):
def on_connection_phy_update(self, connection, phy):
logger.debug(
f'*** Connection PHY Update: [0x{connection.handle:04X}] '
f'{connection.peer_address} as {connection.role_name}, '
f'{connection_phy}'
f'{phy}'
)
connection.phy = connection_phy
connection.emit('connection_phy_update')
connection.emit('connection_phy_update', phy)
@host_event_handler
@with_connection_from_handle

View File

@@ -1098,8 +1098,11 @@ class Host(AbortableEventEmitter):
# Notify the client
if event.status == hci.HCI_SUCCESS:
connection_phy = ConnectionPHY(event.tx_phy, event.rx_phy)
self.emit('connection_phy_update', connection.handle, connection_phy)
self.emit(
'connection_phy_update',
connection.handle,
ConnectionPHY(event.tx_phy, event.rx_phy),
)
else:
self.emit('connection_phy_update_failure', connection.handle, event.status)

View File

@@ -371,9 +371,7 @@ class HostService(HostServicer):
scan_response_data=scan_response_data,
)
pending_connection: asyncio.Future[bumble.device.Connection] = (
asyncio.get_running_loop().create_future()
)
connections: asyncio.Queue[bumble.device.Connection] = asyncio.Queue()
if request.connectable:
@@ -382,7 +380,7 @@ class HostService(HostServicer):
connection.transport == BT_LE_TRANSPORT
and connection.role == BT_PERIPHERAL_ROLE
):
pending_connection.set_result(connection)
connections.put_nowait(connection)
self.device.on('connection', on_connection)
@@ -397,8 +395,7 @@ class HostService(HostServicer):
await asyncio.sleep(1)
continue
connection = await pending_connection
pending_connection = asyncio.get_running_loop().create_future()
connection = await connections.get()
cookie = any_pb2.Any(value=connection.handle.to_bytes(4, 'big'))
yield AdvertiseResponse(connection=Connection(cookie=cookie))
@@ -492,6 +489,8 @@ class HostService(HostServicer):
target = Address(target_bytes, Address.RANDOM_DEVICE_ADDRESS)
advertising_type = AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY
connections: asyncio.Queue[bumble.device.Connection] = asyncio.Queue()
if request.connectable:
def on_connection(connection: bumble.device.Connection) -> None:
@@ -499,7 +498,7 @@ class HostService(HostServicer):
connection.transport == BT_LE_TRANSPORT
and connection.role == BT_PERIPHERAL_ROLE
):
pending_connection.set_result(connection)
connections.put_nowait(connection)
self.device.on('connection', on_connection)
@@ -517,12 +516,8 @@ class HostService(HostServicer):
await asyncio.sleep(1)
continue
pending_connection: asyncio.Future[bumble.device.Connection] = (
asyncio.get_running_loop().create_future()
)
self.log.debug('Wait for LE connection...')
connection = await pending_connection
connection = await connections.get()
self.log.debug(
f"Advertise: Connected to {connection.peer_address} (handle={connection.handle})"