forked from auracaster/bumble_mirror
Enum: PhysicalTransport, Role, AddressType
This commit is contained in:
147
bumble/hci.py
147
bumble/hci.py
@@ -24,6 +24,7 @@ import logging
|
||||
import secrets
|
||||
import struct
|
||||
from typing import Any, Callable, Dict, Iterable, List, Optional, Type, Union, ClassVar
|
||||
from typing_extensions import Self
|
||||
|
||||
from bumble import crypto
|
||||
from bumble.colors import color
|
||||
@@ -34,6 +35,7 @@ from bumble.core import (
|
||||
InvalidArgumentError,
|
||||
InvalidPacketError,
|
||||
ProtocolError,
|
||||
PhysicalTransport,
|
||||
bit_flags_to_strings,
|
||||
name_or_number,
|
||||
padded_bytes,
|
||||
@@ -94,7 +96,7 @@ def map_class_of_device(class_of_device):
|
||||
)
|
||||
|
||||
|
||||
def phy_list_to_bits(phys: Optional[Iterable[int]]) -> int:
|
||||
def phy_list_to_bits(phys: Optional[Iterable[Phy]]) -> int:
|
||||
if phys is None:
|
||||
return 0
|
||||
|
||||
@@ -700,30 +702,22 @@ HCI_ERROR_NAMES[HCI_SUCCESS] = 'HCI_SUCCESS'
|
||||
HCI_COMMAND_STATUS_PENDING = 0
|
||||
|
||||
|
||||
class Phy(enum.IntEnum):
|
||||
LE_1M = 1
|
||||
LE_2M = 2
|
||||
LE_CODED = 3
|
||||
|
||||
|
||||
# ACL
|
||||
HCI_ACL_PB_FIRST_NON_FLUSHABLE = 0
|
||||
HCI_ACL_PB_CONTINUATION = 1
|
||||
HCI_ACL_PB_FIRST_FLUSHABLE = 2
|
||||
HCI_ACK_PB_COMPLETE_L2CAP = 3
|
||||
|
||||
# Roles
|
||||
HCI_CENTRAL_ROLE = 0
|
||||
HCI_PERIPHERAL_ROLE = 1
|
||||
|
||||
HCI_ROLE_NAMES = {
|
||||
HCI_CENTRAL_ROLE: 'CENTRAL',
|
||||
HCI_PERIPHERAL_ROLE: 'PERIPHERAL'
|
||||
}
|
||||
|
||||
# LE PHY Types
|
||||
HCI_LE_1M_PHY = 1
|
||||
HCI_LE_2M_PHY = 2
|
||||
HCI_LE_CODED_PHY = 3
|
||||
|
||||
HCI_LE_PHY_NAMES = {
|
||||
HCI_LE_1M_PHY: 'LE 1M',
|
||||
HCI_LE_2M_PHY: 'LE 2M',
|
||||
HCI_LE_CODED_PHY: 'LE Coded'
|
||||
HCI_LE_PHY_NAMES: dict[int,str] = {
|
||||
Phy.LE_1M: 'LE 1M',
|
||||
Phy.LE_2M: 'LE 2M',
|
||||
Phy.LE_CODED: 'LE Coded'
|
||||
}
|
||||
|
||||
HCI_LE_1M_PHY_BIT = 0
|
||||
@@ -732,19 +726,13 @@ HCI_LE_CODED_PHY_BIT = 2
|
||||
|
||||
HCI_LE_PHY_BIT_NAMES = ['LE_1M_PHY', 'LE_2M_PHY', 'LE_CODED_PHY']
|
||||
|
||||
HCI_LE_PHY_TYPE_TO_BIT = {
|
||||
HCI_LE_1M_PHY: HCI_LE_1M_PHY_BIT,
|
||||
HCI_LE_2M_PHY: HCI_LE_2M_PHY_BIT,
|
||||
HCI_LE_CODED_PHY: HCI_LE_CODED_PHY_BIT
|
||||
HCI_LE_PHY_TYPE_TO_BIT: dict[Phy, int] = {
|
||||
Phy.LE_1M: HCI_LE_1M_PHY_BIT,
|
||||
Phy.LE_2M: HCI_LE_2M_PHY_BIT,
|
||||
Phy.LE_CODED: HCI_LE_CODED_PHY_BIT,
|
||||
}
|
||||
|
||||
|
||||
class Phy(enum.IntEnum):
|
||||
LE_1M = HCI_LE_1M_PHY
|
||||
LE_2M = HCI_LE_2M_PHY
|
||||
LE_CODED = HCI_LE_CODED_PHY
|
||||
|
||||
|
||||
class PhyBit(enum.IntFlag):
|
||||
LE_1M = 1 << HCI_LE_1M_PHY_BIT
|
||||
LE_2M = 1 << HCI_LE_2M_PHY_BIT
|
||||
@@ -811,6 +799,19 @@ class CsSubeventAbortReason(OpenIntEnum):
|
||||
SCHEDULING_CONFLICT_OR_LIMITED_RESOURCES = 0x03
|
||||
UNSPECIFIED = 0x0F
|
||||
|
||||
class Role(enum.IntEnum):
|
||||
CENTRAL = 0
|
||||
PERIPHERAL = 1
|
||||
|
||||
# For Backward Compatibility.
|
||||
HCI_CENTRAL_ROLE = Role.CENTRAL
|
||||
HCI_PERIPHERAL_ROLE = Role.PERIPHERAL
|
||||
|
||||
|
||||
HCI_LE_1M_PHY = Phy.LE_1M
|
||||
HCI_LE_2M_PHY = Phy.LE_2M
|
||||
HCI_LE_CODED_PHY = Phy.LE_CODED
|
||||
|
||||
|
||||
# Connection Parameters
|
||||
HCI_CONNECTION_INTERVAL_MS_PER_UNIT = 1.25
|
||||
@@ -889,10 +890,15 @@ HCI_LINK_TYPE_NAMES = {
|
||||
}
|
||||
|
||||
# Address types
|
||||
HCI_PUBLIC_DEVICE_ADDRESS_TYPE = 0x00
|
||||
HCI_RANDOM_DEVICE_ADDRESS_TYPE = 0x01
|
||||
HCI_PUBLIC_IDENTITY_ADDRESS_TYPE = 0x02
|
||||
HCI_RANDOM_IDENTITY_ADDRESS_TYPE = 0x03
|
||||
class AddressType(OpenIntEnum):
|
||||
PUBLIC_DEVICE = 0x00
|
||||
RANDOM_DEVICE = 0x01
|
||||
PUBLIC_IDENTITY = 0x02
|
||||
RANDOM_IDENTITY = 0x03
|
||||
# (Directed Only) Address is RPA, but controller cannot resolve.
|
||||
UNABLE_TO_RESOLVE = 0xFE
|
||||
# (Extended Only) No address.
|
||||
ANONYMOUS = 0xFF
|
||||
|
||||
# Supported Commands Masks
|
||||
# See Bluetooth spec @ 6.27 SUPPORTED COMMANDS
|
||||
@@ -1582,8 +1588,8 @@ class HCI_Constant:
|
||||
return HCI_ERROR_NAMES.get(status, f'0x{status:02X}')
|
||||
|
||||
@staticmethod
|
||||
def role_name(role):
|
||||
return HCI_ROLE_NAMES.get(role, str(role))
|
||||
def role_name(role: int) -> str:
|
||||
return Role(role).name
|
||||
|
||||
@staticmethod
|
||||
def le_phy_name(phy):
|
||||
@@ -1949,17 +1955,10 @@ class Address:
|
||||
address[0] is the LSB of the address, address[5] is the MSB.
|
||||
'''
|
||||
|
||||
PUBLIC_DEVICE_ADDRESS = 0x00
|
||||
RANDOM_DEVICE_ADDRESS = 0x01
|
||||
PUBLIC_IDENTITY_ADDRESS = 0x02
|
||||
RANDOM_IDENTITY_ADDRESS = 0x03
|
||||
|
||||
ADDRESS_TYPE_NAMES = {
|
||||
PUBLIC_DEVICE_ADDRESS: 'PUBLIC_DEVICE_ADDRESS',
|
||||
RANDOM_DEVICE_ADDRESS: 'RANDOM_DEVICE_ADDRESS',
|
||||
PUBLIC_IDENTITY_ADDRESS: 'PUBLIC_IDENTITY_ADDRESS',
|
||||
RANDOM_IDENTITY_ADDRESS: 'RANDOM_IDENTITY_ADDRESS',
|
||||
}
|
||||
PUBLIC_DEVICE_ADDRESS = AddressType.PUBLIC_DEVICE
|
||||
RANDOM_DEVICE_ADDRESS = AddressType.RANDOM_DEVICE
|
||||
PUBLIC_IDENTITY_ADDRESS = AddressType.PUBLIC_IDENTITY
|
||||
RANDOM_IDENTITY_ADDRESS = AddressType.RANDOM_IDENTITY
|
||||
|
||||
# Type declarations
|
||||
NIL: Address
|
||||
@@ -1969,40 +1968,44 @@ class Address:
|
||||
# pylint: disable-next=unnecessary-lambda
|
||||
ADDRESS_TYPE_SPEC = {'size': 1, 'mapper': lambda x: Address.address_type_name(x)}
|
||||
|
||||
@staticmethod
|
||||
def address_type_name(address_type):
|
||||
return name_or_number(Address.ADDRESS_TYPE_NAMES, address_type)
|
||||
@classmethod
|
||||
def address_type_name(cls: type[Self], address_type: int) -> str:
|
||||
return AddressType(address_type).name
|
||||
|
||||
@staticmethod
|
||||
def from_string_for_transport(string, transport):
|
||||
@classmethod
|
||||
def from_string_for_transport(
|
||||
cls: type[Self], string: str, transport: PhysicalTransport
|
||||
) -> Self:
|
||||
if transport == BT_BR_EDR_TRANSPORT:
|
||||
address_type = Address.PUBLIC_DEVICE_ADDRESS
|
||||
else:
|
||||
address_type = Address.RANDOM_DEVICE_ADDRESS
|
||||
return Address(string, address_type)
|
||||
return cls(string, address_type)
|
||||
|
||||
@staticmethod
|
||||
def parse_address(data, offset):
|
||||
@classmethod
|
||||
def parse_address(cls: type[Self], data: bytes, offset: int) -> tuple[int, Self]:
|
||||
# Fix the type to a default value. This is used for parsing type-less Classic
|
||||
# addresses
|
||||
return Address.parse_address_with_type(
|
||||
data, offset, Address.PUBLIC_DEVICE_ADDRESS
|
||||
)
|
||||
return cls.parse_address_with_type(data, offset, Address.PUBLIC_DEVICE_ADDRESS)
|
||||
|
||||
@staticmethod
|
||||
def parse_random_address(data, offset):
|
||||
return Address.parse_address_with_type(
|
||||
data, offset, Address.RANDOM_DEVICE_ADDRESS
|
||||
)
|
||||
@classmethod
|
||||
def parse_random_address(
|
||||
cls: type[Self], data: bytes, offset: int
|
||||
) -> tuple[int, Self]:
|
||||
return cls.parse_address_with_type(data, offset, Address.RANDOM_DEVICE_ADDRESS)
|
||||
|
||||
@staticmethod
|
||||
def parse_address_with_type(data, offset, address_type):
|
||||
return offset + 6, Address(data[offset : offset + 6], address_type)
|
||||
@classmethod
|
||||
def parse_address_with_type(
|
||||
cls: type[Self], data: bytes, offset: int, address_type: AddressType
|
||||
) -> tuple[int, Self]:
|
||||
return offset + 6, cls(data[offset : offset + 6], address_type)
|
||||
|
||||
@staticmethod
|
||||
def parse_address_preceded_by_type(data, offset):
|
||||
address_type = data[offset - 1]
|
||||
return Address.parse_address_with_type(data, offset, address_type)
|
||||
@classmethod
|
||||
def parse_address_preceded_by_type(
|
||||
cls: type[Self], data: bytes, offset: int
|
||||
) -> tuple[int, Self]:
|
||||
address_type = AddressType(data[offset - 1])
|
||||
return cls.parse_address_with_type(data, offset, address_type)
|
||||
|
||||
@classmethod
|
||||
def generate_static_address(cls) -> Address:
|
||||
@@ -2042,8 +2045,10 @@ class Address:
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self, address: Union[bytes, str], address_type: int = RANDOM_DEVICE_ADDRESS
|
||||
):
|
||||
self,
|
||||
address: Union[bytes, str],
|
||||
address_type: AddressType = RANDOM_DEVICE_ADDRESS,
|
||||
) -> None:
|
||||
'''
|
||||
Initialize an instance. `address` may be a byte array in little-endian
|
||||
format, or a hex string in big-endian format (with optional ':'
|
||||
|
||||
Reference in New Issue
Block a user