Agent_09_OpenClaw_Gateway架构
OpenClaw 是一个开源的自主 AI Agent 系统(前身是 Clawdbot,GitHub 100K+ stars)。这篇不重复介绍它能做什么,直接拆解它的核心架构——Gateway,以及为什么这种 hub-and-spoke 设计是多通道 Agent 系统的最优解。
为什么需要 Gateway
多通道 Agent 最常见的错误架构是:每个通道一个 Agent 实例。WhatsApp 一个、Telegram 一个、WebChat 一个,各自管理自己的 LLM 连接、对话历史、工具调用。
这种做法的问题:
- 会话状态分散:用户在 WhatsApp 和 Telegram 之间切换时,对话历史断了。你不知道这两个用户是不是同一个人。
- 工具调用无法共享:每个 Agent 实例维护自己的工具注册表,新增一个工具要注册到每个实例。
- LLM 连接无法复用:每个通道独立建立 LLM 连接,token 预算、速率限制无法统一管理。
- 权限和沙箱分散:每个通道独立管理用户权限,容易出安全漏洞。
┌──────────────┐
│ Gateway │
│ (控制面) │
│ │
WhatsApp ────────►│ │◄─────── Telegram
(Baileys) │ Session Mgr │ (grammY)
│ │
Slack ───────────►│ │◄─────── Discord
(bolt) │ Tool Router │ (discord.js)
│ │
Signal ──────────►│ │◄─────── WebChat
(signald) │ │ (native)
│ │
│ Agent Loop │
│ │
└──────┬───────┘
│
┌──────▼───────┐
│ LLM Provider │
│ (Cloud API) │
└──────────────┘Gateway 不是"消息转发器",它做了四件事:通道连接管理、会话路由与聚合、Agent 生命周期管理、工具调用路由。
Gateway 核心实现
"""
OpenClaw Gateway —— hub-and-spoke 消息网关
核心职责:
1. 管理所有消息通道的连接生命周期
2. 将不同通道的消息标准化为统一格式
3. 按会话 ID 路由到正确的 Agent 实例
4. 管理 Agent 实例的创建、复用、销毁
"""
from dataclasses import dataclass, field
from typing import Any
from enum import Enum
import time
import uuid
class ChannelType(Enum):
WHATSAPP = "whatsapp"
TELEGRAM = "telegram"
SLACK = "slack"
DISCORD = "discord"
SIGNAL = "signal"
IMESSAGE = "imessage"
WEBCHAT = "webchat"
@dataclass
class InboundMessage:
"""所有通道的标准化入站消息"""
message_id: str
channel: ChannelType
chat_id: str # 通道原始 ID(如 WhatsApp 的 phone_number@s.whatsapp.net)
user_id: str # 统一用户 ID(跨通道映射后的 ID)
content: str
attachments: list[str] = field(default_factory=list)
timestamp: float = field(default_factory=time.time)
reply_to: str | None = None # 回复的消息 ID
@dataclass
class OutboundMessage:
"""标准化出站消息"""
channel: ChannelType
chat_id: str
content: str
attachments: list[str] = field(default_factory=list)
reply_to: str | None = None
class Gateway:
"""
Gateway —— 所有通道的统一接入点
不是消息转发器,而是:
- 通道连接管理器(启动、断开、重连)
- 消息标准化(不同通道格式 -> 统一格式)
- 会话路由(chat_id -> Agent 实例)
- Agent 生命周期管理(创建、复用、超时销毁)
"""
def __init__(self, config: dict):
self.config = config
self._channels: dict[ChannelType, Any] = {}
self._agent_registry: dict[str, Any] = {} # session_id -> agent
self._session_map: dict[str, str] = {} # (channel, chat_id) -> session_id
self._identity_resolver = IdentityResolver() # 跨通道用户识别
self._idle_timeout = config.get("agent_idle_timeout", 3600) # 1 小时
self._max_agents = config.get("max_concurrent_agents", 10)
def register_channel(self, channel_type: ChannelType, adapter: Any) -> None:
"""注册通道适配器"""
self._channels[channel_type] = adapter
adapter.on_message(self._handle_inbound)
adapter.start()
async def _handle_inbound(self, raw_message: dict) -> None:
"""处理入站消息——标准化、路由、转发"""
# 1. 标准化
msg = self._normalize(raw_message)
# 2. 会话路由
session_id = self._resolve_session(msg)
# 3. 获取或创建 Agent
agent = self._get_or_create_agent(session_id, msg.channel)
# 4. 转发给 Agent
response = await agent.process(msg)
# 5. 回传通道
if response:
await self._deliver(response)
def _normalize(self, raw: dict) -> InboundMessage:
"""将不同通道的原始消息标准化"""
channel_type = ChannelType(raw["channel"])
adapter = self._channels[channel_type]
return adapter.normalize(raw)
def _resolve_session(self, msg: InboundMessage) -> str:
"""
会话解析——跨通道用户识别
通过 IdentityResolver 识别是否为已知用户。
如果用户在 WhatsApp 和 Telegram 都用同一个 user_id,
应该映射到同一个 session。
否则按 (channel, chat_id) 创建独立 session。
"""
# 1. 先通过 IdentityResolver 解析用户
user_id = self._identity_resolver.resolve(msg.channel.value, msg.chat_id)
# 2. 按 user_id 查找 session(同一用户跨通道共享 session)
if user_id in self._session_map:
return self._session_map[user_id]
# 3. 新用户,创建 session
session_id = str(uuid.uuid4())
self._session_map[user_id] = session_id
# 同时保留 (channel, chat_id) -> session_id 的反向映射
key = (msg.channel.value, msg.chat_id)
self._session_map[key] = session_id
return session_id
def _get_or_create_agent(self, session_id: str, channel: ChannelType) -> Any:
"""获取已有 Agent 或创建新 Agent"""
if session_id in self._agent_registry:
return self._agent_registry[session_id]
if len(self._agent_registry) >= self._max_agents:
self._evict_idle_agents()
agent = self._spawn_agent(session_id)
self._agent_registry[session_id] = agent
return agent
def _spawn_agent(self, session_id: str) -> Any:
"""创建新 Agent 实例"""
from agent_runtime import AgentRuntime
return AgentRuntime(
session_id=session_id,
workspace=self.config.get("workspace_dir", "./workspaces"),
tools=self._load_tools(),
llm_config=self.config.get("llm"),
)
def _evict_idle_agents(self) -> None:
"""清理空闲 Agent"""
now = time.time()
idle = [
sid for sid, agent in self._agent_registry.items()
if now - agent.last_active > self._idle_timeout
]
for sid in idle[:3]: # 每次最多清理 3 个
agent = self._agent_registry.pop(sid)
agent.cleanup()
async def _deliver(self, msg: OutboundMessage) -> None:
"""投递到目标通道"""
adapter = self._channels.get(msg.channel)
if adapter:
await adapter.send(msg)Gateway 的关键设计是 session 解析——sessionmap 把 (channel, chatid) 映射到统一的 sessionid。这意味着即使用户从 WhatsApp 切换到 Telegram(如果两个通道绑定了同一个 user_id),Agent 看到的还是同一个会话历史。
通道适配器设计
每个通道有不同的消息格式、连接方式、认证机制。适配器模式把这些差异封装掉,Gateway 只和标准化接口打交道。
"""
通道适配器——不同通道协议的统一抽象
"""
from abc import ABC, abstractmethod
from typing import Callable, Any
class ChannelAdapter(ABC):
"""通道适配器基类
每个通道实现四个方法:
- start(): 启动连接
- normalize(raw): 标准化消息
- send(msg): 发送消息
- on_message(callback): 注册消息回调
"""
@abstractmethod
async def start(self) -> None: ...
@abstractmethod
def normalize(self, raw: dict) -> "InboundMessage": ...
@abstractmethod
async def send(self, msg: "OutboundMessage") -> None: ...
@abstractmethod
def on_message(self, callback: Callable) -> None: ...
class TelegramAdapter(ChannelAdapter):
"""Telegram 通道——基于 grammY 框架"""
def __init__(self, bot_token: str):
self.bot_token = bot_token
self._callback = None
async def start(self) -> None:
from telegram import Bot
self._bot = Bot(token=self.bot_token)
# grammY webhook 或 long polling 启动
def normalize(self, raw: dict) -> "InboundMessage":
"""Telegram 消息标准化"""
from uuid import uuid4
update = raw["update"]
msg = update.message or update.edited_message
return InboundMessage(
message_id=str(msg.message_id),
channel="telegram",
chat_id=str(msg.chat.id),
user_id=str(msg.from_user.id),
content=msg.text or "",
attachments=[
doc.file_id
for doc in [msg.document, msg.photo, msg.voice]
if doc
],
timestamp=msg.date.timestamp(),
reply_to=str(msg.reply_to_message.message_id) if msg.reply_to_message else None,
)
async def send(self, msg: "OutboundMessage") -> None:
from telegram import Bot
bot = Bot(token=self.bot_token)
if msg.attachments:
# 有附件:发送文档/图片
await bot.send_document(
chat_id=msg.chat_id,
document=msg.attachments[0],
caption=msg.content,
)
else:
# 纯文本:注意 Telegram 消息长度限制 4096
chunks = self._split_long_message(msg.content, max_len=4090)
for chunk in chunks:
await bot.send_message(chat_id=msg.chat_id, text=chunk)
def on_message(self, callback: Callable) -> None:
self._callback = callback
def _split_long_message(self, text: str, max_len: int = 4090) -> list[str]:
"""Telegram 单条消息最长 4096 字符,拆分发送"""
if len(text) <= max_len:
return [text]
chunks = []
while text:
if len(text) <= max_len:
chunks.append(text)
break
# 找最近的换行点截断
split_at = text.rfind("\n", 0, max_len)
if split_at == -1:
split_at = max_len
chunks.append(text[:split_at])
text = text[split_at:].lstrip()
return chunks
class WhatsAppAdapter(ChannelAdapter):
"""WhatsApp 通道——基于 Baileys 库
Baileys 是 WhatsApp Web 的非官方 API,通过 WebSocket 连接。
需要注意:WhatsApp 对消息频率有严格限制,频繁发送会被封号。
"""
def __init__(self, session_path: str = "./whatsapp_session"):
self.session_path = session_path
self._callback = None
self._sock = None
async def start(self) -> None:
# 实际场景:通过 Baileys 建立 WebSocket 连接
# makeWASocket({ auth: { creds: loadSession(self.session_path) } })
pass
def normalize(self, raw: dict) -> "InboundMessage":
return InboundMessage(
message_id=raw["key"]["id"],
channel="whatsapp",
chat_id=raw["key"]["remoteJid"],
user_id=raw["key"]["remoteJid"].split("@")[0],
content=raw.get("message", {}).get("conversation", ""),
)
async def send(self, msg: "OutboundMessage") -> None:
# WhatsApp 发送频率限制:约 1 条/秒
# 超过会被封号,必须做速率控制
await self._rate_limited_send(msg)
def on_message(self, callback: Callable) -> None:
self._callback = callbackWhatsApp 和 Telegram 的适配器有一个关键区别:消息长度处理。WhatsApp 单条消息没有长度限制(但太长的消息体验差),Telegram 严格限制 4096 字符。还有速率控制:WhatsApp 对发送频率有严格限制,频繁发送会被封号。Telegram 没有这个问题。
这些差异是 Gateway 必须封装的——不能让 Agent 知道它是在跟 WhatsApp 还是 Telegram 对话,否则每个工具输出都要写两套发送逻辑。
会话聚合与跨通道用户识别
这是 Gateway 架构最有价值的部分。同一个用户可能在多个通道与 Agent 对话,系统需要识别这些通道背后的同一个人。
"""
跨通道用户识别 —— 把不同通道的 chat_id 映射到同一个 user_id
"""
from dataclasses import dataclass, field
@dataclass
class UserIdentity:
"""用户身份——一个 user_id 可以关联多个通道"""
user_id: str
linked_channels: dict[str, str] = field(default_factory=dict) # channel_type -> chat_id
created_at: float = field(default_factory=time.time)
last_seen: float = field(default_factory=time.time)
metadata: dict = field(default_factory=dict)
class IdentityResolver:
"""
用户身份解析器
三种识别策略:
1. 显式绑定:用户主动绑定(如在 WebChat 输入 Telegram user_id)
2. 隐式匹配:通过电话号码、邮箱等共同标识符匹配
3. 默认隔离:无法匹配时,按通道隔离
"""
def __init__(self):
self._users: dict[str, UserIdentity] = {}
self._lookup: dict[str, str] = {} # (channel, chat_id) -> user_id
def resolve(self, channel: str, chat_id: str) -> str:
"""解析用户 ID"""
key = f"{channel}:{chat_id}"
if key in self._lookup:
user_id = self._lookup[key]
self._users[user_id].last_seen = time.time()
return user_id
# 无法匹配,创建新用户
user_id = f"user_{len(self._users) + 1:06d}"
user = UserIdentity(
user_id=user_id,
linked_channels={channel: chat_id},
)
self._users[user_id] = user
self._lookup[key] = user_id
return user_id
def link(self, user_id: str, channel: str, chat_id: str) -> None:
"""手动绑定通道到已有用户"""
if user_id in self._users:
self._users[user_id].linked_channels[channel] = chat_id
key = f"{channel}:{chat_id}"
self._lookup[key] = user_id
def get_sessions(self, user_id: str) -> dict[str, str]:
"""获取用户所有已绑定的通道和 chat_id"""
user = self._users.get(user_id)
return user.linked_channels if user else {}
# 在 Gateway 中集成
gateway = Gateway(config={})
identity = IdentityResolver()
# 用户 A 通过 Telegram 接入
user_a = identity.resolve("telegram", "123456")
# -> "user_000001"
# 用户 A 后来通过 WhatsApp 接入同一个账号
# (假设通过电话号码匹配到了)
identity.link(user_a, "whatsapp", "8613800138000@s.whatsapp.net")
# 现在无论用户 A 从哪个通道发消息,
# Gateway 都路由到同一个 session实际场景中,跨通道识别的难点不在代码逻辑,而在匹配策略。WhatsApp 的 chatid 是手机号,Telegram 的 chatid 是数字 ID,WebChat 的 chat_id 是随机 UUID。只有当用户显式绑定(如"把我的 WhatsApp 和 Telegram 绑在一起")时才能正确关联。不做绑定的话,默认隔离是最安全的——不同通道就是不同的用户,不会串数据。
Gateway 的 Session 管理
每个 session 对应一个 Agent 实例。Session 的生命周期直接影响资源消耗和用户体验。
"""
Session 管理 —— Agent 实例的生命周期控制
"""
import time
from collections import OrderedDict
class SessionManager:
"""
Session 管理器
核心职责:
1. 创建和销毁 Agent 实例
2. 空闲超时自动销毁
3. LRU 淘汰——实例数超过上限时淘汰最久未用的
4. 会话持久化——销毁前保存状态
"""
def __init__(self, max_sessions: int = 50, idle_timeout: int = 3600):
self.max_sessions = max_sessions
self.idle_timeout = idle_timeout
self._sessions: OrderedDict[str, dict] = OrderedDict() # session_id -> {agent, last_active, state}
def get_or_create(self, session_id: str, factory) -> Any:
"""获取已有 session 或创建新 session"""
if session_id in self._sessions:
entry = self._sessions[session_id]
entry["last_active"] = time.time()
self._sessions.move_to_end(session_id)
return entry["agent"]
# 容量满了,淘汰最久未用的
if len(self._sessions) >= self.max_sessions:
self._evict_oldest()
# 创建新 session
agent = factory(session_id)
self._sessions[session_id] = {
"agent": agent,
"last_active": time.time(),
"state": {},
}
return agent
def mark_active(self, session_id: str) -> None:
"""标记 session 活跃"""
if session_id in self._sessions:
self._sessions[session_id]["last_active"] = time.time()
self._sessions.move_to_end(session_id)
def sweep_idle(self) -> list[str]:
"""清理空闲 session,返回被清理的 session_id 列表"""
now = time.time()
to_remove = [
sid for sid, entry in self._sessions.items()
if now - entry["last_active"] > self.idle_timeout
]
for sid in to_remove:
self._destroy(sid)
return to_remove
def _evict_oldest(self) -> None:
"""淘汰最久未用的 session"""
oldest_sid = next(iter(self._sessions))
self._destroy(oldest_sid)
def _destroy(self, session_id: str) -> None:
"""销毁 session——保存状态、清理资源"""
entry = self._sessions.pop(session_id, None)
if entry:
entry["agent"].cleanup()
def stats(self) -> dict:
now = time.time()
active_5min = sum(
1 for e in self._sessions.values()
if now - e["last_active"] < 300
)
return {
"total": len(self._sessions),
"active_5min": active_5min,
"capacity_pct": round(len(self._sessions) / self.max_sessions * 100, 1),
}
# 使用
sessions = SessionManager(max_sessions=50, idle_timeout=3600)
# 获取或创建
agent = sessions.get_or_create("session_abc", lambda sid: create_agent(sid))
sessions.mark_active("session_abc")
# 定期清理(建议用定时任务每分钟调一次)
removed = sessions.sweep_idle()
if removed:
print(f"清理了 {len(removed)} 个空闲 session: {removed}")
print(sessions.stats())
# 输出: {"total": 12, "active_5min": 3, "capacity_pct": 24.0}OrderedDict 在这里不是随便选的——movetoend() 保证活跃 session 始终在字典末尾,evictoldest() 直接取 next(iter()) 就是 LRU 淘汰,O(1) 复杂度。
一个 session 的 Agent 实例占用多少内存?取决于 workspace 里的文件、对话历史、工具状态。我的经验是一个活跃 session 大约 50-200MB(含 LLM 连接和对话历史)。50 个 session 上限意味着 2.5-10GB 内存,需要根据服务器配置调。
Gateway 的速率控制
不同通道有不同的发送限制。WhatsApp 约 1 条/秒、Telegram 约 30 条/分钟、Slack 约 1 条/秒。Gateway 必须做统一的速率控制,否则 Agent 输出多条消息时容易触发通道限流。
"""
速率控制——不同通道的发送限流
"""
import asyncio
import time
from collections import deque
class RateLimiter:
"""
令牌桶速率限制器
每个通道独立限流,互不影响。
"""
def __init__(self, max_tokens: int, refill_rate: float):
"""
max_tokens: 桶容量(最大可突发发送数)
refill_rate: 每秒补充的令牌数
"""
self.max_tokens = max_tokens
self.refill_rate = refill_rate
self._tokens = max_tokens
self._last_refill = time.time()
async def acquire(self) -> None:
while True:
self._refill()
if self._tokens > 0:
self._tokens -= 1
return
await asyncio.sleep(1.0 / self.refill_rate)
def _refill(self) -> None:
now = time.time()
elapsed = now - self._last_refill
self._tokens = min(
self.max_tokens,
self._tokens + elapsed * self.refill_rate,
)
self._last_refill = now
class ChannelRateLimiter:
"""
通道速率管理——每个通道一个限流器
"""
DEFAULTS = {
"whatsapp": (1, 0.8), # 1 条突发,0.8 条/秒(保守)
"telegram": (5, 5.0), # 5 条突发,5 条/秒
"slack": (3, 1.0), # 3 条突发,1 条/秒
"discord": (5, 5.0), # 5 条突发,5 条/秒
"signal": (1, 0.5), # 1 条突发,0.5 条/秒(最保守)
"imessage": (1, 1.0), # 1 条突发,1 条/秒
"webchat": (10, 10.0), # 无限制
}
def __init__(self):
self._limiters: dict[str, RateLimiter] = {}
for channel, (burst, rate) in self.DEFAULTS.items():
self._limiters[channel] = RateLimiter(burst, rate)
async def wait(self, channel: str) -> None:
await self._limiters[channel].acquire()
# 在 Gateway 的 deliver 方法中集成
async def _deliver(self, msg: "OutboundMessage") -> None:
# 先限流
await self.rate_limiter.wait(msg.channel.value)
# 再发送
adapter = self._channels.get(msg.channel)
if adapter:
await adapter.send(msg)WhatsApp 的速率限制是最严的——超过大约 1 条/秒就会被封号。Telegram 和 Discord 宽松得多。WebChat 没有外部限制,但 Agent 本身生成速度也有限(每步 LLM 调用 2-5 秒),所以实际上不会成为瓶颈。
Gateway 的错误处理
通道连接一定会断——网络超时、Token 过期、服务端重启。Gateway 必须处理这些情况而不丢失消息。
"""
Gateway 错误处理——连接断开、消息发送失败、Agent 崩溃
"""
import asyncio
import logging
logger = logging.getLogger("gateway")
class ChannelHealthMonitor:
"""
通道健康监控——自动重连、失败计数、降级
不是每次断开都需要重连。区分三种情况:
1. 网络抖动:短暂断开,自动重连即可
2. 认证失效:Token 过期,需要重新认证
3. 服务不可用:通道服务端挂了,需要等待恢复
"""
def __init__(self, max_retries: int = 5, base_delay: float = 2.0):
self.max_retries = max_retries
self.base_delay = base_delay
self._failure_counts: dict[str, int] = {}
self._reconnect_tasks: dict[str, asyncio.Task] = {}
async def on_disconnect(self, channel: str, error: Exception, reconnect_fn) -> None:
"""处理通道断开连接"""
self._failure_counts[channel] = self._failure_counts.get(channel, 0) + 1
count = self._failure_counts[channel]
if count > self.max_retries:
logger.error(f"通道 {channel} 重连失败 {count} 次,停止重试。错误: {error}")
self._notify_admin(channel, error)
return
# 判断是否需要重新认证
if self._is_auth_error(error):
logger.warning(f"通道 {channel} 认证失效,需要重新认证")
self._notify_reauth_needed(channel)
return
# 指数退避重连
delay = self.base_delay * (2 ** (count - 1))
logger.info(f"通道 {channel} 断开,{delay}s 后重连(第 {count} 次)")
task = asyncio.create_task(self._reconnect_with_delay(
channel, delay, error, reconnect_fn
))
self._reconnect_tasks[channel] = task
async def _reconnect_with_delay(self, channel, delay, error, reconnect_fn):
"""等待后重连"""
await asyncio.sleep(delay)
try:
await reconnect_fn()
self._failure_counts[channel] = 0
logger.info(f"通道 {channel} 重连成功")
except Exception as e:
await self.on_disconnect(channel, e, reconnect_fn)
def _is_auth_error(self, error: Exception) -> bool:
"""判断是否是认证失效"""
error_str = str(error).lower()
return any(kw in error_str for kw in [
"unauthorized", "forbidden", "expired", "invalid token",
"auth", "401", "403",
])
def _notify_admin(self, channel, error):
logger.critical(f"通道 {channel} 彻底断开: {error}")
def _notify_reauth_needed(self, channel):
logger.critical(f"通道 {channel} 需要重新认证,请手动处理")
class DeadLetterQueue:
"""
死信队列——消息发送失败后的兜底存储
通道断开时,正在发送的消息不能丢。
存入死信队列,等通道恢复后重发。
"""
def __init__(self, max_size: int = 1000):
self._queue: deque = deque(maxlen=max_size)
def enqueue(self, message: "OutboundMessage", error: str) -> None:
self._queue.append({
"message": message,
"error": error,
"timestamp": time.time(),
"retry_count": 0,
})
def drain(self, channel: str, send_fn) -> int:
"""重发指定通道的死信消息,返回重发成功的数量"""
remaining = deque()
resent = 0
for item in self._queue:
if item["message"].channel.value == channel:
if item["retry_count"] < 3:
try:
send_fn(item["message"])
resent += 1
continue # 成功,不加入 remaining
except Exception:
item["retry_count"] += 1
# 重试耗尽或发送失败,重新入队
remaining.append(item)
else:
# 非目标通道的消息,保留
remaining.append(item)
self._queue = remaining
return resent
@property
def size(self) -> int:
return len(self._queue)
@property
def by_channel(self) -> dict[str, int]:
"""按通道统计死信数量"""
counts: dict[str, int] = {}
for item in self._queue:
ch = item["message"].channel.value
counts[ch] = counts.get(ch, 0) + 1
return counts实际运行中,WhatsApp 的断开率最高——WhatsApp Web 的连接大约每 2-4 小时需要刷新一次。Telegram 的 webhook 相对稳定,但长期运行的 long polling 连接也可能超时。Gateway 必须对这些断开做透明处理:通道断开期间收到的消息不能丢,存在本地队列里,通道恢复后自动补发。
Gateway 与 DAG 编排
Gateway 收到消息后,如果任务复杂("读取数据库 → 分析 → 生成报告 → 发邮件"),不是简单丢给一个 Agent。字节跳动的 Coze 和腾讯的 ADP 都用 DAG(有向无环图)编排来处理这种场景。
"""
Gateway 中的 DAG 编排——复杂任务的节点化执行
"""
from dataclasses import dataclass, field
from typing import Any
import time
@dataclass
class DAGNode:
"""DAG 中的单个执行节点"""
node_id: str
node_type: str # "llm" | "tool" | "code" | "condition" | "loop"
config: dict # 节点配置(模型、工具名、代码等)
depends_on: list[str] = field(default_factory=list) # 前置节点
output_var: str = "" # 输出变量名,供后续节点引用
@dataclass
class DAGWorkflow:
"""一个完整的 DAG 工作流"""
workflow_id: str
name: str
nodes: list[DAGNode]
entry_points: list[str] # 入口节点 ID
class DAGExecutor:
"""
DAG 执行器——按拓扑顺序执行节点,支持条件分支和循环
字节 Coze 的调度引擎就是基于类似的设计:
- 每个节点(LLM、插件、代码块)是独立执行单元
- 支持条件分支(Switch)、循环迭代、跨节点变量流转
- 万级并发下毫秒级调度响应(优化任务队列 + 异步处理)
"""
def __init__(self, tools, llm):
self.tools = tools
self.llm = llm
self._variables: dict[str, Any] = {}
def execute(self, workflow: DAGWorkflow) -> dict:
"""执行 DAG 工作流"""
# 拓扑排序
ordered = self._topological_sort(workflow.nodes)
results = {}
for node in ordered:
# 检查条件分支
if node.node_type == "condition":
should_branch = self._evaluate_condition(node, results)
if not should_branch:
continue
# 执行节点
result = self._execute_node(node)
results[node.node_id] = result
# 存储输出变量
if node.output_var:
self._variables[node.output_var] = result
return results
def _topological_sort(self, nodes: list[DAGNode]) -> list[DAGNode]:
"""按依赖关系排序"""
in_degree: dict[str, int] = {n.node_id: 0 for n in nodes}
adj: dict[str, list[str]] = {n.node_id: [] for n in nodes}
for node in nodes:
for dep in node.depends_on:
if dep in adj:
adj[dep].append(node.node_id)
in_degree[node.node_id] += 1
queue = [n for n in nodes if in_degree[n.node_id] == 0]
result = []
while queue:
current = queue.pop(0)
result.append(current)
for neighbor in adj[current.node_id]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(next(n for n in nodes if n.node_id == neighbor))
if len(result) != len(nodes):
raise ValueError("DAG 中存在循环依赖")
return result
def _execute_node(self, node: DAGNode) -> Any:
"""执行单个节点"""
if node.node_type == "llm":
prompt = self._resolve_variables(node.config.get("prompt", ""))
return self.llm.generate(prompt)
elif node.node_type == "tool":
args = self._resolve_variables(node.config.get("args", {}))
return self.tools.execute(node.config["tool_name"], **args)
elif node.node_type == "code":
return self._execute_code(node.config["code"])
elif node.node_type == "condition":
return self._evaluate_condition(node, {})
else:
raise ValueError(f"未知节点类型: {node.node_type}")
def _resolve_variables(self, template: str) -> str:
"""替换模板中的变量引用(如 {{analysis_result}})"""
import re
def replacer(match):
var_name = match.group(1)
return str(self._variables.get(var_name, match.group(0)))
return re.sub(r'\{\{(\w+)\}\}', replacer, template)
def _evaluate_condition(self, node: DAGNode, results: dict) -> bool:
"""评估条件分支"""
condition = node.config.get("condition", "true")
resolved = self._resolve_variables(condition)
try:
return bool(eval(resolved))
except Exception:
return False
def _execute_code(self, code: str) -> Any:
"""执行代码节点(应在沙箱中)"""
local_vars = {"variables": self._variables}
exec(code, {}, local_vars)
return local_vars.get("result")为什么 Gateway 需要 DAG?因为不是所有任务都适合"Agent 自己规划"。对于有明确流程的业务场景(客服工单处理、数据报表生成、审批流),DAG 比自由规划的 Agent 更可控、可审计、可调试。字节 Coze 的实践是把两种方式结合:简单任务走 Agent 自由规划,复杂固定流程走 DAG 编排。
大厂架构对标
对比国内几家大厂的 Agent Gateway 设计,有一些共通的模式和我们目前 OpenClaw 做法的差异:
阿里 AgentScope——Actor 模型分布式:每个 Agent 是独立 Actor,通过 MsgHub 做广播或 P2P 消息传递。和 OpenClaw 的 Gateway 差异在于 AgentScope 原生支持水平扩展(分布式 Actor),而 OpenClaw 的 Gateway 是单机 hub-and-spoke。如果并发 session 从 50 扩到 500,OpenClaw 需要做 Gateway 多实例 + 外部状态存储(Redis),AgentScope 天然支持但学习成本高。
字节 Coze——Studio + Loop 双核:Studio 做低代码开发,Loop 做 LLM Ops(评估、prompt 调优、数据飞轮)。Coze 的 DAG 编排引擎在万级并发下做到了毫秒级调度响应,靠的是 Go 语言 + 优化任务队列。OpenClaw 目前没有对应的 Ops 层。
美团 WOWService——Master-Sub Agent 架构:Master Agent 做意图识别和任务路由,Sub Agent 各司其职(搜索、推荐、到店、外卖)。这和 OpenClaw 的双循环设计思路不同——美团把"规划"也拆成了一个独立的 Master Agent,而不是在同一个 Agent 内部做 slow loop。
蚂蚁 Ragent——基于 Ray 的分布式 Agent 框架:内部 Ray 集群 150 万+ CPU 核稳定运行。Ragent 不做具体算法,专注把用户代码通过 Ray 的 Task/Actor/Object 原语分布式执行。这种规模和 OpenClaw 面向 solo operator 的定位完全不同。
OpenClaw 的设计取舍:简单、可运维、一个人能维护。到了需要 Actor 模型分布式或百万核集群的规模,团队应该选 AgentScope 或 Ragent,而不是在单 Gateway 上堆复杂度。
Gateway 不是万能的
hub-and-spoke 架构解决了多通道的统一接入问题,但也引入了单点故障风险。Gateway 挂了,所有通道都断了。
缓解方案:Gateway 做无状态设计(会话状态存在外部存储如 Redis)、多实例部署、前置负载均衡器、健康检查 + 自动故障切换。
不过对于个人项目或小团队,单实例 Gateway 通常够用。OpenClaw 的设计就是面向 solo operator 的——一个 Gateway 管所有通道,简单直接。到了需要高可用 Gateway 的规模,团队应该考虑的是更完整的 Agent 基础设施,而不是在单 Gateway 上堆复杂度。