Deploying Chat Agents with WebSocket Connections for Real-Time Interaction
Build a real-time chat agent using WebSocket connections with FastAPI, OpenAI's streaming responses, persistent conversation state, and response chaining via previous_response_id.
Why WebSockets for Chat Agents?
HTTP request-response is fine for simple chatbot interactions, but production chat agents need real-time, bidirectional communication. Users expect to see responses streaming in character by character, not waiting several seconds for a complete response. WebSockets provide a persistent connection where both client and server can send messages at any time, enabling streaming responses, typing indicators, and immediate feedback.
In this post, we build a complete WebSocket-based chat agent using FastAPI on the backend and OpenAI's streaming capabilities.
Architecture Overview
The architecture has three layers:
- Client — A browser (or mobile app) opens a WebSocket connection
- Server — FastAPI manages WebSocket lifecycle, authentication, and message routing
- Agent — OpenAI's Agents SDK processes messages and streams responses
Each WebSocket connection represents one conversation session. The server maintains conversation history and uses previous_response_id to chain responses for continuity.
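Client and server exchange small JSON events over the socket. As a sketch of that wire protocol (the event names match the handlers shown later in this post, but the exact schema is our own convention, not a standard):

```python
import json

# Event types used in this post: connected, typing, text_delta,
# response_end, error, plus ping/pong for keepalive and "message"
# for client input. This set is an assumption of this tutorial.
ALLOWED_TYPES = {
    "connected", "typing", "text_delta",
    "response_end", "error", "ping", "pong", "message",
}

def parse_event(raw: str) -> dict:
    """Decode one wire message and sanity-check its type field."""
    event = json.loads(raw)
    if event.get("type") not in ALLOWED_TYPES:
        raise ValueError(f"unknown event type: {event.get('type')!r}")
    return event

event = parse_event('{"type": "text_delta", "content": "Hel"}')
print(event["content"])  # -> Hel
```

Validating the type field up front keeps malformed or unexpected events from reaching the agent loop.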
FastAPI WebSocket Server
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query
from typing import Dict
import json
import asyncio
from datetime import datetime
app = FastAPI()
class ConnectionManager:
"""Manages active WebSocket connections and conversation state."""
def __init__(self):
self.active: Dict[str, WebSocket] = {}
self.conversations: Dict[str, list] = {}
self.last_response_ids: Dict[str, str] = {}
async def connect(self, session_id: str, websocket: WebSocket):
await websocket.accept()
self.active[session_id] = websocket
if session_id not in self.conversations:
self.conversations[session_id] = []
def disconnect(self, session_id: str):
self.active.pop(session_id, None)
async def send_event(self, session_id: str, event: dict):
ws = self.active.get(session_id)
if ws:
await ws.send_json(event)
def get_history(self, session_id: str) -> list:
return self.conversations.get(session_id, [])
def add_message(self, session_id: str, role: str, content: str):
self.conversations.setdefault(session_id, []).append({
"role": role,
"content": content,
"timestamp": datetime.utcnow().isoformat(),
})
def set_last_response_id(self, session_id: str, response_id: str):
self.last_response_ids[session_id] = response_id
def get_last_response_id(self, session_id: str) -> str | None:
return self.last_response_ids.get(session_id)
manager = ConnectionManager()
The Chat Agent
from agents import Agent, Runner
support_agent = Agent(
name="Real-Time Support Agent",
instructions="""You are a real-time customer support agent for TechCorp.
You help users with account issues, billing questions, and product
features. Be concise since this is a real-time chat. Keep responses
under 3 paragraphs unless the user asks for detailed explanations.
Use markdown formatting for code snippets and lists.""",
)
WebSocket Endpoint with Streaming
The core endpoint handles the WebSocket lifecycle, processes user messages through the agent, and streams responses back token by token:
@app.websocket("/ws/chat/{session_id}")
async def websocket_chat(
    websocket: WebSocket,
    session_id: str,
    token: str | None = Query(default=None),
):
    # Authenticate before accepting (verify_token is your own auth
    # check, e.g. JWT validation; its implementation is not shown here)
    if not await verify_token(token):
        await websocket.close(code=4001, reason="Unauthorized")
        return
await manager.connect(session_id, websocket)
# Send connection confirmation
await manager.send_event(session_id, {
"type": "connected",
"session_id": session_id,
"timestamp": datetime.utcnow().isoformat(),
})
try:
while True:
# Receive message from client
data = await websocket.receive_json()
message_type = data.get("type", "message")
if message_type == "message":
user_text = data.get("content", "").strip()
if not user_text:
continue
# Store user message
manager.add_message(session_id, "user", user_text)
# Send typing indicator
await manager.send_event(session_id, {
"type": "typing",
"is_typing": True,
})
# Stream agent response
await stream_agent_response(session_id, user_text)
elif message_type == "ping":
await manager.send_event(session_id, {"type": "pong"})
except WebSocketDisconnect:
manager.disconnect(session_id)
Streaming Agent Responses
The key function that bridges the agent SDK with WebSocket streaming:
async def stream_agent_response(session_id: str, user_input: str):
"""Run the agent and stream the response over WebSocket."""
# Build input with conversation history context
previous_response_id = manager.get_last_response_id(session_id)
full_response = ""
stream_id = None
try:
        result = Runner.run_streamed(
            support_agent,
            input=user_input,
            # Chain to the prior turn so the model keeps conversation context
            previous_response_id=previous_response_id,
        )
# Send start event
await manager.send_event(session_id, {
"type": "response_start",
})
async for event in result.stream_events():
# Handle different streaming event types
if event.type == "raw_response_event":
raw = event.data
# Text delta — stream to client
if hasattr(raw, "type") and raw.type == "response.output_text.delta":
chunk = raw.delta
full_response += chunk
await manager.send_event(session_id, {
"type": "text_delta",
"content": chunk,
})
        # Get final result (final_output is a plain attribute once
        # streaming completes, not an awaitable)
        final = result.final_output
response_id = getattr(result, "last_response_id", None)
if response_id:
manager.set_last_response_id(session_id, response_id)
# Store assistant message
manager.add_message(session_id, "assistant", full_response)
# Send completion event
await manager.send_event(session_id, {
"type": "response_end",
"full_content": full_response,
})
    except Exception:
        # Log server-side; don't leak internals to the client
        await manager.send_event(session_id, {
            "type": "error",
            "message": "I encountered an issue. Please try again.",
        })
finally:
# Clear typing indicator
await manager.send_event(session_id, {
"type": "typing",
"is_typing": False,
})
Response Chaining with previous_response_id
When using OpenAI's Responses API directly, previous_response_id links responses together so the model retains full conversation context without you resending all messages:
from openai import AsyncOpenAI
client = AsyncOpenAI()
class ConversationChain:
"""Manages chained responses for a single conversation."""
def __init__(self):
self.last_response_id: str | None = None
async def send_message(self, user_input: str) -> str:
params = {
"model": "gpt-4.1",
"input": user_input,
}
# Chain to previous response if one exists
if self.last_response_id:
params["previous_response_id"] = self.last_response_id
response = await client.responses.create(**params)
self.last_response_id = response.id
return response.output_text
# Usage across multiple turns (run inside an async function)
chain = ConversationChain()
reply1 = await chain.send_message("My name is Alex and I need help with billing.")
reply2 = await chain.send_message("What is my current plan?")
# The model remembers Alex's name and billing context from reply1
This approach is more efficient than resending the full conversation history because OpenAI caches previous responses server-side.
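If chaining is unavailable (for example, when switching providers), the fallback is to resend a trimmed window of stored history as the input each turn. A minimal sketch of that trimming, using the same role/content message shape as the ConnectionManager above:

```python
def build_input(history: list[dict], user_input: str,
                max_turns: int = 10) -> list[dict]:
    """Resend only the most recent turns plus the new user message."""
    recent = history[-max_turns:]
    messages = [{"role": m["role"], "content": m["content"]} for m in recent]
    messages.append({"role": "user", "content": user_input})
    return messages

history = [
    {"role": "user", "content": "My name is Alex."},
    {"role": "assistant", "content": "Hi Alex!"},
]
msgs = build_input(history, "What is my current plan?", max_turns=2)
print(len(msgs))  # -> 3
```

A fixed turn window is the simplest policy; token-based trimming or summarization are the usual next steps.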
Client-Side WebSocket Handler
class ChatClient {
  constructor(sessionId, token) {
    this.sessionId = sessionId;
    this.token = token; // kept so reconnects can re-authenticate
    this.ws = new WebSocket(
      `wss://api.example.com/ws/chat/${sessionId}?token=${token}`
    );
    this.messageBuffer = "";
    this.setupHandlers();
  }
  setupHandlers() {
    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      switch (data.type) {
        case "connected":
          this.onConnected(data);
          break;
        case "typing":
          this.onTyping(data.is_typing);
          break;
        case "text_delta":
          this.messageBuffer += data.content;
          this.onStreamChunk(this.messageBuffer);
          break;
        case "response_end":
          this.messageBuffer = "";
          this.onResponseComplete(data.full_content);
          break;
        case "error":
          this.onError(data.message);
          break;
      }
    };
    this.ws.onclose = (event) => {
      if (event.code !== 1000) {
        // Abnormal close: attempt reconnect
        setTimeout(() => this.reconnect(), 2000);
      }
    };
  }
  send(content) {
    this.ws.send(JSON.stringify({
      type: "message",
      content: content,
    }));
  }
  reconnect() {
    // Fixed 2s delay for brevity; use exponential backoff in production
    this.ws = new WebSocket(
      `wss://api.example.com/ws/chat/${this.sessionId}?token=${this.token}`
    );
    this.setupHandlers();
  }
}
Heartbeat and Connection Health
WebSocket connections can silently die. Implement a heartbeat mechanism:
async def heartbeat(session_id: str, interval: int = 30):
"""Send periodic pings to keep the connection alive."""
while session_id in manager.active:
try:
await manager.send_event(session_id, {"type": "ping"})
await asyncio.sleep(interval)
except Exception:
manager.disconnect(session_id)
break
Start the heartbeat task when a connection is established:
await manager.connect(session_id, websocket)
heartbeat_task = asyncio.create_task(heartbeat(session_id))
try:
# ... main message loop
finally:
heartbeat_task.cancel()
manager.disconnect(session_id)
Scaling WebSocket Connections
For production deployments with multiple server instances, you need a pub/sub layer so messages reach the right server. Redis Pub/Sub works well:
import redis.asyncio as redis
class RedisPubSubBridge:
def __init__(self, redis_url: str):
self.redis = redis.from_url(redis_url)
async def publish(self, session_id: str, event: dict):
channel = f"chat:{session_id}"
await self.redis.publish(channel, json.dumps(event))
async def subscribe(self, session_id: str, callback):
pubsub = self.redis.pubsub()
await pubsub.subscribe(f"chat:{session_id}")
async for message in pubsub.listen():
if message["type"] == "message":
event = json.loads(message["data"])
await callback(event)
This ensures that even if the WebSocket connection lands on server A but the agent processing happens on server B, the response still reaches the client.
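For local development or tests, you can swap the Redis bridge for an in-memory stand-in with the same publish/subscribe shape, built on asyncio queues. This is a single-process sketch only; Redis remains the right tool once multiple server instances are involved:

```python
import asyncio
import json
from collections import defaultdict

class InMemoryPubSubBridge:
    """Same interface as RedisPubSubBridge, but single-process only."""

    def __init__(self):
        self.queues: dict[str, list[asyncio.Queue]] = defaultdict(list)

    async def publish(self, session_id: str, event: dict):
        for queue in self.queues[f"chat:{session_id}"]:
            await queue.put(json.dumps(event))

    async def subscribe(self, session_id: str, callback):
        queue: asyncio.Queue = asyncio.Queue()
        self.queues[f"chat:{session_id}"].append(queue)
        while True:
            event = json.loads(await queue.get())
            await callback(event)

async def demo():
    bridge = InMemoryPubSubBridge()
    received = []

    async def on_event(event):
        received.append(event)

    task = asyncio.create_task(bridge.subscribe("s1", on_event))
    await asyncio.sleep(0)  # let the subscriber register its queue
    await bridge.publish("s1", {"type": "text_delta", "content": "hi"})
    await asyncio.sleep(0.01)
    task.cancel()
    return received

print(asyncio.run(demo()))
```

Because both classes expose the same publish/subscribe methods, the bridge can be injected as a dependency and swapped per environment.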
Production Deployment Checklist
- TLS termination — Always use wss://, never ws:// in production
- Connection limits — Set max connections per user to prevent resource exhaustion
- Message size limits — Reject messages over a reasonable size (e.g., 10KB)
- Authentication — Validate JWT tokens on connection, not just on the first message
- Graceful shutdown — Close all connections with code 1001 (Going Away) during deploys
- Monitoring — Track active connections, message throughput, and error rates
- Load balancing — Use sticky sessions or a pub/sub bridge for multi-instance deployments
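The size-limit item from the checklist is straightforward to enforce at receive time. A minimal sketch (the 10KB threshold is the example figure from the checklist, not a standard):

```python
import json

MAX_MESSAGE_BYTES = 10 * 1024  # ~10KB, per the checklist above

def message_too_large(data: dict) -> bool:
    """Check the serialized size of an incoming event before processing."""
    return len(json.dumps(data).encode("utf-8")) > MAX_MESSAGE_BYTES

# In the endpoint loop, reject oversized payloads with close code 1009
# (Message Too Big) instead of handing them to the agent.
print(message_too_large({"type": "message", "content": "hi"}))         # -> False
print(message_too_large({"type": "message", "content": "x" * 20000}))  # -> True
```

Checking size before any agent call keeps oversized payloads from consuming model tokens or blocking the event loop.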
Written by
CallSphere Team