diff --git a/src/multilang_translator/translator_server/translator_server.py b/src/multilang_translator/translator_server/translator_server.py index cbc107b..27fb2d3 100644 --- a/src/multilang_translator/translator_server/translator_server.py +++ b/src/multilang_translator/translator_server/translator_server.py @@ -85,7 +85,7 @@ async def make_announcement(text: str, ep_group: EndpointGroup): Make an announcement to a group of endpoints. """ - if text== "": + if text == "": log.warning("Announcement text is empty") return {"error": "Announcement text is empty"} @@ -94,65 +94,102 @@ async def make_announcement(text: str, ep_group: EndpointGroup): # update the database with the new state and start time so this can be read by another process endpoints_db.update_group(ep_group.id, ep_group) - # Initialize all endpoints in the group if they were not initalized before - for endpoint in ep_group.endpoints: - ep_group.current_state = AnnouncementStates.INIT - endpoints_db.update_group(ep_group.id, ep_group) - await init_endpoint(endpoint, ep_group.languages, ep_group.sampling_rate_hz) + # Initialize all endpoints in the group concurrently + ep_group.current_state = AnnouncementStates.INIT + endpoints_db.update_group(ep_group.id, ep_group) + + # Create init tasks and run them concurrently + init_tasks = [ + init_endpoint(endpoint, ep_group.languages, ep_group.sampling_rate_hz) + for endpoint in ep_group.endpoints + ] - # Translate the text for each language + # Translate the text for each language (concurrently) base_lang = "deu" # German is the base language + target_langs = ep_group.languages.copy() + if base_lang in target_langs: + target_langs.remove(base_lang) + ep_group.current_state = AnnouncementStates.TRANSLATING endpoints_db.update_group(ep_group.id, ep_group) + # Create translation tasks translations = {base_lang: text} - for lang in ep_group.languages: - if lang != base_lang: - # Translate the text - trans_conf = getattr(ep_group.translator_config, lang) - translation = await llm_translator.translate_de_to_x_async( - text=text, - target_language=lang, - client=trans_conf.llm_client, - model=trans_conf.translator_llm, - host=trans_conf.llm_host_url, - token=trans_conf.llm_host_token - ) - translations[lang] = translation - log.info(f"Translated to {lang}: {translation}") + translation_tasks = [] + + for lang in target_langs: + # Prepare translation task + trans_conf = getattr(ep_group.translator_config, lang) + task = llm_translator.translate_de_to_x_async( + text=text, + target_language=lang, + client=trans_conf.llm_client, + model=trans_conf.translator_llm, + host=trans_conf.llm_host_url, + token=trans_conf.llm_host_token + ) + translation_tasks.append(task) + + # Wait for all translations to complete concurrently + results = await asyncio.gather(*translation_tasks) + for i, translation in enumerate(results): + lang = target_langs[i] + translations[lang] = translation + log.info(f"Translated to {lang}: {translation}") - # Generate voices + # Generate voices concurrently ep_group.current_state = AnnouncementStates.GENERATING_VOICE endpoints_db.update_group(ep_group.id, ep_group) + # Prepare synthesis jobs audio = {} - # Convert each translation to audio - for lang, text in translations.items(): - # Get the appropriate language configuration + + # Define a helper function for voice synthesis + async def synthesize_text(lang, text): trans_conf = getattr(ep_group.translator_config, lang) + # Since synthesize is not async, we'll run it in a thread pool + loop = asyncio.get_event_loop() + audio_data = await loop.run_in_executor( + None, + lambda: text_to_speech.synthesize( + text, + ep_group.sampling_rate_hz, + trans_conf.tts_system, + trans_conf.tts_model, + return_lc3=True + ).decode('latin-1') + ) + return lang, audio_data + + # Create tasks for voice synthesis + synthesis_tasks = [ + synthesize_text(lang, text) + for lang, text in translations.items() + ] + + # Wait for all synthesis tasks to complete + synthesis_results = await asyncio.gather(*synthesis_tasks) + + # Build audio dictionary from results + for lang, audio_data in synthesis_results: + audio[lang] = audio_data - # Convert text to LC3-encoded audio using the configuration's TTS settings - audio[lang] = text_to_speech.synthesize( - translations[lang], - ep_group.sampling_rate_hz, # Sample rate from auracast config # TODO: take sampling rate from auracast config - trans_conf.tts_system, # TTS system from config - trans_conf.tts_model, # TTS model from config - return_lc3=True - ).decode('latin-1') - - # Add to audio data dictionary (decode bytes to string for JSON serialization) - + # make sure init finished before broadcasting + await asyncio.gather(*init_tasks) # Start the monitoring coroutine to wait for streaming to complete # This will set the state to COMPLETED when finished asyncio.create_task(monitor_streaming_completion(ep_group)) - - # Broadcast to all endpoints in group + # Broadcast to all endpoints in group concurrently + broadcast_tasks = [] for endpoint in ep_group.endpoints: - # Send the audio data to the server using the existing send_audio function - log.info(f"Broadcastcasting to {endpoint.name} for languages: {', '.join(audio.keys())}") - await multicast_client.send_audio(audio, base_url=endpoint.url) + log.info(f"Broadcasting to {endpoint.name} for languages: {', '.join(audio.keys())}") + task = multicast_client.send_audio(audio, base_url=endpoint.url) + broadcast_tasks.append(task) + + # Wait for all broadcasts to complete + await asyncio.gather(*broadcast_tasks) # Return the translations return {"translations": translations}