
Streaming AI Agent Responses with FastAPI: SSE and StreamingResponse

Implement real-time token-by-token streaming from AI agents using FastAPI's StreamingResponse and Server-Sent Events. Covers async generators, error handling during streams, and JavaScript client integration.

Why Streaming Matters for AI Agents

When an AI agent takes 5 to 15 seconds to generate a complete response, forcing the user to stare at a loading spinner destroys the experience. Streaming sends tokens to the client as they are generated, so the user sees the response forming in real time. This is the same pattern that powers ChatGPT, Claude, and every modern AI chat interface.

FastAPI provides two mechanisms for streaming: StreamingResponse for raw HTTP streaming and Server-Sent Events (SSE) for structured event streams. For AI agent backends, SSE is usually the better choice because it provides built-in reconnection, event typing, and a clean browser API via EventSource.

Basic StreamingResponse with an Async Generator

The simplest streaming approach wraps an async generator that yields chunks from your LLM:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import openai

app = FastAPI()

class ChatRequest(BaseModel):
    message: str

async def generate_stream(prompt: str):
    client = openai.AsyncOpenAI()
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )

    # Forward each content delta as soon as it arrives
    async for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            yield delta.content

@app.post("/chat/stream")
async def stream_chat(request: ChatRequest):
    return StreamingResponse(
        generate_stream(request.message),
        media_type="text/plain",
    )

This works, but it has limitations. The client has no structured way to know when the stream ends, whether an error occurred mid-stream, or to distinguish between different types of events like tokens versus tool calls.
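You can exercise the token-by-token behavior without an API key by swapping in a stub token source. The `fake_llm_stream` generator below is a hypothetical stand-in for the OpenAI stream; the consumption loop mirrors what StreamingResponse does with your generator:

```python
import asyncio

async def fake_llm_stream(prompt: str):
    # Hypothetical stand-in for the OpenAI stream: yields tokens one at a time.
    for token in ["Hello", ", ", "world", "!"]:
        await asyncio.sleep(0)  # simulate network latency between chunks
        yield token

async def consume():
    # Mirrors StreamingResponse: iterate the generator and forward
    # each chunk as soon as it is produced.
    chunks = []
    async for chunk in fake_llm_stream("hi"):
        chunks.append(chunk)
    return "".join(chunks)

print(asyncio.run(consume()))  # Hello, world!
```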

Server-Sent Events for Structured Streaming

SSE solves these problems by sending typed, newline-delimited events. Install the sse-starlette package, which integrates cleanly with FastAPI:

pip install sse-starlette

Now build a proper SSE endpoint:


import json
from fastapi import APIRouter, Depends
from sse_starlette.sse import EventSourceResponse

# ChatRequest, LLMService, and get_llm_service are assumed to come from
# your application's models and dependency-injection setup.

router = APIRouter()

async def agent_event_stream(
    message: str,
    session_id: str,
    llm_service: LLMService,
):
    try:
        # Send a start event
        yield {
            "event": "start",
            "data": json.dumps({"session_id": session_id}),
        }

        # Stream LLM tokens
        full_response = ""
        async for token in llm_service.stream_generate(message):
            full_response += token
            yield {
                "event": "token",
                "data": json.dumps({"content": token}),
            }

        # Send completion event with metadata
        # (word count as a rough proxy, not a true token count)
        yield {
            "event": "done",
            "data": json.dumps({
                "total_tokens": len(full_response.split()),
                "session_id": session_id,
            }),
        }

    except Exception as e:
        yield {
            "event": "error",
            "data": json.dumps({"message": str(e)}),
        }

@router.post("/chat/stream")
async def stream_agent_response(
    request: ChatRequest,
    llm_service: LLMService = Depends(get_llm_service),
):
    return EventSourceResponse(
        agent_event_stream(
            message=request.message,
            session_id=request.session_id,
            llm_service=llm_service,
        )
    )

Each event has a typed event field and a JSON data payload. The client can handle token, done, and error events differently.
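On the wire, each of these dicts is serialized using the standard SSE framing: an `event:` line, a `data:` line, and a blank-line separator. A minimal sketch of that framing (this is not sse-starlette's actual implementation, just the format it produces):

```python
import json

def format_sse(event: str, data: dict) -> str:
    # Standard Server-Sent Events framing: each field on its own line,
    # with a blank line terminating the event.
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"

print(format_sse("token", {"content": "Hi"}))
# event: token
# data: {"content": "Hi"}
```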

Streaming Tool Call Results

AI agents often invoke tools mid-response. You can stream tool execution as separate events so the frontend can render tool status indicators:

async def agent_with_tools_stream(message: str, agent: Agent):
    yield {"event": "start", "data": "{}"}

    async for event in agent.run_stream(message):
        if event.type == "token":
            yield {
                "event": "token",
                "data": json.dumps({"content": event.content}),
            }
        elif event.type == "tool_call":
            yield {
                "event": "tool_call",
                "data": json.dumps({
                    "tool": event.tool_name,
                    "args": event.arguments,
                }),
            }
        elif event.type == "tool_result":
            yield {
                "event": "tool_result",
                "data": json.dumps({
                    "tool": event.tool_name,
                    "result": event.result,
                }),
            }

    yield {"event": "done", "data": "{}"}
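You can verify the event ordering with a stub agent. The `StubAgent` and `AgentEvent` classes below are hypothetical stand-ins for whatever agent framework you use; the collector replays the routing logic above and records event names in order:

```python
import asyncio
from dataclasses import dataclass, field

@dataclass
class AgentEvent:
    type: str
    content: str = ""
    tool_name: str = ""
    arguments: dict = field(default_factory=dict)
    result: str = ""

class StubAgent:
    # Hypothetical agent: one tool call, its result, then answer tokens.
    async def run_stream(self, message: str):
        yield AgentEvent(type="tool_call", tool_name="search", arguments={"q": message})
        yield AgentEvent(type="tool_result", tool_name="search", result="3 hits")
        for token in ["Found ", "3 ", "results."]:
            yield AgentEvent(type="token", content=token)

async def collect(agent, message):
    # Records the SSE event names in the order the frontend would see them.
    names = ["start"]
    async for event in agent.run_stream(message):
        names.append(event.type)
    names.append("done")
    return names

print(asyncio.run(collect(StubAgent(), "python")))
# ['start', 'tool_call', 'tool_result', 'token', 'token', 'token', 'done']
```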

JavaScript Client Integration

On the frontend, use the native EventSource API or the fetch API for POST-based SSE:

async function streamChat(message) {
  const response = await fetch("/chat/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message, session_id: "abc123" }),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // stream: true keeps multi-byte characters split across chunks intact
    buffer += decoder.decode(value, { stream: true });

    // Process only complete lines; keep any partial line in the buffer
    const lines = buffer.split("\n");
    buffer = lines.pop();

    for (const line of lines) {
      if (line.startsWith("data: ")) {
        const data = JSON.parse(line.slice(6));
        if (data.content) appendToChat(data.content);
      }
    }
  }
}

Error Handling in Streams

Errors during streaming require special handling because the HTTP status code has already been sent as 200. You cannot change it mid-stream. Instead, send an error event and close the stream:

import json
import openai

async def safe_stream(message: str, llm: LLMService):
    try:
        async for token in llm.stream_generate(message):
            yield {"event": "token", "data": json.dumps({"content": token})}
    except openai.RateLimitError:
        yield {
            "event": "error",
            "data": json.dumps({
                "code": "rate_limited",
                "message": "Too many requests. Please retry.",
                "retry_after": 30,
            }),
        }
    except openai.APIError:
        # Log the underlying exception server-side; send the client
        # a generic message rather than internal details.
        yield {
            "event": "error",
            "data": json.dumps({
                "code": "llm_error",
                "message": "Agent encountered an error.",
            }),
        }

FAQ

Can I use SSE with POST requests?

Standard EventSource in the browser only supports GET requests. For POST-based SSE, use the fetch API with a ReadableStream reader as shown above, or use a library like @microsoft/fetch-event-source which provides an EventSource-like API for POST requests. Most AI chat interfaces use POST because you need to send the conversation history in the request body.
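The chunk-boundary problem applies to any SSE consumer: a network chunk can end in the middle of an event. A Python sketch of a buffered parser that only emits complete events (i.e., those terminated by the blank-line separator):

```python
import json

def parse_sse_chunks(chunks):
    # Accumulates raw chunks in a buffer and yields (event, data) pairs
    # only once the blank-line separator confirms the event is complete.
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        while "\n\n" in buffer:
            raw, buffer = buffer.split("\n\n", 1)
            event, data = "message", None
            for line in raw.split("\n"):
                if line.startswith("event: "):
                    event = line[len("event: "):]
                elif line.startswith("data: "):
                    data = json.loads(line[len("data: "):])
            yield event, data

# An event split across two chunks is still parsed correctly:
chunks = ['event: token\ndata: {"con', 'tent": "Hi"}\n\n']
print(list(parse_sse_chunks(chunks)))  # [('token', {'content': 'Hi'})]
```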

How do I handle client disconnections during streaming?

FastAPI and Starlette detect client disconnections automatically. When the client closes the connection, the async generator receives a GeneratorExit or CancelledError exception. You can catch this to clean up resources. The sse-starlette library also supports a ping parameter that sends periodic keepalive messages to detect dead connections early.
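A sketch of the cleanup pattern: the `finally` block runs whether the stream finishes normally or the client disconnects (which surfaces inside the generator as GeneratorExit or CancelledError), so it is the right place to release resources:

```python
import asyncio

cleanup_log = []

async def stream_with_cleanup():
    try:
        for i in range(100):
            await asyncio.sleep(0)
            yield f"token-{i}"
    finally:
        # Runs on normal completion AND on client disconnect
        # (GeneratorExit / CancelledError), so release resources here.
        cleanup_log.append("closed")

async def simulate_disconnect():
    gen = stream_with_cleanup()
    await gen.__anext__()   # client receives one token...
    await gen.aclose()      # ...then disconnects mid-stream

asyncio.run(simulate_disconnect())
print(cleanup_log)  # ['closed']
```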

Should I buffer the full response before saving it to the database?

Yes. Accumulate tokens in a string variable as you stream them. After the stream completes successfully, save the full response to your database in the done event handler. Do not write individual tokens to the database as they arrive since that would create excessive database writes for no benefit.
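A sketch of the accumulate-then-save pattern; `save_response` and `fake_tokens` are hypothetical stand-ins for your database write and LLM stream:

```python
import asyncio

saved = {}

async def save_response(session_id: str, text: str):
    # Hypothetical single database write after the stream completes.
    saved[session_id] = text

async def fake_tokens():
    for t in ["The ", "answer ", "is ", "42."]:
        yield t

async def stream_and_persist(session_id: str):
    full_response = ""
    async for token in fake_tokens():
        full_response += token  # accumulate in memory per token
        # ...yield the token to the client here...
    await save_response(session_id, full_response)  # one write at the end

asyncio.run(stream_and_persist("abc123"))
print(saved)  # {'abc123': 'The answer is 42.'}
```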


#FastAPI #Streaming #SSE #AIAgents #RealTime #AgenticAI #LearnAI #AIEngineering


Written by

CallSphere Team

Expert insights on AI voice agents and customer communication automation.
