Merge pull request #767 from zxzxwu/avrcp

Migrate AVRCP packets to dataclasses
This commit is contained in:
zxzxwu
2025-09-01 13:26:56 +08:00
committed by GitHub
3 changed files with 618 additions and 653 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -112,7 +112,14 @@ class SpecableEnum(utils.OpenIntEnum):
@classmethod
def type_spec(cls, size: int):
return {'size': size, 'mapper': lambda x: cls(x).name}
return {
'serializer': lambda x: x.to_bytes(size, 'little'),
'parser': lambda data, offset: (
offset + size,
cls(int.from_bytes(data[offset : offset + size], 'little')),
),
'mapper': lambda x: cls(x).name,
}
@classmethod
def type_metadata(cls, size: int, list_begin: bool = False, list_end: bool = False):
@@ -123,7 +130,14 @@ class SpecableFlag(enum.IntFlag):
@classmethod
def type_spec(cls, size: int):
return {'size': size, 'mapper': lambda x: cls(x).name}
return {
'serializer': lambda x: x.to_bytes(size, 'little'),
'parser': lambda data, offset: (
offset + size,
cls(int.from_bytes(data[offset : offset + size], 'little')),
),
'mapper': lambda x: cls(x).name,
}
@classmethod
def type_metadata(cls, size: int, list_begin: bool = False, list_end: bool = False):

View File

