2506 lines
117 KiB
Python
2506 lines
117 KiB
Python
# frontend/app.py
|
||
import os
|
||
import time
|
||
import math
|
||
import logging as log
|
||
from PIL import Image
|
||
|
||
import requests
|
||
from dotenv import load_dotenv
|
||
import streamlit as st
|
||
|
||
from auracast.utils.read_temp import read_case_temp, read_cpu_temp
|
||
|
||
from auracast import auracast_config
|
||
from auracast.utils.frontend_auth import (
|
||
is_pw_disabled,
|
||
load_pw_record,
|
||
save_pw_record,
|
||
hash_password,
|
||
verify_password,
|
||
)
|
||
|
||
# Set page configuration (tab title and icon) before using other Streamlit APIs.
# Always use the favicon from the utils folder relative to this file.
_THIS_DIR = os.path.dirname(__file__)  # directory containing this script
# Absolute path of ../utils/favicon.ico relative to this script's directory.
_FAVICON_PATH = os.path.abspath(os.path.join(_THIS_DIR, '..', 'utils', 'favicon.ico'))
# NOTE(review): Image.open raises when the file is missing or unreadable, which
# aborts the script before any UI is rendered — confirm this is intended.
favicon = Image.open(_FAVICON_PATH)
st.set_page_config(page_title="Beacon", page_icon=favicon, layout="centered")

# Load environment variables from a .env file if present
load_dotenv()
|
||
|
||
# Seed the per-session flags on the first run so later code can read them
# unconditionally:
#   stream_started         - tracks whether the WebRTC stream is active across
#                            Streamlit reruns
#   frontend_authenticated - set once the frontend password gate (controlled
#                            via env using the shared auth utils) is passed
for _session_flag in ('stream_started', 'frontend_authenticated'):
    if _session_flag not in st.session_state:
        st.session_state[_session_flag] = False
|
||
|
||
# Password gate. Skipped entirely when the shared auth utils report that the
# frontend password is disabled.
if not is_pw_disabled():
    # A record of None is treated as "no password has been set yet".
    pw_rec = load_pw_record()

    # First-time setup: no password set -> force user to choose one
    if pw_rec is None:
        st.header("Set up your frontend password")
        st.info("For security, you must set a password on first access.")
        with st.form("first_setup_form"):
            new_pw = st.text_input("New password", type="password")
            new_pw2 = st.text_input("Confirm password", type="password")
            submitted = st.form_submit_button("Save password")
            if submitted:
                # Validate before hashing: minimum length, then confirmation match.
                if len(new_pw) < 6:
                    st.error("Password should be at least 6 characters.")
                elif new_pw != new_pw2:
                    st.error("Passwords do not match.")
                else:
                    salt, key = hash_password(new_pw)
                    try:
                        save_pw_record(salt, key)
                        st.success("Password saved. You can now sign in.")
                        # Rerun so the stored record is loaded and the sign-in
                        # form is shown instead of the setup form.
                        st.rerun()
                    except Exception as e:
                        st.error(f"Failed to save password: {e}")
        # Never fall through to the rest of the app while no password exists.
        st.stop()

    # Normal sign-in gate
    if not st.session_state['frontend_authenticated']:
        st.header("Sign in")
        with st.form("signin_form"):
            pw = st.text_input("Password", type="password")
            submitted = st.form_submit_button("Sign in")
            # Zero-height HTML component whose script focuses the first
            # password input of the parent document after 100 ms.
            st.components.v1.html(
                "<script>setTimeout(()=>window.parent.document.querySelector('input[type=\"password\"]')?.focus(),100)</script>",
                height=0
            )
            if submitted:
                if verify_password(pw, pw_rec):
                    st.session_state['frontend_authenticated'] = True
                    st.success("Signed in.")
                    # Rerun so the script continues past the gate with the
                    # authenticated flag set.
                    st.rerun()
                else:
                    st.error("Incorrect password. Please try again.")
    # Stop rendering the rest of the app until authenticated
    if not st.session_state['frontend_authenticated']:
        st.stop()
|
||
|
||
# Base URL used to build the backend HTTP request URLs below.
BACKEND_URL = "http://localhost:5000"

# UI quality label -> stream parameters. "rate" is the sampling rate in Hz and
# matches the kHz figure in each label.
# NOTE(review): "octets" is presumably the encoded frame size for that rate —
# confirm against the backend configuration.
QUALITY_MAP = {
    "High (48kHz)": {"rate": 48000, "octets": 120},
    "Good (32kHz)": {"rate": 32000, "octets": 80},
    "Medium (24kHz)": {"rate": 24000, "octets": 60},
    "Fair (16kHz)": {"rate": 16000, "octets": 40},
}

# UI QoS label -> QoS preset instance from auracast_config. The key order is
# the order in which the presets are offered in the QoS select boxes.
QOS_PRESET_MAP = {
    "Fast": auracast_config.AuracastQosFast(),
    "Robust": auracast_config.AuracastQosRobust(),
}
|
||
|
||
# Discrete advertising TX power steps in dBm supported by the Nordic SDC radio
|
||
# PA. Sent through HCI_LE_Set_Extended_Advertising_Parameters; the controller
|
||
# clamps to the nearest hardware step.
|
||
TX_POWER_OPTIONS = [8, 7, 6, 5, 4, 3, 2, 0, -4, -8, -12, -16, -20]
|
||
TX_POWER_DEFAULT = 8
|
||
|
||
|
||
def _coerce_tx_power(value, default: int = TX_POWER_DEFAULT) -> int:
|
||
try:
|
||
v = int(value)
|
||
except (TypeError, ValueError):
|
||
return default
|
||
if v in TX_POWER_OPTIONS:
|
||
return v
|
||
return min(TX_POWER_OPTIONS, key=lambda s: abs(s - v))
|
||
|
||
|
||
def _tx_power_selectbox(label: str, key: str, default: int, disabled: bool, help_text: str | None = None) -> int:
    """Render a select box offering the supported TX power steps.

    Args:
        label: Widget label.
        key: Streamlit widget key.
        default: Preselected value; passed through ``_coerce_tx_power`` so it
            always corresponds to an entry of TX_POWER_OPTIONS.
        disabled: Whether the widget is rendered read-only.
        help_text: Optional tooltip; a generic explanation is used when empty.

    Returns:
        The value returned by ``st.selectbox`` (the selected step in dBm).
    """
    tooltip = help_text
    if not tooltip:
        tooltip = "Bluetooth advertising TX power for this radio. Higher values increase range; lower values reduce interference and power draw."

    def _as_dbm(step):
        # Signed formatting, e.g. "+8 dBm" / "-4 dBm".
        return f"{step:+d} dBm"

    initial_step = _coerce_tx_power(default)
    return st.selectbox(
        label,
        TX_POWER_OPTIONS,
        index=TX_POWER_OPTIONS.index(initial_step),
        key=key,
        format_func=_as_dbm,
        disabled=disabled,
        help=tooltip,
    )
|
||
|
||
# Try loading persisted settings from backend. Any failure (backend down,
# timeout, non-200 status, invalid JSON) leaves saved_settings empty so the UI
# falls back to its built-in defaults.
saved_settings = {}
try:
    resp = requests.get(f"{BACKEND_URL}/status", timeout=1)
    if resp.status_code == 200:
        _status_payload = resp.json()
        # Everything below calls .get() on the payload, so only accept a JSON
        # object; a list/null/scalar body is treated like "no settings".
        if isinstance(_status_payload, dict):
            saved_settings = _status_payload
except Exception:
    saved_settings = {}

# Define is_streaming early from the fetched status for use throughout the UI
is_streaming = bool(saved_settings.get("is_streaming", False))
textcast_is_streaming = bool(saved_settings.get("textcast_is_streaming", False))

# Extract secondary status, if provided by the backend /status endpoint.
secondary_status = saved_settings.get("secondary")
if not isinstance(secondary_status, dict):
    # Missing, null or malformed -> behave as if no secondary status was sent.
    secondary_status = {}
# A top-level "secondary_is_streaming" flag takes precedence over the nested one.
secondary_is_streaming = bool(saved_settings.get("secondary_is_streaming", secondary_status.get("is_streaming", False)))
|
||
|
||
def validate_unique_input_devices(radio1_streams, radio2_streams):
    """Validate that input devices are unique across all streams within each radio.

    Streams without an ``input_device`` (missing key or falsy value) are
    ignored. The same device may be used by both radios; only duplicates
    inside one radio are rejected. Radio 1 is checked before Radio 2.

    Args:
        radio1_streams: Iterable of stream dicts for Radio 1, or None.
        radio2_streams: Iterable of stream dicts for Radio 2, or None.

    Returns:
        ``(True, "")`` when valid, otherwise ``(False, message)`` naming the
        first radio that contains a duplicate.
    """
    for radio_no, streams in ((1, radio1_streams), (2, radio2_streams)):
        # `or ()` lets callers pass None for a radio that is not configured.
        devices = [s['input_device'] for s in (streams or ()) if s.get('input_device')]
        if len(devices) != len(set(devices)):
            return False, f"Duplicate input devices detected in Radio {radio_no} streams. Each stream must use a unique input device."
    return True, ""
|
||
|
||
st.title("Auracast Audio Mode Control")


def render_stream_controls(status_streaming: bool, start_label: str, stop_label: str, mode_label: str, secondary_streaming: bool = False):
    """Render the start button, stop button and status line in one row.

    Args:
        status_streaming: Whether the stream is currently running. Disables
            the start button when true and the stop button when false.
        start_label: Caption of the start button.
        stop_label: Caption of the stop button.
        mode_label: Mode name shown in the status line.
        secondary_streaming: Whether the second radio is streaming as well;
            only affects the status wording while streaming.

    Returns:
        Tuple ``(start_clicked, stop_clicked)`` with the two button results.
    """
    # Work out the status wording first; it does not depend on any widget.
    if not status_streaming:
        status_text = "🔴 Stopped"
    elif secondary_streaming:
        status_text = "🟢 Streaming on both Radios"
    else:
        status_text = "🟢 Streaming on Radio 1"

    start_col, stop_col, status_col = st.columns([1, 1, 3], gap="small", vertical_alignment="center")
    with start_col:
        start_clicked = st.button(start_label, disabled=status_streaming)
    with stop_col:
        stop_clicked = st.button(stop_label, disabled=not status_streaming)
    with status_col:
        st.write(f"Mode: {mode_label} · {status_text}")
    return start_clicked, stop_clicked
|
||
|
||
# Audio mode selection with persisted default
# Note: backend persists 'USB' for any device:<name> source (including AES67). We default to 'USB' in that case.
options = [
    "Demo",
    "Analog",
    "Network - Dante",
    "TextCast",
]
# Mode reported by the backend; "Demo" when the status carries none.
saved_audio_mode = saved_settings.get("audio_mode", "Demo")
if saved_audio_mode not in options:
    # Map legacy/unknown modes to closest
    # NOTE(review): both mapping targets ("USB", "Network") are absent from
    # `options`, so the index expression below still falls back to "Demo" for
    # them — confirm which current option the legacy modes should select.
    mapping = {"USB/Network": "USB", "AES67": "Network"}
    saved_audio_mode = mapping.get(saved_audio_mode, "Demo")

# The mode cannot be changed while either the audio stream or TextCast runs.
# NOTE(review): the help text mentions 'USB' and 'Network' (AES67), which are
# not offered in `options`, and omits 'Analog' and 'TextCast' — confirm wording.
audio_mode = st.selectbox(
    "Audio Mode",
    options,
    index=options.index(saved_audio_mode) if saved_audio_mode in options else options.index("Demo"),
    disabled=is_streaming or textcast_is_streaming,
    help=(
        "Select the audio input source. Choose 'USB' for a connected USB audio device (via PipeWire), "
        "'Network' (AES67) for network RTP/AES67 sources, "
        "'Network - Dante' for Dante inputs via ALSA, "
        "or 'Demo' for a simulated stream."
    )
)
|
||
|
||
# Work out which mode label the status line shows:
# - while streaming, the mode reported by the backend wins (if it can be mapped)
# - otherwise the mode currently selected in the UI is shown
backend_mode_raw = saved_settings.get("audio_mode")
backend_mode_mapped = None
if isinstance(backend_mode_raw, str):
    # Legacy backend names and the labels they are displayed as.
    _legacy_mode_labels = {"AES67": "Network", "USB/Network": "USB"}
    if backend_mode_raw in _legacy_mode_labels:
        backend_mode_mapped = _legacy_mode_labels[backend_mode_raw]
    elif backend_mode_raw in options:
        backend_mode_mapped = backend_mode_raw

# Analog selected in the UI is always displayed as Analog, even though the
# backend currently persists USB for all device sources.
if audio_mode == "Analog":
    running_mode = "Analog"
elif is_streaming and backend_mode_mapped:
    running_mode = backend_mode_mapped
else:
    running_mode = audio_mode
|
||
|
||
# Start/Stop buttons and status (moved to top).
# Pick the arguments per mode, then render the control row exactly once.
if audio_mode == "TextCast":
    # TextCast has its own streaming flag and never reports a second radio.
    _controls_args = (textcast_is_streaming, "Start TextCast", "Stop TextCast", "TextCast", False)
elif audio_mode == "Demo":
    _controls_args = (is_streaming, "Start Demo", "Stop Demo", running_mode, secondary_is_streaming)
else:
    _controls_args = (is_streaming, "Start Auracast", "Stop Auracast", running_mode, secondary_is_streaming)
