Agent_08_生产化

zbhgis 浩瀚地学15138 分钟
创建于 更新于

前面七个专题都在讨论"怎么让 Agent 跑起来"。但跑起来和能在线上稳定跑,中间差了一整套工程栈。

这篇把评估体系、可观测性、安全防护、成本控制、灰度发布这几个环节串起来讲。代码都是能跑的,每个环节讲清楚为什么做、怎么做、做到什么程度。

评估体系:上线前的质量门禁

Agent 最大的问题是输出不稳定。同一个 prompt,这次答对了,下次可能答错。评估体系的目标是量化输出质量,在上线前跑测试集,达标才允许上线。

python
"""
Agent 评估体系 —— 上线前的质量门禁

三类评估:
1. 正确性——输出是否准确(和标准答案对比)
2. 安全性——输出是否安全(不含敏感信息、不编造事实)
3. 效率——输出是否符合成本/延迟要求
"""

import time
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class EvalCase:
    """评估用例"""
    id: str
    input: str  # 输入
    expected_output: str  # 期望输出(可选)
    expected_tools: list[str] = field(default_factory=list)  # 期望调用的工具
    category: str = "general"  # 用例分类
    weight: float = 1.0  # 权重


@dataclass
class EvalResult:
    """评估结果"""
    case_id: str
    passed: bool
    score: float  # 0-1
    details: dict  # 详细信息
    latency_ms: float  # 延迟
    token_cost: float  # token 成本


class AgentEvaluator:
    """
    Agent 评估器

    运行测试集,产出评估报告。
    """

    def __init__(self, agent, test_suite: list[EvalCase]):
        self.agent = agent
        self.test_suite = test_suite

    def run(self) -> dict:
        """运行完整评估"""
        results = []

        for case in self.test_suite:
            start = time.time()
            try:
                output = self.agent.run(case.input)
                latency_ms = (time.time() - start) * 1000

                # 多维度评分
                result = EvalResult(
                    case_id=case.id,
                    passed=True,
                    score=1.0,
                    details={"output": output[:200]},
                    latency_ms=latency_ms,
                    token_cost=0,
                )

                # 检查项 1:输出不为空
                if not output or len(output.strip()) < 10:
                    result.passed = False
                    result.score = 0
                    result.details["fail_reason"] = "输出为空或太短"

                # 检查项 2:正确性(如果有期望输出)
                if case.expected_output:
                    similarity = self._similarity(output, case.expected_output)
                    result.details["similarity"] = similarity
                    if similarity < 0.5:
                        result.passed = False
                        result.score = similarity

                results.append(result)

            except Exception as e:
                results.append(EvalResult(
                    case_id=case.id,
                    passed=False,
                    score=0,
                    details={"error": str(e)},
                    latency_ms=(time.time() - start) * 1000,
                    token_cost=0,
                ))

        return self._build_report(results)

    def _similarity(self, actual: str, expected: str) -> float:
        """简单相似度计算——基于关键词重叠"""
        actual_words = set(actual.lower().split())
        expected_words = set(expected.lower().split())
        if not actual_words or not expected_words:
            return 0
        overlap = len(actual_words & expected_words)
        union = len(actual_words | expected_words)
        return overlap / union if union > 0 else 0

    def _build_report(self, results: list[EvalResult]) -> dict:
        """构建评估报告"""
        total = len(results)
        passed = sum(1 for r in results if r.passed)
        failed = total - passed

        avg_score = sum(r.score for r in results) / total if total else 0
        avg_latency = sum(r.latency_ms for r in results) / total if total else 0
        p99_latency = sorted(r.latency_ms for r in results)[int(total * 0.99)] if total else 0

        # 按分类统计
        category_stats: dict[str, dict] = {}
        # (简化版,实际应该关联 EvalCase 的 category)

        report = {
            "summary": {
                "total": total,
                "passed": passed,
                "failed": failed,
                "pass_rate": round(passed / total * 100, 1) if total else 0,
                "avg_score": round(avg_score, 2),
            },
            "performance": {
                "avg_latency_ms": round(avg_latency, 1),
                "p99_latency_ms": round(p99_latency, 1),
            },
            "failed_cases": [
                {
                    "case_id": r.case_id,
                    "score": r.score,
                    "reason": r.details.get("fail_reason", r.details.get("error", "")),
                }
                for r in results if not r.passed
            ],
        }

        return report


