feat: add clock drift compensation to discard excess samples when buffer exceeds threshold

This commit is contained in:
pstruebi
2025-10-08 14:27:58 +02:00
parent 37984b028b
commit 6d835bf1be
4 changed files with 91 additions and 10 deletions

View File

@@ -40,6 +40,9 @@ class AuracastGlobalConfig(BaseModel):
# so receivers may render earlier than the presentation delay for lower latency.
immediate_rendering: bool = False
assisted_listening_stream: bool = False
# Clock drift compensation: discard excess samples when buffer exceeds threshold
enable_drift_compensation: bool = False
drift_threshold_ms: float = 2.0 # Discard threshold in milliseconds
# "Audio input. "
# "'device' -> use the host's default sound input device, "

View File

@@ -26,11 +26,9 @@ import struct
from typing import cast, Any, AsyncGenerator, Coroutine, List
import itertools
import glob
import time
try:
import lc3 # type: ignore # pylint: disable=E0401
except ImportError as e:
raise ImportError("Try `python -m pip install \".[lc3]\"`.") from e
import lc3 # type: ignore # pylint: disable=E0401
from bumble.colors import color
from bumble import company_ids
@@ -623,6 +621,25 @@ class Streamer():
logging.info("Streaming audio...")
bigs = self.bigs
self.is_streaming = True
# Sample discard stats (clock drift compensation)
samples_discarded_total = 0 # Total samples discarded
discard_events = 0 # Number of times we discarded samples
enable_drift_compensation = global_config.enable_drift_compensation
# Calculate threshold based on config (default 2ms)
drift_threshold_ms = global_config.drift_threshold_ms if enable_drift_compensation else 0
drop_threshold_samples = 0
if enable_drift_compensation:
logging.info(f"Clock drift compensation ENABLED: threshold={drift_threshold_ms}ms")
else:
logging.info("Clock drift compensation DISABLED")
# Periodic monitoring
last_stats_log = time.perf_counter()
stats_interval = 5.0 # Log stats every 5 seconds
frame_count = 0
# One streamer fits all
while self.is_streaming:
stream_finished = [False for _ in range(len(bigs))]
@@ -641,12 +658,39 @@ class Streamer():
if frames_gen is None:
frames_gen = big['audio_input'].frames(big['lc3_frame_samples'])
big['frames_gen'] = frames_gen
# Read the frame we need for encoding
pcm_frame = await anext(frames_gen, None)
if pcm_frame is None: # Not all streams may stop at the same time
stream_finished[i] = True
continue
# Calculate threshold samples based on sample rate (only once per BIG)
if enable_drift_compensation and drop_threshold_samples == 0:
sample_rate = big['audio_input']._pcm_format.sample_rate
drop_threshold_samples = int(sample_rate * drift_threshold_ms / 1000.0)
logging.info(f"Drift compensation threshold: {drop_threshold_samples} samples ({drift_threshold_ms}ms @ {sample_rate}Hz)")
# Discard excess samples in buffer if above threshold (clock drift compensation)
if enable_drift_compensation and hasattr(big['audio_input'], '_stream') and big['audio_input']._stream:
sd_buffer_samples = big['audio_input']._stream.read_available
if sd_buffer_samples > drop_threshold_samples:
# Discard ALL remaining samples to bring buffer back down
try:
discarded_data = big['audio_input']._stream.read(sd_buffer_samples)
samples_discarded_total += sd_buffer_samples
discard_events += 1
if discard_events % 100 == 0: # Log every 100th discard
logging.warning(
f"Discard #{discard_events}: {sd_buffer_samples} samples ({sd_buffer_samples / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms) "
f"| total discarded: {samples_discarded_total} samples"
)
except Exception as e:
logging.error(f"Failed to discard samples: {e}")
# Down-mix multi-channel PCM to mono for LC3 encoder if needed
if big.get('channels', 1) > 1:
if isinstance(pcm_frame, np.ndarray):
@@ -665,6 +709,35 @@ class Streamer():
)
await big['iso_queue'].write(lc3_frame)
frame_count += 1
# Periodic stats logging
now = time.perf_counter()
if now - last_stats_log >= stats_interval:
# Get current buffer status
current_sd_buffer = 0
if hasattr(big['audio_input'], '_stream') and big['audio_input']._stream:
try:
current_sd_buffer = big['audio_input']._stream.read_available
except Exception:
pass
if enable_drift_compensation:
avg_discard_per_event = (samples_discarded_total / discard_events) if discard_events > 0 else 0.0
discard_event_rate = (discard_events / frame_count * 100) if frame_count > 0 else 0.0
logging.info(
f"STATS: frames={frame_count} | discard_events={discard_events} ({discard_event_rate:.1f}%) | "
f"avg_discard={avg_discard_per_event:.0f} samples/event | "
f"SD_buffer={current_sd_buffer} samples ({current_sd_buffer / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms) | "
f"threshold={drop_threshold_samples} samples ({drop_threshold_samples / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms)"
)
else:
logging.info(
f"STATS: frames={frame_count} | "
f"SD_buffer={current_sd_buffer} samples ({current_sd_buffer / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms) | "
f"drift_compensation=DISABLED"
)
last_stats_log = now
if all(stream_finished): # Take into account that multiple files have different lengths
logging.info('All streams finished, stopping streamer')

View File

@@ -61,7 +61,7 @@ if __name__ == "__main__":
# Load .env located next to this script (only uppercase keys will be referenced)
load_dotenv(dotenv_path='.env')
os.environ.setdefault("PULSE_LATENCY_MSEC", "3")
os.environ.setdefault("PULSE_LATENCY_MSEC", "2")
# Refresh device cache and list inputs
refresh_pw_cache()
@@ -150,14 +150,17 @@ if __name__ == "__main__":
),
#auracast_config.AuracastBigConfigEng(),
],
immediate_rendering=True,
immediate_rendering=False,
presentation_delay_us=40000,
qos_config=auracast_config.AuracastQosHigh(),
auracast_sampling_rate_hz = LC3_SRATE,
octets_per_frame = OCTETS_PER_FRAME,
transport=TRANSPORT1
transport=TRANSPORT1,
enable_drift_compensation=True,
drift_threshold_ms=2.0
)
#config.debug = True
config.debug = False
logging.info(config.model_dump_json(indent=2))
multicast.run_async(

View File

@@ -42,7 +42,7 @@ TRANSPORT2 = os.getenv('TRANSPORT2', 'serial:/dev/ttyAMA4,1000000,rtscts') # tr
PTIME = 40 # seems to have no effect at all
pcs: Set[RTCPeerConnection] = set() # keep refs so they dont GC early
os.environ["PULSE_LATENCY_MSEC"] = "3"
os.environ["PULSE_LATENCY_MSEC"] = "2"
# In-memory cache to avoid disk I/O on hot paths like /status
SETTINGS_CACHE: dict = {}
@@ -172,7 +172,9 @@ class StreamerWorker:
pass
self._multicaster1 = None
# overwrite some configurations
conf.transport = TRANSPORT1
conf.enable_drift_compensation=True
# Derive device name and input mode
first_source = conf.bigs[0].audio_source if conf.bigs else ''
input_device_name = None