restructure the project

This commit is contained in:
2025-03-06 08:46:03 +01:00
parent 7fa677d865
commit f14902c6e7
15 changed files with 22 additions and 20 deletions

View File

View File

@@ -0,0 +1,20 @@
import subprocess
import logging as log
def encode_lc3(file, frame_dur_ms=10, bps=48000):
    """Encode a .wav file to .lc3 using the external ``elc3`` encoder.

    Args:
        file: Input path; a ``.wav`` suffix is stripped, and the encoder reads
            ``<file>.wav`` and writes ``<file>.lc3``.
        frame_dur_ms: LC3 frame duration, passed to ``elc3 -m``.
        bps: Bitrate, passed to ``elc3 -b``.

    Returns:
        Tuple ``(returncode, stdout, stderr)`` of the encoder process.

    Raises:
        subprocess.CalledProcessError: if elc3 exits non-zero (check=True).
    """
    file = file.replace('.wav', '')
    cmd = ['elc3', '-m', f'{frame_dur_ms}', '-b', f'{bps}', f'{file}.wav', f'{file}.lc3']
    log.info("Executing: %s", " ".join(cmd))
    # BUG FIX: without capture_output=True, subprocess.run() leaves stdout and
    # stderr as None, so the values returned below were always useless.
    ret = subprocess.run(cmd, check=True, capture_output=True)
    return ret.returncode, ret.stdout, ret.stderr
if __name__ == '__main__':
    import os
    # Run from the script's own directory so the relative wav path resolves.
    os.chdir(os.path.dirname(__file__))
    # NOTE(review): stdout/stderr print as None because encode_lc3 runs the
    # subprocess without capturing output.
    r, stdout, stderr = encode_lc3('welcome_resampled.wav')
    print(r)
    print(stdout)
    print(stderr)

View File

@@ -0,0 +1,137 @@
# -*- coding: utf-8 -*-
"""
list prompt example
"""
from __future__ import print_function, unicode_literals
from typing import List
from dataclasses import asdict
import asyncio
import time
import logging as log
import aioconsole
from auracast import multicast_control
from auracast import auracast_config
import multilang_translator.translator_config as translator_config
from translator import llm_translator
from translator.test_content import TESTSENTENCE
from voice_provider import text_to_speech
# TODO: look for a end to end translation solution
def transcribe():
    """Placeholder for speech-to-text of the input audio; returns None.

    TODO: Implement transcribing input audio e.g. with whisper.
    """
async def announcement_from_german_text(
    global_config: auracast_config.AuracastGlobalConfig,
    translator_config: List[translator_config.TranslatorConfigDe],
    caster: multicast_control.Multicaster,
    text_de
):
    """Translate a German announcement into every configured language,
    synthesize each translation to LC3 audio, and start all broadcasts.

    Args:
        global_config: Global Auracast settings (supplies the sampling rate).
        translator_config: One translator config per broadcast language.
            NOTE(review): this parameter shadows the module imported as
            ``translator_config`` at file level — consider renaming.
        caster: Multicaster whose ``big_conf`` entries are assumed to be in
            the same order as ``translator_config`` — TODO confirm.
        text_de: The announcement text in German.
    """
    base_lang = "deu"
    for i, trans in enumerate(translator_config):
        # The base language needs no translation.
        if trans.big.language == base_lang:
            text = text_de
        else:
            text = llm_translator.translate_de_to_x(
                text_de,
                trans.big.language,
                model=trans.translator_llm,
                client = trans.llm_client,
                host=trans.llm_host_url,
                token=trans.llm_host_token
            )
        log.info('%s', text)
        # Synthesize straight to LC3 at the broadcast sampling rate.
        lc3_audio = text_to_speech.synthesize(
            text,
            global_config.auracast_sampling_rate_hz,
            trans.tts_system,
            trans.tts_model,
            return_lc3=True
        )
        # big_conf[i] is paired with translator_config[i] by position.
        caster.big_conf[i].audio_source = lc3_audio
    start = time.time()
    caster.start_streaming()
    log.info("Starting all broadcasts took %s s", round(time.time() - start, 3))
