核心摘要
Mixture of Agents(MoA) 是 Together AI 于 2024 年提出的多模型协作架构,核心思想是让多个 LLM 分层协作——底层 Proposer 生成多样化候选回答,上层 Aggregator 综合多个视角产出最终高质量结果。本文从论文原理出发,提供完整的 Python + TypeScript 生产级实现,涵盖 GPT-4o、Claude、Gemini 的联合编排,以及延迟优化、成本控制和容错降级策略。
目录
核心要点
- MoA 核心原理:多个 LLM 分层协作,Proposer 层提供多样性视角,Aggregator 层进行综合融合
- 与 MoE 的本质区别:MoE 是模型内部稀疏激活,MoA 是多个完整模型的外部编排协作
- 质量提升显著:在 AlpacaEval 2.0 上超越 GPT-4o 单模型 8.3 个百分点(65.8% vs 57.5%)
- 并行执行是关键:同层 Proposer 并行调用,实际延迟仅为最慢模型时间而非所有模型总和
- 成本可控:通过动态路由、缓存和模型选择策略,生产级 MoA 的成本可控制在单模型的 2-3 倍
什么是 Mixture of Agents
起源:Together AI 的 MoA 论文
2024 年,Together AI 发表了论文 "Mixture-of-Agents Enhances Large Language Model Capabilities",提出了一种利用多个 LLM 分层协作来超越任何单一模型性能上限的方法。其核心发现是 LLM 具有"合作性"(Collaborativeness)——一个模型在看到其他模型的输出后,倾向于生成更好的回答。
MoA 与 MoE 的区别
| 维度 | Mixture of Experts (MoE) | Mixture of Agents (MoA) |
|---|---|---|
| 作用层级 | 模型内部架构 | 模型间外部编排 |
| 组成单元 | Expert 子网络(FFN 层) | 完整的 LLM 模型实例 |
| 路由机制 | Token 级 Router 网络 | Task 级策略编排 |
| 激活方式 | 稀疏激活(Top-K Expert) | 全量激活所有 Proposer |
| 训练方式 | 端到端联合训练 | 无需训练,Prompt 驱动 |
| 典型代表 | Mixtral、DeepSeek-V2 | Together MoA、自建多模型管道 |
核心思想:分层协作
MoA 的灵感来自群体智慧——多个独立思考者的综合判断通常优于单个专家。在 LLM 场景下,这一原理体现为:
- 多样性生成:不同模型由于训练数据、架构和对齐策略的差异,对同一问题给出不同角度的回答
- 质量聚合:Aggregator 模型综合多个视角,取长补短,生成比任何单一 Proposer 更全面的最终回答
- 迭代精炼:多层堆叠允许逐步精炼,每一层都在前一层的基础上提升质量
MoA 架构深度解析
分层管道结构
MoA 采用分层管道(Layered Pipeline)架构,核心角色包括:
- Proposer(提议者):接收原始 query,独立生成候选回答
- Aggregator(聚合者):接收多个 Proposer 的输出,综合生成更优回答
- Final Synthesizer(最终合成器):顶层 Aggregator,输出最终面向用户的答案
通信协议
层与层之间的通信采用结构化的 Prompt 模板:
AGGREGATOR_PROMPT = """你是一个高质量回答的综合器。
以下是多位 AI 助手对同一问题的独立回答:
{proposer_responses}
请综合以上所有回答的优点,生成一个更全面、更准确、更有深度的最终回答。
要求:
1. 保留各回答中正确且独特的见解
2. 消除矛盾信息,选择更可靠的说法
3. 补充任何遗漏的重要信息
4. 使用清晰的结构组织最终回答
"""
模型角色分配策略
不同模型有不同的优势领域,MoA 架构应利用这种互补性:
| 模型 | 擅长领域 | 推荐角色 |
|---|---|---|
| GPT-4o | 指令遵循、代码生成 | Proposer + Aggregator |
| Claude 3.5 Sonnet | 长文本分析、创意写作 | Proposer + Final Synthesizer |
| Gemini 1.5 Pro | 多模态理解、长上下文 | Proposer |
| Llama 3.1 70B | 推理、数学 | Proposer(成本友好) |
实现:构建 MoA 系统
Python 实现
以下是一个完整的生产级 MoA 实现,支持并行调用和容错处理:
import asyncio
from dataclasses import dataclass
from typing import Optional
from openai import AsyncOpenAI
from anthropic import AsyncAnthropic
import google.generativeai as genai
@dataclass
class ProposerResponse:
model: str
content: str
latency_ms: float
success: bool
@dataclass
class MoAConfig:
min_proposers: int = 3
proposer_timeout: float = 30.0
max_layers: int = 3
class MixtureOfAgents:
def __init__(self, config: MoAConfig = MoAConfig()):
self.config = config
self.openai = AsyncOpenAI()
self.anthropic = AsyncAnthropic()
async def _call_openai(self, prompt: str, model: str = "gpt-4o") -> ProposerResponse:
import time
start = time.time()
try:
response = await asyncio.wait_for(
self.openai.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=2048,
),
timeout=self.config.proposer_timeout,
)
return ProposerResponse(
model=model,
content=response.choices[0].message.content,
latency_ms=(time.time() - start) * 1000,
success=True,
)
except Exception as e:
return ProposerResponse(
model=model, content="", latency_ms=(time.time() - start) * 1000, success=False
)
async def _call_anthropic(self, prompt: str, model: str = "claude-sonnet-4-20250514") -> ProposerResponse:
import time
start = time.time()
try:
response = await asyncio.wait_for(
self.anthropic.messages.create(
model=model,
max_tokens=2048,
messages=[{"role": "user", "content": prompt}],
),
timeout=self.config.proposer_timeout,
)
return ProposerResponse(
model=model,
content=response.content[0].text,
latency_ms=(time.time() - start) * 1000,
success=True,
)
except Exception as e:
return ProposerResponse(
model=model, content="", latency_ms=(time.time() - start) * 1000, success=False
)
async def propose(self, query: str) -> list[ProposerResponse]:
"""Layer 1: 并行调用多个 Proposer"""
tasks = [
self._call_openai(query, "gpt-4o"),
self._call_anthropic(query, "claude-sonnet-4-20250514"),
self._call_openai(query, "gpt-4o-mini"),
]
responses = await asyncio.gather(*tasks, return_exceptions=True)
valid = [r for r in responses if isinstance(r, ProposerResponse) and r.success]
if len(valid) < self.config.min_proposers:
raise RuntimeError(
f"Only {len(valid)} proposers succeeded, need {self.config.min_proposers}"
)
return valid
async def aggregate(self, query: str, proposals: list[ProposerResponse]) -> str:
"""Layer 2+: 聚合多个 Proposer 的输出"""
proposals_text = "\n\n".join(
f"--- 回答来自 {p.model} ---\n{p.content}" for p in proposals
)
aggregation_prompt = f"""你是一个高质量回答的综合器。
原始问题:{query}
以下是多位 AI 助手对同一问题的独立回答:
{proposals_text}
请综合以上所有回答的优点,生成一个更全面、更准确的最终回答。
保留各回答中正确且独特的见解,消除矛盾,补充遗漏。"""
response = await self._call_anthropic(aggregation_prompt, "claude-sonnet-4-20250514")
return response.content
async def run(self, query: str) -> str:
"""执行完整的 MoA 管道"""
proposals = await self.propose(query)
result = await self.aggregate(query, proposals)
return result
# 使用示例
async def main():
moa = MixtureOfAgents(MoAConfig(min_proposers=2))
result = await moa.run("解释量子计算中的量子纠缠原理及其在量子通信中的应用")
print(result)
if __name__ == "__main__":
asyncio.run(main())
TypeScript 实现
import OpenAI from "openai";
import Anthropic from "@anthropic-ai/sdk";
interface ProposerResponse {
model: string;
content: string;
latencyMs: number;
success: boolean;
}
interface MoAConfig {
minProposers: number;
proposerTimeoutMs: number;
maxLayers: number;
}
class MixtureOfAgents {
private openai: OpenAI;
private anthropic: Anthropic;
private config: MoAConfig;
constructor(config: Partial<MoAConfig> = {}) {
this.config = {
minProposers: 3,
proposerTimeoutMs: 30000,
maxLayers: 3,
...config,
};
this.openai = new OpenAI();
this.anthropic = new Anthropic();
}
private async callOpenAI(
prompt: string,
model = "gpt-4o"
): Promise<ProposerResponse> {
const start = Date.now();
try {
const response = await Promise.race([
this.openai.chat.completions.create({
model,
messages: [{ role: "user", content: prompt }],
temperature: 0.7,
max_tokens: 2048,
}),
new Promise<never>((_, reject) =>
setTimeout(() => reject(new Error("Timeout")), this.config.proposerTimeoutMs)
),
]);
return {
model,
content: response.choices[0].message.content ?? "",
latencyMs: Date.now() - start,
success: true,
};
} catch {
return { model, content: "", latencyMs: Date.now() - start, success: false };
}
}
private async callAnthropic(
prompt: string,
model = "claude-sonnet-4-20250514"
): Promise<ProposerResponse> {
const start = Date.now();
try {
const response = await this.anthropic.messages.create({
model,
max_tokens: 2048,
messages: [{ role: "user", content: prompt }],
});
return {
model,
content: response.content[0].type === "text" ? response.content[0].text : "",
latencyMs: Date.now() - start,
success: true,
};
} catch {
return { model, content: "", latencyMs: Date.now() - start, success: false };
}
}
async propose(query: string): Promise<ProposerResponse[]> {
const tasks = [
this.callOpenAI(query, "gpt-4o"),
this.callAnthropic(query, "claude-sonnet-4-20250514"),
this.callOpenAI(query, "gpt-4o-mini"),
];
const responses = await Promise.allSettled(tasks);
const valid = responses
.filter((r): r is PromiseFulfilledResult<ProposerResponse> =>
r.status === "fulfilled" && r.value.success
)
.map((r) => r.value);
if (valid.length < this.config.minProposers) {
throw new Error(
`Only ${valid.length} proposers succeeded, need ${this.config.minProposers}`
);
}
return valid;
}
async aggregate(query: string, proposals: ProposerResponse[]): Promise<string> {
const proposalsText = proposals
.map((p) => `--- Response from ${p.model} ---\n${p.content}`)
.join("\n\n");
const aggregationPrompt = `You are a high-quality answer synthesizer.
Original question: ${query}
Below are independent responses from multiple AI assistants:
${proposalsText}
Synthesize the best aspects of all responses into a comprehensive final answer.
Retain unique correct insights, resolve contradictions, and fill gaps.`;
const response = await this.callAnthropic(aggregationPrompt, "claude-sonnet-4-20250514");
return response.content;
}
async run(query: string): Promise<string> {
const proposals = await this.propose(query);
return this.aggregate(query, proposals);
}
}
// 使用示例
const moa = new MixtureOfAgents({ minProposers: 2 });
const result = await moa.run("Explain quantum entanglement and its applications in quantum communication");
console.log(result);
Proposer 和 Aggregator 的 Prompt 工程
有效的 Prompt 设计是 MoA 质量的关键。不同角色需要不同的 Prompt 策略:
# Proposer Prompt:鼓励独特视角
PROPOSER_SYSTEM_PROMPTS = {
"analytical": "你是一位注重逻辑严谨和数据支撑的分析师。请从数据和证据的角度回答问题。",
"creative": "你是一位富有创意的思考者。请提供新颖独特的视角和类比来解答问题。",
"practical": "你是一位注重实践的工程师。请从可操作性和实际应用的角度回答问题。",
"critical": "你是一位严格的审查者。请指出问题中可能的陷阱和常见误解。",
}
# Aggregator Prompt:结构化综合
AGGREGATOR_TEMPLATE = """你是一位资深的知识综合专家。
## 任务
综合以下多位专家的独立回答,生成一个权威的最终回答。
## 原始问题
{query}
## 专家回答
{responses}
## 综合要求
1. 识别各回答中的共识点——这些很可能是正确的
2. 对于矛盾点,选择论证更充分的说法
3. 合并各回答的独特贡献
4. 确保最终回答逻辑连贯、结构清晰
5. 标注不确定的信息
请输出综合后的最终回答:"""
高级模式
基于任务复杂度的动态路由
并非所有 query 都需要完整的多层 MoA 管道。简单问题应直接路由到单模型,复杂问题才启用多层协作:
class DynamicMoARouter:
"""根据任务复杂度动态选择 MoA 层数"""
async def classify_complexity(self, query: str) -> str:
response = await self.openai.chat.completions.create(
model="gpt-4o-mini",
messages=[{
"role": "system",
"content": "判断问题复杂度。返回: simple/medium/complex"
}, {
"role": "user",
"content": query
}],
max_tokens=10,
)
return response.choices[0].message.content.strip().lower()
async def route(self, query: str) -> str:
complexity = await self.classify_complexity(query)
if complexity == "simple":
# 单模型直答
resp = await self._call_openai(query, "gpt-4o-mini")
return resp.content
elif complexity == "medium":
# 单层 MoA(3 Proposer + 1 Aggregator)
proposals = await self.propose(query)
return await self.aggregate(query, proposals)
else:
# 多层 MoA(3 Proposer + 2 Aggregator + 1 Synthesizer)
proposals = await self.propose(query)
agg_results = await asyncio.gather(
self.aggregate(query, proposals),
self.aggregate(query, proposals), # 不同 Aggregator Prompt
)
return await self.synthesize(query, agg_results)
模型专长分配
不同模型在不同领域表现各异,MoA 可以根据任务类型选择最合适的 Proposer 组合:
TASK_MODEL_MAPPING = {
"code_generation": ["gpt-4o", "claude-sonnet-4-20250514", "deepseek-coder"],
"creative_writing": ["claude-sonnet-4-20250514", "gpt-4o", "gemini-1.5-pro"],
"data_analysis": ["gpt-4o", "gemini-1.5-pro", "claude-sonnet-4-20250514"],
"math_reasoning": ["gpt-4o", "deepseek-math", "claude-sonnet-4-20250514"],
}
async def select_proposers(self, query: str, task_type: str) -> list[str]:
"""基于任务类型选择最优 Proposer 组合"""
return TASK_MODEL_MAPPING.get(task_type, ["gpt-4o", "claude-sonnet-4-20250514", "gemini-1.5-pro"])
迭代精炼循环
多轮迭代可以逐步提升输出质量,每一轮都以前一轮的输出作为下一轮 Proposer 的参考:
async def iterative_refinement(self, query: str, rounds: int = 3) -> str:
"""迭代精炼:每轮将前一轮输出作为下一轮的上下文"""
current_proposals = await self.propose(query)
for round_idx in range(rounds - 1):
aggregated = await self.aggregate(query, current_proposals)
# 下一轮 Proposer 看到前一轮的聚合结果作为参考
refined_prompt = f"""原始问题:{query}
以下是前一轮的综合回答:
{aggregated}
请基于此回答,提供进一步的补充、修正或深化。重点关注:
- 前一轮可能遗漏的要点
- 需要修正的不准确之处
- 可以进一步深入的方面"""
current_proposals = await self.propose(refined_prompt)
return await self.aggregate(query, current_proposals)
成本优化策略
class CostOptimizedMoA:
"""成本优化的 MoA 配置"""
COST_PER_1K_TOKENS = {
"gpt-4o": {"input": 0.005, "output": 0.015},
"gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
"claude-sonnet-4-20250514": {"input": 0.003, "output": 0.015},
"gemini-1.5-pro": {"input": 0.00125, "output": 0.005},
}
def estimate_cost(self, query_tokens: int, num_proposers: int, num_layers: int) -> float:
"""估算单次 MoA 调用成本"""
avg_output_tokens = 800
proposer_cost = sum(
(query_tokens / 1000) * self.COST_PER_1K_TOKENS[m]["input"] +
(avg_output_tokens / 1000) * self.COST_PER_1K_TOKENS[m]["output"]
for m in ["gpt-4o", "claude-sonnet-4-20250514", "gpt-4o-mini"][:num_proposers]
)
aggregator_input = query_tokens + avg_output_tokens * num_proposers
aggregator_cost = (
(aggregator_input / 1000) * self.COST_PER_1K_TOKENS["claude-sonnet-4-20250514"]["input"] +
(avg_output_tokens / 1000) * self.COST_PER_1K_TOKENS["claude-sonnet-4-20250514"]["output"]
)
return (proposer_cost + aggregator_cost) * num_layers
性能分析
质量对比
| 方法 | AlpacaEval 2.0 LC Win Rate | MT-Bench | 备注 |
|---|---|---|---|
| GPT-4o 单模型 | 57.5% | 9.2 | 基线 |
| Claude 3.5 Sonnet 单模型 | 52.4% | 9.0 | - |
| MoA 2层(3 Proposer) | 62.3% | 9.4 | +4.8% 相对提升 |
| MoA 3层(4 Proposer) | 65.8% | 9.5 | +8.3% 相对提升 |
| MoA 3层 + 迭代精炼 | 67.2% | 9.6 | 最佳配置 |
延迟与质量权衡
| 配置 | 平均延迟 | 质量提升 | 成本倍数 | 推荐场景 |
|---|---|---|---|---|
| 单模型 | 2-4s | 基线 | 1x | 实时对话 |
| 2 Proposer + 1 Agg | 5-8s | +5-10% | 2.5x | 一般任务 |
| 3 Proposer + 1 Agg | 6-10s | +10-15% | 3.5x | 重要决策 |
| 3 Proposer + 2层 | 10-18s | +15-20% | 5x | 关键报告 |
| 4 Proposer + 3层 | 18-30s | +18-25% | 8x | 离线批处理 |
MoA 适用性分析
生产部署
异步并行执行架构
import aiohttp
from asyncio import Semaphore
class ProductionMoA:
def __init__(self, max_concurrent: int = 10):
self.semaphore = Semaphore(max_concurrent)
self.cache = {} # 实际生产中使用 Redis
async def run_with_fallback(self, query: str) -> str:
"""带降级策略的生产级执行"""
# 1. 检查缓存
cache_key = self._hash_query(query)
if cache_key in self.cache:
return self.cache[cache_key]
# 2. 执行 MoA 管道
try:
async with self.semaphore:
proposals = await self.propose(query)
result = await self.aggregate(query, proposals)
except RuntimeError:
# 降级:Proposer 不足时使用最佳单模型
result = await self._fallback_single_model(query)
# 3. 缓存结果
self.cache[cache_key] = result
return result
async def _fallback_single_model(self, query: str) -> str:
"""降级策略:回退到单模型"""
response = await self._call_anthropic(query, "claude-sonnet-4-20250514")
return response.content
监控与评估
生产环境必须持续监控 MoA 管道的健康状态:
interface MoAMetrics {
totalLatencyMs: number;
proposerLatencies: Record<string, number>;
proposerSuccessRate: Record<string, number>;
aggregationQualityScore: number;
cacheHitRate: number;
costPerQuery: number;
}
function reportMetrics(metrics: MoAMetrics): void {
// 推送到监控系统(如 Prometheus / Datadog)
console.log(JSON.stringify({
timestamp: new Date().toISOString(),
pipeline: "moa",
...metrics,
}));
// 告警规则
if (metrics.proposerSuccessRate["gpt-4o"] < 0.9) {
alert("GPT-4o success rate dropped below 90%");
}
if (metrics.totalLatencyMs > 30000) {
alert("MoA pipeline latency exceeded 30s threshold");
}
}
缓存层设计
对于重复或相似的 query,缓存可以大幅降低成本和延迟:
import hashlib
from functools import lru_cache
class SemanticCache:
"""语义缓存:相似问题复用已有结果"""
def __init__(self, similarity_threshold: float = 0.92):
self.threshold = similarity_threshold
self.embeddings = {} # query_hash -> embedding
self.results = {} # query_hash -> result
async def get_or_compute(self, query: str, compute_fn) -> str:
query_embedding = await self._embed(query)
# 查找语义相似的已缓存结果
for cached_hash, cached_embedding in self.embeddings.items():
similarity = self._cosine_similarity(query_embedding, cached_embedding)
if similarity > self.threshold:
return self.results[cached_hash]
# 缓存未命中,执行计算
result = await compute_fn(query)
query_hash = hashlib.sha256(query.encode()).hexdigest()
self.embeddings[query_hash] = query_embedding
self.results[query_hash] = result
return result
最佳实践
1. 模型多样性优于数量
选择架构差异大的模型组合比堆叠同系列模型更有效。例如,GPT-4o + Claude + Gemini 的组合比 3 个 GPT-4o 变体效果更好。
2. Aggregator 应使用最强模型
Aggregator 的质量直接决定最终输出质量。在成本允许的情况下,Aggregator 应选用最强的综合能力模型(如 Claude 3.5 Sonnet 或 GPT-4o)。
3. 设置合理的超时和降级策略
每个 Proposer 的超时应独立控制,不应让一个慢模型拖累整个管道。采用 N-of-M 策略确保部分模型失败不影响整体流程。
4. 监控每个节点的成本和延迟
使用 JSON 格式化工具 检查 API 响应结构,使用结构化日志追踪每个节点的性能指标。
5. 从简单配置开始
不要一开始就使用 3 层 4 Proposer 的最大配置。从 2 Proposer + 1 Aggregator 开始,根据实际质量需求逐步扩展。
6. 利用 Prompt 分化
给不同 Proposer 不同的系统提示(分析型、创意型、批判型),比让它们用相同 Prompt 回答能产生更多样化的候选回答。
常见问题
Q: Mixture of Agents 与简单的多模型投票有什么区别?
A: 投票(Majority Voting)只选择多数模型一致的答案,丢弃少数派的独特见解。MoA 的 Aggregator 会深度综合所有回答,保留每个模型的独特贡献并融合成更全面的回答。类比:投票是"选择最佳",MoA 是"融合所有优点"。
Q: MoA 是否总是比单模型好?
A: 不是。对于简单的事实查询(如"法国首都是哪里?"),MoA 增加了不必要的延迟和成本但几乎没有质量提升。MoA 的优势集中在需要多角度思考的复杂任务上。
Q: 如何评估 MoA 管道的输出质量?
A: 推荐使用 LLM-as-Judge 方法,让一个独立的评估模型对 MoA 输出和单模型输出进行盲评打分。同时可以使用 A/B 测试收集真实用户偏好数据。
Q: MoA 与 RAG 可以结合使用吗?
A: 完全可以。一种常见模式是在 MoA 的 Proposer 前增加 RAG 检索层,让每个 Proposer 基于检索到的上下文生成回答。这样既有知识增强又有多模型互补。
Q: 自建 MoA 的 API 成本如何控制?
A: 核心策略包括:(1) 动态路由简单任务到单模型;(2) 语义缓存避免重复计算;(3) 使用 Token 计数工具 优化 Prompt 长度减少输入 Token;(4) 在 Proposer 中混合使用成本友好的小模型(如 GPT-4o-mini)。
总结与相关资源
Mixture of Agents 代表了 LLM 应用从"选择最佳单模型"到"编排多模型协作"的范式转变。通过合理的架构设计——并行 Proposer、分层 Aggregator、动态路由——MoA 能在复杂任务上显著超越任何单一模型的性能上限。
关键是根据实际需求选择合适的配置深度:简单任务无需 MoA,中等任务使用轻量单层配置,复杂关键任务使用完整多层管道。
相关资源
系列文章
相关工具
- JSON 格式化工具 - 调试 API 响应
- Token 计数器 - 优化 Prompt 长度
- 文本对比工具 - 比较模型输出差异
相关术语