Agent_05_LangGraph State Machine
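1. What Is LangGraph
LangGraph is a graph-based agent framework from the LangChain team. Its core innovation is not a new capability but a new way of expressing things: an agent's execution flow is no longer a while loop but a graph.
The graph is made of three things:
- Node: a unit of work, such as an LLM call, a tool execution, or a conditional check
- Edge: the rule for moving from one node to the next
- State: data shared by the whole graph; every node reads and writes it
Why replace the loop? A hand-written while loop has three problems:
- Inflexible flow: adding a human approval step in the middle, or branching after some step, means substantially restructuring the loop
- Scattered state: the loop's variables end up spread across a `context` dict, local variables, and globals, and after enough changes even the author loses track
- Hard to explain: to understand a loop you have to read its code, whereas a graph's nodes and edges already are the flowchart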
2. Building a Minimal State Graph Engine from Scratch
This example does not depend on LangGraph; it implements a minimal state graph engine from scratch. Once you understand the underlying mechanism, LangGraph no longer looks mysterious.
```python
"""
Minimal state graph engine: understanding LangGraph's underlying mechanism from scratch
"""
from typing import Any, Callable
from dataclasses import dataclass, field


@dataclass
class State:
    """Graph state: data shared by all nodes"""
    data: dict = field(default_factory=dict)

    def get(self, key: str, default=None) -> Any:
        return self.data.get(key, default)

    def set(self, key: str, value: Any) -> None:
        self.data[key] = value

    def update(self, updates: dict) -> None:
        self.data.update(updates)

    def __repr__(self):
        return f"State({self.data})"


class Node:
    """A node in the graph: takes the State, applies its updates, and returns it"""

    def __init__(self, name: str, func: Callable[[State], dict]):
        self.name = name
        self.func = func  # func(state) -> dict of updates

    def execute(self, state: State) -> State:
        updates = self.func(state)
        state.update(updates)
        return state

    def __repr__(self):
        return f"Node({self.name})"


class StateGraph:
    """State graph: a container for nodes and edges"""

    def __init__(self, state_schema: type = State):
        self.nodes: dict[str, Node] = {}
        self.edges: dict[str, str] = {}  # from_node -> to_node
        self.conditional_edges: dict[str, Callable] = {}  # from_node -> condition_func
        self.conditional_targets: dict[str, dict] = {}  # from_node -> {condition_value: to_node}
        self.entry_point: str | None = None
        self.state_schema = state_schema

    def add_node(self, name: str, func: Callable[[State], dict]) -> None:
        self.nodes[name] = Node(name, func)

    def set_entry_point(self, name: str) -> None:
        self.entry_point = name

    def add_edge(self, from_node: str, to_node: str) -> None:
        """Add a fixed edge: after from_node runs, always go to to_node"""
        self.edges[from_node] = to_node

    def add_conditional_edges(
        self,
        from_node: str,
        condition: Callable[[State], str],
        targets: dict[str, str],
    ) -> None:
        """
        Add a conditional edge: after from_node runs, the result of condition decides where to go.
        condition returns a string (the routing key); targets maps key -> target node name.
        """
        self.conditional_edges[from_node] = condition
        self.conditional_targets[from_node] = targets

    def compile(self) -> "CompiledGraph":
        return CompiledGraph(self)


class CompiledGraph:
    """Compiled graph: ready to execute"""

    def __init__(self, graph: StateGraph):
        self.graph = graph

    def invoke(self, initial_state: dict | None = None, max_steps: int = 100) -> State:
        """Run the graph from entry_point until no further transition is possible"""
        state = self.graph.state_schema()
        if initial_state:
            state.update(initial_state)
        current = self.graph.entry_point
        step_count = 0
        while current and current != "END" and step_count < max_steps:
            if current not in self.graph.nodes:
                # Unknown node name: stop (a stricter engine would raise an error here)
                current = None
                continue
            node = self.graph.nodes[current]
            state = node.execute(state)
            if current in self.graph.conditional_edges:
                # Conditional edge: the condition returns a routing key, which maps to the next node
                condition_func = self.graph.conditional_edges[current]
                route_key = condition_func(state)
                targets = self.graph.conditional_targets[current]
                current = targets.get(route_key)  # an unknown key yields None, which ends the run
            elif current in self.graph.edges:
                current = self.graph.edges[current]  # fixed edge
            else:
                current = None  # no outgoing edge: done
            step_count += 1
        return state
```
The whole execution is a walk from entry_point along the edges, running each node as it is reached. Conditional edges let the graph branch dynamically, and a cycle edge (any edge, fixed or conditional, whose target points back to an earlier node) lets it loop. Conceptually this is LangGraph's core logic too; LangGraph adds a lot on top: streaming output, checkpoints, persistence, and visualization.
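To see the traversal in isolation, here is the same loop reduced to one function over plain dicts. This is a minimal sketch, not LangGraph code: `run_graph`, the `routers` mapping, and the toy `decrement`/`report` nodes are hypothetical names introduced for illustration. Unlike `CompiledGraph.invoke`, it raises `KeyError` on an unknown node instead of stopping silently, and it returns the visited path so the loop is visible.

```python
from typing import Callable, Optional

Router = Callable[[dict], str]


def run_graph(
    nodes: dict[str, Callable[[dict], dict]],
    edges: dict[str, str],
    routers: dict[str, tuple[Router, dict[str, str]]],
    entry: str,
    state: dict,
    max_steps: int = 100,
) -> tuple[dict, list[str]]:
    """Walk the graph from `entry`; return the final state and the visited path."""
    state = dict(state)
    path: list[str] = []
    current: Optional[str] = entry
    while current is not None and current != "END" and len(path) < max_steps:
        path.append(current)
        state.update(nodes[current](state))  # the node returns an update dict
        if current in routers:  # conditional edge: router picks a key, key maps to a node
            router, targets = routers[current]
            current = targets.get(router(state))
        else:  # fixed edge, or None when the node has no outgoing edge
            current = edges.get(current)
    return state, path


# Toy graph: "decrement" routes back to itself until n reaches 0, then goes to "report"
nodes = {
    "decrement": lambda s: {"n": s["n"] - 1},
    "report": lambda s: {"message": f"done, n={s['n']}"},
}
routers = {
    "decrement": (
        lambda s: "stop" if s["n"] <= 0 else "again",
        {"again": "decrement", "stop": "report"},
    ),
}
edges = {"report": "END"}

final_state, path = run_graph(nodes, edges, routers, "decrement", {"n": 3})
print(path)         # ['decrement', 'decrement', 'decrement', 'report']
print(final_state)  # {'n': 0, 'message': 'done, n=0'}
```

The `decrement` node reaches itself through a conditional edge, which is exactly how a cycle forms; if the router never returned `"stop"`, only `max_steps` would end the run.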
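3. Implementing a ReAct Agent with a State Graph
ReAct is essentially a graph with a cycle:
```python
"""
ReAct Agent as a state graph

Nodes:
- llm: the LLM outputs a Thought plus either an Action or a Final Answer
- tool: executes the tool and produces an Observation
- END: a sentinel, not a real node; the final answer is the last message in the State

Edges:
- llm → conditional edge: if is_final → END, otherwise → tool
- tool → fixed edge → llm (the cycle)

Diagram:
  [llm] ──is_final──► [END]
   │  ▲
   │  └───fixed (cycle)───┐
   │                      │
   └───needs_tool───► [tool]
"""


def create_react_graph(llm, tool_registry) -> CompiledGraph:
    """Build a ReAct Agent as a state graph.

    Assumed interfaces (hypothetical, not a specific library):
    - llm.generate_react(messages, system) returns an object with
      .content (str), .is_final (bool), and .tool_call ({"name", "args"} or None)
    - tool_registry.execute(name, **args) returns an object with
      .success, .output, and .error
    """
    graph = StateGraph()

    def llm_func(state: State) -> dict:
        messages = state.get("messages", [])
        system = state.get("system_prompt", "")
        response = llm.generate_react(messages, system)
        return {
            "messages": messages + [{"role": "assistant", "content": response.content}],
            "is_final": response.is_final,
            "tool_call": response.tool_call if not response.is_final else None,
        }

    def tool_func(state: State) -> dict:
        tool_call = state.get("tool_call")
        if tool_call is None:
            return {"observation": "No tool call"}
        result = tool_registry.execute(tool_call["name"], **tool_call.get("args", {}))
        observation = result.output if result.success else f"Error: {result.error}"
        return {
            "observation": observation,
            # Feed the same observation (including errors) back to the LLM
            "messages": state.get("messages", []) + [
                {"role": "tool", "content": str(observation), "name": tool_call["name"]}
            ],
        }

    def router(state: State) -> str:
        return "final" if state.get("is_final") else "needs_tool"

    graph.add_node("llm", llm_func)
    graph.add_node("tool", tool_func)
    graph.set_entry_point("llm")
    graph.add_conditional_edges("llm", router, {
        "final": "END",
        "needs_tool": "tool",
    })
    graph.add_edge("tool", "llm")
    return graph.compile()


# Usage (llm and tool_registry are objects matching the interfaces above)
graph = create_react_graph(llm, tool_registry)
state = graph.invoke({
    "messages": [{"role": "user", "content": "Analyze this sales data"}],
    "system_prompt": "You are a ReAct Agent...",
})
print(state.get("messages")[-1]["content"])
```
Implementing ReAct as a state graph has a few advantages over a while loop: the nodes and edges can be rendered directly as a flowchart; adding a human approval node between the LLM and the tool takes one new node and two edges (LLM → approval and approval → tool, replacing the direct LLM → tool edge); and the State after each step can be snapshotted, so intermediate states are traceable. That said, if your ReAct loop is a 20-line affair, just write the loop; there is no need to take on the complexity of a graph.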
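Because the wiring is plain data, rendering it as a flowchart takes a few lines of string formatting. The sketch below assumes Mermaid flowchart syntax as the target; `to_mermaid` is a hypothetical helper, and the two dicts mirror `StateGraph.edges` and `StateGraph.conditional_targets` from the engine above. It also shows the approval-node change as two dict edits (the `approve` node name is hypothetical).

```python
def to_mermaid(edges: dict[str, str], conditional: dict[str, dict[str, str]]) -> str:
    """Render fixed and conditional edges as Mermaid flowchart text."""
    lines = ["flowchart TD"]
    for src, targets in conditional.items():
        for label, dst in targets.items():
            lines.append(f"    {src} -->|{label}| {dst}")  # labeled conditional edge
    for src, dst in edges.items():
        lines.append(f"    {src} --> {dst}")  # unlabeled fixed edge
    return "\n".join(lines)


# The ReAct wiring from create_react_graph, copied here as plain dicts
react_edges = {"tool": "llm"}
react_conditional = {"llm": {"final": "END", "needs_tool": "tool"}}
print(to_mermaid(react_edges, react_conditional))

# Hypothetical approval step: LLM → approve replaces LLM → tool, plus a new approve → tool edge
react_conditional["llm"]["needs_tool"] = "approve"
react_edges["approve"] = "tool"
print(to_mermaid(react_edges, react_conditional))
```

The approval step changes the data, not the control flow of any function, which is the practical meaning of "add one node and two edges".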
4. Implementing Plan-Execute with a State Graph
Plan-Execute is also a graph, and apart from the executor stepping through the plan one step at a time, it has no cycles:
```python
"""
Plan-Execute as a state graph

Nodes:
- planner: the LLM generates a plan
- executor: runs one plan step per visit
- synthesizer: combines the results

Edges:
- planner → executor
- executor → executor (self-loop while steps remain) or → synthesizer (when done)
- synthesizer → END
"""


def create_plan_execute_graph(llm, tool_registry) -> CompiledGraph:
    """Assumed interfaces (hypothetical): llm.generate_plan(goal) returns
    {"steps": [{"tool_name": ..., "args": {...}}, ...]}; llm.synthesize(goal, results)
    returns the final report; tool_registry.execute(name, **args) runs a tool."""
    graph = StateGraph()

    def planner_func(state: State) -> dict:
        goal = state.get("goal", "")
        plan = llm.generate_plan(goal)
        return {"plan": plan, "current_step": 0, "results": []}

    def executor_func(state: State) -> dict:
        plan = state.get("plan", {"steps": []})
        step_idx = state.get("current_step", 0)
        results = state.get("results", [])
        if step_idx >= len(plan["steps"]):
            # Empty plan: nothing to run
            return {"execution_done": True}
        step = plan["steps"][step_idx]
        result = tool_registry.execute(step["tool_name"], **step.get("args", {}))
        return {
            "current_step": step_idx + 1,
            # Build a new list instead of appending to the one stored in the State
            "results": results + [{"step": step, "result": result}],
        }

    def executor_router(state: State) -> str:
        plan = state.get("plan", {"steps": []})
        step_idx = state.get("current_step", 0)
        return "done" if step_idx >= len(plan["steps"]) else "continue"

    def synthesizer_func(state: State) -> dict:
        results = state.get("results", [])
        goal = state.get("goal", "")
        report = llm.synthesize(goal, results)
        return {"final_answer": report}

    graph.add_node("planner", planner_func)
    graph.add_node("executor", executor_func)
    graph.add_node("synthesizer", synthesizer_func)
    graph.set_entry_point("planner")
    graph.add_edge("planner", "executor")
    graph.add_conditional_edges("executor", executor_router, {
        "continue": "executor",
        "done": "synthesizer",
    })
    graph.add_edge("synthesizer", "END")
    return graph.compile()
```
The structural difference between ReAct and Plan-Execute is simple: ReAct loops between two nodes (LLM and Tool), while Plan-Execute strings three nodes (Planner, Executor, Synthesizer) together in a line, and only the Executor loops on itself.
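The structural comparison can be checked mechanically: a node lies on a cycle if it can reach itself. A minimal sketch using an iterative depth-first reachability search; `nodes_on_cycles` is a hypothetical helper, and the adjacency lists are written out by hand from the two graphs above, with each node listing every possible successor, fixed or conditional.

```python
def nodes_on_cycles(adjacency: dict[str, list[str]]) -> set[str]:
    """Return the nodes that can reach themselves by following edges."""

    def reachable(start: str) -> set[str]:
        seen: set[str] = set()
        stack = list(adjacency.get(start, []))
        while stack:
            node = stack.pop()
            if node not in seen:
                seen.add(node)
                stack.extend(adjacency.get(node, []))
        return seen

    return {node for node in adjacency if node in reachable(node)}


react = {"llm": ["END", "tool"], "tool": ["llm"]}
plan_execute = {
    "planner": ["executor"],
    "executor": ["executor", "synthesizer"],
    "synthesizer": ["END"],
}
print(sorted(nodes_on_cycles(react)))         # ['llm', 'tool']
print(sorted(nodes_on_cycles(plan_execute)))  # ['executor']
```

Running the same check on a graph before deploying it is a cheap way to confirm that every cycle is one you intended.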
5. Multi-Agent Collaboration
Multiple agents can also be represented as a graph, with each agent as one node:
```python
from typing import Callable


class Agent:
    """Simplified Agent (the full version is in Topic 4)"""

    def __init__(self, name: str, llm, system_prompt: str, tools=None):
        self.name = name
        self.llm = llm
        self.system_prompt = system_prompt
        self.tools = tools or []

    def run(self, input_text: str) -> str:
        # Assumes llm.generate(prompt, system_prompt) returns a string
        return self.llm.generate(input_text, self.system_prompt)


def create_dev_pipeline_graph(agents: dict[str, Agent]) -> CompiledGraph:
    graph = StateGraph()

    def make_agent_func(node_name: str, agent: Agent) -> Callable:
        def func(state: State) -> dict:
            input_text = state.get("input", "")
            output = agent.run(input_text)
            return {
                "input": output,  # the next agent's input is this agent's output
                # Key by node name, not display name, so lookups such as "reviewer_output" work
                f"{node_name}_output": output,
            }
        return func

    agent_names = list(agents.keys())
    for name in agent_names:
        graph.add_node(name, make_agent_func(name, agents[name]))
    graph.set_entry_point(agent_names[0])
    for i in range(len(agent_names) - 1):
        graph.add_edge(agent_names[i], agent_names[i + 1])
    return graph.compile()


# Usage
agents = {
    "designer": Agent("Architect", llm, "You are a software architect..."),
    "coder": Agent("Developer", llm, "You are a developer..."),
    "reviewer": Agent("Reviewer", llm, "You are a code reviewer..."),
}
graph = create_dev_pipeline_graph(agents)
state = graph.invoke({"input": "Build a user management system"})
print(state.get("reviewer_output"))
```
6. Checkpoint Persistence: Resuming After an Interruption
A production agent can be interrupted by network timeouts, service restarts, or running out of tokens. Restarting from scratch after every interruption is unacceptable in both cost and time.
```python
"""
Checkpoint persistence: resuming after an interruption
"""
import json
import os
import time


class FileCheckpointStore:
    """File-based checkpoint store: after a node runs, write the State to a JSON file"""

    def __init__(self, directory: str = ".checkpoints"):
        self.directory = directory
        os.makedirs(directory, exist_ok=True)

    def _files(self, run_id: str) -> list[str]:
        # Match "<run_id>_step" exactly so "run_1" does not pick up "run_10_step3.json"
        prefix = f"{run_id}_step"
        files = [
            f for f in os.listdir(self.directory)
            if f.startswith(prefix) and f.endswith(".json")
        ]
        files.sort(key=lambda f: int(f[len(prefix):-len(".json")]))
        return files

    def save(self, run_id: str, state: dict, step: int) -> None:
        checkpoint = {
            "run_id": run_id,
            "step": step,
            "state": state,
            "timestamp": time.time(),
        }
        path = os.path.join(self.directory, f"{run_id}_step{step}.json")
        with open(path, "w", encoding="utf-8") as f:
            json.dump(checkpoint, f, ensure_ascii=False, indent=2)

    def load_latest(self, run_id: str) -> dict | None:
        files = self._files(run_id)
        if not files:
            return None
        with open(os.path.join(self.directory, files[-1]), "r", encoding="utf-8") as f:
            return json.load(f)

    def cleanup(self, run_id: str, keep_latest: int = 1) -> None:
        files = self._files(run_id)
        # Delete everything except the newest keep_latest files
        for f in files[:max(len(files) - keep_latest, 0)]:
            os.remove(os.path.join(self.directory, f))


class CheckpointableCompiledGraph:
    """Graph execution engine with checkpoint support"""

    def __init__(self, graph: StateGraph, checkpoint_store: FileCheckpointStore | None = None):
        self.graph = graph
        self.checkpoint_store = checkpoint_store or FileCheckpointStore()

    def invoke(
        self,
        initial_state: dict | None = None,
        run_id: str = "default",
        max_steps: int = 100,
        checkpoint_every: int = 1,
    ) -> State:
        checkpoint = self.checkpoint_store.load_latest(run_id)
        state = self.graph.state_schema()
        if checkpoint:
            print(f"  Resuming checkpoint: run={run_id}, step={checkpoint['step']}")
            state.update(checkpoint["state"]["data"])
            # Resume at the node that was due to run next, not at the entry point
            current = checkpoint["state"].get("next_node", self.graph.entry_point)
            step_count = checkpoint["step"]
        else:
            if initial_state:
                state.update(initial_state)
            current = self.graph.entry_point
            step_count = 0
        while current and current != "END" and step_count < max_steps:
            if current not in self.graph.nodes:
                current = None
                continue
            node = self.graph.nodes[current]
            state = node.execute(state)
            step_count += 1
            # Route first, so the checkpoint records where to resume
            current = self._next_node(current, state)
            if step_count % checkpoint_every == 0:
                self.checkpoint_store.save(
                    run_id, {"data": state.data, "next_node": current}, step_count
                )
        self.checkpoint_store.cleanup(run_id, keep_latest=1)
        return state

    def _next_node(self, current: str, state: State) -> str | None:
        if current in self.graph.conditional_edges:
            route_key = self.graph.conditional_edges[current](state)
            return self.graph.conditional_targets[current].get(route_key)
        return self.graph.edges.get(current)
```
Practical points for checkpoints: don't save after every single step, because frequent I/O slows execution; `checkpoint_every=3` is a reasonable compromise, at the cost of re-running up to two steps after a crash. The State must be serializable, so it cannot hold objects such as functions or connection pools. Clean up old files after each run completes, or the disk will fill up. Keep checkpoints from different runs apart by `run_id`.
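The reason to store the next node alongside the State is easiest to see with a simulated crash. A minimal sketch, assuming fixed edges only and one JSON file per run; `run_with_checkpoints` and its `crash_at` test hook are hypothetical. It also writes to a temporary file and renames it with `os.replace`, so a crash in the middle of a write leaves the previous checkpoint intact (on POSIX systems the rename is atomic); that detail is an addition not covered in the text above.

```python
import json
import os
import tempfile
from typing import Callable, Optional


def run_with_checkpoints(
    nodes: dict[str, Callable[[dict], dict]],
    edges: dict[str, str],
    entry: str,
    state: dict,
    ckpt_path: str,
    crash_at: Optional[str] = None,
) -> tuple[dict, list[str]]:
    """Run a fixed-edge graph, saving {"data", "next_node"} after every node.

    If ckpt_path exists, resume from it instead of starting at entry.
    crash_at is a hypothetical test hook: raise right before that node runs.
    """
    if os.path.exists(ckpt_path):
        with open(ckpt_path, encoding="utf-8") as f:
            saved = json.load(f)
        state, current = saved["data"], saved["next_node"]
    else:
        current = entry
    executed: list[str] = []
    while current is not None and current != "END":
        if current == crash_at:
            raise RuntimeError(f"simulated crash before {current}")
        state = {**state, **nodes[current](state)}
        executed.append(current)
        current = edges.get(current)
        # Write to a temp file, then rename, so a crash mid-write keeps the old checkpoint
        tmp_path = ckpt_path + ".tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump({"data": state, "next_node": current}, f)
        os.replace(tmp_path, ckpt_path)
    return state, executed


nodes = {
    "fetch": lambda s: {"rows": [3, 1, 2]},
    "sort": lambda s: {"rows": sorted(s["rows"])},
    "report": lambda s: {"report": f"{len(s['rows'])} rows, min={s['rows'][0]}"},
}
edges = {"fetch": "sort", "sort": "report", "report": "END"}

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "run_1.json")
    try:
        run_with_checkpoints(nodes, edges, "fetch", {}, path, crash_at="report")
    except RuntimeError as exc:
        print(exc)  # simulated crash before report
    final_state, executed = run_with_checkpoints(nodes, edges, "fetch", {}, path)

print(executed)               # ['report']
print(final_state["report"])  # 3 rows, min=1
```

The second call skips `fetch` and `sort` entirely: it reloads their output from disk and starts at `report`, the node recorded as `next_node`.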
7. Streaming Output: Pushing Node Progress in Real Time
A state graph run can take 10 to 60 seconds; if all the user sees is a spinner, the experience is poor. Streaming output pushes an update as soon as each node finishes:
```python
"""
Streaming output: pushing node progress in real time
"""
import json
import time


class StreamEvent:
    NODE_START = "node_start"
    NODE_END = "node_end"
    STATE_UPDATE = "state_update"
    ERROR = "error"
    COMPLETE = "complete"


class StreamSubscriber:
    def on_event(self, event_type: str, data: dict) -> None:
        ...


class ConsoleSubscriber(StreamSubscriber):
    def on_event(self, event_type: str, data: dict) -> None:
        if event_type == StreamEvent.NODE_START:
            print(f"  ▶ {data['node']} started")
        elif event_type == StreamEvent.NODE_END:
            print(f"  ✓ {data['node']} finished ({data.get('duration_ms', '?')}ms)")
        elif event_type == StreamEvent.STATE_UPDATE:
            keys = ", ".join(data.get("changed_keys", []))
            print(f"  State updated: {keys}")
        elif event_type == StreamEvent.ERROR:
            print(f"  ✗ Error: {data.get('error', '')}")
        elif event_type == StreamEvent.COMPLETE:
            print(f"  ✓ Run complete, {data.get('steps', 0)} steps")


class StreamableCompiledGraph:
    def __init__(self, graph: StateGraph):
        self.graph = graph  # the uncompiled StateGraph (a CompiledGraph exposes it as .graph)
        self._subscribers: list[StreamSubscriber] = []

    def subscribe(self, subscriber: StreamSubscriber) -> None:
        self._subscribers.append(subscriber)

    def _emit(self, event_type: str, data: dict) -> None:
        for sub in self._subscribers:
            try:
                sub.on_event(event_type, data)
            except Exception:
                pass  # a failing subscriber must not stop the graph

    def invoke(self, initial_state: dict | None = None, max_steps: int = 100) -> State:
        state = self.graph.state_schema()
        if initial_state:
            state.update(initial_state)
        current = self.graph.entry_point
        step_count = 0
        while current and current != "END" and step_count < max_steps:
            if current not in self.graph.nodes:
                current = None
                continue
            node = self.graph.nodes[current]
            self._emit(StreamEvent.NODE_START, {"node": current, "step": step_count + 1})
            start_time = time.perf_counter()
            # Shallow copy: in-place mutation of a list or dict value is not detected,
            # which is one more reason for nodes to return new values
            previous_data = dict(state.data)
            try:
                state = node.execute(state)
            except Exception as exc:
                self._emit(StreamEvent.ERROR, {"node": current, "error": str(exc)})
                raise
            duration = (time.perf_counter() - start_time) * 1000
            self._emit(StreamEvent.NODE_END, {
                "node": current,
                "step": step_count + 1,
                "duration_ms": round(duration, 1),
            })
            changed_keys = [k for k in state.data if state.data[k] != previous_data.get(k)]
            if changed_keys:
                self._emit(StreamEvent.STATE_UPDATE, {"changed_keys": changed_keys})
            step_count += 1
            if current in self.graph.conditional_edges:
                condition_func = self.graph.conditional_edges[current]
                route_key = condition_func(state)
                targets = self.graph.conditional_targets[current]
                current = targets.get(route_key)
            elif current in self.graph.edges:
                current = self.graph.edges[current]
            else:
                current = None
        self._emit(StreamEvent.COMPLETE, {"steps": step_count})
        return state


# Usage: `graph` is the CompiledGraph built earlier; the streaming engine takes its StateGraph
stream_graph = StreamableCompiledGraph(graph.graph)
stream_graph.subscribe(ConsoleSubscriber())


class WebSocketSubscriber(StreamSubscriber):
    def __init__(self, ws):
        self.ws = ws  # assumed: any object with a synchronous send(str) method

    def on_event(self, event_type: str, data: dict) -> None:
        self.ws.send(json.dumps({"type": event_type, "data": data}, ensure_ascii=False))
```
This is the standard observer pattern. Each node emits one event before it runs and one after, and subscribers decide what to do with them: print to the console, push to a front end over WebSocket, or write to a log. An exception inside a subscriber does not interrupt the graph run.
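A pull-based alternative to the observer pattern is a generator that yields events, so the caller iterates over progress directly and decides where each event goes. This is a swapped-in technique, not how `StreamableCompiledGraph` works: `stream_graph` is a hypothetical function, it follows fixed edges only, and it reports `changed_keys` as the keys a node returned rather than diffing values, which sidesteps the shallow-copy problem when nodes use immutable updates.

```python
import time
from typing import Callable, Iterator


def stream_graph(
    nodes: dict[str, Callable[[dict], dict]],
    edges: dict[str, str],
    entry: str,
    state: dict,
    max_steps: int = 100,
) -> Iterator[dict]:
    """Yield progress events as the graph runs (fixed edges only)."""
    current = entry
    step = 0
    while current is not None and current != "END" and step < max_steps:
        step += 1
        yield {"type": "node_start", "node": current, "step": step}
        started = time.perf_counter()
        updates = nodes[current](state)
        state = {**state, **updates}  # immutable merge
        yield {
            "type": "node_end",
            "node": current,
            "step": step,
            "changed_keys": sorted(updates),  # the keys this node wrote
            "duration_ms": round((time.perf_counter() - started) * 1000, 1),
        }
        current = edges.get(current)
    yield {"type": "complete", "steps": step, "state": state}


nodes = {
    "plan": lambda s: {"plan": ["a", "b"]},
    "run": lambda s: {"done": len(s["plan"])},
}
edges = {"plan": "run", "run": "END"}
for event in stream_graph(nodes, edges, "plan", {}):
    print(event["type"], event.get("node", ""))
```

The trade-off: subscribers let several consumers listen at once without the engine knowing about them, while a generator keeps the control flow in the caller's hands and needs no registration step.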
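8. Time-Travel Debugging
With checkpoints persisted, you can go back to the State at any saved step, inspect the data at that point, and even re-run from there. This only works for steps whose snapshots still exist, so for runs you want to debug, save every step and skip the end-of-run cleanup.
```python
"""
Time-travel debugging: viewing and rolling back to the State snapshot of any saved step
"""
import json
import os


class TimeTravelDebugger:
    def __init__(self, checkpoint_store: FileCheckpointStore, graph: StateGraph):
        self.store = checkpoint_store
        self.graph = graph  # the uncompiled StateGraph

    def list_snapshots(self, run_id: str) -> list[dict]:
        prefix = f"{run_id}_step"
        files = [
            f for f in os.listdir(self.store.directory)
            if f.startswith(prefix) and f.endswith(".json")
        ]
        files.sort(key=lambda f: int(f[len(prefix):-len(".json")]))
        snapshots = []
        for f in files:
            with open(os.path.join(self.store.directory, f), "r", encoding="utf-8") as fp:
                cp = json.load(fp)
            snapshots.append({
                "step": cp["step"],
                "timestamp": cp["timestamp"],
                "state_keys": list(cp["state"]["data"].keys()),
            })
        return snapshots

    def view_snapshot(self, run_id: str, step: int) -> dict | None:
        path = os.path.join(self.store.directory, f"{run_id}_step{step}.json")
        if not os.path.exists(path):
            return None
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    def diff_snapshots(self, run_id: str, step_a: int, step_b: int) -> dict:
        snap_a = self.view_snapshot(run_id, step_a)
        snap_b = self.view_snapshot(run_id, step_b)
        if not snap_a or not snap_b:
            return {"error": "Snapshot not found"}
        state_a = snap_a["state"]["data"]
        state_b = snap_b["state"]["data"]
        diff = {"added": [], "removed": [], "changed": []}
        for key in sorted(set(state_a) | set(state_b)):
            if key not in state_a:
                diff["added"].append(key)
            elif key not in state_b:
                diff["removed"].append(key)
            elif state_a[key] != state_b[key]:
                diff["changed"].append(key)
        return diff

    def replay_from(self, run_id: str, step: int, max_steps: int = 100) -> State:
        """Rebuild the State from a snapshot and continue running from there"""
        checkpoint = self.view_snapshot(run_id, step)
        if not checkpoint:
            raise ValueError(f"Snapshot not found: run={run_id}, step={step}")
        print(f"  Re-running from step {step}")
        state = self.graph.state_schema()
        state.update(checkpoint["state"]["data"])
        # Continue at the node that was due next; snapshots without that field restart at the entry point
        current = checkpoint["state"].get("next_node", self.graph.entry_point)
        g = self.graph
        while current and current != "END" and step < max_steps:
            if current not in g.nodes:
                break
            state = g.nodes[current].execute(state)
            step += 1
            if current in g.conditional_edges:
                route_key = g.conditional_edges[current](state)
                current = g.conditional_targets[current].get(route_key)
            else:
                current = g.edges.get(current)
        return state


# Usage: `graph` is the CompiledGraph built earlier; the debugger takes its StateGraph
debugger = TimeTravelDebugger(FileCheckpointStore(), graph.graph)
snapshots = debugger.list_snapshots("run_20250101")
for s in snapshots:
    print(f"Step {s['step']}: State has keys {s['state_keys']}")
snap = debugger.view_snapshot("run_20250101", 3)
if snap:
    print(f"State: {snap['state']}")
diff = debugger.diff_snapshots("run_20250101", 2, 5)
if "error" in diff:
    print(diff["error"])
else:
    print(f"Added keys: {diff['added']}")
    print(f"Removed keys: {diff['removed']}")
    print(f"Changed keys: {diff['changed']}")
```
The traditional way to debug is "run from the start, set breakpoints, inspect variables". With snapshots, you can diff two steps to pin down which step went wrong, and instead of rerunning from the start (which may take 60 seconds), start directly from the snapshot just before the problem step. After fixing the bug, re-run from the same snapshot to verify the fix; that is a good regression-testing technique in its own right.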
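The "fix, then replay from the same snapshot" cycle looks like this in miniature. A sketch under simple assumptions: a snapshot is a dict holding `data` and `next_node`, edges are fixed, and `replay`, the `average` node, and its off-by-one bug are all hypothetical. The snapshot is deep-copied so repeated replays start from identical data.

```python
import copy
from typing import Callable


def replay(
    snapshot: dict, nodes: dict[str, Callable[[dict], dict]], edges: dict[str, str]
) -> dict:
    """Continue from a {"data", "next_node"} snapshot without re-running earlier steps."""
    state = copy.deepcopy(snapshot["data"])  # never touch the stored snapshot
    current = snapshot["next_node"]
    while current is not None and current != "END":
        state = {**state, **nodes[current](state)}
        current = edges.get(current)
    return state


# Hypothetical snapshot saved just before the "average" node ran
snapshot = {"data": {"scores": [80, 90, 100]}, "next_node": "average"}
edges = {"average": "END"}

# Buggy version: off-by-one in the denominator
buggy = {"average": lambda s: {"avg": sum(s["scores"]) / (len(s["scores"]) + 1)}}
# Fixed version, checked against the very same snapshot
fixed = {"average": lambda s: {"avg": sum(s["scores"]) / len(s["scores"])}}

print(replay(snapshot, buggy, edges)["avg"])  # 67.5
print(replay(snapshot, fixed, edges)["avg"])  # 90.0
```

Keeping the snapshot and the expected value together turns the debugging session into a regression test for free.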
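9. Lessons from Practice
State graph or while loop?
It depends on the scenario. If the flow is complex, needs visualization, needs human intervention partway through, or changes often, a state graph is the better fit. If the flow is simple, fixed, and needs no visualization, a 10-line while loop will do and there is no need to build a graph. In my own projects so far, I write simple ReAct loops as plain loops and only consider a state graph once the flow has more than 3 nodes and conditional branches.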
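For comparison, this is roughly what "just write the loop" means for ReAct. A minimal sketch: the `Step` dataclass, `react_loop`, and the scripted stand-in for the LLM are hypothetical, not any library's API; a real model call would replace `scripted_llm`.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Step:
    """Hypothetical parsed LLM output: either a tool call or a final answer."""
    tool: Optional[str] = None
    args: Optional[dict] = None
    answer: Optional[str] = None


def react_loop(
    llm: Callable[[list], Step],
    tools: dict[str, Callable],
    question: str,
    max_steps: int = 10,
) -> str:
    messages = [{"role": "user", "content": question}]
    for _ in range(max_steps):  # the same hard cap as max_steps in the graph engine
        step = llm(messages)
        if step.answer is not None:  # Final Answer ends the loop
            return step.answer
        observation = tools[step.tool](**(step.args or {}))  # Action → Observation
        messages.append({"role": "tool", "name": step.tool, "content": str(observation)})
    return "Stopped: step limit reached"


def scripted_llm(messages: list) -> Step:
    # Stand-in for a real model: call the tool once, then answer
    if not any(m["role"] == "tool" for m in messages):
        return Step(tool="add", args={"a": 2, "b": 3})
    return Step(answer=f"The sum is {messages[-1]['content']}")


print(react_loop(scripted_llm, {"add": lambda a, b: a + b}, "What is 2 + 3?"))  # The sum is 5
```

Everything the graph version expresses with two nodes and a conditional edge is here a single `if` inside a bounded loop.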
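Where LangGraph's learning curve is steep
In practice, the difficulty is concentrated in three places:
- State design: the State is shared by every node. Designing its schema (which fields, what types, how data passes between nodes) takes experience, and a poorly designed State keeps growing more bloated
- Routing logic for conditional edges: the routing function returns a string, and the targets dict maps that string to a node. This indirection makes debugging harder; when a routing function returns a misspelled string, it can take a long time to find out why
- LangGraph-specific concepts: Checkpoint, Interrupt, Streaming. These add value, but they also add to the learning load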
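One way to take the sting out of misspelled routing keys is to validate them where they are produced. In the engine from section 2, an unknown key makes `targets.get(route_key)` return `None`, and the run simply ends without complaint. The hypothetical wrapper below raises instead, naming the router and the valid keys; `checked_router` and `typo_router` are illustrative names, not LangGraph API.

```python
from typing import Callable


def checked_router(
    router: Callable[[dict], str], targets: dict[str, str]
) -> Callable[[dict], str]:
    """Wrap a routing function so an unknown key fails loudly instead of ending the run."""

    def wrapped(state: dict) -> str:
        key = router(state)
        if key not in targets:
            raise KeyError(
                f"router {router.__name__!r} returned {key!r}; expected one of {sorted(targets)}"
            )
        return key

    return wrapped


targets = {"final": "END", "needs_tool": "tool"}


def typo_router(state: dict) -> str:
    return "final" if state.get("is_final") else "need_tool"  # typo: should be "needs_tool"


router = checked_router(typo_router, targets)
print(router({"is_final": True}))  # final
try:
    router({"is_final": False})
except KeyError as exc:
    print(exc)
```

The same idea can be applied once at build time as well, by checking that every value in `targets` names a registered node or `"END"`.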
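LangGraph vs CrewAI vs AutoGen
| Framework | Core idea | Good fit for | Learning curve |
|---|---|---|---|
| LangGraph | State graph | Complex flows that need precise control over every step | High |
| CrewAI | Role-playing | Multi-agent collaboration, task assignment | Low |
| AutoGen | Group chat | Free-form multi-agent conversation, exploratory tasks | Medium |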
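When LangGraph is overkill
If your agent is a single ReAct loop that fits in 20 lines of code, you don't need a state graph. If the flow is linear (A, then B, then C) with no conditional branches, no loops, and no human intervention, just call the steps in order. And if you need neither visualization, nor checkpoints, nor streaming output, you are not using any of LangGraph's core value-adding features.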
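For the linear case, "call the steps in order" really is just a loop. In this sketch each stage is a plain function from string to string; the string methods stand in for what would be LLM or tool calls, and `run_pipeline` is a hypothetical name.

```python
from typing import Callable


def run_pipeline(stages: list[Callable[[str], str]], text: str) -> str:
    """A → B → C: no branches, no loops; each output feeds the next stage."""
    for stage in stages:
        text = stage(text)
    return text


# Stand-in stages; in a real pipeline each would be an LLM or tool call
stages = [str.strip, str.lower, lambda s: s.replace(" ", "_")]
print(run_pipeline(stages, "  User Management System "))  # user_management_system
```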
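State design
The State schema is the most important design decision in a state graph, because it defines the data contract between all nodes. I follow three principles:
Minimal: include only the data that nodes genuinely need to share. Intermediate variables produced inside a node stay out of the State; if only node A uses a variable, it does not belong in the State.
Namespaced: give different categories of data different prefixes, such as `llm_thought`, `tool_result`, and `user_preference`. This avoids key collisions: if two nodes both write a key called `result`, they overwrite each other.
Immutable updates: a node does not modify the State directly; it returns a dict of changes, and the graph's execution engine merges it. That way every node's changes are traceable.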
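On the engine side, the third principle could look like the sketch below: merging builds a new dict instead of mutating the old one, and a write log records which node wrote which key, which also surfaces exactly the collisions that namespacing is meant to prevent. `merge_updates`, `find_collisions`, and the log format are hypothetical.

```python
from typing import Any


def merge_updates(
    state: dict[str, Any], updates: dict[str, Any], node: str, log: list[tuple[str, str]]
) -> dict[str, Any]:
    """Engine-side merge: build a new state and record (node, key) for every write."""
    log.extend((node, key) for key in updates)
    return {**state, **updates}  # the previous state dict is left unchanged


def find_collisions(log: list[tuple[str, str]]) -> dict[str, list[str]]:
    """Keys written by more than one node: candidates for namespacing."""
    writers: dict[str, set[str]] = {}
    for node, key in log:
        writers.setdefault(key, set()).add(node)
    return {key: sorted(nodes) for key, nodes in writers.items() if len(nodes) > 1}


log: list[tuple[str, str]] = []
s0 = {"goal": "analyze sales"}
s1 = merge_updates(s0, {"result": "plan with 3 steps"}, "planner", log)
s2 = merge_updates(s1, {"result": "42 rows"}, "tool", log)
print(s0)                    # {'goal': 'analyze sales'}
print(s2["result"])          # 42 rows (the planner's value was overwritten)
print(find_collisions(log))  # {'result': ['planner', 'tool']}
```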
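```python
from typing import TypedDict


class AgentState(TypedDict, total=False):
    """Example schema (hypothetical field names)"""
    tool_results: dict[str, list]
    analysis: str


# Immutable update: the node returns its changes instead of modifying the State
def analyze_node(state: AgentState) -> dict:
    data = state.get("tool_results", {}).get("step_1_result") or []
    return {"analysis": f"Analysis result: {len(data)} records"}  # merged by the graph engine
```
Preventing infinite loops on cycle edges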
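Cycle edges are the key to patterns such as ReAct, but a faulty condition can trap the graph in an infinite loop. I use three layers of protection:
```python
# Layer 1: a hard max_steps limit; this is the loop condition already used by every invoke() above:
#     while current and current != "END" and step_count < max_steps:
#         ...
#         step_count += 1


# Layer 2: loop detection; raise an alarm when the same node runs N times in a row
def detect_loop(history: list[str], threshold: int = 5) -> bool:
    if len(history) < threshold:
        return False
    return len(set(history[-threshold:])) == 1


# Layer 3: progress check; the State must actually change between checks
def check_progress(state: State, last_progress_hash: int) -> bool:
    """Return False when the State is identical to the previous check.

    The caller is responsible for storing hash(str(state.data)) for the next check.
    """
    current_hash = hash(str(state.data))
    return current_hash != last_progress_hash
```
A typical infinite loop: the LLM node outputs `is_final=False` and goes to the Tool, the Tool finishes and returns to the LLM, the LLM outputs `is_final=False` again, and so on until max_steps is exhausted. Note that layer 2 as written does not fire here, because the nodes alternate, and layer 3 may not fire either, because each round appends to `messages`; in that case the max_steps cap is the backstop. The root cause is that the LLM's stopping condition is not strict enough. Adding one constraint to the system prompt, "If you have called tools 5 times and still have no conclusion, summarize what you have and output a Final Answer", fixes most of these cases for me.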
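The three layers can be wired into one run loop. A minimal sketch, assuming fixed edges and a State whose values have a stable `repr`; `guarded_run` and its thresholds are hypothetical. In the toy ping-pong below, the tool returns the same observation and nothing accumulates, so layer 3 stops the run; with a growing message list the hash would keep changing and the run would fall through to the `max_steps` cap. Layer 2 never fires in this example, because the nodes alternate.

```python
from typing import Callable, Optional


def guarded_run(
    nodes: dict[str, Callable[[dict], dict]],
    edges: dict[str, str],
    entry: str,
    state: dict,
    max_steps: int = 20,
    repeat_threshold: int = 5,
    stall_threshold: int = 4,
) -> tuple[dict, str]:
    """Run a fixed-edge graph under three guards; return (state, why it stopped)."""
    history: list[str] = []
    last_hash: Optional[int] = None
    stalled = 0
    current: Optional[str] = entry
    while current is not None and current != "END":
        # Layer 1: hard step cap
        if len(history) >= max_steps:
            return state, "max_steps"
        state = {**state, **nodes[current](state)}
        history.append(current)
        # Layer 2: the same node N times in a row
        tail = history[-repeat_threshold:]
        if len(tail) == repeat_threshold and len(set(tail)) == 1:
            return state, f"loop on {current}"
        # Layer 3: the State has not changed for stall_threshold consecutive steps
        state_hash = hash(repr(sorted(state.items())))
        stalled = stalled + 1 if state_hash == last_hash else 0
        last_hash = state_hash
        if stalled >= stall_threshold:
            return state, "no progress"
        current = edges.get(current)
    return state, "finished"


# Toy ReAct ping-pong: the LLM never finishes and the tool keeps returning the same thing
nodes = {
    "llm": lambda s: {"is_final": False},
    "tool": lambda s: {"observation": "same"},
}
edges = {"llm": "tool", "tool": "llm"}
final_state, reason = guarded_run(nodes, edges, "llm", {})
print(reason)  # no progress
```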
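10. Closing Thoughts
In my view, LangGraph's state machine approach is the most flexible and general architecture among agent frameworks. It turns an agent's execution flow from "code logic" into a "data structure": changing the flow means rewiring nodes and edges rather than restructuring control flow. But the flexibility has a cost: a steep learning curve, hard State design, and more complex debugging. The simplest ReAct loop takes 20 lines as a while loop and does not need a state graph.
A recap of the whole series:
- Topic 1: Agent = LLM + Memory + Tools + Planner; the four components are swappable
- Topic 2: ReAct = a Thought → Action → Observation loop for handling uncertainty
- Topic 3: Plan-Execute = plan first, then execute; prioritizes efficiency and predictability
- Topic 4: Multi-agent = sequential / parallel / hierarchical; the trade-off between division of labor and collaboration
- Topic 5: State graph = nodes + edges + state; a general-purpose execution engine for agents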