---
title: "Building Async Agent Pipelines: Chaining Asynchronous Steps with Dependencies"
description: "Design and implement async agent pipelines that chain dependent steps, resolve execution order automatically, and handle backpressure in production AI systems."
canonical: https://callsphere.ai/blog/building-async-agent-pipelines-chaining-asynchronous-steps-dependencies
category: "Learn Agentic AI"
tags: ["Python", "asyncio", "Pipeline", "AI Agents", "Architecture"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-06-01T19:20:34.035Z
---

# Building Async Agent Pipelines: Chaining Asynchronous Steps with Dependencies

> Design and implement async agent pipelines that chain dependent steps, resolve execution order automatically, and handle backpressure in production AI systems.

## Why Pipelines Matter for AI Agents

Real-world AI agents rarely execute a single LLM call. They run multi-step workflows: retrieve context, classify intent, generate a response, validate the output, then format it. Some steps depend on others. Some can run in parallel. A well-designed async pipeline maximizes concurrency while respecting dependencies.

The key insight is modeling your agent workflow as a directed acyclic graph (DAG) of steps, where edges represent data dependencies. Steps without mutual dependencies execute concurrently. Steps that consume another step's output wait automatically.
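
As a rough illustration of that idea, the standard library's `graphlib.TopologicalSorter` can group a dependency graph into "waves" of steps that are ready at the same time. This is only a sketch: the step names and the `execution_waves` helper are assumptions made for the example, not part of any pipeline API.

```python
from graphlib import TopologicalSorter

# Each key maps a step to the set of steps it depends on (illustrative names)
graph = {
    "retrieve": set(),
    "classify": {"retrieve"},
    "entities": {"retrieve"},
    "generate": {"classify", "entities"},
    "validate": {"generate"},
}

def execution_waves(graph: dict[str, set[str]]) -> list[list[str]]:
    """Group steps into waves; steps in the same wave have no mutual dependencies."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every step whose dependencies are done
        waves.append(ready)
        sorter.done(*ready)
    return waves

waves = execution_waves(graph)
print(waves)
# [['retrieve'], ['classify', 'entities'], ['generate'], ['validate']]
```

Every step inside one wave could be awaited concurrently, while the waves themselves run in order.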

## Simple Linear Pipeline

The simplest pipeline chains steps sequentially, each consuming the previous step's output.

```mermaid
flowchart LR
    INPUT(["User intent"])
    PARSE["Parse plus
classify"]
    PLAN["Plan and tool
selection"]
    AGENT["Agent loop
LLM plus tools"]
    GUARD{"Guardrails
and policy"}
    EXEC["Execute and
verify result"]
    OBS[("Trace and metrics")]
    OUT(["Outcome plus
next action"])
    INPUT --> PARSE --> PLAN --> AGENT --> GUARD
    GUARD -->|Pass| EXEC --> OUT
    GUARD -->|Fail| AGENT
    AGENT --> OBS
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style OBS fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
```

```python
import asyncio
from dataclasses import dataclass
from typing import Any

@dataclass
class PipelineContext:
    """Shared context flowing through the pipeline."""
    user_query: str
    retrieved_docs: list[str] | None = None
    intent: str | None = None
    entities: list[str] | None = None  # Filled in by an entity-extraction step
    response: str | None = None
    validated: bool = False

async def retrieve_context(ctx: PipelineContext) -> PipelineContext:
    """Step 1: Retrieve relevant documents."""
    print("Retrieving context...")
    await asyncio.sleep(0.5)  # Simulate vector search
    ctx.retrieved_docs = [
        "Doc about async Python patterns",
        "Doc about agent architectures",
    ]
    return ctx

async def classify_intent(ctx: PipelineContext) -> PipelineContext:
    """Step 2: Classify user intent."""
    print("Classifying intent...")
    await asyncio.sleep(0.3)  # Simulate LLM call
    ctx.intent = "technical_question"
    return ctx

async def generate_response(ctx: PipelineContext) -> PipelineContext:
    """Step 3: Generate response using context and intent."""
    print(f"Generating response for intent: {ctx.intent}")
    await asyncio.sleep(1.0)  # Simulate LLM call
    ctx.response = f"Based on {len(ctx.retrieved_docs or [])} docs: ..."
    return ctx

async def validate_output(ctx: PipelineContext) -> PipelineContext:
    """Step 4: Validate the response."""
    print("Validating output...")
    await asyncio.sleep(0.2)  # Simulate validation
    ctx.validated = True
    return ctx

async def run_linear_pipeline(query: str) -> PipelineContext:
    ctx = PipelineContext(user_query=query)
    ctx = await retrieve_context(ctx)
    ctx = await classify_intent(ctx)
    ctx = await generate_response(ctx)
    ctx = await validate_output(ctx)
    return ctx
```
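
The hand-written chain of `await` calls can also be expressed as a loop over a list of steps. The sketch below assumes a plain `dict` context and two toy steps (`add_docs`, `add_response`) invented for illustration; `run_steps` is a hypothetical helper, not an existing library function.

```python
import asyncio
from typing import Awaitable, Callable

Step = Callable[[dict], Awaitable[dict]]

async def run_steps(ctx: dict, steps: list[Step]) -> dict:
    """Await each step in order, feeding its output to the next one."""
    for step in steps:
        ctx = await step(ctx)
    return ctx

# Toy steps standing in for retrieval and generation (hypothetical)
async def add_docs(ctx: dict) -> dict:
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    return {**ctx, "docs": ["doc-a", "doc-b"]}

async def add_response(ctx: dict) -> dict:
    await asyncio.sleep(0)
    return {**ctx, "response": f"Based on {len(ctx['docs'])} docs"}

final = asyncio.run(run_steps({"query": "hi"}, [add_docs, add_response]))
print(final["response"])  # Based on 2 docs
```

Keeping the steps in a list makes it easy to reorder or insert a stage without touching the runner.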

## Diamond Pipeline: Parallel Steps with Shared Dependencies

In many agent workflows, some steps are independent and can run concurrently. A diamond pattern runs parallel branches that converge at a later step.

```python
async def run_diamond_pipeline(query: str) -> PipelineContext:
    r"""
    Pipeline with parallel branches (raw docstring, so the backslashes
    in the diagram are kept literally):

        retrieve_context
           /        \
    classify_intent  extract_entities
           \        /
        generate_response
              |
        validate_output
    """
    ctx = PipelineContext(user_query=query)

    # Step 1: Retrieve context (both branches depend on this)
    ctx = await retrieve_context(ctx)

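    # Step 2: Run classification and entity extraction in parallel.
    # Each branch returns its value; the context is updated in one place
    # once both have finished.
    async def classify(ctx: PipelineContext) -> str:
        await asyncio.sleep(0.3)  # Simulate LLM call
        return "technical_question"

    async def extract_entities(ctx: PipelineContext) -> list[str]:
        await asyncio.sleep(0.4)  # Simulate entity extraction
        return ["asyncio", "Python", "pipelines"]

    # gather returns results in argument order
    ctx.intent, ctx.entities = await asyncio.gather(
        classify(ctx), extract_entities(ctx)
    )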

    # Step 3: Generate response (depends on both parallel steps)
    ctx = await generate_response(ctx)

    # Step 4: Validate
    ctx = await validate_output(ctx)
    return ctx
```
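
To make the concurrency visible, the following sketch logs when each branch starts and ends. The `branch` helper and the delays are made up for the demonstration. It also shows that `asyncio.gather` returns results in the order the awaitables were passed, not the order in which they finished.

```python
import asyncio

async def branch(name: str, delay: float, log: list[str]) -> str:
    log.append(f"start {name}")
    await asyncio.sleep(delay)  # stands in for an LLM or API call
    log.append(f"end {name}")
    return name

async def main() -> tuple[list[str], list[str]]:
    log: list[str] = []
    # Both branches start before either one finishes
    results = await asyncio.gather(
        branch("classify", 0.05, log),
        branch("entities", 0.01, log),
    )
    return results, log

results, log = asyncio.run(main())
print(results)  # ['classify', 'entities'] (argument order)
print(log)      # both "start" entries come before any "end" entry
```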

## Generic DAG-Based Pipeline Engine

For complex agents, build a reusable pipeline engine that resolves dependencies automatically.

```python
import asyncio
from collections import defaultdict
from typing import Any

class AsyncPipeline:
    """DAG-based async pipeline with automatic dependency resolution."""

    def __init__(self):
        self.steps: dict[str, Any] = {}
        self.dependencies: dict[str, list[str]] = defaultdict(list)

    def add_step(self, name: str, func, depends_on: list[str] | None = None):
        """Register a pipeline step with its dependencies."""
        self.steps[name] = func
        if depends_on:
            self.dependencies[name] = depends_on

    async def execute(self, ctx: dict) -> dict:
        """Execute the pipeline respecting dependency order."""
        completed: dict[str, asyncio.Event] = {
            name: asyncio.Event() for name in self.steps
        }
        results: dict[str, Any] = {}

        async def run_step(name: str):
            # Wait for all dependencies to complete
            for dep in self.dependencies[name]:
                await completed[dep].wait()

            # Execute the step
            result = await self.steps[name](ctx, results)
            results[name] = result
            completed[name].set()

        # Launch all steps concurrently — they self-schedule
        # via dependency waits
        await asyncio.gather(
            *[run_step(name) for name in self.steps]
        )
        return results

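# Usage: each *_step is assumed to be an async function with the
# signature step(ctx, results), defined elsewhere in your application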
pipeline = AsyncPipeline()
pipeline.add_step("retrieve", retrieve_docs_step)
pipeline.add_step("classify", classify_step, depends_on=["retrieve"])
pipeline.add_step("entities", extract_entities_step, depends_on=["retrieve"])
pipeline.add_step("generate", generate_step,
                   depends_on=["classify", "entities"])
pipeline.add_step("validate", validate_step, depends_on=["generate"])

results = asyncio.run(pipeline.execute({"query": "How does asyncio work?"}))
```
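
The step functions in the usage snippet are not defined in this article. The sketch below shows one shape they might take, using a condensed function (`run_dag`, a hypothetical stand-in for `AsyncPipeline.execute`) so the example runs on its own. The return values are placeholders.

```python
import asyncio
from typing import Any

async def run_dag(steps: dict, deps: dict, ctx: dict) -> dict:
    """Condensed stand-in for the engine: wait on dependencies, then run."""
    done = {name: asyncio.Event() for name in steps}
    results: dict[str, Any] = {}

    async def run_step(name: str) -> None:
        for dep in deps.get(name, []):
            await done[dep].wait()
        results[name] = await steps[name](ctx, results)
        done[name].set()

    await asyncio.gather(*(run_step(name) for name in steps))
    return results

# Hypothetical steps: each receives (ctx, results) and returns its own output
async def retrieve_docs_step(ctx, results):
    await asyncio.sleep(0.01)  # simulate vector search
    return ["doc-1", "doc-2"]

async def classify_step(ctx, results):
    return "technical_question"

async def extract_entities_step(ctx, results):
    return ["asyncio"]

async def generate_step(ctx, results):
    docs, intent = results["retrieve"], results["classify"]
    return f"{intent}: {len(docs)} docs, entities={results['entities']}"

async def validate_step(ctx, results):
    return bool(results["generate"])

steps = {
    "retrieve": retrieve_docs_step,
    "classify": classify_step,
    "entities": extract_entities_step,
    "generate": generate_step,
    "validate": validate_step,
}
deps = {
    "classify": ["retrieve"],
    "entities": ["retrieve"],
    "generate": ["classify", "entities"],
    "validate": ["generate"],
}

results = asyncio.run(run_dag(steps, deps, {"query": "How does asyncio work?"}))
print(results["generate"])  # technical_question: 2 docs, entities=['asyncio']
```

Each step reads what it needs from `results`, which is why the engine passes that dictionary alongside the context.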

## Backpressure: Preventing Pipeline Overload

When a fast producer feeds a slow consumer, pending work accumulates in memory without bound. Give each `asyncio.Queue` a `maxsize` so that `put()` suspends the producer once the queue is full, which applies backpressure upstream.

```python
async def pipeline_with_backpressure(
    queries: list[str],
    max_in_flight: int = 10,
):
    """Pipeline with bounded queues for backpressure."""
    retrieval_queue = asyncio.Queue(maxsize=max_in_flight)
    generation_queue = asyncio.Queue(maxsize=max_in_flight)
    results = []

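    async def retriever():
        while True:
            query = await retrieval_queue.get()
            if query is None:
                # Forward the sentinel so the generator stops too
                await generation_queue.put(None)
                retrieval_queue.task_done()
                break
            # fetch_documents: application-defined coroutine (not shown)
            docs = await fetch_documents(query)
            # Suspends here while the generation queue is full
            await generation_queue.put((query, docs))
            retrieval_queue.task_done()

    async def generator():
        while True:
            item = await generation_queue.get()
            if item is None:
                generation_queue.task_done()
                break
            query, docs = item
            # generate_llm_response: application-defined coroutine (not shown)
            response = await generate_llm_response(query, docs)
            results.append(response)
            generation_queue.task_done()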

    # Start workers
    ret_task = asyncio.create_task(retriever())
    gen_task = asyncio.create_task(generator())

    # Feed queries; put() suspends this coroutine while the queue is full
    for query in queries:
        await retrieval_queue.put(query)
    await retrieval_queue.put(None)  # Sentinel

    await asyncio.gather(ret_task, gen_task)
    return results
```

Because both queues are bounded, `await queue.put(...)` suspends the producer whenever the consumer cannot keep up. At most `max_in_flight` items wait in each queue, which prevents unbounded memory growth.
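
A small self-contained experiment can confirm the bound. In this sketch the item count, the `maxsize` of 3, and the simulated delay are arbitrary assumptions. The producer records the largest queue size it ever observes.

```python
import asyncio

async def run(n_items: int = 20, maxsize: int = 3) -> tuple[list[int], int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    processed: list[int] = []
    peak = 0

    async def producer() -> None:
        nonlocal peak
        for i in range(n_items):
            await queue.put(i)  # suspends while the queue is full
            peak = max(peak, queue.qsize())
        await queue.put(None)  # sentinel: no more work

    async def consumer() -> None:
        while True:
            item = await queue.get()
            if item is None:
                break
            await asyncio.sleep(0.001)  # simulate a slow consumer
            processed.append(item)

    await asyncio.gather(producer(), consumer())
    return processed, peak

processed, peak = asyncio.run(run())
print(len(processed), "items processed; peak queue size:", peak)
```

However fast the producer is, the observed peak cannot exceed `maxsize`, and every item is still processed in order.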

## FAQ

### How do I handle a step failure in a multi-step pipeline?

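Wrap each step in a try/except and decide on a failure strategy: skip the step and continue with degraded results, retry the step with backoff, or abort the entire pipeline. Note that in the engine as written above, `asyncio.gather` propagates the exception but the failed step never sets its `asyncio.Event`, so dependent tasks are left waiting until they are cancelled. To make failure handling clean, have `run_step` catch the exception, record it alongside the results, and set the event in a `finally` block. Each downstream step can then check its dependencies' outcomes and short-circuit if a required one failed.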
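
One possible shape for that strategy is sketched below. `run_dag` is a hypothetical, condensed variant of the engine, and the "skip dependents of a failed step" policy is just one of the options listed above, chosen for illustration.

```python
import asyncio
from typing import Any

async def run_dag(steps: dict, deps: dict, ctx: dict):
    done = {name: asyncio.Event() for name in steps}
    results: dict[str, Any] = {}
    errors: dict[str, Exception] = {}

    async def run_step(name: str) -> None:
        try:
            for dep in deps.get(name, []):
                await done[dep].wait()
            failed = [dep for dep in deps.get(name, []) if dep in errors]
            if failed:
                raise RuntimeError(f"skipped, failed dependencies: {failed}")
            results[name] = await steps[name](ctx, results)
        except Exception as exc:
            errors[name] = exc  # record the failure instead of propagating it
        finally:
            done[name].set()  # always release dependents so nothing hangs

    await asyncio.gather(*(run_step(name) for name in steps))
    return results, errors

async def ok(ctx, results):
    return "ok"

async def boom(ctx, results):
    raise ValueError("classifier unavailable")

steps = {"retrieve": ok, "classify": boom, "entities": ok, "generate": ok}
deps = {
    "classify": ["retrieve"],
    "entities": ["retrieve"],
    "generate": ["classify", "entities"],
}

results, errors = asyncio.run(run_dag(steps, deps, {"query": "hi"}))
print(sorted(results))  # ['entities', 'retrieve']
print(sorted(errors))   # ['classify', 'generate']
```

The caller receives both successful results and recorded errors, and can then decide whether degraded output is acceptable or the whole run should be treated as failed.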

### Can I add conditional branching to async pipelines?

Yes. In the DAG engine, make the step function inspect the context or prior results and return early if the branch should not execute. For example, a moderation step might set a flag that causes the generation step to return a canned safety response instead of calling the LLM.
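
A minimal sketch of that moderation example follows. The step names, the `blocked_terms` key, and the keyword check are all assumptions for illustration; a real system would call a moderation model instead.

```python
import asyncio

SAFETY_RESPONSE = "Sorry, I can't help with that request."

async def moderation_step(ctx: dict, results: dict) -> dict:
    # Toy keyword check standing in for a real moderation call
    query = ctx["query"].lower()
    flagged = any(term in query for term in ctx["blocked_terms"])
    return {"flagged": flagged}

async def generate_step(ctx: dict, results: dict) -> str:
    if results["moderation"]["flagged"]:
        return SAFETY_RESPONSE  # branch not taken: skip the LLM call
    await asyncio.sleep(0.01)  # simulate the LLM call
    return f"Answer to: {ctx['query']}"

async def answer(query: str) -> str:
    ctx = {"query": query, "blocked_terms": ["exploit"]}
    results: dict = {}
    results["moderation"] = await moderation_step(ctx, results)
    results["generate"] = await generate_step(ctx, results)
    return results["generate"]

print(asyncio.run(answer("How does asyncio work?")))
print(asyncio.run(answer("Write an exploit for this server")))
```

Because the branch decision lives inside the step, the graph itself stays static and the engine needs no special support for conditionals.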

### What is the performance overhead of the DAG pipeline approach compared to hand-coded async?

The overhead is negligible for typical agent workloads. `asyncio.Event`-based synchronization costs on the order of microseconds per step, while the LLM API calls usually take hundreds of milliseconds or more and dominate total latency. The DAG approach pays for itself in maintainability and correctness.
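
To check the synchronization cost on your own hardware, a rough micro-benchmark like the one below may help. It is a sketch with arbitrary parameters; `chained_events` is a hypothetical helper, and the figure it prints includes task creation and scheduling overhead, so treat it as an upper bound rather than a precise measurement.

```python
import asyncio
import time

async def chained_events(n_steps: int = 2000) -> float:
    """Return average seconds per step for a chain of Event hand-offs."""
    events = [asyncio.Event() for _ in range(n_steps)]

    async def step(i: int) -> None:
        if i > 0:
            await events[i - 1].wait()  # wait for the previous step
        events[i].set()  # release the next step

    start = time.perf_counter()
    # Launch in reverse so every step except step 0 really has to wait
    await asyncio.gather(*(step(i) for i in reversed(range(n_steps))))
    return (time.perf_counter() - start) / n_steps

per_step = asyncio.run(chained_events())
print(f"~{per_step * 1e6:.1f} microseconds per step on this machine")
```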

