---
title: "The Pipeline Pattern: Sequential Agent Stages for Complex Data Processing"
description: "Master the Pipeline pattern for AI agents — design sequential processing stages with intermediate results, error propagation, and checkpointing for resilient multi-step workflows."
canonical: https://callsphere.ai/blog/pipeline-pattern-sequential-agent-stages-data-processing
category: "Learn Agentic AI"
tags: ["Agent Design Patterns", "Pipeline Pattern", "Python", "Agentic AI", "Data Processing"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-08T22:01:58.803Z
---

# The Pipeline Pattern: Sequential Agent Stages for Complex Data Processing

> Master the Pipeline pattern for AI agents — design sequential processing stages with intermediate results, error propagation, and checkpointing for resilient multi-step workflows.

## What Is the Pipeline Pattern?

The Pipeline pattern structures work as a sequence of stages, where the output of one stage becomes the input of the next. In AI agent systems, each stage can be a distinct agent or function that performs a specific transformation — extracting data, enriching it, validating it, and finally producing a result.

This pattern shines when you have a complex task that can be decomposed into well-defined, ordered steps. Instead of one monolithic agent trying to do everything at once, you break the work into focused stages that are individually testable, replaceable, and observable.
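
At its core, the pattern is disciplined function composition over an ordered list of stages. Here is a minimal sketch of that idea (the stages are throwaway placeholders, not part of the framework built below):

```python
from typing import Any, Callable

def run_stages(stages: list[Callable[[Any], Any]], data: Any) -> Any:
    """Feed each stage's output into the next stage."""
    for stage in stages:
        data = stage(data)
    return data

# Placeholder stages: each performs one focused transformation.
tokens = run_stages(
    [str.strip, str.lower, lambda text: text.split()],
    "  Quarterly Revenue Grew 12%  ",
)
print(tokens)  # ['quarterly', 'revenue', 'grew', '12%']
```

The rest of this post wraps that idea in a framework that also records intermediate results, propagates errors, and checkpoints progress.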

## Designing the Pipeline Framework

A good pipeline framework needs four capabilities: stage registration, intermediate result passing, error propagation, and checkpointing for recovery.

```mermaid
flowchart LR
    INPUT(["User intent"])
    PARSE["Parse plus
classify"]
    PLAN["Plan and tool
selection"]
    AGENT["Agent loop
LLM plus tools"]
    GUARD{"Guardrails
and policy"}
    EXEC["Execute and
verify result"]
    OBS[("Trace and metrics")]
    OUT(["Outcome plus
next action"])
    INPUT --> PARSE --> PLAN --> AGENT --> GUARD
    GUARD -->|Pass| EXEC --> OUT
    GUARD -->|Fail| AGENT
    AGENT --> OBS
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style OBS fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
```

```python
from dataclasses import dataclass, field
from typing import Any, Callable
from datetime import datetime
import json

@dataclass
class StageResult:
    stage_name: str
    output: Any
    started_at: datetime
    completed_at: datetime
    success: bool
    error: str | None = None

@dataclass
class PipelineContext:
    initial_input: Any
    results: list[StageResult] = field(default_factory=list)
    metadata: dict = field(default_factory=dict)

    @property
    def last_output(self) -> Any:
        if self.results:
            return self.results[-1].output
        return self.initial_input

    def checkpoint(self, path: str):
        data = {
            "initial_input": self.initial_input,
            "completed_stages": [r.stage_name for r in self.results
                                 if r.success],
            "last_output": self.last_output,
            "metadata": self.metadata,
        }
        with open(path, "w") as f:
            json.dump(data, f)

class Pipeline:
    def __init__(self, name: str, checkpoint_dir: str | None = None):
        self.name = name
        self.stages: list[tuple[str, Callable]] = []
        self.checkpoint_dir = checkpoint_dir

    def add_stage(self, name: str, handler: Callable):
        self.stages.append((name, handler))
        return self  # fluent interface

    def run(self, initial_input: Any) -> PipelineContext:
        ctx = PipelineContext(initial_input=initial_input)

        for stage_name, handler in self.stages:
            started = datetime.now()
            try:
                output = handler(ctx.last_output, ctx)
                ctx.results.append(StageResult(
                    stage_name=stage_name,
                    output=output,
                    started_at=started,
                    completed_at=datetime.now(),
                    success=True,
                ))
                if self.checkpoint_dir:
                    ctx.checkpoint(
                        f"{self.checkpoint_dir}/{self.name}_{stage_name}.json"
                    )
            except Exception as e:
                ctx.results.append(StageResult(
                    stage_name=stage_name,
                    output=None,
                    started_at=started,
                    completed_at=datetime.now(),
                    success=False,
                    error=str(e),
                ))
                raise PipelineError(stage_name, e, ctx)

        return ctx

class PipelineError(Exception):
    def __init__(self, stage: str, cause: Exception,
                 context: PipelineContext):
        self.stage = stage
        self.cause = cause
        self.context = context
        super().__init__(f"Pipeline failed at stage '{stage}': {cause}")
```

## Building a Real Pipeline

Here is a document-processing pipeline that extracts text, summarizes it, and classifies sentiment:

```python
import json

import openai

client = openai.OpenAI()

def extract_text(raw_input: str, ctx: PipelineContext) -> str:
    # Simulate extraction from raw document
    return raw_input.strip()

def summarize(text: str, ctx: PipelineContext) -> str:
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Summarize in 2-3 sentences."},
            {"role": "user", "content": text},
        ],
    )
    return response.choices[0].message.content

def classify_sentiment(summary: str, ctx: PipelineContext) -> dict:
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system",
             "content": ('Return JSON: {"sentiment": '
                         '"positive|negative|neutral", "score": 0.0-1.0}')},
            {"role": "user", "content": summary},
        ],
        response_format={"type": "json_object"},
    )
    result = json.loads(response.choices[0].message.content)
    ctx.metadata["sentiment"] = result["sentiment"]
    return result

pipeline = (
    Pipeline("doc_analysis", checkpoint_dir="/tmp/checkpoints")
    .add_stage("extract", extract_text)
    .add_stage("summarize", summarize)
    .add_stage("classify", classify_sentiment)
)

result = pipeline.run("Long document text here...")
print(result.last_output)  # {"sentiment": "positive", "score": 0.87}
```
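
Because every stage appends a `StageResult`, the returned context doubles as a lightweight trace. A quick way to inspect per-stage timing and status after a run, using only the fields defined in the framework above:

```python
for stage in result.results:
    duration = (stage.completed_at - stage.started_at).total_seconds()
    status = "ok" if stage.success else f"failed: {stage.error}"
    print(f"{stage.stage_name:<12} {duration:6.2f}s  {status}")
```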

## Error Propagation and Recovery

When a stage fails, `PipelineError` captures the stage name, the original exception, and the full pipeline context including all completed stage results. This lets you resume from the last successful checkpoint rather than re-running the entire pipeline — critical when early stages involve expensive API calls.
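
Callers can catch `PipelineError` to log what completed, report what failed, and decide whether to resume. A sketch using the fields the exception exposes:

```python
try:
    ctx = pipeline.run("Long document text here...")
except PipelineError as err:
    completed = [r.stage_name for r in err.context.results if r.success]
    print(f"Failed at '{err.stage}' after completing {completed}")
    print(f"Cause: {err.cause}")
    # With checkpointing enabled, the most recent checkpoint file already
    # holds the output of the last successful stage for resumption.
```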

## FAQ

### How do I resume a pipeline from a checkpoint after a failure?

Load the checkpoint file, find the last completed stage, then create a new pipeline run starting from the next stage using the saved `last_output` as the initial input. Skip stages that already completed successfully in the checkpoint.
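
A minimal resume helper, assuming the checkpoint layout written by `PipelineContext.checkpoint` above (the helper itself is not part of the framework):

```python
import json

def resume(pipeline: Pipeline, checkpoint_path: str) -> PipelineContext:
    """Re-run only the stages not recorded as completed in the checkpoint."""
    with open(checkpoint_path) as f:
        saved = json.load(f)

    done = set(saved["completed_stages"])
    remaining = Pipeline(pipeline.name, pipeline.checkpoint_dir)
    for name, handler in pipeline.stages:
        if name not in done:
            remaining.add_stage(name, handler)

    # The saved last_output becomes the initial input for the remaining stages.
    return remaining.run(saved["last_output"])
```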

### Should each pipeline stage be a separate agent or a simple function?

Use simple functions for deterministic transformations (parsing, formatting, validation) and agents for stages that require reasoning or decision-making (summarization, classification). Mixing both keeps the pipeline fast where possible and intelligent where needed.

### How do I handle stages that need to branch conditionally?

Add a conditional stage that inspects the input and returns a flag in `ctx.metadata`. Subsequent stages check that flag to decide their behavior. For complex branching, consider combining the Pipeline pattern with the Router pattern.
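
A small sketch of that approach, using a hypothetical `needs_translation` flag (the detection logic is a stand-in for a real language check):

```python
def detect_language(text: str, ctx: PipelineContext) -> str:
    # Stand-in check; a real stage might call a language-detection model.
    ctx.metadata["needs_translation"] = not text.isascii()
    return text

def translate_if_needed(text: str, ctx: PipelineContext) -> str:
    if not ctx.metadata.get("needs_translation"):
        return text  # pass through unchanged
    # ...call a translation model here...
    return text

branching = (
    Pipeline("branching_demo")
    .add_stage("detect_language", detect_language)
    .add_stage("translate", translate_if_needed)
)
```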

---

#AgentDesignPatterns #PipelinePattern #Python #AgenticAI #DataProcessing #LearnAI #AIEngineering

---

Source: https://callsphere.ai/blog/pipeline-pattern-sequential-agent-stages-data-processing
