Migrate AVRCP responses to dataclasses

This commit is contained in:
Josh Wu
2025-08-28 00:09:06 +08:00
parent 31961febe5
commit 9f3d8c9b49
3 changed files with 266 additions and 269 deletions

View File

@@ -26,6 +26,7 @@ from typing import (
AsyncIterator,
Awaitable,
Callable,
ClassVar,
Iterable,
List,
Optional,
@@ -33,12 +34,10 @@ from typing import (
SupportsBytes,
TypeVar,
Union,
ClassVar,
TypeAlias,
cast,
)
from bumble import avc, avctp, core, l2cap, utils, hci
from bumble import avc, avctp, core, hci, l2cap, utils
from bumble.colors import color
from bumble.device import Connection, Device
from bumble.sdp import (
@@ -131,6 +130,31 @@ class EventId(hci.SpecableEnum):
return bytes([int(self)])
class StatusCode(hci.SpecableEnum):
INVALID_COMMAND = 0x00
INVALID_PARAMETER = 0x01
PARAMETER_CONTENT_ERROR = 0x02
INTERNAL_ERROR = 0x03
OPERATION_COMPLETED = 0x04
UID_CHANGED = 0x05
INVALID_DIRECTION = 0x07
NOT_A_DIRECTORY = 0x08
DOES_NOT_EXIST = 0x09
INVALID_SCOPE = 0x0A
RANGE_OUT_OF_BOUNDS = 0x0B
FOLDER_ITEM_IS_NOT_PLAYABLE = 0x0C
MEDIA_IN_USE = 0x0D
NOW_PLAYING_LIST_FULL = 0x0E
SEARCH_NOT_SUPPORTED = 0x0F
SEARCH_IN_PROGRESS = 0x10
INVALID_PLAYER_ID = 0x11
PLAYER_NOT_BROWSABLE = 0x12
PLAYER_NOT_ADDRESSED = 0x13
NO_VALID_SEARCH_RESULTS = 0x14
NO_AVAILABLE_PLAYERS = 0x15
ADDRESSED_PLAYER_CHANGED = 0x16
# -----------------------------------------------------------------------------
def make_controller_service_sdp_records(
service_record_handle: int,
@@ -267,14 +291,52 @@ def make_target_service_sdp_records(
# -----------------------------------------------------------------------------
def _decode_attribute_value(value: bytes, character_set: CharacterSetId) -> str:
try:
if character_set == CharacterSetId.UTF_8:
return value.decode("utf-8")
return value.decode("ascii")
except UnicodeDecodeError:
logger.warning(f"cannot decode string with bytes: {value.hex()}")
return ""
@dataclass
class MediaAttribute:
attribute_id: MediaAttributeId
attribute_value: str
character_set_id: CharacterSetId = CharacterSetId.UTF_8
@classmethod
def _decode_attribute_value(
cls, value: bytes, character_set: CharacterSetId
) -> str:
try:
if character_set == CharacterSetId.UTF_8:
return value.decode("utf-8")
return value.decode("ascii")
except UnicodeDecodeError:
logger.warning(f"cannot decode string with bytes: {value.hex()}")
return value.hex()
@classmethod
def parse_from_bytes(cls, pdu: bytes, offset: int) -> tuple[int, MediaAttribute]:
(
attribute_id_int,
character_set_id_int,
attribute_value_length,
) = struct.unpack_from(">IHH", pdu, offset)
attribute_value_bytes = pdu[offset + 8 : offset + 8 + attribute_value_length]
character_set_id = CharacterSetId(character_set_id_int)
return offset + 8 + attribute_value_length, cls(
attribute_id=MediaAttributeId(attribute_id_int),
character_set_id=character_set_id,
attribute_value=cls._decode_attribute_value(
attribute_value_bytes, character_set_id
),
)
def __bytes__(self) -> bytes:
attribute_value_bytes = self.attribute_value.encode("utf-8")
return (
struct.pack(
">IHH",
int(self.attribute_id),
int(self.character_set_id),
len(attribute_value_bytes),
)
+ attribute_value_bytes
)
# -----------------------------------------------------------------------------
@@ -336,10 +398,9 @@ class PduAssembler:
# -----------------------------------------------------------------------------
@dataclass
class Command:
pdu_id: ClassVar[PduId]
_payload: Optional[bytes] = field(init=False, default=None)
_payload: Optional[bytes] = None
_Command = TypeVar('_Command', bound='Command')
subclasses: ClassVar[dict[int, type[Command]]] = {}
@@ -359,15 +420,11 @@ class Command:
instance._payload = pdu[0:]
return instance
@property
def payload(self) -> bytes:
def __bytes__(self) -> bytes:
if self._payload is None:
self._payload = hci.HCI_Object.dict_to_bytes(self.__dict__, self.fields)
return self._payload
def __repr__(self) -> str:
return str(self)
# -----------------------------------------------------------------------------
@Command.command
@@ -400,9 +457,9 @@ class GetElementAttributesCommand(Command):
{
'parser': lambda data, offset: (
offset + 8,
int.from_bytes(data[offset : offset + 8]),
int.from_bytes(data[offset : offset + 8], byteorder='big'),
),
'serializer': lambda x: x.to_bytes(8),
'serializer': lambda x: x.to_bytes(8, byteorder='big'),
}
)
)
@@ -432,58 +489,60 @@ class RegisterNotificationCommand(Command):
# -----------------------------------------------------------------------------
@dataclass
class Response:
pdu_id: PduId
parameter: bytes
_payload: Optional[bytes] = None
def to_string(self, properties: dict[str, str]) -> str:
properties_str = ",".join(
[f"{name}={value}" for name, value in properties.items()]
)
return f"Response[{self.pdu_id.name}]({properties_str})"
fields: ClassVar[hci.Fields] = ()
def __str__(self) -> str:
return self.to_string({"parameter": self.parameter.hex()})
_Response = TypeVar('_Response', bound='Response')
def __repr__(self) -> str:
return str(self)
@classmethod
def register(cls, subclass: type[_Response]) -> type[_Response]:
subclass.fields = hci.HCI_Object.fields_from_dataclass(subclass)
return subclass
def __bytes__(self) -> bytes:
if self._payload is None:
self._payload = hci.HCI_Object.dict_to_bytes(self.__dict__, self.fields)
return self._payload
@classmethod
def from_bytes(cls, pdu: bytes, pdu_id: Optional[PduId] = None) -> Response:
kwargs = hci.HCI_Object.dict_from_bytes(pdu, 0, cls.fields)
if pdu_id is not None:
kwargs['pdu_id'] = pdu_id
instance = cls(**kwargs)
instance._payload = pdu
return instance
# -----------------------------------------------------------------------------
@Response.register
@dataclass
class RejectedResponse(Response):
status_code: Protocol.StatusCode
@classmethod
def from_bytes(cls, pdu_id: PduId, pdu: bytes) -> RejectedResponse:
return cls(pdu_id, Protocol.StatusCode(pdu[0]))
def __init__(self, pdu_id: PduId, status_code: Protocol.StatusCode) -> None:
super().__init__(pdu_id, bytes([int(status_code)]))
self.status_code = status_code
def __str__(self) -> str:
return self.to_string(
{
"status_code": self.status_code.name,
}
)
pdu_id: PduId
status_code: StatusCode = field(metadata=StatusCode.type_metadata(1))
# -----------------------------------------------------------------------------
@Response.register
@dataclass
class NotImplementedResponse(Response):
@classmethod
def from_bytes(cls, pdu_id: PduId, pdu: bytes) -> NotImplementedResponse:
return cls(pdu_id, pdu[1:])
pdu_id: PduId
parameters: bytes = field(metadata=hci.metadata('*'))
# -----------------------------------------------------------------------------
@dataclass
class GetCapabilitiesResponse(Response):
pdu_id = PduId.GET_CAPABILITIES
capability_id: GetCapabilitiesCommand.CapabilityId
capabilities: list[Union[SupportsBytes, bytes]]
capabilities: Sequence[Union[SupportsBytes, bytes]]
@classmethod
def from_bytes(cls, pdu: bytes) -> GetCapabilitiesResponse:
def from_bytes(cls, pdu: bytes, pdu_id: Optional[PduId] = None) -> Response:
del pdu_id # Unused.
if len(pdu) < 2:
# Possibly a reject response.
return cls(GetCapabilitiesCommand.CapabilityId(0), [])
@@ -505,168 +564,52 @@ class GetCapabilitiesResponse(Response):
return cls(capability_id, capabilities)
def __init__(
self,
capability_id: GetCapabilitiesCommand.CapabilityId,
capabilities: Sequence[Union[SupportsBytes, bytes]],
) -> None:
super().__init__(
PduId.GET_CAPABILITIES,
bytes([capability_id, len(capabilities)])
+ b''.join(bytes(capability) for capability in capabilities),
)
self.capability_id = capability_id
self.capabilities = list(capabilities)
def __str__(self) -> str:
return self.to_string(
{
"capability_id": self.capability_id.name,
"capabilities": str(self.capabilities),
}
)
# -----------------------------------------------------------------------------
class GetPlayStatusResponse(Response):
song_length: int
song_position: int
play_status: PlayStatus
@classmethod
def from_bytes(cls, pdu: bytes) -> GetPlayStatusResponse:
(song_length, song_position) = struct.unpack_from(">II", pdu, 0)
play_status = PlayStatus(pdu[8])
return cls(song_length, song_position, play_status)
def __init__(
self,
song_length: int,
song_position: int,
play_status: PlayStatus,
) -> None:
super().__init__(
PduId.GET_PLAY_STATUS,
struct.pack(">IIB", song_length, song_position, int(play_status)),
)
self.song_length = song_length
self.song_position = song_position
self.play_status = play_status
def __str__(self) -> str:
return self.to_string(
{
"song_length": str(self.song_length),
"song_position": str(self.song_position),
"play_status": self.play_status.name,
}
)
# -----------------------------------------------------------------------------
class GetElementAttributesResponse(Response):
attributes: list[MediaAttribute]
@classmethod
def from_bytes(cls, pdu: bytes) -> GetElementAttributesResponse:
num_attributes = pdu[0]
offset = 1
attributes: list[MediaAttribute] = []
for _ in range(num_attributes):
(
attribute_id_int,
character_set_id_int,
attribute_value_length,
) = struct.unpack_from(">IHH", pdu, offset)
attribute_value_bytes = pdu[
offset + 8 : offset + 8 + attribute_value_length
]
attribute_id = MediaAttributeId(attribute_id_int)
character_set_id = CharacterSetId(character_set_id_int)
attribute_value = _decode_attribute_value(
attribute_value_bytes, character_set_id
)
attributes.append(
MediaAttribute(attribute_id, character_set_id, attribute_value)
)
offset += 8 + attribute_value_length
return cls(attributes)
def __init__(self, attributes: Sequence[MediaAttribute]) -> None:
parameter = bytes([len(attributes)])
for attribute in attributes:
attribute_value_bytes = attribute.attribute_value.encode("utf-8")
parameter += (
struct.pack(
">IHH",
int(attribute.attribute_id),
int(CharacterSetId.UTF_8),
len(attribute_value_bytes),
)
+ attribute_value_bytes
)
super().__init__(
PduId.GET_ELEMENT_ATTRIBUTES,
parameter,
)
self.attributes = list(attributes)
def __str__(self) -> str:
attribute_strs = [str(attribute) for attribute in self.attributes]
return self.to_string(
{
"attributes": f"[{', '.join(attribute_strs)}]",
}
)
# -----------------------------------------------------------------------------
class SetAbsoluteVolumeResponse(Response):
volume: int
@classmethod
def from_bytes(cls, pdu: bytes) -> SetAbsoluteVolumeResponse:
return cls(pdu[0])
def __init__(self, volume: int) -> None:
super().__init__(PduId.SET_ABSOLUTE_VOLUME, bytes([volume]))
self.volume = volume
def __str__(self) -> str:
return self.to_string({"volume": str(self.volume)})
# -----------------------------------------------------------------------------
class RegisterNotificationResponse(Response):
event: Event
@classmethod
def from_bytes(cls, pdu: bytes) -> RegisterNotificationResponse:
return cls(Event.from_bytes(pdu))
def __init__(self, event: Event) -> None:
super().__init__(
PduId.REGISTER_NOTIFICATION,
bytes(event),
)
self.event = event
def __str__(self) -> str:
return self.to_string(
{
"event": str(self.event),
}
def __post_init__(self) -> None:
self._payload = bytes([self.capability_id, len(self.capabilities)]) + b''.join(
bytes(capability) for capability in self.capabilities
)
# -----------------------------------------------------------------------------
@Response.register
@dataclass
class MediaAttribute:
attribute_id: MediaAttributeId
character_set_id: CharacterSetId
attribute_value: str
class GetPlayStatusResponse(Response):
pdu_id = PduId.GET_PLAY_STATUS
song_length: int = field(metadata=hci.metadata(">4"))
song_position: int = field(metadata=hci.metadata(">4"))
play_status: PlayStatus = field(metadata=PlayStatus.type_metadata(1))
# -----------------------------------------------------------------------------
@Response.register
@dataclass
class GetElementAttributesResponse(Response):
pdu_id = PduId.GET_ELEMENT_ATTRIBUTES
attributes: Sequence[MediaAttribute] = field(
metadata=hci.metadata(
MediaAttribute.parse_from_bytes, list_begin=True, list_end=True
)
)
# -----------------------------------------------------------------------------
@Response.register
@dataclass
class SetAbsoluteVolumeResponse(Response):
pdu_id = PduId.SET_ABSOLUTE_VOLUME
volume: int = field(metadata=hci.metadata(1))
# -----------------------------------------------------------------------------
@Response.register
@dataclass
class RegisterNotificationResponse(Response):
pdu_id = PduId.REGISTER_NOTIFICATION
event: Event = field(
metadata=hci.metadata(
lambda data, offset: (len(data), Event.from_bytes(data[offset:]))
)
)
# -----------------------------------------------------------------------------
@@ -710,10 +653,9 @@ class ApplicationSetting:
# -----------------------------------------------------------------------------
@dataclass
class Event:
event_id: EventId = field(init=False)
_pdu: Optional[bytes] = field(init=False, default=None)
event_id: EventId
_pdu: Optional[bytes] = None
_Event = TypeVar('_Event', bound='Event')
subclasses: ClassVar[dict[int, type[Event]]] = {}
@@ -740,9 +682,6 @@ class Event:
)
return self._pdu
def __repr__(self) -> str:
return str(self)
# -----------------------------------------------------------------------------
@dataclass
@@ -871,7 +810,7 @@ class Delegate:
class Error(Exception):
"""The delegate method failed, with a specified status code."""
def __init__(self, status_code: Protocol.StatusCode) -> None:
def __init__(self, status_code: StatusCode) -> None:
self.status_code = status_code
supported_events: list[EventId]
@@ -913,30 +852,6 @@ class Protocol(utils.EventEmitter):
CONTINUE = 0b10
END = 0b11
class StatusCode(utils.OpenIntEnum):
INVALID_COMMAND = 0x00
INVALID_PARAMETER = 0x01
PARAMETER_CONTENT_ERROR = 0x02
INTERNAL_ERROR = 0x03
OPERATION_COMPLETED = 0x04
UID_CHANGED = 0x05
INVALID_DIRECTION = 0x07
NOT_A_DIRECTORY = 0x08
DOES_NOT_EXIST = 0x09
INVALID_SCOPE = 0x0A
RANGE_OUT_OF_BOUNDS = 0x0B
FOLDER_ITEM_IS_NOT_PLAYABLE = 0x0C
MEDIA_IN_USE = 0x0D
NOW_PLAYING_LIST_FULL = 0x0E
SEARCH_NOT_SUPPORTED = 0x0F
SEARCH_IN_PROGRESS = 0x10
INVALID_PLAYER_ID = 0x11
PLAYER_NOT_BROWSABLE = 0x12
PLAYER_NOT_ADDRESSED = 0x13
NO_VALID_SEARCH_RESULTS = 0x14
NO_AVAILABLE_PLAYERS = 0x15
ADDRESSED_PLAYER_CHANGED = 0x16
class InvalidPidError(Exception):
"""A response frame with ipid==1 was received."""
@@ -1111,7 +1026,7 @@ class Protocol(utils.EventEmitter):
self.send_rejected_avrcp_response(
transaction_label,
command.pdu_id,
Protocol.StatusCode.INTERNAL_ERROR,
StatusCode.INTERNAL_ERROR,
)
utils.AsyncRunner.spawn(call())
@@ -1146,7 +1061,7 @@ class Protocol(utils.EventEmitter):
GetElementAttributesCommand(element_identifier, attribute_ids),
)
response = self._check_response(response_context, GetElementAttributesResponse)
return response.attributes
return list(response.attributes)
async def monitor_events(
self, event_id: EventId, playback_interval: int = 0
@@ -1511,12 +1426,12 @@ class Protocol(utils.EventEmitter):
# TODO: check that this is the right way to respond in this case.
logger.debug("unsupported PDU ID")
self.send_rejected_avrcp_response(
transaction_label, pdu_id, self.StatusCode.INVALID_PARAMETER
transaction_label, pdu_id, StatusCode.INVALID_PARAMETER
)
else:
logger.debug("unsupported command type")
self.send_rejected_avrcp_response(
transaction_label, pdu_id, self.StatusCode.INVALID_COMMAND
transaction_label, pdu_id, StatusCode.INVALID_COMMAND
)
self.receive_command_state = None
@@ -1541,9 +1456,9 @@ class Protocol(utils.EventEmitter):
# more appropriate.
response: Optional[Response] = None
if response_code == avc.ResponseFrame.ResponseCode.REJECTED:
response = RejectedResponse.from_bytes(pdu_id, pdu)
response = RejectedResponse.from_bytes(pdu_id=pdu_id, pdu=pdu)
elif response_code == avc.ResponseFrame.ResponseCode.NOT_IMPLEMENTED:
response = NotImplementedResponse.from_bytes(pdu_id, pdu)
response = NotImplementedResponse.from_bytes(pdu_id=pdu_id, pdu=pdu)
elif response_code in (
avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE,
avc.ResponseFrame.ResponseCode.INTERIM,
@@ -1551,15 +1466,15 @@ class Protocol(utils.EventEmitter):
avc.ResponseFrame.ResponseCode.ACCEPTED,
):
if pdu_id == PduId.GET_CAPABILITIES:
response = GetCapabilitiesResponse.from_bytes(pdu)
response = GetCapabilitiesResponse.from_bytes(pdu=pdu)
elif pdu_id == PduId.GET_PLAY_STATUS:
response = GetPlayStatusResponse.from_bytes(pdu)
response = GetPlayStatusResponse.from_bytes(pdu=pdu)
elif pdu_id == PduId.GET_ELEMENT_ATTRIBUTES:
response = GetElementAttributesResponse.from_bytes(pdu)
response = GetElementAttributesResponse.from_bytes(pdu=pdu)
elif pdu_id == PduId.SET_ABSOLUTE_VOLUME:
response = SetAbsoluteVolumeResponse.from_bytes(pdu)
response = SetAbsoluteVolumeResponse.from_bytes(pdu=pdu)
elif pdu_id == PduId.REGISTER_NOTIFICATION:
response = RegisterNotificationResponse.from_bytes(pdu)
response = RegisterNotificationResponse.from_bytes(pdu=pdu)
else:
logger.debug("unexpected PDU ID")
pending_command.response.set_exception(
@@ -1655,10 +1570,8 @@ class Protocol(utils.EventEmitter):
# TODO: fragmentation
# Send the command.
logger.debug(f">>> AVRCP command PDU: {command}")
pdu = (
struct.pack(">BBH", command.pdu_id, 0, len(command.payload))
+ command.payload
)
payload = bytes(command)
pdu = struct.pack(">BBH", command.pdu_id, 0, len(payload)) + payload
command_frame = avc.VendorDependentCommandFrame(
command_type,
avc.Frame.SubunitType.PANEL,
@@ -1702,10 +1615,8 @@ class Protocol(utils.EventEmitter):
) -> None:
# TODO: fragmentation
logger.debug(f">>> AVRCP response PDU: {response}")
pdu = (
struct.pack(">BBH", response.pdu_id, 0, len(response.parameter))
+ response.parameter
)
payload = bytes(response)
pdu = struct.pack(">BBH", response.pdu_id, 0, len(payload)) + payload
response_frame = avc.VendorDependentResponseFrame(
response_code,
avc.Frame.SubunitType.PANEL,

View File

@@ -112,7 +112,14 @@ class SpecableEnum(utils.OpenIntEnum):
@classmethod
def type_spec(cls, size: int):
return {'size': size, 'mapper': lambda x: cls(x).name}
return {
'serializer': lambda x: x.to_bytes(size, 'little'),
'parser': lambda data, offset: (
offset + size,
cls(int.from_bytes(data[offset : offset + size], 'little')),
),
'mapper': lambda x: cls(x).name,
}
@classmethod
def type_metadata(cls, size: int, list_begin: bool = False, list_end: bool = False):
@@ -123,7 +130,14 @@ class SpecableFlag(enum.IntFlag):
@classmethod
def type_spec(cls, size: int):
return {'size': size, 'mapper': lambda x: cls(x).name}
return {
'serializer': lambda x: x.to_bytes(size, 'little'),
'parser': lambda data, offset: (
offset + size,
cls(int.from_bytes(data[offset : offset + size], 'little')),
),
'mapper': lambda x: cls(x).name,
}
@classmethod
def type_metadata(cls, size: int, list_begin: bool = False, list_end: bool = False):

View File

@@ -15,13 +15,15 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import struct
from collections.abc import Sequence
import pytest
from collections.abc import Sequence
from typing import Self
from bumble import avc, avctp, avrcp
from . import test_utils
@@ -35,7 +37,7 @@ class TwoDevices(test_utils.TwoDevices):
await self.protocols[1].connect(self.connections[0])
@classmethod
async def create_with_avdtp(cls) -> Self:
async def create_with_avdtp(cls) -> TwoDevices:
devices = await cls.create_with_connection()
await devices.setup_avdtp_connections()
return devices
@@ -44,7 +46,7 @@ class TwoDevices(test_utils.TwoDevices):
# -----------------------------------------------------------------------------
def test_GetPlayStatusCommand():
command = avrcp.GetPlayStatusCommand()
assert avrcp.Command.from_bytes(command.pdu_id, command.payload) == command
assert avrcp.Command.from_bytes(command.pdu_id, bytes(command)) == command
# -----------------------------------------------------------------------------
@@ -52,13 +54,13 @@ def test_GetCapabilitiesCommand():
command = avrcp.GetCapabilitiesCommand(
capability_id=avrcp.GetCapabilitiesCommand.CapabilityId.COMPANY_ID
)
assert avrcp.Command.from_bytes(command.pdu_id, command.payload) == command
assert avrcp.Command.from_bytes(command.pdu_id, bytes(command)) == command
# -----------------------------------------------------------------------------
def test_SetAbsoluteVolumeCommand():
command = avrcp.SetAbsoluteVolumeCommand(volume=5)
assert avrcp.Command.from_bytes(command.pdu_id, command.payload) == command
assert avrcp.Command.from_bytes(command.pdu_id, bytes(command)) == command
# -----------------------------------------------------------------------------
@@ -70,7 +72,7 @@ def test_GetElementAttributesCommand():
avrcp.MediaAttributeId.ARTIST_NAME,
],
)
assert avrcp.Command.from_bytes(command.pdu_id, command.payload) == command
assert avrcp.Command.from_bytes(command.pdu_id, bytes(command)) == command
# -----------------------------------------------------------------------------
@@ -78,7 +80,7 @@ def test_RegisterNotificationCommand():
command = avrcp.RegisterNotificationCommand(
event_id=avrcp.EventId.ADDRESSED_PLAYER_CHANGED, playback_interval=123
)
assert avrcp.Command.from_bytes(command.pdu_id, command.payload) == command
assert avrcp.Command.from_bytes(command.pdu_id, bytes(command)) == command
# -----------------------------------------------------------------------------
@@ -144,6 +146,76 @@ def test_PlayerApplicationSettingChangedEvent():
assert avrcp.Event.from_bytes(bytes(event)) == event
# -----------------------------------------------------------------------------
def test_RejectedResponse():
pdu_id = avrcp.PduId.GET_ELEMENT_ATTRIBUTES
response = avrcp.RejectedResponse(
pdu_id=pdu_id,
status_code=avrcp.StatusCode.DOES_NOT_EXIST,
)
assert (
avrcp.RejectedResponse.from_bytes(pdu=bytes(response), pdu_id=pdu_id)
== response
)
# -----------------------------------------------------------------------------
def test_GetPlayStatusResponse():
response = avrcp.GetPlayStatusResponse(
song_length=1010, song_position=13, play_status=avrcp.PlayStatus.PAUSED
)
assert avrcp.GetPlayStatusResponse.from_bytes(bytes(response)) == response
# -----------------------------------------------------------------------------
def test_NotImplementedResponse():
pdu_id = avrcp.PduId.GET_ELEMENT_ATTRIBUTES
response = avrcp.NotImplementedResponse(pdu_id=pdu_id, parameters=b'koasd')
assert (
avrcp.NotImplementedResponse.from_bytes(bytes(response), pdu_id=pdu_id)
== response
)
# -----------------------------------------------------------------------------
def test_GetCapabilitiesResponse():
response = avrcp.GetCapabilitiesResponse(
capability_id=avrcp.GetCapabilitiesCommand.CapabilityId.EVENTS_SUPPORTED,
capabilities=[
avrcp.EventId.ADDRESSED_PLAYER_CHANGED,
avrcp.EventId.BATT_STATUS_CHANGED,
],
)
assert avrcp.GetCapabilitiesResponse.from_bytes(bytes(response)) == response
# -----------------------------------------------------------------------------
def test_RegisterNotificationResponse():
response = avrcp.RegisterNotificationResponse(
event=avrcp.PlaybackPositionChangedEvent(playback_position=38)
)
assert avrcp.RegisterNotificationResponse.from_bytes(bytes(response)) == response
# -----------------------------------------------------------------------------
def test_SetAbsoluteVolumeResponse():
response = avrcp.SetAbsoluteVolumeResponse(volume=99)
assert avrcp.SetAbsoluteVolumeResponse.from_bytes(bytes(response)) == response
# -----------------------------------------------------------------------------
def test_GetElementAttributesResponse():
response = avrcp.GetElementAttributesResponse(
attributes=[
avrcp.MediaAttribute(
attribute_id=avrcp.MediaAttributeId.ALBUM_NAME,
attribute_value="White Album",
)
]
)
assert avrcp.GetElementAttributesResponse.from_bytes(bytes(response)) == response
# -----------------------------------------------------------------------------
def test_frame_parser():
with pytest.raises(ValueError):