# frontend/app.py
"""Streamlit front end for controlling the Auracast backend.

The page lets the user pick an audio mode (Webapp, USB/Network or Demo),
builds the matching ``auracast_config`` objects and posts them to the HTTP
endpoints of the backend reachable under ``BACKEND_URL``.
"""
import logging as log
import os
import time

import requests
import streamlit as st

from auracast import auracast_config

# Track whether WebRTC stream is active across Streamlit reruns
if 'stream_started' not in st.session_state:
    st.session_state['stream_started'] = False

# Global: desired packetization time in ms for Opus (should match backend)
# NOTE(review): not referenced by any code visible in this file — presumably
# meant for the (currently blank) WebRTC component template; confirm.
PTIME = 40
BACKEND_URL = "http://localhost:5000"
#TRANSPORT1 = "serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_B53C372677E14460-if00,115200,rtscts"
#TRANSPORT2 = "serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_CC69A2912F84AE5E-if00,115200,rtscts"
TRANSPORT1 = 'serial:/dev/ttyAMA3,1000000,rtscts'  # transport for raspberry pi gpio header
TRANSPORT2 = 'serial:/dev/ttyAMA4,1000000,rtscts'  # transport for raspberry pi gpio header

# UI label -> sampling rate (Hz) and octets per frame sent to the backend.
QUALITY_MAP = {
    "High (48kHz)": {"rate": 48000, "octets": 120},
    "Good (32kHz)": {"rate": 32000, "octets": 80},
    "Medium (24kHz)": {"rate": 24000, "octets": 60},
    "Fair (16kHz)": {"rate": 16000, "octets": 40},
}


def _first_or(value, fallback):
    """Return the first item of a non-empty list/tuple, a string as-is, else *fallback*.

    Persisted settings may hold a list, a plain string, an empty list or
    nothing at all; indexing them blindly with ``[0]`` would raise.
    """
    if isinstance(value, (list, tuple)) and value:
        return value[0]
    if isinstance(value, str):
        return value
    return fallback


def _fetch_audio_inputs():
    """Ask the backend for its audio input devices; return ``[]`` on any failure.

    A failure is reported on the page via ``st.error`` instead of crashing
    the whole script run.
    """
    try:
        # Timeout mirrors the one used for /refresh_audio_inputs so the UI
        # cannot hang indefinitely on an unreachable backend.
        resp = requests.get(f"{BACKEND_URL}/audio_inputs", timeout=3)
        payload = resp.json()
    except (requests.RequestException, ValueError) as e:
        st.error(f"Failed to load audio inputs: {e}")
        return []
    if not isinstance(payload, dict):
        return []
    return payload.get('inputs', [])


def _render_refresh_button():
    """Render the "Refresh" button; on click trigger a backend rescan and rerun."""
    if st.button("Refresh"):
        try:
            requests.post(f"{BACKEND_URL}/refresh_audio_inputs", timeout=3)
        except Exception as e:
            st.error(f"Failed to refresh devices: {e}")
        st.rerun()


# Try loading persisted settings from backend
saved_settings = {}
try:
    resp = requests.get(f"{BACKEND_URL}/status", timeout=1)
    if resp.status_code == 200:
        loaded = resp.json()
        # Everything below calls .get() on this, so only accept a JSON object.
        if isinstance(loaded, dict):
            saved_settings = loaded
except Exception:
    saved_settings = {}

st.title("🎙️ Auracast Audio Mode Control")

# Audio mode selection with persisted default
options = ["Webapp", "USB/Network", "Demo"]
saved_audio_mode = saved_settings.get("audio_mode", "Webapp")
if saved_audio_mode not in options:
    saved_audio_mode = "Webapp"

audio_mode = st.selectbox(
    "Audio Mode",
    options,
    index=options.index(saved_audio_mode),
    help="Select the audio input source. Choose 'Webapp' for browser microphone, 'USB/Network' for a connected hardware device, or 'Demo' for a simulated stream."
)

