Durable Execution: Long-Running Agents with Temporal

Build long-running AI agents that survive crashes, handle human-in-the-loop approvals, and manage multi-hour workflows using Temporal, DBOS, and Restate with the OpenAI Agents SDK.

The Problem with Long-Running Agents

Most agent workflows complete in seconds — a user asks a question, the agent calls some tools, and returns an answer. But some agent tasks take minutes, hours, or even days. Consider a code migration agent that processes thousands of files across a codebase, a research agent that gathers data from dozens of sources over hours, or an approval workflow where the agent pauses and waits for a human to approve before proceeding.

Standard agent execution has no durability. If the process crashes mid-workflow, all progress is lost. If a human approval takes two hours, the agent process must stay alive the entire time, consuming resources. If the server restarts, the workflow must start from scratch.

Durable execution frameworks solve this by persisting workflow state at each step. If the process crashes after step 7 of a 20-step workflow, it automatically resumes at step 8 when the process restarts.
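The idea can be sketched in a few lines of plain Python. This is a toy model and not how Temporal, DBOS, or Restate store state: the function name `run_durably` and the JSON journal file are assumptions made for illustration. Each step's result is written to disk before the next step starts, so a restarted process skips whatever is already recorded.

```python
import json
import os
from typing import Callable


def run_durably(steps: list[Callable[[], str]], journal_path: str) -> list[str]:
    """Run steps in order, journaling each result so a restart skips finished steps."""
    # Load results recorded by a previous (possibly crashed) run.
    results: list[str] = []
    if os.path.exists(journal_path):
        with open(journal_path) as f:
            results = json.load(f)

    # Resume at the first step that has no recorded result.
    for index in range(len(results), len(steps)):
        results.append(steps[index]())
        # Persist after every step; a crash loses at most the step in flight.
        with open(journal_path, "w") as f:
            json.dump(results, f)
    return results
```

Calling `run_durably` again with the same journal path after a crash re-executes only the steps that never finished. Real frameworks add retries, timers, and concurrency control on top of this basic record-and-skip loop.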

Temporal Overview

Temporal is one of the most widely used durable execution frameworks. It separates workflow logic from activity execution, persists every state transition, and automatically retries failed activities. Here is how it integrates with the Agents SDK.

flowchart LR
    INPUT(["User intent"])
    PARSE["Parse plus<br/>classify"]
    PLAN["Plan and tool<br/>selection"]
    AGENT["Agent loop<br/>LLM plus tools"]
    GUARD{"Guardrails<br/>and policy"}
    EXEC["Execute and<br/>verify result"]
    OBS[("Trace and metrics")]
    OUT(["Outcome plus<br/>next action"])
    INPUT --> PARSE --> PLAN --> AGENT --> GUARD
    GUARD -->|Pass| EXEC --> OUT
    GUARD -->|Fail| AGENT
    AGENT --> OBS
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style OBS fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff

Setting Up Temporal with the Agents SDK

First, install the dependencies:

pip install temporalio openai-agents

Define your agent activities — these are the individual units of work that Temporal will persist and retry:

# activities.py
from temporalio import activity
from agents import Agent, Runner

research_agent = Agent(
    name="ResearchAgent",
    model="gpt-4.1",
    instructions="Research the given topic and return a structured summary.",
)

writer_agent = Agent(
    name="WriterAgent",
    model="gpt-4.1",
    instructions="Write a polished report based on the research findings.",
)

reviewer_agent = Agent(
    name="ReviewerAgent",
    model="gpt-5",
    instructions="Review the report for accuracy and completeness. Return approved or revision_needed.",
)

@activity.defn
async def research_topic(topic: str) -> str:
    """Run the research agent. Temporal will retry on failure."""
    result = await Runner.run(research_agent, input=f"Research this topic: {topic}")
    return result.final_output

@activity.defn
async def write_report(research: str) -> str:
    """Run the writer agent."""
    result = await Runner.run(writer_agent, input=f"Write a report from this research:\n{research}")
    return result.final_output

@activity.defn
async def review_report(report: str) -> str:
    """Run the reviewer agent."""
    result = await Runner.run(reviewer_agent, input=f"Review this report:\n{report}")
    return result.final_output

Defining the Workflow

The workflow orchestrates activities and handles the revision loop:

# workflows.py
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

with workflow.unsafe.imports_passed_through():
    from activities import research_topic, write_report, review_report

@workflow.defn
class ReportGenerationWorkflow:
    """A durable multi-agent workflow for generating reviewed reports."""

    def __init__(self):
        self._human_approved = False
        self._revision_notes = ""

    @workflow.run
    async def run(self, topic: str) -> str:
        # Step 1: Research (at most 3 attempts in total)
        research = await workflow.execute_activity(
            research_topic,
            topic,
            start_to_close_timeout=timedelta(minutes=5),
            # RetryPolicy lives in temporalio.common, not in the workflow module
            retry_policy=RetryPolicy(maximum_attempts=3),
        )

        # Step 2: Write the report
        report = await workflow.execute_activity(
            write_report,
            research,
            start_to_close_timeout=timedelta(minutes=5),
        )

        # Step 3: AI review loop (max 3 revisions)
        for revision in range(3):
            review_result = await workflow.execute_activity(
                review_report,
                report,
                start_to_close_timeout=timedelta(minutes=5),
            )

            if "approved" in review_result.lower():
                break

            # Rewrite based on review feedback
            report = await workflow.execute_activity(
                write_report,
                f"Original report:\n{report}\n\nRevision feedback:\n{review_result}",
                start_to_close_timeout=timedelta(minutes=5),
            )

        # Step 4: Wait for human approval (can take hours)
        await workflow.wait_condition(lambda: self._human_approved)

        return report

    @workflow.signal
    async def approve(self) -> None:
        """Signal from a human to approve the report."""
        self._human_approved = True

    @workflow.signal
    async def request_revision(self, notes: str) -> None:
        """Signal from a human requesting revisions."""
        self._revision_notes = notes

    @workflow.query
    def get_status(self) -> str:
        """Query the current workflow status."""
        if self._human_approved:
            return "approved"
        return "pending_approval"

Temporal records the result of every completed execute_activity call in the workflow's event history. If the worker crashes after the research step completes, the workflow replays from that history on restart and continues at the write step without re-running the research agent.

Running the Temporal Worker

# worker.py
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflows import ReportGenerationWorkflow
from activities import research_topic, write_report, review_report

async def main():
    client = await Client.connect("localhost:7233")

    worker = Worker(
        client,
        task_queue="agent-workflows",
        workflows=[ReportGenerationWorkflow],
        activities=[research_topic, write_report, review_report],
    )

    print("Worker started, waiting for workflows...")
    await worker.run()

asyncio.run(main())

Starting and Interacting with Workflows

# client.py
import asyncio
from temporalio.client import Client
from workflows import ReportGenerationWorkflow

async def main():
    client = await Client.connect("localhost:7233")

    # Start the workflow
    handle = await client.start_workflow(
        ReportGenerationWorkflow.run,
        "The impact of AI agents on software engineering",
        id="report-001",
        task_queue="agent-workflows",
    )
    print(f"Workflow started: {handle.id}")

    # Query status
    status = await handle.query(ReportGenerationWorkflow.get_status)
    print(f"Status: {status}")

    # Human approves after reviewing
    await handle.signal(ReportGenerationWorkflow.approve)

    # Wait for completion
    result = await handle.result()
    print(f"Final report:\n{result}")

asyncio.run(main())
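In practice the approval usually comes from a different process than the one that started the workflow, such as a review UI or a CLI. A minimal sketch of that, assuming the workflow ID is known: the helper name `approve_and_wait` is hypothetical, and `client` is expected to be a connected Temporal `Client`. The signal is addressed by its name, so the approving process does not need to import the workflow class.

```python
import asyncio
from typing import Any


async def approve_and_wait(client: Any, workflow_id: str) -> Any:
    """Approve a running report workflow by ID and return its final result."""
    # Look up the existing workflow instead of starting a new one.
    handle = client.get_workflow_handle(workflow_id)
    # Send the "approve" signal defined on ReportGenerationWorkflow.
    await handle.signal("approve")
    # Block until the workflow returns the finished report.
    return await handle.result()


# Usage (requires temporalio and a running Temporal service):
#     from temporalio.client import Client
#     client = await Client.connect("localhost:7233")
#     report = await approve_and_wait(client, "report-001")
```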

Human-in-the-Loop Patterns

The signal mechanism enables sophisticated human-in-the-loop workflows:

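import asyncio
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    # generate_plan and execute_plan are assumed to be activities defined
    # like the ones in activities.py; they are not shown in this article.
    from activities import generate_plan, execute_plan

@workflow.defn
class ApprovalWorkflow:
    """Agent workflow that pauses at critical points for human approval."""

    def __init__(self):
        self._decisions: dict[str, str] = {}
        self._pending_decision: str | None = None

    @workflow.run
    async def run(self, task: str) -> str:
        # Agent generates a plan
        plan = await workflow.execute_activity(
            generate_plan,
            task,
            start_to_close_timeout=timedelta(minutes=5),
        )

        # Pause for human approval of the plan
        self._pending_decision = "plan_approval"
        try:
            await workflow.wait_condition(
                lambda: "plan_approval" in self._decisions,
                timeout=timedelta(hours=24),  # Wait up to 24 hours
            )
        except asyncio.TimeoutError:
            # wait_condition raises on timeout rather than returning
            return "Workflow cancelled: approval timed out"
        finally:
            # Nothing is pending once the wait ends, either way
            self._pending_decision = None

        if self._decisions.get("plan_approval") != "approved":
            return "Workflow cancelled: plan not approved"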

        # Agent executes the plan
        result = await workflow.execute_activity(
            execute_plan,
            plan,
            start_to_close_timeout=timedelta(minutes=30),
        )

        return result

    @workflow.signal
    async def decide(self, decision_id: str, decision: str) -> None:
        self._decisions[decision_id] = decision

    @workflow.query
    def pending_decision(self) -> str | None:
        return self._pending_decision

The workflow can pause for up to 24 hours waiting for a human decision. During that time, the Temporal worker can shut down and restart — the workflow state is safely persisted.

Alternative: DBOS for Simpler Durability

If Temporal feels heavyweight for your use case, DBOS provides lightweight durable execution using PostgreSQL as the state backend:

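from dbos import DBOS
from agents import Agent, Runner

# DBOS needs its PostgreSQL connection configured (how depends on your DBOS version),
# and DBOS.launch() must be called at startup before any workflow runs.
DBOS()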

agent = Agent(
    name="ProcessingAgent",
    model="gpt-4.1",
    instructions="Process documents step by step.",
)

@DBOS.step()
async def process_document(doc: str) -> str:
    """Each step is automatically persisted to PostgreSQL."""
    result = await Runner.run(agent, input=f"Process this document: {doc}")
    return result.final_output

@DBOS.workflow()
async def document_pipeline(documents: list[str]) -> list[str]:
    """If the workflow crashes at document 5, it resumes from document 5."""
    results = []
    for doc in documents:
        result = await process_document(doc)
        results.append(result)
    return results

DBOS requires only a PostgreSQL database — no separate Temporal server to manage.

Choosing the Right Framework

| Framework | Best For | State Backend | Complexity |
| --- | --- | --- | --- |
| Temporal | Complex workflows, enterprise scale | Temporal server + DB | High |
| DBOS | Simple durability, PostgreSQL shops | PostgreSQL | Low |
| Restate | Event-driven, serverless-friendly | Restate server | Medium |

For most agent applications, start with the simplest option that meets your requirements. If you need human-in-the-loop signals, complex retry policies, and workflow versioning, use Temporal. If you just need crash recovery for a sequential pipeline, DBOS or Restate will serve you well with much less infrastructure.

Durable execution transforms agents from stateless request handlers into reliable workflow engines. The combination of the Agents SDK's multi-agent orchestration with Temporal's state persistence creates systems that can handle workflows spanning minutes, hours, or days — surviving crashes, waiting for humans, and recovering gracefully from failures.
