Compare commits

..

9 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
51d3a869a4 Merge pull request #183 from google/gbg/181
fix keystore save implementation for windows
2023-04-30 13:59:28 -07:00
Gilles Boccon-Gibod
dd930e3bde fix implementation for Windows 2023-04-30 11:42:28 -07:00
Gilles Boccon-Gibod
9af426db45 Merge pull request #177 from google/gbg/pairing-delegate-refactor
refactor PairingDelegate
2023-04-18 15:07:23 -07:00
Gilles Boccon-Gibod
4286b2ab59 address PR comments 2023-04-18 15:05:18 -07:00
Gilles Boccon-Gibod
3442358dea refactor PairingDelegate 2023-04-18 15:04:53 -07:00
Gilles Boccon-Gibod
bf3e05ef91 Merge pull request #174 from google/gbg/pairing-fix
fix role state for classic connections
2023-04-11 20:32:29 -07:00
Gilles Boccon-Gibod
5351ab8a42 Merge pull request #176 from google/gbg/connection-defaults
only use 1M parameters by default
2023-04-11 20:26:42 -07:00
Gilles Boccon-Gibod
2c2f512180 add comment to explain the initial role choice 2023-04-07 12:19:28 -07:00
Gilles Boccon-Gibod
859aea5a99 fix role state for classic connections 2023-04-07 10:24:26 -07:00
9 changed files with 361 additions and 212 deletions

View File

