# Source: closed_loop_audio_test_suite/test_matrix.py (528 lines, 20 KiB, Python)
#!/usr/bin/env python3
import argparse
import copy
import sys
import time
import yaml
import requests
import numpy as np
from datetime import datetime
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from src.audio_tests import run_latency_test, run_artifact_detection_test
# ---------------------------------------------------------------------------
# Parameter definitions
# ---------------------------------------------------------------------------
# QoS profiles: retransmission count vs. transport latency trade-off.
QOS_PROFILES = {
    'fast': {'number_of_retransmissions': 2, 'max_transport_latency_ms': 22},
    'robust': {'number_of_retransmissions': 4, 'max_transport_latency_ms': 43},
}
# Sampling rate presets with the matching LC3 octets-per-frame values.
SAMPLE_RATES = {
    '16k': {'auracast_sampling_rate_hz': 16000, 'octets_per_frame': 40},
    '24k': {'auracast_sampling_rate_hz': 24000, 'octets_per_frame': 60},
    '48k': {'auracast_sampling_rate_hz': 48000, 'octets_per_frame': 120},
}
# Channel layouts expressed as the number of BIS streams.
CHANNELS = {
    'mono': {'num_bis': 1},
    'stereo': {'num_bis': 2},
}
# Full sweep would be [10, 20, 40, 80]; currently restricted to one delay.
# PRESENTATION_DELAYS_MS = [10, 20, 40, 80]
PRESENTATION_DELAYS_MS = [10]
API_URL = 'http://beacon29.local:5000/init'
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def build_api_payload(qos_name: str, rate_name: str, channel_name: str, pd_ms: int) -> dict:
    """Assemble the /init request body for one test combination.

    Args:
        qos_name: key into QOS_PROFILES ('fast' or 'robust').
        rate_name: key into SAMPLE_RATES ('16k', '24k' or '48k').
        channel_name: key into CHANNELS ('mono' or 'stereo').
        pd_ms: presentation delay in milliseconds.

    Returns:
        A dict ready to be POSTed as JSON to the device's init endpoint.
    """
    retrans_cfg = QOS_PROFILES[qos_name]
    rate_cfg = SAMPLE_RATES[rate_name]
    chan_cfg = CHANNELS[channel_name]
    # Nested sub-objects assembled first for readability.
    qos_config = {
        'iso_int_multiple_10ms': 1,
        'number_of_retransmissions': retrans_cfg['number_of_retransmissions'],
        'max_transport_latency_ms': retrans_cfg['max_transport_latency_ms'],
    }
    big = {
        'id': 12,
        'random_address': 'F1:F1:F2:F3:F4:F5',
        'language': 'deu',
        'name': 'Broadcast0',
        'program_info': 'Vorlesung DE',
        'audio_source': 'device:ch1',
        'input_format': 'auto',
        'loop': True,
        'precode_wav': False,
        'iso_que_len': 1,
        'num_bis': chan_cfg['num_bis'],
        'input_gain_db': 0,
    }
    return {
        'qos_config': qos_config,
        'debug': False,
        'device_name': 'Auracaster',
        'transport': '',
        'auracast_device_address': 'F0:F1:F2:F3:F4:F5',
        'auracast_sampling_rate_hz': rate_cfg['auracast_sampling_rate_hz'],
        'octets_per_frame': rate_cfg['octets_per_frame'],
        'frame_duration_us': 10000,
        'presentation_delay_us': pd_ms * 1000,
        'manufacturer_data': [None, None],
        'immediate_rendering': False,
        'assisted_listening_stream': False,
        'bigs': [big],
        'analog_gain': 50,
    }
STOP_URL = 'http://beacon29.local:5000/stop_audio'
def stop_device(timeout: int = 10) -> None:
    """Best-effort POST to stop_audio before reconfiguring; failures only warn."""
    try:
        requests.post(
            STOP_URL,
            timeout=timeout,
            headers={'accept': 'application/json'},
        )
    except Exception as exc:
        # Non-fatal by design: the device may already be stopped/unreachable.
        print(f" stop_audio warning: {exc}")
def configure_device(payload: dict, timeout: int = 15) -> tuple:
    """POST the init payload to the device API.

    Returns:
        (True, parsed-JSON-or-text body) on HTTP success,
        (False, error string) on any transport or HTTP error.
    """
    headers = {'accept': 'application/json',
               'Content-Type': 'application/json'}
    try:
        resp = requests.post(API_URL, json=payload, timeout=timeout,
                             headers=headers)
        resp.raise_for_status()
    except Exception as exc:
        return False, str(exc)
    # Prefer JSON, but tolerate non-JSON bodies.
    try:
        body = resp.json()
    except Exception:
        body = resp.text
    return True, body
