---
title: "Building Custom Trace Processors for Agent Observability"
description: "Build custom trace processors and exporters for the OpenAI Agents SDK to ship agent telemetry to Elasticsearch, Datadog, or any backend using TraceProvider, BatchTraceProcessor, and BackendSpanExporter."
canonical: https://callsphere.ai/blog/building-custom-trace-processors-agent-observability
category: "Learn Agentic AI"
tags: ["OpenAI", "Trace Processor", "Custom", "Observability"]
author: "CallSphere Team"
published: 2026-03-14T00:00:00.000Z
updated: 2026-05-06T22:04:23.280Z
---

# Building Custom Trace Processors for Agent Observability

> Build custom trace processors and exporters for the OpenAI Agents SDK to ship agent telemetry to Elasticsearch, Datadog, or any backend using TraceProvider, BatchTraceProcessor, and BackendSpanExporter.

## Why Custom Trace Processors Matter

The OpenAI Agents SDK ships with a default trace processor that sends traces and spans to the OpenAI dashboard. For development and small-scale deployments, this is sufficient. Production systems usually need more: agent telemetry should sit alongside your application metrics in Elasticsearch, Datadog, Grafana, or your own analytics backend. Custom trace processors let you intercept every trace and span the SDK generates and route it wherever you need.

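This post covers the full architecture: implementing a custom `TracingProcessor`, building an Elasticsearch exporter that plays the role the SDK's `BackendSpanExporter` plays for the OpenAI backend, registering the processor with the global `TraceProvider`, and improving throughput with a batching processor in the style of `BatchTraceProcessor`.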

## The Tracing Architecture

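The Agents SDK tracing pipeline has four components, listed below the diagram. The diagram itself shows the broader observability stack that agent telemetry typically feeds into: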

```mermaid
flowchart LR
    APP(["Agent or API"])
    SDK["OTel SDK
GenAI conventions"]
    COL["OTel Collector"]
    subgraph BACKENDS["Backends"]
        TR[("Traces
Tempo or Honeycomb")]
        MET[("Metrics
Prometheus")]
        LOG[("Logs
Loki or ELK")]
    end
    DASH["Grafana plus alerts"]
    PAGE(["Pager"])
    APP --> SDK --> COL
    COL --> TR
    COL --> MET
    COL --> LOG
    TR --> DASH
    MET --> DASH
    LOG --> DASH
    DASH --> PAGE
    style SDK fill:#4f46e5,stroke:#4338ca,color:#fff
    style DASH fill:#f59e0b,stroke:#d97706,color:#1f2937
    style PAGE fill:#dc2626,stroke:#b91c1c,color:#fff
```

1. **Spans**: Individual units of work (agent invocation, tool call, generation, handoff)
2. **TracingProcessor**: Receives trace and span lifecycle events (start, end) and decides what to do with them
3. **Exporter**: Ships completed traces and spans to an external backend
4. **TraceProvider**: The global registry that connects processors to the SDK

The default setup sends everything to OpenAI. To add your own backend, you implement a custom processor and exporter, then register them with the `TraceProvider`.
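To make the relationship between these components concrete, here is a minimal, SDK-free sketch of a provider fanning lifecycle events out to its registered processors. `MiniProvider` and `RecordingProcessor` are hypothetical names used only for illustration; the real `TraceProvider` does considerably more, so treat this as a mental model rather than the SDK's implementation.

```python
from dataclasses import dataclass, field


@dataclass
class RecordingProcessor:
    """Hypothetical processor that records every event it receives."""

    events: list[tuple[str, str]] = field(default_factory=list)

    def on_trace_start(self, trace_id: str) -> None:
        self.events.append(("trace_start", trace_id))

    def on_span_end(self, span_id: str) -> None:
        self.events.append(("span_end", span_id))

    def on_trace_end(self, trace_id: str) -> None:
        self.events.append(("trace_end", trace_id))


class MiniProvider:
    """Hypothetical stand-in for a trace provider: a registry plus fan-out."""

    def __init__(self) -> None:
        self._processors: list[RecordingProcessor] = []

    def add_processor(self, processor: RecordingProcessor) -> None:
        self._processors.append(processor)

    def emit(self, event: str, item_id: str) -> None:
        # Every registered processor sees every event, in registration order
        for processor in self._processors:
            getattr(processor, f"on_{event}")(item_id)


provider = MiniProvider()
default_proc, custom_proc = RecordingProcessor(), RecordingProcessor()
provider.add_processor(default_proc)
provider.add_processor(custom_proc)

provider.emit("trace_start", "trace_1")
provider.emit("span_end", "span_1")
provider.emit("trace_end", "trace_1")
```

Both processors end up with the same event list, which is the property that lets a custom backend run next to the default one.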

## Implementing a Custom TracingProcessor

The `TracingProcessor` abstract base class defines the interface your processor must implement:

```python
import threading
import time
from typing import Any

from agents.tracing import Span, Trace, TracingProcessor

# Attribute names below follow the SDK's Trace and Span interfaces;
# verify them against the SDK version you have installed.


class ElasticsearchTracingProcessor(TracingProcessor):
    """Processor that collects spans and exports them to Elasticsearch."""

    def __init__(self, exporter: "ElasticsearchSpanExporter"):
        self._exporter = exporter
        self._active_traces: dict[str, dict[str, Any]] = {}
        # Hooks may be called from several threads, so guard the shared dict
        self._lock = threading.Lock()

    def on_trace_start(self, trace: Trace) -> None:
        """Called when a new trace begins."""
        trace_data = {
            "trace_id": trace.trace_id,
            "workflow_name": trace.name,  # the workflow name given to the trace
            # group_id and metadata are set on the concrete trace object, so
            # read them defensively in case the interface does not expose them
            "group_id": getattr(trace, "group_id", None),
            "metadata": getattr(trace, "metadata", None) or {},
            "started_at": time.time(),
            "spans": {},  # span_id -> record, for O(1) lookup in on_span_end
        }
        with self._lock:
            self._active_traces[trace.trace_id] = trace_data

    def on_trace_end(self, trace: Trace) -> None:
        """Called when a trace completes. Export all collected spans."""
        with self._lock:
            trace_data = self._active_traces.pop(trace.trace_id, None)
        if trace_data:
            self._export(trace_data)

    def on_span_start(self, span: Span) -> None:
        """Called when a span begins within an active trace."""
        record = {
            "span_id": span.span_id,
            "parent_id": span.parent_id,
            # Type and name live on span_data; not every span type has a name
            "span_type": span.span_data.type,
            "name": getattr(span.span_data, "name", None),
            "started_at": time.time(),
        }
        with self._lock:
            trace_data = self._active_traces.get(span.trace_id)
            if trace_data:
                trace_data["spans"][span.span_id] = record

    def on_span_end(self, span: Span) -> None:
        """Called when a span completes."""
        with self._lock:
            trace_data = self._active_traces.get(span.trace_id)
            record = trace_data["spans"].get(span.span_id) if trace_data else None
            if record:
                record["ended_at"] = time.time()
                record["duration_ms"] = (record["ended_at"] - record["started_at"]) * 1000
                record["status"] = "error" if span.error else "ok"
                record["attributes"] = span.span_data.export()

    def _export(self, trace_data: dict[str, Any]) -> None:
        """Stamp the end time and hand the trace to the exporter."""
        trace_data["ended_at"] = time.time()
        trace_data["duration_ms"] = (
            (trace_data["ended_at"] - trace_data["started_at"]) * 1000
        )
        trace_data["spans"] = list(trace_data["spans"].values())
        self._exporter.export_trace(trace_data)

    def shutdown(self) -> None:
        """Export any traces still in flight, then flush the exporter."""
        with self._lock:
            pending = list(self._active_traces.values())
            self._active_traces.clear()
        for trace_data in pending:
            self._export(trace_data)
        self._exporter.flush()

    def force_flush(self) -> None:
        """Force export of all buffered data."""
        self._exporter.flush()
```

The four lifecycle methods (`on_trace_start`, `on_trace_end`, `on_span_start`, and `on_span_end`) give you access to every event in the tracing pipeline, while `shutdown` and `force_flush` give the SDK a way to drain your buffers on exit or on demand.
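Because the lifecycle hooks only read a handful of attributes, you can exercise processor logic without running an agent. The sketch below drives a simplified, duck-typed processor with `SimpleNamespace` stubs and a fake clock. `SpanTimingProcessor`, the stub fields, and the injected `clock` parameter are assumptions for illustration; a real processor would subclass `TracingProcessor`.

```python
import time
from types import SimpleNamespace
from typing import Callable


class SpanTimingProcessor:
    """Hypothetical duck-typed processor that times spans per trace."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        self._open: dict[str, float] = {}          # span_id -> start time
        self._spans: dict[str, list[dict]] = {}    # trace_id -> finished spans
        self.exported: list[tuple[str, list[dict]]] = []

    def on_trace_start(self, trace) -> None:
        self._spans[trace.trace_id] = []

    def on_span_start(self, span) -> None:
        self._open[span.span_id] = self._clock()

    def on_span_end(self, span) -> None:
        started = self._open.pop(span.span_id, None)
        if started is None or span.trace_id not in self._spans:
            return  # No matching start or trace: ignore the span
        self._spans[span.trace_id].append({
            "span_id": span.span_id,
            "duration_ms": (self._clock() - started) * 1000,
        })

    def on_trace_end(self, trace) -> None:
        # Stand-in for handing the finished trace to an exporter
        self.exported.append((trace.trace_id, self._spans.pop(trace.trace_id, [])))


# A deterministic fake clock makes the measured duration predictable
ticks = iter([0.0, 0.25])
processor = SpanTimingProcessor(clock=lambda: next(ticks))
trace = SimpleNamespace(trace_id="trace_1")
span = SimpleNamespace(trace_id="trace_1", span_id="span_1")

processor.on_trace_start(trace)
processor.on_span_start(span)
processor.on_span_end(span)
processor.on_trace_end(trace)
```

Injecting the clock is a design choice that keeps timing assertions exact instead of depending on wall-clock jitter.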

## Building the Elasticsearch Exporter

The exporter handles the actual transport of span data to your backend:

```python
import logging
import queue
import threading
from typing import Any

from elasticsearch import Elasticsearch, helpers

logger = logging.getLogger(__name__)


class ElasticsearchSpanExporter:
    """Exports agent traces to Elasticsearch."""

    def __init__(
        self,
        es_url: str = "http://localhost:9200",
        index_prefix: str = "agent-traces",
        api_key: str | None = None,
        batch_size: int = 50,
        flush_interval: float = 5.0,
        max_buffer_size: int = 10_000,
    ):
        self._client = Elasticsearch(es_url, api_key=api_key)
        self._index_prefix = index_prefix
        self._batch_size = batch_size
        # Bounded so a long Elasticsearch outage cannot grow memory without limit
        self._buffer: queue.Queue[dict[str, Any]] = queue.Queue(maxsize=max_buffer_size)
        self._flush_interval = flush_interval
        self._shutdown_event = threading.Event()

        # Background thread that flushes on a timer
        self._flush_thread = threading.Thread(
            target=self._periodic_flush, daemon=True
        )
        self._flush_thread.start()

    def _enqueue(self, action: dict[str, Any]) -> None:
        """Buffer one bulk action, dropping it if the buffer is full."""
        try:
            self._buffer.put_nowait(action)
        except queue.Full:
            logger.warning("Export buffer full; dropping document %s", action["_id"])

    def export_trace(self, trace_data: dict[str, Any]) -> None:
        """Queue a trace summary and its spans for export."""
        spans = trace_data.get("spans", [])

        # Payloads without a workflow name come from span-level processors;
        # skip the summary so they cannot overwrite a real trace summary.
        if trace_data.get("workflow_name") is not None:
            self._enqueue({
                "_index": f"{self._index_prefix}-traces",
                "_id": trace_data["trace_id"],
                "_source": {
                    "trace_id": trace_data["trace_id"],
                    "workflow_name": trace_data["workflow_name"],
                    "group_id": trace_data.get("group_id"),
                    "metadata": trace_data.get("metadata", {}),
                    "duration_ms": trace_data.get("duration_ms", 0),
                    "span_count": len(spans),
                    "timestamp": trace_data.get("started_at"),
                },
            })

        # Index each span separately for granular querying
        for span in spans:
            self._enqueue({
                "_index": f"{self._index_prefix}-spans",
                "_id": span["span_id"],
                "_source": {
                    "trace_id": trace_data["trace_id"],
                    "span_id": span["span_id"],
                    "parent_id": span.get("parent_id"),
                    "span_type": span.get("span_type"),
                    "name": span.get("name"),
                    "duration_ms": span.get("duration_ms", 0),
                    "status": span.get("status"),
                    "attributes": span.get("attributes", {}),
                    "timestamp": span.get("started_at", span.get("timestamp")),
                },
            })

        if self._buffer.qsize() >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        """Flush buffered documents to Elasticsearch in one bulk request."""
        actions = []
        while True:
            try:
                actions.append(self._buffer.get_nowait())
            except queue.Empty:
                break

        if not actions:
            return
        try:
            helpers.bulk(self._client, actions)
            logger.info("Exported %d documents to Elasticsearch", len(actions))
        except Exception:
            logger.exception("Failed to export %d documents", len(actions))
            # Re-queue for the next flush. Documents carry explicit _id values,
            # so a retry overwrites rather than duplicates.
            for action in actions:
                self._enqueue(action)

    def _periodic_flush(self) -> None:
        """Flush the buffer every flush_interval seconds until shutdown."""
        while not self._shutdown_event.is_set():
            self._shutdown_event.wait(self._flush_interval)
            self.flush()

    def close(self) -> None:
        """Stop the flush thread, send what is left, and close the client."""
        self._shutdown_event.set()
        self._flush_thread.join(timeout=10)
        self.flush()
        self._client.close()
```
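The exporter retries failed batches by re-queuing them for the next flush. An alternative is to retry the same batch a bounded number of times with exponential backoff before giving up. The following is a minimal sketch of that idea; `send_with_retry`, its parameters, and the injected `sleep` function are hypothetical and not part of the SDK or the Elasticsearch client.

```python
import time
from typing import Any, Callable


def send_with_retry(
    send: Callable[[list[dict[str, Any]]], None],
    actions: list[dict[str, Any]],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Try to send a batch, backing off exponentially between attempts.

    Returns True on success and False once max_attempts is exhausted, so the
    caller can decide whether to drop or park the batch.
    """
    for attempt in range(max_attempts):
        try:
            send(actions)
            return True
        except Exception:
            if attempt == max_attempts - 1:
                return False
            # Delay doubles each time: base_delay, 2 * base_delay, ...
            sleep(base_delay * (2 ** attempt))
    return False


# Simulated backend that fails twice, then recovers
calls = {"count": 0}


def flaky_send(batch: list[dict[str, Any]]) -> None:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated outage")


delays: list[float] = []
ok = send_with_retry(flaky_send, [{"_id": "span_1"}], sleep=delays.append)
```

In the exporter, `send` would be a small wrapper around `helpers.bulk(self._client, actions)`. Passing `sleep` as a parameter keeps the function testable without real waiting.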

## Registering with TraceProvider

With both the processor and exporter built, register them with the global `TraceProvider`:

```python
import os

from agents.tracing import add_trace_processor

# Create the exporter
es_exporter = ElasticsearchSpanExporter(
    es_url=os.environ["ELASTICSEARCH_URL"],
    api_key=os.environ.get("ELASTICSEARCH_API_KEY"),
    index_prefix="agent-traces",
    batch_size=100,
    flush_interval=3.0,
)

# Create the processor
es_processor = ElasticsearchTracingProcessor(exporter=es_exporter)

# Register: add_trace_processor appends to the existing processor list, so the
# default OpenAI processor keeps running alongside yours.
add_trace_processor(es_processor)
```

Now every trace produced by any `Runner.run()` call will flow through both the default OpenAI processor and your Elasticsearch processor. To send traces only to your own backend, call `set_trace_processors([es_processor])` instead, which replaces the default processor list.

## Building a BatchTraceProcessor for High Throughput

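For high-volume systems, exporting spans one at a time is inefficient. A batch processor collects finished spans and exports them in bulk. The SDK ships its own `BatchTraceProcessor` in `agents.tracing.processors`; the simplified version below shows the mechanics on top of the Elasticsearch exporter: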

```python
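import threading
import time
from collections import defaultdict
from datetime import datetime
from typing import Any

from agents.tracing import Span, Trace, TracingProcessor


def _epoch_seconds(iso_timestamp: str | None) -> float | None:
    """Convert the span's ISO 8601 timestamp to epoch seconds."""
    return datetime.fromisoformat(iso_timestamp).timestamp() if iso_timestamp else None


class BatchTraceProcessor(TracingProcessor):
    """Batches finished spans before exporting for higher throughput."""

    def __init__(
        self,
        exporter: "ElasticsearchSpanExporter",
        max_batch_size: int = 200,
        flush_interval: float = 2.0,
        max_queue_size: int = 10000,
    ):
        self._exporter = exporter
        self._max_batch_size = max_batch_size
        self._flush_interval = flush_interval
        self._max_queue_size = max_queue_size
        self._span_buffer: list[dict[str, Any]] = []
        self._lock = threading.Lock()
        self._shutdown = threading.Event()

        self._flush_thread = threading.Thread(
            target=self._auto_flush, daemon=True
        )
        self._flush_thread.start()

    def on_trace_start(self, trace: Trace) -> None:
        pass  # We handle data at the span level

    def on_trace_end(self, trace: Trace) -> None:
        self._flush(force=False)

    def on_span_start(self, span: Span) -> None:
        pass  # Buffer on end, not start

    def on_span_end(self, span: Span) -> None:
        now = time.time()
        started = _epoch_seconds(span.started_at)
        ended = _epoch_seconds(span.ended_at)
        record = {
            "trace_id": span.trace_id,
            "span_id": span.span_id,
            "parent_id": span.parent_id,
            "span_type": span.span_data.type,
            "name": getattr(span.span_data, "name", None),
            "duration_ms": (ended - started) * 1000 if started and ended else 0,
            "status": "error" if span.error else "ok",
            "attributes": span.span_data.export(),
            "started_at": started or now,
            "timestamp": now,
        }
        with self._lock:
            if len(self._span_buffer) >= self._max_queue_size:
                return  # Buffer full: drop the span rather than block the agent
            self._span_buffer.append(record)
        self._flush(force=False)

    def _flush(self, force: bool) -> None:
        """Export the buffer if it is full, or unconditionally when forced."""
        with self._lock:
            if not self._span_buffer:
                return
            if not force and len(self._span_buffer) < self._max_batch_size:
                return
            # Swap the buffer out under the lock; export happens outside it
            batch, self._span_buffer = self._span_buffer, []
        self._export_batch(batch)

    def _export_batch(self, batch: list[dict[str, Any]]) -> None:
        # export_trace expects a trace-shaped payload, so group spans by trace.
        # The workflow name is not available at the span level, hence None.
        by_trace: dict[str, list[dict[str, Any]]] = defaultdict(list)
        for record in batch:
            by_trace[record["trace_id"]].append(record)
        for trace_id, spans in by_trace.items():
            self._exporter.export_trace({
                "trace_id": trace_id,
                "workflow_name": None,
                "started_at": min(s["started_at"] for s in spans),
                "spans": spans,
            })
        self._exporter.flush()

    def _auto_flush(self) -> None:
        while not self._shutdown.is_set():
            self._shutdown.wait(self._flush_interval)
            self._flush(force=True)  # Timer-based flush sends partial batches too

    def shutdown(self) -> None:
        self._shutdown.set()
        self._flush_thread.join(timeout=10)
        self._flush(force=True)

    def force_flush(self) -> None:
        self._flush(force=True)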
```

## Creating Kibana Dashboards

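With agent traces in Elasticsearch, you can build Kibana dashboards on top of the trace and span indices. The JSON below is a schematic outline of three useful panels and the query behind each, not an importable Kibana saved object: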

```json
{
  "agent_performance_dashboard": {
    "panels": [
      {
        "title": "Average Agent Run Duration (ms)",
        "type": "metric",
        "query": {
          "aggs": {
            "avg_duration": {
              "avg": { "field": "duration_ms" }
            }
          }
        }
      },
      {
        "title": "Tool Call Latency Distribution",
        "type": "histogram",
        "query": {
          "bool": {
            "filter": [
              { "term": { "span_type": "function" } }
            ]
          }
        }
      },
      {
        "title": "Handoff Frequency by Agent Pair",
        "type": "heatmap",
        "query": {
          "bool": {
            "filter": [
              { "term": { "span_type": "handoff" } }
            ]
          }
        }
      }
    ]
  }
}
```
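Each panel outline corresponds to an Elasticsearch search body. As a sketch, the helper below builds a body that filters spans by type and buckets `duration_ms` into a latency histogram, with an average alongside it. The function name and the 50 ms default interval are assumptions; the field names match the documents written by the exporter earlier in this post.

```python
from typing import Any


def latency_histogram_query(span_type: str, interval_ms: int = 50) -> dict[str, Any]:
    """Build a search body for a latency histogram of one span type."""
    return {
        "size": 0,  # Only the aggregations are needed, not the matching docs
        "query": {
            "bool": {"filter": [{"term": {"span_type": span_type}}]}
        },
        "aggs": {
            "latency": {
                "histogram": {"field": "duration_ms", "interval": interval_ms}
            },
            "avg_duration": {"avg": {"field": "duration_ms"}},
        },
    }


body = latency_histogram_query("handoff")
```

A body like this can be sent to the spans index (`agent-traces-spans` with the index prefix used above) through the Python client's `search` method; depending on the client version, it is passed either as `body=` or as keyword arguments.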

## Practical Observability Patterns

Here are the patterns that matter most in production agent observability:

1. **Latency percentiles** — Track p50, p95, and p99 for each agent and tool call. A single slow tool can dominate user-perceived latency.
2. **Handoff heatmaps** — Visualize which agents hand off to which others. Unexpected handoff patterns reveal confusion in the triage logic.
3. **Token consumption by agent** — Track input and output tokens per agent to identify which agents are the most expensive.
4. **Error rate by span type** — Separate tool call errors from generation errors from guardrail blocks. Each requires a different remediation.
5. **Trace depth monitoring** — If a trace has more than 10-15 agent spans, the orchestration logic likely needs simplification.
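
The first and fourth patterns can be prototyped locally before building dashboards. The sketch below computes nearest-rank p50, p95, and p99 latency plus error rate per span type from a list of span documents. The `summarize` and `percentile` helpers and the `"error"` status value are assumptions for illustration; in production you would more likely let Elasticsearch compute these with its aggregations.

```python
import math
from collections import defaultdict
from typing import Any


def percentile(values: list[float], pct: float) -> float:
    """Nearest-rank percentile: the smallest value with at least pct% of
    the data at or below it."""
    if not values:
        raise ValueError("percentile() requires at least one value")
    ordered = sorted(values)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]


def summarize(spans: list[dict[str, Any]]) -> dict[str, dict[str, float]]:
    """Per span type: p50/p95/p99 latency in ms and the error rate."""
    by_type: dict[str, list[dict[str, Any]]] = defaultdict(list)
    for span in spans:
        by_type[span["span_type"]].append(span)

    summary = {}
    for span_type, group in by_type.items():
        durations = [s["duration_ms"] for s in group]
        errors = sum(1 for s in group if s.get("status") == "error")
        summary[span_type] = {
            "p50": percentile(durations, 50),
            "p95": percentile(durations, 95),
            "p99": percentile(durations, 99),
            "error_rate": errors / len(group),
        }
    return summary


# Four fast generations and one slow failure
spans = [
    {"span_type": "generation", "duration_ms": d, "status": "ok"}
    for d in (10, 20, 30, 40)
] + [{"span_type": "generation", "duration_ms": 900, "status": "error"}]

report = summarize(spans)
```

With this sample, the median stays at 30 ms while p95 and p99 both land on the 900 ms outlier, which is exactly the kind of tail that an average would hide.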

Custom trace processors transform agent traces from a debugging tool into a full observability platform. Start with Elasticsearch for flexible querying, add Kibana dashboards for visualization, and set up alerting rules for latency and error rate thresholds. The investment pays off the first time you diagnose a production issue in minutes instead of hours.

---

Source: https://callsphere.ai/blog/building-custom-trace-processors-agent-observability
