Adds null test to artifact detection.

This commit is contained in:
2026-04-24 08:39:03 +02:00
parent 8f44cf56d4
commit 530d58440f
3 changed files with 516 additions and 15 deletions

View File

@@ -38,6 +38,27 @@ artifact_detection:
energy_variation:
enabled: true
threshold_db: 6.0 # Energy change threshold in dB between consecutive windows (detects level changes)
null_test:
enabled: true
# Align Ch2 (DUT) to Ch1 (Loopback), subtract, detect bursts in residual
max_lag_ms: 500.0 # Maximum expected delay between channels for alignment search
window_ms: 5.0 # Short-time RMS window length for burst detection
threshold_factor: 6.0 # Flag windows where residual RMS exceeds baseline × this factor
min_burst_ms: 0.5 # Minimum burst duration to report (filters out single-sample spikes)
sample_slip_detection: true
sample_slip_window_ms: 50.0 # Correlation window for xcorr-based lag tracking (fallback only)
# Sync marker: one chirp burst at the start and one at the end of the played signal.
# Marker-based alignment is immune to periodic-signal ambiguity (e.g. pure sine).
# Sample slip detection compares the lag at the start marker vs. the end marker.
marker_duration_sec: 0.05 # Length of each chirp marker burst
marker_first_offset_sec: 0.5 # Offset from signal start (and from signal end) for markers
marker_f0: 200.0 # Marker chirp start frequency (Hz)
marker_f1: 16000.0 # Marker chirp end frequency (Hz) — wider BW = sharper correlation peak
marker_amplitude: 0.7 # Marker amplitude (mixed on top of test tone)
# Sample slip threshold: only report if inter-marker lag deviates from median by >= this many samples.
# Rule of thumb: peak timing precision ≈ 1 / (marker_f1 - marker_f0) * sample_rate
# With 200-16000 Hz BW at 44100 Hz: precision ≈ 3 samples → min_slip_samples = 5 gives good margin.
min_slip_samples: 5
latency:
max_std_dev_ms: 1.0 # Maximum allowed std deviation; test fails if exceeded

View File

