---
title: "Custom Session Implementations: The SessionABC Protocol"
description: "Build custom session backends for the OpenAI Agents SDK by implementing the SessionABC protocol with complete DynamoDB and MongoDB examples and testing strategies."
canonical: https://callsphere.ai/blog/custom-session-implementations-session-abc-protocol
category: "Learn Agentic AI"
tags: ["OpenAI", "Custom Sessions", "Protocol", "DynamoDB", "Python"]
author: "CallSphere Team"
published: 2026-03-14T00:00:00.000Z
updated: 2026-05-06T01:02:41.479Z
---

# Custom Session Implementations: The SessionABC Protocol

> Build custom session backends for the OpenAI Agents SDK by implementing the SessionABC protocol with complete DynamoDB and MongoDB examples and testing strategies.

## When Built-In Sessions Are Not Enough

The OpenAI Agents SDK ships with SQLite, Redis, and SQLAlchemy sessions. For most applications, one of these fits. But sometimes you need something different:

- Your organization standardizes on DynamoDB and does not run Redis or PostgreSQL
- You want to store sessions in MongoDB alongside your document-oriented data
- You need a session that integrates with a proprietary storage system
- You want to add custom middleware (logging, metrics, validation) at the session layer

The SDK defines a clear protocol — `SessionABC` — that any custom session must implement. Build to this interface and your custom session works with every feature of the SDK: runners, compaction, encryption wrappers, and multi-agent handoffs.

## The SessionABC Interface

`SessionABC` is an abstract base class with four required methods:

```mermaid
flowchart LR
    INPUT(["User intent"])
    PARSE["Parse plus
classify"]
    PLAN["Plan and tool
selection"]
    AGENT["Agent loop
LLM plus tools"]
    GUARD{"Guardrails
and policy"}
    EXEC["Execute and
verify result"]
    OBS[("Trace and metrics")]
    OUT(["Outcome plus
next action"])
    INPUT --> PARSE --> PLAN --> AGENT --> GUARD
    GUARD -->|Pass| EXEC --> OUT
    GUARD -->|Fail| AGENT
    AGENT --> OBS
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style OBS fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
```

```python
from abc import ABC, abstractmethod
from typing import Any

class SessionABC(ABC):
    @abstractmethod
    async def get_items(self, session_id: str, limit: int | None = None) -> list[Any]:
        """Retrieve conversation items for a session."""
        ...

    @abstractmethod
    async def add_items(self, session_id: str, items: list[Any]) -> None:
        """Append new items to a session."""
        ...

    @abstractmethod
    async def pop_item(self, session_id: str) -> Any | None:
        """Remove and return the last item from a session."""
        ...

    @abstractmethod
    async def clear_session(self, session_id: str) -> None:
        """Delete all items for a session."""
        ...
```

### Method Responsibilities

| Method | Purpose | Called By |
| --- | --- | --- |
| `get_items()` | Load history before each run | Runner (pre-run) |
| `add_items()` | Persist new turns after each run | Runner (post-run) |
| `pop_item()` | Remove last item (undo/correction) | Manual or compaction |
| `clear_session()` | Wipe entire session | Manual cleanup |

## Building a DynamoDB Session

Let us build a complete DynamoDB-backed session. DynamoDB is a natural fit for session data: it scales automatically, supports TTL, and is serverless.

### Table Design

DynamoDB table: `agent_sessions`

- Partition key: `session_id` (String)
- Sort key: `item_order` (Number)
- TTL attribute: `expires_at` (Number — Unix timestamp)

### Implementation

