Files
auracaster-webui/src/auracaster_webui/app.py
2025-03-17 13:47:32 +01:00

333 lines
17 KiB
Python

"""
Airport Announcement System Streamlit frontend application.
"""
import streamlit as st
# Page setup must be first
st.set_page_config(page_title="Airport Announcement System", page_icon="✈️")
import time
import requests
from multilang_translator.translator_client.translator_client import (
get_groups, get_available_languages, get_group_state,
start_announcement, update_group, get_available_endpoints
)
from multilang_translator.translator_models.translator_models import Endpoint, EndpointGroup, AnnouncementStates
# Initialize session state for configuration.
# Each value is seeded only when its key is missing, so anything already
# stored in the session is left untouched.
if "endpoint_groups" not in st.session_state:
    try:
        loaded_groups = get_groups()
    except requests.exceptions.RequestException as e:
        st.error(f"Failed to load endpoint groups: {str(e)}")
        loaded_groups = []
    st.session_state.endpoint_groups = loaded_groups

# Initialize session state for available languages
if "available_languages" not in st.session_state:
    try:
        loaded_languages = get_available_languages()
    except requests.exceptions.RequestException as e:
        st.error(f"Failed to load available languages: {str(e)}")
        loaded_languages = ["deu", "eng"]  # Fallback languages using ISO codes
    st.session_state.available_languages = loaded_languages

# Initialize session state for announcement text and status tracking
_UI_STATE_DEFAULTS = {
    # Text pre-filled into the announcement text area
    "announcement_text": "Achtung bitte! Der Flug LH-2456 nach München ist jetzt zum Einsteigen bereit am Gate B12.",
    # True while the status of a started announcement should be displayed
    "show_success_message": False,
    # Counter incremented for every started announcement
    "announcement_id": 0,
    # Suffix used to build the key of the status container
    "status_container_key": 0,
    # ID of the group the last announcement was sent to
    "selected_group_id": None,
    # time.time() value recorded when the last announcement was submitted
    "announcement_start_time": None,
}
for state_key, initial_value in _UI_STATE_DEFAULTS.items():
    if state_key not in st.session_state:
        st.session_state[state_key] = initial_value
def _progress_fraction(state_value, max_state_value):
    """Return ``state_value / max_state_value`` clamped to the range 0.0-1.0.

    A non-positive ``max_state_value`` yields 0.0 instead of dividing by zero.
    """
    if max_state_value <= 0:
        return 0.0
    return min(max(state_value / max_state_value, 0.0), 1.0)


def _elapsed_since_announcement(fallback_start):
    """Return the seconds elapsed since the announcement was submitted.

    Uses ``st.session_state.announcement_start_time`` when it holds a value.
    The key is pre-seeded with ``None``, so a plain membership test cannot
    tell whether a start time exists; ``fallback_start`` is used in that case.
    """
    started_at = st.session_state.get("announcement_start_time")
    if started_at is None:
        started_at = fallback_start
    return time.time() - started_at


