keys: add an in-memory key-store fallback

Instead of defaulting the key-store to `None`, use an in-memory one.
This way a keystore is always available. A future improvement could be
to rework the device keystore initialization to remove checks like
`if self.keystore:` along the codebase.
This commit is contained in:
uael
2023-04-19 22:20:55 +00:00
parent b5cc167e31
commit 4a333b6c0f
2 changed files with 32 additions and 10 deletions

View File

@@ -25,7 +25,7 @@ import asyncio
import logging
import os
import json
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
from .colors import color
from .hci import Address
@@ -139,19 +139,19 @@ class PairingKeys:
# -----------------------------------------------------------------------------
class KeyStore:
async def delete(self, name):
async def delete(self, name: str):
pass
async def update(self, name, keys):
async def update(self, name: str, keys: PairingKeys) -> None:
pass
async def get(self, _name):
return PairingKeys()
async def get(self, _name: str) -> Optional[PairingKeys]:
return None
async def get_all(self):
async def get_all(self) -> List[Tuple[str, PairingKeys]]:
return []
async def delete_all(self):
async def delete_all(self) -> None:
all_keys = await self.get_all()
await asyncio.gather(*(self.delete(name) for (name, _) in all_keys))
@@ -177,15 +177,15 @@ class KeyStore:
separator = '\n'
@staticmethod
def create_for_device(device: Device) -> Optional[KeyStore]:
def create_for_device(device: Device) -> KeyStore:
if device.config.keystore is None:
return None
return MemoryKeyStore()
keystore_type = device.config.keystore.split(':', 1)[0]
if keystore_type == 'JsonKeyStore':
return JsonKeyStore.from_device(device)
return None
return MemoryKeyStore()
# -----------------------------------------------------------------------------
@@ -307,3 +307,24 @@ class JsonKeyStore(KeyStore):
return None
return PairingKeys.from_dict(keys)
# -----------------------------------------------------------------------------
class MemoryKeyStore(KeyStore):
all_keys: Dict[str, PairingKeys]
def __init__(self) -> None:
self.all_keys = {}
async def delete(self, name: str) -> None:
if name in self.all_keys:
del self.all_keys[name]
async def update(self, name: str, keys: PairingKeys) -> None:
self.all_keys[name] = keys
async def get(self, name: str) -> Optional[PairingKeys]:
return self.all_keys.get(name)
async def get_all(self) -> List[Tuple[str, PairingKeys]]:
return list(self.all_keys.items())