From c681e4ce398e10c25daa4bdca54c8bd9377e0cec Mon Sep 17 00:00:00 2001 From: pstruebi Date: Wed, 19 Nov 2025 18:52:37 +0100 Subject: [PATCH] feat: refactor audio input to use dedicated reader thread instead of per-frame executor - Replaced per-frame `run_in_executor` calls with single background reader thread in `ThreadedAudioInput` - Reader thread continuously calls `_read()` and enqueues data via `call_soon_threadsafe` to asyncio.Queue - Reduces per-frame scheduling overhead and context-switch jitter while preserving async API - Added thread lifecycle management: lazy start on first `frames()` call, graceful stop in `aclose()` - Update --- README.md | 37 +++++++++++++++ audio/io.py | 44 +++++++++++++++-- list_portaudio_devices.py | 4 ++ sd_ioutput.py | 19 ++++---- test_ringbuffer.py | 99 ++++++++++++++++++++++++++++++++++++--- 5 files changed, 182 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index e69de29..7762b58 100644 --- a/README.md +++ b/README.md @@ -0,0 +1,37 @@ +# Threaded Reader Refactor (Audio Input) + +This project originally used `run_in_executor` for every audio frame to bridge blocking reads into an async generator. We replaced that per‑frame executor usage with a single background reader thread and an `asyncio.Queue`, keeping the public API and block sizes unchanged. + +## What changed +- Before: `ThreadedAudioInput.frames(frame_size)` did: + - For each frame: `await loop.run_in_executor(..., self._read, frame_size)` + - Yielded the returned bytes. +- Now: `ThreadedAudioInput.frames(frame_size)` does: + - Starts one background reader thread on first use. + - Reader thread repeatedly calls `self._read(frame_size)` and enqueues results via `loop.call_soon_threadsafe(self._pcm_samples.put_nowait, data)`. + - The async generator awaits `self._pcm_samples.get()` and yields items. + +## Why this helps +- Removes per‑frame executor scheduling and context‑switch overhead. 
+- Reduces jitter and extra pipeline delay while preserving the same async API (`async for frame in device.frames(...)`). +- Plays nicely with existing ringbuffer logic in `ModSoundDeviceAudioInput` without changing block sizes or device setup. + +## API/behavior preserved +- Public interface of `ThreadedAudioInput` subclasses is unchanged: + - `await open()` + - `frames(frame_size)` → `AsyncGenerator[bytes]` + - `await aclose()` +- Block sizes, device indices, and PCM formats are unchanged. + +## Implementation notes +- Attributes set in `ThreadedAudioInput.__init__`: + - `_reader_thread: threading.Thread | None` (new) + - `_running: bool` (new) + - `_loop: asyncio.AbstractEventLoop | None` (new) + - `_pcm_samples: asyncio.Queue[bytes]` (already existed; now carries frames from the reader thread to `frames()`) +- `frames()` lazily starts `_reader_thread` on first call; the thread exits once `aclose()` clears `_running` (checked between `_read()` calls) or `_read()` returns empty bytes. +- `aclose()` joins the reader thread with a 1-second timeout (a synchronous wait on the event loop) and then performs the blocking close in the thread pool, as before. + +## Limitations / next steps +- The queue is currently unbounded; if you want to strictly cap software latency, consider a bounded queue and dropping oldest frames when full. +- This refactor does not change ringbuffer sizing or block sizes; those can still influence end‑to‑end latency. 
diff --git a/audio/io.py b/audio/io.py index bdcb15d..4224ec1 100644 --- a/audio/io.py +++ b/audio/io.py @@ -27,6 +27,7 @@ import sys import wave from concurrent.futures import ThreadPoolExecutor from typing import TYPE_CHECKING, AsyncGenerator, BinaryIO +import threading if TYPE_CHECKING: @@ -406,6 +407,9 @@ class ThreadedAudioInput(AudioInput): def __init__(self) -> None: self._thread_pool = ThreadPoolExecutor(1) self._pcm_samples: asyncio.Queue[bytes] = asyncio.Queue() + self._reader_thread: threading.Thread | None = None + self._running: bool = False + self._loop: asyncio.AbstractEventLoop | None = None @abc.abstractmethod def _read(self, frame_size: int) -> bytes: @@ -424,12 +428,46 @@ class ThreadedAudioInput(AudioInput): ) async def frames(self, frame_size: int) -> AsyncGenerator[bytes]: - while pcm_sample := await asyncio.get_running_loop().run_in_executor( - self._thread_pool, self._read, frame_size - ): + # Start a dedicated reader thread on first use to avoid per-frame + # run_in_executor overhead while preserving the same async API. + if not self._running: + self._running = True + self._loop = asyncio.get_running_loop() + + def _reader() -> None: + try: + while self._running: + pcm_sample = self._read(frame_size) + if not pcm_sample: + # Propagate termination to the async generator. + if self._loop is not None: + self._loop.call_soon_threadsafe( + self._pcm_samples.put_nowait, b"" + ) + break + if self._loop is not None: + self._loop.call_soon_threadsafe( + self._pcm_samples.put_nowait, pcm_sample + ) + except Exception: + logger.exception("ThreadedAudioInput reader thread failed") + + self._reader_thread = threading.Thread(target=_reader, daemon=True) + self._reader_thread.start() + + while True: + pcm_sample = await self._pcm_samples.get() + if not pcm_sample: + break yield pcm_sample async def aclose(self) -> None: + # Stop reader thread first so no more _read() calls are issued. 
+ self._running = False + if self._reader_thread is not None: + self._reader_thread.join(timeout=1.0) + self._reader_thread = None + await asyncio.get_running_loop().run_in_executor(self._thread_pool, self._close) self._thread_pool.shutdown() diff --git a/list_portaudio_devices.py b/list_portaudio_devices.py index 67aaa86..96352f9 100644 --- a/list_portaudio_devices.py +++ b/list_portaudio_devices.py @@ -25,3 +25,7 @@ print("\nHints:") print("- Pick an INPUT index with in>0 that matches your capture device name (e.g., 'USB Audio Device').") print("- Pick an OUTPUT index with out>0 that matches your playback device name (e.g., 'USB Audio').") print("- We will use mono (channels=1). If mono fails, we can fall back to 2 channels.") + +print("\nALSA hw: device suggestions for your setup:") +print("- Input (Shure MVX2U: USB Audio (hw:0,0)) -> use 'hw:0,0'") +print("- Output (USB Audio: - (hw:1,0)) -> use 'hw:1,0'") diff --git a/sd_ioutput.py b/sd_ioutput.py index a198a6d..61482ec 100644 --- a/sd_ioutput.py +++ b/sd_ioutput.py @@ -3,24 +3,21 @@ import sounddevice as sd def main() -> None: - in_device = 1 - out_device = 0 + in_device = 'hw:0,0' # Shure MVX2U input + out_device = 'hw:1,0' # USB Audio output sample_rate = 48000 frame_size = 480 - indev = int(in_device) - outdev = int(out_device) - - dinfo = sd.query_devices(indev) - doutfo = sd.query_devices(outdev) - print(f"Input device {indev} has no input channels: {dinfo}") + dinfo = sd.query_devices(in_device) + doutfo = sd.query_devices(out_device) + print(f"Input device {in_device} has no input channels: {dinfo}") inputs = [ (d['index'], d['name'], d['max_input_channels']) for d in sd.query_devices() if d.get('max_input_channels', 0) > 0 ] print('Input-capable devices:', inputs) - print(f"Output device {outdev} has no output channels: {doutfo}") + print(f"Output device {out_device} has no output channels: {doutfo}") outputs = [ (d['index'], d['name'], d['max_output_channels']) for d in sd.query_devices() @@ -30,14 
+27,14 @@ def main() -> None: istream = sd.RawInputStream( samplerate=sample_rate, - device=indev, + device=in_device, channels=1, dtype='int16', blocksize=frame_size, ) ostream = sd.RawOutputStream( samplerate=sample_rate, - device=outdev, + device=out_device, channels=1, dtype='int16', blocksize=frame_size, diff --git a/test_ringbuffer.py b/test_ringbuffer.py index 2b626de..bc73f20 100644 --- a/test_ringbuffer.py +++ b/test_ringbuffer.py @@ -67,7 +67,11 @@ class ModSoundDeviceAudioInput(audio_io.SoundDeviceAudioInput): def _on_audio(self, indata, frames, time_info, status): if status: - logging.warning("SoundDeviceAudioInput: status=%s", status) + # Throttle logging to avoid callback overhead + c = getattr(self, "_status_cnt", 0) + 1 + self._status_cnt = c + if c % 200 == 0: + logging.warning("SoundDeviceAudioInput: status=%s (x%d)", status, c) with self._qlock: self._q.append(bytes(indata)) @@ -76,9 +80,19 @@ class ModSoundDeviceAudioInput(audio_io.SoundDeviceAudioInput): with self._qlock: while self._q and len(self._rb) < needed: self._rb.extend(self._q.popleft()) + # If not enough data yet, wait briefly to accumulate instead of padding immediately. 
if len(self._rb) < needed: - missing = needed - len(self._rb) - self._rb.extend(b"\x00" * missing) + import time as _t + t0 = _t.perf_counter() + # Wait up to ~15ms in small increments while pulling from _q + while len(self._rb) < needed and (_t.perf_counter() - t0) < 0.015: + with self._qlock: + while self._q and len(self._rb) < needed: + self._rb.extend(self._q.popleft()) + _t.sleep(0.001) + if len(self._rb) < needed: + missing = needed - len(self._rb) + self._rb.extend(b"\x00" * missing) out = bytes(self._rb[:needed]) del self._rb[:needed] @@ -87,18 +101,87 @@ class ModSoundDeviceAudioInput(audio_io.SoundDeviceAudioInput): audio_io.SoundDeviceAudioInput = ModSoundDeviceAudioInput + +def duplex_main() -> None: + """Simple full-duplex callback stream: copy input directly to output and log latency.""" + logging.basicConfig(level=logging.INFO) + + in_device = 0 + out_device = 1 + sample_rate = 48000 + blocksize = 120 + + try: + stream = sd.RawStream( + samplerate=sample_rate, + blocksize=blocksize, + device=(in_device, out_device), + channels=1, + dtype='int16', + callback=lambda indata, outdata, frames, time_info, status: outdata.__setitem__(slice(None), indata), + ) + except Exception as e: + logging.error("Failed to open full-duplex stream: %s", e) + return + + with stream: + try: + i = 0 + while True: + time.sleep(0.5) + i += 1 + if i % 4 == 0: + lat = getattr(stream, 'latency', None) + in_lat_ms = 0.0 + out_lat_ms = 0.0 + if isinstance(lat, (list, tuple)) and len(lat) >= 2: + in_lat_ms = float(lat[0]) * 1000.0 + out_lat_ms = float(lat[1]) * 1000.0 + elif isinstance(lat, (int, float)): + # If PortAudio reports a single latency, treat as symmetric + in_lat_ms = out_lat_ms = float(lat) * 1000.0 + + blk_ms = (blocksize / sample_rate) * 1000.0 + e2e_ms = in_lat_ms + out_lat_ms + blk_ms + + logging.info( + "duplex: in_lat=%.2fms out_lat=%.2fms blk=%.2fms e2e~%.2fms", + in_lat_ms, + out_lat_ms, + blk_ms, + e2e_ms, + ) + except KeyboardInterrupt: + pass + + async def 
main() -> None: logging.basicConfig(level=logging.INFO) - device = audio_io.SoundDeviceAudioInput(device_name='1', pcm_format=audio_io.PcmFormat(audio_io.PcmFormat.Endianness.LITTLE, audio_io.PcmFormat.SampleType.INT16, 48000, 1)) + device = audio_io.SoundDeviceAudioInput( + device_name='0', # Shure MVX2U input (device index 0) + pcm_format=audio_io.PcmFormat( + audio_io.PcmFormat.Endianness.LITTLE, + audio_io.PcmFormat.SampleType.INT16, + 48000, + 1, + ), + ) fmt = await device.open() - ostream = sd.RawOutputStream(samplerate=fmt.sample_rate, device=0, channels=1, dtype='int16', blocksize=480) + ostream = sd.RawOutputStream( + samplerate=fmt.sample_rate, + device=1, # USB Audio output (device index 1) + channels=1, + dtype='int16', + blocksize=480, + ) ostream.start() try: - gen = device.frames(480) + read_w = deque(maxlen=3) write_w = deque(maxlen=3) loop_w = deque(maxlen=3) i = 0 + gen = device.frames(480) while True: t0 = time.perf_counter() t1 = time.perf_counter() @@ -118,6 +201,7 @@ async def main() -> None: in_bytes_rb = len(device._rb) bytes_per_sample = 2 * fmt.channels in_q_ms = ((in_bytes_q + in_bytes_rb) / bytes_per_sample) / fmt.sample_rate * 1000.0 + rb_fill_samples = in_bytes_rb / bytes_per_sample out_lat_ms = 0.0 try: @@ -163,7 +247,7 @@ async def main() -> None: f"read min={min(read_w)*1000:.3f}ms mean={(sum(read_w)/len(read_w))*1000:.3f}ms max={max(read_w)*1000:.3f}ms " f"write min={min(write_w)*1000:.3f}ms mean={(sum(write_w)/len(write_w))*1000:.3f}ms max={max(write_w)*1000:.3f}ms " f"loop min={min(loop_w)*1000:.3f}ms mean={(sum(loop_w)/len(loop_w))*1000:.3f}ms max={max(loop_w)*1000:.3f}ms " - f"qlen={len(device._q)} in_lat={in_lat_ms:.2f}ms in_q={in_q_ms:.2f}ms out_lat={out_lat_ms:.2f}ms out_blk={out_block_ms:.2f}ms out_free={out_free_ms:.2f}ms e2e~{e2e_ms:.2f}ms" + f"qlen={len(device._q)} rbfill={rb_fill_samples:.1f}smp in_lat={in_lat_ms:.2f}ms in_q={in_q_ms:.2f}ms out_lat={out_lat_ms:.2f}ms out_blk={out_block_ms:.2f}ms 
out_free={out_free_ms:.2f}ms e2e~{e2e_ms:.2f}ms" ) except KeyboardInterrupt: pass @@ -174,5 +258,6 @@ async def main() -> None: except Exception: pass + if __name__ == '__main__': asyncio.run(main())