核心摘要

当企业同时使用 GPT-4、Claude、Gemini 和开源模型时,每个团队直接调用不同供应商 API 会导致成本失控、缺乏统一监控、无法灵活切换模型。LLM Gateway 将传统 API 网关模式应用于大模型场景,通过统一入口实现多模型智能路由基于 Token 的精细限流实时成本追踪自动降级容错。本文从架构设计到生产级代码实现,构建一个完整的 LLM Gateway 方案。

本文是 AI 架构师课程 系列的第 18 篇,建议结合 Agent 可观测性工程 一起阅读。


目录

  1. 核心要点
  2. 为什么需要 LLM Gateway
  3. LLM Gateway 整体架构
  4. 模型路由策略
  5. 限流与配额管理
  6. 成本管控与分析
  7. 高级特性
  8. 开源方案对比
  9. 生产最佳实践
  10. 常见问题
  11. 总结
  12. 相关资源

核心要点

  • 统一入口解决 API 碎片化:业务代码面向 Gateway 抽象编程,底层模型供应商可热切换,消除供应商锁定
  • Token 维度限流是 LLM 场景的必需品:传统请求级限流无法准确控制成本,一个长 Prompt 请求可能消耗 100 个短请求的 Token
  • 智能路由降本 30-60%:基于任务复杂度自动选择最具性价比的模型,简单任务用小模型、复杂任务才用大模型
  • 语义缓存是最被低估的优化手段:高重复度场景下缓存命中率可达 30-50%,直接节省对应比例的模型调用费用
  • 降级链路保障 99.9% 可用性:主模型 → 备用模型 → 本地小模型的三级降级策略,确保业务永不中断
  • 成本归因到团队/项目级别:像云账单一样精确追踪每个团队、每个项目的模型消耗,支撑内部计费和预算管控

为什么需要 LLM Gateway

多模型时代的管理困境

现代 AI 团队面临的典型场景:

问题 表现 影响
API 碎片化 10+ 个团队各自对接不同模型供应商 重复对接工作、无法统一监控
成本黑洞 无法精确追踪每个项目的模型消耗 月底账单超预期 3-5 倍
供应商锁定 业务代码深度绑定特定模型 API 切换模型需要重写调用层
限流不一致 各团队各自管理 API Key 和配额 某个团队超用导致全部被限流
缺乏容错 模型服务宕机直接影响业务 无降级策略,单点故障

从传统 API Gateway 到 LLM Gateway

传统的 API Gateway(Kong、Envoy、Nginx)已经解决了微服务间的路由、限流和认证问题。LLM Gateway 将这些成熟模式适配到大模型场景,同时增加了 LLM 特有的能力:

graph LR A[传统 API Gateway] --> B[请求路由] A --> C[请求级限流] A --> D[认证鉴权] A --> E[负载均衡] F[LLM Gateway 额外能力] --> G[Token 级限流] F --> H[模型智能路由] F --> I[语义缓存] F --> J[成本追踪] F --> K[Prompt 安全防护] F --> L[流式响应处理]

LLM Gateway 整体架构

核心组件

graph TB subgraph CL["Clients"] C1[Team A] C2[Team B] C3[Team C] end subgraph Gateway["LLM Gateway"] AUTH[认证鉴权] ROUTER[智能路由器] RL[Token 限流器] CACHE[语义缓存] COST[成本追踪器] FALL[降级管理器] OBS[可观测性] end subgraph Providers["模型供应商"] P1[OpenAI GPT-4o] P2[Anthropic Claude] P3[Google Gemini] P4[本地 Ollama] end C1 --> AUTH C2 --> AUTH C3 --> AUTH AUTH --> ROUTER ROUTER --> RL RL --> CACHE CACHE -->|缓存未命中| FALL FALL --> P1 FALL --> P2 FALL --> P3 FALL --> P4 COST --> OBS

请求生命周期

一个完整的 LLM 请求在 Gateway 中经历以下阶段:

sequenceDiagram participant Client as 业务服务 participant GW as LLM Gateway participant Cache as 语义缓存 participant Router as 路由器 participant Provider as 模型供应商 participant Cost as 成本追踪 Client->>GW: POST /v1/chat/completions GW->>GW: 认证 & 提取团队/项目标识 GW->>GW: Token 限流检查 GW->>Cache: 语义相似度查询 alt 缓存命中 Cache-->>Client: 返回缓存结果 else 缓存未命中 GW->>Router: 路由决策(模型选择) Router->>Provider: 转发请求 Provider-->>GW: 返回响应 GW->>Cache: 写入缓存 GW->>Cost: 记录 Token 消耗 GW-->>Client: 返回响应 end

基础 Gateway 骨架实现(Python)

python
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel
from typing import Optional
import time
import hashlib

app = FastAPI()

class ChatRequest(BaseModel):
    model: str
    messages: list[dict]
    temperature: float = 0.7
    max_tokens: Optional[int] = None
    metadata: Optional[dict] = None  # team_id, project_id

class GatewayConfig:
    def __init__(self):
        self.route_rules: dict = {}
        self.rate_limits: dict = {}
        self.fallback_chains: dict = {}
        self.cost_config: dict = {}

class LLMGateway:
    def __init__(self, config: GatewayConfig):
        self.config = config
        self.router = ModelRouter(config.route_rules)
        self.rate_limiter = TokenRateLimiter(config.rate_limits)
        self.cache = SemanticCache()
        self.cost_tracker = CostTracker(config.cost_config)
        self.fallback_manager = FallbackManager(config.fallback_chains)

    async def process_request(self, request: ChatRequest, team_id: str) -> dict:
        # 1. 限流检查
        if not await self.rate_limiter.allow(team_id, request):
            raise HTTPException(429, "Token quota exceeded")

        # 2. 语义缓存查询
        cached = await self.cache.get(request.messages)
        if cached:
            return cached

        # 3. 路由决策
        target_model = await self.router.route(request)

        # 4. 带降级的模型调用
        response = await self.fallback_manager.call_with_fallback(
            target_model, request
        )

        # 5. 异步任务:缓存写入 + 成本记录
        await self.cache.set(request.messages, response)
        await self.cost_tracker.record(team_id, target_model, response)

        return response

模型路由策略

基于任务复杂度的路由

最有效的降本策略:让简单任务使用便宜的小模型,只有复杂任务才路由到昂贵的大模型。

python
from enum import Enum
import tiktoken

class TaskComplexity(Enum):
    SIMPLE = "simple"      # 格式转换、简单问答
    MODERATE = "moderate"  # 摘要、翻译、一般推理
    COMPLEX = "complex"    # 代码生成、数学推理、长文分析

class ContentBasedRouter:
    """基于内容复杂度的智能路由"""

    COMPLEXITY_SIGNALS = {
        "code_keywords": ["implement", "debug", "refactor", "algorithm"],
        "reasoning_keywords": ["analyze", "compare", "evaluate", "prove"],
        "simple_keywords": ["format", "convert", "translate", "summarize"],
    }

    MODEL_MAP = {
        TaskComplexity.SIMPLE: "gpt-4o-mini",
        TaskComplexity.MODERATE: "claude-3-5-haiku",
        TaskComplexity.COMPLEX: "claude-sonnet-4",
    }

    def estimate_complexity(self, messages: list[dict]) -> TaskComplexity:
        last_message = messages[-1]["content"].lower()
        token_count = len(tiktoken.encoding_for_model("gpt-4").encode(last_message))

        # 长 Prompt 通常是复杂任务
        if token_count > 2000:
            return TaskComplexity.COMPLEX

        # 关键词匹配
        for keyword in self.COMPLEXITY_SIGNALS["code_keywords"]:
            if keyword in last_message:
                return TaskComplexity.COMPLEX

        for keyword in self.COMPLEXITY_SIGNALS["reasoning_keywords"]:
            if keyword in last_message:
                return TaskComplexity.MODERATE

        return TaskComplexity.SIMPLE

    async def route(self, request: "ChatRequest") -> str:
        complexity = self.estimate_complexity(request.messages)
        base_model = self.MODEL_MAP[complexity]

        # 如果用户显式指定了模型,尊重用户选择
        if request.model and request.model != "auto":
            return request.model

        return base_model

