From ca23d6b89a86c3decca5fef3f319eca218279618 Mon Sep 17 00:00:00 2001 From: khsiao-google Date: Wed, 10 Sep 2025 15:00:41 +0800 Subject: [PATCH] Revert "Improve connection related functions and names" --- bumble/core.py | 17 +++++ bumble/device.py | 165 ++++++++++++++++++++++--------------------- bumble/host.py | 36 ++++++---- tests/device_test.py | 23 +++--- 4 files changed, 136 insertions(+), 105 deletions(-) diff --git a/bumble/core.py b/bumble/core.py index 284184b..4eb6a97 100644 --- a/bumble/core.py +++ b/bumble/core.py @@ -2110,6 +2110,23 @@ class AdvertisingData: return self.to_string() +# ----------------------------------------------------------------------------- +# Connection Parameters +# ----------------------------------------------------------------------------- +class ConnectionParameters: + def __init__(self, connection_interval, peripheral_latency, supervision_timeout): + self.connection_interval = connection_interval + self.peripheral_latency = peripheral_latency + self.supervision_timeout = supervision_timeout + + def __str__(self): + return ( + f'ConnectionParameters(connection_interval={self.connection_interval}, ' + f'peripheral_latency={self.peripheral_latency}, ' + f'supervision_timeout={self.supervision_timeout}' + ) + + # ----------------------------------------------------------------------------- # Connection PHY # ----------------------------------------------------------------------------- diff --git a/bumble/device.py b/bumble/device.py index 408b1c9..b777cb3 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -1831,6 +1831,36 @@ class Connection(utils.CompositeEventEmitter): self.cs_configs = {} self.cs_procedures = {} + # [Classic only] + @classmethod + def incomplete(cls, device, peer_address, role): + """ + Instantiate an incomplete connection (ie. one waiting for a HCI Connection + Complete event). + Once received it shall be completed using the `.complete` method. + """ + return cls( + device, + None, + PhysicalTransport.BR_EDR, + device.public_address, + None, + peer_address, + None, + role, + None, + ) + + # [Classic only] + def complete(self, handle, parameters): + """ + Finish an incomplete connection upon completion. + """ + assert self.handle is None + assert self.transport == PhysicalTransport.BR_EDR + self.handle = handle + self.parameters = parameters + @property def role_name(self): if self.role is None: @@ -1842,7 +1872,7 @@ class Connection(utils.CompositeEventEmitter): return f'UNKNOWN[{self.role}]' @property - def is_encrypted(self) -> bool: + def is_encrypted(self): return self.encryption != 0 @property @@ -2147,6 +2177,8 @@ def with_connection_from_handle(function): def with_connection_from_address(function): @functools.wraps(function) def wrapper(self, address: hci.Address, *args, **kwargs): + if connection := self.pending_connections.get(address, False): + return function(self, connection, *args, **kwargs) for connection in self.connections.values(): if connection.peer_address == address: return function(self, connection, *args, **kwargs) @@ -2160,6 +2192,8 @@ def with_connection_from_address(function): def try_with_connection_from_address(function): @functools.wraps(function) def wrapper(self, address, *args, **kwargs): + if connection := self.pending_connections.get(address, False): + return function(self, connection, address, *args, **kwargs) for connection in self.connections.values(): if connection.peer_address == address: return function(self, connection, address, *args, **kwargs) @@ -2211,7 +2245,7 @@ class Device(utils.CompositeEventEmitter): scan_response_data: bytes cs_capabilities: ChannelSoundingCapabilities | None = None connections: dict[int, Connection] - connection_roles: dict[hci.Address, hci.Role] + pending_connections: dict[hci.Address, Connection] classic_pending_accepts: dict[ hci.Address, list[asyncio.Future[Union[Connection, tuple[hci.Address, int, int]]]], @@ -2333,9 +2367,7 @@ class Device(utils.CompositeEventEmitter): self.le_connecting = False self.disconnecting = False self.connections = {} # Connections, by connection handle - self.connection_roles = ( - {} - ) # Local connection roles, by BD address (BR/EDR only) + self.pending_connections = {} # Connections, by BD address (BR/EDR only) self.sco_links = {} # ScoLinks, by connection handle (BR/EDR only) self.cis_links = {} # CisLinks, by connection handle (LE only) self._pending_cis = {} # (CIS_ID, CIG_ID), by CIS_handle @@ -3804,7 +3836,9 @@ class Device(utils.CompositeEventEmitter): ) else: # Save pending connection - self.connection_roles[peer_address] = hci.Role.CENTRAL + self.pending_connections[peer_address] = Connection.incomplete( + self, peer_address, hci.Role.CENTRAL + ) # TODO: allow passing other settings result = await self.send_command( @@ -3857,7 +3891,7 @@ class Device(utils.CompositeEventEmitter): self.le_connecting = False self.connect_own_address_type = None else: - self.connection_roles.pop(peer_address, None) + self.pending_connections.pop(peer_address, None) async def accept( self, @@ -3951,11 +3985,13 @@ class Device(utils.CompositeEventEmitter): self.on(self.EVENT_CONNECTION, on_connection) self.on(self.EVENT_CONNECTION_FAILURE, on_connection_failure) - # Save Peripheral hci.role. + # Save pending connection, with the Peripheral hci.role. # Even if we requested a role switch in the hci.HCI_Accept_Connection_Request # command, this connection is still considered Peripheral until an eventual # role change event. - self.connection_roles[peer_address] = hci.Role.PERIPHERAL + self.pending_connections[peer_address] = Connection.incomplete( + self, peer_address, hci.Role.PERIPHERAL + ) try: # Accept connection request @@ -3973,7 +4009,7 @@ class Device(utils.CompositeEventEmitter): finally: self.remove_listener(self.EVENT_CONNECTION, on_connection) self.remove_listener(self.EVENT_CONNECTION_FAILURE, on_connection_failure) - self.connection_roles.pop(peer_address, None) + self.pending_connections.pop(peer_address, None) @asynccontextmanager async def connect_as_gatt(self, peer_address: Union[hci.Address, str]): @@ -5413,56 +5449,15 @@ class Device(utils.CompositeEventEmitter): self.emit(self.EVENT_CONNECTION, connection) @host_event_handler - def on_connection_complete( - self, - connection_handle: int, - peer_address: hci.Address, - connection_interval: int, - peripheral_latency: int, - supervision_timeout: int, - ) -> None: - connection_role = self.connection_roles.pop(peer_address, hci.Role.PERIPHERAL) - - logger.debug( - f'*** Connection: [0x{connection_handle:04X}] ' - f'{peer_address} {hci.HCI_Constant.role_name(connection_role)}' - ) - if connection_handle in self.connections: - logger.warning( - 'new connection reuses the same handle as a previous connection' - ) - - # Create a new connection - connection = Connection( - device=self, - handle=connection_handle, - transport=PhysicalTransport.BR_EDR, - self_address=self.public_address, - self_resolvable_address=None, - peer_address=peer_address, - peer_resolvable_address=None, - role=connection_role, - parameters=Connection.Parameters( - connection_interval * 1.25, - peripheral_latency, - supervision_timeout * 10.0, - ), - ) - self.connections[connection_handle] = connection - - self.emit(self.EVENT_CONNECTION, connection) - - @host_event_handler - def on_le_connection_complete( + def on_connection( self, connection_handle: int, + transport: core.PhysicalTransport, peer_address: hci.Address, self_resolvable_address: Optional[hci.Address], peer_resolvable_address: Optional[hci.Address], role: hci.Role, - connection_interval: int, - peripheral_latency: int, - supervision_timeout: int, + connection_parameters: Optional[core.ConnectionParameters], ) -> None: # Convert all-zeros addresses into None. if self_resolvable_address == hci.Address.ANY_RANDOM: @@ -5482,6 +5477,19 @@ class Device(utils.CompositeEventEmitter): 'new connection reuses the same handle as a previous connection' ) + if transport == PhysicalTransport.BR_EDR: + # Create a new connection + connection = self.pending_connections.pop(peer_address) + connection.complete(connection_handle, connection_parameters) + self.connections[connection_handle] = connection + + # Emit an event to notify listeners of the new connection + self.emit(self.EVENT_CONNECTION, connection) + + return + + assert connection_parameters is not None + if peer_resolvable_address is None: # Resolve the peer address if we can if self.address_resolver: @@ -5531,16 +5539,16 @@ class Device(utils.CompositeEventEmitter): connection = Connection( self, connection_handle, - PhysicalTransport.LE, + transport, self_address, self_resolvable_address, peer_address, peer_resolvable_address, role, Connection.Parameters( - connection_interval * 1.25, - peripheral_latency, - supervision_timeout * 10.0, + connection_parameters.connection_interval * 1.25, + connection_parameters.peripheral_latency, + connection_parameters.supervision_timeout * 10.0, ), ) self.connections[connection_handle] = connection @@ -5631,7 +5639,9 @@ class Device(utils.CompositeEventEmitter): # device configuration is set to accept any incoming connection elif self.classic_accept_any: # Save pending connection - self.connection_roles[bd_addr] = hci.Role.PERIPHERAL + self.pending_connections[bd_addr] = Connection.incomplete( + self, bd_addr, hci.Role.PERIPHERAL + ) self.host.send_command_sync( hci.HCI_Accept_Connection_Request_Command( @@ -6173,27 +6183,27 @@ class Device(utils.CompositeEventEmitter): @host_event_handler @with_connection_from_handle def on_connection_parameters_update( - self, - connection: Connection, - connection_interval: int, - peripheral_latency: int, - supervision_timeout: int, + self, connection: Connection, connection_parameters: core.ConnectionParameters ): logger.debug( f'*** Connection Parameters Update: [0x{connection.handle:04X}] ' f'{connection.peer_address} as {connection.role_name}, ' + f'{connection_parameters}' ) - if connection.parameters.connection_interval != connection_interval * 1.25: + if ( + connection.parameters.connection_interval + != connection_parameters.connection_interval * 1.25 + ): connection.parameters = Connection.Parameters( - connection_interval * 1.25, - peripheral_latency, - supervision_timeout * 10.0, + connection_parameters.connection_interval * 1.25, + connection_parameters.peripheral_latency, + connection_parameters.supervision_timeout * 10.0, ) else: connection.parameters = Connection.Parameters( - connection_interval * 1.25, - peripheral_latency, - supervision_timeout * 10.0, + connection_parameters.connection_interval * 1.25, + connection_parameters.peripheral_latency, + connection_parameters.supervision_timeout * 10.0, connection.parameters.subrate_factor, connection.parameters.continuation_number, ) @@ -6393,15 +6403,10 @@ class Device(utils.CompositeEventEmitter): # [Classic only] @host_event_handler - @try_with_connection_from_address - def on_role_change( - self, connection: Connection, peer_address: hci.Address, new_role: hci.Role - ): - if connection: - connection.role = new_role - connection.emit(connection.EVENT_ROLE_CHANGE, new_role) - else: - self.connection_roles[peer_address] = new_role + @with_connection_from_address + def on_role_change(self, connection: Connection, new_role: hci.Role): + connection.role = new_role + connection.emit(connection.EVENT_ROLE_CHANGE, new_role) # [Classic only] @host_event_handler diff --git a/bumble/host.py b/bumble/host.py index c1afaf2..8452f3d 100644 --- a/bumble/host.py +++ b/bumble/host.py @@ -26,7 +26,12 @@ from typing import TYPE_CHECKING, Any, Awaitable, Callable, Optional, Union, cas from bumble import drivers, hci, utils from bumble.colors import color -from bumble.core import ConnectionPHY, InvalidStateError, PhysicalTransport +from bumble.core import ( + ConnectionParameters, + ConnectionPHY, + InvalidStateError, + PhysicalTransport, +) from bumble.l2cap import L2CAP_PDU from bumble.snoop import Snooper from bumble.transport.common import TransportLostError @@ -991,16 +996,20 @@ class Host(utils.EventEmitter): self.connections[event.connection_handle] = connection # Notify the client + connection_parameters = ConnectionParameters( + event.connection_interval, + event.peripheral_latency, + event.supervision_timeout, + ) self.emit( - 'le_connection_complete', + 'connection', event.connection_handle, + PhysicalTransport.LE, event.peer_address, getattr(event, 'local_resolvable_private_address', None), getattr(event, 'peer_resolvable_private_address', None), hci.Role(event.role), - event.connection_interval, - event.peripheral_latency, - event.supervision_timeout, + connection_parameters, ) else: logger.debug(f'### CONNECTION FAILED: {event.status}') @@ -1051,12 +1060,14 @@ class Host(utils.EventEmitter): # Notify the client self.emit( - 'connection_complete', + 'connection', event.connection_handle, + PhysicalTransport.BR_EDR, event.bd_addr, - 0, - 0, - 0, + None, + None, + None, + None, ) else: logger.debug(f'### BR/EDR CONNECTION FAILED: {event.status}') @@ -1119,13 +1130,14 @@ class Host(utils.EventEmitter): # Notify the client if event.status == hci.HCI_SUCCESS: - self.emit( - 'connection_parameters_update', - connection.handle, + connection_parameters = ConnectionParameters( event.connection_interval, event.peripheral_latency, event.supervision_timeout, ) + self.emit( + 'connection_parameters_update', connection.handle, connection_parameters + ) else: self.emit( 'connection_parameters_update_failure', connection.handle, event.status diff --git a/tests/device_test.py b/tests/device_test.py index f8539ec..c7b7baf 100644 --- a/tests/device_test.py +++ b/tests/device_test.py @@ -24,7 +24,7 @@ from unittest import mock import pytest from bumble import device, gatt, hci, utils -from bumble.core import PhysicalTransport +from bumble.core import ConnectionParameters, PhysicalTransport from bumble.device import ( AdvertisingEventProperties, AdvertisingParameters, @@ -289,15 +289,14 @@ async def test_legacy_advertising_disconnection(auto_restart): await device.power_on() peer_address = Address('F0:F1:F2:F3:F4:F5') await device.start_advertising(auto_restart=auto_restart) - device.on_le_connection_complete( + device.on_connection( 0x0001, + PhysicalTransport.LE, peer_address, None, None, Role.PERIPHERAL, - 0, - 0, - 0, + ConnectionParameters(0, 0, 0), ) device.on_advertising_set_termination( @@ -348,15 +347,14 @@ async def test_extended_advertising_connection(own_address_type): advertising_set = await device.create_advertising_set( advertising_parameters=AdvertisingParameters(own_address_type=own_address_type) ) - device.on_le_connection_complete( + device.on_connection( 0x0001, + PhysicalTransport.LE, peer_address, None, None, Role.PERIPHERAL, - 0, - 0, - 0, + ConnectionParameters(0, 0, 0), ) device.on_advertising_set_termination( HCI_SUCCESS, @@ -393,15 +391,14 @@ async def test_extended_advertising_connection_out_of_order(own_address_type): 0x0001, 0, ) - device.on_le_connection_complete( + device.on_connection( 0x0001, + PhysicalTransport.LE, Address('F0:F1:F2:F3:F4:F5'), None, None, Role.PERIPHERAL, - 0, - 0, - 0, + ConnectionParameters(0, 0, 0), ) if own_address_type == OwnAddressType.PUBLIC: