Add a2dp_test.py tests for a2dp.py

This commit is contained in:
khsiao-google
2025-09-13 06:37:22 +00:00
parent 7d8addb849
commit 21bf69592c

View File

@@ -18,14 +18,12 @@
import asyncio import asyncio
import logging import logging
import os import os
import struct
from typing import Awaitable
import pytest import pytest
from bumble.a2dp import ( from bumble import a2dp
AacMediaCodecInformation,
OpusMediaCodecInformation,
SbcMediaCodecInformation,
)
from bumble.avdtp import ( from bumble.avdtp import (
A2DP_SBC_CODEC_TYPE, A2DP_SBC_CODEC_TYPE,
AVDTP_AUDIO_MEDIA_TYPE, AVDTP_AUDIO_MEDIA_TYPE,
@@ -82,6 +80,24 @@ class TwoDevices:
self.paired[which] = keys self.paired[which] = keys
# -----------------------------------------------------------------------------
class Data:
pointer: int = 0
data: bytes
def __init__(self, data: bytes):
self.data = data
async def read(self, length: int) -> Awaitable[bytes]:
def generate_read():
end = min(self.pointer + length, len(self.data))
chunk = self.data[self.pointer : end]
self.pointer = end
return chunk
return generate_read()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_self_connection(): async def test_self_connection():
@@ -122,12 +138,12 @@ def source_codec_capabilities():
return MediaCodecCapabilities( return MediaCodecCapabilities(
media_type=AVDTP_AUDIO_MEDIA_TYPE, media_type=AVDTP_AUDIO_MEDIA_TYPE,
media_codec_type=A2DP_SBC_CODEC_TYPE, media_codec_type=A2DP_SBC_CODEC_TYPE,
media_codec_information=SbcMediaCodecInformation( media_codec_information=a2dp.SbcMediaCodecInformation(
sampling_frequency=SbcMediaCodecInformation.SamplingFrequency.SF_44100, sampling_frequency=a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_44100,
channel_mode=SbcMediaCodecInformation.ChannelMode.JOINT_STEREO, channel_mode=a2dp.SbcMediaCodecInformation.ChannelMode.JOINT_STEREO,
block_length=SbcMediaCodecInformation.BlockLength.BL_16, block_length=a2dp.SbcMediaCodecInformation.BlockLength.BL_16,
subbands=SbcMediaCodecInformation.Subbands.S_8, subbands=a2dp.SbcMediaCodecInformation.Subbands.S_8,
allocation_method=SbcMediaCodecInformation.AllocationMethod.LOUDNESS, allocation_method=a2dp.SbcMediaCodecInformation.AllocationMethod.LOUDNESS,
minimum_bitpool_value=2, minimum_bitpool_value=2,
maximum_bitpool_value=53, maximum_bitpool_value=53,
), ),
@@ -139,23 +155,23 @@ def sink_codec_capabilities():
return MediaCodecCapabilities( return MediaCodecCapabilities(
media_type=AVDTP_AUDIO_MEDIA_TYPE, media_type=AVDTP_AUDIO_MEDIA_TYPE,
media_codec_type=A2DP_SBC_CODEC_TYPE, media_codec_type=A2DP_SBC_CODEC_TYPE,
media_codec_information=SbcMediaCodecInformation( media_codec_information=a2dp.SbcMediaCodecInformation(
sampling_frequency=SbcMediaCodecInformation.SamplingFrequency.SF_48000 sampling_frequency=a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_48000
| SbcMediaCodecInformation.SamplingFrequency.SF_44100 | a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_44100
| SbcMediaCodecInformation.SamplingFrequency.SF_32000 | a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_32000
| SbcMediaCodecInformation.SamplingFrequency.SF_16000, | a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_16000,
channel_mode=SbcMediaCodecInformation.ChannelMode.MONO channel_mode=a2dp.SbcMediaCodecInformation.ChannelMode.MONO
| SbcMediaCodecInformation.ChannelMode.DUAL_CHANNEL | a2dp.SbcMediaCodecInformation.ChannelMode.DUAL_CHANNEL
| SbcMediaCodecInformation.ChannelMode.STEREO | a2dp.SbcMediaCodecInformation.ChannelMode.STEREO
| SbcMediaCodecInformation.ChannelMode.JOINT_STEREO, | a2dp.SbcMediaCodecInformation.ChannelMode.JOINT_STEREO,
block_length=SbcMediaCodecInformation.BlockLength.BL_4 block_length=a2dp.SbcMediaCodecInformation.BlockLength.BL_4
| SbcMediaCodecInformation.BlockLength.BL_8 | a2dp.SbcMediaCodecInformation.BlockLength.BL_8
| SbcMediaCodecInformation.BlockLength.BL_12 | a2dp.SbcMediaCodecInformation.BlockLength.BL_12
| SbcMediaCodecInformation.BlockLength.BL_16, | a2dp.SbcMediaCodecInformation.BlockLength.BL_16,
subbands=SbcMediaCodecInformation.Subbands.S_4 subbands=a2dp.SbcMediaCodecInformation.Subbands.S_4
| SbcMediaCodecInformation.Subbands.S_8, | a2dp.SbcMediaCodecInformation.Subbands.S_8,
allocation_method=SbcMediaCodecInformation.AllocationMethod.LOUDNESS allocation_method=a2dp.SbcMediaCodecInformation.AllocationMethod.LOUDNESS
| SbcMediaCodecInformation.AllocationMethod.SNR, | a2dp.SbcMediaCodecInformation.AllocationMethod.SNR,
minimum_bitpool_value=2, minimum_bitpool_value=2,
maximum_bitpool_value=53, maximum_bitpool_value=53,
), ),
@@ -274,52 +290,54 @@ async def test_source_sink_1():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_sbc_codec_specific_information(): def test_sbc_codec_specific_information():
sbc_info = SbcMediaCodecInformation.from_bytes(bytes.fromhex("3fff0235")) sbc_info = a2dp.SbcMediaCodecInformation.from_bytes(bytes.fromhex("3fff0235"))
assert ( assert (
sbc_info.sampling_frequency sbc_info.sampling_frequency
== SbcMediaCodecInformation.SamplingFrequency.SF_44100 == a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_44100
| SbcMediaCodecInformation.SamplingFrequency.SF_48000 | a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_48000
) )
assert ( assert (
sbc_info.channel_mode sbc_info.channel_mode
== SbcMediaCodecInformation.ChannelMode.MONO == a2dp.SbcMediaCodecInformation.ChannelMode.MONO
| SbcMediaCodecInformation.ChannelMode.DUAL_CHANNEL | a2dp.SbcMediaCodecInformation.ChannelMode.DUAL_CHANNEL
| SbcMediaCodecInformation.ChannelMode.STEREO | a2dp.SbcMediaCodecInformation.ChannelMode.STEREO
| SbcMediaCodecInformation.ChannelMode.JOINT_STEREO | a2dp.SbcMediaCodecInformation.ChannelMode.JOINT_STEREO
) )
assert ( assert (
sbc_info.block_length sbc_info.block_length
== SbcMediaCodecInformation.BlockLength.BL_4 == a2dp.SbcMediaCodecInformation.BlockLength.BL_4
| SbcMediaCodecInformation.BlockLength.BL_8 | a2dp.SbcMediaCodecInformation.BlockLength.BL_8
| SbcMediaCodecInformation.BlockLength.BL_12 | a2dp.SbcMediaCodecInformation.BlockLength.BL_12
| SbcMediaCodecInformation.BlockLength.BL_16 | a2dp.SbcMediaCodecInformation.BlockLength.BL_16
) )
assert ( assert (
sbc_info.subbands sbc_info.subbands
== SbcMediaCodecInformation.Subbands.S_4 | SbcMediaCodecInformation.Subbands.S_8 == a2dp.SbcMediaCodecInformation.Subbands.S_4
| a2dp.SbcMediaCodecInformation.Subbands.S_8
) )
assert ( assert (
sbc_info.allocation_method sbc_info.allocation_method
== SbcMediaCodecInformation.AllocationMethod.SNR == a2dp.SbcMediaCodecInformation.AllocationMethod.SNR
| SbcMediaCodecInformation.AllocationMethod.LOUDNESS | a2dp.SbcMediaCodecInformation.AllocationMethod.LOUDNESS
) )
assert sbc_info.minimum_bitpool_value == 2 assert sbc_info.minimum_bitpool_value == 2
assert sbc_info.maximum_bitpool_value == 53 assert sbc_info.maximum_bitpool_value == 53
sbc_info2 = SbcMediaCodecInformation( sbc_info2 = a2dp.SbcMediaCodecInformation(
SbcMediaCodecInformation.SamplingFrequency.SF_44100 a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_44100
| SbcMediaCodecInformation.SamplingFrequency.SF_48000, | a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_48000,
SbcMediaCodecInformation.ChannelMode.MONO a2dp.SbcMediaCodecInformation.ChannelMode.MONO
| SbcMediaCodecInformation.ChannelMode.DUAL_CHANNEL | a2dp.SbcMediaCodecInformation.ChannelMode.DUAL_CHANNEL
| SbcMediaCodecInformation.ChannelMode.STEREO | a2dp.SbcMediaCodecInformation.ChannelMode.STEREO
| SbcMediaCodecInformation.ChannelMode.JOINT_STEREO, | a2dp.SbcMediaCodecInformation.ChannelMode.JOINT_STEREO,
SbcMediaCodecInformation.BlockLength.BL_4 a2dp.SbcMediaCodecInformation.BlockLength.BL_4
| SbcMediaCodecInformation.BlockLength.BL_8 | a2dp.SbcMediaCodecInformation.BlockLength.BL_8
| SbcMediaCodecInformation.BlockLength.BL_12 | a2dp.SbcMediaCodecInformation.BlockLength.BL_12
| SbcMediaCodecInformation.BlockLength.BL_16, | a2dp.SbcMediaCodecInformation.BlockLength.BL_16,
SbcMediaCodecInformation.Subbands.S_4 | SbcMediaCodecInformation.Subbands.S_8, a2dp.SbcMediaCodecInformation.Subbands.S_4
SbcMediaCodecInformation.AllocationMethod.SNR | a2dp.SbcMediaCodecInformation.Subbands.S_8,
| SbcMediaCodecInformation.AllocationMethod.LOUDNESS, a2dp.SbcMediaCodecInformation.AllocationMethod.SNR
| a2dp.SbcMediaCodecInformation.AllocationMethod.LOUDNESS,
2, 2,
53, 53,
) )
@@ -329,36 +347,36 @@ def test_sbc_codec_specific_information():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_aac_codec_specific_information(): def test_aac_codec_specific_information():
aac_info = AacMediaCodecInformation.from_bytes(bytes.fromhex("f0018c83e800")) aac_info = a2dp.AacMediaCodecInformation.from_bytes(bytes.fromhex("f0018c83e800"))
assert ( assert (
aac_info.object_type aac_info.object_type
== AacMediaCodecInformation.ObjectType.MPEG_2_AAC_LC == a2dp.AacMediaCodecInformation.ObjectType.MPEG_2_AAC_LC
| AacMediaCodecInformation.ObjectType.MPEG_4_AAC_LC | a2dp.AacMediaCodecInformation.ObjectType.MPEG_4_AAC_LC
| AacMediaCodecInformation.ObjectType.MPEG_4_AAC_LTP | a2dp.AacMediaCodecInformation.ObjectType.MPEG_4_AAC_LTP
| AacMediaCodecInformation.ObjectType.MPEG_4_AAC_SCALABLE | a2dp.AacMediaCodecInformation.ObjectType.MPEG_4_AAC_SCALABLE
) )
assert ( assert (
aac_info.sampling_frequency aac_info.sampling_frequency
== AacMediaCodecInformation.SamplingFrequency.SF_44100 == a2dp.AacMediaCodecInformation.SamplingFrequency.SF_44100
| AacMediaCodecInformation.SamplingFrequency.SF_48000 | a2dp.AacMediaCodecInformation.SamplingFrequency.SF_48000
) )
assert ( assert (
aac_info.channels aac_info.channels
== AacMediaCodecInformation.Channels.MONO == a2dp.AacMediaCodecInformation.Channels.MONO
| AacMediaCodecInformation.Channels.STEREO | a2dp.AacMediaCodecInformation.Channels.STEREO
) )
assert aac_info.vbr == 1 assert aac_info.vbr == 1
assert aac_info.bitrate == 256000 assert aac_info.bitrate == 256000
aac_info2 = AacMediaCodecInformation( aac_info2 = a2dp.AacMediaCodecInformation(
AacMediaCodecInformation.ObjectType.MPEG_2_AAC_LC a2dp.AacMediaCodecInformation.ObjectType.MPEG_2_AAC_LC
| AacMediaCodecInformation.ObjectType.MPEG_4_AAC_LC | a2dp.AacMediaCodecInformation.ObjectType.MPEG_4_AAC_LC
| AacMediaCodecInformation.ObjectType.MPEG_4_AAC_LTP | a2dp.AacMediaCodecInformation.ObjectType.MPEG_4_AAC_LTP
| AacMediaCodecInformation.ObjectType.MPEG_4_AAC_SCALABLE, | a2dp.AacMediaCodecInformation.ObjectType.MPEG_4_AAC_SCALABLE,
AacMediaCodecInformation.SamplingFrequency.SF_44100 a2dp.AacMediaCodecInformation.SamplingFrequency.SF_44100
| AacMediaCodecInformation.SamplingFrequency.SF_48000, | a2dp.AacMediaCodecInformation.SamplingFrequency.SF_48000,
AacMediaCodecInformation.Channels.MONO a2dp.AacMediaCodecInformation.Channels.MONO
| AacMediaCodecInformation.Channels.STEREO, | a2dp.AacMediaCodecInformation.Channels.STEREO,
1, 1,
256000, 256000,
) )
@@ -368,25 +386,159 @@ def test_aac_codec_specific_information():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_opus_codec_specific_information(): def test_opus_codec_specific_information():
opus_info = OpusMediaCodecInformation.from_bytes(bytes([0x92])) opus_info = a2dp.OpusMediaCodecInformation.from_bytes(bytes([0x92]))
assert opus_info.vendor_id == OpusMediaCodecInformation.VENDOR_ID assert opus_info.vendor_id == a2dp.OpusMediaCodecInformation.VENDOR_ID
assert opus_info.codec_id == OpusMediaCodecInformation.CODEC_ID assert opus_info.codec_id == a2dp.OpusMediaCodecInformation.CODEC_ID
assert opus_info.frame_size == OpusMediaCodecInformation.FrameSize.FS_20MS assert opus_info.frame_size == a2dp.OpusMediaCodecInformation.FrameSize.FS_20MS
assert opus_info.channel_mode == OpusMediaCodecInformation.ChannelMode.STEREO assert opus_info.channel_mode == a2dp.OpusMediaCodecInformation.ChannelMode.STEREO
assert ( assert (
opus_info.sampling_frequency opus_info.sampling_frequency
== OpusMediaCodecInformation.SamplingFrequency.SF_48000 == a2dp.OpusMediaCodecInformation.SamplingFrequency.SF_48000
) )
opus_info2 = OpusMediaCodecInformation( opus_info2 = a2dp.OpusMediaCodecInformation(
OpusMediaCodecInformation.ChannelMode.STEREO, a2dp.OpusMediaCodecInformation.ChannelMode.STEREO,
OpusMediaCodecInformation.FrameSize.FS_20MS, a2dp.OpusMediaCodecInformation.FrameSize.FS_20MS,
OpusMediaCodecInformation.SamplingFrequency.SF_48000, a2dp.OpusMediaCodecInformation.SamplingFrequency.SF_48000,
) )
assert opus_info2 == opus_info assert opus_info2 == opus_info
assert opus_info2.value == bytes([0x92]) assert opus_info2.value == bytes([0x92])
# -----------------------------------------------------------------------------
async def test_sbc_parser():
header = b'\x9c\x80\x08\x00'
payload = b'\x00\x00\x00\x00\x00\x00'
data = Data(header + payload)
parser = a2dp.SbcParser(data.read)
async for frame in parser.frames:
assert frame.sampling_frequency == 44100
assert frame.block_count == 4
assert frame.channel_mode == 0
assert frame.allocation_method == 0
assert frame.subband_count == 4
assert frame.bitpool == 8
assert frame.payload == header + payload
# -----------------------------------------------------------------------------
async def test_sbc_packet_source():
header = b'\x9c\x80\x08\x00'
payload = b'\x00\x00\x00\x00\x00\x00'
data = Data((header + payload) * 2)
packet_source = a2dp.SbcPacketSource(data.read, 23)
async for packet in packet_source.packets:
assert packet.sequence_number == 0
assert packet.timestamp == 0
assert packet.payload == b'\x01' + header + payload
# -----------------------------------------------------------------------------
async def test_aac_parser():
header = b'\xff\xf0\x10\x00\x01\xa0\x00'
payload = b'\x00\x00\x00\x00\x00\x00'
data = Data(header + payload)
parser = a2dp.AacParser(data.read)
async for frame in parser.frames:
assert frame.profile == a2dp.AacFrame.Profile.MAIN
assert frame.sampling_frequency == 44100
assert frame.channel_configuration == 0
assert frame.payload == payload
# -----------------------------------------------------------------------------
async def test_aac_packet_source():
header = b'\xff\xf0\x10\x00\x01\xa0\x00'
payload = b'\x00\x00\x00\x00\x00\x00'
data = Data(header + payload)
packet_source = a2dp.AacPacketSource(data.read, 0)
async for packet in packet_source.packets:
assert packet.sequence_number == 0
assert packet.timestamp == 0
assert packet.payload == b' \x00\x12\x00\x00\x000\x00\x00\x00\x00\x00\x00'
# -----------------------------------------------------------------------------
async def test_opus_parser():
packed_header_data_revised = struct.pack(
"<QIIIB",
0, # granule_position
2, # bitstream_serial_number
2, # page_sequence_number
0, # crc_checksum
3, # page_segments
)
first_page_header_revised = (
b'OggS' # Capture pattern
+ b'\x00' # Version
+ b'\x02' # Header type
+ packed_header_data_revised
)
segment_table_revised = b'\x0a\x08\x0a'
opus_head_packet_data = b'OpusHead' + b'\x00' + b'\x00'
opus_tags_packet_data = b'OpusTags'
audio_data_packet = b'0123456789'
data = Data(
first_page_header_revised
+ segment_table_revised
+ opus_head_packet_data
+ opus_tags_packet_data
+ audio_data_packet
)
parser = a2dp.OpusParser(data.read)
async for packet in parser.packets:
assert packet.channel_mode == a2dp.OpusPacket.ChannelMode.STEREO
assert packet.payload == audio_data_packet
# -----------------------------------------------------------------------------
async def test_opus_packet_source():
packed_header_data_revised = struct.pack(
"<QIIIB",
0, # granule_position
2, # bitstream_serial_number
2, # page_sequence_number
0, # crc_checksum
3, # page_segments
)
first_page_header_revised = (
b'OggS' # Capture pattern
+ b'\x00' # Version
+ b'\x02' # Header type
+ packed_header_data_revised
)
segment_table_revised = b'\x0a\x08\x0a'
opus_head_packet_data = b'OpusHead' + b'\x00' + b'\x00'
opus_tags_packet_data = b'OpusTags'
audio_data_packet = b'0123456789'
data = Data(
first_page_header_revised
+ segment_table_revised
+ opus_head_packet_data
+ opus_tags_packet_data
+ audio_data_packet
)
parser = a2dp.OpusPacketSource(data.read, 0)
async for packet in parser.packets:
assert packet.sequence_number == 0
assert packet.timestamp == 0
assert packet.payload == b'\x01' + audio_data_packet
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def async_main(): async def async_main():
test_sbc_codec_specific_information() test_sbc_codec_specific_information()
@@ -394,6 +546,12 @@ async def async_main():
test_opus_codec_specific_information() test_opus_codec_specific_information()
await test_self_connection() await test_self_connection()
await test_source_sink_1() await test_source_sink_1()
test_sbc_parser()
test_sbc_packet_source()
test_aac_parser()
test_aac_packet_source()
test_opus_parser()
test_opus_packet_source()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------