TL;DR

When an enterprise uses GPT-4, Claude, Gemini, and open-source models at the same time, letting each team call provider APIs directly leads to uncontrolled costs, no unified monitoring, and no flexible way to switch models. An LLM Gateway applies the traditional API gateway pattern to the LLM domain: a unified entry point that provides intelligent multi-model routing, token-based rate limiting, real-time cost tracking, and automatic fallback with graceful degradation. This guide walks through a complete LLM Gateway, from architecture design to implementation code.

This is article #18 in the AI Architect Course series. For observability integration, see Agent Observability Engineering.


Table of Contents

  1. Key Takeaways
  2. Why You Need an LLM Gateway
  3. LLM Gateway Architecture Overview
  4. Model Routing Strategies
  5. Rate Limiting & Quota Management
  6. Cost Management & Analytics
  7. Advanced Features
  8. Open Source Solutions Comparison
  9. Production Best Practices
  10. FAQ
  11. Summary
  12. Related Resources

Key Takeaways

  • Unified entry point eliminates API fragmentation: Business code programs against the Gateway abstraction; underlying model providers can be hot-swapped without vendor lock-in
  • Token-level rate limiting is essential for LLM workloads: Traditional request-count limiting cannot accurately control costs—one long prompt may consume 100x the tokens of a short request
  • Intelligent routing reduces costs 30-60%: Automatically select the most cost-effective model based on task complexity—simple tasks use small models, only complex tasks warrant large models
  • Semantic caching is the most underrated optimization: In high-repetition scenarios, cache hit rates of 30-50% directly save the corresponding proportion of model invocation costs
  • Fallback chains target 99.9% availability: A three-tier degradation strategy (primary → secondary → local model) keeps the business running when a provider has an outage
  • Cost attribution at team/project granularity: Track every team's and project's model consumption as precisely as a cloud bill, supporting internal chargeback and budget governance

Why You Need an LLM Gateway

The Multi-Model Management Challenge

Modern AI teams face a common set of operational pain points:

| Problem | Symptom | Impact |
|---|---|---|
| API Fragmentation | 10+ teams each integrating different model providers | Duplicated integration work, no unified observability |
| Cost Black Holes | No precise tracking of per-project model consumption | End-of-month bills 3-5x over budget |
| Vendor Lock-in | Business code tightly coupled to specific model APIs | Switching models requires rewriting the call layer |
| Inconsistent Limits | Teams managing their own API keys and quotas | One team's overuse throttles everyone |
| No Fault Tolerance | Model service downtime directly impacts production | No degradation strategy, single points of failure |

From Traditional API Gateway to LLM Gateway

Traditional API Gateways (Kong, Envoy, Nginx) have solved routing, rate limiting, and authentication for microservices. An LLM Gateway adapts these proven patterns to the large model domain while adding LLM-specific capabilities:

mermaid
graph LR
    A[Traditional API Gateway] --> B[Request Routing]
    A --> C[Request-Level Rate Limiting]
    A --> D[Authentication]
    A --> E[Load Balancing]
    F[LLM Gateway Additions] --> G[Token-Level Rate Limiting]
    F --> H[Intelligent Model Routing]
    F --> I[Semantic Caching]
    F --> J[Cost Tracking]
    F --> K[Prompt Security]
    F --> L[Streaming Response Handling]

LLM Gateway Architecture Overview

Core Components

mermaid
graph TB
    subgraph CL["Clients"]
        C1[Team A]
        C2[Team B]
        C3[Team C]
    end
    subgraph Gateway["LLM Gateway"]
        AUTH["Auth & Identity"]
        ROUTER[Intelligent Router]
        RL[Token Rate Limiter]
        CACHE[Semantic Cache]
        COST[Cost Tracker]
        FALL[Fallback Manager]
        OBS[Observability]
    end
    subgraph Providers["Model Providers"]
        P1[OpenAI GPT-4o]
        P2[Anthropic Claude]
        P3[Google Gemini]
        P4[Local Ollama]
    end
    C1 --> AUTH
    C2 --> AUTH
    C3 --> AUTH
    AUTH --> ROUTER
    ROUTER --> RL
    RL --> CACHE
    CACHE -->|Cache Miss| FALL
    FALL --> P1
    FALL --> P2
    FALL --> P3
    FALL --> P4
    COST --> OBS

Request Lifecycle

Every LLM request passes through the following stages within the Gateway:

mermaid
sequenceDiagram
    participant Client as Business Service
    participant GW as LLM Gateway
    participant Cache as Semantic Cache
    participant Router as Router
    participant Provider as Model Provider
    participant Cost as Cost Tracker
    Client->>GW: POST /v1/chat/completions
    GW->>GW: Authenticate & extract team/project ID
    GW->>GW: Token rate limit check
    GW->>Cache: Semantic similarity query
    alt Cache Hit
        Cache-->>Client: Return cached result
    else Cache Miss
        GW->>Router: Routing decision (model selection)
        Router->>Provider: Forward request
        Provider-->>GW: Return response
        GW->>Cache: Write to cache
        GW->>Cost: Record token consumption
        GW-->>Client: Return response
    end

Gateway Skeleton Implementation (Python)

python
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel
from typing import Optional
import time
import hashlib

app = FastAPI()

class ChatRequest(BaseModel):
    model: str
    messages: list[dict]
    temperature: float = 0.7
    max_tokens: Optional[int] = None
    metadata: Optional[dict] = None  # team_id, project_id

class GatewayConfig:
    def __init__(self):
        self.route_rules: dict = {}
        self.rate_limits: dict = {}
        self.fallback_chains: dict = {}
        self.cost_config: dict = {}

class LLMGateway:
    def __init__(self, config: GatewayConfig):
        self.config = config
        self.router = ModelRouter(config.route_rules)
        self.rate_limiter = TokenRateLimiter(config.rate_limits)
        self.cache = SemanticCache()
        self.cost_tracker = CostTracker(config.cost_config)
        self.fallback_manager = FallbackManager(config.fallback_chains)

    async def process_request(self, request: ChatRequest, team_id: str) -> dict:
        # 1. Rate limit check
        if not await self.rate_limiter.allow(team_id, request):
            raise HTTPException(429, "Token quota exceeded")

        # 2. Semantic cache lookup
        cached = await self.cache.get(request.messages)
        if cached:
            return cached

        # 3. Routing decision
        target_model = await self.router.route(request)

        # 4. Model call with fallback
        response = await self.fallback_manager.call_with_fallback(
            target_model, request
        )

        # 5. Cache write + cost recording (awaited inline here for simplicity;
        #    move both to background tasks so they do not delay the response)
        await self.cache.set(request.messages, response)
        await self.cost_tracker.record(team_id, target_model, response)

        return response
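
The skeleton above receives `team_id` as an argument, while the request lifecycle lists "Authenticate & extract team/project ID" as the first stage. A minimal sketch of that stage might look like the following. It assumes callers send a bearer token and that the Gateway keeps a table from hashed API keys to identities; `Identity`, `KEY_TABLE`, `resolve_identity`, and the demo keys are hypothetical names for illustration, not part of the skeleton.

```python
import hashlib
import hmac
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Identity:
    team_id: str
    project_id: str


def _digest(api_key: str) -> str:
    # Store only a hash of each key so the table never holds raw secrets.
    return hashlib.sha256(api_key.encode("utf-8")).hexdigest()


# Hypothetical key table: sha256(api_key) -> identity. A real deployment
# would load this from a secrets store or database.
KEY_TABLE = {
    _digest("sk-team-a-demo"): Identity("team-a", "search"),
    _digest("sk-team-b-demo"): Identity("team-b", "support-bot"),
}


def resolve_identity(headers: dict[str, str]) -> Optional[Identity]:
    """Return the caller's identity, or None if the key is missing or unknown."""
    auth = headers.get("authorization", "")
    scheme, _, token = auth.partition(" ")
    if scheme.lower() != "bearer" or not token:
        return None
    candidate = _digest(token)
    for stored, identity in KEY_TABLE.items():
        # Constant-time comparison avoids leaking how much of the digest matched.
        if hmac.compare_digest(stored, candidate):
            return identity
    return None
```

A request handler would call `resolve_identity` first, reject the request with 401 when it returns `None`, and otherwise pass `identity.team_id` into `process_request`.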

Model Routing Strategies

Content-Based Routing (Task Complexity)

The most effective cost-reduction strategy: route simple tasks to cheap small models, and only send complex tasks to expensive large models.

python
from enum import Enum
import tiktoken

class TaskComplexity(Enum):
    SIMPLE = "simple"      # Format conversion, simple Q&A
    MODERATE = "moderate"  # Summarization, translation, general reasoning
    COMPLEX = "complex"    # Code generation, math reasoning, long analysis

class ContentBasedRouter:
    """Intelligent routing based on content complexity"""

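    COMPLEXITY_SIGNALS = {
        "code_keywords": ["implement", "debug", "refactor", "algorithm"],
        # Matches map to MODERATE, which the enum above describes as
        # summarization, translation, and general reasoning
        "reasoning_keywords": ["analyze", "compare", "evaluate", "prove", "translate", "summarize"],
        # Not consulted by estimate_complexity: unmatched prompts default to SIMPLE
        "simple_keywords": ["format", "convert"],
    }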

    MODEL_MAP = {
        TaskComplexity.SIMPLE: "gpt-4o-mini",
        TaskComplexity.MODERATE: "claude-3-5-haiku",
        TaskComplexity.COMPLEX: "claude-sonnet-4",
    }

    def estimate_complexity(self, messages: list[dict]) -> TaskComplexity:
        last_message = messages[-1]["content"].lower()
        token_count = len(tiktoken.encoding_for_model("gpt-4").encode(last_message))

        # Long prompts typically indicate complex tasks
        if token_count > 2000:
            return TaskComplexity.COMPLEX

        # Keyword matching
        for keyword in self.COMPLEXITY_SIGNALS["code_keywords"]:
            if keyword in last_message:
                return TaskComplexity.COMPLEX

        for keyword in self.COMPLEXITY_SIGNALS["reasoning_keywords"]:
            if keyword in last_message:
                return TaskComplexity.MODERATE

        return TaskComplexity.SIMPLE

    async def route(self, request: "ChatRequest") -> str:
        # Respect explicit model selection before doing any classification work
        if request.model and request.model != "auto":
            return request.model

        complexity = self.estimate_complexity(request.messages)
        return self.MODEL_MAP[complexity]
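
How much complexity-based routing saves depends entirely on the traffic mix. The sketch below works through the arithmetic using the per-1M-token prices listed elsewhere in this article. The traffic mix (20% simple, 30% moderate, 50% complex) and the request size (1,000 input and 500 output tokens) are assumptions chosen for illustration, and `request_cost`, `blended_cost`, and `routing_savings` are hypothetical helper names.

```python
# Prices in USD per 1M tokens, copied from the pricing tables in this article.
PRICING = {
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "claude-3-5-haiku": {"input": 0.80, "output": 4.00},
    "claude-sonnet-4": {"input": 3.00, "output": 15.00},
}


def request_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Cost in USD of one request to the given model."""
    price = PRICING[model]
    return (input_tokens * price["input"] + output_tokens * price["output"]) / 1_000_000


def blended_cost(mix: dict[str, float], input_tokens: int, output_tokens: int) -> float:
    """Average cost per request when traffic is split across models.

    `mix` maps model name -> share of traffic; shares must sum to 1.
    """
    if abs(sum(mix.values()) - 1.0) > 1e-9:
        raise ValueError("traffic shares must sum to 1")
    return sum(
        share * request_cost(model, input_tokens, output_tokens)
        for model, share in mix.items()
    )


def routing_savings(
    mix: dict[str, float],
    baseline_model: str,
    input_tokens: int = 1000,
    output_tokens: int = 500,
) -> float:
    """Fraction saved versus sending every request to `baseline_model`."""
    baseline = request_cost(baseline_model, input_tokens, output_tokens)
    return 1.0 - blended_cost(mix, input_tokens, output_tokens) / baseline


# Assumed traffic mix, for illustration only.
ASSUMED_MIX = {"gpt-4o-mini": 0.2, "claude-3-5-haiku": 0.3, "claude-sonnet-4": 0.5}
```

With this assumed mix, `routing_savings(ASSUMED_MIX, "claude-sonnet-4")` comes out to roughly 0.41, that is, about 41% cheaper than routing everything to `claude-sonnet-4`. A workload with a larger share of simple requests saves more, and a workload dominated by complex requests saves less.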

Budget-Aware Routing

typescript
// TypeScript: Budget-aware router
interface BudgetConfig {
  teamId: string;
  monthlyBudget: number;       // Monthly budget in USD
  currentSpend: number;        // Current month spend
  alertThreshold: number;      // Alert threshold (0.8 = 80%)
  hardLimitThreshold: number;  // Hard limit threshold (0.95 = 95%)
}

interface ModelPricing {
  model: string;
  inputCostPer1K: number;   // Cost per 1K input tokens
  outputCostPer1K: number;  // Cost per 1K output tokens
}

class BudgetAwareRouter {
  private pricing: Map<string, ModelPricing> = new Map([
    ['gpt-4o', { model: 'gpt-4o', inputCostPer1K: 0.0025, outputCostPer1K: 0.01 }],
    ['gpt-4o-mini', { model: 'gpt-4o-mini', inputCostPer1K: 0.00015, outputCostPer1K: 0.0006 }],
    ['claude-sonnet-4', { model: 'claude-sonnet-4', inputCostPer1K: 0.003, outputCostPer1K: 0.015 }],
    ['claude-3-5-haiku', { model: 'claude-3-5-haiku', inputCostPer1K: 0.0008, outputCostPer1K: 0.004 }],
  ]);

  async route(request: ChatRequest, budget: BudgetConfig): Promise<string> {
    const spendRatio = budget.currentSpend / budget.monthlyBudget;

    // Over hard limit: force cheapest model
    if (spendRatio >= budget.hardLimitThreshold) {
      console.warn(`[Budget] Team ${budget.teamId} hit hard limit, forcing cheapest model`);
      return 'gpt-4o-mini';
    }

    // Over alert threshold: downgrade to mid-tier
    if (spendRatio >= budget.alertThreshold) {
      return this.selectMidTierModel(request);
    }

    // Budget available: route normally
    return request.model || 'gpt-4o';
  }

  private selectMidTierModel(request: ChatRequest): string {
    const isCodeTask = request.messages.some(m =>
      m.content.includes('```') || m.content.includes('code')
    );
    return isCodeTask ? 'claude-3-5-haiku' : 'gpt-4o-mini';
  }
}

A/B Testing and Canary Deployments for Models

python
import random
from dataclasses import dataclass

@dataclass
class CanaryRule:
    model_a: str           # Baseline model
    model_b: str           # Candidate model
    traffic_percent: float # Candidate traffic percentage (0-1)
    metric_key: str        # Evaluation metric

class CanaryRouter:
    """Model canary deployment router"""

    def __init__(self):
        self.active_canaries: dict[str, CanaryRule] = {}
        self.metrics_collector = MetricsCollector()

    async def route(self, request: "ChatRequest", canary_id: str) -> str:
        rule = self.active_canaries.get(canary_id)
        if not rule:
            return request.model

        # Allocate by traffic percentage
        if random.random() < rule.traffic_percent:
            selected = rule.model_b
            variant = "canary"
        else:
            selected = rule.model_a
            variant = "baseline"

        # Tag request for downstream analysis
        # (metadata is Optional on ChatRequest, so guard against None)
        self.metrics_collector.tag_request(
            request_id=(request.metadata or {}).get("request_id"),
            variant=variant,
            model=selected
        )
        return selected
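
The router above draws a fresh random number per request, so the same user can land on the baseline for one call and on the candidate for the next. If sticky assignment matters for the comparison, one common alternative is to hash a stable key into a bucket. The sketch below shows that variation; `canary_bucket`, `pick_variant`, and the choice of a user ID as the sticky key are assumptions, not part of the router above.

```python
import hashlib


def canary_bucket(sticky_key: str, canary_id: str) -> float:
    """Map a key to a stable value in [0, 1).

    Including canary_id in the hash means each experiment gets an
    independent split of the same user population.
    """
    digest = hashlib.sha256(f"{canary_id}:{sticky_key}".encode("utf-8")).digest()
    # 48 bits fit exactly in a float, so the result is always below 1.0.
    return int.from_bytes(digest[:6], "big") / 2**48


def pick_variant(
    sticky_key: str,
    canary_id: str,
    traffic_percent: float,
    model_a: str,
    model_b: str,
) -> tuple[str, str]:
    """Return (model, variant) with the same answer on every call for a key."""
    if canary_bucket(sticky_key, canary_id) < traffic_percent:
        return model_b, "canary"
    return model_a, "baseline"
```

Raising `traffic_percent` only moves additional keys into the canary group; keys already assigned to the candidate stay there, which keeps per-user metrics comparable as the rollout widens.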

Rate Limiting & Quota Management

Why Request-Level Limiting Falls Short

| Dimension | Request-Level Limiting | Token-Level Limiting |
|---|---|---|
| Granularity | Requests per minute (RPM) | Tokens per minute (TPM) |
| Cost Accuracy | Low: a 10K-token request counts the same as a 100-token one | High: precisely reflects actual resource consumption |
| Use Case | Prevent burst floods | Control actual costs |
| Implementation Complexity | Simple | Moderate (requires token counting) |

Best practice: enforce request-level and token-level limits together, so that a request must pass both checks before it reaches a model.
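
To make the dual-layer idea concrete, here is a minimal in-memory sketch that enforces an RPM cap and a TPM cap over the same sliding window. It is a single-process illustration only; `DualLayerLimiter` and its parameters are hypothetical names, and a multi-instance Gateway would need shared state such as Redis.

```python
import time
from collections import deque
from typing import Callable


class DualLayerLimiter:
    """In-memory sliding-window limiter enforcing RPM and TPM together."""

    def __init__(
        self,
        rpm: int,
        tpm: int,
        window_s: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.rpm = rpm
        self.tpm = tpm
        self.window_s = window_s
        self.clock = clock  # injectable so the window can be tested without sleeping
        self.events: dict[str, deque] = {}  # team_id -> deque of (timestamp, tokens)

    def allow(self, team_id: str, tokens: int) -> tuple[bool, str]:
        """Return (allowed, reason); reason is "ok", "rpm", or "tpm"."""
        now = self.clock()
        window = self.events.setdefault(team_id, deque())
        # Drop events that have slid out of the window.
        while window and window[0][0] <= now - self.window_s:
            window.popleft()
        # Layer 1: request count protects against bursts of small calls.
        if len(window) + 1 > self.rpm:
            return False, "rpm"
        # Layer 2: token total tracks what the requests actually cost.
        if sum(t for _, t in window) + tokens > self.tpm:
            return False, "tpm"
        window.append((now, tokens))
        return True, "ok"
```

Only accepted requests are recorded, so a rejected call does not consume quota from either layer.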

Redis-Based Token Rate Limiter

python
import redis.asyncio as redis
import time
import tiktoken

class TokenRateLimiter:
    """Sliding window token-based rate limiter"""

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.encoder = tiktoken.encoding_for_model("gpt-4")

    def count_tokens(self, messages: list[dict]) -> int:
        total = 0
        for msg in messages:
            total += len(self.encoder.encode(msg.get("content", "")))
            total += 4  # role + formatting overhead
        return total

    async def allow(self, team_id: str, request: "ChatRequest") -> bool:
        """Check if the request is within quota"""
        token_count = self.count_tokens(request.messages)
        now = time.time()
        window_key = f"ratelimit:token:{team_id}"
        window_size = 60  # 1-minute sliding window

        pipe = self.redis.pipeline()
        # Remove expired entries
        pipe.zremrangebyscore(window_key, 0, now - window_size)
        # Get current window token total
        pipe.zrangebyscore(window_key, now - window_size, now)
        results = await pipe.execute()

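        # ZRANGEBYSCORE returns members, not scores. Each member ends with its
        # token count ("<timestamp>:<tokens>"), so parse the last field.
        current_tokens = sum(
            int((m.decode() if isinstance(m, bytes) else m).rsplit(":", 1)[-1])
            for m in results[1]
        )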
        limit = await self.get_team_limit(team_id)

        if current_tokens + token_count > limit:
            return False

        # Record this request's token count
        await self.redis.zadd(window_key, {f"{now}:{token_count}": now})
        await self.redis.expire(window_key, window_size + 10)
        return True

    async def get_team_limit(self, team_id: str) -> int:
        """Get team's TPM limit"""
        config = await self.redis.hget("team_limits", team_id)
        return int(config) if config else 100_000  # Default 100K TPM

    async def record_actual_usage(self, team_id: str, actual_tokens: int):
        """Reconcile actual token usage after streaming response completes"""
        now = time.time()
        window_key = f"ratelimit:token:{team_id}"
        await self.redis.zadd(window_key, {f"{now}:output:{actual_tokens}": now})

Hierarchical Quota Management

typescript
// TypeScript: Hierarchical quota management
interface QuotaConfig {
  organization: {
    monthlyTokens: number;    // Org-level monthly quota
    maxConcurrent: number;    // Max concurrent requests
  };
  team: {
    dailyTokens: number;      // Team-level daily quota
    rpm: number;              // Requests per minute
    tpm: number;              // Tokens per minute
  };
  user: {
    hourlyTokens: number;     // User-level hourly quota
    rpm: number;
  };
}

class HierarchicalQuotaManager {
  private redis: Redis;

  async checkQuota(
    orgId: string,
    teamId: string,
    userId: string,
    estimatedTokens: number
  ): Promise<{ allowed: boolean; reason?: string }> {
    // Run all checks concurrently, then report the finest-grained failure first
    const checks = [
      this.checkUserQuota(userId, estimatedTokens),
      this.checkTeamQuota(teamId, estimatedTokens),
      this.checkOrgQuota(orgId, estimatedTokens),
    ];

    const results = await Promise.all(checks);

    for (const result of results) {
      if (!result.allowed) {
        return result;
      }
    }

    return { allowed: true };
  }

  private async checkTeamQuota(
    teamId: string,
    tokens: number
  ): Promise<{ allowed: boolean; reason?: string }> {
    const key = `quota:team:${teamId}:${this.getTodayKey()}`;
    const current = await this.redis.get(key);
    const used = parseInt(current || '0', 10);
    const limit = await this.getTeamDailyLimit(teamId);

    if (used + tokens > limit) {
      return {
        allowed: false,
        reason: `Team daily quota exceeded: ${used}/${limit} tokens`
      };
    }
    return { allowed: true };
  }

  private getTodayKey(): string {
    return new Date().toISOString().split('T')[0];
  }
}
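
The TypeScript manager above checks usage but does not show how usage is recorded once a request is admitted. One way to think about it is "reserve at every level, or at none of them". The Python sketch below illustrates that rule with in-memory counters; `QuotaLevel` and `check_and_reserve` are hypothetical names, and a Redis-backed version would need the check and the increment to happen atomically.

```python
from dataclasses import dataclass


@dataclass
class QuotaLevel:
    name: str   # e.g. "user", "team", "organization"
    limit: int  # tokens allowed in the current period
    used: int = 0

    def has_room(self, tokens: int) -> bool:
        return self.used + tokens <= self.limit


def check_and_reserve(levels: list[QuotaLevel], tokens: int) -> tuple[bool, str]:
    """Reserve `tokens` at every level, or at none of them.

    Pass levels ordered from finest to coarsest (user, team, organization)
    so the rejection reason names the most specific quota that was hit.
    """
    for level in levels:
        if not level.has_room(tokens):
            return False, f"{level.name} quota exceeded: {level.used}/{level.limit} tokens"
    # All levels have room: commit the reservation everywhere.
    for level in levels:
        level.used += tokens
    return True, ""
```

Checking every level before incrementing any of them avoids the case where a user-level counter is charged for a request that the team-level quota then rejects.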

Cost Management & Analytics

Real-Time Cost Tracking

python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import asyncio

@dataclass
class UsageRecord:
    timestamp: datetime
    team_id: str
    project_id: str
    model: str
    input_tokens: int
    output_tokens: int
    cost_usd: float
    latency_ms: int
    cache_hit: bool = False
    request_id: str = ""

class CostTracker:
    """Real-time cost tracking with budget enforcement"""

    PRICING = {
        "gpt-4o": {"input": 2.50, "output": 10.00},         # per 1M tokens
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "claude-sonnet-4": {"input": 3.00, "output": 15.00},
        "claude-3-5-haiku": {"input": 0.80, "output": 4.00},
        "gemini-2.5-pro": {"input": 1.25, "output": 10.00},
    }

    def __init__(self):
        self.records: list[UsageRecord] = []
        self.budget_alerts: dict[str, float] = {}

    def calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        pricing = self.PRICING.get(model)
        if not pricing:
            return 0.0
        input_cost = (input_tokens / 1_000_000) * pricing["input"]
        output_cost = (output_tokens / 1_000_000) * pricing["output"]
        return round(input_cost + output_cost, 6)

    async def record(
        self,
        team_id: str,
        project_id: str,
        model: str,
        input_tokens: int,
        output_tokens: int,
        latency_ms: int,
        cache_hit: bool = False,
    ):
        cost = 0.0 if cache_hit else self.calculate_cost(model, input_tokens, output_tokens)

        record = UsageRecord(
            timestamp=datetime.utcnow(),
            team_id=team_id,
            project_id=project_id,
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost_usd=cost,
            latency_ms=latency_ms,
            cache_hit=cache_hit,
        )
        self.records.append(record)

        # Check budget alerts
        await self._check_budget_alert(team_id, cost)

    async def _check_budget_alert(self, team_id: str, new_cost: float):
        monthly_spend = await self.get_monthly_spend(team_id)
        budget = self.budget_alerts.get(team_id, float("inf"))

        # Guard against a zero budget, which would otherwise raise ZeroDivisionError
        if budget > 0 and monthly_spend / budget > 0.8:
            await self._send_alert(team_id, monthly_spend, budget)

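    async def get_monthly_spend(self, team_id: str) -> float:
        # Compare year and month so records from the same month of a
        # previous year are not counted toward the current bill
        now = datetime.utcnow()
        return sum(
            r.cost_usd for r in self.records
            if r.team_id == team_id
            and (r.timestamp.year, r.timestamp.month) == (now.year, now.month)
        )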

Cost Dashboard Metrics

| Metric | Calculation | Alert Threshold |
|---|---|---|
| Average Cost per Request | Monthly total / request count | >20% month-over-month increase |
| Model Cost Distribution | Per-model spend / total spend | Single model >60% of total |
| Cache Savings | Cache hits × average request cost | Hit rate <10% triggers alert |
| Team Budget Burn Rate | Current month spend / monthly budget | >80% warning, >95% throttle |
| Token Efficiency | Effective output tokens / total tokens | <50% indicates over-prompting |
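
Several of these metrics can be derived directly from usage records. The sketch below computes four of them from a list of records for one month. The `Usage` dataclass is a trimmed-down, hypothetical stand-in for `UsageRecord`, and "average request cost" in the cache-savings formula is interpreted here as the average over non-cached requests, which is an assumption.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Usage:
    team_id: str
    model: str
    cost_usd: float
    cache_hit: bool = False


def dashboard(records: list[Usage], monthly_budget: float) -> dict:
    """Compute dashboard metrics for one team's records in one month."""
    paid = [r for r in records if not r.cache_hit]
    hits = len(records) - len(paid)
    total = sum(r.cost_usd for r in paid)
    avg_paid = total / len(paid) if paid else 0.0

    per_model: dict[str, float] = defaultdict(float)
    for r in paid:
        per_model[r.model] += r.cost_usd

    return {
        # Monthly total / request count (cache hits count as requests)
        "avg_cost_per_request": total / len(records) if records else 0.0,
        # Per-model spend / total spend
        "model_share": {m: c / total for m, c in per_model.items()} if total else {},
        "cache_hit_rate": hits / len(records) if records else 0.0,
        # Cache hits x average cost of a non-cached request
        "cache_savings_usd": hits * avg_paid,
        # Current month spend / monthly budget
        "budget_burn": total / monthly_budget if monthly_budget else 0.0,
    }
```

The returned values can be compared against the alert thresholds in the table, for example flagging any entry in `model_share` above 0.6 or a `budget_burn` above 0.8.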

Advanced Features

Semantic Caching

Traditional caching matches exact strings. Semantic caching uses vector similarity—"What's the weather in Beijing today" and "Today's Beijing weather?" can hit the same cache entry.

python
import numpy as np
from typing import Optional

class SemanticCache:
    """Vector similarity-based semantic cache"""

    def __init__(self, similarity_threshold: float = 0.93):
        self.threshold = similarity_threshold
        self.embeddings: list[np.ndarray] = []
        self.responses: list[dict] = []
        self.ttl_seconds = 3600  # 1 hour TTL

    async def get_embedding(self, text: str) -> np.ndarray:
        # Use lightweight embedding model
        # Recommended: text-embedding-3-small (low cost, fast)
        response = await embedding_client.create(
            model="text-embedding-3-small",
            input=text
        )
        return np.array(response.data[0].embedding)

    async def get(self, messages: list[dict]) -> Optional[dict]:
        # Skip the embedding call entirely when the cache is empty
        if not self.embeddings:
            return None

        query_text = messages[-1]["content"]
        query_embedding = await self.get_embedding(query_text)

        # Cosine similarity
        similarities = np.dot(self.embeddings, query_embedding) / (
            np.linalg.norm(self.embeddings, axis=1) * np.linalg.norm(query_embedding)
        )
        max_idx = np.argmax(similarities)
        max_sim = similarities[max_idx]

        if max_sim >= self.threshold:
            return self.responses[max_idx]
        return None

    async def set(self, messages: list[dict], response: dict):
        query_text = messages[-1]["content"]
        embedding = await self.get_embedding(query_text)
        self.embeddings.append(embedding)
        self.responses.append(response)

Automatic Fallback Chains

python
import asyncio
from dataclasses import dataclass

from fastapi import HTTPException

@dataclass
class FallbackChain:
    primary: str
    secondary: str
    tertiary: str
    timeout_ms: int = 30000

class FallbackManager:
    """Three-tier fallback manager"""

    DEFAULT_CHAINS = {
        "gpt-4o": FallbackChain("gpt-4o", "claude-sonnet-4", "gpt-4o-mini"),
        "claude-sonnet-4": FallbackChain("claude-sonnet-4", "gpt-4o", "claude-3-5-haiku"),
    }

    async def call_with_fallback(self, model: str, request: "ChatRequest") -> dict:
        chain = self.DEFAULT_CHAINS.get(model, FallbackChain(model, "gpt-4o-mini", "gpt-4o-mini"))

        for i, target in enumerate([chain.primary, chain.secondary, chain.tertiary]):
            try:
                response = await asyncio.wait_for(
                    self._call_model(target, request),
                    timeout=chain.timeout_ms / 1000
                )
                if i > 0:
                    # Record fallback event
                    await self._record_fallback(model, target, i)
                return response
            except Exception as e:  # asyncio.TimeoutError is a subclass of Exception
                if i == 2:  # All models failed
                    raise HTTPException(503, f"All models failed: {str(e)}") from e
                continue

    async def _call_model(self, model: str, request: "ChatRequest") -> dict:
        # Actual model provider API call
        ...

    async def _record_fallback(self, original: str, actual: str, level: int):
        # Record degradation metrics, trigger alerts
        ...
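
Because `_call_model` is left as a stub, the manager above cannot be run as-is. The standalone sketch below shows the same try-in-order idea with the provider call passed in as a function, which makes it easy to exercise with fake providers. `call_with_fallback` (as a free function), `AllProvidersFailed`, and the `call` parameter are hypothetical names, not the class's API.

```python
import asyncio
from typing import Awaitable, Callable


class AllProvidersFailed(Exception):
    """Raised when every model in the chain errored or timed out."""


async def call_with_fallback(
    chain: list[str],
    call: Callable[[str], Awaitable[dict]],  # assumed: model name -> response
    timeout_s: float = 30.0,
) -> tuple[str, dict]:
    """Try each model in order and return (model_used, response)."""
    errors = []
    # dict.fromkeys de-duplicates while preserving order, so a model that
    # appears twice in the chain is not retried after it has already failed.
    for model in dict.fromkeys(chain):
        try:
            response = await asyncio.wait_for(call(model), timeout=timeout_s)
            return model, response
        except Exception as exc:  # includes the timeout raised by wait_for
            errors.append(f"{model}: {exc!r}")
    raise AllProvidersFailed("; ".join(errors))
```

Returning the model that actually served the request lets the caller record fallback events and attribute cost to the right provider.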

Observability Integration (OpenTelemetry)

python
from opentelemetry import trace
from opentelemetry.trace import StatusCode

tracer = trace.get_tracer("llm-gateway")

class ObservableGateway:
    """Gateway with OpenTelemetry instrumentation"""

    async def process_request(self, request: "ChatRequest", team_id: str) -> dict:
        with tracer.start_as_current_span("gateway.process") as span:
            span.set_attribute("llm.team_id", team_id)
            span.set_attribute("llm.requested_model", request.model)
            span.set_attribute("llm.message_count", len(request.messages))

            # Routing decision
            with tracer.start_as_current_span("gateway.route") as route_span:
                target_model = await self.router.route(request)
                route_span.set_attribute("llm.routed_model", target_model)

            # Model inference
            with tracer.start_as_current_span("gateway.inference") as infer_span:
                response = await self.call_model(target_model, request)
                infer_span.set_attribute("llm.input_tokens", response["usage"]["input"])
                infer_span.set_attribute("llm.output_tokens", response["usage"]["output"])
                infer_span.set_attribute("llm.cost_usd", response["cost"])

            span.set_status(StatusCode.OK)
            return response

Open Source Solutions Comparison

| Feature | LiteLLM | Portkey | Helicone | Custom Build |
|---|---|---|---|---|
| Model Support | 100+ | 200+ | Major models | On demand |
| Unified API Format | ✅ OpenAI-compatible | ✅ OpenAI-compatible | ✅ Proxy mode | Custom |
| Load Balancing | | | | Custom |
| Semantic Caching | | | | Custom |
| Cost Tracking | ✅ Basic | ✅ Full dashboard | ✅ Visual | Custom |
| Fallback Chains | | | | Custom |
| Prompt Management | | | | Custom |
| Self-Hosted | ✅ Open source | ✅ Open core | ❌ SaaS only | |
| Production Readiness | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | Depends on investment |
| Team Size Fit | 5-50 | 10-500 | 1-20 | 50+ |

Selection Recommendations

  • Quick Start (ship in 1 week): LiteLLM — open source, one-line model switching
  • Enterprise Requirements: Portkey — complete Gateway + observability + prompt management
  • Observability Only: Helicone — zero-intrusion proxy mode, observe without managing
  • Deep Customization: Build on top of LiteLLM—keep the core routing logic, customize rate limiting and billing

Production Best Practices

1. Progressive Migration Strategy

code
Phase 1: Proxy mode (pass-through all requests, collect baseline data)
Phase 2: Enable caching (validate hit rates and correctness)
Phase 3: Enable routing (canary 10% traffic to intelligent routing)
Phase 4: Enable rate limiting (gradually tighten quotas)
Phase 5: Full rollout (close direct connection channels)

2. Critical Operations Metrics

| Metric | SLO | Alert Condition |
|---|---|---|
| Gateway P99 Latency | < 100ms (excluding model inference) | > 200ms |
| Cache Hit Rate | > 15% | < 5% sustained for 1 hour |
| Fallback Trigger Rate | < 1% | > 5% sustained for 5 minutes |
| Token Rate Limit Rejection | < 2% | > 10% |
| Model Call Success Rate | > 99.5% | < 98% |

3. Security Hardening

  • All requests pass through prompt injection detection (keyword + classifier dual defense)
  • Sensitive data (PII) is redacted before sending to external models
  • API key rotation with least-privilege principles
  • Audit logs capture all model calls (full prompts, encrypted at rest)
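
As an illustration of the PII redaction item, the sketch below masks two kinds of identifiers before messages leave the Gateway. The two regular expressions (email addresses and US-style SSNs) are deliberately simple assumptions and will miss many real-world formats; `PII_PATTERNS`, `redact`, and `redact_messages` are hypothetical names. Production systems usually combine patterns like these with a dedicated PII detection service.

```python
import re

# Illustrative patterns only; real deployments need a much broader set.
PII_PATTERNS = [
    ("EMAIL", re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+")),
    ("SSN", re.compile(r"\b\d{3}-\d{2}-\d{4}\b")),
]


def redact(text: str) -> str:
    """Replace each match with a placeholder such as [EMAIL]."""
    for label, pattern in PII_PATTERNS:
        text = pattern.sub(f"[{label}]", text)
    return text


def redact_messages(messages: list[dict]) -> list[dict]:
    """Return redacted copies of chat messages, leaving the originals untouched."""
    return [
        {**msg, "content": redact(msg["content"])}
        if isinstance(msg.get("content"), str)
        else dict(msg)
        for msg in messages
    ]
```

Keeping the originals untouched means the audit log and the outbound request can apply different policies to the same input.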

For more on prompt security, see Prompt Injection Attack and Defense Guide.


FAQ

Q1: How is an LLM Gateway different from a traditional API Gateway?

Traditional API Gateways handle deterministic HTTP requests with simple request-count rate limiting. LLM Gateways must handle non-deterministic model calls where rate limiting extends from request counts to token counts. They also manage streaming responses, intelligent model routing, semantic caching, and cost attribution—challenges that don't exist in traditional API management.

Q2: Should I build a custom LLM Gateway or use an open-source solution?

If your team uses fewer than 5 models with under 1M monthly calls, start with open-source solutions like LiteLLM or Portkey. When you exceed 10 models, need deep routing customization, or have strict data security requirements, build on top of open-source solutions rather than from scratch.

Q3: How should token-based and request-based rate limiting work together?

Deploy both simultaneously: request-based limiting (e.g., 100 RPM) as the first defense against burst traffic, and token-based limiting (e.g., 100K TPM) as the second line for actual cost control. For streaming requests, reconcile actual token consumption asynchronously after the response completes.

Q4: What cache hit rates can semantic caching achieve?

It depends on the use case. Customer support with repetitive questions can achieve 30-50% hit rates, while code generation typically falls below 5%. Set appropriate similarity thresholds (recommended 0.92-0.95) and configure them per use case for optimal results.

Q5: How do you achieve zero-downtime model switching?

Decouple routing rules from business code through the Gateway configuration. Business code declares intent (e.g., "I need a coding-capable model"), and the Gateway routes to the specific model based on current configuration. Switching models only requires a config update—no code changes, no downtime.
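
A minimal sketch of this idea: business code asks for a capability alias, and the Gateway resolves it to a concrete model from configuration that can be replaced at runtime. `AliasRouter`, the alias names, and the `reload` method are hypothetical and only illustrate the pattern.

```python
class AliasRouter:
    """Resolve capability aliases (e.g. "coding") to concrete model names."""

    def __init__(self, aliases: dict[str, str]):
        self._aliases = dict(aliases)

    def resolve(self, requested: str) -> str:
        # Unknown names pass through, so explicit model IDs keep working.
        return self._aliases.get(requested, requested)

    def reload(self, aliases: dict[str, str]) -> None:
        # Build the new mapping first, then swap the reference in a single
        # assignment so a lookup sees either the old or the new config.
        self._aliases = dict(aliases)


router = AliasRouter({"coding": "claude-sonnet-4", "cheap": "gpt-4o-mini"})
before = router.resolve("coding")

# Switching the model behind "coding" is a config update, not a code change.
router.reload({"coding": "gpt-4o", "cheap": "gpt-4o-mini"})
after = router.resolve("coding")
```

Callers that keep sending `"coding"` start receiving the new model on their next request, with no redeploy on their side.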


Summary

An LLM Gateway is essential infrastructure for the multi-model era. By providing a unified entry point for model routing, token-based rate limiting, cost tracking, and fallback management, enterprises can reduce LLM operational costs by 30-60% while improving availability from "hope for the best" to 99.9%.

Key design principles:

  1. Layered Rate Limiting: Request-level + token-level dual protection
  2. Intelligent Routing: Task complexity determines model selection, not manual specification
  3. Cost Visibility: Every dollar traceable to team and project
  4. Progressive Migration: Start with pass-through proxy, gradually enable advanced features
  5. Open Source First: Build on LiteLLM/Portkey rather than from scratch

Use JSON Formatter to debug Gateway config files and API responses, and Text Diff to compare outputs across different models.