refractoring
This commit is contained in:
148
mock_backend/mock_backend.py
Normal file
148
mock_backend/mock_backend.py
Normal file
@@ -0,0 +1,148 @@
|
||||
import time
|
||||
import threading
|
||||
from enum import Enum
|
||||
from pydantic import BaseModel, Field
|
||||
from typing import Optional, List
|
||||
|
||||
ENDPOINTS = [f"endpoint{i}" for i in range(1, 4)] # Predefined endpoints
|
||||
|
||||
class AnnouncementStates(Enum):
|
||||
IDLE: str = "Ready"
|
||||
TRANSLATING: str = "Translating"
|
||||
GENERATING_VOICE: str = "Generating voice synthesis"
|
||||
ROUTING: str = "Routing to endpoints"
|
||||
ACTIVE: str = "Broadcasting announcement"
|
||||
COMPLETE: str = "Complete"
|
||||
ERROR: str = "Error"
|
||||
|
||||
class EndpointGroup(BaseModel):
|
||||
id: int
|
||||
name: str
|
||||
endpoints: List[str]
|
||||
languages: List[str]
|
||||
|
||||
class AnnouncementDetails(BaseModel):
|
||||
text: str
|
||||
languages: List[str]
|
||||
group: EndpointGroup
|
||||
start_time: float
|
||||
|
||||
class AnnouncementProgress(BaseModel):
|
||||
current_state: str = AnnouncementStates.IDLE
|
||||
error: Optional[str] = None
|
||||
details: AnnouncementDetails
|
||||
progress: float = Field(default=0.0, ge=0.0, le=1.0)
|
||||
description: str = Field(default="Ready")
|
||||
|
||||
class AnnouncementSystem:
|
||||
def __init__(self):
|
||||
self.available_endpoints = ENDPOINTS
|
||||
self.endpoint_groups = [
|
||||
EndpointGroup(
|
||||
id=1,
|
||||
name="Gate1",
|
||||
endpoints=["endpoint1", "endpoint2"],
|
||||
languages=["German", "English"]
|
||||
),
|
||||
EndpointGroup(
|
||||
id=2,
|
||||
name="Gate2",
|
||||
endpoints=["endpoint3"],
|
||||
languages=["German", "English"]
|
||||
)
|
||||
]
|
||||
self.current_process = AnnouncementProgress(
|
||||
details=AnnouncementDetails(
|
||||
text="",
|
||||
languages=[],
|
||||
group=EndpointGroup(
|
||||
id=0,
|
||||
name="",
|
||||
endpoints=[],
|
||||
languages=[]
|
||||
),
|
||||
start_time=0.0
|
||||
)
|
||||
)
|
||||
self._thread = None
|
||||
|
||||
|
||||
def start_announcement(self, text, group):
|
||||
if self.current_process.current_state not in [AnnouncementStates.IDLE, AnnouncementStates.COMPLETE]:
|
||||
raise Exception("Announcement already in progress")
|
||||
|
||||
self.current_process.details.text = text
|
||||
self.current_process.details.group = group
|
||||
# Set the languages from the group
|
||||
self.current_process.details.languages = group.languages
|
||||
self._thread = threading.Thread(target=self._run_process)
|
||||
self._thread.start()
|
||||
|
||||
def _run_process(self):
|
||||
self.current_process.details.start_time = time.time()
|
||||
try:
|
||||
self._update_state(AnnouncementStates.TRANSLATING)
|
||||
time.sleep(1) # Simulate translation
|
||||
|
||||
self._update_state(AnnouncementStates.GENERATING_VOICE)
|
||||
time.sleep(1) # Voice synthesis
|
||||
|
||||
self._update_state(AnnouncementStates.ROUTING)
|
||||
time.sleep(len(self.current_process.details.group.endpoints) * 0.5)
|
||||
|
||||
self._update_state(AnnouncementStates.ACTIVE)
|
||||
time.sleep(1) # Simulate broadcast
|
||||
|
||||
self._update_state(AnnouncementStates.COMPLETE)
|
||||
|
||||
except Exception as e:
|
||||
self.current_process.error = str(e)
|
||||
self._update_state(AnnouncementStates.ERROR)
|
||||
|
||||
def _update_state(self, new_state):
|
||||
self.current_process.current_state = new_state
|
||||
|
||||
# Progress based on state transitions
|
||||
state_progress = {
|
||||
AnnouncementStates.TRANSLATING: 0,
|
||||
AnnouncementStates.GENERATING_VOICE: 0.25,
|
||||
AnnouncementStates.ROUTING: 0.5,
|
||||
AnnouncementStates.ACTIVE: 0.75,
|
||||
AnnouncementStates.COMPLETE: 1.0,
|
||||
AnnouncementStates.ERROR: 0
|
||||
}
|
||||
self.current_process.progress = state_progress[new_state]
|
||||
|
||||
def get_endpoint_groups(self) -> List[EndpointGroup]:
|
||||
return self.endpoint_groups
|
||||
|
||||
def get_endpoint_group(self, group_id: int) -> Optional[EndpointGroup]:
|
||||
return next((g for g in self.endpoint_groups if g.id == group_id), None)
|
||||
|
||||
def add_endpoint_group(self, group: EndpointGroup) -> EndpointGroup:
|
||||
if any(g.id == group.id for g in self.endpoint_groups):
|
||||
raise ValueError(f"Group with ID {group.id} already exists")
|
||||
self.endpoint_groups.append(group)
|
||||
return group
|
||||
|
||||
def update_endpoint_group(self, group_id: int, updated_group: EndpointGroup) -> EndpointGroup:
|
||||
if group_id != updated_group.id:
|
||||
raise ValueError("Group ID cannot be changed")
|
||||
|
||||
group = self.get_endpoint_group(group_id)
|
||||
if not group:
|
||||
raise ValueError(f"Group with ID {group_id} not found")
|
||||
|
||||
group.name = updated_group.name
|
||||
group.endpoints = updated_group.endpoints
|
||||
group.languages = updated_group.languages
|
||||
return group
|
||||
|
||||
def delete_endpoint_group(self, group_id: int) -> None:
|
||||
group = self.get_endpoint_group(group_id)
|
||||
if not group:
|
||||
raise ValueError(f"Group with ID {group_id} not found")
|
||||
self.endpoint_groups = [g for g in self.endpoint_groups if g.id != group_id]
|
||||
|
||||
# Singleton instance
|
||||
announcement_system = AnnouncementSystem()
|
||||
Reference in New Issue
Block a user