Parallel Agent Execution with asyncio.gather
Learn how to run multiple OpenAI agents concurrently using asyncio.gather for dramatic performance improvements, with error handling strategies and a complete market research example.
Why Parallel Execution Matters
When you run agents sequentially, total execution time is the sum of all agent runtimes. If Agent A takes 3 seconds, Agent B takes 4 seconds, and Agent C takes 2 seconds, you wait 9 seconds.
With parallel execution, total time equals the slowest agent. Those same three agents running concurrently finish in 4 seconds — a 56% reduction.
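You can verify this latency math with plain asyncio before involving any agents. Here the 3s/4s/2s runtimes are scaled down to 0.3s/0.4s/0.2s sleeps standing in for API calls:

```python
import asyncio
import time

async def fake_agent(duration: float) -> float:
    # Stand-in for an agent call; sleeps instead of hitting an API
    await asyncio.sleep(duration)
    return duration

async def compare() -> tuple[float, float]:
    durations = [0.3, 0.4, 0.2]  # scaled-down versions of 3s, 4s, 2s

    start = time.monotonic()
    for d in durations:
        await fake_agent(d)  # sequential: times add up
    sequential = time.monotonic() - start

    start = time.monotonic()
    # parallel: total time is roughly the slowest task
    await asyncio.gather(*(fake_agent(d) for d in durations))
    parallel = time.monotonic() - start
    return sequential, parallel

sequential, parallel = asyncio.run(compare())
print(f"sequential: {sequential:.2f}s, parallel: {parallel:.2f}s")
```

The sequential pass takes about the sum (0.9s), the gather pass about the max (0.4s), mirroring the 9s-vs-4s numbers above.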
The OpenAI Agents SDK is built on async Python, making it a natural fit for parallel execution via asyncio.gather. This post covers the patterns, pitfalls, and production considerations for running agents in parallel.
Basic Parallel Execution
The simplest pattern runs multiple agents on the same input concurrently:
from agents import Agent, Runner
import asyncio
sentiment_agent = Agent(
name="SentimentAnalyzer",
instructions="Analyze the sentiment of the given text. Return: positive, negative, or neutral with a confidence score 0-100.",
model="gpt-4o-mini",
)
topic_agent = Agent(
name="TopicExtractor",
instructions="Extract the main topics from the given text. Return a JSON list of topics.",
model="gpt-4o-mini",
)
summary_agent = Agent(
name="Summarizer",
instructions="Summarize the given text in exactly one sentence.",
model="gpt-4o-mini",
)
async def analyze_text(text: str):
# Run all three agents in parallel
sentiment_result, topic_result, summary_result = await asyncio.gather(
Runner.run(sentiment_agent, input=text),
Runner.run(topic_agent, input=text),
Runner.run(summary_agent, input=text),
)
return {
"sentiment": sentiment_result.final_output,
"topics": topic_result.final_output,
"summary": summary_result.final_output,
}
async def main():
text = """
The new AI regulations proposed by the European Commission have sparked
intense debate among technology leaders. While some argue the rules will
stifle innovation, others believe they provide necessary consumer protections.
The legislation is expected to be finalized by Q3 2026.
"""
results = await analyze_text(text)
for key, value in results.items():
print(f"{key}: {value}\n")
asyncio.run(main())
Parallel Execution with Different Inputs
A more common pattern is running the same agent on different inputs, or different agents on different inputs:
from agents import Agent, Runner
import asyncio
researcher = Agent(
name="Researcher",
instructions="""Research the given company and provide:
- Industry and market position
- Key products/services
- Recent developments
- Competitive advantages""",
model="gpt-4o",
)
async def research_companies(companies: list[str]) -> dict:
"""Research multiple companies in parallel."""
tasks = [
Runner.run(researcher, input=f"Research this company: {company}")
for company in companies
]
results = await asyncio.gather(*tasks)
return {
company: result.final_output
for company, result in zip(companies, results)
}
async def main():
companies = ["Stripe", "Datadog", "Cloudflare", "Vercel"]
reports = await research_companies(companies)
for company, report in reports.items():
print(f"\n{'=' * 40}")
print(f"Company: {company}")
print(report)
asyncio.run(main())
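The zip pairing above is safe because asyncio.gather returns results in the order the awaitables were passed, not the order they finish. A quick check with randomized completion times:

```python
import asyncio
import random

async def job(name: str) -> str:
    # Finish at a random time to show result order is still preserved
    await asyncio.sleep(random.uniform(0, 0.1))
    return name

async def main() -> list[str]:
    names = ["Stripe", "Datadog", "Cloudflare", "Vercel"]
    return await asyncio.gather(*(job(n) for n in names))

print(asyncio.run(main()))  # always ['Stripe', 'Datadog', 'Cloudflare', 'Vercel']
```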
Error Handling in Parallel Execution
The critical question with asyncio.gather is: what happens when one agent fails? By default, if any task raises an exception, gather immediately propagates the first exception to the caller awaiting it. The remaining tasks keep running, but their results are discarded; you get only the exception. This is often not what you want.
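The default failure mode is easy to see with plain coroutines standing in for agent runs (no API calls needed):

```python
import asyncio

async def succeed(value: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return value

async def fail_after(delay: float):
    await asyncio.sleep(delay)
    raise RuntimeError("boom")

async def main() -> str:
    # One failing task: gather raises the first exception,
    # and the results of the tasks that did finish are lost.
    try:
        await asyncio.gather(
            succeed("a", 0.05),
            fail_after(0.1),
            succeed("b", 0.05),
        )
    except RuntimeError as e:
        return f"raised: {e}"
    return "no error"

print(asyncio.run(main()))  # raised: boom
```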
return_exceptions=True
The simplest error handling strategy uses the return_exceptions parameter:
from agents import Agent, Runner
import asyncio
agent_a = Agent(name="AgentA", instructions="Analyze market trends.", model="gpt-4o")
agent_b = Agent(name="AgentB", instructions="Analyze competitor positioning.", model="gpt-4o")
agent_c = Agent(name="AgentC", instructions="Analyze customer sentiment.", model="gpt-4o")
async def parallel_analysis(input_text: str) -> dict:
results = await asyncio.gather(
Runner.run(agent_a, input=input_text),
Runner.run(agent_b, input=input_text),
Runner.run(agent_c, input=input_text),
        return_exceptions=True,  # Collect exceptions as results instead of raising
)
analysis = {}
agents = ["market_trends", "competitor", "sentiment"]
for name, result in zip(agents, results):
if isinstance(result, Exception):
analysis[name] = f"ERROR: {type(result).__name__}: {str(result)}"
else:
analysis[name] = result.final_output
return analysis
async def main():
analysis = await parallel_analysis("Analyze the AI agent framework market")
for section, content in analysis.items():
print(f"\n{section}:")
print(content)
asyncio.run(main())
Retry Logic for Failed Agents
For production systems, add retry logic around individual agent calls:
from agents import Agent, Runner
import asyncio
async def run_with_retry(
agent: Agent,
input_text: str,
max_retries: int = 3,
delay: float = 1.0,
) -> str:
"""Run an agent with retry logic."""
last_error = None
for attempt in range(max_retries):
try:
result = await Runner.run(agent, input=input_text)
return result.final_output
except Exception as e:
last_error = e
if attempt < max_retries - 1:
await asyncio.sleep(delay * (2 ** attempt)) # Exponential backoff
return f"FAILED after {max_retries} attempts: {str(last_error)}"
async def parallel_with_retries(agents: list[Agent], input_text: str) -> list[str]:
"""Run multiple agents in parallel, each with retry logic."""
tasks = [
run_with_retry(agent, input_text)
for agent in agents
]
return await asyncio.gather(*tasks)
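The retry-with-backoff pattern is independent of the agent call itself, so it can be tested with any awaitable. A sketch with a hypothetical flaky coroutine that fails twice before succeeding:

```python
import asyncio

attempts = {"count": 0}

async def flaky_call() -> str:
    # Made-up stand-in for an agent run: fails twice, then succeeds
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

async def retry_demo(max_retries: int = 3, delay: float = 0.01) -> str:
    last_error = None
    for attempt in range(max_retries):
        try:
            return await flaky_call()
        except Exception as e:
            last_error = e
            if attempt < max_retries - 1:
                # Exponential backoff: 0.01s, then 0.02s, then 0.04s, ...
                await asyncio.sleep(delay * (2 ** attempt))
    return f"FAILED after {max_retries} attempts: {last_error}"

result = asyncio.run(retry_demo())
print(result)             # ok
print(attempts["count"])  # 3 attempts were made
```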
Timeout per Agent
Prevent a slow agent from holding up the entire pipeline:
from agents import Agent, Runner
import asyncio
async def run_with_timeout(
agent: Agent,
input_text: str,
timeout_seconds: float = 30.0,
) -> str:
"""Run an agent with a timeout."""
try:
result = await asyncio.wait_for(
Runner.run(agent, input=input_text),
timeout=timeout_seconds,
)
return result.final_output
except asyncio.TimeoutError:
return f"TIMEOUT: {agent.name} did not complete within {timeout_seconds}s"
async def parallel_with_timeouts(
agents: list[Agent],
input_text: str,
timeout: float = 30.0,
) -> list[str]:
"""Run multiple agents in parallel with individual timeouts."""
tasks = [
run_with_timeout(agent, input_text, timeout)
for agent in agents
]
return await asyncio.gather(*tasks)
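The asyncio.wait_for behavior can also be verified without agents: a coroutine that outlives its timeout is cancelled and TimeoutError is raised, while one that finishes in time returns normally.

```python
import asyncio

async def slow_task() -> str:
    # Stand-in for an agent call that takes 0.2s
    await asyncio.sleep(0.2)
    return "done"

async def guarded(timeout: float) -> str:
    try:
        return await asyncio.wait_for(slow_task(), timeout=timeout)
    except asyncio.TimeoutError:
        return "TIMEOUT"

print(asyncio.run(guarded(0.05)))  # TIMEOUT (slow_task needs 0.2s)
print(asyncio.run(guarded(1.0)))   # done
```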
Combining Results from Parallel Agents
After running agents in parallel, you often need a synthesis step. Use a dedicated synthesis agent:
from agents import Agent, Runner
import asyncio
# Parallel analysis agents
market_agent = Agent(
name="MarketAnalyst",
instructions="Analyze market size, growth rate, and trends for the given industry.",
model="gpt-4o",
)
competitor_agent = Agent(
name="CompetitorAnalyst",
instructions="Identify top 5 competitors, their market share, and key differentiators.",
model="gpt-4o",
)
customer_agent = Agent(
name="CustomerAnalyst",
instructions="Analyze target customer segments, pain points, and buying patterns.",
model="gpt-4o",
)
# Synthesis agent
synthesizer = Agent(
name="ReportSynthesizer",
instructions="""You receive three separate analysis reports: market analysis,
competitor analysis, and customer analysis. Synthesize them into a single
coherent executive report with these sections:
1. Executive Summary
2. Market Opportunity
3. Competitive Landscape
4. Target Customer Profile
5. Strategic Recommendations
Be concise but data-driven. Reference specific findings from each report.""",
model="gpt-4o",
)
async def generate_market_report(industry: str) -> str:
"""Generate a comprehensive market report using parallel agents."""
# Phase 1: Run analysis agents in parallel
market_result, competitor_result, customer_result = await asyncio.gather(
Runner.run(market_agent, input=f"Analyze the {industry} industry"),
Runner.run(competitor_agent, input=f"Analyze competitors in {industry}"),
Runner.run(customer_agent, input=f"Analyze customers in {industry}"),
)
# Phase 2: Synthesize results
combined_input = f"""
MARKET ANALYSIS:
{market_result.final_output}
COMPETITOR ANALYSIS:
{competitor_result.final_output}
CUSTOMER ANALYSIS:
{customer_result.final_output}
"""
synthesis = await Runner.run(synthesizer, input=combined_input)
return synthesis.final_output
async def main():
report = await generate_market_report("AI-powered customer service platforms")
print(report)
asyncio.run(main())
Performance Benchmarking
Here is a utility to measure the performance difference between sequential and parallel execution:
from agents import Agent, Runner
import asyncio
import time
async def benchmark_sequential(agents: list[Agent], input_text: str) -> float:
"""Run agents sequentially and return total time."""
start = time.monotonic()
for agent in agents:
await Runner.run(agent, input=input_text)
elapsed = time.monotonic() - start
return elapsed
async def benchmark_parallel(agents: list[Agent], input_text: str) -> float:
"""Run agents in parallel and return total time."""
start = time.monotonic()
await asyncio.gather(*[
Runner.run(agent, input=input_text)
for agent in agents
])
elapsed = time.monotonic() - start
return elapsed
async def main():
agents = [
Agent(name=f"Agent{i}", instructions=f"Analyze aspect {i} of the input.", model="gpt-4o-mini")
for i in range(5)
]
input_text = "Analyze the AI agent framework market"
seq_time = await benchmark_sequential(agents, input_text)
par_time = await benchmark_parallel(agents, input_text)
print(f"Sequential: {seq_time:.2f}s")
print(f"Parallel: {par_time:.2f}s")
print(f"Speedup: {seq_time / par_time:.1f}x")
asyncio.run(main())
Typical results with 5 agents: sequential takes 12-15 seconds, parallel takes 3-4 seconds, roughly a 4x speedup.
Building a Complete Market Research System
Here is a full market research system that demonstrates all parallel execution patterns:
from agents import Agent, Runner
from pydantic import BaseModel
import asyncio
import json
# ─── Structured Output Models ───
class MarketData(BaseModel):
market_size_usd: str
growth_rate: str
key_trends: list[str]
risks: list[str]
class CompetitorProfile(BaseModel):
name: str
market_share: str
strengths: list[str]
weaknesses: list[str]
class CompetitorReport(BaseModel):
competitors: list[CompetitorProfile]
class CustomerSegment(BaseModel):
name: str
size: str
pain_points: list[str]
willingness_to_pay: str
class CustomerReport(BaseModel):
segments: list[CustomerSegment]
# ─── Specialized Agents with Structured Output ───
market_agent = Agent(
name="MarketResearcher",
instructions="Provide detailed market analysis with specific numbers and data points.",
model="gpt-4o",
output_type=MarketData,
)
competitor_agent = Agent(
name="CompetitorResearcher",
instructions="Profile the top 3-5 competitors with specific market share estimates.",
model="gpt-4o",
output_type=CompetitorReport,
)
customer_agent = Agent(
name="CustomerResearcher",
instructions="Identify 3-4 distinct customer segments with specific characteristics.",
model="gpt-4o",
output_type=CustomerReport,
)
# ─── Synthesis Agent ───
synthesis_agent = Agent(
name="ReportWriter",
instructions="""Write an executive market research report from the provided data.
Structure it as: Executive Summary, Market Overview, Competitive Landscape,
Customer Segments, and Strategic Recommendations. Use specific data points.""",
model="gpt-4o",
)
# ─── Orchestration ───
async def run_with_timeout_and_retry(
agent: Agent,
input_text: str,
timeout: float = 45.0,
retries: int = 2,
):
"""Run agent with timeout and retry logic."""
for attempt in range(retries):
try:
result = await asyncio.wait_for(
Runner.run(agent, input=input_text),
timeout=timeout,
)
return result
except asyncio.TimeoutError:
if attempt == retries - 1:
raise
await asyncio.sleep(1)
except Exception:
if attempt == retries - 1:
raise
await asyncio.sleep(2 ** attempt)
async def generate_research_report(topic: str) -> str:
"""Generate a full market research report using parallel agents."""
print(f"Starting parallel research on: {topic}")
# Phase 1: Parallel data gathering
results = await asyncio.gather(
run_with_timeout_and_retry(market_agent, f"Market analysis: {topic}"),
run_with_timeout_and_retry(competitor_agent, f"Competitor analysis: {topic}"),
run_with_timeout_and_retry(customer_agent, f"Customer analysis: {topic}"),
return_exceptions=True,
)
# Phase 2: Collect results, handling any failures
sections = []
labels = ["MARKET DATA", "COMPETITOR DATA", "CUSTOMER DATA"]
for label, result in zip(labels, results):
if isinstance(result, Exception):
sections.append(f"{label}: Data unavailable due to error: {str(result)}")
else:
output = result.final_output
if hasattr(output, 'model_dump'):
sections.append(f"{label}:\n{json.dumps(output.model_dump(), indent=2)}")
else:
sections.append(f"{label}:\n{output}")
combined = "\n\n".join(sections)
# Phase 3: Synthesize into final report
report_result = await Runner.run(
synthesis_agent,
input=f"Write a market research report from this data:\n\n{combined}",
)
return report_result.final_output
async def main():
report = await generate_research_report(
"AI-powered voice agents for customer service in 2026"
)
print("\n" + "=" * 60)
print("FINAL REPORT")
print("=" * 60)
print(report)
asyncio.run(main())
When NOT to Parallelize
Parallel execution is not always the right choice:
- When agents depend on each other's output: If Agent B needs Agent A's result, they must run sequentially
- When you are rate-limited: Running 10 agents in parallel might hit API rate limits. Use asyncio.Semaphore to limit concurrency
- When context is shared and mutable: If agents modify the same context object, parallel execution creates race conditions
Using Semaphore for Rate Limiting
import asyncio
from agents import Agent, Runner
# Limit to 3 concurrent agent runs
semaphore = asyncio.Semaphore(3)
async def run_with_semaphore(agent: Agent, input_text: str):
async with semaphore:
return await Runner.run(agent, input=input_text)
async def main():
agents = [
Agent(name=f"Agent{i}", instructions=f"Task {i}", model="gpt-4o-mini")
for i in range(10)
]
# Only 3 will run at a time despite 10 being queued
    results = await asyncio.gather(*[
        run_with_semaphore(agent, "Analyze this market")
        for agent in agents
    ])
    print(f"Completed {len(results)} agent runs")

asyncio.run(main())
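You can confirm the cap by tracking peak concurrency with a counter; sleeps stand in for agent calls here:

```python
import asyncio

async def run_capped(n_tasks: int, limit: int) -> int:
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def task():
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.05)  # stand-in for an agent call
            active -= 1

    await asyncio.gather(*(task() for _ in range(n_tasks)))
    return peak

peak = asyncio.run(run_capped(n_tasks=10, limit=3))
print(peak)  # 3: never more than 3 tasks inside the semaphore at once
```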
Summary
Parallel agent execution with asyncio.gather is one of the highest-impact performance optimizations for multi-agent systems. Use it whenever you have independent tasks that can run concurrently. Add return_exceptions=True to prevent one failure from canceling everything. Add timeouts to prevent slow agents from blocking the pipeline. Add retries for resilience. And use a synthesis agent to combine parallel results into coherent output.
Written by
CallSphere Team
Expert insights on AI voice agents and customer communication automation.