From f6c0bd88d787da5a39075afe0dc02bc5c63d3d51 Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Tue, 25 Jul 2023 16:35:17 +0800 Subject: [PATCH] SMP: Do not send phase 2 commands in CTKD --- bumble/smp.py | 10 +++++- tests/self_test.py | 85 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 1 deletion(-) diff --git a/bumble/smp.py b/bumble/smp.py index aa438c6..65206bd 100644 --- a/bumble/smp.py +++ b/bumble/smp.py @@ -559,6 +559,7 @@ class PairingMethod(enum.IntEnum): NUMERIC_COMPARISON = 1 PASSKEY = 2 OOB = 3 + CTKD_OVER_CLASSIC = 4 def __str__(self) -> str: return { @@ -566,6 +567,7 @@ class PairingMethod(enum.IntEnum): PairingMethod.NUMERIC_COMPARISON: 'NUMERIC_COMPARISON', PairingMethod.PASSKEY: 'PASSKEY', PairingMethod.OOB: 'OOB', + PairingMethod.CTKD_OVER_CLASSIC: 'CTKD_OVER_CLASSIC', }[self] @@ -777,6 +779,9 @@ class Session: def decide_pairing_method( self, auth_req: int, initiator_io_capability: int, responder_io_capability: int ) -> None: + if self.connection.transport == BT_BR_EDR_TRANSPORT: + self.pairing_method = PairingMethod.CTKD_OVER_CLASSIC + return if (not self.mitm) and (auth_req & SMP_MITM_AUTHREQ == 0): self.pairing_method = PairingMethod.JUST_WORKS return @@ -1414,7 +1419,10 @@ class Session: self.compute_peer_expected_distributions(self.responder_key_distribution) # Start phase 2 - if self.sc: + if self.pairing_method == PairingMethod.CTKD_OVER_CLASSIC: + # Authentication is already done in SMP, so remote shall start keys distribution immediately + return + elif self.sc: if self.pairing_method == PairingMethod.PASSKEY: self.display_or_input_passkey() diff --git a/tests/self_test.py b/tests/self_test.py index 1a1a474..d7e15e4 100644 --- a/tests/self_test.py +++ b/tests/self_test.py @@ -21,6 +21,8 @@ import logging import os import pytest +from unittest.mock import MagicMock + from bumble.controller import Controller from bumble.core import BT_BR_EDR_TRANSPORT, BT_PERIPHERAL_ROLE, BT_CENTRAL_ROLE from bumble.link import LocalLink @@ -34,6 +36,8 @@ from bumble.smp import ( SMP_CONFIRM_VALUE_FAILED_ERROR, ) from bumble.core import ProtocolError +from bumble.hci import HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE +from bumble.keys import PairingKeys # ----------------------------------------------------------------------------- @@ -473,6 +477,86 @@ async def test_self_smp_wrong_pin(): assert not paired +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_self_smp_over_classic(): + # Create two devices, each with a controller, attached to the same link + two_devices = TwoDevices() + + # Attach listeners + two_devices.devices[0].on( + 'connection', lambda connection: two_devices.on_connection(0, connection) + ) + two_devices.devices[1].on( + 'connection', lambda connection: two_devices.on_connection(1, connection) + ) + + # Enable Classic connections + two_devices.devices[0].classic_enabled = True + two_devices.devices[1].classic_enabled = True + + # Start + await two_devices.devices[0].power_on() + await two_devices.devices[1].power_on() + + # Connect the two devices + await asyncio.gather( + two_devices.devices[0].connect( + two_devices.devices[1].public_address, transport=BT_BR_EDR_TRANSPORT + ), + two_devices.devices[1].accept(two_devices.devices[0].public_address), + ) + + # Check the post conditions + assert two_devices.connections[0] is not None + assert two_devices.connections[1] is not None + + # Mock connection + # TODO: Implement Classic SSP and encryption in link relayer + LINK_KEY = bytes.fromhex('287ad379dca402530a39f1f43047b835') + two_devices.devices[0].on_link_key( + two_devices.devices[1].public_address, + LINK_KEY, + HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE, + ) + two_devices.devices[1].on_link_key( + two_devices.devices[0].public_address, + LINK_KEY, + HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE, + ) + two_devices.connections[0].encryption = 1 + two_devices.connections[1].encryption = 1 + + paired = [ + asyncio.get_event_loop().create_future(), + asyncio.get_event_loop().create_future(), + ] + + def on_pairing(which: int, keys: PairingKeys): + paired[which].set_result(keys) + + two_devices.connections[0].on('pairing', lambda keys: on_pairing(0, keys)) + two_devices.connections[1].on('pairing', lambda keys: on_pairing(1, keys)) + + # Mock SMP + from bumble.smp import Session as SmpSession + + SmpSession.send_pairing_confirm_command = MagicMock() + SmpSession.send_pairing_dhkey_check_command = MagicMock() + SmpSession.send_public_key_command = MagicMock() + SmpSession.send_pairing_random_command = MagicMock() + + # Start CTKD + await two_devices.connections[0].pair() + await asyncio.gather(*paired) + + # Phase 2 commands should not be invoked + SmpSession.send_pairing_confirm_command.assert_not_called() + SmpSession.send_pairing_dhkey_check_command.assert_not_called() + SmpSession.send_public_key_command.assert_not_called() + SmpSession.send_pairing_random_command.assert_not_called() + + # ----------------------------------------------------------------------------- async def run_test_self(): await test_self_connection() @@ -481,6 +565,7 @@ async def run_test_self(): await test_self_smp() await test_self_smp_reject() await test_self_smp_wrong_pin() + await test_self_smp_over_classic() # -----------------------------------------------------------------------------