核心摘要
AI Agent 的非确定性本质让传统可观测性方案完全失效——你无法用 HTTP 状态码判断一个"幻觉回答"是否是 Bug。本文提出 Agent 可观测性工程的三大支柱架构:分布式追踪(Trace) 捕获多步决策路径,评估工程(Eval) 量化输出质量,故障调试(Debugging) 实现根因定位。结合 OpenTelemetry、LangSmith、LangFuse 和 Arize Phoenix 的生产级代码实现,构建从开发到运维的闭环可观测体系。
本文是 智能体从验证到上线的十大陷阱 的可观测性深度扩展,建议结合 企业 LLMOps 平台架构 一起阅读。
目录
- 核心要点
- 为什么 Agent 可观测性不同于传统 APM
- 三大支柱架构总览
- 分布式追踪:捕获 Agent 决策路径
- 评估工程:量化 Agent 输出质量
- 故障调试:Agent 失败模式与根因定位
- 构建生产级可观测性技术栈
- 生产环境最佳实践
- 常见问题
- 总结
- 相关资源
核心要点
- Agent 可观测性是三维问题:Trace 解决"发生了什么",Eval 解决"质量如何",Debug 解决"为什么出错"——三者缺一不可
- OpenTelemetry 是 Trace 层的标准选择:通过自定义 Span 属性记录 LLM 特有信息(Token、模型、Temperature),利用现有基础设施
- LLM-as-Judge 不是银弹:必须配合结构化 Rubric、多模型交叉验证和人工校准才能可靠
- 90% 的 Agent 故障可归为 5 种模式:无限循环、工具误用、幻觉行动、上下文溢出、推理漂移
- 可观测性必须在第一天构建:后期补装的成本是初始构建的 10 倍,且覆盖率永远不完整
- 采样策略决定成本上限:生产环境全量采集不现实,基于重要性的分层采样可降低 80% 存储成本
为什么 Agent 可观测性不同于传统 APM
传统应用的可观测性建立在确定性假设之上:相同输入产生相同输出,错误有明确的异常类型,性能瓶颈可通过延迟分布定位。AI Agent 彻底打破了这些假设。
| 维度 | 传统 APM | Agent 可观测性 |
|---|---|---|
| 输出确定性 | 相同输入 → 相同输出 | 相同输入 → 不同输出(Temperature > 0) |
| 错误定义 | HTTP 5xx / 异常 | 语义错误(幻觉、不相关、有害) |
| 调用链深度 | 通常 3-10 跳 | 单次请求可达 10-50 次 LLM 调用 |
| 性能指标 | 延迟、吞吐量 | + Token 用量、推理质量、忠实度 |
| 根因分析 | 堆栈追踪 | 需要语义级推理路径回溯 |
| 成本模型 | 固定基础设施成本 | 按 Token 动态计费,成本不可预测 |
# 传统 APM:一个请求 = 一个 Span
# Agent 可观测性:一个用户请求 = 一棵 Span 树
# 传统方式
@trace_request
def handle_request(request):
result = process(request)
return result # 200 OK = 成功
# Agent 方式 —— 200 OK 不代表成功
@trace_agent_request
def handle_agent_request(request):
plan = await llm.plan(request) # Span: planning
for step in plan.steps:
tool_result = await execute(step) # Span: tool_call
validation = await llm.validate(tool_result) # Span: validation
if not validation.is_faithful:
# HTTP 200,但语义上是失败的
raise SemanticError("Output not faithful to source")
return synthesize(results)
Agent 的一次用户请求可能经过以下路径:意图识别 → 计划生成 → 工具选择 → 参数构造 → 工具执行 → 结果验证 → 输出合成。每一步都是非确定性的,每一步都可能产生语义错误,而这些错误不会抛出异常。
三大支柱架构总览
三大支柱之间的关系是:Trace 提供数据基础,Eval 定义质量标准,Debug 实现问题闭环。缺少 Trace,Eval 和 Debug 就无数据可用;缺少 Eval,有 Trace 也无法判断是否有问题;缺少 Debug 能力,发现问题也无法修复。
分布式追踪:捕获 Agent 决策路径
OpenTelemetry 集成方案
OpenTelemetry 是可观测性领域的事实标准,通过扩展 Semantic Conventions 可以完美适配 LLM 场景。核心思路是将每次 LLM 调用、工具调用和推理步骤建模为 Span,同时通过自定义属性记录 Agent 特有信息。
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
# 初始化 Tracer
resource = Resource.create({
"service.name": "agent-service",
"service.version": "1.2.0",
"deployment.environment": "production",
})
provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="http://otel-collector:4317"))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("agent.core", "1.0.0")
class AgentTracer:
"""Agent 可观测性追踪器,封装 OpenTelemetry Span 创建逻辑"""
def __init__(self, tracer):
self.tracer = tracer
def trace_llm_call(self, model: str, messages: list, temperature: float = 0.7):
"""追踪单次 LLM 调用"""
span = self.tracer.start_span(
name=f"llm.chat.{model}",
attributes={
"llm.model": model,
"llm.temperature": temperature,
"llm.message_count": len(messages),
"llm.system_prompt_tokens": self._count_tokens(messages[0]) if messages else 0,
}
)
return span
def trace_tool_call(self, tool_name: str, parameters: dict):
"""追踪工具调用"""
span = self.tracer.start_span(
name=f"tool.execute.{tool_name}",
attributes={
"tool.name": tool_name,
"tool.parameters": str(parameters)[:1024], # 截断防溢出
}
)
return span
def trace_agent_step(self, step_type: str, step_index: int):
"""追踪 Agent 推理步骤"""
span = self.tracer.start_span(
name=f"agent.step.{step_type}",
attributes={
"agent.step.type": step_type,
"agent.step.index": step_index,
}
)
return span
def _count_tokens(self, message) -> int:
return len(str(message)) // 4 # 粗略估算
LangSmith / LangFuse 实战对比
| 特性 | LangSmith | LangFuse | Arize Phoenix |
|---|---|---|---|
| 开源 | ❌ 闭源 | ✅ MIT | ✅ Apache 2.0 |
| 自托管 | ❌ | ✅ | ✅ |
| LangChain 集成 | 原生深度集成 | SDK 集成 | SDK 集成 |
| Playground | ✅ 功能强大 | ✅ 基础 | ✅ 基础 |
| Dataset 管理 | ✅ 内置 | ✅ 内置 | ✅ 内置 |
| 评估框架 | ✅ 完整 | ✅ 完整 | ✅ Phoenix Evals |
| 实时监控 | ✅ | ✅ | ✅ |
| 定价模型 | 按 Trace 量 | 自托管免费 | 自托管免费 |
| 适合场景 | LangChain 重度用户 | 数据敏感型企业 | 研究型团队 |
LangFuse 集成示例(Python):
from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
langfuse = Langfuse(
public_key="pk-xxx",
secret_key="sk-xxx",
host="https://your-langfuse-instance.com"
)
@observe(name="agent-pipeline")
async def run_agent_pipeline(user_query: str, session_id: str):
"""完整的 Agent 流水线追踪"""
langfuse_context.update_current_trace(
session_id=session_id,
user_id="user-123",
metadata={"pipeline_version": "2.1.0"}
)
# Step 1: 意图识别
intent = await classify_intent(user_query)
# Step 2: 计划生成
plan = await generate_plan(user_query, intent)
# Step 3: 执行计划
results = []
for step in plan.steps:
result = await execute_step(step)
results.append(result)
# Step 4: 合成输出
output = await synthesize_output(results, user_query)
# 记录评估分数
langfuse_context.score_current_trace(
name="output_quality",
value=await evaluate_output(output, user_query),
comment="Automated quality score"
)
return output
@observe(name="classify-intent", capture_input=True, capture_output=True)
async def classify_intent(query: str) -> str:
"""意图分类 - 自动追踪输入输出"""
response = await llm.chat(
model="gpt-4o",
messages=[
{"role": "system", "content": "Classify user intent into categories..."},
{"role": "user", "content": query}
],
temperature=0.1
)
return response.content
@observe(name="execute-tool")
async def execute_step(step):
"""工具执行步骤 - 追踪工具调用细节"""
langfuse_context.update_current_observation(
metadata={"tool": step.tool_name, "retry_count": 0}
)
try:
result = await tool_registry.execute(step.tool_name, step.parameters)
langfuse_context.update_current_observation(
level="DEFAULT",
status_message="success"
)
return result
except Exception as e:
langfuse_context.update_current_observation(
level="ERROR",
status_message=str(e)
)
raise
Trace 流转架构
评估工程:量化 Agent 输出质量
在线评估 vs 离线评估
评估工程分为两个互补维度:
| 维度 | 在线评估(Online Eval) | 离线评估(Offline Eval) |
|---|---|---|
| 执行时机 | 实时,随请求触发 | 批量,定时或 CI/CD 触发 |
| 延迟影响 | 增加 200-500ms | 无影响 |
| 覆盖率 | 采样(1-10%) | 全量测试集 |
| 适用场景 | 质量监控、异常检测 | 版本对比、回归测试 |
| 评估复杂度 | 轻量指标(延迟、格式) | 深度指标(忠实度、完整性) |
| 成本 | 生产流量比例 | 固定(测试集大小 × 评估次数) |
LLM-as-Judge 模式实现
LLM-as-Judge 是当前最主流的 Agent 输出评估方式。核心挑战在于确保评估本身的可靠性。
// TypeScript: 结构化 LLM-as-Judge 评估器
import { OpenAI } from "openai";
import { z } from "zod";
// 定义评估维度的结构化输出 Schema
const EvalResultSchema = z.object({
faithfulness: z.object({
score: z.number().min(0).max(1),
reasoning: z.string(),
evidence: z.array(z.string()),
}),
relevance: z.object({
score: z.number().min(0).max(1),
reasoning: z.string(),
}),
completeness: z.object({
score: z.number().min(0).max(1),
missing_aspects: z.array(z.string()),
}),
hallucination: z.object({
detected: z.boolean(),
hallucinated_claims: z.array(z.string()),
severity: z.enum(["none", "minor", "major", "critical"]),
}),
});
type EvalResult = z.infer<typeof EvalResultSchema>;
interface EvalInput {
query: string;
context: string[];
response: string;
groundTruth?: string;
}
class LLMJudgeEvaluator {
private client: OpenAI;
private model: string;
constructor(apiKey: string, model = "gpt-4o") {
this.client = new OpenAI({ apiKey });
this.model = model;
}
async evaluate(input: EvalInput): Promise<EvalResult> {
const rubric = this.buildRubric(input);
const response = await this.client.chat.completions.create({
model: this.model,
temperature: 0.1, // 低温度提高评估一致性
response_format: { type: "json_object" },
messages: [
{
role: "system",
content: `You are an expert evaluator for AI agent outputs.
Evaluate the response strictly according to the rubric provided.
Return ONLY a JSON object matching the specified schema.
Be critical and precise - do not inflate scores.`,
},
{
role: "user",
content: rubric,
},
],
});
const parsed = JSON.parse(response.choices[0].message.content!);
return EvalResultSchema.parse(parsed);
}
private buildRubric(input: EvalInput): string {
return `## Evaluation Task
### User Query
${input.query}
### Retrieved Context
${input.context.map((c, i) => `[${i + 1}] ${c}`).join("\n")}
### Agent Response
${input.response}
${input.groundTruth ? `### Ground Truth\n${input.groundTruth}` : ""}
### Scoring Rubric
**Faithfulness** (0-1): Does the response ONLY contain claims supported by the context?
- 1.0: Every claim is directly supported by context
- 0.7: Minor unsupported claims that don't affect correctness
- 0.3: Contains speculative claims without evidence
- 0.0: Fabricates information contradicting context
**Relevance** (0-1): Does the response address the user's actual question?
- 1.0: Directly and completely answers the query
- 0.5: Partially addresses the query with some tangential content
- 0.0: Completely off-topic
**Completeness** (0-1): Does the response cover all aspects of the query?
- 1.0: Addresses all sub-questions and aspects
- 0.5: Covers main points but misses important details
- 0.0: Only superficially touches the topic
**Hallucination Detection**: Identify any claims NOT supported by context.
Return your evaluation as a JSON object.`;
}
}
自定义评估指标:Ragas 集成
from ragas import evaluate
from ragas.metrics import (
faithfulness,
answer_relevancy,
context_precision,
context_recall,
)
from datasets import Dataset
# 构建评估数据集
eval_dataset = Dataset.from_dict({
"question": [
"如何配置 OpenTelemetry 进行 LLM 追踪?",
"LangSmith 和 LangFuse 哪个更适合私有化部署?",
],
"answer": [
agent_responses[0], # Agent 实际输出
agent_responses[1],
],
"contexts": [
[retrieved_context_1], # 检索到的上下文
[retrieved_context_2],
],
"ground_truth": [
"使用 OpenTelemetry SDK 创建自定义 Span...",
"LangFuse 支持自托管部署,适合私有化需求...",
],
})
# 执行评估
results = evaluate(
dataset=eval_dataset,
metrics=[
faithfulness, # 忠实度:输出是否忠于上下文
answer_relevancy, # 相关性:输出是否回答了问题
context_precision, # 上下文精度:检索内容是否相关
context_recall, # 上下文召回:是否检索到足够信息
],
)
print(results.to_pandas())
# Output:
# faithfulness answer_relevancy context_precision context_recall
# 0 0.92 0.88 0.85 0.78
# 1 0.95 0.91 0.90 0.82
故障调试:Agent 失败模式与根因定位
五大故障模式分类
生产环境中 90% 的 Agent 故障可归为以下五种模式:
| 故障模式 | 表现 | 检测方法 | 根因 |
|---|---|---|---|
| 无限循环 | Agent 反复调用同一工具 | Span 重复模式检测 | 停止条件缺失 / 推理死锁 |
| 工具误用 | 调用错误工具或传入无效参数 | 工具调用成功率监控 | 工具描述不清晰 / 上下文不足 |
| 幻觉行动 | 执行不存在的工具或虚构参数 | 工具注册表校验 | 幻觉泛化到行动层 |
| 上下文溢出 | 中间结果丢失,输出不完整 | Token 用量监控 | 超出上下文窗口限制 |
| 推理漂移 | 输出逐渐偏离原始目标 | 语义相似度追踪 | 长链推理中目标信息衰减 |
Time-Travel 调试实现
import json
from dataclasses import dataclass, field
from typing import Any
from datetime import datetime
@dataclass
class AgentSnapshot:
"""Agent 运行时快照,支持 Time-Travel 调试"""
timestamp: datetime
step_index: int
step_type: str
input_state: dict
output_state: dict
llm_messages: list
llm_response: str
tool_calls: list = field(default_factory=list)
metadata: dict = field(default_factory=dict)
class TimeTravelDebugger:
"""Time-Travel 调试器:记录每一步快照,支持回放和断点"""
def __init__(self):
self.snapshots: list[AgentSnapshot] = []
self.breakpoints: dict[str, callable] = {}
def capture(self, step_index: int, step_type: str, **kwargs) -> AgentSnapshot:
"""捕获当前步骤的完整快照"""
snapshot = AgentSnapshot(
timestamp=datetime.utcnow(),
step_index=step_index,
step_type=step_type,
input_state=kwargs.get("input_state", {}),
output_state=kwargs.get("output_state", {}),
llm_messages=kwargs.get("messages", []),
llm_response=kwargs.get("response", ""),
tool_calls=kwargs.get("tool_calls", []),
metadata=kwargs.get("metadata", {}),
)
self.snapshots.append(snapshot)
self._check_breakpoints(snapshot)
return snapshot
def replay_from(self, step_index: int) -> list[AgentSnapshot]:
"""从指定步骤开始回放"""
return [s for s in self.snapshots if s.step_index >= step_index]
def detect_loop(self, window_size: int = 5) -> bool:
"""检测循环模式:连续 N 步的工具调用序列是否重复"""
if len(self.snapshots) < window_size * 2:
return False
recent = self.snapshots[-window_size:]
previous = self.snapshots[-window_size * 2:-window_size]
recent_pattern = [(s.step_type, tuple(s.tool_calls)) for s in recent]
previous_pattern = [(s.step_type, tuple(s.tool_calls)) for s in previous]
return recent_pattern == previous_pattern
def find_divergence_point(self, expected_trace: list[dict]) -> int:
"""对比实际执行和预期路径,找到分叉点"""
for i, (actual, expected) in enumerate(zip(self.snapshots, expected_trace)):
if actual.step_type != expected.get("step_type"):
return i
if actual.output_state != expected.get("expected_output"):
return i
return -1
def _check_breakpoints(self, snapshot: AgentSnapshot):
"""检查是否触发调试断点"""
for name, condition in self.breakpoints.items():
if condition(snapshot):
print(f"[BREAKPOINT] {name} triggered at step {snapshot.step_index}")
self._dump_snapshot(snapshot)
def _dump_snapshot(self, snapshot: AgentSnapshot):
"""输出快照详情"""
print(json.dumps({
"step": snapshot.step_index,
"type": snapshot.step_type,
"input": snapshot.input_state,
"output": snapshot.output_state,
"tools": snapshot.tool_calls,
}, indent=2, ensure_ascii=False))
# 使用示例
debugger = TimeTravelDebugger()
# 设置断点:当检测到循环时暂停
debugger.breakpoints["loop_detected"] = lambda s: debugger.detect_loop()
# 设置断点:Token 用量超过阈值
debugger.breakpoints["token_overflow"] = lambda s: s.metadata.get("total_tokens", 0) > 100000
日志关联策略
// TypeScript: 结构化日志与 Trace 关联
import { SpanContext, trace } from "@opentelemetry/api";
import pino from "pino";
interface AgentLogEntry {
traceId: string;
spanId: string;
level: "info" | "warn" | "error" | "debug";
event: string;
agentId: string;
stepIndex: number;
data: Record<string, unknown>;
}
class AgentLogger {
private logger: pino.Logger;
constructor(serviceName: string) {
this.logger = pino({
name: serviceName,
formatters: {
log: (obj) => {
// 自动注入当前 Trace 上下文
const span = trace.getActiveSpan();
if (span) {
const ctx: SpanContext = span.spanContext();
return {
...obj,
traceId: ctx.traceId,
spanId: ctx.spanId,
traceFlags: ctx.traceFlags,
};
}
return obj;
},
},
});
}
logAgentStep(entry: Omit<AgentLogEntry, "traceId" | "spanId">): void {
const span = trace.getActiveSpan();
const ctx = span?.spanContext();
this.logger[entry.level]({
traceId: ctx?.traceId ?? "unknown",
spanId: ctx?.spanId ?? "unknown",
event: entry.event,
agentId: entry.agentId,
stepIndex: entry.stepIndex,
...entry.data,
});
}
logToolResult(toolName: string, success: boolean, latencyMs: number): void {
this.logAgentStep({
level: success ? "info" : "error",
event: "tool_execution_complete",
agentId: "current",
stepIndex: -1,
data: { toolName, success, latencyMs },
});
}
}
构建生产级可观测性技术栈
架构总览
工具选型对比
| 维度 | LangSmith | LangFuse | Arize Phoenix | 自建方案 |
|---|---|---|---|---|
| 部署复杂度 | ⭐ 零部署 | ⭐⭐ Docker/K8s | ⭐⭐ Docker | ⭐⭐⭐⭐⭐ |
| 数据控制 | ❌ 云端 | ✅ 完全控制 | ✅ 完全控制 | ✅ 完全控制 |
| 评估能力 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐(需自建) |
| 生态集成 | LangChain 原生 | 框架无关 | 框架无关 | 完全自定义 |
| 团队协作 | ✅ 内置 | ✅ 内置 | ⚠️ 基础 | ❌ 需自建 |
| 成本(月) | $400-2000 | 自托管成本 | 自托管成本 | 基础设施 + 人力 |
| 适合阶段 | 快速验证 / 中型团队 | 大型企业 / 合规要求 | 研究 / 小型团队 | 超大规模 |
采样策略设计
from enum import Enum
from typing import Optional
import random
import hashlib
class ImportanceLevel(Enum):
CRITICAL = "critical" # 100% 采样 - 付费操作、高风险决策
HIGH = "high" # 50% 采样 - 工具执行失败、长链推理
MEDIUM = "medium" # 10% 采样 - 正常请求
LOW = "low" # 1% 采样 - 健康检查、内部调用
class AdaptiveSampler:
"""基于重要性的自适应采样器"""
SAMPLE_RATES = {
ImportanceLevel.CRITICAL: 1.0,
ImportanceLevel.HIGH: 0.5,
ImportanceLevel.MEDIUM: 0.1,
ImportanceLevel.LOW: 0.01,
}
def __init__(self, error_boost_multiplier: float = 5.0):
self.error_boost = error_boost_multiplier
self.error_rate_window: list[bool] = []
def should_sample(
self,
trace_id: str,
importance: ImportanceLevel,
is_error: bool = False,
) -> bool:
"""决定是否采样当前 Trace"""
base_rate = self.SAMPLE_RATES[importance]
# 错误请求提升采样率
if is_error:
base_rate = min(1.0, base_rate * self.error_boost)
# 基于 trace_id 的确定性采样(同一 trace 始终一致)
hash_value = int(hashlib.md5(trace_id.encode()).hexdigest()[:8], 16)
threshold = hash_value / 0xFFFFFFFF
return threshold < base_rate
def classify_importance(self, request_metadata: dict) -> ImportanceLevel:
"""根据请求元数据分类重要性"""
if request_metadata.get("involves_payment"):
return ImportanceLevel.CRITICAL
if request_metadata.get("tool_count", 0) > 5:
return ImportanceLevel.HIGH
if request_metadata.get("is_internal"):
return ImportanceLevel.LOW
return ImportanceLevel.MEDIUM
生产环境最佳实践
1. 分层告警策略
# 告警规则配置示例
ALERT_RULES = {
"agent_loop_detected": {
"condition": "span_repeat_count > 3 within 30s",
"severity": "critical",
"action": "auto_terminate + page_oncall",
},
"hallucination_rate_spike": {
"condition": "hallucination_rate > 0.15 for 5min",
"severity": "high",
"action": "alert_oncall + increase_sampling",
},
"latency_p99_breach": {
"condition": "agent_latency_p99 > 30s for 3min",
"severity": "medium",
"action": "alert_channel",
},
"token_cost_anomaly": {
"condition": "hourly_token_cost > 2x daily_average",
"severity": "high",
"action": "alert_oncall + enable_rate_limit",
},
"eval_score_degradation": {
"condition": "faithfulness_score_avg < 0.7 for 15min",
"severity": "high",
"action": "rollback_prompt_version",
},
}
2. Trace 数据生命周期管理
| 数据类型 | 保留策略 | 存储位置 | 理由 |
|---|---|---|---|
| 完整 Trace(含 LLM I/O) | 7 天 | Hot Storage | 调试需要 |
| Trace 元数据(无 I/O) | 90 天 | Warm Storage | 趋势分析 |
| 评估分数 | 永久 | Cold Storage | 历史对比 |
| 聚合指标 | 永久 | Time-series DB | 仪表盘 |
| 错误 Trace 快照 | 365 天 | Hot Storage | 事后分析 |
3. 可观测性成熟度模型
生产就绪最低要求:L3 级别
- L0 → L1:集成 OpenTelemetry + 追踪平台(1-2 天)
- L1 → L2:建立 Golden Dataset + CI 评估(1-2 周)
- L2 → L3:Grafana 仪表盘 + AlertManager 规则(1 周)
- L3 → L4:异常检测 ML 模型 + 自动回滚 Pipeline(1-2 月)
4. 安全与隐私
class PIIScrubber:
"""在 Trace 存储前清洗 PII 信息"""
PATTERNS = {
"email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
"phone": r"\b\d{3}[-.]?\d{4}[-.]?\d{4}\b",
"credit_card": r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b",
}
def scrub(self, text: str) -> str:
import re
for pii_type, pattern in self.PATTERNS.items():
text = re.sub(pattern, f"[REDACTED_{pii_type.upper()}]", text)
return text
常见问题
Q1: Agent 可观测性与传统 APM 有什么区别?
传统 APM 关注请求延迟、错误率和吞吐量等确定性指标,而 Agent 可观测性需要捕获非确定性的 LLM 推理路径、多步决策链路、工具调用结果质量以及语义层面的输出评估。Agent 的单次请求可能触发 10-50 次内部 LLM 调用,每次都有不同的输出分布。
Q2: LangSmith 和 LangFuse 应该选哪个?
LangSmith 适合深度绑定 LangChain 生态的团队,功能最完整但闭源;LangFuse 是开源方案,支持自托管,适合对数据隐私有要求的企业。如果预算充足且不需要自托管,LangSmith 的 Playground 和 Dataset 功能更成熟;如果需要完全控制数据,LangFuse 是更好的选择。
Q3: 如何实现 LLM-as-Judge 评估而不引入新的幻觉?
关键是约束评估维度和提供评分标准(Rubric)。将评估拆分为多个独立维度(忠实度、相关性、完整性),每个维度使用结构化输出(JSON)并要求返回置信度分数。同时使用多个不同模型交叉评估,当评分差异超过阈值时触发人工审核。
Q4: 生产环境中 Agent 无限循环如何排查?
首先通过 Trace 的循环检测器(检测重复 Span 模式)自动告警。排查时查看 Trace Timeline 中的重复工具调用序列,检查 Agent 的推理日志是否出现重复的思考模式。根本修复需要在 Agent 框架层面设置最大迭代次数、状态变化检测和死锁超时。
Q5: 如何量化 Agent 系统的可观测性成熟度?
分为 5 个等级:L0(无追踪,仅日志)→ L1(基础 Trace,手动检查)→ L2(结构化 Eval,自动回归)→ L3(实时监控仪表盘,异常告警)→ L4(自动化根因分析,自愈机制)。大多数团队停留在 L1,生产就绪至少需要 L3。
总结
Agent 可观测性工程不是可选附加项,而是生产部署的前提条件。本文提出的三大支柱架构——Trace、Eval、Debug——构成了一个从数据采集到问题闭环的完整体系:
- Trace 层提供原始数据:通过 OpenTelemetry 捕获每一步的输入、输出、延迟和 Token 用量
- Eval 层定义质量标准:通过 LLM-as-Judge 和 Ragas 框架量化输出的忠实度、相关性和完整性
- Debug 层实现问题闭环:通过 Time-Travel 回放和循环检测定位根因
可观测性的投入回报比在 Agent 系统中远高于传统应用。一个没有 Trace 的 Agent 系统如同在黑暗中开车——出了事故你甚至不知道为什么。
使用 JSON Formatter 格式化评估数据输出,或借助 Text Diff 对比不同版本的 Prompt 变更。更多 Agent 工程实践请阅读本系列其他文章。
相关资源
- AI Agent 开发完全指南 - Agent 基础架构入门
- 企业 LLMOps 平台架构指南 - 完整运维体系
- LLM-as-Judge 评估模式详解 - 评估方法论深度
- AI Agent 术语表 - 核心概念解释
- 幻觉 (Hallucination) - 理解 Agent 幻觉问题
- OpenTelemetry - 可观测性标准协议