TL;DR: Go的goroutine+channel模型天然适配SSE事件流,net/http标准库原生支持http.Flusher接口。本文从MCP协议SSE传输规范入手,用约300行Go代码从零实现一个完整的SSE Transport——涵盖连接管理、JSON-RPC路由、心跳保活与优雅关闭,最终集成为可部署的MCP Server。

Key Takeaways

  • 双通道架构:MCP SSE Transport通过GET /sse建立Server→Client推送通道,通过POST /message接收Client→Server的JSON-RPC请求,两个HTTP端点协同完成双向通信
  • Go天然适配:每个SSE连接由一个goroutine维护,channel作为消息管道,http.Flusher实现实时推送——无需任何第三方SSE库
  • endpoint协商机制:SSE连接建立后,Server必须先发送一个endpoint事件,告知Client后续POST消息的目标URL(含session ID)
  • 生产级加固:心跳机制防止代理超时断连,context.Context实现优雅关闭,sync.RWMutex保障并发安全

为什么用Go实现MCP SSE Transport

MCP(Model Context Protocol)的SSE传输层本质是一个长连接事件流服务。选择Go语言实现它有3个核心优势:

goroutine的轻量并发模型完美匹配SSE场景。 每个SSE客户端连接需要一个独立的执行上下文来维持长连接、监听消息、发送心跳。Go的goroutine仅占用约4KB初始栈空间,单机轻松支撑10万级并发连接——同样的连接数在Node.js中需要精心管理事件循环,在Java中需要虚拟线程或响应式框架。

channel机制是事件流的天然抽象。 SSE的核心模式是"生产者向消费者推送事件"。Go的channel正是为这种模式设计的:消息处理器将JSON-RPC响应写入channel,SSE handler从channel读取并flush到HTTP流。零额外依赖,零回调地狱。

标准库net/http原生支持Flusher接口。 Go的http.ResponseWriter可以断言为http.Flusher,调用Flush()立即将缓冲区数据推送到客户端。不需要任何SSE框架,标准库就是最好的SSE框架。

MCP SSE Transport协议规范解析

MCP协议定义了两种传输方式:stdio和HTTP+SSE。SSE传输层的核心设计是双通道架构——一个用于Server到Client的事件推送,另一个用于Client到Server的消息发送。

sequenceDiagram participant Client as MCP Client participant Server as MCP Server Client->>Server: GET /sse Note right of Server: 建立SSE长连接 Server-->>Client: event: endpoint
data: /message?sessionId=abc123 Note left of Client: 获知POST目标地址 Client->>Server: POST /message?sessionId=abc123
Content-Type: application/json
{"jsonrpc":"2.0","method":"initialize",...} Server-->>Client: event: message
data: {"jsonrpc":"2.0","result":{...}} Client->>Server: POST /message?sessionId=abc123
{"jsonrpc":"2.0","method":"tools/list",...} Server-->>Client: event: message
data: {"jsonrpc":"2.0","result":{"tools":...}} Client->>Server: POST /message?sessionId=abc123
{"jsonrpc":"2.0","method":"tools/call",...} Server-->>Client: event: message
data: {"jsonrpc":"2.0","result":{"content":...}}

关键协议细节:

  1. GET /sse 建立SSE连接后,Server必须首先发送一个event: endpoint事件,data字段携带该session对应的POST URL(包含唯一的sessionId查询参数,通常建议使用 UUID生成器 生成可靠的会话ID)
  2. POST /message 接收Client发来的JSON-RPC 2.0消息(如initializetools/listtools/call),Server处理后将响应通过SSE通道推送回去
  3. SSE事件格式遵循标准规范:event: message\ndata: {JSON}\n\n
  4. 连接断开时Server需要清理对应session的所有资源

从零实现:核心数据结构

先定义SSE Transport的核心结构体。每个客户端连接对应一个SessionSSETransport管理所有活跃session:

go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/google/uuid"
)

type JSONRPCMessage struct {
	JSONRPC string          `json:"jsonrpc"`
	ID      any             `json:"id,omitempty"`
	Method  string          `json:"method,omitempty"`
	Params  json.RawMessage `json:"params,omitempty"`
	Result  json.RawMessage `json:"result,omitempty"`
	Error   *JSONRPCError   `json:"error,omitempty"`
}

type JSONRPCError struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
}

type Session struct {
	ID       string
	messages chan JSONRPCMessage
	done     chan struct{}
	ctx      context.Context
	cancel   context.CancelFunc
}

