From 6d835bf1be1b75cfb100a8da3838cd9dddd5357d Mon Sep 17 00:00:00 2001 From: pstruebi Date: Wed, 8 Oct 2025 14:27:58 +0200 Subject: [PATCH 01/11] feat: add clock drift compensation to discard excess samples when buffer exceeds threshold --- src/auracast/auracast_config.py | 3 + src/auracast/multicast.py | 83 +++++++++++++++++++++++-- src/auracast/multicast_script.py | 11 ++-- src/auracast/server/multicast_server.py | 4 +- 4 files changed, 91 insertions(+), 10 deletions(-) diff --git a/src/auracast/auracast_config.py b/src/auracast/auracast_config.py index 2971dea..06d855f 100644 --- a/src/auracast/auracast_config.py +++ b/src/auracast/auracast_config.py @@ -40,6 +40,9 @@ class AuracastGlobalConfig(BaseModel): # so receivers may render earlier than the presentation delay for lower latency. immediate_rendering: bool = False assisted_listening_stream: bool = False + # Clock drift compensation: discard excess samples when buffer exceeds threshold + enable_drift_compensation: bool = False + drift_threshold_ms: float = 2.0 # Discard threshold in milliseconds # "Audio input. " # "'device' -> use the host's default sound input device, " diff --git a/src/auracast/multicast.py b/src/auracast/multicast.py index 93792e5..f606dcd 100644 --- a/src/auracast/multicast.py +++ b/src/auracast/multicast.py @@ -26,11 +26,9 @@ import struct from typing import cast, Any, AsyncGenerator, Coroutine, List import itertools import glob +import time -try: - import lc3 # type: ignore # pylint: disable=E0401 -except ImportError as e: - raise ImportError("Try `python -m pip install \".[lc3]\"`.") from e +import lc3 # type: ignore # pylint: disable=E0401 from bumble.colors import color from bumble import company_ids @@ -623,6 +621,25 @@ class Streamer(): logging.info("Streaming audio...") bigs = self.bigs self.is_streaming = True + + # Sample discard stats (clock drift compensation) + samples_discarded_total = 0 # Total samples discarded + discard_events = 0 # Number of times we discarded samples + enable_drift_compensation = global_config.enable_drift_compensation + # Calculate threshold based on config (default 2ms) + drift_threshold_ms = global_config.drift_threshold_ms if enable_drift_compensation else 0 + drop_threshold_samples = 0 + + if enable_drift_compensation: + logging.info(f"Clock drift compensation ENABLED: threshold={drift_threshold_ms}ms") + else: + logging.info("Clock drift compensation DISABLED") + + # Periodic monitoring + last_stats_log = time.perf_counter() + stats_interval = 5.0 # Log stats every 5 seconds + frame_count = 0 + # One streamer fits all while self.is_streaming: stream_finished = [False for _ in range(len(bigs))] @@ -641,12 +658,39 @@ class Streamer(): if frames_gen is None: frames_gen = big['audio_input'].frames(big['lc3_frame_samples']) big['frames_gen'] = frames_gen + + # Read the frame we need for encoding pcm_frame = await anext(frames_gen, None) if pcm_frame is None: # Not all streams may stop at the same time stream_finished[i] = True continue - + + # Calculate threshold samples based on sample rate (only once per BIG) + if enable_drift_compensation and drop_threshold_samples == 0: + sample_rate = big['audio_input']._pcm_format.sample_rate + drop_threshold_samples = int(sample_rate * drift_threshold_ms / 1000.0) + logging.info(f"Drift compensation threshold: {drop_threshold_samples} samples ({drift_threshold_ms}ms @ {sample_rate}Hz)") + + # Discard excess samples in buffer if above threshold (clock drift compensation) + if enable_drift_compensation and hasattr(big['audio_input'], '_stream') and big['audio_input']._stream: + sd_buffer_samples = big['audio_input']._stream.read_available + + if sd_buffer_samples > drop_threshold_samples: + # Discard ALL remaining samples to bring buffer back down + try: + discarded_data = big['audio_input']._stream.read(sd_buffer_samples) + samples_discarded_total += sd_buffer_samples + discard_events += 1 + + if discard_events % 100 == 0: # Log every 100th discard + logging.warning( + f"Discard #{discard_events}: {sd_buffer_samples} samples ({sd_buffer_samples / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms) " + f"| total discarded: {samples_discarded_total} samples" + ) + except Exception as e: + logging.error(f"Failed to discard samples: {e}") + # Down-mix multi-channel PCM to mono for LC3 encoder if needed if big.get('channels', 1) > 1: if isinstance(pcm_frame, np.ndarray): @@ -665,6 +709,35 @@ class Streamer(): ) await big['iso_queue'].write(lc3_frame) + frame_count += 1 + + # Periodic stats logging + now = time.perf_counter() + if now - last_stats_log >= stats_interval: + # Get current buffer status + current_sd_buffer = 0 + if hasattr(big['audio_input'], '_stream') and big['audio_input']._stream: + try: + current_sd_buffer = big['audio_input']._stream.read_available + except Exception: + pass + + if enable_drift_compensation: + avg_discard_per_event = (samples_discarded_total / discard_events) if discard_events > 0 else 0.0 + discard_event_rate = (discard_events / frame_count * 100) if frame_count > 0 else 0.0 + logging.info( + f"STATS: frames={frame_count} | discard_events={discard_events} ({discard_event_rate:.1f}%) | " + f"avg_discard={avg_discard_per_event:.0f} samples/event | " + f"SD_buffer={current_sd_buffer} samples ({current_sd_buffer / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms) | " + f"threshold={drop_threshold_samples} samples ({drop_threshold_samples / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms)" + ) + else: + logging.info( + f"STATS: frames={frame_count} | " + f"SD_buffer={current_sd_buffer} samples ({current_sd_buffer / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms) | " + f"drift_compensation=DISABLED" + ) + last_stats_log = now if all(stream_finished): # Take into account that multiple files have different lengths logging.info('All streams finished, stopping streamer') diff --git a/src/auracast/multicast_script.py b/src/auracast/multicast_script.py index 553cebe..b0eb1cf 100644 --- a/src/auracast/multicast_script.py +++ b/src/auracast/multicast_script.py @@ -61,7 +61,7 @@ if __name__ == "__main__": # Load .env located next to this script (only uppercase keys will be referenced) load_dotenv(dotenv_path='.env') - os.environ.setdefault("PULSE_LATENCY_MSEC", "3") + os.environ.setdefault("PULSE_LATENCY_MSEC", "2") # Refresh device cache and list inputs refresh_pw_cache() @@ -150,14 +150,17 @@ if __name__ == "__main__": ), #auracast_config.AuracastBigConfigEng(), ], - immediate_rendering=True, + immediate_rendering=False, presentation_delay_us=40000, qos_config=auracast_config.AuracastQosHigh(), auracast_sampling_rate_hz = LC3_SRATE, octets_per_frame = OCTETS_PER_FRAME, - transport=TRANSPORT1 + transport=TRANSPORT1, + enable_drift_compensation=True, + drift_threshold_ms=2.0 ) - #config.debug = True + config.debug = False + logging.info(config.model_dump_json(indent=2)) multicast.run_async( diff --git a/src/auracast/server/multicast_server.py b/src/auracast/server/multicast_server.py index a2b1b88..8c1c52e 100644 --- a/src/auracast/server/multicast_server.py +++ b/src/auracast/server/multicast_server.py @@ -42,7 +42,7 @@ TRANSPORT2 = os.getenv('TRANSPORT2', 'serial:/dev/ttyAMA4,1000000,rtscts') # tr PTIME = 40 # seems to have no effect at all pcs: Set[RTCPeerConnection] = set() # keep refs so they don’t GC early -os.environ["PULSE_LATENCY_MSEC"] = "3" +os.environ["PULSE_LATENCY_MSEC"] = "2" # In-memory cache to avoid disk I/O on hot paths like /status SETTINGS_CACHE: dict = {} @@ -172,7 +172,9 @@ class StreamerWorker: pass self._multicaster1 = None + # overwrite some configurations conf.transport = TRANSPORT1 + conf.enable_drift_compensation=True # Derive device name and input mode first_source = conf.bigs[0].audio_source if conf.bigs else '' input_device_name = None -- 2.52.0 From 329510beaead48edb95e20c50ff339f423c954b0 Mon Sep 17 00:00:00 2001 From: pstruebi Date: Wed, 22 Oct 2025 08:32:10 +0200 Subject: [PATCH 02/11] feat: add ALSA backend support and advanced audio metrics logging --- src/auracast/multicast.py | 192 +++++++++++++++++++++++- src/auracast/multicast_script.py | 88 +++-------- src/auracast/utils/sounddevice_utils.py | 65 +++++++- src/scripts/list_pw_nodes.py | 10 +- 4 files changed, 275 insertions(+), 80 deletions(-) diff --git a/src/auracast/multicast.py b/src/auracast/multicast.py index f606dcd..96dab16 100644 --- a/src/auracast/multicast.py +++ b/src/auracast/multicast.py @@ -311,8 +311,9 @@ async def init_broadcast( bigs[f'big{i}']['iso_queue'] = iso_queue - logging.info(f'big{i} parameters are:') - logging.info('%s', pprint.pformat(vars(big))) + if global_config.debug: + logging.info(f'big{i} parameters are:') + logging.info('%s', pprint.pformat(vars(big))) logging.info(f'Finished setup of big{i}.') await asyncio.sleep(i+1) # Wait for advertising to set up @@ -714,7 +715,7 @@ class Streamer(): # Periodic stats logging now = time.perf_counter() if now - last_stats_log >= stats_interval: - # Get current buffer status + # Get current buffer status from PortAudio current_sd_buffer = 0 if hasattr(big['audio_input'], '_stream') and big['audio_input']._stream: try: @@ -722,19 +723,90 @@ class Streamer(): except Exception: pass + # Get stream latency and CPU load from sounddevice + stream_latency_ms = None + cpu_load_pct = None + if hasattr(big['audio_input'], '_stream') and big['audio_input']._stream: + try: + latency = big['audio_input']._stream.latency + if frame_count == 501: # Debug log once + logging.info(f"DEBUG: stream.latency raw value = {latency}, type = {type(latency)}") + # latency can be either a float (for input-only streams) or tuple (input, output) + if latency is not None: + if isinstance(latency, (int, float)): + # Single value for input-only stream + stream_latency_ms = float(latency) * 1000.0 + elif isinstance(latency, (tuple, list)) and len(latency) >= 1: + # Tuple (input_latency, output_latency) + stream_latency_ms = latency[0] * 1000.0 + except Exception as e: + if frame_count == 501: # Log once at startup + logging.warning(f"Could not get stream.latency: {e}") + + try: + cpu_load = big['audio_input']._stream.cpu_load + if frame_count == 501: # Debug log once + logging.info(f"DEBUG: stream.cpu_load raw value = {cpu_load}") + # cpu_load is a fraction (0.0 to 1.0) + if cpu_load is not None and cpu_load >= 0: + cpu_load_pct = cpu_load * 100.0 # Convert to percentage + except Exception as e: + if frame_count == 501: # Log once at startup + logging.warning(f"Could not get stream.cpu_load: {e}") + + # Get backend-specific buffer status + backend_delay = None + backend_label = "Backend" + + # Determine which backend we're using based on audio_input device + try: + device_info = big['audio_input']._device if hasattr(big['audio_input'], '_device') else None + if device_info is not None and isinstance(device_info, int): + hostapi = sd.query_hostapis(sd.query_devices(device_info)['hostapi']) + backend_name = hostapi['name'] + else: + backend_name = "Unknown" + except Exception: + backend_name = "Unknown" + + if 'pulse' in backend_name.lower(): + # PipeWire/PulseAudio backend - no direct buffer access + # SD_buffer is the only reliable metric + backend_label = "PipeWire" + backend_delay = None # Cannot read PipeWire internal buffers directly + else: + # ALSA backend - can read kernel buffer + backend_label = "ALSA_kernel" + try: + with open('/proc/asound/card0/pcm0c/sub0/status', 'r') as f: + for line in f: + if 'delay' in line and ':' in line: + backend_delay = int(line.split(':')[1].strip()) + break + except Exception: + pass + if enable_drift_compensation: avg_discard_per_event = (samples_discarded_total / discard_events) if discard_events > 0 else 0.0 discard_event_rate = (discard_events / frame_count * 100) if frame_count > 0 else 0.0 + latency_str = f"stream_latency={stream_latency_ms:.2f} ms" if stream_latency_ms is not None else "stream_latency=N/A" + cpu_str = f"cpu_load={cpu_load_pct:.1f}%" if cpu_load_pct is not None else "cpu_load=N/A" logging.info( f"STATS: frames={frame_count} | discard_events={discard_events} ({discard_event_rate:.1f}%) | " f"avg_discard={avg_discard_per_event:.0f} samples/event | " f"SD_buffer={current_sd_buffer} samples ({current_sd_buffer / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms) | " + f"{latency_str} | {cpu_str} | " f"threshold={drop_threshold_samples} samples ({drop_threshold_samples / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms)" ) else: + backend_str = f"{backend_label}={backend_delay} samples ({backend_delay / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms)" if backend_delay is not None else f"{backend_label}=N/A (use pw-top)" + latency_str = f"stream_latency={stream_latency_ms:.2f} ms" if stream_latency_ms is not None else "stream_latency=N/A" + cpu_str = f"cpu_load={cpu_load_pct:.1f}%" if cpu_load_pct is not None else "cpu_load=N/A" logging.info( f"STATS: frames={frame_count} | " f"SD_buffer={current_sd_buffer} samples ({current_sd_buffer / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms) | " + f"{latency_str} | {cpu_str} | " + f"{backend_str} | " f"drift_compensation=DISABLED" ) last_stats_log = now @@ -791,6 +863,107 @@ if __name__ == "__main__": ) os.chdir(os.path.dirname(__file__)) + # ============================================================================= + # AUDIO BACKEND CONFIGURATION - Toggle between ALSA and PipeWire + # ============================================================================= + # Uncomment ONE of the following backend configurations: + + # Option 1: Direct ALSA (Direct hardware access, bypasses PipeWire) + AUDIO_BACKEND = 'ALSA' + target_latency_ms = 10.0 + + # Option 2: PipeWire via PulseAudio API (Routes through pipewire-pulse) + #AUDIO_BACKEND = 'PipeWire' + #target_latency_ms = 5.0 # PipeWire typically handles lower latency better + + # ============================================================================= + + import sounddevice as sd + import subprocess + + # Detect if PipeWire is running (even if we're using ALSA API) + pipewire_running = False + try: + result = subprocess.run(['systemctl', '--user', 'is-active', 'pipewire'], + capture_output=True, text=True, timeout=1) + pipewire_running = (result.returncode == 0) + except Exception: + pass + + if AUDIO_BACKEND == 'ALSA': + os.environ['SDL_AUDIODRIVER'] = 'alsa' + sd.default.latency = target_latency_ms / 1000.0 + + # Find ALSA host API + try: + alsa_hostapi = next(i for i, ha in enumerate(sd.query_hostapis()) + if 'ALSA' in ha['name']) + logging.info(f"ALSA host API available at index: {alsa_hostapi}") + except StopIteration: + logging.error("ALSA backend not found!") + + elif AUDIO_BACKEND == 'PipeWire': + os.environ['SDL_AUDIODRIVER'] = 'pulseaudio' + sd.default.latency = target_latency_ms / 1000.0 + + if not pipewire_running: + logging.error("PipeWire selected but not running!") + raise RuntimeError("PipeWire is not active") + + # Find PulseAudio host API (required for PipeWire mode) + try: + pulse_hostapi = next(i for i, ha in enumerate(sd.query_hostapis()) + if 'pulse' in ha['name'].lower()) + logging.info(f"Using PulseAudio host API at index: {pulse_hostapi} → routes to PipeWire") + except StopIteration: + logging.error("PulseAudio host API not found! Did you rebuild PortAudio with -DPA_USE_PULSEAUDIO=ON?") + raise RuntimeError("PulseAudio API not available in PortAudio") + else: + logging.error(f"Unknown AUDIO_BACKEND: {AUDIO_BACKEND}") + raise ValueError(f"Invalid AUDIO_BACKEND: {AUDIO_BACKEND}") + + # Select audio input device based on backend + shure_device_idx = None + + if AUDIO_BACKEND == 'ALSA': + # Use ALSA devices + from auracast.utils.sounddevice_utils import get_alsa_usb_inputs + devices = get_alsa_usb_inputs() + logging.info("Searching ALSA devices for Shure MVX2U...") + + for idx, dev in devices: + logging.info(f" ALSA device [{idx}]: {dev['name']} ({dev['max_input_channels']} ch)") + if 'shure' in dev['name'].lower() and 'mvx2u' in dev['name'].lower(): + shure_device_idx = idx + logging.info(f"✓ Selected ALSA device {idx}: {dev['name']}") + break + + elif AUDIO_BACKEND == 'PipeWire': + # Use PulseAudio devices (routed through PipeWire) + logging.info("Searching PulseAudio devices for Shure MVX2U...") + + for idx, dev in enumerate(sd.query_devices()): + # Only consider PulseAudio input devices + if dev['max_input_channels'] > 0: + hostapi = sd.query_hostapis(dev['hostapi']) + if 'pulse' in hostapi['name'].lower(): + dev_name_lower = dev['name'].lower() + logging.info(f" PulseAudio device [{idx}]: {dev['name']} ({dev['max_input_channels']} ch)") + + # Skip monitor devices (they're output monitors, not real inputs) + if 'monitor' in dev_name_lower: + continue + + # Look for Shure MVX2U - prefer "Mono" device for mono input + if 'shure' in dev_name_lower and 'mvx2u' in dev_name_lower: + shure_device_idx = idx + logging.info(f"✓ Selected PulseAudio device {idx}: {dev['name']} → routes to PipeWire") + break + + if shure_device_idx is None: + logging.error(f"Shure MVX2U not found in {AUDIO_BACKEND} devices!") + raise RuntimeError(f"Audio device not found for {AUDIO_BACKEND} backend") + config = auracast_config.AuracastConfigGroup( bigs = [ auracast_config.AuracastBigConfigDeu(), @@ -821,6 +994,16 @@ if __name__ == "__main__": #big.audio_source = big.audio_source.replace('.wav', '_10_16_32.lc3') #lc3 precoded files #big.audio_source = read_lc3_file(big.audio_source) # load files in advance + # --- Configure Shure MVX2U USB Audio Interface (ALSA backend) --- + if shure_device_idx is not None: + big.audio_source = f'device:{shure_device_idx}' # Shure MVX2U USB mono interface + big.input_format = 'int16le,48000,1' # int16, 48kHz, mono + logging.info(f"Configured BIG '{big.name}' with Shure MVX2U (device:{shure_device_idx}, 48kHz mono)") + else: + logging.warning(f"Shure device not found, BIG '{big.name}' will use default audio_source: {big.audio_source}") + + big.name='Broadcast0' + big.iso_que_len=1 # --- Network_uncoded mode using NetworkAudioReceiver --- #big.audio_source = NetworkAudioReceiverUncoded(port=50007, samplerate=16000, channels=1, chunk_size=1024) @@ -833,6 +1016,9 @@ if __name__ == "__main__": config.auracast_sampling_rate_hz = 16000 config.octets_per_frame = 40 # 32kbps@16kHz #config.debug = True + + # Enable clock drift compensation to prevent latency accumulation + # With ~43 samples/sec drift (0.89ms/sec), threshold of 2ms will trigger every ~2.2 seconds run_async( broadcast( diff --git a/src/auracast/multicast_script.py b/src/auracast/multicast_script.py index b0eb1cf..9b4ea56 100644 --- a/src/auracast/multicast_script.py +++ b/src/auracast/multicast_script.py @@ -5,19 +5,13 @@ multicast_script Loads environment variables from a .env file located next to this script and configures the multicast broadcast. Only UPPERCASE keys are read. +This version uses ALSA backend for USB audio devices (no PipeWire required). + Environment variables --------------------- - LOG_LEVEL: Logging level for the script. Default: INFO. Examples: DEBUG, INFO, WARNING, ERROR. -- INPUT: Select audio capture source. - Values: - - "usb" (default): first available USB input device. - - "aes67": select AES67 inputs. Two forms: - * INPUT=aes67 -> first available AES67 input. - * INPUT=aes67, -> case-insensitive substring match against - the device name, e.g. INPUT=aes67,8f6326. - - BROADCAST_NAME: Name of the broadcast (Auracast BIG name). Default: "Broadcast0". @@ -27,16 +21,16 @@ Environment variables - LANGUATE: ISO 639-3 language code used by config (intentional key name). Default: "deu". -- PULSE_LATENCY_MSEC: Pulse/PipeWire latency hint in milliseconds. - Default: 3. +- ALSA_LATENCY_MSEC: ALSA latency hint in milliseconds. + Default: 2. Examples (.env) --------------- LOG_LEVEL=DEBUG -INPUT=aes67,8f6326 BROADCAST_NAME=MyBroadcast PROGRAM_INFO="Live announcements" LANGUATE=deu +ALSA_LATENCY_MSEC=2 """ import logging import os @@ -45,9 +39,7 @@ from dotenv import load_dotenv from auracast import multicast from auracast import auracast_config from auracast.utils.sounddevice_utils import ( - get_usb_pw_inputs, - get_network_pw_inputs, - refresh_pw_cache, + get_alsa_usb_inputs, ) @@ -60,65 +52,29 @@ if __name__ == "__main__": os.chdir(os.path.dirname(__file__)) # Load .env located next to this script (only uppercase keys will be referenced) load_dotenv(dotenv_path='.env') - - os.environ.setdefault("PULSE_LATENCY_MSEC", "2") + # Default tight ALSA latency (ms); can be overridden via environment + os.environ.setdefault('ALSA_LATENCY_MSEC', '2') - # Refresh device cache and list inputs - refresh_pw_cache() - usb_inputs = get_usb_pw_inputs() - logging.info("USB pw inputs:") + # List USB ALSA inputs + usb_inputs = get_alsa_usb_inputs() + logging.info("USB ALSA inputs:") for i, d in usb_inputs: logging.info(f"{i}: {d['name']} in={d['max_input_channels']}") - - aes67_inputs = get_network_pw_inputs() - logging.info("AES67 pw inputs:") - for i, d in aes67_inputs: - logging.info(f"{i}: {d['name']} in={d['max_input_channels']}") - # Input selection (usb | aes67). Default to usb. - # Allows specifying an AES67 device by substring: INPUT=aes67, - # Example: INPUT=aes67,8f6326 will match a device name containing "8f6326". - input_env = os.environ.get('INPUT', 'usb') or 'usb' - parts = [p.strip() for p in input_env.split(',', 1)] - input_mode = (parts[0] or 'usb').lower() - iface_substr = (parts[1].lower() if len(parts) > 1 and parts[1] else None) - + # Loop until a USB input becomes available selected_dev = None - if input_mode == 'aes67': - if not aes67_inputs and not iface_substr: - # No AES67 inputs and no specific target -> fail fast - raise RuntimeError("No AES67 audio inputs found.") - if iface_substr: - # Loop until a matching AES67 input becomes available - while True: - refresh_pw_cache() - current = get_network_pw_inputs() - sel = next(((i, d) for i, d in current if iface_substr in (d.get('name','').lower())), None) - if sel: - input_sel = sel[0] - selected_dev = sel[1] - logging.info(f"Selected AES67 input by match '{iface_substr}': index={input_sel}") - break - logging.info(f"Waiting for AES67 input matching '{iface_substr}'... retrying in 2s") - time.sleep(2) - else: - input_sel, selected_dev = aes67_inputs[0] - logging.info(f"Selected first AES67 input: index={input_sel}, device={selected_dev['name']}") - else: - # Loop until a USB input becomes available (mirror AES67 retry behavior) - while True: - refresh_pw_cache() - current = get_usb_pw_inputs() - if current: - input_sel, selected_dev = current[0] - logging.info(f"Selected first USB input: index={input_sel}, device={selected_dev['name']}") - break - logging.info("Waiting for USB input... retrying in 2s") - time.sleep(2) + while True: + current = get_alsa_usb_inputs() + if current: + input_sel, selected_dev = current[0] + logging.info(f"Selected first USB input (ALSA): index={input_sel}, device={selected_dev['name']}") + break + logging.info("Waiting for USB input (ALSA)... retrying in 2s") + time.sleep(2) TRANSPORT1 = 'serial:/dev/ttyAMA3,1000000,rtscts' # transport for raspberry pi gpio header TRANSPORT2 = 'serial:/dev/ttyAMA4,1000000,rtscts' # transport for raspberry pi gpio header - # Capture at 48 kHz to avoid PipeWire resampler latency; encode LC3 at 24 kHz + # Capture at 48 kHz to avoid resampler latency; encode LC3 at 24 kHz CAPTURE_SRATE = 48000 LC3_SRATE = 24000 OCTETS_PER_FRAME=60 @@ -156,7 +112,7 @@ if __name__ == "__main__": auracast_sampling_rate_hz = LC3_SRATE, octets_per_frame = OCTETS_PER_FRAME, transport=TRANSPORT1, - enable_drift_compensation=True, + enable_drift_compensation=False, drift_threshold_ms=2.0 ) config.debug = False diff --git a/src/auracast/utils/sounddevice_utils.py b/src/auracast/utils/sounddevice_utils.py index 7db59a4..12463fa 100644 --- a/src/auracast/utils/sounddevice_utils.py +++ b/src/auracast/utils/sounddevice_utils.py @@ -23,7 +23,11 @@ def _pa_like_hostapi_index(): raise RuntimeError("PipeWire/PulseAudio host API not present in PortAudio.") def _pw_dump(): - return json.loads(subprocess.check_output(["pw-dump"])) + try: + return json.loads(subprocess.check_output(["pw-dump"])) + except (FileNotFoundError, subprocess.CalledProcessError): + # PipeWire not available + return [] def _sd_refresh(): """Force PortAudio to re-enumerate devices on next query. @@ -66,12 +70,22 @@ def refresh_pw_cache(): Performs a full device scan and updates the internal caches for both USB and Network audio devices. This is a heavy operation and should not be called frequently or during active streams. + + If PipeWire is not available, caches will remain empty. """ global _usb_inputs_cache, _network_inputs_cache # Force PortAudio to re-enumerate devices _sd_refresh() - pa_idx = _pa_like_hostapi_index() + + try: + pa_idx = _pa_like_hostapi_index() + except RuntimeError: + # PipeWire/PulseAudio not available - reset caches and return + _usb_inputs_cache = [] + _network_inputs_cache = [] + return + pw = _pw_dump() # --- Pass 1: Map device.id to device.bus --- @@ -129,6 +143,53 @@ def refresh_pw_cache(): _network_inputs_cache = _sd_matches_from_names(pa_idx, network_input_names) +def get_alsa_usb_inputs(): + """ + Return USB audio input devices using the ALSA backend. + Filters for devices that appear to be USB hardware (hw:X,Y pattern or USB in name). + Returns list of (index, device_dict) tuples. + """ + try: + alsa_devices = devices_by_backend('ALSA') + except ValueError: + # ALSA backend not available + return [] + + usb_inputs = [] + for idx, dev in alsa_devices: + # Only include input devices + if dev.get('max_input_channels', 0) <= 0: + continue + + name = dev.get('name', '').lower() + # Filter for USB devices based on common patterns: + # - Contains 'usb' in the name + # - hw:X,Y pattern (ALSA hardware devices) + # Exclude: default, dmix, pulse, pipewire, sysdefault + if any(exclude in name for exclude in ['default', 'dmix', 'pulse', 'pipewire', 'sysdefault']): + continue + + # Include if it has 'usb' in name or matches hw:X pattern + if 'usb' in name or re.match(r'hw:\d+', name): + usb_inputs.append((idx, dev)) + + return usb_inputs + +def get_alsa_inputs(): + """ + Return all ALSA audio input devices. + Returns list of (index, device_dict) tuples. + """ + try: + alsa_devices = devices_by_backend('ALSA') + except ValueError: + # ALSA backend not available + return [] + + return [(idx, dev) for idx, dev in alsa_devices + if dev.get('max_input_channels', 0) > 0] + + # Populate cache on initial module load refresh_pw_cache() diff --git a/src/scripts/list_pw_nodes.py b/src/scripts/list_pw_nodes.py index 7c9bc85..270eb6a 100644 --- a/src/scripts/list_pw_nodes.py +++ b/src/scripts/list_pw_nodes.py @@ -1,6 +1,5 @@ import sounddevice as sd, pprint -from auracast.utils.sounddevice_utils import devices_by_backend, list_usb_pw_inputs, list_network_pw_inputs - +from auracast.utils.sounddevice_utils import devices_by_backend print("PortAudio library:", sd._libname) print("PortAudio version:", sd.get_portaudio_version()) @@ -15,10 +14,3 @@ for i, d in devices_by_backend("PulseAudio"): print(f"{i}: {d['name']} in={d['max_input_channels']} out={d['max_output_channels']}") -print("Network pw inputs:") -for i, d in list_network_pw_inputs(): - print(f"{i}: {d['name']} in={d['max_input_channels']}") - -print("USB pw inputs:") -for i, d in list_usb_pw_inputs(): - print(f"{i}: {d['name']} in={d['max_input_channels']}") -- 2.52.0 From 60aa653aebf9f4b0c19a086ec7f51a028ab48fea Mon Sep 17 00:00:00 2001 From: pstruebi Date: Thu, 23 Oct 2025 18:23:20 +0200 Subject: [PATCH 03/11] tune the adaptive resampler to be near to inaudible --- src/auracast/multicast.py | 114 ++++++++++++++++++++++++++----- src/auracast/multicast_script.py | 4 +- 2 files changed, 99 insertions(+), 19 deletions(-) diff --git a/src/auracast/multicast.py b/src/auracast/multicast.py index 96dab16..7d5444f 100644 --- a/src/auracast/multicast.py +++ b/src/auracast/multicast.py @@ -52,7 +52,36 @@ from auracast.utils.network_audio_receiver import NetworkAudioReceiverUncoded from auracast.utils.webrtc_audio_input import WebRTCAudioInput -# Instantiate WebRTC audio input for streaming (can be used per-BIG or globally) +# Patch sounddevice.InputStream globally to use low-latency settings +import sounddevice as sd + +class ModSoundDeviceAudioInput(audio_io.SoundDeviceAudioInput): + """Patched SoundDeviceAudioInput that creates RawInputStream with low-latency parameters.""" + + def _open(self): + """Patched _open method that creates RawInputStream with low-latency parameters.""" + + # Create RawInputStream with injected low-latency parameters + self._stream = sd.RawInputStream( + samplerate=self._pcm_format.sample_rate, + device=self._device, + channels=self._pcm_format.channels, + dtype='int16', + blocksize=240, # Match frame size + latency=0.010, + ) + self._stream.start() + + logging.info(f"SoundDeviceAudioInput: Opened with blocksize=240, latency=0.010 (10ms)") + + return audio_io.PcmFormat( + audio_io.PcmFormat.Endianness.LITTLE, + audio_io.PcmFormat.SampleType.INT16, + self._pcm_format.sample_rate, + 2, + ) + +audio_io.SoundDeviceAudioInput = ModSoundDeviceAudioInput # modified from bumble class ModWaveAudioInput(audio_io.ThreadedAudioInput): @@ -618,21 +647,24 @@ class Streamer(): big['encoder'] = encoder big['precoded'] = False - logging.info("Streaming audio...") bigs = self.bigs self.is_streaming = True # Sample discard stats (clock drift compensation) + sample_rate = big['audio_input']._pcm_format.sample_rate samples_discarded_total = 0 # Total samples discarded discard_events = 0 # Number of times we discarded samples + frames_since_last_discard = 999 # Guard: frames since last discard (start high to allow first drop) enable_drift_compensation = global_config.enable_drift_compensation + discard_guard_frames = sample_rate // 100 # Don't allow discard within this many frames of previous discard # Calculate threshold based on config (default 2ms) - drift_threshold_ms = global_config.drift_threshold_ms if enable_drift_compensation else 0 - drop_threshold_samples = 0 + drift_threshold_ms = global_config.drift_threshold_ms if global_config.enable_drift_compensation else 0 + drop_threshold_samples = int(sample_rate * drift_threshold_ms / 1000.0) + static_drop_samples = int(sample_rate * 0.0005) # Always drop a static amount of samples - if enable_drift_compensation: - logging.info(f"Clock drift compensation ENABLED: threshold={drift_threshold_ms}ms") + if global_config.enable_drift_compensation: + logging.info(f"Clock drift compensation ENABLED: threshold={drift_threshold_ms}ms, guard={discard_guard_frames} frames") else: logging.info("Clock drift compensation DISABLED") @@ -669,26 +701,41 @@ class Streamer(): # Calculate threshold samples based on sample rate (only once per BIG) if enable_drift_compensation and drop_threshold_samples == 0: - sample_rate = big['audio_input']._pcm_format.sample_rate drop_threshold_samples = int(sample_rate * drift_threshold_ms / 1000.0) logging.info(f"Drift compensation threshold: {drop_threshold_samples} samples ({drift_threshold_ms}ms @ {sample_rate}Hz)") + logging.info(f"Static drop amount: {static_drop_samples} samples (3.0ms @ {sample_rate}Hz)") # Discard excess samples in buffer if above threshold (clock drift compensation) if enable_drift_compensation and hasattr(big['audio_input'], '_stream') and big['audio_input']._stream: sd_buffer_samples = big['audio_input']._stream.read_available - if sd_buffer_samples > drop_threshold_samples: - # Discard ALL remaining samples to bring buffer back down + # Guard: only allow discard if enough frames have passed since last discard + if sd_buffer_samples > drop_threshold_samples and frames_since_last_discard >= discard_guard_frames: + # Always drop a static amount (3ms) for predictable behavior + # This matches the crossfade duration better for smoother transitions + samples_to_drop = static_drop_samples try: - discarded_data = big['audio_input']._stream.read(sd_buffer_samples) - samples_discarded_total += sd_buffer_samples + discarded_data = await anext(big['audio_input'].frames(samples_to_drop)) + samples_discarded_total += samples_to_drop discard_events += 1 - if discard_events % 100 == 0: # Log every 100th discard - logging.warning( - f"Discard #{discard_events}: {sd_buffer_samples} samples ({sd_buffer_samples / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms) " - f"| total discarded: {samples_discarded_total} samples" - ) + # Log every discard event with timing information + sample_rate = big['audio_input']._pcm_format.sample_rate + time_since_last_ms = frames_since_last_discard * 10 # Each frame is 10ms + logging.info( + f"DISCARD #{discard_events}: dropped {samples_to_drop} samples ({samples_to_drop / sample_rate * 1000:.1f}ms) | " + f"buffer was {sd_buffer_samples} samples ({sd_buffer_samples / sample_rate * 1000:.1f}ms) | " + f"since_last={frames_since_last_discard} frames ({time_since_last_ms}ms) | " + f"frame={frame_count}" + ) + + # Reset guard counter + frames_since_last_discard = 0 + # Store how much we dropped for potential adaptive crossfade + big['last_drop_samples'] = samples_to_drop + # Set flag to apply crossfade on next frame + big['apply_crossfade'] = True + except Exception as e: logging.error(f"Failed to discard samples: {e}") @@ -704,6 +751,39 @@ class Streamer(): samples = np.frombuffer(pcm_frame, dtype=dtype) samples = samples.reshape(-1, big['channels']).mean(axis=1) pcm_frame = samples.astype(dtype).tobytes() + + # Apply crossfade if samples were just dropped (drift compensation) + if big.get('apply_crossfade') and big.get('prev_pcm_frame') is not None: + # Crossfade duration: 10ms for smoother transition (was 5ms) + dtype = np.int16 if big['pcm_bit_depth'] == 16 else np.float32 + sample_rate = big['audio_input']._pcm_format.sample_rate + crossfade_samples = min(int(sample_rate * 0.010), big['lc3_frame_samples'] // 2) + + # Convert frames to numpy arrays (make writable copies) + prev_samples = np.frombuffer(big['prev_pcm_frame'], dtype=dtype).copy() + curr_samples = np.frombuffer(pcm_frame, dtype=dtype).copy() + + # Create equal-power crossfade curves (smoother than linear) + # Equal-power maintains perceived loudness during transition + t = np.linspace(0, 1, crossfade_samples) + fade_out = np.cos(t * np.pi / 2) # Cosine fade out + fade_in = np.sin(t * np.pi / 2) # Sine fade in + + # Apply crossfade to the beginning of current frame with end of previous frame + if len(prev_samples) >= crossfade_samples and len(curr_samples) >= crossfade_samples: + crossfaded = ( + prev_samples[-crossfade_samples:] * fade_out + + curr_samples[:crossfade_samples] * fade_in + ).astype(dtype) + # Replace beginning of current frame with crossfaded section + curr_samples[:crossfade_samples] = crossfaded + pcm_frame = curr_samples.tobytes() + + big['apply_crossfade'] = False + + # Store current frame for potential next crossfade + if enable_drift_compensation: + big['prev_pcm_frame'] = pcm_frame lc3_frame = big['encoder'].encode( pcm_frame, num_bytes=big['lc3_bytes_per_frame'], bit_depth=big['pcm_bit_depth'] @@ -711,6 +791,8 @@ class Streamer(): await big['iso_queue'].write(lc3_frame) frame_count += 1 + # Increment guard counter (tracks frames since last discard) + frames_since_last_discard += 1 # Periodic stats logging now = time.perf_counter() diff --git a/src/auracast/multicast_script.py b/src/auracast/multicast_script.py index 9b4ea56..1f410b2 100644 --- a/src/auracast/multicast_script.py +++ b/src/auracast/multicast_script.py @@ -52,8 +52,6 @@ if __name__ == "__main__": os.chdir(os.path.dirname(__file__)) # Load .env located next to this script (only uppercase keys will be referenced) load_dotenv(dotenv_path='.env') - # Default tight ALSA latency (ms); can be overridden via environment - os.environ.setdefault('ALSA_LATENCY_MSEC', '2') # List USB ALSA inputs usb_inputs = get_alsa_usb_inputs() @@ -112,7 +110,7 @@ if __name__ == "__main__": auracast_sampling_rate_hz = LC3_SRATE, octets_per_frame = OCTETS_PER_FRAME, transport=TRANSPORT1, - enable_drift_compensation=False, + enable_drift_compensation=True, drift_threshold_ms=2.0 ) config.debug = False -- 2.52.0 From da957141b4bc52a5f4ba7f80ff7e5f7b5cddb651 Mon Sep 17 00:00:00 2001 From: pstruebi Date: Thu, 23 Oct 2025 18:32:59 +0200 Subject: [PATCH 04/11] refactor: simplify clock drift compensation with hardcoded parameters and improved frame drop logic --- src/auracast/auracast_config.py | 1 - src/auracast/multicast.py | 25 +++++++++++-------------- src/auracast/multicast_script.py | 1 - 3 files changed, 11 insertions(+), 16 deletions(-) diff --git a/src/auracast/auracast_config.py b/src/auracast/auracast_config.py index 06d855f..a283392 100644 --- a/src/auracast/auracast_config.py +++ b/src/auracast/auracast_config.py @@ -42,7 +42,6 @@ class AuracastGlobalConfig(BaseModel): assisted_listening_stream: bool = False # Clock drift compensation: discard excess samples when buffer exceeds threshold enable_drift_compensation: bool = False - drift_threshold_ms: float = 2.0 # Discard threshold in milliseconds # "Audio input. " # "'device' -> use the host's default sound input device, " diff --git a/src/auracast/multicast.py b/src/auracast/multicast.py index 7d5444f..7e7a7c0 100644 --- a/src/auracast/multicast.py +++ b/src/auracast/multicast.py @@ -651,19 +651,22 @@ class Streamer(): bigs = self.bigs self.is_streaming = True - # Sample discard stats (clock drift compensation) + # frame drop algo parameters sample_rate = big['audio_input']._pcm_format.sample_rate samples_discarded_total = 0 # Total samples discarded discard_events = 0 # Number of times we discarded samples frames_since_last_discard = 999 # Guard: frames since last discard (start high to allow first drop) enable_drift_compensation = global_config.enable_drift_compensation - discard_guard_frames = sample_rate // 100 # Don't allow discard within this many frames of previous discard - # Calculate threshold based on config (default 2ms) - drift_threshold_ms = global_config.drift_threshold_ms if global_config.enable_drift_compensation else 0 + # Hardcoded parameters (unit: milliseconds) + drift_threshold_ms = 2.0 if enable_drift_compensation else 0.0 + static_drop_ms = 0.5 if enable_drift_compensation else 0.0 + # Guard interval measured in LC3 frames (10 ms each); 50 => 500 ms cooldown + discard_guard_frames = int(2*sample_rate / 1000) if enable_drift_compensation else 0 + # Derived sample counts drop_threshold_samples = int(sample_rate * drift_threshold_ms / 1000.0) - static_drop_samples = int(sample_rate * 0.0005) # Always drop a static amount of samples - - if global_config.enable_drift_compensation: + static_drop_samples = int(sample_rate * static_drop_ms / 1000.0) + + if enable_drift_compensation: logging.info(f"Clock drift compensation ENABLED: threshold={drift_threshold_ms}ms, guard={discard_guard_frames} frames") else: logging.info("Clock drift compensation DISABLED") @@ -699,12 +702,6 @@ class Streamer(): stream_finished[i] = True continue - # Calculate threshold samples based on sample rate (only once per BIG) - if enable_drift_compensation and drop_threshold_samples == 0: - drop_threshold_samples = int(sample_rate * drift_threshold_ms / 1000.0) - logging.info(f"Drift compensation threshold: {drop_threshold_samples} samples ({drift_threshold_ms}ms @ {sample_rate}Hz)") - logging.info(f"Static drop amount: {static_drop_samples} samples (3.0ms @ {sample_rate}Hz)") - # Discard excess samples in buffer if above threshold (clock drift compensation) if enable_drift_compensation and hasattr(big['audio_input'], '_stream') and big['audio_input']._stream: sd_buffer_samples = big['audio_input']._stream.read_available @@ -713,7 +710,7 @@ class Streamer(): if sd_buffer_samples > drop_threshold_samples and frames_since_last_discard >= discard_guard_frames: # Always drop a static amount (3ms) for predictable behavior # This matches the crossfade duration better for smoother transitions - samples_to_drop = static_drop_samples + samples_to_drop = min(static_drop_samples, max(1, big['lc3_frame_samples'] - 1)) try: discarded_data = await anext(big['audio_input'].frames(samples_to_drop)) samples_discarded_total += samples_to_drop diff --git a/src/auracast/multicast_script.py b/src/auracast/multicast_script.py index 1f410b2..1c71f3e 100644 --- a/src/auracast/multicast_script.py +++ b/src/auracast/multicast_script.py @@ -111,7 +111,6 @@ if __name__ == "__main__": octets_per_frame = OCTETS_PER_FRAME, transport=TRANSPORT1, enable_drift_compensation=True, - drift_threshold_ms=2.0 ) config.debug = False -- 2.52.0 From 9bdf8e6348cf65ef3328f5185747003297dfdc24 Mon Sep 17 00:00:00 2001 From: pstruebi Date: Thu, 23 Oct 2025 18:43:38 +0200 Subject: [PATCH 05/11] refactor: rename drift_compensation to adaptive_frame_dropping for clarity --- src/auracast/auracast_config.py | 4 ++-- src/auracast/multicast.py | 2 +- src/auracast/multicast_script.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/auracast/auracast_config.py b/src/auracast/auracast_config.py index a283392..a904b11 100644 --- a/src/auracast/auracast_config.py +++ b/src/auracast/auracast_config.py @@ -40,8 +40,8 @@ class AuracastGlobalConfig(BaseModel): # so receivers may render earlier than the presentation delay for lower latency. immediate_rendering: bool = False assisted_listening_stream: bool = False - # Clock drift compensation: discard excess samples when buffer exceeds threshold - enable_drift_compensation: bool = False + # Adaptive frame dropping: discard sub-frame samples when buffer exceeds threshold + enable_adaptive_frame_dropping: bool = False # "Audio input. " # "'device' -> use the host's default sound input device, " diff --git a/src/auracast/multicast.py b/src/auracast/multicast.py index 7e7a7c0..bf1b343 100644 --- a/src/auracast/multicast.py +++ b/src/auracast/multicast.py @@ -656,7 +656,7 @@ class Streamer(): samples_discarded_total = 0 # Total samples discarded discard_events = 0 # Number of times we discarded samples frames_since_last_discard = 999 # Guard: frames since last discard (start high to allow first drop) - enable_drift_compensation = global_config.enable_drift_compensation + enable_drift_compensation = getattr(global_config, 'enable_adaptive_frame_dropping', False) # Hardcoded parameters (unit: milliseconds) drift_threshold_ms = 2.0 if enable_drift_compensation else 0.0 static_drop_ms = 0.5 if enable_drift_compensation else 0.0 diff --git a/src/auracast/multicast_script.py b/src/auracast/multicast_script.py index 1c71f3e..97b05c8 100644 --- a/src/auracast/multicast_script.py +++ b/src/auracast/multicast_script.py @@ -110,7 +110,7 @@ if __name__ == "__main__": auracast_sampling_rate_hz = LC3_SRATE, octets_per_frame = OCTETS_PER_FRAME, transport=TRANSPORT1, - enable_drift_compensation=True, + enable_adaptive_frame_dropping=True, ) config.debug = False -- 2.52.0 From 6db0405d800b7d0fc8af9c47120c1561d42cb20b Mon Sep 17 00:00:00 2001 From: pstruebi Date: Thu, 23 Oct 2025 18:43:44 +0200 Subject: [PATCH 06/11] feat: enable adaptive frame dropping only for device-based audio inputs --- src/auracast/server/multicast_server.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/src/auracast/server/multicast_server.py b/src/auracast/server/multicast_server.py index 8c1c52e..04d2865 100644 --- a/src/auracast/server/multicast_server.py +++ b/src/auracast/server/multicast_server.py @@ -174,7 +174,14 @@ class StreamerWorker: # overwrite some configurations conf.transport = TRANSPORT1 - conf.enable_drift_compensation=True + # Enable adaptive frame dropping only for device-based inputs (not file/demo) + try: + conf.enable_adaptive_frame_dropping = any( + isinstance(big.audio_source, str) and big.audio_source.startswith('device:') + for big in conf.bigs + ) + except Exception: + conf.enable_adaptive_frame_dropping = False # Derive device name and input mode first_source = conf.bigs[0].audio_source if conf.bigs else '' input_device_name = None @@ -242,6 +249,14 @@ class StreamerWorker: self._multicaster2 = None conf.transport = TRANSPORT2 + # Enable adaptive frame dropping only for device-based inputs (not file/demo) + try: + conf.enable_adaptive_frame_dropping = any( + isinstance(big.audio_source, str) and big.audio_source.startswith('device:') + for big in conf.bigs + ) + except Exception: + conf.enable_adaptive_frame_dropping = False for big in conf.bigs: if big.audio_source.startswith('device:'): device_name = big.audio_source.split(':', 1)[1] -- 2.52.0 From 7cd8e66f442e6284dd87ccf5cce109e0f35a15d0 Mon Sep 17 00:00:00 2001 From: pstruebi Date: Fri, 24 Oct 2025 09:36:48 +0200 Subject: [PATCH 07/11] feat: add PipeWire network input support and improve device selection logic --- src/auracast/multicast.py | 2 +- src/auracast/multicast_script.py | 75 +++++++++++++++++++++---- src/auracast/server/multicast_server.py | 48 ++++++++-------- src/auracast/utils/sounddevice_utils.py | 68 ++++++++++++++++++++++ 4 files changed, 159 insertions(+), 34 deletions(-) diff --git a/src/auracast/multicast.py b/src/auracast/multicast.py index bf1b343..aa05cdb 100644 --- a/src/auracast/multicast.py +++ b/src/auracast/multicast.py @@ -659,7 +659,7 @@ class Streamer(): enable_drift_compensation = getattr(global_config, 'enable_adaptive_frame_dropping', False) # Hardcoded parameters (unit: milliseconds) drift_threshold_ms = 2.0 if enable_drift_compensation else 0.0 - static_drop_ms = 0.5 if enable_drift_compensation else 0.0 + static_drop_ms = 1 if enable_drift_compensation else 0.0 # Guard interval measured in LC3 frames (10 ms each); 50 => 500 ms cooldown discard_guard_frames = int(2*sample_rate / 1000) if enable_drift_compensation else 0 # Derived sample counts diff --git a/src/auracast/multicast_script.py b/src/auracast/multicast_script.py index 97b05c8..01c4fdd 100644 --- a/src/auracast/multicast_script.py +++ b/src/auracast/multicast_script.py @@ -21,17 +21,13 @@ Environment variables - LANGUATE: ISO 639-3 language code used by config (intentional key name). Default: "deu". -- ALSA_LATENCY_MSEC: ALSA latency hint in milliseconds. - Default: 2. Examples (.env) --------------- LOG_LEVEL=DEBUG BROADCAST_NAME=MyBroadcast PROGRAM_INFO="Live announcements" -LANGUATE=deu -ALSA_LATENCY_MSEC=2 -""" +LANGUATE=deu""" import logging import os import time @@ -40,6 +36,9 @@ from auracast import multicast from auracast import auracast_config from auracast.utils.sounddevice_utils import ( get_alsa_usb_inputs, + get_network_pw_inputs, # PipeWire network (AES67) inputs + refresh_pw_cache, + resolve_input_device_index, ) @@ -53,21 +52,75 @@ if __name__ == "__main__": # Load .env located next to this script (only uppercase keys will be referenced) load_dotenv(dotenv_path='.env') - # List USB ALSA inputs + # Refresh PipeWire cache and list devices (Network only for PW; USB via ALSA) + try: + refresh_pw_cache() + except Exception: + pass + pw_net = get_network_pw_inputs() + # List PipeWire Network inputs + logging.info("PipeWire Network inputs:") + for i, d in pw_net: + logging.info(f"{i}: {d['name']} in={d.get('max_input_channels', 0)}") + + # Also list USB ALSA inputs (fallback path) usb_inputs = get_alsa_usb_inputs() logging.info("USB ALSA inputs:") for i, d in usb_inputs: logging.info(f"{i}: {d['name']} in={d['max_input_channels']}") - # Loop until a USB input becomes available + # Optional device selection via .env: match by string or substring (uppercase keys only) + device_match = os.environ.get('INPUT_DEVICE') + device_match = device_match.strip() if isinstance(device_match, str) else None + + # Loop until a device becomes available (prefer PW Network, then ALSA USB) selected_dev = None while True: - current = get_alsa_usb_inputs() - if current: - input_sel, selected_dev = current[0] + try: + refresh_pw_cache() + except Exception: + pass + pw_net = get_network_pw_inputs() + # 1) Try to satisfy explicit .env match on PW Network + if device_match and pw_net: + for _, d in pw_net: + name = d.get('name', '') + if device_match in name: + idx = resolve_input_device_index(name) + if idx is not None: + input_sel = idx + selected_dev = d + logging.info(f"Selected Network input by match '{device_match}' (PipeWire): index={input_sel}, device={name}") + break + if selected_dev is not None: + break + if pw_net and selected_dev is None: + _, d0 = pw_net[0] + idx = resolve_input_device_index(d0.get('name', '')) + if idx is not None: + input_sel = idx + selected_dev = d0 + logging.info(f"Selected first Network input (PipeWire): index={input_sel}, device={d0['name']}") + break + current_alsa = get_alsa_usb_inputs() + # 2) Try to satisfy explicit .env match on ALSA USB + if device_match and current_alsa and selected_dev is None: + matched = None + for idx_alsa, d in current_alsa: + name = d.get('name', '') + if device_match in name: + matched = (idx_alsa, d) + break + if matched is not None: + input_sel, selected_dev = matched + logging.info(f"Selected USB input by match '{device_match}' (ALSA): index={input_sel}, device={selected_dev['name']}") + break + # Fallback to first ALSA USB + if current_alsa and selected_dev is None: + input_sel, selected_dev = current_alsa[0] logging.info(f"Selected first USB input (ALSA): index={input_sel}, device={selected_dev['name']}") break - logging.info("Waiting for USB input (ALSA)... retrying in 2s") + logging.info("Waiting for audio input (prefer PW Network, then ALSA USB)... retrying in 2s") time.sleep(2) TRANSPORT1 = 'serial:/dev/ttyAMA3,1000000,rtscts' # transport for raspberry pi gpio header diff --git a/src/auracast/server/multicast_server.py b/src/auracast/server/multicast_server.py index 04d2865..267f36f 100644 --- a/src/auracast/server/multicast_server.py +++ b/src/auracast/server/multicast_server.py @@ -26,8 +26,9 @@ import sounddevice as sd # type: ignore from typing import Set import traceback from auracast.utils.sounddevice_utils import ( - get_usb_pw_inputs, get_network_pw_inputs, + get_alsa_usb_inputs, + resolve_input_device_index, refresh_pw_cache, ) from auracast.utils.reset_utils import reset_nrf54l @@ -51,19 +52,7 @@ class Offer(BaseModel): sdp: str type: str -def get_device_index_by_name(name: str): - """Return the device index for a given device name, or None if not found. - - Queries the current sounddevice list directly (no cache). - """ - try: - devs = sd.query_devices() - for idx, d in enumerate(devs): - if d.get("name") == name and d.get("max_input_channels", 0) > 0: - return idx - except Exception: - pass - return None +# Device resolution is centralized in utils.resolve_input_device_index def _hydrate_settings_cache_from_disk() -> None: @@ -115,7 +104,7 @@ app.add_middleware( # Initialize global configuration global_config_group = auracast_config.AuracastConfigGroup() -class StreamerWorker: +class StreamerWorker: # TODO: is wraping in this Worker stricly nececcarry ? """Owns multicaster(s) on a dedicated asyncio loop in a background thread.""" def __init__(self) -> None: @@ -189,14 +178,20 @@ class StreamerWorker: if first_source.startswith('device:'): input_device_name = first_source.split(':', 1)[1] if ':' in first_source else None try: - usb_names = {d.get('name') for _, d in get_usb_pw_inputs()} + alsa_usb_names = {d.get('name') for _, d in get_alsa_usb_inputs()} + except Exception: + alsa_usb_names = set() + try: net_names = {d.get('name') for _, d in get_network_pw_inputs()} except Exception: - usb_names, net_names = set(), set() + net_names = set() audio_mode_persist = 'AES67' if (input_device_name in net_names) else 'USB' - # Map device name to index and configure input_format - device_index = int(input_device_name) if (input_device_name and input_device_name.isdigit()) else get_device_index_by_name(input_device_name or '') + # Map device name to index using centralized resolver + if input_device_name and input_device_name.isdigit(): + device_index = int(input_device_name) + else: + device_index = resolve_input_device_index(input_device_name or '') if device_index is None: raise HTTPException(status_code=400, detail=f"Audio device '{input_device_name}' not found.") for big in conf.bigs: @@ -260,7 +255,16 @@ class StreamerWorker: for big in conf.bigs: if big.audio_source.startswith('device:'): device_name = big.audio_source.split(':', 1)[1] - device_index = get_device_index_by_name(device_name) + # Resolve backend preference by membership + try: + net_names = {d.get('name') for _, d in get_network_pw_inputs()} + except Exception: + net_names = set() + try: + alsa_usb_names = {d.get('name') for _, d in get_alsa_usb_inputs()} + except Exception: + alsa_usb_names = set() + device_index = resolve_input_device_index(device_name) if device_index is None: raise HTTPException(status_code=400, detail=f"Audio device '{device_name}' not found.") big.audio_source = f'device:{device_index}' @@ -504,11 +508,11 @@ async def _startup_autostart_event(): @app.get("/audio_inputs_pw_usb") async def audio_inputs_pw_usb(): - """List PipeWire USB input nodes from cache.""" + """List USB input devices using ALSA backend (USB is ALSA in our scheme).""" try: devices = [ {"id": idx, "name": dev.get("name"), "max_input_channels": dev.get("max_input_channels", 0)} - for idx, dev in get_usb_pw_inputs() + for idx, dev in get_alsa_usb_inputs() ] return {"inputs": devices} except Exception as e: diff --git a/src/auracast/utils/sounddevice_utils.py b/src/auracast/utils/sounddevice_utils.py index 12463fa..9d7c1f8 100644 --- a/src/auracast/utils/sounddevice_utils.py +++ b/src/auracast/utils/sounddevice_utils.py @@ -53,6 +53,74 @@ def _sd_matches_from_names(pa_idx, names): out.append((i, d)) return out +def get_device_index_by_name(name: str, backend: str | None = None): + """Return the sounddevice index for an input device with exact name. + + If backend is provided, restrict to that backend: + - 'ALSA' uses the ALSA host API + - 'PipeWire' or 'PulseAudio' use the PipeWire/Pulse host API index + Returns None if not found. + """ + try: + devices = sd.query_devices() + hostapi_filter = None + if backend: + if backend.lower() == 'alsa': + try: + alsa = devices_by_backend('ALSA') + alsa_indices = {idx for idx, _ in alsa} + hostapi_filter = ('indices', alsa_indices) + except Exception: + return None + else: + try: + pa_idx = _pa_like_hostapi_index() + hostapi_filter = ('hostapi', pa_idx) + except Exception: + return None + for idx, d in enumerate(devices): + if d.get('max_input_channels', 0) <= 0: + continue + if d.get('name') != name: + continue + if hostapi_filter: + kind, val = hostapi_filter + if kind == 'indices' and idx not in val: + continue + if kind == 'hostapi' and d.get('hostapi') != val: + continue + return idx + except Exception: + return None + return None + +def resolve_input_device_index(name: str): + """Resolve device index by exact name preferring backend by device type. + + - If name is known PipeWire Network device, use PipeWire backend. + - Else if name is known ALSA USB device, use ALSA backend. + - Else fallback to any backend. + Returns None if not found. + """ + try: + net_names = {d.get('name') for _, d in get_network_pw_inputs()} + except Exception: + net_names = set() + try: + alsa_usb_names = {d.get('name') for _, d in get_alsa_usb_inputs()} + except Exception: + alsa_usb_names = set() + + if name in net_names: + idx = get_device_index_by_name(name, backend='PipeWire') + if idx is not None: + return idx + if name in alsa_usb_names: + idx = get_device_index_by_name(name, backend='ALSA') + if idx is not None: + return idx + return get_device_index_by_name(name, backend=None) + # Module-level caches for device lists _usb_inputs_cache = [] _network_inputs_cache = [] -- 2.52.0 From f91f177b9a8def28f1f8a870fc42e7c97de1eb9e Mon Sep 17 00:00:00 2001 From: pstruebi Date: Fri, 31 Oct 2025 11:05:38 +0100 Subject: [PATCH 08/11] feat: enforce 48kHz audio capture and fix device detection - Changed audio capture rate to fixed 48kHz instead of using device default to prevent resampling latency and 44.1kHz compatibility issues - Updated device detection to use get_alsa_usb_inputs() instead of get_usb_pw_inputs() for more accurate USB audio device discovery - Maintains original channel count logic to use maximum of 2 channels based on device capabilities --- src/auracast/server/multicast_server.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/auracast/server/multicast_server.py b/src/auracast/server/multicast_server.py index 5d2a1de..90976ed 100644 --- a/src/auracast/server/multicast_server.py +++ b/src/auracast/server/multicast_server.py @@ -198,7 +198,8 @@ class StreamerWorker: # TODO: is wraping in this Worker stricly nececcarry ? if big.audio_source.startswith('device:'): big.audio_source = f'device:{device_index}' devinfo = sd.query_devices(device_index) - capture_rate = int(devinfo.get('default_samplerate') or 48000) + # Force capture at 48 kHz to avoid resampler latency and 44.1 kHz incompatibilities + capture_rate = 48000 max_in = int(devinfo.get('max_input_channels') or 1) channels = max(1, min(2, max_in)) for big in conf.bigs: @@ -446,7 +447,7 @@ async def _autostart_from_settings(): ): return # Check against the cached device lists - usb = [d for _, d in get_usb_pw_inputs()] + usb = [d for _, d in get_alsa_usb_inputs()] net = [d for _, d in get_network_pw_inputs()] names = {d.get('name') for d in usb} | {d.get('name') for d in net} if input_device_name in names: -- 2.52.0 From 230db3a96b61e1cf7f9ba6722a256ad27075a2ca Mon Sep 17 00:00:00 2001 From: pstruebi Date: Mon, 3 Nov 2025 16:00:09 +0100 Subject: [PATCH 09/11] feat: persist and restore demo stream configuration - Added persistence of demo stream type and count to maintain user preferences across sessions - Enhanced demo stream selection to automatically restore previously selected configuration - Added fallback logic to match saved sampling rate and stream count when exact demo type not found - Updated secondary stream initialization to track total demo streams across primary/secondary configs - Modified settings storage to include demo_stream_type and demo --- src/auracast/server/multicast_frontend.py | 22 ++++++++++++++++-- src/auracast/server/multicast_server.py | 28 +++++++++++++++++++++++ 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/src/auracast/server/multicast_frontend.py b/src/auracast/server/multicast_frontend.py index 7b78b75..3a8dae7 100644 --- a/src/auracast/server/multicast_frontend.py +++ b/src/auracast/server/multicast_frontend.py @@ -170,11 +170,29 @@ if audio_mode == "Demo": "6 × 16kHz": {"quality": "Fair (16kHz)", "streams": 6}, } demo_options = list(demo_stream_map.keys()) - default_demo = demo_options[0] + default_index = 0 + saved_type = saved_settings.get('demo_stream_type') + if isinstance(saved_type, str) and saved_type in demo_options: + default_index = demo_options.index(saved_type) + else: + saved_rate = saved_settings.get('auracast_sampling_rate_hz') + saved_total = saved_settings.get('demo_total_streams') + if saved_total is None and saved_settings.get('audio_mode') == 'Demo': + saved_total = len(saved_settings.get('channel_names') or []) + try: + if saved_rate and saved_total: + for i, label in enumerate(demo_options): + cfg = demo_stream_map[label] + rate_for_label = QUALITY_MAP[cfg['quality']]['rate'] + if cfg['streams'] == int(saved_total) and int(saved_rate) == rate_for_label: + default_index = i + break + except Exception: + default_index = 0 demo_selected = st.selectbox( "Demo Stream Type", demo_options, - index=0, + index=default_index, help="Select the demo stream configuration." ) # Stream password and flags (same as USB/AES67) diff --git a/src/auracast/server/multicast_server.py b/src/auracast/server/multicast_server.py index 90976ed..7692e22 100644 --- a/src/auracast/server/multicast_server.py +++ b/src/auracast/server/multicast_server.py @@ -219,6 +219,14 @@ class StreamerWorker: # TODO: is wraping in this Worker stricly nececcarry ? auto_started = True # Return proposed settings to persist on API side + demo_count = sum(1 for big in conf.bigs if isinstance(big.audio_source, str) and big.audio_source.startswith('file:')) + demo_rate = int(conf.auracast_sampling_rate_hz or 0) + demo_type = None + if demo_count > 0 and demo_rate > 0: + if demo_rate in (48000, 24000, 16000): + demo_type = f"{demo_count} × {demo_rate//1000}kHz" + else: + demo_type = f"{demo_count} × {demo_rate}Hz" return { 'channel_names': [big.name for big in conf.bigs], 'languages': [big.language for big in conf.bigs], @@ -233,6 +241,8 @@ class StreamerWorker: # TODO: is wraping in this Worker stricly nececcarry ? 'immediate_rendering': getattr(conf, 'immediate_rendering', False), 'assisted_listening_stream': getattr(conf, 'assisted_listening_stream', False), 'stream_password': (conf.bigs[0].code if conf.bigs and getattr(conf.bigs[0], 'code', None) else None), + 'demo_total_streams': demo_count, + 'demo_stream_type': demo_type, 'is_streaming': auto_started, } @@ -345,6 +355,24 @@ async def initialize2(conf: auracast_config.AuracastConfigGroup): try: log.info('Initializing multicaster2 with config:\n %s', conf.model_dump_json(indent=2)) await streamer.call(streamer._w_init_secondary, conf) + try: + is_demo = any(isinstance(big.audio_source, str) and big.audio_source.startswith('file:') for big in conf.bigs) + if is_demo: + settings = load_stream_settings() or {} + primary_count = int(settings.get('demo_total_streams') or len(settings.get('channel_names') or [])) + secondary_count = len(conf.bigs or []) + total = primary_count + secondary_count + settings['demo_total_streams'] = total + demo_rate = int(conf.auracast_sampling_rate_hz or 0) + if demo_rate > 0: + if demo_rate in (48000, 24000, 16000): + settings['demo_stream_type'] = f"{total} × {demo_rate//1000}kHz" + else: + settings['demo_stream_type'] = f"{total} × {demo_rate}Hz" + settings['timestamp'] = datetime.utcnow().isoformat() + save_stream_settings(settings) + except Exception: + log.warning("Failed to persist demo_total_streams in /init2", exc_info=True) except Exception as e: log.error("Exception in /init2: %s", traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) -- 2.52.0 From c01bdb8b9692602a7ec2772c39e43e586266457f Mon Sep 17 00:00:00 2001 From: pstruebi Date: Mon, 3 Nov 2025 16:12:09 +0100 Subject: [PATCH 10/11] feat: add detailed audio device logging for input stream setup - Added comprehensive logging of audio device configuration when opening input stream - Included host API name, device details, channel count, and latency parameters in logs - Added PortAudio version logging to help with troubleshooting - Implemented error handling for device query failures with warning log output --- src/auracast/multicast.py | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/src/auracast/multicast.py b/src/auracast/multicast.py index aa05cdb..a1f99f7 100644 --- a/src/auracast/multicast.py +++ b/src/auracast/multicast.py @@ -60,7 +60,29 @@ class ModSoundDeviceAudioInput(audio_io.SoundDeviceAudioInput): def _open(self): """Patched _open method that creates RawInputStream with low-latency parameters.""" - + try: + dev_info = sd.query_devices(self._device) + hostapis = sd.query_hostapis() + api_index = dev_info.get('hostapi') + api_name = hostapis[api_index]['name'] if isinstance(api_index, int) and 0 <= api_index < len(hostapis) else 'unknown' + pa_ver = None + try: + pa_ver = sd.get_portaudio_version() + except Exception: + pass + logging.info( + "SoundDevice backend=%s device='%s' (id=%s) ch=%s default_low_input_latency=%.4f default_high_input_latency=%.4f portaudio=%s", + api_name, + dev_info.get('name'), + self._device, + dev_info.get('max_input_channels'), + float(dev_info.get('default_low_input_latency') or 0.0), + float(dev_info.get('default_high_input_latency') or 0.0), + pa_ver[1] if isinstance(pa_ver, tuple) and len(pa_ver) >= 2 else pa_ver, + ) + except Exception as e: + logging.warning("Failed to query sounddevice backend/device info: %s", e) + # Create RawInputStream with injected low-latency parameters self._stream = sd.RawInputStream( samplerate=self._pcm_format.sample_rate, -- 2.52.0 From a196bcd2fcd6f9d44852926addad86e0fcf8b111 Mon Sep 17 00:00:00 2001 From: pstruebi Date: Tue, 4 Nov 2025 15:11:47 +0100 Subject: [PATCH 11/11] fix: disable periodic stats for WAV inputs and guard sample_rate access --- src/auracast/multicast.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/auracast/multicast.py b/src/auracast/multicast.py index a1f99f7..02203bb 100644 --- a/src/auracast/multicast.py +++ b/src/auracast/multicast.py @@ -674,7 +674,12 @@ class Streamer(): self.is_streaming = True # frame drop algo parameters - sample_rate = big['audio_input']._pcm_format.sample_rate + # In demo/precoded modes there may be no audio_input or no _pcm_format yet + ai = big.get('audio_input') + if ai is not None and hasattr(ai, '_pcm_format') and getattr(ai, '_pcm_format') is not None: + sample_rate = ai._pcm_format.sample_rate + else: + sample_rate = global_config.auracast_sampling_rate_hz samples_discarded_total = 0 # Total samples discarded discard_events = 0 # Number of times we discarded samples frames_since_last_discard = 999 # Guard: frames since last discard (start high to allow first drop) @@ -813,9 +818,11 @@ class Streamer(): # Increment guard counter (tracks frames since last discard) frames_since_last_discard += 1 - # Periodic stats logging + # Periodic stats logging (only for device/sounddevice streams, not WAV files) + # WAV file concurrent access causes deadlock in ThreadedAudioInput now = time.perf_counter() - if now - last_stats_log >= stats_interval: + is_device_stream = hasattr(big['audio_input'], '_stream') and big['audio_input']._stream is not None + if is_device_stream and now - last_stats_log >= stats_interval: # Get current buffer status from PortAudio current_sd_buffer = 0 if hasattr(big['audio_input'], '_stream') and big['audio_input']._stream: -- 2.52.0