Compare commits

...

2 Commits

Author SHA1 Message Date
Pbopbo
5d5a131b77 Refactoring and minor improvents. 2026-03-23 13:52:27 +01:00
Pbopbo
39bcd072c0 Adds artifact test. 2026-03-18 10:41:46 +01:00
9 changed files with 1019 additions and 90 deletions

View File

@@ -0,0 +1,193 @@
# Artifact Detection Test
## Overview
This test plays a 1kHz sine wave for a configurable duration (default 60 seconds) and records both channels simultaneously:
- **Channel 1**: Loopback path (direct audio interface connection)
- **Channel 2**: DUT/Radio path (through beacon and radio transmission)
The test detects buzzing, clicks, dropouts, and other audio artifacts using multiple configurable algorithms.
## Quick Start
```bash
python test_artifact_detection.py \
--serial-number SN001234 \
--software-version abc123 \
--comment "Testing new firmware"
```
## Detection Algorithms
The test uses four configurable detection algorithms (spectral_anomaly is **disabled by default** due to false positives):
### 1. Spectral Anomaly Detection (DISABLED BY DEFAULT)
- **Status**: ⚠️ Currently generates too many false positives - disabled by default
- **What it detects**: Unexpected frequencies that aren't harmonics of the fundamental tone
- **Use case**: Buzzing, interference, crosstalk
- **Configuration**: `threshold_db` - how far below fundamental to search (-60 dB default)
### 2. Amplitude Spike Detection (WORKING)
- **What it detects**: Sudden changes in signal amplitude (RMS)
- **Use case**: Clicks, pops, dropouts
- **Configuration**: `threshold_factor` - number of standard deviations (3.0 default)
### 3. Zero-Crossing Anomaly Detection (WORKING)
- **What it detects**: Irregular zero-crossing patterns
- **Use case**: Distortion, clipping, non-linear artifacts
- **Configuration**: `threshold_factor` - number of standard deviations (2.0 default)
### 4. Energy Variation Detection (WORKING)
- **What it detects**: Rapid energy changes between time windows
- **Use case**: Dropouts, level fluctuations, intermittent issues
- **Configuration**: `threshold_db` - energy change threshold (6.0 dB default)
## Configuration
Edit `config.yaml` to customize the test:
```yaml
artifact_detection:
test_frequency: 1000 # Hz
duration: 60.0 # seconds
amplitude: 0.5 # 0.0 to 1.0
detectors:
spectral_anomaly:
enabled: true
threshold_db: -40
amplitude_spikes:
enabled: true
threshold_factor: 3.0
zero_crossing:
enabled: true
threshold_factor: 2.0
energy_variation:
enabled: true
threshold_db: 6.0
```
## Command Line Options
- `--serial-number`: Serial number (required)
- `--software-version`: Git commit hash or version (required)
- `--comment`: Optional comments about the test
- `--config`: Path to config file (default: config.yaml)
- `--duration`: Override duration in seconds
- `--frequency`: Override test frequency in Hz
## Example: Quick 10-second Test
```bash
python test_artifact_detection.py \
--serial-number SN001234 \
--software-version abc123 \
--duration 10
```
## Example: Custom Frequency
```bash
python test_artifact_detection.py \
--serial-number SN001234 \
--software-version abc123 \
--frequency 440
```
## Tuning Detection Algorithms
### More Sensitive Detection
To catch more subtle artifacts, make thresholds stricter:
```yaml
detectors:
spectral_anomaly:
threshold_db: -50 # Lower = more sensitive
amplitude_spikes:
threshold_factor: 2.0 # Lower = more sensitive
zero_crossing:
threshold_factor: 1.5 # Lower = more sensitive
energy_variation:
threshold_db: 3.0 # Lower = more sensitive
```
### Less Sensitive Detection
To reduce false positives in noisy environments:
```yaml
detectors:
spectral_anomaly:
threshold_db: -30 # Higher = less sensitive
amplitude_spikes:
threshold_factor: 4.0 # Higher = less sensitive
zero_crossing:
threshold_factor: 3.0 # Higher = less sensitive
energy_variation:
threshold_db: 10.0 # Higher = less sensitive
```
### Disable Specific Detectors
```yaml
detectors:
spectral_anomaly:
enabled: false # Turn off this detector
```
## Output
The test generates:
1. **YAML results file**: `test_results/{timestamp}_artifact_detection_results.yaml`
2. **JSON results file**: `test_results/{timestamp}_artifact_detection_results.json`
3. **Summary plots** (if enabled): `test_results/{timestamp}_artifact_detection/`
- Time domain waveforms with artifact markers
- Frequency spectrum analysis
4. **Individual anomaly plots**: `test_results/{timestamp}_artifact_detection/individual_anomalies/`
- Each anomaly plotted individually with ~20 periods of context
- Detailed view showing exactly what the anomaly looks like
- Named by channel, type, and timestamp for easy identification
### Results Structure
```yaml
metadata:
test_id: "20260317_140530"
timestamp: "2026-03-17T14:05:30.123456"
test_type: "artifact_detection"
pcb_version: "v2.1"
pcb_revision: "A"
software_version: "abc123"
artifact_detection_result:
test_frequency_hz: 1000
duration_sec: 60.0
channel_1_loopback:
total_artifacts: 5
artifact_rate_per_minute: 5.0
artifacts_by_type:
spectral_anomaly: 2
amplitude_spike: 3
channel_2_dut:
total_artifacts: 23
artifact_rate_per_minute: 23.0
artifacts_by_type:
spectral_anomaly: 8
amplitude_spike: 10
energy_variation: 5
detector_config: {...}
```
## Interpreting Results
- **Zero artifacts in both channels**: Excellent signal quality
- **Same artifacts in both channels**: Likely environmental interference or audio interface issue
- **More artifacts in Channel 2 (radio path)**: Radio transmission degradation detected
- **High spectral_anomaly count**: Interference or crosstalk
- **High amplitude_spike count**: Clicks, pops, or dropouts
- **High energy_variation count**: Level instability or dropouts
## Comparison with Loopback Baseline
The loopback path (Channel 1) serves as a baseline reference. Any additional artifacts in the radio path (Channel 2) indicate degradation introduced by the radio transmission system.
Expected behavior:
- Loopback should have minimal artifacts (ideally zero)
- Radio path may have some artifacts due to transmission
- Large difference indicates issues in radio hardware/firmware

