Agent_05_LangGraph状态机

zbhgis 浩瀚地学222412 分钟
创建于 更新于

一、LangGraph 是什么

LangGraph 是 LangChain 团队推出的图式 Agent 框架。它的核心创新不是加了什么新能力,而是换了一种表达方式——Agent 的执行流程不再是一个 while 循环,而是一张图

这张图由三样东西构成:

  • 节点(Node):一个操作单元,比如 LLM 调用、工具执行、条件判断
  • 边(Edge):节点之间的流转规则
  • 状态(State):整张图共享的数据,所有节点都读写它
之前专题二到四讲的 ReAct、Plan-Execute、多 Agent 协作,本质上都可以用一个 while 循环实现。那为什么要用图来表达?实际使用中我遇到循环结构的三个痛点:
  1. 流程不灵活:想在中间加一步人工审批,或者某一步之后分叉,循环的代码结构要大改
  2. 状态散乱:循环里的变量散落在 context 字典、局部变量、全局变量里,改多了自己都乱
  3. 不好讲给别人听:循环的逻辑得读代码才能理解。图的节点和边直接就是流程图
LangGraph 的做法是把执行流程从"代码逻辑"变成"数据结构"。改流程不用改代码,改图的结构就行。

二、从零实现一个最简状态图引擎

这个例子不依赖 LangGraph,从零实现一个最简的状态图引擎。理解了底层机制,再看 LangGraph 就不会觉得它神秘了。

python
"""
最简状态图引擎——从零理解 LangGraph 的底层机制
"""

from typing import Any, Callable
from dataclasses import dataclass, field


@dataclass
class State:
    """图的状态——所有节点共享的数据"""
    data: dict = field(default_factory=dict)

    def get(self, key: str, default=None) -> Any:
        return self.data.get(key, default)

    def set(self, key: str, value: Any) -> None:
        self.data[key] = value

    def update(self, updates: dict) -> None:
        self.data.update(updates)

    def __repr__(self):
        return f"State({self.data})"


class Node:
    """图中的一个节点——接收 State,修改 State,返回"""

    def __init__(self, name: str, func: Callable[[State], dict]):
        self.name = name
        self.func = func  # func(state) -> dict (更新)

    def execute(self, state: State) -> State:
        updates = self.func(state)
        state.update(updates)
        return state

    def __repr__(self):
        return f"Node({self.name})"


class StateGraph:
    """状态图——节点 + 边的容器"""

    def __init__(self, state_schema: type = State):
        self.nodes: dict[str, Node] = {}
        self.edges: dict[str, str] = {}  # from_node -> to_node
        self.conditional_edges: dict[str, Callable] = {}  # from_node -> condition_func
        self.conditional_targets: dict[str, dict] = {}  # from_node -> {condition_value: to_node}
        self.entry_point: str | None = None
        self.state_schema = state_schema

    def add_node(self, name: str, func: Callable[[State], dict]) -> None:
        self.nodes[name] = Node(name, func)

    def set_entry_point(self, name: str) -> None:
        self.entry_point = name

    def add_edge(self, from_node: str, to_node: str) -> None:
        """添加固定边:from_node 执行完后,无条件走到 to_node"""
        self.edges[from_node] = to_node

    def add_conditional_edges(
        self,
        from_node: str,
        condition: Callable[[State], str],
        targets: dict[str, str],
    ) -> None:
        """
        添加条件边:from_node 执行完后,根据 condition 的结果决定走哪条路
        condition 函数返回一个字符串(路由 key),targets 字典映射 key -> 目标节点名。
        """
        self.conditional_edges[from_node] = condition
        self.conditional_targets[from_node] = targets

    def compile(self) -> "CompiledGraph":
        return CompiledGraph(self)


