diff --git a/README.md b/README.md index e4f5deb..4b3e445 100644 --- a/README.md +++ b/README.md @@ -177,7 +177,7 @@ git checkout 9abe5fe7db729280080a0bbc1397a528cd3ce658 rm -rf build cmake -S . -B build -G"Unix Makefiles" \ -DBUILD_SHARED_LIBS=ON \ - -DPA_USE_ALSA=OFF \ + -DPA_USE_ALSA=ON \ -DPA_USE_PULSEAUDIO=ON \ -DPA_USE_JACK=OFF cmake --build build -j$(nproc) diff --git a/poetry.lock b/poetry.lock index dba8f46..c26d86e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2443,6 +2443,30 @@ files = [ {file = "rpds_py-0.25.1.tar.gz", hash = "sha256:8960b6dac09b62dac26e75d7e2c4a22efb835d827a7278c34f72b2b84fa160e3"}, ] +[[package]] +name = "samplerate" +version = "0.2.1" +description = "Monolithic python wrapper for libsamplerate based on pybind11 and NumPy" +optional = false +python-versions = ">=3.7" +groups = ["main"] +files = [ + {file = "samplerate-0.2.1-cp310-cp310-macosx_12_0_x86_64.whl", hash = "sha256:9b392e857cfeda712585d702871eab7169ba2c63209e8d2314fa2196d6127354"}, + {file = "samplerate-0.2.1-cp310-cp310-win_amd64.whl", hash = "sha256:c237959ba0fd391b3293f2905c23f1ae15a096fc987cd776aef380ba426491c6"}, + {file = "samplerate-0.2.1-cp311-cp311-macosx_12_0_universal2.whl", hash = "sha256:b0e1b5cb08edb19d232bd1ba67632590ed7a579b526ab4bf3983b4f2b2e9acb4"}, + {file = "samplerate-0.2.1-cp311-cp311-win_amd64.whl", hash = "sha256:3ebd447f2040950edc1dac32cb4afed3a74ed37e8e5ffe4775470213f799801f"}, + {file = "samplerate-0.2.1-cp312-cp312-macosx_12_0_universal2.whl", hash = "sha256:69cc7ad887c36294129315a04a7f43837c7ed9caf42db94e9687cf66c9709200"}, + {file = "samplerate-0.2.1-cp312-cp312-win_amd64.whl", hash = "sha256:c7f5e81b24debfa25981e367e3f5d191cf72e32b5c92613139f5ba94e7f18c01"}, + {file = "samplerate-0.2.1-cp38-cp38-macosx_12_0_x86_64.whl", hash = "sha256:a2b39a7131da065072508548f0ab80c9565d8c75212eac2f61fc1b1f82bb1611"}, + {file = "samplerate-0.2.1-cp38-cp38-win_amd64.whl", hash = "sha256:c02d6e0f7541c4f6a64b97ff6d0a84f53a0c432c965031491b48d9d75a81f102"}, + {file 
= "samplerate-0.2.1-cp39-cp39-macosx_12_0_x86_64.whl", hash = "sha256:137563c6069e23441b4c2059cf95a3ed5b20a6d747d0488014becb94b4dfc32f"}, + {file = "samplerate-0.2.1-cp39-cp39-win_amd64.whl", hash = "sha256:0656233932f76af6070f56edf34d8ef56e62e2c503553229d2388953a2166cda"}, + {file = "samplerate-0.2.1.tar.gz", hash = "sha256:464d3574412024184fb7428ecbaa1b2e207bddf5fbc10a5d9ddc3fc1c7b7ab1e"}, +] + +[package.dependencies] +numpy = "*" + [[package]] name = "six" version = "1.17.0" @@ -2952,4 +2976,4 @@ test = ["pytest", "pytest-asyncio"] [metadata] lock-version = "2.1" python-versions = ">=3.11" -content-hash = "6b5300c349ed045e8fd3e617e6262bbd7e5c48c518e4c62cedf7c17da50ce8c0" +content-hash = "7290ffd17fde6febd61d505131efe260790b3d91fcb7442b6c4aa6fc32f6baeb" diff --git a/pyproject.toml b/pyproject.toml index 799f3ce..551c79f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,8 @@ dependencies = [ "aiortc (>=1.13.0,<2.0.0)", "sounddevice (>=0.5.2,<0.6.0)", "python-dotenv (>=1.1.1,<2.0.0)", - "smbus2 (>=0.5.0,<0.6.0)" + "smbus2 (>=0.5.0,<0.6.0)", + "samplerate (>=0.2.1,<0.3.0)" ] [project.optional-dependencies] diff --git a/src/auracast/multicast.py b/src/auracast/multicast.py index 93792e5..bba0c64 100644 --- a/src/auracast/multicast.py +++ b/src/auracast/multicast.py @@ -47,6 +47,7 @@ import bumble.utils import numpy as np # for audio down-mix from bumble.device import Host, AdvertisingChannelMap from bumble.audio import io as audio_io +from auracast.utils import io_asrc from auracast import auracast_config from auracast.utils.read_lc3_file import read_lc3_file @@ -533,7 +534,10 @@ class Streamer(): # anything else, e.g. 
realtime stream from device (bumble) else: - audio_input = await audio_io.create_audio_input(audio_source, input_format) + if audio_source.startswith('asrcdevice'): + audio_input = io_asrc.SoundDeviceAudioInputAsrc(audio_source[len('asrcdevice')+1:], input_format) + else: + audio_input = await audio_io.create_audio_input(audio_source, input_format) # Store early so stop_streaming can close even if open() fails big['audio_input'] = audio_input # SoundDeviceAudioInput (used for `mic:` captures) has no `.rewind`. diff --git a/src/auracast/multicast_script.py b/src/auracast/multicast_script.py index 553cebe..4755c5d 100644 --- a/src/auracast/multicast_script.py +++ b/src/auracast/multicast_script.py @@ -50,7 +50,6 @@ from auracast.utils.sounddevice_utils import ( refresh_pw_cache, ) - if __name__ == "__main__": logging.basicConfig( #export LOG_LEVEL=DEBUG @@ -143,7 +142,7 @@ if __name__ == "__main__": program_info=program_info, language=language, iso_que_len=1, - audio_source=f'device:{input_sel}', + audio_source=f'asrcdevice:{input_sel}', input_format=f"int16le,{CAPTURE_SRATE},{channels}", sampling_frequency=LC3_SRATE, octets_per_frame=OCTETS_PER_FRAME, @@ -157,7 +156,7 @@ if __name__ == "__main__": octets_per_frame = OCTETS_PER_FRAME, transport=TRANSPORT1 ) - #config.debug = True + config.debug = True logging.info(config.model_dump_json(indent=2)) multicast.run_async( diff --git a/src/auracast/server/multicast_frontend.py b/src/auracast/server/multicast_frontend.py index 730b733..df5a3d4 100644 --- a/src/auracast/server/multicast_frontend.py +++ b/src/auracast/server/multicast_frontend.py @@ -517,7 +517,7 @@ else: program_info=program_info, language=language, audio_source=( - f"device:{input_device}" if audio_mode in ("USB", "AES67") else ( + f"asrcdevice:{input_device}" if audio_mode in ("USB", "AES67") else ( "webrtc" if audio_mode == "Webapp" else "network" ) ), diff --git a/src/auracast/server/multicast_server.py b/src/auracast/server/multicast_server.py index 
a2b1b88..ea48b37 100644 --- a/src/auracast/server/multicast_server.py +++ b/src/auracast/server/multicast_server.py @@ -177,7 +177,7 @@ class StreamerWorker: first_source = conf.bigs[0].audio_source if conf.bigs else '' input_device_name = None audio_mode_persist = 'Demo' - if first_source.startswith('device:'): + if first_source.startswith('device:') or first_source.startswith('asrcdevice:'): input_device_name = first_source.split(':', 1)[1] if ':' in first_source else None try: usb_names = {d.get('name') for _, d in get_usb_pw_inputs()} @@ -193,12 +193,17 @@ class StreamerWorker: for big in conf.bigs: if big.audio_source.startswith('device:'): big.audio_source = f'device:{device_index}' + elif big.audio_source.startswith('asrcdevice:'): + big.audio_source = f'asrcdevice:{device_index}' devinfo = sd.query_devices(device_index) - capture_rate = int(devinfo.get('default_samplerate') or 48000) + # Force 48 kHz capture to match multicast_script behavior and minimize resampler/pipewire latency + capture_rate = 48000 max_in = int(devinfo.get('max_input_channels') or 1) channels = max(1, min(2, max_in)) for big in conf.bigs: - big.input_format = f"int16le,{capture_rate},{channels}" + if big.audio_source.startswith('device:') or big.audio_source.startswith('asrcdevice:'): + # Always override to 48 kHz for device/asrcdevice, regardless of frontend-provided input_format + big.input_format = f"int16le,{capture_rate},{channels}" # Coerce QoS: compute max_transport_latency from RTN if qos_config present if getattr(conf, 'qos_config', None) and getattr(conf.qos_config, 'number_of_retransmissions', None) is not None: @@ -209,7 +214,12 @@ class StreamerWorker: await reset_nrf54l(1) await self._multicaster1.init_broadcast() auto_started = False - if any(big.audio_source.startswith("device:") or big.audio_source.startswith("file:") for big in conf.bigs): + if any( + big.audio_source.startswith("device:") or + big.audio_source.startswith("asrcdevice:") or + 
big.audio_source.startswith("file:") + for big in conf.bigs + ): await self._multicaster1.start_streaming() auto_started = True @@ -219,6 +229,11 @@ class StreamerWorker: 'languages': [big.language for big in conf.bigs], 'audio_mode': audio_mode_persist, 'input_device': input_device_name, + 'input_source_kind': ( + 'asrcdevice' if any(b.audio_source.startswith('asrcdevice:') for b in conf.bigs) else ( + 'device' if any(b.audio_source.startswith('device:') for b in conf.bigs) else None + ) + ), 'program_info': [getattr(big, 'program_info', None) for big in conf.bigs], 'gain': [getattr(big, 'input_gain', 1.0) for big in conf.bigs], 'auracast_sampling_rate_hz': conf.auracast_sampling_rate_hz, @@ -241,12 +256,24 @@ class StreamerWorker: conf.transport = TRANSPORT2 for big in conf.bigs: - if big.audio_source.startswith('device:'): + if big.audio_source.startswith('device:') or big.audio_source.startswith('asrcdevice:'): device_name = big.audio_source.split(':', 1)[1] device_index = get_device_index_by_name(device_name) if device_index is None: raise HTTPException(status_code=400, detail=f"Audio device '{device_name}' not found.") - big.audio_source = f'device:{device_index}' + if big.audio_source.startswith('device:'): + big.audio_source = f'device:{device_index}' + else: + big.audio_source = f'asrcdevice:{device_index}' + # Also force 48 kHz capture and set input_format for device/asrcdevice + try: + devinfo2 = sd.query_devices(device_index) + except Exception: + devinfo2 = {} + capture_rate2 = 48000 + max_in2 = int((devinfo2 or {}).get('max_input_channels') or 1) + channels2 = max(1, min(2, max_in2)) + big.input_format = f"int16le,{capture_rate2},{channels2}" # Coerce QoS: compute max_transport_latency from RTN if qos_config present if getattr(conf, 'qos_config', None) and getattr(conf.qos_config, 'number_of_retransmissions', None) is not None: conf.qos_config.max_transport_latency_ms = int(conf.qos_config.number_of_retransmissions) * 10 + 3 @@ -255,7 +282,12 @@ class 
StreamerWorker: self._multicaster2 = multicast_control.Multicaster(conf, conf.bigs) await reset_nrf54l(0) await self._multicaster2.init_broadcast() - if any(big.audio_source.startswith("device:") or big.audio_source.startswith("file:") for big in conf.bigs): + if any( + big.audio_source.startswith("device:") or + big.audio_source.startswith("asrcdevice:") or + big.audio_source.startswith("file:") + for big in conf.bigs + ): await self._multicaster2.start_streaming() async def _w_stop_all(self) -> bool: @@ -442,7 +474,10 @@ async def _autostart_from_settings(): name=channel_names[0] if channel_names else "Broadcast0", program_info=program_info[0] if isinstance(program_info, list) and program_info else program_info, language=languages[0] if languages else "deu", - audio_source=f"device:{input_device_name}", + audio_source=( + f"asrcdevice:{input_device_name}" if (settings.get('input_source_kind') == 'asrcdevice') + else f"device:{input_device_name}" + ), # input_format is intentionally omitted to use the default iso_que_len=1, sampling_frequency=rate, @@ -725,4 +760,8 @@ if __name__ == '__main__': format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s' ) # Bind to localhost only for security: prevents network access, only frontend on same machine can connect - uvicorn.run(app, host="127.0.0.1", port=5000, access_log=False) \ No newline at end of file + uvicorn.run( + app, host="127.0.0.1", + port=5000, + #access_log=False + ) \ No newline at end of file diff --git a/src/auracast/utils/io_asrc.py b/src/auracast/utils/io_asrc.py new file mode 100644 index 0000000..9e9ca46 --- /dev/null +++ b/src/auracast/utils/io_asrc.py @@ -0,0 +1,509 @@ +# Copyright 2025 +# +# Drop-in replacement for `SoundDeviceAudioInput` that adds a tiny ASRC stage. +# +# Constraints per request: +# - Only import io_bumble.py at module level. +# - Reuse the ASRC functionality from asrc.py conceptually (PI control + FIFO + +# linear/sinc resampling behavior). 
#   The resampling itself is performed by a streaming `samplerate.Resampler`
#   whose ratio is steered by a small PI loop. numpy, samplerate and sounddevice
#   are imported lazily inside methods so nothing else is imported at top level.
#
# Notes:
# - The input stream is captured via sounddevice (imported lazily inside methods).
# - Input is captured as mono int16 and converted to float32 internally; the
#   output matches the original class signature: INT16, stereo, at the same
#   nominal sample rate as requested.

