The Map-Reduce Pattern for AI Agents: Parallel Processing of Large Datasets

Implement the Map-Reduce pattern for AI agents to split large workloads across parallel agent workers and aggregate their results efficiently.

When a Single Agent Is Not Enough

Suppose you need an AI agent to analyze 500 customer reviews, summarize a 200-page document, or evaluate code across 50 repositories. A single sequential agent would take too long and might hit context window limits. The Map-Reduce pattern solves this by splitting work into chunks, processing each chunk in parallel with separate agent instances, and then aggregating the partial results into a final output.

This pattern, borrowed from distributed computing, is one of the most practical ways to scale AI agent workloads.

The Three Phases

  1. Split — Divide the input data into manageable chunks
  2. Map — Process each chunk independently (in parallel)
  3. Reduce — Combine all partial results into a single output

Implementation

import asyncio
from dataclasses import dataclass
from typing import Any, Callable, Awaitable


@dataclass
class MapResult:
    chunk_index: int
    output: Any
    success: bool
    error: str | None = None


class AgentMapReduce:
    def __init__(
        self,
        splitter: Callable[[Any], list[Any]],
        mapper: Callable[[Any, int], Awaitable[Any]],
        reducer: Callable[[list[MapResult]], Any],
        max_concurrency: int = 10,
    ):
        self.splitter = splitter
        self.mapper = mapper
        self.reducer = reducer
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def _process_chunk(self, chunk: Any,
                              index: int) -> MapResult:
        async with self.semaphore:
            try:
                output = await self.mapper(chunk, index)
                return MapResult(
                    chunk_index=index,
                    output=output,
                    success=True,
                )
            except Exception as e:
                return MapResult(
                    chunk_index=index,
                    output=None,
                    success=False,
                    error=str(e),
                )

    async def run(self, data: Any) -> Any:
        # Split phase
        chunks = self.splitter(data)
        print(f"Split into {len(chunks)} chunks")

        # Map phase — parallel execution
        tasks = [
            self._process_chunk(chunk, i)
            for i, chunk in enumerate(chunks)
        ]
        results = await asyncio.gather(*tasks)

        # Report failures
        failed = [r for r in results if not r.success]
        if failed:
            print(f"Warning: {len(failed)} chunks failed")

        # Reduce phase
        successful = sorted(
            [r for r in results if r.success],
            key=lambda r: r.chunk_index,
        )
        return self.reducer(successful)

Applying It to Review Analysis

Here is how you would analyze hundreds of customer reviews in parallel:

import json

import openai

# AsyncOpenAI reads the API key from the OPENAI_API_KEY environment variable
client = openai.AsyncOpenAI()


def split_reviews(reviews: list[str]) -> list[list[str]]:
    # Fixed-size chunks; the last chunk may hold fewer than chunk_size reviews
    chunk_size = 20
    return [reviews[i:i + chunk_size]
            for i in range(0, len(reviews), chunk_size)]


async def analyze_chunk(chunk: list[str], index: int) -> dict:
    # Join the reviews with a visible separator so the model can tell them apart
    combined = "\n---\n".join(chunk)
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system",
             "content": (
                 "Analyze these reviews. Return JSON with: "
                 "positive_count, negative_count, neutral_count, "
                 "top_themes (list of strings), average_sentiment (0-1)."
             )},
            {"role": "user", "content": combined},
        ],
        response_format={"type": "json_object"},
    )
    # Invalid JSON raises here and is recorded as a failed chunk by _process_chunk
    return json.loads(response.choices[0].message.content)


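def aggregate_results(results: list[MapResult]) -> dict:
    # If every chunk failed there is nothing to aggregate (and nothing to divide by)
    if not results:
        return {
            "total_reviews": 0,
            "positive": 0,
            "negative": 0,
            "neutral": 0,
            "top_themes": [],
            "average_sentiment": None,
        }

    total_positive = sum(r.output["positive_count"] for r in results)
    total_negative = sum(r.output["negative_count"] for r in results)
    total_neutral = sum(r.output["neutral_count"] for r in results)
    all_themes = []
    for r in results:
        all_themes.extend(r.output["top_themes"])

    # Rank themes by how often they appear across chunks and keep the top 10.
    # Matching is by exact string, so differently worded themes are counted separately.
    from collections import Counter
    theme_counts = Counter(all_themes)
    top_themes = [t for t, _ in theme_counts.most_common(10)]

    # Unweighted mean of the per-chunk averages: a short final chunk
    # counts as much as a full one
    avg_sentiment = (
        sum(r.output["average_sentiment"] for r in results) / len(results)
    )
    return {
        "total_reviews": total_positive + total_negative + total_neutral,
        "positive": total_positive,
        "negative": total_negative,
        "neutral": total_neutral,
        "top_themes": top_themes,
        "average_sentiment": round(avg_sentiment, 3),
    }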


mr = AgentMapReduce(
    splitter=split_reviews,
    mapper=analyze_chunk,
    reducer=aggregate_results,
    max_concurrency=5,
)

# reviews = [... 500 review strings ...]
# result = asyncio.run(mr.run(reviews))

Controlling Concurrency

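The max_concurrency parameter caps how many map workers are in flight at once, using an asyncio semaphore. This is essential for respecting API rate limits, but keep in mind that concurrency is not the same as requests per second: the actual request rate is roughly the concurrency divided by the average call latency. With 8 workers and 2-second calls you send about 4 requests per second, while 8 workers and 0.5-second calls send about 16. If your LLM provider allows 10 requests per second, start with a conservative value, watch for rate-limit errors (HTTP 429), and adjust from there.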
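
To see the cap in action, here is a minimal sketch that runs dummy workers behind a semaphore and records the highest number in flight at any moment. The function name run_bounded and the 10 ms sleep are illustrative assumptions standing in for real LLM calls; they are not part of the AgentMapReduce class.

```python
import asyncio


async def run_bounded(n_tasks: int, max_concurrency: int) -> int:
    """Run n_tasks dummy workers and return the peak number in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)
    in_flight = 0
    peak = 0

    async def worker() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            # Only max_concurrency workers can be inside this block at once
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # hypothetical stand-in for an LLM call
            in_flight -= 1

    await asyncio.gather(*(worker() for _ in range(n_tasks)))
    return peak


peak = asyncio.run(run_bounded(n_tasks=20, max_concurrency=5))
print(f"Peak concurrent workers: {peak}")
```

All 20 tasks are created up front, but the peak never exceeds 5. The semaphore limits in-flight work, not the request rate, so very fast calls can still exceed a per-second quota.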

FAQ

How should I choose the chunk size for splitting?

Balance context window limits against meaningful work units. Each chunk should contain enough data for the agent to produce a useful partial result, but not so much that it exceeds the model's context window. For GPT-4o with 128K tokens, 20-50 reviews per chunk works well.
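
When review lengths vary a lot, a fixed count per chunk can be replaced by a size budget. The sketch below is one possible splitter under stated assumptions: the name split_by_token_budget, the 4000-token default budget, and the rough estimate of 4 characters per token are all illustrative, and a real tokenizer would give more accurate counts.

```python
def split_by_token_budget(
    reviews: list[str],
    max_tokens_per_chunk: int = 4000,  # assumed budget, tune for your model
    chars_per_token: int = 4,          # rough heuristic, not a real tokenizer
) -> list[list[str]]:
    """Group reviews into chunks whose estimated token count stays under a budget."""
    chunks: list[list[str]] = []
    current: list[str] = []
    current_tokens = 0

    for review in reviews:
        # Estimate tokens from character length; count at least 1 per review
        tokens = max(1, len(review) // chars_per_token)

        # Start a new chunk if adding this review would exceed the budget
        if current and current_tokens + tokens > max_tokens_per_chunk:
            chunks.append(current)
            current = []
            current_tokens = 0

        current.append(review)
        current_tokens += tokens

    if current:
        chunks.append(current)
    return chunks
```

Because it takes a list of reviews and returns a list of chunks, it could be passed as the splitter in place of split_reviews. A single review larger than the budget still gets its own chunk rather than being dropped.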

What if some chunks fail during the map phase?

The implementation above captures failures per chunk without aborting the entire run, but it does not retry anything on its own: failed results are counted, reported, and left out of the reduce step. To recover them, re-run only the failed chunks after the map phase completes. If failures persist, log them and proceed with partial results. In many analytical workloads, losing 2-3% of data is acceptable.
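
One way to add retries without changing the class is to wrap the mapper itself. Note that this is a variation on the advice above: it retries each chunk inline with exponential backoff rather than in a second pass after the map phase. The helper name with_retries and its default values are assumptions made for this sketch.

```python
import asyncio
import random
from typing import Any, Awaitable, Callable

Mapper = Callable[[Any, int], Awaitable[Any]]


def with_retries(mapper: Mapper, max_attempts: int = 3,
                 base_delay: float = 0.5) -> Mapper:
    """Wrap a mapper so each chunk is retried with exponential backoff."""

    async def wrapped(chunk: Any, index: int) -> Any:
        for attempt in range(1, max_attempts + 1):
            try:
                return await mapper(chunk, index)
            except Exception:
                if attempt == max_attempts:
                    # Out of attempts: re-raise so the caller records the failure
                    raise
                # Wait base_delay, 2x, 4x, ... plus random jitter before retrying
                delay = base_delay * 2 ** (attempt - 1)
                await asyncio.sleep(delay + random.uniform(0, delay / 2))

    return wrapped
```

With the article's example this would be used as mapper=with_retries(analyze_chunk). One trade-off of this design: the wrapped mapper runs inside the semaphore, so a chunk that is backing off keeps holding its concurrency slot while it waits.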

Can I chain Map-Reduce with the Pipeline pattern?

Absolutely. A pipeline stage can internally use Map-Reduce for the heavy parallel work. For example, stage 1 fetches data, stage 2 runs Map-Reduce analysis across it, and stage 3 formats the aggregated results into a report.
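
The three stages described above can be sketched as plain async functions. Everything here is a hypothetical stand-in: fetch_stage returns made-up review strings, and analyze_stage runs a tiny inline map-reduce that only counts reviews, where a real pipeline would call something like mr.run(reviews) instead.

```python
import asyncio


async def fetch_stage() -> list[str]:
    # Stage 1: hypothetical stand-in for fetching data from an API or database
    return [f"review {i}" for i in range(10)]


async def analyze_stage(reviews: list[str]) -> dict:
    # Stage 2: a tiny map-reduce over chunks of 4 reviews
    chunks = [reviews[i:i + 4] for i in range(0, len(reviews), 4)]

    async def count_chunk(chunk: list[str]) -> int:
        await asyncio.sleep(0)  # stand-in for an LLM call
        return len(chunk)

    # Map: process all chunks concurrently; reduce: sum the partial counts
    counts = await asyncio.gather(*(count_chunk(c) for c in chunks))
    return {"chunks": len(chunks), "total_reviews": sum(counts)}


def format_stage(summary: dict) -> str:
    # Stage 3: turn the aggregated result into a one-line report
    return (f"Analyzed {summary['total_reviews']} reviews "
            f"in {summary['chunks']} chunks")


async def pipeline() -> str:
    data = await fetch_stage()
    summary = await analyze_stage(data)
    return format_stage(summary)


report = asyncio.run(pipeline())
print(report)
```

This prints "Analyzed 10 reviews in 3 chunks". The stages on either side never need to know that stage 2 fans out internally, which is what makes the two patterns compose cleanly.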


#AgentDesignPatterns #MapReduce #Python #ParallelProcessing #AgenticAI #LearnAI #AIEngineering

Written by

CallSphere Team