跳到正文

生产可靠性模式

Claude 驱动的系统的故障方式与传统软件不同。模型可能产生语法正确但语义错误的输出,可能在无法继续推进时拒绝继续,或者在源数据不完整时产生部分结果。本指南涵盖在生产环境中应对这些故障模式的可靠性模式。


目录

  1. 升级设计
  2. 熔断器模式
  3. 结构化错误传播
  4. 部分结果与覆盖率注释
  5. 结构化人工交接
  6. 来源冲突解决
  7. 反模式
  8. 参见

升级设计

最常见的可靠性错误是将 LLM 置信度分数作为主要的升级信号。置信度分数不是经过校准的概率:模型可能在输出高置信度值的同时,在事实上是错误的。生产环境的升级应由程序化信号驱动,而非数字置信度值。

三种标准升级触发条件

在一个设计良好的系统中,当且仅当以下三种条件之一满足时,才会触发升级。

1. 用户明确请求: 客户明确要求转接人工。这是不可商量的,优先于所有其他信号,包括情绪检测。"我要和真人说话"与愤怒的语气是不同的信号,无论 AI 接下来可以做什么,都需要立即交接。

2. 策略空缺: 请求超出了系统的既定范围,无法通过任何可用工具或知识处理。这不是模型故障;这是范围边界。正确的响应是带上下文的结构化升级,而不是道歉循环。

3. 无法推进: 经过规定次数的尝试或工具调用后,系统仍未产生有效结果。这通过程序化方式衡量:重试预算耗尽、熔断器打开或验证循环失败。

程序化升级信号

Python
from enum import Enum
from dataclasses import dataclass

class EscalationReason(Enum):
    EXPLICIT_REQUEST = "explicit_request"
    POLICY_GAP = "policy_gap"
    MAX_RETRIES_EXCEEDED = "max_retries_exceeded"
    CIRCUIT_BREAKER_OPEN = "circuit_breaker_open"
    MISSING_REQUIRED_TOOL = "missing_required_tool"

@dataclass
class EscalationSignal:
    should_escalate: bool
    reason: EscalationReason | None
    context: dict

def evaluate_escalation(
    user_message: str,
    attempt_count: int,
    policy_coverage: str,  # "covered" | "gap" | "out_of_scope"
    max_attempts: int = 3
) -> EscalationSignal:

    # 优先级 1:用户明确请求,无条件优先检查
    if contains_explicit_escalation_request(user_message):
        return EscalationSignal(
            should_escalate=True,
            reason=EscalationReason.EXPLICIT_REQUEST,
            context={"trigger": "user_stated_intent"}
        )

    # 优先级 2:策略空缺
    if policy_coverage in ("gap", "out_of_scope"):
        return EscalationSignal(
            should_escalate=True,
            reason=EscalationReason.POLICY_GAP,
            context={"coverage": policy_coverage}
        )

    # 优先级 3:无法推进
    if attempt_count >= max_attempts:
        return EscalationSignal(
            should_escalate=True,
            reason=EscalationReason.MAX_RETRIES_EXCEEDED,
            context={"attempts": attempt_count, "max": max_attempts}
        )

    return EscalationSignal(should_escalate=False, reason=None, context={})

def contains_explicit_escalation_request(message: str) -> bool:
    explicit_phrases = [
        "speak to a human", "talk to a person", "real agent",
        "human agent", "transfer me", "escalate this",
        "supervisor", "manager"
    ]
    message_lower = message.lower()
    return any(phrase in message_lower for phrase in explicit_phrases)

情绪不满 vs 明确升级请求

这是两种截然不同的信号,需要完全相反的响应。情绪不满(愤怒语气、重复提问、"这没用")是一个信号,需要以同理心回应并尝试不同的方式。明确的升级请求("我要和真人说话")是一个信号,需要立即交接。

混淆两者是一个常见错误,会产生真实后果:将只是想要更好答案的沮丧用户路由给人工,或者在用户已经决定要人工客服时还在继续努力。

Python
def classify_user_signal(message: str) -> dict:
    # 这两类模式可以共存:分别独立检查
    frustration_markers = [
        "this is ridiculous", "not helpful", "keep asking",
        "not answering", "useless", "terrible", "waste"
    ]
    escalation_markers = [
        "speak to a human", "real person", "agent", "transfer",
        "supervisor", "escalate", "don't want ai"
    ]

    message_lower = message.lower()

    return {
        "frustrated": any(m in message_lower for m in frustration_markers),
        "wants_escalation": any(m in message_lower for m in escalation_markers)
    }

# 用法:
signals = classify_user_signal(user_message)

if signals["wants_escalation"]:
    initiate_human_handoff(context)   # 立即、无条件
