---
title: "Streaming AI Agent Responses with FastAPI: SSE and StreamingResponse"
description: "Implement real-time token-by-token streaming from AI agents using FastAPI's StreamingResponse and Server-Sent Events. Covers async generators, error handling during streams, and JavaScript client integration."
canonical: https://callsphere.ai/blog/streaming-ai-agent-responses-fastapi-sse-streaming-response
category: "Learn Agentic AI"
tags: ["FastAPI", "Streaming", "SSE", "AI Agents", "Real-Time"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-07T02:05:47.787Z
---

# Streaming AI Agent Responses with FastAPI: SSE and StreamingResponse

> Implement real-time token-by-token streaming from AI agents using FastAPI's StreamingResponse and Server-Sent Events. Covers async generators, error handling during streams, and JavaScript client integration.

## Why Streaming Matters for AI Agents

When an AI agent takes 5 to 15 seconds to generate a complete response, forcing the user to stare at a loading spinner destroys the experience. Streaming sends tokens to the client as they are generated, so the user sees the response forming in real time. This is the same pattern that powers ChatGPT, Claude, and every modern AI chat interface.

FastAPI provides two mechanisms for streaming: `StreamingResponse` for raw HTTP streaming and Server-Sent Events (SSE) for structured event streams. For AI agent backends, SSE is usually the better choice because it provides built-in reconnection, event typing, and a clean browser API via `EventSource`.

## Basic StreamingResponse with an Async Generator

The simplest streaming approach wraps an async generator that yields chunks from your LLM:

```mermaid
sequenceDiagram
    autonumber
    participant Client
    participant Edge as Edge Worker
    participant LLM as LLM Provider
    participant DB as Logs and Trace
    Client->>Edge: POST /chat (stream=true)
    Edge->>LLM: messages.create(stream=true)
    loop Each token
        LLM-->>Edge: SSE chunk delta
        Edge-->>Client: SSE chunk delta
        Edge->>DB: append token to span
    end
    LLM-->>Edge: stop_reason=end_turn
    Edge-->>Client: event: done
    Edge->>DB: finalize trace
```

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import openai

app = FastAPI()

class ChatRequest(BaseModel):
    message: str

async def generate_stream(prompt: str):
    client = openai.AsyncOpenAI()
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )

    async for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            yield delta.content

@app.post("/chat/stream")
async def stream_chat(request: ChatRequest):
    return StreamingResponse(
        generate_stream(request.message),
        media_type="text/plain",
    )
```

This works, but it has limitations. The client has no structured way to know when the stream ends, whether an error occurred mid-stream, or to distinguish between different types of events like tokens versus tool calls.

## Server-Sent Events for Structured Streaming

SSE solves these problems by sending typed, newline-delimited events. Install the `sse-starlette` package, which integrates cleanly with FastAPI:

```bash
pip install sse-starlette
```

Now build a proper SSE endpoint. This example assumes an `LLMService` class exposing a `stream_generate` async generator and a `get_llm_service` dependency provider:

```python
import json
from fastapi import APIRouter, Depends
from sse_starlette.sse import EventSourceResponse

router = APIRouter()

async def agent_event_stream(
    message: str,
    session_id: str,
    llm_service: LLMService,
):
    try:
        # Send a start event
        yield {
            "event": "start",
            "data": json.dumps({"session_id": session_id}),
        }

        # Stream LLM tokens
        full_response = ""
        async for token in llm_service.stream_generate(message):
            full_response += token
            yield {
                "event": "token",
                "data": json.dumps({"content": token}),
            }

        # Send completion event with metadata
        yield {
            "event": "done",
            "data": json.dumps({
                # Rough proxy: whitespace word count, not true LLM tokens
                "total_tokens": len(full_response.split()),
                "session_id": session_id,
            }),
        }

    except Exception as e:
        yield {
            "event": "error",
            "data": json.dumps({"message": str(e)}),
        }

@router.post("/chat/stream")
async def stream_agent_response(
    request: ChatRequest,
    llm_service: LLMService = Depends(get_llm_service),
):
    return EventSourceResponse(
        agent_event_stream(
            message=request.message,
            session_id=request.session_id,
            llm_service=llm_service,
        )
    )
```

Each event has a typed `event` field and a JSON `data` payload. The client can handle `token`, `done`, and `error` events differently.
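On the wire, `EventSourceResponse` serializes each yielded dict into a standard SSE frame, with a blank line delimiting events. The stream above looks roughly like this:

```text
event: start
data: {"session_id": "abc123"}

event: token
data: {"content": "Hello"}

event: done
data: {"total_tokens": 42, "session_id": "abc123"}
```

The blank-line framing is what makes SSE easy to parse incrementally on the client.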

## Streaming Tool Call Results

AI agents often invoke tools mid-response. You can stream tool execution as separate events so the frontend can render tool status indicators:

```python
async def agent_with_tools_stream(message: str, agent: Agent):
    yield {"event": "start", "data": "{}"}

    async for event in agent.run_stream(message):
        if event.type == "token":
            yield {
                "event": "token",
                "data": json.dumps({"content": event.content}),
            }
        elif event.type == "tool_call":
            yield {
                "event": "tool_call",
                "data": json.dumps({
                    "tool": event.tool_name,
                    "args": event.arguments,
                }),
            }
        elif event.type == "tool_result":
            yield {
                "event": "tool_result",
                "data": json.dumps({
                    "tool": event.tool_name,
                    "result": event.result,
                }),
            }

    yield {"event": "done", "data": "{}"}
```

## JavaScript Client Integration

On the frontend, use the native `EventSource` API or the `fetch` API for POST-based SSE:

```javascript
async function streamChat(message) {
  const response = await fetch("/chat/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message, session_id: "abc123" }),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // stream: true keeps multi-byte characters split across chunks intact
    buffer += decoder.decode(value, { stream: true });

    // An SSE event may arrive split across reads; only parse complete lines
    const lines = buffer.split("\n");
    buffer = lines.pop();

    for (const line of lines) {
      if (line.startsWith("data: ")) {
        const data = JSON.parse(line.slice(6));
        if (data.content) appendToChat(data.content);
      }
    }
  }
}
```

## Error Handling in Streams

Errors during streaming require special handling because the HTTP status code has already been sent as 200. You cannot change it mid-stream. Instead, send an error event and close the stream:

```python
async def safe_stream(message: str, llm: LLMService):
    try:
        async for token in llm.stream_generate(message):
            yield {"event": "token", "data": json.dumps({"content": token})}
    except openai.RateLimitError:
        yield {
            "event": "error",
            "data": json.dumps({
                "code": "rate_limited",
                "message": "Too many requests. Please retry.",
                "retry_after": 30,
            }),
        }
    except openai.APIError:
        yield {
            "event": "error",
            "data": json.dumps({
                "code": "llm_error",
                "message": "Agent encountered an error.",
            }),
        }
```

## FAQ

### Can I use SSE with POST requests?

Standard `EventSource` in the browser only supports GET requests. For POST-based SSE, use the `fetch` API with a `ReadableStream` reader as shown above, or use a library like `@microsoft/fetch-event-source` which provides an `EventSource`-like API for POST requests. Most AI chat interfaces use POST because you need to send the conversation history in the request body.

### How do I handle client disconnections during streaming?

FastAPI and Starlette detect client disconnections automatically. When the client closes the connection, the async generator receives a `GeneratorExit` or `CancelledError` exception. You can catch this to clean up resources. The `sse-starlette` library also supports a `ping` parameter that sends periodic keepalive messages to detect dead connections early.
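A minimal sketch of that cleanup, assuming the same `LLMService.stream_generate` shape used above; `save_partial_response` is a hypothetical helper:

```python
import asyncio
import json

async def stream_with_cleanup(message: str, llm_service, session_id: str):
    """Yield SSE event dicts, cleaning up if the client disconnects."""
    full_response = ""
    try:
        async for token in llm_service.stream_generate(message):
            full_response += token
            yield {"event": "token", "data": json.dumps({"content": token})}
        yield {"event": "done", "data": json.dumps({"session_id": session_id})}
    except asyncio.CancelledError:
        # The client closed the connection mid-stream. Persist what we have
        # (save_partial_response is a hypothetical helper), then re-raise so
        # the cancellation propagates to the event loop.
        await save_partial_response(session_id, full_response)
        raise
```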

### Should I buffer the full response before saving it to the database?

Yes. Accumulate tokens in a string variable as you stream them. After the stream completes successfully, save the full response to your database in the `done` event handler. Do not write individual tokens to the database as they arrive since that would create excessive database writes for no benefit.
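A sketch of that accumulate-then-persist pattern, assuming a hypothetical `db.save_message` coroutine for the single write:

```python
import json

async def stream_and_persist(message: str, llm_service, db, session_id: str):
    """Stream tokens to the client, then write the full response once."""
    full_response = ""
    async for token in llm_service.stream_generate(message):
        full_response += token
        yield {"event": "token", "data": json.dumps({"content": token})}

    # One database write after the stream completes successfully
    await db.save_message(session_id, role="assistant", content=full_response)
    yield {"event": "done", "data": json.dumps({"session_id": session_id})}
```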

