---
title: "Real-Time AI: Streaming, WebSockets, and Server-Sent Events for LLM Applications"
description: "How to build responsive AI applications using streaming, WebSockets, and SSE, with practical patterns for token streaming, agent status updates, and real-time collaboration."
canonical: https://callsphere.ai/blog/real-time-ai-streaming-websockets-server-sent-events
category: "Technology"
tags: ["Streaming", "WebSockets", "SSE", "Real-Time AI", "API Design", "Frontend"]
author: "CallSphere Team"
published: 2026-03-12T00:00:00.000Z
updated: 2026-05-07T03:57:47.378Z
---

# Real-Time AI: Streaming, WebSockets, and Server-Sent Events for LLM Applications

> How to build responsive AI applications using streaming, WebSockets, and SSE, with practical patterns for token streaming, agent status updates, and real-time collaboration.

## Why Real-Time Matters for AI

LLM inference is slow compared to traditional APIs. A complex query to a frontier model can take 5-30 seconds for the full response. Without streaming, users stare at a loading spinner for the entire duration. With streaming, they see tokens appear in real-time, dramatically improving perceived performance and user experience.

But token streaming is just the beginning. Production AI systems need real-time updates for agent status, tool execution progress, error notifications, and multi-user collaboration.

### Token Streaming: The Foundation

#### Server-Sent Events (SSE)

SSE is the most common pattern for LLM token streaming. It uses a standard HTTP connection with a special content type:

```python
# FastAPI SSE endpoint
import json

import anthropic
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()

class ChatRequest(BaseModel):
    messages: list[dict]

@app.post("/api/chat")
async def chat(request: ChatRequest):
    async def generate():
        client = anthropic.AsyncAnthropic()
        async with client.messages.stream(
            model="claude-sonnet-4-20250514",
            messages=request.messages,
            max_tokens=4096
        ) as stream:
            async for event in stream:
                if event.type == "content_block_delta":
                    yield f"data: {json.dumps({'text': event.delta.text})}\n\n"

            # Send final message with usage stats
            final = await stream.get_final_message()
            yield f"data: {json.dumps({'done': True, 'usage': {'input': final.usage.input_tokens, 'output': final.usage.output_tokens}})}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    )
```

Client-side consumption:

```typescript
const response = await fetch('/api/chat', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ messages })
});

const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  // A read can end mid-line, so buffer the text and only parse complete lines
  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split('\n');
  buffer = lines.pop() ?? '';

  for (const line of lines) {
    if (!line.startsWith('data: ')) continue;
    const data = JSON.parse(line.slice(6));
    if (data.text) appendToUI(data.text);
    if (data.done) showUsageStats(data.usage);
  }
}
```

**SSE advantages**: Simple, HTTP-based, works through most proxies and load balancers, automatic reconnection built into the EventSource API.
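
Note that the fetch-based reader above does not get that automatic reconnection; it only applies when you use `EventSource` directly, which is limited to GET requests with no body. A minimal sketch, assuming a hypothetical GET endpoint at `/api/status` that emits JSON status events:

```typescript
// Minimal EventSource sketch; '/api/status' is an illustrative endpoint name.
// The browser reopens the connection automatically if it drops.
const source = new EventSource('/api/status');

source.onmessage = (event: MessageEvent) => {
  const update = JSON.parse(event.data);
  console.log('status update:', update);
};

source.onerror = () => {
  // Fires on connection loss; the browser retries on its own, so this is
  // mostly useful for surfacing a "reconnecting" indicator in the UI.
  console.warn('SSE connection interrupted, retrying');
};
```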

```mermaid
flowchart TD
    HUB(("Why Real-Time Matters
for AI"))
    HUB --> L0["Token Streaming: The
Foundation"]
    style L0 fill:#e0e7ff,stroke:#6366f1,color:#1e293b
    HUB --> L1["Choosing the Right Protocol"]
    style L1 fill:#e0e7ff,stroke:#6366f1,color:#1e293b
    HUB --> L2["Production Patterns"]
    style L2 fill:#e0e7ff,stroke:#6366f1,color:#1e293b
    style HUB fill:#4f46e5,stroke:#4338ca,color:#fff
```

```mermaid
flowchart LR
    IN(["Input prompt"])
    subgraph PRE["Pre processing"]
        TOK["Tokenize"]
        EMB["Embed"]
    end
    subgraph CORE["Model Core"]
        ATTN["Self attention layers"]
        MLP["Feed forward layers"]
    end
    subgraph POST["Post processing"]
        SAMP["Sampling"]
        DETOK["Detokenize"]
    end
    OUT(["Generated text"])
    IN --> TOK --> EMB --> ATTN --> MLP --> SAMP --> DETOK --> OUT
    style IN fill:#f1f5f9,stroke:#64748b,color:#0f172a
    style CORE fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
```

**SSE limitations**: Unidirectional (server to client only), limited to text data, connection limits per domain in browsers (6 in HTTP/1.1).

#### WebSockets

WebSockets provide full-duplex communication, essential for interactive agent sessions:

```python
# FastAPI WebSocket for interactive agent
from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws/agent")
async def agent_session(websocket: WebSocket):
    await websocket.accept()
    # create_agent / available_tools stand in for your agent framework of choice
    agent = create_agent(tools=available_tools)

    try:
        while True:
            user_message = await websocket.receive_json()

            # Relay each agent event to the client as a typed JSON message
            async for event in agent.run_stream(user_message["content"]):
                match event.type:
                    case "thinking":
                        await websocket.send_json({
                            "type": "thinking",
                            "content": event.text
                        })
                    case "tool_call":
                        await websocket.send_json({
                            "type": "tool_call",
                            "tool": event.name,
                            "args": event.args,
                            "status": "executing"
                        })
                    case "tool_result":
                        await websocket.send_json({
                            "type": "tool_result",
                            "tool": event.name,
                            "result": event.result
                        })
                    case "text_delta":
                        await websocket.send_json({
                            "type": "text",
                            "content": event.text
                        })
    except WebSocketDisconnect:
        # Client went away; exit cleanly instead of leaking the session
        pass
```

**WebSocket advantages**: Bidirectional, low latency, supports binary data, client can send messages while receiving.

**WebSocket limitations**: More complex infrastructure (sticky sessions, WebSocket-aware load balancers), no automatic reconnection, connection management overhead.
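
Because reconnection is not built in, it has to live in application code. A minimal browser-side sketch with exponential backoff, reusing the `/ws/agent` path from the endpoint above (the backoff values are arbitrary starting points):

```typescript
// Browser-side reconnect loop with exponential backoff for the /ws/agent endpoint.
function connectAgentSocket(onEvent: (event: unknown) => void): void {
  let retryMs = 500;

  const open = () => {
    const ws = new WebSocket(`wss://${location.host}/ws/agent`);

    ws.onopen = () => { retryMs = 500; };              // reset backoff on success
    ws.onmessage = (msg) => onEvent(JSON.parse(msg.data));
    ws.onclose = () => {
      // Schedule a reconnect and cap the delay so recovery stays responsive
      setTimeout(open, retryMs);
      retryMs = Math.min(retryMs * 2, 10_000);
    };
  };

  open();
}
```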

### Choosing the Right Protocol

| Use Case | Recommended Protocol |
| --- | --- |
| Simple chat with streaming | SSE |
| Interactive agent with tool use | WebSocket |
| Real-time collaboration | WebSocket |
| Notification/status updates | SSE |
| Voice/audio streaming | WebSocket |
| Webhook-style events | SSE |

### Production Patterns

#### Structured Streaming Events

Do not just stream raw text. Define an event protocol:

```typescript
type StreamEvent =
  | { type: 'text_delta'; content: string }
  | { type: 'tool_start'; tool: string; args: Record<string, unknown> }
  | { type: 'tool_end'; tool: string; result: unknown; duration_ms: number }
  | { type: 'thinking'; content: string }
  | { type: 'error'; message: string; recoverable: boolean }
  | { type: 'done'; usage: { input_tokens: number; output_tokens: number } };
```

This enables rich UI updates: show a spinner when a tool is executing, display thinking text in a collapsible panel, and show token usage when complete.
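
As a sketch of what that dispatch can look like on the client, where the `StreamUI` callback interface stands in for whatever rendering layer you use:

```typescript
// Hypothetical dispatch over the StreamEvent protocol defined above.
interface StreamUI {
  appendText(text: string): void;
  setToolStatus(tool: string, status: string): void;
  showThinking(text: string): void;
  showError(message: string, recoverable: boolean): void;
  showUsage(usage: { input_tokens: number; output_tokens: number }): void;
}

function handleStreamEvent(event: StreamEvent, ui: StreamUI): void {
  switch (event.type) {
    case 'text_delta':
      ui.appendText(event.content);
      break;
    case 'tool_start':
      ui.setToolStatus(event.tool, 'running');               // show a spinner
      break;
    case 'tool_end':
      ui.setToolStatus(event.tool, `done in ${event.duration_ms} ms`);
      break;
    case 'thinking':
      ui.showThinking(event.content);                        // collapsible panel
      break;
    case 'error':
      ui.showError(event.message, event.recoverable);
      break;
    case 'done':
      ui.showUsage(event.usage);
      break;
  }
}
```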

#### Backpressure Handling

If the client cannot consume tokens as fast as the model generates them (common on slow networks), implement backpressure:

- **SSE**: The TCP send buffer naturally provides backpressure, but set reasonable buffer limits
- **WebSocket**: Monitor the send buffer size and pause generation if it exceeds a threshold (see the sketch below)
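
A sketch of the WebSocket case, assuming a Node.js server built on the `ws` package (the threshold and pause interval are arbitrary starting points):

```typescript
// Server-side backpressure sketch: stop pulling tokens from the model while
// the socket's send buffer is backed up (Node.js "ws" package assumed).
import WebSocket from 'ws';

const MAX_BUFFERED_BYTES = 1_000_000; // ~1 MB of queued, unsent data

async function forwardTokens(ws: WebSocket, tokens: AsyncIterable<string>): Promise<void> {
  for await (const token of tokens) {
    // bufferedAmount counts bytes queued on the socket but not yet sent;
    // waiting here also pauses the upstream generator, since we stop pulling from it
    while (ws.bufferedAmount > MAX_BUFFERED_BYTES) {
      await new Promise((resolve) => setTimeout(resolve, 50));
    }
    ws.send(JSON.stringify({ type: 'text_delta', content: token }));
  }
}
```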

#### Reconnection and State Recovery

Connections drop. Your protocol should handle it:

```python
# Server-side: assign event IDs for recovery
event_id = 0
async for token in stream:
    event_id += 1
    yield f"id: {event_id}\ndata: {json.dumps({'text': token})}\n\n"

```

On the client, the browser's native `EventSource` handles this automatically: when the connection drops, it reconnects and sends a `Last-Event-ID` header containing the last `id:` value it received, which the server can use to resume the stream rather than replay it from the beginning. If you stream over `fetch` (as in the earlier chat example), you have to track the IDs and send the header yourself when you reconnect.
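
A sketch of that fetch-based recovery, assuming the server resumes or replays events after the supplied ID (`streamWithRecovery` and its callback are illustrative names):

```typescript
// Illustrative fetch-based recovery loop: track `id:` lines and reconnect
// with a Last-Event-ID header after a mid-stream network error.
async function streamWithRecovery(
  url: string,
  body: unknown,
  onText: (text: string) => void
): Promise<void> {
  let lastEventId = '';

  while (true) {
    const response = await fetch(url, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        ...(lastEventId ? { 'Last-Event-ID': lastEventId } : {})
      },
      body: JSON.stringify(body)
    });

    const reader = response.body!.getReader();
    const decoder = new TextDecoder();
    let buffer = '';

    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) return;                                // stream completed normally

        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop() ?? '';

        for (const line of lines) {
          if (line.startsWith('id: ')) lastEventId = line.slice(4).trim();
          if (line.startsWith('data: ')) onText(JSON.parse(line.slice(6)).text ?? '');
        }
      }
    } catch {
      // Connection dropped mid-stream: loop around and reconnect from lastEventId
    }
  }
}
```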

#### Infrastructure Considerations

- **Reverse proxies**: Nginx requires `proxy_buffering off` and `proxy_read_timeout` settings for SSE. Use `proxy_http_version 1.1` and `Upgrade` headers for WebSocket
- **Load balancers**: WebSocket requires sticky sessions or connection-aware routing. SSE works with standard HTTP load balancing
- **CDNs**: Most CDNs do not support SSE/WebSocket. Route real-time traffic directly to origin
- **Kubernetes**: Use `sessionAffinity: ClientIP` for WebSocket services; increase `proxy-read-timeout` annotations for SSE

Streaming is not just a UX nicety -- it is a fundamental requirement for AI applications. The difference between a 10-second loading spinner and seeing tokens appear immediately is the difference between an application users tolerate and one they enjoy.

**Sources:** [MDN Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) | [FastAPI WebSocket Docs](https://fastapi.tiangolo.com/advanced/websockets/) | [Vercel AI SDK Streaming](https://sdk.vercel.ai/docs/ai-sdk-core/streaming)

---

Source: https://callsphere.ai/blog/real-time-ai-streaming-websockets-server-sent-events