class CompiledGraph:
    """编译后的图——可以执行"""

    def __init__(self, graph: StateGraph):
        self.graph = graph

    def invoke(self, initial_state: dict = None, max_steps: int = 100) -> State:
        """执行图,从 entry_point 开始,直到无法继续"""
        state = self.graph.state_schema()
        if initial_state:
            state.update(initial_state)

        current = self.graph.entry_point
        step_count = 0

        while current and current != "END" and step_count < max_steps:
            if current not in self.graph.nodes:
                current = None
                continue
            node = self.graph.nodes[current]
            state = node.execute(state)

            if current in self.graph.conditional_edges:
                condition_func = self.graph.conditional_edges[current]
                route_key = condition_func(state)
                targets = self.graph.conditional_targets[current]
                current = targets.get(route_key)
            elif current in self.graph.edges:
                current = self.graph.edges[current]
            else:
                current = None

            step_count += 1

        return state

整个执行过程就是从 entry_point 开始沿着边遍历,每到一个节点执行它。条件边让图可以动态分叉,循环边(某个条件边的目标指回前面的节点)让图可以循环。LangGraph 的底层逻辑就是这个,只是加了很多额外的东西:流式输出、检查点、持久化、可视化。


三、用状态图实现 ReAct Agent

ReAct 本质上就是一张有循环边的图:

python
"""
ReAct Agent 作为状态图

节点:
- llm_node: LLM 输出 Thought + Action/Final Answer
- tool_node: 执行工具,得到 Observation
- end_node: 输出最终答案

边:
- llm_node → 条件边:如果 is_final → end_node,否则 → tool_node
- tool_node → 固定边 → llm_node(循环)

图示:
  [llm] ──is_final──► [end]


    └── [tool] ──固定──┘
       (循环边)
"""


def create_react_graph(llm, tool_registry) -> CompiledGraph:
    """用状态图构建 ReAct Agent"""

    graph = StateGraph()

    def llm_func(state: State) -> dict:
        messages = state.get("messages", [])
        system = state.get("system_prompt", "")
        response = llm.generate_react(messages, system)
        return {
            "messages": messages + [{"role": "assistant", "content": response}],
            "is_final": response.is_final,
            "tool_call": response.tool_call if not response.is_final else None,
        }

    def tool_func(state: State) -> dict:
        tool_call = state.get("tool_call")
        if tool_call is None:
            return {"observation": "无工具调用"}

        result = tool_registry.execute(tool_call["name"], **tool_call.get("args", {}))
        return {
            "observation": result.output if result.success else f"Error: {result.error}",
            "messages": state.get("messages", []) + [
                {"role": "tool", "content": str(result.output), "name": tool_call["name"]}
            ],
        }

    def router(state: State) -> str:
        if state.get("is_final"):
            return "final"
        return "needs_tool"

    graph.add_node("llm", llm_func)
    graph.add_node("tool", tool_func)
    graph.set_entry_point("llm")

    graph.add_conditional_edges("llm", router, {
        "final": "END",
        "needs_tool": "tool",
    })

    graph.add_edge("tool", "llm")

    return graph.compile()


# 使用
graph = create_react_graph(llm, tool_registry)
state = graph.invoke({
    "messages": [{"role": "user", "content": "分析这份销售数据"}],
    "system_prompt": "你是一个 ReAct Agent...",
})
print(state.get("messages")[-1]["content"])

用状态图实现 ReAct 比 while 循环好在几个地方:图的节点和边可以直接渲染为流程图;想在 LLM 和 tool 之间加一个人工审批节点,只需要加一个节点和两条边;每一步的 State 都是快照,中间状态可以追踪。当然,如果你的 ReAct 循环就是 20 行代码的事,那直接写循环就行,没必要引入图的复杂度。


四、用状态图实现 Plan-Execute

Plan-Execute 也是一张图,而且没有循环:

python
"""
Plan-Execute 作为状态图

节点:
- planner: LLM 生成计划
- executor: 按计划执行
- synthesizer: 综合结果

边:
- planner → executor → synthesizer → END
"""