type SSETransport struct {
	sessions   map[string]*Session
	mu         sync.RWMutex
	basePath   string
	onMessage  func(session *Session, msg JSONRPCMessage)
	heartbeat  time.Duration
}

func NewSSETransport(basePath string, heartbeat time.Duration) *SSETransport {
	return &SSETransport{
		sessions:  make(map[string]*Session),
		basePath:  basePath,
		heartbeat: heartbeat,
	}
}

func (t *SSETransport) SetMessageHandler(handler func(session *Session, msg JSONRPCMessage)) {
	t.onMessage = handler
}

func (t *SSETransport) SendToSession(sessionID string, msg JSONRPCMessage) error {
	t.mu.RLock()
	session, exists := t.sessions[sessionID]
	t.mu.RUnlock()

	if !exists {
		return fmt.Errorf("session %s not found", sessionID)
	}

	select {
	case session.messages <- msg:
		return nil
	case <-session.ctx.Done():
		return fmt.Errorf("session %s closed", sessionID)
	case <-time.After(5 * time.Second):
		return fmt.Errorf("send to session %s timed out", sessionID)
	}
}

设计要点: Session.messages是一个带缓冲的channel,充当SSE事件的发送队列。sync.RWMutex保护session map的并发读写——读操作(查询session)远多于写操作(创建/删除session),读写锁比互斥锁性能更优。

实现SSE连接处理器

SSE连接处理器是整个Transport的核心。它需要完成4件事:创建session、发送endpoint事件、持续推送消息、处理断连。

go
func (t *SSETransport) HandleSSE(w http.ResponseWriter, r *http.Request) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming not supported", http.StatusInternalServerError)
		return
	}

	ctx, cancel := context.WithCancel(r.Context())
	session := &Session{
		ID:       uuid.New().String(),
		messages: make(chan JSONRPCMessage, 64),
		done:     make(chan struct{}),
		ctx:      ctx,
		cancel:   cancel,
	}

	t.mu.Lock()
	t.sessions[session.ID] = session
	t.mu.Unlock()

	defer func() {
		t.mu.Lock()
		delete(t.sessions, session.ID)
		t.mu.Unlock()
		cancel()
		close(session.done)
		log.Printf("session %s disconnected", session.ID)
	}()

	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("X-Accel-Buffering", "no")

	endpoint := fmt.Sprintf("%s/message?sessionId=%s", t.basePath, session.ID)
	fmt.Fprintf(w, "event: endpoint\ndata: %s\n\n", endpoint)
	flusher.Flush()
	log.Printf("session %s connected, endpoint: %s", session.ID, endpoint)

	heartbeatTicker := time.NewTicker(t.heartbeat)
	defer heartbeatTicker.Stop()

	for {
		select {
		case msg := <-session.messages:
			data, err := json.Marshal(msg)
			if err != nil {
				log.Printf("marshal error for session %s: %v", session.ID, err)
				continue
			}
			fmt.Fprintf(w, "event: message\ndata: %s\n\n", data)
			flusher.Flush()

		case <-heartbeatTicker.C:
			fmt.Fprintf(w, ": heartbeat\n\n")
			flusher.Flush()

		case <-ctx.Done():
			return
		}
	}
}

3个关键实现细节:

  • X-Accel-Buffering: no — 告知Nginx等反向代理禁用响应缓冲,否则SSE事件会被攒成批次再发送,客户端感知到巨大延迟
  • 心跳机制 — 每隔固定间隔发送SSE注释行(: heartbeat\n\n),防止中间代理因空闲超时关闭连接。30秒是常用值,根据你的基础设施调整
  • defer清理 — session断连后立即从map中移除并关闭context,防止消息堆积和goroutine泄漏

实现消息接收处理器

POST /message端点接收Client发来的JSON-RPC请求,通过sessionId路由到对应session:

go
func (t *SSETransport) HandleMessage(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}

	sessionID := r.URL.Query().Get("sessionId")
	if sessionID == "" {
		http.Error(w, "missing sessionId", http.StatusBadRequest)
		return
	}

	t.mu.RLock()
	session, exists := t.sessions[sessionID]
	t.mu.RUnlock()

	if !exists {
		http.Error(w, "session not found", http.StatusNotFound)
		return
	}

	var msg JSONRPCMessage
	decoder := json.NewDecoder(r.Body)
	decoder.DisallowUnknownFields()
	if err := decoder.Decode(&msg); err != nil {
		resp := JSONRPCMessage{
			JSONRPC: "2.0",
			Error: &JSONRPCError{
				Code:    -32700,
				Message: "parse error: " + err.Error(),
			},
		}
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusBadRequest)
		json.NewEncoder(w).Encode(resp)
		return
	}

	if t.onMessage != nil {
		t.onMessage(session, msg)
	}

	w.WriteHeader(http.StatusAccepted)
}

