- Replaced per-frame `run_in_executor` calls with a single background reader thread in `ThreadedAudioInput`.
- The reader thread continuously calls `_read()` and enqueues data onto an `asyncio.Queue` via `call_soon_threadsafe`.
- Reduces per-frame scheduling overhead and context-switch jitter while preserving the async API.
- Added thread lifecycle management: lazy start on the first `frames()` call, graceful stop in `aclose()`.
- Update
264 lines
9.6 KiB
Python
264 lines
9.6 KiB
Python
import audio.io as audio_io
|
|
import sounddevice as sd
|
|
import logging
|
|
import os
|
|
from collections import deque
|
|
import asyncio
|
|
import time
|
|
import threading
|
|
|
|
class ModSoundDeviceAudioInput(audio_io.SoundDeviceAudioInput):
    """Patched SoundDeviceAudioInput using callback-driven, low-latency capture.

    PortAudio delivers fixed-size blocks to ``_on_audio`` on its own thread.
    The blocks are queued in ``_q`` and ``_read`` reassembles them (through the
    byte buffer ``_rb``) into frames of whatever size the caller requests.

    NOTE: no resampling is performed by this class; the data is passed through
    at the sample rate and channel count of ``self._pcm_format``.
    """

    # Longest time _read() waits for missing audio before zero-padding (seconds).
    _READ_WAIT_S = 0.015
    # Polling interval used while waiting for the callback to deliver data.
    _READ_POLL_S = 0.001
    # Log the first stream status event, then every Nth one, so the audio
    # callback is not slowed down by logging on every event.
    _STATUS_LOG_EVERY = 200

    def _open(self):
        """Open and start the input stream and reset all capture state.

        Logs the selected backend/device, removes a stale
        ``available_samples.txt`` from the working directory, recreates the
        block queue and byte buffer, then starts a ``sd.RawInputStream`` whose
        callback is ``_on_audio``.

        Returns:
            The ``PcmFormat`` of the data produced by ``_read``: little-endian
            int16 at the configured sample rate and channel count.
        """
        dev_info = sd.query_devices(self._device)
        hostapis = sd.query_hostapis()
        api_index = dev_info.get('hostapi')
        if isinstance(api_index, int) and 0 <= api_index < len(hostapis):
            api_name = hostapis[api_index]['name']
        else:
            api_name = 'unknown'
        pa_ver = sd.get_portaudio_version()

        logging.info(
            "SoundDevice backend=%s device='%s' (id=%s) ch=%s default_low_input_latency=%.4f default_high_input_latency=%.4f portaudio=%s",
            api_name,
            dev_info.get('name'),
            self._device,
            dev_info.get('max_input_channels'),
            float(dev_info.get('default_low_input_latency') or 0.0),
            float(dev_info.get('default_high_input_latency') or 0.0),
            pa_ver[1] if isinstance(pa_ver, tuple) and len(pa_ver) >= 2 else pa_ver,
        )

        # Bookkeeping attributes; none of them is read inside this class.
        self.log_counter0 = 0
        self._runavg_samps = deque(maxlen=30)
        self._runavg = 0
        self.max_avail = 0
        self.logfile_name = "available_samples.txt"
        self._frames_offset = 0

        # Frames per callback block: 120 frames is 2.5 ms at 48 kHz.
        self.blocksize = 120

        if os.path.exists(self.logfile_name):
            os.remove(self.logfile_name)

        # Bounded queue of raw blocks: once 980 blocks are pending, appending
        # a new one discards the oldest (deque maxlen semantics).
        self._q = deque(maxlen=980)
        # Bytes already taken from the queue but not yet returned by _read().
        self._rb = bytearray()
        # Guards _q, which is filled by the audio callback thread.
        self._qlock = threading.Lock()
        self._status_cnt = 0

        self._stream = sd.RawInputStream(
            samplerate=self._pcm_format.sample_rate,
            device=self._device,
            channels=self._pcm_format.channels,
            dtype='int16',
            blocksize=self.blocksize,
            latency=0.005,
            callback=self._on_audio,
        )
        self._stream.start()

        # Report the channel count the stream was actually opened with; _read()
        # sizes its frames with the same value, so the two must agree.
        return audio_io.PcmFormat(
            audio_io.PcmFormat.Endianness.LITTLE,
            audio_io.PcmFormat.SampleType.INT16,
            self._pcm_format.sample_rate,
            self._pcm_format.channels,
        )

    def _on_audio(self, indata, frames, time_info, status):
        """Stream callback: queue one captured block for ``_read``.

        Args:
            indata: Buffer holding the captured block.
            frames: Number of frames in ``indata`` (unused).
            time_info: Timing information from the stream (unused).
            status: Stream status flags; truthy when something was reported.
        """
        if status:
            self._status_cnt += 1
            count = self._status_cnt
            if count == 1 or count % self._STATUS_LOG_EVERY == 0:
                logging.warning("SoundDeviceAudioInput: status=%s (x%d)", status, count)
        # Copy the block: the queue outlives this callback invocation.
        block = bytes(indata)
        with self._qlock:
            self._q.append(block)

    def _drain_queue(self, needed: int) -> None:
        """Move queued blocks into ``_rb`` until it holds ``needed`` bytes."""
        with self._qlock:
            while self._q and len(self._rb) < needed:
                self._rb.extend(self._q.popleft())

    def _read(self, frame_size: int) -> bytes:
        """Return exactly one frame of interleaved int16 PCM.

        Waits up to ``_READ_WAIT_S`` for the callback to deliver enough data;
        whatever is still missing after that is filled with zero bytes
        (silence) so the caller always gets a full frame.

        Args:
            frame_size: Number of sample frames to return.

        Returns:
            ``frame_size * channels * 2`` bytes.
        """
        needed = frame_size * self._pcm_format.channels * 2
        self._drain_queue(needed)

        if len(self._rb) < needed:
            deadline = time.perf_counter() + self._READ_WAIT_S
            while time.perf_counter() < deadline:
                time.sleep(self._READ_POLL_S)
                # Drain after every sleep (including the last one) and leave
                # as soon as the frame is complete, so no extra delay is added.
                self._drain_queue(needed)
                if len(self._rb) >= needed:
                    break

        missing = needed - len(self._rb)
        if missing > 0:
            self._rb.extend(bytes(missing))

        out = bytes(self._rb[:needed])
        del self._rb[:needed]
        return out
