mirror of
https://github.com/google/bumble.git
synced 2026-05-10 04:18:03 +00:00
Add Metadata LTV serializer and adapt Unicast
This commit is contained in:
@@ -17,33 +17,67 @@
|
||||
# -----------------------------------------------------------------------------
|
||||
from __future__ import annotations
|
||||
import dataclasses
|
||||
from typing import List
|
||||
import struct
|
||||
from typing import List, Type
|
||||
from typing_extensions import Self
|
||||
|
||||
from bumble import utils
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Classes
|
||||
# -----------------------------------------------------------------------------
|
||||
@dataclasses.dataclass
|
||||
class Metadata:
|
||||
'''Bluetooth Assigned Numbers, Section 6.12.6 - Metadata LTV structures.
|
||||
|
||||
As Metadata fields may extend, and Spec doesn't forbid duplication, we don't parse
|
||||
Metadata into a key-value style dataclass here. Rather, we encourage users to parse
|
||||
again outside the lib.
|
||||
'''
|
||||
|
||||
class Tag(utils.OpenIntEnum):
|
||||
# fmt: off
|
||||
PREFERRED_AUDIO_CONTEXTS = 0x01
|
||||
STREAMING_AUDIO_CONTEXTS = 0x02
|
||||
PROGRAM_INFO = 0x03
|
||||
LANGUAGE = 0x04
|
||||
CCID_LIST = 0x05
|
||||
PARENTAL_RATING = 0x06
|
||||
PROGRAM_INFO_URI = 0x07
|
||||
AUDIO_ACTIVE_STATE = 0x08
|
||||
BROADCAST_AUDIO_IMMEDIATE_RENDERING_FLAG = 0x09
|
||||
ASSISTED_LISTENING_STREAM = 0x0A
|
||||
BROADCAST_NAME = 0x0B
|
||||
EXTENDED_METADATA = 0xFE
|
||||
VENDOR_SPECIFIC = 0xFF
|
||||
|
||||
@dataclasses.dataclass
|
||||
class Entry:
|
||||
tag: int
|
||||
tag: Metadata.Tag
|
||||
data: bytes
|
||||
|
||||
entries: List[Entry]
|
||||
@classmethod
|
||||
def from_bytes(cls: Type[Self], data: bytes) -> Self:
|
||||
return cls(tag=Metadata.Tag(data[0]), data=data[1:])
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return bytes([len(self.data) + 1, self.tag]) + self.data
|
||||
|
||||
entries: List[Entry] = dataclasses.field(default_factory=list)
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes) -> Self:
|
||||
def from_bytes(cls: Type[Self], data: bytes) -> Self:
|
||||
entries = []
|
||||
offset = 0
|
||||
length = len(data)
|
||||
while length >= 2:
|
||||
while offset < length:
|
||||
entry_length = data[offset]
|
||||
entry_tag = data[offset + 1]
|
||||
entry_data = data[offset + 2 : offset + 2 + entry_length - 1]
|
||||
entries.append(cls.Entry(entry_tag, entry_data))
|
||||
length -= entry_length
|
||||
offset += 1
|
||||
entries.append(cls.Entry.from_bytes(data[offset : offset + entry_length]))
|
||||
offset += entry_length
|
||||
|
||||
return cls(entries)
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return b''.join([bytes(entry) for entry in self.entries])
|
||||
|
||||
Reference in New Issue
Block a user