Compare commits

...

4 Commits

2 changed files with 855 additions and 73 deletions

251
frequency_quality.py Normal file
View File

@@ -0,0 +1,251 @@
#!/usr/bin/env python3
import argparse
import threading
import numpy as np
import sounddevice as sd
import matplotlib
matplotlib.use("TkAgg")
import matplotlib.pyplot as plt
from matplotlib.widgets import Button, Slider
def generate_log_sweep(f_start: float, f_end: float, dur_s: float, fs: int, volume: float,
pre_silence: float = 0.10, post_silence: float = 0.20) -> tuple[np.ndarray, np.ndarray, int, int]:
n_pre = int(pre_silence * fs)
n_tone = int(dur_s * fs)
n_post = int(post_silence * fs)
t = np.arange(n_tone, dtype=np.float64) / fs
r = float(f_end) / float(f_start)
k = np.log(r)
phase = 2.0 * np.pi * f_start * (np.exp((t / dur_s) * k) - 1.0) * (dur_s / k)
sweep = np.sin(phase).astype(np.float32)
fade_n = max(1, int(0.005 * fs))
if fade_n > 0 and fade_n * 2 < n_tone:
w = np.ones_like(sweep)
w[:fade_n] *= np.linspace(0, 1, fade_n, endpoint=False, dtype=np.float32)
w[-fade_n:] *= np.linspace(1, 0, fade_n, endpoint=False, dtype=np.float32)
sweep *= w
sweep *= float(volume)
out = np.concatenate([
np.zeros(n_pre, dtype=np.float32),
sweep,
np.zeros(n_post, dtype=np.float32)
])
return out, sweep, n_pre, n_pre + n_tone
def map_time_to_freq_log(t: np.ndarray, f_start: float, f_end: float, dur_s: float) -> np.ndarray:
r = float(f_end) / float(f_start)
return f_start * np.exp((t / dur_s) * np.log(r))
def analyze_rms_vs_freq(rec: np.ndarray, fs: int, start_idx: int, end_idx: int,
dur_s: float, f_start: float, f_end: float,
frame: int = 2048, hop: int = 1024,
eps: float = 1e-12) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
x = rec.astype(np.float32)
x = x[start_idx:end_idx]
if x.size < frame:
return np.array([], dtype=np.float32), np.array([], dtype=np.float32)
n = x.size
centers = []
vals = []
confs = []
i = 0
while i + frame <= n:
seg = x[i:i+frame]
rms = float(np.sqrt(np.mean(seg.astype(np.float64) ** 2)))
vals.append(20.0 * np.log10(max(rms, eps)))
c = (i + frame * 0.5) / fs
centers.append(c)
# Confidence via spectral peak prominence near expected frequency
win = np.hanning(len(seg)).astype(np.float64)
segw = seg.astype(np.float64) * win
# next power of two for speed
nfft = 1 << (int(len(segw) - 1).bit_length())
spec = np.fft.rfft(segw, n=nfft)
mag = np.abs(spec)
# Expected bin
t_here = c
f_exp = float(map_time_to_freq_log(np.array([t_here]), f_start, f_end, dur_s)[0])
bin_exp = int(np.clip(np.round(f_exp / fs * nfft), 1, (nfft // 2)))
b0 = max(1, bin_exp - 2)
b1 = min(nfft // 2, bin_exp + 2)
peak = float(np.max(mag[b0:b1+1])) if b1 >= b0 else float(mag[bin_exp])
noise = float(np.median(mag[1:])) if mag.size > 1 else peak
snr_db = 20.0 * np.log10((peak + eps) / (noise + eps))
conf = float(np.clip((snr_db - 6.0) / 24.0, 0.0, 1.0)) # 6..30 dB -> 0..1
confs.append(conf)
i += hop
t = np.array(centers, dtype=np.float64)
f = map_time_to_freq_log(t, f_start, f_end, dur_s).astype(np.float32)
db = np.array(vals, dtype=np.float32)
confa = np.array(confs, dtype=np.float32)
return f, db, confa
def run_gui(fs: int, dur: float, vol: float, indev: int | None, outdev: int | None,
f_start: float = 20.0, f_end: float = 20000.0,
frame: int = 2048, hop: int = 1024,
blocksize: int | None = None, iolatency: float | str | None = "high"):
freqs: list[float] = []
dbfs: list[float] = []
confs: list[float] = []
fig = plt.figure(figsize=(10, 6))
plt.tight_layout(rect=[0, 0.16, 1, 1])
ax_sc = fig.add_axes([0.08, 0.22, 0.84, 0.72])
ax_sc.set_title("Sweep capture: dBFS vs frequency", loc="left")
ax_sc.set_xlabel("frequency [Hz]")
ax_sc.set_ylabel("level [dBFS]")
ax_sc.set_xscale("log")
ax_sc.grid(True, which="both", axis="both", alpha=0.25)
sc = ax_sc.scatter([], [], c=[], cmap="viridis", vmin=0.0, vmax=1.0, s=18, alpha=0.9)
cbar = fig.colorbar(sc, ax=ax_sc, pad=0.02)
cbar.set_label("confidence")
busy = threading.Event()
latest_changed = threading.Event()
lock = threading.Lock()
current_vol = [float(vol)]
def update_plot():
with lock:
if len(freqs) == 0:
ax_sc.set_xlim(f_start, f_end)
ax_sc.set_ylim(-100.0, 0.0)
sc.set_offsets(np.empty((0, 2)))
sc.set_array(np.array([]))
else:
f = np.array(freqs, dtype=float)
y = np.array(dbfs, dtype=float)
xy = np.column_stack([f, y])
sc.set_offsets(xy)
sc.set_array(np.array(confs, dtype=float))
ax_sc.set_xlim(max(10.0, np.nanmin(f)), min(24000.0, np.nanmax(f)))
ymin = np.nanpercentile(y, 2) if np.isfinite(y).any() else -100.0
ymax = np.nanpercentile(y, 98) if np.isfinite(y).any() else 0.0
if not np.isfinite(ymin):
ymin = -100.0
if not np.isfinite(ymax):
ymax = 0.0
if ymax - ymin < 10.0:
ymin = min(ymin, -100.0)
ymax = max(ymax, 0.0)
pad = 0.05 * (ymax - ymin)
ax_sc.set_ylim(ymin - pad, ymax + pad)
fig.canvas.draw_idle()
def run_one_sweep():
play_buf, ref, start_idx, end_idx = generate_log_sweep(
f_start, f_end, dur, fs, current_vol[0], pre_silence=0.10, post_silence=0.20
)
record_buf = []
written = 0
def cb(indata, outdata, frames, time_info, status):
nonlocal written
if status:
print(status, flush=True)
chunk = play_buf[written:written+frames]
out = np.zeros((frames,), dtype=np.float32)
if len(chunk) > 0:
out[:len(chunk)] = chunk
outdata[:] = out.reshape(-1, 1)
record_buf.append(indata.copy().reshape(-1))
written += frames
stream_kwargs = dict(samplerate=fs, dtype="float32", channels=1)
if indev is not None or outdev is not None:
stream_kwargs["device"] = (indev, outdev)
if blocksize is not None:
stream_kwargs["blocksize"] = int(blocksize)
if iolatency is not None:
stream_kwargs["latency"] = iolatency
with sd.Stream(callback=cb, **stream_kwargs):
sd.sleep(int(1000 * (len(play_buf) / fs)))
sd.sleep(100)
if not record_buf:
return np.array([], dtype=np.float32), np.array([], dtype=np.float32), np.array([], dtype=np.float32)
rec = np.concatenate(record_buf).astype(np.float32)
f, y, c = analyze_rms_vs_freq(rec, fs, start_idx, end_idx, dur, f_start, f_end, frame=frame, hop=hop)
return f, y, c
def on_sweep(event):
if busy.is_set():
return
busy.set()
btn_sweep.label.set_text("Running…")
def task():
try:
f, y, c = run_one_sweep()
with lock:
if f.size > 0:
freqs.extend(list(f))
dbfs.extend(list(y))
confs.extend(list(c))
latest_changed.set()
finally:
busy.clear()
btn_sweep.label.set_text("Sweep")
threading.Thread(target=task, daemon=True).start()
sweep_ax = fig.add_axes([0.79, 0.05, 0.15, 0.07])
btn_sweep = Button(sweep_ax, "Sweep")
btn_sweep.on_clicked(on_sweep)
vol_ax = fig.add_axes([0.08, 0.05, 0.45, 0.07])
vol_slider = Slider(vol_ax, 'volume', 0.0, 1.0, valinit=current_vol[0], valstep=0.01)
def on_vol(val):
current_vol[0] = float(val)
vol_slider.on_changed(on_vol)
timer = fig.canvas.new_timer(interval=250)
def on_timer():
if latest_changed.is_set():
latest_changed.clear()
update_plot()
timer.add_callback(on_timer)
timer.start()
update_plot()
plt.show()
def main():
ap = argparse.ArgumentParser(description="Frequency sweep capture: plot recorded dBFS vs frequency")
ap.add_argument("--samplerate", "-r", type=int, default=48_000, help="Sample rate")
ap.add_argument("--time", "-t", type=float, default=8.0, help="Sweep duration (s)")
ap.add_argument("--volume", "-v", type=float, default=0.5, help="Playback volume 0..1")
ap.add_argument("--indev", type=int, default=None, help="Input device index")
ap.add_argument("--outdev", type=int, default=None, help="Output device index")
ap.add_argument("--f-start", type=float, default=20.0, help="Sweep start frequency (Hz)")
ap.add_argument("--f-end", type=float, default=20000.0, help="Sweep end frequency (Hz)")
ap.add_argument("--frame", type=int, default=2048, help="RMS window size (samples)")
ap.add_argument("--hop", type=int, default=1024, help="RMS hop size (samples)")
ap.add_argument("--blocksize", type=int, default=None, help="Audio blocksize (frames)")
ap.add_argument("--iolatency", type=str, default="high", help="Audio I/O latency preset or seconds")
args = ap.parse_args()
run_gui(
fs=args.samplerate,
dur=args.time,
vol=args.volume,
indev=args.indev,
outdev=args.outdev,
f_start=args.f_start,
f_end=args.f_end,
frame=args.frame,
hop=args.hop,
blocksize=args.blocksize,
iolatency=args.iolatency,
)
if __name__ == "__main__":
main()

View File

@@ -7,8 +7,12 @@ import sounddevice as sd
import matplotlib
matplotlib.use("TkAgg") # GUI-Ausgabe für interaktives Fenster
import matplotlib.pyplot as plt
from matplotlib.widgets import Button
import matplotlib.ticker as mticker
from matplotlib.widgets import Button, Slider, CheckButtons
import matplotlib.patches as mpatches
import threading
import time
from datetime import datetime
# ---------- Audio/Signal-Helfer ----------
@dataclass
@@ -32,7 +36,7 @@ def generate_tone(f_hz: float, dur_s: float, fs: int, volume: float,
out = np.concatenate([np.zeros(n_pre, dtype=np.float32), ref, np.zeros(n_post, dtype=np.float32)])
return out, ref, n_pre
def detect_onset_xcorr(signal: np.ndarray, ref: np.ndarray):
def detect_onset_xcorr(signal: np.ndarray, ref: np.ndarray, pre_len: int | None = None):
"""Normierte Kreuzkorrelation; liefert Onset-Index und Confidence."""
x = signal.astype(np.float64)
r = ref.astype(np.float64)
@@ -50,14 +54,60 @@ def detect_onset_xcorr(signal: np.ndarray, ref: np.ndarray):
nrm = np.sqrt(E_x * E_r) + 1e-20
nxc = corr / nrm
k = int(np.argmax(nxc))
conf = float(nxc[k])
peak = float(nxc[k])
# Robust confidence: compare peak to pre-silence baseline distribution
if pre_len is None or pre_len <= M:
base_end = max(1, int(len(nxc) * 0.2))
else:
base_end = max(1, min(len(nxc), int(pre_len - M + 1)))
base = nxc[:base_end]
if base.size <= 1:
conf = peak
else:
med = float(np.median(base))
mad = float(np.median(np.abs(base - med)))
scale = 1.4826 * mad + 1e-6
z = (peak - med) / scale
conf = float(1.0 / (1.0 + np.exp(-0.5 * z)))
return k, nxc, conf
# Simple biquad band-pass (RBJ cookbook) and direct-form I filter
def design_biquad_bandpass(fs: float, f0: float, Q: float) -> tuple[np.ndarray, np.ndarray]:
w0 = 2.0 * np.pi * (f0 / fs)
alpha = np.sin(w0) / (2.0 * Q)
b0 = Q * alpha
b1 = 0.0
b2 = -Q * alpha
a0 = 1.0 + alpha
a1 = -2.0 * np.cos(w0)
a2 = 1.0 - alpha
# Normalize
b = np.array([b0/a0, b1/a0, b2/a0], dtype=np.float64)
a = np.array([1.0, a1/a0, a2/a0], dtype=np.float64)
return b, a
def lfilter_biquad(b: np.ndarray, a: np.ndarray, x: np.ndarray) -> np.ndarray:
y = np.zeros_like(x, dtype=np.float64)
x1 = x2 = y1 = y2 = 0.0
b0, b1, b2 = float(b[0]), float(b[1]), float(b[2])
a1, a2 = float(a[1]), float(a[2])
for i in range(len(x)):
xi = float(x[i])
yi = b0*xi + b1*x1 + b2*x2 - a1*y1 - a2*y2
y[i] = yi
x2, x1 = x1, xi
y2, y1 = y1, yi
return y.astype(np.float32)
def measure_latency_once(freq_hz: float, fs: int, dur_s: float, volume: float,
indev: int | None, outdev: int | None,
pre_silence: float = 0.20, post_silence: float = 0.40):
pre_silence: float = 0.20, post_silence: float = 0.40,
blocksize: int | None = None, iolatency: float | str | None = None,
estimator: str = "xcorr", xrun_counter: dict | None = None,
bandpass: bool = True, rms_info: dict | None = None,
io_info: dict | None = None, diag: dict | None = None):
"""Spielt einen Ton, nimmt parallel auf, schätzt Latenz in ms, gibt Confidence zurück."""
play_buf, ref, _ = generate_tone(freq_hz, dur_s, fs, volume, pre_silence, post_silence)
play_buf, ref, n_pre = generate_tone(freq_hz, dur_s, fs, volume, pre_silence, post_silence)
record_buf = []
written = 0
times = Times()
@@ -67,6 +117,27 @@ def measure_latency_once(freq_hz: float, fs: int, dur_s: float, volume: float,
if status:
# Ausgabe nur informativ; Xruns etc. beeinflussen Latenz
print(status, flush=True)
if xrun_counter is not None:
try:
xrun_counter["count"] = int(xrun_counter.get("count", 0)) + 1
except Exception:
pass
# Input RMS/clip
if rms_info is not None:
try:
rms = float(np.sqrt(np.mean(np.square(indata.astype(np.float64)))))
peak = float(np.max(np.abs(indata)))
rms_db = -120.0 if rms <= 1e-9 else 20.0 * np.log10(rms)
rms_info["rms_dbfs"] = rms_db
rms_info["clip"] = bool(peak >= 0.999)
except Exception:
pass
# Report actual frames-per-buffer used by the stream
if io_info is not None:
try:
io_info["blocksize_actual"] = int(frames)
except Exception:
pass
if times.adc_first_time is None:
times.adc_first_time = time_info.inputBufferAdcTime
@@ -86,19 +157,68 @@ def measure_latency_once(freq_hz: float, fs: int, dur_s: float, volume: float,
stream_kwargs = dict(samplerate=fs, dtype="float32", channels=1)
if indev is not None or outdev is not None:
stream_kwargs["device"] = (indev, outdev)
if blocksize is not None:
stream_kwargs["blocksize"] = int(blocksize)
if iolatency is not None:
stream_kwargs["latency"] = iolatency
with sd.Stream(callback=cb, **stream_kwargs):
sd.sleep(int(1000 * (len(play_buf) / fs)))
sd.sleep(200)
if times.adc_first_time is None or times.dac_first_time is None or not record_buf:
if not record_buf:
return np.nan, 0.0
rec = np.concatenate(record_buf).astype(np.float32)
onset_idx, _, conf = detect_onset_xcorr(rec, ref)
adc_detect_time = times.adc_first_time + onset_idx / fs
latency_ms = (adc_detect_time - times.dac_first_time) * 1000.0
return float(latency_ms), conf
# Optional band-pass around the test tone to increase SNR
if bandpass:
try:
b, a = design_biquad_bandpass(fs=float(fs), f0=float(freq_hz), Q=8.0)
rec = lfilter_biquad(b, a, rec)
except Exception:
pass
onset_idx, nxc, conf = detect_onset_xcorr(rec, ref, n_pre)
# Simple RMS gate: require window RMS to exceed pre-silence RMS
try:
M = len(ref)
base_rms = float(np.sqrt(np.mean(np.square(rec[:max(1, n_pre)])))) + 1e-12
w0 = int(max(0, onset_idx))
w1 = int(min(len(rec), w0 + M))
win_rms = float(np.sqrt(np.mean(np.square(rec[w0:w1])))) if w1 > w0 else 0.0
snr_lin = win_rms / max(base_rms, 1e-12)
if snr_lin < 2.0:
conf = float(min(conf, 0.2))
except Exception:
pass
# Fill diagnostics for visualization if requested
if diag is not None:
try:
diag.clear()
diag.update({
"fs": int(fs),
"play_buf": play_buf.copy(),
"rec": rec.copy(),
"ref": ref.copy(),
"n_pre": int(n_pre),
"onset_idx": int(onset_idx),
"nxc": nxc.copy(),
"bandpass": bool(bandpass)
})
except Exception:
pass
if estimator == "timeinfo":
if times.adc_first_time is None or times.dac_first_time is None:
return np.nan, conf
adc_detect_time = times.adc_first_time + onset_idx / fs
latency_ms = (adc_detect_time - times.dac_first_time) * 1000.0
return float(latency_ms), conf
else: # "xcorr" (default)
latency_samples = max(0, int(onset_idx) - int(n_pre))
latency_ms = (latency_samples / fs) * 1000.0
return float(latency_ms), conf
# ---------- 440-Hz-Runner & Einzel-Balken-Plot ----------
def run_440(repeats, fs, dur, vol, indev, outdev, conf_min):
@@ -141,89 +261,401 @@ def plot_single_bar(latencies: list[float]):
plt.tight_layout()
plt.show()
def run_gui(fs: int, dur: float, vol: float, indev: int | None, outdev: int | None, conf_min: float):
def run_gui(fs: int, dur: float, vol: float, indev: int | None, outdev: int | None,
conf_min: float, blocksize: int | None = None, iolatency: float | str | None = None,
estimator: str = "xcorr", pre_silence: float = 0.20, post_silence: float = 0.40,
bandpass: bool = True):
latencies: list[float] = []
confidences: list[float] = []
fig, ax = plt.subplots(figsize=(5, 6))
ax.set_title("Latenz bei 440 Hz")
bar = ax.bar([0], [0.0], color="#4C78A8", width=0.6, label="Mittelwert")
err = ax.errorbar([0], [0.0], yerr=[[0.0], [0.0]], fmt="none", ecolor="#333333", capsize=6, label="Std")
vline = ax.vlines(0, 0.0, 0.0, colors="#E45756", linewidth=3, label="MinMax")
ax.set_xticks([0])
ax.set_xticklabels(["440 Hz"])
ax.set_ylim(0.0, 1.0)
ax.set_ylabel("Latenz [ms]")
ax.grid(True, axis="y", alpha=0.3)
ax.legend(loc="best")
txt = ax.text(0.02, 0.96, "Confidence: -", transform=ax.transAxes, ha="left", va="top")
txt_mean = ax.text(0.02, 0.90, "Mean: - ms", transform=ax.transAxes, ha="left", va="top")
plt.tight_layout(rect=[0, 0.1, 1, 1])
fig = plt.figure(figsize=(11, 6))
# Leave more space at bottom for a two-row control area
plt.tight_layout(rect=[0, 0.20, 1, 1])
# Scatterplot of latency vs sample index (top-left)
ax_sc = fig.add_axes([0.05, 0.55, 0.62, 0.42])
ax_sc.set_title("Latency over samples", loc="left")
ax_sc.set_xlabel("sample index")
ax_sc.set_ylabel("latency [ms]")
ax_sc.grid(True, axis="both", alpha=0.25)
# Zero reference line
zero_line = ax_sc.axhline(0.0, color="#999999", linewidth=1, alpha=0.6, zorder=0)
# Two series (legacy, cleared each update) and one confidence-graded scatter
sc_valid, = ax_sc.plot([], [], 'o', color="#4C78A8", markersize=6, label="valid")
sc_low, = ax_sc.plot([], [], 'o', markerfacecolor='none', markeredgecolor="#E45756", markersize=6, label="low/invalid")
sc_conf = ax_sc.scatter([], [], c=[], s=24, cmap='viridis_r', vmin=0.0, vmax=1.0, edgecolors='none', alpha=0.9)
# Legend cleanup: show only rolling mean and last sample
leg = ax_sc.legend([ ], [ ], loc="upper right")
SCATTER_WINDOW = [50] # fixed default: number of last points to display
# Rolling mean and std band (initialized empty)
line_mean, = ax_sc.plot([], [], '-', color="#1f77b4", linewidth=1.5, alpha=0.9, label="rolling mean")
band_poly = [None]
# Latest sample highlight
sc_last, = ax_sc.plot([], [], 'o', color="#2ca02c", markersize=7, label="last")
ann_last = ax_sc.text(0, 0, "", va="bottom", ha="left", fontsize=8, color="#2ca02c")
# Add colorbar for confidence
try:
cbar = fig.colorbar(sc_conf, ax=ax_sc, fraction=0.046, pad=0.04)
cbar.set_label('confidence')
except Exception:
pass
# Duplex waveform panel (below scatter, left)
ax_duplex = fig.add_axes([0.05, 0.25, 0.62, 0.25])
ax_duplex.set_title("Duplex stream (time-domain)", loc="left", fontsize=9)
ax_duplex.set_xlabel("time [ms]")
ax_duplex.set_ylabel("amplitude")
ax_duplex.grid(True, axis="both", alpha=0.25)
line_play, = ax_duplex.plot([], [], '-', color="#4C78A8", linewidth=1.0, label="playout")
line_rec, = ax_duplex.plot([], [], '-', color="#E45756", linewidth=1.0, alpha=0.9, label="record")
v_on = ax_duplex.axvline(0.0, color="#2ca02c", linestyle="--", linewidth=1.0, label="onset")
v_t0 = ax_duplex.axvline(0.0, color="#999999", linestyle=":", linewidth=1.0, label="tone start")
ax_duplex.legend(loc="upper right", fontsize=8)
# Visual box background
ax_duplex.set_facecolor("#fcfcff")
try:
ax_duplex.add_patch(mpatches.FancyBboxPatch((0, 0), 1, 1, transform=ax_duplex.transAxes,
boxstyle="round,pad=0.01", facecolor="#f7f9ff",
edgecolor="#c6d3f5", linewidth=0.8, zorder=-1, clip_on=False))
except Exception:
pass
# Terminal-style readout panel (right)
ax_log = fig.add_axes([0.73, 0.30, 0.23, 0.60])
ax_log.set_title("Measurements", loc="center", fontsize=10)
ax_log.axis("off")
log_text = ax_log.text(0.0, 1.0, "", va="top", ha="left", family="monospace", fontsize=8)
LOG_WINDOW = 10 # show last 10 lines; start scrolling after 10
# Visual box
try:
ax_log.add_patch(mpatches.FancyBboxPatch((0, 0), 1, 1, transform=ax_log.transAxes,
boxstyle="round,pad=0.01", facecolor="#fbfbfb",
edgecolor="#dddddd", linewidth=0.8, zorder=-1, clip_on=False))
except Exception:
pass
# Stats panel (move higher and slightly to the right)
ax_stats = fig.add_axes([0.73, 0.60, 0.18, 0.04])
ax_stats.axis("off")
ax_stats.set_title("Stats", loc="left", fontsize=9)
stats_text = ax_stats.text(0.0, 1.0, "", va="top", ha="left", family="monospace", fontsize=8)
try:
ax_stats.add_patch(mpatches.FancyBboxPatch((0, 0), 1, 1, transform=ax_stats.transAxes,
boxstyle="round,pad=0.01", facecolor="#fbfbff",
edgecolor="#dfe6ff", linewidth=0.8, zorder=-1, clip_on=False))
except Exception:
pass
# Hardware/Status panel (just below the moved stats)
ax_hw = fig.add_axes([0.73, 0.46, 0.18, 0.04])
ax_hw.axis("off")
ax_hw.set_title("Hardware", loc="left", fontsize=9)
hw_text = ax_hw.text(0.0, 1.0, "", va="top", ha="left", family="monospace", fontsize=8)
try:
ax_hw.add_patch(mpatches.FancyBboxPatch((0, 0), 1, 1, transform=ax_hw.transAxes,
boxstyle="round,pad=0.01", facecolor="#fbfffb",
edgecolor="#d8f0d8", linewidth=0.8, zorder=-1, clip_on=False))
except Exception:
pass
# Information box (explains the measurement method). Placed above controls.
ax_info = fig.add_axes([0.2, 0.5, 0.5, 0.20])
ax_info.axis("off")
ax_info.set_title("Method", loc="left", fontsize=9)
# runtime I/O info (used below in info text; updated by stream callback later)
io_info = {"blocksize_actual": None}
info_text_str = (
f"Method details:\n"
f"- Signal: 440 Hz sine, dur={dur:.3f}s, pre={pre_silence:.2f}s, post={post_silence:.2f}s, vol={vol:.2f}; 5 ms fade-in/out.\n"
f"- I/O: full-duplex sd.Stream(fs={fs}, ch=1, dtype=float32, blocksize={io_info['blocksize_actual'] if io_info['blocksize_actual'] is not None else (blocksize if blocksize is not None else 'auto')}, latency={iolatency}).\n"
f"- Band-pass (optional): RBJ biquad centered 440 Hz, Q=8; direct-form I.\n"
f"- Pre-whitening: apply x[n]-0.97*x[n-1] on ref and recording.\n"
f"- Normalized xcorr: corr / sqrt(E_x*E_r) over valid lags; take peak index k and value.\n"
f"- Baseline/confidence: median/MAD of pre-silence nxc; z=(peak-med)/(1.4826*MAD+eps); conf=sigmoid(0.5*z).\n"
f"- SNR gate: window RMS vs pre-silence RMS; if <2x then cap conf≤0.2.\n"
f"- Latency (xcorr): ((k - n_pre)/fs)*1000 ms.\n"
f"- Latency (timeinfo): uses PortAudio DAC/ADC timestamps around onset.\n"
f"- Negatives are invalid → shown as NaN; conf_min and 'include low' control filtering.\n"
f"- Display: zero_offset subtracts current mean; rolling mean/std shown over window."
)
info_box = ax_info.text(
0.0, 1.0, info_text_str,
va="top", ha="left", fontsize=14, wrap=True,
bbox=dict(boxstyle="round", facecolor="#f0f6ff", edgecolor="#4C78A8", alpha=0.9)
)
running = threading.Event()
latest_changed = threading.Event()
lock = threading.Lock()
latest_conf = {"value": float("nan")}
last_diag = {}
info_visible = [True]
current_conf_min = [float(conf_min)]
include_low = [False]
zero_offset = [0.0]
# status/xrun counter shared with stream callback
xrun_counter = {"count": 0}
# input RMS meter shared
rms_info = {"rms_dbfs": float('nan'), "clip": False}
# Resolve device names for display
try:
dev_in_name = sd.query_devices(indev)["name"] if indev is not None else sd.query_devices(sd.default.device[0])["name"]
except Exception:
dev_in_name = str(indev)
try:
dev_out_name = sd.query_devices(outdev)["name"] if outdev is not None else sd.query_devices(sd.default.device[1])["name"]
except Exception:
dev_out_name = str(outdev)
def compute_stats():
data = np.array(latencies, dtype=float)
mean = float(np.nanmean(data)) if data.size else 0.0
std = float(np.nanstd(data, ddof=1)) if data.size > 1 else 0.0
vmin = float(np.nanmin(data)) if data.size else 0.0
vmax = float(np.nanmax(data)) if data.size else 0.0
return mean, std, vmin, vmax
conf = np.array(confidences, dtype=float)
if data.size == 0:
return float('nan'), 0.0, 0.0, 0.0, 0
# When include_low is ON, include all finite samples (even negative latencies)
if include_low[0]:
mask = np.isfinite(data)
else:
mask = np.isfinite(data) & (data >= 0.0)
if conf.size == data.size:
mask &= np.isfinite(conf) & (conf >= current_conf_min[0])
valid = data[mask]
if valid.size == 0:
return float('nan'), 0.0, 0.0, 0.0, 0
mean = float(np.nanmean(valid))
std = float(np.nanstd(valid, ddof=1)) if valid.size > 1 else 0.0
vmin = float(np.nanmin(valid))
vmax = float(np.nanmax(valid))
return mean, std, vmin, vmax, int(valid.size)
def remove_errorbar(container):
if container is None:
return
# container has: lines (list[Line2D] or []), caplines (list[Line2D]), barlinecols (list[LineCollection])
for artist in list(getattr(container, 'lines', []) or []):
try:
artist.remove()
except Exception:
pass
for artist in list(getattr(container, 'caplines', []) or []):
try:
artist.remove()
except Exception:
pass
for artist in list(getattr(container, 'barlinecols', []) or []):
try:
artist.remove()
except Exception:
pass
def compute_stats_all():
data = np.array(latencies, dtype=float)
if data.size == 0:
return float('nan'), 0
# 'All' means all finite samples, including negatives
mask = np.isfinite(data)
allv = data[mask]
if allv.size == 0:
return float('nan'), 0
return float(np.nanmean(allv)), int(allv.size)
# (removed old remove_errorbar helper; no longer needed)
def update_plot():
with lock:
mean, std, vmin, vmax = compute_stats()
bar.patches[0].set_height(mean)
# remove previous errorbar artists and create new ones
prev = nonlocal_err[0]
remove_errorbar(prev)
err_lines = ax.errorbar([0], [mean], yerr=[[std], [std]], fmt="none", ecolor="#333333", capsize=6)
nonlocal_err[0] = err_lines
vline.set_segments([[(0, vmin), (0, vmax)]])
c = latest_conf["value"]
if np.isfinite(c):
txt.set_text(f"Confidence: {c:.3f}")
# Update scatterplot with last N points
n = len(latencies)
if n > 0:
start = max(0, n - SCATTER_WINDOW[0])
idx = np.arange(start, n)
y = np.array(latencies[start:n], dtype=float)
# Apply display zero-offset
y = y - zero_offset[0]
c = np.array(confidences[start:n], dtype=float)
finite = np.isfinite(y)
thr = current_conf_min[0]
is_valid = finite & (y >= 0.0) & np.isfinite(c) & (c >= thr)
is_low = finite & ~is_valid
y_plot = y.copy()
y_plot[~finite] = np.nan
# No artificial floor; allow negatives when zero-offset is applied
# Build display mask respecting include_low and conf_min
if include_low[0]:
disp_mask = (np.isfinite(y_plot))
else:
disp_mask = is_valid
# Update confidence-graded scatter
x_disp = idx[disp_mask]
y_disp = y_plot[disp_mask]
c_disp = c[disp_mask]
if c_disp.size > 0:
offs = np.column_stack([x_disp, y_disp])
sc_conf.set_offsets(offs)
sc_conf.set_array(c_disp.astype(float))
# Marker size scaled by confidence
sizes = 16.0 + 36.0 * np.clip(c_disp.astype(float), 0.0, 1.0)
sc_conf.set_sizes(sizes)
else:
sc_conf.set_offsets(np.empty((0, 2)))
sc_conf.set_array(np.array([], dtype=float))
sc_conf.set_sizes(np.array([], dtype=float))
# Clear legacy series to avoid double plotting
sc_valid.set_data([], [])
sc_low.set_data([], [])
# Y-axis: include zero and any negatives (when offset applied)
# Guard all-NaN window: if no finite data, show default axes and clear rolling overlays
if not np.any(np.isfinite(y_plot)):
ax_sc.set_ylim(0.0, 1.0)
ax_sc.set_xlim(max(0, n - SCATTER_WINDOW[0]), max(SCATTER_WINDOW[0], n))
line_mean.set_data([], [])
if band_poly[0] is not None:
try:
band_poly[0].remove()
except Exception:
pass
band_poly[0] = None
sc_last.set_data([], [])
ann_last.set_text("")
else:
y_max = float(np.nanmax(y_plot))
y_min = float(np.nanmin(y_plot))
y_low = min(0.0, y_min)
y_high = max(1.0, y_max)
pad = 0.05 * (y_high - y_low)
ax_sc.set_ylim(y_low, y_high + pad)
# Use nice ticks and a readable formatter
ax_sc.yaxis.set_major_locator(mticker.MaxNLocator(nbins=6))
ax_sc.yaxis.set_major_formatter(mticker.FormatStrFormatter('%.1f'))
ax_sc.set_xlim(max(0, n - SCATTER_WINDOW[0]), max(SCATTER_WINDOW[0], n))
# Rolling mean/std band on displayed window (only if some finite data)
if np.any(np.isfinite(y_plot)):
win = max(3, min(50, int(max(10, SCATTER_WINDOW[0] * 0.05))))
y_roll = y_plot.copy()
m = np.full_like(y_roll, np.nan, dtype=float)
s = np.full_like(y_roll, np.nan, dtype=float)
for k in range(len(y_roll)):
a = max(0, k - win + 1)
seg = y_roll[a:k+1]
seg = seg[np.isfinite(seg)]
if seg.size >= 2:
m[k] = float(np.nanmean(seg))
s[k] = float(np.nanstd(seg, ddof=1))
elif seg.size == 1:
m[k] = float(seg[0])
s[k] = 0.0
line_mean.set_data(idx, m)
if band_poly[0] is not None:
try:
band_poly[0].remove()
except Exception:
pass
upper = m + s
lower = m - s
band_poly[0] = ax_sc.fill_between(idx, lower, upper, color="#1f77b4", alpha=0.15, linewidth=0)
# Latest sample highlight
last_x = idx[-1]
last_y = y_plot[-1] if np.isfinite(y_plot[-1]) else np.nan
if np.isfinite(last_y):
sc_last.set_data([last_x], [last_y])
ann_last.set_position((last_x, last_y))
ann_last.set_text(f"{last_y:.2f} ms")
else:
sc_last.set_data([], [])
ann_last.set_text("")
# Update duplex waveform (left-bottom)
if last_diag:
try:
fs_d = int(last_diag.get("fs", fs))
rec = np.asarray(last_diag.get("rec", []), dtype=float)
play = np.asarray(last_diag.get("play_buf", []), dtype=float)
n_pre = int(last_diag.get("n_pre", 0))
onset_idx = int(last_diag.get("onset_idx", 0))
M = len(last_diag.get("ref", []))
# Choose a window around onset
w_before = max(M // 2, int(0.03 * fs_d))
w_after = max(int(1.5 * M), int(0.06 * fs_d))
s0 = max(0, onset_idx - w_before)
s1 = min(len(rec), onset_idx + w_after)
if s1 > s0:
t_ms = (np.arange(s0, s1) - onset_idx) * 1000.0 / fs_d
y_rec = rec[s0:s1]
y_play = play[s0:s1] if s1 <= len(play) else play[s0:min(s1, len(play))]
# Ensure same length for plotting
if y_play.shape[0] != (s1 - s0):
y_play = np.pad(y_play, (0, (s1 - s0) - y_play.shape[0]), mode='constant')
line_rec.set_data(t_ms, y_rec)
line_play.set_data(t_ms, y_play)
v_on.set_xdata([0.0, 0.0])
t0_ms = (n_pre - onset_idx) * 1000.0 / fs_d
v_t0.set_xdata([t0_ms, t0_ms])
# Y limits with padding
y_min = float(np.nanmin([np.min(y_rec), np.min(y_play)]) if y_rec.size and y_play.size else -1.0)
y_max = float(np.nanmax([np.max(y_rec), np.max(y_play)]) if y_rec.size and y_play.size else 1.0)
if not np.isfinite(y_min) or not np.isfinite(y_max) or y_min == y_max:
y_min, y_max = -1.0, 1.0
pad = 0.05 * (y_max - y_min)
ax_duplex.set_xlim(t_ms[0], t_ms[-1])
ax_duplex.set_ylim(y_min - pad, y_max + pad)
except Exception:
# If anything goes wrong, clear the duplex plot gracefully
line_rec.set_data([], [])
line_play.set_data([], [])
# Update rolling terminal (right)
lines = []
thr = current_conf_min[0]
for i, (lat, conf) in enumerate(zip(latencies, confidences)):
lat_ok = np.isfinite(lat)
conf_ok = np.isfinite(conf) and (conf >= thr)
flag = "OK " if (lat_ok and lat >= 0.0 and conf_ok) else ("LOW" if np.isfinite(conf) else "NA ")
lat_str = f"{lat:8.2f}" if np.isfinite(lat) else " NaN"
conf_str = f"{conf:5.3f}" if np.isfinite(conf) else " NaN"
lines.append(f"{i:04d} | {lat_str} ms | conf={conf_str} | {flag}")
log_text.set_text("\n".join(lines[-LOG_WINDOW:]))
# Update stats panel (respect filters like the scatter). Stats use zero-offset adjusted values.
data_all = np.array(latencies, dtype=float)
conf_all = np.array(confidences, dtype=float)
thr = current_conf_min[0]
if include_low[0]:
msk = np.isfinite(data_all)
else:
txt.set_text("Confidence: -")
if np.isfinite(mean):
txt_mean.set_text(f"Mean: {mean:.2f} ms")
msk = np.isfinite(data_all) & (data_all >= 0.0)
if conf_all.size == data_all.size:
msk &= np.isfinite(conf_all) & (conf_all >= thr)
n_sel = int(np.sum(msk))
if n_sel >= 1:
adj = data_all - zero_offset[0]
mean_val = float(np.nanmean(adj[msk]))
std_val = float(np.nanstd(adj[msk], ddof=1)) if n_sel >= 2 else 0.0
stats_text.set_text(f"N={n_sel} mean={mean_val:.2f} ms std={std_val:.2f} ms")
else:
txt_mean.set_text("Mean: - ms")
ymax = max(1.0, vmax)
ax.set_ylim(0.0, ymax * 1.1)
stats_text.set_text("N=0 mean=-- std=--")
# Update hardware/status panel
hw_lines = [
f"fs={fs} Hz, win={SCATTER_WINDOW[0]}",
f"conf_min={current_conf_min[0]:.2f}, include_low={'on' if include_low[0] else 'off'}",
f"estimator={estimator}",
f"indev={indev} ({dev_in_name})",
f"outdev={outdev} ({dev_out_name})",
f"blocksize={io_info['blocksize_actual'] if io_info['blocksize_actual'] is not None else (blocksize if blocksize is not None else 'auto')}",
f"iolatency={iolatency if iolatency is not None else 'default'}",
f"pre={pre_silence:.2f}s, post={post_silence:.2f}s",
f"xruns={xrun_counter['count']}",
f"inRMS={rms_info['rms_dbfs']:.1f} dBFS, clip={'YES' if rms_info['clip'] else 'no'}",
f"bandpass={'on' if bandpass else 'off'}",
f"zero_offset={zero_offset[0]:.2f} ms"
]
hw_text.set_text("\n".join(hw_lines))
fig.canvas.draw_idle()
def worker():
f = 440.0
while running.is_set():
lat_ms, conf = measure_latency_once(f, fs, dur, vol, indev, outdev)
local_diag = {}
lat_ms, conf = measure_latency_once(
f, fs, dur, vol, indev, outdev,
pre_silence=pre_silence, post_silence=post_silence,
blocksize=blocksize, iolatency=iolatency,
estimator=estimator, xrun_counter=xrun_counter,
bandpass=bandpass, rms_info=rms_info, io_info=io_info, diag=local_diag
)
with lock:
latencies.append(lat_ms)
# Negative latencies are physically impossible -> mark as invalid (NaN)
if np.isfinite(lat_ms) and lat_ms < 0.0:
latencies.append(np.nan)
else:
latencies.append(lat_ms)
confidences.append(conf)
latest_conf["value"] = conf
last_diag.clear()
last_diag.update(local_diag)
latest_changed.set()
def on_start(event):
@@ -237,14 +669,103 @@ def run_gui(fs: int, dur: float, vol: float, indev: int | None, outdev: int | No
def on_stop(event):
running.clear()
start_ax = fig.add_axes([0.58, 0.02, 0.15, 0.06])
stop_ax = fig.add_axes([0.77, 0.02, 0.15, 0.06])
# (removed middle window slider control)
# Controls, two rows (no overlap)
slider_ax = fig.add_axes([0.08, 0.02, 0.46, 0.06])
cbox_ax = fig.add_axes([0.6, 0.02, 0.12, 0.06])
info_ax = fig.add_axes([0.75, 0.02, 0.08, 0.06])
start_ax = fig.add_axes([0.08, 0.10, 0.10, 0.08])
stop_ax = fig.add_axes([0.20, 0.10, 0.10, 0.08])
reset_ax = fig.add_axes([0.32, 0.10, 0.08, 0.08])
save_ax = fig.add_axes([0.42, 0.10, 0.08, 0.08])
zero_ax = fig.add_axes([0.52, 0.10, 0.10, 0.08])
zero_clr_ax = fig.add_axes([0.64, 0.10, 0.12, 0.08])
btn_info = Button(info_ax, "Info")
btn_start = Button(start_ax, "Start")
btn_stop = Button(stop_ax, "Stop")
btn_reset = Button(reset_ax, "Clr")
btn_save = Button(save_ax, "Save")
btn_zero = Button(zero_ax, "Zero")
btn_zero_clr = Button(zero_clr_ax, "ZeroClr")
btn_start.on_clicked(on_start)
btn_stop.on_clicked(on_stop)
def on_info(event):
info_visible[0] = not info_visible[0]
ax_info.set_visible(info_visible[0])
fig.canvas.draw_idle()
btn_info.on_clicked(on_info)
nonlocal_err = [err]
# Now create Slider and CheckButtons after their axes exist
slider = Slider(slider_ax, 'conf_min', 0.0, 1.0, valinit=current_conf_min[0], valstep=0.01)
def on_slider(val):
current_conf_min[0] = float(val)
update_plot()
slider.on_changed(on_slider)
cbox = CheckButtons(cbox_ax, ["include low"], [include_low[0]])
def on_cbox(label):
include_low[0] = not include_low[0]
update_plot()
cbox.on_clicked(on_cbox)
def on_reset(event):
with lock:
latencies.clear()
confidences.clear()
latest_conf["value"] = float("nan")
zero_offset[0] = 0.0
update_plot()
btn_reset.on_clicked(on_reset)
def on_zero(event):
# Set zero_offset to current mean of selected (masked) samples
data_all = np.array(latencies, dtype=float)
conf_all = np.array(confidences, dtype=float)
thr = current_conf_min[0]
if include_low[0]:
msk = np.isfinite(data_all)
else:
msk = np.isfinite(data_all) & (data_all >= 0.0)
if conf_all.size == data_all.size:
msk &= np.isfinite(conf_all) & (conf_all >= thr)
if np.any(msk):
zero_offset[0] = float(np.nanmean(data_all[msk]))
else:
zero_offset[0] = 0.0
update_plot()
btn_zero.on_clicked(on_zero)
def on_zero_clr(event):
zero_offset[0] = 0.0
update_plot()
btn_zero_clr.on_clicked(on_zero_clr)
def on_save(event):
# Save current measurements to CSV with timestamped filename
ts = datetime.now().strftime('%Y%m%d_%H%M%S')
fname = f"latency_{ts}.csv"
try:
with open(fname, 'w', encoding='utf-8') as fcsv:
fcsv.write("# latency_440 export\n")
fcsv.write(f"# fs,{fs}\n")
fcsv.write(f"# blocksize,{blocksize}\n")
fcsv.write(f"# iolatency,{iolatency}\n")
fcsv.write(f"# estimator,{estimator}\n")
fcsv.write(f"# pre_silence,{pre_silence}\n")
fcsv.write(f"# post_silence,{post_silence}\n")
fcsv.write(f"# bandpass,{bandpass}\n")
fcsv.write("index,latency_ms,confidence\n")
with lock:
for i, (lat, conf) in enumerate(zip(latencies, confidences)):
lv = '' if not np.isfinite(lat) else f"{lat:.6f}"
cv = '' if not np.isfinite(conf) else f"{conf:.6f}"
fcsv.write(f"{i},{lv},{cv}\n")
except Exception as e:
print(f"Save failed: {e}")
btn_save.on_clicked(on_save)
# (no old errorbar state to keep)
timer = fig.canvas.new_timer(interval=200)
def on_timer():
@@ -264,10 +785,20 @@ def main():
ap.add_argument("-v", "--volume", type=float, default=0.6, help="Lautstärke 0..1")
ap.add_argument("--indev", type=int, default=None, help="Input-Geräteindex")
ap.add_argument("--outdev", type=int, default=None, help="Output-Geräteindex")
ap.add_argument("--conf-min", type=float, default=0.3, help="Warnschwelle für Confidence")
ap.add_argument("--conf-min", type=float, default=0.9, help="Warnschwelle für Confidence")
ap.add_argument("--blocksize", type=int, default=None, help="Audio blocksize (frames), e.g. 1024/2048")
ap.add_argument("--iolatency", type=str, default="high", help="Audio I/O latency (seconds or preset: 'low','high')")
ap.add_argument("--estimator", type=str, choices=["xcorr","timeinfo"], default="xcorr", help="Latency estimator: 'xcorr' (default, robust) or 'timeinfo' (host timestamps)")
ap.add_argument("--pre-silence", type=float, default=0.30, help="Pre-silence before tone (s)")
ap.add_argument("--post-silence", type=float, default=0.60, help="Post-silence after tone (s)")
ap.add_argument("--bandpass", action='store_true', default=True, help="Apply 440 Hz band-pass before correlation")
ap.add_argument("--no-bandpass", dest='bandpass', action='store_false', help="Disable band-pass prefilter")
args = ap.parse_args()
run_gui(args.samplerate, args.time, args.volume, args.indev, args.outdev, args.conf_min)
run_gui(args.samplerate, args.time, args.volume, args.indev, args.outdev,
args.conf_min, blocksize=args.blocksize, iolatency=args.iolatency,
estimator=args.estimator, pre_silence=args.pre_silence,
post_silence=args.post_silence, bandpass=args.bandpass)
if __name__ == "__main__":
main()