diff --git a/frequency_quality.py b/frequency_quality.py new file mode 100644 index 0000000..0037f24 --- /dev/null +++ b/frequency_quality.py @@ -0,0 +1,251 @@ +#!/usr/bin/env python3 +import argparse +import threading +import numpy as np +import sounddevice as sd +import matplotlib +matplotlib.use("TkAgg") +import matplotlib.pyplot as plt +from matplotlib.widgets import Button, Slider + + +def generate_log_sweep(f_start: float, f_end: float, dur_s: float, fs: int, volume: float, + pre_silence: float = 0.10, post_silence: float = 0.20) -> tuple[np.ndarray, np.ndarray, int, int]: + n_pre = int(pre_silence * fs) + n_tone = int(dur_s * fs) + n_post = int(post_silence * fs) + t = np.arange(n_tone, dtype=np.float64) / fs + r = float(f_end) / float(f_start) + k = np.log(r) + phase = 2.0 * np.pi * f_start * (np.exp((t / dur_s) * k) - 1.0) * (dur_s / k) + sweep = np.sin(phase).astype(np.float32) + fade_n = max(1, int(0.005 * fs)) + if fade_n > 0 and fade_n * 2 < n_tone: + w = np.ones_like(sweep) + w[:fade_n] *= np.linspace(0, 1, fade_n, endpoint=False, dtype=np.float32) + w[-fade_n:] *= np.linspace(1, 0, fade_n, endpoint=False, dtype=np.float32) + sweep *= w + sweep *= float(volume) + out = np.concatenate([ + np.zeros(n_pre, dtype=np.float32), + sweep, + np.zeros(n_post, dtype=np.float32) + ]) + return out, sweep, n_pre, n_pre + n_tone + + +def map_time_to_freq_log(t: np.ndarray, f_start: float, f_end: float, dur_s: float) -> np.ndarray: + r = float(f_end) / float(f_start) + return f_start * np.exp((t / dur_s) * np.log(r)) + + +def analyze_rms_vs_freq(rec: np.ndarray, fs: int, start_idx: int, end_idx: int, + dur_s: float, f_start: float, f_end: float, + frame: int = 2048, hop: int = 1024, + eps: float = 1e-12) -> tuple[np.ndarray, np.ndarray, np.ndarray]: + x = rec.astype(np.float32) + x = x[start_idx:end_idx] + if x.size < frame: + return np.array([], dtype=np.float32), np.array([], dtype=np.float32) + n = x.size + centers = [] + vals = [] + confs = [] + i = 0 + while i + frame <= n: + seg = x[i:i+frame] + rms = float(np.sqrt(np.mean(seg.astype(np.float64) ** 2))) + vals.append(20.0 * np.log10(max(rms, eps))) + c = (i + frame * 0.5) / fs + centers.append(c) + # Confidence via spectral peak prominence near expected frequency + win = np.hanning(len(seg)).astype(np.float64) + segw = seg.astype(np.float64) * win + # next power of two for speed + nfft = 1 << (int(len(segw) - 1).bit_length()) + spec = np.fft.rfft(segw, n=nfft) + mag = np.abs(spec) + # Expected bin + t_here = c + f_exp = float(map_time_to_freq_log(np.array([t_here]), f_start, f_end, dur_s)[0]) + bin_exp = int(np.clip(np.round(f_exp / fs * nfft), 1, (nfft // 2))) + b0 = max(1, bin_exp - 2) + b1 = min(nfft // 2, bin_exp + 2) + peak = float(np.max(mag[b0:b1+1])) if b1 >= b0 else float(mag[bin_exp]) + noise = float(np.median(mag[1:])) if mag.size > 1 else peak + snr_db = 20.0 * np.log10((peak + eps) / (noise + eps)) + conf = float(np.clip((snr_db - 6.0) / 24.0, 0.0, 1.0)) # 6..30 dB -> 0..1 + confs.append(conf) + i += hop + t = np.array(centers, dtype=np.float64) + f = map_time_to_freq_log(t, f_start, f_end, dur_s).astype(np.float32) + db = np.array(vals, dtype=np.float32) + confa = np.array(confs, dtype=np.float32) + return f, db, confa + + +def run_gui(fs: int, dur: float, vol: float, indev: int | None, outdev: int | None, + f_start: float = 20.0, f_end: float = 20000.0, + frame: int = 2048, hop: int = 1024, + blocksize: int | None = None, iolatency: float | str | None = "high"): + freqs: list[float] = [] + dbfs: list[float] = [] + confs: list[float] = [] + + fig = plt.figure(figsize=(10, 6)) + plt.tight_layout(rect=[0, 0.16, 1, 1]) + + ax_sc = fig.add_axes([0.08, 0.22, 0.84, 0.72]) + ax_sc.set_title("Sweep capture: dBFS vs frequency", loc="left") + ax_sc.set_xlabel("frequency [Hz]") + ax_sc.set_ylabel("level [dBFS]") + ax_sc.set_xscale("log") + ax_sc.grid(True, which="both", axis="both", alpha=0.25) + sc = ax_sc.scatter([], [], c=[], cmap="viridis", vmin=0.0, vmax=1.0, s=18, alpha=0.9) + cbar = fig.colorbar(sc, ax=ax_sc, pad=0.02) + cbar.set_label("confidence") + + busy = threading.Event() + latest_changed = threading.Event() + lock = threading.Lock() + + current_vol = [float(vol)] + + def update_plot(): + with lock: + if len(freqs) == 0: + ax_sc.set_xlim(f_start, f_end) + ax_sc.set_ylim(-100.0, 0.0) + sc.set_offsets(np.empty((0, 2))) + sc.set_array(np.array([])) + else: + f = np.array(freqs, dtype=float) + y = np.array(dbfs, dtype=float) + xy = np.column_stack([f, y]) + sc.set_offsets(xy) + sc.set_array(np.array(confs, dtype=float)) + ax_sc.set_xlim(max(10.0, np.nanmin(f)), min(24000.0, np.nanmax(f))) + ymin = np.nanpercentile(y, 2) if np.isfinite(y).any() else -100.0 + ymax = np.nanpercentile(y, 98) if np.isfinite(y).any() else 0.0 + if not np.isfinite(ymin): + ymin = -100.0 + if not np.isfinite(ymax): + ymax = 0.0 + if ymax - ymin < 10.0: + ymin = min(ymin, -100.0) + ymax = max(ymax, 0.0) + pad = 0.05 * (ymax - ymin) + ax_sc.set_ylim(ymin - pad, ymax + pad) + fig.canvas.draw_idle() + + def run_one_sweep(): + play_buf, ref, start_idx, end_idx = generate_log_sweep( + f_start, f_end, dur, fs, current_vol[0], pre_silence=0.10, post_silence=0.20 + ) + record_buf = [] + written = 0 + + def cb(indata, outdata, frames, time_info, status): + nonlocal written + if status: + print(status, flush=True) + chunk = play_buf[written:written+frames] + out = np.zeros((frames,), dtype=np.float32) + if len(chunk) > 0: + out[:len(chunk)] = chunk + outdata[:] = out.reshape(-1, 1) + record_buf.append(indata.copy().reshape(-1)) + written += frames + + stream_kwargs = dict(samplerate=fs, dtype="float32", channels=1) + if indev is not None or outdev is not None: + stream_kwargs["device"] = (indev, outdev) + if blocksize is not None: + stream_kwargs["blocksize"] = int(blocksize) + if iolatency is not None: + stream_kwargs["latency"] = iolatency + + with sd.Stream(callback=cb, **stream_kwargs): + sd.sleep(int(1000 * (len(play_buf) / fs))) + sd.sleep(100) + + if not record_buf: + return np.array([], dtype=np.float32), np.array([], dtype=np.float32), np.array([], dtype=np.float32) + rec = np.concatenate(record_buf).astype(np.float32) + f, y, c = analyze_rms_vs_freq(rec, fs, start_idx, end_idx, dur, f_start, f_end, frame=frame, hop=hop) + return f, y, c + def on_sweep(event): + if busy.is_set(): + return + busy.set() + btn_sweep.label.set_text("Running…") + def task(): + try: + f, y, c = run_one_sweep() + with lock: + if f.size > 0: + freqs.extend(list(f)) + dbfs.extend(list(y)) + confs.extend(list(c)) + latest_changed.set() + finally: + busy.clear() + btn_sweep.label.set_text("Sweep") + threading.Thread(target=task, daemon=True).start() + + sweep_ax = fig.add_axes([0.79, 0.05, 0.15, 0.07]) + btn_sweep = Button(sweep_ax, "Sweep") + btn_sweep.on_clicked(on_sweep) + + vol_ax = fig.add_axes([0.08, 0.05, 0.45, 0.07]) + vol_slider = Slider(vol_ax, 'volume', 0.0, 1.0, valinit=current_vol[0], valstep=0.01) + + def on_vol(val): + current_vol[0] = float(val) + vol_slider.on_changed(on_vol) + + timer = fig.canvas.new_timer(interval=250) + def on_timer(): + if latest_changed.is_set(): + latest_changed.clear() + update_plot() + timer.add_callback(on_timer) + timer.start() + + update_plot() + plt.show() + + +def main(): + ap = argparse.ArgumentParser(description="Frequency sweep capture: plot recorded dBFS vs frequency") + ap.add_argument("--samplerate", "-r", type=int, default=48_000, help="Sample rate") + ap.add_argument("--time", "-t", type=float, default=8.0, help="Sweep duration (s)") + ap.add_argument("--volume", "-v", type=float, default=0.5, help="Playback volume 0..1") + ap.add_argument("--indev", type=int, default=None, help="Input device index") + ap.add_argument("--outdev", type=int, default=None, help="Output device index") + ap.add_argument("--f-start", type=float, default=20.0, help="Sweep start frequency (Hz)") + ap.add_argument("--f-end", type=float, default=20000.0, help="Sweep end frequency (Hz)") + ap.add_argument("--frame", type=int, default=2048, help="RMS window size (samples)") + ap.add_argument("--hop", type=int, default=1024, help="RMS hop size (samples)") + ap.add_argument("--blocksize", type=int, default=None, help="Audio blocksize (frames)") + ap.add_argument("--iolatency", type=str, default="high", help="Audio I/O latency preset or seconds") + args = ap.parse_args() + + run_gui( + fs=args.samplerate, + dur=args.time, + vol=args.volume, + indev=args.indev, + outdev=args.outdev, + f_start=args.f_start, + f_end=args.f_end, + frame=args.frame, + hop=args.hop, + blocksize=args.blocksize, + iolatency=args.iolatency, + ) + + +if __name__ == "__main__": + main() + diff --git a/latency_440.py b/latency_440.py index 0f6e12e..9ea6fa2 100644 --- a/latency_440.py +++ b/latency_440.py @@ -7,8 +7,11 @@ import sounddevice as sd import matplotlib matplotlib.use("TkAgg") # GUI-Ausgabe für interaktives Fenster import matplotlib.pyplot as plt -from matplotlib.widgets import Button +import matplotlib.ticker as mticker +from matplotlib.widgets import Button, Slider, CheckButtons import threading +import time +from datetime import datetime # ---------- Audio/Signal-Helfer ---------- @dataclass @@ -53,11 +56,43 @@ def detect_onset_xcorr(signal: np.ndarray, ref: np.ndarray): conf = float(nxc[k]) return k, nxc, conf +# Simple biquad band-pass (RBJ cookbook) and direct-form I filter +def design_biquad_bandpass(fs: float, f0: float, Q: float) -> tuple[np.ndarray, np.ndarray]: + w0 = 2.0 * np.pi * (f0 / fs) + alpha = np.sin(w0) / (2.0 * Q) + b0 = Q * alpha + b1 = 0.0 + b2 = -Q * alpha + a0 = 1.0 + alpha + a1 = -2.0 * np.cos(w0) + a2 = 1.0 - alpha + # Normalize + b = np.array([b0/a0, b1/a0, b2/a0], dtype=np.float64) + a = np.array([1.0, a1/a0, a2/a0], dtype=np.float64) + return b, a + +def lfilter_biquad(b: np.ndarray, a: np.ndarray, x: np.ndarray) -> np.ndarray: + y = np.zeros_like(x, dtype=np.float64) + x1 = x2 = y1 = y2 = 0.0 + b0, b1, b2 = float(b[0]), float(b[1]), float(b[2]) + a1, a2 = float(a[1]), float(a[2]) + for i in range(len(x)): + xi = float(x[i]) + yi = b0*xi + b1*x1 + b2*x2 - a1*y1 - a2*y2 + y[i] = yi + x2, x1 = x1, xi + y2, y1 = y1, yi + return y.astype(np.float32) + def measure_latency_once(freq_hz: float, fs: int, dur_s: float, volume: float, indev: int | None, outdev: int | None, - pre_silence: float = 0.20, post_silence: float = 0.40): + pre_silence: float = 0.20, post_silence: float = 0.40, + blocksize: int | None = None, iolatency: float | str | None = None, + estimator: str = "xcorr", xrun_counter: dict | None = None, + bandpass: bool = True, rms_info: dict | None = None, + io_info: dict | None = None): """Spielt einen Ton, nimmt parallel auf, schätzt Latenz in ms, gibt Confidence zurück.""" - play_buf, ref, _ = generate_tone(freq_hz, dur_s, fs, volume, pre_silence, post_silence) + play_buf, ref, n_pre = generate_tone(freq_hz, dur_s, fs, volume, pre_silence, post_silence) record_buf = [] written = 0 times = Times() @@ -67,6 +102,27 @@ def measure_latency_once(freq_hz: float, fs: int, dur_s: float, volume: float, if status: # Ausgabe nur informativ; Xruns etc. beeinflussen Latenz print(status, flush=True) + if xrun_counter is not None: + try: + xrun_counter["count"] = int(xrun_counter.get("count", 0)) + 1 + except Exception: + pass + # Input RMS/clip + if rms_info is not None: + try: + rms = float(np.sqrt(np.mean(np.square(indata.astype(np.float64))))) + peak = float(np.max(np.abs(indata))) + rms_db = -120.0 if rms <= 1e-9 else 20.0 * np.log10(rms) + rms_info["rms_dbfs"] = rms_db + rms_info["clip"] = bool(peak >= 0.999) + except Exception: + pass + # Report actual frames-per-buffer used by the stream + if io_info is not None: + try: + io_info["blocksize_actual"] = int(frames) + except Exception: + pass if times.adc_first_time is None: times.adc_first_time = time_info.inputBufferAdcTime @@ -86,19 +142,38 @@ def measure_latency_once(freq_hz: float, fs: int, dur_s: float, volume: float, stream_kwargs = dict(samplerate=fs, dtype="float32", channels=1) if indev is not None or outdev is not None: stream_kwargs["device"] = (indev, outdev) + if blocksize is not None: + stream_kwargs["blocksize"] = int(blocksize) + if iolatency is not None: + stream_kwargs["latency"] = iolatency with sd.Stream(callback=cb, **stream_kwargs): sd.sleep(int(1000 * (len(play_buf) / fs))) sd.sleep(200) - if times.adc_first_time is None or times.dac_first_time is None or not record_buf: + if not record_buf: return np.nan, 0.0 rec = np.concatenate(record_buf).astype(np.float32) + # Optional band-pass around the test tone to increase SNR + if bandpass: + try: + b, a = design_biquad_bandpass(fs=float(fs), f0=float(freq_hz), Q=8.0) + rec = lfilter_biquad(b, a, rec) + except Exception: + pass onset_idx, _, conf = detect_onset_xcorr(rec, ref) - adc_detect_time = times.adc_first_time + onset_idx / fs - latency_ms = (adc_detect_time - times.dac_first_time) * 1000.0 - return float(latency_ms), conf + + if estimator == "timeinfo": + if times.adc_first_time is None or times.dac_first_time is None: + return np.nan, conf + adc_detect_time = times.adc_first_time + onset_idx / fs + latency_ms = (adc_detect_time - times.dac_first_time) * 1000.0 + return float(latency_ms), conf + else: # "xcorr" (default) + latency_samples = max(0, int(onset_idx) - int(n_pre)) + latency_ms = (latency_samples / fs) * 1000.0 + return float(latency_ms), conf # ---------- 440-Hz-Runner & Einzel-Balken-Plot ---------- def run_440(repeats, fs, dur, vol, indev, outdev, conf_min): @@ -141,87 +216,286 @@ def plot_single_bar(latencies: list[float]): plt.tight_layout() plt.show() -def run_gui(fs: int, dur: float, vol: float, indev: int | None, outdev: int | None, conf_min: float): +def run_gui(fs: int, dur: float, vol: float, indev: int | None, outdev: int | None, + conf_min: float, blocksize: int | None = None, iolatency: float | str | None = None, + estimator: str = "xcorr", pre_silence: float = 0.20, post_silence: float = 0.40, + bandpass: bool = True): latencies: list[float] = [] confidences: list[float] = [] - fig, ax = plt.subplots(figsize=(5, 6)) - ax.set_title("Latenz bei 440 Hz") - bar = ax.bar([0], [0.0], color="#4C78A8", width=0.6, label="Mittelwert") - err = ax.errorbar([0], [0.0], yerr=[[0.0], [0.0]], fmt="none", ecolor="#333333", capsize=6, label="Std") - vline = ax.vlines(0, 0.0, 0.0, colors="#E45756", linewidth=3, label="Min–Max") - ax.set_xticks([0]) - ax.set_xticklabels(["440 Hz"]) - ax.set_ylim(0.0, 1.0) - ax.set_ylabel("Latenz [ms]") - ax.grid(True, axis="y", alpha=0.3) - ax.legend(loc="best") - txt = ax.text(0.02, 0.96, "Confidence: -", transform=ax.transAxes, ha="left", va="top") - txt_mean = ax.text(0.02, 0.90, "Mean: - ms", transform=ax.transAxes, ha="left", va="top") - plt.tight_layout(rect=[0, 0.1, 1, 1]) + fig = plt.figure(figsize=(11, 6)) + # Leave space at bottom for controls; scatter on left, terminal on right + plt.tight_layout(rect=[0, 0.16, 1, 1]) + + # Scatterplot of latency vs sample index (left) + ax_sc = fig.add_axes([0.08, 0.22, 0.62, 0.72]) + ax_sc.set_title("Latency over samples", loc="left") + ax_sc.set_xlabel("sample index") + ax_sc.set_ylabel("latency [ms]") + ax_sc.grid(True, axis="both", alpha=0.25) + # Zero reference line + zero_line = ax_sc.axhline(0.0, color="#999999", linewidth=1, alpha=0.6, zorder=0) + # Two series (legacy, cleared each update) and one confidence-graded scatter + sc_valid, = ax_sc.plot([], [], 'o', color="#4C78A8", markersize=6, label="valid") + sc_low, = ax_sc.plot([], [], 'o', markerfacecolor='none', markeredgecolor="#E45756", markersize=6, label="low/invalid") + sc_conf = ax_sc.scatter([], [], c=[], s=24, cmap='viridis_r', vmin=0.0, vmax=1.0, edgecolors='none', alpha=0.9) + # Legend cleanup: show only rolling mean and last sample + leg = ax_sc.legend([ ], [ ], loc="upper right") + SCATTER_WINDOW = [50] # fixed default: number of last points to display + # Rolling mean and std band (initialized empty) + line_mean, = ax_sc.plot([], [], '-', color="#1f77b4", linewidth=1.5, alpha=0.9, label="rolling mean") + band_poly = [None] + # Latest sample highlight + sc_last, = ax_sc.plot([], [], 'o', color="#2ca02c", markersize=7, label="last") + ann_last = ax_sc.text(0, 0, "", va="bottom", ha="left", fontsize=8, color="#2ca02c") + # Add colorbar for confidence + try: + cbar = fig.colorbar(sc_conf, ax=ax_sc, fraction=0.046, pad=0.04) + cbar.set_label('confidence') + except Exception: + pass + + # Terminal-style readout panel (right) + ax_log = fig.add_axes([0.73, 0.30, 0.23, 0.60]) + ax_log.set_title("Measurements", loc="center", fontsize=9) + ax_log.axis("off") + log_text = ax_log.text(0.0, 1.0, "", va="top", ha="left", family="monospace", fontsize=8) + LOG_WINDOW = 10 # show last 10 lines; start scrolling after 10 + + # Stats panel (immediately below terminal) + ax_stats = fig.add_axes([0.73, 0.32, 0.23, 0.02]) + ax_stats.axis("off") + stats_text = ax_stats.text(0.0, 1.0, "", va="top", ha="left", family="monospace", fontsize=8) + + # Hardware/Status panel (directly below stats) + ax_hw = fig.add_axes([0.73, 0.28, 0.23, 0.02]) + ax_hw.axis("off") + hw_text = ax_hw.text(0.0, 1.0, "", va="top", ha="left", family="monospace", fontsize=8) running = threading.Event() latest_changed = threading.Event() lock = threading.Lock() latest_conf = {"value": float("nan")} + current_conf_min = [float(conf_min)] + include_low = [False] + # status/xrun counter shared with stream callback + xrun_counter = {"count": 0} + # input RMS meter shared + rms_info = {"rms_dbfs": float('nan'), "clip": False} + # runtime I/O info + io_info = {"blocksize_actual": None} + + # Resolve device names for display + try: + dev_in_name = sd.query_devices(indev)["name"] if indev is not None else sd.query_devices(sd.default.device[0])["name"] + except Exception: + dev_in_name = str(indev) + try: + dev_out_name = sd.query_devices(outdev)["name"] if outdev is not None else sd.query_devices(sd.default.device[1])["name"] + except Exception: + dev_out_name = str(outdev) + def compute_stats(): data = np.array(latencies, dtype=float) - mean = float(np.nanmean(data)) if data.size else 0.0 - std = float(np.nanstd(data, ddof=1)) if data.size > 1 else 0.0 - vmin = float(np.nanmin(data)) if data.size else 0.0 - vmax = float(np.nanmax(data)) if data.size else 0.0 - return mean, std, vmin, vmax + conf = np.array(confidences, dtype=float) + if data.size == 0: + return float('nan'), 0.0, 0.0, 0.0, 0 + # When include_low is ON, include all finite samples (even negative latencies) + if include_low[0]: + mask = np.isfinite(data) + else: + mask = np.isfinite(data) & (data >= 0.0) + if conf.size == data.size: + mask &= np.isfinite(conf) & (conf >= current_conf_min[0]) + valid = data[mask] + if valid.size == 0: + return float('nan'), 0.0, 0.0, 0.0, 0 + mean = float(np.nanmean(valid)) + std = float(np.nanstd(valid, ddof=1)) if valid.size > 1 else 0.0 + vmin = float(np.nanmin(valid)) + vmax = float(np.nanmax(valid)) + return mean, std, vmin, vmax, int(valid.size) - def remove_errorbar(container): - if container is None: - return - # container has: lines (list[Line2D] or []), caplines (list[Line2D]), barlinecols (list[LineCollection]) - for artist in list(getattr(container, 'lines', []) or []): - try: - artist.remove() - except Exception: - pass - for artist in list(getattr(container, 'caplines', []) or []): - try: - artist.remove() - except Exception: - pass - for artist in list(getattr(container, 'barlinecols', []) or []): - try: - artist.remove() - except Exception: - pass + def compute_stats_all(): + data = np.array(latencies, dtype=float) + if data.size == 0: + return float('nan'), 0 + # 'All' means all finite samples, including negatives + mask = np.isfinite(data) + allv = data[mask] + if allv.size == 0: + return float('nan'), 0 + return float(np.nanmean(allv)), int(allv.size) + + # (removed old remove_errorbar helper; no longer needed) def update_plot(): with lock: - mean, std, vmin, vmax = compute_stats() - bar.patches[0].set_height(mean) - # remove previous errorbar artists and create new ones - prev = nonlocal_err[0] - remove_errorbar(prev) - err_lines = ax.errorbar([0], [mean], yerr=[[std], [std]], fmt="none", ecolor="#333333", capsize=6) - nonlocal_err[0] = err_lines - vline.set_segments([[(0, vmin), (0, vmax)]]) - c = latest_conf["value"] - if np.isfinite(c): - txt.set_text(f"Confidence: {c:.3f}") + # Update scatterplot with last N points + n = len(latencies) + if n > 0: + start = max(0, n - SCATTER_WINDOW[0]) + idx = np.arange(start, n) + y = np.array(latencies[start:n], dtype=float) + c = np.array(confidences[start:n], dtype=float) + finite = np.isfinite(y) + thr = current_conf_min[0] + is_valid = finite & (y >= 0.0) & np.isfinite(c) & (c >= thr) + is_low = finite & ~is_valid + y_plot = y.copy() + y_plot[~finite] = np.nan + # Visual floor epsilon to avoid points collapsing exactly at 0 + eps = 0.1 # ms + y_plot = np.maximum(y_plot, eps) + # Build display mask respecting include_low and conf_min + if include_low[0]: + disp_mask = (np.isfinite(y_plot)) + else: + disp_mask = is_valid + # Update confidence-graded scatter + x_disp = idx[disp_mask] + y_disp = y_plot[disp_mask] + c_disp = c[disp_mask] + if c_disp.size > 0: + offs = np.column_stack([x_disp, y_disp]) + sc_conf.set_offsets(offs) + sc_conf.set_array(c_disp.astype(float)) + # Marker size scaled by confidence + sizes = 16.0 + 36.0 * np.clip(c_disp.astype(float), 0.0, 1.0) + sc_conf.set_sizes(sizes) + else: + sc_conf.set_offsets(np.empty((0, 2))) + sc_conf.set_array(np.array([], dtype=float)) + sc_conf.set_sizes(np.array([], dtype=float)) + # Clear legacy series to avoid double plotting + sc_valid.set_data([], []) + sc_low.set_data([], []) + # Y-axis starts at 0 with small padding above max + # Guard all-NaN window: if no finite data, show default axes and clear rolling overlays + if not np.any(np.isfinite(y_plot)): + ax_sc.set_ylim(0.0, 1.0) + ax_sc.set_xlim(max(0, n - SCATTER_WINDOW[0]), max(SCATTER_WINDOW[0], n)) + line_mean.set_data([], []) + if band_poly[0] is not None: + try: + band_poly[0].remove() + except Exception: + pass + band_poly[0] = None + sc_last.set_data([], []) + ann_last.set_text("") + else: + y_max = float(np.nanmax(y_plot)) + y_low = 0.0 + y_high = max(1.0, y_max) + pad = 0.05 * (y_high - y_low) + ax_sc.set_ylim(y_low, y_high + pad) + # Use nice ticks and a readable formatter + ax_sc.yaxis.set_major_locator(mticker.MaxNLocator(nbins=6)) + ax_sc.yaxis.set_major_formatter(mticker.FormatStrFormatter('%.1f')) + ax_sc.set_xlim(max(0, n - SCATTER_WINDOW[0]), max(SCATTER_WINDOW[0], n)) + + # Rolling mean/std band on displayed window (only if some finite data) + if np.any(np.isfinite(y_plot)): + win = max(3, min(50, int(max(10, SCATTER_WINDOW[0] * 0.05)))) + y_roll = y_plot.copy() + m = np.full_like(y_roll, np.nan, dtype=float) + s = np.full_like(y_roll, np.nan, dtype=float) + for k in range(len(y_roll)): + a = max(0, k - win + 1) + seg = y_roll[a:k+1] + seg = seg[np.isfinite(seg)] + if seg.size >= 2: + m[k] = float(np.nanmean(seg)) + s[k] = float(np.nanstd(seg, ddof=1)) + elif seg.size == 1: + m[k] = float(seg[0]) + s[k] = 0.0 + line_mean.set_data(idx, m) + if band_poly[0] is not None: + try: + band_poly[0].remove() + except Exception: + pass + upper = m + s + lower = np.maximum(0.0, m - s) + band_poly[0] = ax_sc.fill_between(idx, lower, upper, color="#1f77b4", alpha=0.15, linewidth=0) + + # Latest sample highlight + last_x = idx[-1] + last_y = y_plot[-1] if np.isfinite(y_plot[-1]) else np.nan + if np.isfinite(last_y): + sc_last.set_data([last_x], [last_y]) + ann_last.set_position((last_x, last_y)) + ann_last.set_text(f"{last_y:.2f} ms") + else: + sc_last.set_data([], []) + ann_last.set_text("") + + # Update rolling terminal (right) + lines = [] + thr = current_conf_min[0] + for i, (lat, conf) in enumerate(zip(latencies, confidences)): + lat_ok = np.isfinite(lat) + conf_ok = np.isfinite(conf) and (conf >= thr) + flag = "OK " if (lat_ok and lat >= 0.0 and conf_ok) else ("LOW" if np.isfinite(conf) else "NA ") + lat_str = f"{lat:8.2f}" if np.isfinite(lat) else " NaN" + conf_str = f"{conf:5.3f}" if np.isfinite(conf) else " NaN" + lines.append(f"{i:04d} | {lat_str} ms | conf={conf_str} | {flag}") + log_text.set_text("\n".join(lines[-LOG_WINDOW:])) + + # Update stats panel (respect filters like the scatter) + data_all = np.array(latencies, dtype=float) + conf_all = np.array(confidences, dtype=float) + thr = current_conf_min[0] + if include_low[0]: + msk = np.isfinite(data_all) else: - txt.set_text("Confidence: -") - if np.isfinite(mean): - txt_mean.set_text(f"Mean: {mean:.2f} ms") + msk = np.isfinite(data_all) & (data_all >= 0.0) + if conf_all.size == data_all.size: + msk &= np.isfinite(conf_all) & (conf_all >= thr) + n_sel = int(np.sum(msk)) + if n_sel >= 1: + mean_val = float(np.nanmean(data_all[msk])) + std_val = float(np.nanstd(data_all[msk], ddof=1)) if n_sel >= 2 else 0.0 + stats_text.set_text(f"N={n_sel} mean={mean_val:.2f} ms std={std_val:.2f} ms") else: - txt_mean.set_text("Mean: - ms") - ymax = max(1.0, vmax) - ax.set_ylim(0.0, ymax * 1.1) + stats_text.set_text("N=0 mean=-- std=--") + + # Update hardware/status panel + hw_lines = [ + f"fs={fs} Hz, win={SCATTER_WINDOW[0]}", + f"conf_min={current_conf_min[0]:.2f}, include_low={'on' if include_low[0] else 'off'}", + f"estimator={estimator}", + f"indev={indev} ({dev_in_name})", + f"outdev={outdev} ({dev_out_name})", + f"blocksize={io_info['blocksize_actual'] if io_info['blocksize_actual'] is not None else (blocksize if blocksize is not None else 'auto')}", + f"iolatency={iolatency if iolatency is not None else 'default'}", + f"pre={pre_silence:.2f}s, post={post_silence:.2f}s", + f"xruns={xrun_counter['count']}", + f"inRMS={rms_info['rms_dbfs']:.1f} dBFS, clip={'YES' if rms_info['clip'] else 'no'}", + f"bandpass={'on' if bandpass else 'off'}" + ] + hw_text.set_text("\n".join(hw_lines)) fig.canvas.draw_idle() def worker(): f = 440.0 while running.is_set(): - lat_ms, conf = measure_latency_once(f, fs, dur, vol, indev, outdev) + lat_ms, conf = measure_latency_once( + f, fs, dur, vol, indev, outdev, + pre_silence=pre_silence, post_silence=post_silence, + blocksize=blocksize, iolatency=iolatency, + estimator=estimator, xrun_counter=xrun_counter, + bandpass=bandpass, rms_info=rms_info, io_info=io_info + ) with lock: - latencies.append(lat_ms) + # Negative latencies are physically impossible -> mark as invalid (NaN) + if np.isfinite(lat_ms) and lat_ms < 0.0: + latencies.append(np.nan) + else: + latencies.append(lat_ms) confidences.append(conf) latest_conf["value"] = conf latest_changed.set() @@ -237,14 +511,68 @@ def run_gui(fs: int, dur: float, vol: float, indev: int | None, outdev: int | No def on_stop(event): running.clear() - start_ax = fig.add_axes([0.58, 0.02, 0.15, 0.06]) - stop_ax = fig.add_axes([0.77, 0.02, 0.15, 0.06]) + # Slider for confidence threshold + slider_ax = fig.add_axes([0.10, 0.02, 0.32, 0.05]) + slider = Slider(slider_ax, 'conf_min', 0.0, 1.0, valinit=current_conf_min[0], valstep=0.01) + + def on_slider(val): + current_conf_min[0] = float(val) + update_plot() + slider.on_changed(on_slider) + + # Checkbox to include low-confidence samples (placed next to conf_min slider) + cbox_ax = fig.add_axes([0.45, 0.02, 0.12, 0.05]) + cbox = CheckButtons(cbox_ax, ["include low"], [include_low[0]]) + def on_cbox(label): + include_low[0] = not include_low[0] + update_plot() + cbox.on_clicked(on_cbox) + + # (removed middle window slider control) + + start_ax = fig.add_axes([0.54, 0.02, 0.13, 0.06]) + stop_ax = fig.add_axes([0.69, 0.02, 0.13, 0.06]) + reset_ax = fig.add_axes([0.84, 0.02, 0.06, 0.06]) + save_ax = fig.add_axes([0.92, 0.02, 0.06, 0.06]) btn_start = Button(start_ax, "Start") btn_stop = Button(stop_ax, "Stop") + btn_reset = Button(reset_ax, "Clr") + btn_save = Button(save_ax, "Save") btn_start.on_clicked(on_start) btn_stop.on_clicked(on_stop) + def on_reset(event): + with lock: + latencies.clear() + confidences.clear() + latest_conf["value"] = float("nan") + update_plot() + btn_reset.on_clicked(on_reset) - nonlocal_err = [err] + def on_save(event): + # Save current measurements to CSV with timestamped filename + ts = datetime.now().strftime('%Y%m%d_%H%M%S') + fname = f"latency_{ts}.csv" + try: + with open(fname, 'w', encoding='utf-8') as fcsv: + fcsv.write("# latency_440 export\n") + fcsv.write(f"# fs,{fs}\n") + fcsv.write(f"# blocksize,{blocksize}\n") + fcsv.write(f"# iolatency,{iolatency}\n") + fcsv.write(f"# estimator,{estimator}\n") + fcsv.write(f"# pre_silence,{pre_silence}\n") + fcsv.write(f"# post_silence,{post_silence}\n") + fcsv.write(f"# bandpass,{bandpass}\n") + fcsv.write("index,latency_ms,confidence\n") + with lock: + for i, (lat, conf) in enumerate(zip(latencies, confidences)): + lv = '' if not np.isfinite(lat) else f"{lat:.6f}" + cv = '' if not np.isfinite(conf) else f"{conf:.6f}" + fcsv.write(f"{i},{lv},{cv}\n") + except Exception as e: + print(f"Save failed: {e}") + btn_save.on_clicked(on_save) + + # (no old errorbar state to keep) timer = fig.canvas.new_timer(interval=200) def on_timer(): @@ -265,9 +593,19 @@ def main(): ap.add_argument("--indev", type=int, default=None, help="Input-Geräteindex") ap.add_argument("--outdev", type=int, default=None, help="Output-Geräteindex") ap.add_argument("--conf-min", type=float, default=0.3, help="Warnschwelle für Confidence") + ap.add_argument("--blocksize", type=int, default=None, help="Audio blocksize (frames), e.g. 1024/2048") + ap.add_argument("--iolatency", type=str, default="high", help="Audio I/O latency (seconds or preset: 'low','high')") + ap.add_argument("--estimator", type=str, choices=["xcorr","timeinfo"], default="xcorr", help="Latency estimator: 'xcorr' (default, robust) or 'timeinfo' (host timestamps)") + ap.add_argument("--pre-silence", type=float, default=0.30, help="Pre-silence before tone (s)") + ap.add_argument("--post-silence", type=float, default=0.60, help="Post-silence after tone (s)") + ap.add_argument("--bandpass", action='store_true', default=True, help="Apply 440 Hz band-pass before correlation") + ap.add_argument("--no-bandpass", dest='bandpass', action='store_false', help="Disable band-pass prefilter") args = ap.parse_args() - run_gui(args.samplerate, args.time, args.volume, args.indev, args.outdev, args.conf_min) + run_gui(args.samplerate, args.time, args.volume, args.indev, args.outdev, + args.conf_min, blocksize=args.blocksize, iolatency=args.iolatency, + estimator=args.estimator, pre_silence=args.pre_silence, + post_silence=args.post_silence, bandpass=args.bandpass) if __name__ == "__main__": main()