
Parallel Agent Execution with asyncio.gather

Learn how to run multiple OpenAI agents concurrently using asyncio.gather for dramatic performance improvements, with error handling strategies and a complete market research example.

Why Parallel Execution Matters

When you run agents sequentially, total execution time is the sum of all agent runtimes. If Agent A takes 3 seconds, Agent B takes 4 seconds, and Agent C takes 2 seconds, you wait 9 seconds.

With parallel execution, total time equals the runtime of the slowest agent. Those same three agents running concurrently finish in 4 seconds, a 56% reduction.
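You can see this effect with plain coroutines before involving any agents. This sketch simulates the three agents above with asyncio.sleep (scaled down to tenths of a second):

```python
import asyncio
import time

async def fake_agent(name: str, seconds: float) -> str:
    # Stand-in for an agent call: just sleeps for the given duration
    await asyncio.sleep(seconds)
    return name

async def main() -> None:
    # Sequential: total time is the sum of all durations
    start = time.monotonic()
    await fake_agent("A", 0.3)
    await fake_agent("B", 0.4)
    await fake_agent("C", 0.2)
    sequential = time.monotonic() - start

    # Parallel: total time is the duration of the slowest coroutine
    start = time.monotonic()
    await asyncio.gather(
        fake_agent("A", 0.3),
        fake_agent("B", 0.4),
        fake_agent("C", 0.2),
    )
    parallel = time.monotonic() - start

    # Sequential is roughly 0.9s; parallel is roughly 0.4s (the slowest task)
    print(f"sequential: {sequential:.1f}s, parallel: {parallel:.1f}s")

asyncio.run(main())
```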

The OpenAI Agents SDK is built on async Python, making it a natural fit for parallel execution via asyncio.gather. This post covers the patterns, pitfalls, and production considerations for running agents in parallel.

Basic Parallel Execution

The simplest pattern runs multiple agents on the same input concurrently:

from agents import Agent, Runner
import asyncio

sentiment_agent = Agent(
    name="SentimentAnalyzer",
    instructions="Analyze the sentiment of the given text. Return: positive, negative, or neutral with a confidence score 0-100.",
    model="gpt-4o-mini",
)

topic_agent = Agent(
    name="TopicExtractor",
    instructions="Extract the main topics from the given text. Return a JSON list of topics.",
    model="gpt-4o-mini",
)

summary_agent = Agent(
    name="Summarizer",
    instructions="Summarize the given text in exactly one sentence.",
    model="gpt-4o-mini",
)

async def analyze_text(text: str):
    # Run all three agents in parallel
    sentiment_result, topic_result, summary_result = await asyncio.gather(
        Runner.run(sentiment_agent, input=text),
        Runner.run(topic_agent, input=text),
        Runner.run(summary_agent, input=text),
    )

    return {
        "sentiment": sentiment_result.final_output,
        "topics": topic_result.final_output,
        "summary": summary_result.final_output,
    }

async def main():
    text = """
    The new AI regulations proposed by the European Commission have sparked
    intense debate among technology leaders. While some argue the rules will
    stifle innovation, others believe they provide necessary consumer protections.
    The legislation is expected to be finalized by Q3 2026.
    """
    results = await analyze_text(text)
    for key, value in results.items():
        print(f"{key}: {value}\n")

asyncio.run(main())

Parallel Execution with Different Inputs

A more common pattern is running the same agent on different inputs, or different agents on different inputs:

from agents import Agent, Runner
import asyncio

researcher = Agent(
    name="Researcher",
    instructions="""Research the given company and provide:
    - Industry and market position
    - Key products/services
    - Recent developments
    - Competitive advantages""",
    model="gpt-4o",
)

async def research_companies(companies: list[str]) -> dict:
    """Research multiple companies in parallel."""
    tasks = [
        Runner.run(researcher, input=f"Research this company: {company}")
        for company in companies
    ]

    results = await asyncio.gather(*tasks)

    return {
        company: result.final_output
        for company, result in zip(companies, results)
    }

async def main():
    companies = ["Stripe", "Datadog", "Cloudflare", "Vercel"]
    reports = await research_companies(companies)
    for company, report in reports.items():
        print(f"\n{'=' * 40}")
        print(f"Company: {company}")
        print(report)

asyncio.run(main())

Error Handling in Parallel Execution

The critical question with asyncio.gather is: what happens when one agent fails? By default, if any task raises an exception, gather immediately raises the first exception. The remaining tasks are not cancelled; they keep running in the background, but their results are lost. This is often not what you want.
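A minimal pure-asyncio sketch (no agents involved) shows what actually happens: gather surfaces the first exception, while the sibling task keeps running in the background:

```python
import asyncio

async def fails_fast() -> str:
    await asyncio.sleep(0.1)
    raise RuntimeError("agent call failed")

async def succeeds(results: list) -> str:
    await asyncio.sleep(0.3)
    results.append("done")  # proves the sibling kept running
    return "done"

async def main() -> None:
    results: list[str] = []
    try:
        await asyncio.gather(fails_fast(), succeeds(results))
    except RuntimeError as e:
        print(f"gather raised: {e}")
    # Give the surviving task time to finish; gather did not cancel it
    await asyncio.sleep(0.4)
    print(f"sibling result: {results}")  # ['done']

asyncio.run(main())
```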

return_exceptions=True

The simplest error handling strategy uses the return_exceptions parameter:


from agents import Agent, Runner
import asyncio

agent_a = Agent(name="AgentA", instructions="Analyze market trends.", model="gpt-4o")
agent_b = Agent(name="AgentB", instructions="Analyze competitor positioning.", model="gpt-4o")
agent_c = Agent(name="AgentC", instructions="Analyze customer sentiment.", model="gpt-4o")

async def parallel_analysis(input_text: str) -> dict:
    results = await asyncio.gather(
        Runner.run(agent_a, input=input_text),
        Runner.run(agent_b, input=input_text),
        Runner.run(agent_c, input=input_text),
        return_exceptions=True,  # Don't cancel other tasks on failure
    )

    analysis = {}
    agents = ["market_trends", "competitor", "sentiment"]

    for name, result in zip(agents, results):
        if isinstance(result, Exception):
            analysis[name] = f"ERROR: {type(result).__name__}: {str(result)}"
        else:
            analysis[name] = result.final_output

    return analysis

async def main():
    analysis = await parallel_analysis("Analyze the AI agent framework market")
    for section, content in analysis.items():
        print(f"\n{section}:")
        print(content)

asyncio.run(main())

Retry Logic for Failed Agents

For production systems, add retry logic around individual agent calls:

from agents import Agent, Runner
import asyncio

async def run_with_retry(
    agent: Agent,
    input_text: str,
    max_retries: int = 3,
    delay: float = 1.0,
) -> str:
    """Run an agent with retry logic."""
    last_error = None
    for attempt in range(max_retries):
        try:
            result = await Runner.run(agent, input=input_text)
            return result.final_output
        except Exception as e:
            last_error = e
            if attempt < max_retries - 1:
                await asyncio.sleep(delay * (2 ** attempt))  # Exponential backoff
    return f"FAILED after {max_retries} attempts: {str(last_error)}"

async def parallel_with_retries(agents: list[Agent], input_text: str) -> list[str]:
    """Run multiple agents in parallel, each with retry logic."""
    tasks = [
        run_with_retry(agent, input_text)
        for agent in agents
    ]
    return await asyncio.gather(*tasks)
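To exercise the retry-with-backoff pattern without hitting the API, you can generalize it over any coroutine factory and feed it a hypothetical flaky call. The names retry, flaky, and calls below are illustrative, not part of the SDK:

```python
import asyncio
from collections.abc import Awaitable, Callable

async def retry(
    fn: Callable[[], Awaitable[str]],
    max_retries: int = 3,
    delay: float = 0.05,
) -> str:
    """Same backoff logic as run_with_retry, for any coroutine factory."""
    last_error: Exception | None = None
    for attempt in range(max_retries):
        try:
            return await fn()
        except Exception as e:
            last_error = e
            if attempt < max_retries - 1:
                await asyncio.sleep(delay * (2 ** attempt))  # Exponential backoff
    return f"FAILED after {max_retries} attempts: {last_error}"

# Hypothetical flaky call: fails twice, then succeeds on the third attempt
calls = {"count": 0}

async def flaky() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

print(asyncio.run(retry(flaky)))  # third attempt succeeds, prints "ok"
```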

Timeout per Agent

Prevent a slow agent from holding up the entire pipeline:

from agents import Agent, Runner
import asyncio

async def run_with_timeout(
    agent: Agent,
    input_text: str,
    timeout_seconds: float = 30.0,
) -> str:
    """Run an agent with a timeout."""
    try:
        result = await asyncio.wait_for(
            Runner.run(agent, input=input_text),
            timeout=timeout_seconds,
        )
        return result.final_output
    except asyncio.TimeoutError:
        return f"TIMEOUT: {agent.name} did not complete within {timeout_seconds}s"

async def parallel_with_timeouts(
    agents: list[Agent],
    input_text: str,
    timeout: float = 30.0,
) -> list[str]:
    """Run multiple agents in parallel with individual timeouts."""
    tasks = [
        run_with_timeout(agent, input_text, timeout)
        for agent in agents
    ]
    return await asyncio.gather(*tasks)
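Note the key difference from return_exceptions: asyncio.wait_for actively cancels the inner task when the timeout fires, so a timed-out agent does not keep consuming the event loop. A minimal sketch:

```python
import asyncio

async def slow_task() -> str:
    # Stand-in for a slow agent run
    await asyncio.sleep(1.0)
    return "done"

async def main() -> None:
    try:
        await asyncio.wait_for(slow_task(), timeout=0.1)
    except asyncio.TimeoutError:
        # wait_for cancelled slow_task before raising
        print("TIMEOUT: task cancelled after 0.1s")

asyncio.run(main())
```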

Combining Results from Parallel Agents

After running agents in parallel, you often need a synthesis step. Use a dedicated synthesis agent:

from agents import Agent, Runner
import asyncio

# Parallel analysis agents
market_agent = Agent(
    name="MarketAnalyst",
    instructions="Analyze market size, growth rate, and trends for the given industry.",
    model="gpt-4o",
)

competitor_agent = Agent(
    name="CompetitorAnalyst",
    instructions="Identify top 5 competitors, their market share, and key differentiators.",
    model="gpt-4o",
)

customer_agent = Agent(
    name="CustomerAnalyst",
    instructions="Analyze target customer segments, pain points, and buying patterns.",
    model="gpt-4o",
)

# Synthesis agent
synthesizer = Agent(
    name="ReportSynthesizer",
    instructions="""You receive three separate analysis reports: market analysis,
    competitor analysis, and customer analysis. Synthesize them into a single
    coherent executive report with these sections:
    1. Executive Summary
    2. Market Opportunity
    3. Competitive Landscape
    4. Target Customer Profile
    5. Strategic Recommendations

    Be concise but data-driven. Reference specific findings from each report.""",
    model="gpt-4o",
)

async def generate_market_report(industry: str) -> str:
    """Generate a comprehensive market report using parallel agents."""

    # Phase 1: Run analysis agents in parallel
    market_result, competitor_result, customer_result = await asyncio.gather(
        Runner.run(market_agent, input=f"Analyze the {industry} industry"),
        Runner.run(competitor_agent, input=f"Analyze competitors in {industry}"),
        Runner.run(customer_agent, input=f"Analyze customers in {industry}"),
    )

    # Phase 2: Synthesize results
    combined_input = f"""
    MARKET ANALYSIS:
    {market_result.final_output}

    COMPETITOR ANALYSIS:
    {competitor_result.final_output}

    CUSTOMER ANALYSIS:
    {customer_result.final_output}
    """

    synthesis = await Runner.run(synthesizer, input=combined_input)
    return synthesis.final_output

async def main():
    report = await generate_market_report("AI-powered customer service platforms")
    print(report)

asyncio.run(main())

Performance Benchmarking

Here is a utility to measure the performance difference between sequential and parallel execution:

from agents import Agent, Runner
import asyncio
import time

async def benchmark_sequential(agents: list[Agent], input_text: str) -> float:
    """Run agents sequentially and return total time."""
    start = time.monotonic()
    for agent in agents:
        await Runner.run(agent, input=input_text)
    elapsed = time.monotonic() - start
    return elapsed

async def benchmark_parallel(agents: list[Agent], input_text: str) -> float:
    """Run agents in parallel and return total time."""
    start = time.monotonic()
    await asyncio.gather(*[
        Runner.run(agent, input=input_text)
        for agent in agents
    ])
    elapsed = time.monotonic() - start
    return elapsed

async def main():
    agents = [
        Agent(name=f"Agent{i}", instructions=f"Analyze aspect {i} of the input.", model="gpt-4o-mini")
        for i in range(5)
    ]
    input_text = "Analyze the AI agent framework market"

    seq_time = await benchmark_sequential(agents, input_text)
    par_time = await benchmark_parallel(agents, input_text)

    print(f"Sequential: {seq_time:.2f}s")
    print(f"Parallel:   {par_time:.2f}s")
    print(f"Speedup:    {seq_time / par_time:.1f}x")

asyncio.run(main())

Typical results with 5 agents: sequential takes 12-15 seconds, parallel takes 3-4 seconds, yielding a 3-4x speedup.

Building a Complete Market Research System

Here is a full market research system that demonstrates all parallel execution patterns:

from agents import Agent, Runner
from pydantic import BaseModel
import asyncio
import json

# ─── Structured Output Models ───

class MarketData(BaseModel):
    market_size_usd: str
    growth_rate: str
    key_trends: list[str]
    risks: list[str]

class CompetitorProfile(BaseModel):
    name: str
    market_share: str
    strengths: list[str]
    weaknesses: list[str]

class CompetitorReport(BaseModel):
    competitors: list[CompetitorProfile]

class CustomerSegment(BaseModel):
    name: str
    size: str
    pain_points: list[str]
    willingness_to_pay: str

class CustomerReport(BaseModel):
    segments: list[CustomerSegment]

# ─── Specialized Agents with Structured Output ───

market_agent = Agent(
    name="MarketResearcher",
    instructions="Provide detailed market analysis with specific numbers and data points.",
    model="gpt-4o",
    output_type=MarketData,
)

competitor_agent = Agent(
    name="CompetitorResearcher",
    instructions="Profile the top 3-5 competitors with specific market share estimates.",
    model="gpt-4o",
    output_type=CompetitorReport,
)

customer_agent = Agent(
    name="CustomerResearcher",
    instructions="Identify 3-4 distinct customer segments with specific characteristics.",
    model="gpt-4o",
    output_type=CustomerReport,
)

# ─── Synthesis Agent ───

synthesis_agent = Agent(
    name="ReportWriter",
    instructions="""Write an executive market research report from the provided data.
    Structure it as: Executive Summary, Market Overview, Competitive Landscape,
    Customer Segments, and Strategic Recommendations. Use specific data points.""",
    model="gpt-4o",
)

# ─── Orchestration ───

async def run_with_timeout_and_retry(
    agent: Agent,
    input_text: str,
    timeout: float = 45.0,
    retries: int = 2,
):
    """Run agent with timeout and retry logic."""
    for attempt in range(retries):
        try:
            result = await asyncio.wait_for(
                Runner.run(agent, input=input_text),
                timeout=timeout,
            )
            return result
        except asyncio.TimeoutError:
            if attempt == retries - 1:
                raise
            await asyncio.sleep(1)
        except Exception:
            if attempt == retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)

async def generate_research_report(topic: str) -> str:
    """Generate a full market research report using parallel agents."""

    print(f"Starting parallel research on: {topic}")

    # Phase 1: Parallel data gathering
    results = await asyncio.gather(
        run_with_timeout_and_retry(market_agent, f"Market analysis: {topic}"),
        run_with_timeout_and_retry(competitor_agent, f"Competitor analysis: {topic}"),
        run_with_timeout_and_retry(customer_agent, f"Customer analysis: {topic}"),
        return_exceptions=True,
    )

    # Phase 2: Collect results, handling any failures
    sections = []
    labels = ["MARKET DATA", "COMPETITOR DATA", "CUSTOMER DATA"]

    for label, result in zip(labels, results):
        if isinstance(result, Exception):
            sections.append(f"{label}: Data unavailable due to error: {str(result)}")
        else:
            output = result.final_output
            if hasattr(output, 'model_dump'):
                sections.append(f"{label}:\n{json.dumps(output.model_dump(), indent=2)}")
            else:
                sections.append(f"{label}:\n{output}")

    combined = "\n\n".join(sections)

    # Phase 3: Synthesize into final report
    report_result = await Runner.run(
        synthesis_agent,
        input=f"Write a market research report from this data:\n\n{combined}",
    )

    return report_result.final_output

async def main():
    report = await generate_research_report(
        "AI-powered voice agents for customer service in 2026"
    )
    print("\n" + "=" * 60)
    print("FINAL REPORT")
    print("=" * 60)
    print(report)

asyncio.run(main())

When NOT to Parallelize

Parallel execution is not always the right choice:

  • When agents depend on each other's output: If Agent B needs Agent A's result, they must run sequentially
  • When you are rate-limited: Running 10 agents in parallel might hit API rate limits. Use asyncio.Semaphore to limit concurrency
  • When context is shared and mutable: If agents modify the same context object, parallel execution creates race conditions
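For the first case, the fix is simply to chain the awaits. Here the two agents are simulated with plain coroutines standing in for Runner.run calls:

```python
import asyncio

async def agent_a(text: str) -> str:
    # Stand-in for Runner.run(summarizer, input=text)
    await asyncio.sleep(0.1)
    return f"summary of {text}"

async def agent_b(summary: str) -> str:
    # Agent B consumes Agent A's output, so it cannot start any earlier
    await asyncio.sleep(0.1)
    return f"critique of {summary}"

async def main() -> None:
    summary = await agent_a("the report")  # must finish first
    critique = await agent_b(summary)      # depends on summary
    print(critique)

asyncio.run(main())
```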

Using Semaphore for Rate Limiting

import asyncio
from agents import Agent, Runner

# Limit to 3 concurrent agent runs
semaphore = asyncio.Semaphore(3)

async def run_with_semaphore(agent: Agent, input_text: str):
    async with semaphore:
        return await Runner.run(agent, input=input_text)

async def main():
    agents = [
        Agent(name=f"Agent{i}", instructions=f"Task {i}", model="gpt-4o-mini")
        for i in range(10)
    ]

    # Only 3 will run at a time despite 10 being queued
    results = await asyncio.gather(*[
        run_with_semaphore(agent, "Analyze this market")
        for agent in agents
    ])
    print(f"Completed {len(results)} runs")

asyncio.run(main())

Summary

Parallel agent execution with asyncio.gather is one of the highest-impact performance optimizations for multi-agent systems. Use it whenever you have independent tasks that can run concurrently. Add return_exceptions=True to prevent one failure from canceling everything. Add timeouts to prevent slow agents from blocking the pipeline. Add retries for resilience. And use a synthesis agent to combine parallel results into coherent output.

Written by CallSphere Team