|
|
|
|
|
|
# Monkeypatch: rebind the name inside audio_io so that any later lookup of
# audio_io.SoundDeviceAudioInput (such as the one in main()) constructs the
# patched subclass instead of the stock implementation.
audio_io.SoundDeviceAudioInput = ModSoundDeviceAudioInput
|
|
|
|
|
|
def duplex_main() -> None:
    """Run a full-duplex pass-through stream and report its latency.

    Opens one ``sd.RawStream`` from input device 0 to output device 1
    (mono, int16, 48 kHz, 120-frame blocks) whose callback copies every input
    block straight to the output. Every fourth 0.5 s tick the stream latency
    is logged, until interrupted with Ctrl-C. If the stream cannot be opened
    the error is logged and the function returns.
    """
    logging.basicConfig(level=logging.INFO)

    sample_rate = 48000
    blocksize = 120
    in_device, out_device = 0, 1

    def passthrough(indata, outdata, frames, time_info, status):
        # Copy the captured block to the playback buffer unchanged.
        outdata[:] = indata

    try:
        stream = sd.RawStream(
            samplerate=sample_rate,
            blocksize=blocksize,
            device=(in_device, out_device),
            channels=1,
            dtype='int16',
            callback=passthrough,
        )
    except Exception as e:
        logging.error("Failed to open full-duplex stream: %s", e)
        return

    # Duration of one block; constant for the lifetime of the stream.
    blk_ms = (blocksize / sample_rate) * 1000.0

    with stream:
        try:
            ticks = 0
            while True:
                time.sleep(0.5)
                ticks += 1
                if ticks % 4 != 0:
                    continue

                lat = getattr(stream, 'latency', None)
                if isinstance(lat, (list, tuple)) and len(lat) >= 2:
                    in_lat_ms = float(lat[0]) * 1000.0
                    out_lat_ms = float(lat[1]) * 1000.0
                elif isinstance(lat, (int, float)):
                    # A single reported latency is treated as symmetric.
                    in_lat_ms = out_lat_ms = float(lat) * 1000.0
                else:
                    in_lat_ms = out_lat_ms = 0.0

                logging.info(
                    "duplex: in_lat=%.2fms out_lat=%.2fms blk=%.2fms e2e~%.2fms",
                    in_lat_ms,
                    out_lat_ms,
                    blk_ms,
                    in_lat_ms + out_lat_ms + blk_ms,
                )
        except KeyboardInterrupt:
            pass
|
|
|
|
|
|
def _best_effort_attr(obj, name):
    """Return ``getattr(obj, name)``, or None if it is missing or reading it raises."""
    try:
        return getattr(obj, name, None)
    except Exception:
        # Diagnostics only: never let a failing property break the audio loop.
        logging.debug("could not read %s", name, exc_info=True)
        return None


def _stream_latency_ms(stream) -> float:
    """Return ``stream.latency`` in milliseconds, or 0.0 when unavailable.

    When the latency is a tuple, its last element is used.
    """
    lat = _best_effort_attr(stream, 'latency')
    if isinstance(lat, (int, float)):
        return float(lat) * 1000.0
    if isinstance(lat, tuple) and lat:
        return float(lat[-1]) * 1000.0
    return 0.0