elif signals["frustrated"]:
    adjust_response_approach()        # 更有同理心,换个角度

从结构化输出进行基于规则的路由

在可能的情况下,从结构化输出字段而非模型级置信度派生升级决策。如果模型产生了带有 policy_gap: truerequires_human_review: true 字段的结构化结果,这些字段就是确定性的路由信号,无需解释任何置信度分数。

Python
@dataclass
class AgentDecision:
    response_text: str
    policy_gap: bool
    requires_human_review: bool
    coverage_level: str  # "full" | "partial" | "none"
    missing_information: list[str]

def route_from_structured_output(decision: AgentDecision) -> str:
    if decision.policy_gap or decision.coverage_level == "none":
        return "escalate"
    if decision.requires_human_review or decision.coverage_level == "partial":
        return "queue_for_review"
    return "respond"

熔断器模式

熔断器防止故障依赖项(API、工具、外部服务)造成级联故障。没有它,每次触碰故障服务的智能体调用都会挂起直到超时,消耗资源并降低整个管道的性能。

Python
import time
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"        # 正常运行
    OPEN = "open"            # 故障中,立即拒绝调用
    HALF_OPEN = "half_open"  # 测试是否恢复

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        success_threshold: int = 2
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold

        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time: float | None = None

    def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if self._should_attempt_recovery():
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
            else:
                raise CircuitOpenError(
                    f"Circuit open since {self.last_failure_time:.0f}. "
                    f"Recovery in {self._time_until_recovery():.0f}s"
                )

        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_success(self):
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.failure_count = 0

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

    def _should_attempt_recovery(self) -> bool:
        if self.last_failure_time is None:
            return True
        return time.time() - self.last_failure_time >= self.recovery_timeout

    def _time_until_recovery(self) -> float:
        if self.last_failure_time is None:
            return 0.0
        return max(0.0, self.recovery_timeout - (time.time() - self.last_failure_time))

class CircuitOpenError(Exception):
    pass

批量管道中的文档级隔离

在批量处理管道中,每个文档应有其自己的错误边界。一个文档的失败不应中止剩余批次。

Python
def process_document_batch(documents: list[str], processor) -> list[dict]:
    results = []
    circuit_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30.0)

    for doc_id, document in enumerate(documents):
        try:
            result = circuit_breaker.call(processor, document)
            results.append({"doc_id": doc_id, "status": "success", "result": result})
        except CircuitOpenError as e:
            # 熔断器打开:跳过剩余文档并向上游报告该状态
            results.append({"doc_id": doc_id, "status": "circuit_open", "error": str(e)})
            for remaining_id in range(doc_id + 1, len(documents)):
                results.append({
                    "doc_id": remaining_id,
                    "status": "skipped",
                    "reason": "circuit_open"
                })
            break
        except Exception as e:
            results.append({"doc_id": doc_id, "status": "error", "error": str(e)})

    return results

熔断器在批次级别关闭,而非文档级别。当它打开时,你希望知道的是底层服务不可用,而不是三个单独的文档碰巧依次失败了。


结构化错误传播

在多智能体系统中,作为通用异常字符串传递的错误会丢失恢复所需的上下文。结构化错误携带了上游编排器决定是否重试、重新路由或升级所需的信息。

Python
from dataclasses import dataclass

@dataclass
class StructuredAgentError:
    error_category: str           # "tool_failure" | "validation_error" | "policy_gap" | "timeout"
    is_retryable: bool
    failure_type: str             # 该类别内的具体子类型
    attempted_query: str | None   # 尝试了什么(用于调试)
    partial_results: dict | None  # 失败前产生的任何可用输出
    alternative_approach: str | None  # 给编排器的建议
    error_message: str
    attempt_count: int = 1

    def to_dict(self) -> dict:
        return {
            "error_category": self.error_category,
            "is_retryable": self.is_retryable,
            "failure_type": self.failure_type,
            "attempted_query": self.attempted_query,
            "has_partial_results": self.partial_results is not None,
            "alternative_approach": self.alternative_approach,
            "error_message": self.error_message,
            "attempt_count": self.attempt_count
        }

# 在子智能体中的用法:
def search_database(query: str) -> dict:
    try:
        return db.search(query)
    except DatabaseTimeoutError:
        raise StructuredAgentError(
            error_category="tool_failure",
            is_retryable=True,
            failure_type="database_timeout",
            attempted_query=query,
            partial_results=None,
            alternative_approach="retry with narrower query or use cached results",
            error_message="Database query timed out after 30s"
        )
    except NoResultsError:
        raise StructuredAgentError(
            error_category="tool_failure",
            is_retryable=False,
            failure_type="no_results",
            attempted_query=query,
            partial_results=None,
            alternative_approach="try broader search terms or escalate to human",
            error_message=f"No results found for query: {query}"
        )

