Adds artifact test.

This commit is contained in:
Pbopbo
2026-03-18 10:41:46 +01:00
parent 80f7522159
commit 39bcd072c0
9 changed files with 836 additions and 21 deletions

View File

@@ -0,0 +1,193 @@
# Artifact Detection Test
## Overview
This test plays a 1kHz sine wave for a configurable duration (default 60 seconds) and records both channels simultaneously:
- **Channel 1**: Loopback path (direct audio interface connection)
- **Channel 2**: DUT/Radio path (through beacon and radio transmission)
The test detects buzzing, clicks, dropouts, and other audio artifacts using multiple configurable algorithms.
## Quick Start
```bash
python test_artifact_detection.py \
--serial-number SN001234 \
--software-version abc123 \
--comment "Testing new firmware"
```
## Detection Algorithms
The test uses four configurable detection algorithms (spectral_anomaly is **disabled by default** due to false positives):
### 1. Spectral Anomaly Detection (DISABLED BY DEFAULT)
- **Status**: ⚠️ Currently generates too many false positives - disabled by default
- **What it detects**: Unexpected frequencies that aren't harmonics of the fundamental tone
- **Use case**: Buzzing, interference, crosstalk
- **Configuration**: `threshold_db` - how far below fundamental to search (-60 dB default)
### 2. Amplitude Spike Detection (WORKING)
- **What it detects**: Sudden changes in signal amplitude (RMS)
- **Use case**: Clicks, pops, dropouts
- **Configuration**: `threshold_factor` - number of standard deviations (3.0 default)
### 3. Zero-Crossing Anomaly Detection (WORKING)
- **What it detects**: Irregular zero-crossing patterns
- **Use case**: Distortion, clipping, non-linear artifacts
- **Configuration**: `threshold_factor` - number of standard deviations (2.0 default)
### 4. Energy Variation Detection (WORKING)
- **What it detects**: Rapid energy changes between time windows
- **Use case**: Dropouts, level fluctuations, intermittent issues
- **Configuration**: `threshold_db` - energy change threshold (6.0 dB default)
## Configuration
Edit `config.yaml` to customize the test:
```yaml
artifact_detection:
test_frequency: 1000 # Hz
duration: 60.0 # seconds
amplitude: 0.5 # 0.0 to 1.0
detectors:
spectral_anomaly:
enabled: true
threshold_db: -40
amplitude_spikes:
enabled: true
threshold_factor: 3.0
zero_crossing:
enabled: true
threshold_factor: 2.0
energy_variation:
enabled: true
threshold_db: 6.0
```
## Command Line Options
- `--serial-number`: Serial number (required)
- `--software-version`: Git commit hash or version (required)
- `--comment`: Optional comments about the test
- `--config`: Path to config file (default: config.yaml)
- `--duration`: Override duration in seconds
- `--frequency`: Override test frequency in Hz
## Example: Quick 10-second Test
```bash
python test_artifact_detection.py \
--serial-number SN001234 \
--software-version abc123 \
--duration 10
```
## Example: Custom Frequency
```bash
python test_artifact_detection.py \
--serial-number SN001234 \
--software-version abc123 \
--frequency 440
```
## Tuning Detection Algorithms
### More Sensitive Detection
To catch more subtle artifacts, make thresholds stricter:
```yaml
detectors:
spectral_anomaly:
threshold_db: -50 # Lower = more sensitive
amplitude_spikes:
threshold_factor: 2.0 # Lower = more sensitive
zero_crossing:
threshold_factor: 1.5 # Lower = more sensitive
energy_variation:
threshold_db: 3.0 # Lower = more sensitive
```
### Less Sensitive Detection
To reduce false positives in noisy environments:
```yaml
detectors:
spectral_anomaly:
threshold_db: -30 # Higher = less sensitive
amplitude_spikes:
threshold_factor: 4.0 # Higher = less sensitive
zero_crossing:
threshold_factor: 3.0 # Higher = less sensitive
energy_variation:
threshold_db: 10.0 # Higher = less sensitive
```
### Disable Specific Detectors
```yaml
detectors:
spectral_anomaly:
enabled: false # Turn off this detector
```
## Output
The test generates:
1. **YAML results file**: `test_results/{timestamp}_artifact_detection_results.yaml`
2. **JSON results file**: `test_results/{timestamp}_artifact_detection_results.json`
3. **Summary plots** (if enabled): `test_results/{timestamp}_artifact_detection/`
- Time domain waveforms with artifact markers
- Frequency spectrum analysis
4. **Individual anomaly plots**: `test_results/{timestamp}_artifact_detection/individual_anomalies/`
- Each anomaly plotted individually with ~20 periods of context
- Detailed view showing exactly what the anomaly looks like
- Named by channel, type, and timestamp for easy identification
### Results Structure
```yaml
metadata:
test_id: "20260317_140530"
timestamp: "2026-03-17T14:05:30.123456"
test_type: "artifact_detection"
pcb_version: "v2.1"
pcb_revision: "A"
software_version: "abc123"
artifact_detection_result:
test_frequency_hz: 1000
duration_sec: 60.0
channel_1_loopback:
total_artifacts: 5
artifact_rate_per_minute: 5.0
artifacts_by_type:
spectral_anomaly: 2
amplitude_spike: 3
channel_2_dut:
total_artifacts: 23
artifact_rate_per_minute: 23.0
artifacts_by_type:
spectral_anomaly: 8
amplitude_spike: 10
energy_variation: 5
detector_config: {...}
```
## Interpreting Results
- **Zero artifacts in both channels**: Excellent signal quality
- **Same artifacts in both channels**: Likely environmental interference or audio interface issue
- **More artifacts in Channel 2 (radio path)**: Radio transmission degradation detected
- **High spectral_anomaly count**: Interference or crosstalk
- **High amplitude_spike count**: Clicks, pops, or dropouts
- **High energy_variation count**: Level instability or dropouts
## Comparison with Loopback Baseline
The loopback path (Channel 1) serves as a baseline reference. Any additional artifacts in the radio path (Channel 2) indicate degradation introduced by the radio transmission system.
Expected behavior:
- Loopback should have minimal artifacts (ideally zero)
- Radio path may have some artifacts due to transmission
- Large difference indicates issues in radio hardware/firmware

