Add Advertising unit tests

This commit is contained in:
Josh Wu
2023-12-08 00:03:21 +08:00
parent 72ac75a98d
commit ff6528d2bf
2 changed files with 186 additions and 4 deletions
+12 -3
View File
@@ -440,12 +440,16 @@ class AdvertisingType(IntEnum):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@dataclass @dataclass
class LegacyAdvertiser: class LegacyAdvertiser:
device: Device
advertising_type: AdvertisingType advertising_type: AdvertisingType
own_address_type: OwnAddressType own_address_type: OwnAddressType
auto_restart: bool auto_restart: bool
advertising_data: Optional[bytes] advertising_data: Optional[bytes]
scan_response_data: Optional[bytes] scan_response_data: Optional[bytes]
async def stop(self) -> None:
await self.device.stop_legacy_advertising()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@dataclass @dataclass
@@ -1707,6 +1711,7 @@ class Device(CompositeEventEmitter):
) )
self.legacy_advertiser = LegacyAdvertiser( self.legacy_advertiser = LegacyAdvertiser(
device=self,
advertising_type=advertising_type, advertising_type=advertising_type,
own_address_type=own_address_type, own_address_type=own_address_type,
auto_restart=auto_restart, auto_restart=auto_restart,
@@ -1763,7 +1768,7 @@ class Device(CompositeEventEmitter):
DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE, DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE,
DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1, DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1,
): ):
if i not in self.extended_advertisers.keys(): if i not in self.extended_advertisers:
adv_handle = i adv_handle = i
break break
@@ -3236,7 +3241,8 @@ class Device(CompositeEventEmitter):
own_address_type = self.legacy_advertiser.own_address_type own_address_type = self.legacy_advertiser.own_address_type
# Store advertiser for restarting - it's only required for legacy, since # Store advertiser for restarting - it's only required for legacy, since
# extended advertisement produces HCI_Advertising_Set_Terminated. # extended advertisement produces HCI_Advertising_Set_Terminated.
advertiser = self.legacy_advertiser if self.legacy_advertiser.auto_restart:
advertiser = self.legacy_advertiser
else: else:
# For extended advertisement, determining own address type later. # For extended advertisement, determining own address type later.
own_address_type = OwnAddressType.RANDOM own_address_type = OwnAddressType.RANDOM
@@ -3366,6 +3372,8 @@ class Device(CompositeEventEmitter):
self.start_legacy_advertising( self.start_legacy_advertising(
advertising_type=advertiser.advertising_type, advertising_type=advertiser.advertising_type,
own_address_type=advertiser.own_address_type, own_address_type=advertiser.own_address_type,
advertising_data=advertiser.advertising_data,
scan_response_data=advertiser.scan_response_data,
auto_restart=True, auto_restart=True,
), ),
) )
@@ -3715,7 +3723,8 @@ class Device(CompositeEventEmitter):
connection = self.lookup_connection(connection_handle) connection = self.lookup_connection(connection_handle)
if advertiser := self.extended_advertisers.pop(advertising_handle, None): if advertiser := self.extended_advertisers.pop(advertising_handle, None):
if connection: if connection:
connection.advertiser_after_disconnection = advertiser if advertiser.auto_restart:
connection.advertiser_after_disconnection = advertiser
if advertiser.own_address_type in ( if advertiser.own_address_type in (
OwnAddressType.PUBLIC, OwnAddressType.PUBLIC,
OwnAddressType.RESOLVABLE_OR_PUBLIC, OwnAddressType.RESOLVABLE_OR_PUBLIC,
+174 -1
View File
@@ -20,8 +20,14 @@ import logging
import os import os
from types import LambdaType from types import LambdaType
import pytest import pytest
from unittest import mock
from bumble.core import BT_BR_EDR_TRANSPORT from bumble.core import (
BT_BR_EDR_TRANSPORT,
BT_LE_TRANSPORT,
BT_PERIPHERAL_ROLE,
ConnectionParameters,
)
from bumble.device import Connection, Device from bumble.device import Connection, Device
from bumble.host import Host from bumble.host import Host
from bumble.hci import ( from bumble.hci import (
@@ -30,6 +36,7 @@ from bumble.hci import (
HCI_CREATE_CONNECTION_COMMAND, HCI_CREATE_CONNECTION_COMMAND,
HCI_SUCCESS, HCI_SUCCESS,
Address, Address,
OwnAddressType,
HCI_Command_Complete_Event, HCI_Command_Complete_Event,
HCI_Command_Status_Event, HCI_Command_Status_Event,
HCI_Connection_Complete_Event, HCI_Connection_Complete_Event,
@@ -232,6 +239,172 @@ async def test_flush():
pass pass
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_legacy_advertising():
device = Device(host=mock.AsyncMock(Host))
# Start advertising
advertiser = await device.start_legacy_advertising()
assert device.legacy_advertiser
# Stop advertising
await advertiser.stop()
assert not device.legacy_advertiser
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'own_address_type,',
(OwnAddressType.PUBLIC, OwnAddressType.RANDOM),
)
@pytest.mark.asyncio
async def test_legacy_advertising_connection(own_address_type):
device = Device(host=mock.AsyncMock(Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
# Start advertising
advertiser = await device.start_legacy_advertising()
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
peer_address,
BT_PERIPHERAL_ROLE,
ConnectionParameters(0, 0, 0),
)
if own_address_type == OwnAddressType.PUBLIC:
assert device.lookup_connection(0x0001).self_address == device.public_address
else:
assert device.lookup_connection(0x0001).self_address == device.random_address
# For unknown reason, read_phy() in on_connection() would be killed at the end of
# test, so we force scheduling here to avoid an warning.
await asyncio.sleep(0.0001)
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'auto_restart,',
(True, False),
)
@pytest.mark.asyncio
async def test_legacy_advertising_disconnection(auto_restart):
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_legacy_advertising(auto_restart=auto_restart)
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
peer_address,
BT_PERIPHERAL_ROLE,
ConnectionParameters(0, 0, 0),
)
device.start_legacy_advertising = mock.AsyncMock()
device.on_disconnection(0x0001, 0)
if auto_restart:
device.start_legacy_advertising.assert_called_with(
advertising_type=advertiser.advertising_type,
own_address_type=advertiser.own_address_type,
auto_restart=advertiser.auto_restart,
advertising_data=advertiser.advertising_data,
scan_response_data=advertiser.scan_response_data,
)
else:
device.start_legacy_advertising.assert_not_called()
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_extended_advertising():
device = Device(host=mock.AsyncMock(Host))
# Start advertising
advertiser = await device.start_extended_advertising()
assert device.extended_advertisers
# Stop advertising
await advertiser.stop()
assert not device.extended_advertisers
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'own_address_type,',
(OwnAddressType.PUBLIC, OwnAddressType.RANDOM),
)
@pytest.mark.asyncio
async def test_extended_advertising_connection(own_address_type):
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_extended_advertising(
own_address_type=own_address_type
)
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
peer_address,
BT_PERIPHERAL_ROLE,
ConnectionParameters(0, 0, 0),
)
device.on_advertising_set_termination(
HCI_SUCCESS,
advertiser.handle,
0x0001,
)
if own_address_type == OwnAddressType.PUBLIC:
assert device.lookup_connection(0x0001).self_address == device.public_address
else:
assert device.lookup_connection(0x0001).self_address == device.random_address
# For unknown reason, read_phy() in on_connection() would be killed at the end of
# test, so we force scheduling here to avoid an warning.
await asyncio.sleep(0.0001)
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'auto_restart,',
(True, False),
)
@pytest.mark.asyncio
async def test_extended_advertising_disconnection(auto_restart):
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_extended_advertising(auto_restart=auto_restart)
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
peer_address,
BT_PERIPHERAL_ROLE,
ConnectionParameters(0, 0, 0),
)
device.on_advertising_set_termination(
HCI_SUCCESS,
advertiser.handle,
0x0001,
)
device.start_extended_advertising = mock.AsyncMock()
device.on_disconnection(0x0001, 0)
if auto_restart:
device.start_extended_advertising.assert_called_with(
advertising_properties=advertiser.advertising_properties,
own_address_type=advertiser.own_address_type,
auto_restart=advertiser.auto_restart,
advertising_data=advertiser.advertising_data,
scan_response_data=advertiser.scan_response_data,
)
else:
device.start_extended_advertising.assert_not_called()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_gatt_services_with_gas(): def test_gatt_services_with_gas():
device = Device(host=Host(None, None)) device = Device(host=Host(None, None))