Revert "Improve connection related functions and names"

This commit is contained in:
khsiao-google
2025-09-10 15:00:41 +08:00
committed by GitHub
parent d86d69d816
commit ca23d6b89a
4 changed files with 136 additions and 105 deletions

View File

@@ -2110,6 +2110,23 @@ class AdvertisingData:
return self.to_string()
# -----------------------------------------------------------------------------
# Connection Parameters
# -----------------------------------------------------------------------------
class ConnectionParameters:
def __init__(self, connection_interval, peripheral_latency, supervision_timeout):
self.connection_interval = connection_interval
self.peripheral_latency = peripheral_latency
self.supervision_timeout = supervision_timeout
def __str__(self):
return (
f'ConnectionParameters(connection_interval={self.connection_interval}, '
f'peripheral_latency={self.peripheral_latency}, '
f'supervision_timeout={self.supervision_timeout}'
)
# -----------------------------------------------------------------------------
# Connection PHY
# -----------------------------------------------------------------------------

View File

@@ -1831,6 +1831,36 @@ class Connection(utils.CompositeEventEmitter):
self.cs_configs = {}
self.cs_procedures = {}
# [Classic only]
@classmethod
def incomplete(cls, device, peer_address, role):
"""
Instantiate an incomplete connection (ie. one waiting for a HCI Connection
Complete event).
Once received it shall be completed using the `.complete` method.
"""
return cls(
device,
None,
PhysicalTransport.BR_EDR,
device.public_address,
None,
peer_address,
None,
role,
None,
)
# [Classic only]
def complete(self, handle, parameters):
"""
Finish an incomplete connection upon completion.
"""
assert self.handle is None
assert self.transport == PhysicalTransport.BR_EDR
self.handle = handle
self.parameters = parameters
@property
def role_name(self):
if self.role is None:
@@ -1842,7 +1872,7 @@ class Connection(utils.CompositeEventEmitter):
return f'UNKNOWN[{self.role}]'
@property
def is_encrypted(self) -> bool:
def is_encrypted(self):
return self.encryption != 0
@property
@@ -2147,6 +2177,8 @@ def with_connection_from_handle(function):
def with_connection_from_address(function):
@functools.wraps(function)
def wrapper(self, address: hci.Address, *args, **kwargs):
if connection := self.pending_connections.get(address, False):
return function(self, connection, *args, **kwargs)
for connection in self.connections.values():
if connection.peer_address == address:
return function(self, connection, *args, **kwargs)
@@ -2160,6 +2192,8 @@ def with_connection_from_address(function):
def try_with_connection_from_address(function):
@functools.wraps(function)
def wrapper(self, address, *args, **kwargs):
if connection := self.pending_connections.get(address, False):
return function(self, connection, address, *args, **kwargs)
for connection in self.connections.values():
if connection.peer_address == address:
return function(self, connection, address, *args, **kwargs)
@@ -2211,7 +2245,7 @@ class Device(utils.CompositeEventEmitter):
scan_response_data: bytes
cs_capabilities: ChannelSoundingCapabilities | None = None
connections: dict[int, Connection]
connection_roles: dict[hci.Address, hci.Role]
pending_connections: dict[hci.Address, Connection]
classic_pending_accepts: dict[
hci.Address,
list[asyncio.Future[Union[Connection, tuple[hci.Address, int, int]]]],
@@ -2333,9 +2367,7 @@ class Device(utils.CompositeEventEmitter):
self.le_connecting = False
self.disconnecting = False
self.connections = {} # Connections, by connection handle
self.connection_roles = (
{}
) # Local connection roles, by BD address (BR/EDR only)
self.pending_connections = {} # Connections, by BD address (BR/EDR only)
self.sco_links = {} # ScoLinks, by connection handle (BR/EDR only)
self.cis_links = {} # CisLinks, by connection handle (LE only)
self._pending_cis = {} # (CIS_ID, CIG_ID), by CIS_handle
@@ -3804,7 +3836,9 @@ class Device(utils.CompositeEventEmitter):
)
else:
# Save pending connection
self.connection_roles[peer_address] = hci.Role.CENTRAL
self.pending_connections[peer_address] = Connection.incomplete(
self, peer_address, hci.Role.CENTRAL
)
# TODO: allow passing other settings
result = await self.send_command(
@@ -3857,7 +3891,7 @@ class Device(utils.CompositeEventEmitter):
self.le_connecting = False
self.connect_own_address_type = None
else:
self.connection_roles.pop(peer_address, None)
self.pending_connections.pop(peer_address, None)
async def accept(
self,
@@ -3951,11 +3985,13 @@ class Device(utils.CompositeEventEmitter):
self.on(self.EVENT_CONNECTION, on_connection)
self.on(self.EVENT_CONNECTION_FAILURE, on_connection_failure)
# Save Peripheral hci.role.
# Save pending connection, with the Peripheral hci.role.
# Even if we requested a role switch in the hci.HCI_Accept_Connection_Request
# command, this connection is still considered Peripheral until an eventual
# role change event.
self.connection_roles[peer_address] = hci.Role.PERIPHERAL
self.pending_connections[peer_address] = Connection.incomplete(
self, peer_address, hci.Role.PERIPHERAL
)
try:
# Accept connection request
@@ -3973,7 +4009,7 @@ class Device(utils.CompositeEventEmitter):
finally:
self.remove_listener(self.EVENT_CONNECTION, on_connection)
self.remove_listener(self.EVENT_CONNECTION_FAILURE, on_connection_failure)
self.connection_roles.pop(peer_address, None)
self.pending_connections.pop(peer_address, None)
@asynccontextmanager
async def connect_as_gatt(self, peer_address: Union[hci.Address, str]):
@@ -5413,56 +5449,15 @@ class Device(utils.CompositeEventEmitter):
self.emit(self.EVENT_CONNECTION, connection)
@host_event_handler
def on_connection_complete(
self,
connection_handle: int,
peer_address: hci.Address,
connection_interval: int,
peripheral_latency: int,
supervision_timeout: int,
) -> None:
connection_role = self.connection_roles.pop(peer_address, hci.Role.PERIPHERAL)
logger.debug(
f'*** Connection: [0x{connection_handle:04X}] '
f'{peer_address} {hci.HCI_Constant.role_name(connection_role)}'
)
if connection_handle in self.connections:
logger.warning(
'new connection reuses the same handle as a previous connection'
)
# Create a new connection
connection = Connection(
device=self,
handle=connection_handle,
transport=PhysicalTransport.BR_EDR,
self_address=self.public_address,
self_resolvable_address=None,
peer_address=peer_address,
peer_resolvable_address=None,
role=connection_role,
parameters=Connection.Parameters(
connection_interval * 1.25,
peripheral_latency,
supervision_timeout * 10.0,
),
)
self.connections[connection_handle] = connection
self.emit(self.EVENT_CONNECTION, connection)
@host_event_handler
def on_le_connection_complete(
def on_connection(
self,
connection_handle: int,
transport: core.PhysicalTransport,
peer_address: hci.Address,
self_resolvable_address: Optional[hci.Address],
peer_resolvable_address: Optional[hci.Address],
role: hci.Role,
connection_interval: int,
peripheral_latency: int,
supervision_timeout: int,
connection_parameters: Optional[core.ConnectionParameters],
) -> None:
# Convert all-zeros addresses into None.
if self_resolvable_address == hci.Address.ANY_RANDOM:
@@ -5482,6 +5477,19 @@ class Device(utils.CompositeEventEmitter):
'new connection reuses the same handle as a previous connection'
)
if transport == PhysicalTransport.BR_EDR:
# Create a new connection
connection = self.pending_connections.pop(peer_address)
connection.complete(connection_handle, connection_parameters)
self.connections[connection_handle] = connection
# Emit an event to notify listeners of the new connection
self.emit(self.EVENT_CONNECTION, connection)
return
assert connection_parameters is not None
if peer_resolvable_address is None:
# Resolve the peer address if we can
if self.address_resolver:
@@ -5531,16 +5539,16 @@ class Device(utils.CompositeEventEmitter):
connection = Connection(
self,
connection_handle,
PhysicalTransport.LE,
transport,
self_address,
self_resolvable_address,
peer_address,
peer_resolvable_address,
role,
Connection.Parameters(
connection_interval * 1.25,
peripheral_latency,
supervision_timeout * 10.0,
connection_parameters.connection_interval * 1.25,
connection_parameters.peripheral_latency,
connection_parameters.supervision_timeout * 10.0,
),
)
self.connections[connection_handle] = connection
@@ -5631,7 +5639,9 @@ class Device(utils.CompositeEventEmitter):
# device configuration is set to accept any incoming connection
elif self.classic_accept_any:
# Save pending connection
self.connection_roles[bd_addr] = hci.Role.PERIPHERAL
self.pending_connections[bd_addr] = Connection.incomplete(
self, bd_addr, hci.Role.PERIPHERAL
)
self.host.send_command_sync(
hci.HCI_Accept_Connection_Request_Command(
@@ -6173,27 +6183,27 @@ class Device(utils.CompositeEventEmitter):
@host_event_handler
@with_connection_from_handle
def on_connection_parameters_update(
self,
connection: Connection,
connection_interval: int,
peripheral_latency: int,
supervision_timeout: int,
self, connection: Connection, connection_parameters: core.ConnectionParameters
):
logger.debug(
f'*** Connection Parameters Update: [0x{connection.handle:04X}] '
f'{connection.peer_address} as {connection.role_name}, '
f'{connection_parameters}'
)
if connection.parameters.connection_interval != connection_interval * 1.25:
if (
connection.parameters.connection_interval
!= connection_parameters.connection_interval * 1.25
):
connection.parameters = Connection.Parameters(
connection_interval * 1.25,
peripheral_latency,
supervision_timeout * 10.0,
connection_parameters.connection_interval * 1.25,
connection_parameters.peripheral_latency,
connection_parameters.supervision_timeout * 10.0,
)
else:
connection.parameters = Connection.Parameters(
connection_interval * 1.25,
peripheral_latency,
supervision_timeout * 10.0,
connection_parameters.connection_interval * 1.25,
connection_parameters.peripheral_latency,
connection_parameters.supervision_timeout * 10.0,
connection.parameters.subrate_factor,
connection.parameters.continuation_number,
)
@@ -6393,15 +6403,10 @@ class Device(utils.CompositeEventEmitter):
# [Classic only]
@host_event_handler
@try_with_connection_from_address
def on_role_change(
self, connection: Connection, peer_address: hci.Address, new_role: hci.Role
):
if connection:
@with_connection_from_address
def on_role_change(self, connection: Connection, new_role: hci.Role):
connection.role = new_role
connection.emit(connection.EVENT_ROLE_CHANGE, new_role)
else:
self.connection_roles[peer_address] = new_role
# [Classic only]
@host_event_handler

View File

@@ -26,7 +26,12 @@ from typing import TYPE_CHECKING, Any, Awaitable, Callable, Optional, Union, cas
from bumble import drivers, hci, utils
from bumble.colors import color
from bumble.core import ConnectionPHY, InvalidStateError, PhysicalTransport
from bumble.core import (
ConnectionParameters,
ConnectionPHY,
InvalidStateError,
PhysicalTransport,
)
from bumble.l2cap import L2CAP_PDU
from bumble.snoop import Snooper
from bumble.transport.common import TransportLostError
@@ -991,16 +996,20 @@ class Host(utils.EventEmitter):
self.connections[event.connection_handle] = connection
# Notify the client
connection_parameters = ConnectionParameters(
event.connection_interval,
event.peripheral_latency,
event.supervision_timeout,
)
self.emit(
'le_connection_complete',
'connection',
event.connection_handle,
PhysicalTransport.LE,
event.peer_address,
getattr(event, 'local_resolvable_private_address', None),
getattr(event, 'peer_resolvable_private_address', None),
hci.Role(event.role),
event.connection_interval,
event.peripheral_latency,
event.supervision_timeout,
connection_parameters,
)
else:
logger.debug(f'### CONNECTION FAILED: {event.status}')
@@ -1051,12 +1060,14 @@ class Host(utils.EventEmitter):
# Notify the client
self.emit(
'connection_complete',
'connection',
event.connection_handle,
PhysicalTransport.BR_EDR,
event.bd_addr,
0,
0,
0,
None,
None,
None,
None,
)
else:
logger.debug(f'### BR/EDR CONNECTION FAILED: {event.status}')
@@ -1119,13 +1130,14 @@ class Host(utils.EventEmitter):
# Notify the client
if event.status == hci.HCI_SUCCESS:
self.emit(
'connection_parameters_update',
connection.handle,
connection_parameters = ConnectionParameters(
event.connection_interval,
event.peripheral_latency,
event.supervision_timeout,
)
self.emit(
'connection_parameters_update', connection.handle, connection_parameters
)
else:
self.emit(
'connection_parameters_update_failure', connection.handle, event.status

View File

@@ -24,7 +24,7 @@ from unittest import mock
import pytest
from bumble import device, gatt, hci, utils
from bumble.core import PhysicalTransport
from bumble.core import ConnectionParameters, PhysicalTransport
from bumble.device import (
AdvertisingEventProperties,
AdvertisingParameters,
@@ -289,15 +289,14 @@ async def test_legacy_advertising_disconnection(auto_restart):
await device.power_on()
peer_address = Address('F0:F1:F2:F3:F4:F5')
await device.start_advertising(auto_restart=auto_restart)
device.on_le_connection_complete(
device.on_connection(
0x0001,
PhysicalTransport.LE,
peer_address,
None,
None,
Role.PERIPHERAL,
0,
0,
0,
ConnectionParameters(0, 0, 0),
)
device.on_advertising_set_termination(
@@ -348,15 +347,14 @@ async def test_extended_advertising_connection(own_address_type):
advertising_set = await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(own_address_type=own_address_type)
)
device.on_le_connection_complete(
device.on_connection(
0x0001,
PhysicalTransport.LE,
peer_address,
None,
None,
Role.PERIPHERAL,
0,
0,
0,
ConnectionParameters(0, 0, 0),
)
device.on_advertising_set_termination(
HCI_SUCCESS,
@@ -393,15 +391,14 @@ async def test_extended_advertising_connection_out_of_order(own_address_type):
0x0001,
0,
)
device.on_le_connection_complete(
device.on_connection(
0x0001,
PhysicalTransport.LE,
Address('F0:F1:F2:F3:F4:F5'),
None,
None,
Role.PERIPHERAL,
0,
0,
0,
ConnectionParameters(0, 0, 0),
)
if own_address_type == OwnAddressType.PUBLIC: