Files
latency_test_suit/latency_sweep.py
2025-10-15 10:38:06 +02:00

182 lines
7.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
import argparse
import csv
from dataclasses import dataclass
import numpy as np
import sounddevice as sd
import matplotlib
matplotlib.use("Agg") # Dateiausgabe ohne GUI; mit --show wird umgestellt
import matplotlib.pyplot as plt
# ---------- Audio/Signal-Helfer ----------
@dataclass
class Times:
dac_first_time: float | None = None
adc_first_time: float | None = None
def generate_tone(f_hz: float, dur_s: float, fs: int, volume: float,
pre_silence: float = 0.20, post_silence: float = 0.40):
"""Stille + Sinus + Stille (mit 5ms Fade-in/out)."""
n_pre = int(pre_silence * fs)
n_tone = int(dur_s * fs)
n_post = int(post_silence * fs)
t = np.arange(n_tone) / fs
tone = np.sin(2 * np.pi * f_hz * t).astype(np.float32)
fade_n = max(1, int(0.005 * fs))
w = np.ones_like(tone)
w[:fade_n] *= np.linspace(0, 1, fade_n, endpoint=False)
w[-fade_n:] *= np.linspace(1, 0, fade_n, endpoint=False)
ref = (volume * tone * w).astype(np.float32)
out = np.concatenate([np.zeros(n_pre, dtype=np.float32), ref, np.zeros(n_post, dtype=np.float32)])
return out, ref, n_pre
def detect_onset_xcorr(signal: np.ndarray, ref: np.ndarray):
"""Normierte Kreuzkorrelation; liefert Onset-Index und Confidence."""
x = signal.astype(np.float64)
r = ref.astype(np.float64)
M, N = len(r), len(x)
if N < M + 1:
return 0, np.array([0.0]), 0.0
# einfache Vor-Whitening (Hochpass) stabilisiert
xw = np.concatenate([[x[0]], x[1:] - 0.97 * x[:-1]])
rw = np.concatenate([[r[0]], r[1:] - 0.97 * r[:-1]])
corr = np.correlate(xw, rw, mode="valid")
x2 = xw**2
cs = np.concatenate([[0.0], np.cumsum(x2)])
E_x = cs[M:] - cs[:-M]
E_r = np.sum(rw**2) + 1e-20
nrm = np.sqrt(E_x * E_r) + 1e-20
nxc = corr / nrm
k = int(np.argmax(nxc))
conf = float(nxc[k])
return k, nxc, conf
def measure_latency_once(freq_hz: float, fs: int, dur_s: float, volume: float,
indev: int | None, outdev: int | None,
pre_silence: float = 0.20, post_silence: float = 0.40):
"""Spielt einen Ton, nimmt parallel auf, schätzt Latenz in ms, gibt Confidence zurück."""
play_buf, ref, _ = generate_tone(freq_hz, dur_s, fs, volume, pre_silence, post_silence)
record_buf = []
written = 0
times = Times()
def cb(indata, outdata, frames, time_info, status):
nonlocal written, times
if status:
# Ausgabe nur informativ; Xruns etc. beeinflussen Latenz
print(status, flush=True)
if times.adc_first_time is None:
times.adc_first_time = time_info.inputBufferAdcTime
chunk = play_buf[written:written+frames]
out = np.zeros((frames,), dtype=np.float32)
if len(chunk) > 0:
out[:len(chunk)] = chunk
if times.dac_first_time is None and np.any(out != 0.0):
first_nz = int(np.argmax(out != 0.0))
times.dac_first_time = time_info.outputBufferDacTime + first_nz / fs
outdata[:] = out.reshape(-1, 1)
record_buf.append(indata.copy().reshape(-1))
written += frames
stream_kwargs = dict(samplerate=fs, dtype="float32", channels=1)
if indev is not None or outdev is not None:
stream_kwargs["device"] = (indev, outdev)
with sd.Stream(callback=cb, **stream_kwargs):
sd.sleep(int(1000 * (len(play_buf) / fs)))
sd.sleep(200)
if times.adc_first_time is None or times.dac_first_time is None or not record_buf:
return np.nan, 0.0
rec = np.concatenate(record_buf).astype(np.float32)
onset_idx, _, conf = detect_onset_xcorr(rec, ref)
adc_detect_time = times.adc_first_time + onset_idx / fs
latency_ms = (adc_detect_time - times.dac_first_time) * 1000.0
return float(latency_ms), conf
# ---------- Sweep-Runner & Plot ----------
def run_sweep(freqs, repeats, fs, dur, vol, indev, outdev, conf_min):
results = {float(f): [] for f in freqs}
confs = {float(f): [] for f in freqs}
for f in freqs:
for _ in range(repeats):
lat_ms, conf = measure_latency_once(f, fs, dur, vol, indev, outdev)
results[float(f)].append(lat_ms)
confs[float(f)].append(conf)
print(f"f={f:.1f} Hz -> latency={lat_ms:.2f} ms conf={conf:.3f}")
# Markiere zu schwache Messungen (Confidence)
bad = sum((np.isnan(v) or c < conf_min) for arr, carr in zip(results.values(), confs.values()) for v, c in zip(arr, carr))
if bad > 0:
print(f"Warnung: {bad} Messungen mit niedriger Confidence (< {conf_min}) oder NaN.")
return results, confs
def save_csv(path, results, confs):
with open(path, "w", newline="") as f:
w = csv.writer(f)
w.writerow(["frequency_hz", "trial_index", "latency_ms", "confidence"])
for f_hz, arr in results.items():
for i, lat in enumerate(arr):
w.writerow([f_hz, i, lat, confs[f_hz][i]])
def plot_boxplot(path_png, results, show=False):
freqs = sorted(results.keys())
data = [results[f] for f in freqs]
fig = plt.figure(figsize=(10, 5))
plt.title("Akustische Latenz über Frequenz (Boxplot je 5 Wiederholungen)")
plt.boxplot(data, positions=range(len(freqs)), manage_ticks=False)
# Mittelwerte als Linie
means = [np.nanmean(arr) if len(arr) else np.nan for arr in data]
plt.plot(range(len(freqs)), means, marker="o", linestyle="--", label="Mittelwert")
plt.xticks(range(len(freqs)), [f"{int(f)}" for f in freqs])
plt.xlabel("Frequenz [Hz]")
plt.ylabel("Latenz [ms]")
plt.grid(True, axis="y", alpha=0.3)
plt.legend(loc="best")
plt.tight_layout()
plt.savefig(path_png, dpi=150)
print(f"Plot gespeichert: {path_png}")
if show:
plt.switch_backend("TkAgg")
plt.show()
plt.close(fig)
def parse_freqs(arg: str | None, default_n: int = 10, fmin: float = 300.0, fmax: float = 4800.0):
"""Erzeuge logarithmisch verteilte Defaults oder parse Kommata-Liste."""
if arg:
return np.array([float(x) for x in arg.split(",") if x.strip() != ""])
# 10 log-spaced Frequenzen (Laptop-Lautsprecher-freundlich)
return np.round(np.geomspace(fmin, fmax, default_n)).astype(float)
def main():
ap = argparse.ArgumentParser(description="Mehrfachmessung akustischer Latenz über Frequenzpunkte")
ap.add_argument("--freqs", type=str, default=None,
help="Kommagetrennt (z.B. 300,400,600,1000) sonst 10 log-spaced 300..4800 Hz")
ap.add_argument("--repeats", type=int, default=5, help="Wiederholungen je Frequenz")
ap.add_argument("-r", "--samplerate", type=int, default=48_000, help="Samplerate")
ap.add_argument("-t", "--time", type=float, default=0.15, help="Tondauer (s)")
ap.add_argument("-v", "--volume", type=float, default=0.6, help="Lautstärke 0..1")
ap.add_argument("--indev", type=int, default=None, help="Input-Geräteindex")
ap.add_argument("--outdev", type=int, default=None, help="Output-Geräteindex")
ap.add_argument("--csv", type=str, default="latency_results.csv", help="CSV-Dateiname")
ap.add_argument("--png", type=str, default="latency_boxplot.png", help="PNG-Dateiname")
ap.add_argument("--show", action="store_true", help="Plot anzeigen (GUI notwendig)")
ap.add_argument("--conf-min", type=float, default=0.3, help="Warnschwelle für Confidence")
args = ap.parse_args()
freqs = parse_freqs(args.freqs)
print("Frequenzen:", ", ".join(f"{int(f)}" for f in freqs))
results, confs = run_sweep(freqs, args.repeats, args.samplerate, args.time,
args.volume, args.indev, args.outdev, args.conf_min)
save_csv(args.csv, results, confs)
plot_boxplot(args.png, results, args.show)
# Kurze Zusammenfassung
means = {int(f): float(np.nanmean(v)) for f, v in results.items()}
print("Mittelwerte (ms):", means)
if __name__ == "__main__":
main()