mirror of
https://github.com/google/bumble.git
synced 2026-04-18 00:45:32 +00:00
Implement extended advertising emulation
This commit is contained in:
@@ -284,52 +284,49 @@ async def test_legacy_advertising():
|
||||
@pytest.mark.asyncio
|
||||
async def test_legacy_advertising_disconnection(auto_restart):
|
||||
devices = TwoDevices()
|
||||
device = devices[0]
|
||||
devices.controllers[0].le_features = bytes.fromhex('ffffffffffffffff')
|
||||
await device.power_on()
|
||||
peer_address = Address('F0:F1:F2:F3:F4:F5')
|
||||
await device.start_advertising(auto_restart=auto_restart)
|
||||
device.on_le_connection(
|
||||
0x0001,
|
||||
peer_address,
|
||||
None,
|
||||
None,
|
||||
Role.PERIPHERAL,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
)
|
||||
for controller in devices.controllers:
|
||||
controller.le_features = bytes.fromhex('ffffffffffffffff')
|
||||
for dev in devices:
|
||||
await dev.power_on()
|
||||
await devices[0].start_advertising(auto_restart=auto_restart)
|
||||
connecion = await devices[1].connect(devices[0].random_address)
|
||||
|
||||
device.on_advertising_set_termination(
|
||||
HCI_SUCCESS, device.legacy_advertising_set.advertising_handle, 0x0001, 0
|
||||
)
|
||||
await connecion.disconnect()
|
||||
|
||||
device.on_disconnection(0x0001, 0)
|
||||
await async_barrier()
|
||||
await async_barrier()
|
||||
|
||||
if auto_restart:
|
||||
assert device.legacy_advertising_set
|
||||
assert devices[0].legacy_advertising_set
|
||||
started = asyncio.Event()
|
||||
if not device.is_advertising:
|
||||
device.legacy_advertising_set.once('start', started.set)
|
||||
if not devices[0].is_advertising:
|
||||
devices[0].legacy_advertising_set.once('start', started.set)
|
||||
await asyncio.wait_for(started.wait(), _TIMEOUT)
|
||||
assert device.is_advertising
|
||||
assert devices[0].is_advertising
|
||||
else:
|
||||
assert not device.is_advertising
|
||||
assert not devices[0].is_advertising
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_extended_advertising():
|
||||
device = TwoDevices()[0]
|
||||
await device.power_on()
|
||||
async def test_advertising_and_scanning():
|
||||
devices = TwoDevices()
|
||||
for dev in devices:
|
||||
await dev.power_on()
|
||||
|
||||
# Start scanning
|
||||
advertisements = asyncio.Queue[device.Advertisement]()
|
||||
devices[1].on(devices[1].EVENT_ADVERTISEMENT, advertisements.put_nowait)
|
||||
await devices[1].start_scanning()
|
||||
|
||||
# Start advertising
|
||||
advertising_set = await device.create_advertising_set()
|
||||
assert device.extended_advertising_sets
|
||||
advertising_set = await devices[0].create_advertising_set(advertising_data=b'123')
|
||||
assert devices[0].extended_advertising_sets
|
||||
assert advertising_set.enabled
|
||||
|
||||
advertisement = await asyncio.wait_for(advertisements.get(), _TIMEOUT)
|
||||
assert advertisement.data_bytes == b'123'
|
||||
|
||||
# Stop advertising
|
||||
await advertising_set.stop()
|
||||
assert not advertising_set.enabled
|
||||
@@ -342,33 +339,30 @@ async def test_extended_advertising():
|
||||
)
|
||||
@pytest.mark.asyncio
|
||||
async def test_extended_advertising_connection(own_address_type):
|
||||
device = TwoDevices()[0]
|
||||
await device.power_on()
|
||||
peer_address = Address('F0:F1:F2:F3:F4:F5')
|
||||
advertising_set = await device.create_advertising_set(
|
||||
devices = TwoDevices()
|
||||
for dev in devices:
|
||||
await dev.power_on()
|
||||
advertising_set = await devices[0].create_advertising_set(
|
||||
advertising_parameters=AdvertisingParameters(own_address_type=own_address_type)
|
||||
)
|
||||
device.on_le_connection(
|
||||
0x0001,
|
||||
peer_address,
|
||||
None,
|
||||
None,
|
||||
Role.PERIPHERAL,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
)
|
||||
device.on_advertising_set_termination(
|
||||
HCI_SUCCESS,
|
||||
advertising_set.advertising_handle,
|
||||
0x0001,
|
||||
0,
|
||||
await asyncio.wait_for(
|
||||
devices[1].connect(advertising_set.random_address or devices[0].public_address),
|
||||
_TIMEOUT,
|
||||
)
|
||||
|
||||
# Advertising set should be terminated after connected.
|
||||
assert not advertising_set.enabled
|
||||
|
||||
if own_address_type == OwnAddressType.PUBLIC:
|
||||
assert device.lookup_connection(0x0001).self_address == device.public_address
|
||||
assert (
|
||||
devices[0].lookup_connection(0x0001).self_address
|
||||
== devices[0].public_address
|
||||
)
|
||||
else:
|
||||
assert device.lookup_connection(0x0001).self_address == device.random_address
|
||||
assert (
|
||||
devices[0].lookup_connection(0x0001).self_address
|
||||
== devices[0].random_address
|
||||
)
|
||||
|
||||
await async_barrier()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user