diff --git a/bumble/avrcp.py b/bumble/avrcp.py index 7e839f4..2f51ee4 100644 --- a/bumble/avrcp.py +++ b/bumble/avrcp.py @@ -22,7 +22,14 @@ import enum import functools import logging import struct -from collections.abc import AsyncIterator, Awaitable, Callable, Iterable, Sequence +from collections.abc import ( + AsyncIterator, + Awaitable, + Callable, + Iterable, + Sequence, + Mapping, +) from dataclasses import dataclass, field from typing import ClassVar, SupportsBytes, TypeVar @@ -1530,6 +1537,7 @@ class PlayerApplicationSettingChangedEvent(Event): | ApplicationSetting.ShuffleOnOffStatus | ApplicationSetting.ScanOnOffStatus | ApplicationSetting.GenericValue + | int ) = field(metadata=hci.metadata(1)) def __post_init__(self) -> None: @@ -1620,6 +1628,8 @@ class Delegate: supported_events: list[EventId] supported_company_ids: list[int] + supported_player_app_settings: dict[ApplicationSetting.AttributeId, list[int]] + player_app_settings: dict[ApplicationSetting.AttributeId, int] volume: int playback_status: PlayStatus @@ -1627,11 +1637,20 @@ class Delegate: self, supported_events: Iterable[EventId] = (), supported_company_ids: Iterable[int] = (AVRCP_BLUETOOTH_SIG_COMPANY_ID,), + supported_player_app_settings: ( + Mapping[ApplicationSetting.AttributeId, Sequence[int]] | None + ) = None, ) -> None: self.supported_company_ids = list(supported_company_ids) self.supported_events = list(supported_events) self.volume = 0 self.playback_status = PlayStatus.STOPPED + self.supported_player_app_settings = ( + {key: list(value) for key, value in supported_player_app_settings.items()} + if supported_player_app_settings + else {} + ) + self.player_app_settings = {} async def get_supported_events(self) -> list[EventId]: return self.supported_events @@ -1664,6 +1683,21 @@ class Delegate: async def get_playback_status(self) -> PlayStatus: return self.playback_status + async def get_supported_player_app_settings( + self, + ) -> dict[ApplicationSetting.AttributeId, list[int]]: + return self.supported_player_app_settings + + async def get_current_player_app_settings( + self, + ) -> dict[ApplicationSetting.AttributeId, int]: + return self.player_app_settings + + async def set_player_app_settings( + self, attribute: ApplicationSetting.AttributeId, value: int + ) -> None: + self.player_app_settings[attribute] = value + # TODO add other delegate methods @@ -1911,6 +1945,51 @@ class Protocol(utils.EventEmitter): response = self._check_response(response_context, GetElementAttributesResponse) return list(response.attributes) + async def list_supported_player_app_settings( + self, attribute_ids: Sequence[ApplicationSetting.AttributeId] = () + ) -> dict[ApplicationSetting.AttributeId, list[int]]: + """Get element attributes from the connected peer.""" + response_context = await self.send_avrcp_command( + avc.CommandFrame.CommandType.STATUS, + ListPlayerApplicationSettingAttributesCommand(), + ) + if not attribute_ids: + list_attribute_response = self._check_response( + response_context, ListPlayerApplicationSettingAttributesResponse + ) + attribute_ids = list_attribute_response.attribute + + supported_settings: dict[ApplicationSetting.AttributeId, list[int]] = {} + for attribute_id in attribute_ids: + response_context = await self.send_avrcp_command( + avc.CommandFrame.CommandType.STATUS, + ListPlayerApplicationSettingValuesCommand(attribute_id), + ) + list_value_response = self._check_response( + response_context, ListPlayerApplicationSettingValuesResponse + ) + supported_settings[attribute_id] = list(list_value_response.value) + + return supported_settings + + async def get_player_app_settings( + self, attribute_ids: Sequence[ApplicationSetting.AttributeId] + ) -> dict[ApplicationSetting.AttributeId, int]: + """Get element attributes from the connected peer.""" + response_context = await self.send_avrcp_command( + avc.CommandFrame.CommandType.STATUS, + GetCurrentPlayerApplicationSettingValueCommand(attribute_ids), + ) + response: GetCurrentPlayerApplicationSettingValueResponse = ( + self._check_response( + response_context, GetCurrentPlayerApplicationSettingValueResponse + ) + ) + return { + attribute_id: value + for attribute_id, value in zip(response.attribute, response.value) + } + async def monitor_events( self, event_id: EventId, playback_interval: int = 0 ) -> AsyncIterator[Event]: @@ -2290,6 +2369,22 @@ class Protocol(utils.EventEmitter): self._on_register_notification_command(transaction_label, command) case GetPlayStatusCommand(): self._on_get_play_status_command(transaction_label, command) + case ListPlayerApplicationSettingAttributesCommand(): + self._on_list_player_application_setting_attributes_command( + transaction_label, command + ) + case ListPlayerApplicationSettingValuesCommand(): + self._on_list_player_application_setting_values_command( + transaction_label, command + ) + case SetPlayerApplicationSettingValueCommand(): + self._on_set_player_application_setting_value_command( + transaction_label, command + ) + case GetCurrentPlayerApplicationSettingValueCommand(): + self._on_get_current_player_application_setting_value_command( + transaction_label, command + ) case _: # Not supported. # TODO: check that this is the right way to respond in this case. @@ -2573,6 +2668,101 @@ class Protocol(utils.EventEmitter): self._delegate_command(transaction_label, command, get_playback_status()) + def _on_list_player_application_setting_attributes_command( + self, + transaction_label: int, + command: ListPlayerApplicationSettingAttributesCommand, + ) -> None: + logger.debug("<<< AVRCP command PDU: %s", command) + + async def get_supported_player_app_settings() -> None: + supported_settings = await self.delegate.get_supported_player_app_settings() + self.send_avrcp_response( + transaction_label, + avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE, + ListPlayerApplicationSettingAttributesResponse( + list(supported_settings.keys()) + ), + ) + + self._delegate_command( + transaction_label, command, get_supported_player_app_settings() + ) + + def _on_list_player_application_setting_values_command( + self, + transaction_label: int, + command: ListPlayerApplicationSettingValuesCommand, + ) -> None: + logger.debug("<<< AVRCP command PDU: %s", command) + + async def get_supported_player_app_settings() -> None: + supported_settings = await self.delegate.get_supported_player_app_settings() + self.send_avrcp_response( + transaction_label, + avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE, + ListPlayerApplicationSettingValuesResponse( + supported_settings.get(command.attribute, []) + ), + ) + + self._delegate_command( + transaction_label, command, get_supported_player_app_settings() + ) + + def _on_get_current_player_application_setting_value_command( + self, + transaction_label: int, + command: GetCurrentPlayerApplicationSettingValueCommand, + ) -> None: + logger.debug("<<< AVRCP command PDU: %s", command) + + async def get_supported_player_app_settings() -> None: + current_settings = await self.delegate.get_current_player_app_settings() + + if not all( + attribute in current_settings for attribute in command.attribute + ): + self.send_not_implemented_avrcp_response( + transaction_label, + PduId.GET_CURRENT_PLAYER_APPLICATION_SETTING_VALUE, + ) + return + + self.send_avrcp_response( + transaction_label, + avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE, + GetCurrentPlayerApplicationSettingValueResponse( + attribute=command.attribute, + value=[ + current_settings[attribute] for attribute in command.attribute + ], + ), + ) + + self._delegate_command( + transaction_label, command, get_supported_player_app_settings() + ) + + def _on_set_player_application_setting_value_command( + self, + transaction_label: int, + command: SetPlayerApplicationSettingValueCommand, + ) -> None: + logger.debug("<<< AVRCP command PDU: %s", command) + + async def set_player_app_settings() -> None: + for attribute, value in zip(command.attribute, command.value): + await self.delegate.set_player_app_settings(attribute, value) + + self.send_avrcp_response( + transaction_label, + avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE, + SetPlayerApplicationSettingValueResponse(), + ) + + self._delegate_command(transaction_label, command, set_player_app_settings()) + def _on_register_notification_command( self, transaction_label: int, command: RegisterNotificationCommand ) -> None: @@ -2598,6 +2788,16 @@ class Protocol(utils.EventEmitter): event = PlaybackStatusChangedEvent(play_status=playback_status) case EventId.NOW_PLAYING_CONTENT_CHANGED: event = NowPlayingContentChangedEvent() + case EventId.PLAYER_APPLICATION_SETTING_CHANGED: + settings = await self.delegate.get_current_player_app_settings() + event = PlayerApplicationSettingChangedEvent( + [ + PlayerApplicationSettingChangedEvent.Setting( + attribute, value + ) + for attribute, value in settings.items() + ] + ) case _: logger.warning( "Event supported but not handled %s", command.event_id diff --git a/tests/avrcp_test.py b/tests/avrcp_test.py index 755ff17..ac164d4 100644 --- a/tests/avrcp_test.py +++ b/tests/avrcp_test.py @@ -118,8 +118,6 @@ class TwoDevices(test_utils.TwoDevices): scope=avrcp.Scope.NOW_PLAYING, uid=0, uid_counter=1, - start_item=0, - end_item=0, attributes=[avrcp.MediaAttributeId.DEFAULT_COVER_ART], ), avrcp.GetTotalNumberOfItemsCommand(scope=avrcp.Scope.NOW_PLAYING), @@ -581,6 +579,67 @@ async def test_get_supported_company_ids(): assert supported_company_ids == [avrcp.AVRCP_BLUETOOTH_SIG_COMPANY_ID] +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_list_player_application_settings(): + two_devices: TwoDevices = await TwoDevices.create_with_avdtp() + + expected_settings = { + avrcp.ApplicationSetting.AttributeId.REPEAT_MODE: [ + avrcp.ApplicationSetting.RepeatModeStatus.ALL_TRACK_REPEAT, + avrcp.ApplicationSetting.RepeatModeStatus.GROUP_REPEAT, + avrcp.ApplicationSetting.RepeatModeStatus.SINGLE_TRACK_REPEAT, + avrcp.ApplicationSetting.RepeatModeStatus.OFF, + ], + avrcp.ApplicationSetting.AttributeId.SHUFFLE_ON_OFF: [ + avrcp.ApplicationSetting.ShuffleOnOffStatus.OFF, + avrcp.ApplicationSetting.ShuffleOnOffStatus.ALL_TRACKS_SHUFFLE, + avrcp.ApplicationSetting.ShuffleOnOffStatus.GROUP_SHUFFLE, + ], + } + delegate = two_devices.protocols[1].delegate = avrcp.Delegate( + supported_player_app_settings=expected_settings + ) + actual_settings = await two_devices.protocols[ + 0 + ].list_supported_player_app_settings() + assert actual_settings == expected_settings + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_get_set_player_app_settings(): + two_devices: TwoDevices = await TwoDevices.create_with_avdtp() + + delegate = two_devices.protocols[1].delegate + await two_devices.protocols[0].send_avrcp_command( + avc.CommandFrame.CommandType.CONTROL, + avrcp.SetPlayerApplicationSettingValueCommand( + attribute=[ + avrcp.ApplicationSetting.AttributeId.REPEAT_MODE, + avrcp.ApplicationSetting.AttributeId.SHUFFLE_ON_OFF, + ], + value=[ + avrcp.ApplicationSetting.RepeatModeStatus.ALL_TRACK_REPEAT, + avrcp.ApplicationSetting.ShuffleOnOffStatus.GROUP_SHUFFLE, + ], + ), + ) + expected_settings = { + avrcp.ApplicationSetting.AttributeId.REPEAT_MODE: avrcp.ApplicationSetting.RepeatModeStatus.ALL_TRACK_REPEAT, + avrcp.ApplicationSetting.AttributeId.SHUFFLE_ON_OFF: avrcp.ApplicationSetting.ShuffleOnOffStatus.GROUP_SHUFFLE, + } + assert delegate.player_app_settings == expected_settings + + actual_settings = await two_devices.protocols[0].get_player_app_settings( + [ + avrcp.ApplicationSetting.AttributeId.REPEAT_MODE, + avrcp.ApplicationSetting.AttributeId.SHUFFLE_ON_OFF, + ] + ) + assert actual_settings == expected_settings + + # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_monitor_volume(): @@ -635,6 +694,41 @@ async def test_monitor_now_playing_content(): await anext(now_playing_iter) +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_monitor_player_app_settings(): + two_devices = await TwoDevices.create_with_avdtp() + + delegate = two_devices.protocols[1].delegate = avrcp.Delegate( + supported_events=[avrcp.EventId.PLAYER_APPLICATION_SETTING_CHANGED] + ) + delegate.player_app_settings = { + avrcp.ApplicationSetting.AttributeId.REPEAT_MODE: avrcp.ApplicationSetting.RepeatModeStatus.ALL_TRACK_REPEAT + } + settings_iter = two_devices.protocols[0].monitor_player_application_settings() + + # Interim + interim = await anext(settings_iter) + assert interim[0].attribute_id == avrcp.ApplicationSetting.AttributeId.REPEAT_MODE + assert ( + interim[0].value_id + == avrcp.ApplicationSetting.RepeatModeStatus.ALL_TRACK_REPEAT + ) + + # Changed + two_devices.protocols[1].notify_player_application_settings_changed( + [ + avrcp.PlayerApplicationSettingChangedEvent.Setting( + avrcp.ApplicationSetting.AttributeId.REPEAT_MODE, + avrcp.ApplicationSetting.RepeatModeStatus.GROUP_REPEAT, + ) + ] + ) + changed = await anext(settings_iter) + assert changed[0].attribute_id == avrcp.ApplicationSetting.AttributeId.REPEAT_MODE + assert changed[0].value_id == avrcp.ApplicationSetting.RepeatModeStatus.GROUP_REPEAT + + # ----------------------------------------------------------------------------- if __name__ == '__main__': test_frame_parser()