From aedc9716539f7b9f8eaf81cbe01e547758e496da Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Thu, 29 Jan 2026 15:18:45 +0800 Subject: [PATCH] AVRCP: Add SDP record class and finder --- bumble/avrcp.py | 372 ++++++++++++++++++++++++++---------------- examples/run_avrcp.py | 4 +- tests/avrcp_test.py | 41 +++++ 3 files changed, 272 insertions(+), 145 deletions(-) diff --git a/bumble/avrcp.py b/bumble/avrcp.py index 4663af7..1aec633 100644 --- a/bumble/avrcp.py +++ b/bumble/avrcp.py @@ -26,7 +26,7 @@ from collections.abc import AsyncIterator, Awaitable, Callable, Iterable, Sequen from dataclasses import dataclass, field from typing import ClassVar, SupportsBytes, TypeVar -from bumble import avc, avctp, core, hci, l2cap, utils +from bumble import avc, avctp, core, hci, l2cap, sdp, utils from bumble.colors import color from bumble.device import Connection, Device from bumble.sdp import ( @@ -194,82 +194,43 @@ class TargetFeatures(enum.IntFlag): # ----------------------------------------------------------------------------- -def make_controller_service_sdp_records( - service_record_handle: int, - avctp_version: tuple[int, int] = (1, 4), - avrcp_version: tuple[int, int] = (1, 6), - supported_features: int | ControllerFeatures = 1, -) -> list[ServiceAttribute]: - avctp_version_int = avctp_version[0] << 8 | avctp_version[1] - avrcp_version_int = avrcp_version[0] << 8 | avrcp_version[1] +@dataclass +class ControllerServiceSdpRecord: + service_record_handle: int + avctp_version: tuple[int, int] = (1, 4) + avrcp_version: tuple[int, int] = (1, 6) + supported_features: int | ControllerFeatures = ControllerFeatures(1) - attributes = [ - ServiceAttribute( - SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, - DataElement.unsigned_integer_32(service_record_handle), - ), - ServiceAttribute( - SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, - DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]), - ), - ServiceAttribute( - SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, - DataElement.sequence( - [ - DataElement.uuid(core.BT_AV_REMOTE_CONTROL_SERVICE), - DataElement.uuid(core.BT_AV_REMOTE_CONTROL_CONTROLLER_SERVICE), - ] - ), - ), - ServiceAttribute( - SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, - DataElement.sequence( - [ - DataElement.sequence( - [ - DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID), - DataElement.unsigned_integer_16(avctp.AVCTP_PSM), - ] - ), - DataElement.sequence( - [ - DataElement.uuid(core.BT_AVCTP_PROTOCOL_ID), - DataElement.unsigned_integer_16(avctp_version_int), - ] - ), - ] - ), - ), - ServiceAttribute( - SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, - DataElement.sequence( - [ - DataElement.sequence( - [ - DataElement.uuid(core.BT_AV_REMOTE_CONTROL_SERVICE), - DataElement.unsigned_integer_16(avrcp_version_int), - ] - ), - ] - ), - ), - ServiceAttribute( - SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, - DataElement.unsigned_integer_16(supported_features), - ), - ] - if supported_features & ControllerFeatures.SUPPORTS_BROWSING: - attributes.append( + def to_service_attributes(self) -> list[ServiceAttribute]: + avctp_version_int = self.avctp_version[0] << 8 | self.avctp_version[1] + avrcp_version_int = self.avrcp_version[0] << 8 | self.avrcp_version[1] + + attributes = [ ServiceAttribute( - SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, + DataElement.unsigned_integer_32(self.service_record_handle), + ), + ServiceAttribute( + SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, + DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]), + ), + ServiceAttribute( + SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.uuid(core.BT_AV_REMOTE_CONTROL_SERVICE), + DataElement.uuid(core.BT_AV_REMOTE_CONTROL_CONTROLLER_SERVICE), + ] + ), + ), + ServiceAttribute( + SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, DataElement.sequence( [ DataElement.sequence( [ DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID), - DataElement.unsigned_integer_16( - avctp.AVCTP_BROWSING_PSM - ), + DataElement.unsigned_integer_16(avctp.AVCTP_PSM), ] ), DataElement.sequence( @@ -281,87 +242,130 @@ def make_controller_service_sdp_records( ] ), ), - ) - return attributes + ServiceAttribute( + SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.sequence( + [ + DataElement.uuid(core.BT_AV_REMOTE_CONTROL_SERVICE), + DataElement.unsigned_integer_16(avrcp_version_int), + ] + ), + ] + ), + ), + ServiceAttribute( + SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, + DataElement.unsigned_integer_16(self.supported_features), + ), + ] + if self.supported_features & ControllerFeatures.SUPPORTS_BROWSING: + attributes.append( + ServiceAttribute( + SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.sequence( + [ + DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID), + DataElement.unsigned_integer_16( + avctp.AVCTP_BROWSING_PSM + ), + ] + ), + DataElement.sequence( + [ + DataElement.uuid(core.BT_AVCTP_PROTOCOL_ID), + DataElement.unsigned_integer_16(avctp_version_int), + ] + ), + ] + ), + ), + ) + return attributes + + @classmethod + async def find(cls, connection: Connection) -> list[ControllerServiceSdpRecord]: + async with sdp.Client(connection) as sdp_client: + search_result = await sdp_client.search_attributes( + uuids=[core.BT_AV_REMOTE_CONTROL_CONTROLLER_SERVICE], + attribute_ids=[ + SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, + SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, + ], + ) + + records: list[ControllerServiceSdpRecord] = [] + for attribute_lists in search_result: + record = cls(0) + for attribute in attribute_lists: + if attribute.id == SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID: + record.service_record_handle = attribute.value.value + elif attribute.id == SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID: + # [[L2CAP, PSM], [AVCTP, version]] + record.avctp_version = ( + attribute.value.value[1].value[1].value >> 8, + attribute.value.value[1].value[1].value & 0xFF, + ) + elif ( + attribute.id + == SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID + ): + # [[AV_REMOTE_CONTROL, version]] + record.avrcp_version = ( + attribute.value.value[0].value[1].value >> 8, + attribute.value.value[0].value[1].value & 0xFF, + ) + elif attribute.id == SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID: + record.supported_features = ControllerFeatures( + attribute.value.value + ) + records.append(record) + return records # ----------------------------------------------------------------------------- -def make_target_service_sdp_records( - service_record_handle: int, - avctp_version: tuple[int, int] = (1, 4), - avrcp_version: tuple[int, int] = (1, 6), - supported_features: int | TargetFeatures = 0x23, -) -> list[ServiceAttribute]: - # TODO: support a way to compute the supported features from a feature list - avctp_version_int = avctp_version[0] << 8 | avctp_version[1] - avrcp_version_int = avrcp_version[0] << 8 | avrcp_version[1] +@dataclass +class TargetServiceSdpRecord: + service_record_handle: int + avctp_version: tuple[int, int] = (1, 4) + avrcp_version: tuple[int, int] = (1, 6) + supported_features: int | TargetFeatures = TargetFeatures(0x23) - attributes = [ - ServiceAttribute( - SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, - DataElement.unsigned_integer_32(service_record_handle), - ), - ServiceAttribute( - SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, - DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]), - ), - ServiceAttribute( - SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, - DataElement.sequence( - [ - DataElement.uuid(core.BT_AV_REMOTE_CONTROL_TARGET_SERVICE), - ] - ), - ), - ServiceAttribute( - SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, - DataElement.sequence( - [ - DataElement.sequence( - [ - DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID), - DataElement.unsigned_integer_16(avctp.AVCTP_PSM), - ] - ), - DataElement.sequence( - [ - DataElement.uuid(core.BT_AVCTP_PROTOCOL_ID), - DataElement.unsigned_integer_16(avctp_version_int), - ] - ), - ] - ), - ), - ServiceAttribute( - SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, - DataElement.sequence( - [ - DataElement.sequence( - [ - DataElement.uuid(core.BT_AV_REMOTE_CONTROL_SERVICE), - DataElement.unsigned_integer_16(avrcp_version_int), - ] - ), - ] - ), - ), - ServiceAttribute( - SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, - DataElement.unsigned_integer_16(supported_features), - ), - ] - if supported_features & TargetFeatures.SUPPORTS_BROWSING: - attributes.append( + def to_service_attributes(self) -> list[ServiceAttribute]: + # TODO: support a way to compute the supported features from a feature list + avctp_version_int = self.avctp_version[0] << 8 | self.avctp_version[1] + avrcp_version_int = self.avrcp_version[0] << 8 | self.avrcp_version[1] + + attributes = [ ServiceAttribute( - SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, + DataElement.unsigned_integer_32(self.service_record_handle), + ), + ServiceAttribute( + SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, + DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]), + ), + ServiceAttribute( + SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.uuid(core.BT_AV_REMOTE_CONTROL_TARGET_SERVICE), + ] + ), + ), + ServiceAttribute( + SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, DataElement.sequence( [ DataElement.sequence( [ DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID), - DataElement.unsigned_integer_16( - avctp.AVCTP_BROWSING_PSM - ), + DataElement.unsigned_integer_16(avctp.AVCTP_PSM), ] ), DataElement.sequence( @@ -373,8 +377,90 @@ def make_target_service_sdp_records( ] ), ), - ) - return attributes + ServiceAttribute( + SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.sequence( + [ + DataElement.uuid(core.BT_AV_REMOTE_CONTROL_SERVICE), + DataElement.unsigned_integer_16(avrcp_version_int), + ] + ), + ] + ), + ), + ServiceAttribute( + SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, + DataElement.unsigned_integer_16(self.supported_features), + ), + ] + if self.supported_features & TargetFeatures.SUPPORTS_BROWSING: + attributes.append( + ServiceAttribute( + SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.sequence( + [ + DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID), + DataElement.unsigned_integer_16( + avctp.AVCTP_BROWSING_PSM + ), + ] + ), + DataElement.sequence( + [ + DataElement.uuid(core.BT_AVCTP_PROTOCOL_ID), + DataElement.unsigned_integer_16(avctp_version_int), + ] + ), + ] + ), + ), + ) + return attributes + + @classmethod + async def find(cls, connection: Connection) -> list[TargetServiceSdpRecord]: + async with sdp.Client(connection) as sdp_client: + search_result = await sdp_client.search_attributes( + uuids=[core.BT_AV_REMOTE_CONTROL_TARGET_SERVICE], + attribute_ids=[ + SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, + SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, + ], + ) + + records: list[TargetServiceSdpRecord] = [] + for attribute_lists in search_result: + record = cls(0) + for attribute in attribute_lists: + if attribute.id == SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID: + record.service_record_handle = attribute.value.value + elif attribute.id == SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID: + # [[L2CAP, PSM], [AVCTP, version]] + record.avctp_version = ( + attribute.value.value[1].value[1].value >> 8, + attribute.value.value[1].value[1].value & 0xFF, + ) + elif ( + attribute.id + == SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID + ): + # [[AV_REMOTE_CONTROL, version]] + record.avrcp_version = ( + attribute.value.value[0].value[1].value >> 8, + attribute.value.value[0].value[1].value & 0xFF, + ) + elif attribute.id == SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID: + record.supported_features = TargetFeatures( + attribute.value.value + ) + records.append(record) + return records # ----------------------------------------------------------------------------- diff --git a/examples/run_avrcp.py b/examples/run_avrcp.py index 94d2a6b..8755d52 100644 --- a/examples/run_avrcp.py +++ b/examples/run_avrcp.py @@ -44,10 +44,10 @@ def sdp_records(): a2dp_sink_service_record_handle ), avrcp_controller_service_record_handle: avrcp.make_controller_service_sdp_records( - avrcp_controller_service_record_handle + avrcp.ControllerServiceSdpRecord(avrcp_controller_service_record_handle) ), avrcp_target_service_record_handle: avrcp.make_target_service_sdp_records( - avrcp_controller_service_record_handle + avrcp.TargetServiceSdpRecord(avrcp_target_service_record_handle) ), } diff --git a/tests/avrcp_test.py b/tests/avrcp_test.py index d534c0f..3950472 100644 --- a/tests/avrcp_test.py +++ b/tests/avrcp_test.py @@ -423,6 +423,47 @@ def test_passthrough_commands(): assert bytes(parsed) == play_pressed_bytes +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_find_sdp_records(): + two_devices = await TwoDevices.create_with_avdtp() + + # Add SDP records to device 1 + controller_record = avrcp.ControllerServiceSdpRecord( + service_record_handle=0x10001, + avctp_version=(1, 4), + avrcp_version=(1, 6), + supported_features=( + avrcp.ControllerFeatures.CATEGORY_1 + | avrcp.ControllerFeatures.SUPPORTS_BROWSING + ), + ) + target_record = avrcp.TargetServiceSdpRecord( + service_record_handle=0x10002, + avctp_version=(1, 4), + avrcp_version=(1, 6), + supported_features=( + avrcp.TargetFeatures.CATEGORY_1 | avrcp.TargetFeatures.SUPPORTS_BROWSING + ), + ) + + two_devices.devices[1].sdp_service_records = { + 0x10001: controller_record.to_service_attributes(), + 0x10002: target_record.to_service_attributes(), + } + + # Find records from device 0 + controller_records = await avrcp.ControllerServiceSdpRecord.find( + two_devices.connections[0] + ) + assert len(controller_records) == 1 + assert controller_records[0] == controller_record + + target_records = await avrcp.TargetServiceSdpRecord.find(two_devices.connections[0]) + assert len(target_records) == 1 + assert target_records[0] == target_record + + # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_get_supported_events():