def create_plan_execute_graph(llm, tool_registry) -> CompiledGraph:
    graph = StateGraph()

    def planner_func(state: State) -> dict:
        goal = state.get("goal", "")
        plan = llm.generate_plan(goal)
        return {"plan": plan, "current_step": 0}

    def executor_func(state: State) -> dict:
        plan = state.get("plan", {"steps": []})
        step_idx = state.get("current_step", 0)
        results = state.get("results", [])

        if step_idx >= len(plan["steps"]):
            return {"execution_done": True, "results": results}

        step = plan["steps"][step_idx]
        result = tool_registry.execute(step["tool_name"], **step.get("args", {}))
        results.append({"step": step, "result": result})

        return {
            "current_step": step_idx + 1,
            "results": results,
        }

    def executor_router(state: State) -> str:
        plan = state.get("plan", {"steps": []})
        step_idx = state.get("current_step", 0)
        if step_idx >= len(plan["steps"]):
            return "done"
        return "continue"

    def synthesizer_func(state: State) -> dict:
        results = state.get("results", [])
        goal = state.get("goal", "")
        report = llm.synthesize(goal, results)
        return {"final_answer": report}

    graph.add_node("planner", planner_func)
    graph.add_node("executor", executor_func)
    graph.add_node("synthesizer", synthesizer_func)

    graph.set_entry_point("planner")
    graph.add_edge("planner", "executor")

    graph.add_conditional_edges("executor", executor_router, {
        "continue": "executor",
        "done": "synthesizer",
    })

    graph.add_edge("synthesizer", "END")

    return graph.compile()

ReAct 和 Plan-Execute 的图结构对比很简单:ReAct 是两个节点(LLM + Tool)之间循环,Plan-Execute 是三个节点(Planner + Executor + Synthesizer)线性串起来,只有 Executor 内部自循环。


五、多 Agent 协作

多 Agent 也可以用图表示,每个 Agent 是一个节点:

python
class Agent:
    """简化版 Agent(完整版见专题四)"""

    def __init__(self, name: str, llm, system_prompt: str, tools=None):
        self.name = name
        self.llm = llm
        self.system_prompt = system_prompt
        self.tools = tools or []

    def run(self, input_text: str) -> str:
        return self.llm.generate(input_text, self.system_prompt)


from typing import Callable


def create_dev_pipeline_graph(agents: dict[str, Agent]) -> CompiledGraph:
    graph = StateGraph()

    def make_agent_func(agent: Agent) -> Callable:
        def func(state: State) -> dict:
            input_text = state.get("input", "")
            output = agent.run(input_text)
            return {
                "input": output,
                f"{agent.name}_output": output,
            }
        return func

    agent_names = list(agents.keys())
    for i, name in enumerate(agent_names):
        graph.add_node(name, make_agent_func(agents[name]))

    graph.set_entry_point(agent_names[0])

    for i in range(len(agent_names) - 1):
        graph.add_edge(agent_names[i], agent_names[i + 1])

    return graph.compile()


# 使用
agents = {
    "designer": Agent("架构师", llm, "你是架构师..."),
    "coder": Agent("开发者", llm, "你是开发者..."),
    "reviewer": Agent("审查者", llm, "你是审查者..."),
}
graph = create_dev_pipeline_graph(agents)
state = graph.invoke({"input": "做一个用户管理系统"})
print(state.get("reviewer_output"))


六、Checkpoint 持久化——中断恢复

生产环境的 Agent 可能被中断——网络超时、服务重启、token 耗尽。每次中断都从头执行,成本和时间都不可接受。

python
"""
Checkpoint 持久化 —— 中断后恢复
"""

import json
import os
import time
from typing import Any