View File

@@ -13,10 +13,9 @@ pip install -r requirements.txt
```bash ```bash
python run_test.py \ python run_test.py \
--pcb-version "v1.0" \ --serial-number "SN001234" \
--pcb-revision "A" \
--software-version "initial" \ --software-version "initial" \
--notes "First test run" --comment "First test run"
``` ```
**What happens:** **What happens:**
@@ -44,11 +43,11 @@ python view_results.py example_test_result.yaml
Run multiple tests with different metadata: Run multiple tests with different metadata:
```bash ```bash
# Test PCB v1.0 # Test unit SN001234
python run_test.py --pcb-version "v1.0" --pcb-revision "A" --software-version "abc123" python run_test.py --serial-number "SN001234" --software-version "abc123"
# Test PCB v2.0 # Test unit SN001235
python run_test.py --pcb-version "v2.0" --pcb-revision "A" --software-version "abc123" python run_test.py --serial-number "SN001235" --software-version "abc123"
# Compare by viewing both YAML files # Compare by viewing both YAML files
python view_results.py test_results/20260226_120000_results.yaml python view_results.py test_results/20260226_120000_results.yaml

View File

@@ -21,10 +21,9 @@ pip install -r requirements.txt
```bash ```bash
python run_test.py \ python run_test.py \
--pcb-version "v2.1" \ --serial-number "SN001234" \
--pcb-revision "A" \
--software-version "a3f2b1c" \ --software-version "a3f2b1c" \
--notes "Replaced capacitor C5" --comment "Replaced capacitor C5"
``` ```
### 3. View Results ### 3. View Results

View File

@@ -36,3 +36,19 @@ Play a sine in different frequencies, and for every frequency 5 sec long and do
Dont do fourier yet. Dont do fourier yet.
Do a simple project. Do a simple project.
----
I want you to write a new test:
Put a 1khz sine into the system and record both channels for x seconds e.g. 60.
I want you to detect buzzing and other artifacts in the recording.
Give me a number how many artifacts you found.
Make the detection algorithm configurable, so we can try different approaches.
Again input it into the audio interface and measure both loopback and radio path like in the other test.

View File

@@ -15,3 +15,26 @@ output:
results_dir: "test_results" results_dir: "test_results"
save_plots: true save_plots: true
save_raw_audio: false save_raw_audio: false
artifact_detection:
test_frequency: 1000 # Hz - Test tone frequency (for sine wave mode)
duration: 60.0 # seconds - Recording duration
amplitude: 0.5 # 0.0 to 1.0
startup_delay: 0 # seconds - Wait before starting recording to let system settle
# Chirp signal parameters (used when --signal-type chirp is specified)
chirp_f0: 100 # Hz - Chirp start frequency
chirp_f1: 8000 # Hz - Chirp end frequency
# NOTE: All detectors skip the first 1 second of recording to avoid startup transients
detectors:
spectral_anomaly:
enabled: false # DISABLED - generates too many false positives, needs better algorithm
threshold_db: -60 # Detect unexpected frequencies above noise floor + this threshold (more negative = less sensitive)
amplitude_spikes:
enabled: true
threshold_factor: 4.0 # MAD-based outlier detection on envelope (detects clicks, pops, dropouts). Lower = more sensitive.
zero_crossing:
enabled: false
threshold_factor: 2.0 # Number of standard deviations for zero-crossing anomalies (detects distortion)
energy_variation:
enabled: false
threshold_db: 6.0 # Energy change threshold in dB between consecutive windows (detects level changes)

