TL;DR: Go的goroutine+channel模型天然适配SSE事件流,net/http标准库原生支持http.Flusher接口。本文从MCP协议SSE传输规范入手,用约300行Go代码从零实现一个完整的SSE Transport——涵盖连接管理、JSON-RPC路由、心跳保活与优雅关闭,最终集成为可部署的MCP Server。
Key Takeaways
- 双通道架构:MCP SSE Transport通过
GET /sse建立Server→Client推送通道,通过POST /message接收Client→Server的JSON-RPC请求,两个HTTP端点协同完成双向通信 - Go天然适配:每个SSE连接由一个goroutine维护,channel作为消息管道,
http.Flusher实现实时推送——无需任何第三方SSE库 - endpoint协商机制:SSE连接建立后,Server必须先发送一个
endpoint事件,告知Client后续POST消息的目标URL(含session ID) - 生产级加固:心跳机制防止代理超时断连,
context.Context实现优雅关闭,sync.RWMutex保障并发安全
为什么用Go实现MCP SSE Transport
MCP(Model Context Protocol)的SSE传输层本质是一个长连接事件流服务。选择Go语言实现它有3个核心优势:
goroutine的轻量并发模型完美匹配SSE场景。 每个SSE客户端连接需要一个独立的执行上下文来维持长连接、监听消息、发送心跳。Go的goroutine仅占用约4KB初始栈空间,单机轻松支撑10万级并发连接——同样的连接数在Node.js中需要精心管理事件循环,在Java中需要虚拟线程或响应式框架。
channel机制是事件流的天然抽象。 SSE的核心模式是"生产者向消费者推送事件"。Go的channel正是为这种模式设计的:消息处理器将JSON-RPC响应写入channel,SSE handler从channel读取并flush到HTTP流。零额外依赖,零回调地狱。
标准库net/http原生支持Flusher接口。 Go的http.ResponseWriter可以断言为http.Flusher,调用Flush()立即将缓冲区数据推送到客户端。不需要任何SSE框架,标准库就是最好的SSE框架。
MCP SSE Transport协议规范解析
MCP协议定义了两种传输方式:stdio和HTTP+SSE。SSE传输层的核心设计是双通道架构——一个用于Server到Client的事件推送,另一个用于Client到Server的消息发送。
data: /message?sessionId=abc123 Note left of Client: 获知POST目标地址 Client->>Server: POST /message?sessionId=abc123
Content-Type: application/json
{"jsonrpc":"2.0","method":"initialize",...} Server-->>Client: event: message
data: {"jsonrpc":"2.0","result":{...}} Client->>Server: POST /message?sessionId=abc123
{"jsonrpc":"2.0","method":"tools/list",...} Server-->>Client: event: message
data: {"jsonrpc":"2.0","result":{"tools":...}} Client->>Server: POST /message?sessionId=abc123
{"jsonrpc":"2.0","method":"tools/call",...} Server-->>Client: event: message
data: {"jsonrpc":"2.0","result":{"content":...}}
关键协议细节:
- GET /sse 建立SSE连接后,Server必须首先发送一个
event: endpoint事件,data字段携带该session对应的POST URL(包含唯一的sessionId查询参数,通常建议使用 UUID生成器 生成可靠的会话ID) - POST /message 接收Client发来的JSON-RPC 2.0消息(如
initialize、tools/list、tools/call),Server处理后将响应通过SSE通道推送回去 - SSE事件格式遵循标准规范:
event: message\ndata: {JSON}\n\n - 连接断开时Server需要清理对应session的所有资源
从零实现:核心数据结构
先定义SSE Transport的核心结构体。每个客户端连接对应一个Session,SSETransport管理所有活跃session:
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
"time"
"github.com/google/uuid"
)
type JSONRPCMessage struct {
JSONRPC string `json:"jsonrpc"`
ID any `json:"id,omitempty"`
Method string `json:"method,omitempty"`
Params json.RawMessage `json:"params,omitempty"`
Result json.RawMessage `json:"result,omitempty"`
Error *JSONRPCError `json:"error,omitempty"`
}
type JSONRPCError struct {
Code int `json:"code"`
Message string `json:"message"`
}
type Session struct {
ID string
messages chan JSONRPCMessage
done chan struct{}
ctx context.Context
cancel context.CancelFunc
}
type SSETransport struct {
sessions map[string]*Session
mu sync.RWMutex
basePath string
onMessage func(session *Session, msg JSONRPCMessage)
heartbeat time.Duration
}
func NewSSETransport(basePath string, heartbeat time.Duration) *SSETransport {
return &SSETransport{
sessions: make(map[string]*Session),
basePath: basePath,
heartbeat: heartbeat,
}
}
func (t *SSETransport) SetMessageHandler(handler func(session *Session, msg JSONRPCMessage)) {
t.onMessage = handler
}
func (t *SSETransport) SendToSession(sessionID string, msg JSONRPCMessage) error {
t.mu.RLock()
session, exists := t.sessions[sessionID]
t.mu.RUnlock()
if !exists {
return fmt.Errorf("session %s not found", sessionID)
}
select {
case session.messages <- msg:
return nil
case <-session.ctx.Done():
return fmt.Errorf("session %s closed", sessionID)
case <-time.After(5 * time.Second):
return fmt.Errorf("send to session %s timed out", sessionID)
}
}
设计要点: Session.messages是一个带缓冲的channel,充当SSE事件的发送队列。sync.RWMutex保护session map的并发读写——读操作(查询session)远多于写操作(创建/删除session),读写锁比互斥锁性能更优。
实现SSE连接处理器
SSE连接处理器是整个Transport的核心。它需要完成4件事:创建session、发送endpoint事件、持续推送消息、处理断连。
func (t *SSETransport) HandleSSE(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming not supported", http.StatusInternalServerError)
return
}
ctx, cancel := context.WithCancel(r.Context())
session := &Session{
ID: uuid.New().String(),
messages: make(chan JSONRPCMessage, 64),
done: make(chan struct{}),
ctx: ctx,
cancel: cancel,
}
t.mu.Lock()
t.sessions[session.ID] = session
t.mu.Unlock()
defer func() {
t.mu.Lock()
delete(t.sessions, session.ID)
t.mu.Unlock()
cancel()
close(session.done)
log.Printf("session %s disconnected", session.ID)
}()
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
endpoint := fmt.Sprintf("%s/message?sessionId=%s", t.basePath, session.ID)
fmt.Fprintf(w, "event: endpoint\ndata: %s\n\n", endpoint)
flusher.Flush()
log.Printf("session %s connected, endpoint: %s", session.ID, endpoint)
heartbeatTicker := time.NewTicker(t.heartbeat)
defer heartbeatTicker.Stop()
for {
select {
case msg := <-session.messages:
data, err := json.Marshal(msg)
if err != nil {
log.Printf("marshal error for session %s: %v", session.ID, err)
continue
}
fmt.Fprintf(w, "event: message\ndata: %s\n\n", data)
flusher.Flush()
case <-heartbeatTicker.C:
fmt.Fprintf(w, ": heartbeat\n\n")
flusher.Flush()
case <-ctx.Done():
return
}
}
}
3个关键实现细节:
- X-Accel-Buffering: no — 告知Nginx等反向代理禁用响应缓冲,否则SSE事件会被攒成批次再发送,客户端感知到巨大延迟
- 心跳机制 — 每隔固定间隔发送SSE注释行(
: heartbeat\n\n),防止中间代理因空闲超时关闭连接。30秒是常用值,根据你的基础设施调整 - defer清理 — session断连后立即从map中移除并关闭context,防止消息堆积和goroutine泄漏
实现消息接收处理器
POST /message端点接收Client发来的JSON-RPC请求,通过sessionId路由到对应session:
func (t *SSETransport) HandleMessage(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
sessionID := r.URL.Query().Get("sessionId")
if sessionID == "" {
http.Error(w, "missing sessionId", http.StatusBadRequest)
return
}
t.mu.RLock()
session, exists := t.sessions[sessionID]
t.mu.RUnlock()
if !exists {
http.Error(w, "session not found", http.StatusNotFound)
return
}
var msg JSONRPCMessage
decoder := json.NewDecoder(r.Body)
decoder.DisallowUnknownFields()
if err := decoder.Decode(&msg); err != nil {
resp := JSONRPCMessage{
JSONRPC: "2.0",
Error: &JSONRPCError{
Code: -32700,
Message: "parse error: " + err.Error(),
},
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(resp)
return
}
if t.onMessage != nil {
t.onMessage(session, msg)
}
w.WriteHeader(http.StatusAccepted)
}
POST handler的职责是接收和路由,不直接返回JSON-RPC响应。响应通过SSE通道异步推送——这正是MCP SSE Transport双通道架构的核心设计。Client发送POST后收到202 Accepted,随后从SSE流中读取实际响应。
如果你在调试阶段遇到JSON解析问题,推荐使用JSON格式化工具验证请求体结构是否符合JSON-RPC 2.0规范。
集成MCP Server
将SSE Transport与MCP协议逻辑组装到一起。以下代码注册了一个示例Tool,处理initialize、tools/list和tools/call三个核心方法:
func main() {
transport := NewSSETransport("", 30*time.Second)
transport.SetMessageHandler(func(session *Session, msg JSONRPCMessage) {
var response JSONRPCMessage
response.JSONRPC = "2.0"
response.ID = msg.ID
switch msg.Method {
case "initialize":
result, _ := json.Marshal(map[string]any{
"protocolVersion": "2024-11-05",
"capabilities": map[string]any{
"tools": map[string]any{},
},
"serverInfo": map[string]any{
"name": "go-mcp-server",
"version": "1.0.0",
},
})
response.Result = result
case "notifications/initialized":
return
case "tools/list":
result, _ := json.Marshal(map[string]any{
"tools": []map[string]any{
{
"name": "get_timestamp",
"description": "Returns the current Unix timestamp",
"inputSchema": map[string]any{
"type": "object",
"properties": map[string]any{},
},
},
},
})
response.Result = result
case "tools/call":
var params struct {
Name string `json:"name"`
}
json.Unmarshal(msg.Params, ¶ms)
if params.Name == "get_timestamp" {
content, _ := json.Marshal(map[string]any{
"content": []map[string]any{
{
"type": "text",
"text": fmt.Sprintf("Current timestamp: %d", time.Now().Unix()),
},
},
})
response.Result = content
} else {
response.Error = &JSONRPCError{
Code: -32601,
Message: "tool not found: " + params.Name,
}
}
default:
response.Error = &JSONRPCError{
Code: -32601,
Message: "method not found: " + msg.Method,
}
}
if err := transport.SendToSession(session.ID, response); err != nil {
log.Printf("failed to send response: %v", err)
}
})
mux := http.NewServeMux()
mux.HandleFunc("/sse", transport.HandleSSE)
mux.HandleFunc("/message", transport.HandleMessage)
server := &http.Server{
Addr: ":8080",
Handler: mux,
ReadTimeout: 10 * time.Second,
WriteTimeout: 0,
IdleTimeout: 120 * time.Second,
}
log.Println("MCP SSE Server listening on :8080")
log.Println(" SSE endpoint: GET /sse")
log.Println(" Message endpoint: POST /message?sessionId=<id>")
if err := server.ListenAndServe(); err != nil {
log.Fatal(err)
}
}
WriteTimeout: 0是关键配置。 SSE连接是长连接,如果设置了WriteTimeout,连接会在超时后被强制关闭。将其设为0表示不限制写超时,由心跳机制和客户端断连来控制连接生命周期。
启动Server后,用curl验证:
# 终端1:建立SSE连接
curl -N http://localhost:8080/sse
# 输出:
# event: endpoint
# data: /message?sessionId=550e8400-e29b-41d4-a716-446655440000
# 终端2:发送initialize请求
curl -X POST "http://localhost:8080/message?sessionId=550e8400-e29b-41d4-a716-446655440000" \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{}}}'
# 终端1的SSE流会收到:
# event: message
# data: {"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2024-11-05","capabilities":{"tools":{}},"serverInfo":{"name":"go-mcp-server","version":"1.0.0"}}}
在开发MCP Server时,你经常需要将JSON响应结构转换为Go struct。JSON to Go在线转换工具可以将任意JSON自动生成带标签的Go结构体定义,大幅提升开发效率。
生产环境加固
从Demo到生产部署,需要关注4个维度:
优雅关闭
捕获系统信号,主动关闭所有SSE连接后再退出:
func (t *SSETransport) Shutdown(ctx context.Context) {
t.mu.Lock()
sessions := make([]*Session, 0, len(t.sessions))
for _, s := range t.sessions {
sessions = append(sessions, s)
}
t.mu.Unlock()
for _, s := range sessions {
s.cancel()
select {
case <-s.done:
case <-ctx.Done():
return
}
}
}
在main函数中配合signal.NotifyContext使用:
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
go func() {
if err := server.ListenAndServe(); err != http.ErrServerClosed {
log.Fatal(err)
}
}()
<-ctx.Done()
log.Println("shutting down...")
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
transport.Shutdown(shutdownCtx)
server.Shutdown(shutdownCtx)
超时与限流
| 配置项 | 推荐值 | 说明 |
|---|---|---|
| ReadTimeout | 10s | 防止慢读攻击 |
| WriteTimeout | 0 | SSE长连接必须禁用 |
| IdleTimeout | 120s | 空闲连接回收 |
| 心跳间隔 | 30s | 低于多数代理的60s超时 |
| 消息channel缓冲 | 64 | 突发消息缓冲,根据负载调整 |
| POST body限制 | 1MB | 使用http.MaxBytesReader限制 |
并发安全检查清单
sync.RWMutex保护session map:读操作用RLock,写操作用Lock- channel发送使用
select+超时,避免goroutine永久阻塞 context.Context传播取消信号,确保资源及时释放- 避免在持有锁时执行I/O操作(如网络调用)
错误恢复
对message handler添加panic recovery,防止单个请求的panic导致整个Server崩溃:
func (t *SSETransport) safeHandleMessage(session *Session, msg JSONRPCMessage) {
defer func() {
if r := recover(); r != nil {
log.Printf("panic recovered in session %s: %v", session.ID, r)
errResp := JSONRPCMessage{
JSONRPC: "2.0",
ID: msg.ID,
Error: &JSONRPCError{
Code: -32603,
Message: "internal error",
},
}
transport.SendToSession(session.ID, errResp)
}
}()
t.onMessage(session, msg)
}
进阶阅读
如果你对MCP协议还不熟悉,建议先阅读这两篇基础文章:
- MCP协议深度解析:构建AI应用的新范式 — 从零理解MCP的三大原语(Tools/Resources/Prompts)和JSON-RPC通信机制
- MCP协议高阶实战:构建企业级带认证的流式Server — 在Node.js中集成JWT鉴权、流式数据传输和多租户隔离
FAQ
Q: SSE连接被Nginx/CloudFlare等中间代理频繁断开怎么办?
A: 确保设置了X-Accel-Buffering: no响应头(Nginx)和适当的心跳间隔。CloudFlare默认100秒超时,心跳间隔需低于此值。同时在Nginx配置中设置proxy_read_timeout 86400s和proxy_buffering off。
Q: 如何监控SSE Transport的运行状态?
A: 暴露一个/healthz端点,返回当前活跃session数、消息吞吐量、goroutine数量(runtime.NumGoroutine())。结合Prometheus的gauge和counter指标,实现细粒度的可观测性。
Q: 单机能支撑多少SSE并发连接?
A: Go的goroutine模型下,4核8GB服务器通常可支撑5万以上的SSE并发连接。瓶颈通常在操作系统文件描述符限制(需调整ulimit -n)和网络带宽,而非Go本身的并发能力。
Q: MCP SSE Transport与WebSocket Transport相比有什么优劣?
A: SSE更简单——基于标准HTTP,天然兼容所有CDN和代理,客户端用EventSource API即可。WebSocket支持真正的双向通信,但需要额外的协议升级握手,部分企业防火墙可能拦截。MCP协议目前更推荐SSE作为HTTP传输方案。
Q: 如何为SSE Transport添加认证?
A: 在HandleSSE和HandleMessage前添加中间件,验证Authorization请求头中的Bearer Token。SSE连接建立时验证一次,后续POST请求每次都需验证。Token验证通过后将用户信息注入session上下文。
总结
Go语言实现MCP SSE Transport的核心优势在于:goroutine作为连接容器、channel作为消息管道、http.Flusher作为推送引擎——三者构成一个优雅的事件流处理模型,无需任何第三方依赖。
本文实现的约300行代码覆盖了SSE传输层的完整生命周期:连接建立→endpoint协商→消息路由→心跳保活→优雅关闭。在此基础上,你可以继续扩展认证、限流、可观测性等生产特性。
在开发MCP Server的过程中,你会频繁处理JSON-RPC消息结构和Go类型定义。推荐使用JSON to Go转换工具快速生成struct,以及JSON格式化工具调试协议消息——这两个工具能显著提升MCP开发体验。