Merge pull request #332 from zxzxwu/role

Enumify: PhysicalTransport, Role, AddressType
This commit is contained in:
zxzxwu
2025-03-10 00:04:18 +08:00
committed by GitHub
14 changed files with 151 additions and 143 deletions

View File

@@ -25,8 +25,6 @@ import random
import struct
from bumble.colors import color
from bumble.core import (
BT_CENTRAL_ROLE,
BT_PERIPHERAL_ROLE,
BT_LE_TRANSPORT,
BT_BR_EDR_TRANSPORT,
)
@@ -47,6 +45,7 @@ from bumble.hci import (
HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
HCI_VERSION_BLUETOOTH_CORE_5_0,
Address,
Role,
HCI_AclDataPacket,
HCI_AclDataPacketAssembler,
HCI_Command_Complete_Event,
@@ -98,7 +97,7 @@ class CisLink:
class Connection:
controller: Controller
handle: int
role: int
role: Role
peer_address: Address
link: Any
transport: int
@@ -390,7 +389,7 @@ class Controller:
connection = Connection(
controller=self,
handle=connection_handle,
role=BT_PERIPHERAL_ROLE,
role=Role.PERIPHERAL,
peer_address=peer_address,
link=self.link,
transport=BT_LE_TRANSPORT,
@@ -450,7 +449,7 @@ class Controller:
connection = Connection(
controller=self,
handle=connection_handle,
role=BT_CENTRAL_ROLE,
role=Role.CENTRAL,
peer_address=peer_address,
link=self.link,
transport=BT_LE_TRANSPORT,
@@ -469,7 +468,7 @@ class Controller:
HCI_LE_Connection_Complete_Event(
status=status,
connection_handle=connection.handle if connection else 0,
role=BT_CENTRAL_ROLE,
role=Role.CENTRAL,
peer_address_type=le_create_connection_command.peer_address_type,
peer_address=le_create_connection_command.peer_address,
connection_interval=le_create_connection_command.connection_interval_min,
@@ -693,7 +692,7 @@ class Controller:
controller=self,
handle=connection_handle,
# Role doesn't matter in Classic because they are managed by HCI_Role_Change and HCI_Role_Discovery
role=BT_CENTRAL_ROLE,
role=Role.CENTRAL,
peer_address=peer_address,
link=self.link,
transport=BT_BR_EDR_TRANSPORT,
@@ -761,7 +760,7 @@ class Controller:
controller=self,
handle=connection_handle,
# Role doesn't matter in SCO.
role=BT_CENTRAL_ROLE,
role=Role.CENTRAL,
peer_address=peer_address,
link=self.link,
transport=BT_BR_EDR_TRANSPORT,

View File

@@ -31,11 +31,12 @@ from bumble.utils import OpenIntEnum
# -----------------------------------------------------------------------------
# fmt: off
BT_CENTRAL_ROLE = 0
BT_PERIPHERAL_ROLE = 1
class PhysicalTransport(enum.IntEnum):
BR_EDR = 0
LE = 1
BT_BR_EDR_TRANSPORT = 0
BT_LE_TRANSPORT = 1
BT_BR_EDR_TRANSPORT = PhysicalTransport.BR_EDR
BT_LE_TRANSPORT = PhysicalTransport.LE
# fmt: on

View File

@@ -58,9 +58,7 @@ from .host import DataPacketQueue, Host
from .profiles.gap import GenericAccessService
from .core import (
BT_BR_EDR_TRANSPORT,
BT_CENTRAL_ROLE,
BT_LE_TRANSPORT,
BT_PERIPHERAL_ROLE,
AdvertisingData,
BaseBumbleError,
ConnectionParameterUpdateError,
@@ -1555,13 +1553,13 @@ class IsoPacketStream:
class Connection(CompositeEventEmitter):
device: Device
handle: int
transport: int
transport: core.PhysicalTransport
self_address: hci.Address
self_resolvable_address: Optional[hci.Address]
peer_address: hci.Address
peer_resolvable_address: Optional[hci.Address]
peer_le_features: Optional[hci.LeFeatureMask]
role: int
role: hci.Role
encryption: int
authenticated: bool
sc: bool
@@ -1674,9 +1672,9 @@ class Connection(CompositeEventEmitter):
def role_name(self):
if self.role is None:
return 'NOT-SET'
if self.role == BT_CENTRAL_ROLE:
if self.role == hci.Role.CENTRAL:
return 'CENTRAL'
if self.role == BT_PERIPHERAL_ROLE:
if self.role == hci.Role.PERIPHERAL:
return 'PERIPHERAL'
return f'UNKNOWN[{self.role}]'
@@ -1734,7 +1732,7 @@ class Connection(CompositeEventEmitter):
async def encrypt(self, enable: bool = True) -> None:
return await self.device.encrypt(self, enable)
async def switch_role(self, role: int) -> None:
async def switch_role(self, role: hci.Role) -> None:
return await self.device.switch_role(self, role)
async def sustain(self, timeout: Optional[float] = None) -> None:
@@ -2713,7 +2711,7 @@ class Device(CompositeEventEmitter):
if phy == hci.HCI_LE_1M_PHY:
return True
feature_map = {
feature_map: dict[int, hci.LeFeatureMask] = {
hci.HCI_LE_2M_PHY: hci.LeFeatureMask.LE_2M_PHY,
hci.HCI_LE_CODED_PHY: hci.LeFeatureMask.LE_CODED_PHY,
}
@@ -2734,7 +2732,7 @@ class Device(CompositeEventEmitter):
self,
advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
target: Optional[hci.Address] = None,
own_address_type: int = hci.OwnAddressType.RANDOM,
own_address_type: hci.OwnAddressType = hci.OwnAddressType.RANDOM,
auto_restart: bool = False,
advertising_data: Optional[bytes] = None,
scan_response_data: Optional[bytes] = None,
@@ -3015,7 +3013,7 @@ class Device(CompositeEventEmitter):
active: bool = True,
scan_interval: float = DEVICE_DEFAULT_SCAN_INTERVAL, # Scan interval in ms
scan_window: float = DEVICE_DEFAULT_SCAN_WINDOW, # Scan window in ms
own_address_type: int = hci.OwnAddressType.RANDOM,
own_address_type: hci.OwnAddressType = hci.OwnAddressType.RANDOM,
filter_duplicates: bool = False,
scanning_phys: Sequence[int] = (hci.HCI_LE_1M_PHY, hci.HCI_LE_CODED_PHY),
) -> None:
@@ -3381,11 +3379,11 @@ class Device(CompositeEventEmitter):
async def connect(
self,
peer_address: Union[hci.Address, str],
transport: int = BT_LE_TRANSPORT,
transport: core.PhysicalTransport = BT_LE_TRANSPORT,
connection_parameters_preferences: Optional[
Dict[int, ConnectionParametersPreferences]
dict[hci.Phy, ConnectionParametersPreferences]
] = None,
own_address_type: int = hci.OwnAddressType.RANDOM,
own_address_type: hci.OwnAddressType = hci.OwnAddressType.RANDOM,
timeout: Optional[float] = DEVICE_DEFAULT_CONNECT_TIMEOUT,
always_resolve: bool = False,
) -> Connection:
@@ -3433,6 +3431,7 @@ class Device(CompositeEventEmitter):
# Check parameters
if transport not in (BT_LE_TRANSPORT, BT_BR_EDR_TRANSPORT):
raise InvalidArgumentError('invalid transport')
transport = core.PhysicalTransport(transport)
# Adjust the transport automatically if we need to
if transport == BT_LE_TRANSPORT and not self.le_enabled:
@@ -3628,7 +3627,7 @@ class Device(CompositeEventEmitter):
else:
# Save pending connection
self.pending_connections[peer_address] = Connection.incomplete(
self, peer_address, BT_CENTRAL_ROLE
self, peer_address, hci.Role.CENTRAL
)
# TODO: allow passing other settings
@@ -3683,7 +3682,7 @@ class Device(CompositeEventEmitter):
async def accept(
self,
peer_address: Union[hci.Address, str] = hci.Address.ANY,
role: int = BT_PERIPHERAL_ROLE,
role: hci.Role = hci.Role.PERIPHERAL,
timeout: Optional[float] = DEVICE_DEFAULT_CONNECT_TIMEOUT,
) -> Connection:
'''
@@ -3769,12 +3768,12 @@ class Device(CompositeEventEmitter):
self.on('connection', on_connection)
self.on('connection_failure', on_connection_failure)
# Save pending connection, with the Peripheral role.
# Save pending connection, with the Peripheral hci.role.
# Even if we requested a role switch in the hci.HCI_Accept_Connection_Request
# command, this connection is still considered Peripheral until an eventual
# role change event.
self.pending_connections[peer_address] = Connection.incomplete(
self, peer_address, BT_PERIPHERAL_ROLE
self, peer_address, hci.Role.PERIPHERAL
)
try:
@@ -3903,7 +3902,7 @@ class Device(CompositeEventEmitter):
'''
if use_l2cap:
if connection.role != BT_PERIPHERAL_ROLE:
if connection.role != hci.Role.PERIPHERAL:
raise InvalidStateError(
'only peripheral can update connection parameters with l2cap'
)
@@ -4148,10 +4147,10 @@ class Device(CompositeEventEmitter):
if keys.ltk:
return keys.ltk.value
if connection.role == BT_CENTRAL_ROLE and keys.ltk_central:
if connection.role == hci.Role.CENTRAL and keys.ltk_central:
return keys.ltk_central.value
if connection.role == BT_PERIPHERAL_ROLE and keys.ltk_peripheral:
if connection.role == hci.Role.PERIPHERAL and keys.ltk_peripheral:
return keys.ltk_peripheral.value
return None
@@ -4303,7 +4302,7 @@ class Device(CompositeEventEmitter):
self.emit('key_store_update')
# [Classic only]
async def switch_role(self, connection: Connection, role: int):
async def switch_role(self, connection: Connection, role: hci.Role):
pending_role_change = asyncio.get_running_loop().create_future()
def on_role_change(new_role):
@@ -5178,11 +5177,11 @@ class Device(CompositeEventEmitter):
def on_connection(
self,
connection_handle: int,
transport: int,
transport: core.PhysicalTransport,
peer_address: hci.Address,
self_resolvable_address: Optional[hci.Address],
peer_resolvable_address: Optional[hci.Address],
role: int,
role: hci.Role,
connection_parameters: ConnectionParameters,
) -> None:
# Convert all-zeros addresses into None.
@@ -5225,7 +5224,7 @@ class Device(CompositeEventEmitter):
peer_address = resolved_address
self_address = None
own_address_type: Optional[int] = None
own_address_type: Optional[hci.OwnAddressType] = None
if role == hci.HCI_CENTRAL_ROLE:
own_address_type = self.connect_own_address_type
assert own_address_type is not None
@@ -5353,7 +5352,7 @@ class Device(CompositeEventEmitter):
elif self.classic_accept_any:
# Save pending connection
self.pending_connections[bd_addr] = Connection.incomplete(
self, bd_addr, BT_PERIPHERAL_ROLE
self, bd_addr, hci.Role.PERIPHERAL
)
self.host.send_command_sync(

View File

@@ -24,6 +24,7 @@ import logging
import secrets
import struct
from typing import Any, Callable, Dict, Iterable, List, Optional, Type, Union, ClassVar
from typing_extensions import Self
from bumble import crypto
from bumble.colors import color
@@ -34,6 +35,7 @@ from bumble.core import (
InvalidArgumentError,
InvalidPacketError,
ProtocolError,
PhysicalTransport,
bit_flags_to_strings,
name_or_number,
padded_bytes,
@@ -94,7 +96,7 @@ def map_class_of_device(class_of_device):
)
def phy_list_to_bits(phys: Optional[Iterable[int]]) -> int:
def phy_list_to_bits(phys: Optional[Iterable[Phy]]) -> int:
if phys is None:
return 0
@@ -700,30 +702,22 @@ HCI_ERROR_NAMES[HCI_SUCCESS] = 'HCI_SUCCESS'
HCI_COMMAND_STATUS_PENDING = 0
class Phy(enum.IntEnum):
LE_1M = 1
LE_2M = 2
LE_CODED = 3
# ACL
HCI_ACL_PB_FIRST_NON_FLUSHABLE = 0
HCI_ACL_PB_CONTINUATION = 1
HCI_ACL_PB_FIRST_FLUSHABLE = 2
HCI_ACK_PB_COMPLETE_L2CAP = 3
# Roles
HCI_CENTRAL_ROLE = 0
HCI_PERIPHERAL_ROLE = 1
HCI_ROLE_NAMES = {
HCI_CENTRAL_ROLE: 'CENTRAL',
HCI_PERIPHERAL_ROLE: 'PERIPHERAL'
}
# LE PHY Types
HCI_LE_1M_PHY = 1
HCI_LE_2M_PHY = 2
HCI_LE_CODED_PHY = 3
HCI_LE_PHY_NAMES = {
HCI_LE_1M_PHY: 'LE 1M',
HCI_LE_2M_PHY: 'LE 2M',
HCI_LE_CODED_PHY: 'LE Coded'
HCI_LE_PHY_NAMES: dict[int,str] = {
Phy.LE_1M: 'LE 1M',
Phy.LE_2M: 'LE 2M',
Phy.LE_CODED: 'LE Coded'
}
HCI_LE_1M_PHY_BIT = 0
@@ -732,19 +726,13 @@ HCI_LE_CODED_PHY_BIT = 2
HCI_LE_PHY_BIT_NAMES = ['LE_1M_PHY', 'LE_2M_PHY', 'LE_CODED_PHY']
HCI_LE_PHY_TYPE_TO_BIT = {
HCI_LE_1M_PHY: HCI_LE_1M_PHY_BIT,
HCI_LE_2M_PHY: HCI_LE_2M_PHY_BIT,
HCI_LE_CODED_PHY: HCI_LE_CODED_PHY_BIT
HCI_LE_PHY_TYPE_TO_BIT: dict[Phy, int] = {
Phy.LE_1M: HCI_LE_1M_PHY_BIT,
Phy.LE_2M: HCI_LE_2M_PHY_BIT,
Phy.LE_CODED: HCI_LE_CODED_PHY_BIT,
}
class Phy(enum.IntEnum):
LE_1M = HCI_LE_1M_PHY
LE_2M = HCI_LE_2M_PHY
LE_CODED = HCI_LE_CODED_PHY
class PhyBit(enum.IntFlag):
LE_1M = 1 << HCI_LE_1M_PHY_BIT
LE_2M = 1 << HCI_LE_2M_PHY_BIT
@@ -811,6 +799,19 @@ class CsSubeventAbortReason(OpenIntEnum):
SCHEDULING_CONFLICT_OR_LIMITED_RESOURCES = 0x03
UNSPECIFIED = 0x0F
class Role(enum.IntEnum):
CENTRAL = 0
PERIPHERAL = 1
# For Backward Compatibility.
HCI_CENTRAL_ROLE = Role.CENTRAL
HCI_PERIPHERAL_ROLE = Role.PERIPHERAL
HCI_LE_1M_PHY = Phy.LE_1M
HCI_LE_2M_PHY = Phy.LE_2M
HCI_LE_CODED_PHY = Phy.LE_CODED
# Connection Parameters
HCI_CONNECTION_INTERVAL_MS_PER_UNIT = 1.25
@@ -889,10 +890,15 @@ HCI_LINK_TYPE_NAMES = {
}
# Address types
HCI_PUBLIC_DEVICE_ADDRESS_TYPE = 0x00
HCI_RANDOM_DEVICE_ADDRESS_TYPE = 0x01
HCI_PUBLIC_IDENTITY_ADDRESS_TYPE = 0x02
HCI_RANDOM_IDENTITY_ADDRESS_TYPE = 0x03
class AddressType(OpenIntEnum):
PUBLIC_DEVICE = 0x00
RANDOM_DEVICE = 0x01
PUBLIC_IDENTITY = 0x02
RANDOM_IDENTITY = 0x03
# (Directed Only) Address is RPA, but controller cannot resolve.
UNABLE_TO_RESOLVE = 0xFE
# (Extended Only) No address.
ANONYMOUS = 0xFF
# Supported Commands Masks
# See Bluetooth spec @ 6.27 SUPPORTED COMMANDS
@@ -1582,8 +1588,8 @@ class HCI_Constant:
return HCI_ERROR_NAMES.get(status, f'0x{status:02X}')
@staticmethod
def role_name(role):
return HCI_ROLE_NAMES.get(role, str(role))
def role_name(role: int) -> str:
return Role(role).name
@staticmethod
def le_phy_name(phy):
@@ -1949,17 +1955,10 @@ class Address:
address[0] is the LSB of the address, address[5] is the MSB.
'''
PUBLIC_DEVICE_ADDRESS = 0x00
RANDOM_DEVICE_ADDRESS = 0x01
PUBLIC_IDENTITY_ADDRESS = 0x02
RANDOM_IDENTITY_ADDRESS = 0x03
ADDRESS_TYPE_NAMES = {
PUBLIC_DEVICE_ADDRESS: 'PUBLIC_DEVICE_ADDRESS',
RANDOM_DEVICE_ADDRESS: 'RANDOM_DEVICE_ADDRESS',
PUBLIC_IDENTITY_ADDRESS: 'PUBLIC_IDENTITY_ADDRESS',
RANDOM_IDENTITY_ADDRESS: 'RANDOM_IDENTITY_ADDRESS',
}
PUBLIC_DEVICE_ADDRESS = AddressType.PUBLIC_DEVICE
RANDOM_DEVICE_ADDRESS = AddressType.RANDOM_DEVICE
PUBLIC_IDENTITY_ADDRESS = AddressType.PUBLIC_IDENTITY
RANDOM_IDENTITY_ADDRESS = AddressType.RANDOM_IDENTITY
# Type declarations
NIL: Address
@@ -1969,40 +1968,44 @@ class Address:
# pylint: disable-next=unnecessary-lambda
ADDRESS_TYPE_SPEC = {'size': 1, 'mapper': lambda x: Address.address_type_name(x)}
@staticmethod
def address_type_name(address_type):
return name_or_number(Address.ADDRESS_TYPE_NAMES, address_type)
@classmethod
def address_type_name(cls: type[Self], address_type: int) -> str:
return AddressType(address_type).name
@staticmethod
def from_string_for_transport(string, transport):
@classmethod
def from_string_for_transport(
cls: type[Self], string: str, transport: PhysicalTransport
) -> Self:
if transport == BT_BR_EDR_TRANSPORT:
address_type = Address.PUBLIC_DEVICE_ADDRESS
else:
address_type = Address.RANDOM_DEVICE_ADDRESS
return Address(string, address_type)
return cls(string, address_type)
@staticmethod
def parse_address(data, offset):
@classmethod
def parse_address(cls: type[Self], data: bytes, offset: int) -> tuple[int, Self]:
# Fix the type to a default value. This is used for parsing type-less Classic
# addresses
return Address.parse_address_with_type(
data, offset, Address.PUBLIC_DEVICE_ADDRESS
)
return cls.parse_address_with_type(data, offset, Address.PUBLIC_DEVICE_ADDRESS)
@staticmethod
def parse_random_address(data, offset):
return Address.parse_address_with_type(
data, offset, Address.RANDOM_DEVICE_ADDRESS
)
@classmethod
def parse_random_address(
cls: type[Self], data: bytes, offset: int
) -> tuple[int, Self]:
return cls.parse_address_with_type(data, offset, Address.RANDOM_DEVICE_ADDRESS)
@staticmethod
def parse_address_with_type(data, offset, address_type):
return offset + 6, Address(data[offset : offset + 6], address_type)
@classmethod
def parse_address_with_type(
cls: type[Self], data: bytes, offset: int, address_type: AddressType
) -> tuple[int, Self]:
return offset + 6, cls(data[offset : offset + 6], address_type)
@staticmethod
def parse_address_preceded_by_type(data, offset):
address_type = data[offset - 1]
return Address.parse_address_with_type(data, offset, address_type)
@classmethod
def parse_address_preceded_by_type(
cls: type[Self], data: bytes, offset: int
) -> tuple[int, Self]:
address_type = AddressType(data[offset - 1])
return cls.parse_address_with_type(data, offset, address_type)
@classmethod
def generate_static_address(cls) -> Address:
@@ -2042,8 +2045,10 @@ class Address:
)
def __init__(
self, address: Union[bytes, str], address_type: int = RANDOM_DEVICE_ADDRESS
):
self,
address: Union[bytes, str],
address_type: AddressType = RANDOM_DEVICE_ADDRESS,
) -> None:
'''
Initialize an instance. `address` may be a byte array in little-endian
format, or a hex string in big-endian format (with optional ':'

View File

@@ -44,6 +44,7 @@ from bumble import hci
from bumble.core import (
BT_BR_EDR_TRANSPORT,
BT_LE_TRANSPORT,
PhysicalTransport,
ConnectionPHY,
ConnectionParameters,
)
@@ -186,7 +187,11 @@ class DataPacketQueue(pyee.EventEmitter):
# -----------------------------------------------------------------------------
class Connection:
def __init__(
self, host: Host, handle: int, peer_address: hci.Address, transport: int
self,
host: Host,
handle: int,
peer_address: hci.Address,
transport: PhysicalTransport,
):
self.host = host
self.handle = handle
@@ -979,7 +984,7 @@ class Host(AbortableEventEmitter):
event.peer_address,
getattr(event, 'local_resolvable_private_address', None),
getattr(event, 'peer_resolvable_private_address', None),
event.role,
hci.Role(event.role),
connection_parameters,
)
else:
@@ -1337,7 +1342,7 @@ class Host(AbortableEventEmitter):
f'role change for {event.bd_addr}: '
f'{hci.HCI_Constant.role_name(event.new_role)}'
)
self.emit('role_change', event.bd_addr, event.new_role)
self.emit('role_change', event.bd_addr, hci.Role(event.new_role))
else:
logger.debug(
f'role change for {event.bd_addr} failed: '

View File

@@ -42,7 +42,6 @@ from typing import (
from .utils import deprecated
from .colors import color
from .core import (
BT_CENTRAL_ROLE,
InvalidStateError,
InvalidArgumentError,
InvalidPacketError,
@@ -52,6 +51,7 @@ from .core import (
from .hci import (
HCI_LE_Connection_Update_Command,
HCI_Object,
Role,
key_with_value,
name_or_number,
)
@@ -1908,7 +1908,7 @@ class ChannelManager:
def on_l2cap_connection_parameter_update_request(
self, connection: Connection, cid: int, request
):
if connection.role == BT_CENTRAL_ROLE:
if connection.role == Role.CENTRAL:
self.send_control_frame(
connection,
cid,

View File

@@ -20,7 +20,6 @@ import asyncio
from functools import partial
from bumble.core import (
BT_PERIPHERAL_ROLE,
BT_BR_EDR_TRANSPORT,
BT_LE_TRANSPORT,
InvalidStateError,
@@ -28,6 +27,7 @@ from bumble.core import (
from bumble.colors import color
from bumble.hci import (
Address,
Role,
HCI_SUCCESS,
HCI_CONNECTION_ACCEPT_TIMEOUT_ERROR,
HCI_CONNECTION_TIMEOUT_ERROR,
@@ -292,7 +292,7 @@ class LocalLink:
return
async def task():
if responder_role != BT_PERIPHERAL_ROLE:
if responder_role != Role.PERIPHERAL:
initiator_controller.on_classic_role_change(
responder_controller.public_address, int(not (responder_role))
)

View File

@@ -25,7 +25,6 @@ from .config import Config
from bumble.core import (
BT_BR_EDR_TRANSPORT,
BT_LE_TRANSPORT,
BT_PERIPHERAL_ROLE,
UUID,
AdvertisingData,
Appearance,
@@ -47,6 +46,8 @@ from bumble.hci import (
HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
Address,
Phy,
Role,
OwnAddressType,
)
from google.protobuf import any_pb2 # pytype: disable=pyi-error
from google.protobuf import empty_pb2 # pytype: disable=pyi-error
@@ -114,11 +115,11 @@ SECONDARY_PHY_TO_BUMBLE_PHY_MAP: Dict[SecondaryPhy, Phy] = {
SECONDARY_CODED: Phy.LE_CODED,
}
OWN_ADDRESS_MAP: Dict[host_pb2.OwnAddressType, bumble.hci.OwnAddressType] = {
host_pb2.PUBLIC: bumble.hci.OwnAddressType.PUBLIC,
host_pb2.RANDOM: bumble.hci.OwnAddressType.RANDOM,
host_pb2.RESOLVABLE_OR_PUBLIC: bumble.hci.OwnAddressType.RESOLVABLE_OR_PUBLIC,
host_pb2.RESOLVABLE_OR_RANDOM: bumble.hci.OwnAddressType.RESOLVABLE_OR_RANDOM,
OWN_ADDRESS_MAP: Dict[host_pb2.OwnAddressType, OwnAddressType] = {
host_pb2.PUBLIC: OwnAddressType.PUBLIC,
host_pb2.RANDOM: OwnAddressType.RANDOM,
host_pb2.RESOLVABLE_OR_PUBLIC: OwnAddressType.RESOLVABLE_OR_PUBLIC,
host_pb2.RESOLVABLE_OR_RANDOM: OwnAddressType.RESOLVABLE_OR_RANDOM,
}
@@ -250,7 +251,7 @@ class HostService(HostServicer):
connection = await self.device.connect(
address,
transport=BT_LE_TRANSPORT,
own_address_type=request.own_address_type,
own_address_type=OwnAddressType(request.own_address_type),
)
except ConnectionError as e:
if e.error_code == HCI_PAGE_TIMEOUT_ERROR:
@@ -378,7 +379,7 @@ class HostService(HostServicer):
def on_connection(connection: bumble.device.Connection) -> None:
if (
connection.transport == BT_LE_TRANSPORT
and connection.role == BT_PERIPHERAL_ROLE
and connection.role == Role.PERIPHERAL
):
connections.put_nowait(connection)
@@ -496,7 +497,7 @@ class HostService(HostServicer):
def on_connection(connection: bumble.device.Connection) -> None:
if (
connection.transport == BT_LE_TRANSPORT
and connection.role == BT_PERIPHERAL_ROLE
and connection.role == Role.PERIPHERAL
):
connections.put_nowait(connection)
@@ -509,7 +510,7 @@ class HostService(HostServicer):
await self.device.start_advertising(
target=target,
advertising_type=advertising_type,
own_address_type=request.own_address_type,
own_address_type=OwnAddressType(request.own_address_type),
)
if not request.connectable:
@@ -558,7 +559,7 @@ class HostService(HostServicer):
await self.device.start_scanning(
legacy=request.legacy,
active=not request.passive,
own_address_type=request.own_address_type,
own_address_type=OwnAddressType(request.own_address_type),
scan_interval=(
int(request.interval)
if request.interval

View File

@@ -24,11 +24,10 @@ from bumble import hci
from bumble.core import (
BT_BR_EDR_TRANSPORT,
BT_LE_TRANSPORT,
BT_PERIPHERAL_ROLE,
ProtocolError,
)
from bumble.device import Connection as BumbleConnection, Device
from bumble.hci import HCI_Error
from bumble.hci import HCI_Error, Role
from bumble.utils import EventWatcher
from bumble.pairing import PairingConfig, PairingDelegate as BasePairingDelegate
from google.protobuf import any_pb2 # pytype: disable=pyi-error
@@ -318,7 +317,7 @@ class SecurityService(SecurityServicer):
if (
connection.transport == BT_LE_TRANSPORT
and connection.role == BT_PERIPHERAL_ROLE
and connection.role == Role.PERIPHERAL
):
connection.request_pairing()
else:

View File

@@ -20,11 +20,11 @@ import inspect
import logging
from bumble.device import Device
from bumble.hci import Address
from bumble.hci import Address, AddressType
from google.protobuf.message import Message # pytype: disable=pyi-error
from typing import Any, Dict, Generator, MutableMapping, Optional, Tuple
ADDRESS_TYPES: Dict[str, int] = {
ADDRESS_TYPES: Dict[str, AddressType] = {
"public": Address.PUBLIC_DEVICE_ADDRESS,
"random": Address.RANDOM_DEVICE_ADDRESS,
"public_identity": Address.PUBLIC_IDENTITY_ADDRESS,

View File

@@ -46,13 +46,13 @@ from pyee import EventEmitter
from .colors import color
from .hci import (
Address,
Role,
HCI_LE_Enable_Encryption_Command,
HCI_Object,
key_with_value,
)
from .core import (
BT_BR_EDR_TRANSPORT,
BT_CENTRAL_ROLE,
BT_LE_TRANSPORT,
AdvertisingData,
InvalidArgumentError,
@@ -1975,7 +1975,7 @@ class Manager(EventEmitter):
# Look for a session with this connection, and create one if none exists
if not (session := self.sessions.get(connection.handle)):
if connection.role == BT_CENTRAL_ROLE:
if connection.role == Role.CENTRAL:
logger.warning('Remote starts pairing as Peripheral!')
pairing_config = self.pairing_config_factory(connection)
session = self.session_proxy(
@@ -1995,7 +1995,7 @@ class Manager(EventEmitter):
async def pair(self, connection: Connection) -> None:
# TODO: check if there's already a session for this connection
if connection.role != BT_CENTRAL_ROLE:
if connection.role != Role.CENTRAL:
logger.warning('Start pairing as Peripheral!')
pairing_config = self.pairing_config_factory(connection)
session = self.session_proxy(

View File

@@ -115,9 +115,7 @@ async def open_usb_transport(spec: str) -> Transport:
self.acl_out = acl_out
self.acl_out_transfer = device.getTransfer()
self.acl_out_transfer_ready = asyncio.Semaphore(1)
self.packets: asyncio.Queue[bytes] = (
asyncio.Queue()
) # Queue of packets waiting to be sent
self.packets = asyncio.Queue[bytes]() # Queue of packets waiting to be sent
self.loop = asyncio.get_running_loop()
self.queue_task = None
self.cancel_done = self.loop.create_future()

View File

@@ -24,7 +24,6 @@ import pytest
from bumble.core import (
BT_BR_EDR_TRANSPORT,
BT_LE_TRANSPORT,
BT_PERIPHERAL_ROLE,
ConnectionParameters,
)
from bumble.device import (
@@ -43,6 +42,7 @@ from bumble.hci import (
HCI_CONNECTION_FAILED_TO_BE_ESTABLISHED_ERROR,
Address,
OwnAddressType,
Role,
HCI_Command_Complete_Event,
HCI_Command_Status_Event,
HCI_Connection_Complete_Event,
@@ -295,7 +295,7 @@ async def test_legacy_advertising_disconnection(auto_restart):
peer_address,
None,
None,
BT_PERIPHERAL_ROLE,
Role.PERIPHERAL,
ConnectionParameters(0, 0, 0),
)
@@ -353,7 +353,7 @@ async def test_extended_advertising_connection(own_address_type):
peer_address,
None,
None,
BT_PERIPHERAL_ROLE,
Role.PERIPHERAL,
ConnectionParameters(0, 0, 0),
)
device.on_advertising_set_termination(
@@ -397,7 +397,7 @@ async def test_extended_advertising_connection_out_of_order(own_address_type):
Address('F0:F1:F2:F3:F4:F5'),
None,
None,
BT_PERIPHERAL_ROLE,
Role.PERIPHERAL,
ConnectionParameters(0, 0, 0),
)

View File

@@ -24,7 +24,7 @@ import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from bumble.controller import Controller
from bumble.core import BT_BR_EDR_TRANSPORT, BT_PERIPHERAL_ROLE, BT_CENTRAL_ROLE
from bumble.core import BT_BR_EDR_TRANSPORT, BT_LE_TRANSPORT
from bumble.link import LocalLink
from bumble.device import Device, Peer
from bumble.host import Host
@@ -39,6 +39,7 @@ from bumble.smp import (
)
from bumble.core import ProtocolError
from bumble.keys import PairingKeys
from bumble.hci import Role
# -----------------------------------------------------------------------------
@@ -111,7 +112,7 @@ async def test_self_connection():
@pytest.mark.asyncio
@pytest.mark.parametrize(
'responder_role,',
(BT_CENTRAL_ROLE, BT_PERIPHERAL_ROLE),
(Role.CENTRAL, Role.PERIPHERAL),
)
async def test_self_classic_connection(responder_role):
# Create two devices, each with a controller, attached to the same link