Agent_11_OpenClaw_Skills系统

zbhgis 浩瀚地学12257 分钟
创建于 更新于

OpenClaw 有一个 Skills 系统——Agent 通过安装 Skill 获得新能力,不需要改代码。这听起来像工具注册表的变体,但 Skills 比"注册一个函数"多了三层东西:技能描述(给 LLM 看什么时候用)执行边界(沙箱、权限、超时)供应链安全(技能代码从哪来,能不能信任)

Skill 不是什么

很多人以为 Skill 就是工具的别名。不是。工具是一个函数,Skill 是一个能力包。一个 Skill 可以包含多个工具、多个 prompt 模板、配置文件、依赖声明。

对比:

text
Tool (专题七讲的):
- 一个函数
- 一个 schema
- 手动注册到 ToolRegistry
- 没有沙箱

Skill (OpenClaw):
- 一个目录(YAML 配置 + Python/JS 代码 + prompt 模板)
- 自动注册(安装后自动出现在可用工具列表中)
- 有沙箱和执行边界
- 有供应链(从 git 仓库安装,需要审计)

Skill 的目录结构

一个 Skill 是一个目录,里面包含定义文件、执行代码、prompt 模板:

text
skills/
├── weather/
│   ├── skill.yaml          # 技能定义
│   ├── handler.py          # 执行代码
│   └── prompts/
│       └── analyze.yaml    # 关联的 prompt 模板
├── data_analysis/
│   ├── skill.yaml
│   ├── handler.py
│   └── prompts/
│       └── summarize.yaml
└── web_search/
    ├── skill.yaml
    └── handler.js          # 不一定是 Python

yaml
# skill.yaml 示例
name: weather
description: 查询城市天气
version: 1.0.0
author: "openclaw-community"

# 工具定义
tools:
  - name: get_weather
    description: 查询指定城市的当前天气
    handler: handler.py:get_weather
    category: read
    sandbox: true
    timeout: 15

  - name: get_forecast
    description: 查询指定城市的天气预报(未来 7 天)
    handler: handler.py:get_forecast
    category: read
    sandbox: true
    timeout: 15

# 权限声明
permissions:
  - network: true        # 需要网络访问(调天气 API)
  - filesystem: read     # 只读文件系统
  - execute: false       # 不允许执行任意代码

# 依赖
dependencies:
  python:
    - requests>=2.28

# 元数据
tags: [weather, api]

python
"""
Skill 加载系统——从 YAML 配置到可执行工具
"""

import yaml
import importlib
import os
from typing import Any


class SkillDefinition:
    """Skill 定义——从 skill.yaml 解析"""
    name: str
    version: str
    description: str
    tools: list[dict]
    permissions: dict
    dependencies: dict


