Production Monitoring and Alerting for AI Agent Systems
Learn how to build production monitoring and alerting for AI agent systems including latency tracking, error rate dashboards, token usage analytics, alerting pipelines, and SLA enforcement.
Why Agent Systems Need Specialized Monitoring
Traditional API monitoring tracks request latency, error rates, and throughput. Agent systems demand all of that plus dimensions that do not exist in conventional backends: token consumption per request, LLM provider availability, tool execution success rates, and multi-agent handoff reliability.
An agent that responds successfully but consumes 50,000 tokens per request will bankrupt your LLM budget before your uptime dashboard shows a single red indicator: at gpt-4o input pricing of $2.50 per million tokens, that is roughly $0.13 per request, or about $12,500 a day at 100,000 requests, before counting output tokens. A tool that silently returns stale data will produce confident but wrong agent responses without triggering any error-rate alert. Production monitoring for agents requires purpose-built instrumentation.
Core Metrics to Track
Every agent monitoring system should capture these categories:
Latency Metrics — Total end-to-end response time, LLM generation latency per call, tool execution latency per tool, and time-to-first-token for streaming responses.
Error Metrics — LLM API error rate (rate limits, timeouts, server errors), tool execution failure rate, agent loop terminations (max_turns exceeded), and guardrail violations.
Cost Metrics — Input and output tokens per request, total tokens per workflow, cost per request mapped to model pricing, and cumulative daily spend.
Quality Metrics — Guardrail trigger rate, conversation length before resolution, and tool retry rate.
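Taken together, these categories describe one record per agent request. A minimal sketch of what that record could look like (the field names below are illustrative for this article, not part of any SDK):
from dataclasses import dataclass
from typing import Optional

@dataclass
class RequestMetrics:
    # Latency
    total_latency_s: float
    llm_latency_s: float
    tool_latency_s: float
    time_to_first_token_s: Optional[float]
    # Errors
    llm_errors: int
    tool_failures: int
    max_turns_exceeded: bool
    guardrail_violations: int
    # Cost
    input_tokens: int
    output_tokens: int
    estimated_cost_usd: float
    # Quality
    tool_retries: int
    conversation_turns: int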
Building a Metrics Collection Processor
The foundation is a trace processor that extracts metrics from every agent run and sends them to your metrics backend:
import time
from agents.tracing import TracingProcessor, Trace, Span

class MetricsCollector(TracingProcessor):
    def __init__(self, metrics_client):
        self.metrics = metrics_client
        self._trace_start_times = {}

    def on_trace_start(self, trace: Trace) -> None:
        self._trace_start_times[trace.trace_id] = time.monotonic()

    def on_span_end(self, span: Span) -> None:
        duration_s = (span.end_time - span.start_time).total_seconds()
        labels = {
            "workflow": span.trace_name or "unknown",
            "span_type": span.span_type,
            "span_name": span.name,
        }
        # Latency histogram
        self.metrics.histogram(
            "agent.span.duration_seconds",
            duration_s,
            labels=labels,
        )
        if span.span_type == "generation":
            model = span.data.get("model", "unknown") if span.data else "unknown"
            input_tokens = span.data.get("input_tokens", 0) if span.data else 0
            output_tokens = span.data.get("output_tokens", 0) if span.data else 0
            self.metrics.histogram(
                "agent.llm.duration_seconds",
                duration_s,
                labels={**labels, "model": model},
            )
            self.metrics.counter(
                "agent.tokens.input_total",
                input_tokens,
                labels={"model": model, "workflow": labels["workflow"]},
            )
            self.metrics.counter(
                "agent.tokens.output_total",
                output_tokens,
                labels={"model": model, "workflow": labels["workflow"]},
            )
        elif span.span_type == "function":
            self.metrics.histogram(
                "agent.tool.duration_seconds",
                duration_s,
                labels={"tool": span.name, "workflow": labels["workflow"]},
            )
            # Track tool errors
            if span.data and span.data.get("error"):
                self.metrics.counter(
                    "agent.tool.errors_total",
                    1,
                    labels={"tool": span.name, "workflow": labels["workflow"]},
                )

    def on_trace_end(self, trace: Trace) -> None:
        start = self._trace_start_times.pop(trace.trace_id, None)
        if start:
            total_duration = time.monotonic() - start
            self.metrics.histogram(
                "agent.workflow.duration_seconds",
                total_duration,
                labels={"workflow": trace.name or "unknown"},
            )
            self.metrics.counter(
                "agent.workflow.completions_total",
                1,
                labels={"workflow": trace.name or "unknown"},
            )

    async def shutdown(self) -> None:
        pass
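The metrics_client injected above is deliberately generic: the processor only assumes it exposes histogram() and counter(). A minimal stand-in under that assumption (the class and its logging behavior are illustrative; swap in your StatsD, Datadog, or OpenTelemetry wrapper):
import logging
from typing import Mapping, Optional

logger = logging.getLogger("agent.metrics")

class LoggingMetricsClient:
    """Minimal stand-in for a metrics backend: it implements the two
    methods MetricsCollector calls (histogram and counter) and emits
    structured log lines. Replace with your real backend client."""

    def histogram(self, name: str, value: float, labels: Optional[Mapping[str, str]] = None) -> None:
        logger.info("metric=%s type=histogram value=%.4f labels=%s", name, value, dict(labels or {}))

    def counter(self, name: str, value: float, labels: Optional[Mapping[str, str]] = None) -> None:
        logger.info("metric=%s type=counter value=%s labels=%s", name, value, dict(labels or {}))
Any object with these two methods can be dropped in unchanged, which keeps the trace processor decoupled from the metrics backend.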
Prometheus Integration
For teams using Prometheus and Grafana, here is a concrete integration using the official Python client:
from prometheus_client import Histogram, Counter, Gauge

# Define Prometheus metrics
WORKFLOW_DURATION = Histogram(
    "agent_workflow_duration_seconds",
    "End-to-end agent workflow duration",
    ["workflow"],
    buckets=[0.5, 1, 2, 5, 10, 30, 60, 120],
)
LLM_DURATION = Histogram(
    "agent_llm_call_duration_seconds",
    "Individual LLM call duration",
    ["model", "workflow"],
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30],
)
TOKEN_USAGE = Counter(
    "agent_tokens_total",
    "Total tokens consumed",
    ["model", "direction", "workflow"],
)
TOOL_DURATION = Histogram(
    "agent_tool_duration_seconds",
    "Tool execution duration",
    ["tool", "workflow"],
    buckets=[0.01, 0.05, 0.1, 0.5, 1, 5, 10],
)
TOOL_ERRORS = Counter(
    "agent_tool_errors_total",
    "Tool execution failures",
    ["tool", "workflow"],
)
ACTIVE_WORKFLOWS = Gauge(
    "agent_active_workflows",
    "Currently running agent workflows",
    ["workflow"],
)
Wire these into the trace processor:
class PrometheusTraceProcessor(TracingProcessor):
    def on_trace_start(self, trace: Trace) -> None:
        ACTIVE_WORKFLOWS.labels(workflow=trace.name or "unknown").inc()

    def on_span_end(self, span: Span) -> None:
        duration = (span.end_time - span.start_time).total_seconds()
        workflow = span.trace_name or "unknown"
        if span.span_type == "generation":
            model = span.data.get("model", "unknown") if span.data else "unknown"
            LLM_DURATION.labels(model=model, workflow=workflow).observe(duration)
            input_tokens = span.data.get("input_tokens", 0) if span.data else 0
            output_tokens = span.data.get("output_tokens", 0) if span.data else 0
            TOKEN_USAGE.labels(model=model, direction="input", workflow=workflow).inc(input_tokens)
            TOKEN_USAGE.labels(model=model, direction="output", workflow=workflow).inc(output_tokens)
        elif span.span_type == "function":
            TOOL_DURATION.labels(tool=span.name, workflow=workflow).observe(duration)
            if span.data and span.data.get("error"):
                TOOL_ERRORS.labels(tool=span.name, workflow=workflow).inc()

    def on_trace_end(self, trace: Trace) -> None:
        workflow = trace.name or "unknown"
        ACTIVE_WORKFLOWS.labels(workflow=workflow).dec()
        total = (trace.end_time - trace.start_time).total_seconds()
        WORKFLOW_DURATION.labels(workflow=workflow).observe(total)

    async def shutdown(self) -> None:
        pass
Alerting Rules
Metrics without alerts are dashboards nobody watches. Here are essential alerting rules for agent systems:
# Prometheus alerting rules (alerts.yml)
groups:
  - name: agent_alerts
    rules:
      # High latency alert
      - alert: AgentWorkflowSlowResponse
        expr: |
          histogram_quantile(0.95,
            rate(agent_workflow_duration_seconds_bucket[5m])
          ) > 30
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Agent workflow p95 latency exceeds 30 seconds"
          description: "Workflow {{ $labels.workflow }} p95 latency is {{ $value }}s"

      # LLM API error rate
      - alert: AgentLLMHighErrorRate
        expr: |
          rate(agent_llm_errors_total[5m])
            / rate(agent_llm_calls_total[5m]) > 0.05
        for: 3m
        labels:
          severity: critical
        annotations:
          summary: "LLM error rate exceeds 5%"

      # Token budget alert
      - alert: AgentTokenBudgetExceeded
        expr: |
          sum(increase(agent_tokens_total[1h])) > 1000000
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "Agent token consumption exceeds 1M tokens per hour"

      # Tool failure rate
      - alert: AgentToolHighFailureRate
        expr: |
          rate(agent_tool_errors_total[5m])
            / rate(agent_tool_duration_seconds_count[5m]) > 0.1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Tool {{ $labels.tool }} failure rate exceeds 10%"

      # Stuck workflows
      - alert: AgentWorkflowStuck
        expr: agent_active_workflows > 0
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "Agent workflows stuck for over 10 minutes"
Token Cost Tracking
Token usage directly translates to cost. Build a cost tracking layer on top of your token metrics:
# Prices in USD per token, derived from published per-1M-token rates.
# Keep this table in sync with your provider's current pricing.
MODEL_PRICING = {
    "gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
    "gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
    "gpt-4.1": {"input": 2.00 / 1_000_000, "output": 8.00 / 1_000_000},
    "gpt-4.1-mini": {"input": 0.40 / 1_000_000, "output": 1.60 / 1_000_000},
    "gpt-4.1-nano": {"input": 0.10 / 1_000_000, "output": 0.40 / 1_000_000},
}

class CostTracker(TracingProcessor):
    def __init__(self, metrics_client):
        self.metrics = metrics_client

    def on_span_end(self, span: Span) -> None:
        if span.span_type != "generation" or not span.data:
            return
        model = span.data.get("model", "")
        pricing = MODEL_PRICING.get(model)
        if not pricing:
            return
        input_tokens = span.data.get("input_tokens", 0)
        output_tokens = span.data.get("output_tokens", 0)
        input_cost = input_tokens * pricing["input"]
        output_cost = output_tokens * pricing["output"]
        total_cost = input_cost + output_cost
        self.metrics.counter(
            "agent.cost.dollars_total",
            total_cost,
            labels={
                "model": model,
                "workflow": span.trace_name or "unknown",
                "cost_type": "total",
            },
        )

    def on_trace_end(self, trace: Trace) -> None:
        pass

    async def shutdown(self) -> None:
        pass
With this processor running, you can set budget alerts: "Alert me when daily spend exceeds $50" or "Alert when any single workflow costs more than $0.50 per execution."
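One way to act on those budgets without leaving Python is an in-process guard fed by CostTracker. A rough sketch, assuming the same send(severity, message) alert interface the SLA section below uses (the DailyBudgetGuard class and its wiring are illustrative, not part of the SDK):
from datetime import datetime, timezone

class DailyBudgetGuard:
    """Accumulates estimated spend per UTC day and alerts once when the
    budget is crossed. CostTracker.on_span_end could call add_cost(total_cost)
    right after recording the cost metric."""

    def __init__(self, alert_service, daily_budget_usd: float = 50.0):
        self.alert = alert_service
        self.daily_budget_usd = daily_budget_usd
        self._day = None
        self._spend = 0.0
        self._alerted = False

    def add_cost(self, cost_usd: float) -> None:
        today = datetime.now(timezone.utc).date()
        if today != self._day:
            # New UTC day: reset the running total and re-arm the alert.
            self._day = today
            self._spend = 0.0
            self._alerted = False
        self._spend += cost_usd
        if self._spend > self.daily_budget_usd and not self._alerted:
            self._alerted = True
            self.alert.send(
                severity="warning",
                message=f"Daily LLM spend ${self._spend:.2f} exceeds budget ${self.daily_budget_usd:.2f}",
            )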
SLA Enforcement
Define SLAs for your agent system and enforce them programmatically:
from dataclasses import dataclass

@dataclass
class WorkflowSLA:
    max_latency_seconds: float
    max_tokens_per_request: int

WORKFLOW_SLAS = {
    "customer-support": WorkflowSLA(max_latency_seconds=15.0, max_tokens_per_request=8000),
    "document-analysis": WorkflowSLA(max_latency_seconds=60.0, max_tokens_per_request=50000),
}

class SLAEnforcementProcessor(TracingProcessor):
    def __init__(self, alert_service):
        self.alert = alert_service
        self._trace_tokens = {}

    def on_trace_start(self, trace: Trace) -> None:
        self._trace_tokens[trace.trace_id] = 0

    def on_span_end(self, span: Span) -> None:
        if span.span_type == "generation" and span.data:
            self._trace_tokens[span.trace_id] = self._trace_tokens.get(span.trace_id, 0) + (
                span.data.get("input_tokens", 0) + span.data.get("output_tokens", 0)
            )

    def on_trace_end(self, trace: Trace) -> None:
        sla = WORKFLOW_SLAS.get(trace.name or "")
        tokens = self._trace_tokens.pop(trace.trace_id, 0)
        if not sla:
            return
        duration = (trace.end_time - trace.start_time).total_seconds()
        if duration > sla.max_latency_seconds:
            self.alert.send(
                severity="warning",
                message=f"SLA breach: {trace.name} {duration:.1f}s > {sla.max_latency_seconds}s",
            )
        if tokens > sla.max_tokens_per_request:
            self.alert.send(
                severity="warning",
                message=f"SLA breach: {trace.name} {tokens} tokens > {sla.max_tokens_per_request}",
            )

    async def shutdown(self) -> None:
        pass
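The alert_service handed to SLAEnforcementProcessor (and to the budget guard earlier) only needs a send(severity, message) method. A minimal webhook-backed sketch; the class name, URL handling, and payload shape are placeholders for whatever PagerDuty, Slack, or Opsgenie integration you actually run:
import json
import urllib.request

class WebhookAlertService:
    """Posts alerts as JSON to an incoming-webhook URL. Swap in your real
    PagerDuty, Slack, or Opsgenie client for production use."""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def send(self, severity: str, message: str) -> None:
        body = json.dumps({"severity": severity, "text": message}).encode("utf-8")
        req = urllib.request.Request(
            self.webhook_url,
            data=body,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        try:
            urllib.request.urlopen(req, timeout=5)
        except Exception:
            # Alerting must never take the agent workflow down with it.
            pass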
Putting It All Together
Register your full monitoring stack at application startup:
from agents import add_trace_processor
from prometheus_client import start_http_server
# Start Prometheus metrics endpoint
start_http_server(8001)
# Register all monitoring processors
add_trace_processor(PrometheusTraceProcessor())
add_trace_processor(CostTracker(prometheus_metrics))  # prometheus_metrics: your metrics-client adapter (histogram/counter methods)
add_trace_processor(SLAEnforcementProcessor(pagerduty_client))  # pagerduty_client: any object exposing send(severity, message)
Production monitoring for AI agents is not an extension of traditional APM — it is a distinct discipline that accounts for the nondeterministic, token-consuming, multi-step nature of agentic workflows. Build your monitoring stack before your first production deployment, not after your first incident.
Written by
CallSphere Team
Expert insights on AI voice agents and customer communication automation.