---
title: "AI Agent for Runbook Automation: Converting Human Procedures to Automated Workflows"
description: "Build an AI agent that parses human-written runbooks, converts them into executable automation workflows with verification steps, handles edge cases, and supports safe rollback."
canonical: https://callsphere.ai/blog/ai-agent-runbook-automation-converting-human-procedures-automated-workflows
category: "Learn Agentic AI"
tags: ["Runbook Automation", "DevOps", "SRE", "Workflow", "Python", "Agentic AI"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-06T01:02:45.811Z
---

# AI Agent for Runbook Automation: Converting Human Procedures to Automated Workflows

> Build an AI agent that parses human-written runbooks, converts them into executable automation workflows with verification steps, handles edge cases, and supports safe rollback.

## The Runbook Problem

Every operations team has runbooks: Markdown documents or wiki pages that describe step-by-step procedures for common tasks, such as "How to restart the payment service," "How to fail over the database," and "How to clear the cache during a traffic spike." The problem is that runbooks are written for humans and executed inconsistently. Steps get skipped. Verification checks are forgotten. The on-call engineer at 3 AM misreads step 7 and makes things worse. An AI runbook automation agent converts these human procedures into executable, verifiable workflows.

## Parsing Human-Written Runbooks

The agent reads runbooks in markdown format and extracts structured steps using an LLM.

```mermaid
flowchart LR
    INC(["Production incident"])
    DETECT["Detect
alerts plus user reports"]
    MIT["Mitigate
rollback or feature flag"]
    RES["Resolve"]
    DOC["Timeline doc
events plus actions"]
    RCA{"5 whys plus
causal graph"}
    AI["Action items
owner plus due date"]
    SHARE(["Blameless review"])
    LEARN[("Runbook plus
eval added")]
    INC --> DETECT --> MIT --> RES --> DOC --> RCA --> AI --> SHARE --> LEARN
    style RCA fill:#4f46e5,stroke:#4338ca,color:#fff
    style LEARN fill:#059669,stroke:#047857,color:#fff
```

```python
import openai
import json
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum

class StepType(Enum):
    COMMAND = "command"       # Shell command to execute
    VERIFICATION = "verify"  # Check that something is true
    DECISION = "decision"    # Branch based on condition
    WAIT = "wait"            # Pause for a duration
    NOTIFY = "notify"        # Alert a human or channel
    MANUAL = "manual"        # Requires human action

@dataclass
class RunbookStep:
    order: int
    description: str
    step_type: StepType
    command: Optional[str] = None
    expected_output: Optional[str] = None
    timeout_seconds: int = 60
    rollback_command: Optional[str] = None
    on_failure: str = "abort"  # "abort", "skip", "retry", "rollback"
    retries: int = 0
    conditions: dict = field(default_factory=dict)

@dataclass
class ParsedRunbook:
    title: str
    description: str
    steps: list[RunbookStep]
    prerequisites: list[str]
    estimated_duration_minutes: int
    risk_level: str

async def parse_runbook(markdown_content: str) -> ParsedRunbook:
    client = openai.AsyncOpenAI()
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": """You convert human-written runbooks into
structured automation specs. For each step, determine:
- Whether it is a command, verification, decision, wait, notification, or manual step
- The exact shell command (if applicable)
- What output to expect for verification
- What to do on failure
- The rollback command if the step needs to be undone"""},
            {"role": "user", "content": f"""Parse this runbook into structured steps.

{markdown_content}

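Return JSON with: title, description, steps (array of objects with: order,
description, step_type, command, expected_output, timeout_seconds,
rollback_command, on_failure, retries), prerequisites, estimated_duration_minutes,
risk_level. step_type must be one of: command, verify, decision, wait, notify,
manual. on_failure must be one of: abort, skip, retry, rollback."""},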
        ],
        response_format={"type": "json_object"},
        temperature=0.0,
    )
    data = json.loads(response.choices[0].message.content)

    steps = [
        RunbookStep(
            order=s["order"],
            description=s["description"],
            step_type=StepType(s["step_type"]),
            command=s.get("command"),
            expected_output=s.get("expected_output"),
            timeout_seconds=s.get("timeout_seconds", 60),
            rollback_command=s.get("rollback_command"),
            on_failure=s.get("on_failure", "abort"),
            retries=s.get("retries", 0),
        ) for s in data["steps"]
    ]

    return ParsedRunbook(
        title=data["title"],
        description=data["description"],
        steps=steps,
        prerequisites=data.get("prerequisites", []),
        estimated_duration_minutes=data.get("estimated_duration_minutes", 10),
        risk_level=data.get("risk_level", "medium"),
    )
```
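
Even with a JSON response format, model output can drift from the schema, for example returning `verification` where the `StepType` enum expects `verify`. The following is a minimal sketch of a cleanup pass that could run on each step dict before `RunbookStep` is constructed. The helper name `normalize_step` and the alias table are assumptions for illustration, not part of the parser above.

```python
# Hypothetical helper: clean up one step dict from the LLM before building a RunbookStep.
VALID_STEP_TYPES = {"command", "verify", "decision", "wait", "notify", "manual"}
VALID_ON_FAILURE = {"abort", "skip", "retry", "rollback"}

# Assumed synonyms a model might emit instead of the enum values.
STEP_TYPE_ALIASES = {"verification": "verify", "notification": "notify"}


def normalize_step(raw: dict) -> dict:
    """Return a cleaned copy of a step dict, or raise ValueError if it is unusable."""
    step = dict(raw)

    step_type = str(step.get("step_type", "")).strip().lower()
    step_type = STEP_TYPE_ALIASES.get(step_type, step_type)
    if step_type not in VALID_STEP_TYPES:
        raise ValueError(f"Unknown step_type: {raw.get('step_type')!r}")
    step["step_type"] = step_type

    # A missing or null policy falls back to the safest option.
    on_failure = str(step.get("on_failure") or "abort").strip().lower()
    if on_failure not in VALID_ON_FAILURE:
        raise ValueError(f"Unknown on_failure policy: {on_failure!r}")
    step["on_failure"] = on_failure

    # Commands and verifications are useless without something to run.
    if step_type in {"command", "verify"} and not step.get("command"):
        raise ValueError(f"Step {step.get('order')} has no command")

    return step
```

Rejecting a malformed step at parse time is cheaper than discovering it halfway through a production change, so raising here rather than guessing is a deliberate choice.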

## The Execution Engine

The engine runs each step, verifies outcomes, handles failures, and supports rollback.

```python
import subprocess
import asyncio
import logging
import re
from datetime import datetime

logger = logging.getLogger("runbook-agent")

@dataclass
class StepResult:
    step: RunbookStep
    success: bool
    output: str
    error: str
    duration_seconds: float
    rolled_back: bool = False
    skipped: bool = False

class RunbookExecutor:
    def __init__(self):
        self.completed_steps: list[StepResult] = []
        self.execution_log: list[dict] = []

    async def execute(self, runbook: ParsedRunbook) -> list[StepResult]:
        logger.info(f"Starting runbook: {runbook.title}")
        self._log("start", f"Beginning execution of: {runbook.title}")

        for step in runbook.steps:
            result = await self._execute_step(step)
            self.completed_steps.append(result)

            if not result.success and step.on_failure == "abort":
                logger.error(f"Step {step.order} failed. Aborting.")
                self._log("abort", f"Aborted at step {step.order}")
                await self._rollback()
                break
            elif not result.success and step.on_failure == "rollback":
                await self._rollback()
                break

        self._log("complete", "Runbook execution finished")
        return self.completed_steps

    async def _execute_step(self, step: RunbookStep) -> StepResult:
        self._log("step_start", f"Step {step.order}: {step.description}")
        start = datetime.utcnow()

        if step.step_type == StepType.MANUAL:
            self._log("manual", f"Manual step required: {step.description}")
            return StepResult(
                step=step, success=True, output="Manual step - skipped in auto mode",
                error="", duration_seconds=0, skipped=True,
            )

        if step.step_type == StepType.WAIT:
            wait_secs = step.timeout_seconds
            logger.info(f"Waiting {wait_secs} seconds...")
            await asyncio.sleep(wait_secs)
            return StepResult(
                step=step, success=True, output=f"Waited {wait_secs}s",
                error="", duration_seconds=wait_secs,
            )

        if step.step_type == StepType.NOTIFY:
            await self._send_notification(step.description)
            return StepResult(
                step=step, success=True, output="Notification sent",
                error="", duration_seconds=0,
            )

        # Execute command with retries
        last_result = None
        for attempt in range(step.retries + 1):
            result = await self._run_command(step.command, step.timeout_seconds)
            elapsed = (datetime.utcnow() - start).total_seconds()

            if result.returncode == 0:
                # Verify expected output if specified
                if step.expected_output and step.step_type == StepType.VERIFICATION:
                    if not self._verify_output(result.stdout, step.expected_output):
                        last_result = StepResult(
                            step=step, success=False,
                            output=result.stdout, error="Output verification failed",
                            duration_seconds=elapsed,
                        )
                        continue

                return StepResult(
                    step=step, success=True,
                    output=result.stdout, error=result.stderr,
                    duration_seconds=elapsed,
                )

            last_result = StepResult(
                step=step, success=False,
                output=result.stdout, error=result.stderr,
                duration_seconds=elapsed,
            )
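            if attempt < step.retries:
                # Assumed behavior: short fixed pause before the next attempt
                logger.warning(f"Step {step.order} failed, retrying ({attempt + 1}/{step.retries})")
                await asyncio.sleep(2)

        return last_result

    async def _run_command(self, command: Optional[str], timeout: int) -> subprocess.CompletedProcess:
        """Run a shell command in a worker thread so the event loop is not blocked.

        Assumed implementation: callers only rely on returncode, stdout, and stderr.
        """
        if not command:
            return subprocess.CompletedProcess("", returncode=1, stdout="", stderr="No command specified")
        try:
            return await asyncio.to_thread(
                subprocess.run, command, shell=True,
                capture_output=True, text=True, timeout=timeout,
            )
        except subprocess.TimeoutExpired:
            # Report a timeout as a failed command (124 mirrors coreutils `timeout`)
            return subprocess.CompletedProcess(
                command, returncode=124, stdout="", stderr=f"Timed out after {timeout}s",
            )

    def _verify_output(self, actual: str, expected: str) -> bool: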
        """Check if actual output contains expected pattern."""
        return bool(re.search(expected, actual, re.IGNORECASE))

    async def _rollback(self):
        logger.warning("Initiating rollback...")
        self._log("rollback_start", "Beginning rollback of completed steps")

        for result in reversed(self.completed_steps):
            if result.success and result.step.rollback_command:
                logger.info(f"Rolling back step {result.step.order}")
                rb_result = await self._run_command(
                    result.step.rollback_command, result.step.timeout_seconds
                )
                result.rolled_back = rb_result.returncode == 0
                if not result.rolled_back:
                    logger.error(
                        f"Rollback failed for step {result.step.order}: "
                        f"{rb_result.stderr}"
                    )

    async def _send_notification(self, message: str):
        logger.info(f"Notification: {message}")

    def _log(self, event: str, message: str):
        self.execution_log.append({
            "timestamp": datetime.utcnow().isoformat(),
            "event": event,
            "message": message,
        })
```
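
The rollback loop above walks completed steps in reverse. A stripped-down sketch, using in-memory callables instead of shell commands, shows why the order matters: later changes are reverted before the ones they depend on. `DemoStep`, `run_with_rollback`, and the step names are hypothetical stand-ins, not part of the executor.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class DemoStep:
    """Hypothetical stand-in for RunbookStep that uses callables instead of shell commands."""
    name: str
    action: Callable[[], bool]
    undo: Optional[Callable[[], None]] = None


def run_with_rollback(steps: list[DemoStep]) -> bool:
    """Run steps in order; on the first failure, undo completed steps newest-first."""
    completed: list[DemoStep] = []
    for step in steps:
        if step.action():
            completed.append(step)
            continue
        # Undo in reverse so later changes are reverted before the ones they depend on.
        for done in reversed(completed):
            if done.undo is not None:
                done.undo()
        return False
    return True


events: list[str] = []


def record(msg: str, ok: bool = True) -> Callable[[], bool]:
    """Build an action that logs its name and reports success or failure."""
    def _action() -> bool:
        events.append(msg)
        return ok
    return _action


steps = [
    DemoStep("drain", record("drain node"), undo=lambda: events.append("uncordon node")),
    DemoStep("stop", record("stop service"), undo=lambda: events.append("start service")),
    DemoStep("migrate", record("run migration", ok=False)),  # fails and triggers rollback
]

succeeded = run_with_rollback(steps)
for line in events:
    print(line)
```

As in the executor, the failing step itself is not undone; only steps that completed successfully are rolled back.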

## Converting Runbooks to Reusable Workflows

The agent stores parsed runbooks as YAML workflow definitions that can be version-controlled and reused.

```yaml
# workflows/restart-payment-service.yaml
name: Restart Payment Service
description: Safely restart the payment service with zero downtime
risk_level: medium
estimated_duration: 5m
prerequisites:
  - kubectl access to production namespace
  - payment-service has at least 2 replicas

steps:
  - order: 1
    type: verify
    description: Confirm service has multiple replicas
    command: "kubectl get deploy payment-service -n production -o jsonpath='{.spec.replicas}'"
    expected_output: "^([2-9]|[1-9][0-9]+)$"
    on_failure: abort

  - order: 2
    type: command
    description: Initiate rolling restart
    command: "kubectl rollout restart deploy/payment-service -n production"
    rollback_command: "kubectl rollout undo deploy/payment-service -n production"
    on_failure: rollback

  - order: 3
    type: wait
    description: Wait for pods to start rolling
    timeout_seconds: 30

  - order: 4
    type: verify
    description: Check rollout status
    command: "kubectl rollout status deploy/payment-service -n production --timeout=120s"
    expected_output: "successfully rolled out"
    on_failure: rollback
    retries: 2
    timeout_seconds: 150  # must exceed kubectl's own --timeout=120s

  - order: 5
    type: verify
    description: Verify service health
    command: "curl -s http://payment-service.production.svc/health"
    expected_output: "ok"
    on_failure: rollback
    retries: 3
```
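
One way to produce a file like this from a parsed step is to map the dataclass fields onto the YAML key layout and drop values that match the defaults. The sketch below assumes a trimmed stand-in for `RunbookStep` so it runs on its own; `Step` and `step_to_workflow_dict` are hypothetical names.

```python
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Optional


class StepType(Enum):  # trimmed copy of the enum defined earlier
    COMMAND = "command"
    VERIFICATION = "verify"
    WAIT = "wait"


@dataclass
class Step:  # trimmed stand-in for RunbookStep
    order: int
    description: str
    step_type: StepType
    command: Optional[str] = None
    expected_output: Optional[str] = None
    timeout_seconds: int = 60
    rollback_command: Optional[str] = None
    on_failure: str = "abort"
    retries: int = 0


def step_to_workflow_dict(step: Step) -> dict:
    """Convert a step to the key layout used in the YAML workflow, dropping defaults."""
    raw = asdict(step)
    out = {
        "order": raw["order"],
        "type": step.step_type.value,  # the YAML uses "type", not "step_type"
        "description": raw["description"],
    }
    defaults = Step(order=0, description="", step_type=step.step_type)
    for key in ("command", "expected_output", "timeout_seconds",
                "rollback_command", "on_failure", "retries"):
        if raw[key] != getattr(defaults, key):
            out[key] = raw[key]
    return out


restart = Step(
    order=2,
    description="Initiate rolling restart",
    step_type=StepType.COMMAND,
    command="kubectl rollout restart deploy/payment-service -n production",
    rollback_command="kubectl rollout undo deploy/payment-service -n production",
    on_failure="rollback",
)
print(step_to_workflow_dict(restart))
```

The resulting dict can be handed to PyYAML, for example `yaml.safe_dump(step_dict, sort_keys=False)`, to keep the key order readable in the generated file.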

## Loading and Running YAML Workflows

```python
import yaml

def load_workflow(path: str) -> ParsedRunbook:
    with open(path) as f:
        data = yaml.safe_load(f)

    steps = [
        RunbookStep(
            order=s["order"],
            description=s["description"],
            step_type=StepType(s["type"]),
            command=s.get("command"),
            expected_output=s.get("expected_output"),
            timeout_seconds=s.get("timeout_seconds", 60),
            rollback_command=s.get("rollback_command"),
            on_failure=s.get("on_failure", "abort"),
            retries=s.get("retries", 0),
        ) for s in data["steps"]
    ]

    return ParsedRunbook(
        title=data["name"],
        description=data["description"],
        steps=steps,
        prerequisites=data.get("prerequisites", []),
        estimated_duration_minutes=int(data.get("estimated_duration", "10m").rstrip("m")),
        risk_level=data.get("risk_level", "medium"),
    )

async def main():
    runbook = load_workflow("workflows/restart-payment-service.yaml")
    executor = RunbookExecutor()
    results = await executor.execute(runbook)

    succeeded = sum(1 for r in results if r.success)
    failed = sum(1 for r in results if not r.success)
    print(f"Results: {succeeded} succeeded, {failed} failed")

    for r in results:
        status = "OK" if r.success else "FAIL"
        print(f"  Step {r.step.order} [{status}]: {r.step.description}")

if __name__ == "__main__":
    asyncio.run(main())
```

## FAQ

### How do I handle runbooks that require interactive input or judgment calls?

Mark those steps as `StepType.MANUAL` during parsing. The executor shown above only logs manual steps and records them as skipped. For production use, extend it to pause at each manual step and notify the on-call engineer with the context of what has been done so far and what decision is needed. The engineer provides input through Slack or a web UI, and the agent continues with the remaining automated steps.
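
A pause-for-approval gate can be sketched with plain `asyncio`. The `ask` callback below is a hypothetical hook that would wrap a Slack or web UI integration; the function name, the 15-minute default, and the reject-on-timeout policy are all assumptions.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical callback type: receives a prompt, returns the engineer's decision.
ApprovalFn = Callable[[str], Awaitable[bool]]


async def wait_for_approval(
    description: str,
    completed: list[str],
    ask: ApprovalFn,
    timeout_seconds: float = 900,
) -> bool:
    """Pause on a manual step until a human approves, rejects, or the wait times out."""
    context = "\n".join(f"  done: {item}" for item in completed) or "  (nothing yet)"
    prompt = f"Manual step needed: {description}\nCompleted so far:\n{context}"
    try:
        return await asyncio.wait_for(ask(prompt), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        # No answer counts as a rejection so the runbook never proceeds unattended.
        return False


async def auto_approve(prompt: str) -> bool:
    """Demo callback that approves immediately."""
    return True


async def never_answers(prompt: str) -> bool:
    """Demo callback that simulates an engineer who does not respond."""
    await asyncio.sleep(3600)
    return True


approved = asyncio.run(
    wait_for_approval("Confirm failover target", ["drain node"], auto_approve)
)
timed_out = asyncio.run(
    wait_for_approval("Confirm failover target", [], never_answers, timeout_seconds=0.05)
)
print(approved, timed_out)
```

Treating silence as a rejection is the conservative default; a team could instead escalate to a secondary on-call before giving up.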

### What if the original runbook is outdated or contains wrong commands?

Validate each command in a dry-run mode before executing it; this is an extension to the executor shown above. For kubectl commands that support it, the agent adds `--dry-run=client`. For destructive commands, it checks preconditions first. If a command fails during the dry run, the agent flags the step as needing review and suggests an updated command based on the current cluster state.
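
As a rough sketch of the kubectl case, the helper below rewrites a command into its dry-run form only when the subcommand is on a small allowlist. The allowlist is an assumption and should be checked against `kubectl <verb> --help` for the kubectl version in use, since not every subcommand accepts the flag; `to_dry_run` is a hypothetical name.

```python
import shlex
from typing import Optional

# Assumed allowlist: kubectl subcommands that accept --dry-run=client.
# Verify against `kubectl <verb> --help` for the kubectl version you run.
DRY_RUN_VERBS = {"apply", "create", "delete"}


def to_dry_run(command: str) -> Optional[str]:
    """Return a dry-run variant of a kubectl command, or None if none is known."""
    tokens = shlex.split(command)
    # Conservative: only handle the plain `kubectl <verb> ...` form.
    if len(tokens) < 2 or tokens[0] != "kubectl":
        return None
    if tokens[1] not in DRY_RUN_VERBS:
        return None
    if any(t.startswith("--dry-run") for t in tokens):
        return command  # already a dry run
    return shlex.join(tokens + ["--dry-run=client"])


print(to_dry_run("kubectl delete pod payment-service-abc -n production"))
print(to_dry_run("systemctl restart nginx"))
```

Commands that return `None` have no known dry-run form, so they would need a precondition check or human review instead.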

### How do I version-control the automated workflows alongside the original runbooks?

Store both in the same Git repository. The original markdown runbook lives in `docs/runbooks/` and the generated YAML workflow lives in `workflows/`. The agent includes a comment in each YAML file linking to the source runbook and the parse date. When the markdown is updated, a CI job re-parses it and creates a PR with the updated workflow for human review.
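
The link between a workflow and its source can be made checkable by storing a content hash next to the path and parse date. This is a minimal sketch; the comment field names, the 12-character digest, and the example file name are assumptions rather than an established format.

```python
import hashlib
from datetime import date


def provenance_header(source_path: str, markdown: str, parsed_on: date) -> str:
    """Build the YAML comment block that ties a workflow to its source runbook."""
    digest = hashlib.sha256(markdown.encode("utf-8")).hexdigest()[:12]
    return (
        f"# source: {source_path}\n"
        f"# source_sha256: {digest}\n"
        f"# parsed_on: {parsed_on.isoformat()}\n"
    )


def is_stale(workflow_text: str, current_markdown: str) -> bool:
    """True when the runbook has changed since the workflow was generated."""
    digest = hashlib.sha256(current_markdown.encode("utf-8")).hexdigest()[:12]
    return f"# source_sha256: {digest}" not in workflow_text


runbook_md = "# Restart Payment Service\n1. Check replicas\n"
header = provenance_header(
    "docs/runbooks/restart-payment-service.md",  # hypothetical file name
    runbook_md,
    date(2026, 3, 17),
)
print(header)
```

A CI job could call `is_stale` for each workflow and re-parse only the runbooks whose hash no longer matches.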

---


Source: https://callsphere.ai/blog/ai-agent-runbook-automation-converting-human-procedures-automated-workflows
