restructure the project for packaging

This commit is contained in:
2025-03-11 11:32:26 +01:00
parent f3bdb6d53f
commit 1485277d66
16 changed files with 489 additions and 294 deletions

View File

View File

@@ -0,0 +1,72 @@
"""
FastAPI implementation of the Airport Announcement System mock backend API.
"""
from fastapi import FastAPI, HTTPException
import sys
import os
# Add the src directory to the Python path
src_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
if src_path not in sys.path:
sys.path.insert(0, src_path)
# Use absolute imports instead of relative imports
from mock_backend.mock_backend import announcement_system
from api_client.models import EndpointGroup
from typing import List
import uvicorn
app = FastAPI()
@app.get("/groups", response_model=List[EndpointGroup])
def get_groups():
return announcement_system.get_endpoint_groups()
@app.post("/groups", response_model=EndpointGroup)
def create_group(group: EndpointGroup):
try:
return announcement_system.add_endpoint_group(group)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@app.put("/groups/{group_id}", response_model=EndpointGroup)
def update_group(group_id: int, updated_group: EndpointGroup):
try:
return announcement_system.update_endpoint_group(group_id, updated_group)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
@app.delete("/groups/{group_id}")
def delete_group(group_id: int):
try:
announcement_system.delete_endpoint_group(group_id)
return {"message": "Group deleted successfully"}
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
@app.post("/announcements")
def start_announcement(text: str, group_id: int):
group = announcement_system.get_endpoint_group(group_id)
if not group:
raise HTTPException(status_code=404, detail=f"Group with ID {group_id} not found")
try:
announcement_system.start_announcement(text, group)
return {"message": "Announcement started successfully"}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/announcements/status")
def get_announcement_status():
return announcement_system.get_announcement_status()
@app.get("/endpoints")
def get_available_endpoints():
return announcement_system.get_available_endpoints()
@app.get("/languages")
def get_available_languages():
return announcement_system.get_available_languages()
if __name__ == "__main__":
uvicorn.run('mock_backend.mock_api:app', host="0.0.0.0", port=7999, reload=True)

View File

