import asyncio
import logging
import os
import threading
import time
from collections import deque

import sounddevice as sd

import audio.io as audio_io


class ModSoundDeviceAudioInput(audio_io.SoundDeviceAudioInput):
    """SoundDeviceAudioInput patched for low-latency, callback-driven capture.

    A PortAudio callback queues small blocks of captured int16 PCM; ``_read``
    reassembles them into the frame size the caller asks for. Short reads wait
    briefly for more audio and are zero-padded on timeout, so the consumer
    always receives a full frame.
    """

    def _open(self):
        """Open a low-latency RawInputStream and reset the capture buffers.

        Logs host-API/device details, starts the callback stream, and returns
        the format ``_read`` produces: little-endian int16 at the requested
        sample rate and channel count.
        """
        # kind='input' makes query_devices return a single dict even when
        # self._device is None (default input device); without it a None device
        # yields the whole DeviceList and the .get() calls below would fail.
        dev_info = sd.query_devices(self._device, 'input')
        hostapis = sd.query_hostapis()
        api_index = dev_info.get('hostapi')
        if isinstance(api_index, int) and 0 <= api_index < len(hostapis):
            api_name = hostapis[api_index]['name']
        else:
            api_name = 'unknown'
        pa_ver = sd.get_portaudio_version()
        logging.info(
            "SoundDevice backend=%s device='%s' (id=%s) ch=%s default_low_input_latency=%.4f default_high_input_latency=%.4f portaudio=%s",
            api_name,
            dev_info.get('name'),
            self._device,
            dev_info.get('max_input_channels'),
            float(dev_info.get('default_low_input_latency') or 0.0),
            float(dev_info.get('default_high_input_latency') or 0.0),
            pa_ver[1] if isinstance(pa_ver, tuple) and len(pa_ver) >= 2 else pa_ver,
        )

        # Per-open diagnostic state. None of it is read elsewhere in this file.
        # NOTE(review): kept for compatibility with external tooling; confirm
        # whether these attributes and the log-file cleanup are still needed.
        self.log_counter0 = 0
        self._runavg_samps = deque(maxlen=30)
        self._runavg = 0
        self.max_avail = 0
        self.logfile_name = "available_samples.txt"
        self._frames_offset = 0
        try:
            os.remove(self.logfile_name)
        except FileNotFoundError:
            pass

        # Frames delivered per callback: 120 frames is 2.5 ms at 48 kHz.
        self.blocksize = 120
        # Blocks pushed by the audio callback. With maxlen, appending to a full
        # deque discards the oldest block, capping queued audio at
        # 980 * 120 frames (~2.45 s at 48 kHz) if the reader falls behind.
        self._q = deque(maxlen=980)
        # Reassembly buffer consumed by _read.
        self._rb = bytearray()
        # Guards _q, which is shared between the callback thread and _read.
        self._qlock = threading.Lock()
        self._stream = sd.RawInputStream(
            samplerate=self._pcm_format.sample_rate,
            device=self._device,
            channels=self._pcm_format.channels,
            dtype='int16',
            blocksize=self.blocksize,
            latency=0.005,  # suggested latency in seconds
            callback=self._on_audio,
        )
        self._stream.start()
        # Report the channel count actually captured; _read returns
        # interleaved data with this many channels.
        return audio_io.PcmFormat(
            audio_io.PcmFormat.Endianness.LITTLE,
            audio_io.PcmFormat.SampleType.INT16,
            self._pcm_format.sample_rate,
            self._pcm_format.channels,
        )

    def _on_audio(self, indata, frames, time_info, status):
        """PortAudio callback: queue a copy of the captured block.

        sounddevice invokes this from its own audio thread, so the work done
        here is kept minimal.
        """
        if status:
            count = getattr(self, "_status_cnt", 0) + 1
            self._status_cnt = count
            # Log the first status and then every 200th, keeping the callback cheap.
            if count == 1 or count % 200 == 0:
                logging.warning("SoundDeviceAudioInput: status=%s (x%d)", status, count)
        # Copy: the callback buffer is only valid for the duration of the call.
        with self._qlock:
            self._q.append(bytes(indata))

    def _drain_queue(self, needed: int) -> None:
        """Move queued blocks into ``_rb`` until it holds ``needed`` bytes or the queue is empty."""
        with self._qlock:
            while self._q and len(self._rb) < needed:
                self._rb.extend(self._q.popleft())

    def _read(self, frame_size: int) -> bytes:
        """Return exactly ``frame_size`` frames of interleaved int16 PCM.

        If not enough audio is buffered, polls the callback queue for up to
        ~15 ms; whatever is still missing after that is zero-filled.
        """
        needed = frame_size * self._pcm_format.channels * 2  # 2 bytes per int16 sample
        self._drain_queue(needed)
        if len(self._rb) < needed:
            deadline = time.perf_counter() + 0.015
            # Sleep first, then drain: the queue was just emptied above, and
            # this avoids a pointless trailing sleep once the request is met.
            while len(self._rb) < needed and time.perf_counter() < deadline:
                time.sleep(0.001)
                self._drain_queue(needed)
        if len(self._rb) < needed:
            self._rb.extend(bytes(needed - len(self._rb)))
        out = bytes(self._rb[:needed])
        del self._rb[:needed]
        return out


