Skip to content
Learn Agentic AI
Learn Agentic AI10 min read5 views

Custom Session Implementations: The SessionABC Protocol

Build custom session backends for the OpenAI Agents SDK by implementing the SessionABC protocol with complete DynamoDB and MongoDB examples and testing strategies.

When Built-In Sessions Are Not Enough

The OpenAI Agents SDK ships with SQLite, Redis, and SQLAlchemy sessions. For most applications, one of these fits. But sometimes you need something different:

  • Your organization standardizes on DynamoDB and does not run Redis or PostgreSQL
  • You want to store sessions in MongoDB alongside your document-oriented data
  • You need a session that integrates with a proprietary storage system
  • You want to add custom middleware (logging, metrics, validation) at the session layer

The SDK defines a clear protocol — SessionABC — that any custom session must implement. Build to this interface and your custom session works with every feature of the SDK: runners, compaction, encryption wrappers, and multi-agent handoffs.

The SessionABC Interface

SessionABC is an abstract base class with four required methods:

flowchart TD
    START["Custom Session Implementations: The SessionABC Pr…"] --> A
    A["When Built-In Sessions Are Not Enough"]
    A --> B
    B["The SessionABC Interface"]
    B --> C
    C["Building a DynamoDB Session"]
    C --> D
    D["Building a MongoDB Session"]
    D --> E
    E["Testing Custom Sessions"]
    E --> DONE["Key Takeaways"]
    style START fill:#4f46e5,stroke:#4338ca,color:#fff
    style DONE fill:#059669,stroke:#047857,color:#fff
from abc import ABC, abstractmethod
from typing import Any

class SessionABC(ABC):
    @abstractmethod
    async def get_items(self, session_id: str, limit: int | None = None) -> list[Any]:
        """Retrieve conversation items for a session."""
        ...

    @abstractmethod
    async def add_items(self, session_id: str, items: list[Any]) -> None:
        """Append new items to a session."""
        ...

    @abstractmethod
    async def pop_item(self, session_id: str) -> Any | None:
        """Remove and return the last item from a session."""
        ...

    @abstractmethod
    async def clear_session(self, session_id: str) -> None:
        """Delete all items for a session."""
        ...

Method Responsibilities

Method Purpose Called By
get_items() Load history before each run Runner (pre-run)
add_items() Persist new turns after each run Runner (post-run)
pop_item() Remove last item (undo/correction) Manual or compaction
clear_session() Wipe entire session Manual cleanup

Building a DynamoDB Session

Let us build a complete DynamoDB-backed session. DynamoDB is a natural fit for session data: it scales automatically, supports TTL, and is serverless.

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

flowchart TD
    ROOT["Custom Session Implementations: The SessionA…"] 
    ROOT --> P0["The SessionABC Interface"]
    P0 --> P0C0["Method Responsibilities"]
    ROOT --> P1["Building a DynamoDB Session"]
    P1 --> P1C0["Table Design"]
    P1 --> P1C1["Implementation"]
    P1 --> P1C2["Using the DynamoDB Session"]
    ROOT --> P2["Building a MongoDB Session"]
    P2 --> P2C0["Using the MongoDB Session"]
    style ROOT fill:#4f46e5,stroke:#4338ca,color:#fff
    style P0 fill:#e0e7ff,stroke:#6366f1,color:#1e293b
    style P1 fill:#e0e7ff,stroke:#6366f1,color:#1e293b
    style P2 fill:#e0e7ff,stroke:#6366f1,color:#1e293b

Table Design

DynamoDB table: agent_sessions

  • Partition key: session_id (String)
  • Sort key: item_order (Number)
  • TTL attribute: expires_at (Number — Unix timestamp)

Implementation

import json
import time
from typing import Any
import boto3
from botocore.config import Config
from agents.extensions.sessions import SessionABC

