Compare commits: 7 commits, 80f7522159...main

| SHA1 |
|---|
| 0c7de92ae9 |
| dd118ddb23 |
| 31cc2c0e92 |
| 2cab55c8cd |
| 8d3a144614 |
| 5d5a131b77 |
| 39bcd072c0 |
ARTIFACT_DETECTION_README.md (new file, 193 lines)
@@ -0,0 +1,193 @@
# Artifact Detection Test

## Overview

This test plays a 1 kHz sine wave for a configurable duration (default 60 seconds) and records both channels simultaneously:

- **Channel 1**: Loopback path (direct audio interface connection)
- **Channel 2**: DUT/Radio path (through beacon and radio transmission)

The test detects buzzing, clicks, dropouts, and other audio artifacts using multiple configurable algorithms.
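The stimulus itself is easy to reproduce. Below is a minimal sketch of the tone generation (the names and the `sounddevice`-based play/record call are illustrative assumptions, not the script's actual API; the hardware call is left as a comment so the snippet runs without an audio interface):

```python
import numpy as np

SAMPLE_RATE = 44100  # Hz, matching config.yaml

def make_test_tone(freq_hz=1000, duration_s=60.0, amplitude=0.5):
    """Generate the sine used as the test stimulus."""
    t = np.arange(int(duration_s * SAMPLE_RATE)) / SAMPLE_RATE
    return (amplitude * np.sin(2 * np.pi * freq_hz * t)).astype(np.float32)

tone = make_test_tone(duration_s=1.0)

# With an interface attached, both channels could be captured with something like:
# import sounddevice as sd
# recording = sd.playrec(np.column_stack([tone, tone]),
#                        samplerate=SAMPLE_RATE, channels=2)
# sd.wait()

# Sanity check: the dominant FFT bin sits at 1 kHz.
spectrum = np.abs(np.fft.rfft(tone))
peak_hz = np.fft.rfftfreq(len(tone), 1 / SAMPLE_RATE)[np.argmax(spectrum)]
print(round(peak_hz))  # 1000
```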

## Quick Start

```bash
python test_artifact_detection.py \
    --serial-number SN001234 \
    --software-version abc123 \
    --comment "Testing new firmware"
```

## Detection Algorithms

The test uses four configurable detection algorithms (`spectral_anomaly` is **disabled by default** due to false positives):

### 1. Spectral Anomaly Detection (DISABLED BY DEFAULT)
- **Status**: ⚠️ Currently generates too many false positives, so it is disabled by default
- **What it detects**: Unexpected frequencies that are not harmonics of the fundamental tone
- **Use case**: Buzzing, interference, crosstalk
- **Configuration**: `threshold_db` - how far below the fundamental to search (default: -60 dB)
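For illustration, here is a simplified version of the idea (a sketch, not the shipped algorithm): mask the fundamental and its harmonics in the spectrum, then flag any remaining peak that rises above `threshold_db` relative to the fundamental.

```python
import numpy as np

def spectral_anomalies(signal, sample_rate, fundamental_hz, threshold_db=-60,
                       harmonic_bw_hz=20.0):
    """Return frequencies (Hz) of non-harmonic spectral peaks above the threshold."""
    spectrum = np.abs(np.fft.rfft(signal * np.hanning(len(signal))))
    freqs = np.fft.rfftfreq(len(signal), 1 / sample_rate)
    ref = spectrum[np.argmin(np.abs(freqs - fundamental_hz))]

    # Mask DC and every harmonic of the fundamental within +/- harmonic_bw_hz.
    masked = spectrum.copy()
    masked[freqs < harmonic_bw_hz] = 0
    h = fundamental_hz
    while h < freqs[-1]:
        masked[np.abs(freqs - h) < harmonic_bw_hz] = 0
        h += fundamental_hz

    limit = ref * 10 ** (threshold_db / 20)
    return freqs[masked > limit]

sr = 44100
t = np.arange(sr) / sr
clean = 0.5 * np.sin(2 * np.pi * 1000 * t)
buzzing = clean + 0.05 * np.sin(2 * np.pi * 3130 * t)  # non-harmonic interferer

print(len(spectral_anomalies(clean, sr, 1000)))       # 0
print(len(spectral_anomalies(buzzing, sr, 1000)) > 0)  # True
```

The false-positive problem noted above is visible in such a sketch: real recordings have broadband noise and interface coloration, so the unmasked spectrum rarely sits 60 dB down everywhere.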

### 2. Amplitude Spike Detection (WORKING)
- **What it detects**: Sudden changes in signal amplitude (RMS)
- **Use case**: Clicks, pops, dropouts
- **Configuration**: `threshold_factor` - number of standard deviations (default: 3.0)
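A sketch of the approach (an assumed simplification of the actual detector): compute RMS over short windows and flag windows whose RMS deviates from the mean by more than `threshold_factor` standard deviations.

```python
import numpy as np

def amplitude_spikes(signal, sample_rate, threshold_factor=3.0, window_ms=10):
    """Indices of windows whose RMS is a threshold_factor-sigma outlier."""
    win = int(sample_rate * window_ms / 1000)
    n = len(signal) // win
    rms = np.sqrt(np.mean(signal[: n * win].reshape(n, win) ** 2, axis=1))
    dev = np.abs(rms - rms.mean())
    return np.flatnonzero(dev > threshold_factor * rms.std())

sr = 44100
t = np.arange(sr) / sr
sig = 0.5 * np.sin(2 * np.pi * 1000 * t)
sig[22050:22491] = 0.0  # simulate a 10 ms dropout mid-recording

print(list(amplitude_spikes(sig, sr)))  # [50]  (the zeroed window)
```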

### 3. Zero-Crossing Anomaly Detection (WORKING)
- **What it detects**: Irregular zero-crossing patterns
- **Use case**: Distortion, clipping, non-linear artifacts
- **Configuration**: `threshold_factor` - number of standard deviations (default: 2.0)
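The same outlier pattern applies here, but on zero-crossing counts per window instead of RMS (again a hedged sketch, not the shipped code):

```python
import numpy as np

def zero_crossing_anomalies(signal, sample_rate, threshold_factor=2.0, window_ms=10):
    """Indices of windows whose zero-crossing count is an outlier."""
    win = int(sample_rate * window_ms / 1000)
    n = len(signal) // win
    frames = signal[: n * win].reshape(n, win)
    # Count sign changes per window.
    zc = np.sum(np.abs(np.diff(np.sign(frames), axis=1)) > 0, axis=1)
    dev = np.abs(zc - zc.mean())
    return np.flatnonzero(dev > threshold_factor * zc.std())

sr = 44100
t = np.arange(sr) / sr
sig = 0.5 * np.sin(2 * np.pi * 1000 * t)
# Inject a 10 ms high-frequency burst: far more zero crossings in that window.
sig[4410:4851] = 0.5 * np.sin(2 * np.pi * 8000 * t[4410:4851])

print(list(zero_crossing_anomalies(sig, sr)))  # [10]
```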

### 4. Energy Variation Detection (WORKING)
- **What it detects**: Rapid energy changes between time windows
- **Use case**: Dropouts, level fluctuations, intermittent issues
- **Configuration**: `threshold_db` - energy change threshold (default: 6.0 dB)
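The idea can be sketched as comparing the energy of consecutive windows in dB (illustrative simplification; window length and edge handling in the real detector may differ):

```python
import numpy as np

def energy_variations(signal, sample_rate, threshold_db=6.0, window_ms=50):
    """Indices of window boundaries where energy jumps by more than threshold_db."""
    win = int(sample_rate * window_ms / 1000)
    n = len(signal) // win
    energy = np.sum(signal[: n * win].reshape(n, win) ** 2, axis=1)
    energy_db = 10 * np.log10(energy + 1e-12)  # epsilon avoids log10(0)
    return np.flatnonzero(np.abs(np.diff(energy_db)) > threshold_db)

sr = 44100
t = np.arange(sr) / sr
sig = 0.5 * np.sin(2 * np.pi * 1000 * t)
sig[22050:24255] = 0.0  # 50 ms dropout

print(len(energy_variations(sig, sr)))  # 2: energy drops, then recovers
```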

## Configuration

Edit `config.yaml` to customize the test:

```yaml
artifact_detection:
  test_frequency: 1000   # Hz
  duration: 60.0         # seconds
  amplitude: 0.5         # 0.0 to 1.0
  detectors:
    spectral_anomaly:
      enabled: true      # note: ships disabled by default (false positives)
      threshold_db: -40
    amplitude_spikes:
      enabled: true
      threshold_factor: 3.0
    zero_crossing:
      enabled: true
      threshold_factor: 2.0
    energy_variation:
      enabled: true
      threshold_db: 6.0
```

## Command Line Options

- `--serial-number`: Serial number (required)
- `--software-version`: Git commit hash or version (required)
- `--comment`: Optional comments about the test
- `--config`: Path to config file (default: `config.yaml`)
- `--duration`: Override duration in seconds
- `--frequency`: Override test frequency in Hz

## Example: Quick 10-second Test

```bash
python test_artifact_detection.py \
    --serial-number SN001234 \
    --software-version abc123 \
    --duration 10
```

## Example: Custom Frequency

```bash
python test_artifact_detection.py \
    --serial-number SN001234 \
    --software-version abc123 \
    --frequency 440
```

## Tuning Detection Algorithms

### More Sensitive Detection
To catch more subtle artifacts, make the thresholds stricter:

```yaml
detectors:
  spectral_anomaly:
    threshold_db: -50      # Lower = more sensitive
  amplitude_spikes:
    threshold_factor: 2.0  # Lower = more sensitive
  zero_crossing:
    threshold_factor: 1.5  # Lower = more sensitive
  energy_variation:
    threshold_db: 3.0      # Lower = more sensitive
```

### Less Sensitive Detection
To reduce false positives in noisy environments:

```yaml
detectors:
  spectral_anomaly:
    threshold_db: -30      # Higher = less sensitive
  amplitude_spikes:
    threshold_factor: 4.0  # Higher = less sensitive
  zero_crossing:
    threshold_factor: 3.0  # Higher = less sensitive
  energy_variation:
    threshold_db: 10.0     # Higher = less sensitive
```

### Disable Specific Detectors

```yaml
detectors:
  spectral_anomaly:
    enabled: false  # Turn off this detector
```

## Output

The test generates:
1. **YAML results file**: `test_results/{timestamp}_artifact_detection_results.yaml`
2. **JSON results file**: `test_results/{timestamp}_artifact_detection_results.json`
3. **Summary plots** (if enabled): `test_results/{timestamp}_artifact_detection/`
   - Time-domain waveforms with artifact markers
   - Frequency spectrum analysis
4. **Individual anomaly plots**: `test_results/{timestamp}_artifact_detection/individual_anomalies/`
   - Each anomaly plotted individually with ~20 periods of context
   - Detailed view showing exactly what the anomaly looks like
   - Named by channel, type, and timestamp for easy identification

### Results Structure

```yaml
metadata:
  test_id: "20260317_140530"
  timestamp: "2026-03-17T14:05:30.123456"
  test_type: "artifact_detection"
  serial_number: "SN001234"
  software_version: "abc123"

artifact_detection_result:
  test_frequency_hz: 1000
  duration_sec: 60.0
  channel_1_loopback:
    total_artifacts: 5
    artifact_rate_per_minute: 5.0
    artifacts_by_type:
      spectral_anomaly: 2
      amplitude_spike: 3
  channel_2_dut:
    total_artifacts: 23
    artifact_rate_per_minute: 23.0
    artifacts_by_type:
      spectral_anomaly: 8
      amplitude_spike: 10
      energy_variation: 5
  detector_config: {...}
```

## Interpreting Results

- **Zero artifacts in both channels**: Excellent signal quality
- **Same artifacts in both channels**: Likely environmental interference or an audio interface issue
- **More artifacts in Channel 2 (radio path)**: Radio transmission degradation detected
- **High spectral_anomaly count**: Interference or crosstalk
- **High amplitude_spike count**: Clicks, pops, or dropouts
- **High energy_variation count**: Level instability or dropouts

## Comparison with Loopback Baseline

The loopback path (Channel 1) serves as a baseline reference. Any additional artifacts in the radio path (Channel 2) indicate degradation introduced by the radio transmission system.

Expected behavior:
- Loopback should have minimal artifacts (ideally zero)
- Radio path may have some artifacts due to transmission
- A large difference indicates issues in the radio hardware/firmware
@@ -12,19 +12,18 @@ pip install -r requirements.txt
|
||||
### 1. Run Your First Test
|
||||
|
||||
```bash
|
||||
python run_test.py \
|
||||
--pcb-version "v1.0" \
|
||||
--pcb-revision "A" \
|
||||
python test_latency.py \
|
||||
--serial-number "SN001234" \
|
||||
--software-version "initial" \
|
||||
--notes "First test run"
|
||||
--comment "First test run"
|
||||
```
|
||||
|
||||
**What happens:**
|
||||
- Auto-detects your Scarlett audio interface
|
||||
- Plays test tones at 7 frequencies (100 Hz to 8 kHz)
|
||||
- Plays chirp signal and measures latency (5 measurements by default)
|
||||
- Records input/output on both channels
|
||||
- Calculates latency, THD, and SNR
|
||||
- Saves results to `test_results/YYYYMMDD_HHMMSS_results.yaml`
|
||||
- Calculates average, min, max, and standard deviation of latency
|
||||
- Saves results to `test_results/YYYYMMDD_HHMMSS_latency/YYYYMMDD_HHMMSS_latency_results.yaml`
|
||||
|
||||
### 2. View Results
|
||||
|
||||
@@ -39,35 +38,33 @@ python view_results.py test_results/20260226_123456_results.yaml
 python view_results.py example_test_result.yaml
 ```

-### 3. Compare Different PCB Versions
+### 3. Compare Different Units

 Run multiple tests with different metadata:

 ```bash
-# Test PCB v1.0
-python run_test.py --pcb-version "v1.0" --pcb-revision "A" --software-version "abc123"
+# Test unit SN001234
+python test_latency.py --serial-number "SN001234" --software-version "abc123"

-# Test PCB v2.0
-python run_test.py --pcb-version "v2.0" --pcb-revision "A" --software-version "abc123"
+# Test unit SN001235 with more measurements
+python test_latency.py --serial-number "SN001235" --software-version "abc123" --measurements 10

 # Compare by viewing both YAML files
-python view_results.py test_results/20260226_120000_results.yaml
-python view_results.py test_results/20260226_130000_results.yaml
+python view_results.py test_results/20260226_120000_latency/20260226_120000_latency_results.yaml
+python view_results.py test_results/20260226_130000_latency/20260226_130000_latency_results.yaml
 ```
 ## Understanding the Output

-Each test produces metrics at 7 frequencies:
+Each latency test produces:

-- **Latency (ms)**: Delay between channels (should be near 0 for loopback)
-- **THD Input (%)**: Distortion in channel 1 (lower is better)
-- **THD Output (%)**: Distortion in channel 2 (lower is better)
-- **SNR Input (dB)**: Signal quality in channel 1 (higher is better)
-- **SNR Output (dB)**: Signal quality in channel 2 (higher is better)
+- **Average Latency (ms)**: Mean delay across all measurements
+- **Min/Max Latency (ms)**: Range of measured values
+- **Standard Deviation (ms)**: Consistency of measurements (lower is better)

 **Good values:**
-- THD: < 0.1% (< 0.01% is excellent)
-- SNR: > 80 dB (> 90 dB is excellent)
-- Latency: Depends on your system (audio interface typically < 10ms)
+- Standard Deviation: < 1ms (consistent measurements)
+- Latency: < 5 ms for loopback
 ## Configuration

@@ -75,11 +72,21 @@ Each test produces metrics at 7 frequencies:
 Edit `config.yaml` to customize test parameters:

 ```yaml
-test_tones:
-  frequencies: [1000]  # Test only 1 kHz
-  duration: 3.0        # Shorter test (3 seconds)
+audio:
+  sample_rate: 44100
+  channels: 2
+  device_name: "Scarlett"
+
+output:
+  results_dir: "test_results"
+  save_plots: true
 ```

+```bash
+python -c "import sounddevice as sd; print(sd.query_devices())"
+```
+Update `device_name` in `config.yaml` to match your device.

 ## Troubleshooting

 **Audio device not found:**
README.md (66 lines changed)
@@ -4,8 +4,8 @@ Simple Python-based testing system for PCB audio hardware validation.

 ## Features

-- **Automated Testing**: Latency, THD, and SNR measurements across multiple frequencies
-- **Metadata Tracking**: PCB version, revision, software version, timestamps, notes
+- **Automated Testing**: Latency measurements with configurable iterations
+- **Metadata Tracking**: Serial number, software version, timestamps, comments
 - **YAML Output**: Human-readable structured results
 - **Simple Workflow**: Run tests, view results, compare versions
@@ -19,12 +19,20 @@ pip install -r requirements.txt

 ### 2. Run a Test

+**Latency Test:**
 ```bash
-python run_test.py \
-    --pcb-version "v2.1" \
-    --pcb-revision "A" \
+python test_latency.py \
+    --serial-number "SN001234" \
     --software-version "a3f2b1c" \
-    --notes "Replaced capacitor C5"
+    --comment "Replaced capacitor C5"
 ```

+**Artifact Detection Test:**
+```bash
+python test_artifact_detection.py \
+    --serial-number "SN001234" \
+    --software-version "a3f2b1c" \
+    --comment "Baseline test"
+```

 ### 3. View Results
@@ -43,10 +51,8 @@ python view_results.py test_results/*.yaml | tail -1

 ## Test Metrics

 - **Latency**: Round-trip delay between input and output channels (ms)
-- **THD**: Total Harmonic Distortion for input and output (%)
-- **SNR**: Signal-to-Noise Ratio for input and output (dB)
-
-Tests run at multiple frequencies: 100 Hz, 250 Hz, 500 Hz, 1 kHz, 2 kHz, 4 kHz, 8 kHz
+- Average, minimum, maximum, and standard deviation across measurements
+- Uses chirp signal for accurate cross-correlation measurement
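The chirp/cross-correlation idea can be sketched as follows (names are illustrative; the real script's implementation may differ). A chirp has a sharp autocorrelation peak, so the lag of the cross-correlation maximum gives the delay directly:

```python
import numpy as np

def measure_latency_ms(reference, recorded, sample_rate):
    """Estimate the delay of `recorded` relative to `reference` via cross-correlation."""
    corr = np.correlate(recorded, reference, mode="full")
    lag = np.argmax(corr) - (len(reference) - 1)
    return 1000.0 * lag / sample_rate

sr = 44100
t = np.arange(int(0.1 * sr)) / sr
# Linear chirp from 100 Hz to 8 kHz over 0.1 s.
chirp = np.sin(2 * np.pi * (100 * t + (8000 - 100) / (2 * t[-1]) * t ** 2))

delay = 441  # simulate 10 ms of round-trip latency
recorded = np.concatenate([np.zeros(delay), chirp])

print(measure_latency_ms(chirp, recorded, sr))  # 10.0
```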

 ## Output Format

@@ -56,27 +62,35 @@ Results are saved as YAML files in `test_results/`:
 metadata:
   test_id: 20260226_123456
   timestamp: '2026-02-26T12:34:56.789012'
-  pcb_version: v2.1
-  pcb_revision: A
+  serial_number: SN001234
   software_version: a3f2b1c
-  notes: Replaced capacitor C5
-test_results:
-- frequency_hz: 1000
-  latency_ms: 2.345
-  thd_input_percent: 0.012
-  thd_output_percent: 0.034
-  snr_input_db: 92.5
-  snr_output_db: 89.2
+  comment: Replaced capacitor C5
+latency_test:
+  avg: 2.345
+  min: 2.201
+  max: 2.489
+  std: 0.087
 ```

 ## Configuration

 Edit `config.yaml` to customize:
 - Audio device settings
 - Test frequencies
 - Test duration
 - Output options

+```yaml
+audio:
+  sample_rate: 44100
+  channels: 2
+  device_name: "Scarlett"
+
+output:
+  results_dir: "test_results"
+  save_plots: true
+```
+
+The system auto-detects Focusrite Scarlett audio interfaces.

 ## Hardware Setup

 ```
@@ -84,19 +98,19 @@ Laptop <-> Audio Interface (Scarlett) <-> DUT <-> Audio Interface (Scarlett) <->
        Output Channels 1&2                      Input Channels 1&2
 ```

 The system auto-detects Focusrite Scarlett audio interfaces.

 ## File Structure

 ```
 closed_loop_audio_test_suite/
 ├── config.yaml                  # Test configuration
-├── run_test.py                  # Main test runner
+├── test_latency.py              # Latency test runner
+├── test_artifact_detection.py   # Artifact detection test
 ├── view_results.py              # Results viewer
 ├── src/
 │   └── audio_tests.py           # Core test functions
 └── test_results/                # YAML output files
-    └── YYYYMMDD_HHMMSS_results.yaml
+    ├── YYYYMMDD_HHMMSS_latency/
+    └── YYYYMMDD_HHMMSS_artifact_detection/
 ```

 ## Tips

@@ -35,4 +35,20 @@ Play a sine in different frequencies, and for every frequency 5 sec long and do

 Dont do fourier yet.

 Do a simple project.

+----
+
+I want you to write a new test:
+Put a 1khz sine into the system and record both channels for x seconds e.g. 60.
+I want you to detect buzzing and other artifacts in the recording.
+Give me a number how many artifacts you found.
+Make the detection algorithm configurable, so we can try different approaches.
+
+Again input it into the audio interface and measure both loopback and radio path like in the other test.
config.yaml (34 lines changed)
@@ -7,7 +7,7 @@ audio:

 test_tones:
   frequencies: [100, 250, 500, 1000, 2000, 4000, 8000]  # Hz
-  duration: 5.0        # seconds per frequency
+  duration: 10.0       # seconds per frequency
   amplitude: 0.5       # 0.0 to 1.0
   latency_runs: 5      # Number of latency measurements to average

@@ -15,3 +15,35 @@ output:
   results_dir: "test_results"
   save_plots: true
   save_raw_audio: false
+
+artifact_detection:
+  test_frequency: 1000   # Hz - Test tone frequency (for sine wave mode)
+  duration: 60.0         # seconds - Recording duration
+  amplitude: 0.5         # 0.0 to 1.0
+  startup_delay: 0       # seconds - Wait before recording so the system can settle
+  # Chirp signal parameters (used when --signal-type chirp is specified)
+  chirp_f0: 100          # Hz - Chirp start frequency
+  chirp_f1: 8000         # Hz - Chirp end frequency
+  # NOTE: All detectors skip the first and last 1 second of recording to avoid startup/shutdown transients
+  detectors:
+    spectral_anomaly:
+      enabled: false           # DISABLED - generates too many false positives, needs better algorithm
+      threshold_db: -60        # Detect unexpected frequencies above noise floor + this threshold
+    amplitude_spikes:
+      enabled: true
+      threshold_factor: 5.0    # MAD-based outlier detection on envelope (clicks, pops, dropouts); lower = more sensitive
+    zero_crossing:
+      enabled: false
+      threshold_factor: 2.0    # Number of standard deviations for zero-crossing anomalies (detects distortion)
+    energy_variation:
+      enabled: true
+      threshold_db: 6.0        # Energy change threshold in dB between consecutive windows (detects level changes)
+
+latency:
+  max_std_dev_ms: 0.5    # Maximum allowed std deviation; test fails if exceeded
+  min_avg_ms: 1.0        # Minimum expected average latency; near-zero indicates bad loopback
+
+latency_buildup:
+  measurement_interval: 10         # seconds between latency measurements
+  max_duration: null               # maximum test duration in seconds (null = run until canceled)
+  buildup_threshold_percent: 5.0   # percentage change threshold for buildup detection
@@ -1,10 +1,9 @@
 metadata:
   test_id: 20260226_123456
   timestamp: '2026-02-26T12:34:56.789012'
-  pcb_version: v2.1
-  pcb_revision: A
+  serial_number: SN001234
   software_version: a3f2b1c8d9e
-  notes: Baseline test with new capacitor values
+  comment: Baseline test with new capacitor values
 test_results:
 - frequency_hz: 100
   latency_ms: 2.341
plot_alsa_status.py (new file, 92 lines)
@@ -0,0 +1,92 @@
#!/usr/bin/env python3
"""Parse ALSA status log file and plot avail value over time."""

import sys
import re
import os
from datetime import datetime

import matplotlib.pyplot as plt
import numpy as np


TIMESTAMP_RE = re.compile(r"^===== (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+) =====")
AVAIL_RE = re.compile(r"^avail\s*:\s*(\d+)")


def parse_log(log_path):
    timestamps = []
    avail_values = []

    with open(log_path, "r") as f:
        current_timestamp = None
        for line in f:
            line = line.strip()

            # Check for timestamp line
            ts_match = TIMESTAMP_RE.match(line)
            if ts_match:
                current_timestamp = datetime.strptime(ts_match.group(1), "%Y-%m-%d %H:%M:%S.%f")
                continue

            # Check for avail line (only if we have a timestamp)
            if current_timestamp:
                avail_match = AVAIL_RE.match(line)
                if avail_match:
                    timestamps.append(current_timestamp)
                    avail_values.append(int(avail_match.group(1)))
                    current_timestamp = None  # Reset until next timestamp

    if not timestamps:
        print("No valid timestamp/avail pairs found in the log file.", file=sys.stderr)
        sys.exit(1)

    # Convert to relative seconds from first timestamp
    t0 = timestamps[0]
    seconds = [(t - t0).total_seconds() for t in timestamps]
    return seconds, avail_values


def plot(seconds, avail_values, out_path):
    plt.figure(figsize=(12, 6))
    plt.plot(seconds, avail_values, label="avail", linewidth=1, alpha=0.7)

    # Add moving average (windowed mean)
    if len(avail_values) >= 10:  # Only if we have enough data points
        window_size = min(50, len(avail_values) // 10)  # Adaptive window size
        moving_avg = np.convolve(avail_values, np.ones(window_size) / window_size, mode='valid')
        # Align the moving average with the end of each averaging window
        ma_seconds = seconds[window_size - 1:]
        plt.plot(ma_seconds, moving_avg, label=f"moving mean (window={window_size})", linewidth=2)

    plt.xlabel("Time (s)")
    plt.ylabel("Available samples")
    plt.title("ALSA Available Samples Over Time")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    print(f"Plot saved to {out_path}")


def main():
    if len(sys.argv) != 2:
        print(f"Usage: {sys.argv[0]} <path_to_alsa_status_log>", file=sys.stderr)
        sys.exit(1)

    log_path = sys.argv[1]
    if not os.path.isfile(log_path):
        print(f"File not found: {log_path}", file=sys.stderr)
        sys.exit(1)

    seconds, avail_values = parse_log(log_path)

    log_dir = os.path.dirname(os.path.abspath(log_path))
    log_base = os.path.splitext(os.path.basename(log_path))[0]
    out_path = os.path.join(log_dir, f"{log_base}_avail_plot.png")

    plot(seconds, avail_values, out_path)


if __name__ == "__main__":
    main()
plot_combined.py (new file, 302 lines)
@@ -0,0 +1,302 @@
#!/usr/bin/env python3
"""Combine ALSA avail, perf metrics, and latency plots into one figure."""

import sys
import re
import os
from datetime import datetime

import matplotlib.pyplot as plt
import numpy as np
import yaml


# Regex patterns
TIMESTAMP_RE = re.compile(r"^===== (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+) =====")
AVAIL_RE = re.compile(r"^avail\s*:\s*(\d+)")
PERF_RE = re.compile(
    r"^(\w+ \d+ \d+:\d+:\d+) .* Perf\(.*?\):"
    r".*?sample mean=([\d.]+)ms"
    r".*?write mean=([\d.]+)ms"
    r".*?loop mean=([\d.]+)ms"
)
LATENCY_RE = re.compile(r"^(\w+ \d+ \d+:\d+:\d+).*latency.*?(\d+(?:\.\d+)?)ms")
PYALSA_AVAIL_BEFORE_RE = re.compile(r"^(\w+ \d+ \d+:\d+:\d+).*PyALSA: avail before read: (\d+)")
PYALSA_AVAIL_AFTER_RE = re.compile(r"^(\w+ \d+ \d+:\d+:\d+).*PyALSA: .* avail=(\d+)")


def parse_alsa_status(log_path):
    timestamps = []
    avail_values = []

    with open(log_path, "r") as f:
        current_timestamp = None
        for line in f:
            line = line.strip()

            ts_match = TIMESTAMP_RE.match(line)
            if ts_match:
                current_timestamp = datetime.strptime(ts_match.group(1), "%Y-%m-%d %H:%M:%S.%f")
                continue

            if current_timestamp:
                avail_match = AVAIL_RE.match(line)
                if avail_match:
                    timestamps.append(current_timestamp)
                    avail_values.append(int(avail_match.group(1)))
                    current_timestamp = None

    if not timestamps:
        return [], []

    t0 = timestamps[0]
    seconds = [(t - t0).total_seconds() for t in timestamps]
    return seconds, avail_values


def parse_perf_log(log_path):
    timestamps = []
    sample_means = []
    write_means = []
    loop_means = []

    with open(log_path, "r") as f:
        for line in f:
            m = PERF_RE.search(line)
            if m:
                ts_str, sample, write, loop = m.groups()
                ts = datetime.strptime(ts_str, "%b %d %H:%M:%S")
                timestamps.append(ts)
                sample_means.append(float(sample))
                write_means.append(float(write))
                loop_means.append(float(loop))

    if not timestamps:
        return [], [], [], []

    t0 = timestamps[0]
    seconds = [(t - t0).total_seconds() for t in timestamps]
    return seconds, sample_means, write_means, loop_means


def parse_pyalsa_avail(perf_file):
    """Parse PyALSA avail before/after read from the perf log file."""
    before_timestamps = []
    before_values = []
    after_timestamps = []
    after_values = []

    with open(perf_file, "r") as f:
        for line in f:
            line = line.strip()

            # Check for "avail before read"
            before_match = PYALSA_AVAIL_BEFORE_RE.match(line)
            if before_match:
                ts_str, avail = before_match.groups()
                current_year = datetime.now().year
                ts_with_year = f"{current_year} {ts_str}"
                ts = datetime.strptime(ts_with_year, "%Y %b %d %H:%M:%S")
                before_timestamps.append(ts)
                before_values.append(int(avail))
                continue

            # Check for "avail=" (after read)
            after_match = PYALSA_AVAIL_AFTER_RE.match(line)
            if after_match:
                ts_str, avail = after_match.groups()
                current_year = datetime.now().year
                ts_with_year = f"{current_year} {ts_str}"
                ts = datetime.strptime(ts_with_year, "%Y %b %d %H:%M:%S")
                after_timestamps.append(ts)
                after_values.append(int(avail))

    return before_timestamps, before_values, after_timestamps, after_values


def parse_latency_yaml(yaml_path):
    with open(yaml_path, 'r') as f:
        data = yaml.safe_load(f)

    latency_measurements = data.get('latency_buildup_result', {}).get('latency_measurements', [])

    timestamps = []
    latencies = []

    for measurement in latency_measurements:
        ts_str = measurement['timestamp']
        latency = measurement['latency_ms']

        # Parse ISO format timestamp
        ts = datetime.fromisoformat(ts_str)
        timestamps.append(ts)
        latencies.append(float(latency))

    if not timestamps:
        return [], []

    t0 = timestamps[0]
    seconds = [(t - t0).total_seconds() for t in timestamps]
    return seconds, latencies


def plot_combined(alsa_file, perf_file, latency_file, out_path):
    # Parse all logs
    alsa_seconds, avail_values = parse_alsa_status(alsa_file)
    perf_seconds, sample_means, write_means, loop_means = parse_perf_log(perf_file)
    latency_seconds, latencies = parse_latency_yaml(latency_file)

    # Parse PyALSA avail data
    before_timestamps, before_values, after_timestamps, after_values = parse_pyalsa_avail(perf_file)

    # Get absolute timestamps for proper alignment
    alsa_timestamps = []
    perf_timestamps = []
    latency_timestamps = []

    # Re-parse to get absolute timestamps for alignment
    with open(alsa_file, "r") as f:
        current_timestamp = None
        for line in f:
            line = line.strip()
            ts_match = TIMESTAMP_RE.match(line)
            if ts_match:
                current_timestamp = datetime.strptime(ts_match.group(1), "%Y-%m-%d %H:%M:%S.%f")
                continue
            if current_timestamp:
                avail_match = AVAIL_RE.match(line)
                if avail_match:
                    alsa_timestamps.append(current_timestamp)
                    current_timestamp = None

    with open(perf_file, "r") as f:
        for line in f:
            m = PERF_RE.search(line)
            if m:
                ts_str = m.group(1)
                # Add current year to the timestamp since it doesn't include year
                current_year = datetime.now().year
                ts_with_year = f"{current_year} {ts_str}"
                ts = datetime.strptime(ts_with_year, "%Y %b %d %H:%M:%S")
                perf_timestamps.append(ts)

    with open(latency_file, 'r') as f:
        data = yaml.safe_load(f)
    latency_measurements = data.get('latency_buildup_result', {}).get('latency_measurements', [])
    for measurement in latency_measurements:
        ts_str = measurement['timestamp']
        ts = datetime.fromisoformat(ts_str)
        latency_timestamps.append(ts)

    # Find earliest timestamp
    all_abs_timestamps = []
    if alsa_timestamps:
        all_abs_timestamps.extend(alsa_timestamps)
    if perf_timestamps:
        all_abs_timestamps.extend(perf_timestamps)
    if latency_timestamps:
        all_abs_timestamps.extend(latency_timestamps)
    if before_timestamps:
        all_abs_timestamps.extend(before_timestamps)
    if after_timestamps:
        all_abs_timestamps.extend(after_timestamps)

    t0_absolute = min(all_abs_timestamps)

    # Convert all times to seconds from earliest timestamp
    alsa_aligned = [(ts - t0_absolute).total_seconds() for ts in alsa_timestamps] if alsa_timestamps else []
    perf_aligned = [(ts - t0_absolute).total_seconds() for ts in perf_timestamps] if perf_timestamps else []
    latency_aligned = [(ts - t0_absolute).total_seconds() for ts in latency_timestamps] if latency_timestamps else []
    before_aligned = [(ts - t0_absolute).total_seconds() for ts in before_timestamps] if before_timestamps else []
    after_aligned = [(ts - t0_absolute).total_seconds() for ts in after_timestamps] if after_timestamps else []

    # Create figure with 4 subplots sharing x-axis
    fig, (ax1, ax2, ax3, ax4) = plt.subplots(4, 1, figsize=(14, 12), sharex=True)
    fig.suptitle("Combined Audio Performance Metrics", fontsize=16)

    # Plot 1: ALSA avail
    if alsa_aligned and avail_values:
        ax1.plot(alsa_aligned, avail_values, label="avail", linewidth=1, alpha=0.7, color='blue')
        if len(avail_values) >= 10:
            window_size = min(50, len(avail_values) // 10)
            moving_avg = np.convolve(avail_values, np.ones(window_size) / window_size, mode='valid')
            ma_seconds = alsa_aligned[window_size - 1:]
            ax1.plot(ma_seconds, moving_avg, label=f"moving mean (window={window_size})",
                     linewidth=2, color='darkblue')
        ax1.set_ylabel("Available samples")
        ax1.set_title("ALSA Available Samples")
        ax1.legend()
        ax1.grid(True, alpha=0.3)

    # Plot 2: Perf metrics
    if perf_aligned:
        ax2.plot(perf_aligned, sample_means, label="sample mean", linewidth=1, alpha=0.8, color='green')
        ax2.plot(perf_aligned, write_means, label="write mean", linewidth=1, alpha=0.8, color='orange')
        ax2.plot(perf_aligned, loop_means, label="loop mean", linewidth=1, alpha=0.8, color='red')

        # Add moving average for loop mean
        if len(loop_means) >= 10:
            window_size = min(50, len(loop_means) // 10)
            moving_avg = np.convolve(loop_means, np.ones(window_size) / window_size, mode='valid')
            ma_seconds = perf_aligned[window_size - 1:]
            ax2.plot(ma_seconds, moving_avg, label=f"loop mean moving avg (window={window_size})",
                     linewidth=2, color='darkred', alpha=0.9)

        ax2.set_ylabel("Duration (ms)")
        ax2.set_title("Performance Metrics")
        ax2.legend()
        ax2.grid(True, alpha=0.3)

    # Plot 3: Latency
    if latency_aligned:
        ax3.plot(latency_aligned, latencies, label="latency", linewidth=1, color='purple')
        ax3.set_ylabel("Latency (ms)")
        ax3.set_title("Latency Buildup")
        ax3.legend()
        ax3.grid(True, alpha=0.3)

    # Plot 4: PyALSA avail before/after read
    if before_aligned and before_values:
        ax4.plot(before_aligned, before_values, label="avail before read", linewidth=1, alpha=0.7, color='cyan')
    if after_aligned and after_values:
        ax4.plot(after_aligned, after_values, label="avail after read", linewidth=1, alpha=0.7, color='magenta')
    ax4.set_xlabel("Time (s)")
    ax4.set_ylabel("Available samples")
    ax4.set_title("PyALSA Available Samples (Before/After Read)")
    if before_aligned or after_aligned:
        ax4.legend()
    ax4.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig(out_path, dpi=150, bbox_inches='tight')
    print(f"Combined plot saved to {out_path}")

    # Show interactive plot
    plt.show()


def main():
    if len(sys.argv) != 4:
        print(f"Usage: {sys.argv[0]} <alsa_status.log> <perf_log.log> <latency_results.yaml>", file=sys.stderr)
        sys.exit(1)

    alsa_file = sys.argv[1]
    perf_file = sys.argv[2]
    latency_file = sys.argv[3]

    for file_path in [alsa_file, perf_file, latency_file]:
        if not os.path.isfile(file_path):
            print(f"File not found: {file_path}", file=sys.stderr)
            sys.exit(1)

    # Determine output path (same directory as first file)
    log_dir = os.path.dirname(os.path.abspath(alsa_file))
    out_path = os.path.join(log_dir, "combined_audio_plot.png")

    plot_combined(alsa_file, perf_file, latency_file, out_path)


if __name__ == "__main__":
    main()
81 plot_perf_log.py Normal file
@@ -0,0 +1,81 @@
#!/usr/bin/env python3
"""Parse Perf lines from a log file and plot sample mean, write mean, and loop mean over time."""

import sys
import re
import os
from datetime import datetime

import matplotlib.pyplot as plt


PERF_RE = re.compile(
    r"^(\w+ \d+ \d+:\d+:\d+) .* Perf\(.*?\):"
    r".*?sample mean=([\d.]+)ms"
    r".*?write mean=([\d.]+)ms"
    r".*?loop mean=([\d.]+)ms"
)
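As a sanity check on the pattern, here is a minimal example against a hypothetical syslog-style Perf line (the exact log format is assumed from the regex, not taken from real logs):

```python
import re

PERF_RE = re.compile(
    r"^(\w+ \d+ \d+:\d+:\d+) .* Perf\(.*?\):"
    r".*?sample mean=([\d.]+)ms"
    r".*?write mean=([\d.]+)ms"
    r".*?loop mean=([\d.]+)ms"
)

# Hypothetical log line, constructed to exercise every capture group
line = "Jan 05 12:34:56 host beacon[123]: Perf(audio): sample mean=1.25ms write mean=0.80ms loop mean=2.10ms"
m = PERF_RE.search(line)
assert m is not None
ts_str, sample, write, loop = m.groups()
print(ts_str, float(sample), float(write), float(loop))
```

Note the timestamp carries no year, which is why `parse_log` parses it with `"%b %d %H:%M:%S"` and only relative seconds are plotted.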


def parse_log(log_path):
    timestamps = []
    sample_means = []
    write_means = []
    loop_means = []

    with open(log_path, "r") as f:
        for line in f:
            m = PERF_RE.search(line)
            if m:
                ts_str, sample, write, loop = m.groups()
                ts = datetime.strptime(ts_str, "%b %d %H:%M:%S")
                timestamps.append(ts)
                sample_means.append(float(sample))
                write_means.append(float(write))
                loop_means.append(float(loop))

    if not timestamps:
        print("No Perf lines found in the log file.", file=sys.stderr)
        sys.exit(1)

    t0 = timestamps[0]
    seconds = [(t - t0).total_seconds() for t in timestamps]
    return seconds, sample_means, write_means, loop_means


def plot(seconds, sample_means, write_means, loop_means, out_path):
    plt.figure(figsize=(12, 6))
    plt.plot(seconds, sample_means, label="sample mean (ms)")
    plt.plot(seconds, write_means, label="write mean (ms)")
    plt.plot(seconds, loop_means, label="loop mean (ms)")
    plt.xlabel("Time (s)")
    plt.ylabel("Duration (ms)")
    plt.title("Perf Metrics Over Time")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    print(f"Plot saved to {out_path}")


def main():
    if len(sys.argv) != 2:
        print(f"Usage: {sys.argv[0]} <path_to_log_file>", file=sys.stderr)
        sys.exit(1)

    log_path = sys.argv[1]
    if not os.path.isfile(log_path):
        print(f"File not found: {log_path}", file=sys.stderr)
        sys.exit(1)

    seconds, sample_means, write_means, loop_means = parse_log(log_path)

    log_dir = os.path.dirname(os.path.abspath(log_path))
    log_base = os.path.splitext(os.path.basename(log_path))[0]
    out_path = os.path.join(log_dir, f"{log_base}_perf_plot.png")

    plot(seconds, sample_means, write_means, loop_means, out_path)


if __name__ == "__main__":
    main()
@@ -235,11 +235,22 @@ def run_latency_test(config: Dict, num_measurements: int = 5, save_plots: bool =
        last_correlation = correlation
        last_lags = lags

    avg = float(np.mean(latencies))
    std_dev = float(np.std(latencies))
    latency_cfg = config.get('latency', {})
    max_std_dev_ms = latency_cfg.get('max_std_dev_ms', None)
    min_avg_ms = latency_cfg.get('min_avg_ms', None)
    valid = True
    if max_std_dev_ms is not None and std_dev > max_std_dev_ms:
        valid = False
    if min_avg_ms is not None and avg < min_avg_ms:
        valid = False
    latency_stats = {
        'avg': avg,
        'min': float(np.min(latencies)),
        'max': float(np.max(latencies)),
        'std': std_dev,
        'valid': valid
    }

    if save_plots and output_dir and last_recording is not None:
@@ -279,3 +290,512 @@ def plot_latency_test(channel_1: np.ndarray, channel_2: np.ndarray, correlation:
    plot_file = output_dir / 'latency_chirp_analysis.png'
    plt.savefig(plot_file, dpi=150, bbox_inches='tight')
    plt.close()


def detect_artifacts_spectral_anomaly(signal_data: np.ndarray, sample_rate: int,
                                      fundamental_freq: float, threshold_db: float = -60) -> List[Dict]:
    """Flag windows containing frequency peaks that are not harmonics of the fundamental."""
    artifacts = []
    window_size = int(sample_rate * 0.5)
    hop_size = int(sample_rate * 0.25)

    for i in range(0, len(signal_data) - window_size, hop_size):
        segment = signal_data[i:i+window_size]
        fft = np.fft.rfft(segment)
        freqs = np.fft.rfftfreq(len(segment), 1/sample_rate)
        power_spectrum_db = 20 * np.log10(np.abs(fft) + 1e-10)

        fundamental_idx = np.argmin(np.abs(freqs - fundamental_freq))
        fundamental_power_db = power_spectrum_db[fundamental_idx]

        expected_harmonics = set()
        harmonic_tolerance_bins = 3
        for n in range(1, 11):
            harmonic_freq = n * fundamental_freq
            if harmonic_freq < sample_rate / 2:
                harmonic_idx = np.argmin(np.abs(freqs - harmonic_freq))
                for offset in range(-harmonic_tolerance_bins, harmonic_tolerance_bins + 1):
                    if 0 <= harmonic_idx + offset < len(freqs):
                        expected_harmonics.add(harmonic_idx + offset)

        noise_floor_db = np.percentile(power_spectrum_db[10:], 10)

        unexpected_peaks = []
        for idx in range(10, len(power_spectrum_db)):
            if idx not in expected_harmonics:
                if power_spectrum_db[idx] > noise_floor_db + abs(threshold_db):
                    unexpected_peaks.append((freqs[idx], power_spectrum_db[idx]))

        if len(unexpected_peaks) >= 5:
            artifacts.append({
                'type': 'spectral_anomaly',
                'time_sec': i / sample_rate,
                'unexpected_frequencies': unexpected_peaks[:10],
                'count': len(unexpected_peaks)
            })

    return artifacts


def detect_artifacts_amplitude_spikes(signal_data: np.ndarray, sample_rate: int,
                                      threshold_factor: float = 3.0) -> List[Dict]:
    """Detect clicks/pops (spikes) and dropouts via a MAD-based threshold on the smoothed envelope."""
    artifacts = []

    skip_samples = int(sample_rate * 1.0)
    if len(signal_data) <= 2 * skip_samples:
        return artifacts

    envelope = np.abs(signal_data)

    window_size = int(sample_rate * 0.01)
    if window_size % 2 == 0:
        window_size += 1

    from scipy.ndimage import uniform_filter1d
    envelope_smooth = uniform_filter1d(envelope, size=window_size, mode='reflect')

    median_env = np.median(envelope_smooth)
    mad = np.median(np.abs(envelope_smooth - median_env))

    if mad == 0:
        return artifacts

    # 1.4826 scales MAD to the standard deviation of normally distributed data
    threshold_high = median_env + threshold_factor * mad * 1.4826
    threshold_low = median_env - threshold_factor * mad * 1.4826

    # Detect spikes (too high)
    spike_indices = np.where(envelope_smooth > threshold_high)[0]

    # Detect dropouts (too low)
    dropout_indices = np.where(envelope_smooth < threshold_low)[0]

    total_duration = len(signal_data) / sample_rate

    # Process spikes
    if len(spike_indices) > 0:
        groups = []
        current_group = [spike_indices[0]]

        for idx in spike_indices[1:]:
            if idx - current_group[-1] <= int(sample_rate * 0.05):
                current_group.append(idx)
            else:
                groups.append(current_group)
                current_group = [idx]
        groups.append(current_group)

        for group in groups:
            peak_idx = group[np.argmax(envelope_smooth[group])]
            time_sec = peak_idx / sample_rate
            peak_value = envelope_smooth[peak_idx]

            # Skip artifacts in first and last second
            if time_sec < 1.0 or time_sec > (total_duration - 1.0):
                continue

            artifacts.append({
                'type': 'amplitude_spike',
                'time_sec': float(time_sec),
                'peak_amplitude': float(peak_value),
                'median_amplitude': float(median_env),
                'deviation_factor': float((peak_value - median_env) / (mad * 1.4826)) if mad > 0 else 0
            })

    # Process dropouts
    if len(dropout_indices) > 0:
        groups = []
        current_group = [dropout_indices[0]]

        for idx in dropout_indices[1:]:
            if idx - current_group[-1] <= int(sample_rate * 0.05):
                current_group.append(idx)
            else:
                groups.append(current_group)
                current_group = [idx]
        groups.append(current_group)

        for group in groups:
            dropout_idx = group[np.argmin(envelope_smooth[group])]
            time_sec = dropout_idx / sample_rate
            dropout_value = envelope_smooth[dropout_idx]

            # Skip artifacts in first and last second
            if time_sec < 1.0 or time_sec > (total_duration - 1.0):
                continue

            artifacts.append({
                'type': 'amplitude_dropout',
                'time_sec': float(time_sec),
                'dropout_amplitude': float(dropout_value),
                'median_amplitude': float(median_env),
                'deviation_factor': float((median_env - dropout_value) / (mad * 1.4826)) if mad > 0 else 0
            })

    return artifacts
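The core of this detector is a robust threshold: median + k · MAD · 1.4826 on the rectified envelope. A self-contained sketch (parameter values illustrative, smoothing omitted for brevity) that injects a single click into a clean tone and recovers its position:

```python
import numpy as np

sr = 8000
t = np.arange(sr * 4) / sr                   # 4 s of a 1 kHz tone
sig = 0.5 * np.sin(2 * np.pi * 1000 * t)
sig[sr * 2] += 5.0                           # inject a single-sample click at t = 2 s

env = np.abs(sig)                            # rectified envelope (smoothing omitted for brevity)
median_env = np.median(env)
mad = np.median(np.abs(env - median_env))
threshold = median_env + 3.0 * mad * 1.4826  # 1.4826 scales MAD to sigma for Gaussian data

spike_samples = np.where(env > threshold)[0]
print(spike_samples / sr)                    # the click at 2.0 s is the only sample flagged
```

Because median and MAD are barely moved by a single outlier, the threshold stays well above the tone's normal peaks yet far below the click.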


def detect_artifacts_zero_crossing(signal_data: np.ndarray, sample_rate: int,
                                   threshold_factor: float = 2.0) -> List[Dict]:
    """Flag windows whose zero-crossing rate deviates strongly from the signal-wide median."""
    artifacts = []

    if len(signal_data) <= int(sample_rate * 2.0):
        return artifacts

    window_size = int(sample_rate * 0.1)
    hop_size = int(sample_rate * 0.05)

    zcr_values = []
    for i in range(0, len(signal_data) - window_size, hop_size):
        segment = signal_data[i:i+window_size]
        zero_crossings = np.sum(np.abs(np.diff(np.sign(segment)))) / 2
        zcr = zero_crossings / len(segment)
        zcr_values.append((i, zcr))

    if not zcr_values:
        return artifacts

    zcr_array = np.array([z[1] for z in zcr_values])
    median_zcr = np.median(zcr_array)
    std_zcr = np.std(zcr_array)

    total_duration = len(signal_data) / sample_rate

    for i, zcr in zcr_values:
        time_sec = i / sample_rate

        # Skip artifacts in first and last second
        if time_sec < 1.0 or time_sec > (total_duration - 1.0):
            continue

        if std_zcr > 0 and abs(zcr - median_zcr) > threshold_factor * std_zcr:
            artifacts.append({
                'type': 'zero_crossing_anomaly',
                'time_sec': float(time_sec),
                'zcr_value': float(zcr),
                'median_zcr': float(median_zcr),
                'deviation_factor': float(abs(zcr - median_zcr) / std_zcr)
            })

    return artifacts
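For a clean sinusoid the zero-crossing rate computed this way sits close to 2·f/sample_rate crossings per sample, which is what makes deviations a useful distortion cue. A quick self-contained check (values illustrative):

```python
import numpy as np

sr = 48000
f = 1000
sig = np.sin(2 * np.pi * f * np.arange(sr) / sr)   # 1 s of a clean tone

# Same ZCR formula as the detector: sign changes per sample
crossings = np.sum(np.abs(np.diff(np.sign(sig)))) / 2
zcr = crossings / len(sig)
print(zcr)    # close to 2 * f / sr ≈ 0.0417
```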


def detect_artifacts_energy_variation(signal_data: np.ndarray, sample_rate: int,
                                      threshold_db: float = 6.0) -> List[Dict]:
    """Flag sudden window-to-window energy jumps larger than threshold_db."""
    artifacts = []

    if len(signal_data) <= int(sample_rate * 2.0):
        return artifacts

    window_size = int(sample_rate * 0.1)
    hop_size = int(sample_rate * 0.05)

    energy_values = []
    for i in range(0, len(signal_data) - window_size, hop_size):
        segment = signal_data[i:i+window_size]
        energy = np.sum(segment**2)
        energy_values.append((i, energy))

    total_duration = len(signal_data) / sample_rate

    for idx in range(1, len(energy_values)):
        prev_energy = energy_values[idx-1][1]
        curr_energy = energy_values[idx][1]

        if prev_energy > 0 and curr_energy > 0:
            energy_change_db = 10 * np.log10(curr_energy / prev_energy)

            if abs(energy_change_db) > threshold_db:
                time_sec = energy_values[idx][0] / sample_rate

                # Skip artifacts in first and last second
                if time_sec < 1.0 or time_sec > (total_duration - 1.0):
                    continue

                artifacts.append({
                    'type': 'energy_variation',
                    'time_sec': float(time_sec),
                    'energy_change_db': float(energy_change_db),
                    'threshold_db': float(threshold_db)
                })

    return artifacts
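The detector compares consecutive 100 ms windows in dB; the 6 dB default corresponds to a 4x jump in energy (2x in amplitude). A self-contained sketch (window placement simplified to non-overlapping hops; values illustrative) showing the ±12 dB steps produced by a quartered-amplitude dropout:

```python
import numpy as np

sr = 8000
win = int(sr * 0.1)                          # 100 ms windows
tone = 0.5 * np.sin(2 * np.pi * 1000 * np.arange(sr) / sr)
tone[4000:4800] *= 0.25                      # quarter the amplitude for one window (1/16 energy)

energies = [np.sum(tone[i:i + win] ** 2) for i in range(0, len(tone) - win, win)]
changes_db = [10 * np.log10(b / a) for a, b in zip(energies, energies[1:])]
print(round(changes_db[4], 2), round(changes_db[5], 2))   # -12.04 and 12.04: into and out of the dropout
```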


def measure_frequency_accuracy(signal_data: np.ndarray, sample_rate: int,
                               expected_freq: float) -> Dict:
    """
    Measure the actual dominant frequency in the signal and compare to expected.
    Uses FFT on the full signal (skipping first and last second).
    """
    # Skip first and last second
    skip_samples = int(sample_rate * 1.0)
    if len(signal_data) <= 2 * skip_samples:
        return {
            'expected_freq_hz': float(expected_freq),
            'measured_freq_hz': 0.0,
            'error_hz': 0.0,
            'error_percent': 0.0
        }

    signal_trimmed = signal_data[skip_samples:-skip_samples]

    # Perform FFT
    fft = np.fft.rfft(signal_trimmed)
    freqs = np.fft.rfftfreq(len(signal_trimmed), 1/sample_rate)

    # Find the peak frequency
    magnitude = np.abs(fft)
    peak_idx = np.argmax(magnitude)
    measured_freq = freqs[peak_idx]

    # Calculate error
    error_hz = measured_freq - expected_freq
    error_percent = (error_hz / expected_freq) * 100.0 if expected_freq > 0 else 0.0

    return {
        'expected_freq_hz': float(expected_freq),
        'measured_freq_hz': float(measured_freq),
        'error_hz': float(error_hz),
        'error_percent': float(error_percent)
    }
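With an n-sample FFT the measured peak is quantized to bins of sample_rate/n Hz, so a roughly one-minute recording resolves to a few hundredths of a Hz. A minimal sketch of the same peak-picking on a synthetic tone (values illustrative):

```python
import numpy as np

sr = 48000
n = sr * 2                                   # 2 s of signal -> 0.5 Hz bins
sig = np.sin(2 * np.pi * 1000.0 * np.arange(n) / sr)

fft = np.fft.rfft(sig)
freqs = np.fft.rfftfreq(n, 1 / sr)
measured = freqs[np.argmax(np.abs(fft))]
print(measured)                              # 1000.0
```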


def detect_artifacts_combined(signal_data: np.ndarray, sample_rate: int, fundamental_freq: float,
                              detector_config: Dict) -> Dict:
    """Run every enabled detector and summarize the results along with frequency accuracy."""
    all_artifacts = []

    if detector_config.get('spectral_anomaly', {}).get('enabled', True):
        threshold = detector_config.get('spectral_anomaly', {}).get('threshold_db', -60)
        artifacts = detect_artifacts_spectral_anomaly(signal_data, sample_rate, fundamental_freq, threshold)
        all_artifacts.extend(artifacts)

    if detector_config.get('amplitude_spikes', {}).get('enabled', True):
        threshold = detector_config.get('amplitude_spikes', {}).get('threshold_factor', 3.0)
        artifacts = detect_artifacts_amplitude_spikes(signal_data, sample_rate, threshold)
        all_artifacts.extend(artifacts)

    if detector_config.get('zero_crossing', {}).get('enabled', True):
        threshold = detector_config.get('zero_crossing', {}).get('threshold_factor', 2.0)
        artifacts = detect_artifacts_zero_crossing(signal_data, sample_rate, threshold)
        all_artifacts.extend(artifacts)

    if detector_config.get('energy_variation', {}).get('enabled', True):
        threshold = detector_config.get('energy_variation', {}).get('threshold_db', 6.0)
        artifacts = detect_artifacts_energy_variation(signal_data, sample_rate, threshold)
        all_artifacts.extend(artifacts)

    # Measure frequency accuracy
    freq_accuracy = measure_frequency_accuracy(signal_data, sample_rate, fundamental_freq)

    artifact_summary = {
        'total_count': len(all_artifacts),
        'by_type': {},
        'artifacts': all_artifacts,
        'frequency_accuracy': freq_accuracy
    }

    for artifact in all_artifacts:
        artifact_type = artifact['type']
        if artifact_type not in artifact_summary['by_type']:
            artifact_summary['by_type'][artifact_type] = 0
        artifact_summary['by_type'][artifact_type] += 1

    return artifact_summary
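The tallying loop above is equivalent to a `collections.Counter` over the artifact types; a minimal sketch with made-up artifacts:

```python
from collections import Counter

artifacts = [
    {'type': 'amplitude_spike', 'time_sec': 2.1},
    {'type': 'amplitude_dropout', 'time_sec': 5.0},
    {'type': 'amplitude_spike', 'time_sec': 9.7},
]
by_type = dict(Counter(a['type'] for a in artifacts))
print(by_type)   # {'amplitude_spike': 2, 'amplitude_dropout': 1}
```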


def plot_individual_anomaly(signal_data: np.ndarray, artifact: Dict, artifact_idx: int,
                            channel_name: str, frequency: float, sample_rate: int,
                            output_dir: Path):
    """Plot ~20 signal periods centred on a single detected anomaly and save the figure."""
    periods_to_show = 20
    period_samples = int(sample_rate / frequency)
    total_samples = periods_to_show * period_samples

    artifact_time = artifact['time_sec']
    artifact_sample = int(artifact_time * sample_rate)

    start_sample = max(0, artifact_sample - total_samples // 2)
    end_sample = min(len(signal_data), artifact_sample + total_samples // 2)

    if end_sample - start_sample < total_samples:
        if start_sample == 0:
            end_sample = min(len(signal_data), start_sample + total_samples)
        else:
            start_sample = max(0, end_sample - total_samples)

    segment = signal_data[start_sample:end_sample]
    time_ms = (np.arange(len(segment)) + start_sample) / sample_rate * 1000

    fig, ax = plt.subplots(1, 1, figsize=(14, 6))

    ax.plot(time_ms, segment, linewidth=1.0, color='blue', alpha=0.8)
    ax.axvline(x=artifact_time * 1000, color='red', linestyle='--', linewidth=2,
               label=f'Anomaly at {artifact_time:.3f}s', alpha=0.7)

    ax.set_xlabel('Time (ms)', fontsize=11)
    ax.set_ylabel('Amplitude', fontsize=11)

    artifact_type = artifact['type'].replace('_', ' ').title()
    ax.set_title(f'{channel_name} - {artifact_type} #{artifact_idx+1} (~{periods_to_show} periods @ {frequency}Hz)',
                 fontsize=12, fontweight='bold')

    info_text = f"Type: {artifact_type}\nTime: {artifact_time:.3f}s"
    if 'deviation_factor' in artifact:
        info_text += f"\nDeviation: {artifact['deviation_factor']:.2f}σ"
    if 'energy_change_db' in artifact:
        info_text += f"\nEnergy Change: {artifact['energy_change_db']:.2f} dB"
    if 'count' in artifact and artifact['type'] == 'spectral_anomaly':
        info_text += f"\nUnexpected Peaks: {artifact['count']}"

    ax.text(0.02, 0.98, info_text, transform=ax.transAxes,
            fontsize=9, verticalalignment='top',
            bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

    ax.legend(loc='upper right')
    ax.grid(True, alpha=0.3)

    plt.tight_layout()

    safe_type = artifact['type'].replace('_', '-')
    plot_file = output_dir / f'{channel_name.lower().replace(" ", "_")}_anomaly_{artifact_idx+1:04d}_{safe_type}_{artifact_time:.3f}s.png'
    plt.savefig(plot_file, dpi=150, bbox_inches='tight')
    plt.close()


def plot_artifact_detection(channel_1: np.ndarray, channel_2: np.ndarray,
                            artifacts_ch1: Dict, artifacts_ch2: Dict,
                            frequency: float, sample_rate: int, output_dir: Path):
    """Plot both channels with artifact markers plus their spectra in a 2x2 summary figure."""
    fig, axes = plt.subplots(2, 2, figsize=(16, 10))

    time = np.arange(len(channel_1)) / sample_rate

    axes[0, 0].plot(time, channel_1, alpha=0.7, linewidth=0.5)
    axes[0, 0].set_xlabel('Time (s)')
    axes[0, 0].set_ylabel('Amplitude')
    axes[0, 0].set_title(f'Channel 1 (Loopback) - {artifacts_ch1["total_count"]} artifacts')
    axes[0, 0].grid(True, alpha=0.3)

    for artifact in artifacts_ch1['artifacts']:
        axes[0, 0].axvline(x=artifact['time_sec'], color='r', alpha=0.3, linewidth=0.5)

    axes[1, 0].plot(time, channel_2, alpha=0.7, linewidth=0.5)
    axes[1, 0].set_xlabel('Time (s)')
    axes[1, 0].set_ylabel('Amplitude')
    axes[1, 0].set_title(f'Channel 2 (DUT/Radio) - {artifacts_ch2["total_count"]} artifacts')
    axes[1, 0].grid(True, alpha=0.3)

    for artifact in artifacts_ch2['artifacts']:
        axes[1, 0].axvline(x=artifact['time_sec'], color='r', alpha=0.3, linewidth=0.5)

    fft_ch1 = np.fft.rfft(channel_1)
    fft_ch2 = np.fft.rfft(channel_2)
    freqs = np.fft.rfftfreq(len(channel_1), 1/sample_rate)

    axes[0, 1].plot(freqs, 20*np.log10(np.abs(fft_ch1) + 1e-10), linewidth=0.5)
    axes[0, 1].set_xlabel('Frequency (Hz)')
    axes[0, 1].set_ylabel('Magnitude (dB)')
    axes[0, 1].set_title('Channel 1 Spectrum')
    axes[0, 1].set_xlim(0, min(10000, sample_rate/2))
    axes[0, 1].grid(True, alpha=0.3)

    axes[1, 1].plot(freqs, 20*np.log10(np.abs(fft_ch2) + 1e-10), linewidth=0.5)
    axes[1, 1].set_xlabel('Frequency (Hz)')
    axes[1, 1].set_ylabel('Magnitude (dB)')
    axes[1, 1].set_title('Channel 2 Spectrum')
    axes[1, 1].set_xlim(0, min(10000, sample_rate/2))
    axes[1, 1].grid(True, alpha=0.3)

    plt.tight_layout()
    plot_file = output_dir / f'artifact_detection_{frequency}Hz.png'
    plt.savefig(plot_file, dpi=150, bbox_inches='tight')
    plt.close()


def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_dir: Path = None) -> Dict:
    """Play the configured test signal, record both channels, and run all enabled detectors."""
    import time

    sample_rate = config['audio']['sample_rate']
    duration = config['artifact_detection']['duration']
    frequency = config['artifact_detection']['test_frequency']
    amplitude = config['artifact_detection']['amplitude']
    device_name = config['audio']['device_name']
    channels = config['audio']['channels']
    detector_config = config['artifact_detection']['detectors']
    startup_delay = config['artifact_detection'].get('startup_delay', 10)
    signal_type = config['artifact_detection'].get('signal_type', 'sine')

    device_ids = find_audio_device(device_name)

    if startup_delay > 0:
        print(f"Waiting {startup_delay} seconds for system to settle...")
        time.sleep(startup_delay)
        print("Starting recording...")

    if signal_type == 'chirp':
        f0 = config['artifact_detection'].get('chirp_f0', 100)
        f1 = config['artifact_detection'].get('chirp_f1', 8000)
        tone = generate_chirp(duration, sample_rate, f0=f0, f1=f1, amplitude=amplitude)
        frequency = (f0 + f1) / 2
        recording = play_and_record(tone, sample_rate, device_ids, channels)
    elif signal_type == 'silent':
        frequency = 1000
        recording = sd.rec(int(duration * sample_rate), samplerate=sample_rate,
                           channels=channels, device=device_ids[0], blocking=True)
    else:
        tone = generate_test_tone(frequency, duration, sample_rate, amplitude)
        recording = play_and_record(tone, sample_rate, device_ids, channels)

    channel_1 = recording[:, 0]
    channel_2 = recording[:, 1]

    artifacts_ch1 = detect_artifacts_combined(channel_1, sample_rate, frequency, detector_config)
    artifacts_ch2 = detect_artifacts_combined(channel_2, sample_rate, frequency, detector_config)

    if save_plots and output_dir:
        plot_artifact_detection(channel_1, channel_2, artifacts_ch1, artifacts_ch2,
                                frequency, sample_rate, output_dir)

        anomalies_dir = output_dir / 'individual_anomalies'
        anomalies_dir.mkdir(exist_ok=True)

        print(f"\nPlotting individual anomalies to: {anomalies_dir}")

        for idx, artifact in enumerate(artifacts_ch1['artifacts']):
            plot_individual_anomaly(channel_1, artifact, idx, 'Channel 1 Loopback',
                                    frequency, sample_rate, anomalies_dir)

        for idx, artifact in enumerate(artifacts_ch2['artifacts']):
            plot_individual_anomaly(channel_2, artifact, idx, 'Channel 2 DUT',
                                    frequency, sample_rate, anomalies_dir)

        total_anomaly_plots = len(artifacts_ch1['artifacts']) + len(artifacts_ch2['artifacts'])
        if total_anomaly_plots > 0:
            print(f"✓ Generated {total_anomaly_plots} individual anomaly plots")

    result = {
        'signal_type': signal_type,
        'duration_sec': float(duration),
        'channel_1_loopback': {
            'total_artifacts': artifacts_ch1['total_count'],
            'artifacts_by_type': artifacts_ch1['by_type'],
            'artifact_rate_per_minute': float(artifacts_ch1['total_count'] / duration * 60),
            'frequency_accuracy': artifacts_ch1['frequency_accuracy']
        },
        'channel_2_dut': {
            'total_artifacts': artifacts_ch2['total_count'],
            'artifacts_by_type': artifacts_ch2['by_type'],
            'artifact_rate_per_minute': float(artifacts_ch2['total_count'] / duration * 60),
            'frequency_accuracy': artifacts_ch2['frequency_accuracy']
        },
        'detector_config': detector_config
    }

    if signal_type == 'chirp':
        f0 = config['artifact_detection'].get('chirp_f0', 100)
        f1 = config['artifact_detection'].get('chirp_f1', 8000)
        result['chirp_f0_hz'] = int(f0)
        result['chirp_f1_hz'] = int(f1)
    elif signal_type == 'silent':
        result['note'] = 'Silent mode - no playback, noise floor measurement'
    else:
        result['test_frequency_hz'] = int(frequency)

    return result
182 test_artifact_detection.py Executable file
@@ -0,0 +1,182 @@
#!/usr/bin/env python3
import argparse
import yaml
from datetime import datetime
from pathlib import Path
import sys

sys.path.insert(0, str(Path(__file__).parent))
from src.audio_tests import run_artifact_detection_test


def main():
    parser = argparse.ArgumentParser(description='Run artifact detection test on audio loopback and radio path')
    parser.add_argument('--serial-number', required=True, help='Serial number (e.g., SN001234)')
    parser.add_argument('--software-version', required=True, help='Software version (git commit hash)')
    parser.add_argument('--comment', default='', help='Comments about this test')
    parser.add_argument('--config', default='config.yaml', help='Path to config file')
    parser.add_argument('--duration', type=float, help='Override recording duration in seconds (default from config)')
    parser.add_argument('--frequency', type=float, help='Override test frequency in Hz (default from config)')
    parser.add_argument('--signal-type', choices=['sine', 'chirp', 'silent'], default='sine',
                        help='Signal type: sine (single frequency), chirp (frequency sweep), or silent (no signal)')

    args = parser.parse_args()

    with open(args.config, 'r') as f:
        config = yaml.safe_load(f)

    if args.duration:
        config['artifact_detection']['duration'] = args.duration
    if args.frequency:
        config['artifact_detection']['test_frequency'] = args.frequency

    config['artifact_detection']['signal_type'] = args.signal_type

    timestamp = datetime.now()
    test_id = timestamp.strftime('%Y%m%d_%H%M%S')

    results_dir = Path(config['output']['results_dir'])

    test_output_dir = results_dir / timestamp.strftime('%Y') / timestamp.strftime('%m') / timestamp.strftime('%d') / f"{test_id}_artifact_detection"
    test_output_dir.mkdir(parents=True, exist_ok=True)

    save_plots = config['output'].get('save_plots', False)

    print("=" * 70)
    print("ARTIFACT DETECTION TEST")
    print("=" * 70)
    print(f"Test ID: {test_id}")
    print(f"Serial Number: {args.serial_number}")
    print(f"Software: {args.software_version}")
    if args.comment:
        print(f"Comment: {args.comment}")
    print(f"Duration: {config['artifact_detection']['duration']} seconds")
    signal_type = config['artifact_detection'].get('signal_type', 'sine')
    if signal_type == 'sine':
        print(f"Signal Type: Sine wave @ {config['artifact_detection']['test_frequency']} Hz")
    elif signal_type == 'chirp':
        print("Signal Type: Chirp (100 Hz - 8000 Hz)")
    else:
        print("Signal Type: Silent (no playback - noise floor measurement)")
    if save_plots:
        print(f"Plots will be saved to: {test_output_dir}")
    print("-" * 70)

    print("\nDetection Algorithms:")
    for detector_name, detector_settings in config['artifact_detection']['detectors'].items():
        status = "ENABLED" if detector_settings.get('enabled', False) else "DISABLED"
        print(f"  - {detector_name}: {status}")
        if detector_settings.get('enabled', False):
            for param, value in detector_settings.items():
                if param != 'enabled':
                    print(f"      {param}: {value}")

    print("\n" + "=" * 70)
    signal_type = config['artifact_detection'].get('signal_type', 'sine')
    if signal_type == 'sine':
        freq = config['artifact_detection']['test_frequency']
        print(f"STARTING TEST - Playing {freq}Hz sine wave and recording both channels...")
    elif signal_type == 'chirp':
        print("STARTING TEST - Playing chirp signal (100-8000Hz) and recording both channels...")
    else:
        print("STARTING TEST - Recording silence (no playback)...")
    print("=" * 70)
    print("\nChannel 1: Loopback path (direct audio interface loopback)")
    print("Channel 2: DUT/Radio path (through beacon and radio transmission)")
    print()

    try:
        result = run_artifact_detection_test(config, save_plots=save_plots, output_dir=test_output_dir)

        print("\n" + "=" * 70)
        print("TEST COMPLETE - RESULTS")
        print("=" * 70)

        signal_type = result.get('signal_type', 'sine')
        if signal_type == 'chirp':
            print(f"\n📊 Signal: Chirp {result['chirp_f0_hz']} Hz → {result['chirp_f1_hz']} Hz")
        elif signal_type == 'silent':
            print("\n📊 Signal: Silent (no playback - noise floor measurement)")
        else:
            print(f"\n📊 Test Frequency: {result['test_frequency_hz']} Hz")
        print(f"⏱️  Duration: {result['duration_sec']} seconds")

        print("\n🔊 CHANNEL 1 (LOOPBACK PATH):")
        print(f"   Total Artifacts: {result['channel_1_loopback']['total_artifacts']}")
        print(f"   Artifact Rate: {result['channel_1_loopback']['artifact_rate_per_minute']:.2f} per minute")
        if result['channel_1_loopback']['artifacts_by_type']:
            print("   By Type:")
            for artifact_type, count in result['channel_1_loopback']['artifacts_by_type'].items():
                print(f"     - {artifact_type}: {count}")

        # Display frequency accuracy for channel 1
        if 'frequency_accuracy' in result['channel_1_loopback']:
            freq_acc = result['channel_1_loopback']['frequency_accuracy']
            print("   Frequency Accuracy:")
            print(f"     Expected: {freq_acc['expected_freq_hz']:.1f} Hz")
            print(f"     Measured: {freq_acc['measured_freq_hz']:.2f} Hz")
            print(f"     Error: {freq_acc['error_hz']:+.2f} Hz ({freq_acc['error_percent']:+.3f}%)")

        print("\n📻 CHANNEL 2 (DUT/RADIO PATH):")
        print(f"   Total Artifacts: {result['channel_2_dut']['total_artifacts']}")
        print(f"   Artifact Rate: {result['channel_2_dut']['artifact_rate_per_minute']:.2f} per minute")
        if result['channel_2_dut']['artifacts_by_type']:
            print("   By Type:")
            for artifact_type, count in result['channel_2_dut']['artifacts_by_type'].items():
                print(f"     - {artifact_type}: {count}")

        # Display frequency accuracy for channel 2
        if 'frequency_accuracy' in result['channel_2_dut']:
            freq_acc = result['channel_2_dut']['frequency_accuracy']
            print("   Frequency Accuracy:")
            print(f"     Expected: {freq_acc['expected_freq_hz']:.1f} Hz")
            print(f"     Measured: {freq_acc['measured_freq_hz']:.2f} Hz")
            print(f"     Error: {freq_acc['error_hz']:+.2f} Hz ({freq_acc['error_percent']:+.3f}%)")

        ch1_count = result['channel_1_loopback']['total_artifacts']
        ch2_count = result['channel_2_dut']['total_artifacts']

        if ch2_count > ch1_count:
            delta = ch2_count - ch1_count
            print(f"\n⚠️  DEGRADATION DETECTED: {delta} more artifacts in radio path vs loopback")
        elif ch1_count == ch2_count == 0:
            print("\n✅ EXCELLENT: No artifacts detected in either path!")
        else:
            print(f"\nℹ️  Loopback baseline: {ch1_count} artifacts")

    except Exception as e:
        print(f"\n❌ ERROR: {e}")
        import traceback
        traceback.print_exc()
        result = {
            'error': str(e),
            'test_frequency_hz': config['artifact_detection']['test_frequency'],
            'duration_sec': config['artifact_detection']['duration']
        }

    output_data = {
        'metadata': {
            'test_id': test_id,
            'timestamp': timestamp.isoformat(),
            'serial_number': args.serial_number,
            'software_version': args.software_version,
            'comment': args.comment
        },
        'artifact_detection_result': result
    }

    output_file = test_output_dir / f"{test_id}_artifact_detection_results.yaml"
    with open(output_file, 'w') as f:
        yaml.dump(output_data, f, default_flow_style=False, sort_keys=False)

    print("\n" + "=" * 70)
    print("✅ Results saved to:")
    print(f"   YAML: {output_file}")
    if save_plots:
        print(f"   Summary plots: {test_output_dir}/")
        print(f"   Individual anomaly plots: {test_output_dir}/individual_anomalies/")
    print("=" * 70)


if __name__ == '__main__':
    main()
||||
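For reference, the results YAML written above has the shape sketched below. This is a minimal round-trip sketch with placeholder values (the real `test_id`, timestamp, and result payload come from an actual run); key names are taken from the `output_data` dict in the script.

```python
import yaml

# Placeholder values standing in for a real run (hypothetical data);
# the key layout mirrors the output_data dict written by the script.
output_data = {
    'metadata': {
        'test_id': '20240101_120000',
        'timestamp': '2024-01-01T12:00:00',
        'serial_number': 'SN001234',
        'software_version': 'abc123',
        'comment': 'Testing new firmware'
    },
    'artifact_detection_result': {
        'test_frequency_hz': 1000,
        'duration_sec': 60
    }
}

# Same dump settings as the script: block style, insertion order preserved.
text = yaml.dump(output_data, default_flow_style=False, sort_keys=False)
print(text)
parsed = yaml.safe_load(text)  # round-trips back to the same dict
```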
@@ -6,16 +6,16 @@ from pathlib import Path
 import sys

 sys.path.insert(0, str(Path(__file__).parent))
-from src.audio_tests import run_single_test, run_latency_test
+from src.audio_tests import run_latency_test


 def main():
-    parser = argparse.ArgumentParser(description='Run PCB hardware audio tests')
-    parser.add_argument('--pcb-version', required=True, help='PCB version (e.g., v2.1)')
-    parser.add_argument('--pcb-revision', required=True, help='PCB revision (e.g., A, B, C)')
+    parser = argparse.ArgumentParser(description='Run latency test on audio loopback and radio path')
     parser.add_argument('--serial-number', required=True, help='Serial number (e.g., SN001234)')
     parser.add_argument('--software-version', required=True, help='Software version (git commit hash)')
-    parser.add_argument('--notes', default='', help='Adjustments or comments about this test')
+    parser.add_argument('--comment', default='', help='Comments about this test')
     parser.add_argument('--config', default='config.yaml', help='Path to config file')
+    parser.add_argument('--measurements', type=int, default=5, help='Number of latency measurements (default: 5)')

     args = parser.parse_args()
@@ -26,61 +26,47 @@ def main():
     test_id = timestamp.strftime('%Y%m%d_%H%M%S')

     results_dir = Path(config['output']['results_dir'])
     results_dir.mkdir(exist_ok=True)

-    test_output_dir = results_dir / test_id
-    test_output_dir.mkdir(exist_ok=True)
+    test_output_dir = results_dir / timestamp.strftime('%Y') / timestamp.strftime('%m') / timestamp.strftime('%d') / f"{test_id}_latency"
+    test_output_dir.mkdir(parents=True, exist_ok=True)

     save_plots = config['output'].get('save_plots', False)

-    print(f"Starting audio test run: {test_id}")
-    print(f"PCB: {args.pcb_version} Rev {args.pcb_revision}")
+    print(f"Starting latency test: {test_id}")
     print(f"Serial Number: {args.serial_number}")
     print(f"Software: {args.software_version}")
+    if args.comment:
+        print(f"Comment: {args.comment}")
+    print(f"Measurements: {args.measurements}")
     if save_plots:
         print(f"Plots will be saved to: {test_output_dir}")
     print("-" * 60)

-    print("\n[1/2] Running chirp-based latency test (5 measurements)...")
+    print(f"\nRunning chirp-based latency test ({args.measurements} measurements)...")
     try:
-        latency_stats = run_latency_test(config, num_measurements=5,
+        latency_stats = run_latency_test(config, num_measurements=args.measurements,
                                          save_plots=save_plots, output_dir=test_output_dir)
-        print(f"✓ Latency: avg={latency_stats['avg']:.3f}ms, "
-              f"min={latency_stats['min']:.3f}ms, max={latency_stats['max']:.3f}ms")
+        valid = latency_stats.get('valid', True)
+        status = "PASS" if valid else "FAIL"
+        print(f"{'✓' if valid else '✗'} Latency [{status}]: avg={latency_stats['avg']:.3f}ms, "
+              f"min={latency_stats['min']:.3f}ms, max={latency_stats['max']:.3f}ms, "
+              f"std={latency_stats['std']:.3f}ms")
     except Exception as e:
         print(f"✗ Error: {e}")
         latency_stats = {'avg': 0.0, 'min': 0.0, 'max': 0.0, 'std': 0.0, 'error': str(e)}

-    print("\n[2/2] Running frequency sweep tests...")
-    test_results = []
-    frequencies = config['test_tones']['frequencies']
-
-    for i, freq in enumerate(frequencies, 1):
-        print(f"Testing frequency {i}/{len(frequencies)}: {freq} Hz...", end=' ', flush=True)
-        try:
-            result = run_single_test(freq, config, save_plots=save_plots, output_dir=test_output_dir)
-            test_results.append(result)
-            print("✓")
-        except Exception as e:
-            print(f"✗ Error: {e}")
-            test_results.append({
-                'frequency_hz': freq,
-                'error': str(e)
-            })
-
     output_data = {
         'metadata': {
             'test_id': test_id,
             'timestamp': timestamp.isoformat(),
-            'pcb_version': args.pcb_version,
-            'pcb_revision': args.pcb_revision,
             'serial_number': args.serial_number,
             'software_version': args.software_version,
-            'notes': args.notes
+            'comment': args.comment
         },
-        'latency_test': latency_stats,
-        'test_results': test_results
+        'latency_test': latency_stats
     }

-    output_file = results_dir / f"{test_id}_results.yaml"
+    output_file = test_output_dir / f"{test_id}_latency_results.yaml"
     with open(output_file, 'w') as f:
         yaml.dump(output_data, f, default_flow_style=False, sort_keys=False)
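The reworked CLI above can be exercised without any audio hardware. A minimal sketch that rebuilds only the argument surface (argument names and defaults copied from the parser definition in this diff; the `help` strings are omitted) and parses a sample invocation:

```python
import argparse

# Mirrors the updated latency-test parser from the diff above.
parser = argparse.ArgumentParser(description='Run latency test on audio loopback and radio path')
parser.add_argument('--serial-number', required=True)
parser.add_argument('--software-version', required=True)
parser.add_argument('--comment', default='')
parser.add_argument('--config', default='config.yaml')
parser.add_argument('--measurements', type=int, default=5)

# Parse a sample invocation; hyphens become underscores on the namespace.
args = parser.parse_args(['--serial-number', 'SN001234', '--software-version', 'abc123'])
print(args.measurements)   # default number of latency measurements
print(args.serial_number)
```

Because `--measurements` carries `type=int` and a default, omitting it on the command line still yields an integer, which is what `run_latency_test(config, num_measurements=args.measurements, ...)` expects.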
test_latency_buildup.py (new executable file, 361 lines)
@@ -0,0 +1,361 @@
#!/usr/bin/env python3
import argparse
import yaml
import time
import signal
import sys
from datetime import datetime
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt

sys.path.insert(0, str(Path(__file__).parent))
from src.audio_tests import run_latency_test, find_audio_device, generate_chirp, play_and_record, calculate_latency


class LatencyBuildupTest:
    def __init__(self, config, measurement_interval=30, max_duration=None):
        self.config = config
        self.measurement_interval = measurement_interval
        self.max_duration = max_duration
        self.running = False
        self.measurements = []
        self.start_time = None

    def signal_handler(self, signum, frame):
        print(f"\n\n{'='*70}")
        print("TEST STOPPED - Generating final results...")
        print(f"{'='*70}")
        self.running = False

    def run_single_latency_measurement(self):
        """Run a single latency measurement and return the result"""
        try:
            # Use existing latency test function with 1 measurement for speed
            latency_stats = run_latency_test(self.config, num_measurements=1,
                                             save_plots=False, output_dir=None)
            return latency_stats['avg']
        except Exception as e:
            print(f"❌ Error in latency measurement: {e}")
            return None
    def analyze_buildup(self, latencies, timestamps):
        """Analyze latency build-up and return analysis results"""
        if len(latencies) < 2:
            return {
                'buildup_detected': False,
                'start_latency': 0,
                'end_latency': 0,
                'change_ms': 0,
                'change_percent': 0,
                'trend': 'insufficient_data'
            }

        start_latency = latencies[0]
        end_latency = latencies[-1]
        change_ms = end_latency - start_latency
        change_percent = (change_ms / start_latency) * 100 if start_latency > 0 else 0

        # Determine if buildup occurred (±5% threshold)
        buildup_detected = abs(change_percent) > 5.0

        # Calculate trend using linear regression
        if len(latencies) >= 3:
            x = np.arange(len(latencies))
            y = np.array(latencies)
            slope = np.polyfit(x, y, 1)[0]

            if slope > 0.01:  # Positive trend
                trend = 'increasing'
            elif slope < -0.01:  # Negative trend
                trend = 'decreasing'
            else:
                trend = 'stable'
        else:
            trend = 'insufficient_data'

        return {
            'buildup_detected': buildup_detected,
            'start_latency': start_latency,
            'end_latency': end_latency,
            'change_ms': change_ms,
            'change_percent': change_percent,
            'trend': trend
        }

    def plot_latency_buildup(self, timestamps, latencies, output_dir):
        """Create and save latency over time plot"""
        fig, ax = plt.subplots(1, 1, figsize=(12, 6))

        # Convert timestamps to relative time in seconds
        relative_times = [(t - timestamps[0]).total_seconds() for t in timestamps]

        # Plot latency measurements
        ax.plot(relative_times, latencies, 'b-o', markersize=4, linewidth=2, label='Latency Measurements')

        # Add trend line if we have enough data
        if len(latencies) >= 3:
            x = np.array(relative_times)
            y = np.array(latencies)
            z = np.polyfit(x, y, 1)
            p = np.poly1d(z)
            ax.plot(x, p(x), "r--", alpha=0.8, linewidth=2, label=f'Trend: {z[0]:.4f} ms/s')

        # Add reference line for start latency
        ax.axhline(y=latencies[0], color='g', linestyle=':', alpha=0.7,
                   label=f'Start: {latencies[0]:.3f} ms')

        ax.set_xlabel('Time (seconds)', fontsize=12)
        ax.set_ylabel('Latency (ms)', fontsize=12)
        ax.set_title('Latency Build-up Over Time', fontsize=14, fontweight='bold')
        ax.grid(True, alpha=0.3)
        ax.legend()

        # Format y-axis to show reasonable precision
        ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f'{x:.3f}'))

        plt.tight_layout()
        plot_file = output_dir / 'latency_buildup_graph.png'
        plt.savefig(plot_file, dpi=150, bbox_inches='tight')
        plt.close()

        return plot_file
    def run_test(self):
        """Run the latency build-up test"""
        self.running = True
        self.start_time = datetime.now()

        print(f"Starting latency build-up test at {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"Measurement interval: {self.measurement_interval} seconds")
        if self.max_duration:
            print(f"Maximum duration: {self.max_duration} seconds")
        print("Press Ctrl+C to stop the test early")
        print("=" * 70)

        # Set up signal handlers for graceful shutdown
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

        measurement_count = 0

        while self.running:
            current_time = datetime.now()
            measurement_count += 1

            print(f"\n[{current_time.strftime('%H:%M:%S')}] Measurement #{measurement_count}")

            # Perform latency measurement
            latency = self.run_single_latency_measurement()

            if latency is not None:
                self.measurements.append((current_time, latency))
                timestamps, latencies = zip(*self.measurements)

                # Calculate current statistics
                avg_latency = np.mean(latencies)
                min_latency = np.min(latencies)
                max_latency = np.max(latencies)
                std_latency = np.std(latencies)

                print(f"  Current latency: {latency:.3f} ms")
                print(f"  Average so far: {avg_latency:.3f} ms")
                print(f"  Range: {min_latency:.3f} - {max_latency:.3f} ms")
                print(f"  Std deviation: {std_latency:.3f} ms")

                # Analyze for buildup
                analysis = self.analyze_buildup(latencies, timestamps)
                if analysis['buildup_detected']:
                    print(f"  ⚠️ BUILDUP DETECTED: {analysis['change_percent']:+.2f}% "
                          f"({analysis['change_ms']:+.3f} ms)")
                else:
                    print(f"  ✅ No significant buildup: {analysis['change_percent']:+.2f}%")

                print(f"  Trend: {analysis['trend']}")
            else:
                print("  ❌ Measurement failed")

            # Check if we should continue
            if self.max_duration:
                elapsed = (current_time - self.start_time).total_seconds()
                if elapsed >= self.max_duration:
                    print(f"\nMaximum duration of {self.max_duration} seconds reached")
                    break

            if not self.running:
                break

            # Wait for next measurement with interruptible sleep
            if self.running:
                print(f"  Waiting {self.measurement_interval} seconds...")
                # Sleep in smaller chunks to allow quick interruption
                sleep_chunk = 1.0  # Check every second
                time_slept = 0
                while self.running and time_slept < self.measurement_interval:
                    time.sleep(min(sleep_chunk, self.measurement_interval - time_slept))
                    time_slept += sleep_chunk

        return self.generate_results()
    def generate_results(self):
        """Generate final results and analysis"""
        if not self.measurements:
            return {'error': 'No measurements completed'}

        timestamps, latencies = zip(*self.measurements)
        end_time = datetime.now()
        total_duration = (end_time - self.start_time).total_seconds()

        # Final analysis
        analysis = self.analyze_buildup(latencies, timestamps)

        # Statistics
        stats = {
            'count': len(latencies),
            'avg_ms': float(np.mean(latencies)),
            'min_ms': float(np.min(latencies)),
            'max_ms': float(np.max(latencies)),
            'std_ms': float(np.std(latencies)),
            'range_ms': float(np.max(latencies) - np.min(latencies))
        }

        results = {
            'test_metadata': {
                'start_time': self.start_time.isoformat(),
                'end_time': end_time.isoformat(),
                'total_duration_sec': total_duration,
                'measurement_interval_sec': self.measurement_interval,
                'total_measurements': len(latencies)
            },
            'latency_measurements': [
                {
                    'timestamp': t.isoformat(),
                    'latency_ms': float(l)
                }
                for t, l in self.measurements
            ],
            'statistics': stats,
            'buildup_analysis': analysis
        }

        return results
def main():
    parser = argparse.ArgumentParser(description='Run latency build-up test over time')
    parser.add_argument('--serial-number', required=True, help='Serial number (e.g., SN001234)')
    parser.add_argument('--software-version', required=True, help='Software version (git commit hash)')
    parser.add_argument('--comment', default='', help='Comments about this test')
    parser.add_argument('--config', default='config.yaml', help='Path to config file')
    parser.add_argument('--interval', type=int, help='Measurement interval in seconds (default from config)')
    parser.add_argument('--duration', type=int, help='Maximum test duration in seconds (default: run until canceled)')

    args = parser.parse_args()

    with open(args.config, 'r') as f:
        config = yaml.safe_load(f)

    # Use config values as defaults if not overridden on the command line
    measurement_interval = args.interval if args.interval else config['latency_buildup']['measurement_interval']
    max_duration = args.duration if args.duration else config['latency_buildup'].get('max_duration')

    timestamp = datetime.now()
    test_id = timestamp.strftime('%Y%m%d_%H%M%S')

    results_dir = Path(config['output']['results_dir'])

    test_output_dir = results_dir / timestamp.strftime('%Y') / timestamp.strftime('%m') / timestamp.strftime('%d') / f"{test_id}_latency_buildup"
    test_output_dir.mkdir(parents=True, exist_ok=True)

    save_plots = config['output'].get('save_plots', False)

    print("=" * 70)
    print("LATENCY BUILD-UP TEST")
    print("=" * 70)
    print(f"Test ID: {test_id}")
    print(f"Serial Number: {args.serial_number}")
    print(f"Software: {args.software_version}")
    if args.comment:
        print(f"Comment: {args.comment}")
    print(f"Measurement Interval: {measurement_interval} seconds")
    if max_duration:
        print(f"Maximum Duration: {max_duration} seconds")
    else:
        print("Duration: Run until canceled (Ctrl+C)")
    if save_plots:
        print(f"Plots will be saved to: {test_output_dir}")
    print("-" * 70)

    # Create and run test
    test = LatencyBuildupTest(config, measurement_interval=measurement_interval, max_duration=max_duration)
    results = test.run_test()

    # Display final results
    print("\n" + "=" * 70)
    print("TEST COMPLETE - FINAL RESULTS")
    print("=" * 70)

    if 'error' in results:
        print(f"❌ Test failed: {results['error']}")
    else:
        metadata = results['test_metadata']
        stats = results['statistics']
        analysis = results['buildup_analysis']

        print(f"\n📊 Test Summary:")
        print(f"  Duration: {metadata['total_duration_sec']:.1f} seconds")
        print(f"  Measurements: {metadata['total_measurements']}")
        print(f"  Interval: {metadata['measurement_interval_sec']} seconds")

        print(f"\n⏱️ Latency Statistics:")
        print(f"  Average: {stats['avg_ms']:.3f} ms")
        print(f"  Range: {stats['min_ms']:.3f} - {stats['max_ms']:.3f} ms")
        print(f"  Std Dev: {stats['std_ms']:.3f} ms")

        print(f"\n📈 Build-up Analysis:")
        print(f"  Start Latency: {analysis['start_latency']:.3f} ms")
        print(f"  End Latency: {analysis['end_latency']:.3f} ms")
        print(f"  Change: {analysis['change_ms']:+.3f} ms ({analysis['change_percent']:+.2f}%)")
        print(f"  Trend: {analysis['trend']}")

        if analysis['buildup_detected']:
            print(f"\n⚠️ BUILDUP DETECTED!")
            print(f"  Latency changed by {abs(analysis['change_percent']):.2f}% (threshold: ±5%)")
        else:
            print(f"\n✅ No significant buildup detected")
            print(f"  Latency change within acceptable range (±5%)")

        # Generate and save plot
        if save_plots and len(results['latency_measurements']) > 1:
            timestamps = [datetime.fromisoformat(m['timestamp']) for m in results['latency_measurements']]
            latencies = [m['latency_ms'] for m in results['latency_measurements']]

            plot_file = test.plot_latency_buildup(timestamps, latencies, test_output_dir)
            print(f"\n📊 Latency graph saved to: {plot_file}")

    # Save results to file
    output_data = {
        'metadata': {
            'test_id': test_id,
            'timestamp': timestamp.isoformat(),
            'serial_number': args.serial_number,
            'software_version': args.software_version,
            'comment': args.comment
        },
        'latency_buildup_result': results
    }

    output_file = test_output_dir / f"{test_id}_latency_buildup_results.yaml"
    with open(output_file, 'w') as f:
        yaml.dump(output_data, f, default_flow_style=False, sort_keys=False)

    print("\n" + "=" * 70)
    print("✅ Results saved to:")
    print(f"  YAML: {output_file}")
    if save_plots and len(results.get('latency_measurements', [])) > 1:
        print(f"  Graph: {test_output_dir}/latency_buildup_graph.png")
    print("=" * 70)


if __name__ == '__main__':
    main()
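The trend classification inside `analyze_buildup` can be exercised in isolation. A minimal sketch, with the `±0.01` slope threshold and the trend labels copied from the method above (the `classify_trend` helper name is introduced here for illustration only):

```python
import numpy as np

def classify_trend(latencies, slope_eps=0.01):
    # Mirrors analyze_buildup: fit a line over the measurement index,
    # then bucket the slope into increasing / decreasing / stable.
    if len(latencies) < 3:
        return 'insufficient_data'
    slope = np.polyfit(np.arange(len(latencies)), np.array(latencies), 1)[0]
    if slope > slope_eps:
        return 'increasing'
    elif slope < -slope_eps:
        return 'decreasing'
    return 'stable'

print(classify_trend([10.0, 10.2, 10.4, 10.6]))  # slope +0.2 ms/measurement -> 'increasing'
print(classify_trend([10.0, 10.0, 10.0, 10.0]))  # flat -> 'stable'
```

Note that the slope here is per measurement index, not per second; that matches `analyze_buildup`, while the plot's trend line fits against elapsed seconds instead.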
@@ -88,7 +88,7 @@ def display_results(yaml_file: Path):


 def list_all_results(results_dir: Path):
-    yaml_files = sorted(results_dir.glob("*_results.yaml"))
+    yaml_files = sorted(results_dir.rglob("*_results.yaml"))

     if not yaml_files:
         print("No test results found.")