From 9addb8bde17c25df31a295dc5e9a73fcf83a394d Mon Sep 17 00:00:00 2001 From: pstruebi Date: Thu, 13 Mar 2025 14:30:33 +0100 Subject: [PATCH] minimal example working with ui --- .../translator_api/translator_api.py | 127 +++++++++++++----- 1 file changed, 96 insertions(+), 31 deletions(-) diff --git a/src/multilang_translator/translator_api/translator_api.py b/src/multilang_translator/translator_api/translator_api.py index 9734151..5cb97a9 100644 --- a/src/multilang_translator/translator_api/translator_api.py +++ b/src/multilang_translator/translator_api/translator_api.py @@ -31,7 +31,7 @@ app.add_middleware( # Get available endpoints and languages from the database -AVAILABLE_ENDPOINTS = endpoints_db.get_all_endpoints() +AVAILABLE_ENDPOINTS = endpoints_db.get_all_endpoints() # TODO: this should pobably not be cached here AVAILABLE_LANGUAGES = endpoints_db.get_available_languages() CURRENT_ENDPOINT_CONFIG = {} @@ -64,27 +64,30 @@ def init_endpoint(endpoint: Endpoint, languages: list[str]): log.info(f"Endpoint {endpoint.name} initialized successfully") -async def make_announcement(text: str, ep_group: endpoints_db.EndpointGroup): +async def make_announcement(text: str, ep_group: EndpointGroup): """ - Process an announcement using the multilang_translator. - This function now uses the multicast_client to send announcements to endpoints. + Make an announcement to a group of endpoints. """ + if text== "": + log.warning("Announcement text is empty") + return {"error": "Announcement text is empty"} + ep_group.current_state = AnnouncementStates.IDLE ep_group.anouncement_start_time = time.time() # update the database with the new state and start time so this can be read by another process endpoints_db.update_group(ep_group.id, ep_group) - + # Initialize all endpoints in the group if they were not initalized before for endpoint in ep_group.endpoints: init_endpoint(endpoint, ep_group.languages) - + # Translate the text for each language base_lang = "deu" # German is the base language ep_group.current_state = AnnouncementStates.TRANSLATING endpoints_db.update_group(ep_group.id, ep_group) - + translations = {base_lang: text} for lang in ep_group.languages: if lang != base_lang: @@ -108,6 +111,8 @@ async def make_announcement(text: str, ep_group: endpoints_db.EndpointGroup): endpoints_db.update_group(ep_group.id, ep_group) audio = {} + + # Convert each translation to audio for lang, text in translations.items(): # Get the appropriate language configuration lang_conf = getattr(ep_group.translator_config, lang) @@ -123,10 +128,12 @@ async def make_announcement(text: str, ep_group: endpoints_db.EndpointGroup): # Add to audio data dictionary (decode bytes to string for JSON serialization) - # Update group progress - ep_group.current_state = AnnouncementStates.BROADCASTING - endpoints_db.update_group(ep_group.id, ep_group) + + # Start the monitoring coroutine to wait for streaming to complete + # This will set the state to COMPLETED when finished + asyncio.create_task(monitor_streaming_completion(ep_group)) + # Broadcast to all endpoints in group for endpoint in ep_group.endpoints: @@ -134,20 +141,78 @@ async def make_announcement(text: str, ep_group: endpoints_db.EndpointGroup): log.info(f"Broadcastcasting to {endpoint.name} for languages: {', '.join(audio.keys())}") multicast_client.send_audio(audio, base_url=endpoint.url) - - # Update group state to completed - #ep_group.current_state = AnnouncementStates.COMPLETED # TODO: somehow the group state needs to be updated to completed after a broadcast - - # Schedule group endpoint reset - # if ep_group.reset_task and not ep_group.reset_task.done(): - # ep_group.reset_task.cancel() - - # ep_group.reset_task = asyncio.create_task(reset_endpoints_after_delay(ep_group.endpoints, 60)) - # Return the translations return {"translations": translations} +async def monitor_streaming_completion(ep_group: EndpointGroup): + """ + Monitor streaming status after audio is sent and update group state when complete. + + Args: + ep_group: The endpoint group being monitored + """ + log.info(f"Starting streaming completion monitoring for endpoint group {ep_group.id}") + + + # Set a shorter timeout as requested + max_completion_time = 60 # seconds + + # First check if we are actually in streaming state + streaming_started = False + initial_check_timeout = 10 # seconds + initial_check_start = time.time() + + # Wait for streaming to start (with timeout) + while time.time() - initial_check_start < initial_check_timeout: + # Wait before checking again + await asyncio.sleep(1) + + any_streaming = False + for endpoint in ep_group.endpoints: + status = multicast_client.get_status(base_url=endpoint.url) + if status.get("is_streaming", False): + any_streaming = True + log.info(f"Streaming confirmed started on endpoint {endpoint.name}") + break + + if any_streaming: + streaming_started = True + break + + + if not streaming_started: + log.warning(f"No endpoints started streaming for group {ep_group.id} after {initial_check_timeout}s") + # Still update to completed since there's nothing to wait for + ep_group.current_state = AnnouncementStates.ERROR + endpoints_db.update_group(ep_group.id, ep_group) + return + + # Update group progress + ep_group.current_state = AnnouncementStates.BROADCASTING + endpoints_db.update_group(ep_group.id, ep_group) + + # Now monitor until streaming completes on all endpoints + check_completion_start_time = time.time() + completed = [False for _ in ep_group.endpoints] + while not all(completed) or time.time() - check_completion_start_time > max_completion_time: + await asyncio.sleep(1) + + # Check status of each endpoint + for i, endpoint in enumerate(ep_group.endpoints): + completed[i] = not multicast_client.get_status(base_url=endpoint.url)['is_streaming'] + + if all(completed): + log.info(f"All endpoints completed streaming for group {ep_group.id}") + # Update group state to completed + ep_group.current_state = AnnouncementStates.COMPLETED + endpoints_db.update_group(ep_group.id, ep_group) + log.info(f"Updated group {ep_group.id} state to COMPLETED") + + else: + log.error(f"Max wait time reached for group {ep_group.id}. Forcing completion.") + + async def reset_endpoints_after_delay(endpoint_ids, delay_seconds): """Reset endpoints after a delay.""" await asyncio.sleep(delay_seconds) @@ -161,6 +226,17 @@ async def reset_endpoints_after_delay(endpoint_ids, delay_seconds): log.error(f"Failed to reset endpoint {endpoint_id}: {e}") +@app.get("/groups/{group_id}/state") # TODO: think about progress tracking +async def get_group_state(group_id: int): + """Get the status of a specific endpoint.""" + # Check if the endpoint exists + ep_group = endpoints_db.get_group_by_id(group_id) + if not ep_group: + raise HTTPException(status_code=404, detail=f"Endpoint {group_id} not found") + + return {"name": ep_group.current_state.name, "value": ep_group.current_state.value} + + @app.get("/groups") async def get_groups(): """Get all endpoint groups with their current status.""" @@ -216,17 +292,6 @@ async def start_announcement(text: str, group_id: int): -@app.get("/groups/{endpoint_id}/state") # TODO: think about progress tracking -async def get_group_state(group_id: int): - """Get the status of a specific endpoint.""" - # Check if the endpoint exists - ep_group = endpoints_db.get_group_by_id(group_id) - if not ep_group: - raise HTTPException(status_code=404, detail=f"Endpoint {group_id} not found") - - return ep_group.current_state - - @app.get("/endpoints") async def get_available_endpoints(): """Get all available endpoints with their capabilities."""