import time
import ollama
from ollama import AsyncClient
from rich.console import Console
from rich.table import Table
import datetime
import signal
import sys
import asyncio


def nanosec_to_sec(nanosec):
    """Convert nanoseconds to seconds."""
    return nanosec / 1_000_000_000


def _tokens_per_second(token_count, duration_ns):
    """Compute tokens/second from a token count and a nanosecond duration.

    Returns 0 when the duration is missing or zero so callers never divide
    by zero on incomplete Ollama response metadata.
    """
    if duration_ns > 0:
        return token_count / nanosec_to_sec(duration_ns)
    return 0


async def generate_tokens_async(model_name, host_ip):
    """Run one chat generation against an Ollama host and collect throughput metrics.

    Args:
        model_name: Name of the Ollama model to query (e.g. "llama3.2:3b-instruct-q4_0").
        host_ip: IP/hostname of the Ollama server (port 11434 is assumed).

    Returns:
        On success, a dict with "success": True plus throughput metrics derived
        from the Ollama response metadata (prompt/response/total tokens per
        second, token counts, and durations in seconds).
        On failure, {"success": False, "error": <message>}.
    """
    # Define the prompt to generate tokens
    prompt = """Why is the sky blue? Give a comprehensive explanation."""

    # Create a client for the specific host
    client = AsyncClient(host=f"http://{host_ip}:11434")

    start_time = time.time()

    # Print a message to indicate we're waiting
    print(f" Generating tokens with {model_name} on {host_ip}...")

    try:
        # Make the request using the chat API
        response = await client.chat(
            model=model_name,
            messages=[
                {
                    'role': 'user',
                    'content': prompt,
                }
            ]
        )

        # If we get here, the request was successful
        end_time = time.time()
        print(f" Generation finished with {model_name} on {host_ip}")
        generation_time = end_time - start_time

        # Get the generated content from the response
        generated_content = response['message']['content']

        # Get accurate token metrics from the Ollama response metadata.
        # Durations are reported in nanoseconds.
        prompt_eval_count = response.get('prompt_eval_count', 0)
        prompt_eval_duration = response.get('prompt_eval_duration', 0)
        eval_count = response.get('eval_count', 0)
        eval_duration = response.get('eval_duration', 0)
        total_duration = response.get('total_duration', 0)

        # Calculate tokens per second using the Ollama metadata
        prompt_tokens_per_second = _tokens_per_second(prompt_eval_count, prompt_eval_duration)
        response_tokens_per_second = _tokens_per_second(eval_count, eval_duration)

        total_tokens = prompt_eval_count + eval_count
        if total_duration > 0:
            total_tokens_per_second = total_tokens / nanosec_to_sec(total_duration)
        elif generation_time > 0:
            # Fall back to wall-clock time if total_duration not provided
            total_tokens_per_second = total_tokens / generation_time
        else:
            # No usable duration at all — avoid a ZeroDivisionError
            total_tokens_per_second = 0

        return {
            "success": True,
            "total_tokens_per_second": total_tokens_per_second,
            "prompt_tokens_per_second": prompt_tokens_per_second,
            "response_tokens_per_second": response_tokens_per_second,
            "generation_time": generation_time,
            "content_length": len(generated_content),
            "prompt_eval_count": prompt_eval_count,
            "eval_count": eval_count,
            "total_duration_seconds": nanosec_to_sec(total_duration),
            "prompt_eval_duration_seconds": nanosec_to_sec(prompt_eval_duration),
            "eval_duration_seconds": nanosec_to_sec(eval_duration)
        }
    except Exception as e:
        error_msg = str(e)
        print(f" Request Error: {error_msg}")
        return {
            "success": False,
            "error": error_msg
        }


async def test_host(host, models, console):
    """Test models sequentially on a single host.

    Args:
        host: Host IP/hostname to benchmark.
        models: List of model names to run, one after another.
        console: rich Console used for progress output.

    Returns:
        List of {"host", "model", "result"} dicts, one per model.
    """
    results = []
    console.print(f"[bold cyan]Testing host: {host}[/bold cyan]")

    for model in models:
        console.print(f"[bold green]Testing model: {model} on {host}[/bold green]")

        # Run the test
        result = await generate_tokens_async(model, host)

        # Store the result
        results.append({
            "host": host,
            "model": model,
            "result": result
        })

        # Add a small delay between tests on the same host
        await asyncio.sleep(1)

        # Print a separator
        console.print("─" * 50)

    return results


