Merge pull request #883 from zxzxwu/avrcp

AVRCP: More delegation and bugfix
This commit is contained in:
Josh Wu
2026-02-11 13:13:16 +08:00
committed by GitHub
3 changed files with 526 additions and 93 deletions

View File

@@ -22,7 +22,14 @@ import enum
import functools import functools
import logging import logging
import struct import struct
from collections.abc import AsyncIterator, Awaitable, Callable, Iterable, Sequence from collections.abc import (
AsyncIterator,
Awaitable,
Callable,
Iterable,
Sequence,
Mapping,
)
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import ClassVar, SupportsBytes, TypeVar from typing import ClassVar, SupportsBytes, TypeVar
@@ -1049,11 +1056,9 @@ class GetItemAttributesCommand(Command):
scope: Scope = field(metadata=Scope.type_metadata(1)) scope: Scope = field(metadata=Scope.type_metadata(1))
uid: int = field(metadata=_UINT64_BE_METADATA) uid: int = field(metadata=_UINT64_BE_METADATA)
uid_counter: int = field(metadata=hci.metadata('>2')) uid_counter: int = field(metadata=hci.metadata('>2'))
start_item: int = field(metadata=hci.metadata('>4'))
end_item: int = field(metadata=hci.metadata('>4'))
# When attributes is empty, all attributes will be requested. # When attributes is empty, all attributes will be requested.
attributes: Sequence[MediaAttributeId] = field( attributes: Sequence[MediaAttributeId] = field(
metadata=MediaAttributeId.type_metadata(1, list_begin=True, list_end=True) metadata=MediaAttributeId.type_metadata(4, list_begin=True, list_end=True)
) )
@@ -1512,7 +1517,9 @@ class PlaybackPositionChangedEvent(Event):
@dataclass @dataclass
class TrackChangedEvent(Event): class TrackChangedEvent(Event):
event_id = EventId.TRACK_CHANGED event_id = EventId.TRACK_CHANGED
identifier: bytes = field(metadata=hci.metadata('*')) NO_TRACK = 0xFFFFFFFFFFFFFFFF
uid: int = field(metadata=_UINT64_BE_METADATA)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -1536,16 +1543,19 @@ class PlayerApplicationSettingChangedEvent(Event):
def __post_init__(self) -> None: def __post_init__(self) -> None:
super().__post_init__() super().__post_init__()
if self.attribute_id == ApplicationSetting.AttributeId.EQUALIZER_ON_OFF: match self.attribute_id:
self.value_id = ApplicationSetting.EqualizerOnOffStatus(self.value_id) case ApplicationSetting.AttributeId.EQUALIZER_ON_OFF:
elif self.attribute_id == ApplicationSetting.AttributeId.REPEAT_MODE: self.value_id = ApplicationSetting.EqualizerOnOffStatus(
self.value_id = ApplicationSetting.RepeatModeStatus(self.value_id) self.value_id
elif self.attribute_id == ApplicationSetting.AttributeId.SHUFFLE_ON_OFF: )
self.value_id = ApplicationSetting.ShuffleOnOffStatus(self.value_id) case ApplicationSetting.AttributeId.REPEAT_MODE:
elif self.attribute_id == ApplicationSetting.AttributeId.SCAN_ON_OFF: self.value_id = ApplicationSetting.RepeatModeStatus(self.value_id)
self.value_id = ApplicationSetting.ScanOnOffStatus(self.value_id) case ApplicationSetting.AttributeId.SHUFFLE_ON_OFF:
else: self.value_id = ApplicationSetting.ShuffleOnOffStatus(self.value_id)
self.value_id = ApplicationSetting.GenericValue(self.value_id) case ApplicationSetting.AttributeId.SCAN_ON_OFF:
self.value_id = ApplicationSetting.ScanOnOffStatus(self.value_id)
case _:
self.value_id = ApplicationSetting.GenericValue(self.value_id)
player_application_settings: Sequence[Setting] = field( player_application_settings: Sequence[Setting] = field(
metadata=hci.metadata(Setting.parse_from_bytes, list_begin=True, list_end=True) metadata=hci.metadata(Setting.parse_from_bytes, list_begin=True, list_end=True)
@@ -1619,6 +1629,8 @@ class Delegate:
supported_events: list[EventId] supported_events: list[EventId]
supported_company_ids: list[int] supported_company_ids: list[int]
supported_player_app_settings: dict[ApplicationSetting.AttributeId, list[int]]
player_app_settings: dict[ApplicationSetting.AttributeId, int]
volume: int volume: int
playback_status: PlayStatus playback_status: PlayStatus
@@ -1626,11 +1638,23 @@ class Delegate:
self, self,
supported_events: Iterable[EventId] = (), supported_events: Iterable[EventId] = (),
supported_company_ids: Iterable[int] = (AVRCP_BLUETOOTH_SIG_COMPANY_ID,), supported_company_ids: Iterable[int] = (AVRCP_BLUETOOTH_SIG_COMPANY_ID,),
supported_player_app_settings: (
Mapping[ApplicationSetting.AttributeId, Sequence[int]] | None
) = None,
) -> None: ) -> None:
self.supported_company_ids = list(supported_company_ids) self.supported_company_ids = list(supported_company_ids)
self.supported_events = list(supported_events) self.supported_events = list(supported_events)
self.volume = 0 self.volume = 0
self.playback_status = PlayStatus.STOPPED self.playback_status = PlayStatus.STOPPED
self.supported_player_app_settings = (
{key: list(value) for key, value in supported_player_app_settings.items()}
if supported_player_app_settings
else {}
)
self.player_app_settings = {}
self.uid_counter = 0
self.addressed_player_id = 0
self.current_track_uid = TrackChangedEvent.NO_TRACK
async def get_supported_events(self) -> list[EventId]: async def get_supported_events(self) -> list[EventId]:
return self.supported_events return self.supported_events
@@ -1663,6 +1687,38 @@ class Delegate:
async def get_playback_status(self) -> PlayStatus: async def get_playback_status(self) -> PlayStatus:
return self.playback_status return self.playback_status
async def get_supported_player_app_settings(
self,
) -> dict[ApplicationSetting.AttributeId, list[int]]:
return self.supported_player_app_settings
async def get_current_player_app_settings(
self,
) -> dict[ApplicationSetting.AttributeId, int]:
return self.player_app_settings
async def set_player_app_settings(
self, attribute: ApplicationSetting.AttributeId, value: int
) -> None:
self.player_app_settings[attribute] = value
async def play_item(self, scope: Scope, uid: int, uid_counter: int) -> None:
logger.debug(
"@@@ play_item: scope=%s, uid=%s, uid_counter=%s",
scope,
uid,
uid_counter,
)
async def get_uid_counter(self) -> int:
return self.uid_counter
async def get_addressed_player_id(self) -> int:
return self.addressed_player_id
async def get_current_track_uid(self) -> int:
return self.current_track_uid
# TODO add other delegate methods # TODO add other delegate methods
@@ -1910,6 +1966,51 @@ class Protocol(utils.EventEmitter):
response = self._check_response(response_context, GetElementAttributesResponse) response = self._check_response(response_context, GetElementAttributesResponse)
return list(response.attributes) return list(response.attributes)
async def list_supported_player_app_settings(
self, attribute_ids: Sequence[ApplicationSetting.AttributeId] = ()
) -> dict[ApplicationSetting.AttributeId, list[int]]:
"""Get element attributes from the connected peer."""
response_context = await self.send_avrcp_command(
avc.CommandFrame.CommandType.STATUS,
ListPlayerApplicationSettingAttributesCommand(),
)
if not attribute_ids:
list_attribute_response = self._check_response(
response_context, ListPlayerApplicationSettingAttributesResponse
)
attribute_ids = list_attribute_response.attribute
supported_settings: dict[ApplicationSetting.AttributeId, list[int]] = {}
for attribute_id in attribute_ids:
response_context = await self.send_avrcp_command(
avc.CommandFrame.CommandType.STATUS,
ListPlayerApplicationSettingValuesCommand(attribute_id),
)
list_value_response = self._check_response(
response_context, ListPlayerApplicationSettingValuesResponse
)
supported_settings[attribute_id] = list(list_value_response.value)
return supported_settings
async def get_player_app_settings(
self, attribute_ids: Sequence[ApplicationSetting.AttributeId]
) -> dict[ApplicationSetting.AttributeId, int]:
"""Get element attributes from the connected peer."""
response_context = await self.send_avrcp_command(
avc.CommandFrame.CommandType.STATUS,
GetCurrentPlayerApplicationSettingValueCommand(attribute_ids),
)
response: GetCurrentPlayerApplicationSettingValueResponse = (
self._check_response(
response_context, GetCurrentPlayerApplicationSettingValueResponse
)
)
return {
attribute_id: value
for attribute_id, value in zip(response.attribute, response.value)
}
async def monitor_events( async def monitor_events(
self, event_id: EventId, playback_interval: int = 0 self, event_id: EventId, playback_interval: int = 0
) -> AsyncIterator[Event]: ) -> AsyncIterator[Event]:
@@ -1961,13 +2062,13 @@ class Protocol(utils.EventEmitter):
async def monitor_track_changed( async def monitor_track_changed(
self, self,
) -> AsyncIterator[bytes]: ) -> AsyncIterator[int]:
"""Monitor Track changes from the connected peer.""" """Monitor Track changes from the connected peer."""
async for event in self.monitor_events(EventId.TRACK_CHANGED, 0): async for event in self.monitor_events(EventId.TRACK_CHANGED, 0):
if not isinstance(event, TrackChangedEvent): if not isinstance(event, TrackChangedEvent):
logger.warning("unexpected event class") logger.warning("unexpected event class")
continue continue
yield event.identifier yield event.uid
async def monitor_playback_position( async def monitor_playback_position(
self, playback_interval: int self, playback_interval: int
@@ -2060,11 +2161,9 @@ class Protocol(utils.EventEmitter):
"""Notify the connected peer of a Playback Status change.""" """Notify the connected peer of a Playback Status change."""
self.notify_event(PlaybackStatusChangedEvent(status)) self.notify_event(PlaybackStatusChangedEvent(status))
def notify_track_changed(self, identifier: bytes) -> None: def notify_track_changed(self, uid: int) -> None:
"""Notify the connected peer of a Track change.""" """Notify the connected peer of a Track change."""
if len(identifier) != 8: self.notify_event(TrackChangedEvent(uid))
raise core.InvalidArgumentError("identifier must be 8 bytes")
self.notify_event(TrackChangedEvent(identifier))
def notify_playback_position_changed(self, position: int) -> None: def notify_playback_position_changed(self, position: int) -> None:
"""Notify the connected peer of a Position change.""" """Notify the connected peer of a Position change."""
@@ -2280,21 +2379,40 @@ class Protocol(utils.EventEmitter):
): ):
# TODO: catch exceptions from delegates # TODO: catch exceptions from delegates
command = Command.from_bytes(pdu_id, pdu) command = Command.from_bytes(pdu_id, pdu)
if isinstance(command, GetCapabilitiesCommand): match command:
self._on_get_capabilities_command(transaction_label, command) case GetCapabilitiesCommand():
elif isinstance(command, SetAbsoluteVolumeCommand): self._on_get_capabilities_command(transaction_label, command)
self._on_set_absolute_volume_command(transaction_label, command) case SetAbsoluteVolumeCommand():
elif isinstance(command, RegisterNotificationCommand): self._on_set_absolute_volume_command(transaction_label, command)
self._on_register_notification_command(transaction_label, command) case RegisterNotificationCommand():
elif isinstance(command, GetPlayStatusCommand): self._on_register_notification_command(transaction_label, command)
self._on_get_play_status_command(transaction_label, command) case GetPlayStatusCommand():
else: self._on_get_play_status_command(transaction_label, command)
# Not supported. case ListPlayerApplicationSettingAttributesCommand():
# TODO: check that this is the right way to respond in this case. self._on_list_player_application_setting_attributes_command(
logger.debug("unsupported PDU ID") transaction_label, command
self.send_rejected_avrcp_response( )
transaction_label, pdu_id, StatusCode.INVALID_PARAMETER case ListPlayerApplicationSettingValuesCommand():
) self._on_list_player_application_setting_values_command(
transaction_label, command
)
case SetPlayerApplicationSettingValueCommand():
self._on_set_player_application_setting_value_command(
transaction_label, command
)
case GetCurrentPlayerApplicationSettingValueCommand():
self._on_get_current_player_application_setting_value_command(
transaction_label, command
)
case PlayItemCommand():
self._on_play_item_command(transaction_label, command)
case _:
# Not supported.
# TODO: check that this is the right way to respond in this case.
logger.debug("unsupported PDU ID")
self.send_rejected_avrcp_response(
transaction_label, pdu_id, StatusCode.INVALID_PARAMETER
)
else: else:
logger.debug("unsupported command type") logger.debug("unsupported command type")
self.send_rejected_avrcp_response( self.send_rejected_avrcp_response(
@@ -2322,26 +2440,29 @@ class Protocol(utils.EventEmitter):
# is Ok, but if/when more responses are supported, a lookup mechanism would be # is Ok, but if/when more responses are supported, a lookup mechanism would be
# more appropriate. # more appropriate.
response: Response | None = None response: Response | None = None
if response_code == avc.ResponseFrame.ResponseCode.REJECTED: match response_code:
response = RejectedResponse(pdu_id=pdu_id, status_code=StatusCode(pdu[0])) case avc.ResponseFrame.ResponseCode.REJECTED:
elif response_code == avc.ResponseFrame.ResponseCode.NOT_IMPLEMENTED: response = RejectedResponse(
response = NotImplementedResponse(pdu_id=pdu_id, parameters=pdu) pdu_id=pdu_id, status_code=StatusCode(pdu[0])
elif response_code in ( )
avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE, case avc.ResponseFrame.ResponseCode.NOT_IMPLEMENTED:
avc.ResponseFrame.ResponseCode.INTERIM, response = NotImplementedResponse(pdu_id=pdu_id, parameters=pdu)
avc.ResponseFrame.ResponseCode.CHANGED, case (
avc.ResponseFrame.ResponseCode.ACCEPTED, avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE
): | avc.ResponseFrame.ResponseCode.INTERIM
response = Response.from_bytes(pdu=pdu, pdu_id=PduId(pdu_id)) | avc.ResponseFrame.ResponseCode.CHANGED
else: | avc.ResponseFrame.ResponseCode.ACCEPTED
logger.debug("unexpected response code") ):
pending_command.response.set_exception( response = Response.from_bytes(pdu=pdu, pdu_id=PduId(pdu_id))
core.ProtocolError( case _:
error_code=None, logger.debug("unexpected response code")
error_namespace="avrcp", pending_command.response.set_exception(
details="unexpected response code", core.ProtocolError(
error_code=None,
error_namespace="avrcp",
details="unexpected response code",
)
) )
)
if response is None: if response is None:
self.recycle_pending_command(pending_command) self.recycle_pending_command(pending_command)
@@ -2512,22 +2633,18 @@ class Protocol(utils.EventEmitter):
async def get_supported_events() -> None: async def get_supported_events() -> None:
capabilities: Sequence[bytes | SupportsBytes] capabilities: Sequence[bytes | SupportsBytes]
if ( match command.capability_id:
command.capability_id case GetCapabilitiesCommand.CapabilityId.EVENTS_SUPPORTED:
== GetCapabilitiesCommand.CapabilityId.EVENTS_SUPPORTED capabilities = await self.delegate.get_supported_events()
): case GetCapabilitiesCommand.CapabilityId.EVENTS_SUPPORTED.COMPANY_ID:
capabilities = await self.delegate.get_supported_events() company_ids = await self.delegate.get_supported_company_ids()
elif ( capabilities = [
command.capability_id == GetCapabilitiesCommand.CapabilityId.COMPANY_ID company_id.to_bytes(3, 'big') for company_id in company_ids
): ]
company_ids = await self.delegate.get_supported_company_ids() case _:
capabilities = [ raise core.InvalidArgumentError(
company_id.to_bytes(3, 'big') for company_id in company_ids f"Unsupported capability: {command.capability_id}"
] )
else:
raise core.InvalidArgumentError(
f"Unsupported capability: {command.capability_id}"
)
self.send_avrcp_response( self.send_avrcp_response(
transaction_label, transaction_label,
avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE, avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE,
@@ -2572,6 +2689,121 @@ class Protocol(utils.EventEmitter):
self._delegate_command(transaction_label, command, get_playback_status()) self._delegate_command(transaction_label, command, get_playback_status())
def _on_list_player_application_setting_attributes_command(
self,
transaction_label: int,
command: ListPlayerApplicationSettingAttributesCommand,
) -> None:
logger.debug("<<< AVRCP command PDU: %s", command)
async def get_supported_player_app_settings() -> None:
supported_settings = await self.delegate.get_supported_player_app_settings()
self.send_avrcp_response(
transaction_label,
avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE,
ListPlayerApplicationSettingAttributesResponse(
list(supported_settings.keys())
),
)
self._delegate_command(
transaction_label, command, get_supported_player_app_settings()
)
def _on_list_player_application_setting_values_command(
self,
transaction_label: int,
command: ListPlayerApplicationSettingValuesCommand,
) -> None:
logger.debug("<<< AVRCP command PDU: %s", command)
async def get_supported_player_app_settings() -> None:
supported_settings = await self.delegate.get_supported_player_app_settings()
self.send_avrcp_response(
transaction_label,
avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE,
ListPlayerApplicationSettingValuesResponse(
supported_settings.get(command.attribute, [])
),
)
self._delegate_command(
transaction_label, command, get_supported_player_app_settings()
)
def _on_get_current_player_application_setting_value_command(
self,
transaction_label: int,
command: GetCurrentPlayerApplicationSettingValueCommand,
) -> None:
logger.debug("<<< AVRCP command PDU: %s", command)
async def get_supported_player_app_settings() -> None:
current_settings = await self.delegate.get_current_player_app_settings()
if not all(
attribute in current_settings for attribute in command.attribute
):
self.send_not_implemented_avrcp_response(
transaction_label,
PduId.GET_CURRENT_PLAYER_APPLICATION_SETTING_VALUE,
)
return
self.send_avrcp_response(
transaction_label,
avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE,
GetCurrentPlayerApplicationSettingValueResponse(
attribute=command.attribute,
value=[
current_settings[attribute] for attribute in command.attribute
],
),
)
self._delegate_command(
transaction_label, command, get_supported_player_app_settings()
)
def _on_set_player_application_setting_value_command(
self,
transaction_label: int,
command: SetPlayerApplicationSettingValueCommand,
) -> None:
logger.debug("<<< AVRCP command PDU: %s", command)
async def set_player_app_settings() -> None:
for attribute, value in zip(command.attribute, command.value):
await self.delegate.set_player_app_settings(attribute, value)
self.send_avrcp_response(
transaction_label,
avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE,
SetPlayerApplicationSettingValueResponse(),
)
self._delegate_command(transaction_label, command, set_player_app_settings())
def _on_play_item_command(
self,
transaction_label: int,
command: PlayItemCommand,
) -> None:
logger.debug("<<< AVRCP command PDU: %s", command)
async def play_item() -> None:
await self.delegate.play_item(
scope=command.scope, uid=command.uid, uid_counter=command.uid_counter
)
self.send_avrcp_response(
transaction_label,
avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE,
PlayItemResponse(status=StatusCode.OPERATION_COMPLETED),
)
self._delegate_command(transaction_label, command, play_item())
def _on_register_notification_command( def _on_register_notification_command(
self, transaction_label: int, command: RegisterNotificationCommand self, transaction_label: int, command: RegisterNotificationCommand
) -> None: ) -> None:
@@ -2587,26 +2819,51 @@ class Protocol(utils.EventEmitter):
) )
return return
response: Response event: Event
if command.event_id == EventId.VOLUME_CHANGED: match command.event_id:
volume = await self.delegate.get_absolute_volume() case EventId.VOLUME_CHANGED:
response = RegisterNotificationResponse(VolumeChangedEvent(volume)) volume = await self.delegate.get_absolute_volume()
elif command.event_id == EventId.PLAYBACK_STATUS_CHANGED: event = VolumeChangedEvent(volume)
playback_status = await self.delegate.get_playback_status() case EventId.PLAYBACK_STATUS_CHANGED:
response = RegisterNotificationResponse( playback_status = await self.delegate.get_playback_status()
PlaybackStatusChangedEvent(play_status=playback_status) event = PlaybackStatusChangedEvent(play_status=playback_status)
) case EventId.NOW_PLAYING_CONTENT_CHANGED:
elif command.event_id == EventId.NOW_PLAYING_CONTENT_CHANGED: event = NowPlayingContentChangedEvent()
playback_status = await self.delegate.get_playback_status() case EventId.PLAYER_APPLICATION_SETTING_CHANGED:
response = RegisterNotificationResponse(NowPlayingContentChangedEvent()) settings = await self.delegate.get_current_player_app_settings()
else: event = PlayerApplicationSettingChangedEvent(
logger.warning("Event supported but not handled %s", command.event_id) [
return PlayerApplicationSettingChangedEvent.Setting(
attribute, value # type: ignore
)
for attribute, value in settings.items()
]
)
case EventId.AVAILABLE_PLAYERS_CHANGED:
event = AvailablePlayersChangedEvent()
case EventId.ADDRESSED_PLAYER_CHANGED:
event = AddressedPlayerChangedEvent(
AddressedPlayerChangedEvent.Player(
player_id=await self.delegate.get_addressed_player_id(),
uid_counter=await self.delegate.get_uid_counter(),
)
)
case EventId.UIDS_CHANGED:
event = UidsChangedEvent(await self.delegate.get_uid_counter())
case EventId.TRACK_CHANGED:
event = TrackChangedEvent(
await self.delegate.get_current_track_uid()
)
case _:
logger.warning(
"Event supported but not handled %s", command.event_id
)
return
self.send_avrcp_response( self.send_avrcp_response(
transaction_label, transaction_label,
avc.ResponseFrame.ResponseCode.INTERIM, avc.ResponseFrame.ResponseCode.INTERIM,
response, RegisterNotificationResponse(event),
) )
self._register_notification_listener(transaction_label, command) self._register_notification_listener(transaction_label, command)

View File

@@ -133,10 +133,10 @@ def on_avrcp_start(
utils.AsyncRunner.spawn(get_supported_events()) utils.AsyncRunner.spawn(get_supported_events())
async def monitor_track_changed() -> None: async def monitor_track_changed() -> None:
async for identifier in avrcp_protocol.monitor_track_changed(): async for uid in avrcp_protocol.monitor_track_changed():
print("TRACK CHANGED:", identifier.hex()) print("TRACK CHANGED:", hex(uid))
websocket_server.send_message( websocket_server.send_message(
{"type": "track-changed", "params": {"identifier": identifier.hex()}} {"type": "track-changed", "params": {"identifier": hex(uid)}}
) )
async def monitor_playback_status() -> None: async def monitor_playback_status() -> None:

View File

@@ -20,6 +20,7 @@ from __future__ import annotations
import asyncio import asyncio
import struct import struct
from collections.abc import Sequence from collections.abc import Sequence
from unittest import mock
import pytest import pytest
@@ -118,8 +119,6 @@ class TwoDevices(test_utils.TwoDevices):
scope=avrcp.Scope.NOW_PLAYING, scope=avrcp.Scope.NOW_PLAYING,
uid=0, uid=0,
uid_counter=1, uid_counter=1,
start_item=0,
end_item=0,
attributes=[avrcp.MediaAttributeId.DEFAULT_COVER_ART], attributes=[avrcp.MediaAttributeId.DEFAULT_COVER_ART],
), ),
avrcp.GetTotalNumberOfItemsCommand(scope=avrcp.Scope.NOW_PLAYING), avrcp.GetTotalNumberOfItemsCommand(scope=avrcp.Scope.NOW_PLAYING),
@@ -136,7 +135,7 @@ def test_command(command: avrcp.Command):
"event,", "event,",
[ [
avrcp.UidsChangedEvent(uid_counter=7), avrcp.UidsChangedEvent(uid_counter=7),
avrcp.TrackChangedEvent(identifier=b'12356'), avrcp.TrackChangedEvent(uid=12356),
avrcp.VolumeChangedEvent(volume=9), avrcp.VolumeChangedEvent(volume=9),
avrcp.PlaybackStatusChangedEvent(play_status=avrcp.PlayStatus.PLAYING), avrcp.PlaybackStatusChangedEvent(play_status=avrcp.PlayStatus.PLAYING),
avrcp.AddressedPlayerChangedEvent( avrcp.AddressedPlayerChangedEvent(
@@ -581,6 +580,87 @@ async def test_get_supported_company_ids():
assert supported_company_ids == [avrcp.AVRCP_BLUETOOTH_SIG_COMPANY_ID] assert supported_company_ids == [avrcp.AVRCP_BLUETOOTH_SIG_COMPANY_ID]
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_list_player_application_settings():
two_devices: TwoDevices = await TwoDevices.create_with_avdtp()
expected_settings = {
avrcp.ApplicationSetting.AttributeId.REPEAT_MODE: [
avrcp.ApplicationSetting.RepeatModeStatus.ALL_TRACK_REPEAT,
avrcp.ApplicationSetting.RepeatModeStatus.GROUP_REPEAT,
avrcp.ApplicationSetting.RepeatModeStatus.SINGLE_TRACK_REPEAT,
avrcp.ApplicationSetting.RepeatModeStatus.OFF,
],
avrcp.ApplicationSetting.AttributeId.SHUFFLE_ON_OFF: [
avrcp.ApplicationSetting.ShuffleOnOffStatus.OFF,
avrcp.ApplicationSetting.ShuffleOnOffStatus.ALL_TRACKS_SHUFFLE,
avrcp.ApplicationSetting.ShuffleOnOffStatus.GROUP_SHUFFLE,
],
}
two_devices.protocols[1].delegate = avrcp.Delegate(
supported_player_app_settings=expected_settings
)
actual_settings = await two_devices.protocols[
0
].list_supported_player_app_settings()
assert actual_settings == expected_settings
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_set_player_app_settings():
two_devices: TwoDevices = await TwoDevices.create_with_avdtp()
delegate = two_devices.protocols[1].delegate
await two_devices.protocols[0].send_avrcp_command(
avc.CommandFrame.CommandType.CONTROL,
avrcp.SetPlayerApplicationSettingValueCommand(
attribute=[
avrcp.ApplicationSetting.AttributeId.REPEAT_MODE,
avrcp.ApplicationSetting.AttributeId.SHUFFLE_ON_OFF,
],
value=[
avrcp.ApplicationSetting.RepeatModeStatus.ALL_TRACK_REPEAT,
avrcp.ApplicationSetting.ShuffleOnOffStatus.GROUP_SHUFFLE,
],
),
)
expected_settings = {
avrcp.ApplicationSetting.AttributeId.REPEAT_MODE: avrcp.ApplicationSetting.RepeatModeStatus.ALL_TRACK_REPEAT,
avrcp.ApplicationSetting.AttributeId.SHUFFLE_ON_OFF: avrcp.ApplicationSetting.ShuffleOnOffStatus.GROUP_SHUFFLE,
}
assert delegate.player_app_settings == expected_settings
actual_settings = await two_devices.protocols[0].get_player_app_settings(
[
avrcp.ApplicationSetting.AttributeId.REPEAT_MODE,
avrcp.ApplicationSetting.AttributeId.SHUFFLE_ON_OFF,
]
)
assert actual_settings == expected_settings
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_play_item():
two_devices: TwoDevices = await TwoDevices.create_with_avdtp()
delegate = two_devices.protocols[1].delegate
with mock.patch.object(delegate, delegate.play_item.__name__) as play_item_mock:
await two_devices.protocols[0].send_avrcp_command(
avc.CommandFrame.CommandType.CONTROL,
avrcp.PlayItemCommand(
scope=avrcp.Scope.MEDIA_PLAYER_LIST, uid=0, uid_counter=1
),
)
play_item_mock.assert_called_once_with(
scope=avrcp.Scope.MEDIA_PLAYER_LIST, uid=0, uid_counter=1
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_monitor_volume(): async def test_monitor_volume():
@@ -635,6 +715,102 @@ async def test_monitor_now_playing_content():
await anext(now_playing_iter) await anext(now_playing_iter)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_monitor_track_changed():
two_devices = await TwoDevices.create_with_avdtp()
delegate = two_devices.protocols[1].delegate = avrcp.Delegate(
[avrcp.EventId.TRACK_CHANGED]
)
delegate.current_track_uid = avrcp.TrackChangedEvent.NO_TRACK
track_iter = two_devices.protocols[0].monitor_track_changed()
# Interim
assert (await anext(track_iter)) == avrcp.TrackChangedEvent.NO_TRACK
# Changed
two_devices.protocols[1].notify_track_changed(1)
assert (await anext(track_iter)) == 1
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_monitor_uid_changed():
two_devices = await TwoDevices.create_with_avdtp()
delegate = two_devices.protocols[1].delegate = avrcp.Delegate(
[avrcp.EventId.UIDS_CHANGED]
)
delegate.uid_counter = 0
uid_iter = two_devices.protocols[0].monitor_uids()
# Interim
assert (await anext(uid_iter)) == 0
# Changed
two_devices.protocols[1].notify_uids_changed(1)
assert (await anext(uid_iter)) == 1
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_monitor_addressed_player():
two_devices = await TwoDevices.create_with_avdtp()
delegate = two_devices.protocols[1].delegate = avrcp.Delegate(
[avrcp.EventId.ADDRESSED_PLAYER_CHANGED]
)
delegate.uid_counter = 0
delegate.addressed_player_id = 0
addressed_player_iter = two_devices.protocols[0].monitor_addressed_player()
# Interim
assert (
await anext(addressed_player_iter)
) == avrcp.AddressedPlayerChangedEvent.Player(player_id=0, uid_counter=0)
# Changed
two_devices.protocols[1].notify_addressed_player_changed(
avrcp.AddressedPlayerChangedEvent.Player(player_id=1, uid_counter=1)
)
assert (
await anext(addressed_player_iter)
) == avrcp.AddressedPlayerChangedEvent.Player(player_id=1, uid_counter=1)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_monitor_player_app_settings():
two_devices = await TwoDevices.create_with_avdtp()
delegate = two_devices.protocols[1].delegate = avrcp.Delegate(
supported_events=[avrcp.EventId.PLAYER_APPLICATION_SETTING_CHANGED]
)
delegate.player_app_settings = {
avrcp.ApplicationSetting.AttributeId.REPEAT_MODE: avrcp.ApplicationSetting.RepeatModeStatus.ALL_TRACK_REPEAT
}
settings_iter = two_devices.protocols[0].monitor_player_application_settings()
# Interim
interim = await anext(settings_iter)
assert interim[0].attribute_id == avrcp.ApplicationSetting.AttributeId.REPEAT_MODE
assert (
interim[0].value_id
== avrcp.ApplicationSetting.RepeatModeStatus.ALL_TRACK_REPEAT
)
# Changed
two_devices.protocols[1].notify_player_application_settings_changed(
[
avrcp.PlayerApplicationSettingChangedEvent.Setting(
avrcp.ApplicationSetting.AttributeId.REPEAT_MODE,
avrcp.ApplicationSetting.RepeatModeStatus.GROUP_REPEAT,
)
]
)
changed = await anext(settings_iter)
assert changed[0].attribute_id == avrcp.ApplicationSetting.AttributeId.REPEAT_MODE
assert changed[0].value_id == avrcp.ApplicationSetting.RepeatModeStatus.GROUP_REPEAT
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
if __name__ == '__main__': if __name__ == '__main__':
test_frame_parser() test_frame_parser()