
Long-Running Agent Tasks: Durable Execution with Temporal and Celery

Discover how to build resilient long-running agent workflows using durable execution engines like Temporal and Celery, with activity retries, saga patterns, and persistent state across process restarts.

The Problem with Ephemeral Agent Runs

Most agent frameworks treat each invocation as a short-lived function call. The agent receives a prompt, calls some tools, and returns a result — all within a single process lifetime. But real-world agent tasks often take minutes, hours, or even days. A due diligence agent might need to collect data from 50 sources over several hours. A monitoring agent runs indefinitely.

When these long-running tasks crash — and they will — you lose all progress. The agent has no memory of which steps completed, what intermediate results were produced, or where it left off. This is where durable execution comes in.

Durable Execution: The Core Idea

Durable execution means that workflow state survives process failures. If the worker crashes after completing step 3 of 10, it resumes at step 4 when restarted — not step 1. Two popular approaches in the Python ecosystem are Temporal and Celery.
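The resume-at-step-4 behavior can be illustrated with a plain-Python sketch before looking at either engine. The `run_steps` helper and on-disk step counter are illustrative stand-ins, not part of Temporal or Celery:

```python
import json
from pathlib import Path

def run_steps(steps, state_file: Path):
    """Run steps in order, persisting progress after each one so a
    restart resumes where the previous run left off, not at step 1."""
    done = json.loads(state_file.read_text()) if state_file.exists() else 0
    for i in range(done, len(steps)):
        steps[i]()                                 # do the work for step i
        state_file.write_text(json.dumps(i + 1))   # record its completion

executed = []
crash_pending = [True]  # the first run "crashes" during the fourth step

def make_step(n):
    def step():
        if n == 3 and crash_pending[0]:
            crash_pending[0] = False
            raise RuntimeError("worker crashed")
        executed.append(n)
    return step

steps = [make_step(n) for n in range(10)]
state = Path("progress.json")
state.unlink(missing_ok=True)

try:
    run_steps(steps, state)   # completes steps 0-2, dies during step 3
except RuntimeError:
    pass
run_steps(steps, state)       # "restart": resumes at step 3, not step 0
state.unlink(missing_ok=True)

assert executed == list(range(10))  # every step ran exactly once
```

Real engines persist this progress in a database or event history rather than a local file, but the contract is the same: completed work is never repeated.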


Building Agent Workflows with Temporal

Temporal separates workflows (orchestration logic) from activities (actual work). The workflow is deterministic and replayed on failure. Activities are the non-deterministic side-effecting operations.

from temporalio import workflow, activity
from temporalio.common import RetryPolicy
from datetime import timedelta

@activity.defn
async def fetch_source_data(source_url: str) -> dict:
    """Activity: fetch data from a single source."""
    # This runs in a worker and can be retried independently
    import httpx
    async with httpx.AsyncClient() as client:
        response = await client.get(source_url, timeout=30)
        return response.json()

@activity.defn
async def analyze_with_llm(data: dict) -> str:
    """Activity: send collected data to an LLM for analysis."""
    from openai import AsyncOpenAI
    client = AsyncOpenAI()
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Analyze the following data."},
            {"role": "user", "content": str(data)},
        ],
    )
    return response.choices[0].message.content

@workflow.defn
class ResearchWorkflow:
    """Durable workflow that survives crashes."""

    @workflow.run
    async def run(self, sources: list[str]) -> str:
        # Each activity call is persisted to Temporal history
        collected = []
        for source in sources:
            data = await workflow.execute_activity(
                fetch_source_data,
                source,
                start_to_close_timeout=timedelta(minutes=2),
                retry_policy=RetryPolicy(maximum_attempts=3),
            )
            collected.append(data)

        # If the worker crashes here, it resumes AFTER the loop
        analysis = await workflow.execute_activity(
            analyze_with_llm,
            {"sources": collected},
            start_to_close_timeout=timedelta(minutes=5),
        )
        return analysis

If the worker crashes after fetching 8 of 10 sources, Temporal replays the workflow history. It skips the 8 completed activities (their results are stored) and resumes fetching source 9.


Saga Pattern for Multi-Step Compensation

When a long workflow fails partway through, you often need to undo earlier steps. The saga pattern pairs each action with a compensation:

from dataclasses import dataclass
from typing import Callable, Awaitable

@dataclass
class SagaStep:
    action: Callable[..., Awaitable]
    compensation: Callable[..., Awaitable]
    name: str

class SagaOrchestrator:
    def __init__(self):
        self.completed_steps: list[SagaStep] = []

    async def execute(self, steps: list[SagaStep], context: dict):
        for step in steps:
            try:
                await step.action(context)
                self.completed_steps.append(step)
            except Exception as e:
                print(f"Step '{step.name}' failed: {e}")
                await self.compensate(context)
                raise

    async def compensate(self, context: dict):
        """Roll back completed steps in reverse order."""
        for step in reversed(self.completed_steps):
            try:
                await step.compensation(context)
            except Exception as comp_error:
                # Log and keep going so later compensations still run
                print(f"Compensation for '{step.name}' failed: {comp_error}")
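A quick usage sketch shows the rollback order. The reserve/charge step names are hypothetical, and the definitions are repeated so the snippet runs standalone; a failure in the second step triggers compensation of only the steps that completed, in reverse:

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable

# Definitions repeated from above so this snippet runs standalone.
@dataclass
class SagaStep:
    action: Callable[..., Awaitable]
    compensation: Callable[..., Awaitable]
    name: str

class SagaOrchestrator:
    def __init__(self):
        self.completed_steps: list[SagaStep] = []

    async def execute(self, steps, context):
        for step in steps:
            try:
                await step.action(context)
                self.completed_steps.append(step)
            except Exception:
                await self.compensate(context)
                raise

    async def compensate(self, context):
        for step in reversed(self.completed_steps):
            await step.compensation(context)

async def demo():
    trace = []

    async def reserve(ctx):
        trace.append("reserve")

    async def cancel_reservation(ctx):
        trace.append("cancel_reservation")

    async def charge(ctx):
        raise RuntimeError("payment declined")

    async def refund(ctx):
        trace.append("refund")

    saga = SagaOrchestrator()
    steps = [
        SagaStep(reserve, cancel_reservation, "reserve"),
        SagaStep(charge, refund, "charge"),
    ]
    try:
        await saga.execute(steps, {})
    except RuntimeError:
        pass
    return trace

trace = asyncio.run(demo())
# "charge" failed, so only the completed "reserve" step is compensated
assert trace == ["reserve", "cancel_reservation"]
```

Inside a Temporal workflow you would run the same pattern with `execute_activity` calls, so the compensation history itself survives a crash.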

Celery for Simpler Durability

If Temporal feels heavyweight, Celery provides task queuing with retries and result persistence:

from celery import Celery, chain

app = Celery("agent_tasks", broker="redis://localhost:6379/0")
app.conf.result_backend = "redis://localhost:6379/1"

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def fetch_data(self, source_url: str):
    try:
        import httpx
        response = httpx.get(source_url, timeout=30)
        return response.json()
    except Exception as exc:
        # retry() raises celery.exceptions.Retry; re-raise it so the
        # task is re-queued instead of returning None
        raise self.retry(exc=exc)

@app.task
def analyze_data(data: dict):
    # LLM analysis step
    return {"analysis": "completed", "data": data}

# Chain tasks: fetch then analyze
pipeline = chain(
    fetch_data.s("https://api.example.com/data"),
    analyze_data.s(),
)
result = pipeline.apply_async()
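The retry semantics of `max_retries` and `default_retry_delay` can be emulated in plain Python to make them concrete. This is a sketch of the behavior, not Celery's internals (Celery re-queues the task through the broker rather than sleeping in-process):

```python
import time

def with_retries(fn, max_retries=3, retry_delay=0.0):
    """Re-run fn until it succeeds or the retry budget is spent,
    mirroring @app.task(max_retries=..., default_retry_delay=...)."""
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            attempt += 1
            if attempt > max_retries:
                raise  # budget exhausted; the failure propagates
            time.sleep(retry_delay)

calls = []

def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("transient failure")
    return "ok"

assert with_retries(flaky) == "ok"
assert len(calls) == 3  # two failures, then success on the third attempt
```

The crucial difference in real Celery is durability: because the task and its retry state live in the broker, a worker crash between attempts does not lose the retry budget.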

State Persistence Strategies

Regardless of the engine, persist your agent state at meaningful checkpoints:

import json
from pathlib import Path

class CheckpointManager:
    def __init__(self, workflow_id: str, storage_dir: str = "./checkpoints"):
        self.path = Path(storage_dir) / f"{workflow_id}.json"
        self.path.parent.mkdir(parents=True, exist_ok=True)

    def save(self, state: dict):
        self.path.write_text(json.dumps(state, default=str))

    def load(self) -> dict | None:
        if self.path.exists():
            return json.loads(self.path.read_text())
        return None

    def clear(self):
        self.path.unlink(missing_ok=True)

FAQ

When should I use Temporal versus Celery for agent workflows?

Use Temporal when your workflow has complex branching, long-running wait states (hours or days), or when you need the replay guarantee that ensures exactly-once semantics. Use Celery when you need a simple task queue with retries and your workflows are linear chains of tasks without complex orchestration logic.

How does Temporal replay work without re-executing completed activities?

Temporal records every activity completion in its event history. During replay, the workflow code runs again, but when it hits execute_activity, Temporal checks the history. If that activity already completed, it returns the stored result immediately instead of dispatching it to a worker. This makes replay deterministic and fast.
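That lookup can be sketched as a toy model — a dictionary of completed results standing in for the event history. This is an illustration of the idea, not Temporal's actual implementation:

```python
class ReplayingExecutor:
    """Toy model of replay: completed activity results live in a
    history map; execute_activity consults it before doing real work."""

    def __init__(self, history=None):
        self.history = {} if history is None else history
        self.dispatched = []  # activities actually run by a worker

    def execute_activity(self, name, fn, *args):
        key = (name, args)
        if key in self.history:
            return self.history[key]  # replay: return the stored result
        result = fn(*args)            # first execution: run for real
        self.dispatched.append(name)
        self.history[key] = result
        return result

def double(x):
    return 2 * x

# First run executes the activity and records it in history.
ex = ReplayingExecutor()
assert ex.execute_activity("double", double, 21) == 42

# Replay against the same history: stored result, no re-dispatch.
replay = ReplayingExecutor(history=ex.history)
assert replay.execute_activity("double", double, 21) == 42
assert replay.dispatched == []
```

This is also why Temporal workflow code must be deterministic: on replay it must request the same activities in the same order, or the history lookup falls apart.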

What happens to in-flight LLM calls when a worker crashes?

The LLM call becomes orphaned — the API may still process it, but the result is lost. Temporal handles this with activity timeouts and retries. When the worker restarts and replays, it re-dispatches the activity. To avoid paying for the orphaned call, set short start_to_close_timeout values and implement idempotency on your LLM wrapper so duplicate calls return cached results.
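An idempotency layer along those lines might hash the request and cache the completion, so a retried activity reuses the earlier result instead of paying for a second call. A minimal sketch — `fake_llm` stands in for a real client, and in practice the cache would live in Redis or a database rather than memory:

```python
import hashlib
import json

class IdempotentLLM:
    """Cache completions keyed by a hash of (model, messages) so a
    retried activity reuses the earlier result instead of re-calling."""

    def __init__(self, complete_fn):
        self.complete_fn = complete_fn
        self.cache = {}
        self.calls = 0

    def complete(self, model, messages):
        key = hashlib.sha256(
            json.dumps({"model": model, "messages": messages},
                       sort_keys=True).encode()
        ).hexdigest()
        if key not in self.cache:
            self.calls += 1  # only cache misses hit the real API
            self.cache[key] = self.complete_fn(model, messages)
        return self.cache[key]

def fake_llm(model, messages):
    return f"analysis of {len(messages)} messages"

llm = IdempotentLLM(fake_llm)
msgs = [{"role": "user", "content": "Analyze this."}]
first = llm.complete("gpt-4o", msgs)
second = llm.complete("gpt-4o", msgs)  # retry after a crash: cache hit

assert first == second
assert llm.calls == 1  # the underlying model was called only once
```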


#Temporal #Celery #DurableExecution #WorkflowEngines #Python #AgenticAI #LearnAI #AIEngineering


Written by

CallSphere Team