class FileCheckpointStore:
    """文件存储 Checkpoint——每次节点执行后,把 State 写入 JSON 文件"""

    def __init__(self, directory: str = ".checkpoints"):
        self.directory = directory
        os.makedirs(directory, exist_ok=True)

    def save(self, run_id: str, state: dict, step: int) -> None:
        checkpoint = {
            "run_id": run_id,
            "step": step,
            "state": state,
            "timestamp": time.time(),
        }
        path = os.path.join(self.directory, f"{run_id}_step{step}.json")
        with open(path, "w", encoding="utf-8") as f:
            json.dump(checkpoint, f, ensure_ascii=False, indent=2)

    def load_latest(self, run_id: str) -> dict | None:
        files = [
            f for f in os.listdir(self.directory)
            if f.startswith(run_id) and f.endswith(".json")
        ]
        if not files:
            return None

        files.sort(key=lambda f: int(f.split("_step")[1].split(".")[0]))
        latest = files[-1]

        with open(os.path.join(self.directory, latest), "r", encoding="utf-8") as f:
            return json.load(f)

    def cleanup(self, run_id: str, keep_latest: int = 1) -> None:
        files = [
            f for f in os.listdir(self.directory)
            if f.startswith(run_id) and f.endswith(".json")
        ]
        files.sort(key=lambda f: int(f.split("_step")[1].split(".")[0]))
        for f in files[:-keep_latest]:
            os.remove(os.path.join(self.directory, f))


class CheckpointableCompiledGraph:
    """支持 Checkpoint 的图执行引擎"""

    def __init__(self, graph: StateGraph, checkpoint_store: FileCheckpointStore = None):
        self.graph = graph
        self.checkpoint_store = checkpoint_store or FileCheckpointStore()

    def invoke(
        self,
        initial_state: dict = None,
        run_id: str = "default",
        max_steps: int = 100,
        checkpoint_every: int = 1,
    ) -> State:
        checkpoint = self.checkpoint_store.load_latest(run_id)

        if checkpoint:
            print(f"  恢复 Checkpoint: run={run_id}, step={checkpoint['step']}")
            state = self.graph.state_schema()
            state.update(checkpoint["state"]["data"])
            current = self._get_next_node_after_step(checkpoint["step"])
            step_count = checkpoint["step"]
        else:
            state = self.graph.state_schema()
            if initial_state:
                state.update(initial_state)
            current = self.graph.entry_point
            step_count = 0

        while current and current != "END" and step_count < max_steps:
            if current not in self.graph.nodes:
                current = None
                continue

            node = self.graph.nodes[current]
            state = node.execute(state)
            step_count += 1

            if step_count % checkpoint_every == 0:
                self.checkpoint_store.save(run_id, {"data": state.data}, step_count)

            if current in self.graph.conditional_edges:
                condition_func = self.graph.conditional_edges[current]
                route_key = condition_func(state)
                targets = self.graph.conditional_targets[current]
                current = targets.get(route_key)
            elif current in self.graph.edges:
                current = self.graph.edges[current]
            else:
                current = None

        self.checkpoint_store.cleanup(run_id, keep_latest=1)
        return state

    def _get_next_node_after_step(self, completed_step: int) -> str | None:
        # 简化实现:实际应该在 Checkpoint 中保存 "current_node" 字段
        return self.graph.entry_point

Checkpoint 的工程实践要点:不要每步都存,频繁 IO 拖慢执行,checkpointevery=3 是个合理的折中;State 要可序列化,不能包含函数或连接池这类对象;每次执行完成清理旧文件,否则磁盘会满;不同执行的 Checkpoint 用 runid 隔离。


七、流式输出——节点执行进度实时推送

状态图执行可能需要 10 到 60 秒,如果用户只看到一个转圈,体验很差。流式输出让每个节点执行完立即推送更新:

python
"""
流式输出 —— 节点执行进度实时推送
"""

from typing import Callable


class StreamEvent:
    NODE_START = "node_start"
    NODE_END = "node_end"
    STATE_UPDATE = "state_update"
    ERROR = "error"
    COMPLETE = "complete"


