Learn Agentic AI

Event Sourcing for AI Agents: Recording Every Decision for Replay and Audit

Implement event sourcing for AI agent systems to create a complete audit trail of every decision, enable workflow replay for debugging, and build projections that reconstruct agent state from event history.

Why Event Sourcing Fits AI Agents

Traditional state-based systems store only the current state. When an AI agent makes a series of tool calls, reasoning steps, and decisions, you only see the final output. But debugging an agent that produced a wrong answer requires understanding the full chain: which tools were called, what data came back, how the LLM interpreted the results, and where the reasoning went wrong.

Event sourcing solves this by storing every state change as an immutable event. Instead of updating a "current state" record, you append events. The current state is always derivable by replaying the event stream from the beginning.

Designing the Event Store

The event store is an append-only log. Each event carries a timestamp, a type, the agent session it belongs to, and a payload:

import uuid
import json
from datetime import datetime, timezone
from dataclasses import dataclass, field, asdict
from typing import Any

@dataclass
class AgentEvent:
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    session_id: str = ""
    event_type: str = ""
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    payload: dict[str, Any] = field(default_factory=dict)
    version: int = 0

class EventStore:
    """Append-only event store for agent sessions."""

    def __init__(self):
        self._events: list[AgentEvent] = []

    def append(self, event: AgentEvent):
        # Version numbers are per session, so each session's stream
        # reads 1, 2, 3, ... — matching the SQLite store shown later.
        event.version = sum(
            1 for e in self._events if e.session_id == event.session_id
        ) + 1
        self._events.append(event)

    def get_events(
        self, session_id: str, after_version: int = 0
    ) -> list[AgentEvent]:
        return [
            e for e in self._events
            if e.session_id == session_id and e.version > after_version
        ]

    def get_all_events(self) -> list[AgentEvent]:
        return list(self._events)

Recording Agent Decisions

Wrap your agent execution to emit events at each decision point:

class EventSourcedAgent:
    """Agent that records every action as an event."""

    def __init__(self, session_id: str, store: EventStore):
        self.session_id = session_id
        self.store = store

    def _emit(self, event_type: str, payload: dict):
        self.store.append(AgentEvent(
            session_id=self.session_id,
            event_type=event_type,
            payload=payload,
        ))

    async def run(self, user_message: str):
        self._emit("user_message_received", {"message": user_message})

        # Step 1: Decide which tool to call
        tool_choice = await self._decide_tool(user_message)
        self._emit("tool_selected", {
            "tool": tool_choice["name"],
            "reasoning": tool_choice["reasoning"],
        })

        # Step 2: Execute the tool
        tool_result = await self._execute_tool(tool_choice)
        self._emit("tool_executed", {
            "tool": tool_choice["name"],
            "result": tool_result,
        })

        # Step 3: Generate final response
        response = await self._generate_response(user_message, tool_result)
        self._emit("response_generated", {
            "response": response,
            # Rough word-count proxy; use a real tokenizer in production
            "tokens_used": len(response.split()),
        })

        return response

Every decision becomes a first-class record. You know exactly which tool was chosen, why the LLM chose it, what the tool returned, and what final answer was produced.

Building Projections

A projection reads the event stream and builds a view optimized for a specific query. For example, a "session summary" projection:


@dataclass
class SessionSummary:
    session_id: str
    total_messages: int = 0
    tools_used: list[str] = field(default_factory=list)
    total_tokens: int = 0
    errors: list[str] = field(default_factory=list)
    started_at: str = ""
    last_activity: str = ""

def build_session_summary(
    store: EventStore, session_id: str
) -> SessionSummary:
    """Project events into a session summary."""
    summary = SessionSummary(session_id=session_id)
    events = store.get_events(session_id)

    for event in events:
        if not summary.started_at:
            summary.started_at = event.timestamp
        summary.last_activity = event.timestamp

        if event.event_type == "user_message_received":
            summary.total_messages += 1
        elif event.event_type == "tool_executed":
            summary.tools_used.append(event.payload["tool"])
        elif event.event_type == "response_generated":
            summary.total_tokens += event.payload.get("tokens_used", 0)
        elif event.event_type == "error_occurred":
            summary.errors.append(event.payload.get("error", ""))

    return summary

Different projections can answer different questions: "Which sessions used the search tool?" or "What was the average token count per session?" — all derived from the same event stream.
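The first of those questions can be sketched as its own projection. This version assumes only that each event exposes session_id, event_type, and payload, as AgentEvent does:

```python
def sessions_using_tool(events, tool_name: str) -> set[str]:
    """Project the full event stream into the set of session ids
    that executed a given tool at least once."""
    return {
        e.session_id
        for e in events
        if e.event_type == "tool_executed"
        and e.payload.get("tool") == tool_name
    }
```

Feed it store.get_all_events() and a tool name; because it is just a fold over the stream, it can be rebuilt from scratch at any time without touching the write path.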

Replaying for Debugging

The most powerful feature of event sourcing is replay. When an agent produces a bad result, load its event stream and step through it:

def replay_session(store: EventStore, session_id: str):
    """Replay a session step by step for debugging."""
    events = store.get_events(session_id)
    print(f"Replaying session {session_id} ({len(events)} events)")

    for event in events:
        print(f"  [{event.timestamp}] {event.event_type}")
        if event.event_type == "tool_selected":
            print(f"    Tool: {event.payload['tool']}")
            print(f"    Reasoning: {event.payload['reasoning']}")
        elif event.event_type == "tool_executed":
            result = str(event.payload["result"])[:200]
            print(f"    Result: {result}")
        elif event.event_type == "error_occurred":
            print(f"    ERROR: {event.payload['error']}")

You can also run automated regression tests by replaying a session against a different LLM model and comparing the decision points.
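A minimal sketch of that comparison, assuming both runs were recorded with the tool_selected events shown earlier (the function name and tuple shape are illustrative):

```python
def diff_decisions(events_a, events_b) -> list[tuple[int, str, str]]:
    """Compare tool choices between two recorded runs of the same prompt.

    Returns (step index, tool in run A, tool in run B) for each step
    where the two runs picked different tools.
    """
    picks_a = [e.payload["tool"] for e in events_a if e.event_type == "tool_selected"]
    picks_b = [e.payload["tool"] for e in events_b if e.event_type == "tool_selected"]
    return [
        (i, a, b)
        for i, (a, b) in enumerate(zip(picks_a, picks_b))
        if a != b
    ]
```

An empty result means the new model made the same tool choices at every step; any tuple in the list pinpoints exactly where the two runs diverged.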

Persistent Event Store with SQLite

For production use, persist events to a database:

import sqlite3

class SQLiteEventStore(EventStore):
    def __init__(self, db_path: str):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS events (
                event_id TEXT PRIMARY KEY,
                session_id TEXT NOT NULL,
                event_type TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                payload TEXT NOT NULL,
                version INTEGER NOT NULL
            )
        """)
        self.conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_session ON events(session_id, version)"
        )

    def append(self, event: AgentEvent):
        event.version = self._next_version(event.session_id)
        self.conn.execute(
            "INSERT INTO events VALUES (?, ?, ?, ?, ?, ?)",
            (event.event_id, event.session_id, event.event_type,
             event.timestamp, json.dumps(event.payload), event.version),
        )
        self.conn.commit()

    def get_events(
        self, session_id: str, after_version: int = 0
    ) -> list[AgentEvent]:
        # Override the in-memory lookup: read rows back from SQLite
        # and rehydrate the JSON payloads into AgentEvent objects.
        rows = self.conn.execute(
            "SELECT event_id, session_id, event_type, timestamp, payload, version"
            " FROM events WHERE session_id = ? AND version > ?"
            " ORDER BY version",
            (session_id, after_version),
        ).fetchall()
        return [
            AgentEvent(
                event_id=r[0], session_id=r[1], event_type=r[2],
                timestamp=r[3], payload=json.loads(r[4]), version=r[5],
            )
            for r in rows
        ]

    def _next_version(self, session_id: str) -> int:
        row = self.conn.execute(
            "SELECT MAX(version) FROM events WHERE session_id = ?",
            (session_id,),
        ).fetchone()
        return (row[0] or 0) + 1

FAQ

How is event sourcing different from just logging agent actions?

Logging is unstructured and designed for humans to read. Event sourcing produces structured, typed events that can be replayed programmatically to reconstruct state. You can build multiple projections from the same events, run automated regression tests, and guarantee that your audit trail is complete because the events are the source of truth — not a side effect.

Does event sourcing add significant overhead to agent execution?

The overhead is minimal. Appending a small JSON event to a database or file takes microseconds compared to the seconds spent on LLM calls and tool executions. The storage cost grows linearly with the number of events, but agent sessions rarely produce more than a few hundred events, so storage is not a practical concern.

How do I handle schema evolution when event payloads change over time?

Use versioned event types. When a payload schema changes, create a new version (e.g., tool_executed_v2) and write an upcaster that converts v1 events to v2 format during replay. This ensures old events remain readable without modifying the immutable event store.
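An upcaster along those lines can be sketched as a pure function applied during replay. The v2 payload shape here (a status field plus a data field split out of the old result) is hypothetical, purely to show the pattern:

```python
def upcast(event_type: str, payload: dict) -> tuple[str, dict]:
    """Upcast a v1 tool_executed payload to a hypothetical v2 shape
    that separates an execution status from the raw result data."""
    if event_type == "tool_executed":  # implicit v1
        return "tool_executed_v2", {
            "tool": payload["tool"],
            "status": "ok",          # v1 events only recorded successes
            "data": payload.get("result"),
        }
    return event_type, payload  # already current: pass through unchanged
```

Projections and replay tools then consume only v2 events, while the stored v1 events remain untouched and immutable.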


#EventSourcing #AuditTrail #Debugging #AgentArchitecture #Python #AgenticAI #LearnAI #AIEngineering


Written by

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

