# Copyright 2021-2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ----------------------------------------------------------------------------- # ATT - Attribute Protocol # # See Bluetooth spec @ Vol 3, Part F # # ----------------------------------------------------------------------------- # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- from __future__ import annotations import dataclasses import enum import functools import inspect import struct from typing import ( TYPE_CHECKING, Awaitable, Callable, ClassVar, Generic, Optional, TypeVar, Union, ) from bumble import hci, utils from bumble.colors import color from bumble.core import UUID, InvalidOperationError, ProtocolError from bumble.hci import HCI_Object # ----------------------------------------------------------------------------- # Typing # ----------------------------------------------------------------------------- if TYPE_CHECKING: from bumble.device import Connection _T = TypeVar('_T') # ----------------------------------------------------------------------------- # Constants # ----------------------------------------------------------------------------- # fmt: off # pylint: disable=line-too-long ATT_CID = 0x04 ATT_PSM = 0x001F class Opcode(hci.SpecableEnum): ATT_ERROR_RESPONSE = 0x01 ATT_EXCHANGE_MTU_REQUEST = 0x02 ATT_EXCHANGE_MTU_RESPONSE = 0x03 ATT_FIND_INFORMATION_REQUEST = 0x04 ATT_FIND_INFORMATION_RESPONSE = 0x05 ATT_FIND_BY_TYPE_VALUE_REQUEST = 0x06 ATT_FIND_BY_TYPE_VALUE_RESPONSE = 0x07 ATT_READ_BY_TYPE_REQUEST = 0x08 ATT_READ_BY_TYPE_RESPONSE = 0x09 ATT_READ_REQUEST = 0x0A ATT_READ_RESPONSE = 0x0B ATT_READ_BLOB_REQUEST = 0x0C ATT_READ_BLOB_RESPONSE = 0x0D ATT_READ_MULTIPLE_REQUEST = 0x0E ATT_READ_MULTIPLE_RESPONSE = 0x0F ATT_READ_BY_GROUP_TYPE_REQUEST = 0x10 ATT_READ_BY_GROUP_TYPE_RESPONSE = 0x11 ATT_WRITE_REQUEST = 0x12 ATT_WRITE_RESPONSE = 0x13 ATT_WRITE_COMMAND = 0x52 ATT_SIGNED_WRITE_COMMAND = 0xD2 ATT_PREPARE_WRITE_REQUEST = 0x16 ATT_PREPARE_WRITE_RESPONSE = 0x17 ATT_EXECUTE_WRITE_REQUEST = 0x18 ATT_EXECUTE_WRITE_RESPONSE = 0x19 ATT_HANDLE_VALUE_NOTIFICATION = 0x1B ATT_HANDLE_VALUE_INDICATION = 0x1D ATT_HANDLE_VALUE_CONFIRMATION = 0x1E ATT_REQUESTS = [ Opcode.ATT_EXCHANGE_MTU_REQUEST, Opcode.ATT_FIND_INFORMATION_REQUEST, Opcode.ATT_FIND_BY_TYPE_VALUE_REQUEST, Opcode.ATT_READ_BY_TYPE_REQUEST, Opcode.ATT_READ_REQUEST, Opcode.ATT_READ_BLOB_REQUEST, Opcode.ATT_READ_MULTIPLE_REQUEST, Opcode.ATT_READ_BY_GROUP_TYPE_REQUEST, Opcode.ATT_WRITE_REQUEST, Opcode.ATT_PREPARE_WRITE_REQUEST, Opcode.ATT_EXECUTE_WRITE_REQUEST ] ATT_RESPONSES = [ Opcode.ATT_ERROR_RESPONSE, Opcode.ATT_EXCHANGE_MTU_RESPONSE, Opcode.ATT_FIND_INFORMATION_RESPONSE, Opcode.ATT_FIND_BY_TYPE_VALUE_RESPONSE, Opcode.ATT_READ_BY_TYPE_RESPONSE, Opcode.ATT_READ_RESPONSE, Opcode.ATT_READ_BLOB_RESPONSE, Opcode.ATT_READ_MULTIPLE_RESPONSE, Opcode.ATT_READ_BY_GROUP_TYPE_RESPONSE, Opcode.ATT_WRITE_RESPONSE, Opcode.ATT_PREPARE_WRITE_RESPONSE, Opcode.ATT_EXECUTE_WRITE_RESPONSE ] class ErrorCode(hci.SpecableEnum): ''' See * Bluetooth spec @ Vol 3, Part F - 3.4.1.1 Error Response * Core Specification Supplement: Common Profile And Service Error Codes ''' INVALID_HANDLE = 0x01 READ_NOT_PERMITTED = 0x02 WRITE_NOT_PERMITTED = 0x03 INVALID_PDU = 0x04 INSUFFICIENT_AUTHENTICATION = 0x05 REQUEST_NOT_SUPPORTED = 0x06 INVALID_OFFSET = 0x07 INSUFFICIENT_AUTHORIZATION = 0x08 PREPARE_QUEUE_FULL = 0x09 ATTRIBUTE_NOT_FOUND = 0x0A ATTRIBUTE_NOT_LONG = 0x0B INSUFFICIENT_ENCRYPTION_KEY_SIZE = 0x0C INVALID_ATTRIBUTE_LENGTH = 0x0D UNLIKELY_ERROR = 0x0E INSUFFICIENT_ENCRYPTION = 0x0F UNSUPPORTED_GROUP_TYPE = 0x10 INSUFFICIENT_RESOURCES = 0x11 DATABASE_OUT_OF_SYNC = 0x12 VALUE_NOT_ALLOWED = 0x13 # 0x80 – 0x9F: Application Error # 0xE0 – 0xFF: Common Profile and Service Error Codes WRITE_REQUEST_REJECTED = 0xFC CCCD_IMPROPERLY_CONFIGURED = 0xFD PROCEDURE_ALREADY_IN_PROGRESS = 0xFE OUT_OF_RANGE = 0xFF # Backward Compatible Constants ATT_INVALID_HANDLE_ERROR = ErrorCode.INVALID_HANDLE ATT_READ_NOT_PERMITTED_ERROR = ErrorCode.READ_NOT_PERMITTED ATT_WRITE_NOT_PERMITTED_ERROR = ErrorCode.WRITE_NOT_PERMITTED ATT_INVALID_PDU_ERROR = ErrorCode.INVALID_PDU ATT_INSUFFICIENT_AUTHENTICATION_ERROR = ErrorCode.INSUFFICIENT_AUTHENTICATION ATT_REQUEST_NOT_SUPPORTED_ERROR = ErrorCode.REQUEST_NOT_SUPPORTED ATT_INVALID_OFFSET_ERROR = ErrorCode.INVALID_OFFSET ATT_INSUFFICIENT_AUTHORIZATION_ERROR = ErrorCode.INSUFFICIENT_AUTHORIZATION ATT_PREPARE_QUEUE_FULL_ERROR = ErrorCode.PREPARE_QUEUE_FULL ATT_ATTRIBUTE_NOT_FOUND_ERROR = ErrorCode.ATTRIBUTE_NOT_FOUND ATT_ATTRIBUTE_NOT_LONG_ERROR = ErrorCode.ATTRIBUTE_NOT_LONG ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR = ErrorCode.INSUFFICIENT_ENCRYPTION_KEY_SIZE ATT_INVALID_ATTRIBUTE_LENGTH_ERROR = ErrorCode.INVALID_ATTRIBUTE_LENGTH ATT_UNLIKELY_ERROR_ERROR = ErrorCode.UNLIKELY_ERROR ATT_INSUFFICIENT_ENCRYPTION_ERROR = ErrorCode.INSUFFICIENT_ENCRYPTION ATT_UNSUPPORTED_GROUP_TYPE_ERROR = ErrorCode.UNSUPPORTED_GROUP_TYPE ATT_INSUFFICIENT_RESOURCES_ERROR = ErrorCode.INSUFFICIENT_RESOURCES ATT_DEFAULT_MTU = 23 HANDLE_FIELD_SPEC = {'size': 2, 'mapper': lambda x: f'0x{x:04X}'} # fmt: on # pylint: enable=line-too-long # pylint: disable=invalid-name # ----------------------------------------------------------------------------- # Exceptions # ----------------------------------------------------------------------------- class ATT_Error(ProtocolError): error_code: int att_handle: int def __init__( self, error_code: int, att_handle: int = 0x0000, message: str = '' ) -> None: super().__init__( error_code, error_namespace='att', error_name=ErrorCode(error_code).name, ) self.att_handle = att_handle self.message = message def __str__(self): return ( f'ATT_Error(error={self.error_name}, ' f'handle={self.att_handle:04X}): {self.message}' ) # ----------------------------------------------------------------------------- # Attribute Protocol # ----------------------------------------------------------------------------- @dataclasses.dataclass class ATT_PDU: ''' See Bluetooth spec @ Vol 3, Part F - 3.3 ATTRIBUTE PDU ''' pdu_classes: ClassVar[dict[int, type[ATT_PDU]]] = {} fields: ClassVar[hci.Fields] = () op_code: int = dataclasses.field(init=False) name: str = dataclasses.field(init=False) _payload: Optional[bytes] = dataclasses.field(default=None, init=False) @classmethod def from_bytes(cls, pdu: bytes) -> ATT_PDU: op_code = pdu[0] subclass = ATT_PDU.pdu_classes.get(op_code) if subclass is None: instance = ATT_PDU() instance.op_code = op_code instance.payload = pdu[1:] instance.name = Opcode(op_code).name return instance instance = subclass(**HCI_Object.dict_from_bytes(pdu, 1, subclass.fields)) instance.payload = pdu[1:] return instance _PDU = TypeVar("_PDU", bound="ATT_PDU") @classmethod def subclass(cls, subclass: type[_PDU]) -> type[_PDU]: subclass.name = subclass.__name__.upper() subclass.op_code = Opcode[subclass.name] subclass.fields = HCI_Object.fields_from_dataclass(subclass) # Register a factory for this class ATT_PDU.pdu_classes[subclass.op_code] = subclass return subclass def init_from_bytes(self, pdu, offset): return HCI_Object.init_from_bytes(self, pdu, offset, self.fields) @property def is_command(self): return ((self.op_code >> 6) & 1) == 1 @property def has_authentication_signature(self): return ((self.op_code >> 7) & 1) == 1 @property def payload(self) -> bytes: if self._payload is None: self._payload = HCI_Object.dict_to_bytes(self.__dict__, self.fields) return self._payload @payload.setter def payload(self, value: bytes): self._payload = value def __bytes__(self) -> bytes: return bytes([self.op_code]) + self.payload def __str__(self): result = color(self.name, 'yellow') if fields := getattr(self, 'fields', None): result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, ' ') else: if self.payload: result += f': {self.payload.hex()}' return result # ----------------------------------------------------------------------------- @ATT_PDU.subclass @dataclasses.dataclass class ATT_Error_Response(ATT_PDU): ''' See Bluetooth spec @ Vol 3, Part F - 3.4.1.1 Error Response ''' request_opcode_in_error: int = dataclasses.field(metadata=Opcode.type_metadata(1)) attribute_handle_in_error: int = dataclasses.field( metadata=hci.metadata(HANDLE_FIELD_SPEC) ) error_code: int = dataclasses.field(metadata=ErrorCode.type_metadata(1)) # ----------------------------------------------------------------------------- @ATT_PDU.subclass @dataclasses.dataclass class ATT_Exchange_MTU_Request(ATT_PDU): ''' See Bluetooth spec @ Vol 3, Part F - 3.4.2.1 Exchange MTU Request ''' client_rx_mtu: int = dataclasses.field(metadata=hci.metadata(2)) # ----------------------------------------------------------------------------- @ATT_PDU.subclass @dataclasses.dataclass class ATT_Exchange_MTU_Response(ATT_PDU): ''' See Bluetooth spec @ Vol 3, Part F - 3.4.2.2 Exchange MTU Response ''' server_rx_mtu: int = dataclasses.field(metadata=hci.metadata(2)) # ----------------------------------------------------------------------------- @ATT_PDU.subclass @dataclasses.dataclass class ATT_Find_Information_Request(ATT_PDU): ''' See Bluetooth spec @ Vol 3, Part F - 3.4.3.1 Find Information Request ''' starting_handle: int = dataclasses.field(metadata=hci.metadata(HANDLE_FIELD_SPEC)) ending_handle: int = dataclasses.field(metadata=hci.metadata(HANDLE_FIELD_SPEC)) # ----------------------------------------------------------------------------- @ATT_PDU.subclass @dataclasses.dataclass class ATT_Find_Information_Response(ATT_PDU): ''' See Bluetooth spec @ Vol 3, Part F - 3.4.3.2 Find Information Response ''' format: int = dataclasses.field(metadata=hci.metadata(1)) information_data: bytes = dataclasses.field(metadata=hci.metadata("*")) information: list[tuple[int, bytes]] = dataclasses.field(init=False) def __post_init__(self) -> None: self.information = [] offset = 0 uuid_size = 2 if self.format == 1 else 16 while offset + uuid_size <= len(self.information_data): handle = struct.unpack_from(' None: self.handles_information = [] offset = 0 while offset + 4 <= len(self.handles_information_list): found_attribute_handle, group_end_handle = struct.unpack_from( ' None: self.attributes = [] offset = 0 while self.length != 0 and offset + self.length <= len( self.attribute_data_list ): (attribute_handle,) = struct.unpack_from( ' None: self.attributes = [] offset = 0 while self.length != 0 and offset + self.length <= len( self.attribute_data_list ): attribute_handle, end_group_handle = struct.unpack_from( ' Union[_T, Awaitable[_T]]: if self._read is None: raise InvalidOperationError('AttributeValue has no read function') return self._read(connection) def write(self, connection: Connection, value: _T) -> Union[Awaitable[None], None]: if self._write is None: raise InvalidOperationError('AttributeValue has no write function') return self._write(connection, value) # ----------------------------------------------------------------------------- class Attribute(utils.EventEmitter, Generic[_T]): class Permissions(enum.IntFlag): READABLE = 0x01 WRITEABLE = 0x02 READ_REQUIRES_ENCRYPTION = 0x04 WRITE_REQUIRES_ENCRYPTION = 0x08 READ_REQUIRES_AUTHENTICATION = 0x10 WRITE_REQUIRES_AUTHENTICATION = 0x20 READ_REQUIRES_AUTHORIZATION = 0x40 WRITE_REQUIRES_AUTHORIZATION = 0x80 @classmethod def from_string(cls, permissions_str: str) -> Attribute.Permissions: try: return functools.reduce( lambda x, y: x | Attribute.Permissions[y], permissions_str.replace('|', ',').split(","), Attribute.Permissions(0), ) except TypeError as exc: # The check for `p.name is not None` here is needed because for InFlag # enums, the .name property can be None, when the enum value is 0, # so the type hint for .name is Optional[str]. enum_list: list[str] = [p.name for p in cls if p.name is not None] enum_list_str = ",".join(enum_list) raise TypeError( f"Attribute::permissions error:\nExpected a string containing any of the keys, separated by commas: {enum_list_str}\nGot: {permissions_str}" ) from exc # Permission flags(legacy-use only) READABLE = Permissions.READABLE WRITEABLE = Permissions.WRITEABLE READ_REQUIRES_ENCRYPTION = Permissions.READ_REQUIRES_ENCRYPTION WRITE_REQUIRES_ENCRYPTION = Permissions.WRITE_REQUIRES_ENCRYPTION READ_REQUIRES_AUTHENTICATION = Permissions.READ_REQUIRES_AUTHENTICATION WRITE_REQUIRES_AUTHENTICATION = Permissions.WRITE_REQUIRES_AUTHENTICATION READ_REQUIRES_AUTHORIZATION = Permissions.READ_REQUIRES_AUTHORIZATION WRITE_REQUIRES_AUTHORIZATION = Permissions.WRITE_REQUIRES_AUTHORIZATION EVENT_READ = "read" EVENT_WRITE = "write" value: Union[AttributeValue[_T], _T, None] def __init__( self, attribute_type: Union[str, bytes, UUID], permissions: Union[str, Attribute.Permissions], value: Union[AttributeValue[_T], _T, None] = None, ) -> None: utils.EventEmitter.__init__(self) self.handle = 0 self.end_group_handle = 0 if isinstance(permissions, str): self.permissions = Attribute.Permissions.from_string(permissions) else: self.permissions = permissions # Convert the type to a UUID object if it isn't already if isinstance(attribute_type, str): self.type = UUID(attribute_type) elif isinstance(attribute_type, bytes): self.type = UUID.from_bytes(attribute_type) else: self.type = attribute_type self.value = value def encode_value(self, value: _T) -> bytes: return value # type: ignore def decode_value(self, value: bytes) -> _T: return value # type: ignore async def read_value(self, connection: Connection) -> bytes: if ( (self.permissions & self.READ_REQUIRES_ENCRYPTION) and connection is not None and not connection.encryption ): raise ATT_Error( error_code=ATT_INSUFFICIENT_ENCRYPTION_ERROR, att_handle=self.handle ) if ( (self.permissions & self.READ_REQUIRES_AUTHENTICATION) and connection is not None and not connection.authenticated ): raise ATT_Error( error_code=ATT_INSUFFICIENT_AUTHENTICATION_ERROR, att_handle=self.handle ) if self.permissions & self.READ_REQUIRES_AUTHORIZATION: # TODO: handle authorization better raise ATT_Error( error_code=ATT_INSUFFICIENT_AUTHORIZATION_ERROR, att_handle=self.handle ) value: Union[_T, None] if isinstance(self.value, AttributeValue): try: read_value = self.value.read(connection) if inspect.isawaitable(read_value): value = await read_value else: value = read_value except ATT_Error as error: raise ATT_Error( error_code=error.error_code, att_handle=self.handle ) from error else: value = self.value self.emit(self.EVENT_READ, connection, b'' if value is None else value) return b'' if value is None else self.encode_value(value) async def write_value(self, connection: Connection, value: bytes) -> None: if ( (self.permissions & self.WRITE_REQUIRES_ENCRYPTION) and connection is not None and not connection.encryption ): raise ATT_Error( error_code=ATT_INSUFFICIENT_ENCRYPTION_ERROR, att_handle=self.handle ) if ( (self.permissions & self.WRITE_REQUIRES_AUTHENTICATION) and connection is not None and not connection.authenticated ): raise ATT_Error( error_code=ATT_INSUFFICIENT_AUTHENTICATION_ERROR, att_handle=self.handle ) if self.permissions & self.WRITE_REQUIRES_AUTHORIZATION: # TODO: handle authorization better raise ATT_Error( error_code=ATT_INSUFFICIENT_AUTHORIZATION_ERROR, att_handle=self.handle ) decoded_value = self.decode_value(value) if isinstance(self.value, AttributeValue): try: result = self.value.write(connection, decoded_value) if inspect.isawaitable(result): await result except ATT_Error as error: raise ATT_Error( error_code=error.error_code, att_handle=self.handle ) from error else: self.value = decoded_value self.emit(self.EVENT_WRITE, connection, decoded_value) def __repr__(self): if isinstance(self.value, bytes): value_str = self.value.hex() else: value_str = str(self.value) if value_str: value_string = f', value={self.value.hex()}' else: value_string = '' return ( f'Attribute(handle=0x{self.handle:04X}, ' f'type={self.type}, ' f'permissions={self.permissions}{value_string})' )