mirror of
https://github.com/google/bumble.git
synced 2026-06-01 07:37:02 +00:00
Parse CodecSpecificConfiguration
This commit is contained in:
+86
-5
@@ -553,6 +553,80 @@ class CodecSpecificCapabilities:
|
||||
)
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class CodecSpecificConfiguration:
|
||||
'''See:
|
||||
* Bluetooth Assigned Numbers, 6.12.5 - Codec Specific Configuration LTV Structures
|
||||
* Basic Audio Profile, 4.3.2 - Codec_Specific_Capabilities LTV requirements
|
||||
'''
|
||||
|
||||
class Type(enum.IntEnum):
|
||||
# fmt: off
|
||||
SAMPLING_FREQUENCY = 0x01
|
||||
FRAME_DURATION = 0x02
|
||||
AUDIO_CHANNEL_ALLOCATION = 0x03
|
||||
OCTETS_PER_FRAME = 0x04
|
||||
CODEC_FRAMES_PER_SDU = 0x05
|
||||
|
||||
sampling_frequency: SamplingFrequency
|
||||
frame_duration: FrameDuration
|
||||
audio_channel_allocation: AudioLocation
|
||||
octets_per_codec_frame: int
|
||||
codec_frames_per_sdu: int
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes) -> CodecSpecificConfiguration:
|
||||
offset = 0
|
||||
# Allowed default values.
|
||||
audio_channel_allocation = AudioLocation.NOT_ALLOWED
|
||||
codec_frames_per_sdu = 1
|
||||
while offset < len(data):
|
||||
length, type = struct.unpack_from('BB', data, offset)
|
||||
offset += 2
|
||||
value = int.from_bytes(data[offset : offset + length - 1], 'little')
|
||||
offset += length - 1
|
||||
|
||||
if type == CodecSpecificConfiguration.Type.SAMPLING_FREQUENCY:
|
||||
sampling_frequency = SamplingFrequency(value)
|
||||
elif type == CodecSpecificConfiguration.Type.FRAME_DURATION:
|
||||
frame_duration = FrameDuration(value)
|
||||
elif type == CodecSpecificConfiguration.Type.AUDIO_CHANNEL_ALLOCATION:
|
||||
audio_channel_allocation = AudioLocation(value)
|
||||
elif type == CodecSpecificConfiguration.Type.OCTETS_PER_FRAME:
|
||||
octets_per_codec_frame = value
|
||||
elif type == CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU:
|
||||
codec_frames_per_sdu = value
|
||||
|
||||
# It is expected here that if some fields are missing, an error should be raised.
|
||||
return CodecSpecificConfiguration(
|
||||
sampling_frequency=sampling_frequency,
|
||||
frame_duration=frame_duration,
|
||||
audio_channel_allocation=audio_channel_allocation,
|
||||
octets_per_codec_frame=octets_per_codec_frame,
|
||||
codec_frames_per_sdu=codec_frames_per_sdu,
|
||||
)
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return struct.pack(
|
||||
'<BBBBBBBBIBBHBBB',
|
||||
2,
|
||||
CodecSpecificConfiguration.Type.SAMPLING_FREQUENCY,
|
||||
self.sampling_frequency,
|
||||
2,
|
||||
CodecSpecificConfiguration.Type.FRAME_DURATION,
|
||||
self.frame_duration,
|
||||
5,
|
||||
CodecSpecificConfiguration.Type.AUDIO_CHANNEL_ALLOCATION,
|
||||
self.audio_channel_allocation,
|
||||
3,
|
||||
CodecSpecificConfiguration.Type.OCTETS_PER_FRAME,
|
||||
self.octets_per_codec_frame,
|
||||
2,
|
||||
CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU,
|
||||
self.codec_frames_per_sdu,
|
||||
)
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class PacRecord:
|
||||
coding_format: hci.CodingFormat
|
||||
@@ -701,8 +775,7 @@ class AseStateMachine(gatt.Characteristic):
|
||||
preferred_presentation_delay_min = 0
|
||||
preferred_presentation_delay_max = 0
|
||||
codec_id = hci.CodingFormat(hci.CodecID.LC3)
|
||||
# TODO: Parse this
|
||||
codec_specific_configuration = b''
|
||||
codec_specific_configuration: Union[CodecSpecificConfiguration, bytes] = b''
|
||||
|
||||
# Additional parameters in QOS_CONFIGURED State
|
||||
cig_id = 0
|
||||
@@ -785,7 +858,12 @@ class AseStateMachine(gatt.Characteristic):
|
||||
self.max_transport_latency = target_latency
|
||||
self.phy = target_phy
|
||||
self.codec_id = codec_id
|
||||
self.codec_specific_configuration = codec_specific_configuration
|
||||
if codec_id.codec_id == hci.CodecID.VENDOR_SPECIFIC:
|
||||
self.codec_specific_configuration = codec_specific_configuration
|
||||
else:
|
||||
self.codec_specific_configuration = CodecSpecificConfiguration.from_bytes(
|
||||
codec_specific_configuration
|
||||
)
|
||||
|
||||
self.state = self.State.CODEC_CONFIGURED
|
||||
|
||||
@@ -896,6 +974,9 @@ class AseStateMachine(gatt.Characteristic):
|
||||
'''Returns ASE_ID, ASE_STATE, and ASE Additional Parameters.'''
|
||||
|
||||
if self.state == self.State.CODEC_CONFIGURED:
|
||||
codec_specific_configuration_bytes = bytes(
|
||||
self.codec_specific_configuration
|
||||
)
|
||||
additional_parameters = (
|
||||
struct.pack(
|
||||
'<BBBH',
|
||||
@@ -909,8 +990,8 @@ class AseStateMachine(gatt.Characteristic):
|
||||
+ self.preferred_presentation_delay_min.to_bytes(3, 'little')
|
||||
+ self.preferred_presentation_delay_max.to_bytes(3, 'little')
|
||||
+ bytes(self.codec_id)
|
||||
+ bytes([len(self.codec_specific_configuration)])
|
||||
+ self.codec_specific_configuration
|
||||
+ bytes([len(codec_specific_configuration_bytes)])
|
||||
+ codec_specific_configuration_bytes
|
||||
)
|
||||
elif self.state == self.State.QOS_CONFIGURED:
|
||||
additional_parameters = (
|
||||
|
||||
Reference in New Issue
Block a user