# Monkeypatch: any code that constructs audio_io.SoundDeviceAudioInput after
# this module is imported gets the low-latency subclass instead.
audio_io.SoundDeviceAudioInput = ModSoundDeviceAudioInput


def _latency_ms(latency, index: int = -1) -> float:
    """Convert a sounddevice ``latency`` value (seconds) to milliseconds.

    A scalar is converted directly. For a list/tuple (e.g. the (input, output)
    pair of a duplex stream), ``index`` selects the element. Any other value,
    or an out-of-range index, yields 0.0.
    """
    if isinstance(latency, (int, float)):
        return float(latency) * 1000.0
    if isinstance(latency, (list, tuple)):
        try:
            return float(latency[index]) * 1000.0
        except IndexError:
            return 0.0
    return 0.0


def duplex_main() -> None:
    """Simple full-duplex callback stream: copy input directly to output and log latency."""
    logging.basicConfig(level=logging.INFO)
    in_device = 0
    out_device = 1
    sample_rate = 48000
    blocksize = 120

    def passthrough(indata, outdata, frames, time_info, status):
        # Copy the captured block straight into the playback buffer.
        outdata[:] = indata

    try:
        stream = sd.RawStream(
            samplerate=sample_rate,
            blocksize=blocksize,
            device=(in_device, out_device),
            channels=1,
            dtype='int16',
            callback=passthrough,
        )
    except Exception as e:
        logging.error("Failed to open full-duplex stream: %s", e)
        return

    blk_ms = (blocksize / sample_rate) * 1000.0
    with stream:
        try:
            i = 0
            while True:
                time.sleep(0.5)
                i += 1
                if i % 4 == 0:  # report every ~2 s
                    lat = getattr(stream, 'latency', None)
                    # A scalar latency is treated as symmetric (same value for both).
                    in_lat_ms = _latency_ms(lat, 0)
                    out_lat_ms = _latency_ms(lat, 1)
                    e2e_ms = in_lat_ms + out_lat_ms + blk_ms
                    logging.info(
                        "duplex: in_lat=%.2fms out_lat=%.2fms blk=%.2fms e2e~%.2fms",
                        in_lat_ms,
                        out_lat_ms,
                        blk_ms,
                        e2e_ms,
                    )
        except KeyboardInterrupt:
            pass


def _fmt_window(label: str, samples) -> str:
    """Format min/mean/max of a window of durations (seconds) as milliseconds."""
    return (
        f"{label} min={min(samples)*1000:.3f}ms "
        f"mean={(sum(samples)/len(samples))*1000:.3f}ms "
        f"max={max(samples)*1000:.3f}ms "
    )


