---
title: "Building a Real-Time AI Dashboard: Live Metrics, Streaming Logs, and Agent Status"
description: "Build a production-grade real-time dashboard for monitoring AI agents, featuring live metrics pipelines, streaming log aggregation, agent health indicators, and efficient frontend rendering with React."
canonical: https://callsphere.ai/blog/building-real-time-ai-dashboard-live-metrics-streaming-logs-agent-status
category: "Learn Agentic AI"
tags: ["Dashboard", "Real-Time AI", "Monitoring", "React", "Python"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-06T01:02:43.354Z
---

# Building a Real-Time AI Dashboard: Live Metrics, Streaming Logs, and Agent Status

> Build a production-grade real-time dashboard for monitoring AI agents, featuring live metrics pipelines, streaming log aggregation, agent health indicators, and efficient frontend rendering with React.

## Why AI Agents Need Real-Time Dashboards

Monitoring AI agents in production requires more than traditional APM tools. You need to see token throughput, model latency percentiles, tool call success rates, agent reasoning traces, and cost accumulation — all updating in real time. A well-built dashboard transforms a black-box AI system into an observable one where you can spot degradation before users notice.

The architecture follows three layers: a metrics collection backend that aggregates data from running agents, a streaming transport layer that pushes updates to the browser, and a frontend that renders efficiently without choking on high-frequency updates.

## Backend: Metrics Collection and Aggregation

Start by instrumenting your agents to emit structured events. Each event carries a timestamp, agent ID, event type, and a payload with type-specific data.

```mermaid
sequenceDiagram
    autonumber
    participant Client
    participant Edge as Edge Worker
    participant LLM as LLM Provider
    participant DB as Logs and Trace
    Client->>Edge: POST /chat (stream=true)
    Edge->>LLM: messages.create(stream=true)
    loop Each token
        LLM-->>Edge: SSE chunk delta
        Edge-->>Client: SSE chunk delta
        Edge->>DB: append token to span
    end
    LLM-->>Edge: stop_reason=end_turn
    Edge-->>Client: event: done
    Edge->>DB: finalize trace
```

```python
import asyncio
import time
import json
from dataclasses import dataclass, asdict
from collections import defaultdict, deque

@dataclass
class AgentMetricEvent:
    agent_id: str
    event_type: str  # "token", "tool_call", "error", "completion"
    timestamp: float
    payload: dict

class MetricsAggregator:
    def __init__(self, window_seconds: int = 60):
        self.window = window_seconds
        self.events: deque[AgentMetricEvent] = deque()
        self.subscribers: list[asyncio.Queue] = []

    def record(self, event: AgentMetricEvent):
        self.events.append(event)
        self._prune_old_events()

        snapshot = self._compute_snapshot()
        for queue in self.subscribers:
            try:
                queue.put_nowait(snapshot)
            except asyncio.QueueFull:
                pass  # Drop if subscriber is slow

    def _prune_old_events(self):
        cutoff = time.time() - self.window
        while self.events and self.events[0].timestamp < cutoff:
            self.events.popleft()

    def _compute_snapshot(self) -> dict:
        now = time.time()
        recent = [e for e in self.events if e.timestamp > now - self.window]

        tokens = [e for e in recent if e.event_type == "token"]
        tool_calls = [e for e in recent if e.event_type == "tool_call"]
        errors = [e for e in recent if e.event_type == "error"]
        completions = [e for e in recent if e.event_type == "completion"]

        latencies = [
            e.payload.get("latency_ms", 0) for e in completions
        ]
        latencies.sort()

        return {
            "timestamp": now,
            "tokens_per_second": len(tokens) / max(self.window, 1),
            "tool_calls_total": len(tool_calls),
            "error_rate": len(errors) / max(len(recent), 1),
            "completions": len(completions),
            "p50_latency_ms": latencies[len(latencies) // 2] if latencies else 0,
            "p99_latency_ms": latencies[int(len(latencies) * 0.99)] if latencies else 0,
            "active_agents": len(set(e.agent_id for e in recent)),
        }

    def subscribe(self) -> asyncio.Queue:
        queue = asyncio.Queue(maxsize=100)
        self.subscribers.append(queue)
        return queue

    def unsubscribe(self, queue: asyncio.Queue):
        self.subscribers.remove(queue)

aggregator = MetricsAggregator(window_seconds=60)
```

The aggregator uses a sliding window deque for memory efficiency. Old events are pruned on each insertion, keeping memory usage bounded. Subscribers receive computed snapshots rather than raw events, reducing frontend processing load.

## Streaming Transport with SSE

For a monitoring dashboard, SSE is the right transport: the data flows one direction (server to browser), and the browser's `EventSource` API reconnects automatically after a dropped connection.

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def metrics_stream():
    queue = aggregator.subscribe()
    try:
        while True:
            snapshot = await queue.get()
            data = json.dumps(snapshot)
            yield f"event: metrics\ndata: {data}\n\n"
    finally:
        aggregator.unsubscribe(queue)

@app.get("/api/dashboard/stream")
async def dashboard_stream():
    return StreamingResponse(
        metrics_stream(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
```
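The `event:`/`data:` framing is easy to get subtly wrong (a missing blank line merges two frames into one). A small round-trip sketch pins the wire format down; `format_sse` and `parse_sse` are illustrative helpers, not part of the endpoint above:

```python
import json

def format_sse(event: str, data: dict) -> str:
    """Serialize one named SSE frame, terminated by a blank line."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"

def parse_sse(frame: str):
    """Parse a single well-formed frame back into (event, data)."""
    event, data = None, None
    for line in frame.splitlines():
        if line.startswith("event: "):
            event = line[len("event: "):]
        elif line.startswith("data: "):
            data = json.loads(line[len("data: "):])
    return event, data

snapshot = {"tokens_per_second": 12.5, "active_agents": 3}
frame = format_sse("metrics", snapshot)
```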

## Streaming Logs Endpoint

Agent logs need their own stream. Structured log events let the frontend filter and highlight based on severity or agent ID.

```python
from collections import deque

log_buffer: deque[dict] = deque(maxlen=1000)
log_subscribers: list[asyncio.Queue] = []

def emit_agent_log(agent_id: str, level: str, message: str, metadata: dict | None = None):
    entry = {
        "timestamp": time.time(),
        "agent_id": agent_id,
        "level": level,
        "message": message,
        "metadata": metadata or {},
    }
    log_buffer.append(entry)
    for q in log_subscribers:
        try:
            q.put_nowait(entry)
        except asyncio.QueueFull:
            pass

async def log_stream():
    queue = asyncio.Queue(maxsize=200)
    log_subscribers.append(queue)
    try:
        # Send recent history first (copy the buffer: it may be appended to
        # while this generator is suspended at a yield)
        for entry in list(log_buffer):
            yield f"event: log\ndata: {json.dumps(entry)}\n\n"
        # Then stream new entries
        while True:
            entry = await queue.get()
            yield f"event: log\ndata: {json.dumps(entry)}\n\n"
    finally:
        log_subscribers.remove(queue)
```

Sending the recent buffer on connection lets newly opened dashboards see immediate context instead of staring at a blank screen.

## Frontend: Efficient React Rendering

High-frequency updates can overwhelm React if every SSE event triggers a re-render. Batch updates and use `requestAnimationFrame` to align rendering with the browser's paint cycle.

```typescript
import { useState, useEffect, useRef, useCallback } from "react";

interface DashboardMetrics {
  tokens_per_second: number;
  error_rate: number;
  p50_latency_ms: number;
  p99_latency_ms: number;
  active_agents: number;
}

function useMetricsStream(url: string): DashboardMetrics | null {
  const [metrics, setMetrics] = useState<DashboardMetrics | null>(null);
  const latestRef = useRef<DashboardMetrics | null>(null);
  const rafRef = useRef(0);

  const scheduleUpdate = useCallback(() => {
    if (rafRef.current) return;
    rafRef.current = requestAnimationFrame(() => {
      rafRef.current = 0;
      if (latestRef.current) {
        setMetrics({ ...latestRef.current });
      }
    });
  }, []);

  useEffect(() => {
    const source = new EventSource(url);
    source.addEventListener("metrics", (event) => {
      latestRef.current = JSON.parse((event as MessageEvent).data);
      scheduleUpdate();
    });
    return () => {
      source.close();
      if (rafRef.current) cancelAnimationFrame(rafRef.current);
    };
  }, [url, scheduleUpdate]);

  return metrics;
}
```

This hook stores the latest event in a ref (no re-render) and schedules a single state update per animation frame. Even if the server sends 30 events per second, React only re-renders at the display refresh rate.

## FAQ

### How do you keep the dashboard responsive when hundreds of agents are producing metrics?

Use server-side aggregation to pre-compute summary statistics rather than pushing raw events to the browser. The `MetricsAggregator` pattern shown above computes totals and percentiles server-side, so the browser receives one compact snapshot per update regardless of how many agents are running. For drill-down views, let the user select specific agents and open filtered streams that only include events from those agents.

### What happens if the metrics aggregator crashes and loses in-memory data?

For production systems, persist metrics to a time-series database like TimescaleDB or InfluxDB alongside the in-memory aggregator. The in-memory layer serves real-time streaming, while the database provides historical data for trend analysis and post-incident investigation. On restart, the aggregator begins with an empty window and fills naturally within one window period (typically 60 seconds).
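As a sketch of that persistence layer, each snapshot maps naturally onto one row per interval. Here the stdlib `sqlite3` module stands in for a real time-series database, and the `metrics_snapshots` table name is an assumption:

```python
import sqlite3

def persist_snapshot(conn: sqlite3.Connection, snapshot: dict) -> None:
    """Write one aggregator snapshot as a row in a (stand-in) metrics table."""
    conn.execute(
        """CREATE TABLE IF NOT EXISTS metrics_snapshots (
               timestamp REAL, tokens_per_second REAL, error_rate REAL,
               p50_latency_ms REAL, p99_latency_ms REAL, active_agents INTEGER)""")
    conn.execute(
        "INSERT INTO metrics_snapshots VALUES (?, ?, ?, ?, ?, ?)",
        (snapshot["timestamp"], snapshot["tokens_per_second"],
         snapshot["error_rate"], snapshot["p50_latency_ms"],
         snapshot["p99_latency_ms"], snapshot["active_agents"]))
    conn.commit()

conn = sqlite3.connect(":memory:")
persist_snapshot(conn, {"timestamp": 1700000000.0, "tokens_per_second": 42.0,
                        "error_rate": 0.01, "p50_latency_ms": 620.0,
                        "p99_latency_ms": 1900.0, "active_agents": 5})
row_count = conn.execute("SELECT COUNT(*) FROM metrics_snapshots").fetchone()[0]
```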

### How do you test a real-time dashboard during development without running actual AI agents?

Build a metrics simulator that generates realistic event patterns — bursts of token events, periodic tool calls, occasional errors, and varying latency distributions. Run the simulator as a script that calls the same `aggregator.record()` method your real agents use. This lets you test the full pipeline including edge cases like error rate spikes and latency degradation without consuming API credits.
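A minimal version of such a simulator might look like the following. `simulate_events` is a hypothetical helper that feeds any `record` callable, so in the real pipeline you would pass `aggregator.record` (here a plain list collects dict events for illustration; wrap them in `AgentMetricEvent` for the real aggregator):

```python
import random
import time

def simulate_events(record, agents: int = 3, ticks: int = 10, seed: int = 42):
    """Feed synthetic agent events into `record`, one batch per simulated second."""
    rng = random.Random(seed)  # Seeded so runs are reproducible
    now = time.time()
    for tick in range(ticks):
        ts = now + tick
        for i in range(agents):
            agent_id = f"sim-agent-{i}"
            # Burst of token events
            for _ in range(rng.randint(5, 20)):
                record({"agent_id": agent_id, "event_type": "token",
                        "timestamp": ts, "payload": {}})
            # Occasional tool call, rare error
            if rng.random() < 0.3:
                record({"agent_id": agent_id, "event_type": "tool_call",
                        "timestamp": ts, "payload": {"tool": "search"}})
            if rng.random() < 0.05:
                record({"agent_id": agent_id, "event_type": "error",
                        "timestamp": ts, "payload": {"message": "simulated failure"}})
            # One completion per tick with noisy latency
            record({"agent_id": agent_id, "event_type": "completion",
                    "timestamp": ts,
                    "payload": {"latency_ms": max(50.0, rng.gauss(800, 200))}})

events = []
simulate_events(events.append)
```

Varying the seed, error probability, and latency distribution lets you rehearse the exact failure modes (error spikes, p99 degradation) the dashboard exists to surface.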

---

#Dashboard #RealTimeAI #Monitoring #React #Python #AgenticAI #LearnAI #AIEngineering

