---
title: "Exporting Agent Traces to Third-Party Platforms — Ai agent platform export traces to data warehouse"
description: "Ai agent platform export traces to data warehouse: learn how to use add_trace_processor() to export OpenAI Agents SDK traces to Langfuse, Weights and Biases, Arize, and custom observability platforms with production-ready exporter patterns."
canonical: https://callsphere.ai/blog/exporting-agent-traces-third-party-platforms
category: "Learn Agentic AI"
tags: ["OpenAI", "Trace Export", "Langfuse", "Observability"]
author: "CallSphere Team"
published: 2026-03-14T00:00:00.000Z
updated: 2026-06-02T14:58:57.678Z
---

# Exporting Agent Traces to Third-Party Platforms

> Learn how to use `add_trace_processor()` to export OpenAI Agents SDK traces to Langfuse, Weights and Biases, Arize, and custom observability platforms with production-ready exporter patterns.

## Why Export Traces Beyond OpenAI

The OpenAI dashboard provides a solid trace viewer for development and initial debugging. But production observability demands more: correlating agent traces with application metrics in Datadog, analyzing LLM cost trends in Langfuse, running evaluation pipelines in Weights and Biases, or feeding traces into Arize for drift detection. Each platform brings specialized capabilities that the OpenAI dashboard was not designed to replicate.

The Agents SDK solves this with a clean abstraction: trace processors. A trace processor is notified as each trace and span starts and finishes, and it can forward that data to any external system. You register processors at startup, and they run automatically without any changes to your agent code.

## The Trace Processor Interface

A trace processor is any object that subclasses the `TracingProcessor` abstract base class. The interface consists of lifecycle hooks for traces and spans, plus cleanup hooks:

```mermaid
flowchart LR
    APP(["Agent or API"])
    SDK["OTel SDK
GenAI conventions"]
    COL["OTel Collector"]
    subgraph BACKENDS["Backends"]
        TR[("Traces
Tempo or Honeycomb")]
        MET[("Metrics
Prometheus")]
        LOG[("Logs
Loki or ELK")]
    end
    DASH["Grafana plus alerts"]
    PAGE(["Pager"])
    APP --> SDK --> COL
    COL --> TR
    COL --> MET
    COL --> LOG
    TR --> DASH
    MET --> DASH
    LOG --> DASH
    DASH --> PAGE
    style SDK fill:#4f46e5,stroke:#4338ca,color:#fff
    style DASH fill:#f59e0b,stroke:#d97706,color:#1f2937
    style PAGE fill:#dc2626,stroke:#b91c1c,color:#fff
```

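```python
from typing import Any

from agents.tracing import TracingProcessor, Trace, Span

class MyProcessor(TracingProcessor):
    # Every method below is abstract on TracingProcessor, so all six
    # must be implemented, even if only as no-ops.

    def on_trace_start(self, trace: Trace) -> None:
        """Called when a new trace begins."""
        pass

    def on_trace_end(self, trace: Trace) -> None:
        """Called when the entire trace completes."""
        pass

    def on_span_start(self, span: Span[Any]) -> None:
        """Called when a span within a trace begins."""
        pass

    def on_span_end(self, span: Span[Any]) -> None:
        """Called when any span within a trace completes."""
        pass

    def shutdown(self) -> None:
        """Called synchronously during application shutdown for cleanup."""
        pass

    def force_flush(self) -> None:
        """Called to export any buffered data immediately."""
        pass
```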

You register processors using `add_trace_processor()`:

```python
from agents import add_trace_processor

processor = MyProcessor()
add_trace_processor(processor)
```

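Once registered, every trace generated by `Runner.run()` flows through your processor automatically. You can register multiple processors, and every trace and span event is fanned out to all of them. Note that `add_trace_processor()` adds to the SDK's default processor, which uploads traces to OpenAI, rather than replacing it. Use `set_trace_processors()` if you want to replace the defaults.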
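To see fan-out and hook ordering without a real backend, it can help to record events in memory. The sketch below is deliberately SDK-free so it runs anywhere: `RecordingProcessor` and `fan_out` are hypothetical names, and `fan_out` only imitates the dispatch the SDK performs internally. A real processor would subclass `TracingProcessor`.

```python
class RecordingProcessor:
    """Records each hook call in memory so tests can assert on them."""

    def __init__(self):
        self.events = []

    def on_trace_start(self, trace):
        self.events.append(("trace_start", trace))

    def on_span_start(self, span):
        self.events.append(("span_start", span))

    def on_span_end(self, span):
        self.events.append(("span_end", span))

    def on_trace_end(self, trace):
        self.events.append(("trace_end", trace))


def fan_out(processors, hook_name, payload):
    """Stand-in for the SDK's dispatch: call one hook on every processor."""
    for processor in processors:
        getattr(processor, hook_name)(payload)


first, second = RecordingProcessor(), RecordingProcessor()
lifecycle = [
    ("on_trace_start", "trace-1"),
    ("on_span_start", "span-1"),
    ("on_span_end", "span-1"),
    ("on_trace_end", "trace-1"),
]
for hook_name, payload in lifecycle:
    fan_out([first, second], hook_name, payload)

# Both processors saw the same events in the same order.
hook_order = [name for name, _ in first.events]
```

A recorder like this is mostly useful in unit tests, where you want to assert that an agent run produced the spans you expect before wiring up a real exporter.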

## Exporting to Langfuse

Langfuse is purpose-built for LLM observability, offering cost tracking, prompt management, evaluation scoring, and detailed generation analytics. Here is a production-ready Langfuse exporter:

```python
import os
from datetime import datetime

from langfuse import Langfuse  # written against the Langfuse v2 Python SDK
from agents.tracing import TracingProcessor, Trace, Span


def _to_datetime(timestamp):
    """The SDK reports span times as ISO 8601 strings; Langfuse takes datetimes."""
    return datetime.fromisoformat(timestamp) if timestamp else None


class LangfuseTraceProcessor(TracingProcessor):
    def __init__(self):
        self.client = Langfuse(
            public_key=os.environ["LANGFUSE_PUBLIC_KEY"],
            secret_key=os.environ["LANGFUSE_SECRET_KEY"],
            host=os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com"),
        )
        self._traces = {}

    def on_trace_start(self, trace: Trace) -> None:
        exported = trace.export() or {}
        self._traces[trace.trace_id] = self.client.trace(
            id=trace.trace_id,
            name=trace.name,
            metadata=exported.get("metadata") or {},
        )

    def on_span_start(self, span: Span) -> None:
        pass  # Spans are exported once they finish

    def on_span_end(self, span: Span) -> None:
        langfuse_trace = self._traces.get(span.trace_id)
        if not langfuse_trace:
            return

        span_type = span.span_data.type
        data = span.span_data.export()  # Plain dict of the span's fields
        name = data.get("name") or span_type
        timing = {
            "start_time": _to_datetime(span.started_at),
            "end_time": _to_datetime(span.ended_at),
        }

        if span_type == "generation":
            usage = data.get("usage") or {}
            langfuse_trace.generation(
                name=name,
                model=data.get("model") or "unknown",
                input=data.get("input"),
                output=data.get("output"),
                usage={
                    "input": usage.get("input_tokens", 0),
                    "output": usage.get("output_tokens", 0),
                },
                **timing,
            )
        elif span_type == "function":
            langfuse_trace.span(
                name=f"tool:{name}",
                input=data.get("input"),
                output=data.get("output"),
                **timing,
            )
        else:
            langfuse_trace.span(name=name, metadata=data, **timing)

    def on_trace_end(self, trace: Trace) -> None:
        # The Langfuse client batches and sends in the background, so there
        # is no need to block the agent run with a flush here.
        self._traces.pop(trace.trace_id, None)

    def force_flush(self) -> None:
        self.client.flush()

    def shutdown(self) -> None:
        self.client.flush()
        self.client.shutdown()
```

Register it at application startup:

```python
from agents import add_trace_processor

langfuse_processor = LangfuseTraceProcessor()
add_trace_processor(langfuse_processor)
```

Now every agent run automatically appears in your Langfuse dashboard with full generation details, token usage, and cost calculations.

## Exporting to Weights and Biases

Weights and Biases excels at experiment tracking, making it ideal for comparing agent performance across prompt versions, model configurations, and tool sets:

```python
from datetime import datetime

import wandb
from agents.tracing import TracingProcessor, Trace, Span


def _duration_ms(span: Span) -> float:
    """Span times are ISO 8601 strings; return the elapsed milliseconds."""
    if not (span.started_at and span.ended_at):
        return 0.0
    started = datetime.fromisoformat(span.started_at)
    ended = datetime.fromisoformat(span.ended_at)
    return (ended - started).total_seconds() * 1000


class WandBTraceProcessor(TracingProcessor):
    """Logs one W&B run per trace. Assumes traces do not overlap in time."""

    def __init__(self, project: str = "agent-traces"):
        self.project = project
        self._run = None
        self._spans = []

    def on_trace_start(self, trace: Trace) -> None:
        exported = trace.export() or {}
        self._run = wandb.init(
            project=self.project,
            name=trace.name,
            config=exported.get("metadata") or {},
            reinit=True,
        )
        self._spans = []

    def on_span_start(self, span: Span) -> None:
        pass  # Spans are recorded once they finish

    def on_span_end(self, span: Span) -> None:
        span_type = span.span_data.type
        data = span.span_data.export()  # Plain dict of the span's fields
        span_record = {
            "span_name": data.get("name") or span_type,
            "span_type": span_type,
            "duration_ms": _duration_ms(span),
        }

        if span_type == "generation":
            usage = data.get("usage") or {}
            span_record["model"] = data.get("model")
            span_record["input_tokens"] = usage.get("input_tokens", 0)
            span_record["output_tokens"] = usage.get("output_tokens", 0)
            span_record["total_tokens"] = (
                span_record["input_tokens"] + span_record["output_tokens"]
            )

        self._spans.append(span_record)

    def on_trace_end(self, trace: Trace) -> None:
        if not self._run:
            return

        # Log summary metrics. Spans nest, so the summed duration can
        # exceed the trace's wall-clock time.
        total_duration = sum(s["duration_ms"] for s in self._spans)
        total_tokens = sum(s.get("total_tokens", 0) for s in self._spans)
        generation_count = sum(1 for s in self._spans if s["span_type"] == "generation")
        tool_count = sum(1 for s in self._spans if s["span_type"] == "function")

        wandb.log({
            "total_duration_ms": total_duration,
            "total_tokens": total_tokens,
            "generation_count": generation_count,
            "tool_call_count": tool_count,
        })

        # Log span table for detailed analysis
        table = wandb.Table(
            columns=["name", "type", "duration_ms", "tokens"],
            data=[[s["span_name"], s["span_type"], s["duration_ms"],
                   s.get("total_tokens", 0)] for s in self._spans],
        )
        wandb.log({"spans": table})
        self._run.finish()
        self._run = None  # Avoid finishing the same run again at shutdown

    def force_flush(self) -> None:
        pass  # Nothing is buffered outside the active W&B run

    def shutdown(self) -> None:
        if self._run:
            self._run.finish()
            self._run = None
```
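Because spans nest (an agent span contains its generation and tool spans), summing span durations overstates how long a trace actually took. One alternative is to measure from the earliest start to the latest end. The sketch below assumes span timestamps are ISO 8601 strings; `trace_wall_clock_ms` is a hypothetical helper, not part of the SDK or W&B.

```python
from datetime import datetime


def trace_wall_clock_ms(spans):
    """Return elapsed milliseconds from the earliest start to the latest end.

    `spans` is a list of (started_at, ended_at) ISO 8601 string pairs.
    """
    if not spans:
        return 0.0
    starts = [datetime.fromisoformat(start) for start, _ in spans]
    ends = [datetime.fromisoformat(end) for _, end in spans]
    return (max(ends) - min(starts)).total_seconds() * 1000


# An agent span (2 s) wrapping a generation (1.5 s) and a tool call (0.3 s).
example = [
    ("2026-01-01T00:00:00+00:00", "2026-01-01T00:00:02+00:00"),
    ("2026-01-01T00:00:00.100000+00:00", "2026-01-01T00:00:01.600000+00:00"),
    ("2026-01-01T00:00:01.650000+00:00", "2026-01-01T00:00:01.950000+00:00"),
]
wall_clock = trace_wall_clock_ms(example)  # 2000.0, versus 3800.0 when summed
```

Logging both numbers can be useful: the wall-clock value tracks user-facing latency, while the summed value shows how much total work the run performed.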

## Exporting to Arize for Drift Detection

Arize specializes in ML observability with embedding drift detection, which is particularly valuable for spotting when agent inputs shift away from your tested distribution:

```python
import os
from arize.api import Client as ArizeClient
from arize.utils.types import ModelTypes, Environments
from agents.tracing import TracingProcessor, Trace, Span

class ArizeTraceProcessor(TracingProcessor):
    def __init__(self):
        self.client = ArizeClient(
            api_key=os.environ["ARIZE_API_KEY"],
            space_key=os.environ["ARIZE_SPACE_KEY"],
        )
        self.model_id = "agent-system"
        # Generations are grouped per trace so concurrent runs do not mix
        self._generations = {}

    def on_trace_start(self, trace: Trace) -> None:
        self._generations[trace.trace_id] = []

    def on_span_start(self, span: Span) -> None:
        pass  # Only finished generation spans are exported

    def on_span_end(self, span: Span) -> None:
        if span.span_data.type != "generation":
            return
        data = span.span_data.export()  # Plain dict of the span's fields
        usage = data.get("usage") or {}
        self._generations.setdefault(span.trace_id, []).append({
            "span_id": span.span_id,
            "input": str(data.get("input") or ""),
            "output": str(data.get("output") or ""),
            "model": data.get("model") or "unknown",
            "tokens": usage.get("output_tokens", 0),
        })

    def on_trace_end(self, trace: Trace) -> None:
        metadata = (trace.export() or {}).get("metadata") or {}
        for gen in self._generations.pop(trace.trace_id, []):
            self.client.log(
                model_id=self.model_id,
                model_version=gen["model"],
                model_type=ModelTypes.GENERATIVE_LLM,
                environment=Environments.PRODUCTION,
                # Span IDs keep prediction IDs unique when a trace
                # contains several generations.
                prediction_id=gen["span_id"],
                prediction_label=gen["output"][:500],
                features={"input_text": gen["input"][:1000]},
                tags={
                    **metadata,
                    "trace_id": trace.trace_id,
                    "output_tokens": gen["tokens"],
                },
            )

    def force_flush(self) -> None:
        pass  # Records are sent as each trace ends

    def shutdown(self) -> None:
        pass
```

## Building a Custom Exporter

If your organization uses an internal observability platform or a tool without an existing integration, building a custom exporter follows the same pattern. Here is an exporter that sends traces to any OpenTelemetry-compatible endpoint:

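```python
import threading
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

import httpx
from agents.tracing import TracingProcessor, Trace, Span


def _unix_nano(timestamp) -> str:
    """Convert the SDK's ISO 8601 timestamp to OTLP's nanosecond string."""
    if not timestamp:
        return "0"
    return str(int(datetime.fromisoformat(timestamp).timestamp() * 1e9))


def _attr(key: str, value) -> dict:
    """Build an OTLP string attribute, truncating long values."""
    return {"key": key, "value": {"stringValue": str(value)[:256]}}


class OTelExporter(TracingProcessor):
    def __init__(self, endpoint: str, service_name: str = "agent-service"):
        self.endpoint = endpoint.rstrip("/")
        self.service_name = service_name
        self._client = httpx.Client(timeout=5.0)
        self._lock = threading.Lock()
        self._spans_buffer = []
        # A single worker thread keeps HTTP calls off the agent's hot path
        self._executor = ThreadPoolExecutor(max_workers=1)

    def on_trace_start(self, trace: Trace) -> None:
        pass

    def on_span_start(self, span: Span) -> None:
        pass

    def on_span_end(self, span: Span) -> None:
        span_type = span.span_data.type
        data = span.span_data.export()  # Plain dict of the span's fields
        otel_span = {
            # OTLP expects bare hex IDs: 32 chars for traces, 16 for spans
            "traceId": span.trace_id.removeprefix("trace_"),
            "spanId": span.span_id.removeprefix("span_")[:16],
            "name": data.get("name") or span_type,
            "kind": 1,  # SPAN_KIND_INTERNAL
            "startTimeUnixNano": _unix_nano(span.started_at),
            "endTimeUnixNano": _unix_nano(span.ended_at),
            "attributes": [_attr("span.type", span_type)]
            + [_attr(f"agent.{key}", value) for key, value in data.items()],
        }
        if span.parent_id:
            otel_span["parentSpanId"] = span.parent_id.removeprefix("span_")[:16]
        with self._lock:
            self._spans_buffer.append(otel_span)

    def on_trace_end(self, trace: Trace) -> None:
        # Hand the export to the worker thread and return immediately
        self._executor.submit(self.force_flush)

    def force_flush(self) -> None:
        with self._lock:
            spans, self._spans_buffer = self._spans_buffer, []
        if not spans:
            return
        payload = {"resourceSpans": [{
            "resource": {"attributes": [_attr("service.name", self.service_name)]},
            "scopeSpans": [{"scope": {"name": "openai-agents"}, "spans": spans}],
        }]}
        try:
            self._client.post(f"{self.endpoint}/v1/traces", json=payload)
        except httpx.HTTPError:
            pass  # Log to fallback in production

    def shutdown(self) -> None:
        self._executor.shutdown(wait=True)  # Let queued exports finish
        self.force_flush()
        self._client.close()
```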

## Registering Multiple Processors

You can run several exporters simultaneously. Traces are distributed to all registered processors:

```python
from agents import add_trace_processor

add_trace_processor(LangfuseTraceProcessor())
add_trace_processor(WandBTraceProcessor(project="my-agent"))
add_trace_processor(OTelExporter(endpoint="https://otel.internal:4318"))
```

This fan-out design means you can use Langfuse for LLM-specific analytics, W&B for experiment comparison, and your internal OTel stack for infrastructure correlation — all from the same trace data.
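With several exporters registered, it is worth making sure a failure in one does not affect the others or the agent run. A possible approach is a thin wrapper that catches and logs exceptions from every hook. The sketch below is duck-typed and SDK-free: `SafeProcessor` and `FlakyExporter` are hypothetical names, and in a real application the same guard would live in a `TracingProcessor` subclass.

```python
import logging

logger = logging.getLogger("trace_export")


class SafeProcessor:
    """Forwards hook calls to `inner`, logging exceptions instead of raising."""

    HOOKS = frozenset({
        "on_trace_start", "on_trace_end", "on_span_start",
        "on_span_end", "shutdown", "force_flush",
    })

    def __init__(self, inner):
        self.inner = inner
        self.failures = 0

    def __getattr__(self, name):
        # Only reached for attributes not defined on SafeProcessor itself
        if name not in self.HOOKS:
            raise AttributeError(name)
        hook = getattr(self.inner, name)

        def guarded(*args, **kwargs):
            try:
                return hook(*args, **kwargs)
            except Exception:
                self.failures += 1
                logger.exception("%s failed in %s", type(self.inner).__name__, name)
                return None

        return guarded


class FlakyExporter:
    """Hypothetical exporter whose backend is always down."""

    def on_span_end(self, span):
        raise RuntimeError("backend unavailable")


safe = SafeProcessor(FlakyExporter())
safe.on_span_end("span-1")  # The error is logged, not raised
```

The `failures` counter is one simple way to surface exporter health as a metric or an alert threshold.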

## Production Considerations

1. **Buffer and batch.** Network calls in `on_span_end` add latency to your agent runs. Buffer spans and flush in batches during `on_trace_end` or on a timer.
2. **Handle failures gracefully.** An exporter failure should never crash the agent run. Wrap network calls in try/except and log failures to a fallback destination.
3. **Respect backpressure.** If your downstream system is slow, drop or sample traces rather than letting a buffer grow without bound.
4. **Keep I/O off the hot path.** Processor hooks are called synchronously, so exporters that make HTTP calls should hand the work to a background thread or task instead of blocking the agent run.
5. **Implement shutdown cleanly.** The `shutdown()` method is your opportunity to flush remaining buffers. Make sure it runs when your application exits to prevent data loss.
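The batching and backpressure points can be sketched with a bounded buffer plus a deterministic sampler. This is a minimal illustration under stated assumptions, not an SDK feature: `BoundedSpanBuffer` and `keep_trace` are hypothetical names, and the drop-oldest policy is one choice among several.

```python
import hashlib
import threading
from collections import deque


class BoundedSpanBuffer:
    """Thread-safe buffer that drops the oldest spans once it is full."""

    def __init__(self, max_size=1000, batch_size=100):
        self._items = deque(maxlen=max_size)
        self._lock = threading.Lock()
        self.batch_size = batch_size
        self.dropped = 0

    def add(self, item):
        with self._lock:
            if len(self._items) == self._items.maxlen:
                self.dropped += 1  # The deque evicts the oldest item on append
            self._items.append(item)

    def drain(self):
        """Remove and return up to batch_size items, oldest first."""
        with self._lock:
            count = min(self.batch_size, len(self._items))
            return [self._items.popleft() for _ in range(count)]


def keep_trace(trace_id, sample_rate):
    """Deterministically keep roughly `sample_rate` of traces, keyed by ID."""
    digest = hashlib.sha256(trace_id.encode()).digest()
    bucket = int.from_bytes(digest[:8], "big") / 2**64
    return bucket < sample_rate


buffer = BoundedSpanBuffer(max_size=3, batch_size=2)
for span_number in range(5):
    buffer.add(span_number)

first_batch = buffer.drain()  # [2, 3]: spans 0 and 1 were dropped
```

Hashing the trace ID means every span in a trace gets the same sampling decision, so sampled traces stay complete. An exporter would call `drain()` from its flush routine and report `dropped` so that data loss is visible.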

Trace export transforms the Agents SDK from a development tool into a production observability pillar that integrates seamlessly with your existing monitoring infrastructure.

## Background and Key Concepts

This guide is written for engineers and operators who need to export AI agent traces to a data warehouse or observability platform in real production systems. The notes below give a plain-language reference for related terms.

- **Distributed tracing**: following a single request across services as a tree of timed spans. An agent trace applies the same model to one agent run, with spans for model calls, tool calls, and handoffs.
- **Real time**: here, exporting spans as they finish rather than in delayed batches, so dashboards and alerts reflect live agent behavior.
- **Tool invocation**: a call from the agent to a function tool. The exporters in this guide record these as `function` spans.

For teams that want to ship trace export in voice and chat agents this quarter, CallSphere runs **37 agents** and **90+ function tools** across **6 verticals** on a single dashboard. Start a [14-day trial](/trial), see live [demo agents](/demo), or compare tiers on [/pricing](/pricing).

---

Source: https://callsphere.ai/blog/exporting-agent-traces-third-party-platforms
