TL;DR
LLM outputs are inherently streaming — models generate tokens one at a time, not as a complete response. How do you efficiently propagate stream data through an AI Agent orchestration pipeline? How do you inject logging, tracing, and monitoring without coupling them to business logic? Eino solves both with StreamReader/StreamWriter primitives and a Callback aspect system. This article dissects these two core mechanisms from low-level primitives to production-grade observability.
Table of Contents
- Key Takeaways
- Why Streaming Matters
- Eino Streaming Paradigm
- Component Streaming Matrix
- Streaming in Orchestration
- Callback Aspect System
- Production Observability
- Practice: Full-Chain Logging and Latency Tracing
- Best Practices
- FAQ
- Summary
- Related Resources
Key Takeaways
- Dual invocation modes: Eino components can be called with Invoke (complete response) or Stream (streaming via StreamReader); the orchestration engine bridges the two when a component lacks native support for one of them
- StreamReader/StreamWriter: Type-safe generic streaming primitives that form the foundation of Eino's stream architecture
- Automatic stream conversion: The orchestration engine handles concatenation, splitting, and type conversion between nodes transparently
- Stream-aware callback hooks: OnStart / OnEnd / OnError, plus OnStartWithStreamInput / OnEndWithStreamOutput for streamed input and output, cover the entire execution lifecycle
- Scope control: Three granularity levels for callback injection — global, per-type, and per-node
- One-click OTel integration: Built-in OpenTelemetry handler automatically generates Trace Spans for every node
Why Streaming Matters
In traditional API call patterns, clients must wait for the LLM to generate all tokens before receiving a response. For a 500-token answer, this means users may wait 3-5 seconds before seeing the first character. Streaming improves on this along three dimensions:
| Dimension | Non-Streaming | Streaming |
|---|---|---|
| Time to first token | 3-5s (returns after full generation) | 100-300ms (first token returned as soon as it is generated) |
| Memory usage | Must buffer the complete response | Processes chunk by chunk; memory stays bounded unless chunks are accumulated |
| User experience | Long wait with no feedback | Real-time typewriter effect |
In Agentic Workflows, the value of streaming grows further: the Agent's Chain of Thought can be displayed in real time, making the interaction transparent and trustworthy.
Beyond UX benefits, streaming matters for complex multi-step Agent architectures. Instead of waiting for the full LLM response, the orchestration engine can inspect the output as it arrives (for example, to detect whether the model is emitting a tool call) and let downstream nodes start as soon as their input is available. In workflows that chain multiple LLM calls, this pipelining can noticeably reduce end-to-end latency.
Eino Streaming Paradigm
Invoke vs Stream
Eino supports two invocation styles. Invoke follows the request-response pattern familiar from traditional RPC: you call a function and block until the complete result is ready. Stream returns a StreamReader immediately and delivers results incrementally. For a ChatModel, the Invoke-style method is named Generate. When components are composed into a chain or graph, the framework converts between the two styles transparently:
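```go
// Invoke-style: wait for the complete result (ChatModel names this Generate)
result, err := chatModel.Generate(ctx, messages)
if err != nil {
	return err
}
fmt.Println(result.Content)
// Stream: returns a StreamReader for chunk-by-chunk reading
streamReader, err := chatModel.Stream(ctx, messages)
if err != nil {
	return err
}
defer streamReader.Close() // release the stream even if we stop early
for {
	chunk, err := streamReader.Recv()
	if errors.Is(err, io.EOF) {
		break // stream finished normally
	}
	if err != nil {
		return err // network or model error mid-stream
	}
	fmt.Print(chunk.Content)
}
```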
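A stdlib-only sketch of how one streaming implementation can serve both call styles: the complete-response method simply drains the stream and joins the chunks. The `streamer` and `chunkStream` types and their methods are hypothetical stand-ins chosen for illustration, not Eino's ChatModel interface.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// chunkStream is a minimal pull-based stream: Recv returns io.EOF when done.
type chunkStream struct {
	chunks []string
	pos    int
}

func (s *chunkStream) Recv() (string, error) {
	if s.pos >= len(s.chunks) {
		return "", io.EOF
	}
	c := s.chunks[s.pos]
	s.pos++
	return c, nil
}

// streamer is a hypothetical component whose core logic is streaming.
type streamer struct{ reply []string }

// Stream returns the reply chunk by chunk.
func (m streamer) Stream() *chunkStream {
	return &chunkStream{chunks: m.reply}
}

// Generate reuses Stream: it drains the stream and joins the chunks,
// so both invocation styles share one implementation.
func (m streamer) Generate() (string, error) {
	sr := m.Stream()
	var b strings.Builder
	for {
		chunk, err := sr.Recv()
		if errors.Is(err, io.EOF) {
			return b.String(), nil
		}
		if err != nil {
			return "", err
		}
		b.WriteString(chunk)
	}
}

func main() {
	m := streamer{reply: []string{"Hello", ", ", "world"}}
	full, _ := m.Generate()
	fmt.Println(full) // Hello, world
}
```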
StreamReader and StreamWriter
StreamReader[T] and StreamWriter[T] are the core primitives of Eino's streaming system. They leverage Go generics for type-safe stream communication:
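```go
// Create a connected reader/writer pair with a buffer of bufferSize chunks
reader, writer := schema.Pipe[*schema.Message](bufferSize)
// Writer side (typically inside the component)
go func() {
	defer writer.Close() // the reader sees io.EOF after the last chunk
	for _, chunk := range chunks {
		if closed := writer.Send(chunk, nil); closed {
			return // reader closed early; stop producing
		}
	}
}()
// Reader side (consumed by downstream)
defer reader.Close()
for {
	msg, err := reader.Recv()
	if errors.Is(err, io.EOF) {
		break
	}
	if err != nil {
		return err
	}
	process(msg)
}
```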
Key design decisions:
- Type safety: The generic parameter [T] ensures compile-time type checking
- Backpressure: bufferSize controls buffer capacity, preventing the producer from running too far ahead
- Graceful shutdown: Close() signals stream completion; Recv() then returns io.EOF
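The three properties above can be modelled with Go channels. This simplified stdlib-only sketch uses hypothetical `pipe`, `reader`, and `writer` types (not Eino's implementation, and without the error argument that Eino's Send takes) to show a bounded buffer providing backpressure, the writer's Close surfacing as io.EOF, and Send reporting `closed` once the reader has gone away.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

type reader[T any] struct {
	ch   <-chan T
	done chan struct{}
	once sync.Once
}

type writer[T any] struct {
	ch   chan<- T
	done <-chan struct{}
}

// pipe returns a connected pair whose buffer holds up to capacity values.
func pipe[T any](capacity int) (*reader[T], *writer[T]) {
	ch := make(chan T, capacity)
	done := make(chan struct{})
	return &reader[T]{ch: ch, done: done}, &writer[T]{ch: ch, done: done}
}

// Send blocks while the buffer is full (backpressure) and returns true once the
// reader has closed. If buffer space remains after the reader closes, select may
// still accept a value; the producer stops at the latest when the buffer fills.
func (w *writer[T]) Send(v T) (closed bool) {
	select {
	case <-w.done:
		return true
	case w.ch <- v:
		return false
	}
}

// Close marks the end of the stream; the reader then receives io.EOF.
func (w *writer[T]) Close() { close(w.ch) }

// Recv returns the next value, or io.EOF once the writer closed and the buffer drained.
func (r *reader[T]) Recv() (T, error) {
	v, ok := <-r.ch
	if !ok {
		var zero T
		return zero, io.EOF
	}
	return v, nil
}

// Close tells the writer nobody is reading any more; safe to call twice.
func (r *reader[T]) Close() { r.once.Do(func() { close(r.done) }) }

// collect runs a producer goroutine and drains the pipe until io.EOF.
func collect(values []int, capacity int) []int {
	r, w := pipe[int](capacity)
	defer r.Close()
	go func() {
		defer w.Close()
		for _, v := range values {
			if w.Send(v) {
				return
			}
		}
	}()
	var got []int
	for {
		v, err := r.Recv()
		if err != nil {
			return got // io.EOF: the writer closed
		}
		got = append(got, v)
	}
}

// stopEarly reads one value, closes the reader, and reports whether the
// producer noticed and stopped instead of blocking forever.
func stopEarly() bool {
	r, w := pipe[int](0)
	stopped := make(chan bool, 1)
	go func() {
		defer w.Close()
		for i := 0; ; i++ {
			if w.Send(i) {
				stopped <- true
				return
			}
		}
	}()
	r.Recv()
	r.Close()
	return <-stopped
}

func main() {
	fmt.Println(collect([]int{1, 2, 3, 4, 5}, 2)) // [1 2 3 4 5]
	fmt.Println(stopEarly())                      // true
}
```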
Component Streaming Matrix
Different components have varying levels of stream support. Eino's orchestration engine automatically adapts to these differences:
| Component Type | Invoke Input | Stream Input | Invoke Output | Stream Output |
|---|---|---|---|---|
| ChatModel | ✅ | ❌ | ✅ | ✅ |
| PromptTemplate | ✅ | ❌ | ✅ | ❌ |
| Retriever | ✅ | ❌ | ✅ | ❌ |
| ToolNode | ✅ | ❌ | ✅ | ❌ |
| Lambda | ✅ | Optional | ✅ | Optional |
| Transformer | ✅ | ✅ | ✅ | ✅ |
Key insight: ChatModel is the only core component that natively supports Stream output. The orchestration engine's stream passthrough capability ensures that even when intermediate nodes don't support streaming, end users still get a streaming experience.
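A rough stdlib-only model of that passthrough, with hypothetical helpers: an invoke-only step before a streaming model leaves the model's chunks intact, while an invoke-only step after it forces concatenation and yields a single boxed chunk. It illustrates the behavior described above under simplified assumptions and is not the engine's conversion code.

```go
package main

import (
	"fmt"
	"strings"
)

// fromValue boxes a complete value into a stream with exactly one chunk.
func fromValue[T any](v T) <-chan T {
	ch := make(chan T, 1)
	ch <- v
	close(ch)
	return ch
}

// formatPrompt is invoke-only: complete input in, complete output out.
func formatPrompt(q string) string { return "Q: " + q }

// fakeModel streams its prompt back one word per chunk.
func fakeModel(prompt string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, w := range strings.Fields(prompt) {
			out <- w + " "
		}
	}()
	return out
}

// promptThenModel: the invoke-only step runs first and the model's stream
// is passed straight through to the caller.
func promptThenModel(q string) <-chan string {
	return fakeModel(formatPrompt(q))
}

// modelThenUpper: an invoke-only step after the model forces concatenation,
// and its result is boxed back into a one-chunk stream.
func modelThenUpper(q string) <-chan string {
	var b strings.Builder
	for c := range fakeModel(formatPrompt(q)) {
		b.WriteString(c)
	}
	return fromValue(strings.ToUpper(b.String()))
}

// drain counts the chunks of a stream and joins them.
func drain(ch <-chan string) (chunks int, text string) {
	for c := range ch {
		chunks++
		text += c
	}
	return chunks, text
}

func main() {
	fmt.Println(drain(promptThenModel("hi there")))
	fmt.Println(drain(modelThenUpper("hi there")))
}
```

The second pipeline shows the limit of passthrough in this model: the caller still gets a stream, but once an invoke-only node sits after the model it contains a single chunk.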
Streaming in Orchestration
When you call an orchestration Graph in Stream mode, the Eino engine automatically handles three types of stream conversions between nodes:
Three Automatic Conversions
1. Stream Concatenation
When a downstream node only accepts complete values, the engine automatically collects all stream fragments and merges them:
```go
// Engine-internal equivalent logic (pseudocode)
fullMessage := concatenate(streamReader) // collect and merge all chunks
nextNode.Invoke(ctx, fullMessage)        // pass the complete value
```
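For structured chunks, concatenation has to merge fields rather than just join strings. The sketch below assumes a hypothetical, simplified `Message` type with only `Role` and `Content` (Eino's real `schema.Message` has more fields, such as tool calls, that also need merging): content is appended, and the role, often present only on the first chunk, must stay consistent.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Message is a simplified, hypothetical stand-in for a chat message chunk.
type Message struct {
	Role    string
	Content string
}

// concatMessages merges streamed chunks into one complete message.
// An empty role means "same as before"; conflicting roles are an error.
func concatMessages(chunks []Message) (Message, error) {
	if len(chunks) == 0 {
		return Message{}, errors.New("no chunks to concatenate")
	}
	var out Message
	var b strings.Builder
	for _, c := range chunks {
		if c.Role != "" {
			if out.Role != "" && out.Role != c.Role {
				return Message{}, fmt.Errorf("role mismatch: %q vs %q", out.Role, c.Role)
			}
			out.Role = c.Role
		}
		b.WriteString(c.Content)
	}
	out.Content = b.String()
	return out, nil
}

func main() {
	msg, err := concatMessages([]Message{
		{Role: "assistant", Content: "Stre"},
		{Content: "aming "},
		{Content: "works"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s: %s\n", msg.Role, msg.Content) // assistant: Streaming works
}
```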
2. Stream Fork/Split
When a single stream needs to be consumed by multiple downstream nodes, the engine automatically forks the stream:
```go
// One StreamReader copied into consumerCount independent readers;
// the original reader must not be used after Copy
readers := originalReader.Copy(consumerCount)
// Each consumer independently receives the complete stream data
```
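Forking can be modelled as one goroutine that reads the source and forwards every chunk to each copy. This stdlib-only `fork` helper is a hypothetical illustration of the idea, not the engine's implementation; it hands each chunk to the copies in turn, so a slow consumer holds back the others, which a production implementation may handle differently (for example with per-copy buffering).

```go
package main

import (
	"fmt"
	"sync"
)

// fork copies every value from src to n output channels, in order.
// Each value is handed to the copies one after another, so a slow
// consumer holds back the others.
func fork[T any](src <-chan T, n int) []<-chan T {
	outs := make([]chan T, n)
	views := make([]<-chan T, n)
	for i := range outs {
		outs[i] = make(chan T)
		views[i] = outs[i]
	}
	go func() {
		defer func() {
			for _, o := range outs {
				close(o)
			}
		}()
		for v := range src {
			for _, o := range outs {
				o <- v
			}
		}
	}()
	return views
}

// consumeAll drains every copy concurrently and returns what each received.
func consumeAll[T any](copies []<-chan T) [][]T {
	results := make([][]T, len(copies))
	var wg sync.WaitGroup
	for i, c := range copies {
		wg.Add(1)
		go func(i int, c <-chan T) {
			defer wg.Done()
			for v := range c {
				results[i] = append(results[i], v)
			}
		}(i, c)
	}
	wg.Wait()
	return results
}

// source emits the given values and then closes.
func source(vals ...string) <-chan string {
	ch := make(chan string)
	go func() {
		defer close(ch)
		for _, v := range vals {
			ch <- v
		}
	}()
	return ch
}

func main() {
	fmt.Println(consumeAll(fork(source("a", "b", "c"), 2))) // [[a b c] [a b c]]
}
```

Consumers must read concurrently, as `consumeAll` does; draining one copy completely before starting the next would deadlock this sketch.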
3. Stream Merge
When multiple upstream streams need to converge into a single downstream node, the engine merges them, interleaving chunks in the order they arrive:
```go
// Multiple upstream streams merged into one
mergedReader := schema.MergeStreamReaders([]*schema.StreamReader[*schema.Message]{reader1, reader2, reader3})
```
Callback Aspect System
Why Callbacks
In production environments, you need to monitor every step of Agent execution: record inputs/outputs, measure latency, track exceptions. These are classic cross-cutting concerns — functionality that spans multiple modules but doesn't belong in any single one. If you scatter logging and tracing code throughout your business logic, you get tight coupling, code duplication, and maintenance nightmares.
Eino's Callback system provides an AOP (Aspect-Oriented Programming) style solution. Similar to middleware in web frameworks or interceptors in gRPC, callbacks let you define behavior that runs before and after every node execution without modifying the node's core logic. The key difference from traditional middleware is that Eino's callbacks are stream-aware — they understand both batch and streaming execution patterns.
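Lifecycle Hooks
For each node execution, the Eino engine fires one start hook and one end hook, chosen by whether the node's input and output are streams; if the node fails, OnError fires instead of an end hook:
| Hook | Trigger Point | Parameters |
|---|---|---|
| OnStart | Before the node begins, with non-stream input | ctx, RunInfo, CallbackInput |
| OnStartWithStreamInput | Before the node begins, with stream input | ctx, RunInfo, StreamReader[CallbackInput] |
| OnEnd | After the node completes, with non-stream output | ctx, RunInfo, CallbackOutput |
| OnEndWithStreamOutput | After the node returns stream output | ctx, RunInfo, StreamReader[CallbackOutput] |
| OnError | When the node returns an error | ctx, RunInfo, error |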
Building with HandlerBuilder
```go
handler := callbacks.NewHandlerBuilder().
	OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
		log.Printf("[%s] execution started, component: %s, input: %v", info.Name, info.Type, input)
		return ctx
	}).
	OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
		log.Printf("[%s] execution completed, component: %s, output: %v", info.Name, info.Type, output)
		return ctx
	}).
	OnErrorFn(func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
		log.Printf("[%s] execution failed, component: %s, error: %v", info.Name, info.Type, err)
		return ctx
	}).
	OnEndWithStreamOutputFn(func(ctx context.Context, info *callbacks.RunInfo,
		sr *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
		// The handler receives its own copy of the stream and must close it,
		// otherwise the copy is never released.
		defer sr.Close()
		log.Printf("[%s] stream output started, component: %s", info.Name, info.Type)
		return ctx
	}).
	Build()
```
Scope Control
Callbacks support three granularity levels of scope control:
```go
// The three options below are alternatives; pass the one matching the scope you need.
// 1. Global: applies to all nodes in the Graph
result, err := compiledGraph.Invoke(ctx, input,
	compose.WithCallbacks(handler))
// 2. Per component type: applies only to specific component types
result, err = compiledGraph.Invoke(ctx, input,
	compose.WithCallbacks(handler).DesignateType(components.ComponentOfChatModel))
// 3. Per node name: applies only to the designated node
result, err = compiledGraph.Invoke(ctx, input,
	compose.WithCallbacks(handler).DesignateNode("llm_node"))
```
This layered design enables you to:
- Inject OTel tracing globally (affects all nodes)
- Record token usage only for ChatModel nodes
- Add alerting logic only for specific critical nodes
Production Observability
OpenTelemetry Integration
Eino includes a built-in OpenTelemetry Callback Handler that automatically maps each node's execution to an OTel Span.
Configuring the OTel Handler
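```go
import (
	"context"
	"log"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.21.0"

	einootel "github.com/cloudwego/eino/callbacks/otel"
	"github.com/cloudwego/eino/compose"
)

func initTracer(ctx context.Context) (func(), error) {
	exporter, err := otlptracehttp.New(ctx,
		otlptracehttp.WithEndpoint("otel-collector:4318"),
		otlptracehttp.WithInsecure(), // assumes the collector accepts plain HTTP
	)
	if err != nil {
		return nil, err
	}
	tp := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(resource.NewWithAttributes(
			semconv.SchemaURL,
			semconv.ServiceName("my-agent-service"),
		)),
	)
	otel.SetTracerProvider(tp)
	return func() {
		// Flush buffered spans before the process exits
		if err := tp.Shutdown(context.Background()); err != nil {
			log.Printf("tracer shutdown: %v", err)
		}
	}, nil
}

// At startup: shutdown, err := initTracer(ctx); check err; defer shutdown()

// Create OTel Callback Handler
otelHandler := einootel.NewHandler()
// Inject into Graph execution
result, err := compiledGraph.Invoke(ctx, input,
	compose.WithCallbacks(otelHandler))
```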
Each execution generates Spans containing:
- Node name and component type
- Input/output summaries
- Execution duration
- Error information (if any)
Practice: Full-Chain Logging and Latency Tracing
Here's a complete example showing how to use Callbacks for both logging and latency tracking simultaneously:
```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log"
	"time"

	"github.com/cloudwego/eino/callbacks"
	"github.com/cloudwego/eino/compose"
	"github.com/cloudwego/eino/schema"
)

// startKey is an unexported context key type, so it cannot collide with other packages' keys.
type startKey struct{}

// Build a Callback Handler with latency tracing
func buildTracingHandler() callbacks.Handler {
	return callbacks.NewHandlerBuilder().
		OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
			log.Printf("[TRACE] ▶ Node=%s Type=%s started", info.Name, info.Type)
			return context.WithValue(ctx, startKey{}, time.Now())
		}).
		OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
			if start, ok := ctx.Value(startKey{}).(time.Time); ok {
				log.Printf("[TRACE] ◀ Node=%s Type=%s duration=%v", info.Name, info.Type, time.Since(start))
			}
			return ctx
		}).
		OnErrorFn(func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
			log.Printf("[TRACE] ✖ Node=%s Type=%s error=%v", info.Name, info.Type, err)
			return ctx
		}).
		OnEndWithStreamOutputFn(func(ctx context.Context, info *callbacks.RunInfo,
			sr *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
			start, ok := ctx.Value(startKey{}).(time.Time)
			// This hook fires when the node returns its stream, which can be before the
			// first chunk exists, so time the chunks on this handler's own copy.
			go func() {
				defer sr.Close() // the callback's stream copy must always be closed
				for n := 0; ; n++ {
					if _, err := sr.Recv(); err != nil {
						break // io.EOF or a stream error ends observation
					}
					if n == 0 && ok {
						log.Printf("[TRACE] ⇥ Node=%s first_chunk_latency=%v", info.Name, time.Since(start))
					}
				}
				if ok {
					log.Printf("[TRACE] ◀ Node=%s stream_duration=%v", info.Name, time.Since(start))
				}
			}()
			return ctx
		}).
		Build()
}

func main() {
	ctx := context.Background()
	// Build Graph (refer to the previous article for orchestration setup)
	g := compose.NewGraph[string, string]()
	// ... add nodes and edges ...
	compiled, err := g.Compile(ctx)
	if err != nil {
		log.Fatal(err)
	}
	// Inject tracing handler
	handler := buildTracingHandler()
	// Stream execution
	sr, err := compiled.Stream(ctx, "Hello, please analyze this code for performance issues",
		compose.WithCallbacks(handler))
	if err != nil {
		log.Fatal(err)
	}
	defer sr.Close()
	// Consume streaming output
	for {
		chunk, err := sr.Recv()
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		fmt.Print(chunk)
	}
	fmt.Println()
}
```
Example output:
```text
[TRACE] ▶ Node=prompt_template Type=PromptTemplate started
[TRACE] ◀ Node=prompt_template Type=PromptTemplate duration=1.2ms
[TRACE] ▶ Node=llm Type=ChatModel started
[TRACE] ⇥ Node=llm first_chunk_latency=156ms
[TRACE] ▶ Node=output_parser Type=Lambda started
[TRACE] ◀ Node=output_parser Type=Lambda duration=0.3ms
```
Best Practices
Streaming:
- Always use Stream mode for user-facing interfaces to maximize responsiveness
- For intermediate processing nodes, let the engine handle conversions automatically if you don't need to observe intermediate stream data
- Set an appropriate bufferSize: too large wastes memory, too small makes the producer block frequently under backpressure
Callback Design:
- Register universal observability (OTel, logging) as global Callbacks
- Scope business-specific monitoring (token counting, cost tracking) to specific component types
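- Avoid blocking operations in Callbacks; use buffered channels to hand data to background goroutines
- Use context.Context to pass state between OnStart and OnEnd (e.g., timers)
- Always close the StreamReader copy that a stream callback receives, even if you don't read from it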
Production Deployment:
- Use the JSON Formatter tool to make OTel JSON trace output more readable
- Use JSON to Go Struct to quickly structure Callback output types
FAQ
Q: How do you handle network interruptions in streaming mode?
A: StreamReader.Recv() returns an error other than io.EOF. Handle such errors explicitly in your consumption loop: keep the partial output and record how many chunks arrived so retry logic can decide whether to resume or restart, or display a user-friendly notification.
Q: Do Callbacks impact main flow performance?
A: Callbacks execute synchronously in the same goroutine. If your callback logic is heavy (e.g., writing to a database), push data into an async queue inside the callback to avoid blocking the main flow.
Q: Can you register multiple Callback Handlers simultaneously?
A: Yes. Multiple handlers can be registered; OnStart hooks run in registration order and OnEnd hooks in reverse order, similar to the middleware onion model.
Q: Can stream Callbacks modify data in the stream?
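A: No. OnEndWithStreamOutput receives its own copy of the stream (created with StreamReader.Copy). Consuming this copy doesn't affect the original stream received by downstream nodes, which guarantees pure observation semantics for Callbacks. The handler must close its copy when it is done, otherwise the copy's resources are never released.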
Summary
Streaming and the Callback aspect system represent Eino's critical leap from "functional" to "production-ready." These two mechanisms solve fundamentally different problems — streaming addresses latency and user experience, while callbacks address observability and operational excellence — but they work together seamlessly within Eino's orchestration engine.
StreamReader/StreamWriter primitives provide type-safe stream abstractions for orchestration, enabling Go's compile-time safety guarantees to extend into the streaming domain. The engine's automatic stream conversions (concatenation, splitting, merging) shield developers from protocol differences between components, letting you focus on business logic rather than plumbing.
The Callback system achieves full-chain observability without invading business code. Its three-level scope control (global, per-type, per-node) gives you surgical precision in applying monitoring behaviors, while the stream-aware hooks ensure that streaming execution is just as observable as batch execution.
With these two mechanisms mastered, you can:
- Deliver real-time streaming response experiences to users
- Track latency and exceptions at every execution step in production
- Build complete distributed tracing pipelines with OpenTelemetry
- Gain deep visibility into your Agent's decision-making process
In the next article, we'll move into Eino DevKit: Build Your First Agent, applying the streaming and callback mechanisms from this article to build a real Agent.
Related Resources
- Previous in series: Eino Orchestration Engine: Chain, Graph, and Workflow in Practice
- Next in series: Eino DevKit: Build Your First Agent
- Glossary: AI Agent | Agentic Workflow
- Tools: JSON Formatter | JSON to Go Struct
- Official Docs: Eino GitHub Repository | OpenTelemetry Go SDK