@@ -24,7 +24,7 @@ from prompt_toolkit.shortcuts import PromptSession
from bumble.colors import color
from bumble.device import Device, Peer
from bumble.transport import open_transport_or_link
from bumble.smp import PairingDelegate, PairingConfig
from bumble.pairing import PairingDelegate, PairingConfig
from bumble.smp import error_name as smp_error_name
from bumble.keys import JsonKeyStore
from bumble.core import ProtocolError
@@ -264,6 +264,7 @@ async def pair(
sc,
mitm,
bond,
ctkd,
io,
prompt,
request,
@@ -317,6 +318,7 @@ async def pair(
if mode == 'classic':
device.classic_enabled = True
device.le_enabled = False
device.classic_smp_enabled = ctkd
# Get things going
await device.power_on()
@@ -343,8 +345,13 @@ async def pair(
print(color(f'Pairing failed: {error}', 'red'))
return
else:
# Advertise so that peers can find us and connect
await device.start_advertising(auto_restart=True)
if mode == 'le':
# Advertise so that peers can find us and connect
await device.start_advertising(auto_restart=True)
else:
# Become discoverable and connectable
await device.set_discoverable(True)
await device.set_connectable(True)
# Run until the user asks to exit
await Waiter.instance.wait_until_terminated()
@@ -379,6 +386,13 @@ class LogHandler(logging.Handler):
@click.option(
'--bond', type=bool, default=True, help='Enable bonding', show_default=True
)
@click.option(
'--ctkd',
type=bool,
default=True,
help='Enable CTKD',
show_default=True,
)
@click.option(
'--io',
type=click.Choice(
@@ -405,6 +419,7 @@ def main(
sc,
mitm,
bond,
ctkd,
io,
prompt,
request,
@@ -427,6 +442,7 @@ def main(
sc,
mitm,
bond,
ctkd,
io,
prompt,
request,

View File

@@ -29,11 +29,13 @@ from .colors import color
from .att import ATT_CID, ATT_DEFAULT_MTU, ATT_PDU
from .gatt import Characteristic, Descriptor, Service
from .hci import (
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE,
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
HCI_CENTRAL_ROLE,
HCI_COMMAND_STATUS_PENDING,
HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR,
HCI_DISPLAY_ONLY_IO_CAPABILITY,
HCI_DISPLAY_YES_NO_IO_CAPABILITY,
HCI_DISPLAY_ONLY_IO_CAPABILITY,
HCI_EXTENDED_INQUIRY_MODE,
HCI_GENERAL_INQUIRY_LAP,
HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR,
@@ -141,6 +143,7 @@ from .keys import (
KeyStore,
PairingKeys,
)
from .pairing import PairingConfig
from . import gatt_client
from . import gatt_server
from . import smp
@@ -198,6 +201,7 @@ DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS = l2cap.L2CAP_LE_CREDIT_BASED_CONN
# Classes
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
class Advertisement:
address: Address
@@ -530,6 +534,8 @@ class Connection(CompositeEventEmitter):
sc: bool
link_key_type: int
gatt_client: gatt_client.Client
pairing_peer_io_capability: Optional[int]
pairing_peer_authentication_requirements: Optional[int]
@composite_listener
class Listener:
@@ -593,10 +599,12 @@ class Connection(CompositeEventEmitter):
self.gatt_server = (
device.gatt_server
) # By default, use the device's shared server
self.pairing_peer_io_capability = None
self.pairing_peer_authentication_requirements = None
# [Classic only]
@classmethod
def incomplete(cls, device, peer_address):
def incomplete(cls, device, peer_address, role):
"""
Instantiate an incomplete connection (ie. one waiting for a HCI Connection
Complete event).
@@ -609,28 +617,30 @@ class Connection(CompositeEventEmitter):
device.public_address,
peer_address,
None,
None,
role,
None,
None,
)
# [Classic only]
def complete(self, handle, peer_resolvable_address, role, parameters):
def complete(self, handle, parameters):
"""
Finish an incomplete connection upon completion.
"""
assert self.handle is None
assert self.transport == BT_BR_EDR_TRANSPORT
self.handle = handle
self.peer_resolvable_address = peer_resolvable_address
# Quirk: role might be known before complete
if self.role is None:
self.role = role
self.parameters = parameters
@property
def role_name(self):
return 'CENTRAL' if self.role == BT_CENTRAL_ROLE else 'PERIPHERAL'
if self.role is None:
return 'NOT-SET'
if self.role == BT_CENTRAL_ROLE:
return 'CENTRAL'
if self.role == BT_PERIPHERAL_ROLE:
return 'PERIPHERAL'
return f'UNKNOWN[{self.role}]'
@property
def is_encrypted(self):
@@ -638,7 +648,7 @@ class Connection(CompositeEventEmitter):
@property
def is_incomplete(self) -> bool:
return self.handle == None
return self.handle is None
def send_l2cap_pdu(self, cid, pdu):
self.device.send_l2cap_pdu(self.handle, cid, pdu)
@@ -751,10 +761,11 @@ class DeviceConfiguration:
self.advertising_interval_max = DEVICE_DEFAULT_ADVERTISING_INTERVAL
self.le_enabled = True
# LE host enable 2nd parameter
self.le_simultaneous_enabled = True
self.le_simultaneous_enabled = False
self.classic_enabled = False
self.classic_sc_enabled = True
self.classic_ssp_enabled = True
self.classic_smp_enabled = True
self.classic_accept_any = True
self.connectable = True
self.discoverable = True
@@ -789,6 +800,9 @@ class DeviceConfiguration:
self.classic_ssp_enabled = config.get(
'classic_ssp_enabled', self.classic_ssp_enabled
)
self.classic_smp_enabled = config.get(
'classic_smp_enabled', self.classic_smp_enabled
)
self.classic_accept_any = config.get(
'classic_accept_any', self.classic_accept_any
)
@@ -998,8 +1012,9 @@ class Device(CompositeEventEmitter):
self.le_enabled = config.le_enabled
self.classic_enabled = config.classic_enabled
self.le_simultaneous_enabled = config.le_simultaneous_enabled
self.classic_ssp_enabled = config.classic_ssp_enabled
self.classic_sc_enabled = config.classic_sc_enabled
self.classic_ssp_enabled = config.classic_ssp_enabled
self.classic_smp_enabled = config.classic_smp_enabled
self.discoverable = config.discoverable
self.connectable = config.connectable
self.classic_accept_any = config.classic_accept_any
@@ -1042,12 +1057,12 @@ class Device(CompositeEventEmitter):
self.random_address = address
# Setup SMP
self.smp_manager = smp.Manager(self)
self.l2cap_channel_manager.register_fixed_channel(smp.SMP_CID, self.on_smp_pdu)
self.l2cap_channel_manager.register_fixed_channel(
smp.SMP_BR_CID, self.on_smp_pdu
self.smp_manager = smp.Manager(
self, pairing_config_factory=lambda connection: PairingConfig()
)
self.l2cap_channel_manager.register_fixed_channel(smp.SMP_CID, self.on_smp_pdu)
# Register the SDP server with the L2CAP Channel Manager
self.sdp_server.register(self.l2cap_channel_manager)
@@ -1183,6 +1198,12 @@ class Device(CompositeEventEmitter):
if self.keystore is None:
self.keystore = KeyStore.create_for_device(self)
# Finish setting up SMP based on post-init configurable options
if self.classic_smp_enabled:
self.l2cap_channel_manager.register_fixed_channel(
smp.SMP_BR_CID, self.on_smp_pdu
)
if self.host.supports_command(HCI_WRITE_LE_HOST_SUPPORT_COMMAND):
await self.send_command(
HCI_Write_LE_Host_Support_Command(
@@ -1230,7 +1251,7 @@ class Device(CompositeEventEmitter):
await self.send_command(HCI_LE_Clear_Resolving_List_Command()) # type: ignore[call-arg]
resolving_keys = await self.keystore.get_resolving_keys()
for (irk, address) in resolving_keys:
for irk, address in resolving_keys:
await self.send_command(
HCI_LE_Add_Device_To_Resolving_List_Command(
peer_identity_address_type=address.address_type,
@@ -1802,7 +1823,7 @@ class Device(CompositeEventEmitter):
else:
# Save pending connection
self.pending_connections[peer_address] = Connection.incomplete(
self, peer_address
self, peer_address, BT_CENTRAL_ROLE
)
# TODO: allow passing other settings
@@ -1939,9 +1960,12 @@ class Device(CompositeEventEmitter):
self.on('connection', on_connection)
self.on('connection_failure', on_connection_failure)
# Save pending connection
# Save pending connection, with the Peripheral role.
# Even if we requested a role switch in the HCI_Accept_Connection_Request
# command, this connection is still considered Peripheral until an eventual
# role change event.
self.pending_connections[peer_address] = Connection.incomplete(
self, peer_address
self, peer_address, BT_PERIPHERAL_ROLE
)
try:
@@ -2214,6 +2238,10 @@ class Device(CompositeEventEmitter):
keys = await self.keystore.get(str(address))
if keys is not None:
logger.debug('found keys in the key store')
if keys.link_key is None:
logger.warning('no link key')
return None
return keys.link_key.value
# [Classic only]
@@ -2418,8 +2446,14 @@ class Device(CompositeEventEmitter):
def on_link_key(self, bd_addr, link_key, key_type):
# Store the keys in the key store
if self.keystore:
authenticated = key_type in (
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE,
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
)
pairing_keys = PairingKeys()
pairing_keys.link_key = PairingKeys.Key(value=link_key)
pairing_keys.link_key = PairingKeys.Key(
value=link_key, authenticated=authenticated
)
async def store_keys():
try:
@@ -2463,25 +2497,24 @@ class Device(CompositeEventEmitter):
connection_handle,
transport,
peer_address,
peer_resolvable_address,
role,
connection_parameters,
):
logger.debug(
f'*** Connection: [0x{connection_handle:04X}] '
f'{peer_address} as {HCI_Constant.role_name(role)}'
f'{peer_address} {"" if role is None else HCI_Constant.role_name(role)}'
)
if connection_handle in self.connections:
logger.warning(
'new connection reuses the same handle as a previous connection'
)
peer_resolvable_address = None
if transport == BT_BR_EDR_TRANSPORT:
# Create a new connection
connection = self.pending_connections.pop(peer_address)
connection.complete(
connection_handle, peer_resolvable_address, role, connection_parameters
)
connection.complete(connection_handle, connection_parameters)
self.connections[connection_handle] = connection
# Emit an event to notify listeners of the new connection
@@ -2593,7 +2626,9 @@ class Device(CompositeEventEmitter):
# device configuration is set to accept any incoming connection
elif self.classic_accept_any:
# Save pending connection
self.pending_connections[bd_addr] = Connection.incomplete(self, bd_addr)
self.pending_connections[bd_addr] = Connection.incomplete(
self, bd_addr, BT_PERIPHERAL_ROLE
)
self.host.send_command_sync(
HCI_Accept_Connection_Request_Command(
@@ -2684,7 +2719,7 @@ class Device(CompositeEventEmitter):
# On Secure Simple Pairing complete, in case:
# - Connection isn't already authenticated
# - AND we are not the initiator of the authentication
# We must trigger authentication to known if we are truly authenticated
# We must trigger authentication to know if we are truly authenticated
if not connection.authenticating and not connection.authenticated:
logger.debug(
f'*** Trigger Connection Authentication: [0x{connection.handle:04X}] '
@@ -2699,22 +2734,6 @@ class Device(CompositeEventEmitter):
# Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection)
# Map the SMP IO capability to a Classic IO capability
# pylint: disable=line-too-long
io_capability = {
smp.SMP_DISPLAY_ONLY_IO_CAPABILITY: HCI_DISPLAY_ONLY_IO_CAPABILITY,
smp.SMP_DISPLAY_YES_NO_IO_CAPABILITY: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY: HCI_KEYBOARD_ONLY_IO_CAPABILITY,
smp.SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
}.get(pairing_config.delegate.io_capability)
if io_capability is None:
logger.warning(
f'cannot map IO capability ({pairing_config.delegate.io_capability}'
)
io_capability = HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
# Compute the authentication requirements
authentication_requirements = (
# No Bonding
@@ -2733,53 +2752,50 @@ class Device(CompositeEventEmitter):
self.host.send_command_sync(
HCI_IO_Capability_Request_Reply_Command(
bd_addr=connection.peer_address,
io_capability=io_capability,
io_capability=pairing_config.delegate.classic_io_capability,
oob_data_present=0x00, # Not present
authentication_requirements=authentication_requirements,
)
)
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_authentication_io_capability_response(
self, connection, io_capability, authentication_requirements
):
connection.peer_pairing_io_capability = io_capability
connection.peer_pairing_authentication_requirements = (
authentication_requirements
)
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_authentication_user_confirmation_request(self, connection, code):
# Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection)
can_compare = pairing_config.delegate.io_capability not in (
smp.SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
smp.SMP_DISPLAY_ONLY_IO_CAPABILITY,
)
io_capability = pairing_config.delegate.classic_io_capability
# Respond
if can_compare:
async def compare_numbers():
numbers_match = await connection.abort_on(
'disconnection',
pairing_config.delegate.compare_numbers(code, digits=6),
)
if numbers_match:
await self.host.send_command(
HCI_User_Confirmation_Request_Reply_Command(
bd_addr=connection.peer_address
)
)
else:
await self.host.send_command(
HCI_User_Confirmation_Request_Negative_Reply_Command(
bd_addr=connection.peer_address
)
if io_capability == HCI_DISPLAY_YES_NO_IO_CAPABILITY:
if connection.peer_pairing_io_capability in (
HCI_DISPLAY_YES_NO_IO_CAPABILITY,
HCI_DISPLAY_ONLY_IO_CAPABILITY,
):
# Display the code and ask the user to compare
async def prompt():
return (
await pairing_config.delegate.compare_numbers(code, digits=6),
)
asyncio.create_task(compare_numbers())
else:
else:
# Ask the user to confirm the pairing, without showing a code
async def prompt():
return await pairing_config.delegate.confirm()
async def confirm():
confirm = await connection.abort_on(
'disconnection', pairing_config.delegate.confirm()
)
if confirm:
if await prompt():
await self.host.send_command(
HCI_User_Confirmation_Request_Reply_Command(
bd_addr=connection.peer_address
@@ -2792,7 +2808,17 @@ class Device(CompositeEventEmitter):
)
)
asyncio.create_task(confirm())
AsyncRunner.spawn(connection.abort_on('disconnection', confirm()))
return
if io_capability == HCI_DISPLAY_ONLY_IO_CAPABILITY:
# Display the code to the user
AsyncRunner.spawn(pairing_config.delegate.display_number(code, 6))
# Automatic confirmation
self.host.send_command_sync(
HCI_User_Confirmation_Request_Reply_Command(bd_addr=connection.peer_address)
)
# [Classic only]
@host_event_handler
@@ -2800,15 +2826,11 @@ class Device(CompositeEventEmitter):
def on_authentication_user_passkey_request(self, connection):
# Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection)
can_input = pairing_config.delegate.io_capability in (
smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY,
smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY,
)
io_capability = pairing_config.delegate.classic_io_capability
# Respond
if can_input:
if io_capability == HCI_KEYBOARD_ONLY_IO_CAPABILITY:
# Ask the user to input a number
async def get_number():
number = await connection.abort_on(
'disconnection', pairing_config.delegate.get_number()
@@ -2838,18 +2860,14 @@ class Device(CompositeEventEmitter):
@host_event_handler
@with_connection_from_address
def on_pin_code_request(self, connection):
# classic legacy pairing
# Classic legacy pairing
# Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection)
io_capability = pairing_config.delegate.classic_io_capability
can_input = pairing_config.delegate.io_capability in (
smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY,
smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY,
)
# respond the pin code
if can_input:
# Respond
if io_capability == HCI_KEYBOARD_ONLY_IO_CAPABILITY:
# Ask the user to enter a string
async def get_pin_code():
pin_code = await connection.abort_on(
'disconnection', pairing_config.delegate.get_string(16)
@@ -2889,6 +2907,7 @@ class Device(CompositeEventEmitter):
# Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection)
# Show the passkey to the user
connection.abort_on(
'disconnection', pairing_config.delegate.display_number(passkey)
)

View File

@@ -94,10 +94,9 @@ HOST_HC_TOTAL_NUM_ACL_DATA_PACKETS = 1
# -----------------------------------------------------------------------------
class Connection:
def __init__(self, host, handle, role, peer_address, transport):
def __init__(self, host, handle, peer_address, transport):
self.host = host
self.handle = handle
self.role = role
self.peer_address = peer_address
self.assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.transport = transport
@@ -396,8 +395,8 @@ class Host(AbortableEventEmitter):
def supports_command(self, command):
# Find the support flag position for this command
for (octet, flags) in enumerate(HCI_SUPPORTED_COMMANDS_FLAGS):
for (flag_position, value) in enumerate(flags):
for octet, flags in enumerate(HCI_SUPPORTED_COMMANDS_FLAGS):
for flag_position, value in enumerate(flags):
if value == command:
# Check if the flag is set
if octet < len(self.local_supported_commands) and flag_position < 8:
@@ -410,7 +409,7 @@ class Host(AbortableEventEmitter):
@property
def supported_commands(self):
commands = []
for (octet, flags) in enumerate(self.local_supported_commands):
for octet, flags in enumerate(self.local_supported_commands):
if octet < len(HCI_SUPPORTED_COMMANDS_FLAGS):
for flag in range(8):
if flags & (1 << flag) != 0:
@@ -534,7 +533,7 @@ class Host(AbortableEventEmitter):
if event.status == HCI_SUCCESS:
# Create/update the connection
logger.debug(
f'### CONNECTION: [0x{event.connection_handle:04X}] '
f'### LE CONNECTION: [0x{event.connection_handle:04X}] '
f'{event.peer_address} as {HCI_Constant.role_name(event.role)}'
)
@@ -543,7 +542,6 @@ class Host(AbortableEventEmitter):
connection = Connection(
self,
event.connection_handle,
event.role,
event.peer_address,
BT_LE_TRANSPORT,
)
@@ -560,7 +558,6 @@ class Host(AbortableEventEmitter):
event.connection_handle,
BT_LE_TRANSPORT,
event.peer_address,
None,
event.role,
connection_parameters,
)
@@ -589,7 +586,6 @@ class Host(AbortableEventEmitter):
connection = Connection(
self,
event.connection_handle,
BT_CENTRAL_ROLE,
event.bd_addr,
BT_BR_EDR_TRANSPORT,
)
@@ -602,7 +598,6 @@ class Host(AbortableEventEmitter):
BT_BR_EDR_TRANSPORT,
event.bd_addr,
None,
BT_CENTRAL_ROLE,
None,
)
else:
@@ -622,8 +617,7 @@ class Host(AbortableEventEmitter):
if event.status == HCI_SUCCESS:
logger.debug(
f'### DISCONNECTION: [0x{event.connection_handle:04X}] '
f'{connection.peer_address} as '
f'{HCI_Constant.role_name(connection.role)}, '
f'{connection.peer_address} '
f'reason={event.reason}'
)
del self.connections[event.connection_handle]
@@ -739,10 +733,6 @@ class Host(AbortableEventEmitter):
f'role change for {event.bd_addr}: '
f'{HCI_Constant.role_name(event.new_role)}'
)
if connection := self.find_connection_by_bd_addr(
event.bd_addr, BT_BR_EDR_TRANSPORT
):
connection.role = event.new_role
self.emit('role_change', event.bd_addr, event.new_role)
else:
logger.debug(
@@ -849,7 +839,12 @@ class Host(AbortableEventEmitter):
self.emit('authentication_io_capability_request', event.bd_addr)
def on_hci_io_capability_response_event(self, event):
pass
self.emit(
'authentication_io_capability_response',
event.bd_addr,
event.io_capability,
event.authentication_requirements,
)
def on_hci_user_confirmation_request_event(self, event):
self.emit(

View File

@@ -257,7 +257,7 @@ class JsonKeyStore(KeyStore):
json.dump(db, output, sort_keys=True, indent=4)
# Atomically replace the previous file
os.rename(temp_filename, self.filename)
os.replace(temp_filename, self.filename)
async def delete(self, name: str) -> None:
db = await self.load()
@@ -273,7 +273,7 @@ class JsonKeyStore(KeyStore):
db = await self.load()
namespace = db.setdefault(self.namespace, {})
namespace[name] = keys.to_dict()
namespace.setdefault(name, {}).update(keys.to_dict())
await self.save(db)

184
bumble/pairing.py Normal file
View File

@@ -0,0 +1,184 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import enum
from typing import Optional, Tuple
from .hci import (
HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
HCI_DISPLAY_ONLY_IO_CAPABILITY,
HCI_DISPLAY_YES_NO_IO_CAPABILITY,
HCI_KEYBOARD_ONLY_IO_CAPABILITY,
)
from .smp import (
SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
SMP_KEYBOARD_ONLY_IO_CAPABILITY,
SMP_DISPLAY_ONLY_IO_CAPABILITY,
SMP_DISPLAY_YES_NO_IO_CAPABILITY,
SMP_KEYBOARD_DISPLAY_IO_CAPABILITY,
SMP_ENC_KEY_DISTRIBUTION_FLAG,
SMP_ID_KEY_DISTRIBUTION_FLAG,
SMP_SIGN_KEY_DISTRIBUTION_FLAG,
SMP_LINK_KEY_DISTRIBUTION_FLAG,
)
# -----------------------------------------------------------------------------
class PairingDelegate:
"""Abstract base class for Pairing Delegates."""
# I/O Capabilities.
# These are defined abstractly, and can be mapped to specific Classic pairing
# and/or SMP constants.
class IoCapability(enum.IntEnum):
NO_OUTPUT_NO_INPUT = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
KEYBOARD_INPUT_ONLY = SMP_KEYBOARD_ONLY_IO_CAPABILITY
DISPLAY_OUTPUT_ONLY = SMP_DISPLAY_ONLY_IO_CAPABILITY
DISPLAY_OUTPUT_AND_YES_NO_INPUT = SMP_DISPLAY_YES_NO_IO_CAPABILITY
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = SMP_KEYBOARD_DISPLAY_IO_CAPABILITY
# Direct names for backward compatibility.
NO_OUTPUT_NO_INPUT = IoCapability.NO_OUTPUT_NO_INPUT
KEYBOARD_INPUT_ONLY = IoCapability.KEYBOARD_INPUT_ONLY
DISPLAY_OUTPUT_ONLY = IoCapability.DISPLAY_OUTPUT_ONLY
DISPLAY_OUTPUT_AND_YES_NO_INPUT = IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT
# Key Distribution [LE only]
class KeyDistribution(enum.IntFlag):
DISTRIBUTE_ENCRYPTION_KEY = SMP_ENC_KEY_DISTRIBUTION_FLAG
DISTRIBUTE_IDENTITY_KEY = SMP_ID_KEY_DISTRIBUTION_FLAG
DISTRIBUTE_SIGNING_KEY = SMP_SIGN_KEY_DISTRIBUTION_FLAG
DISTRIBUTE_LINK_KEY = SMP_LINK_KEY_DISTRIBUTION_FLAG
DEFAULT_KEY_DISTRIBUTION: int = (
SMP_ENC_KEY_DISTRIBUTION_FLAG | SMP_ID_KEY_DISTRIBUTION_FLAG
)
# Default mapping from abstract to Classic I/O capabilities.
# Subclasses may override this if they prefer a different mapping.
CLASSIC_IO_CAPABILITIES_MAP = {
NO_OUTPUT_NO_INPUT: HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
KEYBOARD_INPUT_ONLY: HCI_KEYBOARD_ONLY_IO_CAPABILITY,
DISPLAY_OUTPUT_ONLY: HCI_DISPLAY_ONLY_IO_CAPABILITY,
DISPLAY_OUTPUT_AND_YES_NO_INPUT: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
}
io_capability: IoCapability
local_initiator_key_distribution: KeyDistribution
local_responder_key_distribution: KeyDistribution
def __init__(
self,
io_capability=NO_OUTPUT_NO_INPUT,
local_initiator_key_distribution=DEFAULT_KEY_DISTRIBUTION,
local_responder_key_distribution=DEFAULT_KEY_DISTRIBUTION,
) -> None:
self.io_capability = io_capability
self.local_initiator_key_distribution = local_initiator_key_distribution
self.local_responder_key_distribution = local_responder_key_distribution
@property
def classic_io_capability(self) -> int:
"""Map the abstract I/O capability to a Classic constant."""
# pylint: disable=line-too-long
return self.CLASSIC_IO_CAPABILITIES_MAP.get(
self.io_capability, HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
)
@property
def smp_io_capability(self) -> int:
"""Map the abstract I/O capability to an SMP constant."""
# This is just a 1-1 direct mapping
return self.io_capability
async def accept(self) -> bool:
"""Accept or reject a Pairing request."""
return True
async def confirm(self) -> bool:
"""Respond yes or no to a Pairing confirmation question."""
return True
# pylint: disable-next=unused-argument
async def compare_numbers(self, number: int, digits: int) -> bool:
"""Compare two numbers."""
return True
async def get_number(self) -> Optional[int]:
"""
Return an optional number as an answer to a passkey request.
Returning `None` will result in a negative reply.
"""
return 0
async def get_string(self, max_length) -> Optional[str]:
"""
Return a string whose utf-8 encoding is up to max_length bytes.
"""
return None
# pylint: disable-next=unused-argument
async def display_number(self, number: int, digits: int) -> None:
"""Display a number."""
# [LE only]
async def key_distribution_response(
self, peer_initiator_key_distribution: int, peer_responder_key_distribution: int
) -> Tuple[int, int]:
"""
Return the key distribution response in an SMP protocol context.
NOTE: since it is only used by the SMP protocol, this method's input and output
are directly as integers, using the SMP constants, rather than the abstract
KeyDistribution enums.
"""
return (
int(
peer_initiator_key_distribution & self.local_initiator_key_distribution
),
int(
peer_responder_key_distribution & self.local_responder_key_distribution
),
)
# -----------------------------------------------------------------------------
class PairingConfig:
"""Configuration for the Pairing protocol."""
def __init__(
self,
sc: bool = True,
mitm: bool = True,
bonding: bool = True,
delegate: Optional[PairingDelegate] = None,
) -> None:
self.sc = sc
self.mitm = mitm
self.bonding = bonding
self.delegate = delegate or PairingDelegate()
def __str__(self) -> str:
return (
f'PairingConfig(sc={self.sc}, '
f'mitm={self.mitm}, bonding={self.bonding}, '
f'delegate[{self.delegate.io_capability}])'
)

View File

@@ -439,7 +439,7 @@ class DLC(EventEmitter):
logger.debug(
f'<<< Credits [{self.dlci}]: '
f'received {credits}, total={self.tx_credits}'
f'received {received_credits}, total={self.tx_credits}'
)
data = data[1:]

View File

@@ -31,7 +31,16 @@ from typing import Dict, Optional, Type
from pyee import EventEmitter
from .colors import color
from .hci import Address, HCI_LE_Enable_Encryption_Command, HCI_Object, key_with_value
from .hci import (
HCI_DISPLAY_ONLY_IO_CAPABILITY,
HCI_DISPLAY_YES_NO_IO_CAPABILITY,
HCI_KEYBOARD_ONLY_IO_CAPABILITY,
HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
Address,
HCI_LE_Enable_Encryption_Command,
HCI_Object,
key_with_value,
)
from .core import (
BT_BR_EDR_TRANSPORT,
BT_CENTRAL_ROLE,
@@ -476,7 +485,7 @@ class AddressResolver:
address_bytes = bytes(address)
hash_part = address_bytes[0:3]
prand = address_bytes[3:6]
for (irk, resolved_address) in self.resolving_keys:
for irk, resolved_address in self.resolving_keys:
local_hash = crypto.ah(irk, prand)
if local_hash == hash_part:
# Match!
@@ -491,86 +500,6 @@ class AddressResolver:
return None
# -----------------------------------------------------------------------------
class PairingDelegate:
NO_OUTPUT_NO_INPUT = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
KEYBOARD_INPUT_ONLY = SMP_KEYBOARD_ONLY_IO_CAPABILITY
DISPLAY_OUTPUT_ONLY = SMP_DISPLAY_ONLY_IO_CAPABILITY
DISPLAY_OUTPUT_AND_YES_NO_INPUT = SMP_DISPLAY_YES_NO_IO_CAPABILITY
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = SMP_KEYBOARD_DISPLAY_IO_CAPABILITY
DEFAULT_KEY_DISTRIBUTION: int = (
SMP_ENC_KEY_DISTRIBUTION_FLAG | SMP_ID_KEY_DISTRIBUTION_FLAG
)
def __init__(
self,
io_capability: int = NO_OUTPUT_NO_INPUT,
local_initiator_key_distribution: int = DEFAULT_KEY_DISTRIBUTION,
local_responder_key_distribution: int = DEFAULT_KEY_DISTRIBUTION,
) -> None:
self.io_capability = io_capability
self.local_initiator_key_distribution = local_initiator_key_distribution
self.local_responder_key_distribution = local_responder_key_distribution
async def accept(self) -> bool:
return True
async def confirm(self) -> bool:
return True
# pylint: disable-next=unused-argument
async def compare_numbers(self, number: int, digits: int) -> bool:
return True
async def get_number(self) -> Optional[int]:
'''
Returns an optional number as an answer to a passkey request.
Returning `None` will result in a negative reply.
'''
return 0
async def get_string(self, max_length) -> Optional[str]:
'''
Returns a string whose utf-8 encoding is up to max_length bytes.
'''
return None
# pylint: disable-next=unused-argument
async def display_number(self, number: int, digits: int) -> None:
pass
async def key_distribution_response(
self, peer_initiator_key_distribution, peer_responder_key_distribution
):
return (
(peer_initiator_key_distribution & self.local_initiator_key_distribution),
(peer_responder_key_distribution & self.local_responder_key_distribution),
)
# -----------------------------------------------------------------------------
class PairingConfig:
def __init__(
self,
sc: bool = True,
mitm: bool = True,
bonding: bool = True,
delegate: Optional[PairingDelegate] = None,
) -> None:
self.sc = sc
self.mitm = mitm
self.bonding = bonding
self.delegate = delegate or PairingDelegate()
def __str__(self):
io_capability_str = SMP_Command.io_capability_name(self.delegate.io_capability)
return (
f'PairingConfig(sc={self.sc}, '
f'mitm={self.mitm}, bonding={self.bonding}, '
f'delegate[{io_capability_str}])'
)
# -----------------------------------------------------------------------------
class Session:
# Pairing methods
@@ -1662,12 +1591,12 @@ class Manager(EventEmitter):
Implements the Initiator and Responder roles of the Security Manager Protocol
'''
def __init__(self, device):
def __init__(self, device, pairing_config_factory):
super().__init__()
self.device = device
self.sessions = {}
self._ecc_key = None
self.pairing_config_factory = lambda connection: PairingConfig()
self.pairing_config_factory = pairing_config_factory
def send_command(self, connection, command):
logger.debug(

View File

@@ -1,4 +1,6 @@
{
"name": "Bumble Hands-Free",
"class_of_device": 2360324
"class_of_device": 2360324,
"keystore": "JsonKeyStore",
"le_enabled": false
}

View File

@@ -28,9 +28,8 @@ from bumble.device import Device, Peer
from bumble.host import Host
from bumble.gatt import Service, Characteristic
from bumble.transport import AsyncPipeSink
from bumble.pairing import PairingConfig, PairingDelegate
from bumble.smp import (
PairingConfig,
PairingDelegate,
SMP_PAIRING_NOT_SUPPORTED_ERROR,
SMP_CONFIRM_VALUE_FAILED_ERROR,
)
@@ -262,7 +261,7 @@ async def test_self_gatt_long_read():
found_service = result[0]
found_characteristics = await found_service.discover_characteristics()
assert len(found_characteristics) == 513
for (i, characteristic) in enumerate(found_characteristics):
for i, characteristic in enumerate(found_characteristics):
value = await characteristic.read_value()
assert value == characteristics[i].value
@@ -317,11 +316,11 @@ async def _test_self_smp_with_configs(pairing_config1, pairing_config2):
# -----------------------------------------------------------------------------
IO_CAP = [
PairingDelegate.NO_OUTPUT_NO_INPUT,
PairingDelegate.KEYBOARD_INPUT_ONLY,
PairingDelegate.DISPLAY_OUTPUT_ONLY,
PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT,
PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY,
PairingDelegate.IoCapability.DISPLAY_OUTPUT_ONLY,
PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
]
SC = [False, True]
MITM = [False, True]
@@ -335,7 +334,10 @@ KEY_DIST = range(16)
itertools.chain(
itertools.product([IO_CAP], SC, MITM, [15]),
itertools.product(
[[PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT]], SC, MITM, KEY_DIST
[[PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT]],
SC,
MITM,
KEY_DIST,
),
),
)
@@ -378,7 +380,7 @@ async def test_self_smp(io_caps, sc, mitm, key_dist):
else:
if (
self.peer_delegate.io_capability
== PairingDelegate.KEYBOARD_INPUT_ONLY
== PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY
):
peer_number = 6789
else:
@@ -421,7 +423,7 @@ async def test_self_smp(io_caps, sc, mitm, key_dist):
async def test_self_smp_reject():
class RejectingDelegate(PairingDelegate):
def __init__(self):
super().__init__(PairingDelegate.NO_OUTPUT_NO_INPUT)
super().__init__(PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT)
async def accept(self):
return False
@@ -442,12 +444,14 @@ async def test_self_smp_reject():
async def test_self_smp_wrong_pin():
class WrongPinDelegate(PairingDelegate):
def __init__(self):
super().__init__(PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT)
super().__init__(
PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT
)
async def compare_numbers(self, number, digits):
return False
wrong_pin_pairing_config = PairingConfig(delegate=WrongPinDelegate())
wrong_pin_pairing_config = PairingConfig(mitm=True, delegate=WrongPinDelegate())
paired = False
try:
await _test_self_smp_with_configs(