class DynamoDBSession(SessionABC):
    """DynamoDB-backed session for the OpenAI Agents SDK."""

    def __init__(
        self,
        table_name: str = "agent_sessions",
        region: str = "us-east-1",
        ttl_seconds: int | None = None,
    ):
        config = Config(region_name=region)
        self.dynamodb = boto3.resource("dynamodb", config=config)
        self.table = self.dynamodb.Table(table_name)
        self.ttl_seconds = ttl_seconds

    async def get_items(
        self, session_id: str, limit: int | None = None
    ) -> list[Any]:
        """Retrieve items from DynamoDB, ordered by item_order."""
        kwargs = {
            "KeyConditionExpression": "session_id = :sid",
            "ExpressionAttributeValues": {":sid": session_id},
            "ScanIndexForward": True,  # Ascending order
        }

        if limit is not None:
            kwargs["Limit"] = limit

        response = self.table.query(**kwargs)
        items = response.get("Items", [])

        return [json.loads(item["item_data"]) for item in items]

    async def add_items(self, session_id: str, items: list[Any]) -> None:
        """Append items to the session in DynamoDB."""
        # Get current max item_order
        response = self.table.query(
            KeyConditionExpression="session_id = :sid",
            ExpressionAttributeValues={":sid": session_id},
            ScanIndexForward=False,
            Limit=1,
            ProjectionExpression="item_order",
        )

        existing = response.get("Items", [])
        next_order = (existing[0]["item_order"] + 1) if existing else 0

        # Batch write new items
        with self.table.batch_writer() as batch:
            for i, item in enumerate(items):
                record = {
                    "session_id": session_id,
                    "item_order": next_order + i,
                    "item_data": json.dumps(item),
                    "created_at": int(time.time()),
                }

                if self.ttl_seconds:
                    record["expires_at"] = int(time.time()) + self.ttl_seconds

                batch.put_item(Item=record)

    async def pop_item(self, session_id: str) -> Any | None:
        """Remove and return the last item."""
        response = self.table.query(
            KeyConditionExpression="session_id = :sid",
            ExpressionAttributeValues={":sid": session_id},
            ScanIndexForward=False,
            Limit=1,
        )

        items = response.get("Items", [])
        if not items:
            return None

        last_item = items[0]
        self.table.delete_item(
            Key={
                "session_id": session_id,
                "item_order": last_item["item_order"],
            }
        )

        return json.loads(last_item["item_data"])

    async def clear_session(self, session_id: str) -> None:
        """Delete all items for a session."""
        response = self.table.query(
            KeyConditionExpression="session_id = :sid",
            ExpressionAttributeValues={":sid": session_id},
            ProjectionExpression="session_id, item_order",
        )

        with self.table.batch_writer() as batch:
            for item in response["Items"]:
                batch.delete_item(
                    Key={
                        "session_id": item["session_id"],
                        "item_order": item["item_order"],
                    }
                )

Using the DynamoDB Session

from agents import Agent, Runner

session = DynamoDBSession(
    table_name="agent_sessions",
    region="us-east-1",
    ttl_seconds=60 * 60 * 24 * 7,  # 7-day TTL
)

agent = Agent(name="DynamoAgent", instructions="You remember conversations.")

result = await Runner.run(
    agent, "Hello!", session=session, session_id="user-abc-conv-1"
)

Building a MongoDB Session

MongoDB is another popular choice, especially for teams already using it as their primary datastore.

flowchart TD
    CENTER(("Core Concepts"))
    CENTER --> N0["Your organization standardizes on Dynam…"]
    CENTER --> N1["You want to store sessions in MongoDB a…"]
    CENTER --> N2["You need a session that integrates with…"]
    CENTER --> N3["You want to add custom middleware loggi…"]
    CENTER --> N4["Partition key: session_id String"]
    CENTER --> N5["Sort key: item_order Number"]
    style CENTER fill:#4f46e5,stroke:#4338ca,color:#fff
import json
from typing import Any
from motor.motor_asyncio import AsyncIOMotorClient
from agents.extensions.sessions import SessionABC

class MongoDBSession(SessionABC):
    """MongoDB-backed session using Motor async driver."""

    def __init__(self, mongo_url: str, database: str = "agents", collection: str = "sessions"):
        self.client = AsyncIOMotorClient(mongo_url)
        self.collection = self.client[database][collection]

    async def initialize(self):
        """Create indexes for efficient queries."""
        await self.collection.create_index(
            [("session_id", 1), ("item_order", 1)],
            unique=True,
        )
        await self.collection.create_index("session_id")

    async def get_items(
        self, session_id: str, limit: int | None = None
    ) -> list[Any]:
        cursor = self.collection.find(
            {"session_id": session_id},
            sort=[("item_order", 1)],
        )

        if limit is not None:
            cursor = cursor.limit(limit)

        items = []
        async for doc in cursor:
            items.append(doc["item_data"])

        return items

    async def add_items(self, session_id: str, items: list[Any]) -> None:
        # Get current max order
        last_doc = await self.collection.find_one(
            {"session_id": session_id},
            sort=[("item_order", -1)],
            projection={"item_order": 1},
        )

        next_order = (last_doc["item_order"] + 1) if last_doc else 0

        documents = [
            {
                "session_id": session_id,
                "item_order": next_order + i,
                "item_data": item,
                "created_at": time.time(),
            }
            for i, item in enumerate(items)
        ]

        if documents:
            await self.collection.insert_many(documents)

    async def pop_item(self, session_id: str) -> Any | None:
        last_doc = await self.collection.find_one_and_delete(
            {"session_id": session_id},
            sort=[("item_order", -1)],
        )

        if last_doc:
            return last_doc["item_data"]
        return None

    async def clear_session(self, session_id: str) -> None:
        await self.collection.delete_many({"session_id": session_id})

