Compare commits

...

8 Commits

Author SHA1 Message Date
530d58440f Adds null test to artifact. 2026-04-24 08:39:03 +02:00
8f44cf56d4 Deviation hist. 2026-04-21 16:03:46 +02:00
fdfe43b6b9 Retries on alsa error, coming from the audiointerface. 2026-04-21 15:07:31 +02:00
Pbopbo
cc8766b278 Matrix test. 2026-04-09 09:47:13 +02:00
Pbopbo
0c7de92ae9 Refactoring to sort results in year/month/day folder. 2026-04-08 11:10:12 +02:00
Pbopbo
dd118ddb23 Adds check if test is valid to latency test. 2026-04-08 10:27:22 +02:00
Pbopbo
31cc2c0e92 more plotting 2026-04-07 16:54:05 +02:00
Pbopbo
2cab55c8cd Plotting for write read time and avail logs. 2026-03-31 11:55:44 +02:00
13 changed files with 2125 additions and 33 deletions

View File

@@ -52,3 +52,72 @@ Again input it into the audio interface and measure both loopback and radio path
============
Implement Matrix test
Test:
Fast / Robust
16k / 24k / 48k
Mono / Stereo
Presentation Delay 10 / 20 / 40 / 80
For each combination test:
Latency
Latency buildup yes/no
Maybe: Audio quality BUT this way test gets really long.
Plot a table with the results, also compare to 'baseline' measurement.
Use the existing tests as a guideline how to save the results.
For setting the parameters for the tests use the API:
http://beacon29.local:5000/init
curl -X 'POST' \ 'http://beacon29.local:5000/init' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ "qos_config": { "iso_int_multiple_10ms": 1, "number_of_retransmissions": 2, "max_transport_latency_ms": 23 }, "debug": false, "device_name": "Auracaster", "transport": "", "auracast_device_address": "F0:F1:F2:F3:F4:F5", "auracast_sampling_rate_hz": 16000, "octets_per_frame": 160, "frame_duration_us": 10000, "presentation_delay_us": 10000, "manufacturer_data": [ null, null ], "immediate_rendering": false, "assisted_listening_stream": false, "bigs": [ { "id": 12, "random_address": "F1:F1:F2:F3:F4:F5", "language": "deu", "name": "Broadcast0", "program_info": "Vorlesung DE", "audio_source": "device:ch1", "input_format": "auto", "loop": true, "precode_wav": false, "iso_que_len": 1, "num_bis": 1, "input_gain_db": 0 } ], "analog_gain": 50 }'
It has to have the name Broadcast0.
qos fast is "number_of_retransmissions": 2, "max_transport_latency_ms": 23
qos robust is "number_of_retransmissions": 4, "max_transport_latency_ms": 43
Mono is "num_bis": 1
Stereo is "num_bis": 2
16k is "auracast_sampling_rate_hz": 16000, "octets_per_frame": 40
24k is "auracast_sampling_rate_hz": 24000, "octets_per_frame": 60
48k is "auracast_sampling_rate_hz": 48000, "octets_per_frame": 120
The results shall be plotted as a table:
Presentation delay 10 / 20 / 40 / 80
Mono Stereo Mono Stereo Mono Stereo ...
x
Fast 16k
Fast 24k
Fast 48k
Robust 16k
Robust 24k
Robust 48k
For each combination you have to run the latency test. If the test fails print fail. Else print the ms value.
Optional: Also run the build up test for 20 secs. As a result just print if there is a buildup or not.
Optional: Also run the quality test for 3 min per combination and display the err/min.
The result shall be saved as a yaml (like in all the other scripts).
Important to save the API call as well.
And create an image with the table.
There should be a feature to compare this measurement to a 'baseline' measurement.
Failed tests should be colored red.
Tests significantly worse than the baseline in orange.
And better values in green.
No change should be just white.

View File

@@ -31,13 +31,38 @@ artifact_detection:
threshold_db: -60 # Detect unexpected frequencies above noise floor + this threshold (more negative = less sensitive) threshold_db: -60 # Detect unexpected frequencies above noise floor + this threshold (more negative = less sensitive)
amplitude_spikes: amplitude_spikes:
enabled: true enabled: true
threshold_factor: 5.0 # MAD-based outlier detection on envelope (detects clicks, pops, dropouts). Lower = more sensitive. threshold_factor: 10.0 # MAD-based outlier detection on envelope (detects clicks, pops, dropouts). Lower = more sensitive.
zero_crossing: zero_crossing:
enabled: false enabled: false
threshold_factor: 2.0 # Number of standard deviations for zero-crossing anomalies (detects distortion) threshold_factor: 2.0 # Number of standard deviations for zero-crossing anomalies (detects distortion)
energy_variation: energy_variation:
enabled: true enabled: true
threshold_db: 6.0 # Energy change threshold in dB between consecutive windows (detects level changes) threshold_db: 6.0 # Energy change threshold in dB between consecutive windows (detects level changes)
null_test:
enabled: true
# Align Ch2 (DUT) to Ch1 (Loopback), subtract, detect bursts in residual
max_lag_ms: 500.0 # Maximum expected delay between channels for alignment search
window_ms: 5.0 # Short-time RMS window length for burst detection
threshold_factor: 6.0 # Flag windows where residual RMS exceeds baseline × this factor
min_burst_ms: 0.5 # Minimum burst duration to report (filters out single-sample spikes)
sample_slip_detection: true
sample_slip_window_ms: 50.0 # Correlation window for xcorr-based lag tracking (fallback only)
# Sync marker: one chirp burst at the start and one at the end of the played signal.
# Marker-based alignment is immune to periodic-signal ambiguity (e.g. pure sine).
# Sample slip detection compares the lag at the start marker vs. the end marker.
marker_duration_sec: 0.05 # Length of each chirp marker burst
marker_first_offset_sec: 0.5 # Offset from signal start (and from signal end) for markers
marker_f0: 200.0 # Marker chirp start frequency (Hz)
marker_f1: 16000.0 # Marker chirp end frequency (Hz) — wider BW = sharper correlation peak
marker_amplitude: 0.7 # Marker amplitude (mixed on top of test tone)
# Sample slip threshold: only report if inter-marker lag deviates from median by >= this many samples.
# Rule of thumb: peak timing precision ≈ 1 / (marker_f1 - marker_f0) * sample_rate
# With 200-16000 Hz BW at 44100 Hz: precision ≈ 3 samples → min_slip_samples = 5 gives good margin.
min_slip_samples: 5
latency:
max_std_dev_ms: 1.0 # Maximum allowed std deviation; test fails if exceeded
min_avg_ms: 1.0 # Minimum expected average latency; near-zero indicates bad loopback
latency_buildup: latency_buildup:
measurement_interval: 10 # seconds between latency measurements measurement_interval: 10 # seconds between latency measurements

92
plot_alsa_status.py Normal file
View File

@@ -0,0 +1,92 @@
#!/usr/bin/env python3
"""Parse ALSA status log file and plot avail value over time."""
import sys
import re
import os
from datetime import datetime
import matplotlib.pyplot as plt
TIMESTAMP_RE = re.compile(r"^===== (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+) =====")
AVAIL_RE = re.compile(r"^avail\s*:\s*(\d+)")
def parse_log(log_path):
timestamps = []
avail_values = []
with open(log_path, "r") as f:
current_timestamp = None
for line in f:
line = line.strip()
# Check for timestamp line
ts_match = TIMESTAMP_RE.match(line)
if ts_match:
current_timestamp = datetime.strptime(ts_match.group(1), "%Y-%m-%d %H:%M:%S.%f")
continue
# Check for avail line (only if we have a timestamp)
if current_timestamp:
avail_match = AVAIL_RE.match(line)
if avail_match:
timestamps.append(current_timestamp)
avail_values.append(int(avail_match.group(1)))
current_timestamp = None # Reset until next timestamp
if not timestamps:
print("No valid timestamp/avail pairs found in the log file.", file=sys.stderr)
sys.exit(1)
# Convert to relative seconds from first timestamp
t0 = timestamps[0]
seconds = [(t - t0).total_seconds() for t in timestamps]
return seconds, avail_values
def plot(seconds, avail_values, out_path):
plt.figure(figsize=(12, 6))
plt.plot(seconds, avail_values, label="avail", linewidth=1, alpha=0.7)
# Add moving average (windowed mean)
if len(avail_values) >= 10: # Only if we have enough data points
window_size = min(50, len(avail_values) // 10) # Adaptive window size
import numpy as np
moving_avg = np.convolve(avail_values, np.ones(window_size)/window_size, mode='valid')
# Adjust timestamps for the moving average (they align with window centers)
ma_seconds = seconds[window_size-1:]
plt.plot(ma_seconds, moving_avg, label=f"moving mean (window={window_size})", linewidth=2)
plt.xlabel("Time (s)")
plt.ylabel("Available samples")
plt.title("ALSA Available Samples Over Time")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig(out_path, dpi=150)
print(f"Plot saved to {out_path}")
def main():
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} <path_to_alsa_status_log>", file=sys.stderr)
sys.exit(1)
log_path = sys.argv[1]
if not os.path.isfile(log_path):
print(f"File not found: {log_path}", file=sys.stderr)
sys.exit(1)
seconds, avail_values = parse_log(log_path)
log_dir = os.path.dirname(os.path.abspath(log_path))
log_base = os.path.splitext(os.path.basename(log_path))[0]
out_path = os.path.join(log_dir, f"{log_base}_avail_plot.png")
plot(seconds, avail_values, out_path)
if __name__ == "__main__":
main()

302
plot_combined.py Normal file
View File

