Compare commits
8 Commits
8d3a144614
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 530d58440f | |||
| 8f44cf56d4 | |||
| fdfe43b6b9 | |||
|
|
cc8766b278 | ||
|
|
0c7de92ae9 | ||
|
|
dd118ddb23 | ||
|
|
31cc2c0e92 | ||
|
|
2cab55c8cd |
@@ -52,3 +52,72 @@ Again input it into the audio interface and measure both loopback and radio path
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
============
|
||||||
|
|
||||||
|
Implement Matrix test
|
||||||
|
|
||||||
|
|
||||||
|
Test:
|
||||||
|
|
||||||
|
Fast / Robust
|
||||||
|
|
||||||
|
16k / 24k / 48k
|
||||||
|
|
||||||
|
Mono / Stereo
|
||||||
|
|
||||||
|
Presentation Delay 10 / 20 / 40 / 80
|
||||||
|
|
||||||
|
|
||||||
|
For each combination test:
|
||||||
|
|
||||||
|
Latency
|
||||||
|
|
||||||
|
Latency buildup yes/no
|
||||||
|
|
||||||
|
Maybe: Audio quality BUT this way test gets really long.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
Plot a table with the results, also compare to 'baseline' measurement.
|
||||||
|
|
||||||
|
Use the existing tests as a guideline how to save the results.
|
||||||
|
|
||||||
|
For setting the parameters for the tests use the API:
|
||||||
|
http://beacon29.local:5000/init
|
||||||
|
curl -X 'POST' \ 'http://beacon29.local:5000/init' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ "qos_config": { "iso_int_multiple_10ms": 1, "number_of_retransmissions": 2, "max_transport_latency_ms": 23 }, "debug": false, "device_name": "Auracaster", "transport": "", "auracast_device_address": "F0:F1:F2:F3:F4:F5", "auracast_sampling_rate_hz": 16000, "octets_per_frame": 160, "frame_duration_us": 10000, "presentation_delay_us": 10000, "manufacturer_data": [ null, null ], "immediate_rendering": false, "assisted_listening_stream": false, "bigs": [ { "id": 12, "random_address": "F1:F1:F2:F3:F4:F5", "language": "deu", "name": "Broadcast0", "program_info": "Vorlesung DE", "audio_source": "device:ch1", "input_format": "auto", "loop": true, "precode_wav": false, "iso_que_len": 1, "num_bis": 1, "input_gain_db": 0 } ], "analog_gain": 50 }'
|
||||||
|
|
||||||
|
It has to have the name Broadcast0.
|
||||||
|
qos fast is "number_of_retransmissions": 2, "max_transport_latency_ms": 23
|
||||||
|
qos robust is "number_of_retransmissions": 4, "max_transport_latency_ms": 43
|
||||||
|
Mono is "num_bis": 1
|
||||||
|
Stereo is "num_bis": 2
|
||||||
|
16k is "auracast_sampling_rate_hz": 16000, "octets_per_frame": 40
|
||||||
|
24k is "auracast_sampling_rate_hz": 24000, "octets_per_frame": 60
|
||||||
|
48k is "auracast_sampling_rate_hz": 48000, "octets_per_frame": 120
|
||||||
|
|
||||||
|
The results shall be plotted as a table:
|
||||||
|
Presentation delay 10 / 20 / 40 / 80
|
||||||
|
Mono Stereo Mono Stereo Mono Stereo ...
|
||||||
|
x
|
||||||
|
Fast 16k
|
||||||
|
Fast 24k
|
||||||
|
Fast 48k
|
||||||
|
Robust 16k
|
||||||
|
Robust 24k
|
||||||
|
Robust 48k
|
||||||
|
|
||||||
|
For each combination you have to run the latency test. If the test fails print fail. Else print the ms value.
|
||||||
|
Optional: Also run the build up test for 20 secs. As a result just print if there is a buildup or not.
|
||||||
|
Optional: Also run the quality test for 3 min per combination and display the err/min.
|
||||||
|
|
||||||
|
The result shall be saved as a yaml (like in all the other scripts).
|
||||||
|
It is important to save the API call as well.
|
||||||
|
|
||||||
|
And create an image with the table.
|
||||||
|
|
||||||
|
There should be a feature to compare this measurement to a 'baseline' measurement.
|
||||||
|
Failed tests should be colored red.
|
||||||
|
Tests significantly worse than the baseline in orange.
|
||||||
|
And better values in green.
|
||||||
|
No change should be just white.
|
||||||
27
config.yaml
27
config.yaml
@@ -31,13 +31,38 @@ artifact_detection:
|
|||||||
threshold_db: -60 # Detect unexpected frequencies above noise floor + this threshold (more negative = less sensitive)
|
threshold_db: -60 # Detect unexpected frequencies above noise floor + this threshold (more negative = less sensitive)
|
||||||
amplitude_spikes:
|
amplitude_spikes:
|
||||||
enabled: true
|
enabled: true
|
||||||
threshold_factor: 5.0 # MAD-based outlier detection on envelope (detects clicks, pops, dropouts). Lower = more sensitive.
|
threshold_factor: 10.0 # MAD-based outlier detection on envelope (detects clicks, pops, dropouts). Lower = more sensitive.
|
||||||
zero_crossing:
|
zero_crossing:
|
||||||
enabled: false
|
enabled: false
|
||||||
threshold_factor: 2.0 # Number of standard deviations for zero-crossing anomalies (detects distortion)
|
threshold_factor: 2.0 # Number of standard deviations for zero-crossing anomalies (detects distortion)
|
||||||
energy_variation:
|
energy_variation:
|
||||||
enabled: true
|
enabled: true
|
||||||
threshold_db: 6.0 # Energy change threshold in dB between consecutive windows (detects level changes)
|
threshold_db: 6.0 # Energy change threshold in dB between consecutive windows (detects level changes)
|
||||||
|
null_test:
|
||||||
|
enabled: true
|
||||||
|
# Align Ch2 (DUT) to Ch1 (Loopback), subtract, detect bursts in residual
|
||||||
|
max_lag_ms: 500.0 # Maximum expected delay between channels for alignment search
|
||||||
|
window_ms: 5.0 # Short-time RMS window length for burst detection
|
||||||
|
threshold_factor: 6.0 # Flag windows where residual RMS exceeds baseline × this factor
|
||||||
|
min_burst_ms: 0.5 # Minimum burst duration to report (filters out single-sample spikes)
|
||||||
|
sample_slip_detection: true
|
||||||
|
sample_slip_window_ms: 50.0 # Correlation window for xcorr-based lag tracking (fallback only)
|
||||||
|
# Sync marker: one chirp burst at the start and one at the end of the played signal.
|
||||||
|
# Marker-based alignment is immune to periodic-signal ambiguity (e.g. pure sine).
|
||||||
|
# Sample slip detection compares the lag at the start marker vs. the end marker.
|
||||||
|
marker_duration_sec: 0.05 # Length of each chirp marker burst
|
||||||
|
marker_first_offset_sec: 0.5 # Offset from signal start (and from signal end) for markers
|
||||||
|
marker_f0: 200.0 # Marker chirp start frequency (Hz)
|
||||||
|
marker_f1: 16000.0 # Marker chirp end frequency (Hz) — wider BW = sharper correlation peak
|
||||||
|
marker_amplitude: 0.7 # Marker amplitude (mixed on top of test tone)
|
||||||
|
# Sample slip threshold: only report if inter-marker lag deviates from median by >= this many samples.
|
||||||
|
# Rule of thumb: peak timing precision ≈ 1 / (marker_f1 - marker_f0) * sample_rate
|
||||||
|
# With 200-16000 Hz BW at 44100 Hz: precision ≈ 3 samples → min_slip_samples = 5 gives good margin.
|
||||||
|
min_slip_samples: 5
|
||||||
|
|
||||||
|
latency:
|
||||||
|
max_std_dev_ms: 1.0 # Maximum allowed std deviation; test fails if exceeded
|
||||||
|
min_avg_ms: 1.0 # Minimum expected average latency; near-zero indicates bad loopback
|
||||||
|
|
||||||
latency_buildup:
|
latency_buildup:
|
||||||
measurement_interval: 10 # seconds between latency measurements
|
measurement_interval: 10 # seconds between latency measurements
|
||||||
|
|||||||
92
plot_alsa_status.py
Normal file
92
plot_alsa_status.py
Normal file
@@ -0,0 +1,92 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Parse ALSA status log file and plot avail value over time."""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import re
|
||||||
|
import os
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
|
||||||
|
|
||||||
|
TIMESTAMP_RE = re.compile(r"^===== (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+) =====")
|
||||||
|
AVAIL_RE = re.compile(r"^avail\s*:\s*(\d+)")
|
||||||
|
|
||||||
|
|
||||||
|
def parse_log(log_path):
    """Extract (relative-seconds, avail) pairs from an ALSA status log.

    Each ``===== <timestamp> =====`` header is paired with the first
    ``avail : N`` line that follows it.  Exits the process with status 1
    when no pair can be found.
    """
    pairs = []  # (datetime, avail) tuples in file order
    pending_ts = None
    with open(log_path, "r") as handle:
        for raw in handle:
            stripped = raw.strip()

            header = TIMESTAMP_RE.match(stripped)
            if header:
                pending_ts = datetime.strptime(header.group(1), "%Y-%m-%d %H:%M:%S.%f")
            elif pending_ts:
                value = AVAIL_RE.match(stripped)
                if value:
                    pairs.append((pending_ts, int(value.group(1))))
                    pending_ts = None  # wait for the next header

    if not pairs:
        print("No valid timestamp/avail pairs found in the log file.", file=sys.stderr)
        sys.exit(1)

    # Convert absolute timestamps to seconds relative to the first sample.
    timestamps = [ts for ts, _ in pairs]
    avail_values = [v for _, v in pairs]
    t0 = timestamps[0]
    seconds = [(t - t0).total_seconds() for t in timestamps]
    return seconds, avail_values
|
||||||
|
|
||||||
|
|
||||||
|
def plot(seconds, avail_values, out_path):
    """Plot avail vs. time (plus an adaptive moving mean) and save a PNG."""
    plt.figure(figsize=(12, 6))
    plt.plot(seconds, avail_values, label="avail", linewidth=1, alpha=0.7)

    # Overlay a smoothed trend when there is enough data for a window.
    if len(avail_values) >= 10:
        window_size = min(50, len(avail_values) // 10)  # adaptive window size
        import numpy as np
        kernel = np.ones(window_size) / window_size
        moving_avg = np.convolve(avail_values, kernel, mode='valid')
        # 'valid' convolution shortens the series; drop the first
        # window_size-1 x values so both series line up.
        ma_seconds = seconds[window_size-1:]
        plt.plot(ma_seconds, moving_avg, label=f"moving mean (window={window_size})", linewidth=2)

    plt.xlabel("Time (s)")
    plt.ylabel("Available samples")
    plt.title("ALSA Available Samples Over Time")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    print(f"Plot saved to {out_path}")
|
||||||
|
|
||||||
|
|
||||||
|
def main():
    """CLI entry point: validate the argument, parse the log, write the plot."""
    if len(sys.argv) != 2:
        print(f"Usage: {sys.argv[0]} <path_to_alsa_status_log>", file=sys.stderr)
        sys.exit(1)

    log_path = sys.argv[1]
    if not os.path.isfile(log_path):
        print(f"File not found: {log_path}", file=sys.stderr)
        sys.exit(1)

    seconds, avail_values = parse_log(log_path)

    # The PNG is written next to the log, named <log-stem>_avail_plot.png.
    folder = os.path.dirname(os.path.abspath(log_path))
    stem = os.path.splitext(os.path.basename(log_path))[0]
    out_path = os.path.join(folder, f"{stem}_avail_plot.png")

    plot(seconds, avail_values, out_path)
|
||||||
|
|
||||||
|
|
||||||
|
# Script entry point.
if __name__ == "__main__":
    main()
|
||||||
302
plot_combined.py
Normal file
302
plot_combined.py
Normal file
@@ -0,0 +1,302 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Combine ALSA avail, perf metrics, and latency plots into one figure."""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import re
|
||||||
|
import os
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
|
# Regex patterns
|
||||||
|
TIMESTAMP_RE = re.compile(r"^===== (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+) =====")
|
||||||
|
AVAIL_RE = re.compile(r"^avail\s*:\s*(\d+)")
|
||||||
|
PERF_RE = re.compile(
|
||||||
|
r"^(\w+ \d+ \d+:\d+:\d+) .* Perf\(.*?\):"
|
||||||
|
r".*?sample mean=([\d.]+)ms"
|
||||||
|
r".*?write mean=([\d.]+)ms"
|
||||||
|
r".*?loop mean=([\d.]+)ms"
|
||||||
|
)
|
||||||
|
LATENCY_RE = re.compile(r"^(\w+ \d+ \d+:\d+:\d+).*latency.*?(\d+(?:\.\d+)?)ms")
|
||||||
|
PYALSA_AVAIL_BEFORE_RE = re.compile(r"^(\w+ \d+ \d+:\d+:\d+).*PyALSA: avail before read: (\d+)")
|
||||||
|
PYALSA_AVAIL_AFTER_RE = re.compile(r"^(\w+ \d+ \d+:\d+:\d+).*PyALSA: .* avail=(\d+)")
|
||||||
|
|
||||||
|
|
||||||
|
def parse_alsa_status(log_path):
    """Return ([seconds-from-first-sample], [avail]) from an ALSA status log.

    Unlike ``parse_log`` in plot_alsa_status.py, an empty log yields
    ``([], [])`` instead of terminating the process.
    """
    samples = []  # (datetime, avail) pairs in file order
    pending_ts = None
    with open(log_path, "r") as handle:
        for raw in handle:
            text = raw.strip()

            header = TIMESTAMP_RE.match(text)
            if header:
                pending_ts = datetime.strptime(header.group(1), "%Y-%m-%d %H:%M:%S.%f")
            elif pending_ts:
                value = AVAIL_RE.match(text)
                if value:
                    samples.append((pending_ts, int(value.group(1))))
                    pending_ts = None  # wait for the next header

    if not samples:
        return [], []

    t0 = samples[0][0]
    seconds = [(ts - t0).total_seconds() for ts, _ in samples]
    avail_values = [v for _, v in samples]
    return seconds, avail_values
|
||||||
|
|
||||||
|
|
||||||
|
def parse_perf_log(log_path):
    """Parse ``Perf(...)`` log lines into relative times and mean series.

    Returns ``(seconds, sample_means, write_means, loop_means)``; all four
    lists are empty when no line matched PERF_RE.
    """
    timestamps = []
    sample_means = []
    write_means = []
    loop_means = []

    # Syslog-style timestamps carry no year; attach the current one so the
    # parsed datetimes agree with the other parsers in this module
    # (parse_pyalsa_avail and the re-parse in plot_combined do the same).
    # The strptime default year 1900 would also reject "Feb 29" lines.
    # NOTE(review): logs spanning a year boundary still mis-order —
    # acceptable for short captures.
    current_year = datetime.now().year

    with open(log_path, "r") as f:
        for line in f:
            m = PERF_RE.search(line)
            if m:
                ts_str, sample, write, loop = m.groups()
                ts = datetime.strptime(f"{current_year} {ts_str}", "%Y %b %d %H:%M:%S")
                timestamps.append(ts)
                sample_means.append(float(sample))
                write_means.append(float(write))
                loop_means.append(float(loop))

    if not timestamps:
        return [], [], [], []

    # Seconds relative to the first matched line.
    t0 = timestamps[0]
    seconds = [(t - t0).total_seconds() for t in timestamps]
    return seconds, sample_means, write_means, loop_means