async def command_line_ui(global_conf, translator_conf, caster: multicast_control.Multicaster):
    """Interactive prompt loop: announce free text, pick a canned test
    sentence by number, or type 'quit' to stop streaming and shut down.

    Args:
        global_conf: Global Auracast configuration.
        translator_conf: Per-language translator configurations.
        caster: The active Multicaster.
    """
    while True:
        # Build the numbered menu of canned test sentences.
        sentence_list = list(asdict(TESTSENTENCE).values())
        prompt = "Enter your Announcement|quit or choose:] > \n"
        prompt += "\n".join([f"{i}: {s}" for i,s in enumerate(sentence_list)])
        prompt += "\n"
        command = await aioconsole.ainput(prompt)
        if command.strip().lower() == "quit":
            print("👋 Exiting...")
            if caster.device:
                caster.stop_streaming()
            await caster.shutdown()
            break  # Exit loop
        elif command.strip() == '':
            print('Nothing to Announce')
        # A single number selects a canned sentence.
        elif command.strip().isdigit():
            ind = int(command.strip())
            # Robustness: re-prompt instead of crashing on an out-of-range index.
            if ind >= len(sentence_list):
                print('Nothing to Announce')
                continue
            await announcement_from_german_text(
                global_conf,
                translator_conf,
                caster,
                sentence_list[ind])
            await asyncio.wait([caster.streamer.task])
        # Anything else is treated as a literal German announcement.
        else:
            # BUG FIX: was called as (caster, command), dropping the config
            # arguments and passing the caster where global_config belongs.
            await announcement_from_german_text(
                global_conf,
                translator_conf,
                caster,
                command)
            await asyncio.wait([caster.streamer.task])
async def main():
    """Configure logging, the HCI transport and per-language translators,
    then run the interactive announcement UI."""
    log.basicConfig(
        level=log.INFO,
        format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
    )
    global_conf = auracast_config.AuracastGlobalConfig()
    #global_conf.transport='serial:/dev/serial/by-id/usb-SEGGER_J-Link_001057705357-if02,1000000,rtscts' # transport for nrf54l15dk
    global_conf.transport='serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_81BD14B8D71B5662-if00,115200,rtscts' #nrf52dongle hci_uart usb cdc
    # One broadcast per language; add/remove configs here.
    translator_conf = [
        translator_config.TranslatorConfigDe(),
        translator_config.TranslatorConfigEn(),
        translator_config.TranslatorConfigFr(),
        #auracast_config.broadcast_es,
        #auracast_config.broadcast_it,
    ]
    for conf in translator_conf:
        conf.big.loop = False
        conf.llm_client = 'openwebui' # comment out for local llm
        conf.llm_host_url = 'https://ollama.pstruebi.xyz'
        # SECURITY(review): hard-coded API token committed to source control —
        # revoke it and load the secret from an environment variable instead.
        conf.llm_host_token = 'sk-17124cb84df14cc6ab2d9e17d0724d13'
    caster = multicast_control.Multicaster(global_conf, [conf.big for conf in translator_conf])
    await caster.init_broadcast()
    # await announcement_from_german_text(
    #     global_conf,
    #     translator_conf,
    #     caster,
    #     test_content.TESTSENTENCE.DE_HELLO
    # )
    # await asyncio.wait([caster.streamer.task])
    await command_line_ui(global_conf, translator_conf, caster)
if __name__ == '__main__':
    # Entry point: run the async UI until the user quits.
    asyncio.run(main())
# TODO: add support for multiple radios

View File