def print_report(results):
    """Render a rich table of per-host/per-model results plus summary statistics."""
    console = Console()

    # Create a table
    table = Table(title=f"Ollama Performance Test Report - {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    # Add columns
    table.add_column("Host IP", style="cyan")
    table.add_column("Model", style="green")
    table.add_column("Total T/S", style="magenta")
    table.add_column("Prompt T/S", style="blue")
    table.add_column("Response T/S", style="yellow")
    table.add_column("Prompt Tokens", style="blue")
    table.add_column("Response Tokens", style="yellow")
    table.add_column("Status", style="red")

    # Add rows
    for result in results:
        host = result["host"]
        model = result["model"]

        if result["result"]["success"]:
            total_tps = f"{result['result'].get('total_tokens_per_second', 0):.2f}"
            prompt_tps = f"{result['result'].get('prompt_tokens_per_second', 0):.2f}"
            response_tps = f"{result['result'].get('response_tokens_per_second', 0):.2f}"
            prompt_tokens = str(result['result'].get('prompt_eval_count', 0))
            response_tokens = str(result['result'].get('eval_count', 0))
            status = "✅ Success"
        else:
            total_tps = "N/A"
            prompt_tps = "N/A"
            response_tps = "N/A"
            prompt_tokens = "N/A"
            response_tokens = "N/A"
            status = f"❌ Failed: {result['result']['error']}"

        table.add_row(host, model, total_tps, prompt_tps, response_tps,
                      prompt_tokens, response_tokens, status)

    # Print the table
    console.print(table)

    # Print summary statistics if there are successful results
    successful_results = [r for r in results if r["result"]["success"]]
    if successful_results:
        summary_table = Table(title="Summary Statistics")
        summary_table.add_column("Metric", style="cyan")
        summary_table.add_column("Value", style="green")

        avg_total_tps = sum(r["result"].get("total_tokens_per_second", 0)
                            for r in successful_results) / len(successful_results)
        avg_response_tps = sum(r["result"].get("response_tokens_per_second", 0)
                               for r in successful_results) / len(successful_results)

        # Find fastest by response token speed (usually what people care about most)
        fastest_host_model = max(successful_results,
                                 key=lambda x: x["result"].get("response_tokens_per_second", 0))
        slowest_host_model = min(successful_results,
                                 key=lambda x: x["result"].get("response_tokens_per_second", 0))

        summary_table.add_row("Average Total Tokens/Second", f"{avg_total_tps:.2f}")
        summary_table.add_row("Average Response Tokens/Second", f"{avg_response_tps:.2f}")
        summary_table.add_row("Fastest Response",
                              f"{fastest_host_model['host']} with {fastest_host_model['model']} " +
                              f"({fastest_host_model['result'].get('response_tokens_per_second', 0):.2f} tokens/s)")
        summary_table.add_row("Slowest Response",
                              f"{slowest_host_model['host']} with {slowest_host_model['model']} " +
                              f"({slowest_host_model['result'].get('response_tokens_per_second', 0):.2f} tokens/s)")

        console.print(summary_table)


async def main_async():
    """Fan out benchmarks to all hosts in parallel, then print a combined report."""
    # Define the test matrix
    test_matrix = {
        #"localhost": ["llama3.2:3b-instruct-q4_0"],
        "192.168.50.3": ["llama3.2:3b-instruct-q4_0", "llama3.1:8b-instruct-q4_0", "llama3.1:8b-instruct-q8_0"],
        "192.168.50.121": ["llama3.2:3b-instruct-q4_0", "llama3.1:8b-instruct-q4_0", "llama3.1:8b-instruct-q8_0"]
    }

    console = Console()

    try:
        # Create tasks to test each host in parallel
        tasks = []
        for host, models in test_matrix.items():
            # Create a task for each host (models will be tested sequentially within each host)
            task = asyncio.create_task(test_host(host, models, console))
            tasks.append(task)

        # Wait for all host tests to complete
        results_by_host = await asyncio.gather(*tasks)

        # Flatten the results from all hosts
        all_results = []
        for host_results in results_by_host:
            all_results.extend(host_results)

        # Print the overall report
        print_report(all_results)
    except KeyboardInterrupt:
        console.print("\n[bold red]Test interrupted by user.[/bold red]")
        # We can't easily get partial results when interrupted in async mode
    except Exception as e:
        console.print(f"[bold red]Error during testing: {str(e)}[/bold red]")


def main():
    # Run the async main function
    asyncio.run(main_async())


if __name__ == "__main__":
    main()