TL;DR

Traditional observability breaks down for AI Agents—you cannot use HTTP status codes to determine whether a hallucinated answer is a bug. This guide presents a three-pillar architecture for Agent observability: Distributed Tracing captures multi-step decision paths, Evaluation Engineering quantifies output quality, and Debugging enables root cause analysis. With production-ready code using OpenTelemetry, LangSmith, LangFuse, and Arize Phoenix, you will build a closed-loop observability system from development through operations.

This article extends the observability concepts from AI Agent: 10 Pitfalls from POC to Production. For the broader operations platform, see Enterprise LLMOps Architecture Guide.


Table of Contents

  1. Key Takeaways
  2. Why Agent Observability Differs from Traditional APM
  3. The Three Pillars Architecture
  4. Distributed Tracing for LLM Agents
  5. Evaluation Engineering
  6. Debugging Agent Failures
  7. Building the Observability Stack
  8. Production Best Practices
  9. FAQ
  10. Summary
  11. Related Resources

Key Takeaways

  • Agent observability is a three-dimensional problem: Traces answer "what happened," Evals answer "how good was it," Debug answers "why did it fail"—all three are essential.
  • OpenTelemetry is the standard for the Trace layer: Extend Semantic Conventions with custom Span attributes for LLM-specific data (tokens, model, temperature) and leverage existing infrastructure.
  • LLM-as-Judge is not a silver bullet: It requires structured rubrics, multi-model cross-validation, and human calibration to be reliable.
  • 90% of Agent failures fall into 5 patterns: Infinite loops, tool misuse, hallucinated actions, context overflow, and reasoning drift.
  • Observability must be built from Day 1: Retrofitting costs 10x more than initial implementation and never achieves complete coverage.
  • Sampling strategy determines cost ceiling: Full collection is impractical in production—importance-based tiered sampling reduces storage costs by 80%.

Why Agent Observability Differs from Traditional APM

Traditional application observability rests on deterministic assumptions: identical inputs produce identical outputs, errors have clear exception types, and performance bottlenecks are locatable through latency distributions. AI Agents fundamentally break every one of these assumptions.

| Dimension | Traditional APM | Agent Observability |
|---|---|---|
| Output determinism | Same input → Same output | Same input → Different output (temperature > 0) |
| Error definition | HTTP 5xx / Exceptions | Semantic errors (hallucination, irrelevance, harm) |
| Call chain depth | Typically 3-10 hops | Single request can trigger 10-50 LLM calls |
| Performance metrics | Latency, throughput | + Token usage, reasoning quality, faithfulness |
| Root cause analysis | Stack traces | Requires semantic reasoning path reconstruction |
| Cost model | Fixed infrastructure | Per-token dynamic billing, unpredictable costs |

python
# Traditional APM: one request = one Span
# Agent observability: one user request = a Span tree

# Traditional approach
@trace_request
def handle_request(request):
    result = process(request)
    return result  # 200 OK = success

# Agent approach — 200 OK does NOT mean success
@trace_agent_request
async def handle_agent_request(request):
    plan = await llm.plan(request)         # Span: planning
    results = []                           # Validated tool outputs, in plan order
    for step in plan.steps:
        tool_result = await execute(step)  # Span: tool_call
        validation = await llm.validate(tool_result)  # Span: validation
        if not validation.is_faithful:
            # HTTP 200, but semantically a failure
            raise SemanticError("Output not faithful to source")
        results.append(tool_result)
    return synthesize(results)

A single user request to an Agent may traverse: intent classification → plan generation → tool selection → parameter construction → tool execution → result validation → output synthesis. Every step is non-deterministic, and every step can produce semantic errors. None of these errors raises an exception on its own; it surfaces only when you check for it explicitly.
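
To make the "one request = a Span tree" idea concrete, here is a minimal standard-library sketch. `SpanNode` and its fields are hypothetical names chosen for illustration; they are not OpenTelemetry types. The span names and token counts are made up.

```python
from dataclasses import dataclass, field


@dataclass
class SpanNode:
    """One node in a request's span tree (hypothetical structure, not an OpenTelemetry type)."""
    name: str
    tokens: int = 0
    latency_ms: float = 0.0
    semantic_ok: bool = True  # set by an evaluator, not by the HTTP layer
    children: list["SpanNode"] = field(default_factory=list)

    def total_tokens(self) -> int:
        """Sum tokens over this span and all of its descendants."""
        return self.tokens + sum(child.total_tokens() for child in self.children)

    def failed_spans(self) -> list[str]:
        """Names of spans flagged as semantic failures, in depth-first order."""
        found = [] if self.semantic_ok else [self.name]
        for child in self.children:
            found.extend(child.failed_spans())
        return found


# One user request fans out into several child spans.
root = SpanNode("agent.request", children=[
    SpanNode("llm.plan", tokens=450, latency_ms=1200),
    SpanNode("tool.execute.search", latency_ms=300),
    SpanNode("llm.validate", tokens=200, latency_ms=800, semantic_ok=False),
    SpanNode("llm.synthesize", tokens=800, latency_ms=1500),
])

print(root.total_tokens())  # 1450
print(root.failed_spans())  # ['llm.validate']
```

The request as a whole could still return HTTP 200 here; the failure is visible only because one span carries a semantic flag.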


The Three Pillars Architecture

graph TB
    subgraph "User Request"
        U[User Input]
    end
    subgraph "Trace Layer - What Happened"
        T1[Distributed Tracing]
        T2[Span Tree Construction]
        T3["Token/Latency/Cost"]
        T4[Context Propagation]
    end
    subgraph "Eval Layer - How Good Was It"
        E1[Online Evaluation]
        E2[Offline Evaluation]
        E3[LLM-as-Judge]
        E4[Custom Metrics]
    end
    subgraph "Debug Layer - Why Did It Fail"
        D1[Failure Mode Detection]
        D2[Time-Travel Replay]
        D3[Log Correlation]
        D4[Root Cause Analysis]
    end
    U --> T1
    T1 --> T2 --> T3 --> T4
    T4 --> E1
    E1 --> E2 --> E3 --> E4
    E4 --> D1
    D1 --> D2 --> D3 --> D4
    style T1 fill:#e1f5fe
    style E1 fill:#f3e5f5
    style D1 fill:#fff3e0

The relationship between the three pillars: Traces provide the data foundation, Evals define quality standards, and Debug closes the feedback loop. Without Traces, Evals and Debug have no data; without Evals, Traces cannot indicate whether something is wrong; without Debug capabilities, discovered problems cannot be resolved.


Distributed Tracing for LLM Agents

OpenTelemetry Integration

OpenTelemetry is the de facto standard for observability, and its Semantic Conventions can be extended to cover LLM scenarios. The core approach models every LLM call, tool call, and reasoning step as a Span, with custom attributes carrying Agent-specific information.

python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource

# Initialize Tracer
resource = Resource.create({
    "service.name": "agent-service",
    "service.version": "1.2.0",
    "deployment.environment": "production",
})

provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="http://otel-collector:4317"))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("agent.core", "1.0.0")


class AgentTracer:
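    """Agent observability tracer wrapping OpenTelemetry Span creation.

    Each trace_* method returns a started Span; the caller is responsible
    for calling span.end() once the traced operation completes.
    """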

    def __init__(self, tracer):
        self.tracer = tracer

    def trace_llm_call(self, model: str, messages: list, temperature: float = 0.7):
        """Trace a single LLM call"""
        span = self.tracer.start_span(
            name=f"llm.chat.{model}",
            attributes={
                "llm.model": model,
                "llm.temperature": temperature,
                "llm.message_count": len(messages),
                "llm.system_prompt_tokens": self._count_tokens(messages[0]) if messages else 0,
            }
        )
        return span

    def trace_tool_call(self, tool_name: str, parameters: dict):
        """Trace a tool execution"""
        span = self.tracer.start_span(
            name=f"tool.execute.{tool_name}",
            attributes={
                "tool.name": tool_name,
                "tool.parameters": str(parameters)[:1024],  # Truncate to prevent overflow
            }
        )
        return span

    def trace_agent_step(self, step_type: str, step_index: int):
        """Trace an agent reasoning step"""
        span = self.tracer.start_span(
            name=f"agent.step.{step_type}",
            attributes={
                "agent.step.type": step_type,
                "agent.step.index": step_index,
            }
        )
        return span

    def _count_tokens(self, message) -> int:
        return len(str(message)) // 4  # Rough approximation

LangSmith vs LangFuse vs Arize Phoenix

| Feature | LangSmith | LangFuse | Arize Phoenix |
|---|---|---|---|
| Open Source | ❌ Closed | ✅ MIT | ✅ Apache 2.0 |
| Self-Hosting | | | |
| LangChain Integration | Native deep integration | SDK integration | SDK integration |
| Playground | ✅ Full-featured | ✅ Basic | ✅ Basic |
| Dataset Management | ✅ Built-in | ✅ Built-in | ✅ Built-in |
| Evaluation Framework | ✅ Complete | ✅ Complete | ✅ Phoenix Evals |
| Real-time Monitoring | | | |
| Pricing Model | Per-trace volume | Self-hosted free | Self-hosted free |
| Best For | LangChain power users | Privacy-sensitive enterprise | Research teams |

LangFuse Integration Example (Python):

python
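# Decorator-style API as exposed by `langfuse.decorators` (Langfuse Python SDK v2);
# other SDK versions may expose these helpers under different import paths.
from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context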

langfuse = Langfuse(
    public_key="pk-xxx",
    secret_key="sk-xxx",
    host="https://your-langfuse-instance.com"
)


@observe(name="agent-pipeline")
async def run_agent_pipeline(user_query: str, session_id: str):
    """Full agent pipeline with observability"""
    langfuse_context.update_current_trace(
        session_id=session_id,
        user_id="user-123",
        metadata={"pipeline_version": "2.1.0"}
    )

    # Step 1: Intent classification
    intent = await classify_intent(user_query)

    # Step 2: Plan generation
    plan = await generate_plan(user_query, intent)

    # Step 3: Execute plan steps
    results = []
    for step in plan.steps:
        result = await execute_step(step)
        results.append(result)

    # Step 4: Synthesize output
    output = await synthesize_output(results, user_query)

    # Record evaluation score
    langfuse_context.score_current_trace(
        name="output_quality",
        value=await evaluate_output(output, user_query),
        comment="Automated quality score"
    )

    return output


@observe(name="classify-intent", capture_input=True, capture_output=True)
async def classify_intent(query: str) -> str:
    """Intent classification with automatic I/O capture"""
    response = await llm.chat(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Classify user intent into categories..."},
            {"role": "user", "content": query}
        ],
        temperature=0.1
    )
    return response.content


@observe(name="execute-tool")
async def execute_step(step):
    """Tool execution with detailed tracing"""
    langfuse_context.update_current_observation(
        metadata={"tool": step.tool_name, "retry_count": 0}
    )
    try:
        result = await tool_registry.execute(step.tool_name, step.parameters)
        langfuse_context.update_current_observation(
            level="DEFAULT",
            status_message="success"
        )
        return result
    except Exception as e:
        langfuse_context.update_current_observation(
            level="ERROR",
            status_message=str(e)
        )
        raise

Trace Flow Architecture

sequenceDiagram
    participant User
    participant Gateway as API Gateway
    participant Agent as Agent Runtime
    participant LLM as LLM Provider
    participant Tool as Tool Server
    participant Collector as OTel Collector
    participant Backend as Trace Backend
    User->>Gateway: POST /chat (trace_id=abc123)
    Gateway->>Agent: Forward + inject W3C traceparent
    Agent->>LLM: Plan generation
    Agent-->>Collector: Span: llm.plan (tokens=450, latency=1.2s)
    LLM-->>Agent: Plan with 3 steps
    loop For each step
        Agent->>Tool: Execute tool call
        Agent-->>Collector: Span: tool.execute (tool=search)
        Tool-->>Agent: Tool result
        Agent->>LLM: Validate result
        Agent-->>Collector: Span: llm.validate (faithful=true)
        LLM-->>Agent: Validation result
    end
    Agent->>LLM: Synthesize final output
    Agent-->>Collector: Span: llm.synthesize (tokens=800)
    LLM-->>Agent: Final response
    Agent->>Gateway: Response
    Gateway->>User: Final answer
    Collector->>Backend: Batch export spans
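
The "inject W3C traceparent" step in the flow above relies on a small header format: `version-traceid-parentid-flags`, all lowercase hex. The sketch below parses and re-issues that header with the standard library only, to show what gets propagated. The function names are hypothetical, and in practice the OpenTelemetry propagation API would normally do this for you.

```python
import re
import secrets
from typing import Optional

# W3C Trace Context header: 2-hex version, 32-hex trace ID, 16-hex parent span ID, 2-hex flags
_TRACEPARENT_RE = re.compile(r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def parse_traceparent(header: str) -> Optional[dict]:
    """Return the header's fields, or None if the header is malformed."""
    match = _TRACEPARENT_RE.match(header.strip())
    if not match:
        return None
    version, trace_id, parent_id, flags = match.groups()
    # All-zero IDs are invalid in the W3C format
    if trace_id == "0" * 32 or parent_id == "0" * 16:
        return None
    return {
        "version": version,
        "trace_id": trace_id,
        "parent_id": parent_id,
        "sampled": bool(int(flags, 16) & 0x01),
    }


def child_traceparent(header: str) -> Optional[str]:
    """Build the header for an outgoing call: same trace ID, fresh span ID."""
    ctx = parse_traceparent(header)
    if ctx is None:
        return None
    new_span_id = secrets.token_hex(8)  # 8 random bytes -> 16 hex characters
    flags = "01" if ctx["sampled"] else "00"  # this sketch keeps only the sampled bit
    return f"00-{ctx['trace_id']}-{new_span_id}-{flags}"


incoming = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
outgoing = child_traceparent(incoming)
```

Because the trace ID is carried unchanged across every hop, the spans emitted by the gateway, the agent runtime, and the tool server can all be stitched into one tree in the backend.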

Evaluation Engineering

Online vs Offline Evaluation

Evaluation engineering has two complementary dimensions:

| Dimension | Online Eval | Offline Eval |
|---|---|---|
| Timing | Real-time, per-request | Batch, scheduled or CI/CD triggered |
| Latency impact | Adds 200-500ms | None |
| Coverage | Sampled (1-10%) | Full test set |
| Use case | Quality monitoring, anomaly detection | Version comparison, regression testing |
| Eval complexity | Lightweight (latency, format) | Deep (faithfulness, completeness) |
| Cost | Proportional to production traffic | Fixed (dataset size × eval runs) |
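
As an illustration of the "lightweight (latency, format)" checks that suit online evaluation, here is a minimal sketch. The function name, the `answer` key, and the latency budget are assumptions made for the example; substitute your agent's actual output contract.

```python
import json
from dataclasses import dataclass


@dataclass
class OnlineCheckResult:
    passed: bool
    reasons: list[str]


def lightweight_online_check(
    raw_output: str,
    latency_ms: float,
    required_keys: tuple[str, ...] = ("answer",),  # hypothetical output contract
    latency_budget_ms: float = 30_000,              # hypothetical budget
) -> OnlineCheckResult:
    """Cheap per-request checks: valid JSON object, required keys present, latency within budget."""
    reasons: list[str] = []
    try:
        payload = json.loads(raw_output)
    except json.JSONDecodeError:
        reasons.append("output is not valid JSON")
    else:
        if not isinstance(payload, dict):
            reasons.append("output is not a JSON object")
        else:
            missing = [key for key in required_keys if key not in payload]
            if missing:
                reasons.append(f"missing keys: {missing}")

    if latency_ms > latency_budget_ms:
        reasons.append(f"latency {latency_ms:.0f}ms exceeds budget")

    return OnlineCheckResult(passed=not reasons, reasons=reasons)


ok = lightweight_online_check('{"answer": "Use the OTLP exporter."}', latency_ms=1200)
bad = lightweight_online_check("not json", latency_ms=45_000)
```

Checks like these need no LLM call, so they can run on every sampled request; deeper judgments such as faithfulness are left to the offline pipeline.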

LLM-as-Judge Implementation

LLM-as-Judge is the most widely adopted approach for evaluating Agent outputs. The core challenge is ensuring the evaluation itself is reliable.

typescript
// TypeScript: Structured LLM-as-Judge Evaluator
import { OpenAI } from "openai";
import { z } from "zod";

// Define structured output schema for evaluation dimensions
const EvalResultSchema = z.object({
  faithfulness: z.object({
    score: z.number().min(0).max(1),
    reasoning: z.string(),
    evidence: z.array(z.string()),
  }),
  relevance: z.object({
    score: z.number().min(0).max(1),
    reasoning: z.string(),
  }),
  completeness: z.object({
    score: z.number().min(0).max(1),
    missing_aspects: z.array(z.string()),
  }),
  hallucination: z.object({
    detected: z.boolean(),
    hallucinated_claims: z.array(z.string()),
    severity: z.enum(["none", "minor", "major", "critical"]),
  }),
});

type EvalResult = z.infer<typeof EvalResultSchema>;

interface EvalInput {
  query: string;
  context: string[];
  response: string;
  groundTruth?: string;
}

class LLMJudgeEvaluator {
  private client: OpenAI;
  private model: string;

  constructor(apiKey: string, model = "gpt-4o") {
    this.client = new OpenAI({ apiKey });
    this.model = model;
  }

  async evaluate(input: EvalInput): Promise<EvalResult> {
    const rubric = this.buildRubric(input);

    const response = await this.client.chat.completions.create({
      model: this.model,
      temperature: 0.1, // Low temperature for evaluation consistency
      response_format: { type: "json_object" },
      messages: [
        {
          role: "system",
          content: `You are an expert evaluator for AI agent outputs.
Evaluate the response strictly according to the rubric provided.
Return ONLY a JSON object matching the specified schema.
Be critical and precise - do not inflate scores.`,
        },
        {
          role: "user",
          content: rubric,
        },
      ],
    });

    const parsed = JSON.parse(response.choices[0].message.content!);
    return EvalResultSchema.parse(parsed);
  }

  private buildRubric(input: EvalInput): string {
    return `## Evaluation Task

### User Query
${input.query}

### Retrieved Context
${input.context.map((c, i) => `[${i + 1}] ${c}`).join("\n")}

### Agent Response
${input.response}

${input.groundTruth ? `### Ground Truth\n${input.groundTruth}` : ""}

### Scoring Rubric

**Faithfulness** (0-1): Does the response ONLY contain claims supported by the context?
- 1.0: Every claim is directly supported by context
- 0.7: Minor unsupported claims that don't affect correctness
- 0.3: Contains speculative claims without evidence
- 0.0: Fabricates information contradicting context

**Relevance** (0-1): Does the response address the user's actual question?
- 1.0: Directly and completely answers the query
- 0.5: Partially addresses the query with some tangential content
- 0.0: Completely off-topic

**Completeness** (0-1): Does the response cover all aspects of the query?
- 1.0: Addresses all sub-questions and aspects
- 0.5: Covers main points but misses important details
- 0.0: Only superficially touches the topic

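**Hallucination Detection**: Identify any claims NOT supported by context.

Return your evaluation as a JSON object with exactly these keys:
{
  "faithfulness": { "score": number, "reasoning": string, "evidence": string[] },
  "relevance": { "score": number, "reasoning": string },
  "completeness": { "score": number, "missing_aspects": string[] },
  "hallucination": { "detected": boolean, "hallucinated_claims": string[], "severity": "none" | "minor" | "major" | "critical" }
}`;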
  }
}
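
The evaluator above relies on a single judge model. The multi-model cross-validation mentioned in the Key Takeaways can be sketched roughly as follows, in Python and with the standard library only. The function name, the judge labels, and the 0.3 divergence threshold are assumptions for illustration; a real threshold should be calibrated against human labels.

```python
from statistics import mean


def cross_validate_scores(
    scores_by_judge: dict[str, float],
    max_divergence: float = 0.3,  # hypothetical threshold; calibrate against human labels
) -> dict:
    """Combine one dimension's scores from several judges and flag disagreement."""
    if len(scores_by_judge) < 2:
        raise ValueError("cross-validation needs at least two judges")
    values = list(scores_by_judge.values())
    divergence = max(values) - min(values)
    return {
        "score": mean(values),
        "divergence": divergence,
        "needs_human_review": divergence > max_divergence,
    }


# Judges that roughly agree: keep the averaged score.
agree = cross_validate_scores({"judge_a": 0.9, "judge_b": 0.8})

# Judges that disagree strongly: route the sample to a human reviewer.
disagree = cross_validate_scores({"judge_a": 0.9, "judge_b": 0.4})
```

Using the max-min spread keeps the rule easy to explain; with more than a handful of judges, a standard deviation or pairwise agreement measure may be a better fit.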

Custom Evaluation Metrics with Ragas

python
from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_precision,
    context_recall,
)
from datasets import Dataset

# Build evaluation dataset
eval_dataset = Dataset.from_dict({
    "question": [
        "How do I configure OpenTelemetry for LLM tracing?",
        "Which platform is better for self-hosted deployment, LangSmith or LangFuse?",
    ],
    "answer": [
        agent_responses[0],  # Actual agent output
        agent_responses[1],
    ],
    "contexts": [
        [retrieved_context_1],  # Retrieved context documents
        [retrieved_context_2],
    ],
    "ground_truth": [
        "Use OpenTelemetry SDK to create custom Spans with LLM-specific attributes...",
        "LangFuse supports self-hosted deployment and is better for private hosting...",
    ],
})

# Execute evaluation
results = evaluate(
    dataset=eval_dataset,
    metrics=[
        faithfulness,       # Is the output faithful to the context?
        answer_relevancy,   # Does the output answer the question?
        context_precision,  # Is the retrieved content relevant?
        context_recall,     # Was enough information retrieved?
    ],
)

print(results.to_pandas())
# Example output shape (values are illustrative):
#   faithfulness  answer_relevancy  context_precision  context_recall
# 0         0.92              0.88               0.85            0.78
# 1         0.95              0.91               0.90            0.82

Debugging Agent Failures

Five Failure Mode Categories

In production, 90% of Agent failures fall into five distinct patterns:

| Failure Mode | Symptoms | Detection Method | Root Cause |
|---|---|---|---|
| Infinite loops | Agent repeatedly calls the same tool | Repeated Span pattern detection | Missing stop condition / reasoning deadlock |
| Tool misuse | Wrong tool called or invalid parameters | Tool call success rate monitoring | Unclear tool descriptions / insufficient context |
| Hallucinated actions | Executing non-existent tools or fabricated parameters | Tool registry validation | Hallucination generalized to the action layer |
| Context overflow | Intermediate results lost, incomplete output | Token usage monitoring | Exceeded context window limits |
| Reasoning drift | Output gradually deviates from original goal | Semantic similarity tracking | Goal information decay in long reasoning chains |
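
Of the detection methods in the table, tool registry validation is the simplest to show in code. The sketch below assumes a deliberately simplified registry shape (tool name mapped to its required parameter names); the function name and the example tools are hypothetical.

```python
def validate_tool_call(
    registry: dict[str, set[str]],
    tool_name: str,
    parameters: dict,
) -> list[str]:
    """Return a list of problems; an empty list means the call is well-formed.

    `registry` maps each registered tool name to its required parameter names
    (a hypothetical, simplified registry shape).
    """
    if tool_name not in registry:
        # The model asked for a tool that does not exist: a hallucinated action
        return [f"unknown tool: {tool_name}"]
    missing = sorted(registry[tool_name] - set(parameters))
    # The tool exists but the call is incomplete: tool misuse
    return [f"missing parameter: {name}" for name in missing]


registry = {
    "search": {"query"},
    "send_email": {"to", "subject", "body"},
}

print(validate_tool_call(registry, "search", {"query": "otel"}))  # []
print(validate_tool_call(registry, "delete_database", {}))        # ['unknown tool: delete_database']
print(validate_tool_call(registry, "send_email", {"to": "a@example.com"}))
# ['missing parameter: body', 'missing parameter: subject']
```

Running this check before execution, and recording the result as a Span attribute, lets the trace backend separate hallucinated actions from ordinary tool failures.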

Time-Travel Debugging Implementation

python
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Callable


@dataclass
class AgentSnapshot:
    """Agent runtime snapshot for time-travel debugging"""
    timestamp: datetime
    step_index: int
    step_type: str
    input_state: dict
    output_state: dict
    llm_messages: list
    llm_response: str
    tool_calls: list = field(default_factory=list)
    metadata: dict = field(default_factory=dict)


class TimeTravelDebugger:
    """Time-travel debugger: captures snapshots at every step, supports replay and breakpoints"""

    def __init__(self):
        self.snapshots: list[AgentSnapshot] = []
        # Breakpoint name -> predicate evaluated against each new snapshot
        self.breakpoints: dict[str, Callable[[AgentSnapshot], bool]] = {}

    def capture(self, step_index: int, step_type: str, **kwargs) -> AgentSnapshot:
        """Capture a complete snapshot of the current step"""
        snapshot = AgentSnapshot(
            # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12
            timestamp=datetime.now(timezone.utc),
            step_index=step_index,
            step_type=step_type,
            input_state=kwargs.get("input_state", {}),
            output_state=kwargs.get("output_state", {}),
            llm_messages=kwargs.get("messages", []),
            llm_response=kwargs.get("response", ""),
            tool_calls=kwargs.get("tool_calls", []),
            metadata=kwargs.get("metadata", {}),
        )
        self.snapshots.append(snapshot)
        self._check_breakpoints(snapshot)
        return snapshot

    def replay_from(self, step_index: int) -> list[AgentSnapshot]:
        """Replay execution from a specific step"""
        return [s for s in self.snapshots if s.step_index >= step_index]

    def detect_loop(self, window_size: int = 5) -> bool:
        """Detect loop patterns: are the last N tool call sequences repeated?"""
        if len(self.snapshots) < window_size * 2:
            return False

        recent = self.snapshots[-window_size:]
        previous = self.snapshots[-window_size * 2:-window_size]

        recent_pattern = [(s.step_type, tuple(s.tool_calls)) for s in recent]
        previous_pattern = [(s.step_type, tuple(s.tool_calls)) for s in previous]

        return recent_pattern == previous_pattern

    def find_divergence_point(self, expected_trace: list[dict]) -> int:
        """Compare actual execution against expected path, find divergence point"""
        for i, (actual, expected) in enumerate(zip(self.snapshots, expected_trace)):
            if actual.step_type != expected.get("step_type"):
                return i
            if actual.output_state != expected.get("expected_output"):
                return i
        return -1

    def _check_breakpoints(self, snapshot: AgentSnapshot):
        """Check if any debug breakpoints are triggered"""
        for name, condition in self.breakpoints.items():
            if condition(snapshot):
                print(f"[BREAKPOINT] {name} triggered at step {snapshot.step_index}")
                self._dump_snapshot(snapshot)

    def _dump_snapshot(self, snapshot: AgentSnapshot):
        """Print snapshot details"""
        print(json.dumps({
            "step": snapshot.step_index,
            "type": snapshot.step_type,
            "input": snapshot.input_state,
            "output": snapshot.output_state,
            "tools": snapshot.tool_calls,
        }, indent=2, default=str))  # default=str keeps non-JSON values (e.g. datetimes) printable


# Usage example
debugger = TimeTravelDebugger()

# Set breakpoint: pause when loop detected
debugger.breakpoints["loop_detected"] = lambda s: debugger.detect_loop()

# Set breakpoint: token usage exceeds threshold
debugger.breakpoints["token_overflow"] = lambda s: s.metadata.get("total_tokens", 0) > 100000

Log Correlation Strategy

typescript
// TypeScript: Structured logging with Trace correlation
import { SpanContext, trace } from "@opentelemetry/api";
import pino from "pino";

interface AgentLogEntry {
  traceId: string;
  spanId: string;
  level: "info" | "warn" | "error" | "debug";
  event: string;
  agentId: string;
  stepIndex: number;
  data: Record<string, unknown>;
}

class AgentLogger {
  private logger: pino.Logger;

  constructor(serviceName: string) {
    this.logger = pino({
      name: serviceName,
      formatters: {
        log: (obj) => {
          // Auto-inject current Trace context
          const span = trace.getActiveSpan();
          if (span) {
            const ctx: SpanContext = span.spanContext();
            return {
              ...obj,
              traceId: ctx.traceId,
              spanId: ctx.spanId,
              traceFlags: ctx.traceFlags,
            };
          }
          return obj;
        },
      },
    });
  }

  logAgentStep(entry: Omit<AgentLogEntry, "traceId" | "spanId">): void {
    const span = trace.getActiveSpan();
    const ctx = span?.spanContext();

    this.logger[entry.level]({
      traceId: ctx?.traceId ?? "unknown",
      spanId: ctx?.spanId ?? "unknown",
      event: entry.event,
      agentId: entry.agentId,
      stepIndex: entry.stepIndex,
      ...entry.data,
    });
  }

  logToolResult(toolName: string, success: boolean, latencyMs: number): void {
    this.logAgentStep({
      level: success ? "info" : "error",
      event: "tool_execution_complete",
      agentId: "current",
      stepIndex: -1,
      data: { toolName, success, latencyMs },
    });
  }
}

Building the Observability Stack

Architecture Overview

graph TB
    subgraph "Application Layer"
        A1[Agent Runtime]
        A2[LLM Gateway]
        A3[Tool Registry]
    end
    subgraph "Collection Layer"
        B1["OTel SDK - Traces"]
        B2["LangFuse SDK - Evals"]
        B3["Structured Logger - Logs"]
    end
    subgraph "Transport Layer"
        C1[OTel Collector]
        C2["Kafka / Pulsar - Event Bus"]
    end
    subgraph "Storage Layer"
        D1["Tempo / Jaeger - Trace Store"]
        D2["PostgreSQL - Eval Store"]
        D3["Loki / ES - Log Store"]
        D4["Prometheus - Metrics"]
    end
    subgraph "Analysis Layer"
        E1["Grafana - Dashboard"]
        E2["Alert Manager - Alerting"]
        E3["Eval Pipeline - Evaluation"]
        E4["Root Cause - Analysis"]
    end
    A1 --> B1
    A1 --> B2
    A1 --> B3
    A2 --> B1
    A3 --> B1
    B1 --> C1
    B2 --> C2
    B3 --> C2
    C1 --> D1
    C1 --> D4
    C2 --> D2
    C2 --> D3
    D1 --> E1
    D2 --> E3
    D3 --> E4
    D4 --> E1
    D4 --> E2

Tool Comparison Matrix

| Dimension | LangSmith | LangFuse | Arize Phoenix | Custom Build |
|---|---|---|---|---|
| Deployment complexity | ⭐ Zero deploy | ⭐⭐ Docker/K8s | ⭐⭐ Docker | ⭐⭐⭐⭐⭐ |
| Data control | ❌ Cloud-hosted | ✅ Full control | ✅ Full control | ✅ Full control |
| Evaluation capability | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐ (DIY) |
| Ecosystem integration | LangChain native | Framework-agnostic | Framework-agnostic | Fully custom |
| Team collaboration | ✅ Built-in | ✅ Built-in | ⚠️ Basic | ❌ DIY |
| Monthly cost | $400-2000 | Self-hosting cost | Self-hosting cost | Infra + engineering |
| Best stage | Rapid validation / mid-size | Large enterprise / compliance | Research / small teams | Hyperscale |

Sampling Strategy Design

python
from enum import Enum
import hashlib


class ImportanceLevel(Enum):
    CRITICAL = "critical"   # 100% sampling - payments, high-risk decisions
    HIGH = "high"           # 50% sampling - tool failures, long-chain reasoning
    MEDIUM = "medium"       # 10% sampling - normal requests
    LOW = "low"             # 1% sampling - health checks, internal calls


class AdaptiveSampler:
    """Importance-based adaptive sampler"""

    SAMPLE_RATES = {
        ImportanceLevel.CRITICAL: 1.0,
        ImportanceLevel.HIGH: 0.5,
        ImportanceLevel.MEDIUM: 0.1,
        ImportanceLevel.LOW: 0.01,
    }

    def __init__(self, error_boost_multiplier: float = 5.0):
        self.error_boost = error_boost_multiplier
        self.error_rate_window: list[bool] = []

    def should_sample(
        self,
        trace_id: str,
        importance: ImportanceLevel,
        is_error: bool = False,
    ) -> bool:
        """Determine whether to sample the current Trace"""
        base_rate = self.SAMPLE_RATES[importance]

        # Boost sampling rate for error requests
        if is_error:
            base_rate = min(1.0, base_rate * self.error_boost)

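        # Deterministic sampling based on trace_id (consistent for same trace)
        hash_value = int(hashlib.md5(trace_id.encode()).hexdigest()[:8], 16)
        # Divide by 2**32 so the value lies in [0, 1); dividing by 0xFFFFFFFF
        # could yield exactly 1.0 and drop a trace even at a 100% rate
        threshold = hash_value / 0x100000000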

        return threshold < base_rate

    def classify_importance(self, request_metadata: dict) -> ImportanceLevel:
        """Classify request importance based on metadata"""
        if request_metadata.get("involves_payment"):
            return ImportanceLevel.CRITICAL
        if request_metadata.get("tool_count", 0) > 5:
            return ImportanceLevel.HIGH
        if request_metadata.get("is_internal"):
            return ImportanceLevel.LOW
        return ImportanceLevel.MEDIUM
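
As a rough worked example of how tiered rates translate into storage savings, the arithmetic below combines the sampling rates above with a traffic mix. The traffic shares are invented for illustration; the result also ignores the error boost and assumes all traces are the same size.

```python
# Sampling rates from the tiers above; the traffic mix is a made-up example.
sample_rates = {"critical": 1.0, "high": 0.5, "medium": 0.1, "low": 0.01}
traffic_mix = {"critical": 0.02, "high": 0.08, "medium": 0.70, "low": 0.20}  # hypothetical shares

# The shares must cover all traffic
assert abs(sum(traffic_mix.values()) - 1.0) < 1e-9

# Expected fraction of traces kept = sum over tiers of (share x sampling rate)
retained = sum(traffic_mix[tier] * sample_rates[tier] for tier in sample_rates)

print(f"retained fraction: {retained:.3f}")      # retained fraction: 0.132
print(f"storage reduction: {1 - retained:.1%}")  # storage reduction: 86.8%
```

The outcome is dominated by the medium tier, so the share of "normal" traffic and its rate are the first numbers to measure before relying on any savings estimate.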

Production Best Practices

1. Tiered Alerting Strategy

python
# Alert rule configuration
ALERT_RULES = {
    "agent_loop_detected": {
        "condition": "span_repeat_count > 3 within 30s",
        "severity": "critical",
        "action": "auto_terminate + page_oncall",
    },
    "hallucination_rate_spike": {
        "condition": "hallucination_rate > 0.15 for 5min",
        "severity": "high",
        "action": "alert_oncall + increase_sampling",
    },
    "latency_p99_breach": {
        "condition": "agent_latency_p99 > 30s for 3min",
        "severity": "medium",
        "action": "alert_channel",
    },
    "token_cost_anomaly": {
        "condition": "hourly_token_cost > 2x daily_average",
        "severity": "high",
        "action": "alert_oncall + enable_rate_limit",
    },
    "eval_score_degradation": {
        "condition": "faithfulness_score_avg < 0.7 for 15min",
        "severity": "high",
        "action": "rollback_prompt_version",
    },
}
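
The conditions above are written as strings such as "hallucination_rate > 0.15 for 5min". The sketch below shows one way the "above threshold for a sustained period" semantics could be evaluated in plain Python. The class name is hypothetical; in a Prometheus/Alertmanager setup, the `for` clause of an alerting rule typically plays this role instead.

```python
from typing import Optional


class SustainedThresholdAlert:
    """Fires when a metric stays above `threshold` for at least `duration_s` seconds."""

    def __init__(self, threshold: float, duration_s: float):
        self.threshold = threshold
        self.duration_s = duration_s
        self._breach_start: Optional[float] = None  # timestamp of the first breaching sample

    def observe(self, timestamp_s: float, value: float) -> bool:
        """Record one sample; return True while the alert should be firing."""
        if value <= self.threshold:
            self._breach_start = None  # metric recovered, reset the timer
            return False
        if self._breach_start is None:
            self._breach_start = timestamp_s
        return timestamp_s - self._breach_start >= self.duration_s


# "hallucination_rate > 0.15 for 5min", fed with (seconds, rate) samples
alert = SustainedThresholdAlert(threshold=0.15, duration_s=300)
readings = [(0, 0.10), (60, 0.20), (180, 0.22), (360, 0.25), (420, 0.12)]
fired = [alert.observe(t, v) for t, v in readings]
print(fired)  # [False, False, False, True, False]
```

Requiring a sustained breach rather than a single sample keeps one bad evaluation batch from paging the on-call engineer.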

2. Trace Data Lifecycle Management

| Data Type | Retention | Storage Tier | Rationale |
|---|---|---|---|
| Full Trace (with LLM I/O) | 7 days | Hot Storage | Active debugging |
| Trace metadata (no I/O) | 90 days | Warm Storage | Trend analysis |
| Evaluation scores | Permanent | Cold Storage | Historical comparison |
| Aggregated metrics | Permanent | Time-series DB | Dashboards |
| Error Trace snapshots | 365 days | Hot Storage | Post-mortem analysis |
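
The retention policy above can be encoded as data so that a cleanup job applies it consistently. This is a minimal sketch; the dictionary keys and the `is_expired` helper are hypothetical names, and the windows are taken from the table.

```python
from datetime import timedelta
from typing import Optional

# Retention windows from the table above; None means "keep permanently".
RETENTION: dict[str, Optional[timedelta]] = {
    "full_trace": timedelta(days=7),
    "trace_metadata": timedelta(days=90),
    "eval_scores": None,
    "aggregated_metrics": None,
    "error_snapshot": timedelta(days=365),
}


def is_expired(data_type: str, age: timedelta) -> bool:
    """True if a record of this type and age is past its retention window."""
    window = RETENTION[data_type]
    return window is not None and age > window


print(is_expired("full_trace", timedelta(days=8)))       # True
print(is_expired("trace_metadata", timedelta(days=8)))   # False
print(is_expired("eval_scores", timedelta(days=10_000))) # False
```

Keeping the policy in one table-like structure makes it straightforward to review alongside privacy requirements, since full traces with LLM I/O are the most sensitive and the shortest-lived.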

3. Observability Maturity Model

graph LR
    L0["L0: Logs Only - console.log"]
    L1["L1: Basic Tracing - Manual Inspection"]
    L2["L2: Structured Eval - Automated Regression"]
    L3["L3: Real-time Dashboards - Anomaly Alerting"]
    L4["L4: Auto Root Cause - Self-healing"]
    L0 --> L1 --> L2 --> L3 --> L4
    style L0 fill:#ffcdd2
    style L1 fill:#fff9c4
    style L2 fill:#c8e6c9
    style L3 fill:#b3e5fc
    style L4 fill:#e1bee7

Minimum production readiness: Level 3

  • L0 → L1: Integrate OpenTelemetry + tracing platform (1-2 days)
  • L1 → L2: Build Golden Dataset + CI evaluation pipeline (1-2 weeks)
  • L2 → L3: Grafana dashboards + AlertManager rules (1 week)
  • L3 → L4: Anomaly detection ML models + auto-rollback pipeline (1-2 months)

4. Security and Privacy

python
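import re


class PIIScrubber:
    """Scrub PII from Trace data before storage"""

    # Patterns are illustrative, not exhaustive. The phone pattern matches
    # 11-digit numbers written in 3-4-4 groups; adapt it to your locale.
    PATTERNS = {
        "email": re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}"),
        "phone": re.compile(r"\b\d{3}[-.]?\d{4}[-.]?\d{4}\b"),
        "credit_card": re.compile(r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b"),
    }

    def scrub(self, text: str) -> str:
        """Replace every match of each pattern with a typed redaction marker"""
        for pii_type, pattern in self.PATTERNS.items():
            text = pattern.sub(f"[REDACTED_{pii_type.upper()}]", text)
        return text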

FAQ

Q1: How is Agent observability different from traditional APM?

Traditional APM monitors deterministic metrics like latency, error rates, and throughput. Agent observability must capture non-deterministic LLM reasoning paths, multi-step decision chains, tool call quality, and semantic output evaluation. A single user request may trigger 10-50 internal LLM calls, each producing different output distributions.

Q2: Should I choose LangSmith or LangFuse?

LangSmith is ideal for teams heavily invested in the LangChain ecosystem—it offers the most complete feature set but is closed-source. LangFuse is open-source and supports self-hosting, making it better for enterprises with data privacy requirements. If you need full data control, choose LangFuse; if you want the best integrated playground and dataset management, choose LangSmith.

Q3: How do you implement LLM-as-Judge evaluation without introducing new hallucinations?

Constrain evaluation to specific dimensions with structured rubrics. Split evaluation into independent axes (faithfulness, relevance, completeness), require JSON structured output with confidence scores, use multiple models for cross-validation, and trigger human review when inter-model score divergence exceeds a threshold.

Q4: How do you debug infinite loops in production AI agents?

Deploy loop detection via repeated Span pattern matching in your trace backend. During investigation, examine the Trace Timeline for repeated tool call sequences and check reasoning logs for repetitive thinking patterns. A durable fix requires max iteration limits, state-change detection, and deadlock timeouts at the agent framework level.
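
Two of the safeguards named above, max iteration limits and state-change detection, can be sketched with the standard library. `LoopGuard`, its default limit, and the stop-reason strings are hypothetical; timeouts are not covered here and would be enforced by the surrounding runtime.

```python
import hashlib
import json
from typing import Optional


class LoopGuard:
    """Stops an agent loop on too many iterations or on a repeated state."""

    def __init__(self, max_iterations: int = 20):  # hypothetical default
        self.max_iterations = max_iterations
        self.iterations = 0
        self._seen_states: set[str] = set()

    def check(self, state: dict) -> Optional[str]:
        """Return a stop reason, or None if the agent may continue."""
        self.iterations += 1
        if self.iterations > self.max_iterations:
            return "max_iterations_exceeded"
        # Hash a canonical JSON form so equal states map to the same digest
        canonical = json.dumps(state, sort_keys=True, default=str)
        digest = hashlib.sha256(canonical.encode()).hexdigest()
        if digest in self._seen_states:
            return "repeated_state"
        self._seen_states.add(digest)
        return None


guard = LoopGuard(max_iterations=5)
print(guard.check({"tool": "search", "query": "otel"}))      # None
print(guard.check({"tool": "search", "query": "otel sdk"}))  # None
print(guard.check({"tool": "search", "query": "otel"}))      # repeated_state
```

Calling `check` once per agent step, with the tool name and arguments as the state, catches exact repeats cheaply; near-duplicate loops would need a looser comparison such as the windowed pattern match shown earlier.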

Q5: How do you measure Agent observability maturity?

Use a 5-level model: L0 (logs only) → L1 (basic tracing, manual inspection) → L2 (structured eval, automated regression) → L3 (real-time dashboards, anomaly alerts) → L4 (automated root cause analysis, self-healing). Most teams are stuck at L1; production readiness requires at least L3.


Summary

Agent observability engineering is not an optional add-on—it is a prerequisite for production deployment. The three-pillar architecture presented in this guide—Trace, Eval, Debug—forms a complete system from data collection to problem resolution:

  1. Trace Layer provides raw data: OpenTelemetry captures every step's input, output, latency, and token usage
  2. Eval Layer defines quality standards: LLM-as-Judge and Ragas quantify faithfulness, relevance, and completeness
  3. Debug Layer closes the loop: Time-travel replay and loop detection pinpoint root causes

The ROI of observability investment in Agent systems far exceeds that in traditional applications. An Agent system without tracing is like driving in the dark: when an accident happens, you do not even know why.

Use JSON Formatter to format evaluation data output, or leverage Text Diff to compare prompt version changes. For more Agent engineering practices, explore other articles in this series.