Learn Agentic AI

Task Queues for AI Agents: Celery, RQ, and Dramatiq for Background Agent Jobs

Set up background task queues for AI agent workloads using Celery, RQ, and Dramatiq. Learn worker patterns, retry policies, and result backends for reliable agent job processing.

When asyncio Is Not Enough

asyncio excels at concurrent I/O within a single process. But many AI agent workloads need more: durable job processing that survives server restarts, distributed execution across multiple workers, scheduled recurring jobs, and reliable retry semantics. Task queues provide all of this.

Common AI agent use cases for task queues include: batch document processing, periodic knowledge base updates, long-running multi-step agent workflows that exceed HTTP request timeouts, and fan-out patterns where one user request triggers dozens of agent jobs.

Celery: The Industry Standard

Celery is the most widely deployed Python task queue. It supports multiple brokers (Redis, RabbitMQ), result backends, and has mature tooling for monitoring and administration.

# celery_app.py
from celery import Celery

# fetch_document, call_llm_sync, save_results, LLMRateLimitError, and
# LLMTimeoutError are application-defined helpers assumed to exist.

app = Celery(
    "ai_agents",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_acks_late=True,          # Acknowledge after completion
    worker_prefetch_multiplier=1,  # One task at a time per worker
    task_time_limit=300,           # Hard 5-minute timeout
    task_soft_time_limit=270,      # Soft timeout triggers exception
)

@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
)
def process_document(self, document_id: str, user_id: str) -> dict:
    """Process a document through an AI pipeline."""
    try:
        # Fetch document
        doc = fetch_document(document_id)

        # Run LLM analysis (synchronous in Celery workers)
        summary = call_llm_sync(f"Summarize: {doc['content']}")
        entities = call_llm_sync(f"Extract entities: {doc['content']}")
        classification = call_llm_sync(
            f"Classify this document: {doc['content']}"
        )

        result = {
            "document_id": document_id,
            "summary": summary,
            "entities": entities,
            "classification": classification,
        }
        save_results(document_id, result)
        return result

    except LLMRateLimitError as exc:
        raise self.retry(exc=exc, countdown=120)
    except LLMTimeoutError as exc:
        raise self.retry(exc=exc, countdown=60)

Key Celery configuration choices for AI workloads:

  • task_acks_late=True ensures a task is re-delivered if the worker crashes mid-execution. Critical for expensive LLM workflows.
  • worker_prefetch_multiplier=1 prevents workers from grabbing tasks they cannot process immediately, important when each task takes seconds.
  • task_soft_time_limit gives the task a chance to clean up before the hard timeout kills it.

RQ: Simplicity First

Redis Queue (RQ) trades Celery's feature richness for simplicity. It is an excellent choice when your requirements are straightforward and you already use Redis.

# tasks.py
from redis import Redis
from rq import Queue, Retry
from rq.job import Job

redis_conn = Redis(host="localhost", port=6379)
agent_queue = Queue("agent_jobs", connection=redis_conn)

# search_vector_store, call_llm_sync, and save_conversation are
# application-defined helpers assumed to exist.

def run_agent_workflow(query: str, session_id: str) -> dict:
    """Background agent workflow; runs in an RQ worker process."""
    # Step 1: Retrieve context
    docs = search_vector_store(query)

    # Step 2: Call LLM
    response = call_llm_sync(
        prompt=query,
        context=docs,
    )

    # Step 3: Store results
    save_conversation(session_id, query, response)
    return {"session_id": session_id, "response": response}

# Enqueue from your web application
job = agent_queue.enqueue(
    run_agent_workflow,
    "How do I configure async pipelines?",
    "session_abc123",
    job_timeout=300,
    retry=Retry(max=3),  # RQ expects a Retry object, not a bare int
    result_ttl=3600,     # Keep results for 1 hour
)

# Check job status later
job = Job.fetch(job.id, connection=redis_conn)
if job.is_finished:
    print(job.result)
elif job.is_failed:
    print(f"Failed: {job.exc_info}")

Start workers with: rq worker agent_jobs --with-scheduler


Dramatiq: Modern and Reliable

Dramatiq is a newer alternative that emphasizes reliability and performance. It supports both Redis and RabbitMQ as brokers.

# dramatiq_tasks.py
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results import Results
from dramatiq.results.backends.redis import RedisBackend

# load_conversation, call_llm_sync, and store_analysis are
# application-defined helpers assumed to exist.

result_backend = RedisBackend(url="redis://localhost:6379/2")
broker = RedisBroker(url="redis://localhost:6379/0")
broker.add_middleware(Results(backend=result_backend))
dramatiq.set_broker(broker)

@dramatiq.actor(
    max_retries=3,
    min_backoff=30_000,     # 30 seconds minimum retry delay
    max_backoff=300_000,    # 5 minutes maximum retry delay
    time_limit=300_000,     # 5-minute timeout
    store_results=True,
)
def analyze_conversation(conversation_id: str) -> dict:
    """Analyze a conversation with multiple LLM passes."""
    conversation = load_conversation(conversation_id)
    transcript = conversation["transcript"]

    sentiment = call_llm_sync(f"Analyze sentiment: {transcript}")
    summary = call_llm_sync(f"Summarize: {transcript}")
    action_items = call_llm_sync(f"Extract action items: {transcript}")

    result = {
        "conversation_id": conversation_id,
        "sentiment": sentiment,
        "summary": summary,
        "action_items": action_items,
    }
    store_analysis(conversation_id, result)
    return result

# Send the task
message = analyze_conversation.send("conv_12345")

# Retrieve results later
result = message.get_result(block=True, timeout=60_000)

Choosing the Right Queue

Feature              | Celery                | RQ                     | Dramatiq
Broker support       | Redis, RabbitMQ, SQS  | Redis only             | Redis, RabbitMQ
Complexity           | High                  | Low                    | Medium
Async worker support | Yes (with gevent)     | No                     | No (uses threads)
Monitoring           | Flower, Celery Events | rq-dashboard           | Built-in CLI
Best for             | Complex workflows     | Simple background jobs | Reliable processing

For AI agent workloads, Celery is the safest choice for complex multi-step workflows, RQ works well for simple fire-and-forget LLM jobs, and Dramatiq offers the best balance of simplicity and reliability.

Retry Policies for LLM Tasks

LLM API calls fail in predictable ways. Configure retries accordingly.

# Celery retry configuration per failure type.
# RateLimitError, TimeoutError, ServerError, and AuthenticationError stand in
# for your LLM client library's exception types.
@app.task(bind=True, max_retries=5)
def smart_retry_task(self, prompt: str):
    try:
        return call_llm_sync(prompt)
    except RateLimitError as exc:
        # Back off aggressively for rate limits
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 30)
    except TimeoutError as exc:
        # Moderate backoff for timeouts
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 10)
    except ServerError as exc:
        # Quick retry for transient server errors
        raise self.retry(exc=exc, countdown=5)
    except AuthenticationError:
        # Never retry auth failures
        raise
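It is worth checking what delays the countdown formulas above actually produce. A quick pure-Python calculation of both schedules:

```python
# Rate-limit branch: countdown = 2 ** retries * 30, for retries 0..4.
rate_limit_delays = [2 ** r * 30 for r in range(5)]
print(rate_limit_delays)  # [30, 60, 120, 240, 480]

# Timeout branch: countdown = 2 ** retries * 10.
timeout_delays = [2 ** r * 10 for r in range(5)]
print(timeout_delays)  # [10, 20, 40, 80, 160]
```

Across all five attempts, the rate-limit branch waits up to 930 seconds in total, so make sure result_ttl and any client-side polling timeouts account for the full retry window.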

FAQ

Can I use asyncio inside Celery workers?

Yes, but it requires careful setup. You can run asyncio.run() inside a task function, or use Celery's gevent or eventlet pool. However, mixing Celery's process model with asyncio adds complexity. For purely I/O-bound workloads, consider using an async-native queue like arq (which runs on asyncio natively) or keeping asyncio and Celery separate — use Celery for job dispatch and asyncio within each job.
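A minimal sketch of the asyncio.run approach: the task body stays synchronous, as Celery expects, and bridges into async code internally. call_llm_async is a hypothetical async LLM client call, simulated here; in a real project the outer function would carry the @app.task decorator.

```python
import asyncio

async def call_llm_async(prompt: str) -> str:
    # Hypothetical async LLM client call, simulated with a no-op await.
    await asyncio.sleep(0)
    return f"response to: {prompt}"

# In a real Celery project this would be decorated with @app.task.
def agent_task(prompt: str) -> str:
    # Each invocation creates and tears down its own event loop,
    # keeping the Celery worker's process model untouched.
    return asyncio.run(call_llm_async(prompt))

print(agent_task("summarize the meeting"))
```

The per-call event loop adds a little overhead, but it avoids the pitfalls of sharing a long-lived loop across Celery's prefork workers.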

How do I handle long-running agent workflows that take 10+ minutes?

Set task_time_limit high enough to accommodate the workflow. Use task_soft_time_limit to detect approaching timeouts and checkpoint progress. For very long workflows, break them into multiple chained tasks using Celery chains or chords, so each step is individually retryable.

What result backend should I use?

Redis is the simplest and works well for results you query within minutes. For durable results, use PostgreSQL or MongoDB. Set result_expires to auto-clean old results. For AI workloads, store the actual analysis results in your application database and use the task queue result backend only for job status tracking.


#Python #TaskQueues #Celery #BackgroundJobs #AIAgents #AgenticAI #LearnAI #AIEngineering

Written by the CallSphere Team
