From f80c83d0b3b9134aaf7661b7cb3927de81b12cc9 Mon Sep 17 00:00:00 2001 From: Gilles Boccon-Gibod Date: Fri, 5 May 2023 15:58:21 -0700 Subject: [PATCH] better doc and default behavior for json keystore --- bumble/keys.py | 117 ++++++++++++++++++++++++++++++------------------- 1 file changed, 71 insertions(+), 46 deletions(-) diff --git a/bumble/keys.py b/bumble/keys.py index a30e753..b09f1c1 100644 --- a/bumble/keys.py +++ b/bumble/keys.py @@ -190,10 +190,44 @@ class KeyStore: # ----------------------------------------------------------------------------- class JsonKeyStore(KeyStore): + """ + KeyStore implementation that is backed by a JSON file. + + This implementation supports storing a hierarchy of key sets in a single file. + A key set is a representation of a PairingKeys object. Each key set is stored + in a map, with the address of paired peer as the key. Maps are themselves grouped + into namespaces, grouping pairing keys by controller addresses. + The JSON object model looks like: + { + "": { + "peer-address": { + "address_type": , + "irk" : { + "authenticated": , + "value": "hex-encoded-key" + }, + ... other keys ... + }, + ... other peers ... + } + ... other namespaces ... + } + + A namespace is typically the BD_ADDR of a controller, since that is a convenient + unique identifier, but it may be something else. + A special namespace, called the "default" namespace, is used when instantiating this + class without a namespace. With the default namespace, reading from a file will + load an existing namespace if there is only one, which may be convenient for reading + from a file with a single key set and for which the namespace isn't known. If the + file does not include any existing key set, or if there are more than one and none + has the default name, a new one will be created with the name "__DEFAULT__". + """ + APP_NAME = 'Bumble' APP_AUTHOR = 'Google' KEYS_DIR = 'Pairing' DEFAULT_NAMESPACE = '__DEFAULT__' + DEFAULT_BASE_NAME = "keys" def __init__(self, namespace, filename=None): self.namespace = namespace if namespace is not None else self.DEFAULT_NAMESPACE @@ -208,8 +242,9 @@ class JsonKeyStore(KeyStore): self.directory_name = os.path.join( appdirs.user_data_dir(self.APP_NAME, self.APP_AUTHOR), self.KEYS_DIR ) + base_name = self.DEFAULT_BASE_NAME if namespace is None else self.namespace json_filename = ( - f'{self.namespace}.json'.lower().replace(':', '-').replace('/p', '-p') + f'{base_name}.json'.lower().replace(':', '-').replace('/p', '-p') ) self.filename = os.path.join(self.directory_name, json_filename) else: @@ -219,11 +254,12 @@ class JsonKeyStore(KeyStore): logger.debug(f'JSON keystore: {self.filename}') @staticmethod - def from_device(device: Device) -> Optional[JsonKeyStore]: - if not device.config.keystore: - return None - - params = device.config.keystore.split(':', 1)[1:] + def from_device(device: Device, filename=None) -> Optional[JsonKeyStore]: + if not filename: + # Extract the filename from the config + params = device.config.keystore.split(':', 1)[1:] + if params: + filename = params[0] # Use a namespace based on the device address if device.public_address not in (Address.ANY, Address.ANY_RANDOM): @@ -232,19 +268,31 @@ class JsonKeyStore(KeyStore): namespace = str(device.random_address) else: namespace = JsonKeyStore.DEFAULT_NAMESPACE - if params: - filename = params[0] - else: - filename = None return JsonKeyStore(namespace, filename) async def load(self): + # Try to open the file, without failing. If the file does not exist, it + # will be created upon saving. try: with open(self.filename, 'r', encoding='utf-8') as json_file: - return json.load(json_file) + db = json.load(json_file) except FileNotFoundError: - return {} + db = {} + + # First, look for a namespace match + if self.namespace in db: + return (db, db[self.namespace]) + + # Then, if the namespace is the default namespace, and there's + # only one entry in the db, use that + if self.namespace == self.DEFAULT_NAMESPACE and len(db) == 1: + return next(iter(db.items())) + + # Finally, just create an empty key map for the namespace + key_map = {} + db[self.namespace] = key_map + return (db, key_map) async def save(self, db): # Create the directory if it doesn't exist @@ -260,53 +308,30 @@ class JsonKeyStore(KeyStore): os.replace(temp_filename, self.filename) async def delete(self, name: str) -> None: - db = await self.load() - - namespace = db.get(self.namespace) - if namespace is None: - raise KeyError(name) - - del namespace[name] + db, key_map = await self.load() + del key_map[name] await self.save(db) async def update(self, name, keys): - db = await self.load() - - namespace = db.setdefault(self.namespace, {}) - namespace.setdefault(name, {}).update(keys.to_dict()) - + db, key_map = await self.load() + key_map.setdefault(name, {}).update(keys.to_dict()) await self.save(db) async def get_all(self): - db = await self.load() - - namespace = db.get(self.namespace) - if namespace is None: - return [] - - return [ - (name, PairingKeys.from_dict(keys)) for (name, keys) in namespace.items() - ] + _, key_map = await self.load() + return [(name, PairingKeys.from_dict(keys)) for (name, keys) in key_map.items()] async def delete_all(self): - db = await self.load() - - db.pop(self.namespace, None) - + db, key_map = await self.load() + key_map.clear() await self.save(db) async def get(self, name: str) -> Optional[PairingKeys]: - db = await self.load() - - namespace = db.get(self.namespace) - if namespace is None: + _, key_map = await self.load() + if name not in key_map: return None - keys = namespace.get(name) - if keys is None: - return None - - return PairingKeys.from_dict(keys) + return PairingKeys.from_dict(key_map[name]) # -----------------------------------------------------------------------------