Files
latency_test_suit/latency_440.py

805 lines
36 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
import argparse
import csv
from dataclasses import dataclass
import numpy as np
import sounddevice as sd
import matplotlib
matplotlib.use("TkAgg") # GUI-Ausgabe für interaktives Fenster
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
from matplotlib.widgets import Button, Slider, CheckButtons
import matplotlib.patches as mpatches
import threading
import time
from datetime import datetime
# ---------- Audio/Signal-Helfer ----------
@dataclass
class Times:
    """PortAudio host-clock timestamps captured inside the stream callback.

    Used by the "timeinfo" latency estimator to compare when the first
    non-silent sample left the DAC vs. when input capture began.
    """
    # Host time of the first non-zero output sample (outputBufferDacTime
    # plus the offset of the first non-zero sample within that buffer).
    dac_first_time: float | None = None
    # Host time of the first captured input buffer (inputBufferAdcTime).
    adc_first_time: float | None = None
def generate_tone(f_hz: float, dur_s: float, fs: int, volume: float,
                  pre_silence: float = 0.20, post_silence: float = 0.40):
    """Build the playout buffer: silence + faded sine burst + silence.

    Parameters
    ----------
    f_hz : tone frequency in Hz.
    dur_s : tone duration in seconds.
    fs : sample rate in Hz.
    volume : linear amplitude scale applied to the sine (0..1).
    pre_silence, post_silence : leading/trailing silence in seconds.

    Returns
    -------
    (out, ref, n_pre):
        out   -- full float32 playout buffer (pre-silence + tone + post-silence)
        ref   -- the faded, scaled tone alone (cross-correlation template)
        n_pre -- number of leading silence samples
    """
    n_pre = int(pre_silence * fs)
    n_tone = int(dur_s * fs)
    n_post = int(post_silence * fs)
    t = np.arange(n_tone) / fs
    tone = np.sin(2 * np.pi * f_hz * t).astype(np.float32)
    # 5 ms raised edges avoid clicks. Clamp the fade length so that fade-in
    # and fade-out never overlap for very short bursts; previously a tone
    # shorter than the fade (e.g. dur_s=0) raised a broadcast error.
    fade_n = min(max(1, int(0.005 * fs)), max(1, n_tone // 2))
    w = np.ones_like(tone)
    if n_tone > 0:
        w[:fade_n] *= np.linspace(0, 1, fade_n, endpoint=False)
        w[-fade_n:] *= np.linspace(1, 0, fade_n, endpoint=False)
    ref = (volume * tone * w).astype(np.float32)
    out = np.concatenate([np.zeros(n_pre, dtype=np.float32), ref,
                          np.zeros(n_post, dtype=np.float32)])
    return out, ref, n_pre
def detect_onset_xcorr(signal: np.ndarray, ref: np.ndarray, pre_len: int | None = None):
"""Normierte Kreuzkorrelation; liefert Onset-Index und Confidence."""
x = signal.astype(np.float64)
r = ref.astype(np.float64)
M, N = len(r), len(x)
if N < M + 1:
return 0, np.array([0.0]), 0.0
# einfache Vor-Whitening (Hochpass) stabilisiert
xw = np.concatenate([[x[0]], x[1:] - 0.97 * x[:-1]])
rw = np.concatenate([[r[0]], r[1:] - 0.97 * r[:-1]])
corr = np.correlate(xw, rw, mode="valid")
x2 = xw**2
cs = np.concatenate([[0.0], np.cumsum(x2)])
E_x = cs[M:] - cs[:-M]
E_r = np.sum(rw**2) + 1e-20
nrm = np.sqrt(E_x * E_r) + 1e-20
nxc = corr / nrm
k = int(np.argmax(nxc))
peak = float(nxc[k])
# Robust confidence: compare peak to pre-silence baseline distribution
if pre_len is None or pre_len <= M:
base_end = max(1, int(len(nxc) * 0.2))
else:
base_end = max(1, min(len(nxc), int(pre_len - M + 1)))
base = nxc[:base_end]
if base.size <= 1:
conf = peak
else:
med = float(np.median(base))
mad = float(np.median(np.abs(base - med)))
scale = 1.4826 * mad + 1e-6
z = (peak - med) / scale
conf = float(1.0 / (1.0 + np.exp(-0.5 * z)))
return k, nxc, conf
# Simple biquad band-pass (RBJ cookbook) and direct-form I filter
def design_biquad_bandpass(fs: float, f0: float, Q: float) -> tuple[np.ndarray, np.ndarray]:
    """RBJ cookbook band-pass biquad (constant skirt gain, peak gain = Q).

    Returns the normalized coefficient arrays (b, a) with a[0] == 1.
    """
    omega = 2.0 * np.pi * (f0 / fs)
    alpha = np.sin(omega) / (2.0 * Q)
    # Unnormalized coefficients straight from the cookbook, then divide
    # everything by a0 so the recursion can assume a unity leading term.
    a0 = 1.0 + alpha
    b = np.array([Q * alpha, 0.0, -Q * alpha], dtype=np.float64) / a0
    a = np.array([a0, -2.0 * np.cos(omega), 1.0 - alpha], dtype=np.float64) / a0
    return b, a
def lfilter_biquad(b: np.ndarray, a: np.ndarray, x: np.ndarray) -> np.ndarray:
    """Run one biquad section over `x` in direct form I; returns float32."""
    out = np.zeros_like(x, dtype=np.float64)
    # Delay lines: the two previous inputs and the two previous outputs.
    xm1 = xm2 = ym1 = ym2 = 0.0
    b0, b1, b2 = float(b[0]), float(b[1]), float(b[2])
    a1, a2 = float(a[1]), float(a[2])
    for idx, sample in enumerate(x):
        s = float(sample)
        y_now = b0 * s + b1 * xm1 + b2 * xm2 - a1 * ym1 - a2 * ym2
        out[idx] = y_now
        xm2, xm1 = xm1, s
        ym2, ym1 = ym1, y_now
    return out.astype(np.float32)
def measure_latency_once(freq_hz: float, fs: int, dur_s: float, volume: float,
                         indev: int | None, outdev: int | None,
                         pre_silence: float = 0.20, post_silence: float = 0.40,
                         blocksize: int | None = None, iolatency: float | str | None = None,
                         estimator: str = "xcorr", xrun_counter: dict | None = None,
                         bandpass: bool = True, rms_info: dict | None = None,
                         io_info: dict | None = None, diag: dict | None = None):
    """Play one tone while recording in full duplex; return (latency_ms, confidence).

    The tone from generate_tone() is played and captured through a single
    sd.Stream. Latency is estimated either from the cross-correlation onset
    relative to the known pre-silence ("xcorr") or from PortAudio DAC/ADC
    timestamps ("timeinfo"). Returns NaN latency when nothing was recorded
    or (timeinfo mode) when timestamps are missing. The optional dicts
    (xrun_counter, rms_info, io_info, diag) are mutated in place with
    runtime diagnostics for the caller.
    """
    play_buf, ref, n_pre = generate_tone(freq_hz, dur_s, fs, volume, pre_silence, post_silence)
    record_buf: list[np.ndarray] = []
    written = 0
    times = Times()
    def cb(indata, outdata, frames, time_info, status):
        # Duplex callback: write the next playout chunk, capture input, and
        # grab the first DAC/ADC host timestamps for the "timeinfo" estimator.
        nonlocal written, times
        if status:
            # Output is informational only; xruns etc. do affect latency.
            print(status, flush=True)
            if xrun_counter is not None:
                try:
                    xrun_counter["count"] = int(xrun_counter.get("count", 0)) + 1
                except Exception:
                    pass
        # Input RMS/clip meter shared with the GUI.
        if rms_info is not None:
            try:
                rms = float(np.sqrt(np.mean(np.square(indata.astype(np.float64)))))
                peak = float(np.max(np.abs(indata)))
                rms_db = -120.0 if rms <= 1e-9 else 20.0 * np.log10(rms)
                rms_info["rms_dbfs"] = rms_db
                rms_info["clip"] = bool(peak >= 0.999)
            except Exception:
                pass
        # Report actual frames-per-buffer used by the stream
        if io_info is not None:
            try:
                io_info["blocksize_actual"] = int(frames)
            except Exception:
                pass
        if times.adc_first_time is None:
            times.adc_first_time = time_info.inputBufferAdcTime
        chunk = play_buf[written:written+frames]
        out = np.zeros((frames,), dtype=np.float32)
        if len(chunk) > 0:
            out[:len(chunk)] = chunk
            # First non-zero output sample fixes the DAC-side reference time.
            if times.dac_first_time is None and np.any(out != 0.0):
                first_nz = int(np.argmax(out != 0.0))
                times.dac_first_time = time_info.outputBufferDacTime + first_nz / fs
        outdata[:] = out.reshape(-1, 1)
        record_buf.append(indata.copy().reshape(-1))
        written += frames
    stream_kwargs = dict(samplerate=fs, dtype="float32", channels=1)
    if indev is not None or outdev is not None:
        stream_kwargs["device"] = (indev, outdev)
    if blocksize is not None:
        stream_kwargs["blocksize"] = int(blocksize)
    if iolatency is not None:
        stream_kwargs["latency"] = iolatency
    with sd.Stream(callback=cb, **stream_kwargs):
        # Sleep for the playout duration plus a short tail for the echo.
        sd.sleep(int(1000 * (len(play_buf) / fs)))
        sd.sleep(200)
    if not record_buf:
        return np.nan, 0.0
    rec = np.concatenate(record_buf).astype(np.float32)
    # Optional band-pass around the test tone to increase SNR
    if bandpass:
        try:
            b, a = design_biquad_bandpass(fs=float(fs), f0=float(freq_hz), Q=8.0)
            rec = lfilter_biquad(b, a, rec)
        except Exception:
            pass
    onset_idx, nxc, conf = detect_onset_xcorr(rec, ref, n_pre)
    # Simple RMS gate: require window RMS to exceed pre-silence RMS,
    # otherwise cap confidence at 0.2 (likely a spurious correlation peak).
    try:
        M = len(ref)
        base_rms = float(np.sqrt(np.mean(np.square(rec[:max(1, n_pre)])))) + 1e-12
        w0 = int(max(0, onset_idx))
        w1 = int(min(len(rec), w0 + M))
        win_rms = float(np.sqrt(np.mean(np.square(rec[w0:w1])))) if w1 > w0 else 0.0
        snr_lin = win_rms / max(base_rms, 1e-12)
        if snr_lin < 2.0:
            conf = float(min(conf, 0.2))
    except Exception:
        pass
    # Fill diagnostics for visualization if requested
    if diag is not None:
        try:
            diag.clear()
            diag.update({
                "fs": int(fs),
                "play_buf": play_buf.copy(),
                "rec": rec.copy(),
                "ref": ref.copy(),
                "n_pre": int(n_pre),
                "onset_idx": int(onset_idx),
                "nxc": nxc.copy(),
                "bandpass": bool(bandpass)
            })
        except Exception:
            pass
    if estimator == "timeinfo":
        if times.adc_first_time is None or times.dac_first_time is None:
            return np.nan, conf
        adc_detect_time = times.adc_first_time + onset_idx / fs
        latency_ms = (adc_detect_time - times.dac_first_time) * 1000.0
        return float(latency_ms), conf
    else:  # "xcorr" (default)
        latency_samples = max(0, int(onset_idx) - int(n_pre))
        latency_ms = (latency_samples / fs) * 1000.0
        return float(latency_ms), conf
# ---------- 440-Hz-Runner & Einzel-Balken-Plot ----------
def run_440(repeats, fs, dur, vol, indev, outdev, conf_min):
    """Run `repeats` single-shot 440 Hz latency measurements, printing each.

    Returns the parallel lists (latencies, confidences); a summary warning is
    printed when any measurement is NaN or falls below `conf_min`.
    """
    freq = 440.0
    latencies: list[float] = []
    confidences: list[float] = []
    for i in range(repeats):
        lat_ms, conf = measure_latency_once(freq, fs, dur, vol, indev, outdev)
        latencies.append(lat_ms)
        confidences.append(conf)
        print(f"Try {i+1}/{repeats}: f=440 Hz -> latency={lat_ms:.2f} ms conf={conf:.3f}")
    bad = sum(np.isnan(v) or c < conf_min for v, c in zip(latencies, confidences))
    if bad > 0:
        print(f"Warnung: {bad} Messungen mit niedriger Confidence (< {conf_min}) oder NaN.")
    return latencies, confidences
def plot_single_bar(latencies: list[float]):
    """Show a single summary bar for 440 Hz: mean ± std plus the min–max span."""
    values = np.array(latencies, dtype=float)
    has_data = values.size > 0
    mean = float(np.nanmean(values)) if has_data else np.nan
    std = float(np.nanstd(values, ddof=1)) if values.size > 1 else 0.0
    vmin = float(np.nanmin(values)) if has_data else 0.0
    vmax = float(np.nanmax(values)) if has_data else 0.0
    fig, ax = plt.subplots(figsize=(5, 6))
    ax.set_title("Latenz bei 440 Hz")
    # One bar at x=0 carrying the mean value.
    ax.bar([0], [mean], color="#4C78A8", width=0.6, label="Mittelwert")
    # Error bar = standard deviation.
    ax.errorbar([0], [mean], yerr=[[std], [std]], fmt="none", ecolor="#333333", capsize=6, label="Std")
    # Vertical line marking the min..max span.
    ax.vlines(0, vmin, vmax, colors="#E45756", linewidth=3, label="MinMax")
    ax.set_xticks([0])
    ax.set_xticklabels(["440 Hz"])
    # Keep the y-axis anchored at zero.
    upper = vmax if np.isfinite(vmax) else 0.0
    ax.set_ylim(0.0, max(1.0, upper) * 1.1)
    ax.set_ylabel("Latenz [ms]")
    ax.grid(True, axis="y", alpha=0.3)
    ax.legend(loc="best")
    plt.tight_layout()
    plt.show()
def run_gui(fs: int, dur: float, vol: float, indev: int | None, outdev: int | None,
            conf_min: float, blocksize: int | None = None, iolatency: float | str | None = None,
            estimator: str = "xcorr", pre_silence: float = 0.20, post_silence: float = 0.40,
            bandpass: bool = True):
    """Interactive latency dashboard for the 440 Hz test.

    A daemon worker thread repeatedly calls measure_latency_once() and appends
    results to shared lists (guarded by `lock`); a canvas timer polls
    `latest_changed` and redraws the panels: a confidence-graded scatter,
    a duplex waveform view, a scrolling text log, stats, and hardware info.
    Buttons/sliders control start/stop, filtering, zero-offset and CSV export.
    Blocks in plt.show() until the window is closed.
    """
    latencies: list[float] = []
    confidences: list[float] = []
    fig = plt.figure(figsize=(11, 6))
    # Leave more space at bottom for a two-row control area
    plt.tight_layout(rect=[0, 0.20, 1, 1])
    # Scatterplot of latency vs sample index (top-left)
    ax_sc = fig.add_axes([0.05, 0.55, 0.62, 0.42])
    ax_sc.set_title("Latency over samples", loc="left")
    ax_sc.set_xlabel("sample index")
    ax_sc.set_ylabel("latency [ms]")
    ax_sc.grid(True, axis="both", alpha=0.25)
    # Zero reference line
    zero_line = ax_sc.axhline(0.0, color="#999999", linewidth=1, alpha=0.6, zorder=0)
    # Two series (legacy, cleared each update) and one confidence-graded scatter
    sc_valid, = ax_sc.plot([], [], 'o', color="#4C78A8", markersize=6, label="valid")
    sc_low, = ax_sc.plot([], [], 'o', markerfacecolor='none', markeredgecolor="#E45756", markersize=6, label="low/invalid")
    sc_conf = ax_sc.scatter([], [], c=[], s=24, cmap='viridis_r', vmin=0.0, vmax=1.0, edgecolors='none', alpha=0.9)
    # Legend cleanup: show only rolling mean and last sample
    leg = ax_sc.legend([ ], [ ], loc="upper right")
    SCATTER_WINDOW = [50]  # fixed default: number of last points to display
    # Rolling mean and std band (initialized empty)
    line_mean, = ax_sc.plot([], [], '-', color="#1f77b4", linewidth=1.5, alpha=0.9, label="rolling mean")
    band_poly = [None]
    # Latest sample highlight
    sc_last, = ax_sc.plot([], [], 'o', color="#2ca02c", markersize=7, label="last")
    ann_last = ax_sc.text(0, 0, "", va="bottom", ha="left", fontsize=8, color="#2ca02c")
    # Add colorbar for confidence
    try:
        cbar = fig.colorbar(sc_conf, ax=ax_sc, fraction=0.046, pad=0.04)
        cbar.set_label('confidence')
    except Exception:
        pass
    # Duplex waveform panel (below scatter, left)
    ax_duplex = fig.add_axes([0.05, 0.25, 0.62, 0.25])
    ax_duplex.set_title("Duplex stream (time-domain)", loc="left", fontsize=9)
    ax_duplex.set_xlabel("time [ms]")
    ax_duplex.set_ylabel("amplitude")
    ax_duplex.grid(True, axis="both", alpha=0.25)
    line_play, = ax_duplex.plot([], [], '-', color="#4C78A8", linewidth=1.0, label="playout")
    line_rec, = ax_duplex.plot([], [], '-', color="#E45756", linewidth=1.0, alpha=0.9, label="record")
    v_on = ax_duplex.axvline(0.0, color="#2ca02c", linestyle="--", linewidth=1.0, label="onset")
    v_t0 = ax_duplex.axvline(0.0, color="#999999", linestyle=":", linewidth=1.0, label="tone start")
    ax_duplex.legend(loc="upper right", fontsize=8)
    # Visual box background
    ax_duplex.set_facecolor("#fcfcff")
    try:
        ax_duplex.add_patch(mpatches.FancyBboxPatch((0, 0), 1, 1, transform=ax_duplex.transAxes,
                                                    boxstyle="round,pad=0.01", facecolor="#f7f9ff",
                                                    edgecolor="#c6d3f5", linewidth=0.8, zorder=-1, clip_on=False))
    except Exception:
        pass
    # Terminal-style readout panel (right)
    ax_log = fig.add_axes([0.73, 0.30, 0.23, 0.60])
    ax_log.set_title("Measurements", loc="center", fontsize=10)
    ax_log.axis("off")
    log_text = ax_log.text(0.0, 1.0, "", va="top", ha="left", family="monospace", fontsize=8)
    LOG_WINDOW = 10  # show last 10 lines; start scrolling after 10
    # Visual box
    try:
        ax_log.add_patch(mpatches.FancyBboxPatch((0, 0), 1, 1, transform=ax_log.transAxes,
                                                 boxstyle="round,pad=0.01", facecolor="#fbfbfb",
                                                 edgecolor="#dddddd", linewidth=0.8, zorder=-1, clip_on=False))
    except Exception:
        pass
    # Stats panel (move higher and slightly to the right)
    ax_stats = fig.add_axes([0.73, 0.60, 0.18, 0.04])
    ax_stats.axis("off")
    ax_stats.set_title("Stats", loc="left", fontsize=9)
    stats_text = ax_stats.text(0.0, 1.0, "", va="top", ha="left", family="monospace", fontsize=8)
    try:
        ax_stats.add_patch(mpatches.FancyBboxPatch((0, 0), 1, 1, transform=ax_stats.transAxes,
                                                   boxstyle="round,pad=0.01", facecolor="#fbfbff",
                                                   edgecolor="#dfe6ff", linewidth=0.8, zorder=-1, clip_on=False))
    except Exception:
        pass
    # Hardware/Status panel (just below the moved stats)
    ax_hw = fig.add_axes([0.73, 0.46, 0.18, 0.04])
    ax_hw.axis("off")
    ax_hw.set_title("Hardware", loc="left", fontsize=9)
    hw_text = ax_hw.text(0.0, 1.0, "", va="top", ha="left", family="monospace", fontsize=8)
    try:
        ax_hw.add_patch(mpatches.FancyBboxPatch((0, 0), 1, 1, transform=ax_hw.transAxes,
                                                boxstyle="round,pad=0.01", facecolor="#fbfffb",
                                                edgecolor="#d8f0d8", linewidth=0.8, zorder=-1, clip_on=False))
    except Exception:
        pass
    # Information box (explains the measurement method). Placed above controls.
    ax_info = fig.add_axes([0.2, 0.5, 0.5, 0.20])
    ax_info.axis("off")
    ax_info.set_title("Method", loc="left", fontsize=9)
    # runtime I/O info (used below in info text; updated by stream callback later)
    io_info = {"blocksize_actual": None}
    info_text_str = (
        f"Method details:\n"
        f"- Signal: 440 Hz sine, dur={dur:.3f}s, pre={pre_silence:.2f}s, post={post_silence:.2f}s, vol={vol:.2f}; 5 ms fade-in/out.\n"
        f"- I/O: full-duplex sd.Stream(fs={fs}, ch=1, dtype=float32, blocksize={io_info['blocksize_actual'] if io_info['blocksize_actual'] is not None else (blocksize if blocksize is not None else 'auto')}, latency={iolatency}).\n"
        f"- Band-pass (optional): RBJ biquad centered 440 Hz, Q=8; direct-form I.\n"
        f"- Pre-whitening: apply x[n]-0.97*x[n-1] on ref and recording.\n"
        f"- Normalized xcorr: corr / sqrt(E_x*E_r) over valid lags; take peak index k and value.\n"
        f"- Baseline/confidence: median/MAD of pre-silence nxc; z=(peak-med)/(1.4826*MAD+eps); conf=sigmoid(0.5*z).\n"
        f"- SNR gate: window RMS vs pre-silence RMS; if <2x then cap conf≤0.2.\n"
        f"- Latency (xcorr): ((k - n_pre)/fs)*1000 ms.\n"
        f"- Latency (timeinfo): uses PortAudio DAC/ADC timestamps around onset.\n"
        f"- Negatives are invalid → shown as NaN; conf_min and 'include low' control filtering.\n"
        f"- Display: zero_offset subtracts current mean; rolling mean/std shown over window."
    )
    info_box = ax_info.text(
        0.0, 1.0, info_text_str,
        va="top", ha="left", fontsize=14, wrap=True,
        bbox=dict(boxstyle="round", facecolor="#f0f6ff", edgecolor="#4C78A8", alpha=0.9)
    )
    # Shared state between GUI thread, worker thread, and stream callback.
    running = threading.Event()
    latest_changed = threading.Event()
    lock = threading.Lock()
    latest_conf = {"value": float("nan")}
    last_diag = {}
    info_visible = [True]
    current_conf_min = [float(conf_min)]
    include_low = [False]
    zero_offset = [0.0]
    # status/xrun counter shared with stream callback
    xrun_counter = {"count": 0}
    # input RMS meter shared
    rms_info = {"rms_dbfs": float('nan'), "clip": False}
    # Resolve device names for display
    try:
        dev_in_name = sd.query_devices(indev)["name"] if indev is not None else sd.query_devices(sd.default.device[0])["name"]
    except Exception:
        dev_in_name = str(indev)
    try:
        dev_out_name = sd.query_devices(outdev)["name"] if outdev is not None else sd.query_devices(sd.default.device[1])["name"]
    except Exception:
        dev_out_name = str(outdev)
    def compute_stats():
        # Mean/std/min/max/count over samples passing the current filters.
        data = np.array(latencies, dtype=float)
        conf = np.array(confidences, dtype=float)
        if data.size == 0:
            return float('nan'), 0.0, 0.0, 0.0, 0
        # When include_low is ON, include all finite samples (even negative latencies)
        if include_low[0]:
            mask = np.isfinite(data)
        else:
            mask = np.isfinite(data) & (data >= 0.0)
        if conf.size == data.size:
            mask &= np.isfinite(conf) & (conf >= current_conf_min[0])
        valid = data[mask]
        if valid.size == 0:
            return float('nan'), 0.0, 0.0, 0.0, 0
        mean = float(np.nanmean(valid))
        std = float(np.nanstd(valid, ddof=1)) if valid.size > 1 else 0.0
        vmin = float(np.nanmin(valid))
        vmax = float(np.nanmax(valid))
        return mean, std, vmin, vmax, int(valid.size)
    def compute_stats_all():
        # Mean/count over every finite sample, ignoring filters.
        data = np.array(latencies, dtype=float)
        if data.size == 0:
            return float('nan'), 0
        # 'All' means all finite samples, including negatives
        mask = np.isfinite(data)
        allv = data[mask]
        if allv.size == 0:
            return float('nan'), 0
        return float(np.nanmean(allv)), int(allv.size)
    # (removed old remove_errorbar helper; no longer needed)
    def update_plot():
        # Redraw every panel from the shared lists; called on the GUI thread.
        with lock:
            # Update scatterplot with last N points
            n = len(latencies)
            if n > 0:
                start = max(0, n - SCATTER_WINDOW[0])
                idx = np.arange(start, n)
                y = np.array(latencies[start:n], dtype=float)
                # Apply display zero-offset
                y = y - zero_offset[0]
                c = np.array(confidences[start:n], dtype=float)
                finite = np.isfinite(y)
                thr = current_conf_min[0]
                is_valid = finite & (y >= 0.0) & np.isfinite(c) & (c >= thr)
                is_low = finite & ~is_valid
                y_plot = y.copy()
                y_plot[~finite] = np.nan
                # No artificial floor; allow negatives when zero-offset is applied
                # Build display mask respecting include_low and conf_min
                if include_low[0]:
                    disp_mask = (np.isfinite(y_plot))
                else:
                    disp_mask = is_valid
                # Update confidence-graded scatter
                x_disp = idx[disp_mask]
                y_disp = y_plot[disp_mask]
                c_disp = c[disp_mask]
                if c_disp.size > 0:
                    offs = np.column_stack([x_disp, y_disp])
                    sc_conf.set_offsets(offs)
                    sc_conf.set_array(c_disp.astype(float))
                    # Marker size scaled by confidence
                    sizes = 16.0 + 36.0 * np.clip(c_disp.astype(float), 0.0, 1.0)
                    sc_conf.set_sizes(sizes)
                else:
                    sc_conf.set_offsets(np.empty((0, 2)))
                    sc_conf.set_array(np.array([], dtype=float))
                    sc_conf.set_sizes(np.array([], dtype=float))
                # Clear legacy series to avoid double plotting
                sc_valid.set_data([], [])
                sc_low.set_data([], [])
                # Y-axis: include zero and any negatives (when offset applied)
                # Guard all-NaN window: if no finite data, show default axes and clear rolling overlays
                if not np.any(np.isfinite(y_plot)):
                    ax_sc.set_ylim(0.0, 1.0)
                    ax_sc.set_xlim(max(0, n - SCATTER_WINDOW[0]), max(SCATTER_WINDOW[0], n))
                    line_mean.set_data([], [])
                    if band_poly[0] is not None:
                        try:
                            band_poly[0].remove()
                        except Exception:
                            pass
                        band_poly[0] = None
                    sc_last.set_data([], [])
                    ann_last.set_text("")
                else:
                    y_max = float(np.nanmax(y_plot))
                    y_min = float(np.nanmin(y_plot))
                    y_low = min(0.0, y_min)
                    y_high = max(1.0, y_max)
                    pad = 0.05 * (y_high - y_low)
                    ax_sc.set_ylim(y_low, y_high + pad)
                    # Use nice ticks and a readable formatter
                    ax_sc.yaxis.set_major_locator(mticker.MaxNLocator(nbins=6))
                    ax_sc.yaxis.set_major_formatter(mticker.FormatStrFormatter('%.1f'))
                    ax_sc.set_xlim(max(0, n - SCATTER_WINDOW[0]), max(SCATTER_WINDOW[0], n))
                # Rolling mean/std band on displayed window (only if some finite data)
                if np.any(np.isfinite(y_plot)):
                    win = max(3, min(50, int(max(10, SCATTER_WINDOW[0] * 0.05))))
                    y_roll = y_plot.copy()
                    m = np.full_like(y_roll, np.nan, dtype=float)
                    s = np.full_like(y_roll, np.nan, dtype=float)
                    for k in range(len(y_roll)):
                        a = max(0, k - win + 1)
                        seg = y_roll[a:k+1]
                        seg = seg[np.isfinite(seg)]
                        if seg.size >= 2:
                            m[k] = float(np.nanmean(seg))
                            s[k] = float(np.nanstd(seg, ddof=1))
                        elif seg.size == 1:
                            m[k] = float(seg[0])
                            s[k] = 0.0
                    line_mean.set_data(idx, m)
                    # Replace the previous std band; fill_between cannot be updated in place.
                    if band_poly[0] is not None:
                        try:
                            band_poly[0].remove()
                        except Exception:
                            pass
                    upper = m + s
                    lower = m - s
                    band_poly[0] = ax_sc.fill_between(idx, lower, upper, color="#1f77b4", alpha=0.15, linewidth=0)
                    # Latest sample highlight
                    last_x = idx[-1]
                    last_y = y_plot[-1] if np.isfinite(y_plot[-1]) else np.nan
                    if np.isfinite(last_y):
                        sc_last.set_data([last_x], [last_y])
                        ann_last.set_position((last_x, last_y))
                        ann_last.set_text(f"{last_y:.2f} ms")
                    else:
                        sc_last.set_data([], [])
                        ann_last.set_text("")
            # Update duplex waveform (left-bottom)
            if last_diag:
                try:
                    fs_d = int(last_diag.get("fs", fs))
                    rec = np.asarray(last_diag.get("rec", []), dtype=float)
                    play = np.asarray(last_diag.get("play_buf", []), dtype=float)
                    n_pre = int(last_diag.get("n_pre", 0))
                    onset_idx = int(last_diag.get("onset_idx", 0))
                    M = len(last_diag.get("ref", []))
                    # Choose a window around onset
                    w_before = max(M // 2, int(0.03 * fs_d))
                    w_after = max(int(1.5 * M), int(0.06 * fs_d))
                    s0 = max(0, onset_idx - w_before)
                    s1 = min(len(rec), onset_idx + w_after)
                    if s1 > s0:
                        t_ms = (np.arange(s0, s1) - onset_idx) * 1000.0 / fs_d
                        y_rec = rec[s0:s1]
                        y_play = play[s0:s1] if s1 <= len(play) else play[s0:min(s1, len(play))]
                        # Ensure same length for plotting
                        if y_play.shape[0] != (s1 - s0):
                            y_play = np.pad(y_play, (0, (s1 - s0) - y_play.shape[0]), mode='constant')
                        line_rec.set_data(t_ms, y_rec)
                        line_play.set_data(t_ms, y_play)
                        v_on.set_xdata([0.0, 0.0])
                        t0_ms = (n_pre - onset_idx) * 1000.0 / fs_d
                        v_t0.set_xdata([t0_ms, t0_ms])
                        # Y limits with padding
                        y_min = float(np.nanmin([np.min(y_rec), np.min(y_play)]) if y_rec.size and y_play.size else -1.0)
                        y_max = float(np.nanmax([np.max(y_rec), np.max(y_play)]) if y_rec.size and y_play.size else 1.0)
                        if not np.isfinite(y_min) or not np.isfinite(y_max) or y_min == y_max:
                            y_min, y_max = -1.0, 1.0
                        pad = 0.05 * (y_max - y_min)
                        ax_duplex.set_xlim(t_ms[0], t_ms[-1])
                        ax_duplex.set_ylim(y_min - pad, y_max + pad)
                except Exception:
                    # If anything goes wrong, clear the duplex plot gracefully
                    line_rec.set_data([], [])
                    line_play.set_data([], [])
            # Update rolling terminal (right)
            lines = []
            thr = current_conf_min[0]
            for i, (lat, conf) in enumerate(zip(latencies, confidences)):
                lat_ok = np.isfinite(lat)
                conf_ok = np.isfinite(conf) and (conf >= thr)
                flag = "OK " if (lat_ok and lat >= 0.0 and conf_ok) else ("LOW" if np.isfinite(conf) else "NA ")
                lat_str = f"{lat:8.2f}" if np.isfinite(lat) else " NaN"
                conf_str = f"{conf:5.3f}" if np.isfinite(conf) else " NaN"
                lines.append(f"{i:04d} | {lat_str} ms | conf={conf_str} | {flag}")
            log_text.set_text("\n".join(lines[-LOG_WINDOW:]))
            # Update stats panel (respect filters like the scatter). Stats use zero-offset adjusted values.
            data_all = np.array(latencies, dtype=float)
            conf_all = np.array(confidences, dtype=float)
            thr = current_conf_min[0]
            if include_low[0]:
                msk = np.isfinite(data_all)
            else:
                msk = np.isfinite(data_all) & (data_all >= 0.0)
            if conf_all.size == data_all.size:
                msk &= np.isfinite(conf_all) & (conf_all >= thr)
            n_sel = int(np.sum(msk))
            if n_sel >= 1:
                adj = data_all - zero_offset[0]
                mean_val = float(np.nanmean(adj[msk]))
                std_val = float(np.nanstd(adj[msk], ddof=1)) if n_sel >= 2 else 0.0
                stats_text.set_text(f"N={n_sel} mean={mean_val:.2f} ms std={std_val:.2f} ms")
            else:
                stats_text.set_text("N=0 mean=-- std=--")
            # Update hardware/status panel
            hw_lines = [
                f"fs={fs} Hz, win={SCATTER_WINDOW[0]}",
                f"conf_min={current_conf_min[0]:.2f}, include_low={'on' if include_low[0] else 'off'}",
                f"estimator={estimator}",
                f"indev={indev} ({dev_in_name})",
                f"outdev={outdev} ({dev_out_name})",
                f"blocksize={io_info['blocksize_actual'] if io_info['blocksize_actual'] is not None else (blocksize if blocksize is not None else 'auto')}",
                f"iolatency={iolatency if iolatency is not None else 'default'}",
                f"pre={pre_silence:.2f}s, post={post_silence:.2f}s",
                f"xruns={xrun_counter['count']}",
                f"inRMS={rms_info['rms_dbfs']:.1f} dBFS, clip={'YES' if rms_info['clip'] else 'no'}",
                f"bandpass={'on' if bandpass else 'off'}",
                f"zero_offset={zero_offset[0]:.2f} ms"
            ]
            hw_text.set_text("\n".join(hw_lines))
        fig.canvas.draw_idle()
    def worker():
        # Measurement loop: runs on a daemon thread while `running` is set.
        f = 440.0
        while running.is_set():
            local_diag = {}
            lat_ms, conf = measure_latency_once(
                f, fs, dur, vol, indev, outdev,
                pre_silence=pre_silence, post_silence=post_silence,
                blocksize=blocksize, iolatency=iolatency,
                estimator=estimator, xrun_counter=xrun_counter,
                bandpass=bandpass, rms_info=rms_info, io_info=io_info, diag=local_diag
            )
            with lock:
                # Negative latencies are physically impossible -> mark as invalid (NaN)
                if np.isfinite(lat_ms) and lat_ms < 0.0:
                    latencies.append(np.nan)
                else:
                    latencies.append(lat_ms)
                confidences.append(conf)
                latest_conf["value"] = conf
                last_diag.clear()
                last_diag.update(local_diag)
            latest_changed.set()
    def on_start(event):
        # Start the worker thread once; ignore clicks while already running.
        if running.is_set():
            return
        running.set()
        if not hasattr(on_start, "thr") or not on_start.thr.is_alive():
            on_start.thr = threading.Thread(target=worker, daemon=True)
            on_start.thr.start()
    def on_stop(event):
        # Worker loop exits after finishing the measurement in progress.
        running.clear()
    # (removed middle window slider control)
    # Controls, two rows (no overlap)
    slider_ax = fig.add_axes([0.08, 0.02, 0.46, 0.06])
    cbox_ax = fig.add_axes([0.6, 0.02, 0.12, 0.06])
    info_ax = fig.add_axes([0.75, 0.02, 0.08, 0.06])
    start_ax = fig.add_axes([0.08, 0.10, 0.10, 0.08])
    stop_ax = fig.add_axes([0.20, 0.10, 0.10, 0.08])
    reset_ax = fig.add_axes([0.32, 0.10, 0.08, 0.08])
    save_ax = fig.add_axes([0.42, 0.10, 0.08, 0.08])
    zero_ax = fig.add_axes([0.52, 0.10, 0.10, 0.08])
    zero_clr_ax = fig.add_axes([0.64, 0.10, 0.12, 0.08])
    btn_info = Button(info_ax, "Info")
    btn_start = Button(start_ax, "Start")
    btn_stop = Button(stop_ax, "Stop")
    btn_reset = Button(reset_ax, "Clr")
    btn_save = Button(save_ax, "Save")
    btn_zero = Button(zero_ax, "Zero")
    btn_zero_clr = Button(zero_clr_ax, "ZeroClr")
    btn_start.on_clicked(on_start)
    btn_stop.on_clicked(on_stop)
    def on_info(event):
        # Toggle visibility of the method description box.
        info_visible[0] = not info_visible[0]
        ax_info.set_visible(info_visible[0])
        fig.canvas.draw_idle()
    btn_info.on_clicked(on_info)
    # Now create Slider and CheckButtons after their axes exist
    slider = Slider(slider_ax, 'conf_min', 0.0, 1.0, valinit=current_conf_min[0], valstep=0.01)
    def on_slider(val):
        current_conf_min[0] = float(val)
        update_plot()
    slider.on_changed(on_slider)
    cbox = CheckButtons(cbox_ax, ["include low"], [include_low[0]])
    def on_cbox(label):
        include_low[0] = not include_low[0]
        update_plot()
    cbox.on_clicked(on_cbox)
    def on_reset(event):
        # Clear all collected samples and the display zero-offset.
        with lock:
            latencies.clear()
            confidences.clear()
            latest_conf["value"] = float("nan")
            zero_offset[0] = 0.0
        update_plot()
    btn_reset.on_clicked(on_reset)
    def on_zero(event):
        # Set zero_offset to current mean of selected (masked) samples
        data_all = np.array(latencies, dtype=float)
        conf_all = np.array(confidences, dtype=float)
        thr = current_conf_min[0]
        if include_low[0]:
            msk = np.isfinite(data_all)
        else:
            msk = np.isfinite(data_all) & (data_all >= 0.0)
        if conf_all.size == data_all.size:
            msk &= np.isfinite(conf_all) & (conf_all >= thr)
        if np.any(msk):
            zero_offset[0] = float(np.nanmean(data_all[msk]))
        else:
            zero_offset[0] = 0.0
        update_plot()
    btn_zero.on_clicked(on_zero)
    def on_zero_clr(event):
        zero_offset[0] = 0.0
        update_plot()
    btn_zero_clr.on_clicked(on_zero_clr)
    def on_save(event):
        # Save current measurements to CSV with timestamped filename
        ts = datetime.now().strftime('%Y%m%d_%H%M%S')
        fname = f"latency_{ts}.csv"
        try:
            with open(fname, 'w', encoding='utf-8') as fcsv:
                # Header comments record the measurement configuration.
                fcsv.write("# latency_440 export\n")
                fcsv.write(f"# fs,{fs}\n")
                fcsv.write(f"# blocksize,{blocksize}\n")
                fcsv.write(f"# iolatency,{iolatency}\n")
                fcsv.write(f"# estimator,{estimator}\n")
                fcsv.write(f"# pre_silence,{pre_silence}\n")
                fcsv.write(f"# post_silence,{post_silence}\n")
                fcsv.write(f"# bandpass,{bandpass}\n")
                fcsv.write("index,latency_ms,confidence\n")
                with lock:
                    for i, (lat, conf) in enumerate(zip(latencies, confidences)):
                        lv = '' if not np.isfinite(lat) else f"{lat:.6f}"
                        cv = '' if not np.isfinite(conf) else f"{conf:.6f}"
                        fcsv.write(f"{i},{lv},{cv}\n")
        except Exception as e:
            print(f"Save failed: {e}")
    btn_save.on_clicked(on_save)
    # (no old errorbar state to keep)
    # Poll for new results on the GUI thread; redraw only when flagged.
    timer = fig.canvas.new_timer(interval=200)
    def on_timer():
        if latest_changed.is_set():
            latest_changed.clear()
            update_plot()
    timer.add_callback(on_timer)
    timer.start()
    plt.show()
def main():
    """CLI entry point: parse arguments and launch the interactive GUI."""
    ap = argparse.ArgumentParser(description="Akustische Latenzmessung nur für 440 Hz")
    # NOTE(review): --repeats is parsed but never passed to run_gui(); it is
    # only consumed by the non-GUI run_440() helper, which main() does not call.
    ap.add_argument("--repeats", type=int, default=5, help="Wiederholungen")
    ap.add_argument("-r", "--samplerate", type=int, default=48_000, help="Samplerate")
    ap.add_argument("-t", "--time", type=float, default=0.15, help="Tondauer (s)")
    ap.add_argument("-v", "--volume", type=float, default=0.6, help="Lautstärke 0..1")
    ap.add_argument("--indev", type=int, default=None, help="Input-Geräteindex")
    ap.add_argument("--outdev", type=int, default=None, help="Output-Geräteindex")
    ap.add_argument("--conf-min", type=float, default=0.9, help="Warnschwelle für Confidence")
    ap.add_argument("--blocksize", type=int, default=None, help="Audio blocksize (frames), e.g. 1024/2048")
    ap.add_argument("--iolatency", type=str, default="high", help="Audio I/O latency (seconds or preset: 'low','high')")
    ap.add_argument("--estimator", type=str, choices=["xcorr","timeinfo"], default="xcorr", help="Latency estimator: 'xcorr' (default, robust) or 'timeinfo' (host timestamps)")
    ap.add_argument("--pre-silence", type=float, default=0.30, help="Pre-silence before tone (s)")
    ap.add_argument("--post-silence", type=float, default=0.60, help="Post-silence after tone (s)")
    # Paired flags: --bandpass (default on) / --no-bandpass share one dest.
    ap.add_argument("--bandpass", action='store_true', default=True, help="Apply 440 Hz band-pass before correlation")
    ap.add_argument("--no-bandpass", dest='bandpass', action='store_false', help="Disable band-pass prefilter")
    args = ap.parse_args()
    run_gui(args.samplerate, args.time, args.volume, args.indev, args.outdev,
            args.conf_min, blocksize=args.blocksize, iolatency=args.iolatency,
            estimator=args.estimator, pre_silence=args.pre_silence,
            post_silence=args.post_silence, bandpass=args.bandpass)
if __name__ == "__main__":
    main()