
Streaming Agent Responses with OpenAI Agents SDK and LangChain in 2026

How to stream tokens, tool-call deltas, and intermediate steps from an agent — with code for both the OpenAI Agents SDK and LangChain — and the gotchas that bite in production.

TL;DR

Streaming is no longer a "nice-to-have" UX polish on top of a chat box. For agent systems shipping in 2026, streaming is part of the contract between the agent and whatever consumes it — a browser tab, a voice realtime pipeline, or a downstream agent waiting for partial tool calls so it can dispatch sub-tasks early. Both the OpenAI Agents SDK (Runner.run_streamed in Python, run(..., { stream: true }) in TypeScript) and LangChain / LangGraph (astream_events, graph.astream) expose token-level, tool-call-level, and node-level event streams. The hard part is not turning streaming on; it is consuming the events without dropping them, surfacing them through SSE or WebSockets without head-of-line blocking, and measuring the time-to-first-token (TTFT) that actually matters to the user. This post walks through both SDKs in Python and TypeScript with model snapshots pinned to gpt-4o-2024-11-20 and gpt-4.1-2025-04-14, then ends with the streaming numbers we run our voice agents against on CallSphere.

Why Streaming Is a Production Concern, Not a UX Concern

A non-streamed agent reply has exactly one observable timestamp: when the response object lands. A streamed agent reply has dozens — first token, first tool-call delta, first reasoning chunk, mid-stream finish-reason flips, end-of-tool-call markers, and the final done event. Every one of those is a place a downstream system can react earlier. Concrete examples we have shipped:

  • A voice agent that begins TTS playback on the first 30 characters of streamed text — cutting perceived latency from ~1.4s to ~360ms.
  • A browser-using agent that dispatches a "navigate" tool call as soon as the URL argument is fully streamed, before the rest of the tool-call payload finishes.
  • A multi-agent supervisor that stops a sub-agent mid-stream when it detects a contradiction in the partial output, saving tokens on a doomed run.

None of those work on a non-streamed pipeline. The latency budget on real-time agent UX in 2026 (especially voice and chat surfaces) makes streaming structurally non-optional.

Event Types Across the Stream Lifecycle

flowchart LR
  A[Run start] --> B[response.created]
  B --> C[reasoning.delta x N]
  C --> D[output_text.delta x N]
  D --> E[tool_call.delta arg-by-arg]
  E --> F[tool_call.completed]
  F --> G[tool_result added to context]
  G --> H[output_text.delta x N]
  H --> I[response.completed]
  I --> J[Agent handoff or done]
  style B fill:#e0f2fe
  style I fill:#dcfce7
  style E fill:#fef3c7

Figure 1 — Event types over time for a streaming agent run with one tool call. Tokens stream both before and after the tool call; tool-call arguments themselves stream argument-by-argument as deltas, not as a single payload.

The two events most teams underuse are reasoning.delta (on o3-class and gpt-5-class models that expose intermediate reasoning) and tool_call.delta. Reasoning deltas are gold for evaluating thought quality on the fly. Tool-call deltas let you start work before the full call lands.

OpenAI Agents SDK — Python Streaming

The Agents SDK exposes Runner.run_streamed which returns a result object you iterate via stream_events(). Pin the model snapshot.

import asyncio
from agents import Agent, Runner, function_tool

@function_tool
async def lookup_inventory(sku: str) -> dict:
    # pretend this hits Postgres
    return {"sku": sku, "in_stock": 42}

agent = Agent(
    name="inventory-bot",
    model="gpt-4.1-2025-04-14",
    instructions="Answer SKU questions. Use lookup_inventory for stock.",
    tools=[lookup_inventory],
)

async def main():
    first_token_at = None
    started = asyncio.get_event_loop().time()
    result = Runner.run_streamed(agent, input="How many of SKU-991 are in stock?")

    async for event in result.stream_events():
        if event.type == "raw_response_event":
            data = event.data
            if data.type == "response.output_text.delta":
                if first_token_at is None:
                    first_token_at = asyncio.get_event_loop().time() - started
                    print(f"TTFT={first_token_at*1000:.0f}ms")
                print(data.delta, end="", flush=True)
        elif event.type == "run_item_stream_event":
            item = event.item
            if item.type == "tool_call_item":
                print(f"\n[tool_call] {item.raw_item.name}")
            elif item.type == "tool_call_output_item":
                print(f"\n[tool_result] {item.output}")
        elif event.type == "agent_updated_stream_event":
            print(f"\n[handoff] -> {event.new_agent.name}")

    print(f"\nfinal: {result.final_output}")

asyncio.run(main())

Three things to internalize:

  1. raw_response_event carries the OpenAI-shaped events (response.output_text.delta, response.function_call_arguments.delta, etc.). This is your token stream.
  2. run_item_stream_event carries SDK-level semantic items — completed tool calls, tool results, message items. This is your agent-level stream.
  3. agent_updated_stream_event fires on multi-agent handoffs. If your topology has handoffs, you must handle this or you will silently keep streaming under the wrong agent context.

OpenAI Agents SDK — TypeScript Streaming

The TS SDK mirrors Python with idiomatic async iterators.

import { Agent, run, tool } from "@openai/agents";
import { z } from "zod";

const lookupInventory = tool({
  name: "lookup_inventory",
  description: "Get on-hand stock for a SKU",
  parameters: z.object({ sku: z.string() }),
  execute: async ({ sku }) => ({ sku, in_stock: 42 }),
});

const agent = new Agent({
  name: "inventory-bot",
  model: "gpt-4.1-2025-04-14",
  instructions: "Answer SKU questions. Use lookup_inventory for stock.",
  tools: [lookupInventory],
});

const started = performance.now();
let firstTokenAt: number | null = null;

const stream = await run(agent, "How many of SKU-991 are in stock?", {
  stream: true,
});

for await (const event of stream) {
  if (event.type === "raw_model_stream_event") {
    const e = event.data;
    if (e.type === "output_text_delta") {
      if (firstTokenAt === null) {
        firstTokenAt = performance.now() - started;
        console.log(`TTFT=${firstTokenAt.toFixed(0)}ms`);
      }
      process.stdout.write(e.delta);
    }
  } else if (event.type === "run_item_stream_event") {
    if (event.item.type === "tool_call_item") {
      console.log(`\n[tool_call] ${event.item.rawItem.name}`);
    }
  }
}

await stream.completed;
console.log(`\nfinal: ${stream.finalOutput}`);

The TS path is what we use behind our Next.js API routes — the agent stream is forwarded to the browser as Server-Sent Events.

LangChain / LangGraph — astream_events

LangChain has two streaming primitives. astream yields top-level chunks of the runnable; astream_events yields a typed event stream that also includes child runnables (tools, retrievers, sub-graphs). For agents you almost always want astream_events.

import asyncio

from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langchain_core.tools import tool

@tool
def lookup_inventory(sku: str) -> dict:
    """Get on-hand stock for a SKU."""
    return {"sku": sku, "in_stock": 42}

llm = ChatOpenAI(model="gpt-4o-2024-11-20", temperature=0)
agent = create_react_agent(llm, [lookup_inventory])

async def stream_run(question: str):
    async for ev in agent.astream_events(
        {"messages": [("user", question)]},
        version="v2",
    ):
        kind = ev["event"]
        name = ev.get("name", "")
        if kind == "on_chat_model_stream":
            chunk = ev["data"]["chunk"]
            if chunk.content:
                print(chunk.content, end="", flush=True)
        elif kind == "on_tool_start":
            print(f"\n[tool_start] {name} args={ev['data'].get('input')}")
        elif kind == "on_tool_end":
            print(f"\n[tool_end] {name} -> {ev['data'].get('output')}")
        elif kind == "on_chain_end" and name == "LangGraph":
            print("\n[graph done]")

If you are running a LangGraph state machine directly rather than the prebuilt ReAct agent, prefer graph.astream(..., stream_mode=["updates", "messages"]). updates gives you per-node state diffs; messages gives you token-level streams from every chat-model node, tagged with the node name. We use the dual-mode form on every supervisor graph because we need to know which sub-agent is producing each token for routing and metering.
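
A minimal sketch of the dual-mode form, reusing the prebuilt agent from the snippet above (it compiles to a LangGraph graph, so the same call works on a hand-built StateGraph). When stream_mode is a list, each yielded item is a (mode, payload) tuple; the print statements here are illustrative stand-ins, not our production router or metering code.

import asyncio

async def stream_dual_mode(question: str):
    async for mode, payload in agent.astream(
        {"messages": [("user", question)]},
        stream_mode=["updates", "messages"],
    ):
        if mode == "messages":
            # payload is (message_chunk, metadata); metadata names the producing node
            chunk, metadata = payload
            node = metadata.get("langgraph_node", "?")
            if chunk.content:
                print(f"[{node}] {chunk.content}", end="", flush=True)
        elif mode == "updates":
            # payload maps each node that just finished to its state diff
            for node, diff in payload.items():
                print(f"\n[update] {node} -> {list(diff or {})}")

asyncio.run(stream_dual_mode("How many of SKU-991 are in stock?"))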

Comparison Table — Streaming APIs

| Feature | OpenAI Agents SDK (Py/TS) | LangChain astream_events | LangGraph graph.astream |
|---|---|---|---|
| Token deltas | response.output_text.delta | on_chat_model_stream | stream_mode="messages" |
| Tool-call argument deltas | response.function_call_arguments.delta | on_chat_model_stream (within tool-call payload) | Same as above |
| Tool start/end events | run_item_stream_event w/ tool_call_item | on_tool_start, on_tool_end | updates mode shows tool-node updates |
| Multi-agent handoff event | agent_updated_stream_event | n/a (use sub-graph events) | Node-tagged messages |
| Reasoning deltas (o3 / gpt-5) | response.reasoning.delta | on_chat_model_stream w/ reasoning_content | Same |
| Backpressure | Async iterator (consumer paces) | Async iterator | Async iterator |
| Native trace integration | Agents SDK tracing | LangSmith | LangSmith |
| Best for | OpenAI-native, voice, single-vendor | Heterogeneous LLMs, RAG | Graph topologies, supervisors |

There is no universally "best" choice. We use Agents SDK on the realtime voice path (because of the Realtime integration) and LangGraph on the multi-step research and agentic RAG path.

Forwarding the Stream — SSE vs WebSockets

Two transports dominate.

Server-Sent Events (SSE) are HTTP/1.1-friendly, work through most corporate proxies, are unidirectional (server → client), and reconnect automatically via Last-Event-ID. SSE is the right default for a token-stream-only UX. The wire format is trivial:

event: token
data: {"delta": "Hel"}

event: token
data: {"delta": "lo"}

event: done
data: {"final": "Hello"}

WebSockets are full-duplex, which matters when the client needs to send mid-stream signals back — barge-in for voice, cancel buttons for long-running browser agents, partial user inputs while the agent is still talking. WebSockets are also what the OpenAI Realtime API uses on the wire, so if you are already in that pipeline you stay there.

A practical rule we follow: if the only message direction during a stream is server → client, use SSE; if the user can interrupt or steer mid-stream, use WebSockets.

Backpressure Is Real

Both SDKs use async iterators, which means the consumer paces the stream. If your consumer awaits a slow downstream — say, a TTS engine that processes ~80 tokens/s while OpenAI is shipping you ~120 tokens/s — the iterator will block, and the OpenAI client will buffer internally until the connection-level backpressure kicks in. We saw three classes of bug here:

  1. Silent buffering — tokens pile up in the SDK buffer, perceived latency on the client looks fine, but if the run is canceled mid-stream the in-flight tokens are still billed and still flow into traces. Fix: cancel the underlying response, not just stop iterating.
  2. Head-of-line blocking on slow tool execution — when a tool runs synchronously for 8 seconds, no further token deltas arrive even though the model is "done." Fix: make tools async and return placeholder structures for downstream streaming.
  3. Client-side reordering — SSE messages arriving over a flaky mobile network can interleave with cancel signals. Always include a monotonic sequence number per event and discard out-of-order on the client.
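
One pattern that keeps the pacing visible is to move buffering out of the SDK and into a bounded queue you own, so backpressure is explicit instead of silent. A minimal sketch: speak() is a hypothetical stand-in for a slow TTS call, and the agent and Runner are the ones from the Python example above.

import asyncio

async def produce(result, queue: asyncio.Queue) -> None:
    async for event in result.stream_events():
        if (
            event.type == "raw_response_event"
            and event.data.type == "response.output_text.delta"
        ):
            await queue.put(event.data.delta)  # blocks once the queue is full
    await queue.put(None)                      # end-of-stream sentinel

async def consume(queue: asyncio.Queue) -> None:
    while (delta := await queue.get()) is not None:
        await speak(delta)                     # hypothetical slow consumer (~80 tok/s)

async def run_with_tts(prompt: str) -> None:
    queue: asyncio.Queue = asyncio.Queue(maxsize=64)  # the bound is your buffering budget
    result = Runner.run_streamed(agent, input=prompt)
    await asyncio.gather(produce(result, queue), consume(queue))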

Measuring TTFT and Final-Token Latency

TTFT is "time from request submission to first output_text.delta event." Final-token latency is "time to last output_text.delta event before response.completed." The gap between them is the streaming window. For a 250-token answer at p50:

| Surface | TTFT p50 | TTFT p95 | Final-token p50 | Streaming window |
|---|---|---|---|---|
| Browser chat (gpt-4o-2024-11-20) | 410 ms | 880 ms | 3.8 s | 3.4 s |
| Browser chat (gpt-4.1-2025-04-14) | 360 ms | 760 ms | 3.1 s | 2.7 s |
| Voice agent (gpt-4o-realtime-preview-2025-06-03) | 280 ms | 540 ms | 2.6 s | 2.3 s |
| LangGraph supervisor (3 hops, gpt-4o) | 1.2 s | 2.4 s | 6.4 s | 5.2 s |

These are our production percentiles, measured from US-East. On the voice path, every 100 ms of TTFT shaved shows up measurably in conversation ratings. We log these as histograms and alert on p95 drift > 20% week-over-week.
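
A minimal sketch of how those numbers are derived from the raw stream, reusing the agent and Runner from the Python example above. The in-memory list stands in for whatever histogram backend you log to; it is illustrative, not our metering code.

import time

ttft_samples: list[float] = []

async def measure_stream(prompt: str) -> dict:
    started = time.perf_counter()
    first_token = last_token = None
    result = Runner.run_streamed(agent, input=prompt)
    async for event in result.stream_events():
        if (
            event.type == "raw_response_event"
            and event.data.type == "response.output_text.delta"
        ):
            now = time.perf_counter()
            if first_token is None:
                first_token = now
            last_token = now
    ttft = first_token - started if first_token else None
    final = last_token - started if last_token else None
    if ttft is not None:
        ttft_samples.append(ttft)  # feed your real histogram / alerting here
    return {
        "ttft_s": ttft,
        "final_token_s": final,
        "streaming_window_s": final - ttft if ttft and final else None,
    }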

Where This Lands at CallSphere

Our voice agents drive the most aggressive streaming requirements in our stack. The realtime voice path uses the OpenAI Realtime API for bidirectional audio streaming, and the higher-reasoning path uses the Agents SDK's Runner.run_streamed so that text begins shaping into TTS before the model has finished thinking. The chat agents are LangGraph-based and use graph.astream with dual-mode streams so the supervisor can route on partial answers. The eval pipeline (covered in the companion post on token-level eval) consumes those same event streams and emits stream-quality metrics in real time.

The mental model: streaming is not a transport detail. It is the agent's observability surface, its UX surface, and its tool-dispatch substrate, all in one. Build it deliberately.

Frequently Asked Questions

Should I use astream or astream_events on LangChain?

astream_events for agents, always. astream only sees the top-level runnable's outputs and misses tool calls, retrievals, and sub-graph events.

How do I stream tool-call arguments before the call is dispatched?

On the OpenAI Agents SDK, listen for response.function_call_arguments.delta events on the raw stream. You can begin parsing the partial JSON and react when a specific argument key (e.g. url) is fully present. The SDK will not invoke the tool until the call is complete; you can pre-warm work outside the SDK.
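
A minimal sketch of that pre-warm pattern: accumulate the argument fragments and fire as soon as one key's value is fully closed, even while later arguments are still streaming. The quick-and-dirty regex stands in for a real incremental JSON parser, and prewarm_navigation is a hypothetical hook outside the SDK; wire on_args_delta(data.delta) into the raw_response_event branch when data.type == "response.function_call_arguments.delta".

import re

buf = ""
dispatched = False

def on_args_delta(fragment: str) -> None:
    """Accumulate argument deltas; react once the "url" value is complete."""
    global buf, dispatched
    buf += fragment
    if dispatched:
        return
    match = re.search(r'"url"\s*:\s*"((?:[^"\\]|\\.)*)"', buf)
    if match:
        dispatched = True
        prewarm_navigation(match.group(1))  # hypothetical pre-warm, outside the SDK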

Can I stream from o3 or gpt-5 reasoning models?

Yes, with caveats. Reasoning deltas come through response.reasoning.delta events, but visible reasoning is throttled and may be summarized. Plan for noticeably longer TTFT on reasoning models — our gpt-5 TTFT p50 is ~1.8s vs 0.4s on gpt-4.1.

Do streamed runs cost differently from non-streamed?

No. Token billing is identical. The difference is wall-clock perception and the ability to cancel mid-flight, which is a real cost lever — we save ~6% on tokens monthly by canceling streams that the supervisor flags as off-track.

How do I trace a streamed run end-to-end?

LangSmith and the Agents SDK tracer both support streamed runs natively. The trace is finalized on response.completed, but partial traces are visible during the run for debugging. Pair this with the trace-to-fix workflow and you get the same observability story whether the run streamed or not.
