Slightly refactor and fix CTKD

It seems sample input data provided in the spec is big-endian (just
like other AES-CMAC-based functions), but all keys are in little-endian(
HCI standard), so they need to be reverse before and after applying
AES-CMAC.
This commit is contained in:
Josh Wu
2023-11-23 03:46:19 +08:00
parent e08c84dd20
commit 80d34a226d
5 changed files with 220 additions and 193 deletions

View File

@@ -29,6 +29,7 @@
"deregistration", "deregistration",
"dhkey", "dhkey",
"diversifier", "diversifier",
"endianness",
"Fitbit", "Fitbit",
"GATTLINK", "GATTLINK",
"HANDSFREE", "HANDSFREE",

View File

@@ -21,6 +21,8 @@
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
from __future__ import annotations
import logging import logging
import operator import operator
@@ -29,11 +31,13 @@ from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.asymmetric.ec import ( from cryptography.hazmat.primitives.asymmetric.ec import (
generate_private_key, generate_private_key,
ECDH, ECDH,
EllipticCurvePrivateKey,
EllipticCurvePublicNumbers, EllipticCurvePublicNumbers,
EllipticCurvePrivateNumbers, EllipticCurvePrivateNumbers,
SECP256R1, SECP256R1,
) )
from cryptography.hazmat.primitives import cmac from cryptography.hazmat.primitives import cmac
from typing import Tuple
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -46,16 +50,18 @@ logger = logging.getLogger(__name__)
# Classes # Classes
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class EccKey: class EccKey:
def __init__(self, private_key): def __init__(self, private_key: EllipticCurvePrivateKey) -> None:
self.private_key = private_key self.private_key = private_key
@classmethod @classmethod
def generate(cls): def generate(cls) -> EccKey:
private_key = generate_private_key(SECP256R1()) private_key = generate_private_key(SECP256R1())
return cls(private_key) return cls(private_key)
@classmethod @classmethod
def from_private_key_bytes(cls, d_bytes, x_bytes, y_bytes): def from_private_key_bytes(
cls, d_bytes: bytes, x_bytes: bytes, y_bytes: bytes
) -> EccKey:
d = int.from_bytes(d_bytes, byteorder='big', signed=False) d = int.from_bytes(d_bytes, byteorder='big', signed=False)
x = int.from_bytes(x_bytes, byteorder='big', signed=False) x = int.from_bytes(x_bytes, byteorder='big', signed=False)
y = int.from_bytes(y_bytes, byteorder='big', signed=False) y = int.from_bytes(y_bytes, byteorder='big', signed=False)
@@ -65,7 +71,7 @@ class EccKey:
return cls(private_key) return cls(private_key)
@property @property
def x(self): def x(self) -> bytes:
return ( return (
self.private_key.public_key() self.private_key.public_key()
.public_numbers() .public_numbers()
@@ -73,14 +79,14 @@ class EccKey:
) )
@property @property
def y(self): def y(self) -> bytes:
return ( return (
self.private_key.public_key() self.private_key.public_key()
.public_numbers() .public_numbers()
.y.to_bytes(32, byteorder='big') .y.to_bytes(32, byteorder='big')
) )
def dh(self, public_key_x, public_key_y): def dh(self, public_key_x: bytes, public_key_y: bytes) -> bytes:
x = int.from_bytes(public_key_x, byteorder='big', signed=False) x = int.from_bytes(public_key_x, byteorder='big', signed=False)
y = int.from_bytes(public_key_y, byteorder='big', signed=False) y = int.from_bytes(public_key_y, byteorder='big', signed=False)
public_key = EllipticCurvePublicNumbers(x, y, SECP256R1()).public_key() public_key = EllipticCurvePublicNumbers(x, y, SECP256R1()).public_key()
@@ -93,14 +99,23 @@ class EccKey:
# Functions # Functions
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def xor(x, y): def xor(x: bytes, y: bytes) -> bytes:
assert len(x) == len(y) assert len(x) == len(y)
return bytes(map(operator.xor, x, y)) return bytes(map(operator.xor, x, y))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def r(): def reverse(input: bytes) -> bytes:
'''
Returns bytes of input in reversed endianness.
'''
return input[::-1]
# -----------------------------------------------------------------------------
def r() -> bytes:
''' '''
Generate 16 bytes of random data Generate 16 bytes of random data
''' '''
@@ -108,20 +123,20 @@ def r():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def e(key, data): def e(key: bytes, data: bytes) -> bytes:
''' '''
AES-128 ECB, expecting byte-swapped inputs and producing a byte-swapped output. AES-128 ECB, expecting byte-swapped inputs and producing a byte-swapped output.
See Bluetooth spec Vol 3, Part H - 2.2.1 Security function e See Bluetooth spec Vol 3, Part H - 2.2.1 Security function e
''' '''
cipher = Cipher(algorithms.AES(bytes(reversed(key))), modes.ECB()) cipher = Cipher(algorithms.AES(reverse(key)), modes.ECB())
encryptor = cipher.encryptor() encryptor = cipher.encryptor()
return bytes(reversed(encryptor.update(bytes(reversed(data))))) return reverse(encryptor.update(reverse(data)))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def ah(k, r): # pylint: disable=redefined-outer-name def ah(k: bytes, r: bytes) -> bytes: # pylint: disable=redefined-outer-name
''' '''
See Bluetooth spec Vol 3, Part H - 2.2.2 Random Address Hash function ah See Bluetooth spec Vol 3, Part H - 2.2.2 Random Address Hash function ah
''' '''
@@ -132,7 +147,16 @@ def ah(k, r): # pylint: disable=redefined-outer-name
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def c1(k, r, preq, pres, iat, rat, ia, ra): # pylint: disable=redefined-outer-name def c1(
k: bytes,
r: bytes,
preq: bytes,
pres: bytes,
iat: int,
rat: int,
ia: bytes,
ra: bytes,
) -> bytes: # pylint: disable=redefined-outer-name
''' '''
See Bluetooth spec, Vol 3, Part H - 2.2.3 Confirm value generation function c1 for See Bluetooth spec, Vol 3, Part H - 2.2.3 Confirm value generation function c1 for
LE Legacy Pairing LE Legacy Pairing
@@ -144,7 +168,7 @@ def c1(k, r, preq, pres, iat, rat, ia, ra): # pylint: disable=redefined-outer-n
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def s1(k, r1, r2): def s1(k: bytes, r1: bytes, r2: bytes) -> bytes:
''' '''
See Bluetooth spec, Vol 3, Part H - 2.2.4 Key generation function s1 for LE Legacy See Bluetooth spec, Vol 3, Part H - 2.2.4 Key generation function s1 for LE Legacy
Pairing Pairing
@@ -154,7 +178,7 @@ def s1(k, r1, r2):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def aes_cmac(m, k): def aes_cmac(m: bytes, k: bytes) -> bytes:
''' '''
See Bluetooth spec, Vol 3, Part H - 2.2.5 FunctionAES-CMAC See Bluetooth spec, Vol 3, Part H - 2.2.5 FunctionAES-CMAC
@@ -166,20 +190,16 @@ def aes_cmac(m, k):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def f4(u, v, x, z): def f4(u: bytes, v: bytes, x: bytes, z: bytes) -> bytes:
''' '''
See Bluetooth spec, Vol 3, Part H - 2.2.6 LE Secure Connections Confirm Value See Bluetooth spec, Vol 3, Part H - 2.2.6 LE Secure Connections Confirm Value
Generation Function f4 Generation Function f4
''' '''
return bytes( return reverse(aes_cmac(reverse(u) + reverse(v) + z, reverse(x)))
reversed(
aes_cmac(bytes(reversed(u)) + bytes(reversed(v)) + z, bytes(reversed(x)))
)
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def f5(w, n1, n2, a1, a2): def f5(w: bytes, n1: bytes, n2: bytes, a1: bytes, a2: bytes) -> Tuple[bytes, bytes]:
''' '''
See Bluetooth spec, Vol 3, Part H - 2.2.7 LE Secure Connections Key Generation See Bluetooth spec, Vol 3, Part H - 2.2.7 LE Secure Connections Key Generation
Function f5 Function f5
@@ -187,87 +207,83 @@ def f5(w, n1, n2, a1, a2):
NOTE: this returns a tuple: (MacKey, LTK) in little-endian byte order NOTE: this returns a tuple: (MacKey, LTK) in little-endian byte order
''' '''
salt = bytes.fromhex('6C888391AAF5A53860370BDB5A6083BE') salt = bytes.fromhex('6C888391AAF5A53860370BDB5A6083BE')
t = aes_cmac(bytes(reversed(w)), salt) t = aes_cmac(reverse(w), salt)
key_id = bytes([0x62, 0x74, 0x6C, 0x65]) key_id = bytes([0x62, 0x74, 0x6C, 0x65])
return ( return (
bytes( reverse(
reversed( aes_cmac(
aes_cmac( bytes([0])
bytes([0]) + key_id
+ key_id + reverse(n1)
+ bytes(reversed(n1)) + reverse(n2)
+ bytes(reversed(n2)) + reverse(a1)
+ bytes(reversed(a1)) + reverse(a2)
+ bytes(reversed(a2)) + bytes([1, 0]),
+ bytes([1, 0]), t,
t,
)
) )
), ),
bytes( reverse(
reversed( aes_cmac(
aes_cmac( bytes([1])
bytes([1]) + key_id
+ key_id + reverse(n1)
+ bytes(reversed(n1)) + reverse(n2)
+ bytes(reversed(n2)) + reverse(a1)
+ bytes(reversed(a1)) + reverse(a2)
+ bytes(reversed(a2)) + bytes([1, 0]),
+ bytes([1, 0]), t,
t,
)
) )
), ),
) )
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def f6(w, n1, n2, r, io_cap, a1, a2): # pylint: disable=redefined-outer-name def f6(
w: bytes, n1: bytes, n2: bytes, r: bytes, io_cap: bytes, a1: bytes, a2: bytes
) -> bytes: # pylint: disable=redefined-outer-name
''' '''
See Bluetooth spec, Vol 3, Part H - 2.2.8 LE Secure Connections Check Value See Bluetooth spec, Vol 3, Part H - 2.2.8 LE Secure Connections Check Value
Generation Function f6 Generation Function f6
''' '''
return bytes( return reverse(
reversed( aes_cmac(
aes_cmac( reverse(n1)
bytes(reversed(n1)) + reverse(n2)
+ bytes(reversed(n2)) + reverse(r)
+ bytes(reversed(r)) + reverse(io_cap)
+ bytes(reversed(io_cap)) + reverse(a1)
+ bytes(reversed(a1)) + reverse(a2),
+ bytes(reversed(a2)), reverse(w),
bytes(reversed(w)),
)
) )
) )
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def g2(u, v, x, y): def g2(u: bytes, v: bytes, x: bytes, y: bytes) -> int:
''' '''
See Bluetooth spec, Vol 3, Part H - 2.2.9 LE Secure Connections Numeric Comparison See Bluetooth spec, Vol 3, Part H - 2.2.9 LE Secure Connections Numeric Comparison
Value Generation Function g2 Value Generation Function g2
''' '''
return int.from_bytes( return int.from_bytes(
aes_cmac( aes_cmac(
bytes(reversed(u)) + bytes(reversed(v)) + bytes(reversed(y)), reverse(u) + reverse(v) + reverse(y),
bytes(reversed(x)), reverse(x),
)[-4:], )[-4:],
byteorder='big', byteorder='big',
) )
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def h6(w, key_id): def h6(w: bytes, key_id: bytes) -> bytes:
''' '''
See Bluetooth spec, Vol 3, Part H - 2.2.10 Link key conversion function h6 See Bluetooth spec, Vol 3, Part H - 2.2.10 Link key conversion function h6
''' '''
return aes_cmac(key_id, w) return reverse(aes_cmac(key_id, reverse(w)))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def h7(salt, w): def h7(salt: bytes, w: bytes) -> bytes:
''' '''
See Bluetooth spec, Vol 3, Part H - 2.2.11 Link key conversion function h7 See Bluetooth spec, Vol 3, Part H - 2.2.11 Link key conversion function h7
''' '''
return aes_cmac(w, salt) return reverse(aes_cmac(reverse(w), salt))

