From 740a2e0ca0000b5ad85e55102c77162949de95ef Mon Sep 17 00:00:00 2001 From: Gilles Boccon-Gibod Date: Fri, 5 May 2023 15:57:13 -0700 Subject: [PATCH] instantiate keystore after power_on --- apps/pair.py | 28 +++++++++++------------ apps/scan.py | 11 ++++----- apps/unbond.py | 62 ++++++++++++++++++++++++++++++++------------------ 3 files changed, 59 insertions(+), 42 deletions(-) diff --git a/apps/pair.py b/apps/pair.py index a7844fe..162442a 100644 --- a/apps/pair.py +++ b/apps/pair.py @@ -207,7 +207,7 @@ def on_connection(connection, request): # Listen for pairing events connection.on('pairing_start', on_pairing_start) - connection.on('pairing', on_pairing) + connection.on('pairing', lambda keys: on_pairing(connection.peer_address, keys)) connection.on('pairing_failure', on_pairing_failure) # Listen for encryption changes @@ -242,9 +242,9 @@ def on_pairing_start(): # ----------------------------------------------------------------------------- -def on_pairing(keys): +def on_pairing(address, keys): print(color('***-----------------------------------', 'cyan')) - print(color('*** Paired!', 'cyan')) + print(color(f'*** Paired! (peer identity={address})', 'cyan')) keys.print(prefix=color('*** ', 'cyan')) print(color('***-----------------------------------', 'cyan')) Waiter.instance.terminate() @@ -283,17 +283,6 @@ async def pair( # Create a device to manage the host device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink) - # Set a custom keystore if specified on the command line - if keystore_file: - device.keystore = JsonKeyStore(namespace=None, filename=keystore_file) - - # Print the existing keys before pairing - if print_keys and device.keystore: - print(color('@@@-----------------------------------', 'blue')) - print(color('@@@ Pairing Keys:', 'blue')) - await device.keystore.print(prefix=color('@@@ ', 'blue')) - print(color('@@@-----------------------------------', 'blue')) - # Expose a GATT characteristic that can be used to trigger pairing by # responding with an authentication error when read if mode == 'le': @@ -323,6 +312,17 @@ async def pair( # Get things going await device.power_on() + # Set a custom keystore if specified on the command line + if keystore_file: + device.keystore = JsonKeyStore.from_device(device, filename=keystore_file) + + # Print the existing keys before pairing + if print_keys and device.keystore: + print(color('@@@-----------------------------------', 'blue')) + print(color('@@@ Pairing Keys:', 'blue')) + await device.keystore.print(prefix=color('@@@ ', 'blue')) + print(color('@@@-----------------------------------', 'blue')) + # Set up a pairing config factory device.pairing_config_factory = lambda connection: PairingConfig( sc, mitm, bond, Delegate(mode, connection, io, prompt) diff --git a/apps/scan.py b/apps/scan.py index dac7a2c..268912f 100644 --- a/apps/scan.py +++ b/apps/scan.py @@ -133,15 +133,16 @@ async def scan( 'Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink ) + await device.power_on() + if keystore_file: - keystore = JsonKeyStore(namespace=None, filename=keystore_file) - device.keystore = keystore - else: - resolver = None + device.keystore = JsonKeyStore.from_device(device, filename=keystore_file) if device.keystore: resolving_keys = await device.keystore.get_resolving_keys() resolver = AddressResolver(resolving_keys) + else: + resolver = None printer = AdvertisementPrinter(min_rssi, resolver) if raw: @@ -149,8 +150,6 @@ async def scan( else: device.on('advertisement', printer.on_advertisement) - await device.power_on() - if phy is None: scanning_phys = [HCI_LE_1M_PHY, HCI_LE_CODED_PHY] else: diff --git a/apps/unbond.py b/apps/unbond.py index 105d9a4..5ffd746 100644 --- a/apps/unbond.py +++ b/apps/unbond.py @@ -22,40 +22,58 @@ import click from bumble.device import Device from bumble.keys import JsonKeyStore +from bumble.transport import open_transport + +# ----------------------------------------------------------------------------- +async def unbond_with_keystore(keystore, address): + if address is None: + return await keystore.print() + + try: + await keystore.delete(address) + except KeyError: + print('!!! pairing not found') # ----------------------------------------------------------------------------- -async def unbond(keystore_file, device_config, address): - # Create a device to manage the host - device = Device.from_config_file(device_config) - - # Get all entries in the keystore +async def unbond(keystore_file, device_config, hci_transport, address): + # With a keystore file, we can instantiate the keystore directly if keystore_file: - keystore = JsonKeyStore(None, keystore_file) - else: - keystore = device.keystore + return await unbond_with_keystore(JsonKeyStore(None, keystore_file), address) - if keystore is None: - print('no keystore') - return + # Without a keystore file, we need to obtain the keystore from the device + async with await open_transport(hci_transport) as (hci_source, hci_sink): + # Create a device to manage the host + device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink) - if address is None: - await keystore.print() - else: - try: - await keystore.delete(address) - except KeyError: - print('!!! pairing not found') + # Power-on the device to ensure we have a key store + await device.power_on() + + return await unbond_with_keystore(device.keystore, address) # ----------------------------------------------------------------------------- @click.command() -@click.option('--keystore-file', help='File in which to store the pairing keys') -@click.argument('device-config') +@click.option('--keystore-file', help='File in which the pairing keys are stored') +@click.option('--hci-transport', help='HCI transport for the controller') +@click.argument('device-config', required=False) @click.argument('address', required=False) -def main(keystore_file, device_config, address): +def main(keystore_file, hci_transport, device_config, address): + """ + Remove pairing keys for a device, given its address. + + If no keystore file is specified, the --hci-transport option must be used to + connect to a controller, so that the keystore for that controller can be + instantiated. + If no address is passed, the existing pairing keys for all addresses are printed. + """ logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) - asyncio.run(unbond(keystore_file, device_config, address)) + + if not keystore_file and not hci_transport: + print('either --keystore-file or --hci-transport must be specified.') + return + + asyncio.run(unbond(keystore_file, device_config, hci_transport, address)) # -----------------------------------------------------------------------------