Merge pull request #359 from zxzxwu/ascs

Audio Stream Control Service
This commit is contained in:
zxzxwu
2023-12-12 00:47:03 +08:00
committed by GitHub
7 changed files with 1085 additions and 3 deletions
+12
View File
@@ -1263,3 +1263,15 @@ class Controller:
See Bluetooth spec Vol 4, Part E - 7.8.74 LE Read Transmit Power Command
'''
return struct.pack('<BBB', HCI_SUCCESS, 0, 0)
def on_hci_le_setup_iso_data_path_command(self, command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.109 LE Setup ISO Data Path Command
'''
return struct.pack('<BH', HCI_SUCCESS, command.connection_handle)
def on_hci_le_remove_iso_data_path_command(self, command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.110 LE Remove ISO Data Path Command
'''
return struct.pack('<BH', HCI_SUCCESS, command.connection_handle)
+1 -1
View File
@@ -961,7 +961,7 @@ class Server(EventEmitter):
try:
attribute.write_value(connection, request.attribute_value)
except Exception as error:
logger.warning(f'!!! ignoring exception: {error}')
logger.exception(f'!!! ignoring exception: {error}')
def on_att_handle_value_confirmation(self, connection, _confirmation):
'''
+17
View File
@@ -728,6 +728,19 @@ HCI_LE_PHY_TYPE_TO_BIT = {
HCI_LE_CODED_PHY: HCI_LE_CODED_PHY_BIT
}
class Phy(enum.IntEnum):
LE_1M = 0x01
LE_2M = 0x02
LE_CODED = 0x03
class PhyBit(enum.IntFlag):
LE_1M = 0b00000001
LE_2M = 0b00000010
LE_CODED = 0b00000100
# Connection Parameters
HCI_CONNECTION_INTERVAL_MS_PER_UNIT = 1.25
HCI_CONNECTION_LATENCY_MS_PER_UNIT = 1.25
@@ -4541,6 +4554,10 @@ class HCI_LE_Setup_ISO_Data_Path_Command(HCI_Command):
See Bluetooth spec @ 7.8.109 LE Setup ISO Data Path command
'''
class Direction(enum.IntEnum):
HOST_TO_CONTROLLER = 0x00
CONTROLLER_TO_HOST = 0x01
connection_handle: int
data_path_direction: int
data_path_id: int
+752 -1
View File
@@ -23,13 +23,21 @@ import dataclasses
import enum
import struct
import functools
from typing import Optional, List, Union
import logging
from typing import Optional, List, Union, Type, Dict, Any, Tuple, cast
from bumble import colors
from bumble import device
from bumble import hci
from bumble import gatt
from bumble import gatt_client
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
@@ -220,6 +228,231 @@ class SupportedFrameDuration(enum.IntFlag):
DURATION_10000_US_PREFERRED = 0b0010
# -----------------------------------------------------------------------------
# ASE Operations
# -----------------------------------------------------------------------------
class ASE_Operation:
'''
See Audio Stream Control Service - 5 ASE Control operations.
'''
classes: Dict[int, Type[ASE_Operation]] = {}
op_code: int
name: str
fields: Optional[Sequence[Any]] = None
ase_id: List[int]
class Opcode(enum.IntEnum):
# fmt: off
CONFIG_CODEC = 0x01
CONFIG_QOS = 0x02
ENABLE = 0x03
RECEIVER_START_READY = 0x04
DISABLE = 0x05
RECEIVER_STOP_READY = 0x06
UPDATE_METADATA = 0x07
RELEASE = 0x08
@staticmethod
def from_bytes(pdu: bytes) -> ASE_Operation:
op_code = pdu[0]
cls = ASE_Operation.classes.get(op_code)
if cls is None:
instance = ASE_Operation(pdu)
instance.name = ASE_Operation.Opcode(op_code).name
instance.op_code = op_code
return instance
self = cls.__new__(cls)
ASE_Operation.__init__(self, pdu)
if self.fields is not None:
self.init_from_bytes(pdu, 1)
return self
@staticmethod
def subclass(fields):
def inner(cls: Type[ASE_Operation]):
try:
operation = ASE_Operation.Opcode[cls.__name__[4:].upper()]
cls.name = operation.name
cls.op_code = operation
except:
raise KeyError(f'PDU name {cls.name} not found in Ase_Operation.Opcode')
cls.fields = fields
# Register a factory for this class
ASE_Operation.classes[cls.op_code] = cls
return cls
return inner
def __init__(self, pdu: Optional[bytes] = None, **kwargs) -> None:
if self.fields is not None and kwargs:
hci.HCI_Object.init_from_fields(self, self.fields, kwargs)
if pdu is None:
pdu = bytes([self.op_code]) + hci.HCI_Object.dict_to_bytes(
kwargs, self.fields
)
self.pdu = pdu
def init_from_bytes(self, pdu: bytes, offset: int):
return hci.HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def __bytes__(self) -> bytes:
return self.pdu
def __str__(self) -> str:
result = f'{colors.color(self.name, "yellow")} '
if fields := getattr(self, 'fields', None):
result += ':\n' + hci.HCI_Object.format_fields(self.__dict__, fields, ' ')
else:
if len(self.pdu) > 1:
result += f': {self.pdu.hex()}'
return result
@ASE_Operation.subclass(
[
[
('ase_id', 1),
('target_latency', 1),
('target_phy', 1),
('codec_id', hci.CodingFormat.parse_from_bytes),
('codec_specific_configuration', 'v'),
],
]
)
class ASE_Config_Codec(ASE_Operation):
'''
See Audio Stream Control Service 5.1 - Config Codec Operation
'''
target_latency: List[int]
target_phy: List[int]
codec_id: List[hci.CodingFormat]
codec_specific_configuration: List[bytes]
@ASE_Operation.subclass(
[
[
('ase_id', 1),
('cig_id', 1),
('cis_id', 1),
('sdu_interval', 3),
('framing', 1),
('phy', 1),
('max_sdu', 2),
('retransmission_number', 1),
('max_transport_latency', 2),
('presentation_delay', 3),
],
]
)
class ASE_Config_QOS(ASE_Operation):
'''
See Audio Stream Control Service 5.2 - Config Qos Operation
'''
cig_id: List[int]
cis_id: List[int]
sdu_interval: List[int]
framing: List[int]
phy: List[int]
max_sdu: List[int]
retransmission_number: List[int]
max_transport_latency: List[int]
presentation_delay: List[int]
@ASE_Operation.subclass([[('ase_id', 1), ('metadata', 'v')]])
class ASE_Enable(ASE_Operation):
'''
See Audio Stream Control Service 5.3 - Enable Operation
'''
metadata: bytes
@ASE_Operation.subclass([[('ase_id', 1)]])
class ASE_Receiver_Start_Ready(ASE_Operation):
'''
See Audio Stream Control Service 5.4 - Receiver Start Ready Operation
'''
@ASE_Operation.subclass([[('ase_id', 1)]])
class ASE_Disable(ASE_Operation):
'''
See Audio Stream Control Service 5.5 - Disable Operation
'''
@ASE_Operation.subclass([[('ase_id', 1)]])
class ASE_Receiver_Stop_Ready(ASE_Operation):
'''
See Audio Stream Control Service 5.6 - Receiver Stop Ready Operation
'''
@ASE_Operation.subclass([[('ase_id', 1), ('metadata', 'v')]])
class ASE_Update_Metadata(ASE_Operation):
'''
See Audio Stream Control Service 5.7 - Update Metadata Operation
'''
metadata: List[bytes]
@ASE_Operation.subclass([[('ase_id', 1)]])
class ASE_Release(ASE_Operation):
'''
See Audio Stream Control Service 5.8 - Release Operation
'''
class AseResponseCode(enum.IntEnum):
# fmt: off
SUCCESS = 0x00
UNSUPPORTED_OPCODE = 0x01
INVALID_LENGTH = 0x02
INVALID_ASE_ID = 0x03
INVALID_ASE_STATE_MACHINE_TRANSITION = 0x04
INVALID_ASE_DIRECTION = 0x05
UNSUPPORTED_AUDIO_CAPABILITIES = 0x06
UNSUPPORTED_CONFIGURATION_PARAMETER_VALUE = 0x07
REJECTED_CONFIGURATION_PARAMETER_VALUE = 0x08
INVALID_CONFIGURATION_PARAMETER_VALUE = 0x09
UNSUPPORTED_METADATA = 0x0A
REJECTED_METADATA = 0x0B
INVALID_METADATA = 0x0C
INSUFFICIENT_RESOURCES = 0x0D
UNSPECIFIED_ERROR = 0x0E
class AseReasonCode(enum.IntEnum):
# fmt: off
NONE = 0x00
CODEC_ID = 0x01
CODEC_SPECIFIC_CONFIGURATION = 0x02
SDU_INTERVAL = 0x03
FRAMING = 0x04
PHY = 0x05
MAXIMUM_SDU_SIZE = 0x06
RETRANSMISSION_NUMBER = 0x07
MAX_TRANSPORT_LATENCY = 0x08
PRESENTATION_DELAY = 0x09
INVALID_ASE_CIS_MAPPING = 0x0A
class AudioRole(enum.IntEnum):
SINK = hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.CONTROLLER_TO_HOST
SOURCE = hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.HOST_TO_CONTROLLER
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
@@ -325,6 +558,80 @@ class CodecSpecificCapabilities:
)
@dataclasses.dataclass
class CodecSpecificConfiguration:
'''See:
* Bluetooth Assigned Numbers, 6.12.5 - Codec Specific Configuration LTV Structures
* Basic Audio Profile, 4.3.2 - Codec_Specific_Capabilities LTV requirements
'''
class Type(enum.IntEnum):
# fmt: off
SAMPLING_FREQUENCY = 0x01
FRAME_DURATION = 0x02
AUDIO_CHANNEL_ALLOCATION = 0x03
OCTETS_PER_FRAME = 0x04
CODEC_FRAMES_PER_SDU = 0x05
sampling_frequency: SamplingFrequency
frame_duration: FrameDuration
audio_channel_allocation: AudioLocation
octets_per_codec_frame: int
codec_frames_per_sdu: int
@classmethod
def from_bytes(cls, data: bytes) -> CodecSpecificConfiguration:
offset = 0
# Allowed default values.
audio_channel_allocation = AudioLocation.NOT_ALLOWED
codec_frames_per_sdu = 1
while offset < len(data):
length, type = struct.unpack_from('BB', data, offset)
offset += 2
value = int.from_bytes(data[offset : offset + length - 1], 'little')
offset += length - 1
if type == CodecSpecificConfiguration.Type.SAMPLING_FREQUENCY:
sampling_frequency = SamplingFrequency(value)
elif type == CodecSpecificConfiguration.Type.FRAME_DURATION:
frame_duration = FrameDuration(value)
elif type == CodecSpecificConfiguration.Type.AUDIO_CHANNEL_ALLOCATION:
audio_channel_allocation = AudioLocation(value)
elif type == CodecSpecificConfiguration.Type.OCTETS_PER_FRAME:
octets_per_codec_frame = value
elif type == CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU:
codec_frames_per_sdu = value
# It is expected here that if some fields are missing, an error should be raised.
return CodecSpecificConfiguration(
sampling_frequency=sampling_frequency,
frame_duration=frame_duration,
audio_channel_allocation=audio_channel_allocation,
octets_per_codec_frame=octets_per_codec_frame,
codec_frames_per_sdu=codec_frames_per_sdu,
)
def __bytes__(self) -> bytes:
return struct.pack(
'<BBBBBBBBIBBHBBB',
2,
CodecSpecificConfiguration.Type.SAMPLING_FREQUENCY,
self.sampling_frequency,
2,
CodecSpecificConfiguration.Type.FRAME_DURATION,
self.frame_duration,
5,
CodecSpecificConfiguration.Type.AUDIO_CHANNEL_ALLOCATION,
self.audio_channel_allocation,
3,
CodecSpecificConfiguration.Type.OCTETS_PER_FRAME,
self.octets_per_codec_frame,
2,
CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU,
self.codec_frames_per_sdu,
)
@dataclasses.dataclass
class PacRecord:
coding_format: hci.CodingFormat
@@ -452,6 +759,429 @@ class PublishedAudioCapabilitiesService(gatt.TemplateService):
super().__init__(characteristics)
class AseStateMachine(gatt.Characteristic):
class State(enum.IntEnum):
# fmt: off
IDLE = 0x00
CODEC_CONFIGURED = 0x01
QOS_CONFIGURED = 0x02
ENABLING = 0x03
STREAMING = 0x04
DISABLING = 0x05
RELEASING = 0x06
cis_link: Optional[device.CisLink] = None
# Additional parameters in CODEC_CONFIGURED State
preferred_framing = 0 # Unframed PDU supported
preferred_phy = 0
preferred_retransmission_number = 13
preferred_max_transport_latency = 100
supported_presentation_delay_min = 0
supported_presentation_delay_max = 0
preferred_presentation_delay_min = 0
preferred_presentation_delay_max = 0
codec_id = hci.CodingFormat(hci.CodecID.LC3)
codec_specific_configuration: Union[CodecSpecificConfiguration, bytes] = b''
# Additional parameters in QOS_CONFIGURED State
cig_id = 0
cis_id = 0
sdu_interval = 0
framing = 0
phy = 0
max_sdu = 0
retransmission_number = 0
max_transport_latency = 0
presentation_delay = 0
# Additional parameters in ENABLING, STREAMING, DISABLING State
# TODO: Parse this
metadata = b''
def __init__(
self,
role: AudioRole,
ase_id: int,
service: AudioStreamControlService,
) -> None:
self.service = service
self.ase_id = ase_id
self._state = AseStateMachine.State.IDLE
self.role = role
uuid = (
gatt.GATT_SINK_ASE_CHARACTERISTIC
if role == AudioRole.SINK
else gatt.GATT_SOURCE_ASE_CHARACTERISTIC
)
super().__init__(
uuid=uuid,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
value=gatt.CharacteristicValue(read=self.on_read),
)
self.service.device.on('cis_request', self.on_cis_request)
self.service.device.on('cis_establishment', self.on_cis_establishment)
def on_cis_request(
self,
acl_connection: device.Connection,
cis_handle: int,
cig_id: int,
cis_id: int,
) -> None:
if cis_id == self.cis_id and self.state == self.State.ENABLING:
acl_connection.abort_on(
'flush', self.service.device.accept_cis_request(cis_handle)
)
def on_cis_establishment(self, cis_link: device.CisLink) -> None:
if cis_link.cis_id == self.cis_id and self.state == self.State.ENABLING:
self.state = self.State.STREAMING
self.cis_link = cis_link
async def post_cis_established():
await self.service.device.send_command(
hci.HCI_LE_Setup_ISO_Data_Path_Command(
connection_handle=cis_link.handle,
data_path_direction=self.role,
data_path_id=0x00, # Fixed HCI
codec_id=hci.CodingFormat(hci.CodecID.TRANSPARENT),
controller_delay=0,
codec_configuration=b'',
)
)
await self.service.device.notify_subscribers(self, self.value)
cis_link.acl_connection.abort_on('flush', post_cis_established())
def on_config_codec(
self,
target_latency: int,
target_phy: int,
codec_id: hci.CodingFormat,
codec_specific_configuration: bytes,
) -> Tuple[AseResponseCode, AseReasonCode]:
if self.state not in (
self.State.IDLE,
self.State.CODEC_CONFIGURED,
self.State.QOS_CONFIGURED,
):
return (
AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
AseReasonCode.NONE,
)
self.max_transport_latency = target_latency
self.phy = target_phy
self.codec_id = codec_id
if codec_id.codec_id == hci.CodecID.VENDOR_SPECIFIC:
self.codec_specific_configuration = codec_specific_configuration
else:
self.codec_specific_configuration = CodecSpecificConfiguration.from_bytes(
codec_specific_configuration
)
self.state = self.State.CODEC_CONFIGURED
return (AseResponseCode.SUCCESS, AseReasonCode.NONE)
def on_config_qos(
self,
cig_id: int,
cis_id: int,
sdu_interval: int,
framing: int,
phy: int,
max_sdu: int,
retransmission_number: int,
max_transport_latency: int,
presentation_delay: int,
) -> Tuple[AseResponseCode, AseReasonCode]:
if self.state not in (
AseStateMachine.State.CODEC_CONFIGURED,
AseStateMachine.State.QOS_CONFIGURED,
):
return (
AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
AseReasonCode.NONE,
)
self.cig_id = cig_id
self.cis_id = cis_id
self.sdu_interval = sdu_interval
self.framing = framing
self.phy = phy
self.max_sdu = max_sdu
self.retransmission_number = retransmission_number
self.max_transport_latency = max_transport_latency
self.presentation_delay = presentation_delay
self.state = self.State.QOS_CONFIGURED
return (AseResponseCode.SUCCESS, AseReasonCode.NONE)
def on_enable(self, metadata: bytes) -> Tuple[AseResponseCode, AseReasonCode]:
if self.state != AseStateMachine.State.QOS_CONFIGURED:
return (
AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
AseReasonCode.NONE,
)
self.metadata = metadata
self.state = self.State.ENABLING
return (AseResponseCode.SUCCESS, AseReasonCode.NONE)
def on_receiver_start_ready(self) -> Tuple[AseResponseCode, AseReasonCode]:
if self.state != AseStateMachine.State.ENABLING:
return (
AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
AseReasonCode.NONE,
)
self.state = self.State.STREAMING
return (AseResponseCode.SUCCESS, AseReasonCode.NONE)
def on_disable(self) -> Tuple[AseResponseCode, AseReasonCode]:
if self.state not in (
AseStateMachine.State.ENABLING,
AseStateMachine.State.STREAMING,
):
return (
AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
AseReasonCode.NONE,
)
self.state = self.State.DISABLING
return (AseResponseCode.SUCCESS, AseReasonCode.NONE)
def on_receiver_stop_ready(self) -> Tuple[AseResponseCode, AseReasonCode]:
if self.state != AseStateMachine.State.DISABLING:
return (
AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
AseReasonCode.NONE,
)
self.state = self.State.QOS_CONFIGURED
return (AseResponseCode.SUCCESS, AseReasonCode.NONE)
def on_update_metadata(
self, metadata: bytes
) -> Tuple[AseResponseCode, AseReasonCode]:
if self.state not in (
AseStateMachine.State.ENABLING,
AseStateMachine.State.STREAMING,
):
return (
AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
AseReasonCode.NONE,
)
self.metadata = metadata
return (AseResponseCode.SUCCESS, AseReasonCode.NONE)
def on_release(self) -> Tuple[AseResponseCode, AseReasonCode]:
if self.state == AseStateMachine.State.IDLE:
return (
AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
AseReasonCode.NONE,
)
self.state = self.State.RELEASING
async def remove_cis_async():
await self.service.device.send_command(
hci.HCI_LE_Remove_ISO_Data_Path_Command(
connection_handle=self.cis_link.handle,
data_path_direction=self.role,
)
)
self.state = self.State.IDLE
await self.service.device.notify_subscribers(self, self.value)
self.service.device.abort_on('flush', remove_cis_async())
return (AseResponseCode.SUCCESS, AseReasonCode.NONE)
@property
def state(self) -> State:
return self._state
@state.setter
def state(self, new_state: State) -> None:
logger.debug(f'{self} state change -> {colors.color(new_state.name, "cyan")}')
self._state = new_state
@property
def value(self):
'''Returns ASE_ID, ASE_STATE, and ASE Additional Parameters.'''
if self.state == self.State.CODEC_CONFIGURED:
codec_specific_configuration_bytes = bytes(
self.codec_specific_configuration
)
additional_parameters = (
struct.pack(
'<BBBH',
self.preferred_framing,
self.preferred_phy,
self.preferred_retransmission_number,
self.preferred_max_transport_latency,
)
+ self.supported_presentation_delay_min.to_bytes(3, 'little')
+ self.supported_presentation_delay_max.to_bytes(3, 'little')
+ self.preferred_presentation_delay_min.to_bytes(3, 'little')
+ self.preferred_presentation_delay_max.to_bytes(3, 'little')
+ bytes(self.codec_id)
+ bytes([len(codec_specific_configuration_bytes)])
+ codec_specific_configuration_bytes
)
elif self.state == self.State.QOS_CONFIGURED:
additional_parameters = (
bytes([self.cig_id, self.cis_id])
+ self.sdu_interval.to_bytes(3, 'little')
+ struct.pack(
'<BBHBH',
self.framing,
self.phy,
self.max_sdu,
self.retransmission_number,
self.max_transport_latency,
)
+ self.presentation_delay.to_bytes(3, 'little')
)
elif self.state in (
self.State.ENABLING,
self.State.STREAMING,
self.State.DISABLING,
):
additional_parameters = (
bytes([self.cig_id, self.cis_id, len(self.metadata)]) + self.metadata
)
else:
additional_parameters = b''
return bytes([self.ase_id, self.state]) + additional_parameters
@value.setter
def value(self, _new_value):
# Readonly. Do nothing in the setter.
pass
def on_read(self, _: device.Connection) -> bytes:
return self.value
def __str__(self) -> str:
return (
f'AseStateMachine(id={self.ase_id}, role={self.role.name} '
f'state={self._state.name})'
)
class AudioStreamControlService(gatt.TemplateService):
UUID = gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE
ase_state_machines: Dict[int, AseStateMachine]
ase_control_point: gatt.Characteristic
def __init__(
self,
device: device.Device,
source_ase_id: Sequence[int] = [],
sink_ase_id: Sequence[int] = [],
) -> None:
self.device = device
self.ase_state_machines = {
**{
id: AseStateMachine(role=AudioRole.SINK, ase_id=id, service=self)
for id in sink_ase_id
},
**{
id: AseStateMachine(role=AudioRole.SOURCE, ase_id=id, service=self)
for id in source_ase_id
},
} # ASE state machines, by ASE ID
self.ase_control_point = gatt.Characteristic(
uuid=gatt.GATT_ASE_CONTROL_POINT_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.WRITEABLE,
value=gatt.CharacteristicValue(write=self.on_write_ase_control_point),
)
super().__init__([self.ase_control_point, *self.ase_state_machines.values()])
def on_operation(self, opcode: ASE_Operation.Opcode, ase_id: int, args):
if ase := self.ase_state_machines.get(ase_id):
handler = getattr(ase, 'on_' + opcode.name.lower())
return (ase_id, *handler(*args))
else:
return (ase_id, AseResponseCode.INVALID_ASE_ID, AseReasonCode.NONE)
def on_write_ase_control_point(self, connection, data):
operation = ASE_Operation.from_bytes(data)
responses = []
logger.debug(f'*** ASCS Write {operation} ***')
if operation.op_code == ASE_Operation.Opcode.CONFIG_CODEC:
for ase_id, *args in zip(
operation.ase_id,
operation.target_latency,
operation.target_phy,
operation.codec_id,
operation.codec_specific_configuration,
):
responses.append(self.on_operation(operation.op_code, ase_id, args))
elif operation.op_code == ASE_Operation.Opcode.CONFIG_QOS:
for ase_id, *args in zip(
operation.ase_id,
operation.cig_id,
operation.cis_id,
operation.sdu_interval,
operation.framing,
operation.phy,
operation.max_sdu,
operation.retransmission_number,
operation.max_transport_latency,
operation.presentation_delay,
):
responses.append(self.on_operation(operation.op_code, ase_id, args))
elif operation.op_code in (
ASE_Operation.Opcode.ENABLE,
ASE_Operation.Opcode.UPDATE_METADATA,
):
for ase_id, *args in zip(
operation.ase_id,
operation.metadata,
):
responses.append(self.on_operation(operation.op_code, ase_id, args))
elif operation.op_code in (
ASE_Operation.Opcode.RECEIVER_START_READY,
ASE_Operation.Opcode.DISABLE,
ASE_Operation.Opcode.RECEIVER_STOP_READY,
ASE_Operation.Opcode.RELEASE,
):
for ase_id in operation.ase_id:
responses.append(self.on_operation(operation.op_code, ase_id, []))
control_point_notification = bytes(
[operation.op_code, len(responses)]
) + b''.join(map(bytes, responses))
self.device.abort_on(
'flush',
self.device.notify_subscribers(
self.ase_control_point, control_point_notification
),
)
for ase_id, *_ in responses:
if ase := self.ase_state_machines.get(ase_id):
self.device.abort_on(
'flush',
self.device.notify_subscribers(ase, ase.value),
)
# -----------------------------------------------------------------------------
# Client
# -----------------------------------------------------------------------------
@@ -494,3 +1224,24 @@ class PublishedAudioCapabilitiesServiceProxy(gatt_client.ProfileServiceProxy):
gatt.GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC
):
self.source_audio_locations = characteristics[0]
class AudioStreamControlServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = AudioStreamControlService
sink_ase: List[gatt_client.CharacteristicProxy]
source_ase: List[gatt_client.CharacteristicProxy]
ase_control_point: gatt_client.CharacteristicProxy
def __init__(self, service_proxy: gatt_client.ServiceProxy):
self.service_proxy = service_proxy
self.sink_ase = service_proxy.get_characteristics_by_uuid(
gatt.GATT_SINK_ASE_CHARACTERISTIC
)
self.source_ase = service_proxy.get_characteristics_by_uuid(
gatt.GATT_SOURCE_ASE_CHARACTERISTIC
)
self.ase_control_point = service_proxy.get_characteristics_by_uuid(
gatt.GATT_ASE_CONTROL_POINT_CHARACTERISTIC
)[0]
+1
View File
@@ -1,5 +1,6 @@
{
"name": "Bumble-LEA",
"keystore": "JsonKeyStore",
"address": "F0:F1:F2:F3:F4:FA",
"advertising_interval": 100
}
+51 -1
View File
@@ -19,12 +19,14 @@ import asyncio
import logging
import sys
import os
import struct
from bumble.core import AdvertisingData
from bumble.device import Device
from bumble.device import Device, CisLink
from bumble.hci import (
CodecID,
CodingFormat,
OwnAddressType,
HCI_IsoDataPacket,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
)
from bumble.profiles.bap import (
@@ -35,6 +37,7 @@ from bumble.profiles.bap import (
SupportedFrameDuration,
PacRecord,
PublishedAudioCapabilitiesService,
AudioStreamControlService,
)
from bumble.transport import open_transport_or_link
@@ -103,6 +106,8 @@ async def main() -> None:
)
)
device.add_service(AudioStreamControlService(device, sink_ase_id=[1, 2]))
advertising_data = bytes(
AdvertisingData(
[
@@ -110,6 +115,16 @@ async def main() -> None:
AdvertisingData.COMPLETE_LOCAL_NAME,
bytes('Bumble LE Audio', 'utf-8'),
),
(
AdvertisingData.FLAGS,
bytes(
[
AdvertisingData.LE_GENERAL_DISCOVERABLE_MODE_FLAG
| AdvertisingData.BR_EDR_HOST_FLAG
| AdvertisingData.BR_EDR_CONTROLLER_FLAG
]
),
),
(
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
bytes(PublishedAudioCapabilitiesService.UUID),
@@ -117,6 +132,41 @@ async def main() -> None:
]
)
)
subprocess = await asyncio.create_subprocess_shell(
f'dlc3 | ffplay pipe:0',
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdin = subprocess.stdin
assert stdin
# Write a fake LC3 header to dlc3.
stdin.write(
bytes([0x1C, 0xCC]) # Header.
+ struct.pack(
'<HHHHHHI',
18, # Header length.
24000 // 100, # Sampling Rate(/100Hz).
0, # Bitrate(unused).
1, # Channels.
10000 // 10, # Frame duration(/10us).
0, # RFU.
0x0FFFFFFF, # Frame counts.
)
)
def on_pdu(pdu: HCI_IsoDataPacket):
# LC3 format: |frame_length(2)| + |frame(length)|.
if pdu.iso_sdu_length:
stdin.write(struct.pack('<H', pdu.iso_sdu_length))
stdin.write(pdu.iso_sdu_fragment)
def on_cis(cis_link: CisLink):
cis_link.on('pdu', on_pdu)
device.once('cis_establishment', on_cis)
await device.start_extended_advertising(
advertising_properties=(
+251
View File
@@ -17,6 +17,7 @@
# -----------------------------------------------------------------------------
import asyncio
import os
import functools
import pytest
import logging
@@ -24,11 +25,26 @@ from bumble import device
from bumble.hci import CodecID, CodingFormat
from bumble.profiles.bap import (
AudioLocation,
AseStateMachine,
ASE_Operation,
ASE_Config_Codec,
ASE_Config_QOS,
ASE_Disable,
ASE_Enable,
ASE_Receiver_Start_Ready,
ASE_Receiver_Stop_Ready,
ASE_Release,
ASE_Update_Metadata,
SupportedFrameDuration,
SupportedSamplingFrequency,
SamplingFrequency,
FrameDuration,
CodecSpecificCapabilities,
CodecSpecificConfiguration,
ContextType,
PacRecord,
AudioStreamControlService,
AudioStreamControlServiceProxy,
PublishedAudioCapabilitiesService,
PublishedAudioCapabilitiesServiceProxy,
)
@@ -40,6 +56,13 @@ from .test_utils import TwoDevices
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
def basic_check(operation: ASE_Operation):
serialized = bytes(operation)
parsed = ASE_Operation.from_bytes(serialized)
assert bytes(parsed) == serialized
# -----------------------------------------------------------------------------
def test_codec_specific_capabilities() -> None:
SAMPLE_FREQUENCY = SupportedSamplingFrequency.FREQ_16000
@@ -85,6 +108,92 @@ def test_vendor_specific_pac_record() -> None:
assert bytes(PacRecord.from_bytes(RAW_DATA)) == RAW_DATA
# -----------------------------------------------------------------------------
def test_ASE_Config_Codec() -> None:
operation = ASE_Config_Codec(
ase_id=[1, 2],
target_latency=[3, 4],
target_phy=[5, 6],
codec_id=[CodingFormat(CodecID.LC3), CodingFormat(CodecID.LC3)],
codec_specific_configuration=[b'foo', b'bar'],
)
basic_check(operation)
# -----------------------------------------------------------------------------
def test_ASE_Config_QOS() -> None:
operation = ASE_Config_QOS(
ase_id=[1, 2],
cig_id=[1, 2],
cis_id=[3, 4],
sdu_interval=[5, 6],
framing=[0, 1],
phy=[2, 3],
max_sdu=[4, 5],
retransmission_number=[6, 7],
max_transport_latency=[8, 9],
presentation_delay=[10, 11],
)
basic_check(operation)
# -----------------------------------------------------------------------------
def test_ASE_Enable() -> None:
operation = ASE_Enable(
ase_id=[1, 2],
metadata=[b'foo', b'bar'],
)
basic_check(operation)
# -----------------------------------------------------------------------------
def test_ASE_Update_Metadata() -> None:
operation = ASE_Update_Metadata(
ase_id=[1, 2],
metadata=[b'foo', b'bar'],
)
basic_check(operation)
# -----------------------------------------------------------------------------
def test_ASE_Disable() -> None:
operation = ASE_Disable(ase_id=[1, 2])
basic_check(operation)
# -----------------------------------------------------------------------------
def test_ASE_Release() -> None:
operation = ASE_Release(ase_id=[1, 2])
basic_check(operation)
# -----------------------------------------------------------------------------
def test_ASE_Receiver_Start_Ready() -> None:
operation = ASE_Receiver_Start_Ready(ase_id=[1, 2])
basic_check(operation)
# -----------------------------------------------------------------------------
def test_ASE_Receiver_Stop_Ready() -> None:
operation = ASE_Receiver_Stop_Ready(ase_id=[1, 2])
basic_check(operation)
# -----------------------------------------------------------------------------
def test_codec_specific_configuration() -> None:
SAMPLE_FREQUENCY = SamplingFrequency.FREQ_16000
FRAME_SURATION = FrameDuration.DURATION_10000_US
AUDIO_LOCATION = AudioLocation.FRONT_LEFT
config = CodecSpecificConfiguration(
sampling_frequency=SAMPLE_FREQUENCY,
frame_duration=FRAME_SURATION,
audio_channel_allocation=AUDIO_LOCATION,
octets_per_codec_frame=60,
codec_frames_per_sdu=1,
)
assert CodecSpecificConfiguration.from_bytes(bytes(config)) == config
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_pacs():
@@ -140,6 +249,148 @@ async def test_pacs():
)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ascs():
devices = TwoDevices()
devices[0].add_service(
AudioStreamControlService(device=devices[0], sink_ase_id=[1, 2])
)
await devices.setup_connection()
peer = device.Peer(devices.connections[1])
ascs_client = await peer.discover_service_and_create_proxy(
AudioStreamControlServiceProxy
)
notifications = {1: asyncio.Queue(), 2: asyncio.Queue()}
def on_notification(data: bytes, ase_id: int):
notifications[ase_id].put_nowait(data)
# Should be idle
assert await ascs_client.sink_ase[0].read_value() == bytes(
[1, AseStateMachine.State.IDLE]
)
assert await ascs_client.sink_ase[1].read_value() == bytes(
[2, AseStateMachine.State.IDLE]
)
# Subscribe
await ascs_client.sink_ase[0].subscribe(
functools.partial(on_notification, ase_id=1)
)
await ascs_client.sink_ase[1].subscribe(
functools.partial(on_notification, ase_id=2)
)
# Config Codec
config = CodecSpecificConfiguration(
sampling_frequency=SamplingFrequency.FREQ_48000,
frame_duration=FrameDuration.DURATION_10000_US,
audio_channel_allocation=AudioLocation.FRONT_LEFT,
octets_per_codec_frame=120,
codec_frames_per_sdu=1,
)
await ascs_client.ase_control_point.write_value(
ASE_Config_Codec(
ase_id=[1, 2],
target_latency=[3, 4],
target_phy=[5, 6],
codec_id=[CodingFormat(CodecID.LC3), CodingFormat(CodecID.LC3)],
codec_specific_configuration=[config, config],
)
)
assert (await notifications[1].get())[:2] == bytes(
[1, AseStateMachine.State.CODEC_CONFIGURED]
)
assert (await notifications[2].get())[:2] == bytes(
[2, AseStateMachine.State.CODEC_CONFIGURED]
)
# Config QOS
await ascs_client.ase_control_point.write_value(
ASE_Config_QOS(
ase_id=[1, 2],
cig_id=[1, 2],
cis_id=[3, 4],
sdu_interval=[5, 6],
framing=[0, 1],
phy=[2, 3],
max_sdu=[4, 5],
retransmission_number=[6, 7],
max_transport_latency=[8, 9],
presentation_delay=[10, 11],
)
)
assert (await notifications[1].get())[:2] == bytes(
[1, AseStateMachine.State.QOS_CONFIGURED]
)
assert (await notifications[2].get())[:2] == bytes(
[2, AseStateMachine.State.QOS_CONFIGURED]
)
# Enable
await ascs_client.ase_control_point.write_value(
ASE_Enable(
ase_id=[1, 2],
metadata=[b'foo', b'bar'],
)
)
assert (await notifications[1].get())[:2] == bytes(
[1, AseStateMachine.State.ENABLING]
)
assert (await notifications[2].get())[:2] == bytes(
[2, AseStateMachine.State.ENABLING]
)
# CIS establishment
devices[0].emit(
'cis_establishment',
device.CisLink(
device=devices[0],
acl_connection=devices.connections[0],
handle=5,
cis_id=3,
cig_id=1,
),
)
devices[0].emit(
'cis_establishment',
device.CisLink(
device=devices[0],
acl_connection=devices.connections[0],
handle=6,
cis_id=4,
cig_id=2,
),
)
assert (await notifications[1].get())[:2] == bytes(
[1, AseStateMachine.State.STREAMING]
)
assert (await notifications[2].get())[:2] == bytes(
[2, AseStateMachine.State.STREAMING]
)
# Release
await ascs_client.ase_control_point.write_value(
ASE_Release(
ase_id=[1, 2],
metadata=[b'foo', b'bar'],
)
)
assert (await notifications[1].get())[:2] == bytes(
[1, AseStateMachine.State.RELEASING]
)
assert (await notifications[2].get())[:2] == bytes(
[2, AseStateMachine.State.RELEASING]
)
assert (await notifications[1].get())[:2] == bytes([1, AseStateMachine.State.IDLE])
assert (await notifications[2].get())[:2] == bytes([2, AseStateMachine.State.IDLE])
await asyncio.sleep(0.001)
# -----------------------------------------------------------------------------
async def run():
await test_pacs()