From 39bcd072c0c07a3e7d87031a39d66342ffe707f1 Mon Sep 17 00:00:00 2001
From: Pbopbo
Date: Wed, 18 Mar 2026 10:41:46 +0100
Subject: [PATCH] Adds artifact test.
---
ARTIFACT_DETECTION_README.md | 193 ++++++++++++++++
QUICKSTART.md | 13 +-
README.md | 5 +-
ai_stuff/prompts.md | 18 +-
config.yaml | 23 ++
example_test_result.yaml | 5 +-
run_test.py | 14 +-
src/audio_tests.py | 413 +++++++++++++++++++++++++++++++++++
test_artifact_detection.py | 173 +++++++++++++++
9 files changed, 836 insertions(+), 21 deletions(-)
create mode 100644 ARTIFACT_DETECTION_README.md
create mode 100755 test_artifact_detection.py
diff --git a/ARTIFACT_DETECTION_README.md b/ARTIFACT_DETECTION_README.md
new file mode 100644
index 0000000..9207b02
--- /dev/null
+++ b/ARTIFACT_DETECTION_README.md
@@ -0,0 +1,193 @@
+# Artifact Detection Test
+
+## Overview
+This test plays a 1kHz sine wave for a configurable duration (default 60 seconds) and records both channels simultaneously:
+- **Channel 1**: Loopback path (direct audio interface connection)
+- **Channel 2**: DUT/Radio path (through beacon and radio transmission)
+
+The test detects buzzing, clicks, dropouts, and other audio artifacts using multiple configurable algorithms.
+
+## Quick Start
+
+```bash
+python test_artifact_detection.py \
+ --serial-number SN001234 \
+ --software-version abc123 \
+ --comment "Testing new firmware"
+```
+
+## Detection Algorithms
+
+The test uses four configurable detection algorithms (spectral_anomaly is **disabled by default** due to false positives):
+
+### 1. Spectral Anomaly Detection (DISABLED BY DEFAULT)
+- **Status**: ā ļø Currently generates too many false positives - disabled by default
+- **What it detects**: Unexpected frequencies that aren't harmonics of the fundamental tone
+- **Use case**: Buzzing, interference, crosstalk
+- **Configuration**: `threshold_db` - how far below fundamental to search (-60 dB default)
+
+### 2. Amplitude Spike Detection (WORKING)
+- **What it detects**: Sudden changes in signal amplitude (RMS)
+- **Use case**: Clicks, pops, dropouts
+- **Configuration**: `threshold_factor` - number of standard deviations (3.0 default)
+
+### 3. Zero-Crossing Anomaly Detection (WORKING)
+- **What it detects**: Irregular zero-crossing patterns
+- **Use case**: Distortion, clipping, non-linear artifacts
+- **Configuration**: `threshold_factor` - number of standard deviations (2.0 default)
+
+### 4. Energy Variation Detection (WORKING)
+- **What it detects**: Rapid energy changes between time windows
+- **Use case**: Dropouts, level fluctuations, intermittent issues
+- **Configuration**: `threshold_db` - energy change threshold (6.0 dB default)
+
+## Configuration
+
+Edit `config.yaml` to customize the test:
+
+```yaml
+artifact_detection:
+ test_frequency: 1000 # Hz
+ duration: 60.0 # seconds
+ amplitude: 0.5 # 0.0 to 1.0
+ detectors:
+ spectral_anomaly:
+ enabled: true
+ threshold_db: -40
+ amplitude_spikes:
+ enabled: true
+ threshold_factor: 3.0
+ zero_crossing:
+ enabled: true
+ threshold_factor: 2.0
+ energy_variation:
+ enabled: true
+ threshold_db: 6.0
+```
+
+## Command Line Options
+
+- `--serial-number`: Serial number (required)
+- `--software-version`: Git commit hash or version (required)
+- `--comment`: Optional comments about the test
+- `--config`: Path to config file (default: config.yaml)
+- `--duration`: Override duration in seconds
+- `--frequency`: Override test frequency in Hz
+
+## Example: Quick 10-second Test
+
+```bash
+python test_artifact_detection.py \
+ --serial-number SN001234 \
+ --software-version abc123 \
+ --duration 10
+```
+
+## Example: Custom Frequency
+
+```bash
+python test_artifact_detection.py \
+ --serial-number SN001234 \
+ --software-version abc123 \
+ --frequency 440
+```
+
+## Tuning Detection Algorithms
+
+### More Sensitive Detection
+To catch more subtle artifacts, make thresholds stricter:
+
+```yaml
+detectors:
+ spectral_anomaly:
+ threshold_db: -50 # Lower = more sensitive
+ amplitude_spikes:
+ threshold_factor: 2.0 # Lower = more sensitive
+ zero_crossing:
+ threshold_factor: 1.5 # Lower = more sensitive
+ energy_variation:
+ threshold_db: 3.0 # Lower = more sensitive
+```
+
+### Less Sensitive Detection
+To reduce false positives in noisy environments:
+
+```yaml
+detectors:
+ spectral_anomaly:
+ threshold_db: -30 # Higher = less sensitive
+ amplitude_spikes:
+ threshold_factor: 4.0 # Higher = less sensitive
+ zero_crossing:
+ threshold_factor: 3.0 # Higher = less sensitive
+ energy_variation:
+ threshold_db: 10.0 # Higher = less sensitive
+```
+
+### Disable Specific Detectors
+```yaml
+detectors:
+ spectral_anomaly:
+ enabled: false # Turn off this detector
+```
+
+## Output
+
+The test generates:
+1. **YAML results file**: `test_results/{timestamp}_artifact_detection_results.yaml`
+2. **JSON results file**: `test_results/{timestamp}_artifact_detection_results.json`
+3. **Summary plots** (if enabled): `test_results/{timestamp}_artifact_detection/`
+ - Time domain waveforms with artifact markers
+ - Frequency spectrum analysis
+4. **Individual anomaly plots**: `test_results/{timestamp}_artifact_detection/individual_anomalies/`
+ - Each anomaly plotted individually with ~20 periods of context
+ - Detailed view showing exactly what the anomaly looks like
+ - Named by channel, type, and timestamp for easy identification
+
+### Results Structure
+
+```yaml
+metadata:
+ test_id: "20260317_140530"
+ timestamp: "2026-03-17T14:05:30.123456"
+ test_type: "artifact_detection"
+ pcb_version: "v2.1"
+ pcb_revision: "A"
+ software_version: "abc123"
+
+artifact_detection_result:
+ test_frequency_hz: 1000
+ duration_sec: 60.0
+ channel_1_loopback:
+ total_artifacts: 5
+ artifact_rate_per_minute: 5.0
+ artifacts_by_type:
+ spectral_anomaly: 2
+ amplitude_spike: 3
+ channel_2_dut:
+ total_artifacts: 23
+ artifact_rate_per_minute: 23.0
+ artifacts_by_type:
+ spectral_anomaly: 8
+ amplitude_spike: 10
+ energy_variation: 5
+ detector_config: {...}
+```
+
+## Interpreting Results
+
+- **Zero artifacts in both channels**: Excellent signal quality
+- **Same artifacts in both channels**: Likely environmental interference or audio interface issue
+- **More artifacts in Channel 2 (radio path)**: Radio transmission degradation detected
+- **High spectral_anomaly count**: Interference or crosstalk
+- **High amplitude_spike count**: Clicks, pops, or dropouts
+- **High energy_variation count**: Level instability or dropouts
+
+## Comparison with Loopback Baseline
+
+The loopback path (Channel 1) serves as a baseline reference. Any additional artifacts in the radio path (Channel 2) indicate degradation introduced by the radio transmission system.
+
+Expected behavior:
+- Loopback should have minimal artifacts (ideally zero)
+- Radio path may have some artifacts due to transmission
+- Large difference indicates issues in radio hardware/firmware
diff --git a/QUICKSTART.md b/QUICKSTART.md
index 933a8cd..2eeeecc 100644
--- a/QUICKSTART.md
+++ b/QUICKSTART.md
@@ -13,10 +13,9 @@ pip install -r requirements.txt
```bash
python run_test.py \
- --pcb-version "v1.0" \
- --pcb-revision "A" \
+ --serial-number "SN001234" \
--software-version "initial" \
- --notes "First test run"
+ --comment "First test run"
```
**What happens:**
@@ -44,11 +43,11 @@ python view_results.py example_test_result.yaml
Run multiple tests with different metadata:
```bash
-# Test PCB v1.0
-python run_test.py --pcb-version "v1.0" --pcb-revision "A" --software-version "abc123"
+# Test unit SN001234
+python run_test.py --serial-number "SN001234" --software-version "abc123"
-# Test PCB v2.0
-python run_test.py --pcb-version "v2.0" --pcb-revision "A" --software-version "abc123"
+# Test unit SN001235
+python run_test.py --serial-number "SN001235" --software-version "abc123"
# Compare by viewing both YAML files
python view_results.py test_results/20260226_120000_results.yaml
diff --git a/README.md b/README.md
index 7dab643..3714e94 100644
--- a/README.md
+++ b/README.md
@@ -21,10 +21,9 @@ pip install -r requirements.txt
```bash
python run_test.py \
- --pcb-version "v2.1" \
- --pcb-revision "A" \
+ --serial-number "SN001234" \
--software-version "a3f2b1c" \
- --notes "Replaced capacitor C5"
+ --comment "Replaced capacitor C5"
```
### 3. View Results
diff --git a/ai_stuff/prompts.md b/ai_stuff/prompts.md
index 9619b09..8cfbd0c 100644
--- a/ai_stuff/prompts.md
+++ b/ai_stuff/prompts.md
@@ -35,4 +35,20 @@ Play a sine in different frequencies, and for every frequency 5 sec long and do
Dont do fourier yet.
-Do a simple project.
\ No newline at end of file
+Do a simple project.
+
+
+
+
+----
+
+I want you to write a new test:
+Put a 1khz sine into the system and record both channels for x seconds e.g. 60.
+I want you to detect buzzing and other artifacts in the recording.
+Give me a number how many artifacts you found.
+Make the detection algorithm configurable, so we can try different approaches.
+
+Again input it into the audio interface and measure both loopback and radio path like in the other test.
+
+
+
diff --git a/config.yaml b/config.yaml
index 7216964..5d01705 100644
--- a/config.yaml
+++ b/config.yaml
@@ -15,3 +15,26 @@ output:
results_dir: "test_results"
save_plots: true
save_raw_audio: false
+
+artifact_detection:
+ test_frequency: 1000 # Hz - Test tone frequency (for sine wave mode)
+ duration: 60.0 # seconds - Recording duration
+ amplitude: 0.5 # 0.0 to 1.0
+ startup_delay: 0 # seconds - Wait before starting recording to let system settle
+ # Chirp signal parameters (used when --signal-type chirp is specified)
+ chirp_f0: 100 # Hz - Chirp start frequency
+ chirp_f1: 8000 # Hz - Chirp end frequency
+ # NOTE: All detectors skip the first 1 second of recording to avoid startup transients
+ detectors:
+ spectral_anomaly:
+ enabled: false # DISABLED - generates too many false positives, needs better algorithm
+ threshold_db: -60 # Detect unexpected frequencies above noise floor + this threshold (more negative = less sensitive)
+ amplitude_spikes:
+ enabled: true
+ threshold_factor: 4.0 # MAD-based outlier detection on envelope (detects clicks, pops, dropouts). Lower = more sensitive.
+ zero_crossing:
+ enabled: false
+ threshold_factor: 2.0 # Number of standard deviations for zero-crossing anomalies (detects distortion)
+ energy_variation:
+ enabled: false
+ threshold_db: 6.0 # Energy change threshold in dB between consecutive windows (detects level changes)
diff --git a/example_test_result.yaml b/example_test_result.yaml
index 3104716..ec6f6d7 100644
--- a/example_test_result.yaml
+++ b/example_test_result.yaml
@@ -1,10 +1,9 @@
metadata:
test_id: 20260226_123456
timestamp: '2026-02-26T12:34:56.789012'
- pcb_version: v2.1
- pcb_revision: A
+ serial_number: SN001234
software_version: a3f2b1c8d9e
- notes: Baseline test with new capacitor values
+ comment: Baseline test with new capacitor values
test_results:
- frequency_hz: 100
latency_ms: 2.341
diff --git a/run_test.py b/run_test.py
index fa812c1..1a926dc 100644
--- a/run_test.py
+++ b/run_test.py
@@ -11,10 +11,9 @@ from src.audio_tests import run_single_test, run_latency_test
def main():
parser = argparse.ArgumentParser(description='Run PCB hardware audio tests')
- parser.add_argument('--pcb-version', required=True, help='PCB version (e.g., v2.1)')
- parser.add_argument('--pcb-revision', required=True, help='PCB revision (e.g., A, B, C)')
+ parser.add_argument('--serial-number', required=True, help='Serial number (e.g., SN001234)')
parser.add_argument('--software-version', required=True, help='Software version (git commit hash)')
- parser.add_argument('--notes', default='', help='Adjustments or comments about this test')
+ parser.add_argument('--comment', default='', help='Comments about this test')
parser.add_argument('--config', default='config.yaml', help='Path to config file')
args = parser.parse_args()
@@ -34,8 +33,10 @@ def main():
save_plots = config['output'].get('save_plots', False)
print(f"Starting audio test run: {test_id}")
- print(f"PCB: {args.pcb_version} Rev {args.pcb_revision}")
+ print(f"Serial Number: {args.serial_number}")
print(f"Software: {args.software_version}")
+ if args.comment:
+ print(f"Comment: {args.comment}")
if save_plots:
print(f"Plots will be saved to: {test_output_dir}")
print("-" * 60)
@@ -71,10 +72,9 @@ def main():
'metadata': {
'test_id': test_id,
'timestamp': timestamp.isoformat(),
- 'pcb_version': args.pcb_version,
- 'pcb_revision': args.pcb_revision,
+ 'serial_number': args.serial_number,
'software_version': args.software_version,
- 'notes': args.notes
+ 'comment': args.comment
},
'latency_test': latency_stats,
'test_results': test_results
diff --git a/src/audio_tests.py b/src/audio_tests.py
index b30995c..319df13 100644
--- a/src/audio_tests.py
+++ b/src/audio_tests.py
@@ -279,3 +279,416 @@ def plot_latency_test(channel_1: np.ndarray, channel_2: np.ndarray, correlation:
plot_file = output_dir / 'latency_chirp_analysis.png'
plt.savefig(plot_file, dpi=150, bbox_inches='tight')
plt.close()
+
+
+def detect_artifacts_spectral_anomaly(signal_data: np.ndarray, sample_rate: int,
+ fundamental_freq: float, threshold_db: float = -60) -> List[Dict]:
+ artifacts = []
+ window_size = int(sample_rate * 0.5)
+ hop_size = int(sample_rate * 0.25)
+
+ for i in range(0, len(signal_data) - window_size, hop_size):
+ segment = signal_data[i:i+window_size]
+ fft = np.fft.rfft(segment)
+ freqs = np.fft.rfftfreq(len(segment), 1/sample_rate)
+ power_spectrum_db = 20 * np.log10(np.abs(fft) + 1e-10)
+
+ fundamental_idx = np.argmin(np.abs(freqs - fundamental_freq))
+ fundamental_power_db = power_spectrum_db[fundamental_idx]
+
+ expected_harmonics = set()
+ harmonic_tolerance_bins = 3
+ for n in range(1, 11):
+ harmonic_freq = n * fundamental_freq
+ if harmonic_freq < sample_rate / 2:
+ harmonic_idx = np.argmin(np.abs(freqs - harmonic_freq))
+ for offset in range(-harmonic_tolerance_bins, harmonic_tolerance_bins + 1):
+ if 0 <= harmonic_idx + offset < len(freqs):
+ expected_harmonics.add(harmonic_idx + offset)
+
+ noise_floor_db = np.percentile(power_spectrum_db[10:], 10)
+
+ unexpected_peaks = []
+ for idx in range(10, len(power_spectrum_db)):
+ if idx not in expected_harmonics:
+ if power_spectrum_db[idx] > noise_floor_db + abs(threshold_db):
+ unexpected_peaks.append((freqs[idx], power_spectrum_db[idx]))
+
+ if len(unexpected_peaks) >= 5:
+ artifacts.append({
+ 'type': 'spectral_anomaly',
+ 'time_sec': i / sample_rate,
+ 'unexpected_frequencies': unexpected_peaks[:10],
+ 'count': len(unexpected_peaks)
+ })
+
+ return artifacts
+
+
+def detect_artifacts_amplitude_spikes(signal_data: np.ndarray, sample_rate: int,
+ threshold_factor: float = 3.0) -> List[Dict]:
+ artifacts = []
+
+ skip_samples = int(sample_rate * 1.0)
+ if len(signal_data) <= skip_samples:
+ return artifacts
+
+ signal_trimmed = signal_data[skip_samples:]
+
+ envelope = np.abs(signal_trimmed)
+
+ window_size = int(sample_rate * 0.01)
+ if window_size % 2 == 0:
+ window_size += 1
+
+ from scipy.ndimage import uniform_filter1d
+ envelope_smooth = uniform_filter1d(envelope, size=window_size, mode='reflect')
+
+ median_env = np.median(envelope_smooth)
+ mad = np.median(np.abs(envelope_smooth - median_env))
+
+ if mad == 0:
+ return artifacts
+
+ threshold = median_env + threshold_factor * mad * 1.4826
+
+ spike_indices = np.where(envelope_smooth > threshold)[0]
+
+ if len(spike_indices) == 0:
+ return artifacts
+
+ groups = []
+ current_group = [spike_indices[0]]
+
+ for idx in spike_indices[1:]:
+ if idx - current_group[-1] <= int(sample_rate * 0.05):
+ current_group.append(idx)
+ else:
+ groups.append(current_group)
+ current_group = [idx]
+ groups.append(current_group)
+
+ for group in groups:
+ peak_idx = group[np.argmax(envelope_smooth[group])]
+ time_sec = (peak_idx + skip_samples) / sample_rate
+ peak_value = envelope_smooth[peak_idx]
+
+ artifacts.append({
+ 'type': 'amplitude_spike',
+ 'time_sec': float(time_sec),
+ 'peak_amplitude': float(peak_value),
+ 'median_amplitude': float(median_env),
+ 'deviation_factor': float((peak_value - median_env) / (mad * 1.4826)) if mad > 0 else 0
+ })
+
+ return artifacts
+
+
+def detect_artifacts_zero_crossing(signal_data: np.ndarray, sample_rate: int,
+ threshold_factor: float = 2.0) -> List[Dict]:
+ artifacts = []
+
+ skip_samples = int(sample_rate * 1.0)
+ if len(signal_data) <= skip_samples:
+ return artifacts
+
+ window_size = int(sample_rate * 0.1)
+ hop_size = int(sample_rate * 0.05)
+
+ zcr_values = []
+ for i in range(skip_samples, len(signal_data) - window_size, hop_size):
+ segment = signal_data[i:i+window_size]
+ zero_crossings = np.sum(np.abs(np.diff(np.sign(segment)))) / 2
+ zcr = zero_crossings / len(segment)
+ zcr_values.append((i, zcr))
+
+ if not zcr_values:
+ return artifacts
+
+ zcr_array = np.array([z[1] for z in zcr_values])
+ median_zcr = np.median(zcr_array)
+ std_zcr = np.std(zcr_array)
+
+ for i, zcr in zcr_values:
+ if std_zcr > 0 and abs(zcr - median_zcr) > threshold_factor * std_zcr:
+ artifacts.append({
+ 'type': 'zero_crossing_anomaly',
+ 'time_sec': i / sample_rate,
+ 'zcr_value': float(zcr),
+ 'median_zcr': float(median_zcr),
+ 'deviation_factor': float(abs(zcr - median_zcr) / std_zcr)
+ })
+
+ return artifacts
+
+
+def detect_artifacts_energy_variation(signal_data: np.ndarray, sample_rate: int,
+ threshold_db: float = 6.0) -> List[Dict]:
+ artifacts = []
+
+ skip_samples = int(sample_rate * 1.0)
+ if len(signal_data) <= skip_samples:
+ return artifacts
+
+ window_size = int(sample_rate * 0.1)
+ hop_size = int(sample_rate * 0.05)
+
+ energy_values = []
+ for i in range(skip_samples, len(signal_data) - window_size, hop_size):
+ segment = signal_data[i:i+window_size]
+ energy = np.sum(segment**2)
+ energy_values.append((i, energy))
+
+ for idx in range(1, len(energy_values)):
+ prev_energy = energy_values[idx-1][1]
+ curr_energy = energy_values[idx][1]
+
+ if prev_energy > 0 and curr_energy > 0:
+ energy_change_db = 10 * np.log10(curr_energy / prev_energy)
+
+ if abs(energy_change_db) > threshold_db:
+ artifacts.append({
+ 'type': 'energy_variation',
+ 'time_sec': energy_values[idx][0] / sample_rate,
+ 'energy_change_db': float(energy_change_db),
+ 'prev_energy': float(prev_energy),
+ 'curr_energy': float(curr_energy)
+ })
+
+ return artifacts
+
+
+def detect_artifacts_combined(signal_data: np.ndarray, sample_rate: int, fundamental_freq: float,
+ detector_config: Dict) -> Dict:
+ all_artifacts = []
+
+ if detector_config.get('spectral_anomaly', {}).get('enabled', True):
+ threshold = detector_config.get('spectral_anomaly', {}).get('threshold_db', -60)
+ artifacts = detect_artifacts_spectral_anomaly(signal_data, sample_rate, fundamental_freq, threshold)
+ all_artifacts.extend(artifacts)
+
+ if detector_config.get('amplitude_spikes', {}).get('enabled', True):
+ threshold = detector_config.get('amplitude_spikes', {}).get('threshold_factor', 3.0)
+ artifacts = detect_artifacts_amplitude_spikes(signal_data, sample_rate, threshold)
+ all_artifacts.extend(artifacts)
+
+ if detector_config.get('zero_crossing', {}).get('enabled', True):
+ threshold = detector_config.get('zero_crossing', {}).get('threshold_factor', 2.0)
+ artifacts = detect_artifacts_zero_crossing(signal_data, sample_rate, threshold)
+ all_artifacts.extend(artifacts)
+
+ if detector_config.get('energy_variation', {}).get('enabled', True):
+ threshold = detector_config.get('energy_variation', {}).get('threshold_db', 6.0)
+ artifacts = detect_artifacts_energy_variation(signal_data, sample_rate, threshold)
+ all_artifacts.extend(artifacts)
+
+ artifact_summary = {
+ 'total_count': len(all_artifacts),
+ 'by_type': {},
+ 'artifacts': all_artifacts
+ }
+
+ for artifact in all_artifacts:
+ artifact_type = artifact['type']
+ if artifact_type not in artifact_summary['by_type']:
+ artifact_summary['by_type'][artifact_type] = 0
+ artifact_summary['by_type'][artifact_type] += 1
+
+ return artifact_summary
+
+
+def plot_individual_anomaly(signal_data: np.ndarray, artifact: Dict, artifact_idx: int,
+ channel_name: str, frequency: float, sample_rate: int,
+ output_dir: Path):
+ periods_to_show = 20
+ period_samples = int(sample_rate / frequency)
+ total_samples = periods_to_show * period_samples
+
+ artifact_time = artifact['time_sec']
+ artifact_sample = int(artifact_time * sample_rate)
+
+ start_sample = max(0, artifact_sample - total_samples // 2)
+ end_sample = min(len(signal_data), artifact_sample + total_samples // 2)
+
+ if end_sample - start_sample < total_samples:
+ if start_sample == 0:
+ end_sample = min(len(signal_data), start_sample + total_samples)
+ else:
+ start_sample = max(0, end_sample - total_samples)
+
+ segment = signal_data[start_sample:end_sample]
+ time_ms = (np.arange(len(segment)) + start_sample) / sample_rate * 1000
+
+ fig, ax = plt.subplots(1, 1, figsize=(14, 6))
+
+ ax.plot(time_ms, segment, linewidth=1.0, color='blue', alpha=0.8)
+ ax.axvline(x=artifact_time * 1000, color='red', linestyle='--', linewidth=2,
+ label=f'Anomaly at {artifact_time:.3f}s', alpha=0.7)
+
+ ax.set_xlabel('Time (ms)', fontsize=11)
+ ax.set_ylabel('Amplitude', fontsize=11)
+
+ artifact_type = artifact['type'].replace('_', ' ').title()
+ ax.set_title(f'{channel_name} - {artifact_type} #{artifact_idx+1} (~{periods_to_show} periods @ {frequency}Hz)',
+ fontsize=12, fontweight='bold')
+
+ info_text = f"Type: {artifact_type}\nTime: {artifact_time:.3f}s"
+ if 'deviation_factor' in artifact:
+ info_text += f"\nDeviation: {artifact['deviation_factor']:.2f}Ļ"
+ if 'energy_change_db' in artifact:
+ info_text += f"\nEnergy Change: {artifact['energy_change_db']:.2f} dB"
+ if 'count' in artifact and artifact['type'] == 'spectral_anomaly':
+ info_text += f"\nUnexpected Peaks: {artifact['count']}"
+
+ ax.text(0.02, 0.98, info_text, transform=ax.transAxes,
+ fontsize=9, verticalalignment='top',
+ bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
+
+ ax.legend(loc='upper right')
+ ax.grid(True, alpha=0.3)
+
+ plt.tight_layout()
+
+ safe_type = artifact['type'].replace('_', '-')
+ plot_file = output_dir / f'{channel_name.lower().replace(" ", "_")}_anomaly_{artifact_idx+1:04d}_{safe_type}_{artifact_time:.3f}s.png'
+ plt.savefig(plot_file, dpi=150, bbox_inches='tight')
+ plt.close()
+
+
+def plot_artifact_detection(channel_1: np.ndarray, channel_2: np.ndarray,
+ artifacts_ch1: Dict, artifacts_ch2: Dict,
+ frequency: float, sample_rate: int, output_dir: Path):
+ fig, axes = plt.subplots(2, 2, figsize=(16, 10))
+
+ time = np.arange(len(channel_1)) / sample_rate
+
+ axes[0, 0].plot(time, channel_1, alpha=0.7, linewidth=0.5)
+ axes[0, 0].set_xlabel('Time (s)')
+ axes[0, 0].set_ylabel('Amplitude')
+ axes[0, 0].set_title(f'Channel 1 (Loopback) - {artifacts_ch1["total_count"]} artifacts')
+ axes[0, 0].grid(True, alpha=0.3)
+
+ for artifact in artifacts_ch1['artifacts']:
+ axes[0, 0].axvline(x=artifact['time_sec'], color='r', alpha=0.3, linewidth=0.5)
+
+ axes[1, 0].plot(time, channel_2, alpha=0.7, linewidth=0.5)
+ axes[1, 0].set_xlabel('Time (s)')
+ axes[1, 0].set_ylabel('Amplitude')
+ axes[1, 0].set_title(f'Channel 2 (DUT/Radio) - {artifacts_ch2["total_count"]} artifacts')
+ axes[1, 0].grid(True, alpha=0.3)
+
+ for artifact in artifacts_ch2['artifacts']:
+ axes[1, 0].axvline(x=artifact['time_sec'], color='r', alpha=0.3, linewidth=0.5)
+
+ fft_ch1 = np.fft.rfft(channel_1)
+ fft_ch2 = np.fft.rfft(channel_2)
+ freqs = np.fft.rfftfreq(len(channel_1), 1/sample_rate)
+
+ axes[0, 1].plot(freqs, 20*np.log10(np.abs(fft_ch1) + 1e-10), linewidth=0.5)
+ axes[0, 1].set_xlabel('Frequency (Hz)')
+ axes[0, 1].set_ylabel('Magnitude (dB)')
+ axes[0, 1].set_title('Channel 1 Spectrum')
+ axes[0, 1].set_xlim(0, min(10000, sample_rate/2))
+ axes[0, 1].grid(True, alpha=0.3)
+
+ axes[1, 1].plot(freqs, 20*np.log10(np.abs(fft_ch2) + 1e-10), linewidth=0.5)
+ axes[1, 1].set_xlabel('Frequency (Hz)')
+ axes[1, 1].set_ylabel('Magnitude (dB)')
+ axes[1, 1].set_title('Channel 2 Spectrum')
+ axes[1, 1].set_xlim(0, min(10000, sample_rate/2))
+ axes[1, 1].grid(True, alpha=0.3)
+
+ plt.tight_layout()
+ plot_file = output_dir / f'artifact_detection_{frequency}Hz.png'
+ plt.savefig(plot_file, dpi=150, bbox_inches='tight')
+ plt.close()
+
+
+def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_dir: Path = None) -> Dict:
+ import time
+
+ sample_rate = config['audio']['sample_rate']
+ duration = config['artifact_detection']['duration']
+ frequency = config['artifact_detection']['test_frequency']
+ amplitude = config['artifact_detection']['amplitude']
+ device_name = config['audio']['device_name']
+ channels = config['audio']['channels']
+ detector_config = config['artifact_detection']['detectors']
+ startup_delay = config['artifact_detection'].get('startup_delay', 10)
+ signal_type = config['artifact_detection'].get('signal_type', 'sine')
+
+ device_ids = find_audio_device(device_name)
+
+ if startup_delay > 0:
+ print(f"Waiting {startup_delay} seconds for system to settle...")
+ time.sleep(startup_delay)
+ print("Starting recording...")
+
+ if signal_type == 'chirp':
+ f0 = config['artifact_detection'].get('chirp_f0', 100)
+ f1 = config['artifact_detection'].get('chirp_f1', 8000)
+ tone = generate_chirp(duration, sample_rate, f0=f0, f1=f1, amplitude=amplitude)
+ frequency = (f0 + f1) / 2
+ recording = play_and_record(tone, sample_rate, device_ids, channels)
+ elif signal_type == 'silent':
+ frequency = 1000
+ recording = sd.rec(int(duration * sample_rate), samplerate=sample_rate,
+ channels=channels, device=device_ids[0], blocking=True)
+ else:
+ tone = generate_test_tone(frequency, duration, sample_rate, amplitude)
+ recording = play_and_record(tone, sample_rate, device_ids, channels)
+
+ channel_1 = recording[:, 0]
+ channel_2 = recording[:, 1]
+
+ artifacts_ch1 = detect_artifacts_combined(channel_1, sample_rate, frequency, detector_config)
+ artifacts_ch2 = detect_artifacts_combined(channel_2, sample_rate, frequency, detector_config)
+
+ if save_plots and output_dir:
+ plot_artifact_detection(channel_1, channel_2, artifacts_ch1, artifacts_ch2,
+ frequency, sample_rate, output_dir)
+
+ anomalies_dir = output_dir / 'individual_anomalies'
+ anomalies_dir.mkdir(exist_ok=True)
+
+ print(f"\nPlotting individual anomalies to: {anomalies_dir}")
+
+ for idx, artifact in enumerate(artifacts_ch1['artifacts']):
+ plot_individual_anomaly(channel_1, artifact, idx, 'Channel 1 Loopback',
+ frequency, sample_rate, anomalies_dir)
+
+ for idx, artifact in enumerate(artifacts_ch2['artifacts']):
+ plot_individual_anomaly(channel_2, artifact, idx, 'Channel 2 DUT',
+ frequency, sample_rate, anomalies_dir)
+
+ total_anomaly_plots = len(artifacts_ch1['artifacts']) + len(artifacts_ch2['artifacts'])
+ if total_anomaly_plots > 0:
+ print(f"ā Generated {total_anomaly_plots} individual anomaly plots")
+
+ result = {
+ 'signal_type': signal_type,
+ 'duration_sec': float(duration),
+ 'channel_1_loopback': {
+ 'total_artifacts': artifacts_ch1['total_count'],
+ 'artifacts_by_type': artifacts_ch1['by_type'],
+ 'artifact_rate_per_minute': float(artifacts_ch1['total_count'] / duration * 60)
+ },
+ 'channel_2_dut': {
+ 'total_artifacts': artifacts_ch2['total_count'],
+ 'artifacts_by_type': artifacts_ch2['by_type'],
+ 'artifact_rate_per_minute': float(artifacts_ch2['total_count'] / duration * 60)
+ },
+ 'detector_config': detector_config
+ }
+
+ if signal_type == 'chirp':
+ f0 = config['artifact_detection'].get('chirp_f0', 100)
+ f1 = config['artifact_detection'].get('chirp_f1', 8000)
+ result['chirp_f0_hz'] = int(f0)
+ result['chirp_f1_hz'] = int(f1)
+ elif signal_type == 'silent':
+ result['note'] = 'Silent mode - no playback, noise floor measurement'
+ else:
+ result['test_frequency_hz'] = int(frequency)
+
+ return result
diff --git a/test_artifact_detection.py b/test_artifact_detection.py
new file mode 100755
index 0000000..6db2256
--- /dev/null
+++ b/test_artifact_detection.py
@@ -0,0 +1,173 @@
+#!/usr/bin/env python3
+import argparse
+import yaml
+from datetime import datetime
+from pathlib import Path
+import sys
+import json
+
+sys.path.insert(0, str(Path(__file__).parent))
+from src.audio_tests import run_artifact_detection_test
+
+
+def main():
+ parser = argparse.ArgumentParser(description='Run artifact detection test on audio loopback and radio path')
+ parser.add_argument('--serial-number', required=True, help='Serial number (e.g., SN001234)')
+ parser.add_argument('--software-version', required=True, help='Software version (git commit hash)')
+ parser.add_argument('--comment', default='', help='Comments about this test')
+ parser.add_argument('--config', default='config.yaml', help='Path to config file')
+ parser.add_argument('--duration', type=float, help='Override recording duration in seconds (default from config)')
+ parser.add_argument('--frequency', type=float, help='Override test frequency in Hz (default from config)')
+ parser.add_argument('--signal-type', choices=['sine', 'chirp', 'silent'], default='sine',
+ help='Signal type: sine (single frequency), chirp (frequency sweep), or silent (no signal)')
+
+ args = parser.parse_args()
+
+ with open(args.config, 'r') as f:
+ config = yaml.safe_load(f)
+
+ if args.duration:
+ config['artifact_detection']['duration'] = args.duration
+ if args.frequency:
+ config['artifact_detection']['test_frequency'] = args.frequency
+
+ config['artifact_detection']['signal_type'] = args.signal_type
+
+ timestamp = datetime.now()
+ test_id = timestamp.strftime('%Y%m%d_%H%M%S')
+
+ results_dir = Path(config['output']['results_dir'])
+ results_dir.mkdir(exist_ok=True)
+
+ test_output_dir = results_dir / f"{test_id}_artifact_detection"
+ test_output_dir.mkdir(exist_ok=True)
+
+ save_plots = config['output'].get('save_plots', False)
+
+ print("=" * 70)
+ print("ARTIFACT DETECTION TEST")
+ print("=" * 70)
+ print(f"Test ID: {test_id}")
+ print(f"Serial Number: {args.serial_number}")
+ print(f"Software: {args.software_version}")
+ if args.comment:
+ print(f"Comment: {args.comment}")
+ print(f"Duration: {config['artifact_detection']['duration']} seconds")
+ signal_type = config['artifact_detection'].get('signal_type', 'sine')
+ if signal_type == 'sine':
+ print(f"Signal Type: Sine wave @ {config['artifact_detection']['test_frequency']} Hz")
+ elif signal_type == 'chirp':
+ print(f"Signal Type: Chirp (100 Hz - 8000 Hz)")
+ else:
+ print(f"Signal Type: Silent (no playback - noise floor measurement)")
+ if save_plots:
+ print(f"Plots will be saved to: {test_output_dir}")
+ print("-" * 70)
+
+ print("\nDetection Algorithms:")
+ for detector_name, detector_settings in config['artifact_detection']['detectors'].items():
+ status = "ENABLED" if detector_settings.get('enabled', False) else "DISABLED"
+ print(f" - {detector_name}: {status}")
+ if detector_settings.get('enabled', False):
+ for param, value in detector_settings.items():
+ if param != 'enabled':
+ print(f" {param}: {value}")
+
+ print("\n" + "=" * 70)
+ signal_type = config['artifact_detection'].get('signal_type', 'sine')
+ if signal_type == 'sine':
+ freq = config['artifact_detection']['test_frequency']
+ print(f"STARTING TEST - Playing {freq}Hz sine wave and recording both channels...")
+ elif signal_type == 'chirp':
+ print("STARTING TEST - Playing chirp signal (100-8000Hz) and recording both channels...")
+ else:
+ print("STARTING TEST - Recording silence (no playback)...")
+ print("=" * 70)
+ print("\nChannel 1: Loopback path (direct audio interface loopback)")
+ print("Channel 2: DUT/Radio path (through beacon and radio transmission)")
+ print()
+
+ try:
+ result = run_artifact_detection_test(config, save_plots=save_plots, output_dir=test_output_dir)
+
+ print("\n" + "=" * 70)
+ print("TEST COMPLETE - RESULTS")
+ print("=" * 70)
+
+ signal_type = result.get('signal_type', 'sine')
+ if signal_type == 'chirp':
+ print(f"\nš Signal: Chirp {result['chirp_f0_hz']} Hz ā {result['chirp_f1_hz']} Hz")
+ elif signal_type == 'silent':
+ print(f"\nš Signal: Silent (no playback - noise floor measurement)")
+ else:
+ print(f"\nš Test Frequency: {result['test_frequency_hz']} Hz")
+ print(f"ā±ļø Duration: {result['duration_sec']} seconds")
+
+ print("\nš CHANNEL 1 (LOOPBACK PATH):")
+ print(f" Total Artifacts: {result['channel_1_loopback']['total_artifacts']}")
+ print(f" Artifact Rate: {result['channel_1_loopback']['artifact_rate_per_minute']:.2f} per minute")
+ if result['channel_1_loopback']['artifacts_by_type']:
+ print(" By Type:")
+ for artifact_type, count in result['channel_1_loopback']['artifacts_by_type'].items():
+ print(f" - {artifact_type}: {count}")
+
+ print("\nš» CHANNEL 2 (DUT/RADIO PATH):")
+ print(f" Total Artifacts: {result['channel_2_dut']['total_artifacts']}")
+ print(f" Artifact Rate: {result['channel_2_dut']['artifact_rate_per_minute']:.2f} per minute")
+ if result['channel_2_dut']['artifacts_by_type']:
+ print(" By Type:")
+ for artifact_type, count in result['channel_2_dut']['artifacts_by_type'].items():
+ print(f" - {artifact_type}: {count}")
+
+ ch1_count = result['channel_1_loopback']['total_artifacts']
+ ch2_count = result['channel_2_dut']['total_artifacts']
+
+ if ch2_count > ch1_count:
+ delta = ch2_count - ch1_count
+ print(f"\nā ļø DEGRADATION DETECTED: {delta} more artifacts in radio path vs loopback")
+ elif ch1_count == ch2_count == 0:
+ print("\nā
EXCELLENT: No artifacts detected in either path!")
+ else:
+ print(f"\nā¹ļø Loopback baseline: {ch1_count} artifacts")
+
+ except Exception as e:
+ print(f"\nā ERROR: {e}")
+ import traceback
+ traceback.print_exc()
+ result = {
+ 'error': str(e),
+ 'test_frequency_hz': config['artifact_detection']['test_frequency'],
+ 'duration_sec': config['artifact_detection']['duration']
+ }
+
+ output_data = {
+ 'metadata': {
+ 'test_id': test_id,
+ 'timestamp': timestamp.isoformat(),
+ 'serial_number': args.serial_number,
+ 'software_version': args.software_version,
+ 'comment': args.comment
+ },
+ 'artifact_detection_result': result
+ }
+
+ output_file = results_dir / f"{test_id}_artifact_detection_results.yaml"
+ with open(output_file, 'w') as f:
+ yaml.dump(output_data, f, default_flow_style=False, sort_keys=False)
+
+ json_output_file = results_dir / f"{test_id}_artifact_detection_results.json"
+ with open(json_output_file, 'w') as f:
+ json.dump(output_data, f, indent=2)
+
+ print("\n" + "=" * 70)
+ print("ā
Results saved to:")
+ print(f" YAML: {output_file}")
+ print(f" JSON: {json_output_file}")
+ if save_plots:
+ print(f" Summary plots: {test_output_dir}/")
+ print(f" Individual anomaly plots: {test_output_dir}/individual_anomalies/")
+ print("=" * 70)
+
+
+if __name__ == '__main__':
+ main()