# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for """LE Audio - Published Audio Capabilities Service""" # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- from __future__ import annotations import dataclasses import logging import struct from typing import Optional, Sequence, Union from bumble.profiles.bap import AudioLocation, CodecSpecificCapabilities, ContextType from bumble import gatt from bumble import gatt_client from bumble import hci # ----------------------------------------------------------------------------- # Logging # ----------------------------------------------------------------------------- logger = logging.getLogger(__name__) # ----------------------------------------------------------------------------- @dataclasses.dataclass class PacRecord: coding_format: hci.CodingFormat codec_specific_capabilities: Union[CodecSpecificCapabilities, bytes] # TODO: Parse Metadata metadata: bytes = b'' @classmethod def from_bytes(cls, data: bytes) -> PacRecord: offset, coding_format = hci.CodingFormat.parse_from_bytes(data, 0) codec_specific_capabilities_size = data[offset] offset += 1 codec_specific_capabilities_bytes = data[ offset : offset + codec_specific_capabilities_size ] offset += codec_specific_capabilities_size metadata_size = data[offset] metadata = data[offset : offset + metadata_size] codec_specific_capabilities: Union[CodecSpecificCapabilities, bytes] if coding_format.codec_id == hci.CodecID.VENDOR_SPECIFIC: codec_specific_capabilities = codec_specific_capabilities_bytes else: codec_specific_capabilities = CodecSpecificCapabilities.from_bytes( codec_specific_capabilities_bytes ) return PacRecord( coding_format=coding_format, codec_specific_capabilities=codec_specific_capabilities, metadata=metadata, ) def __bytes__(self) -> bytes: capabilities_bytes = bytes(self.codec_specific_capabilities) return ( bytes(self.coding_format) + bytes([len(capabilities_bytes)]) + capabilities_bytes + bytes([len(self.metadata)]) + self.metadata ) # ----------------------------------------------------------------------------- # Server # ----------------------------------------------------------------------------- class PublishedAudioCapabilitiesService(gatt.TemplateService): UUID = gatt.GATT_PUBLISHED_AUDIO_CAPABILITIES_SERVICE sink_pac: Optional[gatt.Characteristic] sink_audio_locations: Optional[gatt.Characteristic] source_pac: Optional[gatt.Characteristic] source_audio_locations: Optional[gatt.Characteristic] available_audio_contexts: gatt.Characteristic supported_audio_contexts: gatt.Characteristic def __init__( self, supported_source_context: ContextType, supported_sink_context: ContextType, available_source_context: ContextType, available_sink_context: ContextType, sink_pac: Sequence[PacRecord] = (), sink_audio_locations: Optional[AudioLocation] = None, source_pac: Sequence[PacRecord] = (), source_audio_locations: Optional[AudioLocation] = None, ) -> None: characteristics = [] self.supported_audio_contexts = gatt.Characteristic( uuid=gatt.GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC, properties=gatt.Characteristic.Properties.READ, permissions=gatt.Characteristic.Permissions.READABLE, value=struct.pack('