---
title: "Building a Custom Agent Orchestrator: When Off-the-Shelf Tools Are Not Enough"
description: "Learn when and how to build a custom agent orchestrator. Covers state machine design, queue integration, monitoring hooks, and the architectural patterns that make custom orchestrators maintainable."
canonical: https://callsphere.ai/blog/building-custom-agent-orchestrator-when-off-shelf-not-enough
category: "Learn Agentic AI"
tags: ["Agent Orchestration", "Custom Architecture", "State Machine", "Python", "System Design"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-06T01:02:45.499Z
---

# Building a Custom Agent Orchestrator: When Off-the-Shelf Tools Are Not Enough

> Learn when and how to build a custom agent orchestrator. Covers state machine design, queue integration, monitoring hooks, and the architectural patterns that make custom orchestrators maintainable.

## When to Build Custom

Off-the-shelf orchestration platforms like Temporal, Prefect, and Airflow solve 80% of workflow needs. But AI agent systems sometimes hit the other 20%:

- **Sub-second latency requirements** that batch-oriented schedulers cannot meet
- **Custom LLM routing logic** that needs to inspect token counts, model availability, and cost in real time
- **Tight integration with existing infrastructure** that would require fighting an orchestration framework's opinions
- **Specialized retry semantics** — for example, retrying with a different model when one returns low-confidence results
- **Multi-tenant isolation** requirements that off-the-shelf tools do not support natively

If your agent system has any of these constraints, a custom orchestrator may be the right choice. The key is to build it with clear boundaries so it remains maintainable.
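As a concrete illustration of the specialized-retry bullet above, here is a minimal sketch of confidence-gated model fallback. `call_model` is a stand-in for a real LLM client, and the model names and confidence scores are illustrative, not real endpoints:

```python
def call_model(model: str, prompt: str) -> tuple[str, float]:
    # Stand-in for a real LLM call that returns (text, confidence).
    # A real implementation would call your provider's API here.
    confidence = 0.9 if model == "fallback-large" else 0.4
    return f"{model} answer", confidence

def generate_with_fallback(
    prompt: str, models: list[str], min_confidence: float = 0.7
) -> str:
    """Try each model in order; return the first answer that clears the bar."""
    last_text = ""
    for model in models:
        text, confidence = call_model(model, prompt)
        last_text = text
        if confidence >= min_confidence:
            return text
    return last_text  # best effort if no model met the threshold
```

The same shape generalizes to any retry policy that inspects the result, not just the error: swap the confidence check for token counts, cost, or schema validation.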

## Core Architecture

A custom orchestrator has four components: a **state machine** that tracks workflow progress, a **task queue** that distributes work, a **worker pool** that executes tasks, and a **persistence layer** that stores state. The diagram below shows the runtime topology these components support: a supervisor plans and dispatches, workers execute, and a critic gates output quality.

```mermaid
flowchart TD
    INPUT(["Task input"])
    SUPER["Supervisor agent<br/>plans and monitors"]
    W1["Worker 1<br/>research"]
    W2["Worker 2<br/>code"]
    W3["Worker 3<br/>writing"]
    CRITIC{"Output meets<br/>rubric?"}
    REWORK["Rework or<br/>retry path"]
    SHARED[("Shared scratchpad<br/>and memory")]
    OUT(["Final result"])
    INPUT --> SUPER
    SUPER --> W1 --> CRITIC
    SUPER --> W2 --> CRITIC
    SUPER --> W3 --> CRITIC
    W1 --> SHARED
    W2 --> SHARED
    W3 --> SHARED
    SHARED --> SUPER
    CRITIC -->|Pass| OUT
    CRITIC -->|Fail| REWORK --> SUPER
    style SUPER fill:#4f46e5,stroke:#4338ca,color:#fff
    style CRITIC fill:#f59e0b,stroke:#d97706,color:#1f2937
    style OUT fill:#059669,stroke:#047857,color:#fff
    style SHARED fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
```

```python
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import uuid

class StepStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRYING = "retrying"

class WorkflowStatus(Enum):
    CREATED = "created"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    PAUSED = "paused"

@dataclass
class Step:
    name: str
    handler: str  # Dotted path to the handler function
    status: StepStatus = StepStatus.PENDING
    result: Any = None
    error: str | None = None
    attempts: int = 0
    max_retries: int = 3
    started_at: datetime | None = None
    completed_at: datetime | None = None

@dataclass
class Workflow:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    status: WorkflowStatus = WorkflowStatus.CREATED
    steps: list[Step] = field(default_factory=list)
    context: dict = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)
```

## The State Machine

The state machine enforces valid transitions and prevents workflows from entering inconsistent states.

```python
class WorkflowStateMachine:
    VALID_TRANSITIONS = {
        WorkflowStatus.CREATED: {WorkflowStatus.RUNNING},
        WorkflowStatus.RUNNING: {
            WorkflowStatus.COMPLETED,
            WorkflowStatus.FAILED,
            WorkflowStatus.PAUSED,
        },
        WorkflowStatus.PAUSED: {WorkflowStatus.RUNNING, WorkflowStatus.FAILED},
        WorkflowStatus.FAILED: {WorkflowStatus.RUNNING},  # Allow restart
    }

    def transition(self, workflow: Workflow, new_status: WorkflowStatus):
        allowed = self.VALID_TRANSITIONS.get(workflow.status, set())
        if new_status not in allowed:
            raise ValueError(
                f"Cannot transition from {workflow.status} to {new_status}"
            )
        workflow.status = new_status
        workflow.updated_at = datetime.utcnow()
```

## Queue Integration

Use Redis Streams or a similar lightweight queue to distribute work to workers.

```python
import redis.asyncio as redis
import json

class TaskQueue:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.stream = "agent:tasks"
        self.group = "agent-workers"

    async def initialize(self):
        try:
            await self.redis.xgroup_create(
                self.stream, self.group, mkstream=True
            )
        except redis.ResponseError as e:
            # BUSYGROUP means the group already exists; re-raise anything else
            if "BUSYGROUP" not in str(e):
                raise

    async def enqueue(self, workflow_id: str, step_name: str, payload: dict):
        await self.redis.xadd(
            self.stream,
            {
                "workflow_id": workflow_id,
                "step_name": step_name,
                "payload": json.dumps(payload),
            },
        )

    async def dequeue(self, consumer: str, count: int = 1, block_ms: int = 5000):
        messages = await self.redis.xreadgroup(
            self.group,
            consumer,
            {self.stream: ">"},
            count=count,
            block=block_ms,
        )
        results = []
        for stream_name, entries in messages:
            for msg_id, fields in entries:
                results.append({
                    "id": msg_id,
                    "workflow_id": fields[b"workflow_id"].decode(),
                    "step_name": fields[b"step_name"].decode(),
                    "payload": json.loads(fields[b"payload"]),
                })
        return results

    async def acknowledge(self, message_id: str):
        await self.redis.xack(self.stream, self.group, message_id)
```

## The Orchestrator Engine

```python
from collections.abc import Callable

class Orchestrator:
    def __init__(self, queue: TaskQueue, store: WorkflowStore):
        self.queue = queue
        self.store = store
        self.state_machine = WorkflowStateMachine()
        self.handlers: dict[str, Callable] = {}

    def register_handler(self, name: str, handler: Callable):
        self.handlers[name] = handler

    async def start_workflow(self, workflow: Workflow) -> str:
        self.state_machine.transition(workflow, WorkflowStatus.RUNNING)
        await self.store.save(workflow)

        # Enqueue the first pending step
        for step in workflow.steps:
            if step.status == StepStatus.PENDING:
                await self.queue.enqueue(
                    workflow.id, step.name, workflow.context
                )
                break
        return workflow.id

    async def process_step_result(
        self, workflow_id: str, step_name: str, result: Any
    ):
        workflow = await self.store.load(workflow_id)
        current_step = next(
            (s for s in workflow.steps if s.name == step_name), None
        )
        if current_step is None:
            raise ValueError(
                f"Unknown step {step_name!r} in workflow {workflow_id}"
            )
        current_step.status = StepStatus.COMPLETED
        current_step.result = result
        current_step.completed_at = datetime.utcnow()

        # Add result to context for downstream steps
        workflow.context[f"{step_name}_result"] = result

        # Find and enqueue next pending step
        next_step = next(
            (s for s in workflow.steps if s.status == StepStatus.PENDING),
            None,
        )
        if next_step:
            await self.queue.enqueue(
                workflow.id, next_step.name, workflow.context
            )
        else:
            self.state_machine.transition(
                workflow, WorkflowStatus.COMPLETED
            )

        await self.store.save(workflow)
```
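The `WorkflowStore` the engine depends on is assumed rather than shown. A minimal in-memory sketch that matches the `save`/`load` calls above; a production version would serialize workflows to Redis or Postgres and version each write to detect concurrent updates:

```python
class WorkflowStore:
    """Minimal in-memory persistence layer keyed by workflow id."""

    def __init__(self):
        self._workflows = {}

    async def save(self, workflow) -> None:
        # A durable implementation would serialize and write here.
        self._workflows[workflow.id] = workflow

    async def load(self, workflow_id: str):
        return self._workflows[workflow_id]
```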

## Monitoring Hooks

Add observability from day one. Emit structured events that can feed dashboards and alerts.

```python
import logging
import time

logger = logging.getLogger("orchestrator")

class MonitoredOrchestrator(Orchestrator):
    async def process_step_result(self, workflow_id, step_name, result):
        start = time.monotonic()
        await super().process_step_result(workflow_id, step_name, result)
        duration = time.monotonic() - start

        logger.info(
            "step_completed",
            extra={
                "workflow_id": workflow_id,
                "step_name": step_name,
                "duration_ms": round(duration * 1000, 2),
                "result_size": len(str(result)),
            },
        )
```
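The `extra` fields above only reach a dashboard if the log formatter emits them; the default formatter drops custom attributes. One way to surface them, sketched with only the standard library, is a formatter that serializes every non-standard record attribute as JSON:

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Emit the message plus any custom `extra` attributes as one JSON line."""

    # Attributes every LogRecord has; anything else arrived via `extra`.
    _STANDARD = set(vars(logging.makeLogRecord({})))

    def format(self, record: logging.LogRecord) -> str:
        payload = {"event": record.getMessage(), "level": record.levelname}
        for key, value in vars(record).items():
            if key not in self._STANDARD:
                payload[key] = value
        return json.dumps(payload)

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logging.getLogger("orchestrator").addHandler(handler)
```

With this attached, the `step_completed` event above lands as a single JSON line that log shippers can parse without extra configuration.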

## FAQ

### How do I decide between building custom and using Temporal?

Start with Temporal or another off-the-shelf tool. Build custom only if you have confirmed that the existing tool cannot meet a specific requirement — latency, routing logic, multi-tenancy, or integration constraints. Most teams overestimate the need for custom orchestration and underestimate the maintenance cost.

### What is the biggest risk with custom orchestrators?

Incomplete failure handling. Production orchestrators must handle worker crashes, partial failures, poison messages, timeout recovery, and state corruption. Off-the-shelf tools have years of hardening around these edge cases. Budget significant testing effort for failure scenarios if you build custom.

### How do I migrate from a custom orchestrator to Temporal later?

Design your step handlers as pure functions that take inputs and return outputs without referencing the orchestrator directly. This makes them portable. The orchestration logic (step sequencing, retry policies, state transitions) is what changes when you migrate — the actual work functions stay the same.
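A handler in this style might look like the following sketch; the step name and truncation logic are illustrative, not from a real workflow:

```python
async def summarize_step(payload: dict) -> dict:
    """Pure step handler: reads only its payload, returns only its result.

    No orchestrator reference and no shared state, so the same function
    can run under a custom engine or a Temporal activity unchanged.
    """
    document = payload.get("document", "")
    return {"summary": document[:200]}  # placeholder summarization logic
```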

---

#AgentOrchestration #CustomArchitecture #StateMachine #Python #SystemDesign #AgenticAI #LearnAI #AIEngineering

---

Source: https://callsphere.ai/blog/building-custom-agent-orchestrator-when-off-shelf-not-enough
