---
title: "Production Monitoring and Alerting for AI Agent Systems"
description: "Learn how to build production monitoring and alerting for AI agent systems including latency tracking, error rate dashboards, token usage analytics, alerting pipelines, and SLA enforcement."
canonical: https://callsphere.ai/blog/production-monitoring-alerting-ai-agent-systems
category: "Learn Agentic AI"
tags: ["OpenAI", "Monitoring", "Alerting", "Production", "SRE"]
author: "CallSphere Team"
published: 2026-03-14T00:00:00.000Z
updated: 2026-05-06T01:02:41.608Z
---

# Production Monitoring and Alerting for AI Agent Systems

> Learn how to build production monitoring and alerting for AI agent systems including latency tracking, error rate dashboards, token usage analytics, alerting pipelines, and SLA enforcement.

## Why Agent Systems Need Specialized Monitoring

Traditional API monitoring tracks request latency, error rates, and throughput. Agent systems demand all of that plus dimensions that do not exist in conventional backends: token consumption per request, LLM provider availability, tool execution success rates, and multi-agent handoff reliability.

An agent that responds successfully but consumes 50,000 tokens per request will bankrupt your LLM budget before your uptime dashboard shows a single red indicator. A tool that silently returns stale data will produce confident but wrong agent responses without triggering any error-rate alert. Production monitoring for agents requires purpose-built instrumentation.

## Core Metrics to Track

Every agent monitoring system should capture four categories of metrics, instrumented at the points shown in this agent loop:

```mermaid
flowchart LR
    INPUT(["User intent"])
    PARSE["Parse and
classify"]
    PLAN["Plan and tool
selection"]
    AGENT["Agent loop
LLM and tools"]
    GUARD{"Guardrails
and policy"}
    EXEC["Execute and
verify result"]
    OBS[("Trace and metrics")]
    OUT(["Outcome and
next action"])
    INPUT --> PARSE --> PLAN --> AGENT --> GUARD
    GUARD -->|Pass| EXEC --> OUT
    GUARD -->|Fail| AGENT
    AGENT --> OBS
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style OBS fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
```

**Latency Metrics** — Total end-to-end response time, LLM generation latency per call, tool execution latency per tool, and time-to-first-token for streaming responses.

**Error Metrics** — LLM API error rate (rate limits, timeouts, server errors), tool execution failure rate, agent loop terminations (max_turns exceeded), and guardrail violations.

**Cost Metrics** — Input and output tokens per request, total tokens per workflow, cost per request mapped to model pricing, and cumulative daily spend.

**Quality Metrics** — Guardrail trigger rate, conversation length before resolution, and tool retry rate.
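For illustration, these categories can be collapsed into a single per-request record. The `RequestMetrics` type below is a hypothetical shape, not part of any SDK; production systems typically emit each field as its own histogram or counter instead:

```python
from dataclasses import dataclass

@dataclass
class RequestMetrics:
    """One illustrative field from each metric category."""
    workflow: str
    total_latency_s: float   # latency: end-to-end response time
    llm_errors: int          # errors: failed LLM calls in this request
    input_tokens: int        # cost: prompt tokens
    output_tokens: int       # cost: completion tokens
    guardrail_triggers: int  # quality: guardrail activations

    @property
    def total_tokens(self) -> int:
        return self.input_tokens + self.output_tokens
```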

## Building a Metrics Collection Processor

The foundation is a trace processor that extracts metrics from every agent run and sends them to your metrics backend:

```python
import time
from agents.tracing import TracingProcessor, Trace, Span

class MetricsCollector(TracingProcessor):
    def __init__(self, metrics_client):
        self.metrics = metrics_client
        self._trace_start_times = {}

    def on_trace_start(self, trace: Trace) -> None:
        self._trace_start_times[trace.trace_id] = time.monotonic()

    def on_span_end(self, span: Span) -> None:
        duration_s = (span.end_time - span.start_time).total_seconds()
        labels = {
            "workflow": span.trace_name or "unknown",
            "span_type": span.span_type,
            "span_name": span.name,
        }

        # Latency histogram
        self.metrics.histogram(
            "agent.span.duration_seconds",
            duration_s,
            labels=labels,
        )

        if span.span_type == "generation":
            model = span.data.get("model", "unknown") if span.data else "unknown"
            input_tokens = span.data.get("input_tokens", 0) if span.data else 0
            output_tokens = span.data.get("output_tokens", 0) if span.data else 0

            self.metrics.histogram(
                "agent.llm.duration_seconds",
                duration_s,
                labels={**labels, "model": model},
            )
            self.metrics.counter(
                "agent.tokens.input_total",
                input_tokens,
                labels={"model": model, "workflow": labels["workflow"]},
            )
            self.metrics.counter(
                "agent.tokens.output_total",
                output_tokens,
                labels={"model": model, "workflow": labels["workflow"]},
            )

        elif span.span_type == "function":
            self.metrics.histogram(
                "agent.tool.duration_seconds",
                duration_s,
                labels={"tool": span.name, "workflow": labels["workflow"]},
            )
            # Track tool errors
            if span.data and span.data.get("error"):
                self.metrics.counter(
                    "agent.tool.errors_total",
                    1,
                    labels={"tool": span.name, "workflow": labels["workflow"]},
                )

    def on_trace_end(self, trace: Trace) -> None:
        start = self._trace_start_times.pop(trace.trace_id, None)
        if start:
            total_duration = time.monotonic() - start
            self.metrics.histogram(
                "agent.workflow.duration_seconds",
                total_duration,
                labels={"workflow": trace.name or "unknown"},
            )
        self.metrics.counter(
            "agent.workflow.completions_total",
            1,
            labels={"workflow": trace.name or "unknown"},
        )

    def on_span_start(self, span: Span) -> None:
        pass  # no-op: metrics are recorded when spans end

    async def shutdown(self) -> None:
        pass
```
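The `metrics_client` passed to `MetricsCollector` only needs `histogram` and `counter` methods that accept a value and a label dict. A minimal in-memory implementation, assumed here purely for local testing rather than as a real backend, might look like:

```python
from collections import defaultdict

class InMemoryMetricsClient:
    """Test stand-in for a metrics backend: records histogram
    observations and counter increments keyed by metric name plus a
    frozen label set."""

    def __init__(self):
        self.histograms = defaultdict(list)
        self.counters = defaultdict(float)

    @staticmethod
    def _key(name, labels):
        # Freeze labels into a hashable, order-independent key
        return (name, tuple(sorted((labels or {}).items())))

    def histogram(self, name, value, labels=None):
        self.histograms[self._key(name, labels)].append(value)

    def counter(self, name, value, labels=None):
        self.counters[self._key(name, labels)] += value
```

In production you would swap in a statsd, Datadog, or Prometheus client exposing the same two-method interface.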

## Prometheus Integration

For teams using Prometheus and Grafana, here is a concrete integration using the official Python client:

```python
from prometheus_client import Histogram, Counter, Gauge

# Define Prometheus metrics
WORKFLOW_DURATION = Histogram(
    "agent_workflow_duration_seconds",
    "End-to-end agent workflow duration",
    ["workflow"],
    buckets=[0.5, 1, 2, 5, 10, 30, 60, 120],
)

LLM_DURATION = Histogram(
    "agent_llm_call_duration_seconds",
    "Individual LLM call duration",
    ["model", "workflow"],
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30],
)

TOKEN_USAGE = Counter(
    "agent_tokens_total",
    "Total tokens consumed",
    ["model", "direction", "workflow"],
)

TOOL_DURATION = Histogram(
    "agent_tool_duration_seconds",
    "Tool execution duration",
    ["tool", "workflow"],
    buckets=[0.01, 0.05, 0.1, 0.5, 1, 5, 10],
)

TOOL_ERRORS = Counter(
    "agent_tool_errors_total",
    "Tool execution failures",
    ["tool", "workflow"],
)

ACTIVE_WORKFLOWS = Gauge(
    "agent_active_workflows",
    "Currently running agent workflows",
    ["workflow"],
)
```

Wire these into the trace processor:

```python
class PrometheusTraceProcessor(TracingProcessor):
    def on_trace_start(self, trace: Trace) -> None:
        ACTIVE_WORKFLOWS.labels(workflow=trace.name or "unknown").inc()

    def on_span_end(self, span: Span) -> None:
        duration = (span.end_time - span.start_time).total_seconds()
        workflow = span.trace_name or "unknown"

        if span.span_type == "generation":
            model = span.data.get("model", "unknown") if span.data else "unknown"
            LLM_DURATION.labels(model=model, workflow=workflow).observe(duration)

            input_tokens = span.data.get("input_tokens", 0) if span.data else 0
            output_tokens = span.data.get("output_tokens", 0) if span.data else 0
            TOKEN_USAGE.labels(model=model, direction="input", workflow=workflow).inc(input_tokens)
            TOKEN_USAGE.labels(model=model, direction="output", workflow=workflow).inc(output_tokens)

        elif span.span_type == "function":
            TOOL_DURATION.labels(tool=span.name, workflow=workflow).observe(duration)
            if span.data and span.data.get("error"):
                TOOL_ERRORS.labels(tool=span.name, workflow=workflow).inc()

    def on_trace_end(self, trace: Trace) -> None:
        workflow = trace.name or "unknown"
        ACTIVE_WORKFLOWS.labels(workflow=workflow).dec()
        total = (trace.end_time - trace.start_time).total_seconds()
        WORKFLOW_DURATION.labels(workflow=workflow).observe(total)

    def on_span_start(self, span: Span) -> None:
        pass  # no-op: metrics are recorded when spans end

    async def shutdown(self) -> None:
        pass
```

## Alerting Rules

Metrics without alerts are dashboards nobody watches. Here are essential alerting rules for agent systems:

```yaml
# Prometheus alerting rules (alerts.yml)
groups:
  - name: agent_alerts
    rules:
      # High latency alert
      - alert: AgentWorkflowSlowResponse
        expr: |
          histogram_quantile(0.95,
            rate(agent_workflow_duration_seconds_bucket[5m])
          ) > 30
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Agent workflow p95 latency exceeds 30 seconds"
          description: "Workflow {{ $labels.workflow }} p95 latency is {{ $value }}s"

      # LLM API error rate (assumes an agent_llm_errors_total counter
      # incremented wherever LLM call failures are caught; calls are
      # counted via the LLM duration histogram's _count series)
      - alert: AgentLLMHighErrorRate
        expr: |
          rate(agent_llm_errors_total[5m])
          / rate(agent_llm_call_duration_seconds_count[5m]) > 0.05
        for: 3m
        labels:
          severity: critical
        annotations:
          summary: "LLM error rate exceeds 5%"

      # Token budget alert
      - alert: AgentTokenBudgetExceeded
        expr: |
          sum(increase(agent_tokens_total[1h])) > 1000000
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "Agent token consumption exceeds 1M tokens per hour"

      # Tool failure rate
      - alert: AgentToolHighFailureRate
        expr: |
          rate(agent_tool_errors_total[5m])
          / rate(agent_tool_duration_seconds_count[5m]) > 0.1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Tool {{ $labels.tool }} failure rate exceeds 10%"

      # Stuck workflows (meaningful only when the gauge normally returns
      # to zero between requests; a continuously busy system needs a
      # per-workflow age signal instead)
      - alert: AgentWorkflowStuck
        expr: agent_active_workflows > 0
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "Agent workflows stuck for over 10 minutes"
```
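The `histogram_quantile` function in the first rule estimates a quantile by linear interpolation within the bucket containing the target rank. A simplified pure-Python version of the same calculation, handy for sanity-checking bucket boundaries offline (it ignores some PromQL edge cases such as negative bucket bounds):

```python
def histogram_quantile(q, buckets):
    """Estimate the q-quantile from Prometheus-style cumulative buckets.

    `buckets` is a sorted list of (upper_bound, cumulative_count) pairs
    ending with (float('inf'), total), mirroring _bucket series.
    """
    total = buckets[-1][1]
    if total == 0:
        return float("nan")
    rank = q * total
    prev_bound, prev_count = 0.0, 0.0
    for bound, count in buckets:
        if count >= rank:
            if bound == float("inf"):
                # Rank falls in the +Inf bucket: return last finite bound
                return prev_bound
            # Linear interpolation inside the target bucket
            width = count - prev_count
            fraction = (rank - prev_count) / width if width > 0 else 0.0
            return prev_bound + (bound - prev_bound) * fraction
        prev_bound, prev_count = bound, count
    return prev_bound
```

Run it against your expected latency distribution before committing to bucket boundaries; buckets that are too coarse make the interpolated p95 misleading.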

## Token Cost Tracking

Token usage directly translates to cost. Build a cost tracking layer on top of your token metrics:

```python
MODEL_PRICING = {
    "gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
    "gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
    "gpt-4.1": {"input": 2.00 / 1_000_000, "output": 8.00 / 1_000_000},
    "gpt-4.1-mini": {"input": 0.40 / 1_000_000, "output": 1.60 / 1_000_000},
    "gpt-4.1-nano": {"input": 0.10 / 1_000_000, "output": 0.40 / 1_000_000},
}

class CostTracker(TracingProcessor):
    def __init__(self, metrics_client):
        self.metrics = metrics_client

    def on_span_end(self, span: Span) -> None:
        if span.span_type != "generation" or not span.data:
            return

        model = span.data.get("model", "")
        pricing = MODEL_PRICING.get(model)
        if not pricing:
            return

        input_tokens = span.data.get("input_tokens", 0)
        output_tokens = span.data.get("output_tokens", 0)

        input_cost = input_tokens * pricing["input"]
        output_cost = output_tokens * pricing["output"]
        total_cost = input_cost + output_cost

        self.metrics.counter(
            "agent.cost.dollars_total",
            total_cost,
            labels={
                "model": model,
                "workflow": span.trace_name or "unknown",
                "cost_type": "total",
            },
        )

    def on_trace_start(self, trace: Trace) -> None:
        pass  # no-op: cost is attributed per generation span

    def on_span_start(self, span: Span) -> None:
        pass

    def on_trace_end(self, trace: Trace) -> None:
        pass

    async def shutdown(self) -> None:
        pass
```

With this processor running, you can set budget alerts: "Alert me when daily spend exceeds $50" or "Alert when any single workflow costs more than $0.50 per execution."
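A minimal sketch of that first budget check, assuming a hypothetical `on_breach` callback wired to your paging system; the cost processor above would call `record` with each request's computed cost:

```python
from datetime import date

class DailyBudgetAlert:
    """Accumulates per-day spend and fires the callback once when the
    daily budget is crossed; the accumulator resets on date change."""

    def __init__(self, daily_budget_dollars, on_breach):
        self.budget = daily_budget_dollars
        self.on_breach = on_breach
        self._day = date.today()
        self._spend = 0.0
        self._fired = False

    def record(self, cost_dollars, today=None):
        today = today or date.today()
        if today != self._day:  # new day: reset the accumulator
            self._day, self._spend, self._fired = today, 0.0, False
        self._spend += cost_dollars
        if self._spend > self.budget and not self._fired:
            self._fired = True  # alert at most once per day
            self.on_breach(self._day, self._spend)
```

Per-workflow budgets follow the same pattern with one accumulator per workflow name instead of per day.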

## SLA Enforcement

Define SLAs for your agent system and enforce them programmatically:

```python
from dataclasses import dataclass

@dataclass
class WorkflowSLA:
    max_latency_seconds: float
    max_tokens_per_request: int

WORKFLOW_SLAS = {
    "customer-support": WorkflowSLA(max_latency_seconds=15.0, max_tokens_per_request=8000),
    "document-analysis": WorkflowSLA(max_latency_seconds=60.0, max_tokens_per_request=50000),
}

class SLAEnforcementProcessor(TracingProcessor):
    def __init__(self, alert_service):
        self.alert = alert_service
        self._trace_tokens = {}

    def on_trace_start(self, trace: Trace) -> None:
        self._trace_tokens[trace.trace_id] = 0

    def on_span_end(self, span: Span) -> None:
        if span.span_type == "generation" and span.data:
            self._trace_tokens[span.trace_id] = self._trace_tokens.get(span.trace_id, 0) + (
                span.data.get("input_tokens", 0) + span.data.get("output_tokens", 0)
            )

    def on_trace_end(self, trace: Trace) -> None:
        sla = WORKFLOW_SLAS.get(trace.name or "")
        tokens = self._trace_tokens.pop(trace.trace_id, 0)
        if not sla:
            return
        duration = (trace.end_time - trace.start_time).total_seconds()
        if duration > sla.max_latency_seconds:
            self.alert.send(severity="warning", message=f"SLA breach: {trace.name} {duration:.1f}s > {sla.max_latency_seconds}s")
        if tokens > sla.max_tokens_per_request:
            self.alert.send(severity="warning", message=f"SLA breach: {trace.name} {tokens} tokens > {sla.max_tokens_per_request}")

    def on_span_start(self, span: Span) -> None:
        pass  # no-op: token totals are accumulated when spans end

    async def shutdown(self) -> None:
        pass
```
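The `alert_service` above is assumed to expose a single `send(severity=..., message=...)` method. A minimal test double that just collects alerts instead of paging anyone could be:

```python
class CollectingAlertService:
    """Test double for the assumed alert_service dependency: records
    (severity, message) pairs for later assertions."""

    def __init__(self):
        self.alerts = []

    def send(self, severity: str, message: str) -> None:
        self.alerts.append((severity, message))
```

Swapping this in during integration tests lets you assert that an intentionally slow workflow actually produces an SLA-breach alert.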

## Putting It All Together

Register your full monitoring stack at application startup:

```python
from agents import add_trace_processor
from prometheus_client import start_http_server

# Start Prometheus metrics endpoint
start_http_server(8001)

# Register all monitoring processors. `prometheus_metrics` and
# `pagerduty_client` are placeholders for your metrics and paging clients.
add_trace_processor(PrometheusTraceProcessor())
add_trace_processor(CostTracker(prometheus_metrics))
add_trace_processor(SLAEnforcementProcessor(pagerduty_client))
```

Production monitoring for AI agents is not an extension of traditional APM — it is a distinct discipline that accounts for the nondeterministic, token-consuming, multi-step nature of agentic workflows. Build your monitoring stack before your first production deployment, not after your first incident.

