make it work with bumble auracast backend

This commit is contained in:
2025-02-25 11:53:49 +01:00
parent 18ecedfe45
commit 00895f52b2
6 changed files with 91 additions and 63 deletions
+13 -6
View File
@@ -3,28 +3,35 @@ import os
ANNOUNCEMENT_DIR = os.path.join(os.path.dirname(__file__), 'announcements')
VENV_DIR = os.path.join(os.path.dirname(__file__), '../venv')
PIPER_EXE_PATH = f'{VENV_DIR}/bin/piper'
SAMPLING_RATE_HZ = int(16e3)
FRAME_DUR_MS = 10
SAMPLING_RATE_HZ = int(16e3)
BITRATE_BPS = int(32e3)
def mk_filename_lc3(lang=''):
return f"{ANNOUNCEMENT_DIR}/announcement_{lang}_{SAMPLING_RATE_HZ//1000}_{FRAME_DUR_MS}_{BITRATE_BPS//1000}.lc3"
LANG_CONFIG = {
"de": {
"file": f"{ANNOUNCEMENT_DIR}/announcement_{SAMPLING_RATE_HZ//1000}_{FRAME_DUR_MS}_{BITRATE_BPS//1000}_de",
"filepath_wav": f"{ANNOUNCEMENT_DIR}/announcement_de.wav",
"filepath_wav_resamp": f"{ANNOUNCEMENT_DIR}/announcement_de_resamp.wav",
"tts": 'de_DE-kerstin-low',
},
"en": {
"file": f"{ANNOUNCEMENT_DIR}/announcement_{SAMPLING_RATE_HZ//1000}_{FRAME_DUR_MS}_{BITRATE_BPS//1000}_en",
"filepath_wav": f"{ANNOUNCEMENT_DIR}/announcement_en.wav",
"filepath_wav_resamp": f"{ANNOUNCEMENT_DIR}/announcement_en_resamp.wav",
"tts": 'en_US-lessac-medium'
},
"fr": {
"file": f"{ANNOUNCEMENT_DIR}/announcement_{SAMPLING_RATE_HZ//1000}_{FRAME_DUR_MS}_{BITRATE_BPS//1000}_fr",
"filepath_wav": f"{ANNOUNCEMENT_DIR}/announcement_fr.wav",
"filepath_wav_resamp": f"{ANNOUNCEMENT_DIR}/announcement_fr_resamp.wav",
"tts": 'fr_FR-siwis-medium'
},
# "es": {
# "file": f"{ANNOUNCEMENT_DIR}/announcement_{SAMPLING_RATE_HZ//1000}_{FRAME_DUR_MS}_{BITRATE_BPS//1000}_es",
# "filepath_wav": f"{ANNOUNCEMENT_DIR}/announcement_es.wav",
# "tts": 'es_ES-sharvard-medium'
# },
# "it": {
# "file": f"{ANNOUNCEMENT_DIR}/announcement_{SAMPLING_RATE_HZ//1000}_{FRAME_DUR_MS}_{BITRATE_BPS//1000}_it",
# "filepath_wav": f"{ANNOUNCEMENT_DIR}/announcement_it.wav",
# "tts": 'it_IT-paola-medium'
# }
}
+41 -34
View File
@@ -8,6 +8,7 @@ import asyncio
from copy import copy
import time
import logging as log
from utils import resample
from translator import llm_translator, test_content
from text_to_speech import text_to_speech
from encode import encode_lc3
@@ -15,75 +16,79 @@ from auracast import multicast_control
from auracast import auracast_config
from config import LANG_CONFIG, BITRATE_BPS, SAMPLING_RATE_HZ, FRAME_DUR_MS
# TODO: look for a end to end translation solution
def transcribe():
pass
pass # TODO: Implement transcribing input audio e.g. with whisper
def syntesize(text, tts_model, output_file):
audio_dur = text_to_speech.synthesize(text, tts_model, output_file)
#resample.resample(output_file, output_file, target_rate=SAMPLING_RATE_HZ)
#encode_lc3.encode_lc3(output_file, bps=BITRATE_BPS, frame_dur_ms=FRAME_DUR_MS)
def syntesize_resample(text, tts_model, file_wav, file_wav_resamp):
audio_dur = text_to_speech.synthesize(text, tts_model, file_wav)
resample.resample_file(file_wav, file_wav_resamp, target_rate=SAMPLING_RATE_HZ)
return audio_dur
def translate_from_german(text_de):
config = copy(LANG_CONFIG)
base_lang = "de"
file = config[base_lang]["file"]
audio_dur_s = {}
audio_dur_s [base_lang] = syntesize(text_de, config['de']["tts"], f'{file}.wav')
del config[base_lang]
file = config[base_lang]["filepath_wav"]
file_resamp = config[base_lang]['filepath_wav_resamp']
tts_json = {}
#tts_json[base_lang] = syntesize_resample(text_de, config['de']["tts"], file, file_resamp)
# delete source language since no translation is needed for it
#del config[base_lang]
for key, val in config.items():
text = llm_translator.translate_de_to_x(text_de, key)
file = val['file']
audio_dur_s[key] = syntesize(text, val['tts'], f'{file}.wav')
return audio_dur_s
if key == base_lang:
text = text_de
else:
text = llm_translator.translate_de_to_x(text_de, key)
file = val['filepath_wav']
file_resamp = val['filepath_wav_resamp']
tts_json[key] = syntesize_resample(text, val['tts'], file, file_resamp)
return tts_json
async def announcement_from_german_text(caster:multicast_control.Multicaster, text_de):
translate_from_german(text_de)
# Transfer the files to broadcaster memory
tts_json = translate_from_german(text_de)
start = time.time()
await caster.init_audio()
await caster.start_streaming()
#for val in LANG_CONFIG.values():
# copy_to_broadcaster(f'{val["file"]}.lc3')
rates = [d['audio']['sample_rate'] for d in tts_json.values()]
#for i, big in enumerate(caster.big_conf):
# big.input_format = f'int16le,{rates[i]},1'
#log.info("Transfering files to broadcaster took %s s", round(time.time() - start, 3))
#time.sleep(2)
# Instruct the broadcaster to stream the files
# for i, d in enumerate(list(LANG_CONFIG.items())):
# key, val = d
# broadcaster_play_file(i, f'{os.path.basename(val["file"])}.lc3')
# time.sleep(audio_durs[key])
await caster.init_audio()
caster.start_streaming()
log.info("Starting all broadcasts took %s s", round(time.time() - start, 3))
async def main():
log.basicConfig(
level=log.DEBUG,
format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
)
global_conf = auracast_config.global_base_config
#global_conf.transport='serial:/dev/serial/by-id/usb-SEGGER_J-Link_001057705357-if02,1000000,rtscts' # transport for nrf54l15dk
global_conf.transport='serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_81BD14B8D71B5662-if00,115200,rtscts' #nrf52dongle hci_uart usb cdc
big_conf = [ # TODO: integrate this in the LANG_CONFIG dict
big_conf = [ # TODO: integrate this in the LANG_CONFIG dict, better: make a hirachry of dataclasses
auracast_config.broadcast_de,
auracast_config.broadcast_en,
auracast_config.broadcast_fr,
#auracast_config.broadcast_es,
#auracast_config.broadcast_it,
]
files = [v['file'] for v in LANG_CONFIG.values()]
files = [v['filepath_wav_resamp'] for v in LANG_CONFIG.values()]
for i, conf in enumerate(big_conf):
conf.loop_wav = False
conf.audio_source = f'file:{files[i]}'
conf.input_format = 'int16le,48000,1' # TODO: Use actual samplint rate from piper
caster = multicast_control.Multicaster(global_conf, big_conf)
await caster.init_broadcast()
@@ -92,5 +97,7 @@ async def main():
await announcement_from_german_text(caster, test_content.TESTSENTENCE_DE_HELLO)
await asyncio.wait([caster.streamer.task])
if __name__ == '__main__':
asyncio.run(main())
@@ -1,18 +0,0 @@
# resample .wave from 22.05 to 24kHz sampling rate
import librosa
import soundfile as sf
def resample(filename, out_filename, target_rate=int(24e3)):
# Load the original audio file
audio, rate = librosa.load(filename)
# Convert the sample rate to 24 kHz
resampled_audio = librosa.resample(audio, orig_sr=rate, target_sr=target_rate)
# Save the resampled audio as a new .wav file
sf.write(out_filename, resampled_audio, target_rate)
if __name__ == "__main__":
resample('text_to_speech/welcome.wav', 'text_to_speech/welcome_resampled.wav')
@@ -3,7 +3,6 @@ import subprocess
import time
import json
import logging as log
import wave
from multilang_translator import config
TTS_DIR = os.path.join(os.path.dirname(__file__))
@@ -23,15 +22,21 @@ def synthesize(text, model="en_US-lessac-medium", output_file="out.wav"):
log.info('%s', ret.stderr)
assert ret.returncode == 0, 'Piper returncode was not 0.'
os.chdir(pwd)
log.info("Running piper took %s s", round(time.time() - start, 3))
log.info("Running piper for model %s took %s s", model, round(time.time() - start, 3))
with open (f'{model}.onnx.json') as f: # TODO: wrap everything into a class, store the json permanentl
model_json = json.load(f)
os.chdir(pwd)
return model_json
if __name__ == '__main__':
import logging
logging.basicConfig(
level=logging.INFO,
format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
)
synthesize('Hello World')
+27
View File
@@ -0,0 +1,27 @@
# resample .wav source to target sampling rate
import logging as log
import time
import os
import librosa
import soundfile as sf
def resample_file(filename, out_filename, target_rate=int(24e3)):
start=time.time()
# Load the original audio file
audio, rate = librosa.load(filename)
if rate == target_rate: # Nothing to do
sf.write(out_filename, audio, target_rate)
return
# Convert the sample rate to 24 kHz
resampled_audio = librosa.resample(audio, orig_sr=rate, target_sr=target_rate)
# Save the resampled audio as a new .wav file
sf.write(out_filename, resampled_audio, target_rate)
log.info("Resampling of %s took %s s", os.path.basename(filename), round(time.time() - start, 3))
if __name__ == "__main__":
resample_file('text_to_speech/welcome.wav', 'text_to_speech/welcome_resampled.wav')