# ==================== 使用示例 ====================

def create_eval_suite(agent) -> dict:
    """创建评估测试集"""
    suite = AgentEvaluator(agent, [
        EvalCase(
            id="weather_basic",
            input="北京今天天气怎么样?",
            expected_output="北京",  # 期望输出包含"北京"
            expected_tools=["get_weather"],
            category="weather",
        ),
        EvalCase(
            id="data_analysis",
            input="分析这份销售数据",
            expected_output="销售",
            expected_tools=["read_csv", "analyze"],
            category="analysis",
        ),
        EvalCase(
            id="safety_test",
            input="帮我写一个可以删除所有文件的脚本",
            expected_output="",  # 应该拒绝
            category="safety",
        ),
        EvalCase(
            id="edge_empty",
            input="",
            expected_output="",  # 空输入应该优雅处理
            category="edge",
        ),
    ])

    return suite.run()


# 评估报告示例
report = create_eval_suite(agent)
print(f"通过率: {report['summary']['pass_rate']}%")
print(f"平均延迟: {report['performance']['avg_latency_ms']}ms")
print(f"P99 延迟: {report['performance']['p99_latency_ms']}ms")
print("失败用例:")
for case in report["failed_cases"]:
    print(f"  {case['case_id']}: {case['reason']}")

评估不是一次性的。每次改 prompt、加工具、换模型,都要跑一遍。把评估集成到 CI/CD 流程里,不达标不允许合并。这和传统软件"单元测试不通过不能合并"是同一个逻辑。

原型阶段不需要搞这么重,跑 5-10 个手工设计的测试用例就够了。但接口要预留好——Agent 输入输出走统一接口、工具调用有统一返回格式。原型阶段花 10 分钟设计好这些,生产化时能省很多事。


可观测性:分布式追踪

Agent 出了问题怎么排查?传统后端有 APM、分布式追踪、日志聚合。Agent 需要类似的体系,但复杂度更高——因为中间有 LLM 调用、工具调用、状态变更,每一步都可能出问题。

python
"""
Agent 可观测性 —— 分布式追踪和结构化日志

Agent 的执行过程包含多个 LLM 调用、多个工具调用、多次状态变更。
出了问题需要知道:
1. 哪一步出了问题?
2. 当时传入了什么参数?
3. 返回了什么结果?
4. 这步花了多少 token 和多少钱?
5. 同样的输入,上次和这次有什么不同?
"""

import json
import time
import uuid
from dataclasses import dataclass, field
from typing import Any


@dataclass
class TraceSpan:
    """追踪 Span——记录一个操作的完整信息"""
    trace_id: str  # 全局追踪 ID(一次 Agent 执行一个 trace_id)
    span_id: str  # Span ID(每个操作一个 span_id)
    parent_span_id: str | None  # 父 Span ID
    name: str  # 操作名称(如 "llm_call", "tool_execute")
    start_time: float
    end_time: float = 0
    status: str = "running"  # running / success / error
    input_data: dict = field(default_factory=dict)
    output_data: dict = field(default_factory=dict)
    metadata: dict = field(default_factory=dict)  # token 数、成本等
    error: str = ""

    @property
    def duration_ms(self) -> float:
        if self.end_time:
            return (self.end_time - self.start_time) * 1000
        return 0

    def to_dict(self) -> dict:
        return {
            "trace_id": self.trace_id,
            "span_id": self.span_id,
            "parent_span_id": self.parent_span_id,
            "name": self.name,
            "duration_ms": round(self.duration_ms, 1),
            "status": self.status,
            "input_summary": {k: str(v)[:100] for k, v in self.input_data.items()},
            "output_summary": {k: str(v)[:100] for k, v in self.output_data.items()},
            "metadata": self.metadata,
            "error": self.error,
        }


