核心摘要
当企业同时使用 GPT-4、Claude、Gemini 和开源模型时,每个团队直接调用不同供应商 API 会导致成本失控、缺乏统一监控、无法灵活切换模型。LLM Gateway 将传统 API 网关模式应用于大模型场景,通过统一入口实现多模型智能路由、基于 Token 的精细限流、实时成本追踪和自动降级容错。本文从架构设计到生产级代码实现,构建一个完整的 LLM Gateway 方案。
本文是 AI 架构师课程 系列的第 18 篇,建议结合 Agent 可观测性工程 一起阅读。
目录
核心要点
- 统一入口解决 API 碎片化:业务代码面向 Gateway 抽象编程,底层模型供应商可热切换,消除供应商锁定
- Token 维度限流是 LLM 场景的必需品:传统请求级限流无法准确控制成本,一个长 Prompt 请求可能消耗 100 个短请求的 Token
- 智能路由降本 30-60%:基于任务复杂度自动选择最具性价比的模型,简单任务用小模型、复杂任务才用大模型
- 语义缓存是最被低估的优化手段:高重复度场景下缓存命中率可达 30-50%,直接节省对应比例的模型调用费用
- 降级链路保障 99.9% 可用性:主模型 → 备用模型 → 本地小模型的三级降级策略,确保业务永不中断
- 成本归因到团队/项目级别:像云账单一样精确追踪每个团队、每个项目的模型消耗,支撑内部计费和预算管控
为什么需要 LLM Gateway
多模型时代的管理困境
现代 AI 团队面临的典型场景:
| 问题 | 表现 | 影响 |
|---|---|---|
| API 碎片化 | 10+ 个团队各自对接不同模型供应商 | 重复对接工作、无法统一监控 |
| 成本黑洞 | 无法精确追踪每个项目的模型消耗 | 月底账单超预期 3-5 倍 |
| 供应商锁定 | 业务代码深度绑定特定模型 API | 切换模型需要重写调用层 |
| 限流不一致 | 各团队各自管理 API Key 和配额 | 某个团队超用导致全部被限流 |
| 缺乏容错 | 模型服务宕机直接影响业务 | 无降级策略,单点故障 |
从传统 API Gateway 到 LLM Gateway
传统的 API Gateway(Kong、Envoy、Nginx)已经解决了微服务间的路由、限流和认证问题。LLM Gateway 将这些成熟模式适配到大模型场景,同时增加了 LLM 特有的能力:
LLM Gateway 整体架构
核心组件
请求生命周期
一个完整的 LLM 请求在 Gateway 中经历以下阶段:
基础 Gateway 骨架实现(Python)
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel
from typing import Optional
import time
import hashlib
app = FastAPI()
class ChatRequest(BaseModel):
model: str
messages: list[dict]
temperature: float = 0.7
max_tokens: Optional[int] = None
metadata: Optional[dict] = None # team_id, project_id
class GatewayConfig:
def __init__(self):
self.route_rules: dict = {}
self.rate_limits: dict = {}
self.fallback_chains: dict = {}
self.cost_config: dict = {}
class LLMGateway:
def __init__(self, config: GatewayConfig):
self.config = config
self.router = ModelRouter(config.route_rules)
self.rate_limiter = TokenRateLimiter(config.rate_limits)
self.cache = SemanticCache()
self.cost_tracker = CostTracker(config.cost_config)
self.fallback_manager = FallbackManager(config.fallback_chains)
async def process_request(self, request: ChatRequest, team_id: str) -> dict:
# 1. 限流检查
if not await self.rate_limiter.allow(team_id, request):
raise HTTPException(429, "Token quota exceeded")
# 2. 语义缓存查询
cached = await self.cache.get(request.messages)
if cached:
return cached
# 3. 路由决策
target_model = await self.router.route(request)
# 4. 带降级的模型调用
response = await self.fallback_manager.call_with_fallback(
target_model, request
)
# 5. 异步任务:缓存写入 + 成本记录
await self.cache.set(request.messages, response)
await self.cost_tracker.record(team_id, target_model, response)
return response
模型路由策略
基于任务复杂度的路由
最有效的降本策略:让简单任务使用便宜的小模型,只有复杂任务才路由到昂贵的大模型。
from enum import Enum
import tiktoken
class TaskComplexity(Enum):
SIMPLE = "simple" # 格式转换、简单问答
MODERATE = "moderate" # 摘要、翻译、一般推理
COMPLEX = "complex" # 代码生成、数学推理、长文分析
class ContentBasedRouter:
"""基于内容复杂度的智能路由"""
COMPLEXITY_SIGNALS = {
"code_keywords": ["implement", "debug", "refactor", "algorithm"],
"reasoning_keywords": ["analyze", "compare", "evaluate", "prove"],
"simple_keywords": ["format", "convert", "translate", "summarize"],
}
MODEL_MAP = {
TaskComplexity.SIMPLE: "gpt-4o-mini",
TaskComplexity.MODERATE: "claude-3-5-haiku",
TaskComplexity.COMPLEX: "claude-sonnet-4",
}
def estimate_complexity(self, messages: list[dict]) -> TaskComplexity:
last_message = messages[-1]["content"].lower()
token_count = len(tiktoken.encoding_for_model("gpt-4").encode(last_message))
# 长 Prompt 通常是复杂任务
if token_count > 2000:
return TaskComplexity.COMPLEX
# 关键词匹配
for keyword in self.COMPLEXITY_SIGNALS["code_keywords"]:
if keyword in last_message:
return TaskComplexity.COMPLEX
for keyword in self.COMPLEXITY_SIGNALS["reasoning_keywords"]:
if keyword in last_message:
return TaskComplexity.MODERATE
return TaskComplexity.SIMPLE
async def route(self, request: "ChatRequest") -> str:
complexity = self.estimate_complexity(request.messages)
base_model = self.MODEL_MAP[complexity]
# 如果用户显式指定了模型,尊重用户选择
if request.model and request.model != "auto":
return request.model
return base_model
基于预算的路由
// TypeScript: 预算感知路由器
interface BudgetConfig {
teamId: string;
monthlyBudget: number; // 月预算(美元)
currentSpend: number; // 当月已消耗
alertThreshold: number; // 告警阈值(0.8 = 80%)
hardLimitThreshold: number; // 硬限制阈值(0.95 = 95%)
}
interface ModelPricing {
model: string;
inputCostPer1K: number; // 每 1K input token 成本
outputCostPer1K: number; // 每 1K output token 成本
}
class BudgetAwareRouter {
private pricing: Map<string, ModelPricing> = new Map([
['gpt-4o', { model: 'gpt-4o', inputCostPer1K: 0.0025, outputCostPer1K: 0.01 }],
['gpt-4o-mini', { model: 'gpt-4o-mini', inputCostPer1K: 0.00015, outputCostPer1K: 0.0006 }],
['claude-sonnet-4', { model: 'claude-sonnet-4', inputCostPer1K: 0.003, outputCostPer1K: 0.015 }],
['claude-3-5-haiku', { model: 'claude-3-5-haiku', inputCostPer1K: 0.0008, outputCostPer1K: 0.004 }],
]);
async route(request: ChatRequest, budget: BudgetConfig): Promise<string> {
const spendRatio = budget.currentSpend / budget.monthlyBudget;
// 超过硬限制,强制使用最便宜的模型
if (spendRatio >= budget.hardLimitThreshold) {
console.warn(`[Budget] Team ${budget.teamId} hit hard limit, forcing cheapest model`);
return 'gpt-4o-mini';
}
// 超过告警阈值,降级到中等模型
if (spendRatio >= budget.alertThreshold) {
return this.selectMidTierModel(request);
}
// 预算充足,正常路由
return request.model || 'gpt-4o';
}
private selectMidTierModel(request: ChatRequest): string {
// 根据任务类型选择性价比最优的中等模型
const isCodeTask = request.messages.some(m =>
m.content.includes('```') || m.content.includes('code')
);
return isCodeTask ? 'claude-3-5-haiku' : 'gpt-4o-mini';
}
}
A/B 测试与灰度发布
import random
from dataclasses import dataclass
@dataclass
class CanaryRule:
model_a: str # 基准模型
model_b: str # 候选模型
traffic_percent: float # 候选模型流量比例 (0-1)
metric_key: str # 评估指标
class CanaryRouter:
"""模型灰度发布路由器"""
def __init__(self):
self.active_canaries: dict[str, CanaryRule] = {}
self.metrics_collector = MetricsCollector()
async def route(self, request: "ChatRequest", canary_id: str) -> str:
rule = self.active_canaries.get(canary_id)
if not rule:
return request.model
# 按流量比例分配
if random.random() < rule.traffic_percent:
selected = rule.model_b
variant = "canary"
else:
selected = rule.model_a
variant = "baseline"
# 记录分组信息,用于后续分析
self.metrics_collector.tag_request(
request_id=request.metadata.get("request_id"),
variant=variant,
model=selected
)
return selected
限流与配额管理
为什么请求级限流不够
| 维度 | 请求级限流 | Token 级限流 |
|---|---|---|
| 粒度 | 每分钟请求数(RPM) | 每分钟 Token 数(TPM) |
| 成本精度 | 低——一个 10K Token 请求和 100 Token 请求同等计数 | 高——精确反映实际资源消耗 |
| 适用场景 | 防止突发洪峰 | 控制实际成本 |
| 实现复杂度 | 简单 | 中等(需要 Token 计数) |
最佳实践:双层限流 = 请求级 + Token 级同时生效
Redis 实现的 Token 限流器
import redis.asyncio as redis
import time
import tiktoken
class TokenRateLimiter:
"""基于滑动窗口的 Token 限流器"""
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
self.encoder = tiktoken.encoding_for_model("gpt-4")
def count_tokens(self, messages: list[dict]) -> int:
total = 0
for msg in messages:
total += len(self.encoder.encode(msg.get("content", "")))
total += 4 # role + formatting overhead
return total
async def allow(self, team_id: str, request: "ChatRequest") -> bool:
"""检查是否允许请求通过"""
token_count = self.count_tokens(request.messages)
now = time.time()
window_key = f"ratelimit:token:{team_id}"
window_size = 60 # 1 分钟滑动窗口
pipe = self.redis.pipeline()
# 清理过期条目
pipe.zremrangebyscore(window_key, 0, now - window_size)
# 获取当前窗口内的 Token 总量
pipe.zrangebyscore(window_key, now - window_size, now)
results = await pipe.execute()
current_tokens = sum(int(score) for score in results[1]) if results[1] else 0
limit = await self.get_team_limit(team_id)
if current_tokens + token_count > limit:
return False
# 记录本次请求的 Token 数
await self.redis.zadd(window_key, {f"{now}:{token_count}": now})
await self.redis.expire(window_key, window_size + 10)
return True
async def get_team_limit(self, team_id: str) -> int:
"""获取团队的 TPM 限制"""
config = await self.redis.hget("team_limits", team_id)
return int(config) if config else 100_000 # 默认 100K TPM
async def record_actual_usage(self, team_id: str, actual_tokens: int):
"""流式响应完成后,校正实际 Token 消耗"""
now = time.time()
window_key = f"ratelimit:token:{team_id}"
await self.redis.zadd(window_key, {f"{now}:output:{actual_tokens}": now})
多级配额体系
// TypeScript: 层级化配额管理
interface QuotaConfig {
organization: {
monthlyTokens: number; // 组织级月配额
maxConcurrent: number; // 最大并发请求
};
team: {
dailyTokens: number; // 团队级日配额
rpm: number; // 每分钟请求数
tpm: number; // 每分钟 Token 数
};
user: {
hourlyTokens: number; // 用户级小时配额
rpm: number;
};
}
class HierarchicalQuotaManager {
private redis: Redis;
async checkQuota(
orgId: string,
teamId: string,
userId: string,
estimatedTokens: number
): Promise<{ allowed: boolean; reason?: string }> {
// 从最细粒度开始检查
const checks = [
this.checkUserQuota(userId, estimatedTokens),
this.checkTeamQuota(teamId, estimatedTokens),
this.checkOrgQuota(orgId, estimatedTokens),
];
const results = await Promise.all(checks);
for (const result of results) {
if (!result.allowed) {
return result;
}
}
return { allowed: true };
}
private async checkTeamQuota(
teamId: string,
tokens: number
): Promise<{ allowed: boolean; reason?: string }> {
const key = `quota:team:${teamId}:${this.getTodayKey()}`;
const current = await this.redis.get(key);
const used = parseInt(current || '0');
const limit = await this.getTeamDailyLimit(teamId);
if (used + tokens > limit) {
return {
allowed: false,
reason: `Team daily quota exceeded: ${used}/${limit} tokens`
};
}
return { allowed: true };
}
private getTodayKey(): string {
return new Date().toISOString().split('T')[0];
}
}
成本管控与分析
实时成本追踪
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import asyncio
@dataclass
class UsageRecord:
timestamp: datetime
team_id: str
project_id: str
model: str
input_tokens: int
output_tokens: int
cost_usd: float
latency_ms: int
cache_hit: bool = False
request_id: str = ""
class CostTracker:
"""实时成本追踪与预算管控"""
PRICING = {
"gpt-4o": {"input": 2.50, "output": 10.00}, # per 1M tokens
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
"claude-sonnet-4": {"input": 3.00, "output": 15.00},
"claude-3-5-haiku": {"input": 0.80, "output": 4.00},
"gemini-2.5-pro": {"input": 1.25, "output": 10.00},
}
def __init__(self):
self.records: list[UsageRecord] = []
self.budget_alerts: dict[str, float] = {}
def calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
pricing = self.PRICING.get(model)
if not pricing:
return 0.0
input_cost = (input_tokens / 1_000_000) * pricing["input"]
output_cost = (output_tokens / 1_000_000) * pricing["output"]
return round(input_cost + output_cost, 6)
async def record(
self,
team_id: str,
project_id: str,
model: str,
input_tokens: int,
output_tokens: int,
latency_ms: int,
cache_hit: bool = False,
):
cost = 0.0 if cache_hit else self.calculate_cost(model, input_tokens, output_tokens)
record = UsageRecord(
timestamp=datetime.utcnow(),
team_id=team_id,
project_id=project_id,
model=model,
input_tokens=input_tokens,
output_tokens=output_tokens,
cost_usd=cost,
latency_ms=latency_ms,
cache_hit=cache_hit,
)
self.records.append(record)
# 检查预算告警
await self._check_budget_alert(team_id, cost)
async def _check_budget_alert(self, team_id: str, new_cost: float):
monthly_spend = await self.get_monthly_spend(team_id)
budget = self.budget_alerts.get(team_id, float("inf"))
if monthly_spend / budget > 0.8:
await self._send_alert(team_id, monthly_spend, budget)
async def get_monthly_spend(self, team_id: str) -> float:
# 实际生产中从时序数据库查询
return sum(
r.cost_usd for r in self.records
if r.team_id == team_id
and r.timestamp.month == datetime.utcnow().month
)
成本仪表盘指标
| 指标 | 计算方式 | 告警阈值 |
|---|---|---|
| 每请求平均成本 | 月总成本 / 请求数 | 环比增长 > 20% |
| 模型成本占比 | 各模型消耗 / 总消耗 | 单模型占比 > 60% |
| 缓存节省金额 | 缓存命中数 × 平均请求成本 | 命中率 < 10% 告警 |
| 团队预算消耗率 | 当月消耗 / 月预算 | > 80% 预警,> 95% 限流 |
| Token 效率 | 有效输出 Token / 总 Token | < 50% 说明 Prompt 过长 |
高级特性
语义缓存
传统缓存按精确字符串匹配,语义缓存基于向量相似度——"北京今天天气怎么样"和"今天北京什么天气"可以命中同一条缓存。
import numpy as np
from typing import Optional
class SemanticCache:
"""基于向量相似度的语义缓存"""
def __init__(self, similarity_threshold: float = 0.93):
self.threshold = similarity_threshold
self.embeddings: list[np.ndarray] = []
self.responses: list[dict] = []
self.ttl_seconds = 3600 # 1 小时过期
async def get_embedding(self, text: str) -> np.ndarray:
# 使用轻量级 embedding 模型
# 生产中推荐 text-embedding-3-small(成本低、速度快)
response = await embedding_client.create(
model="text-embedding-3-small",
input=text
)
return np.array(response.data[0].embedding)
async def get(self, messages: list[dict]) -> Optional[dict]:
query_text = messages[-1]["content"]
query_embedding = await self.get_embedding(query_text)
if not self.embeddings:
return None
# 计算余弦相似度
similarities = np.dot(self.embeddings, query_embedding) / (
np.linalg.norm(self.embeddings, axis=1) * np.linalg.norm(query_embedding)
)
max_idx = np.argmax(similarities)
max_sim = similarities[max_idx]
if max_sim >= self.threshold:
return self.responses[max_idx]
return None
async def set(self, messages: list[dict], response: dict):
query_text = messages[-1]["content"]
embedding = await self.get_embedding(query_text)
self.embeddings.append(embedding)
self.responses.append(response)
自动降级链路
import asyncio
from dataclasses import dataclass
@dataclass
class FallbackChain:
primary: str
secondary: str
tertiary: str
timeout_ms: int = 30000
class FallbackManager:
"""三级降级管理器"""
DEFAULT_CHAINS = {
"gpt-4o": FallbackChain("gpt-4o", "claude-sonnet-4", "gpt-4o-mini"),
"claude-sonnet-4": FallbackChain("claude-sonnet-4", "gpt-4o", "claude-3-5-haiku"),
}
async def call_with_fallback(self, model: str, request: "ChatRequest") -> dict:
chain = self.DEFAULT_CHAINS.get(model, FallbackChain(model, "gpt-4o-mini", "gpt-4o-mini"))
for i, target in enumerate([chain.primary, chain.secondary, chain.tertiary]):
try:
response = await asyncio.wait_for(
self._call_model(target, request),
timeout=chain.timeout_ms / 1000
)
if i > 0:
# 记录降级事件
await self._record_fallback(model, target, i)
return response
except (asyncio.TimeoutError, Exception) as e:
if i == 2: # 所有模型都失败
raise HTTPException(503, f"All models failed: {str(e)}")
continue
async def _call_model(self, model: str, request: "ChatRequest") -> dict:
# 实际调用模型供应商 API
...
async def _record_fallback(self, original: str, actual: str, level: int):
# 记录降级指标,触发告警
...
可观测性集成(OpenTelemetry)
from opentelemetry import trace
from opentelemetry.trace import StatusCode
tracer = trace.get_tracer("llm-gateway")
class ObservableGateway:
"""集成 OpenTelemetry 的可观测 Gateway"""
async def process_request(self, request: "ChatRequest", team_id: str) -> dict:
with tracer.start_as_current_span("gateway.process") as span:
span.set_attribute("llm.team_id", team_id)
span.set_attribute("llm.requested_model", request.model)
span.set_attribute("llm.message_count", len(request.messages))
# 路由决策
with tracer.start_as_current_span("gateway.route") as route_span:
target_model = await self.router.route(request)
route_span.set_attribute("llm.routed_model", target_model)
# 模型调用
with tracer.start_as_current_span("gateway.inference") as infer_span:
response = await self.call_model(target_model, request)
infer_span.set_attribute("llm.input_tokens", response["usage"]["input"])
infer_span.set_attribute("llm.output_tokens", response["usage"]["output"])
infer_span.set_attribute("llm.cost_usd", response["cost"])
span.set_status(StatusCode.OK)
return response
开源方案对比
| 特性 | LiteLLM | Portkey | Helicone | 自建方案 |
|---|---|---|---|---|
| 模型支持数量 | 100+ | 200+ | 主流模型 | 按需 |
| 统一 API 格式 | ✅ OpenAI 兼容 | ✅ OpenAI 兼容 | ✅ 代理模式 | 自定义 |
| 负载均衡 | ✅ | ✅ | ❌ | 自建 |
| 语义缓存 | ✅ | ✅ | ❌ | 自建 |
| 成本追踪 | ✅ 基础 | ✅ 完整仪表盘 | ✅ 可视化 | 自建 |
| 降级链路 | ✅ | ✅ | ❌ | 自建 |
| Prompt 管理 | ❌ | ✅ | ❌ | 自建 |
| 自托管 | ✅ 开源 | ✅ 开源核心 | ❌ SaaS | ✅ |
| 生产就绪度 | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | 取决于投入 |
| 适合团队规模 | 5-50 人 | 10-500 人 | 1-20 人 | 50+ 人 |
推荐选型
- 快速启动(1 周内上线):LiteLLM — 开源免费,一行代码切换模型
- 企业级需求:Portkey — 完整的 Gateway + 可观测 + Prompt 管理
- 纯观测需求:Helicone — 零侵入代理模式,只看不管
- 深度定制:基于 LiteLLM 二次开发,保留核心路由逻辑,定制限流和计费
生产最佳实践
1. 渐进式迁移策略
Phase 1: 代理模式(透传所有请求,收集基线数据)
Phase 2: 启用缓存(验证缓存命中率和正确性)
Phase 3: 启用路由(灰度 10% 流量到智能路由)
Phase 4: 启用限流(逐步收紧配额)
Phase 5: 全量上线(关闭直连通道)
2. 关键运维指标
| 指标 | SLO | 告警条件 |
|---|---|---|
| Gateway P99 延迟 | < 100ms(不含模型推理) | > 200ms |
| 缓存命中率 | > 15% | < 5% 持续 1 小时 |
| 降级触发率 | < 1% | > 5% 持续 5 分钟 |
| Token 限流拒绝率 | < 2% | > 10% |
| 模型调用成功率 | > 99.5% | < 98% |
3. 安全防护
- 所有请求经过 Prompt 注入检测(关键词 + 分类器双重防线)
- 敏感数据(PII)在发送到外部模型前脱敏
- API Key 轮换与最小权限原则
- 审计日志记录所有模型调用(含完整 Prompt,加密存储)
更多关于 Prompt 安全防护的内容,参考 Prompt 注入攻防实战。
常见问题
Q1:LLM Gateway 和传统 API Gateway 有什么区别?
传统 API Gateway 处理的是确定性的 HTTP 请求,按请求数限流即可。LLM Gateway 需要处理非确定性的模型调用,限流维度从请求数扩展到 Token 数,还需要处理流式响应、模型路由、语义缓存和成本归因等 LLM 特有问题。
Q2:自建 LLM Gateway 还是使用开源方案?
如果团队少于 5 个模型、月调用量低于 100 万次,建议直接使用 LiteLLM 或 Portkey 等开源方案。当团队超过 10 个模型、需要深度定制路由策略或有严格的数据安全要求时,在开源方案基础上二次开发是更好的选择。
Q3:Token 限流和请求限流应该如何配合使用?
建议同时启用两种限流:请求限流(如 100 RPM)作为第一道防线防止突发洪峰,Token 限流(如 100K TPM)作为第二道防线控制实际成本。对于流式请求,需要在请求完成后异步扣减实际消耗的 Token 数。
Q4:语义缓存的命中率一般能达到多少?
取决于业务场景。客服类重复问答可以达到 30-50% 命中率,代码生成类通常低于 5%。关键是设置合理的相似度阈值(推荐 0.92-0.95),建议按场景分别配置阈值。
Q5:如何实现模型的无损热切换?
通过 Gateway 的路由配置实现。将路由规则与业务代码解耦——业务代码只声明"需要一个 coding 能力的模型",Gateway 根据当前配置将请求路由到具体模型。切换模型时只需更新 Gateway 配置,无需修改任何业务代码。
总结
LLM Gateway 是多模型时代的基础设施必备组件。通过统一入口实现模型路由、Token 限流、成本追踪和降级容错,可以将企业的 LLM 运维成本降低 30-60%,同时将可用性从"看运气"提升到 99.9%。
关键设计原则:
- 分层限流:请求级 + Token 级双重防护
- 智能路由:任务复杂度决定模型选择,而非人工指定
- 成本可见:每一分钱都能追溯到团队和项目
- 渐进迁移:从透传代理开始,逐步启用高级特性
- 开源优先:站在 LiteLLM/Portkey 的肩膀上二次开发
使用 JSON Formatter 调试 Gateway 的配置文件和 API 响应,使用 文本对比工具 比较不同模型的输出差异。