核心摘要

RAG(Retrieval-Augmented Generation)是当前大模型落地最务实的架构模式——让模型基于检索到的真实数据回答问题,而非依赖训练时的参数记忆。Eino 框架为 Go 开发者提供了完整的 RAG Pipeline 组件体系,从文档加载到向量检索再到答案生成,每个环节都有标准接口和生产级实现。

本文将带你完整走通 Eino RAG Pipeline 的全链路实现,并深入探讨生产环境中的优化策略。

本文是 Eino 框架系列 第七篇。如果你对 Eino 组件体系还不熟悉,建议先阅读 核心组件详解


目录

  1. RAG 核心工作流回顾
  2. Eino RAG 组件全景
  3. Document Loader:多源文档加载
  4. Document Transformer:切分策略
  5. Embedding:向量化
  6. Indexer:向量入库
  7. Retriever:语义检索
  8. Reranker:精排策略
  9. 生产优化方案
  10. 实战:企业知识库问答系统
  11. 最佳实践
  12. 总结
  13. 相关资源

RAG 核心工作流回顾

RAG 将"检索"与"生成"分为两个阶段,核心流程可概括为三步:

graph LR A[Index 索引构建] --> B[Retrieve 检索召回] B --> C[Generate 答案生成] subgraph idx["Index"] A1[文档加载] --> A2[切分预处理] A2 --> A3[向量化] A3 --> A4[写入向量库] end subgraph ret["Retrieve"] B1[Query 向量化] --> B2[相似度检索] B2 --> B3[Rerank 精排] end subgraph gen["Generate"] C1[拼装 Prompt] --> C2[LLM 生成] end

Index 阶段(离线):将原始文档切分、向量化后写入检索引擎。
Retrieve 阶段(在线):将用户问题向量化后,从索引中召回相关文档片段。
Generate 阶段(在线):将召回的上下文与用户问题一起送入 LLM 生成答案。


Eino RAG 组件全景

Eino 将 RAG Pipeline 中的每个环节都抽象为独立的 Go interface:

组件 接口 职责
Document Loader DocumentLoader 从各种数据源加载原始文档
Document Transformer DocumentTransformer 文档切分、清洗、元数据增强
Embedding Embedding 文本转向量
Indexer Indexer 向量写入存储引擎
Retriever Retriever 从向量库中检索相关文档

这套组件的核心设计原则:接口抽象 + Option 模式。上层编排代码只依赖接口,底层实现(用 ElasticSearch 还是 VikingDB、用 OpenAI Embedding 还是本地模型)可以随时替换。


Document Loader:多源文档加载

Document Loader 负责将各种格式的原始数据加载为统一的 Document 结构。

接口定义

go
type DocumentLoader interface {
    Load(ctx context.Context, opts ...LoaderOption) ([]*Document, error)
}

type Document struct {
    Content  string
    MetaData map[string]any
}

Eino 内置实现

Eino 提供了多种开箱即用的 Loader:

go
import (
    "github.com/cloudwego/eino-ext/components/document/loader/file"
    "github.com/cloudwego/eino-ext/components/document/loader/web"
    "github.com/cloudwego/eino-ext/components/document/loader/s3"
)

// 本地文件加载
fileLoader, _ := file.NewFileLoader(ctx, &file.FileLoaderConfig{
    Path: "./docs/product-manual.pdf",
})

// Web 页面加载
webLoader, _ := web.NewWebLoader(ctx, &web.WebLoaderConfig{
    URLs: []string{
        "https://docs.example.com/api-reference",
        "https://docs.example.com/getting-started",
    },
    MaxDepth: 2,
})

// S3 对象存储加载
s3Loader, _ := s3.NewS3Loader(ctx, &s3.S3LoaderConfig{
    Bucket: "knowledge-base",
    Prefix: "documents/",
    Region: "us-east-1",
})

docs, err := fileLoader.Load(ctx)

自定义 Loader

对于特殊数据源(如企业内部 Wiki、Confluence 等),只需实现 DocumentLoader 接口:

go
type ConfluenceLoader struct {
    client *confluence.Client
    spaceKey string
}

