Files
latency_test_suit/mic_scope_smooth_spectrum.py
2025-10-15 10:38:06 +02:00

197 lines
7.7 KiB
Python

#!/usr/bin/env python3
import argparse, threading, os
import numpy as np
import sounddevice as sd
import matplotlib
# GUI-Backend wählen
if os.environ.get("DISPLAY"):
matplotlib.use("TkAgg", force=True)
else:
matplotlib.use("Agg", force=True)
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
def ema_update(prev, new, alpha):
return new if prev is None else (alpha * new + (1 - alpha) * prev)
def dbfs(x):
return 20.0 * np.log10(np.maximum(x, 1e-20))
def main():
ap = argparse.ArgumentParser(description="Live Mic-Scope (ruhiges Zeitbild + Frequenzband)")
# Audio
ap.add_argument("-r", "--samplerate", type=int, default=48_000)
ap.add_argument("-c", "--channels", type=int, default=1)
ap.add_argument("-b", "--blocksize", type=int, default=1024)
ap.add_argument("--indev", type=int, default=None, help="Input-Geräteindex")
# Zeitansicht
ap.add_argument("-w", "--window", type=float, default=0.8, help="Zeitfenster (s)")
ap.add_argument("--gain", type=float, default=1.0, help="Anzeige-Gain (nur Darstellung)")
ap.add_argument("--smooth-ms", type=float, default=6.0, help="Glättung Waveform (ms)")
ap.add_argument("--level-ema", type=float, default=0.30, help="Level-EMA Zeitkonstante (s)")
ap.add_argument("--autoscale", action="store_true", help="Automatische Y-Skalierung aktivieren")
ap.add_argument("--ymax", type=float, default=1.05, help="Fixe Y-Grenze, wenn Autoscale aus")
# Spektrum
ap.add_argument("--fft-n", type=int, default=4096, help="FFT-Punkte (wird auf nächste 2er-Potenz gerundet)")
ap.add_argument("--spec-ema", type=float, default=0.30, help="Spektral-EMA Zeitkonstante (s)")
ap.add_argument("--fmin", type=float, default=20.0, help="untere Frequenzgrenze (Hz)")
ap.add_argument("--fmax", type=float, default=20000.0, help="obere Frequenzgrenze (Hz)")
ap.add_argument("--logx", action="store_true", help="logarithmische Frequenzachse")
args = ap.parse_args()
fs = args.samplerate
nwin = int(args.window * fs)
buf = np.zeros(nwin, dtype=np.float32)
lock = threading.Lock()
# --- Zustände ---
level_dbfs = None
peak_hold = 0.0
clip_pct = 0.0
# Waveform-Glättungskern (nur Anzeige)
k = max(1, int(args.smooth_ms * fs / 1000.0))
kernel = np.ones(k, dtype=np.float32) / k if k > 1 else None
# EMA-Alphas aus Zeitkonstanten
lvl_alpha = 1.0 - np.exp(-args.blocksize / (fs * max(1e-3, args.level_ema)))
spec_alpha = 1.0 - np.exp(-args.blocksize / (fs * max(1e-3, args.spec_ema)))
# Spektrum-Setup
# nächstgrößere 2er-Potenz für FFT
nfft = 1 << (int(np.ceil(np.log2(max(256, args.fft_n)))))
hann = np.hanning(nfft).astype(np.float32)
spec_db = None # für EMA
# Frequenzachse & Sichtfenster
freqs = np.fft.rfftfreq(nfft, 1.0 / fs)
fmask = (freqs >= args.fmin) & (freqs <= args.fmax)
if not np.any(fmask):
fmask[:] = True # Fallback: alles zeigen
def callback(indata, frames, time_info, status):
nonlocal buf, level_dbfs, peak_hold, clip_pct
if status:
print(status)
x = indata[:, 0].astype(np.float32, copy=True)
# Clip-Detektion
c = np.mean(np.abs(x) >= 0.999)
clip_pct = 0.9 * clip_pct + 0.1 * c
# RMS -> dBFS (vor Anzeige-Gain)
rms = np.sqrt(np.mean(np.clip(x, -1.0, 1.0) ** 2) + 1e-20)
level_dbfs = ema_update(level_dbfs, 20 * np.log10(rms), lvl_alpha)
# Ringpuffer aktualisieren
with lock:
if len(x) >= len(buf):
buf[:] = x[-len(buf):]
else:
buf[:-len(x)] = buf[len(x):]
buf[-len(x):] = x
# Peak-Hold
peak = float(np.max(np.abs(x)))
peak_hold = max(peak, peak_hold * 0.95)
# --- Plot-Setup: zwei Spalten nebeneinander ---
fig, (ax_t, ax_f) = plt.subplots(1, 2, figsize=(12, 4))
# Zeitdiagramm
t = np.linspace(-args.window, 0, nwin)
(line_t,) = ax_t.plot(t, np.zeros_like(t), lw=1)
ax_t.set_title("Zeitbereich (glättet & autoscaled)")
ax_t.set_xlabel("Zeit [s]"); ax_t.set_ylabel("Amplitude")
ax_t.set_ylim(-args.ymax, args.ymax); ax_t.grid(True, alpha=0.3)
txt = ax_t.text(0.01, 0.92, "", ha="left", va="center", transform=ax_t.transAxes)
clip_txt = ax_t.text(0.99, 0.92, "", ha="right", va="center", transform=ax_t.transAxes)
# Frequenzdiagramm
(line_f,) = ax_f.plot(freqs[fmask], np.full(np.count_nonzero(fmask), -120.0), lw=1)
ax_f.set_title("Frequenzband (dBFS)")
ax_f.set_xlabel("Frequenz [Hz]"); ax_f.set_ylabel("Pegel [dBFS]")
ax_f.grid(True, alpha=0.3)
if args.logx:
ax_f.set_xscale("log")
ax_f.set_xlim(freqs[fmask][0], freqs[fmask][-1])
ax_f.set_ylim(-120, 0)
peak_marker = ax_f.axvline(x=1000, linestyle="--", alpha=0.7)
peak_text = ax_f.text(0.98, 0.92, "", ha="right", va="center", transform=ax_f.transAxes)
# Autoscale der Zeitansicht
y_ema = None
y_alpha = 0.15
disp_ymax = args.ymax
def update(_frame):
nonlocal spec_db, y_ema, disp_ymax
with lock:
y = buf.copy()
# ---- Zeitbereich (links) ----
y_plot = y * args.gain
if kernel is not None and len(y_plot) >= len(kernel):
y_plot = np.convolve(y_plot, kernel, mode="same")
if args.autoscale:
target = float(np.percentile(np.abs(y_plot), 95)) * 1.2
target = max(target, 0.2)
y_ema = ema_update(y_ema, target, y_alpha)
disp_ymax = max(0.2, min(1.5 * args.gain, y_ema))
ax_t.set_ylim(-disp_ymax, +disp_ymax)
line_t.set_ydata(y_plot)
db = level_dbfs if level_dbfs is not None else -np.inf
txt.set_text(f"Level: {db:6.1f} dBFS Peak: {peak_hold*args.gain:0.2f} Gain: {args.gain:g}"
+ (f"{disp_ymax:0.2f}" if args.autoscale else f"{args.ymax:0.2f}"))
if clip_pct > 0.005:
clip_txt.set_text(f"CLIP {clip_pct*100:4.1f}%")
clip_txt.set_color("red")
else:
clip_txt.set_text("")
# ---- Frequenzband (rechts) ----
# nimm den letzten nfft-Slice aus dem Ringpuffer
xfft = y[-nfft:].astype(np.float32, copy=False)
if xfft.size < nfft:
pad = np.zeros(nfft - xfft.size, dtype=np.float32)
xfft = np.concatenate([pad, xfft])
# Hann-Fenster & FFT
xw = xfft * hann
spec = np.fft.rfft(xw, n=nfft)
mag = np.abs(spec) / (np.sum(hann) / 2.0) # grobe Amplitudenkalibrierung
spec_linear = mag
# Spektral-EMA (über Frames)
spec_db_inst = dbfs(spec_linear)
if spec_db is None:
spec_db = spec_db_inst
else:
spec_db = spec_alpha * spec_db_inst + (1 - spec_alpha) * spec_db
# Anzeige beschränken
yspec = spec_db[fmask]
line_f.set_ydata(yspec)
# Peak suchen im Sichtfenster
idx_pk = int(np.nanargmax(yspec))
f_pk = float(freqs[fmask][idx_pk])
v_pk = float(yspec[idx_pk])
peak_marker.set_xdata([f_pk, f_pk])
peak_text.set_text(f"Peak: {f_pk:0.0f} Hz / {v_pk:0.1f} dBFS")
return line_t, txt, clip_txt, line_f, peak_marker, peak_text
stream_kwargs = dict(samplerate=fs, channels=args.channels, dtype="float32", blocksize=args.blocksize)
if args.indev is not None:
stream_kwargs["device"] = (args.indev, None)
with sd.InputStream(callback=callback, **stream_kwargs):
ani = FuncAnimation(fig, update, interval=33, blit=False) # ~30 FPS
print("Live-Ansicht läuft. Fenster schließen oder Strg+C zum Beenden.")
plt.tight_layout()
plt.show()
if __name__ == "__main__":
main()