---
title: "Streaming Agent Responses with OpenAI Agents SDK and LangChain in 2026"
description: "How to stream tokens, tool-call deltas, and intermediate steps from an agent — with code for both the OpenAI Agents SDK and LangChain — and the gotchas that bite in production."
canonical: https://callsphere.ai/blog/streaming-agent-responses-openai-langchain-2026
category: "Agentic AI"
tags: ["Streaming", "OpenAI Agents SDK", "LangChain", "LangGraph", "Agent Evaluation", "Production AI", "Latency"]
author: "CallSphere Team"
published: 2026-05-06T00:00:00.000Z
updated: 2026-05-06T07:06:01.714Z
---

# Streaming Agent Responses with OpenAI Agents SDK and LangChain in 2026

> How to stream tokens, tool-call deltas, and intermediate steps from an agent — with code for both the OpenAI Agents SDK and LangChain — and the gotchas that bite in production.

## TL;DR

Streaming is no longer a "nice-to-have" UX polish on top of a chat box. For agent systems shipping in 2026, streaming is part of the *contract* between the agent and whatever consumes it — a browser tab, a voice realtime pipeline, or a downstream agent waiting for partial tool calls so it can dispatch sub-tasks early. Both the **OpenAI Agents SDK** (`Runner.run_streamed` / `runner.stream`) and **LangChain / LangGraph** (`astream_events`, `graph.astream`) expose token-level, tool-call-level, and node-level event streams. The hard part is not turning streaming on; it is consuming the events without dropping them, surfacing them through SSE or WebSockets without head-of-line blocking, and measuring the time-to-first-token (TTFT) that actually matters to the user. This post walks through both SDKs in Python and TypeScript with model snapshots pinned to `gpt-4o-2024-11-20` and `gpt-4.1-2025-04-14`, then ends with the streaming numbers we run our voice agents against on [CallSphere](/products).

## Why Streaming Is a Production Concern, Not a UX Concern

A non-streamed agent reply has exactly one observable timestamp: when the response object lands. A streamed agent reply has dozens — first token, first tool-call delta, first reasoning chunk, mid-stream finish-reason flips, end-of-tool-call markers, and the final `done` event. Every one of those is a place a downstream system can react earlier. Concrete examples we have shipped:

- A voice agent that begins TTS playback on the first 30 characters of streamed text — cutting perceived latency from ~1.4s to ~360ms.
- A browser-using agent that dispatches a "navigate" tool call as soon as the URL argument is fully streamed, *before* the rest of the tool-call payload finishes.
- A multi-agent supervisor that stops a sub-agent mid-stream when it detects a contradiction in the partial output, saving tokens on a doomed run.

None of those work on a non-streamed pipeline. The latency budget on real-time agent UX in 2026 (especially [voice and chat surfaces](/products)) makes streaming structurally non-optional.

## Event Types Across the Stream Lifecycle

```mermaid
flowchart LR
  A[Run start] --> B[response.created]
  B --> C[reasoning.delta x N]
  C --> D[output_text.delta x N]
  D --> E[tool_call.delta arg-by-arg]
  E --> F[tool_call.completed]
  F --> G[tool_result added to context]
  G --> H[output_text.delta x N]
  H --> I[response.completed]
  I --> J[Agent handoff or done]
  style B fill:#e0f2fe
  style I fill:#dcfce7
  style E fill:#fef3c7
```

*Figure 1 — Event types over time for a streaming agent run with one tool call. Tokens stream both before and after the tool call; tool-call arguments themselves stream argument-by-argument as deltas, not as a single payload.*

The two events most teams underuse are `reasoning.delta` (on `o3`-class and `gpt-5`-class models that expose intermediate reasoning) and `tool_call.delta`. Reasoning deltas are gold for [evaluating thought quality](/blog/evaluating-reasoning-traces-agent-thought-quality) on the fly. Tool-call deltas let you start work before the full call lands.

## OpenAI Agents SDK — Python Streaming