# 编排器处理:
def orchestrate_with_recovery(agent_fn, query: str, max_retries: int = 2) -> dict:
    for attempt in range(max_retries + 1):
        try:
            return agent_fn(query)
        except StructuredAgentError as e:
            if not e.is_retryable or attempt == max_retries:
                return {
                    "status": "failed",
                    "error": e.to_dict(),
                    "partial_results": e.partial_results,
                    "requires_escalation": not e.is_retryable
                }
            # 可重试:如果有建议的替代方案,应用它
            if e.alternative_approach:
                query = refine_query(query, e.alternative_approach)

关键字段是 is_retryable。无法判断是重试还是升级的编排器会默认对所有错误进行重试,这会在不可重试的错误上浪费预算,并错过瞬态错误的处理窗口。


部分结果与覆盖率注释

当管道无法完全完成任务时,返回带有明确覆盖率注释的部分结果,比什么都不返回更有用。消费者可以判断部分结果是否可操作,而无需了解失败内部原理。

Python
from dataclasses import dataclass

@dataclass
class AnalysisResult:
    full_analysis: str | None
    section_results: dict[str, dict]  # section_id -> result
    coverage_summary: dict

def annotate_coverage(section_results: dict) -> dict:
    coverage_map = {}
    for section_id, result in section_results.items():
        if result.get("status") == "success" and result.get("confidence", 0) >= 0.8:
            coverage_map[section_id] = "well-supported"
        elif result.get("status") == "success":
            coverage_map[section_id] = "partially-supported"
        else:
            coverage_map[section_id] = "gap"

    counts = {"well-supported": 0, "partially-supported": 0, "gap": 0}
    for v in coverage_map.values():
        counts[v] += 1

    return {
        "sections": coverage_map,
        "counts": counts,
        "overall_coverage": counts["well-supported"] / len(coverage_map) if coverage_map else 0.0,
        "has_gaps": counts["gap"] > 0
    }

在响应中展示覆盖率注释,让下游消费者无需解析内部状态就能采取行动:

Plain
合同分析摘要

第 1 节(付款条款):有充分依据(完整分析可用)
第 2 节(责任条款):部分依据(分析基于部分文本;建议人工审查)
第 3 节(终止权利):缺口(来源文本不可读;需人工审查)

总体覆盖率:67%(3 节中的 2 节已完整分析)

此模式适用于管道可能产生不完整输出的任何地方:文档提取、多来源研究、批量翻译,或某些条款缺少支持数据的合规检查。


结构化人工交接

当升级发生时,人工客服接收的是结构化数据包,而非原始对话历史。目标是人工客服能在 30 秒内开始处理,而无需回读整个对话日志。

Python
from dataclasses import dataclass
from datetime import datetime

@dataclass
class HandoffPayload:
    customer_id: str
    session_id: str
    escalation_reason: str        # 来自 EscalationReason 枚举
    original_request: str         # 用户第一条消息的原文
    conversation_summary: str     # 3-5 句话的摘要
    root_cause: str               # AI 为何无法解决
    actions_taken: list[str]      # 尝试了什么
    recommended_next_action: str  # 对人工客服的具体建议
    partial_results: dict | None  # 产生的任何有用输出
    urgency: str                  # "high" | "medium" | "low"
    created_at: str               # ISO 8601

def build_handoff(
    session: dict,
    escalation_signal: EscalationSignal,
    conversation_history: list[dict]
) -> HandoffPayload:
    return HandoffPayload(
        customer_id=session["customer_id"],
        session_id=session["session_id"],
        escalation_reason=escalation_signal.reason.value,
        original_request=conversation_history[0]["content"] if conversation_history else "",
        conversation_summary=summarize_conversation(conversation_history),
        root_cause=derive_root_cause(escalation_signal),
        actions_taken=extract_actions_taken(session),
        recommended_next_action=suggest_next_action(escalation_signal),
        partial_results=session.get("partial_results"),
        urgency="high" if escalation_signal.reason == EscalationReason.EXPLICIT_REQUEST else "medium",
        created_at=datetime.utcnow().isoformat()
    )

交接展示格式

人工客服实际看到的内容应读起来像结构化简报,而不是数据转储:

Plain
升级简报:会话 a1b2c3d4
原因:客户请求人工客服
紧迫程度:高

原始请求
"我需要取消订阅,并退还上个月的费用"

