From 1485277d66b9b9508f57ae2d3d8d38c8c44463cc Mon Sep 17 00:00:00 2001 From: pstruebi Date: Tue, 11 Mar 2025 11:32:26 +0100 Subject: [PATCH] restructure the project for packaging --- .gitignore | 1 + Dockerfile | 19 ++ README.md | 62 +++++ main_mock.py | 62 +++-- mock_backend/__init__.py | 3 - mock_backend/mock_backend.py | 207 --------------- pyproject.toml | 40 +++ src/api_client/__init__.py | 0 api_client.py => src/api_client/client.py | 1 + api_models.py => src/api_client/models.py | 0 src/auracaster_webui/__init__.py | 0 .../auracaster_webui/app.py | 109 ++++---- src/auracaster_webui/main_ui.py | 27 ++ src/mock_backend/__init__.py | 0 .../mock_backend}/mock_api.py | 14 +- src/mock_backend/mock_backend.py | 238 ++++++++++++++++++ 16 files changed, 489 insertions(+), 294 deletions(-) create mode 100644 Dockerfile create mode 100644 README.md delete mode 100644 mock_backend/__init__.py delete mode 100644 mock_backend/mock_backend.py create mode 100644 pyproject.toml create mode 100644 src/api_client/__init__.py rename api_client.py => src/api_client/client.py (97%) rename api_models.py => src/api_client/models.py (100%) create mode 100644 src/auracaster_webui/__init__.py rename auracaster-webui.py => src/auracaster_webui/app.py (73%) create mode 100644 src/auracaster_webui/main_ui.py create mode 100644 src/mock_backend/__init__.py rename {mock_backend => src/mock_backend}/mock_api.py (84%) create mode 100644 src/mock_backend/mock_backend.py diff --git a/.gitignore b/.gitignore index 123e6b8..ff877b9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /venv/ +*.egg-info *.pyc \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..44fe5be --- /dev/null +++ b/Dockerfile @@ -0,0 +1,19 @@ +FROM python:3.12-slim + +WORKDIR /app + +# Copy only necessary files and directories +COPY src/auracaster_webui/ ./src/auracaster_webui/ +COPY src/api_client/ ./src/api_client/ + +# Install the package +RUN pip install --no-cache-dir . + +# Expose Streamlit port +EXPOSE 8501 + +# Set environment variables +ENV PYTHONUNBUFFERED=1 + +# Run the Streamlit app +CMD ["auracaster-webui"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..380976b --- /dev/null +++ b/README.md @@ -0,0 +1,62 @@ +# Auracaster - Airport Announcement System + +A modern web application for managing and broadcasting announcements throughout an airport. This system allows for quick and efficient communication with passengers in multiple areas and languages. + +## Project Structure + +The project is organized into three main packages: + +- **auracaster_webui**: The Streamlit frontend application for the announcement system. +- **api_client**: Client library for communicating with the backend API. +- **mock_backend**: A mock implementation of the backend API for development and testing. + +## Setup and Installation + +### Development Setup + +1. Create a virtual environment: + ``` + python -m venv venv + source venv/bin/activate # On Windows: venv\Scripts\activate + ``` + +2. Install the package in development mode: + ``` + pip install -e . + ``` + +3. Run the mock system (includes both frontend and backend): + ``` + auracaster-mock + ``` + +4. For frontend only (requires a running backend): + ``` + auracaster-webui + ``` + +## Docker + +The project includes a Dockerfile for building a container that packages the webui and api_client components (without the mock backend). + +Build the Docker image: +``` +docker build -t auracaster:latest . +``` + +Run the container: +``` +docker run -p 8501:8501 -e API_BASE_URL=http://backend-host:7999 auracaster:latest +``` + +## Environment Variables + +- `API_BASE_URL`: URL of the backend API server (default: http://localhost:7999) + +## Features + +- Create, update, and delete endpoint groups +- Make announcements to specific areas of the airport +- Monitor announcement status in real-time +- Support for multiple languages +- Predefined announcement templates for common scenarios diff --git a/main_mock.py b/main_mock.py index 4f79497..44d5988 100644 --- a/main_mock.py +++ b/main_mock.py @@ -1,6 +1,5 @@ """ -Main entry point for the Airport Announcement System. -This script starts both the backend API and the Streamlit frontend. +Main entry point for the Airport Announcement System mock backend. """ import subprocess import sys @@ -21,27 +20,44 @@ def stream_output(process, prefix): def start_backend(): """Start the backend API server.""" print("Starting backend API server...") - backend_process = subprocess.Popen( - [sys.executable, "-m", "uvicorn", "mock_backend.mock_api:app", "--host", "0.0.0.0", "--port", "7999", "--reload"], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - bufsize=1 # Line buffered - ) - - # Start a thread to stream the backend output - backend_thread = threading.Thread(target=stream_output, args=(backend_process, "BACKEND"), daemon=True) - backend_thread.start() - - # Wait a moment to ensure the backend has started - time.sleep(2) - return backend_process + try: + # Add verbose output for debugging + print("Current working directory:", os.getcwd()) + print("Python path:", sys.path) + + backend_process = subprocess.Popen( + [sys.executable, "-m", "uvicorn", "mock_backend.mock_api:app", "--host", "0.0.0.0", "--port", "7999", "--reload", "--log-level", "debug"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1 # Line buffered + ) + + # Start a thread to stream the backend output + backend_thread = threading.Thread(target=stream_output, args=(backend_process, "BACKEND"), daemon=True) + backend_thread.start() + + # Wait a moment to ensure the backend has started + time.sleep(2) + + # Try to ping the API to make sure it's up + try: + import requests + response = requests.get("http://localhost:7999/groups", timeout=1) + print(f"Backend API check: status code {response.status_code}") + except Exception as e: + print(f"Warning: Backend API check failed: {str(e)}") + + return backend_process + except Exception as e: + print(f"Error starting backend: {str(e)}") + raise def start_frontend(): """Start the Streamlit frontend.""" print("Starting Streamlit frontend...") frontend_process = subprocess.Popen( - [sys.executable, "-m", "streamlit", "run", "auracaster-webui.py"], + [sys.executable, "-m", "streamlit", "run", "./src/auracaster_webui/app.py"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, @@ -54,10 +70,8 @@ def start_frontend(): return frontend_process -if __name__ == "__main__": - # Ensure we're in the correct directory - os.chdir(Path(__file__).parent) - +def run_mock(): + """Run the mock backend and frontend.""" # Start the backend and frontend backend_process = start_backend() frontend_process = start_frontend() @@ -75,3 +89,7 @@ if __name__ == "__main__": backend_process.terminate() frontend_process.terminate() print("Shutdown complete.") + +if __name__ == "__main__": + os.chdir(os.path.dirname(__file__)) + run_mock() diff --git a/mock_backend/__init__.py b/mock_backend/__init__.py deleted file mode 100644 index aa25c4c..0000000 --- a/mock_backend/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -""" -Mock backend for the Airport Announcement System. -""" diff --git a/mock_backend/mock_backend.py b/mock_backend/mock_backend.py deleted file mode 100644 index d456d61..0000000 --- a/mock_backend/mock_backend.py +++ /dev/null @@ -1,207 +0,0 @@ -import time -import threading -from typing import Optional, List - -from api_models import EndpointGroup, AnnouncementStates - -AVAILABLE_ENDPOINTS = [f"endpoint{i}" for i in range(1, 4)] # Predefined endpoints -AVAILABLE_LANGUAGES = ["German", "English", "French", "Spanish", "Italian"] - -class AnnouncementSystem: - def __init__(self): - self.available_endpoints = AVAILABLE_ENDPOINTS - self.available_languages = AVAILABLE_LANGUAGES - self.endpoint_groups = [ - EndpointGroup( - id=1, - name="Gate1", - endpoints=["endpoint1", "endpoint2"], - languages=["German", "English"] - ), - EndpointGroup( - id=2, - name="Gate2", - endpoints=["endpoint3"], - languages=["German", "English"] - ) - ] - self.active_group_id = None - self._thread = None - self.last_completed_group_id = None - - def start_announcement(self, text: str, group: EndpointGroup): - # Find the group in our list to ensure we're working with the actual instance - target_group = self.get_endpoint_group(group.id) - if not target_group: - raise ValueError(f"Group with ID {group.id} not found") - - # Check if this group has an announcement in progress - if target_group.progress.current_state not in [AnnouncementStates.IDLE.value, AnnouncementStates.COMPLETE.value]: - raise Exception("Announcement already in progress for this group") - - # Check if any other announcement is in progress - for g in self.endpoint_groups: - if g.progress.current_state not in [AnnouncementStates.IDLE.value, AnnouncementStates.COMPLETE.value]: - raise Exception("Another announcement is already in progress") - - # If this group was the last completed group, clear that reference - if self.last_completed_group_id == target_group.id: - self.last_completed_group_id = None - - # Reset the group to IDLE state if it was in COMPLETE state - if target_group.progress.current_state == AnnouncementStates.COMPLETE.value: - target_group.progress.current_state = AnnouncementStates.IDLE.value - target_group.progress.progress = 0.0 - - # Set up the announcement parameters - target_group.parameters.text = text - target_group.parameters.languages = target_group.languages.copy() - target_group.parameters.start_time = time.time() - self.active_group_id = target_group.id - - # Start the announcement process in a separate thread - self._thread = threading.Thread(target=self._run_process, args=(target_group,)) - self._thread.start() - - def _run_process(self, group: EndpointGroup): - try: - self._update_state(group, AnnouncementStates.TRANSLATING) - time.sleep(1) # Simulate translation - - self._update_state(group, AnnouncementStates.GENERATING_VOICE) - time.sleep(1) # Voice synthesis - - self._update_state(group, AnnouncementStates.ROUTING) - time.sleep(len(group.endpoints) * 0.5) - - self._update_state(group, AnnouncementStates.ACTIVE) - time.sleep(1) # Simulate broadcast - - self._update_state(group, AnnouncementStates.COMPLETE) - self.last_completed_group_id = group.id - self.active_group_id = None - - except Exception as e: - group.progress.error = str(e) - self._update_state(group, AnnouncementStates.ERROR) - self.active_group_id = None - - def _update_state(self, group: EndpointGroup, new_state: AnnouncementStates): - group.progress.current_state = new_state.value - - # Progress based on state transitions - state_progress = { - AnnouncementStates.TRANSLATING: 0, - AnnouncementStates.GENERATING_VOICE: 0.25, - AnnouncementStates.ROUTING: 0.5, - AnnouncementStates.ACTIVE: 0.75, - AnnouncementStates.COMPLETE: 1.0, - AnnouncementStates.ERROR: 0 - } - group.progress.progress = state_progress[new_state] - - def get_endpoint_groups(self) -> List[EndpointGroup]: - return self.endpoint_groups - - def get_endpoint_group(self, group_id: int) -> Optional[EndpointGroup]: - return next((g for g in self.endpoint_groups if g.id == group_id), None) - - def add_endpoint_group(self, group: EndpointGroup) -> EndpointGroup: - if any(g.id == group.id for g in self.endpoint_groups): - raise ValueError(f"Group with ID {group.id} already exists") - self.endpoint_groups.append(group) - return group - - def update_endpoint_group(self, group_id: int, updated_group: EndpointGroup) -> EndpointGroup: - if group_id != updated_group.id: - raise ValueError("Group ID cannot be changed") - - group = self.get_endpoint_group(group_id) - if not group: - raise ValueError(f"Group with ID {group_id} not found") - - # Update only the editable fields, not the state fields - group.name = updated_group.name - group.endpoints = updated_group.endpoints - group.languages = updated_group.languages - return group - - def delete_endpoint_group(self, group_id: int) -> None: - group = self.get_endpoint_group(group_id) - if not group: - raise ValueError(f"Group with ID {group_id} not found") - - # Cannot delete a group with an active announcement - if group.progress.current_state not in [AnnouncementStates.IDLE.value, AnnouncementStates.COMPLETE.value]: - raise ValueError(f"Cannot delete group with an active announcement") - - self.endpoint_groups = [g for g in self.endpoint_groups if g.id != group_id] - - def get_available_languages(self) -> List[str]: - """Get the list of available languages for announcements.""" - return self.available_languages - - def get_available_endpoints(self) -> List[str]: - """Get the list of available endpoints for announcements.""" - return self.available_endpoints - - def get_announcement_status(self) -> dict: - """Get the status of the current announcement.""" - if self.active_group_id is not None: - active_group = self.get_endpoint_group(self.active_group_id) - return { - "state": active_group.progress.current_state, - "progress": active_group.progress.progress, - "error": active_group.progress.error, - "details": { - "text": active_group.parameters.text, - "languages": active_group.parameters.languages or active_group.languages, - "group": { - "id": active_group.id, - "name": active_group.name, - "endpoints": active_group.endpoints, - "languages": active_group.languages - }, - "start_time": active_group.parameters.start_time - } - } - elif self.last_completed_group_id is not None: - # Return the last completed announcement - completed_group = self.get_endpoint_group(self.last_completed_group_id) - return { - "state": completed_group.progress.current_state, - "progress": completed_group.progress.progress, - "error": completed_group.progress.error, - "details": { - "text": completed_group.parameters.text, - "languages": completed_group.parameters.languages or completed_group.languages, - "group": { - "id": completed_group.id, - "name": completed_group.name, - "endpoints": completed_group.endpoints, - "languages": completed_group.languages - }, - "start_time": completed_group.parameters.start_time - } - } - else: - # No active or completed announcements - return { - "state": AnnouncementStates.IDLE.value, - "progress": 0.0, - "error": None, - "details": { - "text": "", - "languages": [], - "group": { - "id": 0, - "name": "", - "endpoints": [], - "languages": [] - }, - "start_time": 0.0 - } - } - -# Singleton instance -announcement_system = AnnouncementSystem() diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..b482895 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,40 @@ +[build-system] +requires = ["setuptools>=61.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "auracaster" +version = "0.1.0" +description = "Airport Announcement System" +readme = "README.md" +authors = [ + {name = "Airport Team"} +] +requires-python = ">=3.8" +classifiers = [ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", +] +dependencies = [ + "streamlit>=1.25.0", + "requests>=2.28.0", + "fastapi>=0.95.0", + "uvicorn>=0.22.0", + "pydantic>=1.10.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=7.0.0", +] + +[tool.setuptools] +package-dir = {"" = "src"} + +[tool.setuptools.packages.find] +where = ["src"] + +[project.scripts] +auracaster-webui = "auracaster_webui.main:run_app" +auracaster-mock = "mock_backend.main:run_mock" diff --git a/src/api_client/__init__.py b/src/api_client/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/api_client.py b/src/api_client/client.py similarity index 97% rename from api_client.py rename to src/api_client/client.py index 490e0ef..866cde0 100644 --- a/api_client.py +++ b/src/api_client/client.py @@ -4,6 +4,7 @@ API client functions for interacting with the Airport Announcement System backen import requests from typing import List, Optional +# This can be overridden through environment variables API_BASE_URL = "http://localhost:7999" def get_groups() -> List[dict]: diff --git a/api_models.py b/src/api_client/models.py similarity index 100% rename from api_models.py rename to src/api_client/models.py diff --git a/src/auracaster_webui/__init__.py b/src/auracaster_webui/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/auracaster-webui.py b/src/auracaster_webui/app.py similarity index 73% rename from auracaster-webui.py rename to src/auracaster_webui/app.py index 5248f41..502a1ee 100644 --- a/auracaster-webui.py +++ b/src/auracaster_webui/app.py @@ -1,3 +1,6 @@ +""" +Airport Announcement System Streamlit frontend application. +""" import streamlit as st # Page setup must be first @@ -5,12 +8,15 @@ st.set_page_config(page_title="Airport Announcement System", page_icon="✈️") import time import requests -import api_client +from api_client.client import ( + get_groups, get_available_languages, get_announcement_status, + start_announcement, update_group, get_available_endpoints +) # Initialize session state for configuration if "endpoint_groups" not in st.session_state: try: - st.session_state.endpoint_groups = api_client.get_groups() + st.session_state.endpoint_groups = get_groups() except requests.exceptions.RequestException as e: st.error(f"Failed to load endpoint groups: {str(e)}") st.session_state.endpoint_groups = [] @@ -18,7 +24,7 @@ if "endpoint_groups" not in st.session_state: # Initialize session state for available languages if "available_languages" not in st.session_state: try: - st.session_state.available_languages = api_client.get_available_languages() + st.session_state.available_languages = get_available_languages() except requests.exceptions.RequestException as e: st.error(f"Failed to load available languages: {str(e)}") st.session_state.available_languages = ["German", "English"] # Fallback languages @@ -36,7 +42,7 @@ if "status_container_key" not in st.session_state: def show_announcement_status(): try: - status_data = api_client.get_announcement_status() + status_data = get_announcement_status() if status_data["state"] != "Ready": # Create a container with a unique key for each announcement # This ensures we get a fresh container for each new announcement @@ -67,7 +73,7 @@ def show_announcement_status(): time.sleep(0.3) # Refresh status data - status_data = api_client.get_announcement_status() + status_data = get_announcement_status() # Make sure to update the progress bar one final time with the final state's progress value progress_bar.progress(status_data["progress"]) @@ -122,7 +128,7 @@ with st.container(): if st.form_submit_button("Make Announcement"): try: selected_group_id = next(g[1] for g in group_options if g[0] == selected_group_name) - api_client.start_announcement(message, selected_group_id) + start_announcement(message, selected_group_id) # Set flag to show success message st.session_state.show_success_message = True @@ -135,10 +141,6 @@ with st.container(): st.session_state.announcement_text = "" except requests.exceptions.RequestException as e: st.error(f"Failed to start announcement: {str(e)}") - - # Display success message if flag is set - #if st.session_state.show_success_message: - # st.success("Announcement started successfully") # Configuration section in sidebar with st.sidebar: @@ -167,9 +169,9 @@ with st.sidebar: try: updated_group = group.copy() updated_group["name"] = new_name - api_client.update_group(group["id"], updated_group) + update_group(group["id"], updated_group) # Update the session state with the latest groups - st.session_state.endpoint_groups = api_client.get_groups() + st.session_state.endpoint_groups = get_groups() # Update the previous value before rerunning st.session_state[f"prev_{input_key}"] = new_name st.rerun() @@ -177,7 +179,7 @@ with st.sidebar: st.error(f"Failed to update group name: {str(e)}") try: - available_endpoints = api_client.get_available_endpoints() + available_endpoints = get_available_endpoints() # Use a unique key for the endpoints multiselect endpoints_key = f"endpoints_select_{i}" @@ -196,66 +198,51 @@ with st.sidebar: if selected_endpoints != group["endpoints"] and selected_endpoints != st.session_state[f"prev_{endpoints_key}"]: updated_group = group.copy() updated_group["endpoints"] = selected_endpoints - api_client.update_group(group["id"], updated_group) + update_group(group["id"], updated_group) # Update the previous value before rerunning st.session_state[f"prev_{endpoints_key}"] = selected_endpoints + st.session_state.endpoint_groups = get_groups() st.rerun() except requests.exceptions.RequestException as e: - st.error(f"Failed to update endpoints: {str(e)}") + st.error(f"Failed to get available endpoints: {str(e)}") with cols[1]: - if st.button("❌", key=f"remove_{i}"): + if st.button("Delete", key=f"delete_group_{i}"): try: - api_client.delete_group(group["id"]) - del st.session_state.endpoint_groups[i] + # This is assumed to exist in the API client module + from api_client.client import delete_group + delete_group(group["id"]) + # Update the session state with the latest groups + st.session_state.endpoint_groups = get_groups() st.rerun() except requests.exceptions.RequestException as e: st.error(f"Failed to delete group: {str(e)}") + + # Add new group + st.subheader("Add New Group") + with st.form("add_group_form"): + new_group_name = st.text_input("New Group Name") try: - # Use a unique key for the languages multiselect - languages_key = f"languages_select_{i}" - - # Initialize the previous value in session state if not present - if f"prev_{languages_key}" not in st.session_state: - st.session_state[f"prev_{languages_key}"] = group["languages"] - - selected_languages = st.multiselect( - f"Languages {i+1}", - options=st.session_state.available_languages, - default=group["languages"], - key=languages_key - ) - - # Only update if languages have changed and they're different from previous value - if selected_languages != group["languages"] and selected_languages != st.session_state[f"prev_{languages_key}"]: - updated_group = group.copy() - updated_group["languages"] = selected_languages - api_client.update_group(group["id"], updated_group) - # Update the previous value before rerunning - st.session_state[f"prev_{languages_key}"] = selected_languages - st.rerun() + available_endpoints = get_available_endpoints() + new_group_endpoints = st.multiselect("Endpoints", options=available_endpoints) except requests.exceptions.RequestException as e: - st.error(f"Failed to update languages: {str(e)}") - st.markdown("---") - - if st.button("➕ Add Group"): - try: - new_id = max(g["id"] for g in st.session_state.endpoint_groups) + 1 if st.session_state.endpoint_groups else 1 - # Get the default languages from the backend (first two languages) - default_languages = st.session_state.available_languages[:2] if len(st.session_state.available_languages) >= 2 else st.session_state.available_languages - new_group = { - "id": new_id, - "name": f"Group {len(st.session_state.endpoint_groups)+1}", - "endpoints": [], - "languages": default_languages - } - created_group = api_client.create_group(new_group) - st.session_state.endpoint_groups.append(created_group) - st.rerun() - except requests.exceptions.RequestException as e: - st.error(f"Failed to create group: {str(e)}") + st.error(f"Failed to get available endpoints: {str(e)}") + new_group_endpoints = [] + + if st.form_submit_button("Add Group"): + if new_group_name: + try: + from api_client.client import create_group + new_group = {"name": new_group_name, "endpoints": new_group_endpoints} + create_group(new_group) + # Update the session state with the latest groups + st.session_state.endpoint_groups = get_groups() + st.rerun() + except requests.exceptions.RequestException as e: + st.error(f"Failed to create group: {str(e)}") + else: + st.error("Group name cannot be empty") -# Display announcement status -st.write("---") +# Show the announcement status show_announcement_status() diff --git a/src/auracaster_webui/main_ui.py b/src/auracaster_webui/main_ui.py new file mode 100644 index 0000000..5352191 --- /dev/null +++ b/src/auracaster_webui/main_ui.py @@ -0,0 +1,27 @@ +""" +Main entry point for the Airport Announcement System frontend. +""" +import streamlit.web.cli as stcli +import sys +from pathlib import Path +import os + +def run_app(): + """Run the Streamlit app.""" + # Get the directory of this file + current_dir = Path(__file__).parent + + # Set the path to the Streamlit app file + app_path = current_dir / "app.py" + + # Change directory to the app's directory + os.chdir(current_dir) + + # Use sys.argv[0] as the program name + sys.argv = ["streamlit", "run", str(app_path)] + + # Run the Streamlit CLI + sys.exit(stcli.main()) + +if __name__ == "__main__": + run_app() diff --git a/src/mock_backend/__init__.py b/src/mock_backend/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mock_backend/mock_api.py b/src/mock_backend/mock_api.py similarity index 84% rename from mock_backend/mock_api.py rename to src/mock_backend/mock_api.py index 534e7ad..a154398 100644 --- a/mock_backend/mock_api.py +++ b/src/mock_backend/mock_api.py @@ -1,6 +1,18 @@ +""" +FastAPI implementation of the Airport Announcement System mock backend API. +""" from fastapi import FastAPI, HTTPException +import sys +import os + +# Add the src directory to the Python path +src_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) +if src_path not in sys.path: + sys.path.insert(0, src_path) + +# Use absolute imports instead of relative imports from mock_backend.mock_backend import announcement_system -from api_models import EndpointGroup +from api_client.models import EndpointGroup from typing import List import uvicorn diff --git a/src/mock_backend/mock_backend.py b/src/mock_backend/mock_backend.py new file mode 100644 index 0000000..6bda208 --- /dev/null +++ b/src/mock_backend/mock_backend.py @@ -0,0 +1,238 @@ +""" +Mock implementation of the Airport Announcement System backend. +""" +import time +import threading +from typing import Optional, List +import sys +import os + +# Add the src directory to the Python path +src_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) +if src_path not in sys.path: + sys.path.insert(0, src_path) + +from api_client.models import EndpointGroup, AnnouncementStates + +AVAILABLE_ENDPOINTS = [f"endpoint{i}" for i in range(1, 4)] # Predefined endpoints +AVAILABLE_LANGUAGES = ["German", "English", "French", "Spanish", "Italian"] + +class AnnouncementSystem: + def __init__(self): + self.available_endpoints = AVAILABLE_ENDPOINTS + self.available_languages = AVAILABLE_LANGUAGES + self.endpoint_groups = [ + EndpointGroup( + id=1, + name="Gate1", + endpoints=["endpoint1", "endpoint2"], + languages=["German", "English"] + ), + EndpointGroup( + id=2, + name="Gate2", + endpoints=["endpoint3"], + languages=["German", "English"] + ) + ] + self.active_group_id = None + self._thread = None + self.last_completed_group_id = None + + def get_endpoint_groups(self) -> List[EndpointGroup]: + """Get all endpoint groups.""" + return self.endpoint_groups + + def get_endpoint_group(self, group_id: int) -> Optional[EndpointGroup]: + """Get a specific endpoint group by ID.""" + for group in self.endpoint_groups: + if group.id == group_id: + return group + return None + + def add_endpoint_group(self, group: EndpointGroup) -> EndpointGroup: + """Add a new endpoint group.""" + # Ensure group with this ID doesn't already exist + for existing_group in self.endpoint_groups: + if existing_group.id == group.id: + raise ValueError(f"Group with ID {group.id} already exists") + + # If no ID is provided or ID is 0, auto-assign the next available ID + if group.id == 0: + max_id = max([g.id for g in self.endpoint_groups]) if self.endpoint_groups else 0 + group.id = max_id + 1 + + self.endpoint_groups.append(group) + return group + + def update_endpoint_group(self, group_id: int, updated_group: EndpointGroup) -> EndpointGroup: + """Update an existing endpoint group.""" + for i, group in enumerate(self.endpoint_groups): + if group.id == group_id: + # Ensure the ID doesn't change + updated_group.id = group_id + self.endpoint_groups[i] = updated_group + return updated_group + + raise ValueError(f"Group with ID {group_id} not found") + + def delete_endpoint_group(self, group_id: int) -> None: + """Delete an endpoint group.""" + for i, group in enumerate(self.endpoint_groups): + if group.id == group_id: + del self.endpoint_groups[i] + return + + raise ValueError(f"Group with ID {group_id} not found") + + def get_available_endpoints(self) -> List[str]: + """Get all available endpoints.""" + return self.available_endpoints + + def get_available_languages(self) -> List[str]: + """Get all available languages for announcements.""" + return self.available_languages + + def _simulate_announcement(self, text: str, group: EndpointGroup) -> None: + """ + Simulate an announcement being made. + This runs in a separate thread to allow the API to continue serving requests. + """ + # Set start time + group.parameters.text = text + group.parameters.languages = ["German", "English"] # Default languages + group.parameters.start_time = time.time() + + # Track the active group + self.active_group_id = group.id + + # Update status to translating + group.progress.current_state = AnnouncementStates.TRANSLATING.value + group.progress.progress = 0.2 + time.sleep(1.5) # Simulate translation time + + # Check if we should stop (e.g. if another announcement started) + if self.active_group_id != group.id: + return + + # Update status to generating voice + group.progress.current_state = AnnouncementStates.GENERATING_VOICE.value + group.progress.progress = 0.4 + time.sleep(2) # Simulate voice generation time + + # Check if we should stop + if self.active_group_id != group.id: + return + + # Update status to routing + group.progress.current_state = AnnouncementStates.ROUTING.value + group.progress.progress = 0.6 + time.sleep(1) # Simulate routing time + + # Check if we should stop + if self.active_group_id != group.id: + return + + # Update status to active + group.progress.current_state = AnnouncementStates.ACTIVE.value + group.progress.progress = 0.8 + time.sleep(2.5) # Simulate announcement playing time + + # Check if we should stop + if self.active_group_id != group.id: + return + + # Update status to complete + group.progress.current_state = AnnouncementStates.COMPLETE.value + group.progress.progress = 1.0 + + # Record the last completed group + self.last_completed_group_id = group.id + + # Reset active group if this is still the active one + if self.active_group_id == group.id: + self.active_group_id = None + + # After a while, reset to idle state + def reset_to_idle(): + time.sleep(10) # Keep completed state visible for 10 seconds + if group.progress.current_state == AnnouncementStates.COMPLETE.value: + group.progress.current_state = AnnouncementStates.IDLE.value + group.progress.progress = 0.0 + + reset_thread = threading.Thread(target=reset_to_idle) + reset_thread.daemon = True + reset_thread.start() + + def start_announcement(self, text: str, group: EndpointGroup) -> None: + """Start a new announcement to the specified endpoint group.""" + # Check if an announcement is already in progress + if self.active_group_id is not None: + # Cancel the current announcement + self.active_group_id = None + + # Start a new thread to handle the announcement + self._thread = threading.Thread(target=self._simulate_announcement, args=(text, group)) + self._thread.daemon = True + self._thread.start() + + def get_announcement_status(self) -> dict: + """Get the status of the current announcement.""" + # If an announcement is active, return its status + if self.active_group_id is not None: + group = self.get_endpoint_group(self.active_group_id) + return { + "state": group.progress.current_state, + "progress": group.progress.progress, + "error": group.progress.error, + "details": { + "group": { + "id": group.id, + "name": group.name, + "endpoints": group.endpoints + }, + "text": group.parameters.text, + "languages": group.parameters.languages, + "start_time": group.parameters.start_time + } + } + + # If no announcement is active but we have a last completed one + elif self.last_completed_group_id is not None: + group = self.get_endpoint_group(self.last_completed_group_id) + if group and group.progress.current_state == AnnouncementStates.COMPLETE.value: + return { + "state": group.progress.current_state, + "progress": group.progress.progress, + "error": group.progress.error, + "details": { + "group": { + "id": group.id, + "name": group.name, + "endpoints": group.endpoints + }, + "text": group.parameters.text, + "languages": group.parameters.languages, + "start_time": group.parameters.start_time + } + } + + # Default: no active announcement + return { + "state": AnnouncementStates.IDLE.value, + "progress": 0.0, + "error": None, + "details": { + "group": { + "id": 0, + "name": "", + "endpoints": [] + }, + "text": "", + "languages": [], + "start_time": time.time() + } + } + +# Singleton instance +announcement_system = AnnouncementSystem()