cleaned up - implementing adaptive resampling - work in progress
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -47,3 +47,4 @@ src/auracast/server/credentials.json
|
||||
pcm1862-i2s.dtbo
|
||||
ch1.wav
|
||||
ch2.wav
|
||||
src/auracast/available_samples.txt
|
||||
|
||||
@@ -234,6 +234,15 @@ i2cset -f -y 1 0x4a 0x07 0x10 # Right = VIN2P/M [DIFF]
|
||||
arecord -f cd -c 1 -D record_left left.wav -r48000
|
||||
arecord -f cd -c 1 -D record_right right.wav -r48000
|
||||
|
||||
# Run with realtime priority
|
||||
- for the feedback loop to work right realtime priority is absolutely nececcarry.
|
||||
chrt -f 99 python src/auracast/multicast.py
|
||||
- give the user realtime priority:
|
||||
sudo tee /etc/security/limits.d/99-realtime.conf >/dev/null <<'EOF'
|
||||
caster - rtprio 99
|
||||
caster - memlock unlimited
|
||||
EOF
|
||||
|
||||
# Known issues:
|
||||
- When running on a laptop there might be issues switching between usb and browser audio input since they use the same audio device
|
||||
|
||||
|
||||
@@ -27,6 +27,10 @@ from typing import cast, Any, AsyncGenerator, Coroutine, List
|
||||
import itertools
|
||||
import glob
|
||||
import time
|
||||
import threading
|
||||
|
||||
import numpy as np # for audio down-mix
|
||||
import os
|
||||
|
||||
import lc3 # type: ignore # pylint: disable=E0401
|
||||
|
||||
@@ -42,7 +46,6 @@ from bumble.profiles import bass
|
||||
import bumble.device
|
||||
import bumble.transport
|
||||
import bumble.utils
|
||||
import numpy as np # for audio down-mix
|
||||
from bumble.device import Host, AdvertisingChannelMap
|
||||
from bumble.audio import io as audio_io
|
||||
|
||||
@@ -54,61 +57,117 @@ from auracast.utils.webrtc_audio_input import WebRTCAudioInput
|
||||
|
||||
# Patch sounddevice.InputStream globally to use low-latency settings
|
||||
import sounddevice as sd
|
||||
from collections import deque
|
||||
import statistics
|
||||
|
||||
|
||||
def popleft_n(d: deque, n: int) -> bytes:
|
||||
# pops up to n items (handles short deques)
|
||||
# if the deque was too short, fill with 0s and print a warning
|
||||
it = (d.popleft() for _ in range(min(n, len(d))))
|
||||
if len(d) < n:
|
||||
logging.warning("SoundDeviceAudioInput: deque was too short, requested %d, got %d", n, len(d))
|
||||
return b"".join(it)
|
||||
|
||||
class ModSoundDeviceAudioInput(audio_io.SoundDeviceAudioInput):
|
||||
"""Patched SoundDeviceAudioInput that creates RawInputStream with low-latency parameters."""
|
||||
"""Patched SoundDeviceAudioInput with low-latency capture and adaptive resampling."""
|
||||
|
||||
def _open(self):
|
||||
"""Patched _open method that creates RawInputStream with low-latency parameters."""
|
||||
try:
|
||||
dev_info = sd.query_devices(self._device)
|
||||
hostapis = sd.query_hostapis()
|
||||
api_index = dev_info.get('hostapi')
|
||||
api_name = hostapis[api_index]['name'] if isinstance(api_index, int) and 0 <= api_index < len(hostapis) else 'unknown'
|
||||
pa_ver = sd.get_portaudio_version()
|
||||
|
||||
logging.info(
|
||||
"SoundDevice backend=%s device='%s' (id=%s) ch=%s default_low_input_latency=%.4f default_high_input_latency=%.4f portaudio=%s",
|
||||
api_name,
|
||||
dev_info.get('name'),
|
||||
self._device,
|
||||
dev_info.get('max_input_channels'),
|
||||
float(dev_info.get('default_low_input_latency') or 0.0),
|
||||
float(dev_info.get('default_high_input_latency') or 0.0),
|
||||
pa_ver[1] if isinstance(pa_ver, tuple) and len(pa_ver) >= 2 else pa_ver,
|
||||
)
|
||||
except Exception as e:
|
||||
logging.warning("Failed to query sounddevice backend/device info: %s", e)
|
||||
"""Create RawInputStream with low-latency parameters and initialize ring buffer."""
|
||||
dev_info = sd.query_devices(self._device)
|
||||
hostapis = sd.query_hostapis()
|
||||
api_index = dev_info.get('hostapi')
|
||||
api_name = hostapis[api_index]['name'] if isinstance(api_index, int) and 0 <= api_index < len(hostapis) else 'unknown'
|
||||
pa_ver = sd.get_portaudio_version()
|
||||
|
||||
logging.info(
|
||||
"SoundDevice backend=%s device='%s' (id=%s) ch=%s default_low_input_latency=%.4f default_high_input_latency=%.4f portaudio=%s",
|
||||
api_name,
|
||||
dev_info.get('name'),
|
||||
self._device,
|
||||
dev_info.get('max_input_channels'),
|
||||
float(dev_info.get('default_low_input_latency') or 0.0),
|
||||
float(dev_info.get('default_high_input_latency') or 0.0),
|
||||
pa_ver[1] if isinstance(pa_ver, tuple) and len(pa_ver) >= 2 else pa_ver,
|
||||
)
|
||||
# Create RawInputStream with injected low-latency parameters
|
||||
# Match dsnoop period (5 ms @ 48 kHz -> 240 frames). For other rates, keep 5 ms.
|
||||
# Target ~2 ms blocksize (48 kHz -> 96 frames). For other rates, keep ~2 ms.
|
||||
_sr = int(self._pcm_format.sample_rate)
|
||||
_block = max(24, _sr // 200)
|
||||
_extra = None
|
||||
try:
|
||||
_extra = sd.AlsaSettings(period_size=_block, periods=2)
|
||||
except Exception:
|
||||
_extra = None
|
||||
|
||||
self.log_counter0=0
|
||||
self._runavg_samps = deque(maxlen=30)
|
||||
self._runavg = 0
|
||||
self.max_avail=0
|
||||
self.logfile_name="available_samples.txt"
|
||||
self.blocksize = 480
|
||||
self._frames_offset = 0
|
||||
|
||||
if os.path.exists(self.logfile_name):
|
||||
os.remove(self.logfile_name)
|
||||
|
||||
self._stream = sd.RawInputStream(
|
||||
samplerate=self._pcm_format.sample_rate,
|
||||
device=self._device,
|
||||
channels=self._pcm_format.channels,
|
||||
dtype='int16',
|
||||
blocksize=_block,
|
||||
blocksize=self.blocksize,
|
||||
latency=0.005,
|
||||
extra_settings=_extra,
|
||||
)
|
||||
self._stream.start()
|
||||
|
||||
logging.info(f"SoundDeviceAudioInput: Opened with blocksize={_block}, latency=0.010 (~10ms target)")
|
||||
|
||||
|
||||
return audio_io.PcmFormat(
|
||||
audio_io.PcmFormat.Endianness.LITTLE,
|
||||
audio_io.PcmFormat.SampleType.INT16,
|
||||
self._pcm_format.sample_rate,
|
||||
2,
|
||||
1,
|
||||
)
|
||||
|
||||
def _read(self, frame_size: int) -> bytes:
|
||||
"""Generate frames while minimally adjusting frames_offset to keep _runavg ~ blocksize/2."""
|
||||
# Persist frames_offset across calls; clamp within 1% of blocksize
|
||||
|
||||
max_offset = 30
|
||||
target = 0.5*self.blocksize
|
||||
deadband = 2
|
||||
|
||||
pcm_buffer, overflowed = self._stream.read(frame_size + self._frames_offset)
|
||||
if overflowed:
|
||||
logging.warning("SoundDeviceAudioInput: overflowed")
|
||||
|
||||
available = self._stream.read_available
|
||||
if available > 0:
|
||||
self.max_avail = max(self.max_avail, available)
|
||||
|
||||
self._runavg_samps.append(available)
|
||||
self._runavg = max(self._runavg_samps) # statistics.median(self._runavg_samps)
|
||||
|
||||
# Minimal feedback: nudge frames_offset by 1 to target blocksize/2 with small deadband
|
||||
|
||||
increment = self._runavg > (target + deadband) or overflowed
|
||||
decrement = self._runavg < (target - deadband)
|
||||
clipped_upper = self._frames_offset >= max_offset
|
||||
clipped_lower = self._frames_offset <= -1 * max_offset
|
||||
|
||||
self._frames_offset = 0
|
||||
if increment and not clipped_upper:
|
||||
self._frames_offset += 1
|
||||
elif decrement and not clipped_lower:
|
||||
self._frames_offset -= 1
|
||||
|
||||
#Diagnostics
|
||||
with open(self.logfile_name, "a", encoding="utf-8") as f:
|
||||
f.write(f"{available}, {round(self._runavg, 2)}, {self._frames_offset}, {overflowed}\n")
|
||||
|
||||
if self.log_counter0 % 500 == 0:
|
||||
logging.info(
|
||||
"read available=%d, max=%d, runavg=%.3f, frames_offset=%d (max %d)",
|
||||
available, self.max_avail, round(self._runavg, 2), self._frames_offset, max_offset
|
||||
)
|
||||
self.max_avail = 0
|
||||
|
||||
self.log_counter0 += 1
|
||||
return bytes(pcm_buffer)
|
||||
|
||||
audio_io.SoundDeviceAudioInput = ModSoundDeviceAudioInput
|
||||
|
||||
# modified from bumble
|
||||
@@ -596,58 +655,7 @@ class Streamer():
|
||||
if hasattr(audio_input, "rewind"):
|
||||
audio_input.rewind = big_config[i].loop
|
||||
|
||||
# Retry logic – ALSA sometimes keeps the device busy for a short time after the
|
||||
# previous stream has closed. Handle PortAudioError -9985 with back-off retries.
|
||||
import sounddevice as _sd
|
||||
max_attempts = 3
|
||||
for attempt in range(1, max_attempts + 1):
|
||||
try:
|
||||
pcm_format = await audio_input.open()
|
||||
break # success
|
||||
except _sd.PortAudioError as err:
|
||||
# -9985 == paDeviceUnavailable
|
||||
logging.error('Could not open audio device %s with error %s', audio_source, err)
|
||||
code = None
|
||||
if hasattr(err, 'errno'):
|
||||
code = err.errno
|
||||
elif len(err.args) > 1 and isinstance(err.args[1], int):
|
||||
code = err.args[1]
|
||||
if code == -9985 and attempt < max_attempts:
|
||||
backoff_ms = 200 * attempt
|
||||
logging.warning("PortAudio device busy (attempt %d/%d). Retrying in %.1f ms…", attempt, max_attempts, backoff_ms)
|
||||
# ensure device handle and PortAudio context are closed before retrying
|
||||
try:
|
||||
if hasattr(audio_input, "aclose"):
|
||||
await audio_input.aclose()
|
||||
elif hasattr(audio_input, "close"):
|
||||
audio_input.close()
|
||||
except Exception:
|
||||
pass
|
||||
# Fully terminate PortAudio to drop lingering handles (sounddevice quirk)
|
||||
if hasattr(_sd, "_terminate"):
|
||||
try:
|
||||
_sd._terminate()
|
||||
except Exception:
|
||||
pass
|
||||
# Small pause then re-initialize PortAudio
|
||||
await asyncio.sleep(0.1)
|
||||
if hasattr(_sd, "_initialize"):
|
||||
try:
|
||||
_sd._initialize()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Back-off before next attempt
|
||||
await asyncio.sleep(backoff_ms / 1000)
|
||||
# Recreate audio_input fresh for next attempt
|
||||
audio_input = await audio_io.create_audio_input(audio_source, input_format)
|
||||
continue
|
||||
# Other errors or final attempt – re-raise so caller can abort gracefully
|
||||
raise
|
||||
else:
|
||||
# Loop exhausted without break
|
||||
logging.error("Unable to open audio device after %d attempts – giving up", max_attempts)
|
||||
return
|
||||
pcm_format = await audio_input.open()
|
||||
|
||||
if pcm_format.channels != 1:
|
||||
logging.info("Input device provides %d channels – will down-mix to mono for LC3", pcm_format.channels)
|
||||
@@ -679,251 +687,96 @@ class Streamer():
|
||||
bigs = self.bigs
|
||||
self.is_streaming = True
|
||||
|
||||
# frame drop algo parameters
|
||||
# In demo/precoded modes there may be no audio_input or no _pcm_format yet
|
||||
ai = big.get('audio_input')
|
||||
if ai is not None and hasattr(ai, '_pcm_format') and getattr(ai, '_pcm_format') is not None:
|
||||
sample_rate = ai._pcm_format.sample_rate
|
||||
else:
|
||||
sample_rate = global_config.auracast_sampling_rate_hz
|
||||
samples_discarded_total = 0 # Total samples discarded
|
||||
discard_events = 0 # Number of times we discarded samples
|
||||
frames_since_last_discard = 999 # Guard: frames since last discard (start high to allow first drop)
|
||||
enable_drift_compensation = getattr(global_config, 'enable_adaptive_frame_dropping', False)
|
||||
# Hardcoded parameters (unit: milliseconds)
|
||||
drift_threshold_ms = 2.0 if enable_drift_compensation else 0.0
|
||||
static_drop_ms = 2 if enable_drift_compensation else 0.0
|
||||
# Guard interval measured in LC3 frames (10 ms each)
|
||||
discard_guard_frames = 10 if enable_drift_compensation else 0
|
||||
# Derived sample counts
|
||||
drop_threshold_samples = int(sample_rate * drift_threshold_ms / 1000.0)
|
||||
static_drop_samples = int(sample_rate * static_drop_ms / 1000.0)
|
||||
|
||||
if enable_drift_compensation:
|
||||
logging.info(f"Clock drift compensation ENABLED: threshold={drift_threshold_ms}ms, guard={discard_guard_frames} frames")
|
||||
else:
|
||||
logging.info("Clock drift compensation DISABLED")
|
||||
|
||||
# Periodic monitoring
|
||||
last_stats_log = time.perf_counter()
|
||||
stats_interval = 5.0 # Log stats every 5 seconds
|
||||
frame_count = 0
|
||||
|
||||
|
||||
# One streamer fits all
|
||||
while self.is_streaming:
|
||||
stream_finished = [False for _ in range(len(bigs))]
|
||||
for i, big in enumerate(bigs.values()):
|
||||
if big['precoded']:# everything was already lc3 coded beforehand
|
||||
if big['precoded']: # everything was already lc3 coded beforehand
|
||||
lc3_frame = bytes(
|
||||
itertools.islice(big['lc3_frames'], big['lc3_bytes_per_frame'])
|
||||
)
|
||||
)
|
||||
|
||||
if lc3_frame == b'': # Not all streams may stop at the same time
|
||||
if lc3_frame == b'': # Not all streams may stop at the same time
|
||||
stream_finished[i] = True
|
||||
continue
|
||||
else: # code lc3 on the fly
|
||||
# Use stored frames generator when available so we can aclose() it on stop
|
||||
else: # code lc3 on the fly with perf counters
|
||||
# Ensure frames generator exists (so we can aclose() on stop)
|
||||
frames_gen = big.get('frames_gen')
|
||||
if frames_gen is None:
|
||||
frames_gen = big['audio_input'].frames(big['lc3_frame_samples'])
|
||||
big['frames_gen'] = frames_gen
|
||||
|
||||
# Read the frame we need for encoding
|
||||
pcm_frame = await anext(frames_gen, None)
|
||||
|
||||
if pcm_frame is None: # Not all streams may stop at the same time
|
||||
big['frames_gen'] = frames_gen
|
||||
|
||||
# Initialize perf tracking bucket per BIG
|
||||
perf = big.setdefault('_perf', {
|
||||
'n': 0,
|
||||
'samples_sum': 0.0, 'samples_max': 0.0,
|
||||
'enc_sum': 0.0, 'enc_max': 0.0,
|
||||
'write_sum': 0.0, 'write_max': 0.0,
|
||||
'loop_sum': 0.0, 'loop_max': 0.0,
|
||||
})
|
||||
|
||||
# Total loop duration timer (sample + encode + write)
|
||||
t_loop0 = time.perf_counter()
|
||||
|
||||
# Measure time to get a sample from the buffer
|
||||
t0 = time.perf_counter()
|
||||
pcm_frame = await anext(frames_gen, None)
|
||||
dt_sample = time.perf_counter() - t0
|
||||
|
||||
if pcm_frame is None: # Not all streams may stop at the same time
|
||||
stream_finished[i] = True
|
||||
continue
|
||||
|
||||
# Discard excess samples in buffer if above threshold (clock drift compensation)
|
||||
if enable_drift_compensation and hasattr(big['audio_input'], '_stream') and big['audio_input']._stream:
|
||||
sd_buffer_samples = big['audio_input']._stream.read_available
|
||||
|
||||
# Guard: only allow discard if enough frames have passed since last discard
|
||||
if sd_buffer_samples > drop_threshold_samples and frames_since_last_discard >= discard_guard_frames:
|
||||
# Always drop a static amount (3ms) for predictable behavior
|
||||
# This matches the crossfade duration better for smoother transitions
|
||||
samples_to_drop = min(static_drop_samples, max(1, big['lc3_frame_samples'] - 1))
|
||||
try:
|
||||
discarded_data = await anext(big['audio_input'].frames(samples_to_drop))
|
||||
samples_discarded_total += samples_to_drop
|
||||
discard_events += 1
|
||||
|
||||
# Log every discard event with timing information
|
||||
sample_rate = big['audio_input']._pcm_format.sample_rate
|
||||
time_since_last_ms = frames_since_last_discard * 10 # Each frame is 10ms
|
||||
logging.info(
|
||||
f"DISCARD #{discard_events}: dropped {samples_to_drop} samples ({samples_to_drop / sample_rate * 1000:.1f}ms) | "
|
||||
f"buffer was {sd_buffer_samples} samples ({sd_buffer_samples / sample_rate * 1000:.1f}ms) | "
|
||||
f"since_last={frames_since_last_discard} frames ({time_since_last_ms}ms) | "
|
||||
f"frame={frame_count}"
|
||||
)
|
||||
|
||||
# Reset guard counter
|
||||
frames_since_last_discard = 0
|
||||
# Store how much we dropped for potential adaptive crossfade
|
||||
big['last_drop_samples'] = samples_to_drop
|
||||
# Set flag to apply crossfade on next frame
|
||||
big['apply_crossfade'] = True
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to discard samples: {e}")
|
||||
|
||||
# Down-mix multi-channel PCM to mono for LC3 encoder if needed
|
||||
if big.get('channels', 1) > 1:
|
||||
if isinstance(pcm_frame, np.ndarray):
|
||||
if pcm_frame.ndim > 1:
|
||||
mono = pcm_frame.mean(axis=1).astype(pcm_frame.dtype)
|
||||
pcm_frame = mono
|
||||
else:
|
||||
# Convert raw bytes to numpy, average channels, convert back
|
||||
dtype = np.int16 if big['pcm_bit_depth'] == 16 else np.float32
|
||||
samples = np.frombuffer(pcm_frame, dtype=dtype)
|
||||
samples = samples.reshape(-1, big['channels']).mean(axis=1)
|
||||
pcm_frame = samples.astype(dtype).tobytes()
|
||||
|
||||
# Apply crossfade if samples were just dropped (drift compensation)
|
||||
if big.get('apply_crossfade') and big.get('prev_pcm_frame') is not None:
|
||||
# Crossfade duration: 10ms for smoother transition (was 5ms)
|
||||
dtype = np.int16 if big['pcm_bit_depth'] == 16 else np.float32
|
||||
sample_rate = big['audio_input']._pcm_format.sample_rate
|
||||
crossfade_samples = min(int(sample_rate * 0.010), big['lc3_frame_samples'] // 2)
|
||||
|
||||
# Convert frames to numpy arrays (make writable copies)
|
||||
prev_samples = np.frombuffer(big['prev_pcm_frame'], dtype=dtype).copy()
|
||||
curr_samples = np.frombuffer(pcm_frame, dtype=dtype).copy()
|
||||
|
||||
# Create equal-power crossfade curves (smoother than linear)
|
||||
# Equal-power maintains perceived loudness during transition
|
||||
t = np.linspace(0, 1, crossfade_samples)
|
||||
fade_out = np.cos(t * np.pi / 2) # Cosine fade out
|
||||
fade_in = np.sin(t * np.pi / 2) # Sine fade in
|
||||
|
||||
# Apply crossfade to the beginning of current frame with end of previous frame
|
||||
if len(prev_samples) >= crossfade_samples and len(curr_samples) >= crossfade_samples:
|
||||
crossfaded = (
|
||||
prev_samples[-crossfade_samples:] * fade_out +
|
||||
curr_samples[:crossfade_samples] * fade_in
|
||||
).astype(dtype)
|
||||
# Replace beginning of current frame with crossfaded section
|
||||
curr_samples[:crossfade_samples] = crossfaded
|
||||
pcm_frame = curr_samples.tobytes()
|
||||
|
||||
big['apply_crossfade'] = False
|
||||
|
||||
# Store current frame for potential next crossfade
|
||||
if enable_drift_compensation:
|
||||
big['prev_pcm_frame'] = pcm_frame
|
||||
|
||||
# Measure LC3 encoding time
|
||||
t1 = time.perf_counter()
|
||||
lc3_frame = big['encoder'].encode(
|
||||
pcm_frame, num_bytes=big['lc3_bytes_per_frame'], bit_depth=big['pcm_bit_depth']
|
||||
)
|
||||
dt_enc = time.perf_counter() - t1
|
||||
|
||||
await big['iso_queue'].write(lc3_frame)
|
||||
frame_count += 1
|
||||
# Increment guard counter (tracks frames since last discard)
|
||||
frames_since_last_discard += 1
|
||||
|
||||
# Periodic stats logging (only for device/sounddevice streams, not WAV files)
|
||||
# WAV file concurrent access causes deadlock in ThreadedAudioInput
|
||||
now = time.perf_counter()
|
||||
is_device_stream = hasattr(big['audio_input'], '_stream') and big['audio_input']._stream is not None
|
||||
if is_device_stream and now - last_stats_log >= stats_interval:
|
||||
# Get current buffer status from PortAudio
|
||||
current_sd_buffer = 0
|
||||
if hasattr(big['audio_input'], '_stream') and big['audio_input']._stream:
|
||||
try:
|
||||
current_sd_buffer = big['audio_input']._stream.read_available
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Get stream latency and CPU load from sounddevice
|
||||
stream_latency_ms = None
|
||||
cpu_load_pct = None
|
||||
if hasattr(big['audio_input'], '_stream') and big['audio_input']._stream:
|
||||
try:
|
||||
latency = big['audio_input']._stream.latency
|
||||
if frame_count == 501: # Debug log once
|
||||
logging.info(f"DEBUG: stream.latency raw value = {latency}, type = {type(latency)}")
|
||||
# latency can be either a float (for input-only streams) or tuple (input, output)
|
||||
if latency is not None:
|
||||
if isinstance(latency, (int, float)):
|
||||
# Single value for input-only stream
|
||||
stream_latency_ms = float(latency) * 1000.0
|
||||
elif isinstance(latency, (tuple, list)) and len(latency) >= 1:
|
||||
# Tuple (input_latency, output_latency)
|
||||
stream_latency_ms = latency[0] * 1000.0
|
||||
except Exception as e:
|
||||
if frame_count == 501: # Log once at startup
|
||||
logging.warning(f"Could not get stream.latency: {e}")
|
||||
|
||||
try:
|
||||
cpu_load = big['audio_input']._stream.cpu_load
|
||||
if frame_count == 501: # Debug log once
|
||||
logging.info(f"DEBUG: stream.cpu_load raw value = {cpu_load}")
|
||||
# cpu_load is a fraction (0.0 to 1.0)
|
||||
if cpu_load is not None and cpu_load >= 0:
|
||||
cpu_load_pct = cpu_load * 100.0 # Convert to percentage
|
||||
except Exception as e:
|
||||
if frame_count == 501: # Log once at startup
|
||||
logging.warning(f"Could not get stream.cpu_load: {e}")
|
||||
|
||||
# Get backend-specific buffer status
|
||||
backend_delay = None
|
||||
backend_label = "Backend"
|
||||
|
||||
# Determine which backend we're using based on audio_input device
|
||||
try:
|
||||
device_info = big['audio_input']._device if hasattr(big['audio_input'], '_device') else None
|
||||
if device_info is not None and isinstance(device_info, int):
|
||||
hostapi = sd.query_hostapis(sd.query_devices(device_info)['hostapi'])
|
||||
backend_name = hostapi['name']
|
||||
else:
|
||||
backend_name = "Unknown"
|
||||
except Exception:
|
||||
backend_name = "Unknown"
|
||||
|
||||
if 'pulse' in backend_name.lower():
|
||||
# PipeWire/PulseAudio backend - no direct buffer access
|
||||
# SD_buffer is the only reliable metric
|
||||
backend_label = "PipeWire"
|
||||
backend_delay = None # Cannot read PipeWire internal buffers directly
|
||||
else:
|
||||
# ALSA backend - can read kernel buffer
|
||||
backend_label = "ALSA_kernel"
|
||||
try:
|
||||
with open('/proc/asound/card0/pcm0c/sub0/status', 'r') as f:
|
||||
for line in f:
|
||||
if 'delay' in line and ':' in line:
|
||||
backend_delay = int(line.split(':')[1].strip())
|
||||
break
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if enable_drift_compensation:
|
||||
avg_discard_per_event = (samples_discarded_total / discard_events) if discard_events > 0 else 0.0
|
||||
discard_event_rate = (discard_events / frame_count * 100) if frame_count > 0 else 0.0
|
||||
latency_str = f"stream_latency={stream_latency_ms:.2f} ms" if stream_latency_ms is not None else "stream_latency=N/A"
|
||||
cpu_str = f"cpu_load={cpu_load_pct:.1f}%" if cpu_load_pct is not None else "cpu_load=N/A"
|
||||
# Measure write blocking time
|
||||
t2 = time.perf_counter()
|
||||
await big['iso_queue'].write(lc3_frame)
|
||||
dt_write = time.perf_counter() - t2
|
||||
|
||||
# Total loop duration
|
||||
dt_loop = time.perf_counter() - t_loop0
|
||||
|
||||
# Update stats
|
||||
perf['n'] += 1
|
||||
perf['samples_sum'] += dt_sample
|
||||
perf['enc_sum'] += dt_enc
|
||||
perf['write_sum'] += dt_write
|
||||
perf['loop_sum'] += dt_loop
|
||||
perf['samples_max'] = max(perf['samples_max'], dt_sample)
|
||||
perf['enc_max'] = max(perf['enc_max'], dt_enc)
|
||||
perf['write_max'] = max(perf['write_max'], dt_write)
|
||||
perf['loop_max'] = max(perf['loop_max'], dt_loop)
|
||||
|
||||
frame_count += 1
|
||||
|
||||
# Log every 500 frames for this BIG and reset accumulators
|
||||
if perf['n'] >= 500:
|
||||
n = perf['n']
|
||||
logging.info(
|
||||
f"STATS: frames={frame_count} | discard_events={discard_events} ({discard_event_rate:.1f}%) | "
|
||||
f"avg_discard={avg_discard_per_event:.0f} samples/event | "
|
||||
f"SD_buffer={current_sd_buffer} samples ({current_sd_buffer / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms) | "
|
||||
f"{latency_str} | {cpu_str} | "
|
||||
f"threshold={drop_threshold_samples} samples ({drop_threshold_samples / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms)"
|
||||
"Perf(i=%d, last %d): sample mean=%.6fms max=%.6fms | encode mean=%.6fms max=%.6fms | write mean=%.6fms max=%.6fms | loop mean=%.6fms max=%.6fms",
|
||||
i,
|
||||
n,
|
||||
(perf['samples_sum'] / n) * 1e3, perf['samples_max'] * 1e3,
|
||||
(perf['enc_sum'] / n) * 1e3, perf['enc_max'] * 1e3,
|
||||
(perf['write_sum'] / n) * 1e3, perf['write_max'] * 1e3,
|
||||
(perf['loop_sum'] / n) * 1e3, perf['loop_max'] * 1e3,
|
||||
)
|
||||
else:
|
||||
backend_str = f"{backend_label}={backend_delay} samples ({backend_delay / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms)" if backend_delay is not None else f"{backend_label}=N/A (use pw-top)"
|
||||
latency_str = f"stream_latency={stream_latency_ms:.2f} ms" if stream_latency_ms is not None else "stream_latency=N/A"
|
||||
cpu_str = f"cpu_load={cpu_load_pct:.1f}%" if cpu_load_pct is not None else "cpu_load=N/A"
|
||||
logging.info(
|
||||
f"STATS: frames={frame_count} | "
|
||||
f"SD_buffer={current_sd_buffer} samples ({current_sd_buffer / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms) | "
|
||||
f"{latency_str} | {cpu_str} | "
|
||||
f"{backend_str} | "
|
||||
f"drift_compensation=DISABLED"
|
||||
)
|
||||
last_stats_log = now
|
||||
perf.update({
|
||||
'n': 0,
|
||||
'samples_sum': 0.0, 'samples_max': 0.0,
|
||||
'enc_sum': 0.0, 'enc_max': 0.0,
|
||||
'write_sum': 0.0, 'write_max': 0.0,
|
||||
'loop_sum': 0.0, 'loop_max': 0.0,
|
||||
})
|
||||
|
||||
if all(stream_finished): # Take into account that multiple files have different lengths
|
||||
logging.info('All streams finished, stopping streamer')
|
||||
@@ -976,103 +829,25 @@ if __name__ == "__main__":
|
||||
format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
|
||||
)
|
||||
os.chdir(os.path.dirname(__file__))
|
||||
|
||||
# =============================================================================
|
||||
# AUDIO BACKEND CONFIGURATION - Toggle between ALSA and PipeWire
|
||||
# =============================================================================
|
||||
# Uncomment ONE of the following backend configurations:
|
||||
|
||||
# Option 1: Direct ALSA (Direct hardware access, bypasses PipeWire)
|
||||
AUDIO_BACKEND = 'ALSA'
|
||||
target_latency_ms = 10.0
|
||||
|
||||
# Option 2: PipeWire via PulseAudio API (Routes through pipewire-pulse)
|
||||
#AUDIO_BACKEND = 'PipeWire'
|
||||
#target_latency_ms = 5.0 # PipeWire typically handles lower latency better
|
||||
|
||||
# =============================================================================
|
||||
|
||||
import sounddevice as sd
|
||||
import subprocess
|
||||
|
||||
# Detect if PipeWire is running (even if we're using ALSA API)
|
||||
pipewire_running = False
|
||||
try:
|
||||
result = subprocess.run(['systemctl', '--user', 'is-active', 'pipewire'],
|
||||
capture_output=True, text=True, timeout=1)
|
||||
pipewire_running = (result.returncode == 0)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if AUDIO_BACKEND == 'ALSA':
|
||||
os.environ['SDL_AUDIODRIVER'] = 'alsa'
|
||||
sd.default.latency = target_latency_ms / 1000.0
|
||||
|
||||
# Find ALSA host API
|
||||
try:
|
||||
alsa_hostapi = next(i for i, ha in enumerate(sd.query_hostapis())
|
||||
if 'ALSA' in ha['name'])
|
||||
logging.info(f"ALSA host API available at index: {alsa_hostapi}")
|
||||
except StopIteration:
|
||||
logging.error("ALSA backend not found!")
|
||||
# Find ALSA host API
|
||||
alsa_hostapi = next(i for i, ha in enumerate(sd.query_hostapis())
|
||||
if 'ALSA' in ha['name'])
|
||||
|
||||
elif AUDIO_BACKEND == 'PipeWire':
|
||||
os.environ['SDL_AUDIODRIVER'] = 'pulseaudio'
|
||||
sd.default.latency = target_latency_ms / 1000.0
|
||||
|
||||
if not pipewire_running:
|
||||
logging.error("PipeWire selected but not running!")
|
||||
raise RuntimeError("PipeWire is not active")
|
||||
|
||||
# Find PulseAudio host API (required for PipeWire mode)
|
||||
try:
|
||||
pulse_hostapi = next(i for i, ha in enumerate(sd.query_hostapis())
|
||||
if 'pulse' in ha['name'].lower())
|
||||
logging.info(f"Using PulseAudio host API at index: {pulse_hostapi} → routes to PipeWire")
|
||||
except StopIteration:
|
||||
logging.error("PulseAudio host API not found! Did you rebuild PortAudio with -DPA_USE_PULSEAUDIO=ON?")
|
||||
raise RuntimeError("PulseAudio API not available in PortAudio")
|
||||
else:
|
||||
logging.error(f"Unknown AUDIO_BACKEND: {AUDIO_BACKEND}")
|
||||
raise ValueError(f"Invalid AUDIO_BACKEND: {AUDIO_BACKEND}")
|
||||
|
||||
# Select audio input device based on backend
|
||||
search_str='ch1'
|
||||
# Use ALSA devices
|
||||
from auracast.utils.sounddevice_utils import get_alsa_usb_inputs
|
||||
devices = get_alsa_usb_inputs()
|
||||
logging.info(f"Searching ALSA devices for first device with string {search_str}...")
|
||||
|
||||
audio_dev = None
|
||||
if AUDIO_BACKEND == 'ALSA':
|
||||
search_str='ch1'
|
||||
# Use ALSA devices
|
||||
from auracast.utils.sounddevice_utils import get_alsa_usb_inputs
|
||||
devices = get_alsa_usb_inputs()
|
||||
logging.info(f"Searching ALSA devices for first device with string {search_str}...")
|
||||
|
||||
for idx, dev in devices:
|
||||
logging.info(f" ALSA device [{idx}]: {dev['name']} ({dev['max_input_channels']} ch)")
|
||||
if search_str in dev['name'].lower():
|
||||
audio_dev = idx
|
||||
logging.info(f"✓ Selected ALSA device {idx}: {dev['name']}")
|
||||
break
|
||||
|
||||
elif AUDIO_BACKEND == 'PipeWire':
|
||||
# Use PulseAudio devices (routed through PipeWire)
|
||||
logging.info("Searching PulseAudio devices for Shure MVX2U...")
|
||||
|
||||
for idx, dev in enumerate(sd.query_devices()):
|
||||
# Only consider PulseAudio input devices
|
||||
if dev['max_input_channels'] > 0:
|
||||
hostapi = sd.query_hostapis(dev['hostapi'])
|
||||
if 'pulse' in hostapi['name'].lower():
|
||||
dev_name_lower = dev['name'].lower()
|
||||
logging.info(f" PulseAudio device [{idx}]: {dev['name']} ({dev['max_input_channels']} ch)")
|
||||
|
||||
# Skip monitor devices (they're output monitors, not real inputs)
|
||||
if 'monitor' in dev_name_lower:
|
||||
continue
|
||||
|
||||
# Look for Shure MVX2U - prefer "Mono" device for mono input
|
||||
if 'shure' in dev_name_lower and 'mvx2u' in dev_name_lower:
|
||||
shure_device_idx = idx
|
||||
logging.info(f"✓ Selected PulseAudio device {idx}: {dev['name']} → routes to PipeWire")
|
||||
break
|
||||
for idx, dev in devices:
|
||||
logging.info(f" ALSA device [{idx}]: {dev['name']} ({dev['max_input_channels']} ch)")
|
||||
if search_str in dev['name'].lower():
|
||||
audio_dev = idx
|
||||
logging.info(f"✓ Selected ALSA device {idx}: {dev['name']}")
|
||||
break
|
||||
|
||||
|
||||
if audio_dev is None:
|
||||
logging.error(f"Audio device {audio_dev} not found in {AUDIO_BACKEND} devices!")
|
||||
@@ -1099,8 +874,6 @@ if __name__ == "__main__":
|
||||
#config.transport= 'auto'
|
||||
config.transport='serial:/dev/ttyAMA3,1000000,rtscts' # transport for raspberry pi
|
||||
|
||||
# TODO: encrypted streams are not working
|
||||
|
||||
for big in config.bigs:
|
||||
#big.code = 'abcd'
|
||||
#big.code = '78 e5 dc f1 34 ab 42 bf c1 92 ef dd 3a fd 67 ae'
|
||||
@@ -1108,11 +881,11 @@ if __name__ == "__main__":
|
||||
#big.audio_source = big.audio_source.replace('.wav', '_10_16_32.lc3') #lc3 precoded files
|
||||
#big.audio_source = read_lc3_file(big.audio_source) # load files in advance
|
||||
|
||||
# --- Configure Shure MVX2U USB Audio Interface (ALSA backend) ---
|
||||
# --- Configure device (ALSA backend) ---
|
||||
if audio_dev is not None:
|
||||
big.audio_source = f'device:{audio_dev}' # Shure MVX2U USB mono interface
|
||||
big.audio_source = f'device:{audio_dev}'
|
||||
big.input_format = 'int16le,48000,1' # int16, 48kHz, mono
|
||||
logging.info(f"Configured BIG '{big.name}' with Shure MVX2U (device:{audio_dev}, 48kHz mono)")
|
||||
logging.info(f"Configured BIG '{big.name}' with (device:{audio_dev}, 48kHz mono)")
|
||||
else:
|
||||
logging.warning(f"Shure device not found, BIG '{big.name}' will use default audio_source: {big.audio_source}")
|
||||
|
||||
@@ -1126,12 +899,12 @@ if __name__ == "__main__":
|
||||
# TODO: with more than three broadcasters (16kHz) no advertising (no primary channels is present anymore)
|
||||
# TODO: find the bottleneck - probably airtime
|
||||
|
||||
|
||||
config.auracast_sampling_rate_hz = 16000
|
||||
config.octets_per_frame = 40 # 32kbps@16kHz
|
||||
config.auracast_sampling_rate_hz = 24000
|
||||
config.octets_per_frame = 60 # 32kbps@16kHz
|
||||
#config.immediate_rendering = True
|
||||
#config.debug = True
|
||||
|
||||
config.enable_adaptive_frame_dropping=True
|
||||
config.enable_adaptive_frame_dropping=False
|
||||
# Enable clock drift compensation to prevent latency accumulation
|
||||
|
||||
run_async(
|
||||
|
||||
@@ -6,8 +6,8 @@ pcm.ch1 {
|
||||
channels 2
|
||||
rate 48000
|
||||
format S16_LE
|
||||
period_size 120 # 2.5ms
|
||||
buffer_size 360
|
||||
period_size 480 # 2.5ms
|
||||
buffer_size 960
|
||||
}
|
||||
bindings.0 0
|
||||
}
|
||||
@@ -21,8 +21,8 @@ pcm.ch2 {
|
||||
channels 2
|
||||
rate 48000
|
||||
format S16_LE
|
||||
period_size 120
|
||||
buffer_size 360
|
||||
period_size 480
|
||||
buffer_size 960
|
||||
}
|
||||
bindings.0 1
|
||||
}
|
||||
Reference in New Issue
Block a user