|
||||||
|
|
||||||
|
|
||||||
|
def parse_pyalsa_avail(perf_file):
    """Parse PyALSA avail before/after read from the perf log file."""
    before_timestamps = []
    before_values = []
    after_timestamps = []
    after_values = []

    def _stamp(ts_str):
        # Syslog timestamps lack a year; assume the current one.
        return datetime.strptime(f"{datetime.now().year} {ts_str}", "%Y %b %d %H:%M:%S")

    with open(perf_file, "r") as f:
        for raw in f:
            text = raw.strip()

            # "avail before read" lines
            before = PYALSA_AVAIL_BEFORE_RE.match(text)
            if before:
                ts_str, avail = before.groups()
                before_timestamps.append(_stamp(ts_str))
                before_values.append(int(avail))
                continue

            # "avail=" (after read) lines
            after = PYALSA_AVAIL_AFTER_RE.match(text)
            if after:
                ts_str, avail = after.groups()
                after_timestamps.append(_stamp(ts_str))
                after_values.append(int(avail))

    return before_timestamps, before_values, after_timestamps, after_values
|
||||||
|
|
||||||
|
|
||||||
|
def parse_latency_yaml(yaml_path):
    """Return ([seconds-from-first], [latency_ms]) from a latency-buildup YAML."""
    import yaml

    with open(yaml_path, 'r') as f:
        data = yaml.safe_load(f)

    measurements = data.get('latency_buildup_result', {}).get('latency_measurements', [])

    # ISO timestamps paired with their latency values, in file order.
    parsed = [
        (datetime.fromisoformat(m['timestamp']), float(m['latency_ms']))
        for m in measurements
    ]

    if not parsed:
        return [], []

    timestamps = [ts for ts, _ in parsed]
    latencies = [lat for _, lat in parsed]
    t0 = timestamps[0]
    return [(t - t0).total_seconds() for t in timestamps], latencies
