diff --git a/src/auracast/auracast_config.py b/src/auracast/auracast_config.py index 2971dea..a904b11 100644 --- a/src/auracast/auracast_config.py +++ b/src/auracast/auracast_config.py @@ -40,6 +40,8 @@ class AuracastGlobalConfig(BaseModel): # so receivers may render earlier than the presentation delay for lower latency. immediate_rendering: bool = False assisted_listening_stream: bool = False + # Adaptive frame dropping: discard sub-frame samples when buffer exceeds threshold + enable_adaptive_frame_dropping: bool = False # "Audio input. " # "'device' -> use the host's default sound input device, " diff --git a/src/auracast/multicast.py b/src/auracast/multicast.py index 93792e5..02203bb 100644 --- a/src/auracast/multicast.py +++ b/src/auracast/multicast.py @@ -26,11 +26,9 @@ import struct from typing import cast, Any, AsyncGenerator, Coroutine, List import itertools import glob +import time -try: - import lc3 # type: ignore # pylint: disable=E0401 -except ImportError as e: - raise ImportError("Try `python -m pip install \".[lc3]\"`.") from e +import lc3 # type: ignore # pylint: disable=E0401 from bumble.colors import color from bumble import company_ids @@ -54,7 +52,58 @@ from auracast.utils.network_audio_receiver import NetworkAudioReceiverUncoded from auracast.utils.webrtc_audio_input import WebRTCAudioInput -# Instantiate WebRTC audio input for streaming (can be used per-BIG or globally) +# Patch sounddevice.InputStream globally to use low-latency settings +import sounddevice as sd + +class ModSoundDeviceAudioInput(audio_io.SoundDeviceAudioInput): + """Patched SoundDeviceAudioInput that creates RawInputStream with low-latency parameters.""" + + def _open(self): + """Patched _open method that creates RawInputStream with low-latency parameters.""" + try: + dev_info = sd.query_devices(self._device) + hostapis = sd.query_hostapis() + api_index = dev_info.get('hostapi') + api_name = hostapis[api_index]['name'] if isinstance(api_index, int) and 0 <= api_index < len(hostapis) else 'unknown' + pa_ver = None + try: + pa_ver = sd.get_portaudio_version() + except Exception: + pass + logging.info( + "SoundDevice backend=%s device='%s' (id=%s) ch=%s default_low_input_latency=%.4f default_high_input_latency=%.4f portaudio=%s", + api_name, + dev_info.get('name'), + self._device, + dev_info.get('max_input_channels'), + float(dev_info.get('default_low_input_latency') or 0.0), + float(dev_info.get('default_high_input_latency') or 0.0), + pa_ver[1] if isinstance(pa_ver, tuple) and len(pa_ver) >= 2 else pa_ver, + ) + except Exception as e: + logging.warning("Failed to query sounddevice backend/device info: %s", e) + + # Create RawInputStream with injected low-latency parameters + self._stream = sd.RawInputStream( + samplerate=self._pcm_format.sample_rate, + device=self._device, + channels=self._pcm_format.channels, + dtype='int16', + blocksize=240, # Match frame size + latency=0.010, + ) + self._stream.start() + + logging.info(f"SoundDeviceAudioInput: Opened with blocksize=240, latency=0.010 (10ms)") + + return audio_io.PcmFormat( + audio_io.PcmFormat.Endianness.LITTLE, + audio_io.PcmFormat.SampleType.INT16, + self._pcm_format.sample_rate, + 2, + ) + +audio_io.SoundDeviceAudioInput = ModSoundDeviceAudioInput # modified from bumble class ModWaveAudioInput(audio_io.ThreadedAudioInput): @@ -313,8 +362,9 @@ async def init_broadcast( bigs[f'big{i}']['iso_queue'] = iso_queue - logging.info(f'big{i} parameters are:') - logging.info('%s', pprint.pformat(vars(big))) + if global_config.debug: + logging.info(f'big{i} parameters are:') + logging.info('%s', pprint.pformat(vars(big))) logging.info(f'Finished setup of big{i}.') await asyncio.sleep(i+1) # Wait for advertising to set up @@ -619,10 +669,40 @@ class Streamer(): big['encoder'] = encoder big['precoded'] = False - logging.info("Streaming audio...") bigs = self.bigs self.is_streaming = True + + # frame drop algo parameters + # In demo/precoded modes there may be no audio_input or no _pcm_format yet + ai = big.get('audio_input') + if ai is not None and hasattr(ai, '_pcm_format') and getattr(ai, '_pcm_format') is not None: + sample_rate = ai._pcm_format.sample_rate + else: + sample_rate = global_config.auracast_sampling_rate_hz + samples_discarded_total = 0 # Total samples discarded + discard_events = 0 # Number of times we discarded samples + frames_since_last_discard = 999 # Guard: frames since last discard (start high to allow first drop) + enable_drift_compensation = getattr(global_config, 'enable_adaptive_frame_dropping', False) + # Hardcoded parameters (unit: milliseconds) + drift_threshold_ms = 2.0 if enable_drift_compensation else 0.0 + static_drop_ms = 1 if enable_drift_compensation else 0.0 + # Guard interval measured in LC3 frames (10 ms each); 50 => 500 ms cooldown + discard_guard_frames = int(2*sample_rate / 1000) if enable_drift_compensation else 0 + # Derived sample counts + drop_threshold_samples = int(sample_rate * drift_threshold_ms / 1000.0) + static_drop_samples = int(sample_rate * static_drop_ms / 1000.0) + + if enable_drift_compensation: + logging.info(f"Clock drift compensation ENABLED: threshold={drift_threshold_ms}ms, guard={discard_guard_frames} frames") + else: + logging.info("Clock drift compensation DISABLED") + + # Periodic monitoring + last_stats_log = time.perf_counter() + stats_interval = 5.0 # Log stats every 5 seconds + frame_count = 0 + # One streamer fits all while self.is_streaming: stream_finished = [False for _ in range(len(bigs))] @@ -641,12 +721,48 @@ class Streamer(): if frames_gen is None: frames_gen = big['audio_input'].frames(big['lc3_frame_samples']) big['frames_gen'] = frames_gen + + # Read the frame we need for encoding pcm_frame = await anext(frames_gen, None) if pcm_frame is None: # Not all streams may stop at the same time stream_finished[i] = True continue - + + # Discard excess samples in buffer if above threshold (clock drift compensation) + if enable_drift_compensation and hasattr(big['audio_input'], '_stream') and big['audio_input']._stream: + sd_buffer_samples = big['audio_input']._stream.read_available + + # Guard: only allow discard if enough frames have passed since last discard + if sd_buffer_samples > drop_threshold_samples and frames_since_last_discard >= discard_guard_frames: + # Always drop a static amount (3ms) for predictable behavior + # This matches the crossfade duration better for smoother transitions + samples_to_drop = min(static_drop_samples, max(1, big['lc3_frame_samples'] - 1)) + try: + discarded_data = await anext(big['audio_input'].frames(samples_to_drop)) + samples_discarded_total += samples_to_drop + discard_events += 1 + + # Log every discard event with timing information + sample_rate = big['audio_input']._pcm_format.sample_rate + time_since_last_ms = frames_since_last_discard * 10 # Each frame is 10ms + logging.info( + f"DISCARD #{discard_events}: dropped {samples_to_drop} samples ({samples_to_drop / sample_rate * 1000:.1f}ms) | " + f"buffer was {sd_buffer_samples} samples ({sd_buffer_samples / sample_rate * 1000:.1f}ms) | " + f"since_last={frames_since_last_discard} frames ({time_since_last_ms}ms) | " + f"frame={frame_count}" + ) + + # Reset guard counter + frames_since_last_discard = 0 + # Store how much we dropped for potential adaptive crossfade + big['last_drop_samples'] = samples_to_drop + # Set flag to apply crossfade on next frame + big['apply_crossfade'] = True + + except Exception as e: + logging.error(f"Failed to discard samples: {e}") + # Down-mix multi-channel PCM to mono for LC3 encoder if needed if big.get('channels', 1) > 1: if isinstance(pcm_frame, np.ndarray): @@ -659,12 +775,149 @@ class Streamer(): samples = np.frombuffer(pcm_frame, dtype=dtype) samples = samples.reshape(-1, big['channels']).mean(axis=1) pcm_frame = samples.astype(dtype).tobytes() + + # Apply crossfade if samples were just dropped (drift compensation) + if big.get('apply_crossfade') and big.get('prev_pcm_frame') is not None: + # Crossfade duration: 10ms for smoother transition (was 5ms) + dtype = np.int16 if big['pcm_bit_depth'] == 16 else np.float32 + sample_rate = big['audio_input']._pcm_format.sample_rate + crossfade_samples = min(int(sample_rate * 0.010), big['lc3_frame_samples'] // 2) + + # Convert frames to numpy arrays (make writable copies) + prev_samples = np.frombuffer(big['prev_pcm_frame'], dtype=dtype).copy() + curr_samples = np.frombuffer(pcm_frame, dtype=dtype).copy() + + # Create equal-power crossfade curves (smoother than linear) + # Equal-power maintains perceived loudness during transition + t = np.linspace(0, 1, crossfade_samples) + fade_out = np.cos(t * np.pi / 2) # Cosine fade out + fade_in = np.sin(t * np.pi / 2) # Sine fade in + + # Apply crossfade to the beginning of current frame with end of previous frame + if len(prev_samples) >= crossfade_samples and len(curr_samples) >= crossfade_samples: + crossfaded = ( + prev_samples[-crossfade_samples:] * fade_out + + curr_samples[:crossfade_samples] * fade_in + ).astype(dtype) + # Replace beginning of current frame with crossfaded section + curr_samples[:crossfade_samples] = crossfaded + pcm_frame = curr_samples.tobytes() + + big['apply_crossfade'] = False + + # Store current frame for potential next crossfade + if enable_drift_compensation: + big['prev_pcm_frame'] = pcm_frame lc3_frame = big['encoder'].encode( pcm_frame, num_bytes=big['lc3_bytes_per_frame'], bit_depth=big['pcm_bit_depth'] ) await big['iso_queue'].write(lc3_frame) + frame_count += 1 + # Increment guard counter (tracks frames since last discard) + frames_since_last_discard += 1 + + # Periodic stats logging (only for device/sounddevice streams, not WAV files) + # WAV file concurrent access causes deadlock in ThreadedAudioInput + now = time.perf_counter() + is_device_stream = hasattr(big['audio_input'], '_stream') and big['audio_input']._stream is not None + if is_device_stream and now - last_stats_log >= stats_interval: + # Get current buffer status from PortAudio + current_sd_buffer = 0 + if hasattr(big['audio_input'], '_stream') and big['audio_input']._stream: + try: + current_sd_buffer = big['audio_input']._stream.read_available + except Exception: + pass + + # Get stream latency and CPU load from sounddevice + stream_latency_ms = None + cpu_load_pct = None + if hasattr(big['audio_input'], '_stream') and big['audio_input']._stream: + try: + latency = big['audio_input']._stream.latency + if frame_count == 501: # Debug log once + logging.info(f"DEBUG: stream.latency raw value = {latency}, type = {type(latency)}") + # latency can be either a float (for input-only streams) or tuple (input, output) + if latency is not None: + if isinstance(latency, (int, float)): + # Single value for input-only stream + stream_latency_ms = float(latency) * 1000.0 + elif isinstance(latency, (tuple, list)) and len(latency) >= 1: + # Tuple (input_latency, output_latency) + stream_latency_ms = latency[0] * 1000.0 + except Exception as e: + if frame_count == 501: # Log once at startup + logging.warning(f"Could not get stream.latency: {e}") + + try: + cpu_load = big['audio_input']._stream.cpu_load + if frame_count == 501: # Debug log once + logging.info(f"DEBUG: stream.cpu_load raw value = {cpu_load}") + # cpu_load is a fraction (0.0 to 1.0) + if cpu_load is not None and cpu_load >= 0: + cpu_load_pct = cpu_load * 100.0 # Convert to percentage + except Exception as e: + if frame_count == 501: # Log once at startup + logging.warning(f"Could not get stream.cpu_load: {e}") + + # Get backend-specific buffer status + backend_delay = None + backend_label = "Backend" + + # Determine which backend we're using based on audio_input device + try: + device_info = big['audio_input']._device if hasattr(big['audio_input'], '_device') else None + if device_info is not None and isinstance(device_info, int): + hostapi = sd.query_hostapis(sd.query_devices(device_info)['hostapi']) + backend_name = hostapi['name'] + else: + backend_name = "Unknown" + except Exception: + backend_name = "Unknown" + + if 'pulse' in backend_name.lower(): + # PipeWire/PulseAudio backend - no direct buffer access + # SD_buffer is the only reliable metric + backend_label = "PipeWire" + backend_delay = None # Cannot read PipeWire internal buffers directly + else: + # ALSA backend - can read kernel buffer + backend_label = "ALSA_kernel" + try: + with open('/proc/asound/card0/pcm0c/sub0/status', 'r') as f: + for line in f: + if 'delay' in line and ':' in line: + backend_delay = int(line.split(':')[1].strip()) + break + except Exception: + pass + + if enable_drift_compensation: + avg_discard_per_event = (samples_discarded_total / discard_events) if discard_events > 0 else 0.0 + discard_event_rate = (discard_events / frame_count * 100) if frame_count > 0 else 0.0 + latency_str = f"stream_latency={stream_latency_ms:.2f} ms" if stream_latency_ms is not None else "stream_latency=N/A" + cpu_str = f"cpu_load={cpu_load_pct:.1f}%" if cpu_load_pct is not None else "cpu_load=N/A" + logging.info( + f"STATS: frames={frame_count} | discard_events={discard_events} ({discard_event_rate:.1f}%) | " + f"avg_discard={avg_discard_per_event:.0f} samples/event | " + f"SD_buffer={current_sd_buffer} samples ({current_sd_buffer / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms) | " + f"{latency_str} | {cpu_str} | " + f"threshold={drop_threshold_samples} samples ({drop_threshold_samples / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms)" + ) + else: + backend_str = f"{backend_label}={backend_delay} samples ({backend_delay / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms)" if backend_delay is not None else f"{backend_label}=N/A (use pw-top)" + latency_str = f"stream_latency={stream_latency_ms:.2f} ms" if stream_latency_ms is not None else "stream_latency=N/A" + cpu_str = f"cpu_load={cpu_load_pct:.1f}%" if cpu_load_pct is not None else "cpu_load=N/A" + logging.info( + f"STATS: frames={frame_count} | " + f"SD_buffer={current_sd_buffer} samples ({current_sd_buffer / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms) | " + f"{latency_str} | {cpu_str} | " + f"{backend_str} | " + f"drift_compensation=DISABLED" + ) + last_stats_log = now if all(stream_finished): # Take into account that multiple files have different lengths logging.info('All streams finished, stopping streamer') @@ -718,6 +971,107 @@ if __name__ == "__main__": ) os.chdir(os.path.dirname(__file__)) + # ============================================================================= + # AUDIO BACKEND CONFIGURATION - Toggle between ALSA and PipeWire + # ============================================================================= + # Uncomment ONE of the following backend configurations: + + # Option 1: Direct ALSA (Direct hardware access, bypasses PipeWire) + AUDIO_BACKEND = 'ALSA' + target_latency_ms = 10.0 + + # Option 2: PipeWire via PulseAudio API (Routes through pipewire-pulse) + #AUDIO_BACKEND = 'PipeWire' + #target_latency_ms = 5.0 # PipeWire typically handles lower latency better + + # ============================================================================= + + import sounddevice as sd + import subprocess + + # Detect if PipeWire is running (even if we're using ALSA API) + pipewire_running = False + try: + result = subprocess.run(['systemctl', '--user', 'is-active', 'pipewire'], + capture_output=True, text=True, timeout=1) + pipewire_running = (result.returncode == 0) + except Exception: + pass + + if AUDIO_BACKEND == 'ALSA': + os.environ['SDL_AUDIODRIVER'] = 'alsa' + sd.default.latency = target_latency_ms / 1000.0 + + # Find ALSA host API + try: + alsa_hostapi = next(i for i, ha in enumerate(sd.query_hostapis()) + if 'ALSA' in ha['name']) + logging.info(f"ALSA host API available at index: {alsa_hostapi}") + except StopIteration: + logging.error("ALSA backend not found!") + + elif AUDIO_BACKEND == 'PipeWire': + os.environ['SDL_AUDIODRIVER'] = 'pulseaudio' + sd.default.latency = target_latency_ms / 1000.0 + + if not pipewire_running: + logging.error("PipeWire selected but not running!") + raise RuntimeError("PipeWire is not active") + + # Find PulseAudio host API (required for PipeWire mode) + try: + pulse_hostapi = next(i for i, ha in enumerate(sd.query_hostapis()) + if 'pulse' in ha['name'].lower()) + logging.info(f"Using PulseAudio host API at index: {pulse_hostapi} → routes to PipeWire") + except StopIteration: + logging.error("PulseAudio host API not found! Did you rebuild PortAudio with -DPA_USE_PULSEAUDIO=ON?") + raise RuntimeError("PulseAudio API not available in PortAudio") + else: + logging.error(f"Unknown AUDIO_BACKEND: {AUDIO_BACKEND}") + raise ValueError(f"Invalid AUDIO_BACKEND: {AUDIO_BACKEND}") + + # Select audio input device based on backend + shure_device_idx = None + + if AUDIO_BACKEND == 'ALSA': + # Use ALSA devices + from auracast.utils.sounddevice_utils import get_alsa_usb_inputs + devices = get_alsa_usb_inputs() + logging.info("Searching ALSA devices for Shure MVX2U...") + + for idx, dev in devices: + logging.info(f" ALSA device [{idx}]: {dev['name']} ({dev['max_input_channels']} ch)") + if 'shure' in dev['name'].lower() and 'mvx2u' in dev['name'].lower(): + shure_device_idx = idx + logging.info(f"✓ Selected ALSA device {idx}: {dev['name']}") + break + + elif AUDIO_BACKEND == 'PipeWire': + # Use PulseAudio devices (routed through PipeWire) + logging.info("Searching PulseAudio devices for Shure MVX2U...") + + for idx, dev in enumerate(sd.query_devices()): + # Only consider PulseAudio input devices + if dev['max_input_channels'] > 0: + hostapi = sd.query_hostapis(dev['hostapi']) + if 'pulse' in hostapi['name'].lower(): + dev_name_lower = dev['name'].lower() + logging.info(f" PulseAudio device [{idx}]: {dev['name']} ({dev['max_input_channels']} ch)") + + # Skip monitor devices (they're output monitors, not real inputs) + if 'monitor' in dev_name_lower: + continue + + # Look for Shure MVX2U - prefer "Mono" device for mono input + if 'shure' in dev_name_lower and 'mvx2u' in dev_name_lower: + shure_device_idx = idx + logging.info(f"✓ Selected PulseAudio device {idx}: {dev['name']} → routes to PipeWire") + break + + if shure_device_idx is None: + logging.error(f"Shure MVX2U not found in {AUDIO_BACKEND} devices!") + raise RuntimeError(f"Audio device not found for {AUDIO_BACKEND} backend") + config = auracast_config.AuracastConfigGroup( bigs = [ auracast_config.AuracastBigConfigDeu(), @@ -748,6 +1102,16 @@ if __name__ == "__main__": #big.audio_source = big.audio_source.replace('.wav', '_10_16_32.lc3') #lc3 precoded files #big.audio_source = read_lc3_file(big.audio_source) # load files in advance + # --- Configure Shure MVX2U USB Audio Interface (ALSA backend) --- + if shure_device_idx is not None: + big.audio_source = f'device:{shure_device_idx}' # Shure MVX2U USB mono interface + big.input_format = 'int16le,48000,1' # int16, 48kHz, mono + logging.info(f"Configured BIG '{big.name}' with Shure MVX2U (device:{shure_device_idx}, 48kHz mono)") + else: + logging.warning(f"Shure device not found, BIG '{big.name}' will use default audio_source: {big.audio_source}") + + big.name='Broadcast0' + big.iso_que_len=1 # --- Network_uncoded mode using NetworkAudioReceiver --- #big.audio_source = NetworkAudioReceiverUncoded(port=50007, samplerate=16000, channels=1, chunk_size=1024) @@ -760,6 +1124,9 @@ if __name__ == "__main__": config.auracast_sampling_rate_hz = 16000 config.octets_per_frame = 40 # 32kbps@16kHz #config.debug = True + + # Enable clock drift compensation to prevent latency accumulation + # With ~43 samples/sec drift (0.89ms/sec), threshold of 2ms will trigger every ~2.2 seconds run_async( broadcast( diff --git a/src/auracast/multicast_script.py b/src/auracast/multicast_script.py index 553cebe..01c4fdd 100644 --- a/src/auracast/multicast_script.py +++ b/src/auracast/multicast_script.py @@ -5,19 +5,13 @@ multicast_script Loads environment variables from a .env file located next to this script and configures the multicast broadcast. Only UPPERCASE keys are read. +This version uses ALSA backend for USB audio devices (no PipeWire required). + Environment variables --------------------- - LOG_LEVEL: Logging level for the script. Default: INFO. Examples: DEBUG, INFO, WARNING, ERROR. -- INPUT: Select audio capture source. - Values: - - "usb" (default): first available USB input device. - - "aes67": select AES67 inputs. Two forms: - * INPUT=aes67 -> first available AES67 input. - * INPUT=aes67, -> case-insensitive substring match against - the device name, e.g. INPUT=aes67,8f6326. - - BROADCAST_NAME: Name of the broadcast (Auracast BIG name). Default: "Broadcast0". @@ -27,17 +21,13 @@ Environment variables - LANGUATE: ISO 639-3 language code used by config (intentional key name). Default: "deu". -- PULSE_LATENCY_MSEC: Pulse/PipeWire latency hint in milliseconds. - Default: 3. Examples (.env) --------------- LOG_LEVEL=DEBUG -INPUT=aes67,8f6326 BROADCAST_NAME=MyBroadcast PROGRAM_INFO="Live announcements" -LANGUATE=deu -""" +LANGUATE=deu""" import logging import os import time @@ -45,9 +35,10 @@ from dotenv import load_dotenv from auracast import multicast from auracast import auracast_config from auracast.utils.sounddevice_utils import ( - get_usb_pw_inputs, - get_network_pw_inputs, + get_alsa_usb_inputs, + get_network_pw_inputs, # PipeWire network (AES67) inputs refresh_pw_cache, + resolve_input_device_index, ) @@ -60,65 +51,81 @@ if __name__ == "__main__": os.chdir(os.path.dirname(__file__)) # Load .env located next to this script (only uppercase keys will be referenced) load_dotenv(dotenv_path='.env') - - os.environ.setdefault("PULSE_LATENCY_MSEC", "3") - # Refresh device cache and list inputs - refresh_pw_cache() - usb_inputs = get_usb_pw_inputs() - logging.info("USB pw inputs:") + # Refresh PipeWire cache and list devices (Network only for PW; USB via ALSA) + try: + refresh_pw_cache() + except Exception: + pass + pw_net = get_network_pw_inputs() + # List PipeWire Network inputs + logging.info("PipeWire Network inputs:") + for i, d in pw_net: + logging.info(f"{i}: {d['name']} in={d.get('max_input_channels', 0)}") + + # Also list USB ALSA inputs (fallback path) + usb_inputs = get_alsa_usb_inputs() + logging.info("USB ALSA inputs:") for i, d in usb_inputs: logging.info(f"{i}: {d['name']} in={d['max_input_channels']}") - - aes67_inputs = get_network_pw_inputs() - logging.info("AES67 pw inputs:") - for i, d in aes67_inputs: - logging.info(f"{i}: {d['name']} in={d['max_input_channels']}") - # Input selection (usb | aes67). Default to usb. - # Allows specifying an AES67 device by substring: INPUT=aes67, - # Example: INPUT=aes67,8f6326 will match a device name containing "8f6326". - input_env = os.environ.get('INPUT', 'usb') or 'usb' - parts = [p.strip() for p in input_env.split(',', 1)] - input_mode = (parts[0] or 'usb').lower() - iface_substr = (parts[1].lower() if len(parts) > 1 and parts[1] else None) + # Optional device selection via .env: match by string or substring (uppercase keys only) + device_match = os.environ.get('INPUT_DEVICE') + device_match = device_match.strip() if isinstance(device_match, str) else None + # Loop until a device becomes available (prefer PW Network, then ALSA USB) selected_dev = None - if input_mode == 'aes67': - if not aes67_inputs and not iface_substr: - # No AES67 inputs and no specific target -> fail fast - raise RuntimeError("No AES67 audio inputs found.") - if iface_substr: - # Loop until a matching AES67 input becomes available - while True: - refresh_pw_cache() - current = get_network_pw_inputs() - sel = next(((i, d) for i, d in current if iface_substr in (d.get('name','').lower())), None) - if sel: - input_sel = sel[0] - selected_dev = sel[1] - logging.info(f"Selected AES67 input by match '{iface_substr}': index={input_sel}") - break - logging.info(f"Waiting for AES67 input matching '{iface_substr}'... retrying in 2s") - time.sleep(2) - else: - input_sel, selected_dev = aes67_inputs[0] - logging.info(f"Selected first AES67 input: index={input_sel}, device={selected_dev['name']}") - else: - # Loop until a USB input becomes available (mirror AES67 retry behavior) - while True: + while True: + try: refresh_pw_cache() - current = get_usb_pw_inputs() - if current: - input_sel, selected_dev = current[0] - logging.info(f"Selected first USB input: index={input_sel}, device={selected_dev['name']}") + except Exception: + pass + pw_net = get_network_pw_inputs() + # 1) Try to satisfy explicit .env match on PW Network + if device_match and pw_net: + for _, d in pw_net: + name = d.get('name', '') + if device_match in name: + idx = resolve_input_device_index(name) + if idx is not None: + input_sel = idx + selected_dev = d + logging.info(f"Selected Network input by match '{device_match}' (PipeWire): index={input_sel}, device={name}") + break + if selected_dev is not None: break - logging.info("Waiting for USB input... retrying in 2s") - time.sleep(2) + if pw_net and selected_dev is None: + _, d0 = pw_net[0] + idx = resolve_input_device_index(d0.get('name', '')) + if idx is not None: + input_sel = idx + selected_dev = d0 + logging.info(f"Selected first Network input (PipeWire): index={input_sel}, device={d0['name']}") + break + current_alsa = get_alsa_usb_inputs() + # 2) Try to satisfy explicit .env match on ALSA USB + if device_match and current_alsa and selected_dev is None: + matched = None + for idx_alsa, d in current_alsa: + name = d.get('name', '') + if device_match in name: + matched = (idx_alsa, d) + break + if matched is not None: + input_sel, selected_dev = matched + logging.info(f"Selected USB input by match '{device_match}' (ALSA): index={input_sel}, device={selected_dev['name']}") + break + # Fallback to first ALSA USB + if current_alsa and selected_dev is None: + input_sel, selected_dev = current_alsa[0] + logging.info(f"Selected first USB input (ALSA): index={input_sel}, device={selected_dev['name']}") + break + logging.info("Waiting for audio input (prefer PW Network, then ALSA USB)... retrying in 2s") + time.sleep(2) TRANSPORT1 = 'serial:/dev/ttyAMA3,1000000,rtscts' # transport for raspberry pi gpio header TRANSPORT2 = 'serial:/dev/ttyAMA4,1000000,rtscts' # transport for raspberry pi gpio header - # Capture at 48 kHz to avoid PipeWire resampler latency; encode LC3 at 24 kHz + # Capture at 48 kHz to avoid resampler latency; encode LC3 at 24 kHz CAPTURE_SRATE = 48000 LC3_SRATE = 24000 OCTETS_PER_FRAME=60 @@ -150,14 +157,16 @@ if __name__ == "__main__": ), #auracast_config.AuracastBigConfigEng(), ], - immediate_rendering=True, + immediate_rendering=False, presentation_delay_us=40000, qos_config=auracast_config.AuracastQosHigh(), auracast_sampling_rate_hz = LC3_SRATE, octets_per_frame = OCTETS_PER_FRAME, - transport=TRANSPORT1 + transport=TRANSPORT1, + enable_adaptive_frame_dropping=True, ) - #config.debug = True + config.debug = False + logging.info(config.model_dump_json(indent=2)) multicast.run_async( diff --git a/src/auracast/server/multicast_frontend.py b/src/auracast/server/multicast_frontend.py index 7b78b75..3a8dae7 100644 --- a/src/auracast/server/multicast_frontend.py +++ b/src/auracast/server/multicast_frontend.py @@ -170,11 +170,29 @@ if audio_mode == "Demo": "6 × 16kHz": {"quality": "Fair (16kHz)", "streams": 6}, } demo_options = list(demo_stream_map.keys()) - default_demo = demo_options[0] + default_index = 0 + saved_type = saved_settings.get('demo_stream_type') + if isinstance(saved_type, str) and saved_type in demo_options: + default_index = demo_options.index(saved_type) + else: + saved_rate = saved_settings.get('auracast_sampling_rate_hz') + saved_total = saved_settings.get('demo_total_streams') + if saved_total is None and saved_settings.get('audio_mode') == 'Demo': + saved_total = len(saved_settings.get('channel_names') or []) + try: + if saved_rate and saved_total: + for i, label in enumerate(demo_options): + cfg = demo_stream_map[label] + rate_for_label = QUALITY_MAP[cfg['quality']]['rate'] + if cfg['streams'] == int(saved_total) and int(saved_rate) == rate_for_label: + default_index = i + break + except Exception: + default_index = 0 demo_selected = st.selectbox( "Demo Stream Type", demo_options, - index=0, + index=default_index, help="Select the demo stream configuration." ) # Stream password and flags (same as USB/AES67) diff --git a/src/auracast/server/multicast_server.py b/src/auracast/server/multicast_server.py index 96508a8..7692e22 100644 --- a/src/auracast/server/multicast_server.py +++ b/src/auracast/server/multicast_server.py @@ -19,8 +19,9 @@ from auracast import multicast_control, auracast_config import sounddevice as sd # type: ignore import traceback from auracast.utils.sounddevice_utils import ( - get_usb_pw_inputs, get_network_pw_inputs, + get_alsa_usb_inputs, + resolve_input_device_index, refresh_pw_cache, ) from auracast.utils.reset_utils import reset_nrf54l @@ -32,8 +33,6 @@ STREAM_SETTINGS_FILE = os.path.join(os.path.dirname(__file__), 'stream_settings. TRANSPORT1 = os.getenv('TRANSPORT1', 'serial:/dev/ttyAMA3,1000000,rtscts') # transport for raspberry pi gpio header TRANSPORT2 = os.getenv('TRANSPORT2', 'serial:/dev/ttyAMA4,1000000,rtscts') # transport for raspberry pi gpio header - - os.environ["PULSE_LATENCY_MSEC"] = "3" # In-memory cache to avoid disk I/O on hot paths like /status @@ -105,7 +104,7 @@ app.add_middleware( # Initialize global configuration global_config_group = auracast_config.AuracastConfigGroup() -class StreamerWorker: +class StreamerWorker: # TODO: is wraping in this Worker stricly nececcarry ? """Owns multicaster(s) on a dedicated asyncio loop in a background thread.""" def __init__(self) -> None: @@ -162,7 +161,16 @@ class StreamerWorker: pass self._multicaster1 = None + # overwrite some configurations conf.transport = TRANSPORT1 + # Enable adaptive frame dropping only for device-based inputs (not file/demo) + try: + conf.enable_adaptive_frame_dropping = any( + isinstance(big.audio_source, str) and big.audio_source.startswith('device:') + for big in conf.bigs + ) + except Exception: + conf.enable_adaptive_frame_dropping = False # Derive device name and input mode first_source = conf.bigs[0].audio_source if conf.bigs else '' input_device_name = None @@ -170,21 +178,28 @@ class StreamerWorker: if first_source.startswith('device:'): input_device_name = first_source.split(':', 1)[1] if ':' in first_source else None try: - usb_names = {d.get('name') for _, d in get_usb_pw_inputs()} + alsa_usb_names = {d.get('name') for _, d in get_alsa_usb_inputs()} + except Exception: + alsa_usb_names = set() + try: net_names = {d.get('name') for _, d in get_network_pw_inputs()} except Exception: usb_names, net_names = set(), set() audio_mode_persist = 'Network' if (input_device_name in net_names) else 'USB' - # Map device name to index and configure input_format - device_index = int(input_device_name) if (input_device_name and input_device_name.isdigit()) else get_device_index_by_name(input_device_name or '') + # Map device name to index using centralized resolver + if input_device_name and input_device_name.isdigit(): + device_index = int(input_device_name) + else: + device_index = resolve_input_device_index(input_device_name or '') if device_index is None: raise HTTPException(status_code=400, detail=f"Audio device '{input_device_name}' not found.") for big in conf.bigs: if big.audio_source.startswith('device:'): big.audio_source = f'device:{device_index}' devinfo = sd.query_devices(device_index) - capture_rate = int(devinfo.get('default_samplerate') or 48000) + # Force capture at 48 kHz to avoid resampler latency and 44.1 kHz incompatibilities + capture_rate = 48000 max_in = int(devinfo.get('max_input_channels') or 1) channels = max(1, min(2, max_in)) for big in conf.bigs: @@ -204,6 +219,14 @@ class StreamerWorker: auto_started = True # Return proposed settings to persist on API side + demo_count = sum(1 for big in conf.bigs if isinstance(big.audio_source, str) and big.audio_source.startswith('file:')) + demo_rate = int(conf.auracast_sampling_rate_hz or 0) + demo_type = None + if demo_count > 0 and demo_rate > 0: + if demo_rate in (48000, 24000, 16000): + demo_type = f"{demo_count} × {demo_rate//1000}kHz" + else: + demo_type = f"{demo_count} × {demo_rate}Hz" return { 'channel_names': [big.name for big in conf.bigs], 'languages': [big.language for big in conf.bigs], @@ -218,6 +241,8 @@ class StreamerWorker: 'immediate_rendering': getattr(conf, 'immediate_rendering', False), 'assisted_listening_stream': getattr(conf, 'assisted_listening_stream', False), 'stream_password': (conf.bigs[0].code if conf.bigs and getattr(conf.bigs[0], 'code', None) else None), + 'demo_total_streams': demo_count, + 'demo_stream_type': demo_type, 'is_streaming': auto_started, } @@ -230,10 +255,27 @@ class StreamerWorker: self._multicaster2 = None conf.transport = TRANSPORT2 + # Enable adaptive frame dropping only for device-based inputs (not file/demo) + try: + conf.enable_adaptive_frame_dropping = any( + isinstance(big.audio_source, str) and big.audio_source.startswith('device:') + for big in conf.bigs + ) + except Exception: + conf.enable_adaptive_frame_dropping = False for big in conf.bigs: if big.audio_source.startswith('device:'): device_name = big.audio_source.split(':', 1)[1] - device_index = get_device_index_by_name(device_name) + # Resolve backend preference by membership + try: + net_names = {d.get('name') for _, d in get_network_pw_inputs()} + except Exception: + net_names = set() + try: + alsa_usb_names = {d.get('name') for _, d in get_alsa_usb_inputs()} + except Exception: + alsa_usb_names = set() + device_index = resolve_input_device_index(device_name) if device_index is None: raise HTTPException(status_code=400, detail=f"Audio device '{device_name}' not found.") big.audio_source = f'device:{device_index}' @@ -313,6 +355,24 @@ async def initialize2(conf: auracast_config.AuracastConfigGroup): try: log.info('Initializing multicaster2 with config:\n %s', conf.model_dump_json(indent=2)) await streamer.call(streamer._w_init_secondary, conf) + try: + is_demo = any(isinstance(big.audio_source, str) and big.audio_source.startswith('file:') for big in conf.bigs) + if is_demo: + settings = load_stream_settings() or {} + primary_count = int(settings.get('demo_total_streams') or len(settings.get('channel_names') or [])) + secondary_count = len(conf.bigs or []) + total = primary_count + secondary_count + settings['demo_total_streams'] = total + demo_rate = int(conf.auracast_sampling_rate_hz or 0) + if demo_rate > 0: + if demo_rate in (48000, 24000, 16000): + settings['demo_stream_type'] = f"{total} × {demo_rate//1000}kHz" + else: + settings['demo_stream_type'] = f"{total} × {demo_rate}Hz" + settings['timestamp'] = datetime.utcnow().isoformat() + save_stream_settings(settings) + except Exception: + log.warning("Failed to persist demo_total_streams in /init2", exc_info=True) except Exception as e: log.error("Exception in /init2: %s", traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) @@ -415,7 +475,7 @@ async def _autostart_from_settings(): ): return # Check against the cached device lists - usb = [d for _, d in get_usb_pw_inputs()] + usb = [d for _, d in get_alsa_usb_inputs()] net = [d for _, d in get_network_pw_inputs()] names = {d.get('name') for d in usb} | {d.get('name') for d in net} if input_device_name in names: @@ -471,11 +531,11 @@ async def _startup_autostart_event(): @app.get("/audio_inputs_pw_usb") async def audio_inputs_pw_usb(): - """List PipeWire USB input nodes from cache.""" + """List USB input devices using ALSA backend (USB is ALSA in our scheme).""" try: devices = [ {"id": idx, "name": dev.get("name"), "max_input_channels": dev.get("max_input_channels", 0)} - for idx, dev in get_usb_pw_inputs() + for idx, dev in get_alsa_usb_inputs() ] return {"inputs": devices} except Exception as e: diff --git a/src/auracast/utils/sounddevice_utils.py b/src/auracast/utils/sounddevice_utils.py index 7db59a4..9d7c1f8 100644 --- a/src/auracast/utils/sounddevice_utils.py +++ b/src/auracast/utils/sounddevice_utils.py @@ -23,7 +23,11 @@ def _pa_like_hostapi_index(): raise RuntimeError("PipeWire/PulseAudio host API not present in PortAudio.") def _pw_dump(): - return json.loads(subprocess.check_output(["pw-dump"])) + try: + return json.loads(subprocess.check_output(["pw-dump"])) + except (FileNotFoundError, subprocess.CalledProcessError): + # PipeWire not available + return [] def _sd_refresh(): """Force PortAudio to re-enumerate devices on next query. @@ -49,6 +53,74 @@ def _sd_matches_from_names(pa_idx, names): out.append((i, d)) return out +def get_device_index_by_name(name: str, backend: str | None = None): + """Return the sounddevice index for an input device with exact name. + + If backend is provided, restrict to that backend: + - 'ALSA' uses the ALSA host API + - 'PipeWire' or 'PulseAudio' use the PipeWire/Pulse host API index + Returns None if not found. + """ + try: + devices = sd.query_devices() + hostapi_filter = None + if backend: + if backend.lower() == 'alsa': + try: + alsa = devices_by_backend('ALSA') + alsa_indices = {idx for idx, _ in alsa} + hostapi_filter = ('indices', alsa_indices) + except Exception: + return None + else: + try: + pa_idx = _pa_like_hostapi_index() + hostapi_filter = ('hostapi', pa_idx) + except Exception: + return None + for idx, d in enumerate(devices): + if d.get('max_input_channels', 0) <= 0: + continue + if d.get('name') != name: + continue + if hostapi_filter: + kind, val = hostapi_filter + if kind == 'indices' and idx not in val: + continue + if kind == 'hostapi' and d.get('hostapi') != val: + continue + return idx + except Exception: + return None + return None + +def resolve_input_device_index(name: str): + """Resolve device index by exact name preferring backend by device type. + + - If name is known PipeWire Network device, use PipeWire backend. + - Else if name is known ALSA USB device, use ALSA backend. + - Else fallback to any backend. + Returns None if not found. + """ + try: + net_names = {d.get('name') for _, d in get_network_pw_inputs()} + except Exception: + net_names = set() + try: + alsa_usb_names = {d.get('name') for _, d in get_alsa_usb_inputs()} + except Exception: + alsa_usb_names = set() + + if name in net_names: + idx = get_device_index_by_name(name, backend='PipeWire') + if idx is not None: + return idx + if name in alsa_usb_names: + idx = get_device_index_by_name(name, backend='ALSA') + if idx is not None: + return idx + return get_device_index_by_name(name, backend=None) + # Module-level caches for device lists _usb_inputs_cache = [] _network_inputs_cache = [] @@ -66,12 +138,22 @@ def refresh_pw_cache(): Performs a full device scan and updates the internal caches for both USB and Network audio devices. This is a heavy operation and should not be called frequently or during active streams. + + If PipeWire is not available, caches will remain empty. """ global _usb_inputs_cache, _network_inputs_cache # Force PortAudio to re-enumerate devices _sd_refresh() - pa_idx = _pa_like_hostapi_index() + + try: + pa_idx = _pa_like_hostapi_index() + except RuntimeError: + # PipeWire/PulseAudio not available - reset caches and return + _usb_inputs_cache = [] + _network_inputs_cache = [] + return + pw = _pw_dump() # --- Pass 1: Map device.id to device.bus --- @@ -129,6 +211,53 @@ def refresh_pw_cache(): _network_inputs_cache = _sd_matches_from_names(pa_idx, network_input_names) +def get_alsa_usb_inputs(): + """ + Return USB audio input devices using the ALSA backend. + Filters for devices that appear to be USB hardware (hw:X,Y pattern or USB in name). + Returns list of (index, device_dict) tuples. + """ + try: + alsa_devices = devices_by_backend('ALSA') + except ValueError: + # ALSA backend not available + return [] + + usb_inputs = [] + for idx, dev in alsa_devices: + # Only include input devices + if dev.get('max_input_channels', 0) <= 0: + continue + + name = dev.get('name', '').lower() + # Filter for USB devices based on common patterns: + # - Contains 'usb' in the name + # - hw:X,Y pattern (ALSA hardware devices) + # Exclude: default, dmix, pulse, pipewire, sysdefault + if any(exclude in name for exclude in ['default', 'dmix', 'pulse', 'pipewire', 'sysdefault']): + continue + + # Include if it has 'usb' in name or matches hw:X pattern + if 'usb' in name or re.match(r'hw:\d+', name): + usb_inputs.append((idx, dev)) + + return usb_inputs + +def get_alsa_inputs(): + """ + Return all ALSA audio input devices. + Returns list of (index, device_dict) tuples. + """ + try: + alsa_devices = devices_by_backend('ALSA') + except ValueError: + # ALSA backend not available + return [] + + return [(idx, dev) for idx, dev in alsa_devices + if dev.get('max_input_channels', 0) > 0] + + # Populate cache on initial module load refresh_pw_cache() diff --git a/src/scripts/list_pw_nodes.py b/src/scripts/list_pw_nodes.py index 7c9bc85..270eb6a 100644 --- a/src/scripts/list_pw_nodes.py +++ b/src/scripts/list_pw_nodes.py @@ -1,6 +1,5 @@ import sounddevice as sd, pprint -from auracast.utils.sounddevice_utils import devices_by_backend, list_usb_pw_inputs, list_network_pw_inputs - +from auracast.utils.sounddevice_utils import devices_by_backend print("PortAudio library:", sd._libname) print("PortAudio version:", sd.get_portaudio_version()) @@ -15,10 +14,3 @@ for i, d in devices_by_backend("PulseAudio"): print(f"{i}: {d['name']} in={d['max_input_channels']} out={d['max_output_channels']}") -print("Network pw inputs:") -for i, d in list_network_pw_inputs(): - print(f"{i}: {d['name']} in={d['max_input_channels']}") - -print("USB pw inputs:") -for i, d in list_usb_pw_inputs(): - print(f"{i}: {d['name']} in={d['max_input_channels']}")