refactor: improve error handling and logging in stream loop

- Wrapped main stream processing logic in try-except block to catch and log exceptions per BIG
- Added debug logging for frames generator creation and PCM frame retrieval
- Reduced code indentation and removed unnecessary comments for better readability
This commit is contained in:
2025-11-04 14:01:53 +01:00
parent 88ca6f5976
commit b4a9b4fc8f
+97 -113
View File
@@ -713,126 +713,110 @@ class Streamer():
f"Stream loop enter: input={type(big['audio_input']).__name__} lc3_frame_samples={big.get('lc3_frame_samples')} bytes_per_frame={big.get('lc3_bytes_per_frame')}"
)
big['logged_loop_enter'] = True
if big['precoded']:# everything was already lc3 coded beforehand
lc3_frame = bytes(
itertools.islice(big['lc3_frames'], big['lc3_bytes_per_frame'])
)
try:
if big['precoded']:
# everything was already lc3 coded beforehand
lc3_frame = bytes(itertools.islice(big['lc3_frames'], big['lc3_bytes_per_frame']))
if lc3_frame == b'':
# Not all streams may stop at the same time
stream_finished[i] = True
continue
else:
# code lc3 on the fly
# Use stored frames generator when available so we can aclose() it on stop
frames_gen = big.get('frames_gen')
if frames_gen is None:
logging.info("Creating frames generator for input")
frames_gen = big['audio_input'].frames(big['lc3_frame_samples'])
big['frames_gen'] = frames_gen
# Read the frame we need for encoding
logging.info("Awaiting next PCM frame from frames_gen…")
pcm_frame = await anext(frames_gen, None)
if lc3_frame == b'': # Not all streams may stop at the same time
stream_finished[i] = True
continue
else: # code lc3 on the fly
# Use stored frames generator when available so we can aclose() it on stop
frames_gen = big.get('frames_gen')
if frames_gen is None:
frames_gen = big['audio_input'].frames(big['lc3_frame_samples'])
big['frames_gen'] = frames_gen
# Read the frame we need for encoding
pcm_frame = await anext(frames_gen, None)
if big.get('logged_first_frame') is not True:
logging.info(
f"First PCM frame bytes={0 if pcm_frame is None else len(pcm_frame)} | lc3_frame_samples={big['lc3_frame_samples']} | bytes_per_frame={big['lc3_bytes_per_frame']}"
)
big['logged_first_frame'] = True
if big.get('logged_first_frame') is not True:
logging.info(
f"First PCM frame bytes={0 if pcm_frame is None else len(pcm_frame)} | lc3_frame_samples={big['lc3_frame_samples']} | bytes_per_frame={big['lc3_bytes_per_frame']}"
)
big['logged_first_frame'] = True
if pcm_frame is None:
# Not all streams may stop at the same time
stream_finished[i] = True
continue
if pcm_frame is None: # Not all streams may stop at the same time
stream_finished[i] = True
continue
# Discard excess samples in buffer if above threshold (clock drift compensation)
if enable_drift_compensation and hasattr(big['audio_input'], '_stream') and big['audio_input']._stream:
sd_buffer_samples = big['audio_input']._stream.read_available
# Guard: only allow discard if enough frames have passed since last discard
if sd_buffer_samples > drop_threshold_samples and frames_since_last_discard >= discard_guard_frames:
# Always drop a static amount (3ms) for predictable behavior
# This matches the crossfade duration better for smoother transitions
samples_to_drop = min(static_drop_samples, max(1, big['lc3_frame_samples'] - 1))
try:
discarded_data = await anext(big['audio_input'].frames(samples_to_drop))
samples_discarded_total += samples_to_drop
discard_events += 1
# Log every discard event with timing information
sample_rate = big['audio_input']._pcm_format.sample_rate
time_since_last_ms = frames_since_last_discard * 10 # Each frame is 10ms
logging.info(
f"DISCARD #{discard_events}: dropped {samples_to_drop} samples ({samples_to_drop / sample_rate * 1000:.1f}ms) | "
f"buffer was {sd_buffer_samples} samples ({sd_buffer_samples / sample_rate * 1000:.1f}ms) | "
f"since_last={frames_since_last_discard} frames ({time_since_last_ms}ms) | "
f"frame={frame_count}"
)
# Reset guard counter
frames_since_last_discard = 0
# Store how much we dropped for potential adaptive crossfade
big['last_drop_samples'] = samples_to_drop
# Set flag to apply crossfade on next frame
big['apply_crossfade'] = True
except Exception as e:
logging.error(f"Failed to discard samples: {e}")
# Down-mix multi-channel PCM to mono for LC3 encoder if needed
if big.get('channels', 1) > 1:
if isinstance(pcm_frame, np.ndarray):
if pcm_frame.ndim > 1:
mono = pcm_frame.mean(axis=1).astype(pcm_frame.dtype)
pcm_frame = mono
else:
# Convert raw bytes to numpy, average channels, convert back
# Discard excess samples in buffer if above threshold (clock drift compensation)
if enable_drift_compensation and hasattr(big['audio_input'], '_stream') and big['audio_input']._stream:
sd_buffer_samples = big['audio_input']._stream.read_available
# Guard: only allow discard if enough frames have passed since last discard
if sd_buffer_samples > drop_threshold_samples and frames_since_last_discard >= discard_guard_frames:
# Always drop a static amount for predictable behavior
samples_to_drop = min(static_drop_samples, max(1, big['lc3_frame_samples'] - 1))
try:
await anext(big['audio_input'].frames(samples_to_drop))
samples_discarded_total += samples_to_drop
discard_events += 1
sample_rate = big['audio_input']._pcm_format.sample_rate
time_since_last_ms = frames_since_last_discard * 10 # Each frame is 10ms
logging.info(
f"DISCARD #{discard_events}: dropped {samples_to_drop} samples ({samples_to_drop / sample_rate * 1000:.1f}ms) | "
f"buffer was {sd_buffer_samples} samples ({sd_buffer_samples / sample_rate * 1000:.1f}ms) | "
f"since_last={frames_since_last_discard} frames ({time_since_last_ms}ms) | "
f"frame={frame_count}"
)
frames_since_last_discard = 0
big['last_drop_samples'] = samples_to_drop
big['apply_crossfade'] = True
except Exception as e:
logging.error(f"Failed to discard samples: {e}")
# Down-mix multi-channel PCM to mono for LC3 encoder if needed
if big.get('channels', 1) > 1:
if isinstance(pcm_frame, np.ndarray):
if pcm_frame.ndim > 1:
mono = pcm_frame.mean(axis=1).astype(pcm_frame.dtype)
pcm_frame = mono
else:
# Convert raw bytes to numpy, average channels, convert back
dtype = np.int16 if big['pcm_bit_depth'] == 16 else np.float32
samples = np.frombuffer(pcm_frame, dtype=dtype)
samples = samples.reshape(-1, big['channels']).mean(axis=1)
pcm_frame = samples.astype(dtype).tobytes()
# Apply crossfade if samples were just dropped (drift compensation)
if big.get('apply_crossfade') and big.get('prev_pcm_frame') is not None:
dtype = np.int16 if big['pcm_bit_depth'] == 16 else np.float32
samples = np.frombuffer(pcm_frame, dtype=dtype)
samples = samples.reshape(-1, big['channels']).mean(axis=1)
pcm_frame = samples.astype(dtype).tobytes()
# Apply crossfade if samples were just dropped (drift compensation)
if big.get('apply_crossfade') and big.get('prev_pcm_frame') is not None:
# Crossfade duration: 10ms for smoother transition (was 5ms)
dtype = np.int16 if big['pcm_bit_depth'] == 16 else np.float32
sample_rate = big['audio_input']._pcm_format.sample_rate
crossfade_samples = min(int(sample_rate * 0.010), big['lc3_frame_samples'] // 2)
# Convert frames to numpy arrays (make writable copies)
prev_samples = np.frombuffer(big['prev_pcm_frame'], dtype=dtype).copy()
curr_samples = np.frombuffer(pcm_frame, dtype=dtype).copy()
# Create equal-power crossfade curves (smoother than linear)
# Equal-power maintains perceived loudness during transition
t = np.linspace(0, 1, crossfade_samples)
fade_out = np.cos(t * np.pi / 2) # Cosine fade out
fade_in = np.sin(t * np.pi / 2) # Sine fade in
# Apply crossfade to the beginning of current frame with end of previous frame
if len(prev_samples) >= crossfade_samples and len(curr_samples) >= crossfade_samples:
crossfaded = (
prev_samples[-crossfade_samples:] * fade_out +
curr_samples[:crossfade_samples] * fade_in
).astype(dtype)
# Replace beginning of current frame with crossfaded section
curr_samples[:crossfade_samples] = crossfaded
pcm_frame = curr_samples.tobytes()
big['apply_crossfade'] = False
# Store current frame for potential next crossfade
if enable_drift_compensation:
big['prev_pcm_frame'] = pcm_frame
sample_rate = big['audio_input']._pcm_format.sample_rate
crossfade_samples = min(int(sample_rate * 0.010), big['lc3_frame_samples'] // 2)
prev_samples = np.frombuffer(big['prev_pcm_frame'], dtype=dtype).copy()
curr_samples = np.frombuffer(pcm_frame, dtype=dtype).copy()
t = np.linspace(0, 1, crossfade_samples)
fade_out = np.cos(t * np.pi / 2)
fade_in = np.sin(t * np.pi / 2)
if len(prev_samples) >= crossfade_samples and len(curr_samples) >= crossfade_samples:
crossfaded = (prev_samples[-crossfade_samples:] * fade_out + curr_samples[:crossfade_samples] * fade_in).astype(dtype)
curr_samples[:crossfade_samples] = crossfaded
pcm_frame = curr_samples.tobytes()
big['apply_crossfade'] = False
lc3_frame = big['encoder'].encode(
pcm_frame, num_bytes=big['lc3_bytes_per_frame'], bit_depth=big['pcm_bit_depth']
)
if enable_drift_compensation:
big['prev_pcm_frame'] = pcm_frame
if big.get('logged_first_lc3') is not True:
try:
logging.info(f"First LC3 frame size={len(lc3_frame)} bytes")
except Exception:
pass
big['logged_first_lc3'] = True
lc3_frame = big['encoder'].encode(pcm_frame, num_bytes=big['lc3_bytes_per_frame'], bit_depth=big['pcm_bit_depth'])
await big['iso_queue'].write(lc3_frame)
if big.get('logged_first_lc3') is not True:
try:
logging.info(f"First LC3 frame size={len(lc3_frame)} bytes")
except Exception:
pass
big['logged_first_lc3'] = True
await big['iso_queue'].write(lc3_frame)
except asyncio.CancelledError:
raise
except Exception as e:
if not big.get('logged_exception'):
logging.error(f"Exception in stream loop for BIG {i}: {e}", exc_info=True)
big['logged_exception'] = True
frame_count += 1
# Increment guard counter (tracks frames since last discard)
frames_since_last_discard += 1