mirror of
https://github.com/google/bumble.git
synced 2026-04-16 00:25:31 +00:00
123 lines
4.1 KiB
Python
123 lines
4.1 KiB
Python
# Copyright 2021-2023 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# https://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Imports
|
|
# -----------------------------------------------------------------------------
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import struct
|
|
from unittest import mock
|
|
|
|
import pytest
|
|
|
|
from bumble import device
|
|
from bumble.profiles import csip
|
|
|
|
from .test_utils import TwoDevices
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Logging
|
|
# -----------------------------------------------------------------------------
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
def test_s1():
|
|
assert (
|
|
csip.s1(b'SIRKenc'[::-1])
|
|
== bytes.fromhex('6901983f 18149e82 3c7d133a 7d774572')[::-1]
|
|
)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
def test_k1():
|
|
K = bytes.fromhex('676e1b9b d448696f 061ec622 3ce5ced9')[::-1]
|
|
SALT = csip.s1(b'SIRKenc'[::-1])
|
|
P = b'csis'[::-1]
|
|
assert (
|
|
csip.k1(K, SALT, P)
|
|
== bytes.fromhex('5277453c c094d982 b0e8ee53 2f2d1f8b')[::-1]
|
|
)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
def test_sih():
|
|
SIRK = bytes.fromhex('457d7d09 21a1fd22 cecd8c86 dd72cccd')[::-1]
|
|
PRAND = bytes.fromhex('69f563')[::-1]
|
|
assert csip.sih(SIRK, PRAND) == bytes.fromhex('1948da')[::-1]
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
def test_sef():
|
|
SIRK = bytes.fromhex('457d7d09 21a1fd22 cecd8c86 dd72cccd')[::-1]
|
|
K = bytes.fromhex('676e1b9b d448696f 061ec622 3ce5ced9')[::-1]
|
|
assert (
|
|
csip.sef(K, SIRK) == bytes.fromhex('170a3835 e13524a0 7e2562d5 f25fd346')[::-1]
|
|
)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize(
|
|
'sirk_type,', [(csip.SirkType.ENCRYPTED), (csip.SirkType.PLAINTEXT)]
|
|
)
|
|
async def test_csis(sirk_type):
|
|
SIRK = bytes.fromhex('2f62c8ae41867d1bb619e788a2605faa')
|
|
LTK = bytes.fromhex('2f62c8ae41867d1bb619e788a2605faa')
|
|
|
|
devices = TwoDevices()
|
|
devices[0].add_service(
|
|
csip.CoordinatedSetIdentificationService(
|
|
set_identity_resolving_key=SIRK,
|
|
set_identity_resolving_key_type=sirk_type,
|
|
coordinated_set_size=2,
|
|
set_member_lock=csip.MemberLock.UNLOCKED,
|
|
set_member_rank=0,
|
|
)
|
|
)
|
|
|
|
await devices.setup_connection()
|
|
|
|
# Mock encryption.
|
|
devices.connections[0].encryption = 1
|
|
devices.connections[1].encryption = 1
|
|
devices[0].get_long_term_key = mock.AsyncMock(return_value=LTK)
|
|
devices[1].get_long_term_key = mock.AsyncMock(return_value=LTK)
|
|
|
|
peer = device.Peer(devices.connections[1])
|
|
csis_client = await peer.discover_service_and_create_proxy(
|
|
csip.CoordinatedSetIdentificationProxy
|
|
)
|
|
|
|
assert await csis_client.read_set_identity_resolving_key() == (sirk_type, SIRK)
|
|
assert await csis_client.coordinated_set_size.read_value() == struct.pack('B', 2)
|
|
assert await csis_client.set_member_lock.read_value() == struct.pack(
|
|
'B', csip.MemberLock.UNLOCKED
|
|
)
|
|
assert await csis_client.set_member_rank.read_value() == struct.pack('B', 0)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
async def run():
|
|
test_sih()
|
|
await test_csis()
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
if __name__ == '__main__':
|
|
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
|
|
asyncio.run(run())
|