diff --git a/apps/pair.py b/apps/pair.py index 483595d1..a7844fe3 100644 --- a/apps/pair.py +++ b/apps/pair.py @@ -24,7 +24,7 @@ from prompt_toolkit.shortcuts import PromptSession from bumble.colors import color from bumble.device import Device, Peer from bumble.transport import open_transport_or_link -from bumble.smp import PairingDelegate, PairingConfig +from bumble.pairing import PairingDelegate, PairingConfig from bumble.smp import error_name as smp_error_name from bumble.keys import JsonKeyStore from bumble.core import ProtocolError @@ -345,8 +345,13 @@ async def pair( print(color(f'Pairing failed: {error}', 'red')) return else: - # Advertise so that peers can find us and connect - await device.start_advertising(auto_restart=True) + if mode == 'le': + # Advertise so that peers can find us and connect + await device.start_advertising(auto_restart=True) + else: + # Become discoverable and connectable + await device.set_discoverable(True) + await device.set_connectable(True) # Run until the user asks to exit await Waiter.instance.wait_until_terminated() diff --git a/bumble/device.py b/bumble/device.py index bdc4de26..93148aed 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -29,11 +29,13 @@ from .colors import color from .att import ATT_CID, ATT_DEFAULT_MTU, ATT_PDU from .gatt import Characteristic, Descriptor, Service from .hci import ( + HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE, + HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE, HCI_CENTRAL_ROLE, HCI_COMMAND_STATUS_PENDING, HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR, - HCI_DISPLAY_ONLY_IO_CAPABILITY, HCI_DISPLAY_YES_NO_IO_CAPABILITY, + HCI_DISPLAY_ONLY_IO_CAPABILITY, HCI_EXTENDED_INQUIRY_MODE, HCI_GENERAL_INQUIRY_LAP, HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR, @@ -141,6 +143,7 @@ from .keys import ( KeyStore, PairingKeys, ) +from .pairing import PairingConfig from . import gatt_client from . import gatt_server from . import smp @@ -198,6 +201,7 @@ DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS = l2cap.L2CAP_LE_CREDIT_BASED_CONN # Classes # ----------------------------------------------------------------------------- + # ----------------------------------------------------------------------------- class Advertisement: address: Address @@ -530,6 +534,8 @@ class Connection(CompositeEventEmitter): sc: bool link_key_type: int gatt_client: gatt_client.Client + pairing_peer_io_capability: Optional[int] + pairing_peer_authentication_requirements: Optional[int] @composite_listener class Listener: @@ -593,6 +599,8 @@ class Connection(CompositeEventEmitter): self.gatt_server = ( device.gatt_server ) # By default, use the device's shared server + self.pairing_peer_io_capability = None + self.pairing_peer_authentication_requirements = None # [Classic only] @classmethod @@ -1049,7 +1057,10 @@ class Device(CompositeEventEmitter): self.random_address = address # Setup SMP - self.smp_manager = smp.Manager(self) + self.smp_manager = smp.Manager( + self, pairing_config_factory=lambda connection: PairingConfig() + ) + self.l2cap_channel_manager.register_fixed_channel(smp.SMP_CID, self.on_smp_pdu) # Register the SDP server with the L2CAP Channel Manager @@ -1240,7 +1251,7 @@ class Device(CompositeEventEmitter): await self.send_command(HCI_LE_Clear_Resolving_List_Command()) # type: ignore[call-arg] resolving_keys = await self.keystore.get_resolving_keys() - for (irk, address) in resolving_keys: + for irk, address in resolving_keys: await self.send_command( HCI_LE_Add_Device_To_Resolving_List_Command( peer_identity_address_type=address.address_type, @@ -2228,8 +2239,9 @@ class Device(CompositeEventEmitter): if keys is not None: logger.debug('found keys in the key store') if keys.link_key is None: - logger.debug('no link key') + logger.warning('no link key') return None + return keys.link_key.value # [Classic only] @@ -2434,8 +2446,14 @@ class Device(CompositeEventEmitter): def on_link_key(self, bd_addr, link_key, key_type): # Store the keys in the key store if self.keystore: + authenticated = key_type in ( + HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE, + HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE, + ) pairing_keys = PairingKeys() - pairing_keys.link_key = PairingKeys.Key(value=link_key) + pairing_keys.link_key = PairingKeys.Key( + value=link_key, authenticated=authenticated + ) async def store_keys(): try: @@ -2701,7 +2719,7 @@ class Device(CompositeEventEmitter): # On Secure Simple Pairing complete, in case: # - Connection isn't already authenticated # - AND we are not the initiator of the authentication - # We must trigger authentication to known if we are truly authenticated + # We must trigger authentication to know if we are truly authenticated if not connection.authenticating and not connection.authenticated: logger.debug( f'*** Trigger Connection Authentication: [0x{connection.handle:04X}] ' @@ -2716,22 +2734,6 @@ class Device(CompositeEventEmitter): # Ask what the pairing config should be for this connection pairing_config = self.pairing_config_factory(connection) - # Map the SMP IO capability to a Classic IO capability - # pylint: disable=line-too-long - io_capability = { - smp.SMP_DISPLAY_ONLY_IO_CAPABILITY: HCI_DISPLAY_ONLY_IO_CAPABILITY, - smp.SMP_DISPLAY_YES_NO_IO_CAPABILITY: HCI_DISPLAY_YES_NO_IO_CAPABILITY, - smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY: HCI_KEYBOARD_ONLY_IO_CAPABILITY, - smp.SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY, - smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: HCI_DISPLAY_YES_NO_IO_CAPABILITY, - }.get(pairing_config.delegate.io_capability) - - if io_capability is None: - logger.warning( - f'cannot map IO capability ({pairing_config.delegate.io_capability}' - ) - io_capability = HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY - # Compute the authentication requirements authentication_requirements = ( # No Bonding @@ -2750,53 +2752,50 @@ class Device(CompositeEventEmitter): self.host.send_command_sync( HCI_IO_Capability_Request_Reply_Command( bd_addr=connection.peer_address, - io_capability=io_capability, + io_capability=pairing_config.delegate.classic_io_capability, oob_data_present=0x00, # Not present authentication_requirements=authentication_requirements, ) ) + # [Classic only] + @host_event_handler + @with_connection_from_address + def on_authentication_io_capability_response( + self, connection, io_capability, authentication_requirements + ): + connection.peer_pairing_io_capability = io_capability + connection.peer_pairing_authentication_requirements = ( + authentication_requirements + ) + # [Classic only] @host_event_handler @with_connection_from_address def on_authentication_user_confirmation_request(self, connection, code): # Ask what the pairing config should be for this connection pairing_config = self.pairing_config_factory(connection) - - can_compare = pairing_config.delegate.io_capability not in ( - smp.SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY, - smp.SMP_DISPLAY_ONLY_IO_CAPABILITY, - ) + io_capability = pairing_config.delegate.classic_io_capability # Respond - if can_compare: - - async def compare_numbers(): - numbers_match = await connection.abort_on( - 'disconnection', - pairing_config.delegate.compare_numbers(code, digits=6), - ) - if numbers_match: - await self.host.send_command( - HCI_User_Confirmation_Request_Reply_Command( - bd_addr=connection.peer_address - ) - ) - else: - await self.host.send_command( - HCI_User_Confirmation_Request_Negative_Reply_Command( - bd_addr=connection.peer_address - ) + if io_capability == HCI_DISPLAY_YES_NO_IO_CAPABILITY: + if connection.peer_pairing_io_capability in ( + HCI_DISPLAY_YES_NO_IO_CAPABILITY, + HCI_DISPLAY_ONLY_IO_CAPABILITY, + ): + # Display the code and ask the user to compare + async def prompt(): + return ( + await pairing_config.delegate.compare_numbers(code, digits=6), ) - asyncio.create_task(compare_numbers()) - else: + else: + # Ask the user to confirm the pairing, without showing a code + async def prompt(): + return await pairing_config.delegate.confirm() async def confirm(): - confirm = await connection.abort_on( - 'disconnection', pairing_config.delegate.confirm() - ) - if confirm: + if await prompt(): await self.host.send_command( HCI_User_Confirmation_Request_Reply_Command( bd_addr=connection.peer_address @@ -2809,7 +2808,17 @@ class Device(CompositeEventEmitter): ) ) - asyncio.create_task(confirm()) + AsyncRunner.spawn(confirm()) + return + + if io_capability == HCI_DISPLAY_ONLY_IO_CAPABILITY: + # Display the code to the user + AsyncRunner.spawn(pairing_config.delegate.display_number(code, 6)) + + # Automatic confirmation + self.host.send_command_sync( + HCI_User_Confirmation_Request_Reply_Command(bd_addr=connection.peer_address) + ) # [Classic only] @host_event_handler @@ -2817,15 +2826,11 @@ class Device(CompositeEventEmitter): def on_authentication_user_passkey_request(self, connection): # Ask what the pairing config should be for this connection pairing_config = self.pairing_config_factory(connection) - - can_input = pairing_config.delegate.io_capability in ( - smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY, - smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY, - ) + io_capability = pairing_config.delegate.classic_io_capability # Respond - if can_input: - + if io_capability == HCI_KEYBOARD_ONLY_IO_CAPABILITY: + # Ask the user to input a number async def get_number(): number = await connection.abort_on( 'disconnection', pairing_config.delegate.get_number() @@ -2855,18 +2860,14 @@ class Device(CompositeEventEmitter): @host_event_handler @with_connection_from_address def on_pin_code_request(self, connection): - # classic legacy pairing + # Classic legacy pairing # Ask what the pairing config should be for this connection pairing_config = self.pairing_config_factory(connection) + io_capability = pairing_config.delegate.classic_io_capability - can_input = pairing_config.delegate.io_capability in ( - smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY, - smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY, - ) - - # respond the pin code - if can_input: - + # Respond + if io_capability == HCI_KEYBOARD_ONLY_IO_CAPABILITY: + # Ask the user to enter a string async def get_pin_code(): pin_code = await connection.abort_on( 'disconnection', pairing_config.delegate.get_string(16) @@ -2906,6 +2907,7 @@ class Device(CompositeEventEmitter): # Ask what the pairing config should be for this connection pairing_config = self.pairing_config_factory(connection) + # Show the passkey to the user connection.abort_on( 'disconnection', pairing_config.delegate.display_number(passkey) ) diff --git a/bumble/host.py b/bumble/host.py index 9aa44640..afde2ee6 100644 --- a/bumble/host.py +++ b/bumble/host.py @@ -395,8 +395,8 @@ class Host(AbortableEventEmitter): def supports_command(self, command): # Find the support flag position for this command - for (octet, flags) in enumerate(HCI_SUPPORTED_COMMANDS_FLAGS): - for (flag_position, value) in enumerate(flags): + for octet, flags in enumerate(HCI_SUPPORTED_COMMANDS_FLAGS): + for flag_position, value in enumerate(flags): if value == command: # Check if the flag is set if octet < len(self.local_supported_commands) and flag_position < 8: @@ -409,7 +409,7 @@ class Host(AbortableEventEmitter): @property def supported_commands(self): commands = [] - for (octet, flags) in enumerate(self.local_supported_commands): + for octet, flags in enumerate(self.local_supported_commands): if octet < len(HCI_SUPPORTED_COMMANDS_FLAGS): for flag in range(8): if flags & (1 << flag) != 0: @@ -839,7 +839,12 @@ class Host(AbortableEventEmitter): self.emit('authentication_io_capability_request', event.bd_addr) def on_hci_io_capability_response_event(self, event): - pass + self.emit( + 'authentication_io_capability_response', + event.bd_addr, + event.io_capability, + event.authentication_requirements, + ) def on_hci_user_confirmation_request_event(self, event): self.emit( diff --git a/bumble/pairing.py b/bumble/pairing.py new file mode 100644 index 00000000..ba5ef463 --- /dev/null +++ b/bumble/pairing.py @@ -0,0 +1,184 @@ +# Copyright 2021-2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import enum +from typing import Optional, Tuple + +from .hci import ( + HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY, + HCI_DISPLAY_ONLY_IO_CAPABILITY, + HCI_DISPLAY_YES_NO_IO_CAPABILITY, + HCI_KEYBOARD_ONLY_IO_CAPABILITY, +) +from .smp import ( + SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY, + SMP_KEYBOARD_ONLY_IO_CAPABILITY, + SMP_DISPLAY_ONLY_IO_CAPABILITY, + SMP_DISPLAY_YES_NO_IO_CAPABILITY, + SMP_KEYBOARD_DISPLAY_IO_CAPABILITY, + SMP_ENC_KEY_DISTRIBUTION_FLAG, + SMP_ID_KEY_DISTRIBUTION_FLAG, + SMP_SIGN_KEY_DISTRIBUTION_FLAG, + SMP_LINK_KEY_DISTRIBUTION_FLAG, +) + + +# ----------------------------------------------------------------------------- +class PairingDelegate: + """Abstract base class for Pairing Delegates.""" + + # I/O Capabilities. + # These are defined abstractly, and can be mapped to specific Classic pairing + # and/or SMP constants. + class IoCapability(enum.IntEnum): + NO_OUTPUT_NO_INPUT = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY + KEYBOARD_INPUT_ONLY = SMP_KEYBOARD_ONLY_IO_CAPABILITY + DISPLAY_OUTPUT_ONLY = SMP_DISPLAY_ONLY_IO_CAPABILITY + DISPLAY_OUTPUT_AND_YES_NO_INPUT = SMP_DISPLAY_YES_NO_IO_CAPABILITY + DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = SMP_KEYBOARD_DISPLAY_IO_CAPABILITY + + # Direct names for backward compatibility. + NO_OUTPUT_NO_INPUT = IoCapability.NO_OUTPUT_NO_INPUT + KEYBOARD_INPUT_ONLY = IoCapability.KEYBOARD_INPUT_ONLY + DISPLAY_OUTPUT_ONLY = IoCapability.DISPLAY_OUTPUT_ONLY + DISPLAY_OUTPUT_AND_YES_NO_INPUT = IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT + DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT + + # Key Distribution [LE only] + class KeyDistribution(enum.IntFlag): + DISTRIBUTE_ENCRYPTION_KEY = SMP_ENC_KEY_DISTRIBUTION_FLAG + DISTRIBUTE_IDENTITY_KEY = SMP_ID_KEY_DISTRIBUTION_FLAG + DISTRIBUTE_SIGNING_KEY = SMP_SIGN_KEY_DISTRIBUTION_FLAG + DISTRIBUTE_LINK_KEY = SMP_LINK_KEY_DISTRIBUTION_FLAG + + DEFAULT_KEY_DISTRIBUTION: int = ( + SMP_ENC_KEY_DISTRIBUTION_FLAG | SMP_ID_KEY_DISTRIBUTION_FLAG + ) + + # Default mapping from abstract to Classic I/O capabilities. + # Subclasses may override this if they prefer a different mapping. + CLASSIC_IO_CAPABILITIES_MAP = { + NO_OUTPUT_NO_INPUT: HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY, + KEYBOARD_INPUT_ONLY: HCI_KEYBOARD_ONLY_IO_CAPABILITY, + DISPLAY_OUTPUT_ONLY: HCI_DISPLAY_ONLY_IO_CAPABILITY, + DISPLAY_OUTPUT_AND_YES_NO_INPUT: HCI_DISPLAY_YES_NO_IO_CAPABILITY, + DISPLAY_OUTPUT_AND_KEYBOARD_INPUT: HCI_DISPLAY_YES_NO_IO_CAPABILITY, + } + + io_capability: IoCapability + local_initiator_key_distribution: KeyDistribution + local_responder_key_distribution: KeyDistribution + + def __init__( + self, + io_capability=NO_OUTPUT_NO_INPUT, + local_initiator_key_distribution=DEFAULT_KEY_DISTRIBUTION, + local_responder_key_distribution=DEFAULT_KEY_DISTRIBUTION, + ) -> None: + self.io_capability = io_capability + self.local_initiator_key_distribution = local_initiator_key_distribution + self.local_responder_key_distribution = local_responder_key_distribution + + @property + def classic_io_capability(self) -> int: + """Map the abstract I/O capability to a Classic constant.""" + + # pylint: disable=line-too-long + return self.CLASSIC_IO_CAPABILITIES_MAP.get( + self.io_capability, HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY + ) + + @property + def smp_io_capability(self) -> int: + """Map the abstract I/O capability to an SMP constant.""" + + # This is just a 1-1 direct mapping + return self.io_capability + + async def accept(self) -> bool: + """Accept or reject a Pairing request.""" + return True + + async def confirm(self) -> bool: + """Respond yes or not to a Pairing confirmation question.""" + return True + + # pylint: disable-next=unused-argument + async def compare_numbers(self, number: int, digits: int) -> bool: + """Compare two numbers.""" + return True + + async def get_number(self) -> Optional[int]: + """ + Return an optional number as an answer to a passkey request. + Returning `None` will result in a negative reply. + """ + return 0 + + async def get_string(self, max_length) -> Optional[str]: + """ + Return a string whose utf-8 encoding is up to max_length bytes. + """ + return None + + # pylint: disable-next=unused-argument + async def display_number(self, number: int, digits: int) -> None: + """Display a number.""" + + # [LE only] + async def key_distribution_response( + self, peer_initiator_key_distribution: int, peer_responder_key_distribution: int + ) -> Tuple[int, int]: + """ + Return the key distribution response in an SMP protocol context. + + NOTE: since it is only used by the SMP protocol, this method's input and output + are directly as integers, using the SMP constants, rather than the abstract + KeyDistribution enums. + """ + return ( + int( + peer_initiator_key_distribution & self.local_initiator_key_distribution + ), + int( + peer_responder_key_distribution & self.local_responder_key_distribution + ), + ) + + +# ----------------------------------------------------------------------------- +class PairingConfig: + """Configuration for the Pairing protocol.""" + + def __init__( + self, + sc: bool = True, + mitm: bool = True, + bonding: bool = True, + delegate: Optional[PairingDelegate] = None, + ) -> None: + self.sc = sc + self.mitm = mitm + self.bonding = bonding + self.delegate = delegate or PairingDelegate() + + def __str__(self) -> str: + return ( + f'PairingConfig(sc={self.sc}, ' + f'mitm={self.mitm}, bonding={self.bonding}, ' + f'delegate[{self.delegate.io_capability}])' + ) diff --git a/bumble/smp.py b/bumble/smp.py index 3fe3b134..d345f8b7 100644 --- a/bumble/smp.py +++ b/bumble/smp.py @@ -31,7 +31,16 @@ from typing import Dict, Optional, Type from pyee import EventEmitter from .colors import color -from .hci import Address, HCI_LE_Enable_Encryption_Command, HCI_Object, key_with_value +from .hci import ( + HCI_DISPLAY_ONLY_IO_CAPABILITY, + HCI_DISPLAY_YES_NO_IO_CAPABILITY, + HCI_KEYBOARD_ONLY_IO_CAPABILITY, + HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY, + Address, + HCI_LE_Enable_Encryption_Command, + HCI_Object, + key_with_value, +) from .core import ( BT_BR_EDR_TRANSPORT, BT_CENTRAL_ROLE, @@ -476,7 +485,7 @@ class AddressResolver: address_bytes = bytes(address) hash_part = address_bytes[0:3] prand = address_bytes[3:6] - for (irk, resolved_address) in self.resolving_keys: + for irk, resolved_address in self.resolving_keys: local_hash = crypto.ah(irk, prand) if local_hash == hash_part: # Match! @@ -491,86 +500,6 @@ class AddressResolver: return None -# ----------------------------------------------------------------------------- -class PairingDelegate: - NO_OUTPUT_NO_INPUT = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY - KEYBOARD_INPUT_ONLY = SMP_KEYBOARD_ONLY_IO_CAPABILITY - DISPLAY_OUTPUT_ONLY = SMP_DISPLAY_ONLY_IO_CAPABILITY - DISPLAY_OUTPUT_AND_YES_NO_INPUT = SMP_DISPLAY_YES_NO_IO_CAPABILITY - DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = SMP_KEYBOARD_DISPLAY_IO_CAPABILITY - DEFAULT_KEY_DISTRIBUTION: int = ( - SMP_ENC_KEY_DISTRIBUTION_FLAG | SMP_ID_KEY_DISTRIBUTION_FLAG - ) - - def __init__( - self, - io_capability: int = NO_OUTPUT_NO_INPUT, - local_initiator_key_distribution: int = DEFAULT_KEY_DISTRIBUTION, - local_responder_key_distribution: int = DEFAULT_KEY_DISTRIBUTION, - ) -> None: - self.io_capability = io_capability - self.local_initiator_key_distribution = local_initiator_key_distribution - self.local_responder_key_distribution = local_responder_key_distribution - - async def accept(self) -> bool: - return True - - async def confirm(self) -> bool: - return True - - # pylint: disable-next=unused-argument - async def compare_numbers(self, number: int, digits: int) -> bool: - return True - - async def get_number(self) -> Optional[int]: - ''' - Returns an optional number as an answer to a passkey request. - Returning `None` will result in a negative reply. - ''' - return 0 - - async def get_string(self, max_length) -> Optional[str]: - ''' - Returns a string whose utf-8 encoding is up to max_length bytes. - ''' - return None - - # pylint: disable-next=unused-argument - async def display_number(self, number: int, digits: int) -> None: - pass - - async def key_distribution_response( - self, peer_initiator_key_distribution, peer_responder_key_distribution - ): - return ( - (peer_initiator_key_distribution & self.local_initiator_key_distribution), - (peer_responder_key_distribution & self.local_responder_key_distribution), - ) - - -# ----------------------------------------------------------------------------- -class PairingConfig: - def __init__( - self, - sc: bool = True, - mitm: bool = False, - bonding: bool = True, - delegate: Optional[PairingDelegate] = None, - ) -> None: - self.sc = sc - self.mitm = mitm - self.bonding = bonding - self.delegate = delegate or PairingDelegate() - - def __str__(self): - io_capability_str = SMP_Command.io_capability_name(self.delegate.io_capability) - return ( - f'PairingConfig(sc={self.sc}, ' - f'mitm={self.mitm}, bonding={self.bonding}, ' - f'delegate[{io_capability_str}])' - ) - - # ----------------------------------------------------------------------------- class Session: # Pairing methods @@ -1662,12 +1591,12 @@ class Manager(EventEmitter): Implements the Initiator and Responder roles of the Security Manager Protocol ''' - def __init__(self, device): + def __init__(self, device, pairing_config_factory): super().__init__() self.device = device self.sessions = {} self._ecc_key = None - self.pairing_config_factory = lambda connection: PairingConfig() + self.pairing_config_factory = pairing_config_factory def send_command(self, connection, command): logger.debug( diff --git a/tests/self_test.py b/tests/self_test.py index 9727d4f0..9105d2b6 100644 --- a/tests/self_test.py +++ b/tests/self_test.py @@ -28,9 +28,8 @@ from bumble.device import Device, Peer from bumble.host import Host from bumble.gatt import Service, Characteristic from bumble.transport import AsyncPipeSink +from bumble.pairing import PairingConfig, PairingDelegate from bumble.smp import ( - PairingConfig, - PairingDelegate, SMP_PAIRING_NOT_SUPPORTED_ERROR, SMP_CONFIRM_VALUE_FAILED_ERROR, ) @@ -262,7 +261,7 @@ async def test_self_gatt_long_read(): found_service = result[0] found_characteristics = await found_service.discover_characteristics() assert len(found_characteristics) == 513 - for (i, characteristic) in enumerate(found_characteristics): + for i, characteristic in enumerate(found_characteristics): value = await characteristic.read_value() assert value == characteristics[i].value @@ -317,11 +316,11 @@ async def _test_self_smp_with_configs(pairing_config1, pairing_config2): # ----------------------------------------------------------------------------- IO_CAP = [ - PairingDelegate.NO_OUTPUT_NO_INPUT, - PairingDelegate.KEYBOARD_INPUT_ONLY, - PairingDelegate.DISPLAY_OUTPUT_ONLY, - PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT, - PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT, + PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT, + PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY, + PairingDelegate.IoCapability.DISPLAY_OUTPUT_ONLY, + PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT, + PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT, ] SC = [False, True] MITM = [False, True] @@ -335,7 +334,10 @@ KEY_DIST = range(16) itertools.chain( itertools.product([IO_CAP], SC, MITM, [15]), itertools.product( - [[PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT]], SC, MITM, KEY_DIST + [[PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT]], + SC, + MITM, + KEY_DIST, ), ), ) @@ -378,7 +380,7 @@ async def test_self_smp(io_caps, sc, mitm, key_dist): else: if ( self.peer_delegate.io_capability - == PairingDelegate.KEYBOARD_INPUT_ONLY + == PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY ): peer_number = 6789 else: @@ -421,7 +423,7 @@ async def test_self_smp(io_caps, sc, mitm, key_dist): async def test_self_smp_reject(): class RejectingDelegate(PairingDelegate): def __init__(self): - super().__init__(PairingDelegate.NO_OUTPUT_NO_INPUT) + super().__init__(PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT) async def accept(self): return False @@ -442,7 +444,9 @@ async def test_self_smp_reject(): async def test_self_smp_wrong_pin(): class WrongPinDelegate(PairingDelegate): def __init__(self): - super().__init__(PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT) + super().__init__( + PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT + ) async def compare_numbers(self, number, digits): return False