feat: refactor audio input to use dedicated reader thread instead of per-frame executor
- Replaced per-frame `run_in_executor` calls with single background reader thread in `ThreadedAudioInput` - Reader thread continuously calls `_read()` and enqueues data via `call_soon_threadsafe` to asyncio.Queue - Reduces per-frame scheduling overhead and context-switch jitter while preserving async API - Added thread lifecycle management: lazy start on first `frames()` call, graceful stop in `aclose()` - Update
This commit is contained in:
37
README.md
37
README.md
@@ -0,0 +1,37 @@
|
|||||||
|
# Threaded Reader Refactor (Audio Input)
|
||||||
|
|
||||||
|
This project originally used `run_in_executor` for every audio frame to bridge blocking reads into an async generator. We replaced that per‑frame executor usage with a single background reader thread and an `asyncio.Queue`, keeping the public API and block sizes unchanged.
|
||||||
|
|
||||||
|
## What changed
|
||||||
|
- Before: `ThreadedAudioInput.frames(frame_size)` did:
|
||||||
|
- For each frame: `await loop.run_in_executor(..., self._read, frame_size)`
|
||||||
|
- Yielded the returned bytes.
|
||||||
|
- Now: `ThreadedAudioInput.frames(frame_size)` does:
|
||||||
|
- Starts one background reader thread on first use; the thread keeps reading with the `frame_size` passed to that first call.
|
||||||
|
- Reader thread repeatedly calls `self._read(frame_size)` and enqueues results via `loop.call_soon_threadsafe(self._pcm_samples.put_nowait, data)`.
|
||||||
|
- The async generator awaits `self._pcm_samples.get()` and yields items.
|
||||||
|
|
||||||
|
## Why this helps
|
||||||
|
- Removes per‑frame executor scheduling and context‑switch overhead.
|
||||||
|
- Reduces jitter and extra pipeline delay while preserving the same async API (`async for frame in device.frames(...)`).
|
||||||
|
- Works with the existing ringbuffer logic in `ModSoundDeviceAudioInput` without changing block sizes or device setup.
|
||||||
|
|
||||||
|
## API/behavior preserved
|
||||||
|
- Public interface of `ThreadedAudioInput` subclasses is unchanged:
|
||||||
|
- `await open()`
|
||||||
|
- `frames(frame_size)` → `AsyncGenerator[bytes]`
|
||||||
|
- `await aclose()`
|
||||||
|
- Block sizes, device indices, and PCM formats are unchanged.
|
||||||
|
|
||||||
|
## Implementation notes
|
||||||
|
- New attributes in `ThreadedAudioInput.__init__`:
|
||||||
|
- `_reader_thread: threading.Thread | None`
|
||||||
|
- `_running: bool`
|
||||||
|
- `_loop: asyncio.AbstractEventLoop | None`
|
||||||
|
- `_pcm_samples: asyncio.Queue[bytes]`
|
||||||
|
- `frames()` lazily starts `_reader_thread` on first call; the thread stops when `aclose()` is called, when `_read()` returns empty bytes, or when `_read()` raises (the exception is logged).
|
||||||
|
- `aclose()` joins the reader thread (waiting at most 1 second) and then performs the blocking close in the thread pool, as before.
|
||||||
|
|
||||||
|
## Limitations / next steps
|
||||||
|
- The queue is currently unbounded; if you want to strictly cap software latency, consider using a bounded queue and dropping the oldest frames when it is full.
|
||||||
|
- This refactor does not change ringbuffer sizing or block sizes; those can still influence end‑to‑end latency.
|
||||||
|
|||||||
44
audio/io.py
44
audio/io.py
@@ -27,6 +27,7 @@ import sys
|
|||||||
import wave
|
import wave
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
from typing import TYPE_CHECKING, AsyncGenerator, BinaryIO
|
from typing import TYPE_CHECKING, AsyncGenerator, BinaryIO
|
||||||
|
import threading
|
||||||
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
@@ -406,6 +407,9 @@ class ThreadedAudioInput(AudioInput):
|
|||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
self._thread_pool = ThreadPoolExecutor(1)
|
self._thread_pool = ThreadPoolExecutor(1)
|
||||||
self._pcm_samples: asyncio.Queue[bytes] = asyncio.Queue()
|
self._pcm_samples: asyncio.Queue[bytes] = asyncio.Queue()
|
||||||
|
self._reader_thread: threading.Thread | None = None
|
||||||
|
self._running: bool = False
|
||||||
|
self._loop: asyncio.AbstractEventLoop | None = None
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def _read(self, frame_size: int) -> bytes:
|
def _read(self, frame_size: int) -> bytes:
|
||||||
@@ -424,12 +428,46 @@ class ThreadedAudioInput(AudioInput):
|
|||||||
)
|
)
|
||||||
|
|
||||||
async def frames(self, frame_size: int) -> AsyncGenerator[bytes]:
|
async def frames(self, frame_size: int) -> AsyncGenerator[bytes]:
|
||||||
while pcm_sample := await asyncio.get_running_loop().run_in_executor(
|
# Start a dedicated reader thread on first use to avoid per-frame
|
||||||
self._thread_pool, self._read, frame_size
|
# run_in_executor overhead while preserving the same async API.
|
||||||
):
|
if not self._running:
|
||||||
|
self._running = True
|
||||||
|
self._loop = asyncio.get_running_loop()
|
||||||
|
|
||||||
|
def _reader() -> None:
|
||||||
|
try:
|
||||||
|
while self._running:
|
||||||
|
pcm_sample = self._read(frame_size)
|
||||||
|
if not pcm_sample:
|
||||||
|
# Propagate termination to the async generator.
|
||||||
|
if self._loop is not None:
|
||||||
|
self._loop.call_soon_threadsafe(
|
||||||
|
self._pcm_samples.put_nowait, b""
|
||||||
|
)
|
||||||
|
break
|
||||||
|
if self._loop is not None:
|
||||||
|
self._loop.call_soon_threadsafe(
|
||||||
|
self._pcm_samples.put_nowait, pcm_sample
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("ThreadedAudioInput reader thread failed")
|
||||||
|
|
||||||
|
self._reader_thread = threading.Thread(target=_reader, daemon=True)
|
||||||
|
self._reader_thread.start()
|
||||||
|
|
||||||
|
while True:
|
||||||
|
pcm_sample = await self._pcm_samples.get()
|
||||||
|
if not pcm_sample:
|
||||||
|
break
|
||||||
yield pcm_sample
|
yield pcm_sample
|
||||||
|
|
||||||
async def aclose(self) -> None:
|
async def aclose(self) -> None:
|
||||||
|
# Stop reader thread first so no more _read() calls are issued.
|
||||||
|
self._running = False
|
||||||
|
if self._reader_thread is not None:
|
||||||
|
self._reader_thread.join(timeout=1.0)
|
||||||
|
self._reader_thread = None
|
||||||
|
|
||||||
await asyncio.get_running_loop().run_in_executor(self._thread_pool, self._close)
|
await asyncio.get_running_loop().run_in_executor(self._thread_pool, self._close)
|
||||||
self._thread_pool.shutdown()
|
self._thread_pool.shutdown()
|
||||||
|
|
||||||
|
|||||||
@@ -25,3 +25,7 @@ print("\nHints:")
|
|||||||
print("- Pick an INPUT index with in>0 that matches your capture device name (e.g., 'USB Audio Device').")
|
print("- Pick an INPUT index with in>0 that matches your capture device name (e.g., 'USB Audio Device').")
|
||||||
print("- Pick an OUTPUT index with out>0 that matches your playback device name (e.g., 'USB Audio').")
|
print("- Pick an OUTPUT index with out>0 that matches your playback device name (e.g., 'USB Audio').")
|
||||||
print("- We will use mono (channels=1). If mono fails, we can fall back to 2 channels.")
|
print("- We will use mono (channels=1). If mono fails, we can fall back to 2 channels.")
|
||||||
|
|
||||||
|
print("\nALSA hw: device suggestions for your setup:")
|
||||||
|
print("- Input (Shure MVX2U: USB Audio (hw:0,0)) -> use 'hw:0,0'")
|
||||||
|
print("- Output (USB Audio: - (hw:1,0)) -> use 'hw:1,0'")
|
||||||
|
|||||||
@@ -3,24 +3,21 @@ import sounddevice as sd
|
|||||||
|
|
||||||
|
|
||||||
def main() -> None:
|
def main() -> None:
|
||||||
in_device = 1
|
in_device = 'hw:0,0' # Shure MVX2U input
|
||||||
out_device = 0
|
out_device = 'hw:1,0' # USB Audio output
|
||||||
sample_rate = 48000
|
sample_rate = 48000
|
||||||
frame_size = 480
|
frame_size = 480
|
||||||
|
|
||||||
indev = int(in_device)
|
dinfo = sd.query_devices(in_device)
|
||||||
outdev = int(out_device)
|
doutfo = sd.query_devices(out_device)
|
||||||
|
print(f"Input device {in_device} has no input channels: {dinfo}")
|
||||||
dinfo = sd.query_devices(indev)
|
|
||||||
doutfo = sd.query_devices(outdev)
|
|
||||||
print(f"Input device {indev} has no input channels: {dinfo}")
|
|
||||||
inputs = [
|
inputs = [
|
||||||
(d['index'], d['name'], d['max_input_channels'])
|
(d['index'], d['name'], d['max_input_channels'])
|
||||||
for d in sd.query_devices()
|
for d in sd.query_devices()
|
||||||
if d.get('max_input_channels', 0) > 0
|
if d.get('max_input_channels', 0) > 0
|
||||||
]
|
]
|
||||||
print('Input-capable devices:', inputs)
|
print('Input-capable devices:', inputs)
|
||||||
print(f"Output device {outdev} has no output channels: {doutfo}")
|
print(f"Output device {out_device} has no output channels: {doutfo}")
|
||||||
outputs = [
|
outputs = [
|
||||||
(d['index'], d['name'], d['max_output_channels'])
|
(d['index'], d['name'], d['max_output_channels'])
|
||||||
for d in sd.query_devices()
|
for d in sd.query_devices()
|
||||||
@@ -30,14 +27,14 @@ def main() -> None:
|
|||||||
|
|
||||||
istream = sd.RawInputStream(
|
istream = sd.RawInputStream(
|
||||||
samplerate=sample_rate,
|
samplerate=sample_rate,
|
||||||
device=indev,
|
device=in_device,
|
||||||
channels=1,
|
channels=1,
|
||||||
dtype='int16',
|
dtype='int16',
|
||||||
blocksize=frame_size,
|
blocksize=frame_size,
|
||||||
)
|
)
|
||||||
ostream = sd.RawOutputStream(
|
ostream = sd.RawOutputStream(
|
||||||
samplerate=sample_rate,
|
samplerate=sample_rate,
|
||||||
device=outdev,
|
device=out_device,
|
||||||
channels=1,
|
channels=1,
|
||||||
dtype='int16',
|
dtype='int16',
|
||||||
blocksize=frame_size,
|
blocksize=frame_size,
|
||||||
|
|||||||
@@ -67,7 +67,11 @@ class ModSoundDeviceAudioInput(audio_io.SoundDeviceAudioInput):
|
|||||||
|
|
||||||
def _on_audio(self, indata, frames, time_info, status):
|
def _on_audio(self, indata, frames, time_info, status):
|
||||||
if status:
|
if status:
|
||||||
logging.warning("SoundDeviceAudioInput: status=%s", status)
|
# Throttle logging to avoid callback overhead
|
||||||
|
c = getattr(self, "_status_cnt", 0) + 1
|
||||||
|
self._status_cnt = c
|
||||||
|
if c % 200 == 0:
|
||||||
|
logging.warning("SoundDeviceAudioInput: status=%s (x%d)", status, c)
|
||||||
with self._qlock:
|
with self._qlock:
|
||||||
self._q.append(bytes(indata))
|
self._q.append(bytes(indata))
|
||||||
|
|
||||||
@@ -76,9 +80,19 @@ class ModSoundDeviceAudioInput(audio_io.SoundDeviceAudioInput):
|
|||||||
with self._qlock:
|
with self._qlock:
|
||||||
while self._q and len(self._rb) < needed:
|
while self._q and len(self._rb) < needed:
|
||||||
self._rb.extend(self._q.popleft())
|
self._rb.extend(self._q.popleft())
|
||||||
|
# If not enough data yet, wait briefly to accumulate instead of padding immediately.
|
||||||
if len(self._rb) < needed:
|
if len(self._rb) < needed:
|
||||||
missing = needed - len(self._rb)
|
import time as _t
|
||||||
self._rb.extend(b"\x00" * missing)
|
t0 = _t.perf_counter()
|
||||||
|
# Wait up to ~15ms in small increments while pulling from _q
|
||||||
|
while len(self._rb) < needed and (_t.perf_counter() - t0) < 0.015:
|
||||||
|
with self._qlock:
|
||||||
|
while self._q and len(self._rb) < needed:
|
||||||
|
self._rb.extend(self._q.popleft())
|
||||||
|
_t.sleep(0.001)
|
||||||
|
if len(self._rb) < needed:
|
||||||
|
missing = needed - len(self._rb)
|
||||||
|
self._rb.extend(b"\x00" * missing)
|
||||||
|
|
||||||
out = bytes(self._rb[:needed])
|
out = bytes(self._rb[:needed])
|
||||||
del self._rb[:needed]
|
del self._rb[:needed]
|
||||||
@@ -87,18 +101,87 @@ class ModSoundDeviceAudioInput(audio_io.SoundDeviceAudioInput):
|
|||||||
|
|
||||||
audio_io.SoundDeviceAudioInput = ModSoundDeviceAudioInput
|
audio_io.SoundDeviceAudioInput = ModSoundDeviceAudioInput
|
||||||
|
|
||||||
|
|
||||||
|
def duplex_main() -> None:
|
||||||
|
"""Simple full-duplex callback stream: copy input directly to output and log latency."""
|
||||||
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
|
||||||
|
in_device = 0
|
||||||
|
out_device = 1
|
||||||
|
sample_rate = 48000
|
||||||
|
blocksize = 120
|
||||||
|
|
||||||
|
try:
|
||||||
|
stream = sd.RawStream(
|
||||||
|
samplerate=sample_rate,
|
||||||
|
blocksize=blocksize,
|
||||||
|
device=(in_device, out_device),
|
||||||
|
channels=1,
|
||||||
|
dtype='int16',
|
||||||
|
callback=lambda indata, outdata, frames, time_info, status: outdata.__setitem__(slice(None), indata),
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logging.error("Failed to open full-duplex stream: %s", e)
|
||||||
|
return
|
||||||
|
|
||||||
|
with stream:
|
||||||
|
try:
|
||||||
|
i = 0
|
||||||
|
while True:
|
||||||
|
time.sleep(0.5)
|
||||||
|
i += 1
|
||||||
|
if i % 4 == 0:
|
||||||
|
lat = getattr(stream, 'latency', None)
|
||||||
|
in_lat_ms = 0.0
|
||||||
|
out_lat_ms = 0.0
|
||||||
|
if isinstance(lat, (list, tuple)) and len(lat) >= 2:
|
||||||
|
in_lat_ms = float(lat[0]) * 1000.0
|
||||||
|
out_lat_ms = float(lat[1]) * 1000.0
|
||||||
|
elif isinstance(lat, (int, float)):
|
||||||
|
# If PortAudio reports a single latency, treat as symmetric
|
||||||
|
in_lat_ms = out_lat_ms = float(lat) * 1000.0
|
||||||
|
|
||||||
|
blk_ms = (blocksize / sample_rate) * 1000.0
|
||||||
|
e2e_ms = in_lat_ms + out_lat_ms + blk_ms
|
||||||
|
|
||||||
|
logging.info(
|
||||||
|
"duplex: in_lat=%.2fms out_lat=%.2fms blk=%.2fms e2e~%.2fms",
|
||||||
|
in_lat_ms,
|
||||||
|
out_lat_ms,
|
||||||
|
blk_ms,
|
||||||
|
e2e_ms,
|
||||||
|
)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
async def main() -> None:
|
async def main() -> None:
|
||||||
logging.basicConfig(level=logging.INFO)
|
logging.basicConfig(level=logging.INFO)
|
||||||
device = audio_io.SoundDeviceAudioInput(device_name='1', pcm_format=audio_io.PcmFormat(audio_io.PcmFormat.Endianness.LITTLE, audio_io.PcmFormat.SampleType.INT16, 48000, 1))
|
device = audio_io.SoundDeviceAudioInput(
|
||||||
|
device_name='0', # Shure MVX2U input (device index 0)
|
||||||
|
pcm_format=audio_io.PcmFormat(
|
||||||
|
audio_io.PcmFormat.Endianness.LITTLE,
|
||||||
|
audio_io.PcmFormat.SampleType.INT16,
|
||||||
|
48000,
|
||||||
|
1,
|
||||||
|
),
|
||||||
|
)
|
||||||
fmt = await device.open()
|
fmt = await device.open()
|
||||||
ostream = sd.RawOutputStream(samplerate=fmt.sample_rate, device=0, channels=1, dtype='int16', blocksize=480)
|
ostream = sd.RawOutputStream(
|
||||||
|
samplerate=fmt.sample_rate,
|
||||||
|
device=1, # USB Audio output (device index 1)
|
||||||
|
channels=1,
|
||||||
|
dtype='int16',
|
||||||
|
blocksize=480,
|
||||||
|
)
|
||||||
ostream.start()
|
ostream.start()
|
||||||
try:
|
try:
|
||||||
gen = device.frames(480)
|
|
||||||
read_w = deque(maxlen=3)
|
read_w = deque(maxlen=3)
|
||||||
write_w = deque(maxlen=3)
|
write_w = deque(maxlen=3)
|
||||||
loop_w = deque(maxlen=3)
|
loop_w = deque(maxlen=3)
|
||||||
i = 0
|
i = 0
|
||||||
|
gen = device.frames(480)
|
||||||
while True:
|
while True:
|
||||||
t0 = time.perf_counter()
|
t0 = time.perf_counter()
|
||||||
t1 = time.perf_counter()
|
t1 = time.perf_counter()
|
||||||
@@ -118,6 +201,7 @@ async def main() -> None:
|
|||||||
in_bytes_rb = len(device._rb)
|
in_bytes_rb = len(device._rb)
|
||||||
bytes_per_sample = 2 * fmt.channels
|
bytes_per_sample = 2 * fmt.channels
|
||||||
in_q_ms = ((in_bytes_q + in_bytes_rb) / bytes_per_sample) / fmt.sample_rate * 1000.0
|
in_q_ms = ((in_bytes_q + in_bytes_rb) / bytes_per_sample) / fmt.sample_rate * 1000.0
|
||||||
|
rb_fill_samples = in_bytes_rb / bytes_per_sample
|
||||||
|
|
||||||
out_lat_ms = 0.0
|
out_lat_ms = 0.0
|
||||||
try:
|
try:
|
||||||
@@ -163,7 +247,7 @@ async def main() -> None:
|
|||||||
f"read min={min(read_w)*1000:.3f}ms mean={(sum(read_w)/len(read_w))*1000:.3f}ms max={max(read_w)*1000:.3f}ms "
|
f"read min={min(read_w)*1000:.3f}ms mean={(sum(read_w)/len(read_w))*1000:.3f}ms max={max(read_w)*1000:.3f}ms "
|
||||||
f"write min={min(write_w)*1000:.3f}ms mean={(sum(write_w)/len(write_w))*1000:.3f}ms max={max(write_w)*1000:.3f}ms "
|
f"write min={min(write_w)*1000:.3f}ms mean={(sum(write_w)/len(write_w))*1000:.3f}ms max={max(write_w)*1000:.3f}ms "
|
||||||
f"loop min={min(loop_w)*1000:.3f}ms mean={(sum(loop_w)/len(loop_w))*1000:.3f}ms max={max(loop_w)*1000:.3f}ms "
|
f"loop min={min(loop_w)*1000:.3f}ms mean={(sum(loop_w)/len(loop_w))*1000:.3f}ms max={max(loop_w)*1000:.3f}ms "
|
||||||
f"qlen={len(device._q)} in_lat={in_lat_ms:.2f}ms in_q={in_q_ms:.2f}ms out_lat={out_lat_ms:.2f}ms out_blk={out_block_ms:.2f}ms out_free={out_free_ms:.2f}ms e2e~{e2e_ms:.2f}ms"
|
f"qlen={len(device._q)} rbfill={rb_fill_samples:.1f}smp in_lat={in_lat_ms:.2f}ms in_q={in_q_ms:.2f}ms out_lat={out_lat_ms:.2f}ms out_blk={out_block_ms:.2f}ms out_free={out_free_ms:.2f}ms e2e~{e2e_ms:.2f}ms"
|
||||||
)
|
)
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
pass
|
pass
|
||||||
@@ -174,5 +258,6 @@ async def main() -> None:
|
|||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
asyncio.run(main())
|
asyncio.run(main())
|
||||||
|
|||||||
Reference in New Issue
Block a user