forked from auracaster/bumble_mirror
Controller: CIS implementation
This commit is contained in:
@@ -423,6 +423,55 @@ async def test_get_remote_le_features():
|
||||
assert (await devices.connections[0].get_remote_le_features()) is not None
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_cis():
|
||||
devices = TwoDevices()
|
||||
await devices.setup_connection()
|
||||
|
||||
peripheral_cis_futures = {}
|
||||
|
||||
def on_cis_request(
|
||||
acl_connection: Connection,
|
||||
cis_handle: int,
|
||||
_cig_id: int,
|
||||
_cis_id: int,
|
||||
):
|
||||
acl_connection.abort_on(
|
||||
'disconnection', devices[1].accept_cis_request(cis_handle)
|
||||
)
|
||||
peripheral_cis_futures[cis_handle] = asyncio.get_running_loop().create_future()
|
||||
|
||||
devices[1].on('cis_request', on_cis_request)
|
||||
devices[1].on(
|
||||
'cis_establishment',
|
||||
lambda cis_link: peripheral_cis_futures[cis_link.handle].set_result(None),
|
||||
)
|
||||
|
||||
cis_handles = await devices[0].setup_cig(
|
||||
cig_id=1,
|
||||
cis_id=[2, 3],
|
||||
sdu_interval=(0, 0),
|
||||
framing=0,
|
||||
max_sdu=(0, 0),
|
||||
retransmission_number=0,
|
||||
max_transport_latency=(0, 0),
|
||||
)
|
||||
assert len(cis_handles) == 2
|
||||
cis_links = await devices[0].create_cis(
|
||||
[
|
||||
(cis_handles[0], devices.connections[0].handle),
|
||||
(cis_handles[1], devices.connections[0].handle),
|
||||
]
|
||||
)
|
||||
await asyncio.gather(*peripheral_cis_futures.values())
|
||||
assert len(cis_links) == 2
|
||||
|
||||
# TODO: Fix Host CIS support.
|
||||
# await cis_links[0].disconnect()
|
||||
# await cis_links[1].disconnect()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_gatt_services_with_gas():
|
||||
device = Device(host=Host(None, None))
|
||||
|
||||
Reference in New Issue
Block a user