Agent_04_多Agent协作

zbhgis 浩瀚地学213411 分钟
创建于 更新于

这篇讲的是当发现单 Agent 撑不住的时候,怎么拆成多个 Agent。前面的专题一到三都在讲单个 Agent 怎么工作——记忆、工具、规划。但有些任务,单 Agent 就是搞不定:任务太长,上下文窗口装不下;角色太杂,一个 prompt 里塞了程序员、设计师、产品经理的指令,模型注意力分散;或者就是需要并行,串行太慢。

多 Agent 的做法是把任务分给多个 Agent,每个专注一个角色,然后协调结果。核心是一个 Manager 负责任务拆解和合并,下面挂几个 Worker 各自执行。听起来简单,做起来有几个坑我踩过,这篇都讲到了。


串联模式

最简单的多 Agent 模式,A 的输出是 B 的输入,B 的输出是 C 的输入,像流水线。适合任务有明确先后依赖的场景。

python
"""
多 Agent 串联模式 —— A 的输出 → B 的输入 → C 的输入 → 最终输出
"""


class Agent:
    """Agent 基类——每个 Agent 有独立的 system prompt 和工具"""

    def __init__(self, name: str, llm, system_prompt: str, tools=None):
        self.name = name
        self.llm = llm
        self.system_prompt = system_prompt
        self.tools = tools or []

    def run(self, input_text: str) -> str:
        prompt = f"{input_text}"
        if self.tools:
            return self.llm.generate_with_tools(prompt, self.tools, self.system_prompt)
        return self.llm.generate(prompt, self.system_prompt)

    def __repr__(self):
        return f"Agent({self.name})"


class SerialPipeline:
    """串联式 Agent 流水线"""

    def __init__(self, agents: list[Agent], verbose: bool = True):
        self.agents = agents
        self.verbose = verbose
        self.history: list[dict] = []

    def run(self, initial_input: str) -> str:
        current_input = initial_input

        for agent in self.agents:
            if self.verbose:
                print(f"\n--- {agent.name} 开始执行 ---")
                print(f"  输入: {current_input[:100]}...")

            output = agent.run(current_input)

            self.history.append({
                "agent": agent.name,
                "input": current_input,
                "output": output,
            })

            if self.verbose:
                print(f"  输出: {output[:100]}...")

            current_input = output

        return current_input


def create_dev_pipeline(llm) -> SerialPipeline:
    """开发流水线:需求 → 设计 → 编码 → 审查"""

    designer = Agent(
        name="架构师",
        llm=llm,
        system_prompt=(
            "你是一个软件架构师。根据用户需求,设计软件的系统架构。"
            "输出格式:模块划分、接口定义、数据流说明。"
        ),
    )

    coder = Agent(
        name="开发者",
        llm=llm,
        system_prompt=(
            "你是一个高级开发工程师。根据架构设计,编写核心代码。"
            "输出格式:代码块 + 简要注释。"
        ),
    )

    reviewer = Agent(
        name="审查者",
        llm=llm,
        system_prompt=(
            "你是一个资深代码审查专家。审查代码,找出 bug、安全漏洞、性能问题。"
            "输出格式:问题列表(严重程度、问题描述、修复建议)。"
        ),
    )

    return SerialPipeline([designer, coder, reviewer], verbose=True)

这本质上是责任链模式在 Agent 领域的应用。好处是分工清晰,每个 Agent 的 prompt 可以写得很精确。坏处也明显:串行慢,中间环节出错后面全受影响。我实际用的时候发现,串联超过 3 个 Agent 之后,最终输出就开始偏离原始需求了——不是每个 Agent 做错了,而是信息在传递中不断衰减。这个问题后面单独讲。


并联模式

任务可以拆成独立子任务的时候,并行执行比串行快得多。

python
"""
多 Agent 并联模式 —— 多个 Agent 并行执行,最后合并结果
"""

from concurrent.futures import ThreadPoolExecutor, as_completed


