Streaming Agent Responses with OpenAI Agents SDK and LangChain in 2026
How to stream tokens, tool-call deltas, and intermediate steps from an agent — with code for both the OpenAI Agents SDK and LangChain — and the gotchas that bite in production.
TL;DR
Streaming is no longer a "nice-to-have" UX polish on top of a chat box. For agent systems shipping in 2026, streaming is part of the contract between the agent and whatever consumes it — a browser tab, a realtime voice pipeline, or a downstream agent waiting for partial tool calls so it can dispatch sub-tasks early. Both the OpenAI Agents SDK (`Runner.run_streamed` in Python, `run(..., { stream: true })` in TypeScript) and LangChain / LangGraph (`astream_events`, `graph.astream`) expose token-level, tool-call-level, and node-level event streams. The hard part is not turning streaming on; it is consuming the events without dropping them, surfacing them over SSE or WebSockets without head-of-line blocking, and measuring the time-to-first-token (TTFT) that actually matters to the user. This post walks through both SDKs in Python and TypeScript with model snapshots pinned to gpt-4o-2024-11-20 and gpt-4.1-2025-04-14, then ends with the streaming numbers we run our voice agents against on CallSphere.
Why Streaming Is a Production Concern, Not a UX Concern
A non-streamed agent reply has exactly one observable timestamp: when the response object lands. A streamed agent reply has dozens — first token, first tool-call delta, first reasoning chunk, mid-stream finish-reason flips, end-of-tool-call markers, and the final done event. Every one of those is a place a downstream system can react earlier. Concrete examples we have shipped:
- A voice agent that begins TTS playback on the first 30 characters of streamed text — cutting perceived latency from ~1.4s to ~360ms.
- A browser-using agent that dispatches a "navigate" tool call as soon as the URL argument is fully streamed, before the rest of the tool-call payload finishes.
- A multi-agent supervisor that stops a sub-agent mid-stream when it detects a contradiction in the partial output, saving tokens on a doomed run.
None of those work on a non-streamed pipeline. The latency budget on real-time agent UX in 2026 (especially voice and chat surfaces) makes streaming structurally non-optional.
Event Types Across the Stream Lifecycle
```mermaid
flowchart LR
    A[Run start] --> B[response.created]
    B --> C[reasoning.delta x N]
    C --> D[output_text.delta x N]
    D --> E[tool_call.delta arg-by-arg]
    E --> F[tool_call.completed]
    F --> G[tool_result added to context]
    G --> H[output_text.delta x N]
    H --> I[response.completed]
    I --> J[Agent handoff or done]
    style B fill:#e0f2fe
    style I fill:#dcfce7
    style E fill:#fef3c7
```
Figure 1 — Event types over time for a streaming agent run with one tool call. Tokens stream both before and after the tool call; tool-call arguments themselves stream argument-by-argument as deltas, not as a single payload.
The two events most teams underuse are reasoning.delta (on o3-class and gpt-5-class models that expose intermediate reasoning) and tool_call.delta. Reasoning deltas are gold for evaluating thought quality on the fly. Tool-call deltas let you start work before the full call lands.
OpenAI Agents SDK — Python Streaming
The Agents SDK exposes `Runner.run_streamed`, which returns a streaming result object whose events you iterate via `stream_events()`. Pin the model snapshot.
```python
import asyncio

from agents import Agent, Runner, function_tool


@function_tool
async def lookup_inventory(sku: str) -> dict:
    # pretend this hits Postgres
    return {"sku": sku, "in_stock": 42}


agent = Agent(
    name="inventory-bot",
    model="gpt-4.1-2025-04-14",
    instructions="Answer SKU questions. Use lookup_inventory for stock.",
    tools=[lookup_inventory],
)


async def main():
    first_token_at = None
    loop = asyncio.get_running_loop()  # get_event_loop() is deprecated here
    started = loop.time()
    result = Runner.run_streamed(agent, input="How many of SKU-991 are in stock?")
    async for event in result.stream_events():
        if event.type == "raw_response_event":
            data = event.data
            if data.type == "response.output_text.delta":
                if first_token_at is None:
                    first_token_at = loop.time() - started
                    print(f"TTFT={first_token_at*1000:.0f}ms")
                print(data.delta, end="", flush=True)
        elif event.type == "run_item_stream_event":
            item = event.item
            if item.type == "tool_call_item":
                print(f"\n[tool_call] {item.raw_item.name}")
            elif item.type == "tool_call_output_item":
                print(f"\n[tool_result] {item.output}")
        elif event.type == "agent_updated_stream_event":
            print(f"\n[handoff] -> {event.new_agent.name}")
    print(f"\nfinal: {result.final_output}")


asyncio.run(main())
```
Three things to internalize:
- `raw_response_event` carries the OpenAI-shaped events (`response.output_text.delta`, `response.function_call_arguments.delta`, etc.). This is your token stream.
- `run_item_stream_event` carries SDK-level semantic items — completed tool calls, tool results, message items. This is your agent-level stream.
- `agent_updated_stream_event` fires on multi-agent handoffs. If your topology has handoffs, you must handle this or you will silently keep streaming under the wrong agent context.
OpenAI Agents SDK — TypeScript Streaming
The TS SDK mirrors Python with idiomatic async iterators.
```typescript
import { Agent, run, tool } from "@openai/agents";
import { z } from "zod";

const lookupInventory = tool({
  name: "lookup_inventory",
  description: "Get on-hand stock for a SKU",
  parameters: z.object({ sku: z.string() }),
  execute: async ({ sku }) => ({ sku, in_stock: 42 }),
});

const agent = new Agent({
  name: "inventory-bot",
  model: "gpt-4.1-2025-04-14",
  instructions: "Answer SKU questions. Use lookup_inventory for stock.",
  tools: [lookupInventory],
});

const started = performance.now();
let firstTokenAt: number | null = null;

const stream = await run(agent, "How many of SKU-991 are in stock?", {
  stream: true,
});

for await (const event of stream) {
  if (event.type === "raw_model_stream_event") {
    const e = event.data;
    if (e.type === "output_text_delta") {
      if (firstTokenAt === null) {
        firstTokenAt = performance.now() - started;
        console.log(`TTFT=${firstTokenAt.toFixed(0)}ms`);
      }
      process.stdout.write(e.delta);
    }
  } else if (event.type === "run_item_stream_event") {
    if (event.item.type === "tool_call_item") {
      console.log(`\n[tool_call] ${event.item.rawItem.name}`);
    }
  }
}

await stream.completed;
console.log(`\nfinal: ${stream.finalOutput}`);
```
The TS path is what we use behind our Next.js API routes — the agent stream is forwarded to the browser as Server-Sent Events.
LangChain / LangGraph — astream_events
LangChain has two streaming primitives. astream yields top-level chunks of the runnable; astream_events yields a typed event stream that also includes child runnables (tools, retrievers, sub-graphs). For agents you almost always want astream_events.
```python
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent


@tool
def lookup_inventory(sku: str) -> dict:
    """Get on-hand stock for a SKU."""
    return {"sku": sku, "in_stock": 42}


llm = ChatOpenAI(model="gpt-4o-2024-11-20", temperature=0)
agent = create_react_agent(llm, [lookup_inventory])


async def stream_run(question: str):
    async for ev in agent.astream_events(
        {"messages": [("user", question)]},
        version="v2",
    ):
        kind = ev["event"]
        name = ev.get("name", "")
        if kind == "on_chat_model_stream":
            chunk = ev["data"]["chunk"]
            if chunk.content:
                print(chunk.content, end="", flush=True)
        elif kind == "on_tool_start":
            print(f"\n[tool_start] {name} args={ev['data'].get('input')}")
        elif kind == "on_tool_end":
            print(f"\n[tool_end] {name} -> {ev['data'].get('output')}")
        elif kind == "on_chain_end" and name == "LangGraph":
            print("\n[graph done]")
```
If you are running a LangGraph state machine directly rather than the prebuilt ReAct agent, prefer graph.astream(..., stream_mode=["updates", "messages"]). updates gives you per-node state diffs; messages gives you token-level streams from every chat-model node, tagged with the node name. We use the dual-mode form on every supervisor graph because we need to know which sub-agent is producing each token for routing and metering.
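To make the dual-mode shape concrete, here is a minimal sketch of demultiplexing that stream. It assumes the documented behavior that a list-valued `stream_mode` yields `(mode, payload)` tuples, with "messages" payloads carrying `(chunk, metadata)` pairs whose metadata names the producing node under `langgraph_node`; plain strings stand in for the real `AIMessageChunk` objects (whose text you would read from `chunk.content`), and `demux` is a hypothetical helper name.

```python
import asyncio


async def demux(stream):
    """Split a dual-mode LangGraph stream into per-node token buffers
    and a list of per-node state diffs ("updates" payloads)."""
    tokens_by_node: dict[str, str] = {}
    updates: list[dict] = []
    async for mode, payload in stream:
        if mode == "messages":
            chunk, meta = payload  # real chunk would be an AIMessageChunk
            node = meta.get("langgraph_node", "?")
            tokens_by_node[node] = tokens_by_node.get(node, "") + chunk
        elif mode == "updates":
            updates.append(payload)
    return tokens_by_node, updates


async def fake_stream():
    # Stand-in for graph.astream(...); shapes only, not real LangGraph objects.
    yield ("messages", ("Hel", {"langgraph_node": "researcher"}))
    yield ("messages", ("lo", {"langgraph_node": "researcher"}))
    yield ("updates", {"researcher": {"messages": ["Hello"]}})


tokens, updates = asyncio.run(demux(fake_stream()))
print(tokens)  # per-node accumulated text, keyed by node name
```

The per-node keying is exactly what a supervisor needs to route and meter partial answers from each sub-agent.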
Comparison Table — Streaming APIs
| Feature | OpenAI Agents SDK (Py/TS) | LangChain `astream_events` | LangGraph `graph.astream` |
|---|---|---|---|
| Token deltas | `response.output_text.delta` | `on_chat_model_stream` | `stream_mode="messages"` |
| Tool-call argument deltas | `response.function_call_arguments.delta` | `on_chat_model_stream` (within tool-call payload) | Same as above |
| Tool start/end events | `run_item_stream_event` w/ `tool_call_item` | `on_tool_start`, `on_tool_end` | `updates` mode shows tool-node updates |
| Multi-agent handoff event | `agent_updated_stream_event` | n/a (use sub-graph events) | Node-tagged messages |
| Reasoning deltas (o3 / gpt-5) | `response.reasoning.delta` | `on_chat_model_stream` w/ `reasoning_content` | Same |
| Backpressure | Async iterator (consumer paces) | Async iterator | Async iterator |
| Native trace integration | Agents SDK tracing | LangSmith | LangSmith |
| Best for | OpenAI-native, voice, single-vendor | Heterogeneous LLMs, RAG | Graph topologies, supervisors |
There is no universally "best" choice. We use Agents SDK on the realtime voice path (because of the Realtime integration) and LangGraph on the multi-step research and agentic RAG path.
Forwarding the Stream — SSE vs WebSockets
Two transports dominate.
Server-Sent Events (SSE) are HTTP/1.1-friendly, work through most corporate proxies, are unidirectional (server → client), and reconnect automatically via Last-Event-ID. SSE is the right default for a token-stream-only UX. The wire format is trivial:
```
event: token
data: {"delta": "Hel"}

event: token
data: {"delta": "lo"}

event: done
data: {"final": "Hello"}
```

Note the blank line after each event — the SSE protocol uses it as the event terminator, so omitting it silently concatenates events.
WebSockets are full-duplex, which matters when the client needs to send mid-stream signals back — barge-in for voice, cancel buttons for long-running browser agents, partial user inputs while the agent is still talking. WebSockets are also what the OpenAI Realtime API uses on the wire, so if you are already in that pipeline you stay there.
A practical rule we follow: if the only message direction during a stream is server → client, use SSE; if the user can interrupt or steer mid-stream, use WebSockets.
Backpressure Is Real
Both SDKs use async iterators, which means the consumer paces the stream. If your consumer awaits a slow downstream — say, a TTS engine that processes ~80 tokens/s while OpenAI is shipping you ~120 tokens/s — the iterator will block, and the OpenAI client will buffer internally until the connection-level backpressure kicks in. We saw three classes of bug here:
- Silent buffering — tokens pile up in the SDK buffer, perceived latency on the client looks fine, but if the run is canceled mid-stream the in-flight tokens are still billed and still flow into traces. Fix: cancel the underlying response, not just stop iterating.
- Head-of-line blocking on slow tool execution — when a tool runs synchronously for 8 seconds, no further token deltas arrive even though the model is "done." Fix: make tools async and return placeholder structures for downstream streaming.
- Client-side reordering — SSE messages arriving over a flaky mobile network can interleave with cancel signals. Always include a monotonic sequence number per event and discard out-of-order on the client.
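The guard from that last bullet is small enough to sketch directly. `OrderedEventSink` is a hypothetical name; a production client might buffer sequence gaps and replay them in order rather than discard, but discard-on-stale is the simplest safe default.

```python
class OrderedEventSink:
    """Client-side guard: apply an event only if its sequence number is
    strictly greater than the last one applied; drop stale/duplicate events."""

    def __init__(self):
        self.last_seq = -1
        self.applied = []

    def feed(self, seq: int, event: dict) -> bool:
        if seq <= self.last_seq:
            return False  # stale or duplicate: discard
        self.last_seq = seq
        self.applied.append(event)
        return True


sink = OrderedEventSink()
sink.feed(0, {"delta": "Hel"})
sink.feed(2, {"delta": "lo"})
late = sink.feed(1, {"delta": "x"})  # late arrival: returns False, discarded
```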
Measuring TTFT and Final-Token Latency
TTFT is "time from request submission to first output_text.delta event." Final-token latency is "time to last output_text.delta event before response.completed." The gap between them is the streaming window. For a 250-token answer at p50:
| Surface | TTFT p50 | TTFT p95 | Final-token p50 | Streaming window |
|---|---|---|---|---|
| Browser chat (gpt-4o-2024-11-20) | 410 ms | 880 ms | 3.8 s | 3.4 s |
| Browser chat (gpt-4.1-2025-04-14) | 360 ms | 760 ms | 3.1 s | 2.7 s |
| Voice agent (gpt-4o-realtime-preview-2025-06-03) | 280 ms | 540 ms | 2.6 s | 2.3 s |
| LangGraph supervisor (3 hops, gpt-4o) | 1.2 s | 2.4 s | 6.4 s | 5.2 s |
These are our production p-numbers, US-East. On the voice path, every 100ms of TTFT shaved is measurable in conversation rating. We log these as histograms and alert on p95 drift > 20% week-over-week.
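The alerting rule is simple enough to sketch without a metrics backend. The nearest-rank percentile and `p95_drift_alert` below are illustrative stand-ins (our real stack uses histogram metrics); the 20% threshold matches the week-over-week rule above.

```python
def percentile(samples: list[float], p: float) -> float:
    """Nearest-rank percentile; adequate for monitoring sketches."""
    s = sorted(samples)
    k = max(0, min(len(s) - 1, round(p / 100 * (len(s) - 1))))
    return s[k]


def p95_drift_alert(last_week_p95: float, this_week_p95: float,
                    threshold: float = 0.20) -> bool:
    """Fire when p95 grew by more than `threshold` week-over-week."""
    drift = (this_week_p95 - last_week_p95) / last_week_p95
    return drift > threshold


ttft_ms = [410, 395, 520, 880, 430, 405, 460, 610]  # one week of samples
p50 = percentile(ttft_ms, 50)
p95 = percentile(ttft_ms, 95)
```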
Where This Lands at CallSphere
Our voice agents drive the most aggressive streaming requirements in our stack. The realtime voice path uses the OpenAI Realtime API for bidirectional audio streaming, and the higher-reasoning path uses the Agents SDK's Runner.run_streamed so that text begins shaping into TTS before the model has finished thinking. The chat agents are LangGraph-based and use graph.astream with dual-mode streams so the supervisor can route on partial answers. The eval pipeline (covered in the companion post on token-level eval) consumes those same event streams and emits stream-quality metrics in real time.
The mental model: streaming is not a transport detail. It is the agent's observability surface, its UX surface, and its tool-dispatch substrate, all in one. Build it deliberately.
Frequently Asked Questions
Should I use astream or astream_events on LangChain?
astream_events for agents, always. astream only sees the top-level runnable's outputs and misses tool calls, retrievals, and sub-graph events.
How do I stream tool-call arguments before the call is dispatched?
On the OpenAI Agents SDK, listen for response.function_call_arguments.delta events on the raw stream. You can begin parsing the partial JSON and react when a specific argument key (e.g. url) is fully present. The SDK will not invoke the tool until the call is complete; you can pre-warm work outside the SDK.
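A minimal sketch of that pre-warming pattern, assuming you feed it the raw `response.function_call_arguments.delta` strings: `PartialArgWatcher` is a hypothetical helper, `url` is just an example key, and the regex only handles backslash-escaped JSON strings, not every edge case a full incremental JSON parser would.

```python
import re

# Matches a completed "url": "..." pair inside a partial JSON buffer.
URL_DONE = re.compile(r'"url"\s*:\s*"((?:[^"\\]|\\.)*)"')


class PartialArgWatcher:
    """Accumulate tool-call argument deltas and fire as soon as the `url`
    argument's string value is fully closed, before the whole payload lands."""

    def __init__(self):
        self.buf = ""
        self.fired = None

    def feed(self, delta: str):
        self.buf += delta
        if self.fired is None:
            m = URL_DONE.search(self.buf)
            if m:
                self.fired = m.group(1)  # pre-warm work can start here
        return self.fired


w = PartialArgWatcher()
for delta in ['{"ur', 'l": "https://exa', 'mple.com", "selector', '": "#main"}']:
    w.feed(delta)
```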
Can I stream from o3 or gpt-5 reasoning models?
Yes, with caveats. Reasoning deltas come through response.reasoning.delta events, but visible reasoning is throttled and may be summarized. Plan for noticeably longer TTFT on reasoning models — our gpt-5 TTFT p50 is ~1.8s vs 0.4s on gpt-4.1.
Do streamed runs cost differently from non-streamed?
No. Token billing is identical. The difference is wall-clock perception and the ability to cancel mid-flight, which is a real cost lever — we save ~6% on tokens monthly by canceling streams that the supervisor flags as off-track.
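The supervisor-side cancel pattern can be sketched with plain asyncio — `consume_with_guard` and `fake_stream` are illustrative stand-ins, not SDK APIs. When the guard fires, remember to cancel the underlying response as well (per the backpressure section), not just stop iterating.

```python
import asyncio


async def consume_with_guard(stream, off_track):
    """Consume a token stream, stopping early when `off_track` flags the
    partial output; returns (partial_text, was_cancelled)."""
    partial = ""
    async for delta in stream:
        partial += delta
        if off_track(partial):
            return partial, True  # caller then cancels the underlying response
    return partial, False


async def fake_stream():
    # Stand-in for a model token stream.
    for d in ["The answer ", "is definitely ", "not 42 ", "because..."]:
        yield d


partial, cancelled = asyncio.run(
    consume_with_guard(fake_stream(), lambda s: "not 42" in s)
)
```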
How do I trace a streamed run end-to-end?
LangSmith and the Agents SDK tracer both support streamed runs natively. The trace is finalized on response.completed, but partial traces are visible during the run for debugging. Pair this with the trace-to-fix workflow and you get the same observability story whether the run streamed or not.
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available -- no signup required.