Building Async Agent Pipelines: Chaining Asynchronous Steps with Dependencies
Design and implement async agent pipelines that chain dependent steps, resolve execution order automatically, and handle backpressure in production AI systems.
Why Pipelines Matter for AI Agents
Real-world AI agents rarely execute a single LLM call. They run multi-step workflows: retrieve context, classify intent, generate a response, validate the output, then format it. Some steps depend on others. Some can run in parallel. A well-designed async pipeline maximizes concurrency while respecting dependencies.
The key insight is modeling your agent workflow as a directed acyclic graph (DAG) of steps, where edges represent data dependencies. Steps without mutual dependencies execute concurrently. Steps that consume another step's output wait automatically.
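As a minimal sketch of that idea, the standard library's graphlib module can turn a dependency mapping into layers of steps that are safe to run concurrently. The workflow dictionary and the execution_layers helper below are illustrative assumptions, not part of any agent framework; the step names simply mirror the workflow described above.

```python
from graphlib import TopologicalSorter

# Hypothetical workflow: each key maps to the set of steps it depends on.
workflow = {
    "retrieve": set(),
    "classify": {"retrieve"},
    "entities": {"retrieve"},
    "generate": {"classify", "entities"},
    "validate": {"generate"},
}


def execution_layers(graph: dict[str, set[str]]) -> list[list[str]]:
    """Group steps into layers; steps within a layer do not depend on each other."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph contains a cycle
    layers = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only to make the output stable
        layers.append(ready)
        sorter.done(*ready)  # mark the whole layer finished to unlock the next one
    return layers


layers = execution_layers(workflow)
print(layers)
# [['retrieve'], ['classify', 'entities'], ['generate'], ['validate']]
```

Each inner list is a group that could be passed to asyncio.gather as a unit; the second layer is where the concurrency comes from.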
Simple Linear Pipeline
The simplest pipeline chains steps sequentially, each consuming the previous step's output.
flowchart LR
INPUT(["User intent"])
PARSE["Parse plus<br/>classify"]
PLAN["Plan and tool<br/>selection"]
AGENT["Agent loop<br/>LLM plus tools"]
GUARD{"Guardrails<br/>and policy"}
EXEC["Execute and<br/>verify result"]
OBS[("Trace and metrics")]
OUT(["Outcome plus<br/>next action"])
INPUT --> PARSE --> PLAN --> AGENT --> GUARD
GUARD -->|Pass| EXEC --> OUT
GUARD -->|Fail| AGENT
AGENT --> OBS
style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
style OBS fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
style OUT fill:#059669,stroke:#047857,color:#fff
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class PipelineContext:
    """Shared context flowing through the pipeline."""
    user_query: str
    retrieved_docs: list[str] | None = None
    intent: str | None = None
    entities: list[str] | None = None  # set by the entity-extraction branch of the diamond pipeline
    response: str | None = None
    validated: bool = False


async def retrieve_context(ctx: PipelineContext) -> PipelineContext:
    """Step 1: Retrieve relevant documents."""
    print("Retrieving context...")
    await asyncio.sleep(0.5)  # Simulate vector search
    ctx.retrieved_docs = [
        "Doc about async Python patterns",
        "Doc about agent architectures",
    ]
    return ctx


async def classify_intent(ctx: PipelineContext) -> PipelineContext:
    """Step 2: Classify user intent."""
    print("Classifying intent...")
    await asyncio.sleep(0.3)  # Simulate LLM call
    ctx.intent = "technical_question"
    return ctx


async def generate_response(ctx: PipelineContext) -> PipelineContext:
    """Step 3: Generate response using context and intent."""
    print(f"Generating response for intent: {ctx.intent}")
    await asyncio.sleep(1.0)  # Simulate LLM call
    # Guard against retrieved_docs still being None if retrieval was skipped
    ctx.response = f"Based on {len(ctx.retrieved_docs or [])} docs: ..."
    return ctx


async def validate_output(ctx: PipelineContext) -> PipelineContext:
    """Step 4: Validate the response."""
    print("Validating output...")
    await asyncio.sleep(0.2)  # Simulate validation
    ctx.validated = True
    return ctx


async def run_linear_pipeline(query: str) -> PipelineContext:
    # Each step awaits the previous one, so total latency is the sum of all steps
    ctx = PipelineContext(user_query=query)
    ctx = await retrieve_context(ctx)
    ctx = await classify_intent(ctx)
    ctx = await generate_response(ctx)
    ctx = await validate_output(ctx)
    return ctx
Diamond Pipeline: Parallel Steps with Shared Dependencies
In many agent workflows, some steps are independent and can run concurrently. A diamond pattern runs parallel branches that converge at a later step.
async def run_diamond_pipeline(query: str) -> PipelineContext:
    # Raw docstring: the backslashes in the diagram must not be read as escapes
    r"""
    Pipeline with parallel branches:

             retrieve_context
               /          \
    classify_intent    extract_entities
               \          /
            generate_response
                    |
             validate_output
    """
    ctx = PipelineContext(user_query=query)

    # Step 1: Retrieve context (both branches depend on this)
    ctx = await retrieve_context(ctx)

    # Step 2: Run classification and entity extraction in parallel.
    # Both branches mutate the same ctx, but each writes a different field.
    async def classify(ctx):
        await asyncio.sleep(0.3)
        ctx.intent = "technical_question"
        return ctx

    async def extract_entities(ctx):
        await asyncio.sleep(0.4)
        ctx.entities = ["asyncio", "Python", "pipelines"]
        return ctx

    await asyncio.gather(classify(ctx), extract_entities(ctx))

    # Step 3: Generate response (depends on both parallel steps)
    ctx = await generate_response(ctx)

    # Step 4: Validate
    ctx = await validate_output(ctx)
    return ctx
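A variation worth considering, sketched here under assumptions: instead of having both branches mutate a shared context, each branch can return its own value and the caller merges them after asyncio.gather, which returns results in argument order. The standalone classify and extract_entities functions and the run_branches helper are illustrative stand-ins with simulated delays, not real model calls.

```python
import asyncio
import time


async def classify(query: str) -> str:
    await asyncio.sleep(0.3)  # simulated LLM call
    return "technical_question"


async def extract_entities(query: str) -> list[str]:
    await asyncio.sleep(0.4)  # simulated entity extraction
    return ["asyncio", "Python", "pipelines"]


async def run_branches(query: str) -> dict:
    start = time.perf_counter()
    # gather returns results in argument order, regardless of completion order
    intent, entities = await asyncio.gather(
        classify(query),
        extract_entities(query),
    )
    elapsed = time.perf_counter() - start
    return {"intent": intent, "entities": entities, "elapsed": elapsed}


merged = asyncio.run(run_branches("How does asyncio work?"))
print(merged["intent"], merged["entities"], f"{merged['elapsed']:.2f}s")
```

The elapsed time should land near the slower branch (about 0.4 s) rather than the sum of both (0.7 s), which is the whole point of the diamond shape.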
Generic DAG-Based Pipeline Engine
For complex agents, build a reusable pipeline engine that resolves dependencies automatically.
import asyncio
from collections import defaultdict
from typing import Any


class AsyncPipeline:
    """DAG-based async pipeline with automatic dependency resolution."""

    def __init__(self):
        self.steps: dict[str, Any] = {}
        self.dependencies: dict[str, list[str]] = defaultdict(list)

    def add_step(self, name: str, func, depends_on: list[str] | None = None):
        """Register a pipeline step with its dependencies."""
        self.steps[name] = func
        if depends_on:
            self.dependencies[name] = list(depends_on)

    async def execute(self, ctx: dict) -> dict:
        """Execute the pipeline respecting dependency order."""
        completed: dict[str, asyncio.Event] = {
            name: asyncio.Event() for name in self.steps
        }
        results: dict[str, Any] = {}

        async def run_step(name: str):
            # Wait for all dependencies to complete
            for dep in self.dependencies.get(name, []):
                await completed[dep].wait()
            # Execute the step; it receives the shared context and the
            # results of every step that has finished so far
            result = await self.steps[name](ctx, results)
            results[name] = result
            completed[name].set()

        # Launch all steps concurrently; they self-schedule
        # via dependency waits
        await asyncio.gather(
            *[run_step(name) for name in self.steps]
        )
        return results
# Usage (retrieve_docs_step and the other step functions are assumed to be
# defined elsewhere as coroutines taking (ctx, results))
pipeline = AsyncPipeline()
pipeline.add_step("retrieve", retrieve_docs_step)
pipeline.add_step("classify", classify_step, depends_on=["retrieve"])
pipeline.add_step("entities", extract_entities_step, depends_on=["retrieve"])
pipeline.add_step("generate", generate_step,
                  depends_on=["classify", "entities"])
pipeline.add_step("validate", validate_step, depends_on=["generate"])
results = asyncio.run(pipeline.execute({"query": "How does asyncio work?"}))
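One caveat of an Event-based engine like this: if two steps depend on each other, each waits on the other's event forever and the pipeline never finishes. A pre-flight check can catch that before anything runs. The validate_dag helper below is a hypothetical addition, sketched with the standard library's graphlib; it is not part of the engine above.

```python
from graphlib import CycleError, TopologicalSorter


def validate_dag(steps: set[str], dependencies: dict[str, list[str]]) -> None:
    """Raise ValueError for unregistered step names or a dependency cycle."""
    for name, deps in dependencies.items():
        missing = [s for s in [name, *deps] if s not in steps]
        if missing:
            raise ValueError(f"unregistered steps referenced: {missing}")
    try:
        # prepare() walks the graph and raises CycleError if it finds a cycle
        TopologicalSorter(dependencies).prepare()
    except CycleError as exc:
        # The detected cycle is the second element of the exception args
        raise ValueError(f"dependency cycle: {exc.args[1]}") from exc


steps = {"retrieve", "classify", "entities", "generate", "validate"}
deps = {
    "classify": ["retrieve"],
    "entities": ["retrieve"],
    "generate": ["classify", "entities"],
    "validate": ["generate"],
}
validate_dag(steps, deps)  # a valid graph passes silently

try:
    validate_dag({"a", "b"}, {"a": ["b"], "b": ["a"]})
except ValueError as err:
    print(err)
```

With the engine above, a natural place to call it would be the top of execute, passing set(self.steps) and self.dependencies.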
Backpressure: Preventing Pipeline Overload
When a fast producer feeds a slow consumer, work accumulates unboundedly. Use asyncio.Queue with a max size to apply backpressure.
async def pipeline_with_backpressure(
    queries: list[str],
    max_in_flight: int = 10,
):
    """Pipeline with bounded queues for backpressure.

    fetch_documents and generate_llm_response are assumed to be
    coroutines defined elsewhere.
    """
    retrieval_queue = asyncio.Queue(maxsize=max_in_flight)
    generation_queue = asyncio.Queue(maxsize=max_in_flight)
    results = []

    async def retriever():
        while True:
            query = await retrieval_queue.get()
            if query is None:
                # Forward the sentinel so the generator also shuts down
                await generation_queue.put(None)
                break
            docs = await fetch_documents(query)
            # Suspends here if the generator has fallen behind
            await generation_queue.put((query, docs))
            retrieval_queue.task_done()

    async def generator():
        while True:
            item = await generation_queue.get()
            if item is None:
                break
            query, docs = item
            response = await generate_llm_response(query, docs)
            results.append(response)
            generation_queue.task_done()

    # Start workers
    ret_task = asyncio.create_task(retriever())
    gen_task = asyncio.create_task(generator())

    # Feed queries; put() suspends while the queue is full
    for query in queries:
        await retrieval_queue.put(query)
    await retrieval_queue.put(None)  # Sentinel

    await asyncio.gather(ret_task, gen_task)
    return results
The maxsize parameter on the queue ensures the producer blocks when the consumer cannot keep up, preventing unbounded memory growth.
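To see the effect in isolation, here is a small sketch that pairs a fast producer with a deliberately slow consumer and records the deepest the queue ever gets. The demo function, its parameters, and the 10 ms delay are illustrative assumptions.

```python
import asyncio


async def demo(maxsize: int = 3, items: int = 20) -> int:
    queue = asyncio.Queue(maxsize=maxsize)
    peak = 0

    async def producer() -> None:
        nonlocal peak
        for i in range(items):
            await queue.put(i)  # suspends while the queue is full
            peak = max(peak, queue.qsize())
        await queue.put(None)  # sentinel: no more work

    async def consumer() -> None:
        while (await queue.get()) is not None:
            await asyncio.sleep(0.01)  # simulated slow downstream step

    await asyncio.gather(producer(), consumer())
    return peak


peak = asyncio.run(demo())
print(f"peak queue depth: {peak}")  # never exceeds maxsize
```

Without maxsize, the same producer would enqueue all twenty items immediately and the peak depth would grow with the input size.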
FAQ
How do I handle a step failure in a multi-step pipeline?
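Wrap each step in a try/except and decide on a failure strategy: skip the step and continue with degraded results, retry the step with backoff, or abort the entire pipeline. The DAG engine approach keeps this clean: a step that catches its own error can store a failure marker as its result, and each downstream step can check its dependencies' results and short-circuit if a required dependency failed. An exception that a step does not catch propagates out of execute() to the caller.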
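The retry option can be sketched as a small wrapper with exponential backoff and jitter. The with_retry helper and the flaky_step stand-in are hypothetical names introduced for illustration; the delays are arbitrary.

```python
import asyncio
import random


async def with_retry(step, *args, attempts: int = 3, base_delay: float = 0.1):
    """Run an async step, retrying on failure with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return await step(*args)
        except Exception:
            if attempt == attempts:
                raise  # out of retries: the caller decides to skip or abort
            delay = base_delay * 2 ** (attempt - 1)
            # Jitter spreads out retries from concurrent callers
            await asyncio.sleep(delay + random.uniform(0, delay / 2))


calls = {"count": 0}


async def flaky_step(query: str) -> str:
    """Stand-in step that fails twice before succeeding."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return f"docs for {query}"


result = asyncio.run(with_retry(flaky_step, "asyncio", base_delay=0.01))
print(result, calls["count"])
# docs for asyncio 3
```

Catching Exception rather than BaseException leaves task cancellation alone, so a cancelled pipeline still shuts down promptly.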
Can I add conditional branching to async pipelines?
Yes. In the DAG engine, make the step function inspect the context or prior results and return early if the branch should not execute. For example, a moderation step might set a flag that causes the generation step to return a canned safety response instead of calling the LLM.
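A minimal sketch of that moderation example, assuming steps with the (ctx, results) signature used by the engine. The moderate and generate functions, the keyword check, and the canned message are all hypothetical placeholders; a real system would call a moderation model.

```python
import asyncio

CANNED_SAFETY_RESPONSE = "Sorry, I can't help with that request."  # placeholder text


async def moderate(ctx: dict, results: dict) -> dict:
    # Placeholder check standing in for a real moderation call
    return {"flagged": "forbidden" in ctx["query"].lower()}


async def generate(ctx: dict, results: dict) -> str:
    if results["moderate"]["flagged"]:
        return CANNED_SAFETY_RESPONSE  # branch not taken: skip the LLM call
    await asyncio.sleep(0.1)  # simulated LLM call
    return f"Answer to: {ctx['query']}"


async def run(query: str) -> str:
    ctx, results = {"query": query}, {}
    results["moderate"] = await moderate(ctx, results)
    results["generate"] = await generate(ctx, results)
    return results["generate"]


safe = asyncio.run(run("How does asyncio work?"))
blocked = asyncio.run(run("Tell me something forbidden"))
print(safe)
print(blocked)
```

The graph itself stays static; the branching lives inside the step, which keeps dependency resolution simple.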
What is the performance overhead of the DAG pipeline approach compared to hand-coded async?
The overhead is negligible for typical agent workloads. The asyncio.Event-based synchronization adds on the order of microseconds of latency per step, while the dominant cost is usually the LLM API calls, which take hundreds of milliseconds or more. The DAG approach pays for itself in maintainability and correctness.