View File

@@ -12,19 +12,18 @@ pip install -r requirements.txt
### 1. Run Your First Test
```bash
python run_test.py \
--pcb-version "v1.0" \
--pcb-revision "A" \
python test_latency.py \
--serial-number "SN001234" \
--software-version "initial" \
--notes "First test run"
--comment "First test run"
```
**What happens:**
- Auto-detects your Scarlett audio interface
- Plays test tones at 7 frequencies (100 Hz to 8 kHz)
- Plays chirp signal and measures latency (5 measurements by default)
- Records input/output on both channels
- Calculates latency, THD, and SNR
- Saves results to `test_results/YYYYMMDD_HHMMSS_results.yaml`
- Calculates average, min, max, and standard deviation of latency
- Saves results to `test_results/YYYYMMDD_HHMMSS_latency/YYYYMMDD_HHMMSS_latency_results.yaml`
### 2. View Results
@@ -39,35 +38,33 @@ python view_results.py test_results/20260226_123456_results.yaml
python view_results.py example_test_result.yaml
```
### 3. Compare Different PCB Versions
### 3. Compare Different Units
Run multiple tests with different metadata:
```bash
# Test PCB v1.0
python run_test.py --pcb-version "v1.0" --pcb-revision "A" --software-version "abc123"
# Test unit SN001234
python test_latency.py --serial-number "SN001234" --software-version "abc123"
# Test PCB v2.0
python run_test.py --pcb-version "v2.0" --pcb-revision "A" --software-version "abc123"
# Test unit SN001235 with more measurements
python test_latency.py --serial-number "SN001235" --software-version "abc123" --measurements 10
# Compare by viewing both YAML files
python view_results.py test_results/20260226_120000_results.yaml
python view_results.py test_results/20260226_130000_results.yaml
python view_results.py test_results/20260226_120000_latency/20260226_120000_latency_results.yaml
python view_results.py test_results/20260226_130000_latency/20260226_130000_latency_results.yaml
```
## Understanding the Output
Each test produces metrics at 7 frequencies:
Each latency test produces:
- **Latency (ms)**: Delay between channels (should be near 0 for loopback)
- **THD Input (%)**: Distortion in channel 1 (lower is better)
- **THD Output (%)**: Distortion in channel 2 (lower is better)
- **SNR Input (dB)**: Signal quality in channel 1 (higher is better)
- **SNR Output (dB)**: Signal quality in channel 2 (higher is better)
- **Average Latency (ms)**: Mean delay across all measurements
- **Min/Max Latency (ms)**: Range of measured values
- **Standard Deviation (ms)**: Consistency of measurements (lower is better)
**Good values:**
- THD: < 0.1% (< 0.01% is excellent)
- SNR: > 80 dB (> 90 dB is excellent)
- Latency: Depends on your system (audio interface typically < 10ms)
- Standard Deviation: < 1ms (consistent measurements)
- Latency: < 5 ms for loopback
## Configuration
@@ -75,11 +72,21 @@ Each test produces metrics at 7 frequencies:
Edit `config.yaml` to customize test parameters:
```yaml
test_tones:
frequencies: [1000] # Test only 1 kHz
duration: 3.0 # Shorter test (3 seconds)
audio:
sample_rate: 44100
channels: 2
device_name: "Scarlett"
output:
results_dir: "test_results"
save_plots: true
```
```bash
python -c "import sounddevice as sd; print(sd.query_devices())"
```
Update `device_name` in `config.yaml` to match your device.
## Troubleshooting
**Audio device not found:**

View File

@@ -4,8 +4,8 @@ Simple Python-based testing system for PCB audio hardware validation.
## Features
- **Automated Testing**: Latency, THD, and SNR measurements across multiple frequencies
- **Metadata Tracking**: PCB version, revision, software version, timestamps, notes
- **Automated Testing**: Latency measurements with configurable iterations
- **Metadata Tracking**: Serial number, software version, timestamps, comments
- **YAML Output**: Human-readable structured results
- **Simple Workflow**: Run tests, view results, compare versions
@@ -19,12 +19,20 @@ pip install -r requirements.txt
### 2. Run a Test
**Latency Test:**
```bash
python run_test.py \
--pcb-version "v2.1" \
--pcb-revision "A" \
python test_latency.py \
--serial-number "SN001234" \
--software-version "a3f2b1c" \
--notes "Replaced capacitor C5"
--comment "Replaced capacitor C5"
```
**Artifact Detection Test:**
```bash
python test_artifact_detection.py \
--serial-number "SN001234" \
--software-version "a3f2b1c" \
--comment "Baseline test"
```
### 3. View Results
@@ -43,10 +51,8 @@ python view_results.py test_results/*.yaml | tail -1
## Test Metrics
- **Latency**: Round-trip delay between input and output channels (ms)
- **THD**: Total Harmonic Distortion for input and output (%)
- **SNR**: Signal-to-Noise Ratio for input and output (dB)
Tests run at multiple frequencies: 100 Hz, 250 Hz, 500 Hz, 1 kHz, 2 kHz, 4 kHz, 8 kHz
- Average, minimum, maximum, and standard deviation across measurements
- Uses chirp signal for accurate cross-correlation measurement
## Output Format
@@ -56,27 +62,35 @@ Results are saved as YAML files in `test_results/`:
metadata:
test_id: 20260226_123456
timestamp: '2026-02-26T12:34:56.789012'
pcb_version: v2.1
pcb_revision: A
serial_number: SN001234
software_version: a3f2b1c
notes: Replaced capacitor C5
test_results:
- frequency_hz: 1000
latency_ms: 2.345
thd_input_percent: 0.012
thd_output_percent: 0.034
snr_input_db: 92.5
snr_output_db: 89.2
comment: Replaced capacitor C5
latency_test:
avg: 2.345
min: 2.201
max: 2.489
std: 0.087
```
## Configuration
Edit `config.yaml` to customize:
- Audio device settings
- Test frequencies
- Test duration
- Output options
```yaml
audio:
sample_rate: 44100
channels: 2
device_name: "Scarlett"
output:
results_dir: "test_results"
save_plots: true
```
The system auto-detects Focusrite Scarlett audio interfaces.
## Hardware Setup
```
@@ -84,19 +98,19 @@ Laptop <-> Audio Interface (Scarlett) <-> DUT <-> Audio Interface (Scarlett) <->
Output Channels 1&2 Input Channels 1&2
```
The system auto-detects Focusrite Scarlett audio interfaces.
## File Structure
```
closed_loop_audio_test_suite/
├── config.yaml # Test configuration
├── run_test.py # Main test runner
├── test_latency.py # Latency test runner
├── test_artifact_detection.py # Artifact detection test
├── view_results.py # Results viewer
├── src/
│ └── audio_tests.py # Core test functions
└── test_results/ # YAML output files
── YYYYMMDD_HHMMSS_results.yaml
── YYYYMMDD_HHMMSS_latency/
└── YYYYMMDD_HHMMSS_artifact_detection/
```
## Tips

