diff --git a/bumble/smp.py b/bumble/smp.py index f22f3208..7588912e 100644 --- a/bumble/smp.py +++ b/bumble/smp.py @@ -26,13 +26,15 @@ from __future__ import annotations import logging import asyncio import enum -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import ( TYPE_CHECKING, Any, Awaitable, Callable, Optional, + TypeVar, + ClassVar, cast, ) @@ -43,7 +45,9 @@ from bumble.hci import ( Role, HCI_LE_Enable_Encryption_Command, HCI_Object, + Fields, key_with_value, + metadata, ) from bumble.core import ( PhysicalTransport, @@ -200,31 +204,32 @@ def error_name(error_code: int) -> str: # ----------------------------------------------------------------------------- # Classes # ----------------------------------------------------------------------------- +@dataclass class SMP_Command: ''' See Bluetooth spec @ Vol 3, Part H - 3 SECURITY MANAGER PROTOCOL ''' - smp_classes: dict[int, type[SMP_Command]] = {} - fields: Any - code = 0 - name = '' + smp_classes: ClassVar[dict[int, type[SMP_Command]]] = {} + fields: ClassVar[Fields] + code: int = field(default=0, init=False) + name: str = field(default='', init=False) + _payload: Optional[bytes] = field(default=None, init=False) - @staticmethod - def from_bytes(pdu: bytes) -> "SMP_Command": + @classmethod + def from_bytes(cls, pdu: bytes) -> "SMP_Command": code = pdu[0] - cls = SMP_Command.smp_classes.get(code) - if cls is None: - instance = SMP_Command(pdu) + subclass = SMP_Command.smp_classes.get(code) + if subclass is None: + instance = SMP_Command() instance.name = SMP_Command.command_name(code) instance.code = code + instance.payload = pdu return instance - self = cls.__new__(cls) - SMP_Command.__init__(self, pdu) - if hasattr(self, 'fields'): - self.init_from_bytes(pdu, 1) - return self + instance = subclass(**HCI_Object.dict_from_bytes(pdu, 1, subclass.fields)) + instance.payload = pdu[1:] + return instance @staticmethod def command_name(code: int) -> str: @@ -264,36 +269,35 @@ class SMP_Command: def keypress_notification_type_name(notification_type: int) -> str: return name_or_number(SMP_KEYPRESS_NOTIFICATION_TYPE_NAMES, notification_type) - @staticmethod - def subclass(fields): - def inner(cls): - cls.name = cls.__name__.upper() - cls.code = key_with_value(SMP_COMMAND_NAMES, cls.name) - if cls.code is None: - raise KeyError( - f'Command name {cls.name} not found in SMP_COMMAND_NAMES' - ) - cls.fields = fields + _Command = TypeVar("_Command", bound="SMP_Command") - # Register a factory for this class - SMP_Command.smp_classes[cls.code] = cls + @classmethod + def subclass(cls, subclass: type[_Command]) -> type[_Command]: + subclass.name = subclass.__name__.upper() + subclass.code = key_with_value(SMP_COMMAND_NAMES, subclass.name) + if subclass.code is None: + raise KeyError( + f'Command name {subclass.name} not found in SMP_COMMAND_NAMES' + ) + subclass.fields = HCI_Object.fields_from_dataclass(subclass) - return cls + # Register a factory for this class + SMP_Command.smp_classes[subclass.code] = subclass - return inner + return subclass - def __init__(self, pdu: Optional[bytes] = None, **kwargs: Any) -> None: - if hasattr(self, 'fields') and kwargs: - HCI_Object.init_from_fields(self, self.fields, kwargs) - if pdu is None: - pdu = bytes([self.code]) + HCI_Object.dict_to_bytes(kwargs, self.fields) - self.pdu = pdu + @property + def payload(self) -> bytes: + if self._payload is None: + self._payload = HCI_Object.dict_to_bytes(self.__dict__, self.fields) + return self._payload - def init_from_bytes(self, pdu: bytes, offset: int) -> None: - return HCI_Object.init_from_bytes(self, pdu, offset, self.fields) + @payload.setter + def payload(self, value: bytes) -> None: + self._payload = value def __bytes__(self): - return self.pdu + return bytes([self.code]) + self.payload def __str__(self): result = color(self.name, 'yellow') @@ -306,206 +310,192 @@ class SMP_Command: # ----------------------------------------------------------------------------- -@SMP_Command.subclass( - [ - ('io_capability', {'size': 1, 'mapper': SMP_Command.io_capability_name}), - ('oob_data_flag', 1), - ('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}), - ('maximum_encryption_key_size', 1), - ( - 'initiator_key_distribution', - {'size': 1, 'mapper': SMP_Command.key_distribution_str}, - ), - ( - 'responder_key_distribution', - {'size': 1, 'mapper': SMP_Command.key_distribution_str}, - ), - ] -) +@SMP_Command.subclass +@dataclass class SMP_Pairing_Request_Command(SMP_Command): ''' See Bluetooth spec @ Vol 3, Part H - 3.5.1 Pairing Request ''' - io_capability: int - oob_data_flag: int - auth_req: int - maximum_encryption_key_size: int - initiator_key_distribution: int - responder_key_distribution: int + io_capability: int = field( + metadata=metadata({'size': 1, 'mapper': SMP_Command.io_capability_name}) + ) + oob_data_flag: int = field(metadata=metadata(1)) + auth_req: int = field( + metadata=metadata({'size': 1, 'mapper': SMP_Command.auth_req_str}) + ) + maximum_encryption_key_size: int = field(metadata=metadata(1)) + initiator_key_distribution: int = field( + metadata=metadata({'size': 1, 'mapper': SMP_Command.key_distribution_str}) + ) + responder_key_distribution: int = field( + metadata=metadata({'size': 1, 'mapper': SMP_Command.key_distribution_str}) + ) # ----------------------------------------------------------------------------- -@SMP_Command.subclass( - [ - ('io_capability', {'size': 1, 'mapper': SMP_Command.io_capability_name}), - ('oob_data_flag', 1), - ('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}), - ('maximum_encryption_key_size', 1), - ( - 'initiator_key_distribution', - {'size': 1, 'mapper': SMP_Command.key_distribution_str}, - ), - ( - 'responder_key_distribution', - {'size': 1, 'mapper': SMP_Command.key_distribution_str}, - ), - ] -) +@SMP_Command.subclass +@dataclass class SMP_Pairing_Response_Command(SMP_Command): ''' See Bluetooth spec @ Vol 3, Part H - 3.5.2 Pairing Response ''' - io_capability: int - oob_data_flag: int - auth_req: int - maximum_encryption_key_size: int - initiator_key_distribution: int - responder_key_distribution: int + io_capability: int = field( + metadata=metadata({'size': 1, 'mapper': SMP_Command.io_capability_name}) + ) + oob_data_flag: int = field(metadata=metadata(1)) + auth_req: int = field( + metadata=metadata({'size': 1, 'mapper': SMP_Command.auth_req_str}) + ) + maximum_encryption_key_size: int = field(metadata=metadata(1)) + initiator_key_distribution: int = field( + metadata=metadata({'size': 1, 'mapper': SMP_Command.key_distribution_str}) + ) + responder_key_distribution: int = field( + metadata=metadata({'size': 1, 'mapper': SMP_Command.key_distribution_str}) + ) # ----------------------------------------------------------------------------- -@SMP_Command.subclass([('confirm_value', 16)]) +@SMP_Command.subclass +@dataclass class SMP_Pairing_Confirm_Command(SMP_Command): ''' See Bluetooth spec @ Vol 3, Part H - 3.5.3 Pairing Confirm ''' - confirm_value: bytes + confirm_value: bytes = field(metadata=metadata(16)) # ----------------------------------------------------------------------------- -@SMP_Command.subclass([('random_value', 16)]) +@SMP_Command.subclass +@dataclass class SMP_Pairing_Random_Command(SMP_Command): ''' See Bluetooth spec @ Vol 3, Part H - 3.5.4 Pairing Random ''' - random_value: bytes + random_value: bytes = field(metadata=metadata(16)) # ----------------------------------------------------------------------------- -@SMP_Command.subclass([('reason', {'size': 1, 'mapper': error_name})]) +@SMP_Command.subclass +@dataclass class SMP_Pairing_Failed_Command(SMP_Command): ''' See Bluetooth spec @ Vol 3, Part H - 3.5.5 Pairing Failed ''' - reason: int + reason: int = field(metadata=metadata({'size': 1, 'mapper': error_name})) # ----------------------------------------------------------------------------- -@SMP_Command.subclass([('public_key_x', 32), ('public_key_y', 32)]) +@SMP_Command.subclass +@dataclass class SMP_Pairing_Public_Key_Command(SMP_Command): ''' See Bluetooth spec @ Vol 3, Part H - 3.5.6 Pairing Public Key ''' - public_key_x: bytes - public_key_y: bytes + public_key_x: bytes = field(metadata=metadata(32)) + public_key_y: bytes = field(metadata=metadata(32)) # ----------------------------------------------------------------------------- -@SMP_Command.subclass( - [ - ('dhkey_check', 16), - ] -) +@SMP_Command.subclass +@dataclass class SMP_Pairing_DHKey_Check_Command(SMP_Command): ''' See Bluetooth spec @ Vol 3, Part H - 3.5.7 Pairing DHKey Check ''' - dhkey_check: bytes + dhkey_check: bytes = field(metadata=metadata(16)) # ----------------------------------------------------------------------------- -@SMP_Command.subclass( - [ - ( - 'notification_type', - {'size': 1, 'mapper': SMP_Command.keypress_notification_type_name}, - ), - ] -) +@SMP_Command.subclass +@dataclass class SMP_Pairing_Keypress_Notification_Command(SMP_Command): ''' See Bluetooth spec @ Vol 3, Part H - 3.5.8 Keypress Notification ''' - notification_type: int + notification_type: int = field( + metadata=metadata( + {'size': 1, 'mapper': SMP_Command.keypress_notification_type_name} + ) + ) # ----------------------------------------------------------------------------- -@SMP_Command.subclass([('long_term_key', 16)]) +@SMP_Command.subclass +@dataclass class SMP_Encryption_Information_Command(SMP_Command): ''' See Bluetooth spec @ Vol 3, Part H - 3.6.2 Encryption Information ''' - long_term_key: bytes + long_term_key: bytes = field(metadata=metadata(16)) # ----------------------------------------------------------------------------- -@SMP_Command.subclass([('ediv', 2), ('rand', 8)]) +@SMP_Command.subclass +@dataclass class SMP_Master_Identification_Command(SMP_Command): ''' See Bluetooth spec @ Vol 3, Part H - 3.6.3 Master Identification ''' - ediv: int - rand: bytes + ediv: int = field(metadata=metadata(2)) + rand: bytes = field(metadata=metadata(8)) # ----------------------------------------------------------------------------- -@SMP_Command.subclass([('identity_resolving_key', 16)]) +@SMP_Command.subclass +@dataclass class SMP_Identity_Information_Command(SMP_Command): ''' See Bluetooth spec @ Vol 3, Part H - 3.6.4 Identity Information ''' - identity_resolving_key: bytes + identity_resolving_key: bytes = field(metadata=metadata(16)) # ----------------------------------------------------------------------------- -@SMP_Command.subclass( - [ - ('addr_type', Address.ADDRESS_TYPE_SPEC), - ('bd_addr', Address.parse_address_preceded_by_type), - ] -) +@SMP_Command.subclass +@dataclass class SMP_Identity_Address_Information_Command(SMP_Command): ''' See Bluetooth spec @ Vol 3, Part H - 3.6.5 Identity Address Information ''' - addr_type: int - bd_addr: Address + addr_type: int = field(metadata=metadata(Address.ADDRESS_TYPE_SPEC)) + bd_addr: Address = field(metadata=metadata(Address.parse_address_preceded_by_type)) # ----------------------------------------------------------------------------- -@SMP_Command.subclass([('signature_key', 16)]) +@SMP_Command.subclass +@dataclass class SMP_Signing_Information_Command(SMP_Command): ''' See Bluetooth spec @ Vol 3, Part H - 3.6.6 Signing Information ''' - signature_key: bytes + signature_key: bytes = field(metadata=metadata(16)) # ----------------------------------------------------------------------------- -@SMP_Command.subclass( - [ - ('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}), - ] -) +@SMP_Command.subclass +@dataclass class SMP_Security_Request_Command(SMP_Command): ''' See Bluetooth spec @ Vol 3, Part H - 3.6.7 Security Request ''' - auth_req: int + auth_req: int = field( + metadata=metadata({'size': 1, 'mapper': SMP_Command.auth_req_str}) + ) # -----------------------------------------------------------------------------