核心摘要

LLM 的输出天然是流式的——模型逐 token 生成,而非一次性返回完整答案。如何在 AI Agent 编排中高效传递流数据,又如何在不侵入业务逻辑的前提下注入日志、追踪和监控?Eino 通过 StreamReader/StreamWriter 原语Callback 切面系统 给出了优雅的工程化方案。本文将从底层原语到生产级可观测性,完整拆解这两大核心机制。


目录

  1. 核心要点
  2. 为什么需要流式处理
  3. Eino 流式范式
  4. 组件流式能力矩阵
  5. 编排中的流式透传
  6. Callback 切面系统
  7. 生产级可观测性
  8. 实战:全链路日志与耗时追踪
  9. 最佳实践
  10. 常见问题
  11. 总结
  12. 相关资源

核心要点

  • 双模调用:每个 Eino 组件均支持 Invoke(完整返回)和 Stream(流式返回)两种模式
  • StreamReader/StreamWriter:类型安全的泛型流原语,是 Eino 流式体系的基石
  • 自动流转换:编排引擎在节点间自动处理流拼接、分裂和类型转换
  • Callback 四钩子:OnStart / OnEnd / OnStartWithStream / OnEndWithStream 覆盖全部执行生命周期
  • 作用域控制:支持全局、按类型、按节点三级粒度的 Callback 注入
  • OTel 一键集成:内置 OpenTelemetry Handler,每个节点自动生成 Trace Span

为什么需要流式处理

在传统的 API 调用模式中,客户端必须等待 LLM 生成全部 token 后才能收到响应。对于一个 500 token 的回答,这意味着用户可能要等待 3-5 秒才看到第一个字符。流式处理从三个维度解决这一问题:

维度 非流式 流式
首 token 延迟 3-5s(全部生成后返回) 100-300ms(首 token 即返回)
内存占用 需缓存完整响应 逐片段处理,常数级内存
用户体验 长时间无反馈 实时打字效果

Agentic Workflow 中,流式处理的价值被进一步放大:Agent 的思考链(Chain of Thought)可以实时展示给用户,让交互过程透明可见。


Eino 流式范式

Invoke vs Stream

Eino 为每个组件定义了两种调用接口:

go
// Invoke:等待完整结果
result, err := chatModel.Generate(ctx, messages)

// Stream:返回 StreamReader,逐片段读取
streamReader, err := chatModel.Stream(ctx, messages)
for {
    chunk, err := streamReader.Recv()
    if err == io.EOF {
        break
    }
    fmt.Print(chunk.Content)
}

StreamReader 与 StreamWriter

StreamReader[T]StreamWriter[T] 是 Eino 流式体系的核心原语。它们基于 Go 泛型实现类型安全的流式通信:

go
// 创建流对
reader, writer := schema.Pipe[*schema.Message](bufferSize)

// 写入端(通常在组件内部)
go func() {
    defer writer.Close()
    for _, chunk := range chunks {
        if closed := writer.Send(chunk, nil); closed {
            return
        }
    }
}()

// 读取端(由下游消费)
for {
    msg, err := reader.Recv()
    if err == io.EOF {
        break
    }
    process(msg)
}

关键设计:

  • 类型安全:泛型参数 [T] 确保编译期类型检查
  • 背压机制bufferSize 控制缓冲区大小,避免生产者过快
  • 优雅关闭Close() 通知读取端流结束,Recv() 返回 io.EOF

组件流式能力矩阵

不同组件对流的支持程度不同。Eino 的编排引擎能自动适配这些差异:

组件类型 Invoke 输入 Stream 输入 Invoke 输出 Stream 输出
ChatModel
PromptTemplate
Retriever
ToolNode
Lambda 可选 可选
Transformer

关键洞察:ChatModel 是唯一天然支持 Stream 输出的核心组件。编排引擎的流式透传能力确保了即使中间节点不支持流式,最终用户仍可获得流式体验。


编排中的流式透传

当你在 Graph 中以 Stream 模式调用编排时,Eino 引擎在节点间自动处理三种流转换:

graph LR A["ChatModel (Stream输出)"] -->|"StreamReader[Message]"| B{"下游类型?"} B -->|"支持 Stream 输入"| C["Transformer (直接透传)"] B -->|"仅支持 Invoke"| D["自动拼接 (Concat)"] D --> E["Lambda (完整值)"] A -->|"分裂 (Fork)"| F["Callback 观察"] A -->|"分裂 (Fork)"| G["另一个下游消费者"]

三种自动转换

1. 流拼接(Concatenation)

当下游节点只接受完整值时,引擎自动收集所有流片段并合并:

go
// 引擎内部等效逻辑
fullMessage := concatenate(streamReader) // 收集所有 chunk
nextNode.Invoke(ctx, fullMessage)         // 传递完整值

