Chat Agent Analytics: Tracking Conversations and Extracting Insights
Build a comprehensive analytics pipeline for chat agents using OpenAI's tracing system to extract intent, sentiment, topics, and performance metrics from every conversation.
Why Chat Agent Analytics Matter
Deploying a chat agent without analytics is like running a business without financial statements. You cannot improve what you do not measure. Conversation analytics help you understand what users actually ask, where the agent struggles, which topics drive the most engagement, and how agent performance changes over time.
In this post, we build a full analytics pipeline: tracing every conversation turn, extracting structured insights (intent, sentiment, topic), storing them for analysis, and displaying key metrics on a dashboard.
Setting Up Tracing with the Agents SDK
OpenAI's Agents SDK includes a built-in tracing system. Every agent run produces a trace with spans for LLM calls, tool invocations, handoffs, and guardrail checks. Tracing is enabled by default:
from agents import Agent, Runner, trace

agent = Agent(
    name="Support Agent",
    instructions="You are a helpful customer support agent for Acme SaaS.",
)

async def handle_message(user_id: str, message: str):
    # The trace wraps the entire interaction with metadata
    with trace("support_conversation", metadata={"user_id": user_id}):
        result = await Runner.run(agent, input=message)
        return result.final_output
Each trace is automatically sent to the OpenAI dashboard, where you can inspect individual conversations, see token usage per turn, and identify slow tool calls.
Custom Trace Processors for Analytics
For production analytics, you need to process traces programmatically. The SDK supports custom trace processors that receive trace data in real time:
from agents.tracing import TracingProcessor, Trace, Span
from datetime import datetime

class AnalyticsTraceProcessor(TracingProcessor):
    """Captures trace data and writes it to our analytics store."""

    def __init__(self, analytics_store):
        self.store = analytics_store

    def on_trace_start(self, trace: Trace) -> None:
        self.store.start_session(
            trace_id=trace.trace_id,
            metadata=trace.metadata,
            started_at=datetime.utcnow(),
        )

    def on_span_start(self, span: Span) -> None:
        # Required by the TracingProcessor interface; nothing to record here
        pass

    def on_span_end(self, span: Span) -> None:
        # Capture LLM call details
        if span.span_type == "llm":
            self.store.record_llm_call(
                trace_id=span.trace_id,
                model=span.data.get("model"),
                input_tokens=span.data.get("input_tokens", 0),
                output_tokens=span.data.get("output_tokens", 0),
                latency_ms=span.duration_ms,
            )
        # Capture tool invocations
        elif span.span_type == "tool":
            self.store.record_tool_call(
                trace_id=span.trace_id,
                tool_name=span.data.get("tool_name"),
                success=span.data.get("success", True),
                latency_ms=span.duration_ms,
            )

    def on_trace_end(self, trace: Trace) -> None:
        self.store.end_session(
            trace_id=trace.trace_id,
            ended_at=datetime.utcnow(),
        )

    def shutdown(self) -> None:
        # Flush or close the store here if it buffers writes
        pass

    def force_flush(self) -> None:
        pass
Register the processor at startup. Note that set_trace_processors replaces the default processor list, including the exporter that sends traces to the OpenAI dashboard; use add_trace_processor instead if you want both destinations:
from agents.tracing import set_trace_processors
analytics_store = PostgresAnalyticsStore(dsn="postgresql://...")
processor = AnalyticsTraceProcessor(analytics_store)
set_trace_processors([processor])
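The token counts the processor records translate directly into spend, which is worth surfacing alongside the other metrics. A minimal per-conversation cost estimator (the per-million-token prices here are illustrative placeholders, not current OpenAI pricing):

```python
def estimate_cost_usd(
    input_tokens: int,
    output_tokens: int,
    input_price_per_m: float = 2.50,    # placeholder rate, USD per 1M input tokens
    output_price_per_m: float = 10.00,  # placeholder rate, USD per 1M output tokens
) -> float:
    """Rough cost of one conversation from the token counts the processor stores."""
    cost = (
        input_tokens / 1_000_000 * input_price_per_m
        + output_tokens / 1_000_000 * output_price_per_m
    )
    return round(cost, 6)
```

Summed per day, this gives a cost-per-resolved-conversation figure, which is often the number stakeholders actually ask for.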
Extracting Intent, Sentiment, and Topics
Raw conversation logs are not enough. You need structured signals. We build an extraction agent that analyzes each conversation after it completes:
from pydantic import BaseModel, Field
from typing import List

class ConversationInsights(BaseModel):
    primary_intent: str = Field(
        description="The user's main goal, e.g. 'billing_inquiry', 'bug_report', 'feature_request'"
    )
    secondary_intents: List[str] = Field(
        default_factory=list,
        description="Any additional intents detected",
    )
    sentiment: str = Field(
        description="Overall sentiment: positive, neutral, negative, frustrated"
    )
    sentiment_trajectory: str = Field(
        description="How sentiment changed: improved, stable, declined"
    )
    topics: List[str] = Field(
        description="Key topics discussed, e.g. ['pricing', 'enterprise plan', 'SSO']"
    )
    resolution_status: str = Field(
        description="resolved, partially_resolved, unresolved, escalated"
    )
    effort_score: int = Field(
        ge=1, le=5,
        description="Estimated customer effort: 1=effortless, 5=very difficult",
    )

insights_agent = Agent(
    name="Conversation Analyst",
    instructions="""Analyze the provided conversation transcript and extract
    structured insights. Be precise about intent classification. Assess
    sentiment based on word choice, punctuation, and tone shifts. Identify
    all distinct topics discussed. Evaluate whether the user's issue was
    actually resolved based on the conversation outcome.""",
    output_type=ConversationInsights,
)

async def analyze_conversation(transcript: str) -> ConversationInsights:
    result = await Runner.run(
        insights_agent,
        input=f"Analyze this conversation transcript:\n\n{transcript}",
    )
    return result.final_output_as(ConversationInsights)
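Even with a structured output type, the model can drift from the enumerations the field descriptions spell out (a novel sentiment label, an effort score outside 1-5), so it pays to normalize before storage. A small defensive pass, with allowed value sets mirroring the schema above:

```python
ALLOWED_SENTIMENTS = {"positive", "neutral", "negative", "frustrated"}
ALLOWED_RESOLUTIONS = {"resolved", "partially_resolved", "unresolved", "escalated"}

def normalize_insights(raw: dict) -> dict:
    """Coerce extracted insights into the value sets the analytics tables expect."""
    out = dict(raw)
    # Canonicalize intent labels so 'Billing Inquiry' and 'billing_inquiry' count as one
    out["primary_intent"] = raw["primary_intent"].strip().lower().replace(" ", "_")
    # Fall back to safe defaults rather than storing unknown labels
    if out.get("sentiment") not in ALLOWED_SENTIMENTS:
        out["sentiment"] = "neutral"
    if out.get("resolution_status") not in ALLOWED_RESOLUTIONS:
        out["resolution_status"] = "unresolved"
    # Clamp effort score to the 1-5 range; dedupe topics preserving order
    out["effort_score"] = max(1, min(5, int(out.get("effort_score", 3))))
    out["topics"] = list(dict.fromkeys(t.strip().lower() for t in raw.get("topics", [])))
    return out
```

Without canonicalization, the intent and topic counters below fragment across spelling variants and the top-N queries undercount everything.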
Building the Analytics Pipeline
The pipeline runs asynchronously after each conversation:
from dataclasses import dataclass
from datetime import datetime

import asyncpg

@dataclass
class ConversationRecord:
    conversation_id: str
    user_id: str
    transcript: str
    turn_count: int
    started_at: datetime
    ended_at: datetime
    total_tokens: int

class AnalyticsPipeline:
    def __init__(self, db_pool: asyncpg.Pool):
        self.db = db_pool

    async def process_conversation(self, record: ConversationRecord):
        # Step 1: Extract insights using the analysis agent
        insights = await analyze_conversation(record.transcript)

        # Step 2: Store structured analytics
        await self.db.execute(
            """
            INSERT INTO conversation_analytics (
                conversation_id, user_id, primary_intent,
                secondary_intents, sentiment, sentiment_trajectory,
                topics, resolution_status, effort_score,
                turn_count, total_tokens, duration_seconds,
                created_at
            ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13)
            """,
            record.conversation_id,
            record.user_id,
            insights.primary_intent,
            insights.secondary_intents,
            insights.sentiment,
            insights.sentiment_trajectory,
            insights.topics,
            insights.resolution_status,
            insights.effort_score,
            record.turn_count,
            record.total_tokens,
            (record.ended_at - record.started_at).total_seconds(),
            datetime.utcnow(),
        )

        # Step 3: Update real-time counters
        await self._update_counters(insights)

    async def _update_counters(self, insights: ConversationInsights):
        # Increment intent counters
        await self.db.execute(
            """
            INSERT INTO intent_counts (intent, count, last_seen)
            VALUES ($1, 1, NOW())
            ON CONFLICT (intent) DO UPDATE
                SET count = intent_counts.count + 1, last_seen = NOW()
            """,
            insights.primary_intent,
        )
        # Update topic frequency
        for topic in insights.topics:
            await self.db.execute(
                """
                INSERT INTO topic_counts (topic, count, last_seen)
                VALUES ($1, 1, NOW())
                ON CONFLICT (topic) DO UPDATE
                    SET count = topic_counts.count + 1, last_seen = NOW()
                """,
                topic,
            )
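The extraction step adds an extra LLM call per conversation, so it should not sit on the user's critical path. One way to decouple it is an in-process asyncio queue worker; this is a sketch, and a real deployment would more likely use a durable queue (Celery, or a Postgres job table) so pending analyses survive restarts:

```python
import asyncio

class BackgroundAnalytics:
    """Runs conversation analysis off the request path via an asyncio queue."""

    def __init__(self, process_fn):
        self.process_fn = process_fn  # e.g. AnalyticsPipeline.process_conversation
        self.queue: asyncio.Queue = asyncio.Queue()
        self.processed = 0

    def submit(self, record) -> None:
        # Non-blocking: the chat handler returns to the user immediately
        self.queue.put_nowait(record)

    async def worker(self) -> None:
        while True:
            record = await self.queue.get()
            if record is None:  # sentinel for shutdown
                break
            try:
                await self.process_fn(record)
                self.processed += 1
            except Exception:
                pass  # log and/or dead-letter in a real system
            finally:
                self.queue.task_done()
```

Start the worker as a long-lived task (`asyncio.create_task(bg.worker())`) at application startup, and call `submit(record)` from the conversation handler once a session ends.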
Database Schema for Analytics
CREATE TABLE conversation_analytics (
    id SERIAL PRIMARY KEY,
    conversation_id TEXT UNIQUE NOT NULL,
    user_id TEXT NOT NULL,
    primary_intent TEXT NOT NULL,
    secondary_intents TEXT[] DEFAULT '{}',
    sentiment TEXT NOT NULL,
    sentiment_trajectory TEXT NOT NULL,
    topics TEXT[] DEFAULT '{}',
    resolution_status TEXT NOT NULL,
    effort_score INTEGER NOT NULL,
    turn_count INTEGER NOT NULL,
    total_tokens INTEGER NOT NULL,
    duration_seconds FLOAT NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE INDEX idx_analytics_intent ON conversation_analytics(primary_intent);
CREATE INDEX idx_analytics_sentiment ON conversation_analytics(sentiment);
CREATE INDEX idx_analytics_created ON conversation_analytics(created_at);

CREATE TABLE intent_counts (
    intent TEXT PRIMARY KEY,
    count INTEGER DEFAULT 0,
    last_seen TIMESTAMP
);

CREATE TABLE topic_counts (
    topic TEXT PRIMARY KEY,
    count INTEGER DEFAULT 0,
    last_seen TIMESTAMP
);
Dashboard Metrics Queries
Here are the key metrics every chat agent dashboard needs:
class DashboardMetrics:
    def __init__(self, db: asyncpg.Pool):
        self.db = db

    async def get_overview(self, days: int = 7) -> dict:
        row = await self.db.fetchrow(
            """
            SELECT
                COUNT(*) AS total_conversations,
                AVG(turn_count) AS avg_turns,
                AVG(duration_seconds) AS avg_duration,
                AVG(effort_score) AS avg_effort,
                AVG(total_tokens) AS avg_tokens,
                COUNT(*) FILTER (WHERE resolution_status = 'resolved')
                    * 100.0 / NULLIF(COUNT(*), 0) AS resolution_rate,
                COUNT(*) FILTER (WHERE sentiment = 'negative')
                    * 100.0 / NULLIF(COUNT(*), 0) AS negative_rate
            FROM conversation_analytics
            -- asyncpg uses $n placeholders; a %s inside an INTERVAL literal is
            -- never substituted, so build the interval from the parameter instead
            WHERE created_at > NOW() - make_interval(days => $1)
            """,
            days,
        )
        return dict(row)

    async def get_top_intents(self, limit: int = 10) -> list:
        rows = await self.db.fetch(
            """
            SELECT intent, count
            FROM intent_counts
            ORDER BY count DESC
            LIMIT $1
            """,
            limit,
        )
        return [dict(r) for r in rows]

    async def get_sentiment_trend(self, days: int = 30) -> list:
        rows = await self.db.fetch(
            """
            SELECT
                DATE(created_at) AS day,
                COUNT(*) FILTER (WHERE sentiment = 'positive') AS positive,
                COUNT(*) FILTER (WHERE sentiment = 'neutral') AS neutral,
                COUNT(*) FILTER (WHERE sentiment = 'negative') AS negative,
                COUNT(*) FILTER (WHERE sentiment = 'frustrated') AS frustrated
            FROM conversation_analytics
            WHERE created_at > NOW() - make_interval(days => $1)
            GROUP BY DATE(created_at)
            ORDER BY day
            """,
            days,
        )
        return [dict(r) for r in rows]

    async def get_unresolved_topics(self) -> list:
        rows = await self.db.fetch(
            """
            SELECT UNNEST(topics) AS topic, COUNT(*) AS count
            FROM conversation_analytics
            WHERE resolution_status = 'unresolved'
              AND created_at > NOW() - INTERVAL '7 days'
            GROUP BY topic
            ORDER BY count DESC
            LIMIT 10
            """
        )
        return [dict(r) for r in rows]
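The daily rows from get_sentiment_trend feed charts directly, but day-to-day numbers are noisy at low volume; a trailing moving average gives a steadier trend line. A pure-Python sketch over rows shaped like that query's output:

```python
def negative_rate_moving_avg(rows: list[dict], window: int = 7) -> list[float]:
    """Trailing moving average of the daily negative-sentiment share, in percent.

    Each row needs 'positive', 'neutral', 'negative', 'frustrated' counts,
    matching the columns returned by the sentiment-trend query.
    """
    rates = []
    for r in rows:
        total = r["positive"] + r["neutral"] + r["negative"] + r["frustrated"]
        rates.append(100.0 * r["negative"] / total if total else 0.0)

    smoothed = []
    for i in range(len(rates)):
        recent = rates[max(0, i - window + 1) : i + 1]  # trailing window
        smoothed.append(round(sum(recent) / len(recent), 2))
    return smoothed
```

The same smoothing applies to resolution rate or effort score; computing it in Python rather than SQL keeps the queries simple and lets the dashboard adjust the window without another round trip.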
FastAPI Dashboard Endpoint
The connection pool should be created once at startup rather than per request, so it lives in the application's lifespan:

from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncpg

@asynccontextmanager
async def lifespan(app: FastAPI):
    # One pool for the app's lifetime; opening a pool per request is slow and
    # exhausts database connections under load
    app.state.pool = await asyncpg.create_pool(dsn="postgresql://...")
    yield
    await app.state.pool.close()

app = FastAPI(lifespan=lifespan)

@app.get("/api/analytics/dashboard")
async def dashboard(days: int = 7):
    metrics = DashboardMetrics(app.state.pool)
    overview = await metrics.get_overview(days)
    intents = await metrics.get_top_intents()
    sentiment = await metrics.get_sentiment_trend(days)
    unresolved = await metrics.get_unresolved_topics()
    return {
        "overview": overview,
        "top_intents": intents,
        "sentiment_trend": sentiment,
        "unresolved_topics": unresolved,
    }
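These four queries run on every dashboard load, but the numbers only change as conversations finish, so a short TTL cache keeps the endpoint cheap. A minimal in-process sketch; a shared cache such as Redis is the multi-worker equivalent:

```python
import time

class TTLCache:
    """Tiny time-based cache, e.g. keyed by f"dashboard:{days}"."""

    def __init__(self, ttl_seconds: float = 30.0):
        self.ttl = ttl_seconds
        self._data: dict = {}

    def get(self, key):
        entry = self._data.get(key)
        if entry is None:
            return None
        value, stored_at = entry
        if time.monotonic() - stored_at > self.ttl:
            del self._data[key]  # expired: drop and report a miss
            return None
        return value

    def set(self, key, value) -> None:
        self._data[key] = (value, time.monotonic())
```

In the endpoint, check the cache before running the queries and store the assembled response on a miss; a 30-second TTL is usually invisible to dashboard users.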
Alerting on Anomalies
Set up alerts for when metrics deviate from baselines:
async def check_anomalies(metrics: DashboardMetrics):
    overview = await metrics.get_overview(days=1)

    # Alert if negative sentiment exceeds 25%
    if overview["negative_rate"] and overview["negative_rate"] > 25:
        await send_alert(
            channel="slack",
            message=f"High negative sentiment: {overview['negative_rate']:.1f}% "
                    f"in the last 24 hours (threshold: 25%)",
        )

    # Alert if resolution rate drops below 60%
    if overview["resolution_rate"] and overview["resolution_rate"] < 60:
        await send_alert(
            channel="slack",
            message=f"Low resolution rate: {overview['resolution_rate']:.1f}% "
                    f"(threshold: 60%)",
        )
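Fixed thresholds like 25% need retuning as traffic patterns change; comparing today against a rolling baseline adapts automatically. A z-score sketch over recent daily rates (the history would come from the daily sentiment-trend rows):

```python
from statistics import mean, stdev

def is_anomalous(history: list[float], today: float, z_threshold: float = 2.0) -> bool:
    """Flag today's rate if it sits more than z_threshold standard deviations
    above the mean of the trailing history (e.g. daily negative-sentiment rates)."""
    if len(history) < 2:
        return False  # not enough data to form a baseline
    mu = mean(history)
    sigma = stdev(history)
    if sigma == 0:
        return today > mu  # perfectly flat baseline: any increase is notable
    return (today - mu) / sigma > z_threshold
```

Run it alongside the fixed thresholds rather than instead of them: absolute limits catch slow drifts the z-score normalizes away, while the z-score catches sudden spikes well below the absolute limit.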
This analytics pipeline gives you full visibility into your chat agent's performance and enables data-driven improvements to instructions, tools, and conversation design.
Written by
CallSphere Team
Expert insights on AI voice agents and customer communication automation.