make the announcement completely async

This commit is contained in:
2025-03-19 11:03:42 +01:00
parent 8501184de5
commit a487f5f462
@@ -85,7 +85,7 @@ async def make_announcement(text: str, ep_group: EndpointGroup):
Make an announcement to a group of endpoints.
"""
if text== "":
if text == "":
log.warning("Announcement text is empty")
return {"error": "Announcement text is empty"}
@@ -94,65 +94,102 @@ async def make_announcement(text: str, ep_group: EndpointGroup):
# update the database with the new state and start time so this can be read by another process
endpoints_db.update_group(ep_group.id, ep_group)
# Initialize all endpoints in the group if they were not initialized before
for endpoint in ep_group.endpoints:
ep_group.current_state = AnnouncementStates.INIT
endpoints_db.update_group(ep_group.id, ep_group)
await init_endpoint(endpoint, ep_group.languages, ep_group.sampling_rate_hz)
# Initialize all endpoints in the group concurrently
ep_group.current_state = AnnouncementStates.INIT
endpoints_db.update_group(ep_group.id, ep_group)
# Create init tasks and run them concurrently
init_tasks = [
init_endpoint(endpoint, ep_group.languages, ep_group.sampling_rate_hz)
for endpoint in ep_group.endpoints
]
# Translate the text for each language
# Translate the text for each language (concurrently)
base_lang = "deu" # German is the base language
target_langs = ep_group.languages.copy()
if base_lang in target_langs:
target_langs.remove(base_lang)
ep_group.current_state = AnnouncementStates.TRANSLATING
endpoints_db.update_group(ep_group.id, ep_group)
# Create translation tasks
translations = {base_lang: text}
for lang in ep_group.languages:
if lang != base_lang:
# Translate the text
trans_conf = getattr(ep_group.translator_config, lang)
translation = await llm_translator.translate_de_to_x_async(
text=text,
target_language=lang,
client=trans_conf.llm_client,
model=trans_conf.translator_llm,
host=trans_conf.llm_host_url,
token=trans_conf.llm_host_token
)
translations[lang] = translation
log.info(f"Translated to {lang}: {translation}")
translation_tasks = []
for lang in target_langs:
# Prepare translation task
trans_conf = getattr(ep_group.translator_config, lang)
task = llm_translator.translate_de_to_x_async(
text=text,
target_language=lang,
client=trans_conf.llm_client,
model=trans_conf.translator_llm,
host=trans_conf.llm_host_url,
token=trans_conf.llm_host_token
)
translation_tasks.append(task)
# Wait for all translations to complete concurrently
results = await asyncio.gather(*translation_tasks)
for i, translation in enumerate(results):
lang = target_langs[i]
translations[lang] = translation
log.info(f"Translated to {lang}: {translation}")
# Generate voices
# Generate voices concurrently
ep_group.current_state = AnnouncementStates.GENERATING_VOICE
endpoints_db.update_group(ep_group.id, ep_group)
# Prepare synthesis jobs
audio = {}
# Convert each translation to audio
for lang, text in translations.items():
# Get the appropriate language configuration
# Define a helper function for voice synthesis
async def synthesize_text(lang, text):
    """Synthesize speech for one language without blocking the event loop.

    Args:
        lang: Language code; also the attribute name looked up on
            ``ep_group.translator_config`` to get that language's TTS settings.
        text: Text to synthesize for that language.

    Returns:
        A ``(lang, audio_data)`` tuple, where ``audio_data`` is the output of
        ``text_to_speech.synthesize(..., return_lc3=True)`` decoded with
        latin-1.
    """
    trans_conf = getattr(ep_group.translator_config, lang)

    def _synthesize():
        # Runs in a worker thread; decoding happens there as well so the
        # coroutine only receives the final value.
        return text_to_speech.synthesize(
            text,
            ep_group.sampling_rate_hz,
            trans_conf.tts_system,
            trans_conf.tts_model,
            return_lc3=True,
        ).decode('latin-1')

    # synthesize() is synchronous, so hand it to the loop's default executor.
    # get_running_loop() is the documented way to obtain the loop from inside
    # a coroutine.
    loop = asyncio.get_running_loop()
    audio_data = await loop.run_in_executor(None, _synthesize)
    return lang, audio_data
# Create tasks for voice synthesis
synthesis_tasks = [
synthesize_text(lang, text)
for lang, text in translations.items()
]
# Wait for all synthesis tasks to complete
synthesis_results = await asyncio.gather(*synthesis_tasks)
# Build audio dictionary from results
for lang, audio_data in synthesis_results:
audio[lang] = audio_data
# Convert text to LC3-encoded audio using the configuration's TTS settings
audio[lang] = text_to_speech.synthesize(
translations[lang],
ep_group.sampling_rate_hz, # Sample rate from auracast config # TODO: take sampling rate from auracast config
trans_conf.tts_system, # TTS system from config
trans_conf.tts_model, # TTS model from config
return_lc3=True
).decode('latin-1')
# Add to audio data dictionary (decode bytes to string for JSON serialization)
# make sure init finished before broadcasting
await asyncio.gather(*init_tasks)
# Start the monitoring coroutine to wait for streaming to complete
# This will set the state to COMPLETED when finished
asyncio.create_task(monitor_streaming_completion(ep_group))
# Broadcast to all endpoints in group
# Broadcast to all endpoints in group concurrently
broadcast_tasks = []
for endpoint in ep_group.endpoints:
# Send the audio data to the server using the existing send_audio function
log.info(f"Broadcastcasting to {endpoint.name} for languages: {', '.join(audio.keys())}")
await multicast_client.send_audio(audio, base_url=endpoint.url)
log.info(f"Broadcasting to {endpoint.name} for languages: {', '.join(audio.keys())}")
task = multicast_client.send_audio(audio, base_url=endpoint.url)
broadcast_tasks.append(task)
# Wait for all broadcasts to complete
await asyncio.gather(*broadcast_tasks)
# Return the translations
return {"translations": translations}