Python Generators and Iterators for Streaming AI Responses
Master Python generators and async iterators for building efficient streaming AI response pipelines with memory-efficient processing, backpressure handling, and real-time output.
Why Streaming Matters for AI Applications
When an LLM generates a 2,000-token response, waiting for the full completion before showing anything creates a poor user experience. Streaming sends tokens as they are generated, reducing perceived latency from seconds to milliseconds. Python generators are the natural abstraction for this pattern — they produce values lazily, one at a time, without holding the entire response in memory.
Beyond user experience, generators enable memory-efficient processing of large datasets for embeddings, batch inference, and document chunking — all critical operations in AI pipelines.
Generator Basics for Token Streaming
A generator function uses yield instead of return. Each yield pauses the function and produces a value. The function resumes from where it paused on the next iteration.
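A minimal sketch of that pause/resume behavior, using a hypothetical two-value generator:

```python
def greet():
    yield "hello"   # first pause point
    yield "world"   # second pause point

g = greet()
print(next(g))  # runs the body up to the first yield -> "hello"
print(next(g))  # resumes right after it              -> "world"
```

Calling `greet()` runs no code at all; each `next()` executes only as far as the next `yield`.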
from typing import Generator

def stream_tokens(text: str, chunk_size: int = 4) -> Generator[str, None, None]:
    """Simulate token streaming by yielding chunks of text."""
    for i in range(0, len(text), chunk_size):
        yield text[i:i + chunk_size]

# Lazy evaluation - only one chunk in memory at a time
for token in stream_tokens("The agent analyzed the document carefully."):
    print(token, end="", flush=True)
Async Generators for Real LLM Streaming
Real LLM APIs stream responses over HTTP using Server-Sent Events (SSE). Async generators handle this protocol naturally.
import httpx
import json
from typing import AsyncGenerator

async def stream_chat_completion(
    messages: list[dict],
    model: str = "gpt-4o",
    api_key: str = "",
) -> AsyncGenerator[str, None]:
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {api_key}"},
            json={"model": model, "messages": messages, "stream": True},
            timeout=60.0,
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: ") and line != "data: [DONE]":
                    chunk = json.loads(line[6:])
                    delta = chunk["choices"][0].get("delta", {})
                    if content := delta.get("content"):
                        yield content

# Usage
async def main():
    messages = [{"role": "user", "content": "Explain agentic AI"}]
    full_response = []
    async for token in stream_chat_completion(messages, api_key="sk-..."):
        print(token, end="", flush=True)
        full_response.append(token)
    complete = "".join(full_response)
Pipeline Composition with Generators
Generators compose into processing pipelines where each stage transforms the stream without buffering everything.
from typing import AsyncGenerator

async def chunk_by_sentence(
    token_stream: AsyncGenerator[str, None]
) -> AsyncGenerator[str, None]:
    buffer = ""
    async for token in token_stream:
        buffer += token
        while ". " in buffer:
            sentence, buffer = buffer.split(". ", 1)
            yield sentence.strip() + "."
    # Flush whatever remains so the final sentence is not dropped
    if buffer.strip():
        yield buffer.strip()

async def add_citations(
    sentence_stream: AsyncGenerator[str, None],
    knowledge_base: dict,
) -> AsyncGenerator[str, None]:
    async for sentence in sentence_stream:
        # Check if sentence needs a citation
        for keyword, citation in knowledge_base.items():
            if keyword.lower() in sentence.lower():
                sentence += f" [{citation}]"
                break
        yield sentence

# Compose the pipeline
async def enriched_stream(messages, api_key, kb):
    raw_tokens = stream_chat_completion(messages, api_key=api_key)
    sentences = chunk_by_sentence(raw_tokens)
    enriched = add_citations(sentences, kb)
    async for sentence in enriched:
        yield sentence
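A useful property of this design is that each stage can be exercised without a live API call. The sketch below drives the sentence-chunking stage with a stub token stream (`fake_tokens` is a hypothetical test helper, and the stage is repeated here with a trailing-text flush so the snippet runs standalone):

```python
import asyncio
from typing import AsyncGenerator

async def fake_tokens() -> AsyncGenerator[str, None]:
    # Stub standing in for stream_chat_completion
    for piece in ["Generators are lazy. ", "Pipelines com", "pose well."]:
        yield piece

async def chunk_by_sentence(
    token_stream: AsyncGenerator[str, None]
) -> AsyncGenerator[str, None]:
    # Same sentence-chunking stage as above
    buffer = ""
    async for token in token_stream:
        buffer += token
        while ". " in buffer:
            sentence, buffer = buffer.split(". ", 1)
            yield sentence.strip() + "."
    if buffer.strip():
        yield buffer.strip()

async def collect() -> list[str]:
    return [s async for s in chunk_by_sentence(fake_tokens())]

sentences = asyncio.run(collect())
print(sentences)  # ['Generators are lazy.', 'Pipelines compose well.']
```

Because the stages only depend on the async-iterator protocol, any stub stream is a drop-in replacement for the real token source.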
Memory-Efficient Batch Processing
When generating embeddings for thousands of documents, generators prevent loading everything into memory at once.
from typing import Generator
from pathlib import Path

def read_documents(directory: Path) -> Generator[str, None, None]:
    for file_path in directory.glob("*.txt"):
        yield file_path.read_text()

def batch_items(items: Generator, batch_size: int = 32) -> Generator[list, None, None]:
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # emit the final partial batch

async def embed_directory(directory: Path):
    # embedding_api and vector_db are placeholders for your own clients
    documents = read_documents(directory)
    for batch in batch_items(documents, batch_size=32):
        embeddings = await embedding_api.embed(batch)
        await vector_db.upsert(embeddings)
    # Only 32 documents in memory at any time
Generator-Based Streaming in FastAPI
FastAPI supports StreamingResponse with generators for real-time AI output.
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

class ChatRequest(BaseModel):
    messages: list[dict]

app = FastAPI()

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    async def generate():
        # settings.api_key: your application's config object
        async for token in stream_chat_completion(
            request.messages, api_key=settings.api_key
        ):
            yield f"data: {json.dumps({'token': token})}\n\n"
        yield "data: [DONE]\n\n"
    return StreamingResponse(generate(), media_type="text/event-stream")
FAQ
What is the difference between a generator and an iterator in Python?
Every generator is an iterator, but not every iterator is a generator. An iterator is any object with __iter__ and __next__ methods. A generator is a function that uses yield and automatically creates an iterator. Generators are more concise and handle state management automatically.
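The contrast is clearest side by side. A sketch of the same countdown written both ways (both names are illustrative):

```python
from typing import Iterator

class Countdown:
    """Hand-written iterator: explicit state plus __iter__/__next__."""
    def __init__(self, n: int) -> None:
        self.n = n

    def __iter__(self) -> Iterator[int]:
        return self

    def __next__(self) -> int:
        if self.n <= 0:
            raise StopIteration
        self.n -= 1
        return self.n + 1

def countdown(n: int):
    """Generator: same protocol, state lives in the paused frame."""
    while n > 0:
        yield n
        n -= 1

print(list(Countdown(3)), list(countdown(3)))  # [3, 2, 1] [3, 2, 1]
```

Both satisfy the iterator protocol, but the generator version needs no class, no explicit state attributes, and no manual `StopIteration`.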
How do I handle errors in a streaming pipeline without losing partial results?
Wrap the consumer loop in a try/except and process whatever has been yielded so far. You can also use generator.throw() to inject exceptions into a running generator from the outside, allowing it to handle errors and potentially recover or yield a fallback value.
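Both patterns can be sketched with toy generators (`flaky_stream` and `resilient` are hypothetical examples):

```python
def flaky_stream():
    yield "alpha"
    yield "beta"
    raise RuntimeError("upstream failure")  # simulated mid-stream error

received = []
try:
    for token in flaky_stream():
        received.append(token)
except RuntimeError:
    pass  # keep whatever arrived before the failure

def resilient():
    # generator.throw() delivers an exception at the paused yield
    while True:
        try:
            yield "ok"
        except ValueError:
            yield "fallback"

g = resilient()
next(g)                       # prime: now paused at the first yield
recovered = g.throw(ValueError)
print(received, recovered)    # ['alpha', 'beta'] fallback
```

The consumer keeps the two tokens yielded before the failure, and `throw()` lets the second generator catch the injected error and answer with a fallback value instead of dying.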
Can generators cause memory leaks if not fully consumed?
Yes. If you break out of a generator loop early, the generator object stays in memory with its full stack frame until garbage collected. Call generator.close() explicitly or use it inside a with statement via contextlib.closing to ensure cleanup.
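A sketch of the `contextlib.closing` pattern, using a hypothetical generator whose cleanup is observable via a `finally` block:

```python
from contextlib import closing

def tracked_stream(log: list):
    try:
        for i in range(1_000_000):
            yield i
    finally:
        log.append("closed")  # runs when close() raises GeneratorExit inside

log: list = []
with closing(tracked_stream(log)) as gen:
    first = next(gen)  # consume only one item, then leave early

print(first, log)  # 0 ['closed']
```

Exiting the `with` block calls `gen.close()`, so the `finally` clause runs even though the stream was abandoned after a single item.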
#Python #Generators #Streaming #AsyncProgramming #AgenticAI #LearnAI #AIEngineering
Written by
CallSphere Team
Expert insights on AI voice agents and customer communication automation.