def show_announcement_status(group_id: int):
    """Show the status of an announcement for a specific group.

    Looks the group up in ``st.session_state.endpoint_groups``, queries its
    state through ``get_group_state`` and, unless the group is idle, renders a
    status box with a progress bar, the current stage and the elapsed time.
    The state is polled every 0.5 s until its name is ``COMPLETED``, ``IDLE``
    or ``ERROR``, or until 60 s of tracking have passed. On ``COMPLETED`` a
    summary of the group is shown and ``show_success_message`` is cleared.

    Args:
        group_id: ID of the endpoint group whose announcement is tracked.

    Request errors raised while querying the state are reported with
    ``st.error`` instead of being propagated.
    """
    # Get the group for additional information
    group = next(
        (g for g in st.session_state.endpoint_groups if g.id == group_id), None
    )
    if not group:
        st.error(f"Group with ID {group_id} not found")
        return

    try:
        # Get the state from the API
        state_name, state_value = get_group_state(group_id)

        # Only show status if state is not IDLE
        if state_value == AnnouncementStates.IDLE.value:
            return

        # A container key that changes per announcement gives every new
        # announcement a fresh container.
        container_key = f"status_container_{st.session_state.status_container_key}"
        with st.container(key=container_key):
            status = st.status("**Airport PA System Status**", expanded=True)
            with status:
                # Progress is the state value relative to COMPLETED
                # (assumes states are numbered in ascending order up to
                # COMPLETED -- TODO confirm against AnnouncementStates).
                max_state_value = AnnouncementStates.COMPLETED.value

                # Progress elements
                progress_bar = st.progress(
                    _progress_fraction(state_value, max_state_value)
                )
                time_col, stage_col = st.columns([1, 3])

                terminal_state_names = ("COMPLETED", "IDLE", "ERROR")
                max_wait_time = 60  # Maximum wait time in seconds
                start_tracking_time = time.time()
                last_state = None  # Last (name, value) pair that was displayed

                # Update loop
                while state_name not in terminal_state_names:
                    # Check for timeout
                    if time.time() - start_tracking_time > max_wait_time:
                        st.warning(
                            f"Status tracking timed out after {max_wait_time} seconds"
                        )
                        break

                    # Only write stage, time and progress when the state changed
                    current_state = (state_name, state_value)
                    if current_state != last_state:
                        stage_col.write(f"**Stage:** {state_name}")
                        elapsed_seconds = _elapsed_since_announcement(
                            start_tracking_time
                        )
                        time_col.write(f"⏱️ {elapsed_seconds:.1f}s")
                        progress_bar.progress(
                            _progress_fraction(state_value, max_state_value)
                        )
                        last_state = current_state

                    # Add a small delay between requests to avoid hammering the API
                    time.sleep(0.5)

                    # Refresh state
                    state_name, state_value = get_group_state(group_id)

                # Final update so the last state (e.g. COMPLETED) is visible
                progress_bar.progress(
                    _progress_fraction(state_value, max_state_value)
                )
                stage_col.write(f"**Stage:** {state_name}")
                final_elapsed_seconds = _elapsed_since_announcement(
                    start_tracking_time
                )
                time_col.write(f"⏱️ {final_elapsed_seconds:.1f}s")

                # Final state
                if state_name == "COMPLETED":
                    st.success("✅ Announcement completed successfully")

                    # Display group information
                    st.write(f"📢 Announcement made to group {group.name}")

                    # Display endpoints if available
                    endpoints = group.endpoints
                    if endpoints:
                        endpoint_names = [ep.name for ep in endpoints]
                        st.write(f"📡 Endpoints: {', '.join(endpoint_names)}")

                    # Display languages if available
                    languages = group.languages
                    if languages:
                        st.write(f"🌐 Languages: {', '.join(languages)}")

                    # Clear the success message when announcement completes
                    st.session_state.show_success_message = False
    except requests.exceptions.RequestException as e:
        st.error(f"Failed to get announcement status: {str(e)}")
# Main interface
st.title("Airport Announcement System ✈️")

# Announcements section
with st.container():
    st.header("Announcements")

    # Predefined announcements: clicking a button stores its text in session
    # state, from where the text area below takes its initial content.
    st.write("**Predefined Announcements** (click to autofill)")
    predefined_announcements = (
        (
            "Letzter Aufruf",
            "Dies ist der letzte Aufruf für Flug LH-380 nach Berlin. Bitte begeben Sie sich sofort zum Gate B15.",
        ),
        (
            "Sicherheitshinweis",
            "Aus Sicherheitsgründen bitten wir Sie, Ihr Gepäck niemals unbeaufsichtigt zu lassen.",
        ),
        (
            "Verspätung",
            "Wir bedauern mitteilen zu müssen, dass sich der Flug LH-472 um 30 Minuten verspätet.",
        ),
    )
    for column, (button_label, preset_text) in zip(
        st.columns(len(predefined_announcements)), predefined_announcements
    ):
        with column:
            if st.button(button_label):
                st.session_state.announcement_text = preset_text

    # Custom announcement
    with st.form("custom_announcement"):
        # The select box works on group IDs and only *displays* the names.
        # Resolving the selection by name would always pick the first group
        # when two groups share a name.
        group_names_by_id = {g.id: g.name for g in st.session_state.endpoint_groups}
        selected_group_id = st.selectbox(
            "Select announcement area",
            # A single placeholder entry is shown when no groups exist
            options=list(group_names_by_id) or [None],
            format_func=lambda gid: group_names_by_id.get(gid, "No groups available"),
        )
        message = st.text_area("Enter announcement text", st.session_state.announcement_text)

        if st.form_submit_button("Make Announcement"):
            if not group_names_by_id:
                st.error("No endpoint groups available. Please create a group first.")
            else:
                try:
                    # Save the selected group ID for status tracking
                    st.session_state.selected_group_id = selected_group_id
                    # Store the start time in session state
                    st.session_state.announcement_start_time = time.time()
                    # Update session state with the current message
                    st.session_state.announcement_text = message

                    start_announcement(message, selected_group_id)

                    # Set flag to show the status display
                    st.session_state.show_success_message = True
                    # Increment announcement ID to ensure a fresh status container
                    st.session_state.announcement_id += 1
                    st.session_state.status_container_key = st.session_state.announcement_id
                except requests.exceptions.RequestException as e:
                    st.error(f"Failed to start announcement: {str(e)}")

# Show status of any ongoing announcements after the announcement form
if st.session_state.show_success_message and st.session_state.selected_group_id is not None:
    show_announcement_status(st.session_state.selected_group_id)
