Streaming Agent Responses: Real-Time Output with run_streamed()
Build real-time agent interfaces with Runner.run_streamed(). Learn about stream events, WebSocket transport, persistent connections, and building streaming chat UIs.
Why Streaming Matters for Agent Applications
When an agent needs to reason through multiple tool calls before responding, the total latency can reach 10-30 seconds. Without streaming, the user stares at a loading spinner the entire time. With streaming, they see progress immediately — partial text appears as it is generated, tool calls are visible as they execute, and the experience feels responsive.
The OpenAI Agents SDK provides first-class streaming support through Runner.run_streamed(), which returns events in real-time as the agent loop executes.
Basic Streaming Setup
Runner.run_streamed() returns a RunResultStreaming object immediately. You then iterate over its events:
import asyncio

from agents import Agent, Runner
from openai.types.responses import ResponseTextDeltaEvent

agent = Agent(
    name="Explainer",
    instructions="Explain topics in detail with examples.",
)

async def main():
    result = Runner.run_streamed(agent, "Explain how TCP/IP works")
    async for event in result.stream_events():
        # Text deltas arrive as ResponseTextDeltaEvent; `delta` is a plain string
        if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
            print(event.data.delta, end="", flush=True)
    print()  # Final newline

asyncio.run(main())
The text appears word-by-word (or chunk-by-chunk) as the model generates it, providing immediate feedback to the user.
Stream Event Types
The stream_events() iterator yields events with different types:
raw_response_event
These are the lowest-level events, corresponding to chunks from the model's streaming response. They contain text deltas, tool call deltas, and other raw data:
from openai.types.responses import ResponseTextDeltaEvent

async for event in result.stream_events():
    if event.type == "raw_response_event":
        # Text content delta — `delta` is a plain string
        if isinstance(event.data, ResponseTextDeltaEvent):
            handle_text_chunk(event.data.delta)
run_item_stream_event
Higher-level events that represent complete items in the agent loop:
async for event in result.stream_events():
    if event.type == "run_item_stream_event":
        item = event.item
        if item.type == "tool_call_item":
            print(f"\n[Calling tool: {item.raw_item.name}]")
        elif item.type == "tool_call_output_item":
            print(f"\n[Tool returned: {item.output[:100]}]")
        elif item.type == "message_output_item":
            print("\n[Agent message]")
agent_updated_stream_event
Fired when the current agent changes during a handoff:
async for event in result.stream_events():
    if event.type == "agent_updated_stream_event":
        print(f"\n[Handed off to: {event.new_agent.name}]")
Building a Complete Streaming Handler
Here is a comprehensive streaming handler that processes all event types:
import asyncio

from agents import Agent, Runner, function_tool
from openai.types.responses import ResponseTextDeltaEvent

@function_tool
def search_docs(query: str) -> str:
    """Search documentation for relevant articles.

    Args:
        query: Search query.
    """
    return f"Found 3 articles about '{query}': [Article 1, Article 2, Article 3]"

agent = Agent(
    name="Doc Assistant",
    instructions="Help users find information in documentation. Use the search tool when needed.",
    tools=[search_docs],
)

async def stream_agent_response(user_input: str):
    result = Runner.run_streamed(agent, user_input)
    current_text = ""

    async for event in result.stream_events():
        if event.type == "raw_response_event":
            if isinstance(event.data, ResponseTextDeltaEvent):
                chunk = event.data.delta
                current_text += chunk
                print(chunk, end="", flush=True)
        elif event.type == "run_item_stream_event":
            item = event.item
            if item.type == "tool_call_item":
                print(f"\n >> Searching: {item.raw_item.name}...", flush=True)
            elif item.type == "tool_call_output_item":
                print(" >> Results received", flush=True)
        elif event.type == "agent_updated_stream_event":
            print(f"\n >> Transferring to {event.new_agent.name}...", flush=True)

    print()
    # Access the complete result after streaming
    return result.final_output

asyncio.run(stream_agent_response("How do I configure authentication?"))
WebSocket Transport
By default, the SDK uses HTTP with Server-Sent Events (SSE) for streaming. For lower latency and bidirectional communication, you can switch to WebSocket transport:
from agents import set_default_openai_responses_transport
# Switch to WebSocket transport globally
set_default_openai_responses_transport("websocket")
WebSocket transport benefits:
- Lower latency: No HTTP overhead per message
- Persistent connection: Reuses the same connection across multiple requests
- Bidirectional: Foundation for real-time interactive agents
Persistent WebSocket Sessions
For applications that make many sequential LLM calls (like agents with multiple tool-calling turns), persistent WebSocket sessions avoid the connection setup overhead:
from agents import Agent, Runner
from openai import AsyncOpenAI
from openai.types.responses import ResponseTextDeltaEvent

client = AsyncOpenAI()

async def run_with_persistent_websocket():
    agent = Agent(
        name="Fast Agent",
        instructions="Respond quickly using tools.",
        tools=[tool_a, tool_b, tool_c],
    )

    # Use a persistent WebSocket session
    async with client.responses.websocket_session() as session:
        result = Runner.run_streamed(
            agent,
            "Process this complex request",
        )
        async for event in result.stream_events():
            if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
                print(event.data.delta, end="", flush=True)
This is especially valuable when an agent makes 5-10 LLM calls in a single run (due to tool calls). Each subsequent call reuses the WebSocket connection instead of establishing a new HTTP connection.
Streaming with FastAPI
Here is how to integrate streaming into a FastAPI endpoint using Server-Sent Events:
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from agents import Agent, Runner
from openai.types.responses import ResponseTextDeltaEvent

app = FastAPI()

agent = Agent(
    name="Chat Agent",
    instructions="You are a helpful chat assistant.",
)

async def event_generator(user_input: str):
    result = Runner.run_streamed(agent, user_input)
    async for event in result.stream_events():
        if event.type == "raw_response_event":
            if isinstance(event.data, ResponseTextDeltaEvent):
                chunk = event.data.delta
                yield f"data: {json.dumps({'type': 'text', 'content': chunk})}\n\n"
        elif event.type == "run_item_stream_event":
            if event.item.type == "tool_call_item":
                yield f"data: {json.dumps({'type': 'tool_call', 'name': event.item.raw_item.name})}\n\n"
    yield f"data: {json.dumps({'type': 'done', 'final': result.final_output})}\n\n"

@app.post("/chat/stream")
async def chat_stream(request: dict):
    return StreamingResponse(
        event_generator(request["message"]),
        media_type="text/event-stream",
    )
Because the browser's EventSource API only supports GET requests, a POST endpoint like this one needs a fetch-based SSE reader on the frontend:
// Frontend JavaScript
const response = await fetch('/chat/stream', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ message: userInput }),
});

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  // Accumulate text so SSE messages split across chunks are not lost
  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split('\n');
  buffer = lines.pop(); // keep the last (possibly partial) line

  for (const line of lines) {
    if (line.startsWith('data: ')) {
      const data = JSON.parse(line.slice(6));
      if (data.type === 'text') {
        appendToChat(data.content);
      } else if (data.type === 'tool_call') {
        showToolIndicator(data.name);
      }
    }
  }
}
Streaming with Multi-Agent Handoffs
When agents hand off to each other, streaming shows the transition in real-time:
from agents import Agent, Runner
from openai.types.responses import ResponseTextDeltaEvent

support_agent = Agent(
    name="Support Agent",
    instructions="Handle general questions.",
)

billing_agent = Agent(
    name="Billing Agent",
    instructions="Handle billing questions.",
)

triage_agent = Agent(
    name="Triage Agent",
    instructions="Route to the appropriate agent.",
    handoffs=[support_agent, billing_agent],
)

async def stream_with_handoffs(user_input: str):
    result = Runner.run_streamed(triage_agent, user_input)
    current_agent = triage_agent.name

    async for event in result.stream_events():
        if event.type == "agent_updated_stream_event":
            current_agent = event.new_agent.name
            print(f"\n--- Transferred to {current_agent} ---\n")
        elif event.type == "raw_response_event":
            if isinstance(event.data, ResponseTextDeltaEvent):
                print(event.data.delta, end="", flush=True)

    print(f"\n\nFinal agent: {result.last_agent.name}")
Performance Considerations
Use WebSocket transport for high-frequency applications. If your agent makes many LLM calls per request, the connection reuse significantly reduces latency.
Buffer small chunks. In a web UI, updating the DOM for every single token can cause performance issues. Buffer chunks and update on a timer (every 50-100ms).
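The same idea works server-side: batch deltas before they hit the wire, so each SSE write carries a few words instead of one token. A minimal sketch — the `coalesce` helper below is illustrative, not part of the SDK:

```python
import asyncio
import time

async def coalesce(chunks, flush_interval=0.05, max_buffer=256):
    """Batch small text deltas so downstream consumers are not hit
    once per token. `chunks` is any async iterator of strings.

    Yields joined strings once `flush_interval` seconds have passed
    or `max_buffer` characters have accumulated, whichever is first.
    """
    buffer, size = [], 0
    last_flush = time.monotonic()
    async for chunk in chunks:
        buffer.append(chunk)
        size += len(chunk)
        now = time.monotonic()
        if size >= max_buffer or (now - last_flush) >= flush_interval:
            yield "".join(buffer)
            buffer, size, last_flush = [], 0, now
    if buffer:  # flush whatever remains at end of stream
        yield "".join(buffer)
```

Wrap the delta stream in `coalesce(...)` inside an SSE generator and yield one event per batch; the 50-100 ms window above is a starting point, not a hard rule.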
Handle backpressure. If your event consumer is slower than the stream producer, events can queue up in memory. Monitor memory usage in high-throughput scenarios.
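A bounded queue is one way to keep that memory in check: when the consumer lags, the producer blocks instead of queueing events without limit. A sketch with an illustrative `bounded_relay` helper (not an SDK API):

```python
import asyncio

async def bounded_relay(events, consumer, maxsize=100):
    """Decouple a fast event producer from a slow consumer.

    When the queue is full, `put` blocks, applying backpressure to
    the producer instead of growing memory unboundedly.
    """
    queue = asyncio.Queue(maxsize=maxsize)

    async def produce():
        async for event in events:
            await queue.put(event)  # blocks while the consumer lags
        await queue.put(None)       # sentinel: stream finished

    producer = asyncio.create_task(produce())
    while True:
        event = await queue.get()
        if event is None:
            break
        await consumer(event)
    await producer
```

Pass `result.stream_events()` as `events` and your UI/SSE writer as `consumer`; tune `maxsize` to how far ahead the model may run.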
Set timeouts on the stream. A stalled stream can hold connections open indefinitely. Implement a timeout that closes the stream if no events arrive within a reasonable window.
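One way to enforce this is a per-event timeout around the iterator, sketched below with a hypothetical `stream_with_timeout` wrapper built on `asyncio.wait_for`:

```python
import asyncio

async def stream_with_timeout(events, per_event_timeout=30.0):
    """Abort a stalled stream: raise if no event arrives within
    `per_event_timeout` seconds. `events` is any async iterator,
    e.g. result.stream_events().
    """
    it = events.__aiter__()
    while True:
        try:
            event = await asyncio.wait_for(it.__anext__(), per_event_timeout)
        except StopAsyncIteration:
            break  # stream finished normally
        except asyncio.TimeoutError:
            raise TimeoutError("stream stalled: no event within timeout")
        yield event
```

Iterating `stream_with_timeout(result.stream_events())` instead of the raw iterator guarantees a stalled run cannot hold the connection open forever.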
Test with slow connections. Streaming UIs behave differently on 3G vs fiber. Test with network throttling to ensure a good experience across connection speeds.
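You can approximate a slow link in tests by delaying each event before it reaches your handler; the `throttled` wrapper below is a quick illustrative way to do that without real network throttling:

```python
import asyncio
import random

async def throttled(events, min_delay=0.05, max_delay=0.4):
    """Simulate a poor connection by inserting a random delay before
    each event — useful for exercising streaming UI behavior.
    """
    async for event in events:
        await asyncio.sleep(random.uniform(min_delay, max_delay))
        yield event
```

Feed `throttled(result.stream_events())` to your handler in a test harness; browser devtools network throttling remains the better end-to-end check.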
Best Practices
Always handle all event types. Even if you only display text, log tool calls and handoffs for debugging.
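A small dispatcher makes this systematic — every event gets a log line even when the UI only renders text. The `log_event` helper below is a sketch, not an SDK API:

```python
import logging

logger = logging.getLogger("agent_stream")

def log_event(event) -> None:
    """Route every stream event to a log line, including types the
    UI ignores — invaluable when debugging tool calls and handoffs.
    """
    etype = getattr(event, "type", "unknown")
    if etype == "raw_response_event":
        logger.debug("raw chunk: %r", getattr(event, "data", None))
    elif etype == "run_item_stream_event":
        logger.info("item: %s", getattr(event.item, "type", "?"))
    elif etype == "agent_updated_stream_event":
        logger.info("handoff -> %s", event.new_agent.name)
    else:
        logger.warning("unhandled event type: %s", etype)
```

Call `log_event(event)` at the top of your streaming loop before any display logic, so the log captures the run even when rendering fails.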
Show progress indicators during tool calls. Users should know the agent is working, not stalled.
Provide a fallback for non-streaming clients. Not all clients support SSE or WebSocket. Offer a non-streaming endpoint as well.
Clean up resources. If the user disconnects mid-stream, ensure the streaming context is properly closed.
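When a client drops mid-stream, the ASGI server closes the response generator, which raises GeneratorExit inside it — so a try/finally around the event loop is enough to guarantee cleanup. A sketch with an illustrative `guarded_stream` wrapper:

```python
async def guarded_stream(events, cleanup):
    """Ensure `cleanup` runs even if the consumer disconnects
    mid-stream. Closing the generator raises GeneratorExit inside
    it; the finally block still executes.
    """
    try:
        async for event in events:
            yield event
    finally:
        cleanup()
```

In the FastAPI example, yielding from `guarded_stream(result.stream_events(), release_resources)` ensures `release_resources` (a hypothetical cleanup callback) runs whether the stream completes or the user navigates away.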
Source: OpenAI Agents SDK — Streaming
Written by
CallSphere Team