Learn Agentic AI · 14 min read

Building an Agent Orchestration Dashboard: Visualizing Workflow Status and Performance

Learn how to build a real-time orchestration dashboard for AI agent workflows. Covers UI components, status tracking, timeline views, error drill-down, and the backend API that powers it all.

Why Build a Custom Dashboard

Off-the-shelf orchestration platforms include their own dashboards, but custom dashboards serve a different purpose. They surface domain-specific insights — agent quality scores, LLM cost breakdowns, model performance comparisons — that generic workflow UIs do not provide.

A well-designed orchestration dashboard answers three questions at a glance: What is running right now? What failed recently and why? How much is this costing?

Dashboard API with FastAPI

The backend API provides endpoints for the dashboard frontend to consume. Start with the data models and core endpoints; the examples below assume an async persistence layer exposed as `workflow_store` and `metrics_store`.

from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from datetime import datetime, timedelta
from typing import Optional

app = FastAPI(title="Agent Orchestration Dashboard API")

class WorkflowSummary(BaseModel):
    id: str
    name: str
    status: str
    started_at: datetime
    duration_seconds: float | None
    step_count: int
    current_step: str | None
    error: str | None
    total_tokens: int
    cost_usd: float

class DashboardOverview(BaseModel):
    active_workflows: int
    completed_today: int
    failed_today: int
    success_rate: float
    total_cost_today_usd: float
    total_tokens_today: int
    avg_duration_seconds: float

@app.get("/api/dashboard/overview")
async def get_overview(
    hours: int = Query(default=24, ge=1, le=168),
) -> DashboardOverview:
    """Get high-level dashboard statistics."""
    since = datetime.utcnow() - timedelta(hours=hours)

    active = await workflow_store.count(status="running")
    completed = await workflow_store.count(
        status="completed", since=since
    )
    failed = await workflow_store.count(
        status="failed", since=since
    )
    total = completed + failed
    success_rate = completed / total if total > 0 else 1.0

    cost_data = await metrics_store.sum_cost(since=since)
    token_data = await metrics_store.sum_tokens(since=since)
    avg_duration = await metrics_store.avg_duration(since=since)

    return DashboardOverview(
        active_workflows=active,
        completed_today=completed,
        failed_today=failed,
        success_rate=round(success_rate, 4),
        total_cost_today_usd=round(cost_data, 2),
        total_tokens_today=token_data,
        avg_duration_seconds=round(avg_duration, 2),
    )

Workflow List and Filtering

class WorkflowFilter(BaseModel):
    status: Optional[str] = None
    name: Optional[str] = None
    since: Optional[datetime] = None
    until: Optional[datetime] = None
    min_cost_usd: Optional[float] = None
    has_errors: Optional[bool] = None

@app.get("/api/dashboard/workflows")
async def list_workflows(
    status: Optional[str] = None,
    name: Optional[str] = None,
    hours: int = Query(default=24, ge=1, le=168),
    limit: int = Query(default=50, ge=1, le=200),
    offset: int = Query(default=0, ge=0),
) -> dict:
    """List workflows with filtering and pagination."""
    since = datetime.utcnow() - timedelta(hours=hours)

    workflows = await workflow_store.query(
        status=status,
        name=name,
        since=since,
        limit=limit,
        offset=offset,
    )

    total = await workflow_store.count(
        status=status, name=name, since=since
    )

    return {
        "workflows": [
            WorkflowSummary(
                id=wf.id,
                name=wf.name,
                status=wf.status,
                started_at=wf.created_at,
                duration_seconds=wf.duration_seconds,
                step_count=len(wf.steps),
                current_step=_get_current_step(wf),
                error=_get_last_error(wf),
                total_tokens=wf.metrics.total_tokens,
                cost_usd=wf.metrics.total_cost_usd,
            )
            for wf in workflows
        ],
        "total": total,
        "limit": limit,
        "offset": offset,
    }

def _get_current_step(wf) -> str | None:
    running = [s for s in wf.steps if s.status == "running"]
    return running[0].name if running else None

def _get_last_error(wf) -> str | None:
    failed = [s for s in wf.steps if s.status == "failed"]
    return failed[-1].error if failed else None

Timeline View API

The timeline view shows each step's duration and status as a horizontal bar chart, making it easy to spot bottlenecks.

class TimelineStep(BaseModel):
    name: str
    status: str
    started_at: datetime | None
    completed_at: datetime | None
    duration_ms: float | None
    attempts: int
    error: str | None
    llm_calls: int
    tokens_used: int

@app.get("/api/dashboard/workflows/{workflow_id}/timeline")
async def get_workflow_timeline(workflow_id: str) -> dict:
    """Get detailed timeline for a specific workflow."""
    wf = await workflow_store.load(workflow_id)
    if not wf:
        raise HTTPException(status_code=404, detail="Workflow not found")

    steps = []
    for step in wf.steps:
        llm_calls_for_step = [
            call for call in wf.metrics.llm_calls
            if call.get("step_name") == step.name
        ]
        steps.append(TimelineStep(
            name=step.name,
            status=step.status,
            started_at=step.started_at,
            completed_at=step.completed_at,
            duration_ms=(
                (step.completed_at - step.started_at).total_seconds()
                * 1000
                if step.started_at and step.completed_at
                else None
            ),
            attempts=step.attempts,
            error=step.error,
            llm_calls=len(llm_calls_for_step),
            tokens_used=sum(
                c.get("input_tokens", 0) + c.get("output_tokens", 0)
                for c in llm_calls_for_step
            ),
        ))

    return {
        "workflow_id": wf.id,
        "workflow_name": wf.name,
        "total_duration_ms": (
            (wf.updated_at - wf.created_at).total_seconds() * 1000
        ),
        "steps": steps,
    }
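The timeline payload makes bottleneck detection trivial on either side of the API. As a sketch, assuming the response shape above with each step serialized to a dict, a helper that surfaces the slowest completed steps:

```python
def find_bottlenecks(timeline: dict, top_n: int = 3) -> list[dict]:
    """Return the slowest timed steps with their share of total runtime."""
    total_ms = timeline.get("total_duration_ms") or 0
    timed = [
        s for s in timeline["steps"]
        if s.get("duration_ms") is not None  # Skip pending/unstarted steps.
    ]
    timed.sort(key=lambda s: s["duration_ms"], reverse=True)
    return [
        {
            "name": s["name"],
            "duration_ms": s["duration_ms"],
            "pct_of_total": (
                round(100 * s["duration_ms"] / total_ms, 1)
                if total_ms else None
            ),
        }
        for s in timed[:top_n]
    ]
```

Rendering `pct_of_total` next to each bar gives operators an immediate answer to "where did the time go" without reading raw durations.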

Error Drill-Down API

When a workflow fails, the dashboard needs to show exactly what went wrong at each level.


class ErrorDetail(BaseModel):
    workflow_id: str
    step_name: str
    error_type: str
    error_message: str
    stack_trace: str | None
    attempt_number: int
    timestamp: datetime
    context_snapshot: dict

@app.get("/api/dashboard/workflows/{workflow_id}/errors")
async def get_workflow_errors(workflow_id: str) -> list[ErrorDetail]:
    """Get detailed error information for a workflow."""
    wf = await workflow_store.load(workflow_id)
    if not wf:
        raise HTTPException(status_code=404, detail="Workflow not found")

    errors = []
    for step in wf.steps:
        if step.error:
            errors.append(ErrorDetail(
                workflow_id=wf.id,
                step_name=step.name,
                error_type=step.error_type or "Unknown",
                error_message=step.error,
                stack_trace=step.stack_trace,
                attempt_number=step.attempts,
                timestamp=step.completed_at or wf.updated_at,
                context_snapshot={
                    k: str(v)[:200]  # Truncate large values
                    for k, v in wf.context.items()
                    if not k.startswith("_")
                },
            ))

    return errors
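For a fleet-wide error panel, the per-workflow details can be aggregated by type. A sketch, assuming error records shaped like `ErrorDetail` above and serialized to dicts:

```python
from collections import Counter

def summarize_errors(errors: list[dict]) -> list[dict]:
    """Group error records by type, most frequent first."""
    counts = Counter(e["error_type"] for e in errors)
    # Track the most recent occurrence of each error type.
    latest: dict[str, dict] = {}
    for e in errors:
        prev = latest.get(e["error_type"])
        if prev is None or e["timestamp"] > prev["timestamp"]:
            latest[e["error_type"]] = e
    return [
        {
            "error_type": etype,
            "count": count,
            "latest_message": latest[etype]["error_message"],
            "latest_step": latest[etype]["step_name"],
        }
        for etype, count in counts.most_common()
    ]
```

A panel built on this summary answers "what keeps failing" faster than a flat list of individual errors.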

Cost Breakdown API

class CostBreakdown(BaseModel):
    model: str
    call_count: int
    total_tokens: int
    input_tokens: int
    output_tokens: int
    total_cost_usd: float

@app.get("/api/dashboard/costs")
async def get_cost_breakdown(
    hours: int = Query(default=24, ge=1, le=168),
    group_by: str = Query(default="model", pattern="^(model|workflow|step)$"),
) -> dict:
    """Get cost breakdown by model, workflow, or step."""
    since = datetime.utcnow() - timedelta(hours=hours)

    raw_data = await metrics_store.get_llm_costs(
        since=since, group_by=group_by
    )

    breakdowns = [
        CostBreakdown(
            model=row["group_key"],
            call_count=row["call_count"],
            total_tokens=row["total_tokens"],
            input_tokens=row["input_tokens"],
            output_tokens=row["output_tokens"],
            total_cost_usd=round(row["total_cost"], 4),
        )
        for row in raw_data
    ]

    return {
        "period_hours": hours,
        "breakdowns": breakdowns,
        "total_cost_usd": round(
            sum(b.total_cost_usd for b in breakdowns), 2
        ),
    }
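Upstream of `metrics_store`, per-call cost is just token counts multiplied by the model's rate. A sketch with a hypothetical per-million-token price table (the model names and rates below are illustrative, not real pricing):

```python
# Hypothetical USD per 1M tokens; substitute your provider's real price sheet.
PRICING = {
    "model-small": {"input": 0.25, "output": 1.00},
    "model-large": {"input": 3.00, "output": 15.00},
}

def call_cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
    """Compute the cost of a single LLM call from its token counts."""
    rates = PRICING.get(model)
    if rates is None:
        return 0.0  # Unknown model: report zero rather than guess a rate.
    cost = (
        input_tokens / 1_000_000 * rates["input"]
        + output_tokens / 1_000_000 * rates["output"]
    )
    return round(cost, 6)
```

Recording this cost on each LLM call as it happens is what lets `metrics_store` later group totals by model, workflow, or step without re-deriving prices.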

Real-Time Updates with WebSocket

For live dashboard updates, stream workflow state changes over WebSocket.

from fastapi import WebSocket, WebSocketDisconnect
import json

class DashboardBroadcaster:
    def __init__(self):
        self.connections: list[WebSocket] = []

    async def connect(self, ws: WebSocket):
        await ws.accept()
        self.connections.append(ws)

    def disconnect(self, ws: WebSocket):
        self.connections.remove(ws)

    async def broadcast(self, event: dict):
        dead = []
        for ws in self.connections:
            try:
                await ws.send_json(event)
            except Exception:
                dead.append(ws)
        for ws in dead:
            self.connections.remove(ws)

broadcaster = DashboardBroadcaster()

@app.websocket("/ws/dashboard")
async def dashboard_ws(ws: WebSocket):
    await broadcaster.connect(ws)
    try:
        while True:
            await ws.receive_text()  # Keep connection alive
    except WebSocketDisconnect:
        broadcaster.disconnect(ws)

# Call this from your orchestrator when state changes
async def on_workflow_event(event_type: str, workflow_id: str, data: dict):
    await broadcaster.broadcast({
        "type": event_type,
        "workflow_id": workflow_id,
        "timestamp": datetime.utcnow().isoformat(),
        "data": data,
    })

FAQ

What refresh rate should the dashboard use?

For the overview panel, poll every 5-10 seconds. For individual workflow detail views, use WebSocket connections for real-time updates. Avoid polling faster than every 2 seconds: it creates unnecessary database load without making the dashboard feel meaningfully more responsive to operators.

Should I build the dashboard frontend in React or use Grafana?

Use Grafana for metrics-heavy dashboards (latency percentiles, throughput charts, cost trends) since it integrates natively with Prometheus and requires no frontend code. Build a custom React or Next.js dashboard for workflow-specific views like timelines, step drill-downs, and action buttons (retry, cancel, pause) that Grafana cannot provide. Many teams use both.

How do I handle dashboard performance with thousands of workflows?

Implement server-side pagination, filtering, and aggregation. Never load all workflows into the frontend. Use database indexes on status, created_at, and workflow_name columns. For the overview metrics, pre-compute aggregates on a schedule rather than computing them on every request. A materialized view or Redis cache updated every 30 seconds works well for dashboard summary statistics.
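The cached-aggregates pattern from the last answer can start as an in-process TTL cache before graduating to Redis or a materialized view. A minimal sketch (the key name and 30-second TTL mirror the suggestion above; all names are illustrative):

```python
import time
from typing import Any, Callable

class TTLCache:
    """Cache one computed value per key, recomputing after ttl_seconds."""

    def __init__(self, ttl_seconds: float = 30.0):
        self.ttl = ttl_seconds
        self._entries: dict[str, tuple[float, Any]] = {}

    def get_or_compute(self, key: str, compute: Callable[[], Any]) -> Any:
        now = time.monotonic()
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] < self.ttl:
            return entry[1]  # Still fresh: serve the cached aggregate.
        value = compute()  # Stale or missing: recompute and store.
        self._entries[key] = (now, value)
        return value
```

In the overview endpoint, the expensive aggregate queries would be wrapped as something like `cache.get_or_compute("overview", compute_overview)`, so at most one recomputation runs per TTL window regardless of how many dashboard clients are polling.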


#Dashboard #Visualization #AgentOrchestration #FastAPI #Python #AgenticAI #LearnAI #AIEngineering

Written by CallSphere Team