# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- from __future__ import annotations import pprint import asyncio import contextlib import logging import wave import itertools import struct from typing import cast, Any, AsyncGenerator, Coroutine, Dict, Optional, Tuple, List import itertools try: import lc3 # type: ignore # pylint: disable=E0401 except ImportError as e: raise ImportError("Try `python -m pip install \".[lc3]\"`.") from e from bumble.colors import color from bumble import company_ids from bumble import core from bumble import gatt from bumble import hci from bumble.profiles import bap from bumble.profiles import le_audio from bumble.profiles import pbp from bumble.profiles import bass import bumble.device import bumble.transport import bumble.utils from bumble.device import Host, BIGInfoAdvertisement, AdvertisingChannelMap from bumble.audio import io as audio_io from auracast import auracast_config # modified from bumble class ModWaveAudioInput(audio_io.ThreadedAudioInput): """Audio input that reads PCM samples from a .wav file.""" def __init__(self, filename: str) -> None: super().__init__() self._filename = filename self._wav: wave.Wave_read | None = None self._bytes_read = 0 self.rewind=True def _open(self) -> audio_io.PcmFormat: self._wav = wave.open(self._filename, 'rb') if self._wav.getsampwidth() != 2: raise ValueError('sample width not supported') return audio_io.PcmFormat( audio_io.PcmFormat.Endianness.LITTLE, audio_io.PcmFormat.SampleType.INT16, self._wav.getframerate(), self._wav.getnchannels(), ) def _read(self, frame_size: int) -> bytes: if not self._wav: return b'' pcm_samples = self._wav.readframes(frame_size) if not pcm_samples and self._bytes_read: if not self.rewind: return None # Loop around. self._wav.rewind() self._bytes_read = 0 pcm_samples = self._wav.readframes(frame_size) self._bytes_read += len(pcm_samples) return pcm_samples def _close(self) -> None: if self._wav: self._wav.close() audio_io.WaveAudioInput = ModWaveAudioInput # ----------------------------------------------------------------------------- # Logging # ----------------------------------------------------------------------------- logger = logging.getLogger(__name__) @contextlib.asynccontextmanager async def create_device(config: auracast_config.AuracastGlobalConfig) -> AsyncGenerator[bumble.device.Device, Any]: async with await bumble.transport.open_transport(config.transport) as ( hci_source, hci_sink, ): device_config = bumble.device.DeviceConfiguration( name=config.device_name, address=config.auracast_device_address, keystore='JsonKeyStore', #le_simultaneous_enabled=True #TODO: What is this doing ? ) device = bumble.device.Device.from_config_with_hci( device_config, hci_source, hci_sink, ) await device.power_on() yield device def run_async(async_command: Coroutine) -> None: try: asyncio.run(async_command) except core.ProtocolError as error: if error.error_namespace == 'att' and error.error_code in list( bass.ApplicationError ): message = bass.ApplicationError(error.error_code).name else: message = str(error) print( color('!!! An error occurred while executing the command:', 'red'), message ) async def init_broadcast( device, global_config : auracast_config.AuracastGlobalConfig, big_config: List[auracast_config.AuracastBigConfig] ) -> dict: bap_sampling_freq = getattr(bap.SamplingFrequency, f"FREQ_{global_config.auracast_sampling_rate_hz}") bigs = {} for i, conf in enumerate(big_config): bigs[f'big{i}'] = {} # Config advertising set bigs[f'big{i}']['basic_audio_announcement'] = bap.BasicAudioAnnouncement( presentation_delay=global_config.presentation_delay_us, subgroups=[ bap.BasicAudioAnnouncement.Subgroup( codec_id=hci.CodingFormat(codec_id=hci.CodecID.LC3), codec_specific_configuration=bap.CodecSpecificConfiguration( sampling_frequency=bap_sampling_freq, frame_duration=bap.FrameDuration.DURATION_10000_US, octets_per_codec_frame=global_config.octets_per_frame, ), metadata=le_audio.Metadata( [ le_audio.Metadata.Entry( tag=le_audio.Metadata.Tag.LANGUAGE, data=conf.language.encode() ), le_audio.Metadata.Entry( tag=le_audio.Metadata.Tag.PROGRAM_INFO, data=conf.program_info.encode() ), le_audio.Metadata.Entry( tag=le_audio.Metadata.Tag.BROADCAST_NAME, data=conf.name.encode() ), ] ), bis=[ bap.BasicAudioAnnouncement.BIS( index=1, codec_specific_configuration=bap.CodecSpecificConfiguration( audio_channel_allocation=bap.AudioLocation.FRONT_LEFT ), ), ], ) ], ) logger.info('Setup Advertising') advertising_manufacturer_data = ( b'' if global_config.manufacturer_data is None else bytes( core.AdvertisingData( [ ( core.AdvertisingData.MANUFACTURER_SPECIFIC_DATA, struct.pack('