diff --git a/bumble/avrcp.py b/bumble/avrcp.py index 1aec633..2119e15 100644 --- a/bumble/avrcp.py +++ b/bumble/avrcp.py @@ -1290,6 +1290,10 @@ class InformBatteryStatusOfCtResponse(Response): @dataclass class GetPlayStatusResponse(Response): pdu_id = PduId.GET_PLAY_STATUS + + # TG doesn't support Song Length or Position. + UNAVAILABLE = 0xFFFFFFFF + song_length: int = field(metadata=hci.metadata(">4")) song_position: int = field(metadata=hci.metadata(">4")) play_status: PlayStatus = field(metadata=PlayStatus.type_metadata(1)) @@ -1615,10 +1619,12 @@ class Delegate: supported_events: list[EventId] volume: int + playback_status: PlayStatus def __init__(self, supported_events: Iterable[EventId] = ()) -> None: self.supported_events = list(supported_events) self.volume = 0 + self.playback_status = PlayStatus.STOPPED async def get_supported_events(self) -> list[EventId]: return self.supported_events @@ -1645,6 +1651,9 @@ class Delegate: "@@@ on_key_event: key=%s, pressed=%s, data=%s", key, pressed, data.hex() ) + async def get_playback_status(self) -> PlayStatus: + return self.playback_status + # TODO add other delegate methods @@ -2255,6 +2264,8 @@ class Protocol(utils.EventEmitter): self._on_set_absolute_volume_command(transaction_label, command) elif isinstance(command, RegisterNotificationCommand): self._on_register_notification_command(transaction_label, command) + elif isinstance(command, GetPlayStatusCommand): + self._on_get_play_status_command(transaction_label, command) else: # Not supported. # TODO: check that this is the right way to respond in this case. @@ -2509,6 +2520,26 @@ class Protocol(utils.EventEmitter): self._delegate_command(transaction_label, command, set_absolute_volume()) + def _on_get_play_status_command( + self, transaction_label: int, command: GetPlayStatusCommand + ) -> None: + logger.debug("<<< AVRCP command PDU: %s", command) + + async def get_playback_status() -> None: + play_status: PlayStatus = await self.delegate.get_playback_status() + self.send_avrcp_response( + transaction_label, + avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE, + GetPlayStatusResponse( + # TODO: Delegate this. + song_length=GetPlayStatusResponse.UNAVAILABLE, + song_position=GetPlayStatusResponse.UNAVAILABLE, + play_status=play_status, + ), + ) + + self._delegate_command(transaction_label, command, get_playback_status()) + def _on_register_notification_command( self, transaction_label: int, command: RegisterNotificationCommand ) -> None: @@ -2524,28 +2555,27 @@ class Protocol(utils.EventEmitter): ) return + response: Response if command.event_id == EventId.VOLUME_CHANGED: volume = await self.delegate.get_absolute_volume() response = RegisterNotificationResponse(VolumeChangedEvent(volume)) - self.send_avrcp_response( - transaction_label, - avc.ResponseFrame.ResponseCode.INTERIM, - response, + elif command.event_id == EventId.PLAYBACK_STATUS_CHANGED: + playback_status = await self.delegate.get_playback_status() + response = RegisterNotificationResponse( + PlaybackStatusChangedEvent(play_status=playback_status) ) - self._register_notification_listener(transaction_label, command) + elif command.event_id == EventId.NOW_PLAYING_CONTENT_CHANGED: + playback_status = await self.delegate.get_playback_status() + response = RegisterNotificationResponse(NowPlayingContentChangedEvent()) + else: + logger.warning("Event supported but not handled %s", command.event_id) return - if command.event_id == EventId.PLAYBACK_STATUS_CHANGED: - # TODO: testing only, use delegate - response = RegisterNotificationResponse( - PlaybackStatusChangedEvent(play_status=PlayStatus.PLAYING) - ) - self.send_avrcp_response( - transaction_label, - avc.ResponseFrame.ResponseCode.INTERIM, - response, - ) - self._register_notification_listener(transaction_label, command) - return + self.send_avrcp_response( + transaction_label, + avc.ResponseFrame.ResponseCode.INTERIM, + response, + ) + self._register_notification_listener(transaction_label, command) self._delegate_command(transaction_label, command, register_notification()) diff --git a/tests/avrcp_test.py b/tests/avrcp_test.py index 3950472..c5a9dc2 100644 --- a/tests/avrcp_test.py +++ b/tests/avrcp_test.py @@ -541,6 +541,85 @@ async def test_passthrough_key_event_exception(): assert response.response == avc.ResponseFrame.ResponseCode.REJECTED +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_set_volume(): + two_devices = await TwoDevices.create_with_avdtp() + + for volume in range(avrcp.SetAbsoluteVolumeCommand.MAXIMUM_VOLUME + 1): + response = await two_devices.protocols[1].send_avrcp_command( + avc.CommandFrame.CommandType.CONTROL, avrcp.SetAbsoluteVolumeCommand(volume) + ) + assert isinstance(response.response, avrcp.SetAbsoluteVolumeResponse) + assert response.response.volume == volume + assert two_devices.protocols[0].delegate.volume == volume + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_get_playback_status(): + two_devices = await TwoDevices.create_with_avdtp() + + for status in avrcp.PlayStatus: + two_devices.protocols[0].delegate.playback_status = status + response = await two_devices.protocols[1].get_play_status() + assert response.play_status == status + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_monitor_volume(): + two_devices = await TwoDevices.create_with_avdtp() + + two_devices.protocols[1].delegate = avrcp.Delegate([avrcp.EventId.VOLUME_CHANGED]) + volume_iter = two_devices.protocols[0].monitor_volume() + + for volume in range(avrcp.SetAbsoluteVolumeCommand.MAXIMUM_VOLUME + 1): + # Interim + two_devices.protocols[1].delegate.volume = 0 + assert (await anext(volume_iter)) == 0 + # Changed + two_devices.protocols[1].notify_volume_changed(volume) + assert (await anext(volume_iter)) == volume + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_monitor_playback_status(): + two_devices = await TwoDevices.create_with_avdtp() + + two_devices.protocols[1].delegate = avrcp.Delegate( + [avrcp.EventId.PLAYBACK_STATUS_CHANGED] + ) + playback_status_iter = two_devices.protocols[0].monitor_playback_status() + + for playback_status in avrcp.PlayStatus: + # Interim + two_devices.protocols[1].delegate.playback_status = avrcp.PlayStatus.STOPPED + assert (await anext(playback_status_iter)) == avrcp.PlayStatus.STOPPED + # Changed + two_devices.protocols[1].notify_playback_status_changed(playback_status) + assert (await anext(playback_status_iter)) == playback_status + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_monitor_now_playing_content(): + two_devices = await TwoDevices.create_with_avdtp() + + two_devices.protocols[1].delegate = avrcp.Delegate( + [avrcp.EventId.NOW_PLAYING_CONTENT_CHANGED] + ) + now_playing_iter = two_devices.protocols[0].monitor_now_playing_content() + + for _ in range(2): + # Interim + await anext(now_playing_iter) + # Changed + two_devices.protocols[1].notify_now_playing_content_changed() + await anext(now_playing_iter) + + # ----------------------------------------------------------------------------- if __name__ == '__main__': test_frame_parser()