Initial check-in of Python training code

This commit is contained in:
Michael Hansen
2022-11-11 11:01:59 -06:00
parent 344b483904
commit a6b2d2e69c
46 changed files with 27024 additions and 3 deletions

View File

@@ -0,0 +1,92 @@
from hashlib import sha256
from pathlib import Path
from typing import Optional, Tuple, Union
import librosa
import torch
from larynx_train.vits.mel_processing import spectrogram_torch
from .trim import trim_silence
from .vad import SileroVoiceActivityDetector
# Directory containing this module; used to locate bundled data files
# (e.g. the Silero VAD ONNX model under models/).
_DIR = Path(__file__).parent
def make_silence_detector() -> SileroVoiceActivityDetector:
    """Create a voice activity detector backed by the bundled Silero VAD ONNX model."""
    return SileroVoiceActivityDetector(_DIR / "models" / "silero_vad.onnx")
def cache_norm_audio(
    audio_path: Union[str, Path],
    cache_dir: Union[str, Path],
    detector: SileroVoiceActivityDetector,
    sample_rate: int,
    silence_threshold: float = 0.2,
    silence_samples_per_chunk: int = 480,
    silence_keep_chunks_before: int = 2,
    silence_keep_chunks_after: int = 2,
    filter_length: int = 1024,
    window_length: int = 1024,
    hop_length: int = 256,
    ignore_cache: bool = False,
) -> Tuple[Path, Path]:
    """Trim, resample, and spectrogram an audio file, caching results on disk.

    Silence is trimmed from the ends of the audio (keeping some padding
    chunks), the trimmed region is loaded at ``sample_rate``, and a linear
    spectrogram is computed from it.  Both tensors are cached in
    ``cache_dir`` keyed by the SHA256 of the absolute audio path, so repeated
    calls for the same file are cheap.

    Args:
        audio_path: Path to the source audio file.
        cache_dir: Directory for cached tensors (created if missing).
        detector: Voice activity detector used to locate the speech region.
        sample_rate: Target sample rate for the normalized audio.
        silence_threshold: Speech probability threshold for the VAD.
        silence_samples_per_chunk: VAD chunk size in samples (at 16 kHz).
        silence_keep_chunks_before: Silence chunks kept before first speech.
        silence_keep_chunks_after: Silence chunks kept after last speech.
        filter_length: FFT size for the spectrogram.
        window_length: STFT window size.
        hop_length: STFT hop size.
        ignore_cache: If True, recompute even when cached files exist.

    Returns:
        Tuple of (normalized audio ``.pt`` path, spectrogram ``.spec.pt`` path).
    """
    audio_path = Path(audio_path).absolute()
    cache_dir = Path(cache_dir)

    # Ensure the cache directory exists so torch.save below cannot fail with
    # FileNotFoundError on a fresh cache location.
    cache_dir.mkdir(parents=True, exist_ok=True)

    # Cache id is the SHA256 of the full audio path
    audio_cache_id = sha256(str(audio_path).encode()).hexdigest()
    audio_norm_path = cache_dir / f"{audio_cache_id}.pt"
    audio_spec_path = cache_dir / f"{audio_cache_id}.spec.pt"

    # Normalize audio
    audio_norm_tensor: Optional[torch.FloatTensor] = None
    if ignore_cache or (not audio_norm_path.exists()):
        # Trim silence first.
        #
        # The VAD model works on 16khz, so we determine the portion of audio
        # to keep and then just load that with librosa.
        vad_sample_rate = 16000
        audio_16khz, _sr = librosa.load(path=audio_path, sr=vad_sample_rate)
        offset_sec, duration_sec = trim_silence(
            audio_16khz,
            detector,
            threshold=silence_threshold,
            samples_per_chunk=silence_samples_per_chunk,
            sample_rate=vad_sample_rate,
            keep_chunks_before=silence_keep_chunks_before,
            keep_chunks_after=silence_keep_chunks_after,
        )

        # NOTE: audio is already in [-1, 1] coming from librosa.
        # Only the trimmed region is loaded, at the target sample rate.
        audio_norm_array, _sr = librosa.load(
            path=audio_path,
            sr=sample_rate,
            offset=offset_sec,
            duration=duration_sec,
        )

        # Save to cache directory; unsqueeze adds the channel dimension
        # expected by spectrogram_torch.
        audio_norm_tensor = torch.FloatTensor(audio_norm_array).unsqueeze(0)
        torch.save(audio_norm_tensor, audio_norm_path)

    # Compute spectrogram
    if ignore_cache or (not audio_spec_path.exists()):
        if audio_norm_tensor is None:
            # Load pre-cached normalized audio
            audio_norm_tensor = torch.load(audio_norm_path)

        audio_spec_tensor = spectrogram_torch(
            y=audio_norm_tensor,
            n_fft=filter_length,
            sampling_rate=sample_rate,
            hop_size=hop_length,
            win_size=window_length,
            center=False,
        ).squeeze(0)
        torch.save(audio_spec_tensor, audio_spec_path)

    return audio_norm_path, audio_spec_path

View File

