Streaming with Async Generators: Building Real-Time AI Response Pipelines

Build real-time streaming AI pipelines using Python async generators. Learn yield patterns, consumer chains, and backpressure for delivering LLM tokens to users instantly.

Why Streaming Matters for AI Agents

Users perceive LLM responses as faster when tokens arrive incrementally. A response that takes 3 seconds to complete but starts streaming tokens after 200 ms feels nearly instant. The same response delivered as a single block after 3 seconds feels slow. Streaming also enables real-time processing pipelines in which downstream steps begin working before the LLM finishes generating.

Python async generators provide a natural abstraction for streaming data through processing stages. Each stage yields results as they become available, creating a pipeline where data flows continuously rather than in batch.

Async Generator Fundamentals

An async generator is a function that uses both async def and yield. It produces values lazily and asynchronously.

sequenceDiagram
    autonumber
    participant Client
    participant Edge as Edge Worker
    participant LLM as LLM Provider
    participant DB as Logs and Trace
    Client->>Edge: POST /chat (stream=true)
    Edge->>LLM: messages.create(stream=true)
    loop Each token
        LLM-->>Edge: SSE chunk delta
        Edge-->>Client: SSE chunk delta
        Edge->>DB: append token to span
    end
    LLM-->>Edge: stop_reason=end_turn
    Edge-->>Client: event: done
    Edge->>DB: finalize trace

import asyncio

async def stream_llm_tokens(prompt: str):
    """Simulate streaming LLM response token by token."""
    tokens = [
        "Async", " generators", " enable", " real-time",
        " streaming", " of", " LLM", " responses",
        " to", " users.", ""
    ]
    for token in tokens:
        await asyncio.sleep(0.1)  # Simulate token generation delay
        yield token

async def main():
    async for token in stream_llm_tokens("Explain streaming"):
        print(token, end="", flush=True)
    print()  # Newline at the end

asyncio.run(main())

Each yield pauses the generator and delivers a value to the consumer. The generator resumes when the consumer requests the next value via async for.
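
To make the pause-and-resume behavior concrete, the sketch below drives an async generator by hand with `__anext__()` instead of `async for`. The names `countdown`, `events`, and `demo` are illustrative only and are not part of the article's pipeline.

```python
import asyncio

events: list = []  # Records the order in which things happen

async def countdown(n: int):
    """Yield n, n-1, ..., 1, logging when the body actually runs."""
    events.append("generator started")
    while n > 0:
        await asyncio.sleep(0)  # Stand-in for real asynchronous work
        yield n
        events.append(f"resumed after {n}")
        n -= 1

async def demo() -> list:
    gen = countdown(2)  # Creating the generator runs none of its body
    events.append("generator created")
    first = await gen.__anext__()  # Runs the body up to the first yield
    events.append(f"got {first}")
    second = await gen.__anext__()  # Resumes right after the first yield
    events.append(f"got {second}")
    await gen.aclose()  # Close explicitly when abandoning a generator early
    return [first, second]

if __name__ == "__main__":
    print(asyncio.run(demo()))
    print(events)
```

The recorded order shows that "generator created" is logged before "generator started": no code in the generator body runs until the consumer asks for the first value.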

Streaming from Real LLM APIs

Most LLM APIs support server-sent events (SSE) for streaming. Here is how to consume them with httpx.

import httpx
import json

async def stream_openai_response(
    client: httpx.AsyncClient,
    messages: list[dict],
    model: str = "gpt-4o",
):
    """Stream tokens from OpenAI's API."""
    async with client.stream(
        "POST",
        "https://api.openai.com/v1/chat/completions",
        json={
            "model": model,
            "messages": messages,
            "stream": True,
        },
    ) as response:
        response.raise_for_status()
        async for line in response.aiter_lines():
            if not line.startswith("data: "):
                continue
            data = line[6:]  # Strip "data: " prefix
            if data == "[DONE]":
                break
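            chunk = json.loads(data)
            choices = chunk.get("choices") or []
            if not choices:
                continue  # Some chunks (e.g. usage-only) carry no choices
            delta = choices[0].get("delta") or {}
            content = delta.get("content")
            if content:
                yield content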
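
The line-parsing logic can be pulled out into a pure function so it can be tested without a network call. This is a minimal sketch that assumes the OpenAI-style chunk shape used above; `parse_sse_line` is a hypothetical helper name, not part of httpx or any SDK.

```python
import json
from typing import Optional, Tuple

def parse_sse_line(line: str) -> Tuple[Optional[str], bool]:
    """Return (content, done) for one line of an OpenAI-style SSE stream."""
    if not line.startswith("data:"):
        return None, False  # Comments, blank keep-alives, other SSE fields
    data = line[len("data:"):].strip()
    if data == "[DONE]":
        return None, True  # End-of-stream marker
    chunk = json.loads(data)
    choices = chunk.get("choices") or []
    if not choices:
        return None, False  # Chunk without choices: nothing to emit
    delta = choices[0].get("delta") or {}
    return delta.get("content") or None, False

if __name__ == "__main__":
    print(parse_sse_line('data: {"choices": [{"delta": {"content": "Hi"}}]}'))
    print(parse_sse_line("data: [DONE]"))
```

Inside the streaming loop, the generator would yield whenever the first element is not None and stop when the second element is True.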

Building Processing Chains

The real power of async generators emerges when you chain them. Each stage transforms the stream and yields results to the next stage.

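import os
import time

# Assumption: the API key comes from the OPENAI_API_KEY environment variable
API_KEY = os.environ["OPENAI_API_KEY"]

async def stream_tokens(prompt: str):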
    """Stage 1: Generate raw tokens from LLM."""
    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
    ) as client:
        async for token in stream_openai_response(
            client,
            [{"role": "user", "content": prompt}],
        ):
            yield token

async def buffer_sentences(token_stream):
    """Stage 2: Buffer tokens into complete sentences."""
    buffer = ""
    async for token in token_stream:
        buffer += token
        # Yield complete sentences
        while ". " in buffer or buffer.endswith("."):
            if ". " in buffer:
                sentence, buffer = buffer.split(". ", 1)
                yield sentence.strip() + "."
            elif buffer.endswith("."):
                yield buffer.strip()
                buffer = ""
                break
    if buffer.strip():
        yield buffer.strip()

async def add_metadata(sentence_stream):
    """Stage 3: Enrich sentences with metadata."""
    index = 0
    async for sentence in sentence_stream:
        yield {
            "index": index,
            "text": sentence,
            "word_count": len(sentence.split()),
            "timestamp": time.monotonic(),
        }
        index += 1

async def main():
    # Chain the pipeline stages
    tokens = stream_tokens("Explain three benefits of async Python")
    sentences = buffer_sentences(tokens)
    enriched = add_metadata(sentences)

    async for item in enriched:
        print(f"[{item['index']}] ({item['word_count']} words) "
              f"{item['text']}")

asyncio.run(main())
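
Because every stage takes a stream and returns a stream, the chaining step itself can be generalized. The following sketch shows one way to do that with a small `pipeline` helper and runs it offline against a fake token source. All names here (`pipeline`, `fake_tokens`, `uppercase`, `numbered`, `collect`) are hypothetical and chosen for illustration.

```python
import asyncio
from typing import AsyncIterator, Callable

Stage = Callable[[AsyncIterator], AsyncIterator]

def pipeline(source: AsyncIterator, *stages: Stage) -> AsyncIterator:
    """Feed source through each stage in order and return the final stream."""
    stream = source
    for stage in stages:
        stream = stage(stream)
    return stream

async def fake_tokens(text: str):
    """Stand-in for an LLM: yields whitespace-delimited words."""
    for word in text.split():
        await asyncio.sleep(0)  # Give other tasks a chance to run
        yield word

async def uppercase(stream):
    """Example stage: transform each item."""
    async for item in stream:
        yield item.upper()

async def numbered(stream):
    """Example stage: attach a running index to each item."""
    index = 0
    async for item in stream:
        yield (index, item)
        index += 1

async def collect(stream) -> list:
    """Drain a stream into a list (useful in tests, defeats streaming in production)."""
    return [item async for item in stream]

if __name__ == "__main__":
    stream = pipeline(fake_tokens("tokens flow through"), uppercase, numbered)
    print(asyncio.run(collect(stream)))
```

Swapping `fake_tokens` for a real LLM stream leaves the downstream stages unchanged, which is the main benefit of keeping each stage a plain stream-in, stream-out function.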

FastAPI Streaming Response

Integrate async generators directly with FastAPI's StreamingResponse for real-time delivery to clients.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def generate_sse_stream(prompt: str):
    """Generate Server-Sent Events from LLM stream."""
    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
    ) as client:
        async for token in stream_openai_response(
            client,
            [{"role": "user", "content": prompt}],
        ):
            yield f"data: {json.dumps({'token': token})}\n\n"
    yield "data: [DONE]\n\n"

@app.post("/api/chat/stream")
async def chat_stream(prompt: str):
    # FastAPI reads a bare `prompt: str` from the query string:
    # POST /api/chat/stream?prompt=...
    return StreamingResponse(
        generate_sse_stream(prompt),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )
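
The frames above are built by hand with f-strings. That is safe here because `json.dumps` escapes newlines, but in the SSE wire format a payload containing a raw newline needs each line sent as its own `data:` field. A small formatter keeps that detail in one place. This is a sketch; `format_sse` is a hypothetical helper, not a FastAPI or Starlette API.

```python
import json
from typing import Any, Optional

def format_sse(data: Any, event: Optional[str] = None) -> str:
    """Serialize one Server-Sent Events frame."""
    # Strings are sent as-is; everything else is JSON-encoded
    payload = data if isinstance(data, str) else json.dumps(data)
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # Each line of the payload gets its own "data:" field
    for part in payload.split("\n"):
        lines.append(f"data: {part}")
    # A blank line terminates the frame
    return "\n".join(lines) + "\n\n"

if __name__ == "__main__":
    print(repr(format_sse({"token": "Hi"})))
    print(repr(format_sse("line one\nline two", event="note")))
```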

Backpressure in Streaming Pipelines

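A plain chain of async generators is pull-based: a stage runs only when its consumer asks for the next item, so nothing piles up. Buffering becomes a concern once the producer is decoupled into its own task so that it can run ahead of a slower consumer. In that case, tokens accumulate in memory unless the buffer is bounded. An asyncio.Queue with a maxsize does this: put() suspends the producer while the queue is full.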

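_DONE = object()  # Unique sentinel, so None remains a valid stream item

async def bounded_stream(
    source,
    max_buffer: int = 50,
):
    """Apply backpressure to a stream with a bounded buffer."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_buffer)
    error = None

    async def producer():
        nonlocal error
        try:
            async for item in source:
                await queue.put(item)  # Suspends while the queue is full
        except Exception as exc:
            error = exc  # Hand the failure to the consumer instead of hanging it
        await queue.put(_DONE)

    producer_task = asyncio.create_task(producer())
    try:
        while True:
            item = await queue.get()
            if item is _DONE:
                break
            yield item
        if error is not None:
            raise error
    finally:
        # Stop the producer if the consumer exits early; no-op once it has finished
        producer_task.cancel()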
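
To see the bound in action, the sketch below measures how far a fast producer gets ahead of a deliberately slow consumer. It uses a compact bounded buffer in the spirit of the function above so that it runs on its own; `counting_source`, `bounded`, and `measure` are hypothetical names.

```python
import asyncio

async def counting_source(n: int, stats: dict):
    """Fast producer that records how many items it has emitted so far."""
    for i in range(n):
        stats["produced"] = i + 1
        yield i

async def bounded(source, max_buffer: int):
    """Compact bounded buffer: the producer task suspends when the queue is full."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_buffer)
    done = object()

    async def producer():
        async for item in source:
            await queue.put(item)  # Suspends while the queue is full
        await queue.put(done)

    task = asyncio.create_task(producer())
    try:
        while (item := await queue.get()) is not done:
            yield item
    finally:
        task.cancel()  # No-op if the producer already finished

async def measure(n: int, max_buffer: int) -> int:
    """Return the largest lead the producer ever had over the consumer."""
    stats = {"produced": 0}
    consumed = 0
    max_lead = 0
    async for _ in bounded(counting_source(n, stats), max_buffer):
        consumed += 1
        await asyncio.sleep(0.001)  # Simulate a slow consumer
        max_lead = max(max_lead, stats["produced"] - consumed)
    return max_lead

if __name__ == "__main__":
    print(asyncio.run(measure(50, 5)))
```

The lead should never exceed `max_buffer + 1`: at most `max_buffer` items wait in the queue, plus one item the producer is holding while suspended in `put()`.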

Multiplexing Multiple Streams

An agent might need to stream from multiple sources and merge the results.

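async def merge_streams(*streams):
    """Merge multiple async streams into one, yielding (source_id, item) as available."""
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # Unique sentinel, so None remains a valid stream item
    active = len(streams)

    async def feed(stream, source_id):
        nonlocal active
        try:
            async for item in stream:
                await queue.put((source_id, item))
        finally:
            # Runs even if this stream raises, so the consumer never hangs
            active -= 1
            if active == 0:
                queue.put_nowait(done)

    tasks = [
        asyncio.create_task(feed(stream, i))
        for i, stream in enumerate(streams)
    ]
    if not tasks:
        return  # Nothing to merge

    try:
        while True:
            item = await queue.get()
            if item is done:
                break
            yield item
        for task in tasks:
            await task  # Re-raises a feeder's exception, if any
    finally:
        for task in tasks:
            task.cancel()  # Stops feeders if the consumer exits early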

FAQ

How do I handle errors in the middle of a streaming response?

Once you have started sending a streaming HTTP response, you cannot change the status code. The standard pattern is to send an error event in the SSE stream: yield f"data: {json.dumps({'error': str(e)})}\n\n". The client-side JavaScript should watch for error events and handle them appropriately. For critical errors, close the stream after sending the error event.
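
A minimal sketch of that pattern follows. It wraps any token stream, emits the same frame format as the FastAPI example, and reports a mid-stream failure in-band. The names `sse_with_error_event`, `flaky_tokens`, and `collect` are hypothetical; a production version would likely send a sanitized message rather than `str(exc)`.

```python
import asyncio
import json

async def sse_with_error_event(token_stream):
    """Wrap a token stream as SSE frames, reporting failures in-band."""
    try:
        async for token in token_stream:
            yield f"data: {json.dumps({'token': token})}\n\n"
    except Exception as exc:
        # The status code is already sent, so signal the failure as an event
        yield f"data: {json.dumps({'error': str(exc)})}\n\n"
    else:
        # Only a stream that finished cleanly gets the completion marker
        yield "data: [DONE]\n\n"

async def flaky_tokens():
    """Simulated source that fails after two tokens."""
    yield "Hello"
    yield " world"
    raise RuntimeError("upstream connection lost")

async def collect(stream) -> list:
    return [frame async for frame in stream]

if __name__ == "__main__":
    for frame in asyncio.run(collect(sse_with_error_event(flaky_tokens()))):
        print(repr(frame))
```

The generator returns right after the error frame, which ends the response and closes the stream for the client.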

What is the memory overhead of async generators compared to collecting all results into a list?

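An async generator's memory use does not grow with the length of the stream, as long as the stage itself does not accumulate state: it holds only its local variables and the item currently being yielded. A list holds every item simultaneously. For streaming 10,000 LLM tokens, a pass-through generator holds one token at a time while a list stores all 10,000. Stages that buffer, such as a sentence buffer, hold only what they have buffered so far. This makes generators well suited to long-running or high-volume streams.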

Can I replay or tee an async generator to send data to multiple consumers?

Async generators are single-use: once an item has been consumed, it is gone. To send the same stream to multiple consumers, run one producer task that reads from the generator once and puts each item into a separate asyncio.Queue per consumer. Bound each queue so that a slow consumer cannot grow memory without limit. The asyncstdlib package also offers a tee() helper modeled on itertools.tee.
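
The queue-per-consumer approach can be sketched as follows. This is an illustrative implementation under stated assumptions, not a library API: `fan_out`, `pump`, `reader`, and `demo` are hypothetical names, and the sketch assumes every consumer keeps reading until its stream ends.

```python
import asyncio

def fan_out(source, consumer_count: int, max_buffer: int = 50):
    """Read source once and copy each item to one bounded queue per consumer.

    Must be called from inside a running event loop.
    Returns (pump_task, list_of_consumer_streams).
    """
    queues = [asyncio.Queue(maxsize=max_buffer) for _ in range(consumer_count)]
    done = object()

    async def pump():
        try:
            async for item in source:
                for queue in queues:
                    await queue.put(item)  # The slowest consumer sets the pace
        finally:
            # Signal completion so readers do not wait forever
            for queue in queues:
                await queue.put(done)

    async def reader(queue):
        while (item := await queue.get()) is not done:
            yield item

    task = asyncio.create_task(pump())
    return task, [reader(queue) for queue in queues]

async def demo() -> list:
    async def numbers():
        for i in range(5):
            yield i

    async def drain(stream) -> list:
        return [item async for item in stream]

    task, (first, second) = fan_out(numbers(), 2, max_buffer=2)
    # Consumers must run concurrently, or a full queue stalls the pump
    results = await asyncio.gather(drain(first), drain(second))
    await task
    return list(results)

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Reading the consumer streams one after another instead of concurrently can deadlock once the stream is longer than `max_buffer`, because the pump suspends on the unread consumer's full queue.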

