
Running Agents: Runner.run(), run_sync(), and run_streamed() Explained

Master the three execution methods in the OpenAI Agents SDK. Learn when to use async run(), synchronous run_sync(), and streaming run_streamed() with practical code examples.

Three Ways to Run an Agent

The OpenAI Agents SDK provides three methods on the Runner class for executing agents. Each serves a different use case:

Method                 Async  Streaming  Best For
Runner.run()           Yes    No         Production web servers, async applications
Runner.run_sync()      No     No         Scripts, CLI tools, notebooks, quick prototyping
Runner.run_streamed()  Yes    Yes        Chat UIs, real-time output, long responses

All three methods execute the same underlying agent loop — the difference is in how they return results to your code.

Runner.run() — The Async Workhorse

Runner.run() is the primary execution method. It is asynchronous, returning an awaitable that resolves to a RunResult when the agent loop completes:

import asyncio
from agents import Agent, Runner

agent = Agent(
    name="Assistant",
    instructions="You are a helpful assistant.",
)

async def main():
    result = await Runner.run(
        agent,
        "Explain the difference between threads and processes.",
    )
    print(result.final_output)
    print(f"Agent that responded: {result.last_agent.name}")

asyncio.run(main())

When to Use run()

Use Runner.run() whenever you are in an async context:

  • FastAPI / Starlette endpoints — These are natively async
  • Background task workers — Celery with async support, arq, etc.
  • Batch processing — Run multiple agents concurrently with asyncio.gather()

Concurrent Execution

Because run() is async, you can run multiple agents in parallel:

import asyncio
from agents import Agent, Runner

summarizer = Agent(name="Summarizer", instructions="Summarize the given text in 2 sentences.")
translator = Agent(name="Translator", instructions="Translate the given text to French.")
critic = Agent(name="Critic", instructions="Identify logical flaws in the given text.")

async def process_text(text: str):
    # Run all three agents concurrently
    summarize_task = Runner.run(summarizer, text)
    translate_task = Runner.run(translator, text)
    critic_task = Runner.run(critic, text)

    results = await asyncio.gather(summarize_task, translate_task, critic_task)

    return {
        "summary": results[0].final_output,
        "french": results[1].final_output,
        "critique": results[2].final_output,
    }

output = asyncio.run(process_text("The quantum computer will solve all NP-hard problems by 2027."))
print(output)

This sends three independent LLM requests simultaneously, significantly reducing total latency compared to sequential execution.
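The latency benefit is easy to demonstrate with a stand-in that replaces the LLM requests with asyncio.sleep() (fake_llm_call is a placeholder, not an SDK function):

```python
import asyncio
import time

async def fake_llm_call(delay: float) -> str:
    # Placeholder for an LLM request that takes `delay` seconds
    await asyncio.sleep(delay)
    return "response"

async def main():
    start = time.perf_counter()
    # Three concurrent "requests" complete in roughly the time of one
    results = await asyncio.gather(*(fake_llm_call(0.1) for _ in range(3)))
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(f"{len(results)} results in {elapsed:.2f}s")  # ~0.1s total, not ~0.3s
```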

Runner.run_sync() — Synchronous Convenience

Runner.run_sync() is a synchronous wrapper around Runner.run(). It blocks the current thread until the agent loop completes:

from agents import Agent, Runner

agent = Agent(
    name="Assistant",
    instructions="You are a helpful assistant.",
)

# No async/await needed
result = Runner.run_sync(agent, "What is the capital of Japan?")
print(result.final_output)

When to Use run_sync()

  • Scripts and CLI tools — No need to set up an async event loop
  • Jupyter notebooks — Avoids event loop conflicts
  • Quick prototyping — Fastest way to test an agent
  • Django views — If you are not using Django's async views

Important: Do not use run_sync() inside an existing async event loop (like a FastAPI endpoint). It will raise an error or deadlock because it tries to create its own event loop.
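The underlying conflict is generic to asyncio, not specific to the SDK: starting a new event loop from inside a running one raises a RuntimeError. A minimal demonstration of what run_sync() effectively attempts from async code:

```python
import asyncio

async def inside_a_loop():
    # Trying to start a nested event loop from within a running loop fails
    try:
        asyncio.run(asyncio.sleep(0))
    except RuntimeError as exc:
        return str(exc)

message = asyncio.run(inside_a_loop())
print(message)  # asyncio.run() cannot be called from a running event loop
```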

Runner.run_streamed() — Real-Time Output

Runner.run_streamed() returns a RunResultStreaming object immediately, then streams events as the agent processes:

import asyncio
from agents import Agent, Runner
from openai.types.responses import ResponseTextDeltaEvent

agent = Agent(
    name="Storyteller",
    instructions="Write engaging short stories.",
)

async def main():
    result = Runner.run_streamed(agent, "Write a story about a robot learning to paint.")

    async for event in result.stream_events():
        if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
            # event.data.delta is the text fragment for this chunk
            print(event.data.delta, end="", flush=True)

    print()  # Newline after streaming completes

    # The final result is still available after streaming
    final = result.final_output
    print(f"\nFull response length: {len(final)} characters")

asyncio.run(main())

Stream Event Types

The stream_events() async iterator yields events with a type field:

  • raw_response_event — Raw chunks from the model response, including text deltas
  • agent_updated_stream_event — Fired when the current agent changes (during handoffs)
  • run_item_stream_event — Higher-level events for tool calls, messages, handoffs
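A dispatch over these event types might look like the following sketch (the Event dataclass is a stand-in for the SDK's event objects, which arrive via result.stream_events()):

```python
from dataclasses import dataclass
from typing import Any

@dataclass
class Event:
    # Stand-in for the SDK's stream event objects
    type: str
    data: Any = None

def describe(event: Event) -> str:
    # Route each event type to the handling it typically needs
    if event.type == "raw_response_event":
        return "append text delta to the UI"
    if event.type == "agent_updated_stream_event":
        return "show which agent is now active"
    if event.type == "run_item_stream_event":
        return "log a completed message, tool call, or handoff"
    return "ignore unknown event types"

print(describe(Event("agent_updated_stream_event")))
```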

Building a Chat UI with Streaming

Here is a pattern for building an interactive chat loop with streaming:


import asyncio
from agents import Agent, Runner
from openai.types.responses import ResponseTextDeltaEvent

agent = Agent(
    name="Chat Assistant",
    instructions="You are a friendly chat assistant. Keep responses concise.",
)

async def chat():
    conversation_history = []

    while True:
        user_input = input("\nYou: ")
        if user_input.lower() in ("quit", "exit"):
            break

        # Build input with conversation history
        conversation_history.append({
            "role": "user",
            "content": user_input,
        })

        print("Assistant: ", end="", flush=True)

        result = Runner.run_streamed(agent, conversation_history)

        async for event in result.stream_events():
            if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
                print(event.data.delta, end="", flush=True)

        print()

        # Add assistant response to history
        conversation_history.append({
            "role": "assistant",
            "content": result.final_output,
        })

asyncio.run(chat())

Input Types

All three runner methods accept flexible input types:

String Input

The simplest form — a single user message:

result = await Runner.run(agent, "Hello, how are you?")

Message List Input

For multi-turn conversations or providing context:

result = await Runner.run(agent, [
    {"role": "user", "content": "My name is Alice."},
    {"role": "assistant", "content": "Hello Alice! How can I help you today?"},
    {"role": "user", "content": "What is my name?"},
])

Continuing from a Previous Run

Call to_input_list() on a previous RunResult to get the full conversation so far, then append the new message:

result1 = await Runner.run(agent, "My favorite color is blue.")

new_input = result1.to_input_list() + [
    {"role": "user", "content": "What is my favorite color?"},
]
result2 = await Runner.run(agent, new_input)
# result2.final_output will reference "blue"

RunConfig: Controlling Execution

The RunConfig parameter lets you customize execution behavior:

from agents import Agent, Runner, RunConfig

agent = Agent(name="Assistant", instructions="Be helpful.")

result = await Runner.run(
    agent,
    "Complex multi-step question here...",
    max_turns=10,  # Limit agent loop iterations (a direct run() parameter, not part of RunConfig)
    run_config=RunConfig(
        tracing_disabled=False,           # Enable tracing (default)
        workflow_name="customer-support", # Name for tracing
        trace_id="unique-trace-id",       # Custom trace ID
    ),
)

max_turns

The max_turns parameter, passed directly to the runner methods, is a safety mechanism that limits how many iterations the agent loop can execute. Each "turn" is one LLM invocation plus any resulting tool calls. If the limit is reached, the SDK raises MaxTurnsExceeded:

from agents import Agent, Runner, MaxTurnsExceeded

agent = Agent(
    name="Research Agent",
    instructions="Research the topic thoroughly using all available tools.",
    tools=[search_tool, analyze_tool],  # tools defined elsewhere
)

try:
    result = await Runner.run(agent, "Research quantum computing", max_turns=5)
except MaxTurnsExceeded:
    print("Agent exceeded the maximum number of turns. The task may be too complex.")

Set max_turns based on your use case:

  • Simple Q&A: 2-3 turns
  • Tool-using agents: 5-10 turns
  • Complex research agents: 15-25 turns
  • Never leave it unlimited in production

The RunResult Object

Every run returns a RunResult (or RunResultStreaming for streamed runs) with these key properties:

result = await Runner.run(agent, "Hello")

# The final text or structured output
output = result.final_output

# The agent that produced the final output (may differ from the starting agent if handoffs occurred)
last_agent = result.last_agent

# All items generated during the run: messages, tool calls, tool outputs, handoffs
items = result.new_items

# The raw input that started the run
original_input = result.input

# For structured outputs, get the typed result
typed_output = result.final_output_as(MyPydanticModel)

Best Practices

  1. Use run() in production, run_sync() only for scripts and testing.

  2. Always set max_turns to prevent runaway agent loops that burn through your API budget.

  3. Use streaming for user-facing applications. Waiting 10+ seconds for a response with no feedback is a poor user experience.

  4. Handle exceptions around all runner calls. Network errors, rate limits, and model errors can all occur.

  5. Pass conversation history as message lists for multi-turn chat rather than concatenating strings.


Source: OpenAI Agents SDK — Running Agents
