Learn Agentic AI

Streaming with Async Generators: Building Real-Time AI Response Pipelines

Build real-time streaming AI pipelines using Python async generators. Learn yield patterns, consumer chains, and backpressure for delivering LLM tokens to users instantly.

Why Streaming Matters for AI Agents

Users perceive LLM responses as faster when tokens arrive incrementally. A response that takes 3 seconds to complete but begins streaming tokens after 200ms feels instant; the same response delivered as a single block after 3 seconds feels slow. Streaming also enables real-time processing pipelines where downstream stages begin working before the LLM finishes generating.

Python async generators provide a natural abstraction for streaming data through processing stages. Each stage yields results as they become available, creating a pipeline where data flows continuously rather than arriving in a single batch.

Async Generator Fundamentals

An async generator is a function that uses both async def and yield. It produces values lazily and asynchronously.

import asyncio

async def stream_llm_tokens(prompt: str):
    """Simulate streaming LLM response token by token."""
    tokens = [
        "Async", " generators", " enable", " real-time",
        " streaming", " of", " LLM", " responses",
        " to", " users.", ""
    ]
    for token in tokens:
        await asyncio.sleep(0.1)  # Simulate token generation delay
        yield token

async def main():
    async for token in stream_llm_tokens("Explain streaming"):
        print(token, end="", flush=True)
    print()  # Newline at the end

asyncio.run(main())

Each yield pauses the generator and delivers a value to the consumer. The generator resumes when the consumer requests the next value via async for.

Streaming from Real LLM APIs

Most LLM APIs support server-sent events (SSE) for streaming. Here is how to consume them with httpx.

import httpx
import json

async def stream_openai_response(
    client: httpx.AsyncClient,
    messages: list[dict],
    model: str = "gpt-4o",
):
    """Stream tokens from OpenAI's API."""
    async with client.stream(
        "POST",
        "https://api.openai.com/v1/chat/completions",
        json={
            "model": model,
            "messages": messages,
            "stream": True,
        },
    ) as response:
        response.raise_for_status()
        async for line in response.aiter_lines():
            if not line.startswith("data: "):
                continue
            data = line[6:]  # Strip "data: " prefix
            if data == "[DONE]":
                break
            chunk = json.loads(data)
            delta = chunk["choices"][0].get("delta", {})
            content = delta.get("content", "")
            if content:
                yield content

Building Processing Chains

The real power of async generators emerges when you chain them. Each stage transforms the stream and yields results to the next stage.


import asyncio
import os
import time

import httpx

# Assumes stream_openai_response from the previous section;
# reading OPENAI_API_KEY from the environment is one common way to supply credentials.
API_KEY = os.environ["OPENAI_API_KEY"]

async def stream_tokens(prompt: str):
    """Stage 1: Generate raw tokens from LLM."""
    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
    ) as client:
        async for token in stream_openai_response(
            client,
            [{"role": "user", "content": prompt}],
        ):
            yield token

async def buffer_sentences(token_stream):
    """Stage 2: Buffer tokens into complete sentences."""
    buffer = ""
    async for token in token_stream:
        buffer += token
        # Yield complete sentences (naive split on ". "; decimals and abbreviations are not handled)
        while ". " in buffer or buffer.endswith("."):
            if ". " in buffer:
                sentence, buffer = buffer.split(". ", 1)
                yield sentence.strip() + "."
            elif buffer.endswith("."):
                yield buffer.strip()
                buffer = ""
                break
    if buffer.strip():
        yield buffer.strip()

async def add_metadata(sentence_stream):
    """Stage 3: Enrich sentences with metadata."""
    index = 0
    async for sentence in sentence_stream:
        yield {
            "index": index,
            "text": sentence,
            "word_count": len(sentence.split()),
            "timestamp": time.monotonic(),
        }
        index += 1

async def main():
    # Chain the pipeline stages
    tokens = stream_tokens("Explain three benefits of async Python")
    sentences = buffer_sentences(tokens)
    enriched = add_metadata(sentences)

    async for item in enriched:
        print(f"[{item['index']}] ({item['word_count']} words) "
              f"{item['text']}")

asyncio.run(main())
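
The sentence-buffering stage can be exercised without an API key by feeding it a hand-written token stream. A standalone sketch (buffer_sentences is inlined so the snippet runs on its own):

```python
import asyncio

async def fake_tokens():
    """Stand-in for the LLM stage: yields pre-split tokens."""
    for t in ["Streaming", " is", " fast. ", "Pipelines", " compose."]:
        yield t

async def buffer_sentences(token_stream):
    """Buffer tokens into complete sentences (same logic as above)."""
    buffer = ""
    async for token in token_stream:
        buffer += token
        while ". " in buffer or buffer.endswith("."):
            if ". " in buffer:
                sentence, buffer = buffer.split(". ", 1)
                yield sentence.strip() + "."
            elif buffer.endswith("."):
                yield buffer.strip()
                buffer = ""
                break
    if buffer.strip():
        yield buffer.strip()

async def main():
    async for sentence in buffer_sentences(fake_tokens()):
        print(sentence)

asyncio.run(main())
# Streaming is fast.
# Pipelines compose.
```

Driving a single stage with a fake upstream like this is also a convenient unit-testing pattern for pipeline code.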

FastAPI Streaming Response

Integrate async generators directly with FastAPI's StreamingResponse for real-time delivery to clients.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def generate_sse_stream(prompt: str):
    """Generate Server-Sent Events from LLM stream."""
    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
    ) as client:
        async for token in stream_openai_response(
            client,
            [{"role": "user", "content": prompt}],
        ):
            yield f"data: {json.dumps({'token': token})}\n\n"
    yield "data: [DONE]\n\n"

@app.post("/api/chat/stream")
async def chat_stream(prompt: str):
    return StreamingResponse(
        generate_sse_stream(prompt),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )

Backpressure in Streaming Pipelines

When a consumer is slower than a producer, tokens accumulate in memory. Use an asyncio.Queue to bound the buffer.

async def bounded_stream(
    source,
    max_buffer: int = 50,
):
    """Apply backpressure to a stream with a bounded buffer."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_buffer)

    async def producer():
        async for item in source:
            await queue.put(item)  # Blocks when the queue is full
        await queue.put(None)  # Sentinel marks end of stream

    producer_task = asyncio.create_task(producer())
    try:
        while True:
            item = await queue.get()
            if item is None:
                break
            yield item
        await producer_task  # Surface any exception raised by the producer
    finally:
        producer_task.cancel()  # Stop the producer if the consumer exits early
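
A fast in-memory producer makes the behavior easy to verify. The sketch below inlines a compact version of bounded_stream so it runs standalone; the bounded queue means the producer can never run more than max_buffer items ahead of the consumer, yet every item still arrives in order:

```python
import asyncio

async def bounded_stream(source, max_buffer: int = 50):
    """Compact version of the helper above, inlined for a standalone demo."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_buffer)

    async def producer():
        async for item in source:
            await queue.put(item)  # Blocks when the queue is full
        await queue.put(None)  # Sentinel

    task = asyncio.create_task(producer())
    while True:
        item = await queue.get()
        if item is None:
            break
        yield item
    await task

async def numbers(n: int):
    for i in range(n):
        yield i

async def main():
    received = [x async for x in bounded_stream(numbers(100), max_buffer=5)]
    print(received == list(range(100)))  # True: complete and in order

asyncio.run(main())
```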

Multiplexing Multiple Streams

An agent might need to stream from multiple sources and merge the results.

async def merge_streams(*streams):
    """Merge multiple async streams into one, yielding as available."""
    queue: asyncio.Queue = asyncio.Queue()
    active = len(streams)

    async def feed(stream, source_id):
        nonlocal active
        async for item in stream:
            await queue.put((source_id, item))
        active -= 1
        if active == 0:
            await queue.put(None)  # Last feeder signals completion

    tasks = [
        asyncio.create_task(feed(stream, i))
        for i, stream in enumerate(streams)
    ]

    try:
        while True:
            item = await queue.get()
            if item is None:
                break
            yield item
        for task in tasks:
            await task  # Surface any exception raised by a feeder
    finally:
        for task in tasks:
            task.cancel()  # Stop feeders if the consumer exits early
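
A standalone demo merging two simulated sources with different cadences (a compact version of merge_streams is inlined so the snippet runs on its own). Items are yielded in arrival order, tagged with their source index, and per-source ordering is preserved:

```python
import asyncio

async def merge_streams(*streams):
    """Compact version of the helper above, inlined for a standalone demo."""
    queue: asyncio.Queue = asyncio.Queue()
    active = len(streams)

    async def feed(stream, source_id):
        nonlocal active
        async for item in stream:
            await queue.put((source_id, item))
        active -= 1
        if active == 0:
            await queue.put(None)  # Last feeder signals completion

    tasks = [asyncio.create_task(feed(s, i)) for i, s in enumerate(streams)]
    while True:
        item = await queue.get()
        if item is None:
            break
        yield item
    for task in tasks:
        await task

async def ticker(name: str, delay: float, count: int):
    """Simulated source that yields labelled items at a fixed cadence."""
    for i in range(count):
        await asyncio.sleep(delay)
        yield f"{name}-{i}"

async def main():
    async for source_id, item in merge_streams(
        ticker("fast", 0.01, 3),
        ticker("slow", 0.05, 2),
    ):
        print(source_id, item)

asyncio.run(main())
```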

FAQ

How do I handle errors in the middle of a streaming response?

Once you have started sending a streaming HTTP response, you cannot change the status code. The standard pattern is to send an error event in the SSE stream: yield f"data: {json.dumps({'error': str(e)})}\n\n". The client-side JavaScript should watch for error events and handle them appropriately. For critical errors, close the stream after sending the error event.
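
That pattern can be packaged as a reusable wrapper generator. A sketch (sse_with_errors and the event shapes are illustrative, not a standard API):

```python
import asyncio
import json

async def sse_with_errors(token_stream):
    """Wrap a token stream, reporting mid-stream failures as SSE error events."""
    try:
        async for token in token_stream:
            yield f"data: {json.dumps({'token': token})}\n\n"
    except Exception as exc:
        # The 200 status is already on the wire; report the failure in-band
        yield f"data: {json.dumps({'error': str(exc)})}\n\n"
    yield "data: [DONE]\n\n"

async def flaky_tokens():
    """Simulated upstream that fails mid-stream."""
    yield "Hello"
    raise RuntimeError("upstream LLM timeout")

async def main():
    async for event in sse_with_errors(flaky_tokens()):
        print(repr(event))

asyncio.run(main())
```

The wrapper slots directly into the StreamingResponse example above in place of generate_sse_stream's manual event formatting.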

What is the memory overhead of async generators compared to collecting all results into a list?

Async generators use constant memory regardless of the total data volume — they hold only the current yielded value. A list holds every item simultaneously. For streaming 10,000 LLM tokens, a generator uses memory for one token at a time while a list stores all 10,000. This makes generators essential for long-running or high-volume streams.

Can I replay or tee an async generator to send data to multiple consumers?

Async generators are single-use — once consumed, the data is gone. To send the same stream to multiple consumers, use an asyncio.Queue per consumer and a producer task that reads from the generator once and puts each item into all queues. Libraries like aiostream provide a stream.tee() utility for this pattern.
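
A hand-rolled version of that per-consumer-queue pattern might look like this (fan_out is an illustrative name, not the aiostream API; the queues here are unbounded, so a slow consumer buffers the whole stream):

```python
import asyncio

async def fan_out(source, n: int):
    """Read `source` once, feeding n independent consumer streams."""
    queues = [asyncio.Queue() for _ in range(n)]

    async def pump():
        async for item in source:
            for q in queues:
                await q.put(item)  # Every consumer gets every item
        for q in queues:
            await q.put(None)  # Sentinel for each consumer

    pump_task = asyncio.create_task(pump())

    async def reader(q: asyncio.Queue):
        while True:
            item = await q.get()
            if item is None:
                break
            yield item
        await pump_task  # Surface any exception from the single read pass

    return [reader(q) for q in queues]

async def source():
    for i in range(3):
        yield i

async def main():
    a, b = await fan_out(source(), 2)
    print([x async for x in a])  # [0, 1, 2]
    print([y async for y in b])  # [0, 1, 2]

asyncio.run(main())
```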


#Python #Streaming #AsyncGenerators #RealTime #AIAgents #AgenticAI #LearnAI #AIEngineering

Written by

CallSphere Team
