From 6d835bf1be1b75cfb100a8da3838cd9dddd5357d Mon Sep 17 00:00:00 2001 From: pstruebi Date: Wed, 8 Oct 2025 14:27:58 +0200 Subject: [PATCH] feat: add clock drift compensation to discard excess samples when buffer exceeds threshold --- src/auracast/auracast_config.py | 3 + src/auracast/multicast.py | 83 +++++++++++++++++++++++-- src/auracast/multicast_script.py | 11 ++-- src/auracast/server/multicast_server.py | 4 +- 4 files changed, 91 insertions(+), 10 deletions(-) diff --git a/src/auracast/auracast_config.py b/src/auracast/auracast_config.py index 2971dea..06d855f 100644 --- a/src/auracast/auracast_config.py +++ b/src/auracast/auracast_config.py @@ -40,6 +40,9 @@ class AuracastGlobalConfig(BaseModel): # so receivers may render earlier than the presentation delay for lower latency. immediate_rendering: bool = False assisted_listening_stream: bool = False + # Clock drift compensation: discard excess samples when buffer exceeds threshold + enable_drift_compensation: bool = False + drift_threshold_ms: float = 2.0 # Discard threshold in milliseconds # "Audio input. " # "'device' -> use the host's default sound input device, " diff --git a/src/auracast/multicast.py b/src/auracast/multicast.py index 93792e5..f606dcd 100644 --- a/src/auracast/multicast.py +++ b/src/auracast/multicast.py @@ -26,11 +26,9 @@ import struct from typing import cast, Any, AsyncGenerator, Coroutine, List import itertools import glob +import time -try: - import lc3 # type: ignore # pylint: disable=E0401 -except ImportError as e: - raise ImportError("Try `python -m pip install \".[lc3]\"`.") from e +import lc3 # type: ignore # pylint: disable=E0401 from bumble.colors import color from bumble import company_ids @@ -623,6 +621,25 @@ class Streamer(): logging.info("Streaming audio...") bigs = self.bigs self.is_streaming = True + + # Sample discard stats (clock drift compensation) + samples_discarded_total = 0 # Total samples discarded + discard_events = 0 # Number of times we discarded samples + enable_drift_compensation = global_config.enable_drift_compensation + # Calculate threshold based on config (default 2ms) + drift_threshold_ms = global_config.drift_threshold_ms if enable_drift_compensation else 0 + drop_threshold_samples = 0 + + if enable_drift_compensation: + logging.info(f"Clock drift compensation ENABLED: threshold={drift_threshold_ms}ms") + else: + logging.info("Clock drift compensation DISABLED") + + # Periodic monitoring + last_stats_log = time.perf_counter() + stats_interval = 5.0 # Log stats every 5 seconds + frame_count = 0 + # One streamer fits all while self.is_streaming: stream_finished = [False for _ in range(len(bigs))] @@ -641,12 +658,39 @@ class Streamer(): if frames_gen is None: frames_gen = big['audio_input'].frames(big['lc3_frame_samples']) big['frames_gen'] = frames_gen + + # Read the frame we need for encoding pcm_frame = await anext(frames_gen, None) if pcm_frame is None: # Not all streams may stop at the same time stream_finished[i] = True continue - + + # Calculate threshold samples based on sample rate (only once per BIG) + if enable_drift_compensation and drop_threshold_samples == 0: + sample_rate = big['audio_input']._pcm_format.sample_rate + drop_threshold_samples = int(sample_rate * drift_threshold_ms / 1000.0) + logging.info(f"Drift compensation threshold: {drop_threshold_samples} samples ({drift_threshold_ms}ms @ {sample_rate}Hz)") + + # Discard excess samples in buffer if above threshold (clock drift compensation) + if enable_drift_compensation and hasattr(big['audio_input'], '_stream') and big['audio_input']._stream: + sd_buffer_samples = big['audio_input']._stream.read_available + + if sd_buffer_samples > drop_threshold_samples: + # Discard ALL remaining samples to bring buffer back down + try: + discarded_data = big['audio_input']._stream.read(sd_buffer_samples) + samples_discarded_total += sd_buffer_samples + discard_events += 1 + + if discard_events % 100 == 0: # Log every 100th discard + logging.warning( + f"Discard #{discard_events}: {sd_buffer_samples} samples ({sd_buffer_samples / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms) " + f"| total discarded: {samples_discarded_total} samples" + ) + except Exception as e: + logging.error(f"Failed to discard samples: {e}") + # Down-mix multi-channel PCM to mono for LC3 encoder if needed if big.get('channels', 1) > 1: if isinstance(pcm_frame, np.ndarray): @@ -665,6 +709,35 @@ class Streamer(): ) await big['iso_queue'].write(lc3_frame) + frame_count += 1 + + # Periodic stats logging + now = time.perf_counter() + if now - last_stats_log >= stats_interval: + # Get current buffer status + current_sd_buffer = 0 + if hasattr(big['audio_input'], '_stream') and big['audio_input']._stream: + try: + current_sd_buffer = big['audio_input']._stream.read_available + except Exception: + pass + + if enable_drift_compensation: + avg_discard_per_event = (samples_discarded_total / discard_events) if discard_events > 0 else 0.0 + discard_event_rate = (discard_events / frame_count * 100) if frame_count > 0 else 0.0 + logging.info( + f"STATS: frames={frame_count} | discard_events={discard_events} ({discard_event_rate:.1f}%) | " + f"avg_discard={avg_discard_per_event:.0f} samples/event | " + f"SD_buffer={current_sd_buffer} samples ({current_sd_buffer / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms) | " + f"threshold={drop_threshold_samples} samples ({drop_threshold_samples / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms)" + ) + else: + logging.info( + f"STATS: frames={frame_count} | " + f"SD_buffer={current_sd_buffer} samples ({current_sd_buffer / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms) | " + f"drift_compensation=DISABLED" + ) + last_stats_log = now if all(stream_finished): # Take into account that multiple files have different lengths logging.info('All streams finished, stopping streamer') diff --git a/src/auracast/multicast_script.py b/src/auracast/multicast_script.py index 553cebe..b0eb1cf 100644 --- a/src/auracast/multicast_script.py +++ b/src/auracast/multicast_script.py @@ -61,7 +61,7 @@ if __name__ == "__main__": # Load .env located next to this script (only uppercase keys will be referenced) load_dotenv(dotenv_path='.env') - os.environ.setdefault("PULSE_LATENCY_MSEC", "3") + os.environ.setdefault("PULSE_LATENCY_MSEC", "2") # Refresh device cache and list inputs refresh_pw_cache() @@ -150,14 +150,17 @@ if __name__ == "__main__": ), #auracast_config.AuracastBigConfigEng(), ], - immediate_rendering=True, + immediate_rendering=False, presentation_delay_us=40000, qos_config=auracast_config.AuracastQosHigh(), auracast_sampling_rate_hz = LC3_SRATE, octets_per_frame = OCTETS_PER_FRAME, - transport=TRANSPORT1 + transport=TRANSPORT1, + enable_drift_compensation=True, + drift_threshold_ms=2.0 ) - #config.debug = True + config.debug = False + logging.info(config.model_dump_json(indent=2)) multicast.run_async( diff --git a/src/auracast/server/multicast_server.py b/src/auracast/server/multicast_server.py index a2b1b88..8c1c52e 100644 --- a/src/auracast/server/multicast_server.py +++ b/src/auracast/server/multicast_server.py @@ -42,7 +42,7 @@ TRANSPORT2 = os.getenv('TRANSPORT2', 'serial:/dev/ttyAMA4,1000000,rtscts') # tr PTIME = 40 # seems to have no effect at all pcs: Set[RTCPeerConnection] = set() # keep refs so they don’t GC early -os.environ["PULSE_LATENCY_MSEC"] = "3" +os.environ["PULSE_LATENCY_MSEC"] = "2" # In-memory cache to avoid disk I/O on hot paths like /status SETTINGS_CACHE: dict = {} @@ -172,7 +172,9 @@ class StreamerWorker: pass self._multicaster1 = None + # overwrite some configurations conf.transport = TRANSPORT1 + conf.enable_drift_compensation=True # Derive device name and input mode first_source = conf.bigs[0].audio_source if conf.bigs else '' input_device_name = None