@@ -0,0 +1,91 @@
import time
import requests
import json
import logging as log
import time
import ollama
from multilang_translator.translator import syspromts
# ollama.create( # TODO: create models on startup
# model='example',
# from_='llama3.2', system="You are Mario from Super Mario Bros."
# )
async def chat():
    """Minimal async ollama chat example.

    Returns:
        The chat response object from the ollama AsyncClient.
    """
    message = {'role': 'user', 'content': 'Why is the sky blue?'}
    response = await ollama.AsyncClient().chat(model='llama3.2', messages=[message])
    # BUG FIX: the response was computed and silently discarded.
    return response
def query_openwebui(model, system, query, url, token):
    """Send a system+user chat-completion request to an Open WebUI server.

    Args:
        model: Model name on the server.
        system: System prompt.
        query: User message.
        url: Base URL of the Open WebUI instance.
        token: Bearer API token.

    Returns:
        The assistant message content of the first choice.

    Raises:
        requests.HTTPError: if the server answers with an error status.
        requests.Timeout: if the server does not answer within 120 s.
    """
    url = f'{url}/api/chat/completions'
    headers = {
        'Authorization': f'Bearer {token}',
    }
    payload = {
        'model': model,
        'messages': [
            {'role': 'system', 'content': system},
            {'role': 'user', 'content': query}
        ],
    }
    start = time.time()
    # timeout keeps a dead server from hanging the announcement pipeline;
    # raise_for_status surfaces auth/model errors instead of a KeyError on
    # the missing 'choices' key below.
    response = requests.post(url, headers=headers, json=payload, timeout=120)
    response.raise_for_status()
    log.info("Translating the text took %s s", round(time.time() - start, 2))
    return response.json()['choices'][0]['message']['content']
def query_ollama(model, system, query, host='http://localhost:11434'):
    """Run a system+user chat against an ollama server and return the reply text.

    Args:
        model: Ollama model tag.
        system: System prompt.
        query: User message.
        host: Ollama server URL. ``None`` is accepted and falls back to the
            local default, so callers can forward an unset config value.
    """
    # BUG FIX: translate_de_to_x forwards host=None when unconfigured, which
    # bypassed the parameter default; normalize it here.
    if host is None:
        host = 'http://localhost:11434'
    client = ollama.Client(
        host=host,
    )
    response = client.chat(
        model = model,
        messages = [
            {'role': 'system', 'content': system},
            {'role': 'user', 'content': query}
        ],
    )
    return response.message.content
# Map common 2-letter ISO 639-1 codes to the 3-letter suffixes used by the
# TRANSLATOR_DEU_* system prompts in syspromts.
_ISO_639_3 = {
    'en': 'eng', 'fr': 'fra', 'es': 'spa', 'it': 'ita', 'de': 'deu',
}

def translate_de_to_x( # TODO: use async ollama client later - implement a translate async function
    text:str,
    target_language: str,
    client='ollama',
    model='llama3.2:3b-instruct-q4_0', # remember to use instruct models
    host = None,
    token = None
):
    """Translate German text into ``target_language`` via an LLM.

    Args:
        text: German source text.
        target_language: Target language as a 3-letter code ('eng', 'fra', ...).
            Common 2-letter codes ('en', 'fr', ...) are also accepted.
        client: LLM client backend, 'ollama' or 'openwebui'.
        model: LLM model tag (use instruct models).
        host: LLM server URL (None -> client default).
        token: API token (openwebui only).

    Returns:
        The translated sentence.

    Raises:
        NotImplementedError: for an unknown ``client``.
        AttributeError: if no system prompt exists for the target language.
    """
    start = time.time()
    # BUG FIX: callers pass 2-letter codes ('en'), but the prompt constants
    # use 3-letter suffixes (TRANSLATOR_DEU_ENG); normalize before getattr.
    lang = _ISO_639_3.get(target_language.lower(), target_language)
    s = getattr(syspromts, f"TRANSLATOR_DEU_{lang.upper()}")
    if client == 'ollama':
        response = query_ollama(model, s, text, host=host)
    elif client == 'openwebui':
        response = query_openwebui(model, s, text, url=host, token=token)
    else:
        raise NotImplementedError('llm client not implemented')
    log.info('Running the translator to %s took %s s', target_language, round(time.time() - start, 3))
    return response
