better doc and default behavior for json keystore

This commit is contained in:
Gilles Boccon-Gibod
2023-05-05 15:58:21 -07:00
parent 3de35193bc
commit f80c83d0b3

View File

@@ -190,10 +190,44 @@ class KeyStore:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class JsonKeyStore(KeyStore): class JsonKeyStore(KeyStore):
"""
KeyStore implementation that is backed by a JSON file.
This implementation supports storing a hierarchy of key sets in a single file.
A key set is a representation of a PairingKeys object. Each key set is stored
in a map, with the address of paired peer as the key. Maps are themselves grouped
into namespaces, grouping pairing keys by controller addresses.
The JSON object model looks like:
{
"<namespace>": {
"peer-address": {
"address_type": <n>,
"irk" : {
"authenticated": <true/false>,
"value": "hex-encoded-key"
},
... other keys ...
},
... other peers ...
}
... other namespaces ...
}
A namespace is typically the BD_ADDR of a controller, since that is a convenient
unique identifier, but it may be something else.
A special namespace, called the "default" namespace, is used when instantiating this
class without a namespace. With the default namespace, reading from a file will
load an existing namespace if there is only one, which may be convenient for reading
from a file with a single key set and for which the namespace isn't known. If the
file does not include any existing key set, or if there are more than one and none
has the default name, a new one will be created with the name "__DEFAULT__".
"""
APP_NAME = 'Bumble' APP_NAME = 'Bumble'
APP_AUTHOR = 'Google' APP_AUTHOR = 'Google'
KEYS_DIR = 'Pairing' KEYS_DIR = 'Pairing'
DEFAULT_NAMESPACE = '__DEFAULT__' DEFAULT_NAMESPACE = '__DEFAULT__'
DEFAULT_BASE_NAME = "keys"
def __init__(self, namespace, filename=None): def __init__(self, namespace, filename=None):
self.namespace = namespace if namespace is not None else self.DEFAULT_NAMESPACE self.namespace = namespace if namespace is not None else self.DEFAULT_NAMESPACE
@@ -208,8 +242,9 @@ class JsonKeyStore(KeyStore):
self.directory_name = os.path.join( self.directory_name = os.path.join(
appdirs.user_data_dir(self.APP_NAME, self.APP_AUTHOR), self.KEYS_DIR appdirs.user_data_dir(self.APP_NAME, self.APP_AUTHOR), self.KEYS_DIR
) )
base_name = self.DEFAULT_BASE_NAME if namespace is None else self.namespace
json_filename = ( json_filename = (
f'{self.namespace}.json'.lower().replace(':', '-').replace('/p', '-p') f'{base_name}.json'.lower().replace(':', '-').replace('/p', '-p')
) )
self.filename = os.path.join(self.directory_name, json_filename) self.filename = os.path.join(self.directory_name, json_filename)
else: else:
@@ -219,11 +254,12 @@ class JsonKeyStore(KeyStore):
logger.debug(f'JSON keystore: {self.filename}') logger.debug(f'JSON keystore: {self.filename}')
@staticmethod @staticmethod
def from_device(device: Device) -> Optional[JsonKeyStore]: def from_device(device: Device, filename=None) -> Optional[JsonKeyStore]:
if not device.config.keystore: if not filename:
return None # Extract the filename from the config
params = device.config.keystore.split(':', 1)[1:] params = device.config.keystore.split(':', 1)[1:]
if params:
filename = params[0]
# Use a namespace based on the device address # Use a namespace based on the device address
if device.public_address not in (Address.ANY, Address.ANY_RANDOM): if device.public_address not in (Address.ANY, Address.ANY_RANDOM):
@@ -232,19 +268,31 @@ class JsonKeyStore(KeyStore):
namespace = str(device.random_address) namespace = str(device.random_address)
else: else:
namespace = JsonKeyStore.DEFAULT_NAMESPACE namespace = JsonKeyStore.DEFAULT_NAMESPACE
if params:
filename = params[0]
else:
filename = None
return JsonKeyStore(namespace, filename) return JsonKeyStore(namespace, filename)
async def load(self): async def load(self):
# Try to open the file, without failing. If the file does not exist, it
# will be created upon saving.
try: try:
with open(self.filename, 'r', encoding='utf-8') as json_file: with open(self.filename, 'r', encoding='utf-8') as json_file:
return json.load(json_file) db = json.load(json_file)
except FileNotFoundError: except FileNotFoundError:
return {} db = {}
# First, look for a namespace match
if self.namespace in db:
return (db, db[self.namespace])
# Then, if the namespace is the default namespace, and there's
# only one entry in the db, use that
if self.namespace == self.DEFAULT_NAMESPACE and len(db) == 1:
return next(iter(db.items()))
# Finally, just create an empty key map for the namespace
key_map = {}
db[self.namespace] = key_map
return (db, key_map)
async def save(self, db): async def save(self, db):
# Create the directory if it doesn't exist # Create the directory if it doesn't exist
@@ -260,53 +308,30 @@ class JsonKeyStore(KeyStore):
os.replace(temp_filename, self.filename) os.replace(temp_filename, self.filename)
async def delete(self, name: str) -> None: async def delete(self, name: str) -> None:
db = await self.load() db, key_map = await self.load()
del key_map[name]
namespace = db.get(self.namespace)
if namespace is None:
raise KeyError(name)
del namespace[name]
await self.save(db) await self.save(db)
async def update(self, name, keys): async def update(self, name, keys):
db = await self.load() db, key_map = await self.load()
key_map.setdefault(name, {}).update(keys.to_dict())
namespace = db.setdefault(self.namespace, {})
namespace.setdefault(name, {}).update(keys.to_dict())
await self.save(db) await self.save(db)
async def get_all(self): async def get_all(self):
db = await self.load() _, key_map = await self.load()
return [(name, PairingKeys.from_dict(keys)) for (name, keys) in key_map.items()]
namespace = db.get(self.namespace)
if namespace is None:
return []
return [
(name, PairingKeys.from_dict(keys)) for (name, keys) in namespace.items()
]
async def delete_all(self): async def delete_all(self):
db = await self.load() db, key_map = await self.load()
key_map.clear()
db.pop(self.namespace, None)
await self.save(db) await self.save(db)
async def get(self, name: str) -> Optional[PairingKeys]: async def get(self, name: str) -> Optional[PairingKeys]:
db = await self.load() _, key_map = await self.load()
if name not in key_map:
namespace = db.get(self.namespace)
if namespace is None:
return None return None
keys = namespace.get(name) return PairingKeys.from_dict(key_map[name])
if keys is None:
return None
return PairingKeys.from_dict(keys)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------