diff --git a/tests/keystore_test.py b/tests/keystore_test.py new file mode 100644 index 0000000..2e73039 --- /dev/null +++ b/tests/keystore_test.py @@ -0,0 +1,179 @@ +# Copyright 2021-2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import asyncio +import json +import logging +import tempfile +import os + +from bumble.keys import JsonKeyStore, PairingKeys + + +# ----------------------------------------------------------------------------- +# Logging +# ----------------------------------------------------------------------------- +logger = logging.getLogger(__name__) + + +# ----------------------------------------------------------------------------- +# Tests +# ----------------------------------------------------------------------------- + +JSON1 = """ + { + "my_namespace": { + "14:7D:DA:4E:53:A8/P": { + "address_type": 0, + "irk": { + "authenticated": false, + "value": "e7b2543b206e4e46b44f9e51dad22bd1" + }, + "link_key": { + "authenticated": false, + "value": "0745dd9691e693d9dca740f7d8dfea75" + }, + "ltk": { + "authenticated": false, + "value": "d1897ee10016eb1a08e4e037fd54c683" + } + } + } + } + """ + +JSON2 = """ + { + "my_namespace1": { + }, + "my_namespace2": { + } + } + """ + +JSON3 = """ + { + "my_namespace1": { + }, + "__DEFAULT__": { + "14:7D:DA:4E:53:A8/P": { + "address_type": 0, + "irk": { + "authenticated": false, + "value": "e7b2543b206e4e46b44f9e51dad22bd1" + } + } + } + } + """ + + +# ----------------------------------------------------------------------------- +async def test_basic(): + with tempfile.NamedTemporaryFile(mode="r+", encoding='utf-8') as file: + keystore = JsonKeyStore('my_namespace', file.name) + file.write("{}") + file.flush() + + keys = await keystore.get_all() + assert len(keys) == 0 + + keys = PairingKeys() + await keystore.update('foo', keys) + foo = await keystore.get('foo') + assert foo is not None + assert foo.ltk is None + ltk = bytes([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]) + keys.ltk = PairingKeys.Key(ltk) + await keystore.update('foo', keys) + foo = await keystore.get('foo') + assert foo is not None + assert foo.ltk is not None + assert foo.ltk.value == ltk + + file.flush() + with open(file.name, "r", encoding="utf-8") as json_file: + json_data = json.load(json_file) + assert 'my_namespace' in json_data + assert 'foo' in json_data['my_namespace'] + assert 'ltk' in json_data['my_namespace']['foo'] + + +# ----------------------------------------------------------------------------- +async def test_parsing(): + with tempfile.NamedTemporaryFile(mode="w", encoding='utf-8') as file: + keystore = JsonKeyStore('my_namespace', file.name) + file.write(JSON1) + file.flush() + + foo = await keystore.get('14:7D:DA:4E:53:A8/P') + assert foo is not None + assert foo.ltk.value == bytes.fromhex('d1897ee10016eb1a08e4e037fd54c683') + + +# ----------------------------------------------------------------------------- +async def test_default_namespace(): + with tempfile.NamedTemporaryFile(mode="w", encoding='utf-8') as file: + keystore = JsonKeyStore(None, file.name) + file.write(JSON1) + file.flush() + + all_keys = await keystore.get_all() + assert len(all_keys) == 1 + name, keys = all_keys[0] + assert name == '14:7D:DA:4E:53:A8/P' + assert keys.irk.value == bytes.fromhex('e7b2543b206e4e46b44f9e51dad22bd1') + + with tempfile.NamedTemporaryFile(mode="w", encoding='utf-8') as file: + keystore = JsonKeyStore(None, file.name) + file.write(JSON2) + file.flush() + + keys = PairingKeys() + ltk = bytes([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]) + keys.ltk = PairingKeys.Key(ltk) + await keystore.update('foo', keys) + file.flush() + with open(file.name, "r", encoding="utf-8") as json_file: + json_data = json.load(json_file) + assert '__DEFAULT__' in json_data + assert 'foo' in json_data['__DEFAULT__'] + assert 'ltk' in json_data['__DEFAULT__']['foo'] + + with tempfile.NamedTemporaryFile(mode="w", encoding='utf-8') as file: + keystore = JsonKeyStore(None, file.name) + file.write(JSON3) + file.flush() + + all_keys = await keystore.get_all() + assert len(all_keys) == 1 + name, keys = all_keys[0] + assert name == '14:7D:DA:4E:53:A8/P' + assert keys.irk.value == bytes.fromhex('e7b2543b206e4e46b44f9e51dad22bd1') + + +# ----------------------------------------------------------------------------- +async def run_tests(): + await test_basic() + await test_parsing() + await test_default_namespace() + + +# ----------------------------------------------------------------------------- +if __name__ == '__main__': + logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) + asyncio.run(run_tests())