---
title: "Streaming with Async Generators: Building Real-Time AI Response Pipelines"
description: "Build real-time streaming AI pipelines using Python async generators. Learn yield patterns, consumer chains, and backpressure for delivering LLM tokens to users instantly."
canonical: https://callsphere.ai/blog/streaming-async-generators-real-time-ai-response-pipelines
category: "Learn Agentic AI"
tags: ["Python", "Streaming", "Async Generators", "Real-Time", "AI Agents"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-06-06T04:20:56.125Z
---

# Streaming with Async Generators: Building Real-Time AI Response Pipelines

> Build real-time streaming AI pipelines using Python async generators. Learn yield patterns, consumer chains, and backpressure for delivering LLM tokens to users instantly.

## Why Streaming Matters for AI Agents

Users perceive LLM responses as faster when tokens arrive incrementally. A 3-second response that streams token-by-token from 200ms feels instant. The same response delivered as a single block after 3 seconds feels slow. Streaming also enables real-time processing pipelines where downstream steps begin working before the LLM finishes generating.

Python async generators provide a natural abstraction for streaming data through processing stages. Each stage yields results as they become available, creating a pipeline where data flows continuously rather than in batch.

## Async Generator Fundamentals

An async generator is a function that uses both `async def` and `yield`. It produces values lazily and asynchronously.

```mermaid
sequenceDiagram
    autonumber
    participant Client
    participant Edge as Edge Worker
    participant LLM as LLM Provider
    participant DB as Logs and Trace
    Client->>Edge: POST /chat (stream=true)
    Edge->>LLM: messages.create(stream=true)
    loop Each token
        LLM-->>Edge: SSE chunk delta
        Edge-->>Client: SSE chunk delta
        Edge->>DB: append token to span
    end
    LLM-->>Edge: stop_reason=end_turn
    Edge-->>Client: event: done
    Edge->>DB: finalize trace
```

```python
import asyncio

async def stream_llm_tokens(prompt: str):
    """Simulate streaming LLM response token by token."""
    tokens = [
        "Async", " generators", " enable", " real-time",
        " streaming", " of", " LLM", " responses",
        " to", " users.", ""
    ]
    for token in tokens:
        await asyncio.sleep(0.1)  # Simulate token generation delay
        yield token

async def main():
    async for token in stream_llm_tokens("Explain streaming"):
        print(token, end="", flush=True)
    print()  # Newline at the end

asyncio.run(main())
```

Each `yield` pauses the generator and delivers a value to the consumer. The generator resumes when the consumer requests the next value via `async for`.

## Streaming from Real LLM APIs

Most LLM APIs support server-sent events (SSE) for streaming. Here is how to consume them with httpx.

```python
import httpx
import json

async def stream_openai_response(
    client: httpx.AsyncClient,
    messages: list[dict],
    model: str = "gpt-4o",
):
    """Stream tokens from OpenAI's API."""
    async with client.stream(
        "POST",
        "https://api.openai.com/v1/chat/completions",
        json={
            "model": model,
            "messages": messages,
            "stream": True,
        },
    ) as response:
        response.raise_for_status()
        async for line in response.aiter_lines():
            if not line.startswith("data: "):
                continue
            data = line[6:]  # Strip "data: " prefix
            if data == "[DONE]":
                break
            chunk = json.loads(data)
            delta = chunk["choices"][0].get("delta", {})
            content = delta.get("content", "")
            if content:
                yield content
```

## Building Processing Chains

The real power of async generators emerges when you chain them. Each stage transforms the stream and yields results to the next stage.

```python
async def stream_tokens(prompt: str):
    """Stage 1: Generate raw tokens from LLM."""
    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
    ) as client:
        async for token in stream_openai_response(
            client,
            [{"role": "user", "content": prompt}],
        ):
            yield token

async def buffer_sentences(token_stream):
    """Stage 2: Buffer tokens into complete sentences."""
    buffer = ""
    async for token in token_stream:
        buffer += token
        # Yield complete sentences
        while ". " in buffer or buffer.endswith("."):
            if ". " in buffer:
                sentence, buffer = buffer.split(". ", 1)
                yield sentence.strip() + "."
            elif buffer.endswith("."):
                yield buffer.strip()
                buffer = ""
                break
    if buffer.strip():
        yield buffer.strip()

async def add_metadata(sentence_stream):
    """Stage 3: Enrich sentences with metadata."""
    index = 0
    async for sentence in sentence_stream:
        yield {
            "index": index,
            "text": sentence,
            "word_count": len(sentence.split()),
            "timestamp": time.monotonic(),
        }
        index += 1

async def main():
    # Chain the pipeline stages
    tokens = stream_tokens("Explain three benefits of async Python")
    sentences = buffer_sentences(tokens)
    enriched = add_metadata(sentences)

    async for item in enriched:
        print(f"[{item['index']}] ({item['word_count']} words) "
              f"{item['text']}")

asyncio.run(main())
```

## FastAPI Streaming Response

Integrate async generators directly with FastAPI's `StreamingResponse` for real-time delivery to clients.

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def generate_sse_stream(prompt: str):
    """Generate Server-Sent Events from LLM stream."""
    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
    ) as client:
        async for token in stream_openai_response(
            client,
            [{"role": "user", "content": prompt}],
        ):
            yield f"data: {json.dumps({'token': token})}\n\n"
    yield "data: [DONE]\n\n"