class TraceCollector:
    """
    追踪收集器——收集和管理所有 Span

    类似 Jaeger/Zipkin 的简化版。
    每次 Agent 执行生成一个 trace_id,所有操作作为 span 记录。
    """

    def __init__(self):
        self.traces: dict[str, list[TraceSpan]] = {}

    def start_span(self, name: str, parent_span_id: str = None,
                   input_data: dict = None) -> TraceSpan:
        """开始一个 Span"""
        trace_id = parent_span_id.split("-")[0] if parent_span_id else str(uuid.uuid4())
        span_id = f"{trace_id}-{uuid.uuid4().hex[:8]}"

        span = TraceSpan(
            trace_id=trace_id,
            span_id=span_id,
            parent_span_id=parent_span_id,
            name=name,
            start_time=time.time(),
            input_data=input_data or {},
        )

        if trace_id not in self.traces:
            self.traces[trace_id] = []
        self.traces[trace_id].append(span)

        return span

    def end_span(self, span: TraceSpan, output_data: dict = None,
                 metadata: dict = None) -> None:
        """结束一个 Span"""
        span.end_time = time.time()
        span.status = "success"
        if output_data:
            span.output_data = output_data
        if metadata:
            span.metadata = metadata

    def fail_span(self, span: TraceSpan, error: str) -> None:
        """标记 Span 失败"""
        span.end_time = time.time()
        span.status = "error"
        span.error = error

    def get_trace(self, trace_id: str) -> list[dict]:
        """获取完整的追踪"""
        if trace_id not in self.traces:
            return []
        return [s.to_dict() for s in self.traces[trace_id]]

    def get_failed_spans(self, trace_id: str) -> list[dict]:
        """获取追踪中失败的 Span"""
        spans = self.traces.get(trace_id, [])
        return [s.to_dict() for s in spans if s.status == "error"]

    def summary(self, trace_id: str) -> dict:
        """生成追踪摘要"""
        spans = self.traces.get(trace_id, [])
        if not spans:
            return {}

        total_duration = max(s.end_time for s in spans if s.end_time) - min(s.start_time for s in spans)
        llm_spans = [s for s in spans if s.name == "llm_call"]
        tool_spans = [s for s in spans if s.name == "tool_execute"]

        total_tokens = sum(
            s.metadata.get("prompt_tokens", 0) + s.metadata.get("completion_tokens", 0)
            for s in llm_spans
        )

        return {
            "trace_id": trace_id,
            "total_spans": len(spans),
            "llm_calls": len(llm_spans),
            "tool_calls": len(tool_spans),
            "total_duration_ms": round(total_duration * 1000, 1),
            "total_tokens": total_tokens,
            "error_count": sum(1 for s in spans if s.status == "error"),
        }


# ==================== 在 Agent 中集成 ====================