POST handler的职责是接收和路由,不直接返回JSON-RPC响应。响应通过SSE通道异步推送——这正是MCP SSE Transport双通道架构的核心设计。Client发送POST后收到202 Accepted,随后从SSE流中读取实际响应。

如果你在调试阶段遇到JSON解析问题,推荐使用JSON格式化工具验证请求体结构是否符合JSON-RPC 2.0规范。

集成MCP Server

将SSE Transport与MCP协议逻辑组装到一起。以下代码注册了一个示例Tool,处理initializetools/listtools/call三个核心方法:

go
func main() {
	transport := NewSSETransport("", 30*time.Second)

	transport.SetMessageHandler(func(session *Session, msg JSONRPCMessage) {
		var response JSONRPCMessage
		response.JSONRPC = "2.0"
		response.ID = msg.ID

		switch msg.Method {
		case "initialize":
			result, _ := json.Marshal(map[string]any{
				"protocolVersion": "2024-11-05",
				"capabilities": map[string]any{
					"tools": map[string]any{},
				},
				"serverInfo": map[string]any{
					"name":    "go-mcp-server",
					"version": "1.0.0",
				},
			})
			response.Result = result

		case "notifications/initialized":
			return

		case "tools/list":
			result, _ := json.Marshal(map[string]any{
				"tools": []map[string]any{
					{
						"name":        "get_timestamp",
						"description": "Returns the current Unix timestamp",
						"inputSchema": map[string]any{
							"type":       "object",
							"properties": map[string]any{},
						},
					},
				},
			})
			response.Result = result

		case "tools/call":
			var params struct {
				Name string `json:"name"`
			}
			json.Unmarshal(msg.Params, &params)

			if params.Name == "get_timestamp" {
				content, _ := json.Marshal(map[string]any{
					"content": []map[string]any{
						{
							"type": "text",
							"text": fmt.Sprintf("Current timestamp: %d", time.Now().Unix()),
						},
					},
				})
				response.Result = content
			} else {
				response.Error = &JSONRPCError{
					Code:    -32601,
					Message: "tool not found: " + params.Name,
				}
			}

		default:
			response.Error = &JSONRPCError{
				Code:    -32601,
				Message: "method not found: " + msg.Method,
			}
		}

		if err := transport.SendToSession(session.ID, response); err != nil {
			log.Printf("failed to send response: %v", err)
		}
	})

	mux := http.NewServeMux()
	mux.HandleFunc("/sse", transport.HandleSSE)
	mux.HandleFunc("/message", transport.HandleMessage)

	server := &http.Server{
		Addr:         ":8080",
		Handler:      mux,
		ReadTimeout:  10 * time.Second,
		WriteTimeout: 0,
		IdleTimeout:  120 * time.Second,
	}

	log.Println("MCP SSE Server listening on :8080")
	log.Println("  SSE endpoint:     GET  /sse")
	log.Println("  Message endpoint: POST /message?sessionId=<id>")

	if err := server.ListenAndServe(); err != nil {
		log.Fatal(err)
	}
}

WriteTimeout: 0是关键配置。 SSE连接是长连接,如果设置了WriteTimeout,连接会在超时后被强制关闭。将其设为0表示不限制写超时,由心跳机制和客户端断连来控制连接生命周期。

启动Server后,用curl验证:

bash
# 终端1:建立SSE连接
curl -N http://localhost:8080/sse

# 输出:
# event: endpoint
# data: /message?sessionId=550e8400-e29b-41d4-a716-446655440000

# 终端2:发送initialize请求
curl -X POST "http://localhost:8080/message?sessionId=550e8400-e29b-41d4-a716-446655440000" \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{}}}'

# 终端1的SSE流会收到:
# event: message
# data: {"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2024-11-05","capabilities":{"tools":{}},"serverInfo":{"name":"go-mcp-server","version":"1.0.0"}}}

在开发MCP Server时,你经常需要将JSON响应结构转换为Go struct。JSON to Go在线转换工具可以将任意JSON自动生成带标签的Go结构体定义,大幅提升开发效率。

