Technology · 10 min read

Building Agentic AI with Redis: Caching, Sessions, and Pub/Sub Patterns

Master Redis patterns for agentic AI including LLM response caching, conversation sessions, pub/sub for real-time events, and agent performance optimization.

Redis as the Real-Time Backbone of Agentic AI

Every production agentic AI system needs speed that relational databases cannot deliver. Sub-millisecond session lookups for active conversations. Instant cache hits for repeated LLM queries. Real-time event propagation when an agent completes a tool call. Atomic counters for rate limiting. Leaderboards for agent performance tracking.

Redis fills all these roles. It is the Swiss Army knife of agentic AI infrastructure — not the primary data store (that remains PostgreSQL), but the performance layer that makes the difference between a sluggish agent and one that responds in under a second.

At CallSphere, Redis handles session management, LLM response caching, real-time event distribution, and operational metrics across all our agent deployments. This guide covers the Redis patterns we use in production.

Pattern 1: LLM Response Caching

LLM API calls are the single most expensive operation in an agentic AI system — expensive in both latency (1-10 seconds) and cost (dollars per thousand calls). Caching identical or near-identical requests saves both.

Exact Match Caching

The simplest and most effective caching strategy: hash the prompt and cache the response.

import hashlib
import json
import redis.asyncio as redis

class LLMCache:
    def __init__(self, redis_client: redis.Redis, default_ttl: int = 3600):
        self.redis = redis_client
        self.default_ttl = default_ttl

    def _cache_key(self, model: str, messages: list, tools: list | None = None) -> str:
        """Generate a deterministic cache key from the request."""
        payload = {
            "model": model,
            "messages": messages,
            "tools": sorted(tools or [], key=lambda t: t["name"]) if tools else [],
        }
        content = json.dumps(payload, sort_keys=True)
        return f"llm:cache:{hashlib.sha256(content.encode()).hexdigest()}"

    async def get(self, model: str, messages: list, tools: list | None = None) -> dict | None:
        key = self._cache_key(model, messages, tools)
        cached = await self.redis.get(key)
        if cached:
            await self.redis.hincrby("llm:cache:stats", "hits", 1)
            return json.loads(cached)
        await self.redis.hincrby("llm:cache:stats", "misses", 1)
        return None

    async def set(self, model: str, messages: list, response: dict,
                  tools: list | None = None, ttl: int | None = None):
        key = self._cache_key(model, messages, tools)
        await self.redis.setex(
            key,
            ttl or self.default_ttl,
            json.dumps(response),
        )

    async def get_hit_rate(self) -> float:
        stats = await self.redis.hgetall("llm:cache:stats")
        hits = int(stats.get(b"hits", 0))
        misses = int(stats.get(b"misses", 0))
        total = hits + misses
        return hits / total if total > 0 else 0.0
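To make the class concrete, here is the cache-aside flow around an LLM call. The wrapper and the in-memory stand-in below use names of ours for illustration (a dict replaces Redis so the flow is runnable without a server); in production you would pass the Redis-backed `LLMCache` above.

```python
import asyncio
import hashlib
import json

# Minimal in-memory stand-in for the Redis-backed LLMCache above,
# so the cache-aside flow can be exercised without a live server.
class InMemoryLLMCache:
    def __init__(self):
        self.store = {}
        self.hits = 0
        self.misses = 0

    def _key(self, model, messages):
        content = json.dumps({"model": model, "messages": messages}, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()

    async def get(self, model, messages):
        key = self._key(model, messages)
        if key in self.store:
            self.hits += 1
            return self.store[key]
        self.misses += 1
        return None

    async def set(self, model, messages, response):
        self.store[self._key(model, messages)] = response

async def cached_completion(cache, llm_call, model, messages):
    """Cache-aside: check the cache first, fall through to the LLM on a miss."""
    cached = await cache.get(model, messages)
    if cached is not None:
        return cached
    response = await llm_call(model, messages)
    await cache.set(model, messages, response)
    return response
```

On the first call the LLM runs and the response is stored; a second identical call is served from the cache without touching the model.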

When to Cache (and When Not To)

| Cache This | Do Not Cache This |
| --- | --- |
| Classification/routing decisions | Conversations with user-specific context |
| Tool parameter extraction from templates | Creative or generative responses |
| FAQ-style questions | Time-sensitive queries (weather, stock prices) |
| System prompt + fixed input combinations | Multi-turn conversations |
| Embedding generation for static content | Responses that include PII |

Semantic Caching with Embeddings

For near-identical queries (e.g., "What are your business hours?" and "When are you open?"), use embedding similarity to find cached responses:

import numpy as np

class SemanticLLMCache:
    def __init__(self, redis_client, embedding_client, similarity_threshold=0.95):
        self.redis = redis_client
        self.embedder = embedding_client
        self.threshold = similarity_threshold

    async def get_semantic(self, query: str) -> dict | None:
        query_embedding = await self.embedder.embed(query)

        # Search cached embeddings (stored in a Redis sorted set by timestamp).
        # This linear scan costs one GET per candidate; at larger scale,
        # use a vector index (e.g., RediSearch KNN search) instead.
        cached_keys = await self.redis.zrevrangebyscore(
            "llm:semantic_cache:keys", "+inf", "-inf", start=0, num=100
        )

        for key in cached_keys:
            cached_embedding = await self.redis.get(f"llm:semantic_cache:emb:{key.decode()}")
            if cached_embedding:
                cached_vec = np.frombuffer(cached_embedding, dtype=np.float32)
                similarity = np.dot(query_embedding, cached_vec) / (
                    np.linalg.norm(query_embedding) * np.linalg.norm(cached_vec)
                )
                if similarity >= self.threshold:
                    response = await self.redis.get(f"llm:semantic_cache:resp:{key.decode()}")
                    if response:
                        return json.loads(response)
        return None
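Two details the get path above depends on: the cosine computation, and the fact that embeddings must round-trip through raw float32 bytes for `np.frombuffer` to recover them. Factored out as standalone helpers (the names are ours, not redis-py API):

```python
import numpy as np

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """The same formula SemanticLLMCache.get_semantic applies inline."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

def embedding_to_bytes(vec) -> bytes:
    """Serialize an embedding for SET so that
    np.frombuffer(..., dtype=np.float32) round-trips it exactly."""
    return np.asarray(vec, dtype=np.float32).tobytes()
```

Storing the embedding in any other dtype (float64 is numpy's default) would silently corrupt the similarity scores on read-back.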

Pattern 2: Conversation Session Storage

Active conversations need fast read/write access to session state. Redis hashes map naturally to conversation sessions.

from datetime import datetime

class ConversationSession:
    PREFIX = "session:conv"

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def create(self, conversation_id: str, initial_state: dict) -> None:
        key = f"{self.PREFIX}:{conversation_id}"
        pipe = self.redis.pipeline()
        pipe.hset(key, mapping={
            "tenant_id": initial_state["tenant_id"],
            "current_agent": initial_state.get("current_agent", "triage"),
            "status": "active",
            "turn_count": 0,
            "token_count": 0,
            "started_at": datetime.utcnow().isoformat(),
            "context": json.dumps(initial_state.get("context", {})),
            "messages": json.dumps([]),
        })
        # Auto-expire after 2 hours of inactivity
        pipe.expire(key, 7200)
        await pipe.execute()

    async def add_message(self, conversation_id: str, role: str, content: str,
                          tokens: int = 0) -> None:
        key = f"{self.PREFIX}:{conversation_id}"
        pipe = self.redis.pipeline()

        # Append message to the messages list. Note: this read-modify-write
        # is not atomic; if multiple workers can write to the same
        # conversation, serialize them (e.g., a per-conversation lock).
        messages_raw = await self.redis.hget(key, "messages")
        messages = json.loads(messages_raw) if messages_raw else []
        messages.append({"role": role, "content": content, "ts": datetime.utcnow().isoformat()})

        # Keep only last 50 messages in session (full history in PostgreSQL)
        if len(messages) > 50:
            messages = messages[-50:]

        pipe.hset(key, mapping={
            "messages": json.dumps(messages),
            "last_activity": datetime.utcnow().isoformat(),
        })
        pipe.hincrby(key, "turn_count", 1)
        pipe.hincrby(key, "token_count", tokens)
        pipe.expire(key, 7200)  # Reset TTL on activity
        await pipe.execute()

    async def get(self, conversation_id: str) -> dict | None:
        key = f"{self.PREFIX}:{conversation_id}"
        data = await self.redis.hgetall(key)
        if not data:
            return None
        result = {k.decode(): v.decode() for k, v in data.items()}
        result["messages"] = json.loads(result.get("messages", "[]"))
        result["context"] = json.loads(result.get("context", "{}"))
        return result

    async def update_agent(self, conversation_id: str, agent_name: str) -> None:
        key = f"{self.PREFIX}:{conversation_id}"
        await self.redis.hset(key, "current_agent", agent_name)

Why Redis for Sessions Instead of PostgreSQL?

The conversation session is read and updated on every single message. That is 2-10 database operations per user turn. With Redis, each operation takes under 1ms. With PostgreSQL, each operation takes 2-10ms and holds a connection from the pool. At 1000 concurrent conversations, that difference determines whether your system stays responsive or starts queuing.

The pattern is: Redis for the hot session, PostgreSQL for the cold record. When a conversation completes, flush the session data to PostgreSQL for long-term storage and analytics.
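The flush step is mostly a type conversion: Redis hash fields come back as strings, while the PostgreSQL row wants typed columns. A sketch of that mapping, assuming the field names used by `ConversationSession` above (the actual INSERT through your driver of choice is omitted):

```python
def session_to_record(session: dict) -> dict:
    """Map a hot Redis session (string-valued hash fields, as returned by
    ConversationSession.get) to typed columns for a PostgreSQL insert."""
    return {
        "tenant_id": session["tenant_id"],
        "final_agent": session.get("current_agent", "triage"),
        "status": session.get("status", "completed"),
        "turn_count": int(session.get("turn_count", 0)),
        "token_count": int(session.get("token_count", 0)),
        "started_at": session.get("started_at"),
        "messages": session.get("messages", []),  # already JSON-decoded by get()
    }
```

Run this conversion at conversation close, write the record, then let the Redis TTL reclaim the hot copy.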


Pattern 3: Pub/Sub for Real-Time Agent Events

When an agent finishes processing, completes a tool call, or hands off a conversation, other system components need to know immediately. Redis Pub/Sub provides fire-and-forget event distribution with negligible latency.

class AgentEventBus:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.pubsub = self.redis.pubsub()
        self.handlers = {}

    async def publish(self, event_type: str, payload: dict) -> None:
        message = json.dumps({
            "event": event_type,
            "payload": payload,
            "timestamp": datetime.utcnow().isoformat(),
        })
        await self.redis.publish(f"agent:events:{event_type}", message)

    async def subscribe(self, event_type: str, handler) -> None:
        self.handlers[event_type] = handler
        await self.pubsub.subscribe(f"agent:events:{event_type}")

    async def listen(self) -> None:
        async for message in self.pubsub.listen():
            if message["type"] == "message":
                data = json.loads(message["data"])
                handler = self.handlers.get(data["event"])
                if handler:
                    await handler(data["payload"])

# Usage
event_bus = AgentEventBus(redis_client)

# Frontend WebSocket server subscribes to conversation events
await event_bus.subscribe("agent.response.streaming", send_to_websocket)
await event_bus.subscribe("agent.tool.started", update_ui_status)
await event_bus.subscribe("agent.handoff.completed", update_ui_agent)

# Agent publishes events during processing
await event_bus.publish("agent.response.streaming", {
    "conversation_id": conv_id,
    "chunk": "Let me look up your invoice...",
    "agent": "billing_agent",
})

await event_bus.publish("agent.tool.started", {
    "conversation_id": conv_id,
    "tool": "lookup_invoice",
    "agent": "billing_agent",
})

Pub/Sub vs. Streams for Agent Events

Redis Pub/Sub is fire-and-forget — if no subscriber is listening, the message is lost. For UI updates and real-time notifications, this is fine. For events that must not be lost (tool execution requests, handoff commands), use Redis Streams instead:

class DurableAgentEventBus:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def publish(self, stream: str, event: dict) -> str:
        event_id = await self.redis.xadd(stream, event, maxlen=10000)
        return event_id

    async def consume(self, stream: str, group: str, consumer: str):
        # Create consumer group if not exists
        try:
            await self.redis.xgroup_create(stream, group, id="0", mkstream=True)
        except redis.ResponseError as e:
            if "BUSYGROUP" not in str(e):
                raise  # ignore only "consumer group already exists"

        while True:
            messages = await self.redis.xreadgroup(
                group, consumer, {stream: ">"}, count=10, block=5000
            )
            for _, entries in messages:
                for msg_id, data in entries:
                    yield msg_id, data
                    await self.redis.xack(stream, group, msg_id)

Pattern 4: Sorted Sets for Agent Performance Leaderboards

Track and rank agent performance metrics using Redis sorted sets. This is useful for operational dashboards and for identifying which agents need prompt optimization.

class AgentLeaderboard:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def record_resolution(self, agent_name: str, resolution_time_seconds: float,
                                 was_successful: bool) -> None:
        today = datetime.utcnow().strftime("%Y-%m-%d")
        pipe = self.redis.pipeline()

        # Track successful resolutions
        if was_successful:
            pipe.zincrby(f"leaderboard:resolutions:{today}", 1, agent_name)

        # Track average resolution time (using two sorted sets)
        pipe.zincrby(f"leaderboard:total_time:{today}", resolution_time_seconds, agent_name)
        pipe.zincrby(f"leaderboard:total_count:{today}", 1, agent_name)

        # Expire after 30 days
        pipe.expire(f"leaderboard:resolutions:{today}", 2592000)
        pipe.expire(f"leaderboard:total_time:{today}", 2592000)
        pipe.expire(f"leaderboard:total_count:{today}", 2592000)
        await pipe.execute()

    async def get_top_agents(self, date: str, limit: int = 10) -> list:
        agents = await self.redis.zrevrangebyscore(
            f"leaderboard:resolutions:{date}", "+inf", "-inf",
            start=0, num=limit, withscores=True,
        )
        results = []
        for agent_name, resolution_count in agents:
            name = agent_name.decode()
            total_time = await self.redis.zscore(f"leaderboard:total_time:{date}", name) or 0
            total_count = await self.redis.zscore(f"leaderboard:total_count:{date}", name) or 1
            avg_time = total_time / total_count
            results.append({
                "agent": name,
                "resolutions": int(resolution_count),
                "avg_resolution_time_seconds": round(avg_time, 1),
            })
        return results

Pattern 5: Streams for Agent Audit Logs

Every agent action needs an audit trail. Redis Streams provide an append-only log with consumer group support, well suited to capturing agent decisions in real time and processing them asynchronously. Note that the maxlen cap in the code below trims the oldest entries, so ship them to durable storage before they age out.

class AgentAuditLog:
    STREAM_KEY = "agent:audit:log"

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def log_action(self, conversation_id: str, agent_name: str,
                          action: str, details: dict) -> str:
        entry = {
            "conversation_id": conversation_id,
            "agent": agent_name,
            "action": action,
            "details": json.dumps(details),
            "timestamp": datetime.utcnow().isoformat(),
        }
        msg_id = await self.redis.xadd(self.STREAM_KEY, entry, maxlen=100000)
        return msg_id

    async def get_conversation_audit(self, conversation_id: str, count: int = 100) -> list:
        """Get audit entries for a specific conversation (scan approach).

        Note: this scans the whole stream; for frequent lookups, maintain a
        per-conversation secondary index instead.
        """
        entries = []
        last_id = "-"
        while True:
            results = await self.redis.xrange(self.STREAM_KEY, min=last_id, count=500)
            if not results:
                break
            for msg_id, data in results:
                if data.get(b"conversation_id", b"").decode() == conversation_id:
                    entries.append({
                        "id": msg_id.decode(),
                        "agent": data[b"agent"].decode(),
                        "action": data[b"action"].decode(),
                        "details": json.loads(data[b"details"]),
                        "timestamp": data[b"timestamp"].decode(),
                    })
            # Exclusive range ("(" prefix, Redis 6.2+) so the last entry
            # is not re-read on the next iteration
            last_id = f"({results[-1][0].decode()}"
            if len(results) < 500:
                break
        return entries[:count]

Redis Connection Management

Connection Pooling

Never create a new Redis connection per request. Use a connection pool:

import redis.asyncio as redis

# Create a shared connection pool
redis_pool = redis.ConnectionPool.from_url(
    "redis://redis:6379/0",
    max_connections=50,
    decode_responses=False,
    socket_connect_timeout=5,
    socket_timeout=5,
    retry_on_timeout=True,
)

redis_client = redis.Redis(connection_pool=redis_pool)

Redis Cluster for Scale

When a single Redis instance is insufficient (typically above 100K concurrent conversations), use Redis Cluster to shard data across multiple nodes. Design your key naming to support hash tags for related keys:

# Keys for the same conversation use hash tag to ensure
# they land on the same shard
session_key = f"session:conv:{{{conversation_id}}}"
messages_key = f"messages:conv:{{{conversation_id}}}"
context_key = f"context:conv:{{{conversation_id}}}"
# The {conversation_id} hash tag ensures co-location

Frequently Asked Questions

How much memory does Redis need for agent sessions?

A typical conversation session with 50 messages consumes 10-50KB in Redis. With 10,000 concurrent conversations, that is 100-500MB. Add LLM response cache (typically 1-5GB depending on cache size and TTL) and operational counters (negligible). Plan for 4-8GB of Redis memory for a medium-scale agent deployment.
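That estimate is easy to keep as a formula so you can re-run it for your own numbers (the function name is ours):

```python
def estimate_redis_memory_mb(concurrent_sessions: int,
                             kb_per_session: float,
                             cache_gb: float) -> float:
    """Back-of-envelope Redis sizing: session working set plus LLM cache.
    Operational counters are negligible and omitted."""
    session_mb = concurrent_sessions * kb_per_session / 1024
    return session_mb + cache_gb * 1024
```

With the figures above (10,000 sessions at 10-50KB plus a 1-5GB cache), the range comes out to roughly 1.1-5.6GB, which is why 4-8GB is a comfortable provisioning target.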

Should I use Redis or Memcached for LLM response caching?

Redis. Memcached is faster for simple key-value lookups but lacks the data structures (hashes, sorted sets, streams, pub/sub) that agent systems need. You would end up running both Memcached and Redis, adding operational complexity for marginal performance gain on a single use case.

How do I handle Redis downtime without losing active conversations?

Design your agent system to degrade gracefully when Redis is unavailable. Fall back to PostgreSQL for session reads (slower but functional). Disable caching and accept higher LLM costs temporarily. Queue pub/sub events in memory and replay when Redis recovers. Use Redis Sentinel or Redis Cluster for automatic failover with sub-second recovery.
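The first of those fallbacks can be captured in a small read-through helper. This is a sketch under names of ours; with the real client you would catch `redis.exceptions.ConnectionError` rather than the builtin `ConnectionError` used here to keep the example dependency-free:

```python
import asyncio

async def get_session_with_fallback(redis_get, pg_get, conversation_id: str):
    """Read-through with graceful degradation: prefer the hot Redis session,
    fall back to the PostgreSQL cold record when Redis is unreachable.
    Returns (session, source) so callers can log which path served it."""
    try:
        session = await redis_get(conversation_id)
        if session is not None:
            return session, "redis"
    except ConnectionError:
        # Redis down: degrade to the slower durable path rather than fail.
        pass
    return await pg_get(conversation_id), "postgres"
```

A cache miss (Redis returns None) takes the same fallback path as an outage, which is the behavior you want when sessions can expire.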

What TTL should I set for LLM response cache entries?

It depends on how time-sensitive the content is. For FAQ-style responses and classification results, 24 hours is appropriate. For responses that reference live data (account balances, appointment availability), use 5-15 minutes or skip caching entirely. For embedding computations on static content, cache for 7 days or longer.

Can Redis Streams replace Kafka for agent event processing?

For small to medium agent deployments (under 50K events per second), Redis Streams are a simpler alternative to Kafka with similar semantics: append-only log, consumer groups, acknowledgment. Choose Kafka when you need multi-datacenter replication, longer retention (weeks or months), or higher throughput. Redis Streams are ideal when you already have Redis deployed and want to avoid adding another infrastructure component.


Written by

CallSphere Team
