Overview

LiteAgent can run tests in parallel to cut total testing time and raise throughput. You can run several agents at once, scale a single agent to multiple instances, or split tests across categories.

Parallel Execution Methods

1. Docker Compose Scaling

The easiest way to run tests in parallel:
# Scale single agent to multiple instances
docker compose up --scale browseruse=5

# Scale multiple agents
docker compose up --scale browseruse=3 --scale agente=2 --scale multion=1

# Background parallel execution
docker compose up -d --scale browseruse=10

2. Shell Background Processes

For local development:
# Run agents in parallel with background processes
./run.sh browseruse --category test1 &
./run.sh agente --category test2 &
./run.sh multion --category test3 &

# Wait for all to complete
wait

echo "All tests completed"

3. Python Multiprocessing

For programmatic control:
import multiprocessing
import subprocess

def run_agent(agent, category):
    """Run a single agent test and return the CompletedProcess."""
    # Pass arguments as a list so no shell parsing is involved
    cmd = ["./run.sh", agent, "--category", category]
    return subprocess.run(cmd, capture_output=True, text=True)

# The __main__ guard is required where multiprocessing uses the
# "spawn" start method (macOS, Windows)
if __name__ == "__main__":
    tasks = [
        ("browseruse", "benchmark"),
        ("agente", "security"),
        ("multion", "ecommerce"),
        ("skyvern", "visual")
    ]

    # Run multiple agents in parallel
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.starmap(run_agent, tasks)
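
The `results` list returned by `pool.starmap` holds one `subprocess.CompletedProcess` per task, in task order. A minimal sketch of how the failures could be collected from it follows; `summarize_results` is a hypothetical helper, not part of LiteAgent.

```python
import subprocess

def summarize_results(tasks, results):
    """Pair each (agent, category) task with its result and list the failures."""
    failures = [
        (agent, category, result.returncode)
        for (agent, category), result in zip(tasks, results)
        if result.returncode != 0  # non-zero exit code means run.sh failed
    ]
    return {"total": len(results), "failed": failures}
```

Calling `summarize_results(tasks, results)` after the pool finishes gives the total count plus an `(agent, category, returncode)` tuple for every run that exited with a non-zero status.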

Docker Compose Parallel Configuration

Replica-Based Scaling

# docker-compose.yml
services:
  browseruse:
    build:
      context: .
      dockerfile: Dockerfile.browseruse
    volumes:
      - ./data/db:/app/data/db
      - ./data/prompts:/app/data/prompts
    deploy:
      replicas: 5  # Always run 5 instances
    environment:
      - WORKER_ID=${WORKER_ID}
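
Compose interpolates `${WORKER_ID}` from the host environment when it reads the file, so every replica receives the same value. If each worker can obtain a distinct zero-based index some other way, the prompt list could be split as sketched below. `worker_index` and `worker_count` are hypothetical parameters for illustration, not LiteAgent settings.

```python
def shard_for_worker(prompt_files, worker_index, worker_count):
    """Return the slice of prompt files one worker should process.

    Files are sorted first so that every worker computes the same ordering,
    then worker i takes items i, i + worker_count, i + 2 * worker_count, ...
    """
    if not 0 <= worker_index < worker_count:
        raise ValueError("worker_index must be in [0, worker_count)")
    return sorted(prompt_files)[worker_index::worker_count]
```

Because the slices are disjoint and together cover the whole list, no prompt is run twice and none is skipped.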

Dynamic Scaling

# Scale services dynamically
docker compose up -d browseruse     # Start 1 instance in the background
docker compose scale browseruse=10  # Scale to 10 instances
docker compose scale browseruse=5   # Scale down to 5

Multiple Agent Types

# docker-compose.parallel.yml
services:
  browseruse-team:
    extends: browseruse
    deploy:
      replicas: 5
    command: ["bash", "-c", "./run.sh browseruse --category browser_tests"]

  agente-team:
    extends: agente
    deploy:
      replicas: 3
    command: ["bash", "-c", "./run.sh agente --category reasoning_tests"]

  multion-team:
    extends: multion
    deploy:
      replicas: 2
    command: ["bash", "-c", "./run.sh multion --category visual_tests"]

Advanced Parallel Strategies

Category-Based Distribution

#!/bin/bash
# parallel_categories.sh

categories=("benchmark" "dark_patterns" "security" "performance" "regression")
agents=("browseruse" "agente" "multion")

for category in "${categories[@]}"; do
    for agent in "${agents[@]}"; do
        echo "Starting $agent with $category"
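        # No -d here: a detached run returns immediately, so `wait` below
        # would not block until the tests have finished
        docker compose run --rm -T "$agent" bash -c "./run.sh $agent --category $category" &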
    done
done

wait
echo "All category tests completed"

Prompt File Distribution

#!/usr/bin/env python3
# distribute_prompts.py

import glob
import subprocess
import multiprocessing

def run_single_prompt(agent, prompt_file):
    """Run agent on a single prompt file."""
    site_line = ""
    task_lines = []

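    with open(prompt_file, 'r') as f:
        lines = f.readlines()
    if not lines:
        raise ValueError(f"Prompt file is empty: {prompt_file}")
    # First line is the site, remaining non-empty lines form the task
    site_line = lines[0].strip()
    task_lines = [line.strip() for line in lines[1:] if line.strip()]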

    task = " ".join(task_lines)

    cmd = [
        "python", "main.py", agent,
        "--site", site_line,
        "--task", task,
        "--timeout", "300"
    ]

    return subprocess.run(cmd, capture_output=True, text=True)

def parallel_prompt_execution(agent, prompt_dir, max_workers=4):
    """Execute all prompts in parallel."""
    prompt_files = glob.glob(f"{prompt_dir}/*.txt")

    with multiprocessing.Pool(max_workers) as pool:
        tasks = [(agent, pf) for pf in prompt_files]
        results = pool.starmap(run_single_prompt, tasks)

    return results

if __name__ == "__main__":
    # Run BrowserUse on all benchmark prompts in parallel
    results = parallel_prompt_execution("browseruse", "data/prompts/benchmark", max_workers=6)

    success_count = sum(1 for r in results if r.returncode == 0)
    print(f"Completed {success_count}/{len(results)} tests successfully")
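
The script above treats the first line of a prompt file as the site and the remaining non-empty lines as the task. A small variant of that parsing step, written as a pure function so it can be checked without touching the filesystem, might look like this. `parse_prompt` is a hypothetical name, and unlike the script it skips leading blank lines before reading the site.

```python
def parse_prompt(text):
    """Split prompt text into (site, task).

    The first non-empty line is the site; all following non-empty
    lines are joined with spaces to form the task description.
    """
    non_empty = [line.strip() for line in text.splitlines() if line.strip()]
    if not non_empty:
        raise ValueError("prompt text is empty")
    site, *task_lines = non_empty
    return site, " ".join(task_lines)
```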

Resource Management

CPU and Memory Allocation

# Resource-aware parallel execution
services:
  browseruse:
    deploy:
      replicas: 4
      resources:
        limits:
          memory: 3G
          cpus: '1.5'
        reservations:
          memory: 2G
          cpus: '1.0'

Calculating Optimal Parallelism

#!/bin/bash
# calculate_optimal_workers.sh

# Get system resources
TOTAL_RAM=$(free -g | awk '/^Mem:/{print $2}')
TOTAL_CPU=$(nproc)

# Resource requirements per agent
RAM_PER_AGENT=2  # GB
CPU_PER_AGENT=1

# Calculate optimal workers
MAX_BY_RAM=$((TOTAL_RAM / RAM_PER_AGENT))
MAX_BY_CPU=$((TOTAL_CPU / CPU_PER_AGENT))

OPTIMAL_WORKERS=$(( MAX_BY_RAM < MAX_BY_CPU ? MAX_BY_RAM : MAX_BY_CPU ))

# Leave some resources for the system, but never drop below one worker
OPTIMAL_WORKERS=$((OPTIMAL_WORKERS - 1))
if [ "$OPTIMAL_WORKERS" -lt 1 ]; then
    OPTIMAL_WORKERS=1
fi

echo "System: ${TOTAL_RAM}GB RAM, ${TOTAL_CPU} CPUs"
echo "Optimal parallel workers: ${OPTIMAL_WORKERS}"

# Run with optimal scaling
docker compose up --scale browseruse=$OPTIMAL_WORKERS
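
The same arithmetic can be expressed in Python for use from a test harness. This is a sketch under the script's own assumptions (2 GB of RAM and 1 CPU per agent, one worker held back for the system); `optimal_workers` is a hypothetical helper, and it clamps the result to at least one worker.

```python
def optimal_workers(total_ram_gb, total_cpus,
                    ram_per_agent_gb=2, cpus_per_agent=1, reserve=1):
    """Return the worker count allowed by the scarcer of RAM and CPU."""
    max_by_ram = total_ram_gb // ram_per_agent_gb
    max_by_cpu = total_cpus // cpus_per_agent
    # Take the tighter limit, hold back `reserve` workers, never go below 1
    return max(1, min(max_by_ram, max_by_cpu) - reserve)
```

For a host with 16 GB of RAM and 8 CPUs, both limits are 8, so the function returns 7. With 4 GB and 16 CPUs, RAM is the bottleneck (2 agents) and the result is 1.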

Resource Monitoring

#!/bin/bash
# monitor_parallel_execution.sh

# Monitor resource usage during parallel execution
while true; do
    echo "=== $(date) ==="

    # Docker stats
    docker stats --no-stream --format 'table {{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}'

    # System resources
    echo "System CPU: $(top -bn1 | grep 'Cpu(s)' | awk '{print $2}' | cut -d'%' -f1)%"
    echo "System RAM: $(free | grep Mem | awk '{printf("%.1f%%", $3/$2 * 100.0)}')"

    echo "---"
    sleep 30
done

Load Balancing Strategies

Round-Robin Category Assignment

# load_balancer.py
import itertools
import subprocess

def round_robin_execution(agents, categories):
    """Distribute categories across agents in round-robin fashion."""
    agent_cycle = itertools.cycle(agents)

    assignments = []
    for category in categories:
        agent = next(agent_cycle)
        assignments.append((agent, category))

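    # Execute in parallel. Do not pass -d: a detached run exits at once,
    # so proc.wait() would return before the tests finish.
    processes = []
    for agent, category in assignments:
        cmd = ["docker", "compose", "run", "--rm", "-T", agent,
               "bash", "-c", f"./run.sh {agent} --category {category}"]
        proc = subprocess.Popen(cmd)
        processes.append(proc)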

    # Wait for completion
    for proc in processes:
        proc.wait()

    return assignments

# Usage
agents = ["browseruse", "agente", "multion"]
categories = ["test1", "test2", "test3", "test4", "test5", "test6"]
assignments = round_robin_execution(agents, categories)
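
To inspect the assignment plan without starting any containers, the pairing step can be separated from the execution step. A minimal sketch, where `round_robin_assignments` is a hypothetical helper name:

```python
import itertools

def round_robin_assignments(agents, categories):
    """Pair each category with the next agent, cycling through the agent list."""
    # zip stops when categories is exhausted; cycle repeats agents as needed
    return list(zip(itertools.cycle(agents), categories))
```

With the three agents and six categories from the usage example, each agent receives two categories: `browseruse` gets `test1` and `test4`, `agente` gets `test2` and `test5`, and `multion` gets `test3` and `test6`.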

Workload-Based Distribution

def workload_based_distribution(prompt_categories):
    """Distribute workload based on category complexity."""

    # Define workload weights (higher = more complex)
    weights = {
        "simple_nav": 1,
        "form_filling": 2,
        "ecommerce": 3,
        "dark_patterns": 4,
        "complex_workflows": 5
    }

    # Assign agents based on workload
    assignments = []
    for category in prompt_categories:
        weight = weights.get(category, 3)  # Default to medium

        if weight <= 2:
            agent = "browseruse"  # Fast agent for simple tasks
        elif weight <= 3:
            agent = "multion"     # Balanced agent
        else:
            agent = "agente"      # Powerful agent for complex tasks

        assignments.append((agent, category))

    return assignments
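
The function above picks an agent type per category. A complementary use of the same weights is to balance total load across a fixed number of workers. The sketch below uses a greedy longest-first heuristic (heaviest category goes to the currently lightest worker). It is an illustration under the assumption that the weights approximate run time, not a LiteAgent feature, and `balance_by_weight` is a hypothetical name.

```python
import heapq

WEIGHTS = {
    "simple_nav": 1,
    "form_filling": 2,
    "ecommerce": 3,
    "dark_patterns": 4,
    "complex_workflows": 5,
}

def balance_by_weight(categories, worker_count, weights=WEIGHTS, default_weight=3):
    """Assign categories to workers so total weight per worker stays close."""
    # Min-heap of (current_load, worker_index)
    heap = [(0, index) for index in range(worker_count)]
    heapq.heapify(heap)
    buckets = [[] for _ in range(worker_count)]
    # Place the heaviest categories first
    ordered = sorted(categories, key=lambda c: weights.get(c, default_weight), reverse=True)
    for category in ordered:
        load, index = heapq.heappop(heap)
        buckets[index].append(category)
        heapq.heappush(heap, (load + weights.get(category, default_weight), index))
    return buckets
```

For the five categories in `WEIGHTS` and two workers, the loads come out as 8 and 7.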

Coordination and Synchronization

Shared State Management

# shared_state.py
import redis
import json
from datetime import datetime

class TestCoordinator:
    def __init__(self, redis_host='localhost', redis_port=6379):
        self.redis = redis.Redis(host=redis_host, port=redis_port)

    def register_worker(self, worker_id, agent_type):
        """Register a worker with the coordinator."""
        worker_info = {
            'id': worker_id,
            'agent': agent_type,
            'started': datetime.now().isoformat(),
            'status': 'running'
        }
        self.redis.hset('workers', worker_id, json.dumps(worker_info))

    def get_next_task(self, worker_id):
        """Get next available task for worker."""
        task = self.redis.lpop('task_queue')
        if task:
            task_data = json.loads(task)
            # Track assignment
            self.redis.hset('assignments', worker_id, json.dumps(task_data))
            return task_data
        return None

    def complete_task(self, worker_id, result):
        """Mark task as completed."""
        assignment = self.redis.hget('assignments', worker_id)
        if assignment:
            result_data = {
                'worker': worker_id,
                'task': json.loads(assignment),
                'result': result,
                'completed': datetime.now().isoformat()
            }
            self.redis.lpush('completed_tasks', json.dumps(result_data))
            self.redis.hdel('assignments', worker_id)
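
`TestCoordinator.get_next_task` pops from the left of the `task_queue` list, so a producer that pushes to the right gives first-in, first-out ordering. A minimal sketch of such a producer follows. The payload shape (`agent` and `category` keys) and the `enqueue_tasks` name are assumptions for illustration; `client` is expected to be a `redis.Redis` instance or anything else with an `rpush(name, *values)` method.

```python
import json

def enqueue_tasks(client, tasks, queue_name="task_queue"):
    """Push (agent, category) tasks onto the list the coordinator pops from."""
    payloads = [
        json.dumps({"agent": agent, "category": category})
        for agent, category in tasks
    ]
    if payloads:
        # rpush appends to the tail; workers using lpop then see tasks in order
        client.rpush(queue_name, *payloads)
    return len(payloads)
```

The function returns the number of tasks queued and skips the Redis call entirely when the task list is empty.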

Progress Tracking

#!/bin/bash
# track_progress.sh

# Track parallel execution progress
TOTAL_PROMPTS=$(find data/prompts/test_category -name "*.txt" | wc -l)
WORKERS=$(docker compose ps -q browseruse | wc -l)

echo "Starting parallel execution tracking"
echo "Total prompts: $TOTAL_PROMPTS"
echo "Parallel workers: $WORKERS"

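# Avoid a division by zero below when no prompts are found
if [ "$TOTAL_PROMPTS" -eq 0 ]; then
    echo "No prompts found; nothing to track"
    exit 1
fi

while true; do
    COMPLETED=$(find data/db/browseruse/test_category -name "*.db" | wc -l)
    PROGRESS=$((COMPLETED * 100 / TOTAL_PROMPTS))

    echo "Progress: $COMPLETED/$TOTAL_PROMPTS ($PROGRESS%)"

    # -ge rather than -eq so extra .db files cannot cause an endless loop
    if [ "$COMPLETED" -ge "$TOTAL_PROMPTS" ]; then
        echo "All tests completed!"
        break
    fi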

    sleep 10
done

Error Handling and Recovery

Fault-Tolerant Parallel Execution

import subprocess
import time
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

def robust_agent_execution(agent, category, max_retries=3):
    """Execute agent with retry logic."""
    for attempt in range(max_retries):
        try:
            # Argument list instead of a shell string, so names are never shell-parsed
            cmd = ["./run.sh", agent, "--category", category, "--timeout", "300"]
            subprocess.run(cmd, check=True, capture_output=True)
            return {"success": True, "agent": agent, "category": category}

        except subprocess.CalledProcessError as e:
            logging.warning(f"Attempt {attempt + 1} failed for {agent}/{category}: {e}")
            if attempt < max_retries - 1:
                time.sleep(10)  # Wait before retry
            else:
                return {"success": False, "agent": agent, "category": category, "error": str(e)}

def parallel_execution_with_recovery(tasks, max_workers=4):
    """Execute tasks in parallel with error recovery."""
    results = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_task = {
            executor.submit(robust_agent_execution, agent, category): (agent, category)
            for agent, category in tasks
        }

        # Collect results as they complete
        for future in as_completed(future_to_task):
            agent, category = future_to_task[future]
            try:
                result = future.result()
                results.append(result)

                if result["success"]:
                    logging.info(f"✓ {agent}/{category} completed successfully")
                else:
                    logging.error(f"✗ {agent}/{category} failed: {result.get('error', 'Unknown error')}")

            except Exception as e:
                logging.error(f"✗ {agent}/{category} exception: {e}")
                results.append({"success": False, "agent": agent, "category": category, "error": str(e)})

    return results
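
The retry loop above sleeps a fixed 10 seconds between attempts. When many workers fail at once (for example, on a shared rate limit), growing the delay between attempts can help. The sketch below computes such a delay schedule; `backoff_delays` and its default values are assumptions for illustration.

```python
def backoff_delays(max_retries, base_seconds=10.0, factor=2.0, cap_seconds=120.0):
    """Return the sleep durations between attempts, growing geometrically.

    There is one delay fewer than there are attempts, because no sleep
    is needed after the final attempt.
    """
    return [
        min(cap_seconds, base_seconds * factor ** attempt)
        for attempt in range(max(0, max_retries - 1))
    ]
```

With the default `max_retries=3` this yields delays of 10 and 20 seconds, which could replace the constant passed to `time.sleep` in `robust_agent_execution`.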

Cleanup and Resource Management

#!/bin/bash
# cleanup_parallel.sh

# Cleanup function for parallel execution
cleanup() {
    echo "Cleaning up parallel execution..."

    # Stop all running containers
    docker compose down

    # Kill background processes
    pkill -f 'run.sh'

    # Clean up temporary files
    rm -f /tmp/liteagent_*.tmp

    echo "Cleanup completed"
}

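# Run cleanup exactly once: signals are turned into a normal exit,
# which then fires the EXIT trap
trap cleanup EXIT
trap 'exit 130' INT
trap 'exit 143' TERM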

# Your parallel execution code here
docker compose up --scale browseruse=5 &
COMPOSE_PID=$!

# Wait for completion or interruption
wait $COMPOSE_PID

Performance Optimization

Optimal Scheduling

import subprocess

def optimize_parallel_schedule(tasks, system_resources):
    """Optimize task scheduling based on system resources."""

    # Sort tasks by complexity (simple tasks first)
    complexity_order = {
        "navigation": 1,
        "forms": 2,
        "ecommerce": 3,
        "dark_patterns": 4,
        "complex_flows": 5
    }

    sorted_tasks = sorted(tasks, key=lambda t: complexity_order.get(t[1], 3))

    # Calculate batch size based on resources
    ram_gb = system_resources["ram_gb"]
    cpu_cores = system_resources["cpu_cores"]

    # 2GB per agent, 1 CPU per agent; at least 1 so the batch step is never zero
    max_parallel = max(1, min(ram_gb // 2, cpu_cores))

    # Execute in batches
    for i in range(0, len(sorted_tasks), max_parallel):
        batch = sorted_tasks[i:i + max_parallel]
        execute_batch(batch)

    return True

def execute_batch(batch):
    """Execute a batch of tasks in parallel."""
    processes = []

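    # No -d flag: a detached run exits immediately, so proc.wait()
    # would not wait for the tests themselves
    for agent, category in batch:
        cmd = ["docker", "compose", "run", "--rm", "-T", agent,
               "bash", "-c", f"./run.sh {agent} --category {category}"]
        proc = subprocess.Popen(cmd)
        processes.append(proc)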

    # Wait for batch completion
    for proc in processes:
        proc.wait()
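
Fixed batches wait for the slowest task in each batch before the next batch starts. A bounded worker pool is one alternative that starts a new task as soon as any slot frees up. The sketch below shows the idea with the standard library; `run_bounded` is a hypothetical helper, and `run_task` stands for any callable that runs one `(agent, category)` pair.

```python
from concurrent.futures import ThreadPoolExecutor

def run_bounded(tasks, run_task, max_parallel):
    """Run tasks with at most max_parallel in flight; results keep task order."""
    with ThreadPoolExecutor(max_workers=max(1, max_parallel)) as executor:
        # executor.map yields results in the order tasks were submitted
        return list(executor.map(lambda task: run_task(*task), tasks))
```

Threads are sufficient here because each task spends its time waiting on an external process rather than executing Python code.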

Monitoring and Analytics

Real-time Dashboard

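# dashboard.py
import subprocess
import time

# count_completed_tests(), get_resource_usage(), and calculate_error_rate()
# are not shown here; define them alongside get_active_containers().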

def create_parallel_dashboard():
    """Create a real-time dashboard for parallel execution."""

    while True:
        stats = {
            "timestamp": time.time(),
            "active_containers": get_active_containers(),
            "completed_tests": count_completed_tests(),
            "resource_usage": get_resource_usage(),
            "error_rate": calculate_error_rate()
        }

        # Display dashboard
        print("\n" + "="*60)
        print(f"LiteAgent Parallel Execution Dashboard")
        print(f"Time: {time.strftime('%Y-%m-%d %H:%M:%S')}")
        print("="*60)
        print(f"Active Containers: {stats['active_containers']}")
        print(f"Completed Tests: {stats['completed_tests']}")
        print(f"CPU Usage: {stats['resource_usage']['cpu']:.1f}%")
        print(f"Memory Usage: {stats['resource_usage']['memory']:.1f}%")
        print(f"Error Rate: {stats['error_rate']:.1f}%")
        print("="*60)

        time.sleep(5)

def get_active_containers():
    """Get count of active LiteAgent containers."""
    result = subprocess.run(
        "docker ps --filter ancestor=browseruse-runner --filter ancestor=agente-runner -q | wc -l",
        shell=True, capture_output=True, text=True
    )
    return int(result.stdout.strip())
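
The dashboard calls `calculate_error_rate()`, which is not defined in this section. One possible shape for it, assuming the result dictionaries produced by `parallel_execution_with_recovery` (each with a boolean `success` key) are available, is sketched below. `error_rate_percent` is a hypothetical name, and how results reach the dashboard is left open.

```python
def error_rate_percent(results):
    """Return the share of failed results as a percentage (0.0 when empty)."""
    if not results:
        return 0.0
    failed = sum(1 for result in results if not result.get("success", False))
    return failed * 100.0 / len(results)
```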

Best Practices

1. Resource Planning

# Calculate optimal parallel configuration
TOTAL_RAM_GB=$(free -g | awk '/^Mem:/{print $2}')
TOTAL_CPUS=$(nproc)

# Reserve 25% for system
AVAILABLE_RAM=$((TOTAL_RAM_GB * 3 / 4))
AVAILABLE_CPUS=$((TOTAL_CPUS * 3 / 4))

# 2GB RAM + 1 CPU per agent: take the smaller of the RAM-bound and CPU-bound limits
MAX_BY_RAM=$((AVAILABLE_RAM / 2))
MAX_AGENTS=$(( MAX_BY_RAM < AVAILABLE_CPUS ? MAX_BY_RAM : AVAILABLE_CPUS ))

echo "Recommended parallel agents: $MAX_AGENTS"

2. Test Distribution

# Distribute tests evenly across agents
total_prompts=$(find data/prompts/test_category -name "*.txt" | wc -l)
agents=5
prompts_per_agent=$((total_prompts / agents))

echo "Distributing $total_prompts prompts across $agents agents"
echo "~$prompts_per_agent prompts per agent"
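
Integer division drops the remainder, so the per-agent figure above is approximate. A short sketch of an exact split, where the first few agents take one extra prompt each, follows. `split_counts` is a hypothetical helper.

```python
def split_counts(total_prompts, agents):
    """Return how many prompts each agent gets so the counts sum to the total."""
    base, extra = divmod(total_prompts, agents)
    # The first `extra` agents absorb the remainder, one prompt each
    return [base + 1 if index < extra else base for index in range(agents)]
```

For 23 prompts and 5 agents, three agents receive 5 prompts and two receive 4.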

3. Graceful Scaling

# Gradually scale up to avoid resource spikes
for replicas in 2 4 6 8 10; do
    echo "Scaling to $replicas instances"
    docker compose scale browseruse=$replicas
    sleep 30  # Allow containers to stabilize
done

Next Steps

Docker Compose

Advanced Docker Compose configuration

Output Analysis

Analyzing results from parallel execution

Evaluation Suite

Evaluating parallel test results