refactor PairingDelegate

This commit is contained in:
Gilles Boccon-Gibod
2023-04-11 20:25:54 -07:00
parent bf3e05ef91
commit 3442358dea
6 changed files with 301 additions and 172 deletions

View File

@@ -24,7 +24,7 @@ from prompt_toolkit.shortcuts import PromptSession
from bumble.colors import color from bumble.colors import color
from bumble.device import Device, Peer from bumble.device import Device, Peer
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
from bumble.smp import PairingDelegate, PairingConfig from bumble.pairing import PairingDelegate, PairingConfig
from bumble.smp import error_name as smp_error_name from bumble.smp import error_name as smp_error_name
from bumble.keys import JsonKeyStore from bumble.keys import JsonKeyStore
from bumble.core import ProtocolError from bumble.core import ProtocolError
@@ -345,8 +345,13 @@ async def pair(
print(color(f'Pairing failed: {error}', 'red')) print(color(f'Pairing failed: {error}', 'red'))
return return
else: else:
if mode == 'le':
# Advertise so that peers can find us and connect # Advertise so that peers can find us and connect
await device.start_advertising(auto_restart=True) await device.start_advertising(auto_restart=True)
else:
# Become discoverable and connectable
await device.set_discoverable(True)
await device.set_connectable(True)
# Run until the user asks to exit # Run until the user asks to exit
await Waiter.instance.wait_until_terminated() await Waiter.instance.wait_until_terminated()

View File

@@ -29,11 +29,13 @@ from .colors import color
from .att import ATT_CID, ATT_DEFAULT_MTU, ATT_PDU from .att import ATT_CID, ATT_DEFAULT_MTU, ATT_PDU
from .gatt import Characteristic, Descriptor, Service from .gatt import Characteristic, Descriptor, Service
from .hci import ( from .hci import (
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE,
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
HCI_CENTRAL_ROLE, HCI_CENTRAL_ROLE,
HCI_COMMAND_STATUS_PENDING, HCI_COMMAND_STATUS_PENDING,
HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR, HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR,
HCI_DISPLAY_ONLY_IO_CAPABILITY,
HCI_DISPLAY_YES_NO_IO_CAPABILITY, HCI_DISPLAY_YES_NO_IO_CAPABILITY,
HCI_DISPLAY_ONLY_IO_CAPABILITY,
HCI_EXTENDED_INQUIRY_MODE, HCI_EXTENDED_INQUIRY_MODE,
HCI_GENERAL_INQUIRY_LAP, HCI_GENERAL_INQUIRY_LAP,
HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR, HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR,
@@ -141,6 +143,7 @@ from .keys import (
KeyStore, KeyStore,
PairingKeys, PairingKeys,
) )
from .pairing import PairingConfig
from . import gatt_client from . import gatt_client
from . import gatt_server from . import gatt_server
from . import smp from . import smp
@@ -198,6 +201,7 @@ DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS = l2cap.L2CAP_LE_CREDIT_BASED_CONN
# Classes # Classes
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class Advertisement: class Advertisement:
address: Address address: Address
@@ -530,6 +534,8 @@ class Connection(CompositeEventEmitter):
sc: bool sc: bool
link_key_type: int link_key_type: int
gatt_client: gatt_client.Client gatt_client: gatt_client.Client
pairing_peer_io_capability: Optional[int]
pairing_peer_authentication_requirements: Optional[int]
@composite_listener @composite_listener
class Listener: class Listener:
@@ -593,6 +599,8 @@ class Connection(CompositeEventEmitter):
self.gatt_server = ( self.gatt_server = (
device.gatt_server device.gatt_server
) # By default, use the device's shared server ) # By default, use the device's shared server
self.pairing_peer_io_capability = None
self.pairing_peer_authentication_requirements = None
# [Classic only] # [Classic only]
@classmethod @classmethod
@@ -1049,7 +1057,10 @@ class Device(CompositeEventEmitter):
self.random_address = address self.random_address = address
# Setup SMP # Setup SMP
self.smp_manager = smp.Manager(self) self.smp_manager = smp.Manager(
self, pairing_config_factory=lambda connection: PairingConfig()
)
self.l2cap_channel_manager.register_fixed_channel(smp.SMP_CID, self.on_smp_pdu) self.l2cap_channel_manager.register_fixed_channel(smp.SMP_CID, self.on_smp_pdu)
# Register the SDP server with the L2CAP Channel Manager # Register the SDP server with the L2CAP Channel Manager
@@ -1240,7 +1251,7 @@ class Device(CompositeEventEmitter):
await self.send_command(HCI_LE_Clear_Resolving_List_Command()) # type: ignore[call-arg] await self.send_command(HCI_LE_Clear_Resolving_List_Command()) # type: ignore[call-arg]
resolving_keys = await self.keystore.get_resolving_keys() resolving_keys = await self.keystore.get_resolving_keys()
for (irk, address) in resolving_keys: for irk, address in resolving_keys:
await self.send_command( await self.send_command(
HCI_LE_Add_Device_To_Resolving_List_Command( HCI_LE_Add_Device_To_Resolving_List_Command(
peer_identity_address_type=address.address_type, peer_identity_address_type=address.address_type,
@@ -2228,8 +2239,9 @@ class Device(CompositeEventEmitter):
if keys is not None: if keys is not None:
logger.debug('found keys in the key store') logger.debug('found keys in the key store')
if keys.link_key is None: if keys.link_key is None:
logger.debug('no link key') logger.warning('no link key')
return None return None
return keys.link_key.value return keys.link_key.value
# [Classic only] # [Classic only]
@@ -2434,8 +2446,14 @@ class Device(CompositeEventEmitter):
def on_link_key(self, bd_addr, link_key, key_type): def on_link_key(self, bd_addr, link_key, key_type):
# Store the keys in the key store # Store the keys in the key store
if self.keystore: if self.keystore:
authenticated = key_type in (
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE,
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
)
pairing_keys = PairingKeys() pairing_keys = PairingKeys()
pairing_keys.link_key = PairingKeys.Key(value=link_key) pairing_keys.link_key = PairingKeys.Key(
value=link_key, authenticated=authenticated
)
async def store_keys(): async def store_keys():
try: try:
@@ -2701,7 +2719,7 @@ class Device(CompositeEventEmitter):
# On Secure Simple Pairing complete, in case: # On Secure Simple Pairing complete, in case:
# - Connection isn't already authenticated # - Connection isn't already authenticated
# - AND we are not the initiator of the authentication # - AND we are not the initiator of the authentication
# We must trigger authentication to known if we are truly authenticated # We must trigger authentication to know if we are truly authenticated
if not connection.authenticating and not connection.authenticated: if not connection.authenticating and not connection.authenticated:
logger.debug( logger.debug(
f'*** Trigger Connection Authentication: [0x{connection.handle:04X}] ' f'*** Trigger Connection Authentication: [0x{connection.handle:04X}] '
@@ -2716,22 +2734,6 @@ class Device(CompositeEventEmitter):
# Ask what the pairing config should be for this connection # Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection) pairing_config = self.pairing_config_factory(connection)
# Map the SMP IO capability to a Classic IO capability
# pylint: disable=line-too-long
io_capability = {
smp.SMP_DISPLAY_ONLY_IO_CAPABILITY: HCI_DISPLAY_ONLY_IO_CAPABILITY,
smp.SMP_DISPLAY_YES_NO_IO_CAPABILITY: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY: HCI_KEYBOARD_ONLY_IO_CAPABILITY,
smp.SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
}.get(pairing_config.delegate.io_capability)
if io_capability is None:
logger.warning(
f'cannot map IO capability ({pairing_config.delegate.io_capability}'
)
io_capability = HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
# Compute the authentication requirements # Compute the authentication requirements
authentication_requirements = ( authentication_requirements = (
# No Bonding # No Bonding
@@ -2750,53 +2752,50 @@ class Device(CompositeEventEmitter):
self.host.send_command_sync( self.host.send_command_sync(
HCI_IO_Capability_Request_Reply_Command( HCI_IO_Capability_Request_Reply_Command(
bd_addr=connection.peer_address, bd_addr=connection.peer_address,
io_capability=io_capability, io_capability=pairing_config.delegate.classic_io_capability,
oob_data_present=0x00, # Not present oob_data_present=0x00, # Not present
authentication_requirements=authentication_requirements, authentication_requirements=authentication_requirements,
) )
) )
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_authentication_io_capability_response(
self, connection, io_capability, authentication_requirements
):
connection.peer_pairing_io_capability = io_capability
connection.peer_pairing_authentication_requirements = (
authentication_requirements
)
# [Classic only] # [Classic only]
@host_event_handler @host_event_handler
@with_connection_from_address @with_connection_from_address
def on_authentication_user_confirmation_request(self, connection, code): def on_authentication_user_confirmation_request(self, connection, code):
# Ask what the pairing config should be for this connection # Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection) pairing_config = self.pairing_config_factory(connection)
io_capability = pairing_config.delegate.classic_io_capability
can_compare = pairing_config.delegate.io_capability not in (
smp.SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
smp.SMP_DISPLAY_ONLY_IO_CAPABILITY,
)
# Respond # Respond
if can_compare: if io_capability == HCI_DISPLAY_YES_NO_IO_CAPABILITY:
if connection.peer_pairing_io_capability in (
async def compare_numbers(): HCI_DISPLAY_YES_NO_IO_CAPABILITY,
numbers_match = await connection.abort_on( HCI_DISPLAY_ONLY_IO_CAPABILITY,
'disconnection', ):
pairing_config.delegate.compare_numbers(code, digits=6), # Display the code and ask the user to compare
) async def prompt():
if numbers_match: return (
await self.host.send_command( await pairing_config.delegate.compare_numbers(code, digits=6),
HCI_User_Confirmation_Request_Reply_Command(
bd_addr=connection.peer_address
)
)
else:
await self.host.send_command(
HCI_User_Confirmation_Request_Negative_Reply_Command(
bd_addr=connection.peer_address
)
) )
asyncio.create_task(compare_numbers())
else: else:
# Ask the user to confirm the pairing, without showing a code
async def prompt():
return await pairing_config.delegate.confirm()
async def confirm(): async def confirm():
confirm = await connection.abort_on( if await prompt():
'disconnection', pairing_config.delegate.confirm()
)
if confirm:
await self.host.send_command( await self.host.send_command(
HCI_User_Confirmation_Request_Reply_Command( HCI_User_Confirmation_Request_Reply_Command(
bd_addr=connection.peer_address bd_addr=connection.peer_address
@@ -2809,7 +2808,17 @@ class Device(CompositeEventEmitter):
) )
) )
asyncio.create_task(confirm()) AsyncRunner.spawn(confirm())
return
if io_capability == HCI_DISPLAY_ONLY_IO_CAPABILITY:
# Display the code to the user
AsyncRunner.spawn(pairing_config.delegate.display_number(code, 6))
# Automatic confirmation
self.host.send_command_sync(
HCI_User_Confirmation_Request_Reply_Command(bd_addr=connection.peer_address)
)
# [Classic only] # [Classic only]
@host_event_handler @host_event_handler
@@ -2817,15 +2826,11 @@ class Device(CompositeEventEmitter):
def on_authentication_user_passkey_request(self, connection): def on_authentication_user_passkey_request(self, connection):
# Ask what the pairing config should be for this connection # Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection) pairing_config = self.pairing_config_factory(connection)
io_capability = pairing_config.delegate.classic_io_capability
can_input = pairing_config.delegate.io_capability in (
smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY,
smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY,
)
# Respond # Respond
if can_input: if io_capability == HCI_KEYBOARD_ONLY_IO_CAPABILITY:
# Ask the user to input a number
async def get_number(): async def get_number():
number = await connection.abort_on( number = await connection.abort_on(
'disconnection', pairing_config.delegate.get_number() 'disconnection', pairing_config.delegate.get_number()
@@ -2855,18 +2860,14 @@ class Device(CompositeEventEmitter):
@host_event_handler @host_event_handler
@with_connection_from_address @with_connection_from_address
def on_pin_code_request(self, connection): def on_pin_code_request(self, connection):
# classic legacy pairing # Classic legacy pairing
# Ask what the pairing config should be for this connection # Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection) pairing_config = self.pairing_config_factory(connection)
io_capability = pairing_config.delegate.classic_io_capability
can_input = pairing_config.delegate.io_capability in ( # Respond
smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY, if io_capability == HCI_KEYBOARD_ONLY_IO_CAPABILITY:
smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY, # Ask the user to enter a string
)
# respond the pin code
if can_input:
async def get_pin_code(): async def get_pin_code():
pin_code = await connection.abort_on( pin_code = await connection.abort_on(
'disconnection', pairing_config.delegate.get_string(16) 'disconnection', pairing_config.delegate.get_string(16)
@@ -2906,6 +2907,7 @@ class Device(CompositeEventEmitter):
# Ask what the pairing config should be for this connection # Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection) pairing_config = self.pairing_config_factory(connection)
# Show the passkey to the user
connection.abort_on( connection.abort_on(
'disconnection', pairing_config.delegate.display_number(passkey) 'disconnection', pairing_config.delegate.display_number(passkey)
) )

View File

@@ -395,8 +395,8 @@ class Host(AbortableEventEmitter):
def supports_command(self, command): def supports_command(self, command):
# Find the support flag position for this command # Find the support flag position for this command
for (octet, flags) in enumerate(HCI_SUPPORTED_COMMANDS_FLAGS): for octet, flags in enumerate(HCI_SUPPORTED_COMMANDS_FLAGS):
for (flag_position, value) in enumerate(flags): for flag_position, value in enumerate(flags):
if value == command: if value == command:
# Check if the flag is set # Check if the flag is set
if octet < len(self.local_supported_commands) and flag_position < 8: if octet < len(self.local_supported_commands) and flag_position < 8:
@@ -409,7 +409,7 @@ class Host(AbortableEventEmitter):
@property @property
def supported_commands(self): def supported_commands(self):
commands = [] commands = []
for (octet, flags) in enumerate(self.local_supported_commands): for octet, flags in enumerate(self.local_supported_commands):
if octet < len(HCI_SUPPORTED_COMMANDS_FLAGS): if octet < len(HCI_SUPPORTED_COMMANDS_FLAGS):
for flag in range(8): for flag in range(8):
if flags & (1 << flag) != 0: if flags & (1 << flag) != 0:
@@ -839,7 +839,12 @@ class Host(AbortableEventEmitter):
self.emit('authentication_io_capability_request', event.bd_addr) self.emit('authentication_io_capability_request', event.bd_addr)
def on_hci_io_capability_response_event(self, event): def on_hci_io_capability_response_event(self, event):
pass self.emit(
'authentication_io_capability_response',
event.bd_addr,
event.io_capability,
event.authentication_requirements,
)
def on_hci_user_confirmation_request_event(self, event): def on_hci_user_confirmation_request_event(self, event):
self.emit( self.emit(

184
bumble/pairing.py Normal file
View File

@@ -0,0 +1,184 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import enum
from typing import Optional, Tuple
from .hci import (
HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
HCI_DISPLAY_ONLY_IO_CAPABILITY,
HCI_DISPLAY_YES_NO_IO_CAPABILITY,
HCI_KEYBOARD_ONLY_IO_CAPABILITY,
)
from .smp import (
SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
SMP_KEYBOARD_ONLY_IO_CAPABILITY,
SMP_DISPLAY_ONLY_IO_CAPABILITY,
SMP_DISPLAY_YES_NO_IO_CAPABILITY,
SMP_KEYBOARD_DISPLAY_IO_CAPABILITY,
SMP_ENC_KEY_DISTRIBUTION_FLAG,
SMP_ID_KEY_DISTRIBUTION_FLAG,
SMP_SIGN_KEY_DISTRIBUTION_FLAG,
SMP_LINK_KEY_DISTRIBUTION_FLAG,
)
# -----------------------------------------------------------------------------
class PairingDelegate:
"""Abstract base class for Pairing Delegates."""
# I/O Capabilities.
# These are defined abstractly, and can be mapped to specific Classic pairing
# and/or SMP constants.
class IoCapability(enum.IntEnum):
NO_OUTPUT_NO_INPUT = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
KEYBOARD_INPUT_ONLY = SMP_KEYBOARD_ONLY_IO_CAPABILITY
DISPLAY_OUTPUT_ONLY = SMP_DISPLAY_ONLY_IO_CAPABILITY
DISPLAY_OUTPUT_AND_YES_NO_INPUT = SMP_DISPLAY_YES_NO_IO_CAPABILITY
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = SMP_KEYBOARD_DISPLAY_IO_CAPABILITY
# Direct names for backward compatibility.
NO_OUTPUT_NO_INPUT = IoCapability.NO_OUTPUT_NO_INPUT
KEYBOARD_INPUT_ONLY = IoCapability.KEYBOARD_INPUT_ONLY
DISPLAY_OUTPUT_ONLY = IoCapability.DISPLAY_OUTPUT_ONLY
DISPLAY_OUTPUT_AND_YES_NO_INPUT = IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT
# Key Distribution [LE only]
class KeyDistribution(enum.IntFlag):
DISTRIBUTE_ENCRYPTION_KEY = SMP_ENC_KEY_DISTRIBUTION_FLAG
DISTRIBUTE_IDENTITY_KEY = SMP_ID_KEY_DISTRIBUTION_FLAG
DISTRIBUTE_SIGNING_KEY = SMP_SIGN_KEY_DISTRIBUTION_FLAG
DISTRIBUTE_LINK_KEY = SMP_LINK_KEY_DISTRIBUTION_FLAG
DEFAULT_KEY_DISTRIBUTION: int = (
SMP_ENC_KEY_DISTRIBUTION_FLAG | SMP_ID_KEY_DISTRIBUTION_FLAG
)
# Default mapping from abstract to Classic I/O capabilities.
# Subclasses may override this if they prefer a different mapping.
CLASSIC_IO_CAPABILITIES_MAP = {
NO_OUTPUT_NO_INPUT: HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
KEYBOARD_INPUT_ONLY: HCI_KEYBOARD_ONLY_IO_CAPABILITY,
DISPLAY_OUTPUT_ONLY: HCI_DISPLAY_ONLY_IO_CAPABILITY,
DISPLAY_OUTPUT_AND_YES_NO_INPUT: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
}
io_capability: IoCapability
local_initiator_key_distribution: KeyDistribution
local_responder_key_distribution: KeyDistribution
def __init__(
self,
io_capability=NO_OUTPUT_NO_INPUT,
local_initiator_key_distribution=DEFAULT_KEY_DISTRIBUTION,
local_responder_key_distribution=DEFAULT_KEY_DISTRIBUTION,
) -> None:
self.io_capability = io_capability
self.local_initiator_key_distribution = local_initiator_key_distribution
self.local_responder_key_distribution = local_responder_key_distribution
@property
def classic_io_capability(self) -> int:
"""Map the abstract I/O capability to a Classic constant."""
# pylint: disable=line-too-long
return self.CLASSIC_IO_CAPABILITIES_MAP.get(
self.io_capability, HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
)
@property
def smp_io_capability(self) -> int:
"""Map the abstract I/O capability to an SMP constant."""
# This is just a 1-1 direct mapping
return self.io_capability
async def accept(self) -> bool:
"""Accept or reject a Pairing request."""
return True
async def confirm(self) -> bool:
"""Respond yes or not to a Pairing confirmation question."""
return True
# pylint: disable-next=unused-argument
async def compare_numbers(self, number: int, digits: int) -> bool:
"""Compare two numbers."""
return True
async def get_number(self) -> Optional[int]:
"""
Return an optional number as an answer to a passkey request.
Returning `None` will result in a negative reply.
"""
return 0
async def get_string(self, max_length) -> Optional[str]:
"""
Return a string whose utf-8 encoding is up to max_length bytes.
"""
return None
# pylint: disable-next=unused-argument
async def display_number(self, number: int, digits: int) -> None:
"""Display a number."""
# [LE only]
async def key_distribution_response(
self, peer_initiator_key_distribution: int, peer_responder_key_distribution: int
) -> Tuple[int, int]:
"""
Return the key distribution response in an SMP protocol context.
NOTE: since it is only used by the SMP protocol, this method's input and output
are directly as integers, using the SMP constants, rather than the abstract
KeyDistribution enums.
"""
return (
int(
peer_initiator_key_distribution & self.local_initiator_key_distribution
),
int(
peer_responder_key_distribution & self.local_responder_key_distribution
),
)
# -----------------------------------------------------------------------------
class PairingConfig:
"""Configuration for the Pairing protocol."""
def __init__(
self,
sc: bool = True,
mitm: bool = True,
bonding: bool = True,
delegate: Optional[PairingDelegate] = None,
) -> None:
self.sc = sc
self.mitm = mitm
self.bonding = bonding
self.delegate = delegate or PairingDelegate()
def __str__(self) -> str:
return (
f'PairingConfig(sc={self.sc}, '
f'mitm={self.mitm}, bonding={self.bonding}, '
f'delegate[{self.delegate.io_capability}])'
)

View File

@@ -31,7 +31,16 @@ from typing import Dict, Optional, Type
from pyee import EventEmitter from pyee import EventEmitter
from .colors import color from .colors import color
from .hci import Address, HCI_LE_Enable_Encryption_Command, HCI_Object, key_with_value from .hci import (
HCI_DISPLAY_ONLY_IO_CAPABILITY,
HCI_DISPLAY_YES_NO_IO_CAPABILITY,
HCI_KEYBOARD_ONLY_IO_CAPABILITY,
HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
Address,
HCI_LE_Enable_Encryption_Command,
HCI_Object,
key_with_value,
)
from .core import ( from .core import (
BT_BR_EDR_TRANSPORT, BT_BR_EDR_TRANSPORT,
BT_CENTRAL_ROLE, BT_CENTRAL_ROLE,
@@ -476,7 +485,7 @@ class AddressResolver:
address_bytes = bytes(address) address_bytes = bytes(address)
hash_part = address_bytes[0:3] hash_part = address_bytes[0:3]
prand = address_bytes[3:6] prand = address_bytes[3:6]
for (irk, resolved_address) in self.resolving_keys: for irk, resolved_address in self.resolving_keys:
local_hash = crypto.ah(irk, prand) local_hash = crypto.ah(irk, prand)
if local_hash == hash_part: if local_hash == hash_part:
# Match! # Match!
@@ -491,86 +500,6 @@ class AddressResolver:
return None return None
# -----------------------------------------------------------------------------
class PairingDelegate:
NO_OUTPUT_NO_INPUT = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
KEYBOARD_INPUT_ONLY = SMP_KEYBOARD_ONLY_IO_CAPABILITY
DISPLAY_OUTPUT_ONLY = SMP_DISPLAY_ONLY_IO_CAPABILITY
DISPLAY_OUTPUT_AND_YES_NO_INPUT = SMP_DISPLAY_YES_NO_IO_CAPABILITY
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = SMP_KEYBOARD_DISPLAY_IO_CAPABILITY
DEFAULT_KEY_DISTRIBUTION: int = (
SMP_ENC_KEY_DISTRIBUTION_FLAG | SMP_ID_KEY_DISTRIBUTION_FLAG
)
def __init__(
self,
io_capability: int = NO_OUTPUT_NO_INPUT,
local_initiator_key_distribution: int = DEFAULT_KEY_DISTRIBUTION,
local_responder_key_distribution: int = DEFAULT_KEY_DISTRIBUTION,
) -> None:
self.io_capability = io_capability
self.local_initiator_key_distribution = local_initiator_key_distribution
self.local_responder_key_distribution = local_responder_key_distribution
async def accept(self) -> bool:
return True
async def confirm(self) -> bool:
return True
# pylint: disable-next=unused-argument
async def compare_numbers(self, number: int, digits: int) -> bool:
return True
async def get_number(self) -> Optional[int]:
'''
Returns an optional number as an answer to a passkey request.
Returning `None` will result in a negative reply.
'''
return 0
async def get_string(self, max_length) -> Optional[str]:
'''
Returns a string whose utf-8 encoding is up to max_length bytes.
'''
return None
# pylint: disable-next=unused-argument
async def display_number(self, number: int, digits: int) -> None:
pass
async def key_distribution_response(
self, peer_initiator_key_distribution, peer_responder_key_distribution
):
return (
(peer_initiator_key_distribution & self.local_initiator_key_distribution),
(peer_responder_key_distribution & self.local_responder_key_distribution),
)
# -----------------------------------------------------------------------------
class PairingConfig:
def __init__(
self,
sc: bool = True,
mitm: bool = False,
bonding: bool = True,
delegate: Optional[PairingDelegate] = None,
) -> None:
self.sc = sc
self.mitm = mitm
self.bonding = bonding
self.delegate = delegate or PairingDelegate()
def __str__(self):
io_capability_str = SMP_Command.io_capability_name(self.delegate.io_capability)
return (
f'PairingConfig(sc={self.sc}, '
f'mitm={self.mitm}, bonding={self.bonding}, '
f'delegate[{io_capability_str}])'
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class Session: class Session:
# Pairing methods # Pairing methods
@@ -1662,12 +1591,12 @@ class Manager(EventEmitter):
Implements the Initiator and Responder roles of the Security Manager Protocol Implements the Initiator and Responder roles of the Security Manager Protocol
''' '''
def __init__(self, device): def __init__(self, device, pairing_config_factory):
super().__init__() super().__init__()
self.device = device self.device = device
self.sessions = {} self.sessions = {}
self._ecc_key = None self._ecc_key = None
self.pairing_config_factory = lambda connection: PairingConfig() self.pairing_config_factory = pairing_config_factory
def send_command(self, connection, command): def send_command(self, connection, command):
logger.debug( logger.debug(

View File

@@ -28,9 +28,8 @@ from bumble.device import Device, Peer
from bumble.host import Host from bumble.host import Host
from bumble.gatt import Service, Characteristic from bumble.gatt import Service, Characteristic
from bumble.transport import AsyncPipeSink from bumble.transport import AsyncPipeSink
from bumble.pairing import PairingConfig, PairingDelegate
from bumble.smp import ( from bumble.smp import (
PairingConfig,
PairingDelegate,
SMP_PAIRING_NOT_SUPPORTED_ERROR, SMP_PAIRING_NOT_SUPPORTED_ERROR,
SMP_CONFIRM_VALUE_FAILED_ERROR, SMP_CONFIRM_VALUE_FAILED_ERROR,
) )
@@ -262,7 +261,7 @@ async def test_self_gatt_long_read():
found_service = result[0] found_service = result[0]
found_characteristics = await found_service.discover_characteristics() found_characteristics = await found_service.discover_characteristics()
assert len(found_characteristics) == 513 assert len(found_characteristics) == 513
for (i, characteristic) in enumerate(found_characteristics): for i, characteristic in enumerate(found_characteristics):
value = await characteristic.read_value() value = await characteristic.read_value()
assert value == characteristics[i].value assert value == characteristics[i].value
@@ -317,11 +316,11 @@ async def _test_self_smp_with_configs(pairing_config1, pairing_config2):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
IO_CAP = [ IO_CAP = [
PairingDelegate.NO_OUTPUT_NO_INPUT, PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT,
PairingDelegate.KEYBOARD_INPUT_ONLY, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY,
PairingDelegate.DISPLAY_OUTPUT_ONLY, PairingDelegate.IoCapability.DISPLAY_OUTPUT_ONLY,
PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT, PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT, PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
] ]
SC = [False, True] SC = [False, True]
MITM = [False, True] MITM = [False, True]
@@ -335,7 +334,10 @@ KEY_DIST = range(16)
itertools.chain( itertools.chain(
itertools.product([IO_CAP], SC, MITM, [15]), itertools.product([IO_CAP], SC, MITM, [15]),
itertools.product( itertools.product(
[[PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT]], SC, MITM, KEY_DIST [[PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT]],
SC,
MITM,
KEY_DIST,
), ),
), ),
) )
@@ -378,7 +380,7 @@ async def test_self_smp(io_caps, sc, mitm, key_dist):
else: else:
if ( if (
self.peer_delegate.io_capability self.peer_delegate.io_capability
== PairingDelegate.KEYBOARD_INPUT_ONLY == PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY
): ):
peer_number = 6789 peer_number = 6789
else: else:
@@ -421,7 +423,7 @@ async def test_self_smp(io_caps, sc, mitm, key_dist):
async def test_self_smp_reject(): async def test_self_smp_reject():
class RejectingDelegate(PairingDelegate): class RejectingDelegate(PairingDelegate):
def __init__(self): def __init__(self):
super().__init__(PairingDelegate.NO_OUTPUT_NO_INPUT) super().__init__(PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT)
async def accept(self): async def accept(self):
return False return False
@@ -442,7 +444,9 @@ async def test_self_smp_reject():
async def test_self_smp_wrong_pin(): async def test_self_smp_wrong_pin():
class WrongPinDelegate(PairingDelegate): class WrongPinDelegate(PairingDelegate):
def __init__(self): def __init__(self):
super().__init__(PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT) super().__init__(
PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT
)
async def compare_numbers(self, number, digits): async def compare_numbers(self, number, digits):
return False return False