---
title: "Building an Async Agent Worker Pool: Concurrent Session Processing at Scale"
description: "Design and implement an async worker pool for processing concurrent AI agent sessions. Learn health monitoring, dynamic scaling, graceful drain, and production deployment patterns."
canonical: https://callsphere.ai/blog/building-async-agent-worker-pool-concurrent-session-processing-scale
category: "Learn Agentic AI"
tags: ["Python", "Worker Pool", "asyncio", "Scaling", "Production"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-08T12:25:12.431Z
---

# Building an Async Agent Worker Pool: Concurrent Session Processing at Scale

> Design and implement an async worker pool for processing concurrent AI agent sessions. Learn health monitoring, dynamic scaling, graceful drain, and production deployment patterns.

## Why Worker Pools for AI Agents

A single async event loop can handle hundreds of concurrent agent sessions. But without structure, you get uncontrolled resource usage: unbounded memory growth, connection exhaustion, and cascading failures when one overloaded component drags down everything else.

A worker pool provides bounded concurrency, health monitoring, and graceful lifecycle management. Each worker processes one agent session at a time, and the pool controls how many workers run simultaneously. This architecture is the foundation for production AI agent deployments.

## Core Worker Pool Design

```python
import asyncio
import uuid
import time
from dataclasses import dataclass, field
from enum import Enum

class WorkerState(Enum):
    IDLE = "idle"
    BUSY = "busy"
    DRAINING = "draining"
    STOPPED = "stopped"

@dataclass
class WorkerStats:
    tasks_completed: int = 0
    tasks_failed: int = 0
    total_processing_time: float = 0.0
    last_heartbeat: float = field(default_factory=time.monotonic)

@dataclass
class AgentJob:
    job_id: str
    session_id: str
    payload: dict
    created_at: float = field(default_factory=time.monotonic)

class AgentWorker:
    """Individual worker that processes agent sessions."""

    def __init__(self, worker_id: str, agent_factory):
        self.worker_id = worker_id
        self.state = WorkerState.IDLE
        self.stats = WorkerStats()
        self._agent_factory = agent_factory
        self._current_job: AgentJob | None = None

    async def process(self, job: AgentJob) -> dict:
        """Process a single agent job."""
        self.state = WorkerState.BUSY
        self._current_job = job
        start = time.monotonic()

        try:
            agent = self._agent_factory()
            result = await agent.run(
                session_id=job.session_id,
                payload=job.payload,
            )
            self.stats.tasks_completed += 1
            return {"status": "success", "result": result}

        except Exception as e:
            self.stats.tasks_failed += 1
            return {"status": "error", "error": str(e)}

        finally:
            elapsed = time.monotonic() - start
            self.stats.total_processing_time += elapsed
            self.stats.last_heartbeat = time.monotonic()
            self._current_job = None
            if self.state != WorkerState.DRAINING:
                self.state = WorkerState.IDLE
```

## The Worker Pool Manager

The pool manages a fixed number of workers and routes incoming jobs through a bounded queue.

```mermaid
flowchart LR
    USERS(["Traffic"])
    LB["Geo LB plus
Anycast"]
    EDGE["Edge cache plus
rate limit"]
    APP["Stateless app pods
HPA on QPS"]
    QUEUE[(Async work queue)]
    WORKER["Worker pool
GPU or CPU"]
    CACHE[("Redis cache
LLM responses")]
    DB[("Read replicas
and primary")]
    OBS[(Observability)]
    USERS --> LB --> EDGE --> APP
    APP --> CACHE
    APP --> QUEUE --> WORKER
    APP --> DB
    APP --> OBS
    style LB fill:#4f46e5,stroke:#4338ca,color:#fff
    style WORKER fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style CACHE fill:#f59e0b,stroke:#d97706,color:#1f2937
    style OBS fill:#0ea5e9,stroke:#0369a1,color:#fff
```

```python
class AgentWorkerPool:
    """Pool of async workers for concurrent agent processing."""

    def __init__(
        self,
        num_workers: int = 10,
        max_queue_size: int = 100,
        agent_factory=None,
    ):
        self.num_workers = num_workers
        self.max_queue_size = max_queue_size
        self._agent_factory = agent_factory
        self._queue: asyncio.Queue[AgentJob | None] = asyncio.Queue(
            maxsize=max_queue_size
        )
        self._workers: list[AgentWorker] = []
        self._worker_tasks: list[asyncio.Task] = []
        self._results: dict[str, asyncio.Future] = {}
        self._running = False

    async def start(self):
        """Start all workers in the pool."""
        self._running = True
        for i in range(self.num_workers):
            worker = AgentWorker(
                worker_id=f"worker-{i:03d}",
                agent_factory=self._agent_factory,
            )
            self._workers.append(worker)
            task = asyncio.create_task(
                self._worker_loop(worker),
                name=f"worker-{i:03d}",
            )
            self._worker_tasks.append(task)
        print(f"Worker pool started with {self.num_workers} workers")

    async def _worker_loop(self, worker: AgentWorker):
        """Main loop for a single worker."""
        while self._running or not self._queue.empty():
            try:
                job = await asyncio.wait_for(
                    self._queue.get(), timeout=1.0
                )
            except TimeoutError:
                worker.stats.last_heartbeat = time.monotonic()
                continue

            if job is None:  # Shutdown sentinel
                break

            result = await worker.process(job)

            # Deliver result to the waiting caller
            if job.job_id in self._results:
                self._results[job.job_id].set_result(result)

            self._queue.task_done()

        worker.state = WorkerState.STOPPED

    async def submit(self, session_id: str, payload: dict) -> dict:
        """Submit a job and wait for the result."""
        job = AgentJob(
            job_id=str(uuid.uuid4()),
            session_id=session_id,
            payload=payload,
        )

        # Create a future to receive the result
        future = asyncio.get_running_loop().create_future()
        self._results[job.job_id] = future

        try:
            await self._queue.put(job)
            result = await future
            return result
        finally:
            self._results.pop(job.job_id, None)

    async def submit_nowait(
        self, session_id: str, payload: dict
    ) -> str:
        """Submit a job without waiting. Returns job_id."""
        job = AgentJob(
            job_id=str(uuid.uuid4()),
            session_id=session_id,
            payload=payload,
        )
        self._queue.put_nowait(job)  # Raises QueueFull if at capacity
        return job.job_id
```

## Health Monitoring

Monitor worker health to detect stuck tasks and unhealthy workers.

```python
    async def health_check(self) -> dict:
        """Return pool health metrics."""
        now = time.monotonic()
        stale_threshold = 120.0  # 2 minutes without heartbeat

        worker_states = {}
        stale_workers = []
        for worker in self._workers:
            worker_states[worker.worker_id] = {
                "state": worker.state.value,
                "completed": worker.stats.tasks_completed,
                "failed": worker.stats.tasks_failed,
                "avg_time": (
                    worker.stats.total_processing_time
                    / max(worker.stats.tasks_completed, 1)
                ),
                "last_heartbeat_ago": now - worker.stats.last_heartbeat,
            }
            if now - worker.stats.last_heartbeat > stale_threshold:
                stale_workers.append(worker.worker_id)

        return {
            "pool_size": self.num_workers,
            "queue_size": self._queue.qsize(),
            "queue_capacity": self.max_queue_size,
            "utilization": sum(
                1 for w in self._workers
                if w.state == WorkerState.BUSY
            ) / self.num_workers,
            "total_completed": sum(
                w.stats.tasks_completed for w in self._workers
            ),
            "total_failed": sum(
                w.stats.tasks_failed for w in self._workers
            ),
            "stale_workers": stale_workers,
            "workers": worker_states,
        }
```

## Dynamic Scaling

Adjust pool size based on queue depth and worker utilization.

```python
    async def auto_scale(
        self,
        min_workers: int = 2,
        max_workers: int = 50,
        scale_up_threshold: float = 0.8,
        scale_down_threshold: float = 0.3,
        check_interval: float = 10.0,
    ):
        """Automatically scale workers based on load."""
        while self._running:
            await asyncio.sleep(check_interval)

            health = await self.health_check()
            utilization = health["utilization"]
            queue_ratio = (
                health["queue_size"] / health["queue_capacity"]
            )

            if (utilization > scale_up_threshold
                    or queue_ratio > 0.5):
                # Scale up
                new_count = min(
                    max_workers,
                    self.num_workers + max(1, self.num_workers // 4),
                )
                if new_count > self.num_workers:
                    await self._add_workers(
                        new_count - self.num_workers
                    )
                    print(f"Scaled up to {self.num_workers} workers")

            elif (utilization  min_workers):
                # Scale down
                new_count = max(
                    min_workers,
                    self.num_workers - max(1, self.num_workers // 4),
                )
                if new_count = count:
                break
            if worker.state == WorkerState.IDLE:
                worker.state = WorkerState.DRAINING
                await self._queue.put(None)  # Sentinel to stop
                removed += 1
                self.num_workers -= 1
```

## Graceful Drain

When shutting down, finish in-flight jobs before stopping.

```python
    async def drain(self, timeout: float = 60.0):
        """Gracefully drain and shut down the pool."""
        print("Draining worker pool...")
        self._running = False

        # Send shutdown sentinels for all workers
        for _ in self._workers:
            await self._queue.put(None)

        # Wait for all workers to finish
        try:
            async with asyncio.timeout(timeout):
                await asyncio.gather(
                    *self._worker_tasks,
                    return_exceptions=True,
                )
        except TimeoutError:
            print(f"Drain timeout. Cancelling remaining tasks.")
            for task in self._worker_tasks:
                if not task.done():
                    task.cancel()

        final_health = await self.health_check()
        print(
            f"Pool drained. Completed: {final_health['total_completed']}, "
            f"Failed: {final_health['total_failed']}"
        )
```

## Integrating with FastAPI

```python
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException

pool: AgentWorkerPool | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global pool
    pool = AgentWorkerPool(
        num_workers=10,
        max_queue_size=200,
        agent_factory=create_agent,
    )
    await pool.start()
    # Start auto-scaler in background
    scaler = asyncio.create_task(pool.auto_scale())
    yield
    scaler.cancel()
    await pool.drain(timeout=30.0)

app = FastAPI(lifespan=lifespan)

@app.post("/api/agent/run")
async def run_agent(session_id: str, query: str):
    try:
        result = await pool.submit(
            session_id=session_id,
            payload={"query": query},
        )
        return result
    except asyncio.QueueFull:
        raise HTTPException(
            status_code=503,
            detail="Agent pool at capacity. Retry later.",
        )

@app.get("/api/agent/health")
async def agent_health():
    return await pool.health_check()
```

## FAQ

### How do I choose the right number of workers?

Start with workers equal to your expected concurrent sessions. If each agent session takes 5 seconds and you expect 20 requests per second peak, you need at least 100 workers. Monitor utilization in production: sustained above 80% means you need more workers or faster processing. Below 30% means you are over-provisioned.

### What happens when the job queue is full?

With `submit_nowait`, an `asyncio.QueueFull` exception is raised immediately, which you should translate to an HTTP 503 (Service Unavailable) with a Retry-After header. With `submit`, the call blocks until queue space opens. Always set a reasonable `max_queue_size` to prevent unbounded memory growth during traffic spikes.

### How is this different from just using asyncio.Semaphore?

A semaphore limits concurrency but provides no structure. A worker pool adds job queuing, health monitoring, per-worker statistics, dynamic scaling, and graceful shutdown. For simple scripts, a semaphore is fine. For production services processing thousands of agent sessions, the worker pool architecture is essential for observability and reliability.

---

#Python #WorkerPool #Asyncio #Scaling #Production #AgenticAI #LearnAI #AIEngineering

---

Source: https://callsphere.ai/blog/building-async-agent-worker-pool-concurrent-session-processing-scale
