---
title: "Building an Agent Orchestration Dashboard: Visualizing Workflow Status and Performance"
description: "Learn how to build a real-time orchestration dashboard for AI agent workflows. Covers UI components, status tracking, timeline views, error drill-down, and the backend API that powers it all."
canonical: https://callsphere.ai/blog/building-agent-orchestration-dashboard-visualizing-workflow-status
category: "Learn Agentic AI"
tags: ["Dashboard", "Visualization", "Agent Orchestration", "FastAPI", "Python"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-07T04:33:51.266Z
---

# Building an Agent Orchestration Dashboard: Visualizing Workflow Status and Performance

> Learn how to build a real-time orchestration dashboard for AI agent workflows. Covers UI components, status tracking, timeline views, error drill-down, and the backend API that powers it all.

## Why Build a Custom Dashboard

Off-the-shelf orchestration platforms include their own dashboards, but custom dashboards serve a different purpose. They surface **domain-specific insights** — agent quality scores, LLM cost breakdowns, model performance comparisons — that generic workflow UIs do not provide.

A well-designed orchestration dashboard answers three questions at a glance: **What is running right now?** **What failed recently and why?** **How much is this costing?**

## Dashboard API with FastAPI

The backend API provides endpoints for the dashboard frontend to consume. Start with the data models and core endpoints.

```mermaid
flowchart LR
    CLIENT(["Client SDK"])
    GW["API Gateway<br/>auth plus rate limit"]
    APP["FastAPI app<br/>handlers and DI"]
    VAL["Pydantic validation"]
    SVC["Service layer<br/>business logic"]
    DB[(Database)]
    QUEUE[(Background queue)]
    OBS[(Tracing)]
    CLIENT --> GW --> APP --> VAL --> SVC
    SVC --> DB
    SVC --> QUEUE
    SVC --> OBS
    SVC --> CLIENT
    style GW fill:#4f46e5,stroke:#4338ca,color:#fff
    style APP fill:#f59e0b,stroke:#d97706,color:#1f2937
    style DB fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
```

```python
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from datetime import datetime, timedelta
from typing import Optional

app = FastAPI(title="Agent Orchestration Dashboard API")

# workflow_store and metrics_store are persistence-layer objects,
# assumed to be initialized elsewhere in the application.

class WorkflowSummary(BaseModel):
    id: str
    name: str
    status: str
    started_at: datetime
    duration_seconds: float | None
    step_count: int
    current_step: str | None
    error: str | None
    total_tokens: int
    cost_usd: float

class DashboardOverview(BaseModel):
    active_workflows: int
    completed_today: int
    failed_today: int
    success_rate: float
    total_cost_today_usd: float
    total_tokens_today: int
    avg_duration_seconds: float

@app.get("/api/dashboard/overview")
async def get_overview(
    hours: int = Query(default=24, ge=1, le=168),
) -> DashboardOverview:
    """Get high-level dashboard statistics."""
    since = datetime.utcnow() - timedelta(hours=hours)

    active = await workflow_store.count(status="running")
    completed = await workflow_store.count(
        status="completed", since=since
    )
    failed = await workflow_store.count(
        status="failed", since=since
    )
    total = completed + failed
    success_rate = completed / total if total > 0 else 1.0

    cost_data = await metrics_store.sum_cost(since=since)
    token_data = await metrics_store.sum_tokens(since=since)
    avg_duration = await metrics_store.avg_duration(since=since)

    return DashboardOverview(
        active_workflows=active,
        completed_today=completed,
        failed_today=failed,
        success_rate=round(success_rate, 4),
        total_cost_today_usd=round(cost_data, 2),
        total_tokens_today=token_data,
        avg_duration_seconds=round(avg_duration, 2),
    )
```
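The handlers above call `workflow_store` and `metrics_store`, which stand in for whatever persistence layer you use; this post never pins down their implementation. As a sketch, a hypothetical in-memory stub (for local development and tests only, not production) can satisfy the `count` and `query` calls:

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional

@dataclass
class InMemoryWorkflowStore:
    """Hypothetical stand-in for a real workflow store (dev/tests only)."""
    workflows: list = field(default_factory=list)

    def _filter(self, status, name, since):
        return [
            wf for wf in self.workflows
            if (status is None or wf.status == status)
            and (name is None or wf.name == name)
            and (since is None or wf.created_at >= since)
        ]

    async def count(self, status: Optional[str] = None,
                    name: Optional[str] = None,
                    since: Optional[datetime] = None) -> int:
        return len(self._filter(status, name, since))

    async def query(self, status=None, name=None, since=None,
                    limit: int = 50, offset: int = 0) -> list:
        # Newest first, matching how a dashboard list is usually sorted
        rows = sorted(self._filter(status, name, since),
                      key=lambda wf: wf.created_at, reverse=True)
        return rows[offset:offset + limit]
```

In production, swap this for a store backed by Postgres or similar; the async method signatures stay the same, which is what lets the endpoints remain unchanged.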

## Workflow List and Filtering

```python
class WorkflowFilter(BaseModel):
    status: Optional[str] = None
    name: Optional[str] = None
    since: Optional[datetime] = None
    until: Optional[datetime] = None
    min_cost_usd: Optional[float] = None
    has_errors: Optional[bool] = None

@app.get("/api/dashboard/workflows")
async def list_workflows(
    status: Optional[str] = None,
    name: Optional[str] = None,
    hours: int = Query(default=24, ge=1, le=168),
    limit: int = Query(default=50, ge=1, le=200),
    offset: int = Query(default=0, ge=0),
) -> dict:
    """List workflows with filtering and pagination."""
    since = datetime.utcnow() - timedelta(hours=hours)

    workflows = await workflow_store.query(
        status=status,
        name=name,
        since=since,
        limit=limit,
        offset=offset,
    )

    total = await workflow_store.count(
        status=status, name=name, since=since
    )

    return {
        "workflows": [
            WorkflowSummary(
                id=wf.id,
                name=wf.name,
                status=wf.status,
                started_at=wf.created_at,
                duration_seconds=wf.duration_seconds,
                step_count=len(wf.steps),
                current_step=_get_current_step(wf),
                error=_get_last_error(wf),
                total_tokens=wf.metrics.total_tokens,
                cost_usd=wf.metrics.total_cost_usd,
            )
            for wf in workflows
        ],
        "total": total,
        "limit": limit,
        "offset": offset,
    }

def _get_current_step(wf) -> str | None:
    running = [s for s in wf.steps if s.status == "running"]
    return running[0].name if running else None

def _get_last_error(wf) -> str | None:
    failed = [s for s in wf.steps if s.status == "failed"]
    return failed[-1].error if failed else None
```

## Timeline View API

The timeline view shows each step's duration and status as a horizontal bar chart, making it easy to spot bottlenecks.

```python
class TimelineStep(BaseModel):
    name: str
    status: str
    started_at: datetime | None
    completed_at: datetime | None
    duration_ms: float | None
    attempts: int
    error: str | None
    llm_calls: int
    tokens_used: int

@app.get("/api/dashboard/workflows/{workflow_id}/timeline")
async def get_workflow_timeline(workflow_id: str) -> dict:
    """Get detailed timeline for a specific workflow."""
    wf = await workflow_store.load(workflow_id)
    if not wf:
        raise HTTPException(status_code=404, detail="Workflow not found")

    steps = []
    for step in wf.steps:
        llm_calls_for_step = [
            call for call in wf.metrics.llm_calls
            if call.get("step_name") == step.name
        ]
        steps.append(TimelineStep(
            name=step.name,
            status=step.status,
            started_at=step.started_at,
            completed_at=step.completed_at,
            duration_ms=(
                (step.completed_at - step.started_at).total_seconds()
                * 1000
                if step.started_at and step.completed_at
                else None
            ),
            attempts=step.attempts,
            error=step.error,
            llm_calls=len(llm_calls_for_step),
            tokens_used=sum(
                c.get("input_tokens", 0) + c.get("output_tokens", 0)
                for c in llm_calls_for_step
            ),
        ))

    return {
        "workflow_id": wf.id,
        "workflow_name": wf.name,
        "total_duration_ms": (
            (wf.updated_at - wf.created_at).total_seconds() * 1000
        ),
        "steps": steps,
    }
```
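On the frontend, each `TimelineStep` maps to a horizontal bar positioned relative to the workflow start. A small helper along these lines (hypothetical, not part of the API above) converts the payload into percentage-based geometry that CSS can render directly:

```python
from datetime import datetime

def timeline_bars(steps: list[dict], workflow_start: datetime,
                  total_ms: float) -> list[dict]:
    """Map timeline steps to left/width percentages for a Gantt-style view."""
    bars = []
    for step in steps:
        if step["started_at"] is None or step["duration_ms"] is None:
            continue  # pending steps have no bar yet
        offset_ms = (step["started_at"] - workflow_start).total_seconds() * 1000
        bars.append({
            "name": step["name"],
            "status": step["status"],
            "left_pct": round(offset_ms / total_ms * 100, 1),
            "width_pct": round(step["duration_ms"] / total_ms * 100, 1),
        })
    return bars
```

Gaps between bars are as informative as the bars themselves: empty horizontal space usually means queue wait or retry backoff rather than actual work.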

## Error Drill-Down API

When a workflow fails, the dashboard needs to show exactly what went wrong at each level.

```python
class ErrorDetail(BaseModel):
    workflow_id: str
    step_name: str
    error_type: str
    error_message: str
    stack_trace: str | None
    attempt_number: int
    timestamp: datetime
    context_snapshot: dict

@app.get("/api/dashboard/workflows/{workflow_id}/errors")
async def get_workflow_errors(workflow_id: str) -> list[ErrorDetail]:
    """Get detailed error information for a workflow."""
    wf = await workflow_store.load(workflow_id)
    if not wf:
        raise HTTPException(status_code=404, detail="Workflow not found")

    errors = []
    for step in wf.steps:
        if step.error:
            errors.append(ErrorDetail(
                workflow_id=wf.id,
                step_name=step.name,
                error_type=step.error_type or "Unknown",
                error_message=step.error,
                stack_trace=step.stack_trace,
                attempt_number=step.attempts,
                timestamp=step.completed_at or wf.updated_at,
                context_snapshot={
                    k: str(v)[:200]  # Truncate large values
                    for k, v in wf.context.items()
                    if not k.startswith("_")
                },
            ))

    return errors
```
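Across many workflows, the same drill-down data can feed a "top failure modes" panel. A small aggregation over the error records (sketched here over plain dicts, since the exact shape depends on how you serialize `ErrorDetail`) ranks error-type/step pairs by frequency:

```python
from collections import Counter

def top_failure_modes(errors: list[dict], limit: int = 5) -> list[tuple]:
    """Rank (error_type, step_name) pairs by how often they occur."""
    counts = Counter((e["error_type"], e["step_name"]) for e in errors)
    return counts.most_common(limit)
```

Grouping by the pair rather than error type alone matters: a `RateLimitError` concentrated in one step points at that step's model or provider, while the same error spread across steps points at account-level throttling.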

## Cost Breakdown API

```python
class CostBreakdown(BaseModel):
    model: str
    call_count: int
    total_tokens: int
    input_tokens: int
    output_tokens: int
    total_cost_usd: float

@app.get("/api/dashboard/costs")
async def get_cost_breakdown(
    hours: int = Query(default=24, ge=1, le=168),
    group_by: str = Query(default="model", pattern="^(model|workflow|step)$"),
) -> dict:
    """Get cost breakdown by model, workflow, or step."""
    since = datetime.utcnow() - timedelta(hours=hours)

    raw_data = await metrics_store.get_llm_costs(
        since=since, group_by=group_by
    )

    breakdowns = [
        CostBreakdown(
            model=row["group_key"],
            call_count=row["call_count"],
            total_tokens=row["total_tokens"],
            input_tokens=row["input_tokens"],
            output_tokens=row["output_tokens"],
            total_cost_usd=round(row["total_cost"], 4),
        )
        for row in raw_data
    ]

    return {
        "period_hours": hours,
        "breakdowns": breakdowns,
        "total_cost_usd": round(
            sum(b.total_cost_usd for b in breakdowns), 2
        ),
    }
```
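This endpoint assumes per-call costs were already recorded at write time. If you only store token counts, cost can be derived from a pricing table when each call is logged. The rates below are illustrative placeholders, not real provider prices; load actual rates from config, since they vary by provider and change often:

```python
# Illustrative per-million-token rates in USD -- placeholder values only.
PRICING = {
    "model-large": {"input": 3.00, "output": 15.00},
    "model-small": {"input": 0.25, "output": 1.25},
}

def call_cost_usd(model: str, input_tokens: int, output_tokens: int,
                  pricing: dict = PRICING) -> float:
    """Compute the USD cost of a single LLM call from its token counts."""
    rates = pricing[model]
    cost = (input_tokens * rates["input"]
            + output_tokens * rates["output"]) / 1_000_000
    return round(cost, 6)
```

Computing cost once at write time, rather than on every dashboard request, also keeps historical figures stable when prices change later.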

## Real-Time Updates with WebSocket

For live dashboard updates, stream workflow state changes over WebSocket.

```python
from fastapi import WebSocket, WebSocketDisconnect

class DashboardBroadcaster:
    def __init__(self):
        self.connections: list[WebSocket] = []

    async def connect(self, ws: WebSocket):
        await ws.accept()
        self.connections.append(ws)

    def disconnect(self, ws: WebSocket):
        self.connections.remove(ws)

    async def broadcast(self, event: dict):
        dead = []
        for ws in self.connections:
            try:
                await ws.send_json(event)
            except Exception:
                dead.append(ws)
        for ws in dead:
            self.connections.remove(ws)

broadcaster = DashboardBroadcaster()

@app.websocket("/ws/dashboard")
async def dashboard_ws(ws: WebSocket):
    await broadcaster.connect(ws)
    try:
        while True:
            await ws.receive_text()  # Keep connection alive
    except WebSocketDisconnect:
        broadcaster.disconnect(ws)

# Call this from your orchestrator when state changes
async def on_workflow_event(event_type: str, workflow_id: str, data: dict):
    await broadcaster.broadcast({
        "type": event_type,
        "workflow_id": workflow_id,
        "timestamp": datetime.utcnow().isoformat(),
        "data": data,
    })
```
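Because the broadcast loop only duck-types `send_json`, its dead-connection pruning can be exercised without a running server. The sketch below replicates that loop standalone and feeds it a fake socket, a test-only stand-in for `WebSocket`:

```python
import asyncio

class FakeSocket:
    """Test double mimicking the send_json surface of a WebSocket."""
    def __init__(self, alive: bool = True):
        self.alive = alive
        self.received: list[dict] = []

    async def send_json(self, event: dict):
        if not self.alive:
            raise RuntimeError("connection closed")
        self.received.append(event)

async def broadcast(connections: list, event: dict) -> list:
    """Same pruning logic as DashboardBroadcaster.broadcast."""
    dead = []
    for ws in connections:
        try:
            await ws.send_json(event)
        except Exception:
            dead.append(ws)
    for ws in dead:
        connections.remove(ws)
    return connections

live, closed = FakeSocket(), FakeSocket(alive=False)
conns = asyncio.run(broadcast([live, closed], {"type": "workflow.completed"}))
# The closed socket is pruned; the live one receives the event.
```

Collecting dead sockets into a separate list before removal matters: mutating `connections` while iterating over it would silently skip entries.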

## FAQ

### What refresh rate should the dashboard use?

For the overview panel, poll every 5-10 seconds. For individual workflow detail views, use WebSocket connections for real-time updates. Avoid polling faster than every 2 seconds: it adds unnecessary database load, and dashboard summary statistics rarely change meaningfully at sub-second granularity.

### Should I build the dashboard frontend in React or use Grafana?

Use Grafana for metrics-heavy dashboards (latency percentiles, throughput charts, cost trends) since it integrates natively with Prometheus and requires no frontend code. Build a custom React or Next.js dashboard for workflow-specific views like timelines, step drill-downs, and action buttons (retry, cancel, pause) that Grafana cannot provide. Many teams use both.

### How do I handle dashboard performance with thousands of workflows?

Implement server-side pagination, filtering, and aggregation. Never load all workflows into the frontend. Use database indexes on status, created_at, and workflow_name columns. For the overview metrics, pre-compute aggregates on a schedule rather than computing them on every request. A materialized view or Redis cache updated every 30 seconds works well for dashboard summary statistics.
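One lightweight way to serve those pre-computed aggregates is a short TTL cache in front of the expensive query. The decorator below is an in-process sketch (for a single worker; use Redis or a materialized view when running multiple app instances) that memoizes an async function's result per argument tuple:

```python
import time
from functools import wraps

def ttl_cache(seconds: float):
    """Cache an async function's result per-arguments for `seconds`."""
    def decorator(fn):
        cache: dict = {}

        @wraps(fn)
        async def wrapper(*args):
            now = time.monotonic()
            hit = cache.get(args)
            if hit is not None and now - hit[1] < seconds:
                return hit[0]  # fresh cached value
            value = await fn(*args)
            cache[args] = (value, now)
            return value
        return wrapper
    return decorator
```

Applying something like `@ttl_cache(30)` to the overview aggregation keeps the dashboard responsive under frequent polling without issuing an aggregate query on every request.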

---

#Dashboard #Visualization #AgentOrchestration #FastAPI #Python #AgenticAI #LearnAI #AIEngineering

---

Source: https://callsphere.ai/blog/building-agent-orchestration-dashboard-visualizing-workflow-status
