---
title: "Streaming Agent Responses: Real-Time Output with run_streamed()"
description: "Build real-time agent interfaces with Runner.run_streamed(). Learn about stream events, WebSocket transport, persistent connections, and building streaming chat UIs."
canonical: https://callsphere.ai/blog/openai-agents-sdk-streaming-responses-real-time-output-websocket
category: "Learn Agentic AI"
tags: ["OpenAI", "Streaming", "WebSocket", "Real-Time", "Python"]
author: "CallSphere Team"
published: 2026-03-14T00:00:00.000Z
updated: 2026-05-06T20:51:56.409Z
---

# Streaming Agent Responses: Real-Time Output with run_streamed()

> Build real-time agent interfaces with Runner.run_streamed(). Learn about stream events, WebSocket transport, persistent connections, and building streaming chat UIs.

## Why Streaming Matters for Agent Applications

When an agent needs to reason through multiple tool calls before responding, the total latency can reach 10-30 seconds. Without streaming, the user stares at a loading spinner the entire time. With streaming, they see progress immediately — partial text appears as it is generated, tool calls are visible as they execute, and the experience feels responsive.

The OpenAI Agents SDK provides first-class streaming support through `Runner.run_streamed()`, which returns events in real-time as the agent loop executes.

## Basic Streaming Setup

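`Runner.run_streamed()` returns a `RunResultStreaming` object immediately, without waiting for the run to finish. You then iterate over its events with `stream_events()`. The diagram shows the agent loop that produces those events, and the code after it prints text deltas as they arrive: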

```mermaid
flowchart LR
    INPUT(["User input"])
    AGENT["Agent
name plus instructions"]
    HAND{"Handoff to
another agent?"}
    SUB["Sub-agent
specialist"]
    GUARD{"Guardrail
passed?"}
    TOOL["Tool call"]
    SDK[("Tracing
OpenAI dashboard")]
    OUT(["Final output"])
    INPUT --> AGENT --> HAND
    HAND -->|Yes| SUB --> GUARD
    HAND -->|No| GUARD
    GUARD -->|Yes| TOOL --> AGENT
    GUARD -->|Block| OUT
    AGENT --> OUT
    AGENT --> SDK
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style SDK fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
```

```python
import asyncio
from agents import Agent, Runner
from openai.types.responses import ResponseTextDeltaEvent

agent = Agent(
    name="Explainer",
    instructions="Explain topics in detail with examples.",
)

async def main():
    result = Runner.run_streamed(agent, "Explain how TCP/IP works")

    async for event in result.stream_events():
        # Text deltas arrive as ResponseTextDeltaEvent; `delta` is the new text itself (a str)
        if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
            print(event.data.delta, end="", flush=True)

    print()  # Final newline

asyncio.run(main())
```

The text appears word-by-word (or chunk-by-chunk) as the model generates it, providing immediate feedback to the user.

## Stream Event Types

The `stream_events()` iterator yields three kinds of events, distinguished by `event.type`:

### raw_response_event

These are the lowest-level events, corresponding to chunks from the model's streaming response. They contain text deltas, tool call deltas, and other raw data:

```python
from openai.types.responses import ResponseTextDeltaEvent

async for event in result.stream_events():
    if event.type == "raw_response_event":
        data = event.data
        # Text content delta: `data.delta` is a plain string
        if isinstance(data, ResponseTextDeltaEvent):
            handle_text_chunk(data.delta)
```

### run_item_stream_event

Higher-level events that represent complete items in the agent loop:

```python
async for event in result.stream_events():
    if event.type == "run_item_stream_event":
        item = event.item

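        if item.type == "tool_call_item":
            # Not every tool call type exposes a name, so fall back gracefully
            tool_name = getattr(item.raw_item, "name", "unknown")
            print(f"\n[Calling tool: {tool_name}]")

        elif item.type == "tool_call_output_item":
            # The output is not guaranteed to be a string, so convert before slicing
            print(f"\n[Tool returned: {str(item.output)[:100]}]")

        elif item.type == "message_output_item":
            print("\n[Agent message]")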
```

### agent_updated_stream_event

Fired when the current agent changes during a handoff:

```python
async for event in result.stream_events():
    if event.type == "agent_updated_stream_event":
        print(f"\n[Handed off to: {event.new_agent.name}]")
```
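
The three event types can be folded into one small dispatch function that turns each event into a plain dict for a UI layer. The following is a minimal sketch, not SDK code: `to_ui_message` is a hypothetical helper, the events are `SimpleNamespace` stand-ins so the example runs without an API key, and text deltas are recognized by the `response.output_text.delta` type string on `event.data`.

```python
from types import SimpleNamespace
from typing import Any, Optional


def to_ui_message(event: Any) -> Optional[dict]:
    """Map a stream event to a small dict for the UI, or None to skip it."""
    if event.type == "raw_response_event":
        data = event.data
        # Only text deltas are surfaced; all other raw events are ignored.
        if getattr(data, "type", None) == "response.output_text.delta":
            return {"type": "text", "content": data.delta}
        return None
    if event.type == "run_item_stream_event":
        item = event.item
        if item.type == "tool_call_item":
            return {"type": "tool_call", "name": getattr(item.raw_item, "name", "unknown")}
        if item.type == "tool_call_output_item":
            return {"type": "tool_output", "preview": str(item.output)[:100]}
        return None
    if event.type == "agent_updated_stream_event":
        return {"type": "handoff", "agent": event.new_agent.name}
    return None


# Stand-in events shaped like the ones described above (not real SDK objects).
demo_events = [
    SimpleNamespace(type="agent_updated_stream_event",
                    new_agent=SimpleNamespace(name="Billing Agent")),
    SimpleNamespace(type="raw_response_event",
                    data=SimpleNamespace(type="response.output_text.delta", delta="Hi")),
    SimpleNamespace(type="run_item_stream_event",
                    item=SimpleNamespace(type="tool_call_item",
                                         raw_item=SimpleNamespace(name="search_docs"))),
]

for ev in demo_events:
    print(to_ui_message(ev))
```

Keeping the mapping in a pure function like this makes it easy to unit test and to reuse across a CLI, an SSE endpoint, or a WebSocket handler.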

## Building a Complete Streaming Handler

Here is a comprehensive streaming handler that processes all event types:

```python
import asyncio
from agents import Agent, Runner, function_tool
from openai.types.responses import ResponseTextDeltaEvent

@function_tool
def search_docs(query: str) -> str:
    """Search documentation for relevant articles.

    Args:
        query: Search query.
    """
    return f"Found 3 articles about '{query}': [Article 1, Article 2, Article 3]"

agent = Agent(
    name="Doc Assistant",
    instructions="Help users find information in documentation. Use the search tool when needed.",
    tools=[search_docs],
)

async def stream_agent_response(user_input: str):
    result = Runner.run_streamed(agent, user_input)

    current_text = ""

    async for event in result.stream_events():
        if event.type == "raw_response_event":
            # Text deltas carry the new text directly in `delta`
            if isinstance(event.data, ResponseTextDeltaEvent):
                chunk = event.data.delta
                current_text += chunk
                print(chunk, end="", flush=True)

        elif event.type == "run_item_stream_event":
            item = event.item
            if item.type == "tool_call_item":
                tool_name = getattr(item.raw_item, "name", "unknown")
                print(f"\n  >> Calling tool: {tool_name}...", flush=True)
            elif item.type == "tool_call_output_item":
                print("  >> Results received", flush=True)

        elif event.type == "agent_updated_stream_event":
            print(f"\n  >> Transferring to {event.new_agent.name}...", flush=True)

    print()

    # Access the complete result after streaming
    final_output = result.final_output
    return final_output

asyncio.run(stream_agent_response("How do I configure authentication?"))
```

## WebSocket Transport

By default, the SDK uses HTTP with Server-Sent Events (SSE) for streaming. For lower latency and bidirectional communication, you can switch to WebSocket transport:

```python
from agents import set_default_openai_responses_transport

# Switch to WebSocket transport globally
set_default_openai_responses_transport("websocket")
```

WebSocket transport benefits:

- **Lower latency**: Avoids setting up a new HTTP request for every model call
- **Persistent connection**: Reuses the same connection across multiple requests
- **Bidirectional**: Foundation for real-time interactive agents

### Persistent WebSocket Sessions

For applications that make many sequential LLM calls (like agents with multiple tool-calling turns), persistent WebSocket sessions avoid the connection setup overhead:

```python
from agents import Agent, function_tool, responses_websocket_session
from openai.types.responses import ResponseTextDeltaEvent

@function_tool
def lookup_order(order_id: str) -> str:
    """Placeholder tool for illustration; replace with your own tools."""
    return f"Order {order_id}: shipped"

async def run_with_persistent_websocket():
    agent = Agent(
        name="Fast Agent",
        instructions="Respond quickly using tools.",
        tools=[lookup_order],
    )

    # Runs started through `ws` share one WebSocket connection.
    # The helper name follows recent SDK releases; verify it against your installed version.
    async with responses_websocket_session() as ws:
        result = ws.run_streamed(agent, "Process this complex request")

        async for event in result.stream_events():
            if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
                print(event.data.delta, end="", flush=True)
```

This is especially valuable when an agent makes 5-10 LLM calls in a single run (due to tool calls). Each subsequent call reuses the WebSocket connection instead of establishing a new HTTP connection.

## Streaming with FastAPI

Here is how to integrate streaming into a FastAPI endpoint using Server-Sent Events:

```python
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai.types.responses import ResponseTextDeltaEvent
from pydantic import BaseModel

from agents import Agent, Runner

app = FastAPI()

agent = Agent(
    name="Chat Agent",
    instructions="You are a helpful chat assistant.",
)

class ChatRequest(BaseModel):
    """Request body; validation rejects payloads without a `message` field."""
    message: str

async def event_generator(user_input: str):
    result = Runner.run_streamed(agent, user_input)

    async for event in result.stream_events():
        if event.type == "raw_response_event":
            # Forward each text delta as one SSE frame
            if isinstance(event.data, ResponseTextDeltaEvent):
                chunk = event.data.delta
                yield f"data: {json.dumps({'type': 'text', 'content': chunk})}\n\n"

        elif event.type == "run_item_stream_event":
            if event.item.type == "tool_call_item":
                tool_name = getattr(event.item.raw_item, "name", "unknown")
                yield f"data: {json.dumps({'type': 'tool_call', 'name': tool_name})}\n\n"

    # The final output is available once the stream has been fully consumed
    yield f"data: {json.dumps({'type': 'done', 'final': str(result.final_output)})}\n\n"

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    return StreamingResponse(
        event_generator(request.message),
        media_type="text/event-stream",
    )
```
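
The frame format used above (`data: <json>` followed by a blank line) is easy to get subtly wrong when written inline. As a minimal sketch, the framing can be pulled into one helper; `format_sse` is a hypothetical name, not part of FastAPI or the Agents SDK.

```python
import json
from typing import Optional


def format_sse(payload: dict, event: Optional[str] = None) -> str:
    """Serialize one payload as a Server-Sent Events frame."""
    lines = []
    if event is not None:
        # Optional named event, dispatched separately by SSE clients.
        lines.append(f"event: {event}")
    # json.dumps escapes newlines, so the payload always fits on a single data line.
    lines.append(f"data: {json.dumps(payload)}")
    # A blank line terminates the frame.
    return "\n".join(lines) + "\n\n"


print(format_sse({"type": "text", "content": "line one\nline two"}), end="")
```

Because the JSON encoder escapes embedded newlines, model output containing line breaks cannot accidentally terminate a frame early.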

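Because this endpoint accepts a POST body, the browser's `EventSource` API (which only issues GET requests) cannot consume it. The frontend reads the stream with a fetch-based SSE reader instead: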

```javascript
// Frontend JavaScript
const response = await fetch('/chat/stream', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ message: userInput }),
});

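const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  // A network chunk can end mid-event, so buffer until a blank line closes the event
  buffer += decoder.decode(value, { stream: true });
  const events = buffer.split('\n\n');
  buffer = events.pop(); // keep the trailing partial event for the next read

  for (const event of events) {
    for (const line of event.split('\n')) {
      if (line.startsWith('data: ')) {
        const data = JSON.parse(line.slice(6));
        if (data.type === 'text') {
          appendToChat(data.content);
        } else if (data.type === 'tool_call') {
          showToolIndicator(data.name);
        }
      }
    }
  }
}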
```
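
A Python version of the same reader is handy for testing the endpoint without a browser. The sketch below is an incremental parser that buffers text until a blank line completes a frame, so it tolerates network chunks that split a frame at any point. It only understands the `data: <json>` lines emitted by the endpoint in this article, not the full SSE specification, and `parse_sse` is a hypothetical name.

```python
import json
from typing import Iterable, Iterator


def parse_sse(chunks: Iterable[str]) -> Iterator[dict]:
    """Yield decoded JSON payloads from text chunks that may split frames anywhere."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        # A blank line terminates each SSE frame; anything after it stays buffered.
        while "\n\n" in buffer:
            frame, buffer = buffer.split("\n\n", 1)
            for line in frame.split("\n"):
                if line.startswith("data: "):
                    yield json.loads(line[len("data: "):])


# Chunks deliberately cut in the middle of a frame.
chunks = [
    'data: {"type": "te',
    'xt", "content": "Hi"}\n',
    '\ndata: {"type": "done"}\n\n',
]
for message in parse_sse(chunks):
    print(message)
```

In a real test you would feed it the decoded text chunks from a streaming HTTP client rather than a hard-coded list.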

## Streaming with Multi-Agent Handoffs

When agents hand off to each other, streaming shows the transition in real-time:

```python
from agents import Agent, Runner
from openai.types.responses import ResponseTextDeltaEvent

support_agent = Agent(
    name="Support Agent",
    instructions="Handle general questions.",
)

billing_agent = Agent(
    name="Billing Agent",
    instructions="Handle billing questions.",
)

triage_agent = Agent(
    name="Triage Agent",
    instructions="Route to the appropriate agent.",
    handoffs=[support_agent, billing_agent],
)

async def stream_with_handoffs(user_input: str):
    result = Runner.run_streamed(triage_agent, user_input)
    current_agent = triage_agent.name

    async for event in result.stream_events():
        if event.type == "agent_updated_stream_event":
            current_agent = event.new_agent.name
            print(f"\n--- Transferred to {current_agent} ---\n")

        elif event.type == "raw_response_event":
            # Print text deltas from whichever agent is currently active
            if isinstance(event.data, ResponseTextDeltaEvent):
                print(event.data.delta, end="", flush=True)

    print(f"\n\nFinal agent: {result.last_agent.name}")
```

## Performance Considerations

1. **Use WebSocket transport for high-frequency applications.** If your agent makes many LLM calls per request, the connection reuse significantly reduces latency.
2. **Buffer small chunks.** In a web UI, updating the DOM for every single token can cause performance issues. Buffer chunks and update on a timer (every 50-100ms).
3. **Handle backpressure.** If your event consumer is slower than the stream producer, events can queue up in memory. Monitor memory usage in high-throughput scenarios.
4. **Set timeouts on the stream.** A stalled stream can hold connections open indefinitely. Implement a timeout that closes the stream if no events arrive within a reasonable window.
5. **Test with slow connections.** Streaming UIs behave differently on 3G vs fiber. Test with network throttling to ensure a good experience across connection speeds.
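
Points 2 and 4 can be sketched as two small async generators that wrap any stream of text chunks. This is an illustration under stated assumptions, not SDK functionality: `batch_chunks` and `with_idle_timeout` are hypothetical names, and they operate on plain strings so you would first extract text deltas from the agent events.

```python
import asyncio
import time
from typing import AsyncIterator


async def batch_chunks(source: AsyncIterator[str], max_delay: float = 0.05) -> AsyncIterator[str]:
    """Coalesce small chunks, flushing once max_delay seconds have passed since the last flush."""
    buffer = []
    last_flush = time.monotonic()
    async for chunk in source:
        buffer.append(chunk)
        now = time.monotonic()
        if now - last_flush >= max_delay:
            yield "".join(buffer)
            buffer.clear()
            last_flush = now
    # Flush whatever is left when the stream ends.
    if buffer:
        yield "".join(buffer)


async def with_idle_timeout(source: AsyncIterator[str], idle_seconds: float) -> AsyncIterator[str]:
    """Relay items, raising asyncio.TimeoutError if no item arrives within idle_seconds."""
    iterator = source.__aiter__()
    while True:
        try:
            item = await asyncio.wait_for(iterator.__anext__(), timeout=idle_seconds)
        except StopAsyncIteration:
            return
        yield item
```

Note the simplification in `batch_chunks`: it only flushes when a new chunk arrives or the stream ends, so text can sit in the buffer during a pause. A production version would likely flush from a timer as well. A caller might combine the two as `batch_chunks(with_idle_timeout(chunks, 30))` and catch `asyncio.TimeoutError` to close the response.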

## Best Practices

1. **Always handle all event types.** Even if you only display text, log tool calls and handoffs for debugging.
2. **Show progress indicators during tool calls.** Users should know the agent is working, not stalled.
3. **Provide a fallback for non-streaming clients.** Not all clients support SSE or WebSocket. Offer a non-streaming endpoint as well.
4. **Clean up resources.** If the user disconnects mid-stream, ensure the streaming context is properly closed.
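
For point 4, one common pattern is to wrap the event loop in `try`/`finally` so the run is cancelled whenever the consumer stops reading early. The sketch below uses a stand-in class so it runs without the SDK. It assumes the real streaming result exposes `cancel()` and `is_complete` in the same way, which you should confirm against your installed version. `guarded_stream` is a hypothetical helper.

```python
import asyncio


class FakeStreamingResult:
    """Stand-in for the streaming result; only the members used below are modelled."""

    def __init__(self, chunks):
        self._chunks = chunks
        self.is_complete = False
        self.cancelled = False

    async def stream_events(self):
        for chunk in self._chunks:
            await asyncio.sleep(0)  # simulate waiting on the network
            yield chunk
        self.is_complete = True

    def cancel(self):
        self.cancelled = True


async def guarded_stream(result):
    """Relay events and cancel the run if the consumer goes away early."""
    try:
        async for event in result.stream_events():
            yield event
    finally:
        # Runs on normal completion, on errors, and when the generator is closed early.
        if not result.is_complete:
            result.cancel()


async def demo():
    result = FakeStreamingResult(["a", "b", "c"])
    stream = guarded_stream(result)
    first = await stream.__anext__()
    await stream.aclose()  # simulates the client disconnecting mid-stream
    return first, result.cancelled


print(asyncio.run(demo()))
```

In a web framework, closing the response generator on disconnect is what triggers the `finally` block, so the cleanup lives next to the streaming loop rather than in a separate disconnect handler.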

---

**Source:** [OpenAI Agents SDK — Streaming](https://openai.github.io/openai-agents-python/running_agents/#streaming)

