---
title: "Migrating Agent Data: Moving Conversations, Sessions, and Memory Between Systems"
description: "Learn how to migrate conversations, sessions, and agent memory between AI systems with zero downtime. Covers data export, transformation, import validation, and cutover strategies."
canonical: https://callsphere.ai/blog/migrating-agent-data-conversations-sessions-memory-between-systems
category: "Learn Agentic AI"
tags: ["Data Migration", "Agent Memory", "Conversations", "Zero Downtime", "Python"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-08T19:22:09.905Z
---

# Migrating Agent Data: Moving Conversations, Sessions, and Memory Between Systems

> Learn how to migrate conversations, sessions, and agent memory between AI systems with zero downtime. Covers data export, transformation, import validation, and cutover strategies.

## Why Agent Data Migration Is Harder Than Regular Data Migration

Agent data has unique characteristics that make migration challenging. Conversations have temporal ordering that must be preserved. Session state references tool call IDs and function outputs that are framework-specific. Memory stores may contain embeddings tied to a particular model version. And users expect continuity — they do not want to re-explain context after a system change.

A well-planned migration preserves all of this while the system stays online.

## Step 1: Define a Canonical Data Format

Before exporting anything, define a framework-agnostic format that captures all the information you need. The diagram below shows the kinds of state a typical agent accumulates (working, episodic, and semantic memory); each needs a home in that format, and the dataclasses that follow cover the conversation layer.

```mermaid
flowchart TD
    MSG(["New message"])
    WORKING["Working memory
rolling window"]
    EPISODIC[("Episodic memory
past sessions")]
    SEMANTIC[("Semantic memory
facts and preferences")]
    SUM["Summarizer
compresses old turns"]
    ROUTER{"Retrieve
needed memories"}
    PROMPT["Assembled context"]
    LLM["LLM"]
    UPD["Memory updater
writes new facts"]
    MSG --> WORKING --> ROUTER
    ROUTER -->|Past sessions| EPISODIC
    ROUTER -->|User facts| SEMANTIC
    EPISODIC --> SUM --> PROMPT
    SEMANTIC --> PROMPT
    WORKING --> PROMPT --> LLM --> UPD
    UPD --> EPISODIC
    UPD --> SEMANTIC
    style ROUTER fill:#4f46e5,stroke:#4338ca,color:#fff
    style LLM fill:#f59e0b,stroke:#d97706,color:#1f2937
    style EPISODIC fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style SEMANTIC fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
```

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import json

@dataclass
class CanonicalMessage:
    role: str  # "user", "assistant", "system", "tool"
    content: str
    timestamp: datetime
    tool_call_id: Optional[str] = None
    tool_name: Optional[str] = None
    metadata: dict = field(default_factory=dict)

@dataclass
class CanonicalSession:
    session_id: str
    user_id: str
    messages: list[CanonicalMessage]
    created_at: datetime
    updated_at: datetime
    agent_name: str
    metadata: dict = field(default_factory=dict)

def serialize_session(session: CanonicalSession) -> str:
    """Serialize to JSON for transport."""
    return json.dumps({
        "session_id": session.session_id,
        "user_id": session.user_id,
        "messages": [
            {
                "role": m.role,
                "content": m.content,
                "timestamp": m.timestamp.isoformat(),
                "tool_call_id": m.tool_call_id,
                "tool_name": m.tool_name,
                "metadata": m.metadata,
            }
            for m in session.messages
        ],
        "created_at": session.created_at.isoformat(),
        "updated_at": session.updated_at.isoformat(),
        "agent_name": session.agent_name,
        "metadata": session.metadata,
    }, indent=2)
```
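
The import path needs the inverse. Here is a minimal counterpart that parses the JSON produced by `serialize_session` back into the canonical dataclasses, continuing the imports from the block above:

```python
def deserialize_session(raw: str) -> CanonicalSession:
    """Parse serialized JSON back into the canonical dataclasses."""
    data = json.loads(raw)
    return CanonicalSession(
        session_id=data["session_id"],
        user_id=data["user_id"],
        messages=[
            CanonicalMessage(
                role=m["role"],
                content=m["content"],
                timestamp=datetime.fromisoformat(m["timestamp"]),
                tool_call_id=m.get("tool_call_id"),
                tool_name=m.get("tool_name"),
                metadata=m.get("metadata") or {},
            )
            for m in data["messages"]
        ],
        created_at=datetime.fromisoformat(data["created_at"]),
        updated_at=datetime.fromisoformat(data["updated_at"]),
        agent_name=data["agent_name"],
        metadata=data.get("metadata") or {},
    )
```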

## Step 2: Export from the Source System

Write an exporter that reads from your current storage and transforms to the canonical format.

```python
import asyncpg
import json

def _as_dict(value) -> dict:
    """asyncpg returns json/jsonb columns as text unless a type codec is
    registered, so decode defensively."""
    if value is None:
        return {}
    return json.loads(value) if isinstance(value, str) else value

async def export_sessions(
    db_url: str,
    batch_size: int = 500,
) -> list[CanonicalSession]:
    """Export sessions from PostgreSQL in batches."""
    conn = await asyncpg.connect(db_url)
    sessions = []
    offset = 0

    while True:
        rows = await conn.fetch(
            """
            SELECT s.id, s.user_id, s.created_at, s.updated_at,
                   s.agent_name, s.metadata
            FROM sessions s
            ORDER BY s.created_at, s.id  -- tiebreak for stable OFFSET paging
            LIMIT $1 OFFSET $2
            """,
            batch_size, offset,
        )
        if not rows:
            break

        for row in rows:
            messages = await conn.fetch(
                """
                SELECT role, content, created_at, tool_call_id,
                       tool_name, metadata
                FROM messages
                WHERE session_id = $1
                ORDER BY created_at
                """,
                row["id"],
            )
            sessions.append(CanonicalSession(
                session_id=str(row["id"]),
                user_id=str(row["user_id"]),
                messages=[
                    CanonicalMessage(
                        role=m["role"],
                        content=m["content"],
                        timestamp=m["created_at"],
                        tool_call_id=m.get("tool_call_id"),
                        tool_name=m.get("tool_name"),
                        metadata=_as_dict(m.get("metadata")),
                    )
                    for m in messages
                ],
                created_at=row["created_at"],
                updated_at=row["updated_at"],
                agent_name=row["agent_name"],
                metadata=_as_dict(row.get("metadata")),
            ))
        offset += batch_size

    await conn.close()
    return sessions
```
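
For transport between environments, JSONL with one session per line is a convenient format. A small sketch of wiring the exporter to a dump file (`sessions.jsonl` is an illustrative path):

```python
async def dump_to_jsonl(db_url: str, path: str = "sessions.jsonl") -> int:
    """Write the export as JSONL: one canonical session per line."""
    sessions = await export_sessions(db_url)
    with open(path, "w", encoding="utf-8") as f:
        for session in sessions:
            # Re-dump compactly; serialize_session pretty-prints with
            # indent=2, which would break the one-line-per-record format.
            f.write(json.dumps(json.loads(serialize_session(session))) + "\n")
    return len(sessions)
```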

## Step 3: Import and Validate

Import into the target system with validation checks at every step.

```python
async def import_sessions(
    sessions: list[CanonicalSession],
    target_db_url: str,
) -> dict:
    """Import sessions with validation."""
    conn = await asyncpg.connect(target_db_url)
    stats = {"imported": 0, "skipped": 0, "errors": 0}

    for session in sessions:
        try:
            # Check for duplicates
            existing = await conn.fetchval(
                "SELECT 1 FROM sessions WHERE id = $1",
                session.session_id,
            )
            if existing:
                stats["skipped"] += 1
                continue

            async with conn.transaction():
                # jsonb parameters go over the wire as text when no
                # codec is registered, hence json.dumps.
                await conn.execute(
                    """INSERT INTO sessions
                       (id, user_id, agent_name, created_at,
                        updated_at, metadata)
                       VALUES ($1, $2, $3, $4, $5, $6)""",
                    session.session_id, session.user_id,
                    session.agent_name, session.created_at,
                    session.updated_at, json.dumps(session.metadata),
                )
                for msg in session.messages:
                    # Carry the tool call fields and metadata across;
                    # dropping them loses the grounding discussed in the FAQ.
                    await conn.execute(
                        """INSERT INTO messages
                           (session_id, role, content, created_at,
                            tool_call_id, tool_name, metadata)
                           VALUES ($1, $2, $3, $4, $5, $6, $7)""",
                        session.session_id, msg.role,
                        msg.content, msg.timestamp,
                        msg.tool_call_id, msg.tool_name,
                        json.dumps(msg.metadata),
                    )
            stats["imported"] += 1
        except Exception as e:
            stats["errors"] += 1
            print(f"Error importing {session.session_id}: {e}")

    await conn.close()
    return stats
```

## Step 4: Validate Counts and Integrity

After import, run integrity checks to make sure nothing was lost.

```python
async def validate_migration(source_url: str, target_url: str):
    src = await asyncpg.connect(source_url)
    tgt = await asyncpg.connect(target_url)
    try:
        src_sessions = await src.fetchval("SELECT count(*) FROM sessions")
        tgt_sessions = await tgt.fetchval("SELECT count(*) FROM sessions")
        src_messages = await src.fetchval("SELECT count(*) FROM messages")
        tgt_messages = await tgt.fetchval("SELECT count(*) FROM messages")

        print(f"Sessions: source={src_sessions}, target={tgt_sessions}")
        print(f"Messages: source={src_messages}, target={tgt_messages}")
        assert src_sessions == tgt_sessions, "Session count mismatch"
        assert src_messages == tgt_messages, "Message count mismatch"
    finally:
        # Close both connections even when an assertion fails.
        await src.close()
        await tgt.close()
```
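
Matching counts are necessary but not sufficient: they will not catch truncated content or reordered messages. One option is to hash the message stream for a random sample of sessions on both sides. A sketch, assuming session IDs are stored identically in both schemas:

```python
import hashlib

async def spot_check_sessions(
    source_url: str,
    target_url: str,
    sample: int = 100,
) -> list:
    """Compare a content hash per sampled session on both sides."""
    src = await asyncpg.connect(source_url)
    tgt = await asyncpg.connect(target_url)
    try:
        ids = await src.fetch(
            "SELECT id FROM sessions ORDER BY random() LIMIT $1", sample
        )
        # Concatenate the ordered message stream into one string per session.
        query = """
            SELECT string_agg(role || ':' || content, '|' ORDER BY created_at)
            FROM messages WHERE session_id = $1
        """
        mismatches = []
        for row in ids:
            src_blob = (await src.fetchval(query, row["id"])) or ""
            tgt_blob = (await tgt.fetchval(query, row["id"])) or ""
            if (hashlib.sha256(src_blob.encode()).hexdigest()
                    != hashlib.sha256(tgt_blob.encode()).hexdigest()):
                mismatches.append(row["id"])
        print(f"Spot-checked {len(ids)} sessions, {len(mismatches)} mismatches")
        return mismatches
    finally:
        await src.close()
        await tgt.close()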

## FAQ

### How do I handle active sessions during migration?

Use a write-ahead approach. Set a cutoff timestamp, export all sessions up to that point, then replay any new writes that occurred during the export. A CDC (Change Data Capture) stream from tools like Debezium can capture these delta writes automatically.
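
As a rough sketch of the replay step, assuming the `messages` schema from Step 2 and a `cutoff` timestamp captured before the bulk export began (in production a CDC stream would replace the polling query):

```python
from datetime import datetime

async def replay_delta(source_url: str, target_url: str, cutoff: datetime) -> int:
    """Copy messages written after the export cutoff into the target."""
    src = await asyncpg.connect(source_url)
    tgt = await asyncpg.connect(target_url)
    try:
        rows = await src.fetch(
            """SELECT session_id, role, content, created_at
               FROM messages WHERE created_at > $1
               ORDER BY created_at""",
            cutoff,
        )
        for r in rows:
            # ON CONFLICT DO NOTHING guards against double-applying a
            # delta batch. Sessions created after the cutoff need the
            # same treatment; only messages are shown here.
            await tgt.execute(
                """INSERT INTO messages (session_id, role, content, created_at)
                   VALUES ($1, $2, $3, $4)
                   ON CONFLICT DO NOTHING""",
                r["session_id"], r["role"], r["content"], r["created_at"],
            )
        return len(rows)
    finally:
        await src.close()
        await tgt.close()
```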

### Should I migrate tool call results or just the conversation text?

Migrate tool call results. They provide context that the agent used to formulate responses. Without them, resuming a conversation in the new system may produce inconsistent follow-ups because the agent loses the factual grounding from previous tool calls.

### What about memory stores like vector databases?

Vector memory requires special handling because embeddings are model-specific. If you are changing embedding models, you must re-embed the source documents rather than copying vectors directly. Plan for the re-embedding compute cost.
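
A minimal re-embedding loop might look like the sketch below. `fetch_documents`, `embed_batch`, and `upsert` are hypothetical stand-ins for your source store iterator, the new embedding model, and the target vector database client:

```python
from typing import AsyncIterator, Awaitable, Callable

async def reembed_memories(
    fetch_documents: Callable[[], AsyncIterator[tuple[str, str]]],
    embed_batch: Callable[[list[str]], Awaitable[list[list[float]]]],
    upsert: Callable[[str, list[float], str], Awaitable[None]],
    batch_size: int = 64,
) -> int:
    """Re-embed source documents with the new model instead of copying vectors."""
    total = 0
    batch: list[tuple[str, str]] = []

    async def flush() -> None:
        # Embed the pending batch with the new model and write it out.
        nonlocal total
        if not batch:
            return
        vectors = await embed_batch([text for _, text in batch])
        for (doc_id, text), vec in zip(batch, vectors):
            await upsert(doc_id, vec, text)
        total += len(batch)
        batch.clear()

    async for doc_id, text in fetch_documents():
        batch.append((doc_id, text))
        if len(batch) >= batch_size:
            await flush()
    await flush()  # final partial batch
    return total
```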

