def nanosec_to_sec(nanosec):
    """Convert a duration in nanoseconds (as reported by Ollama) to seconds."""
    return nanosec / 1_000_000_000


async def generate_tokens_async(model_name, host_ip, timeout=None):
    """Benchmark a single model on a single Ollama host.

    Sends one chat request and derives throughput from the token-count /
    duration metadata Ollama returns alongside the response (durations are
    in nanoseconds).

    Args:
        model_name: Name of the Ollama model to benchmark.
        host_ip: IP or hostname of the Ollama server (port 11434 assumed).
        timeout: Optional per-request timeout in seconds; None waits forever.

    Returns:
        On success, a dict with "success": True plus throughput metrics
        (total/prompt/response tokens per second, token counts, durations).
        On any failure, {"success": False, "error": <message>}.
    """
    # Fixed prompt so every host/model combination runs identical work.
    prompt = """Why is the sky blue? 
Give a comprehensive explanation."""

    start_time = time.time()

    # Progress message so interleaved parallel host runs stay traceable.
    print(f" Generating tokens with {model_name} on {host_ip}...")

    try:
        # Client construction lives inside the try-block so that a malformed
        # host string is reported as a failed result instead of crashing the
        # whole benchmark run.
        client = AsyncClient(host=f"http://{host_ip}:11434")

        request = client.chat(
            model=model_name,
            messages=[
                {
                    'role': 'user',
                    'content': prompt,
                }
            ],
        )
        # Honor the optional timeout; asyncio.TimeoutError falls through to
        # the generic handler below and is reported as a failure.
        if timeout is not None:
            response = await asyncio.wait_for(request, timeout)
        else:
            response = await request

        end_time = time.time()
        print(f" Generation finished with {model_name} on {host_ip}")
        generation_time = end_time - start_time

        generated_content = response['message']['content']

        # Accurate token metrics from the Ollama response metadata.
        prompt_eval_count = response.get('prompt_eval_count', 0)
        prompt_eval_duration = response.get('prompt_eval_duration', 0)
        eval_count = response.get('eval_count', 0)
        eval_duration = response.get('eval_duration', 0)
        total_duration = response.get('total_duration', 0)

        # Tokens/second from the server-reported durations; guard every
        # division because missing metadata comes back as 0.
        if prompt_eval_duration > 0:
            prompt_tokens_per_second = prompt_eval_count / nanosec_to_sec(prompt_eval_duration)
        else:
            prompt_tokens_per_second = 0

        if eval_duration > 0:
            response_tokens_per_second = eval_count / nanosec_to_sec(eval_duration)
        else:
            response_tokens_per_second = 0

        total_tokens = prompt_eval_count + eval_count
        if total_duration > 0:
            total_tokens_per_second = total_tokens / nanosec_to_sec(total_duration)
        elif generation_time > 0:
            # Fall back to wall-clock time if total_duration is not provided.
            total_tokens_per_second = total_tokens / generation_time
        else:
            total_tokens_per_second = 0

        return {
            "success": True,
            "total_tokens_per_second": total_tokens_per_second,
            "prompt_tokens_per_second": prompt_tokens_per_second,
            "response_tokens_per_second": response_tokens_per_second,
            "generation_time": generation_time,
            "content_length": len(generated_content),
            "prompt_eval_count": prompt_eval_count,
            "eval_count": eval_count,
            "total_duration_seconds": nanosec_to_sec(total_duration),
            "prompt_eval_duration_seconds": nanosec_to_sec(prompt_eval_duration),
            "eval_duration_seconds": nanosec_to_sec(eval_duration),
        }

    except Exception as e:
        # Any failure (connection, timeout, bad model name, ...) becomes a
        # structured failure entry so the report can still be printed.
        error_msg = str(e)
        print(f" Request Error: {error_msg}")
        return {
            "success": False,
            "error": error_msg,
        }
result }) - # Add a small delay between tests on same host + # Add a small delay between tests on the same host await asyncio.sleep(1) + + # Print a separator + console.print("─" * 50) return results @@ -87,9 +132,11 @@ def print_report(results): # Add columns table.add_column("Host IP", style="cyan") table.add_column("Model", style="green") - table.add_column("Tokens/Second", style="magenta") - table.add_column("Generation Time (s)", style="yellow") - table.add_column("Content Length", style="blue") + table.add_column("Total T/S", style="magenta") + table.add_column("Prompt T/S", style="blue") + table.add_column("Response T/S", style="yellow") + table.add_column("Prompt Tokens", style="blue") + table.add_column("Response Tokens", style="yellow") table.add_column("Status", style="red") # Add rows @@ -98,17 +145,21 @@ def print_report(results): model = result["model"] if result["result"]["success"]: - tokens_per_second = f"{result['result']['tokens_per_second']:.2f}" - generation_time = f"{result['result']['generation_time']:.2f}" - content_length = str(result['result']['content_length']) + total_tps = f"{result['result'].get('total_tokens_per_second', 0):.2f}" + prompt_tps = f"{result['result'].get('prompt_tokens_per_second', 0):.2f}" + response_tps = f"{result['result'].get('response_tokens_per_second', 0):.2f}" + prompt_tokens = str(result['result'].get('prompt_eval_count', 0)) + response_tokens = str(result['result'].get('eval_count', 0)) status = "✅ Success" else: - tokens_per_second = "N/A" - generation_time = "N/A" - content_length = "N/A" + total_tps = "N/A" + prompt_tps = "N/A" + response_tps = "N/A" + prompt_tokens = "N/A" + response_tokens = "N/A" status = f"❌ Failed: {result['result']['error']}" - table.add_row(host, model, tokens_per_second, generation_time, content_length, status) + table.add_row(host, model, total_tps, prompt_tps, response_tps, prompt_tokens, response_tokens, status) # Print the table console.print(table) @@ -120,44 +171,59 @@ def 
async def main_async():
    """Run the benchmark across every host in the test matrix and report."""
    # Hosts to test, each mapped to the list of models to benchmark on it.
    test_matrix = {
        #"localhost": ["llama3.2:3b-instruct-q4_0"],
        "192.168.50.3": ["llama3.2:3b-instruct-q4_0", "llama3.1:8b-instruct-q4_0", "llama3.1:8b-instruct-q8_0"],
        "192.168.50.121": ["llama3.2:3b-instruct-q4_0", "llama3.1:8b-instruct-q4_0", "llama3.1:8b-instruct-q8_0"]
    }

    console = Console()

    try:
        # One task per host: hosts are exercised in parallel, while the
        # models within a single host run sequentially inside test_host().
        tasks = [
            asyncio.create_task(test_host(host, models, console))
            for host, models in test_matrix.items()
        ]

        # Wait for every host to finish its full model list.
        results_by_host = await asyncio.gather(*tasks)

        # Flatten the per-host result lists into one list for the report.
        all_results = [
            entry
            for host_results in results_by_host
            for entry in host_results
        ]

        print_report(all_results)

    except KeyboardInterrupt:
        # Partial results are not easily recoverable once gather() tears down.
        console.print("\n[bold red]Test interrupted by user.[/bold red]")

    except Exception as e:
        console.print(f"[bold red]Error during testing: {str(e)}[/bold red]")