def _save_group_change(group, field_name, new_value, prev_key, prev_value, what):
    """Send one changed field of an endpoint group to the API and rerun.

    A deep copy of ``group`` with ``field_name`` set to ``new_value`` is passed
    to ``update_group``; afterwards the groups are reloaded into session state,
    ``prev_value`` is stored under ``prev_key`` and the script is rerun.
    A request error is reported with ``st.error`` (``what`` names the field in
    the message) and nothing else is changed.
    """
    updated_group = group.model_copy(deep=True)
    setattr(updated_group, field_name, new_value)
    try:
        update_group(group.id, updated_group)
        # Update the session state with the latest groups
        st.session_state.endpoint_groups = get_groups()
    except requests.exceptions.RequestException as e:
        st.error(f"Failed to update group {what}: {str(e)}")
        return
    # Remember the submitted value so it is not sent again after the rerun
    st.session_state[prev_key] = prev_value
    st.rerun()


# Configuration section in sidebar
with st.sidebar:
    st.header("Configuration")

    with st.expander("Endpoint Groups"):
        configured_groups = st.session_state.endpoint_groups

        # The list of available endpoints does not depend on the group, so it
        # is requested once instead of once per group.
        available_endpoints = []
        endpoints_load_error = None
        if configured_groups:
            try:
                available_endpoints = get_available_endpoints()
            except requests.exceptions.RequestException as e:
                endpoints_load_error = e
        # Mapping of endpoint name to Endpoint object for the multiselect
        endpoint_name_to_endpoint = {ep.name: ep for ep in available_endpoints}

        for i, group in enumerate(configured_groups):
            cols = st.columns([4, 1])
            with cols[0]:
                # Use a unique key for the text input
                input_key = f"group_name_{i}"
                prev_name_key = f"prev_{input_key}"

                # Initialize the previous value in session state if not present
                if prev_name_key not in st.session_state:
                    st.session_state[prev_name_key] = group.name

                new_name = st.text_input("Group Name", value=group.name, key=input_key)

                # Only update if the name differs from both the stored group
                # and the value that was submitted last
                if new_name != group.name and new_name != st.session_state[prev_name_key]:
                    _save_group_change(
                        group, "name", new_name, prev_name_key, new_name, "name"
                    )

            # Endpoint selection for the group
            if endpoints_load_error is not None:
                st.error(f"Failed to load endpoints: {str(endpoints_load_error)}")
            else:
                # Use a unique key for the endpoints multiselect
                endpoints_key = f"endpoints_select_{i}"
                prev_endpoints_key = f"prev_{endpoints_key}"

                # Initialize the previous value in session state if not present
                current_endpoints = [ep.id for ep in group.endpoints]
                if prev_endpoints_key not in st.session_state:
                    st.session_state[prev_endpoints_key] = current_endpoints

                selected_endpoint_names = st.multiselect(
                    "Endpoints",
                    options=list(endpoint_name_to_endpoint.keys()),
                    default=[ep.name for ep in group.endpoints],
                    key=endpoints_key,
                )

                # Convert selected names back to Endpoint objects
                selected_endpoints = [
                    endpoint_name_to_endpoint[name] for name in selected_endpoint_names
                ]
                selected_endpoint_ids = [ep.id for ep in selected_endpoints]

                # Only update if endpoints differ from both the stored group
                # and the value that was submitted last
                endpoints_changed = selected_endpoint_ids != current_endpoints
                endpoints_diff_from_prev = (
                    selected_endpoint_ids != st.session_state[prev_endpoints_key]
                )
                if endpoints_changed and endpoints_diff_from_prev:
                    _save_group_change(
                        group,
                        "endpoints",
                        selected_endpoints,
                        prev_endpoints_key,
                        selected_endpoint_ids,
                        "endpoints",
                    )

            # Language selection for the group
            try:
                # Use a unique key for the languages multiselect
                languages_key = f"languages_select_{i}"
                prev_languages_key = f"prev_{languages_key}"

                # Initialize the previous value in session state if not present
                if prev_languages_key not in st.session_state:
                    st.session_state[prev_languages_key] = group.languages

                selected_languages = st.multiselect(
                    "Languages",
                    options=st.session_state.available_languages,
                    default=group.languages,
                    key=languages_key,
                )

                # Only update if languages differ from both the stored group
                # and the value that was submitted last
                languages_changed = selected_languages != group.languages
                languages_diff_from_prev = (
                    selected_languages != st.session_state[prev_languages_key]
                )
                if languages_changed and languages_diff_from_prev:
                    _save_group_change(
                        group,
                        "languages",
                        selected_languages,
                        prev_languages_key,
                        selected_languages,
                        "languages",
                    )
            except Exception as e:
                # Deliberately broad: any failure in this section is shown as
                # a message so the remaining groups are still rendered.
                st.error(f"Failed to load languages: {str(e)}")

            # Separator for visual clarity
            st.markdown("---")