From 14401910bb8ceadf55d292c32577ab453294c8e7 Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Wed, 27 Aug 2025 15:47:46 +0800 Subject: [PATCH] AVRCP: Implement most commands and responses --- bumble/avrcp.py | 956 ++++++++++++++++++++++++++++++++++++++------ bumble/hci.py | 52 ++- tests/avrcp_test.py | 383 ++++++++++-------- 3 files changed, 1087 insertions(+), 304 deletions(-) diff --git a/bumble/avrcp.py b/bumble/avrcp.py index 6aed349..784a295 100644 --- a/bumble/avrcp.py +++ b/bumble/avrcp.py @@ -19,6 +19,7 @@ from __future__ import annotations import asyncio import enum +import functools import logging import struct from dataclasses import dataclass, field @@ -41,6 +42,7 @@ from bumble import avc, avctp, core, hci, l2cap, utils from bumble.colors import color from bumble.device import Connection, Device from bumble.sdp import ( + SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, @@ -65,6 +67,15 @@ AVRCP_PID = 0x110E AVRCP_BLUETOOTH_SIG_COMPANY_ID = 0x001958 +_UINT64_BE_METADATA = { + 'parser': lambda data, offset: ( + offset + 8, + int.from_bytes(data[offset : offset + 8], byteorder='big'), + ), + 'serializer': lambda x: x.to_bytes(8, byteorder='big'), +} + + class PduId(utils.OpenIntEnum): GET_CAPABILITIES = 0x10 LIST_PLAYER_APPLICATION_SETTING_ATTRIBUTES = 0x11 @@ -84,7 +95,12 @@ class PduId(utils.OpenIntEnum): SET_ADDRESSED_PLAYER = 0x60 SET_BROWSED_PLAYER = 0x70 GET_FOLDER_ITEMS = 0x71 + CHANGE_PATH = 0x72 + GET_ITEM_ATTRIBUTES = 0x73 + PLAY_ITEM = 0x74 GET_TOTAL_NUMBER_OF_ITEMS = 0x75 + SEARCH = 0x80 + ADD_TO_NOW_PLAYING = 0x90 class CharacterSetId(hci.SpecableEnum): @@ -155,18 +171,49 @@ class StatusCode(hci.SpecableEnum): ADDRESSED_PLAYER_CHANGED = 0x16 +class Scope(hci.SpecableEnum): + MEDIA_PLAYER_LIST = 0x00 + MEDIA_PLAYER_VIRTUAL_FILESYSTEM = 0x01 + SEARCH = 0x02 + NOW_PLAYING = 0x03 + + +class ControllerFeatures(enum.IntFlag): + # fmt: off + CATEGORY_1 = 1 << 0 + CATEGORY_2 = 1 << 1 + CATEGORY_3 = 1 << 2 + CATEGORY_4 = 1 << 3 + SUPPORTS_BROWSING = 1 << 6 + SUPPORTS_COVER_ART_GET_IMAGE_PROPERTIES_FEATURE = 1 << 7 + SUPPORTS_COVER_ART_GET_IMAGE_FEATURE = 1 << 8 + SUPPORTS_COVER_ART_GET_LINKED_THUMBNAIL_FEATURE = 1 << 9 + + +class TargetFeatures(enum.IntFlag): + # fmt: off + CATEGORY_1 = 1 << 0 + CATEGORY_2 = 1 << 1 + CATEGORY_3 = 1 << 2 + CATEGORY_4 = 1 << 3 + PLAYER_APPLICATION_SETTINGS = 1 << 4 + GROUP_NAVIGATION = 1 << 5 + SUPPORTS_BROWSING = 1 << 6 + SUPPORTS_MULTIPLE_MEDIA_PLAYER_APPLICATIONS = 1 << 7 + SUPPORTS_COVER_ART = 1 << 8 + + # ----------------------------------------------------------------------------- def make_controller_service_sdp_records( service_record_handle: int, avctp_version: tuple[int, int] = (1, 4), avrcp_version: tuple[int, int] = (1, 6), - supported_features: int = 1, + supported_features: Union[int, ControllerFeatures] = 1, ) -> list[ServiceAttribute]: - # TODO: support a way to compute the supported features from a feature list avctp_version_int = avctp_version[0] << 8 | avctp_version[1] avrcp_version_int = avrcp_version[0] << 8 | avrcp_version[1] - return [ + attributes = [ ServiceAttribute( SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, DataElement.unsigned_integer_32(service_record_handle), @@ -221,6 +268,31 @@ def make_controller_service_sdp_records( DataElement.unsigned_integer_16(supported_features), ), ] + if supported_features & ControllerFeatures.SUPPORTS_BROWSING: + attributes.append( + ServiceAttribute( + SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.sequence( + [ + DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID), + DataElement.unsigned_integer_16( + avctp.AVCTP_BROWSING_PSM + ), + ] + ), + DataElement.sequence( + [ + DataElement.uuid(core.BT_AVCTP_PROTOCOL_ID), + DataElement.unsigned_integer_16(avctp_version_int), + ] + ), + ] + ), + ), + ) + return attributes # ----------------------------------------------------------------------------- @@ -228,13 +300,13 @@ def make_target_service_sdp_records( service_record_handle: int, avctp_version: tuple[int, int] = (1, 4), avrcp_version: tuple[int, int] = (1, 6), - supported_features: int = 0x23, + supported_features: Union[int, TargetFeatures] = 0x23, ) -> list[ServiceAttribute]: # TODO: support a way to compute the supported features from a feature list avctp_version_int = avctp_version[0] << 8 | avctp_version[1] avrcp_version_int = avrcp_version[0] << 8 | avrcp_version[1] - return [ + attributes = [ ServiceAttribute( SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, DataElement.unsigned_integer_32(service_record_handle), @@ -288,56 +360,321 @@ def make_target_service_sdp_records( DataElement.unsigned_integer_16(supported_features), ), ] + if supported_features & TargetFeatures.SUPPORTS_BROWSING: + attributes.append( + ServiceAttribute( + SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.sequence( + [ + DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID), + DataElement.unsigned_integer_16( + avctp.AVCTP_BROWSING_PSM + ), + ] + ), + DataElement.sequence( + [ + DataElement.uuid(core.BT_AVCTP_PROTOCOL_ID), + DataElement.unsigned_integer_16(avctp_version_int), + ] + ), + ] + ), + ), + ) + return attributes + + +# ----------------------------------------------------------------------------- +def _parse_string(data: bytes, offset: int, length_size: int) -> tuple[int, str]: + length = int.from_bytes( + data[offset : offset + length_size], byteorder='big', signed=False + ) + offset += length_size + encoded = data[offset : offset + length] + try: + decoded = encoded.decode("utf-8") + except UnicodeDecodeError: + # This can decode anything. + decoded = encoded.decode("latin1") + return offset + length, decoded + + +# ----------------------------------------------------------------------------- +def _serialize_string(value: str, length_size: int) -> bytes: + encoded = value.encode("utf-8") + return len(encoded).to_bytes(length_size, byteorder='big', signed=False) + encoded + + +# ----------------------------------------------------------------------------- +def _string_spec(length_size: int): + return { + 'parser': functools.partial(_parse_string, length_size=length_size), + 'serializer': functools.partial(_serialize_string, length_size=length_size), + } # ----------------------------------------------------------------------------- @dataclass -class MediaAttribute: - attribute_id: MediaAttributeId - attribute_value: str - character_set_id: CharacterSetId = CharacterSetId.UTF_8 +class MediaAttribute(hci.HCI_Dataclass_Object): + attribute_id: MediaAttributeId = field( + metadata=MediaAttributeId.type_metadata(4, byteorder='big') + ) + character_set_id: CharacterSetId = field( + metadata=CharacterSetId.type_metadata(2, byteorder='big') + ) + attribute_value: str = field(metadata=hci.metadata(_string_spec(2))) + + +# ----------------------------------------------------------------------------- +@dataclass +class SongAndPlayStatus: + song_length: int + song_position: int + play_status: PlayStatus + + +# ----------------------------------------------------------------------------- +class ApplicationSetting: + class AttributeId(hci.SpecableEnum): + EQUALIZER_ON_OFF = 0x01 + REPEAT_MODE = 0x02 + SHUFFLE_ON_OFF = 0x03 + SCAN_ON_OFF = 0x04 + + class EqualizerOnOffStatus(hci.SpecableEnum): + OFF = 0x01 + ON = 0x02 + + class RepeatModeStatus(hci.SpecableEnum): + OFF = 0x01 + SINGLE_TRACK_REPEAT = 0x02 + ALL_TRACK_REPEAT = 0x03 + GROUP_REPEAT = 0x04 + + class ShuffleOnOffStatus(hci.SpecableEnum): + OFF = 0x01 + ALL_TRACKS_SHUFFLE = 0x02 + GROUP_SHUFFLE = 0x03 + + class ScanOnOffStatus(hci.SpecableEnum): + OFF = 0x01 + ALL_TRACKS_SCAN = 0x02 + GROUP_SCAN = 0x03 + + class GenericValue(hci.SpecableEnum): + pass + + +# ----------------------------------------------------------------------------- +@dataclass +class AttributeValueEntry(hci.HCI_Dataclass_Object): + attribute_id: MediaAttributeId = field( + metadata=MediaAttributeId.type_metadata(4, byteorder='big') + ) + character_set_id: CharacterSetId = field( + metadata=CharacterSetId.type_metadata(2, byteorder='big') + ) + attribute_value: str = field(metadata=hci.metadata(_string_spec(2))) + + +# ----------------------------------------------------------------------------- +class BrowseableItem: + """6.10.2 Browseable items.""" + + class Type(hci.SpecableEnum): + MEDIA_PLAYER = 0x01 + FOLDER = 0x02 + MEDIA_ELEMENT = 0x03 + + item_type: ClassVar[Type] + _payload: Optional[bytes] = None + + subclasses: ClassVar[dict[Type, type[BrowseableItem]]] = {} + fields: ClassVar[hci.Fields] = () @classmethod - def _decode_attribute_value( - cls, value: bytes, character_set: CharacterSetId - ) -> str: - try: - if character_set == CharacterSetId.UTF_8: - return value.decode("utf-8") - return value.decode("ascii") - except UnicodeDecodeError: - logger.warning(f"cannot decode string with bytes: {value.hex()}") - return value.hex() - - @classmethod - def parse_from_bytes(cls, pdu: bytes, offset: int) -> tuple[int, MediaAttribute]: - ( - attribute_id_int, - character_set_id_int, - attribute_value_length, - ) = struct.unpack_from(">IHH", pdu, offset) - attribute_value_bytes = pdu[offset + 8 : offset + 8 + attribute_value_length] - character_set_id = CharacterSetId(character_set_id_int) - return offset + 8 + attribute_value_length, cls( - attribute_id=MediaAttributeId(attribute_id_int), - character_set_id=character_set_id, - attribute_value=cls._decode_attribute_value( - attribute_value_bytes, character_set_id - ), + def parse_from_bytes(cls, data: bytes, offset: int) -> tuple[int, BrowseableItem]: + item_type, length = struct.unpack_from('>BH', data, offset) + subclass = cls.subclasses[BrowseableItem.Type(item_type)] + instance = subclass( + **hci.HCI_Object.dict_from_bytes(data, offset + 3, subclass.fields) ) + instance._payload = data[3:] + return offset + length, instance def __bytes__(self) -> bytes: - attribute_value_bytes = self.attribute_value.encode("utf-8") + if self._payload is None: + self._payload = hci.HCI_Object.dict_to_bytes(self.__dict__, self.fields) return ( - struct.pack( - ">IHH", - int(self.attribute_id), - int(self.character_set_id), - len(attribute_value_bytes), - ) - + attribute_value_bytes + struct.pack('>BH', self.item_type, len(self._payload) + 3) + self._payload ) + _Item = TypeVar('_Item', bound='BrowseableItem') + + @classmethod + def item(cls, subclass: type[_Item]) -> type[_Item]: + cls.subclasses[subclass.item_type] = subclass + subclass.fields = hci.HCI_Object.fields_from_dataclass(subclass) + return subclass + + +# ----------------------------------------------------------------------------- +@BrowseableItem.item +@dataclass +class MediaPlayerItem(BrowseableItem): + item_type = BrowseableItem.Type.MEDIA_PLAYER + + class MajorPlayerType(hci.SpecableFlag): + AUDIO = 0x01 + VIDEO = 0x02 + BROADCASTING_AUDIO = 0x04 + BROADCASTING_VIDEO = 0x08 + + class PlayerSubType(hci.SpecableFlag): + AUDIO_BOOK = 0x01 + PODCAST = 0x02 + + class Features(hci.SpecableFlag): + SELECT = 1 << 0 + UP = 1 << 1 + DOWN = 1 << 2 + LEFT = 1 << 3 + RIGHT = 1 << 4 + RIGHT_UP = 1 << 5 + RIGHT_DOWN = 1 << 6 + LEFT_UP = 1 << 7 + LEFT_DOWN = 1 << 8 + ROOT_MENU = 1 << 9 + SETUP_MENU = 1 << 10 + CONTENTS_MENU = 1 << 11 + FAVORITE_MENU = 1 << 12 + EXIT = 1 << 13 + NUM_0 = 1 << 14 + NUM_1 = 1 << 15 + NUM_2 = 1 << 16 + NUM_3 = 1 << 17 + NUM_4 = 1 << 18 + NUM_5 = 1 << 19 + NUM_6 = 1 << 20 + NUM_7 = 1 << 21 + NUM_8 = 1 << 22 + NUM_9 = 1 << 23 + DOT = 1 << 24 + ENTER = 1 << 25 + CLEAR = 1 << 26 + CHANNEL_UP = 1 << 27 + CHANNEL_DOWN = 1 << 28 + PREVIOUS_CHANNEL = 1 << 29 + SOUND_SELECT = 1 << 30 + INPUT_SELECT = 1 << 31 + DISPLAY_INFORMATION = 1 << 32 + HELP = 1 << 33 + PAGE_UP = 1 << 34 + PAGE_DOWN = 1 << 35 + POWER = 1 << 36 + VOLUME_UP = 1 << 37 + VOLUME_DOWN = 1 << 38 + MUTE = 1 << 39 + PLAY = 1 << 40 + STOP = 1 << 41 + PAUSE = 1 << 42 + RECORD = 1 << 43 + REWIND = 1 << 44 + FAST_FORWARD = 1 << 45 + EJECT = 1 << 46 + FORWARD = 1 << 47 + BACKWARD = 1 << 48 + ANGLE = 1 << 49 + SUBPICTURE = 1 << 50 + F1 = 1 << 51 + F2 = 1 << 52 + F3 = 1 << 53 + F4 = 1 << 54 + F5 = 1 << 55 + VENDOR_UNIQUE = 1 << 56 + BASIC_GROUP_NAVIGATION = 1 << 57 + ADVANCED_CONTROL_PLAYER = 1 << 58 + BROWSING = 1 << 59 + SEARCHING = 1 << 60 + ADD_TO_NOW_PLAYING = 1 << 61 + UI_DS_UNIQUE_IN_PLAYER_BROWSE_TREE = 1 << 62 + ONLY_BROWSABLE_WHEN_ADDRESSED = 1 << 63 + ONLY_SEARCHABLE_WHEN_ADDRESSED = 1 << 64 + NOW_PLAYING = 1 << 65 + UID_PERSISTENCY = 1 << 66 + NUMBER_OF_ITEMS = 1 << 67 + COVER_ART = 1 << 68 + + player_id: int = field(metadata=hci.metadata('>2')) + major_player_type: MajorPlayerType = field( + metadata=MajorPlayerType.type_metadata(1) + ) + player_sub_type: PlayerSubType = field( + metadata=PlayerSubType.type_metadata(4, byteorder='big') + ) + play_status: PlayStatus = field(metadata=PlayStatus.type_metadata(1)) + feature_bitmask: Features = field( + metadata=Features.type_metadata(16, byteorder='big') + ) + character_set_id: CharacterSetId = field( + metadata=CharacterSetId.type_metadata(2, byteorder='big') + ) + displayable_name: str = field(metadata=hci.metadata(_string_spec(2))) + + +# ----------------------------------------------------------------------------- +@BrowseableItem.item +@dataclass +class FolderItem(BrowseableItem): + item_type = BrowseableItem.Type.FOLDER + + class FolderType(hci.SpecableEnum): + MIXED = 0x00 + TITLES = 0x01 + ALBUMS = 0x02 + ARTISTS = 0x03 + GENRES = 0x04 + PLAYLISTS = 0x05 + YEARS = 0x06 + + class Playable(hci.SpecableEnum): + NOT_PLAYABLE = 0x00 + PLAYABLE = 0x01 + + folder_uid: int = field(metadata=_UINT64_BE_METADATA) + folder_type: FolderType = field(metadata=FolderType.type_metadata(1)) + is_playable: FolderType = field(metadata=Playable.type_metadata(1)) + character_set_id: CharacterSetId = field( + metadata=CharacterSetId.type_metadata(2, byteorder='big') + ) + displayable_name: str = field(metadata=hci.metadata(_string_spec(2))) + + +# ----------------------------------------------------------------------------- +@BrowseableItem.item +@dataclass +class MediaElementItem(BrowseableItem): + item_type = BrowseableItem.Type.MEDIA_ELEMENT + + class MediaType(hci.SpecableEnum): + AUDIO = 0x00 + VIDEO = 0x01 + + media_element_uid: int = field(metadata=_UINT64_BE_METADATA) + media_type: MediaType = field(metadata=MediaType.type_metadata(1)) + character_set_id: CharacterSetId = field( + metadata=CharacterSetId.type_metadata(2, byteorder='big') + ) + displayable_name: str = field(metadata=hci.metadata(_string_spec(2))) + attribute_value_entry_list: Sequence[AttributeValueEntry] = field( + metadata=hci.metadata( + AttributeValueEntry.parse_from_bytes, list_begin=True, list_end=True + ) + ) + # ----------------------------------------------------------------------------- class PduAssembler: @@ -439,6 +776,105 @@ class GetCapabilitiesCommand(Command): capability_id: CapabilityId = field(metadata=CapabilityId.type_metadata(1)) +# ----------------------------------------------------------------------------- +@Command.command +@dataclass +class ListPlayerApplicationSettingAttributesCommand(Command): + pdu_id = PduId.LIST_PLAYER_APPLICATION_SETTING_ATTRIBUTES + + +# ----------------------------------------------------------------------------- +@Command.command +@dataclass +class ListPlayerApplicationSettingValuesCommand(Command): + pdu_id = PduId.LIST_PLAYER_APPLICATION_SETTING_VALUES + + attribute: ApplicationSetting.AttributeId = field( + metadata=ApplicationSetting.AttributeId.type_metadata(1) + ) + + +# ----------------------------------------------------------------------------- +@Command.command +@dataclass +class GetCurrentPlayerApplicationSettingValueCommand(Command): + pdu_id = PduId.GET_CURRENT_PLAYER_APPLICATION_SETTING_VALUE + + attribute: Sequence[ApplicationSetting.AttributeId] = field( + metadata=ApplicationSetting.AttributeId.type_metadata( + 1, list_begin=True, list_end=True + ) + ) + + +# ----------------------------------------------------------------------------- +@Command.command +@dataclass +class SetPlayerApplicationSettingValueCommand(Command): + pdu_id = PduId.SET_PLAYER_APPLICATION_SETTING_VALUE + + attribute: Sequence[ApplicationSetting.AttributeId] = field( + metadata=ApplicationSetting.AttributeId.type_metadata(1, list_begin=True) + ) + value: Sequence[int] = field(metadata=hci.metadata(1, list_end=True)) + + +# ----------------------------------------------------------------------------- +@Command.command +@dataclass +class GetPlayerApplicationSettingAttributeTextCommand(Command): + pdu_id = PduId.GET_PLAYER_APPLICATION_SETTING_ATTRIBUTE_TEXT + + attribute: Sequence[ApplicationSetting.AttributeId] = field( + metadata=ApplicationSetting.AttributeId.type_metadata( + 1, list_begin=True, list_end=True + ) + ) + + +# ----------------------------------------------------------------------------- +@Command.command +@dataclass +class GetPlayerApplicationSettingValueTextCommand(Command): + pdu_id = PduId.GET_PLAYER_APPLICATION_SETTING_VALUE_TEXT + + attribute: ApplicationSetting.AttributeId = field( + metadata=ApplicationSetting.AttributeId.type_metadata(1) + ) + value: Sequence[int] = field( + metadata=hci.metadata(1, list_begin=True, list_end=True) + ) + + +# ----------------------------------------------------------------------------- +@Command.command +@dataclass +class InformDisplayableCharacterSetCommand(Command): + pdu_id = PduId.INFORM_DISPLAYABLE_CHARACTER_SET + + character_set_id: Sequence[CharacterSetId] = field( + metadata=CharacterSetId.type_metadata( + 2, list_begin=True, list_end=True, byteorder='big' + ) + ) + + +# ----------------------------------------------------------------------------- +@Command.command +@dataclass +class InformBatteryStatusOfCtCommand(Command): + pdu_id = PduId.INFORM_BATTERY_STATUS_OF_CT + + class BatteryStatus(hci.SpecableEnum): + NORMAL = 0x00 + WARNING = 0x01 + CRITICAL = 0x02 + EXTERNAL = 0x03 + FULL_CHARGE = 0x04 + + battery_status: BatteryStatus = field(metadata=BatteryStatus.type_metadata(1)) + + # ----------------------------------------------------------------------------- @Command.command @dataclass @@ -452,19 +888,11 @@ class GetPlayStatusCommand(Command): class GetElementAttributesCommand(Command): pdu_id = PduId.GET_ELEMENT_ATTRIBUTES - identifier: int = field( - metadata=hci.metadata( - { - 'parser': lambda data, offset: ( - offset + 8, - int.from_bytes(data[offset : offset + 8], byteorder='big'), - ), - 'serializer': lambda x: x.to_bytes(8, byteorder='big'), - } - ) - ) + identifier: int = field(metadata=hci.metadata(_UINT64_BE_METADATA)) attribute_ids: Sequence[MediaAttributeId] = field( - metadata=MediaAttributeId.type_metadata(1, list_begin=True, list_end=True) + metadata=MediaAttributeId.type_metadata( + 4, list_begin=True, list_end=True, byteorder='big' + ) ) @@ -488,18 +916,131 @@ class RegisterNotificationCommand(Command): playback_interval: int = field(metadata=hci.metadata('>4')) +# ----------------------------------------------------------------------------- +@Command.command +@dataclass +class SetAddressedPlayerCommand(Command): + pdu_id = PduId.SET_ADDRESSED_PLAYER + + player_id: int = field(metadata=hci.metadata('>2')) + + +# ----------------------------------------------------------------------------- +@Command.command +@dataclass +class SetBrowsedPlayerCommand(Command): + pdu_id = PduId.SET_BROWSED_PLAYER + + player_id: int = field(metadata=hci.metadata('>2')) + + +# ----------------------------------------------------------------------------- +@Command.command +@dataclass +class GetFolderItemsCommand(Command): + pdu_id = PduId.GET_FOLDER_ITEMS + + scope: Scope = field(metadata=Scope.type_metadata(1)) + start_item: int = field(metadata=hci.metadata('>4')) + end_item: int = field(metadata=hci.metadata('>4')) + # When attributes is empty, all attributes will be requested. + attributes: Sequence[MediaAttributeId] = field( + metadata=MediaAttributeId.type_metadata( + 4, list_begin=True, list_end=True, byteorder='big' + ) + ) + + +# ----------------------------------------------------------------------------- +@Command.command +@dataclass +class ChangePathCommand(Command): + pdu_id = PduId.CHANGE_PATH + + class Direction(hci.SpecableEnum): + UP = 0 + DOWN = 1 + + uid_counter: int = field(metadata=hci.metadata('>2')) + direction: Direction = field(metadata=Direction.type_metadata(1)) + folder_uid: int = field(metadata=hci.metadata(_UINT64_BE_METADATA)) + + +# ----------------------------------------------------------------------------- +@Command.command +@dataclass +class GetItemAttributesCommand(Command): + pdu_id = PduId.GET_ITEM_ATTRIBUTES + + scope: Scope = field(metadata=Scope.type_metadata(1)) + uid: int = field(metadata=hci.metadata(_UINT64_BE_METADATA)) + uid_counter: int = field(metadata=hci.metadata('>2')) + start_item: int = field(metadata=hci.metadata('>4')) + end_item: int = field(metadata=hci.metadata('>4')) + # When attributes is empty, all attributes will be requested. + attributes: Sequence[MediaAttributeId] = field( + metadata=MediaAttributeId.type_metadata(1, list_begin=True, list_end=True) + ) + + +# ----------------------------------------------------------------------------- +@Command.command +@dataclass +class GetTotalNumberOfItemsCommand(Command): + pdu_id = PduId.GET_TOTAL_NUMBER_OF_ITEMS + + scope: Scope = field(metadata=Scope.type_metadata(1)) + + +# ----------------------------------------------------------------------------- +@Command.command +@dataclass +class SearchCommand(Command): + pdu_id = PduId.SEARCH + + character_set_id: CharacterSetId = field( + metadata=CharacterSetId.type_metadata(2, byteorder='big') + ) + search_string: str = field(metadata=hci.metadata(_string_spec(2))) + + +# ----------------------------------------------------------------------------- +@Command.command +@dataclass +class PlayItemCommand(Command): + pdu_id = PduId.PLAY_ITEM + + scope: Scope = field(metadata=Scope.type_metadata(1)) + uid: int = field(metadata=hci.metadata(_UINT64_BE_METADATA)) + uid_counter: int = field(metadata=hci.metadata('>2')) + + +# ----------------------------------------------------------------------------- +@Command.command +@dataclass +class AddToNowPlayingCommand(Command): + pdu_id = PduId.ADD_TO_NOW_PLAYING + + scope: Scope = field(metadata=Scope.type_metadata(1)) + uid: int = field(metadata=hci.metadata(_UINT64_BE_METADATA)) + uid_counter: int = field(metadata=hci.metadata('>2')) + + # ----------------------------------------------------------------------------- class Response: pdu_id: PduId _payload: Optional[bytes] = None fields: ClassVar[hci.Fields] = () + subclasses: ClassVar[dict[PduId, type[Response]]] = {} _Response = TypeVar('_Response', bound='Response') @classmethod - def register(cls, subclass: type[_Response]) -> type[_Response]: + def response(cls, subclass: type[_Response]) -> type[_Response]: subclass.fields = hci.HCI_Object.fields_from_dataclass(subclass) + if pdu_id := getattr(subclass, 'pdu_id', None): + cls.subclasses[pdu_id] = subclass return subclass def __bytes__(self) -> bytes: @@ -508,32 +1049,44 @@ class Response: return self._payload @classmethod - def from_bytes(cls, pdu: bytes, pdu_id: Optional[PduId] = None) -> Response: - kwargs = hci.HCI_Object.dict_from_bytes(pdu, 0, cls.fields) - if pdu_id is not None: - kwargs['pdu_id'] = pdu_id - instance = cls(**kwargs) - instance._payload = pdu + def from_bytes(cls, pdu: bytes, pdu_id: PduId) -> Response: + if not (subclass := cls.subclasses.get(pdu_id)): + raise core.InvalidArgumentError(f"Unimplemented packet {pdu_id.name}") + return subclass.from_parameters(pdu) + + @classmethod + def from_parameters(cls, parameters: bytes) -> Response: + instance = cls(**hci.HCI_Object.dict_from_bytes(parameters, 0, cls.fields)) + instance._payload = parameters return instance # ----------------------------------------------------------------------------- -@Response.register +@Response.response @dataclass class RejectedResponse(Response): pdu_id: PduId status_code: StatusCode = field(metadata=StatusCode.type_metadata(1)) + @classmethod + def from_bytes(cls, pdu: bytes, pdu_id: PduId) -> Response: + return cls(pdu_id=pdu_id, status_code=StatusCode(pdu[0])) + # ----------------------------------------------------------------------------- -@Response.register +@Response.response @dataclass class NotImplementedResponse(Response): pdu_id: PduId parameters: bytes = field(metadata=hci.metadata('*')) + @classmethod + def from_bytes(cls, pdu: bytes, pdu_id: PduId) -> Response: + return cls(pdu_id=pdu_id, parameters=pdu) + # ----------------------------------------------------------------------------- +@Response.response @dataclass class GetCapabilitiesResponse(Response): pdu_id = PduId.GET_CAPABILITIES @@ -541,25 +1094,24 @@ class GetCapabilitiesResponse(Response): capabilities: Sequence[Union[SupportsBytes, bytes]] @classmethod - def from_bytes(cls, pdu: bytes, pdu_id: Optional[PduId] = None) -> Response: - del pdu_id # Unused. - if len(pdu) < 2: + def from_parameters(cls, parameters: bytes) -> Response: + if len(parameters) < 2: # Possibly a reject response. return cls(GetCapabilitiesCommand.CapabilityId(0), []) # Assume that the payloads all follow the same pattern: # - capability_id = GetCapabilitiesCommand.CapabilityId(pdu[0]) - capability_count = pdu[1] + capability_id = GetCapabilitiesCommand.CapabilityId(parameters[0]) + capability_count = parameters[1] capabilities: list[Union[SupportsBytes, bytes]] if capability_id == GetCapabilitiesCommand.CapabilityId.EVENTS_SUPPORTED: - capabilities = [EventId(pdu[2 + x]) for x in range(capability_count)] + capabilities = [EventId(parameters[2 + x]) for x in range(capability_count)] else: - capability_size = (len(pdu) - 2) // capability_count + capability_size = (len(parameters) - 2) // capability_count capabilities = [ - pdu[x : x + capability_size] - for x in range(2, len(pdu), capability_size) + parameters[x : x + capability_size] + for x in range(2, len(parameters), capability_size) ] return cls(capability_id, capabilities) @@ -571,7 +1123,96 @@ class GetCapabilitiesResponse(Response): # ----------------------------------------------------------------------------- -@Response.register +@Response.response +@dataclass +class ListPlayerApplicationSettingAttributesResponse(Response): + pdu_id = PduId.LIST_PLAYER_APPLICATION_SETTING_ATTRIBUTES + + attribute: Sequence[ApplicationSetting.AttributeId] = field( + metadata=ApplicationSetting.AttributeId.type_metadata( + 1, list_begin=True, list_end=True + ) + ) + + +# ----------------------------------------------------------------------------- +@Response.response +@dataclass +class ListPlayerApplicationSettingValuesResponse(Response): + pdu_id = PduId.LIST_PLAYER_APPLICATION_SETTING_VALUES + + value: Sequence[int] = field( + metadata=hci.metadata(1, list_begin=True, list_end=True) + ) + + +# ----------------------------------------------------------------------------- +@Response.response +@dataclass +class GetCurrentPlayerApplicationSettingValueResponse(Response): + pdu_id = PduId.GET_CURRENT_PLAYER_APPLICATION_SETTING_VALUE + + attribute: Sequence[ApplicationSetting.AttributeId] = field( + metadata=ApplicationSetting.AttributeId.type_metadata(1, list_begin=True) + ) + value: Sequence[int] = field(metadata=hci.metadata(1, list_end=True)) + + +# ----------------------------------------------------------------------------- +@Response.response +@dataclass +class SetPlayerApplicationSettingValueResponse(Response): + pdu_id = PduId.SET_PLAYER_APPLICATION_SETTING_VALUE + + +# ----------------------------------------------------------------------------- +@Response.response +@dataclass +class GetPlayerApplicationSettingAttributeTextResponse(Response): + pdu_id = PduId.GET_PLAYER_APPLICATION_SETTING_ATTRIBUTE_TEXT + + attribute: Sequence[ApplicationSetting.AttributeId] = field( + metadata=ApplicationSetting.AttributeId.type_metadata(1, list_begin=True) + ) + character_set_id: Sequence[CharacterSetId] = field( + metadata=CharacterSetId.type_metadata(2, byteorder='big') + ) + attribute_string: Sequence[str] = field( + metadata=hci.metadata(_string_spec(1), list_end=True) + ) + + +# ----------------------------------------------------------------------------- +@Response.response +@dataclass +class GetPlayerApplicationSettingValueTextResponse(Response): + pdu_id = PduId.GET_PLAYER_APPLICATION_SETTING_VALUE_TEXT + + value: Sequence[int] = field(metadata=hci.metadata(1, list_begin=True)) + character_set_id: Sequence[CharacterSetId] = field( + metadata=CharacterSetId.type_metadata(2, byteorder='big') + ) + attribute_string: Sequence[str] = field( + metadata=hci.metadata(_string_spec(1), list_end=True) + ) + + +# ----------------------------------------------------------------------------- +@Response.response +@dataclass +class InformDisplayableCharacterSetResponse(Response): + pdu_id = PduId.INFORM_DISPLAYABLE_CHARACTER_SET + + +# ----------------------------------------------------------------------------- +@Response.response +@dataclass +class InformBatteryStatusOfCtResponse(Response): + pdu_id = PduId.INFORM_BATTERY_STATUS_OF_CT + + +# ----------------------------------------------------------------------------- +@Response.response @dataclass class GetPlayStatusResponse(Response): pdu_id = PduId.GET_PLAY_STATUS @@ -581,7 +1222,7 @@ class GetPlayStatusResponse(Response): # ----------------------------------------------------------------------------- -@Response.register +@Response.response @dataclass class GetElementAttributesResponse(Response): pdu_id = PduId.GET_ELEMENT_ATTRIBUTES @@ -593,7 +1234,7 @@ class GetElementAttributesResponse(Response): # ----------------------------------------------------------------------------- -@Response.register +@Response.response @dataclass class SetAbsoluteVolumeResponse(Response): pdu_id = PduId.SET_ABSOLUTE_VOLUME @@ -601,7 +1242,7 @@ class SetAbsoluteVolumeResponse(Response): # ----------------------------------------------------------------------------- -@Response.register +@Response.response @dataclass class RegisterNotificationResponse(Response): pdu_id = PduId.REGISTER_NOTIFICATION @@ -613,43 +1254,122 @@ class RegisterNotificationResponse(Response): # ----------------------------------------------------------------------------- +@Response.response @dataclass -class SongAndPlayStatus: - song_length: int - song_position: int - play_status: PlayStatus +class SetAddressedPlayerResponse(Response): + pdu_id = PduId.SET_ADDRESSED_PLAYER + + status: StatusCode = field(metadata=StatusCode.type_metadata(1)) # ----------------------------------------------------------------------------- -class ApplicationSetting: - class AttributeId(hci.SpecableEnum): - EQUALIZER_ON_OFF = 0x01 - REPEAT_MODE = 0x02 - SHUFFLE_ON_OFF = 0x03 - SCAN_ON_OFF = 0x04 +@Response.response +@dataclass +class SetBrowsedPlayerResponse(Response): + pdu_id = PduId.SET_BROWSED_PLAYER - class EqualizerOnOffStatus(hci.SpecableEnum): - OFF = 0x01 - ON = 0x02 + status: StatusCode = field(metadata=StatusCode.type_metadata(1)) + uid_counter: int = field(metadata=hci.metadata('>2')) + numbers_of_items: int = field(metadata=hci.metadata('>4')) + character_set_id: CharacterSetId = field( + metadata=CharacterSetId.type_metadata(2, byteorder='big') + ) + folder_names: Sequence[str] = field( + metadata=hci.metadata(_string_spec(2), list_begin=True, list_end=True) + ) - class RepeatModeStatus(hci.SpecableEnum): - OFF = 0x01 - SINGLE_TRACK_REPEAT = 0x02 - ALL_TRACK_REPEAT = 0x03 - GROUP_REPEAT = 0x04 - class ShuffleOnOffStatus(hci.SpecableEnum): - OFF = 0x01 - ALL_TRACKS_SHUFFLE = 0x02 - GROUP_SHUFFLE = 0x03 +# ----------------------------------------------------------------------------- +@Response.response +@dataclass +class GetFolderItemsResponse(Response): + pdu_id = PduId.GET_FOLDER_ITEMS - class ScanOnOffStatus(hci.SpecableEnum): - OFF = 0x01 - ALL_TRACKS_SCAN = 0x02 - GROUP_SCAN = 0x03 + status: StatusCode + uid_counter: int + items: Sequence[BrowseableItem] - class GenericValue(hci.SpecableEnum): - pass + @classmethod + def from_parameters(cls, parameters: bytes) -> Response: + status, uid_counter, count = struct.unpack_from('>BHH', parameters) + items: list[BrowseableItem] = [] + offset = 5 + for _ in range(count): + offset, item = BrowseableItem.parse_from_bytes(parameters, offset) + items.append(item) + instance = cls(status=StatusCode(status), uid_counter=uid_counter, items=items) + instance._payload = parameters + return instance + + def __post_init__(self) -> None: + if self._payload is None: + self._payload = struct.pack( + '>BHH', self.status, self.uid_counter, len(self.items) + ) + b''.join(map(bytes, self.items)) + + +# ----------------------------------------------------------------------------- +@Response.response +@dataclass +class ChangePathResponse(Response): + pdu_id = PduId.CHANGE_PATH + + status: StatusCode = field(metadata=StatusCode.type_metadata(1)) + number_of_items: int = field(metadata=hci.metadata('>4')) + + +# ----------------------------------------------------------------------------- +@Response.response +@dataclass +class GetItemAttributesResponse(Response): + pdu_id = PduId.GET_ITEM_ATTRIBUTES + + status: StatusCode = field(metadata=StatusCode.type_metadata(1)) + attribute_value_entry_list: Sequence[AttributeValueEntry] = field( + metadata=hci.metadata( + AttributeValueEntry.parse_from_bytes, list_begin=True, list_end=True + ) + ) + + +# ----------------------------------------------------------------------------- +@Response.response +@dataclass +class GetTotalNumberOfItemsResponse(Response): + pdu_id = PduId.GET_TOTAL_NUMBER_OF_ITEMS + + status: StatusCode = field(metadata=StatusCode.type_metadata(1)) + uid_counter: int = field(metadata=hci.metadata('>2')) + number_of_items: int = field(metadata=hci.metadata('>4')) + + +# ----------------------------------------------------------------------------- +@Response.response +@dataclass +class SearchResponse(Response): + pdu_id = PduId.SEARCH + + status: StatusCode = field(metadata=StatusCode.type_metadata(1)) + uid_counter: int = field(metadata=hci.metadata('>2')) + number_of_items: int = field(metadata=hci.metadata('>4')) + + +# ----------------------------------------------------------------------------- +@Response.response +@dataclass +class PlayItemResponse(Response): + pdu_id = PduId.PLAY_ITEM + + status: StatusCode = field(metadata=StatusCode.type_metadata(1)) + + +# ----------------------------------------------------------------------------- +@Response.response +@dataclass +class AddToNowPlayingResponse(Response): + pdu_id = PduId.ADD_TO_NOW_PLAYING + + status: StatusCode = field(metadata=StatusCode.type_metadata(1)) # ----------------------------------------------------------------------------- @@ -670,7 +1390,7 @@ class Event: @classmethod def from_bytes(cls, pdu: bytes) -> Event: if not (subclass := cls.subclasses.get(pdu[0])): - raise core.InvalidPacketError(f"Unimplemented PDU {pdu[0]}") + raise core.InvalidPacketError(f"Unimplemented Event {pdu[0]}") instance = subclass(**hci.HCI_Object.dict_from_bytes(pdu, 1, subclass.fields)) instance._pdu = pdu return instance @@ -1458,34 +2178,16 @@ class Protocol(utils.EventEmitter): # more appropriate. response: Optional[Response] = None if response_code == avc.ResponseFrame.ResponseCode.REJECTED: - response = RejectedResponse.from_bytes(pdu_id=pdu_id, pdu=pdu) + response = RejectedResponse(pdu_id=pdu_id, status_code=StatusCode(pdu[0])) elif response_code == avc.ResponseFrame.ResponseCode.NOT_IMPLEMENTED: - response = NotImplementedResponse.from_bytes(pdu_id=pdu_id, pdu=pdu) + response = NotImplementedResponse(pdu_id=pdu_id, parameters=pdu) elif response_code in ( avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE, avc.ResponseFrame.ResponseCode.INTERIM, avc.ResponseFrame.ResponseCode.CHANGED, avc.ResponseFrame.ResponseCode.ACCEPTED, ): - if pdu_id == PduId.GET_CAPABILITIES: - response = GetCapabilitiesResponse.from_bytes(pdu=pdu) - elif pdu_id == PduId.GET_PLAY_STATUS: - response = GetPlayStatusResponse.from_bytes(pdu=pdu) - elif pdu_id == PduId.GET_ELEMENT_ATTRIBUTES: - response = GetElementAttributesResponse.from_bytes(pdu=pdu) - elif pdu_id == PduId.SET_ABSOLUTE_VOLUME: - response = SetAbsoluteVolumeResponse.from_bytes(pdu=pdu) - elif pdu_id == PduId.REGISTER_NOTIFICATION: - response = RegisterNotificationResponse.from_bytes(pdu=pdu) - else: - logger.debug("unexpected PDU ID") - pending_command.response.set_exception( - core.ProtocolError( - error_code=None, - error_namespace="avrcp", - details="unexpected PDU ID", - ) - ) + response = Response.from_bytes(pdu=pdu, pdu_id=PduId(pdu_id)) else: logger.debug("unexpected response code") pending_command.response.set_exception( diff --git a/bumble/hci.py b/bumble/hci.py index 9cfe81f..0faebca 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -26,7 +26,17 @@ import secrets import struct from collections.abc import Sequence from dataclasses import field -from typing import Any, Callable, ClassVar, Iterable, Optional, TypeVar, Union, cast +from typing import ( + Any, + Callable, + ClassVar, + Iterable, + Literal, + Optional, + TypeVar, + Union, + cast, +) from typing_extensions import Self @@ -111,37 +121,57 @@ def phy_list_to_bits(phys: Optional[Iterable[Phy]]) -> int: class SpecableEnum(utils.OpenIntEnum): @classmethod - def type_spec(cls, size: int): + def type_spec(cls, size: int, byteorder: Literal['little', 'big'] = 'little'): return { - 'serializer': lambda x: x.to_bytes(size, 'little'), + 'serializer': lambda x: x.to_bytes(size, byteorder), 'parser': lambda data, offset: ( offset + size, - cls(int.from_bytes(data[offset : offset + size], 'little')), + cls(int.from_bytes(data[offset : offset + size], byteorder)), ), 'mapper': lambda x: cls(x).name, } @classmethod - def type_metadata(cls, size: int, list_begin: bool = False, list_end: bool = False): - return metadata(cls.type_spec(size), list_begin=list_begin, list_end=list_end) + def type_metadata( + cls, + size: int, + list_begin: bool = False, + list_end: bool = False, + byteorder: Literal['little', 'big'] = 'little', + ): + return metadata( + cls.type_spec(size, byteorder), + list_begin=list_begin, + list_end=list_end, + ) class SpecableFlag(enum.IntFlag): @classmethod - def type_spec(cls, size: int): + def type_spec(cls, size: int, byteorder: Literal['little', 'big'] = 'little'): return { - 'serializer': lambda x: x.to_bytes(size, 'little'), + 'serializer': lambda x: x.to_bytes(size, byteorder), 'parser': lambda data, offset: ( offset + size, - cls(int.from_bytes(data[offset : offset + size], 'little')), + cls(int.from_bytes(data[offset : offset + size], byteorder)), ), 'mapper': lambda x: cls(x).name, } @classmethod - def type_metadata(cls, size: int, list_begin: bool = False, list_end: bool = False): - return metadata(cls.type_spec(size), list_begin=list_begin, list_end=list_end) + def type_metadata( + cls, + size: int, + list_begin: bool = False, + list_end: bool = False, + byteorder: Literal['little', 'big'] = 'little', + ): + return metadata( + cls.type_spec(size, byteorder), + list_begin=list_begin, + list_end=list_end, + ) # ----------------------------------------------------------------------------- diff --git a/tests/avrcp_test.py b/tests/avrcp_test.py index 5769eb1..911c519 100644 --- a/tests/avrcp_test.py +++ b/tests/avrcp_test.py @@ -43,177 +43,228 @@ class TwoDevices(test_utils.TwoDevices): return devices -# ----------------------------------------------------------------------------- -def test_GetPlayStatusCommand(): - command = avrcp.GetPlayStatusCommand() - assert avrcp.Command.from_bytes(command.pdu_id, bytes(command)) == command - - -# ----------------------------------------------------------------------------- -def test_GetCapabilitiesCommand(): - command = avrcp.GetCapabilitiesCommand( - capability_id=avrcp.GetCapabilitiesCommand.CapabilityId.COMPANY_ID - ) - assert avrcp.Command.from_bytes(command.pdu_id, bytes(command)) == command - - -# ----------------------------------------------------------------------------- -def test_SetAbsoluteVolumeCommand(): - command = avrcp.SetAbsoluteVolumeCommand(volume=5) - assert avrcp.Command.from_bytes(command.pdu_id, bytes(command)) == command - - -# ----------------------------------------------------------------------------- -def test_GetElementAttributesCommand(): - command = avrcp.GetElementAttributesCommand( - identifier=999, - attribute_ids=[ - avrcp.MediaAttributeId.ALBUM_NAME, - avrcp.MediaAttributeId.ARTIST_NAME, - ], - ) - assert avrcp.Command.from_bytes(command.pdu_id, bytes(command)) == command - - -# ----------------------------------------------------------------------------- -def test_RegisterNotificationCommand(): - command = avrcp.RegisterNotificationCommand( - event_id=avrcp.EventId.ADDRESSED_PLAYER_CHANGED, playback_interval=123 - ) - assert avrcp.Command.from_bytes(command.pdu_id, bytes(command)) == command - - -# ----------------------------------------------------------------------------- -def test_UidsChangedEvent(): - event = avrcp.UidsChangedEvent(uid_counter=7) - assert avrcp.Event.from_bytes(bytes(event)) == event - - -# ----------------------------------------------------------------------------- -def test_TrackChangedEvent(): - event = avrcp.TrackChangedEvent(identifier=b'12356') - assert avrcp.Event.from_bytes(bytes(event)) == event - - -# ----------------------------------------------------------------------------- -def test_VolumeChangedEvent(): - event = avrcp.VolumeChangedEvent(volume=9) - assert avrcp.Event.from_bytes(bytes(event)) == event - - -# ----------------------------------------------------------------------------- -def test_PlaybackStatusChangedEvent(): - event = avrcp.PlaybackStatusChangedEvent(play_status=avrcp.PlayStatus.PLAYING) - assert avrcp.Event.from_bytes(bytes(event)) == event - - -# ----------------------------------------------------------------------------- -def test_AddressedPlayerChangedEvent(): - event = avrcp.AddressedPlayerChangedEvent( - player=avrcp.AddressedPlayerChangedEvent.Player(player_id=9, uid_counter=10) - ) - assert avrcp.Event.from_bytes(bytes(event)) == event - - -# ----------------------------------------------------------------------------- -def test_AvailablePlayersChangedEvent(): - event = avrcp.AvailablePlayersChangedEvent() - assert avrcp.Event.from_bytes(bytes(event)) == event - - -# ----------------------------------------------------------------------------- -def test_PlaybackPositionChangedEvent(): - event = avrcp.PlaybackPositionChangedEvent(playback_position=1314) - assert avrcp.Event.from_bytes(bytes(event)) == event - - -# ----------------------------------------------------------------------------- -def test_NowPlayingContentChangedEvent(): - event = avrcp.NowPlayingContentChangedEvent() - assert avrcp.Event.from_bytes(bytes(event)) == event - - -# ----------------------------------------------------------------------------- -def test_PlayerApplicationSettingChangedEvent(): - event = avrcp.PlayerApplicationSettingChangedEvent( - player_application_settings=[ - avrcp.PlayerApplicationSettingChangedEvent.Setting( +@pytest.mark.parametrize( + "command,", + [ + avrcp.GetPlayStatusCommand(), + avrcp.GetCapabilitiesCommand( + capability_id=avrcp.GetCapabilitiesCommand.CapabilityId.COMPANY_ID + ), + avrcp.SetAbsoluteVolumeCommand(volume=5), + avrcp.GetElementAttributesCommand( + identifier=999, + attribute_ids=[ + avrcp.MediaAttributeId.ALBUM_NAME, + avrcp.MediaAttributeId.ARTIST_NAME, + ], + ), + avrcp.RegisterNotificationCommand( + event_id=avrcp.EventId.ADDRESSED_PLAYER_CHANGED, playback_interval=123 + ), + avrcp.SearchCommand( + character_set_id=avrcp.CharacterSetId.UTF_8, search_string="Bumble!" + ), + avrcp.PlayItemCommand( + scope=avrcp.Scope.MEDIA_PLAYER_LIST, uid=0, uid_counter=1 + ), + avrcp.ListPlayerApplicationSettingAttributesCommand(), + avrcp.ListPlayerApplicationSettingValuesCommand( + attribute=avrcp.ApplicationSetting.AttributeId.REPEAT_MODE + ), + avrcp.GetCurrentPlayerApplicationSettingValueCommand( + attribute=[ avrcp.ApplicationSetting.AttributeId.REPEAT_MODE, + avrcp.ApplicationSetting.AttributeId.SHUFFLE_ON_OFF, + ] + ), + avrcp.SetPlayerApplicationSettingValueCommand( + attribute=[avrcp.ApplicationSetting.AttributeId.REPEAT_MODE], + value=[avrcp.ApplicationSetting.RepeatModeStatus.ALL_TRACK_REPEAT], + ), + avrcp.GetPlayerApplicationSettingAttributeTextCommand( + attribute=[ + avrcp.ApplicationSetting.AttributeId.REPEAT_MODE, + avrcp.ApplicationSetting.AttributeId.SHUFFLE_ON_OFF, + ] + ), + avrcp.GetPlayerApplicationSettingValueTextCommand( + attribute=avrcp.ApplicationSetting.AttributeId.REPEAT_MODE, + value=[ avrcp.ApplicationSetting.RepeatModeStatus.ALL_TRACK_REPEAT, - ) - ] - ) + avrcp.ApplicationSetting.RepeatModeStatus.GROUP_REPEAT, + ], + ), + avrcp.InformDisplayableCharacterSetCommand( + character_set_id=[avrcp.CharacterSetId.UTF_8] + ), + avrcp.InformBatteryStatusOfCtCommand( + battery_status=avrcp.InformBatteryStatusOfCtCommand.BatteryStatus.NORMAL + ), + avrcp.SetAddressedPlayerCommand(player_id=1), + avrcp.SetBrowsedPlayerCommand(player_id=1), + avrcp.GetFolderItemsCommand( + scope=avrcp.Scope.NOW_PLAYING, + start_item=0, + end_item=1, + attributes=[avrcp.MediaAttributeId.ARTIST_NAME], + ), + avrcp.ChangePathCommand( + uid_counter=1, + direction=avrcp.ChangePathCommand.Direction.DOWN, + folder_uid=2, + ), + avrcp.GetItemAttributesCommand( + scope=avrcp.Scope.NOW_PLAYING, + uid=0, + uid_counter=1, + start_item=0, + end_item=0, + attributes=[avrcp.MediaAttributeId.DEFAULT_COVER_ART], + ), + avrcp.GetTotalNumberOfItemsCommand(scope=avrcp.Scope.NOW_PLAYING), + avrcp.AddToNowPlayingCommand( + scope=avrcp.Scope.NOW_PLAYING, uid=0, uid_counter=1 + ), + ], +) +def test_command(command: avrcp.Command): + assert avrcp.Command.from_bytes(command.pdu_id, bytes(command)) == command + + +@pytest.mark.parametrize( + "event,", + [ + avrcp.UidsChangedEvent(uid_counter=7), + avrcp.TrackChangedEvent(identifier=b'12356'), + avrcp.VolumeChangedEvent(volume=9), + avrcp.PlaybackStatusChangedEvent(play_status=avrcp.PlayStatus.PLAYING), + avrcp.AddressedPlayerChangedEvent( + player=avrcp.AddressedPlayerChangedEvent.Player(player_id=9, uid_counter=10) + ), + avrcp.AvailablePlayersChangedEvent(), + avrcp.PlaybackPositionChangedEvent(playback_position=1314), + avrcp.NowPlayingContentChangedEvent(), + avrcp.PlayerApplicationSettingChangedEvent( + player_application_settings=[ + avrcp.PlayerApplicationSettingChangedEvent.Setting( + avrcp.ApplicationSetting.AttributeId.REPEAT_MODE, + avrcp.ApplicationSetting.RepeatModeStatus.ALL_TRACK_REPEAT, + ) + ] + ), + ], +) +def test_event(event: avrcp.Event): assert avrcp.Event.from_bytes(bytes(event)) == event -# ----------------------------------------------------------------------------- -def test_RejectedResponse(): - pdu_id = avrcp.PduId.GET_ELEMENT_ATTRIBUTES - response = avrcp.RejectedResponse( - pdu_id=pdu_id, - status_code=avrcp.StatusCode.DOES_NOT_EXIST, - ) - assert ( - avrcp.RejectedResponse.from_bytes(pdu=bytes(response), pdu_id=pdu_id) - == response - ) - - -# ----------------------------------------------------------------------------- -def test_GetPlayStatusResponse(): - response = avrcp.GetPlayStatusResponse( - song_length=1010, song_position=13, play_status=avrcp.PlayStatus.PAUSED - ) - assert avrcp.GetPlayStatusResponse.from_bytes(bytes(response)) == response - - -# ----------------------------------------------------------------------------- -def test_NotImplementedResponse(): - pdu_id = avrcp.PduId.GET_ELEMENT_ATTRIBUTES - response = avrcp.NotImplementedResponse(pdu_id=pdu_id, parameters=b'koasd') - assert ( - avrcp.NotImplementedResponse.from_bytes(bytes(response), pdu_id=pdu_id) - == response - ) - - -# ----------------------------------------------------------------------------- -def test_GetCapabilitiesResponse(): - response = avrcp.GetCapabilitiesResponse( - capability_id=avrcp.GetCapabilitiesCommand.CapabilityId.EVENTS_SUPPORTED, - capabilities=[ - avrcp.EventId.ADDRESSED_PLAYER_CHANGED, - avrcp.EventId.BATT_STATUS_CHANGED, - ], - ) - assert avrcp.GetCapabilitiesResponse.from_bytes(bytes(response)) == response - - -# ----------------------------------------------------------------------------- -def test_RegisterNotificationResponse(): - response = avrcp.RegisterNotificationResponse( - event=avrcp.PlaybackPositionChangedEvent(playback_position=38) - ) - assert avrcp.RegisterNotificationResponse.from_bytes(bytes(response)) == response - - -# ----------------------------------------------------------------------------- -def test_SetAbsoluteVolumeResponse(): - response = avrcp.SetAbsoluteVolumeResponse(volume=99) - assert avrcp.SetAbsoluteVolumeResponse.from_bytes(bytes(response)) == response - - -# ----------------------------------------------------------------------------- -def test_GetElementAttributesResponse(): - response = avrcp.GetElementAttributesResponse( - attributes=[ - avrcp.MediaAttribute( - attribute_id=avrcp.MediaAttributeId.ALBUM_NAME, - attribute_value="White Album", - ) - ] - ) - assert avrcp.GetElementAttributesResponse.from_bytes(bytes(response)) == response +@pytest.mark.parametrize( + "response,", + [ + avrcp.GetPlayStatusResponse( + song_length=1010, song_position=13, play_status=avrcp.PlayStatus.PAUSED + ), + avrcp.GetCapabilitiesResponse( + capability_id=avrcp.GetCapabilitiesCommand.CapabilityId.EVENTS_SUPPORTED, + capabilities=[ + avrcp.EventId.ADDRESSED_PLAYER_CHANGED, + avrcp.EventId.BATT_STATUS_CHANGED, + ], + ), + avrcp.RegisterNotificationResponse( + event=avrcp.PlaybackPositionChangedEvent(playback_position=38) + ), + avrcp.SetAbsoluteVolumeResponse(volume=99), + avrcp.GetElementAttributesResponse( + attributes=[ + avrcp.MediaAttribute( + attribute_id=avrcp.MediaAttributeId.ALBUM_NAME, + attribute_value="White Album", + character_set_id=avrcp.CharacterSetId.UTF_8, + ) + ] + ), + avrcp.ListPlayerApplicationSettingAttributesResponse( + attribute=[ + avrcp.ApplicationSetting.AttributeId.REPEAT_MODE, + avrcp.ApplicationSetting.AttributeId.SHUFFLE_ON_OFF, + ] + ), + avrcp.ListPlayerApplicationSettingValuesResponse( + value=[ + avrcp.ApplicationSetting.RepeatModeStatus.ALL_TRACK_REPEAT, + avrcp.ApplicationSetting.RepeatModeStatus.GROUP_REPEAT, + ] + ), + avrcp.GetCurrentPlayerApplicationSettingValueResponse( + attribute=[avrcp.ApplicationSetting.AttributeId.REPEAT_MODE], + value=[avrcp.ApplicationSetting.RepeatModeStatus.ALL_TRACK_REPEAT], + ), + avrcp.SetPlayerApplicationSettingValueResponse(), + avrcp.GetPlayerApplicationSettingAttributeTextResponse( + attribute=[avrcp.ApplicationSetting.AttributeId.REPEAT_MODE], + character_set_id=[avrcp.CharacterSetId.UTF_8], + attribute_string=["Repeat"], + ), + avrcp.GetPlayerApplicationSettingValueTextResponse( + value=[avrcp.ApplicationSetting.RepeatModeStatus.ALL_TRACK_REPEAT], + character_set_id=[avrcp.CharacterSetId.UTF_8], + attribute_string=["All track repeat"], + ), + avrcp.InformDisplayableCharacterSetResponse(), + avrcp.InformBatteryStatusOfCtResponse(), + avrcp.SetAddressedPlayerResponse(status=avrcp.StatusCode.OPERATION_COMPLETED), + avrcp.SetBrowsedPlayerResponse( + status=avrcp.StatusCode.OPERATION_COMPLETED, + uid_counter=1, + numbers_of_items=2, + character_set_id=avrcp.CharacterSetId.UTF_8, + folder_names=["folder1", "folder2"], + ), + avrcp.GetFolderItemsResponse( + status=avrcp.StatusCode.OPERATION_COMPLETED, + uid_counter=1, + items=[ + avrcp.MediaPlayerItem( + player_id=1, + major_player_type=avrcp.MediaPlayerItem.MajorPlayerType.AUDIO, + player_sub_type=avrcp.MediaPlayerItem.PlayerSubType.AUDIO_BOOK, + play_status=avrcp.PlayStatus.FWD_SEEK, + feature_bitmask=avrcp.MediaPlayerItem.Features.ADD_TO_NOW_PLAYING, + character_set_id=avrcp.CharacterSetId.UTF_8, + displayable_name="Woo", + ) + ], + ), + avrcp.ChangePathResponse( + status=avrcp.StatusCode.OPERATION_COMPLETED, number_of_items=2 + ), + avrcp.GetItemAttributesResponse( + status=avrcp.StatusCode.OPERATION_COMPLETED, + attribute_value_entry_list=[ + avrcp.AttributeValueEntry( + attribute_id=avrcp.MediaAttributeId.GENRE, + character_set_id=avrcp.CharacterSetId.UTF_8, + attribute_value="uuddlrlrabab", + ) + ], + ), + avrcp.GetTotalNumberOfItemsResponse( + status=avrcp.StatusCode.OPERATION_COMPLETED, + uid_counter=1, + number_of_items=2, + ), + avrcp.SearchResponse( + status=avrcp.StatusCode.OPERATION_COMPLETED, + uid_counter=1, + number_of_items=2, + ), + avrcp.PlayItemResponse(status=avrcp.StatusCode.OPERATION_COMPLETED), + avrcp.AddToNowPlayingResponse(status=avrcp.StatusCode.OPERATION_COMPLETED), + ], +) +def test_response(response: avrcp.Response): + assert avrcp.Response.from_bytes(bytes(response), response.pdu_id) == response # -----------------------------------------------------------------------------