View File

@@ -35,4 +35,20 @@ Play a sine in different frequencies, and for every frequency 5 sec long and do
Dont do fourier yet.
Do a simple project.
Do a simple project.
----
I want you to write a new test:
Put a 1khz sine into the system and record both channels for x seconds e.g. 60.
I want you to detect buzzing and other artifacts in the recording.
Give me a number how many artifacts you found.
Make the detection algorithm configurable, so we can try different approaches.
Again input it into the audio interface and measure both loopback and radio path like in the other test.

View File

@@ -7,7 +7,7 @@ audio:
test_tones:
frequencies: [100, 250, 500, 1000, 2000, 4000, 8000] # Hz
duration: 5.0 # seconds per frequency
duration: 10.0 # seconds per frequency
amplitude: 0.5 # 0.0 to 1.0
latency_runs: 5 # Number of latency measurements to average
@@ -15,3 +15,26 @@ output:
results_dir: "test_results"
save_plots: true
save_raw_audio: false
artifact_detection:
test_frequency: 1000 # Hz - Test tone frequency (for sine wave mode)
duration: 60.0 # seconds - Recording duration
amplitude: 0.5 # 0.0 to 1.0
startup_delay: 0 # seconds - Wait before starting recording to let system settle
# Chirp signal parameters (used when --signal-type chirp is specified)
chirp_f0: 100 # Hz - Chirp start frequency
chirp_f1: 8000 # Hz - Chirp end frequency
# NOTE: All detectors skip the first and last 1 second of recording to avoid startup/shutdown transients
detectors:
spectral_anomaly:
enabled: false # DISABLED - generates too many false positives, needs better algorithm
threshold_db: -60 # Detect unexpected frequencies above noise floor + this threshold (more negative = less sensitive)
amplitude_spikes:
enabled: true
threshold_factor: 5.0 # MAD-based outlier detection on envelope (detects clicks, pops, dropouts). Lower = more sensitive.
zero_crossing:
enabled: false
threshold_factor: 2.0 # Number of standard deviations for zero-crossing anomalies (detects distortion)
energy_variation:
enabled: true
threshold_db: 6.0 # Energy change threshold in dB between consecutive windows (detects level changes)

View File

@@ -1,10 +1,9 @@
metadata:
test_id: 20260226_123456
timestamp: '2026-02-26T12:34:56.789012'
pcb_version: v2.1
pcb_revision: A
serial_number: SN001234
software_version: a3f2b1c8d9e
notes: Baseline test with new capacitor values
comment: Baseline test with new capacitor values
test_results:
- frequency_hz: 100
latency_ms: 2.341

View File