if __name__ == "__main__":
    from multilang_translator.translator import test_content
    # Smoke test: translate a short and a long sentence into two languages.
    # BUG FIX: use 3-letter codes matching the TRANSLATOR_DEU_* prompt names
    # ('en' had no TRANSLATOR_DEU_EN prompt), and the TESTSENTENCE instance
    # attribute (a module-level TESTSENTENCE_DE_RAINBOW constant does not
    # exist). The redundant local `import time` was dropped (imported above).
    start = time.time()
    response = translate_de_to_x('Der Zug ist da.', target_language='eng', model='llama3.2:1b-instruct-q4_0')
    print("Query took", time.time() - start)
    print(response)
    start = time.time()
    response = translate_de_to_x(test_content.TESTSENTENCE.DE_RAINBOW, target_language='eng')
    print("query took", time.time() - start)
    print(response)
    start = time.time()
    response = translate_de_to_x(test_content.TESTSENTENCE.DE_RAINBOW, target_language='fra')
    print("query took", time.time() - start)
    print(response)

View File

@@ -0,0 +1,6 @@
# System prompts instructing the LLM to translate a single German sentence.
# Grammar fixed: "die folgende Satz" -> "den folgenden Satz" (accusative),
# "mit der übersetzten Satz" -> "mit dem übersetzten Satz" (dative).
# TODO: make this more elegant. this can probably be generated and the base lang be assumed by the llm?
TRANSLATOR_DEU_ENG = 'Du bist ein Übersetzer. Übersetze den folgenden Satz aus dem Deutschen ins Englische. Antworte nur mit dem übersetzten Satz.\n'
TRANSLATOR_DEU_FRA = 'Du bist ein Übersetzer. Übersetze den folgenden Satz aus dem Deutschen ins Französische. Antworte nur mit dem übersetzten Satz.\n'
TRANSLATOR_DEU_SPA = 'Du bist ein Übersetzer. Übersetze den folgenden Satz aus dem Deutschen ins Spanische. Antworte nur mit dem übersetzten Satz.\n'
TRANSLATOR_DEU_ITA = 'Du bist ein Übersetzer. Übersetze den folgenden Satz aus dem Deutschen ins Italienische. Antworte nur mit dem übersetzten Satz.\n'

View File

@@ -0,0 +1,10 @@
from dataclasses import dataclass, fields, asdict
@dataclass
class TestContent:
    """Canned German announcement sentences for demos and smoke tests.

    Field order matters: the CLI menu enumerates ``asdict()`` values, which
    preserves declaration order, so the menu numbers map to these fields.
    """
    # Minimal hello-world sentence.
    DE_HELLO: str = 'Hallo Welt.'
    DE_GATE_OPENED: str = "Gate 23 ist jetzt geöffnet."
    DE_TRAIN_ARRIVING: str = "Der Zug Nach Wien fährt heute von Gleis 3."
    DE_SECURITY_CHECKPOINT_OPENING: str = "Sicherheitskontrolle 5 ist jetzt geöffnet. Bitte setzen Sie sich in Bewegung, um Ihre Wartezeit während Sicherheitsüberprüfungen zu minimieren."
    # Long sentence, useful for exercising translation/TTS latency.
    DE_RAINBOW: str = 'Der Regenbogen ist ein atmosphärisch-optisches Phänomen, das als kreisbogenförmiges farbiges Lichtband in einer von der Sonne beschienenen Wolke oder Regenwand wahrgenommen wird und ein großes Farbspektrum anzeigt.'

# Shared singleton instance imported by the UI and translator demos.
TESTSENTENCE = TestContent()

View File