def run_buildup_check(config: dict, duration_sec: int = 20, interval_sec: int = 1) -> dict:
    """
    Lightweight buildup check: take latency measurements over duration_sec seconds,
    return analysis dict with 'buildup_detected' bool and stats.
    """
    samples = []
    deadline = time.time() + duration_sec
    while time.time() < deadline:
        try:
            stats = run_latency_test(config, num_measurements=1, save_plots=False)
            samples.append(float(stats['avg']))
        except Exception as exc:
            print(f" buildup measurement error: {exc}")
        left = deadline - time.time()
        if left <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(interval_sec, left))
    if len(samples) < 2:
        # Not enough data to say anything about a trend.
        return {'buildup_detected': None, 'measurements': samples,
                'note': 'insufficient_data'}
    first, last = samples[0], samples[-1]
    delta_ms = last - first
    delta_pct = (delta_ms / first * 100.0) if first > 0 else 0.0
    # Linear-fit slope only makes sense with 3+ points.
    slope = 0.0
    if len(samples) >= 3:
        xs = np.arange(len(samples))
        ys = np.array(samples)
        slope = float(np.polyfit(xs, ys, 1)[0])
    if slope > 0.01:
        trend = 'increasing'
    elif slope < -0.01:
        trend = 'decreasing'
    else:
        trend = 'stable'
    return {
        # A >5% change in either direction counts as buildup.
        'buildup_detected': abs(delta_pct) > 5.0,
        'start_latency_ms': round(first, 3),
        'end_latency_ms': round(last, 3),
        'change_ms': round(delta_ms, 3),
        'change_percent': round(delta_pct, 2),
        'trend': trend,
        'measurements': [round(s, 3) for s in samples],
    }
def run_quality_check(config: dict, duration_sec: int = 180,
                      output_dir: Path = None) -> dict:
    """
    Run artifact detection for duration_sec seconds on a deep copy of config.

    Args:
        config: full test config; must contain an 'artifact_detection' section.
        duration_sec: measurement duration in seconds.
        output_dir: directory for plots; None disables plot saving.

    Returns:
        dict with artifacts_per_min, total_artifacts, duration_sec and
        artifacts_by_type on success, or {'error': ..., 'artifacts_per_min': None}
        on any failure (including a malformed config).
    """
    try:
        cfg = copy.deepcopy(config)
        # The overrides live inside the try: a config missing the
        # 'artifact_detection' section now yields the error dict instead of
        # crashing the whole matrix run.
        cfg['artifact_detection']['duration'] = float(duration_sec)
        cfg['artifact_detection']['startup_delay'] = 0
        result = run_artifact_detection_test(
            cfg,
            save_plots=output_dir is not None,
            output_dir=output_dir,
        )
        dut = result['channel_2_dut']
        return {
            'artifacts_per_min': round(float(dut['artifact_rate_per_minute']), 2),
            'total_artifacts': int(dut['total_artifacts']),
            'duration_sec': duration_sec,
            'artifacts_by_type': dut['artifacts_by_type'],
        }
    except Exception as e:
        return {'error': str(e), 'artifacts_per_min': None}
