Exporting Agent Traces to Third-Party Platforms
Learn how to use add_trace_processor() to export OpenAI Agents SDK traces to Langfuse, Weights and Biases, Arize, and custom observability platforms with production-ready exporter patterns.
Why Export Traces Beyond OpenAI
The OpenAI dashboard provides a solid trace viewer for development and initial debugging. But production observability demands more: correlating agent traces with application metrics in Datadog, analyzing LLM cost trends in Langfuse, running evaluation pipelines in Weights and Biases, or feeding traces into Arize for drift detection. Each platform brings specialized capabilities that the OpenAI dashboard was not designed to replicate.
The Agents SDK solves this with a clean abstraction: trace processors. A trace processor receives every completed trace and can forward it to any external system. You register processors at startup, and they run automatically without modifying your agent code.
The Trace Processor Interface
A trace processor is any object that implements the TracingProcessor protocol. The interface has four methods:
from agents.tracing import TracingProcessor, Trace, Span

class MyProcessor(TracingProcessor):
    def on_trace_start(self, trace: Trace) -> None:
        """Called when a new trace begins."""
        pass

    def on_span_end(self, span: Span) -> None:
        """Called when any span within a trace completes."""
        pass

    def on_trace_end(self, trace: Trace) -> None:
        """Called when the entire trace completes."""
        pass

    async def shutdown(self) -> None:
        """Called during application shutdown for cleanup."""
        pass
You register processors using add_trace_processor():
from agents import add_trace_processor
processor = MyProcessor()
add_trace_processor(processor)
Once registered, every trace generated by Runner.run() flows through your processor automatically. You can register multiple processors — traces are fanned out to all of them.
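The lifecycle is easy to exercise outside the SDK. A minimal sketch, using plain dataclasses as stand-ins for the SDK's Trace and Span types (FakeTrace, FakeSpan, and CountingProcessor are illustrative names, not real SDK classes):

```python
from dataclasses import dataclass

# Stand-ins for the SDK's Trace and Span types (for illustration only)
@dataclass
class FakeTrace:
    trace_id: str
    name: str

@dataclass
class FakeSpan:
    trace_id: str
    name: str
    span_type: str

class CountingProcessor:
    """About the smallest useful processor: counts spans per trace."""
    def __init__(self):
        self.counts = {}

    def on_trace_start(self, trace):
        self.counts[trace.trace_id] = 0

    def on_span_end(self, span):
        if span.trace_id in self.counts:
            self.counts[span.trace_id] += 1

    def on_trace_end(self, trace):
        print(f"{trace.name}: {self.counts[trace.trace_id]} spans")

    async def shutdown(self):
        pass

# Simulate the callback sequence the SDK drives for you
p = CountingProcessor()
t = FakeTrace("trace_1", "demo")
p.on_trace_start(t)
p.on_span_end(FakeSpan("trace_1", "call_llm", "generation"))
p.on_span_end(FakeSpan("trace_1", "lookup", "function"))
p.on_trace_end(t)  # prints "demo: 2 spans"
```

In production the SDK invokes these callbacks itself; the point here is that a processor is just a plain object with these four methods.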
Exporting to Langfuse
Langfuse is purpose-built for LLM observability, offering cost tracking, prompt management, evaluation scoring, and detailed generation analytics. Here is a production-ready Langfuse exporter:
import os
from langfuse import Langfuse
from agents.tracing import TracingProcessor, Trace, Span

class LangfuseTraceProcessor(TracingProcessor):
    def __init__(self):
        self.client = Langfuse(
            public_key=os.environ["LANGFUSE_PUBLIC_KEY"],
            secret_key=os.environ["LANGFUSE_SECRET_KEY"],
            host=os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com"),
        )
        self._traces = {}

    def on_trace_start(self, trace: Trace) -> None:
        langfuse_trace = self.client.trace(
            id=trace.trace_id,
            name=trace.name,
            metadata=trace.metadata or {},
        )
        self._traces[trace.trace_id] = langfuse_trace

    def on_span_end(self, span: Span) -> None:
        langfuse_trace = self._traces.get(span.trace_id)
        if not langfuse_trace:
            return
        if span.span_type == "generation":
            langfuse_trace.generation(
                name=span.name,
                model=span.data.get("model", "unknown"),
                input=span.data.get("input"),
                output=span.data.get("output"),
                usage={
                    "input_tokens": span.data.get("input_tokens", 0),
                    "output_tokens": span.data.get("output_tokens", 0),
                },
                start_time=span.start_time,
                end_time=span.end_time,
            )
        elif span.span_type == "function":
            langfuse_trace.span(
                name=f"tool:{span.name}",
                input=span.data.get("input"),
                output=span.data.get("output"),
                start_time=span.start_time,
                end_time=span.end_time,
            )
        else:
            langfuse_trace.span(
                name=span.name,
                metadata=span.data,
                start_time=span.start_time,
                end_time=span.end_time,
            )

    def on_trace_end(self, trace: Trace) -> None:
        self._traces.pop(trace.trace_id, None)
        self.client.flush()

    async def shutdown(self) -> None:
        self.client.flush()
        self.client.shutdown()
Register it at application startup:
from agents import add_trace_processor
langfuse_processor = LangfuseTraceProcessor()
add_trace_processor(langfuse_processor)
Now every agent run automatically appears in your Langfuse dashboard with full generation details, token usage, and cost calculations.
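Langfuse derives cost from the usage figures your processor reports, so the token counts you pass in the usage dict are worth sanity-checking. A back-of-envelope sketch of the same arithmetic, with placeholder per-1K-token prices (not real rates for any model):

```python
def estimate_cost(input_tokens: int, output_tokens: int,
                  price_in_per_1k: float, price_out_per_1k: float) -> float:
    """Rough cost estimate from token usage. Prices are caller-supplied assumptions."""
    return (input_tokens / 1000) * price_in_per_1k + (output_tokens / 1000) * price_out_per_1k

# Hypothetical prices, chosen only to make the arithmetic easy to follow
cost = estimate_cost(1200, 400, price_in_per_1k=0.005, price_out_per_1k=0.015)
print(round(cost, 4))  # 0.012
```

If the usage numbers your spans carry are zero or missing, Langfuse's cost columns will be zero too; that is usually the first thing to check when cost tracking looks wrong.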
Exporting to Weights and Biases
Weights and Biases excels at experiment tracking, making it ideal for comparing agent performance across prompt versions, model configurations, and tool sets:
import wandb
from agents.tracing import TracingProcessor, Trace, Span

class WandBTraceProcessor(TracingProcessor):
    def __init__(self, project: str = "agent-traces"):
        self.project = project
        self._run = None
        self._spans = []

    def on_trace_start(self, trace: Trace) -> None:
        self._run = wandb.init(
            project=self.project,
            name=trace.name,
            config=trace.metadata or {},
            reinit=True,
        )
        self._spans = []

    def on_span_end(self, span: Span) -> None:
        duration_ms = (span.end_time - span.start_time).total_seconds() * 1000
        span_record = {
            "span_name": span.name,
            "span_type": span.span_type,
            "duration_ms": duration_ms,
        }
        if span.span_type == "generation":
            span_record["model"] = span.data.get("model")
            span_record["input_tokens"] = span.data.get("input_tokens", 0)
            span_record["output_tokens"] = span.data.get("output_tokens", 0)
            span_record["total_tokens"] = (
                span_record["input_tokens"] + span_record["output_tokens"]
            )
        self._spans.append(span_record)

    def on_trace_end(self, trace: Trace) -> None:
        if not self._run:
            return
        # Log summary metrics
        total_duration = sum(s["duration_ms"] for s in self._spans)
        total_tokens = sum(s.get("total_tokens", 0) for s in self._spans)
        generation_count = sum(1 for s in self._spans if s["span_type"] == "generation")
        tool_count = sum(1 for s in self._spans if s["span_type"] == "function")
        wandb.log({
            "total_duration_ms": total_duration,
            "total_tokens": total_tokens,
            "generation_count": generation_count,
            "tool_call_count": tool_count,
        })
        # Log span table for detailed analysis
        table = wandb.Table(
            columns=["name", "type", "duration_ms", "tokens"],
            data=[[s["span_name"], s["span_type"], s["duration_ms"],
                   s.get("total_tokens", 0)] for s in self._spans],
        )
        wandb.log({"spans": table})
        self._run.finish()

    async def shutdown(self) -> None:
        if self._run:
            self._run.finish()
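The summary aggregation in on_trace_end is plain Python and can be checked in isolation. A sketch with hand-made span records (the values are made up for the example):

```python
# Sample span records in the same shape the processor accumulates
spans = [
    {"span_name": "plan", "span_type": "generation", "duration_ms": 420.0,
     "input_tokens": 300, "output_tokens": 120, "total_tokens": 420},
    {"span_name": "search", "span_type": "function", "duration_ms": 95.0},
    {"span_name": "answer", "span_type": "generation", "duration_ms": 610.0,
     "input_tokens": 500, "output_tokens": 200, "total_tokens": 700},
]

# Same aggregation logic as on_trace_end
summary = {
    "total_duration_ms": sum(s["duration_ms"] for s in spans),
    "total_tokens": sum(s.get("total_tokens", 0) for s in spans),
    "generation_count": sum(1 for s in spans if s["span_type"] == "generation"),
    "tool_call_count": sum(1 for s in spans if s["span_type"] == "function"),
}
print(summary)
# {'total_duration_ms': 1125.0, 'total_tokens': 1120, 'generation_count': 2, 'tool_call_count': 1}
```

Note that function spans have no total_tokens key, which is why the aggregation uses .get with a default of 0.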
Exporting to Arize for Drift Detection
Arize specializes in ML observability with embedding drift detection, which is particularly valuable for spotting when agent inputs shift away from your tested distribution:
import os
from arize.api import Client as ArizeClient
from arize.utils.types import ModelTypes, Environments
from agents.tracing import TracingProcessor, Trace, Span

class ArizeTraceProcessor(TracingProcessor):
    def __init__(self):
        self.client = ArizeClient(
            api_key=os.environ["ARIZE_API_KEY"],
            space_key=os.environ["ARIZE_SPACE_KEY"],
        )
        self.model_id = "agent-system"
        self._generations = []

    def on_trace_start(self, trace: Trace) -> None:
        self._generations = []

    def on_span_end(self, span: Span) -> None:
        if span.span_type == "generation":
            self._generations.append({
                "trace_id": span.trace_id,
                "input": str(span.data.get("input", "")),
                "output": str(span.data.get("output", "")),
                "model": span.data.get("model", "unknown"),
                "tokens": span.data.get("output_tokens", 0),
            })

    def on_trace_end(self, trace: Trace) -> None:
        for i, gen in enumerate(self._generations):
            self.client.log(
                model_id=self.model_id,
                model_version=gen["model"],
                model_type=ModelTypes.GENERATIVE_LLM,
                environment=Environments.PRODUCTION,
                # Suffix with the index so each generation in a trace
                # gets a distinct prediction_id
                prediction_id=f"{gen['trace_id']}:{i}",
                prediction_label=gen["output"][:500],
                features={"input_text": gen["input"][:1000]},
                tags=trace.metadata or {},
            )
        self._generations = []

    async def shutdown(self) -> None:
        pass
Building a Custom Exporter
If your organization uses an internal observability platform or a tool without an existing integration, building a custom exporter follows the same pattern. Here is an exporter that sends traces to any OpenTelemetry-compatible endpoint:
import asyncio

import httpx
from agents.tracing import TracingProcessor, Trace, Span

class OTelExporter(TracingProcessor):
    def __init__(self, endpoint: str, service_name: str = "agent-service"):
        self.endpoint = endpoint
        self.service_name = service_name
        self._client = httpx.AsyncClient()
        self._spans_buffer = []

    def on_span_end(self, span: Span) -> None:
        otel_span = {
            "traceId": span.trace_id,
            "spanId": span.span_id,
            "parentSpanId": span.parent_span_id,
            "operationName": span.name,
            "startTime": span.start_time.isoformat(),
            "endTime": span.end_time.isoformat(),
            "tags": {
                "span.type": span.span_type,
                "service.name": self.service_name,
            },
        }
        if span.data:
            for key, value in span.data.items():
                otel_span["tags"][f"agent.{key}"] = str(value)[:256]
        self._spans_buffer.append(otel_span)

    def on_trace_end(self, trace: Trace) -> None:
        if not self._spans_buffer:
            return
        # Fire and forget; use a managed background task in production
        asyncio.create_task(self._flush())

    async def _flush(self) -> None:
        spans = self._spans_buffer.copy()
        self._spans_buffer.clear()
        try:
            await self._client.post(
                f"{self.endpoint}/v1/traces",
                json={"resourceSpans": spans},
                timeout=5.0,
            )
        except httpx.HTTPError:
            pass  # Log to fallback in production

    async def shutdown(self) -> None:
        await self._flush()
        await self._client.aclose()
Registering Multiple Processors
You can run several exporters simultaneously. Traces are distributed to all registered processors:
from agents import add_trace_processor
add_trace_processor(LangfuseTraceProcessor())
add_trace_processor(WandBTraceProcessor(project="my-agent"))
add_trace_processor(OTelExporter(endpoint="https://otel.internal:4318"))
This fan-out design means you can use Langfuse for LLM-specific analytics, W&B for experiment comparison, and your internal OTel stack for infrastructure correlation — all from the same trace data.
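One practical refinement before wiring several exporters up: wrap each one so a failure in a single backend cannot break the others or the agent run. SafeProcessor below is a hypothetical wrapper, not an SDK feature; a minimal sketch:

```python
class SafeProcessor:
    """Forwards callbacks to an inner processor, swallowing its exceptions."""
    def __init__(self, inner, logger=print):
        self.inner = inner
        self.logger = logger

    def _guard(self, method: str, *args) -> None:
        try:
            getattr(self.inner, method)(*args)
        except Exception as exc:  # never let export errors reach the agent run
            self.logger(f"trace export failed in {method}: {exc}")

    def on_trace_start(self, trace):
        self._guard("on_trace_start", trace)

    def on_span_end(self, span):
        self._guard("on_span_end", span)

    def on_trace_end(self, trace):
        self._guard("on_trace_end", trace)

class FlakyExporter:
    """A deliberately broken exporter, for the demo only."""
    def on_trace_start(self, trace): pass
    def on_trace_end(self, trace): pass
    def on_span_end(self, span):
        raise RuntimeError("backend unreachable")

errors = []
safe = SafeProcessor(FlakyExporter(), logger=errors.append)
safe.on_span_end({"name": "demo"})  # the error is captured, not raised
```

Registration would then look like add_trace_processor(SafeProcessor(LangfuseTraceProcessor())), so each backend is isolated from its siblings.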
Production Considerations
- Buffer and batch: Network calls in on_span_end add latency to your agent runs. Buffer spans and flush in batches during on_trace_end or on a timer.
- Handle failures gracefully: If an exporter fails, it should never crash the agent run. Wrap network calls in try/except and log failures to a fallback destination.
- Respect backpressure: If your downstream system is slow, drop or sample traces rather than building up an unbounded buffer.
- Use async where possible: Exporters that make HTTP calls should use async clients and fire-and-forget patterns to minimize impact on agent response latency.
- Implement shutdown cleanly: The shutdown() method is your opportunity to flush remaining buffers. Register it with your application's shutdown hooks to prevent data loss.
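The backpressure point can be made concrete with a bounded buffer that drops the oldest spans once full. A sketch (BoundedSpanBuffer is illustrative, not an SDK type):

```python
from collections import deque

class BoundedSpanBuffer:
    """Holds at most max_spans; once full, the oldest span is dropped
    on each append, so a slow backend cannot grow memory unboundedly."""
    def __init__(self, max_spans: int = 10_000):
        self._buf = deque(maxlen=max_spans)
        self.dropped = 0  # track drops so they can be reported as a metric

    def append(self, span) -> None:
        if len(self._buf) == self._buf.maxlen:
            self.dropped += 1  # deque silently evicts the oldest entry
        self._buf.append(span)

    def drain(self) -> list:
        """Return all buffered spans and clear the buffer (call at flush time)."""
        spans = list(self._buf)
        self._buf.clear()
        return spans

buf = BoundedSpanBuffer(max_spans=3)
for i in range(5):
    buf.append({"id": i})
print(buf.dropped)                     # 2
print([s["id"] for s in buf.drain()])  # [2, 3, 4]
```

Dropping oldest-first favors recent traces, which are usually the ones you need when debugging a live incident; exporting the drop counter alongside your other metrics tells you when sampling has kicked in.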
Trace export transforms the Agents SDK from a development tool into a production observability pillar that integrates seamlessly with your existing monitoring infrastructure.
Written by
CallSphere Team
Expert insights on AI voice agents and customer communication automation.