Migrate ATT PDU to dataclasses

This commit is contained in:
Josh Wu
2025-07-02 13:46:40 +08:00
parent a275c399a3
commit 17563e423a
4 changed files with 423 additions and 431 deletions

View File

@@ -24,6 +24,7 @@
# -----------------------------------------------------------------------------
from __future__ import annotations
import dataclasses
import enum
import functools
import inspect
@@ -34,13 +35,16 @@ from typing import (
Generic,
TypeVar,
Union,
ClassVar,
Optional,
TYPE_CHECKING,
)
from bumble import hci
from bumble import utils
from bumble.core import UUID, name_or_number, InvalidOperationError, ProtocolError
from bumble.hci import HCI_Object, key_with_value
from bumble.core import UUID, InvalidOperationError, ProtocolError
from bumble.hci import HCI_Object
from bumble.colors import color
# -----------------------------------------------------------------------------
@@ -60,96 +64,66 @@ _T = TypeVar('_T')
ATT_CID = 0x04
ATT_PSM = 0x001F
ATT_ERROR_RESPONSE = 0x01
ATT_EXCHANGE_MTU_REQUEST = 0x02
ATT_EXCHANGE_MTU_RESPONSE = 0x03
ATT_FIND_INFORMATION_REQUEST = 0x04
ATT_FIND_INFORMATION_RESPONSE = 0x05
ATT_FIND_BY_TYPE_VALUE_REQUEST = 0x06
ATT_FIND_BY_TYPE_VALUE_RESPONSE = 0x07
ATT_READ_BY_TYPE_REQUEST = 0x08
ATT_READ_BY_TYPE_RESPONSE = 0x09
ATT_READ_REQUEST = 0x0A
ATT_READ_RESPONSE = 0x0B
ATT_READ_BLOB_REQUEST = 0x0C
ATT_READ_BLOB_RESPONSE = 0x0D
ATT_READ_MULTIPLE_REQUEST = 0x0E
ATT_READ_MULTIPLE_RESPONSE = 0x0F
ATT_READ_BY_GROUP_TYPE_REQUEST = 0x10
ATT_READ_BY_GROUP_TYPE_RESPONSE = 0x11
ATT_WRITE_REQUEST = 0x12
ATT_WRITE_RESPONSE = 0x13
ATT_WRITE_COMMAND = 0x52
ATT_SIGNED_WRITE_COMMAND = 0xD2
ATT_PREPARE_WRITE_REQUEST = 0x16
ATT_PREPARE_WRITE_RESPONSE = 0x17
ATT_EXECUTE_WRITE_REQUEST = 0x18
ATT_EXECUTE_WRITE_RESPONSE = 0x19
ATT_HANDLE_VALUE_NOTIFICATION = 0x1B
ATT_HANDLE_VALUE_INDICATION = 0x1D
ATT_HANDLE_VALUE_CONFIRMATION = 0x1E
ATT_PDU_NAMES = {
ATT_ERROR_RESPONSE: 'ATT_ERROR_RESPONSE',
ATT_EXCHANGE_MTU_REQUEST: 'ATT_EXCHANGE_MTU_REQUEST',
ATT_EXCHANGE_MTU_RESPONSE: 'ATT_EXCHANGE_MTU_RESPONSE',
ATT_FIND_INFORMATION_REQUEST: 'ATT_FIND_INFORMATION_REQUEST',
ATT_FIND_INFORMATION_RESPONSE: 'ATT_FIND_INFORMATION_RESPONSE',
ATT_FIND_BY_TYPE_VALUE_REQUEST: 'ATT_FIND_BY_TYPE_VALUE_REQUEST',
ATT_FIND_BY_TYPE_VALUE_RESPONSE: 'ATT_FIND_BY_TYPE_VALUE_RESPONSE',
ATT_READ_BY_TYPE_REQUEST: 'ATT_READ_BY_TYPE_REQUEST',
ATT_READ_BY_TYPE_RESPONSE: 'ATT_READ_BY_TYPE_RESPONSE',
ATT_READ_REQUEST: 'ATT_READ_REQUEST',
ATT_READ_RESPONSE: 'ATT_READ_RESPONSE',
ATT_READ_BLOB_REQUEST: 'ATT_READ_BLOB_REQUEST',
ATT_READ_BLOB_RESPONSE: 'ATT_READ_BLOB_RESPONSE',
ATT_READ_MULTIPLE_REQUEST: 'ATT_READ_MULTIPLE_REQUEST',
ATT_READ_MULTIPLE_RESPONSE: 'ATT_READ_MULTIPLE_RESPONSE',
ATT_READ_BY_GROUP_TYPE_REQUEST: 'ATT_READ_BY_GROUP_TYPE_REQUEST',
ATT_READ_BY_GROUP_TYPE_RESPONSE: 'ATT_READ_BY_GROUP_TYPE_RESPONSE',
ATT_WRITE_REQUEST: 'ATT_WRITE_REQUEST',
ATT_WRITE_RESPONSE: 'ATT_WRITE_RESPONSE',
ATT_WRITE_COMMAND: 'ATT_WRITE_COMMAND',
ATT_SIGNED_WRITE_COMMAND: 'ATT_SIGNED_WRITE_COMMAND',
ATT_PREPARE_WRITE_REQUEST: 'ATT_PREPARE_WRITE_REQUEST',
ATT_PREPARE_WRITE_RESPONSE: 'ATT_PREPARE_WRITE_RESPONSE',
ATT_EXECUTE_WRITE_REQUEST: 'ATT_EXECUTE_WRITE_REQUEST',
ATT_EXECUTE_WRITE_RESPONSE: 'ATT_EXECUTE_WRITE_RESPONSE',
ATT_HANDLE_VALUE_NOTIFICATION: 'ATT_HANDLE_VALUE_NOTIFICATION',
ATT_HANDLE_VALUE_INDICATION: 'ATT_HANDLE_VALUE_INDICATION',
ATT_HANDLE_VALUE_CONFIRMATION: 'ATT_HANDLE_VALUE_CONFIRMATION'
}
class Opcode(hci.SpecableEnum):
ATT_ERROR_RESPONSE = 0x01
ATT_EXCHANGE_MTU_REQUEST = 0x02
ATT_EXCHANGE_MTU_RESPONSE = 0x03
ATT_FIND_INFORMATION_REQUEST = 0x04
ATT_FIND_INFORMATION_RESPONSE = 0x05
ATT_FIND_BY_TYPE_VALUE_REQUEST = 0x06
ATT_FIND_BY_TYPE_VALUE_RESPONSE = 0x07
ATT_READ_BY_TYPE_REQUEST = 0x08
ATT_READ_BY_TYPE_RESPONSE = 0x09
ATT_READ_REQUEST = 0x0A
ATT_READ_RESPONSE = 0x0B
ATT_READ_BLOB_REQUEST = 0x0C
ATT_READ_BLOB_RESPONSE = 0x0D
ATT_READ_MULTIPLE_REQUEST = 0x0E
ATT_READ_MULTIPLE_RESPONSE = 0x0F
ATT_READ_BY_GROUP_TYPE_REQUEST = 0x10
ATT_READ_BY_GROUP_TYPE_RESPONSE = 0x11
ATT_WRITE_REQUEST = 0x12
ATT_WRITE_RESPONSE = 0x13
ATT_WRITE_COMMAND = 0x52
ATT_SIGNED_WRITE_COMMAND = 0xD2
ATT_PREPARE_WRITE_REQUEST = 0x16
ATT_PREPARE_WRITE_RESPONSE = 0x17
ATT_EXECUTE_WRITE_REQUEST = 0x18
ATT_EXECUTE_WRITE_RESPONSE = 0x19
ATT_HANDLE_VALUE_NOTIFICATION = 0x1B
ATT_HANDLE_VALUE_INDICATION = 0x1D
ATT_HANDLE_VALUE_CONFIRMATION = 0x1E
ATT_REQUESTS = [
ATT_EXCHANGE_MTU_REQUEST,
ATT_FIND_INFORMATION_REQUEST,
ATT_FIND_BY_TYPE_VALUE_REQUEST,
ATT_READ_BY_TYPE_REQUEST,
ATT_READ_REQUEST,
ATT_READ_BLOB_REQUEST,
ATT_READ_MULTIPLE_REQUEST,
ATT_READ_BY_GROUP_TYPE_REQUEST,
ATT_WRITE_REQUEST,
ATT_PREPARE_WRITE_REQUEST,
ATT_EXECUTE_WRITE_REQUEST
Opcode.ATT_EXCHANGE_MTU_REQUEST,
Opcode.ATT_FIND_INFORMATION_REQUEST,
Opcode.ATT_FIND_BY_TYPE_VALUE_REQUEST,
Opcode.ATT_READ_BY_TYPE_REQUEST,
Opcode.ATT_READ_REQUEST,
Opcode.ATT_READ_BLOB_REQUEST,
Opcode.ATT_READ_MULTIPLE_REQUEST,
Opcode.ATT_READ_BY_GROUP_TYPE_REQUEST,
Opcode.ATT_WRITE_REQUEST,
Opcode.ATT_PREPARE_WRITE_REQUEST,
Opcode.ATT_EXECUTE_WRITE_REQUEST
]
ATT_RESPONSES = [
ATT_ERROR_RESPONSE,
ATT_EXCHANGE_MTU_RESPONSE,
ATT_FIND_INFORMATION_RESPONSE,
ATT_FIND_BY_TYPE_VALUE_RESPONSE,
ATT_READ_BY_TYPE_RESPONSE,
ATT_READ_RESPONSE,
ATT_READ_BLOB_RESPONSE,
ATT_READ_MULTIPLE_RESPONSE,
ATT_READ_BY_GROUP_TYPE_RESPONSE,
ATT_WRITE_RESPONSE,
ATT_PREPARE_WRITE_RESPONSE,
ATT_EXECUTE_WRITE_RESPONSE
Opcode.ATT_ERROR_RESPONSE,
Opcode.ATT_EXCHANGE_MTU_RESPONSE,
Opcode.ATT_FIND_INFORMATION_RESPONSE,
Opcode.ATT_FIND_BY_TYPE_VALUE_RESPONSE,
Opcode.ATT_READ_BY_TYPE_RESPONSE,
Opcode.ATT_READ_RESPONSE,
Opcode.ATT_READ_BLOB_RESPONSE,
Opcode.ATT_READ_MULTIPLE_RESPONSE,
Opcode.ATT_READ_BY_GROUP_TYPE_RESPONSE,
Opcode.ATT_WRITE_RESPONSE,
Opcode.ATT_PREPARE_WRITE_RESPONSE,
Opcode.ATT_EXECUTE_WRITE_RESPONSE
]
class ErrorCode(utils.OpenIntEnum):
class ErrorCode(hci.SpecableEnum):
'''
See
@@ -204,10 +178,6 @@ ATT_INSUFFICIENT_RESOURCES_ERROR = ErrorCode.INSUFFICIENT_RESOURCES
ATT_DEFAULT_MTU = 23
HANDLE_FIELD_SPEC = {'size': 2, 'mapper': lambda x: f'0x{x:04X}'}
# pylint: disable-next=unnecessary-lambda-assignment,unnecessary-lambda
UUID_2_16_FIELD_SPEC = lambda x, y: UUID.parse_uuid(x, y)
# pylint: disable-next=unnecessary-lambda-assignment,unnecessary-lambda
UUID_2_FIELD_SPEC = lambda x, y: UUID.parse_uuid_2(x, y) # noqa: E731
# fmt: on
# pylint: enable=line-too-long
@@ -227,7 +197,7 @@ class ATT_Error(ProtocolError):
super().__init__(
error_code,
error_namespace='att',
error_name=ATT_PDU.error_name(error_code),
error_name=ErrorCode(error_code).name,
)
self.att_handle = att_handle
self.message = message
@@ -242,61 +212,45 @@ class ATT_Error(ProtocolError):
# -----------------------------------------------------------------------------
# Attribute Protocol
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class ATT_PDU:
'''
See Bluetooth spec @ Vol 3, Part F - 3.3 ATTRIBUTE PDU
'''
pdu_classes: dict[int, type[ATT_PDU]] = {}
op_code = 0
name: str
@staticmethod
def from_bytes(pdu):
op_code = pdu[0]
cls = ATT_PDU.pdu_classes.get(op_code)
if cls is None:
instance = ATT_PDU(pdu)
instance.name = ATT_PDU.pdu_name(op_code)
instance.op_code = op_code
return instance
self = cls.__new__(cls)
ATT_PDU.__init__(self, pdu)
if hasattr(self, 'fields'):
self.init_from_bytes(pdu, 1)
return self
@staticmethod
def pdu_name(op_code):
return name_or_number(ATT_PDU_NAMES, op_code, 2)
pdu_classes: ClassVar[dict[int, type[ATT_PDU]]] = {}
fields: ClassVar[hci.Fields] = ()
op_code: int = dataclasses.field(init=False)
name: str = dataclasses.field(init=False)
_payload: Optional[bytes] = dataclasses.field(default=None, init=False)
@classmethod
def error_name(cls, error_code: int) -> str:
return ErrorCode(error_code).name
def from_bytes(cls, pdu: bytes) -> ATT_PDU:
op_code = pdu[0]
@staticmethod
def subclass(fields):
def inner(cls):
cls.name = cls.__name__.upper()
cls.op_code = key_with_value(ATT_PDU_NAMES, cls.name)
if cls.op_code is None:
raise KeyError(f'PDU name {cls.name} not found in ATT_PDU_NAMES')
cls.fields = fields
subclass = ATT_PDU.pdu_classes.get(op_code)
if subclass is None:
instance = ATT_PDU()
instance.op_code = op_code
instance.payload = pdu[1:]
instance.name = Opcode(op_code).name
return instance
instance = subclass(**HCI_Object.dict_from_bytes(pdu, 1, subclass.fields))
instance.payload = pdu[1:]
return instance
# Register a factory for this class
ATT_PDU.pdu_classes[cls.op_code] = cls
_PDU = TypeVar("_PDU", bound="ATT_PDU")
return cls
@classmethod
def subclass(cls, subclass: type[_PDU]) -> type[_PDU]:
subclass.name = subclass.__name__.upper()
subclass.op_code = Opcode[subclass.name]
subclass.fields = HCI_Object.fields_from_dataclass(subclass)
return inner
# Register a factory for this class
ATT_PDU.pdu_classes[subclass.op_code] = subclass
def __init__(self, pdu=None, **kwargs):
if hasattr(self, 'fields') and kwargs:
HCI_Object.init_from_fields(self, self.fields, kwargs)
if pdu is None:
pdu = bytes([self.op_code]) + HCI_Object.dict_to_bytes(kwargs, self.fields)
self.pdu = pdu
return subclass
def init_from_bytes(self, pdu, offset):
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
@@ -309,67 +263,91 @@ class ATT_PDU:
def has_authentication_signature(self):
return ((self.op_code >> 7) & 1) == 1
def __bytes__(self):
return self.pdu
@property
def payload(self) -> bytes:
if self._payload is None:
self._payload = HCI_Object.dict_to_bytes(self.__dict__, self.fields)
return self._payload
@payload.setter
def payload(self, value: bytes):
self._payload = value
def __bytes__(self) -> bytes:
return bytes([self.op_code]) + self.payload
def __str__(self):
result = color(self.name, 'yellow')
if fields := getattr(self, 'fields', None):
result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, ' ')
else:
if len(self.pdu) > 1:
result += f': {self.pdu.hex()}'
if self.payload:
result += f': {self.payload.hex()}'
return result
# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
[
('request_opcode_in_error', {'size': 1, 'mapper': ATT_PDU.pdu_name}),
('attribute_handle_in_error', HANDLE_FIELD_SPEC),
('error_code', {'size': 1, 'mapper': ATT_PDU.error_name}),
]
)
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Error_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.1.1 Error Response
'''
request_opcode_in_error: int = dataclasses.field(metadata=Opcode.type_metadata(1))
attribute_handle_in_error: int = dataclasses.field(
metadata=hci.metadata(HANDLE_FIELD_SPEC)
)
error_code: int = dataclasses.field(metadata=ErrorCode.type_metadata(1))
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('client_rx_mtu', 2)])
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Exchange_MTU_Request(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.2.1 Exchange MTU Request
'''
client_rx_mtu: int = dataclasses.field(metadata=hci.metadata(2))
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('server_rx_mtu', 2)])
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Exchange_MTU_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.2.2 Exchange MTU Response
'''
server_rx_mtu: int = dataclasses.field(metadata=hci.metadata(2))
# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
[('starting_handle', HANDLE_FIELD_SPEC), ('ending_handle', HANDLE_FIELD_SPEC)]
)
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Find_Information_Request(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.3.1 Find Information Request
'''
starting_handle: int = dataclasses.field(metadata=hci.metadata(HANDLE_FIELD_SPEC))
ending_handle: int = dataclasses.field(metadata=hci.metadata(HANDLE_FIELD_SPEC))
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('format', 1), ('information_data', '*')])
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Find_Information_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.3.2 Find Information Response
'''
def parse_information_data(self):
format: int = dataclasses.field(metadata=hci.metadata(1))
information_data: bytes = dataclasses.field(metadata=hci.metadata("*"))
information: list[tuple[int, bytes]] = dataclasses.field(init=False)
def __post_init__(self) -> None:
self.information = []
offset = 0
uuid_size = 2 if self.format == 1 else 16
@@ -379,14 +357,6 @@ class ATT_Find_Information_Response(ATT_PDU):
self.information.append((handle, uuid))
offset += 2 + uuid_size
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.parse_information_data()
def init_from_bytes(self, pdu, offset):
super().init_from_bytes(pdu, offset)
self.parse_information_data()
def __str__(self):
result = color(self.name, 'yellow')
result += ':\n' + HCI_Object.format_fields(
@@ -408,28 +378,31 @@ class ATT_Find_Information_Response(ATT_PDU):
# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
[
('starting_handle', HANDLE_FIELD_SPEC),
('ending_handle', HANDLE_FIELD_SPEC),
('attribute_type', UUID_2_FIELD_SPEC),
('attribute_value', '*'),
]
)
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Find_By_Type_Value_Request(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.3.3 Find By Type Value Request
'''
starting_handle: int = dataclasses.field(metadata=hci.metadata(HANDLE_FIELD_SPEC))
ending_handle: int = dataclasses.field(metadata=hci.metadata(HANDLE_FIELD_SPEC))
attribute_type: UUID = dataclasses.field(metadata=hci.metadata(UUID.parse_uuid_2))
attribute_value: bytes = dataclasses.field(metadata=hci.metadata("*"))
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('handles_information_list', '*')])
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Find_By_Type_Value_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.3.4 Find By Type Value Response
'''
def parse_handles_information_list(self):
handles_information_list: bytes = dataclasses.field(metadata=hci.metadata("*"))
handles_information: list[tuple[int, int]] = dataclasses.field(init=False)
def __post_init__(self) -> None:
self.handles_information = []
offset = 0
while offset + 4 <= len(self.handles_information_list):
@@ -439,14 +412,6 @@ class ATT_Find_By_Type_Value_Response(ATT_PDU):
self.handles_information.append((found_attribute_handle, group_end_handle))
offset += 4
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.parse_handles_information_list()
def init_from_bytes(self, pdu, offset):
super().init_from_bytes(pdu, offset)
self.parse_handles_information_list()
def __str__(self):
result = color(self.name, 'yellow')
result += ':\n' + HCI_Object.format_fields(
@@ -470,27 +435,31 @@ class ATT_Find_By_Type_Value_Response(ATT_PDU):
# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
[
('starting_handle', HANDLE_FIELD_SPEC),
('ending_handle', HANDLE_FIELD_SPEC),
('attribute_type', UUID_2_16_FIELD_SPEC),
]
)
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Read_By_Type_Request(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.4.1 Read By Type Request
'''
starting_handle: int = dataclasses.field(metadata=hci.metadata(HANDLE_FIELD_SPEC))
ending_handle: int = dataclasses.field(metadata=hci.metadata(HANDLE_FIELD_SPEC))
attribute_type: UUID = dataclasses.field(metadata=hci.metadata(UUID.parse_uuid))
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('length', 1), ('attribute_data_list', '*')])
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Read_By_Type_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.4.2 Read By Type Response
'''
def parse_attribute_data_list(self):
length: int = dataclasses.field(metadata=hci.metadata(1))
attribute_data_list: bytes = dataclasses.field(metadata=hci.metadata("*"))
attributes: list[tuple[int, bytes]] = dataclasses.field(init=False)
def __post_init__(self) -> None:
self.attributes = []
offset = 0
while self.length != 0 and offset + self.length <= len(
@@ -505,14 +474,6 @@ class ATT_Read_By_Type_Response(ATT_PDU):
self.attributes.append((attribute_handle, attribute_value))
offset += self.length
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.parse_attribute_data_list()
def init_from_bytes(self, pdu, offset):
super().init_from_bytes(pdu, offset)
self.parse_attribute_data_list()
def __str__(self):
result = color(self.name, 'yellow')
result += ':\n' + HCI_Object.format_fields(
@@ -534,75 +495,100 @@ class ATT_Read_By_Type_Response(ATT_PDU):
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC)])
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Read_Request(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.4.3 Read Request
'''
attribute_handle: int = dataclasses.field(metadata=hci.metadata(HANDLE_FIELD_SPEC))
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_value', '*')])
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Read_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.4.4 Read Response
'''
attribute_value: bytes = dataclasses.field(metadata=hci.metadata("*"))
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('value_offset', 2)])
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Read_Blob_Request(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.4.5 Read Blob Request
'''
attribute_handle: int = dataclasses.field(metadata=hci.metadata(HANDLE_FIELD_SPEC))
value_offset: int = dataclasses.field(metadata=hci.metadata(2))
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('part_attribute_value', '*')])
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Read_Blob_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.4.6 Read Blob Response
'''
part_attribute_value: bytes = dataclasses.field(metadata=hci.metadata("*"))
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('set_of_handles', '*')])
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Read_Multiple_Request(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.4.7 Read Multiple Request
'''
set_of_handles: bytes = dataclasses.field(metadata=hci.metadata("*"))
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('set_of_values', '*')])
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Read_Multiple_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.4.8 Read Multiple Response
'''
set_of_values: bytes = dataclasses.field(metadata=hci.metadata("*"))
# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
[
('starting_handle', HANDLE_FIELD_SPEC),
('ending_handle', HANDLE_FIELD_SPEC),
('attribute_group_type', UUID_2_16_FIELD_SPEC),
]
)
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Read_By_Group_Type_Request(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.4.9 Read by Group Type Request
'''
starting_handle: int = dataclasses.field(metadata=hci.metadata(HANDLE_FIELD_SPEC))
ending_handle: int = dataclasses.field(metadata=hci.metadata(HANDLE_FIELD_SPEC))
attribute_group_type: UUID = dataclasses.field(
metadata=hci.metadata(UUID.parse_uuid)
)
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('length', 1), ('attribute_data_list', '*')])
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Read_By_Group_Type_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.4.10 Read by Group Type Response
'''
def parse_attribute_data_list(self):
length: int = dataclasses.field(metadata=hci.metadata(1))
attribute_data_list: bytes = dataclasses.field(metadata=hci.metadata("*"))
attributes: list[tuple[int, int, bytes]] = dataclasses.field(init=False)
def __post_init__(self) -> None:
self.attributes = []
offset = 0
while self.length != 0 and offset + self.length <= len(
@@ -619,14 +605,6 @@ class ATT_Read_By_Group_Type_Response(ATT_PDU):
)
offset += self.length
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.parse_attribute_data_list()
def init_from_bytes(self, pdu, offset):
super().init_from_bytes(pdu, offset)
self.parse_attribute_data_list()
def __str__(self):
result = color(self.name, 'yellow')
result += ':\n' + HCI_Object.format_fields(
@@ -651,15 +629,20 @@ class ATT_Read_By_Group_Type_Response(ATT_PDU):
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Write_Request(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.5.1 Write Request
'''
attribute_handle: int = dataclasses.field(metadata=hci.metadata(HANDLE_FIELD_SPEC))
attribute_value: bytes = dataclasses.field(metadata=hci.metadata("*"))
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Write_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.5.2 Write Response
@@ -667,65 +650,70 @@ class ATT_Write_Response(ATT_PDU):
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Write_Command(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.5.3 Write Command
'''
attribute_handle: int = dataclasses.field(metadata=hci.metadata(HANDLE_FIELD_SPEC))
attribute_value: bytes = dataclasses.field(metadata=hci.metadata("*"))
# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
[
('attribute_handle', HANDLE_FIELD_SPEC),
('attribute_value', '*'),
# ('authentication_signature', 'TODO')
]
)
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Signed_Write_Command(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.5.4 Signed Write Command
'''
attribute_handle: int = dataclasses.field(metadata=hci.metadata(HANDLE_FIELD_SPEC))
attribute_value: bytes = dataclasses.field(metadata=hci.metadata("*"))
# TODO: authentication_signature
# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
[
('attribute_handle', HANDLE_FIELD_SPEC),
('value_offset', 2),
('part_attribute_value', '*'),
]
)
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Prepare_Write_Request(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.6.1 Prepare Write Request
'''
attribute_handle: int = dataclasses.field(metadata=hci.metadata(HANDLE_FIELD_SPEC))
value_offset: int = dataclasses.field(metadata=hci.metadata(2))
part_attribute_value: bytes = dataclasses.field(metadata=hci.metadata("*"))
# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
[
('attribute_handle', HANDLE_FIELD_SPEC),
('value_offset', 2),
('part_attribute_value', '*'),
]
)
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Prepare_Write_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.6.2 Prepare Write Response
'''
attribute_handle: int = dataclasses.field(metadata=hci.metadata(HANDLE_FIELD_SPEC))
value_offset: int = dataclasses.field(metadata=hci.metadata(2))
part_attribute_value: bytes = dataclasses.field(metadata=hci.metadata("*"))
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([("flags", 1)])
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Execute_Write_Request(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.6.3 Execute Write Request
'''
flags: int = dataclasses.field(metadata=hci.metadata(1))
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Execute_Write_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.6.4 Execute Write Response
@@ -733,23 +721,32 @@ class ATT_Execute_Write_Response(ATT_PDU):
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Handle_Value_Notification(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.7.1 Handle Value Notification
'''
attribute_handle: int = dataclasses.field(metadata=hci.metadata(HANDLE_FIELD_SPEC))
attribute_value: bytes = dataclasses.field(metadata=hci.metadata("*"))
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Handle_Value_Indication(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.7.2 Handle Value Indication
'''
attribute_handle: int = dataclasses.field(metadata=hci.metadata(HANDLE_FIELD_SPEC))
attribute_value: bytes = dataclasses.field(metadata=hci.metadata("*"))
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Handle_Value_Confirmation(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.7.3 Handle Value Confirmation