from bumble.audio.io import PcmFormat, ThreadedAudioInput, logger  # only top-level import


class SoundDeviceAudioInputAsrc(ThreadedAudioInput):
    """Sound device audio input with a simple ASRC stage.

    Interface-compatible with `io_bumble.SoundDeviceAudioInput`:
    - __init__(device_name: str, pcm_format: str)
    - _open() -> PcmFormat
    - _read(frame_size: int) -> bytes
    - _close() -> None

    Behavior:
    - Captures mono int16 frames from the device in a sounddevice callback and
      stores them as float32 in an internal ring buffer (FIFO).
    - On every read, a PI loop nudges the resampling ratio so that the FIFO
      depth stays near a target level, and a streaming `samplerate.Resampler`
      converts FIFO samples into the requested number of output samples.
    - Produces stereo INT16 little-endian frames (the mono signal duplicated
      on both channels).
    """

    # Hard limit of the ratio correction relative to nominal, in ppm.
    _MAX_CORR_PPM = 20000.0

    def __init__(self, device_name: str, pcm_format: str) -> None:
        """Create the input.

        Args:
            device_name: sounddevice device index as a string; an empty string
                selects the default input device.
            pcm_format: PCM format string understood by `PcmFormat.from_str`
                (e.g. ``"int16le,48000,1"``). Only its sample rate is used;
                capture is always mono int16.

        Raises:
            ValueError: if ``pcm_format`` is ``'auto'``. The ASRC stage needs a
                known sample rate to size its buffers, so there is nothing to
                derive automatically (previously this crashed later with an
                AttributeError on ``None.sample_rate``).
        """
        if pcm_format == 'auto':
            raise ValueError(
                "SoundDeviceAudioInputAsrc requires an explicit input format "
                "(e.g. 'int16le,48000,1'); 'auto' is not supported"
            )
        super().__init__()

        # Device & formats
        self._device = int(device_name) if device_name else None
        self._pcm_format_in = PcmFormat.from_str(pcm_format)
        # We always output stereo INT16 at the same nominal sample rate.
        self._pcm_format_out = PcmFormat(
            PcmFormat.Endianness.LITTLE,
            PcmFormat.SampleType.INT16,
            self._pcm_format_in.sample_rate,
            2,
        )

        # sounddevice stream (created in _open, started lazily in _read)
        self._stream = None  # type: ignore[assignment]
        self._stream_started = False

        # --- ASRC controller state ---
        self._r = 1.0          # current output/input ratio handed to the resampler
        self._integral = 0.0   # integrator state of the PI loop (seconds)
        self._phi = 0.0        # fractional read position (kept for compatibility)
        self._R0 = 1.0         # nominal ratio

        # PI gains, applied to the FIFO error expressed in seconds.
        # Small values to avoid saturation and oscillation.
        self._Kp = 0.5
        self._Ki = 0.08
        # Controller smoothing and slew limiting
        self._alpha_r = 0.25          # low-pass blend factor for ratio updates
        self._max_step_ppm = 3000.0   # max ratio change per update (ppm)
        self._integral_limit = 0.02   # clamp for the integrator state

        # Target FIFO level and deadband
        fs = float(self._pcm_format_in.sample_rate)
        self._target_samples = max(1, int(0.015 * fs))  # 15 ms of buffered input
        self._deadband = max(1, int(0.001 * fs))        # ~1 ms, reduces chatter

        # Ring buffer for mono float32 samples, capacity ~2 seconds
        self._rb_cap = max(self._target_samples * 32, int(2 * fs))
        self._rb = None    # created in _init_rb()
        self._lock = None  # created in _init_rb()
        self._ridx = 0     # read index into the ring buffer
        self._size = 0     # number of valid samples in the ring buffer
        self._init_rb()

        # Timestamp (time.monotonic) of the last throttled status log
        self._last_log = 0.0
        # Input-side throughput accounting; the window starts at the first callback
        self._in_samples = 0
        self._in_last_log = None  # float | None

        # Streaming resampler and pending output samples (created lazily)
        self._rs = None       # samplerate.Resampler
        self._out_buf = None  # numpy.ndarray float32

    # ---------------- Internal helpers -----------------
    def _init_rb(self) -> None:
        """Allocate the (zeroed) ring buffer and the lock protecting it."""
        # Lazy stdlib imports keep the module's top level down to one import.
        import threading
        from array import array

        self._rb = array('f', [0.0]) * self._rb_cap  # float32 ring buffer
        self._lock = threading.Lock()
        self._ridx = 0
        self._size = 0

    def _fifo_len(self) -> int:
        """Return the number of samples currently buffered."""
        with self._lock:
            return self._size

    def _fifo_write(self, x_f32) -> None:
        """Append samples to the FIFO, dropping the oldest ones on overflow.

        Args:
            x_f32: 1-D sequence of float samples (list or numpy array).
        """
        from array import array

        k = len(x_f32)
        if k <= 0 or self._rb is None:
            return
        cap = self._rb_cap
        if k >= cap:
            # Larger than the whole buffer: only the newest `cap` samples fit.
            x_f32 = x_f32[-cap:]
            k = cap
        # Convert outside the lock so the critical section is just two copies.
        data = array('f', x_f32)

        rb = self._rb
        with self._lock:
            excess = self._size + k - cap
            if excess > 0:
                # Make room by discarding the oldest samples.
                self._ridx = (self._ridx + excess) % cap
                self._size -= excess
            wpos = (self._ridx + self._size) % cap
            first = min(k, cap - wpos)
            rb[wpos:wpos + first] = data[:first]
            if k > first:
                # Wrapped around the end of the ring.
                rb[0:k - first] = data[first:]
            self._size += k

    def _peek_locked(self, n: int) -> list:
        """Copy up to ``n`` oldest samples; the caller must hold ``self._lock``."""
        rb = self._rb
        if rb is None:
            return []
        m = max(0, min(n, self._size))
        if m <= 0:
            return []
        pos = self._ridx
        first = min(m, self._rb_cap - pos)
        out = rb[pos:pos + first].tolist()
        if m > first:
            out.extend(rb[0:m - first])
        return out

    def _fifo_peek_array(self, n: int):
        """Return a list copy of up to ``n`` oldest samples without consuming them.

        The copy is taken under the lock: the audio callback may advance the
        read index (overflow handling) concurrently.
        """
        with self._lock:
            return self._peek_locked(n)

    def _fifo_discard(self, n: int) -> None:
        """Drop up to ``n`` oldest samples."""
        with self._lock:
            d = max(0, min(n, self._size))
            self._ridx = (self._ridx + d) % self._rb_cap
            self._size -= d

    def _fifo_read(self, n: int) -> list:
        """Atomically remove and return up to ``n`` oldest samples.

        Doing peek + discard under a single lock acquisition guarantees that
        exactly the returned samples are consumed, even if the callback
        overflows the buffer in between.
        """
        with self._lock:
            out = self._peek_locked(n)
            taken = len(out)
            self._ridx = (self._ridx + taken) % self._rb_cap
            self._size -= taken
        return out

    def _update_ratio(self) -> None:
        """Run one step of the PI loop that holds the FIFO near its target.

        A FIFO that is too full yields a positive error and therefore a lower
        output/input ratio (fewer output samples per input sample, i.e. input
        is consumed faster). The command is clamped to ±_MAX_CORR_PPM,
        slew-limited to ``_max_step_ppm`` per call and low-pass filtered.
        """
        import time

        fs = float(self._pcm_format_in.sample_rate)
        fifo_len = self._fifo_len()
        e_samples = fifo_len - self._target_samples
        if abs(e_samples) <= self._deadband:
            e_samples = 0
        e_time = float(e_samples) / max(1e-9, fs)  # error in seconds

        # Candidate integrator state, clamped (anti-windup)
        limit = self._integral_limit
        cand_integral = min(limit, max(-limit, self._integral + e_time))

        # NEG sign: buffer too full -> reduce ratio
        r_cmd = self._R0 * (1.0 - (self._Kp * e_time + self._Ki * cand_integral))

        # Absolute clamp relative to nominal
        max_ppm = self._MAX_CORR_PPM
        ppm_cmd = 1e6 * (r_cmd / self._R0 - 1.0)
        ppm_cmd = min(max_ppm, max(-max_ppm, ppm_cmd))

        # Slew-rate limit per update
        ppm_prev = 1e6 * (self._r / self._R0 - 1.0)
        step = self._max_step_ppm
        ppm_cmd = min(ppm_prev + step, max(ppm_prev - step, ppm_cmd))

        # Low-pass smoothing of ratio updates
        r_target = self._R0 * (1.0 + ppm_cmd * 1e-6)
        self._r = (1.0 - self._alpha_r) * self._r + self._alpha_r * r_target

        # Accept the integrator only when not at the hard clamp; otherwise let
        # it decay slightly to reduce windup.
        if abs(ppm_cmd) < max_ppm:
            self._integral = cand_integral
        else:
            self._integral *= 0.995

        # Status line at most once per second
        now = time.monotonic()
        if now - self._last_log > 1.0:
            logger.debug(
                "ASRC buf=%5.1f ms r=%.9f corr=%+7.1f ppm",
                1000.0 * fifo_len / fs,
                self._r,
                1e6 * (self._r / self._R0 - 1.0),
            )
            self._last_log = now

    def _process(self, n_out: int) -> list[float]:
        """Return exactly ``n_out`` resampled mono samples.

        Input is pulled from the FIFO in chunks and pushed through the
        streaming resampler until enough output is pending. If the FIFO runs
        dry (or no resampler is available), the result is zero-padded.
        """
        if n_out <= 0:
            return []
        import time

        import numpy as np  # type: ignore

        if self._out_buf is None:
            self._out_buf = np.zeros(0, dtype=np.float32)

        # Bound the chunk so one iteration does not drain the FIFO far beyond
        # what is needed for this read.
        max_chunk = max(256, int(np.ceil(n_out / max(1e-9, self._r))))
        for _ in range(16):  # safety bound on iterations per read
            if self._out_buf.size >= n_out:
                break
            x = self._fifo_read(max_chunk)
            if not x:
                break
            y = None
            if self._rs is not None:
                try:
                    y = self._rs.process(
                        np.asarray(x, dtype=np.float32),
                        ratio=float(self._r),
                        end_of_input=False,
                    )
                except Exception:
                    logger.exception("ASRC resampler error")
            if y is not None and getattr(y, 'size', 0):
                y = y.astype(np.float32, copy=False)
                if self._out_buf.size == 0:
                    self._out_buf = y
                else:
                    self._out_buf = np.concatenate((self._out_buf, y))

        if self._out_buf.size >= n_out:
            out = self._out_buf[:n_out]
            self._out_buf = self._out_buf[n_out:]
            return out.tolist()

        # Underflow: hand out what we have and pad the rest with silence.
        produced = int(self._out_buf.size)
        now = time.monotonic()
        if now - self._last_log > 0.5:
            logger.debug(
                "ASRC debug: zero-padding underflow produced=%d/%d fifo=%d r=%.9f corr=%+.1f ppm",
                produced,
                n_out,
                self._fifo_len(),
                self._r,
                1e6 * (self._r / self._R0 - 1.0),
            )
            self._last_log = now
        out = np.zeros(n_out, dtype=np.float32)
        if produced:
            out[:produced] = self._out_buf
            self._out_buf = np.zeros(0, dtype=np.float32)
        return out.tolist()

    def _mono_to_stereo_int16_bytes(self, mono_f32: list[float]) -> bytes:
        """Convert mono floats in [-1, 1] to interleaved stereo int16 LE bytes.

        Values are clipped to [-1, 1], scaled by 32767 and truncated toward
        zero; each sample is emitted twice (left, right). NaN is mapped to 0.
        """
        import numpy as np  # type: ignore

        # float64 keeps the scaling identical to plain Python float arithmetic.
        mono = np.nan_to_num(np.asarray(mono_f32, dtype=np.float64), nan=0.0)
        i16 = (np.clip(mono, -1.0, 1.0) * 32767.0).astype('<i2')
        return np.repeat(i16, 2).tobytes()

    # ---------------- ThreadedAudioInput hooks -----------------
    def _open(self) -> PcmFormat:
        """Create the resampler and the (not yet started) capture stream.

        Returns:
            The output format: INT16 little-endian, stereo, at the nominal rate.
        """
        import time

        import numpy as np  # type: ignore
        import samplerate as sr  # type: ignore
        import sounddevice  # pylint: disable=import-error

        # We capture mono regardless of requested channels, then output stereo.
        channels = 1
        rate = int(self._pcm_format_in.sample_rate)

        # Report which backend/host API and device sounddevice is using.
        dev_info = sounddevice.query_devices(self._device, 'input')
        hostapi_info = sounddevice.query_hostapis(dev_info['hostapi'])
        try:
            pa_ver = sounddevice.get_portaudio_version()[1]
        except Exception:
            # Version string is informational only.
            pa_ver = ''
        logger.info(
            "ASRC sounddevice: device='%s' hostapi='%s' samplerate=%d channels=%d portaudio='%s'",
            dev_info.get('name', '?'),
            hostapi_info.get('name', '?'),
            rate,
            channels,
            pa_ver or '?'
        )

        scale = 1.0 / 32768.0

        def _callback(indata, frames, time_info, status):  # noqa: ARG001 (signature is fixed)
            # indata: raw int16 buffer holding `frames` mono samples
            try:
                if status:
                    logger.warning("Input status: %s", status)
                if frames <= 0:
                    return
                # INT16 mono -> float32 in [-1, 1)
                x_f32 = np.frombuffer(indata, dtype=np.int16).astype(np.float32) * scale
                now = time.monotonic()

                # Flag completely silent chunks (throttled)
                if not x_f32.any() and now - self._last_log > 0.5:
                    logger.debug("ASRC debug: input chunk is silent (all zeros)")
                    self._last_log = now

                self._fifo_write(x_f32)

                # Input throughput, reported about once per second. The window
                # opens at the first callback so the first figure is meaningful.
                if self._in_last_log is None:
                    self._in_last_log = now
                    self._in_samples = 0
                else:
                    self._in_samples += int(frames)
                    elapsed = now - self._in_last_log
                    if elapsed >= 1.0:
                        logger.debug(
                            "ASRC debug: input_sps=%.1f fifo=%d r=%.9f corr=%+.1f ppm",
                            self._in_samples / elapsed,
                            self._fifo_len(),
                            self._r,
                            1e6 * (self._r / self._R0 - 1.0),
                        )
                        self._in_samples = 0
                        self._in_last_log = now
            except Exception:  # never let the callback raise into PortAudio
                logger.exception("Audio input callback error")

        # Streaming resampler (mono)
        try:
            self._rs = sr.Resampler(converter_type="sinc_medium", channels=1)
        except Exception:
            logger.exception("Failed to create samplerate.Resampler; audio may be silent")
            self._rs = None

        self._stream = sounddevice.RawInputStream(
            samplerate=rate,
            device=self._device,
            channels=channels,
            dtype='int16',
            callback=_callback,
            blocksize=int(2.0e-3 * rate),  # 2 ms blocks
            latency='low',  # keep the device-side buffer small
        )
        # The stream is started on the first _read() so that no input piles up
        # in the FIFO between open and the first consumer read.
        return self._pcm_format_out

    def _read(self, frame_size: int) -> bytes:
        """Return ``frame_size`` output frames as stereo INT16 LE bytes."""
        if frame_size <= 0:
            return b''

        # Start the stream on first read; the callback fills the FIFO
        # asynchronously, so this does not block.
        if self._stream and not self._stream_started:
            self._stream.start()
            self._stream_started = True

        # Update the resampling ratio from the FIFO level. A controller failure
        # must not stop audio, but it should be visible in the log.
        try:
            self._update_ratio()
        except Exception:
            logger.exception("ASRC ratio update failed")

        mono = self._process(frame_size)
        return self._mono_to_stereo_int16_bytes(mono)

    def _close(self) -> None:
        """Stop and close the capture stream (errors are logged, not raised)."""
        try:
            if self._stream is not None:
                self._stream.stop()
                self._stream.close()
        except Exception:
            logger.exception('Error closing input stream')
        finally:
            self._stream = None
            self._stream_started = False

