forked from auracaster/bumble_mirror
attempt to fix pandora test (+3 squashed commits)
Squashed commits: [759372d] address PR comments [2f2a275] wip [cc86b98] wip wip address PR comments attempt to fix pandora test
This commit is contained in:
@@ -25,6 +25,7 @@ import struct
|
||||
import functools
|
||||
import logging
|
||||
from typing import Optional, List, Union, Type, Dict, Any, Tuple
|
||||
from typing_extensions import Self
|
||||
|
||||
from bumble import core
|
||||
from bumble import colors
|
||||
@@ -32,6 +33,8 @@ from bumble import device
|
||||
from bumble import hci
|
||||
from bumble import gatt
|
||||
from bumble import gatt_client
|
||||
from bumble import utils
|
||||
from bumble.profiles import le_audio
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -115,7 +118,7 @@ class ContextType(enum.IntFlag):
|
||||
EMERGENCY_ALARM = 0x0800
|
||||
|
||||
|
||||
class SamplingFrequency(enum.IntEnum):
|
||||
class SamplingFrequency(utils.OpenIntEnum):
|
||||
'''Bluetooth Assigned Numbers, Section 6.12.5.1 - Sampling Frequency'''
|
||||
|
||||
# fmt: off
|
||||
@@ -240,7 +243,7 @@ class SupportedFrameDuration(enum.IntFlag):
|
||||
DURATION_10000_US_PREFERRED = 0b0010
|
||||
|
||||
|
||||
class AnnouncementType(enum.IntEnum):
|
||||
class AnnouncementType(utils.OpenIntEnum):
|
||||
'''Basic Audio Profile, 3.5.3. Additional Audio Stream Control Service requirements'''
|
||||
|
||||
# fmt: off
|
||||
@@ -613,7 +616,7 @@ class CodecSpecificConfiguration:
|
||||
* Basic Audio Profile, 4.3.2 - Codec_Specific_Capabilities LTV requirements
|
||||
'''
|
||||
|
||||
class Type(enum.IntEnum):
|
||||
class Type(utils.OpenIntEnum):
|
||||
# fmt: off
|
||||
SAMPLING_FREQUENCY = 0x01
|
||||
FRAME_DURATION = 0x02
|
||||
@@ -725,6 +728,99 @@ class PacRecord:
|
||||
)
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class BroadcastAudioAnnouncement:
|
||||
broadcast_id: int
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes) -> Self:
|
||||
return cls(int.from_bytes(data[:3], 'little'))
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class BasicAudioAnnouncement:
|
||||
@dataclasses.dataclass
|
||||
class BIS:
|
||||
index: int
|
||||
codec_specific_configuration: CodecSpecificConfiguration
|
||||
|
||||
@dataclasses.dataclass
|
||||
class CodecInfo:
|
||||
coding_format: hci.CodecID
|
||||
company_id: int
|
||||
vendor_specific_codec_id: int
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes) -> Self:
|
||||
coding_format = hci.CodecID(data[0])
|
||||
company_id = int.from_bytes(data[1:3], 'little')
|
||||
vendor_specific_codec_id = int.from_bytes(data[3:5], 'little')
|
||||
return cls(coding_format, company_id, vendor_specific_codec_id)
|
||||
|
||||
@dataclasses.dataclass
|
||||
class Subgroup:
|
||||
codec_id: BasicAudioAnnouncement.CodecInfo
|
||||
codec_specific_configuration: CodecSpecificConfiguration
|
||||
metadata: le_audio.Metadata
|
||||
bis: List[BasicAudioAnnouncement.BIS]
|
||||
|
||||
presentation_delay: int
|
||||
subgroups: List[BasicAudioAnnouncement.Subgroup]
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes) -> Self:
|
||||
presentation_delay = int.from_bytes(data[:3], 'little')
|
||||
subgroups = []
|
||||
offset = 4
|
||||
for _ in range(data[3]):
|
||||
num_bis = data[offset]
|
||||
offset += 1
|
||||
codec_id = cls.CodecInfo.from_bytes(data[offset : offset + 5])
|
||||
offset += 5
|
||||
codec_specific_configuration_length = data[offset]
|
||||
offset += 1
|
||||
codec_specific_configuration = data[
|
||||
offset : offset + codec_specific_configuration_length
|
||||
]
|
||||
offset += codec_specific_configuration_length
|
||||
metadata_length = data[offset]
|
||||
offset += 1
|
||||
metadata = le_audio.Metadata.from_bytes(
|
||||
data[offset : offset + metadata_length]
|
||||
)
|
||||
offset += metadata_length
|
||||
|
||||
bis = []
|
||||
for _ in range(num_bis):
|
||||
bis_index = data[offset]
|
||||
offset += 1
|
||||
bis_codec_specific_configuration_length = data[offset]
|
||||
offset += 1
|
||||
bis_codec_specific_configuration = data[
|
||||
offset : offset + bis_codec_specific_configuration_length
|
||||
]
|
||||
offset += bis_codec_specific_configuration_length
|
||||
bis.append(
|
||||
cls.BIS(
|
||||
bis_index,
|
||||
CodecSpecificConfiguration.from_bytes(
|
||||
bis_codec_specific_configuration
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
subgroups.append(
|
||||
cls.Subgroup(
|
||||
codec_id,
|
||||
CodecSpecificConfiguration.from_bytes(codec_specific_configuration),
|
||||
metadata,
|
||||
bis,
|
||||
)
|
||||
)
|
||||
|
||||
return cls(presentation_delay, subgroups)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Server
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -744,9 +840,9 @@ class PublishedAudioCapabilitiesService(gatt.TemplateService):
|
||||
supported_sink_context: ContextType,
|
||||
available_source_context: ContextType,
|
||||
available_sink_context: ContextType,
|
||||
sink_pac: Sequence[PacRecord] = [],
|
||||
sink_pac: Sequence[PacRecord] = (),
|
||||
sink_audio_locations: Optional[AudioLocation] = None,
|
||||
source_pac: Sequence[PacRecord] = [],
|
||||
source_pac: Sequence[PacRecord] = (),
|
||||
source_audio_locations: Optional[AudioLocation] = None,
|
||||
) -> None:
|
||||
characteristics = []
|
||||
|
||||
Reference in New Issue
Block a user