feature/asrc #9

Closed
pstruebi wants to merge 2 commits from feature/asrc into main
12 changed files with 1431 additions and 27 deletions
+158
View File
@@ -0,0 +1,158 @@
# Auracast Latency Analysis
**Date**: 2025-10-08
**Measured End-to-End Latency**: ~180ms
**Expected Latency**: ~130ms
**Gap**: ~74ms (unaccounted)
---
## Complete Latency Breakdown
### 1. **Transmitter Pipeline** (Measured: 32-34ms)
| Component | Latency | Status | Notes |
|-----------|---------|--------|-------|
| USB Device ADC→Callback | ~3ms | ✅ **VERIFIED** | Measured via sounddevice timing |
| ASRC Buffer (FIFO) | 1.6-3.7ms | ✅ Measured | Varies with clock drift correction |
| LC3 Encode | 0.2-0.5ms | ✅ Measured | Real-time frame encoding |
| ISO Application Queue | ~20ms | ✅ Measured | 1 frame × 20ms (iso_que_len=1) |
| HCI/Serial Queue | 0-10ms | ✅ Measured | Usually 0, UART keeps up |
| **Subtotal** | **32-34ms** | | |
### 2. **Bluetooth Transport** (32ms)
| Component | Latency | Status | Notes |
|-----------|---------|--------|-------|
| Transport Latency | 32ms | ✅ From BIG parameters | `transport_latency_big=32020` μs |
### 3. **Configured Presentation Delay** (40ms)
| Component | Latency | Status | Notes |
|-----------|---------|--------|-------|
| Presentation Delay | 40ms | ✅ From config | `presentation_delay_us=40000` |
### 4. **Total Accounted** = 104-106ms
### 5. **Missing Gap** = 74-76ms ❓
---
## Hypothesis Testing Results
### ✅ DISPROVEN: USB Device Internal Latency
**Test**: Measured sounddevice callback timing using `inputBufferAdcTime` vs `currentTime`
**Results**:
- Mean callback latency: **3.0ms**
- Range: 2-5ms
- USB device reports: 8.7ms default low latency
**Conclusion**: USB device is NOT contributing 74ms. It adds only ~3ms.
### ✅ VERIFIED: PipeWire Not Adding Significant Latency
**Settings**:
- clock.rate: 48000 Hz
- clock.quantum: 144 samples = **3ms** per cycle
- Minimal buffering
**Conclusion**: PipeWire adds ~3ms, not 74ms.
---
## Where Is the Missing 74ms?
Given that the **transmitter-side is fully accounted** (32-34ms + 32ms transport + 40ms presentation = 104-106ms), the missing **74ms must be on the RECEIVER side**.
### Likely Receiver Components:
#### 1. **Jitter Buffer** (30-50ms estimated)
- LE Audio receivers add buffering beyond `presentation_delay` to handle:
- Packet loss and retransmissions
- Clock drift between transmitter and receiver
- Radio interference and timing jitter
- **Industry typical**: 30-50ms for robust operation
#### 2. **DAC/Audio Output Pipeline** (20-30ms estimated)
- Hardware audio buffering
- DAC conversion latency
- Operating system audio stack
- Driver scheduling
#### 3. **Actual Presentation Delay > Configured** (10-20ms)
- `presentation_delay_us=40000` (40ms) may be a **minimum**
- Receiver might use larger value for robustness
- Some receivers add fixed safety margin
### **Estimated Receiver Total**: ~60-100ms (explains the 74ms gap!)
---
## Verification Steps
### To Prove Receiver-Side Latency:
1. **Check receiver's actual presentation delay**
- Query receiver's reported latency (if available via Bluetooth)
- Some LE Audio devices expose this via vendor commands
2. **Test with different presentation_delay_us values**
```python
# In auracast_config.py
presentation_delay_us = 20000 # Try 20ms instead of 40ms
```
- If end-to-end latency decreases proportionally → receiver respects it
- If latency stays ~180ms → receiver has fixed buffering
3. **Try different receiver device**
- If another LE Audio receiver shows ~130ms → confirms current receiver adds extra buffering
- If all receivers show ~180ms → it's inherent to LE Audio stack
4. **Check receiver "low latency mode"**
- Some LE Audio devices have gaming/low-latency modes
- May reduce jitter buffer at cost of robustness
---
## Current System Performance
### ✅ Excellent Transmitter Performance
- **32-34ms pipeline latency** is very good for software implementation
- ASRC stable at 1.6-3.7ms
- Encode fast at 0.2-0.5ms
- HCI queue not backing up (0-10ms)
### 📊 Bluetooth Transport
- **32ms transport latency** is standard for LE Audio with retransmissions (RTN=4)
- Could be reduced by lowering RTN, but reduces reliability
### ⚠️ Receiver Buffering (suspected)
- **~74ms unaccounted** likely in receiver
- Probably unavoidable with current receiver hardware/firmware
---
## Recommendations
1. **Accept ~180ms as baseline** if receiver-side buffering is confirmed
- Transmitter-side optimization complete
- 74ms receiver buffering is typical for robust LE Audio
2. **Test lower presentation_delay_us** (20ms instead of 40ms)
- May reduce total by ~20ms if receiver respects it
- Watch for audio glitches (sign of too-aggressive setting)
3. **Try different receiver** if ultra-low latency is critical
- Look for "gaming" or "low latency" LE Audio receivers
- Trade-off: May have more audio dropouts
4. **Consider LC3plus codec** (if supported)
- Lower frame duration (5ms vs 10ms)
- May reduce buffering requirements
- Not all devices support it
---
## Summary
**Your transmitter pipeline is excellent at ~32-34ms.** The additional 74ms gap is almost certainly receiver-side jitter buffering + DAC latency, which is typical for robust LE Audio operation. Total ~104-106ms measured on transmitter + ~74ms receiver = ~180ms end-to-end matches your measurements.
+1 -1
View File
@@ -177,7 +177,7 @@ git checkout 9abe5fe7db729280080a0bbc1397a528cd3ce658
rm -rf build
cmake -S . -B build -G"Unix Makefiles" \
-DBUILD_SHARED_LIBS=ON \
-DPA_USE_ALSA=OFF \
-DPA_USE_ALSA=ON \
-DPA_USE_PULSEAUDIO=ON \
-DPA_USE_JACK=OFF
cmake --build build -j$(nproc)
Generated
+25 -1
View File
@@ -2443,6 +2443,30 @@ files = [
{file = "rpds_py-0.25.1.tar.gz", hash = "sha256:8960b6dac09b62dac26e75d7e2c4a22efb835d827a7278c34f72b2b84fa160e3"},
]
[[package]]
name = "samplerate"
version = "0.2.1"
description = "Monolithic python wrapper for libsamplerate based on pybind11 and NumPy"
optional = false
python-versions = ">=3.7"
groups = ["main"]
files = [
{file = "samplerate-0.2.1-cp310-cp310-macosx_12_0_x86_64.whl", hash = "sha256:9b392e857cfeda712585d702871eab7169ba2c63209e8d2314fa2196d6127354"},
{file = "samplerate-0.2.1-cp310-cp310-win_amd64.whl", hash = "sha256:c237959ba0fd391b3293f2905c23f1ae15a096fc987cd776aef380ba426491c6"},
{file = "samplerate-0.2.1-cp311-cp311-macosx_12_0_universal2.whl", hash = "sha256:b0e1b5cb08edb19d232bd1ba67632590ed7a579b526ab4bf3983b4f2b2e9acb4"},
{file = "samplerate-0.2.1-cp311-cp311-win_amd64.whl", hash = "sha256:3ebd447f2040950edc1dac32cb4afed3a74ed37e8e5ffe4775470213f799801f"},
{file = "samplerate-0.2.1-cp312-cp312-macosx_12_0_universal2.whl", hash = "sha256:69cc7ad887c36294129315a04a7f43837c7ed9caf42db94e9687cf66c9709200"},
{file = "samplerate-0.2.1-cp312-cp312-win_amd64.whl", hash = "sha256:c7f5e81b24debfa25981e367e3f5d191cf72e32b5c92613139f5ba94e7f18c01"},
{file = "samplerate-0.2.1-cp38-cp38-macosx_12_0_x86_64.whl", hash = "sha256:a2b39a7131da065072508548f0ab80c9565d8c75212eac2f61fc1b1f82bb1611"},
{file = "samplerate-0.2.1-cp38-cp38-win_amd64.whl", hash = "sha256:c02d6e0f7541c4f6a64b97ff6d0a84f53a0c432c965031491b48d9d75a81f102"},
{file = "samplerate-0.2.1-cp39-cp39-macosx_12_0_x86_64.whl", hash = "sha256:137563c6069e23441b4c2059cf95a3ed5b20a6d747d0488014becb94b4dfc32f"},
{file = "samplerate-0.2.1-cp39-cp39-win_amd64.whl", hash = "sha256:0656233932f76af6070f56edf34d8ef56e62e2c503553229d2388953a2166cda"},
{file = "samplerate-0.2.1.tar.gz", hash = "sha256:464d3574412024184fb7428ecbaa1b2e207bddf5fbc10a5d9ddc3fc1c7b7ab1e"},
]
[package.dependencies]
numpy = "*"
[[package]]
name = "six"
version = "1.17.0"
@@ -2952,4 +2976,4 @@ test = ["pytest", "pytest-asyncio"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.11"
content-hash = "6b5300c349ed045e8fd3e617e6262bbd7e5c48c518e4c62cedf7c17da50ce8c0"
content-hash = "7290ffd17fde6febd61d505131efe260790b3d91fcb7442b6c4aa6fc32f6baeb"
+2 -1
View File
@@ -16,7 +16,8 @@ dependencies = [
"aiortc (>=1.13.0,<2.0.0)",
"sounddevice (>=0.5.2,<0.6.0)",
"python-dotenv (>=1.1.1,<2.0.0)",
"smbus2 (>=0.5.0,<0.6.0)"
"smbus2 (>=0.5.0,<0.6.0)",
"samplerate (>=0.2.1,<0.3.0)"
]
[project.optional-dependencies]
+160 -10
View File
@@ -47,6 +47,7 @@ import bumble.utils
import numpy as np # for audio down-mix
from bumble.device import Host, AdvertisingChannelMap
from bumble.audio import io as audio_io
from auracast.utils import io_asrc
from auracast import auracast_config
from auracast.utils.read_lc3_file import read_lc3_file
@@ -312,6 +313,10 @@ async def init_broadcast(
logging.info('Setup ISO Data Path')
bigs[f'big{i}']['iso_queue'] = iso_queue
bigs[f'big{i}']['iso_que_len'] = conf.iso_que_len
# Store timing-critical BIG parameters for latency tracking
bigs[f'big{i}']['iso_interval'] = big.iso_interval # milliseconds
bigs[f'big{i}']['transport_latency_big'] = big.transport_latency_big # microseconds
logging.info(f'big{i} parameters are:')
logging.info('%s', pprint.pformat(vars(big)))
@@ -319,17 +324,53 @@ async def init_broadcast(
await asyncio.sleep(i+1) # Wait for advertising to set up
# Track HCI/serial queue flow for latency measurement
import time
flow_state = {
'last_completed': 0,
'last_log_time': 0.0,
'max_pending': 0,
'max_queued': 0,
}
def on_flow():
data_packet_queue = iso_queue.data_packet_queue
print(
f'\rPACKETS: pending={data_packet_queue.pending}, '
f'queued={data_packet_queue.queued}, '
f'completed={data_packet_queue.completed}',
end='',
)
if global_config.debug:
bigs[f'big{0}']['iso_queue'].data_packet_queue.on('flow', on_flow)
now = time.time()
# Track maximums
if data_packet_queue.pending > flow_state['max_pending']:
flow_state['max_pending'] = data_packet_queue.pending
if data_packet_queue.queued > flow_state['max_queued']:
flow_state['max_queued'] = data_packet_queue.queued
# Log periodically (every 1 second)
if now - flow_state['last_log_time'] >= 1.0:
completed_delta = data_packet_queue.completed - flow_state['last_completed']
flow_state['last_completed'] = data_packet_queue.completed
flow_state['last_log_time'] = now
# Estimate serial/HCI latency from pending packets
# Each packet takes ~iso_interval to transmit (10ms typically)
conf_ref = bigs.get(f'big{i}', {})
iso_interval_ms = conf_ref.get('iso_interval', 10.0)
hci_latency_ms = data_packet_queue.pending * iso_interval_ms
logging.info(
"LATENCY HCI/Serial: pending=%d queued=%d (HCI buffer ~%.1f ms) | completed/s=%d | max: pending=%d queued=%d",
data_packet_queue.pending,
data_packet_queue.queued,
hci_latency_ms,
completed_delta,
flow_state['max_pending'],
flow_state['max_queued']
)
# Reset max counters
flow_state['max_pending'] = 0
flow_state['max_queued'] = 0
# Always enable flow tracking for latency measurement
bigs[f'big{0}']['iso_queue'].data_packet_queue.on('flow', on_flow)
bigs[f'big{0}']['flow_state'] = flow_state
return bigs
@@ -533,7 +574,10 @@ class Streamer():
# anything else, e.g. realtime stream from device (bumble)
else:
audio_input = await audio_io.create_audio_input(audio_source, input_format)
if audio_source.startswith('asrcdevice'):
audio_input = io_asrc.SoundDeviceAudioInputAsrc(audio_source[len('asrcdevice')+1:], input_format)
else:
audio_input = await audio_io.create_audio_input(audio_source, input_format)
# Store early so stop_streaming can close even if open() fails
big['audio_input'] = audio_input
# SoundDeviceAudioInput (used for `mic:<device>` captures) has no `.rewind`.
@@ -615,14 +659,54 @@ class Streamer():
big['channels'] = pcm_format.channels
big['lc3_frame_samples'] = lc3_frame_samples
big['lc3_bytes_per_frame'] = global_config.octets_per_frame
big['sampling_frequency'] = global_config.auracast_sampling_rate_hz
big['audio_input'] = audio_input
big['encoder'] = encoder
big['precoded'] = False
# get and discard first frame
await anext(big['audio_input'].frames(big['lc3_frame_samples']), None)
# Log latency-relevant configuration and expected latency budget
for big_name, big in self.bigs.items():
frame_duration_ms = (big['lc3_frame_samples'] / big['sampling_frequency']) * 1000.0
iso_interval_ms = big.get('iso_interval', 0.0)
presentation_delay_ms = global_config.presentation_delay_us / 1000.0
transport_latency_ms = big.get('transport_latency_big', 0) / 1000.0
iso_queue_len = big.get('iso_que_len', 1)
# Expected latency breakdown:
# 1. ASRC buffer (target ~10-15ms, varies with clock drift)
# 2. Frame read/encode/write (measured ~0.2-0.5ms typically)
# 3. ISO app queue (IsoPacketStream._thresholds, max=iso_queue_len frames)
# 4. HCI/Serial queue (data_packet_queue.pending, varies with UART speed)
# 5. Transport latency (from BIG parameters - radio air time)
# 6. Presentation delay (configured delay at receiver)
asrc_target_ms = 10.0 # from io_asrc.py, but varies with clock drift
encode_estimate_ms = 0.5 # typical LC3 encode time
iso_buffer_ms = iso_queue_len * frame_duration_ms # max capacity
hci_estimate_ms = 0.0 # measured in real-time, typically 0 if UART keeps up
pipeline_estimate_ms = asrc_target_ms + encode_estimate_ms + iso_buffer_ms + hci_estimate_ms
expected_total_ms = pipeline_estimate_ms + transport_latency_ms + presentation_delay_ms
logging.info(
"LATENCY CONFIG [%s]: Frame=%.1f ms | ISO interval=%.1f ms | ISO queue capacity=%d frames",
big_name, frame_duration_ms, iso_interval_ms, iso_queue_len
)
logging.info(
"LATENCY BUDGET [%s]: Pipeline~%.1f (ASRC~%.1f + enc~%.1f + ISO~%.1f + HCI~%.1f) + Transport~%.1f + Presentation~%.1f = ~%.1f ms end-to-end",
big_name, pipeline_estimate_ms, asrc_target_ms, encode_estimate_ms, iso_buffer_ms, hci_estimate_ms, transport_latency_ms, presentation_delay_ms, expected_total_ms
)
logging.info("Streaming audio...")
bigs = self.bigs
self.is_streaming = True
# Latency tracking
import time
frame_count = 0
last_latency_log = 0.0
latency_log_interval = 1.0 # seconds
# One streamer fits all
while self.is_streaming:
stream_finished = [False for _ in range(len(bigs))]
@@ -641,13 +725,25 @@ class Streamer():
if frames_gen is None:
frames_gen = big['audio_input'].frames(big['lc3_frame_samples'])
big['frames_gen'] = frames_gen
t_read_start = time.time()
pcm_frame = await anext(frames_gen, None)
t_read_end = time.time()
# Get frame age from ASRC if available
asrc_frame_age_ms = 0.0
try:
if hasattr(big['audio_input'], '_last_frame_age_ms'):
asrc_frame_age_ms = big['audio_input']._last_frame_age_ms
except Exception:
pass
if pcm_frame is None: # Not all streams may stop at the same time
stream_finished[i] = True
continue
# Down-mix multi-channel PCM to mono for LC3 encoder if needed
t_mix_start = time.time()
if big.get('channels', 1) > 1:
if isinstance(pcm_frame, np.ndarray):
if pcm_frame.ndim > 1:
@@ -659,12 +755,66 @@ class Streamer():
samples = np.frombuffer(pcm_frame, dtype=dtype)
samples = samples.reshape(-1, big['channels']).mean(axis=1)
pcm_frame = samples.astype(dtype).tobytes()
t_mix_end = time.time()
t_encode_start = time.time()
lc3_frame = big['encoder'].encode(
pcm_frame, num_bytes=big['lc3_bytes_per_frame'], bit_depth=big['pcm_bit_depth']
)
t_encode_end = time.time()
t_write_start = time.time()
await big['iso_queue'].write(lc3_frame)
t_write_end = time.time()
# Periodic latency logging
frame_count += 1
now = time.time()
if now - last_latency_log >= latency_log_interval:
# Get ISO application queue depth (packets waiting in IsoPacketStream)
iso_depth = 0
try:
# The _thresholds deque tracks packets queued but not yet completed
iso_depth = len(big['iso_queue']._thresholds)
except Exception:
iso_depth = 0
# Calculate ISO queue latency (application-level buffering)
frame_duration_ms = (big['lc3_frame_samples'] / big['sampling_frequency']) * 1000.0
iso_latency_ms = iso_depth * frame_duration_ms
# Get HCI/serial queue depth (packets in HCI layer waiting for UART transmission)
hci_pending = 0
hci_latency_ms = 0.0
try:
hci_pending = big['iso_queue'].data_packet_queue.pending
iso_interval_ms = big.get('iso_interval', 10.0)
hci_latency_ms = hci_pending * iso_interval_ms
except Exception:
pass
# Measure operation times
read_ms = (t_read_end - t_read_start) * 1000.0
encode_ms = (t_encode_end - t_encode_start) * 1000.0 if 'precoded' not in big or not big['precoded'] else 0.0
write_ms = (t_write_end - t_write_start) * 1000.0
# Calculate total pipeline latency: ASRC age + encode + ISO buffer + HCI buffer
pipeline_latency_ms = asrc_frame_age_ms + encode_ms + iso_latency_ms + hci_latency_ms
# Get transport and presentation from config
transport_ms = big.get('transport_latency_big', 0) / 1000.0
presentation_ms = global_config.presentation_delay_us / 1000.0
total_expected_ms = pipeline_latency_ms + transport_ms + presentation_ms
logging.info(
"LATENCY REALTIME: capture→radio=%.1f ms (ASRC=%.1f + enc=%.1f + ISO_q=%.1f + HCI=%.1f) | depths: ISO=%d HCI=%d",
pipeline_latency_ms, asrc_frame_age_ms, encode_ms, iso_latency_ms, hci_latency_ms, iso_depth, hci_pending
)
logging.info(
"LATENCY TOTAL: capture→speaker = %.1f ms pipeline + %.1f ms transport + %.1f ms presentation = %.1f ms (measured ~180ms, gap=%.1f ms)",
pipeline_latency_ms, transport_ms, presentation_ms, total_expected_ms, 180.0 - total_expected_ms
)
last_latency_log = now
if all(stream_finished): # Take into account that multiple files have different lengths
logging.info('All streams finished, stopping streamer')
+2 -3
View File
@@ -50,7 +50,6 @@ from auracast.utils.sounddevice_utils import (
refresh_pw_cache,
)
if __name__ == "__main__":
logging.basicConfig( #export LOG_LEVEL=DEBUG
@@ -61,7 +60,7 @@ if __name__ == "__main__":
# Load .env located next to this script (only uppercase keys will be referenced)
load_dotenv(dotenv_path='.env')
os.environ.setdefault("PULSE_LATENCY_MSEC", "3")
#os.environ.setdefault("PULSE_LATENCY_MSEC", "3")
# Refresh device cache and list inputs
refresh_pw_cache()
@@ -143,7 +142,7 @@ if __name__ == "__main__":
program_info=program_info,
language=language,
iso_que_len=1,
audio_source=f'device:{input_sel}',
audio_source=f'asrcdevice:{input_sel}',
input_format=f"int16le,{CAPTURE_SRATE},{channels}",
sampling_frequency=LC3_SRATE,
octets_per_frame=OCTETS_PER_FRAME,
+1 -1
View File
@@ -1 +1 @@
5078804E6FBCF893D5537715FD928E46AD576ECB
5078804E6FBCF893D5537715FD928E46AD576ECD
+1 -1
View File
@@ -517,7 +517,7 @@ else:
program_info=program_info,
language=language,
audio_source=(
f"device:{input_device}" if audio_mode in ("USB", "AES67") else (
f"asrcdevice:{input_device}" if audio_mode in ("USB", "AES67") else (
"webrtc" if audio_mode == "Webapp" else "network"
)
),
+48 -9
View File
@@ -177,7 +177,7 @@ class StreamerWorker:
first_source = conf.bigs[0].audio_source if conf.bigs else ''
input_device_name = None
audio_mode_persist = 'Demo'
if first_source.startswith('device:'):
if first_source.startswith('device:') or first_source.startswith('asrcdevice:'):
input_device_name = first_source.split(':', 1)[1] if ':' in first_source else None
try:
usb_names = {d.get('name') for _, d in get_usb_pw_inputs()}
@@ -193,12 +193,17 @@ class StreamerWorker:
for big in conf.bigs:
if big.audio_source.startswith('device:'):
big.audio_source = f'device:{device_index}'
elif big.audio_source.startswith('asrcdevice:'):
big.audio_source = f'asrcdevice:{device_index}'
devinfo = sd.query_devices(device_index)
capture_rate = int(devinfo.get('default_samplerate') or 48000)
# Force 48 kHz capture to match multicast_script behavior and minimize resampler/pipewire latency
capture_rate = 48000
max_in = int(devinfo.get('max_input_channels') or 1)
channels = max(1, min(2, max_in))
for big in conf.bigs:
big.input_format = f"int16le,{capture_rate},{channels}"
if big.audio_source.startswith('device:') or big.audio_source.startswith('asrcdevice:'):
# Always override to 48 kHz for device/asrcdevice, regardless of frontend-provided input_format
big.input_format = f"int16le,{capture_rate},{channels}"
# Coerce QoS: compute max_transport_latency from RTN if qos_config present
if getattr(conf, 'qos_config', None) and getattr(conf.qos_config, 'number_of_retransmissions', None) is not None:
@@ -209,7 +214,12 @@ class StreamerWorker:
await reset_nrf54l(1)
await self._multicaster1.init_broadcast()
auto_started = False
if any(big.audio_source.startswith("device:") or big.audio_source.startswith("file:") for big in conf.bigs):
if any(
big.audio_source.startswith("device:") or
big.audio_source.startswith("asrcdevice:") or
big.audio_source.startswith("file:")
for big in conf.bigs
):
await self._multicaster1.start_streaming()
auto_started = True
@@ -219,6 +229,11 @@ class StreamerWorker:
'languages': [big.language for big in conf.bigs],
'audio_mode': audio_mode_persist,
'input_device': input_device_name,
'input_source_kind': (
'asrcdevice' if any(b.audio_source.startswith('asrcdevice:') for b in conf.bigs) else (
'device' if any(b.audio_source.startswith('device:') for b in conf.bigs) else None
)
),
'program_info': [getattr(big, 'program_info', None) for big in conf.bigs],
'gain': [getattr(big, 'input_gain', 1.0) for big in conf.bigs],
'auracast_sampling_rate_hz': conf.auracast_sampling_rate_hz,
@@ -241,12 +256,24 @@ class StreamerWorker:
conf.transport = TRANSPORT2
for big in conf.bigs:
if big.audio_source.startswith('device:'):
if big.audio_source.startswith('device:') or big.audio_source.startswith('asrcdevice:'):
device_name = big.audio_source.split(':', 1)[1]
device_index = get_device_index_by_name(device_name)
if device_index is None:
raise HTTPException(status_code=400, detail=f"Audio device '{device_name}' not found.")
big.audio_source = f'device:{device_index}'
if big.audio_source.startswith('device:'):
big.audio_source = f'device:{device_index}'
else:
big.audio_source = f'asrcdevice:{device_index}'
# Also force 48 kHz capture and set input_format for device/asrcdevice
try:
devinfo2 = sd.query_devices(device_index)
except Exception:
devinfo2 = {}
capture_rate2 = 48000
max_in2 = int((devinfo2 or {}).get('max_input_channels') or 1)
channels2 = max(1, min(2, max_in2))
big.input_format = f"int16le,{capture_rate2},{channels2}"
# Coerce QoS: compute max_transport_latency from RTN if qos_config present
if getattr(conf, 'qos_config', None) and getattr(conf.qos_config, 'number_of_retransmissions', None) is not None:
conf.qos_config.max_transport_latency_ms = int(conf.qos_config.number_of_retransmissions) * 10 + 3
@@ -255,7 +282,12 @@ class StreamerWorker:
self._multicaster2 = multicast_control.Multicaster(conf, conf.bigs)
await reset_nrf54l(0)
await self._multicaster2.init_broadcast()
if any(big.audio_source.startswith("device:") or big.audio_source.startswith("file:") for big in conf.bigs):
if any(
big.audio_source.startswith("device:") or
big.audio_source.startswith("asrcdevice:") or
big.audio_source.startswith("file:")
for big in conf.bigs
):
await self._multicaster2.start_streaming()
async def _w_stop_all(self) -> bool:
@@ -442,7 +474,10 @@ async def _autostart_from_settings():
name=channel_names[0] if channel_names else "Broadcast0",
program_info=program_info[0] if isinstance(program_info, list) and program_info else program_info,
language=languages[0] if languages else "deu",
audio_source=f"device:{input_device_name}",
audio_source=(
f"asrcdevice:{input_device_name}" if (settings.get('input_source_kind') == 'asrcdevice')
else f"device:{input_device_name}"
),
# input_format is intentionally omitted to use the default
iso_que_len=1,
sampling_frequency=rate,
@@ -725,4 +760,8 @@ if __name__ == '__main__':
format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
)
# Bind to localhost only for security: prevents network access, only frontend on same machine can connect
uvicorn.run(app, host="127.0.0.1", port=5000, access_log=False)
uvicorn.run(
app, host="127.0.0.1",
port=5000,
#access_log=False
)
+727
View File
@@ -0,0 +1,727 @@
# Copyright 2025
#
# Drop-in replacement for `SoundDeviceAudioInput` that adds a tiny ASRC stage.
#
# Constraints per request:
# - Only import io_bumble.py at module level.
# - Reuse the ASRC functionality from asrc.py conceptually (PI control + FIFO +
# linear/sinc resampling behavior). We implement a minimal, dependency-free
# variant (linear interpolation with a small PI loop) so this module does not
# import anything else at top-level.
#
# Notes:
# - Input stream is captured via sounddevice (imported lazily inside methods).
# - Input is mono float32 for simplicity; output matches the original class
# signature: INT16, stereo, at the same nominal sample rate as requested.
from bumble.audio.io import PcmFormat, ThreadedAudioInput, logger # only top-level import
# Toggle for local debug logging in this module
DEBUG_LOG = False
LATENCY_DEBUG = True # Track latency through the pipeline
class SoundDeviceAudioInputAsrc(ThreadedAudioInput):
"""Sound device audio input with a simple ASRC stage.
Interface-compatible with `io_bumble.SoundDeviceAudioInput`:
- __init__(device_name: str, pcm_format: PcmFormat)
- _open() -> PcmFormat
- _read(frame_size: int) -> bytes
- _close() -> None
Behavior:
- Captures mono PCM frames using the device's native integer format by
attempting (in order): S32LE (int32), S24LE (packed int24), S16LE (int16),
and converts to internal float32.
- Buffers into an internal ring buffer.
- Produces stereo INT16 frames using a linear-interp resampler whose
ratio is adjusted by a tiny PI loop to hold FIFO depth near a target.
"""
def __init__(self, device_name: str, pcm_format: str) -> None:
super().__init__()
# Device & format
self._device = int(device_name) if device_name else None
pcm_format: PcmFormat | None
if pcm_format == 'auto':
pcm_format = None
else:
pcm_format = PcmFormat.from_str(pcm_format)
self._pcm_format_in = pcm_format
# We always output stereo INT16 at the same nominal sample rate.
self._pcm_format_out = PcmFormat(
PcmFormat.Endianness.LITTLE,
PcmFormat.SampleType.INT16,
pcm_format.sample_rate,
2,
)
# sounddevice stream (created in _open)
self._stream = None # type: ignore[assignment]
self._stream_started = False # Track if stream has been started
# --- ASRC state (inspired by asrc.py) ---
# Nominal input/output rate ratio
self._r = 1.0
self._integral = 0.0
self._phi = 0.0 # fractional read position within current chunk
# PI gains (normalized to seconds error: see _update_ratio)
# Smaller values to avoid saturation and oscillation
self._Kp = 0.5 # proportional on time error (seconds)
self._Ki = 0.08 # integral on time error (seconds)
self._R0 = 1.0
# Controller smoothing and slew limiting
self._alpha_r = 0.25 # low-pass blend factor for ratio updates
self._max_step_ppm = 3000.0 # limit ratio change per update (ppm)
self._integral_limit = 0.02 # clamp integral contribution (approx ±2%/Ki)
# Target FIFO level and deadband (slightly larger headroom for jitter)
fs = float(self._pcm_format_in.sample_rate)
self._target_samples = max(1, int(0.015 * fs))
# Slightly larger deadband to reduce chatter (≈1 ms)
self._deadband = max(1, int(0.001 * fs))
# Ring buffer for mono float32 samples
# Capacity ~2 seconds for headroom
self._rb_cap = max(self._target_samples * 32, int(2 * fs))
self._rb = None # created in _init_rb()
self._ridx = 0
self._size = 0
self._lock = None # created in _init_rb()
self._init_rb()
# Light logging timer
self._last_log = 0.0
# Input-side throughput logging
self._in_samples = 0
self._in_last_log = 0.0
# --- Benchmarking state for _read() ---
self._bench_count = 0
self._bench_total_sum = 0.0
self._bench_proc_sum = 0.0
self._bench_conv_sum = 0.0
self._bench_total_min = float('inf')
self._bench_total_max = 0.0
self._bench_last_log = 0.0
self._bench_loop = 0 # increments every _read call
# Streaming resampler and internal output buffer (lazy init)
self._rs = None # samplerate.Resampler
self._out_buf = None # numpy.ndarray float32
self._in_dtype = None # chosen input dtype string: 'int32' | 'int24' | 'int16'
# Latency tracking
self._capture_timestamp = None # timestamp of last captured samples
self._latency_log_interval = 1.0 # log every N seconds
self._last_latency_log = 0.0
# Frame timestamp queue (circular buffer)
self._frame_timestamps = [] # list of (sample_position, timestamp)
self._total_samples_written = 0 # total samples written to FIFO
# ---------------- Internal helpers -----------------
def _init_rb(self) -> None:
# Lazy import standard libs to keep only io_bumble imported at top level
import threading
from array import array
self._rb = array('f', [0.0] * self._rb_cap) # float32 ring buffer
self._lock = threading.Lock()
self._ridx = 0
self._size = 0
def _fifo_len(self) -> int:
with self._lock:
return self._size
def _fifo_write(self, x_f32) -> None:
# x_f32: 1-D float32-like iterable
k = len(x_f32)
if k <= 0:
return
rb = self._rb
if rb is None:
return
with self._lock:
# Trim if larger than capacity: keep last N
if k >= self._rb_cap:
x_f32 = x_f32[-self._rb_cap:]
k = self._rb_cap
# Make room on overflow (drop oldest)
excess = max(0, self._size + k - self._rb_cap)
if excess:
self._ridx = (self._ridx + excess) % self._rb_cap
self._size -= excess
# Write at tail position
wpos = (self._ridx + self._size) % self._rb_cap
first = min(k, self._rb_cap - wpos)
# Write first chunk
from array import array as _array # lazy import
rb[wpos:wpos + first] = _array('f', x_f32[:first])
# Wrap if needed
second = k - first
if second:
rb[0:second] = _array('f', x_f32[first:])
self._size += k
def _fifo_peek_array(self, n: int):
# Returns a Python list[float] copy of up to n samples
rb = self._rb
if rb is None:
return []
m = max(0, min(n, self._fifo_len()))
if m <= 0:
return []
pos = self._ridx
first = min(m, self._rb_cap - pos)
# Copy out
out = [0.0] * m
# First chunk
out[:first] = rb[pos:pos + first]
# Second chunk if wrap
second = m - first
if second > 0:
out[first:] = rb[0:second]
return out
def _fifo_discard(self, n: int) -> None:
with self._lock:
d = max(0, min(n, self._size))
self._ridx = (self._ridx + d) % self._rb_cap
self._size -= d
def _update_ratio(self) -> None:
# PI loop to hold buffer near target
# Error: positive when buffer too full (need to consume more = lower ratio)
e_samples = self._fifo_len() - self._target_samples
if -self._deadband <= e_samples <= self._deadband:
e_samples = 0.0
fs = float(self._pcm_format_in.sample_rate)
e_time = float(e_samples) / max(1e-9, fs) # seconds
# Integrator on time error with clamping (anti-windup)
cand_integral = self._integral + e_time
# Clamp integral to prevent runaway
if cand_integral > self._integral_limit:
cand_integral = self._integral_limit
elif cand_integral < -self._integral_limit:
cand_integral = -self._integral_limit
# Compute new ratio command (NEG sign: if buffer too full, reduce ratio)
r_cmd = self._R0 * (1.0 - (self._Kp * e_time + self._Ki * cand_integral))
# Absolute clamp to ±20000 ppm vs nominal
ppm_cmd = 1e6 * (r_cmd / self._R0 - 1.0)
if ppm_cmd > 20000.0:
ppm_cmd = 20000.0
elif ppm_cmd < -20000.0:
ppm_cmd = -20000.0
r_cmd = self._R0 * (1.0 + ppm_cmd * 1e-6)
# Slew-rate limit per update
ppm_prev = 1e6 * (self._r / self._R0 - 1.0) # --- Benchmarking state for _read() ---
dppm = ppm_cmd - ppm_prev
if dppm > self._max_step_ppm:
ppm_cmd = ppm_prev + self._max_step_ppm
elif dppm < -self._max_step_ppm:
ppm_cmd = ppm_prev - self._max_step_ppm
# Low-pass smoothing of ratio updates
r_target = self._R0 * (1.0 + ppm_cmd * 1e-6)
self._r = (1.0 - self._alpha_r) * self._r + self._alpha_r * r_target
# Accept integral only when not at hard clamps to reduce windup
if abs(ppm_cmd) < 20000.0:
self._integral = cand_integral
else:
# light decay when clamped
self._integral *= 0.995
# Occasional log
try:
import time as _time
now = _time.time()
if DEBUG_LOG and (now - self._last_log > 1.0):
buf_ms = 1000.0 * self._fifo_len() / float(self._pcm_format_in.sample_rate)
print(
f"\nASRC buf={buf_ms:5.1f} ms r={self._r:.9f} corr={1e6 * (self._r / self._R0 - 1.0):+7.1f} ppm"
)
self._last_log = now
except Exception:
# Logging must never break audio
pass
def _process(self, n_out: int) -> list[float]:
# Accumulate at least n_out samples using samplerate.Resampler
if n_out <= 0:
return []
# Lazy imports
import numpy as np # type: ignore
# Lazy init output buffer
if self._out_buf is None:
self._out_buf = np.zeros(0, dtype=np.float32)
# Choose chunk so we don't take too much from FIFO each time
max_chunk = max(256, int(np.ceil(n_out / max(1e-9, self._r))))
safety_iters = 0
while self._out_buf.size < n_out and safety_iters < 16:
safety_iters += 1
available = self._fifo_len()
if available <= 0:
break
take = min(available, max_chunk)
x = self._fifo_peek_array(take)
self._fifo_discard(take)
if not x:
break
x_arr = np.asarray(x, dtype=np.float32)
if self._rs is not None:
try:
y = self._rs.process(x_arr, ratio=float(self._r), end_of_input=False)
except Exception:
logger.exception("ASRC resampler error")
y = None
else:
y = None
if y is not None and getattr(y, 'size', 0):
y = y.astype(np.float32, copy=False)
if self._out_buf.size == 0:
self._out_buf = y
else:
self._out_buf = np.concatenate((self._out_buf, y))
if self._out_buf.size >= n_out:
out = self._out_buf[:n_out]
self._out_buf = self._out_buf[n_out:]
return out.tolist()
else:
# Not enough data produced; pad with zeros
out = np.zeros(n_out, dtype=np.float32)
# Debug: report zero-padding underflow
try:
if DEBUG_LOG:
import time as _time
if _time.time() - self._last_log > 0.5:
produced = int(self._out_buf.size)
fifo_now = int(self._fifo_len())
corr_ppm = 1e6 * (self._r / self._R0 - 1.0)
print(
f"\nASRC debug: zero-padding underflow produced={produced}/{n_out} "
f"fifo={fifo_now} r={self._r:.9f} corr={corr_ppm:+.1f} ppm"
)
self._last_log = _time.time()
except Exception:
pass
if self._out_buf.size:
out[: self._out_buf.size] = self._out_buf
self._out_buf = np.zeros(0, dtype=np.float32)
return out.tolist()
def _mono_to_stereo_int16_bytes(self, mono_f32: list[float]) -> bytes:
# Convert [-1,1] float list to stereo int16 little-endian bytes
import struct
ba = bytearray()
for v in mono_f32:
# clip
if v > 1.0:
v = 1.0
elif v < -1.0:
v = -1.0
i16 = int(v * 32767.0)
ba += struct.pack('<hh', i16, i16)
return bytes(ba)
# ---------------- ThreadedAudioInput hooks -----------------
def _open(self) -> PcmFormat:
# Set up sounddevice RawInputStream (auto: int32 -> int24 -> int16) and start callback producer
import sounddevice # pylint: disable=import-error
import math
import samplerate as sr # type: ignore
# We capture mono regardless of requested channels, then output stereo.
channels = 1
samplerate = int(self._pcm_format_in.sample_rate)
# Force ALSA backend by selecting an ALSA-hostapi input device
# try:
# # Resolve current device info/name first (fallback: None -> default input)
# cur_dev_info = None
# try:
# cur_dev_info = sounddevice.query_devices(self._device, 'input')
# except Exception:
# cur_dev_info = None
# cur_name = (cur_dev_info or {}).get('name', '') if isinstance(cur_dev_info, dict) else ''
# # Discover ALSA-capable input devices by scanning all devices
# all_devs = list(sounddevice.query_devices())
# alsa_candidates: list[tuple[int, dict, dict]] = []
# for didx, info in enumerate(all_devs):
# if not isinstance(info, dict):
# continue
# if (info.get('max_input_channels') or 0) <= 0:
# continue
# try:
# host = sounddevice.query_hostapis(info.get('hostapi'))
# except Exception:
# continue
# if not isinstance(host, dict):
# continue
# if 'ALSA' in (host.get('name') or ''):
# alsa_candidates.append((didx, info, host))
# if not alsa_candidates:
# raise RuntimeError("ASRC: No ALSA host API/input device available. Ensure PortAudio was built with ALSA and an ALSA input exists.")
# # Prefer same-name device, else pick first ALSA input
# chosen_idx, chosen_info, chosen_host = None, None, None
# if cur_name:
# for didx, info, host in alsa_candidates:
# if (info.get('name') or '') == cur_name:
# chosen_idx, chosen_info, chosen_host = didx, info, host
# break
# if chosen_idx is None:
# chosen_idx, chosen_info, chosen_host = alsa_candidates[0]
# # Switch default hostapi and target device to ALSA
# try:
# sounddevice.default.hostapi = chosen_info.get('hostapi') # index
# except Exception:
# pass
# logger.info(
# "ASRC: forcing ALSA backend: switching device index %s -> %s (name='%s')",
# str(self._device), str(chosen_idx), (cur_name or '')
# )
# self._device = int(chosen_idx)
# # Final verification: ensure the currently selected input device is on ALSA
# try:
# finfo = sounddevice.query_devices(self._device, 'input')
# fhost = sounddevice.query_hostapis(finfo['hostapi']) if isinstance(finfo, dict) else {}
# fname = (fhost.get('name') or '') if isinstance(fhost, dict) else ''
# except Exception:
# finfo, fname = None, ''
# if 'ALSA' not in (fname or ''):
# raise RuntimeError(
# f"ASRC: expected ALSA hostapi after selection, but got '{fname or 'UNKNOWN'}'"
# )
# except Exception as e:
# logger.exception("ASRC: error while attempting to force ALSA backend")
# raise
# Debug: print which backend/host API and device are used by sounddevice (after forcing)
dev_info = sounddevice.query_devices(self._device, 'input')
hostapi_info = sounddevice.query_hostapis(dev_info['hostapi'])
# PortAudio version string (may be empty on some builds)
try:
pa_ver = sounddevice.get_portaudio_version()[1]
except Exception:
pa_ver = ''
if DEBUG_LOG:
logger.info(
"ASRC sounddevice: device='%s' hostapi='%s' samplerate=%d channels=%d portaudio='%s'",
dev_info.get('name', '?'),
hostapi_info.get('name', '?'),
samplerate,
channels,
pa_ver or '?'
)
def _callback(indata, frames, time_info, status): # noqa: ARG001 (signature is fixed)
# indata: raw integer PCM bytes-like buffer of shape (frames, channels)
try:
if status and DEBUG_LOG:
logger.warning("Input status: %s", status)
if frames <= 0:
return
# Convert native integer PCM -> float32 [-1,1]
import numpy as _np # type: ignore
dtype = self._in_dtype or 'int32'
if dtype == 'int32':
try:
x_i32 = _np.frombuffer(indata, dtype=_np.int32)
x_f32 = _np.asarray(x_i32, dtype=_np.float32) * (1.0 / 2147483648.0)
except Exception:
mv = memoryview(indata).cast('i')
x_f32 = _np.empty(frames, dtype=_np.float32)
for i in range(frames):
v = mv[i]
f = float(v) / 2147483648.0
if not (f == f) or math.isinf(f):
f = 0.0
x_f32[i] = f
elif dtype == 'int16':
try:
x_i16 = _np.frombuffer(indata, dtype=_np.int16)
x_f32 = _np.asarray(x_i16, dtype=_np.float32) * (1.0 / 32768.0)
except Exception:
mv = memoryview(indata).cast('h')
x_f32 = _np.empty(frames, dtype=_np.float32)
for i in range(frames):
v = mv[i]
f = float(v) / 32768.0
if not (f == f) or math.isinf(f):
f = 0.0
x_f32[i] = f
else: # 'int24' packed S24LE
try:
b = _np.frombuffer(indata, dtype=_np.uint8)
# Expect length == frames * channels * 3, channels=1
if b.size != frames * 3:
# Fallback: truncate to multiple of 3
n3 = (b.size // 3) * 3
b = b[:n3]
a = b.reshape(-1, 3)
# Little-endian assembly
val = (a[:, 0].astype(_np.int32) |
(a[:, 1].astype(_np.int32) << 8) |
(a[:, 2].astype(_np.int32) << 16))
# Sign-extend from 24-bit
neg = (a[:, 2] & 0x80) != 0
val[neg] -= (1 << 24)
x_f32 = val.astype(_np.float32) * (1.0 / 8388608.0) # 2**23
except Exception:
# Very slow scalar fallback
mv = memoryview(indata)
x_f32 = _np.empty(frames, dtype=_np.float32)
for i in range(frames):
o = i * 3
b0 = mv[o]
b1 = mv[o + 1]
b2 = mv[o + 2]
v = (b0 | (b1 << 8) | (b2 << 16))
if b2 & 0x80:
v -= (1 << 24)
f = float(v) / 8388608.0
if not (f == f) or math.isinf(f):
f = 0.0
x_f32[i] = f
# Debug: detect silent input chunks (all zeros)
try:
if DEBUG_LOG:
import time as _time
if _np.max(_np.abs(x_f32)) == 0.0 and (_time.time() - self._last_log > 0.5):
print("\nASRC debug: input chunk is silent (all zeros)")
self._last_log = _time.time()
except Exception:
pass
# Write to FIFO
self._fifo_write(x_f32)
# Track capture timestamp for latency measurement
if LATENCY_DEBUG:
import time as _time
capture_ts = _time.time()
self._capture_timestamp = capture_ts
# Record timestamp for these samples
self._total_samples_written += frames
self._frame_timestamps.append((self._total_samples_written, capture_ts))
# Keep only last 100 timestamps
if len(self._frame_timestamps) > 100:
self._frame_timestamps.pop(0)
# Input throughput logging
try:
if DEBUG_LOG:
import time as _time
self._in_samples += int(frames)
now = _time.time()
if now - self._in_last_log >= 1.0:
sps = self._in_samples / max(1e-9, (now - self._in_last_log))
fifo_now = int(self._fifo_len())
corr_ppm = 1e6 * (self._r / self._R0 - 1.0)
print(
f"\nASRC debug: input_sps={sps:.1f} fifo={fifo_now} r={self._r:.9f} corr={corr_ppm:+.1f} ppm"
)
self._in_samples = 0
self._in_last_log = now
except Exception:
pass
except Exception: # never let callback raise
logger.exception("Audio input callback error")
# Create streaming resampler (mono)
try:
self._rs = sr.Resampler(converter_type="sinc_medium", channels=1)
except Exception:
logger.exception("Failed to create samplerate.Resampler; audio may be silent")
self._rs = None
# Try opening stream with preferred native formats
last_err = None
for cand in ('int32', 'int24', 'int16'):
try:
st = sounddevice.RawInputStream(
samplerate=samplerate,
device=self._device,
channels=channels,
dtype=cand,
callback=_callback,
blocksize=int(2e-3 * samplerate),
latency='low',
)
self._stream = st
self._in_dtype = cand
if DEBUG_LOG:
logger.info("ASRC selected input dtype: %s", cand)
break
except Exception as e: # keep trying next
last_err = e
continue
if self._stream is None:
logger.exception("ASRC failed to open RawInputStream for all dtypes (int32,int24,int16)")
if last_err:
raise last_err
raise RuntimeError("ASRC: could not open input stream")
# Don't start stream yet - wait for first read to avoid buffer buildup
return self._pcm_format_out
def _read(self, frame_size: int) -> bytes:
# Produce 'frame_size' output frames (stereo INT16)
if frame_size <= 0:
return b''
# loop counter
self._bench_loop += 1
# Benchmark start
try:
from time import perf_counter as _pc
except Exception:
_pc = None # type: ignore
_t0 = _pc() if _pc else None
# Track which samples we're reading for latency measurement
read_start_time = None
if LATENCY_DEBUG:
import time as _time
read_start_time = _time.time()
# Start stream on first read to avoid buffer buildup during initialization
if self._stream and not self._stream_started:
self._stream.start()
self._stream_started = True
# Don't block - let callback start filling buffer asynchronously
_t1 = _pc() if _pc else None
# Update resampling ratio based on FIFO level
try:
self._update_ratio()
except Exception:
# keep going even if update failed
pass
_t2 = _pc() if _pc else None
# Process mono float32
mono = self._process(frame_size)
_t3 = _pc() if _pc else None
# Convert to stereo int16 LE bytes
out = self._mono_to_stereo_int16_bytes(mono)
_t4 = _pc() if _pc else None
# Latency tracking: estimate frame age from timestamp queue
frame_age_ms = 0.0
if LATENCY_DEBUG and read_start_time and self._frame_timestamps:
import time as _time
now = _time.time()
# Find oldest timestamp still in FIFO (conservative estimate)
fifo_samples = self._fifo_len()
if fifo_samples > 0 and self._frame_timestamps:
# Estimate: we're reading from tail of FIFO, oldest data is ~fifo_samples ago
# Find timestamp of samples that are now at the read position
read_position = self._total_samples_written - fifo_samples
# Find closest timestamp
frame_ts = None
for pos, ts in self._frame_timestamps:
if pos >= read_position:
frame_ts = ts
break
if frame_ts:
frame_age_ms = 1000.0 * (now - frame_ts)
fifo_ms = 1000.0 * fifo_samples / float(self._pcm_format_in.sample_rate)
if now - self._last_latency_log >= self._latency_log_interval:
logger.info(
"LATENCY: ASRC frame_age=%.1f ms | FIFO: %d samples (%.1f ms) | corr: %+.1f ppm",
frame_age_ms,
fifo_samples,
fifo_ms,
1e6 * (self._r / self._R0 - 1.0)
)
self._last_latency_log = now
# Aggregate benchmarking stats
try:
if _pc and _t0 is not None and _t4 is not None:
total = _t4 - _t0
proc = (_t3 - _t2) if (_t3 is not None and _t2 is not None) else 0.0
conv = (_t4 - _t3) if (_t4 is not None and _t3 is not None) else 0.0
# Compute 'other' segment (everything outside proc+conv)
other = max(0.0, total - (proc + conv))
# Immediate warning if proc+conv or total exceeds 10 ms
if DEBUG_LOG and ((proc + conv) > 0.010 or total > 0.010):
logger.warning(
"ASRC _read SLOW: loop=%d frame=%d total=%.3f ms | proc=%.3f ms conv=%.3f ms other=%.3f ms",
self._bench_loop,
frame_size,
1000.0 * total,
1000.0 * proc,
1000.0 * conv,
1000.0 * other,
)
self._bench_count += 1
self._bench_total_sum += total
self._bench_proc_sum += proc
self._bench_conv_sum += conv
if total < self._bench_total_min:
self._bench_total_min = total
if total > self._bench_total_max:
self._bench_total_max = total
# Periodic log (~2s)
import time as _time
now = _time.time()
if DEBUG_LOG and (now - self._bench_last_log >= 2.0) and self._bench_count > 0:
avg_total_ms = 1000.0 * (self._bench_total_sum / self._bench_count)
avg_proc_ms = 1000.0 * (self._bench_proc_sum / self._bench_count)
avg_conv_ms = 1000.0 * (self._bench_conv_sum / self._bench_count)
min_ms = 1000.0 * (self._bench_total_min if self._bench_total_min != float('inf') else 0.0)
max_ms = 1000.0 * self._bench_total_max
# Per-frame timing (µs) for convenience
per_frame_us = (avg_total_ms * 1000.0) / max(1, frame_size)
logger.info(
"ASRC _read bench: calls=%d frame=%d avg=%.3f ms min=%.3f ms max=%.3f ms | proc=%.3f ms conv=%.3f ms | per_frame=%.2f µs",
self._bench_count,
frame_size,
avg_total_ms,
min_ms,
max_ms,
avg_proc_ms,
avg_conv_ms,
per_frame_us,
)
# Reset period accumulators but keep extrema across run
self._bench_count = 0
self._bench_total_sum = 0.0
self._bench_proc_sum = 0.0
self._bench_conv_sum = 0.0
self._bench_last_log = now
except Exception:
# Never let benchmarking break audio path
pass
# Attach frame timestamp to output (as a pseudo-header for tracking)
# We'll store it in a class variable that multicast.py can read
if LATENCY_DEBUG and frame_age_ms > 0:
self._last_frame_age_ms = frame_age_ms
return out
def _close(self) -> None:
try:
if self._stream is not None:
self._stream.stop()
self._stream.close()
except Exception:
logger.exception('Error closing input stream')
finally:
self._stream = None
self._stream_started = False
+185
View File
@@ -0,0 +1,185 @@
#!/usr/bin/env bash
# print_pw_latency.sh — latency info for nodes that currently appear in PipeWire Links
# Deps: pw-cli, pw-dump, pw-link, pw-metadata, jq, awk, sed, grep, sort, uniq
set -euo pipefail
FILTER_NAME=""
FILTER_ID=""
usage() {
cat <<EOF
Usage: $0 [-n name_regex] [-i node_id]
Shows latency-related info ONLY for nodes that participate in current links.
EOF
}
while getopts ":n:i:h" opt; do
case "$opt" in
n) FILTER_NAME="$OPTARG" ;;
i) FILTER_ID="$OPTARG" ;;
h) usage; exit 0 ;;
\?) echo "Invalid option: -$OPTARG" >&2; usage; exit 1 ;;
esac
done
need() { command -v "$1" >/dev/null 2>&1 || { echo "Missing dependency: $1" >&2; exit 1; }; }
for bin in pw-cli pw-dump pw-link pw-metadata jq awk sed grep sort uniq; do need "$bin"; done
div() { printf '%*s\n' "${COLUMNS:-80}" '' | tr ' ' -; }
echo "PipeWire latency inspector (linked nodes) — $(date)"
div
echo "GLOBAL CLOCK SETTINGS (pw-metadata -n settings):"
pw-metadata -n settings 2>/dev/null || echo "(no settings metadata found)"
echo
# Parse 'update: id:0 key:'clock.rate' value:'48000' type:'''
pw-metadata -n settings 2>/dev/null \
| awk '
match($0, /key:\x27([^'\''"]+)\x27[[:space:]]+value:\x27([^'\''"]*)\x27/, m) {
printf " %s: %s\n", m[1], m[2]
}
' || true
div
echo "CURRENT LINKS (pw-link -l):"
pw-link -l 2>/dev/null || echo "(no links or cannot list links)"
div
# ---------- Collect node IDs from Links ----------
# Primary: use pw-dump Link objects (no state filter; just "any existing link")
LINK_NODE_IDS="$(
pw-dump 2>/dev/null \
| jq -r '
.[] | select(.type=="PipeWire:Interface:Link") | select(.info!=null)
| [.info.output_node, .info.input_node] | @tsv
' \
| awk '{print $1"\n"$2}' \
| sed '/^$/d' | sort -n | uniq || true
)"
# Fallback: parse names from pw-link -l and map names -> IDs via pw-dump
if [[ -z "$LINK_NODE_IDS" ]]; then
# Extract all "node:port" tokens and cut at the colon → node names
LINK_NODE_NAMES="$(
pw-link -l 2>/dev/null \
| sed -n 's/^[[:space:]]*|[-<>][[:space:]]*//; s/[[:space:]]\+$//; p' \
| awk -F':' '/:/{print $1}' \
| sed '/^$/d' | sort | uniq || true
)"
if [[ -n "$LINK_NODE_NAMES" ]]; then
# Build name->id map from pw-dump Nodes
# Prefer node.name; fallback to node.description
while read -r nm; do
[[ -z "$nm" ]] && continue
id="$(pw-dump | jq -r --arg n "$nm" '
.[] | select(.type=="PipeWire:Interface:Node")
| select((.info.props["node.name"] // .info.props["node.description"] // "") == $n)
| .id
' | head -n1)"
[[ -n "$id" ]] && echo "$id"
done <<< "$LINK_NODE_NAMES" \
| sort -n | uniq > /tmp/_pw_link_node_ids.$$
LINK_NODE_IDS="$(cat /tmp/_pw_link_node_ids.$$ || true)"
rm -f /tmp/_pw_link_node_ids.$$ || true
fi
fi
if [[ -z "$LINK_NODE_IDS" ]]; then
echo "No linked nodes found."
exit 0
fi
# Pull all nodes and keep only those in LINK_NODE_IDS
IDS_JSON="[$(echo "$LINK_NODE_IDS" | awk 'NR>1{printf ","}{printf "%s",$0} END{print ""}')]"
NODES_JSON="$(pw-dump | jq -c '.[] | select(.type=="PipeWire:Interface:Node")')"
NODES_JSON="$(echo "$NODES_JSON" | jq -c --argjson ids "$IDS_JSON" 'select([.id] | inside($ids))')"
# Optional filters
[[ -n "$FILTER_ID" ]] && NODES_JSON="$(echo "$NODES_JSON" | jq -c "select(.id==$FILTER_ID)")"
[[ -n "$FILTER_NAME" ]] && NODES_JSON="$(echo "$NODES_JSON" | jq -c "select((.info.props[\"node.name\"] // .info.props[\"node.description\"] // \"\") | test(\"$FILTER_NAME\"))")"
if [[ -z "$NODES_JSON" ]]; then
echo "No matching linked nodes."
exit 0
fi
echo "LINKED NODE LATENCY DETAILS:"
while IFS= read -r node; do
id=$(echo "$node" | jq -r '.id')
name=$(echo "$node" | jq -r '.info.props["node.name"] // .info.props["node.description"] // "unknown"')
appname=$(echo "$node" | jq -r '.info.props["application.name"] // ""')
media=$(echo "$node" | jq -r '.info.props["media.class"] // ""')
state=$(echo "$node" | jq -r '.info.state // ""')
format=$(echo "$node" | jq -r '.info.params?.Format?[0]?.audio?.format // empty')
rate=$(echo "$node" | jq -r '.info.params?.Format?[0]?.audio?.rate // empty')
channels=$(echo "$node" | jq -r '.info.params?.Format?[0]?.audio?.channels // empty')
echo
echo "Node #$id ${name} ${appname:+(app: $appname)}"
echo " state: $state | media.class: $media"
if [[ -n "$rate" || -n "$channels" || -n "$format" ]]; then
echo " format: ${format:-?} rate: ${rate:-?} channels: ${channels:-?}"
if [[ -n "$rate" ]]; then
q=$(pw-cli enum-params "$id" ProcessLatency 2>/dev/null | awk '/quantum/ {print $NF; exit}')
[[ -n "${q:-}" ]] && awk -v q="$q" -v r="$rate" 'BEGIN{printf " ~sched per-cycle: %d/%d = %.3f ms\n", q, r, (q*1000.0/r)}'
fi
fi
LATENCY_RAW="$(pw-cli enum-params "$id" Latency 2>/dev/null || true)"
if [[ -n "$LATENCY_RAW" ]]; then
echo " node.latency param (per-direction):"
echo "$LATENCY_RAW" | awk '
BEGIN{RS="";FS="\n"}
{
dir="";
for(i=1;i<=NF;i++){
if($i ~ /direction/){
if($i ~ /input/) dir="capture";
else if($i ~ /output/) dir="playback";
else dir="unknown";
}
if($i ~ /min-quantum/ || $i ~ /max-quantum/ || $i ~ /rate/ || $i ~ /ns/){
gsub(/^[ \t]+/,"",$i); gsub(/Prop: key /,"",$i);
printf " [%s] %s\n", dir, $i;
}
}
}'
else
echo " node.latency: (no Latency param reported)"
fi
PROC_RAW="$(pw-cli enum-params "$id" ProcessLatency 2>/dev/null || true)"
if [[ -n "$PROC_RAW" ]]; then
echo " ProcessLatency:"
echo "$PROC_RAW" | awk '/quantum|rate|ns/ { gsub(/^[ \t]+/,""); sub(/Prop: key /,""); print " " $0 }'
else
echo " ProcessLatency: (not reported)"
fi
INFO_RAW="$(pw-cli info "$id" 2>/dev/null || true)"
if echo "$INFO_RAW" | grep -q "api.alsa"; then
echo " ALSA (periods/headroom):"
echo "$INFO_RAW" | awk -F': ' '
/api\.alsa\.period-size/ {print " api.alsa.period-size: " $2}
/api\.alsa\.periods/ {print " api.alsa.periods: " $2}
/api\.alsa\.headroom/ {print " api.alsa.headroom: " $2}
/api\.alsa\.disable-batch/ {print " api.alsa.disable-batch: " $2}
/api\.alsa\.use-chmap/ {print " api.alsa.use-chmap: " $2}
' | sed 's/ (.*)//'
fi
echo "$INFO_RAW" | awk -F': ' '
/node\.force-rate/ {print " Hint: node.force-rate: " $2}
/node\.force-quantum/ {print " Hint: node.force-quantum: " $2}
/node\.latency/ {print " Hint: node.latency (prop): " $2}
/resample\.quality/ {print " Hint: resample.quality: " $2}
' | sed 's/ (.*)//' || true
done <<< "$NODES_JSON"
div
cat <<'TIP'
Notes:
- We include any node that appears in a Link (from pw-dump); this matches what pw-link -l shows.
- ~sched per-cycle = quantum/rate from ProcessLatency. Device/codec/RF/jitter buffers sit on top.
TIP
+121
View File
@@ -0,0 +1,121 @@
#!/usr/bin/env python3
"""
Test USB audio device latency by measuring time from generating a click
to receiving it back through the microphone (requires physical loopback).
If no physical loopback is available, this measures the baseline capture latency
by timestamping at different points in the audio pipeline.
"""
import sounddevice as sd
import numpy as np
import time
import sys
print("Testing USB Audio Device Latency")
print("=" * 60)
# List devices
print("\nAvailable devices:")
devices = sd.query_devices()
for i, dev in enumerate(devices):
print(f" {i}: {dev['name']}")
if dev['max_input_channels'] > 0:
print(f" Input: {dev['max_input_channels']} ch, latency: {dev['default_low_input_latency']*1000:.1f}ms (low) / {dev['default_high_input_latency']*1000:.1f}ms (high)")
if dev['max_output_channels'] > 0:
print(f" Output: {dev['max_output_channels']} ch, latency: {dev['default_low_output_latency']*1000:.1f}ms (low) / {dev['default_high_output_latency']*1000:.1f}ms (high)")
# Find USB Audio Device
usb_device_idx = None
for i, dev in enumerate(devices):
if 'USB Audio' in dev['name'] and dev['max_input_channels'] > 0:
usb_device_idx = i
break
if usb_device_idx is None:
print("\nError: USB Audio Device not found!")
sys.exit(1)
device_info = devices[usb_device_idx]
print(f"\nTesting device #{usb_device_idx}: {device_info['name']}")
print(f" Default low latency: {device_info['default_low_input_latency']*1000:.1f}ms")
print(f" Default high latency: {device_info['default_high_input_latency']*1000:.1f}ms")
print(f" Default sample rate: {device_info['default_samplerate']} Hz")
# Test capture latency by measuring callback timing
print("\n" + "="*60)
print("Test: Measuring callback timing (10 second capture)")
print("="*60)
capture_times = []
callback_count = 0
def callback(indata, frames, time_info, status):
global callback_count
if status:
print(f"Status: {status}")
# Record timing info
capture_times.append({
'callback_num': callback_count,
'frames': frames,
'input_buffer_adc_time': time_info.inputBufferAdcTime,
'current_time': time_info.currentTime,
'callback_latency': (time_info.currentTime - time_info.inputBufferAdcTime) * 1000, # ms
})
callback_count += 1
samplerate = 48000
blocksize = int(0.002 * samplerate) # 2ms blocks (matches ASRC)
print(f"\nConfig: {samplerate}Hz, blocksize={blocksize} ({blocksize/samplerate*1000:.1f}ms)")
print("\nStarting capture...")
try:
with sd.InputStream(
device=usb_device_idx,
channels=1,
samplerate=samplerate,
blocksize=blocksize,
latency='low',
callback=callback
):
time.sleep(10)
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
print(f"\nCaptured {len(capture_times)} callbacks")
if capture_times:
latencies = [t['callback_latency'] for t in capture_times]
print(f"\nCallback Latency Statistics:")
print(f" Mean: {np.mean(latencies):.2f} ms")
print(f" Median: {np.median(latencies):.2f} ms")
print(f" Min: {np.min(latencies):.2f} ms")
print(f" Max: {np.max(latencies):.2f} ms")
print(f" StdDev: {np.std(latencies):.2f} ms")
print(f"\nFirst 5 callbacks:")
for t in capture_times[:5]:
print(f" #{t['callback_num']}: {t['frames']} frames, latency={t['callback_latency']:.2f}ms")
print(f"\nLast 5 callbacks:")
for t in capture_times[-5:]:
print(f" #{t['callback_num']}: {t['frames']} frames, latency={t['callback_latency']:.2f}ms")
print("\n" + "="*60)
print("INTERPRETATION:")
print("="*60)
print("The 'callback_latency' shows the time between when audio was")
print("captured by the USB device ADC (inputBufferAdcTime) and when")
print("the callback is invoked (currentTime).")
print("")
print("This includes:")
print(" - USB device internal buffering")
print(" - USB bus latency")
print(" - PipeWire/ALSA buffering")
print(" - Driver/kernel scheduling latency")
print("")
print("If this latency is ~50-70ms, it explains your missing gap!")
print("="*60)