forked from auracaster/bumble_mirror
62
apps/pair.py
62
apps/pair.py
@@ -24,10 +24,16 @@ from prompt_toolkit.shortcuts import PromptSession
|
|||||||
from bumble.colors import color
|
from bumble.colors import color
|
||||||
from bumble.device import Device, Peer
|
from bumble.device import Device, Peer
|
||||||
from bumble.transport import open_transport_or_link
|
from bumble.transport import open_transport_or_link
|
||||||
from bumble.pairing import PairingDelegate, PairingConfig
|
from bumble.pairing import OobData, PairingDelegate, PairingConfig
|
||||||
|
from bumble.smp import OobContext, OobLegacyContext
|
||||||
from bumble.smp import error_name as smp_error_name
|
from bumble.smp import error_name as smp_error_name
|
||||||
from bumble.keys import JsonKeyStore
|
from bumble.keys import JsonKeyStore
|
||||||
from bumble.core import ProtocolError
|
from bumble.core import (
|
||||||
|
AdvertisingData,
|
||||||
|
ProtocolError,
|
||||||
|
BT_LE_TRANSPORT,
|
||||||
|
BT_BR_EDR_TRANSPORT,
|
||||||
|
)
|
||||||
from bumble.gatt import (
|
from bumble.gatt import (
|
||||||
GATT_DEVICE_NAME_CHARACTERISTIC,
|
GATT_DEVICE_NAME_CHARACTERISTIC,
|
||||||
GATT_GENERIC_ACCESS_SERVICE,
|
GATT_GENERIC_ACCESS_SERVICE,
|
||||||
@@ -60,7 +66,7 @@ class Waiter:
|
|||||||
class Delegate(PairingDelegate):
|
class Delegate(PairingDelegate):
|
||||||
def __init__(self, mode, connection, capability_string, do_prompt):
|
def __init__(self, mode, connection, capability_string, do_prompt):
|
||||||
super().__init__(
|
super().__init__(
|
||||||
{
|
io_capability={
|
||||||
'keyboard': PairingDelegate.KEYBOARD_INPUT_ONLY,
|
'keyboard': PairingDelegate.KEYBOARD_INPUT_ONLY,
|
||||||
'display': PairingDelegate.DISPLAY_OUTPUT_ONLY,
|
'display': PairingDelegate.DISPLAY_OUTPUT_ONLY,
|
||||||
'display+keyboard': PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
|
'display+keyboard': PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
|
||||||
@@ -286,6 +292,7 @@ async def pair(
|
|||||||
bond,
|
bond,
|
||||||
ctkd,
|
ctkd,
|
||||||
io,
|
io,
|
||||||
|
oob,
|
||||||
prompt,
|
prompt,
|
||||||
request,
|
request,
|
||||||
print_keys,
|
print_keys,
|
||||||
@@ -343,16 +350,51 @@ async def pair(
|
|||||||
await device.keystore.print(prefix=color('@@@ ', 'blue'))
|
await device.keystore.print(prefix=color('@@@ ', 'blue'))
|
||||||
print(color('@@@-----------------------------------', 'blue'))
|
print(color('@@@-----------------------------------', 'blue'))
|
||||||
|
|
||||||
|
# Create an OOB context if needed
|
||||||
|
if oob:
|
||||||
|
our_oob_context = OobContext()
|
||||||
|
shared_data = (
|
||||||
|
None
|
||||||
|
if oob == '-'
|
||||||
|
else OobData.from_ad(AdvertisingData.from_bytes(bytes.fromhex(oob)))
|
||||||
|
)
|
||||||
|
legacy_context = OobLegacyContext()
|
||||||
|
oob_contexts = PairingConfig.OobConfig(
|
||||||
|
our_context=our_oob_context,
|
||||||
|
peer_data=shared_data,
|
||||||
|
legacy_context=legacy_context,
|
||||||
|
)
|
||||||
|
oob_data = OobData(
|
||||||
|
address=device.random_address,
|
||||||
|
shared_data=shared_data,
|
||||||
|
legacy_context=legacy_context,
|
||||||
|
)
|
||||||
|
print(color('@@@-----------------------------------', 'yellow'))
|
||||||
|
print(color('@@@ OOB Data:', 'yellow'))
|
||||||
|
print(color(f'@@@ {our_oob_context.share()}', 'yellow'))
|
||||||
|
print(color(f'@@@ TK={legacy_context.tk.hex()}', 'yellow'))
|
||||||
|
print(color(f'@@@ HEX: ({bytes(oob_data.to_ad()).hex()})', 'yellow'))
|
||||||
|
print(color('@@@-----------------------------------', 'yellow'))
|
||||||
|
else:
|
||||||
|
oob_contexts = None
|
||||||
|
|
||||||
# Set up a pairing config factory
|
# Set up a pairing config factory
|
||||||
device.pairing_config_factory = lambda connection: PairingConfig(
|
device.pairing_config_factory = lambda connection: PairingConfig(
|
||||||
sc, mitm, bond, Delegate(mode, connection, io, prompt)
|
sc=sc,
|
||||||
|
mitm=mitm,
|
||||||
|
bonding=bond,
|
||||||
|
oob=oob_contexts,
|
||||||
|
delegate=Delegate(mode, connection, io, prompt),
|
||||||
)
|
)
|
||||||
|
|
||||||
# Connect to a peer or wait for a connection
|
# Connect to a peer or wait for a connection
|
||||||
device.on('connection', lambda connection: on_connection(connection, request))
|
device.on('connection', lambda connection: on_connection(connection, request))
|
||||||
if address_or_name is not None:
|
if address_or_name is not None:
|
||||||
print(color(f'=== Connecting to {address_or_name}...', 'green'))
|
print(color(f'=== Connecting to {address_or_name}...', 'green'))
|
||||||
connection = await device.connect(address_or_name)
|
connection = await device.connect(
|
||||||
|
address_or_name,
|
||||||
|
transport=BT_LE_TRANSPORT if mode == 'le' else BT_BR_EDR_TRANSPORT,
|
||||||
|
)
|
||||||
|
|
||||||
if not request:
|
if not request:
|
||||||
try:
|
try:
|
||||||
@@ -421,6 +463,14 @@ class LogHandler(logging.Handler):
|
|||||||
default='display+keyboard',
|
default='display+keyboard',
|
||||||
show_default=True,
|
show_default=True,
|
||||||
)
|
)
|
||||||
|
@click.option(
|
||||||
|
'--oob',
|
||||||
|
metavar='<oob-data-hex>',
|
||||||
|
help=(
|
||||||
|
'Use OOB pairing with this data from the peer '
|
||||||
|
'(use "-" to enable OOB without peer data)'
|
||||||
|
),
|
||||||
|
)
|
||||||
@click.option('--prompt', is_flag=True, help='Prompt to accept/reject pairing request')
|
@click.option('--prompt', is_flag=True, help='Prompt to accept/reject pairing request')
|
||||||
@click.option(
|
@click.option(
|
||||||
'--request', is_flag=True, help='Request that the connecting peer initiate pairing'
|
'--request', is_flag=True, help='Request that the connecting peer initiate pairing'
|
||||||
@@ -441,6 +491,7 @@ def main(
|
|||||||
bond,
|
bond,
|
||||||
ctkd,
|
ctkd,
|
||||||
io,
|
io,
|
||||||
|
oob,
|
||||||
prompt,
|
prompt,
|
||||||
request,
|
request,
|
||||||
print_keys,
|
print_keys,
|
||||||
@@ -464,6 +515,7 @@ def main(
|
|||||||
bond,
|
bond,
|
||||||
ctkd,
|
ctkd,
|
||||||
io,
|
io,
|
||||||
|
oob,
|
||||||
prompt,
|
prompt,
|
||||||
request,
|
request,
|
||||||
print_keys,
|
print_keys,
|
||||||
|
|||||||
@@ -16,6 +16,7 @@
|
|||||||
# Imports
|
# Imports
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
import enum
|
||||||
import struct
|
import struct
|
||||||
from typing import List, Optional, Tuple, Union, cast, Dict
|
from typing import List, Optional, Tuple, Union, cast, Dict
|
||||||
|
|
||||||
@@ -1051,3 +1052,13 @@ class ConnectionPHY:
|
|||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return f'ConnectionPHY(tx_phy={self.tx_phy}, rx_phy={self.rx_phy})'
|
return f'ConnectionPHY(tx_phy={self.tx_phy}, rx_phy={self.rx_phy})'
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
# LE Role
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
class LeRole(enum.IntEnum):
|
||||||
|
PERIPHERAL_ONLY = 0x00
|
||||||
|
CENTRAL_ONLY = 0x01
|
||||||
|
BOTH_PERIPHERAL_PREFERRED = 0x02
|
||||||
|
BOTH_CENTRAL_PREFERRED = 0x03
|
||||||
|
|||||||
@@ -15,7 +15,9 @@
|
|||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
# Imports
|
# Imports
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
|
from __future__ import annotations
|
||||||
import enum
|
import enum
|
||||||
|
from dataclasses import dataclass
|
||||||
from typing import Optional, Tuple
|
from typing import Optional, Tuple
|
||||||
|
|
||||||
from .hci import (
|
from .hci import (
|
||||||
@@ -35,7 +37,60 @@ from .smp import (
|
|||||||
SMP_ID_KEY_DISTRIBUTION_FLAG,
|
SMP_ID_KEY_DISTRIBUTION_FLAG,
|
||||||
SMP_SIGN_KEY_DISTRIBUTION_FLAG,
|
SMP_SIGN_KEY_DISTRIBUTION_FLAG,
|
||||||
SMP_LINK_KEY_DISTRIBUTION_FLAG,
|
SMP_LINK_KEY_DISTRIBUTION_FLAG,
|
||||||
|
OobContext,
|
||||||
|
OobLegacyContext,
|
||||||
|
OobSharedData,
|
||||||
)
|
)
|
||||||
|
from .core import AdvertisingData, LeRole
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
@dataclass
|
||||||
|
class OobData:
|
||||||
|
"""OOB data that can be sent from one device to another."""
|
||||||
|
|
||||||
|
address: Optional[Address] = None
|
||||||
|
role: Optional[LeRole] = None
|
||||||
|
shared_data: Optional[OobSharedData] = None
|
||||||
|
legacy_context: Optional[OobLegacyContext] = None
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_ad(cls, ad: AdvertisingData) -> OobData:
|
||||||
|
instance = cls()
|
||||||
|
shared_data_c: Optional[bytes] = None
|
||||||
|
shared_data_r: Optional[bytes] = None
|
||||||
|
for ad_type, ad_data in ad.ad_structures:
|
||||||
|
if ad_type == AdvertisingData.LE_BLUETOOTH_DEVICE_ADDRESS:
|
||||||
|
instance.address = Address(ad_data)
|
||||||
|
elif ad_type == AdvertisingData.LE_ROLE:
|
||||||
|
instance.role = LeRole(ad_data[0])
|
||||||
|
elif ad_type == AdvertisingData.LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE:
|
||||||
|
shared_data_c = ad_data
|
||||||
|
elif ad_type == AdvertisingData.LE_SECURE_CONNECTIONS_RANDOM_VALUE:
|
||||||
|
shared_data_r = ad_data
|
||||||
|
elif ad_type == AdvertisingData.SECURITY_MANAGER_TK_VALUE:
|
||||||
|
instance.legacy_context = OobLegacyContext(tk=ad_data)
|
||||||
|
if shared_data_c and shared_data_r:
|
||||||
|
instance.shared_data = OobSharedData(c=shared_data_c, r=shared_data_r)
|
||||||
|
|
||||||
|
return instance
|
||||||
|
|
||||||
|
def to_ad(self) -> AdvertisingData:
|
||||||
|
ad_structures = []
|
||||||
|
if self.address is not None:
|
||||||
|
ad_structures.append(
|
||||||
|
(AdvertisingData.LE_BLUETOOTH_DEVICE_ADDRESS, bytes(self.address))
|
||||||
|
)
|
||||||
|
if self.role is not None:
|
||||||
|
ad_structures.append((AdvertisingData.LE_ROLE, bytes([self.role])))
|
||||||
|
if self.shared_data is not None:
|
||||||
|
ad_structures.extend(self.shared_data.to_ad().ad_structures)
|
||||||
|
if self.legacy_context is not None:
|
||||||
|
ad_structures.append(
|
||||||
|
(AdvertisingData.SECURITY_MANAGER_TK_VALUE, self.legacy_context.tk)
|
||||||
|
)
|
||||||
|
|
||||||
|
return AdvertisingData(ad_structures)
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@@ -173,6 +228,14 @@ class PairingConfig:
|
|||||||
PUBLIC = Address.PUBLIC_DEVICE_ADDRESS
|
PUBLIC = Address.PUBLIC_DEVICE_ADDRESS
|
||||||
RANDOM = Address.RANDOM_DEVICE_ADDRESS
|
RANDOM = Address.RANDOM_DEVICE_ADDRESS
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class OobConfig:
|
||||||
|
"""Config for OOB pairing."""
|
||||||
|
|
||||||
|
our_context: Optional[OobContext]
|
||||||
|
peer_data: Optional[OobSharedData]
|
||||||
|
legacy_context: Optional[OobLegacyContext]
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
sc: bool = True,
|
sc: bool = True,
|
||||||
@@ -180,17 +243,20 @@ class PairingConfig:
|
|||||||
bonding: bool = True,
|
bonding: bool = True,
|
||||||
delegate: Optional[PairingDelegate] = None,
|
delegate: Optional[PairingDelegate] = None,
|
||||||
identity_address_type: Optional[AddressType] = None,
|
identity_address_type: Optional[AddressType] = None,
|
||||||
|
oob: Optional[OobConfig] = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
self.sc = sc
|
self.sc = sc
|
||||||
self.mitm = mitm
|
self.mitm = mitm
|
||||||
self.bonding = bonding
|
self.bonding = bonding
|
||||||
self.delegate = delegate or PairingDelegate()
|
self.delegate = delegate or PairingDelegate()
|
||||||
self.identity_address_type = identity_address_type
|
self.identity_address_type = identity_address_type
|
||||||
|
self.oob = oob
|
||||||
|
|
||||||
def __str__(self) -> str:
|
def __str__(self) -> str:
|
||||||
return (
|
return (
|
||||||
f'PairingConfig(sc={self.sc}, '
|
f'PairingConfig(sc={self.sc}, '
|
||||||
f'mitm={self.mitm}, bonding={self.bonding}, '
|
f'mitm={self.mitm}, bonding={self.bonding}, '
|
||||||
f'identity_address_type={self.identity_address_type}, '
|
f'identity_address_type={self.identity_address_type}, '
|
||||||
f'delegate[{self.delegate.io_capability}])'
|
f'delegate[{self.delegate.io_capability}]), '
|
||||||
|
f'oob[{self.oob}])'
|
||||||
)
|
)
|
||||||
|
|||||||
189
bumble/smp.py
189
bumble/smp.py
@@ -27,6 +27,7 @@ import logging
|
|||||||
import asyncio
|
import asyncio
|
||||||
import enum
|
import enum
|
||||||
import secrets
|
import secrets
|
||||||
|
from dataclasses import dataclass
|
||||||
from typing import (
|
from typing import (
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
Any,
|
Any,
|
||||||
@@ -53,6 +54,7 @@ from .core import (
|
|||||||
BT_BR_EDR_TRANSPORT,
|
BT_BR_EDR_TRANSPORT,
|
||||||
BT_CENTRAL_ROLE,
|
BT_CENTRAL_ROLE,
|
||||||
BT_LE_TRANSPORT,
|
BT_LE_TRANSPORT,
|
||||||
|
AdvertisingData,
|
||||||
ProtocolError,
|
ProtocolError,
|
||||||
name_or_number,
|
name_or_number,
|
||||||
)
|
)
|
||||||
@@ -563,6 +565,54 @@ class PairingMethod(enum.IntEnum):
|
|||||||
CTKD_OVER_CLASSIC = 4
|
CTKD_OVER_CLASSIC = 4
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
class OobContext:
|
||||||
|
"""Cryptographic context for LE SC OOB pairing."""
|
||||||
|
|
||||||
|
ecc_key: crypto.EccKey
|
||||||
|
r: bytes
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self, ecc_key: Optional[crypto.EccKey] = None, r: Optional[bytes] = None
|
||||||
|
) -> None:
|
||||||
|
self.ecc_key = crypto.EccKey.generate() if ecc_key is None else ecc_key
|
||||||
|
self.r = crypto.r() if r is None else r
|
||||||
|
|
||||||
|
def share(self) -> OobSharedData:
|
||||||
|
pkx = bytes(reversed(self.ecc_key.x))
|
||||||
|
return OobSharedData(c=crypto.f4(pkx, pkx, self.r, bytes(1)), r=self.r)
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
class OobLegacyContext:
|
||||||
|
"""Cryptographic context for LE Legacy OOB pairing."""
|
||||||
|
|
||||||
|
tk: bytes
|
||||||
|
|
||||||
|
def __init__(self, tk: Optional[bytes] = None) -> None:
|
||||||
|
self.tk = crypto.r() if tk is None else tk
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
@dataclass
|
||||||
|
class OobSharedData:
|
||||||
|
"""Shareable data for LE SC OOB pairing."""
|
||||||
|
|
||||||
|
c: bytes
|
||||||
|
r: bytes
|
||||||
|
|
||||||
|
def to_ad(self) -> AdvertisingData:
|
||||||
|
return AdvertisingData(
|
||||||
|
[
|
||||||
|
(AdvertisingData.LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE, self.c),
|
||||||
|
(AdvertisingData.LE_SECURE_CONNECTIONS_RANDOM_VALUE, self.r),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
def __str__(self) -> str:
|
||||||
|
return f'OOB(C={self.c.hex()}, R={self.r.hex()})'
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
class Session:
|
class Session:
|
||||||
# I/O Capability to pairing method decision matrix
|
# I/O Capability to pairing method decision matrix
|
||||||
@@ -640,8 +690,6 @@ class Session:
|
|||||||
self.pres: Optional[bytes] = None
|
self.pres: Optional[bytes] = None
|
||||||
self.ea = None
|
self.ea = None
|
||||||
self.eb = None
|
self.eb = None
|
||||||
self.tk = bytes(16)
|
|
||||||
self.r = bytes(16)
|
|
||||||
self.stk = None
|
self.stk = None
|
||||||
self.ltk = None
|
self.ltk = None
|
||||||
self.ltk_ediv = 0
|
self.ltk_ediv = 0
|
||||||
@@ -659,7 +707,7 @@ class Session:
|
|||||||
self.peer_bd_addr: Optional[Address] = None
|
self.peer_bd_addr: Optional[Address] = None
|
||||||
self.peer_signature_key = None
|
self.peer_signature_key = None
|
||||||
self.peer_expected_distributions: List[Type[SMP_Command]] = []
|
self.peer_expected_distributions: List[Type[SMP_Command]] = []
|
||||||
self.dh_key = None
|
self.dh_key = b''
|
||||||
self.confirm_value = None
|
self.confirm_value = None
|
||||||
self.passkey: Optional[int] = None
|
self.passkey: Optional[int] = None
|
||||||
self.passkey_ready = asyncio.Event()
|
self.passkey_ready = asyncio.Event()
|
||||||
@@ -712,8 +760,8 @@ class Session:
|
|||||||
self.io_capability = pairing_config.delegate.io_capability
|
self.io_capability = pairing_config.delegate.io_capability
|
||||||
self.peer_io_capability = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
|
self.peer_io_capability = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
|
||||||
|
|
||||||
# OOB (not supported yet)
|
# OOB
|
||||||
self.oob = False
|
self.oob_data_flag = 0 if pairing_config.oob is None else 1
|
||||||
|
|
||||||
# Set up addresses
|
# Set up addresses
|
||||||
self_address = connection.self_address
|
self_address = connection.self_address
|
||||||
@@ -729,9 +777,37 @@ class Session:
|
|||||||
self.ia = bytes(peer_address)
|
self.ia = bytes(peer_address)
|
||||||
self.iat = 1 if peer_address.is_random else 0
|
self.iat = 1 if peer_address.is_random else 0
|
||||||
|
|
||||||
|
# Select the ECC key, TK and r initial value
|
||||||
|
if pairing_config.oob:
|
||||||
|
self.peer_oob_data = pairing_config.oob.peer_data
|
||||||
|
if pairing_config.sc:
|
||||||
|
if pairing_config.oob.our_context is None:
|
||||||
|
raise ValueError(
|
||||||
|
"oob pairing config requires a context when sc is True"
|
||||||
|
)
|
||||||
|
self.r = pairing_config.oob.our_context.r
|
||||||
|
self.ecc_key = pairing_config.oob.our_context.ecc_key
|
||||||
|
if pairing_config.oob.legacy_context is None:
|
||||||
|
self.tk = None
|
||||||
|
else:
|
||||||
|
self.tk = pairing_config.oob.legacy_context.tk
|
||||||
|
else:
|
||||||
|
if pairing_config.oob.legacy_context is None:
|
||||||
|
raise ValueError(
|
||||||
|
"oob pairing config requires a legacy context when sc is False"
|
||||||
|
)
|
||||||
|
self.r = bytes(16)
|
||||||
|
self.ecc_key = manager.ecc_key
|
||||||
|
self.tk = pairing_config.oob.legacy_context.tk
|
||||||
|
else:
|
||||||
|
self.peer_oob_data = None
|
||||||
|
self.r = bytes(16)
|
||||||
|
self.ecc_key = manager.ecc_key
|
||||||
|
self.tk = bytes(16)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def pkx(self) -> Tuple[bytes, bytes]:
|
def pkx(self) -> Tuple[bytes, bytes]:
|
||||||
return (bytes(reversed(self.manager.ecc_key.x)), self.peer_public_key_x)
|
return (bytes(reversed(self.ecc_key.x)), self.peer_public_key_x)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def pka(self) -> bytes:
|
def pka(self) -> bytes:
|
||||||
@@ -768,7 +844,10 @@ class Session:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
def decide_pairing_method(
|
def decide_pairing_method(
|
||||||
self, auth_req: int, initiator_io_capability: int, responder_io_capability: int
|
self,
|
||||||
|
auth_req: int,
|
||||||
|
initiator_io_capability: int,
|
||||||
|
responder_io_capability: int,
|
||||||
) -> None:
|
) -> None:
|
||||||
if self.connection.transport == BT_BR_EDR_TRANSPORT:
|
if self.connection.transport == BT_BR_EDR_TRANSPORT:
|
||||||
self.pairing_method = PairingMethod.CTKD_OVER_CLASSIC
|
self.pairing_method = PairingMethod.CTKD_OVER_CLASSIC
|
||||||
@@ -909,7 +988,7 @@ class Session:
|
|||||||
|
|
||||||
command = SMP_Pairing_Request_Command(
|
command = SMP_Pairing_Request_Command(
|
||||||
io_capability=self.io_capability,
|
io_capability=self.io_capability,
|
||||||
oob_data_flag=0,
|
oob_data_flag=self.oob_data_flag,
|
||||||
auth_req=self.auth_req,
|
auth_req=self.auth_req,
|
||||||
maximum_encryption_key_size=16,
|
maximum_encryption_key_size=16,
|
||||||
initiator_key_distribution=self.initiator_key_distribution,
|
initiator_key_distribution=self.initiator_key_distribution,
|
||||||
@@ -921,7 +1000,7 @@ class Session:
|
|||||||
def send_pairing_response_command(self) -> None:
|
def send_pairing_response_command(self) -> None:
|
||||||
response = SMP_Pairing_Response_Command(
|
response = SMP_Pairing_Response_Command(
|
||||||
io_capability=self.io_capability,
|
io_capability=self.io_capability,
|
||||||
oob_data_flag=0,
|
oob_data_flag=self.oob_data_flag,
|
||||||
auth_req=self.auth_req,
|
auth_req=self.auth_req,
|
||||||
maximum_encryption_key_size=16,
|
maximum_encryption_key_size=16,
|
||||||
initiator_key_distribution=self.initiator_key_distribution,
|
initiator_key_distribution=self.initiator_key_distribution,
|
||||||
@@ -982,8 +1061,8 @@ class Session:
|
|||||||
def send_public_key_command(self) -> None:
|
def send_public_key_command(self) -> None:
|
||||||
self.send_command(
|
self.send_command(
|
||||||
SMP_Pairing_Public_Key_Command(
|
SMP_Pairing_Public_Key_Command(
|
||||||
public_key_x=bytes(reversed(self.manager.ecc_key.x)),
|
public_key_x=bytes(reversed(self.ecc_key.x)),
|
||||||
public_key_y=bytes(reversed(self.manager.ecc_key.y)),
|
public_key_y=bytes(reversed(self.ecc_key.y)),
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -1030,7 +1109,6 @@ class Session:
|
|||||||
self.ltk = crypto.h6(ilk, b'brle')
|
self.ltk = crypto.h6(ilk, b'brle')
|
||||||
|
|
||||||
def distribute_keys(self) -> None:
|
def distribute_keys(self) -> None:
|
||||||
|
|
||||||
# Distribute the keys as required
|
# Distribute the keys as required
|
||||||
if self.is_initiator:
|
if self.is_initiator:
|
||||||
# CTKD: Derive LTK from LinkKey
|
# CTKD: Derive LTK from LinkKey
|
||||||
@@ -1296,7 +1374,7 @@ class Session:
|
|||||||
try:
|
try:
|
||||||
handler(command)
|
handler(command)
|
||||||
except Exception as error:
|
except Exception as error:
|
||||||
logger.warning(f'{color("!!! Exception in handler:", "red")} {error}')
|
logger.exception(f'{color("!!! Exception in handler:", "red")} {error}')
|
||||||
response = SMP_Pairing_Failed_Command(
|
response = SMP_Pairing_Failed_Command(
|
||||||
reason=SMP_UNSPECIFIED_REASON_ERROR
|
reason=SMP_UNSPECIFIED_REASON_ERROR
|
||||||
)
|
)
|
||||||
@@ -1333,15 +1411,28 @@ class Session:
|
|||||||
self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0)
|
self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0)
|
||||||
self.ct2 = self.ct2 and (command.auth_req & SMP_CT2_AUTHREQ != 0)
|
self.ct2 = self.ct2 and (command.auth_req & SMP_CT2_AUTHREQ != 0)
|
||||||
|
|
||||||
# Check for OOB
|
# Infer the pairing method
|
||||||
if command.oob_data_flag != 0:
|
if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or (
|
||||||
self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR)
|
not self.sc and (self.oob_data_flag != 0 and command.oob_data_flag != 0)
|
||||||
return
|
):
|
||||||
|
# Use OOB
|
||||||
|
self.pairing_method = PairingMethod.OOB
|
||||||
|
if not self.sc and self.tk is None:
|
||||||
|
# For legacy OOB, TK is required.
|
||||||
|
logger.warning("legacy OOB without TK")
|
||||||
|
self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR)
|
||||||
|
return
|
||||||
|
if command.oob_data_flag == 0:
|
||||||
|
# The peer doesn't have OOB data, use r=0
|
||||||
|
self.r = bytes(16)
|
||||||
|
else:
|
||||||
|
# Decide which pairing method to use from the IO capability
|
||||||
|
self.decide_pairing_method(
|
||||||
|
command.auth_req,
|
||||||
|
command.io_capability,
|
||||||
|
self.io_capability,
|
||||||
|
)
|
||||||
|
|
||||||
# Decide which pairing method to use
|
|
||||||
self.decide_pairing_method(
|
|
||||||
command.auth_req, command.io_capability, self.io_capability
|
|
||||||
)
|
|
||||||
logger.debug(f'pairing method: {self.pairing_method.name}')
|
logger.debug(f'pairing method: {self.pairing_method.name}')
|
||||||
|
|
||||||
# Key distribution
|
# Key distribution
|
||||||
@@ -1390,15 +1481,26 @@ class Session:
|
|||||||
self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0)
|
self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0)
|
||||||
self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0)
|
self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0)
|
||||||
|
|
||||||
# Check for OOB
|
# Infer the pairing method
|
||||||
if self.sc and command.oob_data_flag:
|
if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or (
|
||||||
self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR)
|
not self.sc and (self.oob_data_flag != 0 and command.oob_data_flag != 0)
|
||||||
return
|
):
|
||||||
|
# Use OOB
|
||||||
|
self.pairing_method = PairingMethod.OOB
|
||||||
|
if not self.sc and self.tk is None:
|
||||||
|
# For legacy OOB, TK is required.
|
||||||
|
logger.warning("legacy OOB without TK")
|
||||||
|
self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR)
|
||||||
|
return
|
||||||
|
if command.oob_data_flag == 0:
|
||||||
|
# The peer doesn't have OOB data, use r=0
|
||||||
|
self.r = bytes(16)
|
||||||
|
else:
|
||||||
|
# Decide which pairing method to use from the IO capability
|
||||||
|
self.decide_pairing_method(
|
||||||
|
command.auth_req, self.io_capability, command.io_capability
|
||||||
|
)
|
||||||
|
|
||||||
# Decide which pairing method to use
|
|
||||||
self.decide_pairing_method(
|
|
||||||
command.auth_req, self.io_capability, command.io_capability
|
|
||||||
)
|
|
||||||
logger.debug(f'pairing method: {self.pairing_method.name}')
|
logger.debug(f'pairing method: {self.pairing_method.name}')
|
||||||
|
|
||||||
# Key distribution
|
# Key distribution
|
||||||
@@ -1549,12 +1651,13 @@ class Session:
|
|||||||
if self.passkey_step < 20:
|
if self.passkey_step < 20:
|
||||||
self.send_pairing_confirm_command()
|
self.send_pairing_confirm_command()
|
||||||
return
|
return
|
||||||
else:
|
elif self.pairing_method != PairingMethod.OOB:
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
if self.pairing_method in (
|
if self.pairing_method in (
|
||||||
PairingMethod.JUST_WORKS,
|
PairingMethod.JUST_WORKS,
|
||||||
PairingMethod.NUMERIC_COMPARISON,
|
PairingMethod.NUMERIC_COMPARISON,
|
||||||
|
PairingMethod.OOB,
|
||||||
):
|
):
|
||||||
self.send_pairing_random_command()
|
self.send_pairing_random_command()
|
||||||
elif self.pairing_method == PairingMethod.PASSKEY:
|
elif self.pairing_method == PairingMethod.PASSKEY:
|
||||||
@@ -1591,6 +1694,7 @@ class Session:
|
|||||||
if self.pairing_method in (
|
if self.pairing_method in (
|
||||||
PairingMethod.JUST_WORKS,
|
PairingMethod.JUST_WORKS,
|
||||||
PairingMethod.NUMERIC_COMPARISON,
|
PairingMethod.NUMERIC_COMPARISON,
|
||||||
|
PairingMethod.OOB,
|
||||||
):
|
):
|
||||||
ra = bytes(16)
|
ra = bytes(16)
|
||||||
rb = ra
|
rb = ra
|
||||||
@@ -1599,7 +1703,6 @@ class Session:
|
|||||||
ra = self.passkey.to_bytes(16, byteorder='little')
|
ra = self.passkey.to_bytes(16, byteorder='little')
|
||||||
rb = ra
|
rb = ra
|
||||||
else:
|
else:
|
||||||
# OOB not implemented yet
|
|
||||||
return
|
return
|
||||||
|
|
||||||
assert self.preq and self.pres
|
assert self.preq and self.pres
|
||||||
@@ -1653,7 +1756,7 @@ class Session:
|
|||||||
# Compute the DH key
|
# Compute the DH key
|
||||||
self.dh_key = bytes(
|
self.dh_key = bytes(
|
||||||
reversed(
|
reversed(
|
||||||
self.manager.ecc_key.dh(
|
self.ecc_key.dh(
|
||||||
bytes(reversed(command.public_key_x)),
|
bytes(reversed(command.public_key_x)),
|
||||||
bytes(reversed(command.public_key_y)),
|
bytes(reversed(command.public_key_y)),
|
||||||
)
|
)
|
||||||
@@ -1661,8 +1764,27 @@ class Session:
|
|||||||
)
|
)
|
||||||
logger.debug(f'DH key: {self.dh_key.hex()}')
|
logger.debug(f'DH key: {self.dh_key.hex()}')
|
||||||
|
|
||||||
|
if self.pairing_method == PairingMethod.OOB:
|
||||||
|
# Check against shared OOB data
|
||||||
|
if self.peer_oob_data:
|
||||||
|
confirm_verifier = crypto.f4(
|
||||||
|
self.peer_public_key_x,
|
||||||
|
self.peer_public_key_x,
|
||||||
|
self.peer_oob_data.r,
|
||||||
|
bytes(1),
|
||||||
|
)
|
||||||
|
if not self.check_expected_value(
|
||||||
|
self.peer_oob_data.c,
|
||||||
|
confirm_verifier,
|
||||||
|
SMP_CONFIRM_VALUE_FAILED_ERROR,
|
||||||
|
):
|
||||||
|
return
|
||||||
|
|
||||||
if self.is_initiator:
|
if self.is_initiator:
|
||||||
self.send_pairing_confirm_command()
|
if self.pairing_method == PairingMethod.OOB:
|
||||||
|
self.send_pairing_random_command()
|
||||||
|
else:
|
||||||
|
self.send_pairing_confirm_command()
|
||||||
else:
|
else:
|
||||||
if self.pairing_method == PairingMethod.PASSKEY:
|
if self.pairing_method == PairingMethod.PASSKEY:
|
||||||
self.display_or_input_passkey()
|
self.display_or_input_passkey()
|
||||||
@@ -1673,6 +1795,7 @@ class Session:
|
|||||||
if self.pairing_method in (
|
if self.pairing_method in (
|
||||||
PairingMethod.JUST_WORKS,
|
PairingMethod.JUST_WORKS,
|
||||||
PairingMethod.NUMERIC_COMPARISON,
|
PairingMethod.NUMERIC_COMPARISON,
|
||||||
|
PairingMethod.OOB,
|
||||||
):
|
):
|
||||||
# We can now send the confirmation value
|
# We can now send the confirmation value
|
||||||
self.send_pairing_confirm_command()
|
self.send_pairing_confirm_command()
|
||||||
|
|||||||
@@ -34,6 +34,8 @@ from bumble.pairing import PairingConfig, PairingDelegate
|
|||||||
from bumble.smp import (
|
from bumble.smp import (
|
||||||
SMP_PAIRING_NOT_SUPPORTED_ERROR,
|
SMP_PAIRING_NOT_SUPPORTED_ERROR,
|
||||||
SMP_CONFIRM_VALUE_FAILED_ERROR,
|
SMP_CONFIRM_VALUE_FAILED_ERROR,
|
||||||
|
OobContext,
|
||||||
|
OobLegacyContext,
|
||||||
)
|
)
|
||||||
from bumble.core import ProtocolError
|
from bumble.core import ProtocolError
|
||||||
from bumble.hci import HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE
|
from bumble.hci import HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE
|
||||||
@@ -575,6 +577,77 @@ async def test_self_smp_public_address():
|
|||||||
await _test_self_smp_with_configs(pairing_config, pairing_config)
|
await _test_self_smp_with_configs(pairing_config, pairing_config)
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_self_smp_oob_sc():
|
||||||
|
oob_context_1 = OobContext()
|
||||||
|
oob_context_2 = OobContext()
|
||||||
|
|
||||||
|
pairing_config_1 = PairingConfig(
|
||||||
|
mitm=True,
|
||||||
|
sc=True,
|
||||||
|
bonding=True,
|
||||||
|
oob=PairingConfig.OobConfig(oob_context_1, oob_context_2.share(), None),
|
||||||
|
)
|
||||||
|
|
||||||
|
pairing_config_2 = PairingConfig(
|
||||||
|
mitm=True,
|
||||||
|
sc=True,
|
||||||
|
bonding=True,
|
||||||
|
oob=PairingConfig.OobConfig(oob_context_2, oob_context_1.share(), None),
|
||||||
|
)
|
||||||
|
|
||||||
|
await _test_self_smp_with_configs(pairing_config_1, pairing_config_2)
|
||||||
|
|
||||||
|
pairing_config_3 = PairingConfig(
|
||||||
|
mitm=True,
|
||||||
|
sc=True,
|
||||||
|
bonding=True,
|
||||||
|
oob=PairingConfig.OobConfig(oob_context_2, None, None),
|
||||||
|
)
|
||||||
|
|
||||||
|
await _test_self_smp_with_configs(pairing_config_1, pairing_config_3)
|
||||||
|
await _test_self_smp_with_configs(pairing_config_3, pairing_config_1)
|
||||||
|
|
||||||
|
pairing_config_4 = PairingConfig(
|
||||||
|
mitm=True,
|
||||||
|
sc=True,
|
||||||
|
bonding=True,
|
||||||
|
oob=PairingConfig.OobConfig(oob_context_2, oob_context_2.share(), None),
|
||||||
|
)
|
||||||
|
|
||||||
|
with pytest.raises(ProtocolError) as error:
|
||||||
|
await _test_self_smp_with_configs(pairing_config_1, pairing_config_4)
|
||||||
|
assert error.value.error_code == SMP_CONFIRM_VALUE_FAILED_ERROR
|
||||||
|
|
||||||
|
with pytest.raises(ProtocolError):
|
||||||
|
await _test_self_smp_with_configs(pairing_config_4, pairing_config_1)
|
||||||
|
assert error.value.error_code == SMP_CONFIRM_VALUE_FAILED_ERROR
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_self_smp_oob_legacy():
|
||||||
|
legacy_context = OobLegacyContext()
|
||||||
|
|
||||||
|
pairing_config_1 = PairingConfig(
|
||||||
|
mitm=True,
|
||||||
|
sc=False,
|
||||||
|
bonding=True,
|
||||||
|
oob=PairingConfig.OobConfig(None, None, legacy_context),
|
||||||
|
)
|
||||||
|
|
||||||
|
pairing_config_2 = PairingConfig(
|
||||||
|
mitm=True,
|
||||||
|
sc=True,
|
||||||
|
bonding=True,
|
||||||
|
oob=PairingConfig.OobConfig(OobContext(), None, legacy_context),
|
||||||
|
)
|
||||||
|
|
||||||
|
await _test_self_smp_with_configs(pairing_config_1, pairing_config_2)
|
||||||
|
await _test_self_smp_with_configs(pairing_config_2, pairing_config_1)
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
async def run_test_self():
|
async def run_test_self():
|
||||||
await test_self_connection()
|
await test_self_connection()
|
||||||
@@ -585,6 +658,8 @@ async def run_test_self():
|
|||||||
await test_self_smp_wrong_pin()
|
await test_self_smp_wrong_pin()
|
||||||
await test_self_smp_over_classic()
|
await test_self_smp_over_classic()
|
||||||
await test_self_smp_public_address()
|
await test_self_smp_public_address()
|
||||||
|
await test_self_smp_oob_sc()
|
||||||
|
await test_self_smp_oob_legacy()
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
|
|||||||
@@ -17,11 +17,16 @@
|
|||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
|
|
||||||
from bumble.crypto import EccKey, aes_cmac, ah, c1, f4, f5, f6, g2, h6, h7, s1
|
from bumble.crypto import EccKey, aes_cmac, ah, c1, f4, f5, f6, g2, h6, h7, s1
|
||||||
|
from bumble.pairing import OobData, OobSharedData, LeRole
|
||||||
|
from bumble.hci import Address
|
||||||
|
from bumble.core import AdvertisingData
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
# pylint: disable=invalid-name
|
# pylint: disable=invalid-name
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def reversed_hex(hex_str):
|
def reversed_hex(hex_str):
|
||||||
return bytes(reversed(bytes.fromhex(hex_str)))
|
return bytes(reversed(bytes.fromhex(hex_str)))
|
||||||
@@ -233,6 +238,23 @@ def test_ah():
|
|||||||
assert value == expected
|
assert value == expected
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
def test_oob_data():
|
||||||
|
oob_data = OobData(
|
||||||
|
address=Address("F0:F1:F2:F3:F4:F5"),
|
||||||
|
role=LeRole.BOTH_PERIPHERAL_PREFERRED,
|
||||||
|
shared_data=OobSharedData(c=bytes([1, 2]), r=bytes([3, 4])),
|
||||||
|
)
|
||||||
|
oob_data_ad = oob_data.to_ad()
|
||||||
|
oob_data_bytes = bytes(oob_data_ad)
|
||||||
|
oob_data_ad_parsed = AdvertisingData.from_bytes(oob_data_bytes)
|
||||||
|
oob_data_parsed = OobData.from_ad(oob_data_ad_parsed)
|
||||||
|
assert oob_data_parsed.address == oob_data.address
|
||||||
|
assert oob_data_parsed.role == oob_data.role
|
||||||
|
assert oob_data_parsed.shared_data.c == oob_data.shared_data.c
|
||||||
|
assert oob_data_parsed.shared_data.r == oob_data.shared_data.r
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
test_ecc()
|
test_ecc()
|
||||||
@@ -246,3 +268,4 @@ if __name__ == '__main__':
|
|||||||
test_h6()
|
test_h6()
|
||||||
test_h7()
|
test_h7()
|
||||||
test_ah()
|
test_ah()
|
||||||
|
test_oob_data()
|
||||||
|
|||||||
Reference in New Issue
Block a user