# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations

import pprint
import asyncio
import contextlib
import logging
import wave
import itertools
import struct
from typing import cast, Any, AsyncGenerator, Coroutine, List
import itertools  # NOTE(review): duplicate import (already imported above) — harmless, consider removing
import glob

try:
    import lc3  # type: ignore # pylint: disable=E0401
except ImportError as e:
    raise ImportError("Try `python -m pip install \".[lc3]\"`.") from e

from bumble.colors import color
from bumble import company_ids
from bumble import core
from bumble import gatt
from bumble import hci
from bumble.profiles import bap
from bumble.profiles import le_audio
from bumble.profiles import pbp
from bumble.profiles import bass
import bumble.device
import bumble.transport
import bumble.utils
import numpy as np  # for audio down-mix

from bumble.device import Host, BIGInfoAdvertisement, AdvertisingChannelMap
from bumble.audio import io as audio_io

from auracast import auracast_config
from auracast.utils.read_lc3_file import read_lc3_file
from auracast.utils.network_audio_receiver import NetworkAudioReceiverUncoded
from auracast.utils.webrtc_audio_input import WebRTCAudioInput

# Instantiate WebRTC audio input for streaming (can be used per-BIG or globally)


# modified from bumble
class ModWaveAudioInput(audio_io.ThreadedAudioInput):
    """Audio input that reads PCM samples from a .wav file.

    Differs from bumble's stock ``WaveAudioInput`` in that looping can be
    disabled via the public ``rewind`` attribute: when ``rewind`` is False,
    ``_read`` returns None at end-of-file (an end-of-stream sentinel for the
    caller) instead of rewinding and looping forever.
    """

    def __init__(self, filename: str) -> None:
        super().__init__()
        # Path of the .wav file; opened lazily in _open().
        self._filename = filename
        self._wav: wave.Wave_read | None = None
        # Running byte count; non-zero means at least one frame was delivered,
        # which distinguishes a genuine EOF from an empty file.
        self._bytes_read = 0
        # When True (the default), loop the file forever; when False, end the
        # stream at EOF.  Toggled externally (e.g. per-BIG "loop" config).
        self.rewind=True

    def _open(self) -> audio_io.PcmFormat:
        """Open the wave file and report its PCM format to the base class.

        Raises:
            ValueError: if the file is not 16-bit (the only width supported
                downstream by the LC3 encoding path).
        """
        self._wav = wave.open(self._filename, 'rb')
        if self._wav.getsampwidth() != 2:
            raise ValueError('sample width not supported')
        return audio_io.PcmFormat(
            audio_io.PcmFormat.Endianness.LITTLE,
            audio_io.PcmFormat.SampleType.INT16,
            self._wav.getframerate(),
            self._wav.getnchannels(),
        )

    def _read(self, frame_size: int) -> bytes | None:
        """Return up to ``frame_size`` PCM frames, looping at EOF unless
        ``self.rewind`` is False, in which case None signals end-of-stream."""
        if not self._wav:
            return b''
        pcm_samples = self._wav.readframes(frame_size)
        if not pcm_samples and self._bytes_read:
            if not self.rewind:
                # End-of-stream sentinel for the consumer (see streamer loop,
                # which stops a BIG when its input yields None).
                return None
            # Loop around.
            self._wav.rewind()
            self._bytes_read = 0
            pcm_samples = self._wav.readframes(frame_size)
        self._bytes_read += len(pcm_samples)
        return pcm_samples

    def _close(self) -> None:
        # Release the file handle; safe to call even if _open() never ran.
        if self._wav:
            self._wav.close()


# Monkey-patch bumble so every "file:" audio source uses the loop-controllable
# variant above instead of the stock implementation.
audio_io.WaveAudioInput = ModWaveAudioInput

# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)