@@ -0,0 +1,238 @@
"""
Mock implementation of the Airport Announcement System backend.
"""
import time
import threading
from typing import Optional, List
import sys
import os
# Add the src directory to the Python path
src_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
if src_path not in sys.path:
sys.path.insert(0, src_path)
from api_client.models import EndpointGroup, AnnouncementStates
AVAILABLE_ENDPOINTS = [f"endpoint{i}" for i in range(1, 4)] # Predefined endpoints
AVAILABLE_LANGUAGES = ["German", "English", "French", "Spanish", "Italian"]
class AnnouncementSystem:
def __init__(self):
self.available_endpoints = AVAILABLE_ENDPOINTS
self.available_languages = AVAILABLE_LANGUAGES
self.endpoint_groups = [
EndpointGroup(
id=1,
name="Gate1",
endpoints=["endpoint1", "endpoint2"],
languages=["German", "English"]
),
EndpointGroup(
id=2,
name="Gate2",
endpoints=["endpoint3"],
languages=["German", "English"]
)
]
self.active_group_id = None
self._thread = None
self.last_completed_group_id = None
def get_endpoint_groups(self) -> List[EndpointGroup]:
"""Get all endpoint groups."""
return self.endpoint_groups
def get_endpoint_group(self, group_id: int) -> Optional[EndpointGroup]:
"""Get a specific endpoint group by ID."""
for group in self.endpoint_groups:
if group.id == group_id:
return group
return None
def add_endpoint_group(self, group: EndpointGroup) -> EndpointGroup:
"""Add a new endpoint group."""
# Ensure group with this ID doesn't already exist
for existing_group in self.endpoint_groups:
if existing_group.id == group.id:
raise ValueError(f"Group with ID {group.id} already exists")
# If no ID is provided or ID is 0, auto-assign the next available ID
if group.id == 0:
max_id = max([g.id for g in self.endpoint_groups]) if self.endpoint_groups else 0
group.id = max_id + 1
self.endpoint_groups.append(group)
return group
def update_endpoint_group(self, group_id: int, updated_group: EndpointGroup) -> EndpointGroup:
"""Update an existing endpoint group."""
for i, group in enumerate(self.endpoint_groups):
if group.id == group_id:
# Ensure the ID doesn't change
updated_group.id = group_id
self.endpoint_groups[i] = updated_group
return updated_group
raise ValueError(f"Group with ID {group_id} not found")
def delete_endpoint_group(self, group_id: int) -> None:
"""Delete an endpoint group."""
for i, group in enumerate(self.endpoint_groups):
if group.id == group_id:
del self.endpoint_groups[i]
return
raise ValueError(f"Group with ID {group_id} not found")
def get_available_endpoints(self) -> List[str]:
"""Get all available endpoints."""
return self.available_endpoints
def get_available_languages(self) -> List[str]:
"""Get all available languages for announcements."""
return self.available_languages
def _simulate_announcement(self, text: str, group: EndpointGroup) -> None:
"""
Simulate an announcement being made.
This runs in a separate thread to allow the API to continue serving requests.
"""
# Set start time
group.parameters.text = text
group.parameters.languages = ["German", "English"] # Default languages
group.parameters.start_time = time.time()
# Track the active group
self.active_group_id = group.id
# Update status to translating
group.progress.current_state = AnnouncementStates.TRANSLATING.value
group.progress.progress = 0.2
time.sleep(1.5) # Simulate translation time
# Check if we should stop (e.g. if another announcement started)
if self.active_group_id != group.id:
return
# Update status to generating voice
group.progress.current_state = AnnouncementStates.GENERATING_VOICE.value
group.progress.progress = 0.4
time.sleep(2) # Simulate voice generation time
# Check if we should stop
if self.active_group_id != group.id:
return
# Update status to routing
group.progress.current_state = AnnouncementStates.ROUTING.value
group.progress.progress = 0.6
time.sleep(1) # Simulate routing time
# Check if we should stop
if self.active_group_id != group.id:
return
# Update status to active
group.progress.current_state = AnnouncementStates.ACTIVE.value
group.progress.progress = 0.8
time.sleep(2.5) # Simulate announcement playing time
# Check if we should stop
if self.active_group_id != group.id:
return
# Update status to complete
group.progress.current_state = AnnouncementStates.COMPLETE.value
group.progress.progress = 1.0
# Record the last completed group
self.last_completed_group_id = group.id
# Reset active group if this is still the active one
if self.active_group_id == group.id:
self.active_group_id = None
# After a while, reset to idle state
def reset_to_idle():
time.sleep(10) # Keep completed state visible for 10 seconds
if group.progress.current_state == AnnouncementStates.COMPLETE.value:
group.progress.current_state = AnnouncementStates.IDLE.value
group.progress.progress = 0.0
reset_thread = threading.Thread(target=reset_to_idle)
reset_thread.daemon = True
reset_thread.start()
def start_announcement(self, text: str, group: EndpointGroup) -> None:
"""Start a new announcement to the specified endpoint group."""
# Check if an announcement is already in progress
if self.active_group_id is not None:
# Cancel the current announcement
self.active_group_id = None
# Start a new thread to handle the announcement
self._thread = threading.Thread(target=self._simulate_announcement, args=(text, group))
self._thread.daemon = True
self._thread.start()
def get_announcement_status(self) -> dict:
"""Get the status of the current announcement."""
# If an announcement is active, return its status
if self.active_group_id is not None:
group = self.get_endpoint_group(self.active_group_id)
return {
"state": group.progress.current_state,
"progress": group.progress.progress,
"error": group.progress.error,
"details": {
"group": {
"id": group.id,
"name": group.name,
"endpoints": group.endpoints
},
"text": group.parameters.text,
"languages": group.parameters.languages,
"start_time": group.parameters.start_time
}
}
# If no announcement is active but we have a last completed one
elif self.last_completed_group_id is not None:
group = self.get_endpoint_group(self.last_completed_group_id)
if group and group.progress.current_state == AnnouncementStates.COMPLETE.value:
return {
"state": group.progress.current_state,
"progress": group.progress.progress,
"error": group.progress.error,
"details": {
"group": {
"id": group.id,
"name": group.name,
"endpoints": group.endpoints
},
"text": group.parameters.text,
"languages": group.parameters.languages,
"start_time": group.parameters.start_time
}
}
# Default: no active announcement
return {
"state": AnnouncementStates.IDLE.value,
"progress": 0.0,
"error": None,
"details": {
"group": {
"id": 0,
"name": "",
"endpoints": []
},
"text": "",
"languages": [],
"start_time": time.time()
}
}
# Singleton instance
announcement_system = AnnouncementSystem()