182 lines
7.8 KiB
Python
182 lines
7.8 KiB
Python
#!/usr/bin/env python3
|
||
import argparse
|
||
import csv
|
||
from dataclasses import dataclass
|
||
import numpy as np
|
||
import sounddevice as sd
|
||
import matplotlib
|
||
matplotlib.use("Agg") # Dateiausgabe ohne GUI; mit --show wird umgestellt
|
||
import matplotlib.pyplot as plt
|
||
|
||
# ---------- Audio/Signal-Helfer ----------
|
||
@dataclass
|
||
class Times:
|
||
dac_first_time: float | None = None
|
||
adc_first_time: float | None = None
|
||
|
||
def generate_tone(f_hz: float, dur_s: float, fs: int, volume: float,
|
||
pre_silence: float = 0.20, post_silence: float = 0.40):
|
||
"""Stille + Sinus + Stille (mit 5ms Fade-in/out)."""
|
||
n_pre = int(pre_silence * fs)
|
||
n_tone = int(dur_s * fs)
|
||
n_post = int(post_silence * fs)
|
||
t = np.arange(n_tone) / fs
|
||
tone = np.sin(2 * np.pi * f_hz * t).astype(np.float32)
|
||
fade_n = max(1, int(0.005 * fs))
|
||
w = np.ones_like(tone)
|
||
w[:fade_n] *= np.linspace(0, 1, fade_n, endpoint=False)
|
||
w[-fade_n:] *= np.linspace(1, 0, fade_n, endpoint=False)
|
||
ref = (volume * tone * w).astype(np.float32)
|
||
out = np.concatenate([np.zeros(n_pre, dtype=np.float32), ref, np.zeros(n_post, dtype=np.float32)])
|
||
return out, ref, n_pre
|
||
|
||
def detect_onset_xcorr(signal: np.ndarray, ref: np.ndarray):
|
||
"""Normierte Kreuzkorrelation; liefert Onset-Index und Confidence."""
|
||
x = signal.astype(np.float64)
|
||
r = ref.astype(np.float64)
|
||
M, N = len(r), len(x)
|
||
if N < M + 1:
|
||
return 0, np.array([0.0]), 0.0
|
||
# einfache Vor-Whitening (Hochpass) stabilisiert
|
||
xw = np.concatenate([[x[0]], x[1:] - 0.97 * x[:-1]])
|
||
rw = np.concatenate([[r[0]], r[1:] - 0.97 * r[:-1]])
|
||
corr = np.correlate(xw, rw, mode="valid")
|
||
x2 = xw**2
|
||
cs = np.concatenate([[0.0], np.cumsum(x2)])
|
||
E_x = cs[M:] - cs[:-M]
|
||
E_r = np.sum(rw**2) + 1e-20
|
||
nrm = np.sqrt(E_x * E_r) + 1e-20
|
||
nxc = corr / nrm
|
||
k = int(np.argmax(nxc))
|
||
conf = float(nxc[k])
|
||
return k, nxc, conf
|
||
|
||
def measure_latency_once(freq_hz: float, fs: int, dur_s: float, volume: float,
|
||
indev: int | None, outdev: int | None,
|
||
pre_silence: float = 0.20, post_silence: float = 0.40):
|
||
"""Spielt einen Ton, nimmt parallel auf, schätzt Latenz in ms, gibt Confidence zurück."""
|
||
play_buf, ref, _ = generate_tone(freq_hz, dur_s, fs, volume, pre_silence, post_silence)
|
||
record_buf = []
|
||
written = 0
|
||
times = Times()
|
||
|
||
def cb(indata, outdata, frames, time_info, status):
|
||
nonlocal written, times
|
||
if status:
|
||
# Ausgabe nur informativ; Xruns etc. beeinflussen Latenz
|
||
print(status, flush=True)
|
||
if times.adc_first_time is None:
|
||
times.adc_first_time = time_info.inputBufferAdcTime
|
||
|
||
chunk = play_buf[written:written+frames]
|
||
out = np.zeros((frames,), dtype=np.float32)
|
||
if len(chunk) > 0:
|
||
out[:len(chunk)] = chunk
|
||
|
||
if times.dac_first_time is None and np.any(out != 0.0):
|
||
first_nz = int(np.argmax(out != 0.0))
|
||
times.dac_first_time = time_info.outputBufferDacTime + first_nz / fs
|
||
|
||
outdata[:] = out.reshape(-1, 1)
|
||
record_buf.append(indata.copy().reshape(-1))
|
||
written += frames
|
||
|
||
stream_kwargs = dict(samplerate=fs, dtype="float32", channels=1)
|
||
if indev is not None or outdev is not None:
|
||
stream_kwargs["device"] = (indev, outdev)
|
||
|
||
with sd.Stream(callback=cb, **stream_kwargs):
|
||
sd.sleep(int(1000 * (len(play_buf) / fs)))
|
||
sd.sleep(200)
|
||
|
||
if times.adc_first_time is None or times.dac_first_time is None or not record_buf:
|
||
return np.nan, 0.0
|
||
|
||
rec = np.concatenate(record_buf).astype(np.float32)
|
||
onset_idx, _, conf = detect_onset_xcorr(rec, ref)
|
||
adc_detect_time = times.adc_first_time + onset_idx / fs
|
||
latency_ms = (adc_detect_time - times.dac_first_time) * 1000.0
|
||
return float(latency_ms), conf
|
||
|
||
# ---------- Sweep-Runner & Plot ----------
|
||
def run_sweep(freqs, repeats, fs, dur, vol, indev, outdev, conf_min):
|
||
results = {float(f): [] for f in freqs}
|
||
confs = {float(f): [] for f in freqs}
|
||
for f in freqs:
|
||
for _ in range(repeats):
|
||
lat_ms, conf = measure_latency_once(f, fs, dur, vol, indev, outdev)
|
||
results[float(f)].append(lat_ms)
|
||
confs[float(f)].append(conf)
|
||
print(f"f={f:.1f} Hz -> latency={lat_ms:.2f} ms conf={conf:.3f}")
|
||
# Markiere zu schwache Messungen (Confidence)
|
||
bad = sum((np.isnan(v) or c < conf_min) for arr, carr in zip(results.values(), confs.values()) for v, c in zip(arr, carr))
|
||
if bad > 0:
|
||
print(f"Warnung: {bad} Messungen mit niedriger Confidence (< {conf_min}) oder NaN.")
|
||
return results, confs
|
||
|
||
def save_csv(path, results, confs):
|
||
with open(path, "w", newline="") as f:
|
||
w = csv.writer(f)
|
||
w.writerow(["frequency_hz", "trial_index", "latency_ms", "confidence"])
|
||
for f_hz, arr in results.items():
|
||
for i, lat in enumerate(arr):
|
||
w.writerow([f_hz, i, lat, confs[f_hz][i]])
|
||
|
||
def plot_boxplot(path_png, results, show=False):
|
||
freqs = sorted(results.keys())
|
||
data = [results[f] for f in freqs]
|
||
fig = plt.figure(figsize=(10, 5))
|
||
plt.title("Akustische Latenz über Frequenz (Boxplot je 5 Wiederholungen)")
|
||
plt.boxplot(data, positions=range(len(freqs)), manage_ticks=False)
|
||
# Mittelwerte als Linie
|
||
means = [np.nanmean(arr) if len(arr) else np.nan for arr in data]
|
||
plt.plot(range(len(freqs)), means, marker="o", linestyle="--", label="Mittelwert")
|
||
plt.xticks(range(len(freqs)), [f"{int(f)}" for f in freqs])
|
||
plt.xlabel("Frequenz [Hz]")
|
||
plt.ylabel("Latenz [ms]")
|
||
plt.grid(True, axis="y", alpha=0.3)
|
||
plt.legend(loc="best")
|
||
plt.tight_layout()
|
||
plt.savefig(path_png, dpi=150)
|
||
print(f"Plot gespeichert: {path_png}")
|
||
if show:
|
||
plt.switch_backend("TkAgg")
|
||
plt.show()
|
||
plt.close(fig)
|
||
|
||
def parse_freqs(arg: str | None, default_n: int = 10, fmin: float = 300.0, fmax: float = 4800.0):
|
||
"""Erzeuge logarithmisch verteilte Defaults oder parse Kommata-Liste."""
|
||
if arg:
|
||
return np.array([float(x) for x in arg.split(",") if x.strip() != ""])
|
||
# 10 log-spaced Frequenzen (Laptop-Lautsprecher-freundlich)
|
||
return np.round(np.geomspace(fmin, fmax, default_n)).astype(float)
|
||
|
||
def main():
|
||
ap = argparse.ArgumentParser(description="Mehrfachmessung akustischer Latenz über Frequenzpunkte")
|
||
ap.add_argument("--freqs", type=str, default=None,
|
||
help="Kommagetrennt (z.B. 300,400,600,1000) – sonst 10 log-spaced 300..4800 Hz")
|
||
ap.add_argument("--repeats", type=int, default=5, help="Wiederholungen je Frequenz")
|
||
ap.add_argument("-r", "--samplerate", type=int, default=48_000, help="Samplerate")
|
||
ap.add_argument("-t", "--time", type=float, default=0.15, help="Tondauer (s)")
|
||
ap.add_argument("-v", "--volume", type=float, default=0.6, help="Lautstärke 0..1")
|
||
ap.add_argument("--indev", type=int, default=None, help="Input-Geräteindex")
|
||
ap.add_argument("--outdev", type=int, default=None, help="Output-Geräteindex")
|
||
ap.add_argument("--csv", type=str, default="latency_results.csv", help="CSV-Dateiname")
|
||
ap.add_argument("--png", type=str, default="latency_boxplot.png", help="PNG-Dateiname")
|
||
ap.add_argument("--show", action="store_true", help="Plot anzeigen (GUI notwendig)")
|
||
ap.add_argument("--conf-min", type=float, default=0.3, help="Warnschwelle für Confidence")
|
||
args = ap.parse_args()
|
||
|
||
freqs = parse_freqs(args.freqs)
|
||
print("Frequenzen:", ", ".join(f"{int(f)}" for f in freqs))
|
||
results, confs = run_sweep(freqs, args.repeats, args.samplerate, args.time,
|
||
args.volume, args.indev, args.outdev, args.conf_min)
|
||
save_csv(args.csv, results, confs)
|
||
plot_boxplot(args.png, results, args.show)
|
||
# Kurze Zusammenfassung
|
||
means = {int(f): float(np.nanmean(v)) for f, v in results.items()}
|
||
print("Mittelwerte (ms):", means)
|
||
|
||
if __name__ == "__main__":
|
||
main()
|