feat: add sample rate conversion to audio input for improved device compatibility

This commit is contained in:
pstruebi
2025-10-06 15:07:51 +02:00
parent 37984b028b
commit e503b36c50
9 changed files with 778 additions and 17 deletions

View File

@@ -177,7 +177,7 @@ git checkout 9abe5fe7db729280080a0bbc1397a528cd3ce658
rm -rf build
cmake -S . -B build -G"Unix Makefiles" \
-DBUILD_SHARED_LIBS=ON \
-DPA_USE_ALSA=OFF \
-DPA_USE_ALSA=ON \
-DPA_USE_PULSEAUDIO=ON \
-DPA_USE_JACK=OFF
cmake --build build -j$(nproc)

26
poetry.lock generated
View File

@@ -2443,6 +2443,30 @@ files = [
{file = "rpds_py-0.25.1.tar.gz", hash = "sha256:8960b6dac09b62dac26e75d7e2c4a22efb835d827a7278c34f72b2b84fa160e3"},
]
[[package]]
name = "samplerate"
version = "0.2.1"
description = "Monolithic python wrapper for libsamplerate based on pybind11 and NumPy"
optional = false
python-versions = ">=3.7"
groups = ["main"]
files = [
{file = "samplerate-0.2.1-cp310-cp310-macosx_12_0_x86_64.whl", hash = "sha256:9b392e857cfeda712585d702871eab7169ba2c63209e8d2314fa2196d6127354"},
{file = "samplerate-0.2.1-cp310-cp310-win_amd64.whl", hash = "sha256:c237959ba0fd391b3293f2905c23f1ae15a096fc987cd776aef380ba426491c6"},
{file = "samplerate-0.2.1-cp311-cp311-macosx_12_0_universal2.whl", hash = "sha256:b0e1b5cb08edb19d232bd1ba67632590ed7a579b526ab4bf3983b4f2b2e9acb4"},
{file = "samplerate-0.2.1-cp311-cp311-win_amd64.whl", hash = "sha256:3ebd447f2040950edc1dac32cb4afed3a74ed37e8e5ffe4775470213f799801f"},
{file = "samplerate-0.2.1-cp312-cp312-macosx_12_0_universal2.whl", hash = "sha256:69cc7ad887c36294129315a04a7f43837c7ed9caf42db94e9687cf66c9709200"},
{file = "samplerate-0.2.1-cp312-cp312-win_amd64.whl", hash = "sha256:c7f5e81b24debfa25981e367e3f5d191cf72e32b5c92613139f5ba94e7f18c01"},
{file = "samplerate-0.2.1-cp38-cp38-macosx_12_0_x86_64.whl", hash = "sha256:a2b39a7131da065072508548f0ab80c9565d8c75212eac2f61fc1b1f82bb1611"},
{file = "samplerate-0.2.1-cp38-cp38-win_amd64.whl", hash = "sha256:c02d6e0f7541c4f6a64b97ff6d0a84f53a0c432c965031491b48d9d75a81f102"},
{file = "samplerate-0.2.1-cp39-cp39-macosx_12_0_x86_64.whl", hash = "sha256:137563c6069e23441b4c2059cf95a3ed5b20a6d747d0488014becb94b4dfc32f"},
{file = "samplerate-0.2.1-cp39-cp39-win_amd64.whl", hash = "sha256:0656233932f76af6070f56edf34d8ef56e62e2c503553229d2388953a2166cda"},
{file = "samplerate-0.2.1.tar.gz", hash = "sha256:464d3574412024184fb7428ecbaa1b2e207bddf5fbc10a5d9ddc3fc1c7b7ab1e"},
]
[package.dependencies]
numpy = "*"
[[package]]
name = "six"
version = "1.17.0"
@@ -2952,4 +2976,4 @@ test = ["pytest", "pytest-asyncio"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.11"
content-hash = "6b5300c349ed045e8fd3e617e6262bbd7e5c48c518e4c62cedf7c17da50ce8c0"
content-hash = "7290ffd17fde6febd61d505131efe260790b3d91fcb7442b6c4aa6fc32f6baeb"

View File

@@ -16,7 +16,8 @@ dependencies = [
"aiortc (>=1.13.0,<2.0.0)",
"sounddevice (>=0.5.2,<0.6.0)",
"python-dotenv (>=1.1.1,<2.0.0)",
"smbus2 (>=0.5.0,<0.6.0)"
"smbus2 (>=0.5.0,<0.6.0)",
"samplerate (>=0.2.1,<0.3.0)"
]
[project.optional-dependencies]

View File

@@ -47,6 +47,7 @@ import bumble.utils
import numpy as np # for audio down-mix
from bumble.device import Host, AdvertisingChannelMap
from bumble.audio import io as audio_io
from auracast.utils import io_asrc
from auracast import auracast_config
from auracast.utils.read_lc3_file import read_lc3_file
@@ -533,7 +534,10 @@ class Streamer():
# anything else, e.g. realtime stream from device (bumble)
else:
audio_input = await audio_io.create_audio_input(audio_source, input_format)
if audio_source.startswith('asrcdevice'):
audio_input = io_asrc.SoundDeviceAudioInputAsrc(audio_source[len('asrcdevice')+1:], input_format)
else:
audio_input = await audio_io.create_audio_input(audio_source, input_format)
# Store early so stop_streaming can close even if open() fails
big['audio_input'] = audio_input
# SoundDeviceAudioInput (used for `mic:<device>` captures) has no `.rewind`.

View File

@@ -50,7 +50,6 @@ from auracast.utils.sounddevice_utils import (
refresh_pw_cache,
)
if __name__ == "__main__":
logging.basicConfig( #export LOG_LEVEL=DEBUG
@@ -143,7 +142,7 @@ if __name__ == "__main__":
program_info=program_info,
language=language,
iso_que_len=1,
audio_source=f'device:{input_sel}',
audio_source=f'asrcdevice:{input_sel}',
input_format=f"int16le,{CAPTURE_SRATE},{channels}",
sampling_frequency=LC3_SRATE,
octets_per_frame=OCTETS_PER_FRAME,
@@ -157,7 +156,7 @@ if __name__ == "__main__":
octets_per_frame = OCTETS_PER_FRAME,
transport=TRANSPORT1
)
#config.debug = True
config.debug = True
logging.info(config.model_dump_json(indent=2))
multicast.run_async(

View File

@@ -517,7 +517,7 @@ else:
program_info=program_info,
language=language,
audio_source=(
f"device:{input_device}" if audio_mode in ("USB", "AES67") else (
f"asrcdevice:{input_device}" if audio_mode in ("USB", "AES67") else (
"webrtc" if audio_mode == "Webapp" else "network"
)
),

View File

@@ -177,7 +177,7 @@ class StreamerWorker:
first_source = conf.bigs[0].audio_source if conf.bigs else ''
input_device_name = None
audio_mode_persist = 'Demo'
if first_source.startswith('device:'):
if first_source.startswith('device:') or first_source.startswith('asrcdevice:'):
input_device_name = first_source.split(':', 1)[1] if ':' in first_source else None
try:
usb_names = {d.get('name') for _, d in get_usb_pw_inputs()}
@@ -193,12 +193,17 @@ class StreamerWorker:
for big in conf.bigs:
if big.audio_source.startswith('device:'):
big.audio_source = f'device:{device_index}'
elif big.audio_source.startswith('asrcdevice:'):
big.audio_source = f'asrcdevice:{device_index}'
devinfo = sd.query_devices(device_index)
capture_rate = int(devinfo.get('default_samplerate') or 48000)
# Force 48 kHz capture to match multicast_script behavior and minimize resampler/pipewire latency
capture_rate = 48000
max_in = int(devinfo.get('max_input_channels') or 1)
channels = max(1, min(2, max_in))
for big in conf.bigs:
big.input_format = f"int16le,{capture_rate},{channels}"
if big.audio_source.startswith('device:') or big.audio_source.startswith('asrcdevice:'):
# Always override to 48 kHz for device/asrcdevice, regardless of frontend-provided input_format
big.input_format = f"int16le,{capture_rate},{channels}"
# Coerce QoS: compute max_transport_latency from RTN if qos_config present
if getattr(conf, 'qos_config', None) and getattr(conf.qos_config, 'number_of_retransmissions', None) is not None:
@@ -209,7 +214,12 @@ class StreamerWorker:
await reset_nrf54l(1)
await self._multicaster1.init_broadcast()
auto_started = False
if any(big.audio_source.startswith("device:") or big.audio_source.startswith("file:") for big in conf.bigs):
if any(
big.audio_source.startswith("device:") or
big.audio_source.startswith("asrcdevice:") or
big.audio_source.startswith("file:")
for big in conf.bigs
):
await self._multicaster1.start_streaming()
auto_started = True
@@ -219,6 +229,11 @@ class StreamerWorker:
'languages': [big.language for big in conf.bigs],
'audio_mode': audio_mode_persist,
'input_device': input_device_name,
'input_source_kind': (
'asrcdevice' if any(b.audio_source.startswith('asrcdevice:') for b in conf.bigs) else (
'device' if any(b.audio_source.startswith('device:') for b in conf.bigs) else None
)
),
'program_info': [getattr(big, 'program_info', None) for big in conf.bigs],
'gain': [getattr(big, 'input_gain', 1.0) for big in conf.bigs],
'auracast_sampling_rate_hz': conf.auracast_sampling_rate_hz,
@@ -241,12 +256,24 @@ class StreamerWorker:
conf.transport = TRANSPORT2
for big in conf.bigs:
if big.audio_source.startswith('device:'):
if big.audio_source.startswith('device:') or big.audio_source.startswith('asrcdevice:'):
device_name = big.audio_source.split(':', 1)[1]
device_index = get_device_index_by_name(device_name)
if device_index is None:
raise HTTPException(status_code=400, detail=f"Audio device '{device_name}' not found.")
big.audio_source = f'device:{device_index}'
if big.audio_source.startswith('device:'):
big.audio_source = f'device:{device_index}'
else:
big.audio_source = f'asrcdevice:{device_index}'
# Also force 48 kHz capture and set input_format for device/asrcdevice
try:
devinfo2 = sd.query_devices(device_index)
except Exception:
devinfo2 = {}
capture_rate2 = 48000
max_in2 = int((devinfo2 or {}).get('max_input_channels') or 1)
channels2 = max(1, min(2, max_in2))
big.input_format = f"int16le,{capture_rate2},{channels2}"
# Coerce QoS: compute max_transport_latency from RTN if qos_config present
if getattr(conf, 'qos_config', None) and getattr(conf.qos_config, 'number_of_retransmissions', None) is not None:
conf.qos_config.max_transport_latency_ms = int(conf.qos_config.number_of_retransmissions) * 10 + 3
@@ -255,7 +282,12 @@ class StreamerWorker:
self._multicaster2 = multicast_control.Multicaster(conf, conf.bigs)
await reset_nrf54l(0)
await self._multicaster2.init_broadcast()
if any(big.audio_source.startswith("device:") or big.audio_source.startswith("file:") for big in conf.bigs):
if any(
big.audio_source.startswith("device:") or
big.audio_source.startswith("asrcdevice:") or
big.audio_source.startswith("file:")
for big in conf.bigs
):
await self._multicaster2.start_streaming()
async def _w_stop_all(self) -> bool:
@@ -442,7 +474,10 @@ async def _autostart_from_settings():
name=channel_names[0] if channel_names else "Broadcast0",
program_info=program_info[0] if isinstance(program_info, list) and program_info else program_info,
language=languages[0] if languages else "deu",
audio_source=f"device:{input_device_name}",
audio_source=(
f"asrcdevice:{input_device_name}" if (settings.get('input_source_kind') == 'asrcdevice')
else f"device:{input_device_name}"
),
# input_format is intentionally omitted to use the default
iso_que_len=1,
sampling_frequency=rate,
@@ -725,4 +760,8 @@ if __name__ == '__main__':
format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
)
# Bind to localhost only for security: prevents network access, only frontend on same machine can connect
uvicorn.run(app, host="127.0.0.1", port=5000, access_log=False)
uvicorn.run(
app, host="127.0.0.1",
port=5000,
#access_log=False
)

View File

@@ -0,0 +1,509 @@
# Copyright 2025
#
# Drop-in replacement for `SoundDeviceAudioInput` that adds a tiny ASRC stage.
#
# Constraints per request:
# - Only import io_bumble.py at module level.
# - Reuse the ASRC functionality from asrc.py conceptually (PI control + FIFO +
# linear/sinc resampling behavior). We implement a minimal, dependency-free
# variant (linear interpolation with a small PI loop) so this module does not
# import anything else at top-level.
#
# Notes:
# - Input stream is captured via sounddevice (imported lazily inside methods).
# - Input is mono float32 for simplicity; output matches the original class
# signature: INT16, stereo, at the same nominal sample rate as requested.
from bumble.audio.io import PcmFormat, ThreadedAudioInput, logger # only top-level import
class SoundDeviceAudioInputAsrc(ThreadedAudioInput):
"""Sound device audio input with a simple ASRC stage.
Interface-compatible with `io_bumble.SoundDeviceAudioInput`:
- __init__(device_name: str, pcm_format: PcmFormat)
- _open() -> PcmFormat
- _read(frame_size: int) -> bytes
- _close() -> None
Behavior:
- Captures mono float32 frames from the device.
- Buffers into an internal ring buffer.
- Produces stereo INT16 frames using a linear-interp resampler whose
ratio is adjusted by a tiny PI loop to hold FIFO depth near a target.
"""
def __init__(self, device_name: str, pcm_format: str) -> None:
super().__init__()
# Device & format
self._device = int(device_name) if device_name else None
pcm_format: PcmFormat | None
if pcm_format == 'auto':
pcm_format = None
else:
pcm_format = PcmFormat.from_str(pcm_format)
self._pcm_format_in = pcm_format
# We always output stereo INT16 at the same nominal sample rate.
self._pcm_format_out = PcmFormat(
PcmFormat.Endianness.LITTLE,
PcmFormat.SampleType.INT16,
pcm_format.sample_rate,
2,
)
# sounddevice stream (created in _open)
self._stream = None # type: ignore[assignment]
self._stream_started = False # Track if stream has been started
# --- ASRC state (inspired by asrc.py) ---
# Nominal input/output rate ratio
self._r = 1.0
self._integral = 0.0
self._phi = 0.0 # fractional read position within current chunk
# PI gains (normalized to seconds error: see _update_ratio)
# Smaller values to avoid saturation and oscillation
self._Kp = 0.5 # proportional on time error (seconds)
self._Ki = 0.08 # integral on time error (seconds)
self._R0 = 1.0
# Controller smoothing and slew limiting
self._alpha_r = 0.25 # low-pass blend factor for ratio updates
self._max_step_ppm = 3000.0 # limit ratio change per update (ppm)
self._integral_limit = 0.02 # clamp integral contribution (approx ±2%/Ki)
# Target FIFO level and deadband (slightly larger headroom for jitter)
fs = float(self._pcm_format_in.sample_rate)
self._target_samples = max(1, int(0.015 * fs)) # 20ms headroom to avoid underflows
# Slightly larger deadband to reduce chatter (≈1 ms)
self._deadband = max(1, int(0.001 * fs))
# Ring buffer for mono float32 samples
# Capacity ~2 seconds for headroom
self._rb_cap = max(self._target_samples * 32, int(2 * fs))
self._rb = None # created in _init_rb()
self._ridx = 0
self._size = 0
self._lock = None # created in _init_rb()
self._init_rb()
# Light logging timer
self._last_log = 0.0
# Input-side throughput logging
self._in_samples = 0
self._in_last_log = 0.0
# Streaming resampler and internal output buffer (lazy init)
self._rs = None # samplerate.Resampler
self._out_buf = None # numpy.ndarray float32
# ---------------- Internal helpers -----------------
def _init_rb(self) -> None:
# Lazy import standard libs to keep only io_bumble imported at top level
import threading
from array import array
self._rb = array('f', [0.0] * self._rb_cap) # float32 ring buffer
self._lock = threading.Lock()
self._ridx = 0
self._size = 0
def _fifo_len(self) -> int:
with self._lock:
return self._size
def _fifo_write(self, x_f32) -> None:
# x_f32: 1-D float32-like iterable
k = len(x_f32)
if k <= 0:
return
rb = self._rb
if rb is None:
return
with self._lock:
# Trim if larger than capacity: keep last N
if k >= self._rb_cap:
x_f32 = x_f32[-self._rb_cap:]
k = self._rb_cap
# Make room on overflow (drop oldest)
excess = max(0, self._size + k - self._rb_cap)
if excess:
self._ridx = (self._ridx + excess) % self._rb_cap
self._size -= excess
# Write at tail position
wpos = (self._ridx + self._size) % self._rb_cap
first = min(k, self._rb_cap - wpos)
# Write first chunk
from array import array as _array # lazy import
rb[wpos:wpos + first] = _array('f', x_f32[:first])
# Wrap if needed
second = k - first
if second:
rb[0:second] = _array('f', x_f32[first:])
self._size += k
def _fifo_peek_array(self, n: int):
# Returns a Python list[float] copy of up to n samples
rb = self._rb
if rb is None:
return []
m = max(0, min(n, self._fifo_len()))
if m <= 0:
return []
pos = self._ridx
first = min(m, self._rb_cap - pos)
# Copy out
out = [0.0] * m
# First chunk
out[:first] = rb[pos:pos + first]
# Second chunk if wrap
second = m - first
if second > 0:
out[first:] = rb[0:second]
return out
def _fifo_discard(self, n: int) -> None:
with self._lock:
d = max(0, min(n, self._size))
self._ridx = (self._ridx + d) % self._rb_cap
self._size -= d
def _update_ratio(self) -> None:
# PI loop to hold buffer near target
# Error: positive when buffer too full (need to consume more = lower ratio)
e_samples = self._fifo_len() - self._target_samples
if -self._deadband <= e_samples <= self._deadband:
e_samples = 0.0
fs = float(self._pcm_format_in.sample_rate)
e_time = float(e_samples) / max(1e-9, fs) # seconds
# Integrator on time error with clamping (anti-windup)
cand_integral = self._integral + e_time
# Clamp integral to prevent runaway
if cand_integral > self._integral_limit:
cand_integral = self._integral_limit
elif cand_integral < -self._integral_limit:
cand_integral = -self._integral_limit
# Compute new ratio command (NEG sign: if buffer too full, reduce ratio)
r_cmd = self._R0 * (1.0 - (self._Kp * e_time + self._Ki * cand_integral))
# Absolute clamp to ±20000 ppm vs nominal
ppm_cmd = 1e6 * (r_cmd / self._R0 - 1.0)
if ppm_cmd > 20000.0:
ppm_cmd = 20000.0
elif ppm_cmd < -20000.0:
ppm_cmd = -20000.0
r_cmd = self._R0 * (1.0 + ppm_cmd * 1e-6)
# Slew-rate limit per update
ppm_prev = 1e6 * (self._r / self._R0 - 1.0)
dppm = ppm_cmd - ppm_prev
if dppm > self._max_step_ppm:
ppm_cmd = ppm_prev + self._max_step_ppm
elif dppm < -self._max_step_ppm:
ppm_cmd = ppm_prev - self._max_step_ppm
# Low-pass smoothing of ratio updates
r_target = self._R0 * (1.0 + ppm_cmd * 1e-6)
self._r = (1.0 - self._alpha_r) * self._r + self._alpha_r * r_target
# Accept integral only when not at hard clamps to reduce windup
if abs(ppm_cmd) < 20000.0:
self._integral = cand_integral
else:
# light decay when clamped
self._integral *= 0.995
# Occasional log
try:
import time as _time
now = _time.time()
if now - self._last_log > 1.0:
buf_ms = 1000.0 * self._fifo_len() / float(self._pcm_format_in.sample_rate)
print(
f"\nASRC buf={buf_ms:5.1f} ms r={self._r:.9f} corr={1e6 * (self._r / self._R0 - 1.0):+7.1f} ppm"
)
self._last_log = now
except Exception:
# Logging must never break audio
pass
def _process(self, n_out: int) -> list[float]:
# Accumulate at least n_out samples using samplerate.Resampler
if n_out <= 0:
return []
# Lazy imports
import numpy as np # type: ignore
# Lazy init output buffer
if self._out_buf is None:
self._out_buf = np.zeros(0, dtype=np.float32)
# Choose chunk so we don't take too much from FIFO each time
max_chunk = max(256, int(np.ceil(n_out / max(1e-9, self._r))))
safety_iters = 0
while self._out_buf.size < n_out and safety_iters < 16:
safety_iters += 1
available = self._fifo_len()
if available <= 0:
break
take = min(available, max_chunk)
x = self._fifo_peek_array(take)
self._fifo_discard(take)
if not x:
break
x_arr = np.asarray(x, dtype=np.float32)
if self._rs is not None:
try:
y = self._rs.process(x_arr, ratio=float(self._r), end_of_input=False)
except Exception:
logger.exception("ASRC resampler error")
y = None
else:
y = None
if y is not None and getattr(y, 'size', 0):
y = y.astype(np.float32, copy=False)
if self._out_buf.size == 0:
self._out_buf = y
else:
self._out_buf = np.concatenate((self._out_buf, y))
if self._out_buf.size >= n_out:
out = self._out_buf[:n_out]
self._out_buf = self._out_buf[n_out:]
return out.tolist()
else:
# Not enough data produced; pad with zeros
out = np.zeros(n_out, dtype=np.float32)
# Debug: report zero-padding underflow
try:
import time as _time
if _time.time() - self._last_log > 0.5:
produced = int(self._out_buf.size)
fifo_now = int(self._fifo_len())
corr_ppm = 1e6 * (self._r / self._R0 - 1.0)
print(
f"\nASRC debug: zero-padding underflow produced={produced}/{n_out} "
f"fifo={fifo_now} r={self._r:.9f} corr={corr_ppm:+.1f} ppm"
)
self._last_log = _time.time()
except Exception:
pass
if self._out_buf.size:
out[: self._out_buf.size] = self._out_buf
self._out_buf = np.zeros(0, dtype=np.float32)
return out.tolist()
def _mono_to_stereo_int16_bytes(self, mono_f32: list[float]) -> bytes:
# Convert [-1,1] float list to stereo int16 little-endian bytes
import struct
ba = bytearray()
for v in mono_f32:
# clip
if v > 1.0:
v = 1.0
elif v < -1.0:
v = -1.0
i16 = int(v * 32767.0)
ba += struct.pack('<hh', i16, i16)
return bytes(ba)
# ---------------- ThreadedAudioInput hooks -----------------
def _open(self) -> PcmFormat:
# Set up sounddevice RawInputStream (int16) and start callback producer
import sounddevice # pylint: disable=import-error
import math
import samplerate as sr # type: ignore
# We capture mono regardless of requested channels, then output stereo.
channels = 1
samplerate = int(self._pcm_format_in.sample_rate)
# Force ALSA backend by selecting an ALSA-hostapi input device
# try:
# # Resolve current device info/name first (fallback: None -> default input)
# cur_dev_info = None
# try:
# cur_dev_info = sounddevice.query_devices(self._device, 'input')
# except Exception:
# cur_dev_info = None
# cur_name = (cur_dev_info or {}).get('name', '') if isinstance(cur_dev_info, dict) else ''
# # Discover ALSA-capable input devices by scanning all devices
# all_devs = list(sounddevice.query_devices())
# alsa_candidates: list[tuple[int, dict, dict]] = []
# for didx, info in enumerate(all_devs):
# if not isinstance(info, dict):
# continue
# if (info.get('max_input_channels') or 0) <= 0:
# continue
# try:
# host = sounddevice.query_hostapis(info.get('hostapi'))
# except Exception:
# continue
# if not isinstance(host, dict):
# continue
# if 'ALSA' in (host.get('name') or ''):
# alsa_candidates.append((didx, info, host))
# if not alsa_candidates:
# raise RuntimeError("ASRC: No ALSA host API/input device available. Ensure PortAudio was built with ALSA and an ALSA input exists.")
# # Prefer same-name device, else pick first ALSA input
# chosen_idx, chosen_info, chosen_host = None, None, None
# if cur_name:
# for didx, info, host in alsa_candidates:
# if (info.get('name') or '') == cur_name:
# chosen_idx, chosen_info, chosen_host = didx, info, host
# break
# if chosen_idx is None:
# chosen_idx, chosen_info, chosen_host = alsa_candidates[0]
# # Switch default hostapi and target device to ALSA
# try:
# sounddevice.default.hostapi = chosen_info.get('hostapi') # index
# except Exception:
# pass
# logger.info(
# "ASRC: forcing ALSA backend: switching device index %s -> %s (name='%s')",
# str(self._device), str(chosen_idx), (cur_name or '')
# )
# self._device = int(chosen_idx)
# # Final verification: ensure the currently selected input device is on ALSA
# try:
# finfo = sounddevice.query_devices(self._device, 'input')
# fhost = sounddevice.query_hostapis(finfo['hostapi']) if isinstance(finfo, dict) else {}
# fname = (fhost.get('name') or '') if isinstance(fhost, dict) else ''
# except Exception:
# finfo, fname = None, ''
# if 'ALSA' not in (fname or ''):
# raise RuntimeError(
# f"ASRC: expected ALSA hostapi after selection, but got '{fname or 'UNKNOWN'}'"
# )
# except Exception as e:
# logger.exception("ASRC: error while attempting to force ALSA backend")
# raise
# Debug: print which backend/host API and device are used by sounddevice (after forcing)
dev_info = sounddevice.query_devices(self._device, 'input')
hostapi_info = sounddevice.query_hostapis(dev_info['hostapi'])
# PortAudio version string (may be empty on some builds)
try:
pa_ver = sounddevice.get_portaudio_version()[1]
except Exception:
pa_ver = ''
logger.info(
"ASRC sounddevice: device='%s' hostapi='%s' samplerate=%d channels=%d portaudio='%s'",
dev_info.get('name', '?'),
hostapi_info.get('name', '?'),
samplerate,
channels,
pa_ver or '?'
)
def _callback(indata, frames, time_info, status): # noqa: ARG001 (signature is fixed)
# indata: raw int16 bytes-like buffer of shape (frames, channels)
try:
if status:
logger.warning("Input status: %s", status)
if frames <= 0:
return
# Vectorized convert INT16 LE mono -> float32 [-1,1]
import numpy as _np # type: ignore
try:
x_i16 = _np.frombuffer(indata, dtype=_np.int16)
x_f32 = _np.asarray(x_i16, dtype=_np.float32) * (1.0 / 32768.0)
except Exception:
# Fallback to safe (slower) path if buffer view fails
mv = memoryview(indata).cast('h')
x_f32 = _np.empty(frames, dtype=_np.float32)
for i in range(frames):
v = mv[i]
f = float(v) / 32768.0
if not (f == f) or math.isinf(f):
f = 0.0
x_f32[i] = f
# Debug: detect silent input chunks (all zeros)
try:
import time as _time
if _np.max(_np.abs(x_f32)) == 0.0 and (_time.time() - self._last_log > 0.5):
print("\nASRC debug: input chunk is silent (all zeros)")
self._last_log = _time.time()
except Exception:
pass
# Write to FIFO
self._fifo_write(x_f32)
# Input throughput logging
try:
import time as _time
self._in_samples += int(frames)
now = _time.time()
if now - self._in_last_log >= 1.0:
sps = self._in_samples / max(1e-9, (now - self._in_last_log))
fifo_now = int(self._fifo_len())
corr_ppm = 1e6 * (self._r / self._R0 - 1.0)
print(
f"\nASRC debug: input_sps={sps:.1f} fifo={fifo_now} r={self._r:.9f} corr={corr_ppm:+.1f} ppm"
)
self._in_samples = 0
self._in_last_log = now
except Exception:
pass
except Exception: # never let callback raise
logger.exception("Audio input callback error")
# Create streaming resampler (mono)
try:
self._rs = sr.Resampler(converter_type="sinc_medium", channels=1)
except Exception:
logger.exception("Failed to create samplerate.Resampler; audio may be silent")
self._rs = None
self._stream = sounddevice.RawInputStream(
samplerate=samplerate,
device=self._device,
channels=channels,
dtype='int16',
callback=_callback,
blocksize=int( 2.0e-3 * samplerate),
latency='low', # Use low latency to minimize buffer size
)
# Don't start stream yet - wait for first read to avoid buffer buildup
# self._stream.start() # Delayed until first _read()
return self._pcm_format_out
def _read(self, frame_size: int) -> bytes:
# Produce 'frame_size' output frames (stereo INT16)
if frame_size <= 0:
return b''
# Start stream on first read to avoid buffer buildup during initialization
if self._stream and not self._stream_started:
self._stream.start()
self._stream_started = True
# Don't block - let callback start filling buffer asynchronously
# Update resampling ratio based on FIFO level
try:
self._update_ratio()
except Exception:
# keep going even if update failed
pass
# Process mono float32
mono = self._process(frame_size)
# Convert to stereo int16 LE bytes
return self._mono_to_stereo_int16_bytes(mono)
def _close(self) -> None:
try:
if self._stream is not None:
self._stream.stop()
self._stream.close()
except Exception:
logger.exception('Error closing input stream')
finally:
self._stream = None
self._stream_started = False

View File

@@ -0,0 +1,185 @@
#!/usr/bin/env bash
# print_pw_latency.sh — latency info for nodes that currently appear in PipeWire Links
# Deps: pw-cli, pw-dump, pw-link, pw-metadata, jq, awk, sed, grep, sort, uniq
set -euo pipefail
FILTER_NAME=""
FILTER_ID=""
usage() {
cat <<EOF
Usage: $0 [-n name_regex] [-i node_id]
Shows latency-related info ONLY for nodes that participate in current links.
EOF
}
while getopts ":n:i:h" opt; do
case "$opt" in
n) FILTER_NAME="$OPTARG" ;;
i) FILTER_ID="$OPTARG" ;;
h) usage; exit 0 ;;
\?) echo "Invalid option: -$OPTARG" >&2; usage; exit 1 ;;
esac
done
need() { command -v "$1" >/dev/null 2>&1 || { echo "Missing dependency: $1" >&2; exit 1; }; }
for bin in pw-cli pw-dump pw-link pw-metadata jq awk sed grep sort uniq; do need "$bin"; done
div() { printf '%*s\n' "${COLUMNS:-80}" '' | tr ' ' -; }
echo "PipeWire latency inspector (linked nodes) — $(date)"
div
echo "GLOBAL CLOCK SETTINGS (pw-metadata -n settings):"
pw-metadata -n settings 2>/dev/null || echo "(no settings metadata found)"
echo
# Parse 'update: id:0 key:'clock.rate' value:'48000' type:'''
pw-metadata -n settings 2>/dev/null \
| awk '
match($0, /key:\x27([^'\''"]+)\x27[[:space:]]+value:\x27([^'\''"]*)\x27/, m) {
printf " %s: %s\n", m[1], m[2]
}
' || true
div
echo "CURRENT LINKS (pw-link -l):"
pw-link -l 2>/dev/null || echo "(no links or cannot list links)"
div
# ---------- Collect node IDs from Links ----------
# Primary: use pw-dump Link objects (no state filter; just "any existing link")
LINK_NODE_IDS="$(
pw-dump 2>/dev/null \
| jq -r '
.[] | select(.type=="PipeWire:Interface:Link") | select(.info!=null)
| [.info.output_node, .info.input_node] | @tsv
' \
| awk '{print $1"\n"$2}' \
| sed '/^$/d' | sort -n | uniq || true
)"
# Fallback: parse names from pw-link -l and map names -> IDs via pw-dump
if [[ -z "$LINK_NODE_IDS" ]]; then
# Extract all "node:port" tokens and cut at the colon → node names
LINK_NODE_NAMES="$(
pw-link -l 2>/dev/null \
| sed -n 's/^[[:space:]]*|[-<>][[:space:]]*//; s/[[:space:]]\+$//; p' \
| awk -F':' '/:/{print $1}' \
| sed '/^$/d' | sort | uniq || true
)"
if [[ -n "$LINK_NODE_NAMES" ]]; then
# Build name->id map from pw-dump Nodes
# Prefer node.name; fallback to node.description
while read -r nm; do
[[ -z "$nm" ]] && continue
id="$(pw-dump | jq -r --arg n "$nm" '
.[] | select(.type=="PipeWire:Interface:Node")
| select((.info.props["node.name"] // .info.props["node.description"] // "") == $n)
| .id
' | head -n1)"
[[ -n "$id" ]] && echo "$id"
done <<< "$LINK_NODE_NAMES" \
| sort -n | uniq > /tmp/_pw_link_node_ids.$$
LINK_NODE_IDS="$(cat /tmp/_pw_link_node_ids.$$ || true)"
rm -f /tmp/_pw_link_node_ids.$$ || true
fi
fi
if [[ -z "$LINK_NODE_IDS" ]]; then
echo "No linked nodes found."
exit 0
fi
# Pull all nodes and keep only those in LINK_NODE_IDS
IDS_JSON="[$(echo "$LINK_NODE_IDS" | awk 'NR>1{printf ","}{printf "%s",$0} END{print ""}')]"
NODES_JSON="$(pw-dump | jq -c '.[] | select(.type=="PipeWire:Interface:Node")')"
NODES_JSON="$(echo "$NODES_JSON" | jq -c --argjson ids "$IDS_JSON" 'select([.id] | inside($ids))')"
# Optional filters
[[ -n "$FILTER_ID" ]] && NODES_JSON="$(echo "$NODES_JSON" | jq -c "select(.id==$FILTER_ID)")"
[[ -n "$FILTER_NAME" ]] && NODES_JSON="$(echo "$NODES_JSON" | jq -c "select((.info.props[\"node.name\"] // .info.props[\"node.description\"] // \"\") | test(\"$FILTER_NAME\"))")"
if [[ -z "$NODES_JSON" ]]; then
echo "No matching linked nodes."
exit 0
fi
echo "LINKED NODE LATENCY DETAILS:"
while IFS= read -r node; do
id=$(echo "$node" | jq -r '.id')
name=$(echo "$node" | jq -r '.info.props["node.name"] // .info.props["node.description"] // "unknown"')
appname=$(echo "$node" | jq -r '.info.props["application.name"] // ""')
media=$(echo "$node" | jq -r '.info.props["media.class"] // ""')
state=$(echo "$node" | jq -r '.info.state // ""')
format=$(echo "$node" | jq -r '.info.params?.Format?[0]?.audio?.format // empty')
rate=$(echo "$node" | jq -r '.info.params?.Format?[0]?.audio?.rate // empty')
channels=$(echo "$node" | jq -r '.info.params?.Format?[0]?.audio?.channels // empty')
echo
echo "Node #$id ${name} ${appname:+(app: $appname)}"
echo " state: $state | media.class: $media"
if [[ -n "$rate" || -n "$channels" || -n "$format" ]]; then
echo " format: ${format:-?} rate: ${rate:-?} channels: ${channels:-?}"
if [[ -n "$rate" ]]; then
q=$(pw-cli enum-params "$id" ProcessLatency 2>/dev/null | awk '/quantum/ {print $NF; exit}')
[[ -n "${q:-}" ]] && awk -v q="$q" -v r="$rate" 'BEGIN{printf " ~sched per-cycle: %d/%d = %.3f ms\n", q, r, (q*1000.0/r)}'
fi
fi
LATENCY_RAW="$(pw-cli enum-params "$id" Latency 2>/dev/null || true)"
if [[ -n "$LATENCY_RAW" ]]; then
echo " node.latency param (per-direction):"
echo "$LATENCY_RAW" | awk '
BEGIN{RS="";FS="\n"}
{
dir="";
for(i=1;i<=NF;i++){
if($i ~ /direction/){
if($i ~ /input/) dir="capture";
else if($i ~ /output/) dir="playback";
else dir="unknown";
}
if($i ~ /min-quantum/ || $i ~ /max-quantum/ || $i ~ /rate/ || $i ~ /ns/){
gsub(/^[ \t]+/,"",$i); gsub(/Prop: key /,"",$i);
printf " [%s] %s\n", dir, $i;
}
}
}'
else
echo " node.latency: (no Latency param reported)"
fi
PROC_RAW="$(pw-cli enum-params "$id" ProcessLatency 2>/dev/null || true)"
if [[ -n "$PROC_RAW" ]]; then
echo " ProcessLatency:"
echo "$PROC_RAW" | awk '/quantum|rate|ns/ { gsub(/^[ \t]+/,""); sub(/Prop: key /,""); print " " $0 }'
else
echo " ProcessLatency: (not reported)"
fi
INFO_RAW="$(pw-cli info "$id" 2>/dev/null || true)"
if echo "$INFO_RAW" | grep -q "api.alsa"; then
echo " ALSA (periods/headroom):"
echo "$INFO_RAW" | awk -F': ' '
/api\.alsa\.period-size/ {print " api.alsa.period-size: " $2}
/api\.alsa\.periods/ {print " api.alsa.periods: " $2}
/api\.alsa\.headroom/ {print " api.alsa.headroom: " $2}
/api\.alsa\.disable-batch/ {print " api.alsa.disable-batch: " $2}
/api\.alsa\.use-chmap/ {print " api.alsa.use-chmap: " $2}
' | sed 's/ (.*)//'
fi
echo "$INFO_RAW" | awk -F': ' '
/node\.force-rate/ {print " Hint: node.force-rate: " $2}
/node\.force-quantum/ {print " Hint: node.force-quantum: " $2}
/node\.latency/ {print " Hint: node.latency (prop): " $2}
/resample\.quality/ {print " Hint: resample.quality: " $2}
' | sed 's/ (.*)//' || true
done <<< "$NODES_JSON"
div
cat <<'TIP'
Notes:
- We include any node that appears in a Link (from pw-dump); this matches what pw-link -l shows.
- ~sched per-cycle = quantum/rate from ProcessLatency. Device/codec/RF/jitter buffers sit on top.
TIP