forked from auracaster/bumble_mirror
Merge pull request #755 from zxzxwu/smp
Migrate SMP commands to dataclasses
This commit is contained in:
248
bumble/smp.py
248
bumble/smp.py
@@ -26,13 +26,15 @@ from __future__ import annotations
|
|||||||
import logging
|
import logging
|
||||||
import asyncio
|
import asyncio
|
||||||
import enum
|
import enum
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass, field
|
||||||
from typing import (
|
from typing import (
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
Any,
|
Any,
|
||||||
Awaitable,
|
Awaitable,
|
||||||
Callable,
|
Callable,
|
||||||
Optional,
|
Optional,
|
||||||
|
TypeVar,
|
||||||
|
ClassVar,
|
||||||
cast,
|
cast,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -43,7 +45,9 @@ from bumble.hci import (
|
|||||||
Role,
|
Role,
|
||||||
HCI_LE_Enable_Encryption_Command,
|
HCI_LE_Enable_Encryption_Command,
|
||||||
HCI_Object,
|
HCI_Object,
|
||||||
|
Fields,
|
||||||
key_with_value,
|
key_with_value,
|
||||||
|
metadata,
|
||||||
)
|
)
|
||||||
from bumble.core import (
|
from bumble.core import (
|
||||||
PhysicalTransport,
|
PhysicalTransport,
|
||||||
@@ -200,31 +204,32 @@ def error_name(error_code: int) -> str:
|
|||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
# Classes
|
# Classes
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
|
@dataclass
|
||||||
class SMP_Command:
|
class SMP_Command:
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec @ Vol 3, Part H - 3 SECURITY MANAGER PROTOCOL
|
See Bluetooth spec @ Vol 3, Part H - 3 SECURITY MANAGER PROTOCOL
|
||||||
'''
|
'''
|
||||||
|
|
||||||
smp_classes: dict[int, type[SMP_Command]] = {}
|
smp_classes: ClassVar[dict[int, type[SMP_Command]]] = {}
|
||||||
fields: Any
|
fields: ClassVar[Fields]
|
||||||
code = 0
|
code: int = field(default=0, init=False)
|
||||||
name = ''
|
name: str = field(default='', init=False)
|
||||||
|
_payload: Optional[bytes] = field(default=None, init=False)
|
||||||
|
|
||||||
@staticmethod
|
@classmethod
|
||||||
def from_bytes(pdu: bytes) -> "SMP_Command":
|
def from_bytes(cls, pdu: bytes) -> "SMP_Command":
|
||||||
code = pdu[0]
|
code = pdu[0]
|
||||||
|
|
||||||
cls = SMP_Command.smp_classes.get(code)
|
subclass = SMP_Command.smp_classes.get(code)
|
||||||
if cls is None:
|
if subclass is None:
|
||||||
instance = SMP_Command(pdu)
|
instance = SMP_Command()
|
||||||
instance.name = SMP_Command.command_name(code)
|
instance.name = SMP_Command.command_name(code)
|
||||||
instance.code = code
|
instance.code = code
|
||||||
|
instance.payload = pdu
|
||||||
return instance
|
return instance
|
||||||
self = cls.__new__(cls)
|
instance = subclass(**HCI_Object.dict_from_bytes(pdu, 1, subclass.fields))
|
||||||
SMP_Command.__init__(self, pdu)
|
instance.payload = pdu[1:]
|
||||||
if hasattr(self, 'fields'):
|
return instance
|
||||||
self.init_from_bytes(pdu, 1)
|
|
||||||
return self
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def command_name(code: int) -> str:
|
def command_name(code: int) -> str:
|
||||||
@@ -264,36 +269,35 @@ class SMP_Command:
|
|||||||
def keypress_notification_type_name(notification_type: int) -> str:
|
def keypress_notification_type_name(notification_type: int) -> str:
|
||||||
return name_or_number(SMP_KEYPRESS_NOTIFICATION_TYPE_NAMES, notification_type)
|
return name_or_number(SMP_KEYPRESS_NOTIFICATION_TYPE_NAMES, notification_type)
|
||||||
|
|
||||||
@staticmethod
|
_Command = TypeVar("_Command", bound="SMP_Command")
|
||||||
def subclass(fields):
|
|
||||||
def inner(cls):
|
|
||||||
cls.name = cls.__name__.upper()
|
|
||||||
cls.code = key_with_value(SMP_COMMAND_NAMES, cls.name)
|
|
||||||
if cls.code is None:
|
|
||||||
raise KeyError(
|
|
||||||
f'Command name {cls.name} not found in SMP_COMMAND_NAMES'
|
|
||||||
)
|
|
||||||
cls.fields = fields
|
|
||||||
|
|
||||||
# Register a factory for this class
|
@classmethod
|
||||||
SMP_Command.smp_classes[cls.code] = cls
|
def subclass(cls, subclass: type[_Command]) -> type[_Command]:
|
||||||
|
subclass.name = subclass.__name__.upper()
|
||||||
|
subclass.code = key_with_value(SMP_COMMAND_NAMES, subclass.name)
|
||||||
|
if subclass.code is None:
|
||||||
|
raise KeyError(
|
||||||
|
f'Command name {subclass.name} not found in SMP_COMMAND_NAMES'
|
||||||
|
)
|
||||||
|
subclass.fields = HCI_Object.fields_from_dataclass(subclass)
|
||||||
|
|
||||||
return cls
|
# Register a factory for this class
|
||||||
|
SMP_Command.smp_classes[subclass.code] = subclass
|
||||||
|
|
||||||
return inner
|
return subclass
|
||||||
|
|
||||||
def __init__(self, pdu: Optional[bytes] = None, **kwargs: Any) -> None:
|
@property
|
||||||
if hasattr(self, 'fields') and kwargs:
|
def payload(self) -> bytes:
|
||||||
HCI_Object.init_from_fields(self, self.fields, kwargs)
|
if self._payload is None:
|
||||||
if pdu is None:
|
self._payload = HCI_Object.dict_to_bytes(self.__dict__, self.fields)
|
||||||
pdu = bytes([self.code]) + HCI_Object.dict_to_bytes(kwargs, self.fields)
|
return self._payload
|
||||||
self.pdu = pdu
|
|
||||||
|
|
||||||
def init_from_bytes(self, pdu: bytes, offset: int) -> None:
|
@payload.setter
|
||||||
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
|
def payload(self, value: bytes) -> None:
|
||||||
|
self._payload = value
|
||||||
|
|
||||||
def __bytes__(self):
|
def __bytes__(self):
|
||||||
return self.pdu
|
return bytes([self.code]) + self.payload
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
result = color(self.name, 'yellow')
|
result = color(self.name, 'yellow')
|
||||||
@@ -306,206 +310,192 @@ class SMP_Command:
|
|||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@SMP_Command.subclass(
|
@SMP_Command.subclass
|
||||||
[
|
@dataclass
|
||||||
('io_capability', {'size': 1, 'mapper': SMP_Command.io_capability_name}),
|
|
||||||
('oob_data_flag', 1),
|
|
||||||
('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}),
|
|
||||||
('maximum_encryption_key_size', 1),
|
|
||||||
(
|
|
||||||
'initiator_key_distribution',
|
|
||||||
{'size': 1, 'mapper': SMP_Command.key_distribution_str},
|
|
||||||
),
|
|
||||||
(
|
|
||||||
'responder_key_distribution',
|
|
||||||
{'size': 1, 'mapper': SMP_Command.key_distribution_str},
|
|
||||||
),
|
|
||||||
]
|
|
||||||
)
|
|
||||||
class SMP_Pairing_Request_Command(SMP_Command):
|
class SMP_Pairing_Request_Command(SMP_Command):
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec @ Vol 3, Part H - 3.5.1 Pairing Request
|
See Bluetooth spec @ Vol 3, Part H - 3.5.1 Pairing Request
|
||||||
'''
|
'''
|
||||||
|
|
||||||
io_capability: int
|
io_capability: int = field(
|
||||||
oob_data_flag: int
|
metadata=metadata({'size': 1, 'mapper': SMP_Command.io_capability_name})
|
||||||
auth_req: int
|
)
|
||||||
maximum_encryption_key_size: int
|
oob_data_flag: int = field(metadata=metadata(1))
|
||||||
initiator_key_distribution: int
|
auth_req: int = field(
|
||||||
responder_key_distribution: int
|
metadata=metadata({'size': 1, 'mapper': SMP_Command.auth_req_str})
|
||||||
|
)
|
||||||
|
maximum_encryption_key_size: int = field(metadata=metadata(1))
|
||||||
|
initiator_key_distribution: int = field(
|
||||||
|
metadata=metadata({'size': 1, 'mapper': SMP_Command.key_distribution_str})
|
||||||
|
)
|
||||||
|
responder_key_distribution: int = field(
|
||||||
|
metadata=metadata({'size': 1, 'mapper': SMP_Command.key_distribution_str})
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@SMP_Command.subclass(
|
@SMP_Command.subclass
|
||||||
[
|
@dataclass
|
||||||
('io_capability', {'size': 1, 'mapper': SMP_Command.io_capability_name}),
|
|
||||||
('oob_data_flag', 1),
|
|
||||||
('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}),
|
|
||||||
('maximum_encryption_key_size', 1),
|
|
||||||
(
|
|
||||||
'initiator_key_distribution',
|
|
||||||
{'size': 1, 'mapper': SMP_Command.key_distribution_str},
|
|
||||||
),
|
|
||||||
(
|
|
||||||
'responder_key_distribution',
|
|
||||||
{'size': 1, 'mapper': SMP_Command.key_distribution_str},
|
|
||||||
),
|
|
||||||
]
|
|
||||||
)
|
|
||||||
class SMP_Pairing_Response_Command(SMP_Command):
|
class SMP_Pairing_Response_Command(SMP_Command):
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec @ Vol 3, Part H - 3.5.2 Pairing Response
|
See Bluetooth spec @ Vol 3, Part H - 3.5.2 Pairing Response
|
||||||
'''
|
'''
|
||||||
|
|
||||||
io_capability: int
|
io_capability: int = field(
|
||||||
oob_data_flag: int
|
metadata=metadata({'size': 1, 'mapper': SMP_Command.io_capability_name})
|
||||||
auth_req: int
|
)
|
||||||
maximum_encryption_key_size: int
|
oob_data_flag: int = field(metadata=metadata(1))
|
||||||
initiator_key_distribution: int
|
auth_req: int = field(
|
||||||
responder_key_distribution: int
|
metadata=metadata({'size': 1, 'mapper': SMP_Command.auth_req_str})
|
||||||
|
)
|
||||||
|
maximum_encryption_key_size: int = field(metadata=metadata(1))
|
||||||
|
initiator_key_distribution: int = field(
|
||||||
|
metadata=metadata({'size': 1, 'mapper': SMP_Command.key_distribution_str})
|
||||||
|
)
|
||||||
|
responder_key_distribution: int = field(
|
||||||
|
metadata=metadata({'size': 1, 'mapper': SMP_Command.key_distribution_str})
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@SMP_Command.subclass([('confirm_value', 16)])
|
@SMP_Command.subclass
|
||||||
|
@dataclass
|
||||||
class SMP_Pairing_Confirm_Command(SMP_Command):
|
class SMP_Pairing_Confirm_Command(SMP_Command):
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec @ Vol 3, Part H - 3.5.3 Pairing Confirm
|
See Bluetooth spec @ Vol 3, Part H - 3.5.3 Pairing Confirm
|
||||||
'''
|
'''
|
||||||
|
|
||||||
confirm_value: bytes
|
confirm_value: bytes = field(metadata=metadata(16))
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@SMP_Command.subclass([('random_value', 16)])
|
@SMP_Command.subclass
|
||||||
|
@dataclass
|
||||||
class SMP_Pairing_Random_Command(SMP_Command):
|
class SMP_Pairing_Random_Command(SMP_Command):
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec @ Vol 3, Part H - 3.5.4 Pairing Random
|
See Bluetooth spec @ Vol 3, Part H - 3.5.4 Pairing Random
|
||||||
'''
|
'''
|
||||||
|
|
||||||
random_value: bytes
|
random_value: bytes = field(metadata=metadata(16))
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@SMP_Command.subclass([('reason', {'size': 1, 'mapper': error_name})])
|
@SMP_Command.subclass
|
||||||
|
@dataclass
|
||||||
class SMP_Pairing_Failed_Command(SMP_Command):
|
class SMP_Pairing_Failed_Command(SMP_Command):
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec @ Vol 3, Part H - 3.5.5 Pairing Failed
|
See Bluetooth spec @ Vol 3, Part H - 3.5.5 Pairing Failed
|
||||||
'''
|
'''
|
||||||
|
|
||||||
reason: int
|
reason: int = field(metadata=metadata({'size': 1, 'mapper': error_name}))
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@SMP_Command.subclass([('public_key_x', 32), ('public_key_y', 32)])
|
@SMP_Command.subclass
|
||||||
|
@dataclass
|
||||||
class SMP_Pairing_Public_Key_Command(SMP_Command):
|
class SMP_Pairing_Public_Key_Command(SMP_Command):
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec @ Vol 3, Part H - 3.5.6 Pairing Public Key
|
See Bluetooth spec @ Vol 3, Part H - 3.5.6 Pairing Public Key
|
||||||
'''
|
'''
|
||||||
|
|
||||||
public_key_x: bytes
|
public_key_x: bytes = field(metadata=metadata(32))
|
||||||
public_key_y: bytes
|
public_key_y: bytes = field(metadata=metadata(32))
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@SMP_Command.subclass(
|
@SMP_Command.subclass
|
||||||
[
|
@dataclass
|
||||||
('dhkey_check', 16),
|
|
||||||
]
|
|
||||||
)
|
|
||||||
class SMP_Pairing_DHKey_Check_Command(SMP_Command):
|
class SMP_Pairing_DHKey_Check_Command(SMP_Command):
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec @ Vol 3, Part H - 3.5.7 Pairing DHKey Check
|
See Bluetooth spec @ Vol 3, Part H - 3.5.7 Pairing DHKey Check
|
||||||
'''
|
'''
|
||||||
|
|
||||||
dhkey_check: bytes
|
dhkey_check: bytes = field(metadata=metadata(16))
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@SMP_Command.subclass(
|
@SMP_Command.subclass
|
||||||
[
|
@dataclass
|
||||||
(
|
|
||||||
'notification_type',
|
|
||||||
{'size': 1, 'mapper': SMP_Command.keypress_notification_type_name},
|
|
||||||
),
|
|
||||||
]
|
|
||||||
)
|
|
||||||
class SMP_Pairing_Keypress_Notification_Command(SMP_Command):
|
class SMP_Pairing_Keypress_Notification_Command(SMP_Command):
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec @ Vol 3, Part H - 3.5.8 Keypress Notification
|
See Bluetooth spec @ Vol 3, Part H - 3.5.8 Keypress Notification
|
||||||
'''
|
'''
|
||||||
|
|
||||||
notification_type: int
|
notification_type: int = field(
|
||||||
|
metadata=metadata(
|
||||||
|
{'size': 1, 'mapper': SMP_Command.keypress_notification_type_name}
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@SMP_Command.subclass([('long_term_key', 16)])
|
@SMP_Command.subclass
|
||||||
|
@dataclass
|
||||||
class SMP_Encryption_Information_Command(SMP_Command):
|
class SMP_Encryption_Information_Command(SMP_Command):
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec @ Vol 3, Part H - 3.6.2 Encryption Information
|
See Bluetooth spec @ Vol 3, Part H - 3.6.2 Encryption Information
|
||||||
'''
|
'''
|
||||||
|
|
||||||
long_term_key: bytes
|
long_term_key: bytes = field(metadata=metadata(16))
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@SMP_Command.subclass([('ediv', 2), ('rand', 8)])
|
@SMP_Command.subclass
|
||||||
|
@dataclass
|
||||||
class SMP_Master_Identification_Command(SMP_Command):
|
class SMP_Master_Identification_Command(SMP_Command):
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec @ Vol 3, Part H - 3.6.3 Master Identification
|
See Bluetooth spec @ Vol 3, Part H - 3.6.3 Master Identification
|
||||||
'''
|
'''
|
||||||
|
|
||||||
ediv: int
|
ediv: int = field(metadata=metadata(2))
|
||||||
rand: bytes
|
rand: bytes = field(metadata=metadata(8))
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@SMP_Command.subclass([('identity_resolving_key', 16)])
|
@SMP_Command.subclass
|
||||||
|
@dataclass
|
||||||
class SMP_Identity_Information_Command(SMP_Command):
|
class SMP_Identity_Information_Command(SMP_Command):
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec @ Vol 3, Part H - 3.6.4 Identity Information
|
See Bluetooth spec @ Vol 3, Part H - 3.6.4 Identity Information
|
||||||
'''
|
'''
|
||||||
|
|
||||||
identity_resolving_key: bytes
|
identity_resolving_key: bytes = field(metadata=metadata(16))
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@SMP_Command.subclass(
|
@SMP_Command.subclass
|
||||||
[
|
@dataclass
|
||||||
('addr_type', Address.ADDRESS_TYPE_SPEC),
|
|
||||||
('bd_addr', Address.parse_address_preceded_by_type),
|
|
||||||
]
|
|
||||||
)
|
|
||||||
class SMP_Identity_Address_Information_Command(SMP_Command):
|
class SMP_Identity_Address_Information_Command(SMP_Command):
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec @ Vol 3, Part H - 3.6.5 Identity Address Information
|
See Bluetooth spec @ Vol 3, Part H - 3.6.5 Identity Address Information
|
||||||
'''
|
'''
|
||||||
|
|
||||||
addr_type: int
|
addr_type: int = field(metadata=metadata(Address.ADDRESS_TYPE_SPEC))
|
||||||
bd_addr: Address
|
bd_addr: Address = field(metadata=metadata(Address.parse_address_preceded_by_type))
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@SMP_Command.subclass([('signature_key', 16)])
|
@SMP_Command.subclass
|
||||||
|
@dataclass
|
||||||
class SMP_Signing_Information_Command(SMP_Command):
|
class SMP_Signing_Information_Command(SMP_Command):
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec @ Vol 3, Part H - 3.6.6 Signing Information
|
See Bluetooth spec @ Vol 3, Part H - 3.6.6 Signing Information
|
||||||
'''
|
'''
|
||||||
|
|
||||||
signature_key: bytes
|
signature_key: bytes = field(metadata=metadata(16))
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@SMP_Command.subclass(
|
@SMP_Command.subclass
|
||||||
[
|
@dataclass
|
||||||
('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}),
|
|
||||||
]
|
|
||||||
)
|
|
||||||
class SMP_Security_Request_Command(SMP_Command):
|
class SMP_Security_Request_Command(SMP_Command):
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec @ Vol 3, Part H - 3.6.7 Security Request
|
See Bluetooth spec @ Vol 3, Part H - 3.6.7 Security Request
|
||||||
'''
|
'''
|
||||||
|
|
||||||
auth_req: int
|
auth_req: int = field(
|
||||||
|
metadata=metadata({'size': 1, 'mapper': SMP_Command.auth_req_str})
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
|
|||||||
Reference in New Issue
Block a user