This commit is contained in:
Gilles Boccon-Gibod
2023-11-06 13:19:13 -08:00
parent c67ca4a09e
commit 4ae612090b
6 changed files with 347 additions and 36 deletions

View File

@@ -25,6 +25,7 @@ from bumble.colors import color
from bumble.device import Device, Peer from bumble.device import Device, Peer
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
from bumble.pairing import PairingDelegate, PairingConfig from bumble.pairing import PairingDelegate, PairingConfig
from bumble.smp import OobContext
from bumble.smp import error_name as smp_error_name from bumble.smp import error_name as smp_error_name
from bumble.keys import JsonKeyStore from bumble.keys import JsonKeyStore
from bumble.core import ProtocolError from bumble.core import ProtocolError
@@ -60,7 +61,7 @@ class Waiter:
class Delegate(PairingDelegate): class Delegate(PairingDelegate):
def __init__(self, mode, connection, capability_string, do_prompt): def __init__(self, mode, connection, capability_string, do_prompt):
super().__init__( super().__init__(
{ io_capability={
'keyboard': PairingDelegate.KEYBOARD_INPUT_ONLY, 'keyboard': PairingDelegate.KEYBOARD_INPUT_ONLY,
'display': PairingDelegate.DISPLAY_OUTPUT_ONLY, 'display': PairingDelegate.DISPLAY_OUTPUT_ONLY,
'display+keyboard': PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT, 'display+keyboard': PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
@@ -286,6 +287,7 @@ async def pair(
bond, bond,
ctkd, ctkd,
io, io,
oob,
prompt, prompt,
request, request,
print_keys, print_keys,
@@ -343,9 +345,25 @@ async def pair(
await device.keystore.print(prefix=color('@@@ ', 'blue')) await device.keystore.print(prefix=color('@@@ ', 'blue'))
print(color('@@@-----------------------------------', 'blue')) print(color('@@@-----------------------------------', 'blue'))
# Create an OOB context if needed
if oob:
our_oob_context = OobContext()
peer_oob_context = None # TODO: parse from command line param
oob_contexts = PairingConfig.OobContexts(our_oob_context, peer_oob_context)
print(color('@@@-----------------------------------', 'yellow'))
print(color('@@@ OOB Data:', 'yellow'))
print(color(f'@@@ {our_oob_context.share()}', 'yellow'))
print(color('@@@-----------------------------------', 'yellow'))
else:
oob_contexts = None
# Set up a pairing config factory # Set up a pairing config factory
device.pairing_config_factory = lambda connection: PairingConfig( device.pairing_config_factory = lambda connection: PairingConfig(
sc, mitm, bond, Delegate(mode, connection, io, prompt) sc=sc,
mitm=mitm,
bonding=bond,
oob=oob_contexts,
delegate=Delegate(mode, connection, io, prompt),
) )
# Connect to a peer or wait for a connection # Connect to a peer or wait for a connection
@@ -421,6 +439,7 @@ class LogHandler(logging.Handler):
default='display+keyboard', default='display+keyboard',
show_default=True, show_default=True,
) )
@click.option('--oob', help='Use OOB pairing with this data from the peer')
@click.option('--prompt', is_flag=True, help='Prompt to accept/reject pairing request') @click.option('--prompt', is_flag=True, help='Prompt to accept/reject pairing request')
@click.option( @click.option(
'--request', is_flag=True, help='Request that the connecting peer initiate pairing' '--request', is_flag=True, help='Request that the connecting peer initiate pairing'
@@ -441,6 +460,7 @@ def main(
bond, bond,
ctkd, ctkd,
io, io,
oob,
prompt, prompt,
request, request,
print_keys, print_keys,
@@ -464,6 +484,7 @@ def main(
bond, bond,
ctkd, ctkd,
io, io,
oob,
prompt, prompt,
request, request,
print_keys, print_keys,

View File

@@ -16,6 +16,7 @@
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
from __future__ import annotations from __future__ import annotations
import enum
import struct import struct
from typing import List, Optional, Tuple, Union, cast, Dict from typing import List, Optional, Tuple, Union, cast, Dict
@@ -975,6 +976,9 @@ class AdvertisingData:
if ad_type == AdvertisingData.MANUFACTURER_SPECIFIC_DATA: if ad_type == AdvertisingData.MANUFACTURER_SPECIFIC_DATA:
return (cast(int, struct.unpack_from('<H', ad_data, 0)[0]), ad_data[2:]) return (cast(int, struct.unpack_from('<H', ad_data, 0)[0]), ad_data[2:])
if ad_type == AdvertisingData.LE_BLUETOOTH_DEVICE_ADDRESS:
return Address(ad_data)
return ad_data return ad_data
def append(self, data): def append(self, data):
@@ -1051,3 +1055,13 @@ class ConnectionPHY:
def __str__(self): def __str__(self):
return f'ConnectionPHY(tx_phy={self.tx_phy}, rx_phy={self.rx_phy})' return f'ConnectionPHY(tx_phy={self.tx_phy}, rx_phy={self.rx_phy})'
# -----------------------------------------------------------------------------
# LE Role
# -----------------------------------------------------------------------------
class LeRole(enum.IntEnum):
PERIPHERAL_ONLY = 0x00
CENTRAL_ONLY = 0x01
BOTH_PERIPHERAL_PREFERRED = 0x02
BOTH_CENTRAL_PREFERRED = 0x03

View File

@@ -15,7 +15,9 @@
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
from __future__ import annotations
import enum import enum
from dataclasses import dataclass
from typing import Optional, Tuple from typing import Optional, Tuple
from .hci import ( from .hci import (
@@ -35,7 +37,50 @@ from .smp import (
SMP_ID_KEY_DISTRIBUTION_FLAG, SMP_ID_KEY_DISTRIBUTION_FLAG,
SMP_SIGN_KEY_DISTRIBUTION_FLAG, SMP_SIGN_KEY_DISTRIBUTION_FLAG,
SMP_LINK_KEY_DISTRIBUTION_FLAG, SMP_LINK_KEY_DISTRIBUTION_FLAG,
OobContext,
OobLegacyContext,
OobSharedData,
) )
from .core import AdvertisingData, LeRole
# -----------------------------------------------------------------------------
@dataclass
class OobData:
"""OOB data that can be sent from one device to another."""
address: Optional[Address] = None
role: Optional[LeRole] = None
shared_data: Optional[OobSharedData] = None
@classmethod
def from_ad(cls, ad: AdvertisingData) -> OobData:
instance = cls()
shared_data_c = None
shared_data_r = None
for (ad_type, ad_data) in ad.ad_structures:
if ad_type == AdvertisingData.LE_BLUETOOTH_DEVICE_ADDRESS:
instance.address = Address(ad_data)
elif ad_type == AdvertisingData.LE_ROLE:
instance.role = LeRole(ad_data[0])
elif ad_type == AdvertisingData.LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE:
shared_data_c: bytes = AdvertisingData.ad_data_to_object(ad_type, ad_data)
elif ad_type == AdvertisingData.LE_SECURE_CONNECTIONS_RANDOM_VALUE:
shared_data_r: bytes = AdvertisingData.ad_data_to_object(ad_type, ad_data)
if shared_data_c or shared_data_r:
instance.shared_data = OobSharedData(c=shared_data_c, r=shared_data_r)
return instance
def to_ad(self):
ad_structures = []
if self.address is not None:
ad_structures.append((AdvertisingData.LE_BLUETOOTH_DEVICE_ADDRESS, bytes(self.address)))
if self.role is not None:
ad_structures.append((AdvertisingData.LE_ROLE, bytes([self.role])))
if self.shared_data is not None:
ad_structures.extend(self.shared_data.to_ad().ad_structures)
return AdvertisingData(ad_structures)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -173,6 +218,13 @@ class PairingConfig:
PUBLIC = Address.PUBLIC_DEVICE_ADDRESS PUBLIC = Address.PUBLIC_DEVICE_ADDRESS
RANDOM = Address.RANDOM_DEVICE_ADDRESS RANDOM = Address.RANDOM_DEVICE_ADDRESS
@dataclass
class OobConfig:
"""Config for OOB pairing."""
our_context: Optional[OobContext]
peer_data: Optional[OobSharedData]
legacy_context: Optional[OobLegacyContext]
def __init__( def __init__(
self, self,
sc: bool = True, sc: bool = True,
@@ -180,17 +232,20 @@ class PairingConfig:
bonding: bool = True, bonding: bool = True,
delegate: Optional[PairingDelegate] = None, delegate: Optional[PairingDelegate] = None,
identity_address_type: Optional[AddressType] = None, identity_address_type: Optional[AddressType] = None,
oob: Optional[OobConfig] = None,
) -> None: ) -> None:
self.sc = sc self.sc = sc
self.mitm = mitm self.mitm = mitm
self.bonding = bonding self.bonding = bonding
self.delegate = delegate or PairingDelegate() self.delegate = delegate or PairingDelegate()
self.identity_address_type = identity_address_type self.identity_address_type = identity_address_type
self.oob = oob
def __str__(self) -> str: def __str__(self) -> str:
return ( return (
f'PairingConfig(sc={self.sc}, ' f'PairingConfig(sc={self.sc}, '
f'mitm={self.mitm}, bonding={self.bonding}, ' f'mitm={self.mitm}, bonding={self.bonding}, '
f'identity_address_type={self.identity_address_type}, ' f'identity_address_type={self.identity_address_type}, '
f'delegate[{self.delegate.io_capability}])' f'delegate[{self.delegate.io_capability}]), '
f'oob[{self.oob}])'
) )

View File

@@ -27,6 +27,7 @@ import logging
import asyncio import asyncio
import enum import enum
import secrets import secrets
from dataclasses import dataclass
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
Any, Any,
@@ -53,6 +54,7 @@ from .core import (
BT_BR_EDR_TRANSPORT, BT_BR_EDR_TRANSPORT,
BT_CENTRAL_ROLE, BT_CENTRAL_ROLE,
BT_LE_TRANSPORT, BT_LE_TRANSPORT,
AdvertisingData,
ProtocolError, ProtocolError,
name_or_number, name_or_number,
) )
@@ -563,6 +565,54 @@ class PairingMethod(enum.IntEnum):
CTKD_OVER_CLASSIC = 4 CTKD_OVER_CLASSIC = 4
# -----------------------------------------------------------------------------
class OobContext:
"""Cryptographic context for LE SC OOB pairing."""
ecc_key: crypto.EccKey
r: bytes
def __init__(
self, ecc_key: Optional[crypto.EccKey] = None, r: Optional[bytes] = None
) -> None:
self.ecc_key = crypto.EccKey.generate() if ecc_key is None else ecc_key
self.r = crypto.r() if r is None else r
def share(self) -> OobSharedData:
pkx = bytes(reversed(self.ecc_key.x))
return OobSharedData(c=crypto.f4(pkx, pkx, self.r, bytes(1)), r=self.r)
# -----------------------------------------------------------------------------
class OobLegacyContext:
"""Cryptographic context for LE Legacy OOB pairing."""
tk: bytes
def __init__(self, tk: Optional[bytes] = None) -> None:
self.tk = crypto.r() if tk is None else tk
# -----------------------------------------------------------------------------
@dataclass
class OobSharedData:
"""Shareable data for LE SC OOB pairing."""
c: bytes
r: bytes
def to_ad(self) -> AdvertisingData:
return AdvertisingData(
[
(AdvertisingData.LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE, self.c),
(AdvertisingData.LE_SECURE_CONNECTIONS_RANDOM_VALUE, self.r),
]
)
def __str__(self) -> str:
return f'OOB(C={self.c.hex()}, R={self.r.hex()})'
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class Session: class Session:
# I/O Capability to pairing method decision matrix # I/O Capability to pairing method decision matrix
@@ -640,8 +690,6 @@ class Session:
self.pres: Optional[bytes] = None self.pres: Optional[bytes] = None
self.ea = None self.ea = None
self.eb = None self.eb = None
self.tk = bytes(16)
self.r = bytes(16)
self.stk = None self.stk = None
self.ltk = None self.ltk = None
self.ltk_ediv = 0 self.ltk_ediv = 0
@@ -659,7 +707,7 @@ class Session:
self.peer_bd_addr: Optional[Address] = None self.peer_bd_addr: Optional[Address] = None
self.peer_signature_key = None self.peer_signature_key = None
self.peer_expected_distributions: List[Type[SMP_Command]] = [] self.peer_expected_distributions: List[Type[SMP_Command]] = []
self.dh_key = None self.dh_key = b''
self.confirm_value = None self.confirm_value = None
self.passkey: Optional[int] = None self.passkey: Optional[int] = None
self.passkey_ready = asyncio.Event() self.passkey_ready = asyncio.Event()
@@ -712,8 +760,8 @@ class Session:
self.io_capability = pairing_config.delegate.io_capability self.io_capability = pairing_config.delegate.io_capability
self.peer_io_capability = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY self.peer_io_capability = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
# OOB (not supported yet) # OOB
self.oob = False self.oob_data_flag = 0 if pairing_config.oob is None else 1
# Set up addresses # Set up addresses
self_address = connection.self_address self_address = connection.self_address
@@ -729,9 +777,37 @@ class Session:
self.ia = bytes(peer_address) self.ia = bytes(peer_address)
self.iat = 1 if peer_address.is_random else 0 self.iat = 1 if peer_address.is_random else 0
# Select the ECC key, TK and r initial value
if pairing_config.oob:
self.peer_oob_data = pairing_config.oob.peer_data
if pairing_config.sc:
if pairing_config.oob.our_context is None:
raise ValueError(
"oob pairing config requires a context when sc is True"
)
self.r = pairing_config.oob.our_context.r
self.ecc_key = pairing_config.oob.our_context.ecc_key
if pairing_config.oob.legacy_context is None:
self.tk = None
else:
self.tk = pairing_config.oob.legacy_context.tk
else:
if pairing_config.oob.legacy_context is None:
raise ValueError(
"oob pairing config requires a legacy context when sc is False"
)
self.r = bytes(16)
self.ecc_key = manager.ecc_key
self.tk = pairing_config.oob.legacy_context.tk
else:
self.peer_oob_data = None
self.r = bytes(16)
self.ecc_key = manager.ecc_key
self.tk = bytes(16)
@property @property
def pkx(self) -> Tuple[bytes, bytes]: def pkx(self) -> Tuple[bytes, bytes]:
return (bytes(reversed(self.manager.ecc_key.x)), self.peer_public_key_x) return (bytes(reversed(self.ecc_key.x)), self.peer_public_key_x)
@property @property
def pka(self) -> bytes: def pka(self) -> bytes:
@@ -768,7 +844,10 @@ class Session:
return None return None
def decide_pairing_method( def decide_pairing_method(
self, auth_req: int, initiator_io_capability: int, responder_io_capability: int self,
auth_req: int,
initiator_io_capability: int,
responder_io_capability: int,
) -> None: ) -> None:
if self.connection.transport == BT_BR_EDR_TRANSPORT: if self.connection.transport == BT_BR_EDR_TRANSPORT:
self.pairing_method = PairingMethod.CTKD_OVER_CLASSIC self.pairing_method = PairingMethod.CTKD_OVER_CLASSIC
@@ -909,7 +988,7 @@ class Session:
command = SMP_Pairing_Request_Command( command = SMP_Pairing_Request_Command(
io_capability=self.io_capability, io_capability=self.io_capability,
oob_data_flag=0, oob_data_flag=self.oob_data_flag,
auth_req=self.auth_req, auth_req=self.auth_req,
maximum_encryption_key_size=16, maximum_encryption_key_size=16,
initiator_key_distribution=self.initiator_key_distribution, initiator_key_distribution=self.initiator_key_distribution,
@@ -921,7 +1000,7 @@ class Session:
def send_pairing_response_command(self) -> None: def send_pairing_response_command(self) -> None:
response = SMP_Pairing_Response_Command( response = SMP_Pairing_Response_Command(
io_capability=self.io_capability, io_capability=self.io_capability,
oob_data_flag=0, oob_data_flag=self.oob_data_flag,
auth_req=self.auth_req, auth_req=self.auth_req,
maximum_encryption_key_size=16, maximum_encryption_key_size=16,
initiator_key_distribution=self.initiator_key_distribution, initiator_key_distribution=self.initiator_key_distribution,
@@ -982,8 +1061,8 @@ class Session:
def send_public_key_command(self) -> None: def send_public_key_command(self) -> None:
self.send_command( self.send_command(
SMP_Pairing_Public_Key_Command( SMP_Pairing_Public_Key_Command(
public_key_x=bytes(reversed(self.manager.ecc_key.x)), public_key_x=bytes(reversed(self.ecc_key.x)),
public_key_y=bytes(reversed(self.manager.ecc_key.y)), public_key_y=bytes(reversed(self.ecc_key.y)),
) )
) )
@@ -1030,7 +1109,6 @@ class Session:
self.ltk = crypto.h6(ilk, b'brle') self.ltk = crypto.h6(ilk, b'brle')
def distribute_keys(self) -> None: def distribute_keys(self) -> None:
# Distribute the keys as required # Distribute the keys as required
if self.is_initiator: if self.is_initiator:
# CTKD: Derive LTK from LinkKey # CTKD: Derive LTK from LinkKey
@@ -1296,7 +1374,7 @@ class Session:
try: try:
handler(command) handler(command)
except Exception as error: except Exception as error:
logger.warning(f'{color("!!! Exception in handler:", "red")} {error}') logger.exception(f'{color("!!! Exception in handler:", "red")} {error}')
response = SMP_Pairing_Failed_Command( response = SMP_Pairing_Failed_Command(
reason=SMP_UNSPECIFIED_REASON_ERROR reason=SMP_UNSPECIFIED_REASON_ERROR
) )
@@ -1333,15 +1411,28 @@ class Session:
self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0) self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0)
self.ct2 = self.ct2 and (command.auth_req & SMP_CT2_AUTHREQ != 0) self.ct2 = self.ct2 and (command.auth_req & SMP_CT2_AUTHREQ != 0)
# Check for OOB # Infer the pairing method
if command.oob_data_flag != 0: if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or (
self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR) not self.sc and (self.oob_data_flag != 0 and command.oob_data_flag != 0)
return ):
# Use OOB
self.pairing_method = PairingMethod.OOB
if not self.sc and self.tk is None:
# For legacy OOB, TK is required.
logger.warning("legacy OOB without TK")
self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR)
return
if command.oob_data_flag == 0:
# The peer doesn't have OOB data, use r=0
self.r = bytes(16)
else:
# Decide which pairing method to use from the IO capability
self.decide_pairing_method(
command.auth_req,
command.io_capability,
self.io_capability,
)
# Decide which pairing method to use
self.decide_pairing_method(
command.auth_req, command.io_capability, self.io_capability
)
logger.debug(f'pairing method: {self.pairing_method.name}') logger.debug(f'pairing method: {self.pairing_method.name}')
# Key distribution # Key distribution
@@ -1390,15 +1481,26 @@ class Session:
self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0) self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0)
self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0) self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0)
# Check for OOB # Infer the pairing method
if self.sc and command.oob_data_flag: if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or (
self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR) not self.sc and (self.oob_data_flag != 0 and command.oob_data_flag != 0)
return ):
# Use OOB
self.pairing_method = PairingMethod.OOB
if not self.sc and self.tk is None:
# For legacy OOB, TK is required.
logger.warning("legacy OOB without TK")
self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR)
return
if command.oob_data_flag == 0:
# The peer doesn't have OOB data, use r=0
self.r = bytes(16)
else:
# Decide which pairing method to use from the IO capability
self.decide_pairing_method(
command.auth_req, self.io_capability, command.io_capability
)
# Decide which pairing method to use
self.decide_pairing_method(
command.auth_req, self.io_capability, command.io_capability
)
logger.debug(f'pairing method: {self.pairing_method.name}') logger.debug(f'pairing method: {self.pairing_method.name}')
# Key distribution # Key distribution
@@ -1549,12 +1651,13 @@ class Session:
if self.passkey_step < 20: if self.passkey_step < 20:
self.send_pairing_confirm_command() self.send_pairing_confirm_command()
return return
else: elif self.pairing_method != PairingMethod.OOB:
return return
else: else:
if self.pairing_method in ( if self.pairing_method in (
PairingMethod.JUST_WORKS, PairingMethod.JUST_WORKS,
PairingMethod.NUMERIC_COMPARISON, PairingMethod.NUMERIC_COMPARISON,
PairingMethod.OOB,
): ):
self.send_pairing_random_command() self.send_pairing_random_command()
elif self.pairing_method == PairingMethod.PASSKEY: elif self.pairing_method == PairingMethod.PASSKEY:
@@ -1591,6 +1694,7 @@ class Session:
if self.pairing_method in ( if self.pairing_method in (
PairingMethod.JUST_WORKS, PairingMethod.JUST_WORKS,
PairingMethod.NUMERIC_COMPARISON, PairingMethod.NUMERIC_COMPARISON,
PairingMethod.OOB,
): ):
ra = bytes(16) ra = bytes(16)
rb = ra rb = ra
@@ -1599,7 +1703,6 @@ class Session:
ra = self.passkey.to_bytes(16, byteorder='little') ra = self.passkey.to_bytes(16, byteorder='little')
rb = ra rb = ra
else: else:
# OOB not implemented yet
return return
assert self.preq and self.pres assert self.preq and self.pres
@@ -1653,7 +1756,7 @@ class Session:
# Compute the DH key # Compute the DH key
self.dh_key = bytes( self.dh_key = bytes(
reversed( reversed(
self.manager.ecc_key.dh( self.ecc_key.dh(
bytes(reversed(command.public_key_x)), bytes(reversed(command.public_key_x)),
bytes(reversed(command.public_key_y)), bytes(reversed(command.public_key_y)),
) )
@@ -1661,8 +1764,27 @@ class Session:
) )
logger.debug(f'DH key: {self.dh_key.hex()}') logger.debug(f'DH key: {self.dh_key.hex()}')
if self.pairing_method == PairingMethod.OOB:
# Check against shared OOB data
if self.peer_oob_data:
confirm_verifier = crypto.f4(
self.peer_public_key_x,
self.peer_public_key_x,
self.peer_oob_data.r,
bytes(1),
)
if not self.check_expected_value(
self.peer_oob_data.c,
confirm_verifier,
SMP_CONFIRM_VALUE_FAILED_ERROR,
):
return
if self.is_initiator: if self.is_initiator:
self.send_pairing_confirm_command() if self.pairing_method == PairingMethod.OOB:
self.send_pairing_random_command()
else:
self.send_pairing_confirm_command()
else: else:
if self.pairing_method == PairingMethod.PASSKEY: if self.pairing_method == PairingMethod.PASSKEY:
self.display_or_input_passkey() self.display_or_input_passkey()
@@ -1673,6 +1795,7 @@ class Session:
if self.pairing_method in ( if self.pairing_method in (
PairingMethod.JUST_WORKS, PairingMethod.JUST_WORKS,
PairingMethod.NUMERIC_COMPARISON, PairingMethod.NUMERIC_COMPARISON,
PairingMethod.OOB,
): ):
# We can now send the confirmation value # We can now send the confirmation value
self.send_pairing_confirm_command() self.send_pairing_confirm_command()

View File

@@ -34,6 +34,8 @@ from bumble.pairing import PairingConfig, PairingDelegate
from bumble.smp import ( from bumble.smp import (
SMP_PAIRING_NOT_SUPPORTED_ERROR, SMP_PAIRING_NOT_SUPPORTED_ERROR,
SMP_CONFIRM_VALUE_FAILED_ERROR, SMP_CONFIRM_VALUE_FAILED_ERROR,
OobContext,
OobLegacyContext
) )
from bumble.core import ProtocolError from bumble.core import ProtocolError
from bumble.hci import HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE from bumble.hci import HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE
@@ -575,6 +577,77 @@ async def test_self_smp_public_address():
await _test_self_smp_with_configs(pairing_config, pairing_config) await _test_self_smp_with_configs(pairing_config, pairing_config)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_self_smp_oob_sc():
oob_context_1 = OobContext()
oob_context_2 = OobContext()
pairing_config_1 = PairingConfig(
mitm=True,
sc=True,
bonding=True,
oob=PairingConfig.OobConfig(oob_context_1, oob_context_2.share(), None)
)
pairing_config_2 = PairingConfig(
mitm=True,
sc=True,
bonding=True,
oob=PairingConfig.OobConfig(oob_context_2, oob_context_1.share(), None)
)
await _test_self_smp_with_configs(pairing_config_1, pairing_config_2)
pairing_config_3 = PairingConfig(
mitm=True,
sc=True,
bonding=True,
oob=PairingConfig.OobConfig(oob_context_2, None, None)
)
await _test_self_smp_with_configs(pairing_config_1, pairing_config_3)
await _test_self_smp_with_configs(pairing_config_3, pairing_config_1)
pairing_config_4 = PairingConfig(
mitm=True,
sc=True,
bonding=True,
oob=PairingConfig.OobConfig(oob_context_2, oob_context_2.share(), None)
)
with pytest.raises(ProtocolError) as error:
await _test_self_smp_with_configs(pairing_config_1, pairing_config_4)
assert error.value.error_code == SMP_CONFIRM_VALUE_FAILED_ERROR
with pytest.raises(ProtocolError):
await _test_self_smp_with_configs(pairing_config_4, pairing_config_1)
assert error.value.error_code == SMP_CONFIRM_VALUE_FAILED_ERROR
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_self_smp_oob_legacy():
legacy_context = OobLegacyContext()
pairing_config_1 = PairingConfig(
mitm=True,
sc=False,
bonding=True,
oob=PairingConfig.OobConfig(None, None, legacy_context)
)
pairing_config_2 = PairingConfig(
mitm=True,
sc=True,
bonding=True,
oob=PairingConfig.OobConfig(OobContext(), None, legacy_context)
)
await _test_self_smp_with_configs(pairing_config_1, pairing_config_2)
await _test_self_smp_with_configs(pairing_config_2, pairing_config_1)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def run_test_self(): async def run_test_self():
await test_self_connection() await test_self_connection()
@@ -585,6 +658,8 @@ async def run_test_self():
await test_self_smp_wrong_pin() await test_self_smp_wrong_pin()
await test_self_smp_over_classic() await test_self_smp_over_classic()
await test_self_smp_public_address() await test_self_smp_public_address()
await test_self_smp_oob_sc()
await test_self_smp_oob_legacy()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -17,11 +17,16 @@
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
from bumble.crypto import EccKey, aes_cmac, ah, c1, f4, f5, f6, g2, h6, h7, s1 from bumble.crypto import EccKey, aes_cmac, ah, c1, f4, f5, f6, g2, h6, h7, s1
from bumble.pairing import OobData, OobSharedData, LeRole
from bumble.hci import Address
from bumble.core import AdvertisingData
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# pylint: disable=invalid-name # pylint: disable=invalid-name
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def reversed_hex(hex_str): def reversed_hex(hex_str):
return bytes(reversed(bytes.fromhex(hex_str))) return bytes(reversed(bytes.fromhex(hex_str)))
@@ -233,6 +238,23 @@ def test_ah():
assert value == expected assert value == expected
# -----------------------------------------------------------------------------
def test_oob_data():
oob_data = OobData(
address=Address("F0:F1:F2:F3:F4:F5"),
role=LeRole.BOTH_PERIPHERAL_PREFERRED,
shared_data=OobSharedData(c=bytes([1, 2]), r=bytes([3, 4])),
)
oob_data_ad = oob_data.to_ad()
oob_data_bytes = bytes(oob_data_ad)
oob_data_ad_parsed = AdvertisingData.from_bytes(oob_data_bytes)
oob_data_parsed = OobData.from_ad(oob_data_ad_parsed)
assert oob_data_parsed.address == oob_data.address
assert oob_data_parsed.role == oob_data.role
assert oob_data_parsed.shared_data.c == oob_data.shared_data.c
assert oob_data_parsed.shared_data.r == oob_data.shared_data.r
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
if __name__ == '__main__': if __name__ == '__main__':
test_ecc() test_ecc()
@@ -246,3 +268,4 @@ if __name__ == '__main__':
test_h6() test_h6()
test_h7() test_h7()
test_ah() test_ah()
test_oob_data()