The Agents SDK exposes `Runner.run_streamed` which returns a result object you iterate via `stream_events()`. Pin the model snapshot.

```python
import asyncio
from agents import Agent, Runner, function_tool

@function_tool
async def lookup_inventory(sku: str) -> dict:
    # pretend this hits Postgres
    return {"sku": sku, "in_stock": 42}

agent = Agent(
    name="inventory-bot",
    model="gpt-4.1-2025-04-14",
    instructions="Answer SKU questions. Use lookup_inventory for stock.",
    tools=[lookup_inventory],
)

async def main():
    first_token_at = None
    started = asyncio.get_event_loop().time()
    result = Runner.run_streamed(agent, input="How many of SKU-991 are in stock?")

    async for event in result.stream_events():
        if event.type == "raw_response_event":
            data = event.data
            if data.type == "response.output_text.delta":
                if first_token_at is None:
                    first_token_at = asyncio.get_event_loop().time() - started
                    print(f"TTFT={first_token_at*1000:.0f}ms")
                print(data.delta, end="", flush=True)
        elif event.type == "run_item_stream_event":
            item = event.item
            if item.type == "tool_call_item":
                print(f"\n[tool_call] {item.raw_item.name}")
            elif item.type == "tool_call_output_item":
                print(f"\n[tool_result] {item.output}")
        elif event.type == "agent_updated_stream_event":
            print(f"\n[handoff] -> {event.new_agent.name}")

    print(f"\nfinal: {result.final_output}")

asyncio.run(main())
```

Three things to internalize:

1. `raw_response_event` carries the OpenAI-shaped events (`response.output_text.delta`, `response.function_call_arguments.delta`, etc.). This is your token stream.
2. `run_item_stream_event` carries SDK-level semantic items — completed tool calls, tool results, message items. This is your *agent-level* stream.
3. `agent_updated_stream_event` fires on [multi-agent handoffs](/blog/multi-agent-handoffs-openai-agents-sdk-pattern). If your topology has handoffs, you must handle this or you will silently keep streaming under the wrong agent context.

## OpenAI Agents SDK — TypeScript Streaming

The TS SDK mirrors Python with idiomatic async iterators.

```ts
import { Agent, run, tool } from "@openai/agents";
import { z } from "zod";

const lookupInventory = tool({
  name: "lookup_inventory",
  description: "Get on-hand stock for a SKU",
  parameters: z.object({ sku: z.string() }),
  execute: async ({ sku }) => ({ sku, in_stock: 42 }),
});

const agent = new Agent({
  name: "inventory-bot",
  model: "gpt-4.1-2025-04-14",
  instructions: "Answer SKU questions. Use lookup_inventory for stock.",
  tools: [lookupInventory],
});

const started = performance.now();
let firstTokenAt: number | null = null;

const stream = await run(agent, "How many of SKU-991 are in stock?", {
  stream: true,
});

for await (const event of stream) {
  if (event.type === "raw_model_stream_event") {
    const e = event.data;
    if (e.type === "output_text_delta") {
      if (firstTokenAt === null) {
        firstTokenAt = performance.now() - started;
        console.log(`TTFT=${firstTokenAt.toFixed(0)}ms`);
      }
      process.stdout.write(e.delta);
    }
  } else if (event.type === "run_item_stream_event") {
    if (event.item.type === "tool_call_item") {
      console.log(`\n[tool_call] ${event.item.rawItem.name}`);
    }
  }
}

await stream.completed;
console.log(`\nfinal: ${stream.finalOutput}`);
```

The TS path is what we use behind our [Next.js API routes](/products) — the agent stream is forwarded to the browser as Server-Sent Events.

## LangChain / LangGraph — `astream_events`

LangChain has two streaming primitives. `astream` yields top-level chunks of the runnable; `astream_events` yields a typed event stream that *also* includes child runnables (tools, retrievers, sub-graphs). For agents you almost always want `astream_events`.