@@ -15,67 +15,210 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
from __future__ import annotations
import struct
from collections.abc import Sequence
import pytest
from bumble import avc, avctp, avrcp, controller, core, device, host, link
from bumble.transport import common
from bumble import avc, avctp, avrcp
from . import test_utils
# -----------------------------------------------------------------------------
class TwoDevices:
def __init__(self):
self.connections = [None, None]
addresses = ['F0:F1:F2:F3:F4:F5', 'F5:F4:F3:F2:F1:F0']
self.link = link.LocalLink()
self.controllers = [
controller.Controller('C1', link=self.link, public_address=addresses[0]),
controller.Controller('C2', link=self.link, public_address=addresses[1]),
]
self.devices = [
device.Device(
address=addresses[0],
host=host.Host(
self.controllers[0], common.AsyncPipeSink(self.controllers[0])
),
),
device.Device(
address=addresses[1],
host=host.Host(
self.controllers[1], common.AsyncPipeSink(self.controllers[1])
),
),
]
self.devices[0].classic_enabled = True
self.devices[1].classic_enabled = True
self.connections = [None, None]
self.protocols = [None, None]
def on_connection(self, which, connection):
self.connections[which] = connection
async def setup_connections(self):
await self.devices[0].power_on()
await self.devices[1].power_on()
self.connections = await asyncio.gather(
self.devices[0].connect(
self.devices[1].public_address, core.PhysicalTransport.BR_EDR
),
self.devices[1].accept(self.devices[0].public_address),
)
class TwoDevices(test_utils.TwoDevices):
protocols: Sequence[avrcp.Protocol] = ()
async def setup_avdtp_connections(self):
self.protocols = [avrcp.Protocol(), avrcp.Protocol()]
self.protocols[0].listen(self.devices[1])
await self.protocols[1].connect(self.connections[0])
@classmethod
async def create_with_avdtp(cls) -> TwoDevices:
devices = await cls.create_with_connection()
await devices.setup_avdtp_connections()
return devices
# -----------------------------------------------------------------------------
def test_GetPlayStatusCommand():
command = avrcp.GetPlayStatusCommand()
assert avrcp.Command.from_bytes(command.pdu_id, bytes(command)) == command
# -----------------------------------------------------------------------------
def test_GetCapabilitiesCommand():
command = avrcp.GetCapabilitiesCommand(
capability_id=avrcp.GetCapabilitiesCommand.CapabilityId.COMPANY_ID
)
assert avrcp.Command.from_bytes(command.pdu_id, bytes(command)) == command
# -----------------------------------------------------------------------------
def test_SetAbsoluteVolumeCommand():
command = avrcp.SetAbsoluteVolumeCommand(volume=5)
assert avrcp.Command.from_bytes(command.pdu_id, bytes(command)) == command
# -----------------------------------------------------------------------------
def test_GetElementAttributesCommand():
command = avrcp.GetElementAttributesCommand(
identifier=999,
attribute_ids=[
avrcp.MediaAttributeId.ALBUM_NAME,
avrcp.MediaAttributeId.ARTIST_NAME,
],
)
assert avrcp.Command.from_bytes(command.pdu_id, bytes(command)) == command
# -----------------------------------------------------------------------------
def test_RegisterNotificationCommand():
command = avrcp.RegisterNotificationCommand(
event_id=avrcp.EventId.ADDRESSED_PLAYER_CHANGED, playback_interval=123
)
assert avrcp.Command.from_bytes(command.pdu_id, bytes(command)) == command
# -----------------------------------------------------------------------------
def test_UidsChangedEvent():
event = avrcp.UidsChangedEvent(uid_counter=7)
assert avrcp.Event.from_bytes(bytes(event)) == event
# -----------------------------------------------------------------------------
def test_TrackChangedEvent():
event = avrcp.TrackChangedEvent(identifier=b'12356')
assert avrcp.Event.from_bytes(bytes(event)) == event
# -----------------------------------------------------------------------------
def test_VolumeChangedEvent():
event = avrcp.VolumeChangedEvent(volume=9)
assert avrcp.Event.from_bytes(bytes(event)) == event
# -----------------------------------------------------------------------------
def test_PlaybackStatusChangedEvent():
event = avrcp.PlaybackStatusChangedEvent(play_status=avrcp.PlayStatus.PLAYING)
assert avrcp.Event.from_bytes(bytes(event)) == event
# -----------------------------------------------------------------------------
def test_AddressedPlayerChangedEvent():
event = avrcp.AddressedPlayerChangedEvent(
player=avrcp.AddressedPlayerChangedEvent.Player(player_id=9, uid_counter=10)
)
assert avrcp.Event.from_bytes(bytes(event)) == event
# -----------------------------------------------------------------------------
def test_AvailablePlayersChangedEvent():
event = avrcp.AvailablePlayersChangedEvent()
assert avrcp.Event.from_bytes(bytes(event)) == event
# -----------------------------------------------------------------------------
def test_PlaybackPositionChangedEvent():
event = avrcp.PlaybackPositionChangedEvent(playback_position=1314)
assert avrcp.Event.from_bytes(bytes(event)) == event
# -----------------------------------------------------------------------------
def test_NowPlayingContentChangedEvent():
event = avrcp.NowPlayingContentChangedEvent()
assert avrcp.Event.from_bytes(bytes(event)) == event
# -----------------------------------------------------------------------------
def test_PlayerApplicationSettingChangedEvent():
event = avrcp.PlayerApplicationSettingChangedEvent(
player_application_settings=[
avrcp.PlayerApplicationSettingChangedEvent.Setting(
avrcp.ApplicationSetting.AttributeId.REPEAT_MODE,
avrcp.ApplicationSetting.RepeatModeStatus.ALL_TRACK_REPEAT,
)
]
)
assert avrcp.Event.from_bytes(bytes(event)) == event
# -----------------------------------------------------------------------------
def test_RejectedResponse():
pdu_id = avrcp.PduId.GET_ELEMENT_ATTRIBUTES
response = avrcp.RejectedResponse(
pdu_id=pdu_id,
status_code=avrcp.StatusCode.DOES_NOT_EXIST,
)
assert (
avrcp.RejectedResponse.from_bytes(pdu=bytes(response), pdu_id=pdu_id)
== response
)
# -----------------------------------------------------------------------------
def test_GetPlayStatusResponse():
response = avrcp.GetPlayStatusResponse(
song_length=1010, song_position=13, play_status=avrcp.PlayStatus.PAUSED
)
assert avrcp.GetPlayStatusResponse.from_bytes(bytes(response)) == response
# -----------------------------------------------------------------------------
def test_NotImplementedResponse():
pdu_id = avrcp.PduId.GET_ELEMENT_ATTRIBUTES
response = avrcp.NotImplementedResponse(pdu_id=pdu_id, parameters=b'koasd')
assert (
avrcp.NotImplementedResponse.from_bytes(bytes(response), pdu_id=pdu_id)
== response
)
# -----------------------------------------------------------------------------
def test_GetCapabilitiesResponse():
response = avrcp.GetCapabilitiesResponse(
capability_id=avrcp.GetCapabilitiesCommand.CapabilityId.EVENTS_SUPPORTED,
capabilities=[
avrcp.EventId.ADDRESSED_PLAYER_CHANGED,
avrcp.EventId.BATT_STATUS_CHANGED,
],
)
assert avrcp.GetCapabilitiesResponse.from_bytes(bytes(response)) == response
# -----------------------------------------------------------------------------
def test_RegisterNotificationResponse():
response = avrcp.RegisterNotificationResponse(
event=avrcp.PlaybackPositionChangedEvent(playback_position=38)
)
assert avrcp.RegisterNotificationResponse.from_bytes(bytes(response)) == response
# -----------------------------------------------------------------------------
def test_SetAbsoluteVolumeResponse():
response = avrcp.SetAbsoluteVolumeResponse(volume=99)
assert avrcp.SetAbsoluteVolumeResponse.from_bytes(bytes(response)) == response
# -----------------------------------------------------------------------------
def test_GetElementAttributesResponse():
response = avrcp.GetElementAttributesResponse(
attributes=[
avrcp.MediaAttribute(
attribute_id=avrcp.MediaAttributeId.ALBUM_NAME,
attribute_value="White Album",
)
]
)
assert avrcp.GetElementAttributesResponse.from_bytes(bytes(response)) == response
# -----------------------------------------------------------------------------
def test_frame_parser():
with pytest.raises(ValueError) as error:
with pytest.raises(ValueError):
avc.Frame.from_bytes(bytes.fromhex("11480000"))
x = bytes.fromhex("014D0208")
@@ -217,8 +360,7 @@ def test_passthrough_commands():
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_supported_events():
two_devices = TwoDevices()
await two_devices.setup_connections()
two_devices = await TwoDevices.create_with_avdtp()
supported_events = await two_devices.protocols[0].get_supported_events()
assert supported_events == []