From ff6528d2bf38fe39aec683be33cfab1e2c263771 Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Fri, 8 Dec 2023 00:03:21 +0800 Subject: [PATCH] Add Advertising unit tests --- bumble/device.py | 15 +++- tests/device_test.py | 175 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 186 insertions(+), 4 deletions(-) diff --git a/bumble/device.py b/bumble/device.py index 369040f2..f0f4ee18 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -440,12 +440,16 @@ class AdvertisingType(IntEnum): # ----------------------------------------------------------------------------- @dataclass class LegacyAdvertiser: + device: Device advertising_type: AdvertisingType own_address_type: OwnAddressType auto_restart: bool advertising_data: Optional[bytes] scan_response_data: Optional[bytes] + async def stop(self) -> None: + await self.device.stop_legacy_advertising() + # ----------------------------------------------------------------------------- @dataclass @@ -1707,6 +1711,7 @@ class Device(CompositeEventEmitter): ) self.legacy_advertiser = LegacyAdvertiser( + device=self, advertising_type=advertising_type, own_address_type=own_address_type, auto_restart=auto_restart, @@ -1763,7 +1768,7 @@ class Device(CompositeEventEmitter): DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE, DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1, ): - if i not in self.extended_advertisers.keys(): + if i not in self.extended_advertisers: adv_handle = i break @@ -3236,7 +3241,8 @@ class Device(CompositeEventEmitter): own_address_type = self.legacy_advertiser.own_address_type # Store advertiser for restarting - it's only required for legacy, since # extended advertisement produces HCI_Advertising_Set_Terminated. - advertiser = self.legacy_advertiser + if self.legacy_advertiser.auto_restart: + advertiser = self.legacy_advertiser else: # For extended advertisement, determining own address type later. own_address_type = OwnAddressType.RANDOM @@ -3366,6 +3372,8 @@ class Device(CompositeEventEmitter): self.start_legacy_advertising( advertising_type=advertiser.advertising_type, own_address_type=advertiser.own_address_type, + advertising_data=advertiser.advertising_data, + scan_response_data=advertiser.scan_response_data, auto_restart=True, ), ) @@ -3715,7 +3723,8 @@ class Device(CompositeEventEmitter): connection = self.lookup_connection(connection_handle) if advertiser := self.extended_advertisers.pop(advertising_handle, None): if connection: - connection.advertiser_after_disconnection = advertiser + if advertiser.auto_restart: + connection.advertiser_after_disconnection = advertiser if advertiser.own_address_type in ( OwnAddressType.PUBLIC, OwnAddressType.RESOLVABLE_OR_PUBLIC, diff --git a/tests/device_test.py b/tests/device_test.py index 1bcd0d08..d51431f5 100644 --- a/tests/device_test.py +++ b/tests/device_test.py @@ -20,8 +20,14 @@ import logging import os from types import LambdaType import pytest +from unittest import mock -from bumble.core import BT_BR_EDR_TRANSPORT +from bumble.core import ( + BT_BR_EDR_TRANSPORT, + BT_LE_TRANSPORT, + BT_PERIPHERAL_ROLE, + ConnectionParameters, +) from bumble.device import Connection, Device from bumble.host import Host from bumble.hci import ( @@ -30,6 +36,7 @@ from bumble.hci import ( HCI_CREATE_CONNECTION_COMMAND, HCI_SUCCESS, Address, + OwnAddressType, HCI_Command_Complete_Event, HCI_Command_Status_Event, HCI_Connection_Complete_Event, @@ -232,6 +239,172 @@ async def test_flush(): pass +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_legacy_advertising(): + device = Device(host=mock.AsyncMock(Host)) + + # Start advertising + advertiser = await device.start_legacy_advertising() + assert device.legacy_advertiser + + # Stop advertising + await advertiser.stop() + assert not device.legacy_advertiser + + +# ----------------------------------------------------------------------------- +@pytest.mark.parametrize( + 'own_address_type,', + (OwnAddressType.PUBLIC, OwnAddressType.RANDOM), +) +@pytest.mark.asyncio +async def test_legacy_advertising_connection(own_address_type): + device = Device(host=mock.AsyncMock(Host)) + peer_address = Address('F0:F1:F2:F3:F4:F5') + + # Start advertising + advertiser = await device.start_legacy_advertising() + device.on_connection( + 0x0001, + BT_LE_TRANSPORT, + peer_address, + BT_PERIPHERAL_ROLE, + ConnectionParameters(0, 0, 0), + ) + + if own_address_type == OwnAddressType.PUBLIC: + assert device.lookup_connection(0x0001).self_address == device.public_address + else: + assert device.lookup_connection(0x0001).self_address == device.random_address + + # For unknown reason, read_phy() in on_connection() would be killed at the end of + # test, so we force scheduling here to avoid an warning. + await asyncio.sleep(0.0001) + + +# ----------------------------------------------------------------------------- +@pytest.mark.parametrize( + 'auto_restart,', + (True, False), +) +@pytest.mark.asyncio +async def test_legacy_advertising_disconnection(auto_restart): + device = Device(host=mock.AsyncMock(spec=Host)) + peer_address = Address('F0:F1:F2:F3:F4:F5') + advertiser = await device.start_legacy_advertising(auto_restart=auto_restart) + device.on_connection( + 0x0001, + BT_LE_TRANSPORT, + peer_address, + BT_PERIPHERAL_ROLE, + ConnectionParameters(0, 0, 0), + ) + + device.start_legacy_advertising = mock.AsyncMock() + + device.on_disconnection(0x0001, 0) + + if auto_restart: + device.start_legacy_advertising.assert_called_with( + advertising_type=advertiser.advertising_type, + own_address_type=advertiser.own_address_type, + auto_restart=advertiser.auto_restart, + advertising_data=advertiser.advertising_data, + scan_response_data=advertiser.scan_response_data, + ) + else: + device.start_legacy_advertising.assert_not_called() + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_extended_advertising(): + device = Device(host=mock.AsyncMock(Host)) + + # Start advertising + advertiser = await device.start_extended_advertising() + assert device.extended_advertisers + + # Stop advertising + await advertiser.stop() + assert not device.extended_advertisers + + +# ----------------------------------------------------------------------------- +@pytest.mark.parametrize( + 'own_address_type,', + (OwnAddressType.PUBLIC, OwnAddressType.RANDOM), +) +@pytest.mark.asyncio +async def test_extended_advertising_connection(own_address_type): + device = Device(host=mock.AsyncMock(spec=Host)) + peer_address = Address('F0:F1:F2:F3:F4:F5') + advertiser = await device.start_extended_advertising( + own_address_type=own_address_type + ) + device.on_connection( + 0x0001, + BT_LE_TRANSPORT, + peer_address, + BT_PERIPHERAL_ROLE, + ConnectionParameters(0, 0, 0), + ) + device.on_advertising_set_termination( + HCI_SUCCESS, + advertiser.handle, + 0x0001, + ) + + if own_address_type == OwnAddressType.PUBLIC: + assert device.lookup_connection(0x0001).self_address == device.public_address + else: + assert device.lookup_connection(0x0001).self_address == device.random_address + + # For unknown reason, read_phy() in on_connection() would be killed at the end of + # test, so we force scheduling here to avoid an warning. + await asyncio.sleep(0.0001) + + +# ----------------------------------------------------------------------------- +@pytest.mark.parametrize( + 'auto_restart,', + (True, False), +) +@pytest.mark.asyncio +async def test_extended_advertising_disconnection(auto_restart): + device = Device(host=mock.AsyncMock(spec=Host)) + peer_address = Address('F0:F1:F2:F3:F4:F5') + advertiser = await device.start_extended_advertising(auto_restart=auto_restart) + device.on_connection( + 0x0001, + BT_LE_TRANSPORT, + peer_address, + BT_PERIPHERAL_ROLE, + ConnectionParameters(0, 0, 0), + ) + device.on_advertising_set_termination( + HCI_SUCCESS, + advertiser.handle, + 0x0001, + ) + + device.start_extended_advertising = mock.AsyncMock() + + device.on_disconnection(0x0001, 0) + + if auto_restart: + device.start_extended_advertising.assert_called_with( + advertising_properties=advertiser.advertising_properties, + own_address_type=advertiser.own_address_type, + auto_restart=advertiser.auto_restart, + advertising_data=advertiser.advertising_data, + scan_response_data=advertiser.scan_response_data, + ) + else: + device.start_extended_advertising.assert_not_called() + + # ----------------------------------------------------------------------------- def test_gatt_services_with_gas(): device = Device(host=Host(None, None))