From fca73a49a380e019fcf968f0cfe604959ee31f28 Mon Sep 17 00:00:00 2001 From: Gilles Boccon-Gibod Date: Mon, 3 Apr 2023 12:39:22 -0700 Subject: [PATCH] use device public or static address for keystore namespace --- bumble/device.py | 15 +++++++++++---- bumble/keys.py | 34 +++++++++++++++++++++++++--------- 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/bumble/device.py b/bumble/device.py index 0fa8f160..7add5a15 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -878,7 +878,7 @@ device_host_event_handlers: list[str] = [] # ----------------------------------------------------------------------------- class Device(CompositeEventEmitter): - # incomplete list of fields. + # Incomplete list of fields. random_address: Address public_address: Address classic_enabled: bool @@ -893,6 +893,7 @@ class Device(CompositeEventEmitter): Address, List[asyncio.Future[Union[Connection, Tuple[Address, int, int]]]] ] advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator] + config: DeviceConfiguration @composite_listener class Listener: @@ -980,9 +981,10 @@ class Device(CompositeEventEmitter): self.connect_own_address_type = None # Use the initial config or a default + config = config or DeviceConfiguration() + self.config = config + self.public_address = Address('00:00:00:00:00:00') - if config is None: - config = DeviceConfiguration() self.name = config.name self.random_address = config.address self.class_of_device = config.class_of_device @@ -990,7 +992,7 @@ class Device(CompositeEventEmitter): self.advertising_data = config.advertising_data self.advertising_interval_min = config.advertising_interval_min self.advertising_interval_max = config.advertising_interval_max - self.keystore = KeyStore.create_for_device(config) + self.keystore = None self.irk = config.irk self.le_enabled = config.le_enabled self.classic_enabled = config.classic_enabled @@ -1167,6 +1169,7 @@ class Device(CompositeEventEmitter): # Reset the controller await self.host.reset() + # Try to get the public address from the controller response = await self.send_command(HCI_Read_BD_ADDR_Command()) # type: ignore[call-arg] if response.return_parameters.status == HCI_SUCCESS: logger.debug( @@ -1174,6 +1177,10 @@ class Device(CompositeEventEmitter): ) self.public_address = response.return_parameters.bd_addr + # Instantiate the Key Store (we do this here rather than at __init__ time + # because some Key Store implementations use the public address as a namespace) + self.keystore = KeyStore.create_for_device(self) + if self.host.supports_command(HCI_WRITE_LE_HOST_SUPPORT_COMMAND): await self.send_command( HCI_Write_LE_Host_Support_Command( diff --git a/bumble/keys.py b/bumble/keys.py index 7fed6602..feb04abf 100644 --- a/bumble/keys.py +++ b/bumble/keys.py @@ -20,15 +20,19 @@ # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- +from __future__ import annotations import asyncio import logging import os import json -from typing import Optional +from typing import TYPE_CHECKING, Optional from .colors import color from .hci import Address +if TYPE_CHECKING: + from .device import Device + # ----------------------------------------------------------------------------- # Logging @@ -173,13 +177,13 @@ class KeyStore: separator = '\n' @staticmethod - def create_for_device(device_config): - if device_config.keystore is None: + def create_for_device(device: Device) -> Optional[KeyStore]: + if device.config.keystore is None: return None - keystore_type = device_config.keystore.split(':', 1)[0] + keystore_type = device.config.keystore.split(':', 1)[0] if keystore_type == 'JsonKeyStore': - return JsonKeyStore.from_device_config(device_config) + return JsonKeyStore.from_device(device) return None @@ -204,7 +208,9 @@ class JsonKeyStore(KeyStore): self.directory_name = os.path.join( appdirs.user_data_dir(self.APP_NAME, self.APP_AUTHOR), self.KEYS_DIR ) - json_filename = f'{self.namespace}.json'.lower().replace(':', '-') + json_filename = ( + f'{self.namespace}.json'.lower().replace(':', '-').replace('/p', '-p') + ) self.filename = os.path.join(self.directory_name, json_filename) else: self.filename = filename @@ -213,9 +219,19 @@ class JsonKeyStore(KeyStore): logger.debug(f'JSON keystore: {self.filename}') @staticmethod - def from_device_config(device_config): - params = device_config.keystore.split(':', 1)[1:] - namespace = str(device_config.address) + def from_device(device: Device) -> Optional[JsonKeyStore]: + if not device.config.keystore: + return None + + params = device.config.keystore.split(':', 1)[1:] + + # Use a namespace based on the device address + if device.public_address not in (Address.ANY, Address.ANY_RANDOM): + namespace = str(device.public_address) + elif device.random_address != Address.ANY_RANDOM: + namespace = str(device.random_address) + else: + namespace = JsonKeyStore.DEFAULT_NAMESPACE if params: filename = params[0] else: