From 901eb55b0e1f9a7148162db8f15d970dbb7af878 Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Wed, 23 Aug 2023 15:40:34 +0800 Subject: [PATCH] Add SDP self tests --- tests/__init__.py | 13 +++++ tests/l2cap_test.py | 69 +++--------------------- tests/sdp_test.py | 124 ++++++++++++++++++++++++++++++++++++++++++-- tests/test_utils.py | 73 ++++++++++++++++++++++++++ 4 files changed, 214 insertions(+), 65 deletions(-) create mode 100644 tests/__init__.py create mode 100644 tests/test_utils.py diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..1e45f74 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/l2cap_test.py b/tests/l2cap_test.py index 6f8e181..c6b2340 100644 --- a/tests/l2cap_test.py +++ b/tests/l2cap_test.py @@ -21,13 +21,9 @@ import os import random import pytest -from bumble.controller import Controller -from bumble.link import LocalLink -from bumble.device import Device -from bumble.host import Host -from bumble.transport import AsyncPipeSink from bumble.core import ProtocolError from bumble.l2cap import L2CAP_Connection_Request +from .test_utils import TwoDevices # ----------------------------------------------------------------------------- @@ -37,60 +33,6 @@ logger = logging.getLogger(__name__) # ----------------------------------------------------------------------------- -class TwoDevices: - def __init__(self): - self.connections = [None, None] - - self.link = LocalLink() - self.controllers = [ - Controller('C1', link=self.link), - Controller('C2', link=self.link), - ] - self.devices = [ - Device( - address='F0:F1:F2:F3:F4:F5', - host=Host(self.controllers[0], AsyncPipeSink(self.controllers[0])), - ), - Device( - address='F5:F4:F3:F2:F1:F0', - host=Host(self.controllers[1], AsyncPipeSink(self.controllers[1])), - ), - ] - - self.paired = [None, None] - - def on_connection(self, which, connection): - self.connections[which] = connection - - def on_paired(self, which, keys): - self.paired[which] = keys - - -# ----------------------------------------------------------------------------- -async def setup_connection(): - # Create two devices, each with a controller, attached to the same link - two_devices = TwoDevices() - - # Attach listeners - two_devices.devices[0].on( - 'connection', lambda connection: two_devices.on_connection(0, connection) - ) - two_devices.devices[1].on( - 'connection', lambda connection: two_devices.on_connection(1, connection) - ) - - # Start - await two_devices.devices[0].power_on() - await two_devices.devices[1].power_on() - - # Connect the two devices - await two_devices.devices[0].connect(two_devices.devices[1].random_address) - - # Check the post conditions - assert two_devices.connections[0] is not None - assert two_devices.connections[1] is not None - - return two_devices # ----------------------------------------------------------------------------- @@ -132,7 +74,8 @@ def test_helpers(): # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_basic_connection(): - devices = await setup_connection() + devices = TwoDevices() + await devices.setup_connection() psm = 1234 # Check that if there's no one listening, we can't connect @@ -184,7 +127,8 @@ async def test_basic_connection(): # ----------------------------------------------------------------------------- async def transfer_payload(max_credits, mtu, mps): - devices = await setup_connection() + devices = TwoDevices() + await devices.setup_connection() received = [] @@ -226,7 +170,8 @@ async def test_transfer(): # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_bidirectional_transfer(): - devices = await setup_connection() + devices = TwoDevices() + await devices.setup_connection() client_received = [] server_received = [] diff --git a/tests/sdp_test.py b/tests/sdp_test.py index 505539c..090e7b2 100644 --- a/tests/sdp_test.py +++ b/tests/sdp_test.py @@ -15,8 +15,23 @@ # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- -from bumble.core import UUID -from bumble.sdp import DataElement +import asyncio +import logging +import os + +from bumble.core import UUID, BT_L2CAP_PROTOCOL_ID, BT_RFCOMM_PROTOCOL_ID +from bumble.sdp import ( + DataElement, + ServiceAttribute, + Client, + Server, + SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, + SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, + SDP_PUBLIC_BROWSE_ROOT, + SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, + SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, +) +from .test_utils import TwoDevices # ----------------------------------------------------------------------------- # pylint: disable=invalid-name @@ -157,5 +172,108 @@ def test_data_elements() -> None: # ----------------------------------------------------------------------------- -if __name__ == '__main__': +def sdp_records(): + return { + 0x00010001: [ + ServiceAttribute( + SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, + DataElement.unsigned_integer_32(0x00010001), + ), + ServiceAttribute( + SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, + DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]), + ), + ServiceAttribute( + SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [DataElement.uuid(UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE'))] + ), + ), + ServiceAttribute( + SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.sequence([DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]), + ] + ), + ), + ] + } + + +# ----------------------------------------------------------------------------- +async def test_service_search(): + # Setup connections + devices = TwoDevices() + await devices.setup_connection() + assert devices.connections[0] + assert devices.connections[1] + + # Register SDP service + devices.devices[0].sdp_server.service_records.update(sdp_records()) + + # Search for service + client = Client(devices.devices[1]) + await client.connect(devices.connections[1]) + services = await client.search_services( + [UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE')] + ) + + # Then + assert services[0] == 0x00010001 + + +# ----------------------------------------------------------------------------- +async def test_service_attribute(): + # Setup connections + devices = TwoDevices() + await devices.setup_connection() + + # Register SDP service + devices.devices[0].sdp_server.service_records.update(sdp_records()) + + # Search for service + client = Client(devices.devices[1]) + await client.connect(devices.connections[1]) + attributes = await client.get_attributes( + 0x00010001, [SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID] + ) + + # Then + assert attributes[0].value.value == sdp_records()[0x00010001][0].value.value + + +# ----------------------------------------------------------------------------- +async def test_service_search_attribute(): + # Setup connections + devices = TwoDevices() + await devices.setup_connection() + + # Register SDP service + devices.devices[0].sdp_server.service_records.update(sdp_records()) + + # Search for service + client = Client(devices.devices[1]) + await client.connect(devices.connections[1]) + attributes = await client.search_attributes( + [UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE')], [(0x0000FFFF, 8)] + ) + + # Then + for expect, actual in zip(attributes, sdp_records().values()): + assert expect.id == actual.id + assert expect.value == actual.value + + +# ----------------------------------------------------------------------------- +async def run(): test_data_elements() + await test_service_attribute() + await test_service_search() + await test_service_search_attribute() + + +# ----------------------------------------------------------------------------- +if __name__ == '__main__': + logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) + asyncio.run(run()) diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..f19f18c --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,73 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List, Optional + +from bumble.controller import Controller +from bumble.link import LocalLink +from bumble.device import Device, Connection +from bumble.host import Host +from bumble.transport import AsyncPipeSink +from bumble.hci import Address + + +class TwoDevices: + connections: List[Optional[Connection]] + + def __init__(self) -> None: + self.connections = [None, None] + + self.link = LocalLink() + self.controllers = [ + Controller('C1', link=self.link), + Controller('C2', link=self.link), + ] + self.devices = [ + Device( + address=Address('F0:F1:F2:F3:F4:F5'), + host=Host(self.controllers[0], AsyncPipeSink(self.controllers[0])), + ), + Device( + address=Address('F5:F4:F3:F2:F1:F0'), + host=Host(self.controllers[1], AsyncPipeSink(self.controllers[1])), + ), + ] + + self.paired = [None, None] + + def on_connection(self, which, connection): + self.connections[which] = connection + + def on_paired(self, which, keys): + self.paired[which] = keys + + async def setup_connection(self) -> None: + # Attach listeners + self.devices[0].on( + 'connection', lambda connection: self.on_connection(0, connection) + ) + self.devices[1].on( + 'connection', lambda connection: self.on_connection(1, connection) + ) + + # Start + await self.devices[0].power_on() + await self.devices[1].power_on() + + # Connect the two devices + await self.devices[0].connect(self.devices[1].random_address) + + # Check the post conditions + assert self.connections[0] is not None + assert self.connections[1] is not None