---
title: "Task Queues for AI Agents: Celery, RQ, and Dramatiq for Background Agent Jobs"
description: "Set up background task queues for AI agent workloads using Celery, RQ, and Dramatiq. Learn worker patterns, retry policies, and result backends for reliable agent job processing."
canonical: https://callsphere.ai/blog/task-queues-ai-agents-celery-rq-dramatiq-background-jobs
category: "Learn Agentic AI"
tags: ["Python", "Task Queues", "Celery", "Background Jobs", "AI Agents"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-06-05T08:01:19.739Z
---

# Task Queues for AI Agents: Celery, RQ, and Dramatiq for Background Agent Jobs

> Set up background task queues for AI agent workloads using Celery, RQ, and Dramatiq. Learn worker patterns, retry policies, and result backends for reliable agent job processing.

## When asyncio Is Not Enough

asyncio excels at concurrent I/O within a single process. But many AI agent workloads need more: durable job processing that survives server restarts, distributed execution across multiple workers, scheduled recurring jobs, and reliable retry semantics. Task queues provide all of this.

Common AI agent use cases for task queues include batch document processing, periodic knowledge base updates, long-running multi-step agent workflows that exceed HTTP request timeouts, and fan-out patterns where one user request triggers dozens of agent jobs.
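
The fan-out pattern can be sketched without committing to a particular queue. The helper below is hypothetical: `build_fanout_jobs`, the payload fields, and the default batch size are assumptions for illustration, not part of any library. It splits one request into small payloads that could each be enqueued and retried independently.

```python
from typing import Iterator


def build_fanout_jobs(
    request_id: str, document_ids: list[str], batch_size: int = 10
) -> Iterator[dict]:
    """Yield one JSON-serializable payload per batch of documents."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(document_ids), batch_size):
        batch = document_ids[start:start + batch_size]
        yield {
            # Deterministic ID so a re-submitted request maps to the same jobs
            "job_id": f"{request_id}:{start // batch_size}",
            "request_id": request_id,
            "document_ids": batch,
        }


jobs = list(build_fanout_jobs("req_1", [f"doc_{i}" for i in range(25)]))
print(len(jobs))  # 3 batches: 10, 10, and 5 documents
```

Keeping payloads small and JSON-serializable means the same job description works with any of the three queues covered here.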

## Celery: The Industry Standard

Celery is the most widely deployed Python task queue. It supports several brokers (including Redis, RabbitMQ, and Amazon SQS) and pluggable result backends, and it has mature tooling for monitoring and administration.

```mermaid
flowchart LR
    INPUT(["User intent"])
    PARSE["Parse plus
classify"]
    PLAN["Plan and tool
selection"]
    AGENT["Agent loop
LLM plus tools"]
    GUARD{"Guardrails
and policy"}
    EXEC["Execute and
verify result"]
    OBS[("Trace and metrics")]
    OUT(["Outcome plus
next action"])
    INPUT --> PARSE --> PLAN --> AGENT --> GUARD
    GUARD -->|Pass| EXEC --> OUT
    GUARD -->|Fail| AGENT
    AGENT --> OBS
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style OBS fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
```

```python
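# celery_app.py
# fetch_document, call_llm_sync, save_results, LLMRateLimitError, and
# LLMTimeoutError are application-defined and assumed to be imported here.
from celery import Celery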

app = Celery(
    "ai_agents",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_acks_late=True,          # Acknowledge after completion, not on receipt
    worker_prefetch_multiplier=1,  # Reserve one task at a time per worker process
    task_time_limit=300,           # Hard 5-minute timeout (worker process is killed)
    task_soft_time_limit=270,      # Soft timeout raises SoftTimeLimitExceeded in the task
)

@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
)
def process_document(self, document_id: str, user_id: str) -> dict:
    """Process a document through an AI pipeline."""
    try:
        # Fetch document
        doc = fetch_document(document_id)

        # Run LLM analysis (synchronous in Celery workers)
        summary = call_llm_sync(f"Summarize: {doc['content']}")
        entities = call_llm_sync(f"Extract entities: {doc['content']}")
        classification = call_llm_sync(
            f"Classify this document: {doc['content']}"
        )

        result = {
            "document_id": document_id,
            "summary": summary,
            "entities": entities,
            "classification": classification,
        }
        save_results(document_id, result)
        return result

    except LLMRateLimitError as exc:
        raise self.retry(exc=exc, countdown=120)
    except LLMTimeoutError as exc:
        raise self.retry(exc=exc, countdown=60)
```

Key Celery configuration choices for AI workloads:

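- **task_acks_late=True** acknowledges the message only after the task finishes, so the broker can redeliver it if the worker dies mid-execution. This is critical for expensive LLM workflows, but it also means a task can run more than once, so tasks should be idempotent. Celery still acknowledges the message if the child process running the task is killed, unless `task_reject_on_worker_lost=True` is also set.
- **worker_prefetch_multiplier=1** limits each worker process to reserving one task at a time, so queued jobs are not held by a busy worker while other workers sit idle. This matters when each task takes seconds or minutes rather than milliseconds.
- **task_soft_time_limit** raises `SoftTimeLimitExceeded` inside the task, giving it a chance to clean up before the hard `task_time_limit` kills the process.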

## RQ: Simplicity First

Redis Queue (RQ) trades Celery's feature richness for simplicity. It is an excellent choice when your requirements are straightforward and you already use Redis.

```python
# tasks.py
# search_vector_store, call_llm_sync, and save_conversation are
# application-defined helpers assumed to be importable by the worker.
from redis import Redis
from rq import Queue, Retry
from rq.job import Job

redis_conn = Redis(host="localhost", port=6379)
agent_queue = Queue("agent_jobs", connection=redis_conn)

def run_agent_workflow(query: str, session_id: str) -> dict:
    """Background agent workflow that runs in an RQ worker process."""
    # Step 1: Retrieve context
    docs = search_vector_store(query)

    # Step 2: Call LLM
    response = call_llm_sync(
        prompt=query,
        context=docs,
    )

    # Step 3: Store results
    save_conversation(session_id, query, response)
    return {"session_id": session_id, "response": response}

# Enqueue from your web application
job = agent_queue.enqueue(
    run_agent_workflow,
    "How do I configure async pipelines?",
    "session_abc123",
    job_timeout=300,
    retry=Retry(max=3),  # RQ expects a Retry object, not an integer
    result_ttl=3600,  # Keep results for 1 hour
)

# Check job status later
job = Job.fetch(job.id, connection=redis_conn)
if job.is_finished:
    print(job.result)
elif job.is_failed:
    print(f"Failed: {job.exc_info}")
```

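Start workers with `rq worker agent_jobs --with-scheduler`. The `--with-scheduler` flag is needed for scheduled jobs and for retries that specify an `interval`; immediate retries work without it.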

## Dramatiq: Modern and Reliable

Dramatiq is a newer alternative that emphasizes reliability and performance. It supports both Redis and RabbitMQ as brokers.

```python
# dramatiq_tasks.py
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results import Results
from dramatiq.results.backends.redis import RedisBackend

result_backend = RedisBackend(url="redis://localhost:6379/2")
broker = RedisBroker(url="redis://localhost:6379/0")
broker.add_middleware(Results(backend=result_backend))
dramatiq.set_broker(broker)

@dramatiq.actor(
    max_retries=3,
    min_backoff=30_000,     # 30 seconds minimum retry delay
    max_backoff=300_000,    # 5 minutes maximum retry delay
    time_limit=300_000,     # 5-minute timeout
    store_results=True,
)
def analyze_conversation(conversation_id: str) -> dict:
    """Analyze a conversation with multiple LLM passes."""
    conversation = load_conversation(conversation_id)
    transcript = conversation["transcript"]

    sentiment = call_llm_sync(f"Analyze sentiment: {transcript}")
    summary = call_llm_sync(f"Summarize: {transcript}")
    action_items = call_llm_sync(f"Extract action items: {transcript}")

    result = {
        "conversation_id": conversation_id,
        "sentiment": sentiment,
        "summary": summary,
        "action_items": action_items,
    }
    store_analysis(conversation_id, result)
    return result

# Send the task
message = analyze_conversation.send("conv_12345")

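# Retrieve results later; blocks for up to 60 seconds and raises
# dramatiq.results.ResultTimeout if the result is not ready by then
result = message.get_result(block=True, timeout=60_000)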
```

## Choosing the Right Queue

| Feature | Celery | RQ | Dramatiq |
| --- | --- | --- | --- |
| Broker support | Redis, RabbitMQ, SQS | Redis only | Redis, RabbitMQ |
| Complexity | High | Low | Medium |
| Concurrency model | Prefork by default; thread, gevent, and eventlet pools available | One forked process per job | Processes with threads |
| Monitoring | Flower, Celery Events | rq-dashboard | Prometheus middleware |
| Best for | Complex workflows | Simple background jobs | Reliable processing |

For AI agent workloads, Celery is the safest choice for complex multi-step workflows. RQ works well for simple fire-and-forget LLM jobs. Dramatiq offers a good balance of simplicity and reliability.

## Retry Policies for LLM Tasks

LLM API calls fail in a few predictable ways: rate limits, timeouts, transient server errors, and authentication failures. Each warrants a different retry policy.

```python
# Celery retry configuration per failure type.
# RateLimitError, ServerError, and AuthenticationError stand in for your LLM
# client's exception classes; import them (and its timeout error) from there.
@app.task(bind=True, max_retries=5)
def smart_retry_task(self, prompt: str):
    try:
        return call_llm_sync(prompt)
    except RateLimitError as exc:
        # Back off aggressively for rate limits: 30s, 60s, 120s, ...
        raise self.retry(exc=exc, countdown=30 * (2 ** self.request.retries))
    except TimeoutError as exc:
        # Moderate backoff for timeouts: 10s, 20s, 40s, ...
        raise self.retry(exc=exc, countdown=10 * (2 ** self.request.retries))
    except ServerError as exc:
        # Quick retry for transient server errors
        raise self.retry(exc=exc, countdown=5)
    except AuthenticationError:
        # Never retry auth failures
        raise
```
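
The backoff arithmetic is easier to check in isolation. Here is a queue-agnostic sketch of the same idea with a cap and optional jitter added; `retry_countdown`, `BASE_DELAY`, and the default values are illustrative assumptions rather than settings from any library. Unlike the task above, this variation applies exponential growth to every failure kind.

```python
import random

# Base delay in seconds per failure kind; the kinds and values are illustrative.
BASE_DELAY = {"rate_limit": 30, "timeout": 10, "server_error": 5}


def retry_countdown(
    kind: str, retries: int, cap: int = 600, jitter: float = 0.0
) -> float:
    """Exponential backoff: base * 2**retries, capped, with optional jitter."""
    delay = min(BASE_DELAY[kind] * (2 ** retries), cap)
    # Jitter spreads retries out so many workers do not retry in lockstep.
    return delay + random.uniform(0, jitter * delay)


for attempt in range(5):
    # With jitter disabled the delays are 30, 60, 120, 240, 480 seconds.
    print(attempt, retry_countdown("rate_limit", attempt))
```

The return value could be passed as `countdown` to `self.retry()`. The cap keeps a high retry count from producing delays of many minutes.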

## FAQ

### Can I use asyncio inside Celery workers?

Yes, but it requires careful setup. Celery tasks are ordinary synchronous functions, so the simplest approach is to call `asyncio.run()` inside the task to drive a coroutine to completion. Celery's gevent and eventlet pools also provide concurrent I/O within a worker, but they are greenlet-based and are not asyncio event loops. Either way, mixing Celery's process model with asyncio adds complexity. For purely I/O-bound workloads, consider an async-native queue such as arq, which runs on asyncio natively, or keep the two separate: use Celery for job dispatch and asyncio within each job.
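
Here is a minimal sketch of the "asyncio within each job" approach, using only the standard library. `call_llm_async` is a placeholder for an async LLM client, and `analyze_document` is a hypothetical name for the synchronous function you would register as a task.

```python
import asyncio


async def call_llm_async(prompt: str) -> str:
    """Placeholder for an async LLM client call."""
    await asyncio.sleep(0)  # stands in for network I/O
    return f"response to: {prompt[:20]}"


async def _analyze(content: str) -> dict:
    # The three calls are independent, so run them concurrently.
    summary, entities, label = await asyncio.gather(
        call_llm_async(f"Summarize: {content}"),
        call_llm_async(f"Extract entities: {content}"),
        call_llm_async(f"Classify: {content}"),
    )
    return {"summary": summary, "entities": entities, "classification": label}


def analyze_document(content: str) -> dict:
    """Synchronous entry point; this is the function a queue would register."""
    return asyncio.run(_analyze(content))


result = analyze_document("Quarterly report text")
print(sorted(result))
```

`asyncio.run()` creates and closes a fresh event loop on each call, and it raises `RuntimeError` if a loop is already running in the same thread, so this pattern assumes a worker that runs each task in plain synchronous code.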

### How do I handle long-running agent workflows that take 10+ minutes?

Set `task_time_limit` high enough to accommodate the workflow, either globally or per task with the `time_limit` option. Use `task_soft_time_limit` to detect an approaching timeout: Celery raises `SoftTimeLimitExceeded` inside the task, which you can catch to checkpoint progress. For very long workflows, break them into multiple chained tasks using Celery chains or chords, so each step is individually retryable.

### What result backend should I use?

Redis is the simplest and works well for results you query within minutes. For durable results, use PostgreSQL or MongoDB. Set `result_expires` to auto-clean old results. For AI workloads, store the actual analysis results in your application database and use the task queue result backend only for job status tracking.
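
The split between application storage and the result backend might look like the sketch below. It uses an in-memory SQLite database as a stand-in for the application database; the `analyses` table, its columns, and the placeholder LLM output are assumptions for illustration. The task persists the full analysis itself and returns only a small status record, which is all the result backend needs to hold.

```python
import json
import sqlite3

# In-memory SQLite stands in for the application database.
db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE analyses (conversation_id TEXT PRIMARY KEY, payload TEXT NOT NULL)"
)


def store_analysis(conversation_id: str, result: dict) -> None:
    """Upsert the full analysis into the application database."""
    db.execute(
        "INSERT OR REPLACE INTO analyses (conversation_id, payload) VALUES (?, ?)",
        (conversation_id, json.dumps(result)),
    )
    db.commit()


def analyze_conversation(conversation_id: str) -> dict:
    """Task body: persist the heavy result, return only a small status record."""
    result = {"sentiment": "positive", "summary": "..."}  # placeholder LLM output
    store_analysis(conversation_id, result)
    return {"conversation_id": conversation_id, "status": "stored"}


status = analyze_conversation("conv_12345")
row = db.execute(
    "SELECT payload FROM analyses WHERE conversation_id = ?", ("conv_12345",)
).fetchone()
print(status, json.loads(row[0])["sentiment"])
```

Because the upsert is keyed on the conversation ID, a retried or redelivered task overwrites the same row instead of creating a duplicate.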

---

Source: https://callsphere.ai/blog/task-queues-ai-agents-celery-rq-dramatiq-background-jobs