# ---------------------------------------------------------------------------
# USB recovery helper
# ---------------------------------------------------------------------------
def _try_usb_audio_reset(config: dict) -> None:
    """
    Try to recover the audio device after an ALSA xrun.

    Strategy:
      1. Stop any active sounddevice stream (sd.stop()) — no root needed.
      2. Attempt a USB-level reset via the USBDEVFS_RESET ioctl (equivalent
         to physically replugging the device). Requires either root or write
         access to the /dev/bus/usb node, e.g. membership in 'plugdev':
             sudo usermod -aG plugdev $USER   (then re-login)
      3. Always finish with a 3 s settle sleep.

    Errors are non-fatal: every failure path only prints a warning.
    """
    import fcntl
    import os
    import re
    import sounddevice as _sd
    # ioctl request number for USBDEVFS_RESET (see linux/usbdevice_fs.h).
    USBDEVFS_RESET = 0x5514
    # Stop any active sounddevice stream first
    try:
        _sd.stop()
    except Exception:
        pass
    # USB-level reset via ioctl (equivalent to replug)
    device_name = config['audio'].get('device_name', 'Scarlett')
    try:
        # Find the ALSA card number whose /proc/asound/cards line mentions
        # the configured device name (case-insensitive substring match).
        with open('/proc/asound/cards') as f:
            cards_text = f.read()
        card_num = None
        for line in cards_text.splitlines():
            if device_name.lower() in line.lower():
                m = re.match(r'\s*(\d+)', line)
                if m:
                    card_num = m.group(1)
                break
        if card_num is not None:
            # Resolve the sysfs card entry to the underlying USB device by
            # walking up the real path until a dir with idVendor appears.
            card_sysfs = f'/sys/class/sound/card{card_num}'
            real_path = Path(os.path.realpath(card_sysfs))
            usb_dev_path = None
            for parent in real_path.parents:
                if (parent / 'idVendor').exists():
                    usb_dev_path = parent
                    break
            if usb_dev_path is not None:
                # busnum/devnum identify the /dev/bus/usb/BBB/DDD char device.
                bus_num = int((usb_dev_path / 'busnum').read_text().strip())
                dev_num = int((usb_dev_path / 'devnum').read_text().strip())
                dev_file = f'/dev/bus/usb/{bus_num:03d}/{dev_num:03d}'
                with open(dev_file, 'wb') as f:
                    fcntl.ioctl(f, USBDEVFS_RESET, 0)
                print(f" Recovery: USB reset of {dev_file} OK")
    except PermissionError as e:
        print(f" Recovery: USB reset skipped (permission denied — "
              f"add yourself to plugdev: sudo usermod -aG plugdev $USER)")
    except Exception as e:
        print(f" Recovery: USB reset skipped ({e})")
    # Give the device time to re-enumerate before the caller retries.
    time.sleep(3)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """Drive the full matrix test.

    For every QoS/rate/channel/presentation-delay combination: stop and
    reconfigure the device via its HTTP API, measure latency (optionally
    buildup and artifact quality), retry failed combinations, then write a
    YAML report and a rendered results table into a dated output directory.
    """
    parser = argparse.ArgumentParser(
        description='Run matrix test across all QoS/rate/channel/delay combinations')
    parser.add_argument('--serial-number', required=True,
                        help='Serial number (e.g. SN001234)')
    parser.add_argument('--software-version', required=True,
                        help='Software version / git commit hash')
    parser.add_argument('--comment', default='',
                        help='Free-text comment for this test run')
    parser.add_argument('--config', default='config.yaml',
                        help='Path to config file')
    parser.add_argument('--measurements', type=int, default=5,
                        help='Latency measurements per combination (default: 5)')
    # Fix: help text previously claimed "default: 15" while the actual
    # default is 5; the help now matches the behavior.
    parser.add_argument('--settle-time', type=int, default=5,
                        help='Seconds to wait after API call before measuring (default: 5)')
    parser.add_argument('--buildup', action='store_true',
                        help='Run 20 s buildup test per combination')
    parser.add_argument('--quality', action='store_true',
                        help='Run 3 min quality/artifact test per combination')
    parser.add_argument('--quality-duration', type=int, default=180,
                        help='Quality test duration in seconds (default: 180)')
    parser.add_argument('--dry-run', action='store_true',
                        help='Skip API calls and audio measurements (for testing the script)')
    args = parser.parse_args()
    with open(args.config, 'r') as f:
        config = yaml.safe_load(f)
    timestamp = datetime.now()
    test_id = timestamp.strftime('%Y%m%d_%H%M%S')
    results_dir = Path(config['output']['results_dir'])
    # Results are filed under results_dir/YYYY/MM/DD/<test_id>_matrix.
    test_output_dir = (results_dir
                       / timestamp.strftime('%Y')
                       / timestamp.strftime('%m')
                       / timestamp.strftime('%d')
                       / f"{test_id}_matrix")
    test_output_dir.mkdir(parents=True, exist_ok=True)
    # All combinations in the specified order
    combos = [
        (qos, rate, ch, pd)
        for qos in ['fast', 'robust']
        for rate in ['16k', '24k', '48k']
        for ch in ['mono', 'stereo']
        for pd in PRESENTATION_DELAYS_MS
    ]
    total = len(combos)
    print("=" * 70)
    print("MATRIX TEST")
    print("=" * 70)
    print(f"Test ID: {test_id}")
    print(f"Serial Number: {args.serial_number}")
    print(f"Software: {args.software_version}")
    if args.comment:
        print(f"Comment: {args.comment}")
    print(f"Combinations: {total}")
    print(f"Measurements/combo: {args.measurements}")
    print(f"Settle time: {args.settle_time} s")
    print(f"Buildup test: {'yes (20 s)' if args.buildup else 'no'}")
    print(f"Quality test: {'yes (' + str(args.quality_duration) + ' s)' if args.quality else 'no'}")
    if args.dry_run:
        print("DRY RUN MODE - no API calls or audio measurements")
    print("=" * 70)
    def run_combo(qos, rate, ch, pd):
        """Run a single combination and return its result dict."""
        payload = build_api_payload(qos, rate, ch, pd)
        result = {
            'qos': qos,
            'sample_rate': rate,
            'channels': ch,
            'presentation_delay_ms': pd,
            'api_payload': payload,
            'api_success': None,
            'latency': None,
            'buildup': None,
            'quality': None,
        }
        # --- Device configuration ---
        if not args.dry_run:
            stop_device()
            ok, api_resp = configure_device(payload)
            result['api_success'] = ok
            result['api_response'] = api_resp if not ok else str(api_resp)
            if not ok:
                print(f" API FAILED: {api_resp}")
                result['latency'] = {'error': f'API failed: {api_resp}', 'valid': False,
                                     'avg': None}
                return result
            print(f" API OK -> settling {args.settle_time} s...")
            time.sleep(args.settle_time)
        else:
            result['api_success'] = True
        # --- Latency measurement ---
        if not args.dry_run:
            try:
                lat = run_latency_test(config, num_measurements=args.measurements,
                                       save_plots=False)
                result['latency'] = {
                    'avg': round(float(lat['avg']), 3),
                    'min': round(float(lat['min']), 3),
                    'max': round(float(lat['max']), 3),
                    'std': round(float(lat['std']), 3),
                    'valid': bool(lat.get('valid', True)),
                }
                status = "PASS" if result['latency']['valid'] else "FAIL"
                print(f" Latency [{status}]: avg={lat['avg']:.1f} ms "
                      f"std={lat['std']:.2f} ms")
            except Exception as e:
                result['latency'] = {'error': str(e), 'valid': False, 'avg': None}
                print(f" Latency ERROR: {e}")
            # Invalid latency is treated as a hardware (ALSA) failure: mark it
            # for the retry logic, attempt USB recovery, and skip the
            # now-pointless buildup/quality stages.
            if not result['latency'].get('valid', False):
                print(" Latency invalid — attempting USB recovery, skipping buildup/quality.")
                result['latency']['alsa_error'] = True
                _try_usb_audio_reset(config)
                return result
        else:
            import random
            # Synthetic latency around the presentation delay for dry runs.
            avg = pd + random.uniform(-1, 1)
            result['latency'] = {'avg': round(avg, 3), 'min': round(avg - 0.5, 3),
                                 'max': round(avg + 0.5, 3), 'std': 0.2, 'valid': True}
        # --- Optional buildup check ---
        if args.buildup:
            if not args.dry_run:
                print(" Buildup check (20 s)...")
                buildup = run_buildup_check(config, duration_sec=20, interval_sec=1)
                result['buildup'] = buildup
                bd = buildup.get('buildup_detected')
                print(f" Buildup: {'YES ⚠' if bd else ('NO' if bd is False else 'N/A')}")
            else:
                result['buildup'] = {'buildup_detected': False, 'note': 'dry_run'}
        # --- Optional quality/artifact check ---
        if args.quality:
            if not args.dry_run:
                print(f" Quality test ({args.quality_duration} s)...")
                combo_plot_dir = test_output_dir / f"{qos}_{rate}_{ch}_{pd}ms"
                combo_plot_dir.mkdir(parents=True, exist_ok=True)
                quality = run_quality_check(config, duration_sec=args.quality_duration,
                                            output_dir=combo_plot_dir)
                result['quality'] = quality
                apm = quality.get('artifacts_per_min')
                print(f" Quality: {f'{apm:.1f} artifacts/min' if apm is not None else 'ERROR'}")
            else:
                result['quality'] = {'artifacts_per_min': 0.5, 'total_artifacts': 1,
                                     'note': 'dry_run'}
        return result
    # --- First full pass over all combinations ---
    matrix_results = {}
    for idx, (qos, rate, ch, pd) in enumerate(combos, 1):
        key = f"{qos}_{rate}_{ch}_{pd}ms"
        print(f"\n[{idx:2d}/{total}] {qos:6s} {rate:3s} {ch:6s} PD={pd:2d}ms")
        matrix_results[key] = run_combo(qos, rate, ch, pd)
    # --- Retry failed combinations ---
    # ALSA/hardware failures always retry (up to 3 times) regardless of threshold.
    # Other failures retry only if the failure rate is <= 10%.
    def _is_failed(r):
        lat = r.get('latency')
        return lat is None or lat.get('valid') is False
    def _is_alsa_failure(r):
        lat = r.get('latency') or {}
        return lat.get('alsa_error', False)
    MAX_RETRIES = 3
    for retry_round in range(1, MAX_RETRIES + 1):
        failed_keys = [k for k, r in matrix_results.items() if _is_failed(r)]
        if not failed_keys:
            break
        alsa_keys = [k for k in failed_keys if _is_alsa_failure(matrix_results[k])]
        other_keys = [k for k in failed_keys if k not in alsa_keys]
        retry_threshold = total * 0.10
        keys_to_retry = list(alsa_keys)
        if 0 < len(other_keys) <= retry_threshold:
            keys_to_retry += other_keys
        elif len(other_keys) > retry_threshold:
            print(f"\n{len(other_keys)}/{total} non-hardware failures "
                  f"({len(other_keys)/total*100:.0f}%) — above 10% threshold, skipping retry.")
        if not keys_to_retry:
            break
        n_other_retrying = len(keys_to_retry) - len(alsa_keys)
        print(f"\n{'=' * 70}")
        # Fix: the two banner fragments previously concatenated with no
        # separator, printing e.g. "RETRY ROUND 1/32 combo(s)".
        print(f"RETRY ROUND {retry_round}/{MAX_RETRIES}: "
              f"{len(keys_to_retry)} combo(s) "
              f"[{len(alsa_keys)} hw-error, {n_other_retrying} other]")
        print(f"{'=' * 70}")
        for retry_idx, key in enumerate(keys_to_retry, 1):
            r = matrix_results[key]
            qos, rate, ch, pd = r['qos'], r['sample_rate'], r['channels'], r['presentation_delay_ms']
            print(f"\n[retry {retry_round}.{retry_idx}/{len(keys_to_retry)}] {qos:6s} {rate:3s} {ch:6s} PD={pd:2d}ms")
            matrix_results[key] = run_combo(qos, rate, ch, pd)
            matrix_results[key]['retried'] = True
    # --- Save results ---
    output_data = {
        'metadata': {
            'test_id': test_id,
            'timestamp': timestamp.isoformat(),
            'serial_number': args.serial_number,
            'software_version': args.software_version,
            'comment': args.comment,
            'options': {
                'measurements_per_combo': args.measurements,
                'settle_time_sec': args.settle_time,
                'buildup_enabled': args.buildup,
                'quality_enabled': args.quality,
                'quality_duration_sec': args.quality_duration if args.quality else None,
            },
        },
        'matrix_results': matrix_results,
    }
    output_file = test_output_dir / f"{test_id}_matrix_results.yaml"
    with open(output_file, 'w') as f:
        yaml.dump(output_data, f, default_flow_style=False, sort_keys=False)
    # --- Auto-generate table image (best-effort; failures only warn) ---
    try:
        from plot_matrix import build_table
        import matplotlib.pyplot as plt
        show_buildup = any(r.get('buildup') is not None for r in matrix_results.values())
        show_quality = any(r.get('quality') is not None for r in matrix_results.values())
        fig = build_table(
            matrix_results=matrix_results,
            baseline_results=None,
            metadata=output_data['metadata'],
            baseline_metadata=None,
            show_buildup=show_buildup,
            show_quality=show_quality,
        )
        plot_file = test_output_dir / f"{test_id}_matrix_results_table.png"
        fig.savefig(plot_file, dpi=150, bbox_inches='tight',
                    facecolor='white', edgecolor='none')
        plt.close(fig)
        plot_file_path = plot_file
        print(f"Table image saved to: {plot_file}")
    except Exception as e:
        plot_file_path = None
        print(f"Warning: could not auto-generate table image: {e}")
    # --- Summary ---
    passed = sum(1 for r in matrix_results.values()
                 if r.get('latency') and r['latency'].get('valid', False))
    failed = total - passed
    print("\n" + "=" * 70)
    print(f"MATRIX TEST COMPLETE | PASS: {passed} FAIL: {failed} Total: {total}")
    print(f"Results: {output_file}")
    if plot_file_path:
        print(f"Table: {plot_file_path.resolve()}")
    print(f"Re-plot: python plot_matrix.py {output_file}")
    print("=" * 70)
# Script entry point: run the full matrix when executed directly.
if __name__ == '__main__':
    main()