# ---------------------------------------------------------------------------
# Main report. Relies on `div`, FILTER_NAME and FILTER_ID being defined by the
# preceding part of this script, and on `set -euo pipefail` being active —
# which is why every pipeline that may legitimately fail or be cut short
# (SIGPIPE) is guarded with `|| true` or fed through a here-string.
# ---------------------------------------------------------------------------

echo "PipeWire latency inspector (linked nodes) — $(date)"
div
echo "GLOBAL CLOCK SETTINGS (pw-metadata -n settings):"
pw-metadata -n settings 2>/dev/null || echo "(no settings metadata found)"
echo
# Parse lines like: update: id:0 key:'clock.rate' value:'48000' type:''
# (plain sed -E instead of the gawk-only three-argument match()).
pw-metadata -n settings 2>/dev/null \
| sed -nE "s/.*key:'([^'\"]+)'[[:space:]]+value:'([^'\"]*)'.*/ \1: \2/p" || true

div
echo "CURRENT LINKS (pw-link -l):"
pw-link -l 2>/dev/null || echo "(no links or cannot list links)"
div

# Take ONE snapshot of the graph and query it repeatedly; this is faster than
# re-running pw-dump per lookup and keeps all lookups consistent.
PW_DUMP="$(pw-dump 2>/dev/null || true)"

