Files
bumble-auracast/src/auracast/server/multicast_frontend.py

1886 lines
87 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# auracast/server/multicast_frontend.py
import os
import time
import logging as log
from PIL import Image
import requests
from dotenv import load_dotenv
import streamlit as st
from auracast.utils.read_temp import read_case_temp, read_cpu_temp
from auracast import auracast_config
from auracast.utils.frontend_auth import (
is_pw_disabled,
load_pw_record,
save_pw_record,
hash_password,
verify_password,
)
# The page configuration (tab title and icon) is issued before any other
# Streamlit API is used. The favicon is resolved relative to this file, in
# the sibling "utils" folder.
_THIS_DIR = os.path.dirname(__file__)
_FAVICON_PATH = os.path.abspath(
    os.path.join(_THIS_DIR, '..', 'utils', 'favicon.ico')
)
favicon = Image.open(_FAVICON_PATH)
st.set_page_config(page_title="Beacon", page_icon=favicon, layout="centered")

# Pull in environment variables from a .env file, if one is present.
load_dotenv()

# Session flags that must survive Streamlit reruns; both start out False:
# - 'stream_started': whether the WebRTC stream is active
# - 'frontend_authenticated': whether the password gate has been passed
for _session_flag in ('stream_started', 'frontend_authenticated'):
    if _session_flag not in st.session_state:
        st.session_state[_session_flag] = False
# Password gate for the frontend. It is skipped entirely when
# is_pw_disabled() reports that password protection is turned off.
if not is_pw_disabled():
    pw_rec = load_pw_record()
    # First-time setup: no password set -> force user to choose one
    if pw_rec is None:
        st.header("Set up your frontend password")
        st.info("For security, you must set a password on first access.")
        with st.form("first_setup_form"):
            new_pw = st.text_input("New password", type="password")
            new_pw2 = st.text_input("Confirm password", type="password")
            submitted = st.form_submit_button("Save password")
        if submitted:
            # Validate length first, then that both entries match.
            if len(new_pw) < 6:
                st.error("Password should be at least 6 characters.")
            elif new_pw != new_pw2:
                st.error("Passwords do not match.")
            else:
                salt, key = hash_password(new_pw)
                try:
                    save_pw_record(salt, key)
                    st.success("Password saved. You can now sign in.")
                    # Rerun so the freshly saved record is loaded and the
                    # sign-in form is shown instead of the setup form.
                    st.rerun()
                except Exception as e:
                    st.error(f"Failed to save password: {e}")
        # Nothing else is rendered while no password record exists.
        st.stop()
    # Normal sign-in gate
    if not st.session_state['frontend_authenticated']:
        st.header("Sign in")
        with st.form("signin_form"):
            pw = st.text_input("Password", type="password")
            submitted = st.form_submit_button("Sign in")
        if submitted:
            if verify_password(pw, pw_rec):
                # Remember the successful sign-in for subsequent reruns.
                st.session_state['frontend_authenticated'] = True
                st.success("Signed in.")
                st.rerun()
            else:
                st.error("Incorrect password. Please try again.")
        # Stop rendering the rest of the app until authenticated
        if not st.session_state['frontend_authenticated']:
            st.stop()
# Base URL of the backend HTTP API that this frontend talks to.
BACKEND_URL = "http://localhost:5000"

# UI quality label -> sampling rate in Hz plus the "octets" value paired with
# that rate (presumably octets per codec frame -- TODO confirm).
QUALITY_MAP = {
    "High (48kHz)": {"rate": 48000, "octets": 120},
    "Good (32kHz)": {"rate": 32000, "octets": 80},
    "Medium (24kHz)": {"rate": 24000, "octets": 60},
    "Fair (16kHz)": {"rate": 16000, "octets": 40},
}

# UI QoS label -> preset object provided by auracast_config.
QOS_PRESET_MAP = {
    "Fast": auracast_config.AuracastQosFast(),
    "Robust": auracast_config.AuracastQosRobust(),
}

# Try loading persisted settings from the backend. This is best effort: on
# any failure the UI falls back to an empty settings dict and built-in
# defaults instead of refusing to render.
saved_settings = {}
try:
    resp = requests.get(f"{BACKEND_URL}/status", timeout=1)
    if resp.status_code == 200:
        _status_payload = resp.json()
        # resp.json() can yield any JSON type (null, list, ...). Everything
        # below calls .get() on the settings, so only accept a JSON object.
        if isinstance(_status_payload, dict):
            saved_settings = _status_payload
except Exception:
    # Keep the failure visible in debug logs rather than swallowing it.
    log.debug("Could not load persisted settings from backend", exc_info=True)
    saved_settings = {}

# Define is_streaming early from the fetched status for use throughout the UI
is_streaming = bool(saved_settings.get("is_streaming", False))

# Secondary status, if provided by the backend /status endpoint. A missing,
# null or non-object value is normalised to an empty dict.
secondary_status = saved_settings.get("secondary") or {}
if not isinstance(secondary_status, dict):
    secondary_status = {}

# A top-level "secondary_is_streaming" flag wins; otherwise fall back to the
# "is_streaming" entry of the secondary status.
secondary_is_streaming = bool(
    saved_settings.get(
        "secondary_is_streaming",
        secondary_status.get("is_streaming", False),
    )
)
def validate_unique_input_devices(radio1_streams, radio2_streams):
    """Check that no input device is used twice within the same radio.

    Each radio is checked on its own, so the same device may appear once in
    Radio 1 and once in Radio 2. Streams whose 'input_device' entry is
    missing or falsy are ignored.

    Args:
        radio1_streams: Iterable of stream dicts for Radio 1. ``None`` is
            treated like an empty list.
        radio2_streams: Iterable of stream dicts for Radio 2. ``None`` is
            treated like an empty list.

    Returns:
        A ``(ok, message)`` tuple. ``ok`` is True and ``message`` is empty
        when all devices are unique; otherwise ``ok`` is False and
        ``message`` names the first radio that has a duplicate.
    """
    radios = (("Radio 1", radio1_streams), ("Radio 2", radio2_streams))
    for radio_label, streams in radios:
        # `streams or []` lets callers pass None for a radio that is disabled.
        devices = [
            s['input_device'] for s in (streams or []) if s.get('input_device')
        ]
        # A set drops duplicates, so a length mismatch means a repeated device.
        if len(devices) != len(set(devices)):
            return False, (
                f"Duplicate input devices detected in {radio_label} streams. "
                "Each stream must use a unique input device."
            )
    return True, ""
st.title("Auracast Audio Mode Control")
def render_stream_controls(status_streaming: bool, start_label: str, stop_label: str, mode_label: str, secondary_streaming: bool = False):
    """Draw the start/stop buttons and a one-line status in a single row.

    The start button is disabled while streaming and the stop button is
    disabled while stopped.

    Args:
        status_streaming: Whether the primary stream is currently running.
        start_label: Caption of the start button.
        stop_label: Caption of the stop button.
        mode_label: Mode name shown in the status line.
        secondary_streaming: Whether the second radio is streaming as well;
            only affects the status text while ``status_streaming`` is True.

    Returns:
        ``(start_clicked, stop_clicked)`` as returned by the two buttons.
    """
    # Work out the status text up front; it does not depend on any widget.
    if not status_streaming:
        status_text = "🔴 Stopped"
    elif secondary_streaming:
        status_text = "🟢 Streaming on both Radios"
    else:
        status_text = "🟢 Streaming on Radio 1"

    start_col, stop_col, status_col = st.columns(
        [1, 1, 3], gap="small", vertical_alignment="center"
    )
    with start_col:
        start_clicked = st.button(start_label, disabled=status_streaming)
    with stop_col:
        stop_clicked = st.button(stop_label, disabled=not status_streaming)
    with status_col:
        st.write(f"Mode: {mode_label} · {status_text}")
    return start_clicked, stop_clicked
# Audio mode selection with persisted default.
options = [
    "Demo",
    "Analog",
    "Network - Dante",
]
saved_audio_mode = saved_settings.get("audio_mode", "Demo")
if saved_audio_mode not in options:
    # Map legacy/unknown modes to their legacy labels. None of the mapped
    # labels is part of `options`, so the selectbox below still falls back to
    # "Demo" for them.
    # NOTE(review): a backend-persisted 'USB' mode may be meant to preselect
    # "Analog" here -- confirm against the backend before changing this.
    mapping = {"USB/Network": "USB", "AES67": "Network"}
    saved_audio_mode = mapping.get(saved_audio_mode, "Demo")
audio_mode = st.selectbox(
    "Audio Mode",
    options,
    index=options.index(saved_audio_mode) if saved_audio_mode in options else options.index("Demo"),
    disabled=is_streaming,
    # The help text lists exactly the entries offered in `options`.
    help=(
        "Select the audio input source. Choose 'Analog' for the analog ALSA inputs (ch1/ch2), "
        "'Network - Dante' for Dante inputs via ALSA, "
        "or 'Demo' for a simulated stream."
    )
)
# Work out which mode label the status line shows:
# - while streaming, the backend-reported mode is preferred
# - while stopped, the mode currently selected in the UI is shown
backend_mode_raw = saved_settings.get("audio_mode")
backend_mode_mapped = None
if isinstance(backend_mode_raw, str):
    # Legacy backend names are translated; any other name is only accepted
    # when it is one of the selectable options.
    backend_mode_mapped = {"AES67": "Network", "USB/Network": "USB"}.get(
        backend_mode_raw,
        backend_mode_raw if backend_mode_raw in options else None,
    )

# An "Analog" selection in the UI is always displayed as such, even though
# the backend currently persists USB for all device sources.
if audio_mode != "Analog" and is_streaming and backend_mode_mapped:
    running_mode = backend_mode_mapped
