1 Commits

Author SHA1 Message Date
pstruebi
a196bcd2fc fix: disable periodic stats for WAV inputs and guard sample_rate access 2025-11-04 15:11:47 +01:00
2 changed files with 112 additions and 171 deletions

View File

@@ -132,13 +132,10 @@ class ModWaveAudioInput(audio_io.ThreadedAudioInput):
return b''
pcm_samples = self._wav.readframes(frame_size)
if self._bytes_read == 0:
logging.info(f"WaveAudioInput: first read requested frame_size={frame_size} -> got {len(pcm_samples)} bytes")
if not pcm_samples and self._bytes_read:
if not self.rewind:
return None
# Loop around.
logging.info("WaveAudioInput: EOF reached, rewinding to start")
self._wav.rewind()
self._bytes_read = 0
pcm_samples = self._wav.readframes(frame_size)
@@ -600,9 +597,6 @@ class Streamer():
for attempt in range(1, max_attempts + 1):
try:
pcm_format = await audio_input.open()
logging.info(
f"Opened audio input: {type(audio_input).__name__} src={audio_source} sr={pcm_format.sample_rate} ch={pcm_format.channels}"
)
break # success
except _sd.PortAudioError as err:
# -9985 == paDeviceUnavailable
@@ -680,7 +674,12 @@ class Streamer():
self.is_streaming = True
# frame drop algo parameters
sample_rate = big['audio_input']._pcm_format.sample_rate
# In demo/precoded modes there may be no audio_input or no _pcm_format yet
ai = big.get('audio_input')
if ai is not None and hasattr(ai, '_pcm_format') and getattr(ai, '_pcm_format') is not None:
sample_rate = ai._pcm_format.sample_rate
else:
sample_rate = global_config.auracast_sampling_rate_hz
samples_discarded_total = 0 # Total samples discarded
discard_events = 0 # Number of times we discarded samples
frames_since_last_discard = 999 # Guard: frames since last discard (start high to allow first drop)
@@ -703,144 +702,127 @@ class Streamer():
last_stats_log = time.perf_counter()
stats_interval = 5.0 # Log stats every 5 seconds
frame_count = 0
# Prime inputs: read one frame from each non-precoded input to verify data flow
try:
for j, _big in enumerate(bigs.values()):
if not _big.get('precoded'):
gen = _big.get('frames_gen')
if gen is None:
gen = _big['audio_input'].frames(_big['lc3_frame_samples'])
_big['frames_gen'] = gen
test_frame = await anext(gen, None)
logging.info(
f"Prime read BIG{j}: bytes={0 if test_frame is None else len(test_frame)} samples={_big['lc3_frame_samples']}"
)
# Store for crossfade if needed
if enable_drift_compensation and test_frame is not None:
_big['prev_pcm_frame'] = test_frame
except Exception as e:
logging.error(f"Prime read failed: {e}", exc_info=True)
# One streamer fits all
while self.is_streaming:
stream_finished = [False for _ in range(len(bigs))]
for i, big in enumerate(bigs.values()):
if big.get('logged_loop_enter') is not True:
logging.info(
f"Stream loop enter: input={type(big['audio_input']).__name__} lc3_frame_samples={big.get('lc3_frame_samples')} bytes_per_frame={big.get('lc3_bytes_per_frame')}"
)
big['logged_loop_enter'] = True
try:
if big['precoded']:
# everything was already lc3 coded beforehand
lc3_frame = bytes(itertools.islice(big['lc3_frames'], big['lc3_bytes_per_frame']))
if lc3_frame == b'':
# Not all streams may stop at the same time
stream_finished[i] = True
continue
else:
# code lc3 on the fly
# Use stored frames generator when available so we can aclose() it on stop
frames_gen = big.get('frames_gen')
if frames_gen is None:
logging.info("Creating frames generator for input")
frames_gen = big['audio_input'].frames(big['lc3_frame_samples'])
big['frames_gen'] = frames_gen
# Read the frame we need for encoding
logging.info("Awaiting next PCM frame from frames_gen…")
pcm_frame = await anext(frames_gen, None)
if big['precoded']:# everything was already lc3 coded beforehand
lc3_frame = bytes(
itertools.islice(big['lc3_frames'], big['lc3_bytes_per_frame'])
)
if big.get('logged_first_frame') is not True:
logging.info(
f"First PCM frame bytes={0 if pcm_frame is None else len(pcm_frame)} | lc3_frame_samples={big['lc3_frame_samples']} | bytes_per_frame={big['lc3_bytes_per_frame']}"
)
big['logged_first_frame'] = True
if lc3_frame == b'': # Not all streams may stop at the same time
stream_finished[i] = True
continue
else: # code lc3 on the fly
# Use stored frames generator when available so we can aclose() it on stop
frames_gen = big.get('frames_gen')
if frames_gen is None:
frames_gen = big['audio_input'].frames(big['lc3_frame_samples'])
big['frames_gen'] = frames_gen
# Read the frame we need for encoding
pcm_frame = await anext(frames_gen, None)
if pcm_frame is None:
# Not all streams may stop at the same time
stream_finished[i] = True
continue
# Discard excess samples in buffer if above threshold (clock drift compensation)
if enable_drift_compensation and hasattr(big['audio_input'], '_stream') and big['audio_input']._stream:
sd_buffer_samples = big['audio_input']._stream.read_available
# Guard: only allow discard if enough frames have passed since last discard
if sd_buffer_samples > drop_threshold_samples and frames_since_last_discard >= discard_guard_frames:
# Always drop a static amount for predictable behavior
samples_to_drop = min(static_drop_samples, max(1, big['lc3_frame_samples'] - 1))
try:
await anext(big['audio_input'].frames(samples_to_drop))
samples_discarded_total += samples_to_drop
discard_events += 1
sample_rate = big['audio_input']._pcm_format.sample_rate
time_since_last_ms = frames_since_last_discard * 10 # Each frame is 10ms
logging.info(
f"DISCARD #{discard_events}: dropped {samples_to_drop} samples ({samples_to_drop / sample_rate * 1000:.1f}ms) | "
f"buffer was {sd_buffer_samples} samples ({sd_buffer_samples / sample_rate * 1000:.1f}ms) | "
f"since_last={frames_since_last_discard} frames ({time_since_last_ms}ms) | "
f"frame={frame_count}"
)
frames_since_last_discard = 0
big['last_drop_samples'] = samples_to_drop
big['apply_crossfade'] = True
except Exception as e:
logging.error(f"Failed to discard samples: {e}")
# Down-mix multi-channel PCM to mono for LC3 encoder if needed
if big.get('channels', 1) > 1:
if isinstance(pcm_frame, np.ndarray):
if pcm_frame.ndim > 1:
mono = pcm_frame.mean(axis=1).astype(pcm_frame.dtype)
pcm_frame = mono
else:
# Convert raw bytes to numpy, average channels, convert back
dtype = np.int16 if big['pcm_bit_depth'] == 16 else np.float32
samples = np.frombuffer(pcm_frame, dtype=dtype)
samples = samples.reshape(-1, big['channels']).mean(axis=1)
pcm_frame = samples.astype(dtype).tobytes()
# Apply crossfade if samples were just dropped (drift compensation)
if big.get('apply_crossfade') and big.get('prev_pcm_frame') is not None:
if pcm_frame is None: # Not all streams may stop at the same time
stream_finished[i] = True
continue
# Discard excess samples in buffer if above threshold (clock drift compensation)
if enable_drift_compensation and hasattr(big['audio_input'], '_stream') and big['audio_input']._stream:
sd_buffer_samples = big['audio_input']._stream.read_available
# Guard: only allow discard if enough frames have passed since last discard
if sd_buffer_samples > drop_threshold_samples and frames_since_last_discard >= discard_guard_frames:
# Always drop a static amount (3ms) for predictable behavior
# This matches the crossfade duration better for smoother transitions
samples_to_drop = min(static_drop_samples, max(1, big['lc3_frame_samples'] - 1))
try:
discarded_data = await anext(big['audio_input'].frames(samples_to_drop))
samples_discarded_total += samples_to_drop
discard_events += 1
# Log every discard event with timing information
sample_rate = big['audio_input']._pcm_format.sample_rate
time_since_last_ms = frames_since_last_discard * 10 # Each frame is 10ms
logging.info(
f"DISCARD #{discard_events}: dropped {samples_to_drop} samples ({samples_to_drop / sample_rate * 1000:.1f}ms) | "
f"buffer was {sd_buffer_samples} samples ({sd_buffer_samples / sample_rate * 1000:.1f}ms) | "
f"since_last={frames_since_last_discard} frames ({time_since_last_ms}ms) | "
f"frame={frame_count}"
)
# Reset guard counter
frames_since_last_discard = 0
# Store how much we dropped for potential adaptive crossfade
big['last_drop_samples'] = samples_to_drop
# Set flag to apply crossfade on next frame
big['apply_crossfade'] = True
except Exception as e:
logging.error(f"Failed to discard samples: {e}")
# Down-mix multi-channel PCM to mono for LC3 encoder if needed
if big.get('channels', 1) > 1:
if isinstance(pcm_frame, np.ndarray):
if pcm_frame.ndim > 1:
mono = pcm_frame.mean(axis=1).astype(pcm_frame.dtype)
pcm_frame = mono
else:
# Convert raw bytes to numpy, average channels, convert back
dtype = np.int16 if big['pcm_bit_depth'] == 16 else np.float32
sample_rate = big['audio_input']._pcm_format.sample_rate
crossfade_samples = min(int(sample_rate * 0.010), big['lc3_frame_samples'] // 2)
prev_samples = np.frombuffer(big['prev_pcm_frame'], dtype=dtype).copy()
curr_samples = np.frombuffer(pcm_frame, dtype=dtype).copy()
t = np.linspace(0, 1, crossfade_samples)
fade_out = np.cos(t * np.pi / 2)
fade_in = np.sin(t * np.pi / 2)
if len(prev_samples) >= crossfade_samples and len(curr_samples) >= crossfade_samples:
crossfaded = (prev_samples[-crossfade_samples:] * fade_out + curr_samples[:crossfade_samples] * fade_in).astype(dtype)
curr_samples[:crossfade_samples] = crossfaded
pcm_frame = curr_samples.tobytes()
big['apply_crossfade'] = False
samples = np.frombuffer(pcm_frame, dtype=dtype)
samples = samples.reshape(-1, big['channels']).mean(axis=1)
pcm_frame = samples.astype(dtype).tobytes()
# Apply crossfade if samples were just dropped (drift compensation)
if big.get('apply_crossfade') and big.get('prev_pcm_frame') is not None:
# Crossfade duration: 10ms for smoother transition (was 5ms)
dtype = np.int16 if big['pcm_bit_depth'] == 16 else np.float32
sample_rate = big['audio_input']._pcm_format.sample_rate
crossfade_samples = min(int(sample_rate * 0.010), big['lc3_frame_samples'] // 2)
# Convert frames to numpy arrays (make writable copies)
prev_samples = np.frombuffer(big['prev_pcm_frame'], dtype=dtype).copy()
curr_samples = np.frombuffer(pcm_frame, dtype=dtype).copy()
# Create equal-power crossfade curves (smoother than linear)
# Equal-power maintains perceived loudness during transition
t = np.linspace(0, 1, crossfade_samples)
fade_out = np.cos(t * np.pi / 2) # Cosine fade out
fade_in = np.sin(t * np.pi / 2) # Sine fade in
# Apply crossfade to the beginning of current frame with end of previous frame
if len(prev_samples) >= crossfade_samples and len(curr_samples) >= crossfade_samples:
crossfaded = (
prev_samples[-crossfade_samples:] * fade_out +
curr_samples[:crossfade_samples] * fade_in
).astype(dtype)
# Replace beginning of current frame with crossfaded section
curr_samples[:crossfade_samples] = crossfaded
pcm_frame = curr_samples.tobytes()
big['apply_crossfade'] = False
# Store current frame for potential next crossfade
if enable_drift_compensation:
big['prev_pcm_frame'] = pcm_frame
if enable_drift_compensation:
big['prev_pcm_frame'] = pcm_frame
lc3_frame = big['encoder'].encode(
pcm_frame, num_bytes=big['lc3_bytes_per_frame'], bit_depth=big['pcm_bit_depth']
)
lc3_frame = big['encoder'].encode(pcm_frame, num_bytes=big['lc3_bytes_per_frame'], bit_depth=big['pcm_bit_depth'])
if big.get('logged_first_lc3') is not True:
try:
logging.info(f"First LC3 frame size={len(lc3_frame)} bytes")
except Exception:
pass
big['logged_first_lc3'] = True
await big['iso_queue'].write(lc3_frame)
except asyncio.CancelledError:
raise
except Exception as e:
if not big.get('logged_exception'):
logging.error(f"Exception in stream loop for BIG {i}: {e}", exc_info=True)
big['logged_exception'] = True
await big['iso_queue'].write(lc3_frame)
frame_count += 1
# Increment guard counter (tracks frames since last discard)
frames_since_last_discard += 1
# Periodic stats logging
# Periodic stats logging (only for device/sounddevice streams, not WAV files)
# WAV file concurrent access causes deadlock in ThreadedAudioInput
now = time.perf_counter()
if now - last_stats_log >= stats_interval:
is_device_stream = hasattr(big['audio_input'], '_stream') and big['audio_input']._stream is not None
if is_device_stream and now - last_stats_log >= stats_interval:
# Get current buffer status from PortAudio
current_sd_buffer = 0
if hasattr(big['audio_input'], '_stream') and big['audio_input']._stream:

View File

@@ -205,27 +205,6 @@ class StreamerWorker: # TODO: is wrapping in this Worker strictly necessary?
for big in conf.bigs:
big.input_format = f"int16le,{capture_rate},{channels}"
# Resolve relative file: paths to absolute paths (robust against varying cwd)
try:
server_dir = os.path.dirname(__file__)
pkg_root = os.path.dirname(os.path.dirname(__file__)) # .../auracast
for big in conf.bigs:
if isinstance(big.audio_source, str) and big.audio_source.startswith('file:'):
rel = big.audio_source.split(':', 1)[1]
if not os.path.isabs(rel):
candidates = [
os.path.normpath(os.path.join(server_dir, rel)),
os.path.normpath(os.path.join(pkg_root, rel)),
]
resolved = next((c for c in candidates if os.path.exists(c)), None)
if resolved:
log.info("Resolved demo file path '%s' -> '%s'", rel, resolved)
big.audio_source = f"file:{resolved}"
else:
raise FileNotFoundError(rel)
except Exception as e:
raise HTTPException(status_code=400, detail=f"Demo audio file not found or unreadable: {e}")
# Coerce QoS: compute max_transport_latency from RTN if qos_config present
if getattr(conf, 'qos_config', None) and getattr(conf.qos_config, 'number_of_retransmissions', None) is not None:
conf.qos_config.max_transport_latency_ms = int(conf.qos_config.number_of_retransmissions) * 10 + 3
@@ -300,26 +279,6 @@ class StreamerWorker: # TODO: is wrapping in this Worker strictly necessary?
if device_index is None:
raise HTTPException(status_code=400, detail=f"Audio device '{device_name}' not found.")
big.audio_source = f'device:{device_index}'
# Resolve relative file: paths to absolute paths (robust against varying cwd)
try:
server_dir = os.path.dirname(__file__)
pkg_root = os.path.dirname(os.path.dirname(__file__)) # .../auracast
for big in conf.bigs:
if isinstance(big.audio_source, str) and big.audio_source.startswith('file:'):
rel = big.audio_source.split(':', 1)[1]
if not os.path.isabs(rel):
candidates = [
os.path.normpath(os.path.join(server_dir, rel)),
os.path.normpath(os.path.join(pkg_root, rel)),
]
resolved = next((c for c in candidates if os.path.exists(c)), None)
if resolved:
log.info("Resolved demo file path '%s' -> '%s'", rel, resolved)
big.audio_source = f"file:{resolved}"
else:
raise FileNotFoundError(rel)
except Exception as e:
raise HTTPException(status_code=400, detail=f"Demo audio file not found or unreadable: {e}")
# Coerce QoS: compute max_transport_latency from RTN if qos_config present
if getattr(conf, 'qos_config', None) and getattr(conf.qos_config, 'number_of_retransmissions', None) is not None:
conf.qos_config.max_transport_latency_ms = int(conf.qos_config.number_of_retransmissions) * 10 + 3