# ---------- Collect node IDs from Links ----------
# Primary: use pw-dump Link objects (no state filter; just "any existing link").
# The hyphenated key names are tried first; the underscore names are kept as a
# fallback.
LINK_NODE_IDS="$(
  jq -r '
    .[] | select(.type=="PipeWire:Interface:Link") | select(.info!=null)
    | (.info["output-node-id"] // .info.output_node),
      (.info["input-node-id"]  // .info.input_node)
    | select(. != null)
  ' <<< "$PW_DUMP" 2>/dev/null \
  | sed '/^$/d' | sort -n | uniq || true
)"

# Fallback: parse names from pw-link -l and map names -> IDs via the snapshot
if [[ -z "$LINK_NODE_IDS" ]]; then
  # Extract all "node:port" tokens and cut at the colon → node names
  LINK_NODE_NAMES="$(
    pw-link -l 2>/dev/null \
    | sed -n 's/^[[:space:]]*|[-<>][[:space:]]*//; s/[[:space:]]\+$//; p' \
    | awk -F':' '/:/{print $1}' \
    | sed '/^$/d' | sort | uniq || true
  )"

  if [[ -n "$LINK_NODE_NAMES" ]]; then
    # Prefer node.name; fall back to node.description. first(...) yields at
    # most one id per name without needing `| head -n1`.
    # The whole loop runs inside $(...) with `|| true`, so no temp file is
    # needed and a non-matching last name cannot abort the script.
    LINK_NODE_IDS="$(
      while read -r nm; do
        [[ -z "$nm" ]] && continue
        jq -r --arg n "$nm" '
          first(
            .[] | select(.type=="PipeWire:Interface:Node")
            | select((.info.props["node.name"] // .info.props["node.description"] // "") == $n)
            | .id
          )
        ' <<< "$PW_DUMP" 2>/dev/null || true
      done <<< "$LINK_NODE_NAMES" \
      | sed '/^$/d' | sort -n | uniq || true
    )"
  fi
fi

if [[ -z "$LINK_NODE_IDS" ]]; then
  echo "No linked nodes found."
  exit 0
fi

# Pull all nodes and keep only those in LINK_NODE_IDS
IDS_JSON="[$(echo "$LINK_NODE_IDS" | awk 'NR>1{printf ","}{printf "%s",$0} END{print ""}')]"
NODES_JSON="$(
  jq -c --argjson ids "$IDS_JSON" '
    .[] | select(.type=="PipeWire:Interface:Node") | select([.id] | inside($ids))
  ' <<< "$PW_DUMP"
)"

