diff --git a/tests/a2dp_test.py b/tests/a2dp_test.py index 6ed92f0a..a1fa74d6 100644 --- a/tests/a2dp_test.py +++ b/tests/a2dp_test.py @@ -18,14 +18,12 @@ import asyncio import logging import os +import struct +from typing import Awaitable import pytest -from bumble.a2dp import ( - AacMediaCodecInformation, - OpusMediaCodecInformation, - SbcMediaCodecInformation, -) +from bumble import a2dp from bumble.avdtp import ( A2DP_SBC_CODEC_TYPE, AVDTP_AUDIO_MEDIA_TYPE, @@ -82,6 +80,24 @@ class TwoDevices: self.paired[which] = keys +# ----------------------------------------------------------------------------- +class Data: + pointer: int = 0 + data: bytes + + def __init__(self, data: bytes): + self.data = data + + async def read(self, length: int) -> Awaitable[bytes]: + def generate_read(): + end = min(self.pointer + length, len(self.data)) + chunk = self.data[self.pointer : end] + self.pointer = end + return chunk + + return generate_read() + + # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_self_connection(): @@ -122,12 +138,12 @@ def source_codec_capabilities(): return MediaCodecCapabilities( media_type=AVDTP_AUDIO_MEDIA_TYPE, media_codec_type=A2DP_SBC_CODEC_TYPE, - media_codec_information=SbcMediaCodecInformation( - sampling_frequency=SbcMediaCodecInformation.SamplingFrequency.SF_44100, - channel_mode=SbcMediaCodecInformation.ChannelMode.JOINT_STEREO, - block_length=SbcMediaCodecInformation.BlockLength.BL_16, - subbands=SbcMediaCodecInformation.Subbands.S_8, - allocation_method=SbcMediaCodecInformation.AllocationMethod.LOUDNESS, + media_codec_information=a2dp.SbcMediaCodecInformation( + sampling_frequency=a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_44100, + channel_mode=a2dp.SbcMediaCodecInformation.ChannelMode.JOINT_STEREO, + block_length=a2dp.SbcMediaCodecInformation.BlockLength.BL_16, + subbands=a2dp.SbcMediaCodecInformation.Subbands.S_8, + allocation_method=a2dp.SbcMediaCodecInformation.AllocationMethod.LOUDNESS, minimum_bitpool_value=2, maximum_bitpool_value=53, ), @@ -139,23 +155,23 @@ def sink_codec_capabilities(): return MediaCodecCapabilities( media_type=AVDTP_AUDIO_MEDIA_TYPE, media_codec_type=A2DP_SBC_CODEC_TYPE, - media_codec_information=SbcMediaCodecInformation( - sampling_frequency=SbcMediaCodecInformation.SamplingFrequency.SF_48000 - | SbcMediaCodecInformation.SamplingFrequency.SF_44100 - | SbcMediaCodecInformation.SamplingFrequency.SF_32000 - | SbcMediaCodecInformation.SamplingFrequency.SF_16000, - channel_mode=SbcMediaCodecInformation.ChannelMode.MONO - | SbcMediaCodecInformation.ChannelMode.DUAL_CHANNEL - | SbcMediaCodecInformation.ChannelMode.STEREO - | SbcMediaCodecInformation.ChannelMode.JOINT_STEREO, - block_length=SbcMediaCodecInformation.BlockLength.BL_4 - | SbcMediaCodecInformation.BlockLength.BL_8 - | SbcMediaCodecInformation.BlockLength.BL_12 - | SbcMediaCodecInformation.BlockLength.BL_16, - subbands=SbcMediaCodecInformation.Subbands.S_4 - | SbcMediaCodecInformation.Subbands.S_8, - allocation_method=SbcMediaCodecInformation.AllocationMethod.LOUDNESS - | SbcMediaCodecInformation.AllocationMethod.SNR, + media_codec_information=a2dp.SbcMediaCodecInformation( + sampling_frequency=a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_48000 + | a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_44100 + | a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_32000 + | a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_16000, + channel_mode=a2dp.SbcMediaCodecInformation.ChannelMode.MONO + | a2dp.SbcMediaCodecInformation.ChannelMode.DUAL_CHANNEL + | a2dp.SbcMediaCodecInformation.ChannelMode.STEREO + | a2dp.SbcMediaCodecInformation.ChannelMode.JOINT_STEREO, + block_length=a2dp.SbcMediaCodecInformation.BlockLength.BL_4 + | a2dp.SbcMediaCodecInformation.BlockLength.BL_8 + | a2dp.SbcMediaCodecInformation.BlockLength.BL_12 + | a2dp.SbcMediaCodecInformation.BlockLength.BL_16, + subbands=a2dp.SbcMediaCodecInformation.Subbands.S_4 + | a2dp.SbcMediaCodecInformation.Subbands.S_8, + allocation_method=a2dp.SbcMediaCodecInformation.AllocationMethod.LOUDNESS + | a2dp.SbcMediaCodecInformation.AllocationMethod.SNR, minimum_bitpool_value=2, maximum_bitpool_value=53, ), @@ -274,52 +290,54 @@ async def test_source_sink_1(): # ----------------------------------------------------------------------------- def test_sbc_codec_specific_information(): - sbc_info = SbcMediaCodecInformation.from_bytes(bytes.fromhex("3fff0235")) + sbc_info = a2dp.SbcMediaCodecInformation.from_bytes(bytes.fromhex("3fff0235")) assert ( sbc_info.sampling_frequency - == SbcMediaCodecInformation.SamplingFrequency.SF_44100 - | SbcMediaCodecInformation.SamplingFrequency.SF_48000 + == a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_44100 + | a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_48000 ) assert ( sbc_info.channel_mode - == SbcMediaCodecInformation.ChannelMode.MONO - | SbcMediaCodecInformation.ChannelMode.DUAL_CHANNEL - | SbcMediaCodecInformation.ChannelMode.STEREO - | SbcMediaCodecInformation.ChannelMode.JOINT_STEREO + == a2dp.SbcMediaCodecInformation.ChannelMode.MONO + | a2dp.SbcMediaCodecInformation.ChannelMode.DUAL_CHANNEL + | a2dp.SbcMediaCodecInformation.ChannelMode.STEREO + | a2dp.SbcMediaCodecInformation.ChannelMode.JOINT_STEREO ) assert ( sbc_info.block_length - == SbcMediaCodecInformation.BlockLength.BL_4 - | SbcMediaCodecInformation.BlockLength.BL_8 - | SbcMediaCodecInformation.BlockLength.BL_12 - | SbcMediaCodecInformation.BlockLength.BL_16 + == a2dp.SbcMediaCodecInformation.BlockLength.BL_4 + | a2dp.SbcMediaCodecInformation.BlockLength.BL_8 + | a2dp.SbcMediaCodecInformation.BlockLength.BL_12 + | a2dp.SbcMediaCodecInformation.BlockLength.BL_16 ) assert ( sbc_info.subbands - == SbcMediaCodecInformation.Subbands.S_4 | SbcMediaCodecInformation.Subbands.S_8 + == a2dp.SbcMediaCodecInformation.Subbands.S_4 + | a2dp.SbcMediaCodecInformation.Subbands.S_8 ) assert ( sbc_info.allocation_method - == SbcMediaCodecInformation.AllocationMethod.SNR - | SbcMediaCodecInformation.AllocationMethod.LOUDNESS + == a2dp.SbcMediaCodecInformation.AllocationMethod.SNR + | a2dp.SbcMediaCodecInformation.AllocationMethod.LOUDNESS ) assert sbc_info.minimum_bitpool_value == 2 assert sbc_info.maximum_bitpool_value == 53 - sbc_info2 = SbcMediaCodecInformation( - SbcMediaCodecInformation.SamplingFrequency.SF_44100 - | SbcMediaCodecInformation.SamplingFrequency.SF_48000, - SbcMediaCodecInformation.ChannelMode.MONO - | SbcMediaCodecInformation.ChannelMode.DUAL_CHANNEL - | SbcMediaCodecInformation.ChannelMode.STEREO - | SbcMediaCodecInformation.ChannelMode.JOINT_STEREO, - SbcMediaCodecInformation.BlockLength.BL_4 - | SbcMediaCodecInformation.BlockLength.BL_8 - | SbcMediaCodecInformation.BlockLength.BL_12 - | SbcMediaCodecInformation.BlockLength.BL_16, - SbcMediaCodecInformation.Subbands.S_4 | SbcMediaCodecInformation.Subbands.S_8, - SbcMediaCodecInformation.AllocationMethod.SNR - | SbcMediaCodecInformation.AllocationMethod.LOUDNESS, + sbc_info2 = a2dp.SbcMediaCodecInformation( + a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_44100 + | a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_48000, + a2dp.SbcMediaCodecInformation.ChannelMode.MONO + | a2dp.SbcMediaCodecInformation.ChannelMode.DUAL_CHANNEL + | a2dp.SbcMediaCodecInformation.ChannelMode.STEREO + | a2dp.SbcMediaCodecInformation.ChannelMode.JOINT_STEREO, + a2dp.SbcMediaCodecInformation.BlockLength.BL_4 + | a2dp.SbcMediaCodecInformation.BlockLength.BL_8 + | a2dp.SbcMediaCodecInformation.BlockLength.BL_12 + | a2dp.SbcMediaCodecInformation.BlockLength.BL_16, + a2dp.SbcMediaCodecInformation.Subbands.S_4 + | a2dp.SbcMediaCodecInformation.Subbands.S_8, + a2dp.SbcMediaCodecInformation.AllocationMethod.SNR + | a2dp.SbcMediaCodecInformation.AllocationMethod.LOUDNESS, 2, 53, ) @@ -329,36 +347,36 @@ def test_sbc_codec_specific_information(): # ----------------------------------------------------------------------------- def test_aac_codec_specific_information(): - aac_info = AacMediaCodecInformation.from_bytes(bytes.fromhex("f0018c83e800")) + aac_info = a2dp.AacMediaCodecInformation.from_bytes(bytes.fromhex("f0018c83e800")) assert ( aac_info.object_type - == AacMediaCodecInformation.ObjectType.MPEG_2_AAC_LC - | AacMediaCodecInformation.ObjectType.MPEG_4_AAC_LC - | AacMediaCodecInformation.ObjectType.MPEG_4_AAC_LTP - | AacMediaCodecInformation.ObjectType.MPEG_4_AAC_SCALABLE + == a2dp.AacMediaCodecInformation.ObjectType.MPEG_2_AAC_LC + | a2dp.AacMediaCodecInformation.ObjectType.MPEG_4_AAC_LC + | a2dp.AacMediaCodecInformation.ObjectType.MPEG_4_AAC_LTP + | a2dp.AacMediaCodecInformation.ObjectType.MPEG_4_AAC_SCALABLE ) assert ( aac_info.sampling_frequency - == AacMediaCodecInformation.SamplingFrequency.SF_44100 - | AacMediaCodecInformation.SamplingFrequency.SF_48000 + == a2dp.AacMediaCodecInformation.SamplingFrequency.SF_44100 + | a2dp.AacMediaCodecInformation.SamplingFrequency.SF_48000 ) assert ( aac_info.channels - == AacMediaCodecInformation.Channels.MONO - | AacMediaCodecInformation.Channels.STEREO + == a2dp.AacMediaCodecInformation.Channels.MONO + | a2dp.AacMediaCodecInformation.Channels.STEREO ) assert aac_info.vbr == 1 assert aac_info.bitrate == 256000 - aac_info2 = AacMediaCodecInformation( - AacMediaCodecInformation.ObjectType.MPEG_2_AAC_LC - | AacMediaCodecInformation.ObjectType.MPEG_4_AAC_LC - | AacMediaCodecInformation.ObjectType.MPEG_4_AAC_LTP - | AacMediaCodecInformation.ObjectType.MPEG_4_AAC_SCALABLE, - AacMediaCodecInformation.SamplingFrequency.SF_44100 - | AacMediaCodecInformation.SamplingFrequency.SF_48000, - AacMediaCodecInformation.Channels.MONO - | AacMediaCodecInformation.Channels.STEREO, + aac_info2 = a2dp.AacMediaCodecInformation( + a2dp.AacMediaCodecInformation.ObjectType.MPEG_2_AAC_LC + | a2dp.AacMediaCodecInformation.ObjectType.MPEG_4_AAC_LC + | a2dp.AacMediaCodecInformation.ObjectType.MPEG_4_AAC_LTP + | a2dp.AacMediaCodecInformation.ObjectType.MPEG_4_AAC_SCALABLE, + a2dp.AacMediaCodecInformation.SamplingFrequency.SF_44100 + | a2dp.AacMediaCodecInformation.SamplingFrequency.SF_48000, + a2dp.AacMediaCodecInformation.Channels.MONO + | a2dp.AacMediaCodecInformation.Channels.STEREO, 1, 256000, ) @@ -368,25 +386,159 @@ def test_aac_codec_specific_information(): # ----------------------------------------------------------------------------- def test_opus_codec_specific_information(): - opus_info = OpusMediaCodecInformation.from_bytes(bytes([0x92])) - assert opus_info.vendor_id == OpusMediaCodecInformation.VENDOR_ID - assert opus_info.codec_id == OpusMediaCodecInformation.CODEC_ID - assert opus_info.frame_size == OpusMediaCodecInformation.FrameSize.FS_20MS - assert opus_info.channel_mode == OpusMediaCodecInformation.ChannelMode.STEREO + opus_info = a2dp.OpusMediaCodecInformation.from_bytes(bytes([0x92])) + assert opus_info.vendor_id == a2dp.OpusMediaCodecInformation.VENDOR_ID + assert opus_info.codec_id == a2dp.OpusMediaCodecInformation.CODEC_ID + assert opus_info.frame_size == a2dp.OpusMediaCodecInformation.FrameSize.FS_20MS + assert opus_info.channel_mode == a2dp.OpusMediaCodecInformation.ChannelMode.STEREO assert ( opus_info.sampling_frequency - == OpusMediaCodecInformation.SamplingFrequency.SF_48000 + == a2dp.OpusMediaCodecInformation.SamplingFrequency.SF_48000 ) - opus_info2 = OpusMediaCodecInformation( - OpusMediaCodecInformation.ChannelMode.STEREO, - OpusMediaCodecInformation.FrameSize.FS_20MS, - OpusMediaCodecInformation.SamplingFrequency.SF_48000, + opus_info2 = a2dp.OpusMediaCodecInformation( + a2dp.OpusMediaCodecInformation.ChannelMode.STEREO, + a2dp.OpusMediaCodecInformation.FrameSize.FS_20MS, + a2dp.OpusMediaCodecInformation.SamplingFrequency.SF_48000, ) assert opus_info2 == opus_info assert opus_info2.value == bytes([0x92]) +# ----------------------------------------------------------------------------- +async def test_sbc_parser(): + header = b'\x9c\x80\x08\x00' + payload = b'\x00\x00\x00\x00\x00\x00' + data = Data(header + payload) + + parser = a2dp.SbcParser(data.read) + async for frame in parser.frames: + assert frame.sampling_frequency == 44100 + assert frame.block_count == 4 + assert frame.channel_mode == 0 + assert frame.allocation_method == 0 + assert frame.subband_count == 4 + assert frame.bitpool == 8 + assert frame.payload == header + payload + + +# ----------------------------------------------------------------------------- +async def test_sbc_packet_source(): + header = b'\x9c\x80\x08\x00' + payload = b'\x00\x00\x00\x00\x00\x00' + data = Data((header + payload) * 2) + + packet_source = a2dp.SbcPacketSource(data.read, 23) + async for packet in packet_source.packets: + assert packet.sequence_number == 0 + assert packet.timestamp == 0 + assert packet.payload == b'\x01' + header + payload + + +# ----------------------------------------------------------------------------- +async def test_aac_parser(): + header = b'\xff\xf0\x10\x00\x01\xa0\x00' + payload = b'\x00\x00\x00\x00\x00\x00' + data = Data(header + payload) + + parser = a2dp.AacParser(data.read) + async for frame in parser.frames: + assert frame.profile == a2dp.AacFrame.Profile.MAIN + assert frame.sampling_frequency == 44100 + assert frame.channel_configuration == 0 + assert frame.payload == payload + + +# ----------------------------------------------------------------------------- +async def test_aac_packet_source(): + header = b'\xff\xf0\x10\x00\x01\xa0\x00' + payload = b'\x00\x00\x00\x00\x00\x00' + data = Data(header + payload) + + packet_source = a2dp.AacPacketSource(data.read, 0) + async for packet in packet_source.packets: + assert packet.sequence_number == 0 + assert packet.timestamp == 0 + assert packet.payload == b' \x00\x12\x00\x00\x000\x00\x00\x00\x00\x00\x00' + + +# ----------------------------------------------------------------------------- +async def test_opus_parser(): + packed_header_data_revised = struct.pack( + "