Skip to content
Task Queues for AI Agents: Celery, RQ, and Dramatiq for Background Agent Jobs
Learn Agentic AI14 min read24 views

Task Queues for AI Agents: Celery, RQ, and Dramatiq for Background Agent Jobs

Set up background task queues for AI agent workloads using Celery, RQ, and Dramatiq. Learn worker patterns, retry policies, and result backends for reliable agent job processing.

When asyncio Is Not Enough

asyncio excels at concurrent I/O within a single process. But many AI agent workloads need more: durable job processing that survives server restarts, distributed execution across multiple workers, scheduled recurring jobs, and reliable retry semantics. Task queues provide all of this.

Common AI agent use cases for task queues include: batch document processing, periodic knowledge base updates, long-running multi-step agent workflows that exceed HTTP request timeouts, and fan-out patterns where one user request triggers dozens of agent jobs.

Celery: The Industry Standard

Celery is the most widely deployed Python task queue. It supports multiple brokers (Redis, RabbitMQ), result backends, and has mature tooling for monitoring and administration.

flowchart LR
    INPUT(["User intent"])
    PARSE["Parse plus<br/>classify"]
    PLAN["Plan and tool<br/>selection"]
    AGENT["Agent loop<br/>LLM plus tools"]
    GUARD{"Guardrails<br/>and policy"}
    EXEC["Execute and<br/>verify result"]
    OBS[("Trace and metrics")]
    OUT(["Outcome plus<br/>next action"])
    INPUT --> PARSE --> PLAN --> AGENT --> GUARD
    GUARD -->|Pass| EXEC --> OUT
    GUARD -->|Fail| AGENT
    AGENT --> OBS
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style OBS fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
# celery_app.py
from celery import Celery

app = Celery(
    "ai_agents",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_acks_late=True,          # Acknowledge after completion
    worker_prefetch_multiplier=1,  # One task at a time per worker
    task_time_limit=300,           # Hard 5-minute timeout
    task_soft_time_limit=270,      # Soft timeout triggers exception
)

@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
)
def process_document(self, document_id: str, user_id: str) -> dict:
    """Process a document through an AI pipeline."""
    try:
        # Fetch document
        doc = fetch_document(document_id)

        # Run LLM analysis (synchronous in Celery workers)
        summary = call_llm_sync(f"Summarize: {doc['content']}")
        entities = call_llm_sync(f"Extract entities: {doc['content']}")
        classification = call_llm_sync(
            f"Classify this document: {doc['content']}"
        )

        result = {
            "document_id": document_id,
            "summary": summary,
            "entities": entities,
            "classification": classification,
        }
        save_results(document_id, result)
        return result

    except LLMRateLimitError as exc:
        raise self.retry(exc=exc, countdown=120)
    except LLMTimeoutError as exc:
        raise self.retry(exc=exc, countdown=60)

Key Celery configuration choices for AI workloads:

Hear it before you finish reading

Talk to a live CallSphere AI voice agent in your browser — 60 seconds, no signup.

Try Live Demo →
  • task_acks_late=True ensures a task is re-delivered if the worker crashes mid-execution. Critical for expensive LLM workflows.
  • worker_prefetch_multiplier=1 prevents workers from grabbing tasks they cannot process immediately, important when each task takes seconds.
  • task_soft_time_limit gives the task a chance to clean up before the hard timeout kills it.

RQ: Simplicity First

Redis Queue (RQ) trades Celery's feature richness for simplicity. It is an excellent choice when your requirements are straightforward and you already use Redis.

# tasks.py
import time
from redis import Redis
from rq import Queue
from rq.job import Job

redis_conn = Redis(host="localhost", port=6379)
agent_queue = Queue("agent_jobs", connection=redis_conn)

def run_agent_workflow(query: str, session_id: str) -> dict:
    """Background agent workflow — runs in RQ worker process."""
    # Step 1: Retrieve context
    docs = search_vector_store(query)

    # Step 2: Call LLM
    response = call_llm_sync(
        prompt=query,
        context=docs,
    )

    # Step 3: Store results
    save_conversation(session_id, query, response)
    return {"session_id": session_id, "response": response}

# Enqueue from your web application
job = agent_queue.enqueue(
    run_agent_workflow,
    "How do I configure async pipelines?",
    "session_abc123",
    job_timeout=300,
    retry=3,
    result_ttl=3600,  # Keep results for 1 hour
)

# Check job status later
job = Job.fetch(job.id, connection=redis_conn)
if job.is_finished:
    print(job.result)
elif job.is_failed:
    print(f"Failed: {job.exc_info}")

Start workers with: rq worker agent_jobs --with-scheduler

Dramatiq: Modern and Reliable

Dramatiq is a newer alternative that emphasizes reliability and performance. It supports both Redis and RabbitMQ as brokers.

# dramatiq_tasks.py
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results import Results
from dramatiq.results.backends.redis import RedisBackend

result_backend = RedisBackend(url="redis://localhost:6379/2")
broker = RedisBroker(url="redis://localhost:6379/0")
broker.add_middleware(Results(backend=result_backend))
dramatiq.set_broker(broker)

