Migrate SMP commands to dataclasses

This commit is contained in:
Josh Wu
2025-07-02 13:16:49 +08:00
parent a275c399a3
commit 19d3616032

View File

@@ -26,13 +26,15 @@ from __future__ import annotations
import logging
import asyncio
import enum
from dataclasses import dataclass
from dataclasses import dataclass, field
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Optional,
TypeVar,
ClassVar,
cast,
)
@@ -43,7 +45,9 @@ from bumble.hci import (
Role,
HCI_LE_Enable_Encryption_Command,
HCI_Object,
Fields,
key_with_value,
metadata,
)
from bumble.core import (
PhysicalTransport,
@@ -200,31 +204,32 @@ def error_name(error_code: int) -> str:
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
@dataclass
class SMP_Command:
'''
See Bluetooth spec @ Vol 3, Part H - 3 SECURITY MANAGER PROTOCOL
'''
smp_classes: dict[int, type[SMP_Command]] = {}
fields: Any
code = 0
name = ''
smp_classes: ClassVar[dict[int, type[SMP_Command]]] = {}
fields: ClassVar[Fields]
code: int = field(default=0, init=False)
name: str = field(default='', init=False)
_payload: Optional[bytes] = field(default=None, init=False)
@staticmethod
def from_bytes(pdu: bytes) -> "SMP_Command":
@classmethod
def from_bytes(cls, pdu: bytes) -> "SMP_Command":
code = pdu[0]
cls = SMP_Command.smp_classes.get(code)
if cls is None:
instance = SMP_Command(pdu)
subclass = SMP_Command.smp_classes.get(code)
if subclass is None:
instance = SMP_Command()
instance.name = SMP_Command.command_name(code)
instance.code = code
instance.payload = pdu
return instance
self = cls.__new__(cls)
SMP_Command.__init__(self, pdu)
if hasattr(self, 'fields'):
self.init_from_bytes(pdu, 1)
return self
instance = subclass(**HCI_Object.dict_from_bytes(pdu, 1, subclass.fields))
instance.payload = pdu[1:]
return instance
@staticmethod
def command_name(code: int) -> str:
@@ -264,36 +269,35 @@ class SMP_Command:
def keypress_notification_type_name(notification_type: int) -> str:
return name_or_number(SMP_KEYPRESS_NOTIFICATION_TYPE_NAMES, notification_type)
@staticmethod
def subclass(fields):
def inner(cls):
cls.name = cls.__name__.upper()
cls.code = key_with_value(SMP_COMMAND_NAMES, cls.name)
if cls.code is None:
raise KeyError(
f'Command name {cls.name} not found in SMP_COMMAND_NAMES'
)
cls.fields = fields
_Command = TypeVar("_Command", bound="SMP_Command")
# Register a factory for this class
SMP_Command.smp_classes[cls.code] = cls
@classmethod
def subclass(cls, subclass: type[_Command]) -> type[_Command]:
subclass.name = subclass.__name__.upper()
subclass.code = key_with_value(SMP_COMMAND_NAMES, subclass.name)
if subclass.code is None:
raise KeyError(
f'Command name {subclass.name} not found in SMP_COMMAND_NAMES'
)
subclass.fields = HCI_Object.fields_from_dataclass(subclass)
return cls
# Register a factory for this class
SMP_Command.smp_classes[subclass.code] = subclass
return inner
return subclass
def __init__(self, pdu: Optional[bytes] = None, **kwargs: Any) -> None:
if hasattr(self, 'fields') and kwargs:
HCI_Object.init_from_fields(self, self.fields, kwargs)
if pdu is None:
pdu = bytes([self.code]) + HCI_Object.dict_to_bytes(kwargs, self.fields)
self.pdu = pdu
@property
def payload(self) -> bytes:
if self._payload is None:
self._payload = HCI_Object.dict_to_bytes(self.__dict__, self.fields)
return self._payload
def init_from_bytes(self, pdu: bytes, offset: int) -> None:
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
@payload.setter
def payload(self, value: bytes) -> None:
self._payload = value
def __bytes__(self):
return self.pdu
return bytes([self.code]) + self.payload
def __str__(self):
result = color(self.name, 'yellow')
@@ -306,206 +310,192 @@ class SMP_Command:
# -----------------------------------------------------------------------------
@SMP_Command.subclass(
[
('io_capability', {'size': 1, 'mapper': SMP_Command.io_capability_name}),
('oob_data_flag', 1),
('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}),
('maximum_encryption_key_size', 1),
(
'initiator_key_distribution',
{'size': 1, 'mapper': SMP_Command.key_distribution_str},
),
(
'responder_key_distribution',
{'size': 1, 'mapper': SMP_Command.key_distribution_str},
),
]
)
@SMP_Command.subclass
@dataclass
class SMP_Pairing_Request_Command(SMP_Command):
'''
See Bluetooth spec @ Vol 3, Part H - 3.5.1 Pairing Request
'''
io_capability: int
oob_data_flag: int
auth_req: int
maximum_encryption_key_size: int
initiator_key_distribution: int
responder_key_distribution: int
io_capability: int = field(
metadata=metadata({'size': 1, 'mapper': SMP_Command.io_capability_name})
)
oob_data_flag: int = field(metadata=metadata(1))
auth_req: int = field(
metadata=metadata({'size': 1, 'mapper': SMP_Command.auth_req_str})
)
maximum_encryption_key_size: int = field(metadata=metadata(1))
initiator_key_distribution: int = field(
metadata=metadata({'size': 1, 'mapper': SMP_Command.key_distribution_str})
)
responder_key_distribution: int = field(
metadata=metadata({'size': 1, 'mapper': SMP_Command.key_distribution_str})
)
# -----------------------------------------------------------------------------
@SMP_Command.subclass(
[
('io_capability', {'size': 1, 'mapper': SMP_Command.io_capability_name}),
('oob_data_flag', 1),
('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}),
('maximum_encryption_key_size', 1),
(
'initiator_key_distribution',
{'size': 1, 'mapper': SMP_Command.key_distribution_str},
),
(
'responder_key_distribution',
{'size': 1, 'mapper': SMP_Command.key_distribution_str},
),
]
)
@SMP_Command.subclass
@dataclass
class SMP_Pairing_Response_Command(SMP_Command):
'''
See Bluetooth spec @ Vol 3, Part H - 3.5.2 Pairing Response
'''
io_capability: int
oob_data_flag: int
auth_req: int
maximum_encryption_key_size: int
initiator_key_distribution: int
responder_key_distribution: int
io_capability: int = field(
metadata=metadata({'size': 1, 'mapper': SMP_Command.io_capability_name})
)
oob_data_flag: int = field(metadata=metadata(1))
auth_req: int = field(
metadata=metadata({'size': 1, 'mapper': SMP_Command.auth_req_str})
)
maximum_encryption_key_size: int = field(metadata=metadata(1))
initiator_key_distribution: int = field(
metadata=metadata({'size': 1, 'mapper': SMP_Command.key_distribution_str})
)
responder_key_distribution: int = field(
metadata=metadata({'size': 1, 'mapper': SMP_Command.key_distribution_str})
)
# -----------------------------------------------------------------------------
@SMP_Command.subclass([('confirm_value', 16)])
@SMP_Command.subclass
@dataclass
class SMP_Pairing_Confirm_Command(SMP_Command):
'''
See Bluetooth spec @ Vol 3, Part H - 3.5.3 Pairing Confirm
'''
confirm_value: bytes
confirm_value: bytes = field(metadata=metadata(16))
# -----------------------------------------------------------------------------
@SMP_Command.subclass([('random_value', 16)])
@SMP_Command.subclass
@dataclass
class SMP_Pairing_Random_Command(SMP_Command):
'''
See Bluetooth spec @ Vol 3, Part H - 3.5.4 Pairing Random
'''
random_value: bytes
random_value: bytes = field(metadata=metadata(16))
# -----------------------------------------------------------------------------
@SMP_Command.subclass([('reason', {'size': 1, 'mapper': error_name})])
@SMP_Command.subclass
@dataclass
class SMP_Pairing_Failed_Command(SMP_Command):
'''
See Bluetooth spec @ Vol 3, Part H - 3.5.5 Pairing Failed
'''
reason: int
reason: int = field(metadata=metadata({'size': 1, 'mapper': error_name}))
# -----------------------------------------------------------------------------
@SMP_Command.subclass([('public_key_x', 32), ('public_key_y', 32)])
@SMP_Command.subclass
@dataclass
class SMP_Pairing_Public_Key_Command(SMP_Command):
'''
See Bluetooth spec @ Vol 3, Part H - 3.5.6 Pairing Public Key
'''
public_key_x: bytes
public_key_y: bytes
public_key_x: bytes = field(metadata=metadata(32))
public_key_y: bytes = field(metadata=metadata(32))
# -----------------------------------------------------------------------------
@SMP_Command.subclass(
[
('dhkey_check', 16),
]
)
@SMP_Command.subclass
@dataclass
class SMP_Pairing_DHKey_Check_Command(SMP_Command):
'''
See Bluetooth spec @ Vol 3, Part H - 3.5.7 Pairing DHKey Check
'''
dhkey_check: bytes
dhkey_check: bytes = field(metadata=metadata(16))
# -----------------------------------------------------------------------------
@SMP_Command.subclass(
[
(
'notification_type',
{'size': 1, 'mapper': SMP_Command.keypress_notification_type_name},
),
]
)
@SMP_Command.subclass
@dataclass
class SMP_Pairing_Keypress_Notification_Command(SMP_Command):
'''
See Bluetooth spec @ Vol 3, Part H - 3.5.8 Keypress Notification
'''
notification_type: int
notification_type: int = field(
metadata=metadata(
{'size': 1, 'mapper': SMP_Command.keypress_notification_type_name}
)
)
# -----------------------------------------------------------------------------
@SMP_Command.subclass([('long_term_key', 16)])
@SMP_Command.subclass
@dataclass
class SMP_Encryption_Information_Command(SMP_Command):
'''
See Bluetooth spec @ Vol 3, Part H - 3.6.2 Encryption Information
'''
long_term_key: bytes
long_term_key: bytes = field(metadata=metadata(16))
# -----------------------------------------------------------------------------
@SMP_Command.subclass([('ediv', 2), ('rand', 8)])
@SMP_Command.subclass
@dataclass
class SMP_Master_Identification_Command(SMP_Command):
'''
See Bluetooth spec @ Vol 3, Part H - 3.6.3 Master Identification
'''
ediv: int
rand: bytes
ediv: int = field(metadata=metadata(2))
rand: bytes = field(metadata=metadata(8))
# -----------------------------------------------------------------------------
@SMP_Command.subclass([('identity_resolving_key', 16)])
@SMP_Command.subclass
@dataclass
class SMP_Identity_Information_Command(SMP_Command):
'''
See Bluetooth spec @ Vol 3, Part H - 3.6.4 Identity Information
'''
identity_resolving_key: bytes
identity_resolving_key: bytes = field(metadata=metadata(16))
# -----------------------------------------------------------------------------
@SMP_Command.subclass(
[
('addr_type', Address.ADDRESS_TYPE_SPEC),
('bd_addr', Address.parse_address_preceded_by_type),
]
)
@SMP_Command.subclass
@dataclass
class SMP_Identity_Address_Information_Command(SMP_Command):
'''
See Bluetooth spec @ Vol 3, Part H - 3.6.5 Identity Address Information
'''
addr_type: int
bd_addr: Address
addr_type: int = field(metadata=metadata(Address.ADDRESS_TYPE_SPEC))
bd_addr: Address = field(metadata=metadata(Address.parse_address_preceded_by_type))
# -----------------------------------------------------------------------------
@SMP_Command.subclass([('signature_key', 16)])
@SMP_Command.subclass
@dataclass
class SMP_Signing_Information_Command(SMP_Command):
'''
See Bluetooth spec @ Vol 3, Part H - 3.6.6 Signing Information
'''
signature_key: bytes
signature_key: bytes = field(metadata=metadata(16))
# -----------------------------------------------------------------------------
@SMP_Command.subclass(
[
('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}),
]
)
@SMP_Command.subclass
@dataclass
class SMP_Security_Request_Command(SMP_Command):
'''
See Bluetooth spec @ Vol 3, Part H - 3.6.7 Security Request
'''
auth_req: int
auth_req: int = field(
metadata=metadata({'size': 1, 'mapper': SMP_Command.auth_req_str})
)
# -----------------------------------------------------------------------------