@app.post("/api/chat/stream")
async def chat_stream(prompt: str):
    return StreamingResponse(
        generate_sse_stream(prompt),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )
```

## Backpressure in Streaming Pipelines

When a consumer is slower than a producer, tokens accumulate in memory. Use an `asyncio.Queue` to bound the buffer.

```python
async def bounded_stream(
    source,
    max_buffer: int = 50,
):
    """Apply backpressure to a stream with a bounded buffer."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_buffer)

    async def producer():
        async for item in source:
            await queue.put(item)  # Blocks when queue is full
        await queue.put(None)  # Sentinel

    producer_task = asyncio.create_task(producer())

    while True:
        item = await queue.get()
        if item is None:
            break
        yield item

    await producer_task
```

## Multiplexing Multiple Streams

An agent might need to stream from multiple sources and merge the results.

```python
async def merge_streams(*streams):
    """Merge multiple async streams into one, yielding as available."""
    queue: asyncio.Queue = asyncio.Queue()
    active = len(streams)

    async def feed(stream, source_id):
        nonlocal active
        async for item in stream:
            await queue.put((source_id, item))
        active -= 1
        if active == 0:
            await queue.put(None)

    tasks = [
        asyncio.create_task(feed(stream, i))
        for i, stream in enumerate(streams)
    ]

    while True:
        item = await queue.get()
        if item is None:
            break
        yield item

    for task in tasks:
        await task
```

## FAQ

### How do I handle errors in the middle of a streaming response?

Once you have started sending a streaming HTTP response, you cannot change the status code. The standard pattern is to send an error event in the SSE stream: `yield f"data: {json.dumps({'error': str(e)})}\n\n"`. The client-side JavaScript should watch for error events and handle them appropriately. For critical errors, close the stream after sending the error event.

### What is the memory overhead of async generators compared to collecting all results into a list?

Async generators use constant memory regardless of the total data volume — they hold only the current yielded value. A list holds every item simultaneously. For streaming 10,000 LLM tokens, a generator uses memory for one token at a time while a list stores all 10,000. This makes generators essential for long-running or high-volume streams.

### Can I replay or tee an async generator to send data to multiple consumers?

Async generators are single-use — once consumed, the data is gone. To send the same stream to multiple consumers, use an `asyncio.Queue` per consumer and a producer task that reads from the generator once and puts each item into all queues. Libraries like `aiostream` provide a `stream.tee()` utility for this pattern.

---

#Python #Streaming #AsyncGenerators #RealTime #AIAgents #AgenticAI #LearnAI #AIEngineering

---

Source: https://callsphere.ai/blog/streaming-async-generators-real-time-ai-response-pipelines
