diff --git a/latency_440.py b/latency_440.py new file mode 100644 index 0000000..0f6e12e --- /dev/null +++ b/latency_440.py @@ -0,0 +1,273 @@ +#!/usr/bin/env python3 +import argparse +import csv +from dataclasses import dataclass +import numpy as np +import sounddevice as sd +import matplotlib +matplotlib.use("TkAgg") # GUI-Ausgabe für interaktives Fenster +import matplotlib.pyplot as plt +from matplotlib.widgets import Button +import threading + +# ---------- Audio/Signal-Helfer ---------- +@dataclass +class Times: + dac_first_time: float | None = None + adc_first_time: float | None = None + +def generate_tone(f_hz: float, dur_s: float, fs: int, volume: float, + pre_silence: float = 0.20, post_silence: float = 0.40): + """Stille + Sinus + Stille (mit 5ms Fade-in/out).""" + n_pre = int(pre_silence * fs) + n_tone = int(dur_s * fs) + n_post = int(post_silence * fs) + t = np.arange(n_tone) / fs + tone = np.sin(2 * np.pi * f_hz * t).astype(np.float32) + fade_n = max(1, int(0.005 * fs)) + w = np.ones_like(tone) + w[:fade_n] *= np.linspace(0, 1, fade_n, endpoint=False) + w[-fade_n:] *= np.linspace(1, 0, fade_n, endpoint=False) + ref = (volume * tone * w).astype(np.float32) + out = np.concatenate([np.zeros(n_pre, dtype=np.float32), ref, np.zeros(n_post, dtype=np.float32)]) + return out, ref, n_pre + +def detect_onset_xcorr(signal: np.ndarray, ref: np.ndarray): + """Normierte Kreuzkorrelation; liefert Onset-Index und Confidence.""" + x = signal.astype(np.float64) + r = ref.astype(np.float64) + M, N = len(r), len(x) + if N < M + 1: + return 0, np.array([0.0]), 0.0 + # einfache Vor-Whitening (Hochpass) stabilisiert + xw = np.concatenate([[x[0]], x[1:] - 0.97 * x[:-1]]) + rw = np.concatenate([[r[0]], r[1:] - 0.97 * r[:-1]]) + corr = np.correlate(xw, rw, mode="valid") + x2 = xw**2 + cs = np.concatenate([[0.0], np.cumsum(x2)]) + E_x = cs[M:] - cs[:-M] + E_r = np.sum(rw**2) + 1e-20 + nrm = np.sqrt(E_x * E_r) + 1e-20 + nxc = corr / nrm + k = int(np.argmax(nxc)) + conf = float(nxc[k]) + return k, nxc, conf + +def measure_latency_once(freq_hz: float, fs: int, dur_s: float, volume: float, + indev: int | None, outdev: int | None, + pre_silence: float = 0.20, post_silence: float = 0.40): + """Spielt einen Ton, nimmt parallel auf, schätzt Latenz in ms, gibt Confidence zurück.""" + play_buf, ref, _ = generate_tone(freq_hz, dur_s, fs, volume, pre_silence, post_silence) + record_buf = [] + written = 0 + times = Times() + + def cb(indata, outdata, frames, time_info, status): + nonlocal written, times + if status: + # Ausgabe nur informativ; Xruns etc. beeinflussen Latenz + print(status, flush=True) + if times.adc_first_time is None: + times.adc_first_time = time_info.inputBufferAdcTime + + chunk = play_buf[written:written+frames] + out = np.zeros((frames,), dtype=np.float32) + if len(chunk) > 0: + out[:len(chunk)] = chunk + + if times.dac_first_time is None and np.any(out != 0.0): + first_nz = int(np.argmax(out != 0.0)) + times.dac_first_time = time_info.outputBufferDacTime + first_nz / fs + + outdata[:] = out.reshape(-1, 1) + record_buf.append(indata.copy().reshape(-1)) + written += frames + + stream_kwargs = dict(samplerate=fs, dtype="float32", channels=1) + if indev is not None or outdev is not None: + stream_kwargs["device"] = (indev, outdev) + + with sd.Stream(callback=cb, **stream_kwargs): + sd.sleep(int(1000 * (len(play_buf) / fs))) + sd.sleep(200) + + if times.adc_first_time is None or times.dac_first_time is None or not record_buf: + return np.nan, 0.0 + + rec = np.concatenate(record_buf).astype(np.float32) + onset_idx, _, conf = detect_onset_xcorr(rec, ref) + adc_detect_time = times.adc_first_time + onset_idx / fs + latency_ms = (adc_detect_time - times.dac_first_time) * 1000.0 + return float(latency_ms), conf + +# ---------- 440-Hz-Runner & Einzel-Balken-Plot ---------- +def run_440(repeats, fs, dur, vol, indev, outdev, conf_min): + f = 440.0 + latencies: list[float] = [] + confidences: list[float] = [] + for i in range(repeats): + lat_ms, conf = measure_latency_once(f, fs, dur, vol, indev, outdev) + latencies.append(lat_ms) + confidences.append(conf) + print(f"Try {i+1}/{repeats}: f=440 Hz -> latency={lat_ms:.2f} ms conf={conf:.3f}") + bad = sum((np.isnan(v) or c < conf_min) for v, c in zip(latencies, confidences)) + if bad > 0: + print(f"Warnung: {bad} Messungen mit niedriger Confidence (< {conf_min}) oder NaN.") + return latencies, confidences + +def plot_single_bar(latencies: list[float]): + data = np.array(latencies, dtype=float) + mean = float(np.nanmean(data)) if data.size else np.nan + std = float(np.nanstd(data, ddof=1)) if data.size > 1 else 0.0 + vmin = float(np.nanmin(data)) if data.size else 0.0 + vmax = float(np.nanmax(data)) if data.size else 0.0 + + fig, ax = plt.subplots(figsize=(5, 6)) + ax.set_title("Latenz bei 440 Hz") + # Einzelner Balken bei x=0 + ax.bar([0], [mean], color="#4C78A8", width=0.6, label="Mittelwert") + # Fehlerbalken = Standardabweichung + ax.errorbar([0], [mean], yerr=[[std], [std]], fmt="none", ecolor="#333333", capsize=6, label="Std") + # Spannweite min..max als vertikale Linie + ax.vlines(0, vmin, vmax, colors="#E45756", linewidth=3, label="Min–Max") + ax.set_xticks([0]) + ax.set_xticklabels(["440 Hz"]) + # y-Achse startet immer bei 0 + ymax = max(1.0, (vmax if np.isfinite(vmax) else 0.0)) + ax.set_ylim(0.0, ymax * 1.1) + ax.set_ylabel("Latenz [ms]") + ax.grid(True, axis="y", alpha=0.3) + ax.legend(loc="best") + plt.tight_layout() + plt.show() + +def run_gui(fs: int, dur: float, vol: float, indev: int | None, outdev: int | None, conf_min: float): + latencies: list[float] = [] + confidences: list[float] = [] + + fig, ax = plt.subplots(figsize=(5, 6)) + ax.set_title("Latenz bei 440 Hz") + bar = ax.bar([0], [0.0], color="#4C78A8", width=0.6, label="Mittelwert") + err = ax.errorbar([0], [0.0], yerr=[[0.0], [0.0]], fmt="none", ecolor="#333333", capsize=6, label="Std") + vline = ax.vlines(0, 0.0, 0.0, colors="#E45756", linewidth=3, label="Min–Max") + ax.set_xticks([0]) + ax.set_xticklabels(["440 Hz"]) + ax.set_ylim(0.0, 1.0) + ax.set_ylabel("Latenz [ms]") + ax.grid(True, axis="y", alpha=0.3) + ax.legend(loc="best") + txt = ax.text(0.02, 0.96, "Confidence: -", transform=ax.transAxes, ha="left", va="top") + txt_mean = ax.text(0.02, 0.90, "Mean: - ms", transform=ax.transAxes, ha="left", va="top") + plt.tight_layout(rect=[0, 0.1, 1, 1]) + + running = threading.Event() + latest_changed = threading.Event() + lock = threading.Lock() + latest_conf = {"value": float("nan")} + + def compute_stats(): + data = np.array(latencies, dtype=float) + mean = float(np.nanmean(data)) if data.size else 0.0 + std = float(np.nanstd(data, ddof=1)) if data.size > 1 else 0.0 + vmin = float(np.nanmin(data)) if data.size else 0.0 + vmax = float(np.nanmax(data)) if data.size else 0.0 + return mean, std, vmin, vmax + + def remove_errorbar(container): + if container is None: + return + # container has: lines (list[Line2D] or []), caplines (list[Line2D]), barlinecols (list[LineCollection]) + for artist in list(getattr(container, 'lines', []) or []): + try: + artist.remove() + except Exception: + pass + for artist in list(getattr(container, 'caplines', []) or []): + try: + artist.remove() + except Exception: + pass + for artist in list(getattr(container, 'barlinecols', []) or []): + try: + artist.remove() + except Exception: + pass + + def update_plot(): + with lock: + mean, std, vmin, vmax = compute_stats() + bar.patches[0].set_height(mean) + # remove previous errorbar artists and create new ones + prev = nonlocal_err[0] + remove_errorbar(prev) + err_lines = ax.errorbar([0], [mean], yerr=[[std], [std]], fmt="none", ecolor="#333333", capsize=6) + nonlocal_err[0] = err_lines + vline.set_segments([[(0, vmin), (0, vmax)]]) + c = latest_conf["value"] + if np.isfinite(c): + txt.set_text(f"Confidence: {c:.3f}") + else: + txt.set_text("Confidence: -") + if np.isfinite(mean): + txt_mean.set_text(f"Mean: {mean:.2f} ms") + else: + txt_mean.set_text("Mean: - ms") + ymax = max(1.0, vmax) + ax.set_ylim(0.0, ymax * 1.1) + fig.canvas.draw_idle() + + def worker(): + f = 440.0 + while running.is_set(): + lat_ms, conf = measure_latency_once(f, fs, dur, vol, indev, outdev) + with lock: + latencies.append(lat_ms) + confidences.append(conf) + latest_conf["value"] = conf + latest_changed.set() + + def on_start(event): + if running.is_set(): + return + running.set() + if not hasattr(on_start, "thr") or not on_start.thr.is_alive(): + on_start.thr = threading.Thread(target=worker, daemon=True) + on_start.thr.start() + + def on_stop(event): + running.clear() + + start_ax = fig.add_axes([0.58, 0.02, 0.15, 0.06]) + stop_ax = fig.add_axes([0.77, 0.02, 0.15, 0.06]) + btn_start = Button(start_ax, "Start") + btn_stop = Button(stop_ax, "Stop") + btn_start.on_clicked(on_start) + btn_stop.on_clicked(on_stop) + + nonlocal_err = [err] + + timer = fig.canvas.new_timer(interval=200) + def on_timer(): + if latest_changed.is_set(): + latest_changed.clear() + update_plot() + timer.add_callback(on_timer) + timer.start() + + plt.show() + +def main(): + ap = argparse.ArgumentParser(description="Akustische Latenzmessung nur für 440 Hz") + ap.add_argument("--repeats", type=int, default=5, help="Wiederholungen") + ap.add_argument("-r", "--samplerate", type=int, default=48_000, help="Samplerate") + ap.add_argument("-t", "--time", type=float, default=0.15, help="Tondauer (s)") + ap.add_argument("-v", "--volume", type=float, default=0.6, help="Lautstärke 0..1") + ap.add_argument("--indev", type=int, default=None, help="Input-Geräteindex") + ap.add_argument("--outdev", type=int, default=None, help="Output-Geräteindex") + ap.add_argument("--conf-min", type=float, default=0.3, help="Warnschwelle für Confidence") + args = ap.parse_args() + + run_gui(args.samplerate, args.time, args.volume, args.indev, args.outdev, args.conf_min) + +if __name__ == "__main__": + main()