diff --git a/bumble/crypto.py b/bumble/crypto.py index 1462e7ff..4f134765 100644 --- a/bumble/crypto.py +++ b/bumble/crypto.py @@ -227,3 +227,17 @@ def g2(u, v, x, y): aes_cmac(bytes(reversed(u)) + bytes(reversed(v)) + bytes(reversed(y)), bytes(reversed(x)))[-4:], byteorder='big' ) + +# ----------------------------------------------------------------------------- +def h6(w, key_id): + ''' + See Bluetooth spec, Vol 3, Part H - 2.2.10 Link key conversion function h6 + ''' + return aes_cmac(key_id, w) + +# ----------------------------------------------------------------------------- +def h7(salt, w): + ''' + See Bluetooth spec, Vol 3, Part H - 2.2.11 Link key conversion function h7 + ''' + return aes_cmac(w, salt) diff --git a/bumble/smp.py b/bumble/smp.py index c25e0146..dbb2b7a6 100644 --- a/bumble/smp.py +++ b/bumble/smp.py @@ -150,6 +150,8 @@ SMP_SC_AUTHREQ = 0b00001000 SMP_KEYPRESS_AUTHREQ = 0b00010000 SMP_CT2_AUTHREQ = 0b00100000 +# Crypto salt +SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('00000000000000000000000000000000746D7031') # ----------------------------------------------------------------------------- # Utils @@ -457,9 +459,17 @@ class PairingDelegate: DISPLAY_OUTPUT_ONLY = SMP_DISPLAY_ONLY_IO_CAPABILITY DISPLAY_OUTPUT_AND_YES_NO_INPUT = SMP_DISPLAY_YES_NO_IO_CAPABILITY DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = SMP_KEYBOARD_DISPLAY_IO_CAPABILITY + DEFAULT_KEY_DISTRIBUTION = (SMP_ENC_KEY_DISTRIBUTION_FLAG | SMP_ID_KEY_DISTRIBUTION_FLAG) - def __init__(self, io_capability = NO_OUTPUT_NO_INPUT): + def __init__( + self, + io_capability=NO_OUTPUT_NO_INPUT, + local_initiator_key_distribution=DEFAULT_KEY_DISTRIBUTION, + local_responder_key_distribution=DEFAULT_KEY_DISTRIBUTION + ): self.io_capability = io_capability + self.local_initiator_key_distribution = local_initiator_key_distribution + self.local_responder_key_distribution = local_responder_key_distribution async def accept(self): return True @@ -473,6 +483,14 @@ class PairingDelegate: async def display_number(self, number, digits=6): pass + async def key_distribution_response(self, peer_initiator_key_distribution, peer_responder_key_distribution): + return ( + (peer_initiator_key_distribution & + self.local_initiator_key_distribution), + (peer_responder_key_distribution & + self.local_responder_key_distribution) + ) + # ----------------------------------------------------------------------------- class PairingConfig: @@ -559,6 +577,7 @@ class Session: self.ltk = None self.ltk_ediv = 0 self.ltk_rand = bytes(8) + self.link_key = None self.initiator_key_distribution = 0 self.responder_key_distribution = 0 self.peer_random_value = None @@ -596,11 +615,8 @@ class Session: self.pairing_result = None # Key Distribution (default values before negotiation) - self.initiator_key_distribution = ( - SMP_ENC_KEY_DISTRIBUTION_FLAG | - SMP_ID_KEY_DISTRIBUTION_FLAG # |SMP_SIGN_KEY_DISTRIBUTION_FLAG - ) - self.responder_key_distribution = self.initiator_key_distribution + self.initiator_key_distribution = pairing_config.delegate.local_initiator_key_distribution + self.responder_key_distribution = pairing_config.delegate.local_responder_key_distribution # Authentication Requirements Flags - Vol 3, Part H, Figure 3.3 self.bonding = pairing_config.bonding @@ -870,47 +886,56 @@ class Session: self.send_command(SMP_Encryption_Information_Command(long_term_key=self.ltk)) self.send_command(SMP_Master_Identification_Command(ediv=self.ltk_ediv, rand=self.ltk_rand)) - # Distribute IRK + # Distribute IRK & BD ADDR if self.initiator_key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG: self.send_command( SMP_Identity_Information_Command(identity_resolving_key=self.manager.device.irk) ) - - # Distribute BD ADDR - self.send_command(SMP_Identity_Address_Information_Command( - addr_type = self.manager.address.address_type, - bd_addr = self.manager.address - )) + self.send_command(SMP_Identity_Address_Information_Command( + addr_type = self.manager.address.address_type, + bd_addr = self.manager.address + )) # Distribute CSRK csrk = bytes(16) # FIXME: testing if self.initiator_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG: self.send_command(SMP_Signing_Information_Command(signature_key=csrk)) + + # CTKD, calculate BR/EDR link key + if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG: + ilk = crypto.h7( + salt=SMP_CTKD_H7_LEBR_SALT, + w=self.ltk) if self.ct2 else crypto.h6(self.ltk, b'tmp1') + self.link_key = crypto.h6(ilk, b'lebr') + else: - # Distribute the LTK + # Distribute the LTK, EDIV and RAND if not self.sc: if self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG: self.send_command(SMP_Encryption_Information_Command(long_term_key=self.ltk)) + self.send_command(SMP_Master_Identification_Command(ediv=self.ltk_ediv, rand=self.ltk_rand)) - # Distribute EDIV and RAND - self.send_command(SMP_Master_Identification_Command(ediv=self.ltk_ediv, rand=self.ltk_rand)) - - # Distribute IRK + # Distribute IRK & BD ADDR if self.responder_key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG: self.send_command( SMP_Identity_Information_Command(identity_resolving_key=self.manager.device.irk) ) - - # Distribute BD ADDR - self.send_command(SMP_Identity_Address_Information_Command( - addr_type = self.manager.address.address_type, - bd_addr = self.manager.address - )) + self.send_command(SMP_Identity_Address_Information_Command( + addr_type = self.manager.address.address_type, + bd_addr = self.manager.address + )) # Distribute CSRK csrk = bytes(16) # FIXME: testing if self.responder_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG: self.send_command(SMP_Signing_Information_Command(signature_key=csrk)) + + # CTKD, calculate BR/EDR link key + if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG: + ilk = crypto.h7( + salt=SMP_CTKD_H7_LEBR_SALT, + w=self.ltk) if self.ct2 else crypto.h6(self.ltk, b'tmp1') + self.link_key = crypto.h6(ilk, b'lebr') def compute_peer_expected_distributions(self, key_distribution_flags): # Set our expectations for what to wait for in the key distribution phase @@ -945,7 +970,7 @@ class Session: # Nothing left to expect, we're done self.on_pairing() else: - logger.warn(color('!!! unexpected key distribution command', 'red')) + logger.warn(color(f'!!! unexpected key distribution command: {command_class.__name__}', 'red')) self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR) async def pair(self): @@ -1029,6 +1054,11 @@ class Session: value = self.peer_signature_key, authenticated = authenticated ) + if self.link_key is not None: + keys.link_key = PairingKeys.Key( + value = self.link_key, + authenticated = authenticated + ) self.manager.on_pairing(self, peer_address, keys) @@ -1076,6 +1106,7 @@ class Session: # Bonding and SC require both sides to request/support it self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0) self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0) + self.ct2 = self.ct2 and (command.auth_req & SMP_CT2_AUTHREQ != 0) # Check for OOB if command.oob_data_flag != 0: @@ -1091,8 +1122,8 @@ class Session: logger.debug(f'pairing method: {self.PAIRING_METHOD_NAMES[self.pairing_method]}') # Key distribution - self.initiator_key_distribution &= command.initiator_key_distribution - self.responder_key_distribution &= command.responder_key_distribution + self.initiator_key_distribution, self.responder_key_distribution = await self.pairing_config.delegate.key_distribution_response( + command.initiator_key_distribution, command.responder_key_distribution) self.compute_peer_expected_distributions(self.initiator_key_distribution) # The pairing is now starting diff --git a/tests/self_test.py b/tests/self_test.py index 0b3762a0..a1d2ddda 100644 --- a/tests/self_test.py +++ b/tests/self_test.py @@ -16,6 +16,7 @@ # Imports # ----------------------------------------------------------------------------- import asyncio +import itertools import logging import os import pytest @@ -30,7 +31,8 @@ from bumble.smp import ( PairingConfig, PairingDelegate, SMP_PAIRING_NOT_SUPPORTED_ERROR, - SMP_CONFIRM_VALUE_FAILED_ERROR + SMP_CONFIRM_VALUE_FAILED_ERROR, + SMP_ID_KEY_DISTRIBUTION_FLAG, ) from bumble.core import ProtocolError @@ -196,11 +198,28 @@ async def _test_self_smp_with_configs(pairing_config1, pairing_config2): # ----------------------------------------------------------------------------- +IO_CAP = [ + PairingDelegate.NO_OUTPUT_NO_INPUT, + PairingDelegate.KEYBOARD_INPUT_ONLY, + PairingDelegate.DISPLAY_OUTPUT_ONLY, + PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT, + PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT +] +SC = [False, True] +MITM = [False, True] +# Key distribution is a 4-bit bitmask +# IdKey is necessary for current SMP structure +KEY_DIST = [i for i in range(16) if (i & SMP_ID_KEY_DISTRIBUTION_FLAG)] + @pytest.mark.asyncio -async def test_self_smp(): +@pytest.mark.parametrize('io_cap, sc, mitm, key_dist', + itertools.product(IO_CAP, SC, MITM, KEY_DIST) +) +async def test_self_smp(io_cap, sc, mitm, key_dist): class Delegate(PairingDelegate): - def __init__(self, name, io_capability): - super().__init__(io_capability) + def __init__(self, name, io_capability, local_initiator_key_distribution, local_responder_key_distribution): + super().__init__(io_capability, local_initiator_key_distribution, + local_responder_key_distribution) self.name = name self.reset() @@ -240,17 +259,8 @@ async def test_self_smp(): pairing_config_sets = [('Initiator', [None]), ('Responder', [None])] for pairing_config_set in pairing_config_sets: - for io_capability in [ - PairingDelegate.NO_OUTPUT_NO_INPUT, - PairingDelegate.KEYBOARD_INPUT_ONLY, - PairingDelegate.DISPLAY_OUTPUT_ONLY, - PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT, - PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT - ]: - for sc in [False, True]: - for mitm in [False, True]: - delegate = Delegate(pairing_config_set[0], io_capability) - pairing_config_set[1].append(PairingConfig(sc, mitm, True, delegate)) + delegate = Delegate(pairing_config_set[0], io_cap, key_dist, key_dist) + pairing_config_set[1].append(PairingConfig(sc, mitm, True, delegate)) for pairing_config1 in pairing_config_sets[0][1]: for pairing_config2 in pairing_config_sets[1][1]: @@ -262,7 +272,9 @@ async def test_self_smp(): if pairing_config1 and pairing_config2: pairing_config1.delegate.peer_delegate = pairing_config2.delegate pairing_config2.delegate.peer_delegate = pairing_config1.delegate + await _test_self_smp_with_configs(pairing_config1, pairing_config2) + # ----------------------------------------------------------------------------- diff --git a/tests/smp_test.py b/tests/smp_test.py index 771fbc80..9120c477 100644 --- a/tests/smp_test.py +++ b/tests/smp_test.py @@ -176,6 +176,20 @@ def test_g2(): assert(value == 0x2f9ed5ba) +# ----------------------------------------------------------------------------- +def test_h6(): + KEY = bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b') + KEY_ID = bytes.fromhex('6c656272') + assert(h6(KEY, KEY_ID) == bytes.fromhex('2d9ae102 e76dc91c e8d3a9e2 80b16399')) + + +# ----------------------------------------------------------------------------- +def test_h7(): + KEY = bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b') + SALT = bytes.fromhex('00000000 00000000 00000000 746D7031') + assert(h7(SALT, KEY) == bytes.fromhex('fb173597 c6a3c0ec d2998c2a 75a57011')) + + # ----------------------------------------------------------------------------- def test_ah(): irk = bytes(reversed(bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b'))) @@ -195,4 +209,6 @@ if __name__ == '__main__': test_f5() test_f6() test_g2() + test_h6() + test_h7() test_ah()