else:
    running_mode = audio_mode
# Start/Stop buttons and status line, rendered at the top of the page.
# Only the button captions differ between Demo and the other modes.
if audio_mode == "Demo":
    _button_labels = ("Start Demo", "Stop Demo")
else:
    _button_labels = ("Start Auracast", "Stop Auracast")
start_stream, stop_stream = render_stream_controls(
    is_streaming, *_button_labels, running_mode, secondary_is_streaming
)

# Reserved slot for validation errors (will be filled in later)
validation_error_placeholder = st.empty()

is_started = is_stopped = False
if audio_mode == "Demo":
demo_stream_map = {
"1 × 48kHz": {"quality": "High (48kHz)", "streams": 1},
"1 × 24kHz": {"quality": "Medium (24kHz)", "streams": 1},
"1 × 16kHz": {"quality": "Fair (16kHz)", "streams": 1},
"2 × 24kHz": {"quality": "Medium (24kHz)", "streams": 2},
"3 × 16kHz": {"quality": "Fair (16kHz)", "streams": 3},
"2 × 48kHz": {"quality": "High (48kHz)", "streams": 2},
"4 × 24kHz": {"quality": "Medium (24kHz)", "streams": 4},
"6 × 16kHz": {"quality": "Fair (16kHz)", "streams": 6},
}
demo_options = list(demo_stream_map.keys())
default_index = 0
saved_type = saved_settings.get('demo_stream_type')
if isinstance(saved_type, str) and saved_type in demo_options:
default_index = demo_options.index(saved_type)
else:
saved_rate = saved_settings.get('auracast_sampling_rate_hz')
saved_total = saved_settings.get('demo_total_streams')
if saved_total is None and saved_settings.get('audio_mode') == 'Demo':
saved_total = len(saved_settings.get('channel_names') or [])
try:
if saved_rate and saved_total:
for i, label in enumerate(demo_options):
cfg = demo_stream_map[label]
rate_for_label = QUALITY_MAP[cfg['quality']]['rate']
if cfg['streams'] == int(saved_total) and int(saved_rate) == rate_for_label:
default_index = i
break
except Exception:
default_index = 0
demo_selected = st.selectbox(
"Demo Stream Type",
demo_options,
index=default_index,
disabled=is_streaming,
help="Select the demo stream configuration."
)
# Stream password and flags (same as USB/AES67)
saved_pwd = saved_settings.get('stream_password', '') or ''
stream_passwort = st.text_input(
"Stream Passwort",
value=saved_pwd,
type=("password"),
disabled=is_streaming,
help="Optional: Set a broadcast code to protect your stream. Leave empty for an open (uncoded) broadcast."
)
col_flags1, col_flags2, col_pdelay, col_qos = st.columns([1, 1, 0.7, 0.6], gap="small", vertical_alignment="center")
with col_flags1:
assisted_listening = st.checkbox(
"Assistive listening",
value=bool(saved_settings.get('assisted_listening_stream', False)),
disabled=is_streaming,
help="tells the receiver that this is an assistive listening stream"
)
with col_flags2:
immediate_rendering = st.checkbox(
"Immediate rendering",
value=bool(saved_settings.get('immediate_rendering', False)),
disabled=is_streaming,
help="tells the receiver to ignore presentation delay and render immediately if possible."
)
# QoS/presentation controls inline with flags
default_pdelay = int(saved_settings.get('presentation_delay_us', 40000) or 40000)
with col_pdelay:
default_pdelay_ms = max(10, min(200, default_pdelay // 1000))
presentation_delay_ms = st.number_input(
"Delay (ms)",
min_value=10, max_value=200, step=5, value=default_pdelay_ms,
disabled=is_streaming,
help="Delay between capture and presentation for receivers."
)
with col_qos:
qos_options = list(QOS_PRESET_MAP.keys())
saved_qos = saved_settings.get('qos_preset', 'Fast')
default_qos_idx = qos_options.index(saved_qos) if saved_qos in qos_options else 0
qos_preset = st.selectbox(
"QoS", options=qos_options, index=default_qos_idx,
disabled=is_streaming,
help="Fast: 2 retransmissions, lower latency. Robust: 4 retransmissions, better reliability."
)
#st.info(f"Demo mode selected: {demo_selected} (Streams: {demo_stream_map[demo_selected]['streams']}, Rate: {demo_stream_map[demo_selected]['rate']} Hz)")
quality = None # Not used in demo mode
else:
# --- Mode-specific configuration ---
# Defaults are taken from the persisted settings. `or [...]` also covers a
# persisted None or empty list, which would otherwise make `[0]` raise.
default_name = (saved_settings.get('channel_names') or ["Broadcast0"])[0]
raw_program_info = saved_settings.get('program_info', default_name)
if isinstance(raw_program_info, list):
    # Use the first entry; an empty list falls back to the channel name so a
    # list is never handed to a text input as its default value.
    default_program_info = raw_program_info[0] if raw_program_info else default_name
else:
    default_program_info = raw_program_info
default_lang = (saved_settings.get('languages') or ["deu"])[0]
# Per-mode configuration and controls
input_device = None
radio2_enabled = False
radio1_cfg = None
radio2_cfg = None
if audio_mode == "Analog":
# --- Radio 1 controls ---
with st.container(border=True):
st.subheader("Radio 1")
# Always-enabled checkbox for Radio 1
st.checkbox(
"Radio 1 always enabled",
value=True,
disabled=True,
help="Radio 1 is always enabled, Radio 2 can be turned on or off."
)
# Stereo mode toggle for analog
stereo_enabled = st.checkbox(
"🎧 Stereo Mode",
value=bool(saved_settings.get('analog_stereo_mode', False)),
help="Enable stereo streaming for analog inputs. When enabled, ch1 becomes left channel and ch2 becomes right channel in a single stereo stream. Radio 2 will be disabled in stereo mode.",
disabled=is_streaming
)
# Use analog-specific defaults (not from saved settings which may have Dante values)
default_name = "Analog_Radio_1"
default_program_info = "Analog Radio Broadcast"
default_lang = "deu"
quality_options = list(QUALITY_MAP.keys())
default_quality = "Medium (24kHz)" if "Medium (24kHz)" in quality_options else quality_options[0]
quality1 = st.selectbox(
"Stream Quality (Radio 1)",
quality_options,
index=quality_options.index(default_quality),
disabled=is_streaming,
help="Select the audio sampling rate for Radio 1."
)
stream_passwort1 = st.text_input(
"Stream Passwort (Radio 1)",
value="",
type="password",
disabled=is_streaming,
help="Optional: Set a broadcast code for Radio 1."
)
col_r1_flags1, col_r1_flags2, col_r1_pdelay, col_r1_qos = st.columns([1, 1, 0.7, 0.6], gap="small")
with col_r1_flags1:
assisted_listening1 = st.checkbox(
"Assistive listening (R1)",
value=bool(saved_settings.get('assisted_listening_stream', False)),
disabled=is_streaming,
help="tells the receiver that this is an assistive listening stream"
)
with col_r1_flags2:
immediate_rendering1 = st.checkbox(
"Immediate rendering (R1)",
value=bool(saved_settings.get('immediate_rendering', False)),
disabled=is_streaming,
help="tells the receiver to ignore presentation delay and render immediately if possible."
)
default_pdelay = int(saved_settings.get('presentation_delay_us', 40000) or 40000)
with col_r1_pdelay:
default_pdelay_ms = max(10, min(200, default_pdelay // 1000))
presentation_delay_ms1 = st.number_input(
"Delay (ms, R1)",
min_value=10, max_value=200, step=5, value=default_pdelay_ms,
disabled=is_streaming,
help="Delay between capture and presentation for Radio 1."
)
with col_r1_qos:
qos_options = list(QOS_PRESET_MAP.keys())
saved_qos = saved_settings.get('qos_preset', 'Fast')
default_qos_idx = qos_options.index(saved_qos) if saved_qos in qos_options else 0
qos_preset1 = st.selectbox(
"QoS (R1)", options=qos_options, index=default_qos_idx,
disabled=is_streaming,
help="Fast: 2 retransmissions, lower latency. Robust: 4 retransmissions, better reliability."
)
col_r1_name, col_r1_lang = st.columns([2, 1])
with col_r1_name:
stream_name1 = st.text_input(
"Channel Name (Radio 1)",
value=default_name,
disabled=is_streaming,
help="Name for the first analog radio (Radio 1)."
)
with col_r1_lang:
language1 = st.text_input(
"Language (ISO 639-3) (Radio 1)",
value=default_lang,
disabled=is_streaming,
help="Language code for Radio 1."
)
program_info1 = st.text_input(
"Program Info (Radio 1)",
value=default_program_info,
disabled=is_streaming,
help="Program information for Radio 1."
)
# Analog mode exposes only ALSA ch1/ch2 inputs.
if not is_streaming:
try:
    # Bounded wait: requests applies no timeout by default, so an
    # unresponsive backend would block this page indefinitely. 8 s matches
    # the timeout used for the device refresh call in this file.
    resp = requests.get(f"{BACKEND_URL}/audio_inputs_pw_usb", timeout=8)
    device_list = resp.json().get('inputs', [])
except Exception as e:
    # Best effort: report the problem and continue with no devices.
    st.error(f"Failed to fetch devices: {e}")
    device_list = []
analog_devices = [d for d in device_list if d.get('name') in ('ch1', 'ch2')]
if not analog_devices:
st.warning("No Analog (ch1/ch2) ALSA inputs found. Check asound configuration.")
if st.button("Refresh", key="refresh_analog", disabled=is_streaming):
try:
r = requests.post(f"{BACKEND_URL}/refresh_audio_devices", timeout=8)
if not r.ok:
st.error(f"Failed to refresh: {r.text}")
except Exception as e:
st.error(f"Failed to refresh devices: {e}")
st.rerun()
analog_names = [d['name'] for d in analog_devices]
else:
analog_devices = []
analog_names = []
if not is_streaming:
if analog_names:
if stereo_enabled:
# In stereo mode, only show ch1 and automatically select it
if 'ch1' in analog_names:
input_device1 = 'ch1'
st.selectbox(
"Input Device (Radio 1) - Stereo",
['ch1 + ch2 (Stereo: Left+Right channels)'],
index=0,
disabled=is_streaming,
help="Stereo mode: Captures both ch1 (left) and ch2 (right) as a single stereo stream"
)
st.info("🎧 Stereo mode enabled - both ch1 and ch2 will be captured as left/right channels")
else:
st.error("ch1 not available for stereo mode")
input_device1 = None
else:
# Mono mode: show all available channels
default_r1_idx = 0
input_device1 = st.selectbox(
"Input Device (Radio 1)",
analog_names,
index=default_r1_idx,
disabled=is_streaming,
)
else:
input_device1 = None
else:
input_device1 = saved_settings.get('input_device')
if stereo_enabled:
st.selectbox(
"Input Device (Radio 1) - Stereo",
[f"{input_device1 or 'ch1'} (Left+Right channels)" if input_device1 else "No device selected"],
index=0,
disabled=True,
help="Stop the stream to change the input device."
)
else:
st.selectbox(
"Input Device (Radio 1)",
[input_device1 or "No device selected"],
index=0,
disabled=True,
help="Stop the stream to change the input device."
)
# --- Radio 2 controls ---
with st.container(border=True):
st.subheader("Radio 2")
# Disable Radio 2 in stereo mode
if stereo_enabled:
st.info("🎧 Radio 2 is automatically disabled in stereo mode")
radio2_enabled = False
else:
# If the backend reports that the secondary radio is currently streaming,
# initialize the checkbox to checked so the UI reflects the active state
# when the frontend is loaded.
radio2_enabled_default = secondary_is_streaming
radio2_enabled = st.checkbox(
"Enable Radio 2",
value=radio2_enabled_default,
disabled=is_streaming,
help="Activate a second analog radio with its own quality and timing settings."
)
if radio2_enabled and not stereo_enabled:
# Use analog-specific defaults for Radio 2
default_name_r2 = "Analog_Radio_2"
default_program_info_r2 = "Analog Radio Broadcast"
default_lang_r2 = "deu"
quality2 = st.selectbox(
"Stream Quality (Radio 2)",
quality_options,
index=quality_options.index(default_quality),
disabled=is_streaming,
help="Select the audio sampling rate for Radio 2."
)
stream_passwort2 = st.text_input(
"Stream Passwort (Radio 2)",
value="",
type="password",
disabled=is_streaming,
help="Optional: Set a broadcast code for Radio 2."
)
col_r2_flags1, col_r2_flags2, col_r2_pdelay, col_r2_qos = st.columns([1, 1, 0.7, 0.6], gap="small")
with col_r2_flags1:
assisted_listening2 = st.checkbox(
"Assistive listening (R2)",
value=bool(saved_settings.get('assisted_listening_stream', False)),
disabled=is_streaming,
help="tells the receiver that this is an assistive listening stream"
)
with col_r2_flags2:
immediate_rendering2 = st.checkbox(
"Immediate rendering (R2)",
value=bool(saved_settings.get('immediate_rendering', False)),
disabled=is_streaming,
help="tells the receiver to ignore presentation delay and render immediately if possible."
)
with col_r2_pdelay:
presentation_delay_ms2 = st.number_input(
"Delay (ms, R2)",
min_value=10, max_value=200, step=5, value=default_pdelay_ms,
disabled=is_streaming,
help="Delay between capture and presentation for Radio 2."
)
with col_r2_qos:
    # `.get('secondary', {})` only covers a missing key. `or {}` also covers
    # a key that is present but null/falsy, which would otherwise raise an
    # AttributeError on the chained .get().
    saved_qos2 = (saved_settings.get('secondary') or {}).get('qos_preset', 'Fast')
    default_qos_idx2 = qos_options.index(saved_qos2) if saved_qos2 in qos_options else 0
    qos_preset2 = st.selectbox(
        "QoS (R2)", options=qos_options, index=default_qos_idx2,
        disabled=is_streaming,
        help="Fast: 2 retransmissions, lower latency. Robust: 4 retransmissions, better reliability."
    )
col_r2_name, col_r2_lang = st.columns([2, 1])
with col_r2_name:
stream_name2 = st.text_input(
"Channel Name (Radio 2)",
value=default_name_r2,
disabled=is_streaming,
help="Name for the second analog radio (Radio 2)."
)
with col_r2_lang:
language2 = st.text_input(
"Language (ISO 639-3) (Radio 2)",
value=default_lang_r2,
disabled=is_streaming,
help="Language code for Radio 2."
)
program_info2 = st.text_input(
"Program Info (Radio 2)",
value=default_program_info_r2,
disabled=is_streaming,
help="Program information for Radio 2."
)
if not is_streaming:
if analog_names:
default_r2_idx = 1 if len(analog_names) > 1 else 0
input_device2 = st.selectbox(
"Input Device (Radio 2)",
analog_names,
index=default_r2_idx,
disabled=is_streaming,
)
else:
input_device2 = None
else:
input_device2 = saved_settings.get('input_device')
st.selectbox(
"Input Device (Radio 2)",
[input_device2 or "No device selected"],
index=0,
disabled=True,
help="Stop the stream to change the input device."
)
radio2_cfg = {
'id': 1002,
'name': stream_name2,
'program_info': program_info2,
'language': language2,
'input_device': input_device2,
'quality': quality2,
'stream_passwort': stream_passwort2,
'assisted_listening': assisted_listening2,
'immediate_rendering': immediate_rendering2,
'presentation_delay_ms': presentation_delay_ms2,
'qos_preset': qos_preset2,
}
radio1_cfg = {
'id': 1001,
'name': stream_name1,
'program_info': program_info1,
'language': language1,
'input_device': input_device1,
'quality': quality1,
'stream_passwort': stream_passwort1,
'assisted_listening': assisted_listening1,
'immediate_rendering': immediate_rendering1,
'presentation_delay_ms': presentation_delay_ms1,
'qos_preset': qos_preset1,
'stereo_mode': stereo_enabled, # Add stereo mode setting
}
if audio_mode == "Network - Dante":
# --- Network - Dante mode with Radio 1 and Radio 2 categories ---
# Define stream configuration options
dante_stream_options = {
"1 × 48kHz": {"streams": 1, "quality": "High (48kHz)"},
"2 × 24kHz": {"streams": 2, "quality": "Medium (24kHz)"},
"3 × 16kHz": {"streams": 3, "quality": "Fair (16kHz)"}
}
# Get available Dante devices
if not is_streaming:
try:
    # Bounded wait: requests applies no timeout by default, so an
    # unresponsive backend would block this page indefinitely. 8 s matches
    # the timeout used for the device refresh call in this file.
    resp = requests.get(f"{BACKEND_URL}/audio_inputs_dante", timeout=8)
    device_list = resp.json().get('inputs', [])
except Exception as e:
    # Best effort: report the problem and continue with no devices.
    st.error(f"Failed to fetch Dante devices: {e}")
    device_list = []
input_options = [f"{d['name']} [{d['id']}]" for d in device_list]
option_name_map = {f"{d['name']} [{d['id']}]": d['name'] for d in device_list}
device_names = [d['name'] for d in device_list]
else:
input_options = []
option_name_map = {}
device_names = []
# --- Radio 1 Section ---
with st.container(border=True):
st.subheader("Radio 1")
# Always-enabled checkbox for Radio 1
st.checkbox(
"Radio 1 always enabled",
value=True,
disabled=True,
help="Radio 1 is always enabled, Radio 2 can be turned on or off."
)
# Dante stereo mode toggle
saved_r1_config = saved_settings.get('dante_radio1', {})
dante_stereo_enabled = st.checkbox(
"🎧 Stereo Mode",
value=bool(saved_r1_config.get('dante_stereo_mode', False)),
help="Enable stereo streaming for Dante inputs. Select left and right channels from ASRC channels 1-6. Radio 2 and multi-stream configurations will be disabled in stereo mode.",
disabled=is_streaming
)
# Dante stereo channel selectors
dante_left_channel = None
dante_right_channel = None
if dante_stereo_enabled:
dante_channel_options = ["dante_asrc_ch1", "dante_asrc_ch2", "dante_asrc_ch3",
"dante_asrc_ch4", "dante_asrc_ch5", "dante_asrc_ch6"]
dante_channel_labels = ["CH1", "CH2", "CH3", "CH4", "CH5", "CH6"]
col_left, col_right = st.columns(2)
with col_left:
saved_left = saved_r1_config.get('dante_stereo_left', 'dante_asrc_ch1')
left_idx = dante_channel_options.index(saved_left) if saved_left in dante_channel_options else 0
dante_left_channel = st.selectbox(
"Left Channel",
dante_channel_options,
index=left_idx,
format_func=lambda x: f"ASRC {dante_channel_labels[dante_channel_options.index(x)]}",
disabled=is_streaming,
help="Select the Dante ASRC channel for the left stereo channel"
)
with col_right:
saved_right = saved_r1_config.get('dante_stereo_right', 'dante_asrc_ch2')
right_idx = dante_channel_options.index(saved_right) if saved_right in dante_channel_options else 1
dante_right_channel = st.selectbox(
"Right Channel",
dante_channel_options,
index=right_idx,
format_func=lambda x: f"ASRC {dante_channel_labels[dante_channel_options.index(x)]}",
disabled=is_streaming,
help="Select the Dante ASRC channel for the right stereo channel"
)
if dante_left_channel == dante_right_channel:
st.warning("⚠️ Left and right channels are the same. Select different channels for true stereo.")
else:
st.info(f"🎧 Stereo mode: {dante_channel_labels[dante_channel_options.index(dante_left_channel)]} (Left) + {dante_channel_labels[dante_channel_options.index(dante_right_channel)]} (Right)")
# Stream count dropdown for Radio 1 (disabled in stereo mode - forced to 1 stream at 48kHz)
r1_stream_options = list(dante_stream_options.keys())
saved_r1_streams = saved_r1_config.get('stream_config', '1x48')
default_r1_idx = r1_stream_options.index(saved_r1_streams) if saved_r1_streams in r1_stream_options else 0
if dante_stereo_enabled:
# Stereo mode: force 1 stream at 48kHz
r1_stream_config = "1 × 48kHz"
st.selectbox(
"Stream Configuration (Radio 1)",
["1 × 48kHz (Stereo)"],
index=0,
disabled=True,
help="In stereo mode, only 1 stream at 48kHz is supported"
)
else:
r1_stream_config = st.selectbox(
"Stream Configuration (Radio 1)",
r1_stream_options,
index=default_r1_idx,
disabled=is_streaming,
help="Select the number and quality of streams for Radio 1"
)
r1_num_streams = dante_stream_options[r1_stream_config]["streams"]
r1_quality = dante_stream_options[r1_stream_config]["quality"]
# Stream quality (moved directly under stream configuration)
r1_max_quality = r1_quality
r1_available_qualities = []
for quality in ["High (48kHz)", "Good (32kHz)", "Medium (24kHz)", "Fair (16kHz)"]:
# Check if this quality is equal to or lower than the max
if (r1_max_quality == "High (48kHz)" or
(r1_max_quality == "Medium (24kHz)" and quality in ["Medium (24kHz)", "Fair (16kHz)"]) or
(r1_max_quality == "Fair (16kHz)" and quality == "Fair (16kHz)")):
r1_available_qualities.append(quality)
saved_r1_quality = saved_r1_config.get('radio_quality', r1_max_quality)
if saved_r1_quality not in r1_available_qualities:
saved_r1_quality = r1_max_quality
r1_radio_quality = st.selectbox(
"Stream Quality (Radio 1)",
r1_available_qualities,
index=r1_available_qualities.index(saved_r1_quality),
disabled=is_streaming,
help=f"Select stream quality for Radio 1. Maximum quality based on configuration: {r1_max_quality}"
)
# Radio-level settings for Radio 1
# First row: Assistive listening, immediate rendering, presentation delay, QoS
col_r1_flags1, col_r1_flags2, col_r1_pdelay, col_r1_qos = st.columns([1, 1, 0.7, 0.6], gap="small")
with col_r1_flags1:
r1_assisted_listening = st.checkbox(
"Assistive (R1)",
value=bool(saved_r1_config.get('assisted_listening', False)),
disabled=is_streaming,
help="Assistive listening stream"
)
with col_r1_flags2:
r1_immediate_rendering = st.checkbox(
"Immediate (R1)",
value=bool(saved_r1_config.get('immediate_rendering', False)),
disabled=is_streaming,
help="Ignore presentation delay"
)
with col_r1_pdelay:
default_pdelay = int(saved_r1_config.get('presentation_delay_us', 40000) or 40000)
default_pdelay_ms = max(10, min(200, default_pdelay // 1000))
r1_presentation_delay_ms = st.number_input(
"Delay (ms, R1)",
min_value=10, max_value=200, step=5, value=default_pdelay_ms,
disabled=is_streaming,
help="Presentation delay for Radio 1"
)
with col_r1_qos:
qos_options = list(QOS_PRESET_MAP.keys())
saved_qos = saved_r1_config.get('qos_preset', 'Fast')
default_qos_idx = qos_options.index(saved_qos) if saved_qos in qos_options else 0
r1_qos_preset = st.selectbox(
"QoS (R1)", options=qos_options, index=default_qos_idx,
disabled=is_streaming,
help="Quality of Service preset for Radio 1"
)
# Per-stream configuration for Radio 1
if dante_stereo_enabled:
st.write("**Stereo Stream Configuration (Radio 1)**")
else:
st.write("**Stream Configuration (Radio 1)**")
r1_streams = []
if dante_stereo_enabled:
# Stereo mode: single stream with combined L+R channels
with st.expander("Stereo Stream - Radio 1", expanded=True):
saved_streams = saved_r1_config.get('streams', [])
saved_stream = saved_streams[0] if saved_streams else {}
# First row: Channel name and password
col_name, col_pwd = st.columns([2, 1])
with col_name:
stream_name = st.text_input(
"Channel Name",
value=saved_stream.get('name', 'Dante_Stereo'),
disabled=is_streaming,
key="r1_stereo_name"
)
with col_pwd:
stream_password = st.text_input(
"Stream Password",
value=saved_stream.get('stream_password', ''),
type="password",
disabled=is_streaming,
key="r1_stereo_password",
help="Optional: Set a broadcast code for this stream"
)
# Second row: Program info and language
col_prog, col_lang_code = st.columns([2, 1])
with col_prog:
program_info = st.text_input(
"Program Info",
value=saved_stream.get('program_info', 'Dante Stereo Broadcast'),
disabled=is_streaming,
key="r1_stereo_program"
)
with col_lang_code:
language = st.text_input(
"Language",
value=saved_stream.get('language', 'eng'),
disabled=is_streaming,
key="r1_stereo_lang",
help="ISO 639-3 language code"
)
# Build stereo device name from selected channels
# Extract channel numbers from dante_asrc_chX
left_ch_num = dante_left_channel.replace('dante_asrc_ch', '') if dante_left_channel else '1'
right_ch_num = dante_right_channel.replace('dante_asrc_ch', '') if dante_right_channel else '2'
# Use the device name that matches the user's L/R selection
stereo_device_name = f"dante_stereo_{left_ch_num}_{right_ch_num}"
# Show stereo device info
st.selectbox(
"Input Device (Stereo)",
[f"Stereo: CH{left_ch_num} (L) + CH{right_ch_num} (R)"],
index=0,
disabled=True,
help="Stereo input from the selected Dante ASRC channels"
)
r1_streams.append({
'name': stream_name,
'program_info': program_info,
'language': language,
'input_device': stereo_device_name,
'stream_password': stream_password,
'dante_stereo_mode': True,
'dante_stereo_left': dante_left_channel,
'dante_stereo_right': dante_right_channel,
})
else:
# Normal mono mode: multiple streams with individual channels
for i in range(r1_num_streams):
with st.expander(f"Stream {i+1} - Radio 1", expanded=True):
saved_streams = saved_r1_config.get('streams', [])
saved_stream = saved_streams[i] if i < len(saved_streams) else {}
# First row: Channel name and language
col_name, col_lang = st.columns([2, 1])
with col_name:
stream_name = st.text_input(
f"Channel Name",
value=saved_stream.get('name', f'Dante_R1_S{i+1}'),
disabled=is_streaming,
key=f"r1_stream_{i}_name"
)
with col_lang:
stream_password = st.text_input(
f"Stream Password",
value=saved_stream.get('stream_password', ''),
type="password",
disabled=is_streaming,
key=f"r1_stream_{i}_password",
help="Optional: Set a broadcast code for this stream"
)
# Second row: Program info and language
col_prog, col_lang_code = st.columns([2, 1])
with col_prog:
program_info = st.text_input(
f"Program Info",
value=saved_stream.get('program_info', f'Dante Radio 1 Stream {i+1}'),
disabled=is_streaming,
key=f"r1_stream_{i}_program"
)
with col_lang_code:
language = st.text_input(
f"Language",
value=saved_stream.get('language', 'eng'),
disabled=is_streaming,
key=f"r1_stream_{i}_lang",
help="ISO 639-3 language code"
)
# Third row: Input device
col_device = st.columns([1])[0]
with col_device:
# Session state key for persisting the selection
device_session_key = f"r1_stream_{i}_device_saved"
if not is_streaming and input_options:
# Get default from session state first, then from saved settings
default_input_name = st.session_state.get(device_session_key, saved_stream.get('input_device'))
default_input_label = None
for label, name in option_name_map.items():
if name == default_input_name:
default_input_label = label
break
if default_input_label not in input_options and input_options:
default_input_label = input_options[0]
selected_option = st.selectbox(
f"Input Device",
input_options,
index=input_options.index(default_input_label) if default_input_label in input_options else 0,
disabled=is_streaming,
key=f"r1_stream_{i}_device"
)
input_device = option_name_map.get(selected_option)
# Save to session state for persistence
st.session_state[device_session_key] = input_device
else:
# When streaming, get the device from session state
current_device = st.session_state.get(device_session_key, saved_stream.get('input_device', 'No device'))
# Convert internal name to display label
display_label = current_device
for label, name in option_name_map.items():
if name == current_device:
display_label = label
break
st.selectbox(
f"Input Device",
[display_label if display_label else 'No device'],
index=0,
disabled=True,
key=f"r1_stream_{i}_device_disabled"
)
input_device = current_device
r1_streams.append({
'name': stream_name,
'program_info': program_info,
'language': language,
'input_device': input_device,
'stream_password': stream_password
})
# --- Radio 2 Section ---
# Renders the bordered "Radio 2" container: the enable toggle and, when enabled,
# the stream-configuration dropdown from which stream count and max quality derive.
with st.container(border=True):
st.subheader("Radio 2")
# Disable Radio 2 in stereo mode
# Previously saved Radio 2 settings (empty dict when nothing was saved).
saved_r2_config = saved_settings.get('dante_radio2', {})
if dante_stereo_enabled:
st.info("🎧 Radio 2 is automatically disabled in stereo mode")
radio2_enabled = False
else:
# Enable/disable checkbox for Radio 2
# The checkbox default is taken from secondary_is_streaming.
radio2_enabled_default = secondary_is_streaming
radio2_enabled = st.checkbox(
"Enable Radio 2",
value=radio2_enabled_default,
disabled=is_streaming,
help="Activate a second Dante radio with its own quality and timing settings."
)
if radio2_enabled:
# Stream count dropdown for Radio 2
# Radio 2 offers exactly the same configuration choices as Radio 1.
r2_stream_options = r1_stream_options
saved_r2_streams = saved_r2_config.get('stream_config', '1x48')
# Fall back to the first option when the saved value is not offered.
default_r2_idx = r2_stream_options.index(saved_r2_streams) if saved_r2_streams in r2_stream_options else 0
r2_stream_config = st.selectbox(
"Stream Configuration (Radio 2)",
r2_stream_options,
index=default_r2_idx,
disabled=is_streaming,
help="Select the number and quality of streams for Radio 2"
)
# Look up the stream count and the maximum quality for the chosen configuration.
r2_num_streams = dante_stream_options[r2_stream_config]["streams"]
r2_quality = dante_stream_options[r2_stream_config]["quality"]
# Stream quality (moved directly under stream configuration).
# Qualities are listed from highest to lowest; the user may pick the configured
# maximum or anything below it. Slicing the ordered ladder also covers a
# "Good (32kHz)" maximum, which the previous hand-written condition did not
# handle (it produced an empty option list and a ValueError on .index()).
r2_quality_ladder = ["High (48kHz)", "Good (32kHz)", "Medium (24kHz)", "Fair (16kHz)"]
r2_max_quality = r2_quality
if r2_max_quality in r2_quality_ladder:
    r2_available_qualities = r2_quality_ladder[r2_quality_ladder.index(r2_max_quality):]
else:
    # Unknown maximum: offer it as the only choice so the widget still renders
    # instead of crashing on an empty option list.
    r2_available_qualities = [r2_max_quality]
# Restore the saved selection only if it is still allowed under the current maximum.
saved_r2_quality = saved_r2_config.get('radio_quality', r2_max_quality)
if saved_r2_quality not in r2_available_qualities:
    saved_r2_quality = r2_max_quality
r2_radio_quality = st.selectbox(
    "Stream Quality (Radio 2)",
    r2_available_qualities,
    index=r2_available_qualities.index(saved_r2_quality),
    disabled=is_streaming,
    help=f"Select stream quality for Radio 2. Maximum quality based on configuration: {r2_max_quality}"
)
# Radio-level settings for Radio 2
# First row: Assistive listening, immediate rendering, presentation delay, QoS
col_r2_flags1, col_r2_flags2, col_r2_pdelay, col_r2_qos = st.columns([1, 1, 0.7, 0.6], gap="small")
with col_r2_flags1:
r2_assisted_listening = st.checkbox(
"Assistive (R2)",
value=bool(saved_r2_config.get('assisted_listening', False)),
disabled=is_streaming,
help="Assistive listening stream"
)
with col_r2_flags2:
r2_immediate_rendering = st.checkbox(
"Immediate (R2)",
value=bool(saved_r2_config.get('immediate_rendering', False)),
disabled=is_streaming,
help="Ignore presentation delay"
)
with col_r2_pdelay:
# Saved value is in microseconds; the widget works in milliseconds, clamped to 10..200.
default_pdelay = int(saved_r2_config.get('presentation_delay_us', 40000) or 40000)
default_pdelay_ms = max(10, min(200, default_pdelay // 1000))
r2_presentation_delay_ms = st.number_input(
"Delay (ms, R2)",
min_value=10, max_value=200, step=5, value=default_pdelay_ms,
disabled=is_streaming,
help="Presentation delay for Radio 2"
)
with col_r2_qos:
# Offer every preset name known to QOS_PRESET_MAP; unknown saved values fall back to the first.
qos_options = list(QOS_PRESET_MAP.keys())
saved_qos = saved_r2_config.get('qos_preset', 'Fast')
default_qos_idx = qos_options.index(saved_qos) if saved_qos in qos_options else 0
r2_qos_preset = st.selectbox(
"QoS (R2)", options=qos_options, index=default_qos_idx,
disabled=is_streaming,
help="Quality of Service preset for Radio 2"
)
# Per-stream configuration for Radio 2
# One expander per stream; each iteration appends one settings dict to r2_streams.
st.write("**Stream Configuration (Radio 2)**")
r2_streams = []
for i in range(r2_num_streams):
with st.expander(f"Stream {i+1} - Radio 2", expanded=True):
# Use the saved entry for this index if there is one, else an empty dict.
saved_streams = saved_r2_config.get('streams', [])
saved_stream = saved_streams[i] if i < len(saved_streams) else {}
# First row: Channel name and password
col_name, col_pwd = st.columns([2, 1])
with col_name:
stream_name = st.text_input(
f"Channel Name",
value=saved_stream.get('name', f'Dante_R2_S{i+1}'),
disabled=is_streaming,
key=f"r2_stream_{i}_name"
)
with col_pwd:
stream_password = st.text_input(
f"Stream Password",
value=saved_stream.get('stream_password', ''),
type="password",
disabled=is_streaming,
key=f"r2_stream_{i}_password",
help="Optional: Set a broadcast code for this stream"
)
# Second row: Program info and language
col_prog, col_lang = st.columns([2, 1])
with col_prog:
program_info = st.text_input(
f"Program Info",
value=saved_stream.get('program_info', f'Dante Radio 2 Stream {i+1}'),
disabled=is_streaming,
key=f"r2_stream_{i}_program"
)
with col_lang:
language = st.text_input(
f"Language",
value=saved_stream.get('language', 'eng'),
disabled=is_streaming,
key=f"r2_stream_{i}_lang",
help="ISO 639-3 language code"
)
# Third row: Input device
col_device = st.columns([1])[0]
with col_device:
# Session state key for persisting the selection
device_session_key = f"r2_stream_{i}_device_saved"
if not is_streaming and input_options:
# Get default from session state first, then from saved settings
default_input_name = st.session_state.get(device_session_key, saved_stream.get('input_device'))
# Map the internal device name back to the label shown in the dropdown.
default_input_label = None
for label, name in option_name_map.items():
if name == default_input_name:
default_input_label = label
break
if default_input_label not in input_options and input_options:
default_input_label = input_options[0]
selected_option = st.selectbox(
f"Input Device",
input_options,
index=input_options.index(default_input_label) if default_input_label in input_options else 0,
disabled=is_streaming,
key=f"r2_stream_{i}_device"
)
input_device = option_name_map.get(selected_option)
# Save to session state for persistence
st.session_state[device_session_key] = input_device
else:
# When streaming, get the device from session state
# (this branch is also taken when input_options is empty)
current_device = st.session_state.get(device_session_key, saved_stream.get('input_device', 'No device'))
# Convert internal name to display label
display_label = current_device
for label, name in option_name_map.items():
if name == current_device:
display_label = label
break
# Read-only dropdown with a single entry showing the current device.
st.selectbox(
f"Input Device",
[display_label if display_label else 'No device'],
index=0,
disabled=True,
key=f"r2_stream_{i}_device_disabled"
)
input_device = current_device
# Collect this stream's settings for the Radio 2 configuration.
r2_streams.append({
'name': stream_name,
'program_info': program_info,
'language': language,
'input_device': input_device,
'stream_password': stream_password
})
else:
r2_streams = []
# Set default values for r2_* variables when radio2 is disabled
r2_stream_config = '1 × 48kHz'
r2_quality = 'High (48kHz)'
r2_radio_quality = 'High (48kHz)'
r2_assisted_listening = False
r2_immediate_rendering = False
r2_presentation_delay_ms = 40
r2_qos_preset = 'Fast'
# Validate unique input devices for Network - Dante mode
# validate_unique_input_devices returns an (is_valid, error_msg) pair for the
# streams collected from both radios.
if audio_mode == "Network - Dante":
is_valid, error_msg = validate_unique_input_devices(r1_streams, r2_streams)
if not is_valid:
# Display error in the placeholder at the top
validation_error_placeholder.error(error_msg)
# Show device refresh button if no devices found
if not input_options and not is_streaming:
st.warning("No Dante inputs found. Check asound.conf configuration.")
if st.button("Refresh", key="refresh_dante_devices", disabled=is_streaming):
# Ask the backend to refresh its audio devices; failures are shown in the UI.
try:
r = requests.post(f"{BACKEND_URL}/refresh_audio_devices", timeout=8)
if not r.ok:
st.error(f"Failed to refresh: {r.text}")
except Exception as e:
st.error(f"Failed to refresh devices: {e}")
# Rerun the script so the device list is fetched again.
st.rerun()
# Store radio configurations for backend.
# Each dict bundles the radio-level settings with the list of per-stream dicts.
radio1_cfg = {
    'stream_config': r1_stream_config,
    'max_quality': r1_quality,  # Original max quality from stream config
    'radio_quality': r1_radio_quality,  # User-selected quality (may be lower)
    'quality': r1_radio_quality,  # Use selected quality for backend
    'streams': r1_streams,
    'assisted_listening': r1_assisted_listening,
    'immediate_rendering': r1_immediate_rendering,
    'presentation_delay_ms': r1_presentation_delay_ms,
    'qos_preset': r1_qos_preset,
    'dante_stereo_mode': dante_stereo_enabled,
    'dante_stereo_left': dante_left_channel,
    'dante_stereo_right': dante_right_channel,
}
# Radio 2 is described only when it is enabled; otherwise the config is None.
# The former per-field "... if radio2_enabled else ..." fallbacks were
# unreachable (the whole dict was already guarded) and one of them carried a
# microsecond value (40000) in a millisecond field, so they are removed.
radio2_cfg = None
if radio2_enabled:
    radio2_cfg = {
        'stream_config': r2_stream_config,
        'max_quality': r2_quality,  # Original max quality from stream config
        'radio_quality': r2_radio_quality,  # User-selected quality
        'quality': r2_radio_quality,  # Use selected quality for backend
        'streams': r2_streams,
        'assisted_listening': r2_assisted_listening,
        'immediate_rendering': r2_immediate_rendering,
        'presentation_delay_ms': r2_presentation_delay_ms,
        'qos_preset': r2_qos_preset,
    }
# Shared stream controls for the single-channel USB and Network modes:
# quality, broadcast code, receiver flags, presentation delay, QoS and metadata.
if audio_mode in ("USB", "Network"):
# USB/Network: single set of controls shared with the single channel
quality_options = list(QUALITY_MAP.keys())
# Preselect "Medium (24kHz)" when QUALITY_MAP offers it, else the first entry.
default_quality = "Medium (24kHz)" if "Medium (24kHz)" in quality_options else quality_options[0]
quality = st.selectbox(
"Stream Quality (Sampling Rate)",
quality_options,
index=quality_options.index(default_quality),
disabled=is_streaming,
help="Select the audio sampling rate for the stream. Lower rates may improve compatibility."
)
# The broadcast code always starts empty; it is not prefilled from saved settings.
stream_passwort = st.text_input(
"Stream Passwort",
value="",
type="password",
disabled=is_streaming,
help="Optional: Set a broadcast code to protect your stream. Leave empty for an open (uncoded) broadcast."
)
col_flags1, col_flags2, col_pdelay, col_qos = st.columns([1, 1, 0.7, 0.6], gap="small")
with col_flags1:
assisted_listening = st.checkbox(
"Assistive listening",
value=bool(saved_settings.get('assisted_listening_stream', False)),
disabled=is_streaming,
help="tells the receiver that this is an assistive listening stream"
)
with col_flags2:
immediate_rendering = st.checkbox(
"Immediate rendering",
value=bool(saved_settings.get('immediate_rendering', False)),
disabled=is_streaming,
help="tells the receiver to ignore presentation delay and render immediately if possible."
)
# Saved value is in microseconds; the widget works in milliseconds, clamped to 10..200.
default_pdelay = int(saved_settings.get('presentation_delay_us', 40000) or 40000)
with col_pdelay:
default_pdelay_ms = max(10, min(200, default_pdelay // 1000))
presentation_delay_ms = st.number_input(
"Delay (ms)",
min_value=10, max_value=200, step=5, value=default_pdelay_ms,
disabled=is_streaming,
help="Delay between capture and presentation for receivers."
)
with col_qos:
# Offer every preset name known to QOS_PRESET_MAP; unknown saved values fall back to the first.
qos_options = list(QOS_PRESET_MAP.keys())
saved_qos = saved_settings.get('qos_preset', 'Fast')
default_qos_idx = qos_options.index(saved_qos) if saved_qos in qos_options else 0
qos_preset = st.selectbox(
"QoS", options=qos_options, index=default_qos_idx,
disabled=is_streaming,
help="Fast: 2 retransmissions, lower latency. Robust: 4 retransmissions, better reliability."
)
# Broadcast metadata; the defaults (default_name, default_program_info,
# default_lang) are defined earlier in the script.
stream_name = st.text_input(
"Channel Name",
value=default_name,
disabled=is_streaming,
help="The primary name for your broadcast. Like the SSID of a WLAN, it identifies your stream for receivers."
)
program_info = st.text_input(
"Program Info",
value=default_program_info,
disabled=is_streaming,
help="Additional details about the broadcast program, such as its content or purpose. Shown to receivers for more context."
)
language = st.text_input(
"Language (ISO 639-3)",
value=default_lang,
disabled=is_streaming,
help="Three-letter language code (e.g., 'eng' for English, 'deu' for German). Used by receivers to display the language of the stream. See: https://en.wikipedia.org/wiki/List_of_ISO_639-3_codes"
)
def _request_device_refresh():
    """Ask the backend to refresh its audio devices and show any failure in the UI."""
    try:
        r = requests.post(f"{BACKEND_URL}/refresh_audio_devices", timeout=8)
        if not r.ok:
            st.error(f"Failed to refresh: {r.text}")
    except Exception as e:
        st.error(f"Failed to refresh devices: {e}")


# Input-device selection for the USB and Network modes. While idle, the device
# list is fetched from the backend and shown in a dropdown with a Refresh
# button; while streaming, the saved device is shown read-only. Every other
# mode leaves input_device as None.
if audio_mode in ("USB", "Network"):
    if not is_streaming:
        endpoint = "/audio_inputs_pw_usb" if audio_mode == "USB" else "/audio_inputs_pw_network"
        try:
            # The timeout keeps the page from hanging if the backend stalls;
            # raise_for_status reports HTTP errors instead of silently
            # treating an error body as "no devices".
            resp = requests.get(f"{BACKEND_URL}{endpoint}", timeout=8)
            resp.raise_for_status()
            device_list = resp.json().get('inputs', [])
        except Exception as e:
            st.error(f"Failed to fetch devices: {e}")
            device_list = []
        if audio_mode == "USB":
            # Entries named 'ch1' / 'ch2' are not offered in USB mode.
            device_list = [d for d in device_list if d.get('name') not in ('ch1', 'ch2')]
        # Display labels ("name [id]") and the mapping back to the internal device name.
        input_options = [f"{d['name']} [{d['id']}]" for d in device_list]
        option_name_map = {f"{d['name']} [{d['id']}]": d['name'] for d in device_list}
        device_names = [d['name'] for d in device_list]
        default_input_name = saved_settings.get('input_device')
        # If saved device isn't in current mode's device list, use first available
        if default_input_name not in device_names and device_names:
            default_input_name = device_names[0]
        default_input_label = next(
            (label for label, name in option_name_map.items() if name == default_input_name),
            None,
        )
        if not input_options:
            warn_text = (
                "No USB audio input devices found. Connect a USB input and click Refresh."
                if audio_mode == "USB" else
                "No AES67/Network inputs found."
            )
            st.warning(warn_text)
            refresh_key = "refresh_usb" if audio_mode == "USB" else "refresh_aes67"
            if st.button("Refresh", key=refresh_key, disabled=is_streaming):
                _request_device_refresh()
                st.rerun()
            input_device = None
        else:
            col1, col2 = st.columns([3, 1], vertical_alignment="bottom")
            with col1:
                selected_option = st.selectbox(
                    "Input Device",
                    input_options,
                    index=input_options.index(default_input_label) if default_input_label in input_options else 0
                )
            with col2:
                refresh_key = "refresh_inputs" if audio_mode == "USB" else "refresh_inputs_net"
                if st.button("Refresh", key=refresh_key, disabled=is_streaming):
                    _request_device_refresh()
                    st.rerun()
            input_device = option_name_map.get(selected_option)
    else:
        # While streaming the device cannot be changed: show the saved one read-only.
        input_device = saved_settings.get('input_device')
        current_label = input_device or "No device selected"
        st.selectbox(
            "Input Device",
            [current_label],
            index=0,
            disabled=True,
            help="Stop the stream to change the input device."
        )
else:
    input_device = None
# Stop handler: clear the local "started" flag, ask the backend to stop audio
# and record whether something was actually running (is_stopped is what later
# triggers the page rerun).
if stop_stream:
    st.session_state['stream_started'] = False
    try:
        # The timeout keeps the UI from hanging forever if the backend stalls.
        r = requests.post(f"{BACKEND_URL}/stop_audio", timeout=15).json()
        if audio_mode == "Demo":
            st.session_state['demo_stream_started'] = False
        # .get() avoids a cryptic KeyError message when the key is absent.
        if r.get('was_running'):
            is_stopped = True
    except Exception as e:
        st.error(f"Error: {e}")
if start_stream:
    # Always send stop to ensure backend is in a clean state, regardless of current status.
    # The request is guarded and time-limited so an unreachable backend produces a
    # readable warning instead of an unhandled exception; the response body is not needed.
    try:
        requests.post(f"{BACKEND_URL}/stop_audio", timeout=15)
    except Exception as e:
        st.warning(f"Could not reset backend before starting: {e}")
    # Small pause lets backend fully release audio devices before re-init
    time.sleep(1)
# Demo mode: build one BIG per demo stream from bundled test files, split them
# across up to two config groups and post those to the backend.
if audio_mode == "Demo":
demo_cfg = demo_stream_map[demo_selected]
q = QUALITY_MAP[demo_cfg['quality']]
# Language-specific BIG config classes paired with the language tag used in the file name.
lang_cfgs = [
(auracast_config.AuracastBigConfigDeu, 'de'),
(auracast_config.AuracastBigConfigEng, 'en'),
(auracast_config.AuracastBigConfigFra, 'fr'),
(auracast_config.AuracastBigConfigSpa, 'es'),
(auracast_config.AuracastBigConfigIta, 'it'),
(auracast_config.AuracastBigConfigPol, 'pl'),
]
bigs1 = []
for i in range(demo_cfg['streams']):
# Cycle through the language list when more streams than languages are requested.
cfg_cls, lang = lang_cfgs[i % len(lang_cfgs)]
bigs1.append(cfg_cls(
code=(stream_passwort.strip() or None),
audio_source=f'file:../testdata/wave_particle_5min_{lang}_{int(q["rate"]/1000)}kHz_mono.wav',
iso_que_len=32,
sampling_frequency=q['rate'],
octets_per_frame=q['octets'],
))
# Streams per config group, keyed by sampling rate; rates not listed use 3.
max_per_mc = {48000: 1, 24000: 2, 16000: 3}
max_streams = max_per_mc.get(q['rate'], 3)
# Anything beyond the first group's limit goes into a second group.
bigs2 = []
if len(bigs1) > max_streams:
bigs2 = bigs1[max_streams:]
bigs1 = bigs1[:max_streams]
config1 = auracast_config.AuracastConfigGroup(
auracast_sampling_rate_hz=q['rate'],
octets_per_frame=q['octets'],
transport='',
assisted_listening_stream=assisted_listening,
immediate_rendering=immediate_rendering,
presentation_delay_us=int(presentation_delay_ms * 1000),
qos_config=QOS_PRESET_MAP[qos_preset],
bigs=bigs1
)
config2 = None
if bigs2:
config2 = auracast_config.AuracastConfigGroup(
auracast_sampling_rate_hz=q['rate'],
octets_per_frame=q['octets'],
transport='',
assisted_listening_stream=assisted_listening,
immediate_rendering=immediate_rendering,
presentation_delay_us=int(presentation_delay_ms * 1000),
qos_config=QOS_PRESET_MAP[qos_preset],
bigs=bigs2
)
is_started = False
try:
# First group goes to /init; its result drives the demo_stream_started flag.
r1 = requests.post(f"{BACKEND_URL}/init", json=config1.model_dump())
if r1.status_code == 200:
st.session_state['demo_stream_started'] = True
is_started = True
else:
st.session_state['demo_stream_started'] = False
st.error(f"Failed to initialize multicaster 1: {r1.text}")
if config2:
# Second group (if any) goes to /init2; success here also marks the start as done.
r2 = requests.post(f"{BACKEND_URL}/init2", json=config2.model_dump())
if r2.status_code == 200:
is_started = True
else:
st.error(f"Failed to initialize multicaster 2: {r2.text}")
except Exception as e:
st.session_state['demo_stream_started'] = False
st.error(f"Error: {e}")
if audio_mode == "Analog":
    # Build separate configs per radio, each with its own quality and QoS parameters.
    is_started = False

    def _build_group_from_radio(cfg: dict) -> auracast_config.AuracastConfigGroup | None:
        """Build the config group for one analog radio.

        Args:
            cfg: Radio settings dict. Reads 'input_device', 'quality',
                'assisted_listening', 'immediate_rendering',
                'presentation_delay_ms', 'qos_preset', 'name',
                'program_info', 'language' and, optionally, 'stereo_mode',
                'id' and 'stream_passwort'.

        Returns:
            A group containing a single BIG, or None when cfg is empty or has
            no input device.
        """
        if not cfg or not cfg.get('input_device'):
            return None
        q = QUALITY_MAP[cfg['quality']]
        # Determine if this is stereo mode (only applicable for analog)
        stereo_mode = cfg.get('stereo_mode', False)
        channels = 2 if stereo_mode else 1
        # A missing or None password means "no broadcast code" instead of raising.
        broadcast_code = (cfg.get('stream_passwort') or '').strip() or None
        return auracast_config.AuracastConfigGroup(
            auracast_sampling_rate_hz=q['rate'],
            octets_per_frame=q['octets'],
            transport='',  # is set in backend
            assisted_listening_stream=bool(cfg['assisted_listening']),
            immediate_rendering=bool(cfg['immediate_rendering']),
            presentation_delay_us=int(cfg['presentation_delay_ms'] * 1000),
            qos_config=QOS_PRESET_MAP[cfg['qos_preset']],
            bigs=[
                auracast_config.AuracastBigConfig(
                    id=cfg.get('id', 123456),
                    code=broadcast_code,
                    name=cfg['name'],
                    program_info=cfg['program_info'],
                    language=cfg['language'],
                    audio_source=f"device:{cfg['input_device']}",
                    input_format=f"int16le,{q['rate']},{channels}",
                    iso_que_len=1,
                    sampling_frequency=q['rate'],
                    octets_per_frame=q['octets'],
                    num_bis=channels,  # 1=mono, 2=stereo - this determines the behavior
                )
            ],
        )

    # Radio 1 (always active if a device is selected)
    config1 = _build_group_from_radio(radio1_cfg)
    # Radio 2 (optional)
    config2 = _build_group_from_radio(radio2_cfg) if radio2_enabled else None
    try:
        if config1 is not None:
            r1 = requests.post(f"{BACKEND_URL}/init", json=config1.model_dump())
            if r1.status_code == 200:
                is_started = True
            else:
                st.error(f"Failed to initialize Radio 1: {r1.text}")
        else:
            st.error("Radio 1 has no valid input device configured.")
        if config2 is not None:
            r2 = requests.post(f"{BACKEND_URL}/init2", json=config2.model_dump())
            if r2.status_code != 200:
                st.error(f"Failed to initialize Radio 2: {r2.text}")
    except Exception as e:
        st.error(f"Error while starting Analog radios: {e}")
elif audio_mode == "Network - Dante":
    # Build multi-stream configs for Dante radios
    is_started = False

    def _build_dante_radio_config(radio_cfg: dict, radio_id: int) -> auracast_config.AuracastConfigGroup | None:
        """Build the config group for one Dante radio.

        Args:
            radio_cfg: Radio settings dict with a 'streams' list plus
                'quality', 'assisted_listening', 'immediate_rendering',
                'presentation_delay_ms', 'qos_preset' and, optionally,
                'dante_stereo_mode'.
            radio_id: Radio number; stream IDs are radio_id * 1000 + index + 1.

        Returns:
            A group with one BIG per stream that has an input device, or None
            when radio_cfg is empty, has no streams, or no stream has a device.
        """
        if not radio_cfg or not radio_cfg.get('streams'):
            return None
        q = QUALITY_MAP[radio_cfg['quality']]
        bigs = []
        # Check if stereo mode is enabled for this radio
        is_stereo_mode = bool(radio_cfg.get('dante_stereo_mode', False))
        for i, stream in enumerate(radio_cfg['streams']):
            input_device = stream.get('input_device')
            if not input_device:
                continue
            stream_id = radio_id * 1000 + i + 1  # Unique ID per stream
            # Check if this specific stream uses stereo (dante_stereo_X_Y device)
            stream_is_stereo = is_stereo_mode or input_device.startswith('dante_stereo_')
            # The BIS count and the input channel count are always the same value.
            channels = 2 if stream_is_stereo else 1
            # A missing or None password means "no broadcast code" instead of raising.
            broadcast_code = (stream.get('stream_password') or '').strip() or None
            bigs.append(auracast_config.AuracastBigConfig(
                id=stream_id,
                code=broadcast_code,
                name=stream['name'],
                program_info=stream['program_info'],
                language=stream['language'],
                audio_source=f"device:{input_device}",
                input_format=f"int16le,{q['rate']},{channels}",
                iso_que_len=1,
                sampling_frequency=q['rate'],
                octets_per_frame=q['octets'],
                num_bis=channels,
            ))
        if not bigs:
            return None
        return auracast_config.AuracastConfigGroup(
            auracast_sampling_rate_hz=q['rate'],
            octets_per_frame=q['octets'],
            transport='',  # is set in backend
            assisted_listening_stream=bool(radio_cfg['assisted_listening']),
            immediate_rendering=bool(radio_cfg['immediate_rendering']),
            presentation_delay_us=int(radio_cfg['presentation_delay_ms'] * 1000),
            qos_config=QOS_PRESET_MAP[radio_cfg['qos_preset']],
            bigs=bigs
        )

    # Radio 1 config
    config1 = _build_dante_radio_config(radio1_cfg, 1)
    # Radio 2 config (optional)
    config2 = _build_dante_radio_config(radio2_cfg, 2) if radio2_cfg else None
    try:
        if config1 is not None:
            r1 = requests.post(f"{BACKEND_URL}/init", json=config1.model_dump())
            if r1.status_code == 200:
                is_started = True
            else:
                st.error(f"Failed to initialize Dante Radio 1: {r1.text}")
        else:
            st.error("Dante Radio 1 has no valid input devices configured.")
        if config2 is not None:
            r2 = requests.post(f"{BACKEND_URL}/init2", json=config2.model_dump())
            if r2.status_code != 200:
                st.error(f"Failed to initialize Dante Radio 2: {r2.text}")
    except Exception as e:
        st.error(f"Error while starting Dante radios: {e}")
if audio_mode not in ("Demo", "Analog", "Network - Dante"):
    # USB/Network: single config as before, using shared controls
    if not input_device:
        # Without this guard the backend would be asked to open "device:None".
        # The Analog and Dante branches reject a missing device the same way.
        st.error("No valid input device selected.")
    else:
        q = QUALITY_MAP[quality]
        config = auracast_config.AuracastConfigGroup(
            auracast_sampling_rate_hz=q['rate'],
            octets_per_frame=q['octets'],
            transport='',  # is set in backend
            assisted_listening_stream=assisted_listening,
            immediate_rendering=immediate_rendering,
            presentation_delay_us=int(presentation_delay_ms * 1000),
            qos_config=QOS_PRESET_MAP[qos_preset],
            bigs=[
                auracast_config.AuracastBigConfig(
                    code=(stream_passwort.strip() or None),
                    name=stream_name,
                    program_info=program_info,
                    language=language,
                    audio_source=(f"device:{input_device}"),
                    input_format=(f"int16le,{q['rate']},1"),
                    iso_que_len=1,
                    sampling_frequency=q['rate'],
                    octets_per_frame=q['octets'],
                ),
            ],
        )
        try:
            r = requests.post(f"{BACKEND_URL}/init", json=config.model_dump())
            if r.status_code == 200:
                is_started = True
            else:
                st.error(f"Failed to initialize: {r.text}")
        except Exception as e:
            st.error(f"Error: {e}")
# Centralized rerun based on start/stop outcomes
# is_started / is_stopped are set by the start and stop handlers; a single
# rerun here replaces separate reruns inside each handler.
if is_started or is_stopped:
st.rerun()
#else:
#st.header("Advertised Streams (Cloud Announcements)")
#st.info("This feature requires backend support to list advertised streams.")
# st.info("This feature requires backend support to list advertised streams.")
# Placeholder for future implementation
# Example: r = requests.get(f"{BACKEND_URL}/advertised_streams")
# if r.status_code == 200:
# streams = r.json()
# for s in streams:
# st.write(s)
# else:
# st.error("Could not fetch advertised streams.")
############################
# System expander (collapsed)
############################
# "System control" expander, first part: temperature readout, CA certificate
# download and the change-password form.
with st.expander("System control", expanded=False):
st.subheader("System temperatures")
temp_col1, temp_col2, temp_col3 = st.columns([1, 1, 1])
with temp_col1:
# The return value is not used; pressing the button reruns the script,
# which re-reads the temperatures below.
refresh_temps = st.button("Refresh", key="refresh_temps_1")
try:
case_temp = read_case_temp()
cpu_temp = read_cpu_temp()
with temp_col2:
st.write(f"CPU: {cpu_temp} °C")
with temp_col3:
st.write(f"Case: {case_temp} °C")
except Exception as e:
st.warning(f"Could not read temperatures: {e}")
st.subheader("CA Certificate")
st.caption("Download the CA certificate to trust this device's HTTPS connection.")
try:
# The certificate bytes are fetched from the backend on every script run.
cert_resp = requests.get(f"{BACKEND_URL}/cert", timeout=2)
if cert_resp.status_code == 200:
st.download_button(
label="Download CA Certificate",
data=cert_resp.content,
file_name="ca_cert.pem",
mime="application/x-pem-file",
)
else:
st.warning("CA certificate not available.")
except Exception as e:
st.warning(f"Could not fetch CA certificate: {e}")
st.subheader("Change password")
if is_pw_disabled():
st.info("Frontend password protection is disabled via DISABLE_FRONTEND_PW.")
else:
with st.form("change_pw_form"):
cur = st.text_input("Current password", type="password")
new1 = st.text_input("New password", type="password")
new2 = st.text_input("Confirm new password", type="password")
submit_change = st.form_submit_button("Change password")
if submit_change:
# Checks run in order: current password, minimum length, confirmation match.
rec = load_pw_record()
if not rec or not verify_password(cur, rec):
st.error("Current password is incorrect.")
elif len(new1) < 6:
st.error("New password should be at least 6 characters.")
elif new1 != new2:
st.error("New passwords do not match.")
else:
# hash_password returns a (salt, key) pair that is stored as the new record.
salt, key = hash_password(new1)
try:
save_pw_record(salt, key)
st.success("Password updated.")
except Exception as e:
st.error(f"Failed to update password: {e}")
# "System control" expander, second part: version display, update check and
# install, DEP restart and device reboot. Each action is a backend HTTP call.
st.subheader("Software Version")
# Show current version
try:
ver_resp = requests.get(f"{BACKEND_URL}/version", timeout=2)
if ver_resp.ok:
ver_data = ver_resp.json()
current_version = ver_data.get('version', 'unknown')
ver_type = ver_data.get('type', '')
# Anything whose type is not 'tag' is labelled as a dev build.
ver_label = current_version if ver_type == 'tag' else f"{current_version} (dev)"
st.write(f"**Current version:** {ver_label}")
else:
st.write("**Current version:** unknown")
current_version = "unknown"
except Exception:
st.write("**Current version:** unknown")
current_version = "unknown"
# Initialize session state for update check
if 'available_update' not in st.session_state:
st.session_state['available_update'] = None
col_check, col_status = st.columns([1, 2])
with col_check:
if st.button("Check for updates"):
# The outcome (payload or {'error': ...}) is kept in session state so it
# is still available after the rerun below.
try:
check_resp = requests.get(f"{BACKEND_URL}/check_update", timeout=30)
if check_resp.ok:
check_data = check_resp.json()
if check_data.get('error'):
st.session_state['available_update'] = {'error': check_data['error']}
else:
st.session_state['available_update'] = check_data
else:
st.session_state['available_update'] = {'error': f"Failed: {check_resp.status_code}"}
except Exception as e:
st.session_state['available_update'] = {'error': str(e)}
st.rerun()
with col_status:
# Render the stored result of the last update check, if there is one.
if st.session_state['available_update']:
upd = st.session_state['available_update']
if upd.get('error'):
st.warning(f"Check failed: {upd['error']}")
elif upd.get('update_available'):
st.info(f"Update available: **{upd['available']}**")
else:
st.success("You are on the latest version.")
# Update button (only show if update is available)
if st.session_state['available_update'] and st.session_state['available_update'].get('update_available'):
if st.button("Update now"):
try:
r = requests.post(f"{BACKEND_URL}/system_update", timeout=120)
if r.ok:
result = r.json()
tag = result.get('tag', 'unknown')
st.success(f"Update to {tag} initiated. The UI will restart shortly.")
# Clear the stored check result so the update button is no longer shown.
st.session_state['available_update'] = None
else:
st.error(f"Failed to update: {r.status_code} {r.text}")
except Exception as e:
st.error(f"Error calling update: {e}")
st.subheader("Restart DEP")
if st.button("Restart DEP"):
try:
r = requests.post(f"{BACKEND_URL}/restart_dep", timeout=30)
if r.ok:
result = r.json()
st.success(result.get('message', 'DEP restarted successfully.'))
else:
st.error(f"Failed to restart DEP: {r.status_code} {r.text}")
except Exception as e:
st.error(f"Error restarting DEP: {str(e)}")
st.subheader("Reboot")
if st.button("Reboot now", type="primary"):
try:
# NOTE(review): with timeout=1 a reboot that drops the connection before
# answering would be reported as an error here - confirm backend behaviour.
r = requests.post(f"{BACKEND_URL}/system_reboot", timeout=1)
if r.ok:
st.success("Reboot initiated. The UI will become unreachable shortly.")
else:
st.error(f"Failed to reboot: {r.status_code} {r.text}")
except Exception as e:
st.error(f"Error calling reboot: {e}")
############################
# Record expander (collapsed)
############################
# "Record" expander: pick an ALSA device, trigger a recording on the backend
# and offer the most recent recording for download.
with st.expander("Record", expanded=False):
st.subheader("ALSA Device Recording")
# Get ALSA devices from backend
# Any failure (non-200 status or exception) results in an empty device list.
try:
resp = requests.get(f"{BACKEND_URL}/alsa_devices", timeout=2)
if resp.status_code == 200:
alsa_devices = resp.json().get('devices', [])
else:
alsa_devices = []
except Exception:
alsa_devices = []
# ALSA device selection dropdown
# Labels are "name [id]"; the map translates a label back to the device name.
device_options = [f"{d['name']} [{d['id']}]" for d in alsa_devices]
device_name_map = {f"{d['name']} [{d['id']}]": d['name'] for d in alsa_devices}
if device_options:
selected_device = st.selectbox(
"Select ALSA Device",
device_options,
help="Choose an ALSA device for recording"
)
selected_device_name = device_name_map.get(selected_device)
else:
st.warning("No ALSA devices found.")
selected_device_name = None
# Recording controls
col_record, col_download = st.columns([1, 1])
with col_record:
# The button is disabled while no device is selected.
if st.button("Start Recording (5s)", disabled=not selected_device_name):
try:
r = requests.post(f"{BACKEND_URL}/start_recording", json={"device": selected_device_name}, timeout=15)
if r.ok:
result = r.json()
if result.get('success'):
st.success(f"Recording completed: {result.get('filename')}")
# Remember the file name so the download column can fetch it.
st.session_state['last_recording'] = result.get('filename')
else:
st.error(f"Recording failed: {result.get('error', 'Unknown error')}")
else:
st.error(f"Failed to start recording: {r.status_code} {r.text}")
except Exception as e:
st.error(f"Error starting recording: {e}")
with col_download:
last_recording = st.session_state.get('last_recording')
if last_recording:
try:
# Get the recorded file for download
# NOTE(review): the file name is placed in the URL path as-is - confirm
# backend file names never need URL escaping.
file_resp = requests.get(f"{BACKEND_URL}/download_recording/{last_recording}", timeout=5)
if file_resp.status_code == 200:
st.download_button(
label="Download Last Recording",
data=file_resp.content,
file_name=last_recording,
mime="audio/wav"
)
else:
st.warning("Recording file not available")
except Exception as e:
st.warning(f"Could not fetch recording: {e}")
else:
# Placeholder button shown until a recording exists.
st.button("Download Last Recording", disabled=True, help="No recording available yet")
# Configure root logging; the level comes from the LOG_LEVEL environment
# variable and defaults to DEBUG.
# NOTE(review): this appears after the UI code above, so log calls made
# earlier in the first script run would not yet use this format - confirm intent.
log.basicConfig(
level=os.environ.get('LOG_LEVEL', log.DEBUG),
format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
)