mirror of
https://github.com/google/bumble.git
synced 2026-04-16 00:25:31 +00:00
Merge pull request #799 from zxzxwu/avdtp
Migrate AVDTP packets to dataclasses
This commit is contained in:
@@ -21,11 +21,12 @@ import dataclasses
|
||||
import enum
|
||||
import logging
|
||||
import struct
|
||||
from collections.abc import AsyncGenerator
|
||||
from typing import Awaitable, Callable
|
||||
from collections.abc import AsyncGenerator, Awaitable, Callable
|
||||
from typing import Union
|
||||
|
||||
from typing_extensions import ClassVar, Self
|
||||
|
||||
from bumble import utils
|
||||
from bumble.codecs import AacAudioRtpPacket
|
||||
from bumble.company_ids import COMPANY_IDENTIFIERS
|
||||
from bumble.core import (
|
||||
@@ -59,19 +60,18 @@ logger = logging.getLogger(__name__)
|
||||
# -----------------------------------------------------------------------------
|
||||
# fmt: off
|
||||
|
||||
A2DP_SBC_CODEC_TYPE = 0x00
|
||||
A2DP_MPEG_1_2_AUDIO_CODEC_TYPE = 0x01
|
||||
A2DP_MPEG_2_4_AAC_CODEC_TYPE = 0x02
|
||||
A2DP_ATRAC_FAMILY_CODEC_TYPE = 0x03
|
||||
A2DP_NON_A2DP_CODEC_TYPE = 0xFF
|
||||
class CodecType(utils.OpenIntEnum):
|
||||
SBC = 0x00
|
||||
MPEG_1_2_AUDIO = 0x01
|
||||
MPEG_2_4_AAC = 0x02
|
||||
ATRAC_FAMILY = 0x03
|
||||
NON_A2DP = 0xFF
|
||||
|
||||
A2DP_CODEC_TYPE_NAMES = {
|
||||
A2DP_SBC_CODEC_TYPE: 'A2DP_SBC_CODEC_TYPE',
|
||||
A2DP_MPEG_1_2_AUDIO_CODEC_TYPE: 'A2DP_MPEG_1_2_AUDIO_CODEC_TYPE',
|
||||
A2DP_MPEG_2_4_AAC_CODEC_TYPE: 'A2DP_MPEG_2_4_AAC_CODEC_TYPE',
|
||||
A2DP_ATRAC_FAMILY_CODEC_TYPE: 'A2DP_ATRAC_FAMILY_CODEC_TYPE',
|
||||
A2DP_NON_A2DP_CODEC_TYPE: 'A2DP_NON_A2DP_CODEC_TYPE'
|
||||
}
|
||||
A2DP_SBC_CODEC_TYPE = CodecType.SBC
|
||||
A2DP_MPEG_1_2_AUDIO_CODEC_TYPE = CodecType.MPEG_1_2_AUDIO
|
||||
A2DP_MPEG_2_4_AAC_CODEC_TYPE = CodecType.MPEG_2_4_AAC
|
||||
A2DP_ATRAC_FAMILY_CODEC_TYPE = CodecType.ATRAC_FAMILY
|
||||
A2DP_NON_A2DP_CODEC_TYPE = CodecType.NON_A2DP
|
||||
|
||||
|
||||
SBC_SYNC_WORD = 0x9C
|
||||
@@ -259,9 +259,48 @@ def make_audio_sink_service_sdp_records(service_record_handle, version=(1, 3)):
|
||||
]
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class MediaCodecInformation:
|
||||
'''Base Media Codec Information.'''
|
||||
|
||||
@classmethod
|
||||
def create(
|
||||
cls, media_codec_type: int, data: bytes
|
||||
) -> Union[MediaCodecInformation, bytes]:
|
||||
if media_codec_type == CodecType.SBC:
|
||||
return SbcMediaCodecInformation.from_bytes(data)
|
||||
elif media_codec_type == CodecType.MPEG_2_4_AAC:
|
||||
return AacMediaCodecInformation.from_bytes(data)
|
||||
elif media_codec_type == CodecType.NON_A2DP:
|
||||
vendor_media_codec_information = (
|
||||
VendorSpecificMediaCodecInformation.from_bytes(data)
|
||||
)
|
||||
if (
|
||||
vendor_class_map := A2DP_VENDOR_MEDIA_CODEC_INFORMATION_CLASSES.get(
|
||||
vendor_media_codec_information.vendor_id
|
||||
)
|
||||
) and (
|
||||
media_codec_information_class := vendor_class_map.get(
|
||||
vendor_media_codec_information.codec_id
|
||||
)
|
||||
):
|
||||
return media_codec_information_class.from_bytes(
|
||||
vendor_media_codec_information.value
|
||||
)
|
||||
return vendor_media_codec_information
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes) -> Self:
|
||||
del data # Unused.
|
||||
raise NotImplementedError
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@dataclasses.dataclass
|
||||
class SbcMediaCodecInformation:
|
||||
class SbcMediaCodecInformation(MediaCodecInformation):
|
||||
'''
|
||||
A2DP spec - 4.3.2 Codec Specific Information Elements
|
||||
'''
|
||||
@@ -345,7 +384,7 @@ class SbcMediaCodecInformation:
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@dataclasses.dataclass
|
||||
class AacMediaCodecInformation:
|
||||
class AacMediaCodecInformation(MediaCodecInformation):
|
||||
'''
|
||||
A2DP spec - 4.5.2 Codec Specific Information Elements
|
||||
'''
|
||||
@@ -427,7 +466,7 @@ class AacMediaCodecInformation:
|
||||
|
||||
@dataclasses.dataclass
|
||||
# -----------------------------------------------------------------------------
|
||||
class VendorSpecificMediaCodecInformation:
|
||||
class VendorSpecificMediaCodecInformation(MediaCodecInformation):
|
||||
'''
|
||||
A2DP spec - 4.7.2 Codec Specific Information Elements
|
||||
'''
|
||||
|
||||
698
bumble/avdtp.py
698
bumble/avdtp.py
File diff suppressed because it is too large
Load Diff
@@ -25,7 +25,6 @@ import pytest
|
||||
|
||||
from bumble import a2dp
|
||||
from bumble.avdtp import (
|
||||
A2DP_SBC_CODEC_TYPE,
|
||||
AVDTP_AUDIO_MEDIA_TYPE,
|
||||
AVDTP_IDLE_STATE,
|
||||
AVDTP_STREAMING_STATE,
|
||||
@@ -137,7 +136,7 @@ async def test_self_connection():
|
||||
def source_codec_capabilities():
|
||||
return MediaCodecCapabilities(
|
||||
media_type=AVDTP_AUDIO_MEDIA_TYPE,
|
||||
media_codec_type=A2DP_SBC_CODEC_TYPE,
|
||||
media_codec_type=a2dp.CodecType.SBC,
|
||||
media_codec_information=a2dp.SbcMediaCodecInformation(
|
||||
sampling_frequency=a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_44100,
|
||||
channel_mode=a2dp.SbcMediaCodecInformation.ChannelMode.JOINT_STEREO,
|
||||
@@ -154,7 +153,7 @@ def source_codec_capabilities():
|
||||
def sink_codec_capabilities():
|
||||
return MediaCodecCapabilities(
|
||||
media_type=AVDTP_AUDIO_MEDIA_TYPE,
|
||||
media_codec_type=A2DP_SBC_CODEC_TYPE,
|
||||
media_codec_type=a2dp.CodecType.SBC,
|
||||
media_codec_information=a2dp.SbcMediaCodecInformation(
|
||||
sampling_frequency=a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_48000
|
||||
| a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_44100
|
||||
|
||||
@@ -15,43 +15,105 @@
|
||||
# -----------------------------------------------------------------------------
|
||||
# Imports
|
||||
# -----------------------------------------------------------------------------
|
||||
import pytest
|
||||
|
||||
from bumble import avdtp
|
||||
from bumble.a2dp import A2DP_SBC_CODEC_TYPE
|
||||
from bumble.avdtp import (
|
||||
AVDTP_AUDIO_MEDIA_TYPE,
|
||||
AVDTP_DELAY_REPORTING_SERVICE_CATEGORY,
|
||||
AVDTP_GET_CAPABILITIES,
|
||||
AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY,
|
||||
AVDTP_SET_CONFIGURATION,
|
||||
Get_Capabilities_Response,
|
||||
MediaCodecCapabilities,
|
||||
Message,
|
||||
ServiceCapabilities,
|
||||
Set_Configuration_Command,
|
||||
)
|
||||
from bumble.rtp import MediaPacket
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_messages():
|
||||
capabilities = [
|
||||
ServiceCapabilities(AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY),
|
||||
MediaCodecCapabilities(
|
||||
media_type=AVDTP_AUDIO_MEDIA_TYPE,
|
||||
media_codec_type=A2DP_SBC_CODEC_TYPE,
|
||||
media_codec_information=bytes.fromhex('211502fa'),
|
||||
@pytest.mark.parametrize(
|
||||
'message',
|
||||
(
|
||||
avdtp.Discover_Command(),
|
||||
avdtp.Discover_Response(
|
||||
endpoints=[
|
||||
avdtp.EndPointInfo(
|
||||
seid=1, in_use=1, media_type=avdtp.MediaType.AUDIO, tsep=1
|
||||
)
|
||||
]
|
||||
),
|
||||
ServiceCapabilities(AVDTP_DELAY_REPORTING_SERVICE_CATEGORY),
|
||||
]
|
||||
message = Get_Capabilities_Response(capabilities)
|
||||
parsed = Message.create(
|
||||
AVDTP_GET_CAPABILITIES, Message.MessageType.RESPONSE_ACCEPT, message.payload
|
||||
)
|
||||
assert message.payload == parsed.payload
|
||||
|
||||
message = Set_Configuration_Command(3, 4, capabilities)
|
||||
parsed = Message.create(
|
||||
AVDTP_SET_CONFIGURATION, Message.MessageType.COMMAND, message.payload
|
||||
avdtp.Get_Capabilities_Command(acp_seid=1),
|
||||
avdtp.Get_Capabilities_Response(
|
||||
capabilities=[
|
||||
avdtp.ServiceCapabilities(avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY),
|
||||
avdtp.MediaCodecCapabilities(
|
||||
media_type=avdtp.AVDTP_AUDIO_MEDIA_TYPE,
|
||||
media_codec_type=A2DP_SBC_CODEC_TYPE,
|
||||
media_codec_information=bytes.fromhex('211502fa'),
|
||||
),
|
||||
avdtp.ServiceCapabilities(avdtp.AVDTP_DELAY_REPORTING_SERVICE_CATEGORY),
|
||||
]
|
||||
),
|
||||
avdtp.Get_Capabilities_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR),
|
||||
avdtp.Get_All_Capabilities_Command(acp_seid=1),
|
||||
avdtp.Get_All_Capabilities_Response(
|
||||
capabilities=[
|
||||
avdtp.ServiceCapabilities(avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY)
|
||||
]
|
||||
),
|
||||
avdtp.Get_All_Capabilities_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR),
|
||||
avdtp.Set_Configuration_Command(
|
||||
acp_seid=1,
|
||||
int_seid=2,
|
||||
capabilities=[
|
||||
avdtp.ServiceCapabilities(avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY)
|
||||
],
|
||||
),
|
||||
avdtp.Set_Configuration_Response(),
|
||||
avdtp.Set_Configuration_Reject(
|
||||
service_category=avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY,
|
||||
error_code=avdtp.AVDTP_UNSUPPORTED_CONFIGURATION_ERROR,
|
||||
),
|
||||
avdtp.Get_Configuration_Command(acp_seid=1),
|
||||
avdtp.Get_Configuration_Response(
|
||||
capabilities=[
|
||||
avdtp.ServiceCapabilities(avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY)
|
||||
]
|
||||
),
|
||||
avdtp.Get_Configuration_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR),
|
||||
avdtp.Reconfigure_Command(
|
||||
acp_seid=1,
|
||||
capabilities=[
|
||||
avdtp.ServiceCapabilities(avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY)
|
||||
],
|
||||
),
|
||||
avdtp.Reconfigure_Response(),
|
||||
avdtp.Reconfigure_Reject(
|
||||
service_category=avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY,
|
||||
error_code=avdtp.AVDTP_UNSUPPORTED_CONFIGURATION_ERROR,
|
||||
),
|
||||
avdtp.Open_Command(acp_seid=1),
|
||||
avdtp.Open_Response(),
|
||||
avdtp.Open_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR),
|
||||
avdtp.Start_Command(acp_seids=[1, 2]),
|
||||
avdtp.Start_Response(),
|
||||
avdtp.Start_Reject(acp_seid=1, error_code=avdtp.AVDTP_BAD_STATE_ERROR),
|
||||
avdtp.Close_Command(acp_seid=1),
|
||||
avdtp.Close_Response(),
|
||||
avdtp.Close_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR),
|
||||
avdtp.Suspend_Command(acp_seids=[1, 2]),
|
||||
avdtp.Suspend_Response(),
|
||||
avdtp.Suspend_Reject(acp_seid=1, error_code=avdtp.AVDTP_BAD_STATE_ERROR),
|
||||
avdtp.Abort_Command(acp_seid=1),
|
||||
avdtp.Abort_Response(),
|
||||
avdtp.Security_Control_Command(acp_seid=1, data=b'foo'),
|
||||
avdtp.Security_Control_Response(),
|
||||
avdtp.Security_Control_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR),
|
||||
avdtp.General_Reject(),
|
||||
avdtp.DelayReport_Command(acp_seid=1, delay=100),
|
||||
avdtp.DelayReport_Response(),
|
||||
avdtp.DelayReport_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR),
|
||||
),
|
||||
)
|
||||
def test_messages(message: avdtp.Message):
|
||||
parsed = avdtp.Message.create(
|
||||
signal_identifier=message.signal_identifier,
|
||||
message_type=message.message_type,
|
||||
payload=message.payload,
|
||||
)
|
||||
assert message == parsed
|
||||
assert message.payload == parsed.payload
|
||||
|
||||
|
||||
@@ -62,9 +124,3 @@ def test_rtp():
|
||||
)
|
||||
media_packet = MediaPacket.from_bytes(packet)
|
||||
print(media_packet)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
if __name__ == '__main__':
|
||||
test_messages()
|
||||
test_rtp()
|
||||
|
||||
Reference in New Issue
Block a user