instantiate keystore after power_on

This commit is contained in:
Gilles Boccon-Gibod
2023-05-05 15:57:13 -07:00
parent 022c23500a
commit 740a2e0ca0
3 changed files with 59 additions and 42 deletions
+14 -14
View File
@@ -207,7 +207,7 @@ def on_connection(connection, request):
# Listen for pairing events
connection.on('pairing_start', on_pairing_start)
connection.on('pairing', on_pairing)
connection.on('pairing', lambda keys: on_pairing(connection.peer_address, keys))
connection.on('pairing_failure', on_pairing_failure)
# Listen for encryption changes
@@ -242,9 +242,9 @@ def on_pairing_start():
# -----------------------------------------------------------------------------
def on_pairing(keys):
def on_pairing(address, keys):
print(color('***-----------------------------------', 'cyan'))
print(color('*** Paired!', 'cyan'))
print(color(f'*** Paired! (peer identity={address})', 'cyan'))
keys.print(prefix=color('*** ', 'cyan'))
print(color('***-----------------------------------', 'cyan'))
Waiter.instance.terminate()
@@ -283,17 +283,6 @@ async def pair(
# Create a device to manage the host
device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink)
# Set a custom keystore if specified on the command line
if keystore_file:
device.keystore = JsonKeyStore(namespace=None, filename=keystore_file)
# Print the existing keys before pairing
if print_keys and device.keystore:
print(color('@@@-----------------------------------', 'blue'))
print(color('@@@ Pairing Keys:', 'blue'))
await device.keystore.print(prefix=color('@@@ ', 'blue'))
print(color('@@@-----------------------------------', 'blue'))
# Expose a GATT characteristic that can be used to trigger pairing by
# responding with an authentication error when read
if mode == 'le':
@@ -323,6 +312,17 @@ async def pair(
# Get things going
await device.power_on()
# Set a custom keystore if specified on the command line
if keystore_file:
device.keystore = JsonKeyStore.from_device(device, filename=keystore_file)
# Print the existing keys before pairing
if print_keys and device.keystore:
print(color('@@@-----------------------------------', 'blue'))
print(color('@@@ Pairing Keys:', 'blue'))
await device.keystore.print(prefix=color('@@@ ', 'blue'))
print(color('@@@-----------------------------------', 'blue'))
# Set up a pairing config factory
device.pairing_config_factory = lambda connection: PairingConfig(
sc, mitm, bond, Delegate(mode, connection, io, prompt)
+5 -6
View File
@@ -133,15 +133,16 @@ async def scan(
'Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink
)
await device.power_on()
if keystore_file:
keystore = JsonKeyStore(namespace=None, filename=keystore_file)
device.keystore = keystore
else:
resolver = None
device.keystore = JsonKeyStore.from_device(device, filename=keystore_file)
if device.keystore:
resolving_keys = await device.keystore.get_resolving_keys()
resolver = AddressResolver(resolving_keys)
else:
resolver = None
printer = AdvertisementPrinter(min_rssi, resolver)
if raw:
@@ -149,8 +150,6 @@ async def scan(
else:
device.on('advertisement', printer.on_advertisement)
await device.power_on()
if phy is None:
scanning_phys = [HCI_LE_1M_PHY, HCI_LE_CODED_PHY]
else:
+40 -22
View File
@@ -22,40 +22,58 @@ import click
from bumble.device import Device
from bumble.keys import JsonKeyStore
from bumble.transport import open_transport
# -----------------------------------------------------------------------------
async def unbond_with_keystore(keystore, address):
if address is None:
return await keystore.print()
try:
await keystore.delete(address)
except KeyError:
print('!!! pairing not found')
# -----------------------------------------------------------------------------
async def unbond(keystore_file, device_config, address):
# Create a device to manage the host
device = Device.from_config_file(device_config)
# Get all entries in the keystore
async def unbond(keystore_file, device_config, hci_transport, address):
# With a keystore file, we can instantiate the keystore directly
if keystore_file:
keystore = JsonKeyStore(None, keystore_file)
else:
keystore = device.keystore
return await unbond_with_keystore(JsonKeyStore(None, keystore_file), address)
if keystore is None:
print('no keystore')
return
# Without a keystore file, we need to obtain the keystore from the device
async with await open_transport(hci_transport) as (hci_source, hci_sink):
# Create a device to manage the host
device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink)
if address is None:
await keystore.print()
else:
try:
await keystore.delete(address)
except KeyError:
print('!!! pairing not found')
# Power-on the device to ensure we have a key store
await device.power_on()
return await unbond_with_keystore(device.keystore, address)
# -----------------------------------------------------------------------------
@click.command()
@click.option('--keystore-file', help='File in which to store the pairing keys')
@click.argument('device-config')
@click.option('--keystore-file', help='File in which the pairing keys are stored')
@click.option('--hci-transport', help='HCI transport for the controller')
@click.argument('device-config', required=False)
@click.argument('address', required=False)
def main(keystore_file, device_config, address):
def main(keystore_file, hci_transport, device_config, address):
"""
Remove pairing keys for a device, given its address.
If no keystore file is specified, the --hci-transport option must be used to
connect to a controller, so that the keystore for that controller can be
instantiated.
If no address is passed, the existing pairing keys for all addresses are printed.
"""
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(unbond(keystore_file, device_config, address))
if not keystore_file and not hci_transport:
print('either --keystore-file or --hci-transport must be specified.')
return
asyncio.run(unbond(keystore_file, device_config, hci_transport, address))
# -----------------------------------------------------------------------------