func (l *ConfluenceLoader) Load(ctx context.Context, opts ...LoaderOption) ([]*Document, error) {
    pages, err := l.client.GetSpacePages(ctx, l.spaceKey)
    if err != nil {
        return nil, err
    }

    docs := make([]*Document, 0, len(pages))
    for _, page := range pages {
        docs = append(docs, &Document{
            Content: page.Body,
            MetaData: map[string]any{
                "source": page.URL,
                "title":  page.Title,
                "updated_at": page.UpdatedAt,
            },
        })
    }
    return docs, nil
}

Document Transformer:切分策略

文档切分是 RAG 质量的关键瓶颈。切分过粗,检索时噪声过大;切分过细,丢失上下文语义。

接口定义

go
type DocumentTransformer interface {
    Transform(ctx context.Context, docs []*Document, opts ...TransformerOption) ([]*Document, error)
}

文本切分器

go
import "github.com/cloudwego/eino-ext/components/document/transformer"

// 按 token 数切分,保留 overlap 上下文
splitter, _ := transformer.NewTokenSplitter(ctx, &transformer.TokenSplitterConfig{
    ChunkSize:    512,   // 每个 chunk 最大 512 tokens
    ChunkOverlap: 64,    // 相邻 chunk 重叠 64 tokens
    ModelName:    "gpt-4",
})

chunks, err := splitter.Transform(ctx, docs)

HTML 结构化切分

对于 Web 抓取的 HTML 内容,按 DOM 结构切分能保留更好的语义边界:

go
htmlSplitter, _ := transformer.NewHTMLSplitter(ctx, &transformer.HTMLSplitterConfig{
    Headers: []string{"h1", "h2", "h3"},
    ChunkSize: 1000,
})

递归字符切分

对于没有明确结构的纯文本,递归切分是最通用的策略:

go
recursiveSplitter, _ := transformer.NewRecursiveCharSplitter(ctx, &transformer.RecursiveCharSplitterConfig{
    ChunkSize:    800,
    ChunkOverlap: 100,
    Separators:   []string{"\n\n", "\n", "。", ".", " "},
})

递归切分会按 Separators 的优先级逐级尝试,优先在段落边界处切分,保证语义完整性。


Embedding:向量化

Embedding 组件将文本转换为高维向量,是连接"文本世界"和"向量空间"的桥梁。

go
type Embedding interface {
    EmbedStrings(ctx context.Context, texts []string, opts ...EmbeddingOption) ([][]float64, error)
}

OpenAI Embedding

go
import "github.com/cloudwego/eino-ext/components/embedding/openai"

embedder, _ := openai.NewEmbedding(ctx, &openai.EmbeddingConfig{
    Model:  "text-embedding-3-small",
    APIKey: os.Getenv("OPENAI_API_KEY"),
})

vectors, err := embedder.EmbedStrings(ctx, []string{
    "Eino 是字节跳动开源的 Go AI 应用开发框架",
    "RAG 将检索与生成相结合提升回答质量",
})
// vectors[0] => []float64{0.012, -0.034, ...}  (1536 维)

Ark Embedding(火山引擎)

go
import "github.com/cloudwego/eino-ext/components/embedding/ark"

arkEmbedder, _ := ark.NewEmbedding(ctx, &ark.EmbeddingConfig{
    EndpointID: os.Getenv("ARK_ENDPOINT_ID"),
    AK:         os.Getenv("ARK_AK"),
    SK:         os.Getenv("ARK_SK"),
})

批量处理优化

生产环境中向量化大量文档时,注意控制批次大小和并发:

go
const batchSize = 100

for i := 0; i < len(texts); i += batchSize {
    end := min(i+batchSize, len(texts))
    batch := texts[i:end]

    vectors, err := embedder.EmbedStrings(ctx, batch)
    if err != nil {
        return fmt.Errorf("embedding batch %d failed: %w", i/batchSize, err)
    }
    // 写入 Indexer...
}

Indexer:向量入库

Indexer 将文档及其向量写入存储引擎,供后续检索使用。

go
type Indexer interface {
    Store(ctx context.Context, docs []*Document, opts ...IndexerOption) ([]string, error)
}

ElasticSearch Indexer

go
import "github.com/cloudwego/eino-ext/components/indexer/es"

