From 530d58440f736c6c2cb4eb9550b2897083aac621 Mon Sep 17 00:00:00 2001 From: pober Date: Fri, 24 Apr 2026 08:39:03 +0200 Subject: [PATCH] Adds null test to artifact. --- config.yaml | 21 ++ src/audio_tests.py | 477 ++++++++++++++++++++++++++++++++++++- test_artifact_detection.py | 33 ++- 3 files changed, 516 insertions(+), 15 deletions(-) diff --git a/config.yaml b/config.yaml index 1c9f13e..f48a9df 100644 --- a/config.yaml +++ b/config.yaml @@ -38,6 +38,27 @@ artifact_detection: energy_variation: enabled: true threshold_db: 6.0 # Energy change threshold in dB between consecutive windows (detects level changes) + null_test: + enabled: true + # Align Ch2 (DUT) to Ch1 (Loopback), subtract, detect bursts in residual + max_lag_ms: 500.0 # Maximum expected delay between channels for alignment search + window_ms: 5.0 # Short-time RMS window length for burst detection + threshold_factor: 6.0 # Flag windows where residual RMS exceeds baseline × this factor + min_burst_ms: 0.5 # Minimum burst duration to report (filters out single-sample spikes) + sample_slip_detection: true + sample_slip_window_ms: 50.0 # Correlation window for xcorr-based lag tracking (fallback only) + # Sync marker: one chirp burst at the start and one at the end of the played signal. + # Marker-based alignment is immune to periodic-signal ambiguity (e.g. pure sine). + # Sample slip detection compares the lag at the start marker vs. the end marker. + marker_duration_sec: 0.05 # Length of each chirp marker burst + marker_first_offset_sec: 0.5 # Offset from signal start (and from signal end) for markers + marker_f0: 200.0 # Marker chirp start frequency (Hz) + marker_f1: 16000.0 # Marker chirp end frequency (Hz) — wider BW = sharper correlation peak + marker_amplitude: 0.7 # Marker amplitude (mixed on top of test tone) + # Sample slip threshold: only report if inter-marker lag deviates from median by >= this many samples. + # Rule of thumb: peak timing precision ≈ 1 / (marker_f1 - marker_f0) * sample_rate + # With 200-16000 Hz BW at 44100 Hz: precision ≈ 3 samples → min_slip_samples = 5 gives good margin. + min_slip_samples: 5 latency: max_std_dev_ms: 1.0 # Maximum allowed std deviation; test fails if exceeded diff --git a/src/audio_tests.py b/src/audio_tests.py index 8aaa324..73915ee 100644 --- a/src/audio_tests.py +++ b/src/audio_tests.py @@ -2,7 +2,8 @@ import time import numpy as np import sounddevice as sd from scipy import signal -from typing import Tuple, Dict, List +from scipy.io import wavfile +from typing import Tuple, Dict, List, Optional import matplotlib.pyplot as plt from pathlib import Path @@ -533,6 +534,312 @@ def detect_artifacts_energy_variation(signal_data: np.ndarray, sample_rate: int, return artifacts +def generate_sync_marker(sample_rate: int, duration_sec: float = 0.05, + f0: float = 200.0, f1: float = 4000.0, + amplitude: float = 0.7) -> np.ndarray: + t = np.linspace(0, duration_sec, int(sample_rate * duration_sec), endpoint=False) + win = np.hanning(len(t)) + return amplitude * win * signal.chirp(t, f0, duration_sec, f1, method='linear') + + +def embed_markers(base_signal: np.ndarray, marker: np.ndarray, + sample_rate: int, interval_sec: float = 5.0, + first_offset_sec: float = 0.5) -> Tuple[np.ndarray, List[int]]: + out = base_signal.copy() + marker_len = len(marker) + positions: List[int] = [] + t = first_offset_sec + total_sec = len(base_signal) / sample_rate + while t + marker_len / sample_rate + 0.1 <= total_sec: + pos = int(t * sample_rate) + end = pos + marker_len + out[pos:end] = np.clip(out[pos:end] + marker, -1.0, 1.0) + positions.append(pos) + t += interval_sec + return out, positions + + +def find_all_marker_positions(channel: np.ndarray, marker_template: np.ndarray, + expected_positions: List[int], sample_rate: int, + search_radius_ms: float = 500.0) -> List[Optional[int]]: + corr = np.abs(signal.correlate(channel, marker_template, mode='valid')) + search_radius = int(search_radius_ms / 1000.0 * sample_rate) + found: List[Optional[int]] = [] + for expected in expected_positions: + lo = max(0, expected - search_radius) + hi = min(len(corr), expected + search_radius + 1) + if lo >= hi: + found.append(None) + continue + local_idx = int(np.argmax(corr[lo:hi])) + found.append(lo + local_idx) + return found + + +def align_by_markers(reference: np.ndarray, dut: np.ndarray, + marker_template: np.ndarray, expected_positions: List[int], + sample_rate: int, max_lag_ms: float = 500.0) -> Tuple[np.ndarray, np.ndarray, int]: + ref_pos = find_all_marker_positions(reference, marker_template, expected_positions, + sample_rate, max_lag_ms) + dut_pos = find_all_marker_positions(dut, marker_template, expected_positions, + sample_rate, max_lag_ms) + lags = [dp - rp for rp, dp in zip(ref_pos, dut_pos) + if rp is not None and dp is not None] + if not lags: + return align_channels(reference, dut, sample_rate, max_lag_ms) + lag_samples = int(round(float(np.median(lags)))) + if lag_samples >= 0: + ref_aligned = reference[:len(reference) - lag_samples] + dut_aligned = dut[lag_samples:] + else: + ref_aligned = reference[-lag_samples:] + dut_aligned = dut[:len(dut) + lag_samples] + min_len = min(len(ref_aligned), len(dut_aligned)) + return ref_aligned[:min_len], dut_aligned[:min_len], lag_samples + + +def detect_sample_slips_by_markers(reference: np.ndarray, dut: np.ndarray, + marker_template: np.ndarray, + expected_ref_positions: List[int], + sample_rate: int, lag_samples: int, + min_slip_samples: int = 3) -> List[Dict]: + artifacts: List[Dict] = [] + total_duration = len(reference) / sample_rate + expected_dut_positions = [p + lag_samples for p in expected_ref_positions] + ref_found = find_all_marker_positions(reference, marker_template, expected_ref_positions, + sample_rate, search_radius_ms=200.0) + dut_found = find_all_marker_positions(dut, marker_template, expected_dut_positions, + sample_rate, search_radius_ms=200.0) + interval_lags: List[Optional[int]] = [ + (dp - rp) if rp is not None and dp is not None else None + for rp, dp in zip(ref_found, dut_found) + ] + valid_lags = [v for v in interval_lags if v is not None] + baseline_lag = int(round(float(np.median(valid_lags)))) if valid_lags else 0 + for i in range(len(interval_lags)): + if interval_lags[i] is None: + continue + delta = int(interval_lags[i]) - baseline_lag + if abs(delta) >= min_slip_samples: + time_sec = float(expected_ref_positions[i] / sample_rate) + if time_sec < 1.0 or time_sec > total_duration - 1.0: + continue + artifacts.append({ + 'type': 'sample_slip', + 'time_sec': round(time_sec, 4), + 'lag_change_samples': int(delta), + 'lag_baseline': baseline_lag, + 'lag_at_marker': int(interval_lags[i]), + }) + return artifacts + + +def align_channels(reference: np.ndarray, dut: np.ndarray, sample_rate: int, + max_lag_ms: float = 500.0) -> Tuple[np.ndarray, np.ndarray, int]: + max_lag_samples = int(max_lag_ms / 1000.0 * sample_rate) + + # Use a representative middle segment (up to 4 s) for robust correlation + seg_len = min(len(reference), len(dut), int(sample_rate * 4)) + mid = min(len(reference), len(dut)) // 2 + ref_seg = reference[mid - seg_len // 2: mid + seg_len // 2] + dut_seg = dut[mid - seg_len // 2: mid + seg_len // 2] + + correlation = signal.correlate(dut_seg, ref_seg, mode='full') + lags = np.arange(-(len(ref_seg) - 1), len(ref_seg)) + valid_mask = np.abs(lags) <= max_lag_samples + corr_masked = np.where(valid_mask, np.abs(correlation), 0) + lag_samples = int(lags[np.argmax(corr_masked)]) + + if lag_samples >= 0: + ref_aligned = reference[:len(reference) - lag_samples] + dut_aligned = dut[lag_samples:] + else: + ref_aligned = reference[-lag_samples:] + dut_aligned = dut[:len(dut) + lag_samples] + + min_len = min(len(ref_aligned), len(dut_aligned)) + return ref_aligned[:min_len], dut_aligned[:min_len], lag_samples + + +def compute_residual(ref_aligned: np.ndarray, dut_aligned: np.ndarray) -> Tuple[np.ndarray, float]: + ref_rms = np.sqrt(np.mean(ref_aligned ** 2)) + if ref_rms < 1e-10: + return dut_aligned.copy(), 1.0 + gain = float(np.sqrt(np.mean(dut_aligned ** 2)) / ref_rms) + residual = dut_aligned - gain * ref_aligned + return residual, gain + + +def detect_glitches_short_time_energy(residual: np.ndarray, sample_rate: int, + window_ms: float = 5.0, + threshold_factor: float = 6.0, + min_burst_ms: float = 0.5) -> List[Dict]: + from scipy.ndimage import uniform_filter1d + artifacts = [] + window_samples = max(4, int(window_ms / 1000.0 * sample_rate)) + min_burst_samples = max(1, int(min_burst_ms / 1000.0 * sample_rate)) + total_duration = len(residual) / sample_rate + + rms_envelope = np.sqrt(np.maximum( + uniform_filter1d(residual ** 2, size=window_samples, mode='reflect'), 0.0)) + + baseline_rms = np.median(rms_envelope) + if baseline_rms < 1e-12: + return artifacts + + exceed = rms_envelope > threshold_factor * baseline_rms + changes = np.diff(exceed.astype(np.int8), prepend=0, append=0) + burst_starts = np.where(changes == 1)[0] + burst_ends = np.where(changes == -1)[0] + + for bstart, bend in zip(burst_starts, burst_ends): + burst_len = int(bend - bstart) + if burst_len < min_burst_samples: + continue + time_sec = float(bstart / sample_rate) + if time_sec < 1.0 or time_sec > total_duration - 1.0: + continue + peak_rms = float(np.max(rms_envelope[bstart:bend])) + artifacts.append({ + 'type': 'null_test_glitch', + 'time_sec': round(time_sec, 4), + 'duration_ms': round(burst_len / sample_rate * 1000.0, 2), + 'peak_residual_rms': round(peak_rms, 8), + 'baseline_rms': round(float(baseline_rms), 8), + 'deviation_factor': round(peak_rms / baseline_rms, 2) + }) + + return artifacts + + +def detect_sample_slips(reference: np.ndarray, dut: np.ndarray, sample_rate: int, + window_ms: float = 50.0, step_ms: float = 100.0) -> List[Dict]: + artifacts = [] + window_samples = int(window_ms / 1000.0 * sample_rate) + step_samples = int(step_ms / 1000.0 * sample_rate) + max_search = min(window_samples // 4, int(0.010 * sample_rate)) # ±10 ms + + min_len = min(len(reference), len(dut)) + total_duration = min_len / sample_rate + if min_len < window_samples * 2: + return artifacts + + lags_over_time = [] + times = [] + + for start in range(0, min_len - window_samples, step_samples): + ref_seg = reference[start: start + window_samples] + dut_seg = dut[start: start + window_samples] + corr = signal.correlate(dut_seg, ref_seg, mode='full') + lags = np.arange(-(len(ref_seg) - 1), len(ref_seg)) + valid = np.abs(lags) <= max_search + best_lag = int(lags[np.argmax(np.where(valid, np.abs(corr), 0))]) + lags_over_time.append(best_lag) + times.append(float(start / sample_rate)) + + if len(lags_over_time) < 3: + return artifacts + + lags_arr = np.array(lags_over_time) + for i in range(1, len(lags_arr)): + delta = int(lags_arr[i]) - int(lags_arr[i - 1]) + if abs(delta) >= 1: + time_sec = times[i] + if time_sec < 1.0 or time_sec > total_duration - 1.0: + continue + artifacts.append({ + 'type': 'sample_slip', + 'time_sec': round(time_sec, 4), + 'lag_change_samples': int(delta), + 'lag_before': int(lags_arr[i - 1]), + 'lag_after': int(lags_arr[i]) + }) + + return artifacts + + +def detect_artifacts_null_test(reference: np.ndarray, dut: np.ndarray, + sample_rate: int, null_test_config: Dict, + marker_template: Optional[np.ndarray] = None, + marker_positions: Optional[List[int]] = None) -> Dict: + result = { + 'enabled': null_test_config.get('enabled', True), + 'lag_samples': 0, + 'lag_ms': 0.0, + 'gain_factor': 1.0, + 'residual_rms': 0.0, + 'residual_peak': 0.0, + 'total_count': 0, + 'by_type': {}, + 'artifacts': [], + '_ref_aligned': None, + '_dut_aligned': None, + '_residual': None, + } + + if not result['enabled']: + return result + + # Guard: skip if either channel is silent/corrupted (e.g. ALSA underrun) + min_signal_level = 1e-4 + if np.max(np.abs(reference)) < min_signal_level or np.max(np.abs(dut)) < min_signal_level: + result['error'] = 'recording_too_quiet_or_corrupted' + print(" ⚠ Null test skipped: one or both channels are silent (possible ALSA underrun).") + return result + + max_lag_ms = float(null_test_config.get('max_lag_ms', 500.0)) + + if marker_template is not None and marker_positions: + ref_aligned, dut_aligned, lag_samples = align_by_markers( + reference, dut, marker_template, marker_positions, sample_rate, max_lag_ms) + alignment_method = 'marker' + else: + ref_aligned, dut_aligned, lag_samples = align_channels( + reference, dut, sample_rate, max_lag_ms) + alignment_method = 'xcorr' + + result['lag_samples'] = lag_samples + result['lag_ms'] = round(float(lag_samples) / sample_rate * 1000.0, 3) + result['alignment_method'] = alignment_method + + residual, gain = compute_residual(ref_aligned, dut_aligned) + result['gain_factor'] = round(gain, 6) + result['residual_rms'] = round(float(np.sqrt(np.mean(residual ** 2))), 8) + result['residual_peak'] = round(float(np.max(np.abs(residual))), 6) + + result['_ref_aligned'] = ref_aligned + result['_dut_aligned'] = dut_aligned + result['_residual'] = residual + + all_artifacts = [] + + window_ms = float(null_test_config.get('window_ms', 5.0)) + threshold_factor = float(null_test_config.get('threshold_factor', 6.0)) + min_burst_ms = float(null_test_config.get('min_burst_ms', 0.5)) + glitches = detect_glitches_short_time_energy( + residual, sample_rate, window_ms, threshold_factor, min_burst_ms) + all_artifacts.extend(glitches) + + if null_test_config.get('sample_slip_detection', True): + if marker_template is not None and marker_positions and len(marker_positions) >= 2: + min_slip_samples = int(null_test_config.get('min_slip_samples', 3)) + slips = detect_sample_slips_by_markers( + reference, dut, marker_template, marker_positions, sample_rate, lag_samples, + min_slip_samples=min_slip_samples) + else: + slip_window_ms = float(null_test_config.get('sample_slip_window_ms', 50.0)) + slips = detect_sample_slips(ref_aligned, dut_aligned, sample_rate, slip_window_ms) + all_artifacts.extend(slips) + + result['total_count'] = len(all_artifacts) + result['artifacts'] = all_artifacts + for a in all_artifacts: + t = a['type'] + result['by_type'][t] = result['by_type'].get(t, 0) + 1 + + return result + + def measure_frequency_accuracy(signal_data: np.ndarray, sample_rate: int, expected_freq: float) -> Dict: """ @@ -737,18 +1044,29 @@ def plot_deviation_histogram(artifacts_ch1: Dict, artifacts_ch2: Dict, output_di if not all_devs: return - bin_min = int(np.floor(min(all_devs))) - bin_max = int(np.ceil(max(all_devs))) + 1 - bins = np.arange(bin_min, bin_max + 1) + raw_min = min(all_devs) + raw_max = max(all_devs) + MAX_BINS = 50 - counts_ch1, _ = np.histogram(dev_ch1, bins=bins) - counts_ch2, _ = np.histogram(dev_ch2, bins=bins) + if raw_max / max(raw_min, 1e-9) > 20 or (raw_max - raw_min) > MAX_BINS: + bins = np.logspace(np.log10(max(raw_min, 0.1)), np.log10(raw_max + 1), MAX_BINS + 1) + bin_labels = [f"{bins[i]:.1f}-{bins[i+1]:.1f}" for i in range(len(bins) - 1)] + else: + bin_min = int(np.floor(raw_min)) + bin_max = int(np.ceil(raw_max)) + 1 + bins = np.arange(bin_min, bin_max + 1) + if len(bins) > MAX_BINS + 1: + bins = np.linspace(bin_min, bin_max, MAX_BINS + 1) + bin_labels = [f"{bins[i]:.1f}-{bins[i+1]:.1f}" for i in range(len(bins) - 1)] + + counts_ch1, _ = np.histogram(dev_ch1 if dev_ch1 else [0], bins=bins) + counts_ch2, _ = np.histogram(dev_ch2 if dev_ch2 else [0], bins=bins) - bin_labels = [f"{bins[i]}-{bins[i+1]}" for i in range(len(bins) - 1)] x = np.arange(len(bin_labels)) width = 0.4 - fig, ax = plt.subplots(figsize=(max(10, len(bin_labels) * 0.7), 6)) + fig_width = min(24, max(10, len(bin_labels) * 0.5)) + fig, ax = plt.subplots(figsize=(fig_width, 6)) bars1 = ax.bar(x - width / 2, counts_ch1, width, label='Ch1 Loopback', color='steelblue', alpha=0.85) bars2 = ax.bar(x + width / 2, counts_ch2, width, label='Ch2 DUT/Radio', color='tomato', alpha=0.85) @@ -776,6 +1094,82 @@ def plot_deviation_histogram(artifacts_ch1: Dict, artifacts_ch2: Dict, output_di plt.close() +def plot_null_test(null_test_result: Dict, sample_rate: int, output_dir: Path, + marker_positions: Optional[List[int]] = None, + marker_len_samples: int = 0): + from scipy.ndimage import uniform_filter1d + + ref_aligned = null_test_result['_ref_aligned'] + dut_aligned = null_test_result['_dut_aligned'] + residual = null_test_result['_residual'] + + if ref_aligned is None or residual is None: + return + + total_duration = len(ref_aligned) / sample_rate + fig, axes = plt.subplots(3, 1, figsize=(16, 12)) + time = np.arange(len(ref_aligned)) / sample_rate + + axes[0].plot(time, ref_aligned, alpha=0.6, linewidth=0.5, label='Ch1 Loopback (aligned)') + axes[0].plot(time, dut_aligned, alpha=0.6, linewidth=0.5, label='Ch2 DUT (aligned)') + axes[0].set_ylabel('Amplitude') + axes[0].set_title( + f'Null Test — Aligned Channels ' + f'(lag={null_test_result["lag_ms"]:.2f} ms, ' + f'gain={null_test_result["gain_factor"]:.4f})') + axes[0].legend(loc='upper right', fontsize=8) + axes[0].grid(True, alpha=0.3) + + axes[1].plot(time, residual, alpha=0.8, linewidth=0.4, color='purple') + axes[1].set_ylabel('Amplitude') + axes[1].set_title( + f'Residual e[n] = DUT − gain·Ref ' + f'(RMS={null_test_result["residual_rms"]:.2e}, ' + f'peak={null_test_result["residual_peak"]:.4f})') + for a in null_test_result['artifacts']: + color = 'red' if a['type'] == 'null_test_glitch' else 'orange' + axes[1].axvline(x=a['time_sec'], color=color, alpha=0.6, linewidth=1.0) + axes[1].grid(True, alpha=0.3) + + window_samples = max(4, int(5.0 / 1000.0 * sample_rate)) + rms_envelope = np.sqrt(np.maximum( + uniform_filter1d(residual ** 2, size=window_samples, mode='reflect'), 0.0)) + + axes[2].plot(time, rms_envelope, linewidth=0.6, color='darkgreen', label='Short-time RMS (5 ms)') + axes[2].set_xlabel('Time (s)') + axes[2].set_ylabel('Short-time RMS') + axes[2].set_title('Short-time RMS of Residual') + axes[2].grid(True, alpha=0.3) + + # Determine x-window for lower two axes: exclude marker regions at start and end + margin_sec = 0.5 + if marker_positions and len(marker_positions) >= 2 and marker_len_samples > 0: + x_min = (marker_positions[0] + marker_len_samples) / sample_rate + margin_sec + x_max = marker_positions[-1] / sample_rate - margin_sec + else: + x_min = margin_sec + x_max = total_duration - margin_sec + + if x_min < x_max: + axes[1].set_xlim(x_min, x_max) + axes[2].set_xlim(x_min, x_max) + + # Recompute baseline from the visible region only (unaffected by marker bursts) + i_min = max(0, int(x_min * sample_rate)) + i_max = min(len(rms_envelope), int(x_max * sample_rate)) + baseline = float(np.median(rms_envelope[i_min:i_max])) + else: + baseline = float(np.median(rms_envelope)) + + axes[2].axhline(y=baseline, color='steelblue', linestyle='--', linewidth=1, + label=f'Baseline ({baseline:.2e})') + axes[2].legend(fontsize=8) + + plt.tight_layout() + plt.savefig(output_dir / 'null_test_residual.png', dpi=150, bbox_inches='tight') + plt.close() + + def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_dir: Path = None) -> Dict: import time @@ -786,6 +1180,7 @@ def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_d device_name = config['audio']['device_name'] channels = config['audio']['channels'] detector_config = config['artifact_detection']['detectors'] + null_test_config = detector_config.get('null_test', {}) startup_delay = config['artifact_detection'].get('startup_delay', 10) signal_type = config['artifact_detection'].get('signal_type', 'sine') @@ -796,6 +1191,10 @@ def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_d time.sleep(startup_delay) print("Starting recording...") + use_null_test = signal_type != 'silent' and null_test_config.get('enabled', True) + marker_template: Optional[np.ndarray] = None + marker_positions: Optional[List[int]] = [] + if signal_type == 'chirp': f0 = config['artifact_detection'].get('chirp_f0', 100) f1 = config['artifact_detection'].get('chirp_f1', 8000) @@ -804,22 +1203,67 @@ def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_d recording = play_and_record(tone, sample_rate, device_ids, channels) elif signal_type == 'silent': frequency = 1000 - recording = sd.rec(int(duration * sample_rate), samplerate=sample_rate, + recording = sd.rec(int(duration * sample_rate), samplerate=sample_rate, channels=channels, device=device_ids[0], blocking=True) else: tone = generate_test_tone(frequency, duration, sample_rate, amplitude) + if use_null_test: + marker_duration_sec = float(null_test_config.get('marker_duration_sec', 0.05)) + marker_offset_sec = float(null_test_config.get('marker_first_offset_sec', 0.5)) + marker_template = generate_sync_marker( + sample_rate, + duration_sec=marker_duration_sec, + f0=float(null_test_config.get('marker_f0', 200.0)), + f1=float(null_test_config.get('marker_f1', 16000.0)), + amplitude=float(null_test_config.get('marker_amplitude', 0.7)), + ) + marker_len = len(marker_template) + pos_start = int(marker_offset_sec * sample_rate) + pos_end = len(tone) - int(marker_offset_sec * sample_rate) - marker_len + marker_positions = [pos_start, pos_end] + tone = tone.copy() + for pos in marker_positions: + tone[pos:pos + marker_len] = np.clip( + tone[pos:pos + marker_len] + marker_template, -1.0, 1.0) + print(f" Embedded sync markers at start ({marker_offset_sec:.1f}s) and end " + f"({pos_end/sample_rate:.1f}s) " + f"(chirp 200→16000 Hz, {marker_duration_sec*1000:.0f} ms)") recording = play_and_record(tone, sample_rate, device_ids, channels) - + channel_1 = recording[:, 0] channel_2 = recording[:, 1] - + + if output_dir: + wavfile.write(str(output_dir / 'channel_1_loopback_recording.wav'), sample_rate, channel_1.astype(np.float32)) + wavfile.write(str(output_dir / 'channel_2_dut_recording.wav'), sample_rate, channel_2.astype(np.float32)) + artifacts_ch1 = detect_artifacts_combined(channel_1, sample_rate, frequency, detector_config) artifacts_ch2 = detect_artifacts_combined(channel_2, sample_rate, frequency, detector_config) - + + null_test_result = None + if use_null_test: + print("Running null test (align → subtract → residual analysis)...") + null_test_result = detect_artifacts_null_test( + channel_1, channel_2, sample_rate, null_test_config, + marker_template=marker_template, + marker_positions=marker_positions if marker_positions else None, + ) + if 'error' not in null_test_result: + print(f" Lag: {null_test_result['lag_ms']:.2f} ms " + f"[{null_test_result.get('alignment_method', '?')}] | " + f"Gain: {null_test_result['gain_factor']:.4f} | " + f"Residual RMS: {null_test_result['residual_rms']:.2e} | " + f"Glitches: {null_test_result['total_count']}") + if save_plots and output_dir: plot_artifact_detection(channel_1, channel_2, artifacts_ch1, artifacts_ch2, frequency, sample_rate, output_dir) plot_deviation_histogram(artifacts_ch1, artifacts_ch2, output_dir) + + if null_test_result is not None and null_test_result['_residual'] is not None: + plot_null_test(null_test_result, sample_rate, output_dir, + marker_positions=marker_positions, + marker_len_samples=len(marker_template) if marker_template is not None else 0) anomalies_dir = output_dir / 'individual_anomalies' anomalies_dir.mkdir(exist_ok=True) @@ -837,7 +1281,13 @@ def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_d total_anomaly_plots = len(artifacts_ch1['artifacts']) + len(artifacts_ch2['artifacts']) if total_anomaly_plots > 0: print(f"✓ Generated {total_anomaly_plots} individual anomaly plots") - + + null_test_serializable = None + if null_test_result is not None: + null_test_serializable = { + k: v for k, v in null_test_result.items() if not k.startswith('_') + } + result = { 'signal_type': signal_type, 'duration_sec': float(duration), @@ -853,6 +1303,7 @@ def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_d 'artifact_rate_per_minute': float(artifacts_ch2['total_count'] / duration * 60), 'frequency_accuracy': artifacts_ch2['frequency_accuracy'] }, + 'null_test': null_test_serializable, 'detector_config': detector_config } diff --git a/test_artifact_detection.py b/test_artifact_detection.py index 8d9d820..0592d97 100755 --- a/test_artifact_detection.py +++ b/test_artifact_detection.py @@ -133,13 +133,42 @@ def main(): print(f" Measured: {freq_acc['measured_freq_hz']:.2f} Hz") print(f" Error: {freq_acc['error_hz']:+.2f} Hz ({freq_acc['error_percent']:+.3f}%)") + nt = result.get('null_test') + if nt and nt.get('enabled'): + print("\n🔬 NULL TEST (Ch2 DUT vs Ch1 Loopback reference):") + print(f" Alignment lag: {nt['lag_ms']:.2f} ms ({nt['lag_samples']} samples)") + print(f" Gain factor: {nt['gain_factor']:.4f}") + print(f" Residual RMS: {nt['residual_rms']:.2e}") + print(f" Residual peak: {nt['residual_peak']:.4f}") + print(f" Glitches found: {nt['total_count']}") + if nt['by_type']: + print(" By type:") + for artifact_type, count in nt['by_type'].items(): + print(f" - {artifact_type}: {count}") + if nt['artifacts']: + print(" Glitch timestamps:") + for a in nt['artifacts'][:20]: + if a['type'] == 'null_test_glitch': + print(f" {a['time_sec']:.3f}s dur={a['duration_ms']:.1f}ms " + f"dev={a['deviation_factor']:.1f}×baseline") + elif a['type'] == 'sample_slip': + baseline = a.get('lag_baseline', a.get('lag_before', '?')) + at = a.get('lag_at_marker', a.get('lag_after', '?')) + print(f" {a['time_sec']:.3f}s sample_slip " + f"Δ={a['lag_change_samples']:+d} samples " + f"(baseline={baseline}, at_marker={at})") + if len(nt['artifacts']) > 20: + print(f" ... and {len(nt['artifacts']) - 20} more (see YAML)") + ch1_count = result['channel_1_loopback']['total_artifacts'] ch2_count = result['channel_2_dut']['total_artifacts'] - if ch2_count > ch1_count: + if nt and nt.get('enabled') and nt['total_count'] > 0: + print(f"\n⚠️ NULL TEST: {nt['total_count']} glitch(es) detected in DUT path residual") + elif ch2_count > ch1_count: delta = ch2_count - ch1_count print(f"\n⚠️ DEGRADATION DETECTED: {delta} more artifacts in radio path vs loopback") - elif ch1_count == ch2_count == 0: + elif ch1_count == ch2_count == 0 and (not nt or nt['total_count'] == 0): print("\n✅ EXCELLENT: No artifacts detected in either path!") else: print(f"\nℹ️ Loopback baseline: {ch1_count} artifacts")