class ParallelPipeline:
    """并联式 Agent——并行执行,合并结果"""

    def __init__(self, agents: list[Agent], merger: Agent = None, verbose: bool = True):
        self.agents = agents
        self.merger = merger or Agent(
            name="合并器",
            llm=agents[0].llm if agents else None,
            system_prompt="你是一个信息整合专家。将多个来源的信息合并为一份完整的报告。去除重复内容,保持逻辑连贯。",
        )
        self.verbose = verbose
        self.results: list[dict] = []

    def run(self, input_text: str, max_workers: int = None) -> str:
        # Phase 1:并行执行
        if self.verbose:
            print(f"\n=== 并行执行 {len(self.agents)} 个 Agent ===")

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_agent = {
                executor.submit(agent.run, input_text): agent
                for agent in self.agents
            }

            for future in as_completed(future_to_agent):
                agent = future_to_agent[future]
                try:
                    output = future.result()
                    if self.verbose:
                        print(f"  {agent.name}: 执行成功,输出 {len(output)} 字")
                except Exception as e:
                    output = f"执行失败: {e}"
                    if self.verbose:
                        print(f"  {agent.name}: {e}")

                self.results.append({
                    "agent": agent.name,
                    "output": output,
                })

        # Phase 2:合并结果
        if self.verbose:
            print(f"\n=== 合并 {len(self.results)} 个结果 ===")

        merger_input = "\n\n".join(
            f"### {r['agent']} 的输出\n{r['output']}" for r in self.results
        )
        return self.merger.run(merger_input)


def create_research_pipeline(llm) -> ParallelPipeline:
    """研究流水线——多个 Agent 从不同角度研究同一主题"""

    agent_a = Agent(
        name="技术视角",
        llm=llm,
        system_prompt="你是一个技术专家。从技术可行性、实现难度、性能等角度分析以下问题。",
    )

    agent_b = Agent(
        name="商业视角",
        llm=llm,
        system_prompt="你是一个商业分析师。从市场需求、竞争格局、盈利模式等角度分析以下问题。",
    )

    agent_c = Agent(
        name="风险视角",
        llm=llm,
        system_prompt="你是一个风险评估专家。从安全风险、法律合规、伦理等角度分析以下问题。",
    )

    return ParallelPipeline([agent_a, agent_b, agent_c], verbose=True)

分治策略,适合"多角度分析"、"多方案对比"。合并阶段是最大的难点——多个 Agent 的输出可能互相矛盾、重复、格式不统一。合并 Agent 的 prompt 如果没写好,结果就是一堆没组织的内容堆在一起。而且冲突解决不是简单的拼接,后面有专门讲冲突仲裁的代码。


层级模式:Manager + Worker

这是最接近真实团队的模式。Manager 拆任务、分配、审查、合并,Worker 只管执行。

python
"""
多 Agent 层级模式——Manager 分配、Worker 执行、Manager 审查
"""


class Worker(Agent):
    """Worker Agent——执行具体任务"""

    def __init__(self, name: str, role: str, llm, tools=None):
        system_prompt = f"你是一个{role}。你只负责被分配的具体任务,不关心全局上下文。输出你的工作结果即可。"
        super().__init__(name, llm, system_prompt, tools)


