Compare commits

...

18 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod 009ecfce96 use list comprehension 2022-07-19 19:53:18 -07:00
Gilles Boccon-Gibod d6075df356 add tool 2022-07-19 19:53:18 -07:00
Gilles Boccon-Gibod ebd0a0c8ca more complete set of HCI types and constants 2022-07-19 19:53:18 -07:00
Gilles Boccon-Gibod b64fa65921 Merge pull request #10 from zxzxwu/main
Make pairing and link mode configurable
2022-07-18 12:48:07 -07:00
Josh Wu 7d87c3cc3a Make pairing and link mode configurable 2022-07-18 14:28:21 +08:00
Gilles Boccon-Gibod 94fc81c183 Merge pull request #7 from DeltaEvo/david/config-load
Make DeviceConfiguration loadable from a dict
2022-06-25 05:08:44 -07:00
Gilles Boccon-Gibod b65b395fc4 Merge pull request #8 from Arrowbox/threadsafe_usb_close
Use threadsafe call when setting event_loop_done
2022-06-25 05:07:42 -07:00
David Duarte 0f157d55f7 Make DeviceConfiguration loadable from a dict 2022-06-24 09:57:16 +00:00
Jayson Messenger 925d79491f Use threadsafe call when setting event_loop_done
Previously, the close method would hang waiting on the future to be
done.
2022-06-23 15:19:05 -04:00
Gilles Boccon-Gibod 3d14df909c Merge pull request #5 from google/gbg/disconnection-event-routing
fix the routing of disconnection events
2022-06-15 10:50:14 -07:00
Gilles Boccon-Gibod 153788afe3 fix the routing of disconnection events 2022-06-14 14:38:40 -07:00
Gilles Boccon-Gibod 99ca31c063 Merge pull request #4 from google/gbg/classic-pairing-io
classic pairing io
2022-06-14 11:32:37 -07:00
Gilles Boccon-Gibod 9629e677f2 improve readability as per PR suggestion 2022-06-14 10:57:54 -07:00
Gilles Boccon-Gibod 250c1e3395 address PR comments 2022-06-13 16:44:57 -07:00
Gilles Boccon-Gibod 70dca1d7c9 cosmetic fix 2022-06-11 15:50:53 -07:00
Gilles Boccon-Gibod a5015c1305 add pytest async options 2022-06-11 12:28:00 -07:00
Gilles Boccon-Gibod 6e22df4838 add doc structure 2022-06-11 12:19:54 -07:00
Gilles Boccon-Gibod b4e2f21d2a add classic pairing io delegation 2022-06-11 01:33:51 -07:00
16 changed files with 1876 additions and 695 deletions
+105
View File
@@ -0,0 +1,105 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import os
import logging
import click
from colors import color
from bumble.company_ids import COMPANY_IDENTIFIERS
from bumble.core import name_or_number
from bumble.hci import (
map_null_terminated_utf8_string,
HCI_LE_SUPPORTED_FEATURES_NAMES,
HCI_SUCCESS,
HCI_VERSION_NAMES,
LMP_VERSION_NAMES,
HCI_Command,
HCI_Read_BD_ADDR_Command,
HCI_READ_BD_ADDR_COMMAND,
HCI_Read_Local_Name_Command,
HCI_READ_LOCAL_NAME_COMMAND
)
from bumble.host import Host
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def get_classic_info(host):
if host.supports_command(HCI_READ_BD_ADDR_COMMAND):
response = await host.send_command(HCI_Read_BD_ADDR_Command())
if response.return_parameters.status == HCI_SUCCESS:
print()
print(color('Classic Address:', 'yellow'), response.return_parameters.bd_addr)
if host.supports_command(HCI_READ_LOCAL_NAME_COMMAND):
response = await host.send_command(HCI_Read_Local_Name_Command())
if response.return_parameters.status == HCI_SUCCESS:
print()
print(color('Local Name:', 'yellow'), map_null_terminated_utf8_string(response.return_parameters.local_name))
# -----------------------------------------------------------------------------
async def get_le_info(host):
print()
print(color('LE Features:', 'yellow'))
for feature in host.supported_le_features:
print(' ', name_or_number(HCI_LE_SUPPORTED_FEATURES_NAMES, feature))
# -----------------------------------------------------------------------------
async def async_main(transport):
print('<<< connecting to HCI...')
async with await open_transport_or_link(transport) as (hci_source, hci_sink):
print('<<< connected')
host = Host(hci_source, hci_sink)
await host.reset()
# Print version
print(color('Version:', 'yellow'))
print(color(' Manufacturer: ', 'green'), name_or_number(COMPANY_IDENTIFIERS, host.local_version.company_identifier))
print(color(' HCI Version: ', 'green'), name_or_number(HCI_VERSION_NAMES, host.local_version.hci_version))
print(color(' HCI Subversion:', 'green'), host.local_version.hci_subversion)
print(color(' LMP Version: ', 'green'), name_or_number(LMP_VERSION_NAMES, host.local_version.lmp_version))
print(color(' LMP Subversion:', 'green'), host.local_version.lmp_subversion)
# Get the Classic info
await get_classic_info(host)
# Get the LE info
await get_le_info(host)
# Print the list of commands supported by the controller
print()
print(color('Supported Commands:', 'yellow'))
for command in host.supported_commands:
print(' ', HCI_Command.command_name(command))
# -----------------------------------------------------------------------------
@click.command()
@click.argument('transport')
def main(transport):
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
asyncio.run(async_main(transport))
# -----------------------------------------------------------------------------
if __name__ == '__main__':
main()
+63 -34
View File
@@ -44,7 +44,7 @@ from bumble.att import (
# -----------------------------------------------------------------------------
class Delegate(PairingDelegate):
def __init__(self, connection, capability_string, prompt):
def __init__(self, mode, connection, capability_string, prompt):
super().__init__({
'keyboard': PairingDelegate.KEYBOARD_INPUT_ONLY,
'display': PairingDelegate.DISPLAY_OUTPUT_ONLY,
@@ -53,6 +53,7 @@ class Delegate(PairingDelegate):
'none': PairingDelegate.NO_OUTPUT_NO_INPUT
}[capability_string.lower()])
self.mode = mode
self.peer = Peer(connection)
self.peer_name = None
self.prompt = prompt
@@ -64,7 +65,7 @@ class Delegate(PairingDelegate):
# Try to get the peer's name
if self.peer:
peer_name = await get_peer_name(self.peer)
peer_name = await get_peer_name(self.peer, self.mode)
self.peer_name = f'{peer_name or ""} [{self.peer.connection.peer_address}]'
else:
self.peer_name = '[?]'
@@ -91,7 +92,7 @@ class Delegate(PairingDelegate):
# Accept silently
return True
async def compare_numbers(self, number):
async def compare_numbers(self, number, digits):
await self.update_peer_name()
# Wait a bit to allow some of the log lines to print before we prompt
@@ -102,7 +103,7 @@ class Delegate(PairingDelegate):
print(color(f'### Pairing with {self.peer_name}', 'yellow'))
print(color('###-----------------------------------', 'yellow'))
while True:
response = await aioconsole.ainput(color(f'>>> Does the other device display {number:06}? ', 'yellow'))
response = await aioconsole.ainput(color(f'>>> Does the other device display {number:0{digits}}? ', 'yellow'))
response = response.lower().strip()
if response == 'yes':
return True
@@ -125,7 +126,7 @@ class Delegate(PairingDelegate):
except ValueError:
pass
async def display_number(self, number):
async def display_number(self, number, digits):
await self.update_peer_name()
# Wait a bit to allow some of the log lines to print before we prompt
@@ -134,19 +135,23 @@ class Delegate(PairingDelegate):
# Display a PIN code
print(color('###-----------------------------------', 'yellow'))
print(color(f'### Pairing with {self.peer_name}', 'yellow'))
print(color(f'### PIN: {number:06}', 'yellow'))
print(color(f'### PIN: {number:0{digits}}', 'yellow'))
print(color('###-----------------------------------', 'yellow'))
# -----------------------------------------------------------------------------
async def get_peer_name(peer):
services = await peer.discover_service(GATT_GENERIC_ACCESS_SERVICE)
if not services:
return None
async def get_peer_name(peer, mode):
if mode == 'classic':
return await peer.request_name()
else:
# Try to get the peer name from GATT
services = await peer.discover_service(GATT_GENERIC_ACCESS_SERVICE)
if not services:
return None
values = await peer.read_characteristics_by_uuid(GATT_DEVICE_NAME_CHARACTERISTIC, services[0])
if values:
return values[0].decode('utf-8')
values = await peer.read_characteristics_by_uuid(GATT_DEVICE_NAME_CHARACTERISTIC, services[0])
if values:
return values[0].decode('utf-8')
# -----------------------------------------------------------------------------
@@ -224,9 +229,22 @@ def on_pairing_failure(reason):
# -----------------------------------------------------------------------------
async def pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, device_config, transport, address_or_name):
async def pair(
mode,
sc,
mitm,
bond,
io,
prompt,
request,
print_keys,
keystore_file,
device_config,
hci_transport,
address_or_name
):
print('<<< connecting to HCI...')
async with await open_transport_or_link(transport) as (hci_source, hci_sink):
async with await open_transport_or_link(hci_transport) as (hci_source, hci_sink):
print('<<< connected')
# Create a device to manage the host
@@ -245,19 +263,25 @@ async def pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, d
# Expose a GATT characteristic that can be used to trigger pairing by
# responding with an authentication error when read
device.add_service(
Service(
'50DB505C-8AC4-4738-8448-3B1D9CC09CC5',
[
Characteristic(
'552957FB-CF1F-4A31-9535-E78847E1A714',
Characteristic.READ | Characteristic.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
CharacteristicValue(read=read_with_error, write=write_with_error)
)
]
if mode == 'le':
device.add_service(
Service(
'50DB505C-8AC4-4738-8448-3B1D9CC09CC5',
[
Characteristic(
'552957FB-CF1F-4A31-9535-E78847E1A714',
Characteristic.READ | Characteristic.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
CharacteristicValue(read=read_with_error, write=write_with_error)
)
]
)
)
)
# Select LE or Classic
if mode == 'classic':
device.classic_enabled = True
device.le_enabled = False
# Get things going
await device.power_on()
@@ -267,7 +291,7 @@ async def pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, d
sc,
mitm,
bond,
Delegate(connection, io, prompt)
Delegate(mode, connection, io, prompt)
)
# Connect to a peer or wait for a connection
@@ -278,10 +302,14 @@ async def pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, d
if not request:
try:
await connection.pair()
if mode == 'le':
await connection.pair()
else:
await connection.authenticate()
return
except ProtocolError as error:
print(color(f'Pairing failed: {error}', 'red'))
return
except ProtocolError:
pass
else:
# Advertise so that peers can find us and connect
await device.start_advertising(auto_restart=True)
@@ -291,6 +319,7 @@ async def pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, d
# -----------------------------------------------------------------------------
@click.command()
@click.option('--mode', type=click.Choice(['le', 'classic']), default='le', show_default=True)
@click.option('--sc', type=bool, default=True, help='Use the Secure Connections protocol', show_default=True)
@click.option('--mitm', type=bool, default=True, help='Request MITM protection', show_default=True)
@click.option('--bond', type=bool, default=True, help='Enable bonding', show_default=True)
@@ -300,11 +329,11 @@ async def pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, d
@click.option('--print-keys', is_flag=True, help='Print the bond keys before pairing')
@click.option('--keystore-file', help='File in which to store the pairing keys')
@click.argument('device-config')
@click.argument('transport')
@click.argument('hci_transport')
@click.argument('address-or-name', required=False)
def main(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, device_config, transport, address_or_name):
def main(mode, sc, mitm, bond, io, prompt, request, print_keys, keystore_file, device_config, hci_transport, address_or_name):
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, device_config, transport, address_or_name))
asyncio.run(pair(mode, sc, mitm, bond, io, prompt, request, print_keys, keystore_file, device_config, hci_transport, address_or_name))
# -----------------------------------------------------------------------------
+12 -12
View File
@@ -77,7 +77,7 @@ class Controller:
self.le_features = bytes.fromhex('ff49010000000000')
self.le_states = bytes.fromhex('ffff3fffff030000')
self.avertising_channel_tx_power = 0
self.white_list_size = 8
self.filter_accept_list_size = 8
self.resolving_list_size = 8
self.supported_max_tx_octets = 27
self.supported_max_tx_time = 10000 # microseconds
@@ -731,27 +731,27 @@ class Controller:
'''
return bytes([HCI_SUCCESS])
def on_hci_le_read_white_list_size_command(self, command):
def on_hci_le_read_filter_accept_list_size_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.14 LE Read White List Size Command
See Bluetooth spec Vol 2, Part E - 7.8.14 LE Read Filter Accept List Size Command
'''
return bytes([HCI_SUCCESS, self.white_list_size])
return bytes([HCI_SUCCESS, self.filter_accept_list_size])
def on_hci_le_clear_white_list_command(self, command):
def on_hci_le_clear_filter_accept_list_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.15 LE Clear White List Command
See Bluetooth spec Vol 2, Part E - 7.8.15 LE Clear Filter Accept List Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_add_device_to_white_list_command(self, command):
def on_hci_le_add_device_to_filter_accept_list_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.16 LE Add Device To White List Command
See Bluetooth spec Vol 2, Part E - 7.8.16 LE Add Device To Filter Accept List Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_remove_device_from_white_list_command(self, command):
def on_hci_le_remove_device_from_filter_accept_list_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.17 LE Remove Device From White List Command
See Bluetooth spec Vol 2, Part E - 7.8.17 LE Remove Device From Filter Accept List Command
'''
return bytes([HCI_SUCCESS])
@@ -780,9 +780,9 @@ class Controller:
'''
return bytes([HCI_SUCCESS]) + struct.pack('Q', random.randint(0, 1 << 64))
def on_hci_le_start_encryption_command(self, command):
def on_hci_le_enable_encryption_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.24 LE Start Encryption Command
See Bluetooth spec Vol 2, Part E - 7.8.24 LE Enable Encryption Command
'''
# Check the parameters
+255 -47
View File
@@ -137,6 +137,10 @@ class Peer:
def get_characteristics_by_uuid(self, uuid, service = None):
return self.gatt_client.get_characteristics_by_uuid(uuid, service)
# [Classic only]
async def request_name(self):
return await self.connection.request_remote_name()
def __str__(self):
return f'{self.connection.peer_address} as {self.connection.role_name}'
@@ -176,6 +180,7 @@ class Connection(CompositeEventEmitter):
self.transport = transport
self.peer_address = peer_address
self.peer_resolvable_address = peer_resolvable_address
self.peer_name = None # Classic only
self.role = role
self.parameters = parameters
self.encryption = 0
@@ -231,6 +236,10 @@ class Connection(CompositeEventEmitter):
supervision_timeout
)
# [Classic only]
async def request_remote_name(self):
return await self.device.request_remote_name(self)
def __str__(self):
return f'Connection(handle=0x{self.handle:04X}, role={self.role_name}, address={self.peer_address})'
@@ -245,39 +254,48 @@ class DeviceConfiguration:
self.scan_response_data = DEVICE_DEFAULT_SCAN_RESPONSE_DATA
self.advertising_interval_min = DEVICE_DEFAULT_ADVERTISING_INTERVAL
self.advertising_interval_max = DEVICE_DEFAULT_ADVERTISING_INTERVAL
self.le_enabled = True
# LE host enable 2nd parameter
self.le_simultaneous_enabled = True
self.classic_sc_enabled = True
self.classic_ssp_enabled = True
self.advertising_data = bytes(
AdvertisingData([(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(self.name, 'utf-8'))])
)
self.irk = bytes(16) # This really must be changed for any level of security
self.keystore = None
def load_from_dict(self, config):
# Load simple properties
self.name = config.get('name', self.name)
self.address = Address(config.get('address', self.address))
self.class_of_device = config.get('class_of_device', self.class_of_device)
self.advertising_interval_min = config.get('advertising_interval', self.advertising_interval_min)
self.advertising_interval_max = self.advertising_interval_min
self.keystore = config.get('keystore')
self.le_enabled = config.get('le_enabled', self.le_enabled)
self.le_simultaneous_enabled = config.get('le_simultaneous_enabled', self.le_simultaneous_enabled)
self.classic_sc_enabled = config.get('classic_sc_enabled', self.classic_sc_enabled)
self.classic_ssp_enabled = config.get('classic_ssp_enabled', self.classic_ssp_enabled)
# Load or synthesize an IRK
irk = config.get('irk')
if irk:
self.irk = bytes.fromhex(irk)
else:
# Construct an IRK from the address bytes
# NOTE: this is not secure, but will always give the same IRK for the same address
address_bytes = bytes(self.address)
self.irk = (address_bytes * 3)[:16]
# Load advertising data
advertising_data = config.get('advertising_data')
if advertising_data:
self.advertising_data = bytes.fromhex(advertising_data)
def load_from_file(self, filename):
with open(filename, 'r') as file:
config = json.load(file)
# Load simple properties
self.name = config.get('name', self.name)
self.address = Address(config.get('address', self.address))
self.class_of_device = config.get('class_of_device', self.class_of_device)
self.advertising_interval_min = config.get('advertising_interval', self.advertising_interval_min)
self.advertising_interval_max = self.advertising_interval_min
self.keystore = config.get('keystore')
# Load or synthesize an IRK
irk = config.get('irk')
if irk:
self.irk = bytes.fromhex(irk)
else:
# Construct an IRK from the address bytes
# NOTE: this is not secure, but will always give the same IRK for the same address
address_bytes = bytes(self.address)
self.irk = (address_bytes * 3)[:16]
# Load advertising data
advertising_data = config.get('advertising_data')
if advertising_data:
self.advertising_data = bytes.fromhex(advertising_data)
self.load_from_dict(json.load(file))
# -----------------------------------------------------------------------------
# Decorators used with the following Device class
@@ -285,14 +303,25 @@ class DeviceConfiguration:
# within a class requires unnecessarily complicated acrobatics)
# -----------------------------------------------------------------------------
# Decorator that converts the first argument from a connection handle to a connection
def with_connection_from_handle(function):
@functools.wraps(function)
def wrapper(self, connection_handle, *args, **kwargs):
if (connection := self.lookup_connection(connection_handle)) is None:
logger.warn(f'no connection found for handle 0x{connection_handle:04X}')
return
function(self, connection, *args, **kwargs)
raise ValueError('no connection for handle')
return function(self, connection, *args, **kwargs)
return wrapper
# Decorator that converts the first argument from a bluetooth address to a connection
def with_connection_from_address(function):
@functools.wraps(function)
def wrapper(self, address, *args, **kwargs):
for connection in self.connections.values():
if connection.peer_address == address:
return function(self, connection, *args, **kwargs)
raise ValueError('no connection for address')
return wrapper
@@ -368,7 +397,6 @@ class Device(CompositeEventEmitter):
self.connecting = False
self.disconnecting = False
self.connections = {} # Connections, by connection handle
self.le_enabled = True
self.classic_enabled = False
self.discoverable = False
self.connectable = False
@@ -388,6 +416,10 @@ class Device(CompositeEventEmitter):
self.advertising_interval_max = config.advertising_interval_max
self.keystore = keys.KeyStore.create_for_device(config)
self.irk = config.irk
self.le_enabled = config.le_enabled
self.le_simultaneous_enabled = config.le_simultaneous_enabled
self.classic_ssp_enabled = config.classic_ssp_enabled
self.classic_sc_enabled = config.classic_sc_enabled
# If a name is passed, override the name from the config
if name:
@@ -453,6 +485,11 @@ class Device(CompositeEventEmitter):
if connection := self.connections.get(connection_handle):
return connection
def find_connection_by_bd_addr(self, bd_addr):
for connection in self.connections.values():
if connection.peer_address == bd_addr:
return connection
def register_l2cap_server(self, psm, server):
self.l2cap_channel_manager.register_server(psm, server)
@@ -480,6 +517,11 @@ class Device(CompositeEventEmitter):
logger.debug(color(f'BD_ADDR: {response.return_parameters.bd_addr}', 'yellow'))
self.public_address = response.return_parameters.bd_addr
await self.send_command(HCI_Write_LE_Host_Support_Command(
le_supported_host = int(self.le_enabled),
simultaneous_le_host = int(self.le_simultaneous_enabled),
))
if self.le_enabled:
# Set the controller address
await self.send_command(HCI_LE_Set_Random_Address_Command(
@@ -517,7 +559,12 @@ class Device(CompositeEventEmitter):
HCI_Write_Class_Of_Device_Command(class_of_device = self.class_of_device)
)
await self.send_command(
HCI_Write_Simple_Pairing_Mode_Command(simple_pairing_mode = 0x01)
HCI_Write_Simple_Pairing_Mode_Command(
simple_pairing_mode=int(self.classic_ssp_enabled))
)
await self.send_command(
HCI_Write_Secure_Connections_Host_Support_Command(
secure_connections_host_support=int(self.classic_sc_enabled))
)
# Let the SMP manager know about the address
@@ -727,7 +774,7 @@ class Device(CompositeEventEmitter):
try:
peer_address = Address(peer_address)
except ValueError:
# If the address is not parssable, assume it is a name instead
# If the address is not parsable, assume it is a name instead
logger.debug('looking for peer by name')
peer_address = await self.find_peer_by_name(peer_address, transport)
@@ -792,8 +839,8 @@ class Device(CompositeEventEmitter):
async def disconnect(self, connection, reason):
# Create a future so that we can wait for the disconnection's result
pending_disconnection = asyncio.get_running_loop().create_future()
self.on('disconnection', pending_disconnection.set_result)
self.on('disconnection_failure', pending_disconnection.set_exception)
connection.on('disconnection', pending_disconnection.set_result)
connection.on('disconnection_failure', pending_disconnection.set_exception)
# Request a disconnection
result = await self.send_command(HCI_Disconnect_Command(connection_handle = connection.handle, reason = reason))
@@ -806,8 +853,8 @@ class Device(CompositeEventEmitter):
self.disconnecting = True
return await pending_disconnection
finally:
self.remove_listener('disconnection', pending_disconnection.set_result)
self.remove_listener('disconnection_failure', pending_disconnection.set_exception)
connection.remove_listener('disconnection', pending_disconnection.set_result)
connection.remove_listener('disconnection_failure', pending_disconnection.set_exception)
self.disconnecting = False
async def update_connection_parameters(
@@ -936,13 +983,13 @@ class Device(CompositeEventEmitter):
# Set up event handlers
pending_authentication = asyncio.get_running_loop().create_future()
def on_authentication_complete():
def on_authentication():
pending_authentication.set_result(None)
def on_authentication_failure(error):
pending_authentication.set_exception(error)
def on_authentication_failure(error_code):
pending_authentication.set_exception(HCI_Error(error_code))
connection.on('connection_authentication_complete', on_authentication_complete)
connection.on('connection_authentication', on_authentication)
connection.on('connection_authentication_failure', on_authentication_failure)
# Request the authentication
@@ -957,7 +1004,7 @@ class Device(CompositeEventEmitter):
# Wait for the authentication to complete
await pending_authentication
finally:
connection.remove_listener('connection_authentication_complete', on_authentication_complete)
connection.remove_listener('connection_authentication', on_authentication)
connection.remove_listener('connection_authentication_failure', on_authentication_failure)
async def encrypt(self, connection):
@@ -999,7 +1046,7 @@ class Device(CompositeEventEmitter):
raise InvalidStateError('only centrals can start encryption')
result = await self.send_command(
HCI_LE_Start_Encryption_Command(
HCI_LE_Enable_Encryption_Command(
connection_handle = connection.handle,
random_number = rand,
encrypted_diversifier = ediv,
@@ -1008,7 +1055,7 @@ class Device(CompositeEventEmitter):
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warn(f'HCI_LE_Start_Encryption_Command failed: {HCI_Constant.error_name(result.status)}')
logger.warn(f'HCI_LE_Enable_Encryption_Command failed: {HCI_Constant.error_name(result.status)}')
raise HCI_Error(result.status)
else:
result = await self.send_command(
@@ -1028,6 +1075,40 @@ class Device(CompositeEventEmitter):
connection.remove_listener('connection_encryption_change', on_encryption_change)
connection.remove_listener('connection_encryption_failure', on_encryption_failure)
# [Classic only]
async def request_remote_name(self, connection):
# Set up event handlers
pending_name = asyncio.get_running_loop().create_future()
def on_remote_name():
pending_name.set_result(connection.peer_name)
def on_remote_name_failure(error_code):
pending_name.set_exception(HCI_Error(error_code))
connection.on('remote_name', on_remote_name)
connection.on('remote_name_failure', on_remote_name_failure)
try:
result = await self.send_command(
HCI_Remote_Name_Request_Command(
bd_addr = connection.peer_address,
page_scan_repetition_mode = HCI_Remote_Name_Request_Command.R0, # TODO investigate other options
reserved = 0,
clock_offset = 0 # TODO investigate non-0 values
)
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warn(f'HCI_Set_Connection_Encryption_Command failed: {HCI_Constant.error_name(result.status)}')
raise HCI_Error(result.status)
# Wait for the result
return await pending_name
finally:
connection.remove_listener('remote_name', on_remote_name)
connection.remove_listener('remote_name_failure', on_remote_name_failure)
# [Classic only]
@host_event_handler
def on_link_key(self, bd_addr, link_key, key_type):
@@ -1123,14 +1204,15 @@ class Device(CompositeEventEmitter):
asyncio.create_task(self.start_advertising(auto_restart=self.auto_restart_advertising))
@host_event_handler
def on_disconnection_failure(self, error_code):
@with_connection_from_handle
def on_disconnection_failure(self, connection, error_code):
logger.debug(f'*** Disconnection failed: {error_code}')
error = ConnectionError(
error_code,
'hci',
HCI_Constant.error_name(error_code)
)
self.emit('disconnection_failure', error)
connection.emit('disconnection_failure', error)
@host_event_handler
@AsyncRunner.run_in_task()
@@ -1141,10 +1223,10 @@ class Device(CompositeEventEmitter):
@host_event_handler
@with_connection_from_handle
def on_connection_authentication_complete(self, connection):
logger.debug(f'*** Connection Authentication Complete: [0x{connection.handle:04X}] {connection.peer_address} as {connection.role_name}')
def on_connection_authentication(self, connection):
logger.debug(f'*** Connection Authentication: [0x{connection.handle:04X}] {connection.peer_address} as {connection.role_name}')
connection.authenticated = True
connection.emit('connection_authentication_complete')
connection.emit('connection_authentication')
@host_event_handler
@with_connection_from_handle
@@ -1152,6 +1234,132 @@ class Device(CompositeEventEmitter):
logger.debug(f'*** Connection Authentication Failure: [0x{connection.handle:04X}] {connection.peer_address} as {connection.role_name}, error={error}')
connection.emit('connection_authentication_failure', error)
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_authentication_io_capability_request(self, connection):
# Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection)
# Map the SMP IO capability to a Classic IO capability
io_capability = {
smp.SMP_DISPLAY_ONLY_IO_CAPABILITY: HCI_DISPLAY_ONLY_IO_CAPABILITY,
smp.SMP_DISPLAY_YES_NO_IO_CAPABILITY: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY: HCI_KEYBOARD_ONLY_IO_CAPABILITY,
smp.SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: HCI_DISPLAY_YES_NO_IO_CAPABILITY
}.get(pairing_config.delegate.io_capability)
if io_capability is None:
logger.warning(f'cannot map IO capability ({pairing_config.delegate.io_capability}')
io_capability = HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
# Compute the authentication requirements
authentication_requirements = (
# No Bonding
(
HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS,
HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS
),
# General Bonding
(
HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS,
HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS
)
)[1 if pairing_config.bonding else 0][1 if pairing_config.mitm else 0]
# Respond
self.host.send_command_sync(
HCI_IO_Capability_Request_Reply_Command(
bd_addr = connection.peer_address,
io_capability = io_capability,
oob_data_present = 0x00, # Not present
authentication_requirements = authentication_requirements
)
)
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_authentication_user_confirmation_request(self, connection, code):
# Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection)
can_confirm = pairing_config.delegate.io_capability not in {
smp.SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
smp.SMP_DISPLAY_ONLY_IO_CAPABILITY
}
# Respond
if can_confirm and pairing_config.delegate:
async def compare_numbers():
numbers_match = await pairing_config.delegate.compare_numbers(code, digits=6)
if numbers_match:
self.host.send_command_sync(
HCI_User_Confirmation_Request_Reply_Command(bd_addr=connection.peer_address)
)
else:
self.host.send_command_sync(
HCI_User_Confirmation_Request_Negative_Reply_Command(bd_addr=connection.peer_address)
)
asyncio.create_task(compare_numbers())
else:
self.host.send_command_sync(
HCI_User_Confirmation_Request_Reply_Command(bd_addr=connection.peer_address)
)
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_authentication_user_passkey_request(self, connection):
# Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection)
can_input = pairing_config.delegate.io_capability in {
smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY,
smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY
}
# Respond
if can_input and pairing_config.delegate:
async def get_number():
number = await pairing_config.delegate.get_number()
if number is not None:
self.host.send_command_sync(
HCI_User_Passkey_Request_Reply_Command(
bd_addr = connection.peer_address,
numeric_value = number)
)
else:
self.host.send_command_sync(
HCI_User_Passkey_Request_Negative_Reply_Command(bd_addr=connection.peer_address)
)
asyncio.create_task(get_number())
else:
self.host.send_command_sync(
HCI_User_Passkey_Request_Negative_Reply_Command(bd_addr=connection.peer_address)
)
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_remote_name(self, connection, remote_name):
# Try to decode the name
try:
connection.peer_name = remote_name.decode('utf-8')
connection.emit('remote_name')
except UnicodeDecodeError as error:
logger.warning('peer name is not valid UTF-8')
connection.emit('remote_name_failure', error)
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_remote_name_failure(self, connection, error):
connection.emit('remote_name_failure', error)
@host_event_handler
@with_connection_from_handle
def on_connection_encryption_change(self, connection, encryption):
+1311 -548
View File
File diff suppressed because it is too large Load Diff
+85 -38
View File
@@ -81,10 +81,13 @@ class Host(EventEmitter):
self.hc_total_num_acl_data_packets = HOST_HC_TOTAL_NUM_ACL_DATA_PACKETS
self.acl_packet_queue = collections.deque()
self.acl_packets_in_flight = 0
self.local_version = None
self.local_supported_commands = bytes(64)
self.local_le_features = 0
self.command_semaphore = asyncio.Semaphore(1)
self.long_term_key_provider = None
self.link_key_provider = None
self.pairing_io_capability_provider = None # Classic only
# Connect to the source and sink if specified
if controller_source:
@@ -96,34 +99,51 @@ class Host(EventEmitter):
await self.send_command(HCI_Reset_Command())
self.ready = True
response = await self.send_command(HCI_Read_Local_Supported_Commands_Command())
if response.return_parameters.status != HCI_SUCCESS:
raise ProtocolError(response.return_parameters.status, 'hci')
self.local_supported_commands = response.return_parameters.supported_commands
await self.send_command(HCI_Set_Event_Mask_Command(event_mask = bytes.fromhex('FFFFFFFFFFFFFFFF')))
await self.send_command(HCI_LE_Set_Event_Mask_Command(le_event_mask = bytes.fromhex('FFFFF00000000000')))
await self.send_command(HCI_Read_Local_Version_Information_Command())
await self.send_command(HCI_Write_LE_Host_Support_Command(le_supported_host = 1, simultaneous_le_host = 0))
response = await self.send_command(HCI_LE_Read_Buffer_Size_Command())
response = await self.send_command(HCI_Read_Local_Supported_Commands_Command())
if response.return_parameters.status == HCI_SUCCESS:
self.hc_le_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length
self.hc_total_num_le_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets
logger.debug(f'HCI LE ACL flow control: hc_le_acl_data_packet_length={response.return_parameters.hc_le_acl_data_packet_length}, hc_total_num_le_acl_data_packets={response.return_parameters.hc_total_num_le_acl_data_packets}')
self.local_supported_commands = response.return_parameters.supported_commands
else:
logger.warn(f'HCI_LE_Read_Buffer_Size_Command failed: {response.return_parameters.status}')
if response.return_parameters.hc_le_acl_data_packet_length == 0 or response.return_parameters.hc_total_num_le_acl_data_packets == 0:
# Read the non-LE-specific values
response = await self.send_command(HCI_Read_Buffer_Size_Command())
logger.warn(f'HCI_Read_Local_Supported_Commands_Command failed: {response.return_parameters.status}')
if self.supports_command(HCI_WRITE_LE_HOST_SUPPORT_COMMAND):
await self.send_command(HCI_Write_LE_Host_Support_Command(le_supported_host = 1, simultaneous_le_host = 0))
if self.supports_command(HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND):
response = await self.send_command(HCI_Read_Local_Version_Information_Command())
if response.return_parameters.status == HCI_SUCCESS:
self.hc_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length
self.hc_le_acl_data_packet_length = self.hc_le_acl_data_packet_length or self.hc_acl_data_packet_length
self.hc_total_num_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets
self.hc_total_num_le_acl_data_packets = self.hc_total_num_le_acl_data_packets or self.hc_total_num_acl_data_packets
logger.debug(f'HCI LE ACL flow control: hc_le_acl_data_packet_length={self.hc_le_acl_data_packet_length}, hc_total_num_le_acl_data_packets={self.hc_total_num_le_acl_data_packets}')
self.local_version = response.return_parameters
else:
logger.warn(f'HCI_Read_Buffer_Size_Command failed: {response.return_parameters.status}')
logger.warn(f'HCI_Read_Local_Version_Information_Command failed: {response.return_parameters.status}')
if self.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command(HCI_LE_Read_Buffer_Size_Command())
if response.return_parameters.status == HCI_SUCCESS:
self.hc_le_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length
self.hc_total_num_le_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets
logger.debug(f'HCI LE ACL flow control: hc_le_acl_data_packet_length={response.return_parameters.hc_le_acl_data_packet_length}, hc_total_num_le_acl_data_packets={response.return_parameters.hc_total_num_le_acl_data_packets}')
else:
logger.warn(f'HCI_LE_Read_Buffer_Size_Command failed: {response.return_parameters.status}')
if response.return_parameters.hc_le_acl_data_packet_length == 0 or response.return_parameters.hc_total_num_le_acl_data_packets == 0:
# Read the non-LE-specific values
response = await self.send_command(HCI_Read_Buffer_Size_Command())
if response.return_parameters.status == HCI_SUCCESS:
self.hc_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length
self.hc_le_acl_data_packet_length = self.hc_le_acl_data_packet_length or self.hc_acl_data_packet_length
self.hc_total_num_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets
self.hc_total_num_le_acl_data_packets = self.hc_total_num_le_acl_data_packets or self.hc_total_num_acl_data_packets
logger.debug(f'HCI LE ACL flow control: hc_le_acl_data_packet_length={self.hc_le_acl_data_packet_length}, hc_total_num_le_acl_data_packets={self.hc_total_num_le_acl_data_packets}')
else:
logger.warn(f'HCI_Read_Buffer_Size_Command failed: {response.return_parameters.status}')
if self.supports_command(HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
response = await self.send_command(HCI_LE_Read_Local_Supported_Features_Command())
if response.return_parameters.status == HCI_SUCCESS:
self.local_le_features = struct.unpack('<Q', response.return_parameters.le_features)[0]
else:
logger.warn(f'HCI_LE_Read_Supported_Features_Command failed: {response.return_parameters.status}')
self.reset_done = True
@@ -210,6 +230,37 @@ class Host(EventEmitter):
self.send_hci_packet(packet)
self.acl_packets_in_flight += 1
def supports_command(self, command):
# Find the support flag position for this command
for (octet, flags) in enumerate(HCI_SUPPORTED_COMMANDS_FLAGS):
for (flag_position, value) in enumerate(flags):
if value == command:
# Check if the flag is set
if octet < len(self.local_supported_commands) and flag_position < 8:
return (self.local_supported_commands[octet] & (1 << flag_position)) != 0
return False
@property
def supported_commands(self):
commands = []
for (octet, flags) in enumerate(self.local_supported_commands):
if octet < len(HCI_SUPPORTED_COMMANDS_FLAGS):
for flag in range(8):
if flags & (1 << flag) != 0:
command = HCI_SUPPORTED_COMMANDS_FLAGS[octet][flag]
if command is not None:
commands.append(command)
return commands
def supports_le_feature(self, feature):
return (self.local_le_features & (1 << feature)) != 0
@property
def supported_le_features(self):
return [feature for feature in range(64) if self.local_le_features & (1 << feature)]
# Packet Sink protocol (packets coming from the controller via HCI)
def on_packet(self, packet):
hci_packet = HCI_Packet.from_bytes(packet)
@@ -363,7 +414,7 @@ class Host(EventEmitter):
logger.debug(f'### BR/EDR CONNECTION FAILED: {event.status}')
# Notify the client
self.emit('connection_failure', event.status)
self.emit('connection_failure', event.connection_handle, event.status)
def on_hci_disconnection_complete_event(self, event):
# Find the connection
@@ -495,7 +546,7 @@ class Host(EventEmitter):
def on_hci_authentication_complete_event(self, event):
# Notify the client
if event.status == HCI_SUCCESS:
self.emit('connection_authentication_complete', event.connection_handle)
self.emit('connection_authentication', event.connection_handle)
else:
self.emit('connection_authentication_failure', event.connection_handle, event.status)
@@ -560,26 +611,16 @@ class Host(EventEmitter):
asyncio.create_task(send_link_key())
def on_hci_io_capability_request_event(self, event):
# For now, just return NoInputNoOutput and no MITM
# TODO: delegate the decision
self.send_command_sync(
HCI_IO_Capability_Request_Reply_Command(
bd_addr = event.bd_addr,
io_capability = HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
oob_data_present = 0x00,
authentication_requirements = 0x00 # 0x02 # FIXME: testing only
)
)
self.emit('authentication_io_capability_request', event.bd_addr)
def on_hci_io_capability_response_event(self, event):
pass
def on_hci_user_confirmation_request_event(self, event):
# For now, just confirm everything
# TODO: delegate the decision
self.send_command_sync(
HCI_User_Confirmation_Request_Reply_Command(bd_addr = event.bd_addr)
)
self.emit('authentication_user_confirmation_request', event.bd_addr, event.numeric_value)
def on_hci_user_passkey_request_event(self, event):
self.emit('authentication_user_passkey_request', event.bd_addr)
def on_hci_inquiry_complete_event(self, event):
self.emit('inquiry_complete')
@@ -602,3 +643,9 @@ class Host(EventEmitter):
event.extended_inquiry_response,
event.rssi
)
def on_hci_remote_name_request_complete_event(self, event):
if event.status != HCI_SUCCESS:
self.emit('remote_name_failure', event.bd_addr, event.status)
else:
self.emit('remote_name', event.bd_addr, event.remote_name)
+5 -5
View File
@@ -464,13 +464,13 @@ class PairingDelegate:
async def accept(self):
return True
async def compare_numbers(self, number):
async def compare_numbers(self, number, digits=6):
return True
async def get_number(self):
return 0
async def display_number(self, number):
async def display_number(self, number, digits=6):
pass
@@ -699,7 +699,7 @@ class Session:
async def prompt():
logger.debug(f'verification code: {code}')
try:
response = await self.pairing_config.delegate.compare_numbers(code)
response = await self.pairing_config.delegate.compare_numbers(code, digits=6)
if response:
next_steps()
return
@@ -733,7 +733,7 @@ class Session:
self.tk = self.passkey.to_bytes(16, byteorder='little')
logger.debug(f'TK from passkey = {self.tk.hex()}')
asyncio.create_task(self.pairing_config.delegate.display_number(self.passkey))
asyncio.create_task(self.pairing_config.delegate.display_number(self.passkey, digits=6))
def input_passkey(self, next_steps=None):
# Prompt the user for the passkey displayed on the peer
@@ -852,7 +852,7 @@ class Session:
# distribute the long term and/or other keys over an encrypted connection
asyncio.create_task(
self.manager.device.host.send_command(
HCI_LE_Start_Encryption_Command(
HCI_LE_Enable_Encryption_Command(
connection_handle = self.connection.handle,
random_number = bytes(8),
encrypted_diversifier = 0,
+1 -1
View File
@@ -221,7 +221,7 @@ async def open_usb_transport(spec):
pass
logger.debug('USB event loop done')
self.event_loop_done.set_result(None)
self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None)
async def close(self):
self.closed = True
@@ -0,0 +1,2 @@
GATT DUMP TOOL
==============
+2
View File
@@ -5,6 +5,8 @@ Included in the project are a few apps and tools, built on top of the core libra
These include:
* [Console](console.md) - an interactive text-based console
* [Pair](pair.md) - Pair/bond two devices (LE and Classic)
* [Unbond](unbond.md) - Remove a previously established bond
* [HCI Bridge](hci_bridge.md) - a HCI transport bridge to connect two HCI transports and filter/snoop the HCI packets
* [Golden Gate Bridge](gg_bridge.md) - a bridge between GATT and UDP to use with the Golden Gate "stack tool"
* [Show](show.md) - Parse a file with HCI packets and print the details of each packet in a human readable form
+2
View File
@@ -0,0 +1,2 @@
PAIR TOOL
=========
+2
View File
@@ -0,0 +1,2 @@
SHOW TOOL
=========
+2
View File
@@ -0,0 +1,2 @@
UNBOND TOOL
===========
+24 -6
View File
@@ -294,8 +294,8 @@ def test_HCI_LE_Create_Connection_Command():
# -----------------------------------------------------------------------------
def test_HCI_LE_Add_Device_To_White_List_Command():
command = HCI_LE_Add_Device_To_White_List_Command(
def test_HCI_LE_Add_Device_To_Filter_Accept_List_Command():
command = HCI_LE_Add_Device_To_Filter_Accept_List_Command(
address_type = 1,
address = Address('00:11:22:33:44:55')
)
@@ -303,8 +303,8 @@ def test_HCI_LE_Add_Device_To_White_List_Command():
# -----------------------------------------------------------------------------
def test_HCI_LE_Remove_Device_From_White_List_Command():
command = HCI_LE_Remove_Device_From_White_List_Command(
def test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command():
command = HCI_LE_Remove_Device_From_Filter_Accept_List_Command(
address_type = 1,
address = Address('00:11:22:33:44:55')
)
@@ -343,6 +343,23 @@ def test_HCI_LE_Set_Default_PHY_Command():
basic_check(command)
# -----------------------------------------------------------------------------
def test_HCI_LE_Set_Extended_Scan_Parameters_Command():
command = HCI_LE_Set_Extended_Scan_Parameters_Command(
own_address_type=Address.RANDOM_DEVICE_ADDRESS,
scanning_filter_policy=HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_FILTERED_POLICY,
scanning_phys=(1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_1M_PHY | 1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_CODED_PHY | 1 << 4),
scan_types=[
HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING,
HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING,
HCI_LE_Set_Extended_Scan_Parameters_Command.PASSIVE_SCANNING
],
scan_intervals=[1, 2, 3],
scan_windows=[4, 5, 6]
)
basic_check(command)
# -----------------------------------------------------------------------------
def test_address():
a = Address('C4:F2:17:1A:1D:BB')
@@ -391,11 +408,12 @@ def run_test_commands():
test_HCI_LE_Set_Scan_Parameters_Command()
test_HCI_LE_Set_Scan_Enable_Command()
test_HCI_LE_Create_Connection_Command()
test_HCI_LE_Add_Device_To_White_List_Command()
test_HCI_LE_Remove_Device_From_White_List_Command()
test_HCI_LE_Add_Device_To_Filter_Accept_List_Command()
test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command()
test_HCI_LE_Connection_Update_Command()
test_HCI_LE_Read_Remote_Features_Command()
test_HCI_LE_Set_Default_PHY_Command()
test_HCI_LE_Set_Extended_Scan_Parameters_Command()
# -----------------------------------------------------------------------------
+1
View File
@@ -1,2 +1,3 @@
[pytest]
junit_logging = all
asyncio_mode = auto
+4 -4
View File
@@ -208,11 +208,11 @@ async def test_self_smp():
self.peer_delegate = None
self.number = asyncio.get_running_loop().create_future()
async def compare_numbers(self, number):
async def compare_numbers(self, number, digits):
if self.peer_delegate is None:
logger.warn(f'[{self.name}] no peer delegate')
return False
await self.display_number(number)
await self.display_number(number, digits=6)
logger.debug(f'[{self.name}] waiting for peer number')
peer_number = await self.peer_delegate.number
logger.debug(f'[{self.name}] comparing numbers: {number} and {peer_number}')
@@ -231,7 +231,7 @@ async def test_self_smp():
logger.debug(f'[{self.name}] returning number: {peer_number}')
return peer_number
async def display_number(self, number):
async def display_number(self, number, digits):
logger.debug(f'[{self.name}] displaying number: {number}')
self.number.set_result(number)
@@ -293,7 +293,7 @@ async def test_self_smp_wrong_pin():
def __init__(self):
super().__init__(PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT)
async def compare_numbers(self, number):
async def compare_numbers(self, number, digits):
return False
wrong_pin_pairing_config = PairingConfig(delegate = WrongPinDelegate())