---
title: "WebSocket Servers for AI Agents: Real-Time Bidirectional Agent Communication"
description: "Build real-time AI agent interfaces using WebSocket connections in FastAPI with connection lifecycle management, heartbeat mechanisms, and structured message protocols."
canonical: https://callsphere.ai/blog/websocket-servers-ai-agents-real-time-bidirectional-communication
category: "Learn Agentic AI"
tags: ["WebSocket", "AI Agents", "Real-Time", "FastAPI", "Python"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-07T03:29:28.082Z
---

# WebSocket Servers for AI Agents: Real-Time Bidirectional Agent Communication

> Build real-time AI agent interfaces using WebSocket connections in FastAPI with connection lifecycle management, heartbeat mechanisms, and structured message protocols.

## Why WebSockets for AI Agents

REST endpoints work well for simple request-response agent interactions, but they fall short when you need real-time, bidirectional communication. Think of a coding assistant that streams tokens as it generates code, receives user interruptions mid-generation, and pushes tool execution updates back to the client — all within a single persistent connection.

WebSockets maintain a long-lived TCP connection between client and server, allowing both sides to send messages at any time without the overhead of repeated HTTP handshakes. For AI agents, this means token-by-token streaming, live status updates during tool calls, and the ability for users to cancel or redirect the agent mid-response.

## Basic WebSocket Setup in FastAPI

FastAPI has native WebSocket support. Here is a minimal agent WebSocket endpoint:

```mermaid
flowchart LR
    CLIENT(["Client SDK"])
    GW["API Gateway
auth plus rate limit"]
    APP["FastAPI app
handlers and DI"]
    VAL["Pydantic validation"]
    SVC["Service layer
business logic"]
    DB[(Database)]
    QUEUE[(Background queue)]
    OBS[(Tracing)]
    CLIENT --> GW --> APP --> VAL --> SVC
    SVC --> DB
    SVC --> QUEUE
    SVC --> OBS
    SVC --> CLIENT
    style GW fill:#4f46e5,stroke:#4338ca,color:#fff
    style APP fill:#f59e0b,stroke:#d97706,color:#1f2937
    style DB fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
```

```python
# app/routes/ws_agent.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
import json

router = APIRouter()

@router.websocket("/ws/agent")
async def agent_websocket(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            raw = await websocket.receive_text()
            message = json.loads(raw)

            if message.get("type") == "chat":
                await handle_chat(websocket, message)
            elif message.get("type") == "cancel":
                await handle_cancel(websocket, message)
            elif message.get("type") == "ping":
                await websocket.send_json({"type": "pong"})

    except WebSocketDisconnect:
        print("Client disconnected")
```

## Defining a Message Protocol

Establish a clear protocol so clients and servers communicate consistently:

```python
# app/models/ws_messages.py
from pydantic import BaseModel
from typing import Optional, Literal

class ClientMessage(BaseModel):
    type: Literal["chat", "cancel", "ping"]
    session_id: Optional[str] = None
    content: Optional[str] = None

class ServerMessage(BaseModel):
    type: Literal["token", "complete", "error", "status", "pong"]
    session_id: str
    content: Optional[str] = None
    metadata: Optional[dict] = None
```

Validate every incoming message:

```python
async def agent_websocket(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            raw = await websocket.receive_text()
            try:
                message = ClientMessage.model_validate_json(raw)
            except Exception:
                await websocket.send_json({
                    "type": "error",
                    "content": "Invalid message format",
                    "session_id": "",
                })
                continue

            if message.type == "chat":
                await handle_chat(websocket, message)
    except WebSocketDisconnect:
        pass
```

## Streaming Agent Responses Token by Token

Stream the agent output as it generates, giving users immediate feedback:

```python
from agents import Agent, Runner

agent = Agent(name="assistant", instructions="You are a helpful assistant.")

async def handle_chat(websocket: WebSocket, message: ClientMessage):
    session_id = message.session_id or str(uuid.uuid4())

    await websocket.send_json({
        "type": "status",
        "session_id": session_id,
        "content": "thinking",
    })

    result = Runner.run_streamed(agent, message.content)

    async for event in result.stream_events():
        if event.type == "raw_response_event":
            delta = event.data
            if hasattr(delta, "delta") and hasattr(delta.delta, "content"):
                await websocket.send_json({
                    "type": "token",
                    "session_id": session_id,
                    "content": delta.delta.content,
                })

    await websocket.send_json({
        "type": "complete",
        "session_id": session_id,
        "content": result.final_output,
    })
```

## Connection Manager for Multiple Clients

Track active connections so you can broadcast updates or clean up stale sessions:

```python
# app/services/connection_manager.py
from fastapi import WebSocket
import asyncio

class ConnectionManager:
    def __init__(self):
        self.active: dict[str, WebSocket] = {}
        self.locks: dict[str, asyncio.Lock] = {}

    async def connect(self, session_id: str, websocket: WebSocket):
        await websocket.accept()
        self.active[session_id] = websocket
        self.locks[session_id] = asyncio.Lock()

    def disconnect(self, session_id: str):
        self.active.pop(session_id, None)
        self.locks.pop(session_id, None)

    async def send(self, session_id: str, data: dict):
        ws = self.active.get(session_id)
        if ws:
            async with self.locks[session_id]:
                await ws.send_json(data)

manager = ConnectionManager()
```

## Heartbeat Mechanism

Detect dead connections before they cause resource leaks:

```python
import asyncio

async def heartbeat_loop(websocket: WebSocket, interval: int = 30):
    """Send pings to detect dead connections."""
    try:
        while True:
            await asyncio.sleep(interval)
            await websocket.send_json({"type": "ping"})
    except Exception:
        pass  # Connection closed

@router.websocket("/ws/agent")
async def agent_websocket(websocket: WebSocket):
    await websocket.accept()
    heartbeat_task = asyncio.create_task(
        heartbeat_loop(websocket, interval=30)
    )
    try:
        while True:
            raw = await websocket.receive_text()
            message = ClientMessage.model_validate_json(raw)
            if message.type == "chat":
                await handle_chat(websocket, message)
    except WebSocketDisconnect:
        heartbeat_task.cancel()
```

## FAQ

### How do I handle authentication on WebSocket connections?

WebSocket connections start as an HTTP upgrade request, so you can authenticate during the handshake. Pass a JWT token as a query parameter (`/ws/agent?token=xxx`) or in a header. Validate the token in the WebSocket endpoint before calling `websocket.accept()`. Reject invalid tokens by closing the connection with a 4001 code.

### What happens when the WebSocket connection drops mid-agent-response?

The server receives a `WebSocketDisconnect` exception. Cancel any running agent tasks for that session to avoid wasting LLM tokens. On the client side, implement automatic reconnection with exponential backoff and include the `session_id` so the server can resume the conversation context from where it left off.

### How many concurrent WebSocket connections can a single FastAPI server handle?

A single uvicorn worker can handle thousands of concurrent WebSocket connections since they are I/O-bound. The bottleneck is typically the LLM API rate limit, not the WebSocket connections themselves. Run multiple uvicorn workers with `--workers 4` and use a load balancer with sticky sessions to distribute connections across workers.

---

#WebSocket #AIAgents #RealTime #FastAPI #Python #AgenticAI #LearnAI #AIEngineering

---

Source: https://callsphere.ai/blog/websocket-servers-ai-agents-real-time-bidirectional-communication
