
FastAPI WebSocket Production Patterns for AI Agents in 2026

How to run FastAPI WebSockets in production for AI voice agents: connection managers, the broadcast bug everyone hits, JWT auth on upgrade, and Uvicorn worker tuning.

One synchronous database call blocks every WebSocket connection on the worker. That is the bug that takes FastAPI WebSocket deployments down at 2 a.m. on day 17.

Why does FastAPI need its own WebSocket playbook?

flowchart TD
  Client[Client] --> Edge[Cloudflare Worker]
  Edge -->|WS upgrade| DO[Durable Object]
  DO --> AI[(OpenAI Realtime WS)]
  AI --> DO
  DO --> Client
  DO -.hibernation.-> Storage[(Persisted state)]
CallSphere reference architecture

Because FastAPI's superpower — async request handling — is also its sharpest knife. A single worker can serve thousands of concurrent WebSocket connections on cooperative multitasking. The moment any handler blocks the event loop, every other client on that worker freezes. So the production pattern is not "make WebSockets work" but "audit every line in a WebSocket handler for accidental blocking."

The good news: when done right, FastAPI WebSockets are lighter and more debuggable than Node.js equivalents, with full access to Python's audio/ML stack and first-class async DB drivers like asyncpg and motor.
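The blocking hazard above is easy to demonstrate. A minimal sketch (all names hypothetical, with `time.sleep` standing in for a sync database driver): wrapping the blocking call in `asyncio.to_thread` keeps the event loop responsive, whereas calling it directly in the handler would freeze every other connection on the worker for the duration.

```python
import asyncio
import time

async def handle_message() -> str:
    # A sync driver call (simulated with time.sleep) would stall the
    # whole event loop if run inline; to_thread moves it off-loop.
    await asyncio.to_thread(time.sleep, 0.2)
    return "done"

async def other_client(ticks: list[int]) -> None:
    # Stands in for every other client on the worker: it should keep
    # ticking while the "database call" runs.
    for _ in range(5):
        ticks.append(1)
        await asyncio.sleep(0.02)

async def main() -> tuple[str, int]:
    ticks: list[int] = []
    result, _ = await asyncio.gather(handle_message(), other_client(ticks))
    return result, len(ticks)

result, tick_count = asyncio.run(main())
```

Replace `asyncio.to_thread(...)` with a bare `time.sleep(0.2)` inside `handle_message` and the other client stops ticking until the sleep returns: that is the 2 a.m. outage in miniature.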

How does the production pattern actually look?

A correct FastAPI WebSocket service has six pieces:

Hear it before you finish reading

Talk to a live CallSphere AI voice agent in your browser — 60 seconds, no signup.

Try Live Demo →
  1. A connection manager that tracks user_id → WebSocket in memory plus Redis.
  2. A JWT auth handshake on the upgrade path — query parameter, since browsers cannot set custom headers on a WebSocket upgrade.
  3. A broadcast helper that catches send failures and prunes dead sockets in the same pass.
  4. A heartbeat task sending pings every 30 s and closing zombies after two missed pongs.
  5. Uvicorn workers sized to roughly min(2 × CPU, 4) because each worker holds independent connection state.
  6. Redis pub/sub as the cross-worker bus so broadcasts span all workers and pods.

Skip any one of these and you ship a service that works in dev and corrupts state in prod.
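Item 4 on the list can be sketched as an application-level heartbeat. This is a sketch under assumptions: `heartbeat_sweep`, `heartbeat_loop`, and the `missed` bookkeeping dict are hypothetical names; the sockets only need `send_json`/`close` (Starlette's WebSocket has both); and the receive loop is assumed to reset `missed[sid]` to 0 whenever a `{"type": "pong"}` arrives.

```python
import asyncio

PING_INTERVAL = 30.0   # seconds between pings (item 4 above)
MAX_MISSED = 2         # close the socket after two missed pongs

async def heartbeat_sweep(active: dict, missed: dict) -> None:
    # One pass: ping every live socket, close any that has
    # MAX_MISSED unanswered pings outstanding.
    for sid, ws in list(active.items()):
        if missed.get(sid, 0) >= MAX_MISSED:
            await ws.close(code=1001)          # zombie: two missed pongs
            active.pop(sid, None)
            missed.pop(sid, None)
            continue
        try:
            await ws.send_json({"type": "ping"})
            missed[sid] = missed.get(sid, 0) + 1
        except Exception:
            active.pop(sid, None)              # send failed: prune now
            missed.pop(sid, None)

async def heartbeat_loop(active: dict, missed: dict) -> None:
    # Background task (asyncio.create_task at startup): one sweep every
    # PING_INTERVAL seconds. The receive loop resets missed[sid] = 0
    # for each {"type": "pong"} it sees.
    while True:
        await heartbeat_sweep(active, missed)
        await asyncio.sleep(PING_INTERVAL)
```

Splitting the sweep out of the loop keeps the timing policy in one place and makes the zombie-close logic testable without waiting 30 s.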

CallSphere's implementation

The CallSphere Healthcare voice agent runs on FastAPI on port 8084, accepting OpenAI Realtime API connections over WebSocket. The handler is roughly 280 lines. We use:

  • Auth on upgrade via short-lived (60 s) JWTs encoded in a query parameter, validated before websocket.accept().
  • Per-session state in a dict keyed by session_id plus Redis for cross-worker fan-out.
  • asyncpg for the audit log so the event loop never blocks on Postgres.
  • OpenTelemetry spans wrapping every WebSocket message, exported to our self-hosted Tempo instance.

That FastAPI service coordinates with 115+ database tables and 90+ tools across the platform, with HIPAA controls applied to every span.

Code: the connection manager that does not deadlock

from fastapi import WebSocket
import asyncio

class ConnectionManager:
    """Tracks live sockets for this worker; cross-worker fan-out goes through Redis."""

    def __init__(self) -> None:
        self.active: dict[str, WebSocket] = {}

    async def connect(self, sid: str, ws: WebSocket) -> None:
        await ws.accept()
        self.active[sid] = ws

    def disconnect(self, sid: str) -> None:
        self.active.pop(sid, None)

    async def broadcast(self, payload: dict) -> None:
        # Iterate over a snapshot so a disconnect during the loop
        # cannot mutate the dict mid-iteration.
        dead: list[str] = []
        for sid, ws in list(self.active.items()):
            try:
                await ws.send_json(payload)
            except Exception:
                # A failed send means the socket is gone; prune it
                # in the same pass instead of leaking entries.
                dead.append(sid)
        for sid in dead:
            self.active.pop(sid, None)

Build steps

  1. Use async def for every WebSocket handler. Sync handlers block the loop and there is no warning.
  2. Validate JWT in a Depends before websocket.accept(). Reject on the upgrade, not after.
  3. Run Uvicorn with --workers 2 --loop uvloop --http httptools for ~3× throughput.
  4. Add a periodic asyncio.create_task for heartbeats; pong timeouts go to the same broadcast cleanup path as send failures.
  5. Wire asyncio.Queue between your audio receive task and your transcription task to apply backpressure.
  6. Test with starlette.testclient.TestClient — it supports WebSockets without a running server.
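Step 5's backpressure wiring can be sketched with a bounded `asyncio.Queue`. The function names, the `maxsize=8` bound, and the uppercase "transcription" are all hypothetical stand-ins; the real point is that `put()` suspends the receive task whenever the transcriber falls behind.

```python
import asyncio

async def receive_audio(queue: asyncio.Queue, chunks) -> None:
    # Producer: pushes inbound audio frames. put() suspends when the
    # queue is full -- that suspension IS the backpressure.
    for chunk in chunks:
        await queue.put(chunk)
    await queue.put(None)               # sentinel: stream finished

async def transcribe(queue: asyncio.Queue) -> list:
    # Consumer: stand-in for the (slower) transcription task.
    out = []
    while (chunk := await queue.get()) is not None:
        out.append(chunk.upper())       # pretend transcription
    return out

async def pipeline(chunks) -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=8)   # bounded = backpressure
    _, result = await asyncio.gather(receive_audio(queue, chunks), transcribe(queue))
    return result

transcripts = asyncio.run(pipeline(["chunk-a", "chunk-b"]))
```

An unbounded queue would silently buffer audio until the worker runs out of memory; the bound turns that failure into a pause on the socket instead.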

FAQ

Why does broadcast hang for 30 seconds? A client went silent without disconnecting and your send_json is waiting on the TCP buffer. Use a per-send timeout (asyncio.wait_for) and treat the timeout as a dead socket.
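A per-send timeout can be sketched as a small helper (the name `send_with_timeout` and the 5-second default are hypothetical); it returns `False` for stalled or dead sockets so the caller can prune them exactly like a failed send.

```python
import asyncio

async def send_with_timeout(ws, payload: dict, timeout: float = 5.0) -> bool:
    # A socket whose peer vanished without a FIN will block send_json
    # on the TCP buffer; wait_for turns that hang into a timeout.
    try:
        await asyncio.wait_for(ws.send_json(payload), timeout=timeout)
        return True
    except Exception:          # includes asyncio.TimeoutError
        return False
```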


Can I share a single asyncpg pool? Yes — that is the correct pattern. Just never pass connections across tasks; acquire and release per message.

How do I scale across pods? Add Redis pub/sub. Each pod publishes outbound events; every pod subscribes and local-fans-out to its connections.

Should I use websockets library or Starlette's? Use Starlette's (FastAPI's built-in). It integrates with Depends, middleware, and OpenAPI tooling.

Do I need sticky sessions? Not if you use Redis pub/sub. You only need them if you keep per-session state purely in worker memory.

CallSphere ships 37 agents on FastAPI + Node services. Try the 14-day free trial, or join the affiliate program to refer accounts at $149/$499/$1499.


