Implement adaptive frame dropping (#10)

- Implement adaptive frame dropping to prevent latency from accumulating
- Small packets are dropped, and a crossfade is used to hide the dropping.
- The dropping is still audible in some situations.

Co-authored-by: pstruebi <struebin.patrick.com>
Reviewed-on: https://gitea.pstruebi.xyz/auracaster/bumble-auracast/pulls/10
This commit was merged in pull request #10.
This commit is contained in:
2025-11-04 17:16:33 +01:00
parent 5a1e1f13ac
commit 98dd00e653
7 changed files with 677 additions and 100 deletions

View File

@@ -40,6 +40,8 @@ class AuracastGlobalConfig(BaseModel):
# so receivers may render earlier than the presentation delay for lower latency.
immediate_rendering: bool = False
assisted_listening_stream: bool = False
# Adaptive frame dropping: discard sub-frame samples when buffer exceeds threshold
enable_adaptive_frame_dropping: bool = False
# "Audio input. "
# "'device' -> use the host's default sound input device, "

View File

@@ -26,11 +26,9 @@ import struct
from typing import cast, Any, AsyncGenerator, Coroutine, List
import itertools
import glob
import time
try:
import lc3 # type: ignore # pylint: disable=E0401
except ImportError as e:
raise ImportError("Try `python -m pip install \".[lc3]\"`.") from e
import lc3 # type: ignore # pylint: disable=E0401
from bumble.colors import color
from bumble import company_ids
@@ -54,7 +52,58 @@ from auracast.utils.network_audio_receiver import NetworkAudioReceiverUncoded
from auracast.utils.webrtc_audio_input import WebRTCAudioInput
# Instantiate WebRTC audio input for streaming (can be used per-BIG or globally)
# Patch sounddevice.InputStream globally to use low-latency settings
import sounddevice as sd
class ModSoundDeviceAudioInput(audio_io.SoundDeviceAudioInput):
    """Patched SoundDeviceAudioInput that creates RawInputStream with low-latency parameters."""

    def _open(self):
        """Patched _open method that creates RawInputStream with low-latency parameters.

        Logs backend/device diagnostics, opens a ``sd.RawInputStream`` with a
        fixed blocksize of 240 frames and a 10 ms latency hint, starts it, and
        returns the PCM format the caller should expect from the stream.
        """
        # Best-effort diagnostics: log PortAudio host API, device info, and
        # library version so latency problems can be traced later. Any failure
        # here must not prevent the stream from opening, hence the broad catch.
        try:
            dev_info = sd.query_devices(self._device)
            hostapis = sd.query_hostapis()
            api_index = dev_info.get('hostapi')
            # Guard against an out-of-range or missing host-API index.
            api_name = hostapis[api_index]['name'] if isinstance(api_index, int) and 0 <= api_index < len(hostapis) else 'unknown'
            pa_ver = None
            try:
                pa_ver = sd.get_portaudio_version()
            except Exception:
                # Older sounddevice builds may not expose this; version is optional.
                pass
            logging.info(
                "SoundDevice backend=%s device='%s' (id=%s) ch=%s default_low_input_latency=%.4f default_high_input_latency=%.4f portaudio=%s",
                api_name,
                dev_info.get('name'),
                self._device,
                dev_info.get('max_input_channels'),
                float(dev_info.get('default_low_input_latency') or 0.0),
                float(dev_info.get('default_high_input_latency') or 0.0),
                # get_portaudio_version() may return a (number, text) tuple; prefer the text.
                pa_ver[1] if isinstance(pa_ver, tuple) and len(pa_ver) >= 2 else pa_ver,
            )
        except Exception as e:
            logging.warning("Failed to query sounddevice backend/device info: %s", e)
        # Create RawInputStream with injected low-latency parameters
        self._stream = sd.RawInputStream(
            samplerate=self._pcm_format.sample_rate,
            device=self._device,
            channels=self._pcm_format.channels,
            dtype='int16',
            blocksize=240, # Match frame size
            latency=0.010,
        )
        self._stream.start()
        logging.info(f"SoundDeviceAudioInput: Opened with blocksize=240, latency=0.010 (10ms)")
        # NOTE(review): the returned format hardcodes 2 channels while the
        # stream above is opened with self._pcm_format.channels — confirm this
        # mismatch is intentional (e.g. downstream always expects stereo).
        return audio_io.PcmFormat(
            audio_io.PcmFormat.Endianness.LITTLE,
            audio_io.PcmFormat.SampleType.INT16,
            self._pcm_format.sample_rate,
            2,
        )


# Globally replace bumble's SoundDeviceAudioInput with the low-latency variant
# so every consumer of audio_io picks up the patched _open().
audio_io.SoundDeviceAudioInput = ModSoundDeviceAudioInput
# modified from bumble
class ModWaveAudioInput(audio_io.ThreadedAudioInput):
@@ -313,8 +362,9 @@ async def init_broadcast(
bigs[f'big{i}']['iso_queue'] = iso_queue
logging.info(f'big{i} parameters are:')
logging.info('%s', pprint.pformat(vars(big)))
if global_config.debug:
logging.info(f'big{i} parameters are:')
logging.info('%s', pprint.pformat(vars(big)))
logging.info(f'Finished setup of big{i}.')
await asyncio.sleep(i+1) # Wait for advertising to set up
@@ -619,10 +669,40 @@ class Streamer():
big['encoder'] = encoder
big['precoded'] = False
logging.info("Streaming audio...")
bigs = self.bigs
self.is_streaming = True
# frame drop algo parameters
# In demo/precoded modes there may be no audio_input or no _pcm_format yet
ai = big.get('audio_input')
if ai is not None and hasattr(ai, '_pcm_format') and getattr(ai, '_pcm_format') is not None:
sample_rate = ai._pcm_format.sample_rate
else:
sample_rate = global_config.auracast_sampling_rate_hz
samples_discarded_total = 0 # Total samples discarded
discard_events = 0 # Number of times we discarded samples
frames_since_last_discard = 999 # Guard: frames since last discard (start high to allow first drop)
enable_drift_compensation = getattr(global_config, 'enable_adaptive_frame_dropping', False)
# Hardcoded parameters (unit: milliseconds)
drift_threshold_ms = 2.0 if enable_drift_compensation else 0.0
static_drop_ms = 1 if enable_drift_compensation else 0.0
# Guard interval measured in LC3 frames (10 ms each); 50 => 500 ms cooldown
discard_guard_frames = int(2*sample_rate / 1000) if enable_drift_compensation else 0
# Derived sample counts
drop_threshold_samples = int(sample_rate * drift_threshold_ms / 1000.0)
static_drop_samples = int(sample_rate * static_drop_ms / 1000.0)
if enable_drift_compensation:
logging.info(f"Clock drift compensation ENABLED: threshold={drift_threshold_ms}ms, guard={discard_guard_frames} frames")
else:
logging.info("Clock drift compensation DISABLED")
# Periodic monitoring
last_stats_log = time.perf_counter()
stats_interval = 5.0 # Log stats every 5 seconds
frame_count = 0
# One streamer fits all
while self.is_streaming:
stream_finished = [False for _ in range(len(bigs))]
@@ -641,12 +721,48 @@ class Streamer():
if frames_gen is None:
frames_gen = big['audio_input'].frames(big['lc3_frame_samples'])
big['frames_gen'] = frames_gen
# Read the frame we need for encoding
pcm_frame = await anext(frames_gen, None)
if pcm_frame is None: # Not all streams may stop at the same time
stream_finished[i] = True
continue
# Discard excess samples in buffer if above threshold (clock drift compensation)
if enable_drift_compensation and hasattr(big['audio_input'], '_stream') and big['audio_input']._stream:
sd_buffer_samples = big['audio_input']._stream.read_available
# Guard: only allow discard if enough frames have passed since last discard
if sd_buffer_samples > drop_threshold_samples and frames_since_last_discard >= discard_guard_frames:
# Always drop a static amount (3ms) for predictable behavior
# This matches the crossfade duration better for smoother transitions
samples_to_drop = min(static_drop_samples, max(1, big['lc3_frame_samples'] - 1))
try:
discarded_data = await anext(big['audio_input'].frames(samples_to_drop))
samples_discarded_total += samples_to_drop
discard_events += 1
# Log every discard event with timing information
sample_rate = big['audio_input']._pcm_format.sample_rate
time_since_last_ms = frames_since_last_discard * 10 # Each frame is 10ms
logging.info(
f"DISCARD #{discard_events}: dropped {samples_to_drop} samples ({samples_to_drop / sample_rate * 1000:.1f}ms) | "
f"buffer was {sd_buffer_samples} samples ({sd_buffer_samples / sample_rate * 1000:.1f}ms) | "
f"since_last={frames_since_last_discard} frames ({time_since_last_ms}ms) | "
f"frame={frame_count}"
)
# Reset guard counter
frames_since_last_discard = 0
# Store how much we dropped for potential adaptive crossfade
big['last_drop_samples'] = samples_to_drop
# Set flag to apply crossfade on next frame
big['apply_crossfade'] = True
except Exception as e:
logging.error(f"Failed to discard samples: {e}")
# Down-mix multi-channel PCM to mono for LC3 encoder if needed
if big.get('channels', 1) > 1:
if isinstance(pcm_frame, np.ndarray):
@@ -659,12 +775,149 @@ class Streamer():
samples = np.frombuffer(pcm_frame, dtype=dtype)
samples = samples.reshape(-1, big['channels']).mean(axis=1)
pcm_frame = samples.astype(dtype).tobytes()
# Apply crossfade if samples were just dropped (drift compensation)
if big.get('apply_crossfade') and big.get('prev_pcm_frame') is not None:
# Crossfade duration: 10ms for smoother transition (was 5ms)
dtype = np.int16 if big['pcm_bit_depth'] == 16 else np.float32
sample_rate = big['audio_input']._pcm_format.sample_rate
crossfade_samples = min(int(sample_rate * 0.010), big['lc3_frame_samples'] // 2)
# Convert frames to numpy arrays (make writable copies)
prev_samples = np.frombuffer(big['prev_pcm_frame'], dtype=dtype).copy()
curr_samples = np.frombuffer(pcm_frame, dtype=dtype).copy()
# Create equal-power crossfade curves (smoother than linear)
# Equal-power maintains perceived loudness during transition
t = np.linspace(0, 1, crossfade_samples)
fade_out = np.cos(t * np.pi / 2) # Cosine fade out
fade_in = np.sin(t * np.pi / 2) # Sine fade in
# Apply crossfade to the beginning of current frame with end of previous frame
if len(prev_samples) >= crossfade_samples and len(curr_samples) >= crossfade_samples:
crossfaded = (
prev_samples[-crossfade_samples:] * fade_out +
curr_samples[:crossfade_samples] * fade_in
).astype(dtype)
# Replace beginning of current frame with crossfaded section
curr_samples[:crossfade_samples] = crossfaded
pcm_frame = curr_samples.tobytes()
big['apply_crossfade'] = False
# Store current frame for potential next crossfade
if enable_drift_compensation:
big['prev_pcm_frame'] = pcm_frame
lc3_frame = big['encoder'].encode(
pcm_frame, num_bytes=big['lc3_bytes_per_frame'], bit_depth=big['pcm_bit_depth']
)
await big['iso_queue'].write(lc3_frame)
frame_count += 1
# Increment guard counter (tracks frames since last discard)
frames_since_last_discard += 1
# Periodic stats logging (only for device/sounddevice streams, not WAV files)
# WAV file concurrent access causes deadlock in ThreadedAudioInput
now = time.perf_counter()
is_device_stream = hasattr(big['audio_input'], '_stream') and big['audio_input']._stream is not None
if is_device_stream and now - last_stats_log >= stats_interval:
# Get current buffer status from PortAudio
current_sd_buffer = 0
if hasattr(big['audio_input'], '_stream') and big['audio_input']._stream:
try:
current_sd_buffer = big['audio_input']._stream.read_available
except Exception:
pass
# Get stream latency and CPU load from sounddevice
stream_latency_ms = None
cpu_load_pct = None
if hasattr(big['audio_input'], '_stream') and big['audio_input']._stream:
try:
latency = big['audio_input']._stream.latency
if frame_count == 501: # Debug log once
logging.info(f"DEBUG: stream.latency raw value = {latency}, type = {type(latency)}")
# latency can be either a float (for input-only streams) or tuple (input, output)
if latency is not None:
if isinstance(latency, (int, float)):
# Single value for input-only stream
stream_latency_ms = float(latency) * 1000.0
elif isinstance(latency, (tuple, list)) and len(latency) >= 1:
# Tuple (input_latency, output_latency)
stream_latency_ms = latency[0] * 1000.0
except Exception as e:
if frame_count == 501: # Log once at startup
logging.warning(f"Could not get stream.latency: {e}")
try:
cpu_load = big['audio_input']._stream.cpu_load
if frame_count == 501: # Debug log once
logging.info(f"DEBUG: stream.cpu_load raw value = {cpu_load}")
# cpu_load is a fraction (0.0 to 1.0)
if cpu_load is not None and cpu_load >= 0:
cpu_load_pct = cpu_load * 100.0 # Convert to percentage
except Exception as e:
if frame_count == 501: # Log once at startup
logging.warning(f"Could not get stream.cpu_load: {e}")
# Get backend-specific buffer status
backend_delay = None
backend_label = "Backend"
# Determine which backend we're using based on audio_input device
try:
device_info = big['audio_input']._device if hasattr(big['audio_input'], '_device') else None
if device_info is not None and isinstance(device_info, int):
hostapi = sd.query_hostapis(sd.query_devices(device_info)['hostapi'])
backend_name = hostapi['name']
else:
backend_name = "Unknown"
except Exception:
backend_name = "Unknown"
if 'pulse' in backend_name.lower():
# PipeWire/PulseAudio backend - no direct buffer access
# SD_buffer is the only reliable metric
backend_label = "PipeWire"
backend_delay = None # Cannot read PipeWire internal buffers directly
else:
# ALSA backend - can read kernel buffer
backend_label = "ALSA_kernel"
try:
with open('/proc/asound/card0/pcm0c/sub0/status', 'r') as f:
for line in f:
if 'delay' in line and ':' in line:
backend_delay = int(line.split(':')[1].strip())
break
except Exception:
pass
if enable_drift_compensation:
avg_discard_per_event = (samples_discarded_total / discard_events) if discard_events > 0 else 0.0
discard_event_rate = (discard_events / frame_count * 100) if frame_count > 0 else 0.0
latency_str = f"stream_latency={stream_latency_ms:.2f} ms" if stream_latency_ms is not None else "stream_latency=N/A"
cpu_str = f"cpu_load={cpu_load_pct:.1f}%" if cpu_load_pct is not None else "cpu_load=N/A"
logging.info(
f"STATS: frames={frame_count} | discard_events={discard_events} ({discard_event_rate:.1f}%) | "
f"avg_discard={avg_discard_per_event:.0f} samples/event | "
f"SD_buffer={current_sd_buffer} samples ({current_sd_buffer / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms) | "
f"{latency_str} | {cpu_str} | "
f"threshold={drop_threshold_samples} samples ({drop_threshold_samples / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms)"
)
else:
backend_str = f"{backend_label}={backend_delay} samples ({backend_delay / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms)" if backend_delay is not None else f"{backend_label}=N/A (use pw-top)"
latency_str = f"stream_latency={stream_latency_ms:.2f} ms" if stream_latency_ms is not None else "stream_latency=N/A"
cpu_str = f"cpu_load={cpu_load_pct:.1f}%" if cpu_load_pct is not None else "cpu_load=N/A"
logging.info(
f"STATS: frames={frame_count} | "
f"SD_buffer={current_sd_buffer} samples ({current_sd_buffer / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms) | "
f"{latency_str} | {cpu_str} | "
f"{backend_str} | "
f"drift_compensation=DISABLED"
)
last_stats_log = now
if all(stream_finished): # Take into account that multiple files have different lengths
logging.info('All streams finished, stopping streamer')
@@ -718,6 +971,107 @@ if __name__ == "__main__":
)
os.chdir(os.path.dirname(__file__))
# =============================================================================
# AUDIO BACKEND CONFIGURATION - Toggle between ALSA and PipeWire
# =============================================================================
# Uncomment ONE of the following backend configurations:
# Option 1: Direct ALSA (Direct hardware access, bypasses PipeWire)
AUDIO_BACKEND = 'ALSA'
target_latency_ms = 10.0
# Option 2: PipeWire via PulseAudio API (Routes through pipewire-pulse)
#AUDIO_BACKEND = 'PipeWire'
#target_latency_ms = 5.0 # PipeWire typically handles lower latency better
# =============================================================================
import sounddevice as sd
import subprocess
# Detect if PipeWire is running (even if we're using ALSA API)
pipewire_running = False
try:
result = subprocess.run(['systemctl', '--user', 'is-active', 'pipewire'],
capture_output=True, text=True, timeout=1)
pipewire_running = (result.returncode == 0)
except Exception:
pass
if AUDIO_BACKEND == 'ALSA':
os.environ['SDL_AUDIODRIVER'] = 'alsa'
sd.default.latency = target_latency_ms / 1000.0
# Find ALSA host API
try:
alsa_hostapi = next(i for i, ha in enumerate(sd.query_hostapis())
if 'ALSA' in ha['name'])
logging.info(f"ALSA host API available at index: {alsa_hostapi}")
except StopIteration:
logging.error("ALSA backend not found!")
elif AUDIO_BACKEND == 'PipeWire':
os.environ['SDL_AUDIODRIVER'] = 'pulseaudio'
sd.default.latency = target_latency_ms / 1000.0
if not pipewire_running:
logging.error("PipeWire selected but not running!")
raise RuntimeError("PipeWire is not active")
# Find PulseAudio host API (required for PipeWire mode)
try:
pulse_hostapi = next(i for i, ha in enumerate(sd.query_hostapis())
if 'pulse' in ha['name'].lower())
logging.info(f"Using PulseAudio host API at index: {pulse_hostapi} → routes to PipeWire")
except StopIteration:
logging.error("PulseAudio host API not found! Did you rebuild PortAudio with -DPA_USE_PULSEAUDIO=ON?")
raise RuntimeError("PulseAudio API not available in PortAudio")
else:
logging.error(f"Unknown AUDIO_BACKEND: {AUDIO_BACKEND}")
raise ValueError(f"Invalid AUDIO_BACKEND: {AUDIO_BACKEND}")
# Select audio input device based on backend
shure_device_idx = None
if AUDIO_BACKEND == 'ALSA':
# Use ALSA devices
from auracast.utils.sounddevice_utils import get_alsa_usb_inputs
devices = get_alsa_usb_inputs()
logging.info("Searching ALSA devices for Shure MVX2U...")
for idx, dev in devices:
logging.info(f" ALSA device [{idx}]: {dev['name']} ({dev['max_input_channels']} ch)")
if 'shure' in dev['name'].lower() and 'mvx2u' in dev['name'].lower():
shure_device_idx = idx
logging.info(f"✓ Selected ALSA device {idx}: {dev['name']}")
break
elif AUDIO_BACKEND == 'PipeWire':
# Use PulseAudio devices (routed through PipeWire)
logging.info("Searching PulseAudio devices for Shure MVX2U...")
for idx, dev in enumerate(sd.query_devices()):
# Only consider PulseAudio input devices
if dev['max_input_channels'] > 0:
hostapi = sd.query_hostapis(dev['hostapi'])
if 'pulse' in hostapi['name'].lower():
dev_name_lower = dev['name'].lower()
logging.info(f" PulseAudio device [{idx}]: {dev['name']} ({dev['max_input_channels']} ch)")
# Skip monitor devices (they're output monitors, not real inputs)
if 'monitor' in dev_name_lower:
continue
# Look for Shure MVX2U - prefer "Mono" device for mono input
if 'shure' in dev_name_lower and 'mvx2u' in dev_name_lower:
shure_device_idx = idx
logging.info(f"✓ Selected PulseAudio device {idx}: {dev['name']} → routes to PipeWire")
break
if shure_device_idx is None:
logging.error(f"Shure MVX2U not found in {AUDIO_BACKEND} devices!")
raise RuntimeError(f"Audio device not found for {AUDIO_BACKEND} backend")
config = auracast_config.AuracastConfigGroup(
bigs = [
auracast_config.AuracastBigConfigDeu(),
@@ -748,6 +1102,16 @@ if __name__ == "__main__":
#big.audio_source = big.audio_source.replace('.wav', '_10_16_32.lc3') #lc3 precoded files
#big.audio_source = read_lc3_file(big.audio_source) # load files in advance
# --- Configure Shure MVX2U USB Audio Interface (ALSA backend) ---
if shure_device_idx is not None:
big.audio_source = f'device:{shure_device_idx}' # Shure MVX2U USB mono interface
big.input_format = 'int16le,48000,1' # int16, 48kHz, mono
logging.info(f"Configured BIG '{big.name}' with Shure MVX2U (device:{shure_device_idx}, 48kHz mono)")
else:
logging.warning(f"Shure device not found, BIG '{big.name}' will use default audio_source: {big.audio_source}")
big.name='Broadcast0'
big.iso_que_len=1
# --- Network_uncoded mode using NetworkAudioReceiver ---
#big.audio_source = NetworkAudioReceiverUncoded(port=50007, samplerate=16000, channels=1, chunk_size=1024)
@@ -760,6 +1124,9 @@ if __name__ == "__main__":
config.auracast_sampling_rate_hz = 16000
config.octets_per_frame = 40 # 32kbps@16kHz
#config.debug = True
# Enable clock drift compensation to prevent latency accumulation
# With ~43 samples/sec drift (0.89ms/sec), threshold of 2ms will trigger every ~2.2 seconds
run_async(
broadcast(

View File

@@ -5,19 +5,13 @@ multicast_script
Loads environment variables from a .env file located next to this script
and configures the multicast broadcast. Only UPPERCASE keys are read.
This version uses ALSA backend for USB audio devices (no PipeWire required).
Environment variables
---------------------
- LOG_LEVEL: Logging level for the script.
Default: INFO. Examples: DEBUG, INFO, WARNING, ERROR.
- INPUT: Select audio capture source.
Values:
- "usb" (default): first available USB input device.
- "aes67": select AES67 inputs. Two forms:
* INPUT=aes67 -> first available AES67 input.
* INPUT=aes67,<substr> -> case-insensitive substring match against
the device name, e.g. INPUT=aes67,8f6326.
- BROADCAST_NAME: Name of the broadcast (Auracast BIG name).
Default: "Broadcast0".
@@ -27,17 +21,13 @@ Environment variables
- LANGUATE: ISO 639-3 language code used by config (intentional key name).
Default: "deu".
- PULSE_LATENCY_MSEC: Pulse/PipeWire latency hint in milliseconds.
Default: 3.
Examples (.env)
---------------
LOG_LEVEL=DEBUG
INPUT=aes67,8f6326
BROADCAST_NAME=MyBroadcast
PROGRAM_INFO="Live announcements"
LANGUATE=deu
"""
LANGUATE=deu
"""
import logging
import os
import time
@@ -45,9 +35,10 @@ from dotenv import load_dotenv
from auracast import multicast
from auracast import auracast_config
from auracast.utils.sounddevice_utils import (
get_usb_pw_inputs,
get_network_pw_inputs,
get_alsa_usb_inputs,
get_network_pw_inputs, # PipeWire network (AES67) inputs
refresh_pw_cache,
resolve_input_device_index,
)
@@ -60,65 +51,81 @@ if __name__ == "__main__":
os.chdir(os.path.dirname(__file__))
# Load .env located next to this script (only uppercase keys will be referenced)
load_dotenv(dotenv_path='.env')
os.environ.setdefault("PULSE_LATENCY_MSEC", "3")
# Refresh device cache and list inputs
refresh_pw_cache()
usb_inputs = get_usb_pw_inputs()
logging.info("USB pw inputs:")
# Refresh PipeWire cache and list devices (Network only for PW; USB via ALSA)
try:
refresh_pw_cache()
except Exception:
pass
pw_net = get_network_pw_inputs()
# List PipeWire Network inputs
logging.info("PipeWire Network inputs:")
for i, d in pw_net:
logging.info(f"{i}: {d['name']} in={d.get('max_input_channels', 0)}")
# Also list USB ALSA inputs (fallback path)
usb_inputs = get_alsa_usb_inputs()
logging.info("USB ALSA inputs:")
for i, d in usb_inputs:
logging.info(f"{i}: {d['name']} in={d['max_input_channels']}")
aes67_inputs = get_network_pw_inputs()
logging.info("AES67 pw inputs:")
for i, d in aes67_inputs:
logging.info(f"{i}: {d['name']} in={d['max_input_channels']}")
# Input selection (usb | aes67). Default to usb.
# Allows specifying an AES67 device by substring: INPUT=aes67,<substring>
# Example: INPUT=aes67,8f6326 will match a device name containing "8f6326".
input_env = os.environ.get('INPUT', 'usb') or 'usb'
parts = [p.strip() for p in input_env.split(',', 1)]
input_mode = (parts[0] or 'usb').lower()
iface_substr = (parts[1].lower() if len(parts) > 1 and parts[1] else None)
# Optional device selection via .env: match by string or substring (uppercase keys only)
device_match = os.environ.get('INPUT_DEVICE')
device_match = device_match.strip() if isinstance(device_match, str) else None
# Loop until a device becomes available (prefer PW Network, then ALSA USB)
selected_dev = None
if input_mode == 'aes67':
if not aes67_inputs and not iface_substr:
# No AES67 inputs and no specific target -> fail fast
raise RuntimeError("No AES67 audio inputs found.")
if iface_substr:
# Loop until a matching AES67 input becomes available
while True:
refresh_pw_cache()
current = get_network_pw_inputs()
sel = next(((i, d) for i, d in current if iface_substr in (d.get('name','').lower())), None)
if sel:
input_sel = sel[0]
selected_dev = sel[1]
logging.info(f"Selected AES67 input by match '{iface_substr}': index={input_sel}")
break
logging.info(f"Waiting for AES67 input matching '{iface_substr}'... retrying in 2s")
time.sleep(2)
else:
input_sel, selected_dev = aes67_inputs[0]
logging.info(f"Selected first AES67 input: index={input_sel}, device={selected_dev['name']}")
else:
# Loop until a USB input becomes available (mirror AES67 retry behavior)
while True:
while True:
try:
refresh_pw_cache()
current = get_usb_pw_inputs()
if current:
input_sel, selected_dev = current[0]
logging.info(f"Selected first USB input: index={input_sel}, device={selected_dev['name']}")
except Exception:
pass
pw_net = get_network_pw_inputs()
# 1) Try to satisfy explicit .env match on PW Network
if device_match and pw_net:
for _, d in pw_net:
name = d.get('name', '')
if device_match in name:
idx = resolve_input_device_index(name)
if idx is not None:
input_sel = idx
selected_dev = d
logging.info(f"Selected Network input by match '{device_match}' (PipeWire): index={input_sel}, device={name}")
break
if selected_dev is not None:
break
logging.info("Waiting for USB input... retrying in 2s")
time.sleep(2)
if pw_net and selected_dev is None:
_, d0 = pw_net[0]
idx = resolve_input_device_index(d0.get('name', ''))
if idx is not None:
input_sel = idx
selected_dev = d0
logging.info(f"Selected first Network input (PipeWire): index={input_sel}, device={d0['name']}")
break
current_alsa = get_alsa_usb_inputs()
# 2) Try to satisfy explicit .env match on ALSA USB
if device_match and current_alsa and selected_dev is None:
matched = None
for idx_alsa, d in current_alsa:
name = d.get('name', '')
if device_match in name:
matched = (idx_alsa, d)
break
if matched is not None:
input_sel, selected_dev = matched
logging.info(f"Selected USB input by match '{device_match}' (ALSA): index={input_sel}, device={selected_dev['name']}")
break
# Fallback to first ALSA USB
if current_alsa and selected_dev is None:
input_sel, selected_dev = current_alsa[0]
logging.info(f"Selected first USB input (ALSA): index={input_sel}, device={selected_dev['name']}")
break
logging.info("Waiting for audio input (prefer PW Network, then ALSA USB)... retrying in 2s")
time.sleep(2)
TRANSPORT1 = 'serial:/dev/ttyAMA3,1000000,rtscts' # transport for raspberry pi gpio header
TRANSPORT2 = 'serial:/dev/ttyAMA4,1000000,rtscts' # transport for raspberry pi gpio header
# Capture at 48 kHz to avoid PipeWire resampler latency; encode LC3 at 24 kHz
# Capture at 48 kHz to avoid resampler latency; encode LC3 at 24 kHz
CAPTURE_SRATE = 48000
LC3_SRATE = 24000
OCTETS_PER_FRAME=60
@@ -150,14 +157,16 @@ if __name__ == "__main__":
),
#auracast_config.AuracastBigConfigEng(),
],
immediate_rendering=True,
immediate_rendering=False,
presentation_delay_us=40000,
qos_config=auracast_config.AuracastQosHigh(),
auracast_sampling_rate_hz = LC3_SRATE,
octets_per_frame = OCTETS_PER_FRAME,
transport=TRANSPORT1
transport=TRANSPORT1,
enable_adaptive_frame_dropping=True,
)
#config.debug = True
config.debug = False
logging.info(config.model_dump_json(indent=2))
multicast.run_async(

View File

@@ -170,11 +170,29 @@ if audio_mode == "Demo":
"6 × 16kHz": {"quality": "Fair (16kHz)", "streams": 6},
}
demo_options = list(demo_stream_map.keys())
default_demo = demo_options[0]
default_index = 0
saved_type = saved_settings.get('demo_stream_type')
if isinstance(saved_type, str) and saved_type in demo_options:
default_index = demo_options.index(saved_type)
else:
saved_rate = saved_settings.get('auracast_sampling_rate_hz')
saved_total = saved_settings.get('demo_total_streams')
if saved_total is None and saved_settings.get('audio_mode') == 'Demo':
saved_total = len(saved_settings.get('channel_names') or [])
try:
if saved_rate and saved_total:
for i, label in enumerate(demo_options):
cfg = demo_stream_map[label]
rate_for_label = QUALITY_MAP[cfg['quality']]['rate']
if cfg['streams'] == int(saved_total) and int(saved_rate) == rate_for_label:
default_index = i
break
except Exception:
default_index = 0
demo_selected = st.selectbox(
"Demo Stream Type",
demo_options,
index=0,
index=default_index,
help="Select the demo stream configuration."
)
# Stream password and flags (same as USB/AES67)

View File

@@ -19,8 +19,9 @@ from auracast import multicast_control, auracast_config
import sounddevice as sd # type: ignore
import traceback
from auracast.utils.sounddevice_utils import (
get_usb_pw_inputs,
get_network_pw_inputs,
get_alsa_usb_inputs,
resolve_input_device_index,
refresh_pw_cache,
)
from auracast.utils.reset_utils import reset_nrf54l
@@ -32,8 +33,6 @@ STREAM_SETTINGS_FILE = os.path.join(os.path.dirname(__file__), 'stream_settings.
TRANSPORT1 = os.getenv('TRANSPORT1', 'serial:/dev/ttyAMA3,1000000,rtscts') # transport for raspberry pi gpio header
TRANSPORT2 = os.getenv('TRANSPORT2', 'serial:/dev/ttyAMA4,1000000,rtscts') # transport for raspberry pi gpio header
os.environ["PULSE_LATENCY_MSEC"] = "3"
# In-memory cache to avoid disk I/O on hot paths like /status
@@ -105,7 +104,7 @@ app.add_middleware(
# Initialize global configuration
global_config_group = auracast_config.AuracastConfigGroup()
class StreamerWorker:
class StreamerWorker: # TODO: is wrapping in this Worker strictly necessary?
"""Owns multicaster(s) on a dedicated asyncio loop in a background thread."""
def __init__(self) -> None:
@@ -162,7 +161,16 @@ class StreamerWorker:
pass
self._multicaster1 = None
# overwrite some configurations
conf.transport = TRANSPORT1
# Enable adaptive frame dropping only for device-based inputs (not file/demo)
try:
conf.enable_adaptive_frame_dropping = any(
isinstance(big.audio_source, str) and big.audio_source.startswith('device:')
for big in conf.bigs
)
except Exception:
conf.enable_adaptive_frame_dropping = False
# Derive device name and input mode
first_source = conf.bigs[0].audio_source if conf.bigs else ''
input_device_name = None
@@ -170,21 +178,28 @@ class StreamerWorker:
if first_source.startswith('device:'):
input_device_name = first_source.split(':', 1)[1] if ':' in first_source else None
try:
usb_names = {d.get('name') for _, d in get_usb_pw_inputs()}
alsa_usb_names = {d.get('name') for _, d in get_alsa_usb_inputs()}
except Exception:
alsa_usb_names = set()
try:
net_names = {d.get('name') for _, d in get_network_pw_inputs()}
except Exception:
usb_names, net_names = set(), set()
audio_mode_persist = 'Network' if (input_device_name in net_names) else 'USB'
# Map device name to index and configure input_format
device_index = int(input_device_name) if (input_device_name and input_device_name.isdigit()) else get_device_index_by_name(input_device_name or '')
# Map device name to index using centralized resolver
if input_device_name and input_device_name.isdigit():
device_index = int(input_device_name)
else:
device_index = resolve_input_device_index(input_device_name or '')
if device_index is None:
raise HTTPException(status_code=400, detail=f"Audio device '{input_device_name}' not found.")
for big in conf.bigs:
if big.audio_source.startswith('device:'):
big.audio_source = f'device:{device_index}'
devinfo = sd.query_devices(device_index)
capture_rate = int(devinfo.get('default_samplerate') or 48000)
# Force capture at 48 kHz to avoid resampler latency and 44.1 kHz incompatibilities
capture_rate = 48000
max_in = int(devinfo.get('max_input_channels') or 1)
channels = max(1, min(2, max_in))
for big in conf.bigs:
@@ -204,6 +219,14 @@ class StreamerWorker:
auto_started = True
# Return proposed settings to persist on API side
demo_count = sum(1 for big in conf.bigs if isinstance(big.audio_source, str) and big.audio_source.startswith('file:'))
demo_rate = int(conf.auracast_sampling_rate_hz or 0)
demo_type = None
if demo_count > 0 and demo_rate > 0:
if demo_rate in (48000, 24000, 16000):
demo_type = f"{demo_count} × {demo_rate//1000}kHz"
else:
demo_type = f"{demo_count} × {demo_rate}Hz"
return {
'channel_names': [big.name for big in conf.bigs],
'languages': [big.language for big in conf.bigs],
@@ -218,6 +241,8 @@ class StreamerWorker:
'immediate_rendering': getattr(conf, 'immediate_rendering', False),
'assisted_listening_stream': getattr(conf, 'assisted_listening_stream', False),
'stream_password': (conf.bigs[0].code if conf.bigs and getattr(conf.bigs[0], 'code', None) else None),
'demo_total_streams': demo_count,
'demo_stream_type': demo_type,
'is_streaming': auto_started,
}
@@ -230,10 +255,27 @@ class StreamerWorker:
self._multicaster2 = None
conf.transport = TRANSPORT2
# Enable adaptive frame dropping only for device-based inputs (not file/demo)
try:
conf.enable_adaptive_frame_dropping = any(
isinstance(big.audio_source, str) and big.audio_source.startswith('device:')
for big in conf.bigs
)
except Exception:
conf.enable_adaptive_frame_dropping = False
for big in conf.bigs:
if big.audio_source.startswith('device:'):
device_name = big.audio_source.split(':', 1)[1]
device_index = get_device_index_by_name(device_name)
# Resolve backend preference by membership
try:
net_names = {d.get('name') for _, d in get_network_pw_inputs()}
except Exception:
net_names = set()
try:
alsa_usb_names = {d.get('name') for _, d in get_alsa_usb_inputs()}
except Exception:
alsa_usb_names = set()
device_index = resolve_input_device_index(device_name)
if device_index is None:
raise HTTPException(status_code=400, detail=f"Audio device '{device_name}' not found.")
big.audio_source = f'device:{device_index}'
@@ -313,6 +355,24 @@ async def initialize2(conf: auracast_config.AuracastConfigGroup):
try:
log.info('Initializing multicaster2 with config:\n %s', conf.model_dump_json(indent=2))
await streamer.call(streamer._w_init_secondary, conf)
try:
is_demo = any(isinstance(big.audio_source, str) and big.audio_source.startswith('file:') for big in conf.bigs)
if is_demo:
settings = load_stream_settings() or {}
primary_count = int(settings.get('demo_total_streams') or len(settings.get('channel_names') or []))
secondary_count = len(conf.bigs or [])
total = primary_count + secondary_count
settings['demo_total_streams'] = total
demo_rate = int(conf.auracast_sampling_rate_hz or 0)
if demo_rate > 0:
if demo_rate in (48000, 24000, 16000):
settings['demo_stream_type'] = f"{total} × {demo_rate//1000}kHz"
else:
settings['demo_stream_type'] = f"{total} × {demo_rate}Hz"
settings['timestamp'] = datetime.utcnow().isoformat()
save_stream_settings(settings)
except Exception:
log.warning("Failed to persist demo_total_streams in /init2", exc_info=True)
except Exception as e:
log.error("Exception in /init2: %s", traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
@@ -415,7 +475,7 @@ async def _autostart_from_settings():
):
return
# Check against the cached device lists
usb = [d for _, d in get_usb_pw_inputs()]
usb = [d for _, d in get_alsa_usb_inputs()]
net = [d for _, d in get_network_pw_inputs()]
names = {d.get('name') for d in usb} | {d.get('name') for d in net}
if input_device_name in names:
@@ -471,11 +531,11 @@ async def _startup_autostart_event():
@app.get("/audio_inputs_pw_usb")
async def audio_inputs_pw_usb():
"""List PipeWire USB input nodes from cache."""
"""List USB input devices using ALSA backend (USB is ALSA in our scheme)."""
try:
devices = [
{"id": idx, "name": dev.get("name"), "max_input_channels": dev.get("max_input_channels", 0)}
for idx, dev in get_usb_pw_inputs()
for idx, dev in get_alsa_usb_inputs()
]
return {"inputs": devices}
except Exception as e:

View File

@@ -23,7 +23,11 @@ def _pa_like_hostapi_index():
raise RuntimeError("PipeWire/PulseAudio host API not present in PortAudio.")
def _pw_dump():
return json.loads(subprocess.check_output(["pw-dump"]))
try:
return json.loads(subprocess.check_output(["pw-dump"]))
except (FileNotFoundError, subprocess.CalledProcessError):
# PipeWire not available
return []
def _sd_refresh():
"""Force PortAudio to re-enumerate devices on next query.
@@ -49,6 +53,74 @@ def _sd_matches_from_names(pa_idx, names):
out.append((i, d))
return out
def get_device_index_by_name(name: str, backend: str | None = None):
    """Look up the sounddevice index of an input device with an exact name match.

    When *backend* is given, the search is restricted:
    - 'ALSA' limits matches to devices enumerated under the ALSA host API
    - 'PipeWire' / 'PulseAudio' limit matches to the PipeWire/Pulse host API

    Returns the integer device index, or None when no matching input device
    is found (or when device enumeration fails for any reason).
    """
    try:
        all_devices = sd.query_devices()

        # Work out which restriction, if any, applies to candidate devices.
        allowed_indices = None   # set of device indices (ALSA restriction)
        required_hostapi = None  # host-API index (PipeWire/Pulse restriction)
        if backend:
            if backend.lower() == 'alsa':
                try:
                    allowed_indices = {i for i, _ in devices_by_backend('ALSA')}
                except Exception:
                    return None
            else:
                try:
                    required_hostapi = _pa_like_hostapi_index()
                except Exception:
                    return None

        for dev_idx, dev in enumerate(all_devices):
            # Only consider actual input-capable devices.
            if dev.get('max_input_channels', 0) <= 0:
                continue
            if dev.get('name') != name:
                continue
            if allowed_indices is not None and dev_idx not in allowed_indices:
                continue
            if required_hostapi is not None and dev.get('hostapi') != required_hostapi:
                continue
            return dev_idx
    except Exception:
        # Best-effort lookup: enumeration problems are reported as "not found".
        return None
    return None
def resolve_input_device_index(name: str):
    """Resolve a device index by exact name, preferring a backend per device type.

    Preference order:
    1. PipeWire backend, if the name is a known PipeWire network input.
    2. ALSA backend, if the name is a known ALSA USB input.
    3. Any backend, as a final fallback.

    Returns None when the device cannot be found under any backend.
    """
    def _known_names(lister):
        # Best-effort enumeration: a failure simply yields an empty set.
        try:
            return {dev.get('name') for _, dev in lister()}
        except Exception:
            return set()

    network_names = _known_names(get_network_pw_inputs)
    usb_names = _known_names(get_alsa_usb_inputs)

    if name in network_names:
        pw_idx = get_device_index_by_name(name, backend='PipeWire')
        if pw_idx is not None:
            return pw_idx
    if name in usb_names:
        alsa_idx = get_device_index_by_name(name, backend='ALSA')
        if alsa_idx is not None:
            return alsa_idx
    return get_device_index_by_name(name, backend=None)
# Module-level caches for device lists
_usb_inputs_cache = []
_network_inputs_cache = []
@@ -66,12 +138,22 @@ def refresh_pw_cache():
Performs a full device scan and updates the internal caches for both USB
and Network audio devices. This is a heavy operation and should not be
called frequently or during active streams.
If PipeWire is not available, caches will remain empty.
"""
global _usb_inputs_cache, _network_inputs_cache
# Force PortAudio to re-enumerate devices
_sd_refresh()
pa_idx = _pa_like_hostapi_index()
try:
pa_idx = _pa_like_hostapi_index()
except RuntimeError:
# PipeWire/PulseAudio not available - reset caches and return
_usb_inputs_cache = []
_network_inputs_cache = []
return
pw = _pw_dump()
# --- Pass 1: Map device.id to device.bus ---
@@ -129,6 +211,53 @@ def refresh_pw_cache():
_network_inputs_cache = _sd_matches_from_names(pa_idx, network_input_names)
def get_alsa_usb_inputs():
    """Return USB audio input devices via the ALSA backend.

    Keeps devices that look like USB hardware (name contains 'usb' or matches
    the ALSA 'hw:X' pattern) and skips virtual/routing devices such as
    'default', 'dmix', 'pulse', 'pipewire' and 'sysdefault'.

    Returns a list of (index, device_dict) tuples; empty when ALSA is absent.
    """
    try:
        candidates = devices_by_backend('ALSA')
    except ValueError:
        # No ALSA backend available on this host.
        return []

    virtual_markers = ('default', 'dmix', 'pulse', 'pipewire', 'sysdefault')
    selected = []
    for dev_idx, dev in candidates:
        # Skip output-only devices.
        if dev.get('max_input_channels', 0) <= 0:
            continue
        lowered = dev.get('name', '').lower()
        # Skip ALSA software/routing devices — they are not USB hardware.
        if any(marker in lowered for marker in virtual_markers):
            continue
        # Keep devices that advertise USB or look like raw ALSA hardware.
        if 'usb' in lowered or re.match(r'hw:\d+', lowered):
            selected.append((dev_idx, dev))
    return selected
def get_alsa_inputs():
    """Return every ALSA audio input device as (index, device_dict) tuples.

    Returns an empty list when the ALSA backend is not available.
    """
    try:
        candidates = devices_by_backend('ALSA')
    except ValueError:
        # ALSA backend not present on this system.
        return []
    inputs = []
    for dev_idx, dev in candidates:
        if dev.get('max_input_channels', 0) > 0:
            inputs.append((dev_idx, dev))
    return inputs
# Populate cache on initial module load
refresh_pw_cache()

View File

@@ -1,6 +1,5 @@
import sounddevice as sd, pprint
from auracast.utils.sounddevice_utils import devices_by_backend, list_usb_pw_inputs, list_network_pw_inputs
from auracast.utils.sounddevice_utils import devices_by_backend
print("PortAudio library:", sd._libname)
print("PortAudio version:", sd.get_portaudio_version())
@@ -15,10 +14,3 @@ for i, d in devices_by_backend("PulseAudio"):
print(f"{i}: {d['name']} in={d['max_input_channels']} out={d['max_output_channels']}")
print("Network pw inputs:")
for i, d in list_network_pw_inputs():
print(f"{i}: {d['name']} in={d['max_input_channels']}")
print("USB pw inputs:")
for i, d in list_usb_pw_inputs():
print(f"{i}: {d['name']} in={d['max_input_channels']}")