start_stream, stop_stream = render_stream_controls(*_controls_args)
|
||
|
||
# TextCast: DCP XML file uploader.
# The subtitle text and its file name are kept in st.session_state under
# '_textcast_dcp_content' / '_textcast_dcp_name' so they survive reruns.
if audio_mode == "TextCast":
    st.markdown("#### DCP Subtitle File")
    dcp_file = st.file_uploader(
        "Upload DCP XML subtitle file (.xml)",
        type=["xml"],
        disabled=textcast_is_streaming,
        help="Upload a DCP-compliant subtitle XML file. Subtitles will be broadcast over Auracast.",
    )
    if dcp_file is not None:
        _dcp_raw = dcp_file.read()
        # Undecodable bytes are replaced rather than rejecting the upload.
        content = _dcp_raw.decode("utf-8", errors="replace")
        st.session_state['_textcast_dcp_content'] = content
        st.session_state['_textcast_dcp_name'] = dcp_file.name
        # Report the size of the raw upload: the decoded string length counts
        # characters, which differs from bytes for non-ASCII subtitles.
        st.success(f"Loaded: {dcp_file.name} ({len(_dcp_raw):,} bytes)")
    elif st.session_state.get('_textcast_dcp_name'):
        st.info(f"Using previously uploaded file: {st.session_state['_textcast_dcp_name']}")
    else:
        st.warning("No subtitle file loaded. Upload a DCP XML file or use the sample below.")
        if st.button("Load sample subtitle file", disabled=textcast_is_streaming):
            # Sample shipped at ../testdata/sample_subtitles.xml relative to
            # this script.
            _sample = os.path.abspath(os.path.join(
                os.path.dirname(__file__), '..', 'testdata', 'sample_subtitles.xml'))
            try:
                # Guard only the file read; state updates and the rerun happen
                # in the else branch so they are never caught by this handler.
                with open(_sample, 'r', encoding='utf-8') as _f:
                    _content = _f.read()
            except (OSError, UnicodeError) as _e:
                st.error(f"Could not load sample: {_e}")
            else:
                st.session_state['_textcast_dcp_content'] = _content
                st.session_state['_textcast_dcp_name'] = 'sample_subtitles.xml'
                # Rerun so the "previously uploaded file" branch is shown.
                st.rerun()
|
||
|
||
# Analog gain control (only for Analog mode, placed below start button)
analog_gain_db_left = 0  # default (dB)
analog_gain_db_right = 0  # default (dB)


def _saved_analog_gain_db(settings_key: str) -> int:
    """Return the persisted gain for *settings_key*, clamped to -12..18 dB.

    Falls back to 0 dB when the backend value is missing, null or not
    convertible to an integer, so a malformed status cannot crash the page.
    """
    try:
        gain = int(saved_settings.get(settings_key) or 0)
    except (TypeError, ValueError, OverflowError):
        gain = 0
    return max(-12, min(18, gain))


if audio_mode == "Analog":
    # Seed the slider state once per session from the backend settings; the
    # sliders below are bound to these session_state keys.
    if '_analog_gain_db_left' not in st.session_state:
        st.session_state['_analog_gain_db_left'] = _saved_analog_gain_db('analog_gain_db_left')
    if '_analog_gain_db_right' not in st.session_state:
        st.session_state['_analog_gain_db_right'] = _saved_analog_gain_db('analog_gain_db_right')
    if '_gain_link_channels' not in st.session_state:
        st.session_state['_gain_link_channels'] = True

    link_channels = st.checkbox(
        "Link audio channel gain",
        key='_gain_link_channels',
        help="When enabled, Ch 2 mirrors Ch 1."
    )
    _gain_col1, _gain_col2 = st.columns(2)
    with _gain_col1:
        analog_gain_db_left = st.slider(
            "Ch 1 Input Gain",
            min_value=-12,
            max_value=18,
            key='_analog_gain_db_left',
            step=1,
            format="%d dB",
            help="ADC gain for channel 1 (-12 to 18 dB). Default is 0 dB."
        )
    with _gain_col2:
        # The Ch 2 state must be written BEFORE its slider is created below.
        if link_channels:
            st.session_state['_analog_gain_db_right'] = analog_gain_db_left
        elif st.session_state.get('_prev_gain_link_channels', True):
            # Transition: just unlinked — seed Ch 2 from Ch 1 so it doesn't jump to min
            st.session_state['_analog_gain_db_right'] = analog_gain_db_left
        st.session_state['_prev_gain_link_channels'] = link_channels
        analog_gain_db_right = st.slider(
            "Ch 2 Input Gain",
            min_value=-12,
            max_value=18,
            key='_analog_gain_db_right',
            step=1,
            format="%d dB",
            disabled=link_channels,
            help="Uncheck 'Link audio channel gain' to adjust Ch 2 independently." if link_channels else "ADC gain for channel 2 (-12 to 18 dB). Default is 0 dB."
        )
    # Apply gain live while streaming whenever either slider value changes
    if is_streaming:
        prev_left = st.session_state.get('_prev_analog_gain_db_left')
        prev_right = st.session_state.get('_prev_analog_gain_db_right')
        if prev_left != analog_gain_db_left or prev_right != analog_gain_db_right:
            try:
                requests.post(
                    f"{BACKEND_URL}/adc_gain",
                    json={"gain_db_left": analog_gain_db_left, "gain_db_right": analog_gain_db_right},
                    timeout=1,
                )
            except Exception as _gain_err:
                # Best effort: the UI keeps working, but leave a trace instead
                # of dropping the failure silently.
                log.warning("Failed to apply ADC gain: %s", _gain_err)
        # Remember what was last sent so unchanged reruns do not re-post.
        st.session_state['_prev_analog_gain_db_left'] = analog_gain_db_left
        st.session_state['_prev_analog_gain_db_right'] = analog_gain_db_right
|
||
|
||
def _level_to_percent(level) -> int:
    """Map a linear level onto a 0-100 bar width.

    The level is converted to dB (20*log10), floored at -60 dB, and the
    -60..0 dB span is scaled linearly to 0..100. Non-positive levels map to 0.
    """
    db = max(-60.0, 20.0 * (math.log10(level) if level > 0 else -3.0))
    return int(max(0, min(100, (db + 60) * 100 / 60)))


def _render_level_meter(title: str, endpoint: str) -> None:
    """Fetch the levels from ``BACKEND_URL/<endpoint>`` and draw one meter.

    Shows a bar sized by the largest reported level, or "<title> --" when the
    request fails, the response is not OK, or the payload is empty/malformed.
    """
    try:
        r = requests.get(f"{BACKEND_URL}/{endpoint}", timeout=0.2)
        levels = r.json().get("levels", []) if r.ok else []
        # Computed inside the try so a malformed "levels" value (non-list,
        # non-numeric entries) degrades to "no data" instead of raising.
        pct = _level_to_percent(max(levels)) if levels else None
    except Exception:
        pct = None
    if pct is None:
        st.markdown(f"**{title}** --")
        return
    st.markdown(
        f"**{title}**"
        f'<div style="background:#333;border-radius:4px;height:18px;width:100%;margin-top:4px;">'
        f'<div style="background:#2ecc71;height:100%;width:{pct}%;border-radius:4px;transition:width 0.15s;"></div>'
        f'</div>',
        unsafe_allow_html=True,
    )


# Audio level monitor (checkbox, not persisted across reloads)
show_level_monitor = st.checkbox("Audio level monitor", value=False, disabled=not is_streaming,
                                 help="Show real-time audio level meters for active radios. Only works while streaming.")

if show_level_monitor and is_streaming:
    @st.fragment(run_every=0.2)
    def _audio_level_fragment():
        """Draw the Radio 1 / Radio 2 meters side by side (fragment rerun every 0.2 s)."""
        cols = st.columns(2)
        # Radio 1
        with cols[0]:
            _render_level_meter("Radio 1", "audio_level")
        # Radio 2
        with cols[1]:
            _render_level_meter("Radio 2", "audio_level2")

    _audio_level_fragment()
|
||
|
||
# Reserve a slot at this position in the page for validation errors; it is
# filled in later.
validation_error_placeholder = st.empty()

