WebSocket Servers for AI Agents: Real-Time Bidirectional Agent Communication

Build real-time AI agent interfaces using WebSocket connections in FastAPI with connection lifecycle management, heartbeat mechanisms, and structured message protocols.

Why WebSockets for AI Agents

REST endpoints work well for simple request-response agent interactions, but they fall short when you need real-time, bidirectional communication. Think of a coding assistant that streams tokens as it generates code, receives user interruptions mid-generation, and pushes tool execution updates back to the client — all within a single persistent connection.

WebSockets maintain a long-lived TCP connection between client and server, allowing both sides to send messages at any time without the overhead of repeated HTTP handshakes. For AI agents, this means token-by-token streaming, live status updates during tool calls, and the ability for users to cancel or redirect the agent mid-response.

Basic WebSocket Setup in FastAPI

FastAPI has native WebSocket support. Here is a minimal agent WebSocket endpoint:

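# app/routes/ws_agent.py
import json
import logging

from fastapi import APIRouter, WebSocket, WebSocketDisconnect

logger = logging.getLogger(__name__)
router = APIRouter()

@router.websocket("/ws/agent")
async def agent_websocket(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            raw = await websocket.receive_text()
            try:
                message = json.loads(raw)
            except json.JSONDecodeError:
                message = None
            if not isinstance(message, dict):
                # Malformed frame: report it and keep the connection open
                await websocket.send_json({"type": "error", "content": "Invalid message format"})
                continue

            # handle_chat is defined in the streaming section;
            # handle_cancel is not shown in this article
            if message.get("type") == "chat":
                await handle_chat(websocket, message)
            elif message.get("type") == "cancel":
                await handle_cancel(websocket, message)
            elif message.get("type") == "ping":
                await websocket.send_json({"type": "pong"})

    except WebSocketDisconnect:
        logger.info("Client disconnected")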

Defining a Message Protocol

Establish a clear protocol so clients and servers communicate consistently:

# app/models/ws_messages.py
from pydantic import BaseModel
from typing import Optional, Literal

class ClientMessage(BaseModel):
    type: Literal["chat", "cancel", "ping"]
    session_id: Optional[str] = None
    content: Optional[str] = None

class ServerMessage(BaseModel):
    type: Literal["token", "complete", "error", "status", "pong"]
    session_id: str
    content: Optional[str] = None
    metadata: Optional[dict] = None

Validate every incoming message:

from pydantic import ValidationError

from app.models.ws_messages import ClientMessage

@router.websocket("/ws/agent")
async def agent_websocket(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            raw = await websocket.receive_text()
            try:
                message = ClientMessage.model_validate_json(raw)
            except ValidationError:
                # Raised for malformed JSON as well as schema violations
                await websocket.send_json({
                    "type": "error",
                    "content": "Invalid message format",
                    "session_id": "",
                })
                continue

            if message.type == "chat":
                await handle_chat(websocket, message)
    except WebSocketDisconnect:
        pass
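
As the number of message types grows, the if/elif chain can be swapped for a lookup table from message type to handler. The following is a minimal sketch using only the standard library; the handler names (on_chat, on_ping) and the dispatch function are hypothetical, and plain dicts stand in for the Pydantic models so the example runs without FastAPI installed.

```python
import asyncio
import json
from typing import Awaitable, Callable

# Hypothetical handlers: each takes the parsed message and returns one reply frame.
async def on_chat(msg: dict) -> dict:
    return {"type": "status", "session_id": msg.get("session_id") or "", "content": "thinking"}

async def on_ping(msg: dict) -> dict:
    return {"type": "pong", "session_id": msg.get("session_id") or ""}

# Message type -> handler. Adding a type means adding one entry here.
HANDLERS: dict[str, Callable[[dict], Awaitable[dict]]] = {
    "chat": on_chat,
    "ping": on_ping,
}

def error_frame(reason: str) -> dict:
    return {"type": "error", "session_id": "", "content": reason}

async def dispatch(raw: str) -> dict:
    """Parse one text frame and route it; bad input yields an error frame."""
    try:
        msg = json.loads(raw)
    except json.JSONDecodeError:
        return error_frame("Invalid JSON")
    if not isinstance(msg, dict):
        return error_frame("Invalid message format")
    msg_type = msg.get("type")
    handler = HANDLERS.get(msg_type) if isinstance(msg_type, str) else None
    if handler is None:
        return error_frame("Unknown message type")
    return await handler(msg)
```

In the endpoint, the loop body would then reduce to sending whatever dispatch returns, and unknown types get an explicit error instead of being silently ignored.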

Streaming Agent Responses Token by Token

Stream the agent output as it generates, giving users immediate feedback:

import uuid

from agents import Agent, Runner
from fastapi import WebSocket
from openai.types.responses import ResponseTextDeltaEvent

from app.models.ws_messages import ClientMessage

agent = Agent(name="assistant", instructions="You are a helpful assistant.")

async def handle_chat(websocket: WebSocket, message: ClientMessage):
    session_id = message.session_id or str(uuid.uuid4())

    await websocket.send_json({
        "type": "status",
        "session_id": session_id,
        "content": "thinking",
    })

    result = Runner.run_streamed(agent, message.content or "")

    async for event in result.stream_events():
        # Raw response events wrap the model's own streaming events;
        # a text delta event carries the new text as a plain string in .delta
        if event.type == "raw_response_event" and isinstance(
            event.data, ResponseTextDeltaEvent
        ):
            await websocket.send_json({
                "type": "token",
                "session_id": session_id,
                "content": event.data.delta,
            })

    await websocket.send_json({
        "type": "complete",
        "session_id": session_id,
        "content": result.final_output,
    })
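
Because handle_chat is awaited inside the receive loop, a "cancel" frame cannot be read until generation has finished. One way around this is to run each generation as an asyncio task and cancel that task when the cancel frame arrives. The sketch below shows the pattern with the standard library only; fake_generate is a stand-in for the agent stream, a list stands in for the socket, and all names are hypothetical.

```python
import asyncio

# session_id -> in-flight generation task
running: dict[str, asyncio.Task] = {}

async def fake_generate(session_id: str, out: list) -> None:
    """Stand-in for the agent stream: emits one token per tick."""
    try:
        for i in range(100):
            out.append({"type": "token", "session_id": session_id, "content": f"t{i}"})
            await asyncio.sleep(0.01)
        out.append({"type": "complete", "session_id": session_id})
    except asyncio.CancelledError:
        # Tell the client the run was stopped, then let the cancellation propagate
        out.append({"type": "status", "session_id": session_id, "content": "cancelled"})
        raise

def start_chat(session_id: str, out: list) -> asyncio.Task:
    """Start generation in the background so the receive loop stays free."""
    task = asyncio.create_task(fake_generate(session_id, out))
    running[session_id] = task

    def _forget(done: asyncio.Task) -> None:
        # Only remove the entry if it still points at this task
        if running.get(session_id) is done:
            del running[session_id]

    task.add_done_callback(_forget)
    return task

async def cancel_chat(session_id: str) -> bool:
    """Cancel the session's generation; return False if nothing was running."""
    task = running.get(session_id)
    if task is None or task.done():
        return False
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return True

async def demo() -> list:
    out: list = []
    start_chat("s1", out)
    await asyncio.sleep(0.035)   # let a few tokens through
    await cancel_chat("s1")      # what a "cancel" frame would trigger
    return out
```

The same cancel_chat call is a natural fit for the WebSocketDisconnect handler, so a dropped connection stops the generation it started.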

Connection Manager for Multiple Clients

Track active connections so you can broadcast updates or clean up stale sessions:

# app/services/connection_manager.py
from fastapi import WebSocket
import asyncio

class ConnectionManager:
    def __init__(self):
        self.active: dict[str, WebSocket] = {}
        self.locks: dict[str, asyncio.Lock] = {}

    async def connect(self, session_id: str, websocket: WebSocket):
        await websocket.accept()
        self.active[session_id] = websocket
        self.locks[session_id] = asyncio.Lock()

    def disconnect(self, session_id: str):
        self.active.pop(session_id, None)
        self.locks.pop(session_id, None)

    async def send(self, session_id: str, data: dict):
        ws = self.active.get(session_id)
        if ws:
            async with self.locks[session_id]:
                await ws.send_json(data)

manager = ConnectionManager()
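
The manager above sends to one session at a time. A broadcast can be layered on top by sending to every tracked socket concurrently and dropping the ones that fail. This is a sketch under stated assumptions: FakeSocket is a hypothetical stand-in that models only send_json, and broadcast works on a plain dict shaped like ConnectionManager.active. In the real class it would more likely call self.send so the per-session lock is respected.

```python
import asyncio

class FakeSocket:
    """Stand-in for fastapi.WebSocket; only send_json is modeled."""

    def __init__(self, fail: bool = False):
        self.sent: list[dict] = []
        self.fail = fail

    async def send_json(self, data: dict) -> None:
        if self.fail:
            raise RuntimeError("socket closed")
        self.sent.append(data)

async def broadcast(active: dict, data: dict) -> list[str]:
    """Send data to every session; return the session ids whose send failed."""
    session_ids = list(active)  # snapshot: the dict may change while we await
    results = await asyncio.gather(
        *(active[sid].send_json(data) for sid in session_ids),
        return_exceptions=True,  # one dead socket must not abort the rest
    )
    dead = [sid for sid, res in zip(session_ids, results) if isinstance(res, Exception)]
    for sid in dead:
        active.pop(sid, None)  # stale session cleanup
    return dead
```

Collecting failures instead of raising keeps one broken client from blocking updates to everyone else, and the returned list gives the caller a place to log or release per-session resources.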

Heartbeat Mechanism

Detect dead connections before they cause resource leaks:

import asyncio

async def heartbeat_loop(websocket: WebSocket, interval: int = 30):
    """Send periodic pings; a failed send means the connection is gone."""
    try:
        while True:
            await asyncio.sleep(interval)
            await websocket.send_json({"type": "ping"})
    except Exception:
        pass  # Connection closed

@router.websocket("/ws/agent")
async def agent_websocket(websocket: WebSocket):
    await websocket.accept()
    heartbeat_task = asyncio.create_task(
        heartbeat_loop(websocket, interval=30)
    )
    try:
        while True:
            raw = await websocket.receive_text()
            message = ClientMessage.model_validate_json(raw)
            if message.type == "chat":
                await handle_chat(websocket, message)
    except WebSocketDisconnect:
        pass
    finally:
        # Runs on disconnect and on any other error, so the task never leaks
        heartbeat_task.cancel()
FAQ

How do I handle authentication on WebSocket connections?

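WebSocket connections start as an HTTP upgrade request, so you can authenticate during the handshake. Pass a JWT as a query parameter (/ws/agent?token=xxx) or in a header; the browser WebSocket API cannot set custom headers, so browser clients usually rely on the query parameter or a cookie. Validate the token in the WebSocket endpoint before calling websocket.accept(). To reject an invalid token, close the connection with an application-defined code such as 4001. Under ASGI, a close sent before accept() reaches the client as an HTTP 403 response to the handshake, so if the client needs to read the 4001 code, accept first and then close.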

What happens when the WebSocket connection drops mid-agent-response?

The server receives a WebSocketDisconnect exception. Cancel any running agent tasks for that session to avoid wasting LLM tokens. On the client side, implement automatic reconnection with exponential backoff and include the session_id so the server can resume the conversation context from where it left off.
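
The reconnection advice can be sketched as follows. This is an illustrative client-side helper, not part of any library: backoff_delays and reconnect are hypothetical names, connect is whatever coroutine opens the socket, and the base, cap, and attempt count are arbitrary defaults. The delays grow exponentially up to a cap, with random jitter so many clients do not reconnect in lockstep.

```python
import asyncio
import random

def backoff_delays(base: float = 0.5, cap: float = 30.0, attempts: int = 6,
                   rng=random.random):
    """Yield one delay per attempt: exponential growth, capped, with full jitter."""
    for attempt in range(attempts):
        ceiling = min(cap, base * (2 ** attempt))
        yield ceiling * rng()  # rng() is in [0, 1), so the delay stays below the ceiling

async def reconnect(connect, session_id: str, delays):
    """Call connect(session_id) until it succeeds or the delays run out."""
    for delay in delays:
        try:
            # Passing the same session_id lets the server restore context
            return await connect(session_id)
        except OSError:  # includes ConnectionRefusedError and ConnectionResetError
            await asyncio.sleep(delay)
    raise ConnectionError("gave up reconnecting")
```

A caller would use it as `await reconnect(open_socket, session_id, backoff_delays())`, where open_socket is the application's own connect function.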

How many concurrent WebSocket connections can a single FastAPI server handle?

A single uvicorn worker can handle thousands of concurrent WebSocket connections since they are I/O-bound. The bottleneck is typically the LLM API rate limit, not the WebSocket connections themselves. To scale further, run multiple uvicorn workers with --workers 4 and put a load balancer in front. In-memory state such as the ConnectionManager is held per process, so use sticky sessions to keep each client on the process that holds its session.