@@ -0,0 +1,35 @@
import os
from pydantic import BaseModel
from auracast import auracast_config
# Path of the project's virtualenv, two directory levels above this file.
VENV_DIR = os.path.join(os.path.dirname(__file__), './../../venv')
class TranslatorBaseconfig(BaseModel):
    """Per-language translation/broadcast settings (pydantic model).

    Subclasses override ``big`` and ``tts_model`` for each target language.
    """
    # Auracast broadcast (BIG) configuration for this language.
    big: auracast_config.AuracastBigConfig = auracast_config.AuracastBigConfigDe()
    # LLM used for translation; instruct-tuned models expected.
    translator_llm: str = 'llama3.2:3b-instruct-q4_0'
    # LLM client backend: 'ollama' or 'openwebui'.
    llm_client: str = 'ollama'
    llm_host_url: str | None = 'http://localhost:11434'
    llm_host_token: str | None = None
    # TTS backend and voice model name.
    tts_system: str = 'piper'
    tts_model: str ='de_DE-kerstin-low'
class TranslatorConfigDe(TranslatorBaseconfig):
    """German: base language, no translation needed, high-quality voice."""
    big: auracast_config.AuracastBigConfig = auracast_config.AuracastBigConfigDe()
    tts_model: str ='de_DE-thorsten-high'

class TranslatorConfigEn(TranslatorBaseconfig):
    """English broadcast configuration."""
    big: auracast_config.AuracastBigConfig = auracast_config.AuracastBigConfigEn()
    tts_model: str = 'en_GB-alba-medium'

class TranslatorConfigFr(TranslatorBaseconfig):
    """French broadcast configuration."""
    big: auracast_config.AuracastBigConfig = auracast_config.AuracastBigConfigFr()
    tts_model: str = 'fr_FR-siwis-medium'

class TranslatorConfigEs(TranslatorBaseconfig):
    """Spanish broadcast configuration."""
    big: auracast_config.AuracastBigConfig = auracast_config.AuracastBigConfigEs()
    tts_model: str = 'es_ES-sharvard-medium'

class TranslatorConfigIt(TranslatorBaseconfig):
    """Italian broadcast configuration."""
    big: auracast_config.AuracastBigConfig = auracast_config.AuracastBigConfigIt()
    tts_model: str = 'it_IT-paola-medium'

View File

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,10 @@
#!/usr/bin/env bash
# Synthesize a demo sentence with piper into welcome.wav next to this script.
SCRIPT_DIR=$(dirname "$(readlink -f "$BASH_SOURCE")")
START_DIR=$(pwd)
cd "$SCRIPT_DIR" || exit 1
# BUG FIX: a trailing backslash after --output_file continued the command onto
# the 'cd' line, passing 'cd' and "$START_DIR" as extra piper arguments.
echo 'Welcome to the world of speech synthesis!' | piper \
    --model en_US-lessac-medium \
    --output_file "$SCRIPT_DIR/welcome.wav"
cd "$START_DIR"

View File

@@ -0,0 +1,89 @@
import os
import shutil
import subprocess
import time
import json
import logging as log
import numpy as np
from voice_provider.utils.resample import resample_array
from voice_provider.utils.encode_lc3 import encode_lc3
# Locate the piper executable on PATH; fall back to the project venv binary.
PIPER_EXE = shutil.which('piper')
TTS_DIR = os.path.join(os.path.dirname(__file__))
# piper is run from this directory so it finds voices.json and model files.
PIPER_WORKDIR = f'{TTS_DIR}/piper'
if not PIPER_EXE:
    PIPER_EXE = f'{TTS_DIR}/../../venv/bin/piper'