class SkillLoader:
    """
    Skill 加载器

    流程:
    1. 读取 skill.yaml
    2. 验证权限声明
    3. 安装依赖
    4. 导入 handler 代码
    5. 注册工具到 ToolRouter
    """

    def __init__(self, tool_router, skill_dir: str = "./skills"):
        self.tool_router = tool_router
        self.skill_dir = skill_dir
        self._loaded_skills: dict[str, SkillDefinition] = {}

    def load_skill(self, skill_name: str) -> dict:
        """加载单个 Skill"""
        skill_path = os.path.join(self.skill_dir, skill_name, "skill.yaml")
        if not os.path.exists(skill_path):
            return {"success": False, "error": f"Skill 不存在: {skill_name}"}

        # 1. 读取 YAML
        with open(skill_path, "r", encoding="utf-8") as f:
            config = yaml.safe_load(f)

        # 2. 验证
        errors = self._validate(config)
        if errors:
            return {"success": False, "error": "; ".join(errors)}

        # 3. 安装依赖
        self._install_dependencies(config.get("dependencies", {}))

        # 4. 导入 handler
        tools = self._import_handlers(config, skill_path)

        # 5. 注册
        for tool in tools:
            self.tool_router.register(tool)

        self._loaded_skills[skill_name] = config
        return {"success": True, "tools_loaded": len(tools)}

    def _validate(self, config: dict) -> list[str]:
        """验证 Skill 配置"""
        errors = []
        if not config.get("name"):
            errors.append("缺少 name 字段")
        if not config.get("tools"):
            errors.append("缺少 tools 字段")

        for tool in config.get("tools", []):
            if not tool.get("handler"):
                errors.append(f"工具 {tool.get('name', '?')} 缺少 handler")

            # 检查 handler 格式
            handler = tool.get("handler", "")
            if ":" not in handler:
                errors.append(
                    f"工具 {tool.get('name', '?')} 的 handler 格式错误,"
                    f"应为 'filename:function_name'"
                )

        return errors

    def _import_handlers(self, config: dict, yaml_path: str) -> list:
        """导入 handler 代码并构造 ToolDefinition"""
        import importlib.util
        from dataclasses import dataclass

        @dataclass
        class ToolDefinition:
            name: str
            description: str
            handler: Any
            category: str = "general"
            sandbox: bool = True
            timeout: float = 30.0
            requires_approval: bool = False

        skill_dir = os.path.dirname(yaml_path)
        tools = []

        for tool_def in config.get("tools", []):
            handler_ref = tool_def["handler"]
            module_name, func_name = handler_ref.split(":", 1)

            # 动态导入
            module_path = os.path.join(skill_dir, module_name)
            if module_path.endswith(".py"):
                spec = importlib.util.spec_from_file_location(
                    f"skill_{config['name']}_{module_name}",
                    module_path,
                )
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)
                handler_func = getattr(module, func_name)
            else:
                raise ValueError(f"不支持的 handler 类型: {module_path}")

            # 构造 ToolDefinition
            tool = ToolDefinition(
                name=tool_def["name"],
                description=tool_def["description"],
                handler=handler_func,
                category=tool_def.get("category", "general"),
                sandbox=tool_def.get("sandbox", True),
                timeout=tool_def.get("timeout", 30.0),
                requires_approval=tool_def.get("requires_approval", False),
            )
            tools.append(tool)

        return tools

    def _install_dependencies(self, deps: dict) -> None:
        """安装 Skill 依赖"""
        python_deps = deps.get("python", [])
        if python_deps:
            import subprocess
            subprocess.run(
                ["pip", "install", "-q"] + python_deps,
                check=True,
            )

    def unload_skill(self, skill_name: str) -> None:
        """卸载 Skill——从 ToolRouter 中移除其工具"""
        if skill_name in self._loaded_skills:
            config = self._loaded_skills[skill_name]
            for tool_def in config.get("tools", []):
                # 注意:ToolRouter 需要支持 unregister
                pass
            del self._loaded_skills[skill_name]

    def list_loaded(self) -> list[str]:
        return list(self._loaded_skills.keys())


# 使用
loader = SkillLoader(tool_router=router, skill_dir="./skills")
result = loader.load_skill("weather")
print(result)  # {"success": True, "tools_loaded": 2}

Skill 自动加载的好处:新增一个 Skill 不需要改 Agent 代码,把目录放进去、调 load_skill 就行。卸载同理。

沙箱隔离

Skill 执行的安全核心是沙箱。Skill 可能来自社区、第三方、甚至用户自己写的。不能假设它是安全的。

python
"""
Skill 执行沙箱——限制 Skill 的系统访问权限
"""

import os
import sys
import tempfile
import subprocess
from pathlib import Path
from dataclasses import dataclass


@dataclass
class SandboxConfig:
    """沙箱配置"""
    allow_network: bool = False       # 是否允许网络
    allow_filesystem: str = "none"    # "none" | "read" | "write"
    allow_execute: bool = False       # 是否允许执行子进程
    work_dir: str = "./workspace"     # 工作目录
    max_memory_mb: int = 256          # 最大内存
    max_cpu_seconds: float = 10.0     # 最大 CPU 时间