@@ -0,0 +1,302 @@
#!/usr/bin/env python3
"""Combine ALSA avail, perf metrics, and latency plots into one figure."""
import sys
import re
import os
from datetime import datetime
import matplotlib.pyplot as plt
import numpy as np
# Regex patterns
TIMESTAMP_RE = re.compile(r"^===== (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+) =====")
AVAIL_RE = re.compile(r"^avail\s*:\s*(\d+)")
PERF_RE = re.compile(
r"^(\w+ \d+ \d+:\d+:\d+) .* Perf\(.*?\):"
r".*?sample mean=([\d.]+)ms"
r".*?write mean=([\d.]+)ms"
r".*?loop mean=([\d.]+)ms"
)
LATENCY_RE = re.compile(r"^(\w+ \d+ \d+:\d+:\d+).*latency.*?(\d+(?:\.\d+)?)ms")
PYALSA_AVAIL_BEFORE_RE = re.compile(r"^(\w+ \d+ \d+:\d+:\d+).*PyALSA: avail before read: (\d+)")
PYALSA_AVAIL_AFTER_RE = re.compile(r"^(\w+ \d+ \d+:\d+:\d+).*PyALSA: .* avail=(\d+)")
def parse_alsa_status(log_path):
timestamps = []
avail_values = []
with open(log_path, "r") as f:
current_timestamp = None
for line in f:
line = line.strip()
ts_match = TIMESTAMP_RE.match(line)
if ts_match:
current_timestamp = datetime.strptime(ts_match.group(1), "%Y-%m-%d %H:%M:%S.%f")
continue
if current_timestamp:
avail_match = AVAIL_RE.match(line)
if avail_match:
timestamps.append(current_timestamp)
avail_values.append(int(avail_match.group(1)))
current_timestamp = None
if not timestamps:
return [], []
t0 = timestamps[0]
seconds = [(t - t0).total_seconds() for t in timestamps]
return seconds, avail_values
def parse_perf_log(log_path):
timestamps = []
sample_means = []
write_means = []
loop_means = []
with open(log_path, "r") as f:
for line in f:
m = PERF_RE.search(line)
if m:
ts_str, sample, write, loop = m.groups()
ts = datetime.strptime(ts_str, "%b %d %H:%M:%S")
timestamps.append(ts)
sample_means.append(float(sample))
write_means.append(float(write))
loop_means.append(float(loop))
if not timestamps:
return [], [], [], []
t0 = timestamps[0]
seconds = [(t - t0).total_seconds() for t in timestamps]
return seconds, sample_means, write_means, loop_means
def parse_pyalsa_avail(perf_file):
"""Parse PyALSA avail before/after read from the perf log file."""
before_timestamps = []
before_values = []
after_timestamps = []
after_values = []
with open(perf_file, "r") as f:
for line in f:
line = line.strip()
# Check for "avail before read"
before_match = PYALSA_AVAIL_BEFORE_RE.match(line)
if before_match:
ts_str, avail = before_match.groups()
current_year = datetime.now().year
ts_with_year = f"{current_year} {ts_str}"
ts = datetime.strptime(ts_with_year, "%Y %b %d %H:%M:%S")
before_timestamps.append(ts)
before_values.append(int(avail))
continue
# Check for "avail=" (after read)
after_match = PYALSA_AVAIL_AFTER_RE.match(line)
if after_match:
ts_str, avail = after_match.groups()
current_year = datetime.now().year
ts_with_year = f"{current_year} {ts_str}"
ts = datetime.strptime(ts_with_year, "%Y %b %d %H:%M:%S")
after_timestamps.append(ts)
after_values.append(int(avail))
return before_timestamps, before_values, after_timestamps, after_values
def parse_latency_yaml(yaml_path):
import yaml
with open(yaml_path, 'r') as f:
data = yaml.safe_load(f)
latency_measurements = data.get('latency_buildup_result', {}).get('latency_measurements', [])
timestamps = []
latencies = []
for measurement in latency_measurements:
ts_str = measurement['timestamp']
latency = measurement['latency_ms']
# Parse ISO format timestamp
ts = datetime.fromisoformat(ts_str)
timestamps.append(ts)
latencies.append(float(latency))
if not timestamps:
return [], []
t0 = timestamps[0]
seconds = [(t - t0).total_seconds() for t in timestamps]
return seconds, latencies
def plot_combined(alsa_file, perf_file, latency_file, out_path):
# Parse all logs
alsa_seconds, avail_values = parse_alsa_status(alsa_file)
perf_seconds, sample_means, write_means, loop_means = parse_perf_log(perf_file)
latency_seconds, latencies = parse_latency_yaml(latency_file)
# Parse PyALSA avail data
before_timestamps, before_values, after_timestamps, after_values = parse_pyalsa_avail(perf_file)
# Get absolute timestamps for proper alignment
alsa_timestamps = []
perf_timestamps = []
latency_timestamps = []
# Re-parse to get absolute timestamps for alignment
with open(alsa_file, "r") as f:
current_timestamp = None
for line in f:
line = line.strip()
ts_match = TIMESTAMP_RE.match(line)
if ts_match:
current_timestamp = datetime.strptime(ts_match.group(1), "%Y-%m-%d %H:%M:%S.%f")
continue
if current_timestamp:
avail_match = AVAIL_RE.match(line)
if avail_match:
alsa_timestamps.append(current_timestamp)
current_timestamp = None
with open(perf_file, "r") as f:
for line in f:
m = PERF_RE.search(line)
if m:
ts_str = m.group(1)
# Add current year to the timestamp since it doesn't include year
current_year = datetime.now().year
ts_with_year = f"{current_year} {ts_str}"
ts = datetime.strptime(ts_with_year, "%Y %b %d %H:%M:%S")
perf_timestamps.append(ts)
import yaml
with open(latency_file, 'r') as f:
data = yaml.safe_load(f)
latency_measurements = data.get('latency_buildup_result', {}).get('latency_measurements', [])
for measurement in latency_measurements:
ts_str = measurement['timestamp']
ts = datetime.fromisoformat(ts_str)
latency_timestamps.append(ts)
# Find earliest timestamp
all_abs_timestamps = []
if alsa_timestamps:
all_abs_timestamps.extend(alsa_timestamps)
if perf_timestamps:
all_abs_timestamps.extend(perf_timestamps)
if latency_timestamps:
all_abs_timestamps.extend(latency_timestamps)
if before_timestamps:
all_abs_timestamps.extend(before_timestamps)
if after_timestamps:
all_abs_timestamps.extend(after_timestamps)
t0_absolute = min(all_abs_timestamps)
# Convert all times to seconds from earliest timestamp
alsa_aligned = [(ts - t0_absolute).total_seconds() for ts in alsa_timestamps] if alsa_timestamps else []
perf_aligned = [(ts - t0_absolute).total_seconds() for ts in perf_timestamps] if perf_timestamps else []
latency_aligned = [(ts - t0_absolute).total_seconds() for ts in latency_timestamps] if latency_timestamps else []
before_aligned = [(ts - t0_absolute).total_seconds() for ts in before_timestamps] if before_timestamps else []
after_aligned = [(ts - t0_absolute).total_seconds() for ts in after_timestamps] if after_timestamps else []
# Create figure with 4 subplots sharing x-axis
fig, (ax1, ax2, ax3, ax4) = plt.subplots(4, 1, figsize=(14, 12), sharex=True)
fig.suptitle("Combined Audio Performance Metrics", fontsize=16)
# Plot 1: ALSA avail
if alsa_aligned and avail_values:
ax1.plot(alsa_aligned, avail_values, label="avail", linewidth=1, alpha=0.7, color='blue')
if len(avail_values) >= 10:
window_size = min(50, len(avail_values) // 10)
moving_avg = np.convolve(avail_values, np.ones(window_size)/window_size, mode='valid')
ma_seconds = alsa_aligned[window_size-1:]
ax1.plot(ma_seconds, moving_avg, label=f"moving mean (window={window_size})",
linewidth=2, color='darkblue')
ax1.set_ylabel("Available samples")
ax1.set_title("ALSA Available Samples")
ax1.legend()
ax1.grid(True, alpha=0.3)
# Plot 2: Perf metrics
if perf_aligned:
ax2.plot(perf_aligned, sample_means, label="sample mean", linewidth=1, alpha=0.8, color='green')
ax2.plot(perf_aligned, write_means, label="write mean", linewidth=1, alpha=0.8, color='orange')
ax2.plot(perf_aligned, loop_means, label="loop mean", linewidth=1, alpha=0.8, color='red')
# Add moving average for loop mean
if len(loop_means) >= 10:
window_size = min(50, len(loop_means) // 10)
moving_avg = np.convolve(loop_means, np.ones(window_size)/window_size, mode='valid')
ma_seconds = perf_aligned[window_size-1:]
ax2.plot(ma_seconds, moving_avg, label=f"loop mean moving avg (window={window_size})",
linewidth=2, color='darkred', alpha=0.9)
ax2.set_ylabel("Duration (ms)")
ax2.set_title("Performance Metrics")
ax2.legend()
ax2.grid(True, alpha=0.3)
# Plot 3: Latency
if latency_aligned:
ax3.plot(latency_aligned, latencies, label="latency", linewidth=1, color='purple')
ax3.set_ylabel("Latency (ms)")
ax3.set_title("Latency Buildup")
ax3.legend()
ax3.grid(True, alpha=0.3)
# Plot 4: PyALSA avail before/after read
if before_aligned and before_values:
ax4.plot(before_aligned, before_values, label="avail before read", linewidth=1, alpha=0.7, color='cyan')
if after_aligned and after_values:
ax4.plot(after_aligned, after_values, label="avail after read", linewidth=1, alpha=0.7, color='magenta')
ax4.set_xlabel("Time (s)")
ax4.set_ylabel("Available samples")
ax4.set_title("PyALSA Available Samples (Before/After Read)")
if before_aligned or after_aligned:
ax4.legend()
ax4.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(out_path, dpi=150, bbox_inches='tight')
print(f"Combined plot saved to {out_path}")
# Show interactive plot
plt.show()
def main():
if len(sys.argv) != 4:
print(f"Usage: {sys.argv[0]} <alsa_status.log> <perf_log.log> <latency_results.yaml>", file=sys.stderr)
sys.exit(1)
alsa_file = sys.argv[1]
perf_file = sys.argv[2]
latency_file = sys.argv[3]
for file_path in [alsa_file, perf_file, latency_file]:
if not os.path.isfile(file_path):
print(f"File not found: {file_path}", file=sys.stderr)
sys.exit(1)
# Determine output path (same directory as first file)
log_dir = os.path.dirname(os.path.abspath(alsa_file))
out_path = os.path.join(log_dir, "combined_audio_plot.png")
plot_combined(alsa_file, perf_file, latency_file, out_path)
if __name__ == "__main__":
main()

437
plot_matrix.py Normal file
View File

@@ -0,0 +1,437 @@
#!/usr/bin/env python3
"""
Plot a results table image from a matrix test YAML file.
Usage:
python plot_matrix.py <results.yaml>
python plot_matrix.py <results.yaml> --baseline <baseline.yaml>
python plot_matrix.py <results.yaml> --baseline <baseline.yaml> --output table.png
"""
import argparse
import sys
from typing import Optional
import yaml
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from pathlib import Path
from datetime import datetime
# ---------------------------------------------------------------------------
# Matrix layout constants
# ---------------------------------------------------------------------------
QOS_RATES = [
('fast', '16k'),
('fast', '24k'),
('fast', '48k'),
('robust', '16k'),
('robust', '24k'),
('robust', '48k'),
]
CHANNELS = ['mono', 'stereo']
PRESENTATION_DELAYS_MS = [10, 20, 40, 80]
# ---------------------------------------------------------------------------
# Colour helpers
# ---------------------------------------------------------------------------
COLOR_FAIL = '#FF4444' # red
COLOR_WORSE = '#FFA500' # orange
COLOR_BETTER = '#66BB6A' # green
COLOR_NEUTRAL = '#FFFFFF' # white
COLOR_MISSING = '#DDDDDD' # light grey not run / no data
COLOR_HEADER = '#263238' # dark blue-grey header
COLOR_SUBHDR = '#455A64' # secondary header
COLOR_ROW_EVEN = '#FAFAFA'
COLOR_ROW_ODD = '#F0F4F8'
COLOR_HEADER_TEXT = '#FFFFFF'
def _latency_ok(lat: Optional[dict]) -> bool:
if lat is None:
return False
if lat.get('error'):
return False
if lat.get('valid') is False:
return False
return lat.get('avg') is not None
def _cell_color(result: dict, baseline_result: Optional[dict],
worse_threshold_pct: float = 10.0,
better_threshold_pct: float = 5.0) -> str:
"""Return a hex colour for the cell."""
lat = result.get('latency')
if not _latency_ok(lat):
return COLOR_FAIL
if baseline_result is None:
return COLOR_NEUTRAL
base_lat = baseline_result.get('latency')
if not _latency_ok(base_lat):
return COLOR_NEUTRAL
current_avg = lat['avg']
base_avg = base_lat['avg']
if base_avg == 0:
return COLOR_NEUTRAL
diff_pct = (current_avg - base_avg) / base_avg * 100.0
if diff_pct > worse_threshold_pct:
return COLOR_WORSE
if diff_pct < -better_threshold_pct:
return COLOR_BETTER
return COLOR_NEUTRAL
def _cell_text(result: dict, show_buildup: bool, show_quality: bool) -> list:
"""Return list of text lines for a cell."""
lat = result.get('latency')
lines = []
if not _latency_ok(lat):
err = lat.get('error', 'FAIL') if lat else 'NO DATA'
short = err[:20] if len(err) > 20 else err
lines.append('FAIL')
if short and short != 'FAIL':
lines.append(short)
return lines
lines.append(f"{lat['avg']:.1f} ms")
if show_buildup:
bd = result.get('buildup')
if bd is not None:
detected = bd.get('buildup_detected')
if detected is True:
lines.append('buildup: YES')
elif detected is False:
lines.append('buildup: no')
else:
lines.append('buildup: n/a')
if show_quality:
q = result.get('quality')
if q is not None:
apm = q.get('artifacts_per_min')
if apm is not None:
lines.append(f"{apm:.1f} art/min")
else:
lines.append('quality: err')
return lines
# ---------------------------------------------------------------------------
# Core table builder
# ---------------------------------------------------------------------------
def build_table(
matrix_results: dict,
baseline_results: Optional[dict],
metadata: dict,
baseline_metadata: Optional[dict],
show_buildup: bool,
show_quality: bool,
worse_threshold_pct: float = 10.0,
better_threshold_pct: float = 5.0,
) -> plt.Figure:
"""
Build and return a matplotlib Figure containing the results table.
"""
n_rows = len(QOS_RATES) # 6
n_pd = len(PRESENTATION_DELAYS_MS) # 4
n_ch = len(CHANNELS) # 2
n_cols = n_pd * n_ch # 8
# Determine cell height based on content rows per cell
lines_per_cell = 1
if show_buildup:
lines_per_cell += 1
if show_quality:
lines_per_cell += 1
cell_h = 0.5 + 0.22 * lines_per_cell # inches
cell_w = 1.45 # inches
row_label_w = 1.4 # inches for row labels
hdr_h = 0.55 # top presentation-delay header row
sub_h = 0.38 # mono/stereo sub-header row
total_w = row_label_w + n_cols * cell_w + 0.3
total_h = hdr_h + sub_h + n_rows * cell_h + 1.6 # extra for title & legend
fig, ax = plt.subplots(figsize=(total_w, total_h))
ax.set_xlim(0, total_w)
ax.set_ylim(0, total_h)
ax.axis('off')
# coordinate helpers (y grows upward in matplotlib, so we flip)
def x_col(col_idx: int) -> float:
return row_label_w + col_idx * cell_w
def y_row(row_idx: int) -> float:
# row 0 = topmost data row
return total_h - 1.4 - hdr_h - sub_h - (row_idx + 1) * cell_h
def add_rect(x, y, w, h, facecolor, edgecolor='#90A4AE', lw=0.6, zorder=1):
rect = mpatches.FancyBboxPatch(
(x, y), w, h,
boxstyle='square,pad=0',
facecolor=facecolor, edgecolor=edgecolor, linewidth=lw, zorder=zorder)
ax.add_patch(rect)
def add_text(x, y, text, fontsize=8, color='black', ha='center', va='center',
bold=False, wrap_lines=None):
weight = 'bold' if bold else 'normal'
if wrap_lines:
for i, line in enumerate(wrap_lines):
offset = (len(wrap_lines) - 1) / 2.0 - i
ax.text(x, y + offset * (fontsize * 0.014),
line, fontsize=fontsize, color=color,
ha=ha, va='center', fontweight=weight,
clip_on=True)
else:
ax.text(x, y, text, fontsize=fontsize, color=color,
ha=ha, va='center', fontweight=weight, clip_on=True)
# -----------------------------------------------------------------------
# Title
# -----------------------------------------------------------------------
ts = metadata.get('timestamp', '')
try:
ts_fmt = datetime.fromisoformat(ts).strftime('%Y-%m-%d %H:%M')
except Exception:
ts_fmt = ts
title_lines = [
f"Matrix Test Results — {metadata.get('test_id', '')}",
f"SN: {metadata.get('serial_number', 'n/a')} SW: {metadata.get('software_version', 'n/a')} {ts_fmt}",
]
if metadata.get('comment'):
title_lines.append(f"Comment: {metadata['comment']}")
if baseline_metadata:
title_lines.append(
f"Baseline: {baseline_metadata.get('test_id', 'n/a')} "
f"({baseline_metadata.get('timestamp', '')[:10]})"
)
title_y = total_h - 0.25
for i, line in enumerate(title_lines):
ax.text(total_w / 2, title_y - i * 0.28, line,
fontsize=9 if i == 0 else 7.5,
fontweight='bold' if i == 0 else 'normal',
ha='center', va='top', color='#1A237E')
# -----------------------------------------------------------------------
# Row label column header (top-left corner block)
# -----------------------------------------------------------------------
hdr_top = total_h - 1.4
# Spans presentation-delay header + mono/stereo sub-header
add_rect(0, hdr_top - hdr_h - sub_h, row_label_w, hdr_h + sub_h,
facecolor=COLOR_HEADER)
add_text(row_label_w / 2, hdr_top - (hdr_h + sub_h) / 2,
'QoS / Rate', fontsize=8, color=COLOR_HEADER_TEXT, bold=True)
# -----------------------------------------------------------------------
# Presentation-delay group headers
# -----------------------------------------------------------------------
for pd_idx, pd_ms in enumerate(PRESENTATION_DELAYS_MS):
col_start = pd_idx * n_ch
x = x_col(col_start)
w = cell_w * n_ch
add_rect(x, hdr_top - hdr_h, w, hdr_h, facecolor=COLOR_HEADER)
add_text(x + w / 2, hdr_top - hdr_h / 2,
f'PD {pd_ms} ms', fontsize=8.5, color=COLOR_HEADER_TEXT, bold=True)
# -----------------------------------------------------------------------
# Mono / Stereo sub-headers
# -----------------------------------------------------------------------
sub_top = hdr_top - hdr_h
for col in range(n_cols):
ch = CHANNELS[col % n_ch]
x = x_col(col)
add_rect(x, sub_top - sub_h, cell_w, sub_h, facecolor=COLOR_SUBHDR)
add_text(x + cell_w / 2, sub_top - sub_h / 2,
ch.capitalize(), fontsize=7.5, color=COLOR_HEADER_TEXT, bold=True)
# -----------------------------------------------------------------------
# Data rows
# -----------------------------------------------------------------------
for row_idx, (qos, rate) in enumerate(QOS_RATES):
row_bg = COLOR_ROW_EVEN if row_idx % 2 == 0 else COLOR_ROW_ODD
# Row label
y = y_row(row_idx)
add_rect(0, y, row_label_w, cell_h, facecolor=COLOR_SUBHDR if row_idx < 3 else '#37474F')
label = f"{'Fast' if qos == 'fast' else 'Robust'} {rate}"
add_text(row_label_w / 2, y + cell_h / 2,
label, fontsize=8, color=COLOR_HEADER_TEXT, bold=True)
for col_idx, (pd_ms, ch) in enumerate(
[(pd, ch)
for pd in PRESENTATION_DELAYS_MS
for ch in CHANNELS]):
key = f"{qos}_{rate}_{ch}_{pd_ms}ms"
result = matrix_results.get(key)
baseline_result = baseline_results.get(key) if baseline_results else None
x = x_col(col_idx)
if result is None:
add_rect(x, y, cell_w, cell_h, facecolor=COLOR_MISSING)
add_text(x + cell_w / 2, y + cell_h / 2, '', fontsize=8)
continue
color = _cell_color(result, baseline_result,
worse_threshold_pct, better_threshold_pct)
add_rect(x, y, cell_w, cell_h, facecolor=color)
lines = _cell_text(result, show_buildup, show_quality)
# font size depends on how many lines
fs = 8.5 if len(lines) == 1 else 7.5
is_fail = color == COLOR_FAIL
txt_color = '#FFFFFF' if is_fail else '#1A1A2E'
# centre vertically
n = len(lines)
line_gap = cell_h / (n + 1)
for li, line in enumerate(lines):
line_y = y + cell_h - line_gap * (li + 1)
bold_line = li == 0 # first line (latency) is bold
ax.text(x + cell_w / 2, line_y, line,
fontsize=fs if li == 0 else fs - 0.5,
color=txt_color,
ha='center', va='center',
fontweight='bold' if bold_line else 'normal',
clip_on=True)
# -----------------------------------------------------------------------
# Outer border for the full table
# -----------------------------------------------------------------------
table_x = 0
table_y = y_row(n_rows - 1)
table_w = row_label_w + n_cols * cell_w
table_h_total = hdr_top - table_y
rect = mpatches.Rectangle((table_x, table_y), table_w, table_h_total,
fill=False, edgecolor='#37474F', linewidth=1.5)
ax.add_patch(rect)
# -----------------------------------------------------------------------
# Legend
# -----------------------------------------------------------------------
legend_y = y_row(n_rows - 1) - 0.55
legend_items = [
(COLOR_FAIL, 'FAIL / error'),
(COLOR_WORSE, f'>{worse_threshold_pct:.0f}% worse than baseline'),
(COLOR_NEUTRAL, 'Within threshold'),
(COLOR_BETTER, f'>{better_threshold_pct:.0f}% better than baseline'),
(COLOR_MISSING, 'Not measured'),
]
lx = 0.2
for color, label in legend_items:
add_rect(lx, legend_y - 0.18, 0.28, 0.25, facecolor=color,
edgecolor='#90A4AE', lw=0.8)
ax.text(lx + 0.35, legend_y - 0.055, label, fontsize=7, va='center')
lx += 2.2
plt.tight_layout(pad=0.1)
return fig
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def load_matrix_results(path: Path) -> tuple:
"""Load a matrix results YAML and return (matrix_results, metadata)."""
with open(path, 'r') as f:
data = yaml.safe_load(f)
return data.get('matrix_results', {}), data.get('metadata', {})
def main():
parser = argparse.ArgumentParser(
description='Plot matrix test results as a table image')
parser.add_argument('results', help='Path to matrix results YAML file')
parser.add_argument('--baseline', default=None,
help='Path to baseline matrix results YAML for comparison')
parser.add_argument('--output', default=None,
help='Output image path (default: <results_stem>_table.png)')
parser.add_argument('--worse-threshold', type=float, default=10.0,
help='Percent worse than baseline to colour orange (default: 10)')
parser.add_argument('--better-threshold', type=float, default=5.0,
help='Percent better than baseline to colour green (default: 5)')
parser.add_argument('--dpi', type=int, default=150,
help='Output image DPI (default: 150)')
args = parser.parse_args()
results_path = Path(args.results)
if not results_path.exists():
print(f"ERROR: Results file not found: {results_path}", file=sys.stderr)
sys.exit(1)
matrix_results, metadata = load_matrix_results(results_path)
baseline_results = None
baseline_metadata = None
if args.baseline:
baseline_path = Path(args.baseline)
if not baseline_path.exists():
print(f"ERROR: Baseline file not found: {baseline_path}", file=sys.stderr)
sys.exit(1)
baseline_results, baseline_metadata = load_matrix_results(baseline_path)
print(f"Comparing against baseline: {baseline_path.name}")
# Detect which optional columns are present
show_buildup = any(
r.get('buildup') is not None
for r in matrix_results.values()
)
show_quality = any(
r.get('quality') is not None
for r in matrix_results.values()
)
print(f"Results: {len(matrix_results)} combinations")
print(f"Show buildup column: {show_buildup}")
print(f"Show quality column: {show_quality}")
fig = build_table(
matrix_results=matrix_results,
baseline_results=baseline_results,
metadata=metadata,
baseline_metadata=baseline_metadata,
show_buildup=show_buildup,
show_quality=show_quality,
worse_threshold_pct=args.worse_threshold,
better_threshold_pct=args.better_threshold,
)
# Always save next to the results YAML
folder_copy = results_path.parent / f"{results_path.stem}_table.png"
fig.savefig(folder_copy, dpi=args.dpi, bbox_inches='tight',
facecolor='white', edgecolor='none')
print(f"Table saved to: {folder_copy}")
# If a custom --output path was given (and differs), save there too
if args.output:
output_path = Path(args.output)
if output_path.resolve() != folder_copy.resolve():
fig.savefig(output_path, dpi=args.dpi, bbox_inches='tight',
facecolor='white', edgecolor='none')
print(f"Table also saved to: {output_path}")
plt.close(fig)
if __name__ == '__main__':
main()

81
plot_perf_log.py Normal file
View File

@@ -0,0 +1,81 @@
#!/usr/bin/env python3
"""Parse Perf lines from a log file and plot sample mean, write mean, and loop mean over time."""
import sys
import re
import os
from datetime import datetime
import matplotlib.pyplot as plt
# Matches syslog-style Perf lines, e.g.
# "Apr 21 15:07:31 host app: Perf(x): sample mean=1.23ms ... write mean=0.45ms ... loop mean=9.87ms"
PERF_RE = re.compile(
    r"^(\w+ \d+ \d+:\d+:\d+) .* Perf\(.*?\):"
    r".*?sample mean=([\d.]+)ms"
    r".*?write mean=([\d.]+)ms"
    r".*?loop mean=([\d.]+)ms"
)


def parse_log(log_path):
    """Extract Perf metrics from a log file.

    Args:
        log_path: Path to a syslog-style log file containing Perf lines.

    Returns:
        Tuple of four parallel lists: (seconds since the first Perf line,
        sample means, write means, loop means); the three metric lists are
        in milliseconds.

    Exits the process with status 1 when no Perf lines are found.
    """
    timestamps = []
    sample_means = []
    write_means = []
    loop_means = []
    # errors="replace" keeps parsing alive if the log contains invalid bytes
    # (truncated writes, binary garbage), instead of raising UnicodeDecodeError.
    with open(log_path, "r", encoding="utf-8", errors="replace") as f:
        for line in f:
            m = PERF_RE.search(line)
            if not m:
                continue
            ts_str, sample, write, loop = m.groups()
            # NOTE: syslog timestamps carry no year (strptime defaults to 1900),
            # so time deltas spanning a Dec 31 -> Jan 1 boundary would be wrong.
            ts = datetime.strptime(ts_str, "%b %d %H:%M:%S")
            timestamps.append(ts)
            sample_means.append(float(sample))
            write_means.append(float(write))
            loop_means.append(float(loop))
    if not timestamps:
        print("No Perf lines found in the log file.", file=sys.stderr)
        sys.exit(1)
    t0 = timestamps[0]
    seconds = [(t - t0).total_seconds() for t in timestamps]
    return seconds, sample_means, write_means, loop_means
def plot(seconds, sample_means, write_means, loop_means, out_path):
    """Plot the three Perf series against elapsed time and save as a PNG."""
    series = (
        (sample_means, "sample mean (ms)"),
        (write_means, "write mean (ms)"),
        (loop_means, "loop mean (ms)"),
    )
    plt.figure(figsize=(12, 6))
    for values, label in series:
        plt.plot(seconds, values, label=label)
    plt.xlabel("Time (s)")
    plt.ylabel("Duration (ms)")
    plt.title("Perf Metrics Over Time")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    print(f"Plot saved to {out_path}")
def main():
    """CLI entry point: parse a log file and write the Perf plot beside it."""
    args = sys.argv[1:]
    if len(args) != 1:
        print(f"Usage: {sys.argv[0]} <path_to_log_file>", file=sys.stderr)
        sys.exit(1)
    log_path = args[0]
    if not os.path.isfile(log_path):
        print(f"File not found: {log_path}", file=sys.stderr)
        sys.exit(1)
    seconds, sample_means, write_means, loop_means = parse_log(log_path)
    # Save the plot next to the log, named "<logname>_perf_plot.png".
    base, _ = os.path.splitext(os.path.basename(log_path))
    out_path = os.path.join(
        os.path.dirname(os.path.abspath(log_path)),
        f"{base}_perf_plot.png",
    )
    plot(seconds, sample_means, write_means, loop_means, out_path)
if __name__ == "__main__":
main()

View File

@@ -3,3 +3,4 @@ scipy>=1.10.0
sounddevice>=0.4.6 sounddevice>=0.4.6
PyYAML>=6.0 PyYAML>=6.0
matplotlib>=3.7.0 matplotlib>=3.7.0
requests>=2.28.0

View File

@@ -1,33 +1,34 @@
import time
import numpy as np import numpy as np
import sounddevice as sd import sounddevice as sd
from scipy import signal from scipy import signal
from typing import Tuple, Dict, List from scipy.io import wavfile
from typing import Tuple, Dict, List, Optional
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
from pathlib import Path from pathlib import Path
def find_audio_device(device_name: str = "Scarlett") -> tuple: def find_audio_device(device_name: str = "Scarlett") -> tuple:
devices = sd.query_devices() devices = sd.query_devices()
for idx, device in enumerate(devices): for idx, device in enumerate(devices):
if device_name.lower() in device['name'].lower(): if device_name.lower() in device['name'].lower():
if device['max_input_channels'] >= 2 and device['max_output_channels'] >= 2: if device['max_input_channels'] >= 2 and device['max_output_channels'] >= 2:
return (idx, idx) return (idx, idx)
default_device = sd.default.device default_device = sd.default.device
if hasattr(default_device, '__getitem__'): if hasattr(default_device, '__getitem__'):
input_dev = int(default_device[0]) if default_device[0] is not None else 0 input_dev = int(default_device[0]) if default_device[0] is not None else 0
output_dev = int(default_device[1]) if default_device[1] is not None else 0 output_dev = int(default_device[1]) if default_device[1] is not None else 0
else: else:
input_dev = output_dev = int(default_device) if default_device is not None else 0 input_dev = output_dev = int(default_device) if default_device is not None else 0
input_info = devices[input_dev] input_info = devices[input_dev]
output_info = devices[output_dev] output_info = devices[output_dev]
if input_info['max_input_channels'] >= 2 and output_info['max_output_channels'] >= 2: if input_info['max_input_channels'] >= 2 and output_info['max_output_channels'] >= 2:
print(f"Using default device - Input: {input_info['name']}, Output: {output_info['name']}") print(f"Using default device - Input: {input_info['name']}, Output: {output_info['name']}")
return (input_dev, output_dev) return (input_dev, output_dev)
raise RuntimeError(f"No suitable audio device found with 2+ input/output channels") raise RuntimeError(f"No suitable audio device found with 2+ input/output channels")
@@ -45,11 +46,18 @@ def generate_chirp(duration: float, sample_rate: int, f0: float = 100, f1: float
def play_and_record(tone: np.ndarray, sample_rate: int, device_id: tuple, channels: int = 2) -> np.ndarray: def play_and_record(tone: np.ndarray, sample_rate: int, device_id: tuple, channels: int = 2) -> np.ndarray:
output_signal = np.column_stack([tone, tone]) output_signal = np.column_stack([tone, tone])
input_dev, output_dev = device_id input_dev, output_dev = device_id
recording = sd.playrec(output_signal, samplerate=sample_rate,
channels=channels, device=(input_dev, output_dev), blocking=True) sd.stop()
recording = sd.playrec(output_signal, samplerate=sample_rate,
channels=channels, device=(input_dev, output_dev),
latency='high', blocking=True)
sd.stop()
if not np.isfinite(recording).all():
raise RuntimeError("Recording contains NaN/Inf — ALSA stream corrupted. "
"Try replugging the audio interface.")
return recording return recording
@@ -213,9 +221,15 @@ def run_latency_test(config: Dict, num_measurements: int = 5, save_plots: bool =
channels = config['audio']['channels'] channels = config['audio']['channels']
device_ids = find_audio_device(device_name) device_ids = find_audio_device(device_name)
chirp_signal = generate_chirp(duration, sample_rate, amplitude=amplitude) chirp_signal = generate_chirp(duration, sample_rate, amplitude=amplitude)
# Discard one warm-up recording to flush stale ALSA ring buffer data
try:
play_and_record(chirp_signal, sample_rate, device_ids, channels)
except Exception:
pass
latencies = [] latencies = []
last_recording = None last_recording = None
last_correlation = None last_correlation = None
@@ -235,11 +249,22 @@ def run_latency_test(config: Dict, num_measurements: int = 5, save_plots: bool =
last_correlation = correlation last_correlation = correlation
last_lags = lags last_lags = lags
avg = float(np.mean(latencies))
std_dev = float(np.std(latencies))
latency_cfg = config.get('latency', {})
max_std_dev_ms = latency_cfg.get('max_std_dev_ms', None)
min_avg_ms = latency_cfg.get('min_avg_ms', None)
valid = True
if max_std_dev_ms is not None and std_dev > max_std_dev_ms:
valid = False
if min_avg_ms is not None and avg < min_avg_ms:
valid = False
latency_stats = { latency_stats = {
'avg': float(np.mean(latencies)), 'avg': avg,
'min': float(np.min(latencies)), 'min': float(np.min(latencies)),
'max': float(np.max(latencies)), 'max': float(np.max(latencies)),
'std': float(np.std(latencies)) 'std': std_dev,
'valid': valid
} }
if save_plots and output_dir and last_recording is not None: if save_plots and output_dir and last_recording is not None:
@@ -509,6 +534,312 @@ def detect_artifacts_energy_variation(signal_data: np.ndarray, sample_rate: int,
return artifacts return artifacts
def generate_sync_marker(sample_rate: int, duration_sec: float = 0.05,
                         f0: float = 200.0, f1: float = 4000.0,
                         amplitude: float = 0.7) -> np.ndarray:
    """Return a short Hann-windowed linear chirp used as an alignment marker."""
    n_samples = int(sample_rate * duration_sec)
    t = np.linspace(0, duration_sec, n_samples, endpoint=False)
    sweep = signal.chirp(t, f0, duration_sec, f1, method='linear')
    # Hann window fades the sweep in/out so the marker has no clicks.
    return amplitude * np.hanning(n_samples) * sweep
def embed_markers(base_signal: np.ndarray, marker: np.ndarray,
                  sample_rate: int, interval_sec: float = 5.0,
                  first_offset_sec: float = 0.5) -> Tuple[np.ndarray, List[int]]:
    """Mix *marker* into a copy of *base_signal* every *interval_sec* seconds.

    Returns the mixed signal (clipped to [-1, 1]) and the list of sample
    indices at which the marker was embedded.
    """
    mixed = base_signal.copy()
    n_marker = len(marker)
    marker_sec = n_marker / sample_rate
    total_sec = len(base_signal) / sample_rate
    positions: List[int] = []
    offset = first_offset_sec
    # Keep at least 0.1 s of headroom after each marker before the signal ends.
    while offset + marker_sec + 0.1 <= total_sec:
        start = int(offset * sample_rate)
        stop = start + n_marker
        mixed[start:stop] = np.clip(mixed[start:stop] + marker, -1.0, 1.0)
        positions.append(start)
        offset += interval_sec
    return mixed, positions
def find_all_marker_positions(channel: np.ndarray, marker_template: np.ndarray,
                              expected_positions: List[int], sample_rate: int,
                              search_radius_ms: float = 500.0) -> List[Optional[int]]:
    """Locate each expected marker via a local cross-correlation peak.

    For every expected sample index, search within ±search_radius_ms for the
    strongest match of *marker_template*; entries are None when the search
    window is empty.
    """
    corr = np.abs(signal.correlate(channel, marker_template, mode='valid'))
    radius = int(search_radius_ms / 1000.0 * sample_rate)
    found: List[Optional[int]] = []
    for expected in expected_positions:
        window_start = max(0, expected - radius)
        window_stop = min(len(corr), expected + radius + 1)
        if window_start >= window_stop:
            found.append(None)
        else:
            peak = int(np.argmax(corr[window_start:window_stop]))
            found.append(window_start + peak)
    return found
def align_by_markers(reference: np.ndarray, dut: np.ndarray,
                     marker_template: np.ndarray, expected_positions: List[int],
                     sample_rate: int, max_lag_ms: float = 500.0) -> Tuple[np.ndarray, np.ndarray, int]:
    """Align two channels using embedded sync markers.

    The lag is the median offset between marker positions found in each
    channel; falls back to plain cross-correlation alignment when no marker
    pair could be located. Returns (ref_aligned, dut_aligned, lag_samples).
    """
    ref_found = find_all_marker_positions(reference, marker_template,
                                          expected_positions, sample_rate, max_lag_ms)
    dut_found = find_all_marker_positions(dut, marker_template,
                                          expected_positions, sample_rate, max_lag_ms)
    pair_lags = [d - r for r, d in zip(ref_found, dut_found)
                 if r is not None and d is not None]
    if not pair_lags:
        # No usable marker pair -- fall back to correlation-based alignment.
        return align_channels(reference, dut, sample_rate, max_lag_ms)
    lag_samples = int(round(float(np.median(pair_lags))))
    if lag_samples < 0:
        ref_aligned = reference[-lag_samples:]
        dut_aligned = dut[:len(dut) + lag_samples]
    else:
        ref_aligned = reference[:len(reference) - lag_samples]
        dut_aligned = dut[lag_samples:]
    common = min(len(ref_aligned), len(dut_aligned))
    return ref_aligned[:common], dut_aligned[:common], lag_samples
def detect_sample_slips_by_markers(reference: np.ndarray, dut: np.ndarray,
                                  marker_template: np.ndarray,
                                  expected_ref_positions: List[int],
                                  sample_rate: int, lag_samples: int,
                                  min_slip_samples: int = 3) -> List[Dict]:
    """Report lag jumps between markers as sample slips.

    Each marker is located in both channels; a per-marker lag deviating from
    the median lag by at least *min_slip_samples* (outside the first/last
    second of the recording) is reported as a 'sample_slip' artifact.
    """
    total_duration = len(reference) / sample_rate
    expected_dut_positions = [p + lag_samples for p in expected_ref_positions]
    ref_found = find_all_marker_positions(reference, marker_template,
                                          expected_ref_positions, sample_rate,
                                          search_radius_ms=200.0)
    dut_found = find_all_marker_positions(dut, marker_template,
                                          expected_dut_positions, sample_rate,
                                          search_radius_ms=200.0)
    per_marker_lag: List[Optional[int]] = [
        (d - r) if r is not None and d is not None else None
        for r, d in zip(ref_found, dut_found)
    ]
    observed = [lag for lag in per_marker_lag if lag is not None]
    baseline_lag = int(round(float(np.median(observed)))) if observed else 0
    artifacts: List[Dict] = []
    for idx, lag in enumerate(per_marker_lag):
        if lag is None:
            continue
        delta = int(lag) - baseline_lag
        if abs(delta) < min_slip_samples:
            continue
        time_sec = float(expected_ref_positions[idx] / sample_rate)
        # Ignore slips in the first/last second (edge effects).
        if time_sec < 1.0 or time_sec > total_duration - 1.0:
            continue
        artifacts.append({
            'type': 'sample_slip',
            'time_sec': round(time_sec, 4),
            'lag_change_samples': int(delta),
            'lag_baseline': baseline_lag,
            'lag_at_marker': int(lag),
        })
    return artifacts
def align_channels(reference: np.ndarray, dut: np.ndarray, sample_rate: int,
                   max_lag_ms: float = 500.0) -> Tuple[np.ndarray, np.ndarray, int]:
    """Align two channels by cross-correlating a middle segment.

    The lag (bounded by ±max_lag_ms) with the strongest correlation is used
    to trim both channels to their overlapping region. Returns
    (ref_aligned, dut_aligned, lag_samples).
    """
    max_lag_samples = int(max_lag_ms / 1000.0 * sample_rate)
    # Correlate on a representative middle segment (up to 4 s) for robustness.
    seg_len = min(len(reference), len(dut), int(sample_rate * 4))
    mid = min(len(reference), len(dut)) // 2
    half = seg_len // 2
    ref_seg = reference[mid - half: mid + half]
    dut_seg = dut[mid - half: mid + half]
    correlation = signal.correlate(dut_seg, ref_seg, mode='full')
    lags = np.arange(-(len(ref_seg) - 1), len(ref_seg))
    in_range = np.abs(lags) <= max_lag_samples
    lag_samples = int(lags[np.argmax(np.where(in_range, np.abs(correlation), 0))])
    if lag_samples < 0:
        ref_aligned = reference[-lag_samples:]
        dut_aligned = dut[:len(dut) + lag_samples]
    else:
        ref_aligned = reference[:len(reference) - lag_samples]
        dut_aligned = dut[lag_samples:]
    overlap = min(len(ref_aligned), len(dut_aligned))
    return ref_aligned[:overlap], dut_aligned[:overlap], lag_samples
def compute_residual(ref_aligned: np.ndarray, dut_aligned: np.ndarray) -> Tuple[np.ndarray, float]:
    """Return (dut - gain*ref, gain) where gain matches the channels' RMS.

    A (near-)silent reference yields a copy of the DUT channel with gain 1.0,
    avoiding division by zero.
    """
    ref_rms = float(np.sqrt(np.mean(ref_aligned ** 2)))
    if ref_rms < 1e-10:
        return dut_aligned.copy(), 1.0
    dut_rms = float(np.sqrt(np.mean(dut_aligned ** 2)))
    gain = dut_rms / ref_rms
    return dut_aligned - gain * ref_aligned, gain
def detect_glitches_short_time_energy(residual: np.ndarray, sample_rate: int,
                                      window_ms: float = 5.0,
                                      threshold_factor: float = 6.0,
                                      min_burst_ms: float = 0.5) -> List[Dict]:
    """Find bursts where the residual's short-time RMS exceeds the baseline.

    The baseline is the median of a sliding-window RMS envelope; contiguous
    regions above threshold_factor * baseline lasting at least min_burst_ms
    (and not within the first/last second) are reported as 'null_test_glitch'
    artifacts.
    """
    from scipy.ndimage import uniform_filter1d
    window_samples = max(4, int(window_ms / 1000.0 * sample_rate))
    min_burst_samples = max(1, int(min_burst_ms / 1000.0 * sample_rate))
    total_duration = len(residual) / sample_rate
    smoothed_power = uniform_filter1d(residual ** 2, size=window_samples, mode='reflect')
    # max(…, 0) guards against tiny negative values from the smoothing filter.
    rms_envelope = np.sqrt(np.maximum(smoothed_power, 0.0))
    baseline_rms = np.median(rms_envelope)
    glitches: List[Dict] = []
    if baseline_rms < 1e-12:
        return glitches
    above = rms_envelope > threshold_factor * baseline_rms
    # Rising/falling edges of the boolean mask delimit each burst.
    edges = np.diff(above.astype(np.int8), prepend=0, append=0)
    for start, stop in zip(np.where(edges == 1)[0], np.where(edges == -1)[0]):
        length = int(stop - start)
        if length < min_burst_samples:
            continue
        time_sec = float(start / sample_rate)
        # Skip edge-of-recording bursts (first/last second).
        if time_sec < 1.0 or time_sec > total_duration - 1.0:
            continue
        peak_rms = float(np.max(rms_envelope[start:stop]))
        glitches.append({
            'type': 'null_test_glitch',
            'time_sec': round(time_sec, 4),
            'duration_ms': round(length / sample_rate * 1000.0, 2),
            'peak_residual_rms': round(peak_rms, 8),
            'baseline_rms': round(float(baseline_rms), 8),
            'deviation_factor': round(peak_rms / baseline_rms, 2)
        })
    return glitches
def detect_sample_slips(reference: np.ndarray, dut: np.ndarray, sample_rate: int,
                        window_ms: float = 50.0, step_ms: float = 100.0) -> List[Dict]:
    """Track the ref/dut lag over time and report abrupt lag changes.

    Cross-correlates short windows every *step_ms*; any lag change of one or
    more samples between consecutive windows (outside the first/last second)
    is reported as a 'sample_slip' artifact.
    """
    slips: List[Dict] = []
    window_samples = int(window_ms / 1000.0 * sample_rate)
    step_samples = int(step_ms / 1000.0 * sample_rate)
    # Limit the per-window lag search to ±10 ms (or a quarter window).
    max_search = min(window_samples // 4, int(0.010 * sample_rate))
    usable_len = min(len(reference), len(dut))
    total_duration = usable_len / sample_rate
    if usable_len < window_samples * 2:
        return slips
    window_lags: List[int] = []
    window_times: List[float] = []
    for start in range(0, usable_len - window_samples, step_samples):
        ref_win = reference[start: start + window_samples]
        dut_win = dut[start: start + window_samples]
        corr = signal.correlate(dut_win, ref_win, mode='full')
        lag_axis = np.arange(-(len(ref_win) - 1), len(ref_win))
        in_range = np.abs(lag_axis) <= max_search
        best = int(lag_axis[np.argmax(np.where(in_range, np.abs(corr), 0))])
        window_lags.append(best)
        window_times.append(float(start / sample_rate))
    if len(window_lags) < 3:
        return slips
    for prev, curr, t in zip(window_lags, window_lags[1:], window_times[1:]):
        delta = int(curr) - int(prev)
        if abs(delta) < 1:
            continue
        if t < 1.0 or t > total_duration - 1.0:
            continue
        slips.append({
            'type': 'sample_slip',
            'time_sec': round(t, 4),
            'lag_change_samples': int(delta),
            'lag_before': int(prev),
            'lag_after': int(curr)
        })
    return slips
def detect_artifacts_null_test(reference: np.ndarray, dut: np.ndarray,
                               sample_rate: int, null_test_config: Dict,
                               marker_template: Optional[np.ndarray] = None,
                               marker_positions: Optional[List[int]] = None) -> Dict:
    """Null test: align the DUT channel against the reference, subtract, and
    analyze the residual for glitches and sample slips.

    Parameters
    ----------
    reference : loopback channel used as the ground-truth signal.
    dut : device-under-test channel to compare against the reference.
    sample_rate : sampling rate in Hz of both channels.
    null_test_config : detector settings; keys read here: 'enabled',
        'max_lag_ms', 'window_ms', 'threshold_factor', 'min_burst_ms',
        'sample_slip_detection', 'min_slip_samples', 'sample_slip_window_ms'.
    marker_template : optional sync-marker waveform; when given together with
        marker_positions, alignment uses the markers instead of plain
        cross-correlation.
    marker_positions : expected sample indices of the markers in *reference*.

    Returns
    -------
    Dict with lag/gain/residual statistics, the detected artifacts, and
    underscore-prefixed raw arrays kept only for plotting.
    """
    result = {
        'enabled': null_test_config.get('enabled', True),
        'lag_samples': 0,
        'lag_ms': 0.0,
        'gain_factor': 1.0,
        'residual_rms': 0.0,
        'residual_peak': 0.0,
        'total_count': 0,
        'by_type': {},
        'artifacts': [],
        # Underscore-prefixed entries hold raw arrays for plotting only;
        # the caller strips them before serialization.
        '_ref_aligned': None,
        '_dut_aligned': None,
        '_residual': None,
    }
    if not result['enabled']:
        return result
    # Guard: skip if either channel is silent/corrupted (e.g. ALSA underrun)
    min_signal_level = 1e-4
    if np.max(np.abs(reference)) < min_signal_level or np.max(np.abs(dut)) < min_signal_level:
        result['error'] = 'recording_too_quiet_or_corrupted'
        print(" ⚠ Null test skipped: one or both channels are silent (possible ALSA underrun.")
        return result
    max_lag_ms = float(null_test_config.get('max_lag_ms', 500.0))
    # Prefer marker-based alignment when sync markers were embedded; fall back
    # to whole-signal cross-correlation otherwise.
    if marker_template is not None and marker_positions:
        ref_aligned, dut_aligned, lag_samples = align_by_markers(
            reference, dut, marker_template, marker_positions, sample_rate, max_lag_ms)
        alignment_method = 'marker'
    else:
        ref_aligned, dut_aligned, lag_samples = align_channels(
            reference, dut, sample_rate, max_lag_ms)
        alignment_method = 'xcorr'
    result['lag_samples'] = lag_samples
    result['lag_ms'] = round(float(lag_samples) / sample_rate * 1000.0, 3)
    result['alignment_method'] = alignment_method
    # Subtract the gain-matched reference; what remains is the DUT's deviation.
    residual, gain = compute_residual(ref_aligned, dut_aligned)
    result['gain_factor'] = round(gain, 6)
    result['residual_rms'] = round(float(np.sqrt(np.mean(residual ** 2))), 8)
    result['residual_peak'] = round(float(np.max(np.abs(residual))), 6)
    result['_ref_aligned'] = ref_aligned
    result['_dut_aligned'] = dut_aligned
    result['_residual'] = residual
    all_artifacts = []
    window_ms = float(null_test_config.get('window_ms', 5.0))
    threshold_factor = float(null_test_config.get('threshold_factor', 6.0))
    min_burst_ms = float(null_test_config.get('min_burst_ms', 0.5))
    glitches = detect_glitches_short_time_energy(
        residual, sample_rate, window_ms, threshold_factor, min_burst_ms)
    all_artifacts.extend(glitches)
    if null_test_config.get('sample_slip_detection', True):
        # Marker-based slip detection needs at least two markers; otherwise
        # fall back to windowed cross-correlation on the aligned channels.
        if marker_template is not None and marker_positions and len(marker_positions) >= 2:
            min_slip_samples = int(null_test_config.get('min_slip_samples', 3))
            slips = detect_sample_slips_by_markers(
                reference, dut, marker_template, marker_positions, sample_rate, lag_samples,
                min_slip_samples=min_slip_samples)
        else:
            slip_window_ms = float(null_test_config.get('sample_slip_window_ms', 50.0))
            slips = detect_sample_slips(ref_aligned, dut_aligned, sample_rate, slip_window_ms)
        all_artifacts.extend(slips)
    result['total_count'] = len(all_artifacts)
    result['artifacts'] = all_artifacts
    # Tally artifacts per type for the summary output.
    for a in all_artifacts:
        t = a['type']
        result['by_type'][t] = result['by_type'].get(t, 0) + 1
    return result
def measure_frequency_accuracy(signal_data: np.ndarray, sample_rate: int, def measure_frequency_accuracy(signal_data: np.ndarray, sample_rate: int,
expected_freq: float) -> Dict: expected_freq: float) -> Dict:
""" """
@@ -698,6 +1029,147 @@ def plot_artifact_detection(channel_1: np.ndarray, channel_2: np.ndarray,
plt.close() plt.close()
def plot_deviation_histogram(artifacts_ch1: Dict, artifacts_ch2: Dict, output_dir: Path):
    """Save a side-by-side histogram of artifact 'deviation_factor' values.

    Compares channel 1 (loopback) against channel 2 (DUT/radio); writes
    'artifact_deviation_histogram.png' into *output_dir*. Returns early when
    neither channel reported an artifact carrying a deviation factor.
    """
    def get_deviations(artifacts_dict):
        # Collect deviation_factor from every artifact entry that carries one.
        values = []
        for a in artifacts_dict.get('artifacts', []):
            if 'deviation_factor' in a:
                values.append(a['deviation_factor'])
        return values
    dev_ch1 = get_deviations(artifacts_ch1)
    dev_ch2 = get_deviations(artifacts_ch2)
    all_devs = dev_ch1 + dev_ch2
    if not all_devs:
        return
    raw_min = min(all_devs)
    raw_max = max(all_devs)
    MAX_BINS = 50
    # Wide dynamic range -> logarithmic bins; otherwise integer-step linear bins.
    if raw_max / max(raw_min, 1e-9) > 20 or (raw_max - raw_min) > MAX_BINS:
        bins = np.logspace(np.log10(max(raw_min, 0.1)), np.log10(raw_max + 1), MAX_BINS + 1)
        bin_labels = [f"{bins[i]:.1f}-{bins[i+1]:.1f}" for i in range(len(bins) - 1)]
    else:
        bin_min = int(np.floor(raw_min))
        bin_max = int(np.ceil(raw_max)) + 1
        bins = np.arange(bin_min, bin_max + 1)
        if len(bins) > MAX_BINS + 1:
            bins = np.linspace(bin_min, bin_max, MAX_BINS + 1)
        bin_labels = [f"{bins[i]:.1f}-{bins[i+1]:.1f}" for i in range(len(bins) - 1)]
    # np.histogram requires a non-empty sample; [0] keeps an empty channel valid.
    counts_ch1, _ = np.histogram(dev_ch1 if dev_ch1 else [0], bins=bins)
    counts_ch2, _ = np.histogram(dev_ch2 if dev_ch2 else [0], bins=bins)
    x = np.arange(len(bin_labels))
    width = 0.4
    # Widen the figure with the number of bins, clamped to a sane range.
    fig_width = min(24, max(10, len(bin_labels) * 0.5))
    fig, ax = plt.subplots(figsize=(fig_width, 6))
    bars1 = ax.bar(x - width / 2, counts_ch1, width, label='Ch1 Loopback', color='steelblue', alpha=0.85)
    bars2 = ax.bar(x + width / 2, counts_ch2, width, label='Ch2 DUT/Radio', color='tomato', alpha=0.85)
    ax.set_xlabel('Deviation Factor (σ)')
    ax.set_ylabel('Count')
    ax.set_title('Artifact Deviation Factor Distribution')
    ax.set_xticks(x)
    ax.set_xticklabels(bin_labels, rotation=45, ha='right')
    ax.yaxis.get_major_locator().set_params(integer=True)
    ax.legend()
    ax.grid(True, axis='y', alpha=0.3)
    # Annotate non-empty bars with their integer counts.
    for bar in bars1:
        if bar.get_height() > 0:
            ax.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.05,
                    str(int(bar.get_height())), ha='center', va='bottom', fontsize=8)
    for bar in bars2:
        if bar.get_height() > 0:
            ax.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.05,
                    str(int(bar.get_height())), ha='center', va='bottom', fontsize=8)
    plt.tight_layout()
    plot_file = output_dir / 'artifact_deviation_histogram.png'
    plt.savefig(plot_file, dpi=150, bbox_inches='tight')
    plt.close()
def plot_null_test(null_test_result: Dict, sample_rate: int, output_dir: Path,
                   marker_positions: Optional[List[int]] = None,
                   marker_len_samples: int = 0):
    """Save a 3-panel diagnostic plot of the null test.

    Panels: (1) aligned channels, (2) residual with artifact markers,
    (3) short-time RMS envelope of the residual with its baseline. When
    marker positions are supplied, the lower panels are windowed to exclude
    the sync-marker bursts and the baseline is recomputed from the visible
    span only. Writes 'null_test_residual.png' into *output_dir*; returns
    early when the result carries no aligned arrays (e.g. skipped test).
    """
    from scipy.ndimage import uniform_filter1d
    ref_aligned = null_test_result['_ref_aligned']
    dut_aligned = null_test_result['_dut_aligned']
    residual = null_test_result['_residual']
    if ref_aligned is None or residual is None:
        return
    total_duration = len(ref_aligned) / sample_rate
    fig, axes = plt.subplots(3, 1, figsize=(16, 12))
    time = np.arange(len(ref_aligned)) / sample_rate
    axes[0].plot(time, ref_aligned, alpha=0.6, linewidth=0.5, label='Ch1 Loopback (aligned)')
    axes[0].plot(time, dut_aligned, alpha=0.6, linewidth=0.5, label='Ch2 DUT (aligned)')
    axes[0].set_ylabel('Amplitude')
    axes[0].set_title(
        f'Null Test — Aligned Channels '
        f'(lag={null_test_result["lag_ms"]:.2f} ms, '
        f'gain={null_test_result["gain_factor"]:.4f})')
    axes[0].legend(loc='upper right', fontsize=8)
    axes[0].grid(True, alpha=0.3)
    axes[1].plot(time, residual, alpha=0.8, linewidth=0.4, color='purple')
    axes[1].set_ylabel('Amplitude')
    # Fixed title: the residual is DUT minus the gain-scaled reference
    # (the minus sign was missing from the label).
    axes[1].set_title(
        f'Residual e[n] = DUT − gain·Ref '
        f'(RMS={null_test_result["residual_rms"]:.2e}, '
        f'peak={null_test_result["residual_peak"]:.4f})')
    for a in null_test_result['artifacts']:
        # Glitches in red, sample slips (and anything else) in orange.
        color = 'red' if a['type'] == 'null_test_glitch' else 'orange'
        axes[1].axvline(x=a['time_sec'], color=color, alpha=0.6, linewidth=1.0)
    axes[1].grid(True, alpha=0.3)
    # NOTE(review): envelope window hard-coded to 5 ms here; presumably should
    # mirror the detector's window_ms config — confirm before relying on it.
    window_samples = max(4, int(5.0 / 1000.0 * sample_rate))
    rms_envelope = np.sqrt(np.maximum(
        uniform_filter1d(residual ** 2, size=window_samples, mode='reflect'), 0.0))
    axes[2].plot(time, rms_envelope, linewidth=0.6, color='darkgreen', label='Short-time RMS (5 ms)')
    axes[2].set_xlabel('Time (s)')
    axes[2].set_ylabel('Short-time RMS')
    axes[2].set_title('Short-time RMS of Residual')
    axes[2].grid(True, alpha=0.3)
    # Determine x-window for lower two axes: exclude marker regions at start and end
    margin_sec = 0.5
    if marker_positions and len(marker_positions) >= 2 and marker_len_samples > 0:
        x_min = (marker_positions[0] + marker_len_samples) / sample_rate + margin_sec
        x_max = marker_positions[-1] / sample_rate - margin_sec
    else:
        x_min = margin_sec
        x_max = total_duration - margin_sec
    if x_min < x_max:
        axes[1].set_xlim(x_min, x_max)
        axes[2].set_xlim(x_min, x_max)
        # Recompute baseline from the visible region only (unaffected by marker bursts)
        i_min = max(0, int(x_min * sample_rate))
        i_max = min(len(rms_envelope), int(x_max * sample_rate))
        baseline = float(np.median(rms_envelope[i_min:i_max]))
    else:
        baseline = float(np.median(rms_envelope))
    axes[2].axhline(y=baseline, color='steelblue', linestyle='--', linewidth=1,
                    label=f'Baseline ({baseline:.2e})')
    axes[2].legend(fontsize=8)
    plt.tight_layout()
    plt.savefig(output_dir / 'null_test_residual.png', dpi=150, bbox_inches='tight')
    plt.close()
def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_dir: Path = None) -> Dict: def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_dir: Path = None) -> Dict:
import time import time
@@ -708,6 +1180,7 @@ def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_d
device_name = config['audio']['device_name'] device_name = config['audio']['device_name']
channels = config['audio']['channels'] channels = config['audio']['channels']
detector_config = config['artifact_detection']['detectors'] detector_config = config['artifact_detection']['detectors']
null_test_config = detector_config.get('null_test', {})
startup_delay = config['artifact_detection'].get('startup_delay', 10) startup_delay = config['artifact_detection'].get('startup_delay', 10)
signal_type = config['artifact_detection'].get('signal_type', 'sine') signal_type = config['artifact_detection'].get('signal_type', 'sine')
@@ -718,6 +1191,10 @@ def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_d
time.sleep(startup_delay) time.sleep(startup_delay)
print("Starting recording...") print("Starting recording...")
use_null_test = signal_type != 'silent' and null_test_config.get('enabled', True)
marker_template: Optional[np.ndarray] = None
marker_positions: Optional[List[int]] = []
if signal_type == 'chirp': if signal_type == 'chirp':
f0 = config['artifact_detection'].get('chirp_f0', 100) f0 = config['artifact_detection'].get('chirp_f0', 100)
f1 = config['artifact_detection'].get('chirp_f1', 8000) f1 = config['artifact_detection'].get('chirp_f1', 8000)
@@ -726,21 +1203,67 @@ def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_d
recording = play_and_record(tone, sample_rate, device_ids, channels) recording = play_and_record(tone, sample_rate, device_ids, channels)
elif signal_type == 'silent': elif signal_type == 'silent':
frequency = 1000 frequency = 1000
recording = sd.rec(int(duration * sample_rate), samplerate=sample_rate, recording = sd.rec(int(duration * sample_rate), samplerate=sample_rate,
channels=channels, device=device_ids[0], blocking=True) channels=channels, device=device_ids[0], blocking=True)
else: else:
tone = generate_test_tone(frequency, duration, sample_rate, amplitude) tone = generate_test_tone(frequency, duration, sample_rate, amplitude)
if use_null_test:
marker_duration_sec = float(null_test_config.get('marker_duration_sec', 0.05))
marker_offset_sec = float(null_test_config.get('marker_first_offset_sec', 0.5))
marker_template = generate_sync_marker(
sample_rate,
duration_sec=marker_duration_sec,
f0=float(null_test_config.get('marker_f0', 200.0)),
f1=float(null_test_config.get('marker_f1', 16000.0)),
amplitude=float(null_test_config.get('marker_amplitude', 0.7)),
)
marker_len = len(marker_template)
pos_start = int(marker_offset_sec * sample_rate)
pos_end = len(tone) - int(marker_offset_sec * sample_rate) - marker_len
marker_positions = [pos_start, pos_end]
tone = tone.copy()
for pos in marker_positions:
tone[pos:pos + marker_len] = np.clip(
tone[pos:pos + marker_len] + marker_template, -1.0, 1.0)
print(f" Embedded sync markers at start ({marker_offset_sec:.1f}s) and end "
f"({pos_end/sample_rate:.1f}s) "
f"(chirp 200→16000 Hz, {marker_duration_sec*1000:.0f} ms)")
recording = play_and_record(tone, sample_rate, device_ids, channels) recording = play_and_record(tone, sample_rate, device_ids, channels)
channel_1 = recording[:, 0] channel_1 = recording[:, 0]
channel_2 = recording[:, 1] channel_2 = recording[:, 1]
if output_dir:
wavfile.write(str(output_dir / 'channel_1_loopback_recording.wav'), sample_rate, channel_1.astype(np.float32))
wavfile.write(str(output_dir / 'channel_2_dut_recording.wav'), sample_rate, channel_2.astype(np.float32))
artifacts_ch1 = detect_artifacts_combined(channel_1, sample_rate, frequency, detector_config) artifacts_ch1 = detect_artifacts_combined(channel_1, sample_rate, frequency, detector_config)
artifacts_ch2 = detect_artifacts_combined(channel_2, sample_rate, frequency, detector_config) artifacts_ch2 = detect_artifacts_combined(channel_2, sample_rate, frequency, detector_config)
null_test_result = None
if use_null_test:
print("Running null test (align → subtract → residual analysis)...")
null_test_result = detect_artifacts_null_test(
channel_1, channel_2, sample_rate, null_test_config,
marker_template=marker_template,
marker_positions=marker_positions if marker_positions else None,
)
if 'error' not in null_test_result:
print(f" Lag: {null_test_result['lag_ms']:.2f} ms "
f"[{null_test_result.get('alignment_method', '?')}] | "
f"Gain: {null_test_result['gain_factor']:.4f} | "
f"Residual RMS: {null_test_result['residual_rms']:.2e} | "
f"Glitches: {null_test_result['total_count']}")
if save_plots and output_dir: if save_plots and output_dir:
plot_artifact_detection(channel_1, channel_2, artifacts_ch1, artifacts_ch2, plot_artifact_detection(channel_1, channel_2, artifacts_ch1, artifacts_ch2,
frequency, sample_rate, output_dir) frequency, sample_rate, output_dir)
plot_deviation_histogram(artifacts_ch1, artifacts_ch2, output_dir)
if null_test_result is not None and null_test_result['_residual'] is not None:
plot_null_test(null_test_result, sample_rate, output_dir,
marker_positions=marker_positions,
marker_len_samples=len(marker_template) if marker_template is not None else 0)
anomalies_dir = output_dir / 'individual_anomalies' anomalies_dir = output_dir / 'individual_anomalies'
anomalies_dir.mkdir(exist_ok=True) anomalies_dir.mkdir(exist_ok=True)
@@ -758,7 +1281,13 @@ def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_d
total_anomaly_plots = len(artifacts_ch1['artifacts']) + len(artifacts_ch2['artifacts']) total_anomaly_plots = len(artifacts_ch1['artifacts']) + len(artifacts_ch2['artifacts'])
if total_anomaly_plots > 0: if total_anomaly_plots > 0:
print(f"✓ Generated {total_anomaly_plots} individual anomaly plots") print(f"✓ Generated {total_anomaly_plots} individual anomaly plots")
null_test_serializable = None
if null_test_result is not None:
null_test_serializable = {
k: v for k, v in null_test_result.items() if not k.startswith('_')
}
result = { result = {
'signal_type': signal_type, 'signal_type': signal_type,
'duration_sec': float(duration), 'duration_sec': float(duration),
@@ -774,6 +1303,7 @@ def run_artifact_detection_test(config: Dict, save_plots: bool = False, output_d
'artifact_rate_per_minute': float(artifacts_ch2['total_count'] / duration * 60), 'artifact_rate_per_minute': float(artifacts_ch2['total_count'] / duration * 60),
'frequency_accuracy': artifacts_ch2['frequency_accuracy'] 'frequency_accuracy': artifacts_ch2['frequency_accuracy']
}, },
'null_test': null_test_serializable,
'detector_config': detector_config 'detector_config': detector_config
} }

View File

@@ -36,10 +36,9 @@ def main():
test_id = timestamp.strftime('%Y%m%d_%H%M%S') test_id = timestamp.strftime('%Y%m%d_%H%M%S')
results_dir = Path(config['output']['results_dir']) results_dir = Path(config['output']['results_dir'])
results_dir.mkdir(exist_ok=True)
test_output_dir = results_dir / f"{test_id}_artifact_detection" test_output_dir = results_dir / timestamp.strftime('%Y') / timestamp.strftime('%m') / timestamp.strftime('%d') / f"{test_id}_artifact_detection"
test_output_dir.mkdir(exist_ok=True) test_output_dir.mkdir(parents=True, exist_ok=True)
save_plots = config['output'].get('save_plots', False) save_plots = config['output'].get('save_plots', False)
@@ -134,13 +133,42 @@ def main():
print(f" Measured: {freq_acc['measured_freq_hz']:.2f} Hz") print(f" Measured: {freq_acc['measured_freq_hz']:.2f} Hz")
print(f" Error: {freq_acc['error_hz']:+.2f} Hz ({freq_acc['error_percent']:+.3f}%)") print(f" Error: {freq_acc['error_hz']:+.2f} Hz ({freq_acc['error_percent']:+.3f}%)")
nt = result.get('null_test')
if nt and nt.get('enabled'):
print("\n🔬 NULL TEST (Ch2 DUT vs Ch1 Loopback reference):")
print(f" Alignment lag: {nt['lag_ms']:.2f} ms ({nt['lag_samples']} samples)")
print(f" Gain factor: {nt['gain_factor']:.4f}")
print(f" Residual RMS: {nt['residual_rms']:.2e}")
print(f" Residual peak: {nt['residual_peak']:.4f}")
print(f" Glitches found: {nt['total_count']}")
if nt['by_type']:
print(" By type:")
for artifact_type, count in nt['by_type'].items():
print(f" - {artifact_type}: {count}")
if nt['artifacts']:
print(" Glitch timestamps:")
for a in nt['artifacts'][:20]:
if a['type'] == 'null_test_glitch':
print(f" {a['time_sec']:.3f}s dur={a['duration_ms']:.1f}ms "
f"dev={a['deviation_factor']:.1f}×baseline")
elif a['type'] == 'sample_slip':
baseline = a.get('lag_baseline', a.get('lag_before', '?'))
at = a.get('lag_at_marker', a.get('lag_after', '?'))
print(f" {a['time_sec']:.3f}s sample_slip "
f"Δ={a['lag_change_samples']:+d} samples "
f"(baseline={baseline}, at_marker={at})")
if len(nt['artifacts']) > 20:
print(f" ... and {len(nt['artifacts']) - 20} more (see YAML)")
ch1_count = result['channel_1_loopback']['total_artifacts'] ch1_count = result['channel_1_loopback']['total_artifacts']
ch2_count = result['channel_2_dut']['total_artifacts'] ch2_count = result['channel_2_dut']['total_artifacts']
if ch2_count > ch1_count: if nt and nt.get('enabled') and nt['total_count'] > 0:
print(f"\n⚠️ NULL TEST: {nt['total_count']} glitch(es) detected in DUT path residual")
elif ch2_count > ch1_count:
delta = ch2_count - ch1_count delta = ch2_count - ch1_count
print(f"\n⚠️ DEGRADATION DETECTED: {delta} more artifacts in radio path vs loopback") print(f"\n⚠️ DEGRADATION DETECTED: {delta} more artifacts in radio path vs loopback")
elif ch1_count == ch2_count == 0: elif ch1_count == ch2_count == 0 and (not nt or nt['total_count'] == 0):
print("\n✅ EXCELLENT: No artifacts detected in either path!") print("\n✅ EXCELLENT: No artifacts detected in either path!")
else: else:
print(f"\n Loopback baseline: {ch1_count} artifacts") print(f"\n Loopback baseline: {ch1_count} artifacts")

View File

@@ -26,10 +26,9 @@ def main():
test_id = timestamp.strftime('%Y%m%d_%H%M%S') test_id = timestamp.strftime('%Y%m%d_%H%M%S')
results_dir = Path(config['output']['results_dir']) results_dir = Path(config['output']['results_dir'])
results_dir.mkdir(exist_ok=True)
test_output_dir = results_dir / f"{test_id}_latency" test_output_dir = results_dir / timestamp.strftime('%Y') / timestamp.strftime('%m') / timestamp.strftime('%d') / f"{test_id}_latency"
test_output_dir.mkdir(exist_ok=True) test_output_dir.mkdir(parents=True, exist_ok=True)
save_plots = config['output'].get('save_plots', False) save_plots = config['output'].get('save_plots', False)
@@ -47,7 +46,9 @@ def main():
try: try:
latency_stats = run_latency_test(config, num_measurements=args.measurements, latency_stats = run_latency_test(config, num_measurements=args.measurements,
save_plots=save_plots, output_dir=test_output_dir) save_plots=save_plots, output_dir=test_output_dir)
print(f"✓ Latency: avg={latency_stats['avg']:.3f}ms, " valid = latency_stats.get('valid', True)
status = "PASS" if valid else "FAIL"
print(f"{'' if valid else ''} Latency [{status}]: avg={latency_stats['avg']:.3f}ms, "
f"min={latency_stats['min']:.3f}ms, max={latency_stats['max']:.3f}ms, " f"min={latency_stats['min']:.3f}ms, max={latency_stats['max']:.3f}ms, "
f"std={latency_stats['std']:.3f}ms") f"std={latency_stats['std']:.3f}ms")
except Exception as e: except Exception as e:

View File

@@ -263,10 +263,9 @@ def main():
test_id = timestamp.strftime('%Y%m%d_%H%M%S') test_id = timestamp.strftime('%Y%m%d_%H%M%S')
results_dir = Path(config['output']['results_dir']) results_dir = Path(config['output']['results_dir'])
results_dir.mkdir(exist_ok=True)
test_output_dir = results_dir / f"{test_id}_latency_buildup" test_output_dir = results_dir / timestamp.strftime('%Y') / timestamp.strftime('%m') / timestamp.strftime('%d') / f"{test_id}_latency_buildup"
test_output_dir.mkdir(exist_ok=True) test_output_dir.mkdir(parents=True, exist_ok=True)
save_plots = config['output'].get('save_plots', False) save_plots = config['output'].get('save_plots', False)

527
test_matrix.py Normal file
View File

@@ -0,0 +1,527 @@
#!/usr/bin/env python3
import argparse
import copy
import sys
import time
import yaml
import requests
import numpy as np
from datetime import datetime
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from src.audio_tests import run_latency_test, run_artifact_detection_test
# ---------------------------------------------------------------------------
# Parameter definitions
# ---------------------------------------------------------------------------
# QoS presets: 'fast' trades robustness for latency, 'robust' the reverse.
QOS_PROFILES = {
    'fast': {'number_of_retransmissions': 2, 'max_transport_latency_ms': 22},
    'robust': {'number_of_retransmissions': 4, 'max_transport_latency_ms': 43},
}
# Sampling-rate presets with the matching LC3 octets-per-frame for 10 ms frames.
SAMPLE_RATES = {
    '16k': {'auracast_sampling_rate_hz': 16000, 'octets_per_frame': 40},
    '24k': {'auracast_sampling_rate_hz': 24000, 'octets_per_frame': 60},
    '48k': {'auracast_sampling_rate_hz': 48000, 'octets_per_frame': 120},
}
# Channel presets: number of BIS streams in the broadcast group.
CHANNELS = {
    'mono': {'num_bis': 1},
    'stereo': {'num_bis': 2},
}
# PRESENTATION_DELAYS_MS = [10, 20, 40, 80]
PRESENTATION_DELAYS_MS = [10]
API_URL = 'http://beacon29.local:5000/init'
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def build_api_payload(qos_name: str, rate_name: str, channel_name: str, pd_ms: int) -> dict:
    """Assemble the device /init request body for one matrix combination.

    Args:
        qos_name: key into QOS_PROFILES ('fast' or 'robust').
        rate_name: key into SAMPLE_RATES ('16k', '24k' or '48k').
        channel_name: key into CHANNELS ('mono' or 'stereo').
        pd_ms: presentation delay in milliseconds (sent to the API in µs).

    Returns:
        A dict ready to be POSTed as JSON to the /init endpoint.
    """
    qos_cfg = QOS_PROFILES[qos_name]
    rate_cfg = SAMPLE_RATES[rate_name]
    # The single BIG entry; only num_bis varies across combinations.
    big_entry = {
        'id': 12,
        'random_address': 'F1:F1:F2:F3:F4:F5',
        'language': 'deu',
        'name': 'Broadcast0',
        'program_info': 'Vorlesung DE',
        'audio_source': 'device:ch1',
        'input_format': 'auto',
        'loop': True,
        'precode_wav': False,
        'iso_que_len': 1,
        'num_bis': CHANNELS[channel_name]['num_bis'],
        'input_gain_db': 0,
    }
    return {
        'qos_config': {
            'iso_int_multiple_10ms': 1,
            'number_of_retransmissions': qos_cfg['number_of_retransmissions'],
            'max_transport_latency_ms': qos_cfg['max_transport_latency_ms'],
        },
        'debug': False,
        'device_name': 'Auracaster',
        'transport': '',
        'auracast_device_address': 'F0:F1:F2:F3:F4:F5',
        'auracast_sampling_rate_hz': rate_cfg['auracast_sampling_rate_hz'],
        'octets_per_frame': rate_cfg['octets_per_frame'],
        'frame_duration_us': 10000,
        'presentation_delay_us': pd_ms * 1000,
        'manufacturer_data': [None, None],
        'immediate_rendering': False,
        'assisted_listening_stream': False,
        'bigs': [big_entry],
        'analog_gain': 50,
    }
STOP_URL = 'http://beacon29.local:5000/stop_audio'
def stop_device(timeout: int = 10) -> None:
    """Ask the device to stop audio before reconfiguring.

    Any network or HTTP error is printed as a warning and swallowed —
    a failed stop must not abort the test run.
    """
    try:
        requests.post(
            STOP_URL,
            headers={'accept': 'application/json'},
            timeout=timeout,
        )
    except Exception as exc:
        print(f" stop_audio warning: {exc}")
def configure_device(payload: dict, timeout: int = 15) -> tuple:
    """POST the /init payload to the device API.

    Returns:
        (True, parsed_json_or_text) when the request succeeded,
        (False, error_string) on any network/HTTP failure.
    """
    headers = {'accept': 'application/json',
               'Content-Type': 'application/json'}
    try:
        resp = requests.post(API_URL, json=payload, timeout=timeout,
                             headers=headers)
        resp.raise_for_status()
    except Exception as exc:
        return False, str(exc)
    # Prefer the decoded JSON body; fall back to raw text if it isn't JSON.
    try:
        body = resp.json()
    except Exception:
        body = resp.text
    return True, body
def run_buildup_check(config: dict, duration_sec: int = 20, interval_sec: int = 1) -> dict:
    """
    Lightweight buildup check: sample single-shot latency measurements for
    duration_sec seconds and report whether the latency drifted.

    Returns a dict with 'buildup_detected' (True/False, or None when fewer
    than two samples were collected) plus start/end/trend statistics.
    """
    samples = []
    deadline = time.time() + duration_sec
    while time.time() < deadline:
        try:
            single = run_latency_test(config, num_measurements=1, save_plots=False)
            samples.append(float(single['avg']))
        except Exception as exc:
            print(f" buildup measurement error: {exc}")
        left = deadline - time.time()
        if left <= 0:
            break
        time.sleep(min(interval_sec, left))
    if len(samples) < 2:
        # Not enough data to say anything about a trend.
        return {'buildup_detected': None, 'measurements': samples,
                'note': 'insufficient_data'}
    first, last = samples[0], samples[-1]
    delta_ms = last - first
    delta_pct = (delta_ms / first * 100.0) if first > 0 else 0.0
    # Linear-fit slope only makes sense with three or more samples.
    slope = 0.0
    if len(samples) >= 3:
        slope = float(np.polyfit(np.arange(len(samples)), np.array(samples), 1)[0])
    if slope > 0.01:
        trend = 'increasing'
    elif slope < -0.01:
        trend = 'decreasing'
    else:
        trend = 'stable'
    return {
        # More than 5 % change (either direction) counts as buildup.
        'buildup_detected': abs(delta_pct) > 5.0,
        'start_latency_ms': round(first, 3),
        'end_latency_ms': round(last, 3),
        'change_ms': round(delta_ms, 3),
        'change_percent': round(delta_pct, 2),
        'trend': trend,
        'measurements': [round(s, 3) for s in samples],
    }
def run_quality_check(config: dict, duration_sec: int = 180,
                      output_dir: Path = None) -> dict:
    """
    Run artifact detection for duration_sec seconds on a private copy of config.

    Args:
        config: test configuration; an 'artifact_detection' section is created
            if missing, and its duration/startup_delay are overridden.
        duration_sec: artifact-detection run length in seconds.
        output_dir: when given, plots are saved there; None disables plotting.

    Returns:
        On success: dict with artifacts_per_min, total_artifacts, duration_sec
        and artifacts_by_type for the DUT channel.
        On failure: {'error': <message>, 'artifacts_per_min': None}.
    """
    cfg = copy.deepcopy(config)
    # Tolerate configs without an 'artifact_detection' section — previously
    # this raised an uncaught KeyError before the try below could report it.
    ad_cfg = cfg.setdefault('artifact_detection', {})
    ad_cfg['duration'] = float(duration_sec)
    ad_cfg['startup_delay'] = 0
    try:
        result = run_artifact_detection_test(
            cfg,
            save_plots=output_dir is not None,  # plots only when a target dir exists
            output_dir=output_dir,
        )
        dut = result['channel_2_dut']
        return {
            'artifacts_per_min': round(float(dut['artifact_rate_per_minute']), 2),
            'total_artifacts': int(dut['total_artifacts']),
            'duration_sec': duration_sec,
            'artifacts_by_type': dut['artifacts_by_type'],
        }
    except Exception as e:
        return {'error': str(e), 'artifacts_per_min': None}
# ---------------------------------------------------------------------------
# USB recovery helper
# ---------------------------------------------------------------------------
def _try_usb_audio_reset(config: dict) -> None:
    """
    Try to recover the audio device after an ALSA xrun.
    Strategy:
      1. Stop any active sounddevice stream so its ALSA handles are released.
      2. Attempt a USB-level reset via the USBDEVFS_RESET ioctl (equivalent
         to replugging the device). Requires either root or membership in
         the 'plugdev' group:
             sudo usermod -aG plugdev $USER   (then re-login)
      3. Always finish with a 3 s settle sleep.
    All failures are reported but non-fatal.
    """
    import fcntl
    import os
    import re
    import sounddevice as _sd
    USBDEVFS_RESET = 0x5514  # ioctl request number from linux/usbdevice_fs.h
    # Stop any active sounddevice stream first
    try:
        _sd.stop()
    except Exception:
        pass
    # USB-level reset via ioctl (equivalent to replug)
    device_name = config['audio'].get('device_name', 'Scarlett')
    try:
        with open('/proc/asound/cards') as f:
            cards_text = f.read()
        # Find the ALSA card number whose description mentions the device.
        card_num = None
        for line in cards_text.splitlines():
            if device_name.lower() in line.lower():
                m = re.match(r'\s*(\d+)', line)
                if m:
                    card_num = m.group(1)
                    break
        if card_num is not None:
            # Walk up the sysfs tree to the USB device directory
            # (the first ancestor exposing an idVendor attribute).
            card_sysfs = f'/sys/class/sound/card{card_num}'
            real_path = Path(os.path.realpath(card_sysfs))
            usb_dev_path = None
            for parent in real_path.parents:
                if (parent / 'idVendor').exists():
                    usb_dev_path = parent
                    break
            if usb_dev_path is not None:
                bus_num = int((usb_dev_path / 'busnum').read_text().strip())
                dev_num = int((usb_dev_path / 'devnum').read_text().strip())
                dev_file = f'/dev/bus/usb/{bus_num:03d}/{dev_num:03d}'
                with open(dev_file, 'wb') as f:
                    fcntl.ioctl(f, USBDEVFS_RESET, 0)
                print(f" Recovery: USB reset of {dev_file} OK")
    except PermissionError:
        # Fixed: previously bound the exception as an unused variable and
        # used a pointless f-prefix on a placeholder-free string.
        print(" Recovery: USB reset skipped (permission denied — "
              "add yourself to plugdev: sudo usermod -aG plugdev $USER)")
    except Exception as e:
        print(f" Recovery: USB reset skipped ({e})")
    time.sleep(3)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """CLI entry point: sweep every QoS/rate/channel/presentation-delay
    combination, measure latency (plus optional buildup and quality tests),
    retry failed combinations, then save YAML results and a rendered table.
    """
    parser = argparse.ArgumentParser(
        description='Run matrix test across all QoS/rate/channel/delay combinations')
    parser.add_argument('--serial-number', required=True,
                        help='Serial number (e.g. SN001234)')
    parser.add_argument('--software-version', required=True,
                        help='Software version / git commit hash')
    parser.add_argument('--comment', default='',
                        help='Free-text comment for this test run')
    parser.add_argument('--config', default='config.yaml',
                        help='Path to config file')
    parser.add_argument('--measurements', type=int, default=5,
                        help='Latency measurements per combination (default: 5)')
    # Fixed: help text previously claimed "(default: 15)" while default=5.
    parser.add_argument('--settle-time', type=int, default=5,
                        help='Seconds to wait after API call before measuring (default: 5)')
    parser.add_argument('--buildup', action='store_true',
                        help='Run 20 s buildup test per combination')
    parser.add_argument('--quality', action='store_true',
                        help='Run 3 min quality/artifact test per combination')
    parser.add_argument('--quality-duration', type=int, default=180,
                        help='Quality test duration in seconds (default: 180)')
    parser.add_argument('--dry-run', action='store_true',
                        help='Skip API calls and audio measurements (for testing the script)')
    args = parser.parse_args()
    with open(args.config, 'r') as f:
        config = yaml.safe_load(f)
    timestamp = datetime.now()
    test_id = timestamp.strftime('%Y%m%d_%H%M%S')
    results_dir = Path(config['output']['results_dir'])
    # Results are sorted into year/month/day subfolders.
    test_output_dir = (results_dir
                       / timestamp.strftime('%Y')
                       / timestamp.strftime('%m')
                       / timestamp.strftime('%d')
                       / f"{test_id}_matrix")
    test_output_dir.mkdir(parents=True, exist_ok=True)
    # All combinations in the specified order
    combos = [
        (qos, rate, ch, pd)
        for qos in ['fast', 'robust']
        for rate in ['16k', '24k', '48k']
        for ch in ['mono', 'stereo']
        for pd in PRESENTATION_DELAYS_MS
    ]
    total = len(combos)
    print("=" * 70)
    print("MATRIX TEST")
    print("=" * 70)
    print(f"Test ID: {test_id}")
    print(f"Serial Number: {args.serial_number}")
    print(f"Software: {args.software_version}")
    if args.comment:
        print(f"Comment: {args.comment}")
    print(f"Combinations: {total}")
    print(f"Measurements/combo: {args.measurements}")
    print(f"Settle time: {args.settle_time} s")
    print(f"Buildup test: {'yes (20 s)' if args.buildup else 'no'}")
    print(f"Quality test: {'yes (' + str(args.quality_duration) + ' s)' if args.quality else 'no'}")
    if args.dry_run:
        print("DRY RUN MODE - no API calls or audio measurements")
    print("=" * 70)
    def run_combo(qos, rate, ch, pd):
        """Run a single combination and return its result dict."""
        payload = build_api_payload(qos, rate, ch, pd)
        result = {
            'qos': qos,
            'sample_rate': rate,
            'channels': ch,
            'presentation_delay_ms': pd,
            'api_payload': payload,
            'api_success': None,
            'latency': None,
            'buildup': None,
            'quality': None,
        }
        if not args.dry_run:
            # Stop any running stream, then push the new configuration.
            stop_device()
            ok, api_resp = configure_device(payload)
            result['api_success'] = ok
            result['api_response'] = api_resp if not ok else str(api_resp)
            if not ok:
                print(f" API FAILED: {api_resp}")
                result['latency'] = {'error': f'API failed: {api_resp}', 'valid': False,
                                     'avg': None}
                return result
            print(f" API OK -> settling {args.settle_time} s...")
            time.sleep(args.settle_time)
        else:
            result['api_success'] = True
        if not args.dry_run:
            try:
                lat = run_latency_test(config, num_measurements=args.measurements,
                                       save_plots=False)
                result['latency'] = {
                    'avg': round(float(lat['avg']), 3),
                    'min': round(float(lat['min']), 3),
                    'max': round(float(lat['max']), 3),
                    'std': round(float(lat['std']), 3),
                    'valid': bool(lat.get('valid', True)),
                }
                status = "PASS" if result['latency']['valid'] else "FAIL"
                print(f" Latency [{status}]: avg={lat['avg']:.1f} ms "
                      f"std={lat['std']:.2f} ms")
            except Exception as e:
                result['latency'] = {'error': str(e), 'valid': False, 'avg': None}
                print(f" Latency ERROR: {e}")
            if not result['latency'].get('valid', False):
                # Treat any failed measurement as a possible audio-interface
                # (ALSA) fault: try a USB reset and skip the long sub-tests.
                print(" Latency invalid — attempting USB recovery, skipping buildup/quality.")
                result['latency']['alsa_error'] = True
                _try_usb_audio_reset(config)
                return result
        else:
            # Dry run: synthesize a plausible latency around the presentation delay.
            import random
            avg = pd + random.uniform(-1, 1)
            result['latency'] = {'avg': round(avg, 3), 'min': round(avg - 0.5, 3),
                                 'max': round(avg + 0.5, 3), 'std': 0.2, 'valid': True}
        if args.buildup:
            if not args.dry_run:
                print(" Buildup check (20 s)...")
                buildup = run_buildup_check(config, duration_sec=20, interval_sec=1)
                result['buildup'] = buildup
                bd = buildup.get('buildup_detected')
                print(f" Buildup: {'YES ⚠' if bd else ('NO' if bd is False else 'N/A')}")
            else:
                result['buildup'] = {'buildup_detected': False, 'note': 'dry_run'}
        if args.quality:
            if not args.dry_run:
                print(f" Quality test ({args.quality_duration} s)...")
                combo_plot_dir = test_output_dir / f"{qos}_{rate}_{ch}_{pd}ms"
                combo_plot_dir.mkdir(parents=True, exist_ok=True)
                quality = run_quality_check(config, duration_sec=args.quality_duration,
                                            output_dir=combo_plot_dir)
                result['quality'] = quality
                apm = quality.get('artifacts_per_min')
                print(f" Quality: {f'{apm:.1f} artifacts/min' if apm is not None else 'ERROR'}")
            else:
                result['quality'] = {'artifacts_per_min': 0.5, 'total_artifacts': 1,
                                     'note': 'dry_run'}
        return result
    matrix_results = {}
    for idx, (qos, rate, ch, pd) in enumerate(combos, 1):
        key = f"{qos}_{rate}_{ch}_{pd}ms"
        print(f"\n[{idx:2d}/{total}] {qos:6s} {rate:3s} {ch:6s} PD={pd:2d}ms")
        matrix_results[key] = run_combo(qos, rate, ch, pd)
    # --- Retry failed combinations ---
    # ALSA/hardware failures always retry (up to 3 times) regardless of threshold.
    # Other failures retry only if the failure rate is <= 10%.
    def _is_failed(r):
        lat = r.get('latency')
        return lat is None or lat.get('valid') is False
    def _is_alsa_failure(r):
        lat = r.get('latency') or {}
        return lat.get('alsa_error', False)
    MAX_RETRIES = 3
    for retry_round in range(1, MAX_RETRIES + 1):
        failed_keys = [k for k, r in matrix_results.items() if _is_failed(r)]
        if not failed_keys:
            break
        alsa_keys = [k for k in failed_keys if _is_alsa_failure(matrix_results[k])]
        other_keys = [k for k in failed_keys if k not in alsa_keys]
        retry_threshold = total * 0.10
        keys_to_retry = list(alsa_keys)
        if 0 < len(other_keys) <= retry_threshold:
            keys_to_retry += other_keys
        elif len(other_keys) > retry_threshold:
            print(f"\n{len(other_keys)}/{total} non-hardware failures "
                  f"({len(other_keys)/total*100:.0f}%) — above 10% threshold, skipping retry.")
        if not keys_to_retry:
            break
        n_other_retrying = len(keys_to_retry) - len(alsa_keys)
        print(f"\n{'=' * 70}")
        # Fixed: the two f-strings were concatenated with no separator,
        # printing e.g. "RETRY ROUND 1/32 combo(s)".
        print(f"RETRY ROUND {retry_round}/{MAX_RETRIES}: "
              f"{len(keys_to_retry)} combo(s) "
              f"[{len(alsa_keys)} hw-error, {n_other_retrying} other]")
        print(f"{'=' * 70}")
        for retry_idx, key in enumerate(keys_to_retry, 1):
            r = matrix_results[key]
            qos, rate, ch, pd = r['qos'], r['sample_rate'], r['channels'], r['presentation_delay_ms']
            print(f"\n[retry {retry_round}.{retry_idx}/{len(keys_to_retry)}] {qos:6s} {rate:3s} {ch:6s} PD={pd:2d}ms")
            matrix_results[key] = run_combo(qos, rate, ch, pd)
            matrix_results[key]['retried'] = True
    # --- Save results ---
    output_data = {
        'metadata': {
            'test_id': test_id,
            'timestamp': timestamp.isoformat(),
            'serial_number': args.serial_number,
            'software_version': args.software_version,
            'comment': args.comment,
            'options': {
                'measurements_per_combo': args.measurements,
                'settle_time_sec': args.settle_time,
                'buildup_enabled': args.buildup,
                'quality_enabled': args.quality,
                'quality_duration_sec': args.quality_duration if args.quality else None,
            },
        },
        'matrix_results': matrix_results,
    }
    output_file = test_output_dir / f"{test_id}_matrix_results.yaml"
    with open(output_file, 'w') as f:
        yaml.dump(output_data, f, default_flow_style=False, sort_keys=False)
    # --- Auto-generate table image ---
    try:
        from plot_matrix import build_table
        import matplotlib.pyplot as plt
        show_buildup = any(r.get('buildup') is not None for r in matrix_results.values())
        show_quality = any(r.get('quality') is not None for r in matrix_results.values())
        fig = build_table(
            matrix_results=matrix_results,
            baseline_results=None,
            metadata=output_data['metadata'],
            baseline_metadata=None,
            show_buildup=show_buildup,
            show_quality=show_quality,
        )
        plot_file = test_output_dir / f"{test_id}_matrix_results_table.png"
        fig.savefig(plot_file, dpi=150, bbox_inches='tight',
                    facecolor='white', edgecolor='none')
        plt.close(fig)
        plot_file_path = plot_file
        print(f"Table image saved to: {plot_file}")
    except Exception as e:
        # Plot generation is best-effort; results were already saved above.
        plot_file_path = None
        print(f"Warning: could not auto-generate table image: {e}")
    # --- Summary ---
    passed = sum(1 for r in matrix_results.values()
                 if r.get('latency') and r['latency'].get('valid', False))
    failed = total - passed
    print("\n" + "=" * 70)
    print(f"MATRIX TEST COMPLETE | PASS: {passed} FAIL: {failed} Total: {total}")
    print(f"Results: {output_file}")
    if plot_file_path:
        print(f"Table: {plot_file_path.resolve()}")
    print(f"Re-plot: python plot_matrix.py {output_file}")
    print("=" * 70)
# Standard script entry guard: run the matrix sweep only when executed directly.
if __name__ == '__main__':
    main()

View File

@@ -88,7 +88,7 @@ def display_results(yaml_file: Path):
def list_all_results(results_dir: Path): def list_all_results(results_dir: Path):
yaml_files = sorted(results_dir.glob("*_results.yaml")) yaml_files = sorted(results_dir.rglob("*_results.yaml"))
if not yaml_files: if not yaml_files:
print("No test results found.") print("No test results found.")