#!/usr/bin/env python3 import argparse import threading import numpy as np import sounddevice as sd import matplotlib matplotlib.use("TkAgg") import matplotlib.pyplot as plt from matplotlib.widgets import Button, Slider def generate_log_sweep(f_start: float, f_end: float, dur_s: float, fs: int, volume: float, pre_silence: float = 0.10, post_silence: float = 0.20) -> tuple[np.ndarray, np.ndarray, int, int]: n_pre = int(pre_silence * fs) n_tone = int(dur_s * fs) n_post = int(post_silence * fs) t = np.arange(n_tone, dtype=np.float64) / fs r = float(f_end) / float(f_start) k = np.log(r) phase = 2.0 * np.pi * f_start * (np.exp((t / dur_s) * k) - 1.0) * (dur_s / k) sweep = np.sin(phase).astype(np.float32) fade_n = max(1, int(0.005 * fs)) if fade_n > 0 and fade_n * 2 < n_tone: w = np.ones_like(sweep) w[:fade_n] *= np.linspace(0, 1, fade_n, endpoint=False, dtype=np.float32) w[-fade_n:] *= np.linspace(1, 0, fade_n, endpoint=False, dtype=np.float32) sweep *= w sweep *= float(volume) out = np.concatenate([ np.zeros(n_pre, dtype=np.float32), sweep, np.zeros(n_post, dtype=np.float32) ]) return out, sweep, n_pre, n_pre + n_tone def map_time_to_freq_log(t: np.ndarray, f_start: float, f_end: float, dur_s: float) -> np.ndarray: r = float(f_end) / float(f_start) return f_start * np.exp((t / dur_s) * np.log(r)) def analyze_rms_vs_freq(rec: np.ndarray, fs: int, start_idx: int, end_idx: int, dur_s: float, f_start: float, f_end: float, frame: int = 2048, hop: int = 1024, eps: float = 1e-12) -> tuple[np.ndarray, np.ndarray, np.ndarray]: x = rec.astype(np.float32) x = x[start_idx:end_idx] if x.size < frame: return np.array([], dtype=np.float32), np.array([], dtype=np.float32) n = x.size centers = [] vals = [] confs = [] i = 0 while i + frame <= n: seg = x[i:i+frame] rms = float(np.sqrt(np.mean(seg.astype(np.float64) ** 2))) vals.append(20.0 * np.log10(max(rms, eps))) c = (i + frame * 0.5) / fs centers.append(c) # Confidence via spectral peak prominence near expected frequency win = np.hanning(len(seg)).astype(np.float64) segw = seg.astype(np.float64) * win # next power of two for speed nfft = 1 << (int(len(segw) - 1).bit_length()) spec = np.fft.rfft(segw, n=nfft) mag = np.abs(spec) # Expected bin t_here = c f_exp = float(map_time_to_freq_log(np.array([t_here]), f_start, f_end, dur_s)[0]) bin_exp = int(np.clip(np.round(f_exp / fs * nfft), 1, (nfft // 2))) b0 = max(1, bin_exp - 2) b1 = min(nfft // 2, bin_exp + 2) peak = float(np.max(mag[b0:b1+1])) if b1 >= b0 else float(mag[bin_exp]) noise = float(np.median(mag[1:])) if mag.size > 1 else peak snr_db = 20.0 * np.log10((peak + eps) / (noise + eps)) conf = float(np.clip((snr_db - 6.0) / 24.0, 0.0, 1.0)) # 6..30 dB -> 0..1 confs.append(conf) i += hop t = np.array(centers, dtype=np.float64) f = map_time_to_freq_log(t, f_start, f_end, dur_s).astype(np.float32) db = np.array(vals, dtype=np.float32) confa = np.array(confs, dtype=np.float32) return f, db, confa def run_gui(fs: int, dur: float, vol: float, indev: int | None, outdev: int | None, f_start: float = 20.0, f_end: float = 20000.0, frame: int = 2048, hop: int = 1024, blocksize: int | None = None, iolatency: float | str | None = "high"): freqs: list[float] = [] dbfs: list[float] = [] confs: list[float] = [] fig = plt.figure(figsize=(10, 6)) plt.tight_layout(rect=[0, 0.16, 1, 1]) ax_sc = fig.add_axes([0.08, 0.22, 0.84, 0.72]) ax_sc.set_title("Sweep capture: dBFS vs frequency", loc="left") ax_sc.set_xlabel("frequency [Hz]") ax_sc.set_ylabel("level [dBFS]") ax_sc.set_xscale("log") ax_sc.grid(True, which="both", axis="both", alpha=0.25) sc = ax_sc.scatter([], [], c=[], cmap="viridis", vmin=0.0, vmax=1.0, s=18, alpha=0.9) cbar = fig.colorbar(sc, ax=ax_sc, pad=0.02) cbar.set_label("confidence") busy = threading.Event() latest_changed = threading.Event() lock = threading.Lock() current_vol = [float(vol)] def update_plot(): with lock: if len(freqs) == 0: ax_sc.set_xlim(f_start, f_end) ax_sc.set_ylim(-100.0, 0.0) sc.set_offsets(np.empty((0, 2))) sc.set_array(np.array([])) else: f = np.array(freqs, dtype=float) y = np.array(dbfs, dtype=float) xy = np.column_stack([f, y]) sc.set_offsets(xy) sc.set_array(np.array(confs, dtype=float)) ax_sc.set_xlim(max(10.0, np.nanmin(f)), min(24000.0, np.nanmax(f))) ymin = np.nanpercentile(y, 2) if np.isfinite(y).any() else -100.0 ymax = np.nanpercentile(y, 98) if np.isfinite(y).any() else 0.0 if not np.isfinite(ymin): ymin = -100.0 if not np.isfinite(ymax): ymax = 0.0 if ymax - ymin < 10.0: ymin = min(ymin, -100.0) ymax = max(ymax, 0.0) pad = 0.05 * (ymax - ymin) ax_sc.set_ylim(ymin - pad, ymax + pad) fig.canvas.draw_idle() def run_one_sweep(): play_buf, ref, start_idx, end_idx = generate_log_sweep( f_start, f_end, dur, fs, current_vol[0], pre_silence=0.10, post_silence=0.20 ) record_buf = [] written = 0 def cb(indata, outdata, frames, time_info, status): nonlocal written if status: print(status, flush=True) chunk = play_buf[written:written+frames] out = np.zeros((frames,), dtype=np.float32) if len(chunk) > 0: out[:len(chunk)] = chunk outdata[:] = out.reshape(-1, 1) record_buf.append(indata.copy().reshape(-1)) written += frames stream_kwargs = dict(samplerate=fs, dtype="float32", channels=1) if indev is not None or outdev is not None: stream_kwargs["device"] = (indev, outdev) if blocksize is not None: stream_kwargs["blocksize"] = int(blocksize) if iolatency is not None: stream_kwargs["latency"] = iolatency with sd.Stream(callback=cb, **stream_kwargs): sd.sleep(int(1000 * (len(play_buf) / fs))) sd.sleep(100) if not record_buf: return np.array([], dtype=np.float32), np.array([], dtype=np.float32), np.array([], dtype=np.float32) rec = np.concatenate(record_buf).astype(np.float32) f, y, c = analyze_rms_vs_freq(rec, fs, start_idx, end_idx, dur, f_start, f_end, frame=frame, hop=hop) return f, y, c def on_sweep(event): if busy.is_set(): return busy.set() btn_sweep.label.set_text("Running…") def task(): try: f, y, c = run_one_sweep() with lock: if f.size > 0: freqs.extend(list(f)) dbfs.extend(list(y)) confs.extend(list(c)) latest_changed.set() finally: busy.clear() btn_sweep.label.set_text("Sweep") threading.Thread(target=task, daemon=True).start() sweep_ax = fig.add_axes([0.79, 0.05, 0.15, 0.07]) btn_sweep = Button(sweep_ax, "Sweep") btn_sweep.on_clicked(on_sweep) vol_ax = fig.add_axes([0.08, 0.05, 0.45, 0.07]) vol_slider = Slider(vol_ax, 'volume', 0.0, 1.0, valinit=current_vol[0], valstep=0.01) def on_vol(val): current_vol[0] = float(val) vol_slider.on_changed(on_vol) timer = fig.canvas.new_timer(interval=250) def on_timer(): if latest_changed.is_set(): latest_changed.clear() update_plot() timer.add_callback(on_timer) timer.start() update_plot() plt.show() def main(): ap = argparse.ArgumentParser(description="Frequency sweep capture: plot recorded dBFS vs frequency") ap.add_argument("--samplerate", "-r", type=int, default=48_000, help="Sample rate") ap.add_argument("--time", "-t", type=float, default=8.0, help="Sweep duration (s)") ap.add_argument("--volume", "-v", type=float, default=0.5, help="Playback volume 0..1") ap.add_argument("--indev", type=int, default=None, help="Input device index") ap.add_argument("--outdev", type=int, default=None, help="Output device index") ap.add_argument("--f-start", type=float, default=20.0, help="Sweep start frequency (Hz)") ap.add_argument("--f-end", type=float, default=20000.0, help="Sweep end frequency (Hz)") ap.add_argument("--frame", type=int, default=2048, help="RMS window size (samples)") ap.add_argument("--hop", type=int, default=1024, help="RMS hop size (samples)") ap.add_argument("--blocksize", type=int, default=None, help="Audio blocksize (frames)") ap.add_argument("--iolatency", type=str, default="high", help="Audio I/O latency preset or seconds") args = ap.parse_args() run_gui( fs=args.samplerate, dur=args.time, vol=args.volume, indev=args.indev, outdev=args.outdev, f_start=args.f_start, f_end=args.f_end, frame=args.frame, hop=args.hop, blocksize=args.blocksize, iolatency=args.iolatency, ) if __name__ == "__main__": main()