Overview
LiteAgent supports parallel execution to significantly reduce testing time and improve throughput. You can run multiple agents simultaneously, scale individual agents, or distribute tests across different categories.Parallel Execution Methods
1. Docker Compose Scaling
The easiest way to run tests in parallel:Copy
# Scale single agent to multiple instances
docker compose up --scale browseruse=5
# Scale multiple agents
docker compose up --scale browseruse=3 --scale agente=2 --scale multion=1
# Background parallel execution
docker compose up -d --scale browseruse=10
2. Shell Background Processes
For local development:Copy
# Run agents in parallel with background processes
./run.sh browseruse --category test1 &
./run.sh agente --category test2 &
./run.sh multion --category test3 &
# Wait for all to complete
wait
echo "All tests completed"
3. Python Multiprocessing
For programmatic control:Copy
import multiprocessing
import subprocess
def run_agent(agent, category):
"""Run a single agent test."""
cmd = f"./run.sh {agent} --category {category}"
return subprocess.run(cmd, shell=True, capture_output=True)
# Run multiple agents in parallel
with multiprocessing.Pool(processes=4) as pool:
tasks = [
("browseruse", "benchmark"),
("agente", "security"),
("multion", "ecommerce"),
("skyvern", "visual")
]
results = pool.starmap(run_agent, tasks)
Docker Compose Parallel Configuration
Replica-Based Scaling
Copy
# docker-compose.yml
services:
browseruse:
build:
context: .
dockerfile: Dockerfile.browseruse
volumes:
- ./data/db:/app/data/db
- ./data/prompts:/app/data/prompts
deploy:
replicas: 5 # Always run 5 instances
environment:
- WORKER_ID=${WORKER_ID}
Dynamic Scaling
Copy
# Scale services dynamically
docker compose up browseruse # Start 1 instance
docker compose scale browseruse=10 # Scale to 10 instances
docker compose scale browseruse=5 # Scale down to 5
Multiple Agent Types
Copy
# docker-compose.parallel.yml
services:
browseruse-team:
extends: browseruse
deploy:
replicas: 5
command: ["bash", "-c", "./run.sh browseruse --category browser_tests"]
agente-team:
extends: agente
deploy:
replicas: 3
command: ["bash", "-c", "./run.sh agente --category reasoning_tests"]
multion-team:
extends: multion
deploy:
replicas: 2
command: ["bash", "-c", "./run.sh multion --category visual_tests"]
Advanced Parallel Strategies
Category-Based Distribution
Copy
#!/bin/bash
# parallel_categories.sh
categories=("benchmark" "dark_patterns" "security" "performance" "regression")
agents=("browseruse" "agente" "multion")
for category in "${categories[@]}"; do
for agent in "${agents[@]}"; do
echo "Starting $agent with $category"
docker compose run -d $agent bash -c "./run.sh $agent --category $category" &
done
done
wait
echo "All category tests completed"
Prompt File Distribution
Copy
#!/usr/bin/env python3
# distribute_prompts.py
import os
import glob
import subprocess
import multiprocessing
from pathlib import Path
def run_single_prompt(agent, prompt_file):
"""Run agent on a single prompt file."""
site_line = ""
task_lines = []
with open(prompt_file, 'r') as f:
lines = f.readlines()
site_line = lines[0].strip()
task_lines = [line.strip() for line in lines[1:] if line.strip()]
task = " ".join(task_lines)
cmd = [
"python", "main.py", agent,
"--site", site_line,
"--task", task,
"--timeout", "300"
]
return subprocess.run(cmd, capture_output=True, text=True)
def parallel_prompt_execution(agent, prompt_dir, max_workers=4):
"""Execute all prompts in parallel."""
prompt_files = glob.glob(f"{prompt_dir}/*.txt")
with multiprocessing.Pool(max_workers) as pool:
tasks = [(agent, pf) for pf in prompt_files]
results = pool.starmap(run_single_prompt, tasks)
return results
if __name__ == "__main__":
# Run BrowserUse on all benchmark prompts in parallel
results = parallel_prompt_execution("browseruse", "data/prompts/benchmark", max_workers=6)
success_count = sum(1 for r in results if r.returncode == 0)
print(f"Completed {success_count}/{len(results)} tests successfully")
Resource Management
CPU and Memory Allocation
Copy
# Resource-aware parallel execution
services:
browseruse:
deploy:
replicas: 4
resources:
limits:
memory: 3G
cpus: '1.5'
reservations:
memory: 2G
cpus: '1.0'
Calculating Optimal Parallelism
Copy
#!/bin/bash
# calculate_optimal_workers.sh
# Get system resources
TOTAL_RAM=$(free -g | awk '/^Mem:/{print $2}')
TOTAL_CPU=$(nproc)
# Resource requirements per agent
RAM_PER_AGENT=2 # GB
CPU_PER_AGENT=1
# Calculate optimal workers
MAX_BY_RAM=$((TOTAL_RAM / RAM_PER_AGENT))
MAX_BY_CPU=$((TOTAL_CPU / CPU_PER_AGENT))
OPTIMAL_WORKERS=$(( MAX_BY_RAM < MAX_BY_CPU ? MAX_BY_RAM : MAX_BY_CPU ))
# Leave some resources for system
OPTIMAL_WORKERS=$((OPTIMAL_WORKERS - 1))
echo "System: ${TOTAL_RAM}GB RAM, ${TOTAL_CPU} CPUs"
echo "Optimal parallel workers: ${OPTIMAL_WORKERS}"
# Run with optimal scaling
docker compose up --scale browseruse=$OPTIMAL_WORKERS
Resource Monitoring
Copy
#!/bin/bash
# monitor_parallel_execution.sh
# Monitor resource usage during parallel execution
while true; do
echo "=== $(date) ==="
# Docker stats
docker stats --no-stream --format 'table {{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}'
# System resources
echo "System CPU: $(top -bn1 | grep 'Cpu(s)' | awk '{print $2}' | cut -d'%' -f1)%"
echo "System RAM: $(free | grep Mem | awk '{printf("%.1f%%", $3/$2 * 100.0)}')"
echo "---"
sleep 30
done
Load Balancing Strategies
Round-Robin Category Assignment
Copy
# load_balancer.py
import itertools
import subprocess
def round_robin_execution(agents, categories):
"""Distribute categories across agents in round-robin fashion."""
agent_cycle = itertools.cycle(agents)
assignments = []
for category in categories:
agent = next(agent_cycle)
assignments.append((agent, category))
# Execute in parallel
processes = []
for agent, category in assignments:
cmd = f"docker compose run -d {agent} bash -c './run.sh {agent} --category {category}'"
proc = subprocess.Popen(cmd, shell=True)
processes.append(proc)
# Wait for completion
for proc in processes:
proc.wait()
return assignments
# Usage
agents = ["browseruse", "agente", "multion"]
categories = ["test1", "test2", "test3", "test4", "test5", "test6"]
assignments = round_robin_execution(agents, categories)
Workload-Based Distribution
Copy
def workload_based_distribution(prompt_categories):
"""Distribute workload based on category complexity."""
# Define workload weights (higher = more complex)
weights = {
"simple_nav": 1,
"form_filling": 2,
"ecommerce": 3,
"dark_patterns": 4,
"complex_workflows": 5
}
# Assign agents based on workload
assignments = []
for category in prompt_categories:
weight = weights.get(category, 3) # Default to medium
if weight <= 2:
agent = "browseruse" # Fast agent for simple tasks
elif weight <= 3:
agent = "multion" # Balanced agent
else:
agent = "agente" # Powerful agent for complex tasks
assignments.append((agent, category))
return assignments
Coordination and Synchronization
Shared State Management
Copy
# shared_state.py
import redis
import json
from datetime import datetime
class TestCoordinator:
def __init__(self, redis_host='localhost', redis_port=6379):
self.redis = redis.Redis(host=redis_host, port=redis_port)
def register_worker(self, worker_id, agent_type):
"""Register a worker with the coordinator."""
worker_info = {
'id': worker_id,
'agent': agent_type,
'started': datetime.now().isoformat(),
'status': 'running'
}
self.redis.hset('workers', worker_id, json.dumps(worker_info))
def get_next_task(self, worker_id):
"""Get next available task for worker."""
task = self.redis.lpop('task_queue')
if task:
task_data = json.loads(task)
# Track assignment
self.redis.hset('assignments', worker_id, json.dumps(task_data))
return task_data
return None
def complete_task(self, worker_id, result):
"""Mark task as completed."""
assignment = self.redis.hget('assignments', worker_id)
if assignment:
result_data = {
'worker': worker_id,
'task': json.loads(assignment),
'result': result,
'completed': datetime.now().isoformat()
}
self.redis.lpush('completed_tasks', json.dumps(result_data))
self.redis.hdel('assignments', worker_id)
Progress Tracking
Copy
#!/bin/bash
# track_progress.sh
# Track parallel execution progress
TOTAL_PROMPTS=$(find data/prompts/test_category -name "*.txt" | wc -l)
WORKERS=$(docker compose ps -q browseruse | wc -l)
echo "Starting parallel execution tracking"
echo "Total prompts: $TOTAL_PROMPTS"
echo "Parallel workers: $WORKERS"
while true; do
COMPLETED=$(find data/db/browseruse/test_category -name "*.db" | wc -l)
PROGRESS=$((COMPLETED * 100 / TOTAL_PROMPTS))
echo "Progress: $COMPLETED/$TOTAL_PROMPTS ($PROGRESS%)"
if [ $COMPLETED -eq $TOTAL_PROMPTS ]; then
echo "All tests completed!"
break
fi
sleep 10
done
Error Handling and Recovery
Fault-Tolerant Parallel Execution
Copy
import subprocess
import time
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
def robust_agent_execution(agent, category, max_retries=3):
"""Execute agent with retry logic."""
for attempt in range(max_retries):
try:
cmd = f"./run.sh {agent} --category {category} --timeout 300"
result = subprocess.run(cmd, shell=True, check=True, capture_output=True)
return {"success": True, "agent": agent, "category": category}
except subprocess.CalledProcessError as e:
logging.warning(f"Attempt {attempt + 1} failed for {agent}/{category}: {e}")
if attempt < max_retries - 1:
time.sleep(10) # Wait before retry
else:
return {"success": False, "agent": agent, "category": category, "error": str(e)}
def parallel_execution_with_recovery(tasks, max_workers=4):
"""Execute tasks in parallel with error recovery."""
results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all tasks
future_to_task = {
executor.submit(robust_agent_execution, agent, category): (agent, category)
for agent, category in tasks
}
# Collect results as they complete
for future in as_completed(future_to_task):
agent, category = future_to_task[future]
try:
result = future.result()
results.append(result)
if result["success"]:
logging.info(f"✓ {agent}/{category} completed successfully")
else:
logging.error(f"✗ {agent}/{category} failed: {result.get('error', 'Unknown error')}")
except Exception as e:
logging.error(f"✗ {agent}/{category} exception: {e}")
results.append({"success": False, "agent": agent, "category": category, "error": str(e)})
return results
Cleanup and Resource Management
Copy
#!/bin/bash
# cleanup_parallel.sh
# Cleanup function for parallel execution
cleanup() {
echo "Cleaning up parallel execution..."
# Stop all running containers
docker compose down
# Kill background processes
pkill -f 'run.sh'
# Clean up temporary files
rm -f /tmp/liteagent_*.tmp
echo "Cleanup completed"
}
# Set trap for cleanup on exit
trap cleanup EXIT INT TERM
# Your parallel execution code here
docker compose up --scale browseruse=5 &
COMPOSE_PID=$!
# Wait for completion or interruption
wait $COMPOSE_PID
Performance Optimization
Optimal Scheduling
Copy
def optimize_parallel_schedule(tasks, system_resources):
"""Optimize task scheduling based on system resources."""
# Sort tasks by complexity (simple tasks first)
complexity_order = {
"navigation": 1,
"forms": 2,
"ecommerce": 3,
"dark_patterns": 4,
"complex_flows": 5
}
sorted_tasks = sorted(tasks, key=lambda t: complexity_order.get(t[1], 3))
# Calculate batch size based on resources
ram_gb = system_resources["ram_gb"]
cpu_cores = system_resources["cpu_cores"]
max_parallel = min(ram_gb // 2, cpu_cores) # 2GB per agent, 1 CPU per agent
# Execute in batches
for i in range(0, len(sorted_tasks), max_parallel):
batch = sorted_tasks[i:i + max_parallel]
execute_batch(batch)
return True
def execute_batch(batch):
"""Execute a batch of tasks in parallel."""
processes = []
for agent, category in batch:
cmd = f"docker compose run -d {agent} bash -c './run.sh {agent} --category {category}'"
proc = subprocess.Popen(cmd, shell=True)
processes.append(proc)
# Wait for batch completion
for proc in processes:
proc.wait()
Monitoring and Analytics
Real-time Dashboard
Copy
# dashboard.py
import time
import json
from collections import defaultdict
def create_parallel_dashboard():
"""Create a real-time dashboard for parallel execution."""
while True:
stats = {
"timestamp": time.time(),
"active_containers": get_active_containers(),
"completed_tests": count_completed_tests(),
"resource_usage": get_resource_usage(),
"error_rate": calculate_error_rate()
}
# Display dashboard
print("\n" + "="*60)
print(f"LiteAgent Parallel Execution Dashboard")
print(f"Time: {time.strftime('%Y-%m-%d %H:%M:%S')}")
print("="*60)
print(f"Active Containers: {stats['active_containers']}")
print(f"Completed Tests: {stats['completed_tests']}")
print(f"CPU Usage: {stats['resource_usage']['cpu']:.1f}%")
print(f"Memory Usage: {stats['resource_usage']['memory']:.1f}%")
print(f"Error Rate: {stats['error_rate']:.1f}%")
print("="*60)
time.sleep(5)
def get_active_containers():
"""Get count of active LiteAgent containers."""
result = subprocess.run(
"docker ps --filter ancestor=browseruse-runner --filter ancestor=agente-runner -q | wc -l",
shell=True, capture_output=True, text=True
)
return int(result.stdout.strip())
Best Practices
1. Resource Planning
Copy
# Calculate optimal parallel configuration
TOTAL_RAM_GB=$(free -g | awk '/^Mem:/{print $2}')
TOTAL_CPUS=$(nproc)
# Reserve 25% for system
AVAILABLE_RAM=$((TOTAL_RAM_GB * 3 / 4))
AVAILABLE_CPUS=$((TOTAL_CPUS * 3 / 4))
# 2GB RAM + 1 CPU per agent
MAX_AGENTS=$(( AVAILABLE_RAM < AVAILABLE_CPUS ? AVAILABLE_RAM / 2 : AVAILABLE_CPUS ))
echo "Recommended parallel agents: $MAX_AGENTS"
2. Test Distribution
Copy
# Distribute tests evenly across agents
total_prompts=$(find data/prompts/test_category -name "*.txt" | wc -l)
agents=5
prompts_per_agent=$((total_prompts / agents))
echo "Distributing $total_prompts prompts across $agents agents"
echo "~$prompts_per_agent prompts per agent"
3. Graceful Scaling
Copy
# Gradually scale up to avoid resource spikes
for replicas in 2 4 6 8 10; do
echo "Scaling to $replicas instances"
docker compose scale browseruse=$replicas
sleep 30 # Allow containers to stabilize
done
Next Steps
Docker Compose
Advanced Docker Compose configuration
Output Analysis
Analyzing results from parallel execution
Evaluation Suite
Evaluating parallel test results