View File

@@ -1,10 +1,9 @@
metadata: metadata:
test_id: 20260226_123456 test_id: 20260226_123456
timestamp: '2026-02-26T12:34:56.789012' timestamp: '2026-02-26T12:34:56.789012'
pcb_version: v2.1 serial_number: SN001234
pcb_revision: A
software_version: a3f2b1c8d9e software_version: a3f2b1c8d9e
notes: Baseline test with new capacitor values comment: Baseline test with new capacitor values
test_results: test_results:
- frequency_hz: 100 - frequency_hz: 100
latency_ms: 2.341 latency_ms: 2.341

View File

@@ -11,10 +11,9 @@ from src.audio_tests import run_single_test, run_latency_test
def main(): def main():
parser = argparse.ArgumentParser(description='Run PCB hardware audio tests') parser = argparse.ArgumentParser(description='Run PCB hardware audio tests')
parser.add_argument('--pcb-version', required=True, help='PCB version (e.g., v2.1)') parser.add_argument('--serial-number', required=True, help='Serial number (e.g., SN001234)')
parser.add_argument('--pcb-revision', required=True, help='PCB revision (e.g., A, B, C)')
parser.add_argument('--software-version', required=True, help='Software version (git commit hash)') parser.add_argument('--software-version', required=True, help='Software version (git commit hash)')
parser.add_argument('--notes', default='', help='Adjustments or comments about this test') parser.add_argument('--comment', default='', help='Comments about this test')
parser.add_argument('--config', default='config.yaml', help='Path to config file') parser.add_argument('--config', default='config.yaml', help='Path to config file')
args = parser.parse_args() args = parser.parse_args()
@@ -34,8 +33,10 @@ def main():
save_plots = config['output'].get('save_plots', False) save_plots = config['output'].get('save_plots', False)
print(f"Starting audio test run: {test_id}") print(f"Starting audio test run: {test_id}")
print(f"PCB: {args.pcb_version} Rev {args.pcb_revision}") print(f"Serial Number: {args.serial_number}")
print(f"Software: {args.software_version}") print(f"Software: {args.software_version}")
if args.comment:
print(f"Comment: {args.comment}")
if save_plots: if save_plots:
print(f"Plots will be saved to: {test_output_dir}") print(f"Plots will be saved to: {test_output_dir}")
print("-" * 60) print("-" * 60)
@@ -71,10 +72,9 @@ def main():
'metadata': { 'metadata': {
'test_id': test_id, 'test_id': test_id,
'timestamp': timestamp.isoformat(), 'timestamp': timestamp.isoformat(),
'pcb_version': args.pcb_version, 'serial_number': args.serial_number,
'pcb_revision': args.pcb_revision,
'software_version': args.software_version, 'software_version': args.software_version,
'notes': args.notes 'comment': args.comment
}, },
'latency_test': latency_stats, 'latency_test': latency_stats,
'test_results': test_results 'test_results': test_results

View File

