TL;DR
When an enterprise uses GPT-4, Claude, Gemini, and open-source models at the same time and each team calls provider APIs directly, the result is uncontrolled costs, no unified monitoring, and no flexible way to switch models. An LLM Gateway applies the traditional API gateway pattern to the LLM domain: a single entry point that provides intelligent multi-model routing, token-based rate limiting, real-time cost tracking, and automatic fallback with graceful degradation. This guide walks from architecture design through code for each of these components.
This is article #18 in the AI Architect Course series. For observability integration, see Agent Observability Engineering.
Table of Contents
- Key Takeaways
- Why You Need an LLM Gateway
- LLM Gateway Architecture Overview
- Model Routing Strategies
- Rate Limiting & Quota Management
- Cost Management & Analytics
- Advanced Features
- Open Source Solutions Comparison
- Production Best Practices
- FAQ
- Summary
- Related Resources
Key Takeaways
- Unified entry point eliminates API fragmentation: Business code programs against the Gateway abstraction; underlying model providers can be hot-swapped without vendor lock-in
- Token-level rate limiting is essential for LLM workloads: Traditional request-count limiting cannot accurately control costs—one long prompt may consume 100x the tokens of a short request
- Intelligent routing can cut costs by 30-60%: Automatically select the most cost-effective model based on task complexity, so simple tasks go to small models and only complex tasks go to large ones
- Semantic caching is the most underrated optimization: In high-repetition scenarios, cache hit rates of 30-50% directly save the corresponding proportion of model invocation costs
- Fallback chains target 99.9% availability: A three-tier degradation strategy (primary → secondary → tertiary or local model) keeps requests flowing through most single-provider outages
- Cost attribution at team/project granularity: Track every team's and project's model consumption as precisely as a cloud bill, supporting internal chargeback and budget governance
Why You Need an LLM Gateway
The Multi-Model Management Challenge
Modern AI teams face a common set of operational pain points:
| Problem | Symptom | Impact |
|---|---|---|
| API Fragmentation | 10+ teams each integrating different model providers | Duplicated integration work, no unified observability |
| Cost Black Holes | No precise tracking of per-project model consumption | End-of-month bills 3-5x over budget |
| Vendor Lock-in | Business code tightly coupled to specific model APIs | Switching models requires rewriting the call layer |
| Inconsistent Limits | Teams managing their own API keys and quotas | One team's overuse throttles everyone |
| No Fault Tolerance | Model service downtime directly impacts production | No degradation strategy, single points of failure |
From Traditional API Gateway to LLM Gateway
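Traditional API Gateways (Kong, Envoy, Nginx) have solved routing, rate limiting, and authentication for microservices. An LLM Gateway adapts these proven patterns to the large model domain while adding LLM-specific capabilities: token-based rate limiting, streaming response handling, intelligent model routing, semantic caching, and cost attribution.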
LLM Gateway Architecture Overview
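Core Components
The Gateway is built from five components, each covered in a later section: a model router, a token rate limiter, a semantic cache, a cost tracker, and a fallback manager.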
Request Lifecycle
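Every LLM request passes through the following stages within the Gateway: rate limit check, semantic cache lookup, routing decision, model call with fallback, and finally cache write plus cost recording.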
Gateway Skeleton Implementation (Python)
from typing import Optional

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()


class ChatRequest(BaseModel):
    model: str
    messages: list[dict]
    temperature: float = 0.7
    max_tokens: Optional[int] = None
    metadata: Optional[dict] = None  # team_id, project_id


class GatewayConfig:
    def __init__(self):
        self.route_rules: dict = {}
        self.rate_limits: dict = {}
        self.fallback_chains: dict = {}
        self.cost_config: dict = {}


class LLMGateway:
    # The component classes are developed in the sections below; their
    # constructors and method signatures are simplified in this skeleton.
    def __init__(self, config: GatewayConfig):
        self.config = config
        self.router = ModelRouter(config.route_rules)
        self.rate_limiter = TokenRateLimiter(config.rate_limits)
        self.cache = SemanticCache()
        self.cost_tracker = CostTracker(config.cost_config)
        self.fallback_manager = FallbackManager(config.fallback_chains)

    async def process_request(self, request: ChatRequest, team_id: str) -> dict:
        # 1. Rate limit check
        if not await self.rate_limiter.allow(team_id, request):
            raise HTTPException(429, "Token quota exceeded")

        # 2. Semantic cache lookup
        cached = await self.cache.get(request.messages)
        if cached is not None:
            return cached

        # 3. Routing decision
        target_model = await self.router.route(request)

        # 4. Model call with fallback
        response = await self.fallback_manager.call_with_fallback(
            target_model, request
        )

        # 5. Cache write + cost recording. These are awaited inline here;
        #    scheduling them with asyncio.create_task() would take them off
        #    the request path.
        await self.cache.set(request.messages, response)
        await self.cost_tracker.record(team_id, target_model, response)
        return response
Model Routing Strategies
Content-Based Routing (Task Complexity)
The most effective cost-reduction strategy: route simple tasks to cheap small models, and only send complex tasks to expensive large models.
from enum import Enum

import tiktoken


class TaskComplexity(Enum):
    SIMPLE = "simple"      # Format conversion, simple Q&A
    MODERATE = "moderate"  # Summarization, translation, general reasoning
    COMPLEX = "complex"    # Code generation, math reasoning, long analysis


class ContentBasedRouter:
    """Intelligent routing based on content complexity"""

    COMPLEXITY_SIGNALS = {
        "code_keywords": ["implement", "debug", "refactor", "algorithm"],
        "reasoning_keywords": ["analyze", "compare", "evaluate", "prove"],
        # Summarization and translation are MODERATE per the enum above
        "moderate_keywords": ["translate", "summarize"],
        # Listed for documentation; anything unmatched defaults to SIMPLE
        "simple_keywords": ["format", "convert"],
    }

    MODEL_MAP = {
        TaskComplexity.SIMPLE: "gpt-4o-mini",
        TaskComplexity.MODERATE: "claude-3-5-haiku",
        TaskComplexity.COMPLEX: "claude-sonnet-4",
    }

    # Load the encoder once; tiktoken only approximates token counts
    # for non-OpenAI models.
    ENCODER = tiktoken.encoding_for_model("gpt-4")

    def estimate_complexity(self, messages: list[dict]) -> TaskComplexity:
        last_message = messages[-1]["content"].lower()
        token_count = len(self.ENCODER.encode(last_message))

        # Long prompts typically indicate complex tasks
        if token_count > 2000:
            return TaskComplexity.COMPLEX

        # Keyword matching (substring match, so "prove" also matches "improve")
        signals = self.COMPLEXITY_SIGNALS
        if any(k in last_message for k in signals["code_keywords"]):
            return TaskComplexity.COMPLEX
        moderate = signals["reasoning_keywords"] + signals["moderate_keywords"]
        if any(k in last_message for k in moderate):
            return TaskComplexity.MODERATE
        return TaskComplexity.SIMPLE

    async def route(self, request: "ChatRequest") -> str:
        # Respect explicit model selection before doing any estimation work
        if request.model and request.model != "auto":
            return request.model
        complexity = self.estimate_complexity(request.messages)
        return self.MODEL_MAP[complexity]
Budget-Aware Routing
// TypeScript: Budget-aware router

// Minimal request shape assumed by this example
interface ChatRequest {
  model?: string;
  messages: { role: string; content: string }[];
}

interface BudgetConfig {
  teamId: string;
  monthlyBudget: number;       // Monthly budget in USD
  currentSpend: number;        // Current month spend
  alertThreshold: number;      // Alert threshold (0.8 = 80%)
  hardLimitThreshold: number;  // Hard limit threshold (0.95 = 95%)
}

interface ModelPricing {
  model: string;
  inputCostPer1K: number;   // Cost per 1K input tokens
  outputCostPer1K: number;  // Cost per 1K output tokens
}

class BudgetAwareRouter {
  // Reference pricing (USD per 1K tokens); route() below does not consult it
  private pricing: Map<string, ModelPricing> = new Map([
    ['gpt-4o', { model: 'gpt-4o', inputCostPer1K: 0.0025, outputCostPer1K: 0.01 }],
    ['gpt-4o-mini', { model: 'gpt-4o-mini', inputCostPer1K: 0.00015, outputCostPer1K: 0.0006 }],
    ['claude-sonnet-4', { model: 'claude-sonnet-4', inputCostPer1K: 0.003, outputCostPer1K: 0.015 }],
    ['claude-3-5-haiku', { model: 'claude-3-5-haiku', inputCostPer1K: 0.0008, outputCostPer1K: 0.004 }],
  ]);

  async route(request: ChatRequest, budget: BudgetConfig): Promise<string> {
    const spendRatio = budget.currentSpend / budget.monthlyBudget;

    // Over hard limit: force cheapest model
    if (spendRatio >= budget.hardLimitThreshold) {
      console.warn(`[Budget] Team ${budget.teamId} hit hard limit, forcing cheapest model`);
      return 'gpt-4o-mini';
    }

    // Over alert threshold: downgrade to mid-tier
    if (spendRatio >= budget.alertThreshold) {
      return this.selectMidTierModel(request);
    }

    // Budget available: route normally
    return request.model || 'gpt-4o';
  }

  private selectMidTierModel(request: ChatRequest): string {
    const isCodeTask = request.messages.some(m =>
      m.content.includes('```') || m.content.includes('code')
    );
    return isCodeTask ? 'claude-3-5-haiku' : 'gpt-4o-mini';
  }
}
A/B Testing and Canary Deployments for Models
import random
from dataclasses import dataclass


@dataclass
class CanaryRule:
    model_a: str            # Baseline model
    model_b: str            # Candidate model
    traffic_percent: float  # Candidate traffic percentage (0-1)
    metric_key: str         # Evaluation metric


class CanaryRouter:
    """Model canary deployment router"""

    def __init__(self):
        self.active_canaries: dict[str, CanaryRule] = {}
        # MetricsCollector is the gateway's metrics component (not shown here)
        self.metrics_collector = MetricsCollector()

    async def route(self, request: "ChatRequest", canary_id: str) -> str:
        rule = self.active_canaries.get(canary_id)
        if not rule:
            return request.model

        # Allocate by traffic percentage
        if random.random() < rule.traffic_percent:
            selected = rule.model_b
            variant = "canary"
        else:
            selected = rule.model_a
            variant = "baseline"

        # Tag request for downstream analysis.
        # metadata is Optional on ChatRequest, so guard against None.
        metadata = request.metadata or {}
        self.metrics_collector.tag_request(
            request_id=metadata.get("request_id"),
            variant=variant,
            model=selected,
        )
        return selected
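Because `random.random()` is drawn per request, the same user can land on the baseline for one call and the canary for the next, which muddies any per-user comparison. A common alternative is deterministic bucketing by hashing a stable key. The sketch below assumes such a key exists (a user or session ID); `assign_variant` and `sticky_key` are illustrative names, not part of the router above.

```python
import hashlib


def assign_variant(sticky_key: str, canary_id: str, traffic_percent: float) -> str:
    """Deterministically map a stable key to "canary" or "baseline".

    sticky_key is a hypothetical stable identifier such as a user ID.
    """
    digest = hashlib.sha256(f"{canary_id}:{sticky_key}".encode("utf-8")).digest()
    # Use the first 4 bytes as an integer and scale into [0, 1).
    bucket = int.from_bytes(digest[:4], "big") / 2**32
    return "canary" if bucket < traffic_percent else "baseline"


if __name__ == "__main__":
    users = [f"user-{i}" for i in range(10_000)]
    share = sum(assign_variant(u, "exp-1", 0.1) == "canary" for u in users) / len(users)
    print(f"canary share: {share:.3f}")
```

Including `canary_id` in the hash keeps bucket assignments independent across experiments, so a user who is in the canary for one rollout is not automatically in the canary for the next.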
Rate Limiting & Quota Management
Why Request-Level Limiting Falls Short
| Dimension | Request-Level Limiting | Token-Level Limiting |
|---|---|---|
| Granularity | Requests per minute (RPM) | Tokens per minute (TPM) |
| Cost Accuracy | Low—a 10K token request counts the same as 100 tokens | High—precisely reflects actual resource consumption |
| Use Case | Prevent burst floods | Control actual costs |
| Implementation Complexity | Simple | Moderate (requires token counting) |
Best practice: use dual-layer limiting, with request-level and token-level limits enforced simultaneously.
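To make the dual-layer idea concrete, here is a minimal single-process sketch that enforces an RPM cap and a TPM cap over the same sliding window. It is an illustration only: `DualLayerLimiter` is a hypothetical name, state lives in memory, and a multi-replica gateway would need a shared store such as Redis.

```python
import time
from collections import deque
from typing import Optional


class DualLayerLimiter:
    """In-memory sliding-window limiter enforcing RPM and TPM together."""

    def __init__(self, rpm: int, tpm: int, window_seconds: float = 60.0):
        self.rpm = rpm
        self.tpm = tpm
        self.window = window_seconds
        self.events = deque()  # (timestamp, tokens) pairs, oldest first

    def allow(self, tokens: int, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        # Drop events that have left the window.
        while self.events and self.events[0][0] <= now - self.window:
            self.events.popleft()
        # Layer 1: request count.
        if len(self.events) + 1 > self.rpm:
            return False
        # Layer 2: token count.
        if sum(t for _, t in self.events) + tokens > self.tpm:
            return False
        self.events.append((now, tokens))
        return True
```

Checking the request layer first keeps the cheap test in front: a burst of tiny requests is rejected without summing tokens, while a single oversized prompt is still caught by the token layer.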
Redis-Based Token Rate Limiter
import time
import uuid

import redis.asyncio as redis
import tiktoken


class TokenRateLimiter:
    """Sliding window token-based rate limiter"""

    WINDOW_SECONDS = 60  # 1-minute sliding window

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        # tiktoken only approximates token counts for non-OpenAI models
        self.encoder = tiktoken.encoding_for_model("gpt-4")

    def count_tokens(self, messages: list[dict]) -> int:
        total = 0
        for msg in messages:
            total += len(self.encoder.encode(msg.get("content", "")))
            total += 4  # role + formatting overhead
        return total

    @staticmethod
    def _tokens_from_member(member) -> int:
        # Members end in ":<tokens>"; Redis returns bytes unless
        # decode_responses=True is set on the client.
        if isinstance(member, bytes):
            member = member.decode()
        return int(member.rsplit(":", 1)[-1])

    async def allow(self, team_id: str, request: "ChatRequest") -> bool:
        """Check if the request is within quota"""
        token_count = self.count_tokens(request.messages)
        now = time.time()
        window_key = f"ratelimit:token:{team_id}"

        pipe = self.redis.pipeline()
        # Remove expired entries
        pipe.zremrangebyscore(window_key, 0, now - self.WINDOW_SECONDS)
        # Get the members still inside the window
        pipe.zrangebyscore(window_key, now - self.WINDOW_SECONDS, now)
        results = await pipe.execute()

        # ZRANGEBYSCORE returns members, not scores (the score is the
        # timestamp), so the token count is parsed out of each member.
        current_tokens = sum(self._tokens_from_member(m) for m in results[1])
        limit = await self.get_team_limit(team_id)
        if current_tokens + token_count > limit:
            return False

        # Record this request's token count. The check above and this write
        # are not atomic, so concurrent requests can briefly overshoot.
        member = f"{now}:{uuid.uuid4().hex}:{token_count}"
        await self.redis.zadd(window_key, {member: now})
        await self.redis.expire(window_key, self.WINDOW_SECONDS + 10)
        return True

    async def get_team_limit(self, team_id: str) -> int:
        """Get team's TPM limit"""
        config = await self.redis.hget("team_limits", team_id)
        return int(config) if config else 100_000  # Default 100K TPM

    async def record_actual_usage(self, team_id: str, actual_tokens: int):
        """Reconcile actual token usage after streaming response completes"""
        now = time.time()
        window_key = f"ratelimit:token:{team_id}"
        await self.redis.zadd(window_key, {f"{now}:output:{actual_tokens}": now})
Hierarchical Quota Management
// TypeScript: Hierarchical quota management
interface QuotaConfig {
  organization: {
    monthlyTokens: number;  // Org-level monthly quota
    maxConcurrent: number;  // Max concurrent requests
  };
  team: {
    dailyTokens: number;    // Team-level daily quota
    rpm: number;            // Requests per minute
    tpm: number;            // Tokens per minute
  };
  user: {
    hourlyTokens: number;   // User-level hourly quota
    rpm: number;
  };
}

class HierarchicalQuotaManager {
  private redis: Redis;

  async checkQuota(
    orgId: string,
    teamId: string,
    userId: string,
    estimatedTokens: number
  ): Promise<{ allowed: boolean; reason?: string }> {
    // The three checks run in parallel; results are inspected from the
    // finest granularity first, so the most specific denial is reported.
    const checks = [
      this.checkUserQuota(userId, estimatedTokens),
      this.checkTeamQuota(teamId, estimatedTokens),
      this.checkOrgQuota(orgId, estimatedTokens),
    ];
    const results = await Promise.all(checks);
    for (const result of results) {
      if (!result.allowed) {
        return result;
      }
    }
    return { allowed: true };
  }

  private async checkTeamQuota(
    teamId: string,
    tokens: number
  ): Promise<{ allowed: boolean; reason?: string }> {
    const key = `quota:team:${teamId}:${this.getTodayKey()}`;
    const current = await this.redis.get(key);
    const used = parseInt(current || '0', 10);
    const limit = await this.getTeamDailyLimit(teamId);
    if (used + tokens > limit) {
      return {
        allowed: false,
        reason: `Team daily quota exceeded: ${used}/${limit} tokens`
      };
    }
    return { allowed: true };
  }

  // UTC date, e.g. "2025-01-31"
  private getTodayKey(): string {
    return new Date().toISOString().split('T')[0];
  }
}
Cost Management & Analytics
Real-Time Cost Tracking
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class UsageRecord:
    timestamp: datetime
    team_id: str
    project_id: str
    model: str
    input_tokens: int
    output_tokens: int
    cost_usd: float
    latency_ms: int
    cache_hit: bool = False
    request_id: str = ""


class CostTracker:
    """Real-time cost tracking with budget enforcement"""

    PRICING = {
        "gpt-4o": {"input": 2.50, "output": 10.00},  # per 1M tokens
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "claude-sonnet-4": {"input": 3.00, "output": 15.00},
        "claude-3-5-haiku": {"input": 0.80, "output": 4.00},
        "gemini-2.5-pro": {"input": 1.25, "output": 10.00},
    }

    def __init__(self):
        self.records: list[UsageRecord] = []
        self.budget_alerts: dict[str, float] = {}  # team_id -> monthly budget (USD)

    def calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        pricing = self.PRICING.get(model)
        if not pricing:
            # Unknown model: recorded as 0.0, which under-reports spend
            return 0.0
        input_cost = (input_tokens / 1_000_000) * pricing["input"]
        output_cost = (output_tokens / 1_000_000) * pricing["output"]
        return round(input_cost + output_cost, 6)

    async def record(
        self,
        team_id: str,
        project_id: str,
        model: str,
        input_tokens: int,
        output_tokens: int,
        latency_ms: int,
        cache_hit: bool = False,
    ):
        cost = 0.0 if cache_hit else self.calculate_cost(model, input_tokens, output_tokens)
        record = UsageRecord(
            timestamp=datetime.now(timezone.utc),  # utcnow() is deprecated since Python 3.12
            team_id=team_id,
            project_id=project_id,
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost_usd=cost,
            latency_ms=latency_ms,
            cache_hit=cache_hit,
        )
        self.records.append(record)
        # Check budget alerts
        await self._check_budget_alert(team_id, cost)

    async def _check_budget_alert(self, team_id: str, new_cost: float):
        budget = self.budget_alerts.get(team_id)
        if not budget:
            # No budget configured (or zero): nothing to compare against
            return
        monthly_spend = await self.get_monthly_spend(team_id)
        if monthly_spend / budget > 0.8:
            await self._send_alert(team_id, monthly_spend, budget)

    async def _send_alert(self, team_id: str, spend: float, budget: float):
        # Notify the team's channel (email, chat, pager); left to the integrator
        ...

    async def get_monthly_spend(self, team_id: str) -> float:
        now = datetime.now(timezone.utc)
        # Compare year and month so last year's records are not counted
        return sum(
            r.cost_usd for r in self.records
            if r.team_id == team_id
            and (r.timestamp.year, r.timestamp.month) == (now.year, now.month)
        )
Cost Dashboard Metrics
| Metric | Calculation | Alert Threshold |
|---|---|---|
| Average Cost per Request | Monthly total / request count | >20% month-over-month increase |
| Model Cost Distribution | Per-model spend / total spend | Single model >60% of total |
| Cache Savings | Cache hits × average request cost | Hit rate <10% triggers alert |
| Team Budget Burn Rate | Current month spend / monthly budget | >80% warning, >95% throttle |
| Token Efficiency | Effective output tokens / total tokens | <50% indicates over-prompting |
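As a rough illustration of how several of the metrics in the table could be derived from usage records, the sketch below computes average cost per request, per-model cost share, cache savings, and budget burn rate. The `Usage` record is a trimmed-down stand-in for `UsageRecord`, and "average request cost" for cache savings is interpreted here as the average cost of non-cached requests, which is an assumption.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Usage:
    model: str
    cost_usd: float
    cache_hit: bool


def dashboard_metrics(records: list[Usage], monthly_budget: float) -> dict:
    total = sum(r.cost_usd for r in records)
    paid = [r for r in records if not r.cache_hit]
    hits = len(records) - len(paid)
    # Assumption: a cache hit saves roughly one average paid request.
    avg_paid_cost = total / len(paid) if paid else 0.0

    by_model = defaultdict(float)
    for r in paid:
        by_model[r.model] += r.cost_usd

    return {
        "avg_cost_per_request": total / len(records) if records else 0.0,
        "model_cost_share": {m: c / total for m, c in by_model.items()} if total else {},
        "cache_savings_usd": hits * avg_paid_cost,
        "budget_burn_rate": total / monthly_budget if monthly_budget else 0.0,
    }
```

Feeding the resulting `budget_burn_rate` into the 80% and 95% thresholds from the table gives the warning and throttle signals; the month-over-month comparison needs the same calculation run on the previous month's records.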
Advanced Features
Semantic Caching
Traditional caching matches exact strings. Semantic caching uses vector similarity—"What's the weather in Beijing today" and "Today's Beijing weather?" can hit the same cache entry.
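import time
from typing import Optional

import numpy as np

# `embedding_client` is assumed to be an async embeddings client created
# elsewhere, for example `AsyncOpenAI().embeddings` from the `openai` package.


class SemanticCache:
    """Vector similarity-based semantic cache"""

    def __init__(self, similarity_threshold: float = 0.93):
        self.threshold = similarity_threshold
        self.embeddings: list[np.ndarray] = []
        self.responses: list[dict] = []
        self.created_at: list[float] = []
        self.ttl_seconds = 3600  # 1 hour TTL

    async def get_embedding(self, text: str) -> np.ndarray:
        # Use lightweight embedding model
        # Recommended: text-embedding-3-small (low cost, fast)
        response = await embedding_client.create(
            model="text-embedding-3-small",
            input=text,
        )
        return np.array(response.data[0].embedding)

    def _evict_expired(self) -> None:
        """Drop entries older than the TTL."""
        cutoff = time.time() - self.ttl_seconds
        keep = [i for i, created in enumerate(self.created_at) if created >= cutoff]
        self.embeddings = [self.embeddings[i] for i in keep]
        self.responses = [self.responses[i] for i in keep]
        self.created_at = [self.created_at[i] for i in keep]

    async def get(self, messages: list[dict]) -> Optional[dict]:
        # Only the last message is used as the key; earlier turns, the model,
        # and sampling parameters are ignored, so scope the cache accordingly.
        self._evict_expired()
        if not self.embeddings:
            return None
        query_embedding = await self.get_embedding(messages[-1]["content"])

        # Cosine similarity against every cached embedding
        matrix = np.vstack(self.embeddings)
        similarities = matrix @ query_embedding / (
            np.linalg.norm(matrix, axis=1) * np.linalg.norm(query_embedding)
        )
        max_idx = int(np.argmax(similarities))
        if similarities[max_idx] >= self.threshold:
            return self.responses[max_idx]
        return None

    async def set(self, messages: list[dict], response: dict):
        embedding = await self.get_embedding(messages[-1]["content"])
        self.embeddings.append(embedding)
        self.responses.append(response)
        self.created_at.append(time.time())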
Automatic Fallback Chains
import asyncio
from dataclasses import dataclass
from typing import Optional

from fastapi import HTTPException


@dataclass
class FallbackChain:
    primary: str
    secondary: str
    tertiary: str
    timeout_ms: int = 30000


class FallbackManager:
    """Three-tier fallback manager"""

    DEFAULT_CHAINS = {
        "gpt-4o": FallbackChain("gpt-4o", "claude-sonnet-4", "gpt-4o-mini"),
        "claude-sonnet-4": FallbackChain("claude-sonnet-4", "gpt-4o", "claude-3-5-haiku"),
    }

    async def call_with_fallback(self, model: str, request: "ChatRequest") -> dict:
        chain = self.DEFAULT_CHAINS.get(model, FallbackChain(model, "gpt-4o-mini", "gpt-4o-mini"))
        # Deduplicate while preserving order so no model is tried twice
        targets = list(dict.fromkeys([chain.primary, chain.secondary, chain.tertiary]))

        last_error: Optional[Exception] = None
        for level, target in enumerate(targets):
            try:
                response = await asyncio.wait_for(
                    self._call_model(target, request),
                    timeout=chain.timeout_ms / 1000,
                )
            except Exception as e:  # also covers asyncio.TimeoutError
                # Every failure falls through to the next tier, including
                # client errors that another model is unlikely to fix.
                last_error = e
                continue
            if level > 0:
                # Record fallback event
                await self._record_fallback(model, target, level)
            return response

        # All models failed
        raise HTTPException(503, f"All models failed: {last_error}")

    async def _call_model(self, model: str, request: "ChatRequest") -> dict:
        # Actual model provider API call
        ...

    async def _record_fallback(self, original: str, actual: str, level: int):
        # Record degradation metrics, trigger alerts
        ...
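It helps to see where an availability figure for a fallback chain comes from. Under the simplifying assumption that tiers fail independently, the chain is unavailable only when every tier is down at once, so its availability is one minus the product of the individual failure probabilities. The per-tier numbers below are illustrative, not measurements.

```python
def chain_availability(availabilities: list[float]) -> float:
    """Availability of a fallback chain, assuming independent failures."""
    all_fail = 1.0
    for a in availabilities:
        all_fail *= 1.0 - a  # probability that this tier is also down
    return 1.0 - all_fail


if __name__ == "__main__":
    # Three tiers at 99% each
    print(f"{chain_availability([0.99, 0.99, 0.99]):.6f}")
    # Two tiers at 99.5% and 99%
    print(f"{chain_availability([0.995, 0.99]):.6f}")
```

Real outages are rarely independent: tiers can share a cloud region, a network path, or the gateway itself, so the result is best read as an upper bound rather than a guarantee.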
Observability Integration (OpenTelemetry)
from opentelemetry import trace
from opentelemetry.trace import StatusCode

tracer = trace.get_tracer("llm-gateway")


class ObservableGateway:
    """Gateway with OpenTelemetry instrumentation"""

    async def process_request(self, request: "ChatRequest", team_id: str) -> dict:
        with tracer.start_as_current_span("gateway.process") as span:
            span.set_attribute("llm.team_id", team_id)
            span.set_attribute("llm.requested_model", request.model)
            span.set_attribute("llm.message_count", len(request.messages))

            # Routing decision
            with tracer.start_as_current_span("gateway.route") as route_span:
                target_model = await self.router.route(request)
                route_span.set_attribute("llm.routed_model", target_model)

            # Model inference
            with tracer.start_as_current_span("gateway.inference") as infer_span:
                response = await self.call_model(target_model, request)
                infer_span.set_attribute("llm.input_tokens", response["usage"]["input"])
                infer_span.set_attribute("llm.output_tokens", response["usage"]["output"])
                infer_span.set_attribute("llm.cost_usd", response["cost"])

            span.set_status(StatusCode.OK)
            return response
Open Source Solutions Comparison
| Feature | LiteLLM | Portkey | Helicone | Custom Build |
|---|---|---|---|---|
| Model Support | 100+ | 200+ | Major models | On demand |
| Unified API Format | ✅ OpenAI-compatible | ✅ OpenAI-compatible | ✅ Proxy mode | Custom |
| Load Balancing | ✅ | ✅ | ❌ | Custom |
| Semantic Caching | ✅ | ✅ | ❌ | Custom |
| Cost Tracking | ✅ Basic | ✅ Full dashboard | ✅ Visual | Custom |
| Fallback Chains | ✅ | ✅ | ❌ | Custom |
| Prompt Management | ❌ | ✅ | ❌ | Custom |
| Self-Hosted | ✅ Open source | ✅ Open core | ✅ Open source (hosted SaaS also available) | ✅ |
| Production Readiness | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | Depends on investment |
| Team Size Fit | 5-50 | 10-500 | 1-20 | 50+ |
Selection Recommendations
- Quick Start (ship in 1 week): LiteLLM — open source, one-line model switching
- Enterprise Requirements: Portkey — complete Gateway + observability + prompt management
- Observability Only: Helicone — zero-intrusion proxy mode, observe without managing
- Deep Customization: Build on top of LiteLLM—keep the core routing logic, customize rate limiting and billing
Production Best Practices
1. Progressive Migration Strategy
Phase 1: Proxy mode (pass-through all requests, collect baseline data)
Phase 2: Enable caching (validate hit rates and correctness)
Phase 3: Enable routing (canary 10% traffic to intelligent routing)
Phase 4: Enable rate limiting (gradually tighten quotas)
Phase 5: Full rollout (close direct connection channels)
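One way to make the phases operational is to express each as a set of feature flags that the Gateway reads at startup, so that moving to the next phase is a configuration change. The flag names below are hypothetical, and keeping routing at 10% until the final phase is an assumption; the ramp between Phase 3 and Phase 5 is up to the operator.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class GatewayFeatures:
    caching: bool = False
    routing_percent: float = 0.0   # share of traffic using intelligent routing
    rate_limiting: bool = False
    block_direct_access: bool = False


# Hypothetical mapping of the five phases to feature flags.
PHASES = {
    1: GatewayFeatures(),                                   # pass-through proxy
    2: GatewayFeatures(caching=True),
    3: GatewayFeatures(caching=True, routing_percent=0.10),
    4: GatewayFeatures(caching=True, routing_percent=0.10, rate_limiting=True),
    5: GatewayFeatures(caching=True, routing_percent=1.0, rate_limiting=True,
                       block_direct_access=True),
}


def features_for(phase: int) -> GatewayFeatures:
    if phase not in PHASES:
        raise ValueError(f"Unknown rollout phase: {phase}")
    return PHASES[phase]
```

Rolling back is then symmetric: dropping the phase number by one disables exactly the feature that was most recently enabled.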
2. Critical Operations Metrics
| Metric | SLO | Alert Condition |
|---|---|---|
| Gateway P99 Latency | < 100ms (excluding model inference) | > 200ms |
| Cache Hit Rate | > 15% | < 5% sustained for 1 hour |
| Fallback Trigger Rate | < 1% | > 5% sustained for 5 minutes |
| Token Rate Limit Rejection | < 2% | > 10% |
| Model Call Success Rate | > 99.5% | < 98% |
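Several alert conditions in the table are "sustained" conditions, which differ from a simple threshold check: the alert should fire only if the metric stayed above the threshold for the whole period. A minimal sketch of that evaluation follows; `Sample` and `sustained_breach` are illustrative names, and samples are assumed to arrive at a regular interval.

```python
from dataclasses import dataclass


@dataclass
class Sample:
    timestamp: float  # seconds
    value: float      # e.g. fallback trigger rate in [0, 1]


def sustained_breach(samples: list[Sample], threshold: float,
                     duration_s: float, now: float) -> bool:
    """True if every sample in the last duration_s exceeds the threshold."""
    window_start = now - duration_s
    in_window = [s for s in samples if window_start <= s.timestamp <= now]
    # Require data reaching back to the start of the window; otherwise the
    # breach has not yet been observed for the full duration.
    has_full_history = any(s.timestamp <= window_start for s in samples)
    return (
        bool(in_window)
        and has_full_history
        and all(s.value > threshold for s in in_window)
    )
```

For the fallback row, this would be called with `threshold=0.05` and `duration_s=300`; a single healthy sample inside the window resets the condition.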
3. Security Hardening
- All requests pass through prompt injection detection (keyword + classifier dual defense)
- Sensitive data (PII) is redacted before sending to external models
- API key rotation with least-privilege principles
- Audit logs capture all model calls (full prompts, encrypted at rest)
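As a small illustration of the PII redaction step, the sketch below replaces a few obvious patterns with placeholders before messages leave the Gateway. The regular expressions are deliberately simple and are assumptions for demonstration; production redaction usually needs a dedicated detection library and locale-aware rules.

```python
import re

# Illustrative patterns only. Order matters: the SSN pattern runs before the
# broader phone pattern, which would otherwise match SSN-shaped strings.
PII_PATTERNS = {
    "EMAIL": re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"),
    "SSN": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
    "PHONE": re.compile(r"\+?\d[\d\s().-]{7,}\d"),
}


def redact(text: str) -> str:
    """Replace each matched pattern with a bracketed label."""
    for label, pattern in PII_PATTERNS.items():
        text = pattern.sub(f"[{label}]", text)
    return text


def redact_messages(messages: list[dict]) -> list[dict]:
    """Return copies of the messages with redacted content."""
    return [{**m, "content": redact(m.get("content", ""))} for m in messages]


if __name__ == "__main__":
    print(redact("Contact jane@example.com or 123-45-6789"))
```

Returning copies rather than mutating in place lets the audit log and the outbound request hold different views of the same message if policy requires it.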
For more on prompt security, see Prompt Injection Attack and Defense Guide.
FAQ
Q1: How is an LLM Gateway different from a traditional API Gateway?
Traditional API Gateways handle deterministic HTTP requests with simple request-count rate limiting. LLM Gateways must handle non-deterministic model calls where rate limiting extends from request counts to token counts. They also manage streaming responses, intelligent model routing, semantic caching, and cost attribution—challenges that don't exist in traditional API management.
Q2: Should I build a custom LLM Gateway or use an open-source solution?
If your team uses fewer than 5 models with under 1M monthly calls, start with open-source solutions like LiteLLM or Portkey. When you exceed 10 models, need deep routing customization, or have strict data security requirements, build on top of open-source solutions rather than from scratch.
Q3: How should token-based and request-based rate limiting work together?
Deploy both simultaneously: request-based limiting (e.g., 100 RPM) as the first defense against burst traffic, and token-based limiting (e.g., 100K TPM) as the second line for actual cost control. For streaming requests, reconcile actual token consumption asynchronously after the response completes.
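The reconciliation step for streaming can be pictured as reserve-then-correct: charge an estimate at admission time, then replace it with the real figure once the stream ends. The sketch below shows only that bookkeeping; `TokenBudget` is a hypothetical name and window expiry is omitted for brevity.

```python
class TokenBudget:
    """Tracks tokens charged in the current window (expiry omitted)."""

    def __init__(self, tpm_limit: int):
        self.tpm_limit = tpm_limit
        self.used = 0

    def reserve(self, estimated_tokens: int) -> bool:
        """Admission check: charge the estimate up front."""
        if self.used + estimated_tokens > self.tpm_limit:
            return False
        self.used += estimated_tokens
        return True

    def reconcile(self, estimated_tokens: int, actual_tokens: int) -> None:
        """After the stream ends, replace the estimate with actual usage."""
        self.used = max(0, self.used + actual_tokens - estimated_tokens)
```

Over-estimating at admission is the conservative choice: reconciliation then releases unused headroom, whereas under-estimating lets a burst of long completions overshoot the limit before the correction lands.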
Q4: What cache hit rates can semantic caching achieve?
It depends on the use case. Customer support with repetitive questions can achieve 30-50% hit rates, while code generation typically falls below 5%. Set appropriate similarity thresholds (recommended 0.92-0.95) and configure them per use case for optimal results.
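Configuring the threshold per use case might look like the following sketch, which keeps a stricter threshold where a wrong cache hit is costly. The use-case names and specific values are hypothetical, chosen within the 0.92-0.95 range mentioned above.

```python
import math

# Hypothetical per-use-case thresholds.
THRESHOLDS = {"customer_support": 0.92, "code_generation": 0.95}
DEFAULT_THRESHOLD = 0.93


def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def is_cache_hit(query_vec: list[float], cached_vec: list[float], use_case: str) -> bool:
    threshold = THRESHOLDS.get(use_case, DEFAULT_THRESHOLD)
    return cosine_similarity(query_vec, cached_vec) >= threshold
```

The same pair of embeddings can therefore be a hit for a support bot and a miss for code generation, where near-duplicate prompts often need different answers.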
Q5: How do you achieve zero-downtime model switching?
Decouple routing rules from business code through the Gateway configuration. Business code declares intent (e.g., "I need a coding-capable model"), and the Gateway routes to the specific model based on current configuration. Switching models only requires a config update—no code changes, no downtime.
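A minimal sketch of intent-based resolution, assuming the routing table is delivered as JSON: callers pass an intent, and swapping the model behind it is a `reload` call. `IntentRouter` and the intent names are illustrative.

```python
import json


class IntentRouter:
    """Resolves a declared intent to a concrete model via configuration."""

    def __init__(self, routes: dict[str, str]):
        self._routes = dict(routes)

    def reload(self, config_json: str) -> None:
        """Swap the routing table; callers keep sending the same intent."""
        self._routes = json.loads(config_json)

    def resolve(self, intent: str) -> str:
        try:
            return self._routes[intent]
        except KeyError:
            raise ValueError(f"No model configured for intent {intent!r}") from None


if __name__ == "__main__":
    router = IntentRouter({"coding": "claude-sonnet-4"})
    print(router.resolve("coding"))
    router.reload('{"coding": "gpt-4o"}')
    print(router.resolve("coding"))
```

Because `reload` replaces the table in a single assignment, in-flight requests see either the old mapping or the new one, never a partially updated table.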
Summary
An LLM Gateway is essential infrastructure for the multi-model era. By providing a unified entry point for model routing, token-based rate limiting, cost tracking, and fallback management, enterprises can cut LLM operational costs by an estimated 30-60% and move availability from "hope for the best" toward a 99.9% target.
Key design principles:
- Layered Rate Limiting: Request-level + token-level dual protection
- Intelligent Routing: Task complexity determines model selection, not manual specification
- Cost Visibility: Every dollar traceable to team and project
- Progressive Migration: Start with pass-through proxy, gradually enable advanced features
- Open Source First: Build on LiteLLM/Portkey rather than from scratch
Related Resources
Use JSON Formatter to debug Gateway config files and API responses, and Text Diff to compare outputs across different models.