252 lines
9.6 KiB
Python
252 lines
9.6 KiB
Python
#!/usr/bin/env python3
|
|
import argparse
|
|
import threading
|
|
import numpy as np
|
|
import sounddevice as sd
|
|
import matplotlib
|
|
matplotlib.use("TkAgg")
|
|
import matplotlib.pyplot as plt
|
|
from matplotlib.widgets import Button, Slider
|
|
|
|
|
|
def generate_log_sweep(f_start: float, f_end: float, dur_s: float, fs: int, volume: float,
                       pre_silence: float = 0.10, post_silence: float = 0.20) -> tuple[np.ndarray, np.ndarray, int, int]:
    """Build a logarithmic sine sweep padded with leading and trailing silence.

    Args:
        f_start: Frequency at the start of the sweep, in Hz.
        f_end: Frequency at the end of the sweep, in Hz. May equal ``f_start``,
            in which case a constant tone is produced.
        dur_s: Duration of the sweep itself, in seconds.
        fs: Sample rate in Hz.
        volume: Linear gain applied to the sweep.
        pre_silence: Seconds of silence placed before the sweep.
        post_silence: Seconds of silence placed after the sweep.

    Returns:
        ``(out, sweep, start_idx, end_idx)`` where ``out`` is the full float32
        playback buffer, ``sweep`` is the float32 sweep without padding, and
        ``out[start_idx:end_idx]`` is the sweep region.
    """
    n_pre = int(pre_silence * fs)
    n_tone = int(dur_s * fs)
    n_post = int(post_silence * fs)

    t = np.arange(n_tone, dtype=np.float64) / fs
    k = np.log(float(f_end) / float(f_start))
    if k == 0.0:
        # f_start == f_end: the exponential formula degenerates to 0 * inf
        # (NaN). Its limit for k -> 0 is a plain constant-frequency phase.
        phase = 2.0 * np.pi * f_start * t
    else:
        phase = 2.0 * np.pi * f_start * (np.exp((t / dur_s) * k) - 1.0) * (dur_s / k)
    sweep = np.sin(phase).astype(np.float32)

    # 5 ms linear fade in/out, skipped when the tone is too short to hold both ramps.
    fade_n = max(1, int(0.005 * fs))
    if fade_n * 2 < n_tone:
        w = np.ones_like(sweep)
        w[:fade_n] *= np.linspace(0, 1, fade_n, endpoint=False, dtype=np.float32)
        w[-fade_n:] *= np.linspace(1, 0, fade_n, endpoint=False, dtype=np.float32)
        sweep *= w
    sweep *= float(volume)

    out = np.concatenate([
        np.zeros(n_pre, dtype=np.float32),
        sweep,
        np.zeros(n_post, dtype=np.float32),
    ])
    return out, sweep, n_pre, n_pre + n_tone
|
|
|
|
|
|
def map_time_to_freq_log(t: np.ndarray, f_start: float, f_end: float, dur_s: float) -> np.ndarray:
    """Return the instantaneous frequency of a log sweep at time(s) ``t``.

    Args:
        t: Time(s) in seconds measured from the start of the sweep. Any
            array-like (ndarray, list, scalar) is accepted.
        f_start: Sweep start frequency in Hz.
        f_end: Sweep end frequency in Hz.
        dur_s: Sweep duration in seconds.

    Returns:
        ``f_start * (f_end / f_start) ** (t / dur_s)``, with the same shape as ``t``.
    """
    ratio = float(f_end) / float(f_start)
    # np.asarray is a no-op for ndarrays and lets plain lists/scalars through.
    rel_t = np.asarray(t) / dur_s
    return f_start * np.exp(rel_t * np.log(ratio))
