---
title: "Parallel Fan-Out Fan-In Patterns: Processing Multiple Sub-Tasks Simultaneously"
description: "Implement fan-out fan-in patterns for AI agents to distribute work across parallel sub-tasks, aggregate results, handle partial failures gracefully, and enforce timeouts on straggler tasks."
canonical: https://callsphere.ai/blog/parallel-fan-out-fan-in-patterns-processing-sub-tasks-simultaneously
category: "Learn Agentic AI"
tags: ["Fan-Out Fan-In", "Parallel Processing", "Concurrency", "asyncio", "Python"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-06T01:02:43.611Z
---

# Parallel Fan-Out Fan-In Patterns: Processing Multiple Sub-Tasks Simultaneously

> Implement fan-out fan-in patterns for AI agents to distribute work across parallel sub-tasks, aggregate results, handle partial failures gracefully, and enforce timeouts on straggler tasks.

## The Fan-Out Fan-In Pattern

Many agent tasks naturally decompose into independent sub-tasks. A research agent might need to search five databases simultaneously. A code review agent might analyze ten files in parallel. A customer support agent might check order status, payment history, and shipping details all at once.

The **fan-out fan-in** pattern distributes work across multiple concurrent sub-tasks (fan-out) and then collects and merges the results (fan-in). This pattern dramatically reduces total execution time — instead of running N tasks sequentially in N * T seconds, you run them in parallel in roughly T seconds.

## Basic Fan-Out with asyncio.gather

The simplest implementation uses `asyncio.gather`:

```mermaid
flowchart LR
    INPUT(["User intent"])
    PARSE["Parse plus
classify"]
    PLAN["Plan and tool
selection"]
    AGENT["Agent loop
LLM plus tools"]
    GUARD{"Guardrails
and policy"}
    EXEC["Execute and
verify result"]
    OBS[("Trace and metrics")]
    OUT(["Outcome plus
next action"])
    INPUT --> PARSE --> PLAN --> AGENT --> GUARD
    GUARD -->|Pass| EXEC --> OUT
    GUARD -->|Fail| AGENT
    AGENT --> OBS
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style OBS fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
```

```python
import asyncio
from typing import Any, Callable, Awaitable

async def fan_out_gather(
    tasks: list[Callable[[], Awaitable[Any]]],
) -> list[Any]:
    """Run all tasks in parallel and collect results."""
    return await asyncio.gather(*[task() for task in tasks])

# Example: search multiple sources in parallel
async def search_arxiv(query: str) -> dict:
    await asyncio.sleep(1)  # Simulate API call
    return {"source": "arxiv", "results": ["paper1", "paper2"]}

async def search_scholar(query: str) -> dict:
    await asyncio.sleep(1.5)
    return {"source": "scholar", "results": ["paper3"]}

async def search_semantic(query: str) -> dict:
    await asyncio.sleep(0.8)
    return {"source": "semantic_scholar", "results": ["paper4", "paper5"]}

query = "agentic AI workflows"
results = await fan_out_gather([
    lambda: search_arxiv(query),
    lambda: search_scholar(query),
    lambda: search_semantic(query),
])
# All three searches complete in ~1.5s (the slowest) instead of ~3.3s
```

The problem with plain `gather` is that one failed task raises an exception and cancels everything. Production systems need better error handling.

## Robust Fan-Out with Partial Failure Handling

Use `asyncio.gather(return_exceptions=True)` or a custom wrapper to handle individual task failures without aborting the entire batch:

```python
from dataclasses import dataclass
from typing import TypeVar, Generic

T = TypeVar("T")

@dataclass
class TaskResult(Generic[T]):
    task_id: str
    success: bool
    result: T | None = None
    error: str | None = None
    duration_ms: float = 0.0

async def robust_fan_out(
    tasks: dict[str, Callable[[], Awaitable[Any]]],
    timeout: float | None = None,
) -> dict[str, TaskResult]:
    """Fan-out with per-task error handling and optional timeout."""
    import time

    async def wrapped(task_id: str, fn: Callable) -> TaskResult:
        start = time.monotonic()
        try:
            result = await fn()
            elapsed = (time.monotonic() - start) * 1000
            return TaskResult(task_id, True, result, duration_ms=elapsed)
        except Exception as e:
            elapsed = (time.monotonic() - start) * 1000
            return TaskResult(task_id, False, error=str(e), duration_ms=elapsed)

    coros = [wrapped(tid, fn) for tid, fn in tasks.items()]

    if timeout:
        done, pending = await asyncio.wait(
            [asyncio.create_task(c) for c in coros],
            timeout=timeout,
        )
        # Cancel timed-out tasks
        for task in pending:
            task.cancel()

        results = {}
        for task in done:
            r = task.result()
            results[r.task_id] = r
        for task in pending:
            # Mark timed-out tasks
            results[f"timeout_{id(task)}"] = TaskResult(
                "unknown", False, error="Task timed out"
            )
        return results
    else:
        raw_results = await asyncio.gather(*coros)
        return {r.task_id: r for r in raw_results}
```

Now individual task failures are captured in the `TaskResult` without crashing the entire operation.

## The Fan-In Aggregator

After fan-out completes, the fan-in stage merges partial results into a coherent output:

```python
class ResultAggregator:
    """Merge results from parallel sub-tasks."""

    def __init__(self, min_success_ratio: float = 0.5):
        self.min_success_ratio = min_success_ratio

    def aggregate(
        self, results: dict[str, TaskResult]
    ) -> dict[str, Any]:
        successful = {
            tid: r for tid, r in results.items() if r.success
        }
        failed = {
            tid: r for tid, r in results.items() if not r.success
        }

        total = len(results)
        success_count = len(successful)
        success_ratio = success_count / total if total else 0

        if success_ratio  dict[str, TaskResult]:
    """Fan-out with bounded concurrency."""
    semaphore = asyncio.Semaphore(max_concurrency)
    import time

    async def limited(task_id: str, fn: Callable) -> TaskResult:
        async with semaphore:
            start = time.monotonic()
            try:
                result = await fn()
                elapsed = (time.monotonic() - start) * 1000
                return TaskResult(task_id, True, result, duration_ms=elapsed)
            except Exception as e:
                elapsed = (time.monotonic() - start) * 1000
                return TaskResult(task_id, False, error=str(e), duration_ms=elapsed)

    coros = [limited(tid, fn) for tid, fn in tasks.items()]
    raw_results = await asyncio.gather(*coros)
    return {r.task_id: r for r in raw_results}
```

With `max_concurrency=5`, at most five tasks run simultaneously even if you fan out to fifty sub-tasks.

## Complete Agent Example: Multi-Source Research

Putting the pattern together for a research agent that queries multiple sources, handles failures, and synthesizes results:

```python
class ResearchAgent:
    def __init__(self, llm_client, sources: dict):
        self.llm = llm_client
        self.sources = sources
        self.aggregator = ResultAggregator(min_success_ratio=0.4)

    async def research(self, query: str) -> str:
        # Fan-out: query all sources in parallel
        tasks = {
            name: lambda n=name: self._search_source(n, query)
            for name in self.sources
        }
        results = await bounded_fan_out(tasks, max_concurrency=3)

        # Fan-in: aggregate results
        aggregated = self.aggregator.aggregate(results)

        # Synthesize with LLM
        return await self._synthesize(query, aggregated)

    async def _search_source(self, source_name: str, query: str) -> list:
        search_fn = self.sources[source_name]
        return await search_fn(query)

    async def _synthesize(self, query: str, aggregated: dict) -> str:
        all_results = []
        for source_results in aggregated["merged_results"]:
            all_results.extend(source_results)

        prompt = f"""Synthesize these research results for the query: {query}

Results from {aggregated['success_count']} sources:
{json.dumps(all_results, indent=2)}

Note: {aggregated['failure_count']} sources failed and are excluded.
Provide a comprehensive summary."""

        response = await self.llm.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
        )
        return response.choices[0].message.content
```

## FAQ

### How do I decide the right max_concurrency value?

Start with the most restrictive downstream limit. If you are calling an API with a rate limit of 10 requests per second, set max_concurrency to 10 or lower. If you are calling multiple different APIs, each with its own limit, use separate semaphores per API. For LLM APIs, check your tier's rate limit (requests per minute) and set concurrency accordingly. Monitor for 429 (rate limit) errors and adjust down if they appear.

### Should I use asyncio.gather, asyncio.TaskGroup, or asyncio.wait?

Use `gather` with `return_exceptions=True` for the simplest case where you want all results including errors. Use `TaskGroup` (Python 3.11 and later) when you want structured concurrency with automatic cleanup — if one task fails, all others are cancelled. Use `wait` when you need fine-grained control over timeouts or want to process results as they complete rather than waiting for all tasks.

### What happens if the slowest sub-task takes much longer than the others?

This is the "straggler" problem. Set a timeout on the entire fan-out operation. When the timeout fires, cancel the straggler and proceed with the results you have. The aggregator checks whether enough tasks succeeded to produce a meaningful result. For research tasks, getting four out of five sources in two seconds is often better than waiting 30 seconds for the fifth source.

---

#FanOutFanIn #ParallelProcessing #Concurrency #Asyncio #Python #AgenticAI #LearnAI #AIEngineering

---

Source: https://callsphere.ai/blog/parallel-fan-out-fan-in-patterns-processing-sub-tasks-simultaneously