class ManagerAgent:
    """Manager Agent——分配任务、审查结果"""

    def __init__(self, llm, workers: list[Worker], verbose: bool = True):
        self.llm = llm
        self.workers = workers
        self.verbose = verbose

    def run(self, goal: str) -> str:
        # Phase 1:拆解任务
        if self.verbose:
            print(f"\n=== Manager 拆解任务 ===")

        task_assignments = self._decompose(goal)

        if self.verbose:
            for assignment in task_assignments:
                print(f"  分配给 {assignment['worker']}: {assignment['task']}")

        # Phase 2:分配并执行
        results = []
        for assignment in task_assignments:
            worker = self._find_worker(assignment["worker"])
            if worker is None:
                if self.verbose:
                    print(f"  警告: 找不到 Worker '{assignment['worker']}',跳过")
                continue

            if self.verbose:
                print(f"\n--- {worker.name} 执行 ---")

            output = worker.run(assignment["task"])
            results.append({
                "worker": worker.name,
                "task": assignment["task"],
                "output": output,
            })

        # Phase 3:审查质量
        if self.verbose:
            print(f"\n=== Manager 审查结果 ===")

        for result in results:
            review = self._review(result["task"], result["output"])
            result["review"] = review

            if self.verbose:
                status = "通过" if review["passed"] else f"不通过: {review['feedback']}"
                print(f"  {result['worker']}: {status}")

        # Phase 4:合并输出
        if self.verbose:
            print(f"\n=== Manager 合并输出 ===")

        return self._synthesize(goal, results)

    def _decompose(self, goal: str) -> list[dict]:
        """拆解任务,分配给 Worker"""
        worker_names = ", ".join(w.name for w in self.workers)
        worker_roles = "\n".join(f"- {w.name}: {w.system_prompt[:50]}..." for w in self.workers)

        prompt = f"""
目标: {goal}

可用 Worker:
{worker_roles}

请将目标拆解为子任务,分配给合适的 Worker。
返回 JSON 格式: {{"assignments": [{{"worker": "名字", "task": "任务描述"}}]}}

规则:
1. 每个子任务只分配给一个 Worker
2. 任务描述要具体、可执行
3. 一个 Worker 可以接多个任务
"""
        response = self.llm.generate(prompt, system="你是项目经理,负责任务拆解和分配。")
        data = self._parse_json(response)
        return data.get("assignments", [])

    def _review(self, task: str, output: str) -> dict:
        """审查 Worker 的输出质量"""
        prompt = f"""
任务: {task}

Worker 输出:
{output[:2000]}

请审查这个输出,判断是否满足以下条件:
1. 是否完成了任务要求?
2. 内容是否完整?
3. 有没有明显的错误或遗漏?

返回 JSON 格式: {{"passed": true/false, "feedback": "审查意见"}}
"""
        response = self.llm.generate(prompt, system="你是质量审查员。判断工成果是否达标。")
        return self._parse_json(response)

    def _synthesize(self, goal: str, results: list[dict]) -> str:
        """合并所有 Worker 结果,生成最终回复"""
        results_text = "\n\n".join(
            f"### {r['worker']} 的结果\n任务: {r['task']}\n输出: {r['output'][:1000]}\n审查: {r['review'].get('feedback', '通过')}"
            for r in results
        )

        prompt = f"""
目标: {goal}

各 Worker 的执行结果:
{results_text}

请综合以上结果,生成一份完整的最终回复。
"""
        return self.llm.generate(prompt, system="你是项目经理,负责汇总和整合团队成果。")

    def _find_worker(self, name: str) -> Worker | None:
        for w in self.workers:
            if w.name == w.name:
                return w
        return None

    def _parse_json(self, text: str) -> dict:
        import json, re
        try:
            match = re.search(r'\{.*\}', text, re.DOTALL)
            if match:
                return json.loads(match.group())
        except json.JSONDecodeError:
            pass
        return {}

Manager 是这个模式的核心,也是最容易搞砸的部分。Manager 的职责是拆解、分配、审查、合并,不是"什么都干"。如果 Manager 自己也下场干活,那就退化成"Manager + Worker"合体了。另外 Manager 的拆解质量决定了整个系统的上限——拆错了,后面全白搭。


消息队列模式

多 Agent 协作最难的不是"多个 Agent 各自执行",而是 Agent 之间怎么通信。消息队列模式让 Agent 异步沟通,而不是硬编码的流水线。

python
"""
Agent 通信协议——消息队列模式
每个 Agent 有自己的收件箱,处理完任务后把结果发到对应 Agent 的收件箱。
"""

from collections import deque
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Message:
    """Agent 之间的消息"""
    sender: str
    receiver: str
    content: str
    type: str = "task"  # "task" | "result" | "question" | "answer" | "feedback"
    metadata: dict = field(default_factory=dict)


class MessageQueue:
    """Agent 消息队列"""

    def __init__(self):
        self._mailboxes: dict[str, deque[Message]] = {}

    def register(self, agent_name: str) -> None:
        """注册 Agent,创建收件箱"""
        self._mailboxes[agent_name] = deque()

    def send(self, message: Message) -> None:
        """发送消息到收件箱"""
        if message.receiver not in self._mailboxes:
            raise ValueError(f"收件人 '{message.receiver}' 不存在")
        self._mailboxes[message.receiver].append(message)

    def receive(self, agent_name: str) -> list[Message]:
        """收取某个 Agent 的所有未读消息"""
        mailbox = self._mailboxes.get(agent_name, deque())
        messages = list(mailbox)
        mailbox.clear()
        return messages

    def is_empty(self, agent_name: str) -> bool:
        return len(self._mailboxes.get(agent_name, deque())) == 0

    def all_empty(self) -> bool:
        return all(len(m) == 0 for m in self._mailboxes.values())