# -----------------------------------------------------------------------------
# Low-latency SoundDevice input shim
# -----------------------------------------------------------------------------
class ModSoundDeviceAudioInput(audio_io.ThreadedAudioInput):
    """Audio input using sounddevice with explicit low-latency settings.

    This mirrors bumble.audio.io.SoundDeviceAudioInput but requests a small
    PortAudio blocksize and low latency to reduce buffering on PipeWire/ALSA.
    Tunables via env:
      - AURACAST_SD_BLOCKSIZE (default 64)
      - AURACAST_SD_LATENCY (default 'low', can be 'default', 'high' or seconds)
    """

    def __init__(self, device_name: str, pcm_format: audio_io.PcmFormat) -> None:
        super().__init__()
        # Numeric PortAudio device index; empty string selects the default
        # device (None).  NOTE(review): non-numeric names raise ValueError —
        # presumably callers always pass an index or ''; confirm.
        self._device = int(device_name) if device_name else None
        self._pcm_format = pcm_format
        self._stream = None

    def _open(self) -> audio_io.PcmFormat:
        """Open a RawInputStream with tight buffering and start capture.

        Returns a stereo INT16 format regardless of the capture channel
        count, to match bumble's original SoundDeviceAudioInput behavior
        (mono capture is duplicated to stereo in _read()).
        """
        import os
        import sounddevice  # type: ignore

        # Read tunables from the environment; fall back to safe defaults on
        # any malformed value rather than failing to open the device.
        try:
            blocksize = int(os.environ.get('AURACAST_SD_BLOCKSIZE', '64'))
        except Exception:
            blocksize = 64
        raw_latency = os.environ.get('AURACAST_SD_LATENCY', 'low')
        if raw_latency in ('low', 'high', 'default'):
            # PortAudio accepts these symbolic latency classes directly.
            latency: str | float = raw_latency
        else:
            try:
                # Otherwise interpret the value as latency in seconds.
                latency = float(raw_latency)
            except Exception:
                latency = 'low'

        # Create the raw input stream with tighter buffering
        self._stream = sounddevice.RawInputStream(
            samplerate=self._pcm_format.sample_rate,
            device=self._device,
            channels=self._pcm_format.channels,
            dtype='int16',
            blocksize=blocksize,
            latency=latency,
        )
        self._stream.start()
        # Report stereo output format to match Bumble's original behavior
        return audio_io.PcmFormat(
            audio_io.PcmFormat.Endianness.LITTLE,
            audio_io.PcmFormat.SampleType.INT16,
            self._pcm_format.sample_rate,
            2,
        )

    def _read(self, frame_size: int) -> bytes:
        """Read ``frame_size`` frames from the stream as little-endian INT16
        bytes, duplicating mono capture into stereo."""
        if not self._stream:
            return b''
        pcm_buffer, overflowed = self._stream.read(frame_size)
        if overflowed:
            # Capture could not keep up; samples were dropped by PortAudio.
            logging.warning("input overflow")
        # Duplicate mono to stereo for downstream expectations
        if self._pcm_format.channels == 1:
            stereo_buffer = bytearray()
            for i in range(frame_size):
                # 2 bytes per INT16 sample; emit each sample twice (L, R).
                sample = pcm_buffer[i * 2 : i * 2 + 2]
                stereo_buffer += sample + sample
            return bytes(stereo_buffer)
        return bytes(pcm_buffer)

    def _close(self) -> None:
        # Stop capture and drop the stream handle; idempotent.
        if self._stream:
            self._stream.stop()
            self._stream = None


# Replace Bumble's default with the low-latency variant
audio_io.SoundDeviceAudioInput = ModSoundDeviceAudioInput


@contextlib.asynccontextmanager
async def create_device(config: auracast_config.AuracastGlobalConfig) -> AsyncGenerator[bumble.device.Device, Any]:
    """Open the configured HCI transport, power on a Bumble Device, and yield
    it.  The transport is closed automatically when the context exits."""
    async with await bumble.transport.open_transport(config.transport) as (
        hci_source,
        hci_sink,
    ):
        device_config = bumble.device.DeviceConfiguration(
            name=config.device_name,
            address= hci.Address(config.auracast_device_address),
            keystore='JsonKeyStore',
            #le_simultaneous_enabled=True #TODO: What is this doing ?
        )
        device = bumble.device.Device.from_config_with_hci(
            device_config,
            hci_source,
            hci_sink,
        )
        await device.power_on()
        yield device