esIndexer, _ := es.NewIndexer(ctx, &es.IndexerConfig{
    Addresses: []string{"http://localhost:9200"},
    Index:     "knowledge-base",
    Username:  os.Getenv("ES_USER"),
    Password:  os.Getenv("ES_PASS"),
})

ids, err := esIndexer.Store(ctx, chunks)

VikingDB Indexer

go
import "github.com/cloudwego/eino-ext/components/indexer/vikingdb"

vdbIndexer, _ := vikingdb.NewIndexer(ctx, &vikingdb.IndexerConfig{
    Collection: "enterprise-kb",
    Host:       os.Getenv("VIKINGDB_HOST"),
    AK:         os.Getenv("VIKINGDB_AK"),
    SK:         os.Getenv("VIKINGDB_SK"),
})

Retriever:语义检索

Retriever 是 RAG 在线阶段的核心——将用户 query 转化为向量,从索引中召回最相关的文档片段。

go
type Retriever interface {
    Retrieve(ctx context.Context, query string, opts ...RetrieverOption) ([]*Document, error)
}

基础语义检索

go
import "github.com/cloudwego/eino-ext/components/retriever/es"

retriever, _ := es.NewRetriever(ctx, &es.RetrieverConfig{
    Addresses:  []string{"http://localhost:9200"},
    Index:      "knowledge-base",
    TopK:       10,
    VectorField: "embedding",
})

results, err := retriever.Retrieve(ctx, "Eino 如何实现流式输出?")
for _, doc := range results {
    fmt.Printf("Score: %.4f | %s\n", doc.MetaData["score"], doc.Content[:100])
}

稀疏检索(BM25)

除了稠密向量检索,Eino 也支持基于关键词的稀疏检索:

go
bm25Retriever, _ := es.NewRetriever(ctx, &es.RetrieverConfig{
    Addresses: []string{"http://localhost:9200"},
    Index:     "knowledge-base",
    TopK:      10,
    SearchMode: es.SearchModeBM25,
    TextField:  "content",
})

多路召回融合(Hybrid Search)

生产环境推荐将稠密检索和稀疏检索结合,互补各自的短板:

go
import "github.com/cloudwego/eino/compose"

// 构建多路召回 Graph
graph := compose.NewGraph[string, []*Document]()

graph.AddNode("dense_retriever", denseRetriever)
graph.AddNode("sparse_retriever", sparseRetriever)
graph.AddNode("merge_results", compose.NewLambda(mergeAndDeduplicate))

graph.AddEdge(compose.START, "dense_retriever")
graph.AddEdge(compose.START, "sparse_retriever")
graph.AddEdge("dense_retriever", "merge_results")
graph.AddEdge("sparse_retriever", "merge_results")
graph.AddEdge("merge_results", compose.END)

Reranker:精排策略

Retriever 召回的文档可能存在噪声,Reranker 通过更精细的相关性计算做二次排序。

ScoreReranker

基于简单分数阈值过滤低质量结果:

go
import "github.com/cloudwego/eino-ext/components/retriever/reranker"

scoreReranker := reranker.NewScoreReranker(&reranker.ScoreRerankerConfig{
    MinScore: 0.7,
    TopN:     5,
})

rerankedDocs, err := scoreReranker.Rerank(ctx, query, retrievedDocs)

Cross-Encoder Reranker

使用 Cross-Encoder 模型(如 bge-reranker)计算 query-document pair 的精细相关度:

go
crossReranker := reranker.NewCrossEncoderReranker(&reranker.CrossEncoderConfig{
    Model:    "bge-reranker-v2-m3",
    Endpoint: os.Getenv("RERANKER_ENDPOINT"),
    TopN:     5,
})

// Cross-Encoder 将 query 和每个 doc 拼接后过模型打分
// 比 bi-encoder 的向量余弦相似度更精准,但计算成本更高
finalDocs, err := crossReranker.Rerank(ctx, query, retrievedDocs)

选型建议

方案 精度 延迟 适用场景
ScoreReranker <1ms 对延迟极度敏感的场景
Cross-Encoder 50-200ms 对质量要求高、候选集<50
两级组合 适中 先 Score 过滤到 top-20,再 Cross-Encoder 精排到 top-5

