---
title: "Long-Running Agent Tasks: Durable Execution with Temporal and Celery"
description: "Discover how to build resilient long-running agent workflows using durable execution engines like Temporal and Celery, with activity retries, saga patterns, and persistent state across process restarts."
canonical: https://callsphere.ai/blog/long-running-agent-tasks-durable-execution-temporal-celery
category: "Learn Agentic AI"
tags: ["Temporal", "Celery", "Durable Execution", "Workflow Engines", "Python"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-08T02:42:38.763Z
---

# Long-Running Agent Tasks: Durable Execution with Temporal and Celery

> Discover how to build resilient long-running agent workflows using durable execution engines like Temporal and Celery, with activity retries, saga patterns, and persistent state across process restarts.

## The Problem with Ephemeral Agent Runs

Most agent frameworks treat each invocation as a short-lived function call. The agent receives a prompt, calls some tools, and returns a result — all within a single process lifetime. But real-world agent tasks often take minutes, hours, or even days. A due diligence agent might need to collect data from 50 sources over several hours. A monitoring agent runs indefinitely.

When these long-running tasks crash — and they will — you lose all progress. The agent has no memory of which steps completed, what intermediate results were produced, or where it left off. This is where **durable execution** comes in.

## Durable Execution: The Core Idea

Durable execution means that workflow state survives process failures. If the worker crashes after completing step 3 of 10, it resumes at step 4 when restarted — not step 1. Two popular approaches in the Python ecosystem are **Temporal** and **Celery**.

```mermaid
flowchart LR
    INPUT(["User intent"])
    PARSE["Parse plus
classify"]
    PLAN["Plan and tool
selection"]
    AGENT["Agent loop
LLM plus tools"]
    GUARD{"Guardrails
and policy"}
    EXEC["Execute and
verify result"]
    OBS[("Trace and metrics")]
    OUT(["Outcome plus
next action"])
    INPUT --> PARSE --> PLAN --> AGENT --> GUARD
    GUARD -->|Pass| EXEC --> OUT
    GUARD -->|Fail| AGENT
    AGENT --> OBS
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style OBS fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
```

## Building Agent Workflows with Temporal

Temporal separates workflows (orchestration logic) from activities (actual work). The workflow is deterministic and replayed on failure. Activities are the non-deterministic side-effecting operations.

```python
from temporalio import workflow, activity
from datetime import timedelta
import asyncio

@activity.defn
async def fetch_source_data(source_url: str) -> dict:
    """Activity: fetch data from a single source."""
    # This runs in a worker and can be retried independently
    import httpx
    async with httpx.AsyncClient() as client:
        response = await client.get(source_url, timeout=30)
        return response.json()

@activity.defn
async def analyze_with_llm(data: dict) -> str:
    """Activity: send collected data to an LLM for analysis."""
    from openai import AsyncOpenAI
    client = AsyncOpenAI()
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Analyze the following data."},
            {"role": "user", "content": str(data)},
        ],
    )
    return response.choices[0].message.content

@workflow.defn
class ResearchWorkflow:
    """Durable workflow that survives crashes."""

    @workflow.run
    async def run(self, sources: list[str]) -> str:
        # Each activity call is persisted to Temporal history
        collected = []
        for source in sources:
            data = await workflow.execute_activity(
                fetch_source_data,
                source,
                start_to_close_timeout=timedelta(minutes=2),
                retry_policy=RetryPolicy(maximum_attempts=3),
            )
            collected.append(data)

        # If the worker crashes here, it resumes AFTER the loop
        analysis = await workflow.execute_activity(
            analyze_with_llm,
            {"sources": collected},
            start_to_close_timeout=timedelta(minutes=5),
        )
        return analysis
```

If the worker crashes after fetching 8 of 10 sources, Temporal replays the workflow history. It skips the 8 completed activities (their results are stored) and resumes fetching source 9.

## Saga Pattern for Multi-Step Compensation

When a long workflow fails partway through, you often need to undo earlier steps. The saga pattern pairs each action with a compensation:

```python
from dataclasses import dataclass
from typing import Callable, Awaitable

@dataclass
class SagaStep:
    action: Callable[..., Awaitable]
    compensation: Callable[..., Awaitable]
    name: str

class SagaOrchestrator:
    def __init__(self):
        self.completed_steps: list[SagaStep] = []

    async def execute(self, steps: list[SagaStep], context: dict):
        for step in steps:
            try:
                await step.action(context)
                self.completed_steps.append(step)
            except Exception as e:
                print(f"Step '{step.name}' failed: {e}")
                await self.compensate()
                raise

    async def compensate(self):
        """Roll back completed steps in reverse order."""
        for step in reversed(self.completed_steps):
            try:
                await step.compensation({})
            except Exception as comp_error:
                print(f"Compensation for '{step.name}' failed: {comp_error}")
```

## Celery for Simpler Durability

If Temporal feels heavyweight, Celery provides task queuing with retries and result persistence:

```python
from celery import Celery, chain

app = Celery("agent_tasks", broker="redis://localhost:6379/0")
app.conf.result_backend = "redis://localhost:6379/1"

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def fetch_data(self, source_url: str):
    try:
        import httpx
        response = httpx.get(source_url, timeout=30)
        return response.json()
    except Exception as exc:
        self.retry(exc=exc)

@app.task
def analyze_data(data: dict):
    # LLM analysis step
    return {"analysis": "completed", "data": data}

# Chain tasks: fetch then analyze
pipeline = chain(
    fetch_data.s("https://api.example.com/data"),
    analyze_data.s(),
)
result = pipeline.apply_async()
```

## State Persistence Strategies

Regardless of the engine, persist your agent state at meaningful checkpoints:

```python
import json
from pathlib import Path

class CheckpointManager:
    def __init__(self, workflow_id: str, storage_dir: str = "./checkpoints"):
        self.path = Path(storage_dir) / f"{workflow_id}.json"
        self.path.parent.mkdir(parents=True, exist_ok=True)

    def save(self, state: dict):
        self.path.write_text(json.dumps(state, default=str))

    def load(self) -> dict | None:
        if self.path.exists():
            return json.loads(self.path.read_text())
        return None

    def clear(self):
        self.path.unlink(missing_ok=True)
```

## FAQ

### When should I use Temporal versus Celery for agent workflows?

Use Temporal when your workflow has complex branching, long-running wait states (hours or days), or when you need the replay guarantee that ensures exactly-once semantics. Use Celery when you need a simple task queue with retries and your workflows are linear chains of tasks without complex orchestration logic.

### How does Temporal replay work without re-executing completed activities?

Temporal records every activity completion in its event history. During replay, the workflow code runs again, but when it hits `execute_activity`, Temporal checks the history. If that activity already completed, it returns the stored result immediately instead of dispatching it to a worker. This makes replay deterministic and fast.

### What happens to in-flight LLM calls when a worker crashes?

The LLM call becomes orphaned — the API may still process it, but the result is lost. Temporal handles this with activity timeouts and retries. When the worker restarts and replays, it re-dispatches the activity. To avoid paying for the orphaned call, set short `start_to_close_timeout` values and implement idempotency on your LLM wrapper so duplicate calls return cached results.

---

#Temporal #Celery #DurableExecution #WorkflowEngines #Python #AgenticAI #LearnAI #AIEngineering

---

Source: https://callsphere.ai/blog/long-running-agent-tasks-durable-execution-temporal-celery
