feat: add ALSA backend support and advanced audio metrics logging

This commit is contained in:
pstruebi
2025-10-22 08:32:10 +02:00
parent 6d835bf1be
commit 329510beae
4 changed files with 275 additions and 80 deletions
+189 -3
View File
@@ -311,8 +311,9 @@ async def init_broadcast(
bigs[f'big{i}']['iso_queue'] = iso_queue
logging.info(f'big{i} parameters are:')
logging.info('%s', pprint.pformat(vars(big)))
if global_config.debug:
logging.info(f'big{i} parameters are:')
logging.info('%s', pprint.pformat(vars(big)))
logging.info(f'Finished setup of big{i}.')
await asyncio.sleep(i+1) # Wait for advertising to set up
@@ -714,7 +715,7 @@ class Streamer():
# Periodic stats logging
now = time.perf_counter()
if now - last_stats_log >= stats_interval:
# Get current buffer status
# Get current buffer status from PortAudio
current_sd_buffer = 0
if hasattr(big['audio_input'], '_stream') and big['audio_input']._stream:
try:
@@ -722,19 +723,90 @@ class Streamer():
except Exception:
pass
# Get stream latency and CPU load from sounddevice
stream_latency_ms = None
cpu_load_pct = None
if hasattr(big['audio_input'], '_stream') and big['audio_input']._stream:
try:
latency = big['audio_input']._stream.latency
if frame_count == 501: # Debug log once
logging.info(f"DEBUG: stream.latency raw value = {latency}, type = {type(latency)}")
# latency can be either a float (for input-only streams) or tuple (input, output)
if latency is not None:
if isinstance(latency, (int, float)):
# Single value for input-only stream
stream_latency_ms = float(latency) * 1000.0
elif isinstance(latency, (tuple, list)) and len(latency) >= 1:
# Tuple (input_latency, output_latency)
stream_latency_ms = latency[0] * 1000.0
except Exception as e:
if frame_count == 501: # Log once at startup
logging.warning(f"Could not get stream.latency: {e}")
try:
cpu_load = big['audio_input']._stream.cpu_load
if frame_count == 501: # Debug log once
logging.info(f"DEBUG: stream.cpu_load raw value = {cpu_load}")
# cpu_load is a fraction (0.0 to 1.0)
if cpu_load is not None and cpu_load >= 0:
cpu_load_pct = cpu_load * 100.0 # Convert to percentage
except Exception as e:
if frame_count == 501: # Log once at startup
logging.warning(f"Could not get stream.cpu_load: {e}")
# Get backend-specific buffer status
backend_delay = None
backend_label = "Backend"
# Determine which backend we're using based on audio_input device
try:
device_info = big['audio_input']._device if hasattr(big['audio_input'], '_device') else None
if device_info is not None and isinstance(device_info, int):
hostapi = sd.query_hostapis(sd.query_devices(device_info)['hostapi'])
backend_name = hostapi['name']
else:
backend_name = "Unknown"
except Exception:
backend_name = "Unknown"
if 'pulse' in backend_name.lower():
# PipeWire/PulseAudio backend - no direct buffer access
# SD_buffer is the only reliable metric
backend_label = "PipeWire"
backend_delay = None # Cannot read PipeWire internal buffers directly
else:
# ALSA backend - can read kernel buffer
backend_label = "ALSA_kernel"
try:
with open('/proc/asound/card0/pcm0c/sub0/status', 'r') as f:
for line in f:
if 'delay' in line and ':' in line:
backend_delay = int(line.split(':')[1].strip())
break
except Exception:
pass
if enable_drift_compensation:
avg_discard_per_event = (samples_discarded_total / discard_events) if discard_events > 0 else 0.0
discard_event_rate = (discard_events / frame_count * 100) if frame_count > 0 else 0.0
latency_str = f"stream_latency={stream_latency_ms:.2f} ms" if stream_latency_ms is not None else "stream_latency=N/A"
cpu_str = f"cpu_load={cpu_load_pct:.1f}%" if cpu_load_pct is not None else "cpu_load=N/A"
logging.info(
f"STATS: frames={frame_count} | discard_events={discard_events} ({discard_event_rate:.1f}%) | "
f"avg_discard={avg_discard_per_event:.0f} samples/event | "
f"SD_buffer={current_sd_buffer} samples ({current_sd_buffer / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms) | "
f"{latency_str} | {cpu_str} | "
f"threshold={drop_threshold_samples} samples ({drop_threshold_samples / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms)"
)
else:
backend_str = f"{backend_label}={backend_delay} samples ({backend_delay / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms)" if backend_delay is not None else f"{backend_label}=N/A (use pw-top)"
latency_str = f"stream_latency={stream_latency_ms:.2f} ms" if stream_latency_ms is not None else "stream_latency=N/A"
cpu_str = f"cpu_load={cpu_load_pct:.1f}%" if cpu_load_pct is not None else "cpu_load=N/A"
logging.info(
f"STATS: frames={frame_count} | "
f"SD_buffer={current_sd_buffer} samples ({current_sd_buffer / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms) | "
f"{latency_str} | {cpu_str} | "
f"{backend_str} | "
f"drift_compensation=DISABLED"
)
last_stats_log = now
@@ -791,6 +863,107 @@ if __name__ == "__main__":
)
os.chdir(os.path.dirname(__file__))
# =============================================================================
# AUDIO BACKEND CONFIGURATION - Toggle between ALSA and PipeWire
# =============================================================================
# Uncomment ONE of the following backend configurations:
# Option 1: Direct ALSA (Direct hardware access, bypasses PipeWire)
AUDIO_BACKEND = 'ALSA'
target_latency_ms = 10.0
# Option 2: PipeWire via PulseAudio API (Routes through pipewire-pulse)
#AUDIO_BACKEND = 'PipeWire'
#target_latency_ms = 5.0 # PipeWire typically handles lower latency better
# =============================================================================
import sounddevice as sd
import subprocess
# Detect if PipeWire is running (even if we're using ALSA API)
pipewire_running = False
try:
result = subprocess.run(['systemctl', '--user', 'is-active', 'pipewire'],
capture_output=True, text=True, timeout=1)
pipewire_running = (result.returncode == 0)
except Exception:
pass
if AUDIO_BACKEND == 'ALSA':
os.environ['SDL_AUDIODRIVER'] = 'alsa'
sd.default.latency = target_latency_ms / 1000.0
# Find ALSA host API
try:
alsa_hostapi = next(i for i, ha in enumerate(sd.query_hostapis())
if 'ALSA' in ha['name'])
logging.info(f"ALSA host API available at index: {alsa_hostapi}")
except StopIteration:
logging.error("ALSA backend not found!")
elif AUDIO_BACKEND == 'PipeWire':
os.environ['SDL_AUDIODRIVER'] = 'pulseaudio'
sd.default.latency = target_latency_ms / 1000.0
if not pipewire_running:
logging.error("PipeWire selected but not running!")
raise RuntimeError("PipeWire is not active")
# Find PulseAudio host API (required for PipeWire mode)
try:
pulse_hostapi = next(i for i, ha in enumerate(sd.query_hostapis())
if 'pulse' in ha['name'].lower())
logging.info(f"Using PulseAudio host API at index: {pulse_hostapi} → routes to PipeWire")
except StopIteration:
logging.error("PulseAudio host API not found! Did you rebuild PortAudio with -DPA_USE_PULSEAUDIO=ON?")
raise RuntimeError("PulseAudio API not available in PortAudio")
else:
logging.error(f"Unknown AUDIO_BACKEND: {AUDIO_BACKEND}")
raise ValueError(f"Invalid AUDIO_BACKEND: {AUDIO_BACKEND}")
# Select audio input device based on backend
shure_device_idx = None
if AUDIO_BACKEND == 'ALSA':
# Use ALSA devices
from auracast.utils.sounddevice_utils import get_alsa_usb_inputs
devices = get_alsa_usb_inputs()
logging.info("Searching ALSA devices for Shure MVX2U...")
for idx, dev in devices:
logging.info(f" ALSA device [{idx}]: {dev['name']} ({dev['max_input_channels']} ch)")
if 'shure' in dev['name'].lower() and 'mvx2u' in dev['name'].lower():
shure_device_idx = idx
logging.info(f"✓ Selected ALSA device {idx}: {dev['name']}")
break
elif AUDIO_BACKEND == 'PipeWire':
# Use PulseAudio devices (routed through PipeWire)
logging.info("Searching PulseAudio devices for Shure MVX2U...")
for idx, dev in enumerate(sd.query_devices()):
# Only consider PulseAudio input devices
if dev['max_input_channels'] > 0:
hostapi = sd.query_hostapis(dev['hostapi'])
if 'pulse' in hostapi['name'].lower():
dev_name_lower = dev['name'].lower()
logging.info(f" PulseAudio device [{idx}]: {dev['name']} ({dev['max_input_channels']} ch)")
# Skip monitor devices (they're output monitors, not real inputs)
if 'monitor' in dev_name_lower:
continue
# Look for Shure MVX2U - prefer "Mono" device for mono input
if 'shure' in dev_name_lower and 'mvx2u' in dev_name_lower:
shure_device_idx = idx
logging.info(f"✓ Selected PulseAudio device {idx}: {dev['name']} → routes to PipeWire")
break
if shure_device_idx is None:
logging.error(f"Shure MVX2U not found in {AUDIO_BACKEND} devices!")
raise RuntimeError(f"Audio device not found for {AUDIO_BACKEND} backend")
config = auracast_config.AuracastConfigGroup(
bigs = [
auracast_config.AuracastBigConfigDeu(),
@@ -821,6 +994,16 @@ if __name__ == "__main__":
#big.audio_source = big.audio_source.replace('.wav', '_10_16_32.lc3') #lc3 precoded files
#big.audio_source = read_lc3_file(big.audio_source) # load files in advance
# --- Configure Shure MVX2U USB Audio Interface (ALSA backend) ---
if shure_device_idx is not None:
big.audio_source = f'device:{shure_device_idx}' # Shure MVX2U USB mono interface
big.input_format = 'int16le,48000,1' # int16, 48kHz, mono
logging.info(f"Configured BIG '{big.name}' with Shure MVX2U (device:{shure_device_idx}, 48kHz mono)")
else:
logging.warning(f"Shure device not found, BIG '{big.name}' will use default audio_source: {big.audio_source}")
big.name='Broadcast0'
big.iso_que_len=1
# --- Network_uncoded mode using NetworkAudioReceiver ---
#big.audio_source = NetworkAudioReceiverUncoded(port=50007, samplerate=16000, channels=1, chunk_size=1024)
@@ -833,6 +1016,9 @@ if __name__ == "__main__":
config.auracast_sampling_rate_hz = 16000
config.octets_per_frame = 40 # 32kbps@16kHz
#config.debug = True
# Enable clock drift compensation to prevent latency accumulation
# With ~43 samples/sec drift (0.89ms/sec), threshold of 2ms will trigger every ~2.2 seconds
run_async(
broadcast(
+22 -66
View File
@@ -5,19 +5,13 @@ multicast_script
Loads environment variables from a .env file located next to this script
and configures the multicast broadcast. Only UPPERCASE keys are read.
This version uses ALSA backend for USB audio devices (no PipeWire required).
Environment variables
---------------------
- LOG_LEVEL: Logging level for the script.
Default: INFO. Examples: DEBUG, INFO, WARNING, ERROR.
- INPUT: Select audio capture source.
Values:
- "usb" (default): first available USB input device.
- "aes67": select AES67 inputs. Two forms:
* INPUT=aes67 -> first available AES67 input.
* INPUT=aes67,<substr> -> case-insensitive substring match against
the device name, e.g. INPUT=aes67,8f6326.
- BROADCAST_NAME: Name of the broadcast (Auracast BIG name).
Default: "Broadcast0".
@@ -27,16 +21,16 @@ Environment variables
- LANGUATE: ISO 639-3 language code used by config (intentional key name).
Default: "deu".
- PULSE_LATENCY_MSEC: Pulse/PipeWire latency hint in milliseconds.
Default: 3.
- ALSA_LATENCY_MSEC: ALSA latency hint in milliseconds.
Default: 2.
Examples (.env)
---------------
LOG_LEVEL=DEBUG
INPUT=aes67,8f6326
BROADCAST_NAME=MyBroadcast
PROGRAM_INFO="Live announcements"
LANGUATE=deu
ALSA_LATENCY_MSEC=2
"""
import logging
import os
@@ -45,9 +39,7 @@ from dotenv import load_dotenv
from auracast import multicast
from auracast import auracast_config
from auracast.utils.sounddevice_utils import (
get_usb_pw_inputs,
get_network_pw_inputs,
refresh_pw_cache,
get_alsa_usb_inputs,
)
@@ -60,65 +52,29 @@ if __name__ == "__main__":
os.chdir(os.path.dirname(__file__))
# Load .env located next to this script (only uppercase keys will be referenced)
load_dotenv(dotenv_path='.env')
os.environ.setdefault("PULSE_LATENCY_MSEC", "2")
# Default tight ALSA latency (ms); can be overridden via environment
os.environ.setdefault('ALSA_LATENCY_MSEC', '2')
# Refresh device cache and list inputs
refresh_pw_cache()
usb_inputs = get_usb_pw_inputs()
logging.info("USB pw inputs:")
# List USB ALSA inputs
usb_inputs = get_alsa_usb_inputs()
logging.info("USB ALSA inputs:")
for i, d in usb_inputs:
logging.info(f"{i}: {d['name']} in={d['max_input_channels']}")
aes67_inputs = get_network_pw_inputs()
logging.info("AES67 pw inputs:")
for i, d in aes67_inputs:
logging.info(f"{i}: {d['name']} in={d['max_input_channels']}")
# Input selection (usb | aes67). Default to usb.
# Allows specifying an AES67 device by substring: INPUT=aes67,<substring>
# Example: INPUT=aes67,8f6326 will match a device name containing "8f6326".
input_env = os.environ.get('INPUT', 'usb') or 'usb'
parts = [p.strip() for p in input_env.split(',', 1)]
input_mode = (parts[0] or 'usb').lower()
iface_substr = (parts[1].lower() if len(parts) > 1 and parts[1] else None)
# Loop until a USB input becomes available
selected_dev = None
if input_mode == 'aes67':
if not aes67_inputs and not iface_substr:
# No AES67 inputs and no specific target -> fail fast
raise RuntimeError("No AES67 audio inputs found.")
if iface_substr:
# Loop until a matching AES67 input becomes available
while True:
refresh_pw_cache()
current = get_network_pw_inputs()
sel = next(((i, d) for i, d in current if iface_substr in (d.get('name','').lower())), None)
if sel:
input_sel = sel[0]
selected_dev = sel[1]
logging.info(f"Selected AES67 input by match '{iface_substr}': index={input_sel}")
break
logging.info(f"Waiting for AES67 input matching '{iface_substr}'... retrying in 2s")
time.sleep(2)
else:
input_sel, selected_dev = aes67_inputs[0]
logging.info(f"Selected first AES67 input: index={input_sel}, device={selected_dev['name']}")
else:
# Loop until a USB input becomes available (mirror AES67 retry behavior)
while True:
refresh_pw_cache()
current = get_usb_pw_inputs()
if current:
input_sel, selected_dev = current[0]
logging.info(f"Selected first USB input: index={input_sel}, device={selected_dev['name']}")
break
logging.info("Waiting for USB input... retrying in 2s")
time.sleep(2)
while True:
current = get_alsa_usb_inputs()
if current:
input_sel, selected_dev = current[0]
logging.info(f"Selected first USB input (ALSA): index={input_sel}, device={selected_dev['name']}")
break
logging.info("Waiting for USB input (ALSA)... retrying in 2s")
time.sleep(2)
TRANSPORT1 = 'serial:/dev/ttyAMA3,1000000,rtscts' # transport for raspberry pi gpio header
TRANSPORT2 = 'serial:/dev/ttyAMA4,1000000,rtscts' # transport for raspberry pi gpio header
# Capture at 48 kHz to avoid PipeWire resampler latency; encode LC3 at 24 kHz
# Capture at 48 kHz to avoid resampler latency; encode LC3 at 24 kHz
CAPTURE_SRATE = 48000
LC3_SRATE = 24000
OCTETS_PER_FRAME=60
@@ -156,7 +112,7 @@ if __name__ == "__main__":
auracast_sampling_rate_hz = LC3_SRATE,
octets_per_frame = OCTETS_PER_FRAME,
transport=TRANSPORT1,
enable_drift_compensation=True,
enable_drift_compensation=False,
drift_threshold_ms=2.0
)
config.debug = False
+63 -2
View File
@@ -23,7 +23,11 @@ def _pa_like_hostapi_index():
raise RuntimeError("PipeWire/PulseAudio host API not present in PortAudio.")
def _pw_dump():
return json.loads(subprocess.check_output(["pw-dump"]))
try:
return json.loads(subprocess.check_output(["pw-dump"]))
except (FileNotFoundError, subprocess.CalledProcessError):
# PipeWire not available
return []
def _sd_refresh():
"""Force PortAudio to re-enumerate devices on next query.
@@ -66,12 +70,22 @@ def refresh_pw_cache():
Performs a full device scan and updates the internal caches for both USB
and Network audio devices. This is a heavy operation and should not be
called frequently or during active streams.
If PipeWire is not available, caches will remain empty.
"""
global _usb_inputs_cache, _network_inputs_cache
# Force PortAudio to re-enumerate devices
_sd_refresh()
pa_idx = _pa_like_hostapi_index()
try:
pa_idx = _pa_like_hostapi_index()
except RuntimeError:
# PipeWire/PulseAudio not available - reset caches and return
_usb_inputs_cache = []
_network_inputs_cache = []
return
pw = _pw_dump()
# --- Pass 1: Map device.id to device.bus ---
@@ -129,6 +143,53 @@ def refresh_pw_cache():
_network_inputs_cache = _sd_matches_from_names(pa_idx, network_input_names)
def get_alsa_usb_inputs():
"""
Return USB audio input devices using the ALSA backend.
Filters for devices that appear to be USB hardware (hw:X,Y pattern or USB in name).
Returns list of (index, device_dict) tuples.
"""
try:
alsa_devices = devices_by_backend('ALSA')
except ValueError:
# ALSA backend not available
return []
usb_inputs = []
for idx, dev in alsa_devices:
# Only include input devices
if dev.get('max_input_channels', 0) <= 0:
continue
name = dev.get('name', '').lower()
# Filter for USB devices based on common patterns:
# - Contains 'usb' in the name
# - hw:X,Y pattern (ALSA hardware devices)
# Exclude: default, dmix, pulse, pipewire, sysdefault
if any(exclude in name for exclude in ['default', 'dmix', 'pulse', 'pipewire', 'sysdefault']):
continue
# Include if it has 'usb' in name or matches hw:X pattern
if 'usb' in name or re.match(r'hw:\d+', name):
usb_inputs.append((idx, dev))
return usb_inputs
def get_alsa_inputs():
"""
Return all ALSA audio input devices.
Returns list of (index, device_dict) tuples.
"""
try:
alsa_devices = devices_by_backend('ALSA')
except ValueError:
# ALSA backend not available
return []
return [(idx, dev) for idx, dev in alsa_devices
if dev.get('max_input_channels', 0) > 0]
# Populate cache on initial module load
refresh_pw_cache()
+1 -9
View File
@@ -1,6 +1,5 @@
import sounddevice as sd, pprint
from auracast.utils.sounddevice_utils import devices_by_backend, list_usb_pw_inputs, list_network_pw_inputs
from auracast.utils.sounddevice_utils import devices_by_backend
print("PortAudio library:", sd._libname)
print("PortAudio version:", sd.get_portaudio_version())
@@ -15,10 +14,3 @@ for i, d in devices_by_backend("PulseAudio"):
print(f"{i}: {d['name']} in={d['max_input_channels']} out={d['max_output_channels']}")
print("Network pw inputs:")
for i, d in list_network_pw_inputs():
print(f"{i}: {d['name']} in={d['max_input_channels']}")
print("USB pw inputs:")
for i, d in list_usb_pw_inputs():
print(f"{i}: {d['name']} in={d['max_input_channels']}")