#!/usr/bin/env python3
"""Combine ALSA avail, perf metrics, and latency plots into one figure."""

import sys
import re
import os
from datetime import datetime

import matplotlib.pyplot as plt
import numpy as np

# Regex patterns
TIMESTAMP_RE = re.compile(r"^===== (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+) =====")
AVAIL_RE = re.compile(r"^avail\s*:\s*(\d+)")
# The syslog-style prefixes below accept one or more spaces between the month
# and the day so that space-padded days ("Jan  5") are matched as well.
PERF_RE = re.compile(
    r"^(\w+ +\d+ \d+:\d+:\d+) .* Perf\(.*?\):"
    r".*?sample mean=([\d.]+)ms"
    r".*?write mean=([\d.]+)ms"
    r".*?loop mean=([\d.]+)ms"
)
LATENCY_RE = re.compile(r"^(\w+ +\d+ \d+:\d+:\d+).*latency.*?(\d+(?:\.\d+)?)ms")
PYALSA_AVAIL_BEFORE_RE = re.compile(r"^(\w+ +\d+ \d+:\d+:\d+).*PyALSA: avail before read: (\d+)")
PYALSA_AVAIL_AFTER_RE = re.compile(r"^(\w+ +\d+ \d+:\d+:\d+).*PyALSA: .* avail=(\d+)")


def _parse_syslog_timestamp(ts_str, year=None):
    """Parse a year-less "Mon DD HH:MM:SS" timestamp into a naive datetime.

    Args:
        ts_str: Timestamp text as captured by the syslog-style regexes.
        year: Calendar year to assume; defaults to the current year.

    Returns:
        A naive ``datetime``.

    Raises:
        ValueError: If the text is not a valid date in the assumed year.
    """
    if year is None:
        year = datetime.now().year
    # Always parse with an explicit year: a year-less format defaults to 1900,
    # which rejects Feb 29 and is deprecated in recent Python versions.
    return datetime.strptime(f"{year} {ts_str}", "%Y %b %d %H:%M:%S")


def _coerce_datetime(value):
    """Turn a YAML timestamp value into a naive datetime.

    Accepts either an ISO-8601 string or an already-parsed date/datetime
    object (the YAML loader may resolve unquoted timestamps itself).
    """
    if isinstance(value, datetime):
        ts = value
    else:
        ts = datetime.fromisoformat(str(value))
    if ts.tzinfo is not None:
        # NOTE(review): the log timestamps carry no zone and are presumably
        # local time, so aware values are converted to local time - confirm.
        ts = ts.astimezone().replace(tzinfo=None)
    return ts


def _seconds_since(timestamps, origin=None):
    """Return each timestamp as seconds elapsed since *origin*.

    When *origin* is omitted the first timestamp is used. An empty input
    yields an empty list.
    """
    if not timestamps:
        return []
    if origin is None:
        origin = timestamps[0]
    return [(ts - origin).total_seconds() for ts in timestamps]


def _read_alsa_status(log_path):
    """Return (absolute timestamps, avail values) from an ALSA status dump.

    A ``===== <timestamp> =====`` header opens a record; the first following
    ``avail : N`` line supplies the value for that record.
    """
    timestamps = []
    avail_values = []
    with open(log_path, "r", encoding="utf-8", errors="replace") as f:
        current_timestamp = None
        for line in f:
            line = line.strip()
            ts_match = TIMESTAMP_RE.match(line)
            if ts_match:
                # The regex allows any number of fractional digits but %f
                # accepts at most six, so truncate the fraction.
                head, _, fraction = ts_match.group(1).partition(".")
                current_timestamp = datetime.strptime(
                    f"{head}.{fraction[:6]}", "%Y-%m-%d %H:%M:%S.%f"
                )
                continue
            if current_timestamp is None:
                continue
            avail_match = AVAIL_RE.match(line)
            if avail_match:
                timestamps.append(current_timestamp)
                avail_values.append(int(avail_match.group(1)))
                # Only the first avail line after a header is recorded.
                current_timestamp = None
    return timestamps, avail_values


def _read_perf_log(log_path, year=None):
    """Return (timestamps, sample means, write means, loop means) from Perf lines."""
    timestamps = []
    sample_means = []
    write_means = []
    loop_means = []
    with open(log_path, "r", encoding="utf-8", errors="replace") as f:
        for line in f:
            m = PERF_RE.search(line)
            if not m:
                continue
            ts_str, sample, write, loop = m.groups()
            timestamps.append(_parse_syslog_timestamp(ts_str, year))
            sample_means.append(float(sample))
            write_means.append(float(write))
            loop_means.append(float(loop))
    return timestamps, sample_means, write_means, loop_means


