基于大模型的分布式事务补偿策略自动生成:从异常模式到恢复方案

一、分布式事务补偿的工程困境:手动编写补偿逻辑的脆弱性

在微服务架构中,分布式事务是不可避免的难题。Saga 模式是最常用的补偿事务方案——将长事务拆分为多个本地事务,每个本地事务对应一个补偿操作。当某个步骤失败时,按逆序执行已完成步骤的补偿操作,回滚到一致状态。

但 Saga 模式的工程实现有一个核心痛点:补偿逻辑需要手动编写,而且必须覆盖所有可能的失败场景。在一个包含 10 个步骤的 Saga 中,第 5 步失败时,需要依次补偿第 4、3、2、1 步。每一步的补偿逻辑都需要考虑:补偿操作本身也可能失败、补偿时数据状态可能已经变化、补偿顺序可能影响最终一致性。

在存储部的实际业务中,一个订单创建流程涉及库存扣减、优惠券锁定、积分预扣、支付创建、物流预分配等 8 个步骤,对应的补偿逻辑超过 2000 行代码。每次新增步骤或修改业务规则,都需要同步更新补偿逻辑,遗漏一个场景就可能导致数据不一致。

基于大模型的补偿策略自动生成方案,通过分析业务流程的定义和异常模式,自动生成补偿逻辑代码。这不是替代工程师的架构设计,而是将机械性的补偿代码编写自动化,减少人为遗漏。

二、补偿策略生成的底层机制

2.1 业务流程的形式化描述

要自动生成补偿策略,首先需要将业务流程形式化描述。我们定义了一种基于 JSON 的流程描述语言,每个步骤包含四个要素:

  • 正向操作:步骤的正常执行逻辑(如"扣减库存")
  • 前置条件:执行前必须满足的条件(如"库存充足")
  • 后置状态:执行后的状态变更(如"库存 -N")
  • 幂等键:用于防止重复执行的业务唯一标识
flowchart TD
    A[业务流程 JSON 定义] --> B[LLM 分析正向操作语义]
    B --> C[推断每个步骤的补偿语义]
    C --> D[生成补偿操作代码]
    D --> E[异常模式匹配]
    E --> F[补充边界条件处理]
    F --> G[输出完整补偿链路]

    subgraph 补偿语义推断
        H[扣减 → 恢复]
        I[锁定 → 释放]
        J[创建 → 删除/标记无效]
        K[预扣 → 回退]
    end

    subgraph 异常模式库
        L[补偿操作失败]
        M[数据状态已变化]
        N[并发补偿冲突]
        O[超时未确认]
    end

2.2 补偿语义推断

大模型的核心能力在于理解正向操作的语义,并推断出对应的补偿操作。常见的语义映射关系:

正向操作 补偿操作 语义依据
扣减库存 N 恢复库存 N 数量反向操作
锁定优惠券 释放优惠券锁定 状态回退
预扣积分 N 回退积分 N 数量反向操作
创建支付单 关闭支付单 生命周期终止
分配物流单 取消物流单 生命周期终止

对于复杂的正向操作(如"调用第三方风控接口"),补偿语义不是简单的反向操作,而是需要根据接口的幂等性和状态机来设计。大模型需要结合接口文档和业务上下文来推断补偿策略。

2.3 异常模式与边界条件

补偿策略不仅要处理"正向操作失败"的场景,还要处理"补偿操作本身失败"的场景。常见的异常模式:

  • 补偿失败:补偿操作调用下游服务超时或返回错误。需要重试机制和人工介入兜底。
  • 状态已变化:补偿时发现数据已被其他事务修改。需要乐观锁或条件更新。
  • 并发补偿:多个 Saga 实例同时补偿同一个资源。需要幂等性保证。
  • 超时未确认:Saga 协调器与参与者之间的心跳超时。需要超时检测和自动补偿触发。

三、生产级代码实现

3.1 业务流程定义与补偿策略生成

import json
from dataclasses import dataclass, field
from typing import List, Optional, Dict

@dataclass
class SagaStep:
    """Saga 步骤定义"""
    name: str
    action_service: str       # 正向操作的服务名
    action_method: str        # 正向操作的方法名
    action_params: Dict       # 正向操作的参数
    idempotent_key: str       # 幂等键表达式
    precondition: str         # 前置条件描述
    postcondition: str        # 后置状态描述
    compensate_hint: str = "" # 补偿提示(可选,帮助 LLM 理解补偿语义)

@dataclass
class CompensateAction:
    """自动生成的补偿操作"""
    step_name: str
    compensate_service: str
    compensate_method: str
    compensate_params: Dict
    idempotent_key: str
    retry_policy: Dict
    fallback: str  # 补偿失败时的兜底策略

