diff --git a/config.yaml b/config.yaml index b609f66..76adcc2 100644 --- a/config.yaml +++ b/config.yaml @@ -38,3 +38,8 @@ artifact_detection: energy_variation: enabled: true threshold_db: 6.0 # Energy change threshold in dB between consecutive windows (detects level changes) + +latency_buildup: + measurement_interval: 10 # seconds between latency measurements + max_duration: null # maximum test duration in seconds (null = run until canceled) + buildup_threshold_percent: 5.0 # percentage change threshold for buildup detection diff --git a/test_latency_buildup.py b/test_latency_buildup.py new file mode 100755 index 0000000..fc70be1 --- /dev/null +++ b/test_latency_buildup.py @@ -0,0 +1,362 @@ +#!/usr/bin/env python3 +import argparse +import yaml +import time +import signal +import sys +from datetime import datetime +from pathlib import Path +import numpy as np +import matplotlib.pyplot as plt + +sys.path.insert(0, str(Path(__file__).parent)) +from src.audio_tests import run_latency_test, find_audio_device, generate_chirp, play_and_record, calculate_latency + + +class LatencyBuildupTest: + def __init__(self, config, measurement_interval=30, max_duration=None): + self.config = config + self.measurement_interval = measurement_interval + self.max_duration = max_duration + self.running = False + self.measurements = [] + self.start_time = None + + def signal_handler(self, signum, frame): + print(f"\n\n{'='*70}") + print("TEST STOPPED - Generating final results...") + print(f"{'='*70}") + self.running = False + + def run_single_latency_measurement(self): + """Run a single latency measurement and return the result""" + try: + # Use existing latency test function with 1 measurement for speed + latency_stats = run_latency_test(self.config, num_measurements=1, + save_plots=False, output_dir=None) + return latency_stats['avg'] + except Exception as e: + print(f"❌ Error in latency measurement: {e}") + return None + + def analyze_buildup(self, latencies, timestamps): + """Analyze latency build-up and return analysis results""" + if len(latencies) < 2: + return { + 'buildup_detected': False, + 'start_latency': 0, + 'end_latency': 0, + 'change_ms': 0, + 'change_percent': 0, + 'trend': 'insufficient_data' + } + + start_latency = latencies[0] + end_latency = latencies[-1] + change_ms = end_latency - start_latency + change_percent = (change_ms / start_latency) * 100 if start_latency > 0 else 0 + + # Determine if buildup occurred (±5% threshold) + buildup_detected = abs(change_percent) > 5.0 + + # Calculate trend using linear regression + if len(latencies) >= 3: + x = np.arange(len(latencies)) + y = np.array(latencies) + slope = np.polyfit(x, y, 1)[0] + + if slope > 0.01: # Positive trend + trend = 'increasing' + elif slope < -0.01: # Negative trend + trend = 'decreasing' + else: + trend = 'stable' + else: + trend = 'insufficient_data' + + return { + 'buildup_detected': buildup_detected, + 'start_latency': start_latency, + 'end_latency': end_latency, + 'change_ms': change_ms, + 'change_percent': change_percent, + 'trend': trend + } + + def plot_latency_buildup(self, timestamps, latencies, output_dir): + """Create and save latency over time plot""" + fig, ax = plt.subplots(1, 1, figsize=(12, 6)) + + # Convert timestamps to relative time in seconds + relative_times = [(t - timestamps[0]).total_seconds() for t in timestamps] + + # Plot latency measurements + ax.plot(relative_times, latencies, 'b-o', markersize=4, linewidth=2, label='Latency Measurements') + + # Add trend line if we have enough data + if len(latencies) >= 3: + x = np.array(relative_times) + y = np.array(latencies) + z = np.polyfit(x, y, 1) + p = np.poly1d(z) + ax.plot(x, p(x), "r--", alpha=0.8, linewidth=2, label=f'Trend: {z[0]:.4f} ms/s') + + # Add reference line for start latency + ax.axhline(y=latencies[0], color='g', linestyle=':', alpha=0.7, + label=f'Start: {latencies[0]:.3f} ms') + + ax.set_xlabel('Time (seconds)', fontsize=12) + ax.set_ylabel('Latency (ms)', fontsize=12) + ax.set_title('Latency Build-up Over Time', fontsize=14, fontweight='bold') + ax.grid(True, alpha=0.3) + ax.legend() + + # Format y-axis to show reasonable precision + ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f'{x:.3f}')) + + plt.tight_layout() + plot_file = output_dir / 'latency_buildup_graph.png' + plt.savefig(plot_file, dpi=150, bbox_inches='tight') + plt.close() + + return plot_file + + def run_test(self): + """Run the latency build-up test""" + self.running = True + self.start_time = datetime.now() + + print(f"Starting latency build-up test at {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}") + print(f"Measurement interval: {self.measurement_interval} seconds") + if self.max_duration: + print(f"Maximum duration: {self.max_duration} seconds") + print("Press Ctrl+C to stop the test early") + print("=" * 70) + + # Set up signal handler for graceful shutdown + signal.signal(signal.SIGINT, self.signal_handler) + signal.signal(signal.SIGTERM, self.signal_handler) + + measurement_count = 0 + + while self.running: + current_time = datetime.now() + measurement_count += 1 + + print(f"\n[{current_time.strftime('%H:%M:%S')}] Measurement #{measurement_count}") + + # Perform latency measurement + latency = self.run_single_latency_measurement() + + if latency is not None: + self.measurements.append((current_time, latency)) + timestamps, latencies = zip(*self.measurements) + + # Calculate current statistics + avg_latency = np.mean(latencies) + min_latency = np.min(latencies) + max_latency = np.max(latencies) + std_latency = np.std(latencies) + + print(f" Current latency: {latency:.3f} ms") + print(f" Average so far: {avg_latency:.3f} ms") + print(f" Range: {min_latency:.3f} - {max_latency:.3f} ms") + print(f" Std deviation: {std_latency:.3f} ms") + + # Analyze for buildup + analysis = self.analyze_buildup(latencies, timestamps) + if analysis['buildup_detected']: + print(f" ⚠️ BUILDUP DETECTED: {analysis['change_percent']:+.2f}% " + f"({analysis['change_ms']:+.3f} ms)") + else: + print(f" ✅ No significant buildup: {analysis['change_percent']:+.2f}%") + + print(f" Trend: {analysis['trend']}") + else: + print(f" ❌ Measurement failed") + + # Check if we should continue + if self.max_duration: + elapsed = (current_time - self.start_time).total_seconds() + if elapsed >= self.max_duration: + print(f"\nMaximum duration of {self.max_duration} seconds reached") + break + + if not self.running: + break + + # Wait for next measurement with interruptible sleep + if self.running: + print(f" Waiting {self.measurement_interval} seconds...") + # Sleep in smaller chunks to allow quick interruption + sleep_chunk = 1.0 # Check every second + time_slept = 0 + while self.running and time_slept < self.measurement_interval: + time.sleep(min(sleep_chunk, self.measurement_interval - time_slept)) + time_slept += sleep_chunk + + return self.generate_results() + + def generate_results(self): + """Generate final results and analysis""" + if not self.measurements: + return {'error': 'No measurements completed'} + + timestamps, latencies = zip(*self.measurements) + end_time = datetime.now() + total_duration = (end_time - self.start_time).total_seconds() + + # Final analysis + analysis = self.analyze_buildup(latencies, timestamps) + + # Statistics + stats = { + 'count': len(latencies), + 'avg_ms': float(np.mean(latencies)), + 'min_ms': float(np.min(latencies)), + 'max_ms': float(np.max(latencies)), + 'std_ms': float(np.std(latencies)), + 'range_ms': float(np.max(latencies) - np.min(latencies)) + } + + results = { + 'test_metadata': { + 'start_time': self.start_time.isoformat(), + 'end_time': end_time.isoformat(), + 'total_duration_sec': total_duration, + 'measurement_interval_sec': self.measurement_interval, + 'total_measurements': len(latencies) + }, + 'latency_measurements': [ + { + 'timestamp': t.isoformat(), + 'latency_ms': float(l) + } + for t, l in self.measurements + ], + 'statistics': stats, + 'buildup_analysis': analysis + } + + return results + + +def main(): + parser = argparse.ArgumentParser(description='Run latency build-up test over time') + parser.add_argument('--serial-number', required=True, help='Serial number (e.g., SN001234)') + parser.add_argument('--software-version', required=True, help='Software version (git commit hash)') + parser.add_argument('--comment', default='', help='Comments about this test') + parser.add_argument('--config', default='config.yaml', help='Path to config file') + parser.add_argument('--interval', type=int, help='Measurement interval in seconds (default from config)') + parser.add_argument('--duration', type=int, help='Maximum test duration in seconds (default: run until canceled)') + + args = parser.parse_args() + + with open(args.config, 'r') as f: + config = yaml.safe_load(f) + + # Use config values as defaults if not overridden by command line + measurement_interval = args.interval if args.interval else config['latency_buildup']['measurement_interval'] + max_duration = args.duration if args.duration else config['latency_buildup'].get('max_duration') + + timestamp = datetime.now() + test_id = timestamp.strftime('%Y%m%d_%H%M%S') + + results_dir = Path(config['output']['results_dir']) + results_dir.mkdir(exist_ok=True) + + test_output_dir = results_dir / f"{test_id}_latency_buildup" + test_output_dir.mkdir(exist_ok=True) + + save_plots = config['output'].get('save_plots', False) + + print("=" * 70) + print("LATENCY BUILD-UP TEST") + print("=" * 70) + print(f"Test ID: {test_id}") + print(f"Serial Number: {args.serial_number}") + print(f"Software: {args.software_version}") + if args.comment: + print(f"Comment: {args.comment}") + print(f"Measurement Interval: {measurement_interval} seconds") + if max_duration: + print(f"Maximum Duration: {max_duration} seconds") + else: + print("Duration: Run until canceled (Ctrl+C)") + if save_plots: + print(f"Plots will be saved to: {test_output_dir}") + print("-" * 70) + + # Create and run test + test = LatencyBuildupTest(config, measurement_interval=measurement_interval, max_duration=max_duration) + results = test.run_test() + + # Display final results + print("\n" + "=" * 70) + print("TEST COMPLETE - FINAL RESULTS") + print("=" * 70) + + if 'error' in results: + print(f"❌ Test failed: {results['error']}") + else: + metadata = results['test_metadata'] + stats = results['statistics'] + analysis = results['buildup_analysis'] + + print(f"\n📊 Test Summary:") + print(f" Duration: {metadata['total_duration_sec']:.1f} seconds") + print(f" Measurements: {metadata['total_measurements']}") + print(f" Interval: {metadata['measurement_interval_sec']} seconds") + + print(f"\n⏱️ Latency Statistics:") + print(f" Average: {stats['avg_ms']:.3f} ms") + print(f" Range: {stats['min_ms']:.3f} - {stats['max_ms']:.3f} ms") + print(f" Std Dev: {stats['std_ms']:.3f} ms") + + print(f"\n📈 Build-up Analysis:") + print(f" Start Latency: {analysis['start_latency']:.3f} ms") + print(f" End Latency: {analysis['end_latency']:.3f} ms") + print(f" Change: {analysis['change_ms']:+.3f} ms ({analysis['change_percent']:+.2f}%)") + print(f" Trend: {analysis['trend']}") + + if analysis['buildup_detected']: + print(f"\n⚠️ BUILDUP DETECTED!") + print(f" Latency changed by {abs(analysis['change_percent']):.2f}% (threshold: ±5%)") + else: + print(f"\n✅ No significant buildup detected") + print(f" Latency change within acceptable range (±5%)") + + # Generate and save plot + if save_plots and len(results['latency_measurements']) > 1: + timestamps = [datetime.fromisoformat(m['timestamp']) for m in results['latency_measurements']] + latencies = [m['latency_ms'] for m in results['latency_measurements']] + + plot_file = test.plot_latency_buildup(timestamps, latencies, test_output_dir) + print(f"\n📊 Latency graph saved to: {plot_file}") + + # Save results to file + output_data = { + 'metadata': { + 'test_id': test_id, + 'timestamp': timestamp.isoformat(), + 'serial_number': args.serial_number, + 'software_version': args.software_version, + 'comment': args.comment + }, + 'latency_buildup_result': results + } + + output_file = test_output_dir / f"{test_id}_latency_buildup_results.yaml" + with open(output_file, 'w') as f: + yaml.dump(output_data, f, default_flow_style=False, sort_keys=False) + + print("\n" + "=" * 70) + print("✅ Results saved to:") + print(f" YAML: {output_file}") + if save_plots and len(results.get('latency_measurements', [])) > 1: + print(f" Graph: {test_output_dir}/latency_buildup_graph.png") + print("=" * 70) + + +if __name__ == '__main__': + main()