if audio_mode == "Demo":
    demo_stream_map = {
        "1 × 48kHz": {"quality": "High (48kHz)", "streams": 1},
        "2 × 24kHz": {"quality": "Medium (24kHz)", "streams": 2},
        "3 × 16kHz": {"quality": "Fair (16kHz)", "streams": 3},
        "2 × 48kHz": {"quality": "High (48kHz)", "streams": 2},
        "4 × 24kHz": {"quality": "Medium (24kHz)", "streams": 4},
        "6 × 16kHz": {"quality": "Fair (16kHz)", "streams": 6},
    }
    demo_options = list(demo_stream_map.keys())
    demo_selected = st.selectbox(
        "Demo Stream Type",
        demo_options,
        index=0,
        help="Select the demo stream configuration."
    )
    #st.info(f"Demo mode selected: {demo_selected} (Streams: {demo_stream_map[demo_selected]['streams']}, Rate: {demo_stream_map[demo_selected]['rate']} Hz)")

    # Start/Stop buttons for demo mode
    if 'demo_stream_started' not in st.session_state:
        st.session_state['demo_stream_started'] = False
    col1, col2 = st.columns(2)
    with col1:
        start_demo = st.button("Start Demo Stream")
    with col2:
        stop_demo = st.button("Stop Demo Stream")

    if start_demo:
        # Always stop any running stream for clean state. This is a
        # deliberate best-effort call: a failure here is not fatal because
        # the /init request below reports backend problems to the user.
        try:
            requests.post(f"{BACKEND_URL}/stop_audio").json()
        except Exception:
            pass
        time.sleep(1)

        demo_cfg = demo_stream_map[demo_selected]
        # Octets per frame logic matches quality_map
        q = QUALITY_MAP[demo_cfg['quality']]

        # Language configs and test files
        lang_cfgs = [
            (auracast_config.AuracastBigConfigDeu, 'de'),
            (auracast_config.AuracastBigConfigEng, 'en'),
            (auracast_config.AuracastBigConfigFra, 'fr'),
            (auracast_config.AuracastBigConfigSpa, 'es'),
            (auracast_config.AuracastBigConfigIta, 'it'),
            (auracast_config.AuracastBigConfigPol, 'pl'),
        ]
        rate_khz = q["rate"] // 1000
        all_bigs = []
        for i in range(demo_cfg['streams']):
            # Cycle through the languages when more streams than languages are requested.
            cfg_cls, lang = lang_cfgs[i % len(lang_cfgs)]
            all_bigs.append(cfg_cls(
                audio_source=f'file:../testdata/wave_particle_5min_{lang}_{rate_khz}kHz_mono.wav',
                iso_que_len=32,
                sampling_frequency=q['rate'],
                octets_per_frame=q['octets'],
            ))

        # Split bigs into two configs if needed: streams beyond the
        # per-multicaster limit for this rate go to the second transport.
        max_per_mc = {48000: 1, 24000: 2, 16000: 3}
        max_streams = max_per_mc.get(q['rate'], 3)
        bigs1 = all_bigs[:max_streams]
        bigs2 = all_bigs[max_streams:]

        config1 = auracast_config.AuracastConfigGroup(
            auracast_sampling_rate_hz=q['rate'],
            octets_per_frame=q['octets'],
            transport=TRANSPORT1,
            bigs=bigs1
        )
        config2 = None
        if bigs2:
            config2 = auracast_config.AuracastConfigGroup(
                auracast_sampling_rate_hz=q['rate'],
                octets_per_frame=q['octets'],
                transport=TRANSPORT2,
                bigs=bigs2
            )

        # Call /init and /init2
        try:
            r1 = requests.post(f"{BACKEND_URL}/init", json=config1.model_dump())
            if r1.status_code == 200:
                msg = f"Demo stream started on multicaster 1 ({len(bigs1)} streams)"
                st.session_state['demo_stream_started'] = True
                st.success(msg)
            else:
                st.session_state['demo_stream_started'] = False
                st.error(f"Failed to initialize multicaster 1: {r1.text}")
            if config2:
                r2 = requests.post(f"{BACKEND_URL}/init2", json=config2.model_dump())
                if r2.status_code == 200:
                    st.success(f"Demo stream started on multicaster 2 ({len(bigs2)} streams)")
                else:
                    st.error(f"Failed to initialize multicaster 2: {r2.text}")
        except Exception as e:
            st.session_state['demo_stream_started'] = False
            st.error(f"Error: {e}")
    elif stop_demo:
        try:
            r = requests.post(f"{BACKEND_URL}/stop_audio").json()
            st.session_state['demo_stream_started'] = False
            if r.get('was_running'):
                st.info("Demo stream stopped.")
            else:
                st.info("Demo stream was not running.")
        except Exception as e:
            st.error(f"Error: {e}")
    elif st.session_state['demo_stream_started']:
        st.success(f"Demo stream running: {demo_selected}")
    else:
        st.info("Demo stream not running.")

    quality = None  # Not used in demo mode