class MessageDrivenAgent(Agent):
    """消息驱动 Agent——从消息队列读取任务,执行后发送结果"""

    def __init__(self, name: str, llm, system_prompt: str, queue: MessageQueue):
        super().__init__(name, llm, system_prompt)
        self.queue = queue

    def step(self) -> list[Message]:
        """处理一轮消息"""
        messages = self.queue.receive(self.name)
        if not messages:
            return []

        results = []
        for msg in messages:
            if msg.type == "task":
                output = self.run(msg.content)
                self.queue.send(Message(
                    sender=self.name,
                    receiver="manager",
                    content=output,
                    type="result",
                    metadata={"original_task": msg.content},
                ))
                results.append(msg)

        return results


class MessageDrivenManager(ManagerAgent):
    """消息驱动 Manager——通过消息队列管理 Worker"""

    def __init__(self, llm, queue: MessageQueue, workers: list[MessageDrivenAgent] = None, verbose: bool = True):
        super().__init__(llm, workers=[], verbose=verbose)
        self.queue = queue
        self._worker_map: dict[str, MessageDrivenAgent] = {w.name: w for w in (workers or [])}

    def run(self, goal: str) -> str:
        assignments = self._decompose(goal)

        for assignment in assignments:
            self.queue.send(Message(
                sender="manager",
                receiver=assignment["worker"],
                content=assignment["task"],
                type="task",
            ))

        if self.verbose:
            print(f"Manager 分配了 {len(assignments)} 个任务")

        max_rounds = 20
        for round_num in range(max_rounds):
            for worker_name in self.queue._mailboxes:
                if worker_name == "manager":
                    continue
                agent = self._get_worker(worker_name)
                if agent:
                    agent.step()

            manager_messages = self.queue.receive("manager")
            results = [m for m in manager_messages if m.type == "result"]

            if len(results) >= len(assignments):
                if self.verbose:
                    print(f"所有 Worker 完成,共 {len(results)} 个结果")
                return self._synthesize(goal, [
                    {"worker": m.sender, "task": m.metadata.get("original_task", ""), "output": m.content}
                    for m in results
                ])

        return "超时,部分 Worker 未完成。"

    def _get_worker(self, name: str):
        return self._worker_map.get(name)

当 Agent 数量不固定、任务之间有复杂依赖时,串并联都不够用。消息队列让 Agent 异步通信,是从"写死的流水线"到"灵活团队协作"的关键一步。但代价是引入了新的复杂度——死锁、消息丢失、超时。下面两个示例讲怎么处理。


信息衰减

多 Agent 协作中最隐蔽的问题是信息在传递过程中不断衰减和变形。A 的输出是 B 的输入,B 的输出是 C 的输入,每一步都会丢失一部分信息——token 截断、理解偏差、格式丢失。经过 3-4 个 Agent 之后,最终输出可能和原始目标差很远。

python
"""
Agent 间信息衰减分析
量化信息在传递过程中的丢失程度,找到瓶颈环节。
"""


