核心摘要
LLM 的输出天然是流式的——模型逐 token 生成,而非一次性返回完整答案。如何在 AI Agent 编排中高效传递流数据,又如何在不侵入业务逻辑的前提下注入日志、追踪和监控?Eino 通过 StreamReader/StreamWriter 原语 和 Callback 切面系统 给出了优雅的工程化方案。本文将从底层原语到生产级可观测性,完整拆解这两大核心机制。
目录
核心要点
- 双模调用:每个 Eino 组件均支持
Invoke(完整返回)和Stream(流式返回)两种模式 - StreamReader/StreamWriter:类型安全的泛型流原语,是 Eino 流式体系的基石
- 自动流转换:编排引擎在节点间自动处理流拼接、分裂和类型转换
- Callback 四钩子:OnStart / OnEnd / OnStartWithStream / OnEndWithStream 覆盖全部执行生命周期
- 作用域控制:支持全局、按类型、按节点三级粒度的 Callback 注入
- OTel 一键集成:内置 OpenTelemetry Handler,每个节点自动生成 Trace Span
为什么需要流式处理
在传统的 API 调用模式中,客户端必须等待 LLM 生成全部 token 后才能收到响应。对于一个 500 token 的回答,这意味着用户可能要等待 3-5 秒才看到第一个字符。流式处理从三个维度解决这一问题:
| 维度 | 非流式 | 流式 |
|---|---|---|
| 首 token 延迟 | 3-5s(全部生成后返回) | 100-300ms(首 token 即返回) |
| 内存占用 | 需缓存完整响应 | 逐片段处理,常数级内存 |
| 用户体验 | 长时间无反馈 | 实时打字效果 |
在 Agentic Workflow 中,流式处理的价值被进一步放大:Agent 的思考链(Chain of Thought)可以实时展示给用户,让交互过程透明可见。
Eino 流式范式
Invoke vs Stream
Eino 为每个组件定义了两种调用接口:
// Invoke:等待完整结果
result, err := chatModel.Generate(ctx, messages)
// Stream:返回 StreamReader,逐片段读取
streamReader, err := chatModel.Stream(ctx, messages)
for {
chunk, err := streamReader.Recv()
if err == io.EOF {
break
}
fmt.Print(chunk.Content)
}
StreamReader 与 StreamWriter
StreamReader[T] 和 StreamWriter[T] 是 Eino 流式体系的核心原语。它们基于 Go 泛型实现类型安全的流式通信:
// 创建流对
reader, writer := schema.Pipe[*schema.Message](bufferSize)
// 写入端(通常在组件内部)
go func() {
defer writer.Close()
for _, chunk := range chunks {
if closed := writer.Send(chunk, nil); closed {
return
}
}
}()
// 读取端(由下游消费)
for {
msg, err := reader.Recv()
if err == io.EOF {
break
}
process(msg)
}
关键设计:
- 类型安全:泛型参数
[T]确保编译期类型检查 - 背压机制:
bufferSize控制缓冲区大小,避免生产者过快 - 优雅关闭:
Close()通知读取端流结束,Recv()返回io.EOF
组件流式能力矩阵
不同组件对流的支持程度不同。Eino 的编排引擎能自动适配这些差异:
| 组件类型 | Invoke 输入 | Stream 输入 | Invoke 输出 | Stream 输出 |
|---|---|---|---|---|
| ChatModel | ✅ | ❌ | ✅ | ✅ |
| PromptTemplate | ✅ | ❌ | ✅ | ❌ |
| Retriever | ✅ | ❌ | ✅ | ❌ |
| ToolNode | ✅ | ❌ | ✅ | ❌ |
| Lambda | ✅ | 可选 | ✅ | 可选 |
| Transformer | ✅ | ✅ | ✅ | ✅ |
关键洞察:ChatModel 是唯一天然支持 Stream 输出的核心组件。编排引擎的流式透传能力确保了即使中间节点不支持流式,最终用户仍可获得流式体验。
编排中的流式透传
当你在 Graph 中以 Stream 模式调用编排时,Eino 引擎在节点间自动处理三种流转换:
三种自动转换
1. 流拼接(Concatenation)
当下游节点只接受完整值时,引擎自动收集所有流片段并合并:
// 引擎内部等效逻辑
fullMessage := concatenate(streamReader) // 收集所有 chunk
nextNode.Invoke(ctx, fullMessage) // 传递完整值
2. 流分裂(Fork/Split)
当一个流需要被多个下游节点消费时,引擎自动分裂流:
// 一个 StreamReader 被复制为多份
readers := stream.Split(originalReader, consumerCount)
// 每个 consumer 独立消费完整的流数据
3. 流合并(Merge)
当多个上游的流需要汇入同一下游时,引擎按完成顺序合并:
// 多个上游流合并为一个
mergedReader := stream.Merge(reader1, reader2, reader3)
Callback 切面系统
为什么需要 Callback
在生产环境中,你需要对 Agent 的每一步执行进行监控:记录输入输出、统计耗时、追踪异常。这些「横切关注点」如果散落在业务代码中会导致严重耦合。Eino 的 Callback 系统提供了 AOP(面向切面编程)风格的解决方案。
四阶段钩子
每个节点执行时,Eino 引擎会按顺序触发四个钩子:
| 钩子 | 触发时机 | 输入参数 |
|---|---|---|
OnStart |
节点开始前 | ctx, RunInfo, CallbackInput |
OnStartWithStream |
节点以流方式开始 | ctx, RunInfo, StreamReader |
OnEnd |
节点完成后 | ctx, RunInfo, CallbackOutput |
OnEndWithStream |
节点以流方式结束 | ctx, RunInfo, StreamReader |
使用 HandlerBuilder 构建
handler := callbacks.NewHandlerBuilder().
OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
log.Printf("[%s] 开始执行, 组件: %s, 输入: %v",
info.Name, info.Type, input)
return ctx
}).
OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
log.Printf("[%s] 执行完成, 组件: %s, 输出: %v",
info.Name, info.Type, output)
return ctx
}).
OnEndWithStreamFn(func(ctx context.Context, info *callbacks.RunInfo,
sr *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
log.Printf("[%s] 流式输出开始, 组件: %s", info.Name, info.Type)
return ctx
}).
Build()
作用域控制
Callback 支持三级粒度的作用域控制:
// 1. 全局:对 Graph 中所有节点生效
result, err := compiledGraph.Invoke(ctx, input,
compose.WithCallbacks(handler))
// 2. 按组件类型:仅对特定类型组件生效
result, err := compiledGraph.Invoke(ctx, input,
compose.WithCallbacks(handler).DesignateType(components.ComponentOfChatModel))
// 3. 按节点名称:仅对指定节点生效
result, err := compiledGraph.Invoke(ctx, input,
compose.WithCallbacks(handler).DesignateNode("llm_node"))
这种分层设计让你可以:
- 全局注入 OTel 追踪(对所有节点生效)
- 仅对 ChatModel 节点记录 token 用量
- 仅对特定关键节点添加告警逻辑
生产级可观测性
OpenTelemetry 集成
Eino 内置了 OpenTelemetry Callback Handler,每个节点的执行自动映射为 OTel Span:
配置 OTel Handler
import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
einootel "github.com/cloudwego/eino/callbacks/otel"
)
func initTracer() func() {
exporter, _ := otlptracehttp.New(ctx,
otlptracehttp.WithEndpoint("otel-collector:4318"),
)
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
sdktrace.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName("my-agent-service"),
)),
)
otel.SetTracerProvider(tp)
return func() { tp.Shutdown(ctx) }
}
// 创建 OTel Callback Handler
otelHandler := einootel.NewHandler()
// 注入到 Graph 执行
result, err := graph.Invoke(ctx, input,
compose.WithCallbacks(otelHandler))
每次执行会生成包含以下信息的 Span:
- 节点名称和组件类型
- 输入/输出摘要
- 执行耗时
- 错误信息(如有)
实战:全链路日志与耗时追踪
以下是一个完整的示例,展示如何同时使用 Callback 实现日志记录和耗时统计:
package main
import (
"context"
"fmt"
"io"
"log"
"time"
"github.com/cloudwego/eino/callbacks"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/components/model"
"github.com/cloudwego/eino/schema"
)
// 构建带耗时追踪的 Callback Handler
func buildTracingHandler() callbacks.Handler {
return callbacks.NewHandlerBuilder().
OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
start := time.Now()
ctx = context.WithValue(ctx, "start_"+info.Name, start)
log.Printf("[TRACE] ▶ Node=%s Type=%s started", info.Name, info.Type)
return ctx
}).
OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
if start, ok := ctx.Value("start_" + info.Name).(time.Time); ok {
duration := time.Since(start)
log.Printf("[TRACE] ◀ Node=%s Type=%s duration=%v",
info.Name, info.Type, duration)
}
return ctx
}).
OnEndWithStreamFn(func(ctx context.Context, info *callbacks.RunInfo,
sr *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
if start, ok := ctx.Value("start_" + info.Name).(time.Time); ok {
ttft := time.Since(start)
log.Printf("[TRACE] ⇥ Node=%s first_chunk_latency=%v", info.Name, ttft)
}
return ctx
}).
Build()
}
func main() {
ctx := context.Background()
// 构建 Graph(参考上一篇文章的编排方式)
g := compose.NewGraph[string, string]()
// ... 添加节点和边 ...
compiled, err := g.Compile(ctx)
if err != nil {
log.Fatal(err)
}
// 注入 tracing handler
handler := buildTracingHandler()
// 流式执行
sr, err := compiled.Stream(ctx, "你好,请帮我分析这段代码的性能问题",
compose.WithCallbacks(handler))
if err != nil {
log.Fatal(err)
}
// 消费流式输出
for {
chunk, err := sr.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}
fmt.Print(chunk)
}
fmt.Println()
}
运行输出示例:
[TRACE] ▶ Node=prompt_template Type=PromptTemplate started
[TRACE] ◀ Node=prompt_template Type=PromptTemplate duration=1.2ms
[TRACE] ▶ Node=llm Type=ChatModel started
[TRACE] ⇥ Node=llm first_chunk_latency=156ms
[TRACE] ▶ Node=output_parser Type=Lambda started
[TRACE] ◀ Node=output_parser Type=Lambda duration=0.3ms
最佳实践
流式处理:
- 面向用户的接口始终使用
Stream模式,最大化响应体验 - 对中间处理节点,如果不需要观察中间流数据,让引擎自动处理转换
- 设置合理的
bufferSize,过大浪费内存,过小导致背压频繁
Callback 设计:
- 将通用可观测性(OTel、日志)注册为全局 Callback
- 业务相关的监控(token 计数、成本追踪)限定到具体组件类型
- Callback 中避免阻塞操作——使用异步 channel 将数据发送到后台 goroutine
- 利用
context.Context在 OnStart/OnEnd 之间传递状态(如计时器)
生产部署:
- 配合 JSON 格式化工具 让 OTel 输出的 JSON trace 更易读
- 使用 JSON 转 Go Struct 快速将 Callback 输出结构化
常见问题
Q: 流式模式下网络中断怎么处理?
A: StreamReader.Recv() 会返回 error(非 io.EOF)。建议在消费循环中对非 EOF 错误做降级处理,记录断点位置用于重试或向用户提示。
Q: Callback 执行是否影响主流程性能?
A: Callback 在同一 goroutine 中同步执行。如果回调逻辑较重(如写数据库),应在回调内部将数据推入异步队列,避免阻塞主流程。
Q: 可以同时注册多个 Callback Handler 吗?
A: 可以。多个 Handler 按注册顺序依次执行。OnStart 按正序执行,OnEnd 按逆序执行,类似中间件洋葱模型。
Q: 流式 Callback 能否修改流中的数据?
A: OnEndWithStream 接收的是流的 副本(通过 Fork 实现),对副本的消费不影响下游节点接收到的原始流。这保证了 Callback 的纯观察语义。
总结
流式处理和 Callback 切面是 Eino 从「能用」到「生产可用」的关键跨越。StreamReader/StreamWriter 原语为编排提供了类型安全的流抽象,编排引擎的自动流转换屏蔽了组件间的协议差异,而 Callback 系统则在不侵入业务代码的前提下实现了全链路可观测性。
掌握这两个机制后,你可以:
- 为用户提供实时的流式响应体验
- 在生产环境中追踪每一步执行的耗时和异常
- 基于 OpenTelemetry 构建完整的分布式追踪链路
在下一篇文章中,我们将进入 Eino DevKit 实战:构建你的第一个 Agent,将本文的流式和回调机制应用到真实的 Agent 构建中。