|
||||||
|
|
||||||
|
|
||||||
|
def plot_combined(alsa_file, perf_file, latency_file, out_path):
    """Render a 4-panel figure: ALSA avail, perf means, latency, PyALSA avail.

    Every series is re-aligned onto a shared time axis whose zero is the
    earliest absolute timestamp found across all inputs.  The figure is
    saved to *out_path* and then shown interactively.
    """
    # Parse all logs
    # NOTE: the relative-seconds returns (alsa_seconds, perf_seconds,
    # latency_seconds) are not used below — the absolute-timestamp
    # re-parses further down produce the aligned x axes instead.
    alsa_seconds, avail_values = parse_alsa_status(alsa_file)
    perf_seconds, sample_means, write_means, loop_means = parse_perf_log(perf_file)
    latency_seconds, latencies = parse_latency_yaml(latency_file)

    # Parse PyALSA avail data
    before_timestamps, before_values, after_timestamps, after_values = parse_pyalsa_avail(perf_file)

    # Get absolute timestamps for proper alignment
    alsa_timestamps = []
    perf_timestamps = []
    latency_timestamps = []

    # Re-parse to get absolute timestamps for alignment
    with open(alsa_file, "r") as f:
        current_timestamp = None
        for line in f:
            line = line.strip()
            ts_match = TIMESTAMP_RE.match(line)
            if ts_match:
                current_timestamp = datetime.strptime(ts_match.group(1), "%Y-%m-%d %H:%M:%S.%f")
                continue
            if current_timestamp:
                avail_match = AVAIL_RE.match(line)
                if avail_match:
                    alsa_timestamps.append(current_timestamp)
                    current_timestamp = None

    with open(perf_file, "r") as f:
        for line in f:
            m = PERF_RE.search(line)
            if m:
                ts_str = m.group(1)
                # Add current year to the timestamp since it doesn't include year
                current_year = datetime.now().year
                ts_with_year = f"{current_year} {ts_str}"
                ts = datetime.strptime(ts_with_year, "%Y %b %d %H:%M:%S")
                perf_timestamps.append(ts)

    import yaml
    with open(latency_file, 'r') as f:
        data = yaml.safe_load(f)
    latency_measurements = data.get('latency_buildup_result', {}).get('latency_measurements', [])
    for measurement in latency_measurements:
        ts_str = measurement['timestamp']
        ts = datetime.fromisoformat(ts_str)
        latency_timestamps.append(ts)

    # Find earliest timestamp
    # NOTE(review): min() raises ValueError when every source is empty —
    # presumably at least one input always has data; confirm upstream.
    all_abs_timestamps = []
    if alsa_timestamps:
        all_abs_timestamps.extend(alsa_timestamps)
    if perf_timestamps:
        all_abs_timestamps.extend(perf_timestamps)
    if latency_timestamps:
        all_abs_timestamps.extend(latency_timestamps)
    if before_timestamps:
        all_abs_timestamps.extend(before_timestamps)
    if after_timestamps:
        all_abs_timestamps.extend(after_timestamps)

    t0_absolute = min(all_abs_timestamps)

    # Convert all times to seconds from earliest timestamp
    alsa_aligned = [(ts - t0_absolute).total_seconds() for ts in alsa_timestamps] if alsa_timestamps else []
    perf_aligned = [(ts - t0_absolute).total_seconds() for ts in perf_timestamps] if perf_timestamps else []
    latency_aligned = [(ts - t0_absolute).total_seconds() for ts in latency_timestamps] if latency_timestamps else []
    before_aligned = [(ts - t0_absolute).total_seconds() for ts in before_timestamps] if before_timestamps else []
    after_aligned = [(ts - t0_absolute).total_seconds() for ts in after_timestamps] if after_timestamps else []

    # Create figure with 4 subplots sharing x-axis
    fig, (ax1, ax2, ax3, ax4) = plt.subplots(4, 1, figsize=(14, 12), sharex=True)
    fig.suptitle("Combined Audio Performance Metrics", fontsize=16)

    # Plot 1: ALSA avail
    if alsa_aligned and avail_values:
        ax1.plot(alsa_aligned, avail_values, label="avail", linewidth=1, alpha=0.7, color='blue')
        if len(avail_values) >= 10:
            window_size = min(50, len(avail_values) // 10)
            moving_avg = np.convolve(avail_values, np.ones(window_size)/window_size, mode='valid')
            # 'valid' convolution shortens the series; trim the x values to match.
            ma_seconds = alsa_aligned[window_size-1:]
            ax1.plot(ma_seconds, moving_avg, label=f"moving mean (window={window_size})",
                     linewidth=2, color='darkblue')
        ax1.set_ylabel("Available samples")
        ax1.set_title("ALSA Available Samples")
        ax1.legend()
        ax1.grid(True, alpha=0.3)

    # Plot 2: Perf metrics
    if perf_aligned:
        ax2.plot(perf_aligned, sample_means, label="sample mean", linewidth=1, alpha=0.8, color='green')
        ax2.plot(perf_aligned, write_means, label="write mean", linewidth=1, alpha=0.8, color='orange')
        ax2.plot(perf_aligned, loop_means, label="loop mean", linewidth=1, alpha=0.8, color='red')

        # Add moving average for loop mean
        if len(loop_means) >= 10:
            window_size = min(50, len(loop_means) // 10)
            moving_avg = np.convolve(loop_means, np.ones(window_size)/window_size, mode='valid')
            ma_seconds = perf_aligned[window_size-1:]
            ax2.plot(ma_seconds, moving_avg, label=f"loop mean moving avg (window={window_size})",
                     linewidth=2, color='darkred', alpha=0.9)

        ax2.set_ylabel("Duration (ms)")
        ax2.set_title("Performance Metrics")
        ax2.legend()
        ax2.grid(True, alpha=0.3)

    # Plot 3: Latency
    if latency_aligned:
        ax3.plot(latency_aligned, latencies, label="latency", linewidth=1, color='purple')
        ax3.set_ylabel("Latency (ms)")
        ax3.set_title("Latency Buildup")
        ax3.legend()
        ax3.grid(True, alpha=0.3)

    # Plot 4: PyALSA avail before/after read
    if before_aligned and before_values:
        ax4.plot(before_aligned, before_values, label="avail before read", linewidth=1, alpha=0.7, color='cyan')
    if after_aligned and after_values:
        ax4.plot(after_aligned, after_values, label="avail after read", linewidth=1, alpha=0.7, color='magenta')
    ax4.set_xlabel("Time (s)")
    ax4.set_ylabel("Available samples")
    ax4.set_title("PyALSA Available Samples (Before/After Read)")
    # Only draw the legend when at least one series was plotted (avoids a
    # matplotlib "no artists" warning).
    if before_aligned or after_aligned:
        ax4.legend()
    ax4.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig(out_path, dpi=150, bbox_inches='tight')
    print(f"Combined plot saved to {out_path}")

    # Show interactive plot
    plt.show()
|
||||||
|
|
||||||
|
|
||||||
|
def main():
    """CLI entry point: validate the three input files and build the combined plot."""
    if len(sys.argv) != 4:
        print(f"Usage: {sys.argv[0]} <alsa_status.log> <perf_log.log> <latency_results.yaml>", file=sys.stderr)
        sys.exit(1)

    alsa_file, perf_file, latency_file = sys.argv[1:4]

    # Bail out on the first missing input file.
    missing = [p for p in (alsa_file, perf_file, latency_file) if not os.path.isfile(p)]
    if missing:
        print(f"File not found: {missing[0]}", file=sys.stderr)
        sys.exit(1)

    # Determine output path (same directory as first file)
    out_dir = os.path.dirname(os.path.abspath(alsa_file))
    out_path = os.path.join(out_dir, "combined_audio_plot.png")

    plot_combined(alsa_file, perf_file, latency_file, out_path)
|
||||||
|
|
||||||
|
|
||||||
|
# Script entry point.
if __name__ == "__main__":
    main()
|
||||||
437
plot_matrix.py
Normal file
437
plot_matrix.py
Normal file
@@ -0,0 +1,437 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Plot a results table image from a matrix test YAML file.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
python plot_matrix.py <results.yaml>
|
||||||
|
python plot_matrix.py <results.yaml> --baseline <baseline.yaml>
|
||||||
|
python plot_matrix.py <results.yaml> --baseline <baseline.yaml> --output table.png
|
||||||
|
"""
|
||||||
|
import argparse
|
||||||
|
import sys
|
||||||
|
from typing import Optional
|
||||||
|
import yaml
|
||||||
|
import numpy as np
|
||||||
|
import matplotlib
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
import matplotlib.patches as mpatches
|
||||||
|
from pathlib import Path
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Matrix layout constants
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
# Row order of the results table: QoS profile × sampling rate.
QOS_RATES = [
    ('fast', '16k'),
    ('fast', '24k'),
    ('fast', '48k'),
    ('robust', '16k'),
    ('robust', '24k'),
    ('robust', '48k'),
]

# Column grouping: each presentation delay splits into mono/stereo columns.
CHANNELS = ['mono', 'stereo']
PRESENTATION_DELAYS_MS = [10, 20, 40, 80]


# ---------------------------------------------------------------------------
# Colour helpers
# ---------------------------------------------------------------------------

COLOR_FAIL = '#FF4444'     # red — failed test / unusable latency result
COLOR_WORSE = '#FFA500'    # orange — significantly worse than baseline
COLOR_BETTER = '#66BB6A'   # green — significantly better than baseline
COLOR_NEUTRAL = '#FFFFFF'  # white — no baseline or no significant change
COLOR_MISSING = '#DDDDDD'  # light grey – not run / no data
COLOR_HEADER = '#263238'   # dark blue-grey header
COLOR_SUBHDR = '#455A64'   # secondary header
COLOR_ROW_EVEN = '#FAFAFA'  # alternating row background (even rows)
COLOR_ROW_ODD = '#F0F4F8'   # alternating row background (odd rows)
COLOR_HEADER_TEXT = '#FFFFFF'  # text colour on the dark header cells
|
||||||
|
|
||||||
|
|
||||||
|
def _latency_ok(lat: Optional[dict]) -> bool:
|
||||||
|
if lat is None:
|
||||||
|
return False
|
||||||
|
if lat.get('error'):
|
||||||
|
return False
|
||||||
|
if lat.get('valid') is False:
|
||||||
|
return False
|
||||||
|
return lat.get('avg') is not None
|
||||||
|
|
||||||
|
|
||||||
|
def _cell_color(result: dict, baseline_result: Optional[dict],
                worse_threshold_pct: float = 10.0,
                better_threshold_pct: float = 5.0) -> str:
    """Return a hex colour for the cell.

    Red when the measurement failed; otherwise coloured by the percentage
    change of the average latency relative to the baseline (orange = worse
    by more than *worse_threshold_pct*, green = better by more than
    *better_threshold_pct*, white = neutral / no comparable baseline).
    """
    lat = result.get('latency')
    if not _latency_ok(lat):
        return COLOR_FAIL

    # Without a usable baseline there is nothing to compare against.
    if baseline_result is None:
        return COLOR_NEUTRAL
    base_lat = baseline_result.get('latency')
    if not _latency_ok(base_lat):
        return COLOR_NEUTRAL

    base_avg = base_lat['avg']
    if base_avg == 0:
        return COLOR_NEUTRAL  # avoid division by zero

    diff_pct = (lat['avg'] - base_avg) / base_avg * 100.0

    if diff_pct > worse_threshold_pct:
        return COLOR_WORSE
    if diff_pct < -better_threshold_pct:
        return COLOR_BETTER
    return COLOR_NEUTRAL
|
||||||
|
|
||||||
|
|
||||||
|
def _cell_text(result: dict, show_buildup: bool, show_quality: bool) -> list:
    """Return list of text lines for a cell.

    The first line is either ``'FAIL'`` (optionally followed by a truncated
    error reason) or the average latency.  Buildup and quality summary
    lines follow when requested and present in *result*.
    """
    lat = result.get('latency')
    lines = []

    if not _latency_ok(lat):
        # `error` may be absent or explicitly null in the YAML — coalesce
        # to 'FAIL' instead of crashing on len(None).
        err = (lat.get('error') or 'FAIL') if lat else 'NO DATA'
        short = err[:20] if len(err) > 20 else err
        lines.append('FAIL')
        if short and short != 'FAIL':
            lines.append(short)
        return lines

    lines.append(f"{lat['avg']:.1f} ms")

    if show_buildup:
        bd = result.get('buildup')
        if bd is not None:
            detected = bd.get('buildup_detected')
            if detected is True:
                lines.append('buildup: YES')
            elif detected is False:
                lines.append('buildup: no')
            else:
                # Buildup test ran but produced no verdict.
                lines.append('buildup: n/a')

    if show_quality:
        q = result.get('quality')
        if q is not None:
            apm = q.get('artifacts_per_min')
            if apm is not None:
                lines.append(f"{apm:.1f} art/min")
            else:
                lines.append('quality: err')

    return lines
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Core table builder
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def build_table(
|
||||||
|
matrix_results: dict,
|
||||||
|
baseline_results: Optional[dict],
|
||||||
|
metadata: dict,
|
||||||
|
baseline_metadata: Optional[dict],
|
||||||
|
show_buildup: bool,
|
||||||
|
show_quality: bool,
|
||||||
|
worse_threshold_pct: float = 10.0,
|
||||||
|
better_threshold_pct: float = 5.0,
|
||||||
|
) -> plt.Figure:
|
||||||
|
"""
|
||||||
|
Build and return a matplotlib Figure containing the results table.
|
||||||
|
"""
|
||||||
|
n_rows = len(QOS_RATES) # 6
|
||||||
|
n_pd = len(PRESENTATION_DELAYS_MS) # 4
|
||||||
|
n_ch = len(CHANNELS) # 2
|
||||||
|
n_cols = n_pd * n_ch # 8
|
||||||
|
|
||||||
|
# Determine cell height based on content rows per cell
|
||||||
|
lines_per_cell = 1
|
||||||
|
if show_buildup:
|
||||||
|
lines_per_cell += 1
|
||||||
|
if show_quality:
|
||||||
|
lines_per_cell += 1
|
||||||
|
|
||||||
|
cell_h = 0.5 + 0.22 * lines_per_cell # inches
|
||||||
|
cell_w = 1.45 # inches
|
||||||
|
row_label_w = 1.4 # inches for row labels
|
||||||
|
|
||||||
|
hdr_h = 0.55 # top presentation-delay header row
|
||||||
|
sub_h = 0.38 # mono/stereo sub-header row
|
||||||
|
|
||||||
|
total_w = row_label_w + n_cols * cell_w + 0.3
|
||||||
|
total_h = hdr_h + sub_h + n_rows * cell_h + 1.6 # extra for title & legend
|
||||||
|
|
||||||
|
fig, ax = plt.subplots(figsize=(total_w, total_h))
|
||||||
|
ax.set_xlim(0, total_w)
|
||||||
|
ax.set_ylim(0, total_h)
|
||||||
|
ax.axis('off')
|
||||||
|
|
||||||
|
# coordinate helpers (y grows upward in matplotlib, so we flip)
|
||||||
|
def x_col(col_idx: int) -> float:
|
||||||
|
return row_label_w + col_idx * cell_w
|
||||||
|
|
||||||
|
def y_row(row_idx: int) -> float:
|
||||||
|
# row 0 = topmost data row
|
||||||
|
return total_h - 1.4 - hdr_h - sub_h - (row_idx + 1) * cell_h
|
||||||
|
|
||||||
|
def add_rect(x, y, w, h, facecolor, edgecolor='#90A4AE', lw=0.6, zorder=1):
|
||||||
|
rect = mpatches.FancyBboxPatch(
|
||||||
|
(x, y), w, h,
|
||||||
|
boxstyle='square,pad=0',
|
||||||
|
facecolor=facecolor, edgecolor=edgecolor, linewidth=lw, zorder=zorder)
|
||||||
|
ax.add_patch(rect)
|
||||||
|
|
||||||
|
def add_text(x, y, text, fontsize=8, color='black', ha='center', va='center',
|
||||||
|
bold=False, wrap_lines=None):
|
||||||
|
weight = 'bold' if bold else 'normal'
|
||||||
|
if wrap_lines:
|
||||||
|
for i, line in enumerate(wrap_lines):
|
||||||
|
offset = (len(wrap_lines) - 1) / 2.0 - i
|
||||||
|
ax.text(x, y + offset * (fontsize * 0.014),
|
||||||
|
line, fontsize=fontsize, color=color,
|
||||||
|
ha=ha, va='center', fontweight=weight,
|
||||||
|
clip_on=True)
|
||||||
|
else:
|
||||||
|
ax.text(x, y, text, fontsize=fontsize, color=color,
|
||||||
|
ha=ha, va='center', fontweight=weight, clip_on=True)
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------
|
||||||
|
# Title
|
||||||
|
# -----------------------------------------------------------------------
|
||||||
|
ts = metadata.get('timestamp', '')
|
||||||
|
try:
|
||||||
|
ts_fmt = datetime.fromisoformat(ts).strftime('%Y-%m-%d %H:%M')
|
||||||
|
except Exception:
|
||||||
|
ts_fmt = ts
|
||||||
|
|
||||||
|
title_lines = [
|
||||||
|
f"Matrix Test Results — {metadata.get('test_id', '')}",
|
||||||
|
f"SN: {metadata.get('serial_number', 'n/a')} SW: {metadata.get('software_version', 'n/a')} {ts_fmt}",
|
||||||
|
]
|
||||||
|
if metadata.get('comment'):
|
||||||
|
title_lines.append(f"Comment: {metadata['comment']}")
|
||||||
|
if baseline_metadata:
|
||||||
|
title_lines.append(
|
||||||
|
f"Baseline: {baseline_metadata.get('test_id', 'n/a')} "
|
||||||
|
f"({baseline_metadata.get('timestamp', '')[:10]})"
|
||||||
|
)
|
||||||
|
|
||||||
|
title_y = total_h - 0.25
|
||||||
|
for i, line in enumerate(title_lines):
|
||||||
|
ax.text(total_w / 2, title_y - i * 0.28, line,
|
||||||
|
fontsize=9 if i == 0 else 7.5,
|
||||||
|
fontweight='bold' if i == 0 else 'normal',
|
||||||
|
ha='center', va='top', color='#1A237E')
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------
|
||||||
|
# Row label column header (top-left corner block)
|
||||||
|
# -----------------------------------------------------------------------
|
||||||
|
hdr_top = total_h - 1.4
|
||||||
|
# Spans presentation-delay header + mono/stereo sub-header
|
||||||
|
add_rect(0, hdr_top - hdr_h - sub_h, row_label_w, hdr_h + sub_h,
|
||||||
|
facecolor=COLOR_HEADER)
|
||||||
|
add_text(row_label_w / 2, hdr_top - (hdr_h + sub_h) / 2,
|
||||||
|
'QoS / Rate', fontsize=8, color=COLOR_HEADER_TEXT, bold=True)
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------
|
||||||
|
# Presentation-delay group headers
|
||||||
|
# -----------------------------------------------------------------------
|
||||||
|
for pd_idx, pd_ms in enumerate(PRESENTATION_DELAYS_MS):
|
||||||
|
col_start = pd_idx * n_ch
|
||||||
|
x = x_col(col_start)
|
||||||
|
w = cell_w * n_ch
|
||||||
|
add_rect(x, hdr_top - hdr_h, w, hdr_h, facecolor=COLOR_HEADER)
|
||||||
|
add_text(x + w / 2, hdr_top - hdr_h / 2,
|
||||||
|
f'PD {pd_ms} ms', fontsize=8.5, color=COLOR_HEADER_TEXT, bold=True)
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------
|
||||||
|
# Mono / Stereo sub-headers
|
||||||
|
# -----------------------------------------------------------------------
|
||||||
|
sub_top = hdr_top - hdr_h
|
||||||
|
for col in range(n_cols):
|
||||||
|
ch = CHANNELS[col % n_ch]
|
||||||
|
x = x_col(col)
|
||||||
|
add_rect(x, sub_top - sub_h, cell_w, sub_h, facecolor=COLOR_SUBHDR)
|
||||||
|
add_text(x + cell_w / 2, sub_top - sub_h / 2,
|
||||||
|
ch.capitalize(), fontsize=7.5, color=COLOR_HEADER_TEXT, bold=True)
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------
|
||||||
|
# Data rows
|
||||||
|
# -----------------------------------------------------------------------
|
||||||
|
for row_idx, (qos, rate) in enumerate(QOS_RATES):
|
||||||
|
row_bg = COLOR_ROW_EVEN if row_idx % 2 == 0 else COLOR_ROW_ODD
|
||||||
|
|
||||||
|
# Row label
|
||||||
|
y = y_row(row_idx)
|
||||||
|
add_rect(0, y, row_label_w, cell_h, facecolor=COLOR_SUBHDR if row_idx < 3 else '#37474F')
|
||||||
|
label = f"{'Fast' if qos == 'fast' else 'Robust'} {rate}"
|
||||||
|
add_text(row_label_w / 2, y + cell_h / 2,
|
||||||
|
label, fontsize=8, color=COLOR_HEADER_TEXT, bold=True)
|
||||||
|
|
||||||
|
for col_idx, (pd_ms, ch) in enumerate(
|
||||||
|
[(pd, ch)
|
||||||
|
for pd in PRESENTATION_DELAYS_MS
|
||||||
|
for ch in CHANNELS]):
|
||||||
|
key = f"{qos}_{rate}_{ch}_{pd_ms}ms"
|
||||||
|
result = matrix_results.get(key)
|
||||||
|
baseline_result = baseline_results.get(key) if baseline_results else None
|
||||||
|
|
||||||
|
x = x_col(col_idx)
|
||||||
|
|
||||||
|
if result is None:
|
||||||
|
add_rect(x, y, cell_w, cell_h, facecolor=COLOR_MISSING)
|
||||||
|
add_text(x + cell_w / 2, y + cell_h / 2, '—', fontsize=8)
|
||||||
|
continue
|
||||||
|
|
||||||
|
color = _cell_color(result, baseline_result,
|
||||||
|
worse_threshold_pct, better_threshold_pct)
|
||||||
|
add_rect(x, y, cell_w, cell_h, facecolor=color)
|
||||||
|
|
||||||
|
lines = _cell_text(result, show_buildup, show_quality)
|
||||||
|
# font size depends on how many lines
|
||||||
|
fs = 8.5 if len(lines) == 1 else 7.5
|
||||||
|
is_fail = color == COLOR_FAIL
|
||||||
|
txt_color = '#FFFFFF' if is_fail else '#1A1A2E'
|
||||||
|
|
||||||
|
# centre vertically
|
||||||
|
n = len(lines)
|
||||||
|
line_gap = cell_h / (n + 1)
|
||||||
|
for li, line in enumerate(lines):
|
||||||
|
line_y = y + cell_h - line_gap * (li + 1)
|
||||||
|
bold_line = li == 0 # first line (latency) is bold
|
||||||
|
ax.text(x + cell_w / 2, line_y, line,
|
||||||
|
fontsize=fs if li == 0 else fs - 0.5,
|
||||||
|
color=txt_color,
|
||||||
|
ha='center', va='center',
|
||||||
|
fontweight='bold' if bold_line else 'normal',
|
||||||
|
clip_on=True)
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------
|
||||||
|
# Outer border for the full table
|
||||||
|
# -----------------------------------------------------------------------
|
||||||
|
table_x = 0
|
||||||
|
table_y = y_row(n_rows - 1)
|
||||||
|
table_w = row_label_w + n_cols * cell_w
|
||||||
|
table_h_total = hdr_top - table_y
|
||||||
|
rect = mpatches.Rectangle((table_x, table_y), table_w, table_h_total,
|
||||||
|
fill=False, edgecolor='#37474F', linewidth=1.5)
|
||||||
|
ax.add_patch(rect)
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------
|
||||||
|
# Legend
|
||||||
|
# -----------------------------------------------------------------------
|
||||||
|
legend_y = y_row(n_rows - 1) - 0.55
|
||||||
|
legend_items = [
|
||||||
|
(COLOR_FAIL, 'FAIL / error'),
|
||||||
|
(COLOR_WORSE, f'>{worse_threshold_pct:.0f}% worse than baseline'),
|
||||||
|
(COLOR_NEUTRAL, 'Within threshold'),
|
||||||
|
(COLOR_BETTER, f'>{better_threshold_pct:.0f}% better than baseline'),
|
||||||
|
(COLOR_MISSING, 'Not measured'),
|
||||||
|
]
|
||||||
|
lx = 0.2
|
||||||
|
for color, label in legend_items:
|
||||||
|
add_rect(lx, legend_y - 0.18, 0.28, 0.25, facecolor=color,
|
||||||
|
edgecolor='#90A4AE', lw=0.8)
|
||||||
|
ax.text(lx + 0.35, legend_y - 0.055, label, fontsize=7, va='center')
|
||||||
|
lx += 2.2
|
||||||
|
|
||||||
|
plt.tight_layout(pad=0.1)
|
||||||
|
return fig
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# CLI
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def load_matrix_results(path: Path) -> tuple:
    """Read a matrix-results YAML file.

    Returns a ``(matrix_results, metadata)`` tuple; either element defaults
    to an empty dict when the corresponding top-level key is absent.
    """
    with open(path, 'r') as fh:
        parsed = yaml.safe_load(fh)
    results = parsed.get('matrix_results', {})
    meta = parsed.get('metadata', {})
    return results, meta
|
||||||
|
|
||||||
|
|
||||||
|
def main():
    """CLI entry point: load matrix results (plus an optional baseline),
    build the results-table figure and save it as a PNG.
    """
    parser = argparse.ArgumentParser(
        description='Plot matrix test results as a table image')
    parser.add_argument('results', help='Path to matrix results YAML file')
    parser.add_argument('--baseline', default=None,
                        help='Path to baseline matrix results YAML for comparison')
    parser.add_argument('--output', default=None,
                        help='Output image path (default: <results_stem>_table.png)')
    parser.add_argument('--worse-threshold', type=float, default=10.0,
                        help='Percent worse than baseline to colour orange (default: 10)')
    parser.add_argument('--better-threshold', type=float, default=5.0,
                        help='Percent better than baseline to colour green (default: 5)')
    parser.add_argument('--dpi', type=int, default=150,
                        help='Output image DPI (default: 150)')
    args = parser.parse_args()

    results_path = Path(args.results)
    if not results_path.exists():
        print(f"ERROR: Results file not found: {results_path}", file=sys.stderr)
        sys.exit(1)

    matrix_results, metadata = load_matrix_results(results_path)

    baseline_results = None
    baseline_metadata = None
    if args.baseline:
        baseline_path = Path(args.baseline)
        if not baseline_path.exists():
            print(f"ERROR: Baseline file not found: {baseline_path}", file=sys.stderr)
            sys.exit(1)
        baseline_results, baseline_metadata = load_matrix_results(baseline_path)
        print(f"Comparing against baseline: {baseline_path.name}")

    # Detect which optional columns are present.  Combinations that were not
    # measured may be stored as None (the table renderer treats None cells as
    # "not measured"), so guard the .get() with `or {}` instead of crashing.
    show_buildup = any(
        (r or {}).get('buildup') is not None
        for r in matrix_results.values()
    )
    show_quality = any(
        (r or {}).get('quality') is not None
        for r in matrix_results.values()
    )

    print(f"Results: {len(matrix_results)} combinations")
    print(f"Show buildup column: {show_buildup}")
    print(f"Show quality column: {show_quality}")

    fig = build_table(
        matrix_results=matrix_results,
        baseline_results=baseline_results,
        metadata=metadata,
        baseline_metadata=baseline_metadata,
        show_buildup=show_buildup,
        show_quality=show_quality,
        worse_threshold_pct=args.worse_threshold,
        better_threshold_pct=args.better_threshold,
    )

    # Always save next to the results YAML.
    folder_copy = results_path.parent / f"{results_path.stem}_table.png"
    fig.savefig(folder_copy, dpi=args.dpi, bbox_inches='tight',
                facecolor='white', edgecolor='none')
    print(f"Table saved to: {folder_copy}")

    # If a custom --output path was given (and differs), save there too.
    if args.output:
        output_path = Path(args.output)
        if output_path.resolve() != folder_copy.resolve():
            fig.savefig(output_path, dpi=args.dpi, bbox_inches='tight',
                        facecolor='white', edgecolor='none')
            print(f"Table also saved to: {output_path}")

    plt.close(fig)
|
||||||
|
|
||||||
|
|
||||||
|
# Script entry point: parse CLI arguments, render the table, save the image.
if __name__ == '__main__':
    main()
|
||||||
81
plot_perf_log.py
Normal file
81
plot_perf_log.py
Normal file
@@ -0,0 +1,81 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Parse Perf lines from a log file and plot sample mean, write mean, and loop mean over time."""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import re
|
||||||
|
import os
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
|
||||||
|
|
||||||
|
# Matches syslog-style Perf lines and captures (timestamp, sample, write, loop).
PERF_RE = re.compile(
    r"^(\w+ \d+ \d+:\d+:\d+) .* Perf\(.*?\):"
    r".*?sample mean=([\d.]+)ms"
    r".*?write mean=([\d.]+)ms"
    r".*?loop mean=([\d.]+)ms"
)


def parse_log(log_path):
    """Extract Perf metrics from *log_path*.

    Returns four parallel lists: seconds elapsed since the first matching
    line, sample means, write means and loop means (all in milliseconds).
    Exits with status 1 when the file contains no matching lines.
    """
    records = []
    with open(log_path, "r") as fh:
        for raw in fh:
            hit = PERF_RE.search(raw)
            if hit is None:
                continue
            stamp, sample, write, loop = hit.groups()
            # NOTE(review): the syslog-style stamp carries no year, so
            # strptime defaults to 1900 — deltas stay correct as long as a
            # log does not span a year boundary.
            records.append((datetime.strptime(stamp, "%b %d %H:%M:%S"),
                            float(sample), float(write), float(loop)))

    if not records:
        print("No Perf lines found in the log file.", file=sys.stderr)
        sys.exit(1)

    origin = records[0][0]
    seconds = [(r[0] - origin).total_seconds() for r in records]
    sample_means = [r[1] for r in records]
    write_means = [r[2] for r in records]
    loop_means = [r[3] for r in records]
    return seconds, sample_means, write_means, loop_means
|
||||||
|
|
||||||
|
|
||||||
|
def plot(seconds, sample_means, write_means, loop_means, out_path):
    """Render the three Perf series against elapsed time and save a PNG."""
    plt.figure(figsize=(12, 6))
    series = (
        (sample_means, "sample mean (ms)"),
        (write_means, "write mean (ms)"),
        (loop_means, "loop mean (ms)"),
    )
    for values, label in series:
        plt.plot(seconds, values, label=label)
    plt.xlabel("Time (s)")
    plt.ylabel("Duration (ms)")
    plt.title("Perf Metrics Over Time")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    print(f"Plot saved to {out_path}")
|
||||||
|
|
||||||
|
|
||||||
|
def main():
    """CLI entry point: validate the argument, parse the log, write the plot."""
    argv = sys.argv
    if len(argv) != 2:
        print(f"Usage: {argv[0]} <path_to_log_file>", file=sys.stderr)
        sys.exit(1)

    log_path = argv[1]
    if not os.path.isfile(log_path):
        print(f"File not found: {log_path}", file=sys.stderr)
        sys.exit(1)

    metrics = parse_log(log_path)

    # Save the PNG next to the log, named <log_stem>_perf_plot.png.
    directory = os.path.dirname(os.path.abspath(log_path))
    stem = os.path.splitext(os.path.basename(log_path))[0]
    out_path = os.path.join(directory, f"{stem}_perf_plot.png")

    plot(*metrics, out_path)
|
||||||
|
|
||||||
|
|
||||||
|
# Script entry point.
if __name__ == "__main__":
    main()
|
||||||
@@ -3,3 +3,4 @@ scipy>=1.10.0
|
|||||||
sounddevice>=0.4.6
|
sounddevice>=0.4.6
|
||||||
PyYAML>=6.0
|
PyYAML>=6.0
|
||||||
matplotlib>=3.7.0
|
matplotlib>=3.7.0
|
||||||
|
requests>=2.28.0
|
||||||
|
|||||||
@@ -1,14 +1,15 @@
|
|||||||
|
import time
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import sounddevice as sd
|
import sounddevice as sd
|
||||||
from scipy import signal
|
from scipy import signal
|
||||||
from typing import Tuple, Dict, List
|
from scipy.io import wavfile
|
||||||
|
from typing import Tuple, Dict, List, Optional
|
||||||
import matplotlib.pyplot as plt
|
import matplotlib.pyplot as plt
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
||||||
def find_audio_device(device_name: str = "Scarlett") -> tuple:
|
def find_audio_device(device_name: str = "Scarlett") -> tuple:
|
||||||
devices = sd.query_devices()
|
devices = sd.query_devices()
|
||||||
|
|
||||||
for idx, device in enumerate(devices):
|
for idx, device in enumerate(devices):
|
||||||
if device_name.lower() in device['name'].lower():
|
if device_name.lower() in device['name'].lower():
|
||||||
if device['max_input_channels'] >= 2 and device['max_output_channels'] >= 2:
|
if device['max_input_channels'] >= 2 and device['max_output_channels'] >= 2:
|
||||||
@@ -45,10 +46,17 @@ def generate_chirp(duration: float, sample_rate: int, f0: float = 100, f1: float
|
|||||||
|
|
||||||
def play_and_record(tone: np.ndarray, sample_rate: int, device_id: tuple, channels: int = 2) -> np.ndarray:
    """Play *tone* on both output channels while recording the inputs.

    Parameters
    ----------
    tone : 1-D signal to play (duplicated onto two output channels).
    sample_rate : playback/recording rate in Hz.
    device_id : ``(input_device, output_device)`` indices for sounddevice.
    channels : number of input channels to capture.

    Returns the recorded array.  Raises RuntimeError when the capture
    contains NaN/Inf, which this codebase treats as a corrupted ALSA stream.
    """
    # Duplicate the mono tone onto both output channels.
    output_signal = np.column_stack([tone, tone])

    input_dev, output_dev = device_id

    # Stop any lingering stream before opening a fresh full-duplex one.
    sd.stop()
    # latency='high' asks the backend for larger, safer buffers —
    # presumably to avoid underruns during the measurement; confirm with
    # the sounddevice docs if tuning.
    recording = sd.playrec(output_signal, samplerate=sample_rate,
                           channels=channels, device=(input_dev, output_dev),
                           latency='high', blocking=True)
    sd.stop()

    if not np.isfinite(recording).all():
        raise RuntimeError("Recording contains NaN/Inf — ALSA stream corrupted. "
                           "Try replugging the audio interface.")

    return recording
|
||||||
|
|
||||||
@@ -216,6 +224,12 @@ def run_latency_test(config: Dict, num_measurements: int = 5, save_plots: bool =
|
|||||||
|
|
||||||
chirp_signal = generate_chirp(duration, sample_rate, amplitude=amplitude)
|
chirp_signal = generate_chirp(duration, sample_rate, amplitude=amplitude)
|
||||||
|
|
||||||
|
# Discard one warm-up recording to flush stale ALSA ring buffer data
|
||||||
|
try:
|
||||||
|
play_and_record(chirp_signal, sample_rate, device_ids, channels)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
latencies = []
|
latencies = []
|
||||||
last_recording = None
|
last_recording = None
|
||||||
last_correlation = None
|
last_correlation = None
|
||||||
@@ -235,11 +249,22 @@ def run_latency_test(config: Dict, num_measurements: int = 5, save_plots: bool =
|
|||||||
last_correlation = correlation
|
last_correlation = correlation
|
||||||
last_lags = lags
|
last_lags = lags
|
||||||
|
|
||||||
|
avg = float(np.mean(latencies))
|
||||||
|
std_dev = float(np.std(latencies))
|
||||||
|
latency_cfg = config.get('latency', {})
|
||||||
|
max_std_dev_ms = latency_cfg.get('max_std_dev_ms', None)
|
||||||
|
min_avg_ms = latency_cfg.get('min_avg_ms', None)
|
||||||
|
valid = True
|
||||||
|
if max_std_dev_ms is not None and std_dev > max_std_dev_ms:
|
||||||
|
valid = False
|
||||||
|
if min_avg_ms is not None and avg < min_avg_ms:
|
||||||
|
valid = False
|
||||||
latency_stats = {
|
latency_stats = {
|
||||||
'avg': float(np.mean(latencies)),
|
'avg': avg,
|
||||||
'min': float(np.min(latencies)),
|
'min': float(np.min(latencies)),
|
||||||
'max': float(np.max(latencies)),
|
'max': float(np.max(latencies)),
|
||||||
'std': float(np.std(latencies))
|
'std': std_dev,
|
||||||
|
'valid': valid
|
||||||
}
|
}
|
||||||
|
|
||||||
if save_plots and output_dir and last_recording is not None:
|
if save_plots and output_dir and last_recording is not None:
|
||||||
@@ -509,6 +534,312 @@ def detect_artifacts_energy_variation(signal_data: np.ndarray, sample_rate: int,
|
|||||||
return artifacts
|
return artifacts
|
||||||
|
|
||||||
|
|
||||||
|
def generate_sync_marker(sample_rate: int, duration_sec: float = 0.05,
                         f0: float = 200.0, f1: float = 4000.0,
                         amplitude: float = 0.7) -> np.ndarray:
    """Build a short Hann-windowed linear chirp used as an alignment marker.

    The marker sweeps from *f0* to *f1* Hz over *duration_sec* seconds and
    is scaled by *amplitude* before windowing.
    """
    num_samples = int(sample_rate * duration_sec)
    t = np.linspace(0, duration_sec, num_samples, endpoint=False)
    window = np.hanning(num_samples)
    sweep = signal.chirp(t, f0, duration_sec, f1, method='linear')
    return amplitude * window * sweep
|
||||||
|
|
||||||
|
|
||||||
|
def embed_markers(base_signal: np.ndarray, marker: np.ndarray,
                  sample_rate: int, interval_sec: float = 5.0,
                  first_offset_sec: float = 0.5) -> Tuple[np.ndarray, List[int]]:
    """Mix *marker* into a copy of *base_signal* at regular intervals.

    Markers are inserted every *interval_sec* seconds starting at
    *first_offset_sec*, stopping 0.1 s short of the end of the signal.
    Mixed samples are clipped to [-1, 1].  Returns the new signal and the
    list of sample offsets where markers were placed.
    """
    mixed = base_signal.copy()
    marker_len = len(marker)
    marker_sec = marker_len / sample_rate
    total_sec = len(base_signal) / sample_rate
    positions: List[int] = []

    offset_sec = first_offset_sec
    while offset_sec + marker_sec + 0.1 <= total_sec:
        start = int(offset_sec * sample_rate)
        stop = start + marker_len
        mixed[start:stop] = np.clip(mixed[start:stop] + marker, -1.0, 1.0)
        positions.append(start)
        offset_sec += interval_sec
    return mixed, positions
|
||||||
|
|
||||||
|
|
||||||
|
def find_all_marker_positions(channel: np.ndarray, marker_template: np.ndarray,
                              expected_positions: List[int], sample_rate: int,
                              search_radius_ms: float = 500.0) -> List[Optional[int]]:
    """Locate each marker near its expected sample offset via cross-correlation.

    For every entry of *expected_positions* the correlation peak is searched
    within +/- *search_radius_ms* around it; entries whose search window is
    empty yield ``None``.
    """
    envelope = np.abs(signal.correlate(channel, marker_template, mode='valid'))
    radius = int(search_radius_ms / 1000.0 * sample_rate)

    located: List[Optional[int]] = []
    for expected in expected_positions:
        window_lo = max(0, expected - radius)
        window_hi = min(len(envelope), expected + radius + 1)
        if window_lo >= window_hi:
            located.append(None)
        else:
            peak = int(np.argmax(envelope[window_lo:window_hi]))
            located.append(window_lo + peak)
    return located
|
||||||
|
|
||||||
|
|
||||||
|
def align_by_markers(reference: np.ndarray, dut: np.ndarray,
                     marker_template: np.ndarray, expected_positions: List[int],
                     sample_rate: int, max_lag_ms: float = 500.0) -> Tuple[np.ndarray, np.ndarray, int]:
    """Align two recordings using the embedded sync markers.

    Locates every marker in both channels, takes the median of the
    per-marker lags (DUT position minus reference position) and trims both
    signals accordingly.  Falls back to plain cross-correlation alignment
    (``align_channels``) when no marker was found in both channels.

    Returns ``(ref_aligned, dut_aligned, lag_samples)``; both arrays are
    cut to the same length.
    """
    ref_pos = find_all_marker_positions(reference, marker_template, expected_positions,
                                        sample_rate, max_lag_ms)
    dut_pos = find_all_marker_positions(dut, marker_template, expected_positions,
                                        sample_rate, max_lag_ms)
    # Per-marker lag, using only markers found in BOTH channels.
    lags = [dp - rp for rp, dp in zip(ref_pos, dut_pos)
            if rp is not None and dp is not None]
    if not lags:
        # No usable marker pairs — fall back to correlation-based alignment.
        return align_channels(reference, dut, sample_rate, max_lag_ms)
    # Median is robust against a single mis-detected marker.
    lag_samples = int(round(float(np.median(lags))))
    if lag_samples >= 0:
        # Positive lag: drop the DUT's leading samples, shorten the reference tail.
        ref_aligned = reference[:len(reference) - lag_samples]
        dut_aligned = dut[lag_samples:]
    else:
        # Negative lag: drop the reference's leading samples instead.
        ref_aligned = reference[-lag_samples:]
        dut_aligned = dut[:len(dut) + lag_samples]
    min_len = min(len(ref_aligned), len(dut_aligned))
    return ref_aligned[:min_len], dut_aligned[:min_len], lag_samples
|
||||||
|
|
||||||
|
|
||||||
|
def detect_sample_slips_by_markers(reference: np.ndarray, dut: np.ndarray,
                                   marker_template: np.ndarray,
                                   expected_ref_positions: List[int],
                                   sample_rate: int, lag_samples: int,
                                   min_slip_samples: int = 3) -> List[Dict]:
    """Detect sample slips by comparing per-marker lags against their median.

    Each marker is located in both channels; a marker whose (DUT - reference)
    lag deviates from the median lag by at least *min_slip_samples* is
    reported as a ``sample_slip`` artifact.  Markers within 1 s of either
    end of the recording are ignored.
    """
    artifacts: List[Dict] = []
    total_duration = len(reference) / sample_rate
    # The DUT markers are expected at the reference positions shifted by the
    # global alignment lag.
    expected_dut_positions = [p + lag_samples for p in expected_ref_positions]
    ref_found = find_all_marker_positions(reference, marker_template, expected_ref_positions,
                                          sample_rate, search_radius_ms=200.0)
    dut_found = find_all_marker_positions(dut, marker_template, expected_dut_positions,
                                          sample_rate, search_radius_ms=200.0)
    # Per-marker lag; None where the marker was missing in either channel.
    interval_lags: List[Optional[int]] = [
        (dp - rp) if rp is not None and dp is not None else None
        for rp, dp in zip(ref_found, dut_found)
    ]
    valid_lags = [v for v in interval_lags if v is not None]
    # Median lag across all found markers is the reference against which a
    # slip is measured; 0 when no marker was found at all.
    baseline_lag = int(round(float(np.median(valid_lags)))) if valid_lags else 0
    for i in range(len(interval_lags)):
        if interval_lags[i] is None:
            continue
        delta = int(interval_lags[i]) - baseline_lag
        if abs(delta) >= min_slip_samples:
            time_sec = float(expected_ref_positions[i] / sample_rate)
            # Ignore the first/last second of the recording.
            if time_sec < 1.0 or time_sec > total_duration - 1.0:
                continue
            artifacts.append({
                'type': 'sample_slip',
                'time_sec': round(time_sec, 4),
                'lag_change_samples': int(delta),
                'lag_baseline': baseline_lag,
                'lag_at_marker': int(interval_lags[i]),
            })
    return artifacts
|
||||||
|
|
||||||
|
|
||||||
|
def align_channels(reference: np.ndarray, dut: np.ndarray, sample_rate: int,
                   max_lag_ms: float = 500.0) -> Tuple[np.ndarray, np.ndarray, int]:
    """Time-align *dut* to *reference* by cross-correlating a middle segment.

    The lag search is limited to +/- *max_lag_ms*.  Returns the trimmed
    reference, the trimmed DUT signal (equal lengths) and the detected lag
    in samples; a positive lag means leading DUT samples were dropped.
    """
    lag_limit = int(max_lag_ms / 1000.0 * sample_rate)

    # Correlate over a representative middle slice (at most 4 s) — cheaper
    # and more robust than correlating the full recordings.
    common_len = min(len(reference), len(dut))
    seg_len = min(common_len, int(sample_rate * 4))
    centre = common_len // 2
    half = seg_len // 2
    ref_seg = reference[centre - half: centre + half]
    dut_seg = dut[centre - half: centre + half]

    corr = signal.correlate(dut_seg, ref_seg, mode='full')
    lag_axis = np.arange(-(len(ref_seg) - 1), len(ref_seg))
    in_range = np.abs(lag_axis) <= lag_limit
    scored = np.where(in_range, np.abs(corr), 0)
    lag_samples = int(lag_axis[np.argmax(scored)])

    if lag_samples >= 0:
        trimmed_ref = reference[:len(reference) - lag_samples]
        trimmed_dut = dut[lag_samples:]
    else:
        trimmed_ref = reference[-lag_samples:]
        trimmed_dut = dut[:len(dut) + lag_samples]

    common = min(len(trimmed_ref), len(trimmed_dut))
    return trimmed_ref[:common], trimmed_dut[:common], lag_samples
|
||||||
|
|
||||||
|
|
||||||
|
def compute_residual(ref_aligned: np.ndarray, dut_aligned: np.ndarray) -> Tuple[np.ndarray, float]:
    """Gain-match the reference to the DUT and subtract it.

    Returns ``(residual, gain)`` where ``residual = dut - gain * ref`` and
    *gain* is the RMS ratio DUT/reference.  A near-silent reference
    (RMS below 1e-10) yields a copy of the DUT signal and gain 1.0.
    """
    ref_rms = np.sqrt(np.mean(ref_aligned ** 2))
    if ref_rms < 1e-10:
        # Reference is effectively silent — nothing meaningful to subtract.
        return dut_aligned.copy(), 1.0
    dut_rms = np.sqrt(np.mean(dut_aligned ** 2))
    gain = float(dut_rms / ref_rms)
    return dut_aligned - gain * ref_aligned, gain
|
||||||
|
|
||||||
|
|
||||||
|
def detect_glitches_short_time_energy(residual: np.ndarray, sample_rate: int,
                                      window_ms: float = 5.0,
                                      threshold_factor: float = 6.0,
                                      min_burst_ms: float = 0.5) -> List[Dict]:
    """Flag short energy bursts in a null-test residual.

    A sliding-RMS envelope of the residual is compared against
    *threshold_factor* times its median; contiguous runs above the
    threshold lasting at least *min_burst_ms*, and lying more than 1 s
    from either end of the signal, are reported as glitches.
    """
    from scipy.ndimage import uniform_filter1d

    found: List[Dict] = []
    win = max(4, int(window_ms / 1000.0 * sample_rate))
    min_run = max(1, int(min_burst_ms / 1000.0 * sample_rate))
    total_sec = len(residual) / sample_rate

    # Sliding RMS; the max() guards against tiny negative filter output.
    envelope = np.sqrt(np.maximum(
        uniform_filter1d(residual ** 2, size=win, mode='reflect'), 0.0))

    baseline = np.median(envelope)
    if baseline < 1e-12:
        return found

    # Mark threshold crossings, then pair rising/falling edges into bursts.
    above = envelope > threshold_factor * baseline
    edges = np.diff(above.astype(np.int8), prepend=0, append=0)
    starts = np.where(edges == 1)[0]
    ends = np.where(edges == -1)[0]

    for start, end in zip(starts, ends):
        run = int(end - start)
        if run < min_run:
            continue
        at_sec = float(start / sample_rate)
        # Ignore bursts within 1 s of either edge of the recording.
        if at_sec < 1.0 or at_sec > total_sec - 1.0:
            continue
        peak = float(np.max(envelope[start:end]))
        found.append({
            'type': 'null_test_glitch',
            'time_sec': round(at_sec, 4),
            'duration_ms': round(run / sample_rate * 1000.0, 2),
            'peak_residual_rms': round(peak, 8),
            'baseline_rms': round(float(baseline), 8),
            'deviation_factor': round(peak / baseline, 2)
        })

    return found
||||||
|
|
||||||
|
|
||||||
|
def detect_sample_slips(reference: np.ndarray, dut: np.ndarray, sample_rate: int,
                        window_ms: float = 50.0, step_ms: float = 100.0) -> List[Dict]:
    """Detect sample slips via windowed cross-correlation lag tracking.

    The DUT/reference lag is estimated in sliding windows of *window_ms*
    every *step_ms*; any change of the per-window lag between consecutive
    windows (at least one sample) is reported as a ``sample_slip``
    artifact.  The first/last second of the signal is ignored.
    """
    artifacts = []
    window_samples = int(window_ms / 1000.0 * sample_rate)
    step_samples = int(step_ms / 1000.0 * sample_rate)
    # Lag search range per window: a quarter window, capped at ±10 ms.
    max_search = min(window_samples // 4, int(0.010 * sample_rate))  # ±10 ms

    min_len = min(len(reference), len(dut))
    total_duration = min_len / sample_rate
    # Too little signal for at least two windows — nothing to compare.
    if min_len < window_samples * 2:
        return artifacts

    lags_over_time = []
    times = []

    for start in range(0, min_len - window_samples, step_samples):
        ref_seg = reference[start: start + window_samples]
        dut_seg = dut[start: start + window_samples]
        corr = signal.correlate(dut_seg, ref_seg, mode='full')
        lags = np.arange(-(len(ref_seg) - 1), len(ref_seg))
        # Zero out correlation values outside the allowed lag range before
        # picking the peak.
        valid = np.abs(lags) <= max_search
        best_lag = int(lags[np.argmax(np.where(valid, np.abs(corr), 0))])
        lags_over_time.append(best_lag)
        times.append(float(start / sample_rate))

    if len(lags_over_time) < 3:
        return artifacts

    lags_arr = np.array(lags_over_time)
    # Any change of the estimated lag between adjacent windows is a slip.
    for i in range(1, len(lags_arr)):
        delta = int(lags_arr[i]) - int(lags_arr[i - 1])
        if abs(delta) >= 1:
            time_sec = times[i]
            # Skip the first/last second (edge windows).
            if time_sec < 1.0 or time_sec > total_duration - 1.0:
                continue
            artifacts.append({
                'type': 'sample_slip',
                'time_sec': round(time_sec, 4),
                'lag_change_samples': int(delta),
                'lag_before': int(lags_arr[i - 1]),
                'lag_after': int(lags_arr[i])
            })

    return artifacts
|
||||||
|
|
||||||
|
|
||||||
|
def detect_artifacts_null_test(reference: np.ndarray, dut: np.ndarray,
                               sample_rate: int, null_test_config: Dict,
                               marker_template: Optional[np.ndarray] = None,
                               marker_positions: Optional[List[int]] = None) -> Dict:
    """Run the full null-test artifact analysis on a reference/DUT pair.

    Aligns the two channels (marker-based when a marker template and
    positions are supplied, otherwise by cross-correlation), computes the
    gain-matched residual, then collects glitch and sample-slip artifacts.

    Returns a result dict with lag/gain/residual statistics, the artifact
    list, per-type counts, and the aligned arrays under the underscore
    keys (``_ref_aligned``, ``_dut_aligned``, ``_residual``) for plotting.
    """
    # Default result — also what is returned when the test is disabled or
    # the recording is unusable.
    result = {
        'enabled': null_test_config.get('enabled', True),
        'lag_samples': 0,
        'lag_ms': 0.0,
        'gain_factor': 1.0,
        'residual_rms': 0.0,
        'residual_peak': 0.0,
        'total_count': 0,
        'by_type': {},
        'artifacts': [],
        '_ref_aligned': None,
        '_dut_aligned': None,
        '_residual': None,
    }

    if not result['enabled']:
        return result

    # Guard: skip if either channel is silent/corrupted (e.g. ALSA underrun)
    min_signal_level = 1e-4
    if np.max(np.abs(reference)) < min_signal_level or np.max(np.abs(dut)) < min_signal_level:
        result['error'] = 'recording_too_quiet_or_corrupted'
        print(" ⚠ Null test skipped: one or both channels are silent (possible ALSA underrun).")
        return result

    max_lag_ms = float(null_test_config.get('max_lag_ms', 500.0))

    # Prefer marker-based alignment when markers were embedded; otherwise
    # fall back to plain cross-correlation.
    if marker_template is not None and marker_positions:
        ref_aligned, dut_aligned, lag_samples = align_by_markers(
            reference, dut, marker_template, marker_positions, sample_rate, max_lag_ms)
        alignment_method = 'marker'
    else:
        ref_aligned, dut_aligned, lag_samples = align_channels(
            reference, dut, sample_rate, max_lag_ms)
        alignment_method = 'xcorr'

    result['lag_samples'] = lag_samples
    result['lag_ms'] = round(float(lag_samples) / sample_rate * 1000.0, 3)
    result['alignment_method'] = alignment_method

    # Gain-matched difference between the two channels.
    residual, gain = compute_residual(ref_aligned, dut_aligned)
    result['gain_factor'] = round(gain, 6)
    result['residual_rms'] = round(float(np.sqrt(np.mean(residual ** 2))), 8)
    result['residual_peak'] = round(float(np.max(np.abs(residual))), 6)

    # Keep the aligned arrays for downstream plotting (underscore keys are
    # stripped before the result is persisted — TODO confirm with callers).
    result['_ref_aligned'] = ref_aligned
    result['_dut_aligned'] = dut_aligned
    result['_residual'] = residual

    all_artifacts = []

    # Energy-burst glitch detection on the residual.
    window_ms = float(null_test_config.get('window_ms', 5.0))
    threshold_factor = float(null_test_config.get('threshold_factor', 6.0))
    min_burst_ms = float(null_test_config.get('min_burst_ms', 0.5))
    glitches = detect_glitches_short_time_energy(
        residual, sample_rate, window_ms, threshold_factor, min_burst_ms)
    all_artifacts.extend(glitches)

    # Sample-slip detection: marker-based when at least two markers exist,
    # otherwise the windowed-correlation fallback.
    if null_test_config.get('sample_slip_detection', True):
        if marker_template is not None and marker_positions and len(marker_positions) >= 2:
            min_slip_samples = int(null_test_config.get('min_slip_samples', 3))
            slips = detect_sample_slips_by_markers(
                reference, dut, marker_template, marker_positions, sample_rate, lag_samples,
                min_slip_samples=min_slip_samples)
        else:
            slip_window_ms = float(null_test_config.get('sample_slip_window_ms', 50.0))
            slips = detect_sample_slips(ref_aligned, dut_aligned, sample_rate, slip_window_ms)
        all_artifacts.extend(slips)

    # Aggregate counts per artifact type.
    result['total_count'] = len(all_artifacts)
    result['artifacts'] = all_artifacts
    for a in all_artifacts:
        t = a['type']
        result['by_type'][t] = result['by_type'].get(t, 0) + 1

    return result
|
||||||
|
|
||||||
|
|
||||||
def measure_frequency_accuracy(signal_data: np.ndarray, sample_rate: int,
|
def measure_frequency_accuracy(signal_data: np.ndarray, sample_rate: int,
|
||||||
expected_freq: float) -> Dict:
|
expected_freq: float) -> Dict:
|
||||||
"""
|
"""
|
||||||
@@ -698,6 +1029,147 @@ def plot_artifact_detection(channel_1: np.ndarray, channel_2: np.ndarray,
|
|||||||
plt.close()
|
plt.close()
|
||||||
|
|
||||||
|
|
||||||
|
def plot_deviation_histogram(artifacts_ch1: Dict, artifacts_ch2: Dict, output_dir: Path):
    """Plot a grouped bar histogram of artifact 'deviation_factor' values.

    Saves 'artifact_deviation_histogram.png' into *output_dir* with one bar
    group per deviation bin, comparing Ch1 (loopback) against Ch2 (DUT/radio).
    Artifacts without a 'deviation_factor' key (e.g. sample slips) are
    ignored; if neither channel contributes any value, nothing is written.

    Parameters
    ----------
    artifacts_ch1 / artifacts_ch2 : detector result dicts; only the
        'artifacts' list is read.
    output_dir : directory the PNG is written into.
    """
    def get_deviations(artifacts_dict):
        # Pull deviation factors out of the artifact list; entries lacking
        # the key are simply skipped.
        return [a['deviation_factor']
                for a in artifacts_dict.get('artifacts', [])
                if 'deviation_factor' in a]

    dev_ch1 = get_deviations(artifacts_ch1)
    dev_ch2 = get_deviations(artifacts_ch2)

    all_devs = dev_ch1 + dev_ch2
    if not all_devs:
        return

    raw_min = min(all_devs)
    raw_max = max(all_devs)
    MAX_BINS = 50

    # Wide dynamic range (>20x spread) or too many integer bins → log bins;
    # otherwise integer-aligned linear bins, downsampled if still too many.
    if raw_max / max(raw_min, 1e-9) > 20 or (raw_max - raw_min) > MAX_BINS:
        bins = np.logspace(np.log10(max(raw_min, 0.1)), np.log10(raw_max + 1), MAX_BINS + 1)
    else:
        bin_min = int(np.floor(raw_min))
        bin_max = int(np.ceil(raw_max)) + 1
        bins = np.arange(bin_min, bin_max + 1)
        if len(bins) > MAX_BINS + 1:
            bins = np.linspace(bin_min, bin_max, MAX_BINS + 1)
    bin_labels = [f"{bins[i]:.1f}-{bins[i+1]:.1f}" for i in range(len(bins) - 1)]

    # BUGFIX: the previous `dev_chX if dev_chX else [0]` fallback could add a
    # phantom count of 1 when a channel was empty and the bin range included
    # 0. np.histogram accepts an empty sequence and returns all-zero counts,
    # so the raw list is passed directly.
    counts_ch1, _ = np.histogram(dev_ch1, bins=bins)
    counts_ch2, _ = np.histogram(dev_ch2, bins=bins)

    x = np.arange(len(bin_labels))
    width = 0.4

    # Scale figure width with bin count, clamped to a sane range.
    fig_width = min(24, max(10, len(bin_labels) * 0.5))
    fig, ax = plt.subplots(figsize=(fig_width, 6))
    bars1 = ax.bar(x - width / 2, counts_ch1, width, label='Ch1 Loopback', color='steelblue', alpha=0.85)
    bars2 = ax.bar(x + width / 2, counts_ch2, width, label='Ch2 DUT/Radio', color='tomato', alpha=0.85)

    ax.set_xlabel('Deviation Factor (σ)')
    ax.set_ylabel('Count')
    ax.set_title('Artifact Deviation Factor Distribution')
    ax.set_xticks(x)
    ax.set_xticklabels(bin_labels, rotation=45, ha='right')
    ax.yaxis.get_major_locator().set_params(integer=True)
    ax.legend()
    ax.grid(True, axis='y', alpha=0.3)

    # Annotate every non-zero bar with its count (same style for both
    # channels — previously two identical loops).
    for bar in list(bars1) + list(bars2):
        if bar.get_height() > 0:
            ax.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.05,
                    str(int(bar.get_height())), ha='center', va='bottom', fontsize=8)

    plt.tight_layout()
    plot_file = output_dir / 'artifact_deviation_histogram.png'
    plt.savefig(plot_file, dpi=150, bbox_inches='tight')
    plt.close()
|
||||||
|
|
||||||
|
|
||||||
|
def plot_null_test(null_test_result: Dict, sample_rate: int, output_dir: Path,
                   marker_positions: Optional[List[int]] = None,
                   marker_len_samples: int = 0):
    """Render the null-test summary figure and save 'null_test_residual.png'.

    Three stacked panels:
      1. both aligned channels overlaid,
      2. the residual e[n] = DUT − gain·Ref with detected artifacts marked as
         vertical lines (red for 'null_test_glitch', orange for other types),
      3. the 5 ms short-time RMS envelope of the residual with a baseline.

    Parameters
    ----------
    null_test_result : result dict of the null-test detector; the private
        '_ref_aligned' / '_dut_aligned' / '_residual' arrays plus 'lag_ms',
        'gain_factor', 'residual_rms', 'residual_peak' and 'artifacts'
        entries are read.
    sample_rate : sampling rate in Hz, used to convert samples to seconds.
    output_dir : directory the PNG is written into.
    marker_positions : sample offsets of embedded sync markers; with at least
        two positions and marker_len_samples > 0, the lower panels are zoomed
        to the span between the markers.
    marker_len_samples : length of one sync marker in samples.
    """
    from scipy.ndimage import uniform_filter1d

    ref_aligned = null_test_result['_ref_aligned']
    dut_aligned = null_test_result['_dut_aligned']
    residual = null_test_result['_residual']

    # Nothing to plot if alignment/subtraction did not produce arrays.
    if ref_aligned is None or residual is None:
        return

    total_duration = len(ref_aligned) / sample_rate
    fig, axes = plt.subplots(3, 1, figsize=(16, 12))
    time = np.arange(len(ref_aligned)) / sample_rate

    # Panel 1: both aligned channels on a shared time axis.
    axes[0].plot(time, ref_aligned, alpha=0.6, linewidth=0.5, label='Ch1 Loopback (aligned)')
    axes[0].plot(time, dut_aligned, alpha=0.6, linewidth=0.5, label='Ch2 DUT (aligned)')
    axes[0].set_ylabel('Amplitude')
    axes[0].set_title(
        f'Null Test — Aligned Channels '
        f'(lag={null_test_result["lag_ms"]:.2f} ms, '
        f'gain={null_test_result["gain_factor"]:.4f})')
    axes[0].legend(loc='upper right', fontsize=8)
    axes[0].grid(True, alpha=0.3)

    # Panel 2: residual waveform with one vertical marker per artifact.
    axes[1].plot(time, residual, alpha=0.8, linewidth=0.4, color='purple')
    axes[1].set_ylabel('Amplitude')
    axes[1].set_title(
        f'Residual e[n] = DUT − gain·Ref '
        f'(RMS={null_test_result["residual_rms"]:.2e}, '
        f'peak={null_test_result["residual_peak"]:.4f})')
    for a in null_test_result['artifacts']:
        color = 'red' if a['type'] == 'null_test_glitch' else 'orange'
        axes[1].axvline(x=a['time_sec'], color=color, alpha=0.6, linewidth=1.0)
    axes[1].grid(True, alpha=0.3)

    # Panel 3: 5 ms short-time RMS of the residual (moving average of the
    # squared signal, clamped at 0 before the sqrt to guard against tiny
    # negative values from the filter).
    window_samples = max(4, int(5.0 / 1000.0 * sample_rate))
    rms_envelope = np.sqrt(np.maximum(
        uniform_filter1d(residual ** 2, size=window_samples, mode='reflect'), 0.0))

    axes[2].plot(time, rms_envelope, linewidth=0.6, color='darkgreen', label='Short-time RMS (5 ms)')
    axes[2].set_xlabel('Time (s)')
    axes[2].set_ylabel('Short-time RMS')
    axes[2].set_title('Short-time RMS of Residual')
    axes[2].grid(True, alpha=0.3)

    # Determine x-window for lower two axes: exclude marker regions at start and end
    margin_sec = 0.5
    if marker_positions and len(marker_positions) >= 2 and marker_len_samples > 0:
        x_min = (marker_positions[0] + marker_len_samples) / sample_rate + margin_sec
        x_max = marker_positions[-1] / sample_rate - margin_sec
    else:
        x_min = margin_sec
        x_max = total_duration - margin_sec

    if x_min < x_max:
        axes[1].set_xlim(x_min, x_max)
        axes[2].set_xlim(x_min, x_max)

        # Recompute baseline from the visible region only (unaffected by marker bursts)
        i_min = max(0, int(x_min * sample_rate))
        i_max = min(len(rms_envelope), int(x_max * sample_rate))
        baseline = float(np.median(rms_envelope[i_min:i_max]))
    else:
        baseline = float(np.median(rms_envelope))

    axes[2].axhline(y=baseline, color='steelblue', linestyle='--', linewidth=1,
                    label=f'Baseline ({baseline:.2e})')
    axes[2].legend(fontsize=8)

    plt.tight_layout()
    plt.savefig(output_dir / 'null_test_residual.png', dpi=150, bbox_inches='tight')
    plt.close()
|
||||||
|
|
||||||
|
|
||||||
def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_dir: Path = None) -> Dict:
|
def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_dir: Path = None) -> Dict:
|
||||||
import time
|
import time
|
||||||
|
|
||||||
@@ -708,6 +1180,7 @@ def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_d
|
|||||||
device_name = config['audio']['device_name']
|
device_name = config['audio']['device_name']
|
||||||
channels = config['audio']['channels']
|
channels = config['audio']['channels']
|
||||||
detector_config = config['artifact_detection']['detectors']
|
detector_config = config['artifact_detection']['detectors']
|
||||||
|
null_test_config = detector_config.get('null_test', {})
|
||||||
startup_delay = config['artifact_detection'].get('startup_delay', 10)
|
startup_delay = config['artifact_detection'].get('startup_delay', 10)
|
||||||
signal_type = config['artifact_detection'].get('signal_type', 'sine')
|
signal_type = config['artifact_detection'].get('signal_type', 'sine')
|
||||||
|
|
||||||
@@ -718,6 +1191,10 @@ def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_d
|
|||||||
time.sleep(startup_delay)
|
time.sleep(startup_delay)
|
||||||
print("Starting recording...")
|
print("Starting recording...")
|
||||||
|
|
||||||
|
use_null_test = signal_type != 'silent' and null_test_config.get('enabled', True)
|
||||||
|
marker_template: Optional[np.ndarray] = None
|
||||||
|
marker_positions: Optional[List[int]] = []
|
||||||
|
|
||||||
if signal_type == 'chirp':
|
if signal_type == 'chirp':
|
||||||
f0 = config['artifact_detection'].get('chirp_f0', 100)
|
f0 = config['artifact_detection'].get('chirp_f0', 100)
|
||||||
f1 = config['artifact_detection'].get('chirp_f1', 8000)
|
f1 = config['artifact_detection'].get('chirp_f1', 8000)
|
||||||
@@ -730,17 +1207,63 @@ def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_d
|
|||||||
channels=channels, device=device_ids[0], blocking=True)
|
channels=channels, device=device_ids[0], blocking=True)
|
||||||
else:
|
else:
|
||||||
tone = generate_test_tone(frequency, duration, sample_rate, amplitude)
|
tone = generate_test_tone(frequency, duration, sample_rate, amplitude)
|
||||||
|
if use_null_test:
|
||||||
|
marker_duration_sec = float(null_test_config.get('marker_duration_sec', 0.05))
|
||||||
|
marker_offset_sec = float(null_test_config.get('marker_first_offset_sec', 0.5))
|
||||||
|
marker_template = generate_sync_marker(
|
||||||
|
sample_rate,
|
||||||
|
duration_sec=marker_duration_sec,
|
||||||
|
f0=float(null_test_config.get('marker_f0', 200.0)),
|
||||||
|
f1=float(null_test_config.get('marker_f1', 16000.0)),
|
||||||
|
amplitude=float(null_test_config.get('marker_amplitude', 0.7)),
|
||||||
|
)
|
||||||
|
marker_len = len(marker_template)
|
||||||
|
pos_start = int(marker_offset_sec * sample_rate)
|
||||||
|
pos_end = len(tone) - int(marker_offset_sec * sample_rate) - marker_len
|
||||||
|
marker_positions = [pos_start, pos_end]
|
||||||
|
tone = tone.copy()
|
||||||
|
for pos in marker_positions:
|
||||||
|
tone[pos:pos + marker_len] = np.clip(
|
||||||
|
tone[pos:pos + marker_len] + marker_template, -1.0, 1.0)
|
||||||
|
print(f" Embedded sync markers at start ({marker_offset_sec:.1f}s) and end "
|
||||||
|
f"({pos_end/sample_rate:.1f}s) "
|
||||||
|
f"(chirp 200→16000 Hz, {marker_duration_sec*1000:.0f} ms)")
|
||||||
recording = play_and_record(tone, sample_rate, device_ids, channels)
|
recording = play_and_record(tone, sample_rate, device_ids, channels)
|
||||||
|
|
||||||
channel_1 = recording[:, 0]
|
channel_1 = recording[:, 0]
|
||||||
channel_2 = recording[:, 1]
|
channel_2 = recording[:, 1]
|
||||||
|
|
||||||
|
if output_dir:
|
||||||
|
wavfile.write(str(output_dir / 'channel_1_loopback_recording.wav'), sample_rate, channel_1.astype(np.float32))
|
||||||
|
wavfile.write(str(output_dir / 'channel_2_dut_recording.wav'), sample_rate, channel_2.astype(np.float32))
|
||||||
|
|
||||||
artifacts_ch1 = detect_artifacts_combined(channel_1, sample_rate, frequency, detector_config)
|
artifacts_ch1 = detect_artifacts_combined(channel_1, sample_rate, frequency, detector_config)
|
||||||
artifacts_ch2 = detect_artifacts_combined(channel_2, sample_rate, frequency, detector_config)
|
artifacts_ch2 = detect_artifacts_combined(channel_2, sample_rate, frequency, detector_config)
|
||||||
|
|
||||||
|
null_test_result = None
|
||||||
|
if use_null_test:
|
||||||
|
print("Running null test (align → subtract → residual analysis)...")
|
||||||
|
null_test_result = detect_artifacts_null_test(
|
||||||
|
channel_1, channel_2, sample_rate, null_test_config,
|
||||||
|
marker_template=marker_template,
|
||||||
|
marker_positions=marker_positions if marker_positions else None,
|
||||||
|
)
|
||||||
|
if 'error' not in null_test_result:
|
||||||
|
print(f" Lag: {null_test_result['lag_ms']:.2f} ms "
|
||||||
|
f"[{null_test_result.get('alignment_method', '?')}] | "
|
||||||
|
f"Gain: {null_test_result['gain_factor']:.4f} | "
|
||||||
|
f"Residual RMS: {null_test_result['residual_rms']:.2e} | "
|
||||||
|
f"Glitches: {null_test_result['total_count']}")
|
||||||
|
|
||||||
if save_plots and output_dir:
|
if save_plots and output_dir:
|
||||||
plot_artifact_detection(channel_1, channel_2, artifacts_ch1, artifacts_ch2,
|
plot_artifact_detection(channel_1, channel_2, artifacts_ch1, artifacts_ch2,
|
||||||
frequency, sample_rate, output_dir)
|
frequency, sample_rate, output_dir)
|
||||||
|
plot_deviation_histogram(artifacts_ch1, artifacts_ch2, output_dir)
|
||||||
|
|
||||||
|
if null_test_result is not None and null_test_result['_residual'] is not None:
|
||||||
|
plot_null_test(null_test_result, sample_rate, output_dir,
|
||||||
|
marker_positions=marker_positions,
|
||||||
|
marker_len_samples=len(marker_template) if marker_template is not None else 0)
|
||||||
|
|
||||||
anomalies_dir = output_dir / 'individual_anomalies'
|
anomalies_dir = output_dir / 'individual_anomalies'
|
||||||
anomalies_dir.mkdir(exist_ok=True)
|
anomalies_dir.mkdir(exist_ok=True)
|
||||||
@@ -759,6 +1282,12 @@ def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_d
|
|||||||
if total_anomaly_plots > 0:
|
if total_anomaly_plots > 0:
|
||||||
print(f"✓ Generated {total_anomaly_plots} individual anomaly plots")
|
print(f"✓ Generated {total_anomaly_plots} individual anomaly plots")
|
||||||
|
|
||||||
|
null_test_serializable = None
|
||||||
|
if null_test_result is not None:
|
||||||
|
null_test_serializable = {
|
||||||
|
k: v for k, v in null_test_result.items() if not k.startswith('_')
|
||||||
|
}
|
||||||
|
|
||||||
result = {
|
result = {
|
||||||
'signal_type': signal_type,
|
'signal_type': signal_type,
|
||||||
'duration_sec': float(duration),
|
'duration_sec': float(duration),
|
||||||
@@ -774,6 +1303,7 @@ def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_d
|
|||||||
'artifact_rate_per_minute': float(artifacts_ch2['total_count'] / duration * 60),
|
'artifact_rate_per_minute': float(artifacts_ch2['total_count'] / duration * 60),
|
||||||
'frequency_accuracy': artifacts_ch2['frequency_accuracy']
|
'frequency_accuracy': artifacts_ch2['frequency_accuracy']
|
||||||
},
|
},
|
||||||
|
'null_test': null_test_serializable,
|
||||||
'detector_config': detector_config
|
'detector_config': detector_config
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -36,10 +36,9 @@ def main():
|
|||||||
test_id = timestamp.strftime('%Y%m%d_%H%M%S')
|
test_id = timestamp.strftime('%Y%m%d_%H%M%S')
|
||||||
|
|
||||||
results_dir = Path(config['output']['results_dir'])
|
results_dir = Path(config['output']['results_dir'])
|
||||||
results_dir.mkdir(exist_ok=True)
|
|
||||||
|
|
||||||
test_output_dir = results_dir / f"{test_id}_artifact_detection"
|
test_output_dir = results_dir / timestamp.strftime('%Y') / timestamp.strftime('%m') / timestamp.strftime('%d') / f"{test_id}_artifact_detection"
|
||||||
test_output_dir.mkdir(exist_ok=True)
|
test_output_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
save_plots = config['output'].get('save_plots', False)
|
save_plots = config['output'].get('save_plots', False)
|
||||||
|
|
||||||
@@ -134,13 +133,42 @@ def main():
|
|||||||
print(f" Measured: {freq_acc['measured_freq_hz']:.2f} Hz")
|
print(f" Measured: {freq_acc['measured_freq_hz']:.2f} Hz")
|
||||||
print(f" Error: {freq_acc['error_hz']:+.2f} Hz ({freq_acc['error_percent']:+.3f}%)")
|
print(f" Error: {freq_acc['error_hz']:+.2f} Hz ({freq_acc['error_percent']:+.3f}%)")
|
||||||
|
|
||||||
|
nt = result.get('null_test')
|
||||||
|
if nt and nt.get('enabled'):
|
||||||
|
print("\n🔬 NULL TEST (Ch2 DUT vs Ch1 Loopback reference):")
|
||||||
|
print(f" Alignment lag: {nt['lag_ms']:.2f} ms ({nt['lag_samples']} samples)")
|
||||||
|
print(f" Gain factor: {nt['gain_factor']:.4f}")
|
||||||
|
print(f" Residual RMS: {nt['residual_rms']:.2e}")
|
||||||
|
print(f" Residual peak: {nt['residual_peak']:.4f}")
|
||||||
|
print(f" Glitches found: {nt['total_count']}")
|
||||||
|
if nt['by_type']:
|
||||||
|
print(" By type:")
|
||||||
|
for artifact_type, count in nt['by_type'].items():
|
||||||
|
print(f" - {artifact_type}: {count}")
|
||||||
|
if nt['artifacts']:
|
||||||
|
print(" Glitch timestamps:")
|
||||||
|
for a in nt['artifacts'][:20]:
|
||||||
|
if a['type'] == 'null_test_glitch':
|
||||||
|
print(f" {a['time_sec']:.3f}s dur={a['duration_ms']:.1f}ms "
|
||||||
|
f"dev={a['deviation_factor']:.1f}×baseline")
|
||||||
|
elif a['type'] == 'sample_slip':
|
||||||
|
baseline = a.get('lag_baseline', a.get('lag_before', '?'))
|
||||||
|
at = a.get('lag_at_marker', a.get('lag_after', '?'))
|
||||||
|
print(f" {a['time_sec']:.3f}s sample_slip "
|
||||||
|
f"Δ={a['lag_change_samples']:+d} samples "
|
||||||
|
f"(baseline={baseline}, at_marker={at})")
|
||||||
|
if len(nt['artifacts']) > 20:
|
||||||
|
print(f" ... and {len(nt['artifacts']) - 20} more (see YAML)")
|
||||||
|
|
||||||
ch1_count = result['channel_1_loopback']['total_artifacts']
|
ch1_count = result['channel_1_loopback']['total_artifacts']
|
||||||
ch2_count = result['channel_2_dut']['total_artifacts']
|
ch2_count = result['channel_2_dut']['total_artifacts']
|
||||||
|
|
||||||
if ch2_count > ch1_count:
|
if nt and nt.get('enabled') and nt['total_count'] > 0:
|
||||||
|
print(f"\n⚠️ NULL TEST: {nt['total_count']} glitch(es) detected in DUT path residual")
|
||||||
|
elif ch2_count > ch1_count:
|
||||||
delta = ch2_count - ch1_count
|
delta = ch2_count - ch1_count
|
||||||
print(f"\n⚠️ DEGRADATION DETECTED: {delta} more artifacts in radio path vs loopback")
|
print(f"\n⚠️ DEGRADATION DETECTED: {delta} more artifacts in radio path vs loopback")
|
||||||
elif ch1_count == ch2_count == 0:
|
elif ch1_count == ch2_count == 0 and (not nt or nt['total_count'] == 0):
|
||||||
print("\n✅ EXCELLENT: No artifacts detected in either path!")
|
print("\n✅ EXCELLENT: No artifacts detected in either path!")
|
||||||
else:
|
else:
|
||||||
print(f"\nℹ️ Loopback baseline: {ch1_count} artifacts")
|
print(f"\nℹ️ Loopback baseline: {ch1_count} artifacts")
|
||||||
|
|||||||
@@ -26,10 +26,9 @@ def main():
|
|||||||
test_id = timestamp.strftime('%Y%m%d_%H%M%S')
|
test_id = timestamp.strftime('%Y%m%d_%H%M%S')
|
||||||
|
|
||||||
results_dir = Path(config['output']['results_dir'])
|
results_dir = Path(config['output']['results_dir'])
|
||||||
results_dir.mkdir(exist_ok=True)
|
|
||||||
|
|
||||||
test_output_dir = results_dir / f"{test_id}_latency"
|
test_output_dir = results_dir / timestamp.strftime('%Y') / timestamp.strftime('%m') / timestamp.strftime('%d') / f"{test_id}_latency"
|
||||||
test_output_dir.mkdir(exist_ok=True)
|
test_output_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
save_plots = config['output'].get('save_plots', False)
|
save_plots = config['output'].get('save_plots', False)
|
||||||
|
|
||||||
@@ -47,7 +46,9 @@ def main():
|
|||||||
try:
|
try:
|
||||||
latency_stats = run_latency_test(config, num_measurements=args.measurements,
|
latency_stats = run_latency_test(config, num_measurements=args.measurements,
|
||||||
save_plots=save_plots, output_dir=test_output_dir)
|
save_plots=save_plots, output_dir=test_output_dir)
|
||||||
print(f"✓ Latency: avg={latency_stats['avg']:.3f}ms, "
|
valid = latency_stats.get('valid', True)
|
||||||
|
status = "PASS" if valid else "FAIL"
|
||||||
|
print(f"{'✓' if valid else '✗'} Latency [{status}]: avg={latency_stats['avg']:.3f}ms, "
|
||||||
f"min={latency_stats['min']:.3f}ms, max={latency_stats['max']:.3f}ms, "
|
f"min={latency_stats['min']:.3f}ms, max={latency_stats['max']:.3f}ms, "
|
||||||
f"std={latency_stats['std']:.3f}ms")
|
f"std={latency_stats['std']:.3f}ms")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
@@ -263,10 +263,9 @@ def main():
|
|||||||
test_id = timestamp.strftime('%Y%m%d_%H%M%S')
|
test_id = timestamp.strftime('%Y%m%d_%H%M%S')
|
||||||
|
|
||||||
results_dir = Path(config['output']['results_dir'])
|
results_dir = Path(config['output']['results_dir'])
|
||||||
results_dir.mkdir(exist_ok=True)
|
|
||||||
|
|
||||||
test_output_dir = results_dir / f"{test_id}_latency_buildup"
|
test_output_dir = results_dir / timestamp.strftime('%Y') / timestamp.strftime('%m') / timestamp.strftime('%d') / f"{test_id}_latency_buildup"
|
||||||
test_output_dir.mkdir(exist_ok=True)
|
test_output_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
save_plots = config['output'].get('save_plots', False)
|
save_plots = config['output'].get('save_plots', False)
|
||||||
|
|
||||||
|
|||||||
527
test_matrix.py
Normal file
527
test_matrix.py
Normal file
@@ -0,0 +1,527 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
import argparse
|
||||||
|
import copy
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import yaml
|
||||||
|
import requests
|
||||||
|
import numpy as np
|
||||||
|
from datetime import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
sys.path.insert(0, str(Path(__file__).parent))
|
||||||
|
from src.audio_tests import run_latency_test, run_artifact_detection_test
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
# Parameter definitions
# ---------------------------------------------------------------------------

QOS_PROFILES = {
    'fast': {'number_of_retransmissions': 2, 'max_transport_latency_ms': 22},
    'robust': {'number_of_retransmissions': 4, 'max_transport_latency_ms': 43},
}

SAMPLE_RATES = {
    '16k': {'auracast_sampling_rate_hz': 16000, 'octets_per_frame': 40},
    '24k': {'auracast_sampling_rate_hz': 24000, 'octets_per_frame': 60},
    '48k': {'auracast_sampling_rate_hz': 48000, 'octets_per_frame': 120},
}

CHANNELS = {
    'mono': {'num_bis': 1},
    'stereo': {'num_bis': 2},
}

# PRESENTATION_DELAYS_MS = [10, 20, 40, 80]
PRESENTATION_DELAYS_MS = [10]

API_URL = 'http://beacon29.local:5000/init'


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def build_api_payload(qos_name: str, rate_name: str, channel_name: str, pd_ms: int) -> dict:
    """Assemble the /init request body for one matrix combination.

    Parameters
    ----------
    qos_name : key into QOS_PROFILES ('fast' or 'robust').
    rate_name : key into SAMPLE_RATES ('16k', '24k' or '48k').
    channel_name : key into CHANNELS ('mono' or 'stereo').
    pd_ms : presentation delay in milliseconds (converted to µs).

    Raises
    ------
    KeyError if any name is not a known table key.
    """
    qos_params = QOS_PROFILES[qos_name]
    rate_params = SAMPLE_RATES[rate_name]

    # Single broadcast group; only num_bis varies with the channel setting.
    broadcast_group = {
        'id': 12,
        'random_address': 'F1:F1:F2:F3:F4:F5',
        'language': 'deu',
        'name': 'Broadcast0',
        'program_info': 'Vorlesung DE',
        'audio_source': 'device:ch1',
        'input_format': 'auto',
        'loop': True,
        'precode_wav': False,
        'iso_que_len': 1,
        'num_bis': CHANNELS[channel_name]['num_bis'],
        'input_gain_db': 0,
    }

    return {
        'qos_config': {
            'iso_int_multiple_10ms': 1,
            'number_of_retransmissions': qos_params['number_of_retransmissions'],
            'max_transport_latency_ms': qos_params['max_transport_latency_ms'],
        },
        'debug': False,
        'device_name': 'Auracaster',
        'transport': '',
        'auracast_device_address': 'F0:F1:F2:F3:F4:F5',
        'auracast_sampling_rate_hz': rate_params['auracast_sampling_rate_hz'],
        'octets_per_frame': rate_params['octets_per_frame'],
        'frame_duration_us': 10000,
        'presentation_delay_us': pd_ms * 1000,
        'manufacturer_data': [None, None],
        'immediate_rendering': False,
        'assisted_listening_stream': False,
        'bigs': [broadcast_group],
        'analog_gain': 50,
    }
|
||||||
|
|
||||||
|
|
||||||
|
STOP_URL = 'http://beacon29.local:5000/stop_audio'


def stop_device(timeout: int = 10) -> None:
    """POST to stop_audio before reconfiguring. Errors are non-fatal."""
    headers = {'accept': 'application/json'}
    try:
        requests.post(STOP_URL, headers=headers, timeout=timeout)
    except Exception as exc:
        # Best-effort only — log the problem and carry on with the run.
        print(f" stop_audio warning: {exc}")
|
||||||
|
|
||||||
|
|
||||||
|
def configure_device(payload: dict, timeout: int = 15) -> tuple:
    """POST the init payload to the device API. Returns (success, response_or_error)."""
    request_headers = {
        'accept': 'application/json',
        'Content-Type': 'application/json',
    }
    try:
        resp = requests.post(API_URL, json=payload,
                             headers=request_headers, timeout=timeout)
        resp.raise_for_status()
        try:
            # Prefer a decoded JSON body; fall back to raw text for
            # non-JSON replies.
            return True, resp.json()
        except Exception:
            return True, resp.text
    except Exception as exc:
        # Network error or HTTP error status → report as failure string.
        return False, str(exc)
|
||||||
|
|
||||||
|
|
||||||
|
def run_buildup_check(config: dict, duration_sec: int = 20, interval_sec: int = 1) -> dict:
    """
    Lightweight buildup check: take latency measurements over duration_sec seconds,
    return analysis dict with 'buildup_detected' bool and stats.

    NOTE: 'buildup_detected' fires on more than 5 % absolute change in either
    direction (increase or decrease); the 'trend' field tells which way.
    """
    samples = []
    deadline = time.time() + duration_sec

    while time.time() < deadline:
        try:
            latency_stats = run_latency_test(config, num_measurements=1, save_plots=False)
            samples.append(float(latency_stats['avg']))
        except Exception as e:
            print(f" buildup measurement error: {e}")
        time_left = deadline - time.time()
        if time_left <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(interval_sec, time_left))

    # With fewer than two points no change can be computed.
    if len(samples) < 2:
        return {'buildup_detected': None, 'measurements': samples,
                'note': 'insufficient_data'}

    first = samples[0]
    last = samples[-1]
    delta_ms = last - first
    delta_pct = (delta_ms / first * 100.0) if first > 0 else 0.0

    # Least-squares slope over the series (ms per measurement) decides the
    # trend label; with only two points the trend is reported as stable.
    if len(samples) >= 3:
        indices = np.arange(len(samples))
        slope = float(np.polyfit(indices, np.array(samples), 1)[0])
    else:
        slope = 0.0

    if slope > 0.01:
        trend = 'increasing'
    elif slope < -0.01:
        trend = 'decreasing'
    else:
        trend = 'stable'

    return {
        'buildup_detected': abs(delta_pct) > 5.0,
        'start_latency_ms': round(first, 3),
        'end_latency_ms': round(last, 3),
        'change_ms': round(delta_ms, 3),
        'change_percent': round(delta_pct, 2),
        'trend': trend,
        'measurements': [round(m, 3) for m in samples],
    }
|
||||||
|
|
||||||
|
|
||||||
|
def run_quality_check(config: dict, duration_sec: int = 180,
                      output_dir: Path = None) -> dict:
    """
    Run artifact detection for duration_sec seconds.
    Returns dict with artifacts_per_min and total_artifacts.
    On failure returns {'error': ..., 'artifacts_per_min': None} instead of raising.
    """
    # Deep-copy so the caller's config dict is never mutated.
    local_cfg = copy.deepcopy(config)
    local_cfg['artifact_detection']['duration'] = float(duration_sec)
    # The stream is already up at this point — no extra startup wait needed.
    local_cfg['artifact_detection']['startup_delay'] = 0

    try:
        detection = run_artifact_detection_test(
            local_cfg,
            save_plots=output_dir is not None,
            output_dir=output_dir,
        )
        dut_stats = detection['channel_2_dut']
        return {
            'artifacts_per_min': round(float(dut_stats['artifact_rate_per_minute']), 2),
            'total_artifacts': int(dut_stats['total_artifacts']),
            'duration_sec': duration_sec,
            'artifacts_by_type': dut_stats['artifacts_by_type'],
        }
    except Exception as exc:
        # Best-effort: report the failure instead of aborting the matrix run.
        return {'error': str(exc), 'artifacts_per_min': None}
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# USB recovery helper
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _try_usb_audio_reset(config: dict) -> None:
    """
    Try to recover the audio device after an ALSA xrun.

    Strategy:
    1. Stop any active sounddevice stream so no ALSA handle is left open.
    2. Attempt a USB-level reset via the USBDEVFS_RESET ioctl (equivalent
       to physically replugging the interface). Requires either root or
       write access to the /dev/bus/usb device node — e.g. membership in
       the 'plugdev' group:
           sudo usermod -aG plugdev $USER   (then re-login)
    3. Always finish with a 3 s settle sleep.

    All failures are non-fatal: the function logs a message and returns.
    """
    import fcntl
    import os
    import re
    import sounddevice as _sd

    USBDEVFS_RESET = 0x5514  # ioctl request code from <linux/usbdevice_fs.h>

    # Stop any active sounddevice stream first
    try:
        _sd.stop()
    except Exception:
        pass

    # USB-level reset via ioctl (equivalent to replug)
    device_name = config['audio'].get('device_name', 'Scarlett')
    try:
        with open('/proc/asound/cards') as f:
            cards_text = f.read()

        # Find the ALSA card index whose description mentions the device name.
        card_num = None
        for line in cards_text.splitlines():
            if device_name.lower() in line.lower():
                m = re.match(r'\s*(\d+)', line)
                if m:
                    card_num = m.group(1)
                    break

        if card_num is not None:
            # Walk up the resolved sysfs path until we hit the USB device
            # directory (the one exposing idVendor).
            card_sysfs = f'/sys/class/sound/card{card_num}'
            real_path = Path(os.path.realpath(card_sysfs))
            usb_dev_path = None
            for parent in real_path.parents:
                if (parent / 'idVendor').exists():
                    usb_dev_path = parent
                    break

            if usb_dev_path is not None:
                bus_num = int((usb_dev_path / 'busnum').read_text().strip())
                dev_num = int((usb_dev_path / 'devnum').read_text().strip())
                dev_file = f'/dev/bus/usb/{bus_num:03d}/{dev_num:03d}'
                with open(dev_file, 'wb') as f:
                    fcntl.ioctl(f, USBDEVFS_RESET, 0)
                print(f" Recovery: USB reset of {dev_file} OK")

    except PermissionError:
        # FIX: previously bound the exception as an unused variable and used
        # an f-string with no placeholders; message text is unchanged.
        print(" Recovery: USB reset skipped (permission denied — "
              "add yourself to plugdev: sudo usermod -aG plugdev $USER)")
    except Exception as e:
        print(f" Recovery: USB reset skipped ({e})")

    # Give the kernel/driver time to re-enumerate the device.
    time.sleep(3)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Main
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def main():
    """CLI entry point: run the latency/buildup/quality matrix test.

    Iterates every combination of QoS profile, sample rate, channel count
    and presentation delay, configures the device via its HTTP API for each
    combination, measures latency (plus optional buildup/quality tests),
    retries failed combinations, and writes a YAML result file plus a
    rendered results-table PNG under the configured results directory.
    """
    parser = argparse.ArgumentParser(
        description='Run matrix test across all QoS/rate/channel/delay combinations')
    parser.add_argument('--serial-number', required=True,
                        help='Serial number (e.g. SN001234)')
    parser.add_argument('--software-version', required=True,
                        help='Software version / git commit hash')
    parser.add_argument('--comment', default='',
                        help='Free-text comment for this test run')
    parser.add_argument('--config', default='config.yaml',
                        help='Path to config file')
    parser.add_argument('--measurements', type=int, default=5,
                        help='Latency measurements per combination (default: 5)')
    # FIX: help text previously claimed "(default: 15)" while the actual
    # default is 5 — the text now matches the code.
    parser.add_argument('--settle-time', type=int, default=5,
                        help='Seconds to wait after API call before measuring (default: 5)')
    parser.add_argument('--buildup', action='store_true',
                        help='Run 20 s buildup test per combination')
    parser.add_argument('--quality', action='store_true',
                        help='Run 3 min quality/artifact test per combination')
    parser.add_argument('--quality-duration', type=int, default=180,
                        help='Quality test duration in seconds (default: 180)')
    parser.add_argument('--dry-run', action='store_true',
                        help='Skip API calls and audio measurements (for testing the script)')
    args = parser.parse_args()

    with open(args.config, 'r') as f:
        config = yaml.safe_load(f)

    timestamp = datetime.now()
    test_id = timestamp.strftime('%Y%m%d_%H%M%S')

    # Results land in results_dir/YYYY/MM/DD/<test_id>_matrix/ — same layout
    # the other tests in this project use.
    results_dir = Path(config['output']['results_dir'])
    test_output_dir = (results_dir
                       / timestamp.strftime('%Y')
                       / timestamp.strftime('%m')
                       / timestamp.strftime('%d')
                       / f"{test_id}_matrix")
    test_output_dir.mkdir(parents=True, exist_ok=True)

    # All combinations in the specified order
    combos = [
        (qos, rate, ch, pd)
        for qos in ['fast', 'robust']
        for rate in ['16k', '24k', '48k']
        for ch in ['mono', 'stereo']
        for pd in PRESENTATION_DELAYS_MS
    ]

    total = len(combos)
    print("=" * 70)
    print("MATRIX TEST")
    print("=" * 70)
    print(f"Test ID: {test_id}")
    print(f"Serial Number: {args.serial_number}")
    print(f"Software: {args.software_version}")
    if args.comment:
        print(f"Comment: {args.comment}")
    print(f"Combinations: {total}")
    print(f"Measurements/combo: {args.measurements}")
    print(f"Settle time: {args.settle_time} s")
    print(f"Buildup test: {'yes (20 s)' if args.buildup else 'no'}")
    print(f"Quality test: {'yes (' + str(args.quality_duration) + ' s)' if args.quality else 'no'}")
    if args.dry_run:
        print("DRY RUN MODE - no API calls or audio measurements")
    print("=" * 70)

    def run_combo(qos, rate, ch, pd):
        """Run a single combination and return its result dict."""
        payload = build_api_payload(qos, rate, ch, pd)
        result = {
            'qos': qos,
            'sample_rate': rate,
            'channels': ch,
            'presentation_delay_ms': pd,
            'api_payload': payload,
            'api_success': None,
            'latency': None,
            'buildup': None,
            'quality': None,
        }

        if not args.dry_run:
            stop_device()
            ok, api_resp = configure_device(payload)
            result['api_success'] = ok
            # NOTE(review): failure responses are stored raw, success
            # responses stringified — presumably to keep the YAML compact.
            # Confirm this asymmetry is intentional.
            result['api_response'] = api_resp if not ok else str(api_resp)

            if not ok:
                print(f" API FAILED: {api_resp}")
                # Record a latency stub so downstream pass/fail counting
                # and the retry logic see this combo as invalid.
                result['latency'] = {'error': f'API failed: {api_resp}', 'valid': False,
                                     'avg': None}
                return result

            print(f" API OK -> settling {args.settle_time} s...")
            time.sleep(args.settle_time)
        else:
            result['api_success'] = True

        if not args.dry_run:
            try:
                lat = run_latency_test(config, num_measurements=args.measurements,
                                       save_plots=False)
                result['latency'] = {
                    'avg': round(float(lat['avg']), 3),
                    'min': round(float(lat['min']), 3),
                    'max': round(float(lat['max']), 3),
                    'std': round(float(lat['std']), 3),
                    'valid': bool(lat.get('valid', True)),
                }
                status = "PASS" if result['latency']['valid'] else "FAIL"
                print(f" Latency [{status}]: avg={lat['avg']:.1f} ms "
                      f"std={lat['std']:.2f} ms")
            except Exception as e:
                result['latency'] = {'error': str(e), 'valid': False, 'avg': None}
                print(f" Latency ERROR: {e}")

            # An invalid latency measurement is treated as a hardware/ALSA
            # problem: flag it for unconditional retry and attempt a
            # USB-level reset of the audio interface before moving on.
            if not result['latency'].get('valid', False):
                print(" Latency invalid — attempting USB recovery, skipping buildup/quality.")
                result['latency']['alsa_error'] = True
                _try_usb_audio_reset(config)
                return result
        else:
            import random
            # Dry run: synthesize a plausible latency around the
            # presentation delay so plotting code can be exercised.
            avg = pd + random.uniform(-1, 1)
            result['latency'] = {'avg': round(avg, 3), 'min': round(avg - 0.5, 3),
                                 'max': round(avg + 0.5, 3), 'std': 0.2, 'valid': True}

        if args.buildup:
            if not args.dry_run:
                # FIX: dropped spurious f-prefix (string has no placeholders).
                print(" Buildup check (20 s)...")
                buildup = run_buildup_check(config, duration_sec=20, interval_sec=1)
                result['buildup'] = buildup
                bd = buildup.get('buildup_detected')
                print(f" Buildup: {'YES ⚠' if bd else ('NO' if bd is False else 'N/A')}")
            else:
                result['buildup'] = {'buildup_detected': False, 'note': 'dry_run'}

        if args.quality:
            if not args.dry_run:
                print(f" Quality test ({args.quality_duration} s)...")
                # Per-combination sub-directory for the quality plots.
                combo_plot_dir = test_output_dir / f"{qos}_{rate}_{ch}_{pd}ms"
                combo_plot_dir.mkdir(parents=True, exist_ok=True)
                quality = run_quality_check(config, duration_sec=args.quality_duration,
                                            output_dir=combo_plot_dir)
                result['quality'] = quality
                apm = quality.get('artifacts_per_min')
                print(f" Quality: {f'{apm:.1f} artifacts/min' if apm is not None else 'ERROR'}")
            else:
                result['quality'] = {'artifacts_per_min': 0.5, 'total_artifacts': 1,
                                     'note': 'dry_run'}

        return result

    matrix_results = {}

    for idx, (qos, rate, ch, pd) in enumerate(combos, 1):
        key = f"{qos}_{rate}_{ch}_{pd}ms"
        print(f"\n[{idx:2d}/{total}] {qos:6s} {rate:3s} {ch:6s} PD={pd:2d}ms")
        matrix_results[key] = run_combo(qos, rate, ch, pd)

    # --- Retry failed combinations ---
    # ALSA/hardware failures always retry (up to 3 times) regardless of threshold.
    # Other failures retry only if the failure rate is <= 10%.
    def _is_failed(r):
        lat = r.get('latency')
        return lat is None or lat.get('valid') is False

    def _is_alsa_failure(r):
        lat = r.get('latency') or {}
        return lat.get('alsa_error', False)

    MAX_RETRIES = 3
    for retry_round in range(1, MAX_RETRIES + 1):
        failed_keys = [k for k, r in matrix_results.items() if _is_failed(r)]
        if not failed_keys:
            break

        alsa_keys = [k for k in failed_keys if _is_alsa_failure(matrix_results[k])]
        other_keys = [k for k in failed_keys if k not in alsa_keys]
        retry_threshold = total * 0.10

        keys_to_retry = list(alsa_keys)
        if 0 < len(other_keys) <= retry_threshold:
            keys_to_retry += other_keys
        elif len(other_keys) > retry_threshold:
            # Too many non-hardware failures — likely systematic, not flaky.
            print(f"\n{len(other_keys)}/{total} non-hardware failures "
                  f"({len(other_keys)/total*100:.0f}%) — above 10% threshold, skipping retry.")

        if not keys_to_retry:
            break

        n_other_retrying = len(keys_to_retry) - len(alsa_keys)
        print(f"\n{'=' * 70}")
        print(f"RETRY ROUND {retry_round}/{MAX_RETRIES} — "
              f"{len(keys_to_retry)} combo(s) "
              f"[{len(alsa_keys)} hw-error, {n_other_retrying} other]")
        print(f"{'=' * 70}")
        for retry_idx, key in enumerate(keys_to_retry, 1):
            r = matrix_results[key]
            qos, rate, ch, pd = r['qos'], r['sample_rate'], r['channels'], r['presentation_delay_ms']
            print(f"\n[retry {retry_round}.{retry_idx}/{len(keys_to_retry)}] {qos:6s} {rate:3s} {ch:6s} PD={pd:2d}ms")
            matrix_results[key] = run_combo(qos, rate, ch, pd)
            matrix_results[key]['retried'] = True

    # --- Save results ---
    output_data = {
        'metadata': {
            'test_id': test_id,
            'timestamp': timestamp.isoformat(),
            'serial_number': args.serial_number,
            'software_version': args.software_version,
            'comment': args.comment,
            'options': {
                'measurements_per_combo': args.measurements,
                'settle_time_sec': args.settle_time,
                'buildup_enabled': args.buildup,
                'quality_enabled': args.quality,
                'quality_duration_sec': args.quality_duration if args.quality else None,
            },
        },
        'matrix_results': matrix_results,
    }

    output_file = test_output_dir / f"{test_id}_matrix_results.yaml"
    with open(output_file, 'w') as f:
        yaml.dump(output_data, f, default_flow_style=False, sort_keys=False)

    # --- Auto-generate table image ---
    # Best-effort: plotting failures must not lose the YAML results, so any
    # exception here is reported as a warning only.
    try:
        from plot_matrix import build_table
        import matplotlib.pyplot as plt
        show_buildup = any(r.get('buildup') is not None for r in matrix_results.values())
        show_quality = any(r.get('quality') is not None for r in matrix_results.values())
        fig = build_table(
            matrix_results=matrix_results,
            baseline_results=None,
            metadata=output_data['metadata'],
            baseline_metadata=None,
            show_buildup=show_buildup,
            show_quality=show_quality,
        )
        plot_file = test_output_dir / f"{test_id}_matrix_results_table.png"
        fig.savefig(plot_file, dpi=150, bbox_inches='tight',
                    facecolor='white', edgecolor='none')
        plt.close(fig)
        plot_file_path = plot_file
        print(f"Table image saved to: {plot_file}")
    except Exception as e:
        plot_file_path = None
        print(f"Warning: could not auto-generate table image: {e}")

    # --- Summary ---
    passed = sum(1 for r in matrix_results.values()
                 if r.get('latency') and r['latency'].get('valid', False))
    failed = total - passed
    print("\n" + "=" * 70)
    print(f"MATRIX TEST COMPLETE | PASS: {passed} FAIL: {failed} Total: {total}")
    print(f"Results: {output_file}")
    if plot_file_path:
        print(f"Table: {plot_file_path.resolve()}")
    print(f"Re-plot: python plot_matrix.py {output_file}")
    print("=" * 70)
|
||||||
|
# Script entry point: run the matrix test only when executed directly,
# not when imported as a module.
if __name__ == '__main__':
    main()
|
||||||
@@ -88,7 +88,7 @@ def display_results(yaml_file: Path):
|
|||||||
|
|
||||||
|
|
||||||
def list_all_results(results_dir: Path):
|
def list_all_results(results_dir: Path):
|
||||||
yaml_files = sorted(results_dir.glob("*_results.yaml"))
|
yaml_files = sorted(results_dir.rglob("*_results.yaml"))
|
||||||
|
|
||||||
if not yaml_files:
|
if not yaml_files:
|
||||||
print("No test results found.")
|
print("No test results found.")
|
||||||
|
|||||||
Reference in New Issue
Block a user