Compare commits

...

7 Commits

Author SHA1 Message Date
Pbopbo
0c7de92ae9 Refactoring to sort results in year/month/day forlder. 2026-04-08 11:10:12 +02:00
Pbopbo
dd118ddb23 Adds check if test is valid to latency test. 2026-04-08 10:27:22 +02:00
Pbopbo
31cc2c0e92 more plotting 2026-04-07 16:54:05 +02:00
Pbopbo
2cab55c8cd Plotting for write read time and avail logs. 2026-03-31 11:55:44 +02:00
Pbopbo
8d3a144614 Adds latency build up test. 2026-03-31 10:53:55 +02:00
Pbopbo
5d5a131b77 Refactoring and minor improvents. 2026-03-23 13:52:27 +01:00
Pbopbo
39bcd072c0 Adds artifact test. 2026-03-18 10:41:46 +01:00
14 changed files with 1881 additions and 96 deletions

View File

@@ -0,0 +1,193 @@
# Artifact Detection Test
## Overview
This test plays a 1kHz sine wave for a configurable duration (default 60 seconds) and records both channels simultaneously:
- **Channel 1**: Loopback path (direct audio interface connection)
- **Channel 2**: DUT/Radio path (through beacon and radio transmission)
The test detects buzzing, clicks, dropouts, and other audio artifacts using multiple configurable algorithms.
## Quick Start
```bash
python test_artifact_detection.py \
--serial-number SN001234 \
--software-version abc123 \
--comment "Testing new firmware"
```
## Detection Algorithms
The test uses four configurable detection algorithms (spectral_anomaly is **disabled by default** due to false positives):
### 1. Spectral Anomaly Detection (DISABLED BY DEFAULT)
- **Status**: ⚠️ Currently generates too many false positives - disabled by default
- **What it detects**: Unexpected frequencies that aren't harmonics of the fundamental tone
- **Use case**: Buzzing, interference, crosstalk
- **Configuration**: `threshold_db` - how far below fundamental to search (-60 dB default)
### 2. Amplitude Spike Detection (WORKING)
- **What it detects**: Sudden changes in signal amplitude (RMS)
- **Use case**: Clicks, pops, dropouts
- **Configuration**: `threshold_factor` - number of standard deviations (3.0 default)
### 3. Zero-Crossing Anomaly Detection (WORKING)
- **What it detects**: Irregular zero-crossing patterns
- **Use case**: Distortion, clipping, non-linear artifacts
- **Configuration**: `threshold_factor` - number of standard deviations (2.0 default)
### 4. Energy Variation Detection (WORKING)
- **What it detects**: Rapid energy changes between time windows
- **Use case**: Dropouts, level fluctuations, intermittent issues
- **Configuration**: `threshold_db` - energy change threshold (6.0 dB default)
## Configuration
Edit `config.yaml` to customize the test:
```yaml
artifact_detection:
test_frequency: 1000 # Hz
duration: 60.0 # seconds
amplitude: 0.5 # 0.0 to 1.0
detectors:
spectral_anomaly:
enabled: true
threshold_db: -40
amplitude_spikes:
enabled: true
threshold_factor: 3.0
zero_crossing:
enabled: true
threshold_factor: 2.0
energy_variation:
enabled: true
threshold_db: 6.0
```
## Command Line Options
- `--serial-number`: Serial number (required)
- `--software-version`: Git commit hash or version (required)
- `--comment`: Optional comments about the test
- `--config`: Path to config file (default: config.yaml)
- `--duration`: Override duration in seconds
- `--frequency`: Override test frequency in Hz
## Example: Quick 10-second Test
```bash
python test_artifact_detection.py \
--serial-number SN001234 \
--software-version abc123 \
--duration 10
```
## Example: Custom Frequency
```bash
python test_artifact_detection.py \
--serial-number SN001234 \
--software-version abc123 \
--frequency 440
```
## Tuning Detection Algorithms
### More Sensitive Detection
To catch more subtle artifacts, make thresholds stricter:
```yaml
detectors:
spectral_anomaly:
threshold_db: -50 # Lower = more sensitive
amplitude_spikes:
threshold_factor: 2.0 # Lower = more sensitive
zero_crossing:
threshold_factor: 1.5 # Lower = more sensitive
energy_variation:
threshold_db: 3.0 # Lower = more sensitive
```
### Less Sensitive Detection
To reduce false positives in noisy environments:
```yaml
detectors:
spectral_anomaly:
threshold_db: -30 # Higher = less sensitive
amplitude_spikes:
threshold_factor: 4.0 # Higher = less sensitive
zero_crossing:
threshold_factor: 3.0 # Higher = less sensitive
energy_variation:
threshold_db: 10.0 # Higher = less sensitive
```
### Disable Specific Detectors
```yaml
detectors:
spectral_anomaly:
enabled: false # Turn off this detector
```
## Output
The test generates:
1. **YAML results file**: `test_results/{timestamp}_artifact_detection_results.yaml`
2. **JSON results file**: `test_results/{timestamp}_artifact_detection_results.json`
3. **Summary plots** (if enabled): `test_results/{timestamp}_artifact_detection/`
- Time domain waveforms with artifact markers
- Frequency spectrum analysis
4. **Individual anomaly plots**: `test_results/{timestamp}_artifact_detection/individual_anomalies/`
- Each anomaly plotted individually with ~20 periods of context
- Detailed view showing exactly what the anomaly looks like
- Named by channel, type, and timestamp for easy identification
### Results Structure
```yaml
metadata:
test_id: "20260317_140530"
timestamp: "2026-03-17T14:05:30.123456"
test_type: "artifact_detection"
pcb_version: "v2.1"
pcb_revision: "A"
software_version: "abc123"
artifact_detection_result:
test_frequency_hz: 1000
duration_sec: 60.0
channel_1_loopback:
total_artifacts: 5
artifact_rate_per_minute: 5.0
artifacts_by_type:
spectral_anomaly: 2
amplitude_spike: 3
channel_2_dut:
total_artifacts: 23
artifact_rate_per_minute: 23.0
artifacts_by_type:
spectral_anomaly: 8
amplitude_spike: 10
energy_variation: 5
detector_config: {...}
```
## Interpreting Results
- **Zero artifacts in both channels**: Excellent signal quality
- **Same artifacts in both channels**: Likely environmental interference or audio interface issue
- **More artifacts in Channel 2 (radio path)**: Radio transmission degradation detected
- **High spectral_anomaly count**: Interference or crosstalk
- **High amplitude_spike count**: Clicks, pops, or dropouts
- **High energy_variation count**: Level instability or dropouts
## Comparison with Loopback Baseline
The loopback path (Channel 1) serves as a baseline reference. Any additional artifacts in the radio path (Channel 2) indicate degradation introduced by the radio transmission system.
Expected behavior:
- Loopback should have minimal artifacts (ideally zero)
- Radio path may have some artifacts due to transmission
- Large difference indicates issues in radio hardware/firmware

View File

