Redis Sessions for Distributed Agent Deployments
Set up RedisSession in the OpenAI Agents SDK for distributed AI agent deployments with session sharing across instances, production configuration, and worker coordination.
The Problem with Single-Machine Sessions
SQLiteSession works beautifully for single-server deployments. But the moment you scale beyond one machine — running multiple API server replicas behind a load balancer, deploying workers across a Kubernetes cluster, or handling user sessions that might land on different servers — you need a shared session backend.
Redis is the natural choice. It is fast, widely deployed, supports TTL-based expiration, and is already part of most production stacks. The OpenAI Agents SDK provides RedisSession as a first-class integration.
Installing the Redis Extension
RedisSession is an optional dependency. Install it with the redis extra:
pip install "openai-agents[redis]"
This installs the redis async Python client alongside the Agents SDK. The quotes keep shells like zsh from interpreting the square brackets as a glob pattern.
RedisSession Setup
Basic Configuration with from_url()
The simplest way to create a RedisSession is with a Redis URL:
from agents.extensions.sessions import RedisSession
# Connect to local Redis
session = RedisSession.from_url("redis://localhost:6379/0")
# Connect to remote Redis with password
session = RedisSession.from_url("redis://:[email protected]:6379/0")
# Connect with TLS (Redis Cloud, AWS ElastiCache)
session = RedisSession.from_url("rediss://:[email protected]:6380/0")
The from_url() factory parses the URL and creates the underlying async Redis client automatically.
Using an Existing Redis Client
If your application already has a Redis connection pool, you can pass the client directly:
import redis.asyncio as aioredis
from agents.extensions.sessions import RedisSession
# Reuse existing Redis client
redis_client = aioredis.from_url(
"redis://localhost:6379/0",
max_connections=20,
decode_responses=False,
)
session = RedisSession(client=redis_client)
This avoids creating duplicate connections and lets you share the pool with other parts of your application (caching, pub/sub, rate limiting).
Running with a Distributed Agent
Here is a complete example showing a Redis-backed agent that can be served from any worker:
import asyncio
from agents import Agent, Runner
from agents.extensions.sessions import RedisSession
REDIS_URL = "redis://redis.internal:6379/0"
session = RedisSession.from_url(REDIS_URL)
support_agent = Agent(
name="SupportAgent",
instructions="""You are a customer support agent. You have access to
conversation history to provide contextual responses. Always reference
previous interactions when relevant.""",
)
async def handle_request(session_id: str, message: str) -> str:
"""Handle a support request — can run on any worker."""
result = await Runner.run(
support_agent,
message,
session=session,
session_id=session_id,
)
return result.final_output
Any worker that calls handle_request with the same session_id gets the same conversation history, regardless of which server handled the previous request.
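That sharing behavior can be illustrated with a toy in-memory stand-in for the shared backend — the dict below is purely illustrative and has nothing to do with the SDK's actual Redis storage format:

```python
import asyncio

# Toy stand-in for a shared session store: maps session_id -> list of turns.
# In production this dict would be Redis, reachable from every worker.
SHARED_STORE: dict[str, list[str]] = {}

async def worker_turn(worker_name: str, session_id: str, message: str) -> list[str]:
    """Append a turn and return the full history this worker sees."""
    history = SHARED_STORE.setdefault(session_id, [])
    history.append(f"{worker_name}: {message}")
    return list(history)

async def main() -> None:
    # Worker A handles turn 1, worker B handles turn 2 of the same session.
    await worker_turn("worker-a", "session:usr_1:conv_1", "Hello")
    history = await worker_turn("worker-b", "session:usr_1:conv_1", "Follow up")
    # Worker B sees worker A's turn because both used the same session_id.
    print(history)

asyncio.run(main())
```

The only coordination mechanism is the shared key: no sticky sessions, no worker-to-worker communication.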
Distributed Worker Scenarios
Scenario 1: Load-Balanced API Servers
Multiple FastAPI instances behind a reverse proxy all share the same Redis session backend.
from fastapi import FastAPI, Request
from agents import Agent, Runner
from agents.extensions.sessions import RedisSession
app = FastAPI()
session = RedisSession.from_url("redis://redis:6379/0")
agent = Agent(name="API Agent", instructions="You are a helpful assistant.")
@app.post("/chat")
async def chat(request: Request):
body = await request.json()
session_id = body["session_id"]
message = body["message"]
result = await Runner.run(
agent, message, session=session, session_id=session_id
)
return {"response": result.final_output}
Deploy this behind nginx or a Kubernetes Service and requests are distributed across the replicas. The user experience is seamless because every replica reads from and writes to the same Redis instance.
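A minimal deployment sketch of that topology — service names, image tags, and the environment variable are illustrative, not taken from any SDK documentation:

```yaml
# docker-compose.yml — two load-balanced API replicas, one shared Redis.
services:
  api:
    build: .
    deploy:
      replicas: 2          # both replicas point at the same session backend
    environment:
      REDIS_URL: redis://redis:6379/0
    depends_on:
      - redis
  redis:
    image: redis:7-alpine
    command: ["redis-server", "--appendonly", "yes"]  # persist sessions (AOF)
```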
Scenario 2: Background Worker Processing
A web server enqueues agent tasks, and background workers process them asynchronously — both sharing session state.
# worker.py — runs as a separate process
import asyncio
from agents import Agent, Runner
from agents.extensions.sessions import RedisSession
session = RedisSession.from_url("redis://redis:6379/0")
analysis_agent = Agent(
name="AnalysisAgent",
instructions="Analyze the data provided and generate insights.",
)
async def process_job(job: dict):
"""Background worker picks up jobs from a queue."""
result = await Runner.run(
analysis_agent,
job["message"],
session=session,
session_id=job["session_id"],
)
# Store result back in Redis or a database
return result.final_output
Scenario 3: Multi-Region Deployment
For global deployments, use Redis Cluster (which shards keys across nodes for throughput and capacity) or a managed cross-region replication service such as AWS ElastiCache Global Datastore:
import redis.asyncio as aioredis
from agents.extensions.sessions import RedisSession
# Redis Cluster for multi-region
redis_client = aioredis.RedisCluster.from_url(
"redis://redis-cluster.example.com:6379",
decode_responses=False,
)
session = RedisSession(client=redis_client)
Session Sharing Across Instances
The key to session sharing is consistent session IDs. If two workers use the same session_id, they see the same history. This means your session ID scheme matters.
def generate_session_id(user_id: str, conversation_id: str) -> str:
"""Deterministic session ID from user and conversation."""
return f"session:{user_id}:{conversation_id}"
# Worker A handles turn 1
await Runner.run(agent, "Hello", session=session, session_id="session:usr_1:conv_1")
# Worker B handles turn 2 — sees turn 1 in history
await Runner.run(agent, "Follow up", session=session, session_id="session:usr_1:conv_1")
Key Prefix and Namespace Strategy
In production, use prefixes to avoid key collisions with other Redis data:
session = RedisSession.from_url(
"redis://redis:6379/0",
key_prefix="myapp:agent_sessions:",
)
Now session keys are stored as myapp:agent_sessions:{session_id} — cleanly namespaced and easy to monitor or purge.
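A small helper keeps key construction in one place, so the prefix is never hand-typed in monitoring scripts or cleanup jobs (the prefix value is the hypothetical one from above):

```python
KEY_PREFIX = "myapp:agent_sessions:"

def session_key(session_id: str) -> str:
    """Full Redis key for one session, under the application namespace."""
    return f"{KEY_PREFIX}{session_id}"

def session_pattern() -> str:
    """Glob pattern matching all session keys, for SCAN-based audits or purges."""
    return f"{KEY_PREFIX}*"
```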
Production Redis Configuration
Connection Pooling
import redis.asyncio as aioredis
from agents.extensions.sessions import RedisSession
redis_client = aioredis.from_url(
"redis://redis:6379/0",
max_connections=50,
socket_connect_timeout=5,
socket_timeout=5,
retry_on_timeout=True,
decode_responses=False,
)
session = RedisSession(client=redis_client)
Health Checks
Verify Redis connectivity at startup:
async def check_redis_health(session: RedisSession):
try:
await session.client.ping()
print("Redis connection healthy")
except Exception as e:
print(f"Redis connection failed: {e}")
raise SystemExit(1)
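In practice you may want a few retries before giving up, since Redis can be briefly unreachable during a failover or deploy. A sketch with an injectable ping callable, so the backoff logic can be exercised without a live server:

```python
import asyncio
from typing import Awaitable, Callable

async def check_redis_health_with_retries(
    ping: Callable[[], Awaitable[bool]],
    attempts: int = 3,
    base_delay: float = 0.5,
) -> bool:
    """Ping with exponential backoff; True once any ping succeeds."""
    for attempt in range(attempts):
        try:
            await ping()  # in production: session.client.ping
            return True
        except Exception:
            if attempt == attempts - 1:
                return False
            await asyncio.sleep(base_delay * (2 ** attempt))  # 0.5s, 1s, 2s, ...
    return False

# Exercise it with a fake ping that fails twice, then succeeds.
calls = {"n": 0}

async def flaky_ping() -> bool:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("not ready")
    return True

healthy = asyncio.run(check_redis_health_with_retries(flaky_ping, base_delay=0.01))
print(healthy)  # True after two failed attempts
```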
TTL for Session Expiry
Redis supports TTL natively. Set expiration on session keys to automatically clean up stale conversations:
# After storing a session, set TTL
await session.client.expire(
f"myapp:agent_sessions:{session_id}",
60 * 60 * 24 * 7 # 7 days
)
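To keep active conversations alive while stale ones expire, refresh the TTL after every turn — a sliding window rather than a fixed deadline. Sketched against a tiny fake client so the call pattern is visible without a server (production would pass the real async Redis client):

```python
import asyncio

class FakeRedis:
    """Tiny stand-in recording expire() calls, for illustration only."""
    def __init__(self) -> None:
        self.ttls: dict[str, int] = {}

    async def expire(self, key: str, seconds: int) -> bool:
        self.ttls[key] = seconds
        return True

SESSION_TTL = 60 * 60 * 24 * 7  # 7 days

async def touch_session(client, key: str) -> None:
    """Refresh the TTL so an active session never expires mid-conversation."""
    await client.expire(key, SESSION_TTL)

fake = FakeRedis()
asyncio.run(touch_session(fake, "myapp:agent_sessions:session:usr_1:conv_1"))
```

Call touch_session after each completed turn; only sessions idle for the full TTL get evicted.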
Monitoring
Monitor your session keys with the Redis CLI or a dashboard. Prefer SCAN-based iteration over KEYS, which blocks the server while it walks the entire keyspace:
# Count active sessions (non-blocking, unlike KEYS)
redis-cli --scan --pattern "myapp:agent_sessions:*" | wc -l
# Check memory usage for a session
redis-cli MEMORY USAGE "myapp:agent_sessions:session:usr_1:conv_1"
Redis vs SQLite: When to Choose Each
| Factor | SQLiteSession | RedisSession |
|---|---|---|
| Deployment | Single server | Multi-server / distributed |
| Latency | Disk I/O | Sub-millisecond (in-memory) |
| Persistence | Automatic (file) | Requires AOF/RDB config |
| Scaling | Vertical only | Horizontal with clustering |
| Dependencies | None (built-in) | Redis server required |
| TTL support | Manual cleanup | Native |
For most production web applications, RedisSession is the right default. It adds minimal complexity — you likely already run Redis — and it unlocks horizontal scaling from day one.
Written by
CallSphere Team