核心摘要

AI Agent 的非确定性本质让传统可观测性方案完全失效——你无法用 HTTP 状态码判断一个"幻觉回答"是否是 Bug。本文提出 Agent 可观测性工程的三大支柱架构:分布式追踪(Trace) 捕获多步决策路径,评估工程(Eval) 量化输出质量,故障调试(Debugging) 实现根因定位。结合 OpenTelemetry、LangSmith、LangFuse 和 Arize Phoenix 的生产级代码实现,构建从开发到运维的闭环可观测体系。

本文是 智能体从验证到上线的十大陷阱 的可观测性深度扩展,建议结合 企业 LLMOps 平台架构 一起阅读。


目录

  1. 核心要点
  2. 为什么 Agent 可观测性不同于传统 APM
  3. 三大支柱架构总览
  4. 分布式追踪:捕获 Agent 决策路径
  5. 评估工程:量化 Agent 输出质量
  6. 故障调试:Agent 失败模式与根因定位
  7. 构建生产级可观测性技术栈
  8. 生产环境最佳实践
  9. 常见问题
  10. 总结
  11. 相关资源

核心要点

  • Agent 可观测性是三维问题:Trace 解决"发生了什么",Eval 解决"质量如何",Debug 解决"为什么出错"——三者缺一不可
  • OpenTelemetry 是 Trace 层的标准选择:通过自定义 Span 属性记录 LLM 特有信息(Token、模型、Temperature),利用现有基础设施
  • LLM-as-Judge 不是银弹:必须配合结构化 Rubric、多模型交叉验证和人工校准才能可靠
  • 90% 的 Agent 故障可归为 5 种模式:无限循环、工具误用、幻觉行动、上下文溢出、推理漂移
  • 可观测性必须在第一天构建:后期补装的成本是初始构建的 10 倍,且覆盖率永远不完整
  • 采样策略决定成本上限:生产环境全量采集不现实,基于重要性的分层采样可降低 80% 存储成本

为什么 Agent 可观测性不同于传统 APM

传统应用的可观测性建立在确定性假设之上:相同输入产生相同输出,错误有明确的异常类型,性能瓶颈可通过延迟分布定位。AI Agent 彻底打破了这些假设。

维度 传统 APM Agent 可观测性
输出确定性 相同输入 → 相同输出 相同输入 → 不同输出(Temperature > 0)
错误定义 HTTP 5xx / 异常 语义错误(幻觉、不相关、有害)
调用链深度 通常 3-10 跳 单次请求可达 10-50 次 LLM 调用
性能指标 延迟、吞吐量 + Token 用量、推理质量、忠实度
根因分析 堆栈追踪 需要语义级推理路径回溯
成本模型 固定基础设施成本 按 Token 动态计费,成本不可预测
python
# 传统 APM:一个请求 = 一个 Span
# Agent 可观测性:一个用户请求 = 一棵 Span 树

# 传统方式
@trace_request
def handle_request(request):
    result = process(request)
    return result  # 200 OK = 成功

# Agent 方式 —— 200 OK 不代表成功
@trace_agent_request
def handle_agent_request(request):
    plan = await llm.plan(request)         # Span: planning
    for step in plan.steps:
        tool_result = await execute(step)  # Span: tool_call
        validation = await llm.validate(tool_result)  # Span: validation
        if not validation.is_faithful:
            # HTTP 200,但语义上是失败的
            raise SemanticError("Output not faithful to source")
    return synthesize(results)

Agent 的一次用户请求可能经过以下路径:意图识别 → 计划生成 → 工具选择 → 参数构造 → 工具执行 → 结果验证 → 输出合成。每一步都是非确定性的,每一步都可能产生语义错误,而这些错误不会抛出异常。


三大支柱架构总览

