
Deploying Chat Agents with WebSocket Connections for Real-Time Interaction

Build a real-time chat agent using WebSocket connections with FastAPI, OpenAI's streaming responses, persistent conversation state, and response chaining via previous_response_id.

Why WebSockets for Chat Agents?

HTTP request-response is fine for simple chatbot interactions, but production chat agents need real-time, bidirectional communication. Users expect to see responses streaming in character by character, not waiting several seconds for a complete response. WebSockets provide a persistent connection where both client and server can send messages at any time, enabling streaming responses, typing indicators, and immediate feedback.

In this post, we build a complete WebSocket-based chat agent using FastAPI on the backend and OpenAI's streaming capabilities.

Architecture Overview

The architecture has three layers:

  1. Client — A browser (or mobile app) opens a WebSocket connection
  2. Server — FastAPI manages WebSocket lifecycle, authentication, and message routing
  3. Agent — OpenAI's Agents SDK processes messages and streams responses

Each WebSocket connection represents one conversation session. The server maintains conversation history and uses previous_response_id to chain responses for continuity.
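Before diving into the server, it helps to pin down the wire format. Every event is a single JSON text frame with a `type` field; the shapes below mirror the event names used by the server code in this post (the example payload values are illustrative):

```python
import json

# Events the server emits (shapes match the endpoint code in this post)
server_events = [
    {"type": "connected", "session_id": "abc123"},
    {"type": "typing", "is_typing": True},
    {"type": "text_delta", "content": "Hel"},
    {"type": "response_end", "full_content": "Hello! How can I help?"},
    {"type": "error", "message": "I encountered an issue. Please try again."},
]

# Events the client sends
client_events = [
    {"type": "message", "content": "Hi, I need help with billing."},
    {"type": "ping"},
]

# Each event travels as one JSON text frame
frames = [json.dumps(e) for e in server_events + client_events]
```

Keeping every frame self-describing via `type` lets both sides dispatch with a simple switch, and makes it easy to add new event kinds later without breaking old clients.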

FastAPI WebSocket Server

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query
from typing import Dict
import json
import asyncio
from datetime import datetime

app = FastAPI()

class ConnectionManager:
    """Manages active WebSocket connections and conversation state."""

    def __init__(self):
        self.active: Dict[str, WebSocket] = {}
        self.conversations: Dict[str, list] = {}
        self.last_response_ids: Dict[str, str] = {}

    async def connect(self, session_id: str, websocket: WebSocket):
        await websocket.accept()
        self.active[session_id] = websocket
        if session_id not in self.conversations:
            self.conversations[session_id] = []

    def disconnect(self, session_id: str):
        self.active.pop(session_id, None)

    async def send_event(self, session_id: str, event: dict):
        ws = self.active.get(session_id)
        if ws:
            await ws.send_json(event)

    def get_history(self, session_id: str) -> list:
        return self.conversations.get(session_id, [])

    def add_message(self, session_id: str, role: str, content: str):
        self.conversations.setdefault(session_id, []).append({
            "role": role,
            "content": content,
            "timestamp": datetime.utcnow().isoformat(),
        })

    def set_last_response_id(self, session_id: str, response_id: str):
        self.last_response_ids[session_id] = response_id

    def get_last_response_id(self, session_id: str) -> str | None:
        return self.last_response_ids.get(session_id)

manager = ConnectionManager()

The Chat Agent

from agents import Agent, Runner

support_agent = Agent(
    name="Real-Time Support Agent",
    instructions="""You are a real-time customer support agent for TechCorp.
    You help users with account issues, billing questions, and product
    features. Be concise since this is a real-time chat. Keep responses
    under 3 paragraphs unless the user asks for detailed explanations.
    Use markdown formatting for code snippets and lists.""",
)

WebSocket Endpoint with Streaming

The core endpoint handles the WebSocket lifecycle, processes user messages through the agent, and streams responses back token by token:

@app.websocket("/ws/chat/{session_id}")
async def websocket_chat(
    websocket: WebSocket,
    session_id: str,
    token: str = Query(default=None),
):
    # Authenticate the connection
    if not await verify_token(token):
        await websocket.close(code=4001, reason="Unauthorized")
        return

    await manager.connect(session_id, websocket)

    # Send connection confirmation
    await manager.send_event(session_id, {
        "type": "connected",
        "session_id": session_id,
        "timestamp": datetime.utcnow().isoformat(),
    })

    try:
        while True:
            # Receive message from client
            data = await websocket.receive_json()
            message_type = data.get("type", "message")

            if message_type == "message":
                user_text = data.get("content", "").strip()
                if not user_text:
                    continue

                # Store user message
                manager.add_message(session_id, "user", user_text)

                # Send typing indicator
                await manager.send_event(session_id, {
                    "type": "typing",
                    "is_typing": True,
                })

                # Stream agent response
                await stream_agent_response(session_id, user_text)

            elif message_type == "ping":
                await manager.send_event(session_id, {"type": "pong"})

    except WebSocketDisconnect:
        manager.disconnect(session_id)

Streaming Agent Responses

The key function that bridges the agent SDK with WebSocket streaming:

async def stream_agent_response(session_id: str, user_input: str):
    """Run the agent and stream the response over WebSocket."""

    # Chain to the previous response so the model keeps conversation context
    previous_response_id = manager.get_last_response_id(session_id)

    full_response = ""

    try:
        result = Runner.run_streamed(
            support_agent,
            input=user_input,
            previous_response_id=previous_response_id,
        )

        # Send start event
        await manager.send_event(session_id, {
            "type": "response_start",
        })

        async for event in result.stream_events():
            # Handle different streaming event types
            if event.type == "raw_response_event":
                raw = event.data
                # Text delta — stream to client
                if hasattr(raw, "type") and raw.type == "response.output_text.delta":
                    chunk = raw.delta
                    full_response += chunk
                    await manager.send_event(session_id, {
                        "type": "text_delta",
                        "content": chunk,
                    })

        # Stream finished; capture the response id for chaining the next turn
        response_id = getattr(result, "last_response_id", None)

        if response_id:
            manager.set_last_response_id(session_id, response_id)

        # Store assistant message
        manager.add_message(session_id, "assistant", full_response)

        # Send completion event
        await manager.send_event(session_id, {
            "type": "response_end",
            "full_content": full_response,
        })

    except Exception:
        # Log the exception server-side; send only a generic message to the client
        await manager.send_event(session_id, {
            "type": "error",
            "message": "I encountered an issue. Please try again.",
        })

    finally:
        # Clear typing indicator
        await manager.send_event(session_id, {
            "type": "typing",
            "is_typing": False,
        })

Response Chaining with previous_response_id

When using OpenAI's Responses API directly, previous_response_id links responses together so the model retains full conversation context without you resending all messages:

from openai import AsyncOpenAI

client = AsyncOpenAI()

class ConversationChain:
    """Manages chained responses for a single conversation."""

    def __init__(self):
        self.last_response_id: str | None = None

    async def send_message(self, user_input: str) -> str:
        params = {
            "model": "gpt-4.1",
            "input": user_input,
        }

        # Chain to previous response if one exists
        if self.last_response_id:
            params["previous_response_id"] = self.last_response_id

        response = await client.responses.create(**params)
        self.last_response_id = response.id

        return response.output_text

# Usage across multiple turns
chain = ConversationChain()
reply1 = await chain.send_message("My name is Alex and I need help with billing.")
reply2 = await chain.send_message("What is my current plan?")
# The model remembers Alex's name and billing context from reply1

This is more efficient than resending the full conversation history on every turn: the Responses API stores prior responses server-side, so each request carries only the new user input.

Client-Side WebSocket Handler

class ChatClient {
  constructor(sessionId, token) {
    this.sessionId = sessionId;
    this.token = token;          // kept so reconnects stay authenticated
    this.reconnectAttempts = 0;
    this.messageBuffer = "";
    this.connect();
  }

  connect() {
    this.ws = new WebSocket(
      `wss://api.example.com/ws/chat/${this.sessionId}?token=${this.token}`
    );
    this.setupHandlers();
  }

  setupHandlers() {
    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);

      switch (data.type) {
        case "connected":
          this.reconnectAttempts = 0;  // connection is healthy again
          this.onConnected(data);
          break;
        case "typing":
          this.onTyping(data.is_typing);
          break;
        case "text_delta":
          this.messageBuffer += data.content;
          this.onStreamChunk(this.messageBuffer);
          break;
        case "response_end":
          this.messageBuffer = "";
          this.onResponseComplete(data.full_content);
          break;
        case "ping":
          // Answer server heartbeats so the server knows we are alive
          this.ws.send(JSON.stringify({ type: "pong" }));
          break;
        case "error":
          this.onError(data.message);
          break;
      }
    };

    this.ws.onclose = (event) => {
      if (event.code !== 1000) {
        // Abnormal close: reconnect with exponential backoff, capped at 30s
        const delay = Math.min(2000 * 2 ** this.reconnectAttempts, 30000);
        this.reconnectAttempts += 1;
        setTimeout(() => this.connect(), delay);
      }
    };
  }

  send(content) {
    this.ws.send(JSON.stringify({
      type: "message",
      content: content,
    }));
  }
}

Heartbeat and Connection Health

WebSocket connections can silently die. Implement a heartbeat mechanism:

async def heartbeat(session_id: str, interval: int = 30):
    """Send periodic pings to keep the connection alive."""
    while session_id in manager.active:
        try:
            await manager.send_event(session_id, {"type": "ping"})
            await asyncio.sleep(interval)
        except Exception:
            manager.disconnect(session_id)
            break

Start the heartbeat task when a connection is established:

await manager.connect(session_id, websocket)
heartbeat_task = asyncio.create_task(heartbeat(session_id))

try:
    # ... main message loop
finally:
    heartbeat_task.cancel()
    manager.disconnect(session_id)

Scaling WebSocket Connections

For production deployments with multiple server instances, you need a pub/sub layer so messages reach the right server. Redis Pub/Sub works well:

import json
import redis.asyncio as redis

class RedisPubSubBridge:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)

    async def publish(self, session_id: str, event: dict):
        channel = f"chat:{session_id}"
        await self.redis.publish(channel, json.dumps(event))

    async def subscribe(self, session_id: str, callback):
        pubsub = self.redis.pubsub()
        await pubsub.subscribe(f"chat:{session_id}")
        async for message in pubsub.listen():
            if message["type"] == "message":
                event = json.loads(message["data"])
                await callback(event)

This ensures that even if the WebSocket connection lands on server A but the agent processing happens on server B, the response still reaches the client.

Production Deployment Checklist

  1. TLS termination — Always use wss://, never ws:// in production
  2. Connection limits — Set max connections per user to prevent resource exhaustion
  3. Message size limits — Reject messages over a reasonable size (e.g., 10KB)
  4. Authentication — Validate JWT tokens on connection, not just on the first message
  5. Graceful shutdown — Close all connections with code 1001 (Going Away) during deploys
  6. Monitoring — Track active connections, message throughput, and error rates
  7. Load balancing — Use sticky sessions or a pub/sub bridge for multi-instance deployments
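Item 3 on the checklist can be enforced before a message ever reaches the agent. A minimal sketch, using the 10KB limit from the checklist and WebSocket close code 1009 (Message Too Big, per RFC 6455):

```python
MAX_MESSAGE_BYTES = 10 * 1024  # 10KB, matching the checklist above

def message_too_large(content: str, limit: int = MAX_MESSAGE_BYTES) -> bool:
    """Check the UTF-8 encoded size of an incoming message against the limit."""
    return len(content.encode("utf-8")) > limit

# In the endpoint's receive loop, before processing:
#   if message_too_large(user_text):
#       await websocket.close(code=1009, reason="Message too large")
#       return
```

Measuring encoded bytes rather than character count matters: multi-byte characters would otherwise let a client smuggle oversized payloads past a naive length check.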