Custom Session Implementations: The SessionABC Protocol
Build custom session backends for the OpenAI Agents SDK by implementing the SessionABC protocol with complete DynamoDB and MongoDB examples and testing strategies.
When Built-In Sessions Are Not Enough
The OpenAI Agents SDK ships with SQLite, Redis, and SQLAlchemy sessions. For most applications, one of these fits. But sometimes you need something different:
- Your organization standardizes on DynamoDB and does not run Redis or PostgreSQL
- You want to store sessions in MongoDB alongside your document-oriented data
- You need a session that integrates with a proprietary storage system
- You want to add custom middleware (logging, metrics, validation) at the session layer
The SDK defines a clear protocol — SessionABC — that any custom session must implement. Build to this interface and your custom session works with every feature of the SDK: runners, compaction, encryption wrappers, and multi-agent handoffs.
The SessionABC Interface
SessionABC is an abstract base class with four required methods:
flowchart TD
START["Custom Session Implementations: The SessionABC Pr…"] --> A
A["When Built-In Sessions Are Not Enough"]
A --> B
B["The SessionABC Interface"]
B --> C
C["Building a DynamoDB Session"]
C --> D
D["Building a MongoDB Session"]
D --> E
E["Testing Custom Sessions"]
E --> DONE["Key Takeaways"]
style START fill:#4f46e5,stroke:#4338ca,color:#fff
style DONE fill:#059669,stroke:#047857,color:#fff
from abc import ABC, abstractmethod
from typing import Any
class SessionABC(ABC):
@abstractmethod
async def get_items(self, session_id: str, limit: int | None = None) -> list[Any]:
"""Retrieve conversation items for a session."""
...
@abstractmethod
async def add_items(self, session_id: str, items: list[Any]) -> None:
"""Append new items to a session."""
...
@abstractmethod
async def pop_item(self, session_id: str) -> Any | None:
"""Remove and return the last item from a session."""
...
@abstractmethod
async def clear_session(self, session_id: str) -> None:
"""Delete all items for a session."""
...
Method Responsibilities
| Method | Purpose | Called By |
|---|---|---|
get_items() |
Load history before each run | Runner (pre-run) |
add_items() |
Persist new turns after each run | Runner (post-run) |
pop_item() |
Remove last item (undo/correction) | Manual or compaction |
clear_session() |
Wipe entire session | Manual cleanup |
Building a DynamoDB Session
Let us build a complete DynamoDB-backed session. DynamoDB is a natural fit for session data: it scales automatically, supports TTL, and is serverless.
See AI Voice Agents Handle Real Calls
Book a free demo or calculate how much you can save with AI voice automation.
flowchart TD
ROOT["Custom Session Implementations: The SessionA…"]
ROOT --> P0["The SessionABC Interface"]
P0 --> P0C0["Method Responsibilities"]
ROOT --> P1["Building a DynamoDB Session"]
P1 --> P1C0["Table Design"]
P1 --> P1C1["Implementation"]
P1 --> P1C2["Using the DynamoDB Session"]
ROOT --> P2["Building a MongoDB Session"]
P2 --> P2C0["Using the MongoDB Session"]
style ROOT fill:#4f46e5,stroke:#4338ca,color:#fff
style P0 fill:#e0e7ff,stroke:#6366f1,color:#1e293b
style P1 fill:#e0e7ff,stroke:#6366f1,color:#1e293b
style P2 fill:#e0e7ff,stroke:#6366f1,color:#1e293b
Table Design
DynamoDB table: agent_sessions
- Partition key:
session_id(String) - Sort key:
item_order(Number) - TTL attribute:
expires_at(Number — Unix timestamp)
Implementation
import json
import time
from typing import Any
import boto3
from botocore.config import Config
from agents.extensions.sessions import SessionABC
class DynamoDBSession(SessionABC):
"""DynamoDB-backed session for the OpenAI Agents SDK."""
def __init__(
self,
table_name: str = "agent_sessions",
region: str = "us-east-1",
ttl_seconds: int | None = None,
):
config = Config(region_name=region)
self.dynamodb = boto3.resource("dynamodb", config=config)
self.table = self.dynamodb.Table(table_name)
self.ttl_seconds = ttl_seconds
async def get_items(
self, session_id: str, limit: int | None = None
) -> list[Any]:
"""Retrieve items from DynamoDB, ordered by item_order."""
kwargs = {
"KeyConditionExpression": "session_id = :sid",
"ExpressionAttributeValues": {":sid": session_id},
"ScanIndexForward": True, # Ascending order
}
if limit is not None:
kwargs["Limit"] = limit
response = self.table.query(**kwargs)
items = response.get("Items", [])
return [json.loads(item["item_data"]) for item in items]
async def add_items(self, session_id: str, items: list[Any]) -> None:
"""Append items to the session in DynamoDB."""
# Get current max item_order
response = self.table.query(
KeyConditionExpression="session_id = :sid",
ExpressionAttributeValues={":sid": session_id},
ScanIndexForward=False,
Limit=1,
ProjectionExpression="item_order",
)
existing = response.get("Items", [])
next_order = (existing[0]["item_order"] + 1) if existing else 0
# Batch write new items
with self.table.batch_writer() as batch:
for i, item in enumerate(items):
record = {
"session_id": session_id,
"item_order": next_order + i,
"item_data": json.dumps(item),
"created_at": int(time.time()),
}
if self.ttl_seconds:
record["expires_at"] = int(time.time()) + self.ttl_seconds
batch.put_item(Item=record)
async def pop_item(self, session_id: str) -> Any | None:
"""Remove and return the last item."""
response = self.table.query(
KeyConditionExpression="session_id = :sid",
ExpressionAttributeValues={":sid": session_id},
ScanIndexForward=False,
Limit=1,
)
items = response.get("Items", [])
if not items:
return None
last_item = items[0]
self.table.delete_item(
Key={
"session_id": session_id,
"item_order": last_item["item_order"],
}
)
return json.loads(last_item["item_data"])
async def clear_session(self, session_id: str) -> None:
"""Delete all items for a session."""
response = self.table.query(
KeyConditionExpression="session_id = :sid",
ExpressionAttributeValues={":sid": session_id},
ProjectionExpression="session_id, item_order",
)
with self.table.batch_writer() as batch:
for item in response["Items"]:
batch.delete_item(
Key={
"session_id": item["session_id"],
"item_order": item["item_order"],
}
)
Using the DynamoDB Session
from agents import Agent, Runner
session = DynamoDBSession(
table_name="agent_sessions",
region="us-east-1",
ttl_seconds=60 * 60 * 24 * 7, # 7-day TTL
)
agent = Agent(name="DynamoAgent", instructions="You remember conversations.")
result = await Runner.run(
agent, "Hello!", session=session, session_id="user-abc-conv-1"
)
Building a MongoDB Session
MongoDB is another popular choice, especially for teams already using it as their primary datastore.
flowchart TD
CENTER(("Core Concepts"))
CENTER --> N0["Your organization standardizes on Dynam…"]
CENTER --> N1["You want to store sessions in MongoDB a…"]
CENTER --> N2["You need a session that integrates with…"]
CENTER --> N3["You want to add custom middleware loggi…"]
CENTER --> N4["Partition key: session_id String"]
CENTER --> N5["Sort key: item_order Number"]
style CENTER fill:#4f46e5,stroke:#4338ca,color:#fff
import json
from typing import Any
from motor.motor_asyncio import AsyncIOMotorClient
from agents.extensions.sessions import SessionABC
class MongoDBSession(SessionABC):
"""MongoDB-backed session using Motor async driver."""
def __init__(self, mongo_url: str, database: str = "agents", collection: str = "sessions"):
self.client = AsyncIOMotorClient(mongo_url)
self.collection = self.client[database][collection]
async def initialize(self):
"""Create indexes for efficient queries."""
await self.collection.create_index(
[("session_id", 1), ("item_order", 1)],
unique=True,
)
await self.collection.create_index("session_id")
async def get_items(
self, session_id: str, limit: int | None = None
) -> list[Any]:
cursor = self.collection.find(
{"session_id": session_id},
sort=[("item_order", 1)],
)
if limit is not None:
cursor = cursor.limit(limit)
items = []
async for doc in cursor:
items.append(doc["item_data"])
return items
async def add_items(self, session_id: str, items: list[Any]) -> None:
# Get current max order
last_doc = await self.collection.find_one(
{"session_id": session_id},
sort=[("item_order", -1)],
projection={"item_order": 1},
)
next_order = (last_doc["item_order"] + 1) if last_doc else 0
documents = [
{
"session_id": session_id,
"item_order": next_order + i,
"item_data": item,
"created_at": time.time(),
}
for i, item in enumerate(items)
]
if documents:
await self.collection.insert_many(documents)
async def pop_item(self, session_id: str) -> Any | None:
last_doc = await self.collection.find_one_and_delete(
{"session_id": session_id},
sort=[("item_order", -1)],
)
if last_doc:
return last_doc["item_data"]
return None
async def clear_session(self, session_id: str) -> None:
await self.collection.delete_many({"session_id": session_id})
Using the MongoDB Session
import asyncio
from agents import Agent, Runner
async def main():
session = MongoDBSession("mongodb://localhost:27017")
await session.initialize()
agent = Agent(name="MongoAgent", instructions="You are helpful.")
result = await Runner.run(
agent, "Remember: my API key rotates on Fridays.",
session=session, session_id="ops-team-conv"
)
print(result.final_output)
asyncio.run(main())
Testing Custom Sessions
Every custom session should pass a standard test suite that validates the SessionABC contract:
import pytest
import asyncio
async def session_contract_tests(session):
"""Test suite that validates any SessionABC implementation."""
sid = "test-session-001"
# Start clean
await session.clear_session(sid)
# Test empty session
items = await session.get_items(sid)
assert items == [], "Empty session should return empty list"
# Test add and retrieve
await session.add_items(sid, [{"role": "user", "content": "Hello"}])
items = await session.get_items(sid)
assert len(items) == 1
assert items[0]["content"] == "Hello"
# Test multiple adds
await session.add_items(sid, [
{"role": "assistant", "content": "Hi there"},
{"role": "user", "content": "How are you?"},
])
items = await session.get_items(sid)
assert len(items) == 3
# Test ordering
assert items[0]["content"] == "Hello"
assert items[1]["content"] == "Hi there"
assert items[2]["content"] == "How are you?"
# Test limit
items = await session.get_items(sid, limit=2)
assert len(items) == 2
# Test pop_item
popped = await session.pop_item(sid)
assert popped["content"] == "How are you?"
items = await session.get_items(sid)
assert len(items) == 2
# Test pop from empty
await session.clear_session(sid)
popped = await session.pop_item(sid)
assert popped is None
# Test session isolation
await session.add_items("session-a", [{"data": "a"}])
await session.add_items("session-b", [{"data": "b"}])
assert (await session.get_items("session-a"))[0]["data"] == "a"
assert (await session.get_items("session-b"))[0]["data"] == "b"
# Cleanup
await session.clear_session("session-a")
await session.clear_session("session-b")
print("All contract tests passed!")
# Run against your implementation
# asyncio.run(session_contract_tests(DynamoDBSession()))
# asyncio.run(session_contract_tests(MongoDBSession("mongodb://localhost")))
Building to the SessionABC protocol means your custom session integrates seamlessly with all SDK features — compaction, encryption, session sharing, and multi-agent handoffs work out of the box.
Sources:
Written by
CallSphere Team
Expert insights on AI voice agents and customer communication automation.
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available -- no signup required.