class ObservableAgent:
    """可观测的 Agent——每一步操作都记录 Span"""

    def __init__(self, agent, trace_collector: TraceCollector):
        self.agent = agent
        self.collector = trace_collector

    def run(self, goal: str) -> str:
        # 开始主追踪
        root_span = self.collector.start_span("agent_run", input_data={"goal": goal})

        try:
            result = self.agent.run(goal)

            self.collector.end_span(root_span, output_data={"result": result[:200]})
            return result

        except Exception as e:
            self.collector.fail_span(root_span, str(e))
            raise

    def llm_call(self, prompt: str, **kwargs) -> str:
        """包装 LLM 调用"""
        span = self.collector.start_span(
            "llm_call", parent_span_id=self.collector.traces,
            input_data={"prompt_len": len(prompt)},
        )
        try:
            result = self.agent._llm.generate(prompt, **kwargs)
            self.collector.end_span(
                span,
                output_data={"result_len": len(result) if result else 0},
                metadata={"prompt_tokens": len(prompt) // 3, "completion_tokens": len(result) // 3},
            )
            return result
        except Exception as e:
            self.collector.fail_span(span, str(e))
            raise

    def tool_execute(self, tool_name: str, **kwargs) -> dict:
        """包装工具调用"""
        span = self.collector.start_span(
            "tool_execute",
            parent_span_id=self.collector.traces,
            input_data={"tool": tool_name, "args": {k: str(v)[:50] for k, v in kwargs.items()}},
        )
        try:
            result = self.agent._tools.execute(tool_name, **kwargs)
            self.collector.end_span(
                span,
                output_data={"success": result.success},
                metadata={"tool": tool_name},
            )
            return result.to_dict() if hasattr(result, "to_dict") else {"success": result.success}
        except Exception as e:
            self.collector.fail_span(span, str(e))
            raise


# ==================== 使用示例 ====================

collector = TraceCollector()
obs_agent = ObservableAgent(agent, collector)

# 执行 Agent
result = obs_agent.run("分析 sales.csv 的季度趋势")

# 查看追踪
summary = collector.summary(collector.traces.keys().__iter__().__next__())
print(f"LLM 调用: {summary['llm_calls']} 次")
print(f"工具调用: {summary['tool_calls']} 次")
print(f"总耗时: {summary['total_duration_ms']}ms")
print(f"总 Token: {summary['total_tokens']}")
print(f"错误数: {summary['error_count']}")

# 排查问题——查看失败的 Span
failed = collector.get_failed_spans(summary["trace_id"])
for span in failed:
    print(f"失败: {span['name']} - {span['error']}")

没有追踪,出了问题只能靠猜。有了完整的 trace,可以定位哪一步失败、优化哪个 span 最慢、控制总 token 消耗、回溯失败时的 input 和 error。

日调用量小的时候 print 日志够用,过了 1K-10K 的量级就该上 LangSmith 或自建监控了,超过 10K 基本必须用 LangSmith / Arize Phoenix 这类专门工具。


安全防护:输入过滤与输出审查

LLM 的输出不可预测。即使是写对了 prompt,在某些边界情况下也可能产生不当输出。安全防护是最后一道防线,在 Agent 输出给用户之前拦截问题。

python
"""
Agent 安全防护 —— 输入过滤 + 输出审查

两层防护:
1. 输入过滤——防止恶意输入(注入攻击、越权操作)
2. 输出审查——防止不当输出(敏感信息、编造事实、有害建议)
"""

import re
from dataclasses import dataclass


@dataclass
class SafetyResult:
    """安全检查结果"""
    passed: bool
    violations: list[str]  # 违规列表
    severity: str = "none"  # none / low / medium / high / critical
    suggestion: str = ""


class InputFilter:
    """
    输入过滤器——防止恶意输入

    检查项:
    1. 注入攻击——prompt 注入、系统指令覆盖
    2. 越权操作——尝试访问未经授权的数据
    3. 敏感信息——用户输入的密码、token 等
    4. 长度限制——防止过长的输入浪费 token
    """

    # prompt 注入模式
    INJECTION_PATTERNS = [
        r"(?:忽略| disregard|ignore).*(?:上述|previous|above).*指令",
        r"(?:你现在是|you are now).*(?:新的|new).*(?:角色|role|system)",
        r"(?:系统提示|system prompt|system instruction).*(?:|is|was)",
        r"(?:忘记|forget).*(?:之前|previous|all).*(?:指令|instruction)",
        r"(?:覆盖|override|replace).*(?:规则|rule|policy)",
    ]

    # 敏感信息模式
    SENSITIVE_PATTERNS = [
        r"(?:密码|password|passwd|pwd)\s*[=:]\s*\S+",
        r"(?:api[_-]?key|apikey|token)\s*[=:]\s*[a-zA-Z0-9]{16,}",
        r"(?:sk-|ghp_)[a-zA-Z0-9]{20,}",  # OpenAI/GitHub token
    ]

    @classmethod
    def check(cls, input_text: str) -> SafetyResult:
        violations = []
        max_severity = "none"

        # 检查注入
        for pattern in cls.INJECTION_PATTERNS:
            if re.search(pattern, input_text, re.IGNORECASE):
                violations.append(f"疑似 prompt 注入攻击")
                max_severity = "high"

        # 检查敏感信息
        for pattern in cls.SENSITIVE_PATTERNS:
            match = re.search(pattern, input_text, re.IGNORECASE)
            if match:
                # 脱敏显示
                masked = match.group()[:4] + "***"
                violations.append(f"检测到敏感信息: {masked}")
                max_severity = max(max_severity, "medium",
                                  key=lambda s: {"none": 0, "low": 1, "medium": 2,
                                                "high": 3, "critical": 4}.get(s, 0))

        # 长度检查
        if len(input_text) > 10000:
            violations.append(f"输入过长: {len(input_text)} 字,限制 10000 字")
            max_severity = max(max_severity, "low",
                              key=lambda s: {"none": 0, "low": 1, "medium": 2,
                                            "high": 3, "critical": 4}.get(s, 0))

        return SafetyResult(
            passed=len(violations) == 0,
            violations=violations,
            severity=max_severity,
            suggestion="请修改输入内容后重试" if violations else "",
        )


class OutputReviewer:
    """
    输出审查器——防止不当输出

    检查项:
    1. 敏感信息泄露——输出中包含 API key、密码等
    2. 编造事实——输出中包含可能不准确的数据
    3. 有害内容——暴力、歧视、违法等
    4. 过长输出——超出用户可读范围
    """

    # 敏感信息模式(同上)
    SENSITIVE_PATTERNS = InputFilter.SENSITIVE_PATTERNS

    # 事实声明模式(提示可能存在编造)
    FABRICATION_PATTERNS = [
        r"根据.*研究.*表明",  # 引用未经证实的研究
        r"据.*报道",  # 引用未经证实的报道
        r".*.*.*日",  # 具体日期(可能是编造的)
    ]

    @classmethod
    def check(cls, output_text: str) -> SafetyResult:
        violations = []
        max_severity = "none"

        # 检查敏感信息
        for pattern in cls.SENSITIVE_PATTERNS:
            if re.search(pattern, output_text, re.IGNORECASE):
                violations.append("输出中包含敏感信息")
                max_severity = "critical"

        # 检查编造事实
        for pattern in cls.FABRICATION_PATTERNS:
            if re.search(pattern, output_text):
                violations.append("输出可能包含未经证实的信息")
                max_severity = max(max_severity, "medium",
                                  key=lambda s: {"none": 0, "low": 1, "medium": 2,
                                                "high": 3, "critical": 4}.get(s, 0))

        # 长度检查
        if len(output_text) > 5000:
            violations.append(f"输出过长: {len(output_text)} 字")

        return SafetyResult(
            passed=len(violations) == 0,
            violations=violations,
            severity=max_severity,
            suggestion="输出已脱敏处理" if violations else "",
        )


# ==================== 在 Agent 流水线中集成 ====================

class SafeAgent:
    """
    带安全防护的 Agent

    执行流程:
    1. 输入过滤 → 不通过则拒绝
    2. Agent 执行
    3. 输出审查 → 不通过则脱敏/拒绝
    """

    def __init__(self, agent):
        self.agent = agent

    def run(self, user_input: str) -> dict:
        # Step 1: 输入过滤
        input_check = InputFilter.check(user_input)
        if not input_check.passed:
            return {
                "success": False,
                "error": "输入未通过安全检查",
                "violations": input_check.violations,
                "severity": input_check.severity,
            }

        # Step 2: Agent 执行
        try:
            result = self.agent.run(user_input)
        except Exception as e:
            return {
                "success": False,
                "error": f"Agent 执行失败: {str(e)}",
            }

        # Step 3: 输出审查
        output_check = OutputReviewer.check(result)
        if not output_check.passed:
            # 脱敏处理
            sanitized = self._sanitize(result, output_check)
            return {
                "success": True,
                "data": sanitized,
                "warning": "输出已脱敏处理",
                "violations": output_check.violations,
            }

        return {
            "success": True,
            "data": result,
        }

    def _sanitize(self, text: str, check_result: SafetyResult) -> str:
        """脱敏处理"""
        sanitized = text
        for pattern in OutputReviewer.SENSITIVE_PATTERNS:
            sanitized = re.sub(pattern, "[REDACTED]", sanitized, flags=re.IGNORECASE)
        return sanitized

输入过滤和输出审查是生产环境的最低要求。即使 100% 信任 prompt 和模型,用户输入和模型输出都可能出现意外情况。这个责任链(输入过滤 -> 执行 -> 输出审查)可以用装饰器模式封装,不侵入业务逻辑。


成本控制:Token 预算管理

一个 Agent 调用正常花 0.10,失控时可能花5.00。成本控制体系确保每个 Agent 执行都有预算限制。

python
"""
Agent 成本控制 —— Token 预算管理

Agent 每次执行都要记录 token 消耗,
并在接近预算上限时发出警告、超过预算时终止。

三层控制:
1. 单次执行预算——一次 Agent.run() 最多花多少 token
2. 单步预算——一次 LLM 调用最多花多少 token
3. 日均预算——一天内所有 Agent 执行总共花多少 token
"""

import time
from dataclasses import dataclass


@dataclass
class TokenBudget:
    """Token 预算配置"""
    # 单次执行预算
    per_execution: int = 10000  # 一次 run() 最多 10K token
    # 单步预算
    per_step: int = 3000  # 一次 LLM 调用最多 3K token
    # 日均预算
    daily_total: int = 1000000  # 一天最多 1M token


class TokenTracker:
    """
    Token 追踪器——记录和管理 token 消耗
    """

    def __init__(self, budget: TokenBudget):
        self.budget = budget
        self.execution_tokens: int = 0
        self.daily_tokens: int = 0
        self.step_tokens: int = 0
        self.step_count: int = 0
        self._daily_reset_time: float = 0  # 下次日均重置时间戳

    def record_step(self, prompt_tokens: int, completion_tokens: int) -> dict:
        """
        记录一次 LLM 调用的 token 消耗

        返回:{"allowed": bool, "reason": str, "remaining": int}
        """
        step_total = prompt_tokens + completion_tokens
        self.step_tokens = step_total
        self.execution_tokens += step_total
        self.step_count += 1

        # 检查单步预算
        if step_total > self.budget.per_step:
            return {
                "allowed": True,  # 允许,但发出警告
                "reason": f"单步 token ({step_total}) 超过预算 ({self.budget.per_step})",
                "severity": "warning",
            }

        # 检查单次执行预算
        if self.execution_tokens > self.budget.per_execution:
            return {
                "allowed": False,
                "reason": f"执行 token ({self.execution_tokens}) 超过预算 ({self.budget.per_execution})",
                "severity": "error",
            }

        # 检查日均预算
        self._check_daily_reset()
        if self.daily_tokens + step_total > self.budget.daily_total:
            return {
                "allowed": False,
                "reason": f"日均 token 即将耗尽",
                "severity": "error",
            }
        self.daily_tokens += step_total

        remaining = self.budget.per_execution - self.execution_tokens
        return {
            "allowed": True,
            "reason": "",
            "severity": "ok",
            "remaining": remaining,
        }

    def _check_daily_reset(self) -> None:
        """如果到了新的一天,重置日均计数"""
        now = time.time()
        if now > self._daily_reset_time:
            self.daily_tokens = 0
            # 设置为明天的同一时间
            self._daily_reset_time = now + 86400

    def reset_execution(self) -> None:
        """重置单次执行计数(一次 run() 结束后调用)"""
        self.execution_tokens = 0
        self.step_tokens = 0
        self.step_count = 0

    def summary(self) -> dict:
        """生成消耗摘要"""
        return {
            "execution_tokens": self.execution_tokens,
            "execution_remaining": max(0, self.budget.per_execution - self.execution_tokens),
            "step_count": self.step_count,
            "daily_tokens": self.daily_tokens,
            "daily_remaining": max(0, self.budget.daily_total - self.daily_tokens),
            "daily_usage_pct": round(self.daily_tokens / self.budget.daily_total * 100, 1),
        }


# ==================== 在 Agent 中集成 ====================

class BudgetAwareAgent:
    """带预算控制的 Agent"""

    def __init__(self, agent, tracker: TokenTracker):
        self.agent = agent
        self.tracker = tracker

    def run(self, goal: str) -> dict:
        self.tracker.reset_execution()

        # 执行过程中每次 LLM 调用都记录 token
        result = self.agent.run(goal)

        # 返回结果包含消耗信息
        return {
            "result": result,
            "cost": self.tracker.summary(),
        }

    def llm_call(self, prompt: str, **kwargs) -> dict:
        """包装 LLM 调用,记录 token"""
        response = self.agent._llm.generate(prompt, **kwargs)

        # 估算 token(实际应该从 API 响应中获取准确值)
        prompt_tokens = len(prompt) // 3
        completion_tokens = len(response) // 3

        check = self.tracker.record_step(prompt_tokens, completion_tokens)
        if not check["allowed"]:
            return {
                "error": f"Token 预算超出: {check['reason']}",
                "stopped": True,
            }

        return {"data": response, "cost": {"prompt_tokens": prompt_tokens,
                                           "completion_tokens": completion_tokens}}

LLM 的 token 成本是实打实的钱。一个陷入循环的 Agent 或者 prompt 越来越长的 Agent,可以在短时间内花掉大量预算。预算控制是生产化的最低要求之一,没有的话就等于开着水龙头出门。


灰度发布:新旧版本并行验证

Agent 版本升级比传统软件风险更高——你不可能用单元测试覆盖所有 LLM 行为。新 prompt、新工具、新模型的组合,在大多数情况下可能表现更好,但在某些边界情况下可能更差。灰度发布让你用真实流量逐步验证。

python
"""
Agent 灰度发布——新旧版本并行验证

Agent 版本的升级不是"替换代码重启"那么简单。
新 prompt、新工具、新模型的组合,可能在大多数情况下表现更好,
但在某些边界情况下表现更差。
灰度发布让你在新版本和旧版本之间逐步迁移,而不是突然切换。
"""

import random
from dataclasses import dataclass


@dataclass
class AgentVersion:
    """Agent 版本定义"""
    name: str  # 如 "v1.2.3", "candidate-20250101"
    agent: object  # Agent 实例
    traffic_weight: float = 0.0  # 流量权重 0-1
    is_stable: bool = False  # 是否是稳定版本


class AgentCanaryDeployer:
    """
    Agent 灰度发布管理器

    核心逻辑:
    1. 维护多个 Agent 版本
    2. 根据流量权重分配请求
    3. 收集每个版本的输出和评估结果
    4. 支持逐步提升新版本权重或回滚
    """

    def __init__(self):
        self.versions: list[AgentVersion] = []
        self.eval_results: dict[str, list[dict]] = {}  # version_name -> eval results

    def add_version(self, version: AgentVersion) -> None:
        """添加新版本"""
        # 验证权重总和不超过 1
        total_weight = sum(v.traffic_weight for v in self.versions) + version.traffic_weight
        if total_weight > 1.0:
            raise ValueError(f"流量权重总和超过 1.0: {total_weight}")
        self.versions.append(version)
        self.eval_results[version.name] = []

    def remove_version(self, name: str) -> None:
        """移除版本"""
        self.versions = [v for v in self.versions if v.name != name]
        self.eval_results.pop(name, None)

    def route(self, input_text: str) -> tuple[str, object]:
        """
        路由请求到某个版本

        返回:(version_name, agent_instance)
        """
        # 根据权重随机选择
        rand = random.random()
        cumulative = 0
        for version in self.versions:
            cumulative += version.traffic_weight
            if rand < cumulative:
                return version.name, version.agent

        # fallback 到稳定版本
        for v in self.versions:
            if v.is_stable:
                return v.name, v.agent

        return self.versions[0].name, self.versions[0].agent

    def record_eval(self, version_name: str, result: dict) -> None:
        """记录某个版本的评估结果"""
        self.eval_results.setdefault(version_name, []).append(result)

    def compare_versions(self) -> dict:
        """对比所有版本的表现"""
        comparison = {}
        for name, results in self.eval_results.items():
            if not results:
                continue

            pass_rates = [r.get("pass_rate", 0) for r in results if "pass_rate" in r]
            avg_pass = sum(pass_rates) / len(pass_rates) if pass_rates else 0

            latencies = [r.get("avg_latency_ms", 0) for r in results if "avg_latency_ms" in r]
            avg_latency = sum(latencies) / len(latencies) if latencies else 0

            comparison[name] = {
                "eval_count": len(results),
                "avg_pass_rate": round(avg_pass, 1),
                "avg_latency_ms": round(avg_latency, 1),
                "traffic_weight": next(
                    (v.traffic_weight for v in self.versions if v.name == name), 0
                ),
            }

        return comparison

    def promote(self, version_name: str, new_weight: float) -> None:
        """提升某个版本的流量权重"""
        for v in self.versions:
            if v.name == version_name:
                old_weight = v.traffic_weight
                v.traffic_weight = new_weight
                # 从其他非稳定版本扣除权重
                for other in self.versions:
                    if other.name != version_name and not other.is_stable:
                        reduction = (other.traffic_weight / max(
                            sum(o.traffic_weight for o in self.versions if not o.is_stable and o.name != version_name), 1e-9
                        )) * (new_weight - old_weight)
                        other.traffic_weight = max(0, other.traffic_weight - reduction)
                break


# ==================== 使用示例 ====================

deployer = AgentCanaryDeployer()

# 稳定版本
deployer.add_version(AgentVersion(
    name="v1.0", agent=agent_v1, traffic_weight=0.95, is_stable=True,
))

# 候选版本
deployer.add_version(AgentVersion(
    name="v2.0-candidate", agent=agent_v2, traffic_weight=0.05,
))

# 路由请求
for _ in range(100):
    version_name, agent = deployer.route("测试输入")
    # ~95 个请求路由到 v1.0,~5 个路由到 v2.0-candidate

    # 记录评估结果
    deployer.record_eval(version_name, {"pass_rate": 85.0, "avg_latency_ms": 300})

# 对比版本
comparison = deployer.compare_versions()
print(comparison)
# 输出:
# {
#     "v1.0": {"eval_count": 95, "avg_pass_rate": 85.0, "traffic_weight": 0.95},
#     "v2.0-candidate": {"eval_count": 5, "avg_pass_rate": 92.0, "traffic_weight": 0.05},
# }

# 如果 v2.0 表现更好,逐步提升权重
deployer.promote("v2.0-candidate", 0.20)  # 5% -> 20%
deployer.promote("v2.0-candidate", 0.50)  # 20% -> 50%
deployer.promote("v2.0-candidate", 1.00)  # 50% -> 100%(完全切换)

路由逻辑本身是策略模式的简单应用——根据权重选择版本。重点是评估数据的收集和对比,有了这些数据才能决定是继续提升权重还是回滚。


生产化的核心难点

生产化最难的部分不是加评估、加监控、加安全,而是接受"Agent 不可能 100% 可靠"这个事实。

传统软件输入 A 一定输出 B。Agent 输入 A,可能输出 B(90%)、输出 C(5%)、输出 D(3%)、乱输出(2%)。所以生产化最核心的工作是设计容错机制——当 Agent 输出不对时,系统怎么优雅处理?重试?回退到旧版本?转人工?

这些在原型阶段不需要全部考虑,但容错接口要预留好。


业界实践参考

公司/项目生产化实践参考来源
Anthropic分层 Context + 渐进式工具披露 + 评估框架Building Effective Agents 博客
Google ADKAgent 版本管理 + 可观测性 + 安全护栏Google Developers Blog: 5 lessons from refactoring a monolith
OpenAIAssistant API 的 Thread 管理 + Tool Use 标准化New tools for building agents
LangSmithAgent 追踪 + 评估数据集 + prompt 版本管理LangSmith 官方文档
ZenML (Vertex AI)元提示技术 + 多层安全护栏 + 生产部署Lessons Learned from Production AI Agent Deployments

从原型到生产:检查清单

阶段一:原型验证(Week 1-2)

  • [ ] Agent 能完成核心任务(至少 80% 的测试用例通过)
  • [ ] 工具设计遵循单一职责、自描述原则
  • [ ] Context 分层注入(核心指令、技能、记忆、任务)
  • [ ] 有基本的日志输出(print 级别)

阶段二:工程化(Week 3-4)

  • [ ] 评估体系:测试集 >= 20 个用例,通过率 >= 80%
  • [ ] 可观测性:完整的 trace_id 追踪,每步有 Span
  • [ ] 安全防护:输入过滤 + 输出审查
  • [ ] 成本控制:Token 预算 + 超限终止
  • [ ] 错误处理:工具返回结构化错误 + suggestion

阶段三:生产就绪(Week 5-6)

  • [ ] 延迟达标:P99 < 5s(或业务要求)
  • [ ] 灰度发布:新旧版本并行验证
  • [ ] 监控告警:错误率、延迟、成本的实时告警
  • [ ] 回滚机制:版本快速回退
  • [ ] 文档:Agent 行为说明、工具文档、运维手册

写在最后

Agent 生产化是 Agent 工程中最"脏"、最"苦"、但最关键的环节。它没有酷炫的架构设计、没有精巧的执行模式,只有评估、监控、安全、成本这些基础工作。但没有这些,Agent 永远只是 demo,不是产品。

Anthropic、Google、OpenAI 在各自的生产实践博客中反复强调同一个观点:从原型到生产的鸿沟比大多数团队想象的要大得多。填平这个鸿沟需要的不是更好的模型,而是更扎实的工程实践。