Files
alsa_latency_test/test_ringbuffer.py
2025-11-17 13:33:17 +01:00

179 lines
6.8 KiB
Python

import audio.io as audio_io
import sounddevice as sd
import logging
import os
from collections import deque
import asyncio
import time
import threading
class ModSoundDeviceAudioInput(audio_io.SoundDeviceAudioInput):
    """SoundDeviceAudioInput variant that captures through a callback-fed queue.

    ``_open`` starts a ``sounddevice.RawInputStream`` whose callback pushes every
    captured block onto a bounded queue. ``_read`` drains that queue into a byte
    buffer and hands out fixed-size chunks, padding with zero bytes when not
    enough audio has been captured yet.
    """

    def _open(self) -> audio_io.PcmFormat:
        """Log the device configuration, start the capture stream, return its format.

        Returns:
            The PCM format of the bytes produced by ``_read``: little-endian
            signed 16-bit samples at the configured sample rate and channel
            count.
        """
        device_info = sd.query_devices(self._device)
        host_apis = sd.query_hostapis()
        api_index = device_info.get('hostapi')
        if isinstance(api_index, int) and 0 <= api_index < len(host_apis):
            api_name = host_apis[api_index]['name']
        else:
            api_name = 'unknown'
        pa_version = sd.get_portaudio_version()
        logging.info(
            "SoundDevice backend=%s device='%s' (id=%s) ch=%s default_low_input_latency=%.4f default_high_input_latency=%.4f portaudio=%s",
            api_name,
            device_info.get('name'),
            self._device,
            device_info.get('max_input_channels'),
            float(device_info.get('default_low_input_latency') or 0.0),
            float(device_info.get('default_high_input_latency') or 0.0),
            pa_version[1] if isinstance(pa_version, tuple) and len(pa_version) >= 2 else pa_version,
        )

        # Bookkeeping attributes. Nothing in this class reads them after this
        # point; they are kept so any external code inspecting them still works.
        self.log_counter0 = 0
        self._runavg_samps = deque(maxlen=30)
        self._runavg = 0
        self.max_avail = 0
        self._frames_offset = 0
        self.logfile_name = "available_samples.txt"

        # Start each run without a stale log file. EAFP avoids the
        # check-then-remove race of os.path.exists() followed by os.remove().
        try:
            os.remove(self.logfile_name)
        except FileNotFoundError:
            pass

        # Fixed callback block size: 120 frames, i.e. 2.5 ms at 48 kHz.
        self.blocksize = 120

        # Captured blocks waiting to be read. The deque is bounded, so when the
        # reader falls behind by more than 980 blocks the oldest one is dropped.
        self._q = deque(maxlen=980)
        # Bytes already taken off the queue but not yet returned by _read().
        self._rb = bytearray()
        # Guards _q, which is shared between the audio callback and _read().
        self._qlock = threading.Lock()

        self._stream = sd.RawInputStream(
            samplerate=self._pcm_format.sample_rate,
            device=self._device,
            channels=self._pcm_format.channels,
            dtype='int16',
            blocksize=self.blocksize,
            latency=0.005,
            callback=self._on_audio,
        )
        self._stream.start()

        # Report the channel count the stream was actually opened with, so the
        # advertised format matches the byte layout that _read() returns.
        return audio_io.PcmFormat(
            audio_io.PcmFormat.Endianness.LITTLE,
            audio_io.PcmFormat.SampleType.INT16,
            self._pcm_format.sample_rate,
            self._pcm_format.channels,
        )

    def _on_audio(self, indata, frames, time_info, status) -> None:
        """Stream callback: queue a copy of the captured block.

        Args:
            indata: Buffer holding the captured samples for this block.
            frames: Number of frames in ``indata`` (unused).
            time_info: Timing information supplied by the stream (unused).
            status: Status flags for this block; logged when truthy.
        """
        if status:
            logging.warning("SoundDeviceAudioInput: status=%s", status)
        # Copy the data: only the bytes object is kept after the callback returns.
        block = bytes(indata)
        with self._qlock:
            self._q.append(block)

    def _read(self, frame_size: int) -> bytes:
        """Return exactly ``frame_size`` frames of 16-bit PCM as bytes.

        Queued blocks are moved into the internal byte buffer until it holds
        enough data. If the queue runs dry first, the shortfall is filled with
        zero bytes rather than waiting for more audio.

        Args:
            frame_size: Number of frames requested.

        Returns:
            ``frame_size * channels * 2`` bytes.
        """
        needed = frame_size * self._pcm_format.channels * 2
        with self._qlock:
            while self._q and len(self._rb) < needed:
                self._rb.extend(self._q.popleft())
        shortfall = needed - len(self._rb)
        if shortfall > 0:
            self._rb.extend(b"\x00" * shortfall)
        chunk = bytes(self._rb[:needed])
        del self._rb[:needed]
        return chunk
# Monkey-patch: rebind the library's SoundDeviceAudioInput name to the patched
# subclass, so that main() below instantiates ModSoundDeviceAudioInput when it
# calls audio_io.SoundDeviceAudioInput(...).
audio_io.SoundDeviceAudioInput = ModSoundDeviceAudioInput
def _stream_latency_ms(stream, tuple_index: int) -> float:
    """Return ``stream.latency`` in milliseconds, or 0.0 when unavailable.

    Args:
        stream: Object exposing a ``latency`` attribute in seconds, or ``None``.
        tuple_index: Element to use when ``latency`` is a tuple (0 for the
            input side, -1 for the output side).
    """
    if stream is None:
        return 0.0
    try:
        latency = getattr(stream, 'latency', None)
        if isinstance(latency, (int, float)):
            return float(latency) * 1000.0
        if isinstance(latency, tuple) and latency:
            return float(latency[tuple_index]) * 1000.0
    except Exception:
        # Diagnostics are best-effort; never let them abort the audio loop.
        logging.debug("could not read stream latency", exc_info=True)
    return 0.0


