---
title: "Load Testing AI Agents: Simulating Concurrent Users and Measuring Performance"
description: "Learn how to load test AI agent systems using Locust and k6, simulate concurrent agent sessions, measure throughput and latency, and identify performance bottlenecks."
canonical: https://callsphere.ai/blog/load-testing-ai-agents-concurrent-users-performance
category: "Learn Agentic AI"
tags: ["Load Testing", "Performance", "AI Agents", "Locust", "k6", "Python"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-06T08:49:44.886Z
---

# Load Testing AI Agents: Simulating Concurrent Users and Measuring Performance

> Learn how to load test AI agent systems using Locust and k6, simulate concurrent agent sessions, measure throughput and latency, and identify performance bottlenecks.

## Why AI Agents Need Load Testing

AI agents have unique performance characteristics that differ from traditional web services. A single agent request can trigger multiple LLM calls, tool executions, and memory lookups — turning a 200ms API endpoint into a 5-30 second workflow. When 100 users hit this simultaneously, you need to know whether your system queues requests gracefully or falls over.

Load testing AI agents reveals problems that unit tests miss: LLM provider rate limits, connection pool exhaustion, memory leaks in long-running sessions, and concurrency bugs in shared state.

## Load Testing with Locust

Locust is a Python-based load testing framework that models each simulated user as a lightweight gevent greenlet, so a single process can drive thousands of concurrent sessions.

```mermaid
flowchart LR
    LOCUST["Locust swarm
simulated users"]
    API["Agent API
/api/sessions"]
    LLM["LLM provider
rate limited"]
    TOOLS["Tool
executions"]
    MEM[("Session store
context + memory")]
    METRICS["Metrics
latency, tokens, errors"]
    LOCUST --> API
    API --> LLM
    API --> TOOLS
    API --> MEM
    API --> METRICS
    style API fill:#4f46e5,stroke:#4338ca,color:#fff
    style LLM fill:#f59e0b,stroke:#d97706,color:#1f2937
    style METRICS fill:#059669,stroke:#047857,color:#fff
```

```python
# locustfile.py
from locust import HttpUser, task, between
import time
import uuid

class AgentUser(HttpUser):
    wait_time = between(2, 5)  # seconds between requests per user

    def on_start(self):
        """Create a session for this simulated user."""
        response = self.client.post("/api/sessions", json={
            "user_id": f"loadtest-{uuid.uuid4().hex[:8]}",  # unique per simulated user
        })
        self.session_id = response.json()["session_id"]

    @task(3)
    def simple_question(self):
        """Most common: a single-turn question."""
        self.client.post(
            f"/api/sessions/{self.session_id}/messages",
            json={"content": "What are your business hours?"},
            name="/api/sessions/[id]/messages - simple",
        )

    @task(2)
    def tool_calling_question(self):
        """Triggers tool execution on the server."""
        self.client.post(
            f"/api/sessions/{self.session_id}/messages",
            json={"content": "Look up order #12345"},
            name="/api/sessions/[id]/messages - tool_call",
        )

    @task(1)
    def multi_turn_conversation(self):
        """Simulates a 3-message conversation."""
        messages = [
            "I need help with my account",
            "My email is test@example.com",
            "I want to change my plan to premium",
        ]
        for msg in messages:
            self.client.post(
                f"/api/sessions/{self.session_id}/messages",
                json={"content": msg},
                name="/api/sessions/[id]/messages - multi_turn",
            )
            time.sleep(1)  # Simulate user reading the response
```

Run it with increasing concurrency:

```bash
# Start with 10 users, ramp up by 2 per second
locust -f locustfile.py --host=http://localhost:8000 \
    --users 10 --spawn-rate 2 --run-time 5m --headless
```
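Locust can also write its stats to CSV with `--csv results`, which produces a `results_stats.csv` file. A quick sketch for pulling the aggregate row out of that file (the column names below match recent Locust versions, but verify against your own output):

```python
import csv
import io

def summarize_locust_stats(csv_text: str) -> dict:
    """Extract request count, failure rate, and p95 from Locust's stats CSV."""
    for row in csv.DictReader(io.StringIO(csv_text)):
        if row["Name"] == "Aggregated":  # summary row across all endpoints
            requests = int(row["Request Count"])
            failures = int(row["Failure Count"])
            return {
                "requests": requests,
                "failure_rate_pct": round(failures / max(requests, 1) * 100, 2),
                "p95_ms": float(row["95%"]),
            }
    raise ValueError("No aggregated row found in stats CSV")
```

This is handy for failing a CI job when error rate or p95 crosses a budget.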

## Measuring Agent-Specific Metrics

Standard latency metrics are not enough. Track agent-specific measurements.

```python
import time
from dataclasses import dataclass, field

@dataclass
class AgentMetrics:
    request_latencies: list[float] = field(default_factory=list)
    llm_call_counts: list[int] = field(default_factory=list)
    tool_call_counts: list[int] = field(default_factory=list)
    token_usages: list[int] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)

    def record(self, latency: float, llm_calls: int, tool_calls: int, tokens: int):
        self.request_latencies.append(latency)
        self.llm_call_counts.append(llm_calls)
        self.tool_call_counts.append(tool_calls)
        self.token_usages.append(tokens)

    def summary(self) -> dict:
        import statistics
        lats = sorted(self.request_latencies)  # sort once for all percentiles
        if not lats:
            return {"total_requests": 0, "error_rate": 0.0}
        return {
            "total_requests": len(lats),
            "p50_latency": round(statistics.median(lats), 2),
            "p95_latency": round(lats[int(len(lats) * 0.95)], 2),
            "p99_latency": round(lats[int(len(lats) * 0.99)], 2),
            "avg_llm_calls_per_request": round(
                statistics.mean(self.llm_call_counts), 1
            ),
            "avg_tokens_per_request": round(
                statistics.mean(self.token_usages), 0
            ),
            "error_rate": round(len(self.errors) / len(lats) * 100, 2),
        }
```

## Testing Rate Limit Behavior

LLM providers enforce rate limits (tokens per minute, requests per minute). Verify your agent degrades gracefully.

```python
import asyncio
import aiohttp

async def test_rate_limit_handling(base_url: str, concurrent: int = 50):
    """Send concurrent requests to trigger rate limiting."""
    results = {"success": 0, "rate_limited": 0, "error": 0}

    async def send_request(session, i):
        try:
            async with session.post(
                f"{base_url}/api/sessions/test/messages",
                json={"content": f"Test message {i}"},
                timeout=aiohttp.ClientTimeout(total=60),
            ) as resp:
                if resp.status == 200:
                    results["success"] += 1
                elif resp.status == 429:
                    results["rate_limited"] += 1
                    data = await resp.json()
                    assert "retry" in data.get("message", "").lower()
                else:
                    results["error"] += 1
        except (asyncio.TimeoutError, aiohttp.ClientError):
            results["error"] += 1

    async with aiohttp.ClientSession() as session:
        tasks = [send_request(session, i) for i in range(concurrent)]
        await asyncio.gather(*tasks)

    print(f"Results: {results}")
    assert results["error"] == 0, "Errors should be handled as 429, not 500"
    return results
```
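On the client side, graceful degradation usually means exponential backoff with jitter. A generic sketch, where `RateLimited` is a hypothetical exception your HTTP layer would raise on a 429:

```python
import random
import time

class RateLimited(Exception):
    """Hypothetical: raised when the provider responds with HTTP 429."""

def retry_with_backoff(fn, max_retries: int = 5,
                       base_delay: float = 0.5, max_delay: float = 30.0):
    """Call fn(), retrying on RateLimited with exponential backoff plus full jitter."""
    for attempt in range(max_retries):
        try:
            return fn()
        except RateLimited:
            if attempt == max_retries - 1:
                raise  # retry budget exhausted: surface the 429 to the caller
            delay = min(max_delay, base_delay * 2 ** attempt)
            time.sleep(random.uniform(0, delay))  # jitter avoids thundering herds
```

Full jitter (a uniform draw up to the backoff cap) spreads retries out so that clients rate-limited at the same moment do not all retry at the same moment.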

## Load Testing with k6

For teams that prefer JavaScript, k6 provides excellent performance testing.

```javascript
// k6-agent-test.js
import http from 'k6/http';
import { check, sleep } from 'k6';
import { Rate, Trend } from 'k6/metrics';

const errorRate = new Rate('agent_errors');
const agentLatency = new Trend('agent_latency', true);

export const options = {
  stages: [
    { duration: '1m', target: 10 },   // ramp up
    { duration: '3m', target: 50 },   // sustained load
    { duration: '1m', target: 100 },  // peak load
    { duration: '1m', target: 0 },    // ramp down
  ],
  thresholds: {
    agent_latency: ['p(95)<15000'],  // 95% of requests under 15s
    agent_errors: ['rate<0.05'],     // under 5% errors
  },
};

export default function () {
  const res = http.post(
    `${__ENV.BASE_URL}/api/sessions/test/messages`,
    JSON.stringify({ content: 'What are your business hours?' }),
    { headers: { 'Content-Type': 'application/json' }, timeout: '30s' },
  );

  agentLatency.add(res.timings.duration);
  errorRate.add(res.status !== 200);

  check(res, {
    'status is 200': (r) => r.status === 200,
    'response has content': (r) => r.json().content !== undefined,
    'latency under 20s': (r) => r.timings.duration < 20000,
  });

  sleep(Math.random() * 3 + 1);
}
```

Run with: `k6 run k6-agent-test.js`

## Identifying Bottlenecks

After a load test, analyze where time is spent per request.

```python
# Instrument your agent endpoint
import time
import logging

logger = logging.getLogger(__name__)

async def handle_message(session_id: str, content: str):
    timings = {}

    t0 = time.monotonic()
    context = await load_session_context(session_id)
    timings["context_load"] = time.monotonic() - t0

    t0 = time.monotonic()
    llm_response = await call_llm(context, content)
    timings["llm_call"] = time.monotonic() - t0

    t0 = time.monotonic()
    result = await execute_tools(llm_response.tool_calls)
    timings["tool_execution"] = time.monotonic() - t0

    logger.info(f"Request timings: {timings}")
    return result
```
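The repeated `t0` bookkeeping can be factored into a small context manager (a sketch, not part of any framework):

```python
import time
from contextlib import contextmanager

@contextmanager
def timed(timings: dict, phase: str):
    """Record the wall-clock duration of a phase into timings[phase]."""
    start = time.monotonic()
    try:
        yield
    finally:
        # recorded even if the phase raises, so failed requests still show timings
        timings[phase] = time.monotonic() - start
```

Each phase then becomes `with timed(timings, "llm_call"): llm_response = await call_llm(context, content)`, and timings are captured even when a phase fails.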

## FAQ

### What is a reasonable latency target for AI agents?

For synchronous responses, target under 10 seconds at p95. For streaming responses, target first-token latency under 2 seconds. These numbers depend heavily on the model and number of tool calls involved.

### How do I load test streaming endpoints?

Use WebSocket or SSE clients in your load test scripts. Measure time-to-first-byte separately from total completion time. Locust supports WebSocket via the `locust-plugins` package.
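For example, if your client exposes the stream as an async iterator of chunks, time-to-first-chunk and total completion time can be measured like this (a sketch independent of any particular SSE library):

```python
import asyncio
import time

async def measure_stream(chunks):
    """Consume an async chunk iterator; return (time_to_first_chunk, total_time)."""
    start = time.monotonic()
    first = None
    async for _ in chunks:
        if first is None:
            first = time.monotonic() - start  # time-to-first-byte analogue
    return first, time.monotonic() - start
```

Track the two numbers as separate metrics: first-chunk latency is what the user perceives, while total time drives throughput and capacity planning.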

### Should I use my production LLM account for load tests?

No. Use a separate API key with its own rate limits and budget caps. Some teams use a cheaper model (e.g., gpt-4o-mini) for load testing and run only a small number of tests against the production model.

---

#LoadTesting #Performance #AIAgents #Locust #K6 #Python #AgenticAI #LearnAI #AIEngineering

---

Source: https://callsphere.ai/blog/load-testing-ai-agents-concurrent-users-performance
