forked from auracaster/bumble_mirror
SMP: Do not send phase 2 commands in CTKD
This commit is contained in:
+9
-1
@@ -559,6 +559,7 @@ class PairingMethod(enum.IntEnum):
|
||||
NUMERIC_COMPARISON = 1
|
||||
PASSKEY = 2
|
||||
OOB = 3
|
||||
CTKD_OVER_CLASSIC = 4
|
||||
|
||||
def __str__(self) -> str:
|
||||
return {
|
||||
@@ -566,6 +567,7 @@ class PairingMethod(enum.IntEnum):
|
||||
PairingMethod.NUMERIC_COMPARISON: 'NUMERIC_COMPARISON',
|
||||
PairingMethod.PASSKEY: 'PASSKEY',
|
||||
PairingMethod.OOB: 'OOB',
|
||||
PairingMethod.CTKD_OVER_CLASSIC: 'CTKD_OVER_CLASSIC',
|
||||
}[self]
|
||||
|
||||
|
||||
@@ -777,6 +779,9 @@ class Session:
|
||||
def decide_pairing_method(
|
||||
self, auth_req: int, initiator_io_capability: int, responder_io_capability: int
|
||||
) -> None:
|
||||
if self.connection.transport == BT_BR_EDR_TRANSPORT:
|
||||
self.pairing_method = PairingMethod.CTKD_OVER_CLASSIC
|
||||
return
|
||||
if (not self.mitm) and (auth_req & SMP_MITM_AUTHREQ == 0):
|
||||
self.pairing_method = PairingMethod.JUST_WORKS
|
||||
return
|
||||
@@ -1414,7 +1419,10 @@ class Session:
|
||||
self.compute_peer_expected_distributions(self.responder_key_distribution)
|
||||
|
||||
# Start phase 2
|
||||
if self.sc:
|
||||
if self.pairing_method == PairingMethod.CTKD_OVER_CLASSIC:
|
||||
# Authentication is already done in SMP, so remote shall start keys distribution immediately
|
||||
return
|
||||
elif self.sc:
|
||||
if self.pairing_method == PairingMethod.PASSKEY:
|
||||
self.display_or_input_passkey()
|
||||
|
||||
|
||||
@@ -21,6 +21,8 @@ import logging
|
||||
import os
|
||||
import pytest
|
||||
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from bumble.controller import Controller
|
||||
from bumble.core import BT_BR_EDR_TRANSPORT, BT_PERIPHERAL_ROLE, BT_CENTRAL_ROLE
|
||||
from bumble.link import LocalLink
|
||||
@@ -34,6 +36,8 @@ from bumble.smp import (
|
||||
SMP_CONFIRM_VALUE_FAILED_ERROR,
|
||||
)
|
||||
from bumble.core import ProtocolError
|
||||
from bumble.hci import HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE
|
||||
from bumble.keys import PairingKeys
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -473,6 +477,86 @@ async def test_self_smp_wrong_pin():
|
||||
assert not paired
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_self_smp_over_classic():
|
||||
# Create two devices, each with a controller, attached to the same link
|
||||
two_devices = TwoDevices()
|
||||
|
||||
# Attach listeners
|
||||
two_devices.devices[0].on(
|
||||
'connection', lambda connection: two_devices.on_connection(0, connection)
|
||||
)
|
||||
two_devices.devices[1].on(
|
||||
'connection', lambda connection: two_devices.on_connection(1, connection)
|
||||
)
|
||||
|
||||
# Enable Classic connections
|
||||
two_devices.devices[0].classic_enabled = True
|
||||
two_devices.devices[1].classic_enabled = True
|
||||
|
||||
# Start
|
||||
await two_devices.devices[0].power_on()
|
||||
await two_devices.devices[1].power_on()
|
||||
|
||||
# Connect the two devices
|
||||
await asyncio.gather(
|
||||
two_devices.devices[0].connect(
|
||||
two_devices.devices[1].public_address, transport=BT_BR_EDR_TRANSPORT
|
||||
),
|
||||
two_devices.devices[1].accept(two_devices.devices[0].public_address),
|
||||
)
|
||||
|
||||
# Check the post conditions
|
||||
assert two_devices.connections[0] is not None
|
||||
assert two_devices.connections[1] is not None
|
||||
|
||||
# Mock connection
|
||||
# TODO: Implement Classic SSP and encryption in link relayer
|
||||
LINK_KEY = bytes.fromhex('287ad379dca402530a39f1f43047b835')
|
||||
two_devices.devices[0].on_link_key(
|
||||
two_devices.devices[1].public_address,
|
||||
LINK_KEY,
|
||||
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
|
||||
)
|
||||
two_devices.devices[1].on_link_key(
|
||||
two_devices.devices[0].public_address,
|
||||
LINK_KEY,
|
||||
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
|
||||
)
|
||||
two_devices.connections[0].encryption = 1
|
||||
two_devices.connections[1].encryption = 1
|
||||
|
||||
paired = [
|
||||
asyncio.get_event_loop().create_future(),
|
||||
asyncio.get_event_loop().create_future(),
|
||||
]
|
||||
|
||||
def on_pairing(which: int, keys: PairingKeys):
|
||||
paired[which].set_result(keys)
|
||||
|
||||
two_devices.connections[0].on('pairing', lambda keys: on_pairing(0, keys))
|
||||
two_devices.connections[1].on('pairing', lambda keys: on_pairing(1, keys))
|
||||
|
||||
# Mock SMP
|
||||
from bumble.smp import Session as SmpSession
|
||||
|
||||
SmpSession.send_pairing_confirm_command = MagicMock()
|
||||
SmpSession.send_pairing_dhkey_check_command = MagicMock()
|
||||
SmpSession.send_public_key_command = MagicMock()
|
||||
SmpSession.send_pairing_random_command = MagicMock()
|
||||
|
||||
# Start CTKD
|
||||
await two_devices.connections[0].pair()
|
||||
await asyncio.gather(*paired)
|
||||
|
||||
# Phase 2 commands should not be invoked
|
||||
SmpSession.send_pairing_confirm_command.assert_not_called()
|
||||
SmpSession.send_pairing_dhkey_check_command.assert_not_called()
|
||||
SmpSession.send_public_key_command.assert_not_called()
|
||||
SmpSession.send_pairing_random_command.assert_not_called()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def run_test_self():
|
||||
await test_self_connection()
|
||||
@@ -481,6 +565,7 @@ async def run_test_self():
|
||||
await test_self_smp()
|
||||
await test_self_smp_reject()
|
||||
await test_self_smp_wrong_pin()
|
||||
await test_self_smp_over_classic()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user