TL;DR: 当你的 AI 应用需要同时对接数十个 MCP Server 时,直接让每个 LLM Client 维护独立连接会导致连接爆炸、运维失控。MCP Gateway 作为统一接入层,承担协议转换、连接复用、负载均衡、安全管控和可观测性等职责。本文以 Go 语言为实现基础,从零构建一个支撑万级并发的生产级 MCP Gateway,并提供完整的架构图解和可运行代码。
Key Takeaways
- MCP Gateway 是多 MCP Server 场景下不可或缺的架构层,解决连接管理、安全边界和可观测性三大核心问题
- SSE 长连接池 + 心跳检测是 Gateway 稳定性的基石,Go 的 goroutine 模型天然适配高并发场景
- 基于 Tool 名称的智能路由结合一致性哈希,可以在不停机的情况下动态扩缩 MCP Server 实例
- 令牌桶 + 信号量双层限流构成背压机制,防止下游 MCP Server 被流量洪峰击穿
- 熔断器模式(Circuit Breaker)是生产环境必备的容错手段,需要结合重试策略和优雅降级
为什么需要 MCP Gateway
在早期的 MCP 架构中,每个 AI 应用(Claude Desktop、Cursor、自定义 Agent)直接与 MCP Server 建立一对一连接。当系统规模扩大时,这种模式暴露出四个致命问题:
连接管理失控:假设有 N 个 Client 和 M 个 MCP Server,全连接模式需要维护 N×M 条 SSE 长连接。当 N=100、M=20 时,系统中同时存在 2000 条长连接,每条连接都消耗服务端的文件描述符和内存。
安全边界模糊:每个 MCP Server 都需要独立实现认证鉴权逻辑(如 JWT 校验),导致安全策略分散、难以统一管控。一旦某个 Server 的鉴权实现存在漏洞,整个系统的安全防线就会被突破。
可观测性盲区:分散的 MCP Server 各自输出日志和指标,缺乏统一的请求链路追踪。当 Tool 调用出现延迟或错误时,排查问题需要登录多台服务器交叉比对。
协议适配碎片化:不同版本的 MCP Server 可能支持不同的 Transport(stdio、SSE、Streamable HTTP),Client 需要为每种 Transport 编写适配代码。
MCP Gateway 通过在 Client 和 Server 之间引入统一的代理层,将上述 N×M 的复杂度收敛为 N×1 + 1×M,同时集中处理安全、监控和协议转换。
MCP Gateway 核心架构
下面的架构图展示了 MCP Gateway 的整体设计。Client 统一通过 Gateway 接入,Gateway 内部按功能划分为接入层、路由层、连接池层和可观测层:
一个完整的请求流经 Gateway 的时序如下:
JSON-RPC tools/call GW->>Auth: Validate JWT Token Auth-->>GW: User Context GW->>Router: Route by tool name Router->>CB: Check circuit state alt Circuit Open CB-->>GW: Reject |"fallback response"| GW-->>Client: Error |"service unavailable"| else Circuit Closed/Half-Open CB->>Pool: Acquire connection Pool->>Server: Forward JSON-RPC Server-->>Pool: Tool result Pool-->>CB: Report success CB-->>GW: Response GW-->>Client: SSE event |"tool result"| end
连接管理与复用
MCP Gateway 的核心挑战在于管理与下游 MCP Server 之间的 SSE 长连接。每条 SSE 连接都是有状态的——它绑定了特定的 Session 和 Capabilities 协商结果。直接为每个 Client 请求创建新连接会导致严重的资源浪费。
连接池的设计目标是:复用已建立的 SSE 连接,同时保证连接的健康状态。以下是基于 Go 实现的连接池核心结构:
package gateway
import (
"context"
"fmt"
"net/http"
"sync"
"time"
)
type ConnState int
const (
ConnIdle ConnState = iota
ConnActive
ConnDraining
)
type SSEConn struct {
ID string
ServerURL string
State ConnState
CreatedAt time.Time
LastUsed time.Time
mu sync.Mutex
client *http.Client
eventCh chan []byte
closeCh chan struct{}
}
type ConnPool struct {
mu sync.RWMutex
conns map[string][]*SSEConn // serverURL -> connections
maxPerHost int
idleTimeout time.Duration
maxLifetime time.Duration
}
func NewConnPool(maxPerHost int, idleTimeout, maxLifetime time.Duration) *ConnPool {
pool := &ConnPool{
conns: make(map[string][]*SSEConn),
maxPerHost: maxPerHost,
idleTimeout: idleTimeout,
maxLifetime: maxLifetime,
}
go pool.evictLoop()
return pool
}
func (p *ConnPool) Acquire(ctx context.Context, serverURL string) (*SSEConn, error) {
p.mu.Lock()
defer p.mu.Unlock()
conns := p.conns[serverURL]
for _, conn := range conns {
conn.mu.Lock()
if conn.State == ConnIdle && time.Since(conn.CreatedAt) < p.maxLifetime {
conn.State = ConnActive
conn.LastUsed = time.Now()
conn.mu.Unlock()
return conn, nil
}
conn.mu.Unlock()
}
if len(conns) >= p.maxPerHost {
return nil, fmt.Errorf("connection pool exhausted for %s", serverURL)
}
conn, err := p.dial(ctx, serverURL)
if err != nil {
return nil, err
}
conn.State = ConnActive
p.conns[serverURL] = append(p.conns[serverURL], conn)
return conn, nil
}
func (p *ConnPool) Release(conn *SSEConn) {
conn.mu.Lock()
defer conn.mu.Unlock()
conn.State = ConnIdle
conn.LastUsed = time.Now()
}
func (p *ConnPool) evictLoop() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
p.mu.Lock()
for url, conns := range p.conns {
alive := conns[:0]
for _, conn := range conns {
conn.mu.Lock()
expired := conn.State == ConnIdle &&
(time.Since(conn.LastUsed) > p.idleTimeout ||
time.Since(conn.CreatedAt) > p.maxLifetime)
if expired {
close(conn.closeCh)
conn.mu.Unlock()
continue
}
conn.mu.Unlock()
alive = append(alive, conn)
}
p.conns[url] = alive
}
p.mu.Unlock()
}
}
func (p *ConnPool) dial(ctx context.Context, serverURL string) (*SSEConn, error) {
conn := &SSEConn{
ID: fmt.Sprintf("conn-%d", time.Now().UnixNano()),
ServerURL: serverURL,
CreatedAt: time.Now(),
LastUsed: time.Now(),
client: &http.Client{Timeout: 0},
eventCh: make(chan []byte, 256),
closeCh: make(chan struct{}),
}
req, err := http.NewRequestWithContext(ctx, "GET", serverURL+"/sse", nil)
if err != nil {
return nil, err
}
req.Header.Set("Accept", "text/event-stream")
go conn.readLoop(req)
return conn, nil
}
func (c *SSEConn) readLoop(req *http.Request) {
resp, err := c.client.Do(req)
if err != nil {
return
}
defer resp.Body.Close()
buf := make([]byte, 4096)
for {
select {
case <-c.closeCh:
return
default:
n, err := resp.Body.Read(buf)
if err != nil {
return
}
if n > 0 {
data := make([]byte, n)
copy(data, buf[:n])
c.eventCh <- data
}
}
}
}
连接池通过 evictLoop 每 30 秒扫描一次,清除超过空闲超时或生命周期上限的连接。maxPerHost 限制了对单个 MCP Server 的最大连接数,防止某个慢 Server 耗尽连接池资源。
请求路由与负载均衡
MCP Gateway 的路由层需要根据 tools/call 请求中的 Tool 名称,将请求分发到注册了该 Tool 的 MCP Server。这要求 Gateway 维护一张 Tool → Server 的映射表,并在 Server 上下线时动态更新。
路由策略分为三层:
- Tool 名称匹配:精确匹配请求的 Tool 名称到目标 Server 集群
- 负载均衡选择:在目标集群内选择一个具体实例
- 健康检查过滤:跳过处于熔断状态的实例
package gateway
import (
"hash/crc32"
"sort"
"sync"
)
type ServerInfo struct {
URL string
Weight int
Tools []string
Healthy bool
}
type ToolRouter struct {
mu sync.RWMutex
toolMap map[string][]*ServerInfo // toolName -> servers
hashRing *ConsistentHash
}
type ConsistentHash struct {
ring map[uint32]*ServerInfo
keys []uint32
replicas int
}
func NewConsistentHash(replicas int) *ConsistentHash {
return &ConsistentHash{
ring: make(map[uint32]*ServerInfo),
replicas: replicas,
}
}
func (ch *ConsistentHash) Add(server *ServerInfo) {
for i := 0; i < ch.replicas; i++ {
key := crc32.ChecksumIEEE([]byte(fmt.Sprintf("%s-%d", server.URL, i)))
ch.ring[key] = server
ch.keys = append(ch.keys, key)
}
sort.Slice(ch.keys, func(i, j int) bool { return ch.keys[i] < ch.keys[j] })
}
func (ch *ConsistentHash) Get(key string) *ServerInfo {
if len(ch.keys) == 0 {
return nil
}
hash := crc32.ChecksumIEEE([]byte(key))
idx := sort.Search(len(ch.keys), func(i int) bool { return ch.keys[i] >= hash })
if idx >= len(ch.keys) {
idx = 0
}
return ch.ring[ch.keys[idx]]
}
func NewToolRouter() *ToolRouter {
return &ToolRouter{
toolMap: make(map[string][]*ServerInfo),
hashRing: NewConsistentHash(150),
}
}
func (r *ToolRouter) Register(server *ServerInfo) {
r.mu.Lock()
defer r.mu.Unlock()
for _, tool := range server.Tools {
r.toolMap[tool] = append(r.toolMap[tool], server)
}
r.hashRing.Add(server)
}
func (r *ToolRouter) Route(toolName, sessionID string) (*ServerInfo, error) {
r.mu.RLock()
defer r.mu.RUnlock()
servers, ok := r.toolMap[toolName]
if !ok || len(servers) == 0 {
return nil, fmt.Errorf("no server registered for tool: %s", toolName)
}
healthy := make([]*ServerInfo, 0, len(servers))
for _, s := range servers {
if s.Healthy {
healthy = append(healthy, s)
}
}
if len(healthy) == 0 {
return nil, fmt.Errorf("all servers for tool %s are unhealthy", toolName)
}
if len(healthy) == 1 {
return healthy[0], nil
}
target := r.hashRing.Get(sessionID + ":" + toolName)
if target != nil && target.Healthy {
return target, nil
}
return healthy[0], nil
}
路由使用一致性哈希的关键优势:当某个 MCP Server 下线时,只有它负责的那部分请求会被重新分配,其余 Session 的路由保持不变。这对于有状态的 MCP 交互(如多轮 Tool 调用依赖上下文)尤为重要。
并发控制与背压机制
高并发场景下,如果不对请求速率进行控制,下游 MCP Server 极易被流量洪峰击垮。Gateway 需要实现两层防护:信号量控制并发数 + 令牌桶控制请求速率。
package gateway
import (
"context"
"sync"
"time"
)
type RateLimiter struct {
tokens chan struct{}
maxTokens int
refillRate time.Duration
stopCh chan struct{}
}
func NewRateLimiter(maxTokens int, refillRate time.Duration) *RateLimiter {
rl := &RateLimiter{
tokens: make(chan struct{}, maxTokens),
maxTokens: maxTokens,
refillRate: refillRate,
stopCh: make(chan struct{}),
}
for i := 0; i < maxTokens; i++ {
rl.tokens <- struct{}{}
}
go rl.refill()
return rl
}
func (rl *RateLimiter) refill() {
ticker := time.NewTicker(rl.refillRate)
defer ticker.Stop()
for {
select {
case <-rl.stopCh:
return
case <-ticker.C:
select {
case rl.tokens <- struct{}{}:
default:
}
}
}
}
func (rl *RateLimiter) Allow(ctx context.Context) bool {
select {
case <-rl.tokens:
return true
case <-ctx.Done():
return false
}
}
type BackpressureController struct {
semaphore chan struct{}
rateLimiter *RateLimiter
queueSize int64
mu sync.Mutex
metrics *BackpressureMetrics
}
type BackpressureMetrics struct {
Accepted int64
Rejected int64
Queued int64
}
func NewBackpressureController(maxConcurrent, maxRPS int) *BackpressureController {
return &BackpressureController{
semaphore: make(chan struct{}, maxConcurrent),
rateLimiter: NewRateLimiter(maxRPS, time.Second/time.Duration(maxRPS)),
metrics: &BackpressureMetrics{},
}
}
func (bp *BackpressureController) Execute(
ctx context.Context,
fn func(context.Context) (any, error),
) (any, error) {
if !bp.rateLimiter.Allow(ctx) {
bp.mu.Lock()
bp.metrics.Rejected++
bp.mu.Unlock()
return nil, fmt.Errorf("rate limit exceeded")
}
select {
case bp.semaphore <- struct{}{}:
defer func() { <-bp.semaphore }()
case <-ctx.Done():
bp.mu.Lock()
bp.metrics.Rejected++
bp.mu.Unlock()
return nil, ctx.Err()
}
bp.mu.Lock()
bp.metrics.Accepted++
bp.mu.Unlock()
return fn(ctx)
}
令牌桶通过 Go channel 实现,refill goroutine 按照配置的速率向 channel 补充令牌。信号量同样用带缓冲的 channel 实现,maxConcurrent 控制同时执行的最大请求数。两层防护确保即使前端流量突增,下游 MCP Server 也不会被压垮。
分布式 Session 管理
单机 Gateway 的 Session 直接存储在内存中。当 Gateway 水平扩展到多节点时,需要引入 Redis 作为共享 Session Store,确保同一客户端的后续请求能正确关联到已建立的 MCP Session 上下文。
Session 管理的核心挑战在于 SSE 连接的有状态性。一个 Client 的 SSE 连接固定在 Gateway 节点 A 上,但后续的 HTTP POST(/message)可能被负载均衡器分发到节点 B。解决方案是 Session 亲和性 + Redis 状态同步:
package gateway
import (
"context"
"encoding/json"
"fmt"
"time"
)
type SessionData struct {
SessionID string `json:"session_id"`
UserID string `json:"user_id"`
GatewayNode string `json:"gateway_node"`
ServerURL string `json:"server_url"`
Capabilities map[string]any `json:"capabilities"`
Metadata map[string]string `json:"metadata"`
CreatedAt time.Time `json:"created_at"`
LastActiveAt time.Time `json:"last_active_at"`
TTL time.Duration `json:"ttl"`
}
type RedisSessionStore struct {
client RedisClient
keyPrefix string
defaultTTL time.Duration
}
type RedisClient interface {
Get(ctx context.Context, key string) (string, error)
Set(ctx context.Context, key string, value any, ttl time.Duration) error
Del(ctx context.Context, keys ...string) error
Publish(ctx context.Context, channel string, message any) error
}
func NewRedisSessionStore(client RedisClient, prefix string, ttl time.Duration) *RedisSessionStore {
return &RedisSessionStore{
client: client,
keyPrefix: prefix,
defaultTTL: ttl,
}
}
func (s *RedisSessionStore) Save(ctx context.Context, session *SessionData) error {
key := fmt.Sprintf("%s:session:%s", s.keyPrefix, session.SessionID)
data, err := json.Marshal(session)
if err != nil {
return fmt.Errorf("marshal session: %w", err)
}
ttl := session.TTL
if ttl == 0 {
ttl = s.defaultTTL
}
return s.client.Set(ctx, key, data, ttl)
}
func (s *RedisSessionStore) Load(ctx context.Context, sessionID string) (*SessionData, error) {
key := fmt.Sprintf("%s:session:%s", s.keyPrefix, sessionID)
val, err := s.client.Get(ctx, key)
if err != nil {
return nil, fmt.Errorf("load session %s: %w", sessionID, err)
}
var session SessionData
if err := json.Unmarshal([]byte(val), &session); err != nil {
return nil, fmt.Errorf("unmarshal session: %w", err)
}
return &session, nil
}
func (s *RedisSessionStore) UpdateActivity(ctx context.Context, sessionID string) error {
session, err := s.Load(ctx, sessionID)
if err != nil {
return err
}
session.LastActiveAt = time.Now()
return s.Save(ctx, session)
}
func (s *RedisSessionStore) NotifyNode(ctx context.Context, targetNode, sessionID, message string) error {
channel := fmt.Sprintf("%s:node:%s", s.keyPrefix, targetNode)
payload := map[string]string{
"session_id": sessionID,
"message": message,
}
data, _ := json.Marshal(payload)
return s.client.Publish(ctx, channel, data)
}
当节点 B 收到一个 POST 请求时,它从 Redis 加载 Session 数据,找到 SSE 连接所在的节点 A,然后通过 Redis Pub/Sub 将消息转发给节点 A。节点 A 再通过已有的 SSE 连接将响应推送给 Client。
可观测性与监控
在调试 MCP 请求时,经常需要检查 JSON-RPC 消息的格式是否正确——可以使用 JSON 格式化工具 快速验证消息结构。生产级 Gateway 必须具备完善的可观测性体系,覆盖指标(Metrics)、日志(Logging)和链路追踪(Tracing)三个维度。
核心指标采集方面,Gateway 应暴露以下 Prometheus 指标:
| 指标名称 | 类型 | 说明 |
|---|---|---|
mcp_gateway_active_connections |
Gauge | 当前活跃 SSE 连接数 |
mcp_gateway_requests_total |
Counter | JSON-RPC 请求总量(按 method 分标签) |
mcp_gateway_request_duration_seconds |
Histogram | 请求延迟分布 |
mcp_gateway_tool_calls_total |
Counter | Tool 调用次数(按 tool_name 分标签) |
mcp_gateway_circuit_breaker_state |
Gauge | 熔断器状态(0=closed, 1=half-open, 2=open) |
mcp_gateway_connection_pool_size |
Gauge | 连接池大小(按 server_url 分标签) |
mcp_gateway_backpressure_rejected_total |
Counter | 被限流拒绝的请求数 |
链路追踪使用 OpenTelemetry,在 Gateway 入口为每个请求生成 Trace ID,并在转发到下游 MCP Server 时传播上下文。这确保了从 Client 发起 tools/call 到 MCP Server 执行 Tool 再到返回结果的完整链路可追溯。
审计日志应记录每次 Tool 调用的关键信息:调用方身份(从 JWT 中提取)、目标 Tool 名称、参数摘要(脱敏处理)、响应时间和状态码。这些数据对于安全审计和成本计量都至关重要。
生产级容错设计
MCP Server 可能因部署更新、资源耗尽或网络分区而暂时不可用。Gateway 必须实现熔断器模式来隔离故障,避免级联失败。
package gateway
import (
"fmt"
"sync"
"time"
)
type CircuitState int
const (
StateClosed CircuitState = iota // 正常放行
StateOpen // 熔断,拒绝请求
StateHalfOpen // 半开,试探性放行
)
type CircuitBreaker struct {
mu sync.Mutex
state CircuitState
failureCount int
successCount int
failureThreshold int
successThreshold int
timeout time.Duration
lastFailureTime time.Time
onStateChange func(from, to CircuitState)
}
type CircuitBreakerConfig struct {
FailureThreshold int
SuccessThreshold int
Timeout time.Duration
OnStateChange func(from, to CircuitState)
}
func NewCircuitBreaker(cfg CircuitBreakerConfig) *CircuitBreaker {
return &CircuitBreaker{
state: StateClosed,
failureThreshold: cfg.FailureThreshold,
successThreshold: cfg.SuccessThreshold,
timeout: cfg.Timeout,
onStateChange: cfg.OnStateChange,
}
}
func (cb *CircuitBreaker) Allow() (bool, error) {
cb.mu.Lock()
defer cb.mu.Unlock()
switch cb.state {
case StateClosed:
return true, nil
case StateOpen:
if time.Since(cb.lastFailureTime) > cb.timeout {
cb.transitionTo(StateHalfOpen)
return true, nil
}
return false, fmt.Errorf("circuit breaker is open")
case StateHalfOpen:
return true, nil
}
return false, fmt.Errorf("unknown circuit state")
}
func (cb *CircuitBreaker) RecordSuccess() {
cb.mu.Lock()
defer cb.mu.Unlock()
switch cb.state {
case StateClosed:
cb.failureCount = 0
case StateHalfOpen:
cb.successCount++
if cb.successCount >= cb.successThreshold {
cb.transitionTo(StateClosed)
}
}
}
func (cb *CircuitBreaker) RecordFailure() {
cb.mu.Lock()
defer cb.mu.Unlock()
cb.lastFailureTime = time.Now()
switch cb.state {
case StateClosed:
cb.failureCount++
if cb.failureCount >= cb.failureThreshold {
cb.transitionTo(StateOpen)
}
case StateHalfOpen:
cb.transitionTo(StateOpen)
}
}
func (cb *CircuitBreaker) transitionTo(newState CircuitState) {
oldState := cb.state
cb.state = newState
cb.failureCount = 0
cb.successCount = 0
if cb.onStateChange != nil {
cb.onStateChange(oldState, newState)
}
}
func (cb *CircuitBreaker) State() CircuitState {
cb.mu.Lock()
defer cb.mu.Unlock()
return cb.state
}
熔断器的状态机包含三个状态:Closed(正常放行,统计失败次数)→ Open(连续失败超过阈值,直接拒绝所有请求)→ Half-Open(超时后试探性放行少量请求,成功则恢复,失败则重新熔断)。
重试策略方面,推荐采用指数退避 + 抖动(Exponential Backoff with Jitter),避免多个 Gateway 节点在同一时刻对恢复中的 MCP Server 发起重试风暴。优雅降级则根据业务场景决定:对于非关键的 Tool 调用(如日志查询),返回缓存的历史结果;对于关键调用(如数据写入),直接返回错误并通知上层 Agent 切换到备用 Tool。
FAQ
Q: MCP Gateway 和 API Gateway 有什么区别? A: MCP Gateway 专为 MCP 协议 设计,需要处理 SSE 长连接管理、JSON-RPC 消息路由、Tool/Resource 注册发现等 MCP 特有机制。传统 API Gateway 主要处理 HTTP 短连接的请求-响应模型,无法原生支持 MCP 的 SSE + HTTP POST 双通道传输。
Q: 单个 MCP Gateway 节点能支撑多少并发连接? A: 取决于实现语言和硬件配置。Go 实现的 MCP Gateway 在 4 核 8GB 服务器上通常能维持 10,000+ 活跃 SSE 连接,每秒处理 5,000+ JSON-RPC 消息。性能瓶颈通常在下游 MCP Server 的处理能力,而非 Gateway 本身。
Q: 如何解决 SSE 连接的水平扩展问题? A: SSE 是有状态的长连接,不能简单用轮询负载均衡。推荐使用 Session 亲和性(IP Hash 或 Cookie Hash)确保同一客户端的 SSE 连接和 POST 消息路由到同一 Gateway 节点。跨节点消息传递可以通过 Redis Pub/Sub 实现。
Q: Gateway 层会增加多少延迟? A: 在连接复用的情况下,Gateway 引入的额外延迟通常在 1-3ms 范围内(不含下游处理时间)。主要开销来自 JWT 验证、路由查表和指标采集。对于 Tool 调用通常需要数百毫秒到数秒的场景,Gateway 的延迟可以忽略不计。
Q: 如何平滑迁移现有的 MCP Server 到 Gateway 架构? A: 分三步走。第一步,在 Gateway 中注册现有 MCP Server,但不改变 Client 的连接方式。第二步,将部分 Client 通过 DNS 切换或配置变更切到 Gateway。第三步,验证稳定后全量切换,并下线 Client 直连 Server 的旧通道。整个过程中 MCP Server 无需任何改动。
总结
MCP Gateway 是企业级 AI 应用架构中不可缺少的基础设施层。通过本文介绍的连接池、智能路由、背压控制、分布式 Session 和熔断器机制,你可以构建一个支撑万级并发、具备生产级容错能力的 MCP 网关。
在实际开发中,调试 MCP 消息时可以使用 JSON 格式化工具 验证 JSON-RPC 消息结构,使用 JWT 生成工具 快速生成测试 Token 验证鉴权流程。
如果你对 MCP 协议本身还不够熟悉,建议先阅读 MCP 协议完全指南 了解基础概念,再通过 MCP 协议高阶实战 掌握企业级 Server 的构建方法,然后再回到本文研究 Gateway 层的分布式架构设计。