diff --git a/bumble/avrcp.py b/bumble/avrcp.py index 37fffb1e..4663af7a 100644 --- a/bumble/avrcp.py +++ b/bumble/avrcp.py @@ -1521,6 +1521,12 @@ class Delegate: def __init__(self, status_code: StatusCode) -> None: self.status_code = status_code + class AvcError(Exception): + """The delegate AVC method failed, with a specified status code.""" + + def __init__(self, status_code: avc.ResponseFrame.ResponseCode) -> None: + self.status_code = status_code + supported_events: list[EventId] volume: int @@ -1543,6 +1549,16 @@ class Delegate: async def get_absolute_volume(self) -> int: return self.volume + async def on_key_event( + self, + key: avc.PassThroughFrame.OperationId, + pressed: bool, + data: bytes, + ) -> None: + logger.debug( + "@@@ on_key_event: key=%s, pressed=%s, data=%s", key, pressed, data.hex() + ) + # TODO add other delegate methods @@ -2052,16 +2068,28 @@ class Protocol(utils.EventEmitter): return if isinstance(command, avc.PassThroughCommandFrame): - # TODO: delegate - response = avc.PassThroughResponseFrame( - avc.ResponseFrame.ResponseCode.ACCEPTED, - command.subunit_type, - command.subunit_id, - command.state_flag, - command.operation_id, - command.operation_data, - ) - self.send_response(transaction_label, response) + + async def dispatch_key_event() -> None: + try: + await self.delegate.on_key_event( + command.operation_id, + command.state_flag == avc.PassThroughFrame.StateFlag.PRESSED, + command.operation_data, + ) + response_code = avc.ResponseFrame.ResponseCode.ACCEPTED + except Delegate.AvcError as error: + logger.exception("delegate method raised exception") + response_code = error.status_code + except Exception: + logger.exception("delegate method raised exception") + response_code = avc.ResponseFrame.ResponseCode.REJECTED + self.send_passthrough_response( + transaction_label=transaction_label, + command=command, + response_code=response_code, + ) + + utils.AsyncRunner.spawn(dispatch_key_event()) return # TODO handle other types diff --git a/tests/avrcp_test.py b/tests/avrcp_test.py index c7554f9d..d534c0fb 100644 --- a/tests/avrcp_test.py +++ b/tests/avrcp_test.py @@ -16,6 +16,7 @@ # Imports # ----------------------------------------------------------------------------- from __future__ import annotations +import asyncio import struct from collections.abc import Sequence @@ -436,6 +437,69 @@ async def test_get_supported_events(): assert supported_events == [avrcp.EventId.VOLUME_CHANGED] +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_passthrough_key_event(): + two_devices = await TwoDevices.create_with_avdtp() + + q = asyncio.Queue[tuple[avc.PassThroughFrame.OperationId, bool, bytes]]() + + class Delegate(avrcp.Delegate): + async def on_key_event( + self, key: avc.PassThroughFrame.OperationId, pressed: bool, data: bytes + ) -> None: + q.put_nowait((key, pressed, data)) + + two_devices.protocols[1].delegate = Delegate() + + for key, pressed in [ + (avc.PassThroughFrame.OperationId.PLAY, True), + (avc.PassThroughFrame.OperationId.PLAY, False), + (avc.PassThroughFrame.OperationId.PAUSE, True), + (avc.PassThroughFrame.OperationId.PAUSE, False), + ]: + await two_devices.protocols[0].send_key_event(key, pressed) + assert (await q.get()) == (key, pressed, b'') + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_passthrough_key_event_rejected(): + two_devices = await TwoDevices.create_with_avdtp() + + class Delegate(avrcp.Delegate): + async def on_key_event( + self, key: avc.PassThroughFrame.OperationId, pressed: bool, data: bytes + ) -> None: + raise avrcp.Delegate.AvcError(avc.ResponseFrame.ResponseCode.REJECTED) + + two_devices.protocols[1].delegate = Delegate() + + response = await two_devices.protocols[0].send_key_event( + avc.PassThroughFrame.OperationId.PLAY, True + ) + assert response.response == avc.ResponseFrame.ResponseCode.REJECTED + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_passthrough_key_event_exception(): + two_devices = await TwoDevices.create_with_avdtp() + + class Delegate(avrcp.Delegate): + async def on_key_event( + self, key: avc.PassThroughFrame.OperationId, pressed: bool, data: bytes + ) -> None: + raise Exception() + + two_devices.protocols[1].delegate = Delegate() + + response = await two_devices.protocols[0].send_key_event( + avc.PassThroughFrame.OperationId.PLAY, True + ) + assert response.response == avc.ResponseFrame.ResponseCode.REJECTED + + # ----------------------------------------------------------------------------- if __name__ == '__main__': test_frame_parser()