基于预算的路由

typescript
// TypeScript: 预算感知路由器
interface BudgetConfig {
  teamId: string;
  monthlyBudget: number;       // 月预算(美元)
  currentSpend: number;        // 当月已消耗
  alertThreshold: number;      // 告警阈值(0.8 = 80%)
  hardLimitThreshold: number;  // 硬限制阈值(0.95 = 95%)
}

interface ModelPricing {
  model: string;
  inputCostPer1K: number;   // 每 1K input token 成本
  outputCostPer1K: number;  // 每 1K output token 成本
}

class BudgetAwareRouter {
  private pricing: Map<string, ModelPricing> = new Map([
    ['gpt-4o', { model: 'gpt-4o', inputCostPer1K: 0.0025, outputCostPer1K: 0.01 }],
    ['gpt-4o-mini', { model: 'gpt-4o-mini', inputCostPer1K: 0.00015, outputCostPer1K: 0.0006 }],
    ['claude-sonnet-4', { model: 'claude-sonnet-4', inputCostPer1K: 0.003, outputCostPer1K: 0.015 }],
    ['claude-3-5-haiku', { model: 'claude-3-5-haiku', inputCostPer1K: 0.0008, outputCostPer1K: 0.004 }],
  ]);

  async route(request: ChatRequest, budget: BudgetConfig): Promise<string> {
    const spendRatio = budget.currentSpend / budget.monthlyBudget;

    // 超过硬限制,强制使用最便宜的模型
    if (spendRatio >= budget.hardLimitThreshold) {
      console.warn(`[Budget] Team ${budget.teamId} hit hard limit, forcing cheapest model`);
      return 'gpt-4o-mini';
    }

    // 超过告警阈值,降级到中等模型
    if (spendRatio >= budget.alertThreshold) {
      return this.selectMidTierModel(request);
    }

    // 预算充足,正常路由
    return request.model || 'gpt-4o';
  }

  private selectMidTierModel(request: ChatRequest): string {
    // 根据任务类型选择性价比最优的中等模型
    const isCodeTask = request.messages.some(m =>
      m.content.includes('```') || m.content.includes('code')
    );
    return isCodeTask ? 'claude-3-5-haiku' : 'gpt-4o-mini';
  }
}

A/B 测试与灰度发布

python
import random
from dataclasses import dataclass

@dataclass
class CanaryRule:
    model_a: str           # 基准模型
    model_b: str           # 候选模型
    traffic_percent: float # 候选模型流量比例 (0-1)
    metric_key: str        # 评估指标

class CanaryRouter:
    """模型灰度发布路由器"""

    def __init__(self):
        self.active_canaries: dict[str, CanaryRule] = {}
        self.metrics_collector = MetricsCollector()

    async def route(self, request: "ChatRequest", canary_id: str) -> str:
        rule = self.active_canaries.get(canary_id)
        if not rule:
            return request.model

        # 按流量比例分配
        if random.random() < rule.traffic_percent:
            selected = rule.model_b
            variant = "canary"
        else:
            selected = rule.model_a
            variant = "baseline"

        # 记录分组信息,用于后续分析
        self.metrics_collector.tag_request(
            request_id=request.metadata.get("request_id"),
            variant=variant,
            model=selected
        )
        return selected

限流与配额管理

为什么请求级限流不够