class StreamSubscriber:
    def on_event(self, event_type: str, data: dict) -> None:
        ...


class ConsoleSubscriber(StreamSubscriber):
    def on_event(self, event_type: str, data: dict) -> None:
        if event_type == StreamEvent.NODE_START:
            print(f"  ▶ {data['node']} 开始执行")
        elif event_type == StreamEvent.NODE_END:
            print(f"  ✓ {data['node']} 执行完成 ({data.get('duration_ms', '?')}ms)")
        elif event_type == StreamEvent.STATE_UPDATE:
            keys = ", ".join(data.get("changed_keys", []))
            print(f"    State 更新: {keys}")
        elif event_type == StreamEvent.ERROR:
            print(f"  ✗ 错误: {data.get('error', '')}")
        elif event_type == StreamEvent.COMPLETE:
            print(f"  ✓ 执行完成,共 {data.get('steps', 0)} 步")


class StreamableCompiledGraph:

    def __init__(self, graph: StateGraph):
        self.graph = graph
        self._subscribers: list[StreamSubscriber] = []

    def subscribe(self, subscriber: StreamSubscriber) -> None:
        self._subscribers.append(subscriber)

    def _emit(self, event_type: str, data: dict) -> None:
        for sub in self._subscribers:
            try:
                sub.on_event(event_type, data)
            except Exception:
                pass

    def invoke(self, initial_state: dict = None, max_steps: int = 100) -> State:
        import time

        state = self.graph.state_schema()
        if initial_state:
            state.update(initial_state)

        current = self.graph.entry_point
        step_count = 0

        while current and current != "END" and step_count < max_steps:
            if current not in self.graph.nodes:
                current = None
                continue

            node = self.graph.nodes[current]

            self._emit(StreamEvent.NODE_START, {"node": current, "step": step_count + 1})
            start_time = time.time()

            previous_data = dict(state.data)
            state = node.execute(state)

            duration = (time.time() - start_time) * 1000

            self._emit(StreamEvent.NODE_END, {
                "node": current,
                "step": step_count + 1,
                "duration_ms": round(duration, 1),
            })

            changed_keys = [k for k in state.data if state.data[k] != previous_data.get(k)]
            if changed_keys:
                self._emit(StreamEvent.STATE_UPDATE, {"changed_keys": changed_keys})

            step_count += 1

            if current in self.graph.conditional_edges:
                condition_func = self.graph.conditional_edges[current]
                route_key = condition_func(state)
                targets = self.graph.conditional_targets[current]
                current = targets.get(route_key)
            elif current in self.graph.edges:
                current = self.graph.edges[current]
            else:
                current = None

        self._emit(StreamEvent.COMPLETE, {"steps": step_count})
        return state


# 使用示例
stream_graph = StreamableCompiledGraph(graph)
stream_graph.subscribe(ConsoleSubscriber())

class WebSocketSubscriber(StreamSubscriber):
    def __init__(self, ws):
        self.ws = ws

    def on_event(self, event_type: str, data: dict) -> None:
        import json
        self.ws.send(json.dumps({"type": event_type, "data": data}))

这就是标准的观察者模式。每个节点执行前后各发射一个事件,订阅者决定怎么处理——控制台打印、推送到前端 WebSocket、或者写入日志都行。订阅者的异常不会中断图的执行。


八、时间旅行调试

有了 Checkpoint 持久化,可以回退到任意步骤的 State,查看当时的数据状态,甚至从那个点重新执行。

python
"""
时间旅行调试 —— 查看和回退到任意步骤的 State 快照
"""