生产优化方案

1. 切分策略对检索质量的影响

不同的文档类型需要不同的切分策略:

文档类型 推荐策略 Chunk Size Overlap
API 文档 HTML 结构切分 800-1200 100
技术博客 递归字符切分 500-800 80
FAQ 按 QA 对切分 不限 0
代码文件 按函数/类切分 不限 0

2. 混合检索(Hybrid Search)

单纯的语义检索在某些场景下有盲区:

  • 用户搜索 "ERR_CONNECTION_REFUSED" 这样的错误码时,稠密向量可能无法精确匹配
  • 精确的产品名称、型号等关键词需要精确匹配

生产推荐方案:稠密检索 70% 权重 + 稀疏检索 30% 权重,通过 RRF(Reciprocal Rank Fusion)合并排名:

go
func mergeByRRF(denseResults, sparseResults []*Document, k int) []*Document {
    scores := make(map[string]float64)
    const rrfK = 60

    for rank, doc := range denseResults {
        id := doc.MetaData["id"].(string)
        scores[id] += 0.7 / float64(rrfK+rank+1)
    }
    for rank, doc := range sparseResults {
        id := doc.MetaData["id"].(string)
        scores[id] += 0.3 / float64(rrfK+rank+1)
    }

    // 按融合分数排序后返回 top-k
    return sortByScoreAndTruncate(scores, k)
}

3. 缓存与增量索引

生产环境中,完整重建索引成本很高。推荐的增量策略:

go
type IncrementalIndexer struct {
    indexer   Indexer
    embedding Embedding
    cache     *redis.Client
}

func (i *IncrementalIndexer) IndexIfChanged(ctx context.Context, doc *Document) error {
    // 计算文档内容哈希
    hash := sha256.Sum256([]byte(doc.Content))
    cacheKey := fmt.Sprintf("doc:hash:%s", doc.MetaData["id"])

    // 检查是否有变更
    cached, _ := i.cache.Get(ctx, cacheKey).Result()
    if cached == hex.EncodeToString(hash[:]) {
        return nil // 未变更,跳过
    }

    // 向量化并写入索引
    vectors, err := i.embedding.EmbedStrings(ctx, []string{doc.Content})
    if err != nil {
        return err
    }

    doc.MetaData["embedding"] = vectors[0]
    _, err = i.indexer.Store(ctx, []*Document{doc})
    if err != nil {
        return err
    }

    // 更新缓存
    i.cache.Set(ctx, cacheKey, hex.EncodeToString(hash[:]), 0)
    return nil
}

实战:企业知识库问答系统

下面用 Eino 构建一个完整的企业知识库问答系统,串联全部 RAG 组件:

系统架构

graph TB subgraph offline["离线索引"] A[文档源] --> B[Document Loader] B --> C[Document Transformer] C --> D[Embedding] D --> E["Indexer → ES"] end subgraph online["在线问答"] F[用户提问] --> G[Query Embedding] G --> H[Retriever 多路召回] H --> I[Reranker 精排] I --> J[Prompt 拼装] J --> K[ChatModel 生成] K --> L[回答] end

索引构建

go
package main

import (
    "context"
    "log"

    "github.com/cloudwego/eino-ext/components/document/loader/file"
    "github.com/cloudwego/eino-ext/components/document/transformer"
    embeddingOAI "github.com/cloudwego/eino-ext/components/embedding/openai"
    "github.com/cloudwego/eino-ext/components/indexer/es"
)