class InformationDecayAnalyzer:
    """信息衰减分析器"""

    @staticmethod
    def measure_retention(original: str, output: str) -> float:
        """信息保留率——用关键词 overlap 量化,返回 0.0 ~ 1.0"""
        def extract_keywords(text: str) -> set:
            return set(w for w in text.split() if len(w) >= 2)

        original_kw = extract_keywords(original)
        output_kw = extract_keywords(output)

        if not original_kw:
            return 1.0

        overlap = original_kw & output_kw
        return len(overlap) / len(original_kw)

    @staticmethod
    def measure_deviation(expected: str, actual: str, llm=None) -> dict:
        """理解偏差——判断输出是否偏离了预期目标"""
        if llm:
            prompt = f"""
预期目标: {expected[:200]}

实际输出: {actual[:500]}

请判断实际输出是否偏离了预期目标。
返回 JSON: {{"deviation_score": 0~100, "reason": "偏离原因"}}

评分标准:
0 = 完全符合预期
50 = 部分偏离(回答了部分问题,但遗漏了关键信息)
100 = 完全偏离(答非所问)
"""
            response = llm.generate(prompt, system="你是质量评估专家。")
            import json, re
            match = re.search(r'\{.*\}', response, re.DOTALL)
            if match:
                try:
                    return json.loads(match.group())
                except json.JSONDecodeError:
                    pass

        expected_kw = set(w for w in expected.split() if len(w) >= 3)
        actual_kw = set(w for w in actual.split() if len(w) >= 3)
        overlap = len(expected_kw & actual_kw) / max(len(expected_kw), 1)
        return {"deviation_score": int((1 - overlap) * 100), "reason": "关键词匹配"}

    @staticmethod
    def analyze_pipeline(pipeline_history: list[dict]) -> dict:
        """分析整个流水线的信息衰减"""
        results = []
        original_input = pipeline_history[0]["input"] if pipeline_history else ""

        for step in pipeline_history:
            retention = InformationDecayAnalyzer.measure_retention(
                original_input, step["output"]
            )
            deviation = InformationDecayAnalyzer.measure_deviation(
                original_input, step["output"]
            )

            results.append({
                "agent": step["agent"],
                "input_len": len(step["input"]),
                "output_len": len(step["output"]),
                "retention_rate": round(retention, 2),
                "deviation_score": deviation.get("deviation_score", 0),
                "deviation_reason": deviation.get("reason", ""),
            })

        cumulative_retention = 1.0
        for r in results:
            cumulative_retention *= r["retention_rate"]
            r["cumulative_retention"] = round(cumulative_retention, 2)

        return {"steps": results, "final_retention": round(cumulative_retention, 2)}


history = [
    {"agent": "架构师", "input": "做一个用户管理系统,支持登录、注册、权限管理", "output": "系统架构:MVC 模式,数据库用 MySQL,前端 Vue"},
    {"agent": "开发者", "input": "系统架构:MVC 模式,数据库用 MySQL,前端 Vue", "output": "class User(models.Model):\n    name = models.CharField(max_length=100)\n    password = models.CharField(max_length=128)"},
    {"agent": "审查者", "input": "class User(models.Model):\n    name = models.CharField(max_length=100)\n    password = models.CharField(max_length=128)", "output": "密码不应明文存储,建议使用哈希加密。缺少邮箱、权限字段。"},
]

analysis = InformationDecayAnalyzer.analyze_pipeline(history)
for step in analysis["steps"]:
    print(f"{step['agent']}: 保留率 {step['retention_rate']}, 累积 {step['cumulative_retention']}, 偏差 {step['deviation_score']}")
# 输出(示例):
# 架构师: 保留率 0.15, 累积 0.15, 偏差 60
# 开发者: 保留率 0.10, 累积 0.02, 偏差 80
# 审查者: 保留率 0.05, 累积 0.001, 偏差 90

上面的模拟显示,经过 3 个 Agent 之后,原始信息保留率可能降到 0.1%。这就是为什么多 Agent 流水线的最终输出经常"偏离原始需求"。

缓解方案我用了三种:每步注入原始目标——每个 Agent 的 prompt 都包含原始需求,不只是上一步的输出;减少传递环节,能 2 个 Agent 完成的不要拆成 4 个;关键节点校验,在流水线中间加质量门,Agent B 的输入不只是 A 的输出,还包含对 A 输出的校验。


死锁检测与超时恢复

消息队列模式中,多个 Agent 互相等待消息可能形成死锁。A 等 B 的消息,B 等 A 的消息,谁也动不了。

python
"""
消息队列死锁检测与超时恢复
策略:构建等待图,检测环;超时后采取恢复措施。
"""

import time
from collections import deque
from dataclasses import dataclass, field


@dataclass
class AgentState:
    """Agent 运行时状态"""
    name: str
    last_active: float = 0.0
    pending_tasks: int = 0
    waiting_for: str = ""
    idle_rounds: int = 0


class DeadlockDetector:
    """死锁检测器——构建等待图,检测环"""

    def __init__(self):
        self.wait_for: dict[str, str] = {}

    def update(self, agent_name: str, waiting_for: str) -> None:
        if waiting_for:
            self.wait_for[agent_name] = waiting_for
        else:
            self.wait_for.pop(agent_name, None)

    def detect_cycle(self) -> list[str] | None:
        """检测等待图中是否存在环"""
        visited = set()
        path = []

        def dfs(node: str) -> list[str] | None:
            if node in visited:
                cycle_start = path.index(node)
                return path[cycle_start:] + [node]

            visited.add(node)
            path.append(node)

            next_node = self.wait_for.get(node)
            if next_node:
                result = dfs(next_node)
                if result:
                    return result

            path.pop()
            return None

        for agent in list(self.wait_for.keys()):
            if agent not in visited:
                cycle = dfs(agent)
                if cycle:
                    return cycle

        return None


