Agent_10_OpenClaw_Agent_Runtime
上一篇讲了 Gateway 怎么把消息从各个通道统一接入到系统。消息到了 Gateway 之后怎么变成 Agent 的实际行动?这就是 Agent Runtime 要解决的。
OpenClaw 的 Agent Runtime 有一个有意思的设计——双重循环执行(Dual-Loop Execution)。不是在一个 while 循环里处理 Thought → Action → Observation,而是分成两个独立的循环:一个"思考循环"(决定该做什么),一个"执行循环"(实际去做)。分开的原因是这两个循环有不同的节奏和失败模式。
单循环 vs 双循环
先看单循环的 ReAct 模式——专题二已经讲过:
# 单循环 ReAct —— 专题二的版本
for _ in range(max_steps):
thought, action = llm.think_and_act(context)
if action.is_final:
return action.answer
observation = tools.execute(action.name, **action.args)
context += f"\nThought: {thought}\nObservation: {observation}"单循环的问题:每执行一步都要调 LLM 决定下一步。如果工具执行很快(比如本地文件读写 10ms),LLM 调用要等 2-5 秒,整个循环被 LLM 拖累。更糟的是,如果工具执行失败需要重试,或者需要连续调 3 个工具但 LLM 每次只决定调 1 个,效率极低。
OpenClaw 的做法是拆开:
思考循环(Slow Loop) 执行循环(Fast Loop)
┌──────────────────┐ ┌──────────────────┐
│ 1. LLM 决定计划 │ │ 1. 按计划执行工具 │
│ 2. 输出步骤列表 │─────────►│ 2. 收集结果 │
│ 3. 等待执行完成 │◄─────────│ 3. 如果失败 │
│ 4. 审查结果 │ │ 重新思考 │
└──────────────────┘ └──────────────────┘
思考循环:慢(2-5秒/次),但只做决策
执行循环:快(毫秒级),只做工具调用json ... json\s\n(.?)\n
', text, re.DOTALL)
if match:
try:
return json.loads(match.group(1).strip())
except json.JSONDecodeError:
pass
# Level 3: 正则匹配最外层大括号
match = re.search(r'\{.*\}', text, re.DOTALL)
if match:
try:
return json.loads(match.group())
except json.JSONDecodeError:
pass
return {}双重循环的核心优势:执行循环不需要 LLM。如果计划有 5 步工具调用,单循环需要 5 次 LLM(每步选工具),双循环只需要 2 次 LLM(一次计划 + 一次审查)。省掉的 3 次 LLM 调用直接变成更低的延迟和更少的 token 成本。
代价是:计划必须准确。如果 slowloop 生成的计划漏掉了关键步骤,fastloop 执行完再 review 才发现,需要重新走一轮 think → execute → review。
工具路由
Agent 有多个工具时,怎么把 LLM 的工具名映射到实际执行函数?OpenClaw 做了一个分层路由——不是简单的 name -> function 映射,而是按工具类型分路由策略。
"""
工具路由——不同工具类型走不同的执行策略
"""
from dataclasses import dataclass
from typing import Any, Callable
import subprocess
import asyncio
@dataclass
class ToolDefinition:
"""工具定义"""
name: str
description: str
handler: Callable # 执行函数
category: str = "general" # 工具分类
sandbox: bool = True # 是否沙箱执行
timeout: float = 30.0 # 执行超时
requires_approval: bool = False # 是否需要用户确认
class ToolRouter:
"""
工具路由器
分类路由策略:
- read: 只读操作(读文件、查询数据库),沙箱执行,不需要审批
- write: 写操作(写文件、创建资源),沙箱执行,部分需要审批
- execute: 外部执行(运行代码、调 API),不沙箱,需要审批
- general: 其他操作,默认沙箱执行
"""
def __init__(self, config: dict = None):
self.config = config or {}
self._tools: dict[str, ToolDefinition] = {}
self._category_routes: dict[str, list[str]] = {}
def register(self, tool: ToolDefinition) -> None:
self._tools[tool.name] = tool
category = tool.category
if category not in self._category_routes:
self._category_routes[category] = []
self._category_routes[category].append(tool.name)
def execute(self, name: str, **kwargs) -> "ToolResult":
"""执行工具——根据分类应用不同的执行策略"""
if name not in self._tools:
return ToolResult(success=False, error=f"未知工具: {name}")
tool = self._tools[name]
# 检查是否需要审批
if tool.requires_approval and not self._check_approved(name, kwargs):
return ToolResult(
success=False,
error=f"工具 {name} 需要用户审批",
suggestion="请先批准此操作",
)
# 根据分类执行
if tool.sandbox:
return self._execute_sandbox(tool, kwargs)
else:
return self._execute_direct(tool, kwargs)
def _execute_sandbox(self, tool: ToolDefinition, kwargs: dict) -> "ToolResult":
"""
沙箱执行——隔离的工具运行环境
真正的沙箱需要 Docker 容器或 gVisor。
这里是进程级隔离:通过 subprocess 在受限 Python 环境中执行。
"""
import subprocess
import tempfile
import json
import sys
import inspect
try:
# 把参数写入临时文件
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
json.dump(kwargs, f)
params_path = f.name
# 获取 handler 源码
handler_source = inspect.getsource(tool.handler)
handler_name = tool.handler.__name__
work_dir = self.config.get("sandbox_dir", "./workspace")
# 构建受限的执行脚本
restricted_script = f"""
import sys, os, json
# 禁用危险模块
import builtins
_real_import = builtins.__import__
def restricted_import(name, *args, **kwargs):
blocked = ["socket", "subprocess", "ctypes", "pickle", "shelve"]
if name in blocked:
raise ImportError(f"沙箱中不允许导入 {{name}}")
return _real_import(name, *args, **kwargs)
builtins.__import__ = restricted_import
# 限制文件系统访问
_real_open = builtins.open
_allowed_dir = os.path.abspath("{work_dir}")
def restricted_open(file, mode="r", *args, **kwargs):
abs_path = os.path.abspath(file)
if not abs_path.startswith(_allowed_dir):
raise PermissionError(f"不允许访问 {{abs_path}}")
return _real_open(file, mode, *args, **kwargs)
builtins.open = restricted_open
{handler_source}
with open("{params_path}") as f:
params = json.load(f)
result = {handler_name}(**params)
print(json.dumps(result, default=str))
"""
# 在子进程中执行
result = subprocess.run(
[sys.executable, "-u", "-c", restricted_script],
capture_output=True,
text=True,
timeout=tool.timeout,
cwd=work_dir,
)
if result.returncode == 0:
output = json.loads(result.stdout)
return ToolResult(success=True, output=output)
else:
return ToolResult(
success=False,
error=f"沙箱执行失败: {result.stderr[:500]}",
)
except subprocess.TimeoutExpired:
return ToolResult(success=False, error=f"执行超时(>{tool.timeout}s)")
except Exception as e:
return ToolResult(success=False, error=str(e))
def _execute_direct(self, tool: ToolDefinition, kwargs: dict) -> "ToolResult":
"""直接执行——不受沙箱限制,需要审批"""
try:
result = tool.handler(**kwargs)
return ToolResult(success=True, output=result)
except Exception as e:
return ToolResult(success=False, error=str(e))
def _check_approved(self, name: str, kwargs: dict) -> bool:
"""检查工具是否已获批准"""
return self.config.get("auto_approve", False)
def list_by_category(self, category: str) -> list[str]:
return self._category_routes.get(category, [])
def get_all(self) -> dict[str, dict]:
return {
name: {"description": t.description, "category": t.category}
for name, t in self._tools.items()
}
@dataclass
class ToolResult:
success: bool
output: Any = None
error: str = ""
suggestion: str = ""
# 注册工具
router = ToolRouter(config={"auto_approve": True})
router.register(ToolDefinition(
name="read_file",
description="读取文件内容",
handler=lambda path: open(path).read(),
category="read",
sandbox=True,
))
router.register(ToolDefinition(
name="run_code",
description="执行 Python 代码",
handler=lambda code: exec(code),
category="execute",
sandbox=False,
requires_approval=True,
))分类的意义不是"给工具分组好看",而是安全策略。read 类的工具只读不写,可以随便跑不需要担心副作用。write 类的工具会改文件,需要在沙箱里限制可访问的文件路径。execute 类的工具跑外部代码,必须要求用户审批。
LLM Provider 故障切换
Agent Runtime 强依赖 LLM,但 LLM API 一定会出问题——超时、限流、服务降级。不能假设 LLM 永远可用。
"""
LLM Provider 故障切换——主提供商挂了自动降级
"""
import time
from dataclasses import dataclass
@dataclass
class ProviderConfig:
name: str
api_key: str
base_url: str
model: str
priority: int # 1 = 主,2 = 备,3 = 兜底
timeout: float = 30.0
class ProviderFailover:
"""
LLM 提供商故障切换
策略:
1. 按优先级尝试(主 -> 备 -> 兜底)
2. 记录失败历史,连续失败 3 次跳过该提供商
3. 兜底用小模型(更便宜、更快、但质量低)
"""
def __init__(self, providers: list[ProviderConfig]):
self._providers = sorted(providers, key=lambda p: p.priority)
self._failure_counts: dict[str, int] = {}
self._last_success: dict[str, float] = {p.name: 0 for p in providers}
async def generate(self, prompt: str, **kwargs) -> str:
"""按优先级尝试 LLM 提供商"""
for provider in self._providers:
# 连续失败 3 次的提供商跳过
if self._failure_counts.get(provider.name, 0) >= 3:
continue
try:
result = await self._call_provider(provider, prompt, **kwargs)
# 成功,重置失败计数
self._failure_counts[provider.name] = 0
self._last_success[provider.name] = time.time()
return result
except Exception as e:
self._failure_counts[provider.name] = (
self._failure_counts.get(provider.name, 0) + 1
)
print(f"Provider {provider.name} 失败: {e},切换到下一个")
raise RuntimeError("所有 LLM 提供商均不可用")
async def _call_provider(self, provider: ProviderConfig, prompt: str, **kwargs) -> str:
"""调用单个 LLM 提供商(简化实现)"""
import httpx
async with httpx.AsyncClient(timeout=provider.timeout) as client:
response = await client.post(
f"{provider.base_url}/chat/completions",
headers={"Authorization": f"Bearer {provider.api_key}"},
json={
"model": provider.model,
"messages": [{"role": "user", "content": prompt}],
**kwargs,
},
)
response.raise_for_status()
return response.json()["choices"][0]["message"]["content"]实际配置建议:主提供商用 Claude(质量好,适合做决策),备用用 GPT-4o(速度更快,适合快速回复),兜底用本地 Ollama(不依赖外部 API,即使断网也能用——只是质量差一些)。切换不是无代价的:不同模型的输出格式、tool call schema 不同,Failover 层需要做输出格式归一化。
OpenClaw 的 Agent 实例不是"一直活着"的。Gateway 管理它的创建和销毁,Agent 自己有状态流转。
"""
Agent 生命周期——创建、活跃、空闲、销毁
"""
from enum import Enum
import time
import json
import os
class AgentPhase(Enum):
CREATED = "created"
ACTIVE = "active"
IDLE = "idle"
DESTROYED = "destroyed"
class AgentLifecycle:
"""
Agent 生命周期管理
状态流转:
CREATED -> ACTIVE (收到第一条消息)
ACTIVE -> IDLE (超过 idle_timeout 没有消息)
IDLE -> ACTIVE (收到新消息)
IDLE -> DESTROYED (超过 destroy_timeout 没有消息)
"""
def __init__(self, idle_timeout: int = 1800, destroy_timeout: int = 7200):
self.idle_timeout = idle_timeout # 30 分钟无消息 -> 空闲
self.destroy_timeout = destroy_timeout # 2 小时无消息 -> 销毁
self._phases: dict[str, AgentPhase] = {}
self._timestamps: dict[str, dict] = {}
def transition(self, session_id: str, new_phase: AgentPhase) -> None:
old_phase = self._phases.get(session_id)
self._phases[session_id] = new_phase
self._timestamps[session_id] = {
"phase": new_phase.value,
"previous": old_phase.value if old_phase else None,
"timestamp": time.time(),
}
def check_idle(self, session_id: str, last_message_time: float) -> bool:
"""检查是否应该转为空闲"""
elapsed = time.time() - last_message_time
return elapsed > self.idle_timeout
def check_destroy(self, session_id: str, last_message_time: float) -> bool:
"""检查是否应该销毁"""
elapsed = time.time() - last_message_time
return elapsed > self.destroy_timeout
def get_phase(self, session_id: str) -> AgentPhase:
return self._phases.get(session_id, AgentPhase.CREATED)
class AgentPersistence:
"""
Agent 状态持久化——保存和恢复 Agent 状态
Agent 被销毁前,需要保存:
1. 对话历史
2. 工作区文件
3. 执行统计
恢复时从这些快照中重建 Agent。
"""
def __init__(self, base_dir: str = "./sessions"):
self.base_dir = base_dir
def save(self, session_id: str, agent: "AgentRuntime") -> None:
"""保存 Agent 快照"""
session_dir = os.path.join(self.base_dir, session_id)
os.makedirs(session_dir, exist_ok=True)
# 保存对话历史
messages_path = os.path.join(session_dir, "messages.json")
with open(messages_path, "w", encoding="utf-8") as f:
json.dump(agent._messages, f, ensure_ascii=False, indent=2)
# 保存元数据
meta = {
"session_id": session_id,
"total_llm_calls": agent._total_llm_calls,
"total_tool_calls": agent._total_tool_calls,
"total_tokens": agent._total_tokens,
"last_active": agent.last_active,
"saved_at": time.time(),
}
meta_path = os.path.join(session_dir, "meta.json")
with open(meta_path, "w", encoding="utf-8") as f:
json.dump(meta, f, ensure_ascii=False, indent=2)
def load(self, session_id: str, agent: "AgentRuntime") -> bool:
"""恢复 Agent 快照,返回是否成功"""
session_dir = os.path.join(self.base_dir, session_id)
messages_path = os.path.join(session_dir, "messages.json")
if os.path.exists(messages_path):
with open(messages_path, "r", encoding="utf-8") as f:
agent._messages = json.load(f)
return True
return False
def list_sessions(self) -> list[dict]:
"""列出所有已保存的 session"""
if not os.path.exists(self.base_dir):
return []
sessions = []
for sid in os.listdir(self.base_dir):
meta_path = os.path.join(self.base_dir, sid, "meta.json")
if os.path.exists(meta_path):
with open(meta_path, "r") as f:
meta = json.load(f)
sessions.append(meta)
return sessions为什么 Agent 不能一直活着?内存成本。每个 Agent 实例带着对话历史、工作区文件、工具状态,大约 50-200MB。如果有 1000 个用户但只有 50 个活跃,让 1000 个实例都活着就是浪费。
OpenClaw 的策略是 30 分钟无消息转空闲(清理工作区中生成的临时文件但不销毁实例),2 小时无消息销毁(持久化状态到磁盘)。用户再次发消息时从快照恢复。
工作区(Workspace)
每个 Agent 实例有一个独立的工作区目录。工具执行过程中产生的文件读写都在这个目录里,不同 Agent 的工作区互相隔离。
"""
工作区——Agent 的文件系统隔离
"""
import os
import shutil
from pathlib import Path
class Workspace:
"""
Agent 工作区
每个 Agent 实例有一个独立的目录。
工具的文件操作限制在这个目录内(如果 sandbox=True)。
会话销毁时可选清理工作区。
"""
def __init__(self, session_id: str, base_dir: str = "./workspaces"):
self.session_id = session_id
self.path = Path(base_dir) / session_id
self._ensure_exists()
def _ensure_exists(self) -> None:
self.path.mkdir(parents=True, exist_ok=True)
# 创建子目录
(self.path / "files").mkdir(exist_ok=True)
(self.path / "temp").mkdir(exist_ok=True)
(self.path / "memory").mkdir(exist_ok=True)
def resolve_path(self, relative_path: str, sandbox: bool = True) -> Path:
"""
解析文件路径
sandbox=True 时,路径限制在工作区内。
sandbox=False 时,路径可以是系统任意位置(需要审批)。
"""
if sandbox:
resolved = (self.path / relative_path).resolve()
if not str(resolved).startswith(str(self.path.resolve())):
raise ValueError(f"路径穿越攻击: {relative_path}")
return resolved
else:
return Path(relative_path).resolve()
def read(self, relative_path: str) -> str:
path = self.resolve_path(relative_path, sandbox=True)
return path.read_text(encoding="utf-8")
def write(self, relative_path: str, content: str) -> None:
path = self.resolve_path(relative_path, sandbox=True)
path.write_text(content, encoding="utf-8")
def list_files(self, subdir: str = "") -> list[str]:
base = self.path / subdir if subdir else self.path
if not base.exists():
return []
return [str(p.relative_to(self.path)) for p in base.rglob("*") if p.is_file()]
def size(self) -> int:
"""计算工作区总大小(字节)"""
total = 0
for p in self.path.rglob("*"):
if p.is_file():
total += p.stat().st_size
return total
def cleanup(self, keep_permanent: bool = True) -> None:
"""清理工作区"""
if keep_permanent:
# 只清理 temp 目录
temp_dir = self.path / "temp"
if temp_dir.exists():
shutil.rmtree(temp_dir)
temp_dir.mkdir()
else:
# 全部清理
if self.path.exists():
shutil.rmtree(self.path)
def __repr__(self):
return f"Workspace({self.session_id}, {self.size()} bytes)"
# 使用
ws = Workspace("session_abc")
ws.write("data.csv", "date,revenue\n2025-01,100\n2025-02,120")
print(ws.list_files()) # ['data.csv']
print(ws.read("data.csv"))
ws.cleanup(keep_permanent=True) # 只清理 temp路径穿越是工作区最重要的安全考量。如果工具接收用户输入的文件路径,而路径没有做 resolve() + startswith() 校验,用户可以传 ../../etc/passwd 来读取系统文件。OpenClaw 的 resolve_path 方法强制做了这个检查。
Context Engineering
Agent Runtime 不只是"调 LLM"。对话历史长了会爆 context window,工具结果多了会淹没核心信息。阿里 AgentScope 把 Context Engineering 作为核心能力——自动做截断、压缩、检索。
"""
Context Engineering——上下文窗口的自动管理
问题:
1. 对话历史越长,LLM 响应越慢,token 越贵
2. 对话历史太短,LLM 丢失关键上下文
3. 工具执行结果可能远大于用户消息本身
"""
from dataclasses import dataclass
import time
@dataclass
class ContextConfig:
max_context_tokens: int = 100000 # 模型 context window 上限
max_messages: int = 30 # 保留的最大消息数
reserve_for_output: int = 8000 # 给 LLM 输出预留 token
compression_threshold: float = 0.7 # 达到 70% window 时开始压缩
class ContextManager:
"""
上下文管理器
三级压缩:
1. 滑动窗口——保留最近 N 条对话
2. 摘要压缩——把早期对话合并为摘要
3. 语义筛选——按当前任务筛选相关历史
"""
def __init__(self, config: ContextConfig = None):
self.config = config or ContextConfig()
self._messages: list[dict] = []
self._summaries: list[str] = [] # 历史摘要
def add(self, role: str, content: str, tokens: int = None) -> None:
self._messages.append({
"role": role,
"content": content,
"tokens": tokens or len(content) // 4,
"timestamp": time.time(),
})
def build_context(self, query: str = "") -> list[dict]:
"""构建 LLM 上下文"""
total_tokens = self._current_tokens()
limit = self.config.max_context_tokens - self.config.reserve_for_output
if total_tokens <= limit * self.config.compression_threshold:
# 未超阈值,直接返回
return self._recent_messages()
if total_tokens <= limit:
# 接近阈值,压缩早期对话
return self._compress_early()
# 超过阈值,三级压缩
return self._aggressive_compress(query)
def _current_tokens(self) -> int:
return sum(m["tokens"] for m in self._messages)
def _recent_messages(self) -> list[dict]:
msgs = [{"role": m["role"], "content": m["content"]} for m in self._messages]
return msgs[-self.config.max_messages:]
def _compress_early(self) -> list[dict]:
"""把前 N 条消息合并为摘要"""
if len(self._messages) <= 5:
return self._recent_messages()
early_count = max(1, len(self._messages) // 3)
early = self._messages[:early_count]
# 简单摘要(实际应调 LLM 生成摘要)
summary_text = " | ".join(
f"{m['role']}: {m['content'][:30]}..." for m in early[:3]
)
self._summaries.append(f"[早期对话] {summary_text}")
remaining = self._messages[early_count:]
return [
{"role": "system", "content": self._summaries[-1]},
*[{"role": m["role"], "content": m["content"]} for m in remaining],
]
def _aggressive_compress(self, query: str) -> list[dict]:
"""激进压缩——按相关性筛选 + 摘要"""
if not query:
return self._compress_early()
# 语义筛选:保留与当前 query 相关的消息
query_tokens = set(query.lower().split())
scored = []
for m in self._messages:
msg_tokens = set(m["content"].lower().split())
score = len(query_tokens & msg_tokens) / max(len(query_tokens | msg_tokens), 1)
scored.append((score, m))
scored.sort(key=lambda x: x[0], reverse=True)
# 取 top K 条 + 最近 3 条
top_k = scored[:10]
recent = self._messages[-3:]
selected = [m for _, m in top_k] + recent
# 去重
seen = set()
unique = []
for m in selected:
key = id(m)
if key not in seen:
seen.add(key)
unique.append(m)
return [{"role": m["role"], "content": m["content"]} for m in unique]阿里 AgentScope 2025 年的重点方向就是 Context Engineering——让框架自动管理 context window,不用开发者手动控制截断和检索。这是从"能跑"到"生产可用"的关键一步。OpenClaw 目前的 buildcontext 只做了简单滑动窗口(取最近 5 条),在长对话场景下很快会遇到 context 不够的问题。
大厂对标:Agent Runtime 模式对比
美团 WOWService——Master-Sub Agent 分离:Master Agent 只做意图识别和任务路由,Sub Agent 各司其职(搜索、推荐、到店、外卖)。和 OpenClaw 双循环的差异在于——美团把"规划"从 Agent 内部抽出来变成了独立的 Master Agent,规划者(Master)和执行者(Sub)是两个独立进程,可以独立扩缩容。OpenClaw 的 slow loop 和 fast loop 在同一个进程内。
阿里 AgentScope——Actor 模型分布式:每个 Agent 是独立 Actor,通过 MsgHub 做 P2P 或广播通信。好处是天然支持多 Agent 协作场景(辩论、分工、竞争),而且 Actor 可以分布在不同的机器上。OpenClaw 的 Agent 是单实例的,没有跨实例协作能力。
蚂蚁 Ragent——分布式执行优化:基于 Ray 框架,Agent 训练速度通过 RL 和环境模拟优化提升了 14.6 倍。这种做法适合需要大量训练的垂直 Agent(如客服 Agent),但对个人项目来说太重了。
字节 Coze——标准化插件协议:所有工具都走 OpenAPI/Swagger 兼容的标准化协议,新增工具只需要按协议写配置。OpenClaw 的 Skill 系统是类似的思路,但 Coze 的协议更成熟——支持自动 schema 生成、参数校验、错误码标准化。
OpenClaw 的取舍很明确:单进程、简单、一个人能维护。Master-Sub 分离适合团队有多个 Agent 专家的场景,Actor 分布式适合需要水平扩展的场景,但 OpenClaw 面向的是 solo operator——一个进程管所有东西,简单直接。
双循环不是银弹。它省了 LLM 调用,但引入了一个风险:计划不准确。如果 slowloop 的计划漏掉了关键步骤,fastloop 执行完 review 才发现不对,要重新走一轮 think → execute → review。最坏情况比单循环还慢。
实际使用中,我的经验是:
- 用户请求明确、步骤可预见("读文件 -> 分析 -> 画图"):双循环比单循环快 40-60%(少 2-3 次 LLM 调用)
- 用户请求模糊、需要探索("帮我看看这份数据有什么异常"):单循环反而更好,因为每一步的决策依赖上一步的结果
- 工具数量多(10+):双循环的计划质量下降(LLM 要在 10+ 工具中选对),需要加一步"按用户请求筛选相关工具"的预处理