@@ -279,3 +279,416 @@ def plot_latency_test(channel_1: np.ndarray, channel_2: np.ndarray, correlation:
plot_file = output_dir / 'latency_chirp_analysis.png' plot_file = output_dir / 'latency_chirp_analysis.png'
plt.savefig(plot_file, dpi=150, bbox_inches='tight') plt.savefig(plot_file, dpi=150, bbox_inches='tight')
plt.close() plt.close()
def detect_artifacts_spectral_anomaly(signal_data: np.ndarray, sample_rate: int,
fundamental_freq: float, threshold_db: float = -60) -> List[Dict]:
artifacts = []
window_size = int(sample_rate * 0.5)
hop_size = int(sample_rate * 0.25)
for i in range(0, len(signal_data) - window_size, hop_size):
segment = signal_data[i:i+window_size]
fft = np.fft.rfft(segment)
freqs = np.fft.rfftfreq(len(segment), 1/sample_rate)
power_spectrum_db = 20 * np.log10(np.abs(fft) + 1e-10)
fundamental_idx = np.argmin(np.abs(freqs - fundamental_freq))
fundamental_power_db = power_spectrum_db[fundamental_idx]
expected_harmonics = set()
harmonic_tolerance_bins = 3
for n in range(1, 11):
harmonic_freq = n * fundamental_freq
if harmonic_freq < sample_rate / 2:
harmonic_idx = np.argmin(np.abs(freqs - harmonic_freq))
for offset in range(-harmonic_tolerance_bins, harmonic_tolerance_bins + 1):
if 0 <= harmonic_idx + offset < len(freqs):
expected_harmonics.add(harmonic_idx + offset)
noise_floor_db = np.percentile(power_spectrum_db[10:], 10)
unexpected_peaks = []
for idx in range(10, len(power_spectrum_db)):
if idx not in expected_harmonics:
if power_spectrum_db[idx] > noise_floor_db + abs(threshold_db):
unexpected_peaks.append((freqs[idx], power_spectrum_db[idx]))
if len(unexpected_peaks) >= 5:
artifacts.append({
'type': 'spectral_anomaly',
'time_sec': i / sample_rate,
'unexpected_frequencies': unexpected_peaks[:10],
'count': len(unexpected_peaks)
})
return artifacts
def detect_artifacts_amplitude_spikes(signal_data: np.ndarray, sample_rate: int,
threshold_factor: float = 3.0) -> List[Dict]:
artifacts = []
skip_samples = int(sample_rate * 1.0)
if len(signal_data) <= skip_samples:
return artifacts
signal_trimmed = signal_data[skip_samples:]
envelope = np.abs(signal_trimmed)
window_size = int(sample_rate * 0.01)
if window_size % 2 == 0:
window_size += 1
from scipy.ndimage import uniform_filter1d
envelope_smooth = uniform_filter1d(envelope, size=window_size, mode='reflect')
median_env = np.median(envelope_smooth)
mad = np.median(np.abs(envelope_smooth - median_env))
if mad == 0:
return artifacts
threshold = median_env + threshold_factor * mad * 1.4826
spike_indices = np.where(envelope_smooth > threshold)[0]
if len(spike_indices) == 0:
return artifacts
groups = []
current_group = [spike_indices[0]]
for idx in spike_indices[1:]:
if idx - current_group[-1] <= int(sample_rate * 0.05):
current_group.append(idx)
else:
groups.append(current_group)
current_group = [idx]
groups.append(current_group)
for group in groups:
peak_idx = group[np.argmax(envelope_smooth[group])]
time_sec = (peak_idx + skip_samples) / sample_rate
peak_value = envelope_smooth[peak_idx]
artifacts.append({
'type': 'amplitude_spike',
'time_sec': float(time_sec),
'peak_amplitude': float(peak_value),
'median_amplitude': float(median_env),
'deviation_factor': float((peak_value - median_env) / (mad * 1.4826)) if mad > 0 else 0
})
return artifacts
def detect_artifacts_zero_crossing(signal_data: np.ndarray, sample_rate: int,
threshold_factor: float = 2.0) -> List[Dict]:
artifacts = []
skip_samples = int(sample_rate * 1.0)
if len(signal_data) <= skip_samples:
return artifacts
window_size = int(sample_rate * 0.1)
hop_size = int(sample_rate * 0.05)
zcr_values = []
for i in range(skip_samples, len(signal_data) - window_size, hop_size):
segment = signal_data[i:i+window_size]
zero_crossings = np.sum(np.abs(np.diff(np.sign(segment)))) / 2
zcr = zero_crossings / len(segment)
zcr_values.append((i, zcr))
if not zcr_values:
return artifacts
zcr_array = np.array([z[1] for z in zcr_values])
median_zcr = np.median(zcr_array)
std_zcr = np.std(zcr_array)
for i, zcr in zcr_values:
if std_zcr > 0 and abs(zcr - median_zcr) > threshold_factor * std_zcr:
artifacts.append({
'type': 'zero_crossing_anomaly',
'time_sec': i / sample_rate,
'zcr_value': float(zcr),
'median_zcr': float(median_zcr),
'deviation_factor': float(abs(zcr - median_zcr) / std_zcr)
})
return artifacts
def detect_artifacts_energy_variation(signal_data: np.ndarray, sample_rate: int,
threshold_db: float = 6.0) -> List[Dict]:
artifacts = []
skip_samples = int(sample_rate * 1.0)
if len(signal_data) <= skip_samples:
return artifacts
window_size = int(sample_rate * 0.1)
hop_size = int(sample_rate * 0.05)
energy_values = []
for i in range(skip_samples, len(signal_data) - window_size, hop_size):
segment = signal_data[i:i+window_size]
energy = np.sum(segment**2)
energy_values.append((i, energy))
for idx in range(1, len(energy_values)):
prev_energy = energy_values[idx-1][1]
curr_energy = energy_values[idx][1]
if prev_energy > 0 and curr_energy > 0:
energy_change_db = 10 * np.log10(curr_energy / prev_energy)
if abs(energy_change_db) > threshold_db:
artifacts.append({
'type': 'energy_variation',
'time_sec': energy_values[idx][0] / sample_rate,
'energy_change_db': float(energy_change_db),
'prev_energy': float(prev_energy),
'curr_energy': float(curr_energy)
})
return artifacts
def detect_artifacts_combined(signal_data: np.ndarray, sample_rate: int, fundamental_freq: float,
detector_config: Dict) -> Dict:
all_artifacts = []
if detector_config.get('spectral_anomaly', {}).get('enabled', True):
threshold = detector_config.get('spectral_anomaly', {}).get('threshold_db', -60)
artifacts = detect_artifacts_spectral_anomaly(signal_data, sample_rate, fundamental_freq, threshold)
all_artifacts.extend(artifacts)
if detector_config.get('amplitude_spikes', {}).get('enabled', True):
threshold = detector_config.get('amplitude_spikes', {}).get('threshold_factor', 3.0)
artifacts = detect_artifacts_amplitude_spikes(signal_data, sample_rate, threshold)
all_artifacts.extend(artifacts)
if detector_config.get('zero_crossing', {}).get('enabled', True):
threshold = detector_config.get('zero_crossing', {}).get('threshold_factor', 2.0)
artifacts = detect_artifacts_zero_crossing(signal_data, sample_rate, threshold)
all_artifacts.extend(artifacts)
if detector_config.get('energy_variation', {}).get('enabled', True):
threshold = detector_config.get('energy_variation', {}).get('threshold_db', 6.0)
artifacts = detect_artifacts_energy_variation(signal_data, sample_rate, threshold)
all_artifacts.extend(artifacts)
artifact_summary = {
'total_count': len(all_artifacts),
'by_type': {},
'artifacts': all_artifacts
}
for artifact in all_artifacts:
artifact_type = artifact['type']
if artifact_type not in artifact_summary['by_type']:
artifact_summary['by_type'][artifact_type] = 0
artifact_summary['by_type'][artifact_type] += 1
return artifact_summary
def plot_individual_anomaly(signal_data: np.ndarray, artifact: Dict, artifact_idx: int,
channel_name: str, frequency: float, sample_rate: int,
output_dir: Path):
periods_to_show = 20
period_samples = int(sample_rate / frequency)
total_samples = periods_to_show * period_samples
artifact_time = artifact['time_sec']
artifact_sample = int(artifact_time * sample_rate)
start_sample = max(0, artifact_sample - total_samples // 2)
end_sample = min(len(signal_data), artifact_sample + total_samples // 2)
if end_sample - start_sample < total_samples:
if start_sample == 0:
end_sample = min(len(signal_data), start_sample + total_samples)
else:
start_sample = max(0, end_sample - total_samples)
segment = signal_data[start_sample:end_sample]
time_ms = (np.arange(len(segment)) + start_sample) / sample_rate * 1000
fig, ax = plt.subplots(1, 1, figsize=(14, 6))
ax.plot(time_ms, segment, linewidth=1.0, color='blue', alpha=0.8)
ax.axvline(x=artifact_time * 1000, color='red', linestyle='--', linewidth=2,
label=f'Anomaly at {artifact_time:.3f}s', alpha=0.7)
ax.set_xlabel('Time (ms)', fontsize=11)
ax.set_ylabel('Amplitude', fontsize=11)
artifact_type = artifact['type'].replace('_', ' ').title()
ax.set_title(f'{channel_name} - {artifact_type} #{artifact_idx+1} (~{periods_to_show} periods @ {frequency}Hz)',
fontsize=12, fontweight='bold')
info_text = f"Type: {artifact_type}\nTime: {artifact_time:.3f}s"
if 'deviation_factor' in artifact:
info_text += f"\nDeviation: {artifact['deviation_factor']:.2f}σ"
if 'energy_change_db' in artifact:
info_text += f"\nEnergy Change: {artifact['energy_change_db']:.2f} dB"
if 'count' in artifact and artifact['type'] == 'spectral_anomaly':
info_text += f"\nUnexpected Peaks: {artifact['count']}"
ax.text(0.02, 0.98, info_text, transform=ax.transAxes,
fontsize=9, verticalalignment='top',
bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
ax.legend(loc='upper right')
ax.grid(True, alpha=0.3)
plt.tight_layout()
safe_type = artifact['type'].replace('_', '-')
plot_file = output_dir / f'{channel_name.lower().replace(" ", "_")}_anomaly_{artifact_idx+1:04d}_{safe_type}_{artifact_time:.3f}s.png'
plt.savefig(plot_file, dpi=150, bbox_inches='tight')
plt.close()
def plot_artifact_detection(channel_1: np.ndarray, channel_2: np.ndarray,
artifacts_ch1: Dict, artifacts_ch2: Dict,
frequency: float, sample_rate: int, output_dir: Path):
fig, axes = plt.subplots(2, 2, figsize=(16, 10))
time = np.arange(len(channel_1)) / sample_rate
axes[0, 0].plot(time, channel_1, alpha=0.7, linewidth=0.5)
axes[0, 0].set_xlabel('Time (s)')
axes[0, 0].set_ylabel('Amplitude')
axes[0, 0].set_title(f'Channel 1 (Loopback) - {artifacts_ch1["total_count"]} artifacts')
axes[0, 0].grid(True, alpha=0.3)
for artifact in artifacts_ch1['artifacts']:
axes[0, 0].axvline(x=artifact['time_sec'], color='r', alpha=0.3, linewidth=0.5)
axes[1, 0].plot(time, channel_2, alpha=0.7, linewidth=0.5)
axes[1, 0].set_xlabel('Time (s)')
axes[1, 0].set_ylabel('Amplitude')
axes[1, 0].set_title(f'Channel 2 (DUT/Radio) - {artifacts_ch2["total_count"]} artifacts')
axes[1, 0].grid(True, alpha=0.3)
for artifact in artifacts_ch2['artifacts']:
axes[1, 0].axvline(x=artifact['time_sec'], color='r', alpha=0.3, linewidth=0.5)
fft_ch1 = np.fft.rfft(channel_1)
fft_ch2 = np.fft.rfft(channel_2)
freqs = np.fft.rfftfreq(len(channel_1), 1/sample_rate)
axes[0, 1].plot(freqs, 20*np.log10(np.abs(fft_ch1) + 1e-10), linewidth=0.5)
axes[0, 1].set_xlabel('Frequency (Hz)')
axes[0, 1].set_ylabel('Magnitude (dB)')
axes[0, 1].set_title('Channel 1 Spectrum')
axes[0, 1].set_xlim(0, min(10000, sample_rate/2))
axes[0, 1].grid(True, alpha=0.3)
axes[1, 1].plot(freqs, 20*np.log10(np.abs(fft_ch2) + 1e-10), linewidth=0.5)
axes[1, 1].set_xlabel('Frequency (Hz)')
axes[1, 1].set_ylabel('Magnitude (dB)')
axes[1, 1].set_title('Channel 2 Spectrum')
axes[1, 1].set_xlim(0, min(10000, sample_rate/2))
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plot_file = output_dir / f'artifact_detection_{frequency}Hz.png'
plt.savefig(plot_file, dpi=150, bbox_inches='tight')
plt.close()
def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_dir: Path = None) -> Dict:
import time
sample_rate = config['audio']['sample_rate']
duration = config['artifact_detection']['duration']
frequency = config['artifact_detection']['test_frequency']
amplitude = config['artifact_detection']['amplitude']
device_name = config['audio']['device_name']
channels = config['audio']['channels']
detector_config = config['artifact_detection']['detectors']
startup_delay = config['artifact_detection'].get('startup_delay', 10)
signal_type = config['artifact_detection'].get('signal_type', 'sine')
device_ids = find_audio_device(device_name)
if startup_delay > 0:
print(f"Waiting {startup_delay} seconds for system to settle...")
time.sleep(startup_delay)
print("Starting recording...")
if signal_type == 'chirp':
f0 = config['artifact_detection'].get('chirp_f0', 100)
f1 = config['artifact_detection'].get('chirp_f1', 8000)
tone = generate_chirp(duration, sample_rate, f0=f0, f1=f1, amplitude=amplitude)
frequency = (f0 + f1) / 2
recording = play_and_record(tone, sample_rate, device_ids, channels)
elif signal_type == 'silent':
frequency = 1000
recording = sd.rec(int(duration * sample_rate), samplerate=sample_rate,
channels=channels, device=device_ids[0], blocking=True)
else:
tone = generate_test_tone(frequency, duration, sample_rate, amplitude)
recording = play_and_record(tone, sample_rate, device_ids, channels)
channel_1 = recording[:, 0]
channel_2 = recording[:, 1]
artifacts_ch1 = detect_artifacts_combined(channel_1, sample_rate, frequency, detector_config)
artifacts_ch2 = detect_artifacts_combined(channel_2, sample_rate, frequency, detector_config)
if save_plots and output_dir:
plot_artifact_detection(channel_1, channel_2, artifacts_ch1, artifacts_ch2,
frequency, sample_rate, output_dir)
anomalies_dir = output_dir / 'individual_anomalies'
anomalies_dir.mkdir(exist_ok=True)
print(f"\nPlotting individual anomalies to: {anomalies_dir}")
for idx, artifact in enumerate(artifacts_ch1['artifacts']):
plot_individual_anomaly(channel_1, artifact, idx, 'Channel 1 Loopback',
frequency, sample_rate, anomalies_dir)
for idx, artifact in enumerate(artifacts_ch2['artifacts']):
plot_individual_anomaly(channel_2, artifact, idx, 'Channel 2 DUT',
frequency, sample_rate, anomalies_dir)
total_anomaly_plots = len(artifacts_ch1['artifacts']) + len(artifacts_ch2['artifacts'])
if total_anomaly_plots > 0:
print(f"✓ Generated {total_anomaly_plots} individual anomaly plots")
result = {
'signal_type': signal_type,
'duration_sec': float(duration),
'channel_1_loopback': {
'total_artifacts': artifacts_ch1['total_count'],
'artifacts_by_type': artifacts_ch1['by_type'],
'artifact_rate_per_minute': float(artifacts_ch1['total_count'] / duration * 60)
},
'channel_2_dut': {
'total_artifacts': artifacts_ch2['total_count'],
'artifacts_by_type': artifacts_ch2['by_type'],
'artifact_rate_per_minute': float(artifacts_ch2['total_count'] / duration * 60)
},
'detector_config': detector_config
}
if signal_type == 'chirp':
f0 = config['artifact_detection'].get('chirp_f0', 100)
f1 = config['artifact_detection'].get('chirp_f1', 8000)
result['chirp_f0_hz'] = int(f0)
result['chirp_f1_hz'] = int(f1)
elif signal_type == 'silent':
result['note'] = 'Silent mode - no playback, noise floor measurement'
else:
result['test_frequency_hz'] = int(frequency)
return result

173
test_artifact_detection.py Executable file
View File

@@ -0,0 +1,173 @@
#!/usr/bin/env python3
import argparse
import yaml
from datetime import datetime
from pathlib import Path
import sys
import json
sys.path.insert(0, str(Path(__file__).parent))
from src.audio_tests import run_artifact_detection_test
def main():
parser = argparse.ArgumentParser(description='Run artifact detection test on audio loopback and radio path')
parser.add_argument('--serial-number', required=True, help='Serial number (e.g., SN001234)')
parser.add_argument('--software-version', required=True, help='Software version (git commit hash)')
parser.add_argument('--comment', default='', help='Comments about this test')
parser.add_argument('--config', default='config.yaml', help='Path to config file')
parser.add_argument('--duration', type=float, help='Override recording duration in seconds (default from config)')
parser.add_argument('--frequency', type=float, help='Override test frequency in Hz (default from config)')
parser.add_argument('--signal-type', choices=['sine', 'chirp', 'silent'], default='sine',
help='Signal type: sine (single frequency), chirp (frequency sweep), or silent (no signal)')
args = parser.parse_args()
with open(args.config, 'r') as f:
config = yaml.safe_load(f)
if args.duration:
config['artifact_detection']['duration'] = args.duration
if args.frequency:
config['artifact_detection']['test_frequency'] = args.frequency
config['artifact_detection']['signal_type'] = args.signal_type
timestamp = datetime.now()
test_id = timestamp.strftime('%Y%m%d_%H%M%S')
results_dir = Path(config['output']['results_dir'])
results_dir.mkdir(exist_ok=True)
test_output_dir = results_dir / f"{test_id}_artifact_detection"
test_output_dir.mkdir(exist_ok=True)
save_plots = config['output'].get('save_plots', False)
print("=" * 70)
print("ARTIFACT DETECTION TEST")
print("=" * 70)
print(f"Test ID: {test_id}")
print(f"Serial Number: {args.serial_number}")
print(f"Software: {args.software_version}")
if args.comment:
print(f"Comment: {args.comment}")
print(f"Duration: {config['artifact_detection']['duration']} seconds")
signal_type = config['artifact_detection'].get('signal_type', 'sine')
if signal_type == 'sine':
print(f"Signal Type: Sine wave @ {config['artifact_detection']['test_frequency']} Hz")
elif signal_type == 'chirp':
print(f"Signal Type: Chirp (100 Hz - 8000 Hz)")
else:
print(f"Signal Type: Silent (no playback - noise floor measurement)")
if save_plots:
print(f"Plots will be saved to: {test_output_dir}")
print("-" * 70)
print("\nDetection Algorithms:")
for detector_name, detector_settings in config['artifact_detection']['detectors'].items():
status = "ENABLED" if detector_settings.get('enabled', False) else "DISABLED"
print(f" - {detector_name}: {status}")
if detector_settings.get('enabled', False):
for param, value in detector_settings.items():
if param != 'enabled':
print(f" {param}: {value}")
print("\n" + "=" * 70)
signal_type = config['artifact_detection'].get('signal_type', 'sine')
if signal_type == 'sine':
freq = config['artifact_detection']['test_frequency']
print(f"STARTING TEST - Playing {freq}Hz sine wave and recording both channels...")
elif signal_type == 'chirp':
print("STARTING TEST - Playing chirp signal (100-8000Hz) and recording both channels...")
else:
print("STARTING TEST - Recording silence (no playback)...")
print("=" * 70)
print("\nChannel 1: Loopback path (direct audio interface loopback)")
print("Channel 2: DUT/Radio path (through beacon and radio transmission)")
print()
try:
result = run_artifact_detection_test(config, save_plots=save_plots, output_dir=test_output_dir)
print("\n" + "=" * 70)
print("TEST COMPLETE - RESULTS")
print("=" * 70)
signal_type = result.get('signal_type', 'sine')
if signal_type == 'chirp':
print(f"\n📊 Signal: Chirp {result['chirp_f0_hz']} Hz → {result['chirp_f1_hz']} Hz")
elif signal_type == 'silent':
print(f"\n📊 Signal: Silent (no playback - noise floor measurement)")
else:
print(f"\n📊 Test Frequency: {result['test_frequency_hz']} Hz")
print(f"⏱️ Duration: {result['duration_sec']} seconds")
print("\n🔊 CHANNEL 1 (LOOPBACK PATH):")
print(f" Total Artifacts: {result['channel_1_loopback']['total_artifacts']}")
print(f" Artifact Rate: {result['channel_1_loopback']['artifact_rate_per_minute']:.2f} per minute")
if result['channel_1_loopback']['artifacts_by_type']:
print(" By Type:")
for artifact_type, count in result['channel_1_loopback']['artifacts_by_type'].items():
print(f" - {artifact_type}: {count}")
print("\n📻 CHANNEL 2 (DUT/RADIO PATH):")
print(f" Total Artifacts: {result['channel_2_dut']['total_artifacts']}")
print(f" Artifact Rate: {result['channel_2_dut']['artifact_rate_per_minute']:.2f} per minute")
if result['channel_2_dut']['artifacts_by_type']:
print(" By Type:")
for artifact_type, count in result['channel_2_dut']['artifacts_by_type'].items():
print(f" - {artifact_type}: {count}")
ch1_count = result['channel_1_loopback']['total_artifacts']
ch2_count = result['channel_2_dut']['total_artifacts']
if ch2_count > ch1_count:
delta = ch2_count - ch1_count
print(f"\n⚠️ DEGRADATION DETECTED: {delta} more artifacts in radio path vs loopback")
elif ch1_count == ch2_count == 0:
print("\n✅ EXCELLENT: No artifacts detected in either path!")
else:
print(f"\n Loopback baseline: {ch1_count} artifacts")
except Exception as e:
print(f"\n❌ ERROR: {e}")
import traceback
traceback.print_exc()
result = {
'error': str(e),
'test_frequency_hz': config['artifact_detection']['test_frequency'],
'duration_sec': config['artifact_detection']['duration']
}
output_data = {
'metadata': {
'test_id': test_id,
'timestamp': timestamp.isoformat(),
'serial_number': args.serial_number,
'software_version': args.software_version,
'comment': args.comment
},
'artifact_detection_result': result
}
output_file = results_dir / f"{test_id}_artifact_detection_results.yaml"
with open(output_file, 'w') as f:
yaml.dump(output_data, f, default_flow_style=False, sort_keys=False)
json_output_file = results_dir / f"{test_id}_artifact_detection_results.json"
with open(json_output_file, 'w') as f:
json.dump(output_data, f, indent=2)
print("\n" + "=" * 70)
print("✅ Results saved to:")
print(f" YAML: {output_file}")
print(f" JSON: {json_output_file}")
if save_plots:
print(f" Summary plots: {test_output_dir}/")
print(f" Individual anomaly plots: {test_output_dir}/individual_anomalies/")
print("=" * 70)
if __name__ == '__main__':
main()