核心摘要
在构建 LLM 应用时,单一模型调用远远不够——你需要将 Prompt 模板、模型推理、工具调用、结果处理等多个组件串联成完整的执行流。Eino 提供了三种编排 API:Chain(线性管道)、Graph(图编排)和 Workflow(字段级映射),覆盖从简单管道到复杂 Agent 的全部场景。本文通过完整代码和架构图,带你掌握这三种编排方式的核心原理与实战用法。
目录
- 核心要点
- 为什么需要编排
- Chain:线性编排
- Graph:图编排
- Workflow:字段级映射
- 编排核心能力
- 三种 API 对比
- 实战:构建 Tool Calling Agent
- 最佳实践
- 常见问题
- 总结
- 相关资源
核心要点
- Chain:线性 DAG,代码最简洁,适合「模板 → 模型 → 后处理」类直线管道
- Graph:有向图(支持循环),通过 Branch 实现条件路由,是构建 Agentic Workflow 的首选
- Workflow:有向无环图 + 字段级 MapFields 映射,适合多输入节点的复杂数据流
- 类型安全:三种 API 均通过 Go 泛型在编译期检查输入/输出类型
- 流式内建:编排引擎自动处理 Stream 拼接、分裂与合并
为什么需要编排
现代 LLM 应用通常不是一次简单的 API 调用,而是由多个步骤组成的执行流:
用户输入 → Prompt 组装 → 模型推理 → 工具调用 → 结果解析 → 响应生成
如果手动拼接这些步骤,你会面临:
- 类型不匹配:上游输出与下游输入格式不一致
- 流式传播:需要手动处理 Stream 的转发和拼接
- 并发管理:独立节点无法自动并行执行
- 可观测性:缺少统一的日志、追踪和指标注入点
Eino 的编排引擎从根本上解决了这些问题。它将组件抽象为节点(Node),通过编排 API 定义执行拓扑,再由 Compile() 生成高效的可执行实例。
Chain:线性编排
Chain 是最简单的编排方式——一条直线从头走到尾,没有分支、没有循环。
适用场景
- Prompt 模板 → ChatModel → 输出解析
- 文本预处理 → 向量化 → 存储
- 任何纯粹的管道式处理
代码示例
package main
import (
"context"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
)
func buildChain(ctx context.Context) {
// 创建 Chain:输入 map[string]any,输出 *schema.Message
chain, _ := compose.NewChain[map[string]any, *schema.Message]().
AppendChatTemplate(prompt). // 节点1:组装 Prompt
AppendChatModel(model). // 节点2:调用模型
AppendLambda(extractContent). // 节点3:提取内容
Compile(ctx)
// 执行
result, err := chain.Invoke(ctx, map[string]any{
"query": "what's your name?",
})
if err != nil {
panic(err)
}
fmt.Println(result.Content)
}
Chain 的设计哲学是极简:通过 Append 方法逐个添加节点,编排代码几乎与业务逻辑一样直观。
Graph:图编排
当你的执行流需要条件分支或循环时,Chain 就力不从心了。Graph 通过 Node、Edge 和 Branch 三个原语构建任意有向图。
核心概念
| 概念 | 说明 |
|---|---|
| Node | 图中的一个组件实例(ChatModel、Tool、Lambda 等) |
| Edge | 节点间的 1→1 连接 |
| Branch | N→1 条件路由,根据上游输出决定走哪条路径 |
| START / END | 特殊的入口/出口节点 |
| Compile | 验证图结构并生成可执行的 Runnable 实例 |
Agent 图结构(Mermaid 图)
这是一个典型的 ReAct Agent 循环:模型决定是否调用工具,调用后结果回传模型继续推理,直到不再需要工具调用为止。
代码示例
func buildAgentGraph(ctx context.Context) {
graph := compose.NewGraph[map[string]any, *schema.Message]()
// 添加节点
_ = graph.AddChatTemplateNode("node_template", chatTpl)
_ = graph.AddChatModelNode("node_model", chatModel)
_ = graph.AddToolsNode("node_tools", toolsNode)
_ = graph.AddLambdaNode("node_converter", takeOne)
// 定义边
_ = graph.AddEdge(compose.START, "node_template")
_ = graph.AddEdge("node_template", "node_model")
// 条件分支:模型输出是否包含工具调用
branch := compose.NewBranch(func(ctx context.Context, msg *schema.Message) (string, error) {
if len(msg.ToolCalls) > 0 {
return "node_tools", nil
}
return compose.END, nil
})
_ = graph.AddBranch("node_model", branch)
// 工具调用后的回路
_ = graph.AddEdge("node_tools", "node_converter")
_ = graph.AddEdge("node_converter", "node_model")
// 编译并执行
compiledGraph, err := graph.Compile(ctx)
if err != nil {
panic(err)
}
out, err := compiledGraph.Invoke(ctx, map[string]any{
"query": "北京这周末天气怎么样",
})
fmt.Println(out.Content)
}
Graph 的强大之处在于支持循环——node_converter → node_model 的回边让模型可以反复调用工具,形成自主决策的 Agent 循环。
Workflow:字段级映射
Workflow 是 DAG(无循环),但相比 Graph 增加了字段级数据映射能力。当多个下游节点需要上游输出的不同字段时,Workflow 是最佳选择。
数据流结构(Mermaid 图)
代码示例
func buildWorkflow(ctx context.Context) {
wf := compose.NewWorkflow[[]*schema.Message, *schema.Message]()
// 添加节点并定义输入映射
wf.AddChatModelNode("model", m).AddInput(compose.START)
wf.AddLambdaNode("lambda1", compose.InvokableLambda(lambda1)).
AddInput("model", compose.MapFields("Content", "Input"))
wf.AddLambdaNode("lambda2", compose.InvokableLambda(lambda2)).
AddInput("model", compose.MapFields("Role", "Role"))
wf.AddLambdaNode("lambda3", compose.InvokableLambda(lambda3)).
AddInput("lambda1", compose.MapFields("Output", "Query")).
AddInput("lambda2", compose.MapFields("Output", "MetaData"))
wf.End().AddInput("lambda3")
// 编译并执行
runnable, err := wf.Compile(ctx)
if err != nil {
panic(err)
}
result, err := runnable.Invoke(ctx, messages)
fmt.Println(result.Content)
}
MapFields("Content", "Input") 的含义:将上游 model 节点输出的 Content 字段,映射为下游 lambda1 节点输入的 Input 字段。这种精确的字段映射避免了传递整个对象带来的耦合。
编排核心能力
Eino 的编排引擎不仅仅是拓扑定义,它在底层提供了一整套运行时能力:
类型安全
通过 Go 泛型,编排 API 在编译期就能检查节点间的类型匹配:
// 编译器会检查 map[string]any → *schema.Message 的类型链路
chain, _ := compose.NewChain[map[string]any, *schema.Message]()
流式处理
编排引擎内置 Stream 处理三大机制:
| 场景 | 处理方式 |
|---|---|
| 上游 Stream + 下游需完整值 | 自动拼接(Concatenate) |
| 一个 Stream 被多个下游消费 | 自动分裂(Split) |
| 多个 Stream 汇入一个节点 | 自动合并(Merge) |
并发管理
Compile 阶段会分析图拓扑,自动识别可并行执行的节点路径,无需手动管理 goroutine。
切面注入(Aspect)
通过 Callback 机制,可以在不修改业务代码的情况下注入:
- 日志:记录每个节点的输入输出
- 追踪:自动生成 Span 用于分布式追踪
- 指标:统计节点耗时、Token 消耗等
Option 分配
支持按节点或按类型统一配置运行参数:
// 为特定节点设置 Option
compiledGraph.Invoke(ctx, input,
compose.WithNodeOption("node_model", model.WithTemperature(0.7)),
)
三种 API 对比
| 维度 | Chain | Graph | Workflow |
|---|---|---|---|
| 拓扑结构 | 线性 DAG | 有向图(支持循环) | DAG(无循环) |
| 数据传递 | 节点间完整传递 | 节点间完整传递 | 字段级映射 |
| 条件分支 | ❌ 不支持 | ✅ Branch | ❌ 不支持 |
| 循环 | ❌ 不支持 | ✅ 支持 | ❌ 不支持 |
| 多输入节点 | ❌ 不支持 | ❌ 不支持 | ✅ 支持 |
| 代码复杂度 | 最低 | 中等 | 中等 |
| 典型场景 | 简单管道 | Agent、条件路由 | 复杂数据转换 |
选择建议:
- 流程是直线?→ 用 Chain
- 需要分支或循环?→ 用 Graph
- 需要精确控制字段流转?→ 用 Workflow
实战:构建 Tool Calling Agent
下面是一个完整的 Tool Calling Agent 示例,结合了前文介绍的 ChatModel 和 Tool 组件:
package main
import (
"context"
"fmt"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/components/model"
"github.com/cloudwego/eino/components/tool"
"github.com/cloudwego/eino/schema"
)
// 定义工具:获取天气
func getWeather(ctx context.Context, params map[string]string) (string, error) {
city := params["city"]
return fmt.Sprintf("%s 本周末晴,气温 22-28°C", city), nil
}
func main() {
ctx := context.Background()
// 初始化组件
chatModel := initChatModel() // ChatModel 实例
chatTpl := initChatTemplate() // Prompt 模板
toolsNode := initToolsNode() // 工具节点
converter := initConverter() // 工具结果转换
// 构建 Graph
graph := compose.NewGraph[map[string]any, *schema.Message]()
_ = graph.AddChatTemplateNode("template", chatTpl)
_ = graph.AddChatModelNode("model", chatModel)
_ = graph.AddToolsNode("tools", toolsNode)
_ = graph.AddLambdaNode("converter", converter)
// 定义执行拓扑
_ = graph.AddEdge(compose.START, "template")
_ = graph.AddEdge("template", "model")
// 条件分支:是否需要调用工具
branch := compose.NewBranch(func(ctx context.Context, msg *schema.Message) (string, error) {
if len(msg.ToolCalls) > 0 {
return "tools", nil
}
return compose.END, nil
})
_ = graph.AddBranch("model", branch)
// 工具调用回路
_ = graph.AddEdge("tools", "converter")
_ = graph.AddEdge("converter", "model")
// 编译
agent, err := graph.Compile(ctx)
if err != nil {
panic(fmt.Sprintf("compile error: %v", err))
}
// 执行
result, err := agent.Invoke(ctx, map[string]any{
"query": "北京这周末天气怎么样?",
})
if err != nil {
panic(err)
}
fmt.Println(result.Content)
// 输出:北京本周末晴,气温 22-28°C,适合户外活动...
}
这段代码展示了 Eino Graph 编排的核心优势:用声明式的拓扑定义取代命令式的流程控制,让 Agent 逻辑清晰可维护。
最佳实践
从简单开始,按需升级
不要过度设计——如果 Chain 能满足需求,就不要用 Graph。编排 API 的选择应该匹配实际复杂度。
善用 Compile 的验证能力
Compile() 不仅生成可执行实例,还会验证图的完整性。建议在应用启动时执行编译,将图定义错误提前暴露。
利用 Option 做运行时调优
不要硬编码模型参数。通过 WithNodeOption 在调用时动态调整 Temperature、MaxTokens 等参数,便于 A/B 测试和灰度。
为关键节点添加 Callback
生产环境中务必为 ChatModel 节点添加 Callback,记录 Token 消耗和响应延迟,这是成本控制的基础。
利用 Go 的类型系统做防御
定义清晰的输入/输出结构体,利用泛型让编译器帮你检查类型链路。处理 JSON 数据结构时,使用 JSON to Go 工具快速生成类型定义。
常见问题
Chain 可以添加条件分支吗?
不可以。Chain 严格限制为线性 DAG。如果需要条件分支,请切换到 Graph。
Graph 的循环会不会导致死循环?
Branch 的路由函数负责决定何时退出循环。建议设置最大迭代次数作为安全阀,避免模型陷入无限工具调用。
Workflow 支持循环吗?
不支持。Workflow 是严格的 DAG(有向无环图)。如果需要循环加字段映射,可以考虑在 Graph 中嵌套 Workflow 作为子图。
编排的性能开销大吗?
编排引擎的开销极小(微秒级),相比 LLM 推理的毫秒到秒级延迟完全可以忽略。Compile 阶段会预计算执行计划,运行时几乎零开销。
如何调试复杂的 Graph?
利用 Callback 机制在每个节点的入口和出口打印日志。Eino 同时支持集成 OpenTelemetry,可以在分布式追踪系统中可视化每个节点的执行情况。
总结
Eino 的编排引擎为 Go 开发者提供了一套完整且类型安全的组件组合方案。三种 API 覆盖了从简单管道到复杂 Agent 的全部需求:
- Chain 解决 80% 的简单串联场景
- Graph 解决需要条件分支和循环的 Agent 场景
- Workflow 解决需要精确数据流转的复杂转换场景
结合 Eino 框架总览中介绍的设计理念和核心组件,你已经具备了用 Eino 构建生产级 AI 应用的全部基础知识。