class SandboxExecutor:
    """
    沙箱执行器

    不是真正的安全沙箱(真正的沙箱需要 Docker/gVisor/Firecracker)。
    这里是进程级隔离——限制子进程的系统访问。
    """

    def __init__(self, config: SandboxConfig):
        self.config = config

    def execute(self, handler_func, **kwargs) -> dict:
        """
        在沙箱中执行 handler

        如果 handler 是纯 Python 函数,在当前进程执行但限制访问。
        如果配置了隔离级别,在子进程中执行。
        """
        if self._needs_isolation():
            return self._subprocess_execute(handler_func, kwargs)
        else:
            return self._in_process_execute(handler_func, kwargs)

    def _needs_isolation(self) -> bool:
        return not self.config.allow_execute

    def _subprocess_execute(self, handler_func, kwargs: dict) -> dict:
        """
        子进程执行——在隔离的 Python 进程中运行 handler

        1. 把 handler 代码和参数写入临时文件
        2. 启动受限的 Python 子进程
        3. 读取输出
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            # 写入参数
            import json
            params_path = os.path.join(tmpdir, "params.json")
            with open(params_path, "w") as f:
                json.dump(kwargs, f)

            # 构建受限的 Python 命令
            cmd = [
                sys.executable,
                "-u",
                "-c",
                self._build_restricted_script(handler_func, params_path),
            ]

            # 设置环境变量限制
            env = os.environ.copy()
            env["PYTHONSAFEPATH"] = "1"

            try:
                result = subprocess.run(
                    cmd,
                    capture_output=True,
                    text=True,
                    timeout=self.config.max_cpu_seconds,
                    env=env,
                    cwd=self.config.work_dir,
                )

                if result.returncode == 0:
                    output = json.loads(result.stdout)
                    return {"success": True, "output": output}
                else:
                    return {
                        "success": False,
                        "error": f"子进程失败: {result.stderr[:500]}",
                    }

            except subprocess.TimeoutExpired:
                return {"success": False, "error": f"执行超时(>{self.config.max_cpu_seconds}s)"}

    def _in_process_execute(self, handler_func, kwargs: dict) -> dict:
        """
        进程内执行——带权限检查

        不启动子进程,但通过权限检查限制 handler 的访问。
        """
        try:
            # 检查网络访问
            if not self.config.allow_network:
                self._check_no_network_access(handler_func)

            # 检查文件系统访问
            if self.config.allow_filesystem == "none":
                self._check_no_filesystem_access(handler_func)

            result = handler_func(**kwargs)
            return {"success": True, "output": result}

        except PermissionError as e:
            return {"success": False, "error": f"权限拒绝: {e}"}
        except Exception as e:
            return {"success": False, "error": str(e)}

    def _build_restricted_script(self, handler_func, params_path: str) -> str:
        """构建受限的 Python 执行脚本"""
        import inspect
        import json

        source = inspect.getsource(handler_func)
        handler_name = handler_func.__name__
        allowed_dir = self.config.work_dir

        # 把 allowed_dir 和 handler_name 通过参数传递,不用 f-string 嵌入
        return f"""
import json
import sys

# 禁用网络访问
import socket
socket.socket = None
import urllib.request
urllib.request.urlopen = None

# 限制文件系统
_original_open = open
_allowed_dir = {json.dumps(allowed_dir)}
def restricted_open(file, mode='r', *args, **kwargs):
    import os
    abs_path = os.path.abspath(file)
    if not abs_path.startswith(os.path.abspath(_allowed_dir)):
        raise PermissionError(f"不允许访问 {{abs_path}}")
    return _original_open(file, mode, *args, **kwargs)
import builtins
builtins.open = restricted_open

# 执行 handler
{source}

# 读取参数
with open({json.dumps(params_path)}) as f:
    params = json.load(f)

# 调用
result = {handler_name}(**params)

# 输出
print(json.dumps(result, default=str))
"""

    def _check_no_network_access(self, handler_func):
        """检查 handler 是否尝试网络访问(静态分析)"""
        import inspect
        source = inspect.getsource(handler_func)
        network_keywords = [
            "requests.", "urllib", "socket", "http.",
            "urlopen", "HTTPConnection", "aiohttp",
        ]
        for kw in network_keywords:
            if kw in source:
                raise PermissionError(
                    f"Skill 尝试网络访问(检测到 '{kw}'),但沙箱不允许"
                )

    def _check_no_filesystem_access(self, handler_func):
        """检查 handler 是否尝试文件系统访问"""
        import inspect
        source = inspect.getsource(handler_func)
        fs_keywords = ["open(", "os.path", "pathlib", "shutil", "glob"]
        for kw in fs_keywords:
            if kw in source:
                raise PermissionError(
                    f"Skill 尝试文件系统访问(检测到 '{kw}'),但沙箱不允许"
                )

这里的"沙箱"不是真正的安全沙箱。静态分析关键词可以绕过(比如 getattr(import("requests"), "get"))。生产环境应该用 Docker 容器、gVisor 或 Firecracker 做真正的隔离。OpenClaw 面向的是 solo operator,大部分 Skill 是自己写的,进程级隔离 + 权限声明足够了。

Skill 供应链安全

这是 Skills 系统最容易被忽视的部分。从 git 仓库安装的 Skill,你怎么知道里面没有恶意代码?

python
"""
Skill 供应链安全——安装前的审计