class CompensationGenerator:
    """基于 LLM 的补偿策略生成器"""

    def __init__(self, llm_client):
        self.llm = llm_client

    def generate(
        self, steps: List[SagaStep]
    ) -> List[CompensateAction]:
        """为 Saga 步骤列表生成补偿策略"""
        # 构造 LLM 提示词
        prompt = self._build_prompt(steps)

        # 调用 LLM 生成补偿策略
        response = self.llm.chat(prompt)
        compensate_actions = self._parse_response(response)

        # 校验生成的补偿策略
        validated = self._validate(steps, compensate_actions)

        return validated

    def _build_prompt(self, steps: List[SagaStep]) -> str:
        """构造 LLM 提示词"""
        steps_desc = []
        for i, step in enumerate(steps):
            steps_desc.append(
                f"步骤 {i+1}: {step.name}\n"
                f"  正向操作: {step.action_service}.{step.action_method}\n"
                f"  参数: {json.dumps(step.action_params, ensure_ascii=False)}\n"
                f"  前置条件: {step.precondition}\n"
                f"  后置状态: {step.postcondition}\n"
                f"  幂等键: {step.idempotent_key}\n"
                + (f"  补偿提示: {step.compensate_hint}\n" if step.compensate_hint else "")
            )

        return (
            "你是一个分布式事务补偿策略专家。"
            "根据以下 Saga 步骤定义,为每个步骤生成补偿操作。\n\n"
            "要求:\n"
            "1. 补偿操作必须是幂等的(可安全重试)\n"
            "2. 补偿参数必须从正向操作的参数和返回值中推导\n"
            "3. 每个补偿操作需要指定重试策略和失败兜底方案\n"
            "4. 输出 JSON 格式\n\n"
            f"Saga 步骤定义:\n{''.join(steps_desc)}\n\n"
            "输出格式:\n"
            "[{\"step_name\": \"...\", \"compensate_service\": \"...\", "
            "\"compensate_method\": \"...\", \"compensate_params\": {...}, "
            "\"idempotent_key\": \"...\", \"retry_policy\": {...}, "
            "\"fallback\": \"...\"}]"
        )

    def _parse_response(self, response: str) -> List[CompensateAction]:
        """解析 LLM 返回的 JSON"""
        # 提取 JSON 部分
        json_str = response.strip()
        if json_str.startswith("```"):
            json_str = json_str.split("```")[1]
            if json_str.startswith("json"):
                json_str = json_str[4:]
        data = json.loads(json_str)
        return [
            CompensateAction(
                step_name=item["step_name"],
                compensate_service=item["compensate_service"],
                compensate_method=item["compensate_method"],
                compensate_params=item["compensate_params"],
                idempotent_key=item["idempotent_key"],
                retry_policy=item["retry_policy"],
                fallback=item["fallback"],
            )
            for item in data
        ]

    def _validate(
        self,
        steps: List[SagaStep],
        actions: List[CompensateAction],
    ) -> List[CompensateAction]:
        """校验补偿策略的完整性"""
        step_names = {s.name for s in steps}
        action_names = {a.step_name for a in actions}

        # 检查是否每个步骤都有对应的补偿操作
        missing = step_names - action_names
        if missing:
            raise ValueError(f"缺少补偿操作的步骤: {missing}")

        # 检查幂等键是否已定义
        for action in actions:
            if not action.idempotent_key:
                raise ValueError(f"步骤 {action.step_name} 的补偿操作缺少幂等键")

        return actions

3.2 Saga 执行器与补偿触发

import time
from enum import Enum

class StepStatus(Enum):
    PENDING = "pending"
    EXECUTING = "executing"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    COMPENSATED = "compensated"
    FAILED = "failed"

@dataclass
class SagaInstance:
    """Saga 实例的运行时状态"""
    saga_id: str
    steps: List[SagaStep]
    compensate_actions: List[CompensateAction]
    step_statuses: List[StepStatus] = field(default_factory=list)
    step_results: List[Optional[Dict]] = field(default_factory=list)