```python
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langchain_core.tools import tool

@tool
def lookup_inventory(sku: str) -> dict:
    """Get on-hand stock for a SKU."""
    return {"sku": sku, "in_stock": 42}

llm = ChatOpenAI(model="gpt-4o-2024-11-20", temperature=0)
agent = create_react_agent(llm, [lookup_inventory])

async def stream_run(question: str):
    async for ev in agent.astream_events(
        {"messages": [("user", question)]},
        version="v2",
    ):
        kind = ev["event"]
        name = ev.get("name", "")
        if kind == "on_chat_model_stream":
            chunk = ev["data"]["chunk"]
            if chunk.content:
                print(chunk.content, end="", flush=True)
        elif kind == "on_tool_start":
            print(f"\n[tool_start] {name} args={ev['data'].get('input')}")
        elif kind == "on_tool_end":
            print(f"\n[tool_end] {name} -> {ev['data'].get('output')}")
        elif kind == "on_chain_end" and name == "LangGraph":
            print("\n[graph done]")
```

If you are running a [LangGraph state machine](/blog/langgraph-state-machine-architecture-deep-dive-2026) directly rather than the prebuilt ReAct agent, prefer `graph.astream(..., stream_mode=["updates", "messages"])`. `updates` gives you per-node state diffs; `messages` gives you token-level streams from every chat-model node, tagged with the node name. We use the dual-mode form on every supervisor graph because we need to know *which* sub-agent is producing each token for routing and metering.

## Comparison Table — Streaming APIs

| Feature | OpenAI Agents SDK (Py/TS) | LangChain `astream_events` | LangGraph `graph.astream` |
| --- | --- | --- | --- |
| Token deltas | `response.output_text.delta` | `on_chat_model_stream` | `stream_mode="messages"` |
| Tool-call argument deltas | `response.function_call_arguments.delta` | `on_chat_model_stream` (within tool-call payload) | Same as above |
| Tool start/end events | `run_item_stream_event` w/ `tool_call_item` | `on_tool_start`, `on_tool_end` | `updates` mode shows tool-node updates |
| Multi-agent handoff event | `agent_updated_stream_event` | n/a (use sub-graph events) | Node-tagged messages |
| Reasoning deltas (o3 / gpt-5) | `response.reasoning.delta` | `on_chat_model_stream` w/ `reasoning_content` | Same |
| Backpressure | Async iterator (consumer paces) | Async iterator | Async iterator |
| Native trace integration | Agents SDK tracing | LangSmith | LangSmith |
| Best for | OpenAI-native, voice, single-vendor | Heterogeneous LLMs, RAG | Graph topologies, supervisors |

There is no universally "best" choice. We use Agents SDK on the realtime voice path (because of the `Realtime` integration) and LangGraph on the multi-step research and [agentic RAG](/blog/agentic-rag-langgraph-iterative-retrieval-2026) path.

## Forwarding the Stream — SSE vs WebSockets

Two transports dominate.

**Server-Sent Events (SSE)** are HTTP/1.1-friendly, work through most corporate proxies, are unidirectional (server → client), and reconnect automatically via `Last-Event-ID`. SSE is the right default for a token-stream-only UX. The wire format is trivial:

```
event: token
data: {"delta": "Hel"}

event: token
data: {"delta": "lo"}

event: done
data: {"final": "Hello"}
```

**WebSockets** are full-duplex, which matters when the *client* needs to send mid-stream signals back — barge-in for voice, cancel buttons for long-running browser agents, partial user inputs while the agent is still talking. WebSockets are also what the OpenAI Realtime API uses on the wire, so if you are already in that pipeline you stay there.

A practical rule we follow: if the only message direction during a stream is server → client, use SSE; if the user can interrupt or steer mid-stream, use WebSockets.

## Backpressure Is Real

Both SDKs use async iterators, which means *the consumer paces the stream*. If your consumer awaits a slow downstream — say, a TTS engine that processes ~80 tokens/s while OpenAI is shipping you ~120 tokens/s — the iterator will block, and the OpenAI client will buffer internally until the connection-level backpressure kicks in. We saw three classes of bug here:

1. **Silent buffering** — tokens pile up in the SDK buffer, perceived latency on the client looks fine, but if the run is canceled mid-stream the in-flight tokens are still billed and still flow into traces. Fix: cancel the *underlying* response, not just stop iterating.
2. **Head-of-line blocking on slow tool execution** — when a tool runs synchronously for 8 seconds, no further token deltas arrive even though the model is "done." Fix: make tools async and return placeholder structures for downstream streaming.
3. **Client-side reordering** — SSE messages arriving over a flaky mobile network can interleave with cancel signals. Always include a monotonic sequence number per event and discard out-of-order on the client.

## Measuring TTFT and Final-Token Latency

TTFT is "time from request submission to first `output_text.delta` event." Final-token latency is "time to last `output_text.delta` event before `response.completed`." The gap between them is the *streaming window*. For a 250-token answer at p50:

| Surface | TTFT p50 | TTFT p95 | Final-token p50 | Streaming window |
| --- | --- | --- | --- | --- |
| Browser chat (gpt-4o-2024-11-20) | 410 ms | 880 ms | 3.8 s | 3.4 s |
| Browser chat (gpt-4.1-2025-04-14) | 360 ms | 760 ms | 3.1 s | 2.7 s |
| Voice agent (gpt-4o-realtime-preview-2025-06-03) | 280 ms | 540 ms | 2.6 s | 2.3 s |
| LangGraph supervisor (3 hops, gpt-4o) | 1.2 s | 2.4 s | 6.4 s | 5.2 s |

These are our production p-numbers, US-East. On the voice path, every 100ms of TTFT shaved is measurable in conversation rating. We log these as histograms and alert on p95 drift > 20% week-over-week.

## Where This Lands at CallSphere

Our [voice agents](/products) drive the most aggressive streaming requirements in our stack. The realtime voice path uses the OpenAI Realtime API for bidirectional audio streaming, and the higher-reasoning path uses the Agents SDK's `Runner.run_streamed` so that text begins shaping into TTS before the model has finished thinking. The chat agents are LangGraph-based and use `graph.astream` with dual-mode streams so the supervisor can route on partial answers. The eval pipeline (covered in the [companion post on token-level eval](/blog/token-level-eval-streaming-agent-quality)) consumes those same event streams and emits stream-quality metrics in real time.

The mental model: streaming is not a transport detail. It is the agent's observability surface, its UX surface, and its tool-dispatch substrate, all in one. Build it deliberately.

## Frequently Asked Questions

### Should I use `astream` or `astream_events` on LangChain?

`astream_events` for agents, always. `astream` only sees the top-level runnable's outputs and misses tool calls, retrievals, and sub-graph events.

### How do I stream tool-call arguments before the call is dispatched?

On the OpenAI Agents SDK, listen for `response.function_call_arguments.delta` events on the raw stream. You can begin parsing the partial JSON and react when a specific argument key (e.g. `url`) is fully present. The SDK will not invoke the tool until the call is complete; you can pre-warm work outside the SDK.

### Can I stream from `o3` or `gpt-5` reasoning models?

Yes, with caveats. Reasoning deltas come through `response.reasoning.delta` events, but visible reasoning is throttled and may be summarized. Plan for noticeably longer TTFT on reasoning models — our gpt-5 TTFT p50 is ~1.8s vs 0.4s on gpt-4.1.

### Do streamed runs cost differently from non-streamed?

No. Token billing is identical. The difference is wall-clock perception and the ability to cancel mid-flight, which is a real cost lever — we save ~6% on tokens monthly by canceling streams that the supervisor flags as off-track.

### How do I trace a streamed run end-to-end?

LangSmith and the Agents SDK tracer both support streamed runs natively. The trace is finalized on `response.completed`, but partial traces are visible during the run for debugging. Pair this with the [trace-to-fix workflow](/blog/trace-to-production-fix-agent-observability-workflow) and you get the same observability story whether the run streamed or not.

---

Source: https://callsphere.ai/blog/streaming-agent-responses-openai-langchain-2026
