SMP: Migrate all enums

This commit is contained in:
Josh Wu
2025-11-07 01:42:50 +08:00
parent d810d93aaf
commit 797cd216d4
6 changed files with 268 additions and 338 deletions

View File

@@ -20,11 +20,12 @@ from __future__ import annotations
import asyncio import asyncio
import logging import logging
import os import os
from typing import ClassVar
import click import click
from prompt_toolkit.shortcuts import PromptSession from prompt_toolkit.shortcuts import PromptSession
from bumble import data_types from bumble import data_types, smp
from bumble.a2dp import make_audio_sink_service_sdp_records from bumble.a2dp import make_audio_sink_service_sdp_records
from bumble.att import ( from bumble.att import (
ATT_INSUFFICIENT_AUTHENTICATION_ERROR, ATT_INSUFFICIENT_AUTHENTICATION_ERROR,
@@ -40,7 +41,7 @@ from bumble.core import (
PhysicalTransport, PhysicalTransport,
ProtocolError, ProtocolError,
) )
from bumble.device import Device, Peer from bumble.device import Connection, Device, Peer
from bumble.gatt import ( from bumble.gatt import (
GATT_DEVICE_NAME_CHARACTERISTIC, GATT_DEVICE_NAME_CHARACTERISTIC,
GATT_GENERIC_ACCESS_SERVICE, GATT_GENERIC_ACCESS_SERVICE,
@@ -53,7 +54,6 @@ from bumble.hci import OwnAddressType
from bumble.keys import JsonKeyStore from bumble.keys import JsonKeyStore
from bumble.pairing import OobData, PairingConfig, PairingDelegate from bumble.pairing import OobData, PairingConfig, PairingDelegate
from bumble.smp import OobContext, OobLegacyContext from bumble.smp import OobContext, OobLegacyContext
from bumble.smp import error_name as smp_error_name
from bumble.transport import open_transport from bumble.transport import open_transport
from bumble.utils import AsyncRunner from bumble.utils import AsyncRunner
@@ -65,7 +65,7 @@ POST_PAIRING_DELAY = 1
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class Waiter: class Waiter:
instance: Waiter | None = None instance: ClassVar[Waiter | None] = None
def __init__(self, linger=False): def __init__(self, linger=False):
self.done = asyncio.get_running_loop().create_future() self.done = asyncio.get_running_loop().create_future()
@@ -319,12 +319,13 @@ async def on_classic_pairing(connection):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@AsyncRunner.run_in_task() @AsyncRunner.run_in_task()
async def on_pairing_failure(connection, reason): async def on_pairing_failure(connection: Connection, reason: smp.ErrorCode):
print(color('***-----------------------------------', 'red')) print(color('***-----------------------------------', 'red'))
print(color(f'*** Pairing failed: {smp_error_name(reason)}', 'red')) print(color(f'*** Pairing failed: {reason.name}', 'red'))
print(color('***-----------------------------------', 'red')) print(color('***-----------------------------------', 'red'))
await connection.disconnect() await connection.disconnect()
Waiter.instance.terminate() if Waiter.instance:
Waiter.instance.terminate()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -88,13 +88,6 @@ SBC_DUAL_CHANNEL_MODE = 0x01
SBC_STEREO_CHANNEL_MODE = 0x02 SBC_STEREO_CHANNEL_MODE = 0x02
SBC_JOINT_STEREO_CHANNEL_MODE = 0x03 SBC_JOINT_STEREO_CHANNEL_MODE = 0x03
SBC_CHANNEL_MODE_NAMES = {
SBC_MONO_CHANNEL_MODE: 'SBC_MONO_CHANNEL_MODE',
SBC_DUAL_CHANNEL_MODE: 'SBC_DUAL_CHANNEL_MODE',
SBC_STEREO_CHANNEL_MODE: 'SBC_STEREO_CHANNEL_MODE',
SBC_JOINT_STEREO_CHANNEL_MODE: 'SBC_JOINT_STEREO_CHANNEL_MODE'
}
SBC_BLOCK_LENGTHS = [4, 8, 12, 16] SBC_BLOCK_LENGTHS = [4, 8, 12, 16]
SBC_SUBBANDS = [4, 8] SBC_SUBBANDS = [4, 8]
@@ -102,11 +95,6 @@ SBC_SUBBANDS = [4, 8]
SBC_SNR_ALLOCATION_METHOD = 0x00 SBC_SNR_ALLOCATION_METHOD = 0x00
SBC_LOUDNESS_ALLOCATION_METHOD = 0x01 SBC_LOUDNESS_ALLOCATION_METHOD = 0x01
SBC_ALLOCATION_METHOD_NAMES = {
SBC_SNR_ALLOCATION_METHOD: 'SBC_SNR_ALLOCATION_METHOD',
SBC_LOUDNESS_ALLOCATION_METHOD: 'SBC_LOUDNESS_ALLOCATION_METHOD'
}
SBC_MAX_FRAMES_IN_RTP_PAYLOAD = 15 SBC_MAX_FRAMES_IN_RTP_PAYLOAD = 15
MPEG_2_4_AAC_SAMPLING_FREQUENCIES = [ MPEG_2_4_AAC_SAMPLING_FREQUENCIES = [
@@ -129,13 +117,6 @@ MPEG_4_AAC_LC_OBJECT_TYPE = 0x01
MPEG_4_AAC_LTP_OBJECT_TYPE = 0x02 MPEG_4_AAC_LTP_OBJECT_TYPE = 0x02
MPEG_4_AAC_SCALABLE_OBJECT_TYPE = 0x03 MPEG_4_AAC_SCALABLE_OBJECT_TYPE = 0x03
MPEG_2_4_OBJECT_TYPE_NAMES = {
MPEG_2_AAC_LC_OBJECT_TYPE: 'MPEG_2_AAC_LC_OBJECT_TYPE',
MPEG_4_AAC_LC_OBJECT_TYPE: 'MPEG_4_AAC_LC_OBJECT_TYPE',
MPEG_4_AAC_LTP_OBJECT_TYPE: 'MPEG_4_AAC_LTP_OBJECT_TYPE',
MPEG_4_AAC_SCALABLE_OBJECT_TYPE: 'MPEG_4_AAC_SCALABLE_OBJECT_TYPE'
}
OPUS_MAX_FRAMES_IN_RTP_PAYLOAD = 15 OPUS_MAX_FRAMES_IN_RTP_PAYLOAD = 15

View File

@@ -247,28 +247,6 @@ HCI_VERSION_BLUETOOTH_CORE_6_0 = SpecificationVersion.BLUETOOTH_CORE_6_0
HCI_VERSION_BLUETOOTH_CORE_6_1 = SpecificationVersion.BLUETOOTH_CORE_6_1 HCI_VERSION_BLUETOOTH_CORE_6_1 = SpecificationVersion.BLUETOOTH_CORE_6_1
HCI_VERSION_BLUETOOTH_CORE_6_2 = SpecificationVersion.BLUETOOTH_CORE_6_2 HCI_VERSION_BLUETOOTH_CORE_6_2 = SpecificationVersion.BLUETOOTH_CORE_6_2
HCI_VERSION_NAMES = {
HCI_VERSION_BLUETOOTH_CORE_1_0B: 'HCI_VERSION_BLUETOOTH_CORE_1_0B',
HCI_VERSION_BLUETOOTH_CORE_1_1: 'HCI_VERSION_BLUETOOTH_CORE_1_1',
HCI_VERSION_BLUETOOTH_CORE_1_2: 'HCI_VERSION_BLUETOOTH_CORE_1_2',
HCI_VERSION_BLUETOOTH_CORE_2_0_EDR: 'HCI_VERSION_BLUETOOTH_CORE_2_0_EDR',
HCI_VERSION_BLUETOOTH_CORE_2_1_EDR: 'HCI_VERSION_BLUETOOTH_CORE_2_1_EDR',
HCI_VERSION_BLUETOOTH_CORE_3_0_HS: 'HCI_VERSION_BLUETOOTH_CORE_3_0_HS',
HCI_VERSION_BLUETOOTH_CORE_4_0: 'HCI_VERSION_BLUETOOTH_CORE_4_0',
HCI_VERSION_BLUETOOTH_CORE_4_1: 'HCI_VERSION_BLUETOOTH_CORE_4_1',
HCI_VERSION_BLUETOOTH_CORE_4_2: 'HCI_VERSION_BLUETOOTH_CORE_4_2',
HCI_VERSION_BLUETOOTH_CORE_5_0: 'HCI_VERSION_BLUETOOTH_CORE_5_0',
HCI_VERSION_BLUETOOTH_CORE_5_1: 'HCI_VERSION_BLUETOOTH_CORE_5_1',
HCI_VERSION_BLUETOOTH_CORE_5_2: 'HCI_VERSION_BLUETOOTH_CORE_5_2',
HCI_VERSION_BLUETOOTH_CORE_5_3: 'HCI_VERSION_BLUETOOTH_CORE_5_3',
HCI_VERSION_BLUETOOTH_CORE_5_4: 'HCI_VERSION_BLUETOOTH_CORE_5_4',
HCI_VERSION_BLUETOOTH_CORE_6_0: 'HCI_VERSION_BLUETOOTH_CORE_6_0',
HCI_VERSION_BLUETOOTH_CORE_6_1: 'HCI_VERSION_BLUETOOTH_CORE_6_1',
HCI_VERSION_BLUETOOTH_CORE_6_2: 'HCI_VERSION_BLUETOOTH_CORE_6_2',
}
LMP_VERSION_NAMES = HCI_VERSION_NAMES
# HCI Packet types # HCI Packet types
HCI_COMMAND_PACKET = 0x01 HCI_COMMAND_PACKET = 0x01
HCI_ACL_DATA_PACKET = 0x02 HCI_ACL_DATA_PACKET = 0x02

View File

@@ -21,18 +21,9 @@ import enum
import secrets import secrets
from dataclasses import dataclass from dataclasses import dataclass
from bumble import hci from bumble import hci, smp
from bumble.core import AdvertisingData, LeRole from bumble.core import AdvertisingData, LeRole
from bumble.smp import ( from bumble.smp import (
SMP_DISPLAY_ONLY_IO_CAPABILITY,
SMP_DISPLAY_YES_NO_IO_CAPABILITY,
SMP_ENC_KEY_DISTRIBUTION_FLAG,
SMP_ID_KEY_DISTRIBUTION_FLAG,
SMP_KEYBOARD_DISPLAY_IO_CAPABILITY,
SMP_KEYBOARD_ONLY_IO_CAPABILITY,
SMP_LINK_KEY_DISTRIBUTION_FLAG,
SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
SMP_SIGN_KEY_DISTRIBUTION_FLAG,
OobContext, OobContext,
OobLegacyContext, OobLegacyContext,
OobSharedData, OobSharedData,
@@ -96,11 +87,11 @@ class PairingDelegate:
# These are defined abstractly, and can be mapped to specific Classic pairing # These are defined abstractly, and can be mapped to specific Classic pairing
# and/or SMP constants. # and/or SMP constants.
class IoCapability(enum.IntEnum): class IoCapability(enum.IntEnum):
NO_OUTPUT_NO_INPUT = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY NO_OUTPUT_NO_INPUT = smp.IoCapability.NO_INPUT_NO_OUTPUT
KEYBOARD_INPUT_ONLY = SMP_KEYBOARD_ONLY_IO_CAPABILITY KEYBOARD_INPUT_ONLY = smp.IoCapability.KEYBOARD_ONLY
DISPLAY_OUTPUT_ONLY = SMP_DISPLAY_ONLY_IO_CAPABILITY DISPLAY_OUTPUT_ONLY = smp.IoCapability.DISPLAY_ONLY
DISPLAY_OUTPUT_AND_YES_NO_INPUT = SMP_DISPLAY_YES_NO_IO_CAPABILITY DISPLAY_OUTPUT_AND_YES_NO_INPUT = smp.IoCapability.DISPLAY_YES_NO
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = SMP_KEYBOARD_DISPLAY_IO_CAPABILITY DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = smp.IoCapability.KEYBOARD_DISPLAY
# Direct names for backward compatibility. # Direct names for backward compatibility.
NO_OUTPUT_NO_INPUT = IoCapability.NO_OUTPUT_NO_INPUT NO_OUTPUT_NO_INPUT = IoCapability.NO_OUTPUT_NO_INPUT
@@ -111,10 +102,10 @@ class PairingDelegate:
# Key Distribution [LE only] # Key Distribution [LE only]
class KeyDistribution(enum.IntFlag): class KeyDistribution(enum.IntFlag):
DISTRIBUTE_ENCRYPTION_KEY = SMP_ENC_KEY_DISTRIBUTION_FLAG DISTRIBUTE_ENCRYPTION_KEY = smp.KeyDistribution.ENC_KEY
DISTRIBUTE_IDENTITY_KEY = SMP_ID_KEY_DISTRIBUTION_FLAG DISTRIBUTE_IDENTITY_KEY = smp.KeyDistribution.ID_KEY
DISTRIBUTE_SIGNING_KEY = SMP_SIGN_KEY_DISTRIBUTION_FLAG DISTRIBUTE_SIGNING_KEY = smp.KeyDistribution.SIGN_KEY
DISTRIBUTE_LINK_KEY = SMP_LINK_KEY_DISTRIBUTION_FLAG DISTRIBUTE_LINK_KEY = smp.KeyDistribution.LINK_KEY
DEFAULT_KEY_DISTRIBUTION: KeyDistribution = ( DEFAULT_KEY_DISTRIBUTION: KeyDistribution = (
KeyDistribution.DISTRIBUTE_ENCRYPTION_KEY KeyDistribution.DISTRIBUTE_ENCRYPTION_KEY

View File

@@ -31,14 +31,13 @@ from collections.abc import Awaitable, Callable, Sequence
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import TYPE_CHECKING, ClassVar, TypeVar, cast from typing import TYPE_CHECKING, ClassVar, TypeVar, cast
from bumble import crypto, utils from bumble import crypto, hci, utils
from bumble.colors import color from bumble.colors import color
from bumble.core import ( from bumble.core import (
AdvertisingData, AdvertisingData,
InvalidArgumentError, InvalidArgumentError,
PhysicalTransport, PhysicalTransport,
ProtocolError, ProtocolError,
name_or_number,
) )
from bumble.hci import ( from bumble.hci import (
Address, Address,
@@ -46,7 +45,6 @@ from bumble.hci import (
HCI_LE_Enable_Encryption_Command, HCI_LE_Enable_Encryption_Command,
HCI_Object, HCI_Object,
Role, Role,
key_with_value,
metadata, metadata,
) )
from bumble.keys import PairingKeys from bumble.keys import PairingKeys
@@ -71,110 +69,110 @@ logger = logging.getLogger(__name__)
SMP_CID = 0x06 SMP_CID = 0x06
SMP_BR_CID = 0x07 SMP_BR_CID = 0x07
SMP_PAIRING_REQUEST_COMMAND = 0x01 class CommandCode(hci.SpecableEnum):
SMP_PAIRING_RESPONSE_COMMAND = 0x02 PAIRING_REQUEST = 0x01
SMP_PAIRING_CONFIRM_COMMAND = 0x03 PAIRING_RESPONSE = 0x02
SMP_PAIRING_RANDOM_COMMAND = 0x04 PAIRING_CONFIRM = 0x03
SMP_PAIRING_FAILED_COMMAND = 0x05 PAIRING_RANDOM = 0x04
SMP_ENCRYPTION_INFORMATION_COMMAND = 0x06 PAIRING_FAILED = 0x05
SMP_MASTER_IDENTIFICATION_COMMAND = 0x07 ENCRYPTION_INFORMATION = 0x06
SMP_IDENTITY_INFORMATION_COMMAND = 0x08 MASTER_IDENTIFICATION = 0x07
SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND = 0x09 IDENTITY_INFORMATION = 0x08
SMP_SIGNING_INFORMATION_COMMAND = 0x0A IDENTITY_ADDRESS_INFORMATION = 0x09
SMP_SECURITY_REQUEST_COMMAND = 0x0B SIGNING_INFORMATION = 0x0A
SMP_PAIRING_PUBLIC_KEY_COMMAND = 0x0C SECURITY_REQUEST = 0x0B
SMP_PAIRING_DHKEY_CHECK_COMMAND = 0x0D PAIRING_PUBLIC_KEY = 0x0C
SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND = 0x0E PAIRING_DHKEY_CHECK = 0x0D
PAIRING_KEYPRESS_NOTIFICATION = 0x0E
SMP_COMMAND_NAMES = {
SMP_PAIRING_REQUEST_COMMAND: 'SMP_PAIRING_REQUEST_COMMAND',
SMP_PAIRING_RESPONSE_COMMAND: 'SMP_PAIRING_RESPONSE_COMMAND',
SMP_PAIRING_CONFIRM_COMMAND: 'SMP_PAIRING_CONFIRM_COMMAND',
SMP_PAIRING_RANDOM_COMMAND: 'SMP_PAIRING_RANDOM_COMMAND',
SMP_PAIRING_FAILED_COMMAND: 'SMP_PAIRING_FAILED_COMMAND',
SMP_ENCRYPTION_INFORMATION_COMMAND: 'SMP_ENCRYPTION_INFORMATION_COMMAND',
SMP_MASTER_IDENTIFICATION_COMMAND: 'SMP_MASTER_IDENTIFICATION_COMMAND',
SMP_IDENTITY_INFORMATION_COMMAND: 'SMP_IDENTITY_INFORMATION_COMMAND',
SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND: 'SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND',
SMP_SIGNING_INFORMATION_COMMAND: 'SMP_SIGNING_INFORMATION_COMMAND',
SMP_SECURITY_REQUEST_COMMAND: 'SMP_SECURITY_REQUEST_COMMAND',
SMP_PAIRING_PUBLIC_KEY_COMMAND: 'SMP_PAIRING_PUBLIC_KEY_COMMAND',
SMP_PAIRING_DHKEY_CHECK_COMMAND: 'SMP_PAIRING_DHKEY_CHECK_COMMAND',
SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND: 'SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND'
}
SMP_DISPLAY_ONLY_IO_CAPABILITY = 0x00 class IoCapability(hci.SpecableEnum):
SMP_DISPLAY_YES_NO_IO_CAPABILITY = 0x01 DISPLAY_ONLY = 0x00
SMP_KEYBOARD_ONLY_IO_CAPABILITY = 0x02 DISPLAY_YES_NO = 0x01
SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY = 0x03 KEYBOARD_ONLY = 0x02
SMP_KEYBOARD_DISPLAY_IO_CAPABILITY = 0x04 NO_INPUT_NO_OUTPUT = 0x03
KEYBOARD_DISPLAY = 0x04
SMP_IO_CAPABILITY_NAMES = { SMP_DISPLAY_ONLY_IO_CAPABILITY = IoCapability.DISPLAY_ONLY
SMP_DISPLAY_ONLY_IO_CAPABILITY: 'SMP_DISPLAY_ONLY_IO_CAPABILITY', SMP_DISPLAY_YES_NO_IO_CAPABILITY = IoCapability.DISPLAY_YES_NO
SMP_DISPLAY_YES_NO_IO_CAPABILITY: 'SMP_DISPLAY_YES_NO_IO_CAPABILITY', SMP_KEYBOARD_ONLY_IO_CAPABILITY = IoCapability.KEYBOARD_ONLY
SMP_KEYBOARD_ONLY_IO_CAPABILITY: 'SMP_KEYBOARD_ONLY_IO_CAPABILITY', SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY = IoCapability.NO_INPUT_NO_OUTPUT
SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: 'SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY', SMP_KEYBOARD_DISPLAY_IO_CAPABILITY = IoCapability.KEYBOARD_DISPLAY
SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: 'SMP_KEYBOARD_DISPLAY_IO_CAPABILITY'
}
SMP_PASSKEY_ENTRY_FAILED_ERROR = 0x01 class ErrorCode(hci.SpecableEnum):
SMP_OOB_NOT_AVAILABLE_ERROR = 0x02 PASSKEY_ENTRY_FAILED = 0x01
SMP_AUTHENTICATION_REQUIREMENTS_ERROR = 0x03 OOB_NOT_AVAILABLE = 0x02
SMP_CONFIRM_VALUE_FAILED_ERROR = 0x04 AUTHENTICATION_REQUIREMENTS = 0x03
SMP_PAIRING_NOT_SUPPORTED_ERROR = 0x05 CONFIRM_VALUE_FAILED = 0x04
SMP_ENCRYPTION_KEY_SIZE_ERROR = 0x06 PAIRING_NOT_SUPPORTED = 0x05
SMP_COMMAND_NOT_SUPPORTED_ERROR = 0x07 ENCRYPTION_KEY_SIZE = 0x06
SMP_UNSPECIFIED_REASON_ERROR = 0x08 COMMAND_NOT_SUPPORTED = 0x07
SMP_REPEATED_ATTEMPTS_ERROR = 0x09 UNSPECIFIED_REASON = 0x08
SMP_INVALID_PARAMETERS_ERROR = 0x0A REPEATED_ATTEMPTS = 0x09
SMP_DHKEY_CHECK_FAILED_ERROR = 0x0B INVALID_PARAMETERS = 0x0A
SMP_NUMERIC_COMPARISON_FAILED_ERROR = 0x0C DHKEY_CHECK_FAILED = 0x0B
SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR = 0x0D NUMERIC_COMPARISON_FAILED = 0x0C
SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR = 0x0E BD_EDR_PAIRING_IN_PROGRESS = 0x0D
CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED = 0x0E
SMP_ERROR_NAMES = { SMP_PASSKEY_ENTRY_FAILED_ERROR = ErrorCode.PASSKEY_ENTRY_FAILED
SMP_PASSKEY_ENTRY_FAILED_ERROR: 'SMP_PASSKEY_ENTRY_FAILED_ERROR', SMP_OOB_NOT_AVAILABLE_ERROR = ErrorCode.OOB_NOT_AVAILABLE
SMP_OOB_NOT_AVAILABLE_ERROR: 'SMP_OOB_NOT_AVAILABLE_ERROR', SMP_AUTHENTICATION_REQUIREMENTS_ERROR = ErrorCode.AUTHENTICATION_REQUIREMENTS
SMP_AUTHENTICATION_REQUIREMENTS_ERROR: 'SMP_AUTHENTICATION_REQUIREMENTS_ERROR', SMP_CONFIRM_VALUE_FAILED_ERROR = ErrorCode.CONFIRM_VALUE_FAILED
SMP_CONFIRM_VALUE_FAILED_ERROR: 'SMP_CONFIRM_VALUE_FAILED_ERROR', SMP_PAIRING_NOT_SUPPORTED_ERROR = ErrorCode.PAIRING_NOT_SUPPORTED
SMP_PAIRING_NOT_SUPPORTED_ERROR: 'SMP_PAIRING_NOT_SUPPORTED_ERROR', SMP_ENCRYPTION_KEY_SIZE_ERROR = ErrorCode.ENCRYPTION_KEY_SIZE
SMP_ENCRYPTION_KEY_SIZE_ERROR: 'SMP_ENCRYPTION_KEY_SIZE_ERROR', SMP_COMMAND_NOT_SUPPORTED_ERROR = ErrorCode.COMMAND_NOT_SUPPORTED
SMP_COMMAND_NOT_SUPPORTED_ERROR: 'SMP_COMMAND_NOT_SUPPORTED_ERROR', SMP_UNSPECIFIED_REASON_ERROR = ErrorCode.UNSPECIFIED_REASON
SMP_UNSPECIFIED_REASON_ERROR: 'SMP_UNSPECIFIED_REASON_ERROR', SMP_REPEATED_ATTEMPTS_ERROR = ErrorCode.REPEATED_ATTEMPTS
SMP_REPEATED_ATTEMPTS_ERROR: 'SMP_REPEATED_ATTEMPTS_ERROR', SMP_INVALID_PARAMETERS_ERROR = ErrorCode.INVALID_PARAMETERS
SMP_INVALID_PARAMETERS_ERROR: 'SMP_INVALID_PARAMETERS_ERROR', SMP_DHKEY_CHECK_FAILED_ERROR = ErrorCode.DHKEY_CHECK_FAILED
SMP_DHKEY_CHECK_FAILED_ERROR: 'SMP_DHKEY_CHECK_FAILED_ERROR', SMP_NUMERIC_COMPARISON_FAILED_ERROR = ErrorCode.NUMERIC_COMPARISON_FAILED
SMP_NUMERIC_COMPARISON_FAILED_ERROR: 'SMP_NUMERIC_COMPARISON_FAILED_ERROR', SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR = ErrorCode.BD_EDR_PAIRING_IN_PROGRESS
SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR: 'SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR', SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR = ErrorCode.CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED
SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR: 'SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR'
}
SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE = 0 class KeypressNotificationType(hci.SpecableEnum):
SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE = 1 PASSKEY_ENTRY_STARTED = 0
SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE = 2 PASSKEY_DIGIT_ENTERED = 1
SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE = 3 PASSKEY_DIGIT_ERASED = 2
SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE = 4 PASSKEY_CLEARED = 3
PASSKEY_ENTRY_COMPLETED = 4
SMP_KEYPRESS_NOTIFICATION_TYPE_NAMES = {
SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE',
SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE',
SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE',
SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE',
SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE'
}
# Bit flags for key distribution/generation # Bit flags for key distribution/generation
SMP_ENC_KEY_DISTRIBUTION_FLAG = 0b0001 class KeyDistribution(hci.SpecableFlag):
SMP_ID_KEY_DISTRIBUTION_FLAG = 0b0010 ENC_KEY = 0b0001
SMP_SIGN_KEY_DISTRIBUTION_FLAG = 0b0100 ID_KEY = 0b0010
SMP_LINK_KEY_DISTRIBUTION_FLAG = 0b1000 SIGN_KEY = 0b0100
LINK_KEY = 0b1000
# AuthReq fields # AuthReq fields
SMP_BONDING_AUTHREQ = 0b00000001 class AuthReq(hci.SpecableFlag):
SMP_MITM_AUTHREQ = 0b00000100 BONDING = 0b00000001
SMP_SC_AUTHREQ = 0b00001000 MITM = 0b00000100
SMP_KEYPRESS_AUTHREQ = 0b00010000 SC = 0b00001000
SMP_CT2_AUTHREQ = 0b00100000 KEYPRESS = 0b00010000
CT2 = 0b00100000
@classmethod
def from_booleans(
cls,
bonding: bool = False,
sc: bool = False,
mitm: bool = False,
keypress: bool = False,
ct2: bool = False,
) -> AuthReq:
auth_req = AuthReq(0)
if bonding:
auth_req |= AuthReq.BONDING
if sc:
auth_req |= AuthReq.SC
if mitm:
auth_req |= AuthReq.MITM
if keypress:
auth_req |= AuthReq.KEYPRESS
if ct2:
auth_req |= AuthReq.CT2
return auth_req
# Crypto salt # Crypto salt
SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('000000000000000000000000746D7031') SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('000000000000000000000000746D7031')
@@ -188,8 +186,6 @@ SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('000000000000000000000000746D7032')
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Utils # Utils
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def error_name(error_code: int) -> str:
return name_or_number(SMP_ERROR_NAMES, error_code)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -201,20 +197,20 @@ class SMP_Command:
See Bluetooth spec @ Vol 3, Part H - 3 SECURITY MANAGER PROTOCOL See Bluetooth spec @ Vol 3, Part H - 3 SECURITY MANAGER PROTOCOL
''' '''
smp_classes: ClassVar[dict[int, type[SMP_Command]]] = {} smp_classes: ClassVar[dict[CommandCode, type[SMP_Command]]] = {}
fields: ClassVar[Fields] fields: ClassVar[Fields]
code: int = field(default=0, init=False) code: CommandCode = field(default=CommandCode(0), init=False)
name: str = field(default='', init=False) name: str = field(default='', init=False)
_payload: bytes | None = field(default=None, init=False) _payload: bytes | None = field(default=None, init=False)
@classmethod @classmethod
def from_bytes(cls, pdu: bytes) -> SMP_Command: def from_bytes(cls, pdu: bytes) -> SMP_Command:
code = pdu[0] code = CommandCode(pdu[0])
subclass = SMP_Command.smp_classes.get(code) subclass = SMP_Command.smp_classes.get(code)
if subclass is None: if subclass is None:
instance = SMP_Command() instance = SMP_Command()
instance.name = SMP_Command.command_name(code) instance.name = code.name
instance.code = code instance.code = code
instance.payload = pdu instance.payload = pdu
return instance return instance
@@ -222,59 +218,14 @@ class SMP_Command:
instance.payload = pdu[1:] instance.payload = pdu[1:]
return instance return instance
@staticmethod
def command_name(code: int) -> str:
return name_or_number(SMP_COMMAND_NAMES, code)
@staticmethod
def auth_req_str(value: int) -> str:
bonding_flags = value & 3
mitm = (value >> 2) & 1
sc = (value >> 3) & 1
keypress = (value >> 4) & 1
ct2 = (value >> 5) & 1
return (
f'bonding_flags={bonding_flags}, '
f'MITM={mitm}, sc={sc}, keypress={keypress}, ct2={ct2}'
)
@staticmethod
def io_capability_name(io_capability: int) -> str:
return name_or_number(SMP_IO_CAPABILITY_NAMES, io_capability)
@staticmethod
def key_distribution_str(value: int) -> str:
key_types: list[str] = []
if value & SMP_ENC_KEY_DISTRIBUTION_FLAG:
key_types.append('ENC')
if value & SMP_ID_KEY_DISTRIBUTION_FLAG:
key_types.append('ID')
if value & SMP_SIGN_KEY_DISTRIBUTION_FLAG:
key_types.append('SIGN')
if value & SMP_LINK_KEY_DISTRIBUTION_FLAG:
key_types.append('LINK')
return ','.join(key_types)
@staticmethod
def keypress_notification_type_name(notification_type: int) -> str:
return name_or_number(SMP_KEYPRESS_NOTIFICATION_TYPE_NAMES, notification_type)
_Command = TypeVar("_Command", bound="SMP_Command") _Command = TypeVar("_Command", bound="SMP_Command")
@classmethod @classmethod
def subclass(cls, subclass: type[_Command]) -> type[_Command]: def subclass(cls, subclass: type[_Command]) -> type[_Command]:
subclass.name = subclass.__name__.upper()
subclass.code = key_with_value(SMP_COMMAND_NAMES, subclass.name)
if subclass.code is None:
raise KeyError(
f'Command name {subclass.name} not found in SMP_COMMAND_NAMES'
)
subclass.fields = HCI_Object.fields_from_dataclass(subclass) subclass.fields = HCI_Object.fields_from_dataclass(subclass)
subclass.name = subclass.__name__.upper()
# Register a factory for this class # Register a factory for this class
SMP_Command.smp_classes[subclass.code] = subclass SMP_Command.smp_classes[subclass.code] = subclass
return subclass return subclass
@property @property
@@ -308,19 +259,17 @@ class SMP_Pairing_Request_Command(SMP_Command):
See Bluetooth spec @ Vol 3, Part H - 3.5.1 Pairing Request See Bluetooth spec @ Vol 3, Part H - 3.5.1 Pairing Request
''' '''
io_capability: int = field( code = CommandCode.PAIRING_REQUEST
metadata=metadata({'size': 1, 'mapper': SMP_Command.io_capability_name})
) io_capability: IoCapability = field(metadata=IoCapability.type_metadata(1))
oob_data_flag: int = field(metadata=metadata(1)) oob_data_flag: int = field(metadata=metadata(1))
auth_req: int = field( auth_req: AuthReq = field(metadata=AuthReq.type_metadata(1))
metadata=metadata({'size': 1, 'mapper': SMP_Command.auth_req_str})
)
maximum_encryption_key_size: int = field(metadata=metadata(1)) maximum_encryption_key_size: int = field(metadata=metadata(1))
initiator_key_distribution: int = field( initiator_key_distribution: KeyDistribution = field(
metadata=metadata({'size': 1, 'mapper': SMP_Command.key_distribution_str}) metadata=KeyDistribution.type_metadata(1)
) )
responder_key_distribution: int = field( responder_key_distribution: KeyDistribution = field(
metadata=metadata({'size': 1, 'mapper': SMP_Command.key_distribution_str}) metadata=KeyDistribution.type_metadata(1)
) )
@@ -332,19 +281,17 @@ class SMP_Pairing_Response_Command(SMP_Command):
See Bluetooth spec @ Vol 3, Part H - 3.5.2 Pairing Response See Bluetooth spec @ Vol 3, Part H - 3.5.2 Pairing Response
''' '''
io_capability: int = field( code = CommandCode.PAIRING_RESPONSE
metadata=metadata({'size': 1, 'mapper': SMP_Command.io_capability_name})
) io_capability: IoCapability = field(metadata=IoCapability.type_metadata(1))
oob_data_flag: int = field(metadata=metadata(1)) oob_data_flag: int = field(metadata=metadata(1))
auth_req: int = field( auth_req: AuthReq = field(metadata=AuthReq.type_metadata(1))
metadata=metadata({'size': 1, 'mapper': SMP_Command.auth_req_str})
)
maximum_encryption_key_size: int = field(metadata=metadata(1)) maximum_encryption_key_size: int = field(metadata=metadata(1))
initiator_key_distribution: int = field( initiator_key_distribution: KeyDistribution = field(
metadata=metadata({'size': 1, 'mapper': SMP_Command.key_distribution_str}) metadata=KeyDistribution.type_metadata(1)
) )
responder_key_distribution: int = field( responder_key_distribution: KeyDistribution = field(
metadata=metadata({'size': 1, 'mapper': SMP_Command.key_distribution_str}) metadata=KeyDistribution.type_metadata(1)
) )
@@ -356,6 +303,8 @@ class SMP_Pairing_Confirm_Command(SMP_Command):
See Bluetooth spec @ Vol 3, Part H - 3.5.3 Pairing Confirm See Bluetooth spec @ Vol 3, Part H - 3.5.3 Pairing Confirm
''' '''
code = CommandCode.PAIRING_CONFIRM
confirm_value: bytes = field(metadata=metadata(16)) confirm_value: bytes = field(metadata=metadata(16))
@@ -367,6 +316,8 @@ class SMP_Pairing_Random_Command(SMP_Command):
See Bluetooth spec @ Vol 3, Part H - 3.5.4 Pairing Random See Bluetooth spec @ Vol 3, Part H - 3.5.4 Pairing Random
''' '''
code = CommandCode.PAIRING_RANDOM
random_value: bytes = field(metadata=metadata(16)) random_value: bytes = field(metadata=metadata(16))
@@ -378,7 +329,9 @@ class SMP_Pairing_Failed_Command(SMP_Command):
See Bluetooth spec @ Vol 3, Part H - 3.5.5 Pairing Failed See Bluetooth spec @ Vol 3, Part H - 3.5.5 Pairing Failed
''' '''
reason: int = field(metadata=metadata({'size': 1, 'mapper': error_name})) code = CommandCode.PAIRING_FAILED
reason: ErrorCode = field(metadata=ErrorCode.type_metadata(1))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -389,6 +342,8 @@ class SMP_Pairing_Public_Key_Command(SMP_Command):
See Bluetooth spec @ Vol 3, Part H - 3.5.6 Pairing Public Key See Bluetooth spec @ Vol 3, Part H - 3.5.6 Pairing Public Key
''' '''
code = CommandCode.PAIRING_PUBLIC_KEY
public_key_x: bytes = field(metadata=metadata(32)) public_key_x: bytes = field(metadata=metadata(32))
public_key_y: bytes = field(metadata=metadata(32)) public_key_y: bytes = field(metadata=metadata(32))
@@ -401,6 +356,8 @@ class SMP_Pairing_DHKey_Check_Command(SMP_Command):
See Bluetooth spec @ Vol 3, Part H - 3.5.7 Pairing DHKey Check See Bluetooth spec @ Vol 3, Part H - 3.5.7 Pairing DHKey Check
''' '''
code = CommandCode.PAIRING_DHKEY_CHECK
dhkey_check: bytes = field(metadata=metadata(16)) dhkey_check: bytes = field(metadata=metadata(16))
@@ -412,10 +369,10 @@ class SMP_Pairing_Keypress_Notification_Command(SMP_Command):
See Bluetooth spec @ Vol 3, Part H - 3.5.8 Keypress Notification See Bluetooth spec @ Vol 3, Part H - 3.5.8 Keypress Notification
''' '''
notification_type: int = field( code = CommandCode.PAIRING_KEYPRESS_NOTIFICATION
metadata=metadata(
{'size': 1, 'mapper': SMP_Command.keypress_notification_type_name} notification_type: KeypressNotificationType = field(
) metadata=KeypressNotificationType.type_metadata(1)
) )
@@ -427,6 +384,8 @@ class SMP_Encryption_Information_Command(SMP_Command):
See Bluetooth spec @ Vol 3, Part H - 3.6.2 Encryption Information See Bluetooth spec @ Vol 3, Part H - 3.6.2 Encryption Information
''' '''
code = CommandCode.ENCRYPTION_INFORMATION
long_term_key: bytes = field(metadata=metadata(16)) long_term_key: bytes = field(metadata=metadata(16))
@@ -438,6 +397,8 @@ class SMP_Master_Identification_Command(SMP_Command):
See Bluetooth spec @ Vol 3, Part H - 3.6.3 Master Identification See Bluetooth spec @ Vol 3, Part H - 3.6.3 Master Identification
''' '''
code = CommandCode.MASTER_IDENTIFICATION
ediv: int = field(metadata=metadata(2)) ediv: int = field(metadata=metadata(2))
rand: bytes = field(metadata=metadata(8)) rand: bytes = field(metadata=metadata(8))
@@ -450,6 +411,8 @@ class SMP_Identity_Information_Command(SMP_Command):
See Bluetooth spec @ Vol 3, Part H - 3.6.4 Identity Information See Bluetooth spec @ Vol 3, Part H - 3.6.4 Identity Information
''' '''
code = CommandCode.IDENTITY_INFORMATION
identity_resolving_key: bytes = field(metadata=metadata(16)) identity_resolving_key: bytes = field(metadata=metadata(16))
@@ -461,6 +424,8 @@ class SMP_Identity_Address_Information_Command(SMP_Command):
See Bluetooth spec @ Vol 3, Part H - 3.6.5 Identity Address Information See Bluetooth spec @ Vol 3, Part H - 3.6.5 Identity Address Information
''' '''
code = CommandCode.IDENTITY_ADDRESS_INFORMATION
addr_type: int = field(metadata=metadata(Address.ADDRESS_TYPE_SPEC)) addr_type: int = field(metadata=metadata(Address.ADDRESS_TYPE_SPEC))
bd_addr: Address = field(metadata=metadata(Address.parse_address_preceded_by_type)) bd_addr: Address = field(metadata=metadata(Address.parse_address_preceded_by_type))
@@ -473,6 +438,8 @@ class SMP_Signing_Information_Command(SMP_Command):
See Bluetooth spec @ Vol 3, Part H - 3.6.6 Signing Information See Bluetooth spec @ Vol 3, Part H - 3.6.6 Signing Information
''' '''
code = CommandCode.SIGNING_INFORMATION
signature_key: bytes = field(metadata=metadata(16)) signature_key: bytes = field(metadata=metadata(16))
@@ -484,25 +451,9 @@ class SMP_Security_Request_Command(SMP_Command):
See Bluetooth spec @ Vol 3, Part H - 3.6.7 Security Request See Bluetooth spec @ Vol 3, Part H - 3.6.7 Security Request
''' '''
auth_req: int = field( code = CommandCode.SECURITY_REQUEST
metadata=metadata({'size': 1, 'mapper': SMP_Command.auth_req_str})
)
auth_req: AuthReq = field(metadata=AuthReq.type_metadata(1))
# -----------------------------------------------------------------------------
def smp_auth_req(bonding: bool, mitm: bool, sc: bool, keypress: bool, ct2: bool) -> int:
value = 0
if bonding:
value |= SMP_BONDING_AUTHREQ
if mitm:
value |= SMP_MITM_AUTHREQ
if sc:
value |= SMP_SC_AUTHREQ
if keypress:
value |= SMP_KEYPRESS_AUTHREQ
if ct2:
value |= SMP_CT2_AUTHREQ
return value
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -676,8 +627,8 @@ class Session:
self.ltk_rand = bytes(8) self.ltk_rand = bytes(8)
self.link_key: bytes | None = None self.link_key: bytes | None = None
self.maximum_encryption_key_size: int = 0 self.maximum_encryption_key_size: int = 0
self.initiator_key_distribution: int = 0 self.initiator_key_distribution: KeyDistribution = KeyDistribution(0)
self.responder_key_distribution: int = 0 self.responder_key_distribution: KeyDistribution = KeyDistribution(0)
self.peer_random_value: bytes | None = None self.peer_random_value: bytes | None = None
self.peer_public_key_x: bytes = bytes(32) self.peer_public_key_x: bytes = bytes(32)
self.peer_public_key_y = bytes(32) self.peer_public_key_y = bytes(32)
@@ -728,10 +679,10 @@ class Session:
) )
# Key Distribution (default values before negotiation) # Key Distribution (default values before negotiation)
self.initiator_key_distribution = ( self.initiator_key_distribution = KeyDistribution(
pairing_config.delegate.local_initiator_key_distribution pairing_config.delegate.local_initiator_key_distribution
) )
self.responder_key_distribution = ( self.responder_key_distribution = KeyDistribution(
pairing_config.delegate.local_responder_key_distribution pairing_config.delegate.local_responder_key_distribution
) )
@@ -743,7 +694,7 @@ class Session:
self.ct2: bool = False self.ct2: bool = False
# I/O Capabilities # I/O Capabilities
self.io_capability = pairing_config.delegate.io_capability self.io_capability = IoCapability(pairing_config.delegate.io_capability)
self.peer_io_capability = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY self.peer_io_capability = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
# OOB # OOB
@@ -822,8 +773,14 @@ class Session:
return self.nx[0 if self.is_responder else 1] return self.nx[0 if self.is_responder else 1]
@property @property
def auth_req(self) -> int: def auth_req(self) -> AuthReq:
return smp_auth_req(self.bonding, self.mitm, self.sc, self.keypress, self.ct2) return AuthReq.from_booleans(
bonding=self.bonding,
sc=self.sc,
mitm=self.mitm,
keypress=self.keypress,
ct2=self.ct2,
)
def get_long_term_key(self, rand: bytes, ediv: int) -> bytes | None: def get_long_term_key(self, rand: bytes, ediv: int) -> bytes | None:
if not self.sc and not self.completed: if not self.sc and not self.completed:
@@ -843,7 +800,7 @@ class Session:
if self.connection.transport == PhysicalTransport.BR_EDR: if self.connection.transport == PhysicalTransport.BR_EDR:
self.pairing_method = PairingMethod.CTKD_OVER_CLASSIC self.pairing_method = PairingMethod.CTKD_OVER_CLASSIC
return return
if (not self.mitm) and (auth_req & SMP_MITM_AUTHREQ == 0): if (not self.mitm) and (auth_req & AuthReq.MITM == 0):
self.pairing_method = PairingMethod.JUST_WORKS self.pairing_method = PairingMethod.JUST_WORKS
return return
@@ -861,7 +818,7 @@ class Session:
self.passkey_display = details[1 if self.is_initiator else 2] self.passkey_display = details[1 if self.is_initiator else 2]
def check_expected_value( def check_expected_value(
self, expected: bytes, received: bytes, error: int self, expected: bytes, received: bytes, error: ErrorCode
) -> bool: ) -> bool:
logger.debug(f'expected={expected.hex()} got={received.hex()}') logger.debug(f'expected={expected.hex()} got={received.hex()}')
if expected != received: if expected != received:
@@ -881,7 +838,7 @@ class Session:
except Exception: except Exception:
logger.exception('exception while confirm') logger.exception('exception while confirm')
self.send_pairing_failed(SMP_CONFIRM_VALUE_FAILED_ERROR) self.send_pairing_failed(ErrorCode.CONFIRM_VALUE_FAILED)
self.connection.cancel_on_disconnection(prompt()) self.connection.cancel_on_disconnection(prompt())
@@ -900,7 +857,7 @@ class Session:
except Exception: except Exception:
logger.exception('exception while prompting') logger.exception('exception while prompting')
self.send_pairing_failed(SMP_CONFIRM_VALUE_FAILED_ERROR) self.send_pairing_failed(ErrorCode.CONFIRM_VALUE_FAILED)
self.connection.cancel_on_disconnection(prompt()) self.connection.cancel_on_disconnection(prompt())
@@ -911,13 +868,13 @@ class Session:
passkey = await self.pairing_config.delegate.get_number() passkey = await self.pairing_config.delegate.get_number()
if passkey is None: if passkey is None:
logger.debug('Passkey request rejected') logger.debug('Passkey request rejected')
self.send_pairing_failed(SMP_PASSKEY_ENTRY_FAILED_ERROR) self.send_pairing_failed(ErrorCode.PASSKEY_ENTRY_FAILED)
return return
logger.debug(f'user input: {passkey}') logger.debug(f'user input: {passkey}')
next_steps(passkey) next_steps(passkey)
except Exception: except Exception:
logger.exception('exception while prompting') logger.exception('exception while prompting')
self.send_pairing_failed(SMP_PASSKEY_ENTRY_FAILED_ERROR) self.send_pairing_failed(ErrorCode.PASSKEY_ENTRY_FAILED)
self.connection.cancel_on_disconnection(prompt()) self.connection.cancel_on_disconnection(prompt())
@@ -972,7 +929,7 @@ class Session:
def send_command(self, command: SMP_Command) -> None: def send_command(self, command: SMP_Command) -> None:
self.manager.send_command(self.connection, command) self.manager.send_command(self.connection, command)
def send_pairing_failed(self, error: int) -> None: def send_pairing_failed(self, error: ErrorCode) -> None:
self.send_command(SMP_Pairing_Failed_Command(reason=error)) self.send_command(SMP_Pairing_Failed_Command(reason=error))
self.on_pairing_failure(error) self.on_pairing_failure(error)
@@ -1144,7 +1101,7 @@ class Session:
'Try to derive LTK but host does not have the LK. Send a SMP_PAIRING_FAILED but the procedure will not be paused!' 'Try to derive LTK but host does not have the LK. Send a SMP_PAIRING_FAILED but the procedure will not be paused!'
) )
self.send_pairing_failed( self.send_pairing_failed(
SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR ErrorCode.CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED
) )
else: else:
self.ltk = self.derive_ltk(self.link_key, self.ct2) self.ltk = self.derive_ltk(self.link_key, self.ct2)
@@ -1155,14 +1112,14 @@ class Session:
# CTKD: Derive LTK from LinkKey # CTKD: Derive LTK from LinkKey
if ( if (
self.connection.transport == PhysicalTransport.BR_EDR self.connection.transport == PhysicalTransport.BR_EDR
and self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG and self.initiator_key_distribution & KeyDistribution.ENC_KEY
): ):
self.ctkd_task = self.connection.cancel_on_disconnection( self.ctkd_task = self.connection.cancel_on_disconnection(
self.get_link_key_and_derive_ltk() self.get_link_key_and_derive_ltk()
) )
elif not self.sc: elif not self.sc:
# Distribute the LTK, EDIV and RAND # Distribute the LTK, EDIV and RAND
if self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG: if self.initiator_key_distribution & KeyDistribution.ENC_KEY:
self.send_command( self.send_command(
SMP_Encryption_Information_Command(long_term_key=self.ltk) SMP_Encryption_Information_Command(long_term_key=self.ltk)
) )
@@ -1173,7 +1130,7 @@ class Session:
) )
# Distribute IRK & BD ADDR # Distribute IRK & BD ADDR
if self.initiator_key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG: if self.initiator_key_distribution & KeyDistribution.ID_KEY:
self.send_command( self.send_command(
SMP_Identity_Information_Command( SMP_Identity_Information_Command(
identity_resolving_key=self.manager.device.irk identity_resolving_key=self.manager.device.irk
@@ -1183,25 +1140,25 @@ class Session:
# Distribute CSRK # Distribute CSRK
csrk = bytes(16) # FIXME: testing csrk = bytes(16) # FIXME: testing
if self.initiator_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG: if self.initiator_key_distribution & KeyDistribution.SIGN_KEY:
self.send_command(SMP_Signing_Information_Command(signature_key=csrk)) self.send_command(SMP_Signing_Information_Command(signature_key=csrk))
# CTKD, calculate BR/EDR link key # CTKD, calculate BR/EDR link key
if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG: if self.initiator_key_distribution & KeyDistribution.LINK_KEY:
self.link_key = self.derive_link_key(self.ltk, self.ct2) self.link_key = self.derive_link_key(self.ltk, self.ct2)
else: else:
# CTKD: Derive LTK from LinkKey # CTKD: Derive LTK from LinkKey
if ( if (
self.connection.transport == PhysicalTransport.BR_EDR self.connection.transport == PhysicalTransport.BR_EDR
and self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG and self.responder_key_distribution & KeyDistribution.ENC_KEY
): ):
self.ctkd_task = self.connection.cancel_on_disconnection( self.ctkd_task = self.connection.cancel_on_disconnection(
self.get_link_key_and_derive_ltk() self.get_link_key_and_derive_ltk()
) )
# Distribute the LTK, EDIV and RAND # Distribute the LTK, EDIV and RAND
elif not self.sc: elif not self.sc:
if self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG: if self.responder_key_distribution & KeyDistribution.ENC_KEY:
self.send_command( self.send_command(
SMP_Encryption_Information_Command(long_term_key=self.ltk) SMP_Encryption_Information_Command(long_term_key=self.ltk)
) )
@@ -1212,7 +1169,7 @@ class Session:
) )
# Distribute IRK & BD ADDR # Distribute IRK & BD ADDR
if self.responder_key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG: if self.responder_key_distribution & KeyDistribution.ID_KEY:
self.send_command( self.send_command(
SMP_Identity_Information_Command( SMP_Identity_Information_Command(
identity_resolving_key=self.manager.device.irk identity_resolving_key=self.manager.device.irk
@@ -1222,30 +1179,30 @@ class Session:
# Distribute CSRK # Distribute CSRK
csrk = bytes(16) # FIXME: testing csrk = bytes(16) # FIXME: testing
if self.responder_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG: if self.responder_key_distribution & KeyDistribution.SIGN_KEY:
self.send_command(SMP_Signing_Information_Command(signature_key=csrk)) self.send_command(SMP_Signing_Information_Command(signature_key=csrk))
# CTKD, calculate BR/EDR link key # CTKD, calculate BR/EDR link key
if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG: if self.responder_key_distribution & KeyDistribution.LINK_KEY:
self.link_key = self.derive_link_key(self.ltk, self.ct2) self.link_key = self.derive_link_key(self.ltk, self.ct2)
def compute_peer_expected_distributions(self, key_distribution_flags: int) -> None: def compute_peer_expected_distributions(self, key_distribution_flags: int) -> None:
# Set our expectations for what to wait for in the key distribution phase # Set our expectations for what to wait for in the key distribution phase
self.peer_expected_distributions = [] self.peer_expected_distributions = []
if not self.sc and self.connection.transport == PhysicalTransport.LE: if not self.sc and self.connection.transport == PhysicalTransport.LE:
if key_distribution_flags & SMP_ENC_KEY_DISTRIBUTION_FLAG != 0: if key_distribution_flags & KeyDistribution.ENC_KEY != 0:
self.peer_expected_distributions.append( self.peer_expected_distributions.append(
SMP_Encryption_Information_Command SMP_Encryption_Information_Command
) )
self.peer_expected_distributions.append( self.peer_expected_distributions.append(
SMP_Master_Identification_Command SMP_Master_Identification_Command
) )
if key_distribution_flags & SMP_ID_KEY_DISTRIBUTION_FLAG != 0: if key_distribution_flags & KeyDistribution.ID_KEY != 0:
self.peer_expected_distributions.append(SMP_Identity_Information_Command) self.peer_expected_distributions.append(SMP_Identity_Information_Command)
self.peer_expected_distributions.append( self.peer_expected_distributions.append(
SMP_Identity_Address_Information_Command SMP_Identity_Address_Information_Command
) )
if key_distribution_flags & SMP_SIGN_KEY_DISTRIBUTION_FLAG != 0: if key_distribution_flags & KeyDistribution.SIGN_KEY != 0:
self.peer_expected_distributions.append(SMP_Signing_Information_Command) self.peer_expected_distributions.append(SMP_Signing_Information_Command)
logger.debug( logger.debug(
'expecting distributions: ' 'expecting distributions: '
@@ -1258,7 +1215,7 @@ class Session:
logger.warning( logger.warning(
color('received key distribution on a non-encrypted connection', 'red') color('received key distribution on a non-encrypted connection', 'red')
) )
self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR) self.send_pairing_failed(ErrorCode.UNSPECIFIED_REASON)
return return
# Check that this command class is expected # Check that this command class is expected
@@ -1278,7 +1235,7 @@ class Session:
'red', 'red',
) )
) )
self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR) self.send_pairing_failed(ErrorCode.UNSPECIFIED_REASON)
async def pair(self) -> None: async def pair(self) -> None:
# Start pairing as an initiator # Start pairing as an initiator
@@ -1389,34 +1346,56 @@ class Session:
) )
await self.manager.on_pairing(self, peer_address, keys) await self.manager.on_pairing(self, peer_address, keys)
def on_pairing_failure(self, reason: int) -> None: def on_pairing_failure(self, reason: ErrorCode) -> None:
logger.warning(f'pairing failure ({error_name(reason)})') logger.warning('pairing failure (%s)', reason.name)
if self.completed: if self.completed:
return return
self.completed = True self.completed = True
error = ProtocolError(reason, 'smp', error_name(reason)) error = ProtocolError(reason, 'smp', reason.name)
if self.pairing_result is not None and not self.pairing_result.done(): if self.pairing_result is not None and not self.pairing_result.done():
self.pairing_result.set_exception(error) self.pairing_result.set_exception(error)
self.manager.on_pairing_failure(self, reason) self.manager.on_pairing_failure(self, reason)
def on_smp_command(self, command: SMP_Command) -> None: def on_smp_command(self, command: SMP_Command) -> None:
# Find the handler method try:
handler_name = f'on_{command.name.lower()}' match command:
handler = getattr(self, handler_name, None) case SMP_Pairing_Request_Command():
if handler is not None: self.on_smp_pairing_request_command(command)
try: case SMP_Pairing_Response_Command():
handler(command) self.on_smp_pairing_response_command(command)
except Exception: case SMP_Pairing_Confirm_Command():
logger.exception(color("!!! Exception in handler:", "red")) self.on_smp_pairing_confirm_command(command)
response = SMP_Pairing_Failed_Command( case SMP_Pairing_Random_Command():
reason=SMP_UNSPECIFIED_REASON_ERROR self.on_smp_pairing_random_command(command)
) case SMP_Pairing_Failed_Command():
self.send_command(response) self.on_smp_pairing_failed_command(command)
else: case SMP_Encryption_Information_Command():
logger.error(color('SMP command not handled???', 'red')) self.on_smp_encryption_information_command(command)
case SMP_Master_Identification_Command():
self.on_smp_master_identification_command(command)
case SMP_Identity_Information_Command():
self.on_smp_identity_information_command(command)
case SMP_Identity_Address_Information_Command():
self.on_smp_identity_address_information_command(command)
case SMP_Signing_Information_Command():
self.on_smp_signing_information_command(command)
case SMP_Pairing_Public_Key_Command():
self.on_smp_pairing_public_key_command(command)
case SMP_Pairing_DHKey_Check_Command():
self.on_smp_pairing_dhkey_check_command(command)
# case SMP_Security_Request_Command():
# self.on_smp_security_request_command(command)
# case SMP_Pairing_Keypress_Notification_Command():
# self.on_smp_pairing_keypress_notification_command(command)
case _:
logger.error(color('SMP command not handled', 'red'))
except Exception:
logger.exception(color("!!! Exception in handler:", "red"))
response = SMP_Pairing_Failed_Command(reason=ErrorCode.UNSPECIFIED_REASON)
self.send_command(response)
def on_smp_pairing_request_command( def on_smp_pairing_request_command(
self, command: SMP_Pairing_Request_Command self, command: SMP_Pairing_Request_Command
@@ -1436,16 +1415,16 @@ class Session:
accepted = False accepted = False
if not accepted: if not accepted:
logger.debug('pairing rejected by delegate') logger.debug('pairing rejected by delegate')
self.send_pairing_failed(SMP_PAIRING_NOT_SUPPORTED_ERROR) self.send_pairing_failed(ErrorCode.PAIRING_NOT_SUPPORTED)
return return
# Save the request # Save the request
self.preq = bytes(command) self.preq = bytes(command)
# Bonding and SC require both sides to request/support it # Bonding and SC require both sides to request/support it
self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0) self.bonding = self.bonding and (command.auth_req & AuthReq.BONDING != 0)
self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0) self.sc = self.sc and (command.auth_req & AuthReq.SC != 0)
self.ct2 = self.ct2 and (command.auth_req & SMP_CT2_AUTHREQ != 0) self.ct2 = self.ct2 and (command.auth_req & AuthReq.CT2 != 0)
# Infer the pairing method # Infer the pairing method
if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or ( if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or (
@@ -1456,7 +1435,7 @@ class Session:
if not self.sc and self.tk is None: if not self.sc and self.tk is None:
# For legacy OOB, TK is required. # For legacy OOB, TK is required.
logger.warning("legacy OOB without TK") logger.warning("legacy OOB without TK")
self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR) self.send_pairing_failed(ErrorCode.OOB_NOT_AVAILABLE)
return return
if command.oob_data_flag == 0: if command.oob_data_flag == 0:
# The peer doesn't have OOB data, use r=0 # The peer doesn't have OOB data, use r=0
@@ -1475,8 +1454,11 @@ class Session:
( (
self.initiator_key_distribution, self.initiator_key_distribution,
self.responder_key_distribution, self.responder_key_distribution,
) = await self.pairing_config.delegate.key_distribution_response( ) = map(
command.initiator_key_distribution, command.responder_key_distribution KeyDistribution,
await self.pairing_config.delegate.key_distribution_response(
command.initiator_key_distribution, command.responder_key_distribution
),
) )
self.compute_peer_expected_distributions(self.initiator_key_distribution) self.compute_peer_expected_distributions(self.initiator_key_distribution)
@@ -1514,8 +1496,8 @@ class Session:
self.peer_io_capability = command.io_capability self.peer_io_capability = command.io_capability
# Bonding and SC require both sides to request/support it # Bonding and SC require both sides to request/support it
self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0) self.bonding = self.bonding and (command.auth_req & AuthReq.BONDING != 0)
self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0) self.sc = self.sc and (command.auth_req & AuthReq.SC != 0)
# Infer the pairing method # Infer the pairing method
if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or ( if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or (
@@ -1526,7 +1508,7 @@ class Session:
if not self.sc and self.tk is None: if not self.sc and self.tk is None:
# For legacy OOB, TK is required. # For legacy OOB, TK is required.
logger.warning("legacy OOB without TK") logger.warning("legacy OOB without TK")
self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR) self.send_pairing_failed(ErrorCode.OOB_NOT_AVAILABLE)
return return
if command.oob_data_flag == 0: if command.oob_data_flag == 0:
# The peer doesn't have OOB data, use r=0 # The peer doesn't have OOB data, use r=0
@@ -1546,7 +1528,7 @@ class Session:
command.responder_key_distribution & ~self.responder_key_distribution != 0 command.responder_key_distribution & ~self.responder_key_distribution != 0
): ):
# The response isn't a subset of the request # The response isn't a subset of the request
self.send_pairing_failed(SMP_INVALID_PARAMETERS_ERROR) self.send_pairing_failed(ErrorCode.INVALID_PARAMETERS)
return return
self.initiator_key_distribution = command.initiator_key_distribution self.initiator_key_distribution = command.initiator_key_distribution
self.responder_key_distribution = command.responder_key_distribution self.responder_key_distribution = command.responder_key_distribution
@@ -1624,7 +1606,7 @@ class Session:
) )
assert self.confirm_value assert self.confirm_value
if not self.check_expected_value( if not self.check_expected_value(
self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR self.confirm_value, confirm_verifier, ErrorCode.CONFIRM_VALUE_FAILED
): ):
return return
@@ -1665,7 +1647,7 @@ class Session:
self.pkb, self.pka, command.random_value, bytes([0]) self.pkb, self.pka, command.random_value, bytes([0])
) )
if not self.check_expected_value( if not self.check_expected_value(
self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR self.confirm_value, confirm_verifier, ErrorCode.CONFIRM_VALUE_FAILED
): ):
return return
elif self.pairing_method == PairingMethod.PASSKEY: elif self.pairing_method == PairingMethod.PASSKEY:
@@ -1678,7 +1660,7 @@ class Session:
bytes([0x80 + ((self.passkey >> self.passkey_step) & 1)]), bytes([0x80 + ((self.passkey >> self.passkey_step) & 1)]),
) )
if not self.check_expected_value( if not self.check_expected_value(
self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR self.confirm_value, confirm_verifier, ErrorCode.CONFIRM_VALUE_FAILED
): ):
return return
@@ -1707,7 +1689,7 @@ class Session:
bytes([0x80 + ((self.passkey >> self.passkey_step) & 1)]), bytes([0x80 + ((self.passkey >> self.passkey_step) & 1)]),
) )
if not self.check_expected_value( if not self.check_expected_value(
self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR self.confirm_value, confirm_verifier, ErrorCode.CONFIRM_VALUE_FAILED
): ):
return return
@@ -1824,7 +1806,7 @@ class Session:
if not self.check_expected_value( if not self.check_expected_value(
self.peer_oob_data.c, self.peer_oob_data.c,
confirm_verifier, confirm_verifier,
SMP_CONFIRM_VALUE_FAILED_ERROR, ErrorCode.CONFIRM_VALUE_FAILED,
): ):
return return
@@ -1858,7 +1840,7 @@ class Session:
expected = self.eb if self.is_initiator else self.ea expected = self.eb if self.is_initiator else self.ea
assert expected assert expected
if not self.check_expected_value( if not self.check_expected_value(
expected, command.dhkey_check, SMP_DHKEY_CHECK_FAILED_ERROR expected, command.dhkey_check, ErrorCode.DHKEY_CHECK_FAILED
): ):
return return
@@ -1962,7 +1944,7 @@ class Manager(utils.EventEmitter):
) )
# Security request is more than just pairing, so let applications handle them # Security request is more than just pairing, so let applications handle them
if command.code == SMP_SECURITY_REQUEST_COMMAND: if command.code == CommandCode.SECURITY_REQUEST:
self.on_smp_security_request_command( self.on_smp_security_request_command(
connection, cast(SMP_Security_Request_Command, command) connection, cast(SMP_Security_Request_Command, command)
) )
@@ -2002,15 +1984,13 @@ class Manager(utils.EventEmitter):
def request_pairing(self, connection: Connection) -> None: def request_pairing(self, connection: Connection) -> None:
pairing_config = self.pairing_config_factory(connection) pairing_config = self.pairing_config_factory(connection)
if pairing_config: if pairing_config:
auth_req = smp_auth_req( auth_req = AuthReq.from_booleans(
pairing_config.bonding, bonding=pairing_config.bonding,
pairing_config.mitm, sc=pairing_config.sc,
pairing_config.sc, mitm=pairing_config.mitm,
False,
False,
) )
else: else:
auth_req = 0 auth_req = AuthReq(0)
self.send_command(connection, SMP_Security_Request_Command(auth_req=auth_req)) self.send_command(connection, SMP_Security_Request_Command(auth_req=auth_req))
def on_session_start(self, session: Session) -> None: def on_session_start(self, session: Session) -> None:
@@ -2026,7 +2006,7 @@ class Manager(utils.EventEmitter):
# Notify the device # Notify the device
self.device.on_pairing(session.connection, identity_address, keys, session.sc) self.device.on_pairing(session.connection, identity_address, keys, session.sc)
def on_pairing_failure(self, session: Session, reason: int) -> None: def on_pairing_failure(self, session: Session, reason: ErrorCode) -> None:
self.device.on_pairing_failure(session.connection, reason) self.device.on_pairing_failure(session.connection, reason)
def on_session_end(self, session: Session) -> None: def on_session_end(self, session: Session) -> None:

View File

@@ -29,8 +29,7 @@ from bumble.gatt import Characteristic, Service
from bumble.hci import Role from bumble.hci import Role
from bumble.pairing import PairingConfig, PairingDelegate from bumble.pairing import PairingConfig, PairingDelegate
from bumble.smp import ( from bumble.smp import (
SMP_CONFIRM_VALUE_FAILED_ERROR, ErrorCode,
SMP_PAIRING_NOT_SUPPORTED_ERROR,
OobContext, OobContext,
OobLegacyContext, OobLegacyContext,
) )
@@ -378,7 +377,7 @@ async def test_self_smp_reject():
await _test_self_smp_with_configs(None, rejecting_pairing_config) await _test_self_smp_with_configs(None, rejecting_pairing_config)
paired = True paired = True
except ProtocolError as error: except ProtocolError as error:
assert error.error_code == SMP_PAIRING_NOT_SUPPORTED_ERROR assert error.error_code == ErrorCode.PAIRING_NOT_SUPPORTED
assert not paired assert not paired
@@ -403,7 +402,7 @@ async def test_self_smp_wrong_pin():
) )
paired = True paired = True
except ProtocolError as error: except ProtocolError as error:
assert error.error_code == SMP_CONFIRM_VALUE_FAILED_ERROR assert error.error_code == ErrorCode.CONFIRM_VALUE_FAILED
assert not paired assert not paired
@@ -534,11 +533,11 @@ async def test_self_smp_oob_sc():
with pytest.raises(ProtocolError) as error: with pytest.raises(ProtocolError) as error:
await _test_self_smp_with_configs(pairing_config_1, pairing_config_4) await _test_self_smp_with_configs(pairing_config_1, pairing_config_4)
assert error.value.error_code == SMP_CONFIRM_VALUE_FAILED_ERROR assert error.value.error_code == ErrorCode.CONFIRM_VALUE_FAILED
with pytest.raises(ProtocolError): with pytest.raises(ProtocolError):
await _test_self_smp_with_configs(pairing_config_4, pairing_config_1) await _test_self_smp_with_configs(pairing_config_4, pairing_config_1)
assert error.value.error_code == SMP_CONFIRM_VALUE_FAILED_ERROR assert error.value.error_code == ErrorCode.CONFIRM_VALUE_FAILED
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------