---
title: "Python Generators and Iterators for Streaming AI Responses"
description: "Master Python generators and async iterators for building efficient streaming AI response pipelines with memory-efficient processing, backpressure handling, and real-time output."
canonical: https://callsphere.ai/blog/python-generators-iterators-streaming-ai-responses
category: "Learn Agentic AI"
tags: ["Python", "Generators", "Streaming", "Async Programming", "Agentic AI"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-06T01:02:43.671Z
---

# Python Generators and Iterators for Streaming AI Responses

> Master Python generators and async iterators for building efficient streaming AI response pipelines with memory-efficient processing, backpressure handling, and real-time output.

## Why Streaming Matters for AI Applications

When an LLM generates a 2,000-token response, waiting for the full completion before showing anything creates a poor user experience. Streaming sends tokens as they are generated, cutting perceived latency from the full generation time, often several seconds, to the time-to-first-token, often a few hundred milliseconds. Python generators are the natural abstraction for this pattern: they produce values lazily, one at a time, without holding the entire response in memory.

Beyond user experience, generators enable memory-efficient processing of large datasets for embeddings, batch inference, and document chunking — all critical operations in AI pipelines.

## Generator Basics for Token Streaming

A generator function uses `yield` instead of `return`. Each `yield` pauses the function and produces a value. The function resumes from where it paused on the next iteration.

```mermaid
sequenceDiagram
    autonumber
    participant Client
    participant Edge as Edge Worker
    participant LLM as LLM Provider
    participant DB as Logs and Trace
    Client->>Edge: POST /chat (stream=true)
    Edge->>LLM: messages.create(stream=true)
    loop Each token
        LLM-->>Edge: SSE chunk delta
        Edge-->>Client: SSE chunk delta
        Edge->>DB: append token to span
    end
    LLM-->>Edge: stop_reason=end_turn
    Edge-->>Client: event: done
    Edge->>DB: finalize trace
```

```python
from typing import Generator

def stream_tokens(text: str, chunk_size: int = 4) -> Generator[str, None, None]:
    """Simulate token streaming by yielding chunks of text."""
    for i in range(0, len(text), chunk_size):
        yield text[i:i + chunk_size]

# Lazy evaluation - only one chunk in memory at a time
for token in stream_tokens("The agent analyzed the document carefully."):
    print(token, end="", flush=True)
```
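
Because each `yield` suspends the function's frame, you can also drive the generator by hand with `next()` to watch the pause and resume:

```python
gen = stream_tokens("Hello world")
print(next(gen))  # "Hell" - runs until the first yield, then pauses
print(next(gen))  # "o wo" - resumes right after the previous yield
print(next(gen))  # "rld" - the final, shorter chunk
# One more next(gen) would raise StopIteration: the generator is exhausted
```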

## Async Generators for Real LLM Streaming

Real LLM APIs stream over HTTP using server-sent events. Async generators handle this naturally.

```python
import httpx
import json
from typing import AsyncGenerator

async def stream_chat_completion(
    messages: list[dict],
    model: str = "gpt-4o",
    api_key: str = "",
) -> AsyncGenerator[str, None]:
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {api_key}"},
            json={"model": model, "messages": messages, "stream": True},
            timeout=60.0,
        ) as response:
            response.raise_for_status()  # surface auth/rate-limit errors instead of silently yielding nothing
            async for line in response.aiter_lines():
                if line.startswith("data: ") and line != "data: [DONE]":
                    chunk = json.loads(line[6:])
                    delta = chunk["choices"][0].get("delta", {})
                    if content := delta.get("content"):
                        yield content

# Usage
import asyncio

async def main():
    messages = [{"role": "user", "content": "Explain agentic AI"}]
    full_response = []
    async for token in stream_chat_completion(messages, api_key="sk-..."):
        print(token, end="", flush=True)
        full_response.append(token)

    # Keep the assembled text for logging or caching once the stream ends
    return "".join(full_response)

asyncio.run(main())
```

## Pipeline Composition with Generators

Generators compose into processing pipelines where each stage transforms the stream without buffering everything.

```python
from typing import AsyncGenerator

async def chunk_by_sentence(
    token_stream: AsyncGenerator[str, None]
) -> AsyncGenerator[str, None]:
    buffer = ""
    async for token in token_stream:
        buffer += token
        while ". " in buffer:
            sentence, buffer = buffer.split(". ", 1)
            yield sentence.strip() + "."
    # Flush the remainder: the final sentence usually ends the stream
    # without a trailing ". " delimiter
    if buffer.strip():
        yield buffer.strip()

async def add_citations(
    sentence_stream: AsyncGenerator[str, None],
    knowledge_base: dict,
) -> AsyncGenerator[str, None]:
    async for sentence in sentence_stream:
        # Check if sentence needs a citation
        for keyword, citation in knowledge_base.items():
            if keyword.lower() in sentence.lower():
                sentence += f" [{citation}]"
                break
        yield sentence

# Compose the pipeline
async def enriched_stream(messages, api_key, kb):
    raw_tokens = stream_chat_completion(messages, api_key=api_key)
    sentences = chunk_by_sentence(raw_tokens)
    enriched = add_citations(sentences, kb)
    async for sentence in enriched:
        yield sentence
```
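
To see the stages compose without a live API call, you can feed the pipeline a simulated token stream. The `fake_tokens` generator and the toy knowledge base below are stand-ins for illustration:

```python
import asyncio
from typing import AsyncGenerator

async def fake_tokens() -> AsyncGenerator[str, None]:
    # Stand-in for stream_chat_completion: emits word-sized chunks
    for token in ["Agents ", "use ", "tools. ", "RAG ", "grounds ", "answers. ", "Done"]:
        yield token

async def demo():
    kb = {"rag": "doc:retrieval-basics"}
    sentences = chunk_by_sentence(fake_tokens())
    async for sentence in add_citations(sentences, kb):
        print(sentence)

asyncio.run(demo())
# Agents use tools.
# RAG grounds answers. [doc:retrieval-basics]
# Done
```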

## Memory-Efficient Batch Processing

When generating embeddings for thousands of documents, generators prevent loading everything into memory at once.

```python
from pathlib import Path
from typing import Generator, Iterable

def read_documents(directory: Path) -> Generator[str, None, None]:
    for file_path in directory.glob("*.txt"):
        yield file_path.read_text(encoding="utf-8")

def batch_items(items: Iterable, batch_size: int = 32) -> Generator[list, None, None]:
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # flush the final partial batch

async def embed_directory(directory: Path):
    # embedding_api and vector_db stand in for your embedding client and vector store
    documents = read_documents(directory)
    for batch in batch_items(documents, batch_size=32):
        embeddings = await embedding_api.embed(batch)
        await vector_db.upsert(embeddings)
        # Only 32 documents in memory at any time
```
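
Since `batch_items` accepts any iterable, you can sanity-check the batching logic without touching the filesystem:

```python
for batch in batch_items(range(10), batch_size=4):
    print(batch)
# [0, 1, 2, 3]
# [4, 5, 6, 7]
# [8, 9]
```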

## Generator-Based Streaming in FastAPI

FastAPI supports `StreamingResponse` with generators for real-time AI output.

```python
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

class ChatRequest(BaseModel):
    messages: list[dict]

app = FastAPI()

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    async def generate():
        async for token in stream_chat_completion(
            request.messages, api_key=settings.api_key  # settings: your app config object
        ):
            yield f"data: {json.dumps({'token': token})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
```
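
On the consumer side, a minimal client sketch, assuming the server above is running on localhost:8000:

```python
import asyncio
import json
import httpx

async def consume_stream():
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/chat/stream",
            json={"messages": [{"role": "user", "content": "Hi"}]},
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: ") and line != "data: [DONE]":
                    payload = json.loads(line[6:])
                    print(payload["token"], end="", flush=True)

asyncio.run(consume_stream())
```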

## FAQ

### What is the difference between a generator and an iterator in Python?

Every generator is an iterator, but not every iterator is a generator. An iterator is any object with `__iter__` and `__next__` methods. A generator is a function that uses `yield` and automatically creates an iterator. Generators are more concise and handle state management automatically.
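
The contrast is clearest side by side: the generator function below replaces an entire hand-written iterator class.

```python
from collections.abc import Iterator

class Countdown:
    """Hand-written iterator: state managed explicitly."""

    def __init__(self, start: int):
        self.current = start

    def __iter__(self) -> Iterator[int]:
        return self

    def __next__(self) -> int:
        if self.current <= 0:
            raise StopIteration
        value = self.current
        self.current -= 1
        return value

def countdown(start: int):
    """Generator: state lives in the suspended frame."""
    while start > 0:
        yield start
        start -= 1

assert list(Countdown(3)) == list(countdown(3)) == [3, 2, 1]
```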

### How do I handle errors in a streaming pipeline without losing partial results?

Wrap the consumer loop in a try/except and process whatever has been yielded so far. You can also use `generator.throw()` to inject exceptions into a running generator from the outside, allowing it to handle errors and potentially recover or yield a fallback value.
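
A sketch of both patterns with toy generators:

```python
# Pattern 1: a consumer-side try/except preserves partial output
def flaky_tokens():
    yield "The agent "
    yield "completed the task"
    raise ConnectionError("stream dropped")

partial = []
try:
    for token in flaky_tokens():
        partial.append(token)
except ConnectionError:
    print("partial result:", "".join(partial))

# Pattern 2: generator.throw() lets the generator itself recover
def recoverable():
    while True:
        try:
            yield "token"
        except TimeoutError:
            yield "[timeout, retrying]"

gen = recoverable()
next(gen)                         # "token"
print(gen.throw(TimeoutError()))  # "[timeout, retrying]"
```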

### Can generators cause memory leaks if not fully consumed?

They can hold resources longer than expected. If you break out of a generator loop early, the generator and its suspended stack frame stay alive until garbage collection gets to them, along with any open files or connections inside that frame. Call `generator.close()` explicitly, or wrap the generator in `contextlib.closing` so a `with` statement guarantees cleanup.
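
A minimal illustration with a generator that owns a file handle (the `notes.txt` path is hypothetical):

```python
from contextlib import closing

def read_lines(path: str):
    # close() raises GeneratorExit here, unwinding the with-block and closing the file
    with open(path) as f:
        for line in f:
            yield line.rstrip()

with closing(read_lines("notes.txt")) as lines:
    for line in lines:
        if line.startswith("STOP"):
            break  # early exit: closing() still runs generator.close()
```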

---

#Python #Generators #Streaming #AsyncProgramming #AgenticAI #LearnAI #AIEngineering

