forked from auracaster/bumble_mirror
Add ASCS tests
This commit is contained in:
@@ -1263,3 +1263,15 @@ class Controller:
|
|||||||
See Bluetooth spec Vol 4, Part E - 7.8.74 LE Read Transmit Power Command
|
See Bluetooth spec Vol 4, Part E - 7.8.74 LE Read Transmit Power Command
|
||||||
'''
|
'''
|
||||||
return struct.pack('<BBB', HCI_SUCCESS, 0, 0)
|
return struct.pack('<BBB', HCI_SUCCESS, 0, 0)
|
||||||
|
|
||||||
|
def on_hci_le_setup_iso_data_path_command(self, command):
|
||||||
|
'''
|
||||||
|
See Bluetooth spec Vol 4, Part E - 7.8.109 LE Setup ISO Data Path Command
|
||||||
|
'''
|
||||||
|
return struct.pack('<BH', HCI_SUCCESS, command.connection_handle)
|
||||||
|
|
||||||
|
def on_hci_le_remove_iso_data_path_command(self, command):
|
||||||
|
'''
|
||||||
|
See Bluetooth spec Vol 4, Part E - 7.8.110 LE Remove ISO Data Path Command
|
||||||
|
'''
|
||||||
|
return struct.pack('<BH', HCI_SUCCESS, command.connection_handle)
|
||||||
|
|||||||
@@ -1209,3 +1209,24 @@ class PublishedAudioCapabilitiesServiceProxy(gatt_client.ProfileServiceProxy):
|
|||||||
gatt.GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC
|
gatt.GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC
|
||||||
):
|
):
|
||||||
self.source_audio_locations = characteristics[0]
|
self.source_audio_locations = characteristics[0]
|
||||||
|
|
||||||
|
|
||||||
|
class AudioStreamControlServiceProxy(gatt_client.ProfileServiceProxy):
|
||||||
|
SERVICE_CLASS = AudioStreamControlService
|
||||||
|
|
||||||
|
sink_ase: List[gatt_client.CharacteristicProxy]
|
||||||
|
source_ase: List[gatt_client.CharacteristicProxy]
|
||||||
|
ase_control_point: gatt_client.CharacteristicProxy
|
||||||
|
|
||||||
|
def __init__(self, service_proxy: gatt_client.ServiceProxy):
|
||||||
|
self.service_proxy = service_proxy
|
||||||
|
|
||||||
|
self.sink_ase = service_proxy.get_characteristics_by_uuid(
|
||||||
|
gatt.GATT_SINK_ASE_CHARACTERISTIC
|
||||||
|
)
|
||||||
|
self.source_ase = service_proxy.get_characteristics_by_uuid(
|
||||||
|
gatt.GATT_SOURCE_ASE_CHARACTERISTIC
|
||||||
|
)
|
||||||
|
self.ase_control_point = service_proxy.get_characteristics_by_uuid(
|
||||||
|
gatt.GATT_ASE_CONTROL_POINT_CHARACTERISTIC
|
||||||
|
)[0]
|
||||||
|
|||||||
@@ -17,6 +17,7 @@
|
|||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
import asyncio
|
import asyncio
|
||||||
import os
|
import os
|
||||||
|
import functools
|
||||||
import pytest
|
import pytest
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
@@ -24,11 +25,26 @@ from bumble import device
|
|||||||
from bumble.hci import CodecID, CodingFormat
|
from bumble.hci import CodecID, CodingFormat
|
||||||
from bumble.profiles.bap import (
|
from bumble.profiles.bap import (
|
||||||
AudioLocation,
|
AudioLocation,
|
||||||
|
AseStateMachine,
|
||||||
|
ASE_Operation,
|
||||||
|
ASE_Config_Codec,
|
||||||
|
ASE_Config_QOS,
|
||||||
|
ASE_Disable,
|
||||||
|
ASE_Enable,
|
||||||
|
ASE_Receiver_Start_Ready,
|
||||||
|
ASE_Receiver_Stop_Ready,
|
||||||
|
ASE_Release,
|
||||||
|
ASE_Update_Metadata,
|
||||||
SupportedFrameDuration,
|
SupportedFrameDuration,
|
||||||
SupportedSamplingFrequency,
|
SupportedSamplingFrequency,
|
||||||
|
SamplingFrequency,
|
||||||
|
FrameDuration,
|
||||||
CodecSpecificCapabilities,
|
CodecSpecificCapabilities,
|
||||||
|
CodecSpecificConfiguration,
|
||||||
ContextType,
|
ContextType,
|
||||||
PacRecord,
|
PacRecord,
|
||||||
|
AudioStreamControlService,
|
||||||
|
AudioStreamControlServiceProxy,
|
||||||
PublishedAudioCapabilitiesService,
|
PublishedAudioCapabilitiesService,
|
||||||
PublishedAudioCapabilitiesServiceProxy,
|
PublishedAudioCapabilitiesServiceProxy,
|
||||||
)
|
)
|
||||||
@@ -40,6 +56,13 @@ from .test_utils import TwoDevices
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
def basic_check(operation: ASE_Operation):
|
||||||
|
serialized = bytes(operation)
|
||||||
|
parsed = ASE_Operation.from_bytes(serialized)
|
||||||
|
assert bytes(parsed) == serialized
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def test_codec_specific_capabilities() -> None:
|
def test_codec_specific_capabilities() -> None:
|
||||||
SAMPLE_FREQUENCY = SupportedSamplingFrequency.FREQ_16000
|
SAMPLE_FREQUENCY = SupportedSamplingFrequency.FREQ_16000
|
||||||
@@ -85,6 +108,92 @@ def test_vendor_specific_pac_record() -> None:
|
|||||||
assert bytes(PacRecord.from_bytes(RAW_DATA)) == RAW_DATA
|
assert bytes(PacRecord.from_bytes(RAW_DATA)) == RAW_DATA
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
def test_ASE_Config_Codec() -> None:
|
||||||
|
operation = ASE_Config_Codec(
|
||||||
|
ase_id=[1, 2],
|
||||||
|
target_latency=[3, 4],
|
||||||
|
target_phy=[5, 6],
|
||||||
|
codec_id=[CodingFormat(CodecID.LC3), CodingFormat(CodecID.LC3)],
|
||||||
|
codec_specific_configuration=[b'foo', b'bar'],
|
||||||
|
)
|
||||||
|
basic_check(operation)
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
def test_ASE_Config_QOS() -> None:
|
||||||
|
operation = ASE_Config_QOS(
|
||||||
|
ase_id=[1, 2],
|
||||||
|
cig_id=[1, 2],
|
||||||
|
cis_id=[3, 4],
|
||||||
|
sdu_interval=[5, 6],
|
||||||
|
framing=[0, 1],
|
||||||
|
phy=[2, 3],
|
||||||
|
max_sdu=[4, 5],
|
||||||
|
retransmission_number=[6, 7],
|
||||||
|
max_transport_latency=[8, 9],
|
||||||
|
presentation_delay=[10, 11],
|
||||||
|
)
|
||||||
|
basic_check(operation)
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
def test_ASE_Enable() -> None:
|
||||||
|
operation = ASE_Enable(
|
||||||
|
ase_id=[1, 2],
|
||||||
|
metadata=[b'foo', b'bar'],
|
||||||
|
)
|
||||||
|
basic_check(operation)
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
def test_ASE_Update_Metadata() -> None:
|
||||||
|
operation = ASE_Update_Metadata(
|
||||||
|
ase_id=[1, 2],
|
||||||
|
metadata=[b'foo', b'bar'],
|
||||||
|
)
|
||||||
|
basic_check(operation)
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
def test_ASE_Disable() -> None:
|
||||||
|
operation = ASE_Disable(ase_id=[1, 2])
|
||||||
|
basic_check(operation)
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
def test_ASE_Release() -> None:
|
||||||
|
operation = ASE_Release(ase_id=[1, 2])
|
||||||
|
basic_check(operation)
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
def test_ASE_Receiver_Start_Ready() -> None:
|
||||||
|
operation = ASE_Receiver_Start_Ready(ase_id=[1, 2])
|
||||||
|
basic_check(operation)
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
def test_ASE_Receiver_Stop_Ready() -> None:
|
||||||
|
operation = ASE_Receiver_Stop_Ready(ase_id=[1, 2])
|
||||||
|
basic_check(operation)
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
def test_codec_specific_configuration() -> None:
|
||||||
|
SAMPLE_FREQUENCY = SamplingFrequency.FREQ_16000
|
||||||
|
FRAME_SURATION = FrameDuration.DURATION_10000_US
|
||||||
|
AUDIO_LOCATION = AudioLocation.FRONT_LEFT
|
||||||
|
config = CodecSpecificConfiguration(
|
||||||
|
sampling_frequency=SAMPLE_FREQUENCY,
|
||||||
|
frame_duration=FRAME_SURATION,
|
||||||
|
audio_channel_allocation=AUDIO_LOCATION,
|
||||||
|
octets_per_codec_frame=60,
|
||||||
|
codec_frames_per_sdu=1,
|
||||||
|
)
|
||||||
|
assert CodecSpecificConfiguration.from_bytes(bytes(config)) == config
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_pacs():
|
async def test_pacs():
|
||||||
@@ -140,6 +249,148 @@ async def test_pacs():
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_ascs():
|
||||||
|
devices = TwoDevices()
|
||||||
|
devices[0].add_service(
|
||||||
|
AudioStreamControlService(device=devices[0], sink_ase_id=[1, 2])
|
||||||
|
)
|
||||||
|
|
||||||
|
await devices.setup_connection()
|
||||||
|
peer = device.Peer(devices.connections[1])
|
||||||
|
ascs_client = await peer.discover_service_and_create_proxy(
|
||||||
|
AudioStreamControlServiceProxy
|
||||||
|
)
|
||||||
|
|
||||||
|
notifications = {1: asyncio.Queue(), 2: asyncio.Queue()}
|
||||||
|
|
||||||
|
def on_notification(data: bytes, ase_id: int):
|
||||||
|
notifications[ase_id].put_nowait(data)
|
||||||
|
|
||||||
|
# Should be idle
|
||||||
|
assert await ascs_client.sink_ase[0].read_value() == bytes(
|
||||||
|
[1, AseStateMachine.State.IDLE]
|
||||||
|
)
|
||||||
|
assert await ascs_client.sink_ase[1].read_value() == bytes(
|
||||||
|
[2, AseStateMachine.State.IDLE]
|
||||||
|
)
|
||||||
|
|
||||||
|
# Subscribe
|
||||||
|
await ascs_client.sink_ase[0].subscribe(
|
||||||
|
functools.partial(on_notification, ase_id=1)
|
||||||
|
)
|
||||||
|
await ascs_client.sink_ase[1].subscribe(
|
||||||
|
functools.partial(on_notification, ase_id=2)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Config Codec
|
||||||
|
config = CodecSpecificConfiguration(
|
||||||
|
sampling_frequency=SamplingFrequency.FREQ_48000,
|
||||||
|
frame_duration=FrameDuration.DURATION_10000_US,
|
||||||
|
audio_channel_allocation=AudioLocation.FRONT_LEFT,
|
||||||
|
octets_per_codec_frame=120,
|
||||||
|
codec_frames_per_sdu=1,
|
||||||
|
)
|
||||||
|
await ascs_client.ase_control_point.write_value(
|
||||||
|
ASE_Config_Codec(
|
||||||
|
ase_id=[1, 2],
|
||||||
|
target_latency=[3, 4],
|
||||||
|
target_phy=[5, 6],
|
||||||
|
codec_id=[CodingFormat(CodecID.LC3), CodingFormat(CodecID.LC3)],
|
||||||
|
codec_specific_configuration=[config, config],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
assert (await notifications[1].get())[:2] == bytes(
|
||||||
|
[1, AseStateMachine.State.CODEC_CONFIGURED]
|
||||||
|
)
|
||||||
|
assert (await notifications[2].get())[:2] == bytes(
|
||||||
|
[2, AseStateMachine.State.CODEC_CONFIGURED]
|
||||||
|
)
|
||||||
|
|
||||||
|
# Config QOS
|
||||||
|
await ascs_client.ase_control_point.write_value(
|
||||||
|
ASE_Config_QOS(
|
||||||
|
ase_id=[1, 2],
|
||||||
|
cig_id=[1, 2],
|
||||||
|
cis_id=[3, 4],
|
||||||
|
sdu_interval=[5, 6],
|
||||||
|
framing=[0, 1],
|
||||||
|
phy=[2, 3],
|
||||||
|
max_sdu=[4, 5],
|
||||||
|
retransmission_number=[6, 7],
|
||||||
|
max_transport_latency=[8, 9],
|
||||||
|
presentation_delay=[10, 11],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
assert (await notifications[1].get())[:2] == bytes(
|
||||||
|
[1, AseStateMachine.State.QOS_CONFIGURED]
|
||||||
|
)
|
||||||
|
assert (await notifications[2].get())[:2] == bytes(
|
||||||
|
[2, AseStateMachine.State.QOS_CONFIGURED]
|
||||||
|
)
|
||||||
|
|
||||||
|
# Enable
|
||||||
|
await ascs_client.ase_control_point.write_value(
|
||||||
|
ASE_Enable(
|
||||||
|
ase_id=[1, 2],
|
||||||
|
metadata=[b'foo', b'bar'],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
assert (await notifications[1].get())[:2] == bytes(
|
||||||
|
[1, AseStateMachine.State.ENABLING]
|
||||||
|
)
|
||||||
|
assert (await notifications[2].get())[:2] == bytes(
|
||||||
|
[2, AseStateMachine.State.ENABLING]
|
||||||
|
)
|
||||||
|
|
||||||
|
# CIS establishment
|
||||||
|
devices[0].emit(
|
||||||
|
'cis_establishment',
|
||||||
|
device.CisLink(
|
||||||
|
device=devices[0],
|
||||||
|
acl_connection=devices.connections[0],
|
||||||
|
handle=5,
|
||||||
|
cis_id=3,
|
||||||
|
cig_id=1,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
devices[0].emit(
|
||||||
|
'cis_establishment',
|
||||||
|
device.CisLink(
|
||||||
|
device=devices[0],
|
||||||
|
acl_connection=devices.connections[0],
|
||||||
|
handle=6,
|
||||||
|
cis_id=4,
|
||||||
|
cig_id=2,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
assert (await notifications[1].get())[:2] == bytes(
|
||||||
|
[1, AseStateMachine.State.STREAMING]
|
||||||
|
)
|
||||||
|
assert (await notifications[2].get())[:2] == bytes(
|
||||||
|
[2, AseStateMachine.State.STREAMING]
|
||||||
|
)
|
||||||
|
|
||||||
|
# Release
|
||||||
|
await ascs_client.ase_control_point.write_value(
|
||||||
|
ASE_Release(
|
||||||
|
ase_id=[1, 2],
|
||||||
|
metadata=[b'foo', b'bar'],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
assert (await notifications[1].get())[:2] == bytes(
|
||||||
|
[1, AseStateMachine.State.RELEASING]
|
||||||
|
)
|
||||||
|
assert (await notifications[2].get())[:2] == bytes(
|
||||||
|
[2, AseStateMachine.State.RELEASING]
|
||||||
|
)
|
||||||
|
assert (await notifications[1].get())[:2] == bytes([1, AseStateMachine.State.IDLE])
|
||||||
|
assert (await notifications[2].get())[:2] == bytes([2, AseStateMachine.State.IDLE])
|
||||||
|
|
||||||
|
await asyncio.sleep(0.001)
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
async def run():
|
async def run():
|
||||||
await test_pacs()
|
await test_pacs()
|
||||||
|
|||||||
Reference in New Issue
Block a user