- Replaced per-frame `run_in_executor` calls with single background reader thread in `ThreadedAudioInput` - Reader thread continuously calls `_read()` and enqueues data via `call_soon_threadsafe` to asyncio.Queue - Reduces per-frame scheduling overhead and context-switch jitter while preserving async API - Added thread lifecycle management: lazy start on first `frames()` call, graceful stop in `aclose()` - Update
38 lines
2.1 KiB
Markdown
38 lines
2.1 KiB
Markdown
# Threaded Reader Refactor (Audio Input)
|
||
|
||
This project originally used `run_in_executor` for every audio frame to bridge blocking reads into an async generator. We replaced that per‑frame executor usage with a single background reader thread and an `asyncio.Queue`, keeping the public API and block sizes unchanged.
|
||
|
||
## What changed
|
||
- Before: `ThreadedAudioInput.frames(frame_size)` did:
|
||
- For each frame: `await loop.run_in_executor(..., self._read, frame_size)`
|
||
- Yielded the returned bytes.
|
||
- Now: `ThreadedAudioInput.frames(frame_size)` does:
|
||
- Starts one background reader thread on first use.
|
||
- Reader thread repeatedly calls `self._read(frame_size)` and enqueues results via `loop.call_soon_threadsafe(self._pcm_samples.put_nowait, data)`.
|
||
- The async generator awaits `self._pcm_samples.get()` and yields items.
|
||
|
||
## Why this helps
|
||
- Removes per‑frame executor scheduling and context‑switch overhead.
|
||
- Reduces jitter and extra pipeline delay while preserving the same async API (`async for frame in device.frames(...)`).
|
||
- Plays nicely with existing ringbuffer logic in `ModSoundDeviceAudioInput` without changing block sizes or device setup.
|
||
|
||
## API/behavior preserved
|
||
- Public interface of `ThreadedAudioInput` subclasses is unchanged:
|
||
- `await open()`
|
||
- `frames(frame_size)` → `AsyncGenerator[bytes]`
|
||
- `await aclose()`
|
||
- Block sizes, device indices, and PCM formats are unchanged.
|
||
|
||
## Implementation notes
|
||
- New attributes in `ThreadedAudioInput.__init__`:
|
||
- `_reader_thread: threading.Thread | None`
|
||
- `_running: bool`
|
||
- `_loop: asyncio.AbstractEventLoop | None`
|
||
- `_pcm_samples: asyncio.Queue[bytes]`
|
||
- `frames()` lazily starts `_reader_thread` on first call; the thread stops when `aclose()` is called or `_read()` returns empty bytes.
|
||
- `aclose()` joins the reader thread and then performs the blocking close in the thread pool, as before.
|
||
|
||
## Limitations / next steps
|
||
- The queue is currently unbounded; if you want to strictly cap software latency, consider a bounded queue and dropping oldest frames when full.
|
||
- This refactor does not change ringbuffer sizing or block sizes; those can still influence end‑to‑end latency.
|