else:
    # Stream quality selection (now enabled)
    quality_options = list(QUALITY_MAP.keys())
    default_quality = "Medium (24kHz)" if "Medium (24kHz)" in quality_options else quality_options[0]
    quality = st.selectbox(
        "Stream Quality (Sampling Rate)",
        quality_options,
        index=quality_options.index(default_quality),
        help="Select the audio sampling rate for the stream. Lower rates may improve compatibility."
    )

    # Persisted values may be missing, empty or not lists; fall back safely.
    default_name = _first_or(saved_settings.get('channel_names'), "Broadcast0")
    default_lang = _first_or(saved_settings.get('languages'), "deu")

    stream_name = st.text_input(
        "Channel Name",
        value=default_name,
        help="The primary name for your broadcast. Like the SSID of a WLAN, it identifies your stream for receivers."
    )
    # program_info may be stored as a list or as a plain string.
    default_program_info = _first_or(saved_settings.get('program_info', default_name), default_name)
    program_info = st.text_input(
        "Program Info",
        value=default_program_info,
        help="Additional details about the broadcast program, such as its content or purpose. Shown to receivers for more context."
    )
    language = st.text_input(
        "Language (ISO 639-3)",
        value=default_lang,
        help="Three-letter language code (e.g., 'eng' for English, 'deu' for German). Used by receivers to display the language of the stream. See: https://en.wikipedia.org/wiki/List_of_ISO_639-3_codes"
    )

    # Gain slider for Webapp mode
    # NOTE(review): mic_gain is not consumed by any code visible in this file —
    # presumably intended for the (currently blank) JS snippets below; confirm.
    if audio_mode == "Webapp":
        mic_gain = st.slider("Microphone Gain", 0.0, 2.0, 1.0, 0.1, help="Adjust microphone volume sent to Auracast")
    else:
        mic_gain = 1.0

    # Input device selection for USB mode
    if audio_mode == "USB/Network":
        device_list = _fetch_audio_inputs()
        # Display "name [id]" but use name as value
        option_name_map = {f"{d['name']} [{d['id']}]": d['name'] for d in device_list}
        input_options = [f"{d['name']} [{d['id']}]" for d in device_list]
        device_names = [d['name'] for d in device_list]

        # Determine default input by name
        default_input_name = saved_settings.get('input_device')
        if default_input_name not in device_names and device_names:
            default_input_name = device_names[0]
        default_input_label = next(
            (label for label, name in option_name_map.items() if name == default_input_name),
            None,
        )

        if not input_options:
            st.warning("No hardware audio input devices found. Plug in a USB input device and click Refresh.")
            _render_refresh_button()
            input_device = None
        else:
            col1, col2 = st.columns([3, 1], vertical_alignment="bottom")
            with col1:
                selected_option = st.selectbox(
                    "Input Device",
                    input_options,
                    index=input_options.index(default_input_label) if default_input_label in input_options else 0
                )
            with col2:
                _render_refresh_button()
            # Send only the device name to backend
            input_device = option_name_map.get(selected_option)
    else:
        input_device = None

    start_stream = st.button("Start Auracast")
    stop_stream = st.button("Stop Auracast")

    # If gain slider moved while streaming, send update to JS without restarting
    if audio_mode == "Webapp" and st.session_state.get('stream_started'):
        # NOTE(review): the HTML/JS payload is blank here, so this component
        # currently does nothing — confirm the script body was not lost.
        update_js = f""" """
        st.components.v1.html(update_js, height=0)

    if stop_stream:
        st.session_state['stream_started'] = False
        try:
            r = requests.post(f"{BACKEND_URL}/stop_audio").json()
            # .get keeps a response without the key from being reported as an error.
            if r.get('was_running'):
                st.success("Stream Stopped!")
            else:
                st.success("Stream was not running.")
        except Exception as e:
            st.error(f"Error: {e}")
        # Ensure existing WebRTC connection is fully closed so that a fresh
        # connection is created the next time we start the stream.
        if audio_mode == "Webapp":
            # NOTE(review): payload is blank here as well — confirm.
            cleanup_js = """ """
            st.components.v1.html(cleanup_js, height=0)

    # True only when this run successfully (re)initialized the backend.
    stream_ready = False
    if start_stream:
        can_init = True
        if audio_mode == "USB/Network" and not input_device:
            # Without a device the backend would be handed "device:None".
            can_init = False
            st.error("No input device selected.")

        if can_init:
            # Always send stop to ensure backend is in a clean state, regardless of current status
            try:
                r = requests.post(f"{BACKEND_URL}/stop_audio").json()
                if isinstance(r, dict) and r.get('was_running'):
                    st.success("Stream Stopped!")
            except Exception as e:
                can_init = False
                st.error(f"Error: {e}")

        if can_init:
            # Small pause lets backend fully release audio devices before re-init
            time.sleep(1)
            # Prepare config using the model (do NOT send qos_config, only relevant fields)
            q = QUALITY_MAP[quality]
            config = auracast_config.AuracastConfigGroup(
                auracast_sampling_rate_hz=q['rate'],
                octets_per_frame=q['octets'],
                transport=TRANSPORT1,  # transport for raspberry pi gpio header
                bigs=[
                    auracast_config.AuracastBigConfig(
                        name=stream_name,
                        program_info=program_info,
                        language=language,
                        audio_source=(
                            f"device:{input_device}" if audio_mode == "USB/Network" else (
                                "webrtc" if audio_mode == "Webapp" else "network"
                            )
                        ),
                        input_format=(f"int16le,{q['rate']},1" if audio_mode == "USB/Network" else "auto"),
                        iso_que_len=1,
                        sampling_frequency=q['rate'],
                        octets_per_frame=q['octets'],
                    ),
                ]
            )
            try:
                r = requests.post(f"{BACKEND_URL}/init", json=config.model_dump())
                if r.status_code == 200:
                    stream_ready = True
                    st.success("Stream Started!")
                else:
                    st.error(f"Failed to initialize: {r.text}")
            except Exception as e:
                st.error(f"Error: {e}")

        if not stream_ready:
            # A failed start must not leave the page believing a stream is live.
            st.session_state['stream_started'] = False

    # Render / maintain WebRTC component
    if audio_mode == "Webapp" and (stream_ready or st.session_state.get('stream_started')):
        st.markdown("Starting microphone; allow access if prompted and speak.")
        # NOTE(review): payload is blank here as well — confirm.
        component = f""" """
        st.components.v1.html(component, height=0)
        st.session_state['stream_started'] = True
#else:
#    st.header("Advertised Streams (Cloud Announcements)")
#    st.info("This feature requires backend support to list advertised streams.")
#    # Placeholder for future implementation
#    # Example: r = requests.get(f"{BACKEND_URL}/advertised_streams")
#    # if r.status_code == 200:
#    #     streams = r.json()
#    #     for s in streams:
#    #         st.write(s)
#    # else:
#    #     st.error("Could not fetch advertised streams.")

log.basicConfig(
    level=os.environ.get('LOG_LEVEL', log.DEBUG),
    format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
)