Compare commits

...

5 Commits

Author SHA1 Message Date
Pbopbo
0c7de92ae9 Refactoring to sort results in year/month/day forlder. 2026-04-08 11:10:12 +02:00
Pbopbo
dd118ddb23 Adds check if test is valid to latency test. 2026-04-08 10:27:22 +02:00
Pbopbo
31cc2c0e92 more plotting 2026-04-07 16:54:05 +02:00
Pbopbo
2cab55c8cd Plotting for write read time and avail logs. 2026-03-31 11:55:44 +02:00
Pbopbo
8d3a144614 Adds latency build up test. 2026-03-31 10:53:55 +02:00
9 changed files with 866 additions and 10 deletions

View File

@@ -38,3 +38,12 @@ artifact_detection:
energy_variation:
enabled: true
threshold_db: 6.0 # Energy change threshold in dB between consecutive windows (detects level changes)
latency:
max_std_dev_ms: 0.5 # Maximum allowed std deviation; test fails if exceeded
min_avg_ms: 1.0 # Minimum expected average latency; near-zero indicates bad loopback
latency_buildup:
measurement_interval: 10 # seconds between latency measurements
max_duration: null # maximum test duration in seconds (null = run until canceled)
buildup_threshold_percent: 5.0 # percentage change threshold for buildup detection

92
plot_alsa_status.py Normal file
View File

@@ -0,0 +1,92 @@
#!/usr/bin/env python3
"""Parse ALSA status log file and plot avail value over time."""
import sys
import re
import os
from datetime import datetime
import matplotlib.pyplot as plt
TIMESTAMP_RE = re.compile(r"^===== (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+) =====")
AVAIL_RE = re.compile(r"^avail\s*:\s*(\d+)")
def parse_log(log_path):
timestamps = []
avail_values = []
with open(log_path, "r") as f:
current_timestamp = None
for line in f:
line = line.strip()
# Check for timestamp line
ts_match = TIMESTAMP_RE.match(line)
if ts_match:
current_timestamp = datetime.strptime(ts_match.group(1), "%Y-%m-%d %H:%M:%S.%f")
continue
# Check for avail line (only if we have a timestamp)
if current_timestamp:
avail_match = AVAIL_RE.match(line)
if avail_match:
timestamps.append(current_timestamp)
avail_values.append(int(avail_match.group(1)))
current_timestamp = None # Reset until next timestamp
if not timestamps:
print("No valid timestamp/avail pairs found in the log file.", file=sys.stderr)
sys.exit(1)
# Convert to relative seconds from first timestamp
t0 = timestamps[0]
seconds = [(t - t0).total_seconds() for t in timestamps]
return seconds, avail_values
def plot(seconds, avail_values, out_path):
plt.figure(figsize=(12, 6))
plt.plot(seconds, avail_values, label="avail", linewidth=1, alpha=0.7)
# Add moving average (windowed mean)
if len(avail_values) >= 10: # Only if we have enough data points
window_size = min(50, len(avail_values) // 10) # Adaptive window size
import numpy as np
moving_avg = np.convolve(avail_values, np.ones(window_size)/window_size, mode='valid')
# Adjust timestamps for the moving average (they align with window centers)
ma_seconds = seconds[window_size-1:]
plt.plot(ma_seconds, moving_avg, label=f"moving mean (window={window_size})", linewidth=2)
plt.xlabel("Time (s)")
plt.ylabel("Available samples")
plt.title("ALSA Available Samples Over Time")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig(out_path, dpi=150)
print(f"Plot saved to {out_path}")
def main():
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} <path_to_alsa_status_log>", file=sys.stderr)
sys.exit(1)
log_path = sys.argv[1]
if not os.path.isfile(log_path):
print(f"File not found: {log_path}", file=sys.stderr)
sys.exit(1)
seconds, avail_values = parse_log(log_path)
log_dir = os.path.dirname(os.path.abspath(log_path))
log_base = os.path.splitext(os.path.basename(log_path))[0]
out_path = os.path.join(log_dir, f"{log_base}_avail_plot.png")
plot(seconds, avail_values, out_path)
if __name__ == "__main__":
main()

