
ETL for AI Agent Training Data: Extracting and Transforming Conversation Logs

Build an ETL pipeline that extracts conversation logs from AI agent systems, anonymizes PII, transforms them into training-ready formats, and filters for quality to improve agent performance.

Why Conversation Logs Are Your Most Valuable Data

Every conversation your AI agent handles is a data point about what users actually ask, how the agent responds, and where it fails. This data is far more valuable than synthetic training sets because it reflects real user language, real edge cases, and real failure modes specific to your domain.

But raw conversation logs are messy. They contain PII that cannot be stored in training sets, they include failed conversations that would teach the model bad habits, and they are in whatever format your logging system uses rather than the format your training pipeline needs. An ETL pipeline transforms raw logs into clean, anonymized, quality-filtered training data.

Extracting Logs from Multiple Sources

Agent conversation logs typically live in multiple places: database tables, JSON log files, and third-party platforms. The extraction layer normalizes all sources into a common format.

from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime
from enum import Enum
import json

class MessageRole(str, Enum):
    USER = "user"
    ASSISTANT = "assistant"
    SYSTEM = "system"
    TOOL = "tool"

@dataclass
class Message:
    role: MessageRole
    content: str
    timestamp: Optional[datetime] = None
    tool_name: Optional[str] = None
    tool_args: Optional[dict] = None

@dataclass
class Conversation:
    id: str
    messages: List[Message]
    metadata: dict
    source: str

class LogExtractor:
    async def extract_from_db(self, db_pool) -> List[Conversation]:
        async with db_pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT
                    c.id,
                    c.created_at,
                    c.metadata,
                    json_agg(
                        json_build_object(
                            'role', m.role,
                            'content', m.content,
                            'timestamp', m.created_at,
                            'tool_name', m.tool_name,
                            'tool_args', m.tool_args
                        ) ORDER BY m.created_at
                    ) AS messages
                FROM conversations c
                JOIN messages m ON m.conversation_id = c.id
                WHERE c.created_at >= NOW() - INTERVAL '7 days'
                GROUP BY c.id, c.created_at, c.metadata
            """)

        conversations = []
        for row in rows:
            # asyncpg returns json_agg output as a JSON string unless a
            # type codec is registered, so decode defensively
            raw_messages = row["messages"]
            if isinstance(raw_messages, str):
                raw_messages = json.loads(raw_messages)
            messages = [
                Message(
                    role=MessageRole(m["role"]),
                    content=m["content"],
                    # timestamps arrive as ISO strings after JSON aggregation
                    timestamp=(
                        datetime.fromisoformat(m["timestamp"])
                        if m.get("timestamp") else None
                    ),
                    tool_name=m.get("tool_name"),
                    tool_args=m.get("tool_args"),
                )
                for m in raw_messages
            ]
            conversations.append(Conversation(
                id=str(row["id"]),
                messages=messages,
                metadata=dict(row["metadata"]) if row["metadata"] else {},
                source="database",
            ))
        return conversations
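File-based logs can be normalized the same way. A minimal sketch for JSON-lines log files, assuming each line holds an object with `id`, `messages`, and optional `metadata` keys (adjust the field names to match your logging schema):

```python
import json
from pathlib import Path
from typing import Iterator

def extract_from_jsonl(path: str) -> Iterator[dict]:
    """Yield conversations from a JSON-lines log file, one object per line.

    Assumed line shape (hypothetical; adapt to your schema):
      {"id": "...", "messages": [{"role": "user", "content": "..."}], "metadata": {}}
    Malformed lines are skipped rather than aborting the whole extract.
    """
    for line in Path(path).read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # tolerate partial writes and corrupt lines
        yield {
            "id": str(record.get("id", "")),
            "messages": record.get("messages", []),
            "metadata": record.get("metadata", {}),
            "source": "jsonl",
        }
```

Skipping malformed lines instead of raising keeps a single corrupted write from killing a batch job; count the skips in production so silent data loss is visible.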

PII Anonymization

Training data must never contain personally identifiable information. Build a pipeline that detects and replaces PII before any data leaves the extraction stage.

import re
from typing import Dict, Optional

class PIIAnonymizer:
    PATTERNS = {
        "email": (
            r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
            "[EMAIL_REDACTED]"
        ),
        "phone": (
            r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
            "[PHONE_REDACTED]"
        ),
        "ssn": (
            r"\b\d{3}-\d{2}-\d{4}\b",
            "[SSN_REDACTED]"
        ),
        "credit_card": (
            r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b",
            "[CC_REDACTED]"
        ),
        "ip_address": (
            r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b",
            "[IP_REDACTED]"
        ),
    }

    def __init__(self, custom_patterns: Optional[Dict[str, tuple]] = None):
        self.patterns = {**self.PATTERNS}
        if custom_patterns:
            self.patterns.update(custom_patterns)
        self.stats = {key: 0 for key in self.patterns}

    def anonymize_text(self, text: str) -> str:
        for name, (pattern, replacement) in self.patterns.items():
            # subn substitutes and counts matches in a single pass
            text, count = re.subn(pattern, replacement, text)
            self.stats[name] += count
        return text

    def anonymize_conversation(
        self, conv: Conversation
    ) -> Conversation:
        clean_messages = []
        for msg in conv.messages:
            clean_messages.append(Message(
                role=msg.role,
                content=self.anonymize_text(msg.content),
                timestamp=msg.timestamp,
                tool_name=msg.tool_name,
                tool_args=(
                    self._anonymize_dict(msg.tool_args)
                    if msg.tool_args else None
                ),
            ))
        return Conversation(
            id=conv.id,
            messages=clean_messages,
            metadata={},  # strip metadata entirely
            source=conv.source,
        )

    def _anonymize_dict(self, d: dict) -> dict:
        result = {}
        for k, v in d.items():
            if isinstance(v, str):
                result[k] = self.anonymize_text(v)
            elif isinstance(v, dict):
                result[k] = self._anonymize_dict(v)
            else:
                result[k] = v
        return result
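A bare 16-digit regex also matches order numbers and tracking ids. One way to cut those false positives, sketched here as a standalone helper you could wire in as a custom handler, is to confirm candidates with the Luhn checksum before redacting:

```python
import re

def luhn_valid(digits: str) -> bool:
    """Return True if the digit string passes the Luhn checksum."""
    total = 0
    for i, ch in enumerate(reversed(digits)):
        d = int(ch)
        if i % 2 == 1:  # double every second digit from the right
            d *= 2
            if d > 9:
                d -= 9
        total += d
    return total % 10 == 0

def redact_card_numbers(text: str) -> str:
    """Redact 16-digit sequences only when they pass the Luhn check."""
    pattern = r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b"

    def _replace(match: re.Match) -> str:
        digits = re.sub(r"\D", "", match.group())
        return "[CC_REDACTED]" if luhn_valid(digits) else match.group()

    return re.sub(pattern, _replace, text)
```

When in doubt, bias toward over-redaction: a redacted order number costs a little training signal, while a leaked card number is a compliance incident.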

Quality Filtering

Not every conversation should become training data. Filter out conversations that are too short, contain errors, or represent edge cases that would confuse the model.


@dataclass
class QualityScore:
    conversation_id: str
    turn_count: int
    avg_response_length: int
    has_tool_use: bool
    has_error: bool
    user_satisfaction: Optional[float]
    passes: bool
    rejection_reason: Optional[str] = None

class QualityFilter:
    def __init__(
        self,
        min_turns: int = 3,
        min_avg_response_length: int = 50,
        max_turns: int = 50,
    ):
        self.min_turns = min_turns
        self.min_avg_response_length = min_avg_response_length
        self.max_turns = max_turns

    def evaluate(self, conv: Conversation) -> QualityScore:
        user_msgs = [m for m in conv.messages if m.role == MessageRole.USER]
        asst_msgs = [m for m in conv.messages if m.role == MessageRole.ASSISTANT]
        turn_count = len(user_msgs)

        avg_length = 0
        if asst_msgs:
            avg_length = sum(len(m.content) for m in asst_msgs) // len(asst_msgs)

        has_tool = any(m.role == MessageRole.TOOL for m in conv.messages)

        error_indicators = [
            "error", "sorry, i cannot", "i don't have access",
            "something went wrong",
        ]
        has_error = any(
            any(ind in m.content.lower() for ind in error_indicators)
            for m in asst_msgs
        )

        passes = True
        reason = None
        if turn_count < self.min_turns:
            passes, reason = False, f"Too few turns: {turn_count}"
        elif turn_count > self.max_turns:
            passes, reason = False, f"Too many turns: {turn_count}"
        elif avg_length < self.min_avg_response_length:
            passes, reason = False, f"Responses too short: {avg_length}"
        elif has_error:
            passes, reason = False, "Contains error responses"

        return QualityScore(
            conversation_id=conv.id,
            turn_count=turn_count,
            avg_response_length=avg_length,
            has_tool_use=has_tool,
            has_error=has_error,
            user_satisfaction=None,
            passes=passes,
            rejection_reason=reason,
        )

Format Conversion for Fine-Tuning

Convert filtered conversations to the JSONL format expected by training APIs.

def to_openai_format(conv: Conversation) -> dict:
    messages = []
    for msg in conv.messages:
        if msg.role == MessageRole.TOOL:
            messages.append({
                "role": "tool",
                "content": msg.content,
                # OpenAI expects the id of the originating tool call here;
                # the tool name serves as a stand-in because these logs do
                # not record call ids
                "tool_call_id": msg.tool_name,
            })
        else:
            messages.append({
                "role": msg.role.value,
                "content": msg.content,
            })
    return {"messages": messages}

def export_training_data(
    conversations: List[Conversation],
    output_path: str,
):
    with open(output_path, "w", encoding="utf-8") as f:
        for conv in conversations:
            line = json.dumps(to_openai_format(conv))
            f.write(line + "\n")
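Before uploading, it is worth validating the exported file: every line must parse as JSON and carry a non-empty messages list with known roles. A small self-contained check (the schema rules here are a minimal sketch; training APIs enforce more):

```python
import json

VALID_ROLES = {"system", "user", "assistant", "tool"}

def validate_jsonl(path: str) -> list:
    """Return (line_number, problem) tuples; an empty list means the file is clean."""
    problems = []
    with open(path, encoding="utf-8") as f:
        for lineno, line in enumerate(f, start=1):
            try:
                record = json.loads(line)
            except json.JSONDecodeError as exc:
                problems.append((lineno, f"invalid JSON: {exc}"))
                continue
            messages = record.get("messages")
            if not isinstance(messages, list) or not messages:
                problems.append((lineno, "missing or empty 'messages'"))
                continue
            for msg in messages:
                if msg.get("role") not in VALID_ROLES:
                    problems.append((lineno, f"unknown role: {msg.get('role')!r}"))
    return problems
```

Run this as the last pipeline stage and fail the export if any problems come back; a single malformed line can reject an entire fine-tuning upload.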

FAQ

How do I handle PII that regex patterns miss, like names and addresses?

Regex catches structured PII like emails and phone numbers. For unstructured PII like names and addresses, use a named entity recognition model such as spaCy's en_core_web_lg or a dedicated PII detection service. Run NER as a second pass after regex replacement, and replace detected PERSON, GPE, and ADDRESS entities with placeholders.
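The second pass can stay model-agnostic: whatever NER backend you use, have it emit character spans and apply the replacements right to left so earlier offsets stay valid. A sketch, with spans hard-coded where a model would supply them:

```python
def redact_spans(text: str, spans: list) -> str:
    """Replace (start, end, label) character spans with placeholders.

    Spans are applied right-to-left so replacements do not shift the
    offsets of spans that have not been processed yet.
    """
    for start, end, label in sorted(spans, key=lambda s: s[0], reverse=True):
        text = text[:start] + f"[{label}_REDACTED]" + text[end:]
    return text

# In practice the spans come from an NER model, e.g. with spaCy:
#   doc = nlp(text)
#   spans = [(e.start_char, e.end_char, e.label_) for e in doc.ents
#            if e.label_ in {"PERSON", "GPE"}]
```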

How many conversations do I need for effective fine-tuning?

OpenAI's fine-tuning guide suggests you can see improvement with as few as 50 to 100 examples, but meaningful improvement typically requires 500 to 1,000 high-quality conversations. Quality matters more than quantity: 200 well-filtered conversations outperform 2,000 noisy ones. Start with a small dataset, evaluate the fine-tuned model, and add more data where you see gaps.

Should I include conversations where the agent used tools?

Yes, including tool-use conversations is especially valuable because tool calling is one of the hardest skills for agents to learn. Keep the tool call messages and tool response messages in the training data. This teaches the model when to invoke tools, how to format arguments, and how to synthesize tool outputs into natural responses.
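As an illustration, a single training example with a tool call might look like the following (field names follow OpenAI's chat fine-tuning format; the weather tool and its arguments are invented for the example, and the export writes each example on one line rather than wrapped like this):

```json
{"messages": [
  {"role": "user", "content": "What's the weather in Berlin?"},
  {"role": "assistant", "content": null, "tool_calls": [
    {"id": "call_1", "type": "function",
     "function": {"name": "get_weather", "arguments": "{\"city\": \"Berlin\"}"}}]},
  {"role": "tool", "tool_call_id": "call_1", "content": "{\"temp_c\": 18}"},
  {"role": "assistant", "content": "It's currently 18°C in Berlin."}
]}
```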


#ETL #TrainingData #ConversationLogs #DataPipelines #PIIAnonymization #AgenticAI #LearnAI #AIEngineering


Written by

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

