From 1198f2c3f53c93ee4dfba9e3863c2ab1f9d1fb5f Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Tue, 3 Mar 2026 02:05:15 +0800 Subject: [PATCH] SDP: Make PDU dataclasses --- bumble/sdp.py | 370 ++++++++++++++++++++++---------------------------- 1 file changed, 165 insertions(+), 205 deletions(-) diff --git a/bumble/sdp.py b/bumble/sdp.py index 0562ebd..a7f8328 100644 --- a/bumble/sdp.py +++ b/bumble/sdp.py @@ -22,11 +22,11 @@ import logging import struct from collections.abc import Iterable, Sequence from dataclasses import dataclass, field -from typing import TYPE_CHECKING, Any, NewType +from typing import TYPE_CHECKING, Any, ClassVar, NewType, TypeVar from typing_extensions import Self -from bumble import core, l2cap, utils +from bumble import core, hci, l2cap, utils from bumble.colors import color from bumble.core import ( InvalidArgumentError, @@ -34,7 +34,6 @@ from bumble.core import ( InvalidStateError, ProtocolError, ) -from bumble.hci import HCI_Object, key_with_value, name_or_number if TYPE_CHECKING: from bumble.device import Connection, Device @@ -55,39 +54,22 @@ SDP_CONTINUATION_WATCHDOG = 64 # Maximum number of continuations we're willing SDP_PSM = 0x0001 -SDP_ERROR_RESPONSE = 0x01 -SDP_SERVICE_SEARCH_REQUEST = 0x02 -SDP_SERVICE_SEARCH_RESPONSE = 0x03 -SDP_SERVICE_ATTRIBUTE_REQUEST = 0x04 -SDP_SERVICE_ATTRIBUTE_RESPONSE = 0x05 -SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST = 0x06 -SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE = 0x07 +class PduId(hci.SpecableEnum): + SDP_ERROR_RESPONSE = 0x01 + SDP_SERVICE_SEARCH_REQUEST = 0x02 + SDP_SERVICE_SEARCH_RESPONSE = 0x03 + SDP_SERVICE_ATTRIBUTE_REQUEST = 0x04 + SDP_SERVICE_ATTRIBUTE_RESPONSE = 0x05 + SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST = 0x06 + SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE = 0x07 -SDP_PDU_NAMES = { - SDP_ERROR_RESPONSE: 'SDP_ERROR_RESPONSE', - SDP_SERVICE_SEARCH_REQUEST: 'SDP_SERVICE_SEARCH_REQUEST', - SDP_SERVICE_SEARCH_RESPONSE: 'SDP_SERVICE_SEARCH_RESPONSE', - SDP_SERVICE_ATTRIBUTE_REQUEST: 'SDP_SERVICE_ATTRIBUTE_REQUEST', - SDP_SERVICE_ATTRIBUTE_RESPONSE: 'SDP_SERVICE_ATTRIBUTE_RESPONSE', - SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST: 'SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST', - SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE: 'SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE' -} - -SDP_INVALID_SDP_VERSION_ERROR = 0x0001 -SDP_INVALID_SERVICE_RECORD_HANDLE_ERROR = 0x0002 -SDP_INVALID_REQUEST_SYNTAX_ERROR = 0x0003 -SDP_INVALID_PDU_SIZE_ERROR = 0x0004 -SDP_INVALID_CONTINUATION_STATE_ERROR = 0x0005 -SDP_INSUFFICIENT_RESOURCES_TO_SATISFY_REQUEST_ERROR = 0x0006 - -SDP_ERROR_NAMES = { - SDP_INVALID_SDP_VERSION_ERROR: 'SDP_INVALID_SDP_VERSION_ERROR', - SDP_INVALID_SERVICE_RECORD_HANDLE_ERROR: 'SDP_INVALID_SERVICE_RECORD_HANDLE_ERROR', - SDP_INVALID_REQUEST_SYNTAX_ERROR: 'SDP_INVALID_REQUEST_SYNTAX_ERROR', - SDP_INVALID_PDU_SIZE_ERROR: 'SDP_INVALID_PDU_SIZE_ERROR', - SDP_INVALID_CONTINUATION_STATE_ERROR: 'SDP_INVALID_CONTINUATION_STATE_ERROR', - SDP_INSUFFICIENT_RESOURCES_TO_SATISFY_REQUEST_ERROR: 'SDP_INSUFFICIENT_RESOURCES_TO_SATISFY_REQUEST_ERROR' -} +class ErrorCode(hci.SpecableEnum): + INVALID_SDP_VERSION = 0x0001 + INVALID_SERVICE_RECORD_HANDLE = 0x0002 + INVALID_REQUEST_SYNTAX = 0x0003 + INVALID_PDU_SIZE = 0x0004 + INVALID_CONTINUATION_STATE = 0x0005 + INSUFFICIENT_RESOURCES_TO_SATISFY_REQUEST = 0x0006 SDP_SERVICE_NAME_ATTRIBUTE_ID_OFFSET = 0x0000 SDP_SERVICE_DESCRIPTION_ATTRIBUTE_ID_OFFSET = 0x0001 @@ -516,7 +498,7 @@ class ServiceAttribute: @staticmethod def id_name(id_code): - return name_or_number(SDP_ATTRIBUTE_ID_NAMES, id_code) + return hci.name_or_number(SDP_ATTRIBUTE_ID_NAMES, id_code) @staticmethod def is_uuid_in_value(uuid: core.UUID, value: DataElement) -> bool: @@ -546,239 +528,223 @@ class ServiceAttribute: # ----------------------------------------------------------------------------- +def _parse_service_record_handle_list( + data: bytes, offset: int +) -> tuple[int, list[int]]: + count = struct.unpack_from('>H', data, offset)[0] + offset += 2 + handle_list = [ + struct.unpack_from('>I', data, offset + x * 4)[0] for x in range(count) + ] + return offset + count * 4, handle_list + + +def _serialize_service_record_handle_list( + handles: list[int], +) -> bytes: + return struct.pack('>H', len(handles)) + b''.join( + struct.pack('>I', handle) for handle in handles + ) + + +def _parse_bytes_preceded_by_length(data: bytes, offset: int) -> tuple[int, bytes]: + length = struct.unpack_from('>H', data, offset)[0] + offset += 2 + return offset + length, data[offset : offset + length] + + +def _serialize_bytes_preceded_by_length(data: bytes) -> bytes: + return struct.pack('>H', len(data)) + data + + +_SERVICE_RECORD_HANDLE_LIST_METADATA = hci.metadata( + { + 'parser': _parse_service_record_handle_list, + 'serializer': _serialize_service_record_handle_list, + } +) + + +_BYTES_PRECEDED_BY_LENGTH_METADATA = hci.metadata( + { + 'parser': _parse_bytes_preceded_by_length, + 'serializer': _serialize_bytes_preceded_by_length, + } +) + + +# ----------------------------------------------------------------------------- +@dataclass class SDP_PDU: ''' See Bluetooth spec @ Vol 3, Part B - 4.2 PROTOCOL DATA UNIT FORMAT ''' RESPONSE_PDU_IDS = { - SDP_SERVICE_SEARCH_REQUEST: SDP_SERVICE_SEARCH_RESPONSE, - SDP_SERVICE_ATTRIBUTE_REQUEST: SDP_SERVICE_ATTRIBUTE_RESPONSE, - SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST: SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE, + PduId.SDP_SERVICE_SEARCH_REQUEST: PduId.SDP_SERVICE_SEARCH_RESPONSE, + PduId.SDP_SERVICE_ATTRIBUTE_REQUEST: PduId.SDP_SERVICE_ATTRIBUTE_RESPONSE, + PduId.SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST: PduId.SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE, } - sdp_pdu_classes: dict[int, type[SDP_PDU]] = {} - name = None - pdu_id = 0 + subclasses: ClassVar[dict[int, type[SDP_PDU]]] = {} + pdu_id: ClassVar[PduId] + fields: ClassVar[hci.Fields] - @staticmethod - def from_bytes(pdu): + transaction_id: int + _payload: bytes | None = field(init=False, repr=False, default=None) + + @classmethod + def from_bytes(cls, pdu: bytes) -> SDP_PDU: pdu_id, transaction_id, _parameters_length = struct.unpack_from('>BHH', pdu, 0) - cls = SDP_PDU.sdp_pdu_classes.get(pdu_id) - if cls is None: - instance = SDP_PDU(pdu) - instance.name = SDP_PDU.pdu_name(pdu_id) - instance.pdu_id = pdu_id - instance.transaction_id = transaction_id - return instance - self = cls.__new__(cls) - SDP_PDU.__init__(self, pdu, transaction_id) - if hasattr(self, 'fields'): - self.init_from_bytes(pdu, 5) - return self + subclass = cls.subclasses.get(pdu_id) + if not (subclass := cls.subclasses.get(pdu_id)): + raise InvalidPacketError(f"Unknown PDU type {pdu_id}") + instance = subclass( + transaction_id=transaction_id, + **hci.HCI_Object.dict_from_bytes(pdu, 5, subclass.fields), + ) + instance._payload = pdu + return instance - @staticmethod - def parse_service_record_handle_list_preceded_by_count( - data: bytes, offset: int - ) -> tuple[int, list[int]]: - count = struct.unpack_from('>H', data, offset - 2)[0] - handle_list = [ - struct.unpack_from('>I', data, offset + x * 4)[0] for x in range(count) - ] - return offset + count * 4, handle_list + _PDU = TypeVar('_PDU', bound='SDP_PDU') - @staticmethod - def parse_bytes_preceded_by_length(data, offset): - length = struct.unpack_from('>H', data, offset - 2)[0] - return offset + length, data[offset : offset + length] - - @staticmethod - def error_name(error_code): - return name_or_number(SDP_ERROR_NAMES, error_code) - - @staticmethod - def pdu_name(code): - return name_or_number(SDP_PDU_NAMES, code) - - @staticmethod - def subclass(fields): - def inner(cls): - name = cls.__name__ - - # add a _ character before every uppercase letter, except the SDP_ prefix - location = len(name) - 1 - while location > 4: - if not name[location].isupper(): - location -= 1 - continue - name = name[:location] + '_' + name[location:] - location -= 1 - - cls.name = name.upper() - cls.pdu_id = key_with_value(SDP_PDU_NAMES, cls.name) - if cls.pdu_id is None: - raise KeyError(f'PDU name {cls.name} not found in SDP_PDU_NAMES') - cls.fields = fields - - # Register a factory for this class - SDP_PDU.sdp_pdu_classes[cls.pdu_id] = cls - - return cls - - return inner - - def __init__(self, pdu=None, transaction_id=0, **kwargs): - if hasattr(self, 'fields') and kwargs: - HCI_Object.init_from_fields(self, self.fields, kwargs) - if pdu is None: - parameters = HCI_Object.dict_to_bytes(kwargs, self.fields) - pdu = ( - struct.pack('>BHH', self.pdu_id, transaction_id, len(parameters)) - + parameters - ) - self.pdu = pdu - self.transaction_id = transaction_id - - def init_from_bytes(self, pdu, offset): - return HCI_Object.init_from_bytes(self, pdu, offset, self.fields) + @classmethod + def subclass(cls, subclass: type[_PDU]) -> type[_PDU]: + subclass.fields = hci.HCI_Object.fields_from_dataclass(subclass) + cls.subclasses[subclass.pdu_id] = subclass + return subclass def __bytes__(self): - return self.pdu + if self._payload is None: + self._payload = struct.pack( + '>BHH', self.pdu_id, self.transaction_id, 0 + ) + hci.HCI_Object.dict_to_bytes(self.__dict__, self.fields) + return self._payload + + @property + def name(self) -> str: + return self.pdu_id.name def __str__(self): result = f'{color(self.name, "blue")} [TID={self.transaction_id}]' if fields := getattr(self, 'fields', None): - result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, ' ') + result += ':\n' + hci.HCI_Object.format_fields(self.__dict__, fields, ' ') elif len(self.pdu) > 1: result += f': {self.pdu.hex()}' return result # ----------------------------------------------------------------------------- -@SDP_PDU.subclass([('error_code', {'size': 2, 'mapper': SDP_PDU.error_name})]) +@SDP_PDU.subclass +@dataclass class SDP_ErrorResponse(SDP_PDU): ''' See Bluetooth spec @ Vol 3, Part B - 4.4.1 SDP_ErrorResponse PDU ''' - error_code: int + pdu_id = PduId.SDP_ERROR_RESPONSE + + error_code: ErrorCode = field(metadata=ErrorCode.type_metadata(2)) # ----------------------------------------------------------------------------- -@SDP_PDU.subclass( - [ - ('service_search_pattern', DataElement.parse_from_bytes), - ('maximum_service_record_count', '>2'), - ('continuation_state', '*'), - ] -) +@SDP_PDU.subclass +@dataclass class SDP_ServiceSearchRequest(SDP_PDU): ''' See Bluetooth spec @ Vol 3, Part B - 4.5.1 SDP_ServiceSearchRequest PDU ''' - service_search_pattern: DataElement - maximum_service_record_count: int - continuation_state: bytes + pdu_id = PduId.SDP_SERVICE_SEARCH_REQUEST + + service_search_pattern: DataElement = field( + metadata=hci.metadata(DataElement.parse_from_bytes) + ) + maximum_service_record_count: int = field(metadata=hci.metadata('>2')) + continuation_state: bytes = field(metadata=hci.metadata('*')) # ----------------------------------------------------------------------------- -@SDP_PDU.subclass( - [ - ('total_service_record_count', '>2'), - ('current_service_record_count', '>2'), - ( - 'service_record_handle_list', - SDP_PDU.parse_service_record_handle_list_preceded_by_count, - ), - ('continuation_state', '*'), - ] -) +@SDP_PDU.subclass +@dataclass class SDP_ServiceSearchResponse(SDP_PDU): ''' See Bluetooth spec @ Vol 3, Part B - 4.5.2 SDP_ServiceSearchResponse PDU ''' - service_record_handle_list: list[int] - total_service_record_count: int - current_service_record_count: int - continuation_state: bytes + pdu_id = PduId.SDP_SERVICE_SEARCH_RESPONSE + + total_service_record_count: int = field(metadata=hci.metadata('>2')) + service_record_handle_list: Sequence[int] = field( + metadata=_SERVICE_RECORD_HANDLE_LIST_METADATA + ) + continuation_state: bytes = field(metadata=hci.metadata('*')) # ----------------------------------------------------------------------------- -@SDP_PDU.subclass( - [ - ('service_record_handle', '>4'), - ('maximum_attribute_byte_count', '>2'), - ('attribute_id_list', DataElement.parse_from_bytes), - ('continuation_state', '*'), - ] -) +@SDP_PDU.subclass +@dataclass class SDP_ServiceAttributeRequest(SDP_PDU): ''' See Bluetooth spec @ Vol 3, Part B - 4.6.1 SDP_ServiceAttributeRequest PDU ''' - service_record_handle: int - maximum_attribute_byte_count: int - attribute_id_list: DataElement - continuation_state: bytes + pdu_id = PduId.SDP_SERVICE_ATTRIBUTE_REQUEST + + service_record_handle: int = field(metadata=hci.metadata('>4')) + maximum_attribute_byte_count: int = field(metadata=hci.metadata('>2')) + attribute_id_list: DataElement = field( + metadata=hci.metadata(DataElement.parse_from_bytes) + ) + continuation_state: bytes = field(metadata=hci.metadata('*')) # ----------------------------------------------------------------------------- -@SDP_PDU.subclass( - [ - ('attribute_list_byte_count', '>2'), - ('attribute_list', SDP_PDU.parse_bytes_preceded_by_length), - ('continuation_state', '*'), - ] -) +@SDP_PDU.subclass +@dataclass class SDP_ServiceAttributeResponse(SDP_PDU): ''' See Bluetooth spec @ Vol 3, Part B - 4.6.2 SDP_ServiceAttributeResponse PDU ''' - attribute_list_byte_count: int - attribute_list: bytes - continuation_state: bytes + pdu_id = PduId.SDP_SERVICE_ATTRIBUTE_RESPONSE + + attribute_list: bytes = field(metadata=_BYTES_PRECEDED_BY_LENGTH_METADATA) + continuation_state: bytes = field(metadata=hci.metadata('*')) # ----------------------------------------------------------------------------- -@SDP_PDU.subclass( - [ - ('service_search_pattern', DataElement.parse_from_bytes), - ('maximum_attribute_byte_count', '>2'), - ('attribute_id_list', DataElement.parse_from_bytes), - ('continuation_state', '*'), - ] -) +@SDP_PDU.subclass +@dataclass class SDP_ServiceSearchAttributeRequest(SDP_PDU): ''' See Bluetooth spec @ Vol 3, Part B - 4.7.1 SDP_ServiceSearchAttributeRequest PDU ''' - service_search_pattern: DataElement - maximum_attribute_byte_count: int - attribute_id_list: DataElement - continuation_state: bytes + pdu_id = PduId.SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST + + service_search_pattern: DataElement = field( + metadata=hci.metadata(DataElement.parse_from_bytes) + ) + maximum_attribute_byte_count: int = field(metadata=hci.metadata('>2')) + attribute_id_list: DataElement = field( + metadata=hci.metadata(DataElement.parse_from_bytes) + ) + continuation_state: bytes = field(metadata=hci.metadata('*')) # ----------------------------------------------------------------------------- -@SDP_PDU.subclass( - [ - ('attribute_lists_byte_count', '>2'), - ('attribute_lists', SDP_PDU.parse_bytes_preceded_by_length), - ('continuation_state', '*'), - ] -) +@SDP_PDU.subclass +@dataclass class SDP_ServiceSearchAttributeResponse(SDP_PDU): ''' See Bluetooth spec @ Vol 3, Part B - 4.7.2 SDP_ServiceSearchAttributeResponse PDU ''' - attribute_lists_byte_count: int - attribute_lists: bytes - continuation_state: bytes + pdu_id = PduId.SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE + + attribute_lists: bytes = field(metadata=_BYTES_PRECEDED_BY_LENGTH_METADATA) + continuation_state: bytes = field(metadata=hci.metadata('*')) # ----------------------------------------------------------------------------- @@ -879,7 +845,7 @@ class Client: ) # Request and accumulate until there's no more continuation - service_record_handle_list = [] + service_record_handle_list: list[int] = [] continuation_state = bytes([0]) watchdog = SDP_CONTINUATION_WATCHDOG while watchdog > 0: @@ -1097,7 +1063,7 @@ class Server: logger.exception(color('failed to parse SDP Request PDU', 'red')) self.send_response( SDP_ErrorResponse( - transaction_id=0, error_code=SDP_INVALID_REQUEST_SYNTAX_ERROR + transaction_id=0, error_code=ErrorCode.INVALID_REQUEST_SYNTAX ) ) @@ -1114,7 +1080,7 @@ class Server: self.send_response( SDP_ErrorResponse( transaction_id=sdp_pdu.transaction_id, - error_code=SDP_INSUFFICIENT_RESOURCES_TO_SATISFY_REQUEST_ERROR, + error_code=ErrorCode.INSUFFICIENT_RESOURCES_TO_SATISFY_REQUEST, ) ) else: @@ -1122,7 +1088,7 @@ class Server: self.send_response( SDP_ErrorResponse( transaction_id=sdp_pdu.transaction_id, - error_code=SDP_INVALID_REQUEST_SYNTAX_ERROR, + error_code=ErrorCode.INVALID_REQUEST_SYNTAX, ) ) @@ -1140,7 +1106,7 @@ class Server: self.send_response( SDP_ErrorResponse( transaction_id=transaction_id, - error_code=SDP_INVALID_CONTINUATION_STATE_ERROR, + error_code=ErrorCode.INVALID_CONTINUATION_STATE, ) ) return None @@ -1234,15 +1200,11 @@ class Server: if service_record_handles_remaining else bytes([0]) ) - service_record_handle_list = b''.join( - [struct.pack('>I', handle) for handle in service_record_handles] - ) self.send_response( SDP_ServiceSearchResponse( transaction_id=request.transaction_id, total_service_record_count=total_service_record_count, - current_service_record_count=len(service_record_handles), - service_record_handle_list=service_record_handle_list, + service_record_handle_list=service_record_handles, continuation_state=continuation_state, ) ) @@ -1265,7 +1227,7 @@ class Server: self.send_response( SDP_ErrorResponse( transaction_id=request.transaction_id, - error_code=SDP_INVALID_SERVICE_RECORD_HANDLE_ERROR, + error_code=ErrorCode.INVALID_SERVICE_RECORD_HANDLE, ) ) return @@ -1290,7 +1252,6 @@ class Server: self.send_response( SDP_ServiceAttributeResponse( transaction_id=request.transaction_id, - attribute_list_byte_count=len(attribute_list_response), attribute_list=attribute_list_response, continuation_state=continuation_state, ) @@ -1337,7 +1298,6 @@ class Server: self.send_response( SDP_ServiceSearchAttributeResponse( transaction_id=request.transaction_id, - attribute_lists_byte_count=len(attribute_lists_response), attribute_lists=attribute_lists_response, continuation_state=continuation_state, )