
Building a Custom Agent Orchestrator: When Off-the-Shelf Tools Are Not Enough

Learn when and how to build a custom agent orchestrator. Covers state machine design, queue integration, monitoring hooks, and the architectural patterns that make custom orchestrators maintainable.

When to Build Custom

Off-the-shelf orchestration platforms like Temporal, Prefect, and Airflow solve 80% of workflow needs. But AI agent systems sometimes hit the other 20%:

  • Sub-second latency requirements that batch-oriented schedulers cannot meet
  • Custom LLM routing logic that needs to inspect token counts, model availability, and cost in real time
  • Tight integration with existing infrastructure that would require fighting an orchestration framework's opinions
  • Specialized retry semantics — for example, retrying with a different model when one returns low-confidence results
  • Multi-tenant isolation requirements that off-the-shelf tools do not support natively

If your agent system has any of these constraints, a custom orchestrator may be the right choice. The key is to build it with clear boundaries so it remains maintainable.
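To make the specialized-retry bullet concrete: a fallback loop can try progressively more capable models until one returns a confident answer. Everything below is an illustrative sketch — the model names, the `call_model` stub, and the 0.8 confidence threshold are assumptions, not part of any real API:

```python
# Sketch of model-fallback retry: try each model in order until one
# returns a result above the confidence threshold.

def call_model(model: str, prompt: str) -> tuple[str, float]:
    # Hypothetical stub; a real implementation would call an LLM API
    # and derive a confidence score from the response.
    responses = {
        "small-model": ("maybe", 0.4),
        "large-model": ("definitely", 0.92),
    }
    return responses.get(model, ("unknown", 0.0))

def call_with_fallback(prompt: str, models: list[str], min_confidence: float = 0.8):
    last = None
    for model in models:
        answer, confidence = call_model(model, prompt)
        last = (model, answer, confidence)
        if confidence >= min_confidence:
            return last
    return last  # Best effort: return the final attempt even if low confidence

result = call_with_fallback("Is the sky blue?", ["small-model", "large-model"])
```

This is exactly the kind of routing logic that is awkward to express inside a generic workflow engine's retry policy, but trivial in a custom orchestrator.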

Core Architecture

A custom orchestrator has four components: a state machine that tracks workflow progress, a task queue that distributes work, a worker pool that executes tasks, and a persistence layer that stores state.

from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import uuid

class StepStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRYING = "retrying"

class WorkflowStatus(Enum):
    CREATED = "created"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    PAUSED = "paused"

@dataclass
class Step:
    name: str
    handler: str  # Dotted path to the handler function
    status: StepStatus = StepStatus.PENDING
    result: Any = None
    error: str | None = None
    attempts: int = 0
    max_retries: int = 3
    started_at: datetime | None = None
    completed_at: datetime | None = None

@dataclass
class Workflow:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    status: WorkflowStatus = WorkflowStatus.CREATED
    steps: list[Step] = field(default_factory=list)
    context: dict = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)
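Putting the data model to work: building a two-step workflow and inspecting its defaults. The snippet re-declares a trimmed `Step`/`Workflow` pair so it runs on its own; the step names and handler paths are illustrative:

```python
import uuid
from dataclasses import dataclass, field
from enum import Enum

class StepStatus(Enum):
    PENDING = "pending"
    COMPLETED = "completed"

@dataclass
class Step:
    name: str
    handler: str  # Dotted path to the handler function
    status: StepStatus = StepStatus.PENDING

@dataclass
class Workflow:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    steps: list[Step] = field(default_factory=list)

# Build a workflow; every step starts PENDING and the id is auto-generated.
wf = Workflow(
    name="research-report",
    steps=[
        Step(name="search", handler="handlers.search.run"),
        Step(name="summarize", handler="handlers.summarize.run"),
    ],
)
pending = [s.name for s in wf.steps if s.status == StepStatus.PENDING]
```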

The State Machine

The state machine enforces valid transitions and prevents workflows from entering inconsistent states.

class WorkflowStateMachine:
    VALID_TRANSITIONS = {
        WorkflowStatus.CREATED: {WorkflowStatus.RUNNING},
        WorkflowStatus.RUNNING: {
            WorkflowStatus.COMPLETED,
            WorkflowStatus.FAILED,
            WorkflowStatus.PAUSED,
        },
        WorkflowStatus.PAUSED: {WorkflowStatus.RUNNING, WorkflowStatus.FAILED},
        WorkflowStatus.FAILED: {WorkflowStatus.RUNNING},  # Allow restart
    }

    def transition(self, workflow: Workflow, new_status: WorkflowStatus):
        allowed = self.VALID_TRANSITIONS.get(workflow.status, set())
        if new_status not in allowed:
            raise ValueError(
                f"Cannot transition from {workflow.status} to {new_status}"
            )
        workflow.status = new_status
        workflow.updated_at = datetime.utcnow()
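A quick check of the guard in action. The enum and transition table are repeated here in trimmed form so the snippet runs standalone:

```python
from enum import Enum

class WorkflowStatus(Enum):
    CREATED = "created"
    RUNNING = "running"
    COMPLETED = "completed"

VALID_TRANSITIONS = {
    WorkflowStatus.CREATED: {WorkflowStatus.RUNNING},
    WorkflowStatus.RUNNING: {WorkflowStatus.COMPLETED},
}

def check_transition(current: WorkflowStatus, new: WorkflowStatus) -> WorkflowStatus:
    if new not in VALID_TRANSITIONS.get(current, set()):
        raise ValueError(f"Cannot transition from {current} to {new}")
    return new

# CREATED -> RUNNING is allowed; skipping straight to COMPLETED is not.
status = check_transition(WorkflowStatus.CREATED, WorkflowStatus.RUNNING)
try:
    check_transition(WorkflowStatus.CREATED, WorkflowStatus.COMPLETED)
    blocked = False
except ValueError:
    blocked = True
```

Centralizing the transition table like this means an invalid state change fails loudly at the moment it is attempted, rather than corrupting stored state and surfacing much later.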

Queue Integration

Use Redis Streams or a similar lightweight queue to distribute work to workers.


import redis.asyncio as redis
import json

class TaskQueue:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.stream = "agent:tasks"
        self.group = "agent-workers"

    async def initialize(self):
        try:
            await self.redis.xgroup_create(
                self.stream, self.group, mkstream=True
            )
        except redis.ResponseError as exc:
            # Only swallow "consumer group already exists"; re-raise anything else
            if "BUSYGROUP" not in str(exc):
                raise

    async def enqueue(self, workflow_id: str, step_name: str, payload: dict):
        await self.redis.xadd(
            self.stream,
            {
                "workflow_id": workflow_id,
                "step_name": step_name,
                "payload": json.dumps(payload),
            },
        )

    async def dequeue(self, consumer: str, count: int = 1, block_ms: int = 5000):
        messages = await self.redis.xreadgroup(
            self.group,
            consumer,
            {self.stream: ">"},
            count=count,
            block=block_ms,
        )
        results = []
        for stream_name, entries in messages:
            for msg_id, fields in entries:
                results.append({
                    "id": msg_id,
                    "workflow_id": fields[b"workflow_id"].decode(),
                    "step_name": fields[b"step_name"].decode(),
                    "payload": json.loads(fields[b"payload"]),
                })
        return results

    async def acknowledge(self, message_id: str):
        await self.redis.xack(self.stream, self.group, message_id)

The Orchestrator Engine
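The engine below depends on a `WorkflowStore` for the persistence layer, which this article references but never defines. A minimal in-memory version is enough to follow the code; a production store would serialize workflows to Postgres or Redis and handle concurrent updates:

```python
class WorkflowStore:
    """Minimal in-memory persistence layer keyed by workflow id.

    Illustrative sketch only: it loses all state on restart and does
    no serialization or locking.
    """

    def __init__(self):
        self._workflows: dict[str, object] = {}

    async def save(self, workflow) -> None:
        self._workflows[workflow.id] = workflow

    async def load(self, workflow_id: str):
        return self._workflows[workflow_id]
```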

import importlib  # Workers use this to resolve Step.handler dotted paths
from typing import Callable

class Orchestrator:
    def __init__(self, queue: TaskQueue, store: WorkflowStore):
        self.queue = queue
        self.store = store
        self.state_machine = WorkflowStateMachine()
        self.handlers: dict[str, Callable] = {}

    def register_handler(self, name: str, handler: Callable):
        self.handlers[name] = handler

    async def start_workflow(self, workflow: Workflow) -> str:
        self.state_machine.transition(workflow, WorkflowStatus.RUNNING)
        await self.store.save(workflow)

        # Enqueue the first pending step
        for step in workflow.steps:
            if step.status == StepStatus.PENDING:
                await self.queue.enqueue(
                    workflow.id, step.name, workflow.context
                )
                break
        return workflow.id

    async def process_step_result(
        self, workflow_id: str, step_name: str, result: Any
    ):
        workflow = await self.store.load(workflow_id)
        current_step = next(
            s for s in workflow.steps if s.name == step_name
        )
        current_step.status = StepStatus.COMPLETED
        current_step.result = result
        current_step.completed_at = datetime.utcnow()

        # Add result to context for downstream steps
        workflow.context[f"{step_name}_result"] = result

        # Find and enqueue next pending step
        next_step = next(
            (s for s in workflow.steps if s.status == StepStatus.PENDING),
            None,
        )
        if next_step:
            await self.queue.enqueue(
                workflow.id, next_step.name, workflow.context
            )
        else:
            self.state_machine.transition(
                workflow, WorkflowStatus.COMPLETED
            )

        await self.store.save(workflow)
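One piece the engine leaves implicit is how a worker turns the dotted path in `Step.handler` into a callable before executing it — which is what the `importlib` import is for. A resolution helper might look like this (a sketch; the stdlib's `json.dumps` stands in for a real handler):

```python
import importlib

def resolve_handler(dotted_path: str):
    """Resolve 'package.module.function' into the function object."""
    module_path, _, attr = dotted_path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, attr)

# A worker would call this on Step.handler, run the result against the
# task payload, then report back via Orchestrator.process_step_result
# and acknowledge the queue message.
handler = resolve_handler("json.dumps")
encoded = handler({"ok": True})
```

Resolving handlers by dotted path keeps workflow definitions serializable: the persisted `Workflow` contains only strings, not function objects.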

Monitoring Hooks

Add observability from day one. Emit structured events that can feed dashboards and alerts.

import logging
import time

logger = logging.getLogger("orchestrator")

class MonitoredOrchestrator(Orchestrator):
    async def process_step_result(self, workflow_id, step_name, result):
        start = time.monotonic()
        await super().process_step_result(workflow_id, step_name, result)
        duration = time.monotonic() - start

        logger.info(
            "step_completed",
            extra={
                "workflow_id": workflow_id,
                "step_name": step_name,
                "duration_ms": round(duration * 1000, 2),
                "result_size": len(str(result)),
            },
        )
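For those events to actually feed dashboards, the log records need a machine-readable shape. One common approach — not shown in the article, so treat the field list as an assumption — is a JSON formatter that picks the orchestrator's `extra` fields off the `LogRecord`:

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Render log records as JSON lines, including custom extra fields."""

    # Fields passed via `extra=` become attributes on the LogRecord;
    # these are the ones MonitoredOrchestrator emits.
    EXTRA_FIELDS = ("workflow_id", "step_name", "duration_ms", "result_size")

    def format(self, record: logging.LogRecord) -> str:
        payload = {"event": record.getMessage(), "level": record.levelname}
        for field in self.EXTRA_FIELDS:
            if hasattr(record, field):
                payload[field] = getattr(record, field)
        return json.dumps(payload)

# Demo: format a record shaped like the orchestrator's step_completed event.
record = logging.LogRecord(
    name="orchestrator", level=logging.INFO, pathname="", lineno=0,
    msg="step_completed", args=(), exc_info=None,
)
record.workflow_id = "wf-1"
record.step_name = "extract"
line = JsonFormatter().format(record)
```

Attach the formatter to a handler on the `orchestrator` logger and every step event becomes one parseable JSON line.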

FAQ

How do I decide between building custom and using Temporal?

Start with Temporal or another off-the-shelf tool. Build custom only if you have confirmed that the existing tool cannot meet a specific requirement — latency, routing logic, multi-tenancy, or integration constraints. Most teams overestimate the need for custom orchestration and underestimate the maintenance cost.

What is the biggest risk with custom orchestrators?

Incomplete failure handling. Production orchestrators must handle worker crashes, partial failures, poison messages, timeout recovery, and state corruption. Off-the-shelf tools have years of hardening around these edge cases. Budget significant testing effort for failure scenarios if you build custom.

How do I migrate from a custom orchestrator to Temporal later?

Design your step handlers as pure functions that take inputs and return outputs without referencing the orchestrator directly. This makes them portable. The orchestration logic (step sequencing, retry policies, state transitions) is what changes when you migrate — the actual work functions stay the same.
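To make that concrete: a portable handler takes the workflow context as plain data and returns plain data, never touching the orchestrator. The function name and context keys below are illustrative:

```python
# A portable step handler: pure data in, pure data out. It can run under
# a custom orchestrator today and as a Temporal activity later, unchanged.

def summarize_step(context: dict) -> dict:
    documents = context.get("documents", [])
    return {
        "summary_count": len(documents),
        "total_chars": sum(len(d) for d in documents),
    }

result = summarize_step({"documents": ["hello", "world!"]})
```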


#AgentOrchestration #CustomArchitecture #StateMachine #Python #SystemDesign #AgenticAI #LearnAI #AIEngineering

Written by

CallSphere Team

Expert insights on AI voice agents and customer communication automation.
