Dead Letter Queues for Failed Agent Tasks: Capturing and Reprocessing Failures

Design dead letter queue systems for AI agent workflows that capture failed tasks with full context, enable automatic reprocessing, and support manual review for permanently failed operations.

Why Failed Agent Tasks Need a Second Chance

In any AI agent system processing significant volume, some tasks will fail. An LLM returns an unusable response, a tool throws an unexpected error, or a downstream service is temporarily unavailable. Without a structured place for failed tasks, those requests are simply lost — the user gets an error, and the failure vanishes into log files that nobody reads.

A dead letter queue (DLQ) captures failed tasks along with their full execution context, enabling automatic retries for transient failures and human review for permanent ones.

Designing the DLQ Data Model

A good DLQ entry captures everything needed to understand, reproduce, and retry the failure.

flowchart LR
    INPUT(["User intent"])
    PARSE["Parse plus<br/>classify"]
    PLAN["Plan and tool<br/>selection"]
    AGENT["Agent loop<br/>LLM plus tools"]
    GUARD{"Guardrails<br/>and policy"}
    EXEC["Execute and<br/>verify result"]
    OBS[("Trace and metrics")]
    OUT(["Outcome plus<br/>next action"])
    INPUT --> PARSE --> PLAN --> AGENT --> GUARD
    GUARD -->|Pass| EXEC --> OUT
    GUARD -->|Fail| AGENT
    AGENT --> OBS
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style OBS fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Optional
import uuid
import json

class FailureStatus(Enum):
    PENDING_RETRY = "pending_retry"
    RETRYING = "retrying"
    PENDING_REVIEW = "pending_review"
    RESOLVED = "resolved"
    DISCARDED = "discarded"

@dataclass
class DLQEntry:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    task_id: str = ""
    task_type: str = ""
    payload: dict = field(default_factory=dict)
    error_message: str = ""
    error_type: str = ""
    stack_trace: str = ""
    attempt_count: int = 0
    max_retries: int = 3
    status: FailureStatus = FailureStatus.PENDING_RETRY
    created_at: datetime = field(default_factory=datetime.utcnow)
    last_attempted_at: Optional[datetime] = None
    resolved_at: Optional[datetime] = None
    context: dict = field(default_factory=dict)

    def should_retry(self) -> bool:
        return (
            self.status == FailureStatus.PENDING_RETRY
            and self.attempt_count < self.max_retries
        )

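    def to_dict(self) -> dict:
        # Serialize every field so the entry can be stored or inspected
        # without losing context; datetimes become ISO 8601 strings.
        return {
            "id": self.id,
            "task_id": self.task_id,
            "task_type": self.task_type,
            "payload": self.payload,
            "error_message": self.error_message,
            "error_type": self.error_type,
            "stack_trace": self.stack_trace,
            "attempt_count": self.attempt_count,
            "max_retries": self.max_retries,
            "status": self.status.value,
            "created_at": self.created_at.isoformat(),
            "last_attempted_at": (
                self.last_attempted_at.isoformat() if self.last_attempted_at else None
            ),
            "resolved_at": self.resolved_at.isoformat() if self.resolved_at else None,
            "context": self.context,
        }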
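
To persist an entry or hand it to another process, it usually has to survive a JSON round trip. The sketch below shows one way to do that, assuming a trimmed-down stand-in for DLQEntry (MiniEntry, to_json, and from_json are hypothetical names, not part of the code above) and timezone-aware timestamps.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum


class FailureStatus(Enum):
    PENDING_RETRY = "pending_retry"
    PENDING_REVIEW = "pending_review"


@dataclass
class MiniEntry:
    """Hypothetical, trimmed-down stand-in for DLQEntry."""
    task_id: str
    payload: dict
    status: FailureStatus = FailureStatus.PENDING_RETRY
    created_at: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )

    def to_json(self) -> str:
        # Enums and datetimes are not JSON-serializable, so convert explicitly.
        return json.dumps({
            "task_id": self.task_id,
            "payload": self.payload,
            "status": self.status.value,
            "created_at": self.created_at.isoformat(),
        })

    @classmethod
    def from_json(cls, raw: str) -> "MiniEntry":
        data = json.loads(raw)
        return cls(
            task_id=data["task_id"],
            payload=data["payload"],
            status=FailureStatus(data["status"]),
            created_at=datetime.fromisoformat(data["created_at"]),
        )


original = MiniEntry(task_id="task-1", payload={"prompt": "summarize"})
restored = MiniEntry.from_json(original.to_json())
```

The same pattern should extend to the remaining DLQEntry fields, provided the payload and context dicts hold only JSON-serializable values.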

Building the Dead Letter Queue

The DLQ provides methods for adding failed tasks, retrieving tasks for retry, and marking tasks as resolved or escalated.

import asyncio
from collections import defaultdict

class DeadLetterQueue:
    def __init__(self):
        self.entries: dict[str, DLQEntry] = {}
        self._lock = asyncio.Lock()

    async def add(self, entry: DLQEntry):
        async with self._lock:
            self.entries[entry.id] = entry

    async def get_retryable(self, batch_size: int = 10) -> list[DLQEntry]:
        async with self._lock:
            retryable = []
            # Filter on each entry's live status. A separate status index
            # would go stale, because callers change entry.status in place.
            for entry in self.entries.values():
                if entry.should_retry():
                    # Claim the entry so a concurrent worker cannot pick it up.
                    entry.status = FailureStatus.RETRYING
                    retryable.append(entry)
                    if len(retryable) >= batch_size:
                        break
            return retryable

    async def mark_resolved(self, entry_id: str):
        async with self._lock:
            entry = self.entries.get(entry_id)
            if entry:
                entry.status = FailureStatus.RESOLVED
                entry.resolved_at = datetime.utcnow()

    async def escalate_to_review(self, entry_id: str):
        async with self._lock:
            entry = self.entries.get(entry_id)
            if entry:
                entry.status = FailureStatus.PENDING_REVIEW

    async def get_stats(self) -> dict:
        counts = defaultdict(int)
        for entry in self.entries.values():
            counts[entry.status.value] += 1
        return dict(counts)

Integrating DLQ with the Agent Pipeline

Wrap your agent execution in a handler that catches failures and routes them to the DLQ.

import traceback

class ResilientAgentRunner:
    def __init__(self, agent, dlq: DeadLetterQueue):
        self.agent = agent
        self.dlq = dlq

    async def execute(self, task_id: str, task_type: str, payload: dict) -> dict:
        try:
            result = await self.agent.run(payload)
            return {"status": "success", "result": result}
        except Exception as exc:
            entry = DLQEntry(
                task_id=task_id,
                task_type=task_type,
                payload=payload,
                error_message=str(exc),
                error_type=type(exc).__name__,
                stack_trace=traceback.format_exc(),
                attempt_count=1,
                context={
                    "agent_type": type(self.agent).__name__,
                },
            )
            await self.dlq.add(entry)
            return {"status": "failed", "dlq_entry_id": entry.id}

Automatic Retry Worker

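A background worker periodically pulls retryable entries from the DLQ and re-executes them. With the defaults above (attempt_count starts at 1 after the first failure and max_retries is 3), each entry gets at most two retries before it is escalated to manual review.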

class DLQRetryWorker:
    def __init__(self, dlq: DeadLetterQueue, agent_runner: ResilientAgentRunner):
        self.dlq = dlq
        self.runner = agent_runner
        self.running = False

    async def start(self, interval_seconds: float = 30.0):
        self.running = True
        while self.running:
            entries = await self.dlq.get_retryable(batch_size=5)
            for entry in entries:
                entry.attempt_count += 1
                entry.last_attempted_at = datetime.utcnow()
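                try:
                    # Call the agent directly: going through runner.execute()
                    # would file a second DLQ entry for the same task.
                    await self.runner.agent.run(entry.payload)
                    await self.dlq.mark_resolved(entry.id)
                except Exception as exc:
                    # Keep the most recent failure so reviewers see current state.
                    entry.error_message = str(exc)
                    entry.error_type = type(exc).__name__
                    entry.stack_trace = traceback.format_exc()
                    if entry.attempt_count >= entry.max_retries:
                        await self.dlq.escalate_to_review(entry.id)
                    else:
                        entry.status = FailureStatus.PENDING_RETRY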

            await asyncio.sleep(interval_seconds)

    def stop(self):
        self.running = False
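
Because start() loops until stop() is called, awaiting it directly would block the caller. One common way to run such a loop is as a background asyncio task. The sketch below uses a hypothetical PollingWorker with the same start()/stop() shape as DLQRetryWorker so that it runs on its own; the short intervals are only there to keep the demo fast.

```python
import asyncio


class PollingWorker:
    """Hypothetical stand-in with the same start()/stop() shape as DLQRetryWorker."""

    def __init__(self):
        self.running = False
        self.ticks = 0

    async def start(self, interval_seconds: float = 30.0):
        self.running = True
        while self.running:
            self.ticks += 1  # a real worker would pull and retry DLQ entries here
            await asyncio.sleep(interval_seconds)

    def stop(self):
        self.running = False


async def main() -> int:
    worker = PollingWorker()
    # Schedule the loop as a background task so the caller is not blocked.
    task = asyncio.create_task(worker.start(interval_seconds=0.01))
    await asyncio.sleep(0.05)  # the application does its normal work here
    worker.stop()              # the loop exits once its current sleep ends
    await task                 # wait for a clean shutdown
    return worker.ticks


ticks = asyncio.run(main())
```

Note that stop() only takes effect after the current sleep finishes, so with a 30-second interval shutdown can take up to 30 seconds. Cancelling the task is an alternative if faster shutdown matters.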

Persistence with PostgreSQL

For production, back the DLQ with a database table rather than in-memory storage. A simple SQL schema captures the essential fields and supports efficient querying.

CREATE TABLE agent_dlq (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    task_id TEXT NOT NULL,
    task_type TEXT NOT NULL,
    payload JSONB NOT NULL,
    error_message TEXT,
    error_type TEXT,
    stack_trace TEXT,
    attempt_count INT DEFAULT 0,
    max_retries INT DEFAULT 3,
    status TEXT DEFAULT 'pending_retry',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    last_attempted_at TIMESTAMPTZ,
    resolved_at TIMESTAMPTZ,
    context JSONB DEFAULT '{}'::jsonb
);

CREATE INDEX idx_dlq_status ON agent_dlq(status);
CREATE INDEX idx_dlq_created ON agent_dlq(created_at);
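
Writing an entry into this table mostly means putting its fields in column order and serializing the payload as JSON text. The sketch below only builds the statement and its arguments and does not execute anything. INSERT_SQL and insert_args are hypothetical names, and the entry is assumed to be a plain dict keyed by column name.

```python
import json

# PostgreSQL's native positional placeholders ($1, $2, ...). Some drivers accept
# these directly; others use a different placeholder style, so adapt as needed.
INSERT_SQL = """
INSERT INTO agent_dlq (
    task_id, task_type, payload, error_message, error_type,
    stack_trace, attempt_count, max_retries, status
) VALUES ($1, $2, $3::jsonb, $4, $5, $6, $7, $8, $9)
"""


def insert_args(entry: dict) -> tuple:
    """Order an entry's fields to match the placeholders in INSERT_SQL.

    Assumes `entry` is a plain dict keyed by the table's column names.
    """
    return (
        entry["task_id"],
        entry["task_type"],
        json.dumps(entry["payload"]),  # JSONB column: send as JSON text
        entry.get("error_message"),
        entry.get("error_type"),
        entry.get("stack_trace"),
        entry.get("attempt_count", 0),
        entry.get("max_retries", 3),
        entry.get("status", "pending_retry"),
    )


args = insert_args({
    "task_id": "task-1",
    "task_type": "summarize",
    "payload": {"doc_id": 42},
    "error_message": "upstream timeout",
    "error_type": "TimeoutError",
})
```

The id and created_at columns are left out on purpose so that the table defaults fill them in.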

FAQ

When should a failed task go directly to manual review instead of retrying?

Tasks that fail due to invalid input, business logic violations, or permission errors should skip retries entirely and go straight to manual review. These are deterministic failures — retrying will produce the same error. Only transient failures like network timeouts, rate limits, and temporary service unavailability benefit from retries.
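
One way to apply this rule is to choose the entry's initial status from the exception type. The sketch below is a minimal version of that idea. The initial_status function and the TRANSIENT_ERRORS tuple are hypothetical, and the built-in exceptions listed are placeholders for whatever timeout and rate-limit errors your own client libraries raise.

```python
from enum import Enum


class FailureStatus(Enum):
    PENDING_RETRY = "pending_retry"
    PENDING_REVIEW = "pending_review"


# Assumption: which exception types count as transient is application-specific.
# These built-ins are placeholders for your own network and rate-limit errors.
TRANSIENT_ERRORS = (TimeoutError, ConnectionError)


def initial_status(exc: Exception) -> FailureStatus:
    """Pick the status a new DLQ entry should start in."""
    if isinstance(exc, TRANSIENT_ERRORS):
        return FailureStatus.PENDING_RETRY
    # Deterministic failures (bad input, permission errors, and so on) would
    # fail the same way again, so send them straight to a human.
    return FailureStatus.PENDING_REVIEW
```

In ResilientAgentRunner.execute, this could be wired in by passing status=initial_status(exc) when constructing the DLQEntry.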

How long should DLQ entries be retained?

Retain pending and in-review entries indefinitely until resolved. For resolved entries, keep them for 30 to 90 days for audit and analysis purposes, then archive or delete. The historical data is valuable for identifying recurring failure patterns and improving the agent.
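
A retention pass along these lines can be sketched as a pure function. It assumes each entry is a dict with a "status" string and, once resolved, a timezone-aware "resolved_at" datetime. The function name purge_resolved and the 90-day default are illustrative choices.

```python
from datetime import datetime, timedelta, timezone


def purge_resolved(
    entries: list[dict], now: datetime, keep_days: int = 90
) -> list[dict]:
    """Return the entries that should be kept.

    Only resolved entries older than the cutoff are dropped; pending and
    in-review entries are always kept.
    """
    cutoff = now - timedelta(days=keep_days)
    kept = []
    for entry in entries:
        expired = entry["status"] == "resolved" and entry["resolved_at"] < cutoff
        if not expired:
            kept.append(entry)
    return kept


now = datetime(2025, 6, 1, tzinfo=timezone.utc)
entries = [
    {"id": "a", "status": "resolved", "resolved_at": now - timedelta(days=120)},
    {"id": "b", "status": "resolved", "resolved_at": now - timedelta(days=10)},
    {"id": "c", "status": "pending_review", "resolved_at": None},
]
kept_ids = [e["id"] for e in purge_resolved(entries, now)]
```

In a database-backed DLQ the same rule would more likely be a scheduled DELETE or an archive job filtered on status and resolved_at.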

How do I prevent the DLQ from growing unbounded during a major outage?

Implement a circuit breaker that stops accepting new DLQ entries when the queue exceeds a threshold (e.g., 10,000 pending items). At that point, return errors directly to callers and alert the operations team. Also set a maximum age for unresolved entries — automatically discard entries older than a configurable threshold and log the discard for review.
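
The threshold check at the heart of this can be sketched in a few lines. BoundedQueue and DLQFullError are hypothetical names, and the sketch covers only the admission check. A full circuit breaker would also track when to start accepting entries again.

```python
class DLQFullError(Exception):
    """Raised when the queue refuses new entries (hypothetical name)."""


class BoundedQueue:
    """Hypothetical wrapper showing only the admission check."""

    def __init__(self, max_pending: int = 10_000):
        self.max_pending = max_pending
        self.pending: list[dict] = []

    def add(self, entry: dict) -> None:
        if len(self.pending) >= self.max_pending:
            # Over the threshold: fail fast so the caller can return an error
            # and the operations team can be alerted.
            raise DLQFullError(f"DLQ holds {len(self.pending)} pending entries")
        self.pending.append(entry)


queue = BoundedQueue(max_pending=2)
queue.add({"task_id": "t1"})
queue.add({"task_id": "t2"})
try:
    queue.add({"task_id": "t3"})
    rejected = False
except DLQFullError:
    rejected = True
```

The caller that catches DLQFullError is the natural place to return an error to the user and raise an alert.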

