diff --git a/bumble/a2dp.py b/bumble/a2dp.py index 4af6d335..7b1c8641 100644 --- a/bumble/a2dp.py +++ b/bumble/a2dp.py @@ -21,11 +21,12 @@ import dataclasses import enum import logging import struct -from collections.abc import AsyncGenerator -from typing import Awaitable, Callable +from collections.abc import AsyncGenerator, Awaitable, Callable +from typing import Union from typing_extensions import ClassVar, Self +from bumble import utils from bumble.codecs import AacAudioRtpPacket from bumble.company_ids import COMPANY_IDENTIFIERS from bumble.core import ( @@ -59,19 +60,18 @@ logger = logging.getLogger(__name__) # ----------------------------------------------------------------------------- # fmt: off -A2DP_SBC_CODEC_TYPE = 0x00 -A2DP_MPEG_1_2_AUDIO_CODEC_TYPE = 0x01 -A2DP_MPEG_2_4_AAC_CODEC_TYPE = 0x02 -A2DP_ATRAC_FAMILY_CODEC_TYPE = 0x03 -A2DP_NON_A2DP_CODEC_TYPE = 0xFF +class CodecType(utils.OpenIntEnum): + SBC = 0x00 + MPEG_1_2_AUDIO = 0x01 + MPEG_2_4_AAC = 0x02 + ATRAC_FAMILY = 0x03 + NON_A2DP = 0xFF -A2DP_CODEC_TYPE_NAMES = { - A2DP_SBC_CODEC_TYPE: 'A2DP_SBC_CODEC_TYPE', - A2DP_MPEG_1_2_AUDIO_CODEC_TYPE: 'A2DP_MPEG_1_2_AUDIO_CODEC_TYPE', - A2DP_MPEG_2_4_AAC_CODEC_TYPE: 'A2DP_MPEG_2_4_AAC_CODEC_TYPE', - A2DP_ATRAC_FAMILY_CODEC_TYPE: 'A2DP_ATRAC_FAMILY_CODEC_TYPE', - A2DP_NON_A2DP_CODEC_TYPE: 'A2DP_NON_A2DP_CODEC_TYPE' -} +A2DP_SBC_CODEC_TYPE = CodecType.SBC +A2DP_MPEG_1_2_AUDIO_CODEC_TYPE = CodecType.MPEG_1_2_AUDIO +A2DP_MPEG_2_4_AAC_CODEC_TYPE = CodecType.MPEG_2_4_AAC +A2DP_ATRAC_FAMILY_CODEC_TYPE = CodecType.ATRAC_FAMILY +A2DP_NON_A2DP_CODEC_TYPE = CodecType.NON_A2DP SBC_SYNC_WORD = 0x9C @@ -259,9 +259,48 @@ def make_audio_sink_service_sdp_records(service_record_handle, version=(1, 3)): ] +# ----------------------------------------------------------------------------- +class MediaCodecInformation: + '''Base Media Codec Information.''' + + @classmethod + def create( + cls, media_codec_type: int, data: bytes + ) -> Union[MediaCodecInformation, bytes]: + if media_codec_type == CodecType.SBC: + return SbcMediaCodecInformation.from_bytes(data) + elif media_codec_type == CodecType.MPEG_2_4_AAC: + return AacMediaCodecInformation.from_bytes(data) + elif media_codec_type == CodecType.NON_A2DP: + vendor_media_codec_information = ( + VendorSpecificMediaCodecInformation.from_bytes(data) + ) + if ( + vendor_class_map := A2DP_VENDOR_MEDIA_CODEC_INFORMATION_CLASSES.get( + vendor_media_codec_information.vendor_id + ) + ) and ( + media_codec_information_class := vendor_class_map.get( + vendor_media_codec_information.codec_id + ) + ): + return media_codec_information_class.from_bytes( + vendor_media_codec_information.value + ) + return vendor_media_codec_information + + @classmethod + def from_bytes(cls, data: bytes) -> Self: + del data # Unused. + raise NotImplementedError + + def __bytes__(self) -> bytes: + raise NotImplementedError + + # ----------------------------------------------------------------------------- @dataclasses.dataclass -class SbcMediaCodecInformation: +class SbcMediaCodecInformation(MediaCodecInformation): ''' A2DP spec - 4.3.2 Codec Specific Information Elements ''' @@ -345,7 +384,7 @@ class SbcMediaCodecInformation: # ----------------------------------------------------------------------------- @dataclasses.dataclass -class AacMediaCodecInformation: +class AacMediaCodecInformation(MediaCodecInformation): ''' A2DP spec - 4.5.2 Codec Specific Information Elements ''' @@ -427,7 +466,7 @@ class AacMediaCodecInformation: @dataclasses.dataclass # ----------------------------------------------------------------------------- -class VendorSpecificMediaCodecInformation: +class VendorSpecificMediaCodecInformation(MediaCodecInformation): ''' A2DP spec - 4.7.2 Codec Specific Information Elements ''' diff --git a/bumble/avdtp.py b/bumble/avdtp.py index d2fc7b7c..ebc264f4 100644 --- a/bumble/avdtp.py +++ b/bumble/avdtp.py @@ -22,33 +22,23 @@ import enum import logging import time import warnings +from collections.abc import AsyncGenerator, Awaitable, Iterable +from dataclasses import dataclass, field from typing import ( Any, - AsyncGenerator, - Awaitable, Callable, - Iterable, + ClassVar, Optional, SupportsBytes, + TypeVar, Union, cast, ) -from bumble import device, l2cap, sdp, utils -from bumble.a2dp import ( - A2DP_CODEC_TYPE_NAMES, - A2DP_MPEG_2_4_AAC_CODEC_TYPE, - A2DP_NON_A2DP_CODEC_TYPE, - A2DP_SBC_CODEC_TYPE, - A2DP_VENDOR_MEDIA_CODEC_INFORMATION_CLASSES, - AacMediaCodecInformation, - SbcMediaCodecInformation, - VendorSpecificMediaCodecInformation, -) +from bumble import a2dp, device, hci, l2cap, sdp, utils from bumble.colors import color from bumble.core import ( BT_ADVANCED_AUDIO_DISTRIBUTION_SERVICE, - InvalidArgumentError, InvalidStateError, ProtocolError, name_or_number, @@ -102,22 +92,6 @@ AVDTP_SIGNAL_NAMES = { AVDTP_DELAYREPORT: 'AVDTP_DELAYREPORT' } -AVDTP_SIGNAL_IDENTIFIERS = { - 'AVDTP_DISCOVER': AVDTP_DISCOVER, - 'AVDTP_GET_CAPABILITIES': AVDTP_GET_CAPABILITIES, - 'AVDTP_SET_CONFIGURATION': AVDTP_SET_CONFIGURATION, - 'AVDTP_GET_CONFIGURATION': AVDTP_GET_CONFIGURATION, - 'AVDTP_RECONFIGURE': AVDTP_RECONFIGURE, - 'AVDTP_OPEN': AVDTP_OPEN, - 'AVDTP_START': AVDTP_START, - 'AVDTP_CLOSE': AVDTP_CLOSE, - 'AVDTP_SUSPEND': AVDTP_SUSPEND, - 'AVDTP_ABORT': AVDTP_ABORT, - 'AVDTP_SECURITY_CONTROL': AVDTP_SECURITY_CONTROL, - 'AVDTP_GET_ALL_CAPABILITIES': AVDTP_GET_ALL_CAPABILITIES, - 'AVDTP_DELAYREPORT': AVDTP_DELAYREPORT -} - # Error codes (AVDTP spec - 8.20.6.2 ERROR_CODE tables) AVDTP_BAD_HEADER_FORMAT_ERROR = 0x01 AVDTP_BAD_LENGTH_ERROR = 0x11 @@ -157,15 +131,14 @@ AVDTP_ERROR_NAMES = { AVDTP_BAD_STATE_ERROR: 'AVDTP_BAD_STATE_ERROR' } -AVDTP_AUDIO_MEDIA_TYPE = 0x00 -AVDTP_VIDEO_MEDIA_TYPE = 0x01 -AVDTP_MULTIMEDIA_MEDIA_TYPE = 0x02 +class MediaType(utils.OpenIntEnum): + AUDIO = 0x00 + VIDEO = 0x01 + MULTIMEDIA = 0x02 -AVDTP_MEDIA_TYPE_NAMES = { - AVDTP_AUDIO_MEDIA_TYPE: 'AVDTP_AUDIO_MEDIA_TYPE', - AVDTP_VIDEO_MEDIA_TYPE: 'AVDTP_VIDEO_MEDIA_TYPE', - AVDTP_MULTIMEDIA_MEDIA_TYPE: 'AVDTP_MULTIMEDIA_MEDIA_TYPE' -} +AVDTP_AUDIO_MEDIA_TYPE = MediaType.AUDIO +AVDTP_VIDEO_MEDIA_TYPE = MediaType.VIDEO +AVDTP_MULTIMEDIA_MEDIA_TYPE = MediaType.MULTIMEDIA # TSEP (AVDTP spec - 8.20.3 Stream End-point Type, Source or Sink (TSEP)) AVDTP_TSEP_SRC = 0x00 @@ -440,150 +413,126 @@ class MessageAssembler: # ----------------------------------------------------------------------------- +@dataclass class ServiceCapabilities: - @staticmethod + METADATA = hci.metadata( + { + 'parser': lambda data, offset: ( + len(data), + ServiceCapabilities.parse_capabilities(data[offset:]), + ), + 'serializer': lambda capabilities: ServiceCapabilities.serialize_capabilities( + capabilities + ), + } + ) + service_category: int + service_capabilities_bytes: bytes = b'' + + @classmethod def create( - service_category: int, service_capabilities_bytes: bytes + cls, service_category: int, service_capabilities_bytes: bytes ) -> ServiceCapabilities: # Select the appropriate subclass - cls: type[ServiceCapabilities] if service_category == AVDTP_MEDIA_CODEC_SERVICE_CATEGORY: - cls = MediaCodecCapabilities - else: - cls = ServiceCapabilities + return MediaCodecCapabilities.from_bytes(service_capabilities_bytes) + return ServiceCapabilities( + service_category=service_category, + service_capabilities_bytes=service_capabilities_bytes, + ) - # Create an instance and initialize it - instance = cls.__new__(cls) - instance.service_category = service_category - instance.service_capabilities_bytes = service_capabilities_bytes - instance.init_from_bytes() - - return instance - - @staticmethod - def parse_capabilities(payload: bytes) -> list[ServiceCapabilities]: + @classmethod + def parse_capabilities(cls, payload: bytes) -> list[ServiceCapabilities]: capabilities = [] - while payload: - service_category = payload[0] - length_of_service_capabilities = payload[1] - service_capabilities_bytes = payload[2 : 2 + length_of_service_capabilities] + offset = 0 + while offset < len(payload): + service_category = payload[offset] + length_of_service_capabilities = payload[offset + 1] + service_capabilities_bytes = payload[ + offset + 2 : offset + 2 + length_of_service_capabilities + ] capabilities.append( ServiceCapabilities.create(service_category, service_capabilities_bytes) ) - - payload = payload[2 + length_of_service_capabilities :] + offset += 2 + length_of_service_capabilities return capabilities - @staticmethod - def serialize_capabilities(capabilities: Iterable[ServiceCapabilities]) -> bytes: - serialized = b'' - for item in capabilities: - serialized += ( - bytes([item.service_category, len(item.service_capabilities_bytes)]) - + item.service_capabilities_bytes - ) - return serialized - - def init_from_bytes(self) -> None: - pass - - def __init__( - self, service_category: int, service_capabilities_bytes: bytes = b'' - ) -> None: - self.service_category = service_category - self.service_capabilities_bytes = service_capabilities_bytes - - def to_string(self, details: Optional[list[str]] = None) -> str: - attributes = ','.join( - [name_or_number(AVDTP_SERVICE_CATEGORY_NAMES, self.service_category)] - + (details or []) + @classmethod + def serialize_capabilities( + cls, capabilities: Iterable[ServiceCapabilities] + ) -> bytes: + return b''.join( + bytes([item.service_category, len(item.service_capabilities_bytes)]) + + item.service_capabilities_bytes + for item in capabilities ) - return f'ServiceCapabilities({attributes})' - - def __str__(self) -> str: - if self.service_capabilities_bytes: - details = [self.service_capabilities_bytes.hex()] - else: - details = [] - return self.to_string(details) # ----------------------------------------------------------------------------- +@dataclass(init=False) class MediaCodecCapabilities(ServiceCapabilities): + service_category = AVDTP_MEDIA_CODEC_SERVICE_CATEGORY + # Redeclare this attribute to suppress inheritance error. + service_capabilities_bytes: bytes + + media_type: MediaType + media_codec_type: a2dp.CodecType media_codec_information: Union[bytes, SupportsBytes] - media_type: int - media_codec_type: int - - def init_from_bytes(self) -> None: - self.media_type = self.service_capabilities_bytes[0] - self.media_codec_type = self.service_capabilities_bytes[1] - self.media_codec_information = self.service_capabilities_bytes[2:] - - if self.media_codec_type == A2DP_SBC_CODEC_TYPE: - self.media_codec_information = SbcMediaCodecInformation.from_bytes( - self.media_codec_information - ) - elif self.media_codec_type == A2DP_MPEG_2_4_AAC_CODEC_TYPE: - self.media_codec_information = AacMediaCodecInformation.from_bytes( - self.media_codec_information - ) - elif self.media_codec_type == A2DP_NON_A2DP_CODEC_TYPE: - vendor_media_codec_information = ( - VendorSpecificMediaCodecInformation.from_bytes( - self.media_codec_information - ) - ) - if ( - vendor_class_map := A2DP_VENDOR_MEDIA_CODEC_INFORMATION_CLASSES.get( - vendor_media_codec_information.vendor_id - ) - ) and ( - media_codec_information_class := vendor_class_map.get( - vendor_media_codec_information.codec_id - ) - ): - self.media_codec_information = media_codec_information_class.from_bytes( - vendor_media_codec_information.value - ) - else: - self.media_codec_information = vendor_media_codec_information + # Override init to allow passing service_capabilities_bytes. def __init__( self, - media_type: int, - media_codec_type: int, + media_type: MediaType, + media_codec_type: a2dp.CodecType, media_codec_information: Union[bytes, SupportsBytes], + service_capabilities_bytes: Optional[bytes] = None, ) -> None: - super().__init__( - AVDTP_MEDIA_CODEC_SERVICE_CATEGORY, - bytes([media_type, media_codec_type]) + bytes(media_codec_information), - ) self.media_type = media_type self.media_codec_type = media_codec_type - self.media_codec_information = media_codec_information - def __str__(self) -> str: - codec_info = ( - self.media_codec_information.hex() - if isinstance(self.media_codec_information, bytes) - else str(self.media_codec_information) + if isinstance(media_codec_information, bytes): + self.media_codec_information = a2dp.MediaCodecInformation.create( + media_codec_type, media_codec_information + ) + else: + self.media_codec_information = media_codec_information + + if service_capabilities_bytes is not None: + self.service_capabilities_bytes = service_capabilities_bytes + else: + self.service_capabilities_bytes = bytes( + [self.media_type, self.media_codec_type] + ) + bytes(self.media_codec_information) + + @classmethod + def from_bytes(cls, data: bytes) -> ServiceCapabilities: + media_type = MediaType(data[0]) + media_codec_type = a2dp.CodecType(data[1]) + return cls( + media_type=media_type, + media_codec_type=media_codec_type, + media_codec_information=a2dp.MediaCodecInformation.create( + media_codec_type, data[2:] + ), ) - details = [ - f'media_type={name_or_number(AVDTP_MEDIA_TYPE_NAMES, self.media_type)}', - f'codec={name_or_number(A2DP_CODEC_TYPE_NAMES, self.media_codec_type)}', - f'codec_info={codec_info}', - ] - return self.to_string(details) - # ----------------------------------------------------------------------------- +@dataclass class EndPointInfo: - @staticmethod - def from_bytes(payload: bytes) -> EndPointInfo: - return EndPointInfo( - payload[0] >> 2, payload[0] >> 1 & 1, payload[1] >> 4, payload[1] >> 3 & 1 + seid: int + in_use: int + media_type: MediaType + tsep: int + + @classmethod + def from_bytes(cls, payload: bytes) -> EndPointInfo: + return cls( + seid=payload[0] >> 2, + in_use=payload[0] >> 1 & 1, + media_type=MediaType(payload[1] >> 4), + tsep=payload[1] >> 3 & 1, ) def __bytes__(self) -> bytes: @@ -591,94 +540,78 @@ class EndPointInfo: [self.seid << 2 | self.in_use << 1, self.media_type << 4 | self.tsep << 3] ) - def __init__(self, seid: int, in_use: int, media_type: int, tsep: int) -> None: - self.seid = seid - self.in_use = in_use - self.media_type = media_type - self.tsep = tsep - # ----------------------------------------------------------------------------- -class Message: # pylint:disable=attribute-defined-outside-init +class Message: class MessageType(enum.IntEnum): COMMAND = 0 GENERAL_REJECT = 1 RESPONSE_ACCEPT = 2 RESPONSE_REJECT = 3 + SEID_METADATA = hci.metadata( + { + 'serializer': lambda seid: bytes([seid << 2]), + 'parser': lambda data, offset: (offset + 1, data[offset] >> 2), + } + ) + # Subclasses, by signal identifier and message type - subclasses: dict[int, dict[int, type[Message]]] = {} + subclasses: ClassVar[dict[int, dict[int, type[Message]]]] = {} + message_type: MessageType signal_identifier: int + _payload: Optional[bytes] = None + fields: ClassVar[hci.Fields] = () - @staticmethod - def subclass(subclass): - # Infer the signal identifier and message subtype from the class name - name = subclass.__name__ - if name == 'General_Reject': - subclass.signal_identifier = 0 - signal_identifier_str = None - message_type = Message.MessageType.COMMAND - elif name.endswith('_Command'): - signal_identifier_str = name[:-8] - message_type = Message.MessageType.COMMAND - elif name.endswith('_Response'): - signal_identifier_str = name[:-9] - message_type = Message.MessageType.RESPONSE_ACCEPT - elif name.endswith('_Reject'): - signal_identifier_str = name[:-7] - message_type = Message.MessageType.RESPONSE_REJECT - else: - raise InvalidArgumentError('invalid class name') + @property + def payload(self) -> bytes: + if self._payload is None: + self._payload = hci.HCI_Object.dict_to_bytes(self.__dict__, self.fields) + return self._payload - subclass.message_type = message_type + @payload.setter + def payload(self, payload: bytes) -> None: + self._payload = payload - if signal_identifier_str is not None: - for name, signal_identifier in AVDTP_SIGNAL_IDENTIFIERS.items(): - if name.lower().endswith(signal_identifier_str.lower()): - subclass.signal_identifier = signal_identifier - break - - # Register the subclass - Message.subclasses.setdefault(subclass.signal_identifier, {})[ - subclass.message_type - ] = subclass + _Message = TypeVar("_Message", bound="Message") + @classmethod + def subclass(cls, subclass: type[_Message]) -> type[_Message]: + cls.subclasses.setdefault(subclass.signal_identifier, {})[ + subclass.message_type + ] = subclass + subclass.fields = hci.HCI_Object.fields_from_dataclass(subclass) return subclass # Factory method to create a subclass based on the signal identifier and message # type - @staticmethod + @classmethod def create( - signal_identifier: int, message_type: MessageType, payload: bytes + cls, signal_identifier: int, message_type: MessageType, payload: bytes ) -> Message: + instance: Message # Look for a registered subclass - subclasses = Message.subclasses.get(signal_identifier) - if subclasses: - subclass = subclasses.get(message_type) - if subclass: - instance = subclass.__new__(subclass) - instance.payload = payload - instance.init_from_payload() - return instance + if (subclasses := Message.subclasses.get(signal_identifier)) and ( + subclass := subclasses.get(message_type) + ): + instance = subclass( + **hci.HCI_Object.dict_from_bytes(payload, 0, subclass.fields), + ) + instance.payload = payload + return instance # Instantiate the appropriate class based on the message type if message_type == Message.MessageType.RESPONSE_REJECT: # Assume a simple reject message - instance = Simple_Reject(payload) - instance.init_from_payload() + instance = Simple_Reject(payload[0]) else: - instance = Message(payload) + instance = Message() + instance.payload = payload + instance.message_type = message_type instance.signal_identifier = signal_identifier - instance.message_type = message_type return instance - def init_from_payload(self) -> None: - pass - - def __init__(self, payload: bytes = b'') -> None: - self.payload = payload - def to_string(self, details: Union[str, Iterable[str]]) -> str: base = color( f'{name_or_number(AVDTP_SIGNAL_NAMES, self.signal_identifier)}_' @@ -703,34 +636,30 @@ class Message: # pylint:disable=attribute-defined-outside-init # ----------------------------------------------------------------------------- +@dataclass class Simple_Command(Message): ''' Command message with just one seid ''' - def init_from_payload(self): - self.acp_seid = self.payload[0] >> 2 + message_type = Message.MessageType.COMMAND - def __init__(self, seid): - super().__init__(payload=bytes([seid << 2])) - self.acp_seid = seid + acp_seid: int = field(metadata=Message.SEID_METADATA) def __str__(self) -> str: return self.to_string([f'ACP SEID: {self.acp_seid}']) # ----------------------------------------------------------------------------- +@dataclass class Simple_Reject(Message): ''' Reject messages with just an error code ''' - def init_from_payload(self): - self.error_code = self.payload[0] + message_type = Message.MessageType.RESPONSE_REJECT - def __init__(self, error_code): - super().__init__(payload=bytes([error_code])) - self.error_code = error_code + error_code: int = field(metadata=hci.metadata(1)) def __str__(self) -> str: details = [f'error_code: {name_or_number(AVDTP_ERROR_NAMES, self.error_code)}'] @@ -739,32 +668,52 @@ class Simple_Reject(Message): # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Discover_Command(Message): ''' See Bluetooth AVDTP spec - 8.6.1 Stream End Point Discovery Command ''' + signal_identifier = AVDTP_DISCOVER + message_type = Message.MessageType.COMMAND + # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Discover_Response(Message): ''' See Bluetooth AVDTP spec - 8.6.2 Stream End Point Discovery Response ''' - endpoints: list[EndPointInfo] + signal_identifier = AVDTP_DISCOVER + message_type = Message.MessageType.RESPONSE_ACCEPT - def init_from_payload(self): - self.endpoints = [] - endpoint_count = len(self.payload) // 2 - for i in range(endpoint_count): - self.endpoints.append( - EndPointInfo.from_bytes(self.payload[i * 2 : (i + 1) * 2]) - ) + @classmethod + def parse_endpoints( + cls, data: bytes, offset: int + ) -> tuple[int, list[EndPointInfo]]: + return len(data), [ + EndPointInfo.from_bytes(data[i * 2 : (i + 1) * 2]) + for i in range(offset, len(data) // 2) + ] - def __init__(self, endpoints): - super().__init__(payload=b''.join([bytes(endpoint) for endpoint in endpoints])) - self.endpoints = endpoints + @classmethod + def serialize_endpoints(cls, endpoints: Iterable[EndPointInfo]) -> bytes: + return b''.join([bytes(endpoint) for endpoint in endpoints]) + + endpoints: Iterable[EndPointInfo] = field( + metadata=hci.metadata( + { + 'parser': lambda data, offset: Discover_Response.parse_endpoints( + data, offset + ), + 'serializer': lambda endpoints: Discover_Response.serialize_endpoints( + endpoints + ), + } + ) + ) def __str__(self) -> str: details = [] @@ -774,7 +723,7 @@ class Discover_Response(Message): [ f'ACP SEID: {endpoint.seid}', f' in_use: {endpoint.in_use}', - f' media_type: {name_or_number(AVDTP_MEDIA_TYPE_NAMES, endpoint.media_type)}', + f' media_type: {endpoint.media_type.name}', f' tsep: {name_or_number(AVDTP_TSEP_NAMES, endpoint.tsep)}', ] ) @@ -783,27 +732,30 @@ class Discover_Response(Message): # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Get_Capabilities_Command(Simple_Command): ''' See Bluetooth AVDTP spec - 8.7.1 Get Capabilities Command ''' + signal_identifier = AVDTP_GET_CAPABILITIES + message_type = Message.MessageType.COMMAND + # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Get_Capabilities_Response(Message): ''' See Bluetooth AVDTP spec - 8.7.2 Get All Capabilities Response ''' - def init_from_payload(self): - self.capabilities = ServiceCapabilities.parse_capabilities(self.payload) + signal_identifier = AVDTP_GET_CAPABILITIES + message_type = Message.MessageType.RESPONSE_ACCEPT - def __init__(self, capabilities): - super().__init__( - payload=ServiceCapabilities.serialize_capabilities(capabilities) - ) - self.capabilities = capabilities + capabilities: Iterable[ServiceCapabilities] = field( + metadata=ServiceCapabilities.METADATA + ) def __str__(self) -> str: details = [str(capability) for capability in self.capabilities] @@ -812,58 +764,68 @@ class Get_Capabilities_Response(Message): # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Get_Capabilities_Reject(Simple_Reject): ''' See Bluetooth AVDTP spec - 8.7.3 Get Capabilities Reject ''' + signal_identifier = AVDTP_GET_CAPABILITIES + message_type = Message.MessageType.RESPONSE_REJECT + # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Get_All_Capabilities_Command(Get_Capabilities_Command): ''' See Bluetooth AVDTP spec - 8.8.1 Get All Capabilities Command ''' + signal_identifier = AVDTP_GET_ALL_CAPABILITIES + message_type = Message.MessageType.COMMAND + # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Get_All_Capabilities_Response(Get_Capabilities_Response): ''' See Bluetooth AVDTP spec - 8.8.2 Get All Capabilities Response ''' + signal_identifier = AVDTP_GET_ALL_CAPABILITIES + message_type = Message.MessageType.RESPONSE_ACCEPT + # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Get_All_Capabilities_Reject(Simple_Reject): ''' See Bluetooth AVDTP spec - 8.8.3 Get All Capabilities Reject ''' + signal_identifier = AVDTP_GET_ALL_CAPABILITIES + message_type = Message.MessageType.RESPONSE_REJECT + # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Set_Configuration_Command(Message): ''' See Bluetooth AVDTP spec - 8.9.1 Set Configuration Command ''' - def init_from_payload(self): - self.acp_seid = self.payload[0] >> 2 - self.int_seid = self.payload[1] >> 2 - self.capabilities = ServiceCapabilities.parse_capabilities(self.payload[2:]) + signal_identifier = AVDTP_SET_CONFIGURATION + message_type = Message.MessageType.COMMAND - def __init__( - self, acp_seid: int, int_seid: int, capabilities: Iterable[ServiceCapabilities] - ) -> None: - super().__init__( - payload=bytes([acp_seid << 2, int_seid << 2]) - + ServiceCapabilities.serialize_capabilities(capabilities) - ) - self.acp_seid = acp_seid - self.int_seid = int_seid - self.capabilities = capabilities + acp_seid: int = field(metadata=Message.SEID_METADATA) + int_seid: int = field(metadata=Message.SEID_METADATA) + capabilities: Iterable[ServiceCapabilities] = field( + metadata=ServiceCapabilities.METADATA + ) def __str__(self) -> str: details = [f'ACP SEID: {self.acp_seid}', f'INT SEID: {self.int_seid}'] + [ @@ -874,27 +836,29 @@ class Set_Configuration_Command(Message): # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Set_Configuration_Response(Message): ''' See Bluetooth AVDTP spec - 8.9.2 Set Configuration Response ''' + signal_identifier = AVDTP_SET_CONFIGURATION + message_type = Message.MessageType.RESPONSE_ACCEPT + # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Set_Configuration_Reject(Message): ''' See Bluetooth AVDTP spec - 8.9.3 Set Configuration Reject ''' - def init_from_payload(self): - self.service_category = self.payload[0] - self.error_code = self.payload[1] + signal_identifier = AVDTP_SET_CONFIGURATION + message_type = Message.MessageType.RESPONSE_REJECT - def __init__(self, error_code: int, service_category: int = 0) -> None: - super().__init__(payload=bytes([service_category, error_code])) - self.service_category = service_category - self.error_code = error_code + service_category: int = field(metadata=hci.metadata(1), default=0) + error_code: int = field(metadata=hci.metadata(1), default=0) def __str__(self) -> str: details = [ @@ -912,27 +876,30 @@ class Set_Configuration_Reject(Message): # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Get_Configuration_Command(Simple_Command): ''' See Bluetooth AVDTP spec - 8.10.1 Get Configuration Command ''' + signal_identifier = AVDTP_GET_CONFIGURATION + message_type = Message.MessageType.COMMAND + # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Get_Configuration_Response(Message): ''' See Bluetooth AVDTP spec - 8.10.2 Get Configuration Response ''' - def init_from_payload(self): - self.capabilities = ServiceCapabilities.parse_capabilities(self.payload) + signal_identifier = AVDTP_GET_CONFIGURATION + message_type = Message.MessageType.RESPONSE_ACCEPT - def __init__(self, capabilities: Iterable[ServiceCapabilities]) -> None: - super().__init__( - payload=ServiceCapabilities.serialize_capabilities(capabilities) - ) - self.capabilities = capabilities + capabilities: Iterable[ServiceCapabilities] = field( + metadata=ServiceCapabilities.METADATA + ) def __str__(self) -> str: details = [str(capability) for capability in self.capabilities] @@ -941,23 +908,31 @@ class Get_Configuration_Response(Message): # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Get_Configuration_Reject(Simple_Reject): ''' See Bluetooth AVDTP spec - 8.10.3 Get Configuration Reject ''' + signal_identifier = AVDTP_GET_CONFIGURATION + message_type = Message.MessageType.RESPONSE_REJECT + # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Reconfigure_Command(Message): ''' See Bluetooth AVDTP spec - 8.11.1 Reconfigure Command ''' - def init_from_payload(self): - # pylint: disable=attribute-defined-outside-init - self.acp_seid = self.payload[0] >> 2 - self.capabilities = ServiceCapabilities.parse_capabilities(self.payload[1:]) + signal_identifier = AVDTP_RECONFIGURE + message_type = Message.MessageType.COMMAND + + acp_seid: int = field(metadata=Message.SEID_METADATA) + capabilities: Iterable[ServiceCapabilities] = field( + metadata=ServiceCapabilities.METADATA + ) def __str__(self) -> str: details = [ @@ -968,57 +943,86 @@ class Reconfigure_Command(Message): # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Reconfigure_Response(Message): ''' See Bluetooth AVDTP spec - 8.11.2 Reconfigure Response ''' + signal_identifier = AVDTP_RECONFIGURE + message_type = Message.MessageType.RESPONSE_ACCEPT + # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Reconfigure_Reject(Set_Configuration_Reject): ''' See Bluetooth AVDTP spec - 8.11.3 Reconfigure Reject ''' + signal_identifier = AVDTP_RECONFIGURE + message_type = Message.MessageType.RESPONSE_REJECT + # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Open_Command(Simple_Command): ''' See Bluetooth AVDTP spec - 8.12.1 Open Stream Command ''' + signal_identifier = AVDTP_OPEN + message_type = Message.MessageType.COMMAND + # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Open_Response(Message): ''' See Bluetooth AVDTP spec - 8.12.2 Open Stream Response ''' + signal_identifier = AVDTP_OPEN + message_type = Message.MessageType.RESPONSE_ACCEPT + # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Open_Reject(Simple_Reject): ''' See Bluetooth AVDTP spec - 8.12.3 Open Stream Reject ''' + signal_identifier = AVDTP_OPEN + message_type = Message.MessageType.RESPONSE_REJECT + # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Start_Command(Message): ''' See Bluetooth AVDTP spec - 8.13.1 Start Stream Command ''' - def init_from_payload(self): - self.acp_seids = [x >> 2 for x in self.payload] + signal_identifier = AVDTP_START + message_type = Message.MessageType.COMMAND - def __init__(self, seids: Iterable[int]) -> None: - super().__init__(payload=bytes([seid << 2 for seid in seids])) - self.acp_seids = seids + acp_seids: Iterable[int] = field( + metadata=hci.metadata( + { + 'serializer': lambda seids: bytes([seid << 2 for seid in seids]), + 'parser': lambda data, offset: ( + len(data), + [x >> 2 for x in data[offset:]], + ), + } + ) + ) def __str__(self) -> str: return self.to_string([f'ACP SEIDs: {self.acp_seids}']) @@ -1026,27 +1030,29 @@ class Start_Command(Message): # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Start_Response(Message): ''' See Bluetooth AVDTP spec - 8.13.2 Start Stream Response ''' + signal_identifier = AVDTP_START + message_type = Message.MessageType.RESPONSE_ACCEPT + # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Start_Reject(Message): ''' See Bluetooth AVDTP spec - 8.13.3 Set Configuration Reject ''' - def init_from_payload(self): - self.acp_seid = self.payload[0] >> 2 - self.error_code = self.payload[1] + signal_identifier = AVDTP_START + message_type = Message.MessageType.RESPONSE_REJECT - def __init__(self, acp_seid, error_code): - super().__init__(payload=bytes([acp_seid << 2, error_code])) - self.acp_seid = acp_seid - self.error_code = error_code + acp_seid: int = field(metadata=Message.SEID_METADATA) + error_code: int = field(metadata=hci.metadata(1)) def __str__(self) -> str: details = [ @@ -1058,122 +1064,182 @@ class Start_Reject(Message): # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Close_Command(Simple_Command): ''' See Bluetooth AVDTP spec - 8.14.1 Close Stream Command ''' + signal_identifier = AVDTP_CLOSE + message_type = Message.MessageType.COMMAND + # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Close_Response(Message): ''' See Bluetooth AVDTP spec - 8.14.2 Close Stream Response ''' + signal_identifier = AVDTP_CLOSE + message_type = Message.MessageType.RESPONSE_ACCEPT + # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Close_Reject(Simple_Reject): ''' See Bluetooth AVDTP spec - 8.14.3 Close Stream Reject ''' + signal_identifier = AVDTP_CLOSE + message_type = Message.MessageType.RESPONSE_REJECT + # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Suspend_Command(Start_Command): ''' See Bluetooth AVDTP spec - 8.15.1 Suspend Command ''' + signal_identifier = AVDTP_SUSPEND + message_type = Message.MessageType.COMMAND + # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Suspend_Response(Message): ''' See Bluetooth AVDTP spec - 8.15.2 Suspend Response ''' + signal_identifier = AVDTP_SUSPEND + message_type = Message.MessageType.RESPONSE_ACCEPT + # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Suspend_Reject(Start_Reject): ''' See Bluetooth AVDTP spec - 8.15.3 Suspend Reject ''' + signal_identifier = AVDTP_SUSPEND + message_type = Message.MessageType.RESPONSE_REJECT + # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Abort_Command(Simple_Command): ''' See Bluetooth AVDTP spec - 8.16.1 Abort Command ''' + signal_identifier = AVDTP_ABORT + message_type = Message.MessageType.COMMAND + # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Abort_Response(Message): ''' See Bluetooth AVDTP spec - 8.16.2 Abort Response ''' + signal_identifier = AVDTP_ABORT + message_type = Message.MessageType.RESPONSE_ACCEPT + # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Security_Control_Command(Message): ''' See Bluetooth AVDTP spec - 8.17.1 Security Control Command ''' - def init_from_payload(self): - # pylint: disable=attribute-defined-outside-init - self.acp_seid = self.payload[0] >> 2 - self.data = self.payload[1:] + signal_identifier = AVDTP_SECURITY_CONTROL + message_type = Message.MessageType.COMMAND + + acp_seid: int = field(metadata=Message.SEID_METADATA) + data: bytes = field(metadata=hci.metadata('*')) def __str__(self) -> str: - return self.to_string([f'ACP_SEID: {self.acp_seid}', f'data: {self.data}']) + return self.to_string( + [f'ACP_SEID: {self.acp_seid}', f'data: {self.data.hex()}'] + ) # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Security_Control_Response(Message): ''' See Bluetooth AVDTP spec - 8.17.2 Security Control Response ''' + signal_identifier = AVDTP_SECURITY_CONTROL + message_type = Message.MessageType.RESPONSE_ACCEPT + # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class Security_Control_Reject(Simple_Reject): ''' See Bluetooth AVDTP spec - 8.17.3 Security Control Reject ''' + signal_identifier = AVDTP_SECURITY_CONTROL + message_type = Message.MessageType.RESPONSE_REJECT + # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class General_Reject(Message): ''' See Bluetooth AVDTP spec - 8.18 General Reject ''' + signal_identifier = 0 + message_type = Message.MessageType.GENERAL_REJECT + def to_string(self, details): return color('GENERAL_REJECT', 'yellow') # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class DelayReport_Command(Message): ''' See Bluetooth AVDTP spec - 8.19.1 Delay Report Command ''' - def init_from_payload(self): - # pylint: disable=attribute-defined-outside-init - self.acp_seid = self.payload[0] >> 2 - self.delay = (self.payload[1] << 8) | (self.payload[2]) + signal_identifier = AVDTP_DELAYREPORT + message_type = Message.MessageType.COMMAND + + DELAY_METADATA = hci.metadata( + { + 'serializer': lambda delay: bytes([delay >> 8, delay & 0xFF]), + 'parser': lambda data, offset: ( + offset + 2, + (data[offset] << 8) | (data[offset + 1]), + ), + } + ) + + acp_seid: int = field(metadata=Message.SEID_METADATA) + delay: int = field(metadata=DELAY_METADATA) def __str__(self) -> str: return self.to_string([f'ACP_SEID: {self.acp_seid}', f'delay: {self.delay}']) @@ -1181,19 +1247,27 @@ class DelayReport_Command(Message): # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class DelayReport_Response(Message): ''' See Bluetooth AVDTP spec - 8.19.2 Delay Report Response ''' + signal_identifier = AVDTP_DELAYREPORT + message_type = Message.MessageType.RESPONSE_ACCEPT + # ----------------------------------------------------------------------------- @Message.subclass +@dataclass class DelayReport_Reject(Simple_Reject): ''' See Bluetooth AVDTP spec - 8.19.3 Delay Report Reject ''' + signal_identifier = AVDTP_DELAYREPORT + message_type = Message.MessageType.RESPONSE_REJECT + # ----------------------------------------------------------------------------- class Protocol(utils.EventEmitter): @@ -1348,7 +1422,7 @@ class Protocol(utils.EventEmitter): ): if isinstance( codec_capabilities.media_codec_information, - VendorSpecificMediaCodecInformation, + a2dp.VendorSpecificMediaCodecInformation, ): if ( codec_capabilities.media_codec_information.vendor_id @@ -2007,37 +2081,13 @@ class Stream: # ----------------------------------------------------------------------------- +@dataclass class StreamEndPoint: - def __init__( - self, - seid: int, - media_type: int, - tsep: int, - in_use: int, - capabilities: Iterable[ServiceCapabilities], - ) -> None: - self.seid = seid - self.media_type = media_type - self.tsep = tsep - self.in_use = in_use - self.capabilities = capabilities - - def __str__(self) -> str: - media_type = f'{name_or_number(AVDTP_MEDIA_TYPE_NAMES, self.media_type)}' - tsep = f'{name_or_number(AVDTP_TSEP_NAMES, self.tsep)}' - return '\n'.join( - [ - 'SEP(', - f' seid={self.seid}', - f' media_type={media_type}', - f' tsep={tsep}', - f' in_use={self.in_use}', - ' capabilities=[', - '\n'.join([f' {x}' for x in self.capabilities]), - ' ]', - ')', - ] - ) + seid: int + media_type: MediaType + tsep: int + in_use: int + capabilities: Iterable[ServiceCapabilities] # ----------------------------------------------------------------------------- @@ -2073,7 +2123,7 @@ class DiscoveredStreamEndPoint(StreamEndPoint, StreamEndPointProxy): self, protocol: Protocol, seid: int, - media_type: int, + media_type: MediaType, tsep: int, in_use: int, capabilities: Iterable[ServiceCapabilities], @@ -2103,7 +2153,7 @@ class LocalStreamEndPoint(StreamEndPoint, utils.EventEmitter): self, protocol: Protocol, seid: int, - media_type: int, + media_type: MediaType, tsep: int, capabilities: Iterable[ServiceCapabilities], configuration: Optional[Iterable[ServiceCapabilities]] = None, diff --git a/tests/a2dp_test.py b/tests/a2dp_test.py index a1fa74d6..d9cecd9d 100644 --- a/tests/a2dp_test.py +++ b/tests/a2dp_test.py @@ -25,7 +25,6 @@ import pytest from bumble import a2dp from bumble.avdtp import ( - A2DP_SBC_CODEC_TYPE, AVDTP_AUDIO_MEDIA_TYPE, AVDTP_IDLE_STATE, AVDTP_STREAMING_STATE, @@ -137,7 +136,7 @@ async def test_self_connection(): def source_codec_capabilities(): return MediaCodecCapabilities( media_type=AVDTP_AUDIO_MEDIA_TYPE, - media_codec_type=A2DP_SBC_CODEC_TYPE, + media_codec_type=a2dp.CodecType.SBC, media_codec_information=a2dp.SbcMediaCodecInformation( sampling_frequency=a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_44100, channel_mode=a2dp.SbcMediaCodecInformation.ChannelMode.JOINT_STEREO, @@ -154,7 +153,7 @@ def source_codec_capabilities(): def sink_codec_capabilities(): return MediaCodecCapabilities( media_type=AVDTP_AUDIO_MEDIA_TYPE, - media_codec_type=A2DP_SBC_CODEC_TYPE, + media_codec_type=a2dp.CodecType.SBC, media_codec_information=a2dp.SbcMediaCodecInformation( sampling_frequency=a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_48000 | a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_44100 diff --git a/tests/avdtp_test.py b/tests/avdtp_test.py index 983ea62b..369b6752 100644 --- a/tests/avdtp_test.py +++ b/tests/avdtp_test.py @@ -15,43 +15,105 @@ # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- +import pytest + +from bumble import avdtp from bumble.a2dp import A2DP_SBC_CODEC_TYPE -from bumble.avdtp import ( - AVDTP_AUDIO_MEDIA_TYPE, - AVDTP_DELAY_REPORTING_SERVICE_CATEGORY, - AVDTP_GET_CAPABILITIES, - AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY, - AVDTP_SET_CONFIGURATION, - Get_Capabilities_Response, - MediaCodecCapabilities, - Message, - ServiceCapabilities, - Set_Configuration_Command, -) from bumble.rtp import MediaPacket # ----------------------------------------------------------------------------- -def test_messages(): - capabilities = [ - ServiceCapabilities(AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY), - MediaCodecCapabilities( - media_type=AVDTP_AUDIO_MEDIA_TYPE, - media_codec_type=A2DP_SBC_CODEC_TYPE, - media_codec_information=bytes.fromhex('211502fa'), +@pytest.mark.parametrize( + 'message', + ( + avdtp.Discover_Command(), + avdtp.Discover_Response( + endpoints=[ + avdtp.EndPointInfo( + seid=1, in_use=1, media_type=avdtp.MediaType.AUDIO, tsep=1 + ) + ] ), - ServiceCapabilities(AVDTP_DELAY_REPORTING_SERVICE_CATEGORY), - ] - message = Get_Capabilities_Response(capabilities) - parsed = Message.create( - AVDTP_GET_CAPABILITIES, Message.MessageType.RESPONSE_ACCEPT, message.payload - ) - assert message.payload == parsed.payload - - message = Set_Configuration_Command(3, 4, capabilities) - parsed = Message.create( - AVDTP_SET_CONFIGURATION, Message.MessageType.COMMAND, message.payload + avdtp.Get_Capabilities_Command(acp_seid=1), + avdtp.Get_Capabilities_Response( + capabilities=[ + avdtp.ServiceCapabilities(avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY), + avdtp.MediaCodecCapabilities( + media_type=avdtp.AVDTP_AUDIO_MEDIA_TYPE, + media_codec_type=A2DP_SBC_CODEC_TYPE, + media_codec_information=bytes.fromhex('211502fa'), + ), + avdtp.ServiceCapabilities(avdtp.AVDTP_DELAY_REPORTING_SERVICE_CATEGORY), + ] + ), + avdtp.Get_Capabilities_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR), + avdtp.Get_All_Capabilities_Command(acp_seid=1), + avdtp.Get_All_Capabilities_Response( + capabilities=[ + avdtp.ServiceCapabilities(avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY) + ] + ), + avdtp.Get_All_Capabilities_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR), + avdtp.Set_Configuration_Command( + acp_seid=1, + int_seid=2, + capabilities=[ + avdtp.ServiceCapabilities(avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY) + ], + ), + avdtp.Set_Configuration_Response(), + avdtp.Set_Configuration_Reject( + service_category=avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY, + error_code=avdtp.AVDTP_UNSUPPORTED_CONFIGURATION_ERROR, + ), + avdtp.Get_Configuration_Command(acp_seid=1), + avdtp.Get_Configuration_Response( + capabilities=[ + avdtp.ServiceCapabilities(avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY) + ] + ), + avdtp.Get_Configuration_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR), + avdtp.Reconfigure_Command( + acp_seid=1, + capabilities=[ + avdtp.ServiceCapabilities(avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY) + ], + ), + avdtp.Reconfigure_Response(), + avdtp.Reconfigure_Reject( + service_category=avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY, + error_code=avdtp.AVDTP_UNSUPPORTED_CONFIGURATION_ERROR, + ), + avdtp.Open_Command(acp_seid=1), + avdtp.Open_Response(), + avdtp.Open_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR), + avdtp.Start_Command(acp_seids=[1, 2]), + avdtp.Start_Response(), + avdtp.Start_Reject(acp_seid=1, error_code=avdtp.AVDTP_BAD_STATE_ERROR), + avdtp.Close_Command(acp_seid=1), + avdtp.Close_Response(), + avdtp.Close_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR), + avdtp.Suspend_Command(acp_seids=[1, 2]), + avdtp.Suspend_Response(), + avdtp.Suspend_Reject(acp_seid=1, error_code=avdtp.AVDTP_BAD_STATE_ERROR), + avdtp.Abort_Command(acp_seid=1), + avdtp.Abort_Response(), + avdtp.Security_Control_Command(acp_seid=1, data=b'foo'), + avdtp.Security_Control_Response(), + avdtp.Security_Control_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR), + avdtp.General_Reject(), + avdtp.DelayReport_Command(acp_seid=1, delay=100), + avdtp.DelayReport_Response(), + avdtp.DelayReport_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR), + ), +) +def test_messages(message: avdtp.Message): + parsed = avdtp.Message.create( + signal_identifier=message.signal_identifier, + message_type=message.message_type, + payload=message.payload, ) + assert message == parsed assert message.payload == parsed.payload @@ -62,9 +124,3 @@ def test_rtp(): ) media_packet = MediaPacket.from_bytes(packet) print(media_packet) - - -# ----------------------------------------------------------------------------- -if __name__ == '__main__': - test_messages() - test_rtp()