生产环境加固

从Demo到生产部署,需要关注4个维度:

优雅关闭

捕获系统信号,主动关闭所有SSE连接后再退出:

go
func (t *SSETransport) Shutdown(ctx context.Context) {
	t.mu.Lock()
	sessions := make([]*Session, 0, len(t.sessions))
	for _, s := range t.sessions {
		sessions = append(sessions, s)
	}
	t.mu.Unlock()

	for _, s := range sessions {
		s.cancel()
		select {
		case <-s.done:
		case <-ctx.Done():
			return
		}
	}
}

main函数中配合signal.NotifyContext使用:

go
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

go func() {
	if err := server.ListenAndServe(); err != http.ErrServerClosed {
		log.Fatal(err)
	}
}()

<-ctx.Done()
log.Println("shutting down...")

shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

transport.Shutdown(shutdownCtx)
server.Shutdown(shutdownCtx)

超时与限流

配置项 推荐值 说明
ReadTimeout 10s 防止慢读攻击
WriteTimeout 0 SSE长连接必须禁用
IdleTimeout 120s 空闲连接回收
心跳间隔 30s 低于多数代理的60s超时
消息channel缓冲 64 突发消息缓冲,根据负载调整
POST body限制 1MB 使用http.MaxBytesReader限制

并发安全检查清单

  • sync.RWMutex保护session map:读操作用RLock,写操作用Lock
  • channel发送使用select+超时,避免goroutine永久阻塞
  • context.Context传播取消信号,确保资源及时释放
  • 避免在持有锁时执行I/O操作(如网络调用)

错误恢复

对message handler添加panic recovery,防止单个请求的panic导致整个Server崩溃:

go
func (t *SSETransport) safeHandleMessage(session *Session, msg JSONRPCMessage) {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("panic recovered in session %s: %v", session.ID, r)
			errResp := JSONRPCMessage{
				JSONRPC: "2.0",
				ID:      msg.ID,
				Error: &JSONRPCError{
					Code:    -32603,
					Message: "internal error",
				},
			}
			transport.SendToSession(session.ID, errResp)
		}
	}()
	t.onMessage(session, msg)
}

进阶阅读

如果你对MCP协议还不熟悉,建议先阅读这两篇基础文章:

FAQ

Q: SSE连接被Nginx/CloudFlare等中间代理频繁断开怎么办? A: 确保设置了X-Accel-Buffering: no响应头(Nginx)和适当的心跳间隔。CloudFlare默认100秒超时,心跳间隔需低于此值。同时在Nginx配置中设置proxy_read_timeout 86400sproxy_buffering off

Q: 如何监控SSE Transport的运行状态? A: 暴露一个/healthz端点,返回当前活跃session数、消息吞吐量、goroutine数量(runtime.NumGoroutine())。结合Prometheus的gauge和counter指标,实现细粒度的可观测性。

Q: 单机能支撑多少SSE并发连接? A: Go的goroutine模型下,4核8GB服务器通常可支撑5万以上的SSE并发连接。瓶颈通常在操作系统文件描述符限制(需调整ulimit -n)和网络带宽,而非Go本身的并发能力。

Q: MCP SSE Transport与WebSocket Transport相比有什么优劣? A: SSE更简单——基于标准HTTP,天然兼容所有CDN和代理,客户端用EventSource API即可。WebSocket支持真正的双向通信,但需要额外的协议升级握手,部分企业防火墙可能拦截。MCP协议目前更推荐SSE作为HTTP传输方案。

Q: 如何为SSE Transport添加认证? A: 在HandleSSEHandleMessage前添加中间件,验证Authorization请求头中的Bearer Token。SSE连接建立时验证一次,后续POST请求每次都需验证。Token验证通过后将用户信息注入session上下文。

总结

Go语言实现MCP SSE Transport的核心优势在于:goroutine作为连接容器、channel作为消息管道、http.Flusher作为推送引擎——三者构成一个优雅的事件流处理模型,无需任何第三方依赖。

本文实现的约300行代码覆盖了SSE传输层的完整生命周期:连接建立→endpoint协商→消息路由→心跳保活→优雅关闭。在此基础上,你可以继续扩展认证、限流、可观测性等生产特性。

在开发MCP Server的过程中,你会频繁处理JSON-RPC消息结构和Go类型定义。推荐使用JSON to Go转换工具快速生成struct,以及JSON格式化工具调试协议消息——这两个工具能显著提升MCP开发体验。