Files
latency_test_suit/frequency_quality.py

252 lines
9.6 KiB
Python

#!/usr/bin/env python3
import argparse
import threading
import numpy as np
import sounddevice as sd
import matplotlib
matplotlib.use("TkAgg")
import matplotlib.pyplot as plt
from matplotlib.widgets import Button, Slider
def generate_log_sweep(f_start: float, f_end: float, dur_s: float, fs: int, volume: float,
pre_silence: float = 0.10, post_silence: float = 0.20) -> tuple[np.ndarray, np.ndarray, int, int]:
n_pre = int(pre_silence * fs)
n_tone = int(dur_s * fs)
n_post = int(post_silence * fs)
t = np.arange(n_tone, dtype=np.float64) / fs
r = float(f_end) / float(f_start)
k = np.log(r)
phase = 2.0 * np.pi * f_start * (np.exp((t / dur_s) * k) - 1.0) * (dur_s / k)
sweep = np.sin(phase).astype(np.float32)
fade_n = max(1, int(0.005 * fs))
if fade_n > 0 and fade_n * 2 < n_tone:
w = np.ones_like(sweep)
w[:fade_n] *= np.linspace(0, 1, fade_n, endpoint=False, dtype=np.float32)
w[-fade_n:] *= np.linspace(1, 0, fade_n, endpoint=False, dtype=np.float32)
sweep *= w
sweep *= float(volume)
out = np.concatenate([
np.zeros(n_pre, dtype=np.float32),
sweep,
np.zeros(n_post, dtype=np.float32)
])
return out, sweep, n_pre, n_pre + n_tone
def map_time_to_freq_log(t: np.ndarray, f_start: float, f_end: float, dur_s: float) -> np.ndarray:
r = float(f_end) / float(f_start)
return f_start * np.exp((t / dur_s) * np.log(r))
def analyze_rms_vs_freq(rec: np.ndarray, fs: int, start_idx: int, end_idx: int,
dur_s: float, f_start: float, f_end: float,
frame: int = 2048, hop: int = 1024,
eps: float = 1e-12) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
x = rec.astype(np.float32)
x = x[start_idx:end_idx]
if x.size < frame:
return np.array([], dtype=np.float32), np.array([], dtype=np.float32)
n = x.size
centers = []
vals = []
confs = []
i = 0
while i + frame <= n:
seg = x[i:i+frame]
rms = float(np.sqrt(np.mean(seg.astype(np.float64) ** 2)))
vals.append(20.0 * np.log10(max(rms, eps)))
c = (i + frame * 0.5) / fs
centers.append(c)
# Confidence via spectral peak prominence near expected frequency
win = np.hanning(len(seg)).astype(np.float64)
segw = seg.astype(np.float64) * win
# next power of two for speed
nfft = 1 << (int(len(segw) - 1).bit_length())
spec = np.fft.rfft(segw, n=nfft)
mag = np.abs(spec)
# Expected bin
t_here = c
f_exp = float(map_time_to_freq_log(np.array([t_here]), f_start, f_end, dur_s)[0])
bin_exp = int(np.clip(np.round(f_exp / fs * nfft), 1, (nfft // 2)))
b0 = max(1, bin_exp - 2)
b1 = min(nfft // 2, bin_exp + 2)
peak = float(np.max(mag[b0:b1+1])) if b1 >= b0 else float(mag[bin_exp])
noise = float(np.median(mag[1:])) if mag.size > 1 else peak
snr_db = 20.0 * np.log10((peak + eps) / (noise + eps))
conf = float(np.clip((snr_db - 6.0) / 24.0, 0.0, 1.0)) # 6..30 dB -> 0..1
confs.append(conf)
i += hop
t = np.array(centers, dtype=np.float64)
f = map_time_to_freq_log(t, f_start, f_end, dur_s).astype(np.float32)
db = np.array(vals, dtype=np.float32)
confa = np.array(confs, dtype=np.float32)
return f, db, confa
def run_gui(fs: int, dur: float, vol: float, indev: int | None, outdev: int | None,
f_start: float = 20.0, f_end: float = 20000.0,
frame: int = 2048, hop: int = 1024,
blocksize: int | None = None, iolatency: float | str | None = "high"):
freqs: list[float] = []
dbfs: list[float] = []
confs: list[float] = []
fig = plt.figure(figsize=(10, 6))
plt.tight_layout(rect=[0, 0.16, 1, 1])
ax_sc = fig.add_axes([0.08, 0.22, 0.84, 0.72])
ax_sc.set_title("Sweep capture: dBFS vs frequency", loc="left")
ax_sc.set_xlabel("frequency [Hz]")
ax_sc.set_ylabel("level [dBFS]")
ax_sc.set_xscale("log")
ax_sc.grid(True, which="both", axis="both", alpha=0.25)
sc = ax_sc.scatter([], [], c=[], cmap="viridis", vmin=0.0, vmax=1.0, s=18, alpha=0.9)
cbar = fig.colorbar(sc, ax=ax_sc, pad=0.02)
cbar.set_label("confidence")
busy = threading.Event()
latest_changed = threading.Event()
lock = threading.Lock()
current_vol = [float(vol)]
def update_plot():
with lock:
if len(freqs) == 0:
ax_sc.set_xlim(f_start, f_end)
ax_sc.set_ylim(-100.0, 0.0)
sc.set_offsets(np.empty((0, 2)))
sc.set_array(np.array([]))
else:
f = np.array(freqs, dtype=float)
y = np.array(dbfs, dtype=float)
xy = np.column_stack([f, y])
sc.set_offsets(xy)
sc.set_array(np.array(confs, dtype=float))
ax_sc.set_xlim(max(10.0, np.nanmin(f)), min(24000.0, np.nanmax(f)))
ymin = np.nanpercentile(y, 2) if np.isfinite(y).any() else -100.0
ymax = np.nanpercentile(y, 98) if np.isfinite(y).any() else 0.0
if not np.isfinite(ymin):
ymin = -100.0
if not np.isfinite(ymax):
ymax = 0.0
if ymax - ymin < 10.0:
ymin = min(ymin, -100.0)
ymax = max(ymax, 0.0)
pad = 0.05 * (ymax - ymin)
ax_sc.set_ylim(ymin - pad, ymax + pad)
fig.canvas.draw_idle()
def run_one_sweep():
play_buf, ref, start_idx, end_idx = generate_log_sweep(
f_start, f_end, dur, fs, current_vol[0], pre_silence=0.10, post_silence=0.20
)
record_buf = []
written = 0
def cb(indata, outdata, frames, time_info, status):
nonlocal written
if status:
print(status, flush=True)
chunk = play_buf[written:written+frames]
out = np.zeros((frames,), dtype=np.float32)
if len(chunk) > 0:
out[:len(chunk)] = chunk
outdata[:] = out.reshape(-1, 1)
record_buf.append(indata.copy().reshape(-1))
written += frames
stream_kwargs = dict(samplerate=fs, dtype="float32", channels=1)
if indev is not None or outdev is not None:
stream_kwargs["device"] = (indev, outdev)
if blocksize is not None:
stream_kwargs["blocksize"] = int(blocksize)
if iolatency is not None:
stream_kwargs["latency"] = iolatency
with sd.Stream(callback=cb, **stream_kwargs):
sd.sleep(int(1000 * (len(play_buf) / fs)))
sd.sleep(100)
if not record_buf:
return np.array([], dtype=np.float32), np.array([], dtype=np.float32), np.array([], dtype=np.float32)
rec = np.concatenate(record_buf).astype(np.float32)
f, y, c = analyze_rms_vs_freq(rec, fs, start_idx, end_idx, dur, f_start, f_end, frame=frame, hop=hop)
return f, y, c
def on_sweep(event):
if busy.is_set():
return
busy.set()
btn_sweep.label.set_text("Running…")
def task():
try:
f, y, c = run_one_sweep()
with lock:
if f.size > 0:
freqs.extend(list(f))
dbfs.extend(list(y))
confs.extend(list(c))
latest_changed.set()
finally:
busy.clear()
btn_sweep.label.set_text("Sweep")
threading.Thread(target=task, daemon=True).start()
sweep_ax = fig.add_axes([0.79, 0.05, 0.15, 0.07])
btn_sweep = Button(sweep_ax, "Sweep")
btn_sweep.on_clicked(on_sweep)
vol_ax = fig.add_axes([0.08, 0.05, 0.45, 0.07])
vol_slider = Slider(vol_ax, 'volume', 0.0, 1.0, valinit=current_vol[0], valstep=0.01)
def on_vol(val):
current_vol[0] = float(val)
vol_slider.on_changed(on_vol)
timer = fig.canvas.new_timer(interval=250)
def on_timer():
if latest_changed.is_set():
latest_changed.clear()
update_plot()
timer.add_callback(on_timer)
timer.start()
update_plot()
plt.show()
def main():
ap = argparse.ArgumentParser(description="Frequency sweep capture: plot recorded dBFS vs frequency")
ap.add_argument("--samplerate", "-r", type=int, default=48_000, help="Sample rate")
ap.add_argument("--time", "-t", type=float, default=8.0, help="Sweep duration (s)")
ap.add_argument("--volume", "-v", type=float, default=0.5, help="Playback volume 0..1")
ap.add_argument("--indev", type=int, default=None, help="Input device index")
ap.add_argument("--outdev", type=int, default=None, help="Output device index")
ap.add_argument("--f-start", type=float, default=20.0, help="Sweep start frequency (Hz)")
ap.add_argument("--f-end", type=float, default=20000.0, help="Sweep end frequency (Hz)")
ap.add_argument("--frame", type=int, default=2048, help="RMS window size (samples)")
ap.add_argument("--hop", type=int, default=1024, help="RMS hop size (samples)")
ap.add_argument("--blocksize", type=int, default=None, help="Audio blocksize (frames)")
ap.add_argument("--iolatency", type=str, default="high", help="Audio I/O latency preset or seconds")
args = ap.parse_args()
run_gui(
fs=args.samplerate,
dur=args.time,
vol=args.volume,
indev=args.indev,
outdev=args.outdev,
f_start=args.f_start,
f_end=args.f_end,
frame=args.frame,
hop=args.hop,
blocksize=args.blocksize,
iolatency=args.iolatency,
)
if __name__ == "__main__":
main()