class TimeTravelDebugger:

    def __init__(self, checkpoint_store: FileCheckpointStore, graph: StateGraph):
        self.store = checkpoint_store
        self.graph = graph

    def list_snapshots(self, run_id: str) -> list[dict]:
        import os
        files = [
            f for f in os.listdir(self.store.directory)
            if f.startswith(run_id) and f.endswith(".json")
        ]
        files.sort(key=lambda f: int(f.split("_step")[1].split(".")[0]))

        snapshots = []
        for f in files:
            with open(os.path.join(self.store.directory, f), "r", encoding="utf-8") as fp:
                cp = json.load(fp)
                snapshots.append({
                    "step": cp["step"],
                    "timestamp": cp["timestamp"],
                    "state_keys": list(cp["state"]["data"].keys()),
                })
        return snapshots

    def view_snapshot(self, run_id: str, step: int) -> dict | None:
        import os
        path = os.path.join(self.store.directory, f"{run_id}_step{step}.json")
        if not os.path.exists(path):
            return None
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    def diff_snapshots(self, run_id: str, step_a: int, step_b: int) -> dict:
        snap_a = self.view_snapshot(run_id, step_a)
        snap_b = self.view_snapshot(run_id, step_b)

        if not snap_a or not snap_b:
            return {"error": "快照不存在"}

        state_a = snap_a["state"]["data"]
        state_b = snap_b["state"]["data"]

        all_keys = set(state_a.keys()) | set(state_b.keys())

        diff = {"added": [], "removed": [], "changed": []}
        for key in all_keys:
            if key not in state_a:
                diff["added"].append(key)
            elif key not in state_b:
                diff["removed"].append(key)
            elif state_a[key] != state_b[key]:
                diff["changed"].append(key)

        return diff

    def replay_from(self, run_id: str, step: int, graph) -> State:
        checkpoint = self.view_snapshot(run_id, step)
        if not checkpoint:
            raise ValueError(f"快照不存在: run={run_id}, step={step}")

        print(f"  从 step {step} 开始重新执行")
        state = self.graph.state_schema()
        state.update(checkpoint["state"]["data"])
        return graph.invoke({"data": state.data})


# 使用示例
debugger = TimeTravelDebugger(FileCheckpointStore(), graph)

snapshots = debugger.list_snapshots("run_20250101")
for s in snapshots:
    print(f"Step {s['step']}: State 包含 {s['state_keys']}")

snap = debugger.view_snapshot("run_20250101", 3)
print(f"State: {snap['state']}")

diff = debugger.diff_snapshots("run_20250101", 2, 5)
print(f"新增字段: {diff['added']}")
print(f"删除字段: {diff['removed']}")
print(f"变更字段: {diff['changed']}")

传统调试方式是"从头执行、打断点、看变量"。有了快照以后,可以对比两个步骤之间的差异来定位问题出在哪一步,不用从头重跑(可能耗时 60 秒),直接从问题步骤之前的快照开始。修复 bug 后,用同一个快照重新执行验证修复效果,这本身就是很好的回归测试手段。


九、实践中的一些经验

状态图和 while 循环怎么选

看场景。流程复杂、需要可视化、中间需要人工干预、流程会经常改——状态图更合适。流程简单、固定不变、不需要可视化——一个 while 循环 10 行代码能搞定,不需要搞一张图。至少我目前的项目里,简单的 ReAct 循环我都是直接写循环,只有流程超过 3 个节点且有条件分叉时才考虑状态图。

LangGraph 学习曲线陡在哪

实际用下来,难点主要在三个地方:

  1. State 的设计:State 是所有节点共享的。怎么设计 schema——哪些字段、什么类型、节点之间怎么传递数据——需要经验。设计不好 State 会越来越臃肿
  2. 条件边的路由逻辑:路由函数返回一个字符串,targets 字典映射字符串到节点。这个间接层让调试变难——路由函数返回了一个拼写错误的字符串,你找了半天才发现
  3. LangGraph 特有的概念:Checkpoint、Interrupt、Streaming。这些都是增值功能,但也增加了学习负担

LangGraph vs CrewAI vs AutoGen