def run_async(async_command: Coroutine) -> None:
    """Run ``async_command`` to completion, translating BASS application
    errors into readable names and printing any ProtocolError in red."""
    try:
        asyncio.run(async_command)
    except core.ProtocolError as error:
        if error.error_namespace == 'att' and error.error_code in list(
            bass.ApplicationError
        ):
            # Map the raw ATT application error code to its BASS enum name.
            message = bass.ApplicationError(error.error_code).name
        else:
            message = str(error)
        print(
            color('!!! An error occurred while executing the command:', 'red'), message
        )


async def init_broadcast(
    device: bumble.device.Device,
    global_config : auracast_config.AuracastGlobalConfig,
    big_config: List[auracast_config.AuracastBigConfig]
) -> dict:
    """Build per-BIG broadcast state (announcements, advertising, audio I/O)
    for each configured BIG and return it keyed as 'big0', 'big1', ...

    Args:
        device: powered-on Bumble device used for advertising / BIG creation.
        global_config: settings shared by all BIGs (sampling rate, QoS, ...).
        big_config: one entry per BIG (language, program info, audio source).
    """
    # e.g. 16000 -> bap.SamplingFrequency.FREQ_16000
    bap_sampling_freq = getattr(bap.SamplingFrequency, f"FREQ_{global_config.auracast_sampling_rate_hz}")
    bigs = {}
    for i, conf in enumerate(big_config):
        # LE Audio metadata advertised for this BIG: language, program info,
        # broadcast name, and optionally the immediate-rendering flag.
        metadata=le_audio.Metadata(
            [
                le_audio.Metadata.Entry(
                    tag=le_audio.Metadata.Tag.LANGUAGE, data=conf.language.encode()
                ),
                le_audio.Metadata.Entry(
                    tag=le_audio.Metadata.Tag.PROGRAM_INFO, data=conf.program_info.encode()
                ),
                le_audio.Metadata.Entry(
                    tag=le_audio.Metadata.Tag.BROADCAST_NAME, data=conf.name.encode()
                ),
            ]
            + (
                [
                    # Broadcast Audio Immediate Rendering flag (type 0x09), zero-length value
                    le_audio.Metadata.Entry(tag = le_audio.Metadata.Tag.BROADCAST_AUDIO_IMMEDIATE_RENDERING_FLAG, data=b"")
                ]
                if global_config.immediate_rendering
                else []
            )
        )
        logging.info( metadata.pretty_print("\n") )
        bigs[f'big{i}'] = {}
        # Config advertising set
        bigs[f'big{i}']['basic_audio_announcement'] = bap.BasicAudioAnnouncement(
            presentation_delay=global_config.presentation_delay_us,
            subgroups=[
                bap.BasicAudioAnnouncement.Subgroup(
codec_id=hci.CodingFormat(codec_id=hci.CodecID.LC3), codec_specific_configuration=bap.CodecSpecificConfiguration( sampling_frequency=bap_sampling_freq, frame_duration=bap.FrameDuration.DURATION_10000_US, octets_per_codec_frame=global_config.octets_per_frame, ), metadata=metadata, bis=[ bap.BasicAudioAnnouncement.BIS( index=1, codec_specific_configuration=bap.CodecSpecificConfiguration( audio_channel_allocation=bap.AudioLocation.FRONT_LEFT ), ), ], ) ], ) logger.info('Setup Advertising') advertising_manufacturer_data = ( b'' if global_config.manufacturer_data == (None, None) else bytes( core.AdvertisingData( [ ( core.AdvertisingData.MANUFACTURER_SPECIFIC_DATA, struct.pack('` captures) has no `.rewind`. if hasattr(audio_input, "rewind"): audio_input.rewind = big_config[i].loop # Retry logic – ALSA sometimes keeps the device busy for a short time after the # previous stream has closed. Handle PortAudioError -9985 with back-off retries. import sounddevice as _sd max_attempts = 3 for attempt in range(1, max_attempts + 1): try: pcm_format = await audio_input.open() break # success except _sd.PortAudioError as err: # -9985 == paDeviceUnavailable logging.error('Could not open audio device %s with error %s', audio_source, err) code = None if hasattr(err, 'errno'): code = err.errno elif len(err.args) > 1 and isinstance(err.args[1], int): code = err.args[1] if code == -9985 and attempt < max_attempts: backoff_ms = 200 * attempt logging.warning("PortAudio device busy (attempt %d/%d). 
Retrying in %.1f ms…", attempt, max_attempts, backoff_ms) # ensure device handle and PortAudio context are closed before retrying try: if hasattr(audio_input, "aclose"): await audio_input.aclose() elif hasattr(audio_input, "close"): audio_input.close() except Exception: pass # Fully terminate PortAudio to drop lingering handles (sounddevice quirk) if hasattr(_sd, "_terminate"): try: _sd._terminate() except Exception: pass # Small pause then re-initialize PortAudio await asyncio.sleep(0.1) if hasattr(_sd, "_initialize"): try: _sd._initialize() except Exception: pass # Back-off before next attempt await asyncio.sleep(backoff_ms / 1000) # Recreate audio_input fresh for next attempt audio_input = await audio_io.create_audio_input(audio_source, input_format) continue # Other errors or final attempt – re-raise so caller can abort gracefully raise else: # Loop exhausted without break logging.error("Unable to open audio device after %d attempts – giving up", max_attempts) return if pcm_format.channels != 1: logging.info("Input device provides %d channels – will down-mix to mono for LC3", pcm_format.channels) if pcm_format.sample_type == audio_io.PcmFormat.SampleType.INT16: pcm_bit_depth = 16 elif pcm_format.sample_type == audio_io.PcmFormat.SampleType.FLOAT32: pcm_bit_depth = None else: logging.error("Only INT16 and FLOAT32 sample types are supported") return encoder = lc3.Encoder( frame_duration_us=global_config.frame_duration_us, sample_rate_hz=global_config.auracast_sampling_rate_hz, num_channels=1, input_sample_rate_hz=pcm_format.sample_rate, ) lc3_frame_samples = encoder.get_frame_samples() # number of the pcm samples per lc3 frame big['pcm_bit_depth'] = pcm_bit_depth big['channels'] = pcm_format.channels big['lc3_frame_samples'] = lc3_frame_samples big['lc3_bytes_per_frame'] = global_config.octets_per_frame big['audio_input'] = audio_input big['encoder'] = encoder big['precoded'] = False logging.info("Streaming audio...") bigs = self.bigs self.is_streaming = True # 
One streamer fits all while self.is_streaming: stream_finished = [False for _ in range(len(bigs))] for i, big in enumerate(bigs.values()): if big['precoded']:# everything was already lc3 coded beforehand lc3_frame = bytes( itertools.islice(big['lc3_frames'], big['lc3_bytes_per_frame']) ) if lc3_frame == b'': # Not all streams may stop at the same time stream_finished[i] = True continue else: # code lc3 on the fly pcm_frame = await anext(big['audio_input'].frames(big['lc3_frame_samples']), None) if pcm_frame is None: # Not all streams may stop at the same time stream_finished[i] = True continue # Down-mix multi-channel PCM to mono for LC3 encoder if needed if big.get('channels', 1) > 1: if isinstance(pcm_frame, np.ndarray): if pcm_frame.ndim > 1: mono = pcm_frame.mean(axis=1).astype(pcm_frame.dtype) pcm_frame = mono else: # Convert raw bytes to numpy, average channels, convert back dtype = np.int16 if big['pcm_bit_depth'] == 16 else np.float32 samples = np.frombuffer(pcm_frame, dtype=dtype) samples = samples.reshape(-1, big['channels']).mean(axis=1) pcm_frame = samples.astype(dtype).tobytes() lc3_frame = big['encoder'].encode( pcm_frame, num_bytes=big['lc3_bytes_per_frame'], bit_depth=big['pcm_bit_depth'] ) await big['iso_queue'].write(lc3_frame) if all(stream_finished): # Take into account that multiple files have different lengths logging.info('All streams finished, stopping streamer') self.is_streaming = False break # ----------------------------------------------------------------------------- # Main # ----------------------------------------------------------------------------- async def broadcast(global_conf: auracast_config.AuracastGlobalConfig, big_conf: List[auracast_config.AuracastBigConfig]): """Start a broadcast.""" if global_conf.transport == 'auto': devices = glob.glob('/dev/serial/by-id/*') logging.info('Found serial devices: %s', devices) for device in devices: if 'usb-ZEPHYR_Zephyr_HCI_UART_sample' in device: logging.info('Using: %s', device) 
            # Found a Zephyr HCI UART sample board — use it as the controller.
            global_conf.transport = f'serial:{device},115200,rtscts'
            break
    # check again if transport is still auto
    if global_conf.transport == 'auto':
        raise AssertionError('No suitable transport found.')

    async with create_device(global_conf) as device:
        # Broadcast isochronous groups require periodic advertising support
        # in the controller; bail out early if it is missing.
        if not device.supports_le_periodic_advertising:
            logger.error(color('Periodic advertising not supported', 'red'))
            return

        bigs = await init_broadcast(
            # the bigs dictionary contains all the global configurations
            device,
            global_conf,
            big_conf
        )
        # One Streamer drives all BIGs; wait until it finishes streaming.
        streamer = Streamer(bigs, global_conf, big_conf)
        streamer.start_streaming()
        await asyncio.wait([streamer.task])


# -----------------------------------------------------------------------------
if __name__ == "__main__":
    import os

    logging.basicConfig(
        # export LOG_LEVEL=DEBUG to raise verbosity
        level=os.environ.get('LOG_LEVEL', logging.INFO),
        format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
    )
    # Run relative to this file so config/audio paths resolve predictably.
    os.chdir(os.path.dirname(__file__))

    config = auracast_config.AuracastConfigGroup(
        bigs = [
            auracast_config.AuracastBigConfigDeu(),
            #auracast_config.AuracastBigConfigEng(),
            #auracast_config.AuracastBigConfigFra(),
            #auracast_config.AuracastBigConfigEs(),
            #auracast_config.AuracastBigConfigIt(),
        ]
    )

    # TODO: How can we use other iso interval than 10ms (medium or low rel)?
    #       - nrf53audio receiver reports I2S tx underrun
    config.qos_config=auracast_config.AuracastQosHigh()

    # Alternative transports kept for reference:
    #config.transport='serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_81BD14B8D71B5662-if00,1000000,rtscts' # transport for nrf52 dongle
    #config.transport='serial:/dev/serial/by-id/usb-SEGGER_J-Link_001050076061-if02,1000000,rtscts' # transport for nrf53dk
    #config.transport='serial:/dev/serial/by-id/usb-SEGGER_J-Link_001057705357-if02,1000000,rtscts' # transport for nrf54l15dk
    #config.transport='serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_95A087EADB030B24-if00,115200,rtscts' #nrf52dongle hci_uart usb cdc
    #config.transport='usb:2fe3:000b' #nrf52dongle hci_usb # TODO: iso packet over usb not supported
    #config.transport= 'auto'
    config.transport='serial:/dev/ttyAMA4,1000000,rtscts' # transport for raspberry pi

    # TODO: encrypted streams are not working
    for big in config.bigs:
        #big.code = 'ff'*16 # returns hci/HCI_ENCRYPTION_MODE_NOT_ACCEPTABLE_ERROR
        #big.code = '78 e5 dc f1 34 ab 42 bf c1 92 ef dd 3a fd 67 ae'
        big.precode_wav = False
        #big.audio_source = big.audio_source.replace('.wav', '_10_16_32.lc3') #lc3 precoded files
        #big.audio_source = read_lc3_file(big.audio_source) # load files in advance
        # --- Network_uncoded mode using NetworkAudioReceiver ---
        #big.audio_source = NetworkAudioReceiverUncoded(port=50007, samplerate=16000, channels=1, chunk_size=1024)

    # 16kHz works reliably with 3 streams
    # 24kHz is only working with 2 streams - probably airtime constraint
    # TODO: with more than three broadcasters (16kHz) no advertising (no primary channels is present anymore)
    # TODO: find the bottleneck - probably airtime
    # TODO: test encrypted streams
    config.auracast_sampling_rate_hz = 16000
    config.octets_per_frame = 40  # 32kbps@16kHz
    #config.debug = True

    run_async( broadcast( config, config.bigs ) )

    # TODO: possible inputs:
    #   wav file locally
    #   precoded lc3 file locally
    #   realtime audio locally
    #   realtime audio network lc3 coded
    #   (realtime audio network uncoded)