#!/usr/bin/env python3
"""Latency build-up test: repeatedly measure audio round-trip latency over
time and analyze whether latency drifts (builds up) during a long run."""

import argparse
import signal
import sys
import time
from datetime import datetime
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import yaml

# Make the project root importable so src.audio_tests resolves regardless of
# the current working directory.
sys.path.insert(0, str(Path(__file__).parent))

from src.audio_tests import run_latency_test
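
# Example invocation (a sketch; the script filename and flag values are
# illustrative, but the flags match the argparse definitions in main()):
#
#   python3 latency_buildup_test.py --serial-number SN001234 \
#       --software-version a1b2c3d --interval 30 --duration 3600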


class LatencyBuildupTest:
    """Runs periodic latency measurements and tracks drift over time."""

    def __init__(self, config, measurement_interval=30, max_duration=None):
        self.config = config
        self.measurement_interval = measurement_interval  # seconds between measurements
        self.max_duration = max_duration  # seconds; None means run until interrupted
        self.running = False
        self.measurements = []  # list of (datetime, latency_ms) tuples
        self.start_time = None

    def signal_handler(self, signum, frame):
        """Stop the measurement loop gracefully on SIGINT/SIGTERM."""
        print(f"\n\n{'='*70}")
        print("TEST STOPPED - Generating final results...")
        print(f"{'='*70}")
        self.running = False

    def run_single_latency_measurement(self):
        """Run a single latency measurement; return latency in ms, or None on error."""
        try:
            # Use the existing latency test function with one measurement for
            # speed; it is expected to return a stats dict with an 'avg' key
            # in milliseconds.
            latency_stats = run_latency_test(self.config, num_measurements=1,
                                             save_plots=False, output_dir=None)
            return latency_stats['avg']
        except Exception as e:
            print(f"❌ Error in latency measurement: {e}")
            return None

    def analyze_buildup(self, latencies, timestamps):
        """Analyze latency build-up and return analysis results."""
        if len(latencies) < 2:
            return {
                'buildup_detected': False,
                'start_latency': 0,
                'end_latency': 0,
                'change_ms': 0,
                'change_percent': 0,
                'trend': 'insufficient_data'
            }

        start_latency = latencies[0]
        end_latency = latencies[-1]
        change_ms = end_latency - start_latency
        change_percent = (change_ms / start_latency) * 100 if start_latency > 0 else 0

        # Flag buildup when the end-to-end change exceeds the ±5% threshold
        buildup_detected = abs(change_percent) > 5.0

        # Classify the trend with a linear fit over the measurement index
        # (the slope is therefore in ms per measurement, not ms per second)
        if len(latencies) >= 3:
            x = np.arange(len(latencies))
            y = np.array(latencies)
            slope = np.polyfit(x, y, 1)[0]

            if slope > 0.01:     # rising by more than 0.01 ms per measurement
                trend = 'increasing'
            elif slope < -0.01:  # falling by more than 0.01 ms per measurement
                trend = 'decreasing'
            else:
                trend = 'stable'
        else:
            trend = 'insufficient_data'

        return {
            'buildup_detected': buildup_detected,
            'start_latency': start_latency,
            'end_latency': end_latency,
            'change_ms': change_ms,
            'change_percent': change_percent,
            'trend': trend
        }
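
    # Worked example for analyze_buildup (illustrative numbers, not measured
    # data): start 12.0 ms, end 12.8 ms -> change_ms = +0.8 ms,
    # change_percent = +6.7%, which exceeds the ±5% threshold, so
    # buildup_detected is True. Note that the trend slope above is fitted
    # against the measurement index, while the trend line drawn in
    # plot_latency_buildup below is fitted against elapsed seconds, so the
    # two slopes use different units.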

    def plot_latency_buildup(self, timestamps, latencies, output_dir):
        """Create and save a latency-over-time plot; return the plot path."""
        fig, ax = plt.subplots(1, 1, figsize=(12, 6))

        # Convert timestamps to seconds relative to the first measurement
        relative_times = [(t - timestamps[0]).total_seconds() for t in timestamps]

        # Plot latency measurements
        ax.plot(relative_times, latencies, 'b-o', markersize=4, linewidth=2,
                label='Latency Measurements')

        # Add a linear trend line if we have enough data
        if len(latencies) >= 3:
            x = np.array(relative_times)
            y = np.array(latencies)
            z = np.polyfit(x, y, 1)
            p = np.poly1d(z)
            ax.plot(x, p(x), "r--", alpha=0.8, linewidth=2,
                    label=f'Trend: {z[0]:.4f} ms/s')

        # Reference line at the starting latency
        ax.axhline(y=latencies[0], color='g', linestyle=':', alpha=0.7,
                   label=f'Start: {latencies[0]:.3f} ms')

        ax.set_xlabel('Time (seconds)', fontsize=12)
        ax.set_ylabel('Latency (ms)', fontsize=12)
        ax.set_title('Latency Build-up Over Time', fontsize=14, fontweight='bold')
        ax.grid(True, alpha=0.3)
        ax.legend()

        # Format the y-axis to show millisecond-level precision
        ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda val, pos: f'{val:.3f}'))

        plt.tight_layout()
        plot_file = output_dir / 'latency_buildup_graph.png'
        plt.savefig(plot_file, dpi=150, bbox_inches='tight')
        plt.close()

        return plot_file

    def run_test(self):
        """Run the latency build-up test until stopped or max_duration elapses."""
        self.running = True
        self.start_time = datetime.now()

        print(f"Starting latency build-up test at {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"Measurement interval: {self.measurement_interval} seconds")
        if self.max_duration:
            print(f"Maximum duration: {self.max_duration} seconds")
        print("Press Ctrl+C to stop the test early")
        print("=" * 70)

        # Set up signal handlers for graceful shutdown
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

        measurement_count = 0

        while self.running:
            current_time = datetime.now()
            measurement_count += 1

            print(f"\n[{current_time.strftime('%H:%M:%S')}] Measurement #{measurement_count}")

            # Perform latency measurement
            latency = self.run_single_latency_measurement()

            if latency is not None:
                self.measurements.append((current_time, latency))
                timestamps, latencies = zip(*self.measurements)

                # Running statistics over all measurements so far
                avg_latency = np.mean(latencies)
                min_latency = np.min(latencies)
                max_latency = np.max(latencies)
                std_latency = np.std(latencies)

                print(f"  Current latency: {latency:.3f} ms")
                print(f"  Average so far: {avg_latency:.3f} ms")
                print(f"  Range: {min_latency:.3f} - {max_latency:.3f} ms")
                print(f"  Std deviation: {std_latency:.3f} ms")

                # Analyze for buildup
                analysis = self.analyze_buildup(latencies, timestamps)
                if analysis['buildup_detected']:
                    print(f"  ⚠️ BUILDUP DETECTED: {analysis['change_percent']:+.2f}% "
                          f"({analysis['change_ms']:+.3f} ms)")
                else:
                    print(f"  ✅ No significant buildup: {analysis['change_percent']:+.2f}%")

                print(f"  Trend: {analysis['trend']}")
            else:
                print("  ❌ Measurement failed")

            # Stop once the maximum duration has elapsed
            if self.max_duration:
                elapsed = (current_time - self.start_time).total_seconds()
                if elapsed >= self.max_duration:
                    print(f"\nMaximum duration of {self.max_duration} seconds reached")
                    break

            if not self.running:
                break

            # Wait for the next measurement with an interruptible sleep
            if self.running:
                print(f"  Waiting {self.measurement_interval} seconds...")
                # Sleep in one-second chunks so Ctrl+C is handled promptly;
                # accumulate the actual slept time so a short final chunk is
                # accounted for correctly.
                sleep_chunk = 1.0
                time_slept = 0.0
                while self.running and time_slept < self.measurement_interval:
                    chunk = min(sleep_chunk, self.measurement_interval - time_slept)
                    time.sleep(chunk)
                    time_slept += chunk

        return self.generate_results()

    def generate_results(self):
        """Assemble final results: metadata, per-measurement data, stats, analysis."""
        if not self.measurements:
            return {'error': 'No measurements completed'}

        timestamps, latencies = zip(*self.measurements)
        end_time = datetime.now()
        total_duration = (end_time - self.start_time).total_seconds()

        # Final build-up analysis over the full series
        analysis = self.analyze_buildup(latencies, timestamps)

        # Summary statistics (cast numpy scalars to plain floats so the
        # values serialize cleanly to YAML)
        stats = {
            'count': len(latencies),
            'avg_ms': float(np.mean(latencies)),
            'min_ms': float(np.min(latencies)),
            'max_ms': float(np.max(latencies)),
            'std_ms': float(np.std(latencies)),
            'range_ms': float(np.max(latencies) - np.min(latencies))
        }

        results = {
            'test_metadata': {
                'start_time': self.start_time.isoformat(),
                'end_time': end_time.isoformat(),
                'total_duration_sec': total_duration,
                'measurement_interval_sec': self.measurement_interval,
                'total_measurements': len(latencies)
            },
            'latency_measurements': [
                {
                    'timestamp': t.isoformat(),
                    'latency_ms': float(latency)
                }
                for t, latency in self.measurements
            ],
            'statistics': stats,
            'buildup_analysis': analysis
        }

        return results
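

# Minimal programmatic use of LatencyBuildupTest, mirroring what main() does
# below (a sketch; the config dict and values are illustrative):
#
#   test = LatencyBuildupTest(config, measurement_interval=10, max_duration=60)
#   results = test.run_test()
#   print(results['buildup_analysis']['trend'])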


def main():
    parser = argparse.ArgumentParser(description='Run latency build-up test over time')
    parser.add_argument('--serial-number', required=True, help='Serial number (e.g., SN001234)')
    parser.add_argument('--software-version', required=True, help='Software version (git commit hash)')
    parser.add_argument('--comment', default='', help='Comments about this test')
    parser.add_argument('--config', default='config.yaml', help='Path to config file')
    parser.add_argument('--interval', type=int, help='Measurement interval in seconds (default from config)')
    parser.add_argument('--duration', type=int, help='Maximum test duration in seconds (default: run until canceled)')

    args = parser.parse_args()

    with open(args.config, 'r') as f:
        config = yaml.safe_load(f)

    # Use config values as defaults unless overridden on the command line;
    # compare against None so an explicit 0 is not silently replaced.
    measurement_interval = args.interval if args.interval is not None else config['latency_buildup']['measurement_interval']
    max_duration = args.duration if args.duration is not None else config['latency_buildup'].get('max_duration')
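
    # The config file is expected to provide at least the keys read here and
    # below; an illustrative example (values are assumptions, not taken from
    # the repository):
    #
    #   latency_buildup:
    #     measurement_interval: 30
    #     max_duration: 3600      # optional; omit to run until canceled
    #   output:
    #     results_dir: results
    #     save_plots: true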

    timestamp = datetime.now()
    test_id = timestamp.strftime('%Y%m%d_%H%M%S')

    results_dir = Path(config['output']['results_dir'])

    # Output layout: <results_dir>/YYYY/MM/DD/<test_id>_latency_buildup/
    test_output_dir = results_dir / timestamp.strftime('%Y') / timestamp.strftime('%m') / timestamp.strftime('%d') / f"{test_id}_latency_buildup"
    test_output_dir.mkdir(parents=True, exist_ok=True)

    save_plots = config['output'].get('save_plots', False)
print("=" * 70)
|
|
print("LATENCY BUILD-UP TEST")
|
|
print("=" * 70)
|
|
print(f"Test ID: {test_id}")
|
|
print(f"Serial Number: {args.serial_number}")
|
|
print(f"Software: {args.software_version}")
|
|
if args.comment:
|
|
print(f"Comment: {args.comment}")
|
|
print(f"Measurement Interval: {measurement_interval} seconds")
|
|
if max_duration:
|
|
print(f"Maximum Duration: {max_duration} seconds")
|
|
else:
|
|
print("Duration: Run until canceled (Ctrl+C)")
|
|
if save_plots:
|
|
print(f"Plots will be saved to: {test_output_dir}")
|
|
print("-" * 70)
|
|
|
|
# Create and run test
|
|
test = LatencyBuildupTest(config, measurement_interval=measurement_interval, max_duration=max_duration)
|
|
results = test.run_test()
|
|
|
|

    # Display final results
    print("\n" + "=" * 70)
    print("TEST COMPLETE - FINAL RESULTS")
    print("=" * 70)

    if 'error' in results:
        print(f"❌ Test failed: {results['error']}")
    else:
        metadata = results['test_metadata']
        stats = results['statistics']
        analysis = results['buildup_analysis']

        print("\n📊 Test Summary:")
        print(f"  Duration: {metadata['total_duration_sec']:.1f} seconds")
        print(f"  Measurements: {metadata['total_measurements']}")
        print(f"  Interval: {metadata['measurement_interval_sec']} seconds")

        print("\n⏱️ Latency Statistics:")
        print(f"  Average: {stats['avg_ms']:.3f} ms")
        print(f"  Range: {stats['min_ms']:.3f} - {stats['max_ms']:.3f} ms")
        print(f"  Std Dev: {stats['std_ms']:.3f} ms")

        print("\n📈 Build-up Analysis:")
        print(f"  Start Latency: {analysis['start_latency']:.3f} ms")
        print(f"  End Latency: {analysis['end_latency']:.3f} ms")
        print(f"  Change: {analysis['change_ms']:+.3f} ms ({analysis['change_percent']:+.2f}%)")
        print(f"  Trend: {analysis['trend']}")

        if analysis['buildup_detected']:
            print("\n⚠️ BUILDUP DETECTED!")
            print(f"  Latency changed by {abs(analysis['change_percent']):.2f}% (threshold: ±5%)")
        else:
            print("\n✅ No significant buildup detected")
            print("  Latency change within acceptable range (±5%)")

        # Generate and save the plot (needs at least two points)
        if save_plots and len(results['latency_measurements']) > 1:
            timestamps = [datetime.fromisoformat(m['timestamp']) for m in results['latency_measurements']]
            latencies = [m['latency_ms'] for m in results['latency_measurements']]

            plot_file = test.plot_latency_buildup(timestamps, latencies, test_output_dir)
            print(f"\n📊 Latency graph saved to: {plot_file}")
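
    # The YAML written below mirrors the dicts assembled above; an abridged,
    # illustrative example (values are made up):
    #
    #   metadata:
    #     test_id: '20250115_143000'
    #     serial_number: SN001234
    #     software_version: a1b2c3d
    #   latency_buildup_result:
    #     statistics: {count: 120, avg_ms: 12.4, ...}
    #     buildup_analysis: {buildup_detected: false, trend: stable, ...}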

    # Save results to file
    output_data = {
        'metadata': {
            'test_id': test_id,
            'timestamp': timestamp.isoformat(),
            'serial_number': args.serial_number,
            'software_version': args.software_version,
            'comment': args.comment
        },
        'latency_buildup_result': results
    }

    output_file = test_output_dir / f"{test_id}_latency_buildup_results.yaml"
    with open(output_file, 'w') as f:
        yaml.dump(output_data, f, default_flow_style=False, sort_keys=False)

    print("\n" + "=" * 70)
    print("✅ Results saved to:")
    print(f"  YAML: {output_file}")
    if save_plots and len(results.get('latency_measurements', [])) > 1:
        print(f"  Graph: {test_output_dir}/latency_buildup_graph.png")
    print("=" * 70)


if __name__ == '__main__':
    main()