# File: auracaster-webui/mock_backend/mock_backend.py
# (208 lines, 8.7 KiB, Python)

import time
import threading
from typing import Optional, List
from api_models import EndpointGroup, AnnouncementStates
# Predefined endpoint names offered by the mock: "endpoint1" .. "endpoint3".
AVAILABLE_ENDPOINTS = [f"endpoint{number}" for number in (1, 2, 3)]

# Languages that can be selected for an announcement.
AVAILABLE_LANGUAGES = [
    "German",
    "English",
    "French",
    "Spanish",
    "Italian",
]
class AnnouncementSystem:
    """In-memory mock of the announcement backend.

    Keeps the list of endpoint groups and simulates an announcement by
    walking one group through the ``AnnouncementStates`` pipeline on a
    background thread, sleeping between the steps. At most one announcement
    may be in progress at any time.
    """

    def __init__(self):
        """Set up the predefined endpoints, languages and two sample groups."""
        self.available_endpoints = AVAILABLE_ENDPOINTS
        self.available_languages = AVAILABLE_LANGUAGES
        self.endpoint_groups = [
            EndpointGroup(
                id=1,
                name="Gate1",
                endpoints=["endpoint1", "endpoint2"],
                languages=["German", "English"],
            ),
            EndpointGroup(
                id=2,
                name="Gate2",
                endpoints=["endpoint3"],
                languages=["German", "English"],
            ),
        ]
        # ID of the group whose announcement is currently running, if any.
        self.active_group_id = None
        # Worker thread of the most recently started announcement.
        self._thread = None
        # ID of the group whose announcement finished last (completed or
        # failed); get_announcement_status() reports it while nothing runs.
        self.last_completed_group_id = None

    @staticmethod
    def _is_busy(group: EndpointGroup) -> bool:
        """Return True while *group* has an announcement that has not finished."""
        # ERROR counts as finished: a failed run must not block later
        # announcements or make the group undeletable.
        finished_states = (
            AnnouncementStates.IDLE.value,
            AnnouncementStates.COMPLETE.value,
            AnnouncementStates.ERROR.value,
        )
        return group.progress.current_state not in finished_states

    @staticmethod
    def _status_for(group: EndpointGroup) -> dict:
        """Build the status payload describing *group*'s announcement."""
        return {
            "state": group.progress.current_state,
            "progress": group.progress.progress,
            "error": group.progress.error,
            "details": {
                "text": group.parameters.text,
                "languages": group.parameters.languages or group.languages,
                "group": {
                    "id": group.id,
                    "name": group.name,
                    "endpoints": group.endpoints,
                    "languages": group.languages,
                },
                "start_time": group.parameters.start_time,
            },
        }

    def start_announcement(self, text: str, group: EndpointGroup):
        """Start a simulated announcement of *text* for *group*.

        The group is looked up by ``group.id`` so that the stored instance,
        not the caller's object, is modified. The simulation itself runs on a
        background thread; this method returns right after starting it.

        Raises:
            ValueError: If no stored group has ``group.id``.
            Exception: If this group or any other group still has an
                announcement in progress.
        """
        target_group = self.get_endpoint_group(group.id)
        if target_group is None:
            raise ValueError(f"Group with ID {group.id} not found")

        if self._is_busy(target_group):
            raise Exception("Announcement already in progress for this group")
        if any(self._is_busy(g) for g in self.endpoint_groups):
            raise Exception("Another announcement is already in progress")

        # The previous result of this group is about to be overwritten.
        if self.last_completed_group_id == target_group.id:
            self.last_completed_group_id = None
        # Drop the message of a previous failed run before starting over.
        if target_group.progress.current_state == AnnouncementStates.ERROR.value:
            target_group.progress.error = None

        target_group.parameters.text = text
        target_group.parameters.languages = target_group.languages.copy()
        target_group.parameters.start_time = time.time()

        # Mark the group as busy *before* the worker thread is started.
        # Otherwise a second call arriving before the worker gets scheduled
        # would still see an idle group and pass the checks above.
        self._update_state(target_group, AnnouncementStates.TRANSLATING)
        self.active_group_id = target_group.id

        self._thread = threading.Thread(target=self._run_process, args=(target_group,))
        self._thread.start()

    def _run_process(self, group: EndpointGroup):
        """Worker-thread body: step *group* through all announcement states.

        Any exception is recorded in ``group.progress.error`` and the group
        ends in the ERROR state instead of COMPLETE.
        """
        try:
            self._update_state(group, AnnouncementStates.TRANSLATING)
            time.sleep(1)  # Simulate translation
            self._update_state(group, AnnouncementStates.GENERATING_VOICE)
            time.sleep(1)  # Voice synthesis
            self._update_state(group, AnnouncementStates.ROUTING)
            time.sleep(len(group.endpoints) * 0.5)  # Half a second per endpoint
            self._update_state(group, AnnouncementStates.ACTIVE)
            time.sleep(1)  # Simulate broadcast
            self._update_state(group, AnnouncementStates.COMPLETE)
        except Exception as e:
            group.progress.error = str(e)
            self._update_state(group, AnnouncementStates.ERROR)
        finally:
            # Record the finished group first and clear the active ID second,
            # so a concurrent status query never finds both unset. Failed
            # runs are recorded too; otherwise their error would never be
            # reported by get_announcement_status().
            self.last_completed_group_id = group.id
            self.active_group_id = None

    def _update_state(self, group: EndpointGroup, new_state: AnnouncementStates):
        """Store *new_state* on *group* together with the matching progress value."""
        group.progress.current_state = new_state.value
        # Progress based on state transitions
        state_progress = {
            AnnouncementStates.TRANSLATING: 0,
            AnnouncementStates.GENERATING_VOICE: 0.25,
            AnnouncementStates.ROUTING: 0.5,
            AnnouncementStates.ACTIVE: 0.75,
            AnnouncementStates.COMPLETE: 1.0,
            AnnouncementStates.ERROR: 0,
        }
        group.progress.progress = state_progress[new_state]

    def get_endpoint_groups(self) -> List[EndpointGroup]:
        """Return the list of all endpoint groups."""
        return self.endpoint_groups

    def get_endpoint_group(self, group_id: int) -> Optional[EndpointGroup]:
        """Return the group with ID *group_id*, or None if there is none."""
        return next((g for g in self.endpoint_groups if g.id == group_id), None)

    def add_endpoint_group(self, group: EndpointGroup) -> EndpointGroup:
        """Append *group* to the stored groups and return it.

        Raises:
            ValueError: If a group with the same ID already exists.
        """
        if any(g.id == group.id for g in self.endpoint_groups):
            raise ValueError(f"Group with ID {group.id} already exists")
        self.endpoint_groups.append(group)
        return group

    def update_endpoint_group(self, group_id: int, updated_group: EndpointGroup) -> EndpointGroup:
        """Copy name, endpoints and languages from *updated_group* onto the stored group.

        Announcement state (progress, parameters) is left untouched.

        Raises:
            ValueError: If ``updated_group.id`` differs from *group_id*, or
                if no group with *group_id* exists.
        """
        if group_id != updated_group.id:
            raise ValueError("Group ID cannot be changed")
        group = self.get_endpoint_group(group_id)
        if group is None:
            raise ValueError(f"Group with ID {group_id} not found")
        # Update only the editable fields, not the state fields. The lists
        # are copied so later changes to the caller's object do not leak
        # into the stored group.
        group.name = updated_group.name
        group.endpoints = list(updated_group.endpoints)
        group.languages = list(updated_group.languages)
        return group

    def delete_endpoint_group(self, group_id: int) -> None:
        """Remove the group with ID *group_id*.

        Raises:
            ValueError: If the group does not exist or still has an
                announcement in progress.
        """
        group = self.get_endpoint_group(group_id)
        if group is None:
            raise ValueError(f"Group with ID {group_id} not found")
        # Cannot delete a group with an active announcement
        if self._is_busy(group):
            raise ValueError("Cannot delete group with an active announcement")
        self.endpoint_groups = [g for g in self.endpoint_groups if g.id != group_id]
        # Do not keep a reference to a group that no longer exists; the
        # status query would otherwise look it up and get None.
        if self.last_completed_group_id == group_id:
            self.last_completed_group_id = None

    def get_available_languages(self) -> List[str]:
        """Get the list of available languages for announcements."""
        return self.available_languages

    def get_available_endpoints(self) -> List[str]:
        """Get the list of available endpoints for announcements."""
        return self.available_endpoints

    def get_announcement_status(self) -> dict:
        """Get the status of the current announcement.

        Reports the running announcement if there is one, otherwise the one
        that finished last, otherwise an idle placeholder with empty details.
        """
        # Read each ID exactly once: the worker thread may reset
        # active_group_id between a check and a second read.
        for group_id in (self.active_group_id, self.last_completed_group_id):
            if group_id is None:
                continue
            group = self.get_endpoint_group(group_id)
            if group is not None:
                return self._status_for(group)

        # No active or completed announcements
        return {
            "state": AnnouncementStates.IDLE.value,
            "progress": 0.0,
            "error": None,
            "details": {
                "text": "",
                "languages": [],
                "group": {
                    "id": 0,
                    "name": "",
                    "endpoints": [],
                    "languages": [],
                },
                "start_time": 0.0,
            },
        }
# Shared module-level instance, created once when this module is imported.
# (The class itself does not enforce that only one instance exists.)
announcement_system = AnnouncementSystem()