def synth_piper(text, model="en_US-lessac-medium"):
    """Synthesize ``text`` with the piper TTS command-line tool.

    Args:
        text: Text to speak.
        model: Piper voice model name (resolved inside PIPER_WORKDIR).

    Returns:
        Tuple ``(model_json, audio)``: the model's ``.onnx.json`` config dict
        and the raw PCM bytes piper wrote to stdout.

    Raises:
        RuntimeError: if piper exits with a non-zero return code.
    """
    pwd = os.getcwd()
    # make sure piper has voices.json in working directory, otherwise it
    # attempts to always load models
    os.chdir(PIPER_WORKDIR)
    start = time.time()
    try:
        ret = subprocess.run( # TODO: wrap this whole thing in a class and open a permanent pipe to the model
            [
                PIPER_EXE,
                '--cuda',
                '--model', model,
                '--output-raw'
            ],
            input=text.encode('utf-8'),
            capture_output=True
        )
    finally:
        # BUG FIX: always restore the working directory, even if
        # subprocess.run raises — the cwd used to leak on failure.
        os.chdir(pwd)
    log.warning('Piper stderr:\n%s', ret.stderr)
    # was an assert — stripped under `python -O`, so raise explicitly instead
    if ret.returncode != 0:
        raise RuntimeError('Piper returncode was not 0.')
    audio = ret.stdout
    log.info("Running piper for model %s took %s s", model, round(time.time() - start, 3))
    with open(f'{PIPER_WORKDIR}/{model}.onnx.json') as f: # TODO: wrap everything into a class, store the json permanently
        model_json = json.load(f)
    return model_json, audio
# TODO: framework should probably be a dataclass that holds all the relevant informations, also model
def synthesize(text, target_sample_rate, framework, model="en_US-lessac-medium", return_lc3=True):
    """Synthesize ``text`` to audio at ``target_sample_rate``.

    Args:
        text: Text to speak.
        target_sample_rate: Output sampling rate in Hz.
        framework: TTS backend; only 'piper' is implemented.
        model: Voice model name for the backend.
        return_lc3: If True, return LC3-encoded bytes; otherwise a float32
            numpy array with samples in [-1.0, 1.0].

    Raises:
        NotImplementedError: for backends that are not implemented yet or
            unknown.
    """
    if framework == 'piper':
        model_json, audio_raw = synth_piper(text, model)
        tts_sample_rate = model_json['audio']['sample_rate']
        # int16 little-endian PCM -> float fraction in [-1, 1]
        audio_np = np.frombuffer(audio_raw, dtype=np.dtype('<i2')).astype(np.float32) / (2**15 - 1)
        audio = resample_array(audio_np, tts_sample_rate, target_sample_rate)
    elif framework in ('koro', 'xtts', 'zonos'):
        # BUG FIX: these branches used to fall through silently, leaving
        # ``audio`` undefined and crashing with a NameError below.
        raise NotImplementedError(f'{framework} backend not implemented yet')
    else:
        raise NotImplementedError('unknown framework')
    if return_lc3:
        # BUG FIX: was ``audio * 2**15-1``, which scales by 2**15 and then
        # subtracts 1 (a DC offset); the intended full-scale factor is 2**15-1.
        audio_pcm = (audio * (2**15 - 1)).astype(np.int16)
        lc3 = encode_lc3(audio_pcm, target_sample_rate, 40) # TODO: octets per frame should be parameter
        return lc3
    return audio
if __name__ == '__main__':
    import logging
    import soundfile as sf
    logging.basicConfig(
        level=logging.INFO,
        format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
    )
    # Quick manual check: synthesize, resample to 16 kHz and write hello.wav.
    target_rate=16000
    audio = synthesize('Hello World', target_rate, 'piper', model= 'de_DE-kerstin-low', return_lc3=False)
    sf.write('hello.wav', audio, target_rate)
    # TODO: "WARNING:piper.download:Wrong size (expected=5952, actual=4158
    print('Done.')

View File

View File

