Event Replay and Dead Letter Processing for AI Agent Systems

Build resilient event replay infrastructure and dead letter queue management for AI agent systems with proper logging, recovery patterns, and operational tooling in Python.

Why Event Replay Matters for AI Agents

AI agents fail. LLM APIs go down, rate limits are hit, prompts produce invalid output, and downstream services become unavailable. In a traditional system, a failed HTTP request gets retried by the client. In an event-driven AI agent system, a failed event means a lost action — a support ticket that never gets triaged, a payment failure that never gets handled, a lead that never gets scored.

Event replay and dead letter queue (DLQ) processing solve this problem. Every event is logged when received. Events that fail processing are moved to a DLQ with full error context. Engineers can inspect failed events, fix the underlying issue, and replay them — either individually or in bulk. This transforms your agent system from fragile to resilient.
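
The lifecycle described above can be sketched as a small state machine. The status names match the EventStatus enum used later in this article, but the transition table itself is an assumption inferred from the described flow, and `ALLOWED_TRANSITIONS` and `can_transition` are hypothetical names.

```python
from enum import Enum


class EventStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    DEAD_LETTERED = "dead_lettered"
    REPLAYED = "replayed"


# Assumed lifecycle: a failed event is retried until it either completes or
# exhausts its retries and is dead-lettered; only dead-lettered events can
# be marked as replayed.
ALLOWED_TRANSITIONS = {
    EventStatus.PENDING: {EventStatus.PROCESSING},
    EventStatus.PROCESSING: {
        EventStatus.COMPLETED,
        EventStatus.FAILED,
        EventStatus.DEAD_LETTERED,
    },
    EventStatus.FAILED: {EventStatus.PROCESSING},
    EventStatus.DEAD_LETTERED: {EventStatus.REPLAYED},
    EventStatus.COMPLETED: set(),
    EventStatus.REPLAYED: set(),
}


def can_transition(current: EventStatus, target: EventStatus) -> bool:
    """Return True if moving from `current` to `target` is allowed."""
    return target in ALLOWED_TRANSITIONS[current]
```

A guard like this could run before each status update to catch accidental transitions, such as replaying an event that already completed.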

Event Logging Infrastructure

The foundation is a complete event log. Every event that enters your system gets stored with its full payload, processing status, and metadata.

flowchart LR
    CLIENT(["Client SDK"])
    GW["API Gateway<br/>auth plus rate limit"]
    APP["FastAPI app<br/>handlers and DI"]
    VAL["Pydantic validation"]
    SVC["Service layer<br/>business logic"]
    DB[(Database)]
    QUEUE[(Background queue)]
    OBS[(Tracing)]
    CLIENT --> GW --> APP --> VAL --> SVC
    SVC --> DB
    SVC --> QUEUE
    SVC --> OBS
    SVC --> CLIENT
    style GW fill:#4f46e5,stroke:#4338ca,color:#fff
    style APP fill:#f59e0b,stroke:#d97706,color:#1f2937
    style DB fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b

from datetime import datetime
from enum import Enum
from pydantic import BaseModel
import uuid
import json
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import Text, DateTime, Index

class Base(DeclarativeBase):
    pass

class EventStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    DEAD_LETTERED = "dead_lettered"
    REPLAYED = "replayed"

class EventLog(Base):
    __tablename__ = "event_log"

    id: Mapped[str] = mapped_column(primary_key=True, default=lambda: str(uuid.uuid4()))
    event_type: Mapped[str] = mapped_column(index=True)
    source: Mapped[str] = mapped_column(index=True)
    payload: Mapped[str] = mapped_column(Text)
    status: Mapped[str] = mapped_column(default=EventStatus.PENDING, index=True)
    error_message: Mapped[str | None] = mapped_column(Text, nullable=True)
    retry_count: Mapped[int] = mapped_column(default=0)
    created_at: Mapped[datetime] = mapped_column(
        DateTime, default=datetime.utcnow, index=True
    )
    processed_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
    original_event_id: Mapped[str | None] = mapped_column(nullable=True)

    __table_args__ = (
        Index("idx_status_created", "status", "created_at"),
    )

The composite index on status and created_at is critical. It enables efficient queries for "show me all failed events from the last hour" without scanning the entire table.
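
To see the effect of the composite index, the following sketch asks a query planner how it would run the "failed events from the last hour" query. It uses the standard-library sqlite3 module as a stand-in for PostgreSQL, so the plan text differs from what `EXPLAIN` prints in production, and the table is a trimmed-down assumption of the model above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE event_log (
        id TEXT PRIMARY KEY,
        event_type TEXT NOT NULL,
        status TEXT NOT NULL,
        created_at TEXT NOT NULL
    )
    """
)
conn.execute("CREATE INDEX idx_status_created ON event_log (status, created_at)")

# Equality on status plus a range on created_at matches the index column order.
query = """
    SELECT id, event_type FROM event_log
    WHERE status = ? AND created_at >= ?
    ORDER BY created_at
"""
params = ("failed", "2024-01-01T00:00:00")

# Each plan row ends with a human-readable detail column.
plan = conn.execute("EXPLAIN QUERY PLAN " + query, params).fetchall()
plan_text = " ".join(row[3] for row in plan)
uses_composite_index = "idx_status_created" in plan_text
print(plan_text)
```

The column order matters: the equality column (status) comes first and the range column (created_at) second, so the database can jump to one status and scan only the matching time window.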

Wrapping Event Processing with Logging

Wrap every event handler with logging that captures success, failure, and error details.

from sqlalchemy.ext.asyncio import async_sessionmaker

engine = create_async_engine("postgresql+asyncpg://localhost/agent_events")
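# expire_on_commit=False keeps already-loaded attributes readable after a
# commit, which matters when log entries are read outside the session.
async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)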

MAX_RETRIES = 3

async def process_event_with_logging(
    event_type: str,
    source: str,
    payload: dict,
    handler,
    event_id: str | None = None,
):
    log_id = event_id or str(uuid.uuid4())

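    async with async_session() as session:
        # Reuse the existing row on a retry so retry_count accumulates and
        # the same primary key is not inserted twice.
        log_entry = await session.get(EventLog, log_id)
        if log_entry is None:
            log_entry = EventLog(
                id=log_id,
                event_type=event_type,
                source=source,
                payload=json.dumps(payload),
                status=EventStatus.PROCESSING,
            )
            session.add(log_entry)
        else:
            log_entry.status = EventStatus.PROCESSING
        await session.commit()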

    try:
        await handler(payload)
        async with async_session() as session:
            log_entry = await session.get(EventLog, log_id)
            log_entry.status = EventStatus.COMPLETED
            log_entry.processed_at = datetime.utcnow()
            await session.commit()

    except Exception as e:
        async with async_session() as session:
            log_entry = await session.get(EventLog, log_id)
            log_entry.retry_count += 1
            log_entry.error_message = f"{type(e).__name__}: {e}"
            # Copy the count into a local so it can be used after the session
            # closes, whatever the sessionmaker's expire_on_commit setting is.
            retry_count = log_entry.retry_count

            if retry_count >= MAX_RETRIES:
                log_entry.status = EventStatus.DEAD_LETTERED
            else:
                log_entry.status = EventStatus.FAILED

            await session.commit()

        if retry_count < MAX_RETRIES:
            await schedule_retry(log_id, delay_seconds=2 ** retry_count)
        raise

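The retry logic uses exponential backoff based on the failure count: the first failure schedules a retry after 2 seconds and the second after 4 seconds. With MAX_RETRIES = 3, the third failure moves the event to the dead letter state instead of scheduling another retry.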
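
The handler above calls `schedule_retry`, which this article does not define. The sketch below shows the delay arithmetic (with MAX_RETRIES = 3, delays of 2 and 4 seconds are used before the third failure dead-letters the event) and one possible in-process scheduler. The third `retry` parameter is an assumption added here so the example is self-contained; a production system would more likely hand the retry to a durable queue, since in-process tasks are lost on restart.

```python
import asyncio
from typing import Awaitable, Callable

MAX_RETRIES = 3


def retry_delay(retry_count: int, base_seconds: int = 2) -> int:
    """Delay before the next attempt, given how many attempts have failed."""
    return base_seconds ** retry_count


def backoff_schedule(max_retries: int = MAX_RETRIES) -> list[int]:
    """Delays that are actually used; the final failure dead-letters instead."""
    return [retry_delay(n) for n in range(1, max_retries)]


async def schedule_retry(
    event_id: str,
    delay_seconds: float,
    retry: Callable[[str], Awaitable[None]],
) -> asyncio.Task:
    """Hypothetical in-process scheduler: wait, then re-run the event."""

    async def _run() -> None:
        await asyncio.sleep(delay_seconds)
        await retry(event_id)

    # Return the task so callers (and tests) can await or cancel it.
    return asyncio.create_task(_run())


retried: list[str] = []


async def _demo() -> None:
    async def fake_retry(event_id: str) -> None:
        retried.append(event_id)

    task = await schedule_retry("evt-1", 0, fake_retry)
    await task


asyncio.run(_demo())
```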

Dead Letter Queue Management

Build an API to inspect, manage, and replay dead-lettered events.

from fastapi import FastAPI, Query
from sqlalchemy import select, func

app = FastAPI()

@app.get("/admin/dlq")
async def list_dead_letters(
    event_type: str | None = None,
    source: str | None = None,
    limit: int = Query(default=50, le=200),
    offset: int = Query(default=0, ge=0),
):
    async with async_session() as session:
        # Build the filters once so the page query and the total agree.
        filters = [EventLog.status == EventStatus.DEAD_LETTERED]
        if event_type:
            filters.append(EventLog.event_type == event_type)
        if source:
            filters.append(EventLog.source == source)

        query = (
            select(EventLog)
            .where(*filters)
            .order_by(EventLog.created_at.desc())
            .offset(offset)
            .limit(limit)
        )
        result = await session.execute(query)
        events = result.scalars().all()

        # Count with the same filters; otherwise "total" ignores event_type
        # and source and pagination reports the wrong number of pages.
        count_query = select(func.count()).select_from(EventLog).where(*filters)
        count_result = await session.execute(count_query)
        total = count_result.scalar()

    return {
        "events": [format_event(e) for e in events],
        "total": total,
        "limit": limit,
        "offset": offset,
    }

Event Replay Engine

The replay engine re-processes dead-lettered events through the original handler, creating a clear audit trail.

from fastapi import HTTPException

@app.post("/admin/dlq/{event_id}/replay")
async def replay_single_event(event_id: str):
    async with async_session() as session:
        event = await session.get(EventLog, event_id)
        if not event:
            raise HTTPException(status_code=404, detail="Event not found")
        if event.status != EventStatus.DEAD_LETTERED:
            raise HTTPException(
                status_code=400,
                detail=f"Event status is {event.status}, not dead_lettered",
            )

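    payload = json.loads(event.payload)
    handler = get_handler_for_event_type(event.event_type)

    new_event_id = str(uuid.uuid4())
    succeeded = True
    try:
        await process_event_with_logging(
            event_type=event.event_type,
            source=event.source,
            payload=payload,
            handler=handler,
            event_id=new_event_id,
        )
    except Exception:
        # The wrapper has already recorded the failure on the new log entry.
        succeeded = False

    async with async_session() as session:
        # Link the replay attempt to the original whether or not it succeeded,
        # but only mark the original as replayed after a successful run.
        replay = await session.get(EventLog, new_event_id)
        if replay is not None:
            replay.original_event_id = event_id
        if succeeded:
            original = await session.get(EventLog, event_id)
            original.status = EventStatus.REPLAYED
        await session.commit()

    if not succeeded:
        raise HTTPException(
            status_code=500,
            detail=f"Replay failed; see event {new_event_id} for the error",
        )
    return {"status": "replayed", "new_event_id": new_event_id}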

@app.post("/admin/dlq/replay-batch")
async def replay_batch(
    event_type: str | None = None,
    source: str | None = None,
    max_events: int = Query(default=100, le=1000),
):
    async with async_session() as session:
        query = select(EventLog).where(
            EventLog.status == EventStatus.DEAD_LETTERED
        )
        if event_type:
            query = query.where(EventLog.event_type == event_type)
        if source:
            query = query.where(EventLog.source == source)

        query = query.order_by(EventLog.created_at.asc()).limit(max_events)
        result = await session.execute(query)
        events = result.scalars().all()

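    results = {"total": len(events), "succeeded": 0, "failed": 0}
    for event in events:
        new_event_id = str(uuid.uuid4())
        try:
            payload = json.loads(event.payload)
            handler = get_handler_for_event_type(event.event_type)
            # Use the logging wrapper so batch replays leave the same audit
            # trail as single replays instead of calling the handler directly.
            await process_event_with_logging(
                event_type=event.event_type,
                source=event.source,
                payload=payload,
                handler=handler,
                event_id=new_event_id,
            )

            async with async_session() as session:
                original = await session.get(EventLog, event.id)
                original.status = EventStatus.REPLAYED
                replay = await session.get(EventLog, new_event_id)
                replay.original_event_id = event.id
                await session.commit()

            results["succeeded"] += 1
        except Exception:
            # A failed replay leaves the original dead-lettered for another try.
            results["failed"] += 1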

    return results
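
Both replay endpoints call `get_handler_for_event_type`, which this article does not define. A minimal sketch, assuming handlers are async callables kept in an in-memory registry, might look like this. The `register_handler` decorator, the `ticket.created` event type, and the `triage_ticket` handler are hypothetical names used only for illustration.

```python
from typing import Any, Awaitable, Callable

Handler = Callable[[dict[str, Any]], Awaitable[None]]

# Hypothetical in-memory registry mapping event types to async handlers.
_HANDLERS: dict[str, Handler] = {}


def register_handler(event_type: str) -> Callable[[Handler], Handler]:
    """Decorator that records a handler under the given event type."""

    def decorator(func: Handler) -> Handler:
        _HANDLERS[event_type] = func
        return func

    return decorator


def get_handler_for_event_type(event_type: str) -> Handler:
    """Look up the handler, failing loudly for unknown event types."""
    try:
        return _HANDLERS[event_type]
    except KeyError:
        raise LookupError(
            f"No handler registered for event type {event_type!r}"
        ) from None


@register_handler("ticket.created")
async def triage_ticket(payload: dict[str, Any]) -> None:
    # Placeholder body; a real handler would call the agent here.
    print(f"triaging ticket {payload['ticket_id']}")
```

Raising on an unknown event type keeps a replay from silently doing nothing when a handler has been renamed or removed since the event was first received.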

DLQ Analytics Dashboard

Provide visibility into failure patterns so you can identify systemic issues.

@app.get("/admin/dlq/stats")
async def dlq_stats():
    async with async_session() as session:
        by_type = await session.execute(
            select(
                EventLog.event_type,
                func.count().label("count"),
            )
            .where(EventLog.status == EventStatus.DEAD_LETTERED)
            .group_by(EventLog.event_type)
            .order_by(func.count().desc())
        )

        by_error = await session.execute(
            select(
                EventLog.error_message,
                func.count().label("count"),
            )
            .where(EventLog.status == EventStatus.DEAD_LETTERED)
            .group_by(EventLog.error_message)
            .order_by(func.count().desc())
            .limit(10)
        )

    return {
        "by_event_type": [
            {"event_type": row[0], "count": row[1]}
            for row in by_type.all()
        ],
        "top_errors": [
            {"error": row[0], "count": row[1]}
            for row in by_error.all()
        ],
    }

Seeing that 90% of dead-lettered events share the same error message tells you exactly what to fix. After the fix, a single batch replay recovers all those events.
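
Grouping on the raw error message only works when messages are identical. If they embed request IDs or other variable values, each one forms its own group. One way around this, sketched here as an assumption rather than part of the endpoint above, is to normalize messages into a fingerprint before counting. The sample messages are made up.

```python
import re
from collections import Counter

_UUID = re.compile(
    r"\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b",
    re.IGNORECASE,
)
_NUMBER = re.compile(r"\d+")


def error_fingerprint(message: str) -> str:
    """Replace UUIDs and numbers with placeholders so similar errors group."""
    message = _UUID.sub("<uuid>", message)
    return _NUMBER.sub("<n>", message)


messages = [
    "TimeoutError: request 4812 timed out after 30s",
    "TimeoutError: request 4813 timed out after 30s",
    "ValueError: invalid JSON at position 17",
]
top_errors = Counter(error_fingerprint(m) for m in messages).most_common()
```

Here the two timeout messages collapse into one fingerprint with a count of 2, which the raw GROUP BY would have reported as two separate errors.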

FAQ

How long should I retain event logs?

Retain completed events for 30-90 days depending on compliance requirements, and dead-lettered events indefinitely until they are resolved. Use partitioned tables or time-based indexes to keep queries fast. Archive old events to cold storage (S3) for long-term auditing.
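
As a rough illustration of that policy, the check below decides whether a single event may be moved to cold storage. The 90-day window is one assumed value from the range above, and `is_archivable` is a hypothetical helper name.

```python
from datetime import datetime, timedelta

RETENTION = timedelta(days=90)  # assumed; pick a value your compliance rules allow


def is_archivable(status: str, created_at: datetime, now: datetime) -> bool:
    """Only completed events past the retention window may be archived.

    Dead-lettered events never qualify, so they stay until resolved.
    """
    return status == "completed" and now - created_at > RETENTION


now = datetime(2024, 6, 1)
old_completed = is_archivable("completed", datetime(2024, 1, 1), now)
old_dead_letter = is_archivable("dead_lettered", datetime(2024, 1, 1), now)
recent_completed = is_archivable("completed", datetime(2024, 5, 20), now)
```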

Should I replay events in order?

Yes, when events have causal dependencies. For example, if event A creates a customer record and event B updates that record, replaying B before A will fail. Process replays in chronological order (ORDER BY created_at ASC) by default, and group by entity ID when strict ordering matters.
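
A minimal sketch of that ordering is shown below. It assumes each event carries an `entity_id` field, which the EventLog model in this article does not have; in practice it might be extracted from the payload.

```python
from collections import defaultdict
from datetime import datetime


def replay_order(events: list[dict]) -> dict[str, list[dict]]:
    """Group events by entity and keep each group in oldest-first order."""
    groups: dict[str, list[dict]] = defaultdict(list)
    for event in sorted(events, key=lambda e: e["created_at"]):
        groups[event["entity_id"]].append(event)
    return dict(groups)


events = [
    {"id": "b", "entity_id": "cust-1", "created_at": datetime(2024, 1, 1, 10, 5)},
    {"id": "c", "entity_id": "cust-2", "created_at": datetime(2024, 1, 1, 10, 1)},
    {"id": "a", "entity_id": "cust-1", "created_at": datetime(2024, 1, 1, 10, 0)},
]
ordered = replay_order(events)
```

Each group can then be replayed sequentially, stopping a group at its first failure so a later event never runs before an earlier one for the same entity.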

How do I handle events where the payload schema has changed since original processing?

Version your event schemas. Store the schema version in the event log alongside the payload. When replaying old events, use a migration function that transforms old payload formats to the current schema before processing. This prevents replay failures due to schema evolution.
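
One way to structure such a migration function is as a chain of single-step upgrades, applied in sequence until the payload reaches the current version. The version numbers and field changes below are hypothetical examples, not part of this article's schema.

```python
from typing import Callable

CURRENT_VERSION = 3


def _v1_to_v2(payload: dict) -> dict:
    # Hypothetical change: "customer" was renamed to "customer_id".
    out = dict(payload)
    out["customer_id"] = out.pop("customer")
    return out


def _v2_to_v3(payload: dict) -> dict:
    # Hypothetical change: a "priority" field was added with a default.
    return {**payload, "priority": payload.get("priority", "normal")}


# Each entry upgrades a payload from the keyed version to the next one.
MIGRATIONS: dict[int, Callable[[dict], dict]] = {1: _v1_to_v2, 2: _v2_to_v3}


def upgrade_payload(payload: dict, version: int) -> dict:
    """Apply migrations step by step until the payload is current."""
    while version < CURRENT_VERSION:
        payload = MIGRATIONS[version](payload)
        version += 1
    return payload


upgraded = upgrade_payload({"customer": "c-9"}, version=1)
```

Running the upgrade just before the handler is called means handlers only ever see the current schema, regardless of how old the replayed event is.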