@@ -0,0 +1,54 @@
from typing import Optional, Tuple
import numpy as np
from .vad import SileroVoiceActivityDetector
def trim_silence(
    audio_array: np.ndarray,
    detector: "SileroVoiceActivityDetector",
    threshold: float = 0.2,
    samples_per_chunk: int = 480,
    sample_rate: int = 16000,
    keep_chunks_before: int = 2,
    keep_chunks_after: int = 2,
) -> Tuple[float, Optional[float]]:
    """Return the (offset, duration) in seconds of the speech region of audio.

    The audio is scanned in fixed-size chunks; the detector is called on each
    chunk and chunks whose speech probability meets ``threshold`` bound the
    kept region, padded by ``keep_chunks_before``/``keep_chunks_after`` chunks
    of silence.  If no speech is detected, (0.0, None) is returned (i.e. keep
    the whole audio).

    Args:
        audio_array: Mono audio samples (1-D array).
        detector: Callable ``detector(chunk, sample_rate=...)`` returning a
            speech probability in [0, 1].
        threshold: Minimum probability to count a chunk as speech.
        samples_per_chunk: Chunk size in samples.
        sample_rate: Sample rate of ``audio_array`` in Hz.
        keep_chunks_before: Silence chunks kept before the first speech chunk.
        keep_chunks_after: Silence chunks kept after the last speech chunk.

    Returns:
        Tuple of (offset seconds, duration seconds or None for "to the end").
    """
    offset_sec: float = 0.0
    duration_sec: Optional[float] = None

    first_chunk: Optional[int] = None
    last_chunk: Optional[int] = None
    seconds_per_chunk: float = samples_per_chunk / sample_rate

    chunk_idx: int = 0
    chunk = audio_array[:samples_per_chunk]
    audio_array = audio_array[samples_per_chunk:]

    # Determine main block of speech.
    #
    # BUG FIX: the previous loop condition (len(audio_array) > 0) extracted
    # the next chunk before testing, so the final full chunk was never passed
    # to the detector and speech ending in it was missed.  Looping while the
    # current chunk is full-sized processes every full chunk; only a trailing
    # partial chunk is skipped (the VAD expects fixed-size input).
    while len(chunk) == samples_per_chunk:
        prob = detector(chunk, sample_rate=sample_rate)
        if prob >= threshold:
            if first_chunk is None:
                # First speech
                first_chunk = chunk_idx
            else:
                # Last speech so far
                last_chunk = chunk_idx

        chunk = audio_array[:samples_per_chunk]
        audio_array = audio_array[samples_per_chunk:]
        chunk_idx += 1

    if first_chunk is not None:
        if last_chunk is None:
            # BUG FIX: exactly one speech chunk previously produced no
            # trimming at all; the region is that single chunk.
            last_chunk = first_chunk

        first_chunk = max(0, first_chunk - keep_chunks_before)
        last_chunk = min(chunk_idx, last_chunk + keep_chunks_after)

        # Compute offset/duration
        offset_sec = first_chunk * seconds_per_chunk
        last_sec = (last_chunk + 1) * seconds_per_chunk
        duration_sec = last_sec - offset_sec

    return offset_sec, duration_sec

View File

@@ -0,0 +1,54 @@
import typing
from pathlib import Path
import numpy as np
import onnxruntime
class SileroVoiceActivityDetector:
    """Detects speech/silence using Silero VAD.

    https://github.com/snakers4/silero-vad
    """

    def __init__(self, onnx_path: typing.Union[str, Path]):
        """Load the Silero VAD ONNX model from ``onnx_path``."""
        onnx_path = str(onnx_path)

        # BUG FIX: thread counts must be configured via SessionOptions
        # *before* the session is created.  Assigning
        # intra_op_num_threads/inter_op_num_threads on an existing
        # InferenceSession just sets plain Python attributes and has no
        # effect on the ONNX runtime.
        sess_options = onnxruntime.SessionOptions()
        sess_options.intra_op_num_threads = 1
        sess_options.inter_op_num_threads = 1
        self.session = onnxruntime.InferenceSession(
            onnx_path, sess_options=sess_options
        )

        # LSTM hidden/cell state, carried across calls for streaming detection.
        self._h = np.zeros((2, 1, 64)).astype("float32")
        self._c = np.zeros((2, 1, 64)).astype("float32")

    def __call__(self, audio_array: np.ndarray, sample_rate: int = 16000):
        """Return probability of speech in audio [0-1].

        Audio must be 16 kHz mono; samples are cast to float32 before
        inference (callers here pass librosa floats in [-1, 1]).

        Raises:
            ValueError: On batched input, >2-D input, or a non-16kHz rate.
        """
        if len(audio_array.shape) == 1:
            # Add batch dimension
            audio_array = np.expand_dims(audio_array, 0)

        if len(audio_array.shape) > 2:
            raise ValueError(
                f"Too many dimensions for input audio chunk {audio_array.shape}"
            )

        if audio_array.shape[0] > 1:
            raise ValueError("Onnx model does not support batching")

        if sample_rate != 16000:
            raise ValueError("Only 16Khz audio is supported")

        ort_inputs = {
            "input": audio_array.astype(np.float32),
            "h0": self._h,
            "c0": self._c,
        }
        ort_outs = self.session.run(None, ort_inputs)
        # Recurrent state is fed back on the next call.
        out, self._h, self._c = ort_outs
        out = out.squeeze(2)[:, 1]  # make output type match JIT analog
        return out