三个层级的审计:
1. 来源验证:Skill 从哪来的?
2. 静态扫描:代码有没有可疑模式?
3. 运行时监控:执行时有没有越权行为?
"""

import hashlib
import json
import os
from dataclasses import dataclass


@dataclass
class SkillAudit:
    """Skill 审计报告"""
    skill_name: str
    source: str  # 来源(git URL 或 local)
    source_verified: bool  # 来源是否可信
    static_scan: dict  # 静态扫描结果
    risk_level: str  # "low" | "medium" | "high" | "critical"
    recommendations: list[str]


class SkillAuditor:
    """
    Skill 审计器

    在 Skill 安装前运行。
    高风险 Skill 需要用户确认后才能安装。
    """

    # 危险模式
    DANGEROUS_PATTERNS = [
        # 执行任意代码
        ("eval(", "critical", "使用 eval() 可执行任意代码"),
        ("exec(", "critical", "使用 exec() 可执行任意代码"),
        ("subprocess", "high", "使用 subprocess 可执行系统命令"),
        ("os.system", "critical", "使用 os.system 可执行系统命令"),
        # 网络通信
        ("socket.", "medium", "直接 socket 访问,可能外泄数据"),
        ("requests.post", "medium", "可能外发数据到外部"),
        ("urllib.request", "medium", "可能外发数据到外部"),
        # 文件系统
        ("shutil.rmtree", "high", "可删除整个目录"),
        ("os.remove", "medium", "可删除文件"),
        ("/etc/", "high", "访问系统文件"),
        ("~/.ssh", "critical", "访问 SSH 密钥"),
        ("~/.env", "high", "访问环境变量文件"),
        # 环境变量
        ("os.environ", "high", "读取环境变量(可能包含密钥)"),
    ]

    # 可信来源
    TRUSTED_SOURCES = [
        "github.com/openclaw/",       # 官方 Skill
        "github.com/openclaw-skills/",  # 社区验证 Skill
    ]

    def audit(self, skill_dir: str, source: str = "local") -> SkillAudit:
        """审计 Skill 目录"""
        skill_name = os.path.basename(skill_dir)

        # 来源验证
        source_verified = any(
            source.startswith(ts) for ts in self.TRUSTED_SOURCES
        )

        # 静态扫描
        scan_results = self._static_scan(skill_dir)

        # 计算风险等级
        risk_level = self._calculate_risk(scan_results, source_verified)

        # 建议
        recommendations = self._recommend(scan_results, source_verified)

        return SkillAudit(
            skill_name=skill_name,
            source=source,
            source_verified=source_verified,
            static_scan=scan_results,
            risk_level=risk_level,
            recommendations=recommendations,
        )

    def _static_scan(self, skill_dir: str) -> dict:
        """静态扫描 Skill 目录中的代码"""
        findings = {"critical": [], "high": [], "medium": [], "low": []}

        for root, dirs, files in os.walk(skill_dir):
            for file in files:
                if file.endswith((".py", ".js", ".sh")):
                    filepath = os.path.join(root, file)
                    with open(filepath, "r", encoding="utf-8") as f:
                        content = f.read()

                    for pattern, severity, description in self.DANGEROUS_PATTERNS:
                        if pattern in content:
                            findings[severity].append({
                                "file": filepath,
                                "pattern": pattern,
                                "description": description,
                            })

        return findings

    def _calculate_risk(self, scan_results: dict, source_verified: bool) -> str:
        """计算风险等级"""
        if scan_results["critical"]:
            return "critical"
        if scan_results["high"]:
            return "high"
        if scan_results["medium"] and not source_verified:
            return "high"
        if scan_results["medium"]:
            return "medium"
        return "low"

    def _recommend(self, scan_results: dict, source_verified: bool) -> list[str]:
        """生成建议"""
        recs = []

        if not source_verified:
            recs.append("来源未验证,建议手动审查代码后再安装")

        if scan_results["critical"]:
            recs.append(f"发现 {len(scan_results['critical'])} 个严重问题,不建议安装")

        if scan_results["high"]:
            recs.append(f"发现 {len(scan_results['high'])} 个高风险问题,请确认是否有必要")

        if scan_results["medium"]:
            recs.append(f"发现 {len(scan_results['medium'])} 个中风险问题,建议在沙箱中运行")

        if not recs:
            recs.append("未发现问题,可安全安装")

        return recs


# 使用
auditor = SkillAuditor()
audit = auditor.audit("./skills/weather", source="github.com/openclaw-skills/weather")

print(f"来源验证: {'通过' if audit.source_verified else '未通过'}")
print(f"风险等级: {audit.risk_level}")
for rec in audit.recommendations:
    print(f"  - {rec}")

# 输出示例:
# 来源验证: 通过
# 风险等级: low
#   - 未发现问题,可安全安装

供应链安全的现实问题:Skill 的 skill.yaml 声明需要网络访问(调天气 API),但 handler 代码里还偷偷读了 ~/.ssh/id_rsa。静态扫描能发现 open("~/.ssh") 这种明显模式,但如果代码是混淆过的(比如 getattr(os, "path").expanduser("~/.ssh")),静态分析就不够了。

最实用的建议:只安装来自可信来源的 Skill,不信任的 Skill 在隔离环境(Docker 容器)中运行

Skill 协议与行业标准

OpenClaw 的 skill.yaml 是自定义格式。国内大厂在做标准化的工具/插件协议:

字节 Coze——OpenAPI/Swagger 兼容协议:所有插件都走标准 OpenAPI 规范,新增插件只需要写配置文件,不需要代码。Coze 自动从 OpenAPI 定义生成工具 schema、参数校验、错误码映射。好处是:一个配置文件可以对接任何 RESTful API,不用写 handler 代码。

腾讯 ADP——MCP(Model Context Protocol)支持:腾讯 ADP 同时支持 MCP 协议和 OpenAI Agents SDK。MCP 是 Anthropic 提出的标准协议,定义了模型和工具之间的通信格式。兼容 MCP 意味着 Skill 可以在不同 Agent 系统之间复用,不被锁定在某个平台上。

美团 WOWService——统一交互协议:所有 Sub Agent 通过标准化协议通信,支持意图识别 → 任务分发 → 结果聚合的完整流程。Master Agent 和 Sub Agent 之间的消息格式是固定的,不同团队可以独立开发自己的 Sub Agent。

阿里 AgentScope——模块化 Prompt/Model/Tools/Memory:每个组件都是可替换的,新增能力只需要实现接口,不需要改框架。AgentScope-Java 还把这套东西搬到了 Java 生态,集成了 Higress(AI 网关)、Nacos(服务注册)。

对比 OpenClaw 的 skill.yaml

维度OpenClaw skill.yaml字节 Coze OpenAPI腾讯 MCP
定义方式YAML 手工配置OpenAPI/Swagger 规范MCP 标准协议
代码要求需要写 handler.py零代码(纯配置)零代码(纯配置)
跨平台复用不可复用Coze 生态内复用跨平台复用
学习成本
灵活性中(受限于 RESTful)
OpenClaw 的 skill.yaml 优势在于灵活——你可以写任意复杂的 handler 代码。劣势是每个 Skill 都要写代码,不像 OpenAPI 那样零代码对接。如果 Skill 主要是在调外部 API(查天气、搜数据、发邮件),OpenAPI 方案更省事。如果 Skill 需要复杂逻辑(数据分析、文件处理),手写 handler 更灵活。

Skill 与 Tool 的关系

回到根本问题——Skill 和 Tool 到底是什么关系?

Tool 是原子能力,Skill 是能力的封装和分发单元。一个 Skill 可以包含多个 Tool,还附带了 prompt 模板、权限声明、依赖管理。

类比的话(好吧我说过不用类比,但这个确实直观):Tool 是函数,Skill 是 Python 包。你安装一个 Skill 就像 pip install 一个包——它自动注册了自己所有的函数,声明了依赖,告诉你要什么权限。

实际项目中,建议这样分工:

  • Tool 层(专题七):每个工具独立写好、测试好——自描述、错误处理、schema 自动生成
  • Skill 层:把相关的 Tool 打包,加权限声明和依赖,做成可插拔的安装包
  • Agent 层:通过安装/卸载 Skill 来管理可用能力,不需要改代码