def _read_latency_yaml(yaml_path):
    """Return (absolute timestamps, latencies in ms) from the latency YAML file."""
    import yaml

    with open(yaml_path, "r", encoding="utf-8") as f:
        # An empty document loads as None; treat it as "no measurements".
        data = yaml.safe_load(f) or {}

    result = data.get('latency_buildup_result') or {}
    latency_measurements = result.get('latency_measurements') or []

    timestamps = []
    latencies = []
    for measurement in latency_measurements:
        timestamps.append(_coerce_datetime(measurement['timestamp']))
        latencies.append(float(measurement['latency_ms']))
    return timestamps, latencies


def parse_alsa_status(log_path):
    """Parse an ALSA status dump.

    Args:
        log_path: Path to the status log.

    Returns:
        ``(seconds, avail_values)`` where *seconds* is measured from the first
        record. Both lists are empty when no record is found.
    """
    timestamps, avail_values = _read_alsa_status(log_path)
    return _seconds_since(timestamps), avail_values


def parse_perf_log(log_path):
    """Parse ``Perf(...)`` lines from the perf log.

    Args:
        log_path: Path to the perf log.

    Returns:
        ``(seconds, sample_means, write_means, loop_means)`` where *seconds*
        is measured from the first matching line. All lists are empty when no
        line matches.
    """
    timestamps, sample_means, write_means, loop_means = _read_perf_log(log_path)
    return _seconds_since(timestamps), sample_means, write_means, loop_means


def parse_pyalsa_avail(perf_file, year=None):
    """Parse PyALSA avail before/after read from the perf log file.

    Args:
        perf_file: Path to the perf log.
        year: Calendar year to assume for the year-less log timestamps;
            defaults to the current year.

    Returns:
        ``(before_timestamps, before_values, after_timestamps, after_values)``
        with absolute (naive) datetimes and integer avail counts.
    """
    before_timestamps = []
    before_values = []
    after_timestamps = []
    after_values = []

    with open(perf_file, "r", encoding="utf-8", errors="replace") as f:
        for line in f:
            line = line.strip()

            # Check for "avail before read"
            before_match = PYALSA_AVAIL_BEFORE_RE.match(line)
            if before_match:
                ts_str, avail = before_match.groups()
                before_timestamps.append(_parse_syslog_timestamp(ts_str, year))
                before_values.append(int(avail))
                continue

            # Check for "avail=" (after read)
            after_match = PYALSA_AVAIL_AFTER_RE.match(line)
            if after_match:
                ts_str, avail = after_match.groups()
                after_timestamps.append(_parse_syslog_timestamp(ts_str, year))
                after_values.append(int(avail))

    return before_timestamps, before_values, after_timestamps, after_values


def parse_latency_yaml(yaml_path):
    """Parse latency measurements from the latency YAML file.

    Args:
        yaml_path: Path to a YAML file holding
            ``latency_buildup_result.latency_measurements`` entries, each with
            ``timestamp`` and ``latency_ms`` keys.

    Returns:
        ``(seconds, latencies)`` where *seconds* is measured from the first
        measurement. Both lists are empty when there are no measurements.
    """
    timestamps, latencies = _read_latency_yaml(yaml_path)
    return _seconds_since(timestamps), latencies


