Running Agents: Runner.run(), run_sync(), and run_streamed() Explained
Master the three execution methods in the OpenAI Agents SDK. Learn when to use async run(), synchronous run_sync(), and streaming run_streamed() with practical code examples.
Three Ways to Run an Agent
The OpenAI Agents SDK provides three methods on the Runner class for executing agents. Each serves a different use case:
| Method | Async | Streaming | Best For |
|---|---|---|---|
| Runner.run() | Yes | No | Production web servers, async applications |
| Runner.run_sync() | No | No | Scripts, CLI tools, notebooks, quick prototyping |
| Runner.run_streamed() | Yes | Yes | Chat UIs, real-time output, long responses |
All three methods execute the same underlying agent loop — the difference is in how they return results to your code.
Runner.run() — The Async Workhorse
Runner.run() is the primary execution method. It is asynchronous, returning an awaitable that resolves to a RunResult when the agent loop completes:
```python
import asyncio

from agents import Agent, Runner

agent = Agent(
    name="Assistant",
    instructions="You are a helpful assistant.",
)

async def main():
    result = await Runner.run(
        agent,
        "Explain the difference between threads and processes.",
    )
    print(result.final_output)
    print(f"Agent that responded: {result.last_agent.name}")

asyncio.run(main())
```
When to Use run()
Use Runner.run() whenever you are in an async context:
- FastAPI / Starlette endpoints — These are natively async
- Background task workers — Celery with async support, arq, etc.
- Batch processing — Run multiple agents concurrently with asyncio.gather()
Concurrent Execution
Because run() is async, you can run multiple agents in parallel:
```python
import asyncio

from agents import Agent, Runner

summarizer = Agent(name="Summarizer", instructions="Summarize the given text in 2 sentences.")
translator = Agent(name="Translator", instructions="Translate the given text to French.")
critic = Agent(name="Critic", instructions="Identify logical flaws in the given text.")

async def process_text(text: str):
    # Start all three runs without awaiting, so they execute concurrently
    summarize_task = Runner.run(summarizer, text)
    translate_task = Runner.run(translator, text)
    critic_task = Runner.run(critic, text)

    results = await asyncio.gather(summarize_task, translate_task, critic_task)
    return {
        "summary": results[0].final_output,
        "french": results[1].final_output,
        "critique": results[2].final_output,
    }

asyncio.run(process_text("The quantum computer will solve all NP-hard problems by 2027."))
```
This sends three independent LLM requests simultaneously, significantly reducing total latency compared to sequential execution.
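The latency benefit is easy to verify with stand-in coroutines. In this sketch, fake_agent_call is a hypothetical stub that sleeps to simulate one LLM round trip; it is not part of the SDK, but the timing pattern is the same one asyncio.gather() gives you with real Runner.run() calls:

```python
import asyncio
import time

async def fake_agent_call(name: str, latency: float) -> str:
    # Stand-in for Runner.run(): sleep to simulate one LLM round trip
    await asyncio.sleep(latency)
    return f"{name} done"

async def compare() -> tuple:
    start = time.perf_counter()
    # Sequential: total time is roughly the sum of the three latencies
    for name in ("summarizer", "translator", "critic"):
        await fake_agent_call(name, 0.05)
    sequential = time.perf_counter() - start

    start = time.perf_counter()
    # Concurrent: total time is roughly the slowest single call
    await asyncio.gather(
        fake_agent_call("summarizer", 0.05),
        fake_agent_call("translator", 0.05),
        fake_agent_call("critic", 0.05),
    )
    concurrent = time.perf_counter() - start
    return sequential, concurrent

seq_time, conc_time = asyncio.run(compare())
print(f"sequential: {seq_time:.2f}s, concurrent: {conc_time:.2f}s")
```

The sequential pass takes about the sum of the three latencies, while the gathered pass takes about as long as the single slowest call.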
Runner.run_sync() — Synchronous Convenience
Runner.run_sync() is a synchronous wrapper around Runner.run(). It blocks the current thread until the agent loop completes:
```python
from agents import Agent, Runner

agent = Agent(
    name="Assistant",
    instructions="You are a helpful assistant.",
)

# No async/await needed
result = Runner.run_sync(agent, "What is the capital of Japan?")
print(result.final_output)
```
When to Use run_sync()
- Scripts and CLI tools — No need to set up an async event loop
- Jupyter notebooks — Avoids event loop conflicts
- Quick prototyping — Fastest way to test an agent
- Django views — If you are not using Django's async views
Important: Do not use run_sync() inside an existing async event loop (like a FastAPI endpoint). It will raise an error or deadlock because it tries to create its own event loop.
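The failure mode comes from Python itself, not the SDK: a thread cannot drive two event loops at once. This stdlib-only sketch reproduces it with blocking_call, a hypothetical stand-in for run_sync() that spins up its own loop via asyncio.run():

```python
import asyncio

async def work() -> str:
    await asyncio.sleep(0)
    return "done"

def blocking_call() -> str:
    # Stand-in for Runner.run_sync(): it creates and drives its own event loop
    return asyncio.run(work())

# Fine from plain synchronous code: no loop is running yet
print(blocking_call())

async def endpoint() -> bool:
    # Inside an async context a loop is already running on this thread,
    # so nesting a second asyncio.run() raises RuntimeError
    try:
        blocking_call()
        return False
    except RuntimeError:
        return True

print(asyncio.run(endpoint()))  # True: the nested call failed
```

From synchronous code the call succeeds; from inside a running coroutine it raises RuntimeError, which is why async endpoints should use await Runner.run() instead.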
Runner.run_streamed() — Real-Time Output
Runner.run_streamed() returns a RunResultStreaming object immediately, then streams events as the agent processes:
```python
import asyncio

from agents import Agent, Runner
from openai.types.responses import ResponseTextDeltaEvent

agent = Agent(
    name="Storyteller",
    instructions="Write engaging short stories.",
)

async def main():
    result = Runner.run_streamed(agent, "Write a story about a robot learning to paint.")

    async for event in result.stream_events():
        if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
            # event.data.delta is the incremental text chunk
            print(event.data.delta, end="", flush=True)

    print()  # Newline after streaming completes

    # The final result is still available after streaming
    final = result.final_output
    print(f"\nFull response length: {len(final)} characters")

asyncio.run(main())
```
Stream Event Types
The stream_events() async iterator yields events with a type field:
- raw_response_event — Raw chunks from the model response, including text deltas
- agent_updated_stream_event — Fired when the current agent changes (during handoffs)
- run_item_stream_event — Higher-level events for tool calls, messages, handoffs
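A consumer typically branches on the type field. The sketch below shows only that dispatch pattern; FakeEvent and fake_stream are stand-ins for the SDK's event objects and stream_events() iterator, not real SDK types:

```python
import asyncio
from collections import namedtuple

FakeEvent = namedtuple("FakeEvent", ["type", "data"])

async def fake_stream():
    # Stand-in for result.stream_events()
    yield FakeEvent("agent_updated_stream_event", "Storyteller")
    yield FakeEvent("raw_response_event", "Once upon ")
    yield FakeEvent("raw_response_event", "a time...")
    yield FakeEvent("run_item_stream_event", "message_output_created")

async def consume() -> list:
    log = []
    async for event in fake_stream():
        if event.type == "raw_response_event":
            # Text deltas: print incrementally as they arrive
            print(event.data, end="", flush=True)
        elif event.type == "agent_updated_stream_event":
            log.append(f"agent -> {event.data}")
        elif event.type == "run_item_stream_event":
            log.append(f"item: {event.data}")
    print()
    return log

event_log = asyncio.run(consume())
```

Raw deltas go straight to the display, while the higher-level events are useful for logging tool activity and handoffs.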
Building a Chat UI with Streaming
Here is a pattern for building an interactive chat loop with streaming:
```python
import asyncio

from agents import Agent, Runner
from openai.types.responses import ResponseTextDeltaEvent

agent = Agent(
    name="Chat Assistant",
    instructions="You are a friendly chat assistant. Keep responses concise.",
)

async def chat():
    conversation_history = []
    while True:
        user_input = input("\nYou: ")
        if user_input.lower() in ("quit", "exit"):
            break

        # Build input with conversation history
        conversation_history.append({
            "role": "user",
            "content": user_input,
        })

        print("Assistant: ", end="", flush=True)
        result = Runner.run_streamed(agent, conversation_history)

        async for event in result.stream_events():
            if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
                print(event.data.delta, end="", flush=True)
        print()

        # Add assistant response to history
        conversation_history.append({
            "role": "assistant",
            "content": result.final_output,
        })

asyncio.run(chat())
```
Input Types
All three runner methods accept flexible input types:
String Input
The simplest form — a single user message:
```python
result = await Runner.run(agent, "Hello, how are you?")
```
Message List Input
For multi-turn conversations or providing context:
```python
result = await Runner.run(agent, [
    {"role": "user", "content": "My name is Alice."},
    {"role": "assistant", "content": "Hello Alice! How can I help you today?"},
    {"role": "user", "content": "What is my name?"},
])
```
Continuing from a Previous Run
Call to_input_list() on a previous RunResult to continue the conversation with full context:

```python
result1 = await Runner.run(agent, "My favorite color is blue.")

# to_input_list() returns the original input plus every item generated during the run
new_input = result1.to_input_list() + [
    {"role": "user", "content": "What is my favorite color?"}
]
result2 = await Runner.run(agent, new_input)
# result2.final_output will reference "blue"
```
RunConfig: Controlling Execution
The RunConfig parameter lets you customize execution behavior:
```python
from agents import Agent, Runner, RunConfig

agent = Agent(name="Assistant", instructions="Be helpful.")

result = await Runner.run(
    agent,
    "Complex multi-step question here...",
    max_turns=10,  # Limit agent loop iterations (a direct argument, not a RunConfig field)
    run_config=RunConfig(
        tracing_disabled=False,            # Enable tracing (default)
        workflow_name="customer-support",  # Name for tracing
        trace_id="unique-trace-id",        # Custom trace ID
    ),
)
```
max_turns
The max_turns parameter is a safety mechanism that limits how many iterations the agent loop can execute. Each "turn" is one LLM call. If the limit is reached, the SDK raises MaxTurnsExceeded:
```python
from agents import Agent, Runner, MaxTurnsExceeded

agent = Agent(
    name="Research Agent",
    instructions="Research the topic thoroughly using all available tools.",
    tools=[search_tool, analyze_tool],  # tools assumed to be defined elsewhere
)

try:
    result = await Runner.run(agent, "Research quantum computing", max_turns=5)
except MaxTurnsExceeded:
    print("Agent exceeded the maximum number of turns. The task may be too complex.")
```
Set max_turns based on your use case:
- Simple Q&A: 2-3 turns
- Tool-using agents: 5-10 turns
- Complex research agents: 15-25 turns
- Never leave it unlimited in production
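Conceptually, the guard is just a counter wrapped around the agent loop. The following stdlib sketch illustrates that mechanic; the MaxTurnsExceeded class and fake_llm_step function here are illustrative stand-ins, not the SDK's internals:

```python
class MaxTurnsExceeded(Exception):
    """Raised when the loop exhausts its turn budget."""

def fake_llm_step(turn: int) -> dict:
    # Stand-in for one LLM call: this agent keeps requesting tools and never finishes
    return {"type": "tool_call", "turn": turn}

def run_loop(max_turns: int) -> dict:
    for turn in range(1, max_turns + 1):
        step = fake_llm_step(turn)
        if step["type"] == "final_output":
            return step  # Normal exit: the model produced a final answer
    raise MaxTurnsExceeded(f"no final output after {max_turns} turns")

try:
    run_loop(max_turns=5)
except MaxTurnsExceeded as exc:
    print(exc)
```

Because the stub never returns a final output, the loop burns its full budget and raises, which is exactly the runaway-agent scenario the limit is there to cap.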
The RunResult Object
Every run returns a RunResult (or RunResultStreaming for streamed runs) with these key properties:
```python
result = await Runner.run(agent, "Hello")

# The final text or structured output
output = result.final_output

# The agent that produced the final output
# (may differ from the starting agent if handoffs occurred)
last_agent = result.last_agent

# All items generated during the run: messages, tool calls, tool outputs, handoffs
items = result.new_items

# The raw input that started the run
original_input = result.input

# For structured outputs, get the typed result
typed_output = result.final_output_as(MyPydanticModel)
```
Best Practices
- Use run() in production, run_sync() only for scripts and testing.
- Always set max_turns to prevent runaway agent loops that burn through your API budget.
- Use streaming for user-facing applications. Waiting 10+ seconds for a response with no feedback is a poor user experience.
- Handle exceptions around all runner calls. Network errors, rate limits, and model errors can all occur.
- Pass conversation history as message lists for multi-turn chat rather than concatenating strings.
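For the exception-handling point, a small retry-with-backoff wrapper is often enough. This sketch uses flaky_run, a hypothetical stand-in for a Runner.run call that fails with transient errors before succeeding:

```python
import asyncio

attempts = {"count": 0}

async def flaky_run(prompt: str) -> str:
    # Stand-in for Runner.run(): fails twice with a transient error, then succeeds
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("transient network error")
    return f"answer to: {prompt}"

async def run_with_retries(prompt: str, retries: int = 3, backoff: float = 0.01) -> str:
    for attempt in range(1, retries + 1):
        try:
            return await flaky_run(prompt)
        except ConnectionError:
            if attempt == retries:
                raise  # Out of retries: surface the error to the caller
            # Exponential backoff between attempts
            await asyncio.sleep(backoff * 2 ** (attempt - 1))
    raise RuntimeError("unreachable")

answer = asyncio.run(run_with_retries("Hello"))
print(answer)
```

In production you would catch the SDK's and network stack's actual exception types rather than a bare ConnectionError, and cap both retries and total wall-clock time.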
Written by
CallSphere Team
Expert insights on AI voice agents and customer communication automation.