---
title: "AI Agent for Capacity Planning: Predicting Resource Needs Before They Become Critical"
description: "Build an AI agent that analyzes infrastructure usage trends, forecasts resource exhaustion, sets dynamic threshold alerts, and generates scaling recommendations before outages occur."
canonical: https://callsphere.ai/blog/ai-agent-capacity-planning-predicting-resource-needs
category: "Learn Agentic AI"
tags: ["Capacity Planning", "Forecasting", "SRE", "DevOps", "Python", "Agentic AI"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-06T01:02:44.001Z
---

# AI Agent for Capacity Planning: Predicting Resource Needs Before They Become Critical

> Build an AI agent that analyzes infrastructure usage trends, forecasts resource exhaustion, sets dynamic threshold alerts, and generates scaling recommendations before outages occur.

## The Capacity Planning Problem

Capacity planning fails in two directions. Over-provision and you waste money. Under-provision and you face outages. Static thresholds like "alert at 80% disk" are better than nothing, but they ignore growth rate. A disk at 80% that grows 0.1% per day gives you months of runway. A disk at 60% that grows 5% per day gives you a week. An AI capacity planning agent therefore focuses on trajectories rather than snapshots.

## Collecting Historical Resource Data

The agent needs time-series data for compute, memory, disk, network, and application-specific metrics. It stores daily snapshots for trend analysis.

```mermaid
flowchart LR
    PROM[("Prometheus
metrics")]
    COLL["CapacityCollector
daily snapshots"]
    DB[("capacity_snapshots
history")]
    TREND["TrendAnalyzer
linear regression"]
    RISK{"Exhaustion
within horizon?"}
    LLM["LLM scaling
recommendations"]
    ALERT(["Dynamic
threshold alerts"])
    OK(["Keep
monitoring"])
    PROM --> COLL --> DB --> TREND --> RISK
    RISK -->|Yes| LLM --> ALERT
    RISK -->|No| OK
    style RISK fill:#f59e0b,stroke:#d97706,color:#1f2937
    style ALERT fill:#059669,stroke:#047857,color:#fff
    style OK fill:#0ea5e9,stroke:#0369a1,color:#fff
```

```python
import asyncpg
import httpx
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

@dataclass
class ResourceSnapshot:
    resource_id: str
    resource_type: str  # "cpu", "memory", "disk", "connections"
    current_value: float
    max_value: float
    utilization_pct: float
    timestamp: datetime

class CapacityCollector:
    def __init__(self, prometheus_url: str, db_dsn: str):
        self.prom_url = prometheus_url
        self.db_dsn = db_dsn
        self.http = httpx.AsyncClient(timeout=30)

    async def collect_snapshots(self) -> list[ResourceSnapshot]:
        queries = {
            "cpu": (
                'avg(rate(container_cpu_usage_seconds_total[5m])) by (pod)',
                'avg(kube_pod_container_resource_limits{resource="cpu"}) by (pod)',
            ),
            "memory": (
                'avg(container_memory_usage_bytes) by (pod)',
                'avg(kube_pod_container_resource_limits{resource="memory"}) by (pod)',
            ),
            "disk": (
                'node_filesystem_size_bytes - node_filesystem_avail_bytes',
                'node_filesystem_size_bytes',
            ),
        }

        snapshots = []
        for rtype, (usage_q, limit_q) in queries.items():
            usage = await self._query_prometheus(usage_q)
            limits = await self._query_prometheus(limit_q)

            for metric in usage:
                pod = metric["metric"].get("pod", "node")
                value = float(metric["value"][1])
                limit = self._find_limit(limits, pod)
                if limit and limit > 0:
                    snapshots.append(ResourceSnapshot(
                        resource_id=pod,
                        resource_type=rtype,
                        current_value=value,
                        max_value=limit,
                        utilization_pct=(value / limit) * 100,
                        timestamp=datetime.utcnow(),
                    ))
        return snapshots

    async def _query_prometheus(self, query: str) -> list:
        resp = await self.http.get(
            f"{self.prom_url}/api/v1/query",
            params={"query": query},
        )
        return resp.json()["data"]["result"]

    def _find_limit(self, limits: list, pod: str) -> Optional[float]:
        for m in limits:
            if m["metric"].get("pod") == pod:
                return float(m["value"][1])
        return None

    async def store_snapshot(self, snapshot: ResourceSnapshot):
        # One connection per write keeps the example simple; for high write
        # volumes, create a single asyncpg pool at startup and reuse it.
        conn = await asyncpg.connect(self.db_dsn)
        try:
            await conn.execute("""
                INSERT INTO capacity_snapshots
                (resource_id, resource_type, current_value, max_value,
                 utilization_pct, timestamp)
                VALUES ($1, $2, $3, $4, $5, $6)
            """, snapshot.resource_id, snapshot.resource_type,
                snapshot.current_value, snapshot.max_value,
                snapshot.utilization_pct, snapshot.timestamp)
        finally:
            await conn.close()
```
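
`store_snapshot` writes to a `capacity_snapshots` table the article does not define. A minimal schema sketch whose columns match the `INSERT` above; the types and index are assumptions, adjust for your retention needs:

```python
# DDL for the table store_snapshot writes to. Column names match the INSERT
# statement above; types and the index are assumptions.
CAPACITY_SNAPSHOTS_DDL = """
CREATE TABLE IF NOT EXISTS capacity_snapshots (
    id              BIGSERIAL PRIMARY KEY,
    resource_id     TEXT NOT NULL,
    resource_type   TEXT NOT NULL,
    current_value   DOUBLE PRECISION NOT NULL,
    max_value       DOUBLE PRECISION NOT NULL,
    utilization_pct DOUBLE PRECISION NOT NULL,
    timestamp       TIMESTAMPTZ NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_snapshots_resource_time
    ON capacity_snapshots (resource_id, resource_type, timestamp);
"""
```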

## Trend Analysis and Forecasting

The agent uses linear regression on historical snapshots to project when resources will be exhausted.

```python
import numpy as np
from scipy.stats import linregress

@dataclass
class CapacityForecast:
    resource_id: str
    resource_type: str
    current_pct: float
    growth_rate_per_day: float
    days_to_80_pct: Optional[int]
    days_to_90_pct: Optional[int]
    days_to_100_pct: Optional[int]
    confidence: float
    trend: str  # "growing", "stable", "shrinking"

class TrendAnalyzer:
    def __init__(self, warning_days: int = 14, critical_days: int = 7):
        self.warning_days = warning_days
        self.critical_days = critical_days

    def forecast(
        self, snapshots: list[ResourceSnapshot]
    ) -> CapacityForecast:
        if len(snapshots) < 7:
            return self._insufficient_data(snapshots[-1])

        # Regress utilization % against elapsed days.
        days = np.array([
            (s.timestamp - snapshots[0].timestamp).total_seconds() / 86400
            for s in snapshots
        ])
        utilization = np.array([s.utilization_pct for s in snapshots])
        slope, _, r_value, _, _ = linregress(days, utilization)

        daily_growth = float(slope)
        current = float(utilization[-1])

        def days_to_threshold(threshold: float) -> Optional[int]:
            if current >= threshold:
                return 0  # already past the threshold
            if daily_growth <= 0:
                return None  # flat or shrinking: never reached
            return int((threshold - current) / daily_growth)

        if abs(daily_growth) < 0.05:
            trend = "stable"
        elif daily_growth > 0:
            trend = "growing"
        else:
            trend = "shrinking"

        return CapacityForecast(
            resource_id=snapshots[-1].resource_id,
            resource_type=snapshots[-1].resource_type,
            current_pct=current,
            growth_rate_per_day=daily_growth,
            days_to_80_pct=days_to_threshold(80),
            days_to_90_pct=days_to_threshold(90),
            days_to_100_pct=days_to_threshold(100),
            confidence=r_value ** 2,
            trend=trend,
        )

    def _insufficient_data(self, latest: ResourceSnapshot) -> CapacityForecast:
        return CapacityForecast(
            resource_id=latest.resource_id,
            resource_type=latest.resource_type,
            current_pct=latest.utilization_pct,
            growth_rate_per_day=0.0,
            days_to_80_pct=None,
            days_to_90_pct=None,
            days_to_100_pct=None,
            confidence=0.0,
            trend="unknown",
        )
```

## Scaling Recommendations with LLM Reasoning

The agent uses an LLM to turn raw forecasts into actionable scaling recommendations.

```python
import openai
import json

async def generate_scaling_plan(
    forecasts: list[CapacityForecast],
) -> list[dict]:
    critical = [
        f for f in forecasts
        if f.days_to_90_pct is not None and f.days_to_90_pct <= 14
    ]
    if not critical:
        return []

    summary = "\n".join(
        f"- {f.resource_id} ({f.resource_type}): {f.current_pct:.1f}% used, "
        f"growing {f.growth_rate_per_day:.2f}%/day, reaches 90% in "
        f"{f.days_to_90_pct} days (confidence {f.confidence:.2f})"
        for f in critical
    )

    client = openai.AsyncOpenAI()
    # Model name and prompt wording are illustrative.
    resp = await client.chat.completions.create(
        model="gpt-4o",
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": (
                "You are an SRE planning capacity. For each resource, return "
                "JSON with a 'recommendations' list of objects: "
                "resource, action, urgency, rationale."
            )},
            {"role": "user", "content": summary},
        ],
    )
    return json.loads(resp.choices[0].message.content)["recommendations"]
```

## Dynamic Threshold Alerts

Instead of a fixed "alert at 80%" rule, the agent raises alerts based on projected time to exhaustion, so a fast-growing resource alerts earlier than a slow-growing one at the same utilization.

```python
def generate_dynamic_alerts(forecasts: list[CapacityForecast]) -> list[dict]:
    alerts = []
    for f in forecasts:
        if f.days_to_100_pct is not None and f.days_to_100_pct <= 3:
            alerts.append({
                "severity": "critical",
                "resource": f.resource_id,
                "message": (
                    f"{f.resource_type} at {f.current_pct:.1f}% and growing "
                    f"{f.growth_rate_per_day:.1f}%/day. Exhaustion in "
                    f"{f.days_to_100_pct} days."
                ),
            })
        elif f.days_to_90_pct is not None and f.days_to_90_pct <= 7:
            alerts.append({
                "severity": "warning",
                "resource": f.resource_id,
                "message": (
                    f"{f.resource_type} at {f.current_pct:.1f}%, "
                    f"reaching 90% in {f.days_to_90_pct} days."
                ),
            })
    return alerts
```

## FAQ

### How do I account for seasonal traffic patterns like Black Friday or month-end processing?

Augment linear regression with seasonal decomposition. Store at least one full cycle of historical data (one year for annual patterns, one month for monthly). Use the seasonal component to adjust forecasts. The agent should flag upcoming high-traffic events from a calendar and factor in the expected multiplier.
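
One lightweight version of this (a sketch, not a full seasonal decomposition): learn per-weekday multipliers from at least one full cycle of history, then divide them out so the trend regression sees deseasonalized data. The helper names are illustrative:

```python
from collections import defaultdict

def weekday_multipliers(samples: list[tuple[int, float]]) -> dict[int, float]:
    """samples: (weekday 0-6, utilization) pairs spanning >= one full week.
    Returns each weekday's average utilization relative to the overall mean."""
    by_day: dict[int, list[float]] = defaultdict(list)
    for weekday, value in samples:
        by_day[weekday].append(value)
    overall = sum(v for _, v in samples) / len(samples)
    return {d: (sum(vs) / len(vs)) / overall for d, vs in by_day.items()}

def deseasonalize(samples: list[tuple[int, float]]) -> list[float]:
    """Divide out the weekday effect so regression sees only the trend."""
    mult = weekday_multipliers(samples)
    return [value / mult[weekday] for weekday, value in samples]
```

The same scheme extends to day-of-month or calendar-event multipliers once you have a full cycle of history for them.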

### What if the growth rate changes suddenly due to a new feature launch?

Use a weighted regression that gives more importance to recent data points. A 7-day exponentially weighted average reacts faster to trend changes than a flat 90-day average. The agent should also watch for change points where the growth rate itself shifts and alert when the slope increases significantly.
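
A minimal recency-weighted slope, as a sketch: plain weighted least squares where a sample's weight halves every `half_life_days`, so a post-launch regime change dominates the fit within days. The function name and default are illustrative:

```python
import math

def weighted_slope(values: list[float], half_life_days: float = 7.0) -> float:
    """Least-squares slope over daily samples, where a point half_life_days
    old counts half as much as today's."""
    n = len(values)
    # Exponential decay: newest point (index n-1) gets weight 1.0.
    w = [math.exp(-math.log(2) * (n - 1 - i) / half_life_days) for i in range(n)]
    sw = sum(w)
    x_bar = sum(wi * i for i, wi in enumerate(w)) / sw
    y_bar = sum(wi * v for wi, v in zip(w, values)) / sw
    num = sum(wi * (i - x_bar) * (v - y_bar)
              for i, (wi, v) in enumerate(zip(w, values)))
    den = sum(wi * (i - x_bar) ** 2 for i, wi in enumerate(w))
    return num / den

# 30 flat days, then a launch adds 3%/day: the recency-weighted slope tracks
# the new regime far faster than an unweighted fit over the same window.
history = [40.0] * 30 + [40.0 + 3 * d for d in range(1, 11)]
```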

### How do I handle resources that have hard limits that cannot be scaled (like database connections)?

For hard-limited resources, the agent must recommend architectural changes rather than simple scaling. If PostgreSQL max_connections is at 80% and growing, the recommendation might be to add PgBouncer for connection pooling or to implement connection sharing in the application layer. The LLM reasoning step should know about these architectural options.
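
One way to give the LLM step that architectural context is a small playbook keyed by resource type, merged into the prompt when a hard-limited resource is at risk. The entries here are illustrative, not exhaustive:

```python
# Architecture-level remediations for resources that cannot simply be scaled.
# Illustrative entries; extend per stack.
HARD_LIMIT_PLAYBOOK: dict[str, list[str]] = {
    "pg_connections": [
        "Add PgBouncer in transaction-pooling mode",
        "Share connections via an application-level pool",
        "Move read traffic to replicas",
    ],
    "file_descriptors": [
        "Raise ulimits where the OS allows",
        "Audit for descriptor leaks before scaling",
    ],
}

def architectural_options(resource_type: str) -> list[str]:
    """Remediations for hard-limited resources; empty list means ordinary
    scaling applies."""
    return HARD_LIMIT_PLAYBOOK.get(resource_type, [])
```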

---

#CapacityPlanning #Forecasting #SRE #DevOps #Python #AgenticAI #LearnAI #AIEngineering

---

Source: https://callsphere.ai/blog/ai-agent-capacity-planning-predicting-resource-needs