发生了什么
客户请求取消订阅,并申请退还 5 月 18 日收取的费用。AI 确认了账户信息,
找到了该费用($49.00),但遇到了策略空缺:超过 7 天的费用退款需要人工授权。
发起了两次升级尝试;客户随后明确请求人工客服。

已采取的行动
- 账户已验证(customer_id: 88821)
- 已找到费用:$49.00(2026-05-18)
- 订阅状态:活跃

建议的下一步行动
授权退款 $49.00(该费用已过 6 天,在 7 天窗口期内),并处理取消。
无需额外验证。

recommended_next_action(建议的下一步行动)字段是最有价值的部分。收到具体建议的人工客服能更快解决问题,也更不可能要求客户重复提供信息。


来源冲突解决

当多个来源不一致时,解决策略取决于不一致的原因。时间差异(一个来源比另一个更新)与事实冲突(关于同一时间段的来源在事实上不一致)需要不同的处理方式。

Python
@dataclass
class Source:
    source_id: str
    content: str
    publication_date: str | None  # ISO 8601,时间消歧的必填项
    source_type: str              # "official" | "news" | "user_generated" | "internal"
    authority_score: float        # 0.0-1.0,领域特定

def resolve_conflict(sources: list[Source], field: str) -> dict:
    values = [s for s in sources if get_field_value(s, field) is not None]

    if len(values) <= 1:
        return {"resolved": values[0] if values else None, "conflict": False}

    # 首先检查时间差异
    dated = [s for s in values if s.publication_date is not None]
    if len(dated) == len(values):
        sorted_by_date = sorted(dated, key=lambda s: s.publication_date, reverse=True)
        newest = sorted_by_date[0]
        second_newest = sorted_by_date[1] if len(sorted_by_date) > 1 else None

        if (second_newest is not None and
                get_field_value(newest, field) == get_field_value(second_newest, field)):
            # 最近的两个来源一致:可能是正确的更新
            return {
                "resolved": newest,
                "conflict": False,
                "resolution_method": "temporal_precedence"
            }

    # 真正的事实冲突:不要自动解决
    return {
        "resolved": None,
        "conflict": True,
        "conflict_type": "factual",
        "conflicting_sources": [s.source_id for s in values],
        "requires_human_review": True
    }

始终在来源元数据中包含 publication_date。没有它,时间消歧就不可能完成,看起来像事实冲突的情况可能只是一个尚未停用的过时来源。

在输出中展示冲突

当真正的冲突无法自动解决时,明确展示它,而不是静默地选择一个来源:

Plain
字段:regulatory_status

来源 A(internal-policy-doc,2026-01-15):"已获 EU 市场批准"
来源 B(legal-review-2026,2026-03-22):"待重新批准:EU 监管更新进行中"

冲突类型:事实性(相同字段、相同司法管辖区、不同值)
解决方式:无法自动解决。使用此字段前需人工审查。

这比不注明来源直接返回单一答案要好。看到没有来源归因的自信答案的消费者,无法做出是否根据该答案采取行动的知情决策。


反模式

将置信度分数用作路由逻辑。 LLM 的置信度分数不是经过校准的概率。改为使用程序化信号:重试次数、熔断器状态、结构化输出字段。

将情绪不满与升级意图混淆。 沮丧的用户通常想要更好的答案,而不是人工。说"我要和真人说话"的用户始终想要人工。将它们视为具有不同响应的不同信号。

在智能体之间传递非结构化异常。"DatabaseError: connection refused" 这样的字符串对编排器来说无法告知是重试还是升级。使用带有 is_retryableerror_categoryalternative_approach 的结构化错误类型。

当存在部分结果时返回空响应。 带有明确覆盖率注释的部分结果几乎总是比带有错误消息的空响应更有用。消费者可以对"67% 覆盖率"做出决策;他们无法对"分析失败"做出任何决策。

在来源元数据中跳过 publication_date。 没有日期,时间冲突看起来就像事实冲突。每个被摄入管道的来源都应该携带日期,即使是近似日期。

将交接构建为对话转储。 将最后 20 轮对话粘贴到工单中不是交接;这是工作转移。带有 recommended_next_action 的结构化交接数据包,是区分好的升级系统与慢速系统的关键所在。


参见

  • 智能体团队工作流:编排器/子智能体架构和工具路由
  • 事件驱动的智能体自动化:基于触发器的自动化和重试循环
  • 任务管理工作流:多步骤智能体工作流的持久状态
  • 计划驱动开发:执行前的计划阶段,减少任务中途失败

来源:飞书 · AI Spark 知识库 | 原文(最新版):https://lcnniolukk80.feishu.cn/wiki/NTuswldwKid0zVkqRvgc5xBpn8f | 归档:2026-06-04