---
title: "Deploying Chat Agents with WebSocket Connections for Real-Time Interaction"
description: "Build a real-time chat agent using WebSocket connections with FastAPI, OpenAI's streaming responses, persistent conversation state, and response chaining via previous_response_id."
canonical: https://callsphere.ai/blog/deploying-chat-agents-websocket-real-time-interaction
category: "Learn Agentic AI"
tags: ["OpenAI", "WebSocket", "Chat", "Real-Time", "Deployment"]
author: "CallSphere Team"
published: 2026-03-14T00:00:00.000Z
updated: 2026-05-06T01:02:41.653Z
---

# Deploying Chat Agents with WebSocket Connections for Real-Time Interaction

> Build a real-time chat agent using WebSocket connections with FastAPI, OpenAI's streaming responses, persistent conversation state, and response chaining via previous_response_id.

## Why WebSockets for Chat Agents?

HTTP request-response works for simple chatbot interactions, but production chat agents need real-time, bidirectional communication. Users expect responses to stream in token by token rather than appear all at once after several seconds. A WebSocket provides a persistent connection over which both client and server can send messages at any time, which enables streaming responses, typing indicators, and immediate feedback.

In this post, we build a complete WebSocket-based chat agent using FastAPI on the backend and OpenAI's streaming capabilities.

## Architecture Overview

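The diagram below shows the general flow of a request through an agent. The deployment built in this post has three layers, listed after the diagram: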

```mermaid
flowchart LR
    INPUT(["User intent"])
    PARSE["Parse plus
classify"]
    PLAN["Plan and tool
selection"]
    AGENT["Agent loop
LLM plus tools"]
    GUARD{"Guardrails
and policy"}
    EXEC["Execute and
verify result"]
    OBS[("Trace and metrics")]
    OUT(["Outcome plus
next action"])
    INPUT --> PARSE --> PLAN --> AGENT --> GUARD
    GUARD -->|Pass| EXEC --> OUT
    GUARD -->|Fail| AGENT
    AGENT --> OBS
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style OBS fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
```

1. **Client** — A browser (or mobile app) opens a WebSocket connection
2. **Server** — FastAPI manages WebSocket lifecycle, authentication, and message routing
3. **Agent** — OpenAI's Agents SDK processes messages and streams responses

Each WebSocket connection represents one conversation session. The server maintains conversation history and uses `previous_response_id` to chain responses for continuity.

## FastAPI WebSocket Server

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query
from typing import Dict
import json
import asyncio
from datetime import datetime

app = FastAPI()

class ConnectionManager:
    """Manages active WebSocket connections and conversation state."""

    def __init__(self):
        self.active: Dict[str, WebSocket] = {}
        self.conversations: Dict[str, list] = {}
        self.last_response_ids: Dict[str, str] = {}

    async def connect(self, session_id: str, websocket: WebSocket):
        await websocket.accept()
        self.active[session_id] = websocket
        if session_id not in self.conversations:
            self.conversations[session_id] = []

    def disconnect(self, session_id: str):
        self.active.pop(session_id, None)

    async def send_event(self, session_id: str, event: dict):
        ws = self.active.get(session_id)
        if ws:
            await ws.send_json(event)

    def get_history(self, session_id: str) -> list:
        return self.conversations.get(session_id, [])

    def add_message(self, session_id: str, role: str, content: str):
        self.conversations.setdefault(session_id, []).append({
            "role": role,
            "content": content,
            "timestamp": datetime.utcnow().isoformat(),
        })

    def set_last_response_id(self, session_id: str, response_id: str):
        self.last_response_ids[session_id] = response_id

    def get_last_response_id(self, session_id: str) -> str | None:
        return self.last_response_ids.get(session_id)

manager = ConnectionManager()
```

## The Chat Agent

```python
from agents import Agent, Runner

support_agent = Agent(
    name="Real-Time Support Agent",
    instructions="""You are a real-time customer support agent for TechCorp.
    You help users with account issues, billing questions, and product
    features. Be concise since this is a real-time chat. Keep responses
    under 3 paragraphs unless the user asks for detailed explanations.
    Use markdown formatting for code snippets and lists.""",
)
```

## WebSocket Endpoint with Streaming

The core endpoint handles the WebSocket lifecycle, processes user messages through the agent, and streams responses back token by token:

```python
@app.websocket("/ws/chat/{session_id}")
async def websocket_chat(
    websocket: WebSocket,
    session_id: str,
    token: str = Query(default=None),
):
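    # Authenticate before accepting. Closing an unaccepted socket rejects the
    # handshake (the client sees HTTP 403), so the 4001 code is not delivered;
    # call websocket.accept() first if the client must receive that code.
    if not await verify_token(token):
        await websocket.close(code=4001, reason="Unauthorized")
        return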

    await manager.connect(session_id, websocket)

    # Send connection confirmation
    await manager.send_event(session_id, {
        "type": "connected",
        "session_id": session_id,
        "timestamp": datetime.utcnow().isoformat(),
    })

    try:
        while True:
            # Receive message from client
            data = await websocket.receive_json()
            message_type = data.get("type", "message")

            if message_type == "message":
                user_text = data.get("content", "").strip()
                if not user_text:
                    continue

                # Store user message
                manager.add_message(session_id, "user", user_text)

                # Send typing indicator
                await manager.send_event(session_id, {
                    "type": "typing",
                    "is_typing": True,
                })

                # Stream agent response
                await stream_agent_response(session_id, user_text)

            elif message_type == "ping":
                await manager.send_event(session_id, {"type": "pong"})

    except WebSocketDisconnect:
        manager.disconnect(session_id)
```
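
The endpoint calls `verify_token`, which this post does not define. The following is a minimal sketch, assuming HMAC-signed tokens of the form `<user_id>.<expiry>.<signature>`. The secret, the token format, and the `issue_token` helper are all hypothetical; a production deployment would more likely validate JWTs with an established library.

```python
import hashlib
import hmac
import time
from typing import Optional

# Hypothetical shared secret; load it from configuration in a real deployment.
SECRET_KEY = b"change-me"


def _sign(payload: str) -> str:
    """Return a hex HMAC-SHA256 signature for the payload."""
    return hmac.new(SECRET_KEY, payload.encode(), hashlib.sha256).hexdigest()


def issue_token(user_id: str, ttl_seconds: int = 3600) -> str:
    """Create a token of the assumed form '<user_id>.<expiry>.<signature>'."""
    expires_at = int(time.time()) + ttl_seconds
    payload = f"{user_id}.{expires_at}"
    return f"{payload}.{_sign(payload)}"


async def verify_token(token: Optional[str]) -> bool:
    """Return True only for a well-formed, unexpired, correctly signed token."""
    if not token:
        return False
    try:
        payload, signature = token.rsplit(".", 1)
        _user_id, expires_at = payload.rsplit(".", 1)
        expiry = int(expires_at)
    except ValueError:
        # Too few parts, or the expiry is not an integer
        return False
    # compare_digest avoids leaking signature bytes through timing differences
    if not hmac.compare_digest(signature.encode(), _sign(payload).encode()):
        return False
    return expiry > time.time()
```

The function is `async` only so that it matches the `await verify_token(token)` call in the endpoint; a version that looks up sessions in a database would do real awaiting here.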

## Streaming Agent Responses

The key function bridges the Agents SDK and the WebSocket stream:

```python
async def stream_agent_response(session_id: str, user_input: str):
    """Run the agent and stream the response over WebSocket."""

    # Chain this turn to the previous response so the model keeps context
    previous_response_id = manager.get_last_response_id(session_id)

    full_response = ""

    try:
        result = Runner.run_streamed(
            support_agent,
            input=user_input,
            previous_response_id=previous_response_id,
        )

        # Send start event
        await manager.send_event(session_id, {
            "type": "response_start",
        })

        async for event in result.stream_events():
            # Handle different streaming event types
            if event.type == "raw_response_event":
                raw = event.data
                # Text delta — stream to client
                if hasattr(raw, "type") and raw.type == "response.output_text.delta":
                    chunk = raw.delta
                    full_response += chunk
                    await manager.send_event(session_id, {
                        "type": "text_delta",
                        "content": chunk,
                    })

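        # The stream is exhausted, so the result is complete.
        # final_output is a plain attribute, not an awaitable.
        final = result.final_output
        if not full_response and final is not None:
            # Fall back to the final output if no text deltas were captured
            full_response = str(final)
        response_id = getattr(result, "last_response_id", None)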

        if response_id:
            manager.set_last_response_id(session_id, response_id)

        # Store assistant message
        manager.add_message(session_id, "assistant", full_response)

        # Send completion event
        await manager.send_event(session_id, {
            "type": "response_end",
            "full_content": full_response,
        })

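    except Exception:
        # Record the exception in your server logs at this point;
        # the client only ever receives the generic message below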
        await manager.send_event(session_id, {
            "type": "error",
            "message": "I encountered an issue. Please try again.",
        })

    finally:
        # Clear typing indicator
        await manager.send_event(session_id, {
            "type": "typing",
            "is_typing": False,
        })
```
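
For one successful turn, the two functions above emit events in this order: `typing` (true), `response_start`, any number of `text_delta` events, `response_end`, then `typing` (false). On failure, an `error` event replaces `response_end`. As a rough sketch of how a consumer might fold that sequence into state (useful when testing the protocol without a browser), consider the following; `TurnState` and `apply_event` are hypothetical names, not part of any SDK.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TurnState:
    """Client-side view of one assistant turn (hypothetical helper)."""
    text: str = ""
    is_typing: bool = False
    done: bool = False
    error: Optional[str] = None


def apply_event(state: TurnState, event: dict) -> TurnState:
    """Fold one server event into the state, mirroring the event types above."""
    kind = event.get("type")
    if kind == "typing":
        state.is_typing = bool(event.get("is_typing"))
    elif kind == "response_start":
        # A new turn begins: discard anything left from the previous one
        state.text, state.done, state.error = "", False, None
    elif kind == "text_delta":
        state.text += event.get("content", "")
    elif kind == "response_end":
        # Prefer the server's full text in case a delta was missed
        state.text = event.get("full_content", state.text)
        state.done = True
    elif kind == "error":
        state.error = event.get("message", "unknown error")
        state.done = True
    return state
```

Unknown event types are ignored, which lets the server add new events without breaking older clients.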

## Response Chaining with previous_response_id

When using OpenAI's Responses API directly, `previous_response_id` links responses together so the model retains full conversation context without you resending all messages:

```python
from openai import AsyncOpenAI

client = AsyncOpenAI()

class ConversationChain:
    """Manages chained responses for a single conversation."""

    def __init__(self):
        self.last_response_id: str | None = None

    async def send_message(self, user_input: str) -> str:
        params = {
            "model": "gpt-4.1",
            "input": user_input,
        }

        # Chain to previous response if one exists
        if self.last_response_id:
            params["previous_response_id"] = self.last_response_id

        response = await client.responses.create(**params)
        self.last_response_id = response.id

        return response.output_text

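# Usage across multiple turns. `await` needs a running event loop,
# so the calls are wrapped in a coroutine.
import asyncio

async def main():
    chain = ConversationChain()
    reply1 = await chain.send_message("My name is Alex and I need help with billing.")
    reply2 = await chain.send_message("What is my current plan?")
    # The second request is chained to the first, so the model still has
    # Alex's name and billing context when it produces reply2
    print(reply1, reply2, sep="\n\n")

asyncio.run(main())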
```

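This approach spares you from storing and resending the full conversation history yourself: OpenAI keeps earlier responses server-side and rebuilds the context from the chain. Earlier turns still count toward input tokens, so chaining simplifies the client and shrinks request payloads rather than reducing token usage.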

## Client-Side WebSocket Handler

```javascript
class ChatClient {
  constructor(sessionId, token) {
    this.sessionId = sessionId;
    this.token = token; // kept so reconnect() can authenticate again
    this.retries = 0;
    this.messageBuffer = "";
    this.connect();
  }

  connect() {
    // Encode the token so special characters cannot break the query string
    const token = encodeURIComponent(this.token);
    this.ws = new WebSocket(
      `wss://api.example.com/ws/chat/${this.sessionId}?token=${token}`
    );
    this.setupHandlers();
  }

  setupHandlers() {
    this.ws.onopen = () => {
      this.retries = 0; // a successful connection resets the backoff
    };

    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);

      switch (data.type) {
        case "connected":
          this.onConnected(data);
          break;
        case "typing":
          this.onTyping(data.is_typing);
          break;
        case "response_start":
          this.messageBuffer = "";
          break;
        case "text_delta":
          this.messageBuffer += data.content;
          this.onStreamChunk(this.messageBuffer);
          break;
        case "response_end":
          this.messageBuffer = "";
          this.onResponseComplete(data.full_content);
          break;
        case "error":
          this.onError(data.message);
          break;
      }
    };

    this.ws.onclose = (event) => {
      if (event.code !== 1000) {
        // Abnormal close: attempt to reconnect
        this.reconnect();
      }
    };
  }

  send(content) {
    this.ws.send(JSON.stringify({
      type: "message",
      content: content,
    }));
  }

  reconnect() {
    // Exponential backoff: 1s, 2s, 4s, ... capped at 30s
    const delay = Math.min(1000 * 2 ** this.retries, 30000);
    this.retries += 1;
    // Reconnect with the same session ID and token as the first connection
    setTimeout(() => this.connect(), delay);
  }
}
```

## Heartbeat and Connection Health

WebSocket connections can silently die. Implement a heartbeat mechanism:

```python
async def heartbeat(session_id: str, interval: int = 30):
    """Send periodic pings to keep the connection alive."""
    while session_id in manager.active:
        try:
            await manager.send_event(session_id, {"type": "ping"})
            await asyncio.sleep(interval)
        except Exception:
            manager.disconnect(session_id)
            break
```

Start the heartbeat task when a connection is established:

```python
await manager.connect(session_id, websocket)
heartbeat_task = asyncio.create_task(heartbeat(session_id))

try:
    ...  # main message loop from the endpoint above
finally:
    heartbeat_task.cancel()
    manager.disconnect(session_id)
```
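
The heartbeat above only notices a dead connection when a send raises. A common complement is to record when each client was last heard from and treat prolonged silence as a dead connection. The sketch below assumes the client sends some message (for example a `pong` reply to each server `ping`) at least once per timeout window, which the client shown earlier does not yet do; `LivenessTracker` and the 90-second default are hypothetical.

```python
import time
from typing import Callable, Dict, List


class LivenessTracker:
    """Tracks when each session was last heard from (hypothetical helper)."""

    def __init__(
        self,
        timeout: float = 90.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.timeout = timeout
        self._clock = clock  # injectable so tests can control time
        self._last_seen: Dict[str, float] = {}

    def touch(self, session_id: str) -> None:
        """Call on every inbound message, including pong replies."""
        self._last_seen[session_id] = self._clock()

    def forget(self, session_id: str) -> None:
        """Stop tracking a session once its connection is closed."""
        self._last_seen.pop(session_id, None)

    def stale_sessions(self) -> List[str]:
        """Return sessions that have been silent for longer than the timeout."""
        now = self._clock()
        return [
            sid for sid, seen in self._last_seen.items()
            if now - seen > self.timeout
        ]
```

In the endpoint, `touch(session_id)` would go right after `receive_json()`, and the heartbeat loop could close and disconnect every session returned by `stale_sessions()`.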

## Scaling WebSocket Connections

For production deployments with multiple server instances, you need a pub/sub layer so messages reach the right server. Redis Pub/Sub works well:

```python
import json

import redis.asyncio as redis

class RedisPubSubBridge:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)

    async def publish(self, session_id: str, event: dict):
        channel = f"chat:{session_id}"
        await self.redis.publish(channel, json.dumps(event))

    async def subscribe(self, session_id: str, callback):
        pubsub = self.redis.pubsub()
        await pubsub.subscribe(f"chat:{session_id}")
        async for message in pubsub.listen():
            if message["type"] == "message":
                event = json.loads(message["data"])
                await callback(event)
```

This ensures that even if the WebSocket connection lands on server A but the agent processing happens on server B, the response still reaches the client.
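
To see the routing pattern without running Redis, the sketch below swaps the Redis client for an in-process stand-in that uses the same `chat:{session_id}` channel naming. `InMemoryBridge` and `demo` are hypothetical and suitable for local tests only, since a single process cannot demonstrate real cross-server delivery.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable, DefaultDict, List

Callback = Callable[[dict], Awaitable[None]]


class InMemoryBridge:
    """In-process stand-in for the Redis bridge (testing only, hypothetical)."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Callback]] = defaultdict(list)

    def subscribe(self, session_id: str, callback: Callback) -> None:
        """Register the callback that owns this session's WebSocket."""
        self._subscribers[f"chat:{session_id}"].append(callback)

    async def publish(self, session_id: str, event: dict) -> int:
        """Deliver the event to every subscriber; return how many received it."""
        callbacks = self._subscribers.get(f"chat:{session_id}", [])
        for callback in callbacks:
            await callback(event)
        return len(callbacks)


async def demo() -> list:
    bridge = InMemoryBridge()
    delivered: list = []

    async def forward_to_socket(event: dict) -> None:
        # On "server A" this would call manager.send_event(session_id, event)
        delivered.append(event)

    bridge.subscribe("s1", forward_to_socket)
    # "Server B" produces a chunk and publishes it for whoever holds the socket
    await bridge.publish("s1", {"type": "text_delta", "content": "Hi"})
    # No one subscribed to this session, so the event is simply dropped
    await bridge.publish("other", {"type": "text_delta", "content": "dropped"})
    return delivered
```

Running `asyncio.run(demo())` shows that only the event for the subscribed session reaches the forwarding callback, which is the behavior the Redis channel names provide across servers.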

## Production Deployment Checklist

1. **TLS termination** — Always use `wss://`, never `ws://` in production
2. **Connection limits** — Set max connections per user to prevent resource exhaustion
3. **Message size limits** — Reject messages over a reasonable size (e.g., 10KB)
4. **Authentication** — Validate JWT tokens on connection, not just on the first message
5. **Graceful shutdown** — Close all connections with code 1001 (Going Away) during deploys
6. **Monitoring** — Track active connections, message throughput, and error rates
7. **Load balancing** — Use sticky sessions or a pub/sub bridge for multi-instance deployments
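
Items 2 and 3 of the checklist can be sketched with the standard library alone. The limits below are assumptions chosen to match the 10 KB example, and `parse_client_message` and `ConnectionLimiter` are hypothetical helpers, not part of FastAPI.

```python
import json
from collections import Counter

MAX_MESSAGE_BYTES = 10 * 1024   # assumed limit, matching the 10 KB example
MAX_CONNECTIONS_PER_USER = 3    # assumed limit


class MessageTooLarge(ValueError):
    """Raised when an inbound frame exceeds MAX_MESSAGE_BYTES."""


def parse_client_message(raw: str) -> dict:
    """Size-check and decode one text frame received from the client."""
    if len(raw.encode("utf-8")) > MAX_MESSAGE_BYTES:
        raise MessageTooLarge(f"message exceeds {MAX_MESSAGE_BYTES} bytes")
    data = json.loads(raw)  # raises ValueError on malformed JSON
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")
    return data


class ConnectionLimiter:
    """Counts open connections per user (hypothetical helper)."""

    def __init__(self, limit: int = MAX_CONNECTIONS_PER_USER):
        self.limit = limit
        self._open = Counter()

    def try_acquire(self, user_id: str) -> bool:
        """Reserve a slot; return False if the user is already at the limit."""
        if self._open[user_id] >= self.limit:
            return False
        self._open[user_id] += 1
        return True

    def release(self, user_id: str) -> None:
        """Free a slot when the user's connection closes."""
        if self._open[user_id] > 0:
            self._open[user_id] -= 1
```

To apply the size check, the endpoint would read frames with `websocket.receive_text()` and pass them to `parse_client_message` in place of `receive_json()`. The limiter would be checked after authentication and released in the endpoint's `finally` block.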

---

Source: https://callsphere.ai/blog/deploying-chat-agents-websocket-real-time-interaction
