440Hz Commit

This commit is contained in:
2025-10-15 11:36:25 +02:00
parent 7a87a21368
commit d3d5068de4

273
latency_440.py Normal file
View File

@@ -0,0 +1,273 @@
#!/usr/bin/env python3
import argparse
import csv
from dataclasses import dataclass
import numpy as np
import sounddevice as sd
import matplotlib
matplotlib.use("TkAgg") # GUI-Ausgabe für interaktives Fenster
import matplotlib.pyplot as plt
from matplotlib.widgets import Button
import threading
# ---------- Audio/Signal-Helfer ----------
@dataclass
class Times:
dac_first_time: float | None = None
adc_first_time: float | None = None
def generate_tone(f_hz: float, dur_s: float, fs: int, volume: float,
pre_silence: float = 0.20, post_silence: float = 0.40):
"""Stille + Sinus + Stille (mit 5ms Fade-in/out)."""
n_pre = int(pre_silence * fs)
n_tone = int(dur_s * fs)
n_post = int(post_silence * fs)
t = np.arange(n_tone) / fs
tone = np.sin(2 * np.pi * f_hz * t).astype(np.float32)
fade_n = max(1, int(0.005 * fs))
w = np.ones_like(tone)
w[:fade_n] *= np.linspace(0, 1, fade_n, endpoint=False)
w[-fade_n:] *= np.linspace(1, 0, fade_n, endpoint=False)
ref = (volume * tone * w).astype(np.float32)
out = np.concatenate([np.zeros(n_pre, dtype=np.float32), ref, np.zeros(n_post, dtype=np.float32)])
return out, ref, n_pre
def detect_onset_xcorr(signal: np.ndarray, ref: np.ndarray):
"""Normierte Kreuzkorrelation; liefert Onset-Index und Confidence."""
x = signal.astype(np.float64)
r = ref.astype(np.float64)
M, N = len(r), len(x)
if N < M + 1:
return 0, np.array([0.0]), 0.0
# einfache Vor-Whitening (Hochpass) stabilisiert
xw = np.concatenate([[x[0]], x[1:] - 0.97 * x[:-1]])
rw = np.concatenate([[r[0]], r[1:] - 0.97 * r[:-1]])
corr = np.correlate(xw, rw, mode="valid")
x2 = xw**2
cs = np.concatenate([[0.0], np.cumsum(x2)])
E_x = cs[M:] - cs[:-M]
E_r = np.sum(rw**2) + 1e-20
nrm = np.sqrt(E_x * E_r) + 1e-20
nxc = corr / nrm
k = int(np.argmax(nxc))
conf = float(nxc[k])
return k, nxc, conf
def measure_latency_once(freq_hz: float, fs: int, dur_s: float, volume: float,
indev: int | None, outdev: int | None,
pre_silence: float = 0.20, post_silence: float = 0.40):
"""Spielt einen Ton, nimmt parallel auf, schätzt Latenz in ms, gibt Confidence zurück."""
play_buf, ref, _ = generate_tone(freq_hz, dur_s, fs, volume, pre_silence, post_silence)
record_buf = []
written = 0
times = Times()
def cb(indata, outdata, frames, time_info, status):
nonlocal written, times
if status:
# Ausgabe nur informativ; Xruns etc. beeinflussen Latenz
print(status, flush=True)
if times.adc_first_time is None:
times.adc_first_time = time_info.inputBufferAdcTime
chunk = play_buf[written:written+frames]
out = np.zeros((frames,), dtype=np.float32)
if len(chunk) > 0:
out[:len(chunk)] = chunk
if times.dac_first_time is None and np.any(out != 0.0):
first_nz = int(np.argmax(out != 0.0))
times.dac_first_time = time_info.outputBufferDacTime + first_nz / fs
outdata[:] = out.reshape(-1, 1)
record_buf.append(indata.copy().reshape(-1))
written += frames
stream_kwargs = dict(samplerate=fs, dtype="float32", channels=1)
if indev is not None or outdev is not None:
stream_kwargs["device"] = (indev, outdev)
with sd.Stream(callback=cb, **stream_kwargs):
sd.sleep(int(1000 * (len(play_buf) / fs)))
sd.sleep(200)
if times.adc_first_time is None or times.dac_first_time is None or not record_buf:
return np.nan, 0.0
rec = np.concatenate(record_buf).astype(np.float32)
onset_idx, _, conf = detect_onset_xcorr(rec, ref)
adc_detect_time = times.adc_first_time + onset_idx / fs
latency_ms = (adc_detect_time - times.dac_first_time) * 1000.0
return float(latency_ms), conf
# ---------- 440-Hz-Runner & Einzel-Balken-Plot ----------
def run_440(repeats, fs, dur, vol, indev, outdev, conf_min):
f = 440.0
latencies: list[float] = []
confidences: list[float] = []
for i in range(repeats):
lat_ms, conf = measure_latency_once(f, fs, dur, vol, indev, outdev)
latencies.append(lat_ms)
confidences.append(conf)
print(f"Try {i+1}/{repeats}: f=440 Hz -> latency={lat_ms:.2f} ms conf={conf:.3f}")
bad = sum((np.isnan(v) or c < conf_min) for v, c in zip(latencies, confidences))
if bad > 0:
print(f"Warnung: {bad} Messungen mit niedriger Confidence (< {conf_min}) oder NaN.")
return latencies, confidences
def plot_single_bar(latencies: list[float]):
data = np.array(latencies, dtype=float)
mean = float(np.nanmean(data)) if data.size else np.nan
std = float(np.nanstd(data, ddof=1)) if data.size > 1 else 0.0
vmin = float(np.nanmin(data)) if data.size else 0.0
vmax = float(np.nanmax(data)) if data.size else 0.0
fig, ax = plt.subplots(figsize=(5, 6))
ax.set_title("Latenz bei 440 Hz")
# Einzelner Balken bei x=0
ax.bar([0], [mean], color="#4C78A8", width=0.6, label="Mittelwert")
# Fehlerbalken = Standardabweichung
ax.errorbar([0], [mean], yerr=[[std], [std]], fmt="none", ecolor="#333333", capsize=6, label="Std")
# Spannweite min..max als vertikale Linie
ax.vlines(0, vmin, vmax, colors="#E45756", linewidth=3, label="MinMax")
ax.set_xticks([0])
ax.set_xticklabels(["440 Hz"])
# y-Achse startet immer bei 0
ymax = max(1.0, (vmax if np.isfinite(vmax) else 0.0))
ax.set_ylim(0.0, ymax * 1.1)
ax.set_ylabel("Latenz [ms]")
ax.grid(True, axis="y", alpha=0.3)
ax.legend(loc="best")
plt.tight_layout()
plt.show()
def run_gui(fs: int, dur: float, vol: float, indev: int | None, outdev: int | None, conf_min: float):
latencies: list[float] = []
confidences: list[float] = []
fig, ax = plt.subplots(figsize=(5, 6))
ax.set_title("Latenz bei 440 Hz")
bar = ax.bar([0], [0.0], color="#4C78A8", width=0.6, label="Mittelwert")
err = ax.errorbar([0], [0.0], yerr=[[0.0], [0.0]], fmt="none", ecolor="#333333", capsize=6, label="Std")
vline = ax.vlines(0, 0.0, 0.0, colors="#E45756", linewidth=3, label="MinMax")
ax.set_xticks([0])
ax.set_xticklabels(["440 Hz"])
ax.set_ylim(0.0, 1.0)
ax.set_ylabel("Latenz [ms]")
ax.grid(True, axis="y", alpha=0.3)
ax.legend(loc="best")
txt = ax.text(0.02, 0.96, "Confidence: -", transform=ax.transAxes, ha="left", va="top")
txt_mean = ax.text(0.02, 0.90, "Mean: - ms", transform=ax.transAxes, ha="left", va="top")
plt.tight_layout(rect=[0, 0.1, 1, 1])
running = threading.Event()
latest_changed = threading.Event()
lock = threading.Lock()
latest_conf = {"value": float("nan")}
def compute_stats():
data = np.array(latencies, dtype=float)
mean = float(np.nanmean(data)) if data.size else 0.0
std = float(np.nanstd(data, ddof=1)) if data.size > 1 else 0.0
vmin = float(np.nanmin(data)) if data.size else 0.0
vmax = float(np.nanmax(data)) if data.size else 0.0
return mean, std, vmin, vmax
def remove_errorbar(container):
if container is None:
return
# container has: lines (list[Line2D] or []), caplines (list[Line2D]), barlinecols (list[LineCollection])
for artist in list(getattr(container, 'lines', []) or []):
try:
artist.remove()
except Exception:
pass
for artist in list(getattr(container, 'caplines', []) or []):
try:
artist.remove()
except Exception:
pass
for artist in list(getattr(container, 'barlinecols', []) or []):
try:
artist.remove()
except Exception:
pass
def update_plot():
with lock:
mean, std, vmin, vmax = compute_stats()
bar.patches[0].set_height(mean)
# remove previous errorbar artists and create new ones
prev = nonlocal_err[0]
remove_errorbar(prev)
err_lines = ax.errorbar([0], [mean], yerr=[[std], [std]], fmt="none", ecolor="#333333", capsize=6)
nonlocal_err[0] = err_lines
vline.set_segments([[(0, vmin), (0, vmax)]])
c = latest_conf["value"]
if np.isfinite(c):
txt.set_text(f"Confidence: {c:.3f}")
else:
txt.set_text("Confidence: -")
if np.isfinite(mean):
txt_mean.set_text(f"Mean: {mean:.2f} ms")
else:
txt_mean.set_text("Mean: - ms")
ymax = max(1.0, vmax)
ax.set_ylim(0.0, ymax * 1.1)
fig.canvas.draw_idle()
def worker():
f = 440.0
while running.is_set():
lat_ms, conf = measure_latency_once(f, fs, dur, vol, indev, outdev)
with lock:
latencies.append(lat_ms)
confidences.append(conf)
latest_conf["value"] = conf
latest_changed.set()
def on_start(event):
if running.is_set():
return
running.set()
if not hasattr(on_start, "thr") or not on_start.thr.is_alive():
on_start.thr = threading.Thread(target=worker, daemon=True)
on_start.thr.start()
def on_stop(event):
running.clear()
start_ax = fig.add_axes([0.58, 0.02, 0.15, 0.06])
stop_ax = fig.add_axes([0.77, 0.02, 0.15, 0.06])
btn_start = Button(start_ax, "Start")
btn_stop = Button(stop_ax, "Stop")
btn_start.on_clicked(on_start)
btn_stop.on_clicked(on_stop)
nonlocal_err = [err]
timer = fig.canvas.new_timer(interval=200)
def on_timer():
if latest_changed.is_set():
latest_changed.clear()
update_plot()
timer.add_callback(on_timer)
timer.start()
plt.show()
def main():
ap = argparse.ArgumentParser(description="Akustische Latenzmessung nur für 440 Hz")
ap.add_argument("--repeats", type=int, default=5, help="Wiederholungen")
ap.add_argument("-r", "--samplerate", type=int, default=48_000, help="Samplerate")
ap.add_argument("-t", "--time", type=float, default=0.15, help="Tondauer (s)")
ap.add_argument("-v", "--volume", type=float, default=0.6, help="Lautstärke 0..1")
ap.add_argument("--indev", type=int, default=None, help="Input-Geräteindex")
ap.add_argument("--outdev", type=int, default=None, help="Output-Geräteindex")
ap.add_argument("--conf-min", type=float, default=0.3, help="Warnschwelle für Confidence")
args = ap.parse_args()
run_gui(args.samplerate, args.time, args.volume, args.indev, args.outdev, args.conf_min)
if __name__ == "__main__":
main()