use asyncio

2025-03-14 18:40:23 +01:00
parent 197512a6bc
commit 5000a780d7


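In short: the generation call moves from the synchronous ollama.generate(), wrapped in a ThreadPoolExecutor, to the ollama package's native AsyncClient.chat(), and throughput is now computed from the token counts and nanosecond durations Ollama returns rather than estimated from character counts. A minimal sketch of the new request path (not part of the commit; the host and model values are taken from the test matrix below):

import asyncio
from ollama import AsyncClient

async def probe(host_ip: str, model: str) -> None:
    # One chat request against a remote Ollama daemon (default port 11434)
    client = AsyncClient(host=f"http://{host_ip}:11434")
    response = await client.chat(
        model=model,
        messages=[{"role": "user", "content": "Why is the sky blue?"}],
    )
    # Ollama returns the message plus token counts and durations in nanoseconds
    print(response["message"]["content"][:80])
    print(response.get("eval_count"), response.get("eval_duration"))

# Example: asyncio.run(probe("192.168.50.3", "llama3.2:3b-instruct-q4_0"))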
@@ -1,67 +1,109 @@
 import time
 import ollama
-import base64
-import tabulate
+from ollama import AsyncClient
 from rich.console import Console
 from rich.table import Table
 import datetime
-import signal
-import sys
 import asyncio
-import concurrent.futures
+
+def nanosec_to_sec(nanosec):
+    """Convert nanoseconds to seconds"""
+    return nanosec / 1_000_000_000
 
 async def generate_tokens_async(model_name, host_ip):
     # Define the prompt to generate tokens
-    prompt = """Generate 1000 random words with no spaces, each word should be between 3-5 letters long. Separate them with line breaks.\n\n"""
-    # Set the host for ollama client
-    ollama.host = f"http://{host_ip}:11434"
-    # Execute the request with timeout
+    prompt = """Why is the sky blue? Give a comprehensive explanation."""
+    # Create a client for the specific host
+    client = AsyncClient(host=f"http://{host_ip}:11434")
+    start_time = time.time()
+    # Print a message to indicate we're waiting
+    print(f"  Generating tokens with {model_name} on {host_ip}...")
     try:
-        # Start timing the generation
-        start_time = time.time()
-        # Use ollama client to generate response - run in a thread pool to avoid blocking
-        loop = asyncio.get_event_loop()
-        with concurrent.futures.ThreadPoolExecutor() as pool:
-            response = await loop.run_in_executor(
-                pool,
-                lambda: ollama.generate(model=model_name, prompt=prompt, stream=False)
-            )
-        # Calculate the time taken
+        # Make the request using the chat API
+        response = await client.chat(
+            model=model_name,
+            messages=[
+                {
+                    'role': 'user',
+                    'content': prompt,
+                }
+            ]
+        )
+        # If we get here, the request was successful
         end_time = time.time()
+        print(f"  Generation finished with {model_name} on {host_ip}")
         generation_time = end_time - start_time
-        # Get the generated content
-        generated_content = response['response']
-        # Estimate the number of tokens in the response
-        # Rough estimate: 1 token is approximately 4 characters for English text
-        estimated_tokens = len(generated_content) / 4
-        # Calculate tokens per second
-        tokens_per_second = estimated_tokens / generation_time
+        # Get the generated content from the response
+        generated_content = response['message']['content']
+        # Get accurate token metrics from the Ollama response metadata
+        prompt_eval_count = response.get('prompt_eval_count', 0)
+        prompt_eval_duration = response.get('prompt_eval_duration', 0)
+        eval_count = response.get('eval_count', 0)
+        eval_duration = response.get('eval_duration', 0)
+        total_duration = response.get('total_duration', 0)
+        # Calculate tokens per second using the Ollama metadata
+        if prompt_eval_duration > 0:
+            prompt_tokens_per_second = prompt_eval_count / nanosec_to_sec(prompt_eval_duration)
+        else:
+            prompt_tokens_per_second = 0
+        if eval_duration > 0:
+            response_tokens_per_second = eval_count / nanosec_to_sec(eval_duration)
+        else:
+            response_tokens_per_second = 0
+        if total_duration > 0:
+            total_tokens = prompt_eval_count + eval_count
+            total_tokens_per_second = total_tokens / nanosec_to_sec(total_duration)
+        else:
+            # Fall back to wall-clock time if total_duration not provided
+            total_tokens = prompt_eval_count + eval_count
+            total_tokens_per_second = total_tokens / generation_time
         return {
             "success": True,
-            "tokens_per_second": tokens_per_second,
+            "total_tokens_per_second": total_tokens_per_second,
+            "prompt_tokens_per_second": prompt_tokens_per_second,
+            "response_tokens_per_second": response_tokens_per_second,
             "generation_time": generation_time,
             "content_length": len(generated_content),
-            "estimated_tokens": estimated_tokens
+            "prompt_eval_count": prompt_eval_count,
+            "eval_count": eval_count,
+            "total_duration_seconds": nanosec_to_sec(total_duration),
+            "prompt_eval_duration_seconds": nanosec_to_sec(prompt_eval_duration),
+            "eval_duration_seconds": nanosec_to_sec(eval_duration)
         }
     except Exception as e:
-        print(f"Error for {model_name} on {host_ip}: {str(e)}")
+        error_msg = str(e)
+        print(f"  Request Error: {error_msg}")
         return {
             "success": False,
-            "error": str(e)
+            "error": error_msg
         }
 
-async def test_host(host, models):
-    """Process all models for a single host sequentially"""
+async def test_host(host, models, console):
+    """Test models sequentially on a single host"""
     results = []
+    console.print(f"[bold cyan]Testing host: {host}[/bold cyan]")
     for model in models:
-        print(f"Testing model: {model} on host: {host}")
+        console.print(f"[bold green]Testing model: {model} on {host}[/bold green]")
         # Run the test
         result = await generate_tokens_async(model, host)
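Note on the new metrics: Ollama reports prompt_eval_count and eval_count (tokens) alongside prompt_eval_duration, eval_duration, and total_duration (nanoseconds), so each tokens-per-second figure is a count divided by a duration converted to seconds. A toy calculation with made-up numbers:

def nanosec_to_sec(nanosec):
    return nanosec / 1_000_000_000

# Made-up example values, only to illustrate the units
eval_count = 250                 # response tokens generated
eval_duration = 5_000_000_000    # 5 seconds, reported in nanoseconds

response_tps = eval_count / nanosec_to_sec(eval_duration)
print(response_tps)  # 50.0 tokens/second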
@@ -73,9 +115,12 @@ async def test_host(host, models):
             "result": result
         })
 
-        # Add a small delay between tests on same host
+        # Add a small delay between tests on the same host
         await asyncio.sleep(1)
 
+    # Print a separator
+    console.print("-" * 50)
+
     return results
 
 def print_report(results):
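The scheduling choice used here and in main_async() below: one task per host so hosts run concurrently under asyncio.gather, while the loop inside test_host keeps the models on a single host sequential. A stripped-down sketch of that pattern, with hypothetical names and no Ollama calls:

import asyncio

async def bench_host(host: str, models: list[str]) -> list[str]:
    done = []
    for model in models:          # models on one host run one after another
        await asyncio.sleep(0.1)  # stand-in for the real benchmark request
        done.append(f"{host}/{model}")
    return done

async def bench_all(matrix: dict[str, list[str]]) -> list[str]:
    tasks = [asyncio.create_task(bench_host(h, ms)) for h, ms in matrix.items()]
    per_host = await asyncio.gather(*tasks)    # hosts run in parallel
    return [r for rs in per_host for r in rs]  # flatten, as the script does

# asyncio.run(bench_all({"hostA": ["m1", "m2"], "hostB": ["m1"]}))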
@@ -87,9 +132,11 @@ def print_report(results):
     # Add columns
     table.add_column("Host IP", style="cyan")
     table.add_column("Model", style="green")
-    table.add_column("Tokens/Second", style="magenta")
-    table.add_column("Generation Time (s)", style="yellow")
-    table.add_column("Content Length", style="blue")
+    table.add_column("Total T/S", style="magenta")
+    table.add_column("Prompt T/S", style="blue")
+    table.add_column("Response T/S", style="yellow")
+    table.add_column("Prompt Tokens", style="blue")
+    table.add_column("Response Tokens", style="yellow")
     table.add_column("Status", style="red")
 
     # Add rows
@@ -98,17 +145,21 @@ def print_report(results):
         model = result["model"]
 
         if result["result"]["success"]:
-            tokens_per_second = f"{result['result']['tokens_per_second']:.2f}"
-            generation_time = f"{result['result']['generation_time']:.2f}"
-            content_length = str(result['result']['content_length'])
+            total_tps = f"{result['result'].get('total_tokens_per_second', 0):.2f}"
+            prompt_tps = f"{result['result'].get('prompt_tokens_per_second', 0):.2f}"
+            response_tps = f"{result['result'].get('response_tokens_per_second', 0):.2f}"
+            prompt_tokens = str(result['result'].get('prompt_eval_count', 0))
+            response_tokens = str(result['result'].get('eval_count', 0))
             status = "✅ Success"
         else:
-            tokens_per_second = "N/A"
-            generation_time = "N/A"
-            content_length = "N/A"
+            total_tps = "N/A"
+            prompt_tps = "N/A"
+            response_tps = "N/A"
+            prompt_tokens = "N/A"
+            response_tokens = "N/A"
             status = f"❌ Failed: {result['result']['error']}"
 
-        table.add_row(host, model, tokens_per_second, generation_time, content_length, status)
+        table.add_row(host, model, total_tps, prompt_tps, response_tps, prompt_tokens, response_tokens, status)
 
     # Print the table
     console.print(table)
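The table code reads each entry defensively with .get(..., 0), so a result only needs the fields it actually produced. For reference, a fabricated entry in the shape print_report() expects (all numbers are placeholders, not measurements):

sample = {
    "host": "192.168.50.3",
    "model": "llama3.2:3b-instruct-q4_0",
    "result": {
        "success": True,
        "total_tokens_per_second": 62.4,      # placeholder values only
        "prompt_tokens_per_second": 410.0,
        "response_tokens_per_second": 48.7,
        "prompt_eval_count": 31,
        "eval_count": 512,
    },
}
# print_report([sample]) would render one table row plus the summary table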
@@ -120,44 +171,59 @@ def print_report(results):
     summary_table.add_column("Metric", style="cyan")
     summary_table.add_column("Value", style="green")
 
-    avg_tokens_per_second = sum(r["result"]["tokens_per_second"] for r in successful_results) / len(successful_results)
-    fastest_host_model = max(successful_results, key=lambda x: x["result"]["tokens_per_second"])
-    slowest_host_model = min(successful_results, key=lambda x: x["result"]["tokens_per_second"])
+    avg_total_tps = sum(r["result"].get("total_tokens_per_second", 0) for r in successful_results) / len(successful_results)
+    avg_response_tps = sum(r["result"].get("response_tokens_per_second", 0) for r in successful_results) / len(successful_results)
 
-    summary_table.add_row("Average Tokens/Second", f"{avg_tokens_per_second:.2f}")
-    summary_table.add_row("Fastest Configuration",
+    # Find fastest by response token speed (usually what people care about most)
+    fastest_host_model = max(successful_results, key=lambda x: x["result"].get("response_tokens_per_second", 0))
+    slowest_host_model = min(successful_results, key=lambda x: x["result"].get("response_tokens_per_second", 0))
+
+    summary_table.add_row("Average Total Tokens/Second", f"{avg_total_tps:.2f}")
+    summary_table.add_row("Average Response Tokens/Second", f"{avg_response_tps:.2f}")
+    summary_table.add_row("Fastest Response",
                           f"{fastest_host_model['host']} with {fastest_host_model['model']} " +
-                          f"({fastest_host_model['result']['tokens_per_second']:.2f} tokens/s)")
-    summary_table.add_row("Slowest Configuration",
+                          f"({fastest_host_model['result'].get('response_tokens_per_second', 0):.2f} tokens/s)")
+    summary_table.add_row("Slowest Response",
                           f"{slowest_host_model['host']} with {slowest_host_model['model']} " +
-                          f"({slowest_host_model['result']['tokens_per_second']:.2f} tokens/s)")
+                          f"({slowest_host_model['result'].get('response_tokens_per_second', 0):.2f} tokens/s)")
 
     console.print(summary_table)
 
 async def main_async():
     # Define the test matrix
     test_matrix = {
-        "localhost": ["llama3.2:3b-instruct-q4_0"],
+        #"localhost": ["llama3.2:3b-instruct-q4_0"],
         "192.168.50.3": ["llama3.2:3b-instruct-q4_0", "llama3.1:8b-instruct-q4_0", "llama3.1:8b-instruct-q8_0"],
         "192.168.50.121": ["llama3.2:3b-instruct-q4_0", "llama3.1:8b-instruct-q4_0", "llama3.1:8b-instruct-q8_0"]
     }
 
-    # Create tasks to test each host in parallel
-    tasks = []
-    for host, models in test_matrix.items():
-        task = asyncio.create_task(test_host(host, models))
-        tasks.append(task)
-
-    # Wait for all tasks to complete
-    host_results = await asyncio.gather(*tasks)
-
-    # Flatten results
-    all_results = []
-    for result_list in host_results:
-        all_results.extend(result_list)
-
-    # Print the report
-    print_report(all_results)
+    console = Console()
+
+    try:
+        # Create tasks to test each host in parallel
+        tasks = []
+        for host, models in test_matrix.items():
+            # Create a task for each host (models will be tested sequentially within each host)
+            task = asyncio.create_task(test_host(host, models, console))
+            tasks.append(task)
+
+        # Wait for all host tests to complete
+        results_by_host = await asyncio.gather(*tasks)
+
+        # Flatten the results from all hosts
+        all_results = []
+        for host_results in results_by_host:
+            all_results.extend(host_results)
+
+        # Print the overall report
+        print_report(all_results)
+    except KeyboardInterrupt:
+        console.print("\n[bold red]Test interrupted by user.[/bold red]")
+        # We can't easily get partial results when interrupted in async mode
+    except Exception as e:
+        console.print(f"[bold red]Error during testing: {str(e)}[/bold red]")
 
 def main():
     # Run the async main function
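The hunk ends inside main(); its body is not shown here, but judging by the comment it presumably just drives main_async(), roughly like this sketch (an assumption, not the commit's actual code):

def main():
    # Assumed wiring; the real body is outside the shown hunks
    asyncio.run(main_async())

if __name__ == "__main__":
    main()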