维度 请求级限流 Token 级限流
粒度 每分钟请求数(RPM) 每分钟 Token 数(TPM)
成本精度 低——一个 10K Token 请求和 100 Token 请求同等计数 高——精确反映实际资源消耗
适用场景 防止突发洪峰 控制实际成本
实现复杂度 简单 中等(需要 Token 计数)

最佳实践:双层限流 = 请求级 + Token 级同时生效

Redis 实现的 Token 限流器

python
import redis.asyncio as redis
import time
import tiktoken

class TokenRateLimiter:
    """基于滑动窗口的 Token 限流器"""

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.encoder = tiktoken.encoding_for_model("gpt-4")

    def count_tokens(self, messages: list[dict]) -> int:
        total = 0
        for msg in messages:
            total += len(self.encoder.encode(msg.get("content", "")))
            total += 4  # role + formatting overhead
        return total

    async def allow(self, team_id: str, request: "ChatRequest") -> bool:
        """检查是否允许请求通过"""
        token_count = self.count_tokens(request.messages)
        now = time.time()
        window_key = f"ratelimit:token:{team_id}"
        window_size = 60  # 1 分钟滑动窗口

        pipe = self.redis.pipeline()
        # 清理过期条目
        pipe.zremrangebyscore(window_key, 0, now - window_size)
        # 获取当前窗口内的 Token 总量
        pipe.zrangebyscore(window_key, now - window_size, now)
        results = await pipe.execute()

        current_tokens = sum(int(score) for score in results[1]) if results[1] else 0
        limit = await self.get_team_limit(team_id)

        if current_tokens + token_count > limit:
            return False

        # 记录本次请求的 Token 数
        await self.redis.zadd(window_key, {f"{now}:{token_count}": now})
        await self.redis.expire(window_key, window_size + 10)
        return True

    async def get_team_limit(self, team_id: str) -> int:
        """获取团队的 TPM 限制"""
        config = await self.redis.hget("team_limits", team_id)
        return int(config) if config else 100_000  # 默认 100K TPM

    async def record_actual_usage(self, team_id: str, actual_tokens: int):
        """流式响应完成后,校正实际 Token 消耗"""
        now = time.time()
        window_key = f"ratelimit:token:{team_id}"
        await self.redis.zadd(window_key, {f"{now}:output:{actual_tokens}": now})

多级配额体系

typescript
// TypeScript: 层级化配额管理
interface QuotaConfig {
  organization: {
    monthlyTokens: number;    // 组织级月配额
    maxConcurrent: number;    // 最大并发请求
  };
  team: {
    dailyTokens: number;      // 团队级日配额
    rpm: number;              // 每分钟请求数
    tpm: number;              // 每分钟 Token 数
  };
  user: {
    hourlyTokens: number;     // 用户级小时配额
    rpm: number;
  };
}

class HierarchicalQuotaManager {
  private redis: Redis;

  async checkQuota(
    orgId: string,
    teamId: string,
    userId: string,
    estimatedTokens: number
  ): Promise<{ allowed: boolean; reason?: string }> {
    // 从最细粒度开始检查
    const checks = [
      this.checkUserQuota(userId, estimatedTokens),
      this.checkTeamQuota(teamId, estimatedTokens),
      this.checkOrgQuota(orgId, estimatedTokens),
    ];

    const results = await Promise.all(checks);

    for (const result of results) {
      if (!result.allowed) {
        return result;
      }
    }

    return { allowed: true };
  }

  private async checkTeamQuota(
    teamId: string,
    tokens: number
  ): Promise<{ allowed: boolean; reason?: string }> {
    const key = `quota:team:${teamId}:${this.getTodayKey()}`;
    const current = await this.redis.get(key);
    const used = parseInt(current || '0');
    const limit = await this.getTeamDailyLimit(teamId);

    if (used + tokens > limit) {
      return {
        allowed: false,
        reason: `Team daily quota exceeded: ${used}/${limit} tokens`
      };
    }
    return { allowed: true };
  }

  private getTodayKey(): string {
    return new Date().toISOString().split('T')[0];
  }
}