class SagaExecutor:
    """Saga 执行器:正向执行 + 自动补偿"""

    def __init__(self, max_retries: int = 3, retry_delay_ms: int = 500):
        self.max_retries = max_retries
        self.retry_delay_ms = retry_delay_ms

    def execute(self, instance: SagaInstance) -> Dict:
        """执行 Saga 实例,失败时自动触发补偿"""
        instance.step_statuses = [StepStatus.PENDING] * len(instance.steps)
        instance.step_results = [None] * len(instance.steps)

        # 正向执行
        for i, step in enumerate(instance.steps):
            instance.step_statuses[i] = StepStatus.EXECUTING
            try:
                result = self._execute_step(step, instance.step_results)
                instance.step_statuses[i] = StepStatus.COMPLETED
                instance.step_results[i] = result
            except Exception as e:
                instance.step_statuses[i] = StepStatus.FAILED
                # 触发补偿:逆序补偿已完成的步骤
                self._compensate(instance, i)
                return {"status": "failed", "failed_step": step.name, "error": str(e)}

        return {"status": "completed", "results": instance.step_results}

    def _compensate(self, instance: SagaInstance, failed_index: int) -> None:
        """逆序补偿已完成的步骤"""
        for i in range(failed_index - 1, -1, -1):
            if instance.step_statuses[i] != StepStatus.COMPLETED:
                continue
            instance.step_statuses[i] = StepStatus.COMPENSATING
            compensate = instance.compensate_actions[i]

            # 带重试的补偿执行
            success = self._execute_with_retry(compensate, instance.step_results[i])
            if success:
                instance.step_statuses[i] = StepStatus.COMPENSATED
            else:
                # 补偿失败,执行兜底策略
                self._handle_compensation_failure(compensate, instance.saga_id)

    def _execute_with_retry(
        self, action: CompensateAction, forward_result: Optional[Dict]
    ) -> bool:
        """带重试的补偿执行"""
        for attempt in range(self.max_retries):
            try:
                # 构造补偿参数:结合正向操作的返回值
                params = self._resolve_params(action.compensate_params, forward_result)
                # 调用补偿服务(实际通过 HTTP/RPC 调用)
                # result = http_post(f"{action.compensate_service}/{action.compensate_method}", params)
                return True
            except Exception:
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay_ms * (attempt + 1) / 1000)
        return False

    @staticmethod
    def _resolve_params(template: Dict, forward_result: Optional[Dict]) -> Dict:
        """解析补偿参数模板,替换正向操作的返回值引用"""
        if forward_result is None:
            return template
        resolved = {}
        for key, value in template.items():
            if isinstance(value, str) and value.startswith("$forward."):
                # 从正向操作返回值中提取字段
                field_name = value[len("$forward."):]
                resolved[key] = forward_result.get(field_name)
            else:
                resolved[key] = value
        return resolved

    @staticmethod
    def _handle_compensation_failure(action: CompensateAction, saga_id: str) -> None:
        """补偿失败的兜底处理"""
        # 记录到补偿失败表,等待人工介入
        # INSERT INTO compensation_failures (saga_id, step_name, fallback, created_at)
        # VALUES (saga_id, action.step_name, action.fallback, NOW())
        pass

四、Trade-offs:自动生成补偿策略的风险

4.1 生成准确性的不确定性

大模型生成的补偿策略可能存在语义错误。例如,对于"调用第三方风控接口"这类操作,大模型可能无法准确推断补偿语义——风控接口可能不支持回滚,补偿策略只能是"记录日志 + 人工审核"。解决方案是引入人工审核环节:LLM 生成补偿策略后,由工程师审核确认再上线。

4.2 补偿链路的可测试性

自动生成的补偿逻辑需要充分的测试覆盖。建议使用混沌工程方法:在测试环境中随机注入故障(服务超时、网络分区、数据库死锁),验证补偿链路是否能正确回滚。自动生成的补偿代码必须通过与手写代码相同级别的测试。

4.3 适用边界

补偿策略自动生成适用于以下场景:正向操作的语义清晰、补偿操作是简单的反向操作(扣减→恢复、锁定→释放)、业务流程变更频繁需要快速更新补偿逻辑。不适用于:补偿语义复杂的场景(如涉及人工审批流程)、对数据一致性要求极高(需要强一致性事务)、补偿操作依赖外部系统且无法验证幂等性。

五、总结

基于大模型的补偿策略自动生成,将机械性的补偿代码编写自动化,但需要人工审核兜底。核心落地步骤如下:

  1. 形式化描述业务流程:用 JSON 定义每个 Saga 步骤的正向操作、前置条件和后置状态。
  2. LLM 生成补偿策略:基于语义映射和业务上下文,为每个步骤生成补偿操作。
  3. 人工审核补偿逻辑:重点检查补偿语义的正确性和幂等性保证。
  4. 混沌测试验证:在测试环境中注入故障,验证补偿链路的完整性。
  5. 监控补偿失败:上线后持续监控补偿失败表,及时处理人工介入场景。

自动生成补偿策略的目标不是消除人工,而是将人工从"编写补偿代码"转移到"审核补偿逻辑"上——后者才是真正需要工程师判断力的环节。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