diff --git a/bumble/avrcp.py b/bumble/avrcp.py index 733f327c..98ded8f1 100644 --- a/bumble/avrcp.py +++ b/bumble/avrcp.py @@ -26,6 +26,7 @@ from typing import ( AsyncIterator, Awaitable, Callable, + ClassVar, Iterable, List, Optional, @@ -33,12 +34,10 @@ from typing import ( SupportsBytes, TypeVar, Union, - ClassVar, - TypeAlias, cast, ) -from bumble import avc, avctp, core, l2cap, utils, hci +from bumble import avc, avctp, core, hci, l2cap, utils from bumble.colors import color from bumble.device import Connection, Device from bumble.sdp import ( @@ -131,6 +130,31 @@ class EventId(hci.SpecableEnum): return bytes([int(self)]) +class StatusCode(hci.SpecableEnum): + INVALID_COMMAND = 0x00 + INVALID_PARAMETER = 0x01 + PARAMETER_CONTENT_ERROR = 0x02 + INTERNAL_ERROR = 0x03 + OPERATION_COMPLETED = 0x04 + UID_CHANGED = 0x05 + INVALID_DIRECTION = 0x07 + NOT_A_DIRECTORY = 0x08 + DOES_NOT_EXIST = 0x09 + INVALID_SCOPE = 0x0A + RANGE_OUT_OF_BOUNDS = 0x0B + FOLDER_ITEM_IS_NOT_PLAYABLE = 0x0C + MEDIA_IN_USE = 0x0D + NOW_PLAYING_LIST_FULL = 0x0E + SEARCH_NOT_SUPPORTED = 0x0F + SEARCH_IN_PROGRESS = 0x10 + INVALID_PLAYER_ID = 0x11 + PLAYER_NOT_BROWSABLE = 0x12 + PLAYER_NOT_ADDRESSED = 0x13 + NO_VALID_SEARCH_RESULTS = 0x14 + NO_AVAILABLE_PLAYERS = 0x15 + ADDRESSED_PLAYER_CHANGED = 0x16 + + # ----------------------------------------------------------------------------- def make_controller_service_sdp_records( service_record_handle: int, @@ -267,14 +291,52 @@ def make_target_service_sdp_records( # ----------------------------------------------------------------------------- -def _decode_attribute_value(value: bytes, character_set: CharacterSetId) -> str: - try: - if character_set == CharacterSetId.UTF_8: - return value.decode("utf-8") - return value.decode("ascii") - except UnicodeDecodeError: - logger.warning(f"cannot decode string with bytes: {value.hex()}") - return "" +@dataclass +class MediaAttribute: + attribute_id: MediaAttributeId + attribute_value: str + character_set_id: CharacterSetId = CharacterSetId.UTF_8 + + @classmethod + def _decode_attribute_value( + cls, value: bytes, character_set: CharacterSetId + ) -> str: + try: + if character_set == CharacterSetId.UTF_8: + return value.decode("utf-8") + return value.decode("ascii") + except UnicodeDecodeError: + logger.warning(f"cannot decode string with bytes: {value.hex()}") + return value.hex() + + @classmethod + def parse_from_bytes(cls, pdu: bytes, offset: int) -> tuple[int, MediaAttribute]: + ( + attribute_id_int, + character_set_id_int, + attribute_value_length, + ) = struct.unpack_from(">IHH", pdu, offset) + attribute_value_bytes = pdu[offset + 8 : offset + 8 + attribute_value_length] + character_set_id = CharacterSetId(character_set_id_int) + return offset + 8 + attribute_value_length, cls( + attribute_id=MediaAttributeId(attribute_id_int), + character_set_id=character_set_id, + attribute_value=cls._decode_attribute_value( + attribute_value_bytes, character_set_id + ), + ) + + def __bytes__(self) -> bytes: + attribute_value_bytes = self.attribute_value.encode("utf-8") + return ( + struct.pack( + ">IHH", + int(self.attribute_id), + int(self.character_set_id), + len(attribute_value_bytes), + ) + + attribute_value_bytes + ) # ----------------------------------------------------------------------------- @@ -336,10 +398,9 @@ class PduAssembler: # ----------------------------------------------------------------------------- -@dataclass class Command: pdu_id: ClassVar[PduId] - _payload: Optional[bytes] = field(init=False, default=None) + _payload: Optional[bytes] = None _Command = TypeVar('_Command', bound='Command') subclasses: ClassVar[dict[int, type[Command]]] = {} @@ -359,15 +420,11 @@ class Command: instance._payload = pdu[0:] return instance - @property - def payload(self) -> bytes: + def __bytes__(self) -> bytes: if self._payload is None: self._payload = hci.HCI_Object.dict_to_bytes(self.__dict__, self.fields) return self._payload - def __repr__(self) -> str: - return str(self) - # ----------------------------------------------------------------------------- @Command.command @@ -400,9 +457,9 @@ class GetElementAttributesCommand(Command): { 'parser': lambda data, offset: ( offset + 8, - int.from_bytes(data[offset : offset + 8]), + int.from_bytes(data[offset : offset + 8], byteorder='big'), ), - 'serializer': lambda x: x.to_bytes(8), + 'serializer': lambda x: x.to_bytes(8, byteorder='big'), } ) ) @@ -432,58 +489,60 @@ class RegisterNotificationCommand(Command): # ----------------------------------------------------------------------------- -@dataclass class Response: pdu_id: PduId - parameter: bytes + _payload: Optional[bytes] = None - def to_string(self, properties: dict[str, str]) -> str: - properties_str = ",".join( - [f"{name}={value}" for name, value in properties.items()] - ) - return f"Response[{self.pdu_id.name}]({properties_str})" + fields: ClassVar[hci.Fields] = () - def __str__(self) -> str: - return self.to_string({"parameter": self.parameter.hex()}) + _Response = TypeVar('_Response', bound='Response') - def __repr__(self) -> str: - return str(self) + @classmethod + def register(cls, subclass: type[_Response]) -> type[_Response]: + subclass.fields = hci.HCI_Object.fields_from_dataclass(subclass) + return subclass + + def __bytes__(self) -> bytes: + if self._payload is None: + self._payload = hci.HCI_Object.dict_to_bytes(self.__dict__, self.fields) + return self._payload + + @classmethod + def from_bytes(cls, pdu: bytes, pdu_id: Optional[PduId] = None) -> Response: + kwargs = hci.HCI_Object.dict_from_bytes(pdu, 0, cls.fields) + if pdu_id is not None: + kwargs['pdu_id'] = pdu_id + instance = cls(**kwargs) + instance._payload = pdu + return instance # ----------------------------------------------------------------------------- +@Response.register +@dataclass class RejectedResponse(Response): - status_code: Protocol.StatusCode - - @classmethod - def from_bytes(cls, pdu_id: PduId, pdu: bytes) -> RejectedResponse: - return cls(pdu_id, Protocol.StatusCode(pdu[0])) - - def __init__(self, pdu_id: PduId, status_code: Protocol.StatusCode) -> None: - super().__init__(pdu_id, bytes([int(status_code)])) - self.status_code = status_code - - def __str__(self) -> str: - return self.to_string( - { - "status_code": self.status_code.name, - } - ) + pdu_id: PduId + status_code: StatusCode = field(metadata=StatusCode.type_metadata(1)) # ----------------------------------------------------------------------------- +@Response.register +@dataclass class NotImplementedResponse(Response): - @classmethod - def from_bytes(cls, pdu_id: PduId, pdu: bytes) -> NotImplementedResponse: - return cls(pdu_id, pdu[1:]) + pdu_id: PduId + parameters: bytes = field(metadata=hci.metadata('*')) # ----------------------------------------------------------------------------- +@dataclass class GetCapabilitiesResponse(Response): + pdu_id = PduId.GET_CAPABILITIES capability_id: GetCapabilitiesCommand.CapabilityId - capabilities: list[Union[SupportsBytes, bytes]] + capabilities: Sequence[Union[SupportsBytes, bytes]] @classmethod - def from_bytes(cls, pdu: bytes) -> GetCapabilitiesResponse: + def from_bytes(cls, pdu: bytes, pdu_id: Optional[PduId] = None) -> Response: + del pdu_id # Unused. if len(pdu) < 2: # Possibly a reject response. return cls(GetCapabilitiesCommand.CapabilityId(0), []) @@ -505,168 +564,52 @@ class GetCapabilitiesResponse(Response): return cls(capability_id, capabilities) - def __init__( - self, - capability_id: GetCapabilitiesCommand.CapabilityId, - capabilities: Sequence[Union[SupportsBytes, bytes]], - ) -> None: - super().__init__( - PduId.GET_CAPABILITIES, - bytes([capability_id, len(capabilities)]) - + b''.join(bytes(capability) for capability in capabilities), - ) - self.capability_id = capability_id - self.capabilities = list(capabilities) - - def __str__(self) -> str: - return self.to_string( - { - "capability_id": self.capability_id.name, - "capabilities": str(self.capabilities), - } - ) - - -# ----------------------------------------------------------------------------- -class GetPlayStatusResponse(Response): - song_length: int - song_position: int - play_status: PlayStatus - - @classmethod - def from_bytes(cls, pdu: bytes) -> GetPlayStatusResponse: - (song_length, song_position) = struct.unpack_from(">II", pdu, 0) - play_status = PlayStatus(pdu[8]) - - return cls(song_length, song_position, play_status) - - def __init__( - self, - song_length: int, - song_position: int, - play_status: PlayStatus, - ) -> None: - super().__init__( - PduId.GET_PLAY_STATUS, - struct.pack(">IIB", song_length, song_position, int(play_status)), - ) - self.song_length = song_length - self.song_position = song_position - self.play_status = play_status - - def __str__(self) -> str: - return self.to_string( - { - "song_length": str(self.song_length), - "song_position": str(self.song_position), - "play_status": self.play_status.name, - } - ) - - -# ----------------------------------------------------------------------------- -class GetElementAttributesResponse(Response): - attributes: list[MediaAttribute] - - @classmethod - def from_bytes(cls, pdu: bytes) -> GetElementAttributesResponse: - num_attributes = pdu[0] - offset = 1 - attributes: list[MediaAttribute] = [] - for _ in range(num_attributes): - ( - attribute_id_int, - character_set_id_int, - attribute_value_length, - ) = struct.unpack_from(">IHH", pdu, offset) - attribute_value_bytes = pdu[ - offset + 8 : offset + 8 + attribute_value_length - ] - attribute_id = MediaAttributeId(attribute_id_int) - character_set_id = CharacterSetId(character_set_id_int) - attribute_value = _decode_attribute_value( - attribute_value_bytes, character_set_id - ) - attributes.append( - MediaAttribute(attribute_id, character_set_id, attribute_value) - ) - offset += 8 + attribute_value_length - - return cls(attributes) - - def __init__(self, attributes: Sequence[MediaAttribute]) -> None: - parameter = bytes([len(attributes)]) - for attribute in attributes: - attribute_value_bytes = attribute.attribute_value.encode("utf-8") - parameter += ( - struct.pack( - ">IHH", - int(attribute.attribute_id), - int(CharacterSetId.UTF_8), - len(attribute_value_bytes), - ) - + attribute_value_bytes - ) - super().__init__( - PduId.GET_ELEMENT_ATTRIBUTES, - parameter, - ) - self.attributes = list(attributes) - - def __str__(self) -> str: - attribute_strs = [str(attribute) for attribute in self.attributes] - return self.to_string( - { - "attributes": f"[{', '.join(attribute_strs)}]", - } - ) - - -# ----------------------------------------------------------------------------- -class SetAbsoluteVolumeResponse(Response): - volume: int - - @classmethod - def from_bytes(cls, pdu: bytes) -> SetAbsoluteVolumeResponse: - return cls(pdu[0]) - - def __init__(self, volume: int) -> None: - super().__init__(PduId.SET_ABSOLUTE_VOLUME, bytes([volume])) - self.volume = volume - - def __str__(self) -> str: - return self.to_string({"volume": str(self.volume)}) - - -# ----------------------------------------------------------------------------- -class RegisterNotificationResponse(Response): - event: Event - - @classmethod - def from_bytes(cls, pdu: bytes) -> RegisterNotificationResponse: - return cls(Event.from_bytes(pdu)) - - def __init__(self, event: Event) -> None: - super().__init__( - PduId.REGISTER_NOTIFICATION, - bytes(event), - ) - self.event = event - - def __str__(self) -> str: - return self.to_string( - { - "event": str(self.event), - } + def __post_init__(self) -> None: + self._payload = bytes([self.capability_id, len(self.capabilities)]) + b''.join( + bytes(capability) for capability in self.capabilities ) # ----------------------------------------------------------------------------- +@Response.register @dataclass -class MediaAttribute: - attribute_id: MediaAttributeId - character_set_id: CharacterSetId - attribute_value: str +class GetPlayStatusResponse(Response): + pdu_id = PduId.GET_PLAY_STATUS + song_length: int = field(metadata=hci.metadata(">4")) + song_position: int = field(metadata=hci.metadata(">4")) + play_status: PlayStatus = field(metadata=PlayStatus.type_metadata(1)) + + +# ----------------------------------------------------------------------------- +@Response.register +@dataclass +class GetElementAttributesResponse(Response): + pdu_id = PduId.GET_ELEMENT_ATTRIBUTES + attributes: Sequence[MediaAttribute] = field( + metadata=hci.metadata( + MediaAttribute.parse_from_bytes, list_begin=True, list_end=True + ) + ) + + +# ----------------------------------------------------------------------------- +@Response.register +@dataclass +class SetAbsoluteVolumeResponse(Response): + pdu_id = PduId.SET_ABSOLUTE_VOLUME + volume: int = field(metadata=hci.metadata(1)) + + +# ----------------------------------------------------------------------------- +@Response.register +@dataclass +class RegisterNotificationResponse(Response): + pdu_id = PduId.REGISTER_NOTIFICATION + event: Event = field( + metadata=hci.metadata( + lambda data, offset: (len(data), Event.from_bytes(data[offset:])) + ) + ) # ----------------------------------------------------------------------------- @@ -710,10 +653,9 @@ class ApplicationSetting: # ----------------------------------------------------------------------------- -@dataclass class Event: - event_id: EventId = field(init=False) - _pdu: Optional[bytes] = field(init=False, default=None) + event_id: EventId + _pdu: Optional[bytes] = None _Event = TypeVar('_Event', bound='Event') subclasses: ClassVar[dict[int, type[Event]]] = {} @@ -740,9 +682,6 @@ class Event: ) return self._pdu - def __repr__(self) -> str: - return str(self) - # ----------------------------------------------------------------------------- @dataclass @@ -871,7 +810,7 @@ class Delegate: class Error(Exception): """The delegate method failed, with a specified status code.""" - def __init__(self, status_code: Protocol.StatusCode) -> None: + def __init__(self, status_code: StatusCode) -> None: self.status_code = status_code supported_events: list[EventId] @@ -913,30 +852,6 @@ class Protocol(utils.EventEmitter): CONTINUE = 0b10 END = 0b11 - class StatusCode(utils.OpenIntEnum): - INVALID_COMMAND = 0x00 - INVALID_PARAMETER = 0x01 - PARAMETER_CONTENT_ERROR = 0x02 - INTERNAL_ERROR = 0x03 - OPERATION_COMPLETED = 0x04 - UID_CHANGED = 0x05 - INVALID_DIRECTION = 0x07 - NOT_A_DIRECTORY = 0x08 - DOES_NOT_EXIST = 0x09 - INVALID_SCOPE = 0x0A - RANGE_OUT_OF_BOUNDS = 0x0B - FOLDER_ITEM_IS_NOT_PLAYABLE = 0x0C - MEDIA_IN_USE = 0x0D - NOW_PLAYING_LIST_FULL = 0x0E - SEARCH_NOT_SUPPORTED = 0x0F - SEARCH_IN_PROGRESS = 0x10 - INVALID_PLAYER_ID = 0x11 - PLAYER_NOT_BROWSABLE = 0x12 - PLAYER_NOT_ADDRESSED = 0x13 - NO_VALID_SEARCH_RESULTS = 0x14 - NO_AVAILABLE_PLAYERS = 0x15 - ADDRESSED_PLAYER_CHANGED = 0x16 - class InvalidPidError(Exception): """A response frame with ipid==1 was received.""" @@ -1111,7 +1026,7 @@ class Protocol(utils.EventEmitter): self.send_rejected_avrcp_response( transaction_label, command.pdu_id, - Protocol.StatusCode.INTERNAL_ERROR, + StatusCode.INTERNAL_ERROR, ) utils.AsyncRunner.spawn(call()) @@ -1146,7 +1061,7 @@ class Protocol(utils.EventEmitter): GetElementAttributesCommand(element_identifier, attribute_ids), ) response = self._check_response(response_context, GetElementAttributesResponse) - return response.attributes + return list(response.attributes) async def monitor_events( self, event_id: EventId, playback_interval: int = 0 @@ -1511,12 +1426,12 @@ class Protocol(utils.EventEmitter): # TODO: check that this is the right way to respond in this case. logger.debug("unsupported PDU ID") self.send_rejected_avrcp_response( - transaction_label, pdu_id, self.StatusCode.INVALID_PARAMETER + transaction_label, pdu_id, StatusCode.INVALID_PARAMETER ) else: logger.debug("unsupported command type") self.send_rejected_avrcp_response( - transaction_label, pdu_id, self.StatusCode.INVALID_COMMAND + transaction_label, pdu_id, StatusCode.INVALID_COMMAND ) self.receive_command_state = None @@ -1541,9 +1456,9 @@ class Protocol(utils.EventEmitter): # more appropriate. response: Optional[Response] = None if response_code == avc.ResponseFrame.ResponseCode.REJECTED: - response = RejectedResponse.from_bytes(pdu_id, pdu) + response = RejectedResponse.from_bytes(pdu_id=pdu_id, pdu=pdu) elif response_code == avc.ResponseFrame.ResponseCode.NOT_IMPLEMENTED: - response = NotImplementedResponse.from_bytes(pdu_id, pdu) + response = NotImplementedResponse.from_bytes(pdu_id=pdu_id, pdu=pdu) elif response_code in ( avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE, avc.ResponseFrame.ResponseCode.INTERIM, @@ -1551,15 +1466,15 @@ class Protocol(utils.EventEmitter): avc.ResponseFrame.ResponseCode.ACCEPTED, ): if pdu_id == PduId.GET_CAPABILITIES: - response = GetCapabilitiesResponse.from_bytes(pdu) + response = GetCapabilitiesResponse.from_bytes(pdu=pdu) elif pdu_id == PduId.GET_PLAY_STATUS: - response = GetPlayStatusResponse.from_bytes(pdu) + response = GetPlayStatusResponse.from_bytes(pdu=pdu) elif pdu_id == PduId.GET_ELEMENT_ATTRIBUTES: - response = GetElementAttributesResponse.from_bytes(pdu) + response = GetElementAttributesResponse.from_bytes(pdu=pdu) elif pdu_id == PduId.SET_ABSOLUTE_VOLUME: - response = SetAbsoluteVolumeResponse.from_bytes(pdu) + response = SetAbsoluteVolumeResponse.from_bytes(pdu=pdu) elif pdu_id == PduId.REGISTER_NOTIFICATION: - response = RegisterNotificationResponse.from_bytes(pdu) + response = RegisterNotificationResponse.from_bytes(pdu=pdu) else: logger.debug("unexpected PDU ID") pending_command.response.set_exception( @@ -1655,10 +1570,8 @@ class Protocol(utils.EventEmitter): # TODO: fragmentation # Send the command. logger.debug(f">>> AVRCP command PDU: {command}") - pdu = ( - struct.pack(">BBH", command.pdu_id, 0, len(command.payload)) - + command.payload - ) + payload = bytes(command) + pdu = struct.pack(">BBH", command.pdu_id, 0, len(payload)) + payload command_frame = avc.VendorDependentCommandFrame( command_type, avc.Frame.SubunitType.PANEL, @@ -1702,10 +1615,8 @@ class Protocol(utils.EventEmitter): ) -> None: # TODO: fragmentation logger.debug(f">>> AVRCP response PDU: {response}") - pdu = ( - struct.pack(">BBH", response.pdu_id, 0, len(response.parameter)) - + response.parameter - ) + payload = bytes(response) + pdu = struct.pack(">BBH", response.pdu_id, 0, len(payload)) + payload response_frame = avc.VendorDependentResponseFrame( response_code, avc.Frame.SubunitType.PANEL, diff --git a/bumble/hci.py b/bumble/hci.py index 06d6cd31..07f01f03 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -112,7 +112,14 @@ class SpecableEnum(utils.OpenIntEnum): @classmethod def type_spec(cls, size: int): - return {'size': size, 'mapper': lambda x: cls(x).name} + return { + 'serializer': lambda x: x.to_bytes(size, 'little'), + 'parser': lambda data, offset: ( + offset + size, + cls(int.from_bytes(data[offset : offset + size], 'little')), + ), + 'mapper': lambda x: cls(x).name, + } @classmethod def type_metadata(cls, size: int, list_begin: bool = False, list_end: bool = False): @@ -123,7 +130,14 @@ class SpecableFlag(enum.IntFlag): @classmethod def type_spec(cls, size: int): - return {'size': size, 'mapper': lambda x: cls(x).name} + return { + 'serializer': lambda x: x.to_bytes(size, 'little'), + 'parser': lambda data, offset: ( + offset + size, + cls(int.from_bytes(data[offset : offset + size], 'little')), + ), + 'mapper': lambda x: cls(x).name, + } @classmethod def type_metadata(cls, size: int, list_begin: bool = False, list_end: bool = False): diff --git a/tests/avrcp_test.py b/tests/avrcp_test.py index 26f9ba9d..5769eb17 100644 --- a/tests/avrcp_test.py +++ b/tests/avrcp_test.py @@ -15,13 +15,15 @@ # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- +from __future__ import annotations + import struct +from collections.abc import Sequence import pytest -from collections.abc import Sequence -from typing import Self from bumble import avc, avctp, avrcp + from . import test_utils @@ -35,7 +37,7 @@ class TwoDevices(test_utils.TwoDevices): await self.protocols[1].connect(self.connections[0]) @classmethod - async def create_with_avdtp(cls) -> Self: + async def create_with_avdtp(cls) -> TwoDevices: devices = await cls.create_with_connection() await devices.setup_avdtp_connections() return devices @@ -44,7 +46,7 @@ class TwoDevices(test_utils.TwoDevices): # ----------------------------------------------------------------------------- def test_GetPlayStatusCommand(): command = avrcp.GetPlayStatusCommand() - assert avrcp.Command.from_bytes(command.pdu_id, command.payload) == command + assert avrcp.Command.from_bytes(command.pdu_id, bytes(command)) == command # ----------------------------------------------------------------------------- @@ -52,13 +54,13 @@ def test_GetCapabilitiesCommand(): command = avrcp.GetCapabilitiesCommand( capability_id=avrcp.GetCapabilitiesCommand.CapabilityId.COMPANY_ID ) - assert avrcp.Command.from_bytes(command.pdu_id, command.payload) == command + assert avrcp.Command.from_bytes(command.pdu_id, bytes(command)) == command # ----------------------------------------------------------------------------- def test_SetAbsoluteVolumeCommand(): command = avrcp.SetAbsoluteVolumeCommand(volume=5) - assert avrcp.Command.from_bytes(command.pdu_id, command.payload) == command + assert avrcp.Command.from_bytes(command.pdu_id, bytes(command)) == command # ----------------------------------------------------------------------------- @@ -70,7 +72,7 @@ def test_GetElementAttributesCommand(): avrcp.MediaAttributeId.ARTIST_NAME, ], ) - assert avrcp.Command.from_bytes(command.pdu_id, command.payload) == command + assert avrcp.Command.from_bytes(command.pdu_id, bytes(command)) == command # ----------------------------------------------------------------------------- @@ -78,7 +80,7 @@ def test_RegisterNotificationCommand(): command = avrcp.RegisterNotificationCommand( event_id=avrcp.EventId.ADDRESSED_PLAYER_CHANGED, playback_interval=123 ) - assert avrcp.Command.from_bytes(command.pdu_id, command.payload) == command + assert avrcp.Command.from_bytes(command.pdu_id, bytes(command)) == command # ----------------------------------------------------------------------------- @@ -144,6 +146,76 @@ def test_PlayerApplicationSettingChangedEvent(): assert avrcp.Event.from_bytes(bytes(event)) == event +# ----------------------------------------------------------------------------- +def test_RejectedResponse(): + pdu_id = avrcp.PduId.GET_ELEMENT_ATTRIBUTES + response = avrcp.RejectedResponse( + pdu_id=pdu_id, + status_code=avrcp.StatusCode.DOES_NOT_EXIST, + ) + assert ( + avrcp.RejectedResponse.from_bytes(pdu=bytes(response), pdu_id=pdu_id) + == response + ) + + +# ----------------------------------------------------------------------------- +def test_GetPlayStatusResponse(): + response = avrcp.GetPlayStatusResponse( + song_length=1010, song_position=13, play_status=avrcp.PlayStatus.PAUSED + ) + assert avrcp.GetPlayStatusResponse.from_bytes(bytes(response)) == response + + +# ----------------------------------------------------------------------------- +def test_NotImplementedResponse(): + pdu_id = avrcp.PduId.GET_ELEMENT_ATTRIBUTES + response = avrcp.NotImplementedResponse(pdu_id=pdu_id, parameters=b'koasd') + assert ( + avrcp.NotImplementedResponse.from_bytes(bytes(response), pdu_id=pdu_id) + == response + ) + + +# ----------------------------------------------------------------------------- +def test_GetCapabilitiesResponse(): + response = avrcp.GetCapabilitiesResponse( + capability_id=avrcp.GetCapabilitiesCommand.CapabilityId.EVENTS_SUPPORTED, + capabilities=[ + avrcp.EventId.ADDRESSED_PLAYER_CHANGED, + avrcp.EventId.BATT_STATUS_CHANGED, + ], + ) + assert avrcp.GetCapabilitiesResponse.from_bytes(bytes(response)) == response + + +# ----------------------------------------------------------------------------- +def test_RegisterNotificationResponse(): + response = avrcp.RegisterNotificationResponse( + event=avrcp.PlaybackPositionChangedEvent(playback_position=38) + ) + assert avrcp.RegisterNotificationResponse.from_bytes(bytes(response)) == response + + +# ----------------------------------------------------------------------------- +def test_SetAbsoluteVolumeResponse(): + response = avrcp.SetAbsoluteVolumeResponse(volume=99) + assert avrcp.SetAbsoluteVolumeResponse.from_bytes(bytes(response)) == response + + +# ----------------------------------------------------------------------------- +def test_GetElementAttributesResponse(): + response = avrcp.GetElementAttributesResponse( + attributes=[ + avrcp.MediaAttribute( + attribute_id=avrcp.MediaAttributeId.ALBUM_NAME, + attribute_value="White Album", + ) + ] + ) + assert avrcp.GetElementAttributesResponse.from_bytes(bytes(response)) == response + + # ----------------------------------------------------------------------------- def test_frame_parser(): with pytest.raises(ValueError):