---
title: "The Map-Reduce Pattern for AI Agents: Parallel Processing of Large Datasets"
description: "Implement the Map-Reduce pattern for AI agents to split large workloads across parallel agent workers and aggregate their results efficiently."
canonical: https://callsphere.ai/blog/map-reduce-pattern-ai-agents-parallel-processing-large-datasets
category: "Learn Agentic AI"
tags: ["Agent Design Patterns", "Map-Reduce", "Python", "Parallel Processing", "Agentic AI"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-06T01:02:42.506Z
---

# The Map-Reduce Pattern for AI Agents: Parallel Processing of Large Datasets

> Implement the Map-Reduce pattern for AI agents to split large workloads across parallel agent workers and aggregate their results efficiently.

## When a Single Agent Is Not Enough

Suppose you need an AI agent to analyze 500 customer reviews, summarize a 200-page document, or evaluate code across 50 repositories. A single sequential agent would take too long and might hit context window limits. The Map-Reduce pattern solves this by splitting work into chunks, processing each chunk in parallel with separate agent instances, and then aggregating the partial results into a final output.

This pattern, borrowed from distributed computing, is one of the most practical ways to scale AI agent workloads.

## The Three Phases

1. **Split** — Divide the input data into manageable chunks
2. **Map** — Process each chunk independently (in parallel)
3. **Reduce** — Combine all partial results into a single output
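
Before wrapping these phases in a reusable class, it can help to see them as three plain functions. The following is a minimal sketch, not an agent implementation: `map_chunk` is a hypothetical stand-in that counts words instead of calling an LLM, and the function names and sample data are illustrative assumptions.

```python
import asyncio

def split(items: list[str], chunk_size: int) -> list[list[str]]:
    """Split phase: cut the input into fixed-size chunks."""
    return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]

async def map_chunk(chunk: list[str]) -> int:
    """Map phase: stand-in for an agent call; here it only counts words."""
    await asyncio.sleep(0)  # yield control, as a real network call would
    return sum(len(text.split()) for text in chunk)

def reduce_counts(partials: list[int]) -> int:
    """Reduce phase: combine the partial counts into one total."""
    return sum(partials)

async def count_words(items: list[str], chunk_size: int = 2) -> int:
    chunks = split(items, chunk_size)
    # All chunks are mapped concurrently; gather preserves input order.
    partials = await asyncio.gather(*(map_chunk(c) for c in chunks))
    return reduce_counts(list(partials))

# Illustrative sample data, not taken from a real dataset.
docs = ["great product", "slow shipping but fine", "would buy again", "meh", "love it"]
total = asyncio.run(count_words(docs))
print(total)
```

Each phase has a single responsibility, which is what makes it possible to swap the word-counting mapper for an LLM call without touching the split or reduce logic.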

## Implementation

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable, Awaitable

@dataclass
class MapResult:
    chunk_index: int
    output: Any
    success: bool
    error: str | None = None

class AgentMapReduce:
    def __init__(
        self,
        splitter: Callable[[Any], list[Any]],
        mapper: Callable[[Any, int], Awaitable[Any]],
        reducer: Callable[[list[MapResult]], Any],
        max_concurrency: int = 10,
    ):
        self.splitter = splitter
        self.mapper = mapper
        self.reducer = reducer
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def _process_chunk(self, chunk: Any,
                              index: int) -> MapResult:
        async with self.semaphore:
            try:
                output = await self.mapper(chunk, index)
                return MapResult(
                    chunk_index=index,
                    output=output,
                    success=True,
                )
            except Exception as e:
                return MapResult(
                    chunk_index=index,
                    output=None,
                    success=False,
                    error=str(e),
                )

    async def run(self, data: Any) -> Any:
        # Split phase
        chunks = self.splitter(data)
        print(f"Split into {len(chunks)} chunks")

        # Map phase — parallel execution
        tasks = [
            self._process_chunk(chunk, i)
            for i, chunk in enumerate(chunks)
        ]
        results = await asyncio.gather(*tasks)

        # Report failures
        failed = [r for r in results if not r.success]
        if failed:
            print(f"Warning: {len(failed)} chunks failed")

        # Reduce phase
        successful = sorted(
            [r for r in results if r.success],
            key=lambda r: r.chunk_index,
        )
        return self.reducer(successful)
```

## Applying It to Review Analysis

Here is how you would analyze hundreds of customer reviews in parallel:

```mermaid
flowchart LR
    INPUT(["User intent"])
    PARSE["Parse plus
classify"]
    PLAN["Plan and tool
selection"]
    AGENT["Agent loop
LLM plus tools"]
    GUARD{"Guardrails
and policy"}
    EXEC["Execute and
verify result"]
    OBS[("Trace and metrics")]
    OUT(["Outcome plus
next action"])
    INPUT --> PARSE --> PLAN --> AGENT --> GUARD
    GUARD -->|Pass| EXEC --> OUT
    GUARD -->|Fail| AGENT
    AGENT --> OBS
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style OBS fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
```

```python
import json
from collections import Counter

import openai

client = openai.AsyncOpenAI()

def split_reviews(reviews: list[str]) -> list[list[str]]:
    chunk_size = 20
    return [reviews[i:i + chunk_size]
            for i in range(0, len(reviews), chunk_size)]

async def analyze_chunk(chunk: list[str], index: int) -> dict:
    combined = "\n---\n".join(chunk)
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system",
             "content": (
                 "Analyze these reviews. Return JSON with: "
                 "positive_count, negative_count, neutral_count, "
                 "top_themes (list of strings), average_sentiment (0-1)."
             )},
            {"role": "user", "content": combined},
        ],
        response_format={"type": "json_object"},
    )
    return json.loads(response.choices[0].message.content)

def aggregate_results(results: list[MapResult]) -> dict:
    # The reducer only receives successful chunks, so the list can be empty
    if not results:
        raise ValueError("No successful chunks to aggregate")

    total_positive = sum(r.output["positive_count"] for r in results)
    total_negative = sum(r.output["negative_count"] for r in results)
    total_neutral = sum(r.output["neutral_count"] for r in results)
    total_reviews = total_positive + total_negative + total_neutral

    # Rank themes by how many chunks reported them. Counter matches exact
    # strings, so differently worded themes are not merged.
    theme_counts = Counter(
        theme for r in results for theme in r.output["top_themes"]
    )
    top_themes = [t for t, _ in theme_counts.most_common(10)]

    # Weight each chunk's sentiment by its review count so a short final
    # chunk does not count as much as a full one.
    weighted_sentiment = sum(
        r.output["average_sentiment"]
        * (r.output["positive_count"] + r.output["negative_count"]
           + r.output["neutral_count"])
        for r in results
    )
    avg_sentiment = weighted_sentiment / total_reviews if total_reviews else 0.0

    return {
        "total_reviews": total_reviews,
        "positive": total_positive,
        "negative": total_negative,
        "neutral": total_neutral,
        "top_themes": top_themes,
        "average_sentiment": round(avg_sentiment, 3),
    }

mr = AgentMapReduce(
    splitter=split_reviews,
    mapper=analyze_chunk,
    reducer=aggregate_results,
    max_concurrency=5,
)

# reviews = [... 500 review strings ...]
# result = asyncio.run(mr.run(reviews))
```

## Controlling Concurrency

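The `max_concurrency` parameter caps how many map workers run at once, using an asyncio semaphore. This helps you stay within API rate limits, but note that it bounds in-flight requests rather than requests per second: the request rate is at most the concurrency divided by the average request latency. If your LLM provider allows 10 requests per second and a typical call takes about one second, a concurrency of 8-9 keeps you safely below the limit. If calls return faster than that, lower the concurrency or add an explicit rate limiter.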
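
To see what the semaphore actually bounds, the sketch below runs 20 fake requests with a limit of 5 and records the peak number in flight. `run_bounded` and `fake_request` are hypothetical helpers written for this illustration, and `asyncio.sleep` stands in for the LLM call.

```python
import asyncio

async def run_bounded(n_tasks: int, max_concurrency: int,
                      latency: float = 0.01) -> int:
    """Run n_tasks fake requests and return the peak number in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)
    in_flight = 0
    peak = 0

    async def fake_request() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(latency)  # stand-in for the LLM call
            in_flight -= 1

    await asyncio.gather(*(fake_request() for _ in range(n_tasks)))
    return peak

def max_request_rate(max_concurrency: int, avg_latency_s: float) -> float:
    """Upper bound on requests per second for a given concurrency cap."""
    return max_concurrency / avg_latency_s

peak = asyncio.run(run_bounded(n_tasks=20, max_concurrency=5))
print(f"peak in-flight requests: {peak}")
print(f"rate bound at 0.5s latency: {max_request_rate(5, 0.5)} req/s")
```

The semaphore limits simultaneous requests, not requests per second, so the rate bound depends on how long each call takes. A provider limit expressed per second or per minute may still need a separate rate limiter.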

## FAQ

### How should I choose the chunk size for splitting?

Balance context window limits against meaningful work units. Each chunk should contain enough data for the agent to produce a useful partial result, but not so much that it exceeds the model's context window. For GPT-4o with its 128K-token context window, 20-50 reviews per chunk works well.
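
When review lengths vary widely, a fixed count per chunk can overshoot or waste the budget. One option is to split by an estimated token budget instead. The sketch below is an assumption-laden illustration: `estimate_tokens` uses a rough four-characters-per-token heuristic rather than a real tokenizer, and `split_by_token_budget` is a hypothetical helper, not part of any library.

```python
def estimate_tokens(text: str) -> int:
    """Rough estimate: about 4 characters per token (an assumption, not a tokenizer)."""
    return max(1, len(text) // 4)

def split_by_token_budget(reviews: list[str],
                          max_tokens: int) -> list[list[str]]:
    """Greedily pack reviews into chunks that stay under max_tokens each."""
    chunks: list[list[str]] = []
    current: list[str] = []
    used = 0
    for review in reviews:
        cost = estimate_tokens(review)
        # Start a new chunk when adding this review would exceed the budget.
        # A single oversized review still gets a chunk of its own.
        if current and used + cost > max_tokens:
            chunks.append(current)
            current, used = [], 0
        current.append(review)
        used += cost
    if current:
        chunks.append(current)
    return chunks

# Illustrative inputs with estimated costs of 10, 10, 20, and 2 tokens.
reviews = ["a" * 40, "b" * 40, "c" * 80, "d" * 8]
chunks = split_by_token_budget(reviews, max_tokens=25)
print([len(c) for c in chunks])
```

Because the signature matches `Callable[[Any], list[Any]]` once `max_tokens` is bound (for example with `functools.partial`), a splitter like this could be passed to a map-reduce runner in place of a fixed-size one. For production use, a real tokenizer would give tighter estimates.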

### What if some chunks fail during the map phase?

The implementation above captures failures per chunk without aborting the entire run, but it does not retry them: failed chunks are counted in a warning and left out of the reduce step. To recover, retry only the failed chunks after the map phase completes. If failures persist, log them and proceed with partial results; in many analytical workloads, losing 2-3% of data is acceptable.
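
A retry pass over only the failed chunks might look like the following sketch. `map_with_retries` and `flaky_mapper` are hypothetical names introduced for this example, the backoff values are arbitrary, and the semaphore is omitted for brevity.

```python
import asyncio
from typing import Any, Awaitable, Callable

Mapper = Callable[[Any, int], Awaitable[Any]]

async def map_with_retries(
    chunks: list[Any],
    mapper: Mapper,
    max_attempts: int = 3,
    base_delay: float = 0.01,
) -> tuple[dict[int, Any], dict[int, str]]:
    """Return (outputs by chunk index, last error by chunk index)."""
    outputs: dict[int, Any] = {}
    errors: dict[int, str] = {}
    pending = list(range(len(chunks)))

    for attempt in range(max_attempts):
        if not pending:
            break
        if attempt > 0:
            # Exponential backoff before each retry round
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
        results = await asyncio.gather(
            *(mapper(chunks[i], i) for i in pending),
            return_exceptions=True,
        )
        still_failing = []
        for i, result in zip(pending, results):
            if isinstance(result, BaseException):
                errors[i] = str(result)
                still_failing.append(i)
            else:
                outputs[i] = result
                errors.pop(i, None)
        pending = still_failing  # only failed chunks are retried
    return outputs, errors

calls: dict[int, int] = {}

async def flaky_mapper(chunk: str, index: int) -> str:
    """Hypothetical mapper: chunk 1 fails once, chunk 2 always fails."""
    calls[index] = calls.get(index, 0) + 1
    if index == 1 and calls[index] < 2:
        raise RuntimeError("transient failure")
    if index == 2:
        raise ValueError("always fails")
    return chunk.upper()

outputs, errors = asyncio.run(map_with_retries(["a", "b", "c"], flaky_mapper))
print(outputs)
print(errors)
```

Chunks that still fail after the last attempt stay in `errors`, so the caller can log them and decide whether the partial result is good enough to reduce.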

### Can I chain Map-Reduce with the Pipeline pattern?

Absolutely. A pipeline stage can internally use Map-Reduce for the heavy parallel work. For example, stage 1 fetches data, stage 2 runs Map-Reduce analysis across it, and stage 3 formats the aggregated results into a report.
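
As a rough sketch of that three-stage shape, the example below chains async stage functions and runs a small map-reduce inside the second one. The stage names, the character-counting mapper, and the sample input are all illustrative assumptions, and the pipeline is reduced here to a simple loop over stages.

```python
import asyncio
from typing import Any

async def fetch_stage(source: list[str]) -> list[str]:
    """Stage 1: stand-in for fetching data; drops blank records."""
    return [s.strip() for s in source if s.strip()]

async def map_reduce_stage(records: list[str], chunk_size: int = 2) -> dict:
    """Stage 2: split, map in parallel, then reduce."""
    chunks = [records[i:i + chunk_size]
              for i in range(0, len(records), chunk_size)]

    async def mapper(chunk: list[str]) -> int:
        await asyncio.sleep(0)  # placeholder for an agent call
        return sum(len(r) for r in chunk)

    partials = await asyncio.gather(*(mapper(c) for c in chunks))
    return {
        "records": len(records),
        "chunks": len(chunks),
        "total_chars": sum(partials),
    }

async def report_stage(summary: dict) -> str:
    """Stage 3: format the aggregated result."""
    return (f"{summary['records']} records in {summary['chunks']} chunks, "
            f"{summary['total_chars']} characters")

async def run_pipeline(source: Any) -> Any:
    data = source
    # Each stage consumes the previous stage's output
    for stage in (fetch_stage, map_reduce_stage, report_stage):
        data = await stage(data)
    return data

report = asyncio.run(run_pipeline(["alpha", " ", "beta", "gamma ", ""]))
print(report)
```

The pipeline only sees the stage's input and output, so the parallelism inside stage 2 stays an implementation detail of that stage.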

---


Source: https://callsphere.ai/blog/map-reduce-pattern-ai-agents-parallel-processing-large-datasets
