Compare commits

...

12 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
9af426db45 Merge pull request #177 from google/gbg/pairing-delegate-refactor
refactor PairingDelegate
2023-04-18 15:07:23 -07:00
Gilles Boccon-Gibod
4286b2ab59 address PR comments 2023-04-18 15:05:18 -07:00
Gilles Boccon-Gibod
3442358dea refactor PairingDelegate 2023-04-18 15:04:53 -07:00
Gilles Boccon-Gibod
bf3e05ef91 Merge pull request #174 from google/gbg/pairing-fix
fix role state for classic connections
2023-04-11 20:32:29 -07:00
Gilles Boccon-Gibod
5351ab8a42 Merge pull request #176 from google/gbg/connection-defaults
only use 1M parameters by default
2023-04-11 20:26:42 -07:00
Gilles Boccon-Gibod
49b2c13e69 only use 1M parameters by default 2023-04-09 17:57:11 -07:00
Lucas Abel
962737a97b Merge pull request #173 from zxzxwu/smp
SMP: Determine initiator by direction instead of role
2023-04-07 09:50:06 -07:00
Josh Wu
85496aaff5 SMP: Determine initiator by direction instead of role
Though in the spec this is not allowed, but in some ambiguous cases such
as SMP over BR/EDR or we want to test how remote devices handle invalid
pairing requests, we still need to allow this behavior, with some logs
to let users know it's invalid.
2023-04-08 00:02:48 +08:00
Lucas Abel
a95e601a5c Merge pull request #172 from qiaoccolato/main
LICENCE: recorde colors.py licence
2023-04-05 13:31:13 -07:00
Qiao Yang
df218b5370 LICENCE: recorde colors.py licence 2023-04-05 19:02:44 +00:00
Alan Rosenthal
0f737244b5 Merge pull request #169 from AlanRosenthal/alan/remote-values
Add `show remote-values`
2023-04-05 09:00:48 -04:00
Alan Rosenthal
a258ba383a Add show remote-values
`gatt_client` caches values read/notifications/indications and displays the most recent value to the user
2023-04-04 18:15:42 +00:00
10 changed files with 427 additions and 184 deletions

19
LICENSE
View File

@@ -200,3 +200,22 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---
Files: bumble/colors.py
Copyright (c) 2012 Giorgos Verigakis <verigak@gmail.com>
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

View File

@@ -24,6 +24,7 @@ import logging
import os
import random
import re
import humanize
from typing import Optional, Union
from collections import OrderedDict
@@ -165,6 +166,7 @@ class ConsoleApp:
'local-services': None,
'remote-services': None,
'local-values': None,
'remote-values': None,
},
'filter': {
'address': None,
@@ -212,6 +214,7 @@ class ConsoleApp:
get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1))
)
self.local_values_text = FormattedTextControl()
self.remote_values_text = FormattedTextControl()
self.log_height = Dimension(min=7, weight=4)
self.log_max_lines = 100
self.log_lines = []
@@ -234,6 +237,10 @@ class ConsoleApp:
Frame(Window(self.remote_services_text), title='Remote Services'),
filter=Condition(lambda: self.top_tab == 'remote-services'),
),
ConditionalContainer(
Frame(Window(self.remote_values_text), title='Remote Values'),
filter=Condition(lambda: self.top_tab == 'remote-values'),
),
ConditionalContainer(
Frame(Window(self.log_text, height=self.log_height), title='Log'),
filter=Condition(lambda: self.top_tab == 'log'),
@@ -737,6 +744,7 @@ class ConsoleApp:
'local-services',
'remote-services',
'local-values',
'remote-values',
}:
self.top_tab = params[0]
self.ui.invalidate()
@@ -745,6 +753,10 @@ class ConsoleApp:
await self.do_show_local_values()
await asyncio.sleep(1)
while self.top_tab == 'remote-values':
await self.do_show_remote_values()
await asyncio.sleep(1)
async def do_show_local_values(self):
prettytable = PrettyTable()
field_names = ["Service", "Characteristic", "Descriptor"]
@@ -800,6 +812,40 @@ class ConsoleApp:
self.local_values_text.text = prettytable.get_string()
self.ui.invalidate()
async def do_show_remote_values(self):
prettytable = PrettyTable(
field_names=[
"Connection",
"Service",
"Characteristic",
"Descriptor",
"Time",
"Value",
]
)
for connection in self.device.connections.values():
for handle, (time, value) in connection.gatt_client.cached_values.items():
row = [connection.handle]
attribute = connection.gatt_client.get_attributes(handle)
if not attribute:
continue
if len(attribute) == 3:
row.extend(
[attribute[0].uuid, attribute[1].uuid, attribute[2].type]
)
elif len(attribute) == 2:
row.extend([attribute[0].uuid, attribute[1].uuid, ""])
elif len(attribute) == 1:
row.extend([attribute[0].uuid, "", ""])
else:
continue
row.extend([humanize.naturaltime(time), value])
prettytable.add_row(row)
self.remote_values_text.text = prettytable.get_string()
self.ui.invalidate()
async def do_get_phy(self, _):
if not self.connected_peer:
self.show_error('not connected')
@@ -899,9 +945,9 @@ class ConsoleApp:
# send data to any subscribers
if isinstance(attribute, Characteristic):
attribute.write_value(None, value)
if attribute.has_properties([Characteristic.NOTIFY]):
if attribute.has_properties(Characteristic.NOTIFY):
await self.device.gatt_server.notify_subscribers(attribute)
if attribute.has_properties([Characteristic.INDICATE]):
if attribute.has_properties(Characteristic.INDICATE):
await self.device.gatt_server.indicate_subscribers(attribute)
async def do_subscribe(self, params):

