From f356c13b98843da86a4364db688c1eacaee59413 Mon Sep 17 00:00:00 2001 From: pstruebi Date: Fri, 14 Nov 2025 16:18:58 +0100 Subject: [PATCH] refactor: consolidate dual-broadcaster state management - Removed StreamerWorker thread wrapper in favor of direct async operations with module-level multicaster instances - Split settings persistence into primary and secondary caches (SETTINGS_CACHE1/2) with dedicated load/save functions - Removed unused imports and simplified initialization logic by inlining worker coroutines into API endpoints --- src/auracast/server/multicast_server.py | 696 +++++++++++------------- src/service/auracast-server.service | 3 + 2 files changed, 323 insertions(+), 376 deletions(-) diff --git a/src/auracast/server/multicast_server.py b/src/auracast/server/multicast_server.py index 864ceac..32b5452 100644 --- a/src/auracast/server/multicast_server.py +++ b/src/auracast/server/multicast_server.py @@ -5,11 +5,7 @@ TODO: in the future the multicaster objects should run in their own threads or e import os import logging as log import json -import sys -import threading -from concurrent.futures import Future from datetime import datetime -import time import asyncio from dotenv import load_dotenv @@ -24,19 +20,21 @@ from auracast.utils.sounddevice_utils import ( resolve_input_device_index, refresh_pw_cache, ) -from auracast.utils.reset_utils import reset_nrf54l load_dotenv() # make sure pipewire sets latency -STREAM_SETTINGS_FILE = os.path.join(os.path.dirname(__file__), 'stream_settings.json') +# Primary and secondary persisted settings files +STREAM_SETTINGS_FILE1 = os.path.join(os.path.dirname(__file__), 'stream_settings.json') +STREAM_SETTINGS_FILE2 = os.path.join(os.path.dirname(__file__), 'stream_settings2.json') # Raspberry Pi UART transports TRANSPORT1 = os.getenv('TRANSPORT1', 'serial:/dev/ttyAMA3,1000000,rtscts') # transport for raspberry pi gpio header TRANSPORT2 = os.getenv('TRANSPORT2', 'serial:/dev/ttyAMA4,1000000,rtscts') # transport for raspberry pi gpio header os.environ["PULSE_LATENCY_MSEC"] = "3" -# In-memory cache to avoid disk I/O on hot paths like /status -SETTINGS_CACHE: dict = {} +# In-memory caches to avoid disk I/O on hot paths like /status +SETTINGS_CACHE1: dict = {} +SETTINGS_CACHE2: dict = {} @@ -55,39 +53,66 @@ def get_device_index_by_name(name: str): return None -def _hydrate_settings_cache_from_disk() -> None: - """Populate SETTINGS_CACHE once from disk at startup. +def _init_settings_cache_from_disk() -> None: + """Populate SETTINGS_CACHE1 and SETTINGS_CACHE2 once from disk at startup. - Safe to call multiple times; errors fall back to empty dict. + If a file doesn't exist, initialize to an empty dict. Any JSON or I/O errors raise. """ - global SETTINGS_CACHE - try: - if os.path.exists(STREAM_SETTINGS_FILE): - with open(STREAM_SETTINGS_FILE, 'r', encoding='utf-8') as f: - SETTINGS_CACHE = json.load(f) - else: - SETTINGS_CACHE = {} - except Exception: - SETTINGS_CACHE = {} + global SETTINGS_CACHE1, SETTINGS_CACHE2 + if os.path.exists(STREAM_SETTINGS_FILE1): + with open(STREAM_SETTINGS_FILE1, 'r', encoding='utf-8') as f: + SETTINGS_CACHE1 = json.load(f) + else: + SETTINGS_CACHE1 = {} + if os.path.exists(STREAM_SETTINGS_FILE2): + with open(STREAM_SETTINGS_FILE2, 'r', encoding='utf-8') as f: + SETTINGS_CACHE2 = json.load(f) + else: + SETTINGS_CACHE2 = {} def load_stream_settings() -> dict: - """Return stream settings from in-memory cache. + """Return PRIMARY stream settings from in-memory cache. - The cache is hydrated once at startup and updated by save_stream_settings(). - No disk I/O occurs here. + Hydrated once at startup and updated by save_stream_settings(). No disk I/O occurs here. """ - global SETTINGS_CACHE - return SETTINGS_CACHE + global SETTINGS_CACHE1 + return SETTINGS_CACHE1 + +def load_stream_settings2() -> dict: + """Return SECONDARY stream settings from in-memory cache.""" + global SETTINGS_CACHE2 + return SETTINGS_CACHE2 def save_stream_settings(settings: dict): - """Update in-memory settings cache and persist to disk.""" - global SETTINGS_CACHE - SETTINGS_CACHE = dict(settings) - try: - with open(STREAM_SETTINGS_FILE, 'w', encoding='utf-8') as f: - json.dump(SETTINGS_CACHE, f, indent=2) - except Exception as e: - log.error('Unable to persist stream settings: %s', e) + """Update PRIMARY in-memory settings cache and persist to disk.""" + global SETTINGS_CACHE1 + SETTINGS_CACHE1 = dict(settings) + os.makedirs(os.path.dirname(STREAM_SETTINGS_FILE1), exist_ok=True) + with open(STREAM_SETTINGS_FILE1, 'w', encoding='utf-8') as f: + json.dump(SETTINGS_CACHE1, f, indent=2) + f.flush() + os.fsync(f.fileno()) + log.info("Saved primary settings to %s", STREAM_SETTINGS_FILE1) + +def save_stream_settings2(settings: dict): + """Update SECONDARY in-memory settings cache and persist to disk.""" + global SETTINGS_CACHE2 + SETTINGS_CACHE2 = dict(settings) + os.makedirs(os.path.dirname(STREAM_SETTINGS_FILE2), exist_ok=True) + with open(STREAM_SETTINGS_FILE2, 'w', encoding='utf-8') as f: + json.dump(SETTINGS_CACHE2, f, indent=2) + f.flush() + os.fsync(f.fileno()) + log.info("Saved secondary settings to %s", STREAM_SETTINGS_FILE2) + +def save_settings(persisted: dict, secondary: bool = False) -> None: + """Attach timestamp and persist using the appropriate cache/file.""" + persisted = dict(persisted) + persisted['timestamp'] = datetime.utcnow().isoformat() + if secondary: + save_stream_settings2(persisted) + else: + save_stream_settings(persisted) app = FastAPI() @@ -104,121 +129,176 @@ app.add_middleware( # Initialize global configuration global_config_group = auracast_config.AuracastConfigGroup() -class StreamerWorker: # TODO: is wraping in this Worker stricly nececcarry ? - """Owns multicaster(s) on a dedicated asyncio loop in a background thread.""" +# Module-level state replacing StreamerWorker +multicaster1: multicast_control.Multicaster | None = None +multicaster2: multicast_control.Multicaster | None = None +_stream_lock = asyncio.Lock() # serialize initialize/stop_audio on API side - def __init__(self) -> None: - self._thread: threading.Thread | None = None - self._loop: asyncio.AbstractEventLoop | None = None - # These live only on the worker loop - self._multicaster1: multicast_control.Multicaster | None = None - self._multicaster2: multicast_control.Multicaster | None = None - self._started = threading.Event() - # ---------- Thread/loop management ---------- - def start(self) -> None: - if self._thread and self._thread.is_alive(): - return - self._thread = threading.Thread(target=self._run, name="StreamerWorker", daemon=True) - self._thread.start() - self._started.wait(timeout=5) - - def _run(self) -> None: - loop = asyncio.new_event_loop() - self._loop = loop - asyncio.set_event_loop(loop) - self._started.set() +async def _stop_all() -> bool: + global multicaster1, multicaster2 + was_running = False + if multicaster1 is not None: try: - loop.run_forever() + await multicaster1.stop_streaming() + await multicaster1.shutdown() + was_running = True finally: - try: - pending = asyncio.all_tasks(loop) - for t in pending: - t.cancel() - loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) - except Exception: - pass - loop.close() - - def _ensure_loop(self) -> asyncio.AbstractEventLoop: - if not self._loop: - raise RuntimeError("StreamerWorker loop not started") - return self._loop - - async def call(self, coro_func, *args, **kwargs): - """Schedule a coroutine on the worker loop and await its result from the API loop.""" - loop = self._ensure_loop() - fut: Future = asyncio.run_coroutine_threadsafe(coro_func(*args, **kwargs), loop) - return await asyncio.wrap_future(fut) - - # ---------- Worker-loop coroutines ---------- - async def _w_init_primary(self, conf: auracast_config.AuracastConfigGroup) -> dict: - # Clean any previous - if self._multicaster1 is not None: - try: - await self._multicaster1.shutdown() - except Exception: - pass - self._multicaster1 = None - - # overwrite some configurations - conf.transport = TRANSPORT1 - # Enable adaptive frame dropping only for device-based inputs (not file/demo) + multicaster1 = None + if multicaster2 is not None: try: + await multicaster2.stop_streaming() + await multicaster2.shutdown() + was_running = True + finally: + multicaster2 = None + return was_running + +async def _status_primary() -> dict: + if multicaster1 is None: + return {'is_initialized': False, 'is_streaming': False} + return multicaster1.get_status() + +async def _stream_lc3(audio_data: dict[str, str], bigs_template: list) -> None: + if multicaster1 is None: + raise HTTPException(status_code=500, detail='Auracast endpoint was never intialized') + for big in bigs_template: + if big.language not in audio_data: + raise HTTPException(status_code=500, detail='language len missmatch') + big.audio_source = audio_data[big.language].encode('latin-1') + multicaster1.big_conf = bigs_template + await multicaster1.start_streaming() + +@app.post("/init") +async def initialize(conf: auracast_config.AuracastConfigGroup): + """Initializes the primary broadcaster on the streamer thread.""" + global global_config_group + async with _stream_lock: + try: + global_config_group = conf + log.info('Initializing multicaster1 with config:\n %s', conf.model_dump_json(indent=2)) + # Inline of _init_primary + global multicaster1 + if multicaster1 is not None: + await multicaster1.shutdown() + multicaster1 = None + + conf.transport = TRANSPORT1 conf.enable_adaptive_frame_dropping = any( isinstance(big.audio_source, str) and big.audio_source.startswith('device:') for big in conf.bigs ) - except Exception: - conf.enable_adaptive_frame_dropping = False - # Derive device name and input mode - first_source = conf.bigs[0].audio_source if conf.bigs else '' - input_device_name = None - audio_mode_persist = 'Demo' - if first_source.startswith('device:'): - input_device_name = first_source.split(':', 1)[1] if ':' in first_source else None - try: + + first_source = conf.bigs[0].audio_source if conf.bigs else '' + input_device_name = None + audio_mode_persist = 'Demo' + if isinstance(first_source, str) and first_source.startswith('device:'): + input_device_name = first_source.split(':', 1)[1] if ':' in first_source else None alsa_usb_names = {d.get('name') for _, d in get_alsa_usb_inputs()} - except Exception: - alsa_usb_names = set() - try: net_names = {d.get('name') for _, d in get_network_pw_inputs()} - except Exception: - usb_names, net_names = set(), set() - audio_mode_persist = 'Network' if (input_device_name in net_names) else 'USB' + audio_mode_persist = 'Network' if (input_device_name in net_names) else 'USB' - # Map device name to index using centralized resolver - if input_device_name and input_device_name.isdigit(): - device_index = int(input_device_name) - else: - device_index = resolve_input_device_index(input_device_name or '') - if device_index is None: - raise HTTPException(status_code=400, detail=f"Audio device '{input_device_name}' not found.") - for big in conf.bigs: - if big.audio_source.startswith('device:'): - big.audio_source = f'device:{device_index}' - devinfo = sd.query_devices(device_index) - # Force capture at 48 kHz to avoid resampler latency and 44.1 kHz incompatibilities - capture_rate = 48000 - max_in = int(devinfo.get('max_input_channels') or 1) - channels = max(1, min(2, max_in)) - for big in conf.bigs: - big.input_format = f"int16le,{capture_rate},{channels}" + if input_device_name and input_device_name.isdigit(): + device_index = int(input_device_name) + else: + device_index = resolve_input_device_index(input_device_name or '') + if device_index is None: + raise HTTPException(status_code=400, detail=f"Audio device '{input_device_name}' not found.") + for big in conf.bigs: + if isinstance(big.audio_source, str) and big.audio_source.startswith('device:'): + big.audio_source = f'device:{device_index}' + devinfo = sd.query_devices(device_index) + capture_rate = 48000 + max_in = int(devinfo.get('max_input_channels') or 1) + channels = max(1, min(2, max_in)) + for big in conf.bigs: + big.input_format = f"int16le,{capture_rate},{channels}" - # Coerce QoS: compute max_transport_latency from RTN if qos_config present + if getattr(conf, 'qos_config', None) and getattr(conf.qos_config, 'number_of_retransmissions', None) is not None: + conf.qos_config.max_transport_latency_ms = int(conf.qos_config.number_of_retransmissions) * 10 + 3 + + multicaster1 = multicast_control.Multicaster(conf, conf.bigs) + await multicaster1.init_broadcast() + auto_started = False + if any(isinstance(big.audio_source, str) and (big.audio_source.startswith("device:") or big.audio_source.startswith("file:")) for big in conf.bigs): + await multicaster1.start_streaming() + auto_started = True + + demo_count = sum(1 for big in conf.bigs if isinstance(big.audio_source, str) and big.audio_source.startswith('file:')) + demo_rate = int(conf.auracast_sampling_rate_hz or 0) + demo_type = None + if demo_count > 0 and demo_rate > 0: + if demo_rate in (48000, 24000, 16000): + demo_type = f"{demo_count} × {demo_rate//1000}kHz" + else: + demo_type = f"{demo_count} × {demo_rate}Hz" + persisted = { + 'channel_names': [big.name for big in conf.bigs], + 'languages': [big.language for big in conf.bigs], + 'audio_mode': audio_mode_persist, + 'input_device': input_device_name, + 'program_info': [getattr(big, 'program_info', None) for big in conf.bigs], + 'gain': [getattr(big, 'input_gain', 1.0) for big in conf.bigs], + 'auracast_sampling_rate_hz': conf.auracast_sampling_rate_hz, + 'octets_per_frame': conf.octets_per_frame, + 'presentation_delay_us': getattr(conf, 'presentation_delay_us', None), + 'rtn': getattr(getattr(conf, 'qos_config', None), 'number_of_retransmissions', None), + 'immediate_rendering': getattr(conf, 'immediate_rendering', False), + 'assisted_listening_stream': getattr(conf, 'assisted_listening_stream', False), + 'stream_password': (conf.bigs[0].code if conf.bigs and getattr(conf.bigs[0], 'code', None) else None), + 'demo_total_streams': demo_count, + 'demo_stream_type': demo_type, + 'is_streaming': auto_started, + } + # Persist returned settings (avoid touching from worker thread) + save_settings(persisted, secondary=False) + except Exception as e: + log.error("Exception in /init: %s", traceback.format_exc()) + raise HTTPException(status_code=500, detail=str(e)) + +@app.post("/init2") +async def initialize2(conf: auracast_config.AuracastConfigGroup): + """Initializes the secondary broadcaster on the streamer thread.""" + try: + log.info('Initializing multicaster2 with config:\n %s', conf.model_dump_json(indent=2)) + # Inline of _init_secondary + global multicaster2 + if multicaster2 is not None: + await multicaster2.shutdown() + multicaster2 = None + + conf.transport = TRANSPORT2 + conf.enable_adaptive_frame_dropping = any( + isinstance(big.audio_source, str) and big.audio_source.startswith('device:') + for big in conf.bigs + ) + for big in conf.bigs: + if isinstance(big.audio_source, str) and big.audio_source.startswith('device:'): + device_name = big.audio_source.split(':', 1)[1] + net_names = {d.get('name') for _, d in get_network_pw_inputs()} + alsa_usb_names = {d.get('name') for _, d in get_alsa_usb_inputs()} + device_index = resolve_input_device_index(device_name) + if device_index is None: + raise HTTPException(status_code=400, detail=f"Audio device '{device_name}' not found.") + big.audio_source = f'device:{device_index}' if getattr(conf, 'qos_config', None) and getattr(conf.qos_config, 'number_of_retransmissions', None) is not None: conf.qos_config.max_transport_latency_ms = int(conf.qos_config.number_of_retransmissions) * 10 + 3 - # Create and init multicaster1 - self._multicaster1 = multicast_control.Multicaster(conf, conf.bigs) - #await reset_nrf54l(1) - await self._multicaster1.init_broadcast() - auto_started = False - if any(big.audio_source.startswith("device:") or big.audio_source.startswith("file:") for big in conf.bigs): - await self._multicaster1.start_streaming() - auto_started = True - - # Return proposed settings to persist on API side + multicaster2 = multicast_control.Multicaster(conf, conf.bigs) + await multicaster2.init_broadcast() + if any(isinstance(big.audio_source, str) and (big.audio_source.startswith("device:") or big.audio_source.startswith("file:")) for big in conf.bigs): + await multicaster2.start_streaming() + # Build and persist secondary settings analogous to primary + first_source = conf.bigs[0].audio_source if conf.bigs else '' + input_device_name = None + audio_mode_persist = 'Demo' + if isinstance(first_source, str) and first_source.startswith('device:'): + input_device_name = first_source.split(':', 1)[1] if ':' in first_source else None + try: + net_names = {d.get('name') for _, d in get_network_pw_inputs()} + except Exception: + net_names = set() + audio_mode_persist = 'Network' if (input_device_name in net_names) else 'USB' demo_count = sum(1 for big in conf.bigs if isinstance(big.audio_source, str) and big.audio_source.startswith('file:')) demo_rate = int(conf.auracast_sampling_rate_hz or 0) demo_type = None @@ -227,7 +307,7 @@ class StreamerWorker: # TODO: is wraping in this Worker stricly nececcarry ? demo_type = f"{demo_count} × {demo_rate//1000}kHz" else: demo_type = f"{demo_count} × {demo_rate}Hz" - return { + persisted2 = { 'channel_names': [big.name for big in conf.bigs], 'languages': [big.language for big in conf.bigs], 'audio_mode': audio_mode_persist, @@ -243,148 +323,33 @@ class StreamerWorker: # TODO: is wraping in this Worker stricly nececcarry ? 'stream_password': (conf.bigs[0].code if conf.bigs and getattr(conf.bigs[0], 'code', None) else None), 'demo_total_streams': demo_count, 'demo_stream_type': demo_type, - 'is_streaming': auto_started, + 'is_streaming': any(isinstance(big.audio_source, str) and (big.audio_source.startswith("device:") or big.audio_source.startswith("file:")) for big in conf.bigs), } - - async def _w_init_secondary(self, conf: auracast_config.AuracastConfigGroup) -> None: - if self._multicaster2 is not None: - try: - await self._multicaster2.shutdown() - except Exception: - pass - self._multicaster2 = None - - conf.transport = TRANSPORT2 - # Enable adaptive frame dropping only for device-based inputs (not file/demo) - try: - conf.enable_adaptive_frame_dropping = any( - isinstance(big.audio_source, str) and big.audio_source.startswith('device:') - for big in conf.bigs - ) - except Exception: - conf.enable_adaptive_frame_dropping = False - for big in conf.bigs: - if big.audio_source.startswith('device:'): - device_name = big.audio_source.split(':', 1)[1] - # Resolve backend preference by membership - try: - net_names = {d.get('name') for _, d in get_network_pw_inputs()} - except Exception: - net_names = set() - try: - alsa_usb_names = {d.get('name') for _, d in get_alsa_usb_inputs()} - except Exception: - alsa_usb_names = set() - device_index = resolve_input_device_index(device_name) - if device_index is None: - raise HTTPException(status_code=400, detail=f"Audio device '{device_name}' not found.") - big.audio_source = f'device:{device_index}' - # Coerce QoS: compute max_transport_latency from RTN if qos_config present - if getattr(conf, 'qos_config', None) and getattr(conf.qos_config, 'number_of_retransmissions', None) is not None: - conf.qos_config.max_transport_latency_ms = int(conf.qos_config.number_of_retransmissions) * 10 + 3 - - - self._multicaster2 = multicast_control.Multicaster(conf, conf.bigs) - #await reset_nrf54l(0) - await self._multicaster2.init_broadcast() - if any(big.audio_source.startswith("device:") or big.audio_source.startswith("file:") for big in conf.bigs): - await self._multicaster2.start_streaming() - - async def _w_stop_all(self) -> bool: - was_running = False - if self._multicaster1 is not None: - try: - await self._multicaster1.stop_streaming() - await self._multicaster1.shutdown() - was_running = True - finally: - self._multicaster1 = None - if self._multicaster2 is not None: - try: - await self._multicaster2.stop_streaming() - await self._multicaster2.shutdown() - was_running = True - finally: - self._multicaster2 = None - return was_running - - async def _w_status_primary(self) -> dict: - if self._multicaster1 is None: - return {'is_initialized': False, 'is_streaming': False} - try: - return self._multicaster1.get_status() - except Exception: - return {'is_initialized': True, 'is_streaming': False} - - async def _w_stream_lc3(self, audio_data: dict[str, str], bigs_template: list) -> None: - if self._multicaster1 is None: - raise HTTPException(status_code=500, detail='Auracast endpoint was never intialized') - # Update bigs audio_source with provided bytes and start - for big in bigs_template: - if big.language not in audio_data: - raise HTTPException(status_code=500, detail='language len missmatch') - big.audio_source = audio_data[big.language].encode('latin-1') - self._multicaster1.big_conf = bigs_template - await self._multicaster1.start_streaming() - - -# Create the worker singleton and a route-level lock -streamer = StreamerWorker() -# multicaster1: multicast_control.Multicaster | None = None # kept for legacy references, do not use on API loop -# multicaster2: multicast_control.Multicaster | None = None -_stream_lock = asyncio.Lock() # serialize initialize/stop_audio on API side -@app.post("/init") -async def initialize(conf: auracast_config.AuracastConfigGroup): - """Initializes the primary broadcaster on the streamer thread.""" - global global_config_group - async with _stream_lock: - try: - global_config_group = conf - log.info('Initializing multicaster1 with config:\n %s', conf.model_dump_json(indent=2)) - persisted = await streamer.call(streamer._w_init_primary, conf) - # Persist returned settings (avoid touching from worker thread) - persisted['timestamp'] = datetime.utcnow().isoformat() - save_stream_settings(persisted) - except Exception as e: - log.error("Exception in /init: %s", traceback.format_exc()) - raise HTTPException(status_code=500, detail=str(e)) - -@app.post("/init2") -async def initialize2(conf: auracast_config.AuracastConfigGroup): - """Initializes the secondary broadcaster on the streamer thread.""" - try: - log.info('Initializing multicaster2 with config:\n %s', conf.model_dump_json(indent=2)) - await streamer.call(streamer._w_init_secondary, conf) - try: - is_demo = any(isinstance(big.audio_source, str) and big.audio_source.startswith('file:') for big in conf.bigs) - if is_demo: - settings = load_stream_settings() or {} - primary_count = int(settings.get('demo_total_streams') or len(settings.get('channel_names') or [])) - secondary_count = len(conf.bigs or []) - total = primary_count + secondary_count - settings['demo_total_streams'] = total - demo_rate = int(conf.auracast_sampling_rate_hz or 0) - if demo_rate > 0: - if demo_rate in (48000, 24000, 16000): - settings['demo_stream_type'] = f"{total} × {demo_rate//1000}kHz" - else: - settings['demo_stream_type'] = f"{total} × {demo_rate}Hz" - settings['timestamp'] = datetime.utcnow().isoformat() - save_stream_settings(settings) - except Exception: - log.warning("Failed to persist demo_total_streams in /init2", exc_info=True) + save_settings(persisted2, secondary=True) + is_demo = any(isinstance(big.audio_source, str) and big.audio_source.startswith('file:') for big in conf.bigs) + if is_demo: + settings = load_stream_settings() or {} + primary_count = int(settings.get('demo_total_streams') or len(settings.get('channel_names') or [])) + secondary_count = len(conf.bigs or []) + total = primary_count + secondary_count + settings['demo_total_streams'] = total + demo_rate = int(conf.auracast_sampling_rate_hz or 0) + if demo_rate > 0: + if demo_rate in (48000, 24000, 16000): + settings['demo_stream_type'] = f"{total} × {demo_rate//1000}kHz" + else: + settings['demo_stream_type'] = f"{total} × {demo_rate}Hz" + settings['timestamp'] = datetime.utcnow().isoformat() + save_stream_settings(settings) except Exception as e: log.error("Exception in /init2: %s", traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) - - - @app.post("/stop_audio") async def stop_audio(): """Stops streaming on both multicaster1 and multicaster2 (worker thread).""" try: - was_running = await streamer.call(streamer._w_stop_all) + was_running = await _stop_all() # Persist is_streaming=False try: @@ -402,25 +367,22 @@ async def stop_audio(): log.error("Exception in /stop_audio: %s", traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) - @app.post("/stream_lc3") async def send_audio(audio_data: dict[str, str]): """Sends a block of pre-coded LC3 audio via the worker.""" try: - await streamer.call(streamer._w_stream_lc3, audio_data, list(global_config_group.bigs)) + await _stream_lc3(audio_data, list(global_config_group.bigs)) return {"status": "audio_sent"} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) - @app.get("/status") async def get_status(): """Gets current status (worker) merged with persisted settings cache.""" - status = await streamer.call(streamer._w_status_primary) + status = await _status_primary() status.update(load_stream_settings()) return status - async def _autostart_from_settings(): """Background task: auto-start last selected device-based input at server startup. @@ -428,107 +390,101 @@ async def _autostart_from_settings(): saved device name appears in either USB or Network lists, then builds a config and initializes streaming. """ - try: - settings = load_stream_settings() or {} - audio_mode = settings.get('audio_mode') - input_device_name = settings.get('input_device') - rate = settings.get('auracast_sampling_rate_hz') - octets = settings.get('octets_per_frame') - pres_delay = settings.get('presentation_delay_us') - saved_rtn = settings.get('rtn') - immediate_rendering = settings.get('immediate_rendering', False) - assisted_listening_stream = settings.get('assisted_listening_stream', False) - channel_names = settings.get('channel_names') or ["Broadcast0"] - program_info = settings.get('program_info') or channel_names - languages = settings.get('languages') or ["deu"] - stream_password = settings.get('stream_password') - original_ts = settings.get('timestamp') - previously_streaming = bool(settings.get('is_streaming')) + settings = load_stream_settings() or {} + audio_mode = settings.get('audio_mode') + input_device_name = settings.get('input_device') + rate = settings.get('auracast_sampling_rate_hz') + octets = settings.get('octets_per_frame') + pres_delay = settings.get('presentation_delay_us') + saved_rtn = settings.get('rtn') + immediate_rendering = settings.get('immediate_rendering', False) + assisted_listening_stream = settings.get('assisted_listening_stream', False) + channel_names = settings.get('channel_names') or ["Broadcast0"] + program_info = settings.get('program_info') or channel_names + languages = settings.get('languages') or ["deu"] + stream_password = settings.get('stream_password') + original_ts = settings.get('timestamp') + previously_streaming = bool(settings.get('is_streaming')) - # Only auto-start if the previous state was streaming and it's a device-based input. - if not previously_streaming: - return - if not input_device_name: - return - if rate is None or octets is None: - # Not enough info to reconstruct stream reliably - return + # Only auto-start if the previous state was streaming and it's a device-based input. + if not previously_streaming: + return + if not input_device_name: + return + if rate is None or octets is None: + # Not enough info to reconstruct stream reliably + return - # Avoid duplicate start if already streaming - current = await streamer.call(streamer._w_status_primary) + # Avoid duplicate start if already streaming + current = await _status_primary() + if current.get('is_streaming'): + return + + while True: + # Do not interfere if user started a stream manually in the meantime + current = await _status_primary() if current.get('is_streaming'): return - - while True: - # Do not interfere if user started a stream manually in the meantime - current = await streamer.call(streamer._w_status_primary) - if current.get('is_streaming'): + # Abort if saved settings changed to a different target while we were polling + current_settings = load_stream_settings() or {} + if current_settings.get('timestamp') != original_ts: + # Settings were updated (likely by user via /init) + # If the target device or mode changed, stop autostart + if ( + current_settings.get('input_device') != input_device_name or + current_settings.get('audio_mode') != audio_mode + ): return - # Abort if saved settings changed to a different target while we were polling - current_settings = load_stream_settings() or {} - if current_settings.get('timestamp') != original_ts: - # Settings were updated (likely by user via /init) - # If the target device or mode changed, stop autostart - if ( - current_settings.get('input_device') != input_device_name or - current_settings.get('audio_mode') != audio_mode - ): - return - # Check against the cached device lists - usb = [d for _, d in get_alsa_usb_inputs()] - net = [d for _, d in get_network_pw_inputs()] - names = {d.get('name') for d in usb} | {d.get('name') for d in net} - if input_device_name in names: - # Build a minimal config based on saved fields - bigs = [ - auracast_config.AuracastBigConfig( - code=stream_password, - name=channel_names[0] if channel_names else "Broadcast0", - program_info=program_info[0] if isinstance(program_info, list) and program_info else program_info, - language=languages[0] if languages else "deu", - audio_source=f"device:{input_device_name}", - # input_format is intentionally omitted to use the default - iso_que_len=1, - sampling_frequency=rate, - octets_per_frame=octets, - ) - ] - conf = auracast_config.AuracastConfigGroup( - auracast_sampling_rate_hz=rate, + # Check against the cached device lists + usb = [d for _, d in get_alsa_usb_inputs()] + net = [d for _, d in get_network_pw_inputs()] + names = {d.get('name') for d in usb} | {d.get('name') for d in net} + if input_device_name in names: + # Build a minimal config based on saved fields + bigs = [ + auracast_config.AuracastBigConfig( + code=stream_password, + name=channel_names[0] if channel_names else "Broadcast0", + program_info=program_info[0] if isinstance(program_info, list) and program_info else program_info, + language=languages[0] if languages else "deu", + audio_source=f"device:{input_device_name}", + # input_format is intentionally omitted to use the default + iso_que_len=1, + sampling_frequency=rate, octets_per_frame=octets, - transport=TRANSPORT1, - immediate_rendering=immediate_rendering, - assisted_listening_stream=assisted_listening_stream, - presentation_delay_us=pres_delay if pres_delay is not None else 40000, - bigs=bigs, - ) - # Attach QoS if saved_rtn present - conf.qos_config = auracast_config.AuracastQoSConfig( - iso_int_multiple_10ms=1, - number_of_retransmissions=int(saved_rtn), - max_transport_latency_ms=int(saved_rtn) * 10 + 3, ) + ] + conf = auracast_config.AuracastConfigGroup( + auracast_sampling_rate_hz=rate, + octets_per_frame=octets, + transport=TRANSPORT1, + immediate_rendering=immediate_rendering, + assisted_listening_stream=assisted_listening_stream, + presentation_delay_us=pres_delay if pres_delay is not None else 40000, + bigs=bigs, + ) + # Attach QoS if saved_rtn present + conf.qos_config = auracast_config.AuracastQoSConfig( + iso_int_multiple_10ms=1, + number_of_retransmissions=int(saved_rtn), + max_transport_latency_ms=int(saved_rtn) * 10 + 3, + ) - # Initialize and start - await asyncio.sleep(2) - await initialize(conf) - return + # Initialize and start await asyncio.sleep(2) - except Exception: - log.warning("Autostart task failed", exc_info=True) + await initialize(conf) + return + await asyncio.sleep(2) @app.on_event("startup") async def _startup_autostart_event(): # Spawn the autostart task without blocking startup log.info("Refreshing PipeWire device cache.") # Hydrate settings cache once to avoid disk I/O during /status - _hydrate_settings_cache_from_disk() + _init_settings_cache_from_disk() refresh_pw_cache() - # Start the streamer worker thread - streamer.start() asyncio.create_task(_autostart_from_settings()) - @app.get("/audio_inputs_pw_usb") async def audio_inputs_pw_usb(): """List USB input devices using ALSA backend (USB is ALSA in our scheme).""" @@ -542,7 +498,6 @@ async def audio_inputs_pw_usb(): log.error("Exception in /audio_inputs_pw_usb: %s", traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) - @app.get("/audio_inputs_pw_network") async def audio_inputs_pw_network(): """List PipeWire Network/AES67 input nodes from cache.""" @@ -556,40 +511,33 @@ async def audio_inputs_pw_network(): log.error("Exception in /audio_inputs_pw_network: %s", traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) - @app.post("/refresh_audio_devices") async def refresh_audio_devices(): """Triggers a re-scan of audio devices, but only if no stream is active.""" - streaming = False try: - status = await streamer.call(streamer._w_status_primary) + status = await _status_primary() streaming = bool(status.get('is_streaming')) - except Exception: - pass # Ignore errors, default to not refreshing + except Exception as e: + log.error("Exception in /refresh_audio_devices: %s", traceback.format_exc()) + raise HTTPException(status_code=500, detail=str(e)) if streaming: log.warning("Ignoring refresh request: an audio stream is active.") raise HTTPException(status_code=409, detail="An audio stream is active. Stop the stream before refreshing devices.") - try: - log.info("Refreshing PipeWire device cache.") - refresh_pw_cache() - return {"status": "ok"} - except Exception as e: - log.error("Exception during device refresh: %s", traceback.format_exc()) - raise HTTPException(status_code=500, detail=f"Failed to refresh devices: {e}") - + log.info("Refreshing PipeWire device cache.") + refresh_pw_cache() + return {"status": "ok"} @app.post("/shutdown") async def shutdown(): """Stops broadcasting and releases all audio/Bluetooth resources.""" try: - await streamer.call(streamer._w_stop_all) + await _stop_all() return {"status": "stopped"} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) - @app.post("/system_reboot") async def system_reboot(): """Stop audio and request a system reboot via sudo. @@ -599,13 +547,9 @@ async def system_reboot(): try: # Best-effort: stop any active streaming cleanly WITHOUT persisting state try: - try: - await streamer.call(streamer._w_stop_all) - except Exception: - pass + await _stop_all() except Exception: - log.warning("Non-fatal: failed to stop streams before reboot", exc_info=True) - + pass # Launch reboot without waiting for completion try: await asyncio.create_subprocess_exec("sudo", "reboot") diff --git a/src/service/auracast-server.service b/src/service/auracast-server.service index 23f8735..06395bf 100644 --- a/src/service/auracast-server.service +++ b/src/service/auracast-server.service @@ -9,6 +9,9 @@ ExecStart=/home/caster/.local/bin/poetry run python src/auracast/server/multicas Restart=on-failure Environment=PYTHONUNBUFFERED=1 Environment=LOG_LEVEL=INFO +CPUSchedulingPolicy=fifo +CPUSchedulingPriority=99 +LimitRTPRIO=99 [Install] WantedBy=default.target