def _stream_frames_ms(stream, attr_name: str, sample_rate) -> float:
    """Convert an integer frame-count attribute of *stream* to milliseconds.

    Returns 0.0 when the attribute is missing, not a positive int, or cannot
    be read.
    """
    try:
        frame_count = getattr(stream, attr_name, None)
        if isinstance(frame_count, int) and frame_count > 0:
            return (frame_count / sample_rate) * 1000.0
    except Exception:
        logging.debug("could not read stream %s", attr_name, exc_info=True)
    return 0.0


def _buffered_input_ms(device, fmt) -> float:
    """Return the audio buffered inside *device*, in milliseconds.

    Counts the bytes waiting in ``device._q`` plus those already moved into
    ``device._rb``.
    """
    # Snapshot the queue under its lock: the audio callback appends to it from
    # another thread, and iterating a deque while it is mutated raises.
    with device._qlock:
        pending_blocks = list(device._q)
    buffered_bytes = sum(len(block) for block in pending_blocks) + len(device._rb)
    bytes_per_frame = 2 * fmt.channels
    return (buffered_bytes / bytes_per_frame) / fmt.sample_rate * 1000.0


def _window_stats(label: str, window: deque) -> str:
    """Format min/mean/max of *window* (seconds) as milliseconds, with a trailing space."""
    return (
        f"{label} min={min(window)*1000:.3f}ms "
        f"mean={(sum(window)/len(window))*1000:.3f}ms "
        f"max={max(window)*1000:.3f}ms "
    )


def _close_output_stream(ostream) -> None:
    """Stop and close *ostream*, attempting both steps even if one fails."""
    for step in (ostream.stop, ostream.close):
        try:
            step()
        except Exception:
            logging.debug("output stream shutdown step failed", exc_info=True)


async def main() -> None:
    """Loop captured audio from input device '1' to output device 0 and print timing.

    Reads 480-frame chunks from the input device, writes each one to a
    ``sounddevice.RawOutputStream``, and every 300 iterations prints read /
    write / loop timings together with an estimate of the end-to-end latency.
    Runs until interrupted or until the input frame generator is exhausted.
    """
    logging.basicConfig(level=logging.INFO)
    device = audio_io.SoundDeviceAudioInput(
        device_name='1',
        pcm_format=audio_io.PcmFormat(
            audio_io.PcmFormat.Endianness.LITTLE,
            audio_io.PcmFormat.SampleType.INT16,
            48000,
            1,
        ),
    )
    fmt = await device.open()
    ostream = None
    try:
        # Created inside the try so the input device is still closed when
        # opening or starting the output stream fails.
        ostream = sd.RawOutputStream(
            samplerate=fmt.sample_rate,
            device=0,
            channels=1,
            dtype='int16',
            blocksize=480,
        )
        ostream.start()

        frame_source = device.frames(480)
        # NOTE(review): each window keeps only the 3 most recent samples, so
        # the min/mean/max printed every 300 iterations cover just those 3.
        read_w = deque(maxlen=3)
        write_w = deque(maxlen=3)
        loop_w = deque(maxlen=3)
        iteration = 0
        while True:
            t0 = time.perf_counter()
            t1 = time.perf_counter()
            try:
                frame = await frame_source.__anext__()
            except StopAsyncIteration:
                # Input exhausted: leave the loop instead of failing.
                break
            t2 = time.perf_counter()
            ostream.write(frame)  # synchronous call inside the coroutine
            t3 = time.perf_counter()

            read_w.append(t2 - t1)
            write_w.append(t3 - t2)
            loop_w.append(t3 - t0)
            iteration += 1
            if iteration % 300 != 0:
                continue

            in_q_ms = _buffered_input_ms(device, fmt)
            out_lat_ms = _stream_latency_ms(ostream, -1)
            in_lat_ms = _stream_latency_ms(getattr(device, '_stream', None), 0)
            out_free_ms = _stream_frames_ms(ostream, 'write_available', fmt.sample_rate)
            out_block_ms = _stream_frames_ms(ostream, 'blocksize', fmt.sample_rate)
            e2e_ms = in_lat_ms + in_q_ms + out_lat_ms + out_block_ms
            print(
                f"{_window_stats('read', read_w)}"
                f"{_window_stats('write', write_w)}"
                f"{_window_stats('loop', loop_w)}"
                f"qlen={len(device._q)} in_lat={in_lat_ms:.2f}ms in_q={in_q_ms:.2f}ms out_lat={out_lat_ms:.2f}ms out_blk={out_block_ms:.2f}ms out_free={out_free_ms:.2f}ms e2e~{e2e_ms:.2f}ms"
            )
    except KeyboardInterrupt:
        pass
    finally:
        try:
            await device.aclose()
        finally:
            # Release the output stream even when closing the input raised.
            if ostream is not None:
                _close_output_stream(ostream)
if __name__ == '__main__':
    # Script entry point: drive the capture/playback coroutine to completion.
    asyncio.run(main())