def _format_window(label, window) -> str:
    """Format min/mean/max of a window of durations (seconds) in milliseconds."""
    mean = sum(window) / len(window)
    return (
        f"{label} min={min(window)*1000:.3f}ms "
        f"mean={mean*1000:.3f}ms "
        f"max={max(window)*1000:.3f}ms "
    )


def _print_loop_stats(device, ostream, fmt, read_w, write_w, loop_w) -> None:
    """Print timing windows, buffer fill levels and a rough end-to-end latency."""
    # Snapshot the queue under its lock: the audio callback thread appends to
    # it concurrently, and iterating a deque while it is mutated raises.
    with device._qlock:
        qlen = len(device._q)
        in_bytes_q = sum(len(block) for block in device._q)
    in_bytes_rb = len(device._rb)

    bytes_per_sample = 2 * fmt.channels
    in_q_ms = ((in_bytes_q + in_bytes_rb) / bytes_per_sample) / fmt.sample_rate * 1000.0
    rb_fill_samples = in_bytes_rb / bytes_per_sample

    out_lat_ms = _stream_latency_ms(ostream)
    in_lat_ms = _stream_latency_ms(getattr(device, '_stream', None))

    out_free_ms = 0.0
    write_available = _best_effort_attr(ostream, 'write_available')
    if isinstance(write_available, int):
        out_free_ms = (write_available / fmt.sample_rate) * 1000.0

    out_block_ms = 0.0
    out_blocksize = _best_effort_attr(ostream, 'blocksize')
    if isinstance(out_blocksize, int) and out_blocksize > 0:
        out_block_ms = (out_blocksize / fmt.sample_rate) * 1000.0

    e2e_ms = in_lat_ms + in_q_ms + out_lat_ms + out_block_ms

    print(
        _format_window("read", read_w)
        + _format_window("write", write_w)
        + _format_window("loop", loop_w)
        + f"qlen={qlen} rbfill={rb_fill_samples:.1f}smp in_lat={in_lat_ms:.2f}ms in_q={in_q_ms:.2f}ms out_lat={out_lat_ms:.2f}ms out_blk={out_block_ms:.2f}ms out_free={out_free_ms:.2f}ms e2e~{e2e_ms:.2f}ms"
    )


async def main() -> None:
    """Loop microphone audio to the output device and print latency statistics.

    Opens input device '0' (mono, int16, 48 kHz) through
    ``audio_io.SoundDeviceAudioInput``, writes every 480-frame input frame to
    a ``sd.RawOutputStream`` on device 1, and prints statistics every 300
    frames. Both devices are closed on exit, including when opening the
    output stream fails.
    """
    logging.basicConfig(level=logging.INFO)

    frame_size = 480  # frames per read/write; 10 ms at 48 kHz
    stats_every = 300  # print statistics once per this many frames

    device = audio_io.SoundDeviceAudioInput(
        device_name='0',  # Shure MVX2U input (device index 0)
        pcm_format=audio_io.PcmFormat(
            audio_io.PcmFormat.Endianness.LITTLE,
            audio_io.PcmFormat.SampleType.INT16,
            48000,
            1,
        ),
    )
    fmt = await device.open()

    ostream = None
    try:
        # Created inside the try block so the input device is still closed if
        # the output stream cannot be opened or started.
        ostream = sd.RawOutputStream(
            samplerate=fmt.sample_rate,
            device=1,  # USB Audio output (device index 1)
            channels=1,
            dtype='int16',
            blocksize=frame_size,
        )
        ostream.start()

        # Short windows: statistics reflect only the three most recent frames.
        read_w = deque(maxlen=3)
        write_w = deque(maxlen=3)
        loop_w = deque(maxlen=3)

        frame_count = 0
        gen = device.frames(frame_size)
        while True:
            t_start = time.perf_counter()
            try:
                frame = await gen.__anext__()
            except StopAsyncIteration:
                break
            t_read = time.perf_counter()
            # NOTE: this is a synchronous call made from the event loop.
            ostream.write(frame)
            t_written = time.perf_counter()

            read_w.append(t_read - t_start)
            write_w.append(t_written - t_read)
            loop_w.append(t_written - t_start)

            frame_count += 1
            if frame_count % stats_every == 0:
                _print_loop_stats(device, ostream, fmt, read_w, write_w, loop_w)
    except KeyboardInterrupt:
        pass
    finally:
        try:
            await device.aclose()
        finally:
            # Close the output stream even if closing the input failed.
            if ostream is not None:
                try:
                    ostream.stop()
                    ostream.close()
                except Exception:
                    logging.warning("Failed to close output stream", exc_info=True)
|
|
|
|
|
|
if __name__ == '__main__':
    # On Ctrl-C, asyncio.run() cancels the main task and then re-raises
    # KeyboardInterrupt itself, so it has to be caught here (not only inside
    # main()) for the script to exit without a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
|