diff --git a/.gitignore b/.gitignore index 68ac071..0cbe8da 100644 --- a/.gitignore +++ b/.gitignore @@ -42,3 +42,4 @@ records/DISABLE_FRONTEND_PW src/auracast/server/stream_settings.json src/auracast/server/certs/per_device/ src/auracast/.env +src/auracast/server/certs/ca/ca_cert.srl diff --git a/README.md b/README.md index fbe9dc3..e4f5deb 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,8 @@ stty -F /dev/ttyAMA3 -a | grep -o 'hupcl' || echo "-hupcl is set" # Audio latency if there is hearable audio error with aes67, tune sess.latency.msec in pipewire-aes67.conf +if latency is piling up something may be blocking the event loop in multicast_server.py - the event loop must never block at any time + --- After completing these steps, your device will be discoverable as `.` (e.g., `box1.auracast.local`) on the local network via mDNS. diff --git a/poetry.lock b/poetry.lock index 70213c8..dba8f46 100644 --- a/poetry.lock +++ b/poetry.lock @@ -332,7 +332,7 @@ files = [ [[package]] name = "bumble" -version = "0.0.216.dev1+g6eba81e3d" +version = "0.0.218.dev6+g32d448edf" description = "Bluetooth Stack for Apps, Emulation, Test and Experimentation" optional = false python-versions = ">=3.9" @@ -371,8 +371,8 @@ test = ["coverage (>=6.4)", "pytest (>=8.2)", "pytest-asyncio (>=0.23.5)", "pyte [package.source] type = "git" url = "ssh://git@ssh.pstruebi.xyz:222/auracaster/bumble_mirror.git" -reference = "6eba81e3ddb8ac0e4c336ca244892a0a8d43ba1c" -resolved_reference = "6eba81e3ddb8ac0e4c336ca244892a0a8d43ba1c" +reference = "32d448edf3276f6b9056765a12879054d8a01fd8" +resolved_reference = "32d448edf3276f6b9056765a12879054d8a01fd8" [[package]] name = "cachetools" @@ -2952,4 +2952,4 @@ test = ["pytest", "pytest-asyncio"] [metadata] lock-version = "2.1" python-versions = ">=3.11" -content-hash = "3afe565be2664b3d7f1cfdb1c5a73d931c14e97d3622aef24ba2f06f78e00e2b" +content-hash = "6b5300c349ed045e8fd3e617e6262bbd7e5c48c518e4c62cedf7c17da50ce8c0" diff --git a/pyproject.toml b/pyproject.toml index dd44d25..799f3ce 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ version = "0.0.1" requires-python = ">=3.11" dependencies = [ - "bumble @ git+ssh://git@ssh.pstruebi.xyz:222/auracaster/bumble_mirror.git@6eba81e3ddb8ac0e4c336ca244892a0a8d43ba1c", + "bumble @ git+ssh://git@ssh.pstruebi.xyz:222/auracaster/bumble_mirror.git@32d448edf3276f6b9056765a12879054d8a01fd8", "lc3py @ git+ssh://git@ssh.pstruebi.xyz:222/auracaster/liblc3.git@ce2e41faf8c06d038df9f32504c61109a14130be", "aioconsole", "fastapi==0.115.11", diff --git a/src/auracast/multicast.py b/src/auracast/multicast.py index 9a00846..93792e5 100644 --- a/src/auracast/multicast.py +++ b/src/auracast/multicast.py @@ -313,8 +313,8 @@ async def init_broadcast( bigs[f'big{i}']['iso_queue'] = iso_queue - logging.debug(f'big{i} parameters are:') - logging.debug('%s', pprint.pformat(vars(big))) + logging.info(f'big{i} parameters are:') + logging.info('%s', pprint.pformat(vars(big))) logging.info(f'Finished setup of big{i}.') await asyncio.sleep(i+1) # Wait for advertising to set up @@ -375,18 +375,51 @@ class Streamer(): if self.task is not None: self.task.cancel() + # Let cancellation propagate to the stream() coroutine + await asyncio.sleep(0.01) + self.task = None # Close audio inputs (await to ensure ALSA devices are released) - close_tasks = [] + async_closers = [] + sync_closers = [] for big in self.bigs.values(): ai = big.get("audio_input") - if ai and hasattr(ai, "close"): - close_tasks.append(ai.close()) - # Remove reference so a fresh one is created next time - big.pop("audio_input", None) - if close_tasks: - await asyncio.gather(*close_tasks, return_exceptions=True) + if not ai: + continue + # First close any frames generator backed by the input to stop reads + frames_gen = big.get("frames_gen") + if frames_gen and hasattr(frames_gen, "aclose"): + try: + await frames_gen.aclose() + except Exception: + pass + big.pop("frames_gen", None) + if hasattr(ai, "aclose") and callable(getattr(ai, "aclose")): + async_closers.append(ai.aclose()) + elif hasattr(ai, "close") and callable(getattr(ai, "close")): + sync_closers.append(ai.close) + # Remove reference so a fresh one is created next time + big.pop("audio_input", None) + + if async_closers: + await asyncio.gather(*async_closers, return_exceptions=True) + for fn in sync_closers: + try: + fn() + except Exception: + pass + + # Reset PortAudio to drop lingering PipeWire capture nodes + try: + import sounddevice as _sd + if hasattr(_sd, "_terminate"): + _sd._terminate() + await asyncio.sleep(0.05) + if hasattr(_sd, "_initialize"): + _sd._initialize() + except Exception: + pass async def stream(self): @@ -414,6 +447,8 @@ class Streamer(): big['audio_input'] = audio_source big['encoder'] = encoder big['precoded'] = False + # Prepare frames generator for graceful shutdown + big['frames_gen'] = big['audio_input'].frames(lc3_frame_samples) elif audio_source == 'webrtc': big['audio_input'] = WebRTCAudioInput() @@ -429,6 +464,8 @@ class Streamer(): big['lc3_bytes_per_frame'] = global_config.octets_per_frame big['encoder'] = encoder big['precoded'] = False + # Prepare frames generator for graceful shutdown + big['frames_gen'] = big['audio_input'].frames(lc3_frame_samples) # precoded lc3 from ram elif isinstance(big_config[i].audio_source, bytes): @@ -599,7 +636,12 @@ class Streamer(): stream_finished[i] = True continue else: # code lc3 on the fly - pcm_frame = await anext(big['audio_input'].frames(big['lc3_frame_samples']), None) + # Use stored frames generator when available so we can aclose() it on stop + frames_gen = big.get('frames_gen') + if frames_gen is None: + frames_gen = big['audio_input'].frames(big['lc3_frame_samples']) + big['frames_gen'] = frames_gen + pcm_frame = await anext(frames_gen, None) if pcm_frame is None: # Not all streams may stop at the same time stream_finished[i] = True diff --git a/src/auracast/multicast_control.py b/src/auracast/multicast_control.py index ac9b540..f651511 100644 --- a/src/auracast/multicast_control.py +++ b/src/auracast/multicast_control.py @@ -91,6 +91,8 @@ class Multicaster: for big in self.bigs.values(): if big.get('advertising_set'): await big['advertising_set'].stop() + # Explicitly power off the device to ensure a clean state before closing the transport + await self.device.power_off() await self.device_acm.__aexit__(None, None, None) # Manually triggering teardown diff --git a/src/auracast/multicast_script.py b/src/auracast/multicast_script.py index 6b1ca16..b0eeb6e 100644 --- a/src/auracast/multicast_script.py +++ b/src/auracast/multicast_script.py @@ -146,11 +146,12 @@ if __name__ == "__main__": presentation_delay_us=40000, qos_config=auracast_config.AuracastQosHigh(), auracast_sampling_rate_hz = LC3_SRATE, - octets_per_frame = OCTETS_PER_FRAME, # 32kbps@16kHz + octets_per_frame = OCTETS_PER_FRAME, transport=TRANSPORT1 ) #config.debug = True + logging.info(config.model_dump_json(indent=2)) multicast.run_async( multicast.broadcast( config, diff --git a/src/auracast/server/certs/ca/ca_cert.srl b/src/auracast/server/certs/ca/ca_cert.srl index a29e13e..2a39b85 100644 --- a/src/auracast/server/certs/ca/ca_cert.srl +++ b/src/auracast/server/certs/ca/ca_cert.srl @@ -1 +1 @@ -5078804E6FBCF893D5537715FD928E46AD576ECA +5078804E6FBCF893D5537715FD928E46AD576ECB diff --git a/src/auracast/server/multicast_frontend.py b/src/auracast/server/multicast_frontend.py index e63fc25..730b733 100644 --- a/src/auracast/server/multicast_frontend.py +++ b/src/auracast/server/multicast_frontend.py @@ -99,11 +99,19 @@ try: except Exception: saved_settings = {} +# Define is_streaming early from the fetched status for use throughout the UI +is_streaming = bool(saved_settings.get("is_streaming", False)) + st.title("Auracast Audio Mode Control") # Audio mode selection with persisted default # Note: backend persists 'USB' for any device: source (including AES67). We default to 'USB' in that case. -options = ["Demo", "USB", "AES67", "Webapp"] +options = [ + "Demo", + "USB", + "AES67", + # "Webapp" + ] saved_audio_mode = saved_settings.get("audio_mode", "Demo") if saved_audio_mode not in options: # Map legacy/unknown modes to closest @@ -146,7 +154,7 @@ if audio_mode == "Demo": type=("password"), help="Optional: Set a broadcast code to protect your stream. Leave empty for an open (uncoded) broadcast." ) - col_flags1, col_flags2, col_placeholder = st.columns([1, 1, 2]) + col_flags1, col_flags2, col_pdelay, col_rtn = st.columns([1, 1, 1, 1], gap="small") with col_flags1: assisted_listening = st.checkbox( "Assistive listening", @@ -157,15 +165,29 @@ if audio_mode == "Demo": "Immediate rendering", value=bool(saved_settings.get('immediate_rendering', False)) ) + # QoS/presentation controls inline with flags + default_pdelay = int(saved_settings.get('presentation_delay_us', 40000) or 40000) + with col_pdelay: + presentation_delay_us = st.number_input( + "Presentation delay (µs)", + min_value=10000, max_value=200000, step=1000, value=default_pdelay, + help="Delay between capture and presentation for receivers." + ) + default_rtn = int(saved_settings.get('rtn', 4) or 4) + with col_rtn: + rtn = st.selectbox( + "Retransmissions (RTN)", options=[0,1,2,3,4], index=[0,1,2,3,4].index(default_rtn), + help="Number of ISO retransmissions (higher improves robustness at cost of airtime)." + ) #st.info(f"Demo mode selected: {demo_selected} (Streams: {demo_stream_map[demo_selected]['streams']}, Rate: {demo_stream_map[demo_selected]['rate']} Hz)") # Start/Stop buttons for demo mode if 'demo_stream_started' not in st.session_state: st.session_state['demo_stream_started'] = False col1, col2 = st.columns(2) with col1: - start_demo = st.button("Start Demo Stream") + start_demo = st.button("Start Demo Stream", disabled=is_streaming) with col2: - stop_demo = st.button("Stop Demo Stream") + stop_demo = st.button("Stop Demo Stream", disabled=not is_streaming) if start_demo: # Always stop any running stream for clean state try: @@ -207,9 +229,15 @@ if audio_mode == "Demo": config1 = auracast_config.AuracastConfigGroup( auracast_sampling_rate_hz=q['rate'], octets_per_frame=q['octets'], - transport='', # is set in backend + transport='', # is set in baccol_qoskend assisted_listening_stream=assisted_listening, immediate_rendering=immediate_rendering, + presentation_delay_us=presentation_delay_us, + qos_config=auracast_config.AuracastQoSConfig( + iso_int_multiple_10ms=1, + number_of_retransmissions=int(rtn), + max_transport_latency_ms=int(rtn)*10 + 3, + ), bigs=bigs1 ) config2 = None @@ -220,6 +248,12 @@ if audio_mode == "Demo": transport='', # is set in backend assisted_listening_stream=assisted_listening, immediate_rendering=immediate_rendering, + presentation_delay_us=presentation_delay_us, + qos_config=auracast_config.AuracastQoSConfig( + iso_int_multiple_10ms=1, + number_of_retransmissions=int(rtn), + max_transport_latency_ms=int(rtn)*10 + 3, + ), bigs=bigs2 ) # Call /init and /init2 @@ -298,8 +332,8 @@ else: type="password", help="Optional: Set a broadcast code to protect your stream. Leave empty for an open (uncoded) broadcast." ) - # Flags: Assistive Listening and Immediate Rendering (one row) - col_flags1, col_flags2, col_placeholder = st.columns([1, 1, 2]) + # Flags and QoS row (compact, four columns) + col_flags1, col_flags2, col_pdelay, col_rtn = st.columns([1, 1, 1, 1], gap="small") with col_flags1: assisted_listening = st.checkbox( "Assistive listening", @@ -310,6 +344,20 @@ else: "Immediate rendering", value=bool(saved_settings.get('immediate_rendering', False)) ) + # QoS/presentation controls inline with flags + default_pdelay = int(saved_settings.get('presentation_delay_us', 40000) or 40000) + with col_pdelay: + presentation_delay_us = st.number_input( + "Presentation delay (µs)", + min_value=10000, max_value=200000, step=1000, value=default_pdelay, + help="Delay between capture and presentation for receivers." + ) + default_rtn = int(saved_settings.get('rtn', 4) or 4) + with col_rtn: + rtn = st.selectbox( + "Retransmissions (RTN)", options=[0,1,2,3,4], index=[0,1,2,3,4].index(default_rtn), + help="Number of ISO retransmissions (higher improves robustness at cost of airtime)." + ) # Gain slider for Webapp mode if audio_mode == "Webapp": mic_gain = st.slider("Microphone Gain", 0.0, 2.0, 1.0, 0.1, help="Adjust microphone volume sent to Auracast") @@ -318,86 +366,91 @@ else: # Input device selection for USB or AES67 mode if audio_mode in ("USB", "AES67"): - try: - endpoint = "/audio_inputs_pw_usb" if audio_mode == "USB" else "/audio_inputs_pw_network" - resp = requests.get(f"{BACKEND_URL}{endpoint}") - device_list = resp.json().get('inputs', []) - except Exception as e: - st.error(f"Failed to fetch devices: {e}") - device_list = [] + if not is_streaming: + # Only query device lists when NOT streaming to avoid extra backend calls + try: + endpoint = "/audio_inputs_pw_usb" if audio_mode == "USB" else "/audio_inputs_pw_network" + resp = requests.get(f"{BACKEND_URL}{endpoint}") + device_list = resp.json().get('inputs', []) + except Exception as e: + st.error(f"Failed to fetch devices: {e}") + device_list = [] - # Display "name [id]" but use name as value - input_options = [f"{d['name']} [{d['id']}]" for d in device_list] - option_name_map = {f"{d['name']} [{d['id']}]": d['name'] for d in device_list} - device_names = [d['name'] for d in device_list] + # Display "name [id]" but use name as value + input_options = [f"{d['name']} [{d['id']}]" for d in device_list] + option_name_map = {f"{d['name']} [{d['id']}]": d['name'] for d in device_list} + device_names = [d['name'] for d in device_list] - # Determine default input by name (from persisted server state) - default_input_name = saved_settings.get('input_device') - if default_input_name not in device_names and device_names: - default_input_name = device_names[0] - default_input_label = None - for label, name in option_name_map.items(): - if name == default_input_name: - default_input_label = label - break - if not input_options: - warn_text = ( - "No USB audio input devices found. Connect a USB input and click Refresh." - if audio_mode == "USB" else - "No AES67/Network inputs found." - ) - st.warning(warn_text) - if st.button("Refresh"): - # For completeness, refresh the general audio cache as well - try: - r = requests.post(f"{BACKEND_URL}/refresh_audio_inputs", json={"force": True}, timeout=5) - if r.ok: - jr = r.json() - if jr.get('stopped_stream'): - st.info("An active stream was stopped to perform a full device refresh.") - except Exception as e: - st.error(f"Failed to refresh devices: {e}") - st.rerun() - input_device = None - else: - col1, col2 = st.columns([3, 1], vertical_alignment="bottom") - with col1: - selected_option = st.selectbox( - "Input Device", - input_options, - index=input_options.index(default_input_label) if default_input_label in input_options else 0 + # Determine default input by name (from persisted server state) + default_input_name = saved_settings.get('input_device') + if default_input_name not in device_names and device_names: + default_input_name = device_names[0] + default_input_label = None + for label, name in option_name_map.items(): + if name == default_input_name: + default_input_label = label + break + if not input_options: + warn_text = ( + "No USB audio input devices found. Connect a USB input and click Refresh." + if audio_mode == "USB" else + "No AES67/Network inputs found." ) - with col2: - if st.button("Refresh"): + st.warning(warn_text) + if st.button("Refresh", disabled=is_streaming): try: - r = requests.post(f"{BACKEND_URL}/refresh_audio_inputs", json={"force": True}, timeout=5) - if r.ok: - jr = r.json() - if jr.get('stopped_stream'): - st.info("An active stream was stopped to perform a full device refresh.") + r = requests.post(f"{BACKEND_URL}/refresh_audio_devices", timeout=8) + if not r.ok: + st.error(f"Failed to refresh: {r.text}") except Exception as e: st.error(f"Failed to refresh devices: {e}") st.rerun() - # Send only the device name to backend - input_device = option_name_map.get(selected_option) + input_device = None + else: + col1, col2 = st.columns([3, 1], vertical_alignment="bottom") + with col1: + selected_option = st.selectbox( + "Input Device", + input_options, + index=input_options.index(default_input_label) if default_input_label in input_options else 0 + ) + with col2: + if st.button("Refresh", disabled=is_streaming): + try: + r = requests.post(f"{BACKEND_URL}/refresh_audio_devices", timeout=8) + if not r.ok: + st.error(f"Failed to refresh: {r.text}") + except Exception as e: + st.error(f"Failed to refresh devices: {e}") + st.rerun() + # Send only the device name to backend + input_device = option_name_map.get(selected_option) + else: + # When streaming, keep showing the current selection but lock editing. + input_device = saved_settings.get('input_device') + current_label = input_device or "No device selected" + st.selectbox( + "Input Device", + [current_label], + index=0, + disabled=True, + help="Stop the stream to change the input device." + ) else: input_device = None # Buttons and status on a single row (4 columns: start, stop, spacer, status) c_start, c_stop, c_spacer, c_status = st.columns([1, 1, 1, 2], gap="small", vertical_alignment="center") with c_start: - start_stream = st.button("Start Auracast") + start_stream = st.button("Start Auracast", disabled=is_streaming) with c_stop: - stop_stream = st.button("Stop Auracast") + stop_stream = st.button("Stop Auracast", disabled=not is_streaming) # c_spacer intentionally left empty to push status to the far right with c_status: # Fetch current status from backend and render using Streamlit widgets (no HTML) - try: - status_resp = requests.get(f"{BACKEND_URL}/status", timeout=0.8) - status_json = status_resp.json() if status_resp.ok else {} - except Exception: - status_json = {} - is_streaming = bool(status_json.get("is_streaming", False)) + # The is_streaming variable is now defined at the top of the script. + # We only need to re-fetch here if we want the absolute latest status for the display, + # but for UI consistency, we can just use the value from the top of the script run. st.write("🟢 Streaming" if is_streaming else "🔴 Stopped") # If gain slider moved while streaming, send update to JS without restarting @@ -438,8 +491,8 @@ else: if start_stream: # Always send stop to ensure backend is in a clean state, regardless of current status r = requests.post(f"{BACKEND_URL}/stop_audio").json() - if r['was_running']: - st.success("Stream Stopped!") + #if r['was_running']: + # st.success("Stream Stopped!") # Small pause lets backend fully release audio devices before re-init time.sleep(1) @@ -451,6 +504,12 @@ else: transport='', # is set in backend assisted_listening_stream=assisted_listening, immediate_rendering=immediate_rendering, + presentation_delay_us=presentation_delay_us, + qos_config=auracast_config.AuracastQoSConfig( + iso_int_multiple_10ms=1, + number_of_retransmissions=int(rtn), + max_transport_latency_ms=int(rtn)*10 + 3, + ), bigs = [ auracast_config.AuracastBigConfig( code=(stream_passwort.strip() or None), @@ -551,11 +610,12 @@ else: ############################ # System expander (collapsed) ############################ -with st.expander("System", expanded=False): +with st.expander("System control", expanded=False): + + st.subheader("Change password") if is_pw_disabled(): st.info("Frontend password protection is disabled via DISABLE_FRONTEND_PW.") else: - st.subheader("Change password") with st.form("change_pw_form"): cur = st.text_input("Current password", type="password") new1 = st.text_input("New password", type="password") @@ -577,6 +637,17 @@ with st.expander("System", expanded=False): except Exception as e: st.error(f"Failed to update password: {e}") + st.subheader("Reboot") + if st.button("Reboot now", type="primary"): + try: + r = requests.post(f"{BACKEND_URL}/system_reboot", timeout=1) + if r.ok: + st.success("Reboot initiated. The UI will become unreachable shortly.") + else: + st.error(f"Failed to reboot: {r.status_code} {r.text}") + except Exception as e: + st.error(f"Error calling reboot: {e}") + log.basicConfig( level=os.environ.get('LOG_LEVEL', log.DEBUG), format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s' diff --git a/src/auracast/server/multicast_server.py b/src/auracast/server/multicast_server.py index 434859a..a2b1b88 100644 --- a/src/auracast/server/multicast_server.py +++ b/src/auracast/server/multicast_server.py @@ -1,10 +1,16 @@ -import glob +""" the main server where our multicaster objects live. +TODO: in the future the multicaster objects should run in their own threads or even make a second server since everything thats blocking the main event loop leads to inceased latency. + +""" import os import logging as log import uuid import json import sys +import threading +from concurrent.futures import Future from datetime import datetime +import time import asyncio import numpy as np from dotenv import load_dotenv @@ -20,9 +26,11 @@ import sounddevice as sd # type: ignore from typing import Set import traceback from auracast.utils.sounddevice_utils import ( - list_usb_pw_inputs, - list_network_pw_inputs, + get_usb_pw_inputs, + get_network_pw_inputs, + refresh_pw_cache, ) +from auracast.utils.reset_utils import reset_nrf54l load_dotenv() # make sure pipewire sets latency @@ -34,6 +42,11 @@ TRANSPORT2 = os.getenv('TRANSPORT2', 'serial:/dev/ttyAMA4,1000000,rtscts') # tr PTIME = 40 # seems to have no effect at all pcs: Set[RTCPeerConnection] = set() # keep refs so they don’t GC early +os.environ["PULSE_LATENCY_MSEC"] = "3" + +# In-memory cache to avoid disk I/O on hot paths like /status +SETTINGS_CACHE: dict = {} + class Offer(BaseModel): sdp: str type: str @@ -53,21 +66,37 @@ def get_device_index_by_name(name: str): return None -def load_stream_settings() -> dict: - """Load persisted stream settings if available.""" - if os.path.exists(STREAM_SETTINGS_FILE): - try: +def _hydrate_settings_cache_from_disk() -> None: + """Populate SETTINGS_CACHE once from disk at startup. + + Safe to call multiple times; errors fall back to empty dict. + """ + global SETTINGS_CACHE + try: + if os.path.exists(STREAM_SETTINGS_FILE): with open(STREAM_SETTINGS_FILE, 'r', encoding='utf-8') as f: - return json.load(f) - except Exception: - return {} - return {} + SETTINGS_CACHE = json.load(f) + else: + SETTINGS_CACHE = {} + except Exception: + SETTINGS_CACHE = {} + +def load_stream_settings() -> dict: + """Return stream settings from in-memory cache. + + The cache is hydrated once at startup and updated by save_stream_settings(). + No disk I/O occurs here. + """ + global SETTINGS_CACHE + return SETTINGS_CACHE def save_stream_settings(settings: dict): - """Save stream settings to disk.""" + """Update in-memory settings cache and persist to disk.""" + global SETTINGS_CACHE + SETTINGS_CACHE = dict(settings) try: with open(STREAM_SETTINGS_FILE, 'w', encoding='utf-8') as f: - json.dump(settings, f, indent=2) + json.dump(SETTINGS_CACHE, f, indent=2) except Exception as e: log.error('Unable to persist stream settings: %s', e) @@ -86,55 +115,106 @@ app.add_middleware( # Initialize global configuration global_config_group = auracast_config.AuracastConfigGroup() -# Create multicast controller -multicaster1: multicast_control.Multicaster | None = None -multicaster2: multicast_control.Multicaster | None = None +class StreamerWorker: + """Owns multicaster(s) on a dedicated asyncio loop in a background thread.""" -@app.post("/init") -async def initialize(conf: auracast_config.AuracastConfigGroup): - """Initializes the primary broadcaster (multicaster1).""" - global global_config_group - global multicaster1 - try: + def __init__(self) -> None: + self._thread: threading.Thread | None = None + self._loop: asyncio.AbstractEventLoop | None = None + # These live only on the worker loop + self._multicaster1: multicast_control.Multicaster | None = None + self._multicaster2: multicast_control.Multicaster | None = None + self._started = threading.Event() + + # ---------- Thread/loop management ---------- + def start(self) -> None: + if self._thread and self._thread.is_alive(): + return + self._thread = threading.Thread(target=self._run, name="StreamerWorker", daemon=True) + self._thread.start() + self._started.wait(timeout=5) + + def _run(self) -> None: + loop = asyncio.new_event_loop() + self._loop = loop + asyncio.set_event_loop(loop) + self._started.set() + try: + loop.run_forever() + finally: + try: + pending = asyncio.all_tasks(loop) + for t in pending: + t.cancel() + loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) + except Exception: + pass + loop.close() + + def _ensure_loop(self) -> asyncio.AbstractEventLoop: + if not self._loop: + raise RuntimeError("StreamerWorker loop not started") + return self._loop + + async def call(self, coro_func, *args, **kwargs): + """Schedule a coroutine on the worker loop and await its result from the API loop.""" + loop = self._ensure_loop() + fut: Future = asyncio.run_coroutine_threadsafe(coro_func(*args, **kwargs), loop) + return await asyncio.wrap_future(fut) + + # ---------- Worker-loop coroutines ---------- + async def _w_init_primary(self, conf: auracast_config.AuracastConfigGroup) -> dict: + # Clean any previous + if self._multicaster1 is not None: + try: + await self._multicaster1.shutdown() + except Exception: + pass + self._multicaster1 = None conf.transport = TRANSPORT1 - # Derive audio_mode and input_device from first BIG audio_source + # Derive device name and input mode first_source = conf.bigs[0].audio_source if conf.bigs else '' + input_device_name = None + audio_mode_persist = 'Demo' if first_source.startswith('device:'): input_device_name = first_source.split(':', 1)[1] if ':' in first_source else None - # Determine if the device is a USB or Network(AES67) PipeWire input try: - usb_names = {d.get('name') for _, d in list_usb_pw_inputs(refresh=False)} - net_names = {d.get('name') for _, d in list_network_pw_inputs(refresh=False)} + usb_names = {d.get('name') for _, d in get_usb_pw_inputs()} + net_names = {d.get('name') for _, d in get_network_pw_inputs()} except Exception: usb_names, net_names = set(), set() - if input_device_name in net_names: - audio_mode_persist = 'AES67' - os.environ.setdefault("PULSE_LATENCY_MSEC", "6") - else: - audio_mode_persist = 'USB' - os.environ.setdefault("PULSE_LATENCY_MSEC", "3") + audio_mode_persist = 'AES67' if (input_device_name in net_names) else 'USB' - # Map device name to current index for use with sounddevice - device_index = get_device_index_by_name(input_device_name) if input_device_name else None - # Patch config to use index for sounddevice (but persist name) - if device_index is not None: - for big in conf.bigs: - if big.audio_source.startswith('device:'): - big.audio_source = f'device:{device_index}' - else: - log.error(f"Device name '{input_device_name}' not found in current device list.") + # Map device name to index and configure input_format + device_index = int(input_device_name) if (input_device_name and input_device_name.isdigit()) else get_device_index_by_name(input_device_name or '') + if device_index is None: raise HTTPException(status_code=400, detail=f"Audio device '{input_device_name}' not found.") - elif first_source == 'webrtc': - audio_mode_persist = 'Webapp' - input_device_name = None - elif first_source.startswith('file:'): - audio_mode_persist = 'Demo' - input_device_name = None - else: - audio_mode_persist = 'Network' - input_device_name = None - save_stream_settings({ + for big in conf.bigs: + if big.audio_source.startswith('device:'): + big.audio_source = f'device:{device_index}' + devinfo = sd.query_devices(device_index) + capture_rate = int(devinfo.get('default_samplerate') or 48000) + max_in = int(devinfo.get('max_input_channels') or 1) + channels = max(1, min(2, max_in)) + for big in conf.bigs: + big.input_format = f"int16le,{capture_rate},{channels}" + + # Coerce QoS: compute max_transport_latency from RTN if qos_config present + if getattr(conf, 'qos_config', None) and getattr(conf.qos_config, 'number_of_retransmissions', None) is not None: + conf.qos_config.max_transport_latency_ms = int(conf.qos_config.number_of_retransmissions) * 10 + 3 + + # Create and init multicaster1 + self._multicaster1 = multicast_control.Multicaster(conf, conf.bigs) + await reset_nrf54l(1) + await self._multicaster1.init_broadcast() + auto_started = False + if any(big.audio_source.startswith("device:") or big.audio_source.startswith("file:") for big in conf.bigs): + await self._multicaster1.start_streaming() + auto_started = True + + # Return proposed settings to persist on API side + return { 'channel_names': [big.name for big in conf.bigs], 'languages': [big.language for big in conf.bigs], 'audio_mode': audio_mode_persist, @@ -143,106 +223,156 @@ async def initialize(conf: auracast_config.AuracastConfigGroup): 'gain': [getattr(big, 'input_gain', 1.0) for big in conf.bigs], 'auracast_sampling_rate_hz': conf.auracast_sampling_rate_hz, 'octets_per_frame': conf.octets_per_frame, + 'presentation_delay_us': getattr(conf, 'presentation_delay_us', None), + 'rtn': getattr(getattr(conf, 'qos_config', None), 'number_of_retransmissions', None), 'immediate_rendering': getattr(conf, 'immediate_rendering', False), 'assisted_listening_stream': getattr(conf, 'assisted_listening_stream', False), 'stream_password': (conf.bigs[0].code if conf.bigs and getattr(conf.bigs[0], 'code', None) else None), - 'timestamp': datetime.utcnow().isoformat() - }) - global_config_group = conf - if multicaster1 is not None: - try: - await multicaster1.shutdown() - except Exception: - log.warning("Failed to shutdown previous multicaster", exc_info=True) - log.info('Initializing multicaster1 with config:\n %s', conf.model_dump_json(indent=2)) - multicaster1 = multicast_control.Multicaster(conf, conf.bigs) - await multicaster1.init_broadcast() - if any(big.audio_source.startswith("device:") or big.audio_source.startswith("file:") for big in conf.bigs): - log.info("Auto-starting streaming on multicaster1") - await multicaster1.start_streaming() - except Exception as e: - log.error("Exception in /init: %s", traceback.format_exc()) - raise HTTPException(status_code=500, detail=str(e)) + 'is_streaming': auto_started, + } + + async def _w_init_secondary(self, conf: auracast_config.AuracastConfigGroup) -> None: + if self._multicaster2 is not None: + try: + await self._multicaster2.shutdown() + except Exception: + pass + self._multicaster2 = None -@app.post("/init2") -async def initialize2(conf: auracast_config.AuracastConfigGroup): - """Initializes the secondary broadcaster (multicaster2). Does NOT persist stream settings.""" - global multicaster2 - try: conf.transport = TRANSPORT2 - # Patch device name to index for sounddevice for big in conf.bigs: if big.audio_source.startswith('device:'): device_name = big.audio_source.split(':', 1)[1] device_index = get_device_index_by_name(device_name) - if device_index is not None: - big.audio_source = f'device:{device_index}' - else: - log.error(f"Device name '{device_name}' not found in current device list.") + if device_index is None: raise HTTPException(status_code=400, detail=f"Audio device '{device_name}' not found.") - log.info('Initializing multicaster2 with config:\n %s', conf.model_dump_json(indent=2)) - multicaster2 = multicast_control.Multicaster(conf, conf.bigs) - await multicaster2.init_broadcast() + big.audio_source = f'device:{device_index}' + # Coerce QoS: compute max_transport_latency from RTN if qos_config present + if getattr(conf, 'qos_config', None) and getattr(conf.qos_config, 'number_of_retransmissions', None) is not None: + conf.qos_config.max_transport_latency_ms = int(conf.qos_config.number_of_retransmissions) * 10 + 3 + + + self._multicaster2 = multicast_control.Multicaster(conf, conf.bigs) + await reset_nrf54l(0) + await self._multicaster2.init_broadcast() if any(big.audio_source.startswith("device:") or big.audio_source.startswith("file:") for big in conf.bigs): - log.info("Auto-starting streaming on multicaster2") - await multicaster2.start_streaming() + await self._multicaster2.start_streaming() + + async def _w_stop_all(self) -> bool: + was_running = False + if self._multicaster1 is not None: + try: + await self._multicaster1.stop_streaming() + await self._multicaster1.shutdown() + was_running = True + finally: + self._multicaster1 = None + if self._multicaster2 is not None: + try: + await self._multicaster2.stop_streaming() + await self._multicaster2.shutdown() + was_running = True + finally: + self._multicaster2 = None + return was_running + + async def _w_status_primary(self) -> dict: + if self._multicaster1 is None: + return {'is_initialized': False, 'is_streaming': False} + try: + return self._multicaster1.get_status() + except Exception: + return {'is_initialized': True, 'is_streaming': False} + + async def _w_stream_lc3(self, audio_data: dict[str, str], bigs_template: list) -> None: + if self._multicaster1 is None: + raise HTTPException(status_code=500, detail='Auracast endpoint was never intialized') + # Update bigs audio_source with provided bytes and start + for big in bigs_template: + if big.language not in audio_data: + raise HTTPException(status_code=500, detail='language len missmatch') + big.audio_source = audio_data[big.language].encode('latin-1') + self._multicaster1.big_conf = bigs_template + await self._multicaster1.start_streaming() + + +# Create the worker singleton and a route-level lock +streamer = StreamerWorker() +# multicaster1: multicast_control.Multicaster | None = None # kept for legacy references, do not use on API loop +# multicaster2: multicast_control.Multicaster | None = None +_stream_lock = asyncio.Lock() # serialize initialize/stop_audio on API side +@app.post("/init") +async def initialize(conf: auracast_config.AuracastConfigGroup): + """Initializes the primary broadcaster on the streamer thread.""" + global global_config_group + async with _stream_lock: + try: + global_config_group = conf + log.info('Initializing multicaster1 with config:\n %s', conf.model_dump_json(indent=2)) + persisted = await streamer.call(streamer._w_init_primary, conf) + # Persist returned settings (avoid touching from worker thread) + persisted['timestamp'] = datetime.utcnow().isoformat() + save_stream_settings(persisted) + except Exception as e: + log.error("Exception in /init: %s", traceback.format_exc()) + raise HTTPException(status_code=500, detail=str(e)) + +@app.post("/init2") +async def initialize2(conf: auracast_config.AuracastConfigGroup): + """Initializes the secondary broadcaster on the streamer thread.""" + try: + log.info('Initializing multicaster2 with config:\n %s', conf.model_dump_json(indent=2)) + await streamer.call(streamer._w_init_secondary, conf) except Exception as e: log.error("Exception in /init2: %s", traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) -@app.post("/stream_lc3") -async def send_audio(audio_data: dict[str, str]): - """Sends a block of pre-coded LC3 audio.""" - if multicaster1 is None: - raise HTTPException(status_code=500, detail='Auracast endpoint was never intialized') - try: - for big in global_config_group.bigs: - assert big.language in audio_data, HTTPException(status_code=500, detail='language len missmatch') - log.info('Received a send audio request for %s', big.language) - big.audio_source = audio_data[big.language].encode('latin-1') # TODO: use base64 encoding - - multicaster1.big_conf = global_config_group.bigs - await multicaster1.start_streaming() - return {"status": "audio_sent"} - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) @app.post("/stop_audio") async def stop_audio(): - """Stops streaming on both multicaster1 and multicaster2.""" + """Stops streaming on both multicaster1 and multicaster2 (worker thread).""" try: - # First close any active WebRTC peer connections so their track loops finish cleanly + # Close any active PeerConnections close_tasks = [pc.close() for pc in list(pcs)] pcs.clear() if close_tasks: await asyncio.gather(*close_tasks, return_exceptions=True) - # Now shut down both multicasters and release audio devices - running = False - if multicaster1 is not None: - await multicaster1.stop_streaming() - await multicaster1.reset() # Fully reset controller and advertising - running = True - if multicaster2 is not None: - await multicaster2.stop_streaming() - await multicaster2.reset() # Fully reset controller and advertising - running = True + was_running = await streamer.call(streamer._w_stop_all) - return {"status": "stopped", "was_running": running} + # Persist is_streaming=False + try: + settings = load_stream_settings() or {} + if settings.get('is_streaming'): + settings['is_streaming'] = False + settings['timestamp'] = datetime.utcnow().isoformat() + save_stream_settings(settings) + except Exception: + log.warning("Failed to persist is_streaming=False during stop_audio", exc_info=True) + + await asyncio.sleep(0.2) + return {"status": "stopped", "was_running": was_running} except Exception as e: log.error("Exception in /stop_audio: %s", traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) +@app.post("/stream_lc3") +async def send_audio(audio_data: dict[str, str]): + """Sends a block of pre-coded LC3 audio via the worker.""" + try: + await streamer.call(streamer._w_stream_lc3, audio_data, list(global_config_group.bigs)) + return {"status": "audio_sent"} + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + @app.get("/status") async def get_status(): - """Gets the current status of the multicaster together with persisted stream info.""" - status = multicaster1.get_status() if multicaster1 else { - 'is_initialized': False, - 'is_streaming': False, - } + """Gets current status (worker) merged with persisted settings cache.""" + status = await streamer.call(streamer._w_status_primary) status.update(load_stream_settings()) return status @@ -255,12 +385,13 @@ async def _autostart_from_settings(): and initializes streaming. """ try: - settings = load_stream_settings() or {} audio_mode = settings.get('audio_mode') input_device_name = settings.get('input_device') rate = settings.get('auracast_sampling_rate_hz') octets = settings.get('octets_per_frame') + pres_delay = settings.get('presentation_delay_us') + saved_rtn = settings.get('rtn') immediate_rendering = settings.get('immediate_rendering', False) assisted_listening_stream = settings.get('assisted_listening_stream', False) channel_names = settings.get('channel_names') or ["Broadcast0"] @@ -268,18 +399,11 @@ async def _autostart_from_settings(): languages = settings.get('languages') or ["deu"] stream_password = settings.get('stream_password') original_ts = settings.get('timestamp') + previously_streaming = bool(settings.get('is_streaming')) - try: - usb_names = {d.get('name') for _, d in list_usb_pw_inputs(refresh=False)} - net_names = {d.get('name') for _, d in list_network_pw_inputs(refresh=False)} - except Exception: - usb_names, net_names = set(), set() - if input_device_name in net_names: - os.environ.setdefault("PULSE_LATENCY_MSEC", "6") - else: - os.environ.setdefault("PULSE_LATENCY_MSEC", "3") - - # Only auto-start device-based inputs; Webapp and Demo require external sources/UI + # Only auto-start if the previous state was streaming and it's a device-based input. + if not previously_streaming: + return if not input_device_name: return if rate is None or octets is None: @@ -287,12 +411,14 @@ async def _autostart_from_settings(): return # Avoid duplicate start if already streaming - if multicaster1 and multicaster1.get_status().get('is_streaming'): + current = await streamer.call(streamer._w_status_primary) + if current.get('is_streaming'): return while True: # Do not interfere if user started a stream manually in the meantime - if multicaster1 and multicaster1.get_status().get('is_streaming'): + current = await streamer.call(streamer._w_status_primary) + if current.get('is_streaming'): return # Abort if saved settings changed to a different target while we were polling current_settings = load_stream_settings() or {} @@ -304,9 +430,9 @@ async def _autostart_from_settings(): current_settings.get('audio_mode') != audio_mode ): return - # Avoid refreshing PortAudio while we poll - usb = [d for _, d in list_usb_pw_inputs(refresh=False)] - net = [d for _, d in list_network_pw_inputs(refresh=False)] + # Check against the cached device lists + usb = [d for _, d in get_usb_pw_inputs()] + net = [d for _, d in get_network_pw_inputs()] names = {d.get('name') for d in usb} | {d.get('name') for d in net} if input_device_name in names: # Build a minimal config based on saved fields @@ -317,7 +443,7 @@ async def _autostart_from_settings(): program_info=program_info[0] if isinstance(program_info, list) and program_info else program_info, language=languages[0] if languages else "deu", audio_source=f"device:{input_device_name}", - input_format=f"int16le,{rate},1", + # input_format is intentionally omitted to use the default iso_que_len=1, sampling_frequency=rate, octets_per_frame=octets, @@ -329,71 +455,43 @@ async def _autostart_from_settings(): transport=TRANSPORT1, immediate_rendering=immediate_rendering, assisted_listening_stream=assisted_listening_stream, + presentation_delay_us=pres_delay if pres_delay is not None else 40000, bigs=bigs, ) + # Attach QoS if saved_rtn present + conf.qos_config = auracast_config.AuracastQoSConfig( + iso_int_multiple_10ms=1, + number_of_retransmissions=int(saved_rtn), + max_transport_latency_ms=int(saved_rtn) * 10 + 3, + ) + # Initialize and start + await asyncio.sleep(2) await initialize(conf) return await asyncio.sleep(2) except Exception: log.warning("Autostart task failed", exc_info=True) -#TODO: enable and test this @app.on_event("startup") async def _startup_autostart_event(): # Spawn the autostart task without blocking startup + log.info("Refreshing PipeWire device cache.") + # Hydrate settings cache once to avoid disk I/O during /status + _hydrate_settings_cache_from_disk() + refresh_pw_cache() + # Start the streamer worker thread + streamer.start() asyncio.create_task(_autostart_from_settings()) - -@app.post("/refresh_audio_inputs") -async def refresh_audio_inputs(force: bool = False): - """Triggers a re-scan of audio devices. - - If force is True and a stream is active, the stream(s) will be stopped to allow - a full re-initialization of the sounddevice backend. The response will include - 'stopped_stream': True if any running stream was stopped. - """ - stopped = False - if force: - try: - # Stop active streams before forcing sounddevice re-init - if multicaster1 is not None and multicaster1.get_status().get('is_streaming'): - await multicaster1.stop_streaming() - stopped = True - if multicaster2 is not None and multicaster2.get_status().get('is_streaming'): - await multicaster2.stop_streaming() - stopped = True - except Exception: - log.warning("Failed to stop stream(s) before force refresh", exc_info=True) - # Reinitialize sounddevice backend if requested - try: - if sys.platform == 'linux' and force: - log.info("Force re-initializing sounddevice backend") - sd._terminate() - sd._initialize() - except Exception: - log.error("Exception while force-refreshing audio devices:", exc_info=True) - return {"status": "ok", "inputs": [], "stopped_stream": stopped} - - + @app.get("/audio_inputs_pw_usb") async def audio_inputs_pw_usb(): - """List PipeWire USB input nodes mapped to sounddevice indices. - - Returns a list of dicts: [{id, name, max_input_channels}]. - """ + """List PipeWire USB input nodes from cache.""" try: - # Do not refresh PortAudio if we are currently streaming to avoid termination - streaming = False - try: - if multicaster1 is not None: - status = multicaster1.get_status() - streaming = bool(status.get('is_streaming')) - except Exception: - streaming = False devices = [ {"id": idx, "name": dev.get("name"), "max_input_channels": dev.get("max_input_channels", 0)} - for idx, dev in list_usb_pw_inputs(refresh=not streaming) + for idx, dev in get_usb_pw_inputs() ] return {"inputs": devices} except Exception as e: @@ -403,22 +501,11 @@ async def audio_inputs_pw_usb(): @app.get("/audio_inputs_pw_network") async def audio_inputs_pw_network(): - """List PipeWire Network/AES67 input nodes mapped to sounddevice indices. - - Returns a list of dicts: [{id, name, max_input_channels}]. - """ + """List PipeWire Network/AES67 input nodes from cache.""" try: - # Do not refresh PortAudio if we are currently streaming to avoid termination - streaming = False - try: - if multicaster1 is not None: - status = multicaster1.get_status() - streaming = bool(status.get('is_streaming')) - except Exception: - streaming = False devices = [ {"id": idx, "name": dev.get("name"), "max_input_channels": dev.get("max_input_channels", 0)} - for idx, dev in list_network_pw_inputs(refresh=not streaming) + for idx, dev in get_network_pw_inputs() ] return {"inputs": devices} except Exception as e: @@ -426,140 +513,209 @@ async def audio_inputs_pw_network(): raise HTTPException(status_code=500, detail=str(e)) -@app.post("/offer") -async def offer(offer: Offer): - log.info("/offer endpoint called") +@app.post("/refresh_audio_devices") +async def refresh_audio_devices(): + """Triggers a re-scan of audio devices, but only if no stream is active.""" + streaming = False + try: + status = await streamer.call(streamer._w_status_primary) + streaming = bool(status.get('is_streaming')) + except Exception: + pass # Ignore errors, default to not refreshing - # If a previous PeerConnection is still alive, close it so we only ever keep one active. - if pcs: - log.info("Closing %d existing PeerConnection(s) before creating a new one", len(pcs)) - close_tasks = [p.close() for p in list(pcs)] - await asyncio.gather(*close_tasks, return_exceptions=True) - pcs.clear() + if streaming: + log.warning("Ignoring refresh request: an audio stream is active.") + raise HTTPException(status_code=409, detail="An audio stream is active. Stop the stream before refreshing devices.") - pc = RTCPeerConnection() # No STUN needed for localhost - pcs.add(pc) - id_ = uuid.uuid4().hex[:8] - log.info(f"{id_}: new PeerConnection") - - # create directory for records - only for testing - os.makedirs("./records", exist_ok=True) - - # Do NOT start the streamer yet – we'll start it lazily once we actually - # receive the first audio frame, ensuring WebRTCAudioInput is ready and - # avoiding race-conditions on restarts. - @pc.on("track") - async def on_track(track: MediaStreamTrack): - log.info(f"{id_}: track {track.kind} received") - try: - first = True - while True: - frame: av.audio.frame.AudioFrame = await track.recv() # RTP audio frame (already decrypted) - if first: - log.info(f"{id_}: frame layout={frame.layout}") - log.info(f"{id_}: frame format={frame.format}") - log.info( - f"{id_}: frame sample_rate={frame.sample_rate}, samples_per_channel={frame.samples}, planes={frame.planes}" - ) - # Lazily start the streamer now that we know a track exists. - if multicaster1.streamer is None: - await multicaster1.start_streaming() - # Yield control so the Streamer coroutine has a chance to - # create the WebRTCAudioInput before we push samples. - await asyncio.sleep(0) - first = False - # in stereo case this is interleaved data format - frame_array = frame.to_ndarray() - log.info(f"array.shape{frame_array.shape}") - log.info(f"array.dtype{frame_array.dtype}") - log.info(f"frame.to_ndarray(){frame_array}") - - samples = frame_array.reshape(-1) - log.info(f"samples.shape: {samples.shape}") - - if frame.layout.name == 'stereo': - # Interleaved stereo: [L0, R0, L1, R1, ...] - mono_array = samples[::2] # Take left channel - else: - mono_array = samples - - log.info(f"mono_array.shape: {mono_array.shape}") + try: + log.info("Refreshing PipeWire device cache.") + refresh_pw_cache() + return {"status": "ok"} + except Exception as e: + log.error("Exception during device refresh: %s", traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Failed to refresh devices: {e}") - frame_array = frame.to_ndarray() +# async def offer(offer: Offer): +# @app.post("/offer") #webrtc endpoint +# log.info("/offer endpoint called") - # Flatten in case it's (1, N) or (N,) - samples = frame_array.reshape(-1) +# # If a previous PeerConnection is still alive, close it so we only ever keep one active. +# if pcs: +# log.info("Closing %d existing PeerConnection(s) before creating a new one", len(pcs)) +# close_tasks = [p.close() for p in list(pcs)] +# await asyncio.gather(*close_tasks, return_exceptions=True) +# pcs.clear() - if frame.layout.name == 'stereo': - # Interleaved stereo: [L0, R0, L1, R1, ...] - mono_array = samples[::2] # Take left channel - else: - mono_array = samples +# pc = RTCPeerConnection() # No STUN needed for localhost +# pcs.add(pc) +# id_ = uuid.uuid4().hex[:8] +# log.info(f"{id_}: new PeerConnection") - # Get current WebRTC audio input (streamer may have been restarted) - big0 = list(multicaster1.bigs.values())[0] - audio_input = big0.get('audio_input') - # Wait until the streamer has instantiated the WebRTCAudioInput - if audio_input is None or getattr(audio_input, 'closed', False): - continue - # Feed mono PCM samples to the global WebRTC audio input - await audio_input.put_samples(mono_array.astype(np.int16)) +# # create directory for records - only for testing +# os.makedirs("./records", exist_ok=True) - # Save to WAV file - only for testing - # if not hasattr(pc, 'wav_writer'): - # import wave - # wav_path = f"./records/auracast_{id_}.wav" - # pc.wav_writer = wave.open(wav_path, "wb") - # pc.wav_writer.setnchannels(1) # mono - # pc.wav_writer.setsampwidth(2) # 16-bit PCM - # pc.wav_writer.setframerate(frame.sample_rate) +# # Do NOT start the streamer yet – we'll start it lazily once we actually +# # receive the first audio frame, ensuring WebRTCAudioInput is ready and +# # avoiding race-conditions on restarts. +# @pc.on("track") +# async def on_track(track: MediaStreamTrack): +# log.info(f"{id_}: track {track.kind} received") +# try: +# first = True +# while True: +# frame: av.audio.frame.AudioFrame = await track.recv() # RTP audio frame (already decrypted) +# if first: +# log.info(f"{id_}: frame layout={frame.layout}") +# log.info(f"{id_}: frame format={frame.format}") +# log.info( +# f"{id_}: frame sample_rate={frame.sample_rate}, samples_per_channel={frame.samples}, planes={frame.planes}" +# ) +# # Lazily start the streamer now that we know a track exists. +# if multicaster1.streamer is None: +# await multicaster1.start_streaming() +# # Yield control so the Streamer coroutine has a chance to +# # create the WebRTCAudioInput before we push samples. +# await asyncio.sleep(0) +# # Persist is_streaming=True for Webapp mode +# try: +# settings = load_stream_settings() or {} +# settings['is_streaming'] = True +# settings['timestamp'] = datetime.utcnow().isoformat() +# save_stream_settings(settings) +# except Exception: +# log.warning("Failed to persist is_streaming=True on WebRTC start", exc_info=True) +# first = False +# # in stereo case this is interleaved data format +# frame_array = frame.to_ndarray() +# log.info(f"array.shape{frame_array.shape}") +# log.info(f"array.dtype{frame_array.dtype}") +# log.info(f"frame.to_ndarray(){frame_array}") - # pcm_data = mono_array.astype(np.int16).tobytes() - # pc.wav_writer.writeframes(pcm_data) +# samples = frame_array.reshape(-1) +# log.info(f"samples.shape: {samples.shape}") + +# if frame.layout.name == 'stereo': +# # Interleaved stereo: [L0, R0, L1, R1, ...] +# mono_array = samples[::2] # Take left channel +# else: +# mono_array = samples + +# log.info(f"mono_array.shape: {mono_array.shape}") - except Exception as e: - log.error(f"{id_}: Exception in on_track: {e}") - finally: - # Always close the wav file when the track ends or on error - if hasattr(pc, 'wav_writer'): - try: - pc.wav_writer.close() - except Exception: - pass - del pc.wav_writer +# frame_array = frame.to_ndarray() - # --- SDP negotiation --- - log.info(f"{id_}: setting remote description") - await pc.setRemoteDescription(RTCSessionDescription(**offer.model_dump())) +# # Flatten in case it's (1, N) or (N,) +# samples = frame_array.reshape(-1) - log.info(f"{id_}: creating answer") - answer = await pc.createAnswer() - sdp = answer.sdp - # Insert a=ptime using the global PTIME variable - ptime_line = f"a=ptime:{PTIME}" - if "a=sendrecv" in sdp: - sdp = sdp.replace("a=sendrecv", f"a=sendrecv\n{ptime_line}") - else: - sdp += f"\n{ptime_line}" - new_answer = RTCSessionDescription(sdp=sdp, type=answer.type) - await pc.setLocalDescription(new_answer) - log.info(f"{id_}: sending answer with {ptime_line}") - return {"sdp": pc.localDescription.sdp, - "type": pc.localDescription.type} +# if frame.layout.name == 'stereo': +# # Interleaved stereo: [L0, R0, L1, R1, ...] +# mono_array = samples[::2] # Take left channel +# else: +# mono_array = samples + +# # Get current WebRTC audio input (streamer may have been restarted) +# big0 = list(multicaster1.bigs.values())[0] +# audio_input = big0.get('audio_input') +# # Wait until the streamer has instantiated the WebRTCAudioInput +# if audio_input is None or getattr(audio_input, 'closed', False): +# continue +# # Feed mono PCM samples to the global WebRTC audio input +# await audio_input.put_samples(mono_array.astype(np.int16)) + +# # Save to WAV file - only for testing +# # if not hasattr(pc, 'wav_writer'): +# # import wave +# # wav_path = f"./records/auracast_{id_}.wav" +# # pc.wav_writer = wave.open(wav_path, "wb") +# # pc.wav_writer.setnchannels(1) # mono +# # pc.wav_writer.setsampwidth(2) # 16-bit PCM +# # pc.wav_writer.setframerate(frame.sample_rate) + +# # pcm_data = mono_array.astype(np.int16).tobytes() +# # pc.wav_writer.writeframes(pcm_data) + + +# except Exception as e: +# log.error(f"{id_}: Exception in on_track: {e}") +# finally: +# # Always close the wav file when the track ends or on error +# if hasattr(pc, 'wav_writer'): +# try: +# pc.wav_writer.close() +# except Exception: +# pass +# del pc.wav_writer + +# # --- SDP negotiation --- +# log.info(f"{id_}: setting remote description") +# await pc.setRemoteDescription(RTCSessionDescription(**offer.model_dump())) + +# log.info(f"{id_}: creating answer") +# answer = await pc.createAnswer() +# sdp = answer.sdp +# # Insert a=ptime using the global PTIME variable +# ptime_line = f"a=ptime:{PTIME}" +# if "a=sendrecv" in sdp: +# sdp = sdp.replace("a=sendrecv", f"a=sendrecv\n{ptime_line}") +# else: +# sdp += f"\n{ptime_line}" +# new_answer = RTCSessionDescription(sdp=sdp, type=answer.type) +# await pc.setLocalDescription(new_answer) +# log.info(f"{id_}: sending answer with {ptime_line}") +# return {"sdp": pc.localDescription.sdp, +# "type": pc.localDescription.type} @app.post("/shutdown") async def shutdown(): """Stops broadcasting and releases all audio/Bluetooth resources.""" try: - await multicaster1.shutdown() + await streamer.call(streamer._w_stop_all) return {"status": "stopped"} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) +@app.post("/system_reboot") +async def system_reboot(): + """Stop audio and request a system reboot via sudo. + + Requires the service user to have passwordless sudo permissions to run 'reboot'. + """ + try: + # Best-effort: stop any active streaming cleanly WITHOUT persisting state + try: + # Close any WebRTC peer connections + close_tasks = [pc.close() for pc in list(pcs)] + pcs.clear() + if close_tasks: + await asyncio.gather(*close_tasks, return_exceptions=True) + + # Stop streaming on worker but DO NOT touch stream_settings.json + try: + await streamer.call(streamer._w_stop_all) + except Exception: + pass + except Exception: + log.warning("Non-fatal: failed to stop streams before reboot", exc_info=True) + + # Launch reboot without waiting for completion + try: + await asyncio.create_subprocess_exec("sudo", "reboot") + except Exception as e: + log.error("Failed to invoke reboot: %s", e, exc_info=True) + raise HTTPException(status_code=500, detail=f"Failed to invoke reboot: {e}") + + return {"status": "rebooting"} + except HTTPException: + raise + except Exception as e: + log.error("Exception in /system_reboot: %s", e, exc_info=True) + raise HTTPException(status_code=500, detail=str(e)) + + if __name__ == '__main__': import os os.chdir(os.path.dirname(__file__)) @@ -569,4 +725,4 @@ if __name__ == '__main__': format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s' ) # Bind to localhost only for security: prevents network access, only frontend on same machine can connect - uvicorn.run(app, host="127.0.0.1", port=5000) \ No newline at end of file + uvicorn.run(app, host="127.0.0.1", port=5000, access_log=False) \ No newline at end of file diff --git a/src/auracast/utils/reset_utils.py b/src/auracast/utils/reset_utils.py new file mode 100644 index 0000000..74f549b --- /dev/null +++ b/src/auracast/utils/reset_utils.py @@ -0,0 +1,77 @@ +import os +import asyncio +import logging as log + +async def reset_nrf54l(slot: int = 0, timeout: float = 8.0): + """ + Reset the nRF54L target using OpenOCD before starting broadcast. + + Looks for interface config files in the project at `src/openocd/` relative to this module only. + Accepts filename variants per slot: + - slot 0: raspberrypi-swd0.cfg or swd0.cfg + - slot 1: raspberrypi-swd1.cfg or swd1.cfg + + Executes the equivalent of: + openocd \ + -f ./raspberrypi-${INTERFACE}.cfg \ + -f target/nordic/nrf54l.cfg \ + -c "init" \ + -c "reset run" \ + -c "shutdown" + + Best-effort: if OpenOCD is unavailable, logs a warning and continues. + """ + try: + # Resolve project directory and filenames + proj_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'openocd')) + names = ['raspberrypi-swd0.cfg', 'swd0.cfg'] if slot == 0 else ['raspberrypi-swd1.cfg', 'swd1.cfg'] + cfg = None + for n in names: + p = os.path.join(proj_dir, n) + if os.path.exists(p): + cfg = p + break + if not cfg: + log.warning("reset_nrf54l: no interface CFG found in project dir %s; skipping reset", proj_dir) + return + + # Build openocd command (no sudo required as per project setup). + cmd = ['openocd', '-f', cfg, '-f', 'target/nordic/nrf54l.cfg', + '-c', 'init', '-c', 'reset run', '-c', 'shutdown'] + async def _run(cmd): + proc = await asyncio.create_subprocess_exec( + *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT + ) + try: + out, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout) + except asyncio.TimeoutError: + log.error("reset_nrf54l: %s timed out; terminating", cmd[0]) + proc.kill() + return False + rc = proc.returncode + if rc != 0: + log.error("reset_nrf54l: %s exited with code %s; output: %s", cmd[0], rc, (out or b'').decode(errors='ignore')) + return False + return True + + ok = await _run(cmd) + if ok: + log.info("reset_nrf54l: reset succeeded (slot %d) using %s", slot, cfg) + + except FileNotFoundError: + log.error("reset_nrf54l: openocd not found; skipping reset") + except Exception: + log.error("reset_nrf54l failed", exc_info=True) + + +if __name__ == '__main__': + # Basic logging setup + log.basicConfig( + level=os.environ.get('LOG_LEVEL', log.INFO), + format='%(asctime)s.%(msecs)03d %(levelname)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + slot_to_reset = 1 + log.info(f"Executing reset for slot {slot_to_reset}") + asyncio.run(reset_nrf54l(slot=slot_to_reset)) diff --git a/src/auracast/utils/sounddevice_utils.py b/src/auracast/utils/sounddevice_utils.py index 35dbea0..7db59a4 100644 --- a/src/auracast/utils/sounddevice_utils.py +++ b/src/auracast/utils/sounddevice_utils.py @@ -49,64 +49,40 @@ def _sd_matches_from_names(pa_idx, names): out.append((i, d)) return out -def list_usb_pw_inputs(refresh: bool = True): - """ - Return [(device_index, device_dict), ...] for PipeWire **input** nodes - backed by **USB** devices (excludes monitor sources). +# Module-level caches for device lists +_usb_inputs_cache = [] +_network_inputs_cache = [] - Parameters: - - refresh (bool): If True (default), force PortAudio to re-enumerate devices - before mapping. Set to False to avoid disrupting active streams. +def get_usb_pw_inputs(): + """Return cached list of USB PipeWire inputs.""" + return _usb_inputs_cache + +def get_network_pw_inputs(): + """Return cached list of Network/AES67 PipeWire inputs.""" + return _network_inputs_cache + +def refresh_pw_cache(): """ - # Refresh PortAudio so we see newly added nodes before mapping - if refresh: - _sd_refresh() + Performs a full device scan and updates the internal caches for both USB + and Network audio devices. This is a heavy operation and should not be + called frequently or during active streams. + """ + global _usb_inputs_cache, _network_inputs_cache + + # Force PortAudio to re-enumerate devices + _sd_refresh() pa_idx = _pa_like_hostapi_index() pw = _pw_dump() - # Map device.id -> device.bus ("usb"/"pci"/"platform"/"network"/...) + # --- Pass 1: Map device.id to device.bus --- device_bus = {} for obj in pw: if obj.get("type") == "PipeWire:Interface:Device": props = (obj.get("info") or {}).get("props") or {} device_bus[obj["id"]] = (props.get("device.bus") or "").lower() - # Collect names/descriptions of USB input nodes + # --- Pass 2: Identify all USB and Network nodes --- usb_input_names = set() - for obj in pw: - if obj.get("type") != "PipeWire:Interface:Node": - continue - props = (obj.get("info") or {}).get("props") or {} - media = (props.get("media.class") or "").lower() - if "source" not in media and "stream/input" not in media: - continue - # skip monitor sources ("Monitor of ..." or *.monitor) - nname = (props.get("node.name") or "").lower() - ndesc = (props.get("node.description") or "").lower() - if ".monitor" in nname or "monitor" in ndesc: - continue - bus = (props.get("device.bus") or device_bus.get(props.get("device.id")) or "").lower() - if bus == "usb": - usb_input_names.add(props.get("node.description") or props.get("node.name")) - - # Map to sounddevice devices on PipeWire host API - return _sd_matches_from_names(pa_idx, usb_input_names) - -def list_network_pw_inputs(refresh: bool = True): - """ - Return [(device_index, device_dict), ...] for PipeWire **input** nodes that - look like network/AES67/RTP sources (excludes monitor sources). - - Parameters: - - refresh (bool): If True (default), force PortAudio to re-enumerate devices - before mapping. Set to False to avoid disrupting active streams. - """ - # Refresh PortAudio so we see newly added nodes before mapping - if refresh: - _sd_refresh() - pa_idx = _pa_like_hostapi_index() - pw = _pw_dump() - network_input_names = set() for obj in pw: if obj.get("type") != "PipeWire:Interface:Node": @@ -115,26 +91,29 @@ def list_network_pw_inputs(refresh: bool = True): media = (props.get("media.class") or "").lower() if "source" not in media and "stream/input" not in media: continue + nname = (props.get("node.name") or "") ndesc = (props.get("node.description") or "") - # skip monitor sources + # Skip all monitor sources if ".monitor" in nname.lower() or "monitor" in ndesc.lower(): continue - # Heuristics for network/AES67/RTP + # Check for USB + bus = (props.get("device.bus") or device_bus.get(props.get("device.id")) or "").lower() + if bus == "usb": + usb_input_names.add(ndesc or nname) + continue # A device is either USB or Network, not both + + # Heuristics for Network/AES67/RTP text = (nname + " " + ndesc).lower() media_name = (props.get("media.name") or "").lower() node_group = (props.get("node.group") or "").lower() - # Presence flags/keys that strongly indicate network RTP/AES67 sources node_network_flag = bool(props.get("node.network")) - has_rtp_keys = any(k in props for k in ( - "rtp.session", "rtp.source.ip", "rtp.source.port", "rtp.fmtp", "rtp.rate" - )) - has_sess_keys = any(k in props for k in ( - "sess.name", "sess.media", "sess.latency.msec" - )) + has_rtp_keys = any(k in props for k in ("rtp.session", "rtp.source.ip")) + has_sess_keys = any(k in props for k in ("sess.name", "sess.media")) + is_network = ( - (props.get("device.bus") or "").lower() == "network" or + bus == "network" or node_network_flag or "rtp" in media_name or any(k in text for k in ("rtp", "sap", "aes67", "network", "raop", "airplay")) or @@ -145,7 +124,13 @@ def list_network_pw_inputs(refresh: bool = True): if is_network: network_input_names.add(ndesc or nname) - return _sd_matches_from_names(pa_idx, network_input_names) + # --- Final Step: Update caches --- + _usb_inputs_cache = _sd_matches_from_names(pa_idx, usb_input_names) + _network_inputs_cache = _sd_matches_from_names(pa_idx, network_input_names) + + +# Populate cache on initial module load +refresh_pw_cache() # Example usage: # for i, d in list_usb_pw_inputs(): diff --git a/src/openocd/raspberrypi-swd0.cfg b/src/openocd/raspberrypi-swd0.cfg new file mode 100644 index 0000000..853f575 --- /dev/null +++ b/src/openocd/raspberrypi-swd0.cfg @@ -0,0 +1,8 @@ +adapter driver bcm2835gpio +transport select swd +adapter gpio swclk 17 +adapter gpio swdio 18 +#adapter gpio trst 26 +#reset_config trst_only + +adapter speed 1000 diff --git a/src/openocd/raspberrypi-swd1.cfg b/src/openocd/raspberrypi-swd1.cfg new file mode 100644 index 0000000..103c0f9 --- /dev/null +++ b/src/openocd/raspberrypi-swd1.cfg @@ -0,0 +1,8 @@ +adapter driver bcm2835gpio +transport select swd +adapter gpio swclk 24 +adapter gpio swdio 23 +#adapter gpio trst 27 +#reset_config trst_only + +adapter speed 1000 diff --git a/src/scripts/hit_status.sh b/src/scripts/hit_status.sh new file mode 100644 index 0000000..24f252e --- /dev/null +++ b/src/scripts/hit_status.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env bash +# Usage: ./hit_status.sh [COUNT] [SLEEP_SECONDS] +# Always targets http://127.0.0.1:5000/status +# Defaults: COUNT=100 SLEEP_SECONDS=0 +# Example: ./hit_status.sh 100 0.05 + +set -euo pipefail + +URL="http://127.0.0.1:5000/status" +COUNT="${1:-100}" +SLEEP_SECS="${2:-0}" + +# Ensure COUNT is an integer +if ! [[ "$COUNT" =~ ^[0-9]+$ ]]; then + echo "COUNT must be an integer, got: $COUNT" >&2 + exit 1 +fi + +for i in $(seq 1 "$COUNT"); do + echo "[$i/$COUNT] GET $URL" + curl -sS "$URL" > /dev/null || echo "Request $i failed" + # Sleep if non-zero (supports floats, no bc needed) + if [[ "$SLEEP_SECS" != "0" && "$SLEEP_SECS" != "0.0" && "$SLEEP_SECS" != "" ]]; then + sleep "$SLEEP_SECS" + fi +done \ No newline at end of file