核心摘要
RAG(Retrieval-Augmented Generation)是当前大模型落地最务实的架构模式——让模型基于检索到的真实数据回答问题,而非依赖训练时的参数记忆。Eino 框架为 Go 开发者提供了完整的 RAG Pipeline 组件体系,从文档加载到向量检索再到答案生成,每个环节都有标准接口和生产级实现。
本文将带你完整走通 Eino RAG Pipeline 的全链路实现,并深入探讨生产环境中的优化策略。
目录
- RAG 核心工作流回顾
- Eino RAG 组件全景
- Document Loader:多源文档加载
- Document Transformer:切分策略
- Embedding:向量化
- Indexer:向量入库
- Retriever:语义检索
- Reranker:精排策略
- 生产优化方案
- 实战:企业知识库问答系统
- 最佳实践
- 总结
- 相关资源
RAG 核心工作流回顾
RAG 将"检索"与"生成"分为两个阶段,核心流程可概括为三步:
Index 阶段(离线):将原始文档切分、向量化后写入检索引擎。
Retrieve 阶段(在线):将用户问题向量化后,从索引中召回相关文档片段。
Generate 阶段(在线):将召回的上下文与用户问题一起送入 LLM 生成答案。
Eino RAG 组件全景
Eino 将 RAG Pipeline 中的每个环节都抽象为独立的 Go interface:
| 组件 | 接口 | 职责 |
|---|---|---|
| Document Loader | DocumentLoader |
从各种数据源加载原始文档 |
| Document Transformer | DocumentTransformer |
文档切分、清洗、元数据增强 |
| Embedding | Embedding |
文本转向量 |
| Indexer | Indexer |
向量写入存储引擎 |
| Retriever | Retriever |
从向量库中检索相关文档 |
这套组件的核心设计原则:接口抽象 + Option 模式。上层编排代码只依赖接口,底层实现(用 ElasticSearch 还是 VikingDB、用 OpenAI Embedding 还是本地模型)可以随时替换。
Document Loader:多源文档加载
Document Loader 负责将各种格式的原始数据加载为统一的 Document 结构。
接口定义
type DocumentLoader interface {
Load(ctx context.Context, opts ...LoaderOption) ([]*Document, error)
}
type Document struct {
Content string
MetaData map[string]any
}
Eino 内置实现
Eino 提供了多种开箱即用的 Loader:
import (
"github.com/cloudwego/eino-ext/components/document/loader/file"
"github.com/cloudwego/eino-ext/components/document/loader/web"
"github.com/cloudwego/eino-ext/components/document/loader/s3"
)
// 本地文件加载
fileLoader, _ := file.NewFileLoader(ctx, &file.FileLoaderConfig{
Path: "./docs/product-manual.pdf",
})
// Web 页面加载
webLoader, _ := web.NewWebLoader(ctx, &web.WebLoaderConfig{
URLs: []string{
"https://docs.example.com/api-reference",
"https://docs.example.com/getting-started",
},
MaxDepth: 2,
})
// S3 对象存储加载
s3Loader, _ := s3.NewS3Loader(ctx, &s3.S3LoaderConfig{
Bucket: "knowledge-base",
Prefix: "documents/",
Region: "us-east-1",
})
docs, err := fileLoader.Load(ctx)
自定义 Loader
对于特殊数据源(如企业内部 Wiki、Confluence 等),只需实现 DocumentLoader 接口:
type ConfluenceLoader struct {
client *confluence.Client
spaceKey string
}
func (l *ConfluenceLoader) Load(ctx context.Context, opts ...LoaderOption) ([]*Document, error) {
pages, err := l.client.GetSpacePages(ctx, l.spaceKey)
if err != nil {
return nil, err
}
docs := make([]*Document, 0, len(pages))
for _, page := range pages {
docs = append(docs, &Document{
Content: page.Body,
MetaData: map[string]any{
"source": page.URL,
"title": page.Title,
"updated_at": page.UpdatedAt,
},
})
}
return docs, nil
}
Document Transformer:切分策略
文档切分是 RAG 质量的关键瓶颈。切分过粗,检索时噪声过大;切分过细,丢失上下文语义。
接口定义
type DocumentTransformer interface {
Transform(ctx context.Context, docs []*Document, opts ...TransformerOption) ([]*Document, error)
}
文本切分器
import "github.com/cloudwego/eino-ext/components/document/transformer"
// 按 token 数切分,保留 overlap 上下文
splitter, _ := transformer.NewTokenSplitter(ctx, &transformer.TokenSplitterConfig{
ChunkSize: 512, // 每个 chunk 最大 512 tokens
ChunkOverlap: 64, // 相邻 chunk 重叠 64 tokens
ModelName: "gpt-4",
})
chunks, err := splitter.Transform(ctx, docs)
HTML 结构化切分
对于 Web 抓取的 HTML 内容,按 DOM 结构切分能保留更好的语义边界:
htmlSplitter, _ := transformer.NewHTMLSplitter(ctx, &transformer.HTMLSplitterConfig{
Headers: []string{"h1", "h2", "h3"},
ChunkSize: 1000,
})
递归字符切分
对于没有明确结构的纯文本,递归切分是最通用的策略:
recursiveSplitter, _ := transformer.NewRecursiveCharSplitter(ctx, &transformer.RecursiveCharSplitterConfig{
ChunkSize: 800,
ChunkOverlap: 100,
Separators: []string{"\n\n", "\n", "。", ".", " "},
})
递归切分会按 Separators 的优先级逐级尝试,优先在段落边界处切分,保证语义完整性。
Embedding:向量化
Embedding 组件将文本转换为高维向量,是连接"文本世界"和"向量空间"的桥梁。
type Embedding interface {
EmbedStrings(ctx context.Context, texts []string, opts ...EmbeddingOption) ([][]float64, error)
}
OpenAI Embedding
import "github.com/cloudwego/eino-ext/components/embedding/openai"
embedder, _ := openai.NewEmbedding(ctx, &openai.EmbeddingConfig{
Model: "text-embedding-3-small",
APIKey: os.Getenv("OPENAI_API_KEY"),
})
vectors, err := embedder.EmbedStrings(ctx, []string{
"Eino 是字节跳动开源的 Go AI 应用开发框架",
"RAG 将检索与生成相结合提升回答质量",
})
// vectors[0] => []float64{0.012, -0.034, ...} (1536 维)
Ark Embedding(火山引擎)
import "github.com/cloudwego/eino-ext/components/embedding/ark"
arkEmbedder, _ := ark.NewEmbedding(ctx, &ark.EmbeddingConfig{
EndpointID: os.Getenv("ARK_ENDPOINT_ID"),
AK: os.Getenv("ARK_AK"),
SK: os.Getenv("ARK_SK"),
})
批量处理优化
生产环境中向量化大量文档时,注意控制批次大小和并发:
const batchSize = 100
for i := 0; i < len(texts); i += batchSize {
end := min(i+batchSize, len(texts))
batch := texts[i:end]
vectors, err := embedder.EmbedStrings(ctx, batch)
if err != nil {
return fmt.Errorf("embedding batch %d failed: %w", i/batchSize, err)
}
// 写入 Indexer...
}
Indexer:向量入库
Indexer 将文档及其向量写入存储引擎,供后续检索使用。
type Indexer interface {
Store(ctx context.Context, docs []*Document, opts ...IndexerOption) ([]string, error)
}
ElasticSearch Indexer
import "github.com/cloudwego/eino-ext/components/indexer/es"
esIndexer, _ := es.NewIndexer(ctx, &es.IndexerConfig{
Addresses: []string{"http://localhost:9200"},
Index: "knowledge-base",
Username: os.Getenv("ES_USER"),
Password: os.Getenv("ES_PASS"),
})
ids, err := esIndexer.Store(ctx, chunks)
VikingDB Indexer
import "github.com/cloudwego/eino-ext/components/indexer/vikingdb"
vdbIndexer, _ := vikingdb.NewIndexer(ctx, &vikingdb.IndexerConfig{
Collection: "enterprise-kb",
Host: os.Getenv("VIKINGDB_HOST"),
AK: os.Getenv("VIKINGDB_AK"),
SK: os.Getenv("VIKINGDB_SK"),
})
Retriever:语义检索
Retriever 是 RAG 在线阶段的核心——将用户 query 转化为向量,从索引中召回最相关的文档片段。
type Retriever interface {
Retrieve(ctx context.Context, query string, opts ...RetrieverOption) ([]*Document, error)
}
基础语义检索
import "github.com/cloudwego/eino-ext/components/retriever/es"
retriever, _ := es.NewRetriever(ctx, &es.RetrieverConfig{
Addresses: []string{"http://localhost:9200"},
Index: "knowledge-base",
TopK: 10,
VectorField: "embedding",
})
results, err := retriever.Retrieve(ctx, "Eino 如何实现流式输出?")
for _, doc := range results {
fmt.Printf("Score: %.4f | %s\n", doc.MetaData["score"], doc.Content[:100])
}
稀疏检索(BM25)
除了稠密向量检索,Eino 也支持基于关键词的稀疏检索:
bm25Retriever, _ := es.NewRetriever(ctx, &es.RetrieverConfig{
Addresses: []string{"http://localhost:9200"},
Index: "knowledge-base",
TopK: 10,
SearchMode: es.SearchModeBM25,
TextField: "content",
})
多路召回融合(Hybrid Search)
生产环境推荐将稠密检索和稀疏检索结合,互补各自的短板:
import "github.com/cloudwego/eino/compose"
// 构建多路召回 Graph
graph := compose.NewGraph[string, []*Document]()
graph.AddNode("dense_retriever", denseRetriever)
graph.AddNode("sparse_retriever", sparseRetriever)
graph.AddNode("merge_results", compose.NewLambda(mergeAndDeduplicate))
graph.AddEdge(compose.START, "dense_retriever")
graph.AddEdge(compose.START, "sparse_retriever")
graph.AddEdge("dense_retriever", "merge_results")
graph.AddEdge("sparse_retriever", "merge_results")
graph.AddEdge("merge_results", compose.END)
Reranker:精排策略
Retriever 召回的文档可能存在噪声,Reranker 通过更精细的相关性计算做二次排序。
ScoreReranker
基于简单分数阈值过滤低质量结果:
import "github.com/cloudwego/eino-ext/components/retriever/reranker"
scoreReranker := reranker.NewScoreReranker(&reranker.ScoreRerankerConfig{
MinScore: 0.7,
TopN: 5,
})
rerankedDocs, err := scoreReranker.Rerank(ctx, query, retrievedDocs)
Cross-Encoder Reranker
使用 Cross-Encoder 模型(如 bge-reranker)计算 query-document pair 的精细相关度:
crossReranker := reranker.NewCrossEncoderReranker(&reranker.CrossEncoderConfig{
Model: "bge-reranker-v2-m3",
Endpoint: os.Getenv("RERANKER_ENDPOINT"),
TopN: 5,
})
// Cross-Encoder 将 query 和每个 doc 拼接后过模型打分
// 比 bi-encoder 的向量余弦相似度更精准,但计算成本更高
finalDocs, err := crossReranker.Rerank(ctx, query, retrievedDocs)
选型建议
| 方案 | 精度 | 延迟 | 适用场景 |
|---|---|---|---|
| ScoreReranker | 低 | <1ms | 对延迟极度敏感的场景 |
| Cross-Encoder | 高 | 50-200ms | 对质量要求高、候选集<50 |
| 两级组合 | 高 | 适中 | 先 Score 过滤到 top-20,再 Cross-Encoder 精排到 top-5 |
生产优化方案
1. 切分策略对检索质量的影响
不同的文档类型需要不同的切分策略:
| 文档类型 | 推荐策略 | Chunk Size | Overlap |
|---|---|---|---|
| API 文档 | HTML 结构切分 | 800-1200 | 100 |
| 技术博客 | 递归字符切分 | 500-800 | 80 |
| FAQ | 按 QA 对切分 | 不限 | 0 |
| 代码文件 | 按函数/类切分 | 不限 | 0 |
2. 混合检索(Hybrid Search)
单纯的语义检索在某些场景下有盲区:
- 用户搜索 "ERR_CONNECTION_REFUSED" 这样的错误码时,稠密向量可能无法精确匹配
- 精确的产品名称、型号等关键词需要精确匹配
生产推荐方案:稠密检索 70% 权重 + 稀疏检索 30% 权重,通过 RRF(Reciprocal Rank Fusion)合并排名:
func mergeByRRF(denseResults, sparseResults []*Document, k int) []*Document {
scores := make(map[string]float64)
const rrfK = 60
for rank, doc := range denseResults {
id := doc.MetaData["id"].(string)
scores[id] += 0.7 / float64(rrfK+rank+1)
}
for rank, doc := range sparseResults {
id := doc.MetaData["id"].(string)
scores[id] += 0.3 / float64(rrfK+rank+1)
}
// 按融合分数排序后返回 top-k
return sortByScoreAndTruncate(scores, k)
}
3. 缓存与增量索引
生产环境中,完整重建索引成本很高。推荐的增量策略:
type IncrementalIndexer struct {
indexer Indexer
embedding Embedding
cache *redis.Client
}
func (i *IncrementalIndexer) IndexIfChanged(ctx context.Context, doc *Document) error {
// 计算文档内容哈希
hash := sha256.Sum256([]byte(doc.Content))
cacheKey := fmt.Sprintf("doc:hash:%s", doc.MetaData["id"])
// 检查是否有变更
cached, _ := i.cache.Get(ctx, cacheKey).Result()
if cached == hex.EncodeToString(hash[:]) {
return nil // 未变更,跳过
}
// 向量化并写入索引
vectors, err := i.embedding.EmbedStrings(ctx, []string{doc.Content})
if err != nil {
return err
}
doc.MetaData["embedding"] = vectors[0]
_, err = i.indexer.Store(ctx, []*Document{doc})
if err != nil {
return err
}
// 更新缓存
i.cache.Set(ctx, cacheKey, hex.EncodeToString(hash[:]), 0)
return nil
}
实战:企业知识库问答系统
下面用 Eino 构建一个完整的企业知识库问答系统,串联全部 RAG 组件:
系统架构
索引构建
package main
import (
"context"
"log"
"github.com/cloudwego/eino-ext/components/document/loader/file"
"github.com/cloudwego/eino-ext/components/document/transformer"
embeddingOAI "github.com/cloudwego/eino-ext/components/embedding/openai"
"github.com/cloudwego/eino-ext/components/indexer/es"
)
func buildIndex(ctx context.Context) error {
// 1. 加载文档
loader, _ := file.NewFileLoader(ctx, &file.FileLoaderConfig{
Path: "./knowledge-base/",
Glob: "**/*.md",
})
docs, err := loader.Load(ctx)
if err != nil {
return err
}
log.Printf("Loaded %d documents", len(docs))
// 2. 切分文档
splitter, _ := transformer.NewRecursiveCharSplitter(ctx, &transformer.RecursiveCharSplitterConfig{
ChunkSize: 600,
ChunkOverlap: 80,
Separators: []string{"\n\n", "\n", "。", ". ", " "},
})
chunks, err := splitter.Transform(ctx, docs)
if err != nil {
return err
}
log.Printf("Split into %d chunks", len(chunks))
// 3. 向量化
embedder, _ := embeddingOAI.NewEmbedding(ctx, &embeddingOAI.EmbeddingConfig{
Model: "text-embedding-3-small",
})
// 分批处理
const batchSize = 50
for i := 0; i < len(chunks); i += batchSize {
end := min(i+batchSize, len(chunks))
batch := chunks[i:end]
texts := make([]string, len(batch))
for j, chunk := range batch {
texts[j] = chunk.Content
}
vectors, err := embedder.EmbedStrings(ctx, texts)
if err != nil {
return err
}
for j, vec := range vectors {
batch[j].MetaData["embedding"] = vec
}
}
// 4. 写入 ES
indexer, _ := es.NewIndexer(ctx, &es.IndexerConfig{
Addresses: []string{"http://localhost:9200"},
Index: "enterprise-kb",
})
_, err = indexer.Store(ctx, chunks)
return err
}
在线问答
func answerQuestion(ctx context.Context, question string) (string, error) {
// 1. 多路检索
denseRetriever, _ := es.NewRetriever(ctx, &es.RetrieverConfig{
Addresses: []string{"http://localhost:9200"},
Index: "enterprise-kb",
TopK: 15,
VectorField: "embedding",
})
sparseRetriever, _ := es.NewRetriever(ctx, &es.RetrieverConfig{
Addresses: []string{"http://localhost:9200"},
Index: "enterprise-kb",
TopK: 15,
SearchMode: es.SearchModeBM25,
TextField: "content",
})
denseResults, _ := denseRetriever.Retrieve(ctx, question)
sparseResults, _ := sparseRetriever.Retrieve(ctx, question)
// 2. 融合排序
merged := mergeByRRF(denseResults, sparseResults, 10)
// 3. Reranker 精排
reranker := reranker.NewCrossEncoderReranker(&reranker.CrossEncoderConfig{
Model: "bge-reranker-v2-m3",
TopN: 5,
})
finalDocs, _ := reranker.Rerank(ctx, question, merged)
// 4. 拼装上下文
var contextStr string
for i, doc := range finalDocs {
contextStr += fmt.Sprintf("[%d] %s\n\n", i+1, doc.Content)
}
// 5. 调用 LLM 生成答案
prompt := fmt.Sprintf(`基于以下参考资料回答用户问题。如果资料中没有相关信息,请明确告知。
参考资料:
%s
用户问题:%s`, contextStr, question)
model, _ := ark.NewChatModel(ctx, &ark.ChatModelConfig{
EndpointID: os.Getenv("ARK_CHAT_ENDPOINT"),
})
resp, err := model.Generate(ctx, []*Message{
{Role: RoleUser, Content: prompt},
})
if err != nil {
return "", err
}
return resp.Content, nil
}
最佳实践
- 先 BM25 后语义:调试阶段先用 BM25 baseline 确认文档覆盖度,再叠加语义检索
- Chunk 大小实验:不同数据集最优切分大小不同,建议用评测集对比 256/512/800/1024 四档
- 保留元数据:每个 chunk 必须保留
source、title、page等溯源信息,方便问答时引用 - Embedding 模型选型:多语言场景选
text-embedding-3-large,纯中文可考虑 BGE-M3 - 监控检索质量:上线后持续收集 bad case,用 MRR/NDCG 指标量化检索效果
总结
Eino 的 RAG Pipeline 体系将复杂的知识检索增强生成分解为标准化的组件链路。通过 Document Loader → Transformer → Embedding → Indexer 完成离线索引构建,通过 Retriever → Reranker → ChatModel 完成在线问答,每个环节都有清晰的接口契约和可替换的实现。
关键的生产经验:切分策略决定了检索的天花板,Hybrid Search 和 Reranker 决定了检索的下限。投入精力优化这三个环节,往往比更换更贵的模型更有效。