TL;DR
RAG (Retrieval-Augmented Generation) is the most pragmatic architecture pattern for production LLM applications—letting models answer questions based on retrieved real data rather than relying on parametric memory from training. Eino provides Go developers with a complete RAG Pipeline component system, from document ingestion to vector search to answer generation, with standard interfaces and production-grade implementations at every stage.
This article walks through the complete Eino RAG Pipeline implementation and explores production optimization strategies in depth.
This is the seventh article in the Eino Framework series. If you're not yet familiar with Eino's component system, we recommend reading the Core Components Guide first.
Table of Contents
- RAG Core Workflow Review
- Eino RAG Component Overview
- Document Loader: Multi-Source Ingestion
- Document Transformer: Chunking Strategies
- Embedding: Vectorization
- Indexer: Vector Storage
- Retriever: Semantic Search
- Reranker: Re-ranking Strategies
- Production Optimizations
- Practice: Enterprise Knowledge Base Q&A System
- Best Practices
- Summary
- Related Resources
RAG Core Workflow Review
RAG separates retrieval from generation, and retrieval in turn depends on an index built ahead of time. The core workflow has three phases:
- Index Phase (offline): Split raw documents, vectorize them, and write them to the search engine.
- Retrieve Phase (online): Vectorize the user's question and recall relevant document chunks from the index.
- Generate Phase (online): Feed the retrieved context along with the question to the LLM for answer generation.
Eino RAG Component Overview
Eino abstracts each stage of the RAG Pipeline into independent Go interfaces:
| Component | Interface | Responsibility |
|---|---|---|
| Document Loader | DocumentLoader | Load raw documents from various data sources |
| Document Transformer | DocumentTransformer | Chunking, cleaning, metadata enrichment |
| Embedding | Embedding | Text to vector conversion |
| Indexer | Indexer | Write vectors to storage engines |
| Retriever | Retriever | Retrieve relevant documents from vector stores |
The core design principle: interface abstraction + Option pattern. Upper-layer orchestration code depends only on interfaces, while underlying implementations (ElasticSearch vs. VikingDB, OpenAI Embedding vs. local models) can be swapped at any time.
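To make the Option pattern concrete, here is a minimal sketch of functional options in plain Go. The names `retrieveOptions`, `WithTopK`, `WithMinScore`, and `applyOptions` are illustrative assumptions, not Eino's actual option types.

```go
package main

import "fmt"

// retrieveOptions holds per-call settings. The fields are illustrative only.
type retrieveOptions struct {
	topK     int
	minScore float64
}

// RetrieverOption mutates the options; callers pass zero or more of them.
type RetrieverOption func(*retrieveOptions)

// WithTopK overrides how many results a call should return.
func WithTopK(k int) RetrieverOption {
	return func(o *retrieveOptions) { o.topK = k }
}

// WithMinScore overrides the minimum score a result must reach.
func WithMinScore(s float64) RetrieverOption {
	return func(o *retrieveOptions) { o.minScore = s }
}

// applyOptions starts from defaults and applies each option in order,
// so later options win over earlier ones.
func applyOptions(opts ...RetrieverOption) retrieveOptions {
	o := retrieveOptions{topK: 10, minScore: 0}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", applyOptions())
	fmt.Printf("%+v\n", applyOptions(WithTopK(3), WithMinScore(0.5)))
}
```

Because the variadic parameter may be empty, a method that accepts options can gain new settings later without breaking existing callers.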
Document Loader: Multi-Source Ingestion
Document Loader transforms raw data from various formats into a unified Document structure.
Interface Definition
type DocumentLoader interface {
Load(ctx context.Context, opts ...LoaderOption) ([]*Document, error)
}
type Document struct {
Content string
MetaData map[string]any
}
Built-in Implementations
Eino provides multiple out-of-the-box loaders:
import (
"github.com/cloudwego/eino-ext/components/document/loader/file"
"github.com/cloudwego/eino-ext/components/document/loader/web"
"github.com/cloudwego/eino-ext/components/document/loader/s3"
)
// Local file loading
fileLoader, _ := file.NewFileLoader(ctx, &file.FileLoaderConfig{
Path: "./docs/product-manual.pdf",
})
// Web page loading
webLoader, _ := web.NewWebLoader(ctx, &web.WebLoaderConfig{
URLs: []string{
"https://docs.example.com/api-reference",
"https://docs.example.com/getting-started",
},
MaxDepth: 2,
})
// S3 object storage loading
s3Loader, _ := s3.NewS3Loader(ctx, &s3.S3LoaderConfig{
Bucket: "knowledge-base",
Prefix: "documents/",
Region: "us-east-1",
})
docs, err := fileLoader.Load(ctx)
Custom Loader
For specialized data sources (internal wikis, Confluence, etc.), simply implement the DocumentLoader interface:
type ConfluenceLoader struct {
client *confluence.Client
spaceKey string
}
func (l *ConfluenceLoader) Load(ctx context.Context, opts ...LoaderOption) ([]*Document, error) {
pages, err := l.client.GetSpacePages(ctx, l.spaceKey)
if err != nil {
return nil, err
}
docs := make([]*Document, 0, len(pages))
for _, page := range pages {
docs = append(docs, &Document{
Content: page.Body,
MetaData: map[string]any{
"source": page.URL,
"title": page.Title,
"updated_at": page.UpdatedAt,
},
})
}
return docs, nil
}
Document Transformer: Chunking Strategies
Document chunking is the critical bottleneck for RAG quality. Chunks that are too coarse introduce retrieval noise; chunks that are too fine lose contextual semantics.
Interface Definition
type DocumentTransformer interface {
Transform(ctx context.Context, docs []*Document, opts ...TransformerOption) ([]*Document, error)
}
Token-Based Splitter
import "github.com/cloudwego/eino-ext/components/document/transformer"
// Split by token count with overlap for context preservation
splitter, _ := transformer.NewTokenSplitter(ctx, &transformer.TokenSplitterConfig{
ChunkSize: 512, // Max 512 tokens per chunk
ChunkOverlap: 64, // 64 tokens overlap between adjacent chunks
ModelName: "gpt-4",
})
chunks, err := splitter.Transform(ctx, docs)
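The interaction between ChunkSize and ChunkOverlap can be sketched as a sliding window. This example approximates tokens with whitespace-separated words, which is an assumption for illustration; a real token splitter would use the model's tokenizer. The function name `splitWithOverlap` is hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// splitWithOverlap cuts text into windows of chunkSize words, where each
// window repeats the last `overlap` words of the previous one.
// Whitespace-separated words stand in for model tokens here.
func splitWithOverlap(text string, chunkSize, overlap int) []string {
	if chunkSize <= 0 || overlap < 0 || overlap >= chunkSize {
		return nil // the window would never advance
	}
	words := strings.Fields(text)
	step := chunkSize - overlap
	var chunks []string
	for start := 0; start < len(words); start += step {
		end := start + chunkSize
		if end > len(words) {
			end = len(words)
		}
		chunks = append(chunks, strings.Join(words[start:end], " "))
		if end == len(words) {
			break // the final window already reached the end of the text
		}
	}
	return chunks
}

func main() {
	for i, c := range splitWithOverlap("a b c d e f g", 4, 1) {
		fmt.Printf("chunk %d: %q\n", i, c)
	}
}
```

The window advances by ChunkSize minus ChunkOverlap, so a larger overlap preserves more context at chunk boundaries but increases the number of chunks to embed and store.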
HTML Structure-Aware Splitting
For web-crawled HTML content, splitting by DOM structure preserves better semantic boundaries:
htmlSplitter, _ := transformer.NewHTMLSplitter(ctx, &transformer.HTMLSplitterConfig{
Headers: []string{"h1", "h2", "h3"},
ChunkSize: 1000,
})
Recursive Character Splitting
For plain text without clear structure, recursive splitting is the most universal strategy:
recursiveSplitter, _ := transformer.NewRecursiveCharSplitter(ctx, &transformer.RecursiveCharSplitterConfig{
ChunkSize: 800,
ChunkOverlap: 100,
Separators: []string{"\n\n", "\n", ". ", " "},
})
Recursive splitting attempts separators in priority order, preferring paragraph boundaries to maintain semantic integrity.
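The priority-order behavior can be sketched in a few lines of plain Go. This is a simplified illustration without overlap handling, measuring length in runes; `recursiveSplit` and `hardCut` are hypothetical names, not the library's implementation.

```go
package main

import (
	"fmt"
	"strings"
	"unicode/utf8"
)

// hardCut slices text into pieces of at most maxLen runes.
func hardCut(text string, maxLen int) []string {
	runes := []rune(text)
	var chunks []string
	for len(runes) > maxLen {
		chunks = append(chunks, string(runes[:maxLen]))
		runes = runes[maxLen:]
	}
	if len(runes) > 0 {
		chunks = append(chunks, string(runes))
	}
	return chunks
}

// recursiveSplit splits on the first separator, greedily packs neighboring
// parts into chunks of at most maxLen runes, and recurses with the remaining
// separators for any single part that is still too long.
func recursiveSplit(text string, seps []string, maxLen int) []string {
	if maxLen <= 0 || text == "" {
		return nil
	}
	if utf8.RuneCountInString(text) <= maxLen {
		return []string{text}
	}
	if len(seps) == 0 {
		return hardCut(text, maxLen) // no separators left, cut by length
	}
	sep := seps[0]
	var chunks []string
	current := ""
	for _, part := range strings.Split(text, sep) {
		candidate := part
		if current != "" {
			candidate = current + sep + part
		}
		if utf8.RuneCountInString(candidate) <= maxLen {
			current = candidate
			continue
		}
		if current != "" {
			chunks = append(chunks, current)
		}
		current = ""
		if utf8.RuneCountInString(part) <= maxLen {
			current = part
		} else {
			chunks = append(chunks, recursiveSplit(part, seps[1:], maxLen)...)
		}
	}
	if current != "" {
		chunks = append(chunks, current)
	}
	return chunks
}

func main() {
	text := "aaaa\n\nbbbb\n\ncccc"
	for _, c := range recursiveSplit(text, []string{"\n\n", "\n", ". ", " "}, 10) {
		fmt.Printf("%q\n", c)
	}
}
```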
Embedding: Vectorization
The Embedding component converts text into high-dimensional vectors—the bridge between the "text world" and "vector space."
type Embedding interface {
EmbedStrings(ctx context.Context, texts []string, opts ...EmbeddingOption) ([][]float64, error)
}
OpenAI Embedding
import "github.com/cloudwego/eino-ext/components/embedding/openai"
embedder, _ := openai.NewEmbedding(ctx, &openai.EmbeddingConfig{
Model: "text-embedding-3-small",
APIKey: os.Getenv("OPENAI_API_KEY"),
})
vectors, err := embedder.EmbedStrings(ctx, []string{
"Eino is ByteDance's open-source Go AI application framework",
"RAG combines retrieval with generation to improve answer quality",
})
// vectors[0] => []float64{0.012, -0.034, ...} (1536 dimensions)
Ark Embedding (Volcano Engine)
import "github.com/cloudwego/eino-ext/components/embedding/ark"
arkEmbedder, _ := ark.NewEmbedding(ctx, &ark.EmbeddingConfig{
EndpointID: os.Getenv("ARK_ENDPOINT_ID"),
AK: os.Getenv("ARK_AK"),
SK: os.Getenv("ARK_SK"),
})
Batch Processing Optimization
When vectorizing large document collections in production, control batch size and concurrency:
const batchSize = 100
for i := 0; i < len(texts); i += batchSize {
end := min(i+batchSize, len(texts))
batch := texts[i:end]
vectors, err := embedder.EmbedStrings(ctx, batch)
if err != nil {
return fmt.Errorf("embedding batch %d failed: %w", i/batchSize, err)
}
// Write to Indexer...
}
Indexer: Vector Storage
The Indexer writes documents and their vectors to storage engines for subsequent retrieval.
type Indexer interface {
Store(ctx context.Context, docs []*Document, opts ...IndexerOption) ([]string, error)
}
ElasticSearch Indexer
import "github.com/cloudwego/eino-ext/components/indexer/es"
esIndexer, _ := es.NewIndexer(ctx, &es.IndexerConfig{
Addresses: []string{"http://localhost:9200"},
Index: "knowledge-base",
Username: os.Getenv("ES_USER"),
Password: os.Getenv("ES_PASS"),
})
ids, err := esIndexer.Store(ctx, chunks)
VikingDB Indexer
import "github.com/cloudwego/eino-ext/components/indexer/vikingdb"
vdbIndexer, _ := vikingdb.NewIndexer(ctx, &vikingdb.IndexerConfig{
Collection: "enterprise-kb",
Host: os.Getenv("VIKINGDB_HOST"),
AK: os.Getenv("VIKINGDB_AK"),
SK: os.Getenv("VIKINGDB_SK"),
})
Retriever: Semantic Search
The Retriever is the core of RAG's online phase—converting the user query into a vector and recalling the most relevant document chunks from the index.
type Retriever interface {
Retrieve(ctx context.Context, query string, opts ...RetrieverOption) ([]*Document, error)
}
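Conceptually, dense retrieval ranks stored vectors by similarity to the query vector. The sketch below does this by brute force with cosine similarity over an in-memory slice; a real vector store would use an approximate nearest-neighbor index instead. The names `cosine`, `scored`, and `topK` are illustrative.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// cosine returns the cosine similarity of a and b, or 0 when the vectors
// differ in length or either one has zero norm.
func cosine(a, b []float64) float64 {
	if len(a) != len(b) || len(a) == 0 {
		return 0
	}
	var dot, na, nb float64
	for i := range a {
		dot += a[i] * b[i]
		na += a[i] * a[i]
		nb += b[i] * b[i]
	}
	if na == 0 || nb == 0 {
		return 0
	}
	return dot / (math.Sqrt(na) * math.Sqrt(nb))
}

// scored pairs a vector's position in the store with its similarity.
type scored struct {
	Index int
	Score float64
}

// topK scores every stored vector against the query and returns the k best.
func topK(query []float64, vectors [][]float64, k int) []scored {
	results := make([]scored, 0, len(vectors))
	for i, v := range vectors {
		results = append(results, scored{Index: i, Score: cosine(query, v)})
	}
	sort.SliceStable(results, func(i, j int) bool {
		return results[i].Score > results[j].Score
	})
	if k < 0 {
		k = 0
	}
	if k < len(results) {
		results = results[:k]
	}
	return results
}

func main() {
	store := [][]float64{{0, 1}, {1, 0}, {1, 1}}
	for _, r := range topK([]float64{1, 0}, store, 2) {
		fmt.Printf("index=%d score=%.4f\n", r.Index, r.Score)
	}
}
```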
Basic Semantic Retrieval
import "github.com/cloudwego/eino-ext/components/retriever/es"
retriever, _ := es.NewRetriever(ctx, &es.RetrieverConfig{
Addresses: []string{"http://localhost:9200"},
Index: "knowledge-base",
TopK: 10,
VectorField: "embedding",
})
results, err := retriever.Retrieve(ctx, "How does Eino implement streaming output?")
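for _, doc := range results {
	// Truncate the preview safely: slicing Content[:100] panics on shorter chunks
	preview := doc.Content
	if len(preview) > 100 {
		preview = preview[:100]
	}
	fmt.Printf("Score: %v | %s\n", doc.MetaData["score"], preview)
}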
Sparse Retrieval (BM25)
Beyond dense vector retrieval, Eino also supports keyword-based sparse retrieval:
bm25Retriever, _ := es.NewRetriever(ctx, &es.RetrieverConfig{
Addresses: []string{"http://localhost:9200"},
Index: "knowledge-base",
TopK: 10,
SearchMode: es.SearchModeBM25,
TextField: "content",
})
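For intuition about what the search engine computes in BM25 mode, here is a self-contained sketch of Okapi BM25 scoring with whitespace tokenization. It uses the common IDF variant ln(1 + (N - n + 0.5) / (n + 0.5)) and typical parameters k1 = 1.2, b = 0.75; the function name `bm25Scores` is hypothetical and the tokenizer is far simpler than a production analyzer.

```go
package main

import (
	"fmt"
	"math"
	"strings"
)

// bm25Scores returns one BM25 score per document for the given query.
// Text is lowercased and split on whitespace.
func bm25Scores(query string, docs []string, k1, b float64) []float64 {
	tokenized := make([][]string, len(docs))
	df := map[string]int{} // number of documents containing each term
	total := 0
	for i, d := range docs {
		tokenized[i] = strings.Fields(strings.ToLower(d))
		total += len(tokenized[i])
		seen := map[string]bool{}
		for _, t := range tokenized[i] {
			if !seen[t] {
				seen[t] = true
				df[t]++
			}
		}
	}
	scores := make([]float64, len(docs))
	if total == 0 {
		return scores
	}
	n := float64(len(docs))
	avgLen := float64(total) / n
	queryTerms := strings.Fields(strings.ToLower(query))
	for i, toks := range tokenized {
		tf := map[string]int{}
		for _, t := range toks {
			tf[t]++
		}
		for _, q := range queryTerms {
			f := float64(tf[q])
			if f == 0 {
				continue
			}
			idf := math.Log(1 + (n-float64(df[q])+0.5)/(float64(df[q])+0.5))
			// Length normalization: long documents need more hits to score the same.
			norm := f + k1*(1-b+b*float64(len(toks))/avgLen)
			scores[i] += idf * f * (k1 + 1) / norm
		}
	}
	return scores
}

func main() {
	docs := []string{
		"fix ERR_CONNECTION_REFUSED when the server is down",
		"streaming output in eino",
		"how to configure retries",
	}
	fmt.Println(bm25Scores("ERR_CONNECTION_REFUSED", docs, 1.2, 0.75))
}
```

Exact tokens such as error codes match only the documents that contain them, which is the behavior dense retrieval cannot guarantee.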
Multi-Path Recall Fusion (Hybrid Search)
Production environments benefit from combining dense and sparse retrieval to compensate for each other's blind spots:
import "github.com/cloudwego/eino/compose"
// Build multi-path recall Graph
graph := compose.NewGraph[string, []*Document]()
graph.AddNode("dense_retriever", denseRetriever)
graph.AddNode("sparse_retriever", sparseRetriever)
graph.AddNode("merge_results", compose.NewLambda(mergeAndDeduplicate))
graph.AddEdge(compose.START, "dense_retriever")
graph.AddEdge(compose.START, "sparse_retriever")
graph.AddEdge("dense_retriever", "merge_results")
graph.AddEdge("sparse_retriever", "merge_results")
graph.AddEdge("merge_results", compose.END)
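The graph references a `mergeAndDeduplicate` step without showing it. One possible shape of that logic is sketched below as a plain function over a simplified, hypothetical `Document` struct with an `ID` and `Score` field; the signature is an assumption and would need adapting to whatever input type the graph's merge node actually receives.

```go
package main

import "fmt"

// Document is a simplified stand-in for the article's Document type.
type Document struct {
	ID      string
	Content string
	Score   float64
}

// mergeAndDeduplicate concatenates result lists, keeping one entry per ID in
// first-seen order. When an ID repeats, the copy with the higher score wins.
func mergeAndDeduplicate(lists ...[]Document) []Document {
	position := map[string]int{} // ID -> index in merged
	var merged []Document
	for _, list := range lists {
		for _, d := range list {
			if idx, ok := position[d.ID]; ok {
				if d.Score > merged[idx].Score {
					merged[idx] = d
				}
				continue
			}
			position[d.ID] = len(merged)
			merged = append(merged, d)
		}
	}
	return merged
}

func main() {
	dense := []Document{{ID: "a", Score: 0.9}, {ID: "b", Score: 0.8}}
	sparse := []Document{{ID: "b", Score: 12}, {ID: "c", Score: 3}}
	for _, d := range mergeAndDeduplicate(dense, sparse) {
		fmt.Printf("%s %.2f\n", d.ID, d.Score)
	}
}
```

Dense and sparse scores live on different scales, so comparing them directly is crude. Rank-based fusion such as RRF avoids that problem by ignoring raw scores.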
Reranker: Re-ranking Strategies
Documents recalled by the Retriever may contain noise. The Reranker performs secondary scoring through more fine-grained relevance computation.
ScoreReranker
Filter low-quality results based on a simple score threshold:
import "github.com/cloudwego/eino-ext/components/retriever/reranker"
scoreReranker := reranker.NewScoreReranker(&reranker.ScoreRerankerConfig{
MinScore: 0.7,
TopN: 5,
})
rerankedDocs, err := scoreReranker.Rerank(ctx, query, retrievedDocs)
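The threshold-plus-TopN behavior described here is simple enough to sketch directly. The `Document` struct and `filterByScore` function below are hypothetical stand-ins, assuming each retrieved document already carries a relevance score.

```go
package main

import (
	"fmt"
	"sort"
)

// Document is a simplified stand-in carrying a retrieval score.
type Document struct {
	Content string
	Score   float64
}

// filterByScore drops documents below minScore, sorts the rest by score in
// descending order, and keeps at most topN of them.
func filterByScore(docs []Document, minScore float64, topN int) []Document {
	kept := make([]Document, 0, len(docs))
	for _, d := range docs {
		if d.Score >= minScore {
			kept = append(kept, d)
		}
	}
	sort.SliceStable(kept, func(i, j int) bool {
		return kept[i].Score > kept[j].Score
	})
	if topN >= 0 && topN < len(kept) {
		kept = kept[:topN]
	}
	return kept
}

func main() {
	docs := []Document{
		{Content: "w", Score: 0.65},
		{Content: "x", Score: 0.9},
		{Content: "y", Score: 0.75},
		{Content: "z", Score: 0.8},
	}
	for _, d := range filterByScore(docs, 0.7, 2) {
		fmt.Printf("%s %.2f\n", d.Content, d.Score)
	}
}
```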
Cross-Encoder Reranker
Use a Cross-Encoder model (e.g., bge-reranker) to compute fine-grained query-document pair relevance:
crossReranker := reranker.NewCrossEncoderReranker(&reranker.CrossEncoderConfig{
Model: "bge-reranker-v2-m3",
Endpoint: os.Getenv("RERANKER_ENDPOINT"),
TopN: 5,
})
// Cross-Encoder concatenates query with each doc and scores through the model
// More precise than bi-encoder cosine similarity, but higher compute cost
finalDocs, err := crossReranker.Rerank(ctx, query, retrievedDocs)
Selection Guide
| Approach | Precision | Latency | Use Case |
|---|---|---|---|
| ScoreReranker | Low | <1ms | Latency-critical scenarios |
| Cross-Encoder | High | 50-200ms | High quality requirements, candidates <50 |
| Two-stage combo | High | Moderate | Score filter to top-20, then Cross-Encoder to top-5 |
Production Optimizations
1. Chunking Strategy Impact on Retrieval Quality
Different document types require different chunking strategies:
| Document Type | Recommended Strategy | Chunk Size | Overlap |
|---|---|---|---|
| API docs | HTML structure splitting | 800-1200 | 100 |
| Technical blogs | Recursive character splitting | 500-800 | 80 |
| FAQ | Split by QA pairs | Unlimited | 0 |
| Code files | Split by function/class | Unlimited | 0 |
2. Hybrid Search
Pure semantic search has blind spots in certain scenarios:
- When users search for error codes like "ERR_CONNECTION_REFUSED", dense vectors may not achieve exact matches
- Exact product names, model numbers, and other keywords need precise matching
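Production recommendation: merge the two result lists with weighted RRF (Reciprocal Rank Fusion), giving dense retrieval a weight of 0.7 and sparse retrieval a weight of 0.3. Each list contributes weight / (rrfK + rank) to a document's fused score, where rank starts at 1:
func mergeByRRF(denseResults, sparseResults []*Document, k int) []*Document {
	scores := make(map[string]float64)
	docsByID := make(map[string]*Document) // maps fused IDs back to documents
	const rrfK = 60                        // RRF smoothing constant
	// accumulate adds weight/(rrfK+rank+1) per document; rank is 0-based here.
	accumulate := func(results []*Document, weight float64) {
		for rank, doc := range results {
			id, ok := doc.MetaData["id"].(string)
			if !ok {
				continue // skip documents without a string ID instead of panicking
			}
			scores[id] += weight / float64(rrfK+rank+1)
			docsByID[id] = doc
		}
	}
	accumulate(denseResults, 0.7)
	accumulate(sparseResults, 0.3)
	// Sort by fused score and return top-k
	return sortByScoreAndTruncate(scores, docsByID, k)
}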
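The sorting step behind `sortByScoreAndTruncate` is not shown in the article. As a self-contained illustration of the whole fusion, the sketch below runs weighted RRF over plain ID lists, including the sort and truncation. `weightedRRF` is a hypothetical name, and ties are broken by ID so the output is deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// weightedRRF fuses two ranked ID lists. Each list contributes
// weight/(rrfK+rank) per ID, with rank starting at 1.
func weightedRRF(dense, sparse []string, wDense, wSparse float64, rrfK, topK int) []string {
	scores := map[string]float64{}
	for rank, id := range dense {
		scores[id] += wDense / float64(rrfK+rank+1)
	}
	for rank, id := range sparse {
		scores[id] += wSparse / float64(rrfK+rank+1)
	}
	ids := make([]string, 0, len(scores))
	for id := range scores {
		ids = append(ids, id)
	}
	sort.Slice(ids, func(i, j int) bool {
		if scores[ids[i]] != scores[ids[j]] {
			return scores[ids[i]] > scores[ids[j]]
		}
		return ids[i] < ids[j] // deterministic tie-break
	})
	if topK >= 0 && topK < len(ids) {
		ids = ids[:topK]
	}
	return ids
}

func main() {
	dense := []string{"a", "b", "c"}
	sparse := []string{"b", "d"}
	fmt.Println(weightedRRF(dense, sparse, 0.7, 0.3, 60, 3))
}
```

In this example "b" ranks first because it appears in both lists (0.7/62 + 0.3/61), ahead of "a", which leads only the dense list (0.7/61).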
3. Caching and Incremental Indexing
In production, full index rebuilds are expensive. Here's an incremental strategy:
type IncrementalIndexer struct {
indexer Indexer
embedding Embedding
cache *redis.Client
}
func (i *IncrementalIndexer) IndexIfChanged(ctx context.Context, doc *Document) error {
// Compute content hash
hash := sha256.Sum256([]byte(doc.Content))
cacheKey := fmt.Sprintf("doc:hash:%s", doc.MetaData["id"])
// Check for changes
cached, _ := i.cache.Get(ctx, cacheKey).Result()
if cached == hex.EncodeToString(hash[:]) {
return nil // Unchanged, skip
}
// Vectorize and write to index
vectors, err := i.embedding.EmbedStrings(ctx, []string{doc.Content})
if err != nil {
return err
}
doc.MetaData["embedding"] = vectors[0]
_, err = i.indexer.Store(ctx, []*Document{doc})
if err != nil {
return err
}
// Update the cache only after the index write succeeds, and surface cache errors
return i.cache.Set(ctx, cacheKey, hex.EncodeToString(hash[:]), 0).Err()
}
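The change-detection logic can be exercised without Redis. The sketch below keeps hashes in an in-memory map and separates the check from the commit, so a hash is recorded only after indexing succeeds. `changeTracker`, `NeedsIndex`, and `MarkIndexed` are hypothetical names.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sync"
)

// changeTracker remembers the content hash last indexed for each document ID.
type changeTracker struct {
	mu     sync.Mutex
	hashes map[string]string
}

func newChangeTracker() *changeTracker {
	return &changeTracker{hashes: make(map[string]string)}
}

// NeedsIndex returns the content hash and whether it differs from the hash
// recorded for id. It does not record anything.
func (t *changeTracker) NeedsIndex(id, content string) (string, bool) {
	sum := sha256.Sum256([]byte(content))
	hash := hex.EncodeToString(sum[:])
	t.mu.Lock()
	defer t.mu.Unlock()
	return hash, t.hashes[id] != hash
}

// MarkIndexed records the hash once the index write has succeeded.
func (t *changeTracker) MarkIndexed(id, hash string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.hashes[id] = hash
}

func main() {
	tracker := newChangeTracker()
	hash, changed := tracker.NeedsIndex("doc-1", "hello")
	fmt.Println("first check, changed:", changed)
	tracker.MarkIndexed("doc-1", hash) // call only after a successful Store
	_, changed = tracker.NeedsIndex("doc-1", "hello")
	fmt.Println("second check, changed:", changed)
}
```

Splitting the check from the commit means a failed embedding or index write leaves the old hash in place, so the document is retried on the next run.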
Practice: Enterprise Knowledge Base Q&A System
Let's build a complete enterprise knowledge base Q&A system with Eino, connecting all RAG components:
System Architecture
Index Construction
package main
import (
"context"
"log"
"github.com/cloudwego/eino-ext/components/document/loader/file"
"github.com/cloudwego/eino-ext/components/document/transformer"
embeddingOAI "github.com/cloudwego/eino-ext/components/embedding/openai"
"github.com/cloudwego/eino-ext/components/indexer/es"
)
func buildIndex(ctx context.Context) error {
// 1. Load documents
loader, _ := file.NewFileLoader(ctx, &file.FileLoaderConfig{
Path: "./knowledge-base/",
Glob: "**/*.md",
})
docs, err := loader.Load(ctx)
if err != nil {
return err
}
log.Printf("Loaded %d documents", len(docs))
// 2. Chunk documents
splitter, _ := transformer.NewRecursiveCharSplitter(ctx, &transformer.RecursiveCharSplitterConfig{
ChunkSize: 600,
ChunkOverlap: 80,
Separators: []string{"\n\n", "\n", ". ", " "},
})
chunks, err := splitter.Transform(ctx, docs)
if err != nil {
return err
}
log.Printf("Split into %d chunks", len(chunks))
// 3. Vectorize
embedder, _ := embeddingOAI.NewEmbedding(ctx, &embeddingOAI.EmbeddingConfig{
Model: "text-embedding-3-small",
})
// Process in batches
const batchSize = 50
for i := 0; i < len(chunks); i += batchSize {
end := min(i+batchSize, len(chunks))
batch := chunks[i:end]
texts := make([]string, len(batch))
for j, chunk := range batch {
texts[j] = chunk.Content
}
vectors, err := embedder.EmbedStrings(ctx, texts)
if err != nil {
return err
}
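for j, vec := range vectors {
	if batch[j].MetaData == nil {
		batch[j].MetaData = make(map[string]any) // writing to a nil map would panic
	}
	batch[j].MetaData["embedding"] = vec
}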
}
// 4. Write to ES
indexer, _ := es.NewIndexer(ctx, &es.IndexerConfig{
Addresses: []string{"http://localhost:9200"},
Index: "enterprise-kb",
})
_, err = indexer.Store(ctx, chunks)
return err
}
Online Q&A
func answerQuestion(ctx context.Context, question string) (string, error) {
// 1. Multi-path retrieval
denseRetriever, _ := es.NewRetriever(ctx, &es.RetrieverConfig{
Addresses: []string{"http://localhost:9200"},
Index: "enterprise-kb",
TopK: 15,
VectorField: "embedding",
})
sparseRetriever, _ := es.NewRetriever(ctx, &es.RetrieverConfig{
Addresses: []string{"http://localhost:9200"},
Index: "enterprise-kb",
TopK: 15,
SearchMode: es.SearchModeBM25,
TextField: "content",
})
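denseResults, err := denseRetriever.Retrieve(ctx, question)
if err != nil {
	return "", fmt.Errorf("dense retrieval: %w", err)
}
sparseResults, err := sparseRetriever.Retrieve(ctx, question)
if err != nil {
	return "", fmt.Errorf("sparse retrieval: %w", err)
}
// 2. Merge rankings
merged := mergeByRRF(denseResults, sparseResults, 10)
// 3. Reranker (named crossReranker so the variable does not shadow the reranker package)
crossReranker := reranker.NewCrossEncoderReranker(&reranker.CrossEncoderConfig{
	Model: "bge-reranker-v2-m3",
	TopN: 5,
})
finalDocs, err := crossReranker.Rerank(ctx, question, merged)
if err != nil {
	return "", fmt.Errorf("rerank: %w", err)
}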
// 4. Assemble context
var contextStr string
for i, doc := range finalDocs {
contextStr += fmt.Sprintf("[%d] %s\n\n", i+1, doc.Content)
}
// 5. Generate answer with LLM
prompt := fmt.Sprintf(`Answer the user's question based on the following reference materials. If the materials don't contain relevant information, explicitly say so.
Reference Materials:
%s
User Question: %s`, contextStr, question)
model, _ := ark.NewChatModel(ctx, &ark.ChatModelConfig{
EndpointID: os.Getenv("ARK_CHAT_ENDPOINT"),
})
resp, err := model.Generate(ctx, []*Message{
{Role: RoleUser, Content: prompt},
})
if err != nil {
return "", err
}
return resp.Content, nil
}
Best Practices
- Start with BM25 baseline: During development, verify document coverage with BM25 first, then layer on semantic retrieval
- Experiment with chunk sizes: Different datasets have different optimal sizes—benchmark 256/512/800/1024 with an evaluation set
- Preserve metadata: Every chunk must retain source, title, and page traceability information for citation in answers
- Embedding model selection: For multilingual scenarios use text-embedding-3-large; for Chinese-only consider BGE-M3
- Monitor retrieval quality: Continuously collect bad cases post-launch, quantify retrieval effectiveness with MRR/NDCG metrics
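The MRR and NDCG metrics mentioned above can be computed offline from a small labeled evaluation set. The sketch below assumes each query has a ranked list of returned document IDs plus relevance labels; the function names are hypothetical, and NDCG uses the linear-gain form gain / log2(rank + 1).

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// reciprocalRank returns 1/rank of the first relevant ID, or 0 if none appears.
func reciprocalRank(ranked []string, relevant map[string]bool) float64 {
	for i, id := range ranked {
		if relevant[id] {
			return 1 / float64(i+1)
		}
	}
	return 0
}

// meanReciprocalRank averages reciprocalRank over all queries.
func meanReciprocalRank(runs [][]string, relevant []map[string]bool) float64 {
	if len(runs) == 0 || len(runs) != len(relevant) {
		return 0
	}
	sum := 0.0
	for i := range runs {
		sum += reciprocalRank(runs[i], relevant[i])
	}
	return sum / float64(len(runs))
}

// ndcgAtK divides the discounted gain of the first k results by the gain of
// an ideal ordering of the labeled documents.
func ndcgAtK(ranked []string, gains map[string]float64, k int) float64 {
	dcg := 0.0
	for i := 0; i < k && i < len(ranked); i++ {
		dcg += gains[ranked[i]] / math.Log2(float64(i+2))
	}
	ideal := make([]float64, 0, len(gains))
	for _, g := range gains {
		ideal = append(ideal, g)
	}
	sort.Sort(sort.Reverse(sort.Float64Slice(ideal)))
	idcg := 0.0
	for i := 0; i < k && i < len(ideal); i++ {
		idcg += ideal[i] / math.Log2(float64(i+2))
	}
	if idcg == 0 {
		return 0
	}
	return dcg / idcg
}

func main() {
	runs := [][]string{{"x", "a"}, {"b", "y"}}
	relevant := []map[string]bool{{"a": true}, {"b": true}}
	fmt.Printf("MRR: %.2f\n", meanReciprocalRank(runs, relevant))

	gains := map[string]float64{"a": 3, "b": 2, "c": 1}
	fmt.Printf("NDCG@3: %.4f\n", ndcgAtK([]string{"c", "b", "a"}, gains, 3))
}
```

Tracking both metrics on the same evaluation set before and after a chunking or reranking change gives a quantitative basis for the comparison.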
Summary
Eino's RAG Pipeline system decomposes the complex retrieval-augmented generation workflow into standardized component chains. The offline indexing path (Document Loader → Transformer → Embedding → Indexer) and online Q&A path (Retriever → Reranker → ChatModel) each have clear interface contracts with swappable implementations.
The key production insight: chunking strategy determines the retrieval ceiling, while Hybrid Search and Reranker determine the retrieval floor. Investing effort in optimizing these three aspects often yields better returns than switching to a more expensive model.