---
title: "Redis Sessions for Distributed Agent Deployments"
description: "Set up RedisSession in the OpenAI Agents SDK for distributed AI agent deployments with session sharing across instances, production configuration, and worker coordination."
canonical: https://callsphere.ai/blog/redis-sessions-distributed-agent-deployments
category: "Learn Agentic AI"
tags: ["OpenAI", "Redis", "Distributed", "Sessions", "Production"]
author: "CallSphere Team"
published: 2026-03-14T00:00:00.000Z
updated: 2026-05-06T01:02:41.579Z
---

# Redis Sessions for Distributed Agent Deployments

> Set up RedisSession in the OpenAI Agents SDK for distributed AI agent deployments with session sharing across instances, production configuration, and worker coordination.

## The Problem with Single-Machine Sessions

SQLiteSession works beautifully for single-server deployments. But the moment you scale beyond one machine — running multiple API server replicas behind a load balancer, deploying workers across a Kubernetes cluster, or handling user sessions that might land on different servers — you need a shared session backend.

Redis is the natural choice. It is fast, widely deployed, supports TTL-based expiration, and is already part of most production stacks. The OpenAI Agents SDK provides `RedisSession` as a first-class integration.

## Installing the Redis Extension

RedisSession is an optional dependency. Install it with the redis extra:

```bash
pip install "openai-agents[redis]"
```

This installs the `redis` async Python client alongside the agents SDK.

## RedisSession Setup

### Basic Configuration with from_url()

The simplest way to create a RedisSession is with a Redis URL:

```python
from agents.extensions.sessions import RedisSession

# Connect to local Redis
session = RedisSession.from_url("redis://localhost:6379/0")

# Connect to remote Redis with password
session = RedisSession.from_url("redis://:mypassword@redis.example.com:6379/0")

# Connect with TLS (Redis Cloud, AWS ElastiCache)
session = RedisSession.from_url("rediss://:mypassword@redis.example.com:6380/0")
```

The `from_url()` factory parses the URL and creates the underlying async Redis client automatically.

### Using an Existing Redis Client

If your application already has a Redis connection pool, you can pass the client directly:

```python
import redis.asyncio as aioredis
from agents.extensions.sessions import RedisSession

# Reuse existing Redis client
redis_client = aioredis.from_url(
    "redis://localhost:6379/0",
    max_connections=20,
    decode_responses=False,
)

session = RedisSession(client=redis_client)
```

This avoids creating duplicate connections and lets you share the pool with other parts of your application (caching, pub/sub, rate limiting).

## Running with a Distributed Agent

Here is a complete example showing a Redis-backed agent that can be served from any worker:

```python
import asyncio
from agents import Agent, Runner
from agents.extensions.sessions import RedisSession

REDIS_URL = "redis://redis.internal:6379/0"
session = RedisSession.from_url(REDIS_URL)

support_agent = Agent(
    name="SupportAgent",
    instructions="""You are a customer support agent. You have access to
    conversation history to provide contextual responses. Always reference
    previous interactions when relevant.""",
)

async def handle_request(session_id: str, message: str) -> str:
    """Handle a support request — can run on any worker."""
    result = await Runner.run(
        support_agent,
        message,
        session=session,
        session_id=session_id,
    )
    return result.final_output
```

Any worker that calls `handle_request` with the same `session_id` gets the same conversation history, regardless of which server handled the previous request.

## Distributed Worker Scenarios

### Scenario 1: Load-Balanced API Servers

Multiple FastAPI instances behind a reverse proxy all share the same Redis session backend.

```python
from fastapi import FastAPI, Request
from agents import Agent, Runner
from agents.extensions.sessions import RedisSession

app = FastAPI()
session = RedisSession.from_url("redis://redis:6379/0")

agent = Agent(name="API Agent", instructions="You are a helpful assistant.")

@app.post("/chat")
async def chat(request: Request):
    body = await request.json()
    session_id = body["session_id"]
    message = body["message"]

    result = await Runner.run(
        agent, message, session=session, session_id=session_id
    )
    return {"response": result.final_output}
```

Deploy this behind nginx or a Kubernetes service and requests distribute across replicas. The user experience is seamless because every replica reads from and writes to the same Redis instance.
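
For example, a minimal nginx config for this setup might look like the sketch below. The upstream names and ports are illustrative; the key point is that no session affinity (sticky sessions) is needed, because conversation state lives in Redis rather than on any one replica:

```nginx
# Hypothetical upstream of two FastAPI replicas; names and ports are illustrative
upstream agent_api {
    server api-1:8000;
    server api-2:8000;
}

server {
    listen 80;

    location /chat {
        # No ip_hash / sticky sessions required: state lives in Redis
        proxy_pass http://agent_api;
    }
}
```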

### Scenario 2: Background Worker Processing

A web server enqueues agent tasks, and background workers process them asynchronously — both sharing session state.

```python
# worker.py — runs as a separate process
import asyncio
from agents import Agent, Runner
from agents.extensions.sessions import RedisSession

session = RedisSession.from_url("redis://redis:6379/0")

analysis_agent = Agent(
    name="AnalysisAgent",
    instructions="Analyze the data provided and generate insights.",
)

async def process_job(job: dict):
    """Background worker picks up jobs from a queue."""
    result = await Runner.run(
        analysis_agent,
        job["message"],
        session=session,
        session_id=job["session_id"],
    )
    # Store result back in Redis or a database
    return result.final_output
```
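
The queue plumbing this scenario assumes can be sketched with a plain Redis list — `LPUSH` on the web-server side, blocking `BRPOP` in the worker. The queue name and helper functions below are illustrative, not part of the SDK:

```python
import json

QUEUE_KEY = "myapp:agent_jobs"  # illustrative queue name

def encode_job(session_id: str, message: str) -> bytes:
    """Serialize a job payload for the Redis list."""
    return json.dumps({"session_id": session_id, "message": message}).encode()

def decode_job(raw: bytes) -> dict:
    """Deserialize a job payload pulled off the queue."""
    return json.loads(raw)

async def enqueue_job(client, session_id: str, message: str) -> None:
    # Web-server side: push the job onto the left end of the list
    await client.lpush(QUEUE_KEY, encode_job(session_id, message))

async def worker_loop(client, handler) -> None:
    # Worker side: block until a job arrives, then hand it to the handler
    while True:
        _key, raw = await client.brpop(QUEUE_KEY)  # returns a (key, value) pair
        await handler(decode_job(raw))
```

Here `handler` would be the `process_job` coroutine above, and `client` is the same async Redis client the session uses.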

### Scenario 3: Multi-Region Deployment

For global deployments, use Redis Cluster or a managed service like AWS ElastiCache Global Datastore:

```python
import redis.asyncio as aioredis
from agents.extensions.sessions import RedisSession

# Redis Cluster for multi-region
redis_client = aioredis.RedisCluster.from_url(
    "redis://redis-cluster.example.com:6379",
    decode_responses=False,
)

session = RedisSession(client=redis_client)
```

## Session Sharing Across Instances

The key to session sharing is consistent session IDs. If two workers use the same `session_id`, they see the same history. This means your session ID scheme matters.

```python
def generate_session_id(user_id: str, conversation_id: str) -> str:
    """Deterministic session ID from user and conversation."""
    return f"session:{user_id}:{conversation_id}"

# Worker A handles turn 1
await Runner.run(agent, "Hello", session=session, session_id="session:usr_1:conv_1")

# Worker B handles turn 2 — sees turn 1 in history
await Runner.run(agent, "Follow up", session=session, session_id="session:usr_1:conv_1")
```

### Key Prefix and Namespace Strategy

In production, use prefixes to avoid key collisions with other Redis data:

```python
session = RedisSession.from_url(
    "redis://redis:6379/0",
    key_prefix="myapp:agent_sessions:",
)
```

Now session keys are stored as `myapp:agent_sessions:{session_id}` — cleanly namespaced and easy to monitor or purge.
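
Purging a namespace is best done with `SCAN` rather than `KEYS`, so the deletion walks the keyspace incrementally instead of blocking the server. A minimal sketch — the `purge_sessions` helper and its defaults are illustrative, not part of the SDK:

```python
async def purge_sessions(client, key_prefix: str = "myapp:agent_sessions:",
                         match: str = "*") -> int:
    """Delete every session key under the prefix, non-blockingly.

    `client` is any redis.asyncio-compatible client; `match` narrows the
    purge, e.g. "session:usr_1:*" for one user's conversations.
    """
    deleted = 0
    # scan_iter walks the keyspace in batches instead of blocking like KEYS
    async for key in client.scan_iter(match=f"{key_prefix}{match}"):
        deleted += await client.delete(key)
    return deleted
```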

## Production Redis Configuration

### Connection Pooling

```python
import redis.asyncio as aioredis
from agents.extensions.sessions import RedisSession

redis_client = aioredis.from_url(
    "redis://redis:6379/0",
    max_connections=50,
    socket_connect_timeout=5,
    socket_timeout=5,
    retry_on_timeout=True,
    decode_responses=False,
)

session = RedisSession(client=redis_client)
```

### Health Checks

Verify Redis connectivity at startup:

```python
async def check_redis_health(session: RedisSession):
    try:
        await session.client.ping()
        print("Redis connection healthy")
    except Exception as e:
        print(f"Redis connection failed: {e}")
        raise SystemExit(1)
```
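
In orchestrated environments a worker pod often starts before Redis is reachable, so it helps to wrap the ping in a retry with exponential backoff rather than failing on the first attempt. A sketch — the helper name and timings are illustrative:

```python
import asyncio

async def wait_for_redis(ping, attempts: int = 5, base_delay: float = 0.5) -> bool:
    """Retry an async ping with exponential backoff; True once it succeeds.

    `ping` is any zero-argument coroutine function, e.g. session.client.ping.
    """
    for attempt in range(attempts):
        try:
            await ping()
            return True
        except Exception:
            # Back off 0.5s, 1s, 2s, 4s ... between attempts
            await asyncio.sleep(base_delay * 2 ** attempt)
    return False
```

At startup: `if not await wait_for_redis(session.client.ping): raise SystemExit(1)`.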

### TTL for Session Expiry

Redis supports TTL natively. Set expiration on session keys to automatically clean up stale conversations:

```python
# After storing a session, set TTL
await session.client.expire(
    f"myapp:agent_sessions:{session_id}",
    60 * 60 * 24 * 7  # 7 days
)
```

### Monitoring

Monitor your session keys with Redis CLI or a dashboard:

```bash
# Count active sessions (--scan iterates incrementally; KEYS would block the server)
redis-cli --scan --pattern "myapp:agent_sessions:*" | wc -l

# Check memory usage for a session
redis-cli MEMORY USAGE "myapp:agent_sessions:session:usr_1:conv_1"
```

## Redis vs SQLite: When to Choose Each

| Factor | SQLiteSession | RedisSession |
| --- | --- | --- |
| Deployment | Single server | Multi-server / distributed |
| Latency | Disk I/O | Sub-millisecond (in-memory) |
| Persistence | Automatic (file) | Requires AOF/RDB config |
| Scaling | Vertical only | Horizontal with clustering |
| Dependencies | None (built-in) | Redis server required |
| TTL support | Manual cleanup | Native |

For most production web applications, RedisSession is the right default. It adds minimal complexity — you likely already run Redis — and it unlocks horizontal scaling from day one.

**Sources:**

- [https://openai.github.io/openai-agents-python/sessions/](https://openai.github.io/openai-agents-python/sessions/)
- [https://redis.io/docs/latest/develop/clients/redis-py/](https://redis.io/docs/latest/develop/clients/redis-py/)

