
Chat Agent Analytics: Tracking Conversations and Extracting Insights

Build a comprehensive analytics pipeline for chat agents using OpenAI's tracing system to extract intent, sentiment, topics, and performance metrics from every conversation.

Why Chat Agent Analytics Matter

Deploying a chat agent without analytics is like running a business without financial statements. You cannot improve what you do not measure. Conversation analytics help you understand what users actually ask, where the agent struggles, which topics drive the most engagement, and how agent performance changes over time.

In this post, we build a full analytics pipeline: tracing every conversation turn, extracting structured insights (intent, sentiment, topic), storing them for analysis, and displaying key metrics on a dashboard.

Setting Up Tracing with the Agents SDK

OpenAI's Agents SDK includes a built-in tracing system. Every agent run produces a trace with spans for LLM calls, tool invocations, handoffs, and guardrail checks. Tracing is enabled by default:

from agents import Agent, Runner, trace

agent = Agent(
    name="Support Agent",
    instructions="You are a helpful customer support agent for Acme SaaS.",
)

async def handle_message(user_id: str, message: str):
    # The trace wraps the entire interaction with metadata
    with trace("support_conversation", metadata={"user_id": user_id}):
        result = await Runner.run(agent, input=message)
        return result.final_output

Each trace is automatically sent to the OpenAI dashboard, where you can inspect individual conversations, see token usage per turn, and identify slow tool calls.

Custom Trace Processors for Analytics

For production analytics, you need to process traces programmatically. The SDK supports custom trace processors that receive trace data in real time:


from agents.tracing import TracingProcessor, Trace, Span
from datetime import datetime, timezone
from typing import Optional

def _span_latency_ms(span: Span) -> Optional[float]:
    """Compute span latency from its ISO-formatted start/end timestamps."""
    if not (span.started_at and span.ended_at):
        return None
    start = datetime.fromisoformat(span.started_at)
    end = datetime.fromisoformat(span.ended_at)
    return (end - start).total_seconds() * 1000

class AnalyticsTraceProcessor(TracingProcessor):
    """Captures trace data and writes it to our analytics store."""

    def __init__(self, analytics_store):
        self.store = analytics_store

    def on_trace_start(self, trace: Trace) -> None:
        self.store.start_session(
            trace_id=trace.trace_id,
            metadata=getattr(trace, "metadata", None),
            started_at=datetime.now(timezone.utc),
        )

    def on_span_start(self, span: Span) -> None:
        pass  # we only record completed spans

    def on_span_end(self, span: Span) -> None:
        data = span.span_data
        # Capture LLM call details (generation spans)
        if data.type == "generation":
            usage = data.usage or {}
            self.store.record_llm_call(
                trace_id=span.trace_id,
                model=data.model,
                input_tokens=usage.get("input_tokens", 0),
                output_tokens=usage.get("output_tokens", 0),
                latency_ms=_span_latency_ms(span),
            )
        # Capture tool invocations (function spans)
        elif data.type == "function":
            self.store.record_tool_call(
                trace_id=span.trace_id,
                tool_name=data.name,
                success=span.error is None,
                latency_ms=_span_latency_ms(span),
            )

    def on_trace_end(self, trace: Trace) -> None:
        self.store.end_session(
            trace_id=trace.trace_id,
            ended_at=datetime.now(timezone.utc),
        )

    def shutdown(self) -> None:
        pass  # flush or close the store here if it buffers writes

    def force_flush(self) -> None:
        pass

Register the processor at startup:

from agents.tracing import set_trace_processors

analytics_store = PostgresAnalyticsStore(dsn="postgresql://...")
processor = AnalyticsTraceProcessor(analytics_store)

# Note: set_trace_processors() replaces the default processor, so traces
# stop flowing to the OpenAI dashboard. Use add_trace_processor(processor)
# instead to run this alongside the default exporter.
set_trace_processors([processor])

Extracting Intent, Sentiment, and Topics

Raw conversation logs are not enough. You need structured signals. We build an extraction agent that analyzes each conversation after it completes:

from pydantic import BaseModel, Field
from typing import List

class ConversationInsights(BaseModel):
    primary_intent: str = Field(
        description="The user's main goal, e.g. 'billing_inquiry', 'bug_report', 'feature_request'"
    )
    secondary_intents: List[str] = Field(
        default_factory=list,
        description="Any additional intents detected"
    )
    sentiment: str = Field(
        description="Overall sentiment: positive, neutral, negative, frustrated"
    )
    sentiment_trajectory: str = Field(
        description="How sentiment changed: improved, stable, declined"
    )
    topics: List[str] = Field(
        description="Key topics discussed, e.g. ['pricing', 'enterprise plan', 'SSO']"
    )
    resolution_status: str = Field(
        description="resolved, partially_resolved, unresolved, escalated"
    )
    effort_score: int = Field(
        ge=1, le=5,
        description="Estimated customer effort: 1=effortless, 5=very difficult"
    )

insights_agent = Agent(
    name="Conversation Analyst",
    instructions="""Analyze the provided conversation transcript and extract
    structured insights. Be precise about intent classification. Assess
    sentiment based on word choice, punctuation, and tone shifts. Identify
    all distinct topics discussed. Evaluate whether the user's issue was
    actually resolved based on the conversation outcome.""",
    output_type=ConversationInsights,
)

async def analyze_conversation(transcript: str) -> ConversationInsights:
    result = await Runner.run(
        insights_agent,
        input=f"Analyze this conversation transcript:\n\n{transcript}",
    )
    return result.final_output_as(ConversationInsights)
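With structured insights in hand, you can sanity-check the extraction output by rolling a batch up in memory before involving a database. A minimal sketch in plain Python; the dataclass below is a stand-in that mirrors only the `ConversationInsights` fields we aggregate, and the sample values are fabricated for illustration:

```python
from collections import Counter
from dataclasses import dataclass, field
from typing import List

@dataclass
class InsightRecord:
    # Stand-in mirroring the ConversationInsights fields we aggregate
    primary_intent: str
    sentiment: str
    topics: List[str] = field(default_factory=list)
    resolution_status: str = "unresolved"

def summarize(batch: List[InsightRecord]) -> dict:
    """Roll per-conversation insights into dashboard-style counts."""
    resolved = sum(1 for r in batch if r.resolution_status == "resolved")
    return {
        "intents": dict(Counter(r.primary_intent for r in batch)),
        "sentiments": dict(Counter(r.sentiment for r in batch)),
        "topics": dict(Counter(t for r in batch for t in r.topics)),
        "resolution_rate": resolved / len(batch) if batch else 0.0,
    }

batch = [
    InsightRecord("billing_inquiry", "negative", ["pricing"], "resolved"),
    InsightRecord("billing_inquiry", "neutral", ["pricing", "invoices"], "resolved"),
    InsightRecord("bug_report", "frustrated", ["SSO"], "unresolved"),
]
summary = summarize(batch)  # e.g. resolution_rate is 2/3 here
```

The same shapes (intent counts, topic counts, a resolution rate) reappear in the database-backed pipeline below, so this also doubles as a quick unit-test target.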

Building the Analytics Pipeline

The pipeline runs asynchronously after each conversation:

import asyncio
from dataclasses import dataclass
from datetime import datetime
import asyncpg

@dataclass
class ConversationRecord:
    conversation_id: str
    user_id: str
    transcript: str
    turn_count: int
    started_at: datetime
    ended_at: datetime
    total_tokens: int

class AnalyticsPipeline:
    def __init__(self, db_pool: asyncpg.Pool):
        self.db = db_pool

    async def process_conversation(self, record: ConversationRecord):
        # Step 1: Extract insights using the analysis agent
        insights = await analyze_conversation(record.transcript)

        # Step 2: Store structured analytics
        await self.db.execute(
            """
            INSERT INTO conversation_analytics (
                conversation_id, user_id, primary_intent,
                secondary_intents, sentiment, sentiment_trajectory,
                topics, resolution_status, effort_score,
                turn_count, total_tokens, duration_seconds,
                created_at
            ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13)
            """,
            record.conversation_id,
            record.user_id,
            insights.primary_intent,
            insights.secondary_intents,
            insights.sentiment,
            insights.sentiment_trajectory,
            insights.topics,
            insights.resolution_status,
            insights.effort_score,
            record.turn_count,
            record.total_tokens,
            (record.ended_at - record.started_at).total_seconds(),
            datetime.utcnow(),
        )

        # Step 3: Update real-time counters
        await self._update_counters(insights)

    async def _update_counters(self, insights: ConversationInsights):
        # Increment intent counters
        await self.db.execute(
            """
            INSERT INTO intent_counts (intent, count, last_seen)
            VALUES ($1, 1, NOW())
            ON CONFLICT (intent) DO UPDATE
            SET count = intent_counts.count + 1, last_seen = NOW()
            """,
            insights.primary_intent,
        )

        # Update topic frequency
        for topic in insights.topics:
            await self.db.execute(
                """
                INSERT INTO topic_counts (topic, count, last_seen)
                VALUES ($1, 1, NOW())
                ON CONFLICT (topic) DO UPDATE
                SET count = topic_counts.count + 1, last_seen = NOW()
                """,
                topic,
            )
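One practical wrinkle: LLM-extracted topics arrive with inconsistent casing and spacing ("Enterprise Plan" vs. "enterprise plan"), which splits counts across near-duplicate rows in `topic_counts`. Normalizing before the upsert keeps the table meaningful. A hedged sketch; the exact rules (lowercasing, whitespace collapsing) are a choice of ours, not something the extraction agent guarantees:

```python
import re
from typing import List

def normalize_topic(topic: str) -> str:
    """Canonicalize a topic string so variants count as one row."""
    # Lowercase, trim, and collapse internal whitespace
    return re.sub(r"\s+", " ", topic.strip()).lower()

def dedupe_topics(topics: List[str]) -> List[str]:
    """Normalize and deduplicate while preserving first-seen order."""
    seen = {}
    for t in topics:
        seen.setdefault(normalize_topic(t), None)
    return list(seen)

print(dedupe_topics(["Enterprise  Plan", "enterprise plan", "SSO"]))
# ['enterprise plan', 'sso']
```

In `_update_counters`, you would iterate over `dedupe_topics(insights.topics)` instead of the raw list.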

Database Schema for Analytics

CREATE TABLE conversation_analytics (
    id SERIAL PRIMARY KEY,
    conversation_id TEXT UNIQUE NOT NULL,
    user_id TEXT NOT NULL,
    primary_intent TEXT NOT NULL,
    secondary_intents TEXT[] DEFAULT '{}',
    sentiment TEXT NOT NULL,
    sentiment_trajectory TEXT NOT NULL,
    topics TEXT[] DEFAULT '{}',
    resolution_status TEXT NOT NULL,
    effort_score INTEGER NOT NULL,
    turn_count INTEGER NOT NULL,
    total_tokens INTEGER NOT NULL,
    duration_seconds FLOAT NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE INDEX idx_analytics_intent ON conversation_analytics(primary_intent);
CREATE INDEX idx_analytics_sentiment ON conversation_analytics(sentiment);
CREATE INDEX idx_analytics_created ON conversation_analytics(created_at);

CREATE TABLE intent_counts (
    intent TEXT PRIMARY KEY,
    count INTEGER DEFAULT 0,
    last_seen TIMESTAMP
);

CREATE TABLE topic_counts (
    topic TEXT PRIMARY KEY,
    count INTEGER DEFAULT 0,
    last_seen TIMESTAMP
);

Dashboard Metrics Queries

Here are the key metrics every chat agent dashboard needs:

class DashboardMetrics:
    def __init__(self, db: asyncpg.Pool):
        self.db = db

    async def get_overview(self, days: int = 7) -> dict:
        row = await self.db.fetchrow(
            """
            SELECT
                COUNT(*) as total_conversations,
                AVG(turn_count) as avg_turns,
                AVG(duration_seconds) as avg_duration,
                AVG(effort_score) as avg_effort,
                AVG(total_tokens) as avg_tokens,
                COUNT(*) FILTER (WHERE resolution_status = 'resolved')
                    * 100.0 / NULLIF(COUNT(*), 0) as resolution_rate,
                COUNT(*) FILTER (WHERE sentiment = 'negative')
                    * 100.0 / NULLIF(COUNT(*), 0) as negative_rate
            FROM conversation_analytics
            WHERE created_at > NOW() - ($1 * INTERVAL '1 day')
            """,
            days,
        )
        return dict(row)

    async def get_top_intents(self, limit: int = 10) -> list:
        rows = await self.db.fetch(
            """
            SELECT intent, count
            FROM intent_counts
            ORDER BY count DESC
            LIMIT $1
            """,
            limit,
        )
        return [dict(r) for r in rows]

    async def get_sentiment_trend(self, days: int = 30) -> list:
        rows = await self.db.fetch(
            """
            SELECT
                DATE(created_at) as day,
                COUNT(*) FILTER (WHERE sentiment = 'positive') as positive,
                COUNT(*) FILTER (WHERE sentiment = 'neutral') as neutral,
                COUNT(*) FILTER (WHERE sentiment = 'negative') as negative,
                COUNT(*) FILTER (WHERE sentiment = 'frustrated') as frustrated
            FROM conversation_analytics
            WHERE created_at > NOW() - ($1 * INTERVAL '1 day')
            GROUP BY DATE(created_at)
            ORDER BY day
            """,
            days,
        )
        return [dict(r) for r in rows]

    async def get_unresolved_topics(self) -> list:
        rows = await self.db.fetch(
            """
            SELECT UNNEST(topics) as topic, COUNT(*) as count
            FROM conversation_analytics
            WHERE resolution_status = 'unresolved'
            AND created_at > NOW() - INTERVAL '7 days'
            GROUP BY topic
            ORDER BY count DESC
            LIMIT 10
            """
        )
        return [dict(r) for r in rows]
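Averages hide tail behavior: a handful of marathon conversations can look fine in AVG(duration_seconds) while your slowest users suffer. Postgres's percentile_cont gives a truer picture; a sketch against the same conversation_analytics table, using the 7-day window the other queries use:

```sql
SELECT
    percentile_cont(0.5) WITHIN GROUP (ORDER BY duration_seconds) AS p50_duration,
    percentile_cont(0.9) WITHIN GROUP (ORDER BY duration_seconds) AS p90_duration,
    percentile_cont(0.9) WITHIN GROUP (ORDER BY turn_count) AS p90_turns
FROM conversation_analytics
WHERE created_at > NOW() - INTERVAL '7 days';
```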

FastAPI Dashboard Endpoint

from contextlib import asynccontextmanager

from fastapi import Depends, FastAPI, Request
import asyncpg

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Create the pool once at startup; opening a pool per request would
    # add connection overhead to every dashboard call.
    app.state.pool = await asyncpg.create_pool(dsn="postgresql://...")
    yield
    await app.state.pool.close()

app = FastAPI(lifespan=lifespan)

async def get_db(request: Request) -> asyncpg.Pool:
    return request.app.state.pool

@app.get("/api/analytics/dashboard")
async def dashboard(days: int = 7, db: asyncpg.Pool = Depends(get_db)):
    metrics = DashboardMetrics(db)
    overview = await metrics.get_overview(days)
    intents = await metrics.get_top_intents()
    sentiment = await metrics.get_sentiment_trend(days)
    unresolved = await metrics.get_unresolved_topics()

    return {
        "overview": overview,
        "top_intents": intents,
        "sentiment_trend": sentiment,
        "unresolved_topics": unresolved,
    }

Alerting on Anomalies

Set up alerts for when metrics deviate from baselines:

async def check_anomalies(metrics: DashboardMetrics):
    overview = await metrics.get_overview(days=1)

    # Alert if negative sentiment exceeds 25%
    if overview["negative_rate"] and overview["negative_rate"] > 25:
        await send_alert(
            channel="slack",
            message=f"High negative sentiment: {overview['negative_rate']:.1f}% "
                    f"in the last 24 hours (threshold: 25%)",
        )

    # Alert if resolution rate drops below 60%
    if overview["resolution_rate"] and overview["resolution_rate"] < 60:
        await send_alert(
            channel="slack",
            message=f"Low resolution rate: {overview['resolution_rate']:.1f}% "
                    f"(threshold: 60%)",
        )
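Fixed thresholds like 25% drift out of date as traffic patterns change. A step further is to compare today's value against a rolling baseline of recent daily values; a minimal sketch using a z-score with the standard library's statistics module (the 3-sigma cutoff is a common default, not a rule):

```python
import statistics
from typing import List

def is_anomalous(history: List[float], today: float, sigmas: float = 3.0) -> bool:
    """Flag today's value if it sits more than `sigmas` standard
    deviations from the mean of the recent daily history."""
    if len(history) < 2:
        return False  # not enough data to form a baseline
    mean = statistics.fmean(history)
    stdev = statistics.stdev(history)
    if stdev == 0:
        return today != mean
    return abs(today - mean) / stdev > sigmas

# Daily negative-sentiment rates (%) for the past week, then today's value
baseline = [8.0, 9.5, 7.2, 8.8, 9.1, 8.4, 7.9]
print(is_anomalous(baseline, 9.0))   # within normal variation -> False
print(is_anomalous(baseline, 31.0))  # far outside the baseline -> True
```

The daily history could come straight from `get_sentiment_trend`, keeping the alerting logic in sync with what the dashboard already shows.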

This analytics pipeline gives you full visibility into your chat agent's performance and enables data-driven improvements to instructions, tools, and conversation design.

Written by CallSphere Team