class TimeoutRecovery:
    """超时恢复策略"""

    @staticmethod
    def check_and_recover(
        agents: dict[str, AgentState],
        timeout_rounds: int = 5,
        max_idle_rounds: int = 20,
    ) -> list[str]:
        """检查所有 Agent 的空闲状态,对超时 Agent 采取恢复措施"""
        actions = []

        for name, state in agents.items():
            if state.waiting_for:
                state.idle_rounds += 1

                if state.idle_rounds >= max_idle_rounds:
                    actions.append(
                        f"严重超时: {name} 已等待 {state.idle_rounds} 轮,"
                        f"目标: {state.waiting_for}。终止任务。"
                    )
                    state.waiting_for = ""
                    state.idle_rounds = 0

                elif state.idle_rounds >= timeout_rounds:
                    actions.append(
                        f"超时警告: {name} 已等待 {state.idle_rounds} 轮,"
                        f"目标: {state.waiting_for}。尝试重发消息。"
                    )
                    state.idle_rounds = 0

        return actions


class DeadlockAwareManager:
    """集成死锁检测的消息队列 Manager"""

    def __init__(self, llm, queue, workers):
        self.llm = llm
        self.queue = queue
        self.workers = workers
        self.detector = DeadlockDetector()
        self.agent_states: dict[str, AgentState] = {}

        for w in workers:
            self.agent_states[w.name] = AgentState(name=w.name)

    def run(self, goal: str) -> str:
        max_rounds = 30
        for round_num in range(max_rounds):
            for name, state in self.agent_states.items():
                if not self.queue.is_empty(name):
                    state.idle_rounds = 0
                    state.waiting_for = ""
                else:
                    state.waiting_for = self._who_is_this_waiting_for(name)

            cycle = self.detector.detect_cycle()
            if cycle:
                print(f"检测到死锁: {' -> '.join(cycle)}")
                victim = cycle[0]
                self.agent_states[victim].idle_rounds = 999
                break

            actions = TimeoutRecovery.check_and_recover(
                self.agent_states, timeout_rounds=5, max_idle_rounds=15
            )
            for action in actions:
                print(f"  恢复行动: {action}")

            all_done = self._process_round()
            if all_done:
                break

        return self._collect_results()

    def _who_is_this_waiting_for(self, agent_name: str) -> str:
        """推断 Agent 在等待谁(简化版)"""
        return ""

典型的死锁场景有三种:循环等待,A 处理完发给 B,B 发给 C,C 发给 A,但 A 已经处理完了,消息丢失,C 永远等不到回复;消息丢失,A 发消息给 B,但 B 的收件箱没正确注册;处理失败,B 收到消息但处理时抛出异常,没有回复 A。

预防比检测更有效:消息单向流动,Manager 到 Worker 再回到 Manager,不让 Worker 之间直接通信;每条消息设超时时间,超时自动失败;每条消息给唯一 ID,追踪是否送达。


冲突仲裁

并联模式下多个 Agent 对同一问题给出不同答案,冲突怎么解决。

python
"""
多 Agent 输出冲突检测与仲裁
三种策略:投票制、权重制、LLM 仲裁
"""