成本管控与分析

实时成本追踪

python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import asyncio

@dataclass
class UsageRecord:
    timestamp: datetime
    team_id: str
    project_id: str
    model: str
    input_tokens: int
    output_tokens: int
    cost_usd: float
    latency_ms: int
    cache_hit: bool = False
    request_id: str = ""

class CostTracker:
    """实时成本追踪与预算管控"""

    PRICING = {
        "gpt-4o": {"input": 2.50, "output": 10.00},         # per 1M tokens
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "claude-sonnet-4": {"input": 3.00, "output": 15.00},
        "claude-3-5-haiku": {"input": 0.80, "output": 4.00},
        "gemini-2.5-pro": {"input": 1.25, "output": 10.00},
    }

    def __init__(self):
        self.records: list[UsageRecord] = []
        self.budget_alerts: dict[str, float] = {}

    def calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        pricing = self.PRICING.get(model)
        if not pricing:
            return 0.0
        input_cost = (input_tokens / 1_000_000) * pricing["input"]
        output_cost = (output_tokens / 1_000_000) * pricing["output"]
        return round(input_cost + output_cost, 6)

    async def record(
        self,
        team_id: str,
        project_id: str,
        model: str,
        input_tokens: int,
        output_tokens: int,
        latency_ms: int,
        cache_hit: bool = False,
    ):
        cost = 0.0 if cache_hit else self.calculate_cost(model, input_tokens, output_tokens)

        record = UsageRecord(
            timestamp=datetime.utcnow(),
            team_id=team_id,
            project_id=project_id,
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost_usd=cost,
            latency_ms=latency_ms,
            cache_hit=cache_hit,
        )
        self.records.append(record)

        # 检查预算告警
        await self._check_budget_alert(team_id, cost)

    async def _check_budget_alert(self, team_id: str, new_cost: float):
        monthly_spend = await self.get_monthly_spend(team_id)
        budget = self.budget_alerts.get(team_id, float("inf"))

        if monthly_spend / budget > 0.8:
            await self._send_alert(team_id, monthly_spend, budget)

    async def get_monthly_spend(self, team_id: str) -> float:
        # 实际生产中从时序数据库查询
        return sum(
            r.cost_usd for r in self.records
            if r.team_id == team_id
            and r.timestamp.month == datetime.utcnow().month
        )

成本仪表盘指标

指标 计算方式 告警阈值
每请求平均成本 月总成本 / 请求数 环比增长 > 20%
模型成本占比 各模型消耗 / 总消耗 单模型占比 > 60%
缓存节省金额 缓存命中数 × 平均请求成本 命中率 < 10% 告警
团队预算消耗率 当月消耗 / 月预算 > 80% 预警,> 95% 限流
Token 效率 有效输出 Token / 总 Token < 50% 说明 Prompt 过长

高级特性

语义缓存

传统缓存按精确字符串匹配,语义缓存基于向量相似度——"北京今天天气怎么样"和"今天北京什么天气"可以命中同一条缓存。

python
import numpy as np
from typing import Optional

class SemanticCache:
    """基于向量相似度的语义缓存"""

    def __init__(self, similarity_threshold: float = 0.93):
        self.threshold = similarity_threshold
        self.embeddings: list[np.ndarray] = []
        self.responses: list[dict] = []
        self.ttl_seconds = 3600  # 1 小时过期

    async def get_embedding(self, text: str) -> np.ndarray:
        # 使用轻量级 embedding 模型
        # 生产中推荐 text-embedding-3-small(成本低、速度快)
        response = await embedding_client.create(
            model="text-embedding-3-small",
            input=text
        )
        return np.array(response.data[0].embedding)

    async def get(self, messages: list[dict]) -> Optional[dict]:
        query_text = messages[-1]["content"]
        query_embedding = await self.get_embedding(query_text)

        if not self.embeddings:
            return None

        # 计算余弦相似度
        similarities = np.dot(self.embeddings, query_embedding) / (
            np.linalg.norm(self.embeddings, axis=1) * np.linalg.norm(query_embedding)
        )
        max_idx = np.argmax(similarities)
        max_sim = similarities[max_idx]

        if max_sim >= self.threshold:
            return self.responses[max_idx]
        return None

    async def set(self, messages: list[dict], response: dict):
        query_text = messages[-1]["content"]
        embedding = await self.get_embedding(query_text)
        self.embeddings.append(embedding)
        self.responses.append(response)

