Initial Commit
This commit is contained in:
24
# tone.py
Normal file
24
# tone.py
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
# tone.py
|
||||||
|
import numpy as np
|
||||||
|
import sounddevice as sd
|
||||||
|
|
||||||
|
def play_tone(frequency=440.0, duration=1.0, volume=0.2, sample_rate=44_100):
|
||||||
|
"""
|
||||||
|
Spielt einen Sinuston ab.
|
||||||
|
frequency: Hz
|
||||||
|
duration: Sekunden
|
||||||
|
volume: 0.0–1.0
|
||||||
|
sample_rate: Abtastrate in Hz
|
||||||
|
"""
|
||||||
|
t = np.linspace(0, duration, int(sample_rate * duration), endpoint=False)
|
||||||
|
wave = (volume * np.sin(2 * np.pi * frequency * t)).astype(np.float32)
|
||||||
|
sd.play(wave, samplerate=sample_rate, blocking=True)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
import argparse
|
||||||
|
ap = argparse.ArgumentParser(description="Spiele einen Ton")
|
||||||
|
ap.add_argument("-f", "--frequency", type=float, default=440.0, help="Frequenz in Hz (z.B. 440)")
|
||||||
|
ap.add_argument("-t", "--time", type=float, default=1.0, help="Dauer in Sekunden (z.B. 1.5)")
|
||||||
|
ap.add_argument("-v", "--volume", type=float, default=0.2, help="Lautstärke 0.0–1.0")
|
||||||
|
args = ap.parse_args()
|
||||||
|
play_tone(args.frequency, args.time, args.volume)
|
||||||
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
|||||||
|
.venv/
|
||||||
BIN
latency_boxplot.png
Normal file
BIN
latency_boxplot.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 58 KiB |
51
latency_results.csv
Normal file
51
latency_results.csv
Normal file
@@ -0,0 +1,51 @@
|
|||||||
|
frequency_hz,trial_index,latency_ms,confidence
|
||||||
|
300.0,0,-207.31400000022404,0.00622056501343238
|
||||||
|
300.0,1,348.6330000000635,0.056284549811349935
|
||||||
|
300.0,2,-143.1591666669192,0.038332755806023674
|
||||||
|
300.0,3,-180.1999999997861,0.020459769531907698
|
||||||
|
300.0,4,243.53600000040387,0.027538859419328387
|
||||||
|
408.0,0,511.01566666693543,0.13034782307873757
|
||||||
|
408.0,1,205.96383333304402,0.00602274978480801
|
||||||
|
408.0,2,-156.44200000042474,0.006040940316049311
|
||||||
|
408.0,3,321.73283333304425,0.013428349185833171
|
||||||
|
408.0,4,-95.41566666666768,0.013516053778991198
|
||||||
|
556.0,0,-51.99999999967986,0.008908330169987178
|
||||||
|
556.0,1,163.87583333334987,0.012293108137542823
|
||||||
|
556.0,2,20.89316666706509,0.013672583791677939
|
||||||
|
556.0,3,92.36583333358794,0.08046100213143226
|
||||||
|
556.0,4,496.5111666665507,0.03348673405585233
|
||||||
|
756.0,0,-2.0180000001346343,0.0394411029185194
|
||||||
|
756.0,1,1.150000000052387,0.03896462403988957
|
||||||
|
756.0,2,206.1319999997977,0.8568158097486878
|
||||||
|
756.0,3,209.28316666640967,0.8396316030225823
|
||||||
|
756.0,4,211.58083333284594,0.8459048262785326
|
||||||
|
1029.0,0,210.14933333344743,0.9530819615129752
|
||||||
|
1029.0,1,210.65683333335983,0.9532782357410453
|
||||||
|
1029.0,2,211.39866666680973,0.9518998263560067
|
||||||
|
1029.0,3,211.22466666702167,0.9544329867247763
|
||||||
|
1029.0,4,210.19283333362182,0.9053878364174661
|
||||||
|
1400.0,0,210.17250000068088,0.9968093472420404
|
||||||
|
1400.0,1,210.19466666621156,0.9966188177939429
|
||||||
|
1400.0,2,210.24766666687356,0.9961348885445872
|
||||||
|
1400.0,3,209.77616666687027,0.9955757851840116
|
||||||
|
1400.0,4,211.357333333126,0.9945443953712811
|
||||||
|
1905.0,0,211.82350000026418,0.9960441681665126
|
||||||
|
1905.0,1,210.90266666669777,0.9961293380620004
|
||||||
|
1905.0,2,210.557666666773,0.9968312654763809
|
||||||
|
1905.0,3,212.87600000050588,0.9917235795474151
|
||||||
|
1905.0,4,210.96283333326937,0.9972385928498976
|
||||||
|
2592.0,0,211.56116666679736,0.9986127409307071
|
||||||
|
2592.0,1,212.37466666707405,0.9959640592347981
|
||||||
|
2592.0,2,212.0991666665759,0.9978592280151695
|
||||||
|
2592.0,3,212.55183333323657,0.9980591577758795
|
||||||
|
2592.0,4,212.1754999998302,0.998122763542641
|
||||||
|
3527.0,0,212.77016666681448,0.9966566522428859
|
||||||
|
3527.0,1,212.27300000055038,0.9977422314075753
|
||||||
|
3527.0,2,212.0703333334859,0.9970086636719195
|
||||||
|
3527.0,3,212.2191666667277,0.9930518500366761
|
||||||
|
3527.0,4,212.39983333362034,0.9958515430059891
|
||||||
|
4800.0,0,212.03550000018367,0.9362108622164094
|
||||||
|
4800.0,1,212.61533333336047,0.9542874220201463
|
||||||
|
4800.0,2,212.76416666660225,0.9675263253369292
|
||||||
|
4800.0,3,212.87066666673127,0.9592850324828915
|
||||||
|
4800.0,4,213.04566666685787,0.9851714795883683
|
||||||
|
181
latency_sweep.py
Normal file
181
latency_sweep.py
Normal file
@@ -0,0 +1,181 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
import argparse
|
||||||
|
import csv
|
||||||
|
from dataclasses import dataclass
|
||||||
|
import numpy as np
|
||||||
|
import sounddevice as sd
|
||||||
|
import matplotlib
|
||||||
|
matplotlib.use("Agg") # Dateiausgabe ohne GUI; mit --show wird umgestellt
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
|
||||||
|
# ---------- Audio/Signal-Helfer ----------
|
||||||
|
@dataclass
|
||||||
|
class Times:
|
||||||
|
dac_first_time: float | None = None
|
||||||
|
adc_first_time: float | None = None
|
||||||
|
|
||||||
|
def generate_tone(f_hz: float, dur_s: float, fs: int, volume: float,
|
||||||
|
pre_silence: float = 0.20, post_silence: float = 0.40):
|
||||||
|
"""Stille + Sinus + Stille (mit 5ms Fade-in/out)."""
|
||||||
|
n_pre = int(pre_silence * fs)
|
||||||
|
n_tone = int(dur_s * fs)
|
||||||
|
n_post = int(post_silence * fs)
|
||||||
|
t = np.arange(n_tone) / fs
|
||||||
|
tone = np.sin(2 * np.pi * f_hz * t).astype(np.float32)
|
||||||
|
fade_n = max(1, int(0.005 * fs))
|
||||||
|
w = np.ones_like(tone)
|
||||||
|
w[:fade_n] *= np.linspace(0, 1, fade_n, endpoint=False)
|
||||||
|
w[-fade_n:] *= np.linspace(1, 0, fade_n, endpoint=False)
|
||||||
|
ref = (volume * tone * w).astype(np.float32)
|
||||||
|
out = np.concatenate([np.zeros(n_pre, dtype=np.float32), ref, np.zeros(n_post, dtype=np.float32)])
|
||||||
|
return out, ref, n_pre
|
||||||
|
|
||||||
|
def detect_onset_xcorr(signal: np.ndarray, ref: np.ndarray):
|
||||||
|
"""Normierte Kreuzkorrelation; liefert Onset-Index und Confidence."""
|
||||||
|
x = signal.astype(np.float64)
|
||||||
|
r = ref.astype(np.float64)
|
||||||
|
M, N = len(r), len(x)
|
||||||
|
if N < M + 1:
|
||||||
|
return 0, np.array([0.0]), 0.0
|
||||||
|
# einfache Vor-Whitening (Hochpass) stabilisiert
|
||||||
|
xw = np.concatenate([[x[0]], x[1:] - 0.97 * x[:-1]])
|
||||||
|
rw = np.concatenate([[r[0]], r[1:] - 0.97 * r[:-1]])
|
||||||
|
corr = np.correlate(xw, rw, mode="valid")
|
||||||
|
x2 = xw**2
|
||||||
|
cs = np.concatenate([[0.0], np.cumsum(x2)])
|
||||||
|
E_x = cs[M:] - cs[:-M]
|
||||||
|
E_r = np.sum(rw**2) + 1e-20
|
||||||
|
nrm = np.sqrt(E_x * E_r) + 1e-20
|
||||||
|
nxc = corr / nrm
|
||||||
|
k = int(np.argmax(nxc))
|
||||||
|
conf = float(nxc[k])
|
||||||
|
return k, nxc, conf
|
||||||
|
|
||||||
|
def measure_latency_once(freq_hz: float, fs: int, dur_s: float, volume: float,
|
||||||
|
indev: int | None, outdev: int | None,
|
||||||
|
pre_silence: float = 0.20, post_silence: float = 0.40):
|
||||||
|
"""Spielt einen Ton, nimmt parallel auf, schätzt Latenz in ms, gibt Confidence zurück."""
|
||||||
|
play_buf, ref, _ = generate_tone(freq_hz, dur_s, fs, volume, pre_silence, post_silence)
|
||||||
|
record_buf = []
|
||||||
|
written = 0
|
||||||
|
times = Times()
|
||||||
|
|
||||||
|
def cb(indata, outdata, frames, time_info, status):
|
||||||
|
nonlocal written, times
|
||||||
|
if status:
|
||||||
|
# Ausgabe nur informativ; Xruns etc. beeinflussen Latenz
|
||||||
|
print(status, flush=True)
|
||||||
|
if times.adc_first_time is None:
|
||||||
|
times.adc_first_time = time_info.inputBufferAdcTime
|
||||||
|
|
||||||
|
chunk = play_buf[written:written+frames]
|
||||||
|
out = np.zeros((frames,), dtype=np.float32)
|
||||||
|
if len(chunk) > 0:
|
||||||
|
out[:len(chunk)] = chunk
|
||||||
|
|
||||||
|
if times.dac_first_time is None and np.any(out != 0.0):
|
||||||
|
first_nz = int(np.argmax(out != 0.0))
|
||||||
|
times.dac_first_time = time_info.outputBufferDacTime + first_nz / fs
|
||||||
|
|
||||||
|
outdata[:] = out.reshape(-1, 1)
|
||||||
|
record_buf.append(indata.copy().reshape(-1))
|
||||||
|
written += frames
|
||||||
|
|
||||||
|
stream_kwargs = dict(samplerate=fs, dtype="float32", channels=1)
|
||||||
|
if indev is not None or outdev is not None:
|
||||||
|
stream_kwargs["device"] = (indev, outdev)
|
||||||
|
|
||||||
|
with sd.Stream(callback=cb, **stream_kwargs):
|
||||||
|
sd.sleep(int(1000 * (len(play_buf) / fs)))
|
||||||
|
sd.sleep(200)
|
||||||
|
|
||||||
|
if times.adc_first_time is None or times.dac_first_time is None or not record_buf:
|
||||||
|
return np.nan, 0.0
|
||||||
|
|
||||||
|
rec = np.concatenate(record_buf).astype(np.float32)
|
||||||
|
onset_idx, _, conf = detect_onset_xcorr(rec, ref)
|
||||||
|
adc_detect_time = times.adc_first_time + onset_idx / fs
|
||||||
|
latency_ms = (adc_detect_time - times.dac_first_time) * 1000.0
|
||||||
|
return float(latency_ms), conf
|
||||||
|
|
||||||
|
# ---------- Sweep-Runner & Plot ----------
|
||||||
|
def run_sweep(freqs, repeats, fs, dur, vol, indev, outdev, conf_min):
|
||||||
|
results = {float(f): [] for f in freqs}
|
||||||
|
confs = {float(f): [] for f in freqs}
|
||||||
|
for f in freqs:
|
||||||
|
for _ in range(repeats):
|
||||||
|
lat_ms, conf = measure_latency_once(f, fs, dur, vol, indev, outdev)
|
||||||
|
results[float(f)].append(lat_ms)
|
||||||
|
confs[float(f)].append(conf)
|
||||||
|
print(f"f={f:.1f} Hz -> latency={lat_ms:.2f} ms conf={conf:.3f}")
|
||||||
|
# Markiere zu schwache Messungen (Confidence)
|
||||||
|
bad = sum((np.isnan(v) or c < conf_min) for arr, carr in zip(results.values(), confs.values()) for v, c in zip(arr, carr))
|
||||||
|
if bad > 0:
|
||||||
|
print(f"Warnung: {bad} Messungen mit niedriger Confidence (< {conf_min}) oder NaN.")
|
||||||
|
return results, confs
|
||||||
|
|
||||||
|
def save_csv(path, results, confs):
|
||||||
|
with open(path, "w", newline="") as f:
|
||||||
|
w = csv.writer(f)
|
||||||
|
w.writerow(["frequency_hz", "trial_index", "latency_ms", "confidence"])
|
||||||
|
for f_hz, arr in results.items():
|
||||||
|
for i, lat in enumerate(arr):
|
||||||
|
w.writerow([f_hz, i, lat, confs[f_hz][i]])
|
||||||
|
|
||||||
|
def plot_boxplot(path_png, results, show=False):
|
||||||
|
freqs = sorted(results.keys())
|
||||||
|
data = [results[f] for f in freqs]
|
||||||
|
fig = plt.figure(figsize=(10, 5))
|
||||||
|
plt.title("Akustische Latenz über Frequenz (Boxplot je 5 Wiederholungen)")
|
||||||
|
plt.boxplot(data, positions=range(len(freqs)), manage_ticks=False)
|
||||||
|
# Mittelwerte als Linie
|
||||||
|
means = [np.nanmean(arr) if len(arr) else np.nan for arr in data]
|
||||||
|
plt.plot(range(len(freqs)), means, marker="o", linestyle="--", label="Mittelwert")
|
||||||
|
plt.xticks(range(len(freqs)), [f"{int(f)}" for f in freqs])
|
||||||
|
plt.xlabel("Frequenz [Hz]")
|
||||||
|
plt.ylabel("Latenz [ms]")
|
||||||
|
plt.grid(True, axis="y", alpha=0.3)
|
||||||
|
plt.legend(loc="best")
|
||||||
|
plt.tight_layout()
|
||||||
|
plt.savefig(path_png, dpi=150)
|
||||||
|
print(f"Plot gespeichert: {path_png}")
|
||||||
|
if show:
|
||||||
|
plt.switch_backend("TkAgg")
|
||||||
|
plt.show()
|
||||||
|
plt.close(fig)
|
||||||
|
|
||||||
|
def parse_freqs(arg: str | None, default_n: int = 10, fmin: float = 300.0, fmax: float = 4800.0):
|
||||||
|
"""Erzeuge logarithmisch verteilte Defaults oder parse Kommata-Liste."""
|
||||||
|
if arg:
|
||||||
|
return np.array([float(x) for x in arg.split(",") if x.strip() != ""])
|
||||||
|
# 10 log-spaced Frequenzen (Laptop-Lautsprecher-freundlich)
|
||||||
|
return np.round(np.geomspace(fmin, fmax, default_n)).astype(float)
|
||||||
|
|
||||||
|
def main():
|
||||||
|
ap = argparse.ArgumentParser(description="Mehrfachmessung akustischer Latenz über Frequenzpunkte")
|
||||||
|
ap.add_argument("--freqs", type=str, default=None,
|
||||||
|
help="Kommagetrennt (z.B. 300,400,600,1000) – sonst 10 log-spaced 300..4800 Hz")
|
||||||
|
ap.add_argument("--repeats", type=int, default=5, help="Wiederholungen je Frequenz")
|
||||||
|
ap.add_argument("-r", "--samplerate", type=int, default=48_000, help="Samplerate")
|
||||||
|
ap.add_argument("-t", "--time", type=float, default=0.15, help="Tondauer (s)")
|
||||||
|
ap.add_argument("-v", "--volume", type=float, default=0.6, help="Lautstärke 0..1")
|
||||||
|
ap.add_argument("--indev", type=int, default=None, help="Input-Geräteindex")
|
||||||
|
ap.add_argument("--outdev", type=int, default=None, help="Output-Geräteindex")
|
||||||
|
ap.add_argument("--csv", type=str, default="latency_results.csv", help="CSV-Dateiname")
|
||||||
|
ap.add_argument("--png", type=str, default="latency_boxplot.png", help="PNG-Dateiname")
|
||||||
|
ap.add_argument("--show", action="store_true", help="Plot anzeigen (GUI notwendig)")
|
||||||
|
ap.add_argument("--conf-min", type=float, default=0.3, help="Warnschwelle für Confidence")
|
||||||
|
args = ap.parse_args()
|
||||||
|
|
||||||
|
freqs = parse_freqs(args.freqs)
|
||||||
|
print("Frequenzen:", ", ".join(f"{int(f)}" for f in freqs))
|
||||||
|
results, confs = run_sweep(freqs, args.repeats, args.samplerate, args.time,
|
||||||
|
args.volume, args.indev, args.outdev, args.conf_min)
|
||||||
|
save_csv(args.csv, results, confs)
|
||||||
|
plot_boxplot(args.png, results, args.show)
|
||||||
|
# Kurze Zusammenfassung
|
||||||
|
means = {int(f): float(np.nanmean(v)) for f, v in results.items()}
|
||||||
|
print("Mittelwerte (ms):", means)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
BIN
latenz.png
Normal file
BIN
latenz.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 66 KiB |
136
mic_scope_smooth.py
Normal file
136
mic_scope_smooth.py
Normal file
@@ -0,0 +1,136 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
import argparse, threading, os
|
||||||
|
import numpy as np
|
||||||
|
import sounddevice as sd
|
||||||
|
import matplotlib
|
||||||
|
# GUI-Backend wählen
|
||||||
|
if os.environ.get("DISPLAY"):
|
||||||
|
matplotlib.use("TkAgg", force=True)
|
||||||
|
else:
|
||||||
|
matplotlib.use("Agg", force=True)
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
from matplotlib.animation import FuncAnimation
|
||||||
|
|
||||||
|
def ema_update(prev, new, alpha):
|
||||||
|
return new if prev is None else (alpha * new + (1 - alpha) * prev)
|
||||||
|
|
||||||
|
def main():
|
||||||
|
ap = argparse.ArgumentParser(description="Live Mic-Scope (glatt, autoscale, Clip-Anzeige)")
|
||||||
|
ap.add_argument("-r", "--samplerate", type=int, default=48_000)
|
||||||
|
ap.add_argument("-c", "--channels", type=int, default=1)
|
||||||
|
ap.add_argument("-b", "--blocksize", type=int, default=1024)
|
||||||
|
ap.add_argument("-w", "--window", type=float, default=0.8, help="Anzeigefenster in Sekunden")
|
||||||
|
ap.add_argument("--indev", type=int, default=None, help="Input-Geräteindex")
|
||||||
|
ap.add_argument("--gain", type=float, default=1.0, help="Anzeige-Gain (nur Darstellung)")
|
||||||
|
ap.add_argument("--smooth-ms", type=float, default=5.0, help="Glättung des Waveforms (ms)")
|
||||||
|
ap.add_argument("--level-ema", type=float, default=0.30, help="Level-EMA Zeitkonstante in s")
|
||||||
|
ap.add_argument("--autoscale", action="store_true", help="Automatische Y-Skalierung aktivieren")
|
||||||
|
ap.add_argument("--ymax", type=float, default=1.05, help="Fixe Y-Grenze, wenn Autoscale aus")
|
||||||
|
args = ap.parse_args()
|
||||||
|
|
||||||
|
fs = args.samplerate
|
||||||
|
nwin = int(args.window * fs)
|
||||||
|
buf = np.zeros(nwin, dtype=np.float32)
|
||||||
|
lock = threading.Lock()
|
||||||
|
|
||||||
|
# State
|
||||||
|
level_dbfs = None
|
||||||
|
peak_hold = 0.0
|
||||||
|
clip_pct = 0.0
|
||||||
|
disp_ymax = args.ymax if not args.autoscale else None # wird dynamisch gesetzt
|
||||||
|
|
||||||
|
# Glättungskern (nur für Darstellung)
|
||||||
|
k = max(1, int(args.smooth_ms * fs / 1000))
|
||||||
|
kernel = np.ones(k, dtype=np.float32) / k if k > 1 else None
|
||||||
|
|
||||||
|
# Level-EMA Alpha aus Zeitkonstante
|
||||||
|
lvl_alpha = 1.0 - np.exp(-args.blocksize / (fs * max(1e-3, args.level_ema)))
|
||||||
|
|
||||||
|
def callback(indata, frames, time_info, status):
|
||||||
|
nonlocal buf, level_dbfs, peak_hold, clip_pct
|
||||||
|
if status:
|
||||||
|
print(status)
|
||||||
|
# nur erster Kanal
|
||||||
|
x = indata[:, 0].copy()
|
||||||
|
# Clip-Detektion (echtes Input-Clipping => abs(x) ~ 1.0)
|
||||||
|
c = np.mean(np.abs(x) >= 0.999)
|
||||||
|
clip_pct = 0.9 * clip_pct + 0.1 * c # leicht glätten
|
||||||
|
|
||||||
|
# RMS -> dBFS (vor Gain, um echte Nähe zu 0 dBFS zu sehen)
|
||||||
|
rms = np.sqrt(np.mean(np.clip(x, -1.0, 1.0) ** 2) + 1e-20)
|
||||||
|
db = 20 * np.log10(rms)
|
||||||
|
level_dbfs = ema_update(level_dbfs, db, lvl_alpha)
|
||||||
|
|
||||||
|
# Ringpuffer
|
||||||
|
with lock:
|
||||||
|
if len(x) >= len(buf):
|
||||||
|
buf[:] = x[-len(buf):]
|
||||||
|
else:
|
||||||
|
buf[:-len(x)] = buf[len(x):]
|
||||||
|
buf[-len(x):] = x
|
||||||
|
|
||||||
|
# Peak-Hold (für Textanzeige)
|
||||||
|
peak = float(np.max(np.abs(x)))
|
||||||
|
peak_hold = max(peak, peak_hold * 0.95) # langsamer Abkling
|
||||||
|
|
||||||
|
stream_kwargs = dict(samplerate=fs, channels=args.channels, dtype="float32", blocksize=args.blocksize)
|
||||||
|
if args.indev is not None:
|
||||||
|
stream_kwargs["device"] = (args.indev, None)
|
||||||
|
|
||||||
|
# Plot-Setup
|
||||||
|
fig, ax = plt.subplots(figsize=(10, 4))
|
||||||
|
t = np.linspace(-args.window, 0, nwin)
|
||||||
|
(line,) = ax.plot(t, np.zeros_like(t), lw=1)
|
||||||
|
ax.set_title("Mikrofon – Live Waveform (glättet & autoscaled)")
|
||||||
|
ax.set_xlabel("Zeit [s]"); ax.set_ylabel("Amplitude")
|
||||||
|
ax.set_ylim(-args.ymax, args.ymax)
|
||||||
|
ax.grid(True, alpha=0.3)
|
||||||
|
txt = ax.text(0.01, 0.92, "", ha="left", va="center", transform=ax.transAxes)
|
||||||
|
clip_txt = ax.text(0.99, 0.92, "", ha="right", va="center", transform=ax.transAxes)
|
||||||
|
|
||||||
|
# weiche Autoscale-EMA für Y-Limits
|
||||||
|
y_ema = None
|
||||||
|
y_alpha = 0.15 # Höhere Werte => schnellere Anpassung
|
||||||
|
|
||||||
|
def update(_frame):
|
||||||
|
nonlocal disp_ymax, y_ema
|
||||||
|
with lock:
|
||||||
|
y = buf.copy()
|
||||||
|
|
||||||
|
# Anzeige-Gain
|
||||||
|
y *= args.gain
|
||||||
|
|
||||||
|
# Glättung nur für Darstellung
|
||||||
|
if kernel is not None and len(y) >= len(kernel):
|
||||||
|
y = np.convolve(y, kernel, mode="same")
|
||||||
|
|
||||||
|
# Autoscale (robust): Ziel = 95. Perzentil(|y|) * 1.2
|
||||||
|
if args.autoscale:
|
||||||
|
target = float(np.percentile(np.abs(y), 95)) * 1.2
|
||||||
|
target = max(target, 0.2) # minimal sinnvoller Bereich
|
||||||
|
y_ema = ema_update(y_ema, target, y_alpha)
|
||||||
|
disp_ymax = max(0.2, min(1.5 * args.gain, y_ema)) # clamp
|
||||||
|
ax.set_ylim(-disp_ymax, +disp_ymax)
|
||||||
|
|
||||||
|
line.set_ydata(y)
|
||||||
|
|
||||||
|
# Info-Text
|
||||||
|
db = level_dbfs if level_dbfs is not None else -np.inf
|
||||||
|
txt.set_text(f"Level: {db:6.1f} dBFS Peak: {peak_hold*args.gain:0.2f} Gain: {args.gain:g}"
|
||||||
|
+ (f" Y±{disp_ymax:0.2f}" if args.autoscale else f" Y±{args.ymax:0.2f}"))
|
||||||
|
|
||||||
|
# Clip-Warnung (rot, wenn >0.5% geclippt)
|
||||||
|
if clip_pct > 0.005:
|
||||||
|
clip_txt.set_text(f"CLIP {clip_pct*100:4.1f}%")
|
||||||
|
clip_txt.set_color("red")
|
||||||
|
else:
|
||||||
|
clip_txt.set_text("")
|
||||||
|
return line, txt, clip_txt
|
||||||
|
|
||||||
|
with sd.InputStream(callback=callback, **stream_kwargs):
|
||||||
|
ani = FuncAnimation(fig, update, interval=33, blit=False) # ~30 FPS
|
||||||
|
print("Live-Ansicht läuft. Fenster schließen oder Strg+C zum Beenden.")
|
||||||
|
plt.show()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
196
mic_scope_smooth_spectrum.py
Normal file
196
mic_scope_smooth_spectrum.py
Normal file
@@ -0,0 +1,196 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
import argparse, threading, os
|
||||||
|
import numpy as np
|
||||||
|
import sounddevice as sd
|
||||||
|
import matplotlib
|
||||||
|
# GUI-Backend wählen
|
||||||
|
if os.environ.get("DISPLAY"):
|
||||||
|
matplotlib.use("TkAgg", force=True)
|
||||||
|
else:
|
||||||
|
matplotlib.use("Agg", force=True)
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
from matplotlib.animation import FuncAnimation
|
||||||
|
|
||||||
|
def ema_update(prev, new, alpha):
|
||||||
|
return new if prev is None else (alpha * new + (1 - alpha) * prev)
|
||||||
|
|
||||||
|
def dbfs(x):
|
||||||
|
return 20.0 * np.log10(np.maximum(x, 1e-20))
|
||||||
|
|
||||||
|
def main():
|
||||||
|
ap = argparse.ArgumentParser(description="Live Mic-Scope (ruhiges Zeitbild + Frequenzband)")
|
||||||
|
# Audio
|
||||||
|
ap.add_argument("-r", "--samplerate", type=int, default=48_000)
|
||||||
|
ap.add_argument("-c", "--channels", type=int, default=1)
|
||||||
|
ap.add_argument("-b", "--blocksize", type=int, default=1024)
|
||||||
|
ap.add_argument("--indev", type=int, default=None, help="Input-Geräteindex")
|
||||||
|
# Zeitansicht
|
||||||
|
ap.add_argument("-w", "--window", type=float, default=0.8, help="Zeitfenster (s)")
|
||||||
|
ap.add_argument("--gain", type=float, default=1.0, help="Anzeige-Gain (nur Darstellung)")
|
||||||
|
ap.add_argument("--smooth-ms", type=float, default=6.0, help="Glättung Waveform (ms)")
|
||||||
|
ap.add_argument("--level-ema", type=float, default=0.30, help="Level-EMA Zeitkonstante (s)")
|
||||||
|
ap.add_argument("--autoscale", action="store_true", help="Automatische Y-Skalierung aktivieren")
|
||||||
|
ap.add_argument("--ymax", type=float, default=1.05, help="Fixe Y-Grenze, wenn Autoscale aus")
|
||||||
|
# Spektrum
|
||||||
|
ap.add_argument("--fft-n", type=int, default=4096, help="FFT-Punkte (wird auf nächste 2er-Potenz gerundet)")
|
||||||
|
ap.add_argument("--spec-ema", type=float, default=0.30, help="Spektral-EMA Zeitkonstante (s)")
|
||||||
|
ap.add_argument("--fmin", type=float, default=20.0, help="untere Frequenzgrenze (Hz)")
|
||||||
|
ap.add_argument("--fmax", type=float, default=20000.0, help="obere Frequenzgrenze (Hz)")
|
||||||
|
ap.add_argument("--logx", action="store_true", help="logarithmische Frequenzachse")
|
||||||
|
args = ap.parse_args()
|
||||||
|
|
||||||
|
fs = args.samplerate
|
||||||
|
nwin = int(args.window * fs)
|
||||||
|
buf = np.zeros(nwin, dtype=np.float32)
|
||||||
|
lock = threading.Lock()
|
||||||
|
|
||||||
|
# --- Zustände ---
|
||||||
|
level_dbfs = None
|
||||||
|
peak_hold = 0.0
|
||||||
|
clip_pct = 0.0
|
||||||
|
|
||||||
|
# Waveform-Glättungskern (nur Anzeige)
|
||||||
|
k = max(1, int(args.smooth_ms * fs / 1000.0))
|
||||||
|
kernel = np.ones(k, dtype=np.float32) / k if k > 1 else None
|
||||||
|
|
||||||
|
# EMA-Alphas aus Zeitkonstanten
|
||||||
|
lvl_alpha = 1.0 - np.exp(-args.blocksize / (fs * max(1e-3, args.level_ema)))
|
||||||
|
spec_alpha = 1.0 - np.exp(-args.blocksize / (fs * max(1e-3, args.spec_ema)))
|
||||||
|
|
||||||
|
# Spektrum-Setup
|
||||||
|
# nächstgrößere 2er-Potenz für FFT
|
||||||
|
nfft = 1 << (int(np.ceil(np.log2(max(256, args.fft_n)))))
|
||||||
|
hann = np.hanning(nfft).astype(np.float32)
|
||||||
|
spec_db = None # für EMA
|
||||||
|
# Frequenzachse & Sichtfenster
|
||||||
|
freqs = np.fft.rfftfreq(nfft, 1.0 / fs)
|
||||||
|
fmask = (freqs >= args.fmin) & (freqs <= args.fmax)
|
||||||
|
if not np.any(fmask):
|
||||||
|
fmask[:] = True # Fallback: alles zeigen
|
||||||
|
|
||||||
|
def callback(indata, frames, time_info, status):
|
||||||
|
nonlocal buf, level_dbfs, peak_hold, clip_pct
|
||||||
|
if status:
|
||||||
|
print(status)
|
||||||
|
x = indata[:, 0].astype(np.float32, copy=True)
|
||||||
|
|
||||||
|
# Clip-Detektion
|
||||||
|
c = np.mean(np.abs(x) >= 0.999)
|
||||||
|
clip_pct = 0.9 * clip_pct + 0.1 * c
|
||||||
|
|
||||||
|
# RMS -> dBFS (vor Anzeige-Gain)
|
||||||
|
rms = np.sqrt(np.mean(np.clip(x, -1.0, 1.0) ** 2) + 1e-20)
|
||||||
|
level_dbfs = ema_update(level_dbfs, 20 * np.log10(rms), lvl_alpha)
|
||||||
|
|
||||||
|
# Ringpuffer aktualisieren
|
||||||
|
with lock:
|
||||||
|
if len(x) >= len(buf):
|
||||||
|
buf[:] = x[-len(buf):]
|
||||||
|
else:
|
||||||
|
buf[:-len(x)] = buf[len(x):]
|
||||||
|
buf[-len(x):] = x
|
||||||
|
|
||||||
|
# Peak-Hold
|
||||||
|
peak = float(np.max(np.abs(x)))
|
||||||
|
peak_hold = max(peak, peak_hold * 0.95)
|
||||||
|
|
||||||
|
# --- Plot-Setup: zwei Spalten nebeneinander ---
|
||||||
|
fig, (ax_t, ax_f) = plt.subplots(1, 2, figsize=(12, 4))
|
||||||
|
# Zeitdiagramm
|
||||||
|
t = np.linspace(-args.window, 0, nwin)
|
||||||
|
(line_t,) = ax_t.plot(t, np.zeros_like(t), lw=1)
|
||||||
|
ax_t.set_title("Zeitbereich (glättet & autoscaled)")
|
||||||
|
ax_t.set_xlabel("Zeit [s]"); ax_t.set_ylabel("Amplitude")
|
||||||
|
ax_t.set_ylim(-args.ymax, args.ymax); ax_t.grid(True, alpha=0.3)
|
||||||
|
txt = ax_t.text(0.01, 0.92, "", ha="left", va="center", transform=ax_t.transAxes)
|
||||||
|
clip_txt = ax_t.text(0.99, 0.92, "", ha="right", va="center", transform=ax_t.transAxes)
|
||||||
|
|
||||||
|
# Frequenzdiagramm
|
||||||
|
(line_f,) = ax_f.plot(freqs[fmask], np.full(np.count_nonzero(fmask), -120.0), lw=1)
|
||||||
|
ax_f.set_title("Frequenzband (dBFS)")
|
||||||
|
ax_f.set_xlabel("Frequenz [Hz]"); ax_f.set_ylabel("Pegel [dBFS]")
|
||||||
|
ax_f.grid(True, alpha=0.3)
|
||||||
|
if args.logx:
|
||||||
|
ax_f.set_xscale("log")
|
||||||
|
ax_f.set_xlim(freqs[fmask][0], freqs[fmask][-1])
|
||||||
|
ax_f.set_ylim(-120, 0)
|
||||||
|
peak_marker = ax_f.axvline(x=1000, linestyle="--", alpha=0.7)
|
||||||
|
peak_text = ax_f.text(0.98, 0.92, "", ha="right", va="center", transform=ax_f.transAxes)
|
||||||
|
|
||||||
|
# Autoscale der Zeitansicht
|
||||||
|
y_ema = None
|
||||||
|
y_alpha = 0.15
|
||||||
|
disp_ymax = args.ymax
|
||||||
|
|
||||||
|
def update(_frame):
|
||||||
|
nonlocal spec_db, y_ema, disp_ymax
|
||||||
|
with lock:
|
||||||
|
y = buf.copy()
|
||||||
|
|
||||||
|
# ---- Zeitbereich (links) ----
|
||||||
|
y_plot = y * args.gain
|
||||||
|
if kernel is not None and len(y_plot) >= len(kernel):
|
||||||
|
y_plot = np.convolve(y_plot, kernel, mode="same")
|
||||||
|
|
||||||
|
if args.autoscale:
|
||||||
|
target = float(np.percentile(np.abs(y_plot), 95)) * 1.2
|
||||||
|
target = max(target, 0.2)
|
||||||
|
y_ema = ema_update(y_ema, target, y_alpha)
|
||||||
|
disp_ymax = max(0.2, min(1.5 * args.gain, y_ema))
|
||||||
|
ax_t.set_ylim(-disp_ymax, +disp_ymax)
|
||||||
|
line_t.set_ydata(y_plot)
|
||||||
|
|
||||||
|
db = level_dbfs if level_dbfs is not None else -np.inf
|
||||||
|
txt.set_text(f"Level: {db:6.1f} dBFS Peak: {peak_hold*args.gain:0.2f} Gain: {args.gain:g}"
|
||||||
|
+ (f" Y±{disp_ymax:0.2f}" if args.autoscale else f" Y±{args.ymax:0.2f}"))
|
||||||
|
|
||||||
|
if clip_pct > 0.005:
|
||||||
|
clip_txt.set_text(f"CLIP {clip_pct*100:4.1f}%")
|
||||||
|
clip_txt.set_color("red")
|
||||||
|
else:
|
||||||
|
clip_txt.set_text("")
|
||||||
|
|
||||||
|
# ---- Frequenzband (rechts) ----
|
||||||
|
# nimm den letzten nfft-Slice aus dem Ringpuffer
|
||||||
|
xfft = y[-nfft:].astype(np.float32, copy=False)
|
||||||
|
if xfft.size < nfft:
|
||||||
|
pad = np.zeros(nfft - xfft.size, dtype=np.float32)
|
||||||
|
xfft = np.concatenate([pad, xfft])
|
||||||
|
# Hann-Fenster & FFT
|
||||||
|
xw = xfft * hann
|
||||||
|
spec = np.fft.rfft(xw, n=nfft)
|
||||||
|
mag = np.abs(spec) / (np.sum(hann) / 2.0) # grobe Amplitudenkalibrierung
|
||||||
|
spec_linear = mag
|
||||||
|
|
||||||
|
# Spektral-EMA (über Frames)
|
||||||
|
spec_db_inst = dbfs(spec_linear)
|
||||||
|
if spec_db is None:
|
||||||
|
spec_db = spec_db_inst
|
||||||
|
else:
|
||||||
|
spec_db = spec_alpha * spec_db_inst + (1 - spec_alpha) * spec_db
|
||||||
|
|
||||||
|
# Anzeige beschränken
|
||||||
|
yspec = spec_db[fmask]
|
||||||
|
line_f.set_ydata(yspec)
|
||||||
|
|
||||||
|
# Peak suchen im Sichtfenster
|
||||||
|
idx_pk = int(np.nanargmax(yspec))
|
||||||
|
f_pk = float(freqs[fmask][idx_pk])
|
||||||
|
v_pk = float(yspec[idx_pk])
|
||||||
|
peak_marker.set_xdata([f_pk, f_pk])
|
||||||
|
peak_text.set_text(f"Peak: {f_pk:0.0f} Hz / {v_pk:0.1f} dBFS")
|
||||||
|
|
||||||
|
return line_t, txt, clip_txt, line_f, peak_marker, peak_text
|
||||||
|
|
||||||
|
stream_kwargs = dict(samplerate=fs, channels=args.channels, dtype="float32", blocksize=args.blocksize)
|
||||||
|
if args.indev is not None:
|
||||||
|
stream_kwargs["device"] = (args.indev, None)
|
||||||
|
|
||||||
|
with sd.InputStream(callback=callback, **stream_kwargs):
|
||||||
|
ani = FuncAnimation(fig, update, interval=33, blit=False) # ~30 FPS
|
||||||
|
print("Live-Ansicht läuft. Fenster schließen oder Strg+C zum Beenden.")
|
||||||
|
plt.tight_layout()
|
||||||
|
plt.show()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
23
tone.py
Normal file
23
tone.py
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
import numpy as np
|
||||||
|
import sounddevice as sd
|
||||||
|
|
||||||
|
def play_tone(frequency=440.0, duration=1.0, volume=0.2, sample_rate=44_100):
|
||||||
|
"""
|
||||||
|
Spielt einen Sinuston ab.
|
||||||
|
frequency: Hz
|
||||||
|
duration: Sekunden
|
||||||
|
volume: 0.0–1.0
|
||||||
|
sample_rate: Abtastrate in Hz
|
||||||
|
"""
|
||||||
|
t = np.linspace(0, duration, int(sample_rate * duration), endpoint=False)
|
||||||
|
wave = (volume * np.sin(2 * np.pi * frequency * t)).astype(np.float32)
|
||||||
|
sd.play(wave, samplerate=sample_rate, blocking=True)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
import argparse
|
||||||
|
ap = argparse.ArgumentParser(description="Spiele einen Ton")
|
||||||
|
ap.add_argument("-f", "--frequency", type=float, default=440.0, help="Frequenz in Hz (z.B. 440)")
|
||||||
|
ap.add_argument("-t", "--time", type=float, default=1.0, help="Dauer in Sekunden (z.B. 1.5)")
|
||||||
|
ap.add_argument("-v", "--volume", type=float, default=0.2, help="Lautstärke 0.0–1.0")
|
||||||
|
args = ap.parse_args()
|
||||||
|
play_tone(args.frequency, args.time, args.volume)
|
||||||
Reference in New Issue
Block a user