class ConflictArbiter:
    """冲突仲裁器"""

    @staticmethod
    def voting(opinions: list[dict]) -> dict:
        """投票制——相同结论的 Agent 数量最多的胜出"""
        from collections import defaultdict

        groups: dict[str, list[dict]] = defaultdict(list)
        for op in opinions:
            groups[op["conclusion"]].append(op)

        sorted_groups = sorted(
            groups.items(),
            key=lambda item: len(item[1]),
            reverse=True,
        )

        winner_conclusion, winner_opinions = sorted_groups[0]
        return {
            "winner": winner_conclusion,
            "vote_count": len(winner_opinions),
            "dissenters": [op["agent"] for group in sorted_groups[1:] for op in group],
        }

    @staticmethod
    def llm_arbitrate(opinions: list[dict], llm, goal: str) -> dict:
        """LLM 仲裁——阅读所有冲突意见,生成综合结论"""
        opinions_text = "\n\n".join(
            f"### {op['agent']}\n结论: {op['conclusion']}\n置信度: {op.get('confidence', '?')}\n理由: {op.get('reasoning', '')}"
            for op in opinions
        )

        prompt = f"""
目标: {goal}

多个 Agent 给出不同意见:

{opinions_text}

请综合以上意见,生成一致结论。返回 JSON:
{{"conclusion": "...", "confidence": 0.0, "unresolved": "未解决的分歧"}}
"""
        response = llm.generate(prompt)
        import json, re
        match = re.search(r'\{.*\}', response, re.DOTALL)
        if match:
            try:
                return json.loads(match.group())
            except json.JSONDecodeError:
                pass
        return {"conclusion": response, "confidence": 0.5}

并联模式不是"所有 Agent 输出都正确",它们可能互相矛盾。合并器如果只是简单拼接,最终输出就会自相矛盾。仲裁不是锦上添花,是并联模式的必要环节。


什么时候该用,什么时候不该用

多 Agent 不一定比单 Agent 好。优势是分工和并行,代价是通信成本和复杂度。

该用的情况,至少命中两条:单 Agent 的 system prompt 超过 3000 字且包含多个不同角色;任务需要并行执行来降低延迟;不同子任务需要不同的专业知识或工具集;需要质量把控,审查 Agent 可以独立于执行 Agent。

不该用的情况,命中任意一条就不需要:单 Agent 能在一轮 ReAct 循环中完成任务;所有子任务用的是同一套工具和知识;任务之间紧密依赖,无法分解为独立子任务;对成本敏感,多 Agent 的 LLM 费用是单 Agent 的 3 到 10 倍。

我踩过的过度设计的坑:任务用一个 Agent 的 ReAct 循环就能搞定,但拆了 5 个 Agent;Agent 之间的消息传递只是"把一段文本从一个 prompt 搬到另一个 prompt",没有任何实质性分工;整个系统只有一个 Worker,Manager 只是把它包了一层;花了一周搭建多 Agent 框架,跑出来的结果和单 Agent 没区别。

最典型的信号是你加了 Agent,但每个 Agent 的 system prompt 和原来单 Agent 的 prompt 几乎一样——那说明你没在分工,只是在复制。

代价方面,LLM 调用次数成倍增长是最直接的。串联每步一次,并联每个 Agent 一次,层级模式 Manager 拆解加每个 Worker 一次加审查加合并,一次任务可能有 10 次以上 LLM 调用。信息衰减前面已经讲过了。调试难度也是个问题——出问题了不知道是哪个 Agent 的问题,是 Manager 分配错了,Worker 执行错了,还是合并的时候搞砸了。Agent 之间没有调用链追踪,不像微服务有 trace ID。

模式选择上,我的经验是:任务之间有严格先后依赖的,用串联;完全独立各干各的最后合并的,用并联;需要动态分配任务、类型和数量不固定的,用层级;Worker 之间需要互相沟通的,用消息队列。能不用消息队列就不用,复杂度最高。


框架里的实现

CrewAI 的 Crew + Agent + Task 是目前最流行的多 Agent 方案,Agent 有角色、目标、背景故事,Task 有描述、期望输出、Agent 分配。AutoGen 是微软的群聊模式,Agent 之间通过群聊消息通信。LangGraph 用 StateGraph,每个节点是一个 Agent,边决定流转,可以模拟串、并、层级所有模式。MetaGPT 基于 SOP,定义了产品经理、架构师、工程师等角色的标准作业流程。


演进过程

多 Agent 不是起点,是演进的终点。我通常是这个路径:先用单 Agent,发现 system prompt 太长、模型注意力分散、输出质量下降,就拆成串联模式,三个 Agent 各自专注。发现需要并行,改并联。发现任务类型不固定,上层级模式。每一步拆分都要有明确的需求驱动,不要为了"多 Agent"而多 Agent。


多 Agent 协作是从"一个聪明的个体"到"一个有组织的团队"的架构演进。核心价值是分工、并行和质量把控,代价是通信成本、调试难度和 LLM 费用。最实用的建议:先用单 Agent 做到极致,确认真的不够用了再拆。