#!/usr/bin/env python3
"""Latency build-up test.

Repeatedly runs a latency measurement at a fixed interval, reports whether the
measured latency drifts over time, and writes the results (plus an optional
plot) below the results directory named in the config file.
"""
import argparse
import signal
import sys
import time
from datetime import datetime
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import yaml

# Make the sibling ``src`` package importable when run as a script.
sys.path.insert(0, str(Path(__file__).parent))
from src.audio_tests import run_latency_test, find_audio_device, generate_chirp, play_and_record, calculate_latency


class LatencyBuildupTest:
    """Run periodic latency measurements and analyse how they change over time."""

    def __init__(self, config, measurement_interval=30, max_duration=None):
        """Store the test parameters.

        Args:
            config: Loaded configuration, passed through to ``run_latency_test``.
            measurement_interval: Seconds to wait between measurements.
            max_duration: Optional limit in seconds; a falsy value means the
                test runs until it is interrupted.
        """
        self.config = config
        self.measurement_interval = measurement_interval
        self.max_duration = max_duration
        self.running = False
        self.measurements = []  # list of (datetime, latency) tuples
        self.start_time = None

    def signal_handler(self, signum, frame):
        """Signal handler: announce the stop and let the main loop wind down."""
        print(f"\n\n{'='*70}")
        print("TEST STOPPED - Generating final results...")
        print(f"{'='*70}")
        self.running = False

    def run_single_latency_measurement(self):
        """Run a single latency measurement and return the result.

        Returns:
            The ``'avg'`` entry of the stats returned by ``run_latency_test``,
            or ``None`` if the measurement raised.
        """
        try:
            # Use existing latency test function with 1 measurement for speed
            latency_stats = run_latency_test(
                self.config,
                num_measurements=1,
                save_plots=False,
                output_dir=None,
            )
            return latency_stats['avg']
        except Exception as e:
            # Deliberately best-effort: one failed measurement must not abort
            # a long-running test.
            print(f"❌ Error in latency measurement: {e}")
            return None

    def analyze_buildup(self, latencies, timestamps):
        """Analyze latency build-up and return analysis results.

        Args:
            latencies: Sequence of latency values in measurement order.
            timestamps: Matching timestamps. Not used by the analysis; kept so
                existing callers keep working.

        Returns:
            dict with ``buildup_detected``, ``start_latency``, ``end_latency``,
            ``change_ms``, ``change_percent`` and ``trend``. All values are
            native Python types so the dict can be dumped to YAML without
            NumPy object tags.
        """
        if len(latencies) < 2:
            return {
                'buildup_detected': False,
                'start_latency': 0,
                'end_latency': 0,
                'change_ms': 0,
                'change_percent': 0,
                'trend': 'insufficient_data',
            }

        # Coerce to plain floats: the inputs may be NumPy scalars, and
        # yaml.dump serialises those as python/object tags.
        start_latency = float(latencies[0])
        end_latency = float(latencies[-1])
        change_ms = end_latency - start_latency
        if start_latency > 0:
            change_percent = (change_ms / start_latency) * 100
        else:
            change_percent = 0.0

        # Determine if buildup occurred (±5% threshold)
        buildup_detected = abs(change_percent) > 5.0

        # Calculate trend using linear regression over the measurement index
        # (the slope is per measurement, not per second).
        if len(latencies) >= 3:
            x = np.arange(len(latencies))
            y = np.asarray(latencies, dtype=float)
            slope = float(np.polyfit(x, y, 1)[0])
            if slope > 0.01:
                trend = 'increasing'
            elif slope < -0.01:
                trend = 'decreasing'
            else:
                trend = 'stable'
        else:
            trend = 'insufficient_data'

        return {
            'buildup_detected': buildup_detected,
            'start_latency': start_latency,
            'end_latency': end_latency,
            'change_ms': change_ms,
            'change_percent': change_percent,
            'trend': trend,
        }

    def plot_latency_buildup(self, timestamps, latencies, output_dir):
        """Create and save latency over time plot.

        Args:
            timestamps: ``datetime`` objects, one per measurement.
            latencies: Latency values matching ``timestamps``.
            output_dir: Directory (``Path`` or ``str``) to write the PNG into.

        Returns:
            Path of the written ``latency_buildup_graph.png``.
        """
        plot_file = Path(output_dir) / 'latency_buildup_graph.png'
        fig, ax = plt.subplots(1, 1, figsize=(12, 6))
        try:
            # Convert timestamps to relative time in seconds
            first = timestamps[0]
            relative_times = [(t - first).total_seconds() for t in timestamps]

            # Plot latency measurements
            ax.plot(relative_times, latencies, 'b-o', markersize=4, linewidth=2,
                    label='Latency Measurements')

            # Add trend line if we have enough data
            if len(latencies) >= 3:
                x = np.array(relative_times)
                y = np.array(latencies)
                coeffs = np.polyfit(x, y, 1)
                fitted = np.poly1d(coeffs)
                ax.plot(x, fitted(x), "r--", alpha=0.8, linewidth=2,
                        label=f'Trend: {coeffs[0]:.4f} ms/s')

            # Add reference line for start latency
            ax.axhline(y=latencies[0], color='g', linestyle=':', alpha=0.7,
                       label=f'Start: {latencies[0]:.3f} ms')

            ax.set_xlabel('Time (seconds)', fontsize=12)
            ax.set_ylabel('Latency (ms)', fontsize=12)
            ax.set_title('Latency Build-up Over Time', fontsize=14, fontweight='bold')
            ax.grid(True, alpha=0.3)
            ax.legend()

            # Format y-axis to show reasonable precision
            ax.yaxis.set_major_formatter(
                plt.FuncFormatter(lambda value, _pos: f'{value:.3f}')
            )

            fig.tight_layout()
            fig.savefig(plot_file, dpi=150, bbox_inches='tight')
        finally:
            # Always release the figure, even if plotting or saving failed.
            plt.close(fig)

        return plot_file

    def run_test(self):
        """Run the latency build-up test.

        Measures until interrupted by SIGINT/SIGTERM or until ``max_duration``
        seconds have elapsed, then returns ``generate_results()``.
        """
        self.running = True
        self.start_time = datetime.now()

        print(f"Starting latency build-up test at {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"Measurement interval: {self.measurement_interval} seconds")
        if self.max_duration:
            print(f"Maximum duration: {self.max_duration} seconds")
        print("Press Ctrl+C to stop the test early")
        print("=" * 70)

        # Set up signal handler for graceful shutdown
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

        measurement_count = 0

        while self.running:
            current_time = datetime.now()
            measurement_count += 1

            print(f"\n[{current_time.strftime('%H:%M:%S')}] Measurement #{measurement_count}")

            # Perform latency measurement
            latency = self.run_single_latency_measurement()

            if latency is not None:
                self.measurements.append((current_time, latency))
                timestamps, latencies = zip(*self.measurements)

                # Calculate current statistics
                avg_latency = np.mean(latencies)
                min_latency = np.min(latencies)
                max_latency = np.max(latencies)
                std_latency = np.std(latencies)

                print(f" Current latency: {latency:.3f} ms")
                print(f" Average so far: {avg_latency:.3f} ms")
                print(f" Range: {min_latency:.3f} - {max_latency:.3f} ms")
                print(f" Std deviation: {std_latency:.3f} ms")

                # Analyze for buildup
                analysis = self.analyze_buildup(latencies, timestamps)
                if analysis['buildup_detected']:
                    print(f" ⚠️ BUILDUP DETECTED: {analysis['change_percent']:+.2f}% "
                          f"({analysis['change_ms']:+.3f} ms)")
                else:
                    print(f" ✅ No significant buildup: {analysis['change_percent']:+.2f}%")
                print(f" Trend: {analysis['trend']}")
            else:
                print(" ❌ Measurement failed")

            # Check if we should continue. Use the time *after* the
            # measurement so the time spent measuring counts as elapsed.
            if self.max_duration:
                elapsed = (datetime.now() - self.start_time).total_seconds()
                if elapsed >= self.max_duration:
                    print(f"\nMaximum duration of {self.max_duration} seconds reached")
                    break

            if not self.running:
                break

            # Wait for next measurement with interruptible sleep
            print(f" Waiting {self.measurement_interval} seconds...")

            # Sleep in smaller chunks to allow quick interruption
            sleep_chunk = 1.0  # Check every second
            time_slept = 0
            while self.running and time_slept < self.measurement_interval:
                time.sleep(min(sleep_chunk, self.measurement_interval - time_slept))
                time_slept += sleep_chunk

        return self.generate_results()

    def generate_results(self):
        """Generate final results and analysis.

        Returns:
            ``{'error': ...}`` when no measurement succeeded, otherwise a dict
            with ``test_metadata``, ``latency_measurements``, ``statistics``
            and ``buildup_analysis``.
        """
        if not self.measurements:
            return {'error': 'No measurements completed'}

        timestamps, latencies = zip(*self.measurements)
        end_time = datetime.now()
        total_duration = (end_time - self.start_time).total_seconds()

        # Final analysis
        analysis = self.analyze_buildup(latencies, timestamps)

        # Statistics (explicit float() keeps NumPy scalars out of the YAML)
        stats = {
            'count': len(latencies),
            'avg_ms': float(np.mean(latencies)),
            'min_ms': float(np.min(latencies)),
            'max_ms': float(np.max(latencies)),
            'std_ms': float(np.std(latencies)),
            'range_ms': float(np.max(latencies) - np.min(latencies)),
        }

        results = {
            'test_metadata': {
                'start_time': self.start_time.isoformat(),
                'end_time': end_time.isoformat(),
                'total_duration_sec': total_duration,
                'measurement_interval_sec': self.measurement_interval,
                'total_measurements': len(latencies),
            },
            'latency_measurements': [
                {
                    'timestamp': stamp.isoformat(),
                    'latency_ms': float(value),
                }
                for stamp, value in self.measurements
            ],
            'statistics': stats,
            'buildup_analysis': analysis,
        }

        return results


def main():
    """Parse the command line, run the build-up test and save the results."""
    parser = argparse.ArgumentParser(description='Run latency build-up test over time')
    parser.add_argument('--serial-number', required=True, help='Serial number (e.g., SN001234)')
    parser.add_argument('--software-version', required=True, help='Software version (git commit hash)')
    parser.add_argument('--comment', default='', help='Comments about this test')
    parser.add_argument('--config', default='config.yaml', help='Path to config file')
    parser.add_argument('--interval', type=int, help='Measurement interval in seconds (default from config)')
    parser.add_argument('--duration', type=int, help='Maximum test duration in seconds (default: run until canceled)')

    args = parser.parse_args()

    with open(args.config, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)

    # Use config values as defaults if not overridden by command line.
    # ``is not None`` (rather than truthiness) so an explicit 0 is honoured,
    # and the config section is only required when a value is actually
    # taken from it.
    buildup_config = config.get('latency_buildup') or {}
    if args.interval is not None:
        measurement_interval = args.interval
    else:
        measurement_interval = buildup_config['measurement_interval']
    if args.duration is not None:
        max_duration = args.duration
    else:
        max_duration = buildup_config.get('max_duration')

    timestamp = datetime.now()
    test_id = timestamp.strftime('%Y%m%d_%H%M%S')

    results_dir = Path(config['output']['results_dir'])

    test_output_dir = (
        results_dir
        / timestamp.strftime('%Y')
        / timestamp.strftime('%m')
        / timestamp.strftime('%d')
        / f"{test_id}_latency_buildup"
    )
    test_output_dir.mkdir(parents=True, exist_ok=True)

    save_plots = config['output'].get('save_plots', False)

    print("=" * 70)
    print("LATENCY BUILD-UP TEST")
    print("=" * 70)
    print(f"Test ID: {test_id}")
    print(f"Serial Number: {args.serial_number}")
    print(f"Software: {args.software_version}")
    if args.comment:
        print(f"Comment: {args.comment}")
    print(f"Measurement Interval: {measurement_interval} seconds")
    if max_duration:
        print(f"Maximum Duration: {max_duration} seconds")
    else:
        print("Duration: Run until canceled (Ctrl+C)")
    if save_plots:
        print(f"Plots will be saved to: {test_output_dir}")
    print("-" * 70)

    # Create and run test
    test = LatencyBuildupTest(
        config,
        measurement_interval=measurement_interval,
        max_duration=max_duration,
    )
    results = test.run_test()

    # Display final results
    print("\n" + "=" * 70)
    print("TEST COMPLETE - FINAL RESULTS")
    print("=" * 70)

    if 'error' in results:
        print(f"❌ Test failed: {results['error']}")
    else:
        metadata = results['test_metadata']
        stats = results['statistics']
        analysis = results['buildup_analysis']

        print(f"\n📊 Test Summary:")
        print(f" Duration: {metadata['total_duration_sec']:.1f} seconds")
        print(f" Measurements: {metadata['total_measurements']}")
        print(f" Interval: {metadata['measurement_interval_sec']} seconds")

        print(f"\n⏱️ Latency Statistics:")
        print(f" Average: {stats['avg_ms']:.3f} ms")
        print(f" Range: {stats['min_ms']:.3f} - {stats['max_ms']:.3f} ms")
        print(f" Std Dev: {stats['std_ms']:.3f} ms")

        print(f"\n📈 Build-up Analysis:")
        print(f" Start Latency: {analysis['start_latency']:.3f} ms")
        print(f" End Latency: {analysis['end_latency']:.3f} ms")
        print(f" Change: {analysis['change_ms']:+.3f} ms ({analysis['change_percent']:+.2f}%)")
        print(f" Trend: {analysis['trend']}")

        if analysis['buildup_detected']:
            print(f"\n⚠️ BUILDUP DETECTED!")
            print(f" Latency changed by {abs(analysis['change_percent']):.2f}% (threshold: ±5%)")
        else:
            print(f"\n✅ No significant buildup detected")
            print(f" Latency change within acceptable range (±5%)")

        # Generate and save plot
        if save_plots and len(results['latency_measurements']) > 1:
            timestamps = [datetime.fromisoformat(m['timestamp']) for m in results['latency_measurements']]
            latencies = [m['latency_ms'] for m in results['latency_measurements']]

            plot_file = test.plot_latency_buildup(timestamps, latencies, test_output_dir)
            print(f"\n📊 Latency graph saved to: {plot_file}")

    # Save results to file (also written when the test reported an error)
    output_data = {
        'metadata': {
            'test_id': test_id,
            'timestamp': timestamp.isoformat(),
            'serial_number': args.serial_number,
            'software_version': args.software_version,
            'comment': args.comment,
        },
        'latency_buildup_result': results,
    }

    output_file = test_output_dir / f"{test_id}_latency_buildup_results.yaml"
    with open(output_file, 'w', encoding='utf-8') as f:
        yaml.dump(output_data, f, default_flow_style=False, sort_keys=False)

    print("\n" + "=" * 70)
    print("✅ Results saved to:")
    print(f" YAML: {output_file}")
    if save_plots and len(results.get('latency_measurements', [])) > 1:
        print(f" Graph: {test_output_dir}/latency_buildup_graph.png")
    print("=" * 70)


if __name__ == '__main__':
    main()