Error Handling in Agent Workflows: Exceptions, Retries, and Recovery
Master error handling in the OpenAI Agents SDK. Learn about MaxTurnsExceeded, tool errors, model behavior errors, retry policies, and building resilient agent workflows.
Production Agents Must Handle Failure
In production, things go wrong. APIs time out. Models hallucinate invalid tool arguments. Rate limits hit at peak traffic. Network connections drop. A production-grade agent system must handle all of these failures gracefully.
The OpenAI Agents SDK provides multiple layers of error handling: exception types for different failure modes, tool error recovery within the agent loop, retry policies for transient failures, and hooks for custom error handling logic.
Exception Types
The SDK defines several exception types that you should handle in your application code:
MaxTurnsExceeded
Raised when the agent loop exceeds the max_turns limit without producing a final output:
```python
from agents import Agent, Runner, MaxTurnsExceeded

agent = Agent(
    name="Research Agent",
    instructions="Research the topic using available tools.",
    tools=[search_tool, analyze_tool],
)

try:
    result = await Runner.run(agent, "Research quantum computing", max_turns=5)
    print(result.final_output)
except MaxTurnsExceeded:
    print("The agent could not complete the task within the turn limit.")
    print("Consider increasing max_turns or simplifying the task.")
```
This typically happens for one of a few reasons:
- The task is genuinely complex and requires many tool calls
- The agent is stuck in a loop, calling the same tool repeatedly
- The instructions are ambiguous about when to stop

How to handle it:
- Return a graceful error to the user
- Log the partial results for debugging
- Consider retrying with a higher max_turns or rephrased input
ModelBehaviorError
Raised when the model produces output that the SDK cannot process. This is rare with OpenAI models but can occur with third-party providers:
```python
from agents import ModelBehaviorError

try:
    result = await Runner.run(agent, "Process this request")
except ModelBehaviorError as e:
    print(f"Model produced unexpected output: {e}")
    # Log and alert: this usually indicates a model or provider issue
```
UserError
Raised when the SDK detects incorrect usage in your code, such as misconfigured agents or invalid parameters:
```python
from agents import Agent, UserError

try:
    # This would raise UserError if, e.g., output_type is not a valid type
    agent = Agent(name="Test", instructions="Test", output_type="not_a_type")
except UserError as e:
    print(f"Configuration error: {e}")
```
InputGuardrailTripwireTriggered and OutputGuardrailTripwireTriggered
Raised when input or output guardrails detect content that should not be processed:
```python
from agents import InputGuardrailTripwireTriggered, OutputGuardrailTripwireTriggered

try:
    result = await Runner.run(agent, user_input)
except InputGuardrailTripwireTriggered:
    print("Input was flagged by safety guardrails.")
except OutputGuardrailTripwireTriggered:
    print("Output was flagged by safety guardrails.")
```
Tool Error Recovery
One of the most powerful features of the agent loop is automatic tool error recovery. When a tool raises an exception, the SDK does not crash. Instead, it:
- Catches the exception
- Converts the error message to a string
- Sends that string back to the LLM as the tool result

The LLM can then decide how to proceed: retry, try a different approach, or report the error.
```python
import httpx

from agents import function_tool

@function_tool
async def fetch_data(url: str) -> str:
    """Fetch data from a URL.

    Args:
        url: The URL to fetch data from.
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(url, timeout=5)
        response.raise_for_status()
        return response.text[:2000]
```
If the URL is unreachable, the agent sees something like: "Error: Connection timeout after 5 seconds." The agent can then:
- Try a different URL
- Ask the user for a corrected URL
- Report that the data source is unavailable
This self-healing behavior means agents handle many errors without any special error handling code from you.
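Conceptually, the recovery step works like this simplified sketch (an illustration of the idea, not the SDK's actual implementation):

```python
import asyncio

async def run_tool_safely(tool, **kwargs) -> str:
    """Call a tool and convert any exception into an error string
    that can be fed back to the LLM as the tool result."""
    try:
        return await tool(**kwargs)
    except Exception as e:
        # The model sees this string and can decide to retry or change course
        return f"Error: {e}"

async def demo() -> str:
    async def failing_tool(url: str) -> str:
        raise TimeoutError("Connection timeout after 5 seconds")

    return await run_tool_safely(failing_tool, url="https://example.com")

print(asyncio.run(demo()))  # Error: Connection timeout after 5 seconds
```

The key design choice is that the error travels back through the same channel as a successful result, so the model needs no special protocol to handle failures.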
Controlling Tool Error Behavior
You can customize how tool errors are reported by catching exceptions inside the tool and returning descriptive error messages:
```python
import asyncio

import asyncpg

@function_tool
async def query_database(sql: str) -> str:
    """Execute a read-only SQL query.

    Args:
        sql: The SQL query to execute.
    """
    # Naive prefix check for illustration; production code should validate more thoroughly
    if not sql.strip().upper().startswith("SELECT"):
        return "Error: Only SELECT queries are allowed for safety."
    try:
        # get_db_connection and format_rows are application-defined helpers
        async with get_db_connection() as conn:
            rows = await conn.fetch(sql)
        if not rows:
            return "Query returned no results."
        return format_rows(rows)
    except asyncpg.PostgresError as e:
        return f"Database error: {e}. Please check your query syntax."
    except asyncio.TimeoutError:
        return "Query timed out. Try a simpler query or add a LIMIT clause."
```
By catching exceptions and returning clear error messages, you give the LLM the information it needs to self-correct.
Tool Timeouts
Tools that perform I/O should have timeouts to prevent the agent loop from hanging:
```python
import asyncio

import httpx

from agents import function_tool

@function_tool
async def call_external_api(endpoint: str) -> str:
    """Call an external API endpoint.

    Args:
        endpoint: The API endpoint path.
    """
    try:
        # Enforce the timeout inside the tool (asyncio.timeout needs Python 3.11+)
        async with asyncio.timeout(10):
            async with httpx.AsyncClient() as client:
                response = await client.get(f"https://api.example.com/{endpoint}")
                return response.text
    except (TimeoutError, httpx.TimeoutException):
        return "Error: the API did not respond within 10 seconds."
```

Because the timeout is caught inside the tool and returned as an error string, the agent loop never hangs on a slow call, and the LLM sees a normal tool result it can act on: retry, try another endpoint, or report that the service is unavailable.
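The same pattern can be factored into a small reusable helper (a hypothetical utility, not part of the SDK) built on asyncio.wait_for, which also works on Python versions before 3.11:

```python
import asyncio
from typing import Awaitable

async def with_timeout(coro: Awaitable[str], seconds: float) -> str:
    """Await a coroutine, converting a timeout into an error string
    the LLM can act on instead of an unhandled exception."""
    try:
        return await asyncio.wait_for(coro, timeout=seconds)
    except asyncio.TimeoutError:
        return f"Error: operation timed out after {seconds} seconds."

async def demo() -> str:
    async def slow_io() -> str:
        await asyncio.sleep(10)  # simulates a hanging network call
        return "data"

    return await with_timeout(slow_io(), seconds=0.01)

print(asyncio.run(demo()))  # Error: operation timed out after 0.01 seconds.
```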
Retry Policies
Transient failures at the LLM API level are retried automatically by the underlying OpenAI client before the error ever reaches your application code. You can raise the retry limit by configuring the client yourself and handing it to the SDK:

```python
from openai import AsyncOpenAI

from agents import set_default_openai_client

# The OpenAI client retries transient failures automatically
# (the default is 2 retries; this raises it to 3)
set_default_openai_client(AsyncOpenAI(max_retries=3))
```

The client-level retries cover:
- Network errors: connection failures and request timeouts
- HTTP 429: rate limit responses (the Retry-After header is respected when present)
- HTTP 500/502/503: server-side errors from the provider

Retries use exponential backoff with jitter, so each attempt waits roughly twice as long as the previous one, up to a cap.
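The backoff schedule itself is easy to sketch; the parameters below are illustrative defaults, not the client's exact values:

```python
def backoff_delays(max_retries: int, initial: float = 1.0,
                   factor: float = 2.0, cap: float = 30.0) -> list[float]:
    """Exponential backoff delays: initial * factor**i, capped at `cap`."""
    return [min(initial * factor ** i, cap) for i in range(max_retries)]

print(backoff_delays(3))  # [1.0, 2.0, 4.0]
print(backoff_delays(8))  # later delays are capped at 30.0
```

Real clients add random jitter on top of this schedule so that many concurrent callers do not retry in lockstep.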
Comprehensive Error Handling Pattern
Here is a production-ready error handling pattern that covers all failure modes:
```python
import asyncio
import logging

from agents import (
    Agent,
    Runner,
    RunConfig,
    MaxTurnsExceeded,
    ModelBehaviorError,
    InputGuardrailTripwireTriggered,
    OutputGuardrailTripwireTriggered,
)

logger = logging.getLogger(__name__)

agent = Agent(
    name="Production Agent",
    instructions="You are a helpful assistant.",
    tools=[search_tool, database_tool],
)

async def handle_request(user_input: str, user_id: str) -> dict:
    """Handle a user request with comprehensive error handling."""
    try:
        result = await Runner.run(
            agent,
            user_input,
            max_turns=10,  # max_turns is a Runner.run argument, not part of RunConfig
            run_config=RunConfig(workflow_name="customer-request"),
        )
        return {
            "status": "success",
            "response": result.final_output,
            "agent": result.last_agent.name,
        }
    except MaxTurnsExceeded:
        logger.warning(f"Max turns exceeded for user {user_id}", extra={
            "user_id": user_id,
            "input_preview": user_input[:100],
        })
        return {
            "status": "incomplete",
            "response": "I was not able to fully complete your request. Could you try breaking it into smaller questions?",
        }
    except InputGuardrailTripwireTriggered:
        logger.info(f"Input guardrail triggered for user {user_id}")
        return {
            "status": "blocked",
            "response": "I am not able to process that request. Please rephrase your question.",
        }
    except OutputGuardrailTripwireTriggered:
        logger.warning(f"Output guardrail triggered for user {user_id}")
        return {
            "status": "blocked",
            "response": "I generated a response that did not meet our safety guidelines. Please try again.",
        }
    except ModelBehaviorError as e:
        logger.error(f"Model behavior error: {e}", exc_info=True)
        return {
            "status": "error",
            "response": "An unexpected error occurred. Our team has been notified.",
        }
    except Exception as e:
        logger.error(f"Unexpected error for user {user_id}: {e}", exc_info=True)
        return {
            "status": "error",
            "response": "Something went wrong. Please try again later.",
        }
```
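In a web service, the status values returned by a handler like this map naturally onto HTTP codes. One hypothetical mapping (the codes are a design choice, not part of the SDK):

```python
STATUS_TO_HTTP = {
    "success": 200,
    "incomplete": 200,  # a partial answer is still a valid response
    "blocked": 400,     # the request (or its output) was refused
    "error": 500,       # something failed server-side
}

def http_status(result: dict) -> int:
    """Translate a handler result into an HTTP status code."""
    return STATUS_TO_HTTP.get(result.get("status"), 500)

print(http_status({"status": "blocked", "response": "..."}))  # 400
```

Keeping the mapping in one place means new status values only need to be handled once.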
Application-Level Retries
For critical workflows where you need the agent to succeed, implement application-level retries with escalation:
```python
import asyncio
import logging

from agents import Agent, Runner, MaxTurnsExceeded

logger = logging.getLogger(__name__)

async def robust_agent_call(
    agent: Agent,
    user_input: str,
    max_attempts: int = 3,
) -> str:
    """Run an agent with application-level retries and escalation."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            # Increase max_turns with each attempt
            max_turns = 5 * attempt
            result = await Runner.run(
                agent,
                user_input,
                max_turns=max_turns,
            )
            return result.final_output
        except MaxTurnsExceeded:
            last_error = "exceeded_turns"
            logger.info(f"Attempt {attempt}: max turns exceeded, retrying with higher limit")
            continue
        except Exception as e:
            last_error = str(e)
            if attempt < max_attempts:
                wait_time = 2 ** attempt
                logger.info(f"Attempt {attempt} failed: {e}. Retrying in {wait_time}s")
                await asyncio.sleep(wait_time)
                continue
    raise RuntimeError(f"Agent failed after {max_attempts} attempts. Last error: {last_error}")
```
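When many requests fail and retry at once, fixed 2 ** attempt delays can synchronize callers into a thundering herd. A common refinement, sketched here as an optional variation, is full-jitter backoff:

```python
import random

def jittered_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Full-jitter backoff: a random delay in [0, min(cap, base * 2**attempt)]."""
    return random.uniform(0, min(cap, base * 2 ** attempt))

# Each delay is random but bounded: attempt 1 <= 2s, attempt 2 <= 4s, attempt 3 <= 8s
delays = [jittered_delay(a) for a in range(1, 4)]
```

Substituting `jittered_delay(attempt)` for `2 ** attempt` in the retry loop above spreads concurrent retries across the window instead of bunching them.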
Best Practices
Always catch MaxTurnsExceeded in production. It is the most common agent-specific error.
Set appropriate max_turns. Too low and agents cannot complete complex tasks. Too high and a stuck agent burns through your API budget.
Let tools return error strings instead of raising exceptions when possible. This gives the LLM a chance to self-correct.
Use tool timeouts for all I/O operations. A hanging tool blocks the entire agent loop.
Log the full RunResult on errors. The new_items list contains the complete trace of what happened, which is invaluable for debugging.
Implement circuit breakers for tools that call external services. If a service is down, fail fast rather than burning through retries.
Never expose raw error messages to users. Map all errors to user-friendly messages.
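The circuit-breaker practice above can be sketched minimally (an illustrative state machine, not a production-ready implementation):

```python
import time

class CircuitBreaker:
    """Fail fast after repeated failures, then allow a probe after a cooldown."""

    def __init__(self, max_failures: int = 3, cooldown: float = 30.0):
        self.max_failures = max_failures
        self.cooldown = cooldown
        self.failures = 0
        self.opened_at: float | None = None

    def allow(self) -> bool:
        """Return False while the breaker is open (service presumed down)."""
        if self.opened_at is None:
            return True
        if time.monotonic() - self.opened_at >= self.cooldown:
            # Half-open: allow a single probe request through
            self.opened_at = None
            self.failures = 0
            return True
        return False

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.max_failures:
            self.opened_at = time.monotonic()

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

breaker = CircuitBreaker(max_failures=2, cooldown=60.0)
breaker.record_failure()
breaker.record_failure()
print(breaker.allow())  # False: the breaker is open, so fail fast
```

A tool would check `breaker.allow()` before calling the external service and return an error string immediately when it is open, sparing the agent loop from waiting on a dead dependency.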
Written by
CallSphere Team
Expert insights on AI voice agents and customer communication automation.