```python
import json
import time
from typing import Any
import boto3
from botocore.config import Config
from agents.extensions.sessions import SessionABC

class DynamoDBSession(SessionABC):
    """DynamoDB-backed session for the OpenAI Agents SDK."""

    def __init__(
        self,
        table_name: str = "agent_sessions",
        region: str = "us-east-1",
        ttl_seconds: int | None = None,
    ):
        config = Config(region_name=region)
        self.dynamodb = boto3.resource("dynamodb", config=config)
        self.table = self.dynamodb.Table(table_name)
        self.ttl_seconds = ttl_seconds

    async def get_items(
        self, session_id: str, limit: int | None = None
    ) -> list[Any]:
        """Retrieve items from DynamoDB, ordered by item_order."""
        kwargs = {
            "KeyConditionExpression": "session_id = :sid",
            "ExpressionAttributeValues": {":sid": session_id},
            "ScanIndexForward": True,  # Ascending order
        }

        if limit is not None:
            kwargs["Limit"] = limit

        response = self.table.query(**kwargs)
        items = response.get("Items", [])

        return [json.loads(item["item_data"]) for item in items]

    async def add_items(self, session_id: str, items: list[Any]) -> None:
        """Append items to the session in DynamoDB."""
        # Get current max item_order
        response = self.table.query(
            KeyConditionExpression="session_id = :sid",
            ExpressionAttributeValues={":sid": session_id},
            ScanIndexForward=False,
            Limit=1,
            ProjectionExpression="item_order",
        )

        existing = response.get("Items", [])
        next_order = (existing[0]["item_order"] + 1) if existing else 0

        # Batch write new items
        with self.table.batch_writer() as batch:
            for i, item in enumerate(items):
                record = {
                    "session_id": session_id,
                    "item_order": next_order + i,
                    "item_data": json.dumps(item),
                    "created_at": int(time.time()),
                }

                if self.ttl_seconds:
                    record["expires_at"] = int(time.time()) + self.ttl_seconds

                batch.put_item(Item=record)

    async def pop_item(self, session_id: str) -> Any | None:
        """Remove and return the last item."""
        response = self.table.query(
            KeyConditionExpression="session_id = :sid",
            ExpressionAttributeValues={":sid": session_id},
            ScanIndexForward=False,
            Limit=1,
        )

        items = response.get("Items", [])
        if not items:
            return None

        last_item = items[0]
        self.table.delete_item(
            Key={
                "session_id": session_id,
                "item_order": last_item["item_order"],
            }
        )

        return json.loads(last_item["item_data"])

    async def clear_session(self, session_id: str) -> None:
        """Delete all items for a session."""
        response = self.table.query(
            KeyConditionExpression="session_id = :sid",
            ExpressionAttributeValues={":sid": session_id},
            ProjectionExpression="session_id, item_order",
        )

        with self.table.batch_writer() as batch:
            for item in response["Items"]:
                batch.delete_item(
                    Key={
                        "session_id": item["session_id"],
                        "item_order": item["item_order"],
                    }
                )
```

### Using the DynamoDB Session

```python
from agents import Agent, Runner

session = DynamoDBSession(
    table_name="agent_sessions",
    region="us-east-1",
    ttl_seconds=60 * 60 * 24 * 7,  # 7-day TTL
)

agent = Agent(name="DynamoAgent", instructions="You remember conversations.")

result = await Runner.run(
    agent, "Hello!", session=session, session_id="user-abc-conv-1"
)
```

## Building a MongoDB Session

MongoDB is another popular choice, especially for teams already using it as their primary datastore.