@@ -12,19 +12,18 @@ pip install -r requirements.txt
### 1. Run Your First Test
```bash
python run_test.py \
--pcb-version "v1.0" \
--pcb-revision "A" \
python test_latency.py \
--serial-number "SN001234" \
--software-version "initial" \
--notes "First test run"
--comment "First test run"
```
**What happens:**
- Auto-detects your Scarlett audio interface
- Plays test tones at 7 frequencies (100 Hz to 8 kHz)
- Plays chirp signal and measures latency (5 measurements by default)
- Records input/output on both channels
- Calculates latency, THD, and SNR
- Saves results to `test_results/YYYYMMDD_HHMMSS_results.yaml`
- Calculates average, min, max, and standard deviation of latency
- Saves results to `test_results/YYYYMMDD_HHMMSS_latency/YYYYMMDD_HHMMSS_latency_results.yaml`
### 2. View Results
@@ -39,35 +38,33 @@ python view_results.py test_results/20260226_123456_results.yaml
python view_results.py example_test_result.yaml
```
### 3. Compare Different PCB Versions
### 3. Compare Different Units
Run multiple tests with different metadata:
```bash
# Test PCB v1.0
python run_test.py --pcb-version "v1.0" --pcb-revision "A" --software-version "abc123"
# Test unit SN001234
python test_latency.py --serial-number "SN001234" --software-version "abc123"
# Test PCB v2.0
python run_test.py --pcb-version "v2.0" --pcb-revision "A" --software-version "abc123"
# Test unit SN001235 with more measurements
python test_latency.py --serial-number "SN001235" --software-version "abc123" --measurements 10
# Compare by viewing both YAML files
python view_results.py test_results/20260226_120000_results.yaml
python view_results.py test_results/20260226_130000_results.yaml
python view_results.py test_results/20260226_120000_latency/20260226_120000_latency_results.yaml
python view_results.py test_results/20260226_130000_latency/20260226_130000_latency_results.yaml
```
## Understanding the Output
Each test produces metrics at 7 frequencies:
Each latency test produces:
- **Latency (ms)**: Delay between channels (should be near 0 for loopback)
- **THD Input (%)**: Distortion in channel 1 (lower is better)
- **THD Output (%)**: Distortion in channel 2 (lower is better)
- **SNR Input (dB)**: Signal quality in channel 1 (higher is better)
- **SNR Output (dB)**: Signal quality in channel 2 (higher is better)
- **Average Latency (ms)**: Mean delay across all measurements
- **Min/Max Latency (ms)**: Range of measured values
- **Standard Deviation (ms)**: Consistency of measurements (lower is better)
**Good values:**
- THD: < 0.1% (< 0.01% is excellent)
- SNR: > 80 dB (> 90 dB is excellent)
- Latency: Depends on your system (audio interface typically < 10ms)
- Standard Deviation: < 1ms (consistent measurements)
- Latency: < 5 ms for loopback
## Configuration
@@ -75,11 +72,21 @@ Each test produces metrics at 7 frequencies:
Edit `config.yaml` to customize test parameters:
```yaml
test_tones:
frequencies: [1000] # Test only 1 kHz
duration: 3.0 # Shorter test (3 seconds)
audio:
sample_rate: 44100
channels: 2
device_name: "Scarlett"
output:
results_dir: "test_results"
save_plots: true
```
```bash
python -c "import sounddevice as sd; print(sd.query_devices())"
```
Update `device_name` in `config.yaml` to match your device.
## Troubleshooting
**Audio device not found:**

View File

@@ -4,8 +4,8 @@ Simple Python-based testing system for PCB audio hardware validation.
## Features
- **Automated Testing**: Latency, THD, and SNR measurements across multiple frequencies
- **Metadata Tracking**: PCB version, revision, software version, timestamps, notes
- **Automated Testing**: Latency measurements with configurable iterations
- **Metadata Tracking**: Serial number, software version, timestamps, comments
- **YAML Output**: Human-readable structured results
- **Simple Workflow**: Run tests, view results, compare versions
@@ -19,12 +19,20 @@ pip install -r requirements.txt
### 2. Run a Test
**Latency Test:**
```bash
python run_test.py \
--pcb-version "v2.1" \
--pcb-revision "A" \
python test_latency.py \
--serial-number "SN001234" \
--software-version "a3f2b1c" \
--notes "Replaced capacitor C5"
--comment "Replaced capacitor C5"
```
**Artifact Detection Test:**
```bash
python test_artifact_detection.py \
--serial-number "SN001234" \
--software-version "a3f2b1c" \
--comment "Baseline test"
```
### 3. View Results
@@ -43,10 +51,8 @@ python view_results.py test_results/*.yaml | tail -1
## Test Metrics
- **Latency**: Round-trip delay between input and output channels (ms)
- **THD**: Total Harmonic Distortion for input and output (%)
- **SNR**: Signal-to-Noise Ratio for input and output (dB)
Tests run at multiple frequencies: 100 Hz, 250 Hz, 500 Hz, 1 kHz, 2 kHz, 4 kHz, 8 kHz
- Average, minimum, maximum, and standard deviation across measurements
- Uses chirp signal for accurate cross-correlation measurement
## Output Format
@@ -56,27 +62,35 @@ Results are saved as YAML files in `test_results/`:
metadata:
test_id: 20260226_123456
timestamp: '2026-02-26T12:34:56.789012'
pcb_version: v2.1
pcb_revision: A
serial_number: SN001234
software_version: a3f2b1c
notes: Replaced capacitor C5
test_results:
- frequency_hz: 1000
latency_ms: 2.345
thd_input_percent: 0.012
thd_output_percent: 0.034
snr_input_db: 92.5
snr_output_db: 89.2
comment: Replaced capacitor C5
latency_test:
avg: 2.345
min: 2.201
max: 2.489
std: 0.087
```
## Configuration
Edit `config.yaml` to customize:
- Audio device settings
- Test frequencies
- Test duration
- Output options
```yaml
audio:
sample_rate: 44100
channels: 2
device_name: "Scarlett"
output:
results_dir: "test_results"
save_plots: true
```
The system auto-detects Focusrite Scarlett audio interfaces.
## Hardware Setup
```
@@ -84,19 +98,19 @@ Laptop <-> Audio Interface (Scarlett) <-> DUT <-> Audio Interface (Scarlett) <->
Output Channels 1&2 Input Channels 1&2
```
The system auto-detects Focusrite Scarlett audio interfaces.
## File Structure
```
closed_loop_audio_test_suite/
├── config.yaml # Test configuration
├── run_test.py # Main test runner
├── test_latency.py # Latency test runner
├── test_artifact_detection.py # Artifact detection test
├── view_results.py # Results viewer
├── src/
│ └── audio_tests.py # Core test functions
└── test_results/ # YAML output files
── YYYYMMDD_HHMMSS_results.yaml
── YYYYMMDD_HHMMSS_latency/
└── YYYYMMDD_HHMMSS_artifact_detection/
```
## Tips

View File

@@ -35,4 +35,20 @@ Play a sine in different frequencies, and for every frequency 5 sec long and do
Dont do fourier yet.
Do a simple project.
Do a simple project.
----
I want you to write a new test:
Put a 1khz sine into the system and record both channels for x seconds e.g. 60.
I want you to detect buzzing and other artifacts in the recording.
Give me a number how many artifacts you found.
Make the detection algorithm configurable, so we can try different approaches.
Again input it into the audio interface and measure both loopback and radio path like in the other test.

View File

@@ -7,7 +7,7 @@ audio:
test_tones:
frequencies: [100, 250, 500, 1000, 2000, 4000, 8000] # Hz
duration: 5.0 # seconds per frequency
duration: 10.0 # seconds per frequency
amplitude: 0.5 # 0.0 to 1.0
latency_runs: 5 # Number of latency measurements to average
@@ -15,3 +15,35 @@ output:
results_dir: "test_results"
save_plots: true
save_raw_audio: false
artifact_detection:
test_frequency: 1000 # Hz - Test tone frequency (for sine wave mode)
duration: 60.0 # seconds - Recording duration
amplitude: 0.5 # 0.0 to 1.0
startup_delay: 0 # seconds - Wait before starting recording to let system settle
# Chirp signal parameters (used when --signal-type chirp is specified)
chirp_f0: 100 # Hz - Chirp start frequency
chirp_f1: 8000 # Hz - Chirp end frequency
# NOTE: All detectors skip the first and last 1 second of recording to avoid startup/shutdown transients
detectors:
spectral_anomaly:
enabled: false # DISABLED - generates too many false positives, needs better algorithm
threshold_db: -60 # Detect unexpected frequencies above noise floor + this threshold (more negative = less sensitive)
amplitude_spikes:
enabled: true
threshold_factor: 5.0 # MAD-based outlier detection on envelope (detects clicks, pops, dropouts). Lower = more sensitive.
zero_crossing:
enabled: false
threshold_factor: 2.0 # Number of standard deviations for zero-crossing anomalies (detects distortion)
energy_variation:
enabled: true
threshold_db: 6.0 # Energy change threshold in dB between consecutive windows (detects level changes)
latency:
max_std_dev_ms: 0.5 # Maximum allowed std deviation; test fails if exceeded
min_avg_ms: 1.0 # Minimum expected average latency; near-zero indicates bad loopback
latency_buildup:
measurement_interval: 10 # seconds between latency measurements
max_duration: null # maximum test duration in seconds (null = run until canceled)
buildup_threshold_percent: 5.0 # percentage change threshold for buildup detection

View File

@@ -1,10 +1,9 @@
metadata:
test_id: 20260226_123456
timestamp: '2026-02-26T12:34:56.789012'
pcb_version: v2.1
pcb_revision: A
serial_number: SN001234
software_version: a3f2b1c8d9e
notes: Baseline test with new capacitor values
comment: Baseline test with new capacitor values
test_results:
- frequency_hz: 100
latency_ms: 2.341

92
plot_alsa_status.py Normal file
View File

@@ -0,0 +1,92 @@
#!/usr/bin/env python3
"""Parse ALSA status log file and plot avail value over time."""
import sys
import re
import os
from datetime import datetime
import matplotlib.pyplot as plt
TIMESTAMP_RE = re.compile(r"^===== (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+) =====")
AVAIL_RE = re.compile(r"^avail\s*:\s*(\d+)")
def parse_log(log_path):
timestamps = []
avail_values = []
with open(log_path, "r") as f:
current_timestamp = None
for line in f:
line = line.strip()
# Check for timestamp line
ts_match = TIMESTAMP_RE.match(line)
if ts_match:
current_timestamp = datetime.strptime(ts_match.group(1), "%Y-%m-%d %H:%M:%S.%f")
continue
# Check for avail line (only if we have a timestamp)
if current_timestamp:
avail_match = AVAIL_RE.match(line)
if avail_match:
timestamps.append(current_timestamp)
avail_values.append(int(avail_match.group(1)))
current_timestamp = None # Reset until next timestamp
if not timestamps:
print("No valid timestamp/avail pairs found in the log file.", file=sys.stderr)
sys.exit(1)
# Convert to relative seconds from first timestamp
t0 = timestamps[0]
seconds = [(t - t0).total_seconds() for t in timestamps]
return seconds, avail_values
def plot(seconds, avail_values, out_path):
plt.figure(figsize=(12, 6))
plt.plot(seconds, avail_values, label="avail", linewidth=1, alpha=0.7)
# Add moving average (windowed mean)
if len(avail_values) >= 10: # Only if we have enough data points
window_size = min(50, len(avail_values) // 10) # Adaptive window size
import numpy as np
moving_avg = np.convolve(avail_values, np.ones(window_size)/window_size, mode='valid')
# Adjust timestamps for the moving average (they align with window centers)
ma_seconds = seconds[window_size-1:]
plt.plot(ma_seconds, moving_avg, label=f"moving mean (window={window_size})", linewidth=2)
plt.xlabel("Time (s)")
plt.ylabel("Available samples")
plt.title("ALSA Available Samples Over Time")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig(out_path, dpi=150)
print(f"Plot saved to {out_path}")
def main():
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} <path_to_alsa_status_log>", file=sys.stderr)
sys.exit(1)
log_path = sys.argv[1]
if not os.path.isfile(log_path):
print(f"File not found: {log_path}", file=sys.stderr)
sys.exit(1)
seconds, avail_values = parse_log(log_path)
log_dir = os.path.dirname(os.path.abspath(log_path))
log_base = os.path.splitext(os.path.basename(log_path))[0]
out_path = os.path.join(log_dir, f"{log_base}_avail_plot.png")
plot(seconds, avail_values, out_path)
if __name__ == "__main__":
main()

