diff --git a/src/auracast/multicast.py b/src/auracast/multicast.py index 6c8012c..5128d4c 100644 --- a/src/auracast/multicast.py +++ b/src/auracast/multicast.py @@ -713,126 +713,110 @@ class Streamer(): f"Stream loop enter: input={type(big['audio_input']).__name__} lc3_frame_samples={big.get('lc3_frame_samples')} bytes_per_frame={big.get('lc3_bytes_per_frame')}" ) big['logged_loop_enter'] = True - if big['precoded']:# everything was already lc3 coded beforehand - lc3_frame = bytes( - itertools.islice(big['lc3_frames'], big['lc3_bytes_per_frame']) - ) + try: + if big['precoded']: + # everything was already lc3 coded beforehand + lc3_frame = bytes(itertools.islice(big['lc3_frames'], big['lc3_bytes_per_frame'])) + if lc3_frame == b'': + # Not all streams may stop at the same time + stream_finished[i] = True + continue + else: + # code lc3 on the fly + # Use stored frames generator when available so we can aclose() it on stop + frames_gen = big.get('frames_gen') + if frames_gen is None: + logging.info("Creating frames generator for input") + frames_gen = big['audio_input'].frames(big['lc3_frame_samples']) + big['frames_gen'] = frames_gen + # Read the frame we need for encoding + logging.info("Awaiting next PCM frame from frames_gen…") + pcm_frame = await anext(frames_gen, None) - if lc3_frame == b'': # Not all streams may stop at the same time - stream_finished[i] = True - continue - else: # code lc3 on the fly - # Use stored frames generator when available so we can aclose() it on stop - frames_gen = big.get('frames_gen') - if frames_gen is None: - frames_gen = big['audio_input'].frames(big['lc3_frame_samples']) - big['frames_gen'] = frames_gen - - # Read the frame we need for encoding - pcm_frame = await anext(frames_gen, None) + if big.get('logged_first_frame') is not True: + logging.info( + f"First PCM frame bytes={0 if pcm_frame is None else len(pcm_frame)} | lc3_frame_samples={big['lc3_frame_samples']} | bytes_per_frame={big['lc3_bytes_per_frame']}" + ) + big['logged_first_frame'] = True - if big.get('logged_first_frame') is not True: - logging.info( - f"First PCM frame bytes={0 if pcm_frame is None else len(pcm_frame)} | lc3_frame_samples={big['lc3_frame_samples']} | bytes_per_frame={big['lc3_bytes_per_frame']}" - ) - big['logged_first_frame'] = True + if pcm_frame is None: + # Not all streams may stop at the same time + stream_finished[i] = True + continue - if pcm_frame is None: # Not all streams may stop at the same time - stream_finished[i] = True - continue - - # Discard excess samples in buffer if above threshold (clock drift compensation) - if enable_drift_compensation and hasattr(big['audio_input'], '_stream') and big['audio_input']._stream: - sd_buffer_samples = big['audio_input']._stream.read_available - - # Guard: only allow discard if enough frames have passed since last discard - if sd_buffer_samples > drop_threshold_samples and frames_since_last_discard >= discard_guard_frames: - # Always drop a static amount (3ms) for predictable behavior - # This matches the crossfade duration better for smoother transitions - samples_to_drop = min(static_drop_samples, max(1, big['lc3_frame_samples'] - 1)) - try: - discarded_data = await anext(big['audio_input'].frames(samples_to_drop)) - samples_discarded_total += samples_to_drop - discard_events += 1 - - # Log every discard event with timing information - sample_rate = big['audio_input']._pcm_format.sample_rate - time_since_last_ms = frames_since_last_discard * 10 # Each frame is 10ms - logging.info( - f"DISCARD #{discard_events}: dropped {samples_to_drop} samples ({samples_to_drop / sample_rate * 1000:.1f}ms) | " - f"buffer was {sd_buffer_samples} samples ({sd_buffer_samples / sample_rate * 1000:.1f}ms) | " - f"since_last={frames_since_last_discard} frames ({time_since_last_ms}ms) | " - f"frame={frame_count}" - ) - - # Reset guard counter - frames_since_last_discard = 0 - # Store how much we dropped for potential adaptive crossfade - big['last_drop_samples'] = samples_to_drop - # Set flag to apply crossfade on next frame - big['apply_crossfade'] = True - - except Exception as e: - logging.error(f"Failed to discard samples: {e}") - - # Down-mix multi-channel PCM to mono for LC3 encoder if needed - if big.get('channels', 1) > 1: - if isinstance(pcm_frame, np.ndarray): - if pcm_frame.ndim > 1: - mono = pcm_frame.mean(axis=1).astype(pcm_frame.dtype) - pcm_frame = mono - else: - # Convert raw bytes to numpy, average channels, convert back + # Discard excess samples in buffer if above threshold (clock drift compensation) + if enable_drift_compensation and hasattr(big['audio_input'], '_stream') and big['audio_input']._stream: + sd_buffer_samples = big['audio_input']._stream.read_available + # Guard: only allow discard if enough frames have passed since last discard + if sd_buffer_samples > drop_threshold_samples and frames_since_last_discard >= discard_guard_frames: + # Always drop a static amount for predictable behavior + samples_to_drop = min(static_drop_samples, max(1, big['lc3_frame_samples'] - 1)) + try: + await anext(big['audio_input'].frames(samples_to_drop)) + samples_discarded_total += samples_to_drop + discard_events += 1 + sample_rate = big['audio_input']._pcm_format.sample_rate + time_since_last_ms = frames_since_last_discard * 10 # Each frame is 10ms + logging.info( + f"DISCARD #{discard_events}: dropped {samples_to_drop} samples ({samples_to_drop / sample_rate * 1000:.1f}ms) | " + f"buffer was {sd_buffer_samples} samples ({sd_buffer_samples / sample_rate * 1000:.1f}ms) | " + f"since_last={frames_since_last_discard} frames ({time_since_last_ms}ms) | " + f"frame={frame_count}" + ) + frames_since_last_discard = 0 + big['last_drop_samples'] = samples_to_drop + big['apply_crossfade'] = True + except Exception as e: + logging.error(f"Failed to discard samples: {e}") + + # Down-mix multi-channel PCM to mono for LC3 encoder if needed + if big.get('channels', 1) > 1: + if isinstance(pcm_frame, np.ndarray): + if pcm_frame.ndim > 1: + mono = pcm_frame.mean(axis=1).astype(pcm_frame.dtype) + pcm_frame = mono + else: + # Convert raw bytes to numpy, average channels, convert back + dtype = np.int16 if big['pcm_bit_depth'] == 16 else np.float32 + samples = np.frombuffer(pcm_frame, dtype=dtype) + samples = samples.reshape(-1, big['channels']).mean(axis=1) + pcm_frame = samples.astype(dtype).tobytes() + + # Apply crossfade if samples were just dropped (drift compensation) + if big.get('apply_crossfade') and big.get('prev_pcm_frame') is not None: dtype = np.int16 if big['pcm_bit_depth'] == 16 else np.float32 - samples = np.frombuffer(pcm_frame, dtype=dtype) - samples = samples.reshape(-1, big['channels']).mean(axis=1) - pcm_frame = samples.astype(dtype).tobytes() - - # Apply crossfade if samples were just dropped (drift compensation) - if big.get('apply_crossfade') and big.get('prev_pcm_frame') is not None: - # Crossfade duration: 10ms for smoother transition (was 5ms) - dtype = np.int16 if big['pcm_bit_depth'] == 16 else np.float32 - sample_rate = big['audio_input']._pcm_format.sample_rate - crossfade_samples = min(int(sample_rate * 0.010), big['lc3_frame_samples'] // 2) - - # Convert frames to numpy arrays (make writable copies) - prev_samples = np.frombuffer(big['prev_pcm_frame'], dtype=dtype).copy() - curr_samples = np.frombuffer(pcm_frame, dtype=dtype).copy() - - # Create equal-power crossfade curves (smoother than linear) - # Equal-power maintains perceived loudness during transition - t = np.linspace(0, 1, crossfade_samples) - fade_out = np.cos(t * np.pi / 2) # Cosine fade out - fade_in = np.sin(t * np.pi / 2) # Sine fade in - - # Apply crossfade to the beginning of current frame with end of previous frame - if len(prev_samples) >= crossfade_samples and len(curr_samples) >= crossfade_samples: - crossfaded = ( - prev_samples[-crossfade_samples:] * fade_out + - curr_samples[:crossfade_samples] * fade_in - ).astype(dtype) - # Replace beginning of current frame with crossfaded section - curr_samples[:crossfade_samples] = crossfaded - pcm_frame = curr_samples.tobytes() - - big['apply_crossfade'] = False - - # Store current frame for potential next crossfade - if enable_drift_compensation: - big['prev_pcm_frame'] = pcm_frame + sample_rate = big['audio_input']._pcm_format.sample_rate + crossfade_samples = min(int(sample_rate * 0.010), big['lc3_frame_samples'] // 2) + prev_samples = np.frombuffer(big['prev_pcm_frame'], dtype=dtype).copy() + curr_samples = np.frombuffer(pcm_frame, dtype=dtype).copy() + t = np.linspace(0, 1, crossfade_samples) + fade_out = np.cos(t * np.pi / 2) + fade_in = np.sin(t * np.pi / 2) + if len(prev_samples) >= crossfade_samples and len(curr_samples) >= crossfade_samples: + crossfaded = (prev_samples[-crossfade_samples:] * fade_out + curr_samples[:crossfade_samples] * fade_in).astype(dtype) + curr_samples[:crossfade_samples] = crossfaded + pcm_frame = curr_samples.tobytes() + big['apply_crossfade'] = False - lc3_frame = big['encoder'].encode( - pcm_frame, num_bytes=big['lc3_bytes_per_frame'], bit_depth=big['pcm_bit_depth'] - ) + if enable_drift_compensation: + big['prev_pcm_frame'] = pcm_frame - if big.get('logged_first_lc3') is not True: - try: - logging.info(f"First LC3 frame size={len(lc3_frame)} bytes") - except Exception: - pass - big['logged_first_lc3'] = True + lc3_frame = big['encoder'].encode(pcm_frame, num_bytes=big['lc3_bytes_per_frame'], bit_depth=big['pcm_bit_depth']) - await big['iso_queue'].write(lc3_frame) + if big.get('logged_first_lc3') is not True: + try: + logging.info(f"First LC3 frame size={len(lc3_frame)} bytes") + except Exception: + pass + big['logged_first_lc3'] = True + + await big['iso_queue'].write(lc3_frame) + except asyncio.CancelledError: + raise + except Exception as e: + if not big.get('logged_exception'): + logging.error(f"Exception in stream loop for BIG {i}: {e}", exc_info=True) + big['logged_exception'] = True frame_count += 1 # Increment guard counter (tracks frames since last discard) frames_since_last_discard += 1