
Production Monitoring and Alerting for AI Agent Systems

Learn how to build production monitoring and alerting for AI agent systems including latency tracking, error rate dashboards, token usage analytics, alerting pipelines, and SLA enforcement.

Why Agent Systems Need Specialized Monitoring

Traditional API monitoring tracks request latency, error rates, and throughput. Agent systems demand all of that plus dimensions that do not exist in conventional backends: token consumption per request, LLM provider availability, tool execution success rates, and multi-agent handoff reliability.

An agent that responds successfully but consumes 50,000 tokens per request will bankrupt your LLM budget before your uptime dashboard shows a single red indicator. A tool that silently returns stale data will produce confident but wrong agent responses without triggering any error-rate alert. Production monitoring for agents requires purpose-built instrumentation.
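To make the budget risk concrete, here is the back-of-the-envelope arithmetic for that 50,000-token agent. The per-million-token price and the request volume are illustrative assumptions, not figures from any provider's price list:

```python
# Hypothetical pricing: $2.50 per million input tokens.
price_per_input_token = 2.50 / 1_000_000  # dollars
tokens_per_request = 50_000
requests_per_day = 10_000  # assumed traffic

cost_per_request = tokens_per_request * price_per_input_token
daily_cost = cost_per_request * requests_per_day

print(f"${cost_per_request:.3f} per request")  # $0.125 per request
print(f"${daily_cost:,.0f} per day")           # $1,250 per day
```

At that rate the agent spends four figures a day while every uptime check stays green, which is exactly why cost belongs next to latency on the dashboard.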

Core Metrics to Track

Every agent monitoring system should capture these categories:


Latency Metrics — Total end-to-end response time, LLM generation latency per call, tool execution latency per tool, and time-to-first-token for streaming responses.

Error Metrics — LLM API error rate (rate limits, timeouts, server errors), tool execution failure rate, agent loop terminations (max_turns exceeded), and guardrail violations.

Cost Metrics — Input and output tokens per request, total tokens per workflow, cost per request mapped to model pricing, and cumulative daily spend.

Quality Metrics — Guardrail trigger rate, conversation length before resolution, and tool retry rate.
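Most of these metrics fall out of trace processors, which the following sections build. Time-to-first-token for streaming responses is the exception: it has to be measured at the call site. A minimal sketch, assuming nothing about the streaming client beyond it yielding chunks (`TTFTRecorder` is an illustrative name, not an SDK class):

```python
import time

class TTFTRecorder:
    """Wraps any iterable of streamed chunks and records the delay
    between the wrap call and the first yielded chunk."""

    def __init__(self):
        self.ttft_seconds = None

    def wrap(self, stream):
        start = time.monotonic()
        for chunk in stream:
            if self.ttft_seconds is None:
                self.ttft_seconds = time.monotonic() - start
            yield chunk

recorder = TTFTRecorder()
chunks = list(recorder.wrap(["Hel", "lo", " world"]))
# recorder.ttft_seconds now holds the time-to-first-token; in
# production, observe it into a histogram instead of reading it here.
```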


Building a Metrics Collection Processor

The foundation is a trace processor that extracts metrics from every agent run and sends them to your metrics backend:

import time
from agents.tracing import TracingProcessor, Trace, Span

class MetricsCollector(TracingProcessor):
    def __init__(self, metrics_client):
        self.metrics = metrics_client
        self._trace_start_times = {}

    def on_trace_start(self, trace: Trace) -> None:
        self._trace_start_times[trace.trace_id] = time.monotonic()

    def on_span_end(self, span: Span) -> None:
        duration_s = (span.end_time - span.start_time).total_seconds()
        labels = {
            "workflow": span.trace_name or "unknown",
            "span_type": span.span_type,
            "span_name": span.name,
        }

        # Latency histogram
        self.metrics.histogram(
            "agent.span.duration_seconds",
            duration_s,
            labels=labels,
        )

        if span.span_type == "generation":
            model = span.data.get("model", "unknown") if span.data else "unknown"
            input_tokens = span.data.get("input_tokens", 0) if span.data else 0
            output_tokens = span.data.get("output_tokens", 0) if span.data else 0

            self.metrics.histogram(
                "agent.llm.duration_seconds",
                duration_s,
                labels={**labels, "model": model},
            )
            self.metrics.counter(
                "agent.tokens.input_total",
                input_tokens,
                labels={"model": model, "workflow": labels["workflow"]},
            )
            self.metrics.counter(
                "agent.tokens.output_total",
                output_tokens,
                labels={"model": model, "workflow": labels["workflow"]},
            )

        elif span.span_type == "function":
            self.metrics.histogram(
                "agent.tool.duration_seconds",
                duration_s,
                labels={"tool": span.name, "workflow": labels["workflow"]},
            )
            # Track tool errors
            if span.data and span.data.get("error"):
                self.metrics.counter(
                    "agent.tool.errors_total",
                    1,
                    labels={"tool": span.name, "workflow": labels["workflow"]},
                )

    def on_trace_end(self, trace: Trace) -> None:
        start = self._trace_start_times.pop(trace.trace_id, None)
        if start:
            total_duration = time.monotonic() - start
            self.metrics.histogram(
                "agent.workflow.duration_seconds",
                total_duration,
                labels={"workflow": trace.name or "unknown"},
            )
        self.metrics.counter(
            "agent.workflow.completions_total",
            1,
            labels={"workflow": trace.name or "unknown"},
        )

    def on_span_start(self, span: Span) -> None:
        pass

    def force_flush(self) -> None:
        pass

    def shutdown(self) -> None:
        pass
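The `metrics_client` passed in above is deliberately duck-typed: anything with `histogram` and `counter` methods works. For local testing, a throwaway in-memory stand-in (a sketch, not a production client) is enough to see what the processor emits:

```python
from collections import defaultdict

class InMemoryMetricsClient:
    """Minimal stand-in for the duck-typed metrics_client used above:
    stores histogram observations and counter totals keyed by metric
    name plus sorted label pairs."""

    def __init__(self):
        self.histograms = defaultdict(list)
        self.counters = defaultdict(float)

    def _key(self, name, labels):
        return (name, tuple(sorted((labels or {}).items())))

    def histogram(self, name, value, labels=None):
        self.histograms[self._key(name, labels)].append(value)

    def counter(self, name, value, labels=None):
        self.counters[self._key(name, labels)] += value

client = InMemoryMetricsClient()
client.counter("agent.tokens.input_total", 1200, labels={"model": "gpt-4o"})
client.counter("agent.tokens.input_total", 800, labels={"model": "gpt-4o"})
client.histogram("agent.span.duration_seconds", 0.42,
                 labels={"span_type": "generation"})
```

In production you would swap this for a StatsD, Datadog, or Prometheus-backed client with the same two methods.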

Prometheus Integration

For teams using Prometheus and Grafana, here is a concrete integration using the official Python client:

from prometheus_client import Histogram, Counter, Gauge

# Define Prometheus metrics
WORKFLOW_DURATION = Histogram(
    "agent_workflow_duration_seconds",
    "End-to-end agent workflow duration",
    ["workflow"],
    buckets=[0.5, 1, 2, 5, 10, 30, 60, 120],
)

LLM_DURATION = Histogram(
    "agent_llm_call_duration_seconds",
    "Individual LLM call duration",
    ["model", "workflow"],
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30],
)

TOKEN_USAGE = Counter(
    "agent_tokens_total",
    "Total tokens consumed",
    ["model", "direction", "workflow"],
)

TOOL_DURATION = Histogram(
    "agent_tool_duration_seconds",
    "Tool execution duration",
    ["tool", "workflow"],
    buckets=[0.01, 0.05, 0.1, 0.5, 1, 5, 10],
)

TOOL_ERRORS = Counter(
    "agent_tool_errors_total",
    "Tool execution failures",
    ["tool", "workflow"],
)

ACTIVE_WORKFLOWS = Gauge(
    "agent_active_workflows",
    "Currently running agent workflows",
    ["workflow"],
)

Wire these into the trace processor:

class PrometheusTraceProcessor(TracingProcessor):
    def __init__(self):
        # Track trace start times ourselves, keyed by trace_id,
        # mirroring the MetricsCollector above.
        self._trace_start_times = {}

    def on_trace_start(self, trace: Trace) -> None:
        self._trace_start_times[trace.trace_id] = time.monotonic()
        ACTIVE_WORKFLOWS.labels(workflow=trace.name or "unknown").inc()

    def on_span_start(self, span: Span) -> None:
        pass

    def on_span_end(self, span: Span) -> None:
        duration = (span.end_time - span.start_time).total_seconds()
        workflow = span.trace_name or "unknown"

        if span.span_type == "generation":
            model = span.data.get("model", "unknown") if span.data else "unknown"
            LLM_DURATION.labels(model=model, workflow=workflow).observe(duration)

            input_tokens = span.data.get("input_tokens", 0) if span.data else 0
            output_tokens = span.data.get("output_tokens", 0) if span.data else 0
            TOKEN_USAGE.labels(model=model, direction="input", workflow=workflow).inc(input_tokens)
            TOKEN_USAGE.labels(model=model, direction="output", workflow=workflow).inc(output_tokens)

        elif span.span_type == "function":
            TOOL_DURATION.labels(tool=span.name, workflow=workflow).observe(duration)
            if span.data and span.data.get("error"):
                TOOL_ERRORS.labels(tool=span.name, workflow=workflow).inc()

    def on_trace_end(self, trace: Trace) -> None:
        workflow = trace.name or "unknown"
        ACTIVE_WORKFLOWS.labels(workflow=workflow).dec()
        start = self._trace_start_times.pop(trace.trace_id, None)
        if start is not None:
            WORKFLOW_DURATION.labels(workflow=workflow).observe(time.monotonic() - start)

    def force_flush(self) -> None:
        pass

    def shutdown(self) -> None:
        pass

Alerting Rules

Metrics without alerts are dashboards nobody watches. Here are essential alerting rules for agent systems:

# Prometheus alerting rules (alerts.yml)
groups:
  - name: agent_alerts
    rules:
      # High latency alert
      - alert: AgentWorkflowSlowResponse
        expr: |
          histogram_quantile(0.95,
            rate(agent_workflow_duration_seconds_bucket[5m])
          ) > 30
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Agent workflow p95 latency exceeds 30 seconds"
          description: "Workflow {{ $labels.workflow }} p95 latency is {{ $value }}s"

      # LLM API error rate. Note: agent_llm_errors_total and
      # agent_llm_calls_total are not emitted by the snippets above;
      # instrument them wherever you handle generation-span results.
      - alert: AgentLLMHighErrorRate
        expr: |
          rate(agent_llm_errors_total[5m])
          / rate(agent_llm_calls_total[5m]) > 0.05
        for: 3m
        labels:
          severity: critical
        annotations:
          summary: "LLM error rate exceeds 5%"

      # Token budget alert
      - alert: AgentTokenBudgetExceeded
        expr: |
          sum(increase(agent_tokens_total[1h])) > 1000000
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "Agent token consumption exceeds 1M tokens per hour"

      # Tool failure rate
      - alert: AgentToolHighFailureRate
        expr: |
          rate(agent_tool_errors_total[5m])
          / rate(agent_tool_duration_seconds_count[5m]) > 0.1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Tool {{ $labels.tool }} failure rate exceeds 10%"

      # Stuck workflows. A bare "agent_active_workflows > 0" would
      # fire under any sustained traffic, so also require that no
      # workflow for the same labels has completed in the window.
      - alert: AgentWorkflowStuck
        expr: |
          agent_active_workflows > 0
          and rate(agent_workflow_duration_seconds_count[10m]) == 0
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "Agent workflows stuck for over 10 minutes"

Token Cost Tracking

Token usage directly translates to cost. Build a cost tracking layer on top of your token metrics:

MODEL_PRICING = {
    "gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
    "gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
    "gpt-4.1": {"input": 2.00 / 1_000_000, "output": 8.00 / 1_000_000},
    "gpt-4.1-mini": {"input": 0.40 / 1_000_000, "output": 1.60 / 1_000_000},
    "gpt-4.1-nano": {"input": 0.10 / 1_000_000, "output": 0.40 / 1_000_000},
}

class CostTracker(TracingProcessor):
    def __init__(self, metrics_client):
        self.metrics = metrics_client

    def on_span_end(self, span: Span) -> None:
        if span.span_type != "generation" or not span.data:
            return

        model = span.data.get("model", "")
        pricing = MODEL_PRICING.get(model)
        if not pricing:
            return

        input_tokens = span.data.get("input_tokens", 0)
        output_tokens = span.data.get("output_tokens", 0)

        input_cost = input_tokens * pricing["input"]
        output_cost = output_tokens * pricing["output"]
        total_cost = input_cost + output_cost

        self.metrics.counter(
            "agent.cost.dollars_total",
            total_cost,
            labels={
                "model": model,
                "workflow": span.trace_name or "unknown",
                "cost_type": "total",
            },
        )

    def on_trace_start(self, trace: Trace) -> None:
        pass

    def on_span_start(self, span: Span) -> None:
        pass

    def on_trace_end(self, trace: Trace) -> None:
        pass

    def force_flush(self) -> None:
        pass

    def shutdown(self) -> None:
        pass

With this processor running, you can set budget alerts: "Alert me when daily spend exceeds $50" or "Alert when any single workflow costs more than $0.50 per execution."
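A daily-spend threshold like the $50 example can be sketched as a small guard that the cost processor feeds. `BudgetGuard`, the threshold, and the per-request cost below are all illustrative:

```python
class BudgetGuard:
    """Accumulates spend and fires an alert callback once per day
    when the budget is crossed. Illustrative sketch; wire alert_fn
    to your paging service and call reset_day() on a schedule."""

    def __init__(self, daily_budget_dollars, alert_fn):
        self.daily_budget = daily_budget_dollars
        self.alert_fn = alert_fn
        self.spend_today = 0.0
        self._alerted = False

    def record(self, cost_dollars):
        self.spend_today += cost_dollars
        if self.spend_today > self.daily_budget and not self._alerted:
            self._alerted = True
            self.alert_fn(
                f"Daily budget exceeded: ${self.spend_today:.2f} "
                f"> ${self.daily_budget:.2f}"
            )

    def reset_day(self):
        self.spend_today = 0.0
        self._alerted = False

alerts = []
guard = BudgetGuard(daily_budget_dollars=50.0, alert_fn=alerts.append)
for _ in range(400):
    guard.record(0.13)  # 400 requests at $0.13 each crosses $50 once
```

The once-per-day flag matters: without it, every request after the threshold would page someone.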

SLA Enforcement

Define SLAs for your agent system and enforce them programmatically:

from dataclasses import dataclass

@dataclass
class WorkflowSLA:
    max_latency_seconds: float
    max_tokens_per_request: int

WORKFLOW_SLAS = {
    "customer-support": WorkflowSLA(max_latency_seconds=15.0, max_tokens_per_request=8000),
    "document-analysis": WorkflowSLA(max_latency_seconds=60.0, max_tokens_per_request=50000),
}

class SLAEnforcementProcessor(TracingProcessor):
    def __init__(self, alert_service):
        self.alert = alert_service
        self._trace_tokens = {}
        self._trace_start_times = {}

    def on_trace_start(self, trace: Trace) -> None:
        self._trace_tokens[trace.trace_id] = 0
        self._trace_start_times[trace.trace_id] = time.monotonic()

    def on_span_start(self, span: Span) -> None:
        pass

    def on_span_end(self, span: Span) -> None:
        if span.span_type == "generation" and span.data:
            self._trace_tokens[span.trace_id] = self._trace_tokens.get(span.trace_id, 0) + (
                span.data.get("input_tokens", 0) + span.data.get("output_tokens", 0)
            )

    def on_trace_end(self, trace: Trace) -> None:
        # Pop bookkeeping unconditionally so entries never leak.
        tokens = self._trace_tokens.pop(trace.trace_id, 0)
        start = self._trace_start_times.pop(trace.trace_id, None)
        sla = WORKFLOW_SLAS.get(trace.name or "")
        if not sla:
            return
        if start is not None:
            duration = time.monotonic() - start
            if duration > sla.max_latency_seconds:
                self.alert.send(
                    severity="warning",
                    message=f"SLA breach: {trace.name} {duration:.1f}s > {sla.max_latency_seconds}s",
                )
        if tokens > sla.max_tokens_per_request:
            self.alert.send(
                severity="warning",
                message=f"SLA breach: {trace.name} {tokens} tokens > {sla.max_tokens_per_request}",
            )

    def force_flush(self) -> None:
        pass

    def shutdown(self) -> None:
        pass

Putting It All Together

Register your full monitoring stack at application startup:

from agents import add_trace_processor
from prometheus_client import start_http_server

# Expose the Prometheus scrape endpoint on its own port
start_http_server(8001)

# Register all monitoring processors. `prometheus_metrics` and
# `pagerduty_client` stand in for your own metrics-client and
# alerting-service instances, constructed elsewhere at startup.
add_trace_processor(PrometheusTraceProcessor())
add_trace_processor(CostTracker(prometheus_metrics))
add_trace_processor(SLAEnforcementProcessor(pagerduty_client))

Production monitoring for AI agents is not an extension of traditional APM — it is a distinct discipline that accounts for the nondeterministic, token-consuming, multi-step nature of agentic workflows. Build your monitoring stack before your first production deployment, not after your first incident.

Written by

CallSphere Team

Expert insights on AI voice agents and customer communication automation.