|
|
|
|
|
|
def analyze_rms_vs_freq(rec: np.ndarray, fs: int, start_idx: int, end_idx: int,
                        dur_s: float, f_start: float, f_end: float,
                        frame: int = 2048, hop: int = 1024,
                        eps: float = 1e-12) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Measure framed RMS level of a recorded log sweep against sweep frequency.

    The slice ``rec[start_idx:end_idx]`` is cut into frames of ``frame`` samples
    advanced by ``hop`` samples. Each frame yields its RMS level in dB, the
    sweep frequency expected at the frame centre, and a confidence value
    derived from how far the spectral peak near that frequency rises above the
    median spectrum magnitude.

    Args:
        rec: Recorded samples.
        fs: Sample rate in Hz.
        start_idx: Index in ``rec`` where the sweep starts.
        end_idx: Index in ``rec`` where the sweep ends (exclusive).
        dur_s: Sweep duration in seconds.
        f_start: Sweep start frequency in Hz.
        f_end: Sweep end frequency in Hz.
        frame: Analysis window length in samples.
        hop: Step between consecutive windows in samples.
        eps: Floor used to keep the logarithms finite.

    Returns:
        ``(freqs, level_db, confidence)`` as float32 arrays of equal length.
        All three are empty when the slice is shorter than one frame.

    Raises:
        ValueError: If ``frame`` or ``hop`` is not positive.
    """
    if frame <= 0 or hop <= 0:
        # A non-positive hop would never advance the frame loop.
        raise ValueError("frame and hop must be positive")

    x = np.asarray(rec, dtype=np.float32)[start_idx:end_idx]
    if x.size < frame:
        # Always return three arrays so callers can unpack unconditionally.
        return (np.array([], dtype=np.float32),
                np.array([], dtype=np.float32),
                np.array([], dtype=np.float32))

    n = x.size
    # Loop invariants: every frame has the same length.
    win = np.hanning(frame).astype(np.float64)
    nfft = 1 << int(frame - 1).bit_length()  # next power of two >= frame
    half = nfft // 2

    centers = []
    vals = []
    confs = []
    for i in range(0, n - frame + 1, hop):
        seg = x[i:i + frame].astype(np.float64)
        rms = float(np.sqrt(np.mean(seg ** 2)))
        vals.append(20.0 * np.log10(max(rms, eps)))
        c = (i + frame * 0.5) / fs
        centers.append(c)

        # Confidence via spectral peak prominence near the expected frequency.
        mag = np.abs(np.fft.rfft(seg * win, n=nfft))
        f_exp = float(map_time_to_freq_log(np.array([c]), f_start, f_end, dur_s)[0])
        bin_exp = int(np.clip(np.round(f_exp / fs * nfft), 1, half))
        b0 = max(1, bin_exp - 2)
        b1 = min(half, bin_exp + 2)
        # b1 < b0 only happens for a degenerate one-sample frame (half == 0).
        peak = float(np.max(mag[b0:b1 + 1])) if b1 >= b0 else float(mag[bin_exp])
        noise = float(np.median(mag[1:])) if mag.size > 1 else peak
        snr_db = 20.0 * np.log10((peak + eps) / (noise + eps))
        confs.append(float(np.clip((snr_db - 6.0) / 24.0, 0.0, 1.0)))  # 6..30 dB -> 0..1

    t = np.array(centers, dtype=np.float64)
    f = map_time_to_freq_log(t, f_start, f_end, dur_s).astype(np.float32)
    db = np.array(vals, dtype=np.float32)
    confa = np.array(confs, dtype=np.float32)
    return f, db, confa
|
|
|
|
|
|
def run_gui(fs: int, dur: float, vol: float, indev: int | None, outdev: int | None,
            f_start: float = 20.0, f_end: float = 20000.0,
            frame: int = 2048, hop: int = 1024,
            blocksize: int | None = None, iolatency: float | str | None = "high"):
    """Show a window that plays log sweeps and scatter-plots the recorded level.

    Each click on the "Sweep" button plays one sweep on a background thread
    while recording the input, analyses the recording with
    ``analyze_rms_vs_freq`` and appends the points to the scatter plot, which
    is coloured by confidence. Points from successive sweeps accumulate.

    Args:
        fs: Sample rate in Hz.
        dur: Sweep duration in seconds.
        vol: Initial playback volume; adjustable with the slider.
        indev: Input device index, or None.
        outdev: Output device index, or None.
        f_start: Sweep start frequency in Hz.
        f_end: Sweep end frequency in Hz.
        frame: Analysis window length in samples.
        hop: Analysis hop in samples.
        blocksize: Audio block size in frames; omitted from the stream
            arguments when None.
        iolatency: Value passed as the stream's ``latency``; omitted when None.
    """
    freqs: list[float] = []
    dbfs: list[float] = []
    confs: list[float] = []

    fig = plt.figure(figsize=(10, 6))
    plt.tight_layout(rect=[0, 0.16, 1, 1])

    ax_sc = fig.add_axes([0.08, 0.22, 0.84, 0.72])
    ax_sc.set_title("Sweep capture: dBFS vs frequency", loc="left")
    ax_sc.set_xlabel("frequency [Hz]")
    ax_sc.set_ylabel("level [dBFS]")
    ax_sc.set_xscale("log")
    ax_sc.grid(True, which="both", axis="both", alpha=0.25)
    sc = ax_sc.scatter([], [], c=[], cmap="viridis", vmin=0.0, vmax=1.0, s=18, alpha=0.9)
    cbar = fig.colorbar(sc, ax=ax_sc, pad=0.02)
    cbar.set_label("confidence")

    busy = threading.Event()            # a sweep is in progress
    latest_changed = threading.Event()  # new points are waiting to be drawn
    sweep_done = threading.Event()      # worker finished; GUI must reset the button
    lock = threading.Lock()             # guards freqs / dbfs / confs

    # One-element list so the slider callback can update the value in place.
    current_vol = [float(vol)]

    def update_plot():
        """Push the accumulated points into the scatter artist and rescale axes."""
        with lock:
            if not freqs:
                ax_sc.set_xlim(f_start, f_end)
                ax_sc.set_ylim(-100.0, 0.0)
                sc.set_offsets(np.empty((0, 2)))
                sc.set_array(np.array([]))
            else:
                f = np.array(freqs, dtype=float)
                y = np.array(dbfs, dtype=float)
                sc.set_offsets(np.column_stack([f, y]))
                sc.set_array(np.array(confs, dtype=float))
                ax_sc.set_xlim(max(10.0, np.nanmin(f)), min(24000.0, np.nanmax(f)))
                has_finite = np.isfinite(y).any()
                ymin = np.nanpercentile(y, 2) if has_finite else -100.0
                ymax = np.nanpercentile(y, 98) if has_finite else 0.0
                if not np.isfinite(ymin):
                    ymin = -100.0
                if not np.isfinite(ymax):
                    ymax = 0.0
                if ymax - ymin < 10.0:
                    # Too narrow a span: fall back to the full dBFS range.
                    ymin = min(ymin, -100.0)
                    ymax = max(ymax, 0.0)
                pad = 0.05 * (ymax - ymin)
                ax_sc.set_ylim(ymin - pad, ymax + pad)
        fig.canvas.draw_idle()

    def run_one_sweep():
        """Play one sweep while recording and return (freqs, dB, confidence)."""
        play_buf, _, start_idx, end_idx = generate_log_sweep(
            f_start, f_end, dur, fs, current_vol[0], pre_silence=0.10, post_silence=0.20
        )
        record_buf = []
        written = 0

        def cb(indata, outdata, frames, time_info, status):
            nonlocal written
            if status:
                print(status, flush=True)
            chunk = play_buf[written:written + frames]
            # Zero-pad once the playback buffer is exhausted.
            out = np.zeros((frames,), dtype=np.float32)
            if len(chunk) > 0:
                out[:len(chunk)] = chunk
            outdata[:] = out.reshape(-1, 1)
            record_buf.append(indata.copy().reshape(-1))
            written += frames

        stream_kwargs = dict(samplerate=fs, dtype="float32", channels=1)
        if indev is not None or outdev is not None:
            stream_kwargs["device"] = (indev, outdev)
        if blocksize is not None:
            stream_kwargs["blocksize"] = int(blocksize)
        if iolatency is not None:
            stream_kwargs["latency"] = iolatency

        with sd.Stream(callback=cb, **stream_kwargs):
            sd.sleep(int(1000 * (len(play_buf) / fs)))
            sd.sleep(100)

        if not record_buf:
            return (np.array([], dtype=np.float32),
                    np.array([], dtype=np.float32),
                    np.array([], dtype=np.float32))
        rec = np.concatenate(record_buf).astype(np.float32)
        f, y, c = analyze_rms_vs_freq(rec, fs, start_idx, end_idx, dur, f_start, f_end,
                                      frame=frame, hop=hop)
        return f, y, c

    def on_sweep(event):
        if busy.is_set():
            return
        busy.set()
        btn_sweep.label.set_text("Running…")
        fig.canvas.draw_idle()  # make the new label visible right away

        def task():
            try:
                f, y, c = run_one_sweep()
                with lock:
                    if f.size > 0:
                        freqs.extend(f.tolist())
                        dbfs.extend(y.tolist())
                        confs.extend(c.tolist())
                latest_changed.set()
            finally:
                # Widgets must only be touched from the GUI thread, so just
                # flag completion here; on_timer resets the button and `busy`.
                sweep_done.set()

        threading.Thread(target=task, daemon=True).start()

    sweep_ax = fig.add_axes([0.79, 0.05, 0.15, 0.07])
    btn_sweep = Button(sweep_ax, "Sweep")
    btn_sweep.on_clicked(on_sweep)

    vol_ax = fig.add_axes([0.08, 0.05, 0.45, 0.07])
    vol_slider = Slider(vol_ax, 'volume', 0.0, 1.0, valinit=current_vol[0], valstep=0.01)

    def on_vol(val):
        current_vol[0] = float(val)

    vol_slider.on_changed(on_vol)

    timer = fig.canvas.new_timer(interval=250)

    def on_timer():
        """GUI-thread poll: apply whatever the worker thread has flagged."""
        relabelled = False
        if sweep_done.is_set():
            sweep_done.clear()
            btn_sweep.label.set_text("Sweep")
            busy.clear()
            relabelled = True
        if latest_changed.is_set():
            latest_changed.clear()
            update_plot()
        elif relabelled:
            fig.canvas.draw_idle()

    timer.add_callback(on_timer)
    timer.start()

    update_plot()
    plt.show()
|
|
|
|
|
|
def _latency_arg(text: str) -> float | str:
    """Convert an --iolatency value: numeric text becomes seconds, anything else stays a preset name."""
    try:
        return float(text)
    except ValueError:
        return text


def main(argv: list[str] | None = None):
    """Parse command-line options and launch the sweep-capture GUI.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.
    """
    ap = argparse.ArgumentParser(description="Frequency sweep capture: plot recorded dBFS vs frequency")
    ap.add_argument("--samplerate", "-r", type=int, default=48_000, help="Sample rate")
    ap.add_argument("--time", "-t", type=float, default=8.0, help="Sweep duration (s)")
    ap.add_argument("--volume", "-v", type=float, default=0.5, help="Playback volume 0..1")
    ap.add_argument("--indev", type=int, default=None, help="Input device index")
    ap.add_argument("--outdev", type=int, default=None, help="Output device index")
    ap.add_argument("--f-start", type=float, default=20.0, help="Sweep start frequency (Hz)")
    ap.add_argument("--f-end", type=float, default=20000.0, help="Sweep end frequency (Hz)")
    ap.add_argument("--frame", type=int, default=2048, help="RMS window size (samples)")
    ap.add_argument("--hop", type=int, default=1024, help="RMS hop size (samples)")
    ap.add_argument("--blocksize", type=int, default=None, help="Audio blocksize (frames)")
    # A seconds value must reach the audio layer as a number, not as text.
    ap.add_argument("--iolatency", type=_latency_arg, default="high",
                    help="Audio I/O latency preset or seconds")
    args = ap.parse_args(argv)

    # Reject values that would otherwise fail or hang deep inside the sweep
    # generation / analysis (log of a non-positive ratio, zero-step frame loop).
    if args.samplerate <= 0:
        ap.error("--samplerate must be positive")
    if args.time <= 0:
        ap.error("--time must be positive")
    if args.f_start <= 0 or args.f_end <= 0:
        ap.error("--f-start and --f-end must be positive")
    if args.frame <= 0 or args.hop <= 0:
        ap.error("--frame and --hop must be positive")

    run_gui(
        fs=args.samplerate,
        dur=args.time,
        vol=args.volume,
        indev=args.indev,
        outdev=args.outdev,
        f_start=args.f_start,
        f_end=args.f_end,
        frame=args.frame,
        hop=args.hop,
        blocksize=args.blocksize,
        iolatency=args.iolatency,
    )
|
|
|
|
|
|
# Run the command-line entry point only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|
|