框架核心理念适合场景学习曲线
LangGraph状态图复杂流程、需要精确控制每一步
CrewAI角色扮演多 Agent 协作、任务分配
AutoGen群聊多 Agent 自由沟通、探索性任务
更深一层的选型标准我个人建议看这几个:你的流程有"人工审批"环节吗——只有 LangGraph 原生支持 Interrupt;你需要流式输出吗——LangGraph 原生支持 Streaming;你需要中断后恢复吗——LangGraph 有 Checkpoint 机制;你的团队有 Python 背景吗——LangGraph 的 Python API 学习曲线陡,CrewAI 更直觉化。

什么时候 LangGraph 是过度设计

Agent 就是一个 ReAct 循环,20 行代码搞定,不需要状态图。流程是线性的 A 到 B 到 C,没有条件分叉、没有循环、不需要人工干预,直接按顺序调用就行。不需要可视化、不需要检查点、不需要流式输出,那 LangGraph 的核心增值功能都没用到。

State 设计

State 的 schema 是状态图最重要的设计决策,它决定了所有节点之间的数据契约。我总结三个原则:

最小化:只放节点之间必须共享的数据。每个节点内部产生的中间变量不放 State。如果一个变量只有节点 A 用,它不应该出现在 State 中。

命名空间:不同类别的数据用不同的前缀,比如 llmthoughttoolresultuser_preference。避免 key 冲突——两个节点都往 State 里写一个叫 result 的 key 会互相覆盖。

不可变更新:节点不直接修改 State,而是返回一个变更字典,由图的执行引擎负责合并。这样每个节点的变更是可追溯的。

python
# 不可变更新——节点返回变更,不直接修改 State
def analyze_node(state: AgentState) -> dict:
    data = state["tool_results"].get("step_1_result")
    result = {"analysis": f"分析结果: {len(data)} 条记录"}
    return result  # 由图引擎合并

循环边的无限循环预防

循环边是实现 ReAct 等模式的关键,但条件判断失误会陷入无限循环。我的做法是三层防护:

python
# 第一层:max_steps 硬限制
def invoke(self, ..., max_steps: int = 100) -> State:
    step_count = 0
    while current and step_count < max_steps:
        step_count += 1

# 第二层:循环检测——同一个节点连续执行 N 次就报警
def detect_loop(history: list[str], threshold: int = 5) -> bool:
    if len(history) < threshold:
        return False
    return len(set(history[-threshold:])) == 1

# 第三层:进度检查——State 中必须有"进展"标记
def check_progress(state: State, last_progress_hash: int) -> bool:
    current_hash = hash(str(state.data))
    if current_hash == last_progress_hash:
        return False
    return True

典型的死循环场景是 LLM 节点输出 isfinal=False 走向 Tool,Tool 执行完回到 LLM,LLM 又输出 isfinal=False……循环直到 max_steps 耗尽。根本原因是 LLM 的判断条件不够严格。我在 system prompt 里加上一条约束——"如果你已经调用了 5 次工具还没有结论,请总结已有信息并输出 Final Answer"——能解决大部分这类问题。


十、写在最后

LangGraph 的状态机思想是 Agent 框架中最灵活、最通用的架构方式。它把 Agent 的执行流程从"代码逻辑"变成了"数据结构",改流程不用改代码,改图就行。但灵活性是有代价的——学习曲线陡、State 设计难、调试复杂。最简单的 ReAct 循环,一个 while 循环 20 行就能搞定,不需要状态图。

整个系列回顾一下:

  • 专题一:Agent = LLM + Memory + Tools + Planner,四大组件可替换
  • 专题二:ReAct = Thought → Action → Observation 循环,处理不确定性
  • 专题三:Plan-Execute = 先规划再执行,效率和可预测性优先
  • 专题四:多 Agent = 串联/并联/层级,分工与协作的权衡
  • 专题五:状态图 = 节点 + 边 + 状态,Agent 的通用执行引擎
不是"哪个框架最好",而是"哪个模式最适合你的场景"。至少我目前的情况是这样。