
Streaming Agent Responses: Real-Time Output with run_streamed()

Build real-time agent interfaces with Runner.run_streamed(). Learn about stream events, WebSocket transport, persistent connections, and building streaming chat UIs.

Why Streaming Matters for Agent Applications

When an agent needs to reason through multiple tool calls before responding, the total latency can reach 10-30 seconds. Without streaming, the user stares at a loading spinner the entire time. With streaming, they see progress immediately — partial text appears as it is generated, tool calls are visible as they execute, and the experience feels responsive.

The OpenAI Agents SDK provides first-class streaming support through Runner.run_streamed(), which returns events in real-time as the agent loop executes.

Basic Streaming Setup

Runner.run_streamed() returns a RunResultStreaming object immediately. You then iterate over its events:

import asyncio
from agents import Agent, Runner

agent = Agent(
    name="Explainer",
    instructions="Explain topics in detail with examples.",
)

async def main():
    result = Runner.run_streamed(agent, "Explain how TCP/IP works")

    async for event in result.stream_events():
        # Text deltas arrive as raw response events; event.data.delta is the text chunk
        if event.type == "raw_response_event" and event.data.type == "response.output_text.delta":
            print(event.data.delta, end="", flush=True)

    print()  # Final newline

asyncio.run(main())

The text appears word-by-word (or chunk-by-chunk) as the model generates it, providing immediate feedback to the user.

Stream Event Types

The stream_events() iterator yields events with different types:

raw_response_event

These are the lowest-level events, corresponding to chunks from the model's streaming response. They contain text deltas, tool call deltas, and other raw data:

async for event in result.stream_events():
    if event.type == "raw_response_event":
        data = event.data
        # Text content delta from the model's streaming response
        if data.type == "response.output_text.delta":
            handle_text_chunk(data.delta)

run_item_stream_event

Higher-level events that represent complete items in the agent loop:

async for event in result.stream_events():
    if event.type == "run_item_stream_event":
        item = event.item

        if item.type == "tool_call_item":
            print(f"\n[Calling tool: {item.raw_item.name}]")

        elif item.type == "tool_call_output_item":
            print(f"\n[Tool returned: {item.output[:100]}]")

        elif item.type == "message_output_item":
            print("\n[Agent message]")

agent_updated_stream_event

Fired when the current agent changes during a handoff:

async for event in result.stream_events():
    if event.type == "agent_updated_stream_event":
        print(f"\n[Handed off to: {event.new_agent.name}]")

Building a Complete Streaming Handler

Here is a comprehensive streaming handler that processes all event types:

import asyncio
from agents import Agent, Runner, function_tool

@function_tool
def search_docs(query: str) -> str:
    """Search documentation for relevant articles.

    Args:
        query: Search query.
    """
    return f"Found 3 articles about '{query}': [Article 1, Article 2, Article 3]"

agent = Agent(
    name="Doc Assistant",
    instructions="Help users find information in documentation. Use the search tool when needed.",
    tools=[search_docs],
)

async def stream_agent_response(user_input: str):
    result = Runner.run_streamed(agent, user_input)

    current_text = ""

    async for event in result.stream_events():
        if event.type == "raw_response_event":
            if event.data.type == "response.output_text.delta":
                chunk = event.data.delta
                current_text += chunk
                print(chunk, end="", flush=True)

        elif event.type == "run_item_stream_event":
            item = event.item
            if item.type == "tool_call_item":
                print(f"\n  >> Searching: {item.raw_item.name}...", flush=True)
            elif item.type == "tool_call_output_item":
                print("  >> Results received", flush=True)

        elif event.type == "agent_updated_stream_event":
            print(f"\n  >> Transferring to {event.new_agent.name}...", flush=True)

    print()

    # Access the complete result after streaming
    final_output = result.final_output
    return final_output

asyncio.run(stream_agent_response("How do I configure authentication?"))

WebSocket Transport

By default, the SDK uses HTTP with Server-Sent Events (SSE) for streaming. For lower latency and bidirectional communication, you can switch to WebSocket transport:

from agents import set_default_openai_responses_transport

# Switch to WebSocket transport globally
set_default_openai_responses_transport("websocket")

WebSocket transport benefits:

  • Lower latency: No HTTP overhead per message
  • Persistent connection: Reuses the same connection across multiple requests
  • Bidirectional: Foundation for real-time interactive agents

Persistent WebSocket Sessions

For applications that make many sequential LLM calls (like agents with multiple tool-calling turns), persistent WebSocket sessions avoid the connection setup overhead:

from agents import Agent, Runner
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def run_with_persistent_websocket():
    agent = Agent(
        name="Fast Agent",
        instructions="Respond quickly using tools.",
        tools=[tool_a, tool_b, tool_c],  # placeholder tools, assumed defined elsewhere
    )

    # Use a persistent WebSocket session
    async with client.responses.websocket_session() as session:
        result = Runner.run_streamed(
            agent,
            "Process this complex request",
        )

        async for event in result.stream_events():
            if event.type == "raw_response_event" and event.data.type == "response.output_text.delta":
                print(event.data.delta, end="", flush=True)

This is especially valuable when an agent makes 5-10 LLM calls in a single run (due to tool calls). Each subsequent call reuses the WebSocket connection instead of establishing a new HTTP connection.

Streaming with FastAPI

Here is how to integrate streaming into a FastAPI endpoint using Server-Sent Events:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from agents import Agent, Runner
import json

app = FastAPI()

agent = Agent(
    name="Chat Agent",
    instructions="You are a helpful chat assistant.",
)

async def event_generator(user_input: str):
    result = Runner.run_streamed(agent, user_input)

    async for event in result.stream_events():
        if event.type == "raw_response_event":
            if event.data.type == "response.output_text.delta":
                chunk = event.data.delta
                yield f"data: {json.dumps({'type': 'text', 'content': chunk})}\n\n"

        elif event.type == "run_item_stream_event":
            if event.item.type == "tool_call_item":
                yield f"data: {json.dumps({'type': 'tool_call', 'name': event.item.raw_item.name})}\n\n"

    yield f"data: {json.dumps({'type': 'done', 'final': result.final_output})}\n\n"

@app.post("/chat/stream")
async def chat_stream(request: dict):
    return StreamingResponse(
        event_generator(request["message"]),
        media_type="text/event-stream",
    )

The frontend consumes this with a fetch-based SSE reader (the native EventSource API only supports GET requests, while this endpoint uses POST):

// Frontend JavaScript
const response = await fetch('/chat/stream', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ message: userInput }),
});

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split('\n');
  buffer = lines.pop();  // keep any partial trailing line for the next chunk

  for (const line of lines) {
    if (line.startsWith('data: ')) {
      const data = JSON.parse(line.slice(6));
      if (data.type === 'text') {
        appendToChat(data.content);
      } else if (data.type === 'tool_call') {
        showToolIndicator(data.name);
      }
    }
  }
}
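When testing the endpoint outside a browser, the same parsing logic can be sketched in Python. The `parse_sse_chunk` helper below is illustrative, not part of any library; like the frontend loop, it carries a partial trailing line between reads, since SSE events can split across network chunks:

```python
import json

def parse_sse_chunk(buffer: str, chunk: str):
    """Accumulate raw SSE text and return (new_buffer, events).

    Lines may split across network reads, so the possibly incomplete
    trailing line is carried over in the returned buffer.
    """
    buffer += chunk
    lines = buffer.split("\n")
    buffer = lines.pop()  # possibly incomplete last line
    events = []
    for line in lines:
        if line.startswith("data: "):
            events.append(json.loads(line[len("data: "):]))
    return buffer, events
```

Feeding two chunks that split a `data:` line mid-JSON yields the complete event only once the second chunk arrives.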

Streaming with Multi-Agent Handoffs

When agents hand off to each other, streaming shows the transition in real-time:

support_agent = Agent(
    name="Support Agent",
    instructions="Handle general questions.",
)

billing_agent = Agent(
    name="Billing Agent",
    instructions="Handle billing questions.",
)

triage_agent = Agent(
    name="Triage Agent",
    instructions="Route to the appropriate agent.",
    handoffs=[support_agent, billing_agent],
)

async def stream_with_handoffs(user_input: str):
    result = Runner.run_streamed(triage_agent, user_input)
    current_agent = triage_agent.name

    async for event in result.stream_events():
        if event.type == "agent_updated_stream_event":
            current_agent = event.new_agent.name
            print(f"\n--- Transferred to {current_agent} ---\n")

        elif event.type == "raw_response_event":
            if event.data.type == "response.output_text.delta":
                print(event.data.delta, end="", flush=True)

    print(f"\n\nFinal agent: {result.last_agent.name}")

Performance Considerations

  1. Use WebSocket transport for high-frequency applications. If your agent makes many LLM calls per request, the connection reuse significantly reduces latency.

  2. Buffer small chunks. In a web UI, updating the DOM for every single token can cause performance issues. Buffer chunks and update on a timer (every 50-100ms).

  3. Handle backpressure. If your event consumer is slower than the stream producer, events can queue up in memory. Monitor memory usage in high-throughput scenarios.

  4. Set timeouts on the stream. A stalled stream can hold connections open indefinitely. Implement a timeout that closes the stream if no events arrive within a reasonable window.

  5. Test with slow connections. Streaming UIs behave differently on 3G vs fiber. Test with network throttling to ensure a good experience across connection speeds.
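Point 4 can be implemented as a small wrapper around any async event iterator. This is a minimal sketch, not an SDK API; `iter_with_timeout` and its parameter names are illustrative, and in practice you would wrap `result.stream_events()` with it:

```python
import asyncio

async def iter_with_timeout(stream, per_event_timeout: float):
    """Yield events from an async iterator, raising asyncio.TimeoutError
    if no event arrives within per_event_timeout seconds."""
    it = stream.__aiter__()
    while True:
        try:
            # wait_for cancels the pending read and raises on timeout
            event = await asyncio.wait_for(it.__anext__(), per_event_timeout)
        except StopAsyncIteration:
            return
        yield event
```

Usage would look like `async for event in iter_with_timeout(result.stream_events(), 30.0): ...`, with the caller catching the timeout and closing the connection.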

Best Practices

  1. Always handle all event types. Even if you only display text, log tool calls and handoffs for debugging.

  2. Show progress indicators during tool calls. Users should know the agent is working, not stalled.

  3. Provide a fallback for non-streaming clients. Not all clients support SSE or WebSocket. Offer a non-streaming endpoint as well.

  4. Clean up resources. If the user disconnects mid-stream, ensure the streaming context is properly closed.


Source: OpenAI Agents SDK — Streaming