@@ -279,3 +279,512 @@ def plot_latency_test(channel_1: np.ndarray, channel_2: np.ndarray, correlation:
plot_file = output_dir / 'latency_chirp_analysis.png'
plt.savefig(plot_file, dpi=150, bbox_inches='tight')
plt.close()
def detect_artifacts_spectral_anomaly(signal_data: np.ndarray, sample_rate: int,
fundamental_freq: float, threshold_db: float = -60) -> List[Dict]:
artifacts = []
window_size = int(sample_rate * 0.5)
hop_size = int(sample_rate * 0.25)
for i in range(0, len(signal_data) - window_size, hop_size):
segment = signal_data[i:i+window_size]
fft = np.fft.rfft(segment)
freqs = np.fft.rfftfreq(len(segment), 1/sample_rate)
power_spectrum_db = 20 * np.log10(np.abs(fft) + 1e-10)
fundamental_idx = np.argmin(np.abs(freqs - fundamental_freq))
fundamental_power_db = power_spectrum_db[fundamental_idx]
expected_harmonics = set()
harmonic_tolerance_bins = 3
for n in range(1, 11):
harmonic_freq = n * fundamental_freq
if harmonic_freq < sample_rate / 2:
harmonic_idx = np.argmin(np.abs(freqs - harmonic_freq))
for offset in range(-harmonic_tolerance_bins, harmonic_tolerance_bins + 1):
if 0 <= harmonic_idx + offset < len(freqs):
expected_harmonics.add(harmonic_idx + offset)
noise_floor_db = np.percentile(power_spectrum_db[10:], 10)
unexpected_peaks = []
for idx in range(10, len(power_spectrum_db)):
if idx not in expected_harmonics:
if power_spectrum_db[idx] > noise_floor_db + abs(threshold_db):
unexpected_peaks.append((freqs[idx], power_spectrum_db[idx]))
if len(unexpected_peaks) >= 5:
artifacts.append({
'type': 'spectral_anomaly',
'time_sec': i / sample_rate,
'unexpected_frequencies': unexpected_peaks[:10],
'count': len(unexpected_peaks)
})
return artifacts
def detect_artifacts_amplitude_spikes(signal_data: np.ndarray, sample_rate: int,
threshold_factor: float = 3.0) -> List[Dict]:
artifacts = []
skip_samples = int(sample_rate * 1.0)
if len(signal_data) <= 2 * skip_samples:
return artifacts
envelope = np.abs(signal_data)
window_size = int(sample_rate * 0.01)
if window_size % 2 == 0:
window_size += 1
from scipy.ndimage import uniform_filter1d
envelope_smooth = uniform_filter1d(envelope, size=window_size, mode='reflect')
median_env = np.median(envelope_smooth)
mad = np.median(np.abs(envelope_smooth - median_env))
if mad == 0:
return artifacts
threshold_high = median_env + threshold_factor * mad * 1.4826
threshold_low = median_env - threshold_factor * mad * 1.4826
# Detect spikes (too high)
spike_indices = np.where(envelope_smooth > threshold_high)[0]
# Detect dropouts (too low)
dropout_indices = np.where(envelope_smooth < threshold_low)[0]
total_duration = len(signal_data) / sample_rate
# Process spikes
if len(spike_indices) > 0:
groups = []
current_group = [spike_indices[0]]
for idx in spike_indices[1:]:
if idx - current_group[-1] <= int(sample_rate * 0.05):
current_group.append(idx)
else:
groups.append(current_group)
current_group = [idx]
groups.append(current_group)
for group in groups:
peak_idx = group[np.argmax(envelope_smooth[group])]
time_sec = peak_idx / sample_rate
peak_value = envelope_smooth[peak_idx]
# Skip artifacts in first and last second
if time_sec < 1.0 or time_sec > (total_duration - 1.0):
continue
artifacts.append({
'type': 'amplitude_spike',
'time_sec': float(time_sec),
'peak_amplitude': float(peak_value),
'median_amplitude': float(median_env),
'deviation_factor': float((peak_value - median_env) / (mad * 1.4826)) if mad > 0 else 0
})
# Process dropouts
if len(dropout_indices) > 0:
groups = []
current_group = [dropout_indices[0]]
for idx in dropout_indices[1:]:
if idx - current_group[-1] <= int(sample_rate * 0.05):
current_group.append(idx)
else:
groups.append(current_group)
current_group = [idx]
groups.append(current_group)
for group in groups:
dropout_idx = group[np.argmin(envelope_smooth[group])]
time_sec = dropout_idx / sample_rate
dropout_value = envelope_smooth[dropout_idx]
# Skip artifacts in first and last second
if time_sec < 1.0 or time_sec > (total_duration - 1.0):
continue
artifacts.append({
'type': 'amplitude_dropout',
'time_sec': float(time_sec),
'dropout_amplitude': float(dropout_value),
'median_amplitude': float(median_env),
'deviation_factor': float((median_env - dropout_value) / (mad * 1.4826)) if mad > 0 else 0
})
return artifacts
def detect_artifacts_zero_crossing(signal_data: np.ndarray, sample_rate: int,
threshold_factor: float = 2.0) -> List[Dict]:
artifacts = []
if len(signal_data) <= int(sample_rate * 2.0):
return artifacts
window_size = int(sample_rate * 0.1)
hop_size = int(sample_rate * 0.05)
zcr_values = []
for i in range(0, len(signal_data) - window_size, hop_size):
segment = signal_data[i:i+window_size]
zero_crossings = np.sum(np.abs(np.diff(np.sign(segment)))) / 2
zcr = zero_crossings / len(segment)
zcr_values.append((i, zcr))
if not zcr_values:
return artifacts
zcr_array = np.array([z[1] for z in zcr_values])
median_zcr = np.median(zcr_array)
std_zcr = np.std(zcr_array)
total_duration = len(signal_data) / sample_rate
for i, zcr in zcr_values:
time_sec = i / sample_rate
# Skip artifacts in first and last second
if time_sec < 1.0 or time_sec > (total_duration - 1.0):
continue
if std_zcr > 0 and abs(zcr - median_zcr) > threshold_factor * std_zcr:
artifacts.append({
'type': 'zero_crossing_anomaly',
'time_sec': float(time_sec),
'zcr_value': float(zcr),
'median_zcr': float(median_zcr),
'deviation_factor': float(abs(zcr - median_zcr) / std_zcr)
})
return artifacts
def detect_artifacts_energy_variation(signal_data: np.ndarray, sample_rate: int,
threshold_db: float = 6.0) -> List[Dict]:
artifacts = []
if len(signal_data) <= int(sample_rate * 2.0):
return artifacts
window_size = int(sample_rate * 0.1)
hop_size = int(sample_rate * 0.05)
energy_values = []
for i in range(0, len(signal_data) - window_size, hop_size):
segment = signal_data[i:i+window_size]
energy = np.sum(segment**2)
energy_values.append((i, energy))
total_duration = len(signal_data) / sample_rate
for idx in range(1, len(energy_values)):
prev_energy = energy_values[idx-1][1]
curr_energy = energy_values[idx][1]
if prev_energy > 0 and curr_energy > 0:
energy_change_db = 10 * np.log10(curr_energy / prev_energy)
if abs(energy_change_db) > threshold_db:
time_sec = energy_values[idx][0] / sample_rate
# Skip artifacts in first and last second
if time_sec < 1.0 or time_sec > (total_duration - 1.0):
continue
artifacts.append({
'type': 'energy_variation',
'time_sec': float(time_sec),
'energy_change_db': float(energy_change_db),
'threshold_db': float(threshold_db)
})
return artifacts
def measure_frequency_accuracy(signal_data: np.ndarray, sample_rate: int,
expected_freq: float) -> Dict:
"""
Measure the actual dominant frequency in the signal and compare to expected.
Uses FFT on the full signal (skipping first and last second).
"""
# Skip first and last second
skip_samples = int(sample_rate * 1.0)
if len(signal_data) <= 2 * skip_samples:
return {
'expected_freq_hz': float(expected_freq),
'measured_freq_hz': 0.0,
'error_hz': 0.0,
'error_percent': 0.0
}
signal_trimmed = signal_data[skip_samples:-skip_samples]
# Perform FFT
fft = np.fft.rfft(signal_trimmed)
freqs = np.fft.rfftfreq(len(signal_trimmed), 1/sample_rate)
# Find the peak frequency
magnitude = np.abs(fft)
peak_idx = np.argmax(magnitude)
measured_freq = freqs[peak_idx]
# Calculate error
error_hz = measured_freq - expected_freq
error_percent = (error_hz / expected_freq) * 100.0 if expected_freq > 0 else 0.0
return {
'expected_freq_hz': float(expected_freq),
'measured_freq_hz': float(measured_freq),
'error_hz': float(error_hz),
'error_percent': float(error_percent)
}
def detect_artifacts_combined(signal_data: np.ndarray, sample_rate: int, fundamental_freq: float,
detector_config: Dict) -> Dict:
all_artifacts = []
if detector_config.get('spectral_anomaly', {}).get('enabled', True):
threshold = detector_config.get('spectral_anomaly', {}).get('threshold_db', -60)
artifacts = detect_artifacts_spectral_anomaly(signal_data, sample_rate, fundamental_freq, threshold)
all_artifacts.extend(artifacts)
if detector_config.get('amplitude_spikes', {}).get('enabled', True):
threshold = detector_config.get('amplitude_spikes', {}).get('threshold_factor', 3.0)
artifacts = detect_artifacts_amplitude_spikes(signal_data, sample_rate, threshold)
all_artifacts.extend(artifacts)
if detector_config.get('zero_crossing', {}).get('enabled', True):
threshold = detector_config.get('zero_crossing', {}).get('threshold_factor', 2.0)
artifacts = detect_artifacts_zero_crossing(signal_data, sample_rate, threshold)
all_artifacts.extend(artifacts)
if detector_config.get('energy_variation', {}).get('enabled', True):
threshold = detector_config.get('energy_variation', {}).get('threshold_db', 6.0)
artifacts = detect_artifacts_energy_variation(signal_data, sample_rate, threshold)
all_artifacts.extend(artifacts)
# Measure frequency accuracy
freq_accuracy = measure_frequency_accuracy(signal_data, sample_rate, fundamental_freq)
artifact_summary = {
'total_count': len(all_artifacts),
'by_type': {},
'artifacts': all_artifacts,
'frequency_accuracy': freq_accuracy
}
for artifact in all_artifacts:
artifact_type = artifact['type']
if artifact_type not in artifact_summary['by_type']:
artifact_summary['by_type'][artifact_type] = 0
artifact_summary['by_type'][artifact_type] += 1
return artifact_summary
def plot_individual_anomaly(signal_data: np.ndarray, artifact: Dict, artifact_idx: int,
channel_name: str, frequency: float, sample_rate: int,
output_dir: Path):
periods_to_show = 20
period_samples = int(sample_rate / frequency)
total_samples = periods_to_show * period_samples
artifact_time = artifact['time_sec']
artifact_sample = int(artifact_time * sample_rate)
start_sample = max(0, artifact_sample - total_samples // 2)
end_sample = min(len(signal_data), artifact_sample + total_samples // 2)
if end_sample - start_sample < total_samples:
if start_sample == 0:
end_sample = min(len(signal_data), start_sample + total_samples)
else:
start_sample = max(0, end_sample - total_samples)
segment = signal_data[start_sample:end_sample]
time_ms = (np.arange(len(segment)) + start_sample) / sample_rate * 1000
fig, ax = plt.subplots(1, 1, figsize=(14, 6))
ax.plot(time_ms, segment, linewidth=1.0, color='blue', alpha=0.8)
ax.axvline(x=artifact_time * 1000, color='red', linestyle='--', linewidth=2,
label=f'Anomaly at {artifact_time:.3f}s', alpha=0.7)
ax.set_xlabel('Time (ms)', fontsize=11)
ax.set_ylabel('Amplitude', fontsize=11)
artifact_type = artifact['type'].replace('_', ' ').title()
ax.set_title(f'{channel_name} - {artifact_type} #{artifact_idx+1} (~{periods_to_show} periods @ {frequency}Hz)',
fontsize=12, fontweight='bold')
info_text = f"Type: {artifact_type}\nTime: {artifact_time:.3f}s"
if 'deviation_factor' in artifact:
info_text += f"\nDeviation: {artifact['deviation_factor']:.2f}σ"
if 'energy_change_db' in artifact:
info_text += f"\nEnergy Change: {artifact['energy_change_db']:.2f} dB"
if 'count' in artifact and artifact['type'] == 'spectral_anomaly':
info_text += f"\nUnexpected Peaks: {artifact['count']}"
ax.text(0.02, 0.98, info_text, transform=ax.transAxes,
fontsize=9, verticalalignment='top',
bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
ax.legend(loc='upper right')
ax.grid(True, alpha=0.3)
plt.tight_layout()
safe_type = artifact['type'].replace('_', '-')
plot_file = output_dir / f'{channel_name.lower().replace(" ", "_")}_anomaly_{artifact_idx+1:04d}_{safe_type}_{artifact_time:.3f}s.png'
plt.savefig(plot_file, dpi=150, bbox_inches='tight')
plt.close()
def plot_artifact_detection(channel_1: np.ndarray, channel_2: np.ndarray,
artifacts_ch1: Dict, artifacts_ch2: Dict,
frequency: float, sample_rate: int, output_dir: Path):
fig, axes = plt.subplots(2, 2, figsize=(16, 10))
time = np.arange(len(channel_1)) / sample_rate
axes[0, 0].plot(time, channel_1, alpha=0.7, linewidth=0.5)
axes[0, 0].set_xlabel('Time (s)')
axes[0, 0].set_ylabel('Amplitude')
axes[0, 0].set_title(f'Channel 1 (Loopback) - {artifacts_ch1["total_count"]} artifacts')
axes[0, 0].grid(True, alpha=0.3)
for artifact in artifacts_ch1['artifacts']:
axes[0, 0].axvline(x=artifact['time_sec'], color='r', alpha=0.3, linewidth=0.5)
axes[1, 0].plot(time, channel_2, alpha=0.7, linewidth=0.5)
axes[1, 0].set_xlabel('Time (s)')
axes[1, 0].set_ylabel('Amplitude')
axes[1, 0].set_title(f'Channel 2 (DUT/Radio) - {artifacts_ch2["total_count"]} artifacts')
axes[1, 0].grid(True, alpha=0.3)
for artifact in artifacts_ch2['artifacts']:
axes[1, 0].axvline(x=artifact['time_sec'], color='r', alpha=0.3, linewidth=0.5)
fft_ch1 = np.fft.rfft(channel_1)
fft_ch2 = np.fft.rfft(channel_2)
freqs = np.fft.rfftfreq(len(channel_1), 1/sample_rate)
axes[0, 1].plot(freqs, 20*np.log10(np.abs(fft_ch1) + 1e-10), linewidth=0.5)
axes[0, 1].set_xlabel('Frequency (Hz)')
axes[0, 1].set_ylabel('Magnitude (dB)')
axes[0, 1].set_title('Channel 1 Spectrum')
axes[0, 1].set_xlim(0, min(10000, sample_rate/2))
axes[0, 1].grid(True, alpha=0.3)
axes[1, 1].plot(freqs, 20*np.log10(np.abs(fft_ch2) + 1e-10), linewidth=0.5)
axes[1, 1].set_xlabel('Frequency (Hz)')
axes[1, 1].set_ylabel('Magnitude (dB)')
axes[1, 1].set_title('Channel 2 Spectrum')
axes[1, 1].set_xlim(0, min(10000, sample_rate/2))
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plot_file = output_dir / f'artifact_detection_{frequency}Hz.png'
plt.savefig(plot_file, dpi=150, bbox_inches='tight')
plt.close()
def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_dir: Path = None) -> Dict:
import time
sample_rate = config['audio']['sample_rate']
duration = config['artifact_detection']['duration']
frequency = config['artifact_detection']['test_frequency']
amplitude = config['artifact_detection']['amplitude']
device_name = config['audio']['device_name']
channels = config['audio']['channels']
detector_config = config['artifact_detection']['detectors']
startup_delay = config['artifact_detection'].get('startup_delay', 10)
signal_type = config['artifact_detection'].get('signal_type', 'sine')
device_ids = find_audio_device(device_name)
if startup_delay > 0:
print(f"Waiting {startup_delay} seconds for system to settle...")
time.sleep(startup_delay)
print("Starting recording...")
if signal_type == 'chirp':
f0 = config['artifact_detection'].get('chirp_f0', 100)
f1 = config['artifact_detection'].get('chirp_f1', 8000)
tone = generate_chirp(duration, sample_rate, f0=f0, f1=f1, amplitude=amplitude)
frequency = (f0 + f1) / 2
recording = play_and_record(tone, sample_rate, device_ids, channels)
elif signal_type == 'silent':
frequency = 1000
recording = sd.rec(int(duration * sample_rate), samplerate=sample_rate,
channels=channels, device=device_ids[0], blocking=True)
else:
tone = generate_test_tone(frequency, duration, sample_rate, amplitude)
recording = play_and_record(tone, sample_rate, device_ids, channels)
channel_1 = recording[:, 0]
channel_2 = recording[:, 1]
artifacts_ch1 = detect_artifacts_combined(channel_1, sample_rate, frequency, detector_config)
artifacts_ch2 = detect_artifacts_combined(channel_2, sample_rate, frequency, detector_config)
if save_plots and output_dir:
plot_artifact_detection(channel_1, channel_2, artifacts_ch1, artifacts_ch2,
frequency, sample_rate, output_dir)
anomalies_dir = output_dir / 'individual_anomalies'
anomalies_dir.mkdir(exist_ok=True)
print(f"\nPlotting individual anomalies to: {anomalies_dir}")
for idx, artifact in enumerate(artifacts_ch1['artifacts']):
plot_individual_anomaly(channel_1, artifact, idx, 'Channel 1 Loopback',
frequency, sample_rate, anomalies_dir)
for idx, artifact in enumerate(artifacts_ch2['artifacts']):
plot_individual_anomaly(channel_2, artifact, idx, 'Channel 2 DUT',
frequency, sample_rate, anomalies_dir)
total_anomaly_plots = len(artifacts_ch1['artifacts']) + len(artifacts_ch2['artifacts'])
if total_anomaly_plots > 0:
print(f"✓ Generated {total_anomaly_plots} individual anomaly plots")
result = {
'signal_type': signal_type,
'duration_sec': float(duration),
'channel_1_loopback': {
'total_artifacts': artifacts_ch1['total_count'],
'artifacts_by_type': artifacts_ch1['by_type'],
'artifact_rate_per_minute': float(artifacts_ch1['total_count'] / duration * 60),
'frequency_accuracy': artifacts_ch1['frequency_accuracy']
},
'channel_2_dut': {
'total_artifacts': artifacts_ch2['total_count'],
'artifacts_by_type': artifacts_ch2['by_type'],
'artifact_rate_per_minute': float(artifacts_ch2['total_count'] / duration * 60),
'frequency_accuracy': artifacts_ch2['frequency_accuracy']
},
'detector_config': detector_config
}
if signal_type == 'chirp':
f0 = config['artifact_detection'].get('chirp_f0', 100)
f1 = config['artifact_detection'].get('chirp_f1', 8000)
result['chirp_f0_hz'] = int(f0)
result['chirp_f1_hz'] = int(f1)
elif signal_type == 'silent':
result['note'] = 'Silent mode - no playback, noise floor measurement'
else:
result['test_frequency_hz'] = int(frequency)
return result

183
test_artifact_detection.py Executable file
View File

@@ -0,0 +1,183 @@
#!/usr/bin/env python3
import argparse
import yaml
from datetime import datetime
from pathlib import Path
import sys
sys.path.insert(0, str(Path(__file__).parent))
from src.audio_tests import run_artifact_detection_test
def main():
parser = argparse.ArgumentParser(description='Run artifact detection test on audio loopback and radio path')
parser.add_argument('--serial-number', required=True, help='Serial number (e.g., SN001234)')
parser.add_argument('--software-version', required=True, help='Software version (git commit hash)')
parser.add_argument('--comment', default='', help='Comments about this test')
parser.add_argument('--config', default='config.yaml', help='Path to config file')
parser.add_argument('--duration', type=float, help='Override recording duration in seconds (default from config)')
parser.add_argument('--frequency', type=float, help='Override test frequency in Hz (default from config)')
parser.add_argument('--signal-type', choices=['sine', 'chirp', 'silent'], default='sine',
help='Signal type: sine (single frequency), chirp (frequency sweep), or silent (no signal)')
args = parser.parse_args()
with open(args.config, 'r') as f:
config = yaml.safe_load(f)
if args.duration:
config['artifact_detection']['duration'] = args.duration
if args.frequency:
config['artifact_detection']['test_frequency'] = args.frequency
config['artifact_detection']['signal_type'] = args.signal_type
timestamp = datetime.now()
test_id = timestamp.strftime('%Y%m%d_%H%M%S')
results_dir = Path(config['output']['results_dir'])
results_dir.mkdir(exist_ok=True)
test_output_dir = results_dir / f"{test_id}_artifact_detection"
test_output_dir.mkdir(exist_ok=True)
save_plots = config['output'].get('save_plots', False)
print("=" * 70)
print("ARTIFACT DETECTION TEST")
print("=" * 70)
print(f"Test ID: {test_id}")
print(f"Serial Number: {args.serial_number}")
print(f"Software: {args.software_version}")
if args.comment:
print(f"Comment: {args.comment}")
print(f"Duration: {config['artifact_detection']['duration']} seconds")
signal_type = config['artifact_detection'].get('signal_type', 'sine')
if signal_type == 'sine':
print(f"Signal Type: Sine wave @ {config['artifact_detection']['test_frequency']} Hz")
elif signal_type == 'chirp':
print(f"Signal Type: Chirp (100 Hz - 8000 Hz)")
else:
print(f"Signal Type: Silent (no playback - noise floor measurement)")
if save_plots:
print(f"Plots will be saved to: {test_output_dir}")
print("-" * 70)
print("\nDetection Algorithms:")
for detector_name, detector_settings in config['artifact_detection']['detectors'].items():
status = "ENABLED" if detector_settings.get('enabled', False) else "DISABLED"
print(f" - {detector_name}: {status}")
if detector_settings.get('enabled', False):
for param, value in detector_settings.items():
if param != 'enabled':
print(f" {param}: {value}")
print("\n" + "=" * 70)
signal_type = config['artifact_detection'].get('signal_type', 'sine')
if signal_type == 'sine':
freq = config['artifact_detection']['test_frequency']
print(f"STARTING TEST - Playing {freq}Hz sine wave and recording both channels...")
elif signal_type == 'chirp':
print("STARTING TEST - Playing chirp signal (100-8000Hz) and recording both channels...")
else:
print("STARTING TEST - Recording silence (no playback)...")
print("=" * 70)
print("\nChannel 1: Loopback path (direct audio interface loopback)")
print("Channel 2: DUT/Radio path (through beacon and radio transmission)")
print()
try:
result = run_artifact_detection_test(config, save_plots=save_plots, output_dir=test_output_dir)
print("\n" + "=" * 70)
print("TEST COMPLETE - RESULTS")
print("=" * 70)
signal_type = result.get('signal_type', 'sine')
if signal_type == 'chirp':
print(f"\n📊 Signal: Chirp {result['chirp_f0_hz']} Hz → {result['chirp_f1_hz']} Hz")
elif signal_type == 'silent':
print(f"\n📊 Signal: Silent (no playback - noise floor measurement)")
else:
print(f"\n📊 Test Frequency: {result['test_frequency_hz']} Hz")
print(f"⏱️ Duration: {result['duration_sec']} seconds")
print("\n🔊 CHANNEL 1 (LOOPBACK PATH):")
print(f" Total Artifacts: {result['channel_1_loopback']['total_artifacts']}")
print(f" Artifact Rate: {result['channel_1_loopback']['artifact_rate_per_minute']:.2f} per minute")
if result['channel_1_loopback']['artifacts_by_type']:
print(" By Type:")
for artifact_type, count in result['channel_1_loopback']['artifacts_by_type'].items():
print(f" - {artifact_type}: {count}")
# Display frequency accuracy for channel 1
if 'frequency_accuracy' in result['channel_1_loopback']:
freq_acc = result['channel_1_loopback']['frequency_accuracy']
print(f" Frequency Accuracy:")
print(f" Expected: {freq_acc['expected_freq_hz']:.1f} Hz")
print(f" Measured: {freq_acc['measured_freq_hz']:.2f} Hz")
print(f" Error: {freq_acc['error_hz']:+.2f} Hz ({freq_acc['error_percent']:+.3f}%)")
print("\n📻 CHANNEL 2 (DUT/RADIO PATH):")
print(f" Total Artifacts: {result['channel_2_dut']['total_artifacts']}")
print(f" Artifact Rate: {result['channel_2_dut']['artifact_rate_per_minute']:.2f} per minute")
if result['channel_2_dut']['artifacts_by_type']:
print(" By Type:")
for artifact_type, count in result['channel_2_dut']['artifacts_by_type'].items():
print(f" - {artifact_type}: {count}")
# Display frequency accuracy for channel 2
if 'frequency_accuracy' in result['channel_2_dut']:
freq_acc = result['channel_2_dut']['frequency_accuracy']
print(f" Frequency Accuracy:")
print(f" Expected: {freq_acc['expected_freq_hz']:.1f} Hz")
print(f" Measured: {freq_acc['measured_freq_hz']:.2f} Hz")
print(f" Error: {freq_acc['error_hz']:+.2f} Hz ({freq_acc['error_percent']:+.3f}%)")
ch1_count = result['channel_1_loopback']['total_artifacts']
ch2_count = result['channel_2_dut']['total_artifacts']
if ch2_count > ch1_count:
delta = ch2_count - ch1_count
print(f"\n⚠️ DEGRADATION DETECTED: {delta} more artifacts in radio path vs loopback")
elif ch1_count == ch2_count == 0:
print("\n✅ EXCELLENT: No artifacts detected in either path!")
else:
print(f"\n Loopback baseline: {ch1_count} artifacts")
except Exception as e:
print(f"\n❌ ERROR: {e}")
import traceback
traceback.print_exc()
result = {
'error': str(e),
'test_frequency_hz': config['artifact_detection']['test_frequency'],
'duration_sec': config['artifact_detection']['duration']
}
output_data = {
'metadata': {
'test_id': test_id,
'timestamp': timestamp.isoformat(),
'serial_number': args.serial_number,
'software_version': args.software_version,
'comment': args.comment
},
'artifact_detection_result': result
}
output_file = test_output_dir / f"{test_id}_artifact_detection_results.yaml"
with open(output_file, 'w') as f:
yaml.dump(output_data, f, default_flow_style=False, sort_keys=False)
print("\n" + "=" * 70)
print("✅ Results saved to:")
print(f" YAML: {output_file}")
if save_plots:
print(f" Summary plots: {test_output_dir}/")
print(f" Individual anomaly plots: {test_output_dir}/individual_anomalies/")
print("=" * 70)
if __name__ == '__main__':
main()

View File

@@ -6,16 +6,16 @@ from pathlib import Path
import sys
sys.path.insert(0, str(Path(__file__).parent))
from src.audio_tests import run_single_test, run_latency_test
from src.audio_tests import run_latency_test
def main():
parser = argparse.ArgumentParser(description='Run PCB hardware audio tests')
parser.add_argument('--pcb-version', required=True, help='PCB version (e.g., v2.1)')
parser.add_argument('--pcb-revision', required=True, help='PCB revision (e.g., A, B, C)')
parser = argparse.ArgumentParser(description='Run latency test on audio loopback and radio path')
parser.add_argument('--serial-number', required=True, help='Serial number (e.g., SN001234)')
parser.add_argument('--software-version', required=True, help='Software version (git commit hash)')
parser.add_argument('--notes', default='', help='Adjustments or comments about this test')
parser.add_argument('--comment', default='', help='Comments about this test')
parser.add_argument('--config', default='config.yaml', help='Path to config file')
parser.add_argument('--measurements', type=int, default=5, help='Number of latency measurements (default: 5)')
args = parser.parse_args()
@@ -28,59 +28,44 @@ def main():
results_dir = Path(config['output']['results_dir'])
results_dir.mkdir(exist_ok=True)
test_output_dir = results_dir / test_id
test_output_dir = results_dir / f"{test_id}_latency"
test_output_dir.mkdir(exist_ok=True)
save_plots = config['output'].get('save_plots', False)
print(f"Starting audio test run: {test_id}")
print(f"PCB: {args.pcb_version} Rev {args.pcb_revision}")
print(f"Starting latency test: {test_id}")
print(f"Serial Number: {args.serial_number}")
print(f"Software: {args.software_version}")
if args.comment:
print(f"Comment: {args.comment}")
print(f"Measurements: {args.measurements}")
if save_plots:
print(f"Plots will be saved to: {test_output_dir}")
print("-" * 60)
print("\n[1/2] Running chirp-based latency test (5 measurements)...")
print(f"\nRunning chirp-based latency test ({args.measurements} measurements)...")
try:
latency_stats = run_latency_test(config, num_measurements=5,
latency_stats = run_latency_test(config, num_measurements=args.measurements,
save_plots=save_plots, output_dir=test_output_dir)
print(f"✓ Latency: avg={latency_stats['avg']:.3f}ms, "
f"min={latency_stats['min']:.3f}ms, max={latency_stats['max']:.3f}ms")
f"min={latency_stats['min']:.3f}ms, max={latency_stats['max']:.3f}ms, "
f"std={latency_stats['std']:.3f}ms")
except Exception as e:
print(f"✗ Error: {e}")
latency_stats = {'avg': 0.0, 'min': 0.0, 'max': 0.0, 'std': 0.0, 'error': str(e)}
print("\n[2/2] Running frequency sweep tests...")
test_results = []
frequencies = config['test_tones']['frequencies']
for i, freq in enumerate(frequencies, 1):
print(f"Testing frequency {i}/{len(frequencies)}: {freq} Hz...", end=' ', flush=True)
try:
result = run_single_test(freq, config, save_plots=save_plots, output_dir=test_output_dir)
test_results.append(result)
print("")
except Exception as e:
print(f"✗ Error: {e}")
test_results.append({
'frequency_hz': freq,
'error': str(e)
})
output_data = {
'metadata': {
'test_id': test_id,
'timestamp': timestamp.isoformat(),
'pcb_version': args.pcb_version,
'pcb_revision': args.pcb_revision,
'serial_number': args.serial_number,
'software_version': args.software_version,
'notes': args.notes
'comment': args.comment
},
'latency_test': latency_stats,
'test_results': test_results
'latency_test': latency_stats
}
output_file = results_dir / f"{test_id}_results.yaml"
output_file = test_output_dir / f"{test_id}_latency_results.yaml"
with open(output_file, 'w') as f:
yaml.dump(output_data, f, default_flow_style=False, sort_keys=False)