forked from auracaster/bumble_mirror
Dataclass-based packets
This commit is contained in:
@@ -107,6 +107,36 @@ def phy_list_to_bits(phys: Optional[Iterable[Phy]]) -> int:
|
||||
return phy_bits
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Field Metadata
|
||||
# -----------------------------------------------------------------------------
|
||||
# Field specification can be:
|
||||
# - a dict with "serializer", "parser", "size", "mapper" keys
|
||||
# - a callable that takes (packet, offset) and returns (new_offset, value) (deserialize only)
|
||||
# - a string of
|
||||
# - ">2" and ">4" for 2-byte and 4-byte big-endian integers
|
||||
# - "*" for all remaining bytes in the packet
|
||||
# - "v" for variable length bytes with a leading length byte
|
||||
# - an integer [1, 4] for 1-byte, 2-byte or 4-byte unsigned little-endian integers
|
||||
# - an integer [-2, -1] for 1-byte, 2-byte signed little-endian integers
|
||||
FieldSpec = Union[dict[str, Any], Callable[[bytes, int], tuple[int, Any]], str, int]
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class FieldMetadata:
|
||||
spec: FieldSpec
|
||||
list_begin: bool = False
|
||||
list_end: bool = False
|
||||
|
||||
|
||||
def metadata(
|
||||
spec: FieldSpec, list_begin: bool = False, list_end: bool = False
|
||||
) -> dict[str, Any]:
|
||||
return {
|
||||
"bumble.hci": FieldMetadata(spec=spec, list_begin=list_begin, list_end=list_end)
|
||||
}
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Constants
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -1659,7 +1689,7 @@ class HCI_Object:
|
||||
HCI_Object.init_from_fields(hci_object, parsed.keys(), parsed.values())
|
||||
|
||||
@staticmethod
|
||||
def parse_field(data, offset, field_type):
|
||||
def parse_field(data: bytes, offset: int, field_type: FieldSpec):
|
||||
# The field_type may be a dictionary with a mapper, parser, and/or size
|
||||
if isinstance(field_type, dict):
|
||||
if 'size' in field_type:
|
||||
@@ -1741,7 +1771,7 @@ class HCI_Object:
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def serialize_field(field_value, field_type):
|
||||
def serialize_field(field_value: Any, field_type: FieldSpec) -> bytes:
|
||||
# The field_type may be a dictionary with a mapper, parser, serializer,
|
||||
# and/or size
|
||||
serializer = None
|
||||
@@ -1932,6 +1962,24 @@ class HCI_Object:
|
||||
for field_name, field_value in field_strings
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def fields_from_dataclass(cls, obj: Any) -> list[Any]:
|
||||
stack: list[list[Any]] = [[]]
|
||||
for field in dataclasses.fields(obj):
|
||||
# Fields without metadata should be ignored.
|
||||
if not isinstance(
|
||||
(metadata := field.metadata.get("bumble.hci")), FieldMetadata
|
||||
):
|
||||
continue
|
||||
if metadata.list_begin:
|
||||
stack.append([])
|
||||
if metadata.spec:
|
||||
stack[-1].append((field.name, metadata.spec))
|
||||
if metadata.list_end:
|
||||
top = stack.pop()
|
||||
stack[-1].append(top)
|
||||
return stack[0]
|
||||
|
||||
def __init__(self, fields, **kwargs):
|
||||
self.fields = fields
|
||||
self.init_from_fields(self, fields, kwargs)
|
||||
|
||||
Reference in New Issue
Block a user