diff --git a/bumble/smp.py b/bumble/smp.py index 91ca1be5..dbb2b7a6 100644 --- a/bumble/smp.py +++ b/bumble/smp.py @@ -459,9 +459,17 @@ class PairingDelegate: DISPLAY_OUTPUT_ONLY = SMP_DISPLAY_ONLY_IO_CAPABILITY DISPLAY_OUTPUT_AND_YES_NO_INPUT = SMP_DISPLAY_YES_NO_IO_CAPABILITY DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = SMP_KEYBOARD_DISPLAY_IO_CAPABILITY + DEFAULT_KEY_DISTRIBUTION = (SMP_ENC_KEY_DISTRIBUTION_FLAG | SMP_ID_KEY_DISTRIBUTION_FLAG) - def __init__(self, io_capability = NO_OUTPUT_NO_INPUT): + def __init__( + self, + io_capability=NO_OUTPUT_NO_INPUT, + local_initiator_key_distribution=DEFAULT_KEY_DISTRIBUTION, + local_responder_key_distribution=DEFAULT_KEY_DISTRIBUTION + ): self.io_capability = io_capability + self.local_initiator_key_distribution = local_initiator_key_distribution + self.local_responder_key_distribution = local_responder_key_distribution async def accept(self): return True @@ -475,6 +483,14 @@ class PairingDelegate: async def display_number(self, number, digits=6): pass + async def key_distribution_response(self, peer_initiator_key_distribution, peer_responder_key_distribution): + return ( + (peer_initiator_key_distribution & + self.local_initiator_key_distribution), + (peer_responder_key_distribution & + self.local_responder_key_distribution) + ) + # ----------------------------------------------------------------------------- class PairingConfig: @@ -599,11 +615,8 @@ class Session: self.pairing_result = None # Key Distribution (default values before negotiation) - self.initiator_key_distribution = ( - SMP_ENC_KEY_DISTRIBUTION_FLAG | - SMP_ID_KEY_DISTRIBUTION_FLAG # |SMP_SIGN_KEY_DISTRIBUTION_FLAG - ) - self.responder_key_distribution = self.initiator_key_distribution + self.initiator_key_distribution = pairing_config.delegate.local_initiator_key_distribution + self.responder_key_distribution = pairing_config.delegate.local_responder_key_distribution # Authentication Requirements Flags - Vol 3, Part H, Figure 3.3 self.bonding = pairing_config.bonding @@ -873,17 +886,15 @@ class Session: self.send_command(SMP_Encryption_Information_Command(long_term_key=self.ltk)) self.send_command(SMP_Master_Identification_Command(ediv=self.ltk_ediv, rand=self.ltk_rand)) - # Distribute IRK + # Distribute IRK & BD ADDR if self.initiator_key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG: self.send_command( SMP_Identity_Information_Command(identity_resolving_key=self.manager.device.irk) ) - - # Distribute BD ADDR - self.send_command(SMP_Identity_Address_Information_Command( - addr_type = self.manager.address.address_type, - bd_addr = self.manager.address - )) + self.send_command(SMP_Identity_Address_Information_Command( + addr_type = self.manager.address.address_type, + bd_addr = self.manager.address + )) # Distribute CSRK csrk = bytes(16) # FIXME: testing @@ -898,25 +909,21 @@ class Session: self.link_key = crypto.h6(ilk, b'lebr') else: - # Distribute the LTK + # Distribute the LTK, EDIV and RAND if not self.sc: if self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG: self.send_command(SMP_Encryption_Information_Command(long_term_key=self.ltk)) + self.send_command(SMP_Master_Identification_Command(ediv=self.ltk_ediv, rand=self.ltk_rand)) - # Distribute EDIV and RAND - self.send_command(SMP_Master_Identification_Command(ediv=self.ltk_ediv, rand=self.ltk_rand)) - - # Distribute IRK + # Distribute IRK & BD ADDR if self.responder_key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG: self.send_command( SMP_Identity_Information_Command(identity_resolving_key=self.manager.device.irk) ) - - # Distribute BD ADDR - self.send_command(SMP_Identity_Address_Information_Command( - addr_type = self.manager.address.address_type, - bd_addr = self.manager.address - )) + self.send_command(SMP_Identity_Address_Information_Command( + addr_type = self.manager.address.address_type, + bd_addr = self.manager.address + )) # Distribute CSRK csrk = bytes(16) # FIXME: testing @@ -963,7 +970,7 @@ class Session: # Nothing left to expect, we're done self.on_pairing() else: - logger.warn(color('!!! unexpected key distribution command', 'red')) + logger.warn(color(f'!!! unexpected key distribution command: {command_class.__name__}', 'red')) self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR) async def pair(self): @@ -1115,8 +1122,8 @@ class Session: logger.debug(f'pairing method: {self.PAIRING_METHOD_NAMES[self.pairing_method]}') # Key distribution - self.initiator_key_distribution &= command.initiator_key_distribution - self.responder_key_distribution &= command.responder_key_distribution + self.initiator_key_distribution, self.responder_key_distribution = await self.pairing_config.delegate.key_distribution_response( + command.initiator_key_distribution, command.responder_key_distribution) self.compute_peer_expected_distributions(self.initiator_key_distribution) # The pairing is now starting diff --git a/tests/self_test.py b/tests/self_test.py index 0b3762a0..a1d2ddda 100644 --- a/tests/self_test.py +++ b/tests/self_test.py @@ -16,6 +16,7 @@ # Imports # ----------------------------------------------------------------------------- import asyncio +import itertools import logging import os import pytest @@ -30,7 +31,8 @@ from bumble.smp import ( PairingConfig, PairingDelegate, SMP_PAIRING_NOT_SUPPORTED_ERROR, - SMP_CONFIRM_VALUE_FAILED_ERROR + SMP_CONFIRM_VALUE_FAILED_ERROR, + SMP_ID_KEY_DISTRIBUTION_FLAG, ) from bumble.core import ProtocolError @@ -196,11 +198,28 @@ async def _test_self_smp_with_configs(pairing_config1, pairing_config2): # ----------------------------------------------------------------------------- +IO_CAP = [ + PairingDelegate.NO_OUTPUT_NO_INPUT, + PairingDelegate.KEYBOARD_INPUT_ONLY, + PairingDelegate.DISPLAY_OUTPUT_ONLY, + PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT, + PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT +] +SC = [False, True] +MITM = [False, True] +# Key distribution is a 4-bit bitmask +# IdKey is necessary for current SMP structure +KEY_DIST = [i for i in range(16) if (i & SMP_ID_KEY_DISTRIBUTION_FLAG)] + @pytest.mark.asyncio -async def test_self_smp(): +@pytest.mark.parametrize('io_cap, sc, mitm, key_dist', + itertools.product(IO_CAP, SC, MITM, KEY_DIST) +) +async def test_self_smp(io_cap, sc, mitm, key_dist): class Delegate(PairingDelegate): - def __init__(self, name, io_capability): - super().__init__(io_capability) + def __init__(self, name, io_capability, local_initiator_key_distribution, local_responder_key_distribution): + super().__init__(io_capability, local_initiator_key_distribution, + local_responder_key_distribution) self.name = name self.reset() @@ -240,17 +259,8 @@ async def test_self_smp(): pairing_config_sets = [('Initiator', [None]), ('Responder', [None])] for pairing_config_set in pairing_config_sets: - for io_capability in [ - PairingDelegate.NO_OUTPUT_NO_INPUT, - PairingDelegate.KEYBOARD_INPUT_ONLY, - PairingDelegate.DISPLAY_OUTPUT_ONLY, - PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT, - PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT - ]: - for sc in [False, True]: - for mitm in [False, True]: - delegate = Delegate(pairing_config_set[0], io_capability) - pairing_config_set[1].append(PairingConfig(sc, mitm, True, delegate)) + delegate = Delegate(pairing_config_set[0], io_cap, key_dist, key_dist) + pairing_config_set[1].append(PairingConfig(sc, mitm, True, delegate)) for pairing_config1 in pairing_config_sets[0][1]: for pairing_config2 in pairing_config_sets[1][1]: @@ -262,7 +272,9 @@ async def test_self_smp(): if pairing_config1 and pairing_config2: pairing_config1.delegate.peer_delegate = pairing_config2.delegate pairing_config2.delegate.peer_delegate = pairing_config1.delegate + await _test_self_smp_with_configs(pairing_config1, pairing_config2) + # -----------------------------------------------------------------------------