Merge pull request #878 from zxzxwu/avrcp

AVRCP: SDP record classes and some delegation
This commit is contained in:
Josh Wu
2026-01-31 00:01:55 +08:00
committed by GitHub
3 changed files with 577 additions and 200 deletions

View File

@@ -26,7 +26,7 @@ from collections.abc import AsyncIterator, Awaitable, Callable, Iterable, Sequen
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import ClassVar, SupportsBytes, TypeVar from typing import ClassVar, SupportsBytes, TypeVar
from bumble import avc, avctp, core, hci, l2cap, utils from bumble import avc, avctp, core, hci, l2cap, sdp, utils
from bumble.colors import color from bumble.colors import color
from bumble.device import Connection, Device from bumble.device import Connection, Device
from bumble.sdp import ( from bumble.sdp import (
@@ -194,82 +194,43 @@ class TargetFeatures(enum.IntFlag):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def make_controller_service_sdp_records( @dataclass
service_record_handle: int, class ControllerServiceSdpRecord:
avctp_version: tuple[int, int] = (1, 4), service_record_handle: int
avrcp_version: tuple[int, int] = (1, 6), avctp_version: tuple[int, int] = (1, 4)
supported_features: int | ControllerFeatures = 1, avrcp_version: tuple[int, int] = (1, 6)
) -> list[ServiceAttribute]: supported_features: int | ControllerFeatures = ControllerFeatures(1)
avctp_version_int = avctp_version[0] << 8 | avctp_version[1]
avrcp_version_int = avrcp_version[0] << 8 | avrcp_version[1]
attributes = [ def to_service_attributes(self) -> list[ServiceAttribute]:
ServiceAttribute( avctp_version_int = self.avctp_version[0] << 8 | self.avctp_version[1]
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, avrcp_version_int = self.avrcp_version[0] << 8 | self.avrcp_version[1]
DataElement.unsigned_integer_32(service_record_handle),
), attributes = [
ServiceAttribute(
SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]),
),
ServiceAttribute(
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.uuid(core.BT_AV_REMOTE_CONTROL_SERVICE),
DataElement.uuid(core.BT_AV_REMOTE_CONTROL_CONTROLLER_SERVICE),
]
),
),
ServiceAttribute(
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence(
[
DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID),
DataElement.unsigned_integer_16(avctp.AVCTP_PSM),
]
),
DataElement.sequence(
[
DataElement.uuid(core.BT_AVCTP_PROTOCOL_ID),
DataElement.unsigned_integer_16(avctp_version_int),
]
),
]
),
),
ServiceAttribute(
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence(
[
DataElement.uuid(core.BT_AV_REMOTE_CONTROL_SERVICE),
DataElement.unsigned_integer_16(avrcp_version_int),
]
),
]
),
),
ServiceAttribute(
SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
DataElement.unsigned_integer_16(supported_features),
),
]
if supported_features & ControllerFeatures.SUPPORTS_BROWSING:
attributes.append(
ServiceAttribute( ServiceAttribute(
SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
DataElement.unsigned_integer_32(self.service_record_handle),
),
ServiceAttribute(
SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]),
),
ServiceAttribute(
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.uuid(core.BT_AV_REMOTE_CONTROL_SERVICE),
DataElement.uuid(core.BT_AV_REMOTE_CONTROL_CONTROLLER_SERVICE),
]
),
),
ServiceAttribute(
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence( DataElement.sequence(
[ [
DataElement.sequence( DataElement.sequence(
[ [
DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID), DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID),
DataElement.unsigned_integer_16( DataElement.unsigned_integer_16(avctp.AVCTP_PSM),
avctp.AVCTP_BROWSING_PSM
),
] ]
), ),
DataElement.sequence( DataElement.sequence(
@@ -281,87 +242,130 @@ def make_controller_service_sdp_records(
] ]
), ),
), ),
) ServiceAttribute(
return attributes SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence(
[
DataElement.uuid(core.BT_AV_REMOTE_CONTROL_SERVICE),
DataElement.unsigned_integer_16(avrcp_version_int),
]
),
]
),
),
ServiceAttribute(
SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
DataElement.unsigned_integer_16(self.supported_features),
),
]
if self.supported_features & ControllerFeatures.SUPPORTS_BROWSING:
attributes.append(
ServiceAttribute(
SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence(
[
DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID),
DataElement.unsigned_integer_16(
avctp.AVCTP_BROWSING_PSM
),
]
),
DataElement.sequence(
[
DataElement.uuid(core.BT_AVCTP_PROTOCOL_ID),
DataElement.unsigned_integer_16(avctp_version_int),
]
),
]
),
),
)
return attributes
@classmethod
async def find(cls, connection: Connection) -> list[ControllerServiceSdpRecord]:
async with sdp.Client(connection) as sdp_client:
search_result = await sdp_client.search_attributes(
uuids=[core.BT_AV_REMOTE_CONTROL_CONTROLLER_SERVICE],
attribute_ids=[
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
],
)
records: list[ControllerServiceSdpRecord] = []
for attribute_lists in search_result:
record = cls(0)
for attribute in attribute_lists:
if attribute.id == SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID:
record.service_record_handle = attribute.value.value
elif attribute.id == SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID:
# [[L2CAP, PSM], [AVCTP, version]]
record.avctp_version = (
attribute.value.value[1].value[1].value >> 8,
attribute.value.value[1].value[1].value & 0xFF,
)
elif (
attribute.id
== SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID
):
# [[AV_REMOTE_CONTROL, version]]
record.avrcp_version = (
attribute.value.value[0].value[1].value >> 8,
attribute.value.value[0].value[1].value & 0xFF,
)
elif attribute.id == SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID:
record.supported_features = ControllerFeatures(
attribute.value.value
)
records.append(record)
return records
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def make_target_service_sdp_records( @dataclass
service_record_handle: int, class TargetServiceSdpRecord:
avctp_version: tuple[int, int] = (1, 4), service_record_handle: int
avrcp_version: tuple[int, int] = (1, 6), avctp_version: tuple[int, int] = (1, 4)
supported_features: int | TargetFeatures = 0x23, avrcp_version: tuple[int, int] = (1, 6)
) -> list[ServiceAttribute]: supported_features: int | TargetFeatures = TargetFeatures(0x23)
# TODO: support a way to compute the supported features from a feature list
avctp_version_int = avctp_version[0] << 8 | avctp_version[1]
avrcp_version_int = avrcp_version[0] << 8 | avrcp_version[1]
attributes = [ def to_service_attributes(self) -> list[ServiceAttribute]:
ServiceAttribute( # TODO: support a way to compute the supported features from a feature list
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, avctp_version_int = self.avctp_version[0] << 8 | self.avctp_version[1]
DataElement.unsigned_integer_32(service_record_handle), avrcp_version_int = self.avrcp_version[0] << 8 | self.avrcp_version[1]
),
ServiceAttribute( attributes = [
SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]),
),
ServiceAttribute(
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.uuid(core.BT_AV_REMOTE_CONTROL_TARGET_SERVICE),
]
),
),
ServiceAttribute(
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence(
[
DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID),
DataElement.unsigned_integer_16(avctp.AVCTP_PSM),
]
),
DataElement.sequence(
[
DataElement.uuid(core.BT_AVCTP_PROTOCOL_ID),
DataElement.unsigned_integer_16(avctp_version_int),
]
),
]
),
),
ServiceAttribute(
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence(
[
DataElement.uuid(core.BT_AV_REMOTE_CONTROL_SERVICE),
DataElement.unsigned_integer_16(avrcp_version_int),
]
),
]
),
),
ServiceAttribute(
SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
DataElement.unsigned_integer_16(supported_features),
),
]
if supported_features & TargetFeatures.SUPPORTS_BROWSING:
attributes.append(
ServiceAttribute( ServiceAttribute(
SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
DataElement.unsigned_integer_32(self.service_record_handle),
),
ServiceAttribute(
SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]),
),
ServiceAttribute(
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.uuid(core.BT_AV_REMOTE_CONTROL_TARGET_SERVICE),
]
),
),
ServiceAttribute(
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence( DataElement.sequence(
[ [
DataElement.sequence( DataElement.sequence(
[ [
DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID), DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID),
DataElement.unsigned_integer_16( DataElement.unsigned_integer_16(avctp.AVCTP_PSM),
avctp.AVCTP_BROWSING_PSM
),
] ]
), ),
DataElement.sequence( DataElement.sequence(
@@ -373,8 +377,90 @@ def make_target_service_sdp_records(
] ]
), ),
), ),
) ServiceAttribute(
return attributes SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence(
[
DataElement.uuid(core.BT_AV_REMOTE_CONTROL_SERVICE),
DataElement.unsigned_integer_16(avrcp_version_int),
]
),
]
),
),
ServiceAttribute(
SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
DataElement.unsigned_integer_16(self.supported_features),
),
]
if self.supported_features & TargetFeatures.SUPPORTS_BROWSING:
attributes.append(
ServiceAttribute(
SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence(
[
DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID),
DataElement.unsigned_integer_16(
avctp.AVCTP_BROWSING_PSM
),
]
),
DataElement.sequence(
[
DataElement.uuid(core.BT_AVCTP_PROTOCOL_ID),
DataElement.unsigned_integer_16(avctp_version_int),
]
),
]
),
),
)
return attributes
@classmethod
async def find(cls, connection: Connection) -> list[TargetServiceSdpRecord]:
async with sdp.Client(connection) as sdp_client:
search_result = await sdp_client.search_attributes(
uuids=[core.BT_AV_REMOTE_CONTROL_TARGET_SERVICE],
attribute_ids=[
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
],
)
records: list[TargetServiceSdpRecord] = []
for attribute_lists in search_result:
record = cls(0)
for attribute in attribute_lists:
if attribute.id == SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID:
record.service_record_handle = attribute.value.value
elif attribute.id == SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID:
# [[L2CAP, PSM], [AVCTP, version]]
record.avctp_version = (
attribute.value.value[1].value[1].value >> 8,
attribute.value.value[1].value[1].value & 0xFF,
)
elif (
attribute.id
== SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID
):
# [[AV_REMOTE_CONTROL, version]]
record.avrcp_version = (
attribute.value.value[0].value[1].value >> 8,
attribute.value.value[0].value[1].value & 0xFF,
)
elif attribute.id == SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID:
record.supported_features = TargetFeatures(
attribute.value.value
)
records.append(record)
return records
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -1204,6 +1290,10 @@ class InformBatteryStatusOfCtResponse(Response):
@dataclass @dataclass
class GetPlayStatusResponse(Response): class GetPlayStatusResponse(Response):
pdu_id = PduId.GET_PLAY_STATUS pdu_id = PduId.GET_PLAY_STATUS
# TG doesn't support Song Length or Position.
UNAVAILABLE = 0xFFFFFFFF
song_length: int = field(metadata=hci.metadata(">4")) song_length: int = field(metadata=hci.metadata(">4"))
song_position: int = field(metadata=hci.metadata(">4")) song_position: int = field(metadata=hci.metadata(">4"))
play_status: PlayStatus = field(metadata=PlayStatus.type_metadata(1)) play_status: PlayStatus = field(metadata=PlayStatus.type_metadata(1))
@@ -1521,16 +1611,33 @@ class Delegate:
def __init__(self, status_code: StatusCode) -> None: def __init__(self, status_code: StatusCode) -> None:
self.status_code = status_code self.status_code = status_code
supported_events: list[EventId] class AvcError(Exception):
volume: int """The delegate AVC method failed, with a specified status code."""
def __init__(self, supported_events: Iterable[EventId] = ()) -> None: def __init__(self, status_code: avc.ResponseFrame.ResponseCode) -> None:
self.status_code = status_code
supported_events: list[EventId]
supported_company_ids: list[int]
volume: int
playback_status: PlayStatus
def __init__(
self,
supported_events: Iterable[EventId] = (),
supported_company_ids: Iterable[int] = (AVRCP_BLUETOOTH_SIG_COMPANY_ID,),
) -> None:
self.supported_company_ids = list(supported_company_ids)
self.supported_events = list(supported_events) self.supported_events = list(supported_events)
self.volume = 0 self.volume = 0
self.playback_status = PlayStatus.STOPPED
async def get_supported_events(self) -> list[EventId]: async def get_supported_events(self) -> list[EventId]:
return self.supported_events return self.supported_events
async def get_supported_company_ids(self) -> list[int]:
return self.supported_company_ids
async def set_absolute_volume(self, volume: int) -> None: async def set_absolute_volume(self, volume: int) -> None:
""" """
Set the absolute volume. Set the absolute volume.
@@ -1543,6 +1650,19 @@ class Delegate:
async def get_absolute_volume(self) -> int: async def get_absolute_volume(self) -> int:
return self.volume return self.volume
async def on_key_event(
self,
key: avc.PassThroughFrame.OperationId,
pressed: bool,
data: bytes,
) -> None:
logger.debug(
"@@@ on_key_event: key=%s, pressed=%s, data=%s", key, pressed, data.hex()
)
async def get_playback_status(self) -> PlayStatus:
return self.playback_status
# TODO add other delegate methods # TODO add other delegate methods
@@ -1756,6 +1876,19 @@ class Protocol(utils.EventEmitter):
if isinstance(capability, EventId) if isinstance(capability, EventId)
) )
async def get_supported_company_ids(self) -> list[int]:
"""Get the list of events supported by the connected peer."""
response_context = await self.send_avrcp_command(
avc.CommandFrame.CommandType.STATUS,
GetCapabilitiesCommand(GetCapabilitiesCommand.CapabilityId.COMPANY_ID),
)
response = self._check_response(response_context, GetCapabilitiesResponse)
return list(
int.from_bytes(capability, 'big')
for capability in response.capabilities
if isinstance(capability, bytes)
)
async def get_play_status(self) -> SongAndPlayStatus: async def get_play_status(self) -> SongAndPlayStatus:
"""Get the play status of the connected peer.""" """Get the play status of the connected peer."""
response_context = await self.send_avrcp_command( response_context = await self.send_avrcp_command(
@@ -2052,16 +2185,28 @@ class Protocol(utils.EventEmitter):
return return
if isinstance(command, avc.PassThroughCommandFrame): if isinstance(command, avc.PassThroughCommandFrame):
# TODO: delegate
response = avc.PassThroughResponseFrame( async def dispatch_key_event() -> None:
avc.ResponseFrame.ResponseCode.ACCEPTED, try:
command.subunit_type, await self.delegate.on_key_event(
command.subunit_id, command.operation_id,
command.state_flag, command.state_flag == avc.PassThroughFrame.StateFlag.PRESSED,
command.operation_id, command.operation_data,
command.operation_data, )
) response_code = avc.ResponseFrame.ResponseCode.ACCEPTED
self.send_response(transaction_label, response) except Delegate.AvcError as error:
logger.exception("delegate method raised exception")
response_code = error.status_code
except Exception:
logger.exception("delegate method raised exception")
response_code = avc.ResponseFrame.ResponseCode.REJECTED
self.send_passthrough_response(
transaction_label=transaction_label,
command=command,
response_code=response_code,
)
utils.AsyncRunner.spawn(dispatch_key_event())
return return
# TODO handle other types # TODO handle other types
@@ -2141,6 +2286,8 @@ class Protocol(utils.EventEmitter):
self._on_set_absolute_volume_command(transaction_label, command) self._on_set_absolute_volume_command(transaction_label, command)
elif isinstance(command, RegisterNotificationCommand): elif isinstance(command, RegisterNotificationCommand):
self._on_register_notification_command(transaction_label, command) self._on_register_notification_command(transaction_label, command)
elif isinstance(command, GetPlayStatusCommand):
self._on_get_play_status_command(transaction_label, command)
else: else:
# Not supported. # Not supported.
# TODO: check that this is the right way to respond in this case. # TODO: check that this is the right way to respond in this case.
@@ -2364,17 +2511,27 @@ class Protocol(utils.EventEmitter):
logger.debug(f"<<< AVRCP command PDU: {command}") logger.debug(f"<<< AVRCP command PDU: {command}")
async def get_supported_events() -> None: async def get_supported_events() -> None:
capabilities: Sequence[bytes | SupportsBytes]
if ( if (
command.capability_id command.capability_id
!= GetCapabilitiesCommand.CapabilityId.EVENTS_SUPPORTED == GetCapabilitiesCommand.CapabilityId.EVENTS_SUPPORTED
): ):
raise core.InvalidArgumentError() capabilities = await self.delegate.get_supported_events()
elif (
supported_events = await self.delegate.get_supported_events() command.capability_id == GetCapabilitiesCommand.CapabilityId.COMPANY_ID
):
company_ids = await self.delegate.get_supported_company_ids()
capabilities = [
company_id.to_bytes(3, 'big') for company_id in company_ids
]
else:
raise core.InvalidArgumentError(
f"Unsupported capability: {command.capability_id}"
)
self.send_avrcp_response( self.send_avrcp_response(
transaction_label, transaction_label,
avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE, avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE,
GetCapabilitiesResponse(command.capability_id, supported_events), GetCapabilitiesResponse(command.capability_id, capabilities),
) )
self._delegate_command(transaction_label, command, get_supported_events()) self._delegate_command(transaction_label, command, get_supported_events())
@@ -2395,6 +2552,26 @@ class Protocol(utils.EventEmitter):
self._delegate_command(transaction_label, command, set_absolute_volume()) self._delegate_command(transaction_label, command, set_absolute_volume())
def _on_get_play_status_command(
self, transaction_label: int, command: GetPlayStatusCommand
) -> None:
logger.debug("<<< AVRCP command PDU: %s", command)
async def get_playback_status() -> None:
play_status: PlayStatus = await self.delegate.get_playback_status()
self.send_avrcp_response(
transaction_label,
avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE,
GetPlayStatusResponse(
# TODO: Delegate this.
song_length=GetPlayStatusResponse.UNAVAILABLE,
song_position=GetPlayStatusResponse.UNAVAILABLE,
play_status=play_status,
),
)
self._delegate_command(transaction_label, command, get_playback_status())
def _on_register_notification_command( def _on_register_notification_command(
self, transaction_label: int, command: RegisterNotificationCommand self, transaction_label: int, command: RegisterNotificationCommand
) -> None: ) -> None:
@@ -2410,28 +2587,27 @@ class Protocol(utils.EventEmitter):
) )
return return
response: Response
if command.event_id == EventId.VOLUME_CHANGED: if command.event_id == EventId.VOLUME_CHANGED:
volume = await self.delegate.get_absolute_volume() volume = await self.delegate.get_absolute_volume()
response = RegisterNotificationResponse(VolumeChangedEvent(volume)) response = RegisterNotificationResponse(VolumeChangedEvent(volume))
self.send_avrcp_response( elif command.event_id == EventId.PLAYBACK_STATUS_CHANGED:
transaction_label, playback_status = await self.delegate.get_playback_status()
avc.ResponseFrame.ResponseCode.INTERIM, response = RegisterNotificationResponse(
response, PlaybackStatusChangedEvent(play_status=playback_status)
) )
self._register_notification_listener(transaction_label, command) elif command.event_id == EventId.NOW_PLAYING_CONTENT_CHANGED:
playback_status = await self.delegate.get_playback_status()
response = RegisterNotificationResponse(NowPlayingContentChangedEvent())
else:
logger.warning("Event supported but not handled %s", command.event_id)
return return
if command.event_id == EventId.PLAYBACK_STATUS_CHANGED: self.send_avrcp_response(
# TODO: testing only, use delegate transaction_label,
response = RegisterNotificationResponse( avc.ResponseFrame.ResponseCode.INTERIM,
PlaybackStatusChangedEvent(play_status=PlayStatus.PLAYING) response,
) )
self.send_avrcp_response( self._register_notification_listener(transaction_label, command)
transaction_label,
avc.ResponseFrame.ResponseCode.INTERIM,
response,
)
self._register_notification_listener(transaction_label, command)
return
self._delegate_command(transaction_label, command, register_notification()) self._delegate_command(transaction_label, command, register_notification())