func buildIndex(ctx context.Context) error {
    // 1. 加载文档
    loader, _ := file.NewFileLoader(ctx, &file.FileLoaderConfig{
        Path: "./knowledge-base/",
        Glob: "**/*.md",
    })
    docs, err := loader.Load(ctx)
    if err != nil {
        return err
    }
    log.Printf("Loaded %d documents", len(docs))

    // 2. 切分文档
    splitter, _ := transformer.NewRecursiveCharSplitter(ctx, &transformer.RecursiveCharSplitterConfig{
        ChunkSize:    600,
        ChunkOverlap: 80,
        Separators:   []string{"\n\n", "\n", "。", ". ", " "},
    })
    chunks, err := splitter.Transform(ctx, docs)
    if err != nil {
        return err
    }
    log.Printf("Split into %d chunks", len(chunks))

    // 3. 向量化
    embedder, _ := embeddingOAI.NewEmbedding(ctx, &embeddingOAI.EmbeddingConfig{
        Model: "text-embedding-3-small",
    })

    // 分批处理
    const batchSize = 50
    for i := 0; i < len(chunks); i += batchSize {
        end := min(i+batchSize, len(chunks))
        batch := chunks[i:end]

        texts := make([]string, len(batch))
        for j, chunk := range batch {
            texts[j] = chunk.Content
        }

        vectors, err := embedder.EmbedStrings(ctx, texts)
        if err != nil {
            return err
        }

        for j, vec := range vectors {
            batch[j].MetaData["embedding"] = vec
        }
    }

    // 4. 写入 ES
    indexer, _ := es.NewIndexer(ctx, &es.IndexerConfig{
        Addresses: []string{"http://localhost:9200"},
        Index:     "enterprise-kb",
    })

    _, err = indexer.Store(ctx, chunks)
    return err
}

在线问答

go
func answerQuestion(ctx context.Context, question string) (string, error) {
    // 1. 多路检索
    denseRetriever, _ := es.NewRetriever(ctx, &es.RetrieverConfig{
        Addresses:   []string{"http://localhost:9200"},
        Index:       "enterprise-kb",
        TopK:        15,
        VectorField: "embedding",
    })

    sparseRetriever, _ := es.NewRetriever(ctx, &es.RetrieverConfig{
        Addresses:  []string{"http://localhost:9200"},
        Index:      "enterprise-kb",
        TopK:       15,
        SearchMode: es.SearchModeBM25,
        TextField:  "content",
    })

    denseResults, _ := denseRetriever.Retrieve(ctx, question)
    sparseResults, _ := sparseRetriever.Retrieve(ctx, question)

    // 2. 融合排序
    merged := mergeByRRF(denseResults, sparseResults, 10)

    // 3. Reranker 精排
    reranker := reranker.NewCrossEncoderReranker(&reranker.CrossEncoderConfig{
        Model: "bge-reranker-v2-m3",
        TopN:  5,
    })
    finalDocs, _ := reranker.Rerank(ctx, question, merged)

    // 4. 拼装上下文
    var contextStr string
    for i, doc := range finalDocs {
        contextStr += fmt.Sprintf("[%d] %s\n\n", i+1, doc.Content)
    }

    // 5. 调用 LLM 生成答案
    prompt := fmt.Sprintf(`基于以下参考资料回答用户问题。如果资料中没有相关信息,请明确告知。

参考资料:
%s

用户问题:%s`, contextStr, question)

    model, _ := ark.NewChatModel(ctx, &ark.ChatModelConfig{
        EndpointID: os.Getenv("ARK_CHAT_ENDPOINT"),
    })

    resp, err := model.Generate(ctx, []*Message{
        {Role: RoleUser, Content: prompt},
    })
    if err != nil {
        return "", err
    }

    return resp.Content, nil
}

最佳实践

  1. 先 BM25 后语义:调试阶段先用 BM25 baseline 确认文档覆盖度,再叠加语义检索
  2. Chunk 大小实验:不同数据集最优切分大小不同,建议用评测集对比 256/512/800/1024 四档
  3. 保留元数据:每个 chunk 必须保留 sourcetitlepage 等溯源信息,方便问答时引用
  4. Embedding 模型选型:多语言场景选 text-embedding-3-large,纯中文可考虑 BGE-M3
  5. 监控检索质量:上线后持续收集 bad case,用 MRR/NDCG 指标量化检索效果

总结

Eino 的 RAG Pipeline 体系将复杂的知识检索增强生成分解为标准化的组件链路。通过 Document Loader → Transformer → Embedding → Indexer 完成离线索引构建,通过 Retriever → Reranker → ChatModel 完成在线问答,每个环节都有清晰的接口契约和可替换的实现。

关键的生产经验:切分策略决定了检索的天花板,Hybrid Search 和 Reranker 决定了检索的下限。投入精力优化这三个环节,往往比更换更贵的模型更有效。


相关资源