
Parallel Fan-Out Fan-In Patterns: Processing Multiple Sub-Tasks Simultaneously

Implement fan-out fan-in patterns for AI agents to distribute work across parallel sub-tasks, aggregate results, handle partial failures gracefully, and enforce timeouts on straggler tasks.

The Fan-Out Fan-In Pattern

Many agent tasks naturally decompose into independent sub-tasks. A research agent might need to search five databases simultaneously. A code review agent might analyze ten files in parallel. A customer support agent might check order status, payment history, and shipping details all at once.

The fan-out fan-in pattern distributes work across multiple concurrent sub-tasks (fan-out) and then collects and merges the results (fan-in). This pattern dramatically reduces total execution time — instead of running N tasks sequentially in N * T seconds, you run them in parallel in roughly T seconds.

Basic Fan-Out with asyncio.gather

The simplest implementation uses asyncio.gather:

import asyncio
from typing import Any, Callable, Awaitable

async def fan_out_gather(
    tasks: list[Callable[[], Awaitable[Any]]],
) -> list[Any]:
    """Run all tasks in parallel and collect results."""
    return await asyncio.gather(*[task() for task in tasks])

# Example: search multiple sources in parallel
async def search_arxiv(query: str) -> dict:
    await asyncio.sleep(1)  # Simulate API call
    return {"source": "arxiv", "results": ["paper1", "paper2"]}

async def search_scholar(query: str) -> dict:
    await asyncio.sleep(1.5)
    return {"source": "scholar", "results": ["paper3"]}

async def search_semantic(query: str) -> dict:
    await asyncio.sleep(0.8)
    return {"source": "semantic_scholar", "results": ["paper4", "paper5"]}

# Run inside an async context (e.g. under asyncio.run(main()))
query = "agentic AI workflows"
results = await fan_out_gather([
    lambda: search_arxiv(query),
    lambda: search_scholar(query),
    lambda: search_semantic(query),
])
# All three searches complete in ~1.5s (the slowest) instead of ~3.3s

The problem with plain gather is that the first failed task propagates its exception out of the await immediately — you lose the results of the remaining tasks, which keep running unsupervised in the background. Production systems need better error handling.
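To see the two failure modes side by side, here is a minimal sketch (the ok/boom coroutines are stand-ins for real sub-tasks):

```python
import asyncio

async def ok() -> str:
    await asyncio.sleep(0.05)
    return "ok"

async def boom() -> str:
    raise RuntimeError("source unavailable")

async def main() -> list:
    # Plain gather: the first exception propagates out of the await,
    # and the results of the surviving tasks are lost
    try:
        await asyncio.gather(ok(), boom())
    except RuntimeError as e:
        print(f"gather raised: {e}")
    # return_exceptions=True: errors come back in-place as values
    return await asyncio.gather(ok(), boom(), return_exceptions=True)

results = asyncio.run(main())
print(results)  # ['ok', RuntimeError('source unavailable')]
```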

Robust Fan-Out with Partial Failure Handling

Use asyncio.gather(return_exceptions=True) or a custom wrapper to handle individual task failures without aborting the entire batch:

from dataclasses import dataclass
from typing import TypeVar, Generic

T = TypeVar("T")

@dataclass
class TaskResult(Generic[T]):
    task_id: str
    success: bool
    result: T | None = None
    error: str | None = None
    duration_ms: float = 0.0

async def robust_fan_out(
    tasks: dict[str, Callable[[], Awaitable[Any]]],
    timeout: float | None = None,
) -> dict[str, TaskResult]:
    """Fan-out with per-task error handling and optional timeout."""
    import time

    async def wrapped(task_id: str, fn: Callable) -> TaskResult:
        start = time.monotonic()
        try:
            result = await fn()
            elapsed = (time.monotonic() - start) * 1000
            return TaskResult(task_id, True, result, duration_ms=elapsed)
        except Exception as e:
            elapsed = (time.monotonic() - start) * 1000
            return TaskResult(task_id, False, error=str(e), duration_ms=elapsed)

    task_map = {
        asyncio.create_task(wrapped(tid, fn)): tid
        for tid, fn in tasks.items()
    }

    if timeout:
        done, pending = await asyncio.wait(task_map.keys(), timeout=timeout)

        results = {}
        for task in done:
            r = task.result()
            results[r.task_id] = r
        # Cancel stragglers and record them under their real task id
        for task in pending:
            task.cancel()
            tid = task_map[task]
            results[tid] = TaskResult(tid, False, error="Task timed out")
        return results
    else:
        raw_results = await asyncio.gather(*task_map)
        return {r.task_id: r for r in raw_results}

Now individual task failures are captured in the TaskResult without crashing the entire operation.

The Fan-In Aggregator

After fan-out completes, the fan-in stage merges partial results into a coherent output:


class InsufficientResultsError(Exception):
    """Raised when too few sub-tasks succeed to produce a valid result."""

class ResultAggregator:
    """Merge results from parallel sub-tasks."""

    def __init__(self, min_success_ratio: float = 0.5):
        self.min_success_ratio = min_success_ratio

    def aggregate(
        self, results: dict[str, TaskResult]
    ) -> dict[str, Any]:
        successful = {
            tid: r for tid, r in results.items() if r.success
        }
        failed = {
            tid: r for tid, r in results.items() if not r.success
        }

        total = len(results)
        success_count = len(successful)
        success_ratio = success_count / total if total else 0

        if success_ratio < self.min_success_ratio:
            raise InsufficientResultsError(
                f"Only {success_count}/{total} tasks succeeded "
                f"({success_ratio:.0%}), minimum is {self.min_success_ratio:.0%}"
            )

        return {
            "merged_results": [r.result for r in successful.values()],
            "success_count": success_count,
            "failure_count": len(failed),
            "failed_tasks": {
                tid: r.error for tid, r in failed.items()
            },
            "total_duration_ms": max(
                r.duration_ms for r in results.values()
            ),
        }

The min_success_ratio parameter controls how many tasks must succeed before the aggregated result is considered valid. For a research agent querying five sources, maybe three out of five is acceptable. For a financial reconciliation, you might need all tasks to succeed.

Bounded Concurrency with Semaphores

Unbounded fan-out can overwhelm downstream services. Use a semaphore to limit concurrency:

async def bounded_fan_out(
    tasks: dict[str, Callable[[], Awaitable[Any]]],
    max_concurrency: int = 5,
) -> dict[str, TaskResult]:
    """Fan-out with bounded concurrency."""
    semaphore = asyncio.Semaphore(max_concurrency)
    import time

    async def limited(task_id: str, fn: Callable) -> TaskResult:
        async with semaphore:
            start = time.monotonic()
            try:
                result = await fn()
                elapsed = (time.monotonic() - start) * 1000
                return TaskResult(task_id, True, result, duration_ms=elapsed)
            except Exception as e:
                elapsed = (time.monotonic() - start) * 1000
                return TaskResult(task_id, False, error=str(e), duration_ms=elapsed)

    coros = [limited(tid, fn) for tid, fn in tasks.items()]
    raw_results = await asyncio.gather(*coros)
    return {r.task_id: r for r in raw_results}

With max_concurrency=5, at most five tasks run simultaneously even if you fan out to fifty sub-tasks.
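You can verify the cap by tracking how many jobs hold the semaphore at once — a minimal sketch with ten jobs and a limit of three:

```python
import asyncio

async def main() -> int:
    sem = asyncio.Semaphore(3)
    active = 0
    peak = 0

    async def job(i: int) -> None:
        nonlocal active, peak
        async with sem:
            active += 1
            peak = max(peak, active)  # record the high-water mark
            await asyncio.sleep(0.05)
            active -= 1

    # Fan out ten jobs; the semaphore caps how many run at once
    await asyncio.gather(*(job(i) for i in range(10)))
    return peak

peak = asyncio.run(main())
print(peak)  # 3
```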

Complete Agent Example: Multi-Source Research

Putting the pattern together for a research agent that queries multiple sources, handles failures, and synthesizes results:

import json

class ResearchAgent:
    def __init__(self, llm_client, sources: dict):
        self.llm = llm_client
        self.sources = sources
        self.aggregator = ResultAggregator(min_success_ratio=0.4)

    async def research(self, query: str) -> str:
        # Fan-out: query all sources in parallel
        tasks = {
            name: lambda n=name: self._search_source(n, query)
            for name in self.sources
        }
        results = await bounded_fan_out(tasks, max_concurrency=3)

        # Fan-in: aggregate results
        aggregated = self.aggregator.aggregate(results)

        # Synthesize with LLM
        return await self._synthesize(query, aggregated)

    async def _search_source(self, source_name: str, query: str) -> list:
        search_fn = self.sources[source_name]
        return await search_fn(query)

    async def _synthesize(self, query: str, aggregated: dict) -> str:
        all_results = []
        for source_results in aggregated["merged_results"]:
            all_results.extend(source_results)

        prompt = f"""Synthesize these research results for the query: {query}

Results from {aggregated['success_count']} sources:
{json.dumps(all_results, indent=2)}

Note: {aggregated['failure_count']} sources failed and are excluded.
Provide a comprehensive summary."""

        response = await self.llm.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
        )
        return response.choices[0].message.content

FAQ

How do I decide the right max_concurrency value?

Start with the most restrictive downstream limit. If you are calling an API with a rate limit of 10 requests per second, set max_concurrency to 10 or lower. If you are calling multiple different APIs, each with its own limit, use separate semaphores per API. For LLM APIs, check your tier's rate limit (requests per minute) and set concurrency accordingly. Monitor for 429 (rate limit) errors and adjust down if they appear.
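The "separate semaphores per API" advice can be sketched like this (the API names and limits are illustrative, sized from each downstream service's documented rate limit):

```python
import asyncio

# Illustrative per-API concurrency caps (numbers are assumptions)
API_LIMITS = {"search": 10, "llm": 3, "crm": 5}

async def call_api(semaphores: dict, name: str) -> str:
    # Each API gets its own semaphore, so slow LLM calls
    # cannot starve the search or CRM budgets
    async with semaphores[name]:
        await asyncio.sleep(0.01)  # stand-in for the real request
        return name

async def main() -> int:
    semaphores = {n: asyncio.Semaphore(k) for n, k in API_LIMITS.items()}
    results = await asyncio.gather(
        *(call_api(semaphores, "llm") for _ in range(6)),
        *(call_api(semaphores, "search") for _ in range(6)),
    )
    return len(results)

count = asyncio.run(main())
print(count)  # 12
```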

Should I use asyncio.gather, asyncio.TaskGroup, or asyncio.wait?

Use gather with return_exceptions=True for the simplest case where you want all results including errors. Use TaskGroup (Python 3.11 and later) when you want structured concurrency with automatic cleanup — if one task fails, all others are cancelled. Use wait when you need fine-grained control over timeouts or want to process results as they complete rather than waiting for all tasks.
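For the "process results as they complete" case, asyncio.as_completed is the usual tool — a minimal sketch (the delays are made up):

```python
import asyncio

async def src(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return name

async def main() -> list[str]:
    coros = [src("slow", 0.3), src("fast", 0.05), src("mid", 0.15)]
    order = []
    # as_completed yields awaitables in completion order,
    # not submission order
    for fut in asyncio.as_completed(coros):
        order.append(await fut)
    return order

order = asyncio.run(main())
print(order)  # ['fast', 'mid', 'slow']
```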

What happens if the slowest sub-task takes much longer than the others?

This is the "straggler" problem. Set a timeout on the entire fan-out operation. When the timeout fires, cancel the straggler and proceed with the results you have. The aggregator checks whether enough tasks succeeded to produce a meaningful result. For research tasks, getting four out of five sources in two seconds is often better than waiting 30 seconds for the fifth source.
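A stripped-down sketch of cutting a straggler loose (the fast/straggler coroutines and the 0.2s deadline are illustrative):

```python
import asyncio

async def fast() -> str:
    await asyncio.sleep(0.05)
    return "fast"

async def straggler() -> str:
    await asyncio.sleep(10)
    return "slow"

async def main() -> tuple[int, int]:
    tasks = [asyncio.create_task(fast()), asyncio.create_task(straggler())]
    # Give the whole fan-out a deadline, then cancel what's left
    done, pending = await asyncio.wait(tasks, timeout=0.2)
    for t in pending:
        t.cancel()
    return len(done), len(pending)

done_count, pending_count = asyncio.run(main())
print(done_count, pending_count)  # 1 1
```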


#FanOutFanIn #ParallelProcessing #Concurrency #Asyncio #Python #AgenticAI #LearnAI #AIEngineering
