197 lines
7.7 KiB
Python
197 lines
7.7 KiB
Python
#!/usr/bin/env python3
|
|
import argparse, threading, os
|
|
import numpy as np
|
|
import sounddevice as sd
|
|
import matplotlib
|
|
# GUI-Backend wählen
|
|
if os.environ.get("DISPLAY"):
|
|
matplotlib.use("TkAgg", force=True)
|
|
else:
|
|
matplotlib.use("Agg", force=True)
|
|
import matplotlib.pyplot as plt
|
|
from matplotlib.animation import FuncAnimation
|
|
|
|
def ema_update(prev, new, alpha):
|
|
return new if prev is None else (alpha * new + (1 - alpha) * prev)
|
|
|
|
def dbfs(x):
|
|
return 20.0 * np.log10(np.maximum(x, 1e-20))
|
|
|
|
def main():
|
|
ap = argparse.ArgumentParser(description="Live Mic-Scope (ruhiges Zeitbild + Frequenzband)")
|
|
# Audio
|
|
ap.add_argument("-r", "--samplerate", type=int, default=48_000)
|
|
ap.add_argument("-c", "--channels", type=int, default=1)
|
|
ap.add_argument("-b", "--blocksize", type=int, default=1024)
|
|
ap.add_argument("--indev", type=int, default=None, help="Input-Geräteindex")
|
|
# Zeitansicht
|
|
ap.add_argument("-w", "--window", type=float, default=0.8, help="Zeitfenster (s)")
|
|
ap.add_argument("--gain", type=float, default=1.0, help="Anzeige-Gain (nur Darstellung)")
|
|
ap.add_argument("--smooth-ms", type=float, default=6.0, help="Glättung Waveform (ms)")
|
|
ap.add_argument("--level-ema", type=float, default=0.30, help="Level-EMA Zeitkonstante (s)")
|
|
ap.add_argument("--autoscale", action="store_true", help="Automatische Y-Skalierung aktivieren")
|
|
ap.add_argument("--ymax", type=float, default=1.05, help="Fixe Y-Grenze, wenn Autoscale aus")
|
|
# Spektrum
|
|
ap.add_argument("--fft-n", type=int, default=4096, help="FFT-Punkte (wird auf nächste 2er-Potenz gerundet)")
|
|
ap.add_argument("--spec-ema", type=float, default=0.30, help="Spektral-EMA Zeitkonstante (s)")
|
|
ap.add_argument("--fmin", type=float, default=20.0, help="untere Frequenzgrenze (Hz)")
|
|
ap.add_argument("--fmax", type=float, default=20000.0, help="obere Frequenzgrenze (Hz)")
|
|
ap.add_argument("--logx", action="store_true", help="logarithmische Frequenzachse")
|
|
args = ap.parse_args()
|
|
|
|
fs = args.samplerate
|
|
nwin = int(args.window * fs)
|
|
buf = np.zeros(nwin, dtype=np.float32)
|
|
lock = threading.Lock()
|
|
|
|
# --- Zustände ---
|
|
level_dbfs = None
|
|
peak_hold = 0.0
|
|
clip_pct = 0.0
|
|
|
|
# Waveform-Glättungskern (nur Anzeige)
|
|
k = max(1, int(args.smooth_ms * fs / 1000.0))
|
|
kernel = np.ones(k, dtype=np.float32) / k if k > 1 else None
|
|
|
|
# EMA-Alphas aus Zeitkonstanten
|
|
lvl_alpha = 1.0 - np.exp(-args.blocksize / (fs * max(1e-3, args.level_ema)))
|
|
spec_alpha = 1.0 - np.exp(-args.blocksize / (fs * max(1e-3, args.spec_ema)))
|
|
|
|
# Spektrum-Setup
|
|
# nächstgrößere 2er-Potenz für FFT
|
|
nfft = 1 << (int(np.ceil(np.log2(max(256, args.fft_n)))))
|
|
hann = np.hanning(nfft).astype(np.float32)
|
|
spec_db = None # für EMA
|
|
# Frequenzachse & Sichtfenster
|
|
freqs = np.fft.rfftfreq(nfft, 1.0 / fs)
|
|
fmask = (freqs >= args.fmin) & (freqs <= args.fmax)
|
|
if not np.any(fmask):
|
|
fmask[:] = True # Fallback: alles zeigen
|
|
|
|
def callback(indata, frames, time_info, status):
|
|
nonlocal buf, level_dbfs, peak_hold, clip_pct
|
|
if status:
|
|
print(status)
|
|
x = indata[:, 0].astype(np.float32, copy=True)
|
|
|
|
# Clip-Detektion
|
|
c = np.mean(np.abs(x) >= 0.999)
|
|
clip_pct = 0.9 * clip_pct + 0.1 * c
|
|
|
|
# RMS -> dBFS (vor Anzeige-Gain)
|
|
rms = np.sqrt(np.mean(np.clip(x, -1.0, 1.0) ** 2) + 1e-20)
|
|
level_dbfs = ema_update(level_dbfs, 20 * np.log10(rms), lvl_alpha)
|
|
|
|
# Ringpuffer aktualisieren
|
|
with lock:
|
|
if len(x) >= len(buf):
|
|
buf[:] = x[-len(buf):]
|
|
else:
|
|
buf[:-len(x)] = buf[len(x):]
|
|
buf[-len(x):] = x
|
|
|
|
# Peak-Hold
|
|
peak = float(np.max(np.abs(x)))
|
|
peak_hold = max(peak, peak_hold * 0.95)
|
|
|
|
# --- Plot-Setup: zwei Spalten nebeneinander ---
|
|
fig, (ax_t, ax_f) = plt.subplots(1, 2, figsize=(12, 4))
|
|
# Zeitdiagramm
|
|
t = np.linspace(-args.window, 0, nwin)
|
|
(line_t,) = ax_t.plot(t, np.zeros_like(t), lw=1)
|
|
ax_t.set_title("Zeitbereich (glättet & autoscaled)")
|
|
ax_t.set_xlabel("Zeit [s]"); ax_t.set_ylabel("Amplitude")
|
|
ax_t.set_ylim(-args.ymax, args.ymax); ax_t.grid(True, alpha=0.3)
|
|
txt = ax_t.text(0.01, 0.92, "", ha="left", va="center", transform=ax_t.transAxes)
|
|
clip_txt = ax_t.text(0.99, 0.92, "", ha="right", va="center", transform=ax_t.transAxes)
|
|
|
|
# Frequenzdiagramm
|
|
(line_f,) = ax_f.plot(freqs[fmask], np.full(np.count_nonzero(fmask), -120.0), lw=1)
|
|
ax_f.set_title("Frequenzband (dBFS)")
|
|
ax_f.set_xlabel("Frequenz [Hz]"); ax_f.set_ylabel("Pegel [dBFS]")
|
|
ax_f.grid(True, alpha=0.3)
|
|
if args.logx:
|
|
ax_f.set_xscale("log")
|
|
ax_f.set_xlim(freqs[fmask][0], freqs[fmask][-1])
|
|
ax_f.set_ylim(-120, 0)
|
|
peak_marker = ax_f.axvline(x=1000, linestyle="--", alpha=0.7)
|
|
peak_text = ax_f.text(0.98, 0.92, "", ha="right", va="center", transform=ax_f.transAxes)
|
|
|
|
# Autoscale der Zeitansicht
|
|
y_ema = None
|
|
y_alpha = 0.15
|
|
disp_ymax = args.ymax
|
|
|
|
def update(_frame):
|
|
nonlocal spec_db, y_ema, disp_ymax
|
|
with lock:
|
|
y = buf.copy()
|
|
|
|
# ---- Zeitbereich (links) ----
|
|
y_plot = y * args.gain
|
|
if kernel is not None and len(y_plot) >= len(kernel):
|
|
y_plot = np.convolve(y_plot, kernel, mode="same")
|
|
|
|
if args.autoscale:
|
|
target = float(np.percentile(np.abs(y_plot), 95)) * 1.2
|
|
target = max(target, 0.2)
|
|
y_ema = ema_update(y_ema, target, y_alpha)
|
|
disp_ymax = max(0.2, min(1.5 * args.gain, y_ema))
|
|
ax_t.set_ylim(-disp_ymax, +disp_ymax)
|
|
line_t.set_ydata(y_plot)
|
|
|
|
db = level_dbfs if level_dbfs is not None else -np.inf
|
|
txt.set_text(f"Level: {db:6.1f} dBFS Peak: {peak_hold*args.gain:0.2f} Gain: {args.gain:g}"
|
|
+ (f" Y±{disp_ymax:0.2f}" if args.autoscale else f" Y±{args.ymax:0.2f}"))
|
|
|
|
if clip_pct > 0.005:
|
|
clip_txt.set_text(f"CLIP {clip_pct*100:4.1f}%")
|
|
clip_txt.set_color("red")
|
|
else:
|
|
clip_txt.set_text("")
|
|
|
|
# ---- Frequenzband (rechts) ----
|
|
# nimm den letzten nfft-Slice aus dem Ringpuffer
|
|
xfft = y[-nfft:].astype(np.float32, copy=False)
|
|
if xfft.size < nfft:
|
|
pad = np.zeros(nfft - xfft.size, dtype=np.float32)
|
|
xfft = np.concatenate([pad, xfft])
|
|
# Hann-Fenster & FFT
|
|
xw = xfft * hann
|
|
spec = np.fft.rfft(xw, n=nfft)
|
|
mag = np.abs(spec) / (np.sum(hann) / 2.0) # grobe Amplitudenkalibrierung
|
|
spec_linear = mag
|
|
|
|
# Spektral-EMA (über Frames)
|
|
spec_db_inst = dbfs(spec_linear)
|
|
if spec_db is None:
|
|
spec_db = spec_db_inst
|
|
else:
|
|
spec_db = spec_alpha * spec_db_inst + (1 - spec_alpha) * spec_db
|
|
|
|
# Anzeige beschränken
|
|
yspec = spec_db[fmask]
|
|
line_f.set_ydata(yspec)
|
|
|
|
# Peak suchen im Sichtfenster
|
|
idx_pk = int(np.nanargmax(yspec))
|
|
f_pk = float(freqs[fmask][idx_pk])
|
|
v_pk = float(yspec[idx_pk])
|
|
peak_marker.set_xdata([f_pk, f_pk])
|
|
peak_text.set_text(f"Peak: {f_pk:0.0f} Hz / {v_pk:0.1f} dBFS")
|
|
|
|
return line_t, txt, clip_txt, line_f, peak_marker, peak_text
|
|
|
|
stream_kwargs = dict(samplerate=fs, channels=args.channels, dtype="float32", blocksize=args.blocksize)
|
|
if args.indev is not None:
|
|
stream_kwargs["device"] = (args.indev, None)
|
|
|
|
with sd.InputStream(callback=callback, **stream_kwargs):
|
|
ani = FuncAnimation(fig, update, interval=33, blit=False) # ~30 FPS
|
|
print("Live-Ansicht läuft. Fenster schließen oder Strg+C zum Beenden.")
|
|
plt.tight_layout()
|
|
plt.show()
|
|
|
|
if __name__ == "__main__":
|
|
main()
|