- Replaced per-frame `run_in_executor` calls with a single background reader thread in `ThreadedAudioInput`
- Reader thread continuously calls `_read()` and enqueues data into an `asyncio.Queue` via `call_soon_threadsafe`
- Reduces per-frame scheduling overhead and context-switch jitter while preserving the async API
- Added thread lifecycle management: lazy start on the first `frames()` call, graceful stop in `aclose()`
- Update
Threaded Reader Refactor (Audio Input)
This project originally used `run_in_executor` for every audio frame to bridge blocking reads into an async generator. We replaced that per-frame executor usage with a single background reader thread feeding an `asyncio.Queue`, keeping the public API and block sizes unchanged.
What changed
- Before: for each frame, `ThreadedAudioInput.frames(frame_size)`:
  - awaited `loop.run_in_executor(..., self._read, frame_size)`;
  - yielded the returned bytes.
- Now: `ThreadedAudioInput.frames(frame_size)`:
  - starts one background reader thread on first use;
  - the reader thread repeatedly calls `self._read(frame_size)` and enqueues each result via `loop.call_soon_threadsafe(self._pcm_samples.put_nowait, data)`;
  - the async generator awaits `self._pcm_samples.get()` and yields each item.
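The flow above can be sketched as a small self-contained class. This is a minimal sketch, not the project's code. `ThreadedReaderSketch`, its fake `_read()` (which returns zero-filled frames and then empty bytes), and ending the generator when it receives an empty-bytes item are all assumptions. Only the attribute names and the `call_soon_threadsafe(..._pcm_samples.put_nowait, data)` hand-off come from the description.

```python
from __future__ import annotations

import asyncio
import threading
from typing import AsyncGenerator


class ThreadedReaderSketch:
    """Hypothetical stand-in for ThreadedAudioInput: one reader thread feeds a queue."""

    def __init__(self, total_frames: int) -> None:
        self._remaining = total_frames  # fake finite stream length
        self._reader_thread: threading.Thread | None = None
        self._running = False
        self._loop: asyncio.AbstractEventLoop | None = None
        self._pcm_samples: asyncio.Queue[bytes] = asyncio.Queue()

    def _read(self, frame_size: int) -> bytes:
        # Hypothetical blocking read: zero-filled frames, then b"" at end of stream.
        if self._remaining == 0:
            return b""
        self._remaining -= 1
        return bytes(frame_size)

    def _reader(self, frame_size: int) -> None:
        # Background thread: block in _read(), hand each chunk to the event loop.
        loop = self._loop
        assert loop is not None
        while self._running:
            data = self._read(frame_size)
            # asyncio.Queue is not thread-safe, so put_nowait must run on the loop thread.
            loop.call_soon_threadsafe(self._pcm_samples.put_nowait, data)
            if not data:
                break  # empty bytes ends the stream and is forwarded as a marker

    async def frames(self, frame_size: int) -> AsyncGenerator[bytes, None]:
        if self._reader_thread is None:
            # Lazy start: the thread is created on the first frames() call only.
            self._loop = asyncio.get_running_loop()
            self._running = True
            self._reader_thread = threading.Thread(
                target=self._reader, args=(frame_size,), daemon=True
            )
            self._reader_thread.start()
        while True:
            data = await self._pcm_samples.get()  # no executor hop per frame
            if not data:
                return
            yield data


async def main() -> list[int]:
    device = ThreadedReaderSketch(total_frames=3)
    sizes = [len(frame) async for frame in device.frames(4)]
    assert device._reader_thread is not None
    device._reader_thread.join()  # the thread has already left its read loop
    return sizes


if __name__ == "__main__":
    print(asyncio.run(main()))  # prints [4, 4, 4]
```

The consumer side is now a plain `await queue.get()`. The only cross-thread step per frame is one `call_soon_threadsafe` call.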
Why this helps
- Removes per-frame executor scheduling and context-switch overhead.
- Reduces jitter and extra pipeline delay while preserving the same async API (`async for frame in device.frames(...)`).
- Works with the existing ringbuffer logic in `ModSoundDeviceAudioInput` without changing block sizes or device setup.
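For comparison, the overhead removed here is the per-frame round trip in the old loop. Below is a minimal sketch of that pattern. `ExecutorPerFrameSketch`, its fake `_read()`, the default executor (`None`), and stopping on empty bytes are assumptions, not the project's code.

```python
from __future__ import annotations

import asyncio
from typing import AsyncGenerator


class ExecutorPerFrameSketch:
    """Hypothetical version of the old pattern: one executor round trip per frame."""

    def __init__(self, total_frames: int) -> None:
        self._remaining = total_frames  # fake finite stream length

    def _read(self, frame_size: int) -> bytes:
        # Hypothetical blocking read: zero-filled frames, then b"" at end of stream.
        if self._remaining == 0:
            return b""
        self._remaining -= 1
        return bytes(frame_size)

    async def frames(self, frame_size: int) -> AsyncGenerator[bytes, None]:
        loop = asyncio.get_running_loop()
        while True:
            # Each frame: submit to the thread pool, wait for a worker to pick it up,
            # then wait for the loop to be woken with the result.
            data = await loop.run_in_executor(None, self._read, frame_size)
            if not data:
                return
            yield data


async def main() -> list[int]:
    device = ExecutorPerFrameSketch(total_frames=3)
    return [len(frame) async for frame in device.frames(4)]


if __name__ == "__main__":
    print(asyncio.run(main()))  # prints [4, 4, 4]
```

With a real device, every iteration here pays roughly for a pool submission, a worker wake-up, and a loop wake-up. A dedicated reader thread keeps only the loop wake-up.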
API/behavior preserved
- The public interface of `ThreadedAudioInput` subclasses is unchanged:
  - `await open()`
  - `frames(frame_size)` → `AsyncGenerator[bytes]`
  - `await aclose()`
- Block sizes, device indices, and PCM formats are unchanged.
Implementation notes
- New attributes in `ThreadedAudioInput.__init__`:
  - `_reader_thread: threading.Thread | None`
  - `_running: bool`
  - `_loop: asyncio.AbstractEventLoop | None`
  - `_pcm_samples: asyncio.Queue[bytes]`
- `frames()` lazily starts `_reader_thread` on its first call; the thread stops when `aclose()` is called or `_read()` returns empty bytes.
- `aclose()` joins the reader thread and then performs the blocking close in the thread pool, as before.
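The lifecycle described above can be sketched as follows: lazy start, stop on `aclose()` or empty bytes, join, then a blocking close in the pool. This is a sketch under stated assumptions. `LifecycleSketch`, `_close()`, `block_seconds`, and the `closed` flag are hypothetical. Joining via `run_in_executor` instead of directly on the loop thread is also an assumption, because the description does not say how the join is performed.

```python
from __future__ import annotations

import asyncio
import threading
import time
from typing import AsyncGenerator


class LifecycleSketch:
    """Hypothetical ThreadedAudioInput-like class showing lazy start and aclose()."""

    def __init__(self, block_seconds: float = 0.01) -> None:
        self._block_seconds = block_seconds
        self._reader_thread: threading.Thread | None = None
        self._running = False
        self._loop: asyncio.AbstractEventLoop | None = None
        self._pcm_samples: asyncio.Queue[bytes] = asyncio.Queue()
        self.closed = False

    def _read(self, frame_size: int) -> bytes:
        # Hypothetical blocking read that takes one block period, like a device.
        time.sleep(self._block_seconds)
        return bytes(frame_size)

    def _close(self) -> None:
        # Hypothetical blocking device close.
        self.closed = True

    def _reader(self, frame_size: int) -> None:
        loop = self._loop
        assert loop is not None
        while self._running:
            data = self._read(frame_size)
            loop.call_soon_threadsafe(self._pcm_samples.put_nowait, data)
            if not data:
                break

    async def frames(self, frame_size: int) -> AsyncGenerator[bytes, None]:
        if self._reader_thread is None:  # lazy start on first call
            self._loop = asyncio.get_running_loop()
            self._running = True
            self._reader_thread = threading.Thread(
                target=self._reader, args=(frame_size,), daemon=True
            )
            self._reader_thread.start()
        while True:
            data = await self._pcm_samples.get()
            if not data:
                return
            yield data

    async def aclose(self) -> None:
        loop = asyncio.get_running_loop()
        # 1) Ask the reader to stop; it exits once its current _read() returns.
        self._running = False
        # 2) Join it off the loop thread so the loop stays responsive meanwhile.
        if self._reader_thread is not None:
            await loop.run_in_executor(None, self._reader_thread.join)
            self._reader_thread = None
        # 3) The blocking close still goes through the thread pool.
        await loop.run_in_executor(None, self._close)


async def main() -> tuple[int, bool, bool]:
    device = LifecycleSketch()
    received = 0
    async for _frame in device.frames(256):
        received += 1
        if received == 3:
            break
    await device.aclose()
    return received, device._reader_thread is None, device.closed


if __name__ == "__main__":
    print(asyncio.run(main()))  # prints (3, True, True)
```

The stop is cooperative. `_running` is only checked between reads, so `aclose()` can take up to about one block period to return. A device whose `_read()` can block indefinitely would need its own way to interrupt the read.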
Limitations / next steps
- The queue is currently unbounded; to strictly cap software latency, consider a bounded queue that drops the oldest frame when full.
- This refactor does not change ringbuffer sizing or block sizes; those can still influence end‑to‑end latency.
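Below is one possible way to implement the bounded, drop-oldest variant mentioned above. It assumes the reader thread schedules a helper instead of calling `put_nowait` directly. `put_drop_oldest` and `maxsize=2` are hypothetical. With a bounded queue, a bare `put_nowait` callback would raise `asyncio.QueueFull` once the consumer falls behind, so the drop has to happen inside the scheduled callback.

```python
from __future__ import annotations

import asyncio


def put_drop_oldest(queue: asyncio.Queue[bytes], data: bytes) -> None:
    """Enqueue data, discarding the oldest queued frame first if the queue is full.

    Intended to run on the event-loop thread, scheduled from the reader thread with
    loop.call_soon_threadsafe(put_drop_oldest, queue, data).
    """
    if queue.full():
        queue.get_nowait()  # drop the stalest frame to cap buffered latency
    queue.put_nowait(data)


async def main() -> list[bytes]:
    queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=2)
    loop = asyncio.get_running_loop()
    # Stand-in for a reader thread that got four frames ahead of the consumer.
    for i in range(4):
        loop.call_soon_threadsafe(put_drop_oldest, queue, bytes([i]))
    await asyncio.sleep(0)  # let the scheduled callbacks run
    return [queue.get_nowait() for _ in range(queue.qsize())]


if __name__ == "__main__":
    print(asyncio.run(main()))  # prints [b'\x02', b'\x03']
```

The helper runs on the event-loop thread, so the `full()` check, the drop, and the put cannot interleave with the consumer's `get()`. Queued latency is then bounded by roughly `maxsize` times the block duration.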