自动降级链路

python
import asyncio
from dataclasses import dataclass

@dataclass
class FallbackChain:
    primary: str
    secondary: str
    tertiary: str
    timeout_ms: int = 30000

class FallbackManager:
    """三级降级管理器"""

    DEFAULT_CHAINS = {
        "gpt-4o": FallbackChain("gpt-4o", "claude-sonnet-4", "gpt-4o-mini"),
        "claude-sonnet-4": FallbackChain("claude-sonnet-4", "gpt-4o", "claude-3-5-haiku"),
    }

    async def call_with_fallback(self, model: str, request: "ChatRequest") -> dict:
        chain = self.DEFAULT_CHAINS.get(model, FallbackChain(model, "gpt-4o-mini", "gpt-4o-mini"))

        for i, target in enumerate([chain.primary, chain.secondary, chain.tertiary]):
            try:
                response = await asyncio.wait_for(
                    self._call_model(target, request),
                    timeout=chain.timeout_ms / 1000
                )
                if i > 0:
                    # 记录降级事件
                    await self._record_fallback(model, target, i)
                return response
            except (asyncio.TimeoutError, Exception) as e:
                if i == 2:  # 所有模型都失败
                    raise HTTPException(503, f"All models failed: {str(e)}")
                continue

    async def _call_model(self, model: str, request: "ChatRequest") -> dict:
        # 实际调用模型供应商 API
        ...

    async def _record_fallback(self, original: str, actual: str, level: int):
        # 记录降级指标,触发告警
        ...

可观测性集成(OpenTelemetry)

python
from opentelemetry import trace
from opentelemetry.trace import StatusCode

tracer = trace.get_tracer("llm-gateway")

class ObservableGateway:
    """集成 OpenTelemetry 的可观测 Gateway"""

    async def process_request(self, request: "ChatRequest", team_id: str) -> dict:
        with tracer.start_as_current_span("gateway.process") as span:
            span.set_attribute("llm.team_id", team_id)
            span.set_attribute("llm.requested_model", request.model)
            span.set_attribute("llm.message_count", len(request.messages))

            # 路由决策
            with tracer.start_as_current_span("gateway.route") as route_span:
                target_model = await self.router.route(request)
                route_span.set_attribute("llm.routed_model", target_model)

            # 模型调用
            with tracer.start_as_current_span("gateway.inference") as infer_span:
                response = await self.call_model(target_model, request)
                infer_span.set_attribute("llm.input_tokens", response["usage"]["input"])
                infer_span.set_attribute("llm.output_tokens", response["usage"]["output"])
                infer_span.set_attribute("llm.cost_usd", response["cost"])

            span.set_status(StatusCode.OK)
            return response

开源方案对比

特性 LiteLLM Portkey Helicone 自建方案
模型支持数量 100+ 200+ 主流模型 按需
统一 API 格式 ✅ OpenAI 兼容 ✅ OpenAI 兼容 ✅ 代理模式 自定义
负载均衡 自建
语义缓存 自建
成本追踪 ✅ 基础 ✅ 完整仪表盘 ✅ 可视化 自建
降级链路 自建
Prompt 管理 自建
自托管 ✅ 开源 ✅ 开源核心 ❌ SaaS
生产就绪度 ⭐⭐⭐⭐ ⭐⭐⭐⭐⭐ ⭐⭐⭐ 取决于投入
适合团队规模 5-50 人 10-500 人 1-20 人 50+ 人