# Optional filters. User input is passed as jq variables, never spliced into
# the jq program text.
if [[ -n "$FILTER_ID" ]]; then
  NODES_JSON="$(jq -c --arg fid "$FILTER_ID" 'select((.id | tostring) == $fid)' <<< "$NODES_JSON")"
fi
if [[ -n "$FILTER_NAME" ]]; then
  NODES_JSON="$(jq -c --arg pat "$FILTER_NAME" \
    'select((.info.props["node.name"] // .info.props["node.description"] // "") | test($pat))' \
    <<< "$NODES_JSON")"
fi

if [[ -z "$NODES_JSON" ]]; then
  echo "No matching linked nodes."
  exit 0
fi

echo "LINKED NODE LATENCY DETAILS:"
while IFS= read -r node; do
  id=$(echo "$node" | jq -r '.id')
  name=$(echo "$node" | jq -r '.info.props["node.name"] // .info.props["node.description"] // "unknown"')
  appname=$(echo "$node" | jq -r '.info.props["application.name"] // ""')
  media=$(echo "$node" | jq -r '.info.props["media.class"] // ""')
  state=$(echo "$node" | jq -r '.info.state // ""')
  format=$(echo "$node" | jq -r '.info.params?.Format?[0]?.audio?.format // empty')
  rate=$(echo "$node" | jq -r '.info.params?.Format?[0]?.audio?.rate // empty')
  channels=$(echo "$node" | jq -r '.info.params?.Format?[0]?.audio?.channels // empty')

  echo
  echo "Node #$id ${name} ${appname:+(app: $appname)}"
  echo " state: $state | media.class: $media"
  if [[ -n "$rate" || -n "$channels" || -n "$format" ]]; then
    echo " format: ${format:-?} rate: ${rate:-?} channels: ${channels:-?}"
    if [[ -n "$rate" ]]; then
      # awk exits after the first match, which may SIGPIPE pw-cli; with
      # pipefail that must not abort the report.
      q=$(pw-cli enum-params "$id" ProcessLatency 2>/dev/null | awk '/quantum/ {print $NF; exit}' || true)
      [[ -n "${q:-}" ]] && awk -v q="$q" -v r="$rate" 'BEGIN{printf " ~sched per-cycle: %d/%d = %.3f ms\n", q, r, (q*1000.0/r)}'
    fi
  fi

  LATENCY_RAW="$(pw-cli enum-params "$id" Latency 2>/dev/null || true)"
  if [[ -n "$LATENCY_RAW" ]]; then
    echo " node.latency param (per-direction):"
    echo "$LATENCY_RAW" | awk '
      BEGIN{RS="";FS="\n"}
      {
        dir="";
        for(i=1;i<=NF;i++){
          if($i ~ /direction/){
            if($i ~ /input/) dir="capture";
            else if($i ~ /output/) dir="playback";
            else dir="unknown";
          }
          if($i ~ /min-quantum/ || $i ~ /max-quantum/ || $i ~ /rate/ || $i ~ /ns/){
            gsub(/^[ \t]+/,"",$i); gsub(/Prop: key /,"",$i);
            printf " [%s] %s\n", dir, $i;
          }
        }
      }'
  else
    echo " node.latency: (no Latency param reported)"
  fi

  PROC_RAW="$(pw-cli enum-params "$id" ProcessLatency 2>/dev/null || true)"
  if [[ -n "$PROC_RAW" ]]; then
    echo " ProcessLatency:"
    echo "$PROC_RAW" | awk '/quantum|rate|ns/ { gsub(/^[ \t]+/,""); sub(/Prop: key /,""); print " " $0 }'
  else
    echo " ProcessLatency: (not reported)"
  fi

  INFO_RAW="$(pw-cli info "$id" 2>/dev/null || true)"
  # Here-string instead of `echo | grep -q`: grep -q exits at the first match,
  # and under pipefail the resulting SIGPIPE on echo would read as "no match".
  if grep -q "api.alsa" <<< "$INFO_RAW"; then
    echo " ALSA (periods/headroom):"
    echo "$INFO_RAW" | awk -F': ' '
      /api\.alsa\.period-size/ {print " api.alsa.period-size: " $2}
      /api\.alsa\.periods/ {print " api.alsa.periods: " $2}
      /api\.alsa\.headroom/ {print " api.alsa.headroom: " $2}
      /api\.alsa\.disable-batch/ {print " api.alsa.disable-batch: " $2}
      /api\.alsa\.use-chmap/ {print " api.alsa.use-chmap: " $2}
    ' | sed 's/ (.*)//'
  fi

  echo "$INFO_RAW" | awk -F': ' '
    /node\.force-rate/ {print " Hint: node.force-rate: " $2}
    /node\.force-quantum/ {print " Hint: node.force-quantum: " $2}
    /node\.latency/ {print " Hint: node.latency (prop): " $2}
    /resample\.quality/ {print " Hint: resample.quality: " $2}
  ' | sed 's/ (.*)//' || true
done <<< "$NODES_JSON"

div
cat <<'TIP'
Notes:
- We include any node that appears in a Link (from pw-dump); this matches what pw-link -l shows.
- ~sched per-cycle = quantum/rate from ProcessLatency. Device/codec/RF/jitter buffers sit on top.
TIP