Building Custom Trace Processors for Agent Observability
Build custom trace processors and exporters for the OpenAI Agents SDK to ship agent telemetry to Elasticsearch, Datadog, or any backend using TraceProvider, BatchTraceProcessor, and BackendSpanExporter.
Why Custom Trace Processors Matter
The OpenAI Agents SDK ships with a default trace processor that sends spans to the OpenAI dashboard. For development and small-scale deployments, this is sufficient. Production systems usually need more: agent telemetry has to sit alongside your application metrics in Elasticsearch, Datadog, Grafana, or your own analytics backend. Custom trace processors let you intercept every span the SDK generates and route it wherever you need.
This post covers the full architecture: implementing a custom TracingProcessor, building an Elasticsearch span exporter, registering both with the TraceProvider, and raising throughput with a batching processor.
The Tracing Architecture
The Agents SDK tracing pipeline has four components:
- Spans — Individual units of work (agent invocation, tool call, generation, handoff)
- TracingProcessor — Receives span lifecycle events (start, end) and decides what to do with them
- Exporter — Ships completed spans to an external backend
- TraceProvider — The global registry that connects processors to the SDK
The default setup sends everything to OpenAI. To add your own backend, you implement a custom processor and exporter, then register them with the TraceProvider.
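To make the relationship between these components concrete, here is a minimal sketch of the fan-out that does not import the SDK at all. ToySpan, CollectingProcessor, and ToyProvider are hypothetical stand-ins invented for illustration, not SDK classes; the only point is that a provider forwards each span event to every registered processor, and that adding a processor differs from replacing the list.

```python
from dataclasses import dataclass, field
from typing import Protocol


@dataclass
class ToySpan:
    """Hypothetical stand-in for an SDK span."""
    trace_id: str
    span_id: str
    span_type: str


class ToyProcessor(Protocol):
    def on_span_end(self, span: ToySpan) -> None: ...


@dataclass
class CollectingProcessor:
    """Keeps finished spans in memory, as a real processor might buffer them."""
    name: str
    seen: list[str] = field(default_factory=list)

    def on_span_end(self, span: ToySpan) -> None:
        self.seen.append(f"{self.name}:{span.span_type}:{span.span_id}")


class ToyProvider:
    """Registry that forwards each span event to every registered processor."""

    def __init__(self) -> None:
        self._processors: list[ToyProcessor] = []

    def add_processor(self, processor: ToyProcessor) -> None:
        # Appends: existing processors keep receiving events
        self._processors.append(processor)

    def set_processors(self, processors: list[ToyProcessor]) -> None:
        # Replaces: previously registered processors stop receiving events
        self._processors = list(processors)

    def emit_span_end(self, span: ToySpan) -> None:
        for processor in self._processors:
            processor.on_span_end(span)


default = CollectingProcessor("default")
custom = CollectingProcessor("elasticsearch")

provider = ToyProvider()
provider.add_processor(default)
provider.add_processor(custom)
provider.emit_span_end(ToySpan("trace_1", "span_1", "agent"))

# After replacing the list, only the custom processor sees new spans
provider.set_processors([custom])
provider.emit_span_end(ToySpan("trace_1", "span_2", "handoff"))
```

After this runs, the default collector has seen one span and the custom collector has seen both, which mirrors the add-versus-replace distinction that matters when registering a real processor.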
Implementing a Custom TracingProcessor
The TracingProcessor base class defines the interface your processor must implement:
from typing import Any
import time

from agents.tracing import (
    Span,
    Trace,
    TracingProcessor,
)


class ElasticsearchTracingProcessor(TracingProcessor):
    """Processor that collects spans and exports them to Elasticsearch.

    Attribute names below follow the SDK's Trace/Span interface as the
    author of this revision understands it; check them against the
    version you have installed.
    """

    def __init__(self, exporter: "ElasticsearchSpanExporter"):
        self._exporter = exporter
        self._active_traces: dict[str, dict[str, Any]] = {}

    def on_trace_start(self, trace: Trace) -> None:
        """Called when a new trace begins."""
        self._active_traces[trace.trace_id] = {
            "trace_id": trace.trace_id,
            # The workflow name is exposed as `name`; group_id and metadata
            # are read defensively because they are not on the abstract type.
            "workflow_name": trace.name,
            "group_id": getattr(trace, "group_id", None),
            "metadata": getattr(trace, "metadata", None) or {},
            "started_at": time.time(),
            "spans": [],
        }

    def on_trace_end(self, trace: Trace) -> None:
        """Called when a trace completes. Flush all collected spans."""
        trace_data = self._active_traces.pop(trace.trace_id, None)
        if trace_data:
            trace_data["ended_at"] = time.time()
            trace_data["duration_ms"] = (
                (trace_data["ended_at"] - trace_data["started_at"]) * 1000
            )
            self._exporter.export_trace(trace_data)

    def on_span_start(self, span: Span) -> None:
        """Called when a span begins within an active trace."""
        trace_data = self._active_traces.get(span.trace_id)
        if trace_data:
            trace_data["spans"].append({
                "span_id": span.span_id,
                "parent_id": span.parent_id,
                # Type and name live on the span's payload (span_data);
                # not every span type carries a name.
                "span_type": span.span_data.type,
                "name": getattr(span.span_data, "name", None),
                "started_at": time.time(),
            })

    def on_span_end(self, span: Span) -> None:
        """Called when a span completes."""
        trace_data = self._active_traces.get(span.trace_id)
        if trace_data:
            for s in trace_data["spans"]:
                if s["span_id"] == span.span_id:
                    s["ended_at"] = time.time()
                    s["duration_ms"] = (s["ended_at"] - s["started_at"]) * 1000
                    # A span reports failure through `error`; there is no
                    # separate status field, so derive one here.
                    s["status"] = "error" if span.error else "ok"
                    s["attributes"] = span.span_data.export()
                    break

    def shutdown(self) -> None:
        """Flush any remaining data on shutdown."""
        for trace_id in list(self._active_traces.keys()):
            trace_data = self._active_traces.pop(trace_id)
            self._exporter.export_trace(trace_data)
        self._exporter.flush()

    def force_flush(self) -> None:
        """Force export of all buffered data."""
        self._exporter.flush()
The four lifecycle methods — on_trace_start, on_trace_end, on_span_start, and on_span_end — give you access to every event in the tracing pipeline.
Building the Elasticsearch Exporter
The exporter handles the actual transport of span data to your backend:
from datetime import datetime, timezone
from typing import Any
import logging
import queue
import threading

from elasticsearch import Elasticsearch, helpers

logger = logging.getLogger(__name__)


def _to_iso(epoch_seconds: float) -> str:
    """Convert time.time() output to an ISO 8601 UTC string.

    With dynamic mapping, Elasticsearch detects ISO strings as dates,
    whereas a bare float is indexed as a plain number.
    """
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).isoformat()


class ElasticsearchSpanExporter:
    """Exports agent traces to Elasticsearch."""

    def __init__(
        self,
        es_url: str = "http://localhost:9200",
        index_prefix: str = "agent-traces",
        api_key: str | None = None,
        batch_size: int = 50,
        flush_interval: float = 5.0,
    ):
        self._client = Elasticsearch(
            es_url,
            api_key=api_key,
        )
        self._index_prefix = index_prefix
        self._batch_size = batch_size
        self._buffer: queue.Queue[dict[str, Any]] = queue.Queue()
        self._flush_interval = flush_interval
        self._shutdown_event = threading.Event()

        # Background flush thread
        self._flush_thread = threading.Thread(
            target=self._periodic_flush, daemon=True
        )
        self._flush_thread.start()

    def export_trace(self, trace_data: dict[str, Any]) -> None:
        """Queue a trace for export."""
        # Index the trace summary
        self._buffer.put({
            "_index": f"{self._index_prefix}-traces",
            "_id": trace_data["trace_id"],
            "_source": {
                "trace_id": trace_data["trace_id"],
                "workflow_name": trace_data.get("workflow_name"),
                "group_id": trace_data.get("group_id"),
                "metadata": trace_data.get("metadata", {}),
                "duration_ms": trace_data.get("duration_ms", 0),
                "span_count": len(trace_data.get("spans", [])),
                "timestamp": _to_iso(trace_data["started_at"]),
            },
        })

        # Index each span separately for granular querying
        for span in trace_data.get("spans", []):
            self._buffer.put({
                "_index": f"{self._index_prefix}-spans",
                "_id": span["span_id"],
                "_source": {
                    "trace_id": trace_data["trace_id"],
                    "span_id": span["span_id"],
                    "parent_id": span.get("parent_id"),
                    "span_type": span["span_type"],
                    "name": span.get("name"),
                    "duration_ms": span.get("duration_ms", 0),
                    "status": span.get("status"),
                    "attributes": span.get("attributes", {}),
                    "timestamp": _to_iso(span["started_at"]),
                },
            })

        # Note: this flush runs on the caller's thread
        if self._buffer.qsize() >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        """Flush buffered documents to Elasticsearch."""
        actions = []
        while True:
            try:
                actions.append(self._buffer.get_nowait())
            except queue.Empty:
                break

        if actions:
            try:
                helpers.bulk(self._client, actions)
                logger.info("Exported %d documents to Elasticsearch", len(actions))
            except Exception:
                logger.exception("Failed to export to Elasticsearch")
                # Re-queue failed documents. Every document has a fixed _id,
                # so retrying one that was already indexed overwrites it.
                # The queue is unbounded and will grow during a long outage.
                for action in actions:
                    self._buffer.put(action)

    def _periodic_flush(self) -> None:
        """Periodically flush the buffer."""
        while not self._shutdown_event.is_set():
            self._shutdown_event.wait(self._flush_interval)
            if not self._buffer.empty():
                self.flush()

    def close(self) -> None:
        """Shutdown the exporter."""
        self._shutdown_event.set()
        self._flush_thread.join(timeout=10)
        self.flush()
        self._client.close()
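One thing to watch in an exporter that re-queues failed documents is memory growth while Elasticsearch is unreachable. The sketch below shows one possible policy, a fixed-capacity buffer that discards the oldest documents first. BoundedRetryBuffer is a hypothetical helper, not part of the exporter above or of any library, and it is written for single-threaded use; a real exporter would guard it with a lock.

```python
from collections import deque
from typing import Any


class BoundedRetryBuffer:
    """Holds documents awaiting export, discarding the oldest once full."""

    def __init__(self, max_size: int = 10_000) -> None:
        self._items: deque[dict[str, Any]] = deque(maxlen=max_size)
        self.dropped = 0  # how many documents were evicted

    def put(self, doc: dict[str, Any]) -> None:
        if len(self._items) == self._items.maxlen:
            # deque(maxlen=...) evicts from the left on append
            self.dropped += 1
        self._items.append(doc)

    def drain(self) -> list[dict[str, Any]]:
        """Return everything currently buffered and empty the buffer."""
        items = list(self._items)
        self._items.clear()
        return items


buffer = BoundedRetryBuffer(max_size=3)
for i in range(5):
    buffer.put({"_id": f"span_{i}"})

remaining = [doc["_id"] for doc in buffer.drain()]
```

Dropping the oldest documents favors recent telemetry, which is usually what you want while diagnosing an ongoing incident; exposing the dropped counter as a metric makes the data loss visible.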
Registering with TraceProvider
With both the processor and exporter built, register them with the global TraceProvider:
from agents.tracing import add_trace_processor
import os

# Create the exporter
es_exporter = ElasticsearchSpanExporter(
    es_url=os.environ["ELASTICSEARCH_URL"],
    api_key=os.environ.get("ELASTICSEARCH_API_KEY"),
    index_prefix="agent-traces",
    batch_size=100,
    flush_interval=3.0,
)

# Create the processor
es_processor = ElasticsearchTracingProcessor(exporter=es_exporter)

# Register: add_trace_processor() appends to the existing processors,
# so the default OpenAI processor keeps running.
add_trace_processor(es_processor)

Now every trace produced by any Runner.run() call will flow through both the default OpenAI processor and your Elasticsearch processor. To send traces only to Elasticsearch, call set_trace_processors([es_processor]) instead, which replaces the default processors rather than adding to them.
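Because the exporter flushes from a daemon thread, documents still sitting in its buffer can be lost when the interpreter exits unless something calls close(). A minimal sketch of an exit hook follows. install_shutdown_hook is a hypothetical helper introduced here, and whether the SDK itself calls shutdown() on registered processors at exit is not something this post establishes, so treat the hook as a belt-and-braces measure.

```python
import atexit
from typing import Callable


def install_shutdown_hook(
    shutdown_processor: Callable[[], None],
    close_exporter: Callable[[], None],
    register: Callable[[Callable[[], None]], object] = atexit.register,
) -> Callable[[], None]:
    """Register a hook that shuts the processor down, then closes the exporter."""
    done = False

    def _hook() -> None:
        nonlocal done
        if done:  # repeated calls are harmless
            return
        done = True
        try:
            shutdown_processor()
        finally:
            # Close the exporter even if the processor raised
            close_exporter()

    register(_hook)
    return _hook


# Demonstration with stand-ins instead of a real processor and exporter
calls: list[str] = []
registered: list[Callable[[], None]] = []

hook = install_shutdown_hook(
    lambda: calls.append("processor.shutdown"),
    lambda: calls.append("exporter.close"),
    register=registered.append,
)
hook()
hook()  # second call is a no-op
```

In the setup above you would call install_shutdown_hook(es_processor.shutdown, es_exporter.close) once, right after registering the processor.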
Building a BatchTraceProcessor for High Throughput
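For high-volume systems, processing spans one at a time is inefficient. A batch processor collects spans and exports them in bulk. The SDK also ships its own BatchTraceProcessor in agents.tracing.processors; the class below is a hand-rolled version built around the Elasticsearch exporter from this post, so give it a different name if you import both: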
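import logging
import threading
import time
from typing import Any

from agents.tracing import Span, Trace, TracingProcessor

logger = logging.getLogger(__name__)


class BatchTraceProcessor(TracingProcessor):
    """Batches spans before exporting for higher throughput."""

    def __init__(
        self,
        exporter: "ElasticsearchSpanExporter",
        max_batch_size: int = 200,
        flush_interval: float = 2.0,
        max_queue_size: int = 10000,
    ):
        self._exporter = exporter
        self._max_batch_size = max_batch_size
        self._flush_interval = flush_interval
        self._max_queue_size = max_queue_size
        self._span_buffer: list[dict[str, Any]] = []
        self._workflow_names: dict[str, str] = {}
        self._span_started: dict[str, float] = {}
        self._lock = threading.Lock()
        self._shutdown = threading.Event()
        self._flush_thread = threading.Thread(
            target=self._auto_flush, daemon=True
        )
        self._flush_thread.start()

    def on_trace_start(self, trace: Trace) -> None:
        # Remember the workflow name so each span record can carry it
        with self._lock:
            self._workflow_names[trace.trace_id] = trace.name

    def on_trace_end(self, trace: Trace) -> None:
        with self._lock:
            self._workflow_names.pop(trace.trace_id, None)
        self._maybe_flush()

    def on_span_start(self, span: Span) -> None:
        # Only record the start time; the span is buffered when it ends
        with self._lock:
            self._span_started[span.span_id] = time.time()

    def on_span_end(self, span: Span) -> None:
        now = time.time()
        with self._lock:
            started = self._span_started.pop(span.span_id, now)
            if len(self._span_buffer) >= self._max_queue_size:
                logger.warning("Span buffer full; dropping span %s", span.span_id)
                return
            self._span_buffer.append({
                "trace_id": span.trace_id,
                "workflow_name": self._workflow_names.get(span.trace_id),
                "span_id": span.span_id,
                "parent_id": span.parent_id,
                "span_type": span.span_data.type,
                "name": getattr(span.span_data, "name", None),
                "duration_ms": (now - started) * 1000,
                "status": "error" if span.error else "ok",
                "attributes": span.span_data.export(),
                "started_at": started,
            })
        # Flush outside the lock: threading.Lock is not reentrant
        self._maybe_flush()

    def _maybe_flush(self, force: bool = False) -> None:
        """Export the buffer if it is full, or unconditionally when forced."""
        with self._lock:
            if not self._span_buffer:
                return
            if not force and len(self._span_buffer) < self._max_batch_size:
                return
            batch, self._span_buffer = self._span_buffer, []
        self._export_batch(batch)

    def _export_batch(self, batch: list[dict[str, Any]]) -> None:
        # Group spans by trace so export_trace() gets the keys it expects.
        # The trace summary it writes only reflects the spans in this batch.
        by_trace: dict[str, list[dict[str, Any]]] = {}
        for span_data in batch:
            by_trace.setdefault(span_data["trace_id"], []).append(span_data)
        for trace_id, spans in by_trace.items():
            self._exporter.export_trace({
                "trace_id": trace_id,
                "workflow_name": spans[0]["workflow_name"],
                "started_at": min(s["started_at"] for s in spans),
                "spans": spans,
            })
        self._exporter.flush()

    def _auto_flush(self) -> None:
        # The timer exports partial batches so spans never wait indefinitely
        while not self._shutdown.is_set():
            self._shutdown.wait(self._flush_interval)
            self._maybe_flush(force=True)

    def shutdown(self) -> None:
        self._shutdown.set()
        self._flush_thread.join(timeout=10)
        self._maybe_flush(force=True)

    def force_flush(self) -> None:
        self._maybe_flush(force=True)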
Creating Kibana Dashboards
With agent traces in Elasticsearch, you can build Kibana dashboards on top of them. The outline below is schematic rather than an importable Kibana saved object, and it filters on the span type function, which is the value the SDK reports for function tool calls:
{
  "agent_performance_dashboard": {
    "panels": [
      {
        "title": "Average Agent Run Duration (ms)",
        "type": "metric",
        "query": {
          "aggs": {
            "avg_duration": {
              "avg": { "field": "duration_ms" }
            }
          }
        }
      },
      {
        "title": "Tool Call Latency Distribution",
        "type": "histogram",
        "query": {
          "bool": {
            "filter": [
              { "term": { "span_type": "function" } }
            ]
          }
        }
      },
      {
        "title": "Handoff Frequency by Agent Pair",
        "type": "heatmap",
        "query": {
          "bool": {
            "filter": [
              { "term": { "span_type": "handoff" } }
            ]
          }
        }
      }
    ]
  }
}
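Behind a latency panel like the one above sits an ordinary Elasticsearch search request. The sketch below builds one such request body in Python using the terms and percentiles aggregations. The function name is hypothetical, and the .keyword field suffixes assume the indices were created with default dynamic mapping, where string fields get a keyword subfield; adjust the field names if you define an explicit mapping.

```python
import json
from typing import Any


def latency_percentiles_query(
    span_type: str,
    percents: tuple[float, ...] = (50, 95, 99),
) -> dict[str, Any]:
    """Build a search body with per-name latency percentiles for one span type."""
    return {
        "size": 0,  # only aggregations are needed, not the matching documents
        "query": {
            "bool": {
                "filter": [{"term": {"span_type.keyword": span_type}}]
            }
        },
        "aggs": {
            "by_name": {
                "terms": {"field": "name.keyword", "size": 20},
                "aggs": {
                    "latency": {
                        "percentiles": {
                            "field": "duration_ms",
                            "percents": list(percents),
                        }
                    }
                },
            }
        },
    }


body = latency_percentiles_query("function")
print(json.dumps(body, indent=2))
```

The resulting dictionary can be passed to the search API against the spans index, or pasted into Kibana Dev Tools as JSON to check the numbers before building a visualization on top of it.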
Practical Observability Patterns
Here are the patterns that matter most in production agent observability:
Latency percentiles — Track p50, p95, and p99 for each agent and tool call. A single slow tool can dominate user-perceived latency.
Handoff heatmaps — Visualize which agents hand off to which others. Unexpected handoff patterns reveal confusion in the triage logic.
Token consumption by agent — Track input and output tokens per agent to identify which agents are the most expensive.
Error rate by span type — Separate tool call errors from generation errors from guardrail blocks. Each requires a different remediation.
Trace depth monitoring — If a trace has more than 10-15 agent spans, the orchestration logic likely needs simplification.
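Two of these patterns, error rate by span type and trace depth monitoring, are easy to prototype offline against exported span documents before wiring up alerts. The following is a sketch under assumptions: the function names are hypothetical, the input is a list of dictionaries with trace_id, span_type, and status keys shaped like the span documents indexed earlier, and the threshold of 10 agent spans is taken from the lower end of the range mentioned above.

```python
from collections import Counter
from typing import Any


def error_rate_by_type(spans: list[dict[str, Any]]) -> dict[str, float]:
    """Fraction of spans with status 'error', computed per span type."""
    totals: Counter[str] = Counter()
    errors: Counter[str] = Counter()
    for span in spans:
        totals[span["span_type"]] += 1
        if span.get("status") == "error":
            errors[span["span_type"]] += 1
    return {span_type: errors[span_type] / totals[span_type] for span_type in totals}


def agent_spans_per_trace(spans: list[dict[str, Any]]) -> dict[str, int]:
    """Count agent spans in each trace."""
    return dict(Counter(s["trace_id"] for s in spans if s["span_type"] == "agent"))


def deep_traces(spans: list[dict[str, Any]], threshold: int = 10) -> list[str]:
    """Trace IDs whose agent span count exceeds the threshold."""
    counts = agent_spans_per_trace(spans)
    return sorted(trace_id for trace_id, n in counts.items() if n > threshold)


# Synthetic sample: one deep trace and one shallow trace with a failing tool call
spans = (
    [{"trace_id": "t1", "span_type": "agent", "status": "ok"} for _ in range(12)]
    + [{"trace_id": "t2", "span_type": "agent", "status": "ok"} for _ in range(2)]
    + [{"trace_id": "t2", "span_type": "function", "status": "ok"} for _ in range(3)]
    + [{"trace_id": "t2", "span_type": "function", "status": "error"}]
)

rates = error_rate_by_type(spans)
flagged = deep_traces(spans, threshold=10)
```

With this sample, one of the four function spans failed and only trace t1 crosses the agent span threshold. The same logic translates directly into terms aggregations once the thresholds have been tuned on real data.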
Custom trace processors transform agent traces from a debugging tool into a full observability platform. Start with Elasticsearch for flexible querying, add Kibana dashboards for visualization, and set up alerting rules for latency and error rate thresholds. The investment pays off the first time you diagnose a production issue in minutes instead of hours.
Written by
CallSphere Team