2. 流分裂(Fork/Split)

当一个流需要被多个下游节点消费时,引擎自动分裂流:

go
// 一个 StreamReader 被复制为多份
readers := stream.Split(originalReader, consumerCount)
// 每个 consumer 独立消费完整的流数据

3. 流合并(Merge)

当多个上游的流需要汇入同一下游时,引擎按完成顺序合并:

go
// 多个上游流合并为一个
mergedReader := stream.Merge(reader1, reader2, reader3)

Callback 切面系统

为什么需要 Callback

在生产环境中,你需要对 Agent 的每一步执行进行监控:记录输入输出、统计耗时、追踪异常。这些「横切关注点」如果散落在业务代码中会导致严重耦合。Eino 的 Callback 系统提供了 AOP(面向切面编程)风格的解决方案。

四阶段钩子

每个节点执行时,Eino 引擎会按顺序触发四个钩子:

钩子 触发时机 输入参数
OnStart 节点开始前 ctx, RunInfo, CallbackInput
OnStartWithStream 节点以流方式开始 ctx, RunInfo, StreamReader
OnEnd 节点完成后 ctx, RunInfo, CallbackOutput
OnEndWithStream 节点以流方式结束 ctx, RunInfo, StreamReader

使用 HandlerBuilder 构建

go
handler := callbacks.NewHandlerBuilder().
    OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
        log.Printf("[%s] 开始执行, 组件: %s, 输入: %v",
            info.Name, info.Type, input)
        return ctx
    }).
    OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
        log.Printf("[%s] 执行完成, 组件: %s, 输出: %v",
            info.Name, info.Type, output)
        return ctx
    }).
    OnEndWithStreamFn(func(ctx context.Context, info *callbacks.RunInfo,
        sr *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
        log.Printf("[%s] 流式输出开始, 组件: %s", info.Name, info.Type)
        return ctx
    }).
    Build()

作用域控制

Callback 支持三级粒度的作用域控制:

go
// 1. 全局:对 Graph 中所有节点生效
result, err := compiledGraph.Invoke(ctx, input,
    compose.WithCallbacks(handler))

// 2. 按组件类型:仅对特定类型组件生效
result, err := compiledGraph.Invoke(ctx, input,
    compose.WithCallbacks(handler).DesignateType(components.ComponentOfChatModel))

// 3. 按节点名称:仅对指定节点生效
result, err := compiledGraph.Invoke(ctx, input,
    compose.WithCallbacks(handler).DesignateNode("llm_node"))

这种分层设计让你可以:

  • 全局注入 OTel 追踪(对所有节点生效)
  • 仅对 ChatModel 节点记录 token 用量
  • 仅对特定关键节点添加告警逻辑

生产级可观测性

OpenTelemetry 集成

Eino 内置了 OpenTelemetry Callback Handler,每个节点的执行自动映射为 OTel Span:

sequenceDiagram participant Client participant Graph as "CompiledGraph" participant Prompt as "PromptTemplate" participant LLM as "ChatModel" participant Tool as "ToolNode" participant OTel as "OTel Collector" Client->>Graph: Stream(ctx, input) Graph->>OTel: StartSpan("graph.execute") Graph->>Prompt: OnStart → Span("prompt_node") Prompt->>Graph: OnEnd Graph->>LLM: OnStart → Span("llm_node") LLM-->>Graph: OnEndWithStream (流式输出) Graph->>Tool: OnStart → Span("tool_node") Tool->>Graph: OnEnd Graph->>OTel: EndSpan (全部完成) Graph-->>Client: StreamReader

配置 OTel Handler

go
import (
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    einootel "github.com/cloudwego/eino/callbacks/otel"
)

func initTracer() func() {
    exporter, _ := otlptracehttp.New(ctx,
        otlptracehttp.WithEndpoint("otel-collector:4318"),
    )
    tp := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(exporter),
        sdktrace.WithResource(resource.NewWithAttributes(
            semconv.SchemaURL,
            semconv.ServiceName("my-agent-service"),
        )),
    )
    otel.SetTracerProvider(tp)
    return func() { tp.Shutdown(ctx) }
}

// 创建 OTel Callback Handler
otelHandler := einootel.NewHandler()

// 注入到 Graph 执行
result, err := graph.Invoke(ctx, input,
    compose.WithCallbacks(otelHandler))

每次执行会生成包含以下信息的 Span:

  • 节点名称和组件类型
  • 输入/输出摘要
  • 执行耗时
  • 错误信息(如有)

实战:全链路日志与耗时追踪

以下是一个完整的示例,展示如何同时使用 Callback 实现日志记录和耗时统计:

go
package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "time"

    "github.com/cloudwego/eino/callbacks"
    "github.com/cloudwego/eino/compose"
    "github.com/cloudwego/eino/components/model"
    "github.com/cloudwego/eino/schema"
)

// 构建带耗时追踪的 Callback Handler
func buildTracingHandler() callbacks.Handler {
    return callbacks.NewHandlerBuilder().
        OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
            start := time.Now()
            ctx = context.WithValue(ctx, "start_"+info.Name, start)
            log.Printf("[TRACE] ▶ Node=%s Type=%s started", info.Name, info.Type)
            return ctx
        }).
        OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
            if start, ok := ctx.Value("start_" + info.Name).(time.Time); ok {
                duration := time.Since(start)
                log.Printf("[TRACE] ◀ Node=%s Type=%s duration=%v",
                    info.Name, info.Type, duration)
            }
            return ctx
        }).
        OnEndWithStreamFn(func(ctx context.Context, info *callbacks.RunInfo,
            sr *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
            if start, ok := ctx.Value("start_" + info.Name).(time.Time); ok {
                ttft := time.Since(start)
                log.Printf("[TRACE] ⇥ Node=%s first_chunk_latency=%v", info.Name, ttft)
            }
            return ctx
        }).
        Build()
}

func main() {
    ctx := context.Background()

    // 构建 Graph(参考上一篇文章的编排方式)
    g := compose.NewGraph[string, string]()

    // ... 添加节点和边 ...

    compiled, err := g.Compile(ctx)
    if err != nil {
        log.Fatal(err)
    }

    // 注入 tracing handler
    handler := buildTracingHandler()

    // 流式执行
    sr, err := compiled.Stream(ctx, "你好,请帮我分析这段代码的性能问题",
        compose.WithCallbacks(handler))
    if err != nil {
        log.Fatal(err)
    }

    // 消费流式输出
    for {
        chunk, err := sr.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatal(err)
        }
        fmt.Print(chunk)
    }
    fmt.Println()
}

运行输出示例:

code
[TRACE] ▶ Node=prompt_template Type=PromptTemplate started
[TRACE] ◀ Node=prompt_template Type=PromptTemplate duration=1.2ms
[TRACE] ▶ Node=llm Type=ChatModel started
[TRACE] ⇥ Node=llm first_chunk_latency=156ms
[TRACE] ▶ Node=output_parser Type=Lambda started
[TRACE] ◀ Node=output_parser Type=Lambda duration=0.3ms

最佳实践

流式处理

  • 面向用户的接口始终使用 Stream 模式,最大化响应体验
  • 对中间处理节点,如果不需要观察中间流数据,让引擎自动处理转换
  • 设置合理的 bufferSize,过大浪费内存,过小导致背压频繁

Callback 设计

  • 将通用可观测性(OTel、日志)注册为全局 Callback
  • 业务相关的监控(token 计数、成本追踪)限定到具体组件类型
  • Callback 中避免阻塞操作——使用异步 channel 将数据发送到后台 goroutine
  • 利用 context.Context 在 OnStart/OnEnd 之间传递状态(如计时器)

生产部署


常见问题

Q: 流式模式下网络中断怎么处理?

A: StreamReader.Recv() 会返回 error(非 io.EOF)。建议在消费循环中对非 EOF 错误做降级处理,记录断点位置用于重试或向用户提示。

Q: Callback 执行是否影响主流程性能?

A: Callback 在同一 goroutine 中同步执行。如果回调逻辑较重(如写数据库),应在回调内部将数据推入异步队列,避免阻塞主流程。

Q: 可以同时注册多个 Callback Handler 吗?

A: 可以。多个 Handler 按注册顺序依次执行。OnStart 按正序执行,OnEnd 按逆序执行,类似中间件洋葱模型。

Q: 流式 Callback 能否修改流中的数据?

A: OnEndWithStream 接收的是流的 副本(通过 Fork 实现),对副本的消费不影响下游节点接收到的原始流。这保证了 Callback 的纯观察语义。


总结

流式处理和 Callback 切面是 Eino 从「能用」到「生产可用」的关键跨越。StreamReader/StreamWriter 原语为编排提供了类型安全的流抽象,编排引擎的自动流转换屏蔽了组件间的协议差异,而 Callback 系统则在不侵入业务代码的前提下实现了全链路可观测性。

掌握这两个机制后,你可以:

  • 为用户提供实时的流式响应体验
  • 在生产环境中追踪每一步执行的耗时和异常
  • 基于 OpenTelemetry 构建完整的分布式追踪链路

在下一篇文章中,我们将进入 Eino DevKit 实战:构建你的第一个 Agent,将本文的流式和回调机制应用到真实的 Agent 构建中。


相关资源