TL;DR

LLM outputs are inherently streaming — models generate tokens one at a time, not as a complete response. How do you efficiently propagate stream data through an AI Agent orchestration pipeline? How do you inject logging, tracing, and monitoring without coupling them to business logic? Eino solves both with StreamReader/StreamWriter primitives and a Callback aspect system. This article dissects these two core mechanisms from low-level primitives to production-grade observability.


Table of Contents

  1. Key Takeaways
  2. Why Streaming Matters
  3. Eino Streaming Paradigm
  4. Component Streaming Matrix
  5. Streaming in Orchestration
  6. Callback Aspect System
  7. Production Observability
  8. Practice: Full-Chain Logging and Latency Tracing
  9. Best Practices
  10. FAQ
  11. Summary
  12. Related Resources

Key Takeaways

  • Dual invocation modes: Eino exposes both Invoke (complete response) and Stream (streaming via StreamReader); where a component natively implements only one, the orchestration engine converts between them
  • StreamReader/StreamWriter: Type-safe generic streaming primitives that form the foundation of Eino's stream architecture
  • Automatic stream conversion: The orchestration engine handles concatenation, splitting, and type conversion between nodes transparently
  • Four callback hooks: OnStart / OnEnd / OnStartWithStreamInput / OnEndWithStreamOutput cover the entire execution lifecycle
  • Scope control: Three granularity levels for callback injection — global, per-type, and per-node
  • One-click OTel integration: Built-in OpenTelemetry handler automatically generates Trace Spans for every node

Why Streaming Matters

In traditional API call patterns, clients must wait for the LLM to generate all tokens before receiving a response. For a 500-token answer, this means users may wait 3-5 seconds before seeing the first character. Streaming addresses this from three dimensions:

| Dimension | Non-Streaming | Streaming |
|---|---|---|
| Time to first token | 3-5s (returns after full generation) | 100-300ms (first token returned immediately) |
| Memory usage | Must buffer complete response | Process chunk by chunk, constant memory |
| User experience | Long wait with no feedback | Real-time typewriter effect |

In Agentic Workflows, streaming value is amplified further: the Agent's Chain of Thought can be displayed in real-time, making the interaction process transparent and trustworthy.

Beyond UX benefits, streaming is essential for complex multi-step Agent architectures. When an Agent decides to call a tool, a streaming pipeline lets the orchestration engine react to the tool call as its chunks arrive instead of waiting for the full LLM response to finish generating. This pipelining can reduce end-to-end latency noticeably in workflows where multiple LLM calls are chained together.


Eino Streaming Paradigm

Invoke vs Stream

Eino defines two invocation interfaces. Invoke mode (exposed as Generate on ChatModel) follows the request-response pattern familiar from traditional RPC: you call a function and block until the complete result is ready. Stream mode returns a StreamReader immediately and delivers results incrementally. Both modes share the same underlying component logic, and the framework converts between them transparently:

go
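// Invoke: wait for the complete result
result, err := chatModel.Generate(ctx, messages)
if err != nil {
    return err
}
fmt.Println(result.Content)

// Stream: returns a StreamReader for chunk-by-chunk reading
streamReader, err := chatModel.Stream(ctx, messages)
if err != nil {
    return err
}
defer streamReader.Close() // release the stream even on early return
for {
    chunk, err := streamReader.Recv()
    if errors.Is(err, io.EOF) {
        break // stream finished normally
    }
    if err != nil {
        return err // e.g. a network interruption mid-stream
    }
    fmt.Print(chunk.Content)
}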
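
The relationship between the two modes can be sketched with plain Go channels. This is a minimal, stdlib-only illustration and not Eino's implementation: a channel stands in for StreamReader, and `streamTokens`, `invokeFromStream`, and `streamFromInvoke` are hypothetical names introduced here.

```go
package main

import (
	"fmt"
	"strings"
)

// streamTokens is a hypothetical stand-in for a streaming model call:
// it emits each token on a channel and closes the channel when done.
func streamTokens(tokens []string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, t := range tokens {
			out <- t
		}
	}()
	return out
}

// invokeFromStream derives an Invoke-style (blocking) result from a stream
// by draining every chunk and joining them.
func invokeFromStream(stream <-chan string) string {
	var sb strings.Builder
	for chunk := range stream {
		sb.WriteString(chunk)
	}
	return sb.String()
}

// streamFromInvoke goes the other way: it wraps a complete value
// as a stream that carries exactly one chunk.
func streamFromInvoke(full string) <-chan string {
	out := make(chan string, 1)
	out <- full
	close(out)
	return out
}

func main() {
	tokens := []string{"Hel", "lo", ", ", "Eino"}

	// Stream mode: print chunks as they arrive.
	for chunk := range streamTokens(tokens) {
		fmt.Print(chunk)
	}
	fmt.Println()

	// Invoke mode built on top of the same stream.
	fmt.Println(invokeFromStream(streamTokens(tokens)))

	// A complete value exposed as a single-chunk stream.
	fmt.Println(invokeFromStream(streamFromInvoke("Hello, Eino")))
}
```

The point of the sketch is that one mode can be built from the other, which is why a framework can bridge components that implement only one of them.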

StreamReader and StreamWriter

StreamReader[T] and StreamWriter[T] are the core primitives of Eino's streaming system. They leverage Go generics for type-safe stream communication:

go
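// Create a stream pair; bufferSize is the capacity of the underlying buffer
reader, writer := schema.Pipe[*schema.Message](bufferSize)

// Writer side (typically inside the component)
go func() {
    defer writer.Close()
    for _, chunk := range chunks {
        // Send reports true when the reader has already closed the stream
        if closed := writer.Send(chunk, nil); closed {
            return
        }
    }
}()

// Reader side (consumed by downstream)
defer reader.Close() // tells the writer to stop if we exit early
for {
    msg, err := reader.Recv()
    if errors.Is(err, io.EOF) {
        break
    }
    if err != nil {
        return err // an error the writer passed to Send
    }
    process(msg)
}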

Key design decisions:

  • Type safety: Generic parameter [T] ensures compile-time type checking
  • Backpressure: bufferSize controls buffer capacity, preventing producer from running too far ahead
  • Graceful shutdown: the writer's Close() signals stream completion, after which Recv() returns io.EOF; the reader's Close() tells the writer to stop early
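
The three design decisions above can be reproduced in miniature with one buffered channel. The following is a simplified sketch, not Eino's source: `Reader`, `Writer`, `NewPipe`, and `collect` are hypothetical names, and unlike the real primitives this version has no reader-side Close.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// item carries either a chunk or an error, mirroring a Send(chunk, err) call.
type item[T any] struct {
	val T
	err error
}

// Reader and Writer are hypothetical, simplified analogues of
// StreamReader[T] and StreamWriter[T], backed by one buffered channel.
type Reader[T any] struct{ ch <-chan item[T] }
type Writer[T any] struct{ ch chan<- item[T] }

// NewPipe creates a connected pair. capacity bounds how far the
// producer can run ahead of the consumer (backpressure).
func NewPipe[T any](capacity int) (*Reader[T], *Writer[T]) {
	ch := make(chan item[T], capacity)
	return &Reader[T]{ch: ch}, &Writer[T]{ch: ch}
}

// Send blocks once the buffer is full, which throttles the producer.
func (w *Writer[T]) Send(v T, err error) { w.ch <- item[T]{val: v, err: err} }

// Close signals that no more chunks will arrive.
func (w *Writer[T]) Close() { close(w.ch) }

// Recv returns the next chunk, or io.EOF after the writer has closed.
func (r *Reader[T]) Recv() (T, error) {
	it, ok := <-r.ch
	if !ok {
		var zero T
		return zero, io.EOF
	}
	return it.val, it.err
}

// collect drains r until io.EOF or the first error sent by the writer.
func collect[T any](r *Reader[T]) ([]T, error) {
	var out []T
	for {
		v, err := r.Recv()
		if errors.Is(err, io.EOF) {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, v)
	}
}

func main() {
	r, w := NewPipe[string](2)
	go func() {
		defer w.Close()
		for _, s := range []string{"a", "b", "c"} {
			w.Send(s, nil)
		}
	}()
	chunks, err := collect(r)
	fmt.Println(chunks, err) // prints: [a b c] <nil>
}
```

Because the element type is a type parameter, passing a `Reader[int]` where a `Reader[string]` is expected fails at compile time, which is the type-safety property the list describes.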

Component Streaming Matrix

Different components have varying levels of stream support. Eino's orchestration engine automatically adapts to these differences:

Component Type Invoke Input Stream Input Invoke Output Stream Output
ChatModel
PromptTemplate
Retriever
ToolNode
Lambda Optional Optional
Transformer

Key insight: ChatModel is the only core component that natively supports Stream output. The orchestration engine's stream passthrough capability ensures that even when intermediate nodes don't support streaming, end users still get a streaming experience.


Streaming in Orchestration

When you call an orchestration Graph in Stream mode, the Eino engine automatically handles three types of stream conversions between nodes:

graph LR
    A["ChatModel (Stream Output)"] -->|"StreamReader[Message]"| B{"Downstream Type?"}
    B -->|"Supports Stream Input"| C["Transformer (Direct Passthrough)"]
    B -->|"Invoke Only"| D["Auto Concat"]
    D --> E["Lambda (Complete Value)"]
    A -->|"Fork"| F["Callback Observer"]
    A -->|"Fork"| G["Another Consumer"]

Three Automatic Conversions

1. Stream Concatenation

When a downstream node only accepts complete values, the engine automatically collects all stream fragments and merges them:

go
// Conceptual equivalent of what the engine does (pseudocode, not an Eino API)
fullMessage := concatenate(streamReader) // collect all chunks
nextNode.Invoke(ctx, fullMessage)         // pass complete value

2. Stream Fork/Split

When a single stream needs to be consumed by multiple downstream nodes, the engine automatically forks the stream:

go
// One StreamReader duplicated into multiple independent copies
readers := originalReader.Copy(consumerCount)
// Each consumer receives every chunk; originalReader must not be used after Copy
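
How a fork can deliver the full stream to every consumer is easiest to see with channels. This is a stdlib-only sketch of the fan-out idea, not Eino's implementation; `source`, `fanOut`, and `drain` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"sync"
)

// source emits vals on a channel and then closes it.
func source(vals ...string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}

// fanOut copies every value from src to n output channels and closes
// them all once src is exhausted. Each output has its own buffer, so a
// slow consumer stalls the others only after its buffer fills up.
func fanOut[T any](src <-chan T, n, buf int) []<-chan T {
	outs := make([]chan T, n)
	readers := make([]<-chan T, n)
	for i := range outs {
		outs[i] = make(chan T, buf)
		readers[i] = outs[i]
	}
	go func() {
		for v := range src {
			for _, o := range outs {
				o <- v
			}
		}
		for _, o := range outs {
			close(o)
		}
	}()
	return readers
}

// drain reads a channel to completion.
func drain[T any](ch <-chan T) []T {
	var got []T
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	readers := fanOut(source("x", "y", "z"), 2, 1)

	// Consume the copies concurrently, as independent downstream nodes would.
	results := make([][]string, len(readers))
	var wg sync.WaitGroup
	for i, r := range readers {
		wg.Add(1)
		go func(i int, r <-chan string) {
			defer wg.Done()
			results[i] = drain(r)
		}(i, r)
	}
	wg.Wait()
	fmt.Println(results) // prints: [[x y z] [x y z]]
}
```

The sketch also shows the main cost of forking: every copy must be consumed or released, otherwise the producer eventually blocks on the copy nobody reads.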

3. Stream Merge

When multiple upstream streams need to converge into a single downstream node, the engine merges them by completion order:

go
// Multiple upstream streams merged into one
mergedReader := schema.MergeStreamReaders([]*schema.StreamReader[*schema.Message]{reader1, reader2, reader3})
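
The merge direction is the classic fan-in pattern. The following stdlib-only sketch illustrates it under the assumption that chunks are forwarded in arrival order; it is not Eino's implementation, and `source` and `merge` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// source emits vals on a channel and then closes it.
func source(vals ...string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}

// merge forwards values from all inputs onto one channel and closes it
// after every input is exhausted. Values from a single input keep their
// order; interleaving across inputs depends on scheduling.
func merge[T any](inputs ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in <-chan T) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	var got []string
	for v := range merge(source("a1", "a2"), source("b1"), source("c1", "c2")) {
		got = append(got, v)
	}
	// Arrival order varies between runs; sort only to print a stable result.
	sort.Strings(got)
	fmt.Println(got) // prints: [a1 a2 b1 c1 c2]
}
```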

Callback Aspect System

Why Callbacks

In production environments, you need to monitor every step of Agent execution: record inputs/outputs, measure latency, track exceptions. These are classic cross-cutting concerns — functionality that spans multiple modules but doesn't belong in any single one. If you scatter logging and tracing code throughout your business logic, you get tight coupling, code duplication, and maintenance nightmares.

Eino's Callback system provides an AOP (Aspect-Oriented Programming) style solution. Similar to middleware in web frameworks or interceptors in gRPC, callbacks let you define behavior that runs before and after every node execution without modifying the node's core logic. The key difference from traditional middleware is that Eino's callbacks are stream-aware — they understand both batch and streaming execution patterns.
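
The aspect idea itself needs very little machinery. Below is a minimal, stdlib-only sketch of wrapping a node with before/after hooks; `Node`, `Hooks`, `WithHooks`, and `runDemo` are hypothetical names for illustration and do not mirror Eino's actual types.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Node is a hypothetical unit of business logic.
type Node func(ctx context.Context, in string) (string, error)

// Hooks is a simplified stand-in for a callback handler.
type Hooks struct {
	OnStart func(ctx context.Context, name, in string) context.Context
	OnEnd   func(ctx context.Context, name, out string, err error)
}

// WithHooks returns a Node that runs the hooks around n.
// The node's own code is not modified.
func WithHooks(name string, n Node, h Hooks) Node {
	return func(ctx context.Context, in string) (string, error) {
		if h.OnStart != nil {
			ctx = h.OnStart(ctx, name, in)
		}
		out, err := n(ctx, in)
		if h.OnEnd != nil {
			h.OnEnd(ctx, name, out, err)
		}
		return out, err
	}
}

// runDemo wraps a trivial node with recording hooks and returns
// the node output together with the recorded hook events.
func runDemo() (string, []string) {
	var events []string
	recording := Hooks{
		OnStart: func(ctx context.Context, name, in string) context.Context {
			events = append(events, "start:"+name)
			return ctx
		},
		OnEnd: func(ctx context.Context, name, out string, err error) {
			events = append(events, "end:"+name)
		},
	}
	upper := func(ctx context.Context, in string) (string, error) {
		return strings.ToUpper(in), nil
	}
	out, _ := WithHooks("upper_node", upper, recording)(context.Background(), "hi")
	return out, events
}

func main() {
	out, events := runDemo()
	fmt.Println(out, events) // prints: HI [start:upper_node end:upper_node]
}
```

Returning a context from the start hook is what allows per-execution state, such as a start timestamp, to reach the end hook without global variables.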

Four-Phase Hooks

When a node executes, the Eino engine triggers the start and end hooks that match how the node receives its input and produces its output:

| Hook | Trigger Point | Parameters |
|---|---|---|
| OnStart | Before node begins | ctx, RunInfo, CallbackInput |
| OnStartWithStreamInput | Node begins with stream input | ctx, RunInfo, StreamReader[CallbackInput] |
| OnEnd | After node completes | ctx, RunInfo, CallbackOutput |
| OnEndWithStreamOutput | Node completes with stream output | ctx, RunInfo, StreamReader[CallbackOutput] |

Building with HandlerBuilder

go
handler := callbacks.NewHandlerBuilder().
    OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
        log.Printf("[%s] execution started, component: %s, input: %v",
            info.Name, info.Type, input)
        return ctx
    }).
    OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
        log.Printf("[%s] execution completed, component: %s, output: %v",
            info.Name, info.Type, output)
        return ctx
    }).
    OnEndWithStreamOutputFn(func(ctx context.Context, info *callbacks.RunInfo,
        sr *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
        // The handler receives its own copy of the stream and must close it
        sr.Close()
        log.Printf("[%s] stream output started, component: %s", info.Name, info.Type)
        return ctx
    }).
    Build()

Scope Control

Callbacks support three granularity levels of scope control:

go
// 1. Global: applies to all nodes in the Graph
result, err := compiledGraph.Invoke(ctx, input,
    compose.WithCallbacks(handler))

// 2. Per component type: applies only to specific component types.
// Confirm DesignateType is available in your Eino version; checking
// info.Component inside the handler is an alternative.
result, err = compiledGraph.Invoke(ctx, input,
    compose.WithCallbacks(handler).DesignateType(components.ComponentOfChatModel))

// 3. Per node name: applies only to the designated node
result, err = compiledGraph.Invoke(ctx, input,
    compose.WithCallbacks(handler).DesignateNode("llm_node"))

This layered design enables you to:

  • Inject OTel tracing globally (affects all nodes)
  • Record token usage only for ChatModel nodes
  • Add alerting logic only for specific critical nodes
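
The three scopes can be modeled as filters wrapped around a handler. This is a stdlib-only sketch of the concept, not Eino's mechanism; `RunInfo` here is a simplified hypothetical struct, and `ForComponent`, `ForNode`, and `fire` are illustrative names.

```go
package main

import "fmt"

// RunInfo is a simplified, hypothetical version of the metadata a callback receives.
type RunInfo struct {
	Name      string // node name, e.g. "llm_node"
	Component string // component type, e.g. "ChatModel"
}

// Handler is a hypothetical single-hook callback.
type Handler func(info RunInfo)

// ForComponent wraps h so that it fires only for one component type.
func ForComponent(component string, h Handler) Handler {
	return func(info RunInfo) {
		if info.Component == component {
			h(info)
		}
	}
}

// ForNode wraps h so that it fires only for one node name.
func ForNode(name string, h Handler) Handler {
	return func(info RunInfo) {
		if info.Name == name {
			h(info)
		}
	}
}

// fire simulates executing the nodes in order with one global, one
// per-type, and one per-node handler, and returns "scope:node" records.
func fire(nodes []RunInfo) []string {
	var records []string
	record := func(scope string) Handler {
		return func(info RunInfo) { records = append(records, scope+":"+info.Name) }
	}
	handlers := []Handler{
		record("global"),
		ForComponent("ChatModel", record("per-type")),
		ForNode("tool_node", record("per-node")),
	}
	for _, n := range nodes {
		for _, h := range handlers {
			h(n)
		}
	}
	return records
}

func main() {
	nodes := []RunInfo{
		{Name: "prompt_node", Component: "PromptTemplate"},
		{Name: "llm_node", Component: "ChatModel"},
		{Name: "tool_node", Component: "ToolNode"},
	}
	for _, line := range fire(nodes) {
		fmt.Println(line)
	}
}
```

In this run the global handler fires three times, while the per-type and per-node handlers each fire once, for `llm_node` and `tool_node` respectively.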

Production Observability

OpenTelemetry Integration

Eino includes a built-in OpenTelemetry Callback Handler that automatically maps each node's execution to an OTel Span:

sequenceDiagram
    participant Client
    participant Graph as "CompiledGraph"
    participant Prompt as "PromptTemplate"
    participant LLM as "ChatModel"
    participant Tool as "ToolNode"
    participant OTel as "OTel Collector"
    Client->>Graph: Stream(ctx, input)
    Graph->>OTel: StartSpan("graph.execute")
    Graph->>Prompt: OnStart → Span("prompt_node")
    Prompt->>Graph: OnEnd
    Graph->>LLM: OnStart → Span("llm_node")
    LLM-->>Graph: OnEndWithStream (streaming output)
    Graph->>Tool: OnStart → Span("tool_node")
    Tool->>Graph: OnEnd
    Graph->>OTel: EndSpan (all complete)
    Graph-->>Client: StreamReader

Configuring the OTel Handler

go
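import (
    "context"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
    "go.opentelemetry.io/otel/sdk/resource"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.26.0"

    einootel "github.com/cloudwego/eino/callbacks/otel"
)

// initTracer installs a global TracerProvider and returns a shutdown
// function that flushes any buffered spans.
func initTracer(ctx context.Context) (func(context.Context) error, error) {
    exporter, err := otlptracehttp.New(ctx,
        otlptracehttp.WithEndpoint("otel-collector:4318"),
        otlptracehttp.WithInsecure(), // assumes a plain-HTTP collector; remove if it serves TLS
    )
    if err != nil {
        return nil, err
    }
    tp := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(exporter),
        sdktrace.WithResource(resource.NewWithAttributes(
            semconv.SchemaURL,
            semconv.ServiceName("my-agent-service"),
        )),
    )
    otel.SetTracerProvider(tp)
    return tp.Shutdown, nil
}

// Create OTel Callback Handler
otelHandler := einootel.NewHandler()

// Inject into Graph execution
result, err := graph.Invoke(ctx, input,
    compose.WithCallbacks(otelHandler))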

Each execution generates Spans containing:

  • Node name and component type
  • Input/output summaries
  • Execution duration
  • Error information (if any)

Practice: Full-Chain Logging and Latency Tracing

Here's a complete example showing how to use Callbacks for both logging and latency tracking simultaneously:

go
package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "time"

    "github.com/cloudwego/eino/callbacks"
    "github.com/cloudwego/eino/compose"
    "github.com/cloudwego/eino/schema"
)

// startKey is a private context key type; it avoids the collisions
// that plain string keys can cause.
type startKey struct{ node string }

// Build a Callback Handler with latency tracing
func buildTracingHandler() callbacks.Handler {
    return callbacks.NewHandlerBuilder().
        OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
            log.Printf("[TRACE] ▶ Node=%s Type=%s started", info.Name, info.Type)
            // Store the start time in the context so the end hooks can read it
            return context.WithValue(ctx, startKey{info.Name}, time.Now())
        }).
        OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
            if start, ok := ctx.Value(startKey{info.Name}).(time.Time); ok {
                log.Printf("[TRACE] ◀ Node=%s Type=%s duration=%v",
                    info.Name, info.Type, time.Since(start))
            }
            return ctx
        }).
        OnEndWithStreamOutputFn(func(ctx context.Context, info *callbacks.RunInfo,
            sr *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
            start, ok := ctx.Value(startKey{info.Name}).(time.Time)
            // Wait for the first chunk in the background so the main flow is not blocked
            go func() {
                defer sr.Close() // the handler owns this copy and must close it
                if _, err := sr.Recv(); err == nil && ok {
                    log.Printf("[TRACE] ⇥ Node=%s first_chunk_latency=%v", info.Name, time.Since(start))
                }
            }()
            return ctx
        }).
        Build()
}

func main() {
    ctx := context.Background()

    // Build Graph (refer to the previous article for orchestration setup)
    g := compose.NewGraph[string, string]()

    // ... add nodes and edges ...

    compiled, err := g.Compile(ctx)
    if err != nil {
        log.Fatal(err)
    }

    // Inject tracing handler
    handler := buildTracingHandler()

    // Stream execution
    sr, err := compiled.Stream(ctx, "Hello, please analyze this code for performance issues",
        compose.WithCallbacks(handler))
    if err != nil {
        log.Fatal(err)
    }
    defer sr.Close() // release the stream when main returns

    // Consume streaming output
    for {
        chunk, err := sr.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatal(err)
        }
        fmt.Print(chunk)
    }
    fmt.Println()
}

Example output:

code
[TRACE] ▶ Node=prompt_template Type=PromptTemplate started
[TRACE] ◀ Node=prompt_template Type=PromptTemplate duration=1.2ms
[TRACE] ▶ Node=llm Type=ChatModel started
[TRACE] ⇥ Node=llm first_chunk_latency=156ms
[TRACE] ▶ Node=output_parser Type=Lambda started
[TRACE] ◀ Node=output_parser Type=Lambda duration=300µs

Best Practices

Streaming:

  • Always use Stream mode for user-facing interfaces to maximize responsiveness
  • For intermediate processing nodes, let the engine handle conversions automatically if you don't need to observe intermediate stream data
  • Set appropriate bufferSize — too large wastes memory, too small causes frequent backpressure

Callback Design:

  • Register universal observability (OTel, logging) as global Callbacks
  • Scope business-specific monitoring (token counting, cost tracking) to specific component types
  • Avoid blocking operations in Callbacks — use async channels to send data to background goroutines
  • Leverage context.Context to pass state between OnStart/OnEnd (e.g., timers)
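
The advice to keep callbacks non-blocking can be sketched as a small queue with a background worker. This is one possible shape under stated assumptions, not an Eino facility: `AsyncSink`, `Emit`, and the drop-when-full policy are choices made for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// AsyncSink decouples callback code from slow I/O: callbacks enqueue
// events, and a single background goroutine processes them.
type AsyncSink struct {
	events  chan string
	wg      sync.WaitGroup
	written []string // touched only by the worker, and by Close after the worker exits
}

// NewAsyncSink starts the background worker with the given queue size.
func NewAsyncSink(buffer int) *AsyncSink {
	s := &AsyncSink{events: make(chan string, buffer)}
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		for e := range s.events {
			// A real sink would write to a database or log pipeline here.
			s.written = append(s.written, e)
		}
	}()
	return s
}

// Emit never blocks the caller. It returns false and drops the event
// when the queue is full, trading completeness for latency.
func (s *AsyncSink) Emit(e string) bool {
	select {
	case s.events <- e:
		return true
	default:
		return false
	}
}

// Close stops accepting events, waits for the worker to flush the
// queue, and returns everything that was written.
func (s *AsyncSink) Close() []string {
	close(s.events)
	s.wg.Wait()
	return s.written
}

func main() {
	sink := NewAsyncSink(16)
	for i := 0; i < 3; i++ {
		// Inside a callback this would be the only work done synchronously.
		sink.Emit(fmt.Sprintf("node-%d finished", i))
	}
	fmt.Println(sink.Close())
}
```

Dropping on overflow suits metrics and debug logs; for audit data that must not be lost, a blocking send with a timeout is the safer policy.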

Production Deployment:


FAQ

Q: How do you handle network interruptions in streaming mode?

A: StreamReader.Recv() returns an error other than io.EOF. Handle that case explicitly in your consumption loop: record how much output was already received so that retry logic can resume or restart, or show the user a friendly notification.
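
A consumption loop that separates normal completion from interruption might look like the sketch below. It is stdlib-only: `recvFunc` stands in for StreamReader.Recv, and `fakeStream`, `consume`, and `demo` are hypothetical helpers used to simulate a failure.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// recvFunc is a hypothetical stand-in for StreamReader.Recv.
type recvFunc func() (string, error)

// fakeStream yields the chunks, then returns failErr,
// or io.EOF when failErr is nil.
func fakeStream(chunks []string, failErr error) recvFunc {
	i := 0
	return func() (string, error) {
		if i < len(chunks) {
			c := chunks[i]
			i++
			return c, nil
		}
		if failErr != nil {
			return "", failErr
		}
		return "", io.EOF
	}
}

// consume reads until EOF or the first error. It always returns the
// text and chunk count received so far, so the caller can decide
// whether to retry, resume, or show the partial answer.
func consume(recv recvFunc) (string, int, error) {
	text := ""
	count := 0
	for {
		chunk, err := recv()
		if errors.Is(err, io.EOF) {
			return text, count, nil // normal completion
		}
		if err != nil {
			return text, count, fmt.Errorf("stream interrupted after %d chunks: %w", count, err)
		}
		text += chunk
		count++
	}
}

// demo runs consume against a healthy or a failing stream.
func demo(fail bool) (string, int, bool) {
	var failErr error
	if fail {
		failErr = errors.New("connection reset")
	}
	text, count, err := consume(fakeStream([]string{"Hel", "lo"}, failErr))
	return text, count, err != nil
}

func main() {
	text, count, interrupted := demo(true)
	fmt.Printf("partial=%q chunks=%d interrupted=%v\n", text, count, interrupted)
}
```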

Q: Do Callbacks impact main flow performance?

A: Callbacks execute synchronously in the same goroutine. If your callback logic is heavy (e.g., writing to a database), push data into an async queue inside the callback to avoid blocking the main flow.

Q: Can you register multiple Callback Handlers simultaneously?

A: Yes. Multiple handlers execute in registration order: OnStart runs in forward order and OnEnd runs in reverse order, similar to the middleware onion model.
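
The onion ordering described in this answer can be visualized with a few lines of Go. The sketch only illustrates the ordering as stated above and is not derived from Eino's internals; `runOnion` is a hypothetical helper.

```go
package main

import "fmt"

// runOnion simulates one node execution with handlers given in
// registration order and returns the sequence of hook invocations:
// start hooks in forward order, end hooks in reverse order.
func runOnion(handlers []string) []string {
	var trace []string
	for _, h := range handlers {
		trace = append(trace, h+".OnStart")
	}
	trace = append(trace, "node.run")
	for i := len(handlers) - 1; i >= 0; i-- {
		trace = append(trace, handlers[i]+".OnEnd")
	}
	return trace
}

func main() {
	fmt.Println(runOnion([]string{"otel", "logging"}))
	// prints: [otel.OnStart logging.OnStart node.run logging.OnEnd otel.OnEnd]
}
```

With this ordering the first registered handler wraps all the others, so a tracing handler registered first sees the total time including the work done by later handlers.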

Q: Can stream Callbacks modify data in the stream?

A: No. OnEndWithStreamOutput receives a copy of the stream (implemented via Fork). Consuming this copy doesn't affect the original stream received by downstream nodes, which guarantees pure observation semantics for Callbacks. The handler should still close its copy once it is done with it.


Summary

Streaming and the Callback aspect system represent Eino's critical leap from "functional" to "production-ready." These two mechanisms solve fundamentally different problems — streaming addresses latency and user experience, while callbacks address observability and operational excellence — but they work together seamlessly within Eino's orchestration engine.

StreamReader/StreamWriter primitives provide type-safe stream abstractions for orchestration, enabling Go's compile-time safety guarantees to extend into the streaming domain. The engine's automatic stream conversions (concatenation, splitting, merging) shield developers from protocol differences between components, letting you focus on business logic rather than plumbing.

The Callback system achieves full-chain observability without invading business code. Its three-level scope control (global, per-type, per-node) gives you surgical precision in applying monitoring behaviors, while the stream-aware hooks ensure that streaming execution is just as observable as batch execution.

With these two mechanisms mastered, you can:

  • Deliver real-time streaming response experiences to users
  • Track latency and exceptions at every execution step in production
  • Build complete distributed tracing pipelines with OpenTelemetry
  • Gain deep visibility into your Agent's decision-making process

In the next article, we'll move into Eino DevKit: Build Your First Agent, applying the streaming and callback mechanisms from this article to build a real Agent.