Using the MongoDB Session

import asyncio
from agents import Agent, Runner

async def main():
    session = MongoDBSession("mongodb://localhost:27017")
    await session.initialize()

    agent = Agent(name="MongoAgent", instructions="You are helpful.")

    result = await Runner.run(
        agent, "Remember: my API key rotates on Fridays.",
        session=session, session_id="ops-team-conv"
    )
    print(result.final_output)

asyncio.run(main())

Testing Custom Sessions

Every custom session should pass a standard test suite that validates the SessionABC contract:

import pytest
import asyncio

async def session_contract_tests(session):
    """Test suite that validates any SessionABC implementation."""
    sid = "test-session-001"

    # Start clean
    await session.clear_session(sid)

    # Test empty session
    items = await session.get_items(sid)
    assert items == [], "Empty session should return empty list"

    # Test add and retrieve
    await session.add_items(sid, [{"role": "user", "content": "Hello"}])
    items = await session.get_items(sid)
    assert len(items) == 1
    assert items[0]["content"] == "Hello"

    # Test multiple adds
    await session.add_items(sid, [
        {"role": "assistant", "content": "Hi there"},
        {"role": "user", "content": "How are you?"},
    ])
    items = await session.get_items(sid)
    assert len(items) == 3

    # Test ordering
    assert items[0]["content"] == "Hello"
    assert items[1]["content"] == "Hi there"
    assert items[2]["content"] == "How are you?"

    # Test limit
    items = await session.get_items(sid, limit=2)
    assert len(items) == 2

    # Test pop_item
    popped = await session.pop_item(sid)
    assert popped["content"] == "How are you?"
    items = await session.get_items(sid)
    assert len(items) == 2

    # Test pop from empty
    await session.clear_session(sid)
    popped = await session.pop_item(sid)
    assert popped is None

    # Test session isolation
    await session.add_items("session-a", [{"data": "a"}])
    await session.add_items("session-b", [{"data": "b"}])
    assert (await session.get_items("session-a"))[0]["data"] == "a"
    assert (await session.get_items("session-b"))[0]["data"] == "b"

    # Cleanup
    await session.clear_session("session-a")
    await session.clear_session("session-b")

    print("All contract tests passed!")

# Run against your implementation
# asyncio.run(session_contract_tests(DynamoDBSession()))
# asyncio.run(session_contract_tests(MongoDBSession("mongodb://localhost")))

Building to the SessionABC protocol means your custom session integrates seamlessly with all SDK features — compaction, encryption, session sharing, and multi-agent handoffs work out of the box.

Sources:

Share
C

Written by

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.

Related Articles You May Like

Technical Guides

How AI Voice Agents Actually Work: Technical Deep Dive (2026 Edition)

A full technical walkthrough of how modern AI voice agents work — speech-to-text, LLM orchestration, TTS, tool calling, and sub-second latency.

Technical Guides

Voice AI Latency: Why Sub-Second Response Time Matters (And How to Hit It)

A technical breakdown of voice AI latency budgets — STT, LLM, TTS, network — and how to hit sub-second end-to-end response times.

Technical Guides

Building Voice Agents with the OpenAI Realtime API: Full Tutorial

Hands-on tutorial for building voice agents with the OpenAI Realtime API — WebSocket setup, PCM16 audio, server VAD, and function calling.

AI Interview Prep

8 AI System Design Interview Questions Actually Asked at FAANG in 2026

Real AI system design interview questions from Google, Meta, OpenAI, and Anthropic. Covers LLM serving, RAG pipelines, recommendation systems, AI agents, and more — with detailed answer frameworks.

AI Interview Prep

8 LLM & RAG Interview Questions That OpenAI, Anthropic & Google Actually Ask

Real LLM and RAG interview questions from top AI labs in 2026. Covers fine-tuning vs RAG decisions, production RAG pipelines, evaluation, PEFT methods, positional embeddings, and safety guardrails with expert answers.

AI Interview Prep

7 ML Fundamentals Questions That Top AI Companies Still Ask in 2026

Real machine learning fundamentals interview questions from OpenAI, Google DeepMind, Meta, and xAI in 2026. Covers attention mechanisms, KV cache, distributed training, MoE, speculative decoding, and emerging architectures.