# Copyright 2021-2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ----------------------------------------------------------------------------- # ATT - Attribute Protocol # # See Bluetooth spec @ Vol 3, Part F # # ----------------------------------------------------------------------------- # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- from colors import color from pyee import EventEmitter from .core import * from .hci import * # ----------------------------------------------------------------------------- # Constants # ----------------------------------------------------------------------------- ATT_CID = 0x04 ATT_ERROR_RESPONSE = 0x01 ATT_EXCHANGE_MTU_REQUEST = 0x02 ATT_EXCHANGE_MTU_RESPONSE = 0x03 ATT_FIND_INFORMATION_REQUEST = 0x04 ATT_FIND_INFORMATION_RESPONSE = 0x05 ATT_FIND_BY_TYPE_VALUE_REQUEST = 0x06 ATT_FIND_BY_TYPE_VALUE_RESPONSE = 0x07 ATT_READ_BY_TYPE_REQUEST = 0x08 ATT_READ_BY_TYPE_RESPONSE = 0x09 ATT_READ_REQUEST = 0x0A ATT_READ_RESPONSE = 0x0B ATT_READ_BLOB_REQUEST = 0x0C ATT_READ_BLOB_RESPONSE = 0x0D ATT_READ_MULTIPLE_REQUEST = 0x0E ATT_READ_MULTIPLE_RESPONSE = 0x0F ATT_READ_BY_GROUP_TYPE_REQUEST = 0x10 ATT_READ_BY_GROUP_TYPE_RESPONSE = 0x11 ATT_WRITE_REQUEST = 0x12 ATT_WRITE_RESPONSE = 0x13 ATT_WRITE_COMMAND = 0x52 ATT_SIGNED_WRITE_COMMAND = 0xD2 ATT_PREPARE_WRITE_REQUEST = 0x16 ATT_PREPARE_WRITE_RESPONSE = 0x17 ATT_EXECUTE_WRITE_REQUEST = 0x18 ATT_EXECUTE_WRITE_RESPONSE = 0x19 ATT_HANDLE_VALUE_NOTIFICATION = 0x1B ATT_HANDLE_VALUE_INDICATION = 0x1D ATT_HANDLE_VALUE_CONFIRMATION = 0x1E ATT_PDU_NAMES = { ATT_ERROR_RESPONSE: 'ATT_ERROR_RESPONSE', ATT_EXCHANGE_MTU_REQUEST: 'ATT_EXCHANGE_MTU_REQUEST', ATT_EXCHANGE_MTU_RESPONSE: 'ATT_EXCHANGE_MTU_RESPONSE', ATT_FIND_INFORMATION_REQUEST: 'ATT_FIND_INFORMATION_REQUEST', ATT_FIND_INFORMATION_RESPONSE: 'ATT_FIND_INFORMATION_RESPONSE', ATT_FIND_BY_TYPE_VALUE_REQUEST: 'ATT_FIND_BY_TYPE_VALUE_REQUEST', ATT_FIND_BY_TYPE_VALUE_RESPONSE: 'ATT_FIND_BY_TYPE_VALUE_RESPONSE', ATT_READ_BY_TYPE_REQUEST: 'ATT_READ_BY_TYPE_REQUEST', ATT_READ_BY_TYPE_RESPONSE: 'ATT_READ_BY_TYPE_RESPONSE', ATT_READ_REQUEST: 'ATT_READ_REQUEST', ATT_READ_RESPONSE: 'ATT_READ_RESPONSE', ATT_READ_BLOB_REQUEST: 'ATT_READ_BLOB_REQUEST', ATT_READ_BLOB_RESPONSE: 'ATT_READ_BLOB_RESPONSE', ATT_READ_MULTIPLE_REQUEST: 'ATT_READ_MULTIPLE_REQUEST', ATT_READ_MULTIPLE_RESPONSE: 'ATT_READ_MULTIPLE_RESPONSE', ATT_READ_BY_GROUP_TYPE_REQUEST: 'ATT_READ_BY_GROUP_TYPE_REQUEST', ATT_READ_BY_GROUP_TYPE_RESPONSE: 'ATT_READ_BY_GROUP_TYPE_RESPONSE', ATT_WRITE_REQUEST: 'ATT_WRITE_REQUEST', ATT_WRITE_RESPONSE: 'ATT_WRITE_RESPONSE', ATT_WRITE_COMMAND: 'ATT_WRITE_COMMAND', ATT_SIGNED_WRITE_COMMAND: 'ATT_SIGNED_WRITE_COMMAND', ATT_PREPARE_WRITE_REQUEST: 'ATT_PREPARE_WRITE_REQUEST', ATT_PREPARE_WRITE_RESPONSE: 'ATT_PREPARE_WRITE_RESPONSE', ATT_EXECUTE_WRITE_REQUEST: 'ATT_EXECUTE_WRITE_REQUEST', ATT_EXECUTE_WRITE_RESPONSE: 'ATT_EXECUTE_WRITE_RESPONSE', ATT_HANDLE_VALUE_NOTIFICATION: 'ATT_HANDLE_VALUE_NOTIFICATION', ATT_HANDLE_VALUE_INDICATION: 'ATT_HANDLE_VALUE_INDICATION', ATT_HANDLE_VALUE_CONFIRMATION: 'ATT_HANDLE_VALUE_CONFIRMATION' } ATT_REQUESTS = [ ATT_EXCHANGE_MTU_REQUEST, ATT_FIND_INFORMATION_REQUEST, ATT_FIND_BY_TYPE_VALUE_REQUEST, ATT_READ_BY_TYPE_REQUEST, ATT_READ_REQUEST, ATT_READ_BLOB_REQUEST, ATT_READ_MULTIPLE_REQUEST, ATT_READ_BY_GROUP_TYPE_REQUEST, ATT_WRITE_REQUEST, ATT_PREPARE_WRITE_REQUEST, ATT_EXECUTE_WRITE_REQUEST ] ATT_RESPONSES = [ ATT_ERROR_RESPONSE, ATT_EXCHANGE_MTU_RESPONSE, ATT_FIND_INFORMATION_RESPONSE, ATT_FIND_BY_TYPE_VALUE_RESPONSE, ATT_READ_BY_TYPE_RESPONSE, ATT_READ_RESPONSE, ATT_READ_BLOB_RESPONSE, ATT_READ_MULTIPLE_RESPONSE, ATT_READ_BY_GROUP_TYPE_RESPONSE, ATT_WRITE_RESPONSE, ATT_PREPARE_WRITE_RESPONSE, ATT_EXECUTE_WRITE_RESPONSE ] ATT_INVALID_HANDLE_ERROR = 0x01 ATT_READ_NOT_PERMITTED_ERROR = 0x02 ATT_WRITE_NOT_PERMITTED_ERROR = 0x03 ATT_INVALID_PDU_ERROR = 0x04 ATT_INSUFFICIENT_AUTHENTICATION_ERROR = 0x05 ATT_REQUEST_NOT_SUPPORTED_ERROR = 0x06 ATT_INVALID_OFFSET_ERROR = 0x07 ATT_INSUFFICIENT_AUTHORIZATION_ERROR = 0x08 ATT_PREPARE_QUEUE_FULL_ERROR = 0x09 ATT_ATTRIBUTE_NOT_FOUND_ERROR = 0x0A ATT_ATTRIBUTE_NOT_LONG_ERROR = 0x0B ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR = 0x0C ATT_INVALID_ATTRIBUTE_LENGTH_ERROR = 0x0D ATT_UNLIKELY_ERROR_ERROR = 0x0E ATT_INSUFFICIENT_ENCRYPTION_ERROR = 0x0F ATT_UNSUPPORTED_GROUP_TYPE_ERROR = 0x10 ATT_INSUFFICIENT_RESOURCES_ERROR = 0x11 ATT_ERROR_NAMES = { ATT_INVALID_HANDLE_ERROR: 'ATT_INVALID_HANDLE_ERROR', ATT_READ_NOT_PERMITTED_ERROR: 'ATT_READ_NOT_PERMITTED_ERROR', ATT_WRITE_NOT_PERMITTED_ERROR: 'ATT_WRITE_NOT_PERMITTED_ERROR', ATT_INVALID_PDU_ERROR: 'ATT_INVALID_PDU_ERROR', ATT_INSUFFICIENT_AUTHENTICATION_ERROR: 'ATT_INSUFFICIENT_AUTHENTICATION_ERROR', ATT_REQUEST_NOT_SUPPORTED_ERROR: 'ATT_REQUEST_NOT_SUPPORTED_ERROR', ATT_INVALID_OFFSET_ERROR: 'ATT_INVALID_OFFSET_ERROR', ATT_INSUFFICIENT_AUTHORIZATION_ERROR: 'ATT_INSUFFICIENT_AUTHORIZATION_ERROR', ATT_PREPARE_QUEUE_FULL_ERROR: 'ATT_PREPARE_QUEUE_FULL_ERROR', ATT_ATTRIBUTE_NOT_FOUND_ERROR: 'ATT_ATTRIBUTE_NOT_FOUND_ERROR', ATT_ATTRIBUTE_NOT_LONG_ERROR: 'ATT_ATTRIBUTE_NOT_LONG_ERROR', ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR: 'ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR', ATT_INVALID_ATTRIBUTE_LENGTH_ERROR: 'ATT_INVALID_ATTRIBUTE_LENGTH_ERROR', ATT_UNLIKELY_ERROR_ERROR: 'ATT_UNLIKELY_ERROR_ERROR', ATT_INSUFFICIENT_ENCRYPTION_ERROR: 'ATT_INSUFFICIENT_ENCRYPTION_ERROR', ATT_UNSUPPORTED_GROUP_TYPE_ERROR: 'ATT_UNSUPPORTED_GROUP_TYPE_ERROR', ATT_INSUFFICIENT_RESOURCES_ERROR: 'ATT_INSUFFICIENT_RESOURCES_ERROR' } ATT_DEFAULT_MTU = 23 HANDLE_FIELD_SPEC = {'size': 2, 'mapper': lambda x: f'0x{x:04X}'} UUID_2_16_FIELD_SPEC = lambda x, y: UUID.parse_uuid(x, y) # noqa: E731 UUID_2_FIELD_SPEC = lambda x, y: UUID.parse_uuid_2(x, y) # noqa: E731 # ----------------------------------------------------------------------------- # Utils # ----------------------------------------------------------------------------- def key_with_value(dictionary, target_value): for key, value in dictionary.items(): if value == target_value: return key return None # ----------------------------------------------------------------------------- # Exceptions # ----------------------------------------------------------------------------- class ATT_Error(Exception): def __init__(self, error_code, att_handle=0x0000): self.error_code = error_code self.att_handle = att_handle def __str__(self): return f'ATT_Error({ATT_PDU.error_name(self.error_code)})' # ----------------------------------------------------------------------------- # Attribute Protocol # ----------------------------------------------------------------------------- class ATT_PDU: ''' See Bluetooth spec @ Vol 3, Part F - 3.3 ATTRIBUTE PDU ''' pdu_classes = {} op_code = 0 @staticmethod def from_bytes(pdu): op_code = pdu[0] cls = ATT_PDU.pdu_classes.get(op_code) if cls is None: instance = ATT_PDU(pdu) instance.name = ATT_PDU.pdu_name(op_code) instance.op_code = op_code return instance self = cls.__new__(cls) ATT_PDU.__init__(self, pdu) if hasattr(self, 'fields'): self.init_from_bytes(pdu, 1) return self @staticmethod def pdu_name(op_code): return name_or_number(ATT_PDU_NAMES, op_code, 2) @staticmethod def error_name(error_code): return name_or_number(ATT_ERROR_NAMES, error_code, 2) @staticmethod def subclass(fields): def inner(cls): cls.name = cls.__name__.upper() cls.op_code = key_with_value(ATT_PDU_NAMES, cls.name) if cls.op_code is None: raise KeyError(f'PDU name {cls.name} not found in ATT_PDU_NAMES') cls.fields = fields # Register a factory for this class ATT_PDU.pdu_classes[cls.op_code] = cls return cls return inner def __init__(self, pdu=None, **kwargs): if hasattr(self, 'fields') and kwargs: HCI_Object.init_from_fields(self, self.fields, kwargs) if pdu is None: pdu = bytes([self.op_code]) + HCI_Object.dict_to_bytes(kwargs, self.fields) self.pdu = pdu def init_from_bytes(self, pdu, offset): return HCI_Object.init_from_bytes(self, pdu, offset, self.fields) def to_bytes(self): return self.pdu @property def is_command(self): return ((self.op_code >> 6) & 1) == 1 @property def has_authentication_signature(self): return ((self.op_code >> 7) & 1) == 1 def __bytes__(self): return self.to_bytes() def __str__(self): result = color(self.name, 'yellow') if fields := getattr(self, 'fields', None): result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, ' ') else: if len(self.pdu) > 1: result += f': {self.pdu.hex()}' return result # ----------------------------------------------------------------------------- @ATT_PDU.subclass([ ('request_opcode_in_error', {'size': 1, 'mapper': ATT_PDU.pdu_name}), ('attribute_handle_in_error', HANDLE_FIELD_SPEC), ('error_code', {'size': 1, 'mapper': ATT_PDU.error_name}) ]) class ATT_Error_Response(ATT_PDU): ''' See Bluetooth spec @ Vol 3, Part F - 3.4.1.1 Error Response ''' # ----------------------------------------------------------------------------- @ATT_PDU.subclass([ ('client_rx_mtu', 2) ]) class ATT_Exchange_MTU_Request(ATT_PDU): ''' See Bluetooth spec @ Vol 3, Part F - 3.4.2.1 Exchange MTU Request ''' # ----------------------------------------------------------------------------- @ATT_PDU.subclass([ ('server_rx_mtu', 2) ]) class ATT_Exchange_MTU_Response(ATT_PDU): ''' See Bluetooth spec @ Vol 3, Part F - 3.4.2.2 Exchange MTU Response ''' # ----------------------------------------------------------------------------- @ATT_PDU.subclass([ ('starting_handle', HANDLE_FIELD_SPEC), ('ending_handle', HANDLE_FIELD_SPEC) ]) class ATT_Find_Information_Request(ATT_PDU): ''' See Bluetooth spec @ Vol 3, Part F - 3.4.3.1 Find Information Request ''' # ----------------------------------------------------------------------------- @ATT_PDU.subclass([ ('format', 1), ('information_data', '*') ]) class ATT_Find_Information_Response(ATT_PDU): ''' See Bluetooth spec @ Vol 3, Part F - 3.4.3.2 Find Information Response ''' def parse_information_data(self): self.information = [] offset = 0 uuid_size = 2 if self.format == 1 else 16 while offset + uuid_size <= len(self.information_data): handle = struct.unpack_from(' 0: value_string = f', value={self.value.hex()}' else: value_string = '' return f'Attribute(handle=0x{self.handle:04X}, type={self.type}, permissions={self.permissions}{value_string})'