From 125385a202dcc882b1b0995fb24d10a997bb44b0 Mon Sep 17 00:00:00 2001 From: pstruebi Date: Tue, 30 Sep 2025 16:29:14 +0200 Subject: [PATCH] chore: improve audio device management and streaming stability --- .gitignore | 1 + src/auracast/multicast.py | 4 +- src/auracast/multicast_control.py | 4 +- src/auracast/multicast_script.py | 3 +- src/auracast/server/multicast_frontend.py | 34 +++--- src/auracast/server/multicast_server.py | 137 ++++++++-------------- src/auracast/utils/reset_utils.py | 13 ++ src/auracast/utils/sounddevice_utils.py | 99 +++++++--------- 8 files changed, 124 insertions(+), 171 deletions(-) diff --git a/.gitignore b/.gitignore index 68ac071..0cbe8da 100644 --- a/.gitignore +++ b/.gitignore @@ -42,3 +42,4 @@ records/DISABLE_FRONTEND_PW src/auracast/server/stream_settings.json src/auracast/server/certs/per_device/ src/auracast/.env +src/auracast/server/certs/ca/ca_cert.srl diff --git a/src/auracast/multicast.py b/src/auracast/multicast.py index 9a00846..7e40dd2 100644 --- a/src/auracast/multicast.py +++ b/src/auracast/multicast.py @@ -313,8 +313,8 @@ async def init_broadcast( bigs[f'big{i}']['iso_queue'] = iso_queue - logging.debug(f'big{i} parameters are:') - logging.debug('%s', pprint.pformat(vars(big))) + logging.info(f'big{i} parameters are:') + logging.info('%s', pprint.pformat(vars(big))) logging.info(f'Finished setup of big{i}.') await asyncio.sleep(i+1) # Wait for advertising to set up diff --git a/src/auracast/multicast_control.py b/src/auracast/multicast_control.py index ac9b540..055ff77 100644 --- a/src/auracast/multicast_control.py +++ b/src/auracast/multicast_control.py @@ -56,8 +56,6 @@ class Multicaster: """Start streaming; if an old stream is running, stop it first to release audio devices.""" if self.streamer is not None: await self.stop_streaming() - # Brief pause to ensure ALSA/PortAudio fully releases the input device - await asyncio.sleep(0.5) self.streamer = multicast.Streamer(self.bigs, self.global_conf, self.big_conf) self.streamer.start_streaming() @@ -91,6 +89,8 @@ class Multicaster: for big in self.bigs.values(): if big.get('advertising_set'): await big['advertising_set'].stop() + # Explicitly power off the device to ensure a clean state before closing the transport + await self.device.power_off() await self.device_acm.__aexit__(None, None, None) # Manually triggering teardown diff --git a/src/auracast/multicast_script.py b/src/auracast/multicast_script.py index 6b1ca16..b0eeb6e 100644 --- a/src/auracast/multicast_script.py +++ b/src/auracast/multicast_script.py @@ -146,11 +146,12 @@ if __name__ == "__main__": presentation_delay_us=40000, qos_config=auracast_config.AuracastQosHigh(), auracast_sampling_rate_hz = LC3_SRATE, - octets_per_frame = OCTETS_PER_FRAME, # 32kbps@16kHz + octets_per_frame = OCTETS_PER_FRAME, transport=TRANSPORT1 ) #config.debug = True + logging.info(config.model_dump_json(indent=2)) multicast.run_async( multicast.broadcast( config, diff --git a/src/auracast/server/multicast_frontend.py b/src/auracast/server/multicast_frontend.py index ce8f3c1..4f46cf9 100644 --- a/src/auracast/server/multicast_frontend.py +++ b/src/auracast/server/multicast_frontend.py @@ -99,6 +99,9 @@ try: except Exception: saved_settings = {} +# Define is_streaming early from the fetched status for use throughout the UI +is_streaming = bool(saved_settings.get("is_streaming", False)) + st.title("Auracast Audio Mode Control") # Audio mode selection with persisted default @@ -347,14 +350,11 @@ else: "No AES67/Network inputs found." ) st.warning(warn_text) - if st.button("Refresh"): - # For completeness, refresh the general audio cache as well + if st.button("Refresh", disabled=is_streaming): try: - r = requests.post(f"{BACKEND_URL}/refresh_audio_inputs", json={"force": True}, timeout=5) - if r.ok: - jr = r.json() - if jr.get('stopped_stream'): - st.info("An active stream was stopped to perform a full device refresh.") + r = requests.post(f"{BACKEND_URL}/refresh_audio_devices", timeout=8) + if not r.ok: + st.error(f"Failed to refresh: {r.text}") except Exception as e: st.error(f"Failed to refresh devices: {e}") st.rerun() @@ -368,13 +368,11 @@ else: index=input_options.index(default_input_label) if default_input_label in input_options else 0 ) with col2: - if st.button("Refresh"): + if st.button("Refresh", disabled=is_streaming): try: - r = requests.post(f"{BACKEND_URL}/refresh_audio_inputs", json={"force": True}, timeout=5) - if r.ok: - jr = r.json() - if jr.get('stopped_stream'): - st.info("An active stream was stopped to perform a full device refresh.") + r = requests.post(f"{BACKEND_URL}/refresh_audio_devices", timeout=8) + if not r.ok: + st.error(f"Failed to refresh: {r.text}") except Exception as e: st.error(f"Failed to refresh devices: {e}") st.rerun() @@ -392,12 +390,9 @@ else: # c_spacer intentionally left empty to push status to the far right with c_status: # Fetch current status from backend and render using Streamlit widgets (no HTML) - try: - status_resp = requests.get(f"{BACKEND_URL}/status", timeout=0.8) - status_json = status_resp.json() if status_resp.ok else {} - except Exception: - status_json = {} - is_streaming = bool(status_json.get("is_streaming", False)) + # The is_streaming variable is now defined at the top of the script. + # We only need to re-fetch here if we want the absolute latest status for the display, + # but for UI consistency, we can just use the value from the top of the script run. st.write("🟢 Streaming" if is_streaming else "🔴 Stopped") # If gain slider moved while streaming, send update to JS without restarting @@ -451,6 +446,7 @@ else: transport='', # is set in backend assisted_listening_stream=assisted_listening, immediate_rendering=immediate_rendering, + presentation_delay_us=40000, bigs = [ auracast_config.AuracastBigConfig( code=(stream_passwort.strip() or None), diff --git a/src/auracast/server/multicast_server.py b/src/auracast/server/multicast_server.py index 6d0241b..e50f199 100644 --- a/src/auracast/server/multicast_server.py +++ b/src/auracast/server/multicast_server.py @@ -20,8 +20,9 @@ import sounddevice as sd # type: ignore from typing import Set import traceback from auracast.utils.sounddevice_utils import ( - list_usb_pw_inputs, - list_network_pw_inputs, + get_usb_pw_inputs, + get_network_pw_inputs, + refresh_pw_cache, ) from auracast.utils.reset_utils import reset_nrf54l @@ -35,6 +36,8 @@ TRANSPORT2 = os.getenv('TRANSPORT2', 'serial:/dev/ttyAMA4,1000000,rtscts') # tr PTIME = 40 # seems to have no effect at all pcs: Set[RTCPeerConnection] = set() # keep refs so they don’t GC early +os.environ["PULSE_LATENCY_MSEC"] = "3" + class Offer(BaseModel): sdp: str type: str @@ -105,20 +108,17 @@ async def initialize(conf: auracast_config.AuracastConfigGroup): input_device_name = first_source.split(':', 1)[1] if ':' in first_source else None # Determine if the device is a USB or Network(AES67) PipeWire input try: - usb_names = {d.get('name') for _, d in list_usb_pw_inputs(refresh=False)} - net_names = {d.get('name') for _, d in list_network_pw_inputs(refresh=False)} + usb_names = {d.get('name') for _, d in get_usb_pw_inputs()} + net_names = {d.get('name') for _, d in get_network_pw_inputs()} except Exception: usb_names, net_names = set(), set() if input_device_name in net_names: audio_mode_persist = 'AES67' - os.environ.setdefault("PULSE_LATENCY_MSEC", "6") else: audio_mode_persist = 'USB' - os.environ.setdefault("PULSE_LATENCY_MSEC", "3") # Map device name to current index for use with sounddevice device_index = get_device_index_by_name(input_device_name) if input_device_name else None - # Patch config to use index for sounddevice (but persist name) if device_index is not None: for big in conf.bigs: if big.audio_source.startswith('device:'): @@ -151,15 +151,10 @@ async def initialize(conf: auracast_config.AuracastConfigGroup): 'timestamp': datetime.utcnow().isoformat() }) global_config_group = conf - if multicaster1 is not None: - try: - await multicaster1.shutdown() - except Exception: - log.warning("Failed to shutdown previous multicaster", exc_info=True) log.info('Initializing multicaster1 with config:\n %s', conf.model_dump_json(indent=2)) multicaster1 = multicast_control.Multicaster(conf, conf.bigs) # Ensure target is reset before initializing broadcast - await reset_nrf54l(0) + await reset_nrf54l(1) await multicaster1.init_broadcast() if any(big.audio_source.startswith("device:") or big.audio_source.startswith("file:") for big in conf.bigs): log.info("Auto-starting streaming on multicaster1") @@ -192,7 +187,7 @@ async def initialize2(conf: auracast_config.AuracastConfigGroup): log.info('Initializing multicaster2 with config:\n %s', conf.model_dump_json(indent=2)) multicaster2 = multicast_control.Multicaster(conf, conf.bigs) # Ensure target is reset before initializing broadcast - await reset_nrf54l(1) + await reset_nrf54l(0) await multicaster2.init_broadcast() if any(big.audio_source.startswith("device:") or big.audio_source.startswith("file:") for big in conf.bigs): log.info("Auto-starting streaming on multicaster2") @@ -231,14 +226,15 @@ async def stop_audio(): await asyncio.gather(*close_tasks, return_exceptions=True) # Now shut down both multicasters and release audio devices + global multicaster1, multicaster2 running = False if multicaster1 is not None: - await multicaster1.stop_streaming() - await multicaster1.reset() # Fully reset controller and advertising + await multicaster1.shutdown() + multicaster1 = None running = True if multicaster2 is not None: - await multicaster2.stop_streaming() - await multicaster2.reset() # Fully reset controller and advertising + await multicaster2.shutdown() + multicaster2 = None running = True # Persist is_streaming=False @@ -291,16 +287,6 @@ async def _autostart_from_settings(): original_ts = settings.get('timestamp') previously_streaming = bool(settings.get('is_streaming')) - try: - usb_names = {d.get('name') for _, d in list_usb_pw_inputs(refresh=False)} - net_names = {d.get('name') for _, d in list_network_pw_inputs(refresh=False)} - except Exception: - usb_names, net_names = set(), set() - if input_device_name in net_names: - os.environ.setdefault("PULSE_LATENCY_MSEC", "6") - else: - os.environ.setdefault("PULSE_LATENCY_MSEC", "3") - # Only auto-start if the previous state was streaming and it's a device-based input. if not previously_streaming: return @@ -328,9 +314,9 @@ async def _autostart_from_settings(): current_settings.get('audio_mode') != audio_mode ): return - # Avoid refreshing PortAudio while we poll - usb = [d for _, d in list_usb_pw_inputs(refresh=False)] - net = [d for _, d in list_network_pw_inputs(refresh=False)] + # Check against the cached device lists + usb = [d for _, d in get_usb_pw_inputs()] + net = [d for _, d in get_network_pw_inputs()] names = {d.get('name') for d in usb} | {d.get('name') for d in net} if input_device_name in names: # Build a minimal config based on saved fields @@ -341,7 +327,7 @@ async def _autostart_from_settings(): program_info=program_info[0] if isinstance(program_info, list) and program_info else program_info, language=languages[0] if languages else "deu", audio_source=f"device:{input_device_name}", - input_format=f"int16le,{rate},1", + # input_format is intentionally omitted to use the default iso_que_len=1, sampling_frequency=rate, octets_per_frame=octets, @@ -368,56 +354,14 @@ async def _startup_autostart_event(): # Spawn the autostart task without blocking startup asyncio.create_task(_autostart_from_settings()) - -@app.post("/refresh_audio_inputs") -async def refresh_audio_inputs(force: bool = False): - """Triggers a re-scan of audio devices. - - If force is True and a stream is active, the stream(s) will be stopped to allow - a full re-initialization of the sounddevice backend. The response will include - 'stopped_stream': True if any running stream was stopped. - """ - stopped = False - if force: - try: - # Stop active streams before forcing sounddevice re-init - if multicaster1 is not None and multicaster1.get_status().get('is_streaming'): - await multicaster1.stop_streaming() - stopped = True - if multicaster2 is not None and multicaster2.get_status().get('is_streaming'): - await multicaster2.stop_streaming() - stopped = True - except Exception: - log.warning("Failed to stop stream(s) before force refresh", exc_info=True) - # Reinitialize sounddevice backend if requested - try: - if sys.platform == 'linux' and force: - log.info("Force re-initializing sounddevice backend") - sd._terminate() - sd._initialize() - except Exception: - log.error("Exception while force-refreshing audio devices:", exc_info=True) - return {"status": "ok", "inputs": [], "stopped_stream": stopped} - - + @app.get("/audio_inputs_pw_usb") async def audio_inputs_pw_usb(): - """List PipeWire USB input nodes mapped to sounddevice indices. - - Returns a list of dicts: [{id, name, max_input_channels}]. - """ + """List PipeWire USB input nodes from cache.""" try: - # Do not refresh PortAudio if we are currently streaming to avoid termination - streaming = False - try: - if multicaster1 is not None: - status = multicaster1.get_status() - streaming = bool(status.get('is_streaming')) - except Exception: - streaming = False devices = [ {"id": idx, "name": dev.get("name"), "max_input_channels": dev.get("max_input_channels", 0)} - for idx, dev in list_usb_pw_inputs(refresh=not streaming) + for idx, dev in get_usb_pw_inputs() ] return {"inputs": devices} except Exception as e: @@ -427,22 +371,11 @@ async def audio_inputs_pw_usb(): @app.get("/audio_inputs_pw_network") async def audio_inputs_pw_network(): - """List PipeWire Network/AES67 input nodes mapped to sounddevice indices. - - Returns a list of dicts: [{id, name, max_input_channels}]. - """ + """List PipeWire Network/AES67 input nodes from cache.""" try: - # Do not refresh PortAudio if we are currently streaming to avoid termination - streaming = False - try: - if multicaster1 is not None: - status = multicaster1.get_status() - streaming = bool(status.get('is_streaming')) - except Exception: - streaming = False devices = [ {"id": idx, "name": dev.get("name"), "max_input_channels": dev.get("max_input_channels", 0)} - for idx, dev in list_network_pw_inputs(refresh=not streaming) + for idx, dev in get_network_pw_inputs() ] return {"inputs": devices} except Exception as e: @@ -450,6 +383,30 @@ async def audio_inputs_pw_network(): raise HTTPException(status_code=500, detail=str(e)) +@app.post("/refresh_audio_devices") +async def refresh_audio_devices(): + """Triggers a re-scan of audio devices, but only if no stream is active.""" + streaming = False + try: + if multicaster1 is not None: + status = multicaster1.get_status() + streaming = bool(status.get('is_streaming')) + except Exception: + pass # Ignore errors, default to not refreshing + + if streaming: + log.warning("Ignoring refresh request: an audio stream is active.") + raise HTTPException(status_code=409, detail="An audio stream is active. Stop the stream before refreshing devices.") + + try: + log.info("Refreshing PipeWire device cache.") + refresh_pw_cache() + return {"status": "ok"} + except Exception as e: + log.error("Exception during device refresh: %s", traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Failed to refresh devices: {e}") + + @app.post("/offer") async def offer(offer: Offer): log.info("/offer endpoint called") diff --git a/src/auracast/utils/reset_utils.py b/src/auracast/utils/reset_utils.py index 54bb659..74f549b 100644 --- a/src/auracast/utils/reset_utils.py +++ b/src/auracast/utils/reset_utils.py @@ -62,3 +62,16 @@ async def reset_nrf54l(slot: int = 0, timeout: float = 8.0): log.error("reset_nrf54l: openocd not found; skipping reset") except Exception: log.error("reset_nrf54l failed", exc_info=True) + + +if __name__ == '__main__': + # Basic logging setup + log.basicConfig( + level=os.environ.get('LOG_LEVEL', log.INFO), + format='%(asctime)s.%(msecs)03d %(levelname)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + slot_to_reset = 1 + log.info(f"Executing reset for slot {slot_to_reset}") + asyncio.run(reset_nrf54l(slot=slot_to_reset)) diff --git a/src/auracast/utils/sounddevice_utils.py b/src/auracast/utils/sounddevice_utils.py index 35dbea0..7db59a4 100644 --- a/src/auracast/utils/sounddevice_utils.py +++ b/src/auracast/utils/sounddevice_utils.py @@ -49,64 +49,40 @@ def _sd_matches_from_names(pa_idx, names): out.append((i, d)) return out -def list_usb_pw_inputs(refresh: bool = True): - """ - Return [(device_index, device_dict), ...] for PipeWire **input** nodes - backed by **USB** devices (excludes monitor sources). +# Module-level caches for device lists +_usb_inputs_cache = [] +_network_inputs_cache = [] - Parameters: - - refresh (bool): If True (default), force PortAudio to re-enumerate devices - before mapping. Set to False to avoid disrupting active streams. +def get_usb_pw_inputs(): + """Return cached list of USB PipeWire inputs.""" + return _usb_inputs_cache + +def get_network_pw_inputs(): + """Return cached list of Network/AES67 PipeWire inputs.""" + return _network_inputs_cache + +def refresh_pw_cache(): """ - # Refresh PortAudio so we see newly added nodes before mapping - if refresh: - _sd_refresh() + Performs a full device scan and updates the internal caches for both USB + and Network audio devices. This is a heavy operation and should not be + called frequently or during active streams. + """ + global _usb_inputs_cache, _network_inputs_cache + + # Force PortAudio to re-enumerate devices + _sd_refresh() pa_idx = _pa_like_hostapi_index() pw = _pw_dump() - # Map device.id -> device.bus ("usb"/"pci"/"platform"/"network"/...) + # --- Pass 1: Map device.id to device.bus --- device_bus = {} for obj in pw: if obj.get("type") == "PipeWire:Interface:Device": props = (obj.get("info") or {}).get("props") or {} device_bus[obj["id"]] = (props.get("device.bus") or "").lower() - # Collect names/descriptions of USB input nodes + # --- Pass 2: Identify all USB and Network nodes --- usb_input_names = set() - for obj in pw: - if obj.get("type") != "PipeWire:Interface:Node": - continue - props = (obj.get("info") or {}).get("props") or {} - media = (props.get("media.class") or "").lower() - if "source" not in media and "stream/input" not in media: - continue - # skip monitor sources ("Monitor of ..." or *.monitor) - nname = (props.get("node.name") or "").lower() - ndesc = (props.get("node.description") or "").lower() - if ".monitor" in nname or "monitor" in ndesc: - continue - bus = (props.get("device.bus") or device_bus.get(props.get("device.id")) or "").lower() - if bus == "usb": - usb_input_names.add(props.get("node.description") or props.get("node.name")) - - # Map to sounddevice devices on PipeWire host API - return _sd_matches_from_names(pa_idx, usb_input_names) - -def list_network_pw_inputs(refresh: bool = True): - """ - Return [(device_index, device_dict), ...] for PipeWire **input** nodes that - look like network/AES67/RTP sources (excludes monitor sources). - - Parameters: - - refresh (bool): If True (default), force PortAudio to re-enumerate devices - before mapping. Set to False to avoid disrupting active streams. - """ - # Refresh PortAudio so we see newly added nodes before mapping - if refresh: - _sd_refresh() - pa_idx = _pa_like_hostapi_index() - pw = _pw_dump() - network_input_names = set() for obj in pw: if obj.get("type") != "PipeWire:Interface:Node": @@ -115,26 +91,29 @@ def list_network_pw_inputs(refresh: bool = True): media = (props.get("media.class") or "").lower() if "source" not in media and "stream/input" not in media: continue + nname = (props.get("node.name") or "") ndesc = (props.get("node.description") or "") - # skip monitor sources + # Skip all monitor sources if ".monitor" in nname.lower() or "monitor" in ndesc.lower(): continue - # Heuristics for network/AES67/RTP + # Check for USB + bus = (props.get("device.bus") or device_bus.get(props.get("device.id")) or "").lower() + if bus == "usb": + usb_input_names.add(ndesc or nname) + continue # A device is either USB or Network, not both + + # Heuristics for Network/AES67/RTP text = (nname + " " + ndesc).lower() media_name = (props.get("media.name") or "").lower() node_group = (props.get("node.group") or "").lower() - # Presence flags/keys that strongly indicate network RTP/AES67 sources node_network_flag = bool(props.get("node.network")) - has_rtp_keys = any(k in props for k in ( - "rtp.session", "rtp.source.ip", "rtp.source.port", "rtp.fmtp", "rtp.rate" - )) - has_sess_keys = any(k in props for k in ( - "sess.name", "sess.media", "sess.latency.msec" - )) + has_rtp_keys = any(k in props for k in ("rtp.session", "rtp.source.ip")) + has_sess_keys = any(k in props for k in ("sess.name", "sess.media")) + is_network = ( - (props.get("device.bus") or "").lower() == "network" or + bus == "network" or node_network_flag or "rtp" in media_name or any(k in text for k in ("rtp", "sap", "aes67", "network", "raop", "airplay")) or @@ -145,7 +124,13 @@ def list_network_pw_inputs(refresh: bool = True): if is_network: network_input_names.add(ndesc or nname) - return _sd_matches_from_names(pa_idx, network_input_names) + # --- Final Step: Update caches --- + _usb_inputs_cache = _sd_matches_from_names(pa_idx, usb_input_names) + _network_inputs_cache = _sd_matches_from_names(pa_idx, network_input_names) + + +# Populate cache on initial module load +refresh_pw_cache() # Example usage: # for i, d in list_usb_pw_inputs():