minimal example working with ui

This commit is contained in:
2025-03-13 14:30:33 +01:00
parent ce05358f26
commit 9addb8bde1
@@ -31,7 +31,7 @@ app.add_middleware(
# Get available endpoints and languages from the database
AVAILABLE_ENDPOINTS = endpoints_db.get_all_endpoints()
AVAILABLE_ENDPOINTS = endpoints_db.get_all_endpoints() # TODO: this should pobably not be cached here
AVAILABLE_LANGUAGES = endpoints_db.get_available_languages()
CURRENT_ENDPOINT_CONFIG = {}
@@ -64,27 +64,30 @@ def init_endpoint(endpoint: Endpoint, languages: list[str]):
log.info(f"Endpoint {endpoint.name} initialized successfully")
async def make_announcement(text: str, ep_group: endpoints_db.EndpointGroup):
async def make_announcement(text: str, ep_group: EndpointGroup):
"""
Process an announcement using the multilang_translator.
This function now uses the multicast_client to send announcements to endpoints.
Make an announcement to a group of endpoints.
"""
if text== "":
log.warning("Announcement text is empty")
return {"error": "Announcement text is empty"}
ep_group.current_state = AnnouncementStates.IDLE
ep_group.anouncement_start_time = time.time()
# update the database with the new state and start time so this can be read by another process
endpoints_db.update_group(ep_group.id, ep_group)
# Initialize all endpoints in the group if they were not initalized before
for endpoint in ep_group.endpoints:
init_endpoint(endpoint, ep_group.languages)
# Translate the text for each language
base_lang = "deu" # German is the base language
ep_group.current_state = AnnouncementStates.TRANSLATING
endpoints_db.update_group(ep_group.id, ep_group)
translations = {base_lang: text}
for lang in ep_group.languages:
if lang != base_lang:
@@ -108,6 +111,8 @@ async def make_announcement(text: str, ep_group: endpoints_db.EndpointGroup):
endpoints_db.update_group(ep_group.id, ep_group)
audio = {}
# Convert each translation to audio
for lang, text in translations.items():
# Get the appropriate language configuration
lang_conf = getattr(ep_group.translator_config, lang)
@@ -123,10 +128,12 @@ async def make_announcement(text: str, ep_group: endpoints_db.EndpointGroup):
# Add to audio data dictionary (decode bytes to string for JSON serialization)
# Update group progress
ep_group.current_state = AnnouncementStates.BROADCASTING
endpoints_db.update_group(ep_group.id, ep_group)
# Start the monitoring coroutine to wait for streaming to complete
# This will set the state to COMPLETED when finished
asyncio.create_task(monitor_streaming_completion(ep_group))
# Broadcast to all endpoints in group
for endpoint in ep_group.endpoints:
@@ -134,20 +141,78 @@ async def make_announcement(text: str, ep_group: endpoints_db.EndpointGroup):
log.info(f"Broadcastcasting to {endpoint.name} for languages: {', '.join(audio.keys())}")
multicast_client.send_audio(audio, base_url=endpoint.url)
# Update group state to completed
#ep_group.current_state = AnnouncementStates.COMPLETED # TODO: somehow the group state needs to be updated to completed after a broadcast
# Schedule group endpoint reset
# if ep_group.reset_task and not ep_group.reset_task.done():
# ep_group.reset_task.cancel()
# ep_group.reset_task = asyncio.create_task(reset_endpoints_after_delay(ep_group.endpoints, 60))
# Return the translations
return {"translations": translations}
async def monitor_streaming_completion(ep_group: EndpointGroup):
"""
Monitor streaming status after audio is sent and update group state when complete.
Args:
ep_group: The endpoint group being monitored
"""
log.info(f"Starting streaming completion monitoring for endpoint group {ep_group.id}")
# Set a shorter timeout as requested
max_completion_time = 60 # seconds
# First check if we are actually in streaming state
streaming_started = False
initial_check_timeout = 10 # seconds
initial_check_start = time.time()
# Wait for streaming to start (with timeout)
while time.time() - initial_check_start < initial_check_timeout:
# Wait before checking again
await asyncio.sleep(1)
any_streaming = False
for endpoint in ep_group.endpoints:
status = multicast_client.get_status(base_url=endpoint.url)
if status.get("is_streaming", False):
any_streaming = True
log.info(f"Streaming confirmed started on endpoint {endpoint.name}")
break
if any_streaming:
streaming_started = True
break
if not streaming_started:
log.warning(f"No endpoints started streaming for group {ep_group.id} after {initial_check_timeout}s")
# Still update to completed since there's nothing to wait for
ep_group.current_state = AnnouncementStates.ERROR
endpoints_db.update_group(ep_group.id, ep_group)
return
# Update group progress
ep_group.current_state = AnnouncementStates.BROADCASTING
endpoints_db.update_group(ep_group.id, ep_group)
# Now monitor until streaming completes on all endpoints
check_completion_start_time = time.time()
completed = [False for _ in ep_group.endpoints]
while not all(completed) or time.time() - check_completion_start_time > max_completion_time:
await asyncio.sleep(1)
# Check status of each endpoint
for i, endpoint in enumerate(ep_group.endpoints):
completed[i] = not multicast_client.get_status(base_url=endpoint.url)['is_streaming']
if all(completed):
log.info(f"All endpoints completed streaming for group {ep_group.id}")
# Update group state to completed
ep_group.current_state = AnnouncementStates.COMPLETED
endpoints_db.update_group(ep_group.id, ep_group)
log.info(f"Updated group {ep_group.id} state to COMPLETED")
else:
log.error(f"Max wait time reached for group {ep_group.id}. Forcing completion.")
async def reset_endpoints_after_delay(endpoint_ids, delay_seconds):
"""Reset endpoints after a delay."""
await asyncio.sleep(delay_seconds)
@@ -161,6 +226,17 @@ async def reset_endpoints_after_delay(endpoint_ids, delay_seconds):
log.error(f"Failed to reset endpoint {endpoint_id}: {e}")
@app.get("/groups/{group_id}/state") # TODO: think about progress tracking
async def get_group_state(group_id: int):
"""Get the status of a specific endpoint."""
# Check if the endpoint exists
ep_group = endpoints_db.get_group_by_id(group_id)
if not ep_group:
raise HTTPException(status_code=404, detail=f"Endpoint {group_id} not found")
return {"name": ep_group.current_state.name, "value": ep_group.current_state.value}
@app.get("/groups")
async def get_groups():
"""Get all endpoint groups with their current status."""
@@ -216,17 +292,6 @@ async def start_announcement(text: str, group_id: int):
@app.get("/groups/{endpoint_id}/state") # TODO: think about progress tracking
async def get_group_state(group_id: int):
"""Get the status of a specific endpoint."""
# Check if the endpoint exists
ep_group = endpoints_db.get_group_by_id(group_id)
if not ep_group:
raise HTTPException(status_code=404, detail=f"Endpoint {group_id} not found")
return ep_group.current_state
@app.get("/endpoints")
async def get_available_endpoints():
"""Get all available endpoints with their capabilities."""