Replace legacy transport and role constants

This commit is contained in:
Josh Wu
2025-04-09 17:59:01 +08:00
parent a8019a70da
commit 7569da37e4
33 changed files with 169 additions and 157 deletions

View File

@@ -57,8 +57,7 @@ from .gatt import Attribute, Characteristic, Descriptor, Service
from .host import DataPacketQueue, Host
from .profiles.gap import GenericAccessService
from .core import (
BT_BR_EDR_TRANSPORT,
BT_LE_TRANSPORT,
PhysicalTransport,
AdvertisingData,
BaseBumbleError,
ConnectionParameterUpdateError,
@@ -1660,7 +1659,7 @@ class Connection(CompositeEventEmitter):
return cls(
device,
None,
BT_BR_EDR_TRANSPORT,
PhysicalTransport.BR_EDR,
device.public_address,
None,
peer_address,
@@ -1675,7 +1674,7 @@ class Connection(CompositeEventEmitter):
Finish an incomplete connection upon completion.
"""
assert self.handle is None
assert self.transport == BT_BR_EDR_TRANSPORT
assert self.transport == PhysicalTransport.BR_EDR
self.handle = handle
self.parameters = parameters
@@ -1830,7 +1829,7 @@ class Connection(CompositeEventEmitter):
raise
def __str__(self):
if self.transport == BT_LE_TRANSPORT:
if self.transport == PhysicalTransport.LE:
return (
f'Connection(transport=LE, handle=0x{self.handle:04X}, '
f'role={self.role_name}, '
@@ -3397,7 +3396,7 @@ class Device(CompositeEventEmitter):
async def connect(
self,
peer_address: Union[hci.Address, str],
transport: core.PhysicalTransport = BT_LE_TRANSPORT,
transport: core.PhysicalTransport = PhysicalTransport.LE,
connection_parameters_preferences: Optional[
dict[hci.Phy, ConnectionParametersPreferences]
] = None,
@@ -3447,23 +3446,23 @@ class Device(CompositeEventEmitter):
'''
# Check parameters
if transport not in (BT_LE_TRANSPORT, BT_BR_EDR_TRANSPORT):
if transport not in (PhysicalTransport.LE, PhysicalTransport.BR_EDR):
raise InvalidArgumentError('invalid transport')
transport = core.PhysicalTransport(transport)
# Adjust the transport automatically if we need to
if transport == BT_LE_TRANSPORT and not self.le_enabled:
transport = BT_BR_EDR_TRANSPORT
elif transport == BT_BR_EDR_TRANSPORT and not self.classic_enabled:
transport = BT_LE_TRANSPORT
if transport == PhysicalTransport.LE and not self.le_enabled:
transport = PhysicalTransport.BR_EDR
elif transport == PhysicalTransport.BR_EDR and not self.classic_enabled:
transport = PhysicalTransport.LE
# Check that there isn't already a pending connection
if transport == BT_LE_TRANSPORT and self.is_le_connecting:
if transport == PhysicalTransport.LE and self.is_le_connecting:
raise InvalidStateError('connection already pending')
if isinstance(peer_address, str):
try:
if transport == BT_LE_TRANSPORT and peer_address.endswith('@'):
if transport == PhysicalTransport.LE and peer_address.endswith('@'):
peer_address = hci.Address.from_string_for_transport(
peer_address[:-1], transport
)
@@ -3483,21 +3482,21 @@ class Device(CompositeEventEmitter):
else:
# All BR/EDR addresses should be public addresses
if (
transport == BT_BR_EDR_TRANSPORT
transport == PhysicalTransport.BR_EDR
and peer_address.address_type != hci.Address.PUBLIC_DEVICE_ADDRESS
):
raise InvalidArgumentError('BR/EDR addresses must be PUBLIC')
assert isinstance(peer_address, hci.Address)
if transport == BT_LE_TRANSPORT and always_resolve:
if transport == PhysicalTransport.LE and always_resolve:
logger.debug('resolving address')
peer_address = await self.find_peer_by_identity_address(
peer_address
) # TODO: timeout
def on_connection(connection):
if transport == BT_LE_TRANSPORT or (
if transport == PhysicalTransport.LE or (
# match BR/EDR connection event against peer address
connection.transport == transport
and connection.peer_address == peer_address
@@ -3505,7 +3504,7 @@ class Device(CompositeEventEmitter):
pending_connection.set_result(connection)
def on_connection_failure(error):
if transport == BT_LE_TRANSPORT or (
if transport == PhysicalTransport.LE or (
# match BR/EDR connection failure event against peer address
error.transport == transport
and error.peer_address == peer_address
@@ -3519,7 +3518,7 @@ class Device(CompositeEventEmitter):
try:
# Tell the controller to connect
if transport == BT_LE_TRANSPORT:
if transport == PhysicalTransport.LE:
if connection_parameters_preferences is None:
if connection_parameters_preferences is None:
connection_parameters_preferences = {
@@ -3664,7 +3663,7 @@ class Device(CompositeEventEmitter):
raise hci.HCI_StatusError(result)
# Wait for the connection process to complete
if transport == BT_LE_TRANSPORT:
if transport == PhysicalTransport.LE:
self.le_connecting = True
if timeout is None:
@@ -3675,7 +3674,7 @@ class Device(CompositeEventEmitter):
asyncio.shield(pending_connection), timeout
)
except asyncio.TimeoutError:
if transport == BT_LE_TRANSPORT:
if transport == PhysicalTransport.LE:
await self.send_command(
hci.HCI_LE_Create_Connection_Cancel_Command()
)
@@ -3691,7 +3690,7 @@ class Device(CompositeEventEmitter):
finally:
self.remove_listener('connection', on_connection)
self.remove_listener('connection_failure', on_connection_failure)
if transport == BT_LE_TRANSPORT:
if transport == PhysicalTransport.LE:
self.le_connecting = False
self.connect_own_address_type = None
else:
@@ -3721,7 +3720,7 @@ class Device(CompositeEventEmitter):
# If the address is not parsable, assume it is a name instead
logger.debug('looking for peer by name')
peer_address = await self.find_peer_by_name(
peer_address, BT_BR_EDR_TRANSPORT
peer_address, PhysicalTransport.BR_EDR
) # TODO: timeout
assert isinstance(peer_address, hci.Address)
@@ -3771,14 +3770,14 @@ class Device(CompositeEventEmitter):
def on_connection(connection):
if (
connection.transport == BT_BR_EDR_TRANSPORT
connection.transport == PhysicalTransport.BR_EDR
and connection.peer_address == peer_address
):
pending_connection.set_result(connection)
def on_connection_failure(error):
if (
error.transport == BT_BR_EDR_TRANSPORT
error.transport == PhysicalTransport.BR_EDR
and error.peer_address == peer_address
):
pending_connection.set_exception(error)
@@ -3848,7 +3847,7 @@ class Device(CompositeEventEmitter):
# If the address is not parsable, assume it is a name instead
logger.debug('looking for peer by name')
peer_address = await self.find_peer_by_name(
peer_address, BT_BR_EDR_TRANSPORT
peer_address, PhysicalTransport.BR_EDR
) # TODO: timeout
await self.send_command(
@@ -4031,7 +4030,7 @@ class Device(CompositeEventEmitter):
check_result=True,
)
async def find_peer_by_name(self, name, transport=BT_LE_TRANSPORT):
async def find_peer_by_name(self, name, transport=PhysicalTransport.LE):
"""
Scan for a peer with a given name and return its address.
"""
@@ -4050,7 +4049,7 @@ class Device(CompositeEventEmitter):
was_scanning = self.scanning
was_discovering = self.discovering
try:
if transport == BT_LE_TRANSPORT:
if transport == PhysicalTransport.LE:
event_name = 'advertisement'
listener = self.on(
event_name,
@@ -4062,7 +4061,7 @@ class Device(CompositeEventEmitter):
if not self.scanning:
await self.start_scanning(filter_duplicates=True)
elif transport == BT_BR_EDR_TRANSPORT:
elif transport == PhysicalTransport.BR_EDR:
event_name = 'inquiry_result'
listener = self.on(
event_name,
@@ -4081,9 +4080,9 @@ class Device(CompositeEventEmitter):
if listener is not None:
self.remove_listener(event_name, listener)
if transport == BT_LE_TRANSPORT and not was_scanning:
if transport == PhysicalTransport.LE and not was_scanning:
await self.stop_scanning()
elif transport == BT_BR_EDR_TRANSPORT and not was_discovering:
elif transport == PhysicalTransport.BR_EDR and not was_discovering:
await self.stop_discovery()
async def find_peer_by_identity_address(
@@ -4238,7 +4237,7 @@ class Device(CompositeEventEmitter):
)
async def encrypt(self, connection, enable=True):
if not enable and connection.transport == BT_LE_TRANSPORT:
if not enable and connection.transport == PhysicalTransport.LE:
raise InvalidArgumentError('`enable` parameter is classic only.')
# Set up event handlers
@@ -4255,7 +4254,7 @@ class Device(CompositeEventEmitter):
# Request the encryption
try:
if connection.transport == BT_LE_TRANSPORT:
if connection.transport == PhysicalTransport.LE:
# Look for a key in the key store
if self.keystore is None:
raise InvalidOperationError('no key store')
@@ -4276,7 +4275,7 @@ class Device(CompositeEventEmitter):
else:
raise InvalidOperationError('no LTK found for peer')
if connection.role != hci.HCI_CENTRAL_ROLE:
if connection.role != hci.Role.CENTRAL:
raise InvalidStateError('only centrals can start encryption')
result = await self.send_command(
@@ -4932,7 +4931,7 @@ class Device(CompositeEventEmitter):
self.abort_on('flush', self.update_keys(str(bd_addr), pairing_keys))
if connection := self.find_connection_by_bd_addr(
bd_addr, transport=BT_BR_EDR_TRANSPORT
bd_addr, transport=PhysicalTransport.BR_EDR
):
connection.link_key_type = key_type
@@ -5232,7 +5231,7 @@ class Device(CompositeEventEmitter):
'new connection reuses the same handle as a previous connection'
)
if transport == BT_BR_EDR_TRANSPORT:
if transport == PhysicalTransport.BR_EDR:
# Create a new connection
connection = self.pending_connections.pop(peer_address)
connection.complete(connection_handle, connection_parameters)
@@ -5255,7 +5254,7 @@ class Device(CompositeEventEmitter):
self_address = None
own_address_type: Optional[hci.OwnAddressType] = None
if role == hci.HCI_CENTRAL_ROLE:
if role == hci.Role.CENTRAL:
own_address_type = self.connect_own_address_type
assert own_address_type is not None
else:
@@ -5302,7 +5301,7 @@ class Device(CompositeEventEmitter):
)
self.connections[connection_handle] = connection
if role == hci.HCI_PERIPHERAL_ROLE and self.legacy_advertiser:
if role == hci.Role.PERIPHERAL and self.legacy_advertiser:
if self.legacy_advertiser.auto_restart:
advertiser = self.legacy_advertiser
connection.once(
@@ -5312,12 +5311,12 @@ class Device(CompositeEventEmitter):
else:
self.legacy_advertiser = None
if role == hci.HCI_CENTRAL_ROLE or not self.supports_le_extended_advertising:
if role == hci.Role.CENTRAL or not self.supports_le_extended_advertising:
# We can emit now, we have all the info we need
self.emit('connection', connection)
return
if role == hci.HCI_PERIPHERAL_ROLE and self.supports_le_extended_advertising:
if role == hci.Role.PERIPHERAL and self.supports_le_extended_advertising:
if advertising_set := self.connecting_extended_advertising_sets.pop(
connection_handle, None
):
@@ -5334,7 +5333,7 @@ class Device(CompositeEventEmitter):
# For directed advertising, this means a timeout
if (
transport == BT_LE_TRANSPORT
transport == PhysicalTransport.LE
and self.legacy_advertiser
and self.legacy_advertiser.advertising_type.is_directed
):
@@ -5361,7 +5360,7 @@ class Device(CompositeEventEmitter):
hci.HCI_Connection_Complete_Event.ESCO_LINK_TYPE,
):
if connection := self.find_connection_by_bd_addr(
bd_addr, transport=BT_BR_EDR_TRANSPORT
bd_addr, transport=PhysicalTransport.BR_EDR
):
self.emit('sco_request', connection, link_type)
else:
@@ -5812,14 +5811,14 @@ class Device(CompositeEventEmitter):
connection.encryption = encryption
if (
not connection.authenticated
and connection.transport == BT_BR_EDR_TRANSPORT
and connection.transport == PhysicalTransport.BR_EDR
and encryption == hci.HCI_Encryption_Change_Event.AES_CCM
):
connection.authenticated = True
connection.sc = True
if (
not connection.authenticated
and connection.transport == BT_LE_TRANSPORT
and connection.transport == PhysicalTransport.LE
and encryption == hci.HCI_Encryption_Change_Event.E0_OR_AES_CCM
):
connection.authenticated = True