View File

@@ -25,7 +25,7 @@ import sys
import websockets.asyncio.server import websockets.asyncio.server
import bumble.logging import bumble.logging
from bumble import a2dp, avc, avdtp, avrcp, utils from bumble import a2dp, avc, avdtp, avrcp, sdp, utils
from bumble.core import PhysicalTransport from bumble.core import PhysicalTransport
from bumble.device import Device from bumble.device import Device
from bumble.transport import open_transport from bumble.transport import open_transport
@@ -34,7 +34,7 @@ logger = logging.getLogger(__name__)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def sdp_records(): def sdp_records() -> dict[int, list[sdp.ServiceAttribute]]:
a2dp_sink_service_record_handle = 0x00010001 a2dp_sink_service_record_handle = 0x00010001
avrcp_controller_service_record_handle = 0x00010002 avrcp_controller_service_record_handle = 0x00010002
avrcp_target_service_record_handle = 0x00010003 avrcp_target_service_record_handle = 0x00010003
@@ -43,17 +43,17 @@ def sdp_records():
a2dp_sink_service_record_handle: a2dp.make_audio_sink_service_sdp_records( a2dp_sink_service_record_handle: a2dp.make_audio_sink_service_sdp_records(
a2dp_sink_service_record_handle a2dp_sink_service_record_handle
), ),
avrcp_controller_service_record_handle: avrcp.make_controller_service_sdp_records( avrcp_controller_service_record_handle: avrcp.ControllerServiceSdpRecord(
avrcp_controller_service_record_handle avrcp_controller_service_record_handle
), ).to_service_attributes(),
avrcp_target_service_record_handle: avrcp.make_target_service_sdp_records( avrcp_target_service_record_handle: avrcp.TargetServiceSdpRecord(
avrcp_controller_service_record_handle avrcp_target_service_record_handle
), ).to_service_attributes(),
} }
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def codec_capabilities(): def codec_capabilities() -> avdtp.MediaCodecCapabilities:
return avdtp.MediaCodecCapabilities( return avdtp.MediaCodecCapabilities(
media_type=avdtp.AVDTP_AUDIO_MEDIA_TYPE, media_type=avdtp.AVDTP_AUDIO_MEDIA_TYPE,
media_codec_type=a2dp.A2DP_SBC_CODEC_TYPE, media_codec_type=a2dp.A2DP_SBC_CODEC_TYPE,
@@ -81,20 +81,22 @@ def codec_capabilities():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def on_avdtp_connection(server): def on_avdtp_connection(server: avdtp.Protocol) -> None:
# Add a sink endpoint to the server # Add a sink endpoint to the server
sink = server.add_sink(codec_capabilities()) sink = server.add_sink(codec_capabilities())
sink.on('rtp_packet', on_rtp_packet) sink.on(sink.EVENT_RTP_PACKET, on_rtp_packet)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def on_rtp_packet(packet): def on_rtp_packet(packet: avdtp.MediaPacket) -> None:
print(f'RTP: {packet}') print(f'RTP: {packet}')
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def on_avrcp_start(avrcp_protocol: avrcp.Protocol, websocket_server: WebSocketServer): def on_avrcp_start(
async def get_supported_events(): avrcp_protocol: avrcp.Protocol, websocket_server: WebSocketServer
) -> None:
async def get_supported_events() -> None:
events = await avrcp_protocol.get_supported_events() events = await avrcp_protocol.get_supported_events()
print("SUPPORTED EVENTS:", events) print("SUPPORTED EVENTS:", events)
websocket_server.send_message( websocket_server.send_message(
@@ -130,14 +132,14 @@ def on_avrcp_start(avrcp_protocol: avrcp.Protocol, websocket_server: WebSocketSe
utils.AsyncRunner.spawn(get_supported_events()) utils.AsyncRunner.spawn(get_supported_events())
async def monitor_track_changed(): async def monitor_track_changed() -> None:
async for identifier in avrcp_protocol.monitor_track_changed(): async for identifier in avrcp_protocol.monitor_track_changed():
print("TRACK CHANGED:", identifier.hex()) print("TRACK CHANGED:", identifier.hex())
websocket_server.send_message( websocket_server.send_message(
{"type": "track-changed", "params": {"identifier": identifier.hex()}} {"type": "track-changed", "params": {"identifier": identifier.hex()}}
) )
async def monitor_playback_status(): async def monitor_playback_status() -> None:
async for playback_status in avrcp_protocol.monitor_playback_status(): async for playback_status in avrcp_protocol.monitor_playback_status():
print("PLAYBACK STATUS CHANGED:", playback_status.name) print("PLAYBACK STATUS CHANGED:", playback_status.name)
websocket_server.send_message( websocket_server.send_message(
@@ -147,7 +149,7 @@ def on_avrcp_start(avrcp_protocol: avrcp.Protocol, websocket_server: WebSocketSe
} }
) )
async def monitor_playback_position(): async def monitor_playback_position() -> None:
async for playback_position in avrcp_protocol.monitor_playback_position( async for playback_position in avrcp_protocol.monitor_playback_position(
playback_interval=1 playback_interval=1
): ):
@@ -159,7 +161,7 @@ def on_avrcp_start(avrcp_protocol: avrcp.Protocol, websocket_server: WebSocketSe
} }
) )
async def monitor_player_application_settings(): async def monitor_player_application_settings() -> None:
async for settings in avrcp_protocol.monitor_player_application_settings(): async for settings in avrcp_protocol.monitor_player_application_settings():
print("PLAYER APPLICATION SETTINGS:", settings) print("PLAYER APPLICATION SETTINGS:", settings)
settings_as_dict = [ settings_as_dict = [
@@ -173,14 +175,14 @@ def on_avrcp_start(avrcp_protocol: avrcp.Protocol, websocket_server: WebSocketSe
} }
) )
async def monitor_available_players(): async def monitor_available_players() -> None:
async for _ in avrcp_protocol.monitor_available_players(): async for _ in avrcp_protocol.monitor_available_players():
print("AVAILABLE PLAYERS CHANGED") print("AVAILABLE PLAYERS CHANGED")
websocket_server.send_message( websocket_server.send_message(
{"type": "available-players-changed", "params": {}} {"type": "available-players-changed", "params": {}}
) )
async def monitor_addressed_player(): async def monitor_addressed_player() -> None:
async for player in avrcp_protocol.monitor_addressed_player(): async for player in avrcp_protocol.monitor_addressed_player():
print("ADDRESSED PLAYER CHANGED") print("ADDRESSED PLAYER CHANGED")
websocket_server.send_message( websocket_server.send_message(
@@ -195,7 +197,7 @@ def on_avrcp_start(avrcp_protocol: avrcp.Protocol, websocket_server: WebSocketSe
} }
) )
async def monitor_uids(): async def monitor_uids() -> None:
async for uid_counter in avrcp_protocol.monitor_uids(): async for uid_counter in avrcp_protocol.monitor_uids():
print("UIDS CHANGED") print("UIDS CHANGED")
websocket_server.send_message( websocket_server.send_message(
@@ -207,7 +209,7 @@ def on_avrcp_start(avrcp_protocol: avrcp.Protocol, websocket_server: WebSocketSe
} }
) )
async def monitor_volume(): async def monitor_volume() -> None:
async for volume in avrcp_protocol.monitor_volume(): async for volume in avrcp_protocol.monitor_volume():
print("VOLUME CHANGED:", volume) print("VOLUME CHANGED:", volume)
websocket_server.send_message( websocket_server.send_message(
@@ -360,7 +362,7 @@ async def main() -> None:
# Create a listener to wait for AVDTP connections # Create a listener to wait for AVDTP connections
listener = avdtp.Listener(avdtp.Listener.create_registrar(device)) listener = avdtp.Listener(avdtp.Listener.create_registrar(device))
listener.on('connection', on_avdtp_connection) listener.on(listener.EVENT_CONNECTION, on_avdtp_connection)
avrcp_delegate = Delegate() avrcp_delegate = Delegate()
avrcp_protocol = avrcp.Protocol(avrcp_delegate) avrcp_protocol = avrcp.Protocol(avrcp_delegate)

View File

@@ -17,6 +17,7 @@
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
from __future__ import annotations from __future__ import annotations
import asyncio
import struct import struct
from collections.abc import Sequence from collections.abc import Sequence
@@ -422,6 +423,47 @@ def test_passthrough_commands():
assert bytes(parsed) == play_pressed_bytes assert bytes(parsed) == play_pressed_bytes
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_find_sdp_records():
two_devices = await TwoDevices.create_with_avdtp()
# Add SDP records to device 1
controller_record = avrcp.ControllerServiceSdpRecord(
service_record_handle=0x10001,
avctp_version=(1, 4),
avrcp_version=(1, 6),
supported_features=(
avrcp.ControllerFeatures.CATEGORY_1
| avrcp.ControllerFeatures.SUPPORTS_BROWSING
),
)
target_record = avrcp.TargetServiceSdpRecord(
service_record_handle=0x10002,
avctp_version=(1, 4),
avrcp_version=(1, 6),
supported_features=(
avrcp.TargetFeatures.CATEGORY_1 | avrcp.TargetFeatures.SUPPORTS_BROWSING
),
)
two_devices.devices[1].sdp_service_records = {
0x10001: controller_record.to_service_attributes(),
0x10002: target_record.to_service_attributes(),
}
# Find records from device 0
controller_records = await avrcp.ControllerServiceSdpRecord.find(
two_devices.connections[0]
)
assert len(controller_records) == 1
assert controller_records[0] == controller_record
target_records = await avrcp.TargetServiceSdpRecord.find(two_devices.connections[0])
assert len(target_records) == 1
assert target_records[0] == target_record
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_get_supported_events(): async def test_get_supported_events():
@@ -436,6 +478,163 @@ async def test_get_supported_events():
assert supported_events == [avrcp.EventId.VOLUME_CHANGED] assert supported_events == [avrcp.EventId.VOLUME_CHANGED]
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_passthrough_key_event():
two_devices = await TwoDevices.create_with_avdtp()
q = asyncio.Queue[tuple[avc.PassThroughFrame.OperationId, bool, bytes]]()
class Delegate(avrcp.Delegate):
async def on_key_event(
self, key: avc.PassThroughFrame.OperationId, pressed: bool, data: bytes
) -> None:
q.put_nowait((key, pressed, data))
two_devices.protocols[1].delegate = Delegate()
for key, pressed in [
(avc.PassThroughFrame.OperationId.PLAY, True),
(avc.PassThroughFrame.OperationId.PLAY, False),
(avc.PassThroughFrame.OperationId.PAUSE, True),
(avc.PassThroughFrame.OperationId.PAUSE, False),
]:
await two_devices.protocols[0].send_key_event(key, pressed)
assert (await q.get()) == (key, pressed, b'')
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_passthrough_key_event_rejected():
two_devices = await TwoDevices.create_with_avdtp()
class Delegate(avrcp.Delegate):
async def on_key_event(
self, key: avc.PassThroughFrame.OperationId, pressed: bool, data: bytes
) -> None:
raise avrcp.Delegate.AvcError(avc.ResponseFrame.ResponseCode.REJECTED)
two_devices.protocols[1].delegate = Delegate()
response = await two_devices.protocols[0].send_key_event(
avc.PassThroughFrame.OperationId.PLAY, True
)
assert response.response == avc.ResponseFrame.ResponseCode.REJECTED
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_passthrough_key_event_exception():
two_devices = await TwoDevices.create_with_avdtp()
class Delegate(avrcp.Delegate):
async def on_key_event(
self, key: avc.PassThroughFrame.OperationId, pressed: bool, data: bytes
) -> None:
raise Exception()
two_devices.protocols[1].delegate = Delegate()
response = await two_devices.protocols[0].send_key_event(
avc.PassThroughFrame.OperationId.PLAY, True
)
assert response.response == avc.ResponseFrame.ResponseCode.REJECTED
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_set_volume():
two_devices = await TwoDevices.create_with_avdtp()
for volume in range(avrcp.SetAbsoluteVolumeCommand.MAXIMUM_VOLUME + 1):
response = await two_devices.protocols[1].send_avrcp_command(
avc.CommandFrame.CommandType.CONTROL, avrcp.SetAbsoluteVolumeCommand(volume)
)
assert isinstance(response.response, avrcp.SetAbsoluteVolumeResponse)
assert response.response.volume == volume
assert two_devices.protocols[0].delegate.volume == volume
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_playback_status():
two_devices = await TwoDevices.create_with_avdtp()
for status in avrcp.PlayStatus:
two_devices.protocols[0].delegate.playback_status = status
response = await two_devices.protocols[1].get_play_status()
assert response.play_status == status
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_supported_company_ids():
two_devices = await TwoDevices.create_with_avdtp()
for status in avrcp.PlayStatus:
two_devices.protocols[0].delegate = avrcp.Delegate(
supported_company_ids=[avrcp.AVRCP_BLUETOOTH_SIG_COMPANY_ID]
)
supported_company_ids = await two_devices.protocols[
1
].get_supported_company_ids()
assert supported_company_ids == [avrcp.AVRCP_BLUETOOTH_SIG_COMPANY_ID]
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_monitor_volume():
two_devices = await TwoDevices.create_with_avdtp()
two_devices.protocols[1].delegate = avrcp.Delegate([avrcp.EventId.VOLUME_CHANGED])
volume_iter = two_devices.protocols[0].monitor_volume()
for volume in range(avrcp.SetAbsoluteVolumeCommand.MAXIMUM_VOLUME + 1):
# Interim
two_devices.protocols[1].delegate.volume = 0
assert (await anext(volume_iter)) == 0
# Changed
two_devices.protocols[1].notify_volume_changed(volume)
assert (await anext(volume_iter)) == volume
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_monitor_playback_status():
two_devices = await TwoDevices.create_with_avdtp()
two_devices.protocols[1].delegate = avrcp.Delegate(
[avrcp.EventId.PLAYBACK_STATUS_CHANGED]
)
playback_status_iter = two_devices.protocols[0].monitor_playback_status()
for playback_status in avrcp.PlayStatus:
# Interim
two_devices.protocols[1].delegate.playback_status = avrcp.PlayStatus.STOPPED
assert (await anext(playback_status_iter)) == avrcp.PlayStatus.STOPPED
# Changed
two_devices.protocols[1].notify_playback_status_changed(playback_status)
assert (await anext(playback_status_iter)) == playback_status
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_monitor_now_playing_content():
two_devices = await TwoDevices.create_with_avdtp()
two_devices.protocols[1].delegate = avrcp.Delegate(
[avrcp.EventId.NOW_PLAYING_CONTENT_CHANGED]
)
now_playing_iter = two_devices.protocols[0].monitor_now_playing_content()
for _ in range(2):
# Interim
await anext(now_playing_iter)
# Changed
two_devices.protocols[1].notify_now_playing_content_changed()
await anext(now_playing_iter)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
if __name__ == '__main__': if __name__ == '__main__':
test_frame_parser() test_frame_parser()