diff --git a/ai_stuff/prompts.md b/ai_stuff/prompts.md index 8cfbd0c..3d65848 100644 --- a/ai_stuff/prompts.md +++ b/ai_stuff/prompts.md @@ -52,3 +52,72 @@ Again input it into the audio interface and measure both loopback and radio path + +============ + +Implement Matrix test + + +Test: + + Fast / Robust + + 16k / 24k / 48k + + Mono / Stereo + + Presentation Delay 10 / 20 / 40 / 80 + + +For each combination test: + + Latency + + Latency buildup yes/no + + Maybe: Audio quality BUT this way test gets really long. + + + +Plot a table with the results, also compare to 'baseline' measurement. + +Use the existing tests as a guideline how to save the results. + +For setting the parameters for the tests use the API: +http://beacon29.local:5000/init +curl -X 'POST' \ 'http://beacon29.local:5000/init' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ "qos_config": { "iso_int_multiple_10ms": 1, "number_of_retransmissions": 2, "max_transport_latency_ms": 23 }, "debug": false, "device_name": "Auracaster", "transport": "", "auracast_device_address": "F0:F1:F2:F3:F4:F5", "auracast_sampling_rate_hz": 16000, "octets_per_frame": 160, "frame_duration_us": 10000, "presentation_delay_us": 10000, "manufacturer_data": [ null, null ], "immediate_rendering": false, "assisted_listening_stream": false, "bigs": [ { "id": 12, "random_address": "F1:F1:F2:F3:F4:F5", "language": "deu", "name": "Broadcast0", "program_info": "Vorlesung DE", "audio_source": "device:ch1", "input_format": "auto", "loop": true, "precode_wav": false, "iso_que_len": 1, "num_bis": 1, "input_gain_db": 0 } ], "analog_gain": 50 }' + +It has to have the name Broadcast0. +qos fast is "number_of_retransmissions": 2, "max_transport_latency_ms": 23 +qos robust is "number_of_retransmissions": 4, "max_transport_latency_ms": 43 +Mono is "num_bis": 1 +Stereo is "num_bis": 2 +16k is "auracast_sampling_rate_hz": 16000, "octets_per_frame": 40 +24k is "auracast_sampling_rate_hz": 24000, "octets_per_frame": 60 +48k is "auracast_sampling_rate_hz": 48000, "octets_per_frame": 120 + +The results shall be plotted as a table: +Presentation delay 10 / 20 / 40 /80 +Mono Stereo Mono Stereo Mono Stereo ... +x +Fast 16k +Fast 24k +Fast 48k +Robust 16k +Robust 24k +Robust 48k + +For each combination you have to run the latency test. If the test fails print fail. Else print the ms value. +Optional: Also run the build up test for 20 secs. As a result just print if there is a buildup or not. +Optional: Also run the quality test for 3 min per combination and display the err/min. + +The result shall be saved as a yaml (like in all the other scripts). +Important to save the API call aswell. + +And create an image with the table. + +There should be a feature to compare this measurement to a 'baseline' measurement. +Failed tests should be colored red. +Tests significantly worse than the baseline in orange. +And better values in green. +No change should be just white. \ No newline at end of file diff --git a/config.yaml b/config.yaml index 8e3be9c..e550ced 100644 --- a/config.yaml +++ b/config.yaml @@ -40,7 +40,7 @@ artifact_detection: threshold_db: 6.0 # Energy change threshold in dB between consecutive windows (detects level changes) latency: - max_std_dev_ms: 0.5 # Maximum allowed std deviation; test fails if exceeded + max_std_dev_ms: 1.0 # Maximum allowed std deviation; test fails if exceeded min_avg_ms: 1.0 # Minimum expected average latency; near-zero indicates bad loopback latency_buildup: diff --git a/plot_matrix.py b/plot_matrix.py new file mode 100644 index 0000000..7642f43 --- /dev/null +++ b/plot_matrix.py @@ -0,0 +1,437 @@ +#!/usr/bin/env python3 +""" +Plot a results table image from a matrix test YAML file. + +Usage: + python plot_matrix.py + python plot_matrix.py --baseline + python plot_matrix.py --baseline --output table.png +""" +import argparse +import sys +from typing import Optional +import yaml +import numpy as np +import matplotlib +import matplotlib.pyplot as plt +import matplotlib.patches as mpatches +from pathlib import Path +from datetime import datetime + +# --------------------------------------------------------------------------- +# Matrix layout constants +# --------------------------------------------------------------------------- + +QOS_RATES = [ + ('fast', '16k'), + ('fast', '24k'), + ('fast', '48k'), + ('robust', '16k'), + ('robust', '24k'), + ('robust', '48k'), +] + +CHANNELS = ['mono', 'stereo'] +PRESENTATION_DELAYS_MS = [10, 20, 40, 80] + + +# --------------------------------------------------------------------------- +# Colour helpers +# --------------------------------------------------------------------------- + +COLOR_FAIL = '#FF4444' # red +COLOR_WORSE = '#FFA500' # orange +COLOR_BETTER = '#66BB6A' # green +COLOR_NEUTRAL = '#FFFFFF' # white +COLOR_MISSING = '#DDDDDD' # light grey – not run / no data +COLOR_HEADER = '#263238' # dark blue-grey header +COLOR_SUBHDR = '#455A64' # secondary header +COLOR_ROW_EVEN = '#FAFAFA' +COLOR_ROW_ODD = '#F0F4F8' +COLOR_HEADER_TEXT = '#FFFFFF' + + +def _latency_ok(lat: Optional[dict]) -> bool: + if lat is None: + return False + if lat.get('error'): + return False + if lat.get('valid') is False: + return False + return lat.get('avg') is not None + + +def _cell_color(result: dict, baseline_result: Optional[dict], + worse_threshold_pct: float = 10.0, + better_threshold_pct: float = 5.0) -> str: + """Return a hex colour for the cell.""" + lat = result.get('latency') + + if not _latency_ok(lat): + return COLOR_FAIL + + if baseline_result is None: + return COLOR_NEUTRAL + + base_lat = baseline_result.get('latency') + if not _latency_ok(base_lat): + return COLOR_NEUTRAL + + current_avg = lat['avg'] + base_avg = base_lat['avg'] + + if base_avg == 0: + return COLOR_NEUTRAL + + diff_pct = (current_avg - base_avg) / base_avg * 100.0 + + if diff_pct > worse_threshold_pct: + return COLOR_WORSE + if diff_pct < -better_threshold_pct: + return COLOR_BETTER + return COLOR_NEUTRAL + + +def _cell_text(result: dict, show_buildup: bool, show_quality: bool) -> list: + """Return list of text lines for a cell.""" + lat = result.get('latency') + lines = [] + + if not _latency_ok(lat): + err = lat.get('error', 'FAIL') if lat else 'NO DATA' + short = err[:20] if len(err) > 20 else err + lines.append('FAIL') + if short and short != 'FAIL': + lines.append(short) + return lines + + lines.append(f"{lat['avg']:.1f} ms") + + if show_buildup: + bd = result.get('buildup') + if bd is not None: + detected = bd.get('buildup_detected') + if detected is True: + lines.append('buildup: YES') + elif detected is False: + lines.append('buildup: no') + else: + lines.append('buildup: n/a') + + if show_quality: + q = result.get('quality') + if q is not None: + apm = q.get('artifacts_per_min') + if apm is not None: + lines.append(f"{apm:.1f} art/min") + else: + lines.append('quality: err') + + return lines + + +# --------------------------------------------------------------------------- +# Core table builder +# --------------------------------------------------------------------------- + +def build_table( + matrix_results: dict, + baseline_results: Optional[dict], + metadata: dict, + baseline_metadata: Optional[dict], + show_buildup: bool, + show_quality: bool, + worse_threshold_pct: float = 10.0, + better_threshold_pct: float = 5.0, +) -> plt.Figure: + """ + Build and return a matplotlib Figure containing the results table. + """ + n_rows = len(QOS_RATES) # 6 + n_pd = len(PRESENTATION_DELAYS_MS) # 4 + n_ch = len(CHANNELS) # 2 + n_cols = n_pd * n_ch # 8 + + # Determine cell height based on content rows per cell + lines_per_cell = 1 + if show_buildup: + lines_per_cell += 1 + if show_quality: + lines_per_cell += 1 + + cell_h = 0.5 + 0.22 * lines_per_cell # inches + cell_w = 1.45 # inches + row_label_w = 1.4 # inches for row labels + + hdr_h = 0.55 # top presentation-delay header row + sub_h = 0.38 # mono/stereo sub-header row + + total_w = row_label_w + n_cols * cell_w + 0.3 + total_h = hdr_h + sub_h + n_rows * cell_h + 1.6 # extra for title & legend + + fig, ax = plt.subplots(figsize=(total_w, total_h)) + ax.set_xlim(0, total_w) + ax.set_ylim(0, total_h) + ax.axis('off') + + # coordinate helpers (y grows upward in matplotlib, so we flip) + def x_col(col_idx: int) -> float: + return row_label_w + col_idx * cell_w + + def y_row(row_idx: int) -> float: + # row 0 = topmost data row + return total_h - 1.4 - hdr_h - sub_h - (row_idx + 1) * cell_h + + def add_rect(x, y, w, h, facecolor, edgecolor='#90A4AE', lw=0.6, zorder=1): + rect = mpatches.FancyBboxPatch( + (x, y), w, h, + boxstyle='square,pad=0', + facecolor=facecolor, edgecolor=edgecolor, linewidth=lw, zorder=zorder) + ax.add_patch(rect) + + def add_text(x, y, text, fontsize=8, color='black', ha='center', va='center', + bold=False, wrap_lines=None): + weight = 'bold' if bold else 'normal' + if wrap_lines: + for i, line in enumerate(wrap_lines): + offset = (len(wrap_lines) - 1) / 2.0 - i + ax.text(x, y + offset * (fontsize * 0.014), + line, fontsize=fontsize, color=color, + ha=ha, va='center', fontweight=weight, + clip_on=True) + else: + ax.text(x, y, text, fontsize=fontsize, color=color, + ha=ha, va='center', fontweight=weight, clip_on=True) + + # ----------------------------------------------------------------------- + # Title + # ----------------------------------------------------------------------- + ts = metadata.get('timestamp', '') + try: + ts_fmt = datetime.fromisoformat(ts).strftime('%Y-%m-%d %H:%M') + except Exception: + ts_fmt = ts + + title_lines = [ + f"Matrix Test Results — {metadata.get('test_id', '')}", + f"SN: {metadata.get('serial_number', 'n/a')} SW: {metadata.get('software_version', 'n/a')} {ts_fmt}", + ] + if metadata.get('comment'): + title_lines.append(f"Comment: {metadata['comment']}") + if baseline_metadata: + title_lines.append( + f"Baseline: {baseline_metadata.get('test_id', 'n/a')} " + f"({baseline_metadata.get('timestamp', '')[:10]})" + ) + + title_y = total_h - 0.25 + for i, line in enumerate(title_lines): + ax.text(total_w / 2, title_y - i * 0.28, line, + fontsize=9 if i == 0 else 7.5, + fontweight='bold' if i == 0 else 'normal', + ha='center', va='top', color='#1A237E') + + # ----------------------------------------------------------------------- + # Row label column header (top-left corner block) + # ----------------------------------------------------------------------- + hdr_top = total_h - 1.4 + # Spans presentation-delay header + mono/stereo sub-header + add_rect(0, hdr_top - hdr_h - sub_h, row_label_w, hdr_h + sub_h, + facecolor=COLOR_HEADER) + add_text(row_label_w / 2, hdr_top - (hdr_h + sub_h) / 2, + 'QoS / Rate', fontsize=8, color=COLOR_HEADER_TEXT, bold=True) + + # ----------------------------------------------------------------------- + # Presentation-delay group headers + # ----------------------------------------------------------------------- + for pd_idx, pd_ms in enumerate(PRESENTATION_DELAYS_MS): + col_start = pd_idx * n_ch + x = x_col(col_start) + w = cell_w * n_ch + add_rect(x, hdr_top - hdr_h, w, hdr_h, facecolor=COLOR_HEADER) + add_text(x + w / 2, hdr_top - hdr_h / 2, + f'PD {pd_ms} ms', fontsize=8.5, color=COLOR_HEADER_TEXT, bold=True) + + # ----------------------------------------------------------------------- + # Mono / Stereo sub-headers + # ----------------------------------------------------------------------- + sub_top = hdr_top - hdr_h + for col in range(n_cols): + ch = CHANNELS[col % n_ch] + x = x_col(col) + add_rect(x, sub_top - sub_h, cell_w, sub_h, facecolor=COLOR_SUBHDR) + add_text(x + cell_w / 2, sub_top - sub_h / 2, + ch.capitalize(), fontsize=7.5, color=COLOR_HEADER_TEXT, bold=True) + + # ----------------------------------------------------------------------- + # Data rows + # ----------------------------------------------------------------------- + for row_idx, (qos, rate) in enumerate(QOS_RATES): + row_bg = COLOR_ROW_EVEN if row_idx % 2 == 0 else COLOR_ROW_ODD + + # Row label + y = y_row(row_idx) + add_rect(0, y, row_label_w, cell_h, facecolor=COLOR_SUBHDR if row_idx < 3 else '#37474F') + label = f"{'Fast' if qos == 'fast' else 'Robust'} {rate}" + add_text(row_label_w / 2, y + cell_h / 2, + label, fontsize=8, color=COLOR_HEADER_TEXT, bold=True) + + for col_idx, (pd_ms, ch) in enumerate( + [(pd, ch) + for pd in PRESENTATION_DELAYS_MS + for ch in CHANNELS]): + key = f"{qos}_{rate}_{ch}_{pd_ms}ms" + result = matrix_results.get(key) + baseline_result = baseline_results.get(key) if baseline_results else None + + x = x_col(col_idx) + + if result is None: + add_rect(x, y, cell_w, cell_h, facecolor=COLOR_MISSING) + add_text(x + cell_w / 2, y + cell_h / 2, '—', fontsize=8) + continue + + color = _cell_color(result, baseline_result, + worse_threshold_pct, better_threshold_pct) + add_rect(x, y, cell_w, cell_h, facecolor=color) + + lines = _cell_text(result, show_buildup, show_quality) + # font size depends on how many lines + fs = 8.5 if len(lines) == 1 else 7.5 + is_fail = color == COLOR_FAIL + txt_color = '#FFFFFF' if is_fail else '#1A1A2E' + + # centre vertically + n = len(lines) + line_gap = cell_h / (n + 1) + for li, line in enumerate(lines): + line_y = y + cell_h - line_gap * (li + 1) + bold_line = li == 0 # first line (latency) is bold + ax.text(x + cell_w / 2, line_y, line, + fontsize=fs if li == 0 else fs - 0.5, + color=txt_color, + ha='center', va='center', + fontweight='bold' if bold_line else 'normal', + clip_on=True) + + # ----------------------------------------------------------------------- + # Outer border for the full table + # ----------------------------------------------------------------------- + table_x = 0 + table_y = y_row(n_rows - 1) + table_w = row_label_w + n_cols * cell_w + table_h_total = hdr_top - table_y + rect = mpatches.Rectangle((table_x, table_y), table_w, table_h_total, + fill=False, edgecolor='#37474F', linewidth=1.5) + ax.add_patch(rect) + + # ----------------------------------------------------------------------- + # Legend + # ----------------------------------------------------------------------- + legend_y = y_row(n_rows - 1) - 0.55 + legend_items = [ + (COLOR_FAIL, 'FAIL / error'), + (COLOR_WORSE, f'>{worse_threshold_pct:.0f}% worse than baseline'), + (COLOR_NEUTRAL, 'Within threshold'), + (COLOR_BETTER, f'>{better_threshold_pct:.0f}% better than baseline'), + (COLOR_MISSING, 'Not measured'), + ] + lx = 0.2 + for color, label in legend_items: + add_rect(lx, legend_y - 0.18, 0.28, 0.25, facecolor=color, + edgecolor='#90A4AE', lw=0.8) + ax.text(lx + 0.35, legend_y - 0.055, label, fontsize=7, va='center') + lx += 2.2 + + plt.tight_layout(pad=0.1) + return fig + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + +def load_matrix_results(path: Path) -> tuple: + """Load a matrix results YAML and return (matrix_results, metadata).""" + with open(path, 'r') as f: + data = yaml.safe_load(f) + return data.get('matrix_results', {}), data.get('metadata', {}) + + +def main(): + parser = argparse.ArgumentParser( + description='Plot matrix test results as a table image') + parser.add_argument('results', help='Path to matrix results YAML file') + parser.add_argument('--baseline', default=None, + help='Path to baseline matrix results YAML for comparison') + parser.add_argument('--output', default=None, + help='Output image path (default: _table.png)') + parser.add_argument('--worse-threshold', type=float, default=10.0, + help='Percent worse than baseline to colour orange (default: 10)') + parser.add_argument('--better-threshold', type=float, default=5.0, + help='Percent better than baseline to colour green (default: 5)') + parser.add_argument('--dpi', type=int, default=150, + help='Output image DPI (default: 150)') + args = parser.parse_args() + + results_path = Path(args.results) + if not results_path.exists(): + print(f"ERROR: Results file not found: {results_path}", file=sys.stderr) + sys.exit(1) + + matrix_results, metadata = load_matrix_results(results_path) + + baseline_results = None + baseline_metadata = None + if args.baseline: + baseline_path = Path(args.baseline) + if not baseline_path.exists(): + print(f"ERROR: Baseline file not found: {baseline_path}", file=sys.stderr) + sys.exit(1) + baseline_results, baseline_metadata = load_matrix_results(baseline_path) + print(f"Comparing against baseline: {baseline_path.name}") + + # Detect which optional columns are present + show_buildup = any( + r.get('buildup') is not None + for r in matrix_results.values() + ) + show_quality = any( + r.get('quality') is not None + for r in matrix_results.values() + ) + + print(f"Results: {len(matrix_results)} combinations") + print(f"Show buildup column: {show_buildup}") + print(f"Show quality column: {show_quality}") + + fig = build_table( + matrix_results=matrix_results, + baseline_results=baseline_results, + metadata=metadata, + baseline_metadata=baseline_metadata, + show_buildup=show_buildup, + show_quality=show_quality, + worse_threshold_pct=args.worse_threshold, + better_threshold_pct=args.better_threshold, + ) + + # Always save next to the results YAML + folder_copy = results_path.parent / f"{results_path.stem}_table.png" + fig.savefig(folder_copy, dpi=args.dpi, bbox_inches='tight', + facecolor='white', edgecolor='none') + print(f"Table saved to: {folder_copy}") + + # If a custom --output path was given (and differs), save there too + if args.output: + output_path = Path(args.output) + if output_path.resolve() != folder_copy.resolve(): + fig.savefig(output_path, dpi=args.dpi, bbox_inches='tight', + facecolor='white', edgecolor='none') + print(f"Table also saved to: {output_path}") + + plt.close(fig) + + +if __name__ == '__main__': + main() diff --git a/requirements.txt b/requirements.txt index 7ab305e..7fd8c82 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ scipy>=1.10.0 sounddevice>=0.4.6 PyYAML>=6.0 matplotlib>=3.7.0 +requests>=2.28.0 diff --git a/src/audio_tests.py b/src/audio_tests.py index 511871a..a4ec4d8 100644 --- a/src/audio_tests.py +++ b/src/audio_tests.py @@ -1,3 +1,4 @@ +import time import numpy as np import sounddevice as sd from scipy import signal @@ -8,26 +9,25 @@ from pathlib import Path def find_audio_device(device_name: str = "Scarlett") -> tuple: devices = sd.query_devices() - for idx, device in enumerate(devices): if device_name.lower() in device['name'].lower(): if device['max_input_channels'] >= 2 and device['max_output_channels'] >= 2: return (idx, idx) - + default_device = sd.default.device if hasattr(default_device, '__getitem__'): input_dev = int(default_device[0]) if default_device[0] is not None else 0 output_dev = int(default_device[1]) if default_device[1] is not None else 0 else: input_dev = output_dev = int(default_device) if default_device is not None else 0 - + input_info = devices[input_dev] output_info = devices[output_dev] - + if input_info['max_input_channels'] >= 2 and output_info['max_output_channels'] >= 2: print(f"Using default device - Input: {input_info['name']}, Output: {output_info['name']}") return (input_dev, output_dev) - + raise RuntimeError(f"No suitable audio device found with 2+ input/output channels") @@ -45,11 +45,18 @@ def generate_chirp(duration: float, sample_rate: int, f0: float = 100, f1: float def play_and_record(tone: np.ndarray, sample_rate: int, device_id: tuple, channels: int = 2) -> np.ndarray: output_signal = np.column_stack([tone, tone]) - input_dev, output_dev = device_id - recording = sd.playrec(output_signal, samplerate=sample_rate, - channels=channels, device=(input_dev, output_dev), blocking=True) - + + sd.stop() + recording = sd.playrec(output_signal, samplerate=sample_rate, + channels=channels, device=(input_dev, output_dev), + latency='high', blocking=True) + sd.stop() + + if not np.isfinite(recording).all(): + raise RuntimeError("Recording contains NaN/Inf — ALSA stream corrupted. " + "Try replugging the audio interface.") + return recording @@ -213,9 +220,15 @@ def run_latency_test(config: Dict, num_measurements: int = 5, save_plots: bool = channels = config['audio']['channels'] device_ids = find_audio_device(device_name) - + chirp_signal = generate_chirp(duration, sample_rate, amplitude=amplitude) - + + # Discard one warm-up recording to flush stale ALSA ring buffer data + try: + play_and_record(chirp_signal, sample_rate, device_ids, channels) + except Exception: + pass + latencies = [] last_recording = None last_correlation = None diff --git a/test_matrix.py b/test_matrix.py new file mode 100644 index 0000000..f2acaab --- /dev/null +++ b/test_matrix.py @@ -0,0 +1,505 @@ +#!/usr/bin/env python3 +import argparse +import copy +import sys +import time +import yaml +import requests +import numpy as np +from datetime import datetime +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent)) +from src.audio_tests import run_latency_test, run_artifact_detection_test + + +# --------------------------------------------------------------------------- +# Parameter definitions +# --------------------------------------------------------------------------- + +QOS_PROFILES = { + 'fast': {'number_of_retransmissions': 2, 'max_transport_latency_ms': 22}, + 'robust': {'number_of_retransmissions': 4, 'max_transport_latency_ms': 43}, +} + +SAMPLE_RATES = { + '16k': {'auracast_sampling_rate_hz': 16000, 'octets_per_frame': 40}, + '24k': {'auracast_sampling_rate_hz': 24000, 'octets_per_frame': 60}, + '48k': {'auracast_sampling_rate_hz': 48000, 'octets_per_frame': 120}, +} + +CHANNELS = { + 'mono': {'num_bis': 1}, + 'stereo': {'num_bis': 2}, +} + +# PRESENTATION_DELAYS_MS = [10, 20, 40, 80] +PRESENTATION_DELAYS_MS = [10] + +API_URL = 'http://beacon29.local:5000/init' + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def build_api_payload(qos_name: str, rate_name: str, channel_name: str, pd_ms: int) -> dict: + qos = QOS_PROFILES[qos_name] + rate = SAMPLE_RATES[rate_name] + ch = CHANNELS[channel_name] + return { + 'qos_config': { + 'iso_int_multiple_10ms': 1, + 'number_of_retransmissions': qos['number_of_retransmissions'], + 'max_transport_latency_ms': qos['max_transport_latency_ms'], + }, + 'debug': False, + 'device_name': 'Auracaster', + 'transport': '', + 'auracast_device_address': 'F0:F1:F2:F3:F4:F5', + 'auracast_sampling_rate_hz': rate['auracast_sampling_rate_hz'], + 'octets_per_frame': rate['octets_per_frame'], + 'frame_duration_us': 10000, + 'presentation_delay_us': pd_ms * 1000, + 'manufacturer_data': [None, None], + 'immediate_rendering': False, + 'assisted_listening_stream': False, + 'bigs': [{ + 'id': 12, + 'random_address': 'F1:F1:F2:F3:F4:F5', + 'language': 'deu', + 'name': 'Broadcast0', + 'program_info': 'Vorlesung DE', + 'audio_source': 'device:ch1', + 'input_format': 'auto', + 'loop': True, + 'precode_wav': False, + 'iso_que_len': 1, + 'num_bis': ch['num_bis'], + 'input_gain_db': 0, + }], + 'analog_gain': 50, + } + + +STOP_URL = 'http://beacon29.local:5000/stop_audio' + + +def stop_device(timeout: int = 10) -> None: + """POST to stop_audio before reconfiguring. Errors are non-fatal.""" + try: + requests.post(STOP_URL, timeout=timeout, + headers={'accept': 'application/json'}) + except Exception as e: + print(f" stop_audio warning: {e}") + + +def configure_device(payload: dict, timeout: int = 15) -> tuple: + """POST the init payload to the device API. Returns (success, response_or_error).""" + try: + resp = requests.post(API_URL, json=payload, timeout=timeout, + headers={'accept': 'application/json', + 'Content-Type': 'application/json'}) + resp.raise_for_status() + try: + return True, resp.json() + except Exception: + return True, resp.text + except Exception as e: + return False, str(e) + + +def run_buildup_check(config: dict, duration_sec: int = 20, interval_sec: int = 1) -> dict: + """ + Lightweight buildup check: take latency measurements over duration_sec seconds, + return analysis dict with 'buildup_detected' bool and stats. + """ + measurements = [] + t_end = time.time() + duration_sec + + while time.time() < t_end: + try: + stats = run_latency_test(config, num_measurements=1, save_plots=False) + measurements.append(float(stats['avg'])) + except Exception as e: + print(f" buildup measurement error: {e}") + remaining = t_end - time.time() + if remaining <= 0: + break + time.sleep(min(interval_sec, remaining)) + + if len(measurements) < 2: + return {'buildup_detected': None, 'measurements': measurements, + 'note': 'insufficient_data'} + + start_l = measurements[0] + end_l = measurements[-1] + change_ms = end_l - start_l + change_pct = (change_ms / start_l * 100.0) if start_l > 0 else 0.0 + buildup_detected = abs(change_pct) > 5.0 + + x = np.arange(len(measurements)) + y = np.array(measurements) + slope = float(np.polyfit(x, y, 1)[0]) if len(measurements) >= 3 else 0.0 + if slope > 0.01: + trend = 'increasing' + elif slope < -0.01: + trend = 'decreasing' + else: + trend = 'stable' + + return { + 'buildup_detected': buildup_detected, + 'start_latency_ms': round(start_l, 3), + 'end_latency_ms': round(end_l, 3), + 'change_ms': round(change_ms, 3), + 'change_percent': round(change_pct, 2), + 'trend': trend, + 'measurements': [round(m, 3) for m in measurements], + } + + +def run_quality_check(config: dict, duration_sec: int = 180, + output_dir: Path = None) -> dict: + """ + Run artifact detection for duration_sec seconds. + Returns dict with artifacts_per_min and total_artifacts. + """ + cfg = copy.deepcopy(config) + cfg['artifact_detection']['duration'] = float(duration_sec) + cfg['artifact_detection']['startup_delay'] = 0 + + try: + result = run_artifact_detection_test( + cfg, + save_plots=output_dir is not None, + output_dir=output_dir, + ) + dut = result['channel_2_dut'] + return { + 'artifacts_per_min': round(float(dut['artifact_rate_per_minute']), 2), + 'total_artifacts': int(dut['total_artifacts']), + 'duration_sec': duration_sec, + 'artifacts_by_type': dut['artifacts_by_type'], + } + except Exception as e: + return {'error': str(e), 'artifacts_per_min': None} + + +# --------------------------------------------------------------------------- +# USB recovery helper +# --------------------------------------------------------------------------- + +def _try_usb_audio_reset(config: dict) -> None: + """ + Try to recover the audio device after an ALSA xrun. + + Strategy: + 1. Reinitialize PortAudio (Pa_Terminate + Pa_Initialize) — no root needed, + closes all ALSA handles and reopens them cleanly. + 2. If that fails, attempt a USB-level reset via USBDEVFS_RESET ioctl. + Requires either root or membership in the 'plugdev' group: + sudo usermod -aG plugdev $USER (then re-login) + 3. Always finish with a 3 s settle sleep. + """ + import fcntl + import os + import re + import sounddevice as _sd + + USBDEVFS_RESET = 0x5514 + + # Stop any active sounddevice stream first + try: + _sd.stop() + except Exception: + pass + + # USB-level reset via ioctl (equivalent to replug) + device_name = config['audio'].get('device_name', 'Scarlett') + try: + with open('/proc/asound/cards') as f: + cards_text = f.read() + + card_num = None + for line in cards_text.splitlines(): + if device_name.lower() in line.lower(): + m = re.match(r'\s*(\d+)', line) + if m: + card_num = m.group(1) + break + + if card_num is not None: + card_sysfs = f'/sys/class/sound/card{card_num}' + real_path = Path(os.path.realpath(card_sysfs)) + usb_dev_path = None + for parent in real_path.parents: + if (parent / 'idVendor').exists(): + usb_dev_path = parent + break + + if usb_dev_path is not None: + bus_num = int((usb_dev_path / 'busnum').read_text().strip()) + dev_num = int((usb_dev_path / 'devnum').read_text().strip()) + dev_file = f'/dev/bus/usb/{bus_num:03d}/{dev_num:03d}' + with open(dev_file, 'wb') as f: + fcntl.ioctl(f, USBDEVFS_RESET, 0) + print(f" Recovery: USB reset of {dev_file} OK") + + except PermissionError as e: + print(f" Recovery: USB reset skipped (permission denied — " + f"add yourself to plugdev: sudo usermod -aG plugdev $USER)") + except Exception as e: + print(f" Recovery: USB reset skipped ({e})") + + time.sleep(3) + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main(): + parser = argparse.ArgumentParser( + description='Run matrix test across all QoS/rate/channel/delay combinations') + parser.add_argument('--serial-number', required=True, + help='Serial number (e.g. SN001234)') + parser.add_argument('--software-version', required=True, + help='Software version / git commit hash') + parser.add_argument('--comment', default='', + help='Free-text comment for this test run') + parser.add_argument('--config', default='config.yaml', + help='Path to config file') + parser.add_argument('--measurements', type=int, default=5, + help='Latency measurements per combination (default: 5)') + parser.add_argument('--settle-time', type=int, default=5, + help='Seconds to wait after API call before measuring (default: 15)') + parser.add_argument('--buildup', action='store_true', + help='Run 20 s buildup test per combination') + parser.add_argument('--quality', action='store_true', + help='Run 3 min quality/artifact test per combination') + parser.add_argument('--quality-duration', type=int, default=180, + help='Quality test duration in seconds (default: 180)') + parser.add_argument('--dry-run', action='store_true', + help='Skip API calls and audio measurements (for testing the script)') + args = parser.parse_args() + + with open(args.config, 'r') as f: + config = yaml.safe_load(f) + + timestamp = datetime.now() + test_id = timestamp.strftime('%Y%m%d_%H%M%S') + + results_dir = Path(config['output']['results_dir']) + test_output_dir = (results_dir + / timestamp.strftime('%Y') + / timestamp.strftime('%m') + / timestamp.strftime('%d') + / f"{test_id}_matrix") + test_output_dir.mkdir(parents=True, exist_ok=True) + + # All combinations in the specified order + combos = [ + (qos, rate, ch, pd) + for qos in ['fast', 'robust'] + for rate in ['16k', '24k', '48k'] + for ch in ['mono', 'stereo'] + for pd in PRESENTATION_DELAYS_MS + ] + + total = len(combos) + print("=" * 70) + print("MATRIX TEST") + print("=" * 70) + print(f"Test ID: {test_id}") + print(f"Serial Number: {args.serial_number}") + print(f"Software: {args.software_version}") + if args.comment: + print(f"Comment: {args.comment}") + print(f"Combinations: {total}") + print(f"Measurements/combo: {args.measurements}") + print(f"Settle time: {args.settle_time} s") + print(f"Buildup test: {'yes (20 s)' if args.buildup else 'no'}") + print(f"Quality test: {'yes (' + str(args.quality_duration) + ' s)' if args.quality else 'no'}") + if args.dry_run: + print("DRY RUN MODE - no API calls or audio measurements") + print("=" * 70) + + def run_combo(qos, rate, ch, pd): + """Run a single combination and return its result dict.""" + payload = build_api_payload(qos, rate, ch, pd) + result = { + 'qos': qos, + 'sample_rate': rate, + 'channels': ch, + 'presentation_delay_ms': pd, + 'api_payload': payload, + 'api_success': None, + 'latency': None, + 'buildup': None, + 'quality': None, + } + + if not args.dry_run: + stop_device() + ok, api_resp = configure_device(payload) + result['api_success'] = ok + result['api_response'] = api_resp if not ok else str(api_resp) + + if not ok: + print(f" API FAILED: {api_resp}") + result['latency'] = {'error': f'API failed: {api_resp}', 'valid': False, + 'avg': None} + return result + + print(f" API OK -> settling {args.settle_time} s...") + time.sleep(args.settle_time) + else: + result['api_success'] = True + + if not args.dry_run: + try: + lat = run_latency_test(config, num_measurements=args.measurements, + save_plots=False) + result['latency'] = { + 'avg': round(float(lat['avg']), 3), + 'min': round(float(lat['min']), 3), + 'max': round(float(lat['max']), 3), + 'std': round(float(lat['std']), 3), + 'valid': bool(lat.get('valid', True)), + } + status = "PASS" if result['latency']['valid'] else "FAIL" + print(f" Latency [{status}]: avg={lat['avg']:.1f} ms " + f"std={lat['std']:.2f} ms") + except Exception as e: + result['latency'] = {'error': str(e), 'valid': False, 'avg': None} + print(f" Latency ERROR: {e}") + + if not result['latency'].get('valid', False): + print(" Latency invalid — attempting USB recovery, skipping buildup/quality.") + _try_usb_audio_reset(config) + return result + else: + import random + avg = pd + random.uniform(-1, 1) + result['latency'] = {'avg': round(avg, 3), 'min': round(avg - 0.5, 3), + 'max': round(avg + 0.5, 3), 'std': 0.2, 'valid': True} + + if args.buildup: + if not args.dry_run: + print(f" Buildup check (20 s)...") + buildup = run_buildup_check(config, duration_sec=20, interval_sec=1) + result['buildup'] = buildup + bd = buildup.get('buildup_detected') + print(f" Buildup: {'YES ⚠' if bd else ('NO' if bd is False else 'N/A')}") + else: + result['buildup'] = {'buildup_detected': False, 'note': 'dry_run'} + + if args.quality: + if not args.dry_run: + print(f" Quality test ({args.quality_duration} s)...") + combo_plot_dir = test_output_dir / f"{qos}_{rate}_{ch}_{pd}ms" + combo_plot_dir.mkdir(parents=True, exist_ok=True) + quality = run_quality_check(config, duration_sec=args.quality_duration, + output_dir=combo_plot_dir) + result['quality'] = quality + apm = quality.get('artifacts_per_min') + print(f" Quality: {f'{apm:.1f} artifacts/min' if apm is not None else 'ERROR'}") + else: + result['quality'] = {'artifacts_per_min': 0.5, 'total_artifacts': 1, + 'note': 'dry_run'} + + return result + + matrix_results = {} + + for idx, (qos, rate, ch, pd) in enumerate(combos, 1): + key = f"{qos}_{rate}_{ch}_{pd}ms" + print(f"\n[{idx:2d}/{total}] {qos:6s} {rate:3s} {ch:6s} PD={pd:2d}ms") + matrix_results[key] = run_combo(qos, rate, ch, pd) + + # --- Retry failed combinations if failure rate < 10% --- + def _is_failed(r): + lat = r.get('latency') + return lat is None or lat.get('valid') is False + + failed_keys = [k for k, r in matrix_results.items() if _is_failed(r)] + retry_threshold = total * 0.10 + + if 0 < len(failed_keys) <= retry_threshold: + print(f"\n{'=' * 70}") + print(f"RETRYING {len(failed_keys)} failed combination(s) " + f"({len(failed_keys)}/{total} = {len(failed_keys)/total*100:.0f}% < 10%)") + print(f"{'=' * 70}") + for retry_idx, key in enumerate(failed_keys, 1): + r = matrix_results[key] + qos, rate, ch, pd = r['qos'], r['sample_rate'], r['channels'], r['presentation_delay_ms'] + print(f"\n[retry {retry_idx}/{len(failed_keys)}] {qos:6s} {rate:3s} {ch:6s} PD={pd:2d}ms") + matrix_results[key] = run_combo(qos, rate, ch, pd) + matrix_results[key]['retried'] = True + elif len(failed_keys) > retry_threshold: + print(f"\n{len(failed_keys)}/{total} combinations failed " + f"({len(failed_keys)/total*100:.0f}%) — above 10% threshold, skipping retry.") + + # --- Save results --- + output_data = { + 'metadata': { + 'test_id': test_id, + 'timestamp': timestamp.isoformat(), + 'serial_number': args.serial_number, + 'software_version': args.software_version, + 'comment': args.comment, + 'options': { + 'measurements_per_combo': args.measurements, + 'settle_time_sec': args.settle_time, + 'buildup_enabled': args.buildup, + 'quality_enabled': args.quality, + 'quality_duration_sec': args.quality_duration if args.quality else None, + }, + }, + 'matrix_results': matrix_results, + } + + output_file = test_output_dir / f"{test_id}_matrix_results.yaml" + with open(output_file, 'w') as f: + yaml.dump(output_data, f, default_flow_style=False, sort_keys=False) + + # --- Auto-generate table image --- + try: + from plot_matrix import build_table + import matplotlib.pyplot as plt + show_buildup = any(r.get('buildup') is not None for r in matrix_results.values()) + show_quality = any(r.get('quality') is not None for r in matrix_results.values()) + fig = build_table( + matrix_results=matrix_results, + baseline_results=None, + metadata=output_data['metadata'], + baseline_metadata=None, + show_buildup=show_buildup, + show_quality=show_quality, + ) + plot_file = test_output_dir / f"{test_id}_matrix_results_table.png" + fig.savefig(plot_file, dpi=150, bbox_inches='tight', + facecolor='white', edgecolor='none') + plt.close(fig) + plot_file_path = plot_file + print(f"Table image saved to: {plot_file}") + except Exception as e: + plot_file_path = None + print(f"Warning: could not auto-generate table image: {e}") + + # --- Summary --- + passed = sum(1 for r in matrix_results.values() + if r.get('latency') and r['latency'].get('valid', False)) + failed = total - passed + print("\n" + "=" * 70) + print(f"MATRIX TEST COMPLETE | PASS: {passed} FAIL: {failed} Total: {total}") + print(f"Results: {output_file}") + if plot_file_path: + print(f"Table: {plot_file_path.resolve()}") + print(f"Re-plot: python plot_matrix.py {output_file}") + print("=" * 70) + + +if __name__ == '__main__': + main()