def plot_combined(alsa_file, perf_file, latency_file, out_path):
    """Plot all metrics on a shared time axis, save the figure and show it.

    Args:
        alsa_file: Path to the ALSA status log.
        perf_file: Path to the perf log (Perf and PyALSA lines).
        latency_file: Path to the latency YAML file.
        out_path: Where the PNG is written.

    Raises:
        ValueError: If none of the inputs contains a usable data point.
    """
    # Parse all logs, keeping absolute timestamps for alignment
    alsa_timestamps, avail_values = _read_alsa_status(alsa_file)

    # The perf log timestamps carry no year. Borrow it from the ALSA log when
    # possible so both sources line up even when analysed in a later year.
    year = alsa_timestamps[0].year if alsa_timestamps else None

    perf_timestamps, sample_means, write_means, loop_means = _read_perf_log(perf_file, year)
    latency_timestamps, latencies = _read_latency_yaml(latency_file)
    before_timestamps, before_values, after_timestamps, after_values = parse_pyalsa_avail(
        perf_file, year
    )

    # Find earliest timestamp
    all_abs_timestamps = [
        *alsa_timestamps,
        *perf_timestamps,
        *latency_timestamps,
        *before_timestamps,
        *after_timestamps,
    ]
    if not all_abs_timestamps:
        raise ValueError("No data points found in any input file; nothing to plot")
    t0_absolute = min(all_abs_timestamps)

    # Convert all times to seconds from earliest timestamp
    alsa_aligned = _seconds_since(alsa_timestamps, t0_absolute)
    perf_aligned = _seconds_since(perf_timestamps, t0_absolute)
    latency_aligned = _seconds_since(latency_timestamps, t0_absolute)
    before_aligned = _seconds_since(before_timestamps, t0_absolute)
    after_aligned = _seconds_since(after_timestamps, t0_absolute)

    # Create figure with 4 subplots sharing x-axis
    fig, (ax1, ax2, ax3, ax4) = plt.subplots(4, 1, figsize=(14, 12), sharex=True)
    fig.suptitle("Combined Audio Performance Metrics", fontsize=16)

    # Plot 1: ALSA avail
    if alsa_aligned and avail_values:
        ax1.plot(alsa_aligned, avail_values, label="avail", linewidth=1, alpha=0.7, color='blue')
        if len(avail_values) >= 10:
            window_size = min(50, len(avail_values) // 10)
            moving_avg = np.convolve(
                avail_values, np.ones(window_size) / window_size, mode='valid'
            )
            # 'valid' drops the first window_size-1 points; drop the same
            # number of x values so both sequences have equal length.
            ma_seconds = alsa_aligned[window_size - 1:]
            ax1.plot(
                ma_seconds,
                moving_avg,
                label=f"moving mean (window={window_size})",
                linewidth=2,
                color='darkblue',
            )
        ax1.legend()
    ax1.set_ylabel("Available samples")
    ax1.set_title("ALSA Available Samples")
    ax1.grid(True, alpha=0.3)

    # Plot 2: Perf metrics
    if perf_aligned:
        ax2.plot(perf_aligned, sample_means, label="sample mean", linewidth=1, alpha=0.8, color='green')
        ax2.plot(perf_aligned, write_means, label="write mean", linewidth=1, alpha=0.8, color='orange')
        ax2.plot(perf_aligned, loop_means, label="loop mean", linewidth=1, alpha=0.8, color='red')
        ax2.legend()
    ax2.set_ylabel("Duration (ms)")
    ax2.set_title("Performance Metrics")
    ax2.grid(True, alpha=0.3)

    # Plot 3: Latency
    if latency_aligned:
        ax3.plot(latency_aligned, latencies, label="latency", linewidth=1, color='purple')
        ax3.legend()
    ax3.set_ylabel("Latency (ms)")
    ax3.set_title("Latency Buildup")
    ax3.grid(True, alpha=0.3)

    # Plot 4: PyALSA avail before/after read
    if before_aligned and before_values:
        ax4.plot(before_aligned, before_values, label="avail before read", linewidth=1, alpha=0.7, color='cyan')
    if after_aligned and after_values:
        ax4.plot(after_aligned, after_values, label="avail after read", linewidth=1, alpha=0.7, color='magenta')
    ax4.set_xlabel("Time (s)")
    ax4.set_ylabel("Available samples")
    ax4.set_title("PyALSA Available Samples (Before/After Read)")
    # Legends are only drawn on axes that actually received a labelled line.
    if before_aligned or after_aligned:
        ax4.legend()
    ax4.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig(out_path, dpi=150, bbox_inches='tight')
    print(f"Combined plot saved to {out_path}")

    # Show interactive plot
    plt.show()


def main():
    """Command-line entry point: validate the three input paths and plot them."""
    if len(sys.argv) != 4:
        print(
            f"Usage: {sys.argv[0]} <alsa_status_log> <perf_log> <latency_yaml>",
            file=sys.stderr,
        )
        sys.exit(1)

    alsa_file, perf_file, latency_file = sys.argv[1:4]

    for file_path in (alsa_file, perf_file, latency_file):
        if not os.path.isfile(file_path):
            print(f"File not found: {file_path}", file=sys.stderr)
            sys.exit(1)

    # Determine output path (same directory as first file)
    log_dir = os.path.dirname(os.path.abspath(alsa_file))
    out_path = os.path.join(log_dir, "combined_audio_plot.png")

    plot_combined(alsa_file, perf_file, latency_file, out_path)


if __name__ == "__main__":
    main()