graph TB subgraph "用户请求" U[用户输入] end subgraph "Trace 层 - 发生了什么" T1[分布式追踪] T2[Span 树构建] T3["Token/延迟/成本"] T4[上下文传播] end subgraph "Eval 层 - 质量如何" E1[在线评估] E2[离线评估] E3[LLM-as-Judge] E4[自定义指标] end subgraph "Debug 层 - 为什么出错" D1[故障模式识别] D2[Time-Travel 回放] D3[日志关联] D4[根因分析] end U --> T1 T1 --> T2 --> T3 --> T4 T4 --> E1 E1 --> E2 --> E3 --> E4 E4 --> D1 D1 --> D2 --> D3 --> D4 style T1 fill:#e1f5fe style E1 fill:#f3e5f5 style D1 fill:#fff3e0

三大支柱之间的关系是:Trace 提供数据基础,Eval 定义质量标准,Debug 实现问题闭环。缺少 Trace,Eval 和 Debug 就无数据可用;缺少 Eval,有 Trace 也无法判断是否有问题;缺少 Debug 能力,发现问题也无法修复。


分布式追踪:捕获 Agent 决策路径

OpenTelemetry 集成方案

OpenTelemetry 是可观测性领域的事实标准,通过扩展 Semantic Conventions 可以完美适配 LLM 场景。核心思路是将每次 LLM 调用、工具调用和推理步骤建模为 Span,同时通过自定义属性记录 Agent 特有信息。

python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource

# 初始化 Tracer
resource = Resource.create({
    "service.name": "agent-service",
    "service.version": "1.2.0",
    "deployment.environment": "production",
})

provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="http://otel-collector:4317"))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("agent.core", "1.0.0")


class AgentTracer:
    """Agent 可观测性追踪器,封装 OpenTelemetry Span 创建逻辑"""

    def __init__(self, tracer):
        self.tracer = tracer

    def trace_llm_call(self, model: str, messages: list, temperature: float = 0.7):
        """追踪单次 LLM 调用"""
        span = self.tracer.start_span(
            name=f"llm.chat.{model}",
            attributes={
                "llm.model": model,
                "llm.temperature": temperature,
                "llm.message_count": len(messages),
                "llm.system_prompt_tokens": self._count_tokens(messages[0]) if messages else 0,
            }
        )
        return span

    def trace_tool_call(self, tool_name: str, parameters: dict):
        """追踪工具调用"""
        span = self.tracer.start_span(
            name=f"tool.execute.{tool_name}",
            attributes={
                "tool.name": tool_name,
                "tool.parameters": str(parameters)[:1024],  # 截断防溢出
            }
        )
        return span

    def trace_agent_step(self, step_type: str, step_index: int):
        """追踪 Agent 推理步骤"""
        span = self.tracer.start_span(
            name=f"agent.step.{step_type}",
            attributes={
                "agent.step.type": step_type,
                "agent.step.index": step_index,
            }
        )
        return span

    def _count_tokens(self, message) -> int:
        return len(str(message)) // 4  # 粗略估算

LangSmith / LangFuse 实战对比

特性 LangSmith LangFuse Arize Phoenix
开源 ❌ 闭源 ✅ MIT ✅ Apache 2.0
自托管
LangChain 集成 原生深度集成 SDK 集成 SDK 集成
Playground ✅ 功能强大 ✅ 基础 ✅ 基础
Dataset 管理 ✅ 内置 ✅ 内置 ✅ 内置
评估框架 ✅ 完整 ✅ 完整 ✅ Phoenix Evals
实时监控
定价模型 按 Trace 量 自托管免费 自托管免费
适合场景 LangChain 重度用户 数据敏感型企业 研究型团队

LangFuse 集成示例(Python):

python
from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context

langfuse = Langfuse(
    public_key="pk-xxx",
    secret_key="sk-xxx",
    host="https://your-langfuse-instance.com"
)