302
plot_combined.py Normal file
View File

@@ -0,0 +1,302 @@
#!/usr/bin/env python3
"""Combine ALSA avail, perf metrics, and latency plots into one figure."""
import sys
import re
import os
from datetime import datetime
import matplotlib.pyplot as plt
import numpy as np
# Regex patterns
TIMESTAMP_RE = re.compile(r"^===== (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+) =====")
AVAIL_RE = re.compile(r"^avail\s*:\s*(\d+)")
PERF_RE = re.compile(
r"^(\w+ \d+ \d+:\d+:\d+) .* Perf\(.*?\):"
r".*?sample mean=([\d.]+)ms"
r".*?write mean=([\d.]+)ms"
r".*?loop mean=([\d.]+)ms"
)
LATENCY_RE = re.compile(r"^(\w+ \d+ \d+:\d+:\d+).*latency.*?(\d+(?:\.\d+)?)ms")
PYALSA_AVAIL_BEFORE_RE = re.compile(r"^(\w+ \d+ \d+:\d+:\d+).*PyALSA: avail before read: (\d+)")
PYALSA_AVAIL_AFTER_RE = re.compile(r"^(\w+ \d+ \d+:\d+:\d+).*PyALSA: .* avail=(\d+)")
def parse_alsa_status(log_path):
timestamps = []
avail_values = []
with open(log_path, "r") as f:
current_timestamp = None
for line in f:
line = line.strip()
ts_match = TIMESTAMP_RE.match(line)
if ts_match:
current_timestamp = datetime.strptime(ts_match.group(1), "%Y-%m-%d %H:%M:%S.%f")
continue
if current_timestamp:
avail_match = AVAIL_RE.match(line)
if avail_match:
timestamps.append(current_timestamp)
avail_values.append(int(avail_match.group(1)))
current_timestamp = None
if not timestamps:
return [], []
t0 = timestamps[0]
seconds = [(t - t0).total_seconds() for t in timestamps]
return seconds, avail_values
def parse_perf_log(log_path):
timestamps = []
sample_means = []
write_means = []
loop_means = []
with open(log_path, "r") as f:
for line in f:
m = PERF_RE.search(line)
if m:
ts_str, sample, write, loop = m.groups()
ts = datetime.strptime(ts_str, "%b %d %H:%M:%S")
timestamps.append(ts)
sample_means.append(float(sample))
write_means.append(float(write))
loop_means.append(float(loop))
if not timestamps:
return [], [], [], []
t0 = timestamps[0]
seconds = [(t - t0).total_seconds() for t in timestamps]
return seconds, sample_means, write_means, loop_means
def parse_pyalsa_avail(perf_file):
"""Parse PyALSA avail before/after read from the perf log file."""
before_timestamps = []
before_values = []
after_timestamps = []
after_values = []
with open(perf_file, "r") as f:
for line in f:
line = line.strip()
# Check for "avail before read"
before_match = PYALSA_AVAIL_BEFORE_RE.match(line)
if before_match:
ts_str, avail = before_match.groups()
current_year = datetime.now().year
ts_with_year = f"{current_year} {ts_str}"
ts = datetime.strptime(ts_with_year, "%Y %b %d %H:%M:%S")
before_timestamps.append(ts)
before_values.append(int(avail))
continue
# Check for "avail=" (after read)
after_match = PYALSA_AVAIL_AFTER_RE.match(line)
if after_match:
ts_str, avail = after_match.groups()
current_year = datetime.now().year
ts_with_year = f"{current_year} {ts_str}"
ts = datetime.strptime(ts_with_year, "%Y %b %d %H:%M:%S")
after_timestamps.append(ts)
after_values.append(int(avail))
return before_timestamps, before_values, after_timestamps, after_values
def parse_latency_yaml(yaml_path):
import yaml
with open(yaml_path, 'r') as f:
data = yaml.safe_load(f)
latency_measurements = data.get('latency_buildup_result', {}).get('latency_measurements', [])
timestamps = []
latencies = []
for measurement in latency_measurements:
ts_str = measurement['timestamp']
latency = measurement['latency_ms']
# Parse ISO format timestamp
ts = datetime.fromisoformat(ts_str)
timestamps.append(ts)
latencies.append(float(latency))
if not timestamps:
return [], []
t0 = timestamps[0]
seconds = [(t - t0).total_seconds() for t in timestamps]
return seconds, latencies
def plot_combined(alsa_file, perf_file, latency_file, out_path):
# Parse all logs
alsa_seconds, avail_values = parse_alsa_status(alsa_file)
perf_seconds, sample_means, write_means, loop_means = parse_perf_log(perf_file)
latency_seconds, latencies = parse_latency_yaml(latency_file)
# Parse PyALSA avail data
before_timestamps, before_values, after_timestamps, after_values = parse_pyalsa_avail(perf_file)
# Get absolute timestamps for proper alignment
alsa_timestamps = []
perf_timestamps = []
latency_timestamps = []
# Re-parse to get absolute timestamps for alignment
with open(alsa_file, "r") as f:
current_timestamp = None
for line in f:
line = line.strip()
ts_match = TIMESTAMP_RE.match(line)
if ts_match:
current_timestamp = datetime.strptime(ts_match.group(1), "%Y-%m-%d %H:%M:%S.%f")
continue
if current_timestamp:
avail_match = AVAIL_RE.match(line)
if avail_match:
alsa_timestamps.append(current_timestamp)
current_timestamp = None
with open(perf_file, "r") as f:
for line in f:
m = PERF_RE.search(line)
if m:
ts_str = m.group(1)
# Add current year to the timestamp since it doesn't include year
current_year = datetime.now().year
ts_with_year = f"{current_year} {ts_str}"
ts = datetime.strptime(ts_with_year, "%Y %b %d %H:%M:%S")
perf_timestamps.append(ts)
import yaml
with open(latency_file, 'r') as f:
data = yaml.safe_load(f)
latency_measurements = data.get('latency_buildup_result', {}).get('latency_measurements', [])
for measurement in latency_measurements:
ts_str = measurement['timestamp']
ts = datetime.fromisoformat(ts_str)
latency_timestamps.append(ts)
# Find earliest timestamp
all_abs_timestamps = []
if alsa_timestamps:
all_abs_timestamps.extend(alsa_timestamps)
if perf_timestamps:
all_abs_timestamps.extend(perf_timestamps)
if latency_timestamps:
all_abs_timestamps.extend(latency_timestamps)
if before_timestamps:
all_abs_timestamps.extend(before_timestamps)
if after_timestamps:
all_abs_timestamps.extend(after_timestamps)
t0_absolute = min(all_abs_timestamps)
# Convert all times to seconds from earliest timestamp
alsa_aligned = [(ts - t0_absolute).total_seconds() for ts in alsa_timestamps] if alsa_timestamps else []
perf_aligned = [(ts - t0_absolute).total_seconds() for ts in perf_timestamps] if perf_timestamps else []
latency_aligned = [(ts - t0_absolute).total_seconds() for ts in latency_timestamps] if latency_timestamps else []
before_aligned = [(ts - t0_absolute).total_seconds() for ts in before_timestamps] if before_timestamps else []
after_aligned = [(ts - t0_absolute).total_seconds() for ts in after_timestamps] if after_timestamps else []
# Create figure with 4 subplots sharing x-axis
fig, (ax1, ax2, ax3, ax4) = plt.subplots(4, 1, figsize=(14, 12), sharex=True)
fig.suptitle("Combined Audio Performance Metrics", fontsize=16)
# Plot 1: ALSA avail
if alsa_aligned and avail_values:
ax1.plot(alsa_aligned, avail_values, label="avail", linewidth=1, alpha=0.7, color='blue')
if len(avail_values) >= 10:
window_size = min(50, len(avail_values) // 10)
moving_avg = np.convolve(avail_values, np.ones(window_size)/window_size, mode='valid')
ma_seconds = alsa_aligned[window_size-1:]
ax1.plot(ma_seconds, moving_avg, label=f"moving mean (window={window_size})",
linewidth=2, color='darkblue')
ax1.set_ylabel("Available samples")
ax1.set_title("ALSA Available Samples")
ax1.legend()
ax1.grid(True, alpha=0.3)
# Plot 2: Perf metrics
if perf_aligned:
ax2.plot(perf_aligned, sample_means, label="sample mean", linewidth=1, alpha=0.8, color='green')
ax2.plot(perf_aligned, write_means, label="write mean", linewidth=1, alpha=0.8, color='orange')
ax2.plot(perf_aligned, loop_means, label="loop mean", linewidth=1, alpha=0.8, color='red')
# Add moving average for loop mean
if len(loop_means) >= 10:
window_size = min(50, len(loop_means) // 10)
moving_avg = np.convolve(loop_means, np.ones(window_size)/window_size, mode='valid')
ma_seconds = perf_aligned[window_size-1:]
ax2.plot(ma_seconds, moving_avg, label=f"loop mean moving avg (window={window_size})",
linewidth=2, color='darkred', alpha=0.9)
ax2.set_ylabel("Duration (ms)")
ax2.set_title("Performance Metrics")
ax2.legend()
ax2.grid(True, alpha=0.3)
# Plot 3: Latency
if latency_aligned:
ax3.plot(latency_aligned, latencies, label="latency", linewidth=1, color='purple')
ax3.set_ylabel("Latency (ms)")
ax3.set_title("Latency Buildup")
ax3.legend()
ax3.grid(True, alpha=0.3)
# Plot 4: PyALSA avail before/after read
if before_aligned and before_values:
ax4.plot(before_aligned, before_values, label="avail before read", linewidth=1, alpha=0.7, color='cyan')
if after_aligned and after_values:
ax4.plot(after_aligned, after_values, label="avail after read", linewidth=1, alpha=0.7, color='magenta')
ax4.set_xlabel("Time (s)")
ax4.set_ylabel("Available samples")
ax4.set_title("PyALSA Available Samples (Before/After Read)")
if before_aligned or after_aligned:
ax4.legend()
ax4.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(out_path, dpi=150, bbox_inches='tight')
print(f"Combined plot saved to {out_path}")
# Show interactive plot
plt.show()
def main():
if len(sys.argv) != 4:
print(f"Usage: {sys.argv[0]} <alsa_status.log> <perf_log.log> <latency_results.yaml>", file=sys.stderr)
sys.exit(1)
alsa_file = sys.argv[1]
perf_file = sys.argv[2]
latency_file = sys.argv[3]
for file_path in [alsa_file, perf_file, latency_file]:
if not os.path.isfile(file_path):
print(f"File not found: {file_path}", file=sys.stderr)
sys.exit(1)
# Determine output path (same directory as first file)
log_dir = os.path.dirname(os.path.abspath(alsa_file))
out_path = os.path.join(log_dir, "combined_audio_plot.png")
plot_combined(alsa_file, perf_file, latency_file, out_path)
if __name__ == "__main__":
main()

81
plot_perf_log.py Normal file
View File

@@ -0,0 +1,81 @@
#!/usr/bin/env python3
"""Parse Perf lines from a log file and plot sample mean, write mean, and loop mean over time."""
import sys
import re
import os
from datetime import datetime
import matplotlib.pyplot as plt
PERF_RE = re.compile(
r"^(\w+ \d+ \d+:\d+:\d+) .* Perf\(.*?\):"
r".*?sample mean=([\d.]+)ms"
r".*?write mean=([\d.]+)ms"
r".*?loop mean=([\d.]+)ms"
)
def parse_log(log_path):
timestamps = []
sample_means = []
write_means = []
loop_means = []
with open(log_path, "r") as f:
for line in f:
m = PERF_RE.search(line)
if m:
ts_str, sample, write, loop = m.groups()
ts = datetime.strptime(ts_str, "%b %d %H:%M:%S")
timestamps.append(ts)
sample_means.append(float(sample))
write_means.append(float(write))
loop_means.append(float(loop))
if not timestamps:
print("No Perf lines found in the log file.", file=sys.stderr)
sys.exit(1)
t0 = timestamps[0]
seconds = [(t - t0).total_seconds() for t in timestamps]
return seconds, sample_means, write_means, loop_means
def plot(seconds, sample_means, write_means, loop_means, out_path):
plt.figure(figsize=(12, 6))
plt.plot(seconds, sample_means, label="sample mean (ms)")
plt.plot(seconds, write_means, label="write mean (ms)")
plt.plot(seconds, loop_means, label="loop mean (ms)")
plt.xlabel("Time (s)")
plt.ylabel("Duration (ms)")
plt.title("Perf Metrics Over Time")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig(out_path, dpi=150)
print(f"Plot saved to {out_path}")
def main():
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} <path_to_log_file>", file=sys.stderr)
sys.exit(1)
log_path = sys.argv[1]
if not os.path.isfile(log_path):
print(f"File not found: {log_path}", file=sys.stderr)
sys.exit(1)
seconds, sample_means, write_means, loop_means = parse_log(log_path)
log_dir = os.path.dirname(os.path.abspath(log_path))
log_base = os.path.splitext(os.path.basename(log_path))[0]
out_path = os.path.join(log_dir, f"{log_base}_perf_plot.png")
plot(seconds, sample_means, write_means, loop_means, out_path)
if __name__ == "__main__":
main()

View File

@@ -235,11 +235,22 @@ def run_latency_test(config: Dict, num_measurements: int = 5, save_plots: bool =
last_correlation = correlation
last_lags = lags
avg = float(np.mean(latencies))
std_dev = float(np.std(latencies))
latency_cfg = config.get('latency', {})
max_std_dev_ms = latency_cfg.get('max_std_dev_ms', None)
min_avg_ms = latency_cfg.get('min_avg_ms', None)
valid = True
if max_std_dev_ms is not None and std_dev > max_std_dev_ms:
valid = False
if min_avg_ms is not None and avg < min_avg_ms:
valid = False
latency_stats = {
'avg': float(np.mean(latencies)),
'avg': avg,
'min': float(np.min(latencies)),
'max': float(np.max(latencies)),
'std': float(np.std(latencies))
'std': std_dev,
'valid': valid
}
if save_plots and output_dir and last_recording is not None:
@@ -279,3 +290,512 @@ def plot_latency_test(channel_1: np.ndarray, channel_2: np.ndarray, correlation:
plot_file = output_dir / 'latency_chirp_analysis.png'
plt.savefig(plot_file, dpi=150, bbox_inches='tight')
plt.close()
def detect_artifacts_spectral_anomaly(signal_data: np.ndarray, sample_rate: int,
fundamental_freq: float, threshold_db: float = -60) -> List[Dict]:
artifacts = []
window_size = int(sample_rate * 0.5)
hop_size = int(sample_rate * 0.25)
for i in range(0, len(signal_data) - window_size, hop_size):
segment = signal_data[i:i+window_size]
fft = np.fft.rfft(segment)
freqs = np.fft.rfftfreq(len(segment), 1/sample_rate)
power_spectrum_db = 20 * np.log10(np.abs(fft) + 1e-10)
fundamental_idx = np.argmin(np.abs(freqs - fundamental_freq))
fundamental_power_db = power_spectrum_db[fundamental_idx]
expected_harmonics = set()
harmonic_tolerance_bins = 3
for n in range(1, 11):
harmonic_freq = n * fundamental_freq
if harmonic_freq < sample_rate / 2:
harmonic_idx = np.argmin(np.abs(freqs - harmonic_freq))
for offset in range(-harmonic_tolerance_bins, harmonic_tolerance_bins + 1):
if 0 <= harmonic_idx + offset < len(freqs):
expected_harmonics.add(harmonic_idx + offset)
noise_floor_db = np.percentile(power_spectrum_db[10:], 10)
unexpected_peaks = []
for idx in range(10, len(power_spectrum_db)):
if idx not in expected_harmonics:
if power_spectrum_db[idx] > noise_floor_db + abs(threshold_db):
unexpected_peaks.append((freqs[idx], power_spectrum_db[idx]))
if len(unexpected_peaks) >= 5:
artifacts.append({
'type': 'spectral_anomaly',
'time_sec': i / sample_rate,
'unexpected_frequencies': unexpected_peaks[:10],
'count': len(unexpected_peaks)
})
return artifacts
def detect_artifacts_amplitude_spikes(signal_data: np.ndarray, sample_rate: int,
threshold_factor: float = 3.0) -> List[Dict]:
artifacts = []
skip_samples = int(sample_rate * 1.0)
if len(signal_data) <= 2 * skip_samples:
return artifacts
envelope = np.abs(signal_data)
window_size = int(sample_rate * 0.01)
if window_size % 2 == 0:
window_size += 1
from scipy.ndimage import uniform_filter1d
envelope_smooth = uniform_filter1d(envelope, size=window_size, mode='reflect')
median_env = np.median(envelope_smooth)
mad = np.median(np.abs(envelope_smooth - median_env))
if mad == 0:
return artifacts
threshold_high = median_env + threshold_factor * mad * 1.4826
threshold_low = median_env - threshold_factor * mad * 1.4826
# Detect spikes (too high)
spike_indices = np.where(envelope_smooth > threshold_high)[0]
# Detect dropouts (too low)
dropout_indices = np.where(envelope_smooth < threshold_low)[0]
total_duration = len(signal_data) / sample_rate
# Process spikes
if len(spike_indices) > 0:
groups = []
current_group = [spike_indices[0]]
for idx in spike_indices[1:]:
if idx - current_group[-1] <= int(sample_rate * 0.05):
current_group.append(idx)
else:
groups.append(current_group)
current_group = [idx]
groups.append(current_group)
for group in groups:
peak_idx = group[np.argmax(envelope_smooth[group])]
time_sec = peak_idx / sample_rate
peak_value = envelope_smooth[peak_idx]
# Skip artifacts in first and last second
if time_sec < 1.0 or time_sec > (total_duration - 1.0):
continue
artifacts.append({
'type': 'amplitude_spike',
'time_sec': float(time_sec),
'peak_amplitude': float(peak_value),
'median_amplitude': float(median_env),
'deviation_factor': float((peak_value - median_env) / (mad * 1.4826)) if mad > 0 else 0
})
# Process dropouts
if len(dropout_indices) > 0:
groups = []
current_group = [dropout_indices[0]]
for idx in dropout_indices[1:]:
if idx - current_group[-1] <= int(sample_rate * 0.05):
current_group.append(idx)
else:
groups.append(current_group)
current_group = [idx]
groups.append(current_group)
for group in groups:
dropout_idx = group[np.argmin(envelope_smooth[group])]
time_sec = dropout_idx / sample_rate
dropout_value = envelope_smooth[dropout_idx]
# Skip artifacts in first and last second
if time_sec < 1.0 or time_sec > (total_duration - 1.0):
continue
artifacts.append({
'type': 'amplitude_dropout',
'time_sec': float(time_sec),
'dropout_amplitude': float(dropout_value),
'median_amplitude': float(median_env),
'deviation_factor': float((median_env - dropout_value) / (mad * 1.4826)) if mad > 0 else 0
})
return artifacts
def detect_artifacts_zero_crossing(signal_data: np.ndarray, sample_rate: int,
threshold_factor: float = 2.0) -> List[Dict]:
artifacts = []
if len(signal_data) <= int(sample_rate * 2.0):
return artifacts
window_size = int(sample_rate * 0.1)
hop_size = int(sample_rate * 0.05)
zcr_values = []
for i in range(0, len(signal_data) - window_size, hop_size):
segment = signal_data[i:i+window_size]
zero_crossings = np.sum(np.abs(np.diff(np.sign(segment)))) / 2
zcr = zero_crossings / len(segment)
zcr_values.append((i, zcr))
if not zcr_values:
return artifacts
zcr_array = np.array([z[1] for z in zcr_values])
median_zcr = np.median(zcr_array)
std_zcr = np.std(zcr_array)
total_duration = len(signal_data) / sample_rate
for i, zcr in zcr_values:
time_sec = i / sample_rate
# Skip artifacts in first and last second
if time_sec < 1.0 or time_sec > (total_duration - 1.0):
continue
if std_zcr > 0 and abs(zcr - median_zcr) > threshold_factor * std_zcr:
artifacts.append({
'type': 'zero_crossing_anomaly',
'time_sec': float(time_sec),
'zcr_value': float(zcr),
'median_zcr': float(median_zcr),
'deviation_factor': float(abs(zcr - median_zcr) / std_zcr)
})
return artifacts
def detect_artifacts_energy_variation(signal_data: np.ndarray, sample_rate: int,
threshold_db: float = 6.0) -> List[Dict]:
artifacts = []
if len(signal_data) <= int(sample_rate * 2.0):
return artifacts
window_size = int(sample_rate * 0.1)
hop_size = int(sample_rate * 0.05)
energy_values = []
for i in range(0, len(signal_data) - window_size, hop_size):
segment = signal_data[i:i+window_size]
energy = np.sum(segment**2)
energy_values.append((i, energy))
total_duration = len(signal_data) / sample_rate
for idx in range(1, len(energy_values)):
prev_energy = energy_values[idx-1][1]
curr_energy = energy_values[idx][1]
if prev_energy > 0 and curr_energy > 0:
energy_change_db = 10 * np.log10(curr_energy / prev_energy)
if abs(energy_change_db) > threshold_db:
time_sec = energy_values[idx][0] / sample_rate
# Skip artifacts in first and last second
if time_sec < 1.0 or time_sec > (total_duration - 1.0):
continue
artifacts.append({
'type': 'energy_variation',
'time_sec': float(time_sec),
'energy_change_db': float(energy_change_db),
'threshold_db': float(threshold_db)
})
return artifacts
def measure_frequency_accuracy(signal_data: np.ndarray, sample_rate: int,
expected_freq: float) -> Dict:
"""
Measure the actual dominant frequency in the signal and compare to expected.
Uses FFT on the full signal (skipping first and last second).
"""
# Skip first and last second
skip_samples = int(sample_rate * 1.0)
if len(signal_data) <= 2 * skip_samples:
return {
'expected_freq_hz': float(expected_freq),
'measured_freq_hz': 0.0,
'error_hz': 0.0,
'error_percent': 0.0
}
signal_trimmed = signal_data[skip_samples:-skip_samples]
# Perform FFT
fft = np.fft.rfft(signal_trimmed)
freqs = np.fft.rfftfreq(len(signal_trimmed), 1/sample_rate)
# Find the peak frequency
magnitude = np.abs(fft)
peak_idx = np.argmax(magnitude)
measured_freq = freqs[peak_idx]
# Calculate error
error_hz = measured_freq - expected_freq
error_percent = (error_hz / expected_freq) * 100.0 if expected_freq > 0 else 0.0
return {
'expected_freq_hz': float(expected_freq),
'measured_freq_hz': float(measured_freq),
'error_hz': float(error_hz),
'error_percent': float(error_percent)
}
def detect_artifacts_combined(signal_data: np.ndarray, sample_rate: int, fundamental_freq: float,
detector_config: Dict) -> Dict:
all_artifacts = []
if detector_config.get('spectral_anomaly', {}).get('enabled', True):
threshold = detector_config.get('spectral_anomaly', {}).get('threshold_db', -60)
artifacts = detect_artifacts_spectral_anomaly(signal_data, sample_rate, fundamental_freq, threshold)
all_artifacts.extend(artifacts)
if detector_config.get('amplitude_spikes', {}).get('enabled', True):
threshold = detector_config.get('amplitude_spikes', {}).get('threshold_factor', 3.0)
artifacts = detect_artifacts_amplitude_spikes(signal_data, sample_rate, threshold)
all_artifacts.extend(artifacts)
if detector_config.get('zero_crossing', {}).get('enabled', True):
threshold = detector_config.get('zero_crossing', {}).get('threshold_factor', 2.0)
artifacts = detect_artifacts_zero_crossing(signal_data, sample_rate, threshold)
all_artifacts.extend(artifacts)
if detector_config.get('energy_variation', {}).get('enabled', True):
threshold = detector_config.get('energy_variation', {}).get('threshold_db', 6.0)
artifacts = detect_artifacts_energy_variation(signal_data, sample_rate, threshold)
all_artifacts.extend(artifacts)
# Measure frequency accuracy
freq_accuracy = measure_frequency_accuracy(signal_data, sample_rate, fundamental_freq)
artifact_summary = {
'total_count': len(all_artifacts),
'by_type': {},
'artifacts': all_artifacts,
'frequency_accuracy': freq_accuracy
}
for artifact in all_artifacts:
artifact_type = artifact['type']
if artifact_type not in artifact_summary['by_type']:
artifact_summary['by_type'][artifact_type] = 0
artifact_summary['by_type'][artifact_type] += 1
return artifact_summary
def plot_individual_anomaly(signal_data: np.ndarray, artifact: Dict, artifact_idx: int,
channel_name: str, frequency: float, sample_rate: int,
output_dir: Path):
periods_to_show = 20
period_samples = int(sample_rate / frequency)
total_samples = periods_to_show * period_samples
artifact_time = artifact['time_sec']
artifact_sample = int(artifact_time * sample_rate)
start_sample = max(0, artifact_sample - total_samples // 2)
end_sample = min(len(signal_data), artifact_sample + total_samples // 2)
if end_sample - start_sample < total_samples:
if start_sample == 0:
end_sample = min(len(signal_data), start_sample + total_samples)
else:
start_sample = max(0, end_sample - total_samples)
segment = signal_data[start_sample:end_sample]
time_ms = (np.arange(len(segment)) + start_sample) / sample_rate * 1000
fig, ax = plt.subplots(1, 1, figsize=(14, 6))
ax.plot(time_ms, segment, linewidth=1.0, color='blue', alpha=0.8)
ax.axvline(x=artifact_time * 1000, color='red', linestyle='--', linewidth=2,
label=f'Anomaly at {artifact_time:.3f}s', alpha=0.7)
ax.set_xlabel('Time (ms)', fontsize=11)
ax.set_ylabel('Amplitude', fontsize=11)
artifact_type = artifact['type'].replace('_', ' ').title()
ax.set_title(f'{channel_name} - {artifact_type} #{artifact_idx+1} (~{periods_to_show} periods @ {frequency}Hz)',
fontsize=12, fontweight='bold')
info_text = f"Type: {artifact_type}\nTime: {artifact_time:.3f}s"
if 'deviation_factor' in artifact:
info_text += f"\nDeviation: {artifact['deviation_factor']:.2f}σ"
if 'energy_change_db' in artifact:
info_text += f"\nEnergy Change: {artifact['energy_change_db']:.2f} dB"
if 'count' in artifact and artifact['type'] == 'spectral_anomaly':
info_text += f"\nUnexpected Peaks: {artifact['count']}"
ax.text(0.02, 0.98, info_text, transform=ax.transAxes,
fontsize=9, verticalalignment='top',
bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
ax.legend(loc='upper right')
ax.grid(True, alpha=0.3)
plt.tight_layout()
safe_type = artifact['type'].replace('_', '-')
plot_file = output_dir / f'{channel_name.lower().replace(" ", "_")}_anomaly_{artifact_idx+1:04d}_{safe_type}_{artifact_time:.3f}s.png'
plt.savefig(plot_file, dpi=150, bbox_inches='tight')
plt.close()
def plot_artifact_detection(channel_1: np.ndarray, channel_2: np.ndarray,
artifacts_ch1: Dict, artifacts_ch2: Dict,
frequency: float, sample_rate: int, output_dir: Path):
fig, axes = plt.subplots(2, 2, figsize=(16, 10))
time = np.arange(len(channel_1)) / sample_rate
axes[0, 0].plot(time, channel_1, alpha=0.7, linewidth=0.5)
axes[0, 0].set_xlabel('Time (s)')
axes[0, 0].set_ylabel('Amplitude')
axes[0, 0].set_title(f'Channel 1 (Loopback) - {artifacts_ch1["total_count"]} artifacts')
axes[0, 0].grid(True, alpha=0.3)
for artifact in artifacts_ch1['artifacts']:
axes[0, 0].axvline(x=artifact['time_sec'], color='r', alpha=0.3, linewidth=0.5)
axes[1, 0].plot(time, channel_2, alpha=0.7, linewidth=0.5)
axes[1, 0].set_xlabel('Time (s)')
axes[1, 0].set_ylabel('Amplitude')
axes[1, 0].set_title(f'Channel 2 (DUT/Radio) - {artifacts_ch2["total_count"]} artifacts')
axes[1, 0].grid(True, alpha=0.3)
for artifact in artifacts_ch2['artifacts']:
axes[1, 0].axvline(x=artifact['time_sec'], color='r', alpha=0.3, linewidth=0.5)
fft_ch1 = np.fft.rfft(channel_1)
fft_ch2 = np.fft.rfft(channel_2)
freqs = np.fft.rfftfreq(len(channel_1), 1/sample_rate)
axes[0, 1].plot(freqs, 20*np.log10(np.abs(fft_ch1) + 1e-10), linewidth=0.5)
axes[0, 1].set_xlabel('Frequency (Hz)')
axes[0, 1].set_ylabel('Magnitude (dB)')
axes[0, 1].set_title('Channel 1 Spectrum')
axes[0, 1].set_xlim(0, min(10000, sample_rate/2))
axes[0, 1].grid(True, alpha=0.3)
axes[1, 1].plot(freqs, 20*np.log10(np.abs(fft_ch2) + 1e-10), linewidth=0.5)
axes[1, 1].set_xlabel('Frequency (Hz)')
axes[1, 1].set_ylabel('Magnitude (dB)')
axes[1, 1].set_title('Channel 2 Spectrum')
axes[1, 1].set_xlim(0, min(10000, sample_rate/2))
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plot_file = output_dir / f'artifact_detection_{frequency}Hz.png'
plt.savefig(plot_file, dpi=150, bbox_inches='tight')
plt.close()
def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_dir: Path = None) -> Dict:
import time
sample_rate = config['audio']['sample_rate']
duration = config['artifact_detection']['duration']
frequency = config['artifact_detection']['test_frequency']
amplitude = config['artifact_detection']['amplitude']
device_name = config['audio']['device_name']
channels = config['audio']['channels']
detector_config = config['artifact_detection']['detectors']
startup_delay = config['artifact_detection'].get('startup_delay', 10)
signal_type = config['artifact_detection'].get('signal_type', 'sine')
device_ids = find_audio_device(device_name)
if startup_delay > 0:
print(f"Waiting {startup_delay} seconds for system to settle...")
time.sleep(startup_delay)
print("Starting recording...")
if signal_type == 'chirp':
f0 = config['artifact_detection'].get('chirp_f0', 100)
f1 = config['artifact_detection'].get('chirp_f1', 8000)
tone = generate_chirp(duration, sample_rate, f0=f0, f1=f1, amplitude=amplitude)
frequency = (f0 + f1) / 2
recording = play_and_record(tone, sample_rate, device_ids, channels)
elif signal_type == 'silent':
frequency = 1000
recording = sd.rec(int(duration * sample_rate), samplerate=sample_rate,
channels=channels, device=device_ids[0], blocking=True)
else:
tone = generate_test_tone(frequency, duration, sample_rate, amplitude)
recording = play_and_record(tone, sample_rate, device_ids, channels)
channel_1 = recording[:, 0]
channel_2 = recording[:, 1]
artifacts_ch1 = detect_artifacts_combined(channel_1, sample_rate, frequency, detector_config)
artifacts_ch2 = detect_artifacts_combined(channel_2, sample_rate, frequency, detector_config)
if save_plots and output_dir:
plot_artifact_detection(channel_1, channel_2, artifacts_ch1, artifacts_ch2,
frequency, sample_rate, output_dir)
anomalies_dir = output_dir / 'individual_anomalies'
anomalies_dir.mkdir(exist_ok=True)
print(f"\nPlotting individual anomalies to: {anomalies_dir}")
for idx, artifact in enumerate(artifacts_ch1['artifacts']):
plot_individual_anomaly(channel_1, artifact, idx, 'Channel 1 Loopback',
frequency, sample_rate, anomalies_dir)
for idx, artifact in enumerate(artifacts_ch2['artifacts']):
plot_individual_anomaly(channel_2, artifact, idx, 'Channel 2 DUT',
frequency, sample_rate, anomalies_dir)
total_anomaly_plots = len(artifacts_ch1['artifacts']) + len(artifacts_ch2['artifacts'])
if total_anomaly_plots > 0:
print(f"✓ Generated {total_anomaly_plots} individual anomaly plots")
result = {
'signal_type': signal_type,
'duration_sec': float(duration),
'channel_1_loopback': {
'total_artifacts': artifacts_ch1['total_count'],
'artifacts_by_type': artifacts_ch1['by_type'],
'artifact_rate_per_minute': float(artifacts_ch1['total_count'] / duration * 60),
'frequency_accuracy': artifacts_ch1['frequency_accuracy']
},
'channel_2_dut': {
'total_artifacts': artifacts_ch2['total_count'],
'artifacts_by_type': artifacts_ch2['by_type'],
'artifact_rate_per_minute': float(artifacts_ch2['total_count'] / duration * 60),
'frequency_accuracy': artifacts_ch2['frequency_accuracy']
},
'detector_config': detector_config
}
if signal_type == 'chirp':
f0 = config['artifact_detection'].get('chirp_f0', 100)
f1 = config['artifact_detection'].get('chirp_f1', 8000)
result['chirp_f0_hz'] = int(f0)
result['chirp_f1_hz'] = int(f1)
elif signal_type == 'silent':
result['note'] = 'Silent mode - no playback, noise floor measurement'
else:
result['test_frequency_hz'] = int(frequency)
return result

182
test_artifact_detection.py Executable file
View File

@@ -0,0 +1,182 @@
#!/usr/bin/env python3
import argparse
import yaml
from datetime import datetime
from pathlib import Path
import sys
sys.path.insert(0, str(Path(__file__).parent))
from src.audio_tests import run_artifact_detection_test
def main():
parser = argparse.ArgumentParser(description='Run artifact detection test on audio loopback and radio path')
parser.add_argument('--serial-number', required=True, help='Serial number (e.g., SN001234)')
parser.add_argument('--software-version', required=True, help='Software version (git commit hash)')
parser.add_argument('--comment', default='', help='Comments about this test')
parser.add_argument('--config', default='config.yaml', help='Path to config file')
parser.add_argument('--duration', type=float, help='Override recording duration in seconds (default from config)')
parser.add_argument('--frequency', type=float, help='Override test frequency in Hz (default from config)')
parser.add_argument('--signal-type', choices=['sine', 'chirp', 'silent'], default='sine',
help='Signal type: sine (single frequency), chirp (frequency sweep), or silent (no signal)')
args = parser.parse_args()
with open(args.config, 'r') as f:
config = yaml.safe_load(f)
if args.duration:
config['artifact_detection']['duration'] = args.duration
if args.frequency:
config['artifact_detection']['test_frequency'] = args.frequency
config['artifact_detection']['signal_type'] = args.signal_type
timestamp = datetime.now()
test_id = timestamp.strftime('%Y%m%d_%H%M%S')
results_dir = Path(config['output']['results_dir'])
test_output_dir = results_dir / timestamp.strftime('%Y') / timestamp.strftime('%m') / timestamp.strftime('%d') / f"{test_id}_artifact_detection"
test_output_dir.mkdir(parents=True, exist_ok=True)
save_plots = config['output'].get('save_plots', False)
print("=" * 70)
print("ARTIFACT DETECTION TEST")
print("=" * 70)
print(f"Test ID: {test_id}")
print(f"Serial Number: {args.serial_number}")
print(f"Software: {args.software_version}")
if args.comment:
print(f"Comment: {args.comment}")
print(f"Duration: {config['artifact_detection']['duration']} seconds")
signal_type = config['artifact_detection'].get('signal_type', 'sine')
if signal_type == 'sine':
print(f"Signal Type: Sine wave @ {config['artifact_detection']['test_frequency']} Hz")
elif signal_type == 'chirp':
print(f"Signal Type: Chirp (100 Hz - 8000 Hz)")
else:
print(f"Signal Type: Silent (no playback - noise floor measurement)")
if save_plots:
print(f"Plots will be saved to: {test_output_dir}")
print("-" * 70)
print("\nDetection Algorithms:")
for detector_name, detector_settings in config['artifact_detection']['detectors'].items():
status = "ENABLED" if detector_settings.get('enabled', False) else "DISABLED"
print(f" - {detector_name}: {status}")
if detector_settings.get('enabled', False):
for param, value in detector_settings.items():
if param != 'enabled':
print(f" {param}: {value}")
print("\n" + "=" * 70)
signal_type = config['artifact_detection'].get('signal_type', 'sine')
if signal_type == 'sine':
freq = config['artifact_detection']['test_frequency']
print(f"STARTING TEST - Playing {freq}Hz sine wave and recording both channels...")
elif signal_type == 'chirp':
print("STARTING TEST - Playing chirp signal (100-8000Hz) and recording both channels...")
else:
print("STARTING TEST - Recording silence (no playback)...")
print("=" * 70)
print("\nChannel 1: Loopback path (direct audio interface loopback)")
print("Channel 2: DUT/Radio path (through beacon and radio transmission)")
print()
try:
result = run_artifact_detection_test(config, save_plots=save_plots, output_dir=test_output_dir)
print("\n" + "=" * 70)
print("TEST COMPLETE - RESULTS")
print("=" * 70)
signal_type = result.get('signal_type', 'sine')
if signal_type == 'chirp':
print(f"\n📊 Signal: Chirp {result['chirp_f0_hz']} Hz → {result['chirp_f1_hz']} Hz")
elif signal_type == 'silent':
print(f"\n📊 Signal: Silent (no playback - noise floor measurement)")
else:
print(f"\n📊 Test Frequency: {result['test_frequency_hz']} Hz")
print(f"⏱️ Duration: {result['duration_sec']} seconds")
print("\n🔊 CHANNEL 1 (LOOPBACK PATH):")
print(f" Total Artifacts: {result['channel_1_loopback']['total_artifacts']}")
print(f" Artifact Rate: {result['channel_1_loopback']['artifact_rate_per_minute']:.2f} per minute")
if result['channel_1_loopback']['artifacts_by_type']:
print(" By Type:")
for artifact_type, count in result['channel_1_loopback']['artifacts_by_type'].items():
print(f" - {artifact_type}: {count}")
# Display frequency accuracy for channel 1
if 'frequency_accuracy' in result['channel_1_loopback']:
freq_acc = result['channel_1_loopback']['frequency_accuracy']
print(f" Frequency Accuracy:")
print(f" Expected: {freq_acc['expected_freq_hz']:.1f} Hz")
print(f" Measured: {freq_acc['measured_freq_hz']:.2f} Hz")
print(f" Error: {freq_acc['error_hz']:+.2f} Hz ({freq_acc['error_percent']:+.3f}%)")
print("\n📻 CHANNEL 2 (DUT/RADIO PATH):")
print(f" Total Artifacts: {result['channel_2_dut']['total_artifacts']}")
print(f" Artifact Rate: {result['channel_2_dut']['artifact_rate_per_minute']:.2f} per minute")
if result['channel_2_dut']['artifacts_by_type']:
print(" By Type:")
for artifact_type, count in result['channel_2_dut']['artifacts_by_type'].items():
print(f" - {artifact_type}: {count}")
# Display frequency accuracy for channel 2
if 'frequency_accuracy' in result['channel_2_dut']:
freq_acc = result['channel_2_dut']['frequency_accuracy']
print(f" Frequency Accuracy:")
print(f" Expected: {freq_acc['expected_freq_hz']:.1f} Hz")
print(f" Measured: {freq_acc['measured_freq_hz']:.2f} Hz")
print(f" Error: {freq_acc['error_hz']:+.2f} Hz ({freq_acc['error_percent']:+.3f}%)")
ch1_count = result['channel_1_loopback']['total_artifacts']
ch2_count = result['channel_2_dut']['total_artifacts']
if ch2_count > ch1_count:
delta = ch2_count - ch1_count
print(f"\n⚠️ DEGRADATION DETECTED: {delta} more artifacts in radio path vs loopback")
elif ch1_count == ch2_count == 0:
print("\n✅ EXCELLENT: No artifacts detected in either path!")
else:
print(f"\n Loopback baseline: {ch1_count} artifacts")
except Exception as e:
print(f"\n❌ ERROR: {e}")
import traceback
traceback.print_exc()
result = {
'error': str(e),
'test_frequency_hz': config['artifact_detection']['test_frequency'],
'duration_sec': config['artifact_detection']['duration']
}
output_data = {
'metadata': {
'test_id': test_id,
'timestamp': timestamp.isoformat(),
'serial_number': args.serial_number,
'software_version': args.software_version,
'comment': args.comment
},
'artifact_detection_result': result
}
output_file = test_output_dir / f"{test_id}_artifact_detection_results.yaml"
with open(output_file, 'w') as f:
yaml.dump(output_data, f, default_flow_style=False, sort_keys=False)
print("\n" + "=" * 70)
print("✅ Results saved to:")
print(f" YAML: {output_file}")
if save_plots:
print(f" Summary plots: {test_output_dir}/")
print(f" Individual anomaly plots: {test_output_dir}/individual_anomalies/")
print("=" * 70)
if __name__ == '__main__':
main()

View File

@@ -6,16 +6,16 @@ from pathlib import Path
import sys
sys.path.insert(0, str(Path(__file__).parent))
from src.audio_tests import run_single_test, run_latency_test
from src.audio_tests import run_latency_test
def main():
parser = argparse.ArgumentParser(description='Run PCB hardware audio tests')
parser.add_argument('--pcb-version', required=True, help='PCB version (e.g., v2.1)')
parser.add_argument('--pcb-revision', required=True, help='PCB revision (e.g., A, B, C)')
parser = argparse.ArgumentParser(description='Run latency test on audio loopback and radio path')
parser.add_argument('--serial-number', required=True, help='Serial number (e.g., SN001234)')
parser.add_argument('--software-version', required=True, help='Software version (git commit hash)')
parser.add_argument('--notes', default='', help='Adjustments or comments about this test')
parser.add_argument('--comment', default='', help='Comments about this test')
parser.add_argument('--config', default='config.yaml', help='Path to config file')
parser.add_argument('--measurements', type=int, default=5, help='Number of latency measurements (default: 5)')
args = parser.parse_args()
@@ -26,61 +26,47 @@ def main():
test_id = timestamp.strftime('%Y%m%d_%H%M%S')
results_dir = Path(config['output']['results_dir'])
results_dir.mkdir(exist_ok=True)
test_output_dir = results_dir / test_id
test_output_dir.mkdir(exist_ok=True)
test_output_dir = results_dir / timestamp.strftime('%Y') / timestamp.strftime('%m') / timestamp.strftime('%d') / f"{test_id}_latency"
test_output_dir.mkdir(parents=True, exist_ok=True)
save_plots = config['output'].get('save_plots', False)
print(f"Starting audio test run: {test_id}")
print(f"PCB: {args.pcb_version} Rev {args.pcb_revision}")
print(f"Starting latency test: {test_id}")
print(f"Serial Number: {args.serial_number}")
print(f"Software: {args.software_version}")
if args.comment:
print(f"Comment: {args.comment}")
print(f"Measurements: {args.measurements}")
if save_plots:
print(f"Plots will be saved to: {test_output_dir}")
print("-" * 60)
print("\n[1/2] Running chirp-based latency test (5 measurements)...")
print(f"\nRunning chirp-based latency test ({args.measurements} measurements)...")
try:
latency_stats = run_latency_test(config, num_measurements=5,
latency_stats = run_latency_test(config, num_measurements=args.measurements,
save_plots=save_plots, output_dir=test_output_dir)
print(f"✓ Latency: avg={latency_stats['avg']:.3f}ms, "
f"min={latency_stats['min']:.3f}ms, max={latency_stats['max']:.3f}ms")
valid = latency_stats.get('valid', True)
status = "PASS" if valid else "FAIL"
print(f"{'' if valid else ''} Latency [{status}]: avg={latency_stats['avg']:.3f}ms, "
f"min={latency_stats['min']:.3f}ms, max={latency_stats['max']:.3f}ms, "
f"std={latency_stats['std']:.3f}ms")
except Exception as e:
print(f"✗ Error: {e}")
latency_stats = {'avg': 0.0, 'min': 0.0, 'max': 0.0, 'std': 0.0, 'error': str(e)}
print("\n[2/2] Running frequency sweep tests...")
test_results = []
frequencies = config['test_tones']['frequencies']
for i, freq in enumerate(frequencies, 1):
print(f"Testing frequency {i}/{len(frequencies)}: {freq} Hz...", end=' ', flush=True)
try:
result = run_single_test(freq, config, save_plots=save_plots, output_dir=test_output_dir)
test_results.append(result)
print("")
except Exception as e:
print(f"✗ Error: {e}")
test_results.append({
'frequency_hz': freq,
'error': str(e)
})
output_data = {
'metadata': {
'test_id': test_id,
'timestamp': timestamp.isoformat(),
'pcb_version': args.pcb_version,
'pcb_revision': args.pcb_revision,
'serial_number': args.serial_number,
'software_version': args.software_version,
'notes': args.notes
'comment': args.comment
},
'latency_test': latency_stats,
'test_results': test_results
'latency_test': latency_stats
}
output_file = results_dir / f"{test_id}_results.yaml"
output_file = test_output_dir / f"{test_id}_latency_results.yaml"
with open(output_file, 'w') as f:
yaml.dump(output_data, f, default_flow_style=False, sort_keys=False)

361
test_latency_buildup.py Executable file
View File

@@ -0,0 +1,361 @@
#!/usr/bin/env python3
import argparse
import yaml
import time
import signal
import sys
from datetime import datetime
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
sys.path.insert(0, str(Path(__file__).parent))
from src.audio_tests import run_latency_test, find_audio_device, generate_chirp, play_and_record, calculate_latency
class LatencyBuildupTest:
def __init__(self, config, measurement_interval=30, max_duration=None):
self.config = config
self.measurement_interval = measurement_interval
self.max_duration = max_duration
self.running = False
self.measurements = []
self.start_time = None
def signal_handler(self, signum, frame):
print(f"\n\n{'='*70}")
print("TEST STOPPED - Generating final results...")
print(f"{'='*70}")
self.running = False
def run_single_latency_measurement(self):
"""Run a single latency measurement and return the result"""
try:
# Use existing latency test function with 1 measurement for speed
latency_stats = run_latency_test(self.config, num_measurements=1,
save_plots=False, output_dir=None)
return latency_stats['avg']
except Exception as e:
print(f"❌ Error in latency measurement: {e}")
return None
def analyze_buildup(self, latencies, timestamps):
"""Analyze latency build-up and return analysis results"""
if len(latencies) < 2:
return {
'buildup_detected': False,
'start_latency': 0,
'end_latency': 0,
'change_ms': 0,
'change_percent': 0,
'trend': 'insufficient_data'
}
start_latency = latencies[0]
end_latency = latencies[-1]
change_ms = end_latency - start_latency
change_percent = (change_ms / start_latency) * 100 if start_latency > 0 else 0
# Determine if buildup occurred (±5% threshold)
buildup_detected = abs(change_percent) > 5.0
# Calculate trend using linear regression
if len(latencies) >= 3:
x = np.arange(len(latencies))
y = np.array(latencies)
slope = np.polyfit(x, y, 1)[0]
if slope > 0.01: # Positive trend
trend = 'increasing'
elif slope < -0.01: # Negative trend
trend = 'decreasing'
else:
trend = 'stable'
else:
trend = 'insufficient_data'
return {
'buildup_detected': buildup_detected,
'start_latency': start_latency,
'end_latency': end_latency,
'change_ms': change_ms,
'change_percent': change_percent,
'trend': trend
}
def plot_latency_buildup(self, timestamps, latencies, output_dir):
"""Create and save latency over time plot"""
fig, ax = plt.subplots(1, 1, figsize=(12, 6))
# Convert timestamps to relative time in seconds
relative_times = [(t - timestamps[0]).total_seconds() for t in timestamps]
# Plot latency measurements
ax.plot(relative_times, latencies, 'b-o', markersize=4, linewidth=2, label='Latency Measurements')
# Add trend line if we have enough data
if len(latencies) >= 3:
x = np.array(relative_times)
y = np.array(latencies)
z = np.polyfit(x, y, 1)
p = np.poly1d(z)
ax.plot(x, p(x), "r--", alpha=0.8, linewidth=2, label=f'Trend: {z[0]:.4f} ms/s')
# Add reference line for start latency
ax.axhline(y=latencies[0], color='g', linestyle=':', alpha=0.7,
label=f'Start: {latencies[0]:.3f} ms')
ax.set_xlabel('Time (seconds)', fontsize=12)
ax.set_ylabel('Latency (ms)', fontsize=12)
ax.set_title('Latency Build-up Over Time', fontsize=14, fontweight='bold')
ax.grid(True, alpha=0.3)
ax.legend()
# Format y-axis to show reasonable precision
ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f'{x:.3f}'))
plt.tight_layout()
plot_file = output_dir / 'latency_buildup_graph.png'
plt.savefig(plot_file, dpi=150, bbox_inches='tight')
plt.close()
return plot_file
def run_test(self):
"""Run the latency build-up test"""
self.running = True
self.start_time = datetime.now()
print(f"Starting latency build-up test at {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Measurement interval: {self.measurement_interval} seconds")
if self.max_duration:
print(f"Maximum duration: {self.max_duration} seconds")
print("Press Ctrl+C to stop the test early")
print("=" * 70)
# Set up signal handler for graceful shutdown
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
measurement_count = 0
while self.running:
current_time = datetime.now()
measurement_count += 1
print(f"\n[{current_time.strftime('%H:%M:%S')}] Measurement #{measurement_count}")
# Perform latency measurement
latency = self.run_single_latency_measurement()
if latency is not None:
self.measurements.append((current_time, latency))
timestamps, latencies = zip(*self.measurements)
# Calculate current statistics
avg_latency = np.mean(latencies)
min_latency = np.min(latencies)
max_latency = np.max(latencies)
std_latency = np.std(latencies)
print(f" Current latency: {latency:.3f} ms")
print(f" Average so far: {avg_latency:.3f} ms")
print(f" Range: {min_latency:.3f} - {max_latency:.3f} ms")
print(f" Std deviation: {std_latency:.3f} ms")
# Analyze for buildup
analysis = self.analyze_buildup(latencies, timestamps)
if analysis['buildup_detected']:
print(f" ⚠️ BUILDUP DETECTED: {analysis['change_percent']:+.2f}% "
f"({analysis['change_ms']:+.3f} ms)")
else:
print(f" ✅ No significant buildup: {analysis['change_percent']:+.2f}%")
print(f" Trend: {analysis['trend']}")
else:
print(f" ❌ Measurement failed")
# Check if we should continue
if self.max_duration:
elapsed = (current_time - self.start_time).total_seconds()
if elapsed >= self.max_duration:
print(f"\nMaximum duration of {self.max_duration} seconds reached")
break
if not self.running:
break
# Wait for next measurement with interruptible sleep
if self.running:
print(f" Waiting {self.measurement_interval} seconds...")
# Sleep in smaller chunks to allow quick interruption
sleep_chunk = 1.0 # Check every second
time_slept = 0
while self.running and time_slept < self.measurement_interval:
time.sleep(min(sleep_chunk, self.measurement_interval - time_slept))
time_slept += sleep_chunk
return self.generate_results()
def generate_results(self):
"""Generate final results and analysis"""
if not self.measurements:
return {'error': 'No measurements completed'}
timestamps, latencies = zip(*self.measurements)
end_time = datetime.now()
total_duration = (end_time - self.start_time).total_seconds()
# Final analysis
analysis = self.analyze_buildup(latencies, timestamps)
# Statistics
stats = {
'count': len(latencies),
'avg_ms': float(np.mean(latencies)),
'min_ms': float(np.min(latencies)),
'max_ms': float(np.max(latencies)),
'std_ms': float(np.std(latencies)),
'range_ms': float(np.max(latencies) - np.min(latencies))
}
results = {
'test_metadata': {
'start_time': self.start_time.isoformat(),
'end_time': end_time.isoformat(),
'total_duration_sec': total_duration,
'measurement_interval_sec': self.measurement_interval,
'total_measurements': len(latencies)
},
'latency_measurements': [
{
'timestamp': t.isoformat(),
'latency_ms': float(l)
}
for t, l in self.measurements
],
'statistics': stats,
'buildup_analysis': analysis
}
return results
def main():
parser = argparse.ArgumentParser(description='Run latency build-up test over time')
parser.add_argument('--serial-number', required=True, help='Serial number (e.g., SN001234)')
parser.add_argument('--software-version', required=True, help='Software version (git commit hash)')
parser.add_argument('--comment', default='', help='Comments about this test')
parser.add_argument('--config', default='config.yaml', help='Path to config file')
parser.add_argument('--interval', type=int, help='Measurement interval in seconds (default from config)')
parser.add_argument('--duration', type=int, help='Maximum test duration in seconds (default: run until canceled)')
args = parser.parse_args()
with open(args.config, 'r') as f:
config = yaml.safe_load(f)
# Use config values as defaults if not overridden by command line
measurement_interval = args.interval if args.interval else config['latency_buildup']['measurement_interval']
max_duration = args.duration if args.duration else config['latency_buildup'].get('max_duration')
timestamp = datetime.now()
test_id = timestamp.strftime('%Y%m%d_%H%M%S')
results_dir = Path(config['output']['results_dir'])
test_output_dir = results_dir / timestamp.strftime('%Y') / timestamp.strftime('%m') / timestamp.strftime('%d') / f"{test_id}_latency_buildup"
test_output_dir.mkdir(parents=True, exist_ok=True)
save_plots = config['output'].get('save_plots', False)
print("=" * 70)
print("LATENCY BUILD-UP TEST")
print("=" * 70)
print(f"Test ID: {test_id}")
print(f"Serial Number: {args.serial_number}")
print(f"Software: {args.software_version}")
if args.comment:
print(f"Comment: {args.comment}")
print(f"Measurement Interval: {measurement_interval} seconds")
if max_duration:
print(f"Maximum Duration: {max_duration} seconds")
else:
print("Duration: Run until canceled (Ctrl+C)")
if save_plots:
print(f"Plots will be saved to: {test_output_dir}")
print("-" * 70)
# Create and run test
test = LatencyBuildupTest(config, measurement_interval=measurement_interval, max_duration=max_duration)
results = test.run_test()
# Display final results
print("\n" + "=" * 70)
print("TEST COMPLETE - FINAL RESULTS")
print("=" * 70)
if 'error' in results:
print(f"❌ Test failed: {results['error']}")
else:
metadata = results['test_metadata']
stats = results['statistics']
analysis = results['buildup_analysis']
print(f"\n📊 Test Summary:")
print(f" Duration: {metadata['total_duration_sec']:.1f} seconds")
print(f" Measurements: {metadata['total_measurements']}")
print(f" Interval: {metadata['measurement_interval_sec']} seconds")
print(f"\n⏱️ Latency Statistics:")
print(f" Average: {stats['avg_ms']:.3f} ms")
print(f" Range: {stats['min_ms']:.3f} - {stats['max_ms']:.3f} ms")
print(f" Std Dev: {stats['std_ms']:.3f} ms")
print(f"\n📈 Build-up Analysis:")
print(f" Start Latency: {analysis['start_latency']:.3f} ms")
print(f" End Latency: {analysis['end_latency']:.3f} ms")
print(f" Change: {analysis['change_ms']:+.3f} ms ({analysis['change_percent']:+.2f}%)")
print(f" Trend: {analysis['trend']}")
if analysis['buildup_detected']:
print(f"\n⚠️ BUILDUP DETECTED!")
print(f" Latency changed by {abs(analysis['change_percent']):.2f}% (threshold: ±5%)")
else:
print(f"\n✅ No significant buildup detected")
print(f" Latency change within acceptable range (±5%)")
# Generate and save plot
if save_plots and len(results['latency_measurements']) > 1:
timestamps = [datetime.fromisoformat(m['timestamp']) for m in results['latency_measurements']]
latencies = [m['latency_ms'] for m in results['latency_measurements']]
plot_file = test.plot_latency_buildup(timestamps, latencies, test_output_dir)
print(f"\n📊 Latency graph saved to: {plot_file}")
# Save results to file
output_data = {
'metadata': {
'test_id': test_id,
'timestamp': timestamp.isoformat(),
'serial_number': args.serial_number,
'software_version': args.software_version,
'comment': args.comment
},
'latency_buildup_result': results
}
output_file = test_output_dir / f"{test_id}_latency_buildup_results.yaml"
with open(output_file, 'w') as f:
yaml.dump(output_data, f, default_flow_style=False, sort_keys=False)
print("\n" + "=" * 70)
print("✅ Results saved to:")
print(f" YAML: {output_file}")
if save_plots and len(results.get('latency_measurements', [])) > 1:
print(f" Graph: {test_output_dir}/latency_buildup_graph.png")
print("=" * 70)
if __name__ == '__main__':
main()

View File

@@ -88,7 +88,7 @@ def display_results(yaml_file: Path):
def list_all_results(results_dir: Path):
yaml_files = sorted(results_dir.glob("*_results.yaml"))
yaml_files = sorted(results_dir.rglob("*_results.yaml"))
if not yaml_files:
print("No test results found.")