def _print_stats(device, ostream, fmt, read_w, write_w, loop_w) -> None:
    """Print timing windows, buffer fill and an end-to-end latency estimate for ``main``."""
    bytes_per_frame = 2 * fmt.channels
    # Snapshot the callback queue under its lock: iterating a deque while the
    # audio thread appends to it can raise RuntimeError.
    with device._qlock:
        queued = list(device._q)
    in_bytes_q = sum(len(b) for b in queued)
    in_bytes_rb = len(device._rb)
    in_q_ms = ((in_bytes_q + in_bytes_rb) / bytes_per_frame) / fmt.sample_rate * 1000.0
    rb_fill_samples = in_bytes_rb / bytes_per_frame

    out_lat_ms = _latency_ms(getattr(ostream, 'latency', None))
    in_stream = getattr(device, '_stream', None)
    in_lat_ms = _latency_ms(getattr(in_stream, 'latency', None))

    try:
        write_avail = ostream.write_available
    except sd.PortAudioError:
        write_avail = 0
    out_free_ms = (write_avail / fmt.sample_rate) * 1000.0 if isinstance(write_avail, int) else 0.0

    bs = getattr(ostream, 'blocksize', None)
    out_block_ms = (bs / fmt.sample_rate) * 1000.0 if isinstance(bs, int) and bs > 0 else 0.0

    e2e_ms = in_lat_ms + in_q_ms + out_lat_ms + out_block_ms
    print(
        _fmt_window("read", read_w)
        + _fmt_window("write", write_w)
        + _fmt_window("loop", loop_w)
        + f"qlen={len(queued)} rbfill={rb_fill_samples:.1f}smp in_lat={in_lat_ms:.2f}ms in_q={in_q_ms:.2f}ms out_lat={out_lat_ms:.2f}ms out_blk={out_block_ms:.2f}ms out_free={out_free_ms:.2f}ms e2e~{e2e_ms:.2f}ms"
    )


def _close_output_stream(ostream) -> None:
    """Stop and close ``ostream``, logging (not raising) any failure."""
    try:
        ostream.stop()
        ostream.close()
    except Exception:
        logging.exception("Failed to close output stream")


async def main() -> None:
    """Loop captured audio back to an output device and print latency statistics.

    Reads 480-frame chunks from input device 0 and writes them to output
    device 1. Every 300 chunks it prints read/write/loop timing over that
    interval plus buffer fill and an end-to-end latency estimate.
    """
    logging.basicConfig(level=logging.INFO)
    frame_size = 480
    report_every = 300

    device = audio_io.SoundDeviceAudioInput(
        device_name='0',  # Shure MVX2U input (device index 0)
        pcm_format=audio_io.PcmFormat(
            audio_io.PcmFormat.Endianness.LITTLE,
            audio_io.PcmFormat.SampleType.INT16,
            48000,
            1,
        ),
    )
    fmt = await device.open()
    ostream = None
    try:
        # Opened inside the try so the input device is closed even if this fails.
        ostream = sd.RawOutputStream(
            samplerate=fmt.sample_rate,
            device=1,  # USB Audio output (device index 1)
            channels=fmt.channels,
            dtype='int16',
            blocksize=frame_size,
        )
        ostream.start()

        # Windows span the whole report interval so min/max reflect every
        # iteration since the last report, not just the last few.
        read_w = deque(maxlen=report_every)
        write_w = deque(maxlen=report_every)
        loop_w = deque(maxlen=report_every)
        i = 0
        t_read = time.perf_counter()
        async for frame in device.frames(frame_size):
            t_got = time.perf_counter()
            ostream.write(frame)
            t_written = time.perf_counter()
            read_w.append(t_got - t_read)
            write_w.append(t_written - t_got)
            loop_w.append(t_written - t_read)
            i += 1
            if i % report_every == 0:
                _print_stats(device, ostream, fmt, read_w, write_w, loop_w)
            # Restart the read timer after reporting so printing is not counted.
            t_read = time.perf_counter()
    except KeyboardInterrupt:
        pass
    finally:
        try:
            await device.aclose()
        finally:
            if ostream is not None:
                _close_output_stream(ostream)


if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the loopback; exit without a traceback.
        pass