推荐选型

  • 快速启动(1 周内上线):LiteLLM — 开源免费,一行代码切换模型
  • 企业级需求:Portkey — 完整的 Gateway + 可观测 + Prompt 管理
  • 纯观测需求:Helicone — 零侵入代理模式,只看不管
  • 深度定制:基于 LiteLLM 二次开发,保留核心路由逻辑,定制限流和计费

生产最佳实践

1. 渐进式迁移策略

code
Phase 1: 代理模式(透传所有请求,收集基线数据)
Phase 2: 启用缓存(验证缓存命中率和正确性)
Phase 3: 启用路由(灰度 10% 流量到智能路由)
Phase 4: 启用限流(逐步收紧配额)
Phase 5: 全量上线(关闭直连通道)

2. 关键运维指标

指标 SLO 告警条件
Gateway P99 延迟 < 100ms(不含模型推理) > 200ms
缓存命中率 > 15% < 5% 持续 1 小时
降级触发率 < 1% > 5% 持续 5 分钟
Token 限流拒绝率 < 2% > 10%
模型调用成功率 > 99.5% < 98%

3. 安全防护

  • 所有请求经过 Prompt 注入检测(关键词 + 分类器双重防线)
  • 敏感数据(PII)在发送到外部模型前脱敏
  • API Key 轮换与最小权限原则
  • 审计日志记录所有模型调用(含完整 Prompt,加密存储)

更多关于 Prompt 安全防护的内容,参考 Prompt 注入攻防实战


常见问题

Q1:LLM Gateway 和传统 API Gateway 有什么区别?

传统 API Gateway 处理的是确定性的 HTTP 请求,按请求数限流即可。LLM Gateway 需要处理非确定性的模型调用,限流维度从请求数扩展到 Token 数,还需要处理流式响应、模型路由、语义缓存和成本归因等 LLM 特有问题。

Q2:自建 LLM Gateway 还是使用开源方案?

如果团队少于 5 个模型、月调用量低于 100 万次,建议直接使用 LiteLLM 或 Portkey 等开源方案。当团队超过 10 个模型、需要深度定制路由策略或有严格的数据安全要求时,在开源方案基础上二次开发是更好的选择。

Q3:Token 限流和请求限流应该如何配合使用?

建议同时启用两种限流:请求限流(如 100 RPM)作为第一道防线防止突发洪峰,Token 限流(如 100K TPM)作为第二道防线控制实际成本。对于流式请求,需要在请求完成后异步扣减实际消耗的 Token 数。

Q4:语义缓存的命中率一般能达到多少?

取决于业务场景。客服类重复问答可以达到 30-50% 命中率,代码生成类通常低于 5%。关键是设置合理的相似度阈值(推荐 0.92-0.95),建议按场景分别配置阈值。

Q5:如何实现模型的无损热切换?

通过 Gateway 的路由配置实现。将路由规则与业务代码解耦——业务代码只声明"需要一个 coding 能力的模型",Gateway 根据当前配置将请求路由到具体模型。切换模型时只需更新 Gateway 配置,无需修改任何业务代码。


总结

LLM Gateway 是多模型时代的基础设施必备组件。通过统一入口实现模型路由、Token 限流、成本追踪和降级容错,可以将企业的 LLM 运维成本降低 30-60%,同时将可用性从"看运气"提升到 99.9%。

关键设计原则:

  1. 分层限流:请求级 + Token 级双重防护
  2. 智能路由:任务复杂度决定模型选择,而非人工指定
  3. 成本可见:每一分钱都能追溯到团队和项目
  4. 渐进迁移:从透传代理开始,逐步启用高级特性
  5. 开源优先:站在 LiteLLM/Portkey 的肩膀上二次开发

使用 JSON Formatter 调试 Gateway 的配置文件和 API 响应,使用 文本对比工具 比较不同模型的输出差异。


相关资源