View File

@@ -187,8 +187,8 @@ SMP_KEYPRESS_AUTHREQ = 0b00010000
SMP_CT2_AUTHREQ = 0b00100000 SMP_CT2_AUTHREQ = 0b00100000
# Crypto salt # Crypto salt
SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('00000000000000000000000000000000746D7031') SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('000000000000000000000000746D7031')
SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('00000000000000000000000000000000746D7032') SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('000000000000000000000000746D7032')
# fmt: on # fmt: on
# pylint: enable=line-too-long # pylint: enable=line-too-long
@@ -579,7 +579,7 @@ class OobContext:
self.r = crypto.r() if r is None else r self.r = crypto.r() if r is None else r
def share(self) -> OobSharedData: def share(self) -> OobSharedData:
pkx = bytes(reversed(self.ecc_key.x)) pkx = self.ecc_key.x[::-1]
return OobSharedData(c=crypto.f4(pkx, pkx, self.r, bytes(1)), r=self.r) return OobSharedData(c=crypto.f4(pkx, pkx, self.r, bytes(1)), r=self.r)
@@ -677,6 +677,13 @@ class Session:
}, },
} }
ea: bytes
eb: bytes
ltk: bytes
preq: bytes
pres: bytes
tk: bytes
def __init__( def __init__(
self, self,
manager: Manager, manager: Manager,
@@ -686,15 +693,10 @@ class Session:
) -> None: ) -> None:
self.manager = manager self.manager = manager
self.connection = connection self.connection = connection
self.preq: Optional[bytes] = None
self.pres: Optional[bytes] = None
self.ea = None
self.eb = None
self.stk = None self.stk = None
self.ltk = None
self.ltk_ediv = 0 self.ltk_ediv = 0
self.ltk_rand = bytes(8) self.ltk_rand = bytes(8)
self.link_key = None self.link_key: Optional[bytes] = None
self.initiator_key_distribution: int = 0 self.initiator_key_distribution: int = 0
self.responder_key_distribution: int = 0 self.responder_key_distribution: int = 0
self.peer_random_value: Optional[bytes] = None self.peer_random_value: Optional[bytes] = None
@@ -787,9 +789,7 @@ class Session:
) )
self.r = pairing_config.oob.our_context.r self.r = pairing_config.oob.our_context.r
self.ecc_key = pairing_config.oob.our_context.ecc_key self.ecc_key = pairing_config.oob.our_context.ecc_key
if pairing_config.oob.legacy_context is None: if pairing_config.oob.legacy_context is not None:
self.tk = None
else:
self.tk = pairing_config.oob.legacy_context.tk self.tk = pairing_config.oob.legacy_context.tk
else: else:
if pairing_config.oob.legacy_context is None: if pairing_config.oob.legacy_context is None:
@@ -807,7 +807,7 @@ class Session:
@property @property
def pkx(self) -> Tuple[bytes, bytes]: def pkx(self) -> Tuple[bytes, bytes]:
return (bytes(reversed(self.ecc_key.x)), self.peer_public_key_x) return (self.ecc_key.x[::-1], self.peer_public_key_x)
@property @property
def pka(self) -> bytes: def pka(self) -> bytes:
@@ -1061,8 +1061,8 @@ class Session:
def send_public_key_command(self) -> None: def send_public_key_command(self) -> None:
self.send_command( self.send_command(
SMP_Pairing_Public_Key_Command( SMP_Pairing_Public_Key_Command(
public_key_x=bytes(reversed(self.ecc_key.x)), public_key_x=self.ecc_key.x[::-1],
public_key_y=bytes(reversed(self.ecc_key.y)), public_key_y=self.ecc_key.y[::-1],
) )
) )
@@ -1098,15 +1098,52 @@ class Session:
) )
) )
async def derive_ltk(self) -> None: @classmethod
link_key = await self.manager.device.get_link_key(self.connection.peer_address) def derive_ltk(cls, link_key: bytes, ct2: bool) -> bytes:
assert link_key is not None '''Derives Long Term Key from Link Key.
Args:
link_key: BR/EDR Link Key bytes in little-endian.
ct2: whether ct2 is supported on both devices.
Returns:
LE Long Tern Key bytes in little-endian.
'''
ilk = ( ilk = (
crypto.h7(salt=SMP_CTKD_H7_BRLE_SALT, w=link_key) crypto.h7(salt=SMP_CTKD_H7_BRLE_SALT, w=link_key)
if self.ct2 if ct2
else crypto.h6(link_key, b'tmp2') else crypto.h6(link_key, b'tmp2')
) )
self.ltk = crypto.h6(ilk, b'brle') return crypto.h6(ilk, b'brle')
@classmethod
def derive_link_key(cls, ltk: bytes, ct2: bool) -> bytes:
'''Derives Link Key from Long Term Key.
Args:
ltk: LE Long Term Key bytes in little-endian.
ct2: whether ct2 is supported on both devices.
Returns:
BR/EDR Link Key bytes in little-endian.
'''
ilk = (
crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=ltk)
if ct2
else crypto.h6(ltk, b'tmp1')
)
return crypto.h6(ilk, b'lebr')
async def get_link_key_and_derive_ltk(self) -> None:
'''Retrieves BR/EDR Link Key from storage and derive it to LE LTK.'''
link_key = await self.manager.device.get_link_key(self.connection.peer_address)
if link_key is None:
logging.warning(
'Try to derive LTK but host does not have the LK. Send a SMP_PAIRING_FAILED but the procedure will not be paused!'
)
self.send_pairing_failed(
SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR
)
else:
self.ltk = self.derive_ltk(link_key, self.ct2)
def distribute_keys(self) -> None: def distribute_keys(self) -> None:
# Distribute the keys as required # Distribute the keys as required
@@ -1117,7 +1154,7 @@ class Session:
and self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG and self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG
): ):
self.ctkd_task = self.connection.abort_on( self.ctkd_task = self.connection.abort_on(
'disconnection', self.derive_ltk() 'disconnection', self.get_link_key_and_derive_ltk()
) )
elif not self.sc: elif not self.sc:
# Distribute the LTK, EDIV and RAND # Distribute the LTK, EDIV and RAND
@@ -1147,12 +1184,7 @@ class Session:
# CTKD, calculate BR/EDR link key # CTKD, calculate BR/EDR link key
if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG: if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
ilk = ( self.link_key = self.derive_link_key(self.ltk, self.ct2)
crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=self.ltk)
if self.ct2
else crypto.h6(self.ltk, b'tmp1')
)
self.link_key = crypto.h6(ilk, b'lebr')
else: else:
# CTKD: Derive LTK from LinkKey # CTKD: Derive LTK from LinkKey
@@ -1161,7 +1193,7 @@ class Session:
and self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG and self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG
): ):
self.ctkd_task = self.connection.abort_on( self.ctkd_task = self.connection.abort_on(
'disconnection', self.derive_ltk() 'disconnection', self.get_link_key_and_derive_ltk()
) )
# Distribute the LTK, EDIV and RAND # Distribute the LTK, EDIV and RAND
elif not self.sc: elif not self.sc:
@@ -1191,12 +1223,7 @@ class Session:
# CTKD, calculate BR/EDR link key # CTKD, calculate BR/EDR link key
if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG: if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
ilk = ( self.link_key = self.derive_link_key(self.ltk, self.ct2)
crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=self.ltk)
if self.ct2
else crypto.h6(self.ltk, b'tmp1')
)
self.link_key = crypto.h6(ilk, b'lebr')
def compute_peer_expected_distributions(self, key_distribution_flags: int) -> None: def compute_peer_expected_distributions(self, key_distribution_flags: int) -> None:
# Set our expectations for what to wait for in the key distribution phase # Set our expectations for what to wait for in the key distribution phase
@@ -1754,14 +1781,10 @@ class Session:
self.peer_public_key_y = command.public_key_y self.peer_public_key_y = command.public_key_y
# Compute the DH key # Compute the DH key
self.dh_key = bytes( self.dh_key = self.ecc_key.dh(
reversed( command.public_key_x[::-1],
self.ecc_key.dh( command.public_key_y[::-1],
bytes(reversed(command.public_key_x)), )[::-1]
bytes(reversed(command.public_key_y)),
)
)
)
logger.debug(f'DH key: {self.dh_key.hex()}') logger.debug(f'DH key: {self.dh_key.hex()}')
if self.pairing_method == PairingMethod.OOB: if self.pairing_method == PairingMethod.OOB:
@@ -1824,7 +1847,6 @@ class Session:
else: else:
self.send_pairing_dhkey_check_command() self.send_pairing_dhkey_check_command()
else: else:
assert self.ltk
self.start_encryption(self.ltk) self.start_encryption(self.ltk)
def on_smp_pairing_failed_command( def on_smp_pairing_failed_command(
@@ -1874,6 +1896,7 @@ class Manager(EventEmitter):
sessions: Dict[int, Session] sessions: Dict[int, Session]
pairing_config_factory: Callable[[Connection], PairingConfig] pairing_config_factory: Callable[[Connection], PairingConfig]
session_proxy: Type[Session] session_proxy: Type[Session]
_ecc_key: Optional[crypto.EccKey]
def __init__( def __init__(
self, self,

View File

@@ -21,7 +21,7 @@ import logging
import os import os
import pytest import pytest
from unittest.mock import MagicMock, patch from unittest.mock import AsyncMock, MagicMock, patch
from bumble.controller import Controller from bumble.controller import Controller
from bumble.core import BT_BR_EDR_TRANSPORT, BT_PERIPHERAL_ROLE, BT_CENTRAL_ROLE from bumble.core import BT_BR_EDR_TRANSPORT, BT_PERIPHERAL_ROLE, BT_CENTRAL_ROLE
@@ -38,7 +38,6 @@ from bumble.smp import (
OobLegacyContext, OobLegacyContext,
) )
from bumble.core import ProtocolError from bumble.core import ProtocolError
from bumble.hci import HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE
from bumble.keys import PairingKeys from bumble.keys import PairingKeys
@@ -519,16 +518,8 @@ async def test_self_smp_over_classic():
# Mock connection # Mock connection
# TODO: Implement Classic SSP and encryption in link relayer # TODO: Implement Classic SSP and encryption in link relayer
LINK_KEY = bytes.fromhex('287ad379dca402530a39f1f43047b835') LINK_KEY = bytes.fromhex('287ad379dca402530a39f1f43047b835')
two_devices.devices[0].on_link_key( two_devices.devices[0].get_link_key = AsyncMock(return_value=LINK_KEY)
two_devices.devices[1].public_address, two_devices.devices[1].get_link_key = AsyncMock(return_value=LINK_KEY)
LINK_KEY,
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
)
two_devices.devices[1].on_link_key(
two_devices.devices[0].public_address,
LINK_KEY,
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
)
two_devices.connections[0].encryption = 1 two_devices.connections[0].encryption = 1
two_devices.connections[1].encryption = 1 two_devices.connections[1].encryption = 1

View File

@@ -16,6 +16,9 @@
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
import pytest
from bumble import smp
from bumble.crypto import EccKey, aes_cmac, ah, c1, f4, f5, f6, g2, h6, h7, s1 from bumble.crypto import EccKey, aes_cmac, ah, c1, f4, f5, f6, g2, h6, h7, s1
from bumble.pairing import OobData, OobSharedData, LeRole from bumble.pairing import OobData, OobSharedData, LeRole
from bumble.hci import Address from bumble.hci import Address
@@ -28,8 +31,8 @@ from bumble.core import AdvertisingData
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def reversed_hex(hex_str): def reversed_hex(hex_str: str) -> bytes:
return bytes(reversed(bytes.fromhex(hex_str))) return bytes.fromhex(hex_str)[::-1]
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -129,112 +132,79 @@ def test_aes_cmac():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_f4(): def test_f4():
u = bytes( u = reversed_hex(
reversed( '20b003d2 f297be2c 5e2c83a7 e9f9a5b9 eff49111 acf4fddb cc030148 0e359de6'
bytes.fromhex(
'20b003d2 f297be2c 5e2c83a7 e9f9a5b9'
+ 'eff49111 acf4fddb cc030148 0e359de6'
)
)
) )
v = bytes( v = reversed_hex(
reversed( '55188b3d 32f6bb9a 900afcfb eed4e72a 59cb9ac2 f19d7cfb 6b4fdd49 f47fc5fd'
bytes.fromhex(
'55188b3d 32f6bb9a 900afcfb eed4e72a'
+ '59cb9ac2 f19d7cfb 6b4fdd49 f47fc5fd'
)
)
) )
x = bytes(reversed(bytes.fromhex('d5cb8454 d177733e ffffb2ec 712baeab'))) x = reversed_hex('d5cb8454 d177733e ffffb2ec 712baeab')
z = bytes([0]) z = b'\0'
value = f4(u, v, x, z) value = f4(u, v, x, z)
assert bytes(reversed(value)) == bytes.fromhex( assert value == reversed_hex('f2c916f1 07a9bd1c f1eda1be a974872d')
'f2c916f1 07a9bd1c f1eda1be a974872d'
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_f5(): def test_f5():
w = bytes( w = reversed_hex(
reversed( 'ec0234a3 57c8ad05 341010a6 0a397d9b 99796b13 b4f866f1 868d34f3 73bfa698'
bytes.fromhex(
'ec0234a3 57c8ad05 341010a6 0a397d9b'
+ '99796b13 b4f866f1 868d34f3 73bfa698'
)
)
) )
n1 = bytes(reversed(bytes.fromhex('d5cb8454 d177733e ffffb2ec 712baeab'))) n1 = reversed_hex('d5cb8454 d177733e ffffb2ec 712baeab')
n2 = bytes(reversed(bytes.fromhex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf'))) n2 = reversed_hex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf')
a1 = bytes(reversed(bytes.fromhex('00561237 37bfce'))) a1 = reversed_hex('00561237 37bfce')
a2 = bytes(reversed(bytes.fromhex('00a71370 2dcfc1'))) a2 = reversed_hex('00a71370 2dcfc1')
value = f5(w, n1, n2, a1, a2) value = f5(w, n1, n2, a1, a2)
assert bytes(reversed(value[0])) == bytes.fromhex( assert value[0] == reversed_hex('2965f176 a1084a02 fd3f6a20 ce636e20')
'2965f176 a1084a02 fd3f6a20 ce636e20' assert value[1] == reversed_hex('69867911 69d7cd23 980522b5 94750a38')
)
assert bytes(reversed(value[1])) == bytes.fromhex(
'69867911 69d7cd23 980522b5 94750a38'
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_f6(): def test_f6():
n1 = bytes(reversed(bytes.fromhex('d5cb8454 d177733e ffffb2ec 712baeab'))) n1 = reversed_hex('d5cb8454 d177733e ffffb2ec 712baeab')
n2 = bytes(reversed(bytes.fromhex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf'))) n2 = reversed_hex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf')
mac_key = bytes(reversed(bytes.fromhex('2965f176 a1084a02 fd3f6a20 ce636e20'))) mac_key = reversed_hex('2965f176 a1084a02 fd3f6a20 ce636e20')
r = bytes(reversed(bytes.fromhex('12a3343b b453bb54 08da42d2 0c2d0fc8'))) r = reversed_hex('12a3343b b453bb54 08da42d2 0c2d0fc8')
io_cap = bytes(reversed(bytes.fromhex('010102'))) io_cap = reversed_hex('010102')
a1 = bytes(reversed(bytes.fromhex('00561237 37bfce'))) a1 = reversed_hex('00561237 37bfce')
a2 = bytes(reversed(bytes.fromhex('00a71370 2dcfc1'))) a2 = reversed_hex('00a71370 2dcfc1')
value = f6(mac_key, n1, n2, r, io_cap, a1, a2) value = f6(mac_key, n1, n2, r, io_cap, a1, a2)
assert bytes(reversed(value)) == bytes.fromhex( assert value == reversed_hex('e3c47398 9cd0e8c5 d26c0b09 da958f61')
'e3c47398 9cd0e8c5 d26c0b09 da958f61'
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_g2(): def test_g2():
u = bytes( u = reversed_hex(
reversed( '20b003d2 f297be2c 5e2c83a7 e9f9a5b9 eff49111 acf4fddb cc030148 0e359de6'
bytes.fromhex(
'20b003d2 f297be2c 5e2c83a7 e9f9a5b9'
+ 'eff49111 acf4fddb cc030148 0e359de6'
)
)
) )
v = bytes( v = reversed_hex(
reversed( '55188b3d 32f6bb9a 900afcfb eed4e72a 59cb9ac2 f19d7cfb 6b4fdd49 f47fc5fd'
bytes.fromhex(
'55188b3d 32f6bb9a 900afcfb eed4e72a'
+ '59cb9ac2 f19d7cfb 6b4fdd49 f47fc5fd'
)
)
) )
x = bytes(reversed(bytes.fromhex('d5cb8454 d177733e ffffb2ec 712baeab'))) x = reversed_hex('d5cb8454 d177733e ffffb2ec 712baeab')
y = bytes(reversed(bytes.fromhex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf'))) y = reversed_hex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf')
value = g2(u, v, x, y) value = g2(u, v, x, y)
assert value == 0x2F9ED5BA assert value == 0x2F9ED5BA
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_h6(): def test_h6():
KEY = bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b') KEY = reversed_hex('ec0234a3 57c8ad05 341010a6 0a397d9b')
KEY_ID = bytes.fromhex('6c656272') KEY_ID = bytes.fromhex('6c656272')
assert h6(KEY, KEY_ID) == bytes.fromhex('2d9ae102 e76dc91c e8d3a9e2 80b16399') assert h6(KEY, KEY_ID) == reversed_hex('2d9ae102 e76dc91c e8d3a9e2 80b16399')
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_h7(): def test_h7():
KEY = bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b') KEY = reversed_hex('ec0234a3 57c8ad05 341010a6 0a397d9b')
SALT = bytes.fromhex('00000000 00000000 00000000 746D7031') SALT = bytes.fromhex('00000000 00000000 00000000 746D7031')
assert h7(SALT, KEY) == bytes.fromhex('fb173597 c6a3c0ec d2998c2a 75a57011') assert h7(SALT, KEY) == reversed_hex('fb173597 c6a3c0ec d2998c2a 75a57011')
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_ah(): def test_ah():
irk = bytes(reversed(bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b'))) irk = reversed_hex('ec0234a3 57c8ad05 341010a6 0a397d9b')
prand = bytes(reversed(bytes.fromhex('708194'))) prand = reversed_hex('708194')
value = ah(irk, prand) value = ah(irk, prand)
expected = bytes(reversed(bytes.fromhex('0dfbaa'))) expected = reversed_hex('0dfbaa')
assert value == expected assert value == expected
@@ -243,7 +213,7 @@ def test_oob_data():
oob_data = OobData( oob_data = OobData(
address=Address("F0:F1:F2:F3:F4:F5"), address=Address("F0:F1:F2:F3:F4:F5"),
role=LeRole.BOTH_PERIPHERAL_PREFERRED, role=LeRole.BOTH_PERIPHERAL_PREFERRED,
shared_data=OobSharedData(c=bytes([1, 2]), r=bytes([3, 4])), shared_data=OobSharedData(c=b'12', r=b'34'),
) )
oob_data_ad = oob_data.to_ad() oob_data_ad = oob_data.to_ad()
oob_data_bytes = bytes(oob_data_ad) oob_data_bytes = bytes(oob_data_ad)
@@ -255,6 +225,32 @@ def test_oob_data():
assert oob_data_parsed.shared_data.r == oob_data.shared_data.r assert oob_data_parsed.shared_data.r == oob_data.shared_data.r
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'ct2, expected',
[
(False, 'bc1ca4ef 633fc1bd 0d8230af ee388fb0'),
(True, '287ad379 dca40253 0a39f1f4 3047b835'),
],
)
def test_ltk_to_link_key(ct2: bool, expected: str):
LTK = reversed_hex('368df9bc e3264b58 bd066c33 334fbf64')
assert smp.Session.derive_link_key(LTK, ct2) == reversed_hex(expected)
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'ct2, expected',
[
(False, 'a813fb72 f1a3dfa1 8a2c9a43 f10d0a30'),
(True, 'e85e09eb 5eccb3e2 69418a13 3211bc79'),
],
)
def test_link_key_to_ltk(ct2: bool, expected: str):
LINK_KEY = reversed_hex('05040302 01000908 07060504 03020100')
assert smp.Session.derive_ltk(LINK_KEY, ct2) == reversed_hex(expected)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
if __name__ == '__main__': if __name__ == '__main__':
test_ecc() test_ecc()