```python
import json
from typing import Any
from motor.motor_asyncio import AsyncIOMotorClient
from agents.extensions.sessions import SessionABC

class MongoDBSession(SessionABC):
    """MongoDB-backed session using Motor async driver."""

    def __init__(self, mongo_url: str, database: str = "agents", collection: str = "sessions"):
        self.client = AsyncIOMotorClient(mongo_url)
        self.collection = self.client[database][collection]

    async def initialize(self):
        """Create indexes for efficient queries."""
        await self.collection.create_index(
            [("session_id", 1), ("item_order", 1)],
            unique=True,
        )
        await self.collection.create_index("session_id")

    async def get_items(
        self, session_id: str, limit: int | None = None
    ) -> list[Any]:
        cursor = self.collection.find(
            {"session_id": session_id},
            sort=[("item_order", 1)],
        )

        if limit is not None:
            cursor = cursor.limit(limit)

        items = []
        async for doc in cursor:
            items.append(doc["item_data"])

        return items

    async def add_items(self, session_id: str, items: list[Any]) -> None:
        # Get current max order
        last_doc = await self.collection.find_one(
            {"session_id": session_id},
            sort=[("item_order", -1)],
            projection={"item_order": 1},
        )

        next_order = (last_doc["item_order"] + 1) if last_doc else 0

        documents = [
            {
                "session_id": session_id,
                "item_order": next_order + i,
                "item_data": item,
                "created_at": time.time(),
            }
            for i, item in enumerate(items)
        ]

        if documents:
            await self.collection.insert_many(documents)

    async def pop_item(self, session_id: str) -> Any | None:
        last_doc = await self.collection.find_one_and_delete(
            {"session_id": session_id},
            sort=[("item_order", -1)],
        )

        if last_doc:
            return last_doc["item_data"]
        return None

    async def clear_session(self, session_id: str) -> None:
        await self.collection.delete_many({"session_id": session_id})
```

### Using the MongoDB Session

```python
import asyncio
from agents import Agent, Runner

async def main():
    session = MongoDBSession("mongodb://localhost:27017")
    await session.initialize()

    agent = Agent(name="MongoAgent", instructions="You are helpful.")

    result = await Runner.run(
        agent, "Remember: my API key rotates on Fridays.",
        session=session, session_id="ops-team-conv"
    )
    print(result.final_output)

asyncio.run(main())
```

## Testing Custom Sessions

Every custom session should pass a standard test suite that validates the SessionABC contract:

```python
import pytest
import asyncio

async def session_contract_tests(session):
    """Test suite that validates any SessionABC implementation."""
    sid = "test-session-001"

    # Start clean
    await session.clear_session(sid)

    # Test empty session
    items = await session.get_items(sid)
    assert items == [], "Empty session should return empty list"

    # Test add and retrieve
    await session.add_items(sid, [{"role": "user", "content": "Hello"}])
    items = await session.get_items(sid)
    assert len(items) == 1
    assert items[0]["content"] == "Hello"

    # Test multiple adds
    await session.add_items(sid, [
        {"role": "assistant", "content": "Hi there"},
        {"role": "user", "content": "How are you?"},
    ])
    items = await session.get_items(sid)
    assert len(items) == 3

    # Test ordering
    assert items[0]["content"] == "Hello"
    assert items[1]["content"] == "Hi there"
    assert items[2]["content"] == "How are you?"

    # Test limit
    items = await session.get_items(sid, limit=2)
    assert len(items) == 2

    # Test pop_item
    popped = await session.pop_item(sid)
    assert popped["content"] == "How are you?"
    items = await session.get_items(sid)
    assert len(items) == 2

    # Test pop from empty
    await session.clear_session(sid)
    popped = await session.pop_item(sid)
    assert popped is None

    # Test session isolation
    await session.add_items("session-a", [{"data": "a"}])
    await session.add_items("session-b", [{"data": "b"}])
    assert (await session.get_items("session-a"))[0]["data"] == "a"
    assert (await session.get_items("session-b"))[0]["data"] == "b"

    # Cleanup
    await session.clear_session("session-a")
    await session.clear_session("session-b")

    print("All contract tests passed!")

# Run against your implementation
# asyncio.run(session_contract_tests(DynamoDBSession()))
# asyncio.run(session_contract_tests(MongoDBSession("mongodb://localhost")))
```

Building to the SessionABC protocol means your custom session integrates seamlessly with all SDK features — compaction, encryption, session sharing, and multi-agent handoffs work out of the box.

**Sources:**

- [https://openai.github.io/openai-agents-python/sessions/](https://openai.github.io/openai-agents-python/sessions/)
- [https://github.com/openai/openai-agents-python](https://github.com/openai/openai-agents-python)

---

Source: https://callsphere.ai/blog/custom-session-implementations-session-abc-protocol
