Agent_04_多Agent协作
这篇讲的是当发现单 Agent 撑不住的时候,怎么拆成多个 Agent。前面的专题一到三都在讲单个 Agent 怎么工作——记忆、工具、规划。但有些任务,单 Agent 就是搞不定:任务太长,上下文窗口装不下;角色太杂,一个 prompt 里塞了程序员、设计师、产品经理的指令,模型注意力分散;或者就是需要并行,串行太慢。
多 Agent 的做法是把任务分给多个 Agent,每个专注一个角色,然后协调结果。核心是一个 Manager 负责任务拆解和合并,下面挂几个 Worker 各自执行。听起来简单,做起来有几个坑我踩过,这篇都讲到了。
串联模式
最简单的多 Agent 模式,A 的输出是 B 的输入,B 的输出是 C 的输入,像流水线。适合任务有明确先后依赖的场景。
"""
多 Agent 串联模式 —— A 的输出 → B 的输入 → C 的输入 → 最终输出
"""
class Agent:
"""Agent 基类——每个 Agent 有独立的 system prompt 和工具"""
def __init__(self, name: str, llm, system_prompt: str, tools=None):
self.name = name
self.llm = llm
self.system_prompt = system_prompt
self.tools = tools or []
def run(self, input_text: str) -> str:
prompt = f"{input_text}"
if self.tools:
return self.llm.generate_with_tools(prompt, self.tools, self.system_prompt)
return self.llm.generate(prompt, self.system_prompt)
def __repr__(self):
return f"Agent({self.name})"
class SerialPipeline:
"""串联式 Agent 流水线"""
def __init__(self, agents: list[Agent], verbose: bool = True):
self.agents = agents
self.verbose = verbose
self.history: list[dict] = []
def run(self, initial_input: str) -> str:
current_input = initial_input
for agent in self.agents:
if self.verbose:
print(f"\n--- {agent.name} 开始执行 ---")
print(f" 输入: {current_input[:100]}...")
output = agent.run(current_input)
self.history.append({
"agent": agent.name,
"input": current_input,
"output": output,
})
if self.verbose:
print(f" 输出: {output[:100]}...")
current_input = output
return current_input
def create_dev_pipeline(llm) -> SerialPipeline:
"""开发流水线:需求 → 设计 → 编码 → 审查"""
designer = Agent(
name="架构师",
llm=llm,
system_prompt=(
"你是一个软件架构师。根据用户需求,设计软件的系统架构。"
"输出格式:模块划分、接口定义、数据流说明。"
),
)
coder = Agent(
name="开发者",
llm=llm,
system_prompt=(
"你是一个高级开发工程师。根据架构设计,编写核心代码。"
"输出格式:代码块 + 简要注释。"
),
)
reviewer = Agent(
name="审查者",
llm=llm,
system_prompt=(
"你是一个资深代码审查专家。审查代码,找出 bug、安全漏洞、性能问题。"
"输出格式:问题列表(严重程度、问题描述、修复建议)。"
),
)
return SerialPipeline([designer, coder, reviewer], verbose=True)这本质上是责任链模式在 Agent 领域的应用。好处是分工清晰,每个 Agent 的 prompt 可以写得很精确。坏处也明显:串行慢,中间环节出错后面全受影响。我实际用的时候发现,串联超过 3 个 Agent 之后,最终输出就开始偏离原始需求了——不是每个 Agent 做错了,而是信息在传递中不断衰减。这个问题后面单独讲。
并联模式
任务可以拆成独立子任务的时候,并行执行比串行快得多。
"""
多 Agent 并联模式 —— 多个 Agent 并行执行,最后合并结果
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
class ParallelPipeline:
"""并联式 Agent——并行执行,合并结果"""
def __init__(self, agents: list[Agent], merger: Agent = None, verbose: bool = True):
self.agents = agents
self.merger = merger or Agent(
name="合并器",
llm=agents[0].llm if agents else None,
system_prompt="你是一个信息整合专家。将多个来源的信息合并为一份完整的报告。去除重复内容,保持逻辑连贯。",
)
self.verbose = verbose
self.results: list[dict] = []
def run(self, input_text: str, max_workers: int = None) -> str:
# Phase 1:并行执行
if self.verbose:
print(f"\n=== 并行执行 {len(self.agents)} 个 Agent ===")
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_agent = {
executor.submit(agent.run, input_text): agent
for agent in self.agents
}
for future in as_completed(future_to_agent):
agent = future_to_agent[future]
try:
output = future.result()
if self.verbose:
print(f" {agent.name}: 执行成功,输出 {len(output)} 字")
except Exception as e:
output = f"执行失败: {e}"
if self.verbose:
print(f" {agent.name}: {e}")
self.results.append({
"agent": agent.name,
"output": output,
})
# Phase 2:合并结果
if self.verbose:
print(f"\n=== 合并 {len(self.results)} 个结果 ===")
merger_input = "\n\n".join(
f"### {r['agent']} 的输出\n{r['output']}" for r in self.results
)
return self.merger.run(merger_input)
def create_research_pipeline(llm) -> ParallelPipeline:
"""研究流水线——多个 Agent 从不同角度研究同一主题"""
agent_a = Agent(
name="技术视角",
llm=llm,
system_prompt="你是一个技术专家。从技术可行性、实现难度、性能等角度分析以下问题。",
)
agent_b = Agent(
name="商业视角",
llm=llm,
system_prompt="你是一个商业分析师。从市场需求、竞争格局、盈利模式等角度分析以下问题。",
)
agent_c = Agent(
name="风险视角",
llm=llm,
system_prompt="你是一个风险评估专家。从安全风险、法律合规、伦理等角度分析以下问题。",
)
return ParallelPipeline([agent_a, agent_b, agent_c], verbose=True)分治策略,适合"多角度分析"、"多方案对比"。合并阶段是最大的难点——多个 Agent 的输出可能互相矛盾、重复、格式不统一。合并 Agent 的 prompt 如果没写好,结果就是一堆没组织的内容堆在一起。而且冲突解决不是简单的拼接,后面有专门讲冲突仲裁的代码。
层级模式:Manager + Worker
这是最接近真实团队的模式。Manager 拆任务、分配、审查、合并,Worker 只管执行。
"""
多 Agent 层级模式——Manager 分配、Worker 执行、Manager 审查
"""
class Worker(Agent):
"""Worker Agent——执行具体任务"""
def __init__(self, name: str, role: str, llm, tools=None):
system_prompt = f"你是一个{role}。你只负责被分配的具体任务,不关心全局上下文。输出你的工作结果即可。"
super().__init__(name, llm, system_prompt, tools)
class ManagerAgent:
"""Manager Agent——分配任务、审查结果"""
def __init__(self, llm, workers: list[Worker], verbose: bool = True):
self.llm = llm
self.workers = workers
self.verbose = verbose
def run(self, goal: str) -> str:
# Phase 1:拆解任务
if self.verbose:
print(f"\n=== Manager 拆解任务 ===")
task_assignments = self._decompose(goal)
if self.verbose:
for assignment in task_assignments:
print(f" 分配给 {assignment['worker']}: {assignment['task']}")
# Phase 2:分配并执行
results = []
for assignment in task_assignments:
worker = self._find_worker(assignment["worker"])
if worker is None:
if self.verbose:
print(f" 警告: 找不到 Worker '{assignment['worker']}',跳过")
continue
if self.verbose:
print(f"\n--- {worker.name} 执行 ---")
output = worker.run(assignment["task"])
results.append({
"worker": worker.name,
"task": assignment["task"],
"output": output,
})
# Phase 3:审查质量
if self.verbose:
print(f"\n=== Manager 审查结果 ===")
for result in results:
review = self._review(result["task"], result["output"])
result["review"] = review
if self.verbose:
status = "通过" if review["passed"] else f"不通过: {review['feedback']}"
print(f" {result['worker']}: {status}")
# Phase 4:合并输出
if self.verbose:
print(f"\n=== Manager 合并输出 ===")
return self._synthesize(goal, results)
def _decompose(self, goal: str) -> list[dict]:
"""拆解任务,分配给 Worker"""
worker_names = ", ".join(w.name for w in self.workers)
worker_roles = "\n".join(f"- {w.name}: {w.system_prompt[:50]}..." for w in self.workers)
prompt = f"""
目标: {goal}
可用 Worker:
{worker_roles}
请将目标拆解为子任务,分配给合适的 Worker。
返回 JSON 格式: {{"assignments": [{{"worker": "名字", "task": "任务描述"}}]}}
规则:
1. 每个子任务只分配给一个 Worker
2. 任务描述要具体、可执行
3. 一个 Worker 可以接多个任务
"""
response = self.llm.generate(prompt, system="你是项目经理,负责任务拆解和分配。")
data = self._parse_json(response)
return data.get("assignments", [])
def _review(self, task: str, output: str) -> dict:
"""审查 Worker 的输出质量"""
prompt = f"""
任务: {task}
Worker 输出:
{output[:2000]}
请审查这个输出,判断是否满足以下条件:
1. 是否完成了任务要求?
2. 内容是否完整?
3. 有没有明显的错误或遗漏?
返回 JSON 格式: {{"passed": true/false, "feedback": "审查意见"}}
"""
response = self.llm.generate(prompt, system="你是质量审查员。判断工成果是否达标。")
return self._parse_json(response)
def _synthesize(self, goal: str, results: list[dict]) -> str:
"""合并所有 Worker 结果,生成最终回复"""
results_text = "\n\n".join(
f"### {r['worker']} 的结果\n任务: {r['task']}\n输出: {r['output'][:1000]}\n审查: {r['review'].get('feedback', '通过')}"
for r in results
)
prompt = f"""
目标: {goal}
各 Worker 的执行结果:
{results_text}
请综合以上结果,生成一份完整的最终回复。
"""
return self.llm.generate(prompt, system="你是项目经理,负责汇总和整合团队成果。")
def _find_worker(self, name: str) -> Worker | None:
for w in self.workers:
if w.name == w.name:
return w
return None
def _parse_json(self, text: str) -> dict:
import json, re
try:
match = re.search(r'\{.*\}', text, re.DOTALL)
if match:
return json.loads(match.group())
except json.JSONDecodeError:
pass
return {}Manager 是这个模式的核心,也是最容易搞砸的部分。Manager 的职责是拆解、分配、审查、合并,不是"什么都干"。如果 Manager 自己也下场干活,那就退化成"Manager + Worker"合体了。另外 Manager 的拆解质量决定了整个系统的上限——拆错了,后面全白搭。
消息队列模式
多 Agent 协作最难的不是"多个 Agent 各自执行",而是 Agent 之间怎么通信。消息队列模式让 Agent 异步沟通,而不是硬编码的流水线。
"""
Agent 通信协议——消息队列模式
每个 Agent 有自己的收件箱,处理完任务后把结果发到对应 Agent 的收件箱。
"""
from collections import deque
from dataclasses import dataclass, field
from typing import Any
@dataclass
class Message:
"""Agent 之间的消息"""
sender: str
receiver: str
content: str
type: str = "task" # "task" | "result" | "question" | "answer" | "feedback"
metadata: dict = field(default_factory=dict)
class MessageQueue:
"""Agent 消息队列"""
def __init__(self):
self._mailboxes: dict[str, deque[Message]] = {}
def register(self, agent_name: str) -> None:
"""注册 Agent,创建收件箱"""
self._mailboxes[agent_name] = deque()
def send(self, message: Message) -> None:
"""发送消息到收件箱"""
if message.receiver not in self._mailboxes:
raise ValueError(f"收件人 '{message.receiver}' 不存在")
self._mailboxes[message.receiver].append(message)
def receive(self, agent_name: str) -> list[Message]:
"""收取某个 Agent 的所有未读消息"""
mailbox = self._mailboxes.get(agent_name, deque())
messages = list(mailbox)
mailbox.clear()
return messages
def is_empty(self, agent_name: str) -> bool:
return len(self._mailboxes.get(agent_name, deque())) == 0
def all_empty(self) -> bool:
return all(len(m) == 0 for m in self._mailboxes.values())
class MessageDrivenAgent(Agent):
"""消息驱动 Agent——从消息队列读取任务,执行后发送结果"""
def __init__(self, name: str, llm, system_prompt: str, queue: MessageQueue):
super().__init__(name, llm, system_prompt)
self.queue = queue
def step(self) -> list[Message]:
"""处理一轮消息"""
messages = self.queue.receive(self.name)
if not messages:
return []
results = []
for msg in messages:
if msg.type == "task":
output = self.run(msg.content)
self.queue.send(Message(
sender=self.name,
receiver="manager",
content=output,
type="result",
metadata={"original_task": msg.content},
))
results.append(msg)
return results
class MessageDrivenManager(ManagerAgent):
"""消息驱动 Manager——通过消息队列管理 Worker"""
def __init__(self, llm, queue: MessageQueue, workers: list[MessageDrivenAgent] = None, verbose: bool = True):
super().__init__(llm, workers=[], verbose=verbose)
self.queue = queue
self._worker_map: dict[str, MessageDrivenAgent] = {w.name: w for w in (workers or [])}
def run(self, goal: str) -> str:
assignments = self._decompose(goal)
for assignment in assignments:
self.queue.send(Message(
sender="manager",
receiver=assignment["worker"],
content=assignment["task"],
type="task",
))
if self.verbose:
print(f"Manager 分配了 {len(assignments)} 个任务")
max_rounds = 20
for round_num in range(max_rounds):
for worker_name in self.queue._mailboxes:
if worker_name == "manager":
continue
agent = self._get_worker(worker_name)
if agent:
agent.step()
manager_messages = self.queue.receive("manager")
results = [m for m in manager_messages if m.type == "result"]
if len(results) >= len(assignments):
if self.verbose:
print(f"所有 Worker 完成,共 {len(results)} 个结果")
return self._synthesize(goal, [
{"worker": m.sender, "task": m.metadata.get("original_task", ""), "output": m.content}
for m in results
])
return "超时,部分 Worker 未完成。"
def _get_worker(self, name: str):
return self._worker_map.get(name)当 Agent 数量不固定、任务之间有复杂依赖时,串并联都不够用。消息队列让 Agent 异步通信,是从"写死的流水线"到"灵活团队协作"的关键一步。但代价是引入了新的复杂度——死锁、消息丢失、超时。下面两个示例讲怎么处理。
信息衰减
多 Agent 协作中最隐蔽的问题是信息在传递过程中不断衰减和变形。A 的输出是 B 的输入,B 的输出是 C 的输入,每一步都会丢失一部分信息——token 截断、理解偏差、格式丢失。经过 3-4 个 Agent 之后,最终输出可能和原始目标差很远。
"""
Agent 间信息衰减分析
量化信息在传递过程中的丢失程度,找到瓶颈环节。
"""
class InformationDecayAnalyzer:
"""信息衰减分析器"""
@staticmethod
def measure_retention(original: str, output: str) -> float:
"""信息保留率——用关键词 overlap 量化,返回 0.0 ~ 1.0"""
def extract_keywords(text: str) -> set:
return set(w for w in text.split() if len(w) >= 2)
original_kw = extract_keywords(original)
output_kw = extract_keywords(output)
if not original_kw:
return 1.0
overlap = original_kw & output_kw
return len(overlap) / len(original_kw)
@staticmethod
def measure_deviation(expected: str, actual: str, llm=None) -> dict:
"""理解偏差——判断输出是否偏离了预期目标"""
if llm:
prompt = f"""
预期目标: {expected[:200]}
实际输出: {actual[:500]}
请判断实际输出是否偏离了预期目标。
返回 JSON: {{"deviation_score": 0~100, "reason": "偏离原因"}}
评分标准:
0 = 完全符合预期
50 = 部分偏离(回答了部分问题,但遗漏了关键信息)
100 = 完全偏离(答非所问)
"""
response = llm.generate(prompt, system="你是质量评估专家。")
import json, re
match = re.search(r'\{.*\}', response, re.DOTALL)
if match:
try:
return json.loads(match.group())
except json.JSONDecodeError:
pass
expected_kw = set(w for w in expected.split() if len(w) >= 3)
actual_kw = set(w for w in actual.split() if len(w) >= 3)
overlap = len(expected_kw & actual_kw) / max(len(expected_kw), 1)
return {"deviation_score": int((1 - overlap) * 100), "reason": "关键词匹配"}
@staticmethod
def analyze_pipeline(pipeline_history: list[dict]) -> dict:
"""分析整个流水线的信息衰减"""
results = []
original_input = pipeline_history[0]["input"] if pipeline_history else ""
for step in pipeline_history:
retention = InformationDecayAnalyzer.measure_retention(
original_input, step["output"]
)
deviation = InformationDecayAnalyzer.measure_deviation(
original_input, step["output"]
)
results.append({
"agent": step["agent"],
"input_len": len(step["input"]),
"output_len": len(step["output"]),
"retention_rate": round(retention, 2),
"deviation_score": deviation.get("deviation_score", 0),
"deviation_reason": deviation.get("reason", ""),
})
cumulative_retention = 1.0
for r in results:
cumulative_retention *= r["retention_rate"]
r["cumulative_retention"] = round(cumulative_retention, 2)
return {"steps": results, "final_retention": round(cumulative_retention, 2)}
history = [
{"agent": "架构师", "input": "做一个用户管理系统,支持登录、注册、权限管理", "output": "系统架构:MVC 模式,数据库用 MySQL,前端 Vue"},
{"agent": "开发者", "input": "系统架构:MVC 模式,数据库用 MySQL,前端 Vue", "output": "class User(models.Model):\n name = models.CharField(max_length=100)\n password = models.CharField(max_length=128)"},
{"agent": "审查者", "input": "class User(models.Model):\n name = models.CharField(max_length=100)\n password = models.CharField(max_length=128)", "output": "密码不应明文存储,建议使用哈希加密。缺少邮箱、权限字段。"},
]
analysis = InformationDecayAnalyzer.analyze_pipeline(history)
for step in analysis["steps"]:
print(f"{step['agent']}: 保留率 {step['retention_rate']}, 累积 {step['cumulative_retention']}, 偏差 {step['deviation_score']}")
# 输出(示例):
# 架构师: 保留率 0.15, 累积 0.15, 偏差 60
# 开发者: 保留率 0.10, 累积 0.02, 偏差 80
# 审查者: 保留率 0.05, 累积 0.001, 偏差 90上面的模拟显示,经过 3 个 Agent 之后,原始信息保留率可能降到 0.1%。这就是为什么多 Agent 流水线的最终输出经常"偏离原始需求"。
缓解方案我用了三种:每步注入原始目标——每个 Agent 的 prompt 都包含原始需求,不只是上一步的输出;减少传递环节,能 2 个 Agent 完成的不要拆成 4 个;关键节点校验,在流水线中间加质量门,Agent B 的输入不只是 A 的输出,还包含对 A 输出的校验。
死锁检测与超时恢复
消息队列模式中,多个 Agent 互相等待消息可能形成死锁。A 等 B 的消息,B 等 A 的消息,谁也动不了。
"""
消息队列死锁检测与超时恢复
策略:构建等待图,检测环;超时后采取恢复措施。
"""
import time
from collections import deque
from dataclasses import dataclass, field
@dataclass
class AgentState:
"""Agent 运行时状态"""
name: str
last_active: float = 0.0
pending_tasks: int = 0
waiting_for: str = ""
idle_rounds: int = 0
class DeadlockDetector:
"""死锁检测器——构建等待图,检测环"""
def __init__(self):
self.wait_for: dict[str, str] = {}
def update(self, agent_name: str, waiting_for: str) -> None:
if waiting_for:
self.wait_for[agent_name] = waiting_for
else:
self.wait_for.pop(agent_name, None)
def detect_cycle(self) -> list[str] | None:
"""检测等待图中是否存在环"""
visited = set()
path = []
def dfs(node: str) -> list[str] | None:
if node in visited:
cycle_start = path.index(node)
return path[cycle_start:] + [node]
visited.add(node)
path.append(node)
next_node = self.wait_for.get(node)
if next_node:
result = dfs(next_node)
if result:
return result
path.pop()
return None
for agent in list(self.wait_for.keys()):
if agent not in visited:
cycle = dfs(agent)
if cycle:
return cycle
return None
class TimeoutRecovery:
"""超时恢复策略"""
@staticmethod
def check_and_recover(
agents: dict[str, AgentState],
timeout_rounds: int = 5,
max_idle_rounds: int = 20,
) -> list[str]:
"""检查所有 Agent 的空闲状态,对超时 Agent 采取恢复措施"""
actions = []
for name, state in agents.items():
if state.waiting_for:
state.idle_rounds += 1
if state.idle_rounds >= max_idle_rounds:
actions.append(
f"严重超时: {name} 已等待 {state.idle_rounds} 轮,"
f"目标: {state.waiting_for}。终止任务。"
)
state.waiting_for = ""
state.idle_rounds = 0
elif state.idle_rounds >= timeout_rounds:
actions.append(
f"超时警告: {name} 已等待 {state.idle_rounds} 轮,"
f"目标: {state.waiting_for}。尝试重发消息。"
)
state.idle_rounds = 0
return actions
class DeadlockAwareManager:
"""集成死锁检测的消息队列 Manager"""
def __init__(self, llm, queue, workers):
self.llm = llm
self.queue = queue
self.workers = workers
self.detector = DeadlockDetector()
self.agent_states: dict[str, AgentState] = {}
for w in workers:
self.agent_states[w.name] = AgentState(name=w.name)
def run(self, goal: str) -> str:
max_rounds = 30
for round_num in range(max_rounds):
for name, state in self.agent_states.items():
if not self.queue.is_empty(name):
state.idle_rounds = 0
state.waiting_for = ""
else:
state.waiting_for = self._who_is_this_waiting_for(name)
cycle = self.detector.detect_cycle()
if cycle:
print(f"检测到死锁: {' -> '.join(cycle)}")
victim = cycle[0]
self.agent_states[victim].idle_rounds = 999
break
actions = TimeoutRecovery.check_and_recover(
self.agent_states, timeout_rounds=5, max_idle_rounds=15
)
for action in actions:
print(f" 恢复行动: {action}")
all_done = self._process_round()
if all_done:
break
return self._collect_results()
def _who_is_this_waiting_for(self, agent_name: str) -> str:
"""推断 Agent 在等待谁(简化版)"""
return ""典型的死锁场景有三种:循环等待,A 处理完发给 B,B 发给 C,C 发给 A,但 A 已经处理完了,消息丢失,C 永远等不到回复;消息丢失,A 发消息给 B,但 B 的收件箱没正确注册;处理失败,B 收到消息但处理时抛出异常,没有回复 A。
预防比检测更有效:消息单向流动,Manager 到 Worker 再回到 Manager,不让 Worker 之间直接通信;每条消息设超时时间,超时自动失败;每条消息给唯一 ID,追踪是否送达。
冲突仲裁
并联模式下多个 Agent 对同一问题给出不同答案,冲突怎么解决。
"""
多 Agent 输出冲突检测与仲裁
三种策略:投票制、权重制、LLM 仲裁
"""
class ConflictArbiter:
"""冲突仲裁器"""
@staticmethod
def voting(opinions: list[dict]) -> dict:
"""投票制——相同结论的 Agent 数量最多的胜出"""
from collections import defaultdict
groups: dict[str, list[dict]] = defaultdict(list)
for op in opinions:
groups[op["conclusion"]].append(op)
sorted_groups = sorted(
groups.items(),
key=lambda item: len(item[1]),
reverse=True,
)
winner_conclusion, winner_opinions = sorted_groups[0]
return {
"winner": winner_conclusion,
"vote_count": len(winner_opinions),
"dissenters": [op["agent"] for group in sorted_groups[1:] for op in group],
}
@staticmethod
def llm_arbitrate(opinions: list[dict], llm, goal: str) -> dict:
"""LLM 仲裁——阅读所有冲突意见,生成综合结论"""
opinions_text = "\n\n".join(
f"### {op['agent']}\n结论: {op['conclusion']}\n置信度: {op.get('confidence', '?')}\n理由: {op.get('reasoning', '')}"
for op in opinions
)
prompt = f"""
目标: {goal}
多个 Agent 给出不同意见:
{opinions_text}
请综合以上意见,生成一致结论。返回 JSON:
{{"conclusion": "...", "confidence": 0.0, "unresolved": "未解决的分歧"}}
"""
response = llm.generate(prompt)
import json, re
match = re.search(r'\{.*\}', response, re.DOTALL)
if match:
try:
return json.loads(match.group())
except json.JSONDecodeError:
pass
return {"conclusion": response, "confidence": 0.5}并联模式不是"所有 Agent 输出都正确",它们可能互相矛盾。合并器如果只是简单拼接,最终输出就会自相矛盾。仲裁不是锦上添花,是并联模式的必要环节。
什么时候该用,什么时候不该用
多 Agent 不一定比单 Agent 好。优势是分工和并行,代价是通信成本和复杂度。
该用的情况,至少命中两条:单 Agent 的 system prompt 超过 3000 字且包含多个不同角色;任务需要并行执行来降低延迟;不同子任务需要不同的专业知识或工具集;需要质量把控,审查 Agent 可以独立于执行 Agent。
不该用的情况,命中任意一条就不需要:单 Agent 能在一轮 ReAct 循环中完成任务;所有子任务用的是同一套工具和知识;任务之间紧密依赖,无法分解为独立子任务;对成本敏感,多 Agent 的 LLM 费用是单 Agent 的 3 到 10 倍。
我踩过的过度设计的坑:任务用一个 Agent 的 ReAct 循环就能搞定,但拆了 5 个 Agent;Agent 之间的消息传递只是"把一段文本从一个 prompt 搬到另一个 prompt",没有任何实质性分工;整个系统只有一个 Worker,Manager 只是把它包了一层;花了一周搭建多 Agent 框架,跑出来的结果和单 Agent 没区别。
最典型的信号是你加了 Agent,但每个 Agent 的 system prompt 和原来单 Agent 的 prompt 几乎一样——那说明你没在分工,只是在复制。
代价方面,LLM 调用次数成倍增长是最直接的。串联每步一次,并联每个 Agent 一次,层级模式 Manager 拆解加每个 Worker 一次加审查加合并,一次任务可能有 10 次以上 LLM 调用。信息衰减前面已经讲过了。调试难度也是个问题——出问题了不知道是哪个 Agent 的问题,是 Manager 分配错了,Worker 执行错了,还是合并的时候搞砸了。Agent 之间没有调用链追踪,不像微服务有 trace ID。
模式选择上,我的经验是:任务之间有严格先后依赖的,用串联;完全独立各干各的最后合并的,用并联;需要动态分配任务、类型和数量不固定的,用层级;Worker 之间需要互相沟通的,用消息队列。能不用消息队列就不用,复杂度最高。
框架里的实现
CrewAI 的 Crew + Agent + Task 是目前最流行的多 Agent 方案,Agent 有角色、目标、背景故事,Task 有描述、期望输出、Agent 分配。AutoGen 是微软的群聊模式,Agent 之间通过群聊消息通信。LangGraph 用 StateGraph,每个节点是一个 Agent,边决定流转,可以模拟串、并、层级所有模式。MetaGPT 基于 SOP,定义了产品经理、架构师、工程师等角色的标准作业流程。
演进过程
多 Agent 不是起点,是演进的终点。我通常是这个路径:先用单 Agent,发现 system prompt 太长、模型注意力分散、输出质量下降,就拆成串联模式,三个 Agent 各自专注。发现需要并行,改并联。发现任务类型不固定,上层级模式。每一步拆分都要有明确的需求驱动,不要为了"多 Agent"而多 Agent。
多 Agent 协作是从"一个聪明的个体"到"一个有组织的团队"的架构演进。核心价值是分工、并行和质量把控,代价是通信成本、调试难度和 LLM 费用。最实用的建议:先用单 Agent 做到极致,确认真的不够用了再拆。