---
title: "Durable Execution: Long-Running Agents with Temporal"
description: "Build long-running AI agents that survive crashes, handle human-in-the-loop approvals, and manage multi-hour workflows using Temporal, DBOS, and Restate with the OpenAI Agents SDK."
canonical: https://callsphere.ai/blog/durable-execution-long-running-agents-temporal
category: "Learn Agentic AI"
tags: ["OpenAI", "Temporal", "Durable Execution", "Workflow"]
author: "CallSphere Team"
published: 2026-03-14T00:00:00.000Z
updated: 2026-06-04T03:28:51.907Z
---

# Durable Execution: Long-Running Agents with Temporal

> Build long-running AI agents that survive crashes, handle human-in-the-loop approvals, and manage multi-hour workflows using Temporal, DBOS, and Restate with the OpenAI Agents SDK.

## The Problem with Long-Running Agents

Most agent workflows complete in seconds — a user asks a question, the agent calls some tools, and returns an answer. But some agent tasks take minutes, hours, or even days. Consider a code migration agent that processes thousands of files across a codebase, a research agent that gathers data from dozens of sources over hours, or an approval workflow where the agent pauses and waits for a human to approve before proceeding.

Standard agent execution has no durability. If the process crashes mid-workflow, all progress is lost. If a human approval takes two hours, the agent process must stay alive the entire time, consuming resources. If the server restarts, the workflow must start from scratch.

Durable execution frameworks solve this by persisting workflow state at each step. If the process crashes after step 7 of a 20-step workflow, it automatically resumes at step 8 when the process restarts.
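The idea can be illustrated without any framework. The following is a minimal sketch using only the standard library: it journals each step's result to a JSON file and, on the next run, skips every step that already has a journaled result. The `run_durably` and `demo` helpers and the journal file layout are hypothetical illustrations, not how Temporal, DBOS, or Restate actually store state.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable


def run_durably(steps: list[Callable[[], str]], journal_path: Path) -> list[str]:
    """Run steps in order, journaling each result so a rerun skips finished steps."""
    # Load results recorded by an earlier run, if any.
    journal: list[str] = (
        json.loads(journal_path.read_text()) if journal_path.exists() else []
    )
    # Start at the first step that has no journaled result.
    for index in range(len(journal), len(steps)):
        journal.append(steps[index]())  # may raise; earlier results stay on disk
        journal_path.write_text(json.dumps(journal))  # checkpoint after each step
    return journal


def demo() -> tuple[list[str], list[str]]:
    """Crash once in the second step, then rerun and resume from that step."""
    executed: list[str] = []
    crash_pending = True

    def make_step(name: str, crash_first_time: bool = False) -> Callable[[], str]:
        def step() -> str:
            nonlocal crash_pending
            if crash_first_time and crash_pending:
                crash_pending = False
                raise RuntimeError(f"simulated crash in {name}")
            executed.append(name)
            return f"{name}-done"
        return step

    steps = [make_step("research"), make_step("write", True), make_step("review")]
    journal_path = Path(tempfile.mkdtemp()) / "journal.json"
    try:
        run_durably(steps, journal_path)  # first run dies in "write"
    except RuntimeError:
        pass
    results = run_durably(steps, journal_path)  # second run skips "research"
    return results, executed


if __name__ == "__main__":
    print(demo())
```

In this sketch the second call to `run_durably` reuses the journaled research result, so `research` executes only once even though the workflow was started twice. Real frameworks add the parts this toy omits, such as atomic writes, retries, timers, and signals.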

## Temporal Overview

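Temporal is one of the most widely used durable execution frameworks. It separates workflow code, which orchestrates the steps and must be deterministic, from activities, which perform side effects such as LLM calls. Temporal records every state transition in the workflow's event history and retries failed activities according to their retry policy. Here is how it integrates with the Agents SDK.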

```mermaid
flowchart LR
    INPUT(["User intent"])
    PARSE["Parse plus<br/>classify"]
    PLAN["Plan and tool<br/>selection"]
    AGENT["Agent loop<br/>LLM plus tools"]
    GUARD{"Guardrails<br/>and policy"}
    EXEC["Execute and<br/>verify result"]
    OBS[("Trace and metrics")]
    OUT(["Outcome plus<br/>next action"])
    INPUT --> PARSE --> PLAN --> AGENT --> GUARD
    GUARD -->|Pass| EXEC --> OUT
    GUARD -->|Fail| AGENT
    AGENT --> OBS
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style OBS fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
```

## Setting Up Temporal with the Agents SDK

First, install the dependencies:

```bash
pip install temporalio openai-agents
```

Define your agent activities, the individual units of work whose results Temporal records and which it retries on failure:

```python
# activities.py
from temporalio import activity
from agents import Agent, Runner

research_agent = Agent(
    name="ResearchAgent",
    model="gpt-4.1",
    instructions="Research the given topic and return a structured summary.",
)

writer_agent = Agent(
    name="WriterAgent",
    model="gpt-4.1",
    instructions="Write a polished report based on the research findings.",
)

reviewer_agent = Agent(
    name="ReviewerAgent",
    model="gpt-5",
    instructions="Review the report for accuracy and completeness. Return approved or revision_needed.",
)

@activity.defn
async def research_topic(topic: str) -> str:
    """Run the research agent. Temporal will retry on failure."""
    result = await Runner.run(research_agent, input=f"Research this topic: {topic}")
    return result.final_output

@activity.defn
async def write_report(research: str) -> str:
    """Run the writer agent."""
    result = await Runner.run(writer_agent, input=f"Write a report from this research:\n{research}")
    return result.final_output

@activity.defn
async def review_report(report: str) -> str:
    """Run the reviewer agent."""
    result = await Runner.run(reviewer_agent, input=f"Review this report:\n{report}")
    return result.final_output
```
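When an activity fails, Temporal retries it with exponential backoff. As a rough illustration of how the wait between attempts grows, the sketch below computes a backoff schedule in plain Python. The `backoff_schedule` helper is hypothetical; its default arguments are assumed to mirror Temporal's documented default retry policy (1 second initial interval, backoff coefficient 2.0, maximum interval of 100 times the initial interval), which you should verify against the version you run.

```python
from datetime import timedelta


def backoff_schedule(
    attempts: int,
    initial_interval: timedelta = timedelta(seconds=1),
    backoff_coefficient: float = 2.0,
    maximum_interval: timedelta = timedelta(seconds=100),
) -> list[timedelta]:
    """Return the wait before each retry for a run of `attempts` total attempts."""
    waits: list[timedelta] = []
    for retry in range(attempts - 1):  # the first attempt starts without a wait
        wait = initial_interval * (backoff_coefficient ** retry)
        waits.append(min(wait, maximum_interval))  # cap the exponential growth
    return waits


if __name__ == "__main__":
    for number, wait in enumerate(backoff_schedule(5), start=1):
        print(f"retry {number}: wait {wait.total_seconds():.0f}s")
```

With these assumed defaults, three total attempts mean at most two retries, after waits of 1 and 2 seconds. LLM calls that fail because of rate limits may benefit from a larger initial interval than this.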

## Defining the Workflow

The workflow orchestrates activities and handles the revision loop:

```python
# workflows.py
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy  # RetryPolicy lives in temporalio.common

with workflow.unsafe.imports_passed_through():
    from activities import research_topic, write_report, review_report

@workflow.defn
class ReportGenerationWorkflow:
    """A durable multi-agent workflow for generating reviewed reports."""

    def __init__(self) -> None:
        self._human_approved = False
        self._revision_notes = ""

    @workflow.run
    async def run(self, topic: str) -> str:
        # Step 1: Research (up to 3 attempts in total, including the first)
        research = await workflow.execute_activity(
            research_topic,
            topic,
            start_to_close_timeout=timedelta(minutes=5),
            retry_policy=RetryPolicy(maximum_attempts=3),
        )

        # Step 2: Write the report
        report = await workflow.execute_activity(
            write_report,
            research,
            start_to_close_timeout=timedelta(minutes=5),
        )

        # Step 3: AI review loop (max 3 revisions)
        for revision in range(3):
            review_result = await workflow.execute_activity(
                review_report,
                report,
                start_to_close_timeout=timedelta(minutes=5),
            )

            if "approved" in review_result.lower():
                break

            # Rewrite based on review feedback
            report = await workflow.execute_activity(
                write_report,
                f"Original report:\n{report}\n\nRevision feedback:\n{review_result}",
                start_to_close_timeout=timedelta(minutes=5),
            )

        # Step 4: Wait for human approval (can take hours)
        await workflow.wait_condition(lambda: self._human_approved)

        return report

    @workflow.signal
    async def approve(self) -> None:
        """Signal from a human to approve the report."""
        self._human_approved = True

    @workflow.signal
    async def request_revision(self, notes: str) -> None:
        """Signal from a human requesting revisions.

        The notes are only stored; this example does not act on them.
        """
        self._revision_notes = notes

    @workflow.query
    def get_status(self) -> str:
        """Query the current workflow status."""
        if self._human_approved:
            return "approved"
        return "pending_approval"
```

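Temporal records the result of every completed `execute_activity` call in the workflow's event history. If the worker crashes after the research step completes, the workflow is replayed on restart: the recorded research result is returned from history instead of re-running the research agent, and execution continues with the write step.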
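One detail in the review loop is worth hardening: the substring test `"approved" in review_result.lower()` also matches replies such as "Not approved". A stricter check, sketched below, compares only the first token of the reply against the verdict. The `is_approved` helper is hypothetical and assumes the reviewer begins its reply with one of the two verdicts named in its instructions.

```python
import re


def is_approved(review_result: str) -> bool:
    """Return True only when the reply's first token is the verdict `approved`."""
    # Tokenize on anything that is not a letter or underscore, so that
    # `revision_needed` stays a single token and punctuation is ignored.
    tokens = re.findall(r"[a-z_]+", review_result.lower())
    return bool(tokens) and tokens[0] == "approved"


if __name__ == "__main__":
    for reply in ["approved", "Not approved", "revision_needed: cite sources"]:
        print(f"{reply!r} -> {is_approved(reply)}")
```

If the reviewer cannot be relied on to lead with the verdict, a structured output type on the reviewer agent is likely a more robust choice than parsing free text.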

## Running the Temporal Worker

```python
# worker.py
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflows import ReportGenerationWorkflow
from activities import research_topic, write_report, review_report

async def main():
    client = await Client.connect("localhost:7233")

    worker = Worker(
        client,
        task_queue="agent-workflows",
        workflows=[ReportGenerationWorkflow],
        activities=[research_topic, write_report, review_report],
    )

    print("Worker started, waiting for workflows...")
    await worker.run()

asyncio.run(main())
```

## Starting and Interacting with Workflows

```python
# client.py
import asyncio
from temporalio.client import Client
from workflows import ReportGenerationWorkflow

async def main():
    client = await Client.connect("localhost:7233")

    # Start the workflow
    handle = await client.start_workflow(
        ReportGenerationWorkflow.run,
        "The impact of AI agents on software engineering",
        id="report-001",
        task_queue="agent-workflows",
    )
    print(f"Workflow started: {handle.id}")

    # Query status
    status = await handle.query(ReportGenerationWorkflow.get_status)
    print(f"Status: {status}")

    # Human approves after reviewing
    await handle.signal(ReportGenerationWorkflow.approve)

    # Wait for completion
    result = await handle.result()
    print(f"Final report:\n{result}")

asyncio.run(main())
```

## Human-in-the-Loop Patterns

The signal mechanism enables sophisticated human-in-the-loop workflows:

```python
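import asyncio
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    # Hypothetical activities; define them like the ones in activities.py
    from activities import generate_plan, execute_plan

@workflow.defn
class ApprovalWorkflow:
    """Agent workflow that pauses at critical points for human approval."""

    def __init__(self) -> None:
        self._decisions: dict[str, str] = {}
        self._pending_decision: str | None = None

    @workflow.run
    async def run(self, task: str) -> str:
        # Agent generates a plan
        plan = await workflow.execute_activity(
            generate_plan,
            task,
            start_to_close_timeout=timedelta(minutes=5),
        )

        # Pause for human approval of the plan
        self._pending_decision = "plan_approval"
        try:
            await workflow.wait_condition(
                lambda: "plan_approval" in self._decisions,
                timeout=timedelta(hours=24),  # Wait up to 24 hours
            )
        except asyncio.TimeoutError:
            # wait_condition raises on timeout rather than returning
            return "Workflow cancelled: no decision within 24 hours"
        finally:
            # Nothing is pending once a decision arrives or the wait times out
            self._pending_decision = None

        if self._decisions.get("plan_approval") != "approved":
            return "Workflow cancelled: plan not approved"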

        # Agent executes the plan
        result = await workflow.execute_activity(
            execute_plan,
            plan,
            start_to_close_timeout=timedelta(minutes=30),
        )

        return result

    @workflow.signal
    async def decide(self, decision_id: str, decision: str) -> None:
        self._decisions[decision_id] = decision

    @workflow.query
    def pending_decision(self) -> str | None:
        return self._pending_decision
```

The workflow can pause for up to 24 hours waiting for a human decision. During that time, the Temporal worker can shut down and restart — the workflow state is safely persisted.
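The shape of this signal-and-wait pattern can be tried out in plain `asyncio`, without a Temporal server. The sketch below is only an in-memory analogy: `DecisionGate` is a hypothetical class, it has no durability, and its `wait_for` method returns `None` on timeout as a simplification of the workflow's behavior.

```python
import asyncio
from typing import Dict, Optional, Tuple


class DecisionGate:
    """In-memory stand-in for the signal-plus-wait pattern (no durability)."""

    def __init__(self) -> None:
        self._decisions: Dict[str, str] = {}
        self._changed = asyncio.Condition()

    async def decide(self, decision_id: str, decision: str) -> None:
        """Record a decision; plays the role of the `decide` signal handler."""
        async with self._changed:
            self._decisions[decision_id] = decision
            self._changed.notify_all()

    async def wait_for(self, decision_id: str, timeout: float) -> Optional[str]:
        """Return the decision, or None if none arrives within `timeout` seconds."""

        async def _wait() -> str:
            async with self._changed:
                await self._changed.wait_for(lambda: decision_id in self._decisions)
                return self._decisions[decision_id]

        try:
            return await asyncio.wait_for(_wait(), timeout)
        except asyncio.TimeoutError:
            return None


async def demo() -> Tuple[Optional[str], Optional[str]]:
    gate = DecisionGate()  # created inside the running event loop
    # Nobody decides on "budget_approval", so this wait times out.
    timed_out = await gate.wait_for("budget_approval", timeout=0.05)
    # A "human" approves the plan while the workflow side is waiting.
    waiter = asyncio.create_task(gate.wait_for("plan_approval", timeout=1.0))
    await gate.decide("plan_approval", "approved")
    return timed_out, await waiter


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The difference that matters is what happens when the process exits: this gate loses every decision, whereas a Temporal workflow recovers received signals from its event history.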

## Alternative: DBOS for Simpler Durability

If Temporal feels heavyweight for your use case, DBOS provides lightweight durable execution using PostgreSQL as the state backend:

```python
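from dbos import DBOS
from agents import Agent, Runner

# How DBOS is configured (including the PostgreSQL connection) depends on the
# release; check the DBOS docs for the constructor arguments your version expects.
# Call DBOS.launch() at startup, before invoking any workflow.
DBOS()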

agent = Agent(
    name="ProcessingAgent",
    model="gpt-4.1",
    instructions="Process documents step by step.",
)

@DBOS.step()
async def process_document(doc: str) -> str:
    """Each step is automatically persisted to PostgreSQL."""
    result = await Runner.run(agent, input=f"Process this document: {doc}")
    return result.final_output

@DBOS.workflow()
async def document_pipeline(documents: list[str]) -> list[str]:
    """If the workflow crashes at document 5, it resumes from document 5."""
    results = []
    for doc in documents:
        result = await process_document(doc)
        results.append(result)
    return results
```

DBOS requires only a PostgreSQL database — no separate Temporal server to manage.

## Choosing the Right Framework

| Framework | Best For | State Backend | Complexity |
| --- | --- | --- | --- |
| Temporal | Complex workflows, enterprise scale | Temporal server + DB | High |
| DBOS | Simple durability, PostgreSQL shops | PostgreSQL | Low |
| Restate | Event-driven, serverless-friendly | Restate server | Medium |

For most agent applications, start with the simplest option that meets your requirements. If you need human-in-the-loop signals, complex retry policies, and workflow versioning, use Temporal. If you just need crash recovery for a sequential pipeline, DBOS or Restate will serve you well with much less infrastructure.

Durable execution transforms agents from stateless request handlers into reliable workflow engines. The combination of the Agents SDK's multi-agent orchestration with Temporal's state persistence creates systems that can handle workflows spanning minutes, hours, or days — surviving crashes, waiting for humans, and recovering gracefully from failures.

---

Source: https://callsphere.ai/blog/durable-execution-long-running-agents-temporal