@observe(name="agent-pipeline")
async def run_agent_pipeline(user_query: str, session_id: str):
    """完整的 Agent 流水线追踪"""
    langfuse_context.update_current_trace(
        session_id=session_id,
        user_id="user-123",
        metadata={"pipeline_version": "2.1.0"}
    )

    # Step 1: 意图识别
    intent = await classify_intent(user_query)

    # Step 2: 计划生成
    plan = await generate_plan(user_query, intent)

    # Step 3: 执行计划
    results = []
    for step in plan.steps:
        result = await execute_step(step)
        results.append(result)

    # Step 4: 合成输出
    output = await synthesize_output(results, user_query)

    # 记录评估分数
    langfuse_context.score_current_trace(
        name="output_quality",
        value=await evaluate_output(output, user_query),
        comment="Automated quality score"
    )

    return output


@observe(name="classify-intent", capture_input=True, capture_output=True)
async def classify_intent(query: str) -> str:
    """意图分类 - 自动追踪输入输出"""
    response = await llm.chat(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Classify user intent into categories..."},
            {"role": "user", "content": query}
        ],
        temperature=0.1
    )
    return response.content


@observe(name="execute-tool")
async def execute_step(step):
    """工具执行步骤 - 追踪工具调用细节"""
    langfuse_context.update_current_observation(
        metadata={"tool": step.tool_name, "retry_count": 0}
    )
    try:
        result = await tool_registry.execute(step.tool_name, step.parameters)
        langfuse_context.update_current_observation(
            level="DEFAULT",
            status_message="success"
        )
        return result
    except Exception as e:
        langfuse_context.update_current_observation(
            level="ERROR",
            status_message=str(e)
        )
        raise

Trace 流转架构

sequenceDiagram participant User as 用户 participant Gateway as API Gateway participant Agent as Agent Runtime participant LLM as LLM Provider participant Tool as Tool Server participant Collector as OTel Collector participant Backend as Trace Backend User->>Gateway: POST /chat (trace_id=abc123) Gateway->>Agent: Forward + inject W3C traceparent Agent->>LLM: Plan generation Agent-->>Collector: Span: llm.plan (tokens=450, latency=1.2s) LLM-->>Agent: Plan with 3 steps loop For each step Agent->>Tool: Execute tool call Agent-->>Collector: Span: tool.execute (tool=search) Tool-->>Agent: Tool result Agent->>LLM: Validate result Agent-->>Collector: Span: llm.validate (faithful=true) LLM-->>Agent: Validation result end Agent->>LLM: Synthesize final output Agent-->>Collector: Span: llm.synthesize (tokens=800) LLM-->>Agent: Final response Agent->>Gateway: Response Gateway->>User: Final answer Collector->>Backend: Batch export spans

评估工程:量化 Agent 输出质量

在线评估 vs 离线评估

评估工程分为两个互补维度:

维度 在线评估(Online Eval) 离线评估(Offline Eval)
执行时机 实时,随请求触发 批量,定时或 CI/CD 触发
延迟影响 增加 200-500ms 无影响
覆盖率 采样(1-10%) 全量测试集
适用场景 质量监控、异常检测 版本对比、回归测试
评估复杂度 轻量指标(延迟、格式) 深度指标(忠实度、完整性)
成本 生产流量比例 固定(测试集大小 × 评估次数)

LLM-as-Judge 模式实现

LLM-as-Judge 是当前最主流的 Agent 输出评估方式。核心挑战在于确保评估本身的可靠性。

typescript
// TypeScript: 结构化 LLM-as-Judge 评估器
import { OpenAI } from "openai";
import { z } from "zod";

// 定义评估维度的结构化输出 Schema
const EvalResultSchema = z.object({
  faithfulness: z.object({
    score: z.number().min(0).max(1),
    reasoning: z.string(),
    evidence: z.array(z.string()),
  }),
  relevance: z.object({
    score: z.number().min(0).max(1),
    reasoning: z.string(),
  }),
  completeness: z.object({
    score: z.number().min(0).max(1),
    missing_aspects: z.array(z.string()),
  }),
  hallucination: z.object({
    detected: z.boolean(),
    hallucinated_claims: z.array(z.string()),
    severity: z.enum(["none", "minor", "major", "critical"]),
  }),
});

