forked from auracaster/bumble_mirror
Since keystore.update() overwrites all existing keys, the existing link key will be wiped out. To avoid this, SMP also need to keep the key.
667 lines
22 KiB
Python
667 lines
22 KiB
Python
# Copyright 2021-2022 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# https://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Imports
|
|
# -----------------------------------------------------------------------------
|
|
import asyncio
|
|
import itertools
|
|
import logging
|
|
import os
|
|
import pytest
|
|
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
from bumble.controller import Controller
|
|
from bumble.core import BT_BR_EDR_TRANSPORT, BT_PERIPHERAL_ROLE, BT_CENTRAL_ROLE
|
|
from bumble.link import LocalLink
|
|
from bumble.device import Device, Peer
|
|
from bumble.host import Host
|
|
from bumble.gatt import Service, Characteristic
|
|
from bumble.transport import AsyncPipeSink
|
|
from bumble.pairing import PairingConfig, PairingDelegate
|
|
from bumble.smp import (
|
|
SMP_PAIRING_NOT_SUPPORTED_ERROR,
|
|
SMP_CONFIRM_VALUE_FAILED_ERROR,
|
|
OobContext,
|
|
OobLegacyContext,
|
|
)
|
|
from bumble.core import ProtocolError
|
|
from bumble.keys import PairingKeys
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Logging
|
|
# -----------------------------------------------------------------------------
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
class TwoDevices:
|
|
def __init__(self):
|
|
self.connections = [None, None]
|
|
|
|
addresses = ['F0:F1:F2:F3:F4:F5', 'F5:F4:F3:F2:F1:F0']
|
|
self.link = LocalLink()
|
|
self.controllers = [
|
|
Controller('C1', link=self.link, public_address=addresses[0]),
|
|
Controller('C2', link=self.link, public_address=addresses[1]),
|
|
]
|
|
self.devices = [
|
|
Device(
|
|
address=addresses[0],
|
|
host=Host(self.controllers[0], AsyncPipeSink(self.controllers[0])),
|
|
),
|
|
Device(
|
|
address=addresses[1],
|
|
host=Host(self.controllers[1], AsyncPipeSink(self.controllers[1])),
|
|
),
|
|
]
|
|
|
|
self.paired = [
|
|
asyncio.get_event_loop().create_future(),
|
|
asyncio.get_event_loop().create_future(),
|
|
]
|
|
|
|
def on_connection(self, which, connection):
|
|
self.connections[which] = connection
|
|
|
|
def on_paired(self, which: int, keys: PairingKeys):
|
|
self.paired[which].set_result(keys)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
@pytest.mark.asyncio
|
|
async def test_self_connection():
|
|
# Create two devices, each with a controller, attached to the same link
|
|
two_devices = TwoDevices()
|
|
|
|
# Attach listeners
|
|
two_devices.devices[0].on(
|
|
'connection', lambda connection: two_devices.on_connection(0, connection)
|
|
)
|
|
two_devices.devices[1].on(
|
|
'connection', lambda connection: two_devices.on_connection(1, connection)
|
|
)
|
|
|
|
# Start
|
|
await two_devices.devices[0].power_on()
|
|
await two_devices.devices[1].power_on()
|
|
|
|
# Connect the two devices
|
|
await two_devices.devices[0].connect(two_devices.devices[1].random_address)
|
|
|
|
# Check the post conditions
|
|
assert two_devices.connections[0] is not None
|
|
assert two_devices.connections[1] is not None
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize(
|
|
'responder_role,',
|
|
(BT_CENTRAL_ROLE, BT_PERIPHERAL_ROLE),
|
|
)
|
|
async def test_self_classic_connection(responder_role):
|
|
# Create two devices, each with a controller, attached to the same link
|
|
two_devices = TwoDevices()
|
|
|
|
# Attach listeners
|
|
two_devices.devices[0].on(
|
|
'connection', lambda connection: two_devices.on_connection(0, connection)
|
|
)
|
|
two_devices.devices[1].on(
|
|
'connection', lambda connection: two_devices.on_connection(1, connection)
|
|
)
|
|
|
|
# Enable Classic connections
|
|
two_devices.devices[0].classic_enabled = True
|
|
two_devices.devices[1].classic_enabled = True
|
|
|
|
# Start
|
|
await two_devices.devices[0].power_on()
|
|
await two_devices.devices[1].power_on()
|
|
|
|
# Connect the two devices
|
|
await asyncio.gather(
|
|
two_devices.devices[0].connect(
|
|
two_devices.devices[1].public_address, transport=BT_BR_EDR_TRANSPORT
|
|
),
|
|
two_devices.devices[1].accept(
|
|
two_devices.devices[0].public_address, responder_role
|
|
),
|
|
)
|
|
|
|
# Check the post conditions
|
|
assert two_devices.connections[0] is not None
|
|
assert two_devices.connections[1] is not None
|
|
|
|
# Check the role
|
|
assert two_devices.connections[0].role != responder_role
|
|
assert two_devices.connections[1].role == responder_role
|
|
|
|
# Role switch
|
|
await two_devices.connections[0].switch_role(responder_role)
|
|
|
|
# Check the role
|
|
assert two_devices.connections[0].role == responder_role
|
|
assert two_devices.connections[1].role != responder_role
|
|
|
|
await two_devices.connections[0].disconnect()
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
@pytest.mark.asyncio
|
|
async def test_self_gatt():
|
|
# Create two devices, each with a controller, attached to the same link
|
|
two_devices = TwoDevices()
|
|
|
|
# Add some GATT characteristics to device 1
|
|
c1 = Characteristic(
|
|
'3A143AD7-D4A7-436B-97D6-5B62C315E833',
|
|
Characteristic.Properties.READ,
|
|
Characteristic.READABLE,
|
|
bytes([1, 2, 3]),
|
|
)
|
|
c2 = Characteristic(
|
|
'9557CCE2-DB37-46EB-94C4-50AE5B9CB0F8',
|
|
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
|
|
Characteristic.READABLE | Characteristic.WRITEABLE,
|
|
bytes([4, 5, 6]),
|
|
)
|
|
c3 = Characteristic(
|
|
'84FC1A2E-C52D-4A2D-B8C3-8855BAB86638',
|
|
Characteristic.Properties.READ
|
|
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
|
|
Characteristic.READABLE | Characteristic.WRITEABLE,
|
|
bytes([7, 8, 9]),
|
|
)
|
|
c4 = Characteristic(
|
|
'84FC1A2E-C52D-4A2D-B8C3-8855BAB86638',
|
|
Characteristic.Properties.READ
|
|
| Characteristic.Properties.NOTIFY
|
|
| Characteristic.Properties.INDICATE,
|
|
Characteristic.READABLE,
|
|
bytes([1, 1, 1]),
|
|
)
|
|
|
|
s1 = Service('8140E247-04F0-42C1-BC34-534C344DAFCA', [c1, c2, c3])
|
|
s2 = Service('97210A0F-1875-4D05-9E5D-326EB171257A', [c4])
|
|
s3 = Service('1853', [])
|
|
s4 = Service('3A12C182-14E2-4FE0-8C5B-65D7C569F9DB', [], included_services=[s2, s3])
|
|
two_devices.devices[1].add_services([s1, s2, s4])
|
|
|
|
# Start
|
|
await two_devices.devices[0].power_on()
|
|
await two_devices.devices[1].power_on()
|
|
|
|
# Connect the two devices
|
|
connection = await two_devices.devices[0].connect(
|
|
two_devices.devices[1].random_address
|
|
)
|
|
peer = Peer(connection)
|
|
|
|
bogus_uuid = 'A0AA6007-0B48-4BBE-80AC-0DE9AAF541EA'
|
|
result = await peer.discover_services([bogus_uuid])
|
|
assert result == []
|
|
services = peer.get_services_by_uuid(bogus_uuid)
|
|
assert len(services) == 0
|
|
|
|
result = await peer.discover_service(s1.uuid)
|
|
assert len(result) == 1
|
|
services = peer.get_services_by_uuid(s1.uuid)
|
|
assert len(services) == 1
|
|
s = services[0]
|
|
assert services[0].uuid == s1.uuid
|
|
|
|
result = await peer.discover_characteristics([c1.uuid], s)
|
|
assert len(result) == 1
|
|
characteristics = peer.get_characteristics_by_uuid(c1.uuid)
|
|
assert len(characteristics) == 1
|
|
c = characteristics[0]
|
|
assert c.uuid == c1.uuid
|
|
result = await peer.read_value(c)
|
|
assert result is not None
|
|
assert result == c1.value
|
|
|
|
result = await peer.discover_service(s4.uuid)
|
|
assert len(result) == 1
|
|
result = await peer.discover_included_services(result[0])
|
|
assert len(result) == 2
|
|
# Service UUID is only present when the UUID is 16-bit Bluetooth UUID
|
|
assert result[1].uuid.to_bytes() == s3.uuid.to_bytes()
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
@pytest.mark.asyncio
|
|
async def test_self_gatt_long_read():
|
|
# Create two devices, each with a controller, attached to the same link
|
|
two_devices = TwoDevices()
|
|
|
|
# Add some GATT characteristics to device 1
|
|
characteristics = [
|
|
Characteristic(
|
|
f'3A143AD7-D4A7-436B-97D6-5B62C315{i:04X}',
|
|
Characteristic.Properties.READ,
|
|
Characteristic.READABLE,
|
|
bytes([x & 255 for x in range(i)]),
|
|
)
|
|
for i in range(0, 513)
|
|
]
|
|
|
|
service = Service('8140E247-04F0-42C1-BC34-534C344DAFCA', characteristics)
|
|
two_devices.devices[1].add_service(service)
|
|
|
|
# Start
|
|
await two_devices.devices[0].power_on()
|
|
await two_devices.devices[1].power_on()
|
|
|
|
# Connect the two devices
|
|
connection = await two_devices.devices[0].connect(
|
|
two_devices.devices[1].random_address
|
|
)
|
|
peer = Peer(connection)
|
|
|
|
result = await peer.discover_service(service.uuid)
|
|
assert len(result) == 1
|
|
found_service = result[0]
|
|
found_characteristics = await found_service.discover_characteristics()
|
|
assert len(found_characteristics) == 513
|
|
for i, characteristic in enumerate(found_characteristics):
|
|
value = await characteristic.read_value()
|
|
assert value == characteristics[i].value
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
async def _test_self_smp_with_configs(pairing_config1, pairing_config2):
|
|
# Create two devices, each with a controller, attached to the same link
|
|
two_devices = TwoDevices()
|
|
|
|
# Start
|
|
await two_devices.devices[0].power_on()
|
|
await two_devices.devices[1].power_on()
|
|
|
|
# Attach listeners
|
|
two_devices.devices[0].on(
|
|
'connection', lambda connection: two_devices.on_connection(0, connection)
|
|
)
|
|
two_devices.devices[1].on(
|
|
'connection', lambda connection: two_devices.on_connection(1, connection)
|
|
)
|
|
|
|
# Connect the two devices
|
|
connection = await two_devices.devices[0].connect(
|
|
two_devices.devices[1].random_address
|
|
)
|
|
assert not connection.is_encrypted
|
|
|
|
# Attach connection listeners
|
|
two_devices.connections[0].on(
|
|
'pairing', lambda keys: two_devices.on_paired(0, keys)
|
|
)
|
|
two_devices.connections[1].on(
|
|
'pairing', lambda keys: two_devices.on_paired(1, keys)
|
|
)
|
|
|
|
# Set up the pairing configs
|
|
if pairing_config1:
|
|
two_devices.devices[
|
|
0
|
|
].pairing_config_factory = lambda connection: pairing_config1
|
|
if pairing_config2:
|
|
two_devices.devices[
|
|
1
|
|
].pairing_config_factory = lambda connection: pairing_config2
|
|
|
|
# Pair
|
|
await two_devices.devices[0].pair(connection)
|
|
assert connection.is_encrypted
|
|
assert await two_devices.paired[0] is not None
|
|
assert await two_devices.paired[1] is not None
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
IO_CAP = [
|
|
PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT,
|
|
PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY,
|
|
PairingDelegate.IoCapability.DISPLAY_OUTPUT_ONLY,
|
|
PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
|
|
PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
|
|
]
|
|
SC = [False, True]
|
|
MITM = [False, True]
|
|
# Key distribution is a 4-bit bitmask
|
|
KEY_DIST = range(16)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize(
|
|
'io_caps, sc, mitm, key_dist',
|
|
itertools.chain(
|
|
itertools.product([IO_CAP], SC, MITM, [15]),
|
|
itertools.product(
|
|
[[PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT]],
|
|
SC,
|
|
MITM,
|
|
KEY_DIST,
|
|
),
|
|
),
|
|
)
|
|
async def test_self_smp(io_caps, sc, mitm, key_dist):
|
|
class Delegate(PairingDelegate):
|
|
def __init__(
|
|
self,
|
|
name,
|
|
io_capability,
|
|
local_initiator_key_distribution,
|
|
local_responder_key_distribution,
|
|
):
|
|
super().__init__(
|
|
io_capability,
|
|
local_initiator_key_distribution,
|
|
local_responder_key_distribution,
|
|
)
|
|
self.name = name
|
|
self.reset()
|
|
|
|
def reset(self):
|
|
self.peer_delegate = None
|
|
self.number = asyncio.get_running_loop().create_future()
|
|
|
|
# pylint: disable-next=unused-argument
|
|
async def compare_numbers(self, number, digits):
|
|
if self.peer_delegate is None:
|
|
logger.warning(f'[{self.name}] no peer delegate')
|
|
return False
|
|
await self.display_number(number, digits=6)
|
|
logger.debug(f'[{self.name}] waiting for peer number')
|
|
peer_number = await self.peer_delegate.number
|
|
logger.debug(f'[{self.name}] comparing numbers: {number} and {peer_number}')
|
|
return number == peer_number
|
|
|
|
async def get_number(self):
|
|
if self.peer_delegate is None:
|
|
logger.warning(f'[{self.name}] no peer delegate')
|
|
return 0
|
|
else:
|
|
if (
|
|
self.peer_delegate.io_capability
|
|
== PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY
|
|
):
|
|
peer_number = 6789
|
|
else:
|
|
logger.debug(f'[{self.name}] waiting for peer number')
|
|
peer_number = await self.peer_delegate.number
|
|
logger.debug(f'[{self.name}] returning number: {peer_number}')
|
|
return peer_number
|
|
|
|
async def display_number(self, number, digits):
|
|
logger.debug(f'[{self.name}] displaying number: {number}')
|
|
self.number.set_result(number)
|
|
|
|
def __str__(self):
|
|
return f'Delegate(name={self.name}, io_capability={self.io_capability})'
|
|
|
|
pairing_config_sets = [('Initiator', [None]), ('Responder', [None])]
|
|
for pairing_config_set in pairing_config_sets:
|
|
for io_cap in io_caps:
|
|
delegate = Delegate(pairing_config_set[0], io_cap, key_dist, key_dist)
|
|
pairing_config_set[1].append(PairingConfig(sc, mitm, True, delegate))
|
|
|
|
for pairing_config1 in pairing_config_sets[0][1]:
|
|
for pairing_config2 in pairing_config_sets[1][1]:
|
|
logger.info(
|
|
f'########## self_smp with {pairing_config1} and {pairing_config2}'
|
|
)
|
|
if pairing_config1:
|
|
pairing_config1.delegate.reset()
|
|
if pairing_config2:
|
|
pairing_config2.delegate.reset()
|
|
if pairing_config1 and pairing_config2:
|
|
pairing_config1.delegate.peer_delegate = pairing_config2.delegate
|
|
pairing_config2.delegate.peer_delegate = pairing_config1.delegate
|
|
|
|
await _test_self_smp_with_configs(pairing_config1, pairing_config2)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
@pytest.mark.asyncio
|
|
async def test_self_smp_reject():
|
|
class RejectingDelegate(PairingDelegate):
|
|
def __init__(self):
|
|
super().__init__(PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT)
|
|
|
|
async def accept(self):
|
|
return False
|
|
|
|
rejecting_pairing_config = PairingConfig(delegate=RejectingDelegate())
|
|
paired = False
|
|
try:
|
|
await _test_self_smp_with_configs(None, rejecting_pairing_config)
|
|
paired = True
|
|
except ProtocolError as error:
|
|
assert error.error_code == SMP_PAIRING_NOT_SUPPORTED_ERROR
|
|
|
|
assert not paired
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
@pytest.mark.asyncio
|
|
async def test_self_smp_wrong_pin():
|
|
class WrongPinDelegate(PairingDelegate):
|
|
def __init__(self):
|
|
super().__init__(
|
|
PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT
|
|
)
|
|
|
|
async def compare_numbers(self, number, digits):
|
|
return False
|
|
|
|
wrong_pin_pairing_config = PairingConfig(mitm=True, delegate=WrongPinDelegate())
|
|
paired = False
|
|
try:
|
|
await _test_self_smp_with_configs(
|
|
wrong_pin_pairing_config, wrong_pin_pairing_config
|
|
)
|
|
paired = True
|
|
except ProtocolError as error:
|
|
assert error.error_code == SMP_CONFIRM_VALUE_FAILED_ERROR
|
|
|
|
assert not paired
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
@pytest.mark.asyncio
|
|
async def test_self_smp_over_classic():
|
|
# Create two devices, each with a controller, attached to the same link
|
|
two_devices = TwoDevices()
|
|
|
|
# Attach listeners
|
|
two_devices.devices[0].on(
|
|
'connection', lambda connection: two_devices.on_connection(0, connection)
|
|
)
|
|
two_devices.devices[1].on(
|
|
'connection', lambda connection: two_devices.on_connection(1, connection)
|
|
)
|
|
|
|
# Enable Classic connections
|
|
two_devices.devices[0].classic_enabled = True
|
|
two_devices.devices[1].classic_enabled = True
|
|
|
|
# Start
|
|
await two_devices.devices[0].power_on()
|
|
await two_devices.devices[1].power_on()
|
|
|
|
# Connect the two devices
|
|
await asyncio.gather(
|
|
two_devices.devices[0].connect(
|
|
two_devices.devices[1].public_address, transport=BT_BR_EDR_TRANSPORT
|
|
),
|
|
two_devices.devices[1].accept(two_devices.devices[0].public_address),
|
|
)
|
|
|
|
# Check the post conditions
|
|
assert two_devices.connections[0] is not None
|
|
assert two_devices.connections[1] is not None
|
|
|
|
# Mock connection
|
|
# TODO: Implement Classic SSP and encryption in link relayer
|
|
LINK_KEY = bytes.fromhex('287ad379dca402530a39f1f43047b835')
|
|
two_devices.devices[0].get_link_key = AsyncMock(return_value=LINK_KEY)
|
|
two_devices.devices[1].get_link_key = AsyncMock(return_value=LINK_KEY)
|
|
two_devices.connections[0].encryption = 1
|
|
two_devices.connections[1].encryption = 1
|
|
|
|
two_devices.connections[0].on(
|
|
'pairing', lambda keys: two_devices.on_paired(0, keys)
|
|
)
|
|
two_devices.connections[1].on(
|
|
'pairing', lambda keys: two_devices.on_paired(1, keys)
|
|
)
|
|
|
|
# Mock SMP
|
|
with patch('bumble.smp.Session', spec=True) as MockSmpSession:
|
|
MockSmpSession.send_pairing_confirm_command = MagicMock()
|
|
MockSmpSession.send_pairing_dhkey_check_command = MagicMock()
|
|
MockSmpSession.send_public_key_command = MagicMock()
|
|
MockSmpSession.send_pairing_random_command = MagicMock()
|
|
|
|
# Start CTKD
|
|
await two_devices.connections[0].pair()
|
|
await asyncio.gather(*two_devices.paired)
|
|
|
|
# Phase 2 commands should not be invoked
|
|
MockSmpSession.send_pairing_confirm_command.assert_not_called()
|
|
MockSmpSession.send_pairing_dhkey_check_command.assert_not_called()
|
|
MockSmpSession.send_public_key_command.assert_not_called()
|
|
MockSmpSession.send_pairing_random_command.assert_not_called()
|
|
|
|
for i in range(2):
|
|
assert (
|
|
await two_devices.devices[i].keystore.get(
|
|
str(two_devices.connections[i].peer_address)
|
|
)
|
|
).link_key
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
@pytest.mark.asyncio
|
|
async def test_self_smp_public_address():
|
|
pairing_config = PairingConfig(
|
|
mitm=True,
|
|
sc=True,
|
|
bonding=True,
|
|
identity_address_type=PairingConfig.AddressType.PUBLIC,
|
|
delegate=PairingDelegate(
|
|
PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
|
|
PairingDelegate.KeyDistribution.DISTRIBUTE_ENCRYPTION_KEY
|
|
| PairingDelegate.KeyDistribution.DISTRIBUTE_IDENTITY_KEY
|
|
| PairingDelegate.KeyDistribution.DISTRIBUTE_SIGNING_KEY
|
|
| PairingDelegate.KeyDistribution.DISTRIBUTE_LINK_KEY,
|
|
),
|
|
)
|
|
|
|
await _test_self_smp_with_configs(pairing_config, pairing_config)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
@pytest.mark.asyncio
|
|
async def test_self_smp_oob_sc():
|
|
oob_context_1 = OobContext()
|
|
oob_context_2 = OobContext()
|
|
|
|
pairing_config_1 = PairingConfig(
|
|
mitm=True,
|
|
sc=True,
|
|
bonding=True,
|
|
oob=PairingConfig.OobConfig(oob_context_1, oob_context_2.share(), None),
|
|
)
|
|
|
|
pairing_config_2 = PairingConfig(
|
|
mitm=True,
|
|
sc=True,
|
|
bonding=True,
|
|
oob=PairingConfig.OobConfig(oob_context_2, oob_context_1.share(), None),
|
|
)
|
|
|
|
await _test_self_smp_with_configs(pairing_config_1, pairing_config_2)
|
|
|
|
pairing_config_3 = PairingConfig(
|
|
mitm=True,
|
|
sc=True,
|
|
bonding=True,
|
|
oob=PairingConfig.OobConfig(oob_context_2, None, None),
|
|
)
|
|
|
|
await _test_self_smp_with_configs(pairing_config_1, pairing_config_3)
|
|
await _test_self_smp_with_configs(pairing_config_3, pairing_config_1)
|
|
|
|
pairing_config_4 = PairingConfig(
|
|
mitm=True,
|
|
sc=True,
|
|
bonding=True,
|
|
oob=PairingConfig.OobConfig(oob_context_2, oob_context_2.share(), None),
|
|
)
|
|
|
|
with pytest.raises(ProtocolError) as error:
|
|
await _test_self_smp_with_configs(pairing_config_1, pairing_config_4)
|
|
assert error.value.error_code == SMP_CONFIRM_VALUE_FAILED_ERROR
|
|
|
|
with pytest.raises(ProtocolError):
|
|
await _test_self_smp_with_configs(pairing_config_4, pairing_config_1)
|
|
assert error.value.error_code == SMP_CONFIRM_VALUE_FAILED_ERROR
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
@pytest.mark.asyncio
|
|
async def test_self_smp_oob_legacy():
|
|
legacy_context = OobLegacyContext()
|
|
|
|
pairing_config_1 = PairingConfig(
|
|
mitm=True,
|
|
sc=False,
|
|
bonding=True,
|
|
oob=PairingConfig.OobConfig(None, None, legacy_context),
|
|
)
|
|
|
|
pairing_config_2 = PairingConfig(
|
|
mitm=True,
|
|
sc=True,
|
|
bonding=True,
|
|
oob=PairingConfig.OobConfig(OobContext(), None, legacy_context),
|
|
)
|
|
|
|
await _test_self_smp_with_configs(pairing_config_1, pairing_config_2)
|
|
await _test_self_smp_with_configs(pairing_config_2, pairing_config_1)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
async def run_test_self():
|
|
await test_self_connection()
|
|
await test_self_gatt()
|
|
await test_self_gatt_long_read()
|
|
await test_self_smp()
|
|
await test_self_smp_reject()
|
|
await test_self_smp_wrong_pin()
|
|
await test_self_smp_over_classic()
|
|
await test_self_smp_public_address()
|
|
await test_self_smp_oob_sc()
|
|
await test_self_smp_oob_legacy()
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
if __name__ == '__main__':
|
|
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
|
|
asyncio.run(run_test_self())
|