do not save announcements to files anymore

This commit is contained in:
2025-03-04 13:44:38 +01:00
parent 1b48ade5d4
commit fc15604b8e
7 changed files with 133 additions and 39 deletions

3
.vscode/launch.json vendored
View File

@@ -10,7 +10,8 @@
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
"console": "integratedTerminal",
"justMyCode": true
}
]
}

5
.vscode/tasks.json vendored
View File

@@ -13,6 +13,11 @@
"label": "pip install -e bumble",
"type": "shell",
"command": "./venv/bin/python -m pip install -e ../bumble --config-settings editable_mode=compat"
},
{
"label": "pip install -e auracast",
"type": "shell",
"command": "./venv/bin/python -m pip install -e ../bumble-auracast --config-settings editable_mode=compat"
}
]
}

View File

@@ -7,8 +7,6 @@ FRAME_DUR_MS = 10
SAMPLING_RATE_HZ = int(16e3)
BITRATE_BPS = int(32e3)
def mk_filename_lc3(lang=''):
return f"{ANNOUNCEMENT_DIR}/announcement_{lang}_{SAMPLING_RATE_HZ//1000}_{FRAME_DUR_MS}_{BITRATE_BPS//1000}.lc3"
LANG_CONFIG = {
"de": {

View File

@@ -32,34 +32,34 @@ def syntesize_resample(text, tts_model, file_wav, file_wav_resamp):
return audio_dur
def translate_from_german(text_de, model):
async def announcement_from_german_text(
caster: multicast_control.Multicaster,
text_de
):
TRANSLATOR_LLM = 'llama3.2:3b-instruct-q4_0'
config = copy(LANG_CONFIG)
base_lang = "de"
file = config[base_lang]["filepath_wav"]
file_resamp = config[base_lang]['filepath_wav_resamp']
tts_json = {}
for key, val in config.items():
for i, d in enumerate(config.items()):
key, val = d
if key == base_lang:
text = text_de
else:
text = llm_translator.translate_de_to_x(text_de, key, model=model)
text = llm_translator.translate_de_to_x(text_de, key, model=TRANSLATOR_LLM)
log.info('%s', text)
file = val['filepath_wav']
file_resamp = val['filepath_wav_resamp']
tts_json[key] = syntesize_resample(text, val['tts'], file, file_resamp)
return tts_json
async def announcement_from_german_text(caster:multicast_control.Multicaster, text_de):
tts_json = translate_from_german(text_de, model = 'llama3.2:3b-instruct-q4_0')
lc3_audio = text_to_speech.synthesize(
text,
SAMPLING_RATE_HZ,
'piper',
val['tts'],
return_lc3=True
)
caster.big_conf[i].audio_source = lc3_audio
start = time.time()
await caster.init_audio()
caster.start_streaming()
log.info("Starting all broadcasts took %s s", round(time.time() - start, 3))
@@ -74,7 +74,7 @@ async def command_line_ui(caster: multicast_control.Multicaster):
prompt += "\n".join([f"{i}: {s}" for i,s in enumerate(sentence_list)])
prompt += "\n"
command = await aioconsole.ainput(prompt)
if command.strip().lower() == "quit":
print("👋 Exiting...")
if caster.device:
@@ -103,25 +103,22 @@ async def main():
#global_conf.transport='serial:/dev/serial/by-id/usb-SEGGER_J-Link_001057705357-if02,1000000,rtscts' # transport for nrf54l15dk
global_conf.transport='serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_81BD14B8D71B5662-if00,115200,rtscts' #nrf52dongle hci_uart usb cdc
big_conf = [
big_conf = [
auracast_config.broadcast_de,
auracast_config.broadcast_en,
auracast_config.broadcast_fr,
#auracast_config.broadcast_es,
#auracast_config.broadcast_it,
]
files = [v['filepath_wav_resamp'] for v in LANG_CONFIG.values()]
for i, conf in enumerate(big_conf):
conf.loop_wav = False
conf.audio_source = f'file:{files[i]}'
conf.loop = False
caster = multicast_control.Multicaster(global_conf, big_conf)
await caster.init_broadcast()
#await announcement_from_german_text(caster, test_content.TESTSENTENCE_DE_HELLO)
await command_line_ui(caster)
#await announcement_from_german_text(caster, test_content.TESTSENTENCE.DE_HELLO)
#await asyncio.wait([caster.streamer.task])
await command_line_ui(caster)
if __name__ == '__main__':
asyncio.run(main())

View File

@@ -0,0 +1,32 @@
import numpy as np
import lc3
def encode(
audio: np.array,
output_sample_rate_hz,
octets_per_frame,
frame_duration_us=10000,
pcm_bit_depth = 16
):
encoder = lc3.Encoder(
frame_duration_us=frame_duration_us,
sample_rate_hz=output_sample_rate_hz,
num_channels=1,
#input_sample_rate_hz=input_sample_rate,
)
lc3_frame_samples = encoder.get_frame_samples() # number of the pcm samples per lc3 frame
# reshape array into slices of lc3_frame_samples and padd with zeros
pad_width = (lc3_frame_samples - len(audio) % lc3_frame_samples) % lc3_frame_samples # Compute padding length
arr_padded = np.pad(audio, (0, pad_width), mode='constant', constant_values=0)
reshaped_arr = arr_padded.reshape(-1, lc3_frame_samples)
lc3_bytes = b''
for pcm_frame in reshaped_arr:
lc3_bytes += encoder.encode(
pcm_frame, num_bytes=octets_per_frame, bit_depth=pcm_bit_depth
)
return lc3_bytes

View File

@@ -3,40 +3,80 @@ import subprocess
import time
import json
import logging as log
import numpy as np
from multilang_translator import config
from multilang_translator.utils.resample import resample_array
from multilang_translator.text_to_speech import encode_lc3
TTS_DIR = os.path.join(os.path.dirname(__file__))
def synthesize(text, model="en_US-lessac-medium", output_file="out.wav"):
def synth_piper(text, model="en_US-lessac-medium",):
pwd = os.getcwd()
os.chdir(TTS_DIR)
start = time.time()
ret = subprocess.run( # TODO: wrap this whole thing in a class and open a permanent instance of the model
[config.PIPER_EXE_PATH, '--model', model, '--output_file', output_file],
ret = subprocess.run( # TODO: wrap this whole thing in a class and open a permanent pipe to the model
[config.PIPER_EXE_PATH,
'--cuda',
'--model', model,
'--output-raw'],
input=text.encode('utf-8'),
capture_output=True
)
log.info('%s', ret.stdout)
log.info('%s', ret.stderr)
log.warning('Piper stderr:\n%s', ret.stderr)
assert ret.returncode == 0, 'Piper returncode was not 0.'
audio = ret.stdout
log.info("Running piper for model %s took %s s", model, round(time.time() - start, 3))
with open (f'{model}.onnx.json') as f: # TODO: wrap everything into a class, store the json permanentl
with open (f'{model}.onnx.json') as f: # TODO: wrap everything into a class, store the json permanently
model_json = json.load(f)
os.chdir(pwd)
return model_json
return model_json, audio
# TODO: framework should probably be a dataclass that holds all the relevant informations, also model
# TODO: make a common repo that hold the configuration dataclasses ?
def synthesize(text, target_sample_rate, framework, model="en_US-lessac-medium", return_lc3=True):
if framework == 'piper':
model_json, audio_raw = synth_piper(text, model)
tts_sample_rate = model_json['audio']['sample_rate']
audio_np = np.frombuffer(audio_raw, dtype=np.dtype('<i2')).astype(np.float32) /(2**15-1)# convert to float fraction
audio = resample_array(audio_np, tts_sample_rate, target_sample_rate)
elif framework == 'koro':
pass
elif framework == 'xtts':
pass
elif framework == 'zonos':
pass
else: raise NotImplementedError('unknown framework')
if return_lc3:
audio_pcm = (audio_np * 2**15-1).astype(np.int16)
lc3 = encode_lc3.encode(audio_pcm, target_sample_rate, 40) # TODO: octetts per frame should be parameter
return lc3
else:
return audio
if __name__ == '__main__':
import logging
import soundfile as sf
logging.basicConfig(
level=logging.INFO,
format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
)
target_rate=16000
synthesize('Hello World')
audio = synthesize('Hello World', target_rate, 'piper', model= 'de_DE-kerstin-low', encode_lc3=False)
sf.write('hello.wav', audio, target_rate)
print('Done.')

View File

@@ -23,5 +23,26 @@ def resample_file(filename, out_filename, target_rate=int(24e3)):
log.info("Resampling of %s took %s s", os.path.basename(filename), round(time.time() - start, 3))
def resample_array(audio, rate, target_rate=int(24e3)):
start=time.time()
# Load the original audio file
if rate == target_rate: # Nothing to do
log.info('audio already at target rate, skipping resample')
return audio
# Convert the sample rate to target rate
resampled_audio = librosa.resample(audio, orig_sr=rate, target_sr=target_rate)
# Save the resampled audio as a new .wav file
log.info("Resampling took %s s", round(time.time() - start, 3))
return resampled_audio
if __name__ == "__main__":
resample_file('text_to_speech/welcome.wav', 'text_to_speech/welcome_resampled.wav')
import os
os.chdir(os.path.dirname(__file__))
file_dir = '../text_to_speech/'
resample_file(f'{file_dir}/welcome.wav', f'{file_dir}/welcome_resampled.wav')