diff --git a/bumble/controller.py b/bumble/controller.py index 9366c1d9..5fb1ff7f 100644 --- a/bumble/controller.py +++ b/bumble/controller.py @@ -25,8 +25,6 @@ import random import struct from bumble.colors import color from bumble.core import ( - BT_CENTRAL_ROLE, - BT_PERIPHERAL_ROLE, BT_LE_TRANSPORT, BT_BR_EDR_TRANSPORT, ) @@ -47,6 +45,7 @@ from bumble.hci import ( HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, HCI_VERSION_BLUETOOTH_CORE_5_0, Address, + Role, HCI_AclDataPacket, HCI_AclDataPacketAssembler, HCI_Command_Complete_Event, @@ -98,7 +97,7 @@ class CisLink: class Connection: controller: Controller handle: int - role: int + role: Role peer_address: Address link: Any transport: int @@ -390,7 +389,7 @@ class Controller: connection = Connection( controller=self, handle=connection_handle, - role=BT_PERIPHERAL_ROLE, + role=Role.PERIPHERAL, peer_address=peer_address, link=self.link, transport=BT_LE_TRANSPORT, @@ -450,7 +449,7 @@ class Controller: connection = Connection( controller=self, handle=connection_handle, - role=BT_CENTRAL_ROLE, + role=Role.CENTRAL, peer_address=peer_address, link=self.link, transport=BT_LE_TRANSPORT, @@ -469,7 +468,7 @@ class Controller: HCI_LE_Connection_Complete_Event( status=status, connection_handle=connection.handle if connection else 0, - role=BT_CENTRAL_ROLE, + role=Role.CENTRAL, peer_address_type=le_create_connection_command.peer_address_type, peer_address=le_create_connection_command.peer_address, connection_interval=le_create_connection_command.connection_interval_min, @@ -693,7 +692,7 @@ class Controller: controller=self, handle=connection_handle, # Role doesn't matter in Classic because they are managed by HCI_Role_Change and HCI_Role_Discovery - role=BT_CENTRAL_ROLE, + role=Role.CENTRAL, peer_address=peer_address, link=self.link, transport=BT_BR_EDR_TRANSPORT, @@ -761,7 +760,7 @@ class Controller: controller=self, handle=connection_handle, # Role doesn't matter in SCO. - role=BT_CENTRAL_ROLE, + role=Role.CENTRAL, peer_address=peer_address, link=self.link, transport=BT_BR_EDR_TRANSPORT, diff --git a/bumble/core.py b/bumble/core.py index 306dbde7..805c6f61 100644 --- a/bumble/core.py +++ b/bumble/core.py @@ -31,11 +31,12 @@ from bumble.utils import OpenIntEnum # ----------------------------------------------------------------------------- # fmt: off -BT_CENTRAL_ROLE = 0 -BT_PERIPHERAL_ROLE = 1 +class PhysicalTransport(enum.IntEnum): + BR_EDR = 0 + LE = 1 -BT_BR_EDR_TRANSPORT = 0 -BT_LE_TRANSPORT = 1 +BT_BR_EDR_TRANSPORT = PhysicalTransport.BR_EDR +BT_LE_TRANSPORT = PhysicalTransport.LE # fmt: on diff --git a/bumble/device.py b/bumble/device.py index 11cf2cce..452bff23 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -58,9 +58,7 @@ from .host import DataPacketQueue, Host from .profiles.gap import GenericAccessService from .core import ( BT_BR_EDR_TRANSPORT, - BT_CENTRAL_ROLE, BT_LE_TRANSPORT, - BT_PERIPHERAL_ROLE, AdvertisingData, BaseBumbleError, ConnectionParameterUpdateError, @@ -1555,13 +1553,13 @@ class IsoPacketStream: class Connection(CompositeEventEmitter): device: Device handle: int - transport: int + transport: core.PhysicalTransport self_address: hci.Address self_resolvable_address: Optional[hci.Address] peer_address: hci.Address peer_resolvable_address: Optional[hci.Address] peer_le_features: Optional[hci.LeFeatureMask] - role: int + role: hci.Role encryption: int authenticated: bool sc: bool @@ -1674,9 +1672,9 @@ class Connection(CompositeEventEmitter): def role_name(self): if self.role is None: return 'NOT-SET' - if self.role == BT_CENTRAL_ROLE: + if self.role == hci.Role.CENTRAL: return 'CENTRAL' - if self.role == BT_PERIPHERAL_ROLE: + if self.role == hci.Role.PERIPHERAL: return 'PERIPHERAL' return f'UNKNOWN[{self.role}]' @@ -1734,7 +1732,7 @@ class Connection(CompositeEventEmitter): async def encrypt(self, enable: bool = True) -> None: return await self.device.encrypt(self, enable) - async def switch_role(self, role: int) -> None: + async def switch_role(self, role: hci.Role) -> None: return await self.device.switch_role(self, role) async def sustain(self, timeout: Optional[float] = None) -> None: @@ -2713,7 +2711,7 @@ class Device(CompositeEventEmitter): if phy == hci.HCI_LE_1M_PHY: return True - feature_map = { + feature_map: dict[int, hci.LeFeatureMask] = { hci.HCI_LE_2M_PHY: hci.LeFeatureMask.LE_2M_PHY, hci.HCI_LE_CODED_PHY: hci.LeFeatureMask.LE_CODED_PHY, } @@ -2734,7 +2732,7 @@ class Device(CompositeEventEmitter): self, advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, target: Optional[hci.Address] = None, - own_address_type: int = hci.OwnAddressType.RANDOM, + own_address_type: hci.OwnAddressType = hci.OwnAddressType.RANDOM, auto_restart: bool = False, advertising_data: Optional[bytes] = None, scan_response_data: Optional[bytes] = None, @@ -3015,7 +3013,7 @@ class Device(CompositeEventEmitter): active: bool = True, scan_interval: float = DEVICE_DEFAULT_SCAN_INTERVAL, # Scan interval in ms scan_window: float = DEVICE_DEFAULT_SCAN_WINDOW, # Scan window in ms - own_address_type: int = hci.OwnAddressType.RANDOM, + own_address_type: hci.OwnAddressType = hci.OwnAddressType.RANDOM, filter_duplicates: bool = False, scanning_phys: Sequence[int] = (hci.HCI_LE_1M_PHY, hci.HCI_LE_CODED_PHY), ) -> None: @@ -3381,11 +3379,11 @@ class Device(CompositeEventEmitter): async def connect( self, peer_address: Union[hci.Address, str], - transport: int = BT_LE_TRANSPORT, + transport: core.PhysicalTransport = BT_LE_TRANSPORT, connection_parameters_preferences: Optional[ - Dict[int, ConnectionParametersPreferences] + dict[hci.Phy, ConnectionParametersPreferences] ] = None, - own_address_type: int = hci.OwnAddressType.RANDOM, + own_address_type: hci.OwnAddressType = hci.OwnAddressType.RANDOM, timeout: Optional[float] = DEVICE_DEFAULT_CONNECT_TIMEOUT, always_resolve: bool = False, ) -> Connection: @@ -3433,6 +3431,7 @@ class Device(CompositeEventEmitter): # Check parameters if transport not in (BT_LE_TRANSPORT, BT_BR_EDR_TRANSPORT): raise InvalidArgumentError('invalid transport') + transport = core.PhysicalTransport(transport) # Adjust the transport automatically if we need to if transport == BT_LE_TRANSPORT and not self.le_enabled: @@ -3628,7 +3627,7 @@ class Device(CompositeEventEmitter): else: # Save pending connection self.pending_connections[peer_address] = Connection.incomplete( - self, peer_address, BT_CENTRAL_ROLE + self, peer_address, hci.Role.CENTRAL ) # TODO: allow passing other settings @@ -3683,7 +3682,7 @@ class Device(CompositeEventEmitter): async def accept( self, peer_address: Union[hci.Address, str] = hci.Address.ANY, - role: int = BT_PERIPHERAL_ROLE, + role: hci.Role = hci.Role.PERIPHERAL, timeout: Optional[float] = DEVICE_DEFAULT_CONNECT_TIMEOUT, ) -> Connection: ''' @@ -3769,12 +3768,12 @@ class Device(CompositeEventEmitter): self.on('connection', on_connection) self.on('connection_failure', on_connection_failure) - # Save pending connection, with the Peripheral role. + # Save pending connection, with the Peripheral hci.role. # Even if we requested a role switch in the hci.HCI_Accept_Connection_Request # command, this connection is still considered Peripheral until an eventual # role change event. self.pending_connections[peer_address] = Connection.incomplete( - self, peer_address, BT_PERIPHERAL_ROLE + self, peer_address, hci.Role.PERIPHERAL ) try: @@ -3903,7 +3902,7 @@ class Device(CompositeEventEmitter): ''' if use_l2cap: - if connection.role != BT_PERIPHERAL_ROLE: + if connection.role != hci.Role.PERIPHERAL: raise InvalidStateError( 'only peripheral can update connection parameters with l2cap' ) @@ -4148,10 +4147,10 @@ class Device(CompositeEventEmitter): if keys.ltk: return keys.ltk.value - if connection.role == BT_CENTRAL_ROLE and keys.ltk_central: + if connection.role == hci.Role.CENTRAL and keys.ltk_central: return keys.ltk_central.value - if connection.role == BT_PERIPHERAL_ROLE and keys.ltk_peripheral: + if connection.role == hci.Role.PERIPHERAL and keys.ltk_peripheral: return keys.ltk_peripheral.value return None @@ -4303,7 +4302,7 @@ class Device(CompositeEventEmitter): self.emit('key_store_update') # [Classic only] - async def switch_role(self, connection: Connection, role: int): + async def switch_role(self, connection: Connection, role: hci.Role): pending_role_change = asyncio.get_running_loop().create_future() def on_role_change(new_role): @@ -5178,11 +5177,11 @@ class Device(CompositeEventEmitter): def on_connection( self, connection_handle: int, - transport: int, + transport: core.PhysicalTransport, peer_address: hci.Address, self_resolvable_address: Optional[hci.Address], peer_resolvable_address: Optional[hci.Address], - role: int, + role: hci.Role, connection_parameters: ConnectionParameters, ) -> None: # Convert all-zeros addresses into None. @@ -5225,7 +5224,7 @@ class Device(CompositeEventEmitter): peer_address = resolved_address self_address = None - own_address_type: Optional[int] = None + own_address_type: Optional[hci.OwnAddressType] = None if role == hci.HCI_CENTRAL_ROLE: own_address_type = self.connect_own_address_type assert own_address_type is not None @@ -5353,7 +5352,7 @@ class Device(CompositeEventEmitter): elif self.classic_accept_any: # Save pending connection self.pending_connections[bd_addr] = Connection.incomplete( - self, bd_addr, BT_PERIPHERAL_ROLE + self, bd_addr, hci.Role.PERIPHERAL ) self.host.send_command_sync( diff --git a/bumble/hci.py b/bumble/hci.py index a18d04f8..e720cb8c 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -24,6 +24,7 @@ import logging import secrets import struct from typing import Any, Callable, Dict, Iterable, List, Optional, Type, Union, ClassVar +from typing_extensions import Self from bumble import crypto from bumble.colors import color @@ -34,6 +35,7 @@ from bumble.core import ( InvalidArgumentError, InvalidPacketError, ProtocolError, + PhysicalTransport, bit_flags_to_strings, name_or_number, padded_bytes, @@ -94,7 +96,7 @@ def map_class_of_device(class_of_device): ) -def phy_list_to_bits(phys: Optional[Iterable[int]]) -> int: +def phy_list_to_bits(phys: Optional[Iterable[Phy]]) -> int: if phys is None: return 0 @@ -700,30 +702,22 @@ HCI_ERROR_NAMES[HCI_SUCCESS] = 'HCI_SUCCESS' HCI_COMMAND_STATUS_PENDING = 0 +class Phy(enum.IntEnum): + LE_1M = 1 + LE_2M = 2 + LE_CODED = 3 + + # ACL HCI_ACL_PB_FIRST_NON_FLUSHABLE = 0 HCI_ACL_PB_CONTINUATION = 1 HCI_ACL_PB_FIRST_FLUSHABLE = 2 HCI_ACK_PB_COMPLETE_L2CAP = 3 -# Roles -HCI_CENTRAL_ROLE = 0 -HCI_PERIPHERAL_ROLE = 1 - -HCI_ROLE_NAMES = { - HCI_CENTRAL_ROLE: 'CENTRAL', - HCI_PERIPHERAL_ROLE: 'PERIPHERAL' -} - -# LE PHY Types -HCI_LE_1M_PHY = 1 -HCI_LE_2M_PHY = 2 -HCI_LE_CODED_PHY = 3 - -HCI_LE_PHY_NAMES = { - HCI_LE_1M_PHY: 'LE 1M', - HCI_LE_2M_PHY: 'LE 2M', - HCI_LE_CODED_PHY: 'LE Coded' +HCI_LE_PHY_NAMES: dict[int,str] = { + Phy.LE_1M: 'LE 1M', + Phy.LE_2M: 'LE 2M', + Phy.LE_CODED: 'LE Coded' } HCI_LE_1M_PHY_BIT = 0 @@ -732,19 +726,13 @@ HCI_LE_CODED_PHY_BIT = 2 HCI_LE_PHY_BIT_NAMES = ['LE_1M_PHY', 'LE_2M_PHY', 'LE_CODED_PHY'] -HCI_LE_PHY_TYPE_TO_BIT = { - HCI_LE_1M_PHY: HCI_LE_1M_PHY_BIT, - HCI_LE_2M_PHY: HCI_LE_2M_PHY_BIT, - HCI_LE_CODED_PHY: HCI_LE_CODED_PHY_BIT +HCI_LE_PHY_TYPE_TO_BIT: dict[Phy, int] = { + Phy.LE_1M: HCI_LE_1M_PHY_BIT, + Phy.LE_2M: HCI_LE_2M_PHY_BIT, + Phy.LE_CODED: HCI_LE_CODED_PHY_BIT, } -class Phy(enum.IntEnum): - LE_1M = HCI_LE_1M_PHY - LE_2M = HCI_LE_2M_PHY - LE_CODED = HCI_LE_CODED_PHY - - class PhyBit(enum.IntFlag): LE_1M = 1 << HCI_LE_1M_PHY_BIT LE_2M = 1 << HCI_LE_2M_PHY_BIT @@ -811,6 +799,19 @@ class CsSubeventAbortReason(OpenIntEnum): SCHEDULING_CONFLICT_OR_LIMITED_RESOURCES = 0x03 UNSPECIFIED = 0x0F +class Role(enum.IntEnum): + CENTRAL = 0 + PERIPHERAL = 1 + +# For Backward Compatibility. +HCI_CENTRAL_ROLE = Role.CENTRAL +HCI_PERIPHERAL_ROLE = Role.PERIPHERAL + + +HCI_LE_1M_PHY = Phy.LE_1M +HCI_LE_2M_PHY = Phy.LE_2M +HCI_LE_CODED_PHY = Phy.LE_CODED + # Connection Parameters HCI_CONNECTION_INTERVAL_MS_PER_UNIT = 1.25 @@ -889,10 +890,15 @@ HCI_LINK_TYPE_NAMES = { } # Address types -HCI_PUBLIC_DEVICE_ADDRESS_TYPE = 0x00 -HCI_RANDOM_DEVICE_ADDRESS_TYPE = 0x01 -HCI_PUBLIC_IDENTITY_ADDRESS_TYPE = 0x02 -HCI_RANDOM_IDENTITY_ADDRESS_TYPE = 0x03 +class AddressType(OpenIntEnum): + PUBLIC_DEVICE = 0x00 + RANDOM_DEVICE = 0x01 + PUBLIC_IDENTITY = 0x02 + RANDOM_IDENTITY = 0x03 + # (Directed Only) Address is RPA, but controller cannot resolve. + UNABLE_TO_RESOLVE = 0xFE + # (Extended Only) No address. + ANONYMOUS = 0xFF # Supported Commands Masks # See Bluetooth spec @ 6.27 SUPPORTED COMMANDS @@ -1582,8 +1588,8 @@ class HCI_Constant: return HCI_ERROR_NAMES.get(status, f'0x{status:02X}') @staticmethod - def role_name(role): - return HCI_ROLE_NAMES.get(role, str(role)) + def role_name(role: int) -> str: + return Role(role).name @staticmethod def le_phy_name(phy): @@ -1949,17 +1955,10 @@ class Address: address[0] is the LSB of the address, address[5] is the MSB. ''' - PUBLIC_DEVICE_ADDRESS = 0x00 - RANDOM_DEVICE_ADDRESS = 0x01 - PUBLIC_IDENTITY_ADDRESS = 0x02 - RANDOM_IDENTITY_ADDRESS = 0x03 - - ADDRESS_TYPE_NAMES = { - PUBLIC_DEVICE_ADDRESS: 'PUBLIC_DEVICE_ADDRESS', - RANDOM_DEVICE_ADDRESS: 'RANDOM_DEVICE_ADDRESS', - PUBLIC_IDENTITY_ADDRESS: 'PUBLIC_IDENTITY_ADDRESS', - RANDOM_IDENTITY_ADDRESS: 'RANDOM_IDENTITY_ADDRESS', - } + PUBLIC_DEVICE_ADDRESS = AddressType.PUBLIC_DEVICE + RANDOM_DEVICE_ADDRESS = AddressType.RANDOM_DEVICE + PUBLIC_IDENTITY_ADDRESS = AddressType.PUBLIC_IDENTITY + RANDOM_IDENTITY_ADDRESS = AddressType.RANDOM_IDENTITY # Type declarations NIL: Address @@ -1969,40 +1968,44 @@ class Address: # pylint: disable-next=unnecessary-lambda ADDRESS_TYPE_SPEC = {'size': 1, 'mapper': lambda x: Address.address_type_name(x)} - @staticmethod - def address_type_name(address_type): - return name_or_number(Address.ADDRESS_TYPE_NAMES, address_type) + @classmethod + def address_type_name(cls: type[Self], address_type: int) -> str: + return AddressType(address_type).name - @staticmethod - def from_string_for_transport(string, transport): + @classmethod + def from_string_for_transport( + cls: type[Self], string: str, transport: PhysicalTransport + ) -> Self: if transport == BT_BR_EDR_TRANSPORT: address_type = Address.PUBLIC_DEVICE_ADDRESS else: address_type = Address.RANDOM_DEVICE_ADDRESS - return Address(string, address_type) + return cls(string, address_type) - @staticmethod - def parse_address(data, offset): + @classmethod + def parse_address(cls: type[Self], data: bytes, offset: int) -> tuple[int, Self]: # Fix the type to a default value. This is used for parsing type-less Classic # addresses - return Address.parse_address_with_type( - data, offset, Address.PUBLIC_DEVICE_ADDRESS - ) + return cls.parse_address_with_type(data, offset, Address.PUBLIC_DEVICE_ADDRESS) - @staticmethod - def parse_random_address(data, offset): - return Address.parse_address_with_type( - data, offset, Address.RANDOM_DEVICE_ADDRESS - ) + @classmethod + def parse_random_address( + cls: type[Self], data: bytes, offset: int + ) -> tuple[int, Self]: + return cls.parse_address_with_type(data, offset, Address.RANDOM_DEVICE_ADDRESS) - @staticmethod - def parse_address_with_type(data, offset, address_type): - return offset + 6, Address(data[offset : offset + 6], address_type) + @classmethod + def parse_address_with_type( + cls: type[Self], data: bytes, offset: int, address_type: AddressType + ) -> tuple[int, Self]: + return offset + 6, cls(data[offset : offset + 6], address_type) - @staticmethod - def parse_address_preceded_by_type(data, offset): - address_type = data[offset - 1] - return Address.parse_address_with_type(data, offset, address_type) + @classmethod + def parse_address_preceded_by_type( + cls: type[Self], data: bytes, offset: int + ) -> tuple[int, Self]: + address_type = AddressType(data[offset - 1]) + return cls.parse_address_with_type(data, offset, address_type) @classmethod def generate_static_address(cls) -> Address: @@ -2042,8 +2045,10 @@ class Address: ) def __init__( - self, address: Union[bytes, str], address_type: int = RANDOM_DEVICE_ADDRESS - ): + self, + address: Union[bytes, str], + address_type: AddressType = RANDOM_DEVICE_ADDRESS, + ) -> None: ''' Initialize an instance. `address` may be a byte array in little-endian format, or a hex string in big-endian format (with optional ':' diff --git a/bumble/host.py b/bumble/host.py index c369afb3..f86c5b28 100644 --- a/bumble/host.py +++ b/bumble/host.py @@ -44,6 +44,7 @@ from bumble import hci from bumble.core import ( BT_BR_EDR_TRANSPORT, BT_LE_TRANSPORT, + PhysicalTransport, ConnectionPHY, ConnectionParameters, ) @@ -186,7 +187,11 @@ class DataPacketQueue(pyee.EventEmitter): # ----------------------------------------------------------------------------- class Connection: def __init__( - self, host: Host, handle: int, peer_address: hci.Address, transport: int + self, + host: Host, + handle: int, + peer_address: hci.Address, + transport: PhysicalTransport, ): self.host = host self.handle = handle @@ -979,7 +984,7 @@ class Host(AbortableEventEmitter): event.peer_address, getattr(event, 'local_resolvable_private_address', None), getattr(event, 'peer_resolvable_private_address', None), - event.role, + hci.Role(event.role), connection_parameters, ) else: @@ -1337,7 +1342,7 @@ class Host(AbortableEventEmitter): f'role change for {event.bd_addr}: ' f'{hci.HCI_Constant.role_name(event.new_role)}' ) - self.emit('role_change', event.bd_addr, event.new_role) + self.emit('role_change', event.bd_addr, hci.Role(event.new_role)) else: logger.debug( f'role change for {event.bd_addr} failed: ' diff --git a/bumble/l2cap.py b/bumble/l2cap.py index 4a49f793..fdd76fc5 100644 --- a/bumble/l2cap.py +++ b/bumble/l2cap.py @@ -42,7 +42,6 @@ from typing import ( from .utils import deprecated from .colors import color from .core import ( - BT_CENTRAL_ROLE, InvalidStateError, InvalidArgumentError, InvalidPacketError, @@ -52,6 +51,7 @@ from .core import ( from .hci import ( HCI_LE_Connection_Update_Command, HCI_Object, + Role, key_with_value, name_or_number, ) @@ -1908,7 +1908,7 @@ class ChannelManager: def on_l2cap_connection_parameter_update_request( self, connection: Connection, cid: int, request ): - if connection.role == BT_CENTRAL_ROLE: + if connection.role == Role.CENTRAL: self.send_control_frame( connection, cid, diff --git a/bumble/link.py b/bumble/link.py index 9c7f4664..a24247a2 100644 --- a/bumble/link.py +++ b/bumble/link.py @@ -20,7 +20,6 @@ import asyncio from functools import partial from bumble.core import ( - BT_PERIPHERAL_ROLE, BT_BR_EDR_TRANSPORT, BT_LE_TRANSPORT, InvalidStateError, @@ -28,6 +27,7 @@ from bumble.core import ( from bumble.colors import color from bumble.hci import ( Address, + Role, HCI_SUCCESS, HCI_CONNECTION_ACCEPT_TIMEOUT_ERROR, HCI_CONNECTION_TIMEOUT_ERROR, @@ -292,7 +292,7 @@ class LocalLink: return async def task(): - if responder_role != BT_PERIPHERAL_ROLE: + if responder_role != Role.PERIPHERAL: initiator_controller.on_classic_role_change( responder_controller.public_address, int(not (responder_role)) ) diff --git a/bumble/pandora/host.py b/bumble/pandora/host.py index f6e8ab9b..9c019525 100644 --- a/bumble/pandora/host.py +++ b/bumble/pandora/host.py @@ -25,7 +25,6 @@ from .config import Config from bumble.core import ( BT_BR_EDR_TRANSPORT, BT_LE_TRANSPORT, - BT_PERIPHERAL_ROLE, UUID, AdvertisingData, Appearance, @@ -47,6 +46,8 @@ from bumble.hci import ( HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, Address, Phy, + Role, + OwnAddressType, ) from google.protobuf import any_pb2 # pytype: disable=pyi-error from google.protobuf import empty_pb2 # pytype: disable=pyi-error @@ -114,11 +115,11 @@ SECONDARY_PHY_TO_BUMBLE_PHY_MAP: Dict[SecondaryPhy, Phy] = { SECONDARY_CODED: Phy.LE_CODED, } -OWN_ADDRESS_MAP: Dict[host_pb2.OwnAddressType, bumble.hci.OwnAddressType] = { - host_pb2.PUBLIC: bumble.hci.OwnAddressType.PUBLIC, - host_pb2.RANDOM: bumble.hci.OwnAddressType.RANDOM, - host_pb2.RESOLVABLE_OR_PUBLIC: bumble.hci.OwnAddressType.RESOLVABLE_OR_PUBLIC, - host_pb2.RESOLVABLE_OR_RANDOM: bumble.hci.OwnAddressType.RESOLVABLE_OR_RANDOM, +OWN_ADDRESS_MAP: Dict[host_pb2.OwnAddressType, OwnAddressType] = { + host_pb2.PUBLIC: OwnAddressType.PUBLIC, + host_pb2.RANDOM: OwnAddressType.RANDOM, + host_pb2.RESOLVABLE_OR_PUBLIC: OwnAddressType.RESOLVABLE_OR_PUBLIC, + host_pb2.RESOLVABLE_OR_RANDOM: OwnAddressType.RESOLVABLE_OR_RANDOM, } @@ -250,7 +251,7 @@ class HostService(HostServicer): connection = await self.device.connect( address, transport=BT_LE_TRANSPORT, - own_address_type=request.own_address_type, + own_address_type=OwnAddressType(request.own_address_type), ) except ConnectionError as e: if e.error_code == HCI_PAGE_TIMEOUT_ERROR: @@ -378,7 +379,7 @@ class HostService(HostServicer): def on_connection(connection: bumble.device.Connection) -> None: if ( connection.transport == BT_LE_TRANSPORT - and connection.role == BT_PERIPHERAL_ROLE + and connection.role == Role.PERIPHERAL ): connections.put_nowait(connection) @@ -496,7 +497,7 @@ class HostService(HostServicer): def on_connection(connection: bumble.device.Connection) -> None: if ( connection.transport == BT_LE_TRANSPORT - and connection.role == BT_PERIPHERAL_ROLE + and connection.role == Role.PERIPHERAL ): connections.put_nowait(connection) @@ -509,7 +510,7 @@ class HostService(HostServicer): await self.device.start_advertising( target=target, advertising_type=advertising_type, - own_address_type=request.own_address_type, + own_address_type=OwnAddressType(request.own_address_type), ) if not request.connectable: @@ -558,7 +559,7 @@ class HostService(HostServicer): await self.device.start_scanning( legacy=request.legacy, active=not request.passive, - own_address_type=request.own_address_type, + own_address_type=OwnAddressType(request.own_address_type), scan_interval=( int(request.interval) if request.interval diff --git a/bumble/pandora/security.py b/bumble/pandora/security.py index 2cbb78af..4e295391 100644 --- a/bumble/pandora/security.py +++ b/bumble/pandora/security.py @@ -24,11 +24,10 @@ from bumble import hci from bumble.core import ( BT_BR_EDR_TRANSPORT, BT_LE_TRANSPORT, - BT_PERIPHERAL_ROLE, ProtocolError, ) from bumble.device import Connection as BumbleConnection, Device -from bumble.hci import HCI_Error +from bumble.hci import HCI_Error, Role from bumble.utils import EventWatcher from bumble.pairing import PairingConfig, PairingDelegate as BasePairingDelegate from google.protobuf import any_pb2 # pytype: disable=pyi-error @@ -318,7 +317,7 @@ class SecurityService(SecurityServicer): if ( connection.transport == BT_LE_TRANSPORT - and connection.role == BT_PERIPHERAL_ROLE + and connection.role == Role.PERIPHERAL ): connection.request_pairing() else: diff --git a/bumble/pandora/utils.py b/bumble/pandora/utils.py index fba4b72a..a6171a96 100644 --- a/bumble/pandora/utils.py +++ b/bumble/pandora/utils.py @@ -20,11 +20,11 @@ import inspect import logging from bumble.device import Device -from bumble.hci import Address +from bumble.hci import Address, AddressType from google.protobuf.message import Message # pytype: disable=pyi-error from typing import Any, Dict, Generator, MutableMapping, Optional, Tuple -ADDRESS_TYPES: Dict[str, int] = { +ADDRESS_TYPES: Dict[str, AddressType] = { "public": Address.PUBLIC_DEVICE_ADDRESS, "random": Address.RANDOM_DEVICE_ADDRESS, "public_identity": Address.PUBLIC_IDENTITY_ADDRESS, diff --git a/bumble/smp.py b/bumble/smp.py index 344959a1..2c66393c 100644 --- a/bumble/smp.py +++ b/bumble/smp.py @@ -46,13 +46,13 @@ from pyee import EventEmitter from .colors import color from .hci import ( Address, + Role, HCI_LE_Enable_Encryption_Command, HCI_Object, key_with_value, ) from .core import ( BT_BR_EDR_TRANSPORT, - BT_CENTRAL_ROLE, BT_LE_TRANSPORT, AdvertisingData, InvalidArgumentError, @@ -1975,7 +1975,7 @@ class Manager(EventEmitter): # Look for a session with this connection, and create one if none exists if not (session := self.sessions.get(connection.handle)): - if connection.role == BT_CENTRAL_ROLE: + if connection.role == Role.CENTRAL: logger.warning('Remote starts pairing as Peripheral!') pairing_config = self.pairing_config_factory(connection) session = self.session_proxy( @@ -1995,7 +1995,7 @@ class Manager(EventEmitter): async def pair(self, connection: Connection) -> None: # TODO: check if there's already a session for this connection - if connection.role != BT_CENTRAL_ROLE: + if connection.role != Role.CENTRAL: logger.warning('Start pairing as Peripheral!') pairing_config = self.pairing_config_factory(connection) session = self.session_proxy( diff --git a/bumble/transport/usb.py b/bumble/transport/usb.py index 8bfd8b61..97188539 100644 --- a/bumble/transport/usb.py +++ b/bumble/transport/usb.py @@ -115,9 +115,7 @@ async def open_usb_transport(spec: str) -> Transport: self.acl_out = acl_out self.acl_out_transfer = device.getTransfer() self.acl_out_transfer_ready = asyncio.Semaphore(1) - self.packets: asyncio.Queue[bytes] = ( - asyncio.Queue() - ) # Queue of packets waiting to be sent + self.packets = asyncio.Queue[bytes]() # Queue of packets waiting to be sent self.loop = asyncio.get_running_loop() self.queue_task = None self.cancel_done = self.loop.create_future() diff --git a/tests/device_test.py b/tests/device_test.py index 44404efd..ffb407cf 100644 --- a/tests/device_test.py +++ b/tests/device_test.py @@ -24,7 +24,6 @@ import pytest from bumble.core import ( BT_BR_EDR_TRANSPORT, BT_LE_TRANSPORT, - BT_PERIPHERAL_ROLE, ConnectionParameters, ) from bumble.device import ( @@ -43,6 +42,7 @@ from bumble.hci import ( HCI_CONNECTION_FAILED_TO_BE_ESTABLISHED_ERROR, Address, OwnAddressType, + Role, HCI_Command_Complete_Event, HCI_Command_Status_Event, HCI_Connection_Complete_Event, @@ -295,7 +295,7 @@ async def test_legacy_advertising_disconnection(auto_restart): peer_address, None, None, - BT_PERIPHERAL_ROLE, + Role.PERIPHERAL, ConnectionParameters(0, 0, 0), ) @@ -353,7 +353,7 @@ async def test_extended_advertising_connection(own_address_type): peer_address, None, None, - BT_PERIPHERAL_ROLE, + Role.PERIPHERAL, ConnectionParameters(0, 0, 0), ) device.on_advertising_set_termination( @@ -397,7 +397,7 @@ async def test_extended_advertising_connection_out_of_order(own_address_type): Address('F0:F1:F2:F3:F4:F5'), None, None, - BT_PERIPHERAL_ROLE, + Role.PERIPHERAL, ConnectionParameters(0, 0, 0), ) diff --git a/tests/self_test.py b/tests/self_test.py index 6654a546..1853433b 100644 --- a/tests/self_test.py +++ b/tests/self_test.py @@ -24,7 +24,7 @@ import pytest from unittest.mock import AsyncMock, MagicMock, patch from bumble.controller import Controller -from bumble.core import BT_BR_EDR_TRANSPORT, BT_PERIPHERAL_ROLE, BT_CENTRAL_ROLE +from bumble.core import BT_BR_EDR_TRANSPORT, BT_LE_TRANSPORT from bumble.link import LocalLink from bumble.device import Device, Peer from bumble.host import Host @@ -39,6 +39,7 @@ from bumble.smp import ( ) from bumble.core import ProtocolError from bumble.keys import PairingKeys +from bumble.hci import Role # ----------------------------------------------------------------------------- @@ -111,7 +112,7 @@ async def test_self_connection(): @pytest.mark.asyncio @pytest.mark.parametrize( 'responder_role,', - (BT_CENTRAL_ROLE, BT_PERIPHERAL_ROLE), + (Role.CENTRAL, Role.PERIPHERAL), ) async def test_self_classic_connection(responder_role): # Create two devices, each with a controller, attached to the same link