302
plot_combined.py Normal file
View File

@@ -0,0 +1,302 @@
#!/usr/bin/env python3
"""Combine ALSA avail, perf metrics, and latency plots into one figure."""
import sys
import re
import os
from datetime import datetime
import matplotlib.pyplot as plt
import numpy as np
# Regex patterns
TIMESTAMP_RE = re.compile(r"^===== (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+) =====")
AVAIL_RE = re.compile(r"^avail\s*:\s*(\d+)")
PERF_RE = re.compile(
r"^(\w+ \d+ \d+:\d+:\d+) .* Perf\(.*?\):"
r".*?sample mean=([\d.]+)ms"
r".*?write mean=([\d.]+)ms"
r".*?loop mean=([\d.]+)ms"
)
LATENCY_RE = re.compile(r"^(\w+ \d+ \d+:\d+:\d+).*latency.*?(\d+(?:\.\d+)?)ms")
PYALSA_AVAIL_BEFORE_RE = re.compile(r"^(\w+ \d+ \d+:\d+:\d+).*PyALSA: avail before read: (\d+)")
PYALSA_AVAIL_AFTER_RE = re.compile(r"^(\w+ \d+ \d+:\d+:\d+).*PyALSA: .* avail=(\d+)")
def parse_alsa_status(log_path):
timestamps = []
avail_values = []
with open(log_path, "r") as f:
current_timestamp = None
for line in f:
line = line.strip()
ts_match = TIMESTAMP_RE.match(line)
if ts_match:
current_timestamp = datetime.strptime(ts_match.group(1), "%Y-%m-%d %H:%M:%S.%f")
continue
if current_timestamp:
avail_match = AVAIL_RE.match(line)
if avail_match:
timestamps.append(current_timestamp)
avail_values.append(int(avail_match.group(1)))
current_timestamp = None
if not timestamps:
return [], []
t0 = timestamps[0]
seconds = [(t - t0).total_seconds() for t in timestamps]
return seconds, avail_values
def parse_perf_log(log_path):
timestamps = []
sample_means = []
write_means = []
loop_means = []
with open(log_path, "r") as f:
for line in f:
m = PERF_RE.search(line)
if m:
ts_str, sample, write, loop = m.groups()
ts = datetime.strptime(ts_str, "%b %d %H:%M:%S")
timestamps.append(ts)
sample_means.append(float(sample))
write_means.append(float(write))
loop_means.append(float(loop))
if not timestamps:
return [], [], [], []
t0 = timestamps[0]
seconds = [(t - t0).total_seconds() for t in timestamps]
return seconds, sample_means, write_means, loop_means
def parse_pyalsa_avail(perf_file):
"""Parse PyALSA avail before/after read from the perf log file."""
before_timestamps = []
before_values = []
after_timestamps = []
after_values = []
with open(perf_file, "r") as f:
for line in f:
line = line.strip()
# Check for "avail before read"
before_match = PYALSA_AVAIL_BEFORE_RE.match(line)
if before_match:
ts_str, avail = before_match.groups()
current_year = datetime.now().year
ts_with_year = f"{current_year} {ts_str}"
ts = datetime.strptime(ts_with_year, "%Y %b %d %H:%M:%S")
before_timestamps.append(ts)
before_values.append(int(avail))
continue
# Check for "avail=" (after read)
after_match = PYALSA_AVAIL_AFTER_RE.match(line)
if after_match:
ts_str, avail = after_match.groups()
current_year = datetime.now().year
ts_with_year = f"{current_year} {ts_str}"
ts = datetime.strptime(ts_with_year, "%Y %b %d %H:%M:%S")
after_timestamps.append(ts)
after_values.append(int(avail))
return before_timestamps, before_values, after_timestamps, after_values
def parse_latency_yaml(yaml_path):
import yaml
with open(yaml_path, 'r') as f:
data = yaml.safe_load(f)
latency_measurements = data.get('latency_buildup_result', {}).get('latency_measurements', [])
timestamps = []
latencies = []
for measurement in latency_measurements:
ts_str = measurement['timestamp']
latency = measurement['latency_ms']
# Parse ISO format timestamp
ts = datetime.fromisoformat(ts_str)
timestamps.append(ts)
latencies.append(float(latency))
if not timestamps:
return [], []
t0 = timestamps[0]
seconds = [(t - t0).total_seconds() for t in timestamps]
return seconds, latencies
def plot_combined(alsa_file, perf_file, latency_file, out_path):
# Parse all logs
alsa_seconds, avail_values = parse_alsa_status(alsa_file)
perf_seconds, sample_means, write_means, loop_means = parse_perf_log(perf_file)
latency_seconds, latencies = parse_latency_yaml(latency_file)
# Parse PyALSA avail data
before_timestamps, before_values, after_timestamps, after_values = parse_pyalsa_avail(perf_file)
# Get absolute timestamps for proper alignment
alsa_timestamps = []
perf_timestamps = []
latency_timestamps = []
# Re-parse to get absolute timestamps for alignment
with open(alsa_file, "r") as f:
current_timestamp = None
for line in f:
line = line.strip()
ts_match = TIMESTAMP_RE.match(line)
if ts_match:
current_timestamp = datetime.strptime(ts_match.group(1), "%Y-%m-%d %H:%M:%S.%f")
continue
if current_timestamp:
avail_match = AVAIL_RE.match(line)
if avail_match:
alsa_timestamps.append(current_timestamp)
current_timestamp = None
with open(perf_file, "r") as f:
for line in f:
m = PERF_RE.search(line)
if m:
ts_str = m.group(1)
# Add current year to the timestamp since it doesn't include year
current_year = datetime.now().year
ts_with_year = f"{current_year} {ts_str}"
ts = datetime.strptime(ts_with_year, "%Y %b %d %H:%M:%S")
perf_timestamps.append(ts)
import yaml
with open(latency_file, 'r') as f:
data = yaml.safe_load(f)
latency_measurements = data.get('latency_buildup_result', {}).get('latency_measurements', [])
for measurement in latency_measurements:
ts_str = measurement['timestamp']
ts = datetime.fromisoformat(ts_str)
latency_timestamps.append(ts)
# Find earliest timestamp
all_abs_timestamps = []
if alsa_timestamps:
all_abs_timestamps.extend(alsa_timestamps)
if perf_timestamps:
all_abs_timestamps.extend(perf_timestamps)
if latency_timestamps:
all_abs_timestamps.extend(latency_timestamps)
if before_timestamps:
all_abs_timestamps.extend(before_timestamps)
if after_timestamps:
all_abs_timestamps.extend(after_timestamps)
t0_absolute = min(all_abs_timestamps)
# Convert all times to seconds from earliest timestamp
alsa_aligned = [(ts - t0_absolute).total_seconds() for ts in alsa_timestamps] if alsa_timestamps else []
perf_aligned = [(ts - t0_absolute).total_seconds() for ts in perf_timestamps] if perf_timestamps else []
latency_aligned = [(ts - t0_absolute).total_seconds() for ts in latency_timestamps] if latency_timestamps else []
before_aligned = [(ts - t0_absolute).total_seconds() for ts in before_timestamps] if before_timestamps else []
after_aligned = [(ts - t0_absolute).total_seconds() for ts in after_timestamps] if after_timestamps else []
# Create figure with 4 subplots sharing x-axis
fig, (ax1, ax2, ax3, ax4) = plt.subplots(4, 1, figsize=(14, 12), sharex=True)
fig.suptitle("Combined Audio Performance Metrics", fontsize=16)
# Plot 1: ALSA avail
if alsa_aligned and avail_values:
ax1.plot(alsa_aligned, avail_values, label="avail", linewidth=1, alpha=0.7, color='blue')
if len(avail_values) >= 10:
window_size = min(50, len(avail_values) // 10)
moving_avg = np.convolve(avail_values, np.ones(window_size)/window_size, mode='valid')
ma_seconds = alsa_aligned[window_size-1:]
ax1.plot(ma_seconds, moving_avg, label=f"moving mean (window={window_size})",
linewidth=2, color='darkblue')
ax1.set_ylabel("Available samples")
ax1.set_title("ALSA Available Samples")
ax1.legend()
ax1.grid(True, alpha=0.3)
# Plot 2: Perf metrics
if perf_aligned:
ax2.plot(perf_aligned, sample_means, label="sample mean", linewidth=1, alpha=0.8, color='green')
ax2.plot(perf_aligned, write_means, label="write mean", linewidth=1, alpha=0.8, color='orange')
ax2.plot(perf_aligned, loop_means, label="loop mean", linewidth=1, alpha=0.8, color='red')
# Add moving average for loop mean
if len(loop_means) >= 10:
window_size = min(50, len(loop_means) // 10)
moving_avg = np.convolve(loop_means, np.ones(window_size)/window_size, mode='valid')
ma_seconds = perf_aligned[window_size-1:]
ax2.plot(ma_seconds, moving_avg, label=f"loop mean moving avg (window={window_size})",
linewidth=2, color='darkred', alpha=0.9)
ax2.set_ylabel("Duration (ms)")
ax2.set_title("Performance Metrics")
ax2.legend()
ax2.grid(True, alpha=0.3)
# Plot 3: Latency
if latency_aligned:
ax3.plot(latency_aligned, latencies, label="latency", linewidth=1, color='purple')
ax3.set_ylabel("Latency (ms)")
ax3.set_title("Latency Buildup")
ax3.legend()
ax3.grid(True, alpha=0.3)
# Plot 4: PyALSA avail before/after read
if before_aligned and before_values:
ax4.plot(before_aligned, before_values, label="avail before read", linewidth=1, alpha=0.7, color='cyan')
if after_aligned and after_values:
ax4.plot(after_aligned, after_values, label="avail after read", linewidth=1, alpha=0.7, color='magenta')
ax4.set_xlabel("Time (s)")
ax4.set_ylabel("Available samples")
ax4.set_title("PyALSA Available Samples (Before/After Read)")
if before_aligned or after_aligned:
ax4.legend()
ax4.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(out_path, dpi=150, bbox_inches='tight')
print(f"Combined plot saved to {out_path}")
# Show interactive plot
plt.show()
def main():
if len(sys.argv) != 4:
print(f"Usage: {sys.argv[0]} <alsa_status.log> <perf_log.log> <latency_results.yaml>", file=sys.stderr)
sys.exit(1)
alsa_file = sys.argv[1]
perf_file = sys.argv[2]
latency_file = sys.argv[3]
for file_path in [alsa_file, perf_file, latency_file]:
if not os.path.isfile(file_path):
print(f"File not found: {file_path}", file=sys.stderr)
sys.exit(1)
# Determine output path (same directory as first file)
log_dir = os.path.dirname(os.path.abspath(alsa_file))
out_path = os.path.join(log_dir, "combined_audio_plot.png")
plot_combined(alsa_file, perf_file, latency_file, out_path)
if __name__ == "__main__":
main()

81
plot_perf_log.py Normal file
View File

@@ -0,0 +1,81 @@
#!/usr/bin/env python3
"""Parse Perf lines from a log file and plot sample mean, write mean, and loop mean over time."""
import sys
import re
import os
from datetime import datetime
import matplotlib.pyplot as plt
PERF_RE = re.compile(
r"^(\w+ \d+ \d+:\d+:\d+) .* Perf\(.*?\):"
r".*?sample mean=([\d.]+)ms"
r".*?write mean=([\d.]+)ms"
r".*?loop mean=([\d.]+)ms"
)
def parse_log(log_path):
timestamps = []
sample_means = []
write_means = []
loop_means = []
with open(log_path, "r") as f:
for line in f:
m = PERF_RE.search(line)
if m:
ts_str, sample, write, loop = m.groups()
ts = datetime.strptime(ts_str, "%b %d %H:%M:%S")
timestamps.append(ts)
sample_means.append(float(sample))
write_means.append(float(write))
loop_means.append(float(loop))
if not timestamps:
print("No Perf lines found in the log file.", file=sys.stderr)
sys.exit(1)
t0 = timestamps[0]
seconds = [(t - t0).total_seconds() for t in timestamps]
return seconds, sample_means, write_means, loop_means
def plot(seconds, sample_means, write_means, loop_means, out_path):
plt.figure(figsize=(12, 6))
plt.plot(seconds, sample_means, label="sample mean (ms)")
plt.plot(seconds, write_means, label="write mean (ms)")
plt.plot(seconds, loop_means, label="loop mean (ms)")
plt.xlabel("Time (s)")
plt.ylabel("Duration (ms)")
plt.title("Perf Metrics Over Time")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig(out_path, dpi=150)
print(f"Plot saved to {out_path}")
def main():
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} <path_to_log_file>", file=sys.stderr)
sys.exit(1)
log_path = sys.argv[1]
if not os.path.isfile(log_path):
print(f"File not found: {log_path}", file=sys.stderr)
sys.exit(1)
seconds, sample_means, write_means, loop_means = parse_log(log_path)
log_dir = os.path.dirname(os.path.abspath(log_path))
log_base = os.path.splitext(os.path.basename(log_path))[0]
out_path = os.path.join(log_dir, f"{log_base}_perf_plot.png")
plot(seconds, sample_means, write_means, loop_means, out_path)
if __name__ == "__main__":
main()

View File

@@ -235,11 +235,22 @@ def run_latency_test(config: Dict, num_measurements: int = 5, save_plots: bool =
last_correlation = correlation
last_lags = lags
avg = float(np.mean(latencies))
std_dev = float(np.std(latencies))
latency_cfg = config.get('latency', {})
max_std_dev_ms = latency_cfg.get('max_std_dev_ms', None)
min_avg_ms = latency_cfg.get('min_avg_ms', None)
valid = True
if max_std_dev_ms is not None and std_dev > max_std_dev_ms:
valid = False
if min_avg_ms is not None and avg < min_avg_ms:
valid = False
latency_stats = {
'avg': float(np.mean(latencies)),
'avg': avg,
'min': float(np.min(latencies)),
'max': float(np.max(latencies)),
'std': float(np.std(latencies))
'std': std_dev,
'valid': valid
}
if save_plots and output_dir and last_recording is not None:

View File

@@ -36,10 +36,9 @@ def main():
test_id = timestamp.strftime('%Y%m%d_%H%M%S')
results_dir = Path(config['output']['results_dir'])
results_dir.mkdir(exist_ok=True)
test_output_dir = results_dir / f"{test_id}_artifact_detection"
test_output_dir.mkdir(exist_ok=True)
test_output_dir = results_dir / timestamp.strftime('%Y') / timestamp.strftime('%m') / timestamp.strftime('%d') / f"{test_id}_artifact_detection"
test_output_dir.mkdir(parents=True, exist_ok=True)
save_plots = config['output'].get('save_plots', False)

View File

@@ -26,10 +26,9 @@ def main():
test_id = timestamp.strftime('%Y%m%d_%H%M%S')
results_dir = Path(config['output']['results_dir'])
results_dir.mkdir(exist_ok=True)
test_output_dir = results_dir / f"{test_id}_latency"
test_output_dir.mkdir(exist_ok=True)
test_output_dir = results_dir / timestamp.strftime('%Y') / timestamp.strftime('%m') / timestamp.strftime('%d') / f"{test_id}_latency"
test_output_dir.mkdir(parents=True, exist_ok=True)
save_plots = config['output'].get('save_plots', False)
@@ -47,7 +46,9 @@ def main():
try:
latency_stats = run_latency_test(config, num_measurements=args.measurements,
save_plots=save_plots, output_dir=test_output_dir)
print(f"✓ Latency: avg={latency_stats['avg']:.3f}ms, "
valid = latency_stats.get('valid', True)
status = "PASS" if valid else "FAIL"
print(f"{'' if valid else ''} Latency [{status}]: avg={latency_stats['avg']:.3f}ms, "
f"min={latency_stats['min']:.3f}ms, max={latency_stats['max']:.3f}ms, "
f"std={latency_stats['std']:.3f}ms")
except Exception as e:

361
test_latency_buildup.py Executable file
View File

@@ -0,0 +1,361 @@
#!/usr/bin/env python3
import argparse
import yaml
import time
import signal
import sys
from datetime import datetime
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
sys.path.insert(0, str(Path(__file__).parent))
from src.audio_tests import run_latency_test, find_audio_device, generate_chirp, play_and_record, calculate_latency
class LatencyBuildupTest:
def __init__(self, config, measurement_interval=30, max_duration=None):
self.config = config
self.measurement_interval = measurement_interval
self.max_duration = max_duration
self.running = False
self.measurements = []
self.start_time = None
def signal_handler(self, signum, frame):
print(f"\n\n{'='*70}")
print("TEST STOPPED - Generating final results...")
print(f"{'='*70}")
self.running = False
def run_single_latency_measurement(self):
"""Run a single latency measurement and return the result"""
try:
# Use existing latency test function with 1 measurement for speed
latency_stats = run_latency_test(self.config, num_measurements=1,
save_plots=False, output_dir=None)
return latency_stats['avg']
except Exception as e:
print(f"❌ Error in latency measurement: {e}")
return None
def analyze_buildup(self, latencies, timestamps):
"""Analyze latency build-up and return analysis results"""
if len(latencies) < 2:
return {
'buildup_detected': False,
'start_latency': 0,
'end_latency': 0,
'change_ms': 0,
'change_percent': 0,
'trend': 'insufficient_data'
}
start_latency = latencies[0]
end_latency = latencies[-1]
change_ms = end_latency - start_latency
change_percent = (change_ms / start_latency) * 100 if start_latency > 0 else 0
# Determine if buildup occurred (±5% threshold)
buildup_detected = abs(change_percent) > 5.0
# Calculate trend using linear regression
if len(latencies) >= 3:
x = np.arange(len(latencies))
y = np.array(latencies)
slope = np.polyfit(x, y, 1)[0]
if slope > 0.01: # Positive trend
trend = 'increasing'
elif slope < -0.01: # Negative trend
trend = 'decreasing'
else:
trend = 'stable'
else:
trend = 'insufficient_data'
return {
'buildup_detected': buildup_detected,
'start_latency': start_latency,
'end_latency': end_latency,
'change_ms': change_ms,
'change_percent': change_percent,
'trend': trend
}
def plot_latency_buildup(self, timestamps, latencies, output_dir):
"""Create and save latency over time plot"""
fig, ax = plt.subplots(1, 1, figsize=(12, 6))
# Convert timestamps to relative time in seconds
relative_times = [(t - timestamps[0]).total_seconds() for t in timestamps]
# Plot latency measurements
ax.plot(relative_times, latencies, 'b-o', markersize=4, linewidth=2, label='Latency Measurements')
# Add trend line if we have enough data
if len(latencies) >= 3:
x = np.array(relative_times)
y = np.array(latencies)
z = np.polyfit(x, y, 1)
p = np.poly1d(z)
ax.plot(x, p(x), "r--", alpha=0.8, linewidth=2, label=f'Trend: {z[0]:.4f} ms/s')
# Add reference line for start latency
ax.axhline(y=latencies[0], color='g', linestyle=':', alpha=0.7,
label=f'Start: {latencies[0]:.3f} ms')
ax.set_xlabel('Time (seconds)', fontsize=12)
ax.set_ylabel('Latency (ms)', fontsize=12)
ax.set_title('Latency Build-up Over Time', fontsize=14, fontweight='bold')
ax.grid(True, alpha=0.3)
ax.legend()
# Format y-axis to show reasonable precision
ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f'{x:.3f}'))
plt.tight_layout()
plot_file = output_dir / 'latency_buildup_graph.png'
plt.savefig(plot_file, dpi=150, bbox_inches='tight')
plt.close()
return plot_file
def run_test(self):
"""Run the latency build-up test"""
self.running = True
self.start_time = datetime.now()
print(f"Starting latency build-up test at {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Measurement interval: {self.measurement_interval} seconds")
if self.max_duration:
print(f"Maximum duration: {self.max_duration} seconds")
print("Press Ctrl+C to stop the test early")
print("=" * 70)
# Set up signal handler for graceful shutdown
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
measurement_count = 0
while self.running:
current_time = datetime.now()
measurement_count += 1
print(f"\n[{current_time.strftime('%H:%M:%S')}] Measurement #{measurement_count}")
# Perform latency measurement
latency = self.run_single_latency_measurement()
if latency is not None:
self.measurements.append((current_time, latency))
timestamps, latencies = zip(*self.measurements)
# Calculate current statistics
avg_latency = np.mean(latencies)
min_latency = np.min(latencies)
max_latency = np.max(latencies)
std_latency = np.std(latencies)
print(f" Current latency: {latency:.3f} ms")
print(f" Average so far: {avg_latency:.3f} ms")
print(f" Range: {min_latency:.3f} - {max_latency:.3f} ms")
print(f" Std deviation: {std_latency:.3f} ms")
# Analyze for buildup
analysis = self.analyze_buildup(latencies, timestamps)
if analysis['buildup_detected']:
print(f" ⚠️ BUILDUP DETECTED: {analysis['change_percent']:+.2f}% "
f"({analysis['change_ms']:+.3f} ms)")
else:
print(f" ✅ No significant buildup: {analysis['change_percent']:+.2f}%")
print(f" Trend: {analysis['trend']}")
else:
print(f" ❌ Measurement failed")
# Check if we should continue
if self.max_duration:
elapsed = (current_time - self.start_time).total_seconds()
if elapsed >= self.max_duration:
print(f"\nMaximum duration of {self.max_duration} seconds reached")
break
if not self.running:
break
# Wait for next measurement with interruptible sleep
if self.running:
print(f" Waiting {self.measurement_interval} seconds...")
# Sleep in smaller chunks to allow quick interruption
sleep_chunk = 1.0 # Check every second
time_slept = 0
while self.running and time_slept < self.measurement_interval:
time.sleep(min(sleep_chunk, self.measurement_interval - time_slept))
time_slept += sleep_chunk
return self.generate_results()
def generate_results(self):
"""Generate final results and analysis"""
if not self.measurements:
return {'error': 'No measurements completed'}
timestamps, latencies = zip(*self.measurements)
end_time = datetime.now()
total_duration = (end_time - self.start_time).total_seconds()
# Final analysis
analysis = self.analyze_buildup(latencies, timestamps)
# Statistics
stats = {
'count': len(latencies),
'avg_ms': float(np.mean(latencies)),
'min_ms': float(np.min(latencies)),
'max_ms': float(np.max(latencies)),
'std_ms': float(np.std(latencies)),
'range_ms': float(np.max(latencies) - np.min(latencies))
}
results = {
'test_metadata': {
'start_time': self.start_time.isoformat(),
'end_time': end_time.isoformat(),
'total_duration_sec': total_duration,
'measurement_interval_sec': self.measurement_interval,
'total_measurements': len(latencies)
},
'latency_measurements': [
{
'timestamp': t.isoformat(),
'latency_ms': float(l)
}
for t, l in self.measurements
],
'statistics': stats,
'buildup_analysis': analysis
}
return results
def main():
parser = argparse.ArgumentParser(description='Run latency build-up test over time')
parser.add_argument('--serial-number', required=True, help='Serial number (e.g., SN001234)')
parser.add_argument('--software-version', required=True, help='Software version (git commit hash)')
parser.add_argument('--comment', default='', help='Comments about this test')
parser.add_argument('--config', default='config.yaml', help='Path to config file')
parser.add_argument('--interval', type=int, help='Measurement interval in seconds (default from config)')
parser.add_argument('--duration', type=int, help='Maximum test duration in seconds (default: run until canceled)')
args = parser.parse_args()
with open(args.config, 'r') as f:
config = yaml.safe_load(f)
# Use config values as defaults if not overridden by command line
measurement_interval = args.interval if args.interval else config['latency_buildup']['measurement_interval']
max_duration = args.duration if args.duration else config['latency_buildup'].get('max_duration')
timestamp = datetime.now()
test_id = timestamp.strftime('%Y%m%d_%H%M%S')
results_dir = Path(config['output']['results_dir'])
test_output_dir = results_dir / timestamp.strftime('%Y') / timestamp.strftime('%m') / timestamp.strftime('%d') / f"{test_id}_latency_buildup"
test_output_dir.mkdir(parents=True, exist_ok=True)
save_plots = config['output'].get('save_plots', False)
print("=" * 70)
print("LATENCY BUILD-UP TEST")
print("=" * 70)
print(f"Test ID: {test_id}")
print(f"Serial Number: {args.serial_number}")
print(f"Software: {args.software_version}")
if args.comment:
print(f"Comment: {args.comment}")
print(f"Measurement Interval: {measurement_interval} seconds")
if max_duration:
print(f"Maximum Duration: {max_duration} seconds")
else:
print("Duration: Run until canceled (Ctrl+C)")
if save_plots:
print(f"Plots will be saved to: {test_output_dir}")
print("-" * 70)
# Create and run test
test = LatencyBuildupTest(config, measurement_interval=measurement_interval, max_duration=max_duration)
results = test.run_test()
# Display final results
print("\n" + "=" * 70)
print("TEST COMPLETE - FINAL RESULTS")
print("=" * 70)
if 'error' in results:
print(f"❌ Test failed: {results['error']}")
else:
metadata = results['test_metadata']
stats = results['statistics']
analysis = results['buildup_analysis']
print(f"\n📊 Test Summary:")
print(f" Duration: {metadata['total_duration_sec']:.1f} seconds")
print(f" Measurements: {metadata['total_measurements']}")
print(f" Interval: {metadata['measurement_interval_sec']} seconds")
print(f"\n⏱️ Latency Statistics:")
print(f" Average: {stats['avg_ms']:.3f} ms")
print(f" Range: {stats['min_ms']:.3f} - {stats['max_ms']:.3f} ms")
print(f" Std Dev: {stats['std_ms']:.3f} ms")
print(f"\n📈 Build-up Analysis:")
print(f" Start Latency: {analysis['start_latency']:.3f} ms")
print(f" End Latency: {analysis['end_latency']:.3f} ms")
print(f" Change: {analysis['change_ms']:+.3f} ms ({analysis['change_percent']:+.2f}%)")
print(f" Trend: {analysis['trend']}")
if analysis['buildup_detected']:
print(f"\n⚠️ BUILDUP DETECTED!")
print(f" Latency changed by {abs(analysis['change_percent']):.2f}% (threshold: ±5%)")
else:
print(f"\n✅ No significant buildup detected")
print(f" Latency change within acceptable range (±5%)")
# Generate and save plot
if save_plots and len(results['latency_measurements']) > 1:
timestamps = [datetime.fromisoformat(m['timestamp']) for m in results['latency_measurements']]
latencies = [m['latency_ms'] for m in results['latency_measurements']]
plot_file = test.plot_latency_buildup(timestamps, latencies, test_output_dir)
print(f"\n📊 Latency graph saved to: {plot_file}")
# Save results to file
output_data = {
'metadata': {
'test_id': test_id,
'timestamp': timestamp.isoformat(),
'serial_number': args.serial_number,
'software_version': args.software_version,
'comment': args.comment
},
'latency_buildup_result': results
}
output_file = test_output_dir / f"{test_id}_latency_buildup_results.yaml"
with open(output_file, 'w') as f:
yaml.dump(output_data, f, default_flow_style=False, sort_keys=False)
print("\n" + "=" * 70)
print("✅ Results saved to:")
print(f" YAML: {output_file}")
if save_plots and len(results.get('latency_measurements', [])) > 1:
print(f" Graph: {test_output_dir}/latency_buildup_graph.png")
print("=" * 70)
if __name__ == '__main__':
main()

View File

@@ -88,7 +88,7 @@ def display_results(yaml_file: Path):
def list_all_results(results_dir: Path):
yaml_files = sorted(results_dir.glob("*_results.yaml"))
yaml_files = sorted(results_dir.rglob("*_results.yaml"))
if not yaml_files:
print("No test results found.")