From dd090c9e6be690d2c00402e1c927d374181d99fb Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Fri, 8 Dec 2023 11:00:44 +0800 Subject: [PATCH] Add ASCS tests --- bumble/controller.py | 12 ++ bumble/profiles/bap.py | 21 ++++ tests/bap_test.py | 251 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 284 insertions(+) diff --git a/bumble/controller.py b/bumble/controller.py index d035bcca..4ead098e 100644 --- a/bumble/controller.py +++ b/bumble/controller.py @@ -1263,3 +1263,15 @@ class Controller: See Bluetooth spec Vol 4, Part E - 7.8.74 LE Read Transmit Power Command ''' return struct.pack(' None: SAMPLE_FREQUENCY = SupportedSamplingFrequency.FREQ_16000 @@ -85,6 +108,92 @@ def test_vendor_specific_pac_record() -> None: assert bytes(PacRecord.from_bytes(RAW_DATA)) == RAW_DATA +# ----------------------------------------------------------------------------- +def test_ASE_Config_Codec() -> None: + operation = ASE_Config_Codec( + ase_id=[1, 2], + target_latency=[3, 4], + target_phy=[5, 6], + codec_id=[CodingFormat(CodecID.LC3), CodingFormat(CodecID.LC3)], + codec_specific_configuration=[b'foo', b'bar'], + ) + basic_check(operation) + + +# ----------------------------------------------------------------------------- +def test_ASE_Config_QOS() -> None: + operation = ASE_Config_QOS( + ase_id=[1, 2], + cig_id=[1, 2], + cis_id=[3, 4], + sdu_interval=[5, 6], + framing=[0, 1], + phy=[2, 3], + max_sdu=[4, 5], + retransmission_number=[6, 7], + max_transport_latency=[8, 9], + presentation_delay=[10, 11], + ) + basic_check(operation) + + +# ----------------------------------------------------------------------------- +def test_ASE_Enable() -> None: + operation = ASE_Enable( + ase_id=[1, 2], + metadata=[b'foo', b'bar'], + ) + basic_check(operation) + + +# ----------------------------------------------------------------------------- +def test_ASE_Update_Metadata() -> None: + operation = ASE_Update_Metadata( + ase_id=[1, 2], + metadata=[b'foo', b'bar'], + ) + basic_check(operation) + + +# ----------------------------------------------------------------------------- +def test_ASE_Disable() -> None: + operation = ASE_Disable(ase_id=[1, 2]) + basic_check(operation) + + +# ----------------------------------------------------------------------------- +def test_ASE_Release() -> None: + operation = ASE_Release(ase_id=[1, 2]) + basic_check(operation) + + +# ----------------------------------------------------------------------------- +def test_ASE_Receiver_Start_Ready() -> None: + operation = ASE_Receiver_Start_Ready(ase_id=[1, 2]) + basic_check(operation) + + +# ----------------------------------------------------------------------------- +def test_ASE_Receiver_Stop_Ready() -> None: + operation = ASE_Receiver_Stop_Ready(ase_id=[1, 2]) + basic_check(operation) + + +# ----------------------------------------------------------------------------- +def test_codec_specific_configuration() -> None: + SAMPLE_FREQUENCY = SamplingFrequency.FREQ_16000 + FRAME_SURATION = FrameDuration.DURATION_10000_US + AUDIO_LOCATION = AudioLocation.FRONT_LEFT + config = CodecSpecificConfiguration( + sampling_frequency=SAMPLE_FREQUENCY, + frame_duration=FRAME_SURATION, + audio_channel_allocation=AUDIO_LOCATION, + octets_per_codec_frame=60, + codec_frames_per_sdu=1, + ) + assert CodecSpecificConfiguration.from_bytes(bytes(config)) == config + + # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_pacs(): @@ -140,6 +249,148 @@ async def test_pacs(): ) +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_ascs(): + devices = TwoDevices() + devices[0].add_service( + AudioStreamControlService(device=devices[0], sink_ase_id=[1, 2]) + ) + + await devices.setup_connection() + peer = device.Peer(devices.connections[1]) + ascs_client = await peer.discover_service_and_create_proxy( + AudioStreamControlServiceProxy + ) + + notifications = {1: asyncio.Queue(), 2: asyncio.Queue()} + + def on_notification(data: bytes, ase_id: int): + notifications[ase_id].put_nowait(data) + + # Should be idle + assert await ascs_client.sink_ase[0].read_value() == bytes( + [1, AseStateMachine.State.IDLE] + ) + assert await ascs_client.sink_ase[1].read_value() == bytes( + [2, AseStateMachine.State.IDLE] + ) + + # Subscribe + await ascs_client.sink_ase[0].subscribe( + functools.partial(on_notification, ase_id=1) + ) + await ascs_client.sink_ase[1].subscribe( + functools.partial(on_notification, ase_id=2) + ) + + # Config Codec + config = CodecSpecificConfiguration( + sampling_frequency=SamplingFrequency.FREQ_48000, + frame_duration=FrameDuration.DURATION_10000_US, + audio_channel_allocation=AudioLocation.FRONT_LEFT, + octets_per_codec_frame=120, + codec_frames_per_sdu=1, + ) + await ascs_client.ase_control_point.write_value( + ASE_Config_Codec( + ase_id=[1, 2], + target_latency=[3, 4], + target_phy=[5, 6], + codec_id=[CodingFormat(CodecID.LC3), CodingFormat(CodecID.LC3)], + codec_specific_configuration=[config, config], + ) + ) + assert (await notifications[1].get())[:2] == bytes( + [1, AseStateMachine.State.CODEC_CONFIGURED] + ) + assert (await notifications[2].get())[:2] == bytes( + [2, AseStateMachine.State.CODEC_CONFIGURED] + ) + + # Config QOS + await ascs_client.ase_control_point.write_value( + ASE_Config_QOS( + ase_id=[1, 2], + cig_id=[1, 2], + cis_id=[3, 4], + sdu_interval=[5, 6], + framing=[0, 1], + phy=[2, 3], + max_sdu=[4, 5], + retransmission_number=[6, 7], + max_transport_latency=[8, 9], + presentation_delay=[10, 11], + ) + ) + assert (await notifications[1].get())[:2] == bytes( + [1, AseStateMachine.State.QOS_CONFIGURED] + ) + assert (await notifications[2].get())[:2] == bytes( + [2, AseStateMachine.State.QOS_CONFIGURED] + ) + + # Enable + await ascs_client.ase_control_point.write_value( + ASE_Enable( + ase_id=[1, 2], + metadata=[b'foo', b'bar'], + ) + ) + assert (await notifications[1].get())[:2] == bytes( + [1, AseStateMachine.State.ENABLING] + ) + assert (await notifications[2].get())[:2] == bytes( + [2, AseStateMachine.State.ENABLING] + ) + + # CIS establishment + devices[0].emit( + 'cis_establishment', + device.CisLink( + device=devices[0], + acl_connection=devices.connections[0], + handle=5, + cis_id=3, + cig_id=1, + ), + ) + devices[0].emit( + 'cis_establishment', + device.CisLink( + device=devices[0], + acl_connection=devices.connections[0], + handle=6, + cis_id=4, + cig_id=2, + ), + ) + assert (await notifications[1].get())[:2] == bytes( + [1, AseStateMachine.State.STREAMING] + ) + assert (await notifications[2].get())[:2] == bytes( + [2, AseStateMachine.State.STREAMING] + ) + + # Release + await ascs_client.ase_control_point.write_value( + ASE_Release( + ase_id=[1, 2], + metadata=[b'foo', b'bar'], + ) + ) + assert (await notifications[1].get())[:2] == bytes( + [1, AseStateMachine.State.RELEASING] + ) + assert (await notifications[2].get())[:2] == bytes( + [2, AseStateMachine.State.RELEASING] + ) + assert (await notifications[1].get())[:2] == bytes([1, AseStateMachine.State.IDLE]) + assert (await notifications[2].get())[:2] == bytes([2, AseStateMachine.State.IDLE]) + + await asyncio.sleep(0.001) + + # ----------------------------------------------------------------------------- async def run(): await test_pacs()