---
title: "LLM Observability: Tracing, Logging, and Debugging AI Systems"
description: "A practical guide to implementing observability in LLM applications, covering distributed tracing for multi-step agents, structured logging, cost tracking, quality monitoring, and debugging production issues with tools like LangSmith, Langfuse, and custom solutions."
canonical: https://callsphere.ai/blog/llm-observability-tracing-logging-debugging
category: "Agentic AI"
tags: ["LLM Observability", "Tracing", "Monitoring", "Debugging", "MLOps", "AI Engineering"]
author: "CallSphere Team"
published: 2026-01-10T00:00:00.000Z
updated: 2026-05-06T01:02:40.050Z
---

# LLM Observability: Tracing, Logging, and Debugging AI Systems

> A practical guide to implementing observability in LLM applications, covering distributed tracing for multi-step agents, structured logging, cost tracking, quality monitoring, and debugging production issues with tools like LangSmith, Langfuse, and custom solutions.

## Why LLM Observability Is Different

Traditional application observability tracks request latency, error rates, and resource utilization. LLM applications need all of that plus a new dimension: **output quality**. A 200 OK response that contains a hallucinated answer is a failure that standard monitoring will miss.

LLM observability covers four pillars:

1. **Tracing**: Following the complete execution path through multi-step agent workflows
2. **Quality monitoring**: Detecting degradation in model output quality over time
3. **Cost tracking**: Understanding and optimizing token usage and API spend
4. **Debugging**: Reproducing and diagnosing issues in non-deterministic systems

## Distributed Tracing for LLM Agents

An AI agent that makes three tool calls, runs two retrieval queries, and finishes with a generation step is effectively a distributed system. Each step can fail independently, and understanding the full execution path is essential for debugging.

```mermaid
flowchart LR
    APP(["Agent or API"])
    SDK["OTel SDK
GenAI conventions"]
    COL["OTel Collector"]
    subgraph BACKENDS["Backends"]
        TR[("Traces
Tempo or Honeycomb")]
        MET[("Metrics
Prometheus")]
        LOG[("Logs
Loki or ELK")]
    end
    DASH["Grafana plus alerts"]
    PAGE(["Pager"])
    APP --> SDK --> COL
    COL --> TR
    COL --> MET
    COL --> LOG
    TR --> DASH
    MET --> DASH
    LOG --> DASH
    DASH --> PAGE
    style SDK fill:#4f46e5,stroke:#4338ca,color:#fff
    style DASH fill:#f59e0b,stroke:#d97706,color:#1f2937
    style PAGE fill:#dc2626,stroke:#b91c1c,color:#fff
```

### OpenTelemetry-Based Tracing

```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
import functools

# Initialize tracing
provider = TracerProvider()
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("llm-agent")

def trace_llm_call(func):
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        with tracer.start_as_current_span(
            f"llm.{func.__name__}",
            attributes={
                "llm.model": kwargs.get("model", "unknown"),
                "llm.max_tokens": kwargs.get("max_tokens", 0),
            }
        ) as span:
            try:
                result = await func(*args, **kwargs)
                span.set_attribute("llm.input_tokens", result.usage.input_tokens)
                span.set_attribute("llm.output_tokens", result.usage.output_tokens)
                span.set_attribute("llm.stop_reason", result.stop_reason)
                return result
            except Exception as e:
                span.set_status(trace.StatusCode.ERROR, str(e))
                span.record_exception(e)
                raise
    return wrapper

def trace_tool_call(tool_name: str):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            with tracer.start_as_current_span(
                f"tool.{tool_name}",
                attributes={"tool.name": tool_name}
            ) as span:
                result = await func(*args, **kwargs)
                span.set_attribute("tool.result_length", len(str(result)))
                return result
        return wrapper
    return decorator

def trace_retrieval(func):
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        with tracer.start_as_current_span("retrieval") as span:
            results = await func(*args, **kwargs)
            span.set_attribute("retrieval.num_results", len(results))
            span.set_attribute("retrieval.top_score",
                             results[0].score if results else 0)
            return results
    return wrapper
```
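
With the decorators in place, instrumenting an agent is mostly a matter of annotating its steps. A minimal sketch, assuming an async Anthropic client; `validate_order_id` is a hypothetical tool used only for illustration:

```python
import anthropic

client = anthropic.AsyncAnthropic()

@trace_llm_call
async def generate_response(messages: list, model: str = "claude-sonnet-4-20250514",
                            max_tokens: int = 1024):
    # The decorator records token usage and stop reason on the span
    return await client.messages.create(model=model, messages=messages, max_tokens=max_tokens)

@trace_tool_call("validate_order_id")
async def validate_order_id(order_id: str) -> str:
    # Hypothetical tool: the span records the tool name and result length
    return "valid" if order_id.startswith("#") else "invalid"
```

Because the decorators use `start_as_current_span`, any call made inside another active span is recorded as its child.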

### Agent Trace Structure

A typical agent trace looks like this:

```
[Agent Run: 2.3s] agent.handle_request
  |-- [120ms] llm.plan_steps          (input: 450 tokens, output: 180 tokens)
  |-- [340ms] retrieval.search         (query: "refund policy", results: 5)
  |-- [45ms]  tool.validate_order_id   (order: #12345, result: valid)
  |-- [890ms] llm.generate_response    (input: 2100 tokens, output: 340 tokens)
  |-- [15ms]  output.filter            (pii_detected: false)
```
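
The top-level `agent.handle_request` span comes from wrapping the whole run in a parent span; every decorated call made inside it then nests automatically. A sketch of that wrapper, where `search` stands in for a hypothetical `@trace_retrieval`-decorated retriever and `generate_response` is the decorated function from the earlier snippet:

```python
async def handle_request(user_query: str, messages: list):
    # Parent span: decorated LLM, tool, and retrieval calls made inside this
    # block show up as child spans, producing the nested trace shown above.
    with tracer.start_as_current_span(
        "agent.handle_request",
        attributes={"agent.query_length": len(user_query)},
    ) as span:
        docs = await search(user_query)
        response = await generate_response(messages=messages)
        span.set_attribute("agent.num_retrieved", len(docs))
        return response
```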

## Structured Logging for LLM Systems

Standard logging (`logger.info("Generated response")`) is nearly useless for debugging LLM issues. Structured logging captures the context needed for investigation:

```python
import structlog
import hashlib

logger = structlog.get_logger()

class LLMLogger:
    @staticmethod
    async def log_request(
        run_id: str,
        model: str,
        messages: list,
        response,
        duration_ms: float,
    ):
        # Hash sensitive content for privacy
        input_hash = hashlib.sha256(
            str(messages).encode()
        ).hexdigest()[:12]

        logger.info(
            "llm.request",
            run_id=run_id,
            model=model,
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
            total_tokens=response.usage.input_tokens + response.usage.output_tokens,
            duration_ms=round(duration_ms, 2),
            stop_reason=response.stop_reason,
            input_hash=input_hash,
            num_messages=len(messages),
            estimated_cost=calculate_cost(
                model, response.usage.input_tokens, response.usage.output_tokens
            ),
        )

    @staticmethod
    async def log_quality_issue(
        run_id: str,
        issue_type: str,
        details: dict,
    ):
        logger.warning(
            "llm.quality_issue",
            run_id=run_id,
            issue_type=issue_type,
            **details,
        )
```
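
One way to wire the logger in is to time the API call and pass the response straight through. A minimal sketch, reusing the async Anthropic client from earlier (`calculate_cost` is defined in the cost-tracking section below):

```python
import time
import uuid

async def call_and_log(messages: list, model: str = "claude-sonnet-4-20250514"):
    run_id = str(uuid.uuid4())
    start = time.perf_counter()
    response = await client.messages.create(
        model=model, messages=messages, max_tokens=1024
    )
    await LLMLogger.log_request(
        run_id=run_id,
        model=model,
        messages=messages,
        response=response,
        duration_ms=(time.perf_counter() - start) * 1000,
    )
    return response
```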

## Cost Tracking and Optimization

LLM API costs can spiral without visibility. Build cost tracking into your observability layer:

```python
from datetime import datetime

# Pricing as of early 2026 (per million tokens)
MODEL_PRICING = {
    "claude-sonnet-4-20250514": {"input": 3.0, "output": 15.0},
    "claude-haiku-4-20250514": {"input": 0.80, "output": 4.0},
    "claude-opus-4-20250514": {"input": 15.0, "output": 75.0},
    "gpt-4o": {"input": 2.50, "output": 10.0},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
}

def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
    return (
        (input_tokens / 1_000_000) * pricing["input"] +
        (output_tokens / 1_000_000) * pricing["output"]
    )

class CostTracker:
    def __init__(self, daily_budget: float = 100.0):
        self.daily_budget = daily_budget
        self.daily_spend = 0.0
        self.hourly_spend = {}

    def record(self, model: str, input_tokens: int, output_tokens: int):
        cost = calculate_cost(model, input_tokens, output_tokens)
        self.daily_spend += cost

        hour = datetime.now().strftime("%H")
        self.hourly_spend[hour] = self.hourly_spend.get(hour, 0) + cost

        if self.daily_spend > self.daily_budget * 0.8:
            logger.warning("cost.budget_warning",
                          daily_spend=self.daily_spend,
                          budget=self.daily_budget,
                          utilization=self.daily_spend / self.daily_budget)

        return cost
```
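
A sketch of how the tracker can sit next to the tracing layer, attaching the estimated cost to the current span so spend is visible per request as well as per day (assumes a single module-level tracker and the OpenTelemetry setup from earlier):

```python
cost_tracker = CostTracker(daily_budget=50.0)

def record_usage(model: str, response) -> float:
    # Record spend against the daily budget and expose it on the active span
    cost = cost_tracker.record(
        model,
        response.usage.input_tokens,
        response.usage.output_tokens,
    )
    trace.get_current_span().set_attribute("llm.estimated_cost_usd", cost)
    return cost
```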

## Quality Monitoring

### Automated Quality Checks

Run lightweight quality checks on every response:

```python
class QualityMonitor:
    def check_response(self, query: str, response: str, context: list[str]) -> dict:
        checks = {
            "length_adequate": len(response) > 50,
            "not_refusal": not any(
                phrase in response.lower()
                for phrase in ["i cannot", "i'm unable", "i don't have"]
            ),
            "no_hallucination_markers": not any(
                phrase in response.lower()
                for phrase in ["as an ai", "i don't have access", "my training data"]
            ),
            "context_referenced": any(
                # Check if response references the provided context
                self._overlap_score(response, ctx) > 0.1
                for ctx in context
            ) if context else True,
        }

        score = sum(checks.values()) / len(checks)
        return {"checks": checks, "score": score, "passed": score >= 0.75}

    @staticmethod
    def _overlap_score(response: str, context: str) -> float:
        """Fraction of context words that also appear in the response."""
        context_words = set(context.lower().split())
        if not context_words:
            return 0.0
        response_words = set(response.lower().split())
        return len(context_words & response_words) / len(context_words)
```
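
Hooked into the response path, failed checks can feed the structured logger from earlier. A sketch:

```python
quality_monitor = QualityMonitor()

async def check_and_log(run_id: str, query: str, response_text: str, context: list[str]):
    result = quality_monitor.check_response(query, response_text, context)
    if not result["passed"]:
        failed = [name for name, ok in result["checks"].items() if not ok]
        await LLMLogger.log_quality_issue(
            run_id=run_id,
            issue_type="automated_check_failed",
            details={"failed_checks": failed, "score": result["score"]},
        )
    return result
```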

### Drift Detection

Model behavior changes over time due to provider updates, prompt changes, or data distribution shifts. Monitor for drift:

```python
class DriftDetector:
    def __init__(self, baseline_metrics: dict):
        self.baseline = baseline_metrics
        self.window_size = 100
        self.recent_scores = []

    def record(self, quality_score: float, latency_ms: float, tokens: int):
        self.recent_scores.append({
            "quality": quality_score,
            "latency": latency_ms,
            "tokens": tokens,
        })

        if len(self.recent_scores) >= self.window_size:
            current = self._compute_metrics(self.recent_scores[-self.window_size:])
            drift = self._detect_drift(self.baseline, current)
            if drift:
                logger.warning("quality.drift_detected", **drift)
            self.recent_scores = self.recent_scores[-self.window_size:]

    def _compute_metrics(self, window: list[dict]) -> dict:
        """Mean quality, latency, and token count over the recent window."""
        return {
            metric: sum(r[metric] for r in window) / len(window)
            for metric in ("quality", "latency", "tokens")
        }

    def _detect_drift(self, baseline, current) -> dict | None:
        for metric in ["quality", "latency", "tokens"]:
            baseline_val = baseline[metric]
            current_val = current[metric]
            pct_change = (current_val - baseline_val) / baseline_val
            if abs(pct_change) > 0.15:  # 15% threshold
                return {
                    "metric": metric,
                    "baseline": baseline_val,
                    "current": current_val,
                    "pct_change": round(pct_change * 100, 1),
                }
        return None
```
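
The baseline itself can come from a batch of evaluation runs captured before a deployment. A minimal sketch, assuming each run is a dict with `quality`, `latency`, and `tokens` keys exported from an offline evaluation harness:

```python
def build_baseline(eval_runs: list[dict]) -> dict:
    """Average quality, latency, and tokens over a trusted set of evaluation runs."""
    return {
        metric: sum(run[metric] for run in eval_runs) / len(eval_runs)
        for metric in ("quality", "latency", "tokens")
    }

# eval_runs: results loaded from wherever your evaluation output lives (hypothetical)
detector = DriftDetector(baseline_metrics=build_baseline(eval_runs))
```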

## Observability Tools Comparison

| Tool | Type | Strengths | Pricing |
| --- | --- | --- | --- |
| LangSmith | Managed | Deep LangChain integration, playground | Free tier + usage-based |
| Langfuse | Open Source | Self-hostable, model-agnostic | Free (self-hosted) or cloud |
| Arize Phoenix | Open Source | Evaluation-focused, embeddings viz | Free |
| Helicone | Managed | Simple proxy setup, cost tracking | Free tier + usage-based |
| Custom (OTel) | DIY | Full control, no vendor lock-in | Infrastructure costs |

## Debugging Production Issues

### The Replay Pattern

Store full request/response pairs so you can replay issues locally:

```python
from datetime import datetime

class RequestRecorder:
    def __init__(self, storage):
        # Any async store with save()/load() methods (database, object store, etc.)
        self.storage = storage

    async def record(self, run_id: str, messages: list, response, metadata: dict):
        await self.storage.save({
            "run_id": run_id,
            "timestamp": datetime.utcnow().isoformat(),
            "messages": messages,
            "response": response.model_dump(),
            "metadata": metadata,
        })

    async def replay(self, run_id: str, override_model: str | None = None):
        """Replay a recorded request, optionally with a different model"""
        record = await self.storage.load(run_id)
        model = override_model or record["metadata"]["model"]
        # `client` is the application's async Anthropic client, created elsewhere
        return await client.messages.create(
            model=model,
            messages=record["messages"],
            max_tokens=record["metadata"].get("max_tokens", 4096),
        )
```
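
In practice, replay is most useful for side-by-side comparisons, for example rerunning a problematic request against a cheaper model. A sketch, assuming the request was originally recorded from an Anthropic Message response:

```python
async def compare_models(recorder: RequestRecorder, run_id: str):
    original = await recorder.storage.load(run_id)
    replayed = await recorder.replay(run_id, override_model="claude-haiku-4-20250514")
    # Print the first part of each answer for a quick manual diff
    print("original:", original["response"]["content"][0]["text"][:200])
    print("replayed:", replayed.content[0].text[:200])
```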

### Common Debugging Scenarios

1. **"The agent gave a wrong answer"**: Pull the full trace, check what context was retrieved, verify the retrieval was relevant, then examine if the generation step misused the context.
2. **"Latency spiked"**: Check trace spans for which step slowed down. Common culprits: retrieval latency (index issues), model provider latency (check status pages), or excessive tool calls (loop detection).
3. **"Costs jumped unexpectedly"**: Query hourly cost data. Look for context window bloat (messages array growing without summarization), retry loops, or a spike in traffic.

## Key Takeaways

LLM observability is not optional for production systems. At minimum, implement structured logging with token counts and costs, distributed tracing for multi-step agents, automated quality checks on every response, and a request recording system for debugging. The investment pays for itself the first time you need to debug a production issue that would otherwise be invisible.

