---
title: "AI Agent for Database Operations: Automated Query Optimization and Index Management"
description: "Build an AI agent that monitors PostgreSQL performance, detects slow queries, recommends optimal indexes, schedules vacuum operations, and plans for capacity growth."
canonical: https://callsphere.ai/blog/ai-agent-database-operations-query-optimization-index-management
category: "Learn Agentic AI"
tags: ["Database", "PostgreSQL", "Query Optimization", "DevOps", "Python", "Agentic AI"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-06T01:02:45.368Z
---

# AI Agent for Database Operations: Automated Query Optimization and Index Management

> Build an AI agent that monitors PostgreSQL performance, detects slow queries, recommends optimal indexes, schedules vacuum operations, and plans for capacity growth.

## Why Database Operations Need an Agent

Database performance degrades gradually. A query that ran in 5ms at launch takes 500ms with 10 million rows. An index that was perfect for v1 of your schema becomes a liability after v5. Dead tuples pile up. Connection pools saturate. By the time a human notices, users are already complaining. An AI database operations agent monitors these signals continuously and acts before problems become outages.

## Slow Query Detection

The agent reads from PostgreSQL's `pg_stat_statements` extension, which tracks query execution statistics without modifying your application.

```mermaid
flowchart LR
    INPUT(["User intent"])
    PARSE["Parse plus
classify"]
    PLAN["Plan and tool
selection"]
    AGENT["Agent loop
LLM plus tools"]
    GUARD{"Guardrails
and policy"}
    EXEC["Execute and
verify result"]
    OBS[("Trace and metrics")]
    OUT(["Outcome plus
next action"])
    INPUT --> PARSE --> PLAN --> AGENT --> GUARD
    GUARD -->|Pass| EXEC --> OUT
    GUARD -->|Fail| AGENT
    AGENT --> OBS
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style OBS fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
```
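None of this works until the extension is actually enabled: `pg_stat_statements` must appear in `shared_preload_libraries` (a restart-time setting) and be created in the target database. A minimal startup check, assuming the connecting role is allowed to create extensions:

```python
import asyncpg

async def ensure_pg_stat_statements(pool: asyncpg.Pool) -> None:
    # CREATE EXTENSION alone is not enough: the module must be preloaded
    # via shared_preload_libraries, which requires a server restart.
    await pool.execute("CREATE EXTENSION IF NOT EXISTS pg_stat_statements")
    try:
        await pool.fetchval("SELECT count(*) FROM pg_stat_statements")
    except asyncpg.PostgresError as exc:
        raise RuntimeError(
            "pg_stat_statements is not preloaded; add it to "
            "shared_preload_libraries and restart PostgreSQL"
        ) from exc
```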

```python
import asyncpg
from dataclasses import dataclass
from typing import Optional

@dataclass
class SlowQuery:
    query: str
    calls: int
    total_time_ms: float
    mean_time_ms: float
    rows_returned: int
    shared_blks_hit: int
    shared_blks_read: int
    cache_hit_ratio: float

class QueryMonitor:
    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool: Optional[asyncpg.Pool] = None

    async def connect(self):
        self.pool = await asyncpg.create_pool(self.dsn, min_size=2, max_size=5)

    async def get_slow_queries(
        self, min_mean_ms: float = 100, min_calls: int = 10, limit: int = 20
    ) -> list[SlowQuery]:
        rows = await self.pool.fetch("""
            SELECT
                query,
                calls,
                total_exec_time AS total_time_ms,
                mean_exec_time AS mean_time_ms,
                rows,
                shared_blks_hit,
                shared_blks_read,
                CASE WHEN shared_blks_hit + shared_blks_read = 0
                     THEN 1.0
                     ELSE shared_blks_hit::float /
                          (shared_blks_hit + shared_blks_read)
                END AS cache_hit_ratio
            FROM pg_stat_statements
            WHERE mean_exec_time > $1
              AND calls > $2
              AND query NOT LIKE '%pg_stat%'
            ORDER BY total_exec_time DESC
            LIMIT $3
        """, min_mean_ms, min_calls, limit)

        return [SlowQuery(
            query=r["query"],
            calls=r["calls"],
            total_time_ms=r["total_time_ms"],
            mean_time_ms=r["mean_time_ms"],
            rows_returned=r["rows"],
            shared_blks_hit=r["shared_blks_hit"],
            shared_blks_read=r["shared_blks_read"],
            cache_hit_ratio=r["cache_hit_ratio"],
        ) for r in rows]
```
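Wiring the monitor up takes a few lines. A hypothetical entry point (the DSN is a placeholder):

```python
import asyncio

async def main() -> None:
    monitor = QueryMonitor("postgresql://agent:secret@localhost:5432/appdb")
    await monitor.connect()
    for sq in await monitor.get_slow_queries(min_mean_ms=50):
        print(f"{sq.mean_time_ms:8.1f}ms  x{sq.calls:<6} "
              f"cache={sq.cache_hit_ratio:.2f}  {sq.query[:80]}")

asyncio.run(main())
```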

## Index Recommendation Engine

The agent analyzes query plans and existing indexes to recommend new indexes or identify unused ones.

```python
import openai
import json

async def get_query_plan(pool: asyncpg.Pool, query: str) -> str:
    # EXPLAIN cannot take bind parameters, so the query text is interpolated.
    # Only feed it queries read from pg_stat_statements, never raw user input,
    # and substitute sample values for $1-style placeholders first (see FAQ).
    rows = await pool.fetch(f"EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) {query}")
    # asyncpg returns the json column as a string; re-parse for clean output
    return json.dumps(json.loads(rows[0]["QUERY PLAN"]), indent=2)

async def get_existing_indexes(pool: asyncpg.Pool, table: str) -> list[dict]:
    rows = await pool.fetch("""
        SELECT
            indexname, indexdef,
            idx_scan, idx_tup_read, idx_tup_fetch,
            pg_size_pretty(pg_relation_size(indexrelid)) AS size
        FROM pg_stat_user_indexes
        JOIN pg_indexes ON indexname = pg_stat_user_indexes.indexrelname
        WHERE schemaname = 'public' AND tablename = $1
        ORDER BY idx_scan DESC
    """, table)
    return [dict(r) for r in rows]

async def recommend_indexes(
    pool: asyncpg.Pool, slow_query: SlowQuery
) -> dict:
    plan = await get_query_plan(pool, slow_query.query)

    # Extract table names from the query for index lookup
    tables = await pool.fetch("""
        SELECT DISTINCT tablename FROM pg_tables
        WHERE schemaname = 'public'
    """)
    table_names = [t["tablename"] for t in tables]

    # Naive substring match; a production version would parse the query's
    # FROM and JOIN clauses instead.
    mentioned_tables = [
        t for t in table_names if t in slow_query.query.lower()
    ]

    existing = {}
    for table in mentioned_tables:
        existing[table] = await get_existing_indexes(pool, table)

    client = openai.AsyncOpenAI()
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{
            "role": "user",
            "content": f"""Analyze this slow PostgreSQL query and recommend indexes.

Query: {slow_query.query}
Mean execution time: {slow_query.mean_time_ms:.1f}ms
Total calls: {slow_query.calls}
Cache hit ratio: {slow_query.cache_hit_ratio:.3f}

Query plan:
{plan}

Existing indexes on mentioned tables:
{json.dumps(existing, indent=2, default=str)}

Return JSON with:
- recommended_indexes: list of CREATE INDEX statements
- unused_indexes: list of indexes with zero scans that could be dropped
- explanation: why these indexes help
- estimated_improvement: percentage speed improvement estimate"""
        }],
        response_format={"type": "json_object"},
        temperature=0.0,
    )
    return json.loads(response.choices[0].message.content)
```
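A sketch of how the two pieces connect: pull the worst offender by total time and ask the model for recommendations. The helper below is illustrative, not part of the API above:

```python
async def analyze_worst_query(monitor: QueryMonitor) -> None:
    slow = await monitor.get_slow_queries(limit=1)
    if not slow:
        print("No slow queries above threshold")
        return
    rec = await recommend_indexes(monitor.pool, slow[0])
    for ddl in rec.get("recommended_indexes", []):
        print("CREATE:", ddl)
    for idx in rec.get("unused_indexes", []):
        print("DROP CANDIDATE:", idx)
    print("Rationale:", rec.get("explanation"))
```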

## Vacuum Scheduling

The agent monitors dead tuple counts and schedules vacuum operations during low-traffic windows.

```python
from datetime import datetime, timezone

@dataclass
class TableBloat:
    table_name: str
    live_tuples: int
    dead_tuples: int
    dead_ratio: float
    last_vacuum: Optional[datetime]
    last_autovacuum: Optional[datetime]
    table_size: str

async def check_vacuum_needs(pool: asyncpg.Pool) -> list[TableBloat]:
    rows = await pool.fetch("""
        SELECT
            relname AS table_name,
            n_live_tup AS live_tuples,
            n_dead_tup AS dead_tuples,
            CASE WHEN n_live_tup = 0 THEN 0
                 ELSE n_dead_tup::float / n_live_tup
            END AS dead_ratio,
            last_vacuum,
            last_autovacuum,
            pg_size_pretty(pg_total_relation_size(relid)) AS table_size
        FROM pg_stat_user_tables
        WHERE n_dead_tup > 1000
        ORDER BY n_dead_tup DESC
    """)
    return [TableBloat(**dict(r)) for r in rows]

async def schedule_vacuum(
    pool: asyncpg.Pool, table: str, is_low_traffic: bool
) -> str:
    if not is_low_traffic:
        return f"Skipping vacuum for {table}: not in low-traffic window"

    await pool.execute(f"VACUUM (VERBOSE, ANALYZE) {table}")
    return f"Vacuum completed for {table}"

def is_low_traffic_window() -> bool:
    # Assumed low-traffic window of 02:00-05:00 UTC; adjust per workload.
    hour = datetime.now(timezone.utc).hour
    return 2 <= hour <= 5
```

## Capacity Planning

The agent tracks table growth and projects when a table will reach a size that calls for partitioning or archiving.

```python
async def forecast_capacity(pool: asyncpg.Pool, table: str) -> dict:
    """Analyze table growth and project when limits will be reached."""
    rows = await pool.fetch("""
        SELECT
            pg_total_relation_size($1) AS current_bytes,
            (SELECT setting::bigint * pg_size_bytes('1kB')
             FROM pg_settings WHERE name = 'max_wal_size') AS max_wal,
            (SELECT setting FROM pg_settings
             WHERE name = 'max_connections') AS max_conn
    """, table)

    current_size = rows[0]["current_bytes"]

    # Simulate historical growth (in production, store daily snapshots)
    daily_growth_rate = current_size * 0.02  # 2% daily growth estimate
    days_to_100gb = (100 * 1024**3 - current_size) / daily_growth_rate

    return {
        "table": table,
        "current_size_gb": current_size / (1024**3),
        "daily_growth_gb": daily_growth_rate / (1024**3),
        "days_to_100gb": max(0, int(days_to_100gb)),
        "recommendation": (
            "Consider partitioning" if days_to_100gb < 90
            else "Growth is manageable"
        ),
    }
```
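One way to tie the checks together is a periodic loop: vacuum what qualifies during quiet hours and flag tables on a worrying growth path. A minimal scheduler sketch with illustrative thresholds:

```python
import asyncio

async def agent_loop(pool: asyncpg.Pool, interval_s: int = 3600) -> None:
    while True:
        # check_vacuum_needs only returns tables with >1000 dead tuples,
        # so quiet tables are skipped entirely.
        for bloat in await check_vacuum_needs(pool):
            if bloat.dead_ratio > 0.2:
                print(await schedule_vacuum(
                    pool, bloat.table_name, is_low_traffic_window()
                ))
            forecast = await forecast_capacity(pool, bloat.table_name)
            if forecast["days_to_100gb"] < 90:
                print(f"{bloat.table_name}: {forecast['recommendation']}")
        await asyncio.sleep(interval_s)
```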

## FAQ

### How do I safely apply index recommendations in production?

Always create indexes with `CREATE INDEX CONCURRENTLY` to avoid locking writes. The agent should generate the concurrent version of the DDL. Test the index on a replica first by running the slow query before and after index creation. Only apply to production after confirming improvement on the replica.
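A small helper for that rewrite, assuming the DDL comes from the recommender above (note that `CREATE INDEX CONCURRENTLY` cannot run inside a transaction block):

```python
import re

def to_concurrent(ddl: str) -> str:
    # Rewrite "CREATE [UNIQUE] INDEX ..." into the CONCURRENTLY form.
    # Execute the result on its own connection, outside any transaction.
    return re.sub(
        r"(?i)^(CREATE\s+(?:UNIQUE\s+)?INDEX)\s+",
        r"\1 CONCURRENTLY ",
        ddl.strip(),
    )
```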

### Should the agent run VACUUM FULL automatically?

Never. VACUUM FULL rewrites the entire table and takes an exclusive lock, blocking all reads and writes. The regular VACUUM (which the agent schedules) is safe for online operation. If VACUUM FULL is needed, the agent should flag it as a manual action requiring a maintenance window.
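A sketch of that flagging logic, using the `TableBloat` record from earlier (the threshold is a heuristic, not a rule):

```python
def flag_vacuum_full(bloat: TableBloat) -> str | None:
    # Regular VACUUM marks dead space reusable but does not shrink the file;
    # when dead tuples outnumber live ones, a manual VACUUM FULL may pay off.
    if bloat.dead_ratio > 1.0:
        return (f"{bloat.table_name}: consider VACUUM FULL in a maintenance "
                f"window (dead ratio {bloat.dead_ratio:.1f})")
    return None
```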

### How does the agent handle parameterized queries in pg_stat_statements?

PostgreSQL normalizes queries in pg_stat_statements by replacing literal values with `$1`, `$2`, etc. The agent works with these normalized forms since that is what matters for index recommendations. When generating EXPLAIN plans, it substitutes reasonable sample values for the parameters.
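A naive substitution pass, shown as a sketch: it treats every placeholder the same and falls back to a dummy literal. A real agent would sample representative values from `pg_stats` or application logs:

```python
import re

def substitute_params(normalized_query: str, samples: dict[int, str]) -> str:
    # Replace $1, $2, ... with sample literals so EXPLAIN ANALYZE can run.
    def repl(m: re.Match) -> str:
        return samples.get(int(m.group(1)), "'sample'")
    return re.sub(r"\$(\d+)", repl, normalized_query)

# e.g. substitute_params("SELECT * FROM orders WHERE user_id = $1", {1: "42"})
```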

