TL;DR: Go's goroutine + channel model is a natural fit for SSE event streams, and the standard library's net/http natively supports the http.Flusher interface. This article walks through the MCP protocol's SSE transport specification and implements a complete SSE Transport in ~300 lines of Go — covering connection management, JSON-RPC routing, heartbeat keep-alive, and graceful shutdown, culminating in a deployable MCP Server.
Key Takeaways
- Dual-channel architecture: MCP SSE Transport uses `GET /sse` for server-to-client event pushing and `POST /message` for client-to-server JSON-RPC requests, two HTTP endpoints working in concert for bidirectional communication
- Go's natural fit: each SSE connection lives in its own goroutine, channels serve as message pipelines, and `http.Flusher` enables real-time pushing, with no third-party SSE libraries needed
- Endpoint negotiation: after establishing the SSE connection, the server must send an `endpoint` event telling the client which URL to POST messages to (including a unique session ID)
- Production hardening: heartbeats prevent proxy timeouts, `context.Context` enables graceful shutdown, and `sync.RWMutex` ensures concurrent safety
Why Go for MCP SSE Transport
The SSE transport layer of MCP (Model Context Protocol) is fundamentally a long-lived connection event streaming service. Go offers three compelling advantages for this use case:
Goroutines are lightweight concurrency primitives that perfectly match SSE. Each SSE client connection needs a dedicated execution context to maintain the long-lived connection, listen for messages, and send heartbeats. A Go goroutine starts with only about 2 KB of stack, allowing a single machine to comfortably handle 100,000+ concurrent connections. The same connection count in Node.js requires careful event loop management; in Java, it demands virtual threads or reactive frameworks.
Channels are a natural abstraction for event streams. SSE's core pattern is "producer pushes events to consumer." Go channels are designed precisely for this: message handlers write JSON-RPC responses to a channel, and the SSE handler reads from it and flushes to the HTTP stream. Zero external dependencies, zero callback hell.
The standard library's net/http natively supports the Flusher interface. Go's http.ResponseWriter can be type-asserted to http.Flusher, and calling Flush() immediately pushes buffered data to the client. No SSE framework needed — the standard library is the best SSE framework.
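As a minimal sketch of that standard-library flow (the helper name `formatSSE` is my own, not part of any API), a streaming handler boils down to a type assertion plus a `Flush()` call:

```go
package main

import (
	"fmt"
	"net/http"
)

// formatSSE renders one Server-Sent Events frame: an event name line,
// a data line, and the blank line that terminates the frame.
func formatSSE(event, data string) string {
	return fmt.Sprintf("event: %s\ndata: %s\n\n", event, data)
}

func streamHandler(w http.ResponseWriter, r *http.Request) {
	// Not every ResponseWriter can stream; the type assertion checks.
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "text/event-stream")
	fmt.Fprint(w, formatSSE("message", `{"hello":"mcp"}`))
	flusher.Flush() // push buffered bytes to the client immediately
}

func main() {
	http.HandleFunc("/sse", streamHandler)
	// http.ListenAndServe(":8080", nil) // left commented in this sketch
}
```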
MCP SSE Transport Protocol Deep Dive
The MCP protocol defines two transport mechanisms: stdio and HTTP+SSE. The SSE transport layer's core design is a dual-channel architecture — one channel for server-to-client event pushing, another for client-to-server message sending.
Client --> Server: GET /sse (opens the event stream)
Server --> Client: event: endpoint
                   data: /message?sessionId=abc123 (client learns the POST target URL)
Client --> Server: POST /message?sessionId=abc123
                   Content-Type: application/json
                   {"jsonrpc":"2.0","method":"initialize",...}
Server --> Client: event: message
                   data: {"jsonrpc":"2.0","result":{...}}
Client --> Server: POST /message?sessionId=abc123
                   {"jsonrpc":"2.0","method":"tools/list",...}
Server --> Client: event: message
                   data: {"jsonrpc":"2.0","result":{"tools":...}}
Client --> Server: POST /message?sessionId=abc123
                   {"jsonrpc":"2.0","method":"tools/call",...}
Server --> Client: event: message
                   data: {"jsonrpc":"2.0","result":{"content":...}}
Key protocol details:
- `GET /sse`: after establishing the SSE connection, the server must first send an `event: endpoint` event. The `data` field carries the POST URL for this session (including a unique `sessionId` query parameter, typically generated using a reliable UUID Generator)
- `POST /message`: receives JSON-RPC 2.0 messages from the client (e.g., `initialize`, `tools/list`, `tools/call`). The server processes them and pushes responses back through the SSE channel
- SSE event formatting follows the standard spec: `event: message\ndata: {JSON}\n\n`
- When the connection drops, the server must clean up all resources associated with that session
Building the Core Data Structures
Start by defining the core structs for the SSE Transport. Each client connection maps to a Session, and SSETransport manages all active sessions:
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
"time"
"github.com/google/uuid"
)
type JSONRPCMessage struct {
JSONRPC string `json:"jsonrpc"`
ID any `json:"id,omitempty"`
Method string `json:"method,omitempty"`
Params json.RawMessage `json:"params,omitempty"`
Result json.RawMessage `json:"result,omitempty"`
Error *JSONRPCError `json:"error,omitempty"`
}
type JSONRPCError struct {
Code int `json:"code"`
Message string `json:"message"`
}
type Session struct {
ID string
messages chan JSONRPCMessage
done chan struct{}
ctx context.Context
cancel context.CancelFunc
}
type SSETransport struct {
sessions map[string]*Session
mu sync.RWMutex
basePath string
onMessage func(session *Session, msg JSONRPCMessage)
heartbeat time.Duration
}
func NewSSETransport(basePath string, heartbeat time.Duration) *SSETransport {
return &SSETransport{
sessions: make(map[string]*Session),
basePath: basePath,
heartbeat: heartbeat,
}
}
func (t *SSETransport) SetMessageHandler(handler func(session *Session, msg JSONRPCMessage)) {
t.onMessage = handler
}
func (t *SSETransport) SendToSession(sessionID string, msg JSONRPCMessage) error {
t.mu.RLock()
session, exists := t.sessions[sessionID]
t.mu.RUnlock()
if !exists {
return fmt.Errorf("session %s not found", sessionID)
}
select {
case session.messages <- msg:
return nil
case <-session.ctx.Done():
return fmt.Errorf("session %s closed", sessionID)
case <-time.After(5 * time.Second):
return fmt.Errorf("send to session %s timed out", sessionID)
}
}
Design notes: Session.messages is a buffered channel that acts as the SSE event send queue. sync.RWMutex protects concurrent read/write access to the session map — reads (session lookups) vastly outnumber writes (session creation/deletion), making an RWMutex more performant than a regular Mutex.
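A design note on that send path: if you would rather drop an event to a slow consumer than block the producer for up to five seconds, the `select` can use a `default` branch instead of a timeout. A sketch of that alternative policy (the `trySend` helper is hypothetical, not part of the transport above):

```go
package main

import "fmt"

// trySend attempts a non-blocking send: if the session's buffer is full,
// the event is dropped and the caller is told, instead of blocking.
func trySend(ch chan string, msg string) bool {
	select {
	case ch <- msg:
		return true
	default: // buffer full: drop rather than stall the producer
		return false
	}
}

func main() {
	ch := make(chan string, 1)
	fmt.Println(trySend(ch, "a")) // prints: true  (buffer has room)
	fmt.Println(trySend(ch, "b")) // prints: false (buffer full, dropped)
}
```

The trade-off: dropping a JSON-RPC response silently breaks the request/reply contract, so the timeout-based send above is the safer default for MCP; a drop policy fits better for best-effort notifications.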
Implementing the SSE Connection Handler
The SSE connection handler is the heart of the transport. It performs four tasks: create a session, send the endpoint event, continuously push messages, and handle disconnection.
func (t *SSETransport) HandleSSE(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming not supported", http.StatusInternalServerError)
return
}
ctx, cancel := context.WithCancel(r.Context())
session := &Session{
ID: uuid.New().String(),
messages: make(chan JSONRPCMessage, 64),
done: make(chan struct{}),
ctx: ctx,
cancel: cancel,
}
t.mu.Lock()
t.sessions[session.ID] = session
t.mu.Unlock()
defer func() {
t.mu.Lock()
delete(t.sessions, session.ID)
t.mu.Unlock()
cancel()
close(session.done)
log.Printf("session %s disconnected", session.ID)
}()
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
endpoint := fmt.Sprintf("%s/message?sessionId=%s", t.basePath, session.ID)
fmt.Fprintf(w, "event: endpoint\ndata: %s\n\n", endpoint)
flusher.Flush()
log.Printf("session %s connected, endpoint: %s", session.ID, endpoint)
heartbeatTicker := time.NewTicker(t.heartbeat)
defer heartbeatTicker.Stop()
for {
select {
case msg := <-session.messages:
data, err := json.Marshal(msg)
if err != nil {
log.Printf("marshal error for session %s: %v", session.ID, err)
continue
}
fmt.Fprintf(w, "event: message\ndata: %s\n\n", data)
flusher.Flush()
case <-heartbeatTicker.C:
fmt.Fprint(w, ": heartbeat\n\n")
flusher.Flush()
case <-ctx.Done():
return
}
}
}
Three critical implementation details:
- `X-Accel-Buffering: no`: tells Nginx and similar reverse proxies to disable response buffering. Without this, SSE events get batched before delivery, causing severe client-side latency
- Heartbeat mechanism: sends an SSE comment line (`: heartbeat\n\n`) at regular intervals to prevent intermediary proxies from closing the connection due to idle timeouts. 30 seconds is a common value; adjust based on your infrastructure
- Deferred cleanup: immediately removes the session from the map and cancels the context on disconnect, preventing message accumulation and goroutine leaks
Implementing the Message Handler
The POST /message endpoint receives JSON-RPC requests from the client and routes them to the appropriate session via sessionId:
func (t *SSETransport) HandleMessage(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
sessionID := r.URL.Query().Get("sessionId")
if sessionID == "" {
http.Error(w, "missing sessionId", http.StatusBadRequest)
return
}
t.mu.RLock()
session, exists := t.sessions[sessionID]
t.mu.RUnlock()
if !exists {
http.Error(w, "session not found", http.StatusNotFound)
return
}
var msg JSONRPCMessage
decoder := json.NewDecoder(r.Body)
decoder.DisallowUnknownFields()
if err := decoder.Decode(&msg); err != nil {
resp := JSONRPCMessage{
JSONRPC: "2.0",
Error: &JSONRPCError{
Code: -32700,
Message: "parse error: " + err.Error(),
},
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(resp)
return
}
if t.onMessage != nil {
t.onMessage(session, msg)
}
w.WriteHeader(http.StatusAccepted)
}
The POST handler's responsibility is receiving and routing — it does not return JSON-RPC responses directly. Responses are pushed asynchronously through the SSE channel. This is the fundamental design of MCP's SSE Transport dual-channel architecture: the client sends a POST and receives 202 Accepted, then reads the actual response from the SSE stream.
If you encounter JSON parsing issues during debugging, use the JSON Formatter tool to validate that the request body conforms to the JSON-RPC 2.0 specification.
Integrating the MCP Server
Assemble the SSE Transport with MCP protocol logic. The following code registers a sample Tool and handles the three core methods — initialize, tools/list, and tools/call:
func main() {
transport := NewSSETransport("", 30*time.Second)
transport.SetMessageHandler(func(session *Session, msg JSONRPCMessage) {
var response JSONRPCMessage
response.JSONRPC = "2.0"
response.ID = msg.ID
switch msg.Method {
case "initialize":
result, _ := json.Marshal(map[string]any{
"protocolVersion": "2024-11-05",
"capabilities": map[string]any{
"tools": map[string]any{},
},
"serverInfo": map[string]any{
"name": "go-mcp-server",
"version": "1.0.0",
},
})
response.Result = result
case "notifications/initialized":
return
case "tools/list":
result, _ := json.Marshal(map[string]any{
"tools": []map[string]any{
{
"name": "get_timestamp",
"description": "Returns the current Unix timestamp",
"inputSchema": map[string]any{
"type": "object",
"properties": map[string]any{},
},
},
},
})
response.Result = result
case "tools/call":
var params struct {
Name string `json:"name"`
}
if err := json.Unmarshal(msg.Params, &params); err != nil {
	response.Error = &JSONRPCError{Code: -32602, Message: "invalid params: " + err.Error()}
	break
}
if params.Name == "get_timestamp" {
content, _ := json.Marshal(map[string]any{
"content": []map[string]any{
{
"type": "text",
"text": fmt.Sprintf("Current timestamp: %d", time.Now().Unix()),
},
},
})
response.Result = content
} else {
response.Error = &JSONRPCError{
Code: -32601,
Message: "tool not found: " + params.Name,
}
}
default:
response.Error = &JSONRPCError{
Code: -32601,
Message: "method not found: " + msg.Method,
}
}
if err := transport.SendToSession(session.ID, response); err != nil {
log.Printf("failed to send response: %v", err)
}
})
mux := http.NewServeMux()
mux.HandleFunc("/sse", transport.HandleSSE)
mux.HandleFunc("/message", transport.HandleMessage)
server := &http.Server{
Addr: ":8080",
Handler: mux,
ReadTimeout: 10 * time.Second,
WriteTimeout: 0,
IdleTimeout: 120 * time.Second,
}
log.Println("MCP SSE Server listening on :8080")
log.Println(" SSE endpoint: GET /sse")
log.Println(" Message endpoint: POST /message?sessionId=<id>")
if err := server.ListenAndServe(); err != nil {
log.Fatal(err)
}
}
WriteTimeout: 0 is a critical setting. SSE connections are long-lived — if WriteTimeout is set, the connection gets forcibly closed after the timeout. Setting it to 0 disables write timeout, letting heartbeats and client disconnection control the connection lifecycle.
After starting the server, verify with curl:
# Terminal 1: Establish SSE connection
curl -N http://localhost:8080/sse
# Output:
# event: endpoint
# data: /message?sessionId=550e8400-e29b-41d4-a716-446655440000
# Terminal 2: Send initialize request
curl -X POST "http://localhost:8080/message?sessionId=550e8400-e29b-41d4-a716-446655440000" \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{}}}'
# Terminal 1's SSE stream receives:
# event: message
# data: {"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2024-11-05","capabilities":{"tools":{}},"serverInfo":{"name":"go-mcp-server","version":"1.0.0"}}}
When developing MCP Servers, you frequently need to convert JSON response structures into Go structs. The JSON to Go online converter automatically generates Go struct definitions with proper tags from any JSON input, dramatically boosting development velocity.
Production Hardening
Moving from demo to production deployment requires attention to four dimensions:
Graceful Shutdown
Capture system signals and proactively close all SSE connections before exiting:
func (t *SSETransport) Shutdown(ctx context.Context) {
t.mu.Lock()
sessions := make([]*Session, 0, len(t.sessions))
for _, s := range t.sessions {
sessions = append(sessions, s)
}
t.mu.Unlock()
for _, s := range sessions {
s.cancel()
select {
case <-s.done:
case <-ctx.Done():
return
}
}
}
Use this with signal.NotifyContext in your main function:
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
go func() {
if err := server.ListenAndServe(); err != http.ErrServerClosed {
log.Fatal(err)
}
}()
<-ctx.Done()
log.Println("shutting down...")
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
transport.Shutdown(shutdownCtx)
server.Shutdown(shutdownCtx)
Timeouts and Rate Limiting
| Setting | Recommended Value | Notes |
|---|---|---|
| ReadTimeout | 10s | Prevents slow-read attacks |
| WriteTimeout | 0 | Must be disabled for SSE long connections |
| IdleTimeout | 120s | Reclaims idle connections |
| Heartbeat interval | 30s | Below most proxies' 60s timeout |
| Message channel buffer | 64 | Burst message buffer; tune to your load |
| POST body limit | 1MB | Enforce with http.MaxBytesReader |
Concurrency Safety Checklist
- Protect the session map with `sync.RWMutex`: `RLock` for reads, `Lock` for writes
- Use `select` with timeout for channel sends to prevent goroutines from blocking indefinitely
- Propagate cancellation signals via `context.Context` to ensure timely resource cleanup
- Avoid performing I/O operations (e.g., network calls) while holding a lock
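The first and last checklist items combine into one pattern: snapshot the map under the read lock, then do all channel sends after releasing it. A simplified sketch (the `registry` type is a stand-in for `SSETransport`, not code from this article):

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a minimal stand-in for SSETransport's session map.
type registry struct {
	mu       sync.RWMutex
	sessions map[string]chan string
}

// Broadcast sends msg to every session without holding the lock during
// channel I/O: snapshot under RLock, send after RUnlock.
func (r *registry) Broadcast(msg string) int {
	r.mu.RLock()
	targets := make([]chan string, 0, len(r.sessions))
	for _, ch := range r.sessions {
		targets = append(targets, ch)
	}
	r.mu.RUnlock() // lock released before any send

	sent := 0
	for _, ch := range targets {
		select {
		case ch <- msg: // non-blocking: skip sessions with full buffers
			sent++
		default:
		}
	}
	return sent
}

func main() {
	r := &registry{sessions: map[string]chan string{"a": make(chan string, 1)}}
	fmt.Println(r.Broadcast("hi")) // prints: 1
}
```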
Panic Recovery
Add panic recovery to the message handler to prevent a single request's panic from crashing the entire server:
func (t *SSETransport) safeHandleMessage(session *Session, msg JSONRPCMessage) {
defer func() {
if r := recover(); r != nil {
log.Printf("panic recovered in session %s: %v", session.ID, r)
errResp := JSONRPCMessage{
JSONRPC: "2.0",
ID: msg.ID,
Error: &JSONRPCError{
Code: -32603,
Message: "internal error",
},
}
if err := t.SendToSession(session.ID, errResp); err != nil {
	log.Printf("failed to send error response for session %s: %v", session.ID, err)
}
}
}()
t.onMessage(session, msg)
}
Further Reading
If you're new to the MCP protocol, these two foundational articles are recommended reading:
- MCP Protocol Deep Dive: A New Paradigm for AI Applications — Understand MCP's three primitives (Tools/Resources/Prompts) and JSON-RPC communication from the ground up
- Advanced MCP Practice: Building Enterprise-Grade Streaming Servers with Authentication — Integrate JWT authentication, streaming data transfer, and multi-tenant isolation in Node.js
FAQ
Q: SSE connections keep getting dropped by Nginx/Cloudflare. What should I do?
A: Ensure you set the X-Accel-Buffering: no response header (for Nginx) and use an appropriate heartbeat interval. Cloudflare's default proxied-connection timeout is 100 seconds, so your heartbeat interval must be shorter. Also configure proxy_read_timeout 86400s and proxy_buffering off in your Nginx config.
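For Nginx specifically, those directives combine into a location block like this (a sketch: the upstream name is a placeholder, and the timeout should be tuned to your environment):

```nginx
location /sse {
    proxy_pass http://mcp_backend;   # placeholder upstream
    proxy_http_version 1.1;
    proxy_set_header Connection "";
    proxy_buffering off;             # deliver events immediately
    proxy_cache off;
    proxy_read_timeout 86400s;       # don't kill long-lived streams as idle
}
```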
Q: How do I monitor the SSE Transport's runtime state?
A: Expose a /healthz endpoint that returns active session count, message throughput, and goroutine count (runtime.NumGoroutine()). Combine with Prometheus gauge and counter metrics for fine-grained observability.
Q: How many concurrent SSE connections can a single machine handle?
A: With Go's goroutine model, a 4-core 8GB server can typically sustain 50,000+ concurrent SSE connections. The bottleneck is usually the OS file descriptor limit (adjust with ulimit -n) and network bandwidth, not Go's concurrency capability itself.
Q: How does MCP SSE Transport compare to WebSocket Transport?
A: SSE is simpler: it runs over standard HTTP, works with most CDNs and proxies out of the box, and clients only need the EventSource API. WebSocket supports true bidirectional communication but requires a protocol upgrade handshake, and some enterprise firewalls may block it. MCP adopted SSE for its HTTP transport precisely because of this operational simplicity.
Q: How do I add authentication to the SSE Transport?
A: Add middleware before HandleSSE and HandleMessage to validate the Bearer Token in the Authorization header. Authenticate once when the SSE connection is established, and validate on every subsequent POST request. After token validation, inject user information into the session context.
Summary
Go's core advantage for implementing MCP SSE Transport lies in three primitives: goroutines as connection containers, channels as message pipelines, and http.Flusher as the push engine. Together they form an elegant event stream processing model with zero third-party dependencies.
The ~300 lines of code in this article cover the complete SSE transport lifecycle: connection establishment → endpoint negotiation → message routing → heartbeat keep-alive → graceful shutdown. From this foundation, you can extend with authentication, rate limiting, observability, and other production features.
Throughout MCP Server development, you'll constantly work with JSON-RPC message structures and Go type definitions. Use the JSON to Go converter to rapidly generate structs, and the JSON Formatter to debug protocol messages — these two tools significantly improve the MCP development experience.