View File

@@ -24,7 +24,7 @@ from prompt_toolkit.shortcuts import PromptSession
from bumble.colors import color
from bumble.device import Device, Peer
from bumble.transport import open_transport_or_link
from bumble.smp import PairingDelegate, PairingConfig
from bumble.pairing import PairingDelegate, PairingConfig
from bumble.smp import error_name as smp_error_name
from bumble.keys import JsonKeyStore
from bumble.core import ProtocolError
@@ -345,8 +345,13 @@ async def pair(
print(color(f'Pairing failed: {error}', 'red'))
return
else:
# Advertise so that peers can find us and connect
await device.start_advertising(auto_restart=True)
if mode == 'le':
# Advertise so that peers can find us and connect
await device.start_advertising(auto_restart=True)
else:
# Become discoverable and connectable
await device.set_discoverable(True)
await device.set_connectable(True)
# Run until the user asks to exit
await Waiter.instance.wait_until_terminated()

View File

@@ -29,11 +29,13 @@ from .colors import color
from .att import ATT_CID, ATT_DEFAULT_MTU, ATT_PDU
from .gatt import Characteristic, Descriptor, Service
from .hci import (
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE,
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
HCI_CENTRAL_ROLE,
HCI_COMMAND_STATUS_PENDING,
HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR,
HCI_DISPLAY_ONLY_IO_CAPABILITY,
HCI_DISPLAY_YES_NO_IO_CAPABILITY,
HCI_DISPLAY_ONLY_IO_CAPABILITY,
HCI_EXTENDED_INQUIRY_MODE,
HCI_GENERAL_INQUIRY_LAP,
HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR,
@@ -141,6 +143,7 @@ from .keys import (
KeyStore,
PairingKeys,
)
from .pairing import PairingConfig
from . import gatt_client
from . import gatt_server
from . import smp
@@ -198,6 +201,7 @@ DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS = l2cap.L2CAP_LE_CREDIT_BASED_CONN
# Classes
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
class Advertisement:
address: Address
@@ -529,6 +533,9 @@ class Connection(CompositeEventEmitter):
authenticated: bool
sc: bool
link_key_type: int
gatt_client: gatt_client.Client
pairing_peer_io_capability: Optional[int]
pairing_peer_authentication_requirements: Optional[int]
@composite_listener
class Listener:
@@ -592,6 +599,8 @@ class Connection(CompositeEventEmitter):
self.gatt_server = (
device.gatt_server
) # By default, use the device's shared server
self.pairing_peer_io_capability = None
self.pairing_peer_authentication_requirements = None
# [Classic only]
@classmethod
@@ -1048,7 +1057,10 @@ class Device(CompositeEventEmitter):
self.random_address = address
# Setup SMP
self.smp_manager = smp.Manager(self)
self.smp_manager = smp.Manager(
self, pairing_config_factory=lambda connection: PairingConfig()
)
self.l2cap_channel_manager.register_fixed_channel(smp.SMP_CID, self.on_smp_pdu)
# Register the SDP server with the L2CAP Channel Manager
@@ -1239,7 +1251,7 @@ class Device(CompositeEventEmitter):
await self.send_command(HCI_LE_Clear_Resolving_List_Command()) # type: ignore[call-arg]
resolving_keys = await self.keystore.get_resolving_keys()
for (irk, address) in resolving_keys:
for irk, address in resolving_keys:
await self.send_command(
HCI_LE_Add_Device_To_Resolving_List_Command(
peer_identity_address_type=address.address_type,
@@ -1620,7 +1632,7 @@ class Device(CompositeEventEmitter):
pending connection.
connection_parameters_preferences: (BLE only, ignored for BR/EDR)
* None: use all PHYs with default parameters
* None: use the 1M PHY with default parameters
* map: each entry has a PHY as key and a ConnectionParametersPreferences
object as value
@@ -1689,9 +1701,7 @@ class Device(CompositeEventEmitter):
if connection_parameters_preferences is None:
if connection_parameters_preferences is None:
connection_parameters_preferences = {
HCI_LE_1M_PHY: ConnectionParametersPreferences.default,
HCI_LE_2M_PHY: ConnectionParametersPreferences.default,
HCI_LE_CODED_PHY: ConnectionParametersPreferences.default,
HCI_LE_1M_PHY: ConnectionParametersPreferences.default
}
self.connect_own_address_type = own_address_type
@@ -2229,8 +2239,9 @@ class Device(CompositeEventEmitter):
if keys is not None:
logger.debug('found keys in the key store')
if keys.link_key is None:
logger.debug('no link key')
logger.warning('no link key')
return None
return keys.link_key.value
# [Classic only]
@@ -2435,8 +2446,14 @@ class Device(CompositeEventEmitter):
def on_link_key(self, bd_addr, link_key, key_type):
# Store the keys in the key store
if self.keystore:
authenticated = key_type in (
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE,
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
)
pairing_keys = PairingKeys()
pairing_keys.link_key = PairingKeys.Key(value=link_key)
pairing_keys.link_key = PairingKeys.Key(
value=link_key, authenticated=authenticated
)
async def store_keys():
try:
@@ -2702,7 +2719,7 @@ class Device(CompositeEventEmitter):
# On Secure Simple Pairing complete, in case:
# - Connection isn't already authenticated
# - AND we are not the initiator of the authentication
# We must trigger authentication to known if we are truly authenticated
# We must trigger authentication to know if we are truly authenticated
if not connection.authenticating and not connection.authenticated:
logger.debug(
f'*** Trigger Connection Authentication: [0x{connection.handle:04X}] '
@@ -2717,22 +2734,6 @@ class Device(CompositeEventEmitter):
# Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection)
# Map the SMP IO capability to a Classic IO capability
# pylint: disable=line-too-long
io_capability = {
smp.SMP_DISPLAY_ONLY_IO_CAPABILITY: HCI_DISPLAY_ONLY_IO_CAPABILITY,
smp.SMP_DISPLAY_YES_NO_IO_CAPABILITY: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY: HCI_KEYBOARD_ONLY_IO_CAPABILITY,
smp.SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
}.get(pairing_config.delegate.io_capability)
if io_capability is None:
logger.warning(
f'cannot map IO capability ({pairing_config.delegate.io_capability}'
)
io_capability = HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
# Compute the authentication requirements
authentication_requirements = (
# No Bonding
@@ -2751,53 +2752,50 @@ class Device(CompositeEventEmitter):
self.host.send_command_sync(
HCI_IO_Capability_Request_Reply_Command(
bd_addr=connection.peer_address,
io_capability=io_capability,
io_capability=pairing_config.delegate.classic_io_capability,
oob_data_present=0x00, # Not present
authentication_requirements=authentication_requirements,
)
)
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_authentication_io_capability_response(
self, connection, io_capability, authentication_requirements
):
connection.peer_pairing_io_capability = io_capability
connection.peer_pairing_authentication_requirements = (
authentication_requirements
)
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_authentication_user_confirmation_request(self, connection, code):
# Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection)
can_compare = pairing_config.delegate.io_capability not in (
smp.SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
smp.SMP_DISPLAY_ONLY_IO_CAPABILITY,
)
io_capability = pairing_config.delegate.classic_io_capability
# Respond
if can_compare:
async def compare_numbers():
numbers_match = await connection.abort_on(
'disconnection',
pairing_config.delegate.compare_numbers(code, digits=6),
)
if numbers_match:
await self.host.send_command(
HCI_User_Confirmation_Request_Reply_Command(
bd_addr=connection.peer_address
)
)
else:
await self.host.send_command(
HCI_User_Confirmation_Request_Negative_Reply_Command(
bd_addr=connection.peer_address
)
if io_capability == HCI_DISPLAY_YES_NO_IO_CAPABILITY:
if connection.peer_pairing_io_capability in (
HCI_DISPLAY_YES_NO_IO_CAPABILITY,
HCI_DISPLAY_ONLY_IO_CAPABILITY,
):
# Display the code and ask the user to compare
async def prompt():
return (
await pairing_config.delegate.compare_numbers(code, digits=6),
)
asyncio.create_task(compare_numbers())
else:
else:
# Ask the user to confirm the pairing, without showing a code
async def prompt():
return await pairing_config.delegate.confirm()
async def confirm():
confirm = await connection.abort_on(
'disconnection', pairing_config.delegate.confirm()
)
if confirm:
if await prompt():
await self.host.send_command(
HCI_User_Confirmation_Request_Reply_Command(
bd_addr=connection.peer_address
@@ -2810,7 +2808,17 @@ class Device(CompositeEventEmitter):
)
)
asyncio.create_task(confirm())
AsyncRunner.spawn(connection.abort_on('disconnection', confirm()))
return
if io_capability == HCI_DISPLAY_ONLY_IO_CAPABILITY:
# Display the code to the user
AsyncRunner.spawn(pairing_config.delegate.display_number(code, 6))
# Automatic confirmation
self.host.send_command_sync(
HCI_User_Confirmation_Request_Reply_Command(bd_addr=connection.peer_address)
)
# [Classic only]
@host_event_handler
@@ -2818,15 +2826,11 @@ class Device(CompositeEventEmitter):
def on_authentication_user_passkey_request(self, connection):
# Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection)
can_input = pairing_config.delegate.io_capability in (
smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY,
smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY,
)
io_capability = pairing_config.delegate.classic_io_capability
# Respond
if can_input:
if io_capability == HCI_KEYBOARD_ONLY_IO_CAPABILITY:
# Ask the user to input a number
async def get_number():
number = await connection.abort_on(
'disconnection', pairing_config.delegate.get_number()
@@ -2856,18 +2860,14 @@ class Device(CompositeEventEmitter):
@host_event_handler
@with_connection_from_address
def on_pin_code_request(self, connection):
# classic legacy pairing
# Classic legacy pairing
# Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection)
io_capability = pairing_config.delegate.classic_io_capability
can_input = pairing_config.delegate.io_capability in (
smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY,
smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY,
)
# respond the pin code
if can_input:
# Respond
if io_capability == HCI_KEYBOARD_ONLY_IO_CAPABILITY:
# Ask the user to enter a string
async def get_pin_code():
pin_code = await connection.abort_on(
'disconnection', pairing_config.delegate.get_string(16)
@@ -2907,6 +2907,7 @@ class Device(CompositeEventEmitter):
# Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection)
# Show the passkey to the user
connection.abort_on(
'disconnection', pairing_config.delegate.display_number(passkey)
)

View File

@@ -27,7 +27,8 @@ from __future__ import annotations
import asyncio
import logging
import struct
from typing import List, Optional, Dict, Any, Callable
from datetime import datetime
from typing import List, Optional, Dict, Tuple, Callable, Union, Any
from pyee import EventEmitter
@@ -167,7 +168,9 @@ class CharacteristicProxy(AttributeProxy):
async def discover_descriptors(self):
return await self.client.discover_descriptors(self)
async def subscribe(self, subscriber=None, prefer_notify=True):
async def subscribe(
self, subscriber: Optional[Callable] = None, prefer_notify=True
):
if subscriber is not None:
if subscriber in self.subscribers:
# We already have a proxy subscriber
@@ -221,6 +224,7 @@ class ProfileServiceProxy:
# -----------------------------------------------------------------------------
class Client:
services: List[ServiceProxy]
cached_values: Dict[int, Tuple[datetime, bytes]]
def __init__(self, connection):
self.connection = connection
@@ -233,6 +237,7 @@ class Client:
) # Notification subscribers, by attribute handle
self.indication_subscribers = {} # Indication subscribers, by attribute handle
self.services = []
self.cached_values = {}
def send_gatt_pdu(self, pdu):
self.connection.send_l2cap_pdu(ATT_CID, pdu)
@@ -317,6 +322,35 @@ class Client:
if c.uuid == uuid
]
def get_attribute_grouping(
self, attribute_handle: int
) -> Optional[
Union[
ServiceProxy,
Tuple[ServiceProxy, CharacteristicProxy],
Tuple[ServiceProxy, CharacteristicProxy, DescriptorProxy],
]
]:
"""
Get the attribute(s) associated with an attribute handle
"""
for service in self.services:
if service.handle == attribute_handle:
return service
if service.handle <= attribute_handle <= service.end_group_handle:
for characteristic in service.characteristics:
if characteristic.handle == attribute_handle:
return (service, characteristic)
if (
characteristic.handle
<= attribute_handle
<= characteristic.end_group_handle
):
for descriptor in characteristic.descriptors:
if descriptor.handle == attribute_handle:
return (service, characteristic, descriptor)
return None
def on_service_discovered(self, service):
'''Add a service to the service list if it wasn't already there'''
already_known = False
@@ -808,6 +842,7 @@ class Client:
offset += len(part)
self.cache_value(attribute_handle, attribute_value)
# Return the value as bytes
return attribute_value
@@ -942,6 +977,8 @@ class Client:
)
if not subscribers:
logger.warning('!!! received notification with no subscriber')
self.cache_value(notification.attribute_handle, notification.attribute_value)
for subscriber in subscribers:
if callable(subscriber):
subscriber(notification.attribute_value)
@@ -953,6 +990,8 @@ class Client:
subscribers = self.indication_subscribers.get(indication.attribute_handle, [])
if not subscribers:
logger.warning('!!! received indication with no subscriber')
self.cache_value(indication.attribute_handle, indication.attribute_value)
for subscriber in subscribers:
if callable(subscriber):
subscriber(indication.attribute_value)
@@ -961,3 +1000,9 @@ class Client:
# Confirm that we received the indication
self.send_confirmation(ATT_Handle_Value_Confirmation())
def cache_value(self, attribute_handle: int, value: bytes):
self.cached_values[attribute_handle] = (
datetime.now(),
value,
)

View File

@@ -395,8 +395,8 @@ class Host(AbortableEventEmitter):
def supports_command(self, command):
# Find the support flag position for this command
for (octet, flags) in enumerate(HCI_SUPPORTED_COMMANDS_FLAGS):
for (flag_position, value) in enumerate(flags):
for octet, flags in enumerate(HCI_SUPPORTED_COMMANDS_FLAGS):
for flag_position, value in enumerate(flags):
if value == command:
# Check if the flag is set
if octet < len(self.local_supported_commands) and flag_position < 8:
@@ -409,7 +409,7 @@ class Host(AbortableEventEmitter):
@property
def supported_commands(self):
commands = []
for (octet, flags) in enumerate(self.local_supported_commands):
for octet, flags in enumerate(self.local_supported_commands):
if octet < len(HCI_SUPPORTED_COMMANDS_FLAGS):
for flag in range(8):
if flags & (1 << flag) != 0:
@@ -839,7 +839,12 @@ class Host(AbortableEventEmitter):
self.emit('authentication_io_capability_request', event.bd_addr)
def on_hci_io_capability_response_event(self, event):
pass
self.emit(
'authentication_io_capability_response',
event.bd_addr,
event.io_capability,
event.authentication_requirements,
)
def on_hci_user_confirmation_request_event(self, event):
self.emit(

184
bumble/pairing.py Normal file
View File

@@ -0,0 +1,184 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import enum
from typing import Optional, Tuple
from .hci import (
HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
HCI_DISPLAY_ONLY_IO_CAPABILITY,
HCI_DISPLAY_YES_NO_IO_CAPABILITY,
HCI_KEYBOARD_ONLY_IO_CAPABILITY,
)
from .smp import (
SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
SMP_KEYBOARD_ONLY_IO_CAPABILITY,
SMP_DISPLAY_ONLY_IO_CAPABILITY,
SMP_DISPLAY_YES_NO_IO_CAPABILITY,
SMP_KEYBOARD_DISPLAY_IO_CAPABILITY,
SMP_ENC_KEY_DISTRIBUTION_FLAG,
SMP_ID_KEY_DISTRIBUTION_FLAG,
SMP_SIGN_KEY_DISTRIBUTION_FLAG,
SMP_LINK_KEY_DISTRIBUTION_FLAG,
)
# -----------------------------------------------------------------------------
class PairingDelegate:
"""Abstract base class for Pairing Delegates."""
# I/O Capabilities.
# These are defined abstractly, and can be mapped to specific Classic pairing
# and/or SMP constants.
class IoCapability(enum.IntEnum):
NO_OUTPUT_NO_INPUT = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
KEYBOARD_INPUT_ONLY = SMP_KEYBOARD_ONLY_IO_CAPABILITY
DISPLAY_OUTPUT_ONLY = SMP_DISPLAY_ONLY_IO_CAPABILITY
DISPLAY_OUTPUT_AND_YES_NO_INPUT = SMP_DISPLAY_YES_NO_IO_CAPABILITY
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = SMP_KEYBOARD_DISPLAY_IO_CAPABILITY
# Direct names for backward compatibility.
NO_OUTPUT_NO_INPUT = IoCapability.NO_OUTPUT_NO_INPUT
KEYBOARD_INPUT_ONLY = IoCapability.KEYBOARD_INPUT_ONLY
DISPLAY_OUTPUT_ONLY = IoCapability.DISPLAY_OUTPUT_ONLY
DISPLAY_OUTPUT_AND_YES_NO_INPUT = IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT
# Key Distribution [LE only]
class KeyDistribution(enum.IntFlag):
DISTRIBUTE_ENCRYPTION_KEY = SMP_ENC_KEY_DISTRIBUTION_FLAG
DISTRIBUTE_IDENTITY_KEY = SMP_ID_KEY_DISTRIBUTION_FLAG
DISTRIBUTE_SIGNING_KEY = SMP_SIGN_KEY_DISTRIBUTION_FLAG
DISTRIBUTE_LINK_KEY = SMP_LINK_KEY_DISTRIBUTION_FLAG
DEFAULT_KEY_DISTRIBUTION: int = (
SMP_ENC_KEY_DISTRIBUTION_FLAG | SMP_ID_KEY_DISTRIBUTION_FLAG
)
# Default mapping from abstract to Classic I/O capabilities.
# Subclasses may override this if they prefer a different mapping.
CLASSIC_IO_CAPABILITIES_MAP = {
NO_OUTPUT_NO_INPUT: HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
KEYBOARD_INPUT_ONLY: HCI_KEYBOARD_ONLY_IO_CAPABILITY,
DISPLAY_OUTPUT_ONLY: HCI_DISPLAY_ONLY_IO_CAPABILITY,
DISPLAY_OUTPUT_AND_YES_NO_INPUT: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
}
io_capability: IoCapability
local_initiator_key_distribution: KeyDistribution
local_responder_key_distribution: KeyDistribution
def __init__(
self,
io_capability=NO_OUTPUT_NO_INPUT,
local_initiator_key_distribution=DEFAULT_KEY_DISTRIBUTION,
local_responder_key_distribution=DEFAULT_KEY_DISTRIBUTION,
) -> None:
self.io_capability = io_capability
self.local_initiator_key_distribution = local_initiator_key_distribution
self.local_responder_key_distribution = local_responder_key_distribution
@property
def classic_io_capability(self) -> int:
"""Map the abstract I/O capability to a Classic constant."""
# pylint: disable=line-too-long
return self.CLASSIC_IO_CAPABILITIES_MAP.get(
self.io_capability, HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
)
@property
def smp_io_capability(self) -> int:
"""Map the abstract I/O capability to an SMP constant."""
# This is just a 1-1 direct mapping
return self.io_capability
async def accept(self) -> bool:
"""Accept or reject a Pairing request."""
return True
async def confirm(self) -> bool:
"""Respond yes or no to a Pairing confirmation question."""
return True
# pylint: disable-next=unused-argument
async def compare_numbers(self, number: int, digits: int) -> bool:
"""Compare two numbers."""
return True
async def get_number(self) -> Optional[int]:
"""
Return an optional number as an answer to a passkey request.
Returning `None` will result in a negative reply.
"""
return 0
async def get_string(self, max_length) -> Optional[str]:
"""
Return a string whose utf-8 encoding is up to max_length bytes.
"""
return None
# pylint: disable-next=unused-argument
async def display_number(self, number: int, digits: int) -> None:
"""Display a number."""
# [LE only]
async def key_distribution_response(
self, peer_initiator_key_distribution: int, peer_responder_key_distribution: int
) -> Tuple[int, int]:
"""
Return the key distribution response in an SMP protocol context.
NOTE: since it is only used by the SMP protocol, this method's input and output
are directly as integers, using the SMP constants, rather than the abstract
KeyDistribution enums.
"""
return (
int(
peer_initiator_key_distribution & self.local_initiator_key_distribution
),
int(
peer_responder_key_distribution & self.local_responder_key_distribution
),
)
# -----------------------------------------------------------------------------
class PairingConfig:
"""Configuration for the Pairing protocol."""
def __init__(
self,
sc: bool = True,
mitm: bool = True,
bonding: bool = True,
delegate: Optional[PairingDelegate] = None,
) -> None:
self.sc = sc
self.mitm = mitm
self.bonding = bonding
self.delegate = delegate or PairingDelegate()
def __str__(self) -> str:
return (
f'PairingConfig(sc={self.sc}, '
f'mitm={self.mitm}, bonding={self.bonding}, '
f'delegate[{self.delegate.io_capability}])'
)

View File

@@ -31,7 +31,16 @@ from typing import Dict, Optional, Type
from pyee import EventEmitter
from .colors import color
from .hci import Address, HCI_LE_Enable_Encryption_Command, HCI_Object, key_with_value
from .hci import (
HCI_DISPLAY_ONLY_IO_CAPABILITY,
HCI_DISPLAY_YES_NO_IO_CAPABILITY,
HCI_KEYBOARD_ONLY_IO_CAPABILITY,
HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
Address,
HCI_LE_Enable_Encryption_Command,
HCI_Object,
key_with_value,
)
from .core import (
BT_BR_EDR_TRANSPORT,
BT_CENTRAL_ROLE,
@@ -476,7 +485,7 @@ class AddressResolver:
address_bytes = bytes(address)
hash_part = address_bytes[0:3]
prand = address_bytes[3:6]
for (irk, resolved_address) in self.resolving_keys:
for irk, resolved_address in self.resolving_keys:
local_hash = crypto.ah(irk, prand)
if local_hash == hash_part:
# Match!
@@ -491,86 +500,6 @@ class AddressResolver:
return None
# -----------------------------------------------------------------------------
class PairingDelegate:
NO_OUTPUT_NO_INPUT = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
KEYBOARD_INPUT_ONLY = SMP_KEYBOARD_ONLY_IO_CAPABILITY
DISPLAY_OUTPUT_ONLY = SMP_DISPLAY_ONLY_IO_CAPABILITY
DISPLAY_OUTPUT_AND_YES_NO_INPUT = SMP_DISPLAY_YES_NO_IO_CAPABILITY
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = SMP_KEYBOARD_DISPLAY_IO_CAPABILITY
DEFAULT_KEY_DISTRIBUTION: int = (
SMP_ENC_KEY_DISTRIBUTION_FLAG | SMP_ID_KEY_DISTRIBUTION_FLAG
)
def __init__(
self,
io_capability: int = NO_OUTPUT_NO_INPUT,
local_initiator_key_distribution: int = DEFAULT_KEY_DISTRIBUTION,
local_responder_key_distribution: int = DEFAULT_KEY_DISTRIBUTION,
) -> None:
self.io_capability = io_capability
self.local_initiator_key_distribution = local_initiator_key_distribution
self.local_responder_key_distribution = local_responder_key_distribution
async def accept(self) -> bool:
return True
async def confirm(self) -> bool:
return True
# pylint: disable-next=unused-argument
async def compare_numbers(self, number: int, digits: int) -> bool:
return True
async def get_number(self) -> Optional[int]:
'''
Returns an optional number as an answer to a passkey request.
Returning `None` will result in a negative reply.
'''
return 0
async def get_string(self, max_length) -> Optional[str]:
'''
Returns a string whose utf-8 encoding is up to max_length bytes.
'''
return None
# pylint: disable-next=unused-argument
async def display_number(self, number: int, digits: int) -> None:
pass
async def key_distribution_response(
self, peer_initiator_key_distribution, peer_responder_key_distribution
):
return (
(peer_initiator_key_distribution & self.local_initiator_key_distribution),
(peer_responder_key_distribution & self.local_responder_key_distribution),
)
# -----------------------------------------------------------------------------
class PairingConfig:
def __init__(
self,
sc: bool = True,
mitm: bool = False,
bonding: bool = True,
delegate: Optional[PairingDelegate] = None,
) -> None:
self.sc = sc
self.mitm = mitm
self.bonding = bonding
self.delegate = delegate or PairingDelegate()
def __str__(self):
io_capability_str = SMP_Command.io_capability_name(self.delegate.io_capability)
return (
f'PairingConfig(sc={self.sc}, '
f'mitm={self.mitm}, bonding={self.bonding}, '
f'delegate[{io_capability_str}])'
)
# -----------------------------------------------------------------------------
class Session:
# Pairing methods
@@ -645,7 +574,7 @@ class Session:
},
}
def __init__(self, manager, connection, pairing_config):
def __init__(self, manager, connection, pairing_config, is_initiator):
self.manager = manager
self.connection = connection
self.preq = None
@@ -684,7 +613,7 @@ class Session:
self.ctkd_task = None
# Decide if we're the initiator or the responder
self.is_initiator = connection.role == BT_CENTRAL_ROLE
self.is_initiator = is_initiator
self.is_responder = not self.is_initiator
# Listen for connection events
@@ -1662,12 +1591,12 @@ class Manager(EventEmitter):
Implements the Initiator and Responder roles of the Security Manager Protocol
'''
def __init__(self, device):
def __init__(self, device, pairing_config_factory):
super().__init__()
self.device = device
self.sessions = {}
self._ecc_key = None
self.pairing_config_factory = lambda connection: PairingConfig()
self.pairing_config_factory = pairing_config_factory
def send_command(self, connection, command):
logger.debug(
@@ -1680,6 +1609,8 @@ class Manager(EventEmitter):
def on_smp_pdu(self, connection, pdu):
# Look for a session with this connection, and create one if none exists
if not (session := self.sessions.get(connection.handle)):
if connection.role == BT_CENTRAL_ROLE:
logger.warning('Remote starts pairing as Peripheral!')
pairing_config = self.pairing_config_factory(connection)
if pairing_config is None:
# Pairing disabled
@@ -1688,7 +1619,7 @@ class Manager(EventEmitter):
SMP_Pairing_Failed_Command(reason=SMP_PAIRING_NOT_SUPPORTED_ERROR),
)
return
session = Session(self, connection, pairing_config)
session = Session(self, connection, pairing_config, is_initiator=False)
self.sessions[connection.handle] = session
# Parse the L2CAP payload into an SMP Command object
@@ -1709,10 +1640,12 @@ class Manager(EventEmitter):
async def pair(self, connection):
# TODO: check if there's already a session for this connection
if connection.role != BT_CENTRAL_ROLE:
logger.warning('Start pairing as Peripheral!')
pairing_config = self.pairing_config_factory(connection)
if pairing_config is None:
raise ValueError('pairing config must not be None when initiating')
session = Session(self, connection, pairing_config)
session = Session(self, connection, pairing_config, is_initiator=True)
self.sessions[connection.handle] = session
return await session.pair()

View File

@@ -44,6 +44,7 @@ install_requires =
pyusb >= 1.2; platform_system!='Emscripten'
websockets >= 8.1; platform_system!='Emscripten'
prettytable >= 3.6.0
humanize >= 4.6.0
[options.entry_points]
console_scripts =

View File

@@ -28,9 +28,8 @@ from bumble.device import Device, Peer
from bumble.host import Host
from bumble.gatt import Service, Characteristic
from bumble.transport import AsyncPipeSink
from bumble.pairing import PairingConfig, PairingDelegate
from bumble.smp import (
PairingConfig,
PairingDelegate,
SMP_PAIRING_NOT_SUPPORTED_ERROR,
SMP_CONFIRM_VALUE_FAILED_ERROR,
)
@@ -262,7 +261,7 @@ async def test_self_gatt_long_read():
found_service = result[0]
found_characteristics = await found_service.discover_characteristics()
assert len(found_characteristics) == 513
for (i, characteristic) in enumerate(found_characteristics):
for i, characteristic in enumerate(found_characteristics):
value = await characteristic.read_value()
assert value == characteristics[i].value
@@ -317,11 +316,11 @@ async def _test_self_smp_with_configs(pairing_config1, pairing_config2):
# -----------------------------------------------------------------------------
IO_CAP = [
PairingDelegate.NO_OUTPUT_NO_INPUT,
PairingDelegate.KEYBOARD_INPUT_ONLY,
PairingDelegate.DISPLAY_OUTPUT_ONLY,
PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT,
PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY,
PairingDelegate.IoCapability.DISPLAY_OUTPUT_ONLY,
PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
]
SC = [False, True]
MITM = [False, True]
@@ -335,7 +334,10 @@ KEY_DIST = range(16)
itertools.chain(
itertools.product([IO_CAP], SC, MITM, [15]),
itertools.product(
[[PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT]], SC, MITM, KEY_DIST
[[PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT]],
SC,
MITM,
KEY_DIST,
),
),
)
@@ -378,7 +380,7 @@ async def test_self_smp(io_caps, sc, mitm, key_dist):
else:
if (
self.peer_delegate.io_capability
== PairingDelegate.KEYBOARD_INPUT_ONLY
== PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY
):
peer_number = 6789
else:
@@ -421,7 +423,7 @@ async def test_self_smp(io_caps, sc, mitm, key_dist):
async def test_self_smp_reject():
class RejectingDelegate(PairingDelegate):
def __init__(self):
super().__init__(PairingDelegate.NO_OUTPUT_NO_INPUT)
super().__init__(PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT)
async def accept(self):
return False
@@ -442,7 +444,9 @@ async def test_self_smp_reject():
async def test_self_smp_wrong_pin():
class WrongPinDelegate(PairingDelegate):
def __init__(self):
super().__init__(PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT)
super().__init__(
PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT
)
async def compare_numbers(self, number, digits):
return False