Chat Agent Analytics: Tracking Conversations and Extracting Insights
Build a comprehensive analytics pipeline for chat agents using OpenAI's tracing system to extract intent, sentiment, topics, and performance metrics from every conversation.
Why Chat Agent Analytics Matter
Deploying a chat agent without analytics is like running a business without financial statements. You cannot improve what you do not measure. Conversation analytics help you understand what users actually ask, where the agent struggles, which topics drive the most engagement, and how agent performance changes over time.
In this post, we build a full analytics pipeline: tracing every conversation turn, extracting structured insights (intent, sentiment, topic), storing them for analysis, and displaying key metrics on a dashboard.
Setting Up Tracing with the Agents SDK
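OpenAI's Agents SDK includes a built-in tracing system. Every agent run produces a trace with spans for LLM calls, tool invocations, handoffs, and guardrail checks. Tracing is enabled by default. Wrapping a run in a trace() block lets you name the workflow and attach metadata such as the user ID.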
```mermaid
flowchart LR
    APP(["Agent or API"])
    SDK["OTel SDK<br/>GenAI conventions"]
    COL["OTel Collector"]
    subgraph BACKENDS["Backends"]
        TR[("Traces<br/>Tempo or Honeycomb")]
        MET[("Metrics<br/>Prometheus")]
        LOG[("Logs<br/>Loki or ELK")]
    end
    DASH["Grafana plus alerts"]
    PAGE(["Pager"])
    APP --> SDK --> COL
    COL --> TR
    COL --> MET
    COL --> LOG
    TR --> DASH
    MET --> DASH
    LOG --> DASH
    DASH --> PAGE
    style SDK fill:#4f46e5,stroke:#4338ca,color:#fff
    style DASH fill:#f59e0b,stroke:#d97706,color:#1f2937
    style PAGE fill:#dc2626,stroke:#b91c1c,color:#fff
```
```python
from agents import Agent, Runner, trace

agent = Agent(
    name="Support Agent",
    instructions="You are a helpful customer support agent for Acme SaaS.",
)


async def handle_message(user_id: str, message: str):
    # The trace wraps the entire interaction with metadata
    with trace("support_conversation", metadata={"user_id": user_id}):
        result = await Runner.run(agent, input=message)
    return result.final_output
```
Each trace is automatically sent to the OpenAI dashboard, where you can inspect individual conversations, see token usage per turn, and identify slow tool calls.
Custom Trace Processors for Analytics
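For production analytics, you need to process traces programmatically. The SDK lets you register custom trace processors: subclasses of TracingProcessor whose callbacks fire as each trace and span starts and ends. All six methods on the base class (on_trace_start, on_trace_end, on_span_start, on_span_end, shutdown, force_flush) are abstract, so implement the ones you don't need as no-ops. Span details live on span.span_data, whose type tells you what ended ("response" or "generation" for model calls, "function" for tool calls):

```python
from datetime import datetime, timezone
from typing import Optional

from agents.tracing import Span, Trace, TracingProcessor


def span_latency_ms(span: Span) -> Optional[float]:
    """Span timestamps are ISO-8601 strings; either may be missing."""
    if not span.started_at or not span.ended_at:
        return None
    started = datetime.fromisoformat(span.started_at)
    ended = datetime.fromisoformat(span.ended_at)
    return (ended - started).total_seconds() * 1000


class AnalyticsTraceProcessor(TracingProcessor):
    """Captures trace data and writes it to our analytics store."""

    def __init__(self, analytics_store):
        self.store = analytics_store

    def on_trace_start(self, trace: Trace) -> None:
        self.store.start_session(
            trace_id=trace.trace_id,
            metadata=getattr(trace, "metadata", None),
            started_at=datetime.now(timezone.utc),
        )

    def on_span_start(self, span: Span) -> None:
        pass  # only finished spans carry usage and timing

    def on_span_end(self, span: Span) -> None:
        data = span.span_data
        # LLM calls through the Responses API (the SDK's default model)
        if data.type == "response" and data.response is not None:
            usage = data.response.usage
            self.store.record_llm_call(
                trace_id=span.trace_id,
                model=data.response.model,
                input_tokens=usage.input_tokens if usage else 0,
                output_tokens=usage.output_tokens if usage else 0,
                latency_ms=span_latency_ms(span),
            )
        # LLM calls through the Chat Completions model
        elif data.type == "generation":
            usage = data.usage or {}
            self.store.record_llm_call(
                trace_id=span.trace_id,
                model=data.model,
                input_tokens=usage.get("input_tokens", 0),
                output_tokens=usage.get("output_tokens", 0),
                latency_ms=span_latency_ms(span),
            )
        # Function tool invocations; span.error is set when the tool failed
        elif data.type == "function":
            self.store.record_tool_call(
                trace_id=span.trace_id,
                tool_name=data.name,
                success=span.error is None,
                latency_ms=span_latency_ms(span),
            )

    def on_trace_end(self, trace: Trace) -> None:
        self.store.end_session(
            trace_id=trace.trace_id,
            ended_at=datetime.now(timezone.utc),
        )

    # Nothing is buffered in this processor, so these are no-ops
    def shutdown(self) -> None:
        pass

    def force_flush(self) -> None:
        pass
```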
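The processor only assumes a store with four methods: start_session, record_llm_call, record_tool_call, and end_session. That interface is ours, not the SDK's. A minimal in-memory implementation is handy for unit-testing the processor before a real database is involved. InMemoryAnalyticsStore and SessionStats are hypothetical names, and the per-trace totals it keeps are just one reasonable choice:

```python
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional


@dataclass
class SessionStats:
    """Running totals for one trace (hypothetical structure)."""
    metadata: Optional[Dict[str, Any]] = None
    started_at: Optional[datetime] = None
    ended_at: Optional[datetime] = None
    llm_calls: int = 0
    input_tokens: int = 0
    output_tokens: int = 0
    llm_latency_ms: List[float] = field(default_factory=list)
    tool_calls: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
    tool_failures: int = 0


class InMemoryAnalyticsStore:
    """Hypothetical store with the method names AnalyticsTraceProcessor calls."""

    def __init__(self) -> None:
        # defaultdict creates a session the first time a trace_id is used
        self.sessions: Dict[str, SessionStats] = defaultdict(SessionStats)

    def start_session(self, trace_id: str, metadata: Optional[Dict[str, Any]],
                      started_at: datetime) -> None:
        session = self.sessions[trace_id]
        session.metadata = metadata
        session.started_at = started_at

    def record_llm_call(self, trace_id: str, model: Optional[str], input_tokens: int,
                        output_tokens: int, latency_ms: Optional[float]) -> None:
        session = self.sessions[trace_id]
        session.llm_calls += 1
        session.input_tokens += input_tokens
        session.output_tokens += output_tokens
        if latency_ms is not None:
            session.llm_latency_ms.append(latency_ms)

    def record_tool_call(self, trace_id: str, tool_name: str, success: bool,
                         latency_ms: Optional[float]) -> None:
        session = self.sessions[trace_id]
        session.tool_calls[tool_name] += 1
        if not success:
            session.tool_failures += 1

    def end_session(self, trace_id: str, ended_at: datetime) -> None:
        self.sessions[trace_id].ended_at = ended_at
```

Because the processor only calls these four methods, the same test store can later be swapped for a Postgres-backed one without touching the processor.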
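Register the processor at startup. add_trace_processor runs it alongside the default exporter, so traces keep appearing in the OpenAI dashboard; set_trace_processors would replace the default exporter entirely:

```python
from agents.tracing import add_trace_processor

# PostgresAnalyticsStore stands for your own store class (not part of the SDK)
analytics_store = PostgresAnalyticsStore(dsn="postgresql://...")
processor = AnalyticsTraceProcessor(analytics_store)
add_trace_processor(processor)
```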
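Trace processor callbacks are called synchronously from inside the agent run, so a slow database write in on_span_end adds directly to response latency. One way to keep writes off that path is to wrap the store so each call is queued and applied by a background thread. BufferedStore below is a hypothetical sketch built only on the standard library; the queue size and the choice to drop calls when the queue is full are assumptions you may want to change:

```python
import logging
import queue
import threading
from typing import Any, Callable

log = logging.getLogger(__name__)


class BufferedStore:
    """Hypothetical wrapper: queue store calls and run them on one worker thread."""

    _STOP = object()  # sentinel telling the worker to exit

    def __init__(self, inner: Any, max_queue: int = 10_000) -> None:
        self._inner = inner
        self._queue: "queue.Queue[Any]" = queue.Queue(maxsize=max_queue)
        self.dropped = 0  # calls discarded because the queue was full
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def __getattr__(self, name: str) -> Callable[..., None]:
        # Only reached for names not defined on this class, i.e. the wrapped
        # store's methods (start_session, record_llm_call, ...); calls are queued
        method = getattr(self._inner, name)

        def enqueue(*args: Any, **kwargs: Any) -> None:
            try:
                self._queue.put_nowait((method, args, kwargs))
            except queue.Full:
                self.dropped += 1  # shed load instead of blocking the agent run

        return enqueue

    def _run(self) -> None:
        while True:
            item = self._queue.get()
            if item is self._STOP:
                return
            method, args, kwargs = item
            try:
                method(*args, **kwargs)
            except Exception:
                log.exception("analytics write failed")

    def close(self) -> None:
        """Let queued writes finish, then stop the worker."""
        self._queue.put(self._STOP)
        self._worker.join()
```

Wrap the store before handing it to the processor, for example AnalyticsTraceProcessor(BufferedStore(analytics_store)), and call close() during application shutdown so queued writes are flushed. Dropping on overflow favors agent latency over analytics completeness; a blocking put() makes the opposite trade.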
Extracting Intent, Sentiment, and Topics
Raw conversation logs are not enough. You need structured signals. We build an extraction agent that analyzes each conversation after it completes:
```python
from typing import List, Literal

from pydantic import BaseModel, Field
from agents import Agent, Runner


class ConversationInsights(BaseModel):
    primary_intent: str = Field(
        description="The user's main goal, e.g. 'billing_inquiry', 'bug_report', 'feature_request'"
    )
    secondary_intents: List[str] = Field(
        default_factory=list,
        description="Any additional intents detected",
    )
    # Literal types restrict the model to the exact values the dashboard
    # queries filter on (e.g. sentiment = 'negative')
    sentiment: Literal["positive", "neutral", "negative", "frustrated"] = Field(
        description="Overall sentiment of the user"
    )
    sentiment_trajectory: Literal["improved", "stable", "declined"] = Field(
        description="How sentiment changed over the conversation"
    )
    topics: List[str] = Field(
        description="Key topics discussed, e.g. ['pricing', 'enterprise plan', 'SSO']"
    )
    resolution_status: Literal[
        "resolved", "partially_resolved", "unresolved", "escalated"
    ] = Field(description="Outcome of the conversation")
    effort_score: int = Field(
        ge=1, le=5,
        description="Estimated customer effort: 1=effortless, 5=very difficult",
    )


insights_agent = Agent(
    name="Conversation Analyst",
    instructions="""Analyze the provided conversation transcript and extract
    structured insights. Be precise about intent classification. Assess
    sentiment based on word choice, punctuation, and tone shifts. Identify
    all distinct topics discussed. Evaluate whether the user's issue was
    actually resolved based on the conversation outcome.""",
    output_type=ConversationInsights,
)


async def analyze_conversation(transcript: str) -> ConversationInsights:
    result = await Runner.run(
        insights_agent,
        input=f"Analyze this conversation transcript:\n\n{transcript}",
    )
    return result.final_output_as(ConversationInsights)
```
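analyze_conversation takes a plain-text transcript. One way to produce it is from a list of role/content message dicts (this shape is an assumption; adapt it to however you store turns), keeping only the most recent turns when the conversation exceeds a character budget so the analysis prompt stays bounded. build_transcript, ROLE_LABELS, and the 12,000-character default are hypothetical:

```python
from typing import Iterable, List, Mapping

ROLE_LABELS = {"user": "User", "assistant": "Agent"}


def build_transcript(messages: Iterable[Mapping[str, str]], max_chars: int = 12_000) -> str:
    """Render messages as 'User: ...' / 'Agent: ...' lines within a size budget."""
    lines = [
        f"{ROLE_LABELS.get(m['role'], m['role'].title())}: {m['content'].strip()}"
        for m in messages
        if m.get("content")
    ]
    kept: List[str] = []
    used = 0
    # Walk backwards so the end of the conversation, where resolution is
    # decided, survives truncation
    for line in reversed(lines):
        if used + len(line) + 1 > max_chars:
            kept.append("[earlier turns omitted]")
            break
        kept.append(line)
        used += len(line) + 1
    return "\n".join(reversed(kept))
```

Keeping the tail rather than the head matters here because resolution_status and sentiment_trajectory both depend on how the conversation ended.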
Building the Analytics Pipeline
The pipeline runs asynchronously after each conversation:
```python
from dataclasses import dataclass
from datetime import datetime

import asyncpg


@dataclass
class ConversationRecord:
    conversation_id: str
    user_id: str
    transcript: str
    turn_count: int
    started_at: datetime
    ended_at: datetime
    total_tokens: int


class AnalyticsPipeline:
    def __init__(self, db_pool: asyncpg.Pool):
        self.db = db_pool

    async def process_conversation(self, record: ConversationRecord):
        # Step 1: Extract insights using the analysis agent. This is a slow LLM
        # call, so it runs before a database connection is checked out.
        insights = await analyze_conversation(record.transcript)

        # Steps 2 and 3 share one transaction: the analytics row and the
        # counters are committed together or not at all. Reprocessing the same
        # conversation fails the UNIQUE constraint and rolls everything back.
        async with self.db.acquire() as conn:
            async with conn.transaction():
                # Step 2: Store structured analytics
                await conn.execute(
                    """
                    INSERT INTO conversation_analytics (
                        conversation_id, user_id, primary_intent,
                        secondary_intents, sentiment, sentiment_trajectory,
                        topics, resolution_status, effort_score,
                        turn_count, total_tokens, duration_seconds,
                        created_at
                    ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,NOW())
                    """,
                    record.conversation_id,
                    record.user_id,
                    insights.primary_intent,
                    insights.secondary_intents,
                    insights.sentiment,
                    insights.sentiment_trajectory,
                    insights.topics,
                    insights.resolution_status,
                    insights.effort_score,
                    record.turn_count,
                    record.total_tokens,
                    (record.ended_at - record.started_at).total_seconds(),
                )
                # Step 3: Update real-time counters
                await self._update_counters(conn, insights)

    async def _update_counters(self, conn: asyncpg.Connection, insights: ConversationInsights):
        # Increment intent counters
        await conn.execute(
            """
            INSERT INTO intent_counts (intent, count, last_seen)
            VALUES ($1, 1, NOW())
            ON CONFLICT (intent) DO UPDATE
            SET count = intent_counts.count + 1, last_seen = NOW()
            """,
            insights.primary_intent,
        )
        # Update topic frequency. De-duplicate so a repeated topic counts once,
        # and sort so concurrent transactions lock rows in the same order
        # (overlapping upserts in different orders can deadlock)
        for topic in sorted(set(insights.topics)):
            await conn.execute(
                """
                INSERT INTO topic_counts (topic, count, last_seen)
                VALUES ($1, 1, NOW())
                ON CONFLICT (topic) DO UPDATE
                SET count = topic_counts.count + 1, last_seen = NOW()
                """,
                topic,
            )
```
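process_conversation is meant to run off the request path. Inside an asyncio service, one way to do that is to schedule it as a task, keep a reference so the task is not garbage-collected mid-flight, and cap concurrency with a semaphore so a traffic spike does not fire hundreds of analysis calls at once. BackgroundRunner and its default limit of 8 are illustrative assumptions, not part of any library:

```python
import asyncio
import logging
from typing import Awaitable, Callable, Set

log = logging.getLogger(__name__)


class BackgroundRunner:
    """Hypothetical helper: fire-and-forget coroutines with bounded concurrency."""

    def __init__(self, max_concurrent: int = 8) -> None:
        self._sem = asyncio.Semaphore(max_concurrent)
        self._tasks: Set[asyncio.Task] = set()

    def submit(self, job: Callable[[], Awaitable[None]]) -> None:
        task = asyncio.create_task(self._guarded(job))
        self._tasks.add(task)  # strong reference until the task finishes
        task.add_done_callback(self._tasks.discard)

    async def _guarded(self, job: Callable[[], Awaitable[None]]) -> None:
        async with self._sem:
            try:
                await job()
            except Exception:
                # A failed analysis must not crash the chat service
                log.exception("analytics job failed")

    async def drain(self) -> None:
        """Wait for all submitted jobs, e.g. during shutdown."""
        if self._tasks:
            await asyncio.gather(*self._tasks)
```

With a runner created at startup, the chat handler can call runner.submit(lambda: pipeline.process_conversation(record)) and return immediately, and a shutdown hook can await runner.drain().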
Database Schema for Analytics
```sql
CREATE TABLE conversation_analytics (
    id SERIAL PRIMARY KEY,
    conversation_id TEXT UNIQUE NOT NULL,
    user_id TEXT NOT NULL,
    primary_intent TEXT NOT NULL,
    secondary_intents TEXT[] DEFAULT '{}',
    sentiment TEXT NOT NULL,
    sentiment_trajectory TEXT NOT NULL,
    topics TEXT[] DEFAULT '{}',
    resolution_status TEXT NOT NULL,
    effort_score INTEGER NOT NULL,
    turn_count INTEGER NOT NULL,
    total_tokens INTEGER NOT NULL,
    duration_seconds FLOAT NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE INDEX idx_analytics_intent ON conversation_analytics(primary_intent);
CREATE INDEX idx_analytics_sentiment ON conversation_analytics(sentiment);
CREATE INDEX idx_analytics_created ON conversation_analytics(created_at);

CREATE TABLE intent_counts (
    intent TEXT PRIMARY KEY,
    count INTEGER DEFAULT 0,
    last_seen TIMESTAMP
);

CREATE TABLE topic_counts (
    topic TEXT PRIMARY KEY,
    count INTEGER DEFAULT 0,
    last_seen TIMESTAMP
);
```
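intent_counts and topic_counts key on the exact label string, so free-text labels from the model such as "Billing Inquiry", "billing-inquiry", and "billing_inquiry" would land in three separate rows. A small normalization step applied before inserting keeps the counters meaningful. This is a sketch; the snake_case canonical form and the helper names normalize_label and normalize_labels are our own convention:

```python
import re
from typing import Dict, List


def normalize_label(label: str) -> str:
    """Lowercase, trim, and collapse each run of non-alphanumerics to one underscore."""
    return re.sub(r"[^a-z0-9]+", "_", label.strip().lower()).strip("_")


def normalize_labels(labels: List[str]) -> List[str]:
    """Normalize, drop empty results, and de-duplicate while preserving order."""
    seen: Dict[str, None] = {}
    for label in labels:
        norm = normalize_label(label)
        if norm:
            seen.setdefault(norm, None)
    return list(seen)
```

The pattern keeps only ASCII letters and digits, so it would strip accented characters; widen it if your labels may be non-English.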
Dashboard Metrics Queries
The queries below compute the core metrics for a chat agent dashboard:
```python
import asyncpg


class DashboardMetrics:
    def __init__(self, db: asyncpg.Pool):
        self.db = db

    async def get_overview(self, days: int = 7) -> dict:
        # asyncpg uses $1-style parameters, and a placeholder inside a string
        # literal such as '%s days' is never substituted, so build the time
        # window with make_interval instead
        row = await self.db.fetchrow(
            """
            SELECT
                COUNT(*) AS total_conversations,
                AVG(turn_count) AS avg_turns,
                AVG(duration_seconds) AS avg_duration,
                AVG(effort_score) AS avg_effort,
                AVG(total_tokens) AS avg_tokens,
                COUNT(*) FILTER (WHERE resolution_status = 'resolved')
                    * 100.0 / NULLIF(COUNT(*), 0) AS resolution_rate,
                COUNT(*) FILTER (WHERE sentiment = 'negative')
                    * 100.0 / NULLIF(COUNT(*), 0) AS negative_rate
            FROM conversation_analytics
            WHERE created_at > NOW() - make_interval(days => $1)
            """,
            days,
        )
        return dict(row)

    async def get_top_intents(self, limit: int = 10) -> list:
        rows = await self.db.fetch(
            """
            SELECT intent, count
            FROM intent_counts
            ORDER BY count DESC
            LIMIT $1
            """,
            limit,
        )
        return [dict(r) for r in rows]

    async def get_sentiment_trend(self, days: int = 30) -> list:
        rows = await self.db.fetch(
            """
            SELECT
                DATE(created_at) AS day,
                COUNT(*) FILTER (WHERE sentiment = 'positive') AS positive,
                COUNT(*) FILTER (WHERE sentiment = 'neutral') AS neutral,
                COUNT(*) FILTER (WHERE sentiment = 'negative') AS negative,
                COUNT(*) FILTER (WHERE sentiment = 'frustrated') AS frustrated
            FROM conversation_analytics
            WHERE created_at > NOW() - make_interval(days => $1)
            GROUP BY DATE(created_at)
            ORDER BY day
            """,
            days,
        )
        return [dict(r) for r in rows]

    async def get_unresolved_topics(self) -> list:
        # UNNEST goes in the FROM clause (not the SELECT list) so each topic
        # becomes a row that GROUP BY can work on
        rows = await self.db.fetch(
            """
            SELECT t.topic, COUNT(*) AS count
            FROM conversation_analytics AS ca
            CROSS JOIN LATERAL UNNEST(ca.topics) AS t(topic)
            WHERE ca.resolution_status = 'unresolved'
              AND ca.created_at > NOW() - INTERVAL '7 days'
            GROUP BY t.topic
            ORDER BY count DESC
            LIMIT 10
            """
        )
        return [dict(r) for r in rows]
```
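The rate columns in get_overview share one formula: matching rows times 100.0, divided by the total, where NULLIF turns a zero denominator into NULL so an empty window yields no rate instead of a division-by-zero error. The same logic in plain Python is useful for unit tests that check dashboard numbers against a handful of fixture rows. percent_where is a hypothetical helper:

```python
from typing import Callable, Iterable, Optional, TypeVar

T = TypeVar("T")


def percent_where(rows: Iterable[T], predicate: Callable[[T], bool]) -> Optional[float]:
    """Percentage of rows matching predicate; None when there are no rows (like NULLIF)."""
    rows = list(rows)
    if not rows:
        return None
    return sum(1 for r in rows if predicate(r)) * 100.0 / len(rows)
```

The None case is also what the alerting code has to guard against: a quiet day with zero conversations produces NULL rates, not 0%.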
FastAPI Dashboard Endpoint
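```python
import asyncio
from contextlib import asynccontextmanager

import asyncpg
from fastapi import Depends, FastAPI, Request


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Create one connection pool at startup and reuse it for every request;
    # creating a pool per request would open fresh connections each time
    app.state.db = await asyncpg.create_pool(dsn="postgresql://...")
    yield
    await app.state.db.close()


app = FastAPI(lifespan=lifespan)


async def get_db(request: Request) -> asyncpg.Pool:
    return request.app.state.db


@app.get("/api/analytics/dashboard")
async def dashboard(days: int = 7, db: asyncpg.Pool = Depends(get_db)):
    metrics = DashboardMetrics(db)
    # Each query acquires its own pooled connection, so they can run concurrently
    overview, intents, sentiment, unresolved = await asyncio.gather(
        metrics.get_overview(days),
        metrics.get_top_intents(),
        metrics.get_sentiment_trend(days),
        metrics.get_unresolved_topics(),
    )
    return {
        "overview": overview,
        "top_intents": intents,
        "sentiment_trend": sentiment,
        "unresolved_topics": unresolved,
    }
```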
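get_sentiment_trend groups by DATE(created_at), so a day with no conversations produces no row at all, and a chart plotting the sentiment_trend series would silently skip it. One fix is to fill the gaps before returning the series. The sketch below does it in Python (a generate_series LEFT JOIN in Postgres is the SQL alternative). fill_missing_days is a hypothetical helper that assumes each row has the day key plus the four sentiment counts, as the query returns them:

```python
from datetime import date, timedelta
from typing import Any, Dict, List

SENTIMENTS = ("positive", "neutral", "negative", "frustrated")


def fill_missing_days(rows: List[Dict[str, Any]], start: date, end: date) -> List[Dict[str, Any]]:
    """Return one row per day in [start, end], with zero counts for empty days."""
    by_day = {row["day"]: row for row in rows}
    filled = []
    day = start
    while day <= end:
        row = by_day.get(day)
        if row is None:
            row = {"day": day, **{s: 0 for s in SENTIMENTS}}
        filled.append(row)
        day += timedelta(days=1)
    return filled
```

In the endpoint, end could be date.today() and start end - timedelta(days=days), matching the query's window.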
Alerting on Anomalies
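Set up alerts for when metrics cross a threshold. The thresholds below (25% negative sentiment, 60% resolution rate) are starting points to tune against your own baseline: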
```python
async def check_anomalies(metrics: DashboardMetrics):
    overview = await metrics.get_overview(days=1)
    # Rates come back as None when the window has no conversations (NULLIF)
    negative_rate = overview["negative_rate"]
    resolution_rate = overview["resolution_rate"]

    # Alert if negative sentiment exceeds 25%
    # (send_alert is your own notification helper, e.g. a Slack webhook wrapper)
    if negative_rate is not None and negative_rate > 25:
        await send_alert(
            channel="slack",
            message=f"High negative sentiment: {negative_rate:.1f}% "
            f"in the last 24 hours (threshold: 25%)",
        )

    # Alert if resolution rate drops below 60%
    if resolution_rate is not None and resolution_rate < 60:
        await send_alert(
            channel="slack",
            message=f"Low resolution rate: {resolution_rate:.1f}% "
            f"in the last 24 hours (threshold: 60%)",
        )
```
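Fixed thresholds ignore what is normal for your traffic. To alert on deviation from a baseline instead, compare today's value with the mean and standard deviation of the preceding days (for example, a daily negative-sentiment percentage derived from get_sentiment_trend). The z-score sketch below is one simple way to do that; zscore_alert is a hypothetical helper, and the 14-day minimum history and 3-sigma cutoff are illustrative defaults, not tuned values:

```python
import statistics
from typing import Optional, Sequence


def zscore_alert(history: Sequence[float], today: float,
                 threshold: float = 3.0, min_history: int = 14) -> Optional[float]:
    """Return today's z-score when |z| >= threshold, else None.

    history: the metric for previous days, oldest first, excluding today.
    """
    if len(history) < min_history:
        return None  # not enough data for a stable baseline
    mean = statistics.fmean(history)
    stdev = statistics.stdev(history)
    if stdev == 0:
        return None  # flat history: z-score undefined, rely on fixed thresholds
    z = (today - mean) / stdev
    return z if abs(z) >= threshold else None
```

Running both checks side by side is reasonable: the fixed thresholds catch absolute problems, while the z-score catches sudden shifts that stay under them.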
This analytics pipeline gives you full visibility into your chat agent's performance and enables data-driven improvements to instructions, tools, and conversation design.