@@ -0,0 +1,32 @@
import numpy as np
import lc3
def encode_lc3(
    audio: np.ndarray,  # annotation fixed: np.array is a function, not a type
    output_sample_rate_hz,
    octets_per_frame,
    frame_duration_us=10000,
    pcm_bit_depth = 16
):
    """Encode a mono PCM numpy array into a concatenated LC3 byte stream.

    Args:
        audio: 1-D PCM samples; assumed int16 to match the default
            ``pcm_bit_depth`` — TODO confirm at the call sites.
        output_sample_rate_hz: Sampling rate of ``audio`` in Hz.
        octets_per_frame: Target LC3 frame size in bytes (sets the bitrate).
        frame_duration_us: LC3 frame duration in microseconds.
        pcm_bit_depth: Bit depth of the input PCM samples.

    Returns:
        bytes: all encoded LC3 frames concatenated. The input is zero-padded
        up to a whole number of frames.
    """
    encoder = lc3.Encoder(
        frame_duration_us=frame_duration_us,
        sample_rate_hz=output_sample_rate_hz,
        num_channels=1,
        #input_sample_rate_hz=input_sample_rate,
    )
    lc3_frame_samples = encoder.get_frame_samples() # number of the pcm samples per lc3 frame
    # reshape array into slices of lc3_frame_samples and pad with zeros
    pad_width = (lc3_frame_samples - len(audio) % lc3_frame_samples) % lc3_frame_samples # Compute padding length
    arr_padded = np.pad(audio, (0, pad_width), mode='constant', constant_values=0)
    reshaped_arr = arr_padded.reshape(-1, lc3_frame_samples)
    lc3_bytes = b''
    for pcm_frame in reshaped_arr:
        lc3_bytes += encoder.encode(
            pcm_frame, num_bytes=octets_per_frame, bit_depth=pcm_bit_depth
        )
    return lc3_bytes

View File

@@ -0,0 +1,48 @@
# resample .wav source to target sampling rate
import logging as log
import time
import os
import librosa
import soundfile as sf
def resample_file(filename, out_filename, target_rate):
    """Resample an audio file to ``target_rate`` and write it as a new .wav.

    Args:
        filename: Input audio file path.
        out_filename: Output .wav path.
        target_rate: Target sampling rate in Hz.
    """
    start=time.time()
    # BUG FIX: librosa.load() defaults to sr=22050 and silently resamples on
    # load, so ``rate`` never reflected the file's native rate and the
    # short-circuit below compared against the wrong value. sr=None preserves
    # the original sampling rate.
    audio, rate = librosa.load(filename, sr=None)
    if rate == target_rate: # Nothing to do, just write to the output name
        sf.write(out_filename, audio, target_rate)
        return
    # Convert the sample rate to the target rate
    resampled_audio = librosa.resample(audio, orig_sr=rate, target_sr=target_rate)
    # Save the resampled audio as a new .wav file
    sf.write(out_filename, resampled_audio, target_rate)
    log.info("Resampling of %s took %s s", os.path.basename(filename), round(time.time() - start, 3))
def resample_array(audio, rate, target_rate):
    """Resample an in-memory audio array from ``rate`` to ``target_rate`` Hz.

    Args:
        audio: 1-D float audio samples (librosa convention) — assumed; TODO
            confirm against callers.
        rate: Current sampling rate in Hz.
        target_rate: Desired sampling rate in Hz.

    Returns:
        The resampled array, or ``audio`` unchanged if already at the target
        rate.
    """
    start=time.time()
    if rate == target_rate: # Nothing to do
        log.info('audio already at target rate, skipping resample')
        return audio
    # Convert the sample rate to target rate
    resampled_audio = librosa.resample(audio, orig_sr=rate, target_sr=target_rate)
    log.info("Resampling took %s s", round(time.time() - start, 3))
    return resampled_audio
if __name__ == "__main__":
    import os
    # Run relative to this file so the sibling directory path resolves.
    os.chdir(os.path.dirname(__file__))
    file_dir = '../text_to_speech/'
    # BUG FIX: resample_file requires target_rate; the call was missing the
    # third argument and raised TypeError. 16 kHz matches the TTS demo rate —
    # TODO confirm the intended broadcast sampling rate.
    resample_file(f'{file_dir}/welcome.wav', f'{file_dir}/welcome_resampled.wav', 16000)