Migrate AVDTP packets to dataclasses

This commit is contained in:
Josh Wu
2025-10-20 16:06:14 +08:00
parent 0fa517a4f6
commit a43b403511
4 changed files with 524 additions and 380 deletions

View File

@@ -25,7 +25,6 @@ import pytest
from bumble import a2dp
from bumble.avdtp import (
A2DP_SBC_CODEC_TYPE,
AVDTP_AUDIO_MEDIA_TYPE,
AVDTP_IDLE_STATE,
AVDTP_STREAMING_STATE,
@@ -137,7 +136,7 @@ async def test_self_connection():
def source_codec_capabilities():
return MediaCodecCapabilities(
media_type=AVDTP_AUDIO_MEDIA_TYPE,
media_codec_type=A2DP_SBC_CODEC_TYPE,
media_codec_type=a2dp.CodecType.SBC,
media_codec_information=a2dp.SbcMediaCodecInformation(
sampling_frequency=a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_44100,
channel_mode=a2dp.SbcMediaCodecInformation.ChannelMode.JOINT_STEREO,
@@ -154,7 +153,7 @@ def source_codec_capabilities():
def sink_codec_capabilities():
return MediaCodecCapabilities(
media_type=AVDTP_AUDIO_MEDIA_TYPE,
media_codec_type=A2DP_SBC_CODEC_TYPE,
media_codec_type=a2dp.CodecType.SBC,
media_codec_information=a2dp.SbcMediaCodecInformation(
sampling_frequency=a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_48000
| a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_44100

View File

@@ -15,43 +15,105 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import pytest
from bumble import avdtp
from bumble.a2dp import A2DP_SBC_CODEC_TYPE
from bumble.avdtp import (
AVDTP_AUDIO_MEDIA_TYPE,
AVDTP_DELAY_REPORTING_SERVICE_CATEGORY,
AVDTP_GET_CAPABILITIES,
AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY,
AVDTP_SET_CONFIGURATION,
Get_Capabilities_Response,
MediaCodecCapabilities,
Message,
ServiceCapabilities,
Set_Configuration_Command,
)
from bumble.rtp import MediaPacket
# -----------------------------------------------------------------------------
def test_messages():
capabilities = [
ServiceCapabilities(AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY),
MediaCodecCapabilities(
media_type=AVDTP_AUDIO_MEDIA_TYPE,
media_codec_type=A2DP_SBC_CODEC_TYPE,
media_codec_information=bytes.fromhex('211502fa'),
@pytest.mark.parametrize(
'message',
(
avdtp.Discover_Command(),
avdtp.Discover_Response(
endpoints=[
avdtp.EndPointInfo(
seid=1, in_use=1, media_type=avdtp.MediaType.AUDIO, tsep=1
)
]
),
ServiceCapabilities(AVDTP_DELAY_REPORTING_SERVICE_CATEGORY),
]
message = Get_Capabilities_Response(capabilities)
parsed = Message.create(
AVDTP_GET_CAPABILITIES, Message.MessageType.RESPONSE_ACCEPT, message.payload
)
assert message.payload == parsed.payload
message = Set_Configuration_Command(3, 4, capabilities)
parsed = Message.create(
AVDTP_SET_CONFIGURATION, Message.MessageType.COMMAND, message.payload
avdtp.Get_Capabilities_Command(acp_seid=1),
avdtp.Get_Capabilities_Response(
capabilities=[
avdtp.ServiceCapabilities(avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY),
avdtp.MediaCodecCapabilities(
media_type=avdtp.AVDTP_AUDIO_MEDIA_TYPE,
media_codec_type=A2DP_SBC_CODEC_TYPE,
media_codec_information=bytes.fromhex('211502fa'),
),
avdtp.ServiceCapabilities(avdtp.AVDTP_DELAY_REPORTING_SERVICE_CATEGORY),
]
),
avdtp.Get_Capabilities_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR),
avdtp.Get_All_Capabilities_Command(acp_seid=1),
avdtp.Get_All_Capabilities_Response(
capabilities=[
avdtp.ServiceCapabilities(avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY)
]
),
avdtp.Get_All_Capabilities_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR),
avdtp.Set_Configuration_Command(
acp_seid=1,
int_seid=2,
capabilities=[
avdtp.ServiceCapabilities(avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY)
],
),
avdtp.Set_Configuration_Response(),
avdtp.Set_Configuration_Reject(
service_category=avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY,
error_code=avdtp.AVDTP_UNSUPPORTED_CONFIGURATION_ERROR,
),
avdtp.Get_Configuration_Command(acp_seid=1),
avdtp.Get_Configuration_Response(
capabilities=[
avdtp.ServiceCapabilities(avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY)
]
),
avdtp.Get_Configuration_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR),
avdtp.Reconfigure_Command(
acp_seid=1,
capabilities=[
avdtp.ServiceCapabilities(avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY)
],
),
avdtp.Reconfigure_Response(),
avdtp.Reconfigure_Reject(
service_category=avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY,
error_code=avdtp.AVDTP_UNSUPPORTED_CONFIGURATION_ERROR,
),
avdtp.Open_Command(acp_seid=1),
avdtp.Open_Response(),
avdtp.Open_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR),
avdtp.Start_Command(acp_seids=[1, 2]),
avdtp.Start_Response(),
avdtp.Start_Reject(acp_seid=1, error_code=avdtp.AVDTP_BAD_STATE_ERROR),
avdtp.Close_Command(acp_seid=1),
avdtp.Close_Response(),
avdtp.Close_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR),
avdtp.Suspend_Command(acp_seids=[1, 2]),
avdtp.Suspend_Response(),
avdtp.Suspend_Reject(acp_seid=1, error_code=avdtp.AVDTP_BAD_STATE_ERROR),
avdtp.Abort_Command(acp_seid=1),
avdtp.Abort_Response(),
avdtp.Security_Control_Command(acp_seid=1, data=b'foo'),
avdtp.Security_Control_Response(),
avdtp.Security_Control_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR),
avdtp.General_Reject(),
avdtp.DelayReport_Command(acp_seid=1, delay=100),
avdtp.DelayReport_Response(),
avdtp.DelayReport_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR),
),
)
def test_messages(message: avdtp.Message):
parsed = avdtp.Message.create(
signal_identifier=message.signal_identifier,
message_type=message.message_type,
payload=message.payload,
)
assert message == parsed
assert message.payload == parsed.payload
@@ -62,9 +124,3 @@ def test_rtp():
)
media_packet = MediaPacket.from_bytes(packet)
print(media_packet)
# -----------------------------------------------------------------------------
if __name__ == '__main__':
test_messages()
test_rtp()