# Both flags start out False for every script run.
is_started = is_stopped = False
|
||
|
||
if audio_mode == "Demo":
|
||
demo_stream_map = {
|
||
"1 × 48kHz": {"quality": "High (48kHz)", "streams": 1},
|
||
"1 × 24kHz": {"quality": "Medium (24kHz)", "streams": 1},
|
||
"1 × 16kHz": {"quality": "Fair (16kHz)", "streams": 1},
|
||
"2 × 24kHz": {"quality": "Medium (24kHz)", "streams": 2},
|
||
"3 × 16kHz": {"quality": "Fair (16kHz)", "streams": 3},
|
||
"2 × 48kHz": {"quality": "High (48kHz)", "streams": 2},
|
||
"4 × 24kHz": {"quality": "Medium (24kHz)", "streams": 4},
|
||
"6 × 16kHz": {"quality": "Fair (16kHz)", "streams": 6},
|
||
}
|
||
demo_options = list(demo_stream_map.keys())
|
||
default_index = 0
|
||
saved_type = saved_settings.get('demo_stream_type')
|
||
if isinstance(saved_type, str) and saved_type in demo_options:
|
||
default_index = demo_options.index(saved_type)
|
||
else:
|
||
saved_rate = saved_settings.get('auracast_sampling_rate_hz')
|
||
saved_total = saved_settings.get('demo_total_streams')
|
||
if saved_total is None and saved_settings.get('audio_mode') == 'Demo':
|
||
saved_total = len(saved_settings.get('channel_names') or [])
|
||
try:
|
||
if saved_rate and saved_total:
|
||
for i, label in enumerate(demo_options):
|
||
cfg = demo_stream_map[label]
|
||
rate_for_label = QUALITY_MAP[cfg['quality']]['rate']
|
||
if cfg['streams'] == int(saved_total) and int(saved_rate) == rate_for_label:
|
||
default_index = i
|
||
break
|
||
except Exception:
|
||
default_index = 0
|
||
demo_selected = st.selectbox(
|
||
"Demo Stream Type",
|
||
demo_options,
|
||
index=default_index,
|
||
disabled=is_streaming,
|
||
help="Select the demo stream configuration."
|
||
)
|
||
demo_content_options = ["Program material", "1 kHz test tone"]
|
||
saved_demo_content = saved_settings.get('demo_content', 'Program material')
|
||
if saved_demo_content not in demo_content_options:
|
||
saved_demo_content = 'Program material'
|
||
demo_content = st.selectbox(
|
||
"Demo Content",
|
||
demo_content_options,
|
||
index=demo_content_options.index(saved_demo_content),
|
||
disabled=is_streaming,
|
||
help="Select whether demo streams use program audio files or a continuous 1 kHz test tone."
|
||
)
|
||
# Stream password and flags (same as USB/AES67)
|
||
saved_pwd = saved_settings.get('stream_password', '') or ''
|
||
stream_passwort = st.text_input(
|
||
"Stream Passwort",
|
||
value=saved_pwd,
|
||
type=("password"),
|
||
disabled=is_streaming,
|
||
help="Optional: Set a broadcast code to protect your stream. Leave empty for an open (uncoded) broadcast."
|
||
)
|
||
col_flags1, col_flags2, col_pdelay, col_qos = st.columns([1, 1, 0.7, 0.6], gap="small", vertical_alignment="center")
|
||
with col_flags1:
|
||
assisted_listening = st.checkbox(
|
||
"Assistive listening",
|
||
value=bool(saved_settings.get('assisted_listening_stream', False)),
|
||
disabled=is_streaming,
|
||
help="tells the receiver that this is an assistive listening stream"
|
||
)
|
||
with col_flags2:
|
||
immediate_rendering = st.checkbox(
|
||
"Immediate rendering",
|
||
value=bool(saved_settings.get('immediate_rendering', False)),
|
||
disabled=is_streaming,
|
||
help="tells the receiver to ignore presentation delay and render immediately if possible."
|
||
)
|
||
# QoS/presentation controls inline with flags
|
||
default_pdelay = int(saved_settings.get('presentation_delay_us', 40000) or 40000)
|
||
with col_pdelay:
|
||
default_pdelay_ms = max(10, min(200, default_pdelay // 1000))
|
||
presentation_delay_ms = st.number_input(
|
||
"Delay (ms)",
|
||
min_value=10, max_value=200, step=5, value=default_pdelay_ms,
|
||
disabled=is_streaming,
|
||
help="Delay between capture and presentation for receivers."
|
||
)
|
||
with col_qos:
|
||
qos_options = list(QOS_PRESET_MAP.keys())
|
||
saved_qos = saved_settings.get('qos_preset', 'Fast')
|
||
default_qos_idx = qos_options.index(saved_qos) if saved_qos in qos_options else 0
|
||
qos_preset = st.selectbox(
|
||
"QoS", options=qos_options, index=default_qos_idx,
|
||
disabled=is_streaming,
|
||
help="Fast: 2 retransmissions, lower latency. Robust: 4 retransmissions, better reliability."
|
||
)
|
||
# Per-radio TX power for Demo (independent for R1 and R2)
|
||
col_tx_r1, col_tx_r2 = st.columns(2, gap="small")
|
||
with col_tx_r1:
|
||
tx_power_r1 = _tx_power_selectbox(
|
||
"TX Power (R1)",
|
||
key="demo_tx_power_r1",
|
||
default=saved_settings.get('advertising_tx_power', TX_POWER_DEFAULT),
|
||
disabled=is_streaming,
|
||
)
|
||
with col_tx_r2:
|
||
tx_power_r2 = _tx_power_selectbox(
|
||
"TX Power (R2)",
|
||
key="demo_tx_power_r2",
|
||
default=saved_settings.get('secondary', {}).get('advertising_tx_power', TX_POWER_DEFAULT),
|
||
disabled=is_streaming,
|
||
)
|
||
#st.info(f"Demo mode selected: {demo_selected} (Streams: {demo_stream_map[demo_selected]['streams']}, Rate: {demo_stream_map[demo_selected]['rate']} Hz)")
|
||
quality = None # Not used in demo mode
|
||
else:
|
||
# --- Mode-specific configuration ---
|
||
default_name = saved_settings.get('channel_names', ["Broadcast0"])[0]
|
||
raw_program_info = saved_settings.get('program_info', default_name)
|
||
if isinstance(raw_program_info, list) and raw_program_info:
|
||
default_program_info = raw_program_info[0]
|
||
else:
|
||
default_program_info = raw_program_info
|
||
default_lang = saved_settings.get('languages', ["deu"])[0]
|
||
|
||
# Per-mode configuration and controls
|
||
input_device = None
|
||
radio2_enabled = False
|
||
radio1_cfg = None
|
||
radio2_cfg = None
|
||
|
||
if audio_mode == "Analog":
|
||
# --- Radio 1 controls ---
|
||
with st.container(border=True):
|
||
st.subheader("Radio 1")
|
||
|
||
# Always-enabled checkbox for Radio 1
|
||
st.checkbox(
|
||
"Radio 1 always enabled",
|
||
value=True,
|
||
disabled=True,
|
||
help="Radio 1 is always enabled, Radio 2 can be turned on or off."
|
||
)
|
||
|
||
# Stereo mode toggle for analog
|
||
stereo_enabled = st.checkbox(
|
||
"🎧 Stereo Mode",
|
||
value=bool(saved_settings.get('analog_stereo_mode', False)),
|
||
help="Enable stereo streaming for analog inputs. When enabled, ch1 becomes left channel and ch2 becomes right channel in a single stereo stream. Radio 2 will be disabled in stereo mode.",
|
||
disabled=is_streaming
|
||
)
|
||
|
||
quality_options = list(QUALITY_MAP.keys())
|
||
|
||
# Use saved settings if audio_mode matches, otherwise use analog-specific defaults
|
||
saved_audio_mode = saved_settings.get('audio_mode')
|
||
if saved_audio_mode == 'Analog':
|
||
default_name = saved_settings.get('channel_names', ["Analog_Radio_1"])[0]
|
||
raw_program_info = saved_settings.get('program_info', default_name)
|
||
if isinstance(raw_program_info, list) and raw_program_info:
|
||
default_program_info = raw_program_info[0]
|
||
else:
|
||
default_program_info = raw_program_info
|
||
default_lang = saved_settings.get('languages', ["deu"])[0]
|
||
|
||
# Map saved sampling rate to quality label
|
||
saved_rate = saved_settings.get('auracast_sampling_rate_hz')
|
||
if saved_rate == 48000:
|
||
default_quality = "High (48kHz)"
|
||
elif saved_rate == 32000:
|
||
default_quality = "Good (32kHz)"
|
||
elif saved_rate == 24000:
|
||
default_quality = "Medium (24kHz)"
|
||
elif saved_rate == 16000:
|
||
default_quality = "Fair (16kHz)"
|
||
else:
|
||
default_quality = "Medium (24kHz)"
|
||
|
||
saved_pwd = saved_settings.get('stream_password', '')
|
||
else:
|
||
# Use analog-specific defaults when switching from another mode
|
||
default_name = "Analog_Radio_1"
|
||
default_program_info = "Analog Radio Broadcast"
|
||
default_lang = "deu"
|
||
default_quality = "Medium (24kHz)" if "Medium (24kHz)" in quality_options else quality_options[0]
|
||
saved_pwd = ''
|
||
|
||
if default_quality not in quality_options:
|
||
default_quality = quality_options[0]
|
||
quality1 = st.selectbox(
|
||
"Stream Quality (Radio 1)",
|
||
quality_options,
|
||
index=quality_options.index(default_quality),
|
||
disabled=is_streaming,
|
||
help="Select the audio sampling rate for Radio 1."
|
||
)
|
||
|
||
stream_passwort1 = st.text_input(
|
||
"Stream Passwort (Radio 1)",
|
||
value=saved_pwd,
|
||
type="password",
|
||
disabled=is_streaming,
|
||
help="Optional: Set a broadcast code for Radio 1."
|
||
)
|
||
|
||
col_r1_flags1, col_r1_flags2, col_r1_pdelay, col_r1_qos = st.columns([1, 1, 0.7, 0.6], gap="small")
|
||
with col_r1_flags1:
|
||
assisted_listening1 = st.checkbox(
|
||
"Assistive listening (R1)",
|
||
value=bool(saved_settings.get('assisted_listening_stream', False)),
|
||
disabled=is_streaming,
|
||
help="tells the receiver that this is an assistive listening stream"
|
||
)
|
||
with col_r1_flags2:
|
||
immediate_rendering1 = st.checkbox(
|
||
"Immediate rendering (R1)",
|
||
value=bool(saved_settings.get('immediate_rendering', False)),
|
||
disabled=is_streaming,
|
||
help="tells the receiver to ignore presentation delay and render immediately if possible."
|
||
)
|
||
default_pdelay = int(saved_settings.get('presentation_delay_us', 40000) or 40000)
|
||
with col_r1_pdelay:
|
||
default_pdelay_ms = max(10, min(200, default_pdelay // 1000))
|
||
presentation_delay_ms1 = st.number_input(
|
||
"Delay (ms, R1)",
|
||
min_value=10, max_value=200, step=5, value=default_pdelay_ms,
|
||
disabled=is_streaming,
|
||
help="Delay between capture and presentation for Radio 1."
|
||
)
|
||
with col_r1_qos:
|
||
qos_options = list(QOS_PRESET_MAP.keys())
|
||
saved_qos = saved_settings.get('qos_preset', 'Fast')
|
||
default_qos_idx = qos_options.index(saved_qos) if saved_qos in qos_options else 0
|
||
qos_preset1 = st.selectbox(
|
||
"QoS (R1)", options=qos_options, index=default_qos_idx,
|
||
disabled=is_streaming,
|
||
help="Fast: 2 retransmissions, lower latency. Robust: 4 retransmissions, better reliability."
|
||
)
|
||
|
||
tx_power_r1 = _tx_power_selectbox(
|
||
"TX Power (R1)",
|
||
key="analog_tx_power_r1",
|
||
default=saved_settings.get('advertising_tx_power', TX_POWER_DEFAULT),
|
||
disabled=is_streaming,
|
||
)
|
||
|
||
col_r1_name, col_r1_lang = st.columns([2, 1])
|
||
with col_r1_name:
|
||
stream_name1 = st.text_input(
|
||
"Channel Name (Radio 1)",
|
||
value=default_name,
|
||
disabled=is_streaming,
|
||
help="Name for the first analog radio (Radio 1)."
|
||
)
|
||
with col_r1_lang:
|
||
language1 = st.text_input(
|
||
"Language (ISO 639-3) (Radio 1)",
|
||
value=default_lang,
|
||
disabled=is_streaming,
|
||
help="Language code for Radio 1."
|
||
)
|
||
program_info1 = st.text_input(
|
||
"Program Info (Radio 1)",
|
||
value=default_program_info,
|
||
disabled=is_streaming,
|
||
help="Program information for Radio 1."
|
||
)
|
||
|
||
# Analog mode exposes only ALSA ch1/ch2 inputs.
|
||
if not is_streaming:
|
||
try:
|
||
resp = requests.get(f"{BACKEND_URL}/audio_inputs_pw_usb")
|
||
device_list = resp.json().get('inputs', [])
|
||
except Exception as e:
|
||
st.error(f"Failed to fetch devices: {e}")
|
||
device_list = []
|
||
|
||
analog_devices = [d for d in device_list if d.get('name') in ('ch1', 'ch2')]
|
||
|
||
if not analog_devices:
|
||
st.warning("No Analog (ch1/ch2) ALSA inputs found. Check asound configuration.")
|
||
if st.button("Refresh", key="refresh_analog", disabled=is_streaming):
|
||
try:
|
||
r = requests.post(f"{BACKEND_URL}/refresh_audio_devices", timeout=8)
|
||
if not r.ok:
|
||
st.error(f"Failed to refresh: {r.text}")
|
||
except Exception as e:
|
||
st.error(f"Failed to refresh devices: {e}")
|
||
st.rerun()
|
||
analog_names = [d['name'] for d in analog_devices]
|
||
else:
|
||
analog_devices = []
|
||
analog_names = []
|
||
|
||
if not is_streaming:
|
||
if analog_names:
|
||
if stereo_enabled:
|
||
# In stereo mode, only show ch1 and automatically select it
|
||
if 'ch1' in analog_names:
|
||
input_device1 = 'ch1'
|
||
st.selectbox(
|
||
"Input Device (Radio 1) - Stereo",
|
||
['ch1 + ch2 (Stereo: Left+Right channels)'],
|
||
index=0,
|
||
disabled=is_streaming,
|
||
help="Stereo mode: Captures both ch1 (left) and ch2 (right) as a single stereo stream"
|
||
)
|
||
st.info("🎧 Stereo mode enabled - both ch1 and ch2 will be captured as left/right channels")
|
||
else:
|
||
st.error("ch1 not available for stereo mode")
|
||
input_device1 = None
|
||
else:
|
||
# Mono mode: show all available channels
|
||
saved_input_device = saved_settings.get('input_device')
|
||
default_r1_idx = 0
|
||
if saved_input_device in analog_names:
|
||
default_r1_idx = analog_names.index(saved_input_device)
|
||
input_device1 = st.selectbox(
|
||
"Input Device (Radio 1)",
|
||
analog_names,
|
||
index=default_r1_idx,
|
||
disabled=is_streaming,
|
||
)
|
||
else:
|
||
input_device1 = None
|
||
else:
|
||
input_device1 = saved_settings.get('input_device')
|
||
if stereo_enabled:
|
||
st.selectbox(
|
||
"Input Device (Radio 1) - Stereo",
|
||
[f"{input_device1 or 'ch1'} (Left+Right channels)" if input_device1 else "No device selected"],
|
||
index=0,
|
||
disabled=True,
|
||
help="Stop the stream to change the input device."
|
||
)
|
||
else:
|
||
st.selectbox(
|
||
"Input Device (Radio 1)",
|
||
[input_device1 or "No device selected"],
|
||
index=0,
|
||
disabled=True,
|
||
help="Stop the stream to change the input device."
|
||
)
|
||
|
||
# --- Radio 2 controls ---
|
||
with st.container(border=True):
|
||
st.subheader("Radio 2")
|
||
|
||
# Disable Radio 2 in stereo mode
|
||
if stereo_enabled:
|
||
st.info("🎧 Radio 2 is automatically disabled in stereo mode")
|
||
radio2_enabled = False
|
||
else:
|
||
# If the backend reports that the secondary radio is currently streaming,
|
||
# initialize the checkbox to checked so the UI reflects the active state
|
||
# when the frontend is loaded.
|
||
radio2_enabled_default = secondary_is_streaming
|
||
radio2_enabled = st.checkbox(
|
||
"Enable Radio 2",
|
||
value=radio2_enabled_default,
|
||
disabled=is_streaming,
|
||
help="Activate a second analog radio with its own quality and timing settings."
|
||
)
|
||
|
||
if radio2_enabled and not stereo_enabled:
|
||
# Use saved settings if audio_mode matches, otherwise use analog-specific defaults for Radio 2
|
||
secondary_settings = saved_settings.get('secondary', {})
|
||
saved_audio_mode = saved_settings.get('audio_mode')
|
||
if saved_audio_mode == 'Analog' and secondary_settings:
|
||
default_name_r2 = secondary_settings.get('channel_names', ["Analog_Radio_2"])[0] if isinstance(secondary_settings.get('channel_names'), list) else secondary_settings.get('channel_names', "Analog_Radio_2")
|
||
raw_program_info_r2 = secondary_settings.get('program_info', default_name_r2)
|
||
if isinstance(raw_program_info_r2, list) and raw_program_info_r2:
|
||
default_program_info_r2 = raw_program_info_r2[0]
|
||
else:
|
||
default_program_info_r2 = raw_program_info_r2
|
||
default_lang_r2 = secondary_settings.get('languages', ["deu"])[0] if isinstance(secondary_settings.get('languages'), list) else secondary_settings.get('languages', 'deu')
|
||
|
||
# Map saved sampling rate to quality label
|
||
saved_rate_r2 = secondary_settings.get('auracast_sampling_rate_hz')
|
||
if saved_rate_r2 == 48000:
|
||
default_quality_r2 = "High (48kHz)"
|
||
elif saved_rate_r2 == 32000:
|
||
default_quality_r2 = "Good (32kHz)"
|
||
elif saved_rate_r2 == 24000:
|
||
default_quality_r2 = "Medium (24kHz)"
|
||
elif saved_rate_r2 == 16000:
|
||
default_quality_r2 = "Fair (16kHz)"
|
||
else:
|
||
default_quality_r2 = "Medium (24kHz)"
|
||
|
||
saved_pwd_r2 = secondary_settings.get('stream_password', '')
|
||
else:
|
||
# Use analog-specific defaults when switching from another mode
|
||
default_name_r2 = "Analog_Radio_2"
|
||
default_program_info_r2 = "Analog Radio Broadcast"
|
||
default_lang_r2 = "deu"
|
||
default_quality_r2 = "Medium (24kHz)" if "Medium (24kHz)" in quality_options else quality_options[0]
|
||
saved_pwd_r2 = ''
|
||
|
||
if default_quality_r2 not in quality_options:
|
||
default_quality_r2 = quality_options[0]
|
||
quality2 = st.selectbox(
|
||
"Stream Quality (Radio 2)",
|
||
quality_options,
|
||
index=quality_options.index(default_quality_r2),
|
||
disabled=is_streaming,
|
||
help="Select the audio sampling rate for Radio 2."
|
||
)
|
||
|
||
stream_passwort2 = st.text_input(
|
||
"Stream Passwort (Radio 2)",
|
||
value=saved_pwd_r2,
|
||
type="password",
|
||
disabled=is_streaming,
|
||
help="Optional: Set a broadcast code for Radio 2."
|
||
)
|
||
|
||
# Radio 2 (analog): flags, presentation delay, QoS preset and TX power.
#
# Defaults are taken from Radio 2's own saved values under 'secondary'.
# When Radio 2 has no saved value for a field, the previous default
# (Radio 1's value) is used, so existing setups look unchanged.
secondary_saved_r2 = saved_settings.get('secondary') or {}

col_r2_flags1, col_r2_flags2, col_r2_pdelay, col_r2_qos = st.columns([1, 1, 0.7, 0.6], gap="small")
with col_r2_flags1:
    assisted_listening2 = st.checkbox(
        "Assistive listening (R2)",
        value=bool(
            secondary_saved_r2.get(
                'assisted_listening_stream',
                saved_settings.get('assisted_listening_stream', False),
            )
        ),
        disabled=is_streaming,
        help="tells the receiver that this is an assistive listening stream"
    )
with col_r2_flags2:
    immediate_rendering2 = st.checkbox(
        "Immediate rendering (R2)",
        value=bool(
            secondary_saved_r2.get(
                'immediate_rendering',
                saved_settings.get('immediate_rendering', False),
            )
        ),
        disabled=is_streaming,
        help="tells the receiver to ignore presentation delay and render immediately if possible."
    )
with col_r2_pdelay:
    # Saved delay is in microseconds; the widget works in milliseconds
    # and is clamped to its own 10..200 ms range.
    saved_pdelay_us_r2 = secondary_saved_r2.get('presentation_delay_us')
    if saved_pdelay_us_r2:
        default_pdelay_ms_r2 = max(10, min(200, int(saved_pdelay_us_r2) // 1000))
    else:
        default_pdelay_ms_r2 = default_pdelay_ms
    presentation_delay_ms2 = st.number_input(
        "Delay (ms, R2)",
        min_value=10, max_value=200, step=5, value=default_pdelay_ms_r2,
        disabled=is_streaming,
        help="Delay between capture and presentation for Radio 2."
    )
with col_r2_qos:
    saved_qos2 = secondary_saved_r2.get('qos_preset', 'Fast')
    default_qos_idx2 = qos_options.index(saved_qos2) if saved_qos2 in qos_options else 0
    qos_preset2 = st.selectbox(
        "QoS (R2)", options=qos_options, index=default_qos_idx2,
        disabled=is_streaming,
        help="Fast: 2 retransmissions, lower latency. Robust: 4 retransmissions, better reliability."
    )

tx_power_r2 = _tx_power_selectbox(
    "TX Power (R2)",
    key="analog_tx_power_r2",
    default=secondary_saved_r2.get('advertising_tx_power', TX_POWER_DEFAULT),
    disabled=is_streaming,
)
|
||
|
||
# Radio 2 (analog): broadcast name, language and program info.
# Name and language share one row (2:1); program info gets its own row.
col_r2_name, col_r2_lang = st.columns([2, 1])

with col_r2_name:
    stream_name2 = st.text_input(
        label="Channel Name (Radio 2)",
        value=default_name_r2,
        disabled=is_streaming,
        help="Name for the second analog radio (Radio 2).",
    )

with col_r2_lang:
    language2 = st.text_input(
        label="Language (ISO 639-3) (Radio 2)",
        value=default_lang_r2,
        disabled=is_streaming,
        help="Language code for Radio 2.",
    )

program_info2 = st.text_input(
    label="Program Info (Radio 2)",
    value=default_program_info_r2,
    disabled=is_streaming,
    help="Program information for Radio 2.",
)
|
||
|
||
# Radio 2 (analog): input device picker.
#
# Radio 2's saved device lives under 'secondary'. While streaming, the
# picker is replaced by a locked, read-only box showing that saved device
# (not Radio 1's top-level 'input_device').
secondary_settings = saved_settings.get('secondary') or {}
saved_input_device2 = secondary_settings.get('input_device')

if is_streaming:
    input_device2 = saved_input_device2
    st.selectbox(
        "Input Device (Radio 2)",
        [input_device2 or "No device selected"],
        index=0,
        disabled=True,
        help="Stop the stream to change the input device."
    )
elif analog_names:
    # Prefer the saved device; otherwise default to the second analog
    # input when there is one.
    default_r2_idx = 1 if len(analog_names) > 1 else 0
    if saved_input_device2 in analog_names:
        default_r2_idx = analog_names.index(saved_input_device2)
    input_device2 = st.selectbox(
        "Input Device (Radio 2)",
        analog_names,
        index=default_r2_idx,
        disabled=is_streaming,
    )
else:
    # No analog inputs available: nothing to select.
    input_device2 = None
|
||
|
||
# Collected widget values for both analog radios.
# Both radios carry the same left/right analog gain values.
_analog_gains = {
    'analog_gain_db_left': analog_gain_db_left,
    'analog_gain_db_right': analog_gain_db_right,
}

radio2_cfg = dict(
    id=1002,
    name=stream_name2,
    program_info=program_info2,
    language=language2,
    input_device=input_device2,
    quality=quality2,
    stream_passwort=stream_passwort2,
    assisted_listening=assisted_listening2,
    immediate_rendering=immediate_rendering2,
    presentation_delay_ms=presentation_delay_ms2,
    qos_preset=qos_preset2,
    tx_power=tx_power_r2,
    **_analog_gains,
)

# Radio 1 additionally records whether stereo mode is enabled.
radio1_cfg = dict(
    id=1001,
    name=stream_name1,
    program_info=program_info1,
    language=language1,
    input_device=input_device1,
    quality=quality1,
    stream_passwort=stream_passwort1,
    assisted_listening=assisted_listening1,
    immediate_rendering=immediate_rendering1,
    presentation_delay_ms=presentation_delay_ms1,
    qos_preset=qos_preset1,
    tx_power=tx_power_r1,
    stereo_mode=stereo_enabled,
    **_analog_gains,
)
|
||
|
||
if audio_mode == "Network - Dante":
|
||
# --- Network - Dante mode with Radio 1 and Radio 2 categories ---
|
||
|
||
# Stream layouts offered per Dante radio: label -> number of streams and
# the maximum quality each stream may use in that layout.
dante_stream_options = {
    "1 × 48kHz": {"streams": 1, "quality": "High (48kHz)"},
    "2 × 24kHz": {"streams": 2, "quality": "Medium (24kHz)"},
    "3 × 16kHz": {"streams": 3, "quality": "Fair (16kHz)"}
}

# Fetch the available Dante inputs from the backend. This is skipped while
# streaming; the device lists then stay empty.
device_list = []
if not is_streaming:
    try:
        # A timeout keeps an unresponsive backend from hanging the whole
        # script run; an HTTP error status is reported instead of being
        # parsed as if it were a device list.
        resp = requests.get(f"{BACKEND_URL}/audio_inputs_dante", timeout=8)
        resp.raise_for_status()
        device_list = resp.json().get('inputs', []) or []
    except Exception as e:
        st.error(f"Failed to fetch Dante devices: {e}")
        device_list = []

# Display label ("name [id]") -> internal device name, plus flat lists.
input_options = [f"{d['name']} [{d['id']}]" for d in device_list]
option_name_map = {f"{d['name']} [{d['id']}]": d['name'] for d in device_list}
device_names = [d['name'] for d in device_list]
|
||
|
||
# --- Radio 1 Section ---
|
||
with st.container(border=True):
|
||
st.subheader("Radio 1")

# Informational, permanently checked and disabled checkbox.
st.checkbox(
    "Radio 1 always enabled",
    value=True,
    disabled=True,
    help="Radio 1 is always enabled, Radio 2 can be turned on or off."
)

# Dante stereo mode toggle.
# The default is derived from saved settings: stereo was active if the last
# saved mode was Dante and the saved device name has the 'dante_stereo_'
# prefix. A missing or null 'input_device' is treated as "not stereo"
# instead of raising on .startswith().
saved_audio_mode = saved_settings.get('audio_mode')
dante_stereo_enabled = False
if saved_audio_mode == 'Network - Dante':
    input_device = saved_settings.get('input_device') or ''
    dante_stereo_enabled = (
        isinstance(input_device, str) and input_device.startswith('dante_stereo_')
    )
dante_stereo_enabled = st.checkbox(
    "🎧 Stereo Mode",
    value=dante_stereo_enabled,
    help="Enable stereo streaming for Dante inputs. Select left and right channels from ASRC channels 1-6. Radio 2 and multi-stream configurations will be disabled in stereo mode.",
    disabled=is_streaming
)
|
||
|
||
# Dante stereo channel selectors.
# Both stay None unless stereo mode is enabled.
dante_left_channel = None
dante_right_channel = None
if dante_stereo_enabled:
    dante_channel_options = ["dante_asrc_ch1", "dante_asrc_ch2", "dante_asrc_ch3",
                             "dante_asrc_ch4", "dante_asrc_ch5", "dante_asrc_ch6"]
    dante_channel_labels = ["CH1", "CH2", "CH3", "CH4", "CH5", "CH6"]
    # Option -> short label, built once instead of an .index() scan per lookup.
    _label_by_channel = dict(zip(dante_channel_options, dante_channel_labels))

    def _asrc_label(option):
        """Return the display text for a Dante ASRC channel option."""
        return f"ASRC {_label_by_channel[option]}"

    # Recover the previously saved left/right channels from the stereo
    # device name, format: dante_stereo_<left>_<right>.
    # A missing, null or non-string saved value keeps the CH1/CH2 defaults.
    input_device = saved_settings.get('input_device') or ''
    if not isinstance(input_device, str):
        input_device = ''
    saved_left = 'dante_asrc_ch1'
    saved_right = 'dante_asrc_ch2'
    if input_device.startswith('dante_stereo_'):
        parts = input_device.split('_')
        if len(parts) >= 4:
            saved_left = f"dante_asrc_ch{parts[2]}"
            saved_right = f"dante_asrc_ch{parts[3]}"

    col_left, col_right = st.columns(2)
    with col_left:
        # Unknown saved channel -> first option (CH1).
        left_idx = dante_channel_options.index(saved_left) if saved_left in dante_channel_options else 0
        dante_left_channel = st.selectbox(
            "Left Channel",
            dante_channel_options,
            index=left_idx,
            format_func=_asrc_label,
            disabled=is_streaming,
            help="Select the Dante ASRC channel for the left stereo channel"
        )
    with col_right:
        # Unknown saved channel -> second option (CH2).
        right_idx = dante_channel_options.index(saved_right) if saved_right in dante_channel_options else 1
        dante_right_channel = st.selectbox(
            "Right Channel",
            dante_channel_options,
            index=right_idx,
            format_func=_asrc_label,
            disabled=is_streaming,
            help="Select the Dante ASRC channel for the right stereo channel"
        )

    if dante_left_channel == dante_right_channel:
        st.warning("⚠️ Left and right channels are the same. Select different channels for true stereo.")
    else:
        st.info(f"🎧 Stereo mode: {_label_by_channel[dante_left_channel]} (Left) + {_label_by_channel[dante_right_channel]} (Right)")
|
||
|
||
# Stream layout for Radio 1. In stereo mode the layout is fixed to a single
# 48kHz stream and the dropdown is shown locked.
r1_stream_options = list(dante_stream_options)

# Guess the previously used layout from the saved sampling rate. A saved
# 48kHz rate is disambiguated by the number of saved channel names.
saved_rate = saved_settings.get('auracast_sampling_rate_hz')
saved_r1_streams = '1 × 48kHz'  # default
if saved_rate == 48000:
    channel_names = saved_settings.get('channel_names', [])
    _r1_saved_count = len(channel_names)
    if _r1_saved_count == 2:
        saved_r1_streams = '2 × 24kHz'
    elif _r1_saved_count == 3:
        saved_r1_streams = '3 × 16kHz'
elif saved_rate == 24000:
    saved_r1_streams = '2 × 24kHz'
elif saved_rate == 16000:
    saved_r1_streams = '3 × 16kHz'

try:
    default_r1_idx = r1_stream_options.index(saved_r1_streams)
except ValueError:
    default_r1_idx = 0

if not dante_stereo_enabled:
    r1_stream_config = st.selectbox(
        label="Stream Configuration (Radio 1)",
        options=r1_stream_options,
        index=default_r1_idx,
        disabled=is_streaming,
        help="Select the number and quality of streams for Radio 1",
    )
else:
    # Stereo mode: force 1 stream at 48kHz; the widget is display-only.
    r1_stream_config = "1 × 48kHz"
    st.selectbox(
        label="Stream Configuration (Radio 1)",
        options=["1 × 48kHz (Stereo)"],
        index=0,
        disabled=True,
        help="In stereo mode, only 1 stream at 48kHz is supported",
    )

_r1_layout = dante_stream_options[r1_stream_config]
r1_num_streams = _r1_layout["streams"]
r1_quality = _r1_layout["quality"]
|
||
|
||
# Stream quality for Radio 1. The chosen layout caps the quality; only
# qualities allowed under that cap are offered.
r1_max_quality = r1_quality
_r1_allowed_by_cap = {
    "High (48kHz)": ("High (48kHz)", "Good (32kHz)", "Medium (24kHz)", "Fair (16kHz)"),
    "Medium (24kHz)": ("Medium (24kHz)", "Fair (16kHz)"),
    "Fair (16kHz)": ("Fair (16kHz)",),
}.get(r1_max_quality, ())
r1_available_qualities = []
for quality in ["High (48kHz)", "Good (32kHz)", "Medium (24kHz)", "Fair (16kHz)"]:
    if quality in _r1_allowed_by_cap:
        r1_available_qualities.append(quality)

# Preselect the quality matching the saved sampling rate; if that rate is
# unknown or exceeds the cap, preselect the cap itself.
saved_rate = saved_settings.get('auracast_sampling_rate_hz')
saved_r1_quality = {
    48000: "High (48kHz)",
    32000: "Good (32kHz)",
    24000: "Medium (24kHz)",
    16000: "Fair (16kHz)",
}.get(saved_rate, r1_max_quality)
if saved_r1_quality not in r1_available_qualities:
    saved_r1_quality = r1_max_quality

r1_radio_quality = st.selectbox(
    label="Stream Quality (Radio 1)",
    options=r1_available_qualities,
    index=r1_available_qualities.index(saved_r1_quality),
    disabled=is_streaming,
    help=f"Select stream quality for Radio 1. Maximum quality based on configuration: {r1_max_quality}",
)
|
||
|
||
# Radio-level settings for Radio 1, laid out in one row:
# assistive listening | immediate rendering | presentation delay | QoS,
# followed by the TX power selector.

# Saved delay is in microseconds; the widget uses milliseconds (10..200).
default_pdelay = int(saved_settings.get('presentation_delay_us', 40000) or 40000)
default_pdelay_ms = max(10, min(200, default_pdelay // 1000))

# Unknown saved QoS preset -> first preset.
qos_options = list(QOS_PRESET_MAP.keys())
saved_qos = saved_settings.get('qos_preset', 'Fast')
try:
    default_qos_idx = qos_options.index(saved_qos)
except ValueError:
    default_qos_idx = 0

col_r1_flags1, col_r1_flags2, col_r1_pdelay, col_r1_qos = st.columns(
    [1, 1, 0.7, 0.6], gap="small"
)

with col_r1_flags1:
    r1_assisted_listening = st.checkbox(
        label="Assistive (R1)",
        value=bool(saved_settings.get('assisted_listening_stream', False)),
        disabled=is_streaming,
        help="Assistive listening stream",
    )

with col_r1_flags2:
    r1_immediate_rendering = st.checkbox(
        label="Immediate (R1)",
        value=bool(saved_settings.get('immediate_rendering', False)),
        disabled=is_streaming,
        help="Ignore presentation delay",
    )

with col_r1_pdelay:
    r1_presentation_delay_ms = st.number_input(
        label="Delay (ms, R1)",
        min_value=10,
        max_value=200,
        step=5,
        value=default_pdelay_ms,
        disabled=is_streaming,
        help="Presentation delay for Radio 1",
    )

with col_r1_qos:
    r1_qos_preset = st.selectbox(
        label="QoS (R1)",
        options=qos_options,
        index=default_qos_idx,
        disabled=is_streaming,
        help="Quality of Service preset for Radio 1",
    )

# Per-radio saved TX power wins over the top-level saved value.
r1_tx_power = _tx_power_selectbox(
    "TX Power (R1)",
    key="dante_tx_power_r1",
    default=saved_r1_config.get(
        'advertising_tx_power',
        saved_settings.get('advertising_tx_power', TX_POWER_DEFAULT),
    ),
    disabled=is_streaming,
)
|
||
|
||
# Per-stream configuration for Radio 1: section caption, then the list that
# collects one dict per configured stream.
st.write(
    "**Stereo Stream Configuration (Radio 1)**"
    if dante_stereo_enabled
    else "**Stream Configuration (Radio 1)**"
)
r1_streams = []
|
||
|
||
if dante_stereo_enabled:
|
||
# Stereo mode: one stream that combines the selected left/right channels.
with st.expander("Stereo Stream - Radio 1", expanded=True):
    # Saved values come from the flat settings structure; only the first
    # entry of each list is relevant for the single stereo stream.
    channel_names = saved_settings.get('channel_names', [])
    program_infos = saved_settings.get('program_info', [])
    languages = saved_settings.get('languages', [])

    saved_name = (channel_names or ['Dante_Stereo'])[0]
    saved_program_info = program_infos[0] if program_infos else saved_name
    saved_language = (languages or ['eng'])[0]
    saved_password = saved_settings.get('stream_password', '')

    # Row 1: channel name | broadcast password
    col_name, col_pwd = st.columns([2, 1])
    with col_name:
        stream_name = st.text_input(
            label="Channel Name",
            value=saved_name,
            disabled=is_streaming,
            key="r1_stereo_name",
        )
    with col_pwd:
        stream_password = st.text_input(
            label="Stream Password",
            value=saved_password,
            type="password",
            disabled=is_streaming,
            key="r1_stereo_password",
            help="Optional: Set a broadcast code for this stream",
        )

    # Row 2: program info | language code
    col_prog, col_lang_code = st.columns([2, 1])
    with col_prog:
        program_info = st.text_input(
            label="Program Info",
            value=saved_program_info,
            disabled=is_streaming,
            key="r1_stereo_program",
        )
    with col_lang_code:
        language = st.text_input(
            label="Language",
            value=saved_language,
            disabled=is_streaming,
            key="r1_stereo_lang",
            help="ISO 639-3 language code",
        )

    # Derive the stereo device name from the channel numbers in the
    # selected 'dante_asrc_chX' options (defaults: left 1, right 2).
    if dante_left_channel:
        left_ch_num = dante_left_channel.replace('dante_asrc_ch', '')
    else:
        left_ch_num = '1'
    if dante_right_channel:
        right_ch_num = dante_right_channel.replace('dante_asrc_ch', '')
    else:
        right_ch_num = '2'
    stereo_device_name = f"dante_stereo_{left_ch_num}_{right_ch_num}"

    # Read-only display of the resulting stereo input.
    st.selectbox(
        label="Input Device (Stereo)",
        options=[f"Stereo: CH{left_ch_num} (L) + CH{right_ch_num} (R)"],
        index=0,
        disabled=True,
        help="Stereo input from the selected Dante ASRC channels",
    )

    r1_streams.append(dict(
        name=stream_name,
        program_info=program_info,
        language=language,
        input_device=stereo_device_name,
        stream_password=stream_password,
        dante_stereo_mode=True,
        dante_stereo_left=dante_left_channel,
        dante_stereo_right=dante_right_channel,
    ))
|
||
else:
|
||
# Normal mono mode: one expander per stream, each with its own channel.
# Saved values come from the flat settings structure (one list per field,
# indexed by stream position).
channel_names = saved_settings.get('channel_names', [])
program_infos = saved_settings.get('program_info', [])
languages = saved_settings.get('languages', [])
input_devices = saved_settings.get('input_devices', [])
stream_passwords = saved_settings.get('stream_passwords', [])


def _r1_saved_at(values, idx, fallback):
    """Return values[idx], or fallback when idx is past the end of values."""
    return values[idx] if idx < len(values) else fallback


for i in range(r1_num_streams):
    with st.expander(f"Stream {i+1} - Radio 1", expanded=True):
        saved_name = _r1_saved_at(channel_names, i, f'Dante_R1_S{i+1}')
        saved_program_info = _r1_saved_at(program_infos, i, f'Dante Radio 1 Stream {i+1}')
        saved_language = _r1_saved_at(languages, i, 'eng')
        saved_password = _r1_saved_at(stream_passwords, i, '')
        saved_input_device = _r1_saved_at(input_devices, i, None)

        # Row 1: channel name | broadcast password
        col_name, col_pwd = st.columns([2, 1])
        with col_name:
            stream_name = st.text_input(
                "Channel Name",
                value=saved_name,
                disabled=is_streaming,
                key=f"r1_stream_{i}_name",
            )
        with col_pwd:
            stream_password = st.text_input(
                "Stream Password",
                value=saved_password,
                type="password",
                disabled=is_streaming,
                key=f"r1_stream_{i}_password",
                help="Optional: Set a broadcast code for this stream",
            )

        # Row 2: program info | language code
        col_prog, col_lang_code = st.columns([2, 1])
        with col_prog:
            program_info = st.text_input(
                "Program Info",
                value=saved_program_info,
                disabled=is_streaming,
                key=f"r1_stream_{i}_program",
            )
        with col_lang_code:
            language = st.text_input(
                "Language",
                value=saved_language,
                disabled=is_streaming,
                key=f"r1_stream_{i}_lang",
                help="ISO 639-3 language code",
            )

        # Row 3: input device
        col_device = st.columns([1])[0]
        with col_device:
            # The last selection is mirrored into session state under this
            # key so it can still be shown when the picker is locked.
            device_session_key = f"r1_stream_{i}_device_saved"

            if input_options and not is_streaming:
                # Default: session-state value first, then saved settings;
                # translate the internal device name to its display label.
                default_input_name = st.session_state.get(device_session_key, saved_input_device)
                default_input_label = next(
                    (lbl for lbl, dev in option_name_map.items() if dev == default_input_name),
                    None,
                )
                if default_input_label not in input_options:
                    default_input_label = input_options[0]

                selected_option = st.selectbox(
                    "Input Device",
                    input_options,
                    index=input_options.index(default_input_label),
                    disabled=is_streaming,
                    key=f"r1_stream_{i}_device",
                )
                input_device = option_name_map.get(selected_option)
                st.session_state[device_session_key] = input_device
            else:
                # Streaming, or no devices to choose from: show a locked box.
                # NOTE(review): when nothing is saved, the placeholder text
                # 'No device' also becomes this stream's input_device value
                # — confirm downstream code expects that.
                current_device = st.session_state.get(
                    device_session_key, saved_input_device or 'No device'
                )
                display_label = next(
                    (lbl for lbl, dev in option_name_map.items() if dev == current_device),
                    current_device,
                )
                st.selectbox(
                    "Input Device",
                    [display_label or 'No device'],
                    index=0,
                    disabled=True,
                    key=f"r1_stream_{i}_device_disabled",
                )
                input_device = current_device

        r1_streams.append(dict(
            name=stream_name,
            program_info=program_info,
            language=language,
            input_device=input_device,
            stream_password=stream_password,
        ))
|
||
|
||
# --- Radio 2 Section ---
|
||
with st.container(border=True):
|
||
st.subheader("Radio 2")
|
||
|
||
# Radio 2 on/off. Stereo mode forces Radio 2 off; otherwise the user
# chooses via a checkbox.
secondary_settings = saved_settings.get('secondary', {})
if not dante_stereo_enabled:
    # Default to enabled when the secondary radio is currently streaming or
    # has saved settings (a saved rate or channel names).
    _r2_has_saved = (
        secondary_settings.get('auracast_sampling_rate_hz')
        or secondary_settings.get('channel_names')
    )
    radio2_enabled_default = True if _r2_has_saved else secondary_is_streaming
    radio2_enabled = st.checkbox(
        label="Enable Radio 2",
        value=radio2_enabled_default,
        disabled=is_streaming,
        help="Activate a second Dante radio with its own quality and timing settings.",
    )
else:
    st.info("🎧 Radio 2 is automatically disabled in stereo mode")
    radio2_enabled = False
|
||
|
||
if radio2_enabled:
|
||
# Stream layout for Radio 2 (same choices as Radio 1).
r2_stream_options = r1_stream_options

# Guess the previously used layout from the saved secondary sampling rate.
# A saved 48kHz rate is disambiguated by the number of saved channel names.
saved_rate2 = secondary_settings.get('auracast_sampling_rate_hz')
saved_r2_streams = '1 × 48kHz'  # default
if saved_rate2 == 48000:
    channel_names2 = secondary_settings.get('channel_names', [])
    _r2_saved_count = len(channel_names2)
    if _r2_saved_count == 2:
        saved_r2_streams = '2 × 24kHz'
    elif _r2_saved_count == 3:
        saved_r2_streams = '3 × 16kHz'
elif saved_rate2 == 24000:
    saved_r2_streams = '2 × 24kHz'
elif saved_rate2 == 16000:
    saved_r2_streams = '3 × 16kHz'

try:
    default_r2_idx = r2_stream_options.index(saved_r2_streams)
except ValueError:
    default_r2_idx = 0

r2_stream_config = st.selectbox(
    label="Stream Configuration (Radio 2)",
    options=r2_stream_options,
    index=default_r2_idx,
    disabled=is_streaming,
    help="Select the number and quality of streams for Radio 2",
)
_r2_layout = dante_stream_options[r2_stream_config]
r2_num_streams = _r2_layout["streams"]
r2_quality = _r2_layout["quality"]
|
||
|
||
# Stream quality for Radio 2. The chosen layout caps the quality; only
# qualities allowed under that cap are offered.
r2_max_quality = r2_quality
_r2_allowed_by_cap = {
    "High (48kHz)": ("High (48kHz)", "Good (32kHz)", "Medium (24kHz)", "Fair (16kHz)"),
    "Medium (24kHz)": ("Medium (24kHz)", "Fair (16kHz)"),
    "Fair (16kHz)": ("Fair (16kHz)",),
}.get(r2_max_quality, ())
r2_available_qualities = []
for quality in ["High (48kHz)", "Good (32kHz)", "Medium (24kHz)", "Fair (16kHz)"]:
    if quality in _r2_allowed_by_cap:
        r2_available_qualities.append(quality)

# Preselect the quality matching the saved secondary sampling rate; if that
# rate is unknown or exceeds the cap, preselect the cap itself.
saved_r2_quality = {
    48000: "High (48kHz)",
    32000: "Good (32kHz)",
    24000: "Medium (24kHz)",
    16000: "Fair (16kHz)",
}.get(saved_rate2, r2_max_quality)
if saved_r2_quality not in r2_available_qualities:
    saved_r2_quality = r2_max_quality

r2_radio_quality = st.selectbox(
    label="Stream Quality (Radio 2)",
    options=r2_available_qualities,
    index=r2_available_qualities.index(saved_r2_quality),
    disabled=is_streaming,
    help=f"Select stream quality for Radio 2. Maximum quality based on configuration: {r2_max_quality}",
)
|
||
|
||
# Radio-level settings for Radio 2, laid out in one row:
# assistive listening | immediate rendering | presentation delay | QoS,
# followed by the TX power selector. Defaults come from 'secondary'.

# Saved delay is in microseconds; the widget uses milliseconds (10..200).
default_pdelay = int(secondary_settings.get('presentation_delay_us', 40000) or 40000)
default_pdelay_ms = max(10, min(200, default_pdelay // 1000))

# Unknown saved QoS preset -> first preset.
qos_options = list(QOS_PRESET_MAP.keys())
saved_qos = secondary_settings.get('qos_preset', 'Fast')
try:
    default_qos_idx2 = qos_options.index(saved_qos)
except ValueError:
    default_qos_idx2 = 0

col_r2_flags1, col_r2_flags2, col_r2_pdelay, col_r2_qos = st.columns(
    [1, 1, 0.7, 0.6], gap="small"
)

with col_r2_flags1:
    r2_assisted_listening = st.checkbox(
        label="Assistive (R2)",
        value=bool(secondary_settings.get('assisted_listening_stream', False)),
        disabled=is_streaming,
        help="Assistive listening stream",
    )

with col_r2_flags2:
    r2_immediate_rendering = st.checkbox(
        label="Immediate (R2)",
        value=bool(secondary_settings.get('immediate_rendering', False)),
        disabled=is_streaming,
        help="Ignore presentation delay",
    )

with col_r2_pdelay:
    r2_presentation_delay_ms = st.number_input(
        label="Delay (ms, R2)",
        min_value=10,
        max_value=200,
        step=5,
        value=default_pdelay_ms,
        disabled=is_streaming,
        help="Presentation delay for Radio 2",
    )

with col_r2_qos:
    r2_qos_preset = st.selectbox(
        label="QoS (R2)",
        options=qos_options,
        index=default_qos_idx2,
        disabled=is_streaming,
        help="Quality of Service preset for Radio 2",
    )

# Per-radio saved TX power wins over the value saved under 'secondary'.
r2_tx_power = _tx_power_selectbox(
    "TX Power (R2)",
    key="dante_tx_power_r2",
    default=saved_r2_config.get(
        'advertising_tx_power',
        saved_settings.get('secondary', {}).get('advertising_tx_power', TX_POWER_DEFAULT),
    ),
    disabled=is_streaming,
)
|
||
|
||
# Per-stream configuration for Radio 2: one expander per stream.
st.write("**Stream Configuration (Radio 2)**")
r2_streams = []

# Saved values come from the flat secondary settings structure (one list
# per field, indexed by stream position).
channel_names2 = secondary_settings.get('channel_names', [])
program_infos2 = secondary_settings.get('program_info', [])
languages2 = secondary_settings.get('languages', [])
input_devices2 = secondary_settings.get('input_devices', [])
stream_passwords2 = secondary_settings.get('stream_passwords', [])


def _r2_saved_at(values, idx, fallback):
    """Return values[idx], or fallback when idx is past the end of values."""
    return values[idx] if idx < len(values) else fallback


for i in range(r2_num_streams):
    with st.expander(f"Stream {i+1} - Radio 2", expanded=True):
        saved_name2 = _r2_saved_at(channel_names2, i, f'Dante_R2_S{i+1}')
        saved_program_info2 = _r2_saved_at(program_infos2, i, f'Dante Radio 2 Stream {i+1}')
        saved_language2 = _r2_saved_at(languages2, i, 'eng')
        saved_password2 = _r2_saved_at(stream_passwords2, i, '')
        saved_input_device2 = _r2_saved_at(input_devices2, i, None)

        # Row 1: channel name | broadcast password
        col_name, col_pwd = st.columns([2, 1])
        with col_name:
            stream_name = st.text_input(
                "Channel Name",
                value=saved_name2,
                disabled=is_streaming,
                key=f"r2_stream_{i}_name",
            )
        with col_pwd:
            stream_password = st.text_input(
                "Stream Password",
                value=saved_password2,
                type="password",
                disabled=is_streaming,
                key=f"r2_stream_{i}_password",
                help="Optional: Set a broadcast code for this stream",
            )

        # Row 2: program info | language code
        col_prog, col_lang = st.columns([2, 1])
        with col_prog:
            program_info = st.text_input(
                "Program Info",
                value=saved_program_info2,
                disabled=is_streaming,
                key=f"r2_stream_{i}_program",
            )
        with col_lang:
            language = st.text_input(
                "Language",
                value=saved_language2,
                disabled=is_streaming,
                key=f"r2_stream_{i}_lang",
                help="ISO 639-3 language code",
            )

        # Row 3: input device
        col_device = st.columns([1])[0]
        with col_device:
            # The last selection is mirrored into session state under this
            # key so it can still be shown when the picker is locked.
            device_session_key = f"r2_stream_{i}_device_saved"

            if input_options and not is_streaming:
                # Default: session-state value first, then saved settings;
                # translate the internal device name to its display label.
                default_input_name = st.session_state.get(device_session_key, saved_input_device2)
                default_input_label = next(
                    (lbl for lbl, dev in option_name_map.items() if dev == default_input_name),
                    None,
                )
                if default_input_label not in input_options:
                    default_input_label = input_options[0]

                selected_option = st.selectbox(
                    "Input Device",
                    input_options,
                    index=input_options.index(default_input_label),
                    disabled=is_streaming,
                    key=f"r2_stream_{i}_device",
                )
                input_device = option_name_map.get(selected_option)
                st.session_state[device_session_key] = input_device
            else:
                # Streaming, or no devices to choose from: show a locked box.
                # NOTE(review): when nothing is saved, the placeholder text
                # 'No device' also becomes this stream's input_device value
                # — confirm downstream code expects that.
                current_device = st.session_state.get(
                    device_session_key, saved_input_device2 or 'No device'
                )
                display_label = next(
                    (lbl for lbl, dev in option_name_map.items() if dev == current_device),
                    current_device,
                )
                st.selectbox(
                    "Input Device",
                    [display_label or 'No device'],
                    index=0,
                    disabled=True,
                    key=f"r2_stream_{i}_device_disabled",
                )
                input_device = current_device

        r2_streams.append(dict(
            name=stream_name,
            program_info=program_info,
            language=language,
            input_device=input_device,
            stream_password=stream_password,
        ))
|
||
else:
|
||
# Radio 2 disabled: no streams, and placeholder values for every r2_*
# name so later code can reference them unconditionally.
r2_streams = []
r2_stream_config = '1 × 48kHz'
r2_quality = r2_radio_quality = 'High (48kHz)'
r2_assisted_listening = r2_immediate_rendering = False
r2_presentation_delay_ms = 40
r2_qos_preset = 'Fast'
r2_tx_power = TX_POWER_DEFAULT
|
||
|
||
# Validate unique input devices for Network - Dante mode.
# A validation error is rendered in the placeholder reserved near the top
# of the page.
if audio_mode == "Network - Dante":
    is_valid, error_msg = validate_unique_input_devices(r1_streams, r2_streams)
    if not is_valid:
        validation_error_placeholder.error(error_msg)

# Offer a manual device refresh when no Dante inputs were found.
if not input_options and not is_streaming:
    st.warning("No Dante inputs found. Check asound.conf configuration.")
    if st.button("Refresh", key="refresh_dante_devices", disabled=is_streaming):
        refresh_ok = False
        try:
            r = requests.post(f"{BACKEND_URL}/refresh_audio_devices", timeout=8)
            if r.ok:
                refresh_ok = True
            else:
                st.error(f"Failed to refresh: {r.text}")
        except Exception as e:
            st.error(f"Failed to refresh devices: {e}")
        # Rerun only after a successful refresh: an immediate rerun after a
        # failure would discard the error message before it can be read.
        if refresh_ok:
            st.rerun()
|
||
|
||
# Store radio configurations for backend.
radio1_cfg = {
    'stream_config': r1_stream_config,
    'max_quality': r1_quality,  # Original max quality from stream config
    'radio_quality': r1_radio_quality,  # User-selected quality (may be lower)
    'quality': r1_radio_quality,  # Use selected quality for backend
    'streams': r1_streams,
    'assisted_listening': r1_assisted_listening,
    'immediate_rendering': r1_immediate_rendering,
    'presentation_delay_ms': r1_presentation_delay_ms,
    'qos_preset': r1_qos_preset,
    'tx_power': r1_tx_power,
    'dante_stereo_mode': dante_stereo_enabled,
    'dante_stereo_left': dante_left_channel,
    'dante_stereo_right': dante_right_channel,
}

# Radio 2 has a configuration only when enabled; otherwise it is None.
# The dict is built solely in the enabled case, so no per-field
# "disabled" fallbacks are needed (the former ones were unreachable, and
# one of them put a microsecond-sized 40000 into a millisecond field).
if radio2_enabled:
    radio2_cfg = {
        'stream_config': r2_stream_config,
        'max_quality': r2_quality,  # Original max quality from stream config
        'radio_quality': r2_radio_quality,  # User-selected quality
        'quality': r2_radio_quality,  # Use selected quality for backend
        'streams': r2_streams,
        'assisted_listening': r2_assisted_listening,
        'immediate_rendering': r2_immediate_rendering,
        'presentation_delay_ms': r2_presentation_delay_ms,
        'qos_preset': r2_qos_preset,
        'tx_power': r2_tx_power,
    }
else:
    radio2_cfg = None
|
||
|
||
if audio_mode in ("USB", "Network"):
|
||
# USB/Network: single set of controls shared with the single channel.
# Saved values are used only if they were saved in USB or Network mode;
# otherwise generic defaults apply.

# Build the option list first: the defaults branch below reads it, and
# this mode must not depend on another mode's branch having defined it.
quality_options = list(QUALITY_MAP.keys())

saved_audio_mode = saved_settings.get('audio_mode')
if saved_audio_mode in ("USB", "Network"):
    # Map saved sampling rate to quality label (unknown rate -> Medium).
    saved_rate = saved_settings.get('auracast_sampling_rate_hz')
    if saved_rate == 48000:
        default_quality = "High (48kHz)"
    elif saved_rate == 32000:
        default_quality = "Good (32kHz)"
    elif saved_rate == 24000:
        default_quality = "Medium (24kHz)"
    elif saved_rate == 16000:
        default_quality = "Fair (16kHz)"
    else:
        default_quality = "Medium (24kHz)"
    saved_pwd = saved_settings.get('stream_password', '')
else:
    # Use defaults when switching from another mode.
    default_quality = "Medium (24kHz)" if "Medium (24kHz)" in quality_options else quality_options[0]
    saved_pwd = ''

# An unknown label falls back to the first offered option.
if default_quality not in quality_options:
    default_quality = quality_options[0]
quality = st.selectbox(
    "Stream Quality (Sampling Rate)",
    quality_options,
    index=quality_options.index(default_quality),
    disabled=is_streaming,
    help="Select the audio sampling rate for the stream. Lower rates may improve compatibility."
)
|
||
|
||
stream_passwort = st.text_input(
|
||
"Stream Passwort",
|
||
value=saved_pwd,
|
||
type="password",
|
||
disabled=is_streaming,
|
||
help="Optional: Set a broadcast code to protect your stream. Leave empty for an open (uncoded) broadcast."
|
||
)
|
||
|
||
col_flags1, col_flags2, col_pdelay, col_qos = st.columns([1, 1, 0.7, 0.6], gap="small")
|
||
with col_flags1:
|
||
assisted_listening = st.checkbox(
|
||
"Assistive listening",
|
||
value=bool(saved_settings.get('assisted_listening_stream', False)),
|
||
disabled=is_streaming,
|
||
help="tells the receiver that this is an assistive listening stream"
|
||
)
|
||
with col_flags2:
|
||
immediate_rendering = st.checkbox(
|
||
"Immediate rendering",
|
||
value=bool(saved_settings.get('immediate_rendering', False)),
|
||
disabled=is_streaming,
|
||
help="tells the receiver to ignore presentation delay and render immediately if possible."
|
||
)
|
||
default_pdelay = int(saved_settings.get('presentation_delay_us', 40000) or 40000)
|
||
with col_pdelay:
|
||
default_pdelay_ms = max(10, min(200, default_pdelay // 1000))
|
||
presentation_delay_ms = st.number_input(
|
||
"Delay (ms)",
|
||
min_value=10, max_value=200, step=5, value=default_pdelay_ms,
|
||
disabled=is_streaming,
|
||
help="Delay between capture and presentation for receivers."
|
||
)
|
||
with col_qos:
|
||
qos_options = list(QOS_PRESET_MAP.keys())
|
||
saved_qos = saved_settings.get('qos_preset', 'Fast')
|
||
default_qos_idx = qos_options.index(saved_qos) if saved_qos in qos_options else 0
|
||
qos_preset = st.selectbox(
|
||
"QoS", options=qos_options, index=default_qos_idx,
|
||
disabled=is_streaming,
|
||
help="Fast: 2 retransmissions, lower latency. Robust: 4 retransmissions, better reliability."
|
||
)
|
||
|
||
# Advertising TX power for the single USB/Network radio; the saved value is
# used when present, otherwise the module default.
tx_power = _tx_power_selectbox(
    "TX Power",
    key="usb_tx_power",
    default=saved_settings.get('advertising_tx_power', TX_POWER_DEFAULT),
    disabled=is_streaming,
)

# Broadcast metadata shown to receivers. All three inputs are locked while a
# stream is running.
stream_name = st.text_input(
    label="Channel Name",
    value=default_name,
    help="The primary name for your broadcast. Like the SSID of a WLAN, it identifies your stream for receivers.",
    disabled=is_streaming,
)
program_info = st.text_input(
    label="Program Info",
    value=default_program_info,
    help="Additional details about the broadcast program, such as its content or purpose. Shown to receivers for more context.",
    disabled=is_streaming,
)
language = st.text_input(
    label="Language (ISO 639-3)",
    value=default_lang,
    help="Three-letter language code (e.g., 'eng' for English, 'deu' for German). Used by receivers to display the language of the stream. See: https://en.wikipedia.org/wiki/List_of_ISO_639-3_codes",
    disabled=is_streaming,
)
|
||
|
||
if audio_mode in ("USB", "Network"):
    if not is_streaming:

        def _refresh_audio_devices() -> None:
            """Ask the backend to rescan audio devices.

            Reruns the script only when the backend accepted the request, so
            that a failure message stays visible instead of being wiped by an
            immediate rerun.
            """
            try:
                r = requests.post(f"{BACKEND_URL}/refresh_audio_devices", timeout=8)
            except Exception as e:
                st.error(f"Failed to refresh devices: {e}")
                return
            if not r.ok:
                st.error(f"Failed to refresh: {r.text}")
                return
            st.rerun()

        # Fetch the inputs for the active mode. A timeout keeps an
        # unresponsive backend from blocking the whole page.
        try:
            if audio_mode == "USB":
                endpoint = "/audio_inputs_pw_usb"
            elif audio_mode == "Network":
                endpoint = "/audio_inputs_pw_network"

            resp = requests.get(f"{BACKEND_URL}{endpoint}", timeout=8)
            device_list = resp.json().get('inputs', [])
        except Exception as e:
            st.error(f"Failed to fetch devices: {e}")
            device_list = []

        if audio_mode == "USB":
            # Entries named 'ch1'/'ch2' are not offered as USB inputs.
            device_list = [d for d in device_list if d.get('name') not in ('ch1', 'ch2')]

        # Labels show "name [id]"; the backend is later given the plain name.
        input_options = [f"{d['name']} [{d['id']}]" for d in device_list]
        option_name_map = {f"{d['name']} [{d['id']}]": d['name'] for d in device_list}
        device_names = [d['name'] for d in device_list]

        default_input_name = saved_settings.get('input_device')
        # If saved device isn't in current mode's device list, use first available
        if default_input_name not in device_names and device_names:
            default_input_name = device_names[0]
        default_input_label = next(
            (label for label, name in option_name_map.items() if name == default_input_name),
            None,
        )

        if not input_options:
            warn_text = (
                "No USB audio input devices found. Connect a USB input and click Refresh."
                if audio_mode == "USB" else
                "No AES67/Network inputs found."
            )
            st.warning(warn_text)
            refresh_key = "refresh_usb" if audio_mode == "USB" else "refresh_aes67"
            if st.button("Refresh", key=refresh_key, disabled=is_streaming):
                _refresh_audio_devices()
            input_device = None
        else:
            col1, col2 = st.columns([3, 1], vertical_alignment="bottom")
            with col1:
                selected_option = st.selectbox(
                    "Input Device",
                    input_options,
                    index=input_options.index(default_input_label) if default_input_label in input_options else 0
                )
            with col2:
                refresh_key = "refresh_inputs" if audio_mode == "USB" else "refresh_inputs_net"
                if st.button("Refresh", key=refresh_key, disabled=is_streaming):
                    _refresh_audio_devices()
            input_device = option_name_map.get(selected_option)
    else:
        # While streaming the device cannot be changed: show the saved one in
        # a disabled, single-entry selectbox.
        input_device = saved_settings.get('input_device')
        current_label = input_device or "No device selected"
        st.selectbox(
            "Input Device",
            [current_label],
            index=0,
            disabled=True,
            help="Stop the stream to change the input device."
        )
else:
    input_device = None
|
||
|
||
if stop_stream:
    # Mark the stream as stopped locally before asking the backend.
    st.session_state['stream_started'] = False
    try:
        # TextCast has its own stop endpoint; every other mode stops audio.
        stop_endpoint = "/stop_textcast" if audio_mode == "TextCast" else "/stop_audio"
        r = requests.post(f"{BACKEND_URL}{stop_endpoint}").json()
        if audio_mode == "Demo":
            st.session_state['demo_stream_started'] = False
        # Only flag a stop when the backend reports something was running.
        if r.get('was_running'):
            is_stopped = True
    except Exception as e:
        st.error(f"Error: {e}")
|
||
|
||
|
||
if start_stream:
|
||
if audio_mode == "TextCast":
    # TextCast needs a previously uploaded DCP XML document in session state.
    uploaded = st.session_state.get('_textcast_dcp_content')
    if uploaded:
        try:
            # Step 1: hand the XML to the backend.
            ru = requests.post(f"{BACKEND_URL}/upload_dcp", json={"xml": uploaded})
            if ru.ok:
                # Step 2: start TextCast only after a successful upload.
                rs = requests.post(f"{BACKEND_URL}/start_textcast")
                if not rs.ok:
                    st.error(f"Start failed: {rs.text}")
                else:
                    st.success("TextCast started.")
                    st.rerun()
            else:
                st.error(f"Upload failed: {ru.text}")
        except Exception as e:
            st.error(f"Error: {e}")
    else:
        st.error("Upload a DCP XML file first.")
|
||
|
||
else:
|
||
|
||
# Always send stop to ensure backend is in a clean state, regardless of current status.
# This is best-effort: a failure is reported like the other backend calls
# here instead of aborting the script with a traceback, and a timeout keeps
# an unresponsive backend from blocking the page.
try:
    r = requests.post(f"{BACKEND_URL}/stop_audio", timeout=10).json()
except Exception as e:
    st.error(f"Error: {e}")
    r = {}
# Small pause lets backend fully release audio devices before re-init
time.sleep(1)
|
||
|
||
if audio_mode == "Demo":
    demo_cfg = demo_stream_map[demo_selected]
    q = QUALITY_MAP[demo_cfg['quality']]

    # Config class / language tag pairs; streams cycle through them in order.
    lang_cfgs = [
        (auracast_config.AuracastBigConfigDeu, 'de'),
        (auracast_config.AuracastBigConfigEng, 'en'),
        (auracast_config.AuracastBigConfigFra, 'fr'),
        (auracast_config.AuracastBigConfigSpa, 'es'),
        (auracast_config.AuracastBigConfigIta, 'it'),
        (auracast_config.AuracastBigConfigPol, 'pl'),
    ]

    bigs1 = []
    for stream_idx in range(demo_cfg['streams']):
        cfg_cls, lang = lang_cfgs[stream_idx % len(lang_cfgs)]
        use_test_tone = demo_content == "1 kHz test tone"
        if not use_test_tone:
            # Spoken demo content: one pre-encoded file per language and rate.
            source_file = f'../testdata/wave_particle_5min_{lang}_{int(q["rate"]/1000)}kHz_mono.lc3'
            big_kwargs = {}
        else:
            # Test tone: same file for every stream, with fixed name/info.
            source_file = f'../testdata/test_tone_1k_{int(q["rate"]/1000)}kHz_mono.lc3'
            big_kwargs = {
                'name': 'test tone',
                'program_info': '1khz',
            }
        bigs1.append(cfg_cls(
            code=(stream_passwort.strip() or None),
            audio_source=f'file:{source_file}',
            iso_que_len=32,
            sampling_frequency=q['rate'],
            octets_per_frame=q['octets'],
            **big_kwargs,
        ))

    # Streams beyond the per-multicaster limit for this rate spill over to a
    # second multicaster.
    max_per_mc = {48000: 1, 24000: 2, 16000: 3}
    max_streams = max_per_mc.get(q['rate'], 3)
    bigs2 = bigs1[max_streams:]
    bigs1 = bigs1[:max_streams]

    # Settings shared by both multicaster groups.
    group_common = dict(
        auracast_sampling_rate_hz=q['rate'],
        octets_per_frame=q['octets'],
        transport='',
        assisted_listening_stream=assisted_listening,
        immediate_rendering=immediate_rendering,
        presentation_delay_us=int(presentation_delay_ms * 1000),
        qos_config=QOS_PRESET_MAP[qos_preset],
    )
    config1 = auracast_config.AuracastConfigGroup(
        **group_common,
        advertising_tx_power=tx_power_r1,
        bigs=bigs1
    )
    config2 = None
    if bigs2:
        config2 = auracast_config.AuracastConfigGroup(
            **group_common,
            advertising_tx_power=tx_power_r2,
            bigs=bigs2
        )

    is_started = False
    try:
        r1 = requests.post(f"{BACKEND_URL}/init", json=config1.model_dump())
        first_ok = r1.status_code == 200
        st.session_state['demo_stream_started'] = first_ok
        if first_ok:
            is_started = True
        else:
            st.error(f"Failed to initialize multicaster 1: {r1.text}")
        if config2:
            r2 = requests.post(f"{BACKEND_URL}/init2", json=config2.model_dump())
            if r2.status_code != 200:
                st.error(f"Failed to initialize multicaster 2: {r2.text}")
            else:
                is_started = True
    except Exception as e:
        st.session_state['demo_stream_started'] = False
        st.error(f"Error: {e}")
|
||
|
||
if audio_mode == "Analog":
|
||
# Build separate configs per radio, each with its own quality and QoS parameters.
|
||
is_started = False
|
||
|
||
def _build_group_from_radio(cfg: dict) -> auracast_config.AuracastConfigGroup | None:
    """Turn one analog radio's settings dict into a backend config group.

    Returns None when ``cfg`` is empty/None or carries no ``input_device``;
    otherwise returns a group containing a single BIG for that device.
    """
    if not (cfg and cfg.get('input_device')):
        return None

    quality_params = QUALITY_MAP[cfg['quality']]
    sample_rate = quality_params['rate']
    frame_octets = quality_params['octets']

    # Stereo is opt-in via 'stereo_mode'; it selects both the capture channel
    # count and the number of BISes.
    channel_count = 2 if cfg.get('stereo_mode', False) else 1

    return auracast_config.AuracastConfigGroup(
        auracast_sampling_rate_hz=sample_rate,
        octets_per_frame=frame_octets,
        transport='',  # is set in backend
        assisted_listening_stream=bool(cfg['assisted_listening']),
        immediate_rendering=bool(cfg['immediate_rendering']),
        presentation_delay_us=int(cfg['presentation_delay_ms'] * 1000),
        qos_config=QOS_PRESET_MAP[cfg['qos_preset']],
        advertising_tx_power=int(cfg.get('tx_power', TX_POWER_DEFAULT)),
        analog_gain_db_left=cfg.get('analog_gain_db_left', 0.0),
        analog_gain_db_right=cfg.get('analog_gain_db_right', 0.0),
        bigs=[
            auracast_config.AuracastBigConfig(
                # Empty or whitespace-only password means an open broadcast.
                code=((cfg['stream_passwort'] or '').strip() or None),
                name=cfg['name'],
                program_info=cfg['program_info'],
                language=cfg['language'],
                audio_source=f"device:{cfg['input_device']}",
                input_format=f"int16le,{sample_rate},{channel_count}",
                iso_que_len=1,
                sampling_frequency=sample_rate,
                octets_per_frame=frame_octets,
                num_bis=channel_count,  # 1=mono, 2=stereo
            )
        ],
    )
|
||
|
||
# Radio 1 is always attempted; Radio 2 only when it is enabled in the UI.
config1 = _build_group_from_radio(radio1_cfg)
config2 = None
if radio2_enabled:
    config2 = _build_group_from_radio(radio2_cfg)

try:
    if config1 is None:
        st.error("Radio 1 has no valid input device configured.")
    else:
        r1 = requests.post(f"{BACKEND_URL}/init", json=config1.model_dump())
        if r1.status_code != 200:
            st.error(f"Failed to initialize Radio 1: {r1.text}")
        else:
            is_started = True

    # Radio 2 is sent regardless of Radio 1's outcome; its success does not
    # affect is_started.
    if config2 is not None:
        r2 = requests.post(f"{BACKEND_URL}/init2", json=config2.model_dump())
        if r2.status_code != 200:
            st.error(f"Failed to initialize Radio 2: {r2.text}")
except Exception as e:
    st.error(f"Error while starting Analog radios: {e}")
|
||
elif audio_mode == "Network - Dante":
|
||
# Build multi-stream configs for Dante radios
|
||
is_started = False
|
||
|
||
def _build_dante_radio_config(radio_cfg: dict, radio_id: int) -> auracast_config.AuracastConfigGroup | None:
    """Build the backend config group for one Dante radio.

    Args:
        radio_cfg: Radio settings dict; must contain 'quality', 'streams' and
            the group-level flags read below.
        radio_id: Radio number. Currently unused; kept for the call signature.

    Returns:
        A config group with one BIG per stream that has an input device, or
        None when the radio has no config, no streams, or no usable stream.
    """
    if not radio_cfg or not radio_cfg.get('streams'):
        return None

    q = QUALITY_MAP[radio_cfg['quality']]

    # Radio-wide stereo switch; individual streams can also be stereo by
    # using a 'dante_stereo_*' device.
    is_stereo_mode = bool(radio_cfg.get('dante_stereo_mode', False))

    bigs = []
    for stream in radio_cfg['streams']:
        input_device = stream.get('input_device')
        if not input_device:
            # Streams without a selected device are skipped.
            continue

        stream_is_stereo = is_stereo_mode or input_device.startswith('dante_stereo_')
        # Capture channel count and BIS count always match: 2 for stereo.
        channel_count = 2 if stream_is_stereo else 1

        bigs.append(auracast_config.AuracastBigConfig(
            # Tolerate a stored None as well as a missing key; an empty or
            # whitespace-only password means an open broadcast.
            code=((stream.get('stream_password') or '').strip() or None),
            name=stream['name'],
            program_info=stream['program_info'],
            language=stream['language'],
            audio_source=f"device:{input_device}",
            input_format=f"int16le,{q['rate']},{channel_count}",
            iso_que_len=1,
            sampling_frequency=q['rate'],
            octets_per_frame=q['octets'],
            num_bis=channel_count,
        ))

    if not bigs:
        return None

    return auracast_config.AuracastConfigGroup(
        auracast_sampling_rate_hz=q['rate'],
        octets_per_frame=q['octets'],
        transport='',  # is set in backend
        assisted_listening_stream=bool(radio_cfg['assisted_listening']),
        immediate_rendering=bool(radio_cfg['immediate_rendering']),
        presentation_delay_us=int(radio_cfg['presentation_delay_ms'] * 1000),
        qos_config=QOS_PRESET_MAP[radio_cfg['qos_preset']],
        advertising_tx_power=int(radio_cfg.get('tx_power', TX_POWER_DEFAULT)),
        bigs=bigs
    )
|
||
|
||
# Radio 1 is always attempted; Radio 2 only when a config dict exists for it.
config1 = _build_dante_radio_config(radio1_cfg, 1)
config2 = None
if radio2_cfg:
    config2 = _build_dante_radio_config(radio2_cfg, 2)

try:
    if config1 is None:
        st.error("Dante Radio 1 has no valid input devices configured.")
    else:
        r1 = requests.post(f"{BACKEND_URL}/init", json=config1.model_dump())
        if r1.status_code != 200:
            st.error(f"Failed to initialize Dante Radio 1: {r1.text}")
        else:
            is_started = True

    # Radio 2 is sent regardless of Radio 1's outcome; its success does not
    # affect is_started.
    if config2 is not None:
        r2 = requests.post(f"{BACKEND_URL}/init2", json=config2.model_dump())
        if r2.status_code != 200:
            st.error(f"Failed to initialize Dante Radio 2: {r2.text}")
except Exception as e:
    st.error(f"Error while starting Dante radios: {e}")
|
||
if audio_mode not in ("Demo", "Analog", "Network - Dante"):
    # USB/Network: one group with a single mono BIG, built from the shared
    # controls.
    q = QUALITY_MAP[quality]
    config = auracast_config.AuracastConfigGroup(
        auracast_sampling_rate_hz=q['rate'],
        octets_per_frame=q['octets'],
        transport='',  # is set in backend
        assisted_listening_stream=assisted_listening,
        immediate_rendering=immediate_rendering,
        presentation_delay_us=int(presentation_delay_ms * 1000),
        qos_config=QOS_PRESET_MAP[qos_preset],
        advertising_tx_power=tx_power,
        bigs=[
            auracast_config.AuracastBigConfig(
                # Empty or whitespace-only password means an open broadcast.
                code=(stream_passwort.strip() or None),
                name=stream_name,
                program_info=program_info,
                language=language,
                audio_source=f"device:{input_device}",
                input_format=f"int16le,{q['rate']},1",
                iso_que_len=1,
                sampling_frequency=q['rate'],
                octets_per_frame=q['octets'],
            ),
        ],
    )

    try:
        r = requests.post(f"{BACKEND_URL}/init", json=config.model_dump())
        if r.status_code != 200:
            st.error(f"Failed to initialize: {r.text}")
        else:
            is_started = True
    except Exception as e:
        st.error(f"Error: {e}")
|
||
|
||
# Centralized rerun based on start/stop outcomes
|
||
if is_started or is_stopped:
|
||
st.rerun()
|
||
#else:
|
||
#st.header("Advertised Streams (Cloud Announcements)")
|
||
#st.info("This feature requires backend support to list advertised streams.")
|
||
# st.info("This feature requires backend support to list advertised streams.")
|
||
# Placeholder for future implementation
|
||
# Example: r = requests.get(f"{BACKEND_URL}/advertised_streams")
|
||
# if r.status_code == 200:
|
||
# streams = r.json()
|
||
# for s in streams:
|
||
# st.write(s)
|
||
# else:
|
||
# st.error("Could not fetch advertised streams.")
|
||
|
||
############################
|
||
# System expander (collapsed)
|
||
############################
|
||
with st.expander("System control", expanded=False):
|
||
|
||
st.subheader("Status LED")
led_enabled_current = bool(saved_settings.get("led_enabled", True))
led_enabled = st.checkbox(
    "Blue LED on while transmitting",
    value=led_enabled_current,
    help="When enabled, the blue LED on GPIO pin 12 lights up while the stream is active."
)
if led_enabled != led_enabled_current:
    # Push the change to the backend. Rerun only when it was accepted: an
    # unconditional rerun would wipe the error message and, while the backend
    # keeps failing, re-post on every rerun.
    try:
        led_resp = requests.post(f"{BACKEND_URL}/set_led_enabled", json={"led_enabled": led_enabled}, timeout=2)
    except Exception as e:
        st.error(f"Failed to update LED setting: {e}")
    else:
        if led_resp.ok:
            st.rerun()
        else:
            st.error(f"Failed to update LED setting: {led_resp.status_code} {led_resp.text}")
|
||
|
||
st.subheader("System temperatures")
temp_col1, temp_col2, temp_col3 = st.columns([1, 1, 1])
with temp_col1:
    # Clicking the button triggers a script rerun, which re-reads the sensors.
    refresh_temps = st.button("Refresh", key="refresh_temps_1")
try:
    # Read both sensors first so that a failure shows one warning only.
    case_temp = read_case_temp()
    cpu_temp = read_cpu_temp()
    readings = (
        (temp_col2, f"CPU: {cpu_temp} °C"),
        (temp_col3, f"Case: {case_temp} °C"),
    )
    for temp_column, temp_text in readings:
        with temp_column:
            st.write(temp_text)
except Exception as e:
    st.warning(f"Could not read temperatures: {e}")
|
||
|
||
st.subheader("Network Information")
try:
    import re
    import socket
    import subprocess

    device_hostname = socket.gethostname()
    st.write(f"Hostname: **{device_hostname}**")

    network_info_resp = requests.get(f"{BACKEND_URL}/network_info", timeout=5)
    if network_info_resp.status_code == 200:
        network_data = network_info_resp.json()
        interfaces = network_data.get("interfaces", {})
        port_mapping = network_data.get("port_mapping", {})

        # Dotted-quad shape check, compiled once for both ports. ASCII digits
        # only, used with fullmatch so a trailing newline is not accepted.
        ipv4_pattern = re.compile(r'(?:[0-9]{1,3}\.){3}[0-9]{1,3}')

        for port_name in ["port1", "port2"]:
            if port_name not in port_mapping:
                continue

            interface_name = port_mapping[port_name]
            interface_data = interfaces.get(interface_name, {})

            port_label = "Port 1" if port_name == "port1" else "Port 2"
            st.markdown(f"### {port_label}")

            ip_address = interface_data.get("ip_address", "N/A")
            is_dhcp = interface_data.get("is_dhcp", True)

            st.write(f"Interface: **{interface_name}**")
            st.write(f"IP Address: **{ip_address}**")

            col1, col2 = st.columns([1, 3])
            with col1:
                toggle_key = f"{port_name}_dhcp_toggle"
                current_mode = "DHCP" if is_dhcp else "Static IP"
                new_mode = st.radio(
                    "Mode",
                    options=["DHCP", "Static IP"],
                    index=0 if is_dhcp else 1,
                    key=toggle_key,
                    horizontal=True
                )

            with col2:
                if new_mode == "Static IP":
                    ip_input_key = f"{port_name}_ip_input"
                    # Pre-fill only with an address that is already static.
                    default_ip = ip_address if ip_address != "N/A" and not is_dhcp else ""
                    new_ip = st.text_input(
                        "Static IP Address",
                        value=default_ip,
                        key=ip_input_key,
                        placeholder="192.168.1.100"
                    )

                    if st.button("Apply", key=f"{port_name}_apply_btn"):
                        # Ignore accidental surrounding whitespace.
                        new_ip = new_ip.strip()
                        if not new_ip:
                            st.error("Please enter an IP address")
                        elif not ipv4_pattern.fullmatch(new_ip):
                            st.error("Invalid IP address format")
                        elif not all(0 <= int(octet) <= 255 for octet in new_ip.split('.')):
                            st.error("IP address octets must be between 0 and 255")
                        else:
                            try:
                                config_payload = {
                                    "interface": interface_name,
                                    "is_dhcp": False,
                                    "ip_address": new_ip,
                                    "netmask": "24"
                                }
                                config_resp = requests.post(
                                    f"{BACKEND_URL}/set_network_config",
                                    json=config_payload,
                                    timeout=10
                                )
                                if config_resp.status_code == 200:
                                    st.success(f"Static IP {new_ip} applied to {interface_name}")
                                    # Pause before rerunning, which fetches
                                    # the network info again.
                                    time.sleep(2)
                                    st.rerun()
                                else:
                                    st.error(f"Failed to apply configuration: {config_resp.text}")
                            except Exception as e:
                                st.error(f"Error applying configuration: {e}")
                elif new_mode != current_mode:
                    # Switching a static interface back to DHCP.
                    if st.button("Apply DHCP", key=f"{port_name}_dhcp_apply_btn"):
                        try:
                            config_payload = {
                                "interface": interface_name,
                                "is_dhcp": True
                            }
                            config_resp = requests.post(
                                f"{BACKEND_URL}/set_network_config",
                                json=config_payload,
                                timeout=10
                            )
                            if config_resp.status_code == 200:
                                st.success(f"DHCP enabled for {interface_name}")
                                time.sleep(2)
                                st.rerun()
                            else:
                                st.error(f"Failed to apply configuration: {config_resp.text}")
                        except Exception as e:
                            st.error(f"Error applying configuration: {e}")

            st.markdown("---")
    else:
        # Backend could not report interfaces: fall back to the first
        # non-loopback IPv4 address printed by `hostname -I`.
        result = subprocess.run(["hostname", "-I"], capture_output=True, text=True, timeout=2)
        ips = [ip for ip in result.stdout.strip().split() if not ip.startswith('127.') and ':' not in ip]
        if ips:
            st.write(f"IP Address: **{ips[0]}**")
        else:
            st.warning("No valid IP address found.")
except Exception as e:
    st.warning(f"Could not determine network info: {e}")
|
||
|
||
st.subheader("CA Certificate")
st.caption("Download the CA certificate to trust this device's HTTPS connection.")
try:
    cert_resp = requests.get(f"{BACKEND_URL}/cert", timeout=2)
    if cert_resp.status_code != 200:
        st.warning("CA certificate not available.")
    else:
        # Offer the bytes returned by the backend as a PEM download.
        st.download_button(
            label="Download CA Certificate",
            data=cert_resp.content,
            file_name="ca_cert.pem",
            mime="application/x-pem-file",
        )
except Exception as e:
    st.warning(f"Could not fetch CA certificate: {e}")
|
||
|
||
st.subheader("Change password")
if is_pw_disabled():
    st.info("Frontend password protection is disabled via DISABLE_FRONTEND_PW.")
else:
    with st.form("change_pw_form"):
        cur = st.text_input("Current password", type="password")
        new1 = st.text_input("New password", type="password")
        new2 = st.text_input("Confirm new password", type="password")
        submit_change = st.form_submit_button("Change password")
    # NOTE(review): original indentation is lost in this view; the submit
    # handling is assumed to sit after the form block - confirm.
    if submit_change:
        rec = load_pw_record()
        # Checks run in order: current password, minimum length, confirmation.
        if not rec or not verify_password(cur, rec):
            pw_error = "Current password is incorrect."
        elif len(new1) < 6:
            pw_error = "New password should be at least 6 characters."
        elif new1 != new2:
            pw_error = "New passwords do not match."
        else:
            pw_error = None

        if pw_error is not None:
            st.error(pw_error)
        else:
            salt, key = hash_password(new1)
            try:
                save_pw_record(salt, key)
                st.success("Password updated.")
            except Exception as e:
                st.error(f"Failed to update password: {e}")
|
||
|
||
st.subheader("Software Version")
# Show the running version; any failure is reported as "unknown".
ver_label = None
try:
    ver_resp = requests.get(f"{BACKEND_URL}/version", timeout=2)
    if ver_resp.ok:
        ver_data = ver_resp.json()
        current_version = ver_data.get('version', 'unknown')
        ver_type = ver_data.get('type', '')
        # Anything that is not a tagged build is marked as a dev build.
        ver_label = current_version if ver_type == 'tag' else f"{current_version} (dev)"
except Exception:
    ver_label = None

if ver_label is None:
    current_version = "unknown"
    st.write("**Current version:** unknown")
else:
    st.write(f"**Current version:** {ver_label}")

# Result of the last update check, kept across reruns.
if 'available_update' not in st.session_state:
    st.session_state['available_update'] = None

col_check, col_status = st.columns([1, 2])
with col_check:
    if st.button("Check for updates"):
        try:
            check_resp = requests.get(f"{BACKEND_URL}/check_update", timeout=30)
            if not check_resp.ok:
                update_info = {'error': f"Failed: {check_resp.status_code}"}
            else:
                check_data = check_resp.json()
                # A backend-reported error is reduced to just the error text.
                if check_data.get('error'):
                    update_info = {'error': check_data['error']}
                else:
                    update_info = check_data
        except Exception as e:
            update_info = {'error': str(e)}
        st.session_state['available_update'] = update_info
        st.rerun()

with col_status:
    upd = st.session_state['available_update']
    if upd:
        if upd.get('error'):
            st.warning(f"Check failed: {upd['error']}")
        elif upd.get('update_available'):
            st.info(f"Update available: **{upd['available']}**")
        else:
            st.success("You are on the latest version.")

# Update button (only shown when the last check reported an update)
pending_update = st.session_state['available_update']
if pending_update and pending_update.get('update_available'):
    if st.button("Update now"):
        try:
            r = requests.post(f"{BACKEND_URL}/system_update", timeout=120)
            if not r.ok:
                st.error(f"Failed to update: {r.status_code} {r.text}")
            else:
                result = r.json()
                tag = result.get('tag', 'unknown')
                st.success(f"Update to {tag} initiated. The UI will restart shortly.")
                # Clear the stored check result once the update is started.
                st.session_state['available_update'] = None
        except Exception as e:
            st.error(f"Error calling update: {e}")
|
||
|
||
st.subheader("Restart DEP")
if st.button("Restart DEP"):
    try:
        r = requests.post(f"{BACKEND_URL}/restart_dep", timeout=30)
        if not r.ok:
            st.error(f"Failed to restart DEP: {r.status_code} {r.text}")
        else:
            result = r.json()
            # Prefer the backend's message, with a generic fallback.
            st.success(result.get('message', 'DEP restarted successfully.'))
    except Exception as e:
        st.error(f"Error restarting DEP: {str(e)}")

st.subheader("Reboot")
if st.button("Reboot now", type="primary"):
    try:
        # The request uses a 1 s timeout.
        r = requests.post(f"{BACKEND_URL}/system_reboot", timeout=1)
        if not r.ok:
            st.error(f"Failed to reboot: {r.status_code} {r.text}")
        else:
            st.success("Reboot initiated. The UI will become unreachable shortly.")
    except Exception as e:
        st.error(f"Error calling reboot: {e}")
|
||
|
||
############################
|
||
# Record expander (collapsed)
|
||
############################
|
||
with st.expander("Record", expanded=False):

    st.subheader("ALSA Device Recording")

    # Ask the backend for ALSA devices; any failure yields an empty list.
    try:
        resp = requests.get(f"{BACKEND_URL}/alsa_devices", timeout=2)
        alsa_devices = resp.json().get('devices', []) if resp.status_code == 200 else []
    except Exception:
        alsa_devices = []

    # Labels show "name [id]"; the backend is given the plain device name.
    device_options = []
    device_name_map = {}
    for d in alsa_devices:
        device_label = f"{d['name']} [{d['id']}]"
        device_options.append(device_label)
        device_name_map[device_label] = d['name']

    if not device_options:
        st.warning("No ALSA devices found.")
        selected_device_name = None
    else:
        selected_device = st.selectbox(
            "Select ALSA Device",
            device_options,
            help="Choose an ALSA device for recording"
        )
        selected_device_name = device_name_map.get(selected_device)

    # Recording controls
    col_record, col_download, col_delete = st.columns([1, 1, 1])

    with col_record:
        if st.button("Start Recording (5s)", disabled=not selected_device_name):
            try:
                r = requests.post(f"{BACKEND_URL}/start_recording", json={"device": selected_device_name}, timeout=15)
                if not r.ok:
                    st.error(f"Failed to start recording: {r.status_code} {r.text}")
                else:
                    result = r.json()
                    if not result.get('success'):
                        st.error(f"Recording failed: {result.get('error', 'Unknown error')}")
                    else:
                        st.success(f"Recording completed: {result.get('filename')}")
                        # Remember the file so it can be offered for download.
                        st.session_state['last_recording'] = result.get('filename')
            except Exception as e:
                st.error(f"Error starting recording: {e}")

    with col_download:
        last_recording = st.session_state.get('last_recording')
        if not last_recording:
            st.button("Download Last Recording", disabled=True, help="No recording available yet")
        else:
            try:
                # Fetch the recorded file so its bytes can back the download button.
                file_resp = requests.get(f"{BACKEND_URL}/download_recording/{last_recording}", timeout=5)
                if file_resp.status_code != 200:
                    st.warning("Recording file not available")
                else:
                    st.download_button(
                        label="Download Last Recording",
                        data=file_resp.content,
                        file_name=last_recording,
                        mime="audio/wav"
                    )
            except Exception as e:
                st.warning(f"Could not fetch recording: {e}")

    with col_delete:
        if st.button("Delete Recordings", type="secondary"):
            try:
                r = requests.delete(f"{BACKEND_URL}/delete_recordings", timeout=10)
                if not r.ok:
                    st.error(f"Failed to delete recordings: {r.status_code}")
                else:
                    result = r.json()
                    if not result.get('success'):
                        st.error("Failed to delete recordings")
                    else:
                        st.success(f"Deleted {result.get('deleted_count', 0)} recording(s)")
                        # Nothing left to download.
                        st.session_state['last_recording'] = None
            except Exception as e:
                st.error(f"Error deleting recordings: {e}")
|
||
|
||
# Logging setup. LOG_LEVEL may be a level name in any letter case ("debug",
# "INFO") or a numeric string ("10"); unset or empty means DEBUG. Passing the
# raw environment string to basicConfig would raise ValueError for anything
# but an exact upper-case level name.
_log_level = os.environ.get('LOG_LEVEL', '').strip()
if _log_level.isdigit():
    _log_level = int(_log_level)
elif _log_level:
    _log_level = _log_level.upper()
else:
    _log_level = log.DEBUG

log.basicConfig(
    level=_log_level,
    format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
)