@dramatiq.actor(
    max_retries=3,
    min_backoff=30_000,     # 30 seconds minimum retry delay
    max_backoff=300_000,    # 5 minutes maximum retry delay
    time_limit=300_000,     # 5-minute timeout
    store_results=True,
)
def analyze_conversation(conversation_id: str) -> dict:
    """Analyze a conversation with multiple LLM passes."""
    conversation = load_conversation(conversation_id)
    transcript = conversation["transcript"]

    sentiment = call_llm_sync(f"Analyze sentiment: {transcript}")
    summary = call_llm_sync(f"Summarize: {transcript}")
    action_items = call_llm_sync(f"Extract action items: {transcript}")

    result = {
        "conversation_id": conversation_id,
        "sentiment": sentiment,
        "summary": summary,
        "action_items": action_items,
    }
    store_analysis(conversation_id, result)
    return result

# Send the task
message = analyze_conversation.send("conv_12345")

# Retrieve results later
result = message.get_result(block=True, timeout=60_000)

Choosing the Right Queue

Feature Celery RQ Dramatiq
Broker support Redis, RabbitMQ, SQS Redis only Redis, RabbitMQ
Complexity High Low Medium
Async worker support Yes (with gevent) No No (uses threads)
Monitoring Flower, Celery Events rq-dashboard Built-in CLI
Best for Complex workflows Simple background jobs Reliable processing

For AI agent workloads, Celery is the safest choice for complex multi-step workflows. RQ works well for simple fire-and-forget LLM jobs. Dramatiq is the best balance of simplicity and reliability.

Still reading? Stop comparing — try CallSphere live.

CallSphere ships complete AI voice agents per industry — 14 tools for healthcare, 10 agents for real estate, 4 specialists for salons. See how it actually handles a call before you book a demo.

Retry Policies for LLM Tasks

LLM API calls fail in predictable ways. Configure retries accordingly.

# Celery retry configuration per failure type
@app.task(bind=True, max_retries=5)
def smart_retry_task(self, prompt: str):
    try:
        return call_llm_sync(prompt)
    except RateLimitError as exc:
        # Back off aggressively for rate limits
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 30)
    except TimeoutError as exc:
        # Moderate backoff for timeouts
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 10)
    except ServerError as exc:
        # Quick retry for transient server errors
        raise self.retry(exc=exc, countdown=5)
    except AuthenticationError:
        # Never retry auth failures
        raise

FAQ

Can I use asyncio inside Celery workers?

Yes, but it requires careful setup. You can run asyncio.run() inside a task function, or use Celery's gevent or eventlet pool. However, mixing Celery's process model with asyncio adds complexity. For purely I/O-bound workloads, consider using an async-native queue like arq (which runs on asyncio natively) or keeping asyncio and Celery separate — use Celery for job dispatch and asyncio within each job.

How do I handle long-running agent workflows that take 10+ minutes?

Set task_time_limit high enough to accommodate the workflow. Use task_soft_time_limit to detect approaching timeouts and checkpoint progress. For very long workflows, break them into multiple chained tasks using Celery chains or chords, so each step is individually retryable.

What result backend should I use?

Redis is the simplest and works well for results you query within minutes. For durable results, use PostgreSQL or MongoDB. Set result_expires to auto-clean old results. For AI workloads, store the actual analysis results in your application database and use the task queue result backend only for job status tracking.


#Python #TaskQueues #Celery #BackgroundJobs #AIAgents #AgenticAI #LearnAI #AIEngineering

Share

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.

Related Articles You May Like

AI Agents

Personal AI Assistant: How to Pick One for Business in 2026

A founder's guide to the personal AI assistant market: best AI assistant apps, business-grade options, and how CallSphere's voice agent fits in.

AI Agents

Free AI Agents in 2026: When Free Wins and When It Costs You

A founder's guide to free AI agents, low-code AI agent builders, and how to know when you should pay for a real platform like CallSphere.

Agentic AI

Graphiti: How Temporal Knowledge Graphs Give AI Voice Agents Persistent Memory (2026 Guide)

Graphiti is the open-source temporal knowledge graph for AI agents in 2026. Learn how bi-temporal memory beats vector RAG for voice agents and long-running LLMs.

AI Agents

Chatbot App vs ChatGPT: What's the Difference, and Which Do I Need?

Chatbot app vs ChatGPT in 2026: a founder's clear take on the difference, when to use which, and how a real AI chatbot app development works.

HVAC

Building an HVAC After-Hours Emergency Escalation System: A Complete Engineering Guide

How we built a fault-tolerant HVAC emergency triage and tech-dispatch platform on Kubernetes — three-tier CQRS, 11 micro-agents on the OpenAI Agents SDK + LangGraph, NATS JetStream, DTMF/SMS/WebSocket acceptance, circuit breakers, and an evaluation pipeline that catches regressions before they wake a tech at 3 AM.

Enterprise AI

OpenAI Frontier vs Anthropic Managed Agents: 2026 Comparison

Head-to-head: OpenAI Frontier and Anthropic's managed agent stack — strengths, fit, and what each means for enterprise AI voice and chat deployment.