---
title: "Background Tasks in FastAPI for AI Agents: Async Processing and Task Queues"
description: "Implement background processing for AI agent workloads using FastAPI BackgroundTasks, Celery integration, and custom task queues. Learn task status tracking, webhook notifications, and long-running agent job management."
canonical: https://callsphere.ai/blog/background-tasks-fastapi-ai-agents-async-processing-queues
category: "Learn Agentic AI"
tags: ["FastAPI", "Background Tasks", "Celery", "AI Agents", "Async Processing"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-07T08:03:59.004Z
---

# Background Tasks in FastAPI for AI Agents: Async Processing and Task Queues

> Implement background processing for AI agent workloads using FastAPI BackgroundTasks, Celery integration, and custom task queues. Learn task status tracking, webhook notifications, and long-running agent job management.

## Why Background Tasks for AI Agents

Not every AI agent interaction fits a synchronous request-response cycle. A research agent that scrapes and summarizes dozens of pages, a batch pipeline that pushes documents through an LLM, custom embedding training, or long report generation can all take minutes. Forcing users to hold an HTTP connection open that long is unreliable and frustrating.

Background tasks let you accept the request immediately, return a task ID, and process the work asynchronously. The client polls for status or receives a webhook notification when the work completes. This pattern is essential for production AI agent systems.

## FastAPI Built-in BackgroundTasks

For lightweight tasks that complete in seconds, FastAPI's built-in `BackgroundTasks` is the simplest option:

```mermaid
flowchart LR
    CLIENT(["Client SDK"])
    GW["API Gateway
auth plus rate limit"]
    APP["FastAPI app
handlers and DI"]
    VAL["Pydantic validation"]
    SVC["Service layer
business logic"]
    DB[(Database)]
    QUEUE[(Background queue)]
    OBS[(Tracing)]
    CLIENT --> GW --> APP --> VAL --> SVC
    SVC --> DB
    SVC --> QUEUE
    SVC --> OBS
    SVC --> CLIENT
    style GW fill:#4f46e5,stroke:#4338ca,color:#fff
    style APP fill:#f59e0b,stroke:#d97706,color:#1f2937
    style DB fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
```

```python
import time
from datetime import datetime

from fastapi import BackgroundTasks, Depends

# get_db_session, InteractionLog, ChatRequest, LLMService, get_llm_service,
# and router are application-specific and assumed to be defined elsewhere.

async def log_agent_interaction(
    session_id: str,
    user_message: str,
    agent_response: str,
    latency_ms: float,
):
    """Save interaction to analytics database."""
    async with get_db_session() as db:
        log = InteractionLog(
            session_id=session_id,
            user_message=user_message,
            agent_response=agent_response,
            latency_ms=latency_ms,
            created_at=datetime.utcnow(),
        )
        db.add(log)
        await db.commit()

@router.post("/chat")
async def chat(
    request: ChatRequest,
    background_tasks: BackgroundTasks,
    llm_service: LLMService = Depends(get_llm_service),
):
    start = time.monotonic()
    response = await llm_service.generate(request.messages)
    latency = (time.monotonic() - start) * 1000

    # Log asynchronously - response returns immediately
    background_tasks.add_task(
        log_agent_interaction,
        session_id=request.session_id,
        user_message=request.messages[-1].content,
        agent_response=response,
        latency_ms=latency,
    )

    return {"response": response}
```

The response is sent to the client first; FastAPI runs the logging task only after the response has been delivered. However, `BackgroundTasks` executes in the same process as the server, so if the server restarts or crashes, pending tasks are lost.

## Task Status Tracking with In-Memory Store

For tasks that take longer, implement a status tracking system:

```python
import uuid
from datetime import datetime
from enum import Enum
from typing import Optional

from fastapi import HTTPException
from pydantic import BaseModel

class TaskStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

class TaskInfo(BaseModel):
    task_id: str
    status: TaskStatus
    result: Optional[dict] = None
    error: Optional[str] = None
    created_at: datetime
    completed_at: Optional[datetime] = None

# In production, use Redis instead
task_store: dict[str, TaskInfo] = {}

async def run_research_task(task_id: str, query: str):
    task_store[task_id].status = TaskStatus.RUNNING
    try:
        result = await research_agent.deep_research(query)
        task_store[task_id].status = TaskStatus.COMPLETED
        task_store[task_id].result = result
        task_store[task_id].completed_at = datetime.utcnow()
    except Exception as e:
        task_store[task_id].status = TaskStatus.FAILED
        task_store[task_id].error = str(e)

@router.post("/research", status_code=202)
async def start_research(
    request: ResearchRequest,
    background_tasks: BackgroundTasks,
):
    task_id = str(uuid.uuid4())
    task_store[task_id] = TaskInfo(
        task_id=task_id,
        status=TaskStatus.PENDING,
        created_at=datetime.utcnow(),
    )
    background_tasks.add_task(
        run_research_task, task_id, request.query
    )
    return {"task_id": task_id, "status": "pending"}

@router.get("/research/{task_id}")
async def get_research_status(task_id: str):
    task = task_store.get(task_id)
    if not task:
        raise HTTPException(404, "Task not found")
    return task
```

The endpoint returns HTTP 202 Accepted with a task ID. The client polls `GET /research/{task_id}` to check progress.
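On the client side, the poll loop can be factored into a small helper. This is a sketch, with `fetch_status` standing in for whatever call hits `GET /research/{task_id}` (for example via `httpx`); the names and defaults here are illustrative:

```python
import time
from typing import Callable

def poll_until_done(
    fetch_status: Callable[[], dict],
    interval_s: float = 2.0,
    timeout_s: float = 300.0,
) -> dict:
    """Call fetch_status until the task reaches a terminal state or we time out."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        task = fetch_status()
        if task["status"] in ("completed", "failed"):
            return task
        time.sleep(interval_s)
    raise TimeoutError("task did not finish within the timeout")
```

In practice you would add exponential backoff between polls to avoid hammering the status endpoint for long-running tasks.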

## Celery for Distributed Task Queues

For production workloads, use Celery with Redis as the broker. This gives you persistent task queues, automatic retries, worker scaling, and task monitoring:

```python
from celery import Celery

celery_app = Celery(
    "agent_tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_track_started=True,
    task_time_limit=600,  # 10 minute hard limit
    task_soft_time_limit=540,  # 9 minute soft limit
)

@celery_app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
)
def process_document_batch(self, document_ids: list[str]):
    # analyze_document_sync and ExternalServiceError are application-specific
    # and assumed to be defined elsewhere; Celery tasks run synchronously in
    # the worker process, hence the sync analysis call.
    try:
        results = []
        for i, doc_id in enumerate(document_ids):
            result = analyze_document_sync(doc_id)
            results.append(result)
            # Update progress
            self.update_state(
                state="PROGRESS",
                meta={"current": i + 1, "total": len(document_ids)},
            )
        return {"results": results, "count": len(results)}
    except ExternalServiceError as e:
        # Retry transient upstream failures; Celery waits default_retry_delay
        # seconds between attempts and gives up after max_retries
        raise self.retry(exc=e)
```

Integrate Celery tasks into your FastAPI endpoints:

```python
@router.post("/batch-analyze", status_code=202)
async def batch_analyze(request: BatchAnalyzeRequest):
    task = process_document_batch.delay(request.document_ids)
    return {"task_id": task.id, "status": "queued"}

@router.get("/batch-analyze/{task_id}")
async def get_batch_status(task_id: str):
    result = celery_app.AsyncResult(task_id)
    response = {"task_id": task_id, "status": result.status}

    if result.status == "PROGRESS":
        response["progress"] = result.info
    elif result.status == "SUCCESS":
        response["result"] = result.result
    elif result.status == "FAILURE":
        response["error"] = str(result.result)

    return response
```
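The FastAPI process only enqueues work; a separate worker process executes it. Assuming the Celery app above lives in a module named `tasks.py` (the module name is an assumption), the worker and an optional monitoring UI can be started like this:

```shell
# Start a Celery worker that consumes from the Redis broker
celery -A tasks worker --loglevel=info --concurrency=4

# Optional: monitor tasks in a browser with Flower (pip install flower)
celery -A tasks flower --port=5555
```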

## Webhook Notifications

Instead of polling, let clients register a webhook URL to receive notifications when tasks complete:

```python
import httpx

async def notify_webhook(
    webhook_url: str, task_id: str, result: dict
):
    async with httpx.AsyncClient() as client:
        await client.post(
            webhook_url,
            json={
                "task_id": task_id,
                "status": "completed",
                "result": result,
            },
            timeout=10.0,
        )

@router.post("/research", status_code=202)
async def start_research(
    request: ResearchRequest,
    background_tasks: BackgroundTasks,
):
    task_id = str(uuid.uuid4())
    # run_and_notify combines run_research_task with notify_webhook,
    # firing the webhook once the result is ready
    background_tasks.add_task(
        run_and_notify,
        task_id,
        request.query,
        request.webhook_url,
    )
    return {"task_id": task_id}
```
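A `run_and_notify` helper can be sketched as a thin wrapper that runs the research job and then fires the webhook. Injecting the two async callables keeps the sketch self-contained and testable; in the application you would bind `research_agent.deep_research` and the `notify_webhook` function from above (for example with `functools.partial`) so the route only passes the task ID, query, and webhook URL:

```python
import asyncio
from typing import Awaitable, Callable, Optional

async def run_and_notify(
    task_id: str,
    query: str,
    webhook_url: Optional[str],
    research: Callable[[str], Awaitable[dict]],
    notify: Callable[[str, str, dict], Awaitable[None]],
) -> dict:
    """Run the research job, then POST the result to the webhook if one was given."""
    result = await research(query)
    if webhook_url:
        await notify(webhook_url, task_id, result)
    return result
```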

## FAQ

### When should I use BackgroundTasks versus Celery?

Use FastAPI `BackgroundTasks` for fire-and-forget operations that take under 30 seconds and where losing a task on server restart is acceptable, like logging, sending notifications, or cache warming. Use Celery for anything that takes longer, needs retries, requires progress tracking, or must survive server restarts. If you are processing user-submitted documents through an LLM, that is a Celery task. If you are logging an API call, that is a BackgroundTask.

### How do I prevent duplicate task submissions?

Use an idempotency key. Have clients send a unique key with each request. Before creating a new task, check if a task with that key already exists in your store. If it does, return the existing task ID instead of creating a duplicate. Store the mapping from idempotency key to task ID in Redis with a TTL matching your task retention period.
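A minimal sketch of that check, using a plain dict where production code would keep the mapping in Redis with `SET NX` and an expiry (all names here are illustrative):

```python
import uuid

# Stand-in for Redis: maps idempotency key -> task ID
idempotency_store: dict[str, str] = {}

def get_or_create_task(idempotency_key: str) -> tuple[str, bool]:
    """Return (task_id, created); created is False when the key was seen before."""
    existing = idempotency_store.get(idempotency_key)
    if existing is not None:
        return existing, False
    task_id = str(uuid.uuid4())
    idempotency_store[idempotency_key] = task_id
    return task_id, True
```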

### Can background tasks access FastAPI dependencies?

FastAPI `BackgroundTasks` functions do not have access to the dependency injection system. Dependencies like database sessions from `Depends(get_db)` are closed before the background task runs. You must create new database sessions and clients inside the background task function itself, or pass the necessary data as plain arguments rather than injected dependencies.

---

#FastAPI #BackgroundTasks #Celery #AIAgents #AsyncProcessing #AgenticAI #LearnAI #AIEngineering

---

Source: https://callsphere.ai/blog/background-tasks-fastapi-ai-agents-async-processing-queues
