
Redis Sessions for Distributed Agent Deployments

Set up RedisSession in the OpenAI Agents SDK for distributed AI agent deployments with session sharing across instances, production configuration, and worker coordination.

The Problem with Single-Machine Sessions

SQLiteSession works beautifully for single-server deployments. But the moment you scale beyond one machine — running multiple API server replicas behind a load balancer, deploying workers across a Kubernetes cluster, or handling user sessions that might land on different servers — you need a shared session backend.

Redis is the natural choice. It is fast, widely deployed, supports TTL-based expiration, and is already part of most production stacks. The OpenAI Agents SDK provides RedisSession as a first-class integration.

Installing the Redis Extension

RedisSession is an optional dependency. Install it with the redis extra:

pip install "openai-agents[redis]"

This installs the redis asyncio Python client alongside the agents SDK. Quoting the package spec keeps shells like zsh from treating the brackets as a glob pattern.

RedisSession Setup

Basic Configuration with from_url()

The simplest way to create a RedisSession is with a session ID and a Redis URL:

from agents.extensions.memory import RedisSession

# Connect to local Redis
session = RedisSession.from_url("user_123", url="redis://localhost:6379/0")

# Connect to remote Redis with password
session = RedisSession.from_url("user_123", url="redis://:[email protected]:6379/0")

# Connect with TLS (Redis Cloud, AWS ElastiCache)
session = RedisSession.from_url("user_123", url="rediss://:[email protected]:6380/0")

The from_url() factory takes the session ID as its first argument, then parses the URL and creates the underlying async Redis client automatically.

Using an Existing Redis Client

If your application already has a Redis connection pool, you can pass the client directly:

import redis.asyncio as aioredis
from agents.extensions.memory import RedisSession

# Reuse an existing Redis client
redis_client = aioredis.from_url(
    "redis://localhost:6379/0",
    max_connections=20,
    decode_responses=False,
)

session = RedisSession("user_123", redis_client=redis_client)

This avoids creating duplicate connections and lets you share the pool with other parts of your application (caching, pub/sub, rate limiting).

Running with a Distributed Agent

Here is a complete example showing a Redis-backed agent that can be served from any worker:

import redis.asyncio as aioredis
from agents import Agent, Runner
from agents.extensions.memory import RedisSession

REDIS_URL = "redis://redis.internal:6379/0"
redis_client = aioredis.from_url(REDIS_URL)

support_agent = Agent(
    name="SupportAgent",
    instructions=(
        "You are a customer support agent. You have access to conversation "
        "history to provide contextual responses. Always reference previous "
        "interactions when relevant."
    ),
)

async def handle_request(session_id: str, message: str) -> str:
    """Handle a support request -- it can run on any worker."""
    # One session object per conversation, all sharing the same client pool
    session = RedisSession(session_id, redis_client=redis_client)
    result = await Runner.run(support_agent, message, session=session)
    return result.final_output

Any worker that calls handle_request with the same session_id gets the same conversation history, regardless of which server handled the previous request.

Distributed Worker Scenarios

Scenario 1: Load-Balanced API Servers

Multiple FastAPI instances behind a reverse proxy all share the same Redis session backend.

from fastapi import FastAPI, Request
import redis.asyncio as aioredis
from agents import Agent, Runner
from agents.extensions.memory import RedisSession

app = FastAPI()
redis_client = aioredis.from_url("redis://redis:6379/0")

agent = Agent(name="API Agent", instructions="You are a helpful assistant.")

@app.post("/chat")
async def chat(request: Request):
    body = await request.json()
    session = RedisSession(body["session_id"], redis_client=redis_client)
    result = await Runner.run(agent, body["message"], session=session)
    return {"response": result.final_output}

Deploy this behind nginx or a Kubernetes service and requests distribute across replicas. The user experience is seamless because every replica reads from and writes to the same Redis instance.
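To make the topology concrete, here is a minimal sketch of what such a deployment could look like with Docker Compose. The service names, image tag, and replica count are illustrative assumptions, not part of the SDK:

```yaml
services:
  api:
    image: myapp/agent-api:latest   # hypothetical image running the FastAPI app above
    deploy:
      replicas: 3                   # any replica can serve any session
    environment:
      REDIS_URL: redis://redis:6379/0
    depends_on:
      - redis
  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes   # persist session history across restarts
```

Because session state lives in Redis rather than in process memory, scaling the api service up or down never strands a conversation.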

Scenario 2: Background Worker Processing

A web server enqueues agent tasks, and background workers process them asynchronously — both sharing session state.

# worker.py -- runs as a separate process
import redis.asyncio as aioredis
from agents import Agent, Runner
from agents.extensions.memory import RedisSession

redis_client = aioredis.from_url("redis://redis:6379/0")

analysis_agent = Agent(
    name="AnalysisAgent",
    instructions="Analyze the data provided and generate insights.",
)

async def process_job(job: dict):
    """Background worker picks up jobs from a queue."""
    session = RedisSession(job["session_id"], redis_client=redis_client)
    result = await Runner.run(analysis_agent, job["message"], session=session)
    # Store the result back in Redis or a database
    return result.final_output
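The scenario assumes a job queue without showing one. As a rough sketch, the consumer loop can stay transport-agnostic by injecting the pop operation (in production that might be a BLPOP against the shared Redis client); worker_loop and the None shutdown sentinel below are illustrative conventions, not SDK APIs:

```python
import asyncio
import json

async def worker_loop(pop_job, handle):
    """Pull JSON-encoded jobs and process them until a None sentinel arrives."""
    results = []
    while True:
        raw = await pop_job()
        if raw is None:  # sentinel: queue drained, shut down
            break
        job = json.loads(raw)
        results.append(await handle(job))
    return results

# Usage with an in-memory stand-in for the queue:
async def demo():
    queue = [json.dumps({"session_id": "s1", "message": "hi"}), None]

    async def pop_job():
        return queue.pop(0)

    async def handle(job):
        return f"processed {job['session_id']}"

    return await worker_loop(pop_job, handle)

print(asyncio.run(demo()))
```

Swapping the in-memory list for a Redis list keeps process_job unchanged, since the loop only ever sees decoded job dicts.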

Scenario 3: Multi-Region Deployment

For global deployments, use Redis Cluster or a managed service like AWS ElastiCache Global Datastore:

import redis.asyncio as aioredis
from agents.extensions.memory import RedisSession

# Redis Cluster client, shared by every worker in the region
redis_client = aioredis.RedisCluster.from_url(
    "redis://redis-cluster.example.com:6379",
    decode_responses=False,
)

session = RedisSession("user_123", redis_client=redis_client)

Session Sharing Across Instances

The key to session sharing is consistent session IDs. If two workers use the same session_id, they see the same history. This means your session ID scheme matters.

def generate_session_id(user_id: str, conversation_id: str) -> str:
    """Deterministic session ID from user and conversation."""
    return f"session:{user_id}:{conversation_id}"

session_id = generate_session_id("usr_1", "conv_1")

# Worker A handles turn 1
await Runner.run(agent, "Hello", session=RedisSession(session_id, redis_client=redis_client))

# Worker B handles turn 2 -- it sees turn 1 in history
await Runner.run(agent, "Follow up", session=RedisSession(session_id, redis_client=redis_client))

Key Prefix and Namespace Strategy

In production, use prefixes to avoid key collisions with other Redis data:

session = RedisSession.from_url(
    "session:usr_1:conv_1",
    url="redis://redis:6379/0",
    key_prefix="myapp:agent_sessions:",
)

Now session keys are stored as myapp:agent_sessions:{session_id} — cleanly namespaced and easy to monitor or purge.
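A small helper keeps the namespacing auditable. session_key and the fnmatch check below are illustrative glue, not SDK APIs; they simply confirm that the monitoring glob used later in this article matches exactly the keys this prefix produces:

```python
from fnmatch import fnmatch

KEY_PREFIX = "myapp:agent_sessions:"

def session_key(session_id: str) -> str:
    """Compose the fully namespaced Redis key for a session."""
    return f"{KEY_PREFIX}{session_id}"

key = session_key("session:usr_1:conv_1")
print(key)  # myapp:agent_sessions:session:usr_1:conv_1

# The purge/monitoring glob matches only our namespaced keys
assert fnmatch(key, "myapp:agent_sessions:*")
assert not fnmatch("othersvc:cache:abc", "myapp:agent_sessions:*")
```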

Production Redis Configuration

Connection Pooling

import redis.asyncio as aioredis
from agents.extensions.memory import RedisSession

redis_client = aioredis.from_url(
    "redis://redis:6379/0",
    max_connections=50,
    socket_connect_timeout=5,
    socket_timeout=5,
    retry_on_timeout=True,
    decode_responses=False,
)

session = RedisSession("user_123", redis_client=redis_client)

Health Checks

Verify Redis connectivity at startup by pinging the shared client:

async def check_redis_health(redis_client: aioredis.Redis):
    try:
        await redis_client.ping()
        print("Redis connection healthy")
    except Exception as e:
        print(f"Redis connection failed: {e}")
        raise SystemExit(1)
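In containerized deployments Redis often comes up after the application, so a retry wrapper at startup avoids crash loops. This sketch injects the ping coroutine (e.g. redis_client.ping) to stay transport-agnostic; wait_for_redis is a hypothetical helper, not an SDK API:

```python
import asyncio

async def wait_for_redis(ping, attempts: int = 5, base_delay: float = 0.5) -> bool:
    """Retry `ping` with exponential backoff; return True once it succeeds."""
    for attempt in range(attempts):
        try:
            await ping()
            return True
        except Exception:
            await asyncio.sleep(base_delay * (2 ** attempt))
    return False

# Usage with a fake ping that fails twice, then succeeds:
calls = {"n": 0}

async def flaky_ping():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("redis not ready")

print(asyncio.run(wait_for_redis(flaky_ping, base_delay=0.01)))  # True
```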

TTL for Session Expiry

Redis supports TTL natively. Set expiration on session keys to clean up stale conversations automatically (the key name below assumes the myapp:agent_sessions: prefix configured earlier):

# After storing a turn, set a TTL on the session key
await redis_client.expire(
    f"myapp:agent_sessions:{session_id}",
    60 * 60 * 24 * 7,  # 7 days
)

Monitoring

Monitor your session keys with Redis CLI or a dashboard:

# Count active sessions (SCAN avoids blocking the server, unlike KEYS)
redis-cli --scan --pattern "myapp:agent_sessions:*" | wc -l

# Check memory usage for a session
redis-cli MEMORY USAGE "myapp:agent_sessions:session:usr_1:conv_1"

Redis vs SQLite: When to Choose Each

Factor         SQLiteSession         RedisSession
Deployment     Single server         Multi-server / distributed
Latency        Disk I/O              Sub-millisecond (in-memory)
Persistence    Automatic (file)      Requires AOF/RDB config
Scaling        Vertical only         Horizontal with clustering
Dependencies   None (built-in)       Redis server required
TTL support    Manual cleanup        Native

For most production web applications, RedisSession is the right default. It adds minimal complexity — you likely already run Redis — and it unlocks horizontal scaling from day one.