@@ -2,7 +2,8 @@ import time
import numpy as np
import sounddevice as sd
from scipy import signal
from typing import Tuple, Dict, List
from scipy.io import wavfile
from typing import Tuple, Dict, List, Optional
import matplotlib.pyplot as plt
from pathlib import Path
@@ -533,6 +534,312 @@ def detect_artifacts_energy_variation(signal_data: np.ndarray, sample_rate: int,
return artifacts
def generate_sync_marker(sample_rate: int, duration_sec: float = 0.05,
                         f0: float = 200.0, f1: float = 4000.0,
                         amplitude: float = 0.7) -> np.ndarray:
    """Build a short Hann-windowed linear chirp used as an alignment marker.

    The Hann envelope suppresses spectral splatter at the burst edges, and a
    wide chirp bandwidth yields a sharp cross-correlation peak.

    NOTE(review): the default f1=4000.0 differs from the config-side default
    of 16000.0 — confirm which is intended; the caller passes both explicitly.

    Args:
        sample_rate: Sample rate in Hz.
        duration_sec: Burst length in seconds.
        f0: Chirp start frequency (Hz).
        f1: Chirp end frequency (Hz).
        amplitude: Peak amplitude of the windowed burst.

    Returns:
        1-D float array of length ``int(sample_rate * duration_sec)``.
    """
    n_samples = int(sample_rate * duration_sec)
    t = np.linspace(0, duration_sec, n_samples, endpoint=False)
    chirp_burst = signal.chirp(t, f0, duration_sec, f1, method='linear')
    envelope = np.hanning(n_samples)
    return amplitude * envelope * chirp_burst
def embed_markers(base_signal: np.ndarray, marker: np.ndarray,
                  sample_rate: int, interval_sec: float = 5.0,
                  first_offset_sec: float = 0.5) -> Tuple[np.ndarray, List[int]]:
    """Mix periodic copies of ``marker`` into a copy of ``base_signal``.

    Markers are added on top of the base signal (not overwritten) every
    ``interval_sec`` seconds starting at ``first_offset_sec``; the mix is
    clipped to [-1, 1].

    Returns:
        (signal_with_markers, list of marker start indices in samples).
    """
    mixed = base_signal.copy()
    n_marker = len(marker)
    marker_sec = n_marker / sample_rate
    total_sec = len(base_signal) / sample_rate
    starts: List[int] = []
    t_sec = first_offset_sec
    # Keep a 0.1 s guard so a marker never runs past the end of the signal.
    while t_sec + marker_sec + 0.1 <= total_sec:
        begin = int(t_sec * sample_rate)
        mixed[begin:begin + n_marker] = np.clip(
            mixed[begin:begin + n_marker] + marker, -1.0, 1.0)
        starts.append(begin)
        t_sec += interval_sec
    return mixed, starts
def find_all_marker_positions(channel: np.ndarray, marker_template: np.ndarray,
                              expected_positions: List[int], sample_rate: int,
                              search_radius_ms: float = 500.0) -> List[Optional[int]]:
    """Locate each marker near its expected sample position via cross-correlation.

    The full correlation envelope is computed once; for each expected position
    the peak is searched only within ±``search_radius_ms``.

    Returns:
        One entry per expected position: the sample index of the correlation
        peak, or None when the search window is empty.
    """
    envelope = np.abs(signal.correlate(channel, marker_template, mode='valid'))
    radius = int(search_radius_ms / 1000.0 * sample_rate)
    results: List[Optional[int]] = []
    for anchor in expected_positions:
        window_lo = max(0, anchor - radius)
        window_hi = min(len(envelope), anchor + radius + 1)
        if window_lo >= window_hi:
            results.append(None)
        else:
            peak = int(np.argmax(envelope[window_lo:window_hi]))
            results.append(window_lo + peak)
    return results
def align_by_markers(reference: np.ndarray, dut: np.ndarray,
                     marker_template: np.ndarray, expected_positions: List[int],
                     sample_rate: int, max_lag_ms: float = 500.0) -> Tuple[np.ndarray, np.ndarray, int]:
    """Time-align the DUT channel to the reference using embedded sync markers.

    The lag is the median offset between the marker peaks found in each
    channel.  When no marker pair is found in both channels, this falls back
    to plain cross-correlation alignment (``align_channels``).

    Returns:
        (reference_aligned, dut_aligned, lag_samples) — equal-length arrays;
        positive lag means the DUT is delayed relative to the reference.
    """
    ref_hits = find_all_marker_positions(reference, marker_template,
                                         expected_positions, sample_rate, max_lag_ms)
    dut_hits = find_all_marker_positions(dut, marker_template,
                                         expected_positions, sample_rate, max_lag_ms)
    pair_lags = [d - r for r, d in zip(ref_hits, dut_hits)
                 if r is not None and d is not None]
    if not pair_lags:
        # No usable marker pair — fall back to generic xcorr alignment.
        return align_channels(reference, dut, sample_rate, max_lag_ms)
    lag = int(round(float(np.median(pair_lags))))
    if lag < 0:
        ref_cut, dut_cut = reference[-lag:], dut[:len(dut) + lag]
    else:
        ref_cut, dut_cut = reference[:len(reference) - lag], dut[lag:]
    n = min(len(ref_cut), len(dut_cut))
    return ref_cut[:n], dut_cut[:n], lag
def detect_sample_slips_by_markers(reference: np.ndarray, dut: np.ndarray,
                                   marker_template: np.ndarray,
                                   expected_ref_positions: List[int],
                                   sample_rate: int, lag_samples: int,
                                   min_slip_samples: int = 3) -> List[Dict]:
    """Detect sample slips by comparing per-marker lags against the median lag.

    Each marker is located independently in both channels; a slip is reported
    when the lag at one marker deviates from the median (baseline) lag by at
    least ``min_slip_samples``.  Markers within 1 s of either end of the
    recording are ignored.

    Returns:
        List of 'sample_slip' artifact dicts.
    """
    total_duration = len(reference) / sample_rate
    expected_dut_positions = [lag_samples + p for p in expected_ref_positions]
    ref_hits = find_all_marker_positions(reference, marker_template,
                                         expected_ref_positions, sample_rate,
                                         search_radius_ms=200.0)
    dut_hits = find_all_marker_positions(dut, marker_template,
                                         expected_dut_positions, sample_rate,
                                         search_radius_ms=200.0)
    per_marker_lag: List[Optional[int]] = []
    for r, d in zip(ref_hits, dut_hits):
        per_marker_lag.append(None if r is None or d is None else d - r)
    measured = [v for v in per_marker_lag if v is not None]
    baseline_lag = int(round(float(np.median(measured)))) if measured else 0
    slips: List[Dict] = []
    for idx, lag in enumerate(per_marker_lag):
        if lag is None:
            continue
        deviation = int(lag) - baseline_lag
        if abs(deviation) < min_slip_samples:
            continue
        time_sec = float(expected_ref_positions[idx] / sample_rate)
        # Ignore markers too close to the recording edges (ramp-in/out noise).
        if time_sec < 1.0 or time_sec > total_duration - 1.0:
            continue
        slips.append({
            'type': 'sample_slip',
            'time_sec': round(time_sec, 4),
            'lag_change_samples': int(deviation),
            'lag_baseline': baseline_lag,
            'lag_at_marker': int(lag),
        })
    return slips
def align_channels(reference: np.ndarray, dut: np.ndarray, sample_rate: int,
                   max_lag_ms: float = 500.0) -> Tuple[np.ndarray, np.ndarray, int]:
    """Align the DUT channel to the reference by cross-correlation.

    A representative segment (up to 4 s, centred in the overlap of both
    channels) is correlated; the lag with the strongest absolute correlation
    within ±``max_lag_ms`` wins.  Positive lag means the DUT is delayed
    relative to the reference.

    Returns:
        (reference_aligned, dut_aligned, lag_samples) — equal-length arrays.
    """
    lag_limit = int(max_lag_ms / 1000.0 * sample_rate)
    overlap = min(len(reference), len(dut))
    # Use a representative middle segment (up to 4 s) for robust correlation.
    seg_len = min(overlap, int(sample_rate * 4))
    centre = overlap // 2
    half = seg_len // 2
    ref_seg = reference[centre - half: centre + half]
    dut_seg = dut[centre - half: centre + half]
    xcorr = signal.correlate(dut_seg, ref_seg, mode='full')
    lag_axis = np.arange(-(len(ref_seg) - 1), len(ref_seg))
    in_range = np.abs(lag_axis) <= lag_limit
    best = int(lag_axis[np.argmax(np.where(in_range, np.abs(xcorr), 0))])
    if best < 0:
        ref_cut, dut_cut = reference[-best:], dut[:len(dut) + best]
    else:
        ref_cut, dut_cut = reference[:len(reference) - best], dut[best:]
    n = min(len(ref_cut), len(dut_cut))
    return ref_cut[:n], dut_cut[:n], best
def compute_residual(ref_aligned: np.ndarray, dut_aligned: np.ndarray) -> Tuple[np.ndarray, float]:
    """Subtract a gain-matched reference from the DUT channel.

    The gain is the RMS ratio DUT/reference, so a pure level difference
    between the two paths does not show up in the residual.  A silent
    reference yields the DUT unchanged with gain 1.0.

    Returns:
        (residual, gain) where ``residual = dut_aligned - gain * ref_aligned``.
    """
    reference_rms = np.sqrt(np.mean(ref_aligned ** 2))
    if reference_rms < 1e-10:
        # Reference is silent — nothing meaningful to subtract.
        return dut_aligned.copy(), 1.0
    level_match = float(np.sqrt(np.mean(dut_aligned ** 2)) / reference_rms)
    return dut_aligned - level_match * ref_aligned, level_match
def detect_glitches_short_time_energy(residual: np.ndarray, sample_rate: int,
                                      window_ms: float = 5.0,
                                      threshold_factor: float = 6.0,
                                      min_burst_ms: float = 0.5) -> List[Dict]:
    """Flag bursts where the residual's short-time RMS exceeds its baseline.

    The RMS envelope (moving average of the squared residual) is compared
    against ``threshold_factor`` × its median; contiguous runs above threshold
    lasting at least ``min_burst_ms`` and sitting more than 1 s from either
    edge are reported as 'null_test_glitch' artifacts.

    Returns:
        List of glitch artifact dicts (empty when the baseline is ~zero).
    """
    from scipy.ndimage import uniform_filter1d
    win = max(4, int(window_ms / 1000.0 * sample_rate))
    min_burst = max(1, int(min_burst_ms / 1000.0 * sample_rate))
    duration_sec = len(residual) / sample_rate
    envelope = np.sqrt(np.maximum(
        uniform_filter1d(residual ** 2, size=win, mode='reflect'), 0.0))
    baseline = np.median(envelope)
    found: List[Dict] = []
    if baseline < 1e-12:
        return found
    above = (envelope > threshold_factor * baseline).astype(np.int8)
    # Rising/falling edges of the above-threshold mask delimit each burst.
    edges = np.diff(above, prepend=0, append=0)
    for start, stop in zip(np.where(edges == 1)[0], np.where(edges == -1)[0]):
        length = int(stop - start)
        if length < min_burst:
            continue
        onset_sec = float(start / sample_rate)
        # Skip the first/last second (start-up and shutdown transients).
        if onset_sec < 1.0 or onset_sec > duration_sec - 1.0:
            continue
        peak = float(np.max(envelope[start:stop]))
        found.append({
            'type': 'null_test_glitch',
            'time_sec': round(onset_sec, 4),
            'duration_ms': round(length / sample_rate * 1000.0, 2),
            'peak_residual_rms': round(peak, 8),
            'baseline_rms': round(float(baseline), 8),
            'deviation_factor': round(peak / baseline, 2)
        })
    return found
def detect_sample_slips(reference: np.ndarray, dut: np.ndarray, sample_rate: int,
                        window_ms: float = 50.0, step_ms: float = 100.0) -> List[Dict]:
    """Track the DUT-vs-reference lag over time and report abrupt changes.

    Fallback (xcorr-based) slip detector: the lag is re-estimated on short
    windows every ``step_ms``; any change of >=1 sample between consecutive
    windows — outside the first/last second — is reported as a sample slip.

    Returns:
        List of 'sample_slip' artifact dicts (empty when the signal is too
        short or fewer than three lag estimates are available).
    """
    slips: List[Dict] = []
    win = int(window_ms / 1000.0 * sample_rate)
    step = int(step_ms / 1000.0 * sample_rate)
    # Limit the per-window lag search to a quarter window, capped at ±10 ms.
    search_limit = min(win // 4, int(0.010 * sample_rate))
    usable = min(len(reference), len(dut))
    duration_sec = usable / sample_rate
    if usable < win * 2:
        return slips
    track: List[int] = []
    stamps: List[float] = []
    for offset in range(0, usable - win, step):
        ref_win = reference[offset: offset + win]
        dut_win = dut[offset: offset + win]
        xcorr = signal.correlate(dut_win, ref_win, mode='full')
        axis = np.arange(-(len(ref_win) - 1), len(ref_win))
        mask = np.abs(axis) <= search_limit
        track.append(int(axis[np.argmax(np.where(mask, np.abs(xcorr), 0))]))
        stamps.append(float(offset / sample_rate))
    if len(track) < 3:
        return slips
    series = np.array(track)
    for i in range(1, len(series)):
        jump = int(series[i]) - int(series[i - 1])
        if abs(jump) < 1:
            continue
        t = stamps[i]
        if t < 1.0 or t > duration_sec - 1.0:
            continue
        slips.append({
            'type': 'sample_slip',
            'time_sec': round(t, 4),
            'lag_change_samples': int(jump),
            'lag_before': int(series[i - 1]),
            'lag_after': int(series[i])
        })
    return slips
def detect_artifacts_null_test(reference: np.ndarray, dut: np.ndarray,
                               sample_rate: int, null_test_config: Dict,
                               marker_template: Optional[np.ndarray] = None,
                               marker_positions: Optional[List[int]] = None) -> Dict:
    """Run the null test: align the DUT to the loopback reference, subtract,
    and analyse the residual for glitches and sample slips.

    Args:
        reference: Ch1 loopback recording.
        dut: Ch2 device-under-test recording.
        sample_rate: Sample rate in Hz of both channels.
        null_test_config: The 'null_test' detector config section (keys read:
            enabled, max_lag_ms, window_ms, threshold_factor, min_burst_ms,
            sample_slip_detection, sample_slip_window_ms, min_slip_samples).
        marker_template: Sync-marker waveform, if markers were embedded.
        marker_positions: Expected marker start samples in the played signal.

    Returns:
        Result dict with lag/gain/residual stats, the artifact list and
        per-type counts.  Keys prefixed with '_' hold intermediate arrays for
        plotting (stripped before serialization); an 'error' key is added
        when the recording is unusable.
    """
    # Result skeleton; '_'-prefixed entries are internal (plotting only).
    result = {
        'enabled': null_test_config.get('enabled', True),
        'lag_samples': 0,
        'lag_ms': 0.0,
        'gain_factor': 1.0,
        'residual_rms': 0.0,
        'residual_peak': 0.0,
        'total_count': 0,
        'by_type': {},
        'artifacts': [],
        '_ref_aligned': None,
        '_dut_aligned': None,
        '_residual': None,
    }
    if not result['enabled']:
        return result
    # Guard: skip if either channel is silent/corrupted (e.g. ALSA underrun)
    min_signal_level = 1e-4
    if np.max(np.abs(reference)) < min_signal_level or np.max(np.abs(dut)) < min_signal_level:
        result['error'] = 'recording_too_quiet_or_corrupted'
        print(" ⚠ Null test skipped: one or both channels are silent (possible ALSA underrun).")
        return result
    max_lag_ms = float(null_test_config.get('max_lag_ms', 500.0))
    # Prefer marker-based alignment (immune to periodic-signal ambiguity);
    # fall back to plain cross-correlation when no markers were embedded.
    if marker_template is not None and marker_positions:
        ref_aligned, dut_aligned, lag_samples = align_by_markers(
            reference, dut, marker_template, marker_positions, sample_rate, max_lag_ms)
        alignment_method = 'marker'
    else:
        ref_aligned, dut_aligned, lag_samples = align_channels(
            reference, dut, sample_rate, max_lag_ms)
        alignment_method = 'xcorr'
    result['lag_samples'] = lag_samples
    result['lag_ms'] = round(float(lag_samples) / sample_rate * 1000.0, 3)
    result['alignment_method'] = alignment_method
    # Gain-matched subtraction: residual = dut_aligned - gain * ref_aligned.
    residual, gain = compute_residual(ref_aligned, dut_aligned)
    result['gain_factor'] = round(gain, 6)
    result['residual_rms'] = round(float(np.sqrt(np.mean(residual ** 2))), 8)
    result['residual_peak'] = round(float(np.max(np.abs(residual))), 6)
    result['_ref_aligned'] = ref_aligned
    result['_dut_aligned'] = dut_aligned
    result['_residual'] = residual
    all_artifacts = []
    window_ms = float(null_test_config.get('window_ms', 5.0))
    threshold_factor = float(null_test_config.get('threshold_factor', 6.0))
    min_burst_ms = float(null_test_config.get('min_burst_ms', 0.5))
    glitches = detect_glitches_short_time_energy(
        residual, sample_rate, window_ms, threshold_factor, min_burst_ms)
    all_artifacts.extend(glitches)
    if null_test_config.get('sample_slip_detection', True):
        # Marker-based slip detection needs at least a start and an end marker;
        # otherwise fall back to windowed xcorr lag tracking.
        if marker_template is not None and marker_positions and len(marker_positions) >= 2:
            min_slip_samples = int(null_test_config.get('min_slip_samples', 3))
            slips = detect_sample_slips_by_markers(
                reference, dut, marker_template, marker_positions, sample_rate, lag_samples,
                min_slip_samples=min_slip_samples)
        else:
            slip_window_ms = float(null_test_config.get('sample_slip_window_ms', 50.0))
            slips = detect_sample_slips(ref_aligned, dut_aligned, sample_rate, slip_window_ms)
        all_artifacts.extend(slips)
    result['total_count'] = len(all_artifacts)
    result['artifacts'] = all_artifacts
    # Tally artifacts per type for the summary report.
    for a in all_artifacts:
        t = a['type']
        result['by_type'][t] = result['by_type'].get(t, 0) + 1
    return result
def measure_frequency_accuracy(signal_data: np.ndarray, sample_rate: int,
expected_freq: float) -> Dict:
"""
@@ -737,18 +1044,29 @@ def plot_deviation_histogram(artifacts_ch1: Dict, artifacts_ch2: Dict, output_di
if not all_devs:
return
bin_min = int(np.floor(min(all_devs)))
bin_max = int(np.ceil(max(all_devs))) + 1
bins = np.arange(bin_min, bin_max + 1)
raw_min = min(all_devs)
raw_max = max(all_devs)
MAX_BINS = 50
counts_ch1, _ = np.histogram(dev_ch1, bins=bins)
counts_ch2, _ = np.histogram(dev_ch2, bins=bins)
if raw_max / max(raw_min, 1e-9) > 20 or (raw_max - raw_min) > MAX_BINS:
bins = np.logspace(np.log10(max(raw_min, 0.1)), np.log10(raw_max + 1), MAX_BINS + 1)
bin_labels = [f"{bins[i]:.1f}-{bins[i+1]:.1f}" for i in range(len(bins) - 1)]
else:
bin_min = int(np.floor(raw_min))
bin_max = int(np.ceil(raw_max)) + 1
bins = np.arange(bin_min, bin_max + 1)
if len(bins) > MAX_BINS + 1:
bins = np.linspace(bin_min, bin_max, MAX_BINS + 1)
bin_labels = [f"{bins[i]:.1f}-{bins[i+1]:.1f}" for i in range(len(bins) - 1)]
counts_ch1, _ = np.histogram(dev_ch1 if dev_ch1 else [0], bins=bins)
counts_ch2, _ = np.histogram(dev_ch2 if dev_ch2 else [0], bins=bins)
bin_labels = [f"{bins[i]}-{bins[i+1]}" for i in range(len(bins) - 1)]
x = np.arange(len(bin_labels))
width = 0.4
fig, ax = plt.subplots(figsize=(max(10, len(bin_labels) * 0.7), 6))
fig_width = min(24, max(10, len(bin_labels) * 0.5))
fig, ax = plt.subplots(figsize=(fig_width, 6))
bars1 = ax.bar(x - width / 2, counts_ch1, width, label='Ch1 Loopback', color='steelblue', alpha=0.85)
bars2 = ax.bar(x + width / 2, counts_ch2, width, label='Ch2 DUT/Radio', color='tomato', alpha=0.85)
@@ -776,6 +1094,82 @@ def plot_deviation_histogram(artifacts_ch1: Dict, artifacts_ch2: Dict, output_di
plt.close()
def plot_null_test(null_test_result: Dict, sample_rate: int, output_dir: Path,
                   marker_positions: Optional[List[int]] = None,
                   marker_len_samples: int = 0):
    """Save a three-panel null-test diagnostic plot to
    ``output_dir / 'null_test_residual.png'``.

    Panels: (1) the aligned reference/DUT channels, (2) the residual with
    detected artifacts marked as vertical lines, (3) the residual's
    short-time RMS envelope with its baseline.  When marker positions are
    known, the lower two panels are zoomed to exclude the sync-marker bursts
    at the start and end of the recording.

    Args:
        null_test_result: Output of detect_artifacts_null_test; must still
            contain the '_ref_aligned'/'_dut_aligned'/'_residual' arrays.
        sample_rate: Sample rate in Hz.
        output_dir: Directory the PNG is written into.
        marker_positions: Start samples of the embedded sync markers, if any.
        marker_len_samples: Marker length in samples (0 when no markers).
    """
    from scipy.ndimage import uniform_filter1d
    ref_aligned = null_test_result['_ref_aligned']
    dut_aligned = null_test_result['_dut_aligned']
    residual = null_test_result['_residual']
    if ref_aligned is None or residual is None:
        return
    total_duration = len(ref_aligned) / sample_rate
    fig, axes = plt.subplots(3, 1, figsize=(16, 12))
    time = np.arange(len(ref_aligned)) / sample_rate
    axes[0].plot(time, ref_aligned, alpha=0.6, linewidth=0.5, label='Ch1 Loopback (aligned)')
    axes[0].plot(time, dut_aligned, alpha=0.6, linewidth=0.5, label='Ch2 DUT (aligned)')
    axes[0].set_ylabel('Amplitude')
    axes[0].set_title(
        f'Null Test — Aligned Channels '
        f'(lag={null_test_result["lag_ms"]:.2f} ms, '
        f'gain={null_test_result["gain_factor"]:.4f})')
    axes[0].legend(loc='upper right', fontsize=8)
    axes[0].grid(True, alpha=0.3)
    axes[1].plot(time, residual, alpha=0.8, linewidth=0.4, color='purple')
    axes[1].set_ylabel('Amplitude')
    # Title fixed: the subtraction operator was missing ('DUT gain·Ref');
    # the residual is computed as dut - gain * ref in compute_residual.
    axes[1].set_title(
        f'Residual e[n] = DUT - gain·Ref '
        f'(RMS={null_test_result["residual_rms"]:.2e}, '
        f'peak={null_test_result["residual_peak"]:.4f})')
    for a in null_test_result['artifacts']:
        # Glitches in red; sample slips (and any other type) in orange.
        color = 'red' if a['type'] == 'null_test_glitch' else 'orange'
        axes[1].axvline(x=a['time_sec'], color=color, alpha=0.6, linewidth=1.0)
    axes[1].grid(True, alpha=0.3)
    window_samples = max(4, int(5.0 / 1000.0 * sample_rate))
    rms_envelope = np.sqrt(np.maximum(
        uniform_filter1d(residual ** 2, size=window_samples, mode='reflect'), 0.0))
    axes[2].plot(time, rms_envelope, linewidth=0.6, color='darkgreen', label='Short-time RMS (5 ms)')
    axes[2].set_xlabel('Time (s)')
    axes[2].set_ylabel('Short-time RMS')
    axes[2].set_title('Short-time RMS of Residual')
    axes[2].grid(True, alpha=0.3)
    # Determine x-window for lower two axes: exclude marker regions at start and end
    margin_sec = 0.5
    if marker_positions and len(marker_positions) >= 2 and marker_len_samples > 0:
        x_min = (marker_positions[0] + marker_len_samples) / sample_rate + margin_sec
        x_max = marker_positions[-1] / sample_rate - margin_sec
    else:
        x_min = margin_sec
        x_max = total_duration - margin_sec
    if x_min < x_max:
        axes[1].set_xlim(x_min, x_max)
        axes[2].set_xlim(x_min, x_max)
        # Recompute baseline from the visible region only (unaffected by marker bursts)
        i_min = max(0, int(x_min * sample_rate))
        i_max = min(len(rms_envelope), int(x_max * sample_rate))
        baseline = float(np.median(rms_envelope[i_min:i_max]))
    else:
        baseline = float(np.median(rms_envelope))
    axes[2].axhline(y=baseline, color='steelblue', linestyle='--', linewidth=1,
                    label=f'Baseline ({baseline:.2e})')
    axes[2].legend(fontsize=8)
    plt.tight_layout()
    plt.savefig(output_dir / 'null_test_residual.png', dpi=150, bbox_inches='tight')
    plt.close()
def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_dir: Path = None) -> Dict:
import time
@@ -786,6 +1180,7 @@ def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_d
device_name = config['audio']['device_name']
channels = config['audio']['channels']
detector_config = config['artifact_detection']['detectors']
null_test_config = detector_config.get('null_test', {})
startup_delay = config['artifact_detection'].get('startup_delay', 10)
signal_type = config['artifact_detection'].get('signal_type', 'sine')
@@ -796,6 +1191,10 @@ def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_d
time.sleep(startup_delay)
print("Starting recording...")
use_null_test = signal_type != 'silent' and null_test_config.get('enabled', True)
marker_template: Optional[np.ndarray] = None
marker_positions: Optional[List[int]] = []
if signal_type == 'chirp':
f0 = config['artifact_detection'].get('chirp_f0', 100)
f1 = config['artifact_detection'].get('chirp_f1', 8000)
@@ -804,22 +1203,67 @@ def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_d
recording = play_and_record(tone, sample_rate, device_ids, channels)
elif signal_type == 'silent':
frequency = 1000
recording = sd.rec(int(duration * sample_rate), samplerate=sample_rate,
recording = sd.rec(int(duration * sample_rate), samplerate=sample_rate,
channels=channels, device=device_ids[0], blocking=True)
else:
tone = generate_test_tone(frequency, duration, sample_rate, amplitude)
if use_null_test:
marker_duration_sec = float(null_test_config.get('marker_duration_sec', 0.05))
marker_offset_sec = float(null_test_config.get('marker_first_offset_sec', 0.5))
marker_template = generate_sync_marker(
sample_rate,
duration_sec=marker_duration_sec,
f0=float(null_test_config.get('marker_f0', 200.0)),
f1=float(null_test_config.get('marker_f1', 16000.0)),
amplitude=float(null_test_config.get('marker_amplitude', 0.7)),
)
marker_len = len(marker_template)
pos_start = int(marker_offset_sec * sample_rate)
pos_end = len(tone) - int(marker_offset_sec * sample_rate) - marker_len
marker_positions = [pos_start, pos_end]
tone = tone.copy()
for pos in marker_positions:
tone[pos:pos + marker_len] = np.clip(
tone[pos:pos + marker_len] + marker_template, -1.0, 1.0)
print(f" Embedded sync markers at start ({marker_offset_sec:.1f}s) and end "
f"({pos_end/sample_rate:.1f}s) "
f"(chirp 200→16000 Hz, {marker_duration_sec*1000:.0f} ms)")
recording = play_and_record(tone, sample_rate, device_ids, channels)
channel_1 = recording[:, 0]
channel_2 = recording[:, 1]
if output_dir:
wavfile.write(str(output_dir / 'channel_1_loopback_recording.wav'), sample_rate, channel_1.astype(np.float32))
wavfile.write(str(output_dir / 'channel_2_dut_recording.wav'), sample_rate, channel_2.astype(np.float32))
artifacts_ch1 = detect_artifacts_combined(channel_1, sample_rate, frequency, detector_config)
artifacts_ch2 = detect_artifacts_combined(channel_2, sample_rate, frequency, detector_config)
null_test_result = None
if use_null_test:
print("Running null test (align → subtract → residual analysis)...")
null_test_result = detect_artifacts_null_test(
channel_1, channel_2, sample_rate, null_test_config,
marker_template=marker_template,
marker_positions=marker_positions if marker_positions else None,
)
if 'error' not in null_test_result:
print(f" Lag: {null_test_result['lag_ms']:.2f} ms "
f"[{null_test_result.get('alignment_method', '?')}] | "
f"Gain: {null_test_result['gain_factor']:.4f} | "
f"Residual RMS: {null_test_result['residual_rms']:.2e} | "
f"Glitches: {null_test_result['total_count']}")
if save_plots and output_dir:
plot_artifact_detection(channel_1, channel_2, artifacts_ch1, artifacts_ch2,
frequency, sample_rate, output_dir)
plot_deviation_histogram(artifacts_ch1, artifacts_ch2, output_dir)
if null_test_result is not None and null_test_result['_residual'] is not None:
plot_null_test(null_test_result, sample_rate, output_dir,
marker_positions=marker_positions,
marker_len_samples=len(marker_template) if marker_template is not None else 0)
anomalies_dir = output_dir / 'individual_anomalies'
anomalies_dir.mkdir(exist_ok=True)
@@ -837,7 +1281,13 @@ def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_d
total_anomaly_plots = len(artifacts_ch1['artifacts']) + len(artifacts_ch2['artifacts'])
if total_anomaly_plots > 0:
print(f"✓ Generated {total_anomaly_plots} individual anomaly plots")
null_test_serializable = None
if null_test_result is not None:
null_test_serializable = {
k: v for k, v in null_test_result.items() if not k.startswith('_')
}
result = {
'signal_type': signal_type,
'duration_sec': float(duration),
@@ -853,6 +1303,7 @@ def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_d
'artifact_rate_per_minute': float(artifacts_ch2['total_count'] / duration * 60),
'frequency_accuracy': artifacts_ch2['frequency_accuracy']
},
'null_test': null_test_serializable,
'detector_config': detector_config
}

View File

@@ -133,13 +133,42 @@ def main():
print(f" Measured: {freq_acc['measured_freq_hz']:.2f} Hz")
print(f" Error: {freq_acc['error_hz']:+.2f} Hz ({freq_acc['error_percent']:+.3f}%)")
nt = result.get('null_test')
if nt and nt.get('enabled'):
print("\n🔬 NULL TEST (Ch2 DUT vs Ch1 Loopback reference):")
print(f" Alignment lag: {nt['lag_ms']:.2f} ms ({nt['lag_samples']} samples)")
print(f" Gain factor: {nt['gain_factor']:.4f}")
print(f" Residual RMS: {nt['residual_rms']:.2e}")
print(f" Residual peak: {nt['residual_peak']:.4f}")
print(f" Glitches found: {nt['total_count']}")
if nt['by_type']:
print(" By type:")
for artifact_type, count in nt['by_type'].items():
print(f" - {artifact_type}: {count}")
if nt['artifacts']:
print(" Glitch timestamps:")
for a in nt['artifacts'][:20]:
if a['type'] == 'null_test_glitch':
print(f" {a['time_sec']:.3f}s dur={a['duration_ms']:.1f}ms "
f"dev={a['deviation_factor']:.1f}×baseline")
elif a['type'] == 'sample_slip':
baseline = a.get('lag_baseline', a.get('lag_before', '?'))
at = a.get('lag_at_marker', a.get('lag_after', '?'))
print(f" {a['time_sec']:.3f}s sample_slip "
f"Δ={a['lag_change_samples']:+d} samples "
f"(baseline={baseline}, at_marker={at})")
if len(nt['artifacts']) > 20:
print(f" ... and {len(nt['artifacts']) - 20} more (see YAML)")
ch1_count = result['channel_1_loopback']['total_artifacts']
ch2_count = result['channel_2_dut']['total_artifacts']
if ch2_count > ch1_count:
if nt and nt.get('enabled') and nt['total_count'] > 0:
print(f"\n⚠️ NULL TEST: {nt['total_count']} glitch(es) detected in DUT path residual")
elif ch2_count > ch1_count:
delta = ch2_count - ch1_count
print(f"\n⚠️ DEGRADATION DETECTED: {delta} more artifacts in radio path vs loopback")
elif ch1_count == ch2_count == 0:
elif ch1_count == ch2_count == 0 and (not nt or nt['total_count'] == 0):
print("\n✅ EXCELLENT: No artifacts detected in either path!")
else:
print(f"\n Loopback baseline: {ch1_count} artifacts")