Compare commits
1 Commit
2d53e6130b
...
a196bcd2fc
| Author | SHA1 | Date | |
|---|---|---|---|
| | a196bcd2fc | | |
@@ -132,13 +132,10 @@ class ModWaveAudioInput(audio_io.ThreadedAudioInput):
|
||||
return b''
|
||||
|
||||
pcm_samples = self._wav.readframes(frame_size)
|
||||
if self._bytes_read == 0:
|
||||
logging.info(f"WaveAudioInput: first read requested frame_size={frame_size} -> got {len(pcm_samples)} bytes")
|
||||
if not pcm_samples and self._bytes_read:
|
||||
if not self.rewind:
|
||||
return None
|
||||
# Loop around.
|
||||
logging.info("WaveAudioInput: EOF reached, rewinding to start")
|
||||
self._wav.rewind()
|
||||
self._bytes_read = 0
|
||||
pcm_samples = self._wav.readframes(frame_size)
|
||||
@@ -600,9 +597,6 @@ class Streamer():
|
||||
for attempt in range(1, max_attempts + 1):
|
||||
try:
|
||||
pcm_format = await audio_input.open()
|
||||
logging.info(
|
||||
f"Opened audio input: {type(audio_input).__name__} src={audio_source} sr={pcm_format.sample_rate} ch={pcm_format.channels}"
|
||||
)
|
||||
break # success
|
||||
except _sd.PortAudioError as err:
|
||||
# -9985 == paDeviceUnavailable
|
||||
@@ -680,7 +674,12 @@ class Streamer():
|
||||
self.is_streaming = True
|
||||
|
||||
# frame drop algo parameters
|
||||
sample_rate = big['audio_input']._pcm_format.sample_rate
|
||||
# In demo/precoded modes there may be no audio_input or no _pcm_format yet
|
||||
ai = big.get('audio_input')
|
||||
if ai is not None and hasattr(ai, '_pcm_format') and getattr(ai, '_pcm_format') is not None:
|
||||
sample_rate = ai._pcm_format.sample_rate
|
||||
else:
|
||||
sample_rate = global_config.auracast_sampling_rate_hz
|
||||
samples_discarded_total = 0 # Total samples discarded
|
||||
discard_events = 0 # Number of times we discarded samples
|
||||
frames_since_last_discard = 999 # Guard: frames since last discard (start high to allow first drop)
|
||||
@@ -703,144 +702,127 @@ class Streamer():
|
||||
last_stats_log = time.perf_counter()
|
||||
stats_interval = 5.0 # Log stats every 5 seconds
|
||||
frame_count = 0
|
||||
# Prime inputs: read one frame from each non-precoded input to verify data flow
|
||||
try:
|
||||
for j, _big in enumerate(bigs.values()):
|
||||
if not _big.get('precoded'):
|
||||
gen = _big.get('frames_gen')
|
||||
if gen is None:
|
||||
gen = _big['audio_input'].frames(_big['lc3_frame_samples'])
|
||||
_big['frames_gen'] = gen
|
||||
test_frame = await anext(gen, None)
|
||||
logging.info(
|
||||
f"Prime read BIG{j}: bytes={0 if test_frame is None else len(test_frame)} samples={_big['lc3_frame_samples']}"
|
||||
)
|
||||
# Store for crossfade if needed
|
||||
if enable_drift_compensation and test_frame is not None:
|
||||
_big['prev_pcm_frame'] = test_frame
|
||||
except Exception as e:
|
||||
logging.error(f"Prime read failed: {e}", exc_info=True)
|
||||
|
||||
# One streamer fits all
|
||||
while self.is_streaming:
|
||||
stream_finished = [False for _ in range(len(bigs))]
|
||||
for i, big in enumerate(bigs.values()):
|
||||
if big.get('logged_loop_enter') is not True:
|
||||
logging.info(
|
||||
f"Stream loop enter: input={type(big['audio_input']).__name__} lc3_frame_samples={big.get('lc3_frame_samples')} bytes_per_frame={big.get('lc3_bytes_per_frame')}"
|
||||
)
|
||||
big['logged_loop_enter'] = True
|
||||
try:
|
||||
if big['precoded']:
|
||||
# everything was already lc3 coded beforehand
|
||||
lc3_frame = bytes(itertools.islice(big['lc3_frames'], big['lc3_bytes_per_frame']))
|
||||
if lc3_frame == b'':
|
||||
# Not all streams may stop at the same time
|
||||
stream_finished[i] = True
|
||||
continue
|
||||
else:
|
||||
# code lc3 on the fly
|
||||
# Use stored frames generator when available so we can aclose() it on stop
|
||||
frames_gen = big.get('frames_gen')
|
||||
if frames_gen is None:
|
||||
logging.info("Creating frames generator for input")
|
||||
frames_gen = big['audio_input'].frames(big['lc3_frame_samples'])
|
||||
big['frames_gen'] = frames_gen
|
||||
# Read the frame we need for encoding
|
||||
logging.info("Awaiting next PCM frame from frames_gen…")
|
||||
pcm_frame = await anext(frames_gen, None)
|
||||
if big['precoded']:# everything was already lc3 coded beforehand
|
||||
lc3_frame = bytes(
|
||||
itertools.islice(big['lc3_frames'], big['lc3_bytes_per_frame'])
|
||||
)
|
||||
|
||||
if big.get('logged_first_frame') is not True:
|
||||
logging.info(
|
||||
f"First PCM frame bytes={0 if pcm_frame is None else len(pcm_frame)} | lc3_frame_samples={big['lc3_frame_samples']} | bytes_per_frame={big['lc3_bytes_per_frame']}"
|
||||
)
|
||||
big['logged_first_frame'] = True
|
||||
if lc3_frame == b'': # Not all streams may stop at the same time
|
||||
stream_finished[i] = True
|
||||
continue
|
||||
else: # code lc3 on the fly
|
||||
# Use stored frames generator when available so we can aclose() it on stop
|
||||
frames_gen = big.get('frames_gen')
|
||||
if frames_gen is None:
|
||||
frames_gen = big['audio_input'].frames(big['lc3_frame_samples'])
|
||||
big['frames_gen'] = frames_gen
|
||||
|
||||
# Read the frame we need for encoding
|
||||
pcm_frame = await anext(frames_gen, None)
|
||||
|
||||
if pcm_frame is None:
|
||||
# Not all streams may stop at the same time
|
||||
stream_finished[i] = True
|
||||
continue
|
||||
|
||||
# Discard excess samples in buffer if above threshold (clock drift compensation)
|
||||
if enable_drift_compensation and hasattr(big['audio_input'], '_stream') and big['audio_input']._stream:
|
||||
sd_buffer_samples = big['audio_input']._stream.read_available
|
||||
# Guard: only allow discard if enough frames have passed since last discard
|
||||
if sd_buffer_samples > drop_threshold_samples and frames_since_last_discard >= discard_guard_frames:
|
||||
# Always drop a static amount for predictable behavior
|
||||
samples_to_drop = min(static_drop_samples, max(1, big['lc3_frame_samples'] - 1))
|
||||
try:
|
||||
await anext(big['audio_input'].frames(samples_to_drop))
|
||||
samples_discarded_total += samples_to_drop
|
||||
discard_events += 1
|
||||
sample_rate = big['audio_input']._pcm_format.sample_rate
|
||||
time_since_last_ms = frames_since_last_discard * 10 # Each frame is 10ms
|
||||
logging.info(
|
||||
f"DISCARD #{discard_events}: dropped {samples_to_drop} samples ({samples_to_drop / sample_rate * 1000:.1f}ms) | "
|
||||
f"buffer was {sd_buffer_samples} samples ({sd_buffer_samples / sample_rate * 1000:.1f}ms) | "
|
||||
f"since_last={frames_since_last_discard} frames ({time_since_last_ms}ms) | "
|
||||
f"frame={frame_count}"
|
||||
)
|
||||
frames_since_last_discard = 0
|
||||
big['last_drop_samples'] = samples_to_drop
|
||||
big['apply_crossfade'] = True
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to discard samples: {e}")
|
||||
|
||||
# Down-mix multi-channel PCM to mono for LC3 encoder if needed
|
||||
if big.get('channels', 1) > 1:
|
||||
if isinstance(pcm_frame, np.ndarray):
|
||||
if pcm_frame.ndim > 1:
|
||||
mono = pcm_frame.mean(axis=1).astype(pcm_frame.dtype)
|
||||
pcm_frame = mono
|
||||
else:
|
||||
# Convert raw bytes to numpy, average channels, convert back
|
||||
dtype = np.int16 if big['pcm_bit_depth'] == 16 else np.float32
|
||||
samples = np.frombuffer(pcm_frame, dtype=dtype)
|
||||
samples = samples.reshape(-1, big['channels']).mean(axis=1)
|
||||
pcm_frame = samples.astype(dtype).tobytes()
|
||||
|
||||
# Apply crossfade if samples were just dropped (drift compensation)
|
||||
if big.get('apply_crossfade') and big.get('prev_pcm_frame') is not None:
|
||||
if pcm_frame is None: # Not all streams may stop at the same time
|
||||
stream_finished[i] = True
|
||||
continue
|
||||
|
||||
# Discard excess samples in buffer if above threshold (clock drift compensation)
|
||||
if enable_drift_compensation and hasattr(big['audio_input'], '_stream') and big['audio_input']._stream:
|
||||
sd_buffer_samples = big['audio_input']._stream.read_available
|
||||
|
||||
# Guard: only allow discard if enough frames have passed since last discard
|
||||
if sd_buffer_samples > drop_threshold_samples and frames_since_last_discard >= discard_guard_frames:
|
||||
# Always drop a static amount (3ms) for predictable behavior
|
||||
# This matches the crossfade duration better for smoother transitions
|
||||
samples_to_drop = min(static_drop_samples, max(1, big['lc3_frame_samples'] - 1))
|
||||
try:
|
||||
discarded_data = await anext(big['audio_input'].frames(samples_to_drop))
|
||||
samples_discarded_total += samples_to_drop
|
||||
discard_events += 1
|
||||
|
||||
# Log every discard event with timing information
|
||||
sample_rate = big['audio_input']._pcm_format.sample_rate
|
||||
time_since_last_ms = frames_since_last_discard * 10 # Each frame is 10ms
|
||||
logging.info(
|
||||
f"DISCARD #{discard_events}: dropped {samples_to_drop} samples ({samples_to_drop / sample_rate * 1000:.1f}ms) | "
|
||||
f"buffer was {sd_buffer_samples} samples ({sd_buffer_samples / sample_rate * 1000:.1f}ms) | "
|
||||
f"since_last={frames_since_last_discard} frames ({time_since_last_ms}ms) | "
|
||||
f"frame={frame_count}"
|
||||
)
|
||||
|
||||
# Reset guard counter
|
||||
frames_since_last_discard = 0
|
||||
# Store how much we dropped for potential adaptive crossfade
|
||||
big['last_drop_samples'] = samples_to_drop
|
||||
# Set flag to apply crossfade on next frame
|
||||
big['apply_crossfade'] = True
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to discard samples: {e}")
|
||||
|
||||
# Down-mix multi-channel PCM to mono for LC3 encoder if needed
|
||||
if big.get('channels', 1) > 1:
|
||||
if isinstance(pcm_frame, np.ndarray):
|
||||
if pcm_frame.ndim > 1:
|
||||
mono = pcm_frame.mean(axis=1).astype(pcm_frame.dtype)
|
||||
pcm_frame = mono
|
||||
else:
|
||||
# Convert raw bytes to numpy, average channels, convert back
|
||||
dtype = np.int16 if big['pcm_bit_depth'] == 16 else np.float32
|
||||
sample_rate = big['audio_input']._pcm_format.sample_rate
|
||||
crossfade_samples = min(int(sample_rate * 0.010), big['lc3_frame_samples'] // 2)
|
||||
prev_samples = np.frombuffer(big['prev_pcm_frame'], dtype=dtype).copy()
|
||||
curr_samples = np.frombuffer(pcm_frame, dtype=dtype).copy()
|
||||
t = np.linspace(0, 1, crossfade_samples)
|
||||
fade_out = np.cos(t * np.pi / 2)
|
||||
fade_in = np.sin(t * np.pi / 2)
|
||||
if len(prev_samples) >= crossfade_samples and len(curr_samples) >= crossfade_samples:
|
||||
crossfaded = (prev_samples[-crossfade_samples:] * fade_out + curr_samples[:crossfade_samples] * fade_in).astype(dtype)
|
||||
curr_samples[:crossfade_samples] = crossfaded
|
||||
pcm_frame = curr_samples.tobytes()
|
||||
big['apply_crossfade'] = False
|
||||
samples = np.frombuffer(pcm_frame, dtype=dtype)
|
||||
samples = samples.reshape(-1, big['channels']).mean(axis=1)
|
||||
pcm_frame = samples.astype(dtype).tobytes()
|
||||
|
||||
# Apply crossfade if samples were just dropped (drift compensation)
|
||||
if big.get('apply_crossfade') and big.get('prev_pcm_frame') is not None:
|
||||
# Crossfade duration: 10ms for smoother transition (was 5ms)
|
||||
dtype = np.int16 if big['pcm_bit_depth'] == 16 else np.float32
|
||||
sample_rate = big['audio_input']._pcm_format.sample_rate
|
||||
crossfade_samples = min(int(sample_rate * 0.010), big['lc3_frame_samples'] // 2)
|
||||
|
||||
# Convert frames to numpy arrays (make writable copies)
|
||||
prev_samples = np.frombuffer(big['prev_pcm_frame'], dtype=dtype).copy()
|
||||
curr_samples = np.frombuffer(pcm_frame, dtype=dtype).copy()
|
||||
|
||||
# Create equal-power crossfade curves (smoother than linear)
|
||||
# Equal-power maintains perceived loudness during transition
|
||||
t = np.linspace(0, 1, crossfade_samples)
|
||||
fade_out = np.cos(t * np.pi / 2) # Cosine fade out
|
||||
fade_in = np.sin(t * np.pi / 2) # Sine fade in
|
||||
|
||||
# Apply crossfade to the beginning of current frame with end of previous frame
|
||||
if len(prev_samples) >= crossfade_samples and len(curr_samples) >= crossfade_samples:
|
||||
crossfaded = (
|
||||
prev_samples[-crossfade_samples:] * fade_out +
|
||||
curr_samples[:crossfade_samples] * fade_in
|
||||
).astype(dtype)
|
||||
# Replace beginning of current frame with crossfaded section
|
||||
curr_samples[:crossfade_samples] = crossfaded
|
||||
pcm_frame = curr_samples.tobytes()
|
||||
|
||||
big['apply_crossfade'] = False
|
||||
|
||||
# Store current frame for potential next crossfade
|
||||
if enable_drift_compensation:
|
||||
big['prev_pcm_frame'] = pcm_frame
|
||||
|
||||
if enable_drift_compensation:
|
||||
big['prev_pcm_frame'] = pcm_frame
|
||||
lc3_frame = big['encoder'].encode(
|
||||
pcm_frame, num_bytes=big['lc3_bytes_per_frame'], bit_depth=big['pcm_bit_depth']
|
||||
)
|
||||
|
||||
lc3_frame = big['encoder'].encode(pcm_frame, num_bytes=big['lc3_bytes_per_frame'], bit_depth=big['pcm_bit_depth'])
|
||||
|
||||
if big.get('logged_first_lc3') is not True:
|
||||
try:
|
||||
logging.info(f"First LC3 frame size={len(lc3_frame)} bytes")
|
||||
except Exception:
|
||||
pass
|
||||
big['logged_first_lc3'] = True
|
||||
|
||||
await big['iso_queue'].write(lc3_frame)
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception as e:
|
||||
if not big.get('logged_exception'):
|
||||
logging.error(f"Exception in stream loop for BIG {i}: {e}", exc_info=True)
|
||||
big['logged_exception'] = True
|
||||
await big['iso_queue'].write(lc3_frame)
|
||||
frame_count += 1
|
||||
# Increment guard counter (tracks frames since last discard)
|
||||
frames_since_last_discard += 1
|
||||
|
||||
# Periodic stats logging
|
||||
# Periodic stats logging (only for device/sounddevice streams, not WAV files)
|
||||
# WAV file concurrent access causes deadlock in ThreadedAudioInput
|
||||
now = time.perf_counter()
|
||||
if now - last_stats_log >= stats_interval:
|
||||
is_device_stream = hasattr(big['audio_input'], '_stream') and big['audio_input']._stream is not None
|
||||
if is_device_stream and now - last_stats_log >= stats_interval:
|
||||
# Get current buffer status from PortAudio
|
||||
current_sd_buffer = 0
|
||||
if hasattr(big['audio_input'], '_stream') and big['audio_input']._stream:
|
||||
|
||||
@@ -205,27 +205,6 @@ class StreamerWorker: # TODO: is wraping in this Worker stricly nececcarry ?
|
||||
for big in conf.bigs:
|
||||
big.input_format = f"int16le,{capture_rate},{channels}"
|
||||
|
||||
# Resolve relative file: paths to absolute paths (robust against varying cwd)
|
||||
try:
|
||||
server_dir = os.path.dirname(__file__)
|
||||
pkg_root = os.path.dirname(os.path.dirname(__file__)) # .../auracast
|
||||
for big in conf.bigs:
|
||||
if isinstance(big.audio_source, str) and big.audio_source.startswith('file:'):
|
||||
rel = big.audio_source.split(':', 1)[1]
|
||||
if not os.path.isabs(rel):
|
||||
candidates = [
|
||||
os.path.normpath(os.path.join(server_dir, rel)),
|
||||
os.path.normpath(os.path.join(pkg_root, rel)),
|
||||
]
|
||||
resolved = next((c for c in candidates if os.path.exists(c)), None)
|
||||
if resolved:
|
||||
log.info("Resolved demo file path '%s' -> '%s'", rel, resolved)
|
||||
big.audio_source = f"file:{resolved}"
|
||||
else:
|
||||
raise FileNotFoundError(rel)
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=400, detail=f"Demo audio file not found or unreadable: {e}")
|
||||
|
||||
# Coerce QoS: compute max_transport_latency from RTN if qos_config present
|
||||
if getattr(conf, 'qos_config', None) and getattr(conf.qos_config, 'number_of_retransmissions', None) is not None:
|
||||
conf.qos_config.max_transport_latency_ms = int(conf.qos_config.number_of_retransmissions) * 10 + 3
|
||||
@@ -300,26 +279,6 @@ class StreamerWorker: # TODO: is wraping in this Worker stricly nececcarry ?
|
||||
if device_index is None:
|
||||
raise HTTPException(status_code=400, detail=f"Audio device '{device_name}' not found.")
|
||||
big.audio_source = f'device:{device_index}'
|
||||
# Resolve relative file: paths to absolute paths (robust against varying cwd)
|
||||
try:
|
||||
server_dir = os.path.dirname(__file__)
|
||||
pkg_root = os.path.dirname(os.path.dirname(__file__)) # .../auracast
|
||||
for big in conf.bigs:
|
||||
if isinstance(big.audio_source, str) and big.audio_source.startswith('file:'):
|
||||
rel = big.audio_source.split(':', 1)[1]
|
||||
if not os.path.isabs(rel):
|
||||
candidates = [
|
||||
os.path.normpath(os.path.join(server_dir, rel)),
|
||||
os.path.normpath(os.path.join(pkg_root, rel)),
|
||||
]
|
||||
resolved = next((c for c in candidates if os.path.exists(c)), None)
|
||||
if resolved:
|
||||
log.info("Resolved demo file path '%s' -> '%s'", rel, resolved)
|
||||
big.audio_source = f"file:{resolved}"
|
||||
else:
|
||||
raise FileNotFoundError(rel)
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=400, detail=f"Demo audio file not found or unreadable: {e}")
|
||||
# Coerce QoS: compute max_transport_latency from RTN if qos_config present
|
||||
if getattr(conf, 'qos_config', None) and getattr(conf.qos_config, 'number_of_retransmissions', None) is not None:
|
||||
conf.qos_config.max_transport_latency_ms = int(conf.qos_config.number_of_retransmissions) * 10 + 3
|
||||
|
||||
Reference in New Issue
Block a user