---
title: "Database Schema Migrations for AI Agent Systems: Adding Features Without Downtime"
description: "Learn how to perform database schema migrations for AI agent systems with zero downtime. Covers online migrations, backward compatibility, data backfill, and rollback strategies."
canonical: https://callsphere.ai/blog/database-schema-migrations-ai-agent-systems-zero-downtime
category: "Learn Agentic AI"
tags: ["Database Migration", "Schema Changes", "Zero Downtime", "PostgreSQL", "Alembic"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-06T01:02:44.673Z
---

# Database Schema Migrations for AI Agent Systems: Adding Features Without Downtime

> Learn how to perform database schema migrations for AI agent systems with zero downtime. Covers online migrations, backward compatibility, data backfill, and rollback strategies.

## Why AI Agent Databases Are Tricky to Migrate

AI agent systems have database tables that grow in unpredictable ways. A conversations table might store 50,000 rows per day. A tool_calls table logs every function invocation with its arguments and results. A memory_store table holds vector embeddings that cannot be regenerated cheaply.

Adding a column, changing a constraint, or introducing a new table must happen without locking these high-traffic tables. A traditional `ALTER TABLE ... ADD COLUMN` with a `NOT NULL` constraint and a default rewrites the whole table under an `ACCESS EXCLUSIVE` lock on PostgreSQL versions before 11 (and on any version when the default is volatile). On a 10-million-row table that blocks writes for minutes, and your agents will time out or lose messages.
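A practical safeguard before any DDL runs is a short `lock_timeout`, so a migration fails fast instead of queueing behind a long-running transaction while blocking every new query on the table. A minimal sketch, assuming an asyncpg-style connection; the `run_ddl_guarded` helper and its timeout value are illustrative:

```python
async def run_ddl_guarded(conn, ddl: str, timeout: str = "2s") -> None:
    """Run a DDL statement, aborting quickly if the table lock is busy.

    A waiting ALTER TABLE blocks all new queries on the table, so it is
    safer to fail fast and retry later than to wait indefinitely.
    """
    await conn.execute(f"SET lock_timeout = '{timeout}'")
    try:
        await conn.execute(ddl)
    finally:
        # Restore the session default so later queries are unaffected
        await conn.execute("SET lock_timeout = '0'")
```

If the timeout fires, the `ALTER TABLE` raises an error without ever having blocked traffic, and you can simply retry during a quieter window.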

## The Expand-Contract Pattern

The safest migration strategy for production systems is expand-contract (also called parallel change). It has three phases:

```mermaid
flowchart LR
    OLD(["Old Schema"])
    EXPAND["1. Expand: add nullable
column, no constraints"]
    DUAL["2. Dual-write from
application code"]
    BACKFILL["3. Backfill existing
rows in batches"]
    VERIFY{"4. All rows migrated,
all readers updated?"}
    CONTRACT["5. Contract: add
constraints, drop old column"]
    NEW(["New Schema"])
    OLD --> EXPAND --> DUAL --> BACKFILL --> VERIFY
    VERIFY -->|Yes| CONTRACT --> NEW
    VERIFY -->|Not yet| BACKFILL
    style OLD fill:#dc2626,stroke:#b91c1c,color:#fff
    style VERIFY fill:#f59e0b,stroke:#d97706,color:#1f2937
    style NEW fill:#059669,stroke:#047857,color:#fff
```

1. **Expand**: Add the new column or table as nullable with no constraints
2. **Migrate**: Backfill existing data and update application code to write to both old and new columns
3. **Contract**: Remove the old column after all code reads from the new one
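The three phases map onto roughly the following DDL sequence. This is an illustrative sketch only, using the `sentiment_score` example the rest of this post builds out; each phase ships as its own migration, often days apart:

```python
# Illustrative expand-contract DDL, one phase per deploy
PHASES: dict[str, str] = {
    # Nullable, no default: a metadata-only change, effectively instant
    "expand": "ALTER TABLE conversations ADD COLUMN sentiment_score float",
    # Run from application code, in small batches (never one big UPDATE)
    "migrate": "UPDATE conversations SET sentiment_score = $1 WHERE id = $2",
    # Only after the backfill finishes and every reader uses the new column
    "contract": (
        "ALTER TABLE conversations "
        "ALTER COLUMN sentiment_score SET NOT NULL"
    ),
}
```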

```python
"""
Alembic migration: Add sentiment_score to conversations.
Phase 1 (Expand) — add nullable column, no downtime.
"""
from alembic import op
import sqlalchemy as sa

revision = "042_add_sentiment_score"
down_revision = "041_add_tool_call_index"

def upgrade():
    # Phase 1: Add column as nullable — instant, no table lock
    op.add_column(
        "conversations",
        sa.Column(
            "sentiment_score",
            sa.Float(),
            nullable=True,
            comment="AI-computed sentiment, -1.0 to 1.0",
        ),
    )
    # CREATE INDEX CONCURRENTLY cannot run inside a transaction, and
    # Alembic wraps each migration in one by default, so step outside it
    with op.get_context().autocommit_block():
        op.execute(
            "CREATE INDEX CONCURRENTLY idx_conversations_sentiment "
            "ON conversations (sentiment_score) "
            "WHERE sentiment_score IS NOT NULL"
        )

def downgrade():
    with op.get_context().autocommit_block():
        op.execute("DROP INDEX CONCURRENTLY idx_conversations_sentiment")
    op.drop_column("conversations", "sentiment_score")
```

## Backfill Existing Data Without Locking

Never backfill with a single `UPDATE` on millions of rows. Process in batches.

```python
import asyncpg
import asyncio

async def backfill_sentiment_scores(
    db_url: str,
    batch_size: int = 1000,
    sleep_between_batches: float = 0.1,
):
    """Backfill sentiment scores in small batches."""
    conn = await asyncpg.connect(db_url)
    total_updated = 0

    while True:
        # Select a batch of rows missing the new column
        rows = await conn.fetch(
            """
            SELECT id, content
            FROM conversations
            WHERE sentiment_score IS NULL
            ORDER BY id
            LIMIT $1
            """,
            batch_size,
        )
        if not rows:
            break

        # One round trip per batch instead of one UPDATE per row
        await conn.executemany(
            "UPDATE conversations SET sentiment_score = $1 WHERE id = $2",
            [(compute_sentiment(r["content"]), r["id"]) for r in rows],
        )
        total_updated += len(rows)

        # Yield to other connections
        await asyncio.sleep(sleep_between_batches)
        print(f"Backfilled {total_updated} rows...")

    await conn.close()
    print(f"Backfill complete: {total_updated} rows updated")

def compute_sentiment(text: str) -> float:
    """Compute sentiment score using a lightweight model."""
    # In production, use a fast local model or batch API calls
    from textblob import TextBlob
    return TextBlob(text).sentiment.polarity
```

## Dual-Write During Transition

While the backfill runs, update your application to write to both old and new schemas.

```python
class ConversationRepository:
    """Repository that supports both old and new schema."""

    def __init__(self, conn):
        self.conn = conn  # an asyncpg connection (or pool)
    async def save_message(
        self, conversation_id: str, role: str, content: str,
    ):
        sentiment = compute_sentiment(content) if role == "user" else None

        await self.conn.execute(
            """
            INSERT INTO messages (conversation_id, role, content)
            VALUES ($1, $2, $3)
            """,
            conversation_id, role, content,
        )

        # Dual-write: update the new column on the conversation
        if sentiment is not None:
            await self.conn.execute(
                """
                UPDATE conversations
                SET sentiment_score = $1, updated_at = now()
                WHERE id = $2
                """,
                sentiment, conversation_id,
            )
```
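While the backfill is in flight, reads must also tolerate rows that do not have a score yet. One option is to fall back to scoring on read. This is a sketch; `resolve_sentiment` is a hypothetical helper, and the scorer is injected so the fallback can reuse whatever model the backfill uses:

```python
from typing import Callable, Optional

def resolve_sentiment(
    stored_score: Optional[float],
    content: str,
    scorer: Callable[[str], float],
) -> float:
    """Prefer the backfilled column; fall back to scoring on the fly
    for rows the backfill has not reached yet."""
    if stored_score is not None:
        return stored_score
    return scorer(content)
```

Once Phase 3 makes the column `NOT NULL`, the fallback branch becomes dead code and can be deleted as part of the contract step.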

## Phase 3: Contract — Add Constraints

After the backfill completes and all code writes to the new column, add the constraint.

```python
"""Phase 3 migration: Make sentiment_score NOT NULL."""

revision = "044_sentiment_score_not_null"
down_revision = "043_backfill_sentiment"

def upgrade():
    # Validate that backfill is complete before adding constraint
    op.execute(
        "DO $$ BEGIN "
        "  IF EXISTS (SELECT 1 FROM conversations "
        "             WHERE sentiment_score IS NULL LIMIT 1) THEN "
        "    RAISE EXCEPTION 'Backfill incomplete'; "
        "  END IF; "
        "END $$"
    )
    op.alter_column(
        "conversations", "sentiment_score",
        nullable=False,
        server_default="0.0",
    )

def downgrade():
    op.alter_column(
        "conversations", "sentiment_score",
        nullable=True,
        server_default=None,
    )
```
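One caveat: `SET NOT NULL` on its own validates every row while holding an `ACCESS EXCLUSIVE` lock. On PostgreSQL 12+ you can avoid that scan by first adding a `CHECK` constraint as `NOT VALID` (instant), validating it under a much lighter lock, and only then setting `NOT NULL`, which PostgreSQL can prove from the validated constraint. A sketch of that statement sequence; the `not_null_via_check` helper is illustrative, and in a migration you would pass each statement to `op.execute()`:

```python
def not_null_via_check(table: str, column: str) -> list[str]:
    """Statement sequence for a scan-free SET NOT NULL (PostgreSQL 12+)."""
    constraint = f"{column}_not_null"
    return [
        # Instant: the constraint is recorded, existing rows unchecked
        f"ALTER TABLE {table} ADD CONSTRAINT {constraint} "
        f"CHECK ({column} IS NOT NULL) NOT VALID",
        # Scans the table, but under a SHARE UPDATE EXCLUSIVE lock,
        # so reads and writes continue
        f"ALTER TABLE {table} VALIDATE CONSTRAINT {constraint}",
        # No scan needed: proven by the validated constraint
        f"ALTER TABLE {table} ALTER COLUMN {column} SET NOT NULL",
        # The CHECK constraint is now redundant
        f"ALTER TABLE {table} DROP CONSTRAINT {constraint}",
    ]
```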

## Rollback Strategy

Always have a rollback plan that does not require a reverse migration.

```python
import os

class FeatureFlags:
    @staticmethod
    def use_sentiment_score() -> bool:
        return os.getenv("FEATURE_SENTIMENT_SCORE", "false").lower() == "true"

# In your API endpoint
async def get_conversation(conversation_id: str):
    conv = await repo.get_conversation(conversation_id)
    response = {"id": conv.id, "messages": conv.messages}

    if FeatureFlags.use_sentiment_score():
        response["sentiment_score"] = conv.sentiment_score

    return response
```

## FAQ

### How do I handle migrations on tables with tens of millions of rows?

Use `ALTER TABLE ... ADD COLUMN` with a nullable column and no default — this is instant in PostgreSQL 11+ because it only updates the catalog. Then backfill in batches of 1,000-5,000 rows with a small sleep between batches to avoid overwhelming the connection pool. Monitor replication lag if you have read replicas.
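The replication-lag check can be automated so the backfill pauses when replicas fall behind. A sketch, assuming direct access to `pg_stat_replication` on the primary with an asyncpg-style connection; the helper names and the 64 MiB threshold are illustrative:

```python
import asyncio

LAG_QUERY = (
    "SELECT application_name, "
    "pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) AS lag_bytes "
    "FROM pg_stat_replication"
)

async def max_replica_lag_bytes(conn) -> int:
    """Return the worst replica lag in bytes, or 0 if there are no replicas."""
    rows = await conn.fetch(LAG_QUERY)
    return max((r["lag_bytes"] for r in rows), default=0)

async def wait_for_replicas(conn, threshold: int = 64 * 1024 * 1024) -> None:
    """Block until every replica is within `threshold` bytes of the primary."""
    while await max_replica_lag_bytes(conn) > threshold:
        await asyncio.sleep(1.0)
```

Calling `await wait_for_replicas(conn)` between backfill batches keeps the backfill from outrunning your read replicas.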

### What about adding indexes on large tables?

Always use `CREATE INDEX CONCURRENTLY` in PostgreSQL. This builds the index without holding a table lock, though it takes longer to complete. Never create indexes inside a transaction block when using CONCURRENTLY. With Alembic, use `op.execute()` for concurrent index creation rather than `op.create_index()`.

### How do I coordinate schema changes across multiple agent services?

Use the expand-contract pattern with API versioning. The database expands first (new columns are nullable), then each service is updated to use the new columns at its own pace. Only contract (remove old columns) after all services have been updated and deployed. Keep a migration tracker document so every team knows which phase the migration is in.
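The "which services are ready" check can also be encoded, so the contract migration refuses to run until every service reports a version that no longer touches the old column. A sketch; the `safe_to_contract` helper and the version maps are illustrative, and deployed versions might come from a service registry or deploy metadata:

```python
def safe_to_contract(
    deployed: dict[str, tuple[int, ...]],
    required: dict[str, tuple[int, ...]],
) -> bool:
    """True only when every service runs at least the version that
    stopped reading the old column. Missing services count as not ready."""
    return all(
        deployed.get(service, (0,)) >= minimum
        for service, minimum in required.items()
    )
```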

---

#DatabaseMigration #SchemaChanges #ZeroDowntime #PostgreSQL #Alembic #AgenticAI #LearnAI #AIEngineering

---

Source: https://callsphere.ai/blog/database-schema-migrations-ai-agent-systems-zero-downtime