type EvalResult = z.infer<typeof EvalResultSchema>;

interface EvalInput {
  query: string;
  context: string[];
  response: string;
  groundTruth?: string;
}

class LLMJudgeEvaluator {
  private client: OpenAI;
  private model: string;

  constructor(apiKey: string, model = "gpt-4o") {
    this.client = new OpenAI({ apiKey });
    this.model = model;
  }

  async evaluate(input: EvalInput): Promise<EvalResult> {
    const rubric = this.buildRubric(input);

    const response = await this.client.chat.completions.create({
      model: this.model,
      temperature: 0.1, // 低温度提高评估一致性
      response_format: { type: "json_object" },
      messages: [
        {
          role: "system",
          content: `You are an expert evaluator for AI agent outputs.
Evaluate the response strictly according to the rubric provided.
Return ONLY a JSON object matching the specified schema.
Be critical and precise - do not inflate scores.`,
        },
        {
          role: "user",
          content: rubric,
        },
      ],
    });

    const parsed = JSON.parse(response.choices[0].message.content!);
    return EvalResultSchema.parse(parsed);
  }

  private buildRubric(input: EvalInput): string {
    return `## Evaluation Task

### User Query
${input.query}

### Retrieved Context
${input.context.map((c, i) => `[${i + 1}] ${c}`).join("\n")}

### Agent Response
${input.response}

${input.groundTruth ? `### Ground Truth\n${input.groundTruth}` : ""}

### Scoring Rubric

**Faithfulness** (0-1): Does the response ONLY contain claims supported by the context?
- 1.0: Every claim is directly supported by context
- 0.7: Minor unsupported claims that don't affect correctness
- 0.3: Contains speculative claims without evidence
- 0.0: Fabricates information contradicting context

**Relevance** (0-1): Does the response address the user's actual question?
- 1.0: Directly and completely answers the query
- 0.5: Partially addresses the query with some tangential content
- 0.0: Completely off-topic

**Completeness** (0-1): Does the response cover all aspects of the query?
- 1.0: Addresses all sub-questions and aspects
- 0.5: Covers main points but misses important details
- 0.0: Only superficially touches the topic

**Hallucination Detection**: Identify any claims NOT supported by context.

Return your evaluation as a JSON object.`;
  }
}

自定义评估指标:Ragas 集成

python
from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_precision,
    context_recall,
)
from datasets import Dataset

# 构建评估数据集
eval_dataset = Dataset.from_dict({
    "question": [
        "如何配置 OpenTelemetry 进行 LLM 追踪?",
        "LangSmith 和 LangFuse 哪个更适合私有化部署?",
    ],
    "answer": [
        agent_responses[0],  # Agent 实际输出
        agent_responses[1],
    ],
    "contexts": [
        [retrieved_context_1],  # 检索到的上下文
        [retrieved_context_2],
    ],
    "ground_truth": [
        "使用 OpenTelemetry SDK 创建自定义 Span...",
        "LangFuse 支持自托管部署,适合私有化需求...",
    ],
})

# 执行评估
results = evaluate(
    dataset=eval_dataset,
    metrics=[
        faithfulness,       # 忠实度:输出是否忠于上下文
        answer_relevancy,   # 相关性:输出是否回答了问题
        context_precision,  # 上下文精度:检索内容是否相关
        context_recall,     # 上下文召回:是否检索到足够信息
    ],
)

print(results.to_pandas())
# Output:
#   faithfulness  answer_relevancy  context_precision  context_recall
# 0         0.92              0.88               0.85            0.78
# 1         0.95              0.91               0.90            0.82

故障调试:Agent 失败模式与根因定位

五大故障模式分类

生产环境中 90% 的 Agent 故障可归为以下五种模式:

故障模式 表现 检测方法 根因
无限循环 Agent 反复调用同一工具 Span 重复模式检测 停止条件缺失 / 推理死锁
工具误用 调用错误工具或传入无效参数 工具调用成功率监控 工具描述不清晰 / 上下文不足
幻觉行动 执行不存在的工具或虚构参数 工具注册表校验 幻觉泛化到行动层
上下文溢出 中间结果丢失,输出不完整 Token 用量监控 超出上下文窗口限制
推理漂移 输出逐渐偏离原始目标 语义相似度追踪 长链推理中目标信息衰减

Time-Travel 调试实现

python
import json
from dataclasses import dataclass, field
from typing import Any
from datetime import datetime


@dataclass
class AgentSnapshot:
    """Agent 运行时快照,支持 Time-Travel 调试"""
    timestamp: datetime
    step_index: int
    step_type: str
    input_state: dict
    output_state: dict
    llm_messages: list
    llm_response: str
    tool_calls: list = field(default_factory=list)
    metadata: dict = field(default_factory=dict)


class TimeTravelDebugger:
    """Time-Travel 调试器:记录每一步快照,支持回放和断点"""

    def __init__(self):
        self.snapshots: list[AgentSnapshot] = []
        self.breakpoints: dict[str, callable] = {}

    def capture(self, step_index: int, step_type: str, **kwargs) -> AgentSnapshot:
        """捕获当前步骤的完整快照"""
        snapshot = AgentSnapshot(
            timestamp=datetime.utcnow(),
            step_index=step_index,
            step_type=step_type,
            input_state=kwargs.get("input_state", {}),
            output_state=kwargs.get("output_state", {}),
            llm_messages=kwargs.get("messages", []),
            llm_response=kwargs.get("response", ""),
            tool_calls=kwargs.get("tool_calls", []),
            metadata=kwargs.get("metadata", {}),
        )
        self.snapshots.append(snapshot)
        self._check_breakpoints(snapshot)
        return snapshot

    def replay_from(self, step_index: int) -> list[AgentSnapshot]:
        """从指定步骤开始回放"""
        return [s for s in self.snapshots if s.step_index >= step_index]

    def detect_loop(self, window_size: int = 5) -> bool:
        """检测循环模式:连续 N 步的工具调用序列是否重复"""
        if len(self.snapshots) < window_size * 2:
            return False

        recent = self.snapshots[-window_size:]
        previous = self.snapshots[-window_size * 2:-window_size]

        recent_pattern = [(s.step_type, tuple(s.tool_calls)) for s in recent]
        previous_pattern = [(s.step_type, tuple(s.tool_calls)) for s in previous]

        return recent_pattern == previous_pattern

    def find_divergence_point(self, expected_trace: list[dict]) -> int:
        """对比实际执行和预期路径,找到分叉点"""
        for i, (actual, expected) in enumerate(zip(self.snapshots, expected_trace)):
            if actual.step_type != expected.get("step_type"):
                return i
            if actual.output_state != expected.get("expected_output"):
                return i
        return -1

    def _check_breakpoints(self, snapshot: AgentSnapshot):
        """检查是否触发调试断点"""
        for name, condition in self.breakpoints.items():
            if condition(snapshot):
                print(f"[BREAKPOINT] {name} triggered at step {snapshot.step_index}")
                self._dump_snapshot(snapshot)

    def _dump_snapshot(self, snapshot: AgentSnapshot):
        """输出快照详情"""
        print(json.dumps({
            "step": snapshot.step_index,
            "type": snapshot.step_type,
            "input": snapshot.input_state,
            "output": snapshot.output_state,
            "tools": snapshot.tool_calls,
        }, indent=2, ensure_ascii=False))


# 使用示例
debugger = TimeTravelDebugger()

# 设置断点:当检测到循环时暂停
debugger.breakpoints["loop_detected"] = lambda s: debugger.detect_loop()

# 设置断点:Token 用量超过阈值
debugger.breakpoints["token_overflow"] = lambda s: s.metadata.get("total_tokens", 0) > 100000

日志关联策略

typescript
// TypeScript: 结构化日志与 Trace 关联
import { SpanContext, trace } from "@opentelemetry/api";
import pino from "pino";

interface AgentLogEntry {
  traceId: string;
  spanId: string;
  level: "info" | "warn" | "error" | "debug";
  event: string;
  agentId: string;
  stepIndex: number;
  data: Record<string, unknown>;
}

class AgentLogger {
  private logger: pino.Logger;

  constructor(serviceName: string) {
    this.logger = pino({
      name: serviceName,
      formatters: {
        log: (obj) => {
          // 自动注入当前 Trace 上下文
          const span = trace.getActiveSpan();
          if (span) {
            const ctx: SpanContext = span.spanContext();
            return {
              ...obj,
              traceId: ctx.traceId,
              spanId: ctx.spanId,
              traceFlags: ctx.traceFlags,
            };
          }
          return obj;
        },
      },
    });
  }

  logAgentStep(entry: Omit<AgentLogEntry, "traceId" | "spanId">): void {
    const span = trace.getActiveSpan();
    const ctx = span?.spanContext();

    this.logger[entry.level]({
      traceId: ctx?.traceId ?? "unknown",
      spanId: ctx?.spanId ?? "unknown",
      event: entry.event,
      agentId: entry.agentId,
      stepIndex: entry.stepIndex,
      ...entry.data,
    });
  }

  logToolResult(toolName: string, success: boolean, latencyMs: number): void {
    this.logAgentStep({
      level: success ? "info" : "error",
      event: "tool_execution_complete",
      agentId: "current",
      stepIndex: -1,
      data: { toolName, success, latencyMs },
    });
  }
}

构建生产级可观测性技术栈

架构总览

graph TB subgraph "应用层" A1[Agent Runtime] A2[LLM Gateway] A3[Tool Registry] end subgraph "采集层" B1["OTel SDK - Traces"] B2["LangFuse SDK - Evals"] B3["Structured Logger - Logs"] end subgraph "传输层" C1[OTel Collector] C2["Kafka Pulsar - Event Bus"] end subgraph "存储层" D1["Tempo Jaeger - Trace Store"] D2["PostgreSQL - Eval Store"] D3["Loki ES - Log Store"] D4["Prometheus - Metrics"] end subgraph "分析层" E1["Grafana - Dashboard"] E2["Alert Manager - 告警"] E3["Eval Pipeline - 评估"] E4["Root Cause - 根因分析"] end A1 --> B1 A1 --> B2 A1 --> B3 A2 --> B1 A3 --> B1 B1 --> C1 B2 --> C2 B3 --> C2 C1 --> D1 C1 --> D4 C2 --> D2 C2 --> D3 D1 --> E1 D2 --> E3 D3 --> E4 D4 --> E1 D4 --> E2

工具选型对比

维度 LangSmith LangFuse Arize Phoenix 自建方案
部署复杂度 ⭐ 零部署 ⭐⭐ Docker/K8s ⭐⭐ Docker ⭐⭐⭐⭐⭐
数据控制 ❌ 云端 ✅ 完全控制 ✅ 完全控制 ✅ 完全控制
评估能力 ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐ ⭐⭐⭐⭐ ⭐⭐(需自建)
生态集成 LangChain 原生 框架无关 框架无关 完全自定义
团队协作 ✅ 内置 ✅ 内置 ⚠️ 基础 ❌ 需自建
成本(月) $400-2000 自托管成本 自托管成本 基础设施 + 人力
适合阶段 快速验证 / 中型团队 大型企业 / 合规要求 研究 / 小型团队 超大规模

采样策略设计

python
from enum import Enum
from typing import Optional
import random
import hashlib


class ImportanceLevel(Enum):
    CRITICAL = "critical"   # 100% 采样 - 付费操作、高风险决策
    HIGH = "high"           # 50% 采样 - 工具执行失败、长链推理
    MEDIUM = "medium"       # 10% 采样 - 正常请求
    LOW = "low"             # 1% 采样 - 健康检查、内部调用


class AdaptiveSampler:
    """基于重要性的自适应采样器"""

    SAMPLE_RATES = {
        ImportanceLevel.CRITICAL: 1.0,
        ImportanceLevel.HIGH: 0.5,
        ImportanceLevel.MEDIUM: 0.1,
        ImportanceLevel.LOW: 0.01,
    }

    def __init__(self, error_boost_multiplier: float = 5.0):
        self.error_boost = error_boost_multiplier
        self.error_rate_window: list[bool] = []

    def should_sample(
        self,
        trace_id: str,
        importance: ImportanceLevel,
        is_error: bool = False,
    ) -> bool:
        """决定是否采样当前 Trace"""
        base_rate = self.SAMPLE_RATES[importance]

        # 错误请求提升采样率
        if is_error:
            base_rate = min(1.0, base_rate * self.error_boost)

        # 基于 trace_id 的确定性采样(同一 trace 始终一致)
        hash_value = int(hashlib.md5(trace_id.encode()).hexdigest()[:8], 16)
        threshold = hash_value / 0xFFFFFFFF

        return threshold < base_rate

    def classify_importance(self, request_metadata: dict) -> ImportanceLevel:
        """根据请求元数据分类重要性"""
        if request_metadata.get("involves_payment"):
            return ImportanceLevel.CRITICAL
        if request_metadata.get("tool_count", 0) > 5:
            return ImportanceLevel.HIGH
        if request_metadata.get("is_internal"):
            return ImportanceLevel.LOW
        return ImportanceLevel.MEDIUM

生产环境最佳实践

1. 分层告警策略

python
# 告警规则配置示例
ALERT_RULES = {
    "agent_loop_detected": {
        "condition": "span_repeat_count > 3 within 30s",
        "severity": "critical",
        "action": "auto_terminate + page_oncall",
    },
    "hallucination_rate_spike": {
        "condition": "hallucination_rate > 0.15 for 5min",
        "severity": "high",
        "action": "alert_oncall + increase_sampling",
    },
    "latency_p99_breach": {
        "condition": "agent_latency_p99 > 30s for 3min",
        "severity": "medium",
        "action": "alert_channel",
    },
    "token_cost_anomaly": {
        "condition": "hourly_token_cost > 2x daily_average",
        "severity": "high",
        "action": "alert_oncall + enable_rate_limit",
    },
    "eval_score_degradation": {
        "condition": "faithfulness_score_avg < 0.7 for 15min",
        "severity": "high",
        "action": "rollback_prompt_version",
    },
}

2. Trace 数据生命周期管理

数据类型 保留策略 存储位置 理由
完整 Trace(含 LLM I/O) 7 天 Hot Storage 调试需要
Trace 元数据(无 I/O) 90 天 Warm Storage 趋势分析
评估分数 永久 Cold Storage 历史对比
聚合指标 永久 Time-series DB 仪表盘
错误 Trace 快照 365 天 Hot Storage 事后分析

3. 可观测性成熟度模型

graph LR L0["L0: 仅日志 - console.log"] L1["L1: 基础追踪 - 手动检查 Trace"] L2["L2: 结构化评估 - 自动回归测试"] L3["L3: 实时仪表盘 - 异常自动告警"] L4["L4: 自动根因分析 - 自愈与回滚"] L0 --> L1 --> L2 --> L3 --> L4 style L0 fill:#ffcdd2 style L1 fill:#fff9c4 style L2 fill:#c8e6c9 style L3 fill:#b3e5fc style L4 fill:#e1bee7

生产就绪最低要求:L3 级别

  • L0 → L1:集成 OpenTelemetry + 追踪平台(1-2 天)
  • L1 → L2:建立 Golden Dataset + CI 评估(1-2 周)
  • L2 → L3:Grafana 仪表盘 + AlertManager 规则(1 周)
  • L3 → L4:异常检测 ML 模型 + 自动回滚 Pipeline(1-2 月)

4. 安全与隐私

python
class PIIScrubber:
    """在 Trace 存储前清洗 PII 信息"""

    PATTERNS = {
        "email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
        "phone": r"\b\d{3}[-.]?\d{4}[-.]?\d{4}\b",
        "credit_card": r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b",
    }

    def scrub(self, text: str) -> str:
        import re
        for pii_type, pattern in self.PATTERNS.items():
            text = re.sub(pattern, f"[REDACTED_{pii_type.upper()}]", text)
        return text

常见问题

Q1: Agent 可观测性与传统 APM 有什么区别?

传统 APM 关注请求延迟、错误率和吞吐量等确定性指标,而 Agent 可观测性需要捕获非确定性的 LLM 推理路径、多步决策链路、工具调用结果质量以及语义层面的输出评估。Agent 的单次请求可能触发 10-50 次内部 LLM 调用,每次都有不同的输出分布。

Q2: LangSmith 和 LangFuse 应该选哪个?

LangSmith 适合深度绑定 LangChain 生态的团队,功能最完整但闭源;LangFuse 是开源方案,支持自托管,适合对数据隐私有要求的企业。如果预算充足且不需要自托管,LangSmith 的 Playground 和 Dataset 功能更成熟;如果需要完全控制数据,LangFuse 是更好的选择。

Q3: 如何实现 LLM-as-Judge 评估而不引入新的幻觉?

关键是约束评估维度和提供评分标准(Rubric)。将评估拆分为多个独立维度(忠实度、相关性、完整性),每个维度使用结构化输出(JSON)并要求返回置信度分数。同时使用多个不同模型交叉评估,当评分差异超过阈值时触发人工审核。

Q4: 生产环境中 Agent 无限循环如何排查?

首先通过 Trace 的循环检测器(检测重复 Span 模式)自动告警。排查时查看 Trace Timeline 中的重复工具调用序列,检查 Agent 的推理日志是否出现重复的思考模式。根本修复需要在 Agent 框架层面设置最大迭代次数、状态变化检测和死锁超时。

Q5: 如何量化 Agent 系统的可观测性成熟度?

分为 5 个等级:L0(无追踪,仅日志)→ L1(基础 Trace,手动检查)→ L2(结构化 Eval,自动回归)→ L3(实时监控仪表盘,异常告警)→ L4(自动化根因分析,自愈机制)。大多数团队停留在 L1,生产就绪至少需要 L3。


总结

Agent 可观测性工程不是可选附加项,而是生产部署的前提条件。本文提出的三大支柱架构——Trace、Eval、Debug——构成了一个从数据采集到问题闭环的完整体系:

  1. Trace 层提供原始数据:通过 OpenTelemetry 捕获每一步的输入、输出、延迟和 Token 用量
  2. Eval 层定义质量标准:通过 LLM-as-Judge 和 Ragas 框架量化输出的忠实度、相关性和完整性
  3. Debug 层实现问题闭环:通过 Time-Travel 回放和循环检测定位根因

可观测性的投入回报比在 Agent 系统中远高于传统应用。一个没有 Trace 的 Agent 系统如同在黑暗中开车——出了事故你甚至不知道为什么。

使用 JSON Formatter 格式化评估数据输出,或借助 Text Diff 对比不同版本的 Prompt 变更。更多 Agent 工程实践请阅读本系列其他文章。


相关资源