Refactoring and minor improvents.
This commit is contained in:
@@ -12,7 +12,7 @@ pip install -r requirements.txt
|
|||||||
### 1. Run Your First Test
|
### 1. Run Your First Test
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
python run_test.py \
|
python test_latency.py \
|
||||||
--serial-number "SN001234" \
|
--serial-number "SN001234" \
|
||||||
--software-version "initial" \
|
--software-version "initial" \
|
||||||
--comment "First test run"
|
--comment "First test run"
|
||||||
@@ -20,10 +20,10 @@ python run_test.py \
|
|||||||
|
|
||||||
**What happens:**
|
**What happens:**
|
||||||
- Auto-detects your Scarlett audio interface
|
- Auto-detects your Scarlett audio interface
|
||||||
- Plays test tones at 7 frequencies (100 Hz to 8 kHz)
|
- Plays chirp signal and measures latency (5 measurements by default)
|
||||||
- Records input/output on both channels
|
- Records input/output on both channels
|
||||||
- Calculates latency, THD, and SNR
|
- Calculates average, min, max, and standard deviation of latency
|
||||||
- Saves results to `test_results/YYYYMMDD_HHMMSS_results.yaml`
|
- Saves results to `test_results/YYYYMMDD_HHMMSS_latency/YYYYMMDD_HHMMSS_latency_results.yaml`
|
||||||
|
|
||||||
### 2. View Results
|
### 2. View Results
|
||||||
|
|
||||||
@@ -38,35 +38,33 @@ python view_results.py test_results/20260226_123456_results.yaml
|
|||||||
python view_results.py example_test_result.yaml
|
python view_results.py example_test_result.yaml
|
||||||
```
|
```
|
||||||
|
|
||||||
### 3. Compare Different PCB Versions
|
### 3. Compare Different Units
|
||||||
|
|
||||||
Run multiple tests with different metadata:
|
Run multiple tests with different metadata:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
# Test unit SN001234
|
# Test unit SN001234
|
||||||
python run_test.py --serial-number "SN001234" --software-version "abc123"
|
python test_latency.py --serial-number "SN001234" --software-version "abc123"
|
||||||
|
|
||||||
# Test unit SN001235
|
# Test unit SN001235 with more measurements
|
||||||
python run_test.py --serial-number "SN001235" --software-version "abc123"
|
python test_latency.py --serial-number "SN001235" --software-version "abc123" --measurements 10
|
||||||
|
|
||||||
# Compare by viewing both YAML files
|
# Compare by viewing both YAML files
|
||||||
python view_results.py test_results/20260226_120000_results.yaml
|
python view_results.py test_results/20260226_120000_latency/20260226_120000_latency_results.yaml
|
||||||
python view_results.py test_results/20260226_130000_results.yaml
|
python view_results.py test_results/20260226_130000_latency/20260226_130000_latency_results.yaml
|
||||||
```
|
```
|
||||||
|
|
||||||
## Understanding the Output
|
## Understanding the Output
|
||||||
|
|
||||||
Each test produces metrics at 7 frequencies:
|
Each latency test produces:
|
||||||
|
|
||||||
- **Latency (ms)**: Delay between channels (should be near 0 for loopback)
|
- **Average Latency (ms)**: Mean delay across all measurements
|
||||||
- **THD Input (%)**: Distortion in channel 1 (lower is better)
|
- **Min/Max Latency (ms)**: Range of measured values
|
||||||
- **THD Output (%)**: Distortion in channel 2 (lower is better)
|
- **Standard Deviation (ms)**: Consistency of measurements (lower is better)
|
||||||
- **SNR Input (dB)**: Signal quality in channel 1 (higher is better)
|
|
||||||
- **SNR Output (dB)**: Signal quality in channel 2 (higher is better)
|
|
||||||
|
|
||||||
**Good values:**
|
**Good values:**
|
||||||
- THD: < 0.1% (< 0.01% is excellent)
|
- Latency: Depends on your system (audio interface typically < 10ms)
|
||||||
- SNR: > 80 dB (> 90 dB is excellent)
|
- Standard Deviation: < 1ms (consistent measurements)
|
||||||
- Latency: < 5 ms for loopback
|
- Latency: < 5 ms for loopback
|
||||||
|
|
||||||
## Configuration
|
## Configuration
|
||||||
@@ -74,11 +72,21 @@ Each test produces metrics at 7 frequencies:
|
|||||||
Edit `config.yaml` to customize test parameters:
|
Edit `config.yaml` to customize test parameters:
|
||||||
|
|
||||||
```yaml
|
```yaml
|
||||||
test_tones:
|
audio:
|
||||||
frequencies: [1000] # Test only 1 kHz
|
sample_rate: 44100
|
||||||
duration: 3.0 # Shorter test (3 seconds)
|
channels: 2
|
||||||
|
device_name: "Scarlett"
|
||||||
|
|
||||||
|
output:
|
||||||
|
results_dir: "test_results"
|
||||||
|
save_plots: true
|
||||||
```
|
```
|
||||||
|
|
||||||
|
```bash
|
||||||
|
python -c "import sounddevice as sd; print(sd.query_devices())"
|
||||||
|
```
|
||||||
|
Update `device_name` in `config.yaml` to match your device.
|
||||||
|
|
||||||
## Troubleshooting
|
## Troubleshooting
|
||||||
|
|
||||||
**Audio device not found:**
|
**Audio device not found:**
|
||||||
|
|||||||
61
README.md
61
README.md
@@ -4,8 +4,8 @@ Simple Python-based testing system for PCB audio hardware validation.
|
|||||||
|
|
||||||
## Features
|
## Features
|
||||||
|
|
||||||
- **Automated Testing**: Latency, THD, and SNR measurements across multiple frequencies
|
- **Automated Testing**: Latency measurements with configurable iterations
|
||||||
- **Metadata Tracking**: PCB version, revision, software version, timestamps, notes
|
- **Metadata Tracking**: Serial number, software version, timestamps, comments
|
||||||
- **YAML Output**: Human-readable structured results
|
- **YAML Output**: Human-readable structured results
|
||||||
- **Simple Workflow**: Run tests, view results, compare versions
|
- **Simple Workflow**: Run tests, view results, compare versions
|
||||||
|
|
||||||
@@ -19,13 +19,22 @@ pip install -r requirements.txt
|
|||||||
|
|
||||||
### 2. Run a Test
|
### 2. Run a Test
|
||||||
|
|
||||||
|
**Latency Test:**
|
||||||
```bash
|
```bash
|
||||||
python run_test.py \
|
python test_latency.py \
|
||||||
--serial-number "SN001234" \
|
--serial-number "SN001234" \
|
||||||
--software-version "a3f2b1c" \
|
--software-version "a3f2b1c" \
|
||||||
--comment "Replaced capacitor C5"
|
--comment "Replaced capacitor C5"
|
||||||
```
|
```
|
||||||
|
|
||||||
|
**Artifact Detection Test:**
|
||||||
|
```bash
|
||||||
|
python test_artifact_detection.py \
|
||||||
|
--serial-number "SN001234" \
|
||||||
|
--software-version "a3f2b1c" \
|
||||||
|
--comment "Baseline test"
|
||||||
|
```
|
||||||
|
|
||||||
### 3. View Results
|
### 3. View Results
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
@@ -42,10 +51,8 @@ python view_results.py test_results/*.yaml | tail -1
|
|||||||
## Test Metrics
|
## Test Metrics
|
||||||
|
|
||||||
- **Latency**: Round-trip delay between input and output channels (ms)
|
- **Latency**: Round-trip delay between input and output channels (ms)
|
||||||
- **THD**: Total Harmonic Distortion for input and output (%)
|
- Average, minimum, maximum, and standard deviation across measurements
|
||||||
- **SNR**: Signal-to-Noise Ratio for input and output (dB)
|
- Uses chirp signal for accurate cross-correlation measurement
|
||||||
|
|
||||||
Tests run at multiple frequencies: 100 Hz, 250 Hz, 500 Hz, 1 kHz, 2 kHz, 4 kHz, 8 kHz
|
|
||||||
|
|
||||||
## Output Format
|
## Output Format
|
||||||
|
|
||||||
@@ -55,27 +62,35 @@ Results are saved as YAML files in `test_results/`:
|
|||||||
metadata:
|
metadata:
|
||||||
test_id: 20260226_123456
|
test_id: 20260226_123456
|
||||||
timestamp: '2026-02-26T12:34:56.789012'
|
timestamp: '2026-02-26T12:34:56.789012'
|
||||||
pcb_version: v2.1
|
serial_number: SN001234
|
||||||
pcb_revision: A
|
|
||||||
software_version: a3f2b1c
|
software_version: a3f2b1c
|
||||||
notes: Replaced capacitor C5
|
comment: Replaced capacitor C5
|
||||||
test_results:
|
latency_test:
|
||||||
- frequency_hz: 1000
|
avg: 2.345
|
||||||
latency_ms: 2.345
|
min: 2.201
|
||||||
thd_input_percent: 0.012
|
max: 2.489
|
||||||
thd_output_percent: 0.034
|
std: 0.087
|
||||||
snr_input_db: 92.5
|
|
||||||
snr_output_db: 89.2
|
|
||||||
```
|
```
|
||||||
|
|
||||||
## Configuration
|
## Configuration
|
||||||
|
|
||||||
Edit `config.yaml` to customize:
|
Edit `config.yaml` to customize:
|
||||||
- Audio device settings
|
- Audio device settings
|
||||||
- Test frequencies
|
|
||||||
- Test duration
|
|
||||||
- Output options
|
- Output options
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
audio:
|
||||||
|
sample_rate: 44100
|
||||||
|
channels: 2
|
||||||
|
device_name: "Scarlett"
|
||||||
|
|
||||||
|
output:
|
||||||
|
results_dir: "test_results"
|
||||||
|
save_plots: true
|
||||||
|
```
|
||||||
|
|
||||||
|
The system auto-detects Focusrite Scarlett audio interfaces.
|
||||||
|
|
||||||
## Hardware Setup
|
## Hardware Setup
|
||||||
|
|
||||||
```
|
```
|
||||||
@@ -83,19 +98,19 @@ Laptop <-> Audio Interface (Scarlett) <-> DUT <-> Audio Interface (Scarlett) <->
|
|||||||
Output Channels 1&2 Input Channels 1&2
|
Output Channels 1&2 Input Channels 1&2
|
||||||
```
|
```
|
||||||
|
|
||||||
The system auto-detects Focusrite Scarlett audio interfaces.
|
|
||||||
|
|
||||||
## File Structure
|
## File Structure
|
||||||
|
|
||||||
```
|
```
|
||||||
closed_loop_audio_test_suite/
|
closed_loop_audio_test_suite/
|
||||||
├── config.yaml # Test configuration
|
├── config.yaml # Test configuration
|
||||||
├── run_test.py # Main test runner
|
├── test_latency.py # Latency test runner
|
||||||
|
├── test_artifact_detection.py # Artifact detection test
|
||||||
├── view_results.py # Results viewer
|
├── view_results.py # Results viewer
|
||||||
├── src/
|
├── src/
|
||||||
│ └── audio_tests.py # Core test functions
|
│ └── audio_tests.py # Core test functions
|
||||||
└── test_results/ # YAML output files
|
└── test_results/ # YAML output files
|
||||||
└── YYYYMMDD_HHMMSS_results.yaml
|
├── YYYYMMDD_HHMMSS_latency/
|
||||||
|
└── YYYYMMDD_HHMMSS_artifact_detection/
|
||||||
```
|
```
|
||||||
|
|
||||||
## Tips
|
## Tips
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ audio:
|
|||||||
|
|
||||||
test_tones:
|
test_tones:
|
||||||
frequencies: [100, 250, 500, 1000, 2000, 4000, 8000] # Hz
|
frequencies: [100, 250, 500, 1000, 2000, 4000, 8000] # Hz
|
||||||
duration: 5.0 # seconds per frequency
|
duration: 10.0 # seconds per frequency
|
||||||
amplitude: 0.5 # 0.0 to 1.0
|
amplitude: 0.5 # 0.0 to 1.0
|
||||||
latency_runs: 5 # Number of latency measurements to average
|
latency_runs: 5 # Number of latency measurements to average
|
||||||
|
|
||||||
@@ -24,17 +24,17 @@ artifact_detection:
|
|||||||
# Chirp signal parameters (used when --signal-type chirp is specified)
|
# Chirp signal parameters (used when --signal-type chirp is specified)
|
||||||
chirp_f0: 100 # Hz - Chirp start frequency
|
chirp_f0: 100 # Hz - Chirp start frequency
|
||||||
chirp_f1: 8000 # Hz - Chirp end frequency
|
chirp_f1: 8000 # Hz - Chirp end frequency
|
||||||
# NOTE: All detectors skip the first 1 second of recording to avoid startup transients
|
# NOTE: All detectors skip the first and last 1 second of recording to avoid startup/shutdown transients
|
||||||
detectors:
|
detectors:
|
||||||
spectral_anomaly:
|
spectral_anomaly:
|
||||||
enabled: false # DISABLED - generates too many false positives, needs better algorithm
|
enabled: false # DISABLED - generates too many false positives, needs better algorithm
|
||||||
threshold_db: -60 # Detect unexpected frequencies above noise floor + this threshold (more negative = less sensitive)
|
threshold_db: -60 # Detect unexpected frequencies above noise floor + this threshold (more negative = less sensitive)
|
||||||
amplitude_spikes:
|
amplitude_spikes:
|
||||||
enabled: true
|
enabled: true
|
||||||
threshold_factor: 4.0 # MAD-based outlier detection on envelope (detects clicks, pops, dropouts). Lower = more sensitive.
|
threshold_factor: 5.0 # MAD-based outlier detection on envelope (detects clicks, pops, dropouts). Lower = more sensitive.
|
||||||
zero_crossing:
|
zero_crossing:
|
||||||
enabled: false
|
enabled: false
|
||||||
threshold_factor: 2.0 # Number of standard deviations for zero-crossing anomalies (detects distortion)
|
threshold_factor: 2.0 # Number of standard deviations for zero-crossing anomalies (detects distortion)
|
||||||
energy_variation:
|
energy_variation:
|
||||||
enabled: false
|
enabled: true
|
||||||
threshold_db: 6.0 # Energy change threshold in dB between consecutive windows (detects level changes)
|
threshold_db: 6.0 # Energy change threshold in dB between consecutive windows (detects level changes)
|
||||||
|
|||||||
@@ -330,12 +330,10 @@ def detect_artifacts_amplitude_spikes(signal_data: np.ndarray, sample_rate: int,
|
|||||||
artifacts = []
|
artifacts = []
|
||||||
|
|
||||||
skip_samples = int(sample_rate * 1.0)
|
skip_samples = int(sample_rate * 1.0)
|
||||||
if len(signal_data) <= skip_samples:
|
if len(signal_data) <= 2 * skip_samples:
|
||||||
return artifacts
|
return artifacts
|
||||||
|
|
||||||
signal_trimmed = signal_data[skip_samples:]
|
envelope = np.abs(signal_data)
|
||||||
|
|
||||||
envelope = np.abs(signal_trimmed)
|
|
||||||
|
|
||||||
window_size = int(sample_rate * 0.01)
|
window_size = int(sample_rate * 0.01)
|
||||||
if window_size % 2 == 0:
|
if window_size % 2 == 0:
|
||||||
@@ -350,36 +348,76 @@ def detect_artifacts_amplitude_spikes(signal_data: np.ndarray, sample_rate: int,
|
|||||||
if mad == 0:
|
if mad == 0:
|
||||||
return artifacts
|
return artifacts
|
||||||
|
|
||||||
threshold = median_env + threshold_factor * mad * 1.4826
|
threshold_high = median_env + threshold_factor * mad * 1.4826
|
||||||
|
threshold_low = median_env - threshold_factor * mad * 1.4826
|
||||||
|
|
||||||
spike_indices = np.where(envelope_smooth > threshold)[0]
|
# Detect spikes (too high)
|
||||||
|
spike_indices = np.where(envelope_smooth > threshold_high)[0]
|
||||||
|
|
||||||
if len(spike_indices) == 0:
|
# Detect dropouts (too low)
|
||||||
return artifacts
|
dropout_indices = np.where(envelope_smooth < threshold_low)[0]
|
||||||
|
|
||||||
groups = []
|
total_duration = len(signal_data) / sample_rate
|
||||||
current_group = [spike_indices[0]]
|
|
||||||
|
|
||||||
for idx in spike_indices[1:]:
|
# Process spikes
|
||||||
if idx - current_group[-1] <= int(sample_rate * 0.05):
|
if len(spike_indices) > 0:
|
||||||
current_group.append(idx)
|
groups = []
|
||||||
else:
|
current_group = [spike_indices[0]]
|
||||||
groups.append(current_group)
|
|
||||||
current_group = [idx]
|
|
||||||
groups.append(current_group)
|
|
||||||
|
|
||||||
for group in groups:
|
|
||||||
peak_idx = group[np.argmax(envelope_smooth[group])]
|
|
||||||
time_sec = (peak_idx + skip_samples) / sample_rate
|
|
||||||
peak_value = envelope_smooth[peak_idx]
|
|
||||||
|
|
||||||
artifacts.append({
|
for idx in spike_indices[1:]:
|
||||||
'type': 'amplitude_spike',
|
if idx - current_group[-1] <= int(sample_rate * 0.05):
|
||||||
'time_sec': float(time_sec),
|
current_group.append(idx)
|
||||||
'peak_amplitude': float(peak_value),
|
else:
|
||||||
'median_amplitude': float(median_env),
|
groups.append(current_group)
|
||||||
'deviation_factor': float((peak_value - median_env) / (mad * 1.4826)) if mad > 0 else 0
|
current_group = [idx]
|
||||||
})
|
groups.append(current_group)
|
||||||
|
|
||||||
|
for group in groups:
|
||||||
|
peak_idx = group[np.argmax(envelope_smooth[group])]
|
||||||
|
time_sec = peak_idx / sample_rate
|
||||||
|
peak_value = envelope_smooth[peak_idx]
|
||||||
|
|
||||||
|
# Skip artifacts in first and last second
|
||||||
|
if time_sec < 1.0 or time_sec > (total_duration - 1.0):
|
||||||
|
continue
|
||||||
|
|
||||||
|
artifacts.append({
|
||||||
|
'type': 'amplitude_spike',
|
||||||
|
'time_sec': float(time_sec),
|
||||||
|
'peak_amplitude': float(peak_value),
|
||||||
|
'median_amplitude': float(median_env),
|
||||||
|
'deviation_factor': float((peak_value - median_env) / (mad * 1.4826)) if mad > 0 else 0
|
||||||
|
})
|
||||||
|
|
||||||
|
# Process dropouts
|
||||||
|
if len(dropout_indices) > 0:
|
||||||
|
groups = []
|
||||||
|
current_group = [dropout_indices[0]]
|
||||||
|
|
||||||
|
for idx in dropout_indices[1:]:
|
||||||
|
if idx - current_group[-1] <= int(sample_rate * 0.05):
|
||||||
|
current_group.append(idx)
|
||||||
|
else:
|
||||||
|
groups.append(current_group)
|
||||||
|
current_group = [idx]
|
||||||
|
groups.append(current_group)
|
||||||
|
|
||||||
|
for group in groups:
|
||||||
|
dropout_idx = group[np.argmin(envelope_smooth[group])]
|
||||||
|
time_sec = dropout_idx / sample_rate
|
||||||
|
dropout_value = envelope_smooth[dropout_idx]
|
||||||
|
|
||||||
|
# Skip artifacts in first and last second
|
||||||
|
if time_sec < 1.0 or time_sec > (total_duration - 1.0):
|
||||||
|
continue
|
||||||
|
|
||||||
|
artifacts.append({
|
||||||
|
'type': 'amplitude_dropout',
|
||||||
|
'time_sec': float(time_sec),
|
||||||
|
'dropout_amplitude': float(dropout_value),
|
||||||
|
'median_amplitude': float(median_env),
|
||||||
|
'deviation_factor': float((median_env - dropout_value) / (mad * 1.4826)) if mad > 0 else 0
|
||||||
|
})
|
||||||
|
|
||||||
return artifacts
|
return artifacts
|
||||||
|
|
||||||
@@ -388,15 +426,14 @@ def detect_artifacts_zero_crossing(signal_data: np.ndarray, sample_rate: int,
|
|||||||
threshold_factor: float = 2.0) -> List[Dict]:
|
threshold_factor: float = 2.0) -> List[Dict]:
|
||||||
artifacts = []
|
artifacts = []
|
||||||
|
|
||||||
skip_samples = int(sample_rate * 1.0)
|
if len(signal_data) <= int(sample_rate * 2.0):
|
||||||
if len(signal_data) <= skip_samples:
|
|
||||||
return artifacts
|
return artifacts
|
||||||
|
|
||||||
window_size = int(sample_rate * 0.1)
|
window_size = int(sample_rate * 0.1)
|
||||||
hop_size = int(sample_rate * 0.05)
|
hop_size = int(sample_rate * 0.05)
|
||||||
|
|
||||||
zcr_values = []
|
zcr_values = []
|
||||||
for i in range(skip_samples, len(signal_data) - window_size, hop_size):
|
for i in range(0, len(signal_data) - window_size, hop_size):
|
||||||
segment = signal_data[i:i+window_size]
|
segment = signal_data[i:i+window_size]
|
||||||
zero_crossings = np.sum(np.abs(np.diff(np.sign(segment)))) / 2
|
zero_crossings = np.sum(np.abs(np.diff(np.sign(segment)))) / 2
|
||||||
zcr = zero_crossings / len(segment)
|
zcr = zero_crossings / len(segment)
|
||||||
@@ -409,11 +446,19 @@ def detect_artifacts_zero_crossing(signal_data: np.ndarray, sample_rate: int,
|
|||||||
median_zcr = np.median(zcr_array)
|
median_zcr = np.median(zcr_array)
|
||||||
std_zcr = np.std(zcr_array)
|
std_zcr = np.std(zcr_array)
|
||||||
|
|
||||||
|
total_duration = len(signal_data) / sample_rate
|
||||||
|
|
||||||
for i, zcr in zcr_values:
|
for i, zcr in zcr_values:
|
||||||
|
time_sec = i / sample_rate
|
||||||
|
|
||||||
|
# Skip artifacts in first and last second
|
||||||
|
if time_sec < 1.0 or time_sec > (total_duration - 1.0):
|
||||||
|
continue
|
||||||
|
|
||||||
if std_zcr > 0 and abs(zcr - median_zcr) > threshold_factor * std_zcr:
|
if std_zcr > 0 and abs(zcr - median_zcr) > threshold_factor * std_zcr:
|
||||||
artifacts.append({
|
artifacts.append({
|
||||||
'type': 'zero_crossing_anomaly',
|
'type': 'zero_crossing_anomaly',
|
||||||
'time_sec': i / sample_rate,
|
'time_sec': float(time_sec),
|
||||||
'zcr_value': float(zcr),
|
'zcr_value': float(zcr),
|
||||||
'median_zcr': float(median_zcr),
|
'median_zcr': float(median_zcr),
|
||||||
'deviation_factor': float(abs(zcr - median_zcr) / std_zcr)
|
'deviation_factor': float(abs(zcr - median_zcr) / std_zcr)
|
||||||
@@ -426,19 +471,20 @@ def detect_artifacts_energy_variation(signal_data: np.ndarray, sample_rate: int,
|
|||||||
threshold_db: float = 6.0) -> List[Dict]:
|
threshold_db: float = 6.0) -> List[Dict]:
|
||||||
artifacts = []
|
artifacts = []
|
||||||
|
|
||||||
skip_samples = int(sample_rate * 1.0)
|
if len(signal_data) <= int(sample_rate * 2.0):
|
||||||
if len(signal_data) <= skip_samples:
|
|
||||||
return artifacts
|
return artifacts
|
||||||
|
|
||||||
window_size = int(sample_rate * 0.1)
|
window_size = int(sample_rate * 0.1)
|
||||||
hop_size = int(sample_rate * 0.05)
|
hop_size = int(sample_rate * 0.05)
|
||||||
|
|
||||||
energy_values = []
|
energy_values = []
|
||||||
for i in range(skip_samples, len(signal_data) - window_size, hop_size):
|
for i in range(0, len(signal_data) - window_size, hop_size):
|
||||||
segment = signal_data[i:i+window_size]
|
segment = signal_data[i:i+window_size]
|
||||||
energy = np.sum(segment**2)
|
energy = np.sum(segment**2)
|
||||||
energy_values.append((i, energy))
|
energy_values.append((i, energy))
|
||||||
|
|
||||||
|
total_duration = len(signal_data) / sample_rate
|
||||||
|
|
||||||
for idx in range(1, len(energy_values)):
|
for idx in range(1, len(energy_values)):
|
||||||
prev_energy = energy_values[idx-1][1]
|
prev_energy = energy_values[idx-1][1]
|
||||||
curr_energy = energy_values[idx][1]
|
curr_energy = energy_values[idx][1]
|
||||||
@@ -447,17 +493,61 @@ def detect_artifacts_energy_variation(signal_data: np.ndarray, sample_rate: int,
|
|||||||
energy_change_db = 10 * np.log10(curr_energy / prev_energy)
|
energy_change_db = 10 * np.log10(curr_energy / prev_energy)
|
||||||
|
|
||||||
if abs(energy_change_db) > threshold_db:
|
if abs(energy_change_db) > threshold_db:
|
||||||
|
time_sec = energy_values[idx][0] / sample_rate
|
||||||
|
|
||||||
|
# Skip artifacts in first and last second
|
||||||
|
if time_sec < 1.0 or time_sec > (total_duration - 1.0):
|
||||||
|
continue
|
||||||
|
|
||||||
artifacts.append({
|
artifacts.append({
|
||||||
'type': 'energy_variation',
|
'type': 'energy_variation',
|
||||||
'time_sec': energy_values[idx][0] / sample_rate,
|
'time_sec': float(time_sec),
|
||||||
'energy_change_db': float(energy_change_db),
|
'energy_change_db': float(energy_change_db),
|
||||||
'prev_energy': float(prev_energy),
|
'threshold_db': float(threshold_db)
|
||||||
'curr_energy': float(curr_energy)
|
|
||||||
})
|
})
|
||||||
|
|
||||||
return artifacts
|
return artifacts
|
||||||
|
|
||||||
|
|
||||||
|
def measure_frequency_accuracy(signal_data: np.ndarray, sample_rate: int,
|
||||||
|
expected_freq: float) -> Dict:
|
||||||
|
"""
|
||||||
|
Measure the actual dominant frequency in the signal and compare to expected.
|
||||||
|
Uses FFT on the full signal (skipping first and last second).
|
||||||
|
"""
|
||||||
|
# Skip first and last second
|
||||||
|
skip_samples = int(sample_rate * 1.0)
|
||||||
|
if len(signal_data) <= 2 * skip_samples:
|
||||||
|
return {
|
||||||
|
'expected_freq_hz': float(expected_freq),
|
||||||
|
'measured_freq_hz': 0.0,
|
||||||
|
'error_hz': 0.0,
|
||||||
|
'error_percent': 0.0
|
||||||
|
}
|
||||||
|
|
||||||
|
signal_trimmed = signal_data[skip_samples:-skip_samples]
|
||||||
|
|
||||||
|
# Perform FFT
|
||||||
|
fft = np.fft.rfft(signal_trimmed)
|
||||||
|
freqs = np.fft.rfftfreq(len(signal_trimmed), 1/sample_rate)
|
||||||
|
|
||||||
|
# Find the peak frequency
|
||||||
|
magnitude = np.abs(fft)
|
||||||
|
peak_idx = np.argmax(magnitude)
|
||||||
|
measured_freq = freqs[peak_idx]
|
||||||
|
|
||||||
|
# Calculate error
|
||||||
|
error_hz = measured_freq - expected_freq
|
||||||
|
error_percent = (error_hz / expected_freq) * 100.0 if expected_freq > 0 else 0.0
|
||||||
|
|
||||||
|
return {
|
||||||
|
'expected_freq_hz': float(expected_freq),
|
||||||
|
'measured_freq_hz': float(measured_freq),
|
||||||
|
'error_hz': float(error_hz),
|
||||||
|
'error_percent': float(error_percent)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def detect_artifacts_combined(signal_data: np.ndarray, sample_rate: int, fundamental_freq: float,
|
def detect_artifacts_combined(signal_data: np.ndarray, sample_rate: int, fundamental_freq: float,
|
||||||
detector_config: Dict) -> Dict:
|
detector_config: Dict) -> Dict:
|
||||||
all_artifacts = []
|
all_artifacts = []
|
||||||
@@ -482,10 +572,14 @@ def detect_artifacts_combined(signal_data: np.ndarray, sample_rate: int, fundame
|
|||||||
artifacts = detect_artifacts_energy_variation(signal_data, sample_rate, threshold)
|
artifacts = detect_artifacts_energy_variation(signal_data, sample_rate, threshold)
|
||||||
all_artifacts.extend(artifacts)
|
all_artifacts.extend(artifacts)
|
||||||
|
|
||||||
|
# Measure frequency accuracy
|
||||||
|
freq_accuracy = measure_frequency_accuracy(signal_data, sample_rate, fundamental_freq)
|
||||||
|
|
||||||
artifact_summary = {
|
artifact_summary = {
|
||||||
'total_count': len(all_artifacts),
|
'total_count': len(all_artifacts),
|
||||||
'by_type': {},
|
'by_type': {},
|
||||||
'artifacts': all_artifacts
|
'artifacts': all_artifacts,
|
||||||
|
'frequency_accuracy': freq_accuracy
|
||||||
}
|
}
|
||||||
|
|
||||||
for artifact in all_artifacts:
|
for artifact in all_artifacts:
|
||||||
@@ -671,12 +765,14 @@ def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_d
|
|||||||
'channel_1_loopback': {
|
'channel_1_loopback': {
|
||||||
'total_artifacts': artifacts_ch1['total_count'],
|
'total_artifacts': artifacts_ch1['total_count'],
|
||||||
'artifacts_by_type': artifacts_ch1['by_type'],
|
'artifacts_by_type': artifacts_ch1['by_type'],
|
||||||
'artifact_rate_per_minute': float(artifacts_ch1['total_count'] / duration * 60)
|
'artifact_rate_per_minute': float(artifacts_ch1['total_count'] / duration * 60),
|
||||||
|
'frequency_accuracy': artifacts_ch1['frequency_accuracy']
|
||||||
},
|
},
|
||||||
'channel_2_dut': {
|
'channel_2_dut': {
|
||||||
'total_artifacts': artifacts_ch2['total_count'],
|
'total_artifacts': artifacts_ch2['total_count'],
|
||||||
'artifacts_by_type': artifacts_ch2['by_type'],
|
'artifacts_by_type': artifacts_ch2['by_type'],
|
||||||
'artifact_rate_per_minute': float(artifacts_ch2['total_count'] / duration * 60)
|
'artifact_rate_per_minute': float(artifacts_ch2['total_count'] / duration * 60),
|
||||||
|
'frequency_accuracy': artifacts_ch2['frequency_accuracy']
|
||||||
},
|
},
|
||||||
'detector_config': detector_config
|
'detector_config': detector_config
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ import yaml
|
|||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import sys
|
import sys
|
||||||
import json
|
|
||||||
|
|
||||||
sys.path.insert(0, str(Path(__file__).parent))
|
sys.path.insert(0, str(Path(__file__).parent))
|
||||||
from src.audio_tests import run_artifact_detection_test
|
from src.audio_tests import run_artifact_detection_test
|
||||||
@@ -111,6 +110,14 @@ def main():
|
|||||||
for artifact_type, count in result['channel_1_loopback']['artifacts_by_type'].items():
|
for artifact_type, count in result['channel_1_loopback']['artifacts_by_type'].items():
|
||||||
print(f" - {artifact_type}: {count}")
|
print(f" - {artifact_type}: {count}")
|
||||||
|
|
||||||
|
# Display frequency accuracy for channel 1
|
||||||
|
if 'frequency_accuracy' in result['channel_1_loopback']:
|
||||||
|
freq_acc = result['channel_1_loopback']['frequency_accuracy']
|
||||||
|
print(f" Frequency Accuracy:")
|
||||||
|
print(f" Expected: {freq_acc['expected_freq_hz']:.1f} Hz")
|
||||||
|
print(f" Measured: {freq_acc['measured_freq_hz']:.2f} Hz")
|
||||||
|
print(f" Error: {freq_acc['error_hz']:+.2f} Hz ({freq_acc['error_percent']:+.3f}%)")
|
||||||
|
|
||||||
print("\n📻 CHANNEL 2 (DUT/RADIO PATH):")
|
print("\n📻 CHANNEL 2 (DUT/RADIO PATH):")
|
||||||
print(f" Total Artifacts: {result['channel_2_dut']['total_artifacts']}")
|
print(f" Total Artifacts: {result['channel_2_dut']['total_artifacts']}")
|
||||||
print(f" Artifact Rate: {result['channel_2_dut']['artifact_rate_per_minute']:.2f} per minute")
|
print(f" Artifact Rate: {result['channel_2_dut']['artifact_rate_per_minute']:.2f} per minute")
|
||||||
@@ -119,6 +126,14 @@ def main():
|
|||||||
for artifact_type, count in result['channel_2_dut']['artifacts_by_type'].items():
|
for artifact_type, count in result['channel_2_dut']['artifacts_by_type'].items():
|
||||||
print(f" - {artifact_type}: {count}")
|
print(f" - {artifact_type}: {count}")
|
||||||
|
|
||||||
|
# Display frequency accuracy for channel 2
|
||||||
|
if 'frequency_accuracy' in result['channel_2_dut']:
|
||||||
|
freq_acc = result['channel_2_dut']['frequency_accuracy']
|
||||||
|
print(f" Frequency Accuracy:")
|
||||||
|
print(f" Expected: {freq_acc['expected_freq_hz']:.1f} Hz")
|
||||||
|
print(f" Measured: {freq_acc['measured_freq_hz']:.2f} Hz")
|
||||||
|
print(f" Error: {freq_acc['error_hz']:+.2f} Hz ({freq_acc['error_percent']:+.3f}%)")
|
||||||
|
|
||||||
ch1_count = result['channel_1_loopback']['total_artifacts']
|
ch1_count = result['channel_1_loopback']['total_artifacts']
|
||||||
ch2_count = result['channel_2_dut']['total_artifacts']
|
ch2_count = result['channel_2_dut']['total_artifacts']
|
||||||
|
|
||||||
@@ -151,18 +166,13 @@ def main():
|
|||||||
'artifact_detection_result': result
|
'artifact_detection_result': result
|
||||||
}
|
}
|
||||||
|
|
||||||
output_file = results_dir / f"{test_id}_artifact_detection_results.yaml"
|
output_file = test_output_dir / f"{test_id}_artifact_detection_results.yaml"
|
||||||
with open(output_file, 'w') as f:
|
with open(output_file, 'w') as f:
|
||||||
yaml.dump(output_data, f, default_flow_style=False, sort_keys=False)
|
yaml.dump(output_data, f, default_flow_style=False, sort_keys=False)
|
||||||
|
|
||||||
json_output_file = results_dir / f"{test_id}_artifact_detection_results.json"
|
|
||||||
with open(json_output_file, 'w') as f:
|
|
||||||
json.dump(output_data, f, indent=2)
|
|
||||||
|
|
||||||
print("\n" + "=" * 70)
|
print("\n" + "=" * 70)
|
||||||
print("✅ Results saved to:")
|
print("✅ Results saved to:")
|
||||||
print(f" YAML: {output_file}")
|
print(f" YAML: {output_file}")
|
||||||
print(f" JSON: {json_output_file}")
|
|
||||||
if save_plots:
|
if save_plots:
|
||||||
print(f" Summary plots: {test_output_dir}/")
|
print(f" Summary plots: {test_output_dir}/")
|
||||||
print(f" Individual anomaly plots: {test_output_dir}/individual_anomalies/")
|
print(f" Individual anomaly plots: {test_output_dir}/individual_anomalies/")
|
||||||
|
|||||||
@@ -6,15 +6,16 @@ from pathlib import Path
|
|||||||
import sys
|
import sys
|
||||||
|
|
||||||
sys.path.insert(0, str(Path(__file__).parent))
|
sys.path.insert(0, str(Path(__file__).parent))
|
||||||
from src.audio_tests import run_single_test, run_latency_test
|
from src.audio_tests import run_latency_test
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
parser = argparse.ArgumentParser(description='Run PCB hardware audio tests')
|
parser = argparse.ArgumentParser(description='Run latency test on audio loopback and radio path')
|
||||||
parser.add_argument('--serial-number', required=True, help='Serial number (e.g., SN001234)')
|
parser.add_argument('--serial-number', required=True, help='Serial number (e.g., SN001234)')
|
||||||
parser.add_argument('--software-version', required=True, help='Software version (git commit hash)')
|
parser.add_argument('--software-version', required=True, help='Software version (git commit hash)')
|
||||||
parser.add_argument('--comment', default='', help='Comments about this test')
|
parser.add_argument('--comment', default='', help='Comments about this test')
|
||||||
parser.add_argument('--config', default='config.yaml', help='Path to config file')
|
parser.add_argument('--config', default='config.yaml', help='Path to config file')
|
||||||
|
parser.add_argument('--measurements', type=int, default=5, help='Number of latency measurements (default: 5)')
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
@@ -27,47 +28,32 @@ def main():
|
|||||||
results_dir = Path(config['output']['results_dir'])
|
results_dir = Path(config['output']['results_dir'])
|
||||||
results_dir.mkdir(exist_ok=True)
|
results_dir.mkdir(exist_ok=True)
|
||||||
|
|
||||||
test_output_dir = results_dir / test_id
|
test_output_dir = results_dir / f"{test_id}_latency"
|
||||||
test_output_dir.mkdir(exist_ok=True)
|
test_output_dir.mkdir(exist_ok=True)
|
||||||
|
|
||||||
save_plots = config['output'].get('save_plots', False)
|
save_plots = config['output'].get('save_plots', False)
|
||||||
|
|
||||||
print(f"Starting audio test run: {test_id}")
|
print(f"Starting latency test: {test_id}")
|
||||||
print(f"Serial Number: {args.serial_number}")
|
print(f"Serial Number: {args.serial_number}")
|
||||||
print(f"Software: {args.software_version}")
|
print(f"Software: {args.software_version}")
|
||||||
if args.comment:
|
if args.comment:
|
||||||
print(f"Comment: {args.comment}")
|
print(f"Comment: {args.comment}")
|
||||||
|
print(f"Measurements: {args.measurements}")
|
||||||
if save_plots:
|
if save_plots:
|
||||||
print(f"Plots will be saved to: {test_output_dir}")
|
print(f"Plots will be saved to: {test_output_dir}")
|
||||||
print("-" * 60)
|
print("-" * 60)
|
||||||
|
|
||||||
print("\n[1/2] Running chirp-based latency test (5 measurements)...")
|
print(f"\nRunning chirp-based latency test ({args.measurements} measurements)...")
|
||||||
try:
|
try:
|
||||||
latency_stats = run_latency_test(config, num_measurements=5,
|
latency_stats = run_latency_test(config, num_measurements=args.measurements,
|
||||||
save_plots=save_plots, output_dir=test_output_dir)
|
save_plots=save_plots, output_dir=test_output_dir)
|
||||||
print(f"✓ Latency: avg={latency_stats['avg']:.3f}ms, "
|
print(f"✓ Latency: avg={latency_stats['avg']:.3f}ms, "
|
||||||
f"min={latency_stats['min']:.3f}ms, max={latency_stats['max']:.3f}ms")
|
f"min={latency_stats['min']:.3f}ms, max={latency_stats['max']:.3f}ms, "
|
||||||
|
f"std={latency_stats['std']:.3f}ms")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"✗ Error: {e}")
|
print(f"✗ Error: {e}")
|
||||||
latency_stats = {'avg': 0.0, 'min': 0.0, 'max': 0.0, 'std': 0.0, 'error': str(e)}
|
latency_stats = {'avg': 0.0, 'min': 0.0, 'max': 0.0, 'std': 0.0, 'error': str(e)}
|
||||||
|
|
||||||
print("\n[2/2] Running frequency sweep tests...")
|
|
||||||
test_results = []
|
|
||||||
frequencies = config['test_tones']['frequencies']
|
|
||||||
|
|
||||||
for i, freq in enumerate(frequencies, 1):
|
|
||||||
print(f"Testing frequency {i}/{len(frequencies)}: {freq} Hz...", end=' ', flush=True)
|
|
||||||
try:
|
|
||||||
result = run_single_test(freq, config, save_plots=save_plots, output_dir=test_output_dir)
|
|
||||||
test_results.append(result)
|
|
||||||
print("✓")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"✗ Error: {e}")
|
|
||||||
test_results.append({
|
|
||||||
'frequency_hz': freq,
|
|
||||||
'error': str(e)
|
|
||||||
})
|
|
||||||
|
|
||||||
output_data = {
|
output_data = {
|
||||||
'metadata': {
|
'metadata': {
|
||||||
'test_id': test_id,
|
'test_id': test_id,
|
||||||
@@ -76,11 +62,10 @@ def main():
|
|||||||
'software_version': args.software_version,
|
'software_version': args.software_version,
|
||||||
'comment': args.comment
|
'comment': args.comment
|
||||||
},
|
},
|
||||||
'latency_test': latency_stats,
|
'latency_test': latency_stats
|
||||||
'test_results': test_results
|
|
||||||
}
|
}
|
||||||
|
|
||||||
output_file = results_dir / f"{test_id}_results.yaml"
|
output_file = test_output_dir / f"{test_id}_latency_results.yaml"
|
||||||
with open(output_file, 'w') as f:
|
with open(output_file, 'w') as f:
|
||||||
yaml.dump(output_data, f, default_flow_style=False, sort_keys=False)
|
yaml.dump(output_data, f, default_flow_style=False, sort_keys=False)
|
||||||
|
|
||||||
Reference in New Issue
Block a user