AI Agent Harness Engineering 的事务处理:保证操作的原子性

引言

背景介绍:AI Agent 时代的到来

近年来,以大语言模型(LLM)为核心的 AI Agent 技术正在深刻改变软件系统的构建方式。从自动化客服处理退款请求、代码助手自动修复并提交代码,到 DevOps Agent 完成环境搭建-测试执行-结果上报的全流程,AI Agent 正从“单一工具调用者”演变为“跨系统事务协调者”。

然而,当 AI Agent 的操作从“单步骤、单系统”扩展到“多步骤、跨工具、多系统”时,一个传统软件工程中早已面临的问题再次浮现:如何保证 Agent 执行的一系列操作是“原子性”的? 即:要么所有操作全部成功,系统状态发生预期变化;要么任一操作失败,所有已执行的操作都能被撤销,系统回到操作前的初始状态。

核心问题:AI Agent 操作的原子性困境

在传统软件系统中,我们可以依赖数据库事务、分布式事务协议(如 2PC、Saga)来保证原子性。但 AI Agent 的操作具有以下独特特点,使得传统方案难以直接套用:

  1. 操作异构性:Agent 可能调用数据库、API、文件系统、LLM 推理、Git 等完全不同的工具,这些工具通常不支持统一的事务接口;
  2. 不确定性:LLM 的推理结果、外部 API 的响应可能存在不确定性,导致操作失败的原因更加复杂;
  3. 动态性:Agent 的操作流程可能由 LLM 实时规划,而非预先定义,传统静态事务编排难以适配;
  4. 状态分散性:操作的状态可能分散在 Agent 内存、外部工具、数据库中,难以统一跟踪和回滚。

这些特点构成了本文要解决的核心问题:如何在 AI Agent Harness(Agent 的“ harness ”指支撑 Agent 运行、管理其生命周期和操作的框架层)中,设计一套通用的事务处理机制,以保证 Agent 多步骤操作的原子性?

文章脉络

本文将采用“深度剖析+实践落地”的结构,层层递进地讲解 AI Agent Harness 中原子性保证的原理与实现:

  1. 基础概念:明确 AI Agent、Harness Engineering、事务原子性等核心术语,建立知识基础;
  2. 核心原理解析:分析 AI Agent 操作的原子性挑战,详解状态机管理、补偿机制、事件溯源等核心保证机制;
  3. 数学模型与算法:用形式化语言描述原子性,设计状态机和补偿机制的算法流程并实现代码;
  4. 实践应用:通过退款处理、代码修改等实际场景,演示原子性机制的应用;
  5. 系统设计与实现:介绍一个开源 Agent Harness 项目的架构、接口和核心代码;
  6. 最佳实践与行业趋势:总结落地经验,展望 AI Agent 事务处理的未来发展。

基础概念:建立知识坐标系

在深入探讨原子性保证机制之前,我们需要先明确本文涉及的核心概念,为后续讨论建立统一的术语体系。

核心概念定义

1. AI Agent

AI Agent 是指能够感知环境、通过推理做出决策、并采取行动影响环境的自主实体。在工程化场景中,一个典型的 AI Agent 通常包含以下组件:

  • 感知模块:收集环境信息(如用户输入、系统状态、外部数据);
  • 推理模块:基于 LLM 或规则引擎,根据感知信息规划操作步骤;
  • 执行模块:调用工具/API 执行具体操作;
  • 记忆模块:存储历史对话、操作状态、环境上下文。

本文中的“Agent 操作”特指执行模块调用多个工具完成的一系列关联动作。

2. Agent Harness Engineering

“ Harness ”原意为“马具”,在软件工程中引申为“支撑系统运行的框架层”。Agent Harness Engineering 是一门研究如何设计、实现、优化 Agent 支撑框架的学科,其核心目标是解决 Agent 工程化落地中的共性问题,包括:

  • 生命周期管理(启动、暂停、终止、重启);
  • 操作编排与事务处理;
  • 状态持久化与恢复;
  • 错误处理与容错;
  • 监控与可观测性;
  • 安全与权限控制。

本文聚焦于 Harness 中的“事务处理”模块,尤其是“原子性保证”功能。

3. 事务处理与 ACID 特性

事务(Transaction)是指一组逻辑上相关的操作,这组操作要么全部成功执行,要么全部不执行。事务的核心特性由 ACID 四个字母概括:

  • A(Atomicity,原子性):事务是一个不可分割的工作单元,所有操作要么全部完成,要么全部回滚到初始状态;
  • C(Consistency,一致性):事务执行前后,系统从一个一致性状态转换到另一个一致性状态(即数据满足所有预定义的约束);
  • I(Isolation,隔离性):多个事务并发执行时,一个事务的中间状态不会被其他事务看到;
  • D(Durability,持久性):事务一旦提交,其对系统的修改就是永久的,即使系统崩溃也不会丢失。

在 AI Agent 场景中,我们首先需要保证的是原子性——因为如果操作不是原子的,一致性、隔离性、持久性都将无从谈起。

4. 原子性的形式化定义

为了更准确地理解原子性,我们可以用状态转换模型进行形式化描述(后续会在“数学模型”章节详细展开):
设系统的所有可能状态构成集合 S S S,Agent 的一个事务 T T T n n n 个有序操作 o p 1 , o p 2 , . . . , o p n op_1, op_2, ..., op_n op1,op2,...,opn 组成,每个操作 o p i op_i opi 可以表示为一个状态转换函数 f i : S → S f_i: S \rightarrow S fi:SS
事务 T T T 的原子性要求:
要么存在状态序列 s 0 → s 1 → . . . → s n s_0 \rightarrow s_1 \rightarrow ... \rightarrow s_n s0s1...sn,其中 s 0 s_0 s0 是初始状态, s i = f i ( s i − 1 ) s_i = f_i(s_{i-1}) si=fi(si1) 且所有 f i f_i fi 执行成功(记为 T T T 提交);
要么存在最大的 k < n k < n k<n,使得 f 1 f_1 f1 f k f_k fk 执行成功,且存在补偿操作 o p k − 1 , . . . , o p 1 − 1 op_k^{-1}, ..., op_1^{-1} opk1,...,op11,对应的补偿函数 f k − 1 , . . . , f 1 − 1 f_k^{-1}, ..., f_1^{-1} fk1,...,f11 满足 s 0 = f 1 − 1 ( f 2 − 1 ( . . . f k − 1 ( s k ) . . . ) ) s_0 = f_1^{-1}(f_2^{-1}(...f_k^{-1}(s_k)...)) s0=f11(f21(...fk1(sk)...))(记为 T T T 回滚)。

前置知识与学习资源

为了更好地理解本文内容,建议读者具备以下前置知识:

  1. 数据库事务基础:了解关系型数据库的事务机制、undo/redo 日志;
  2. 分布式系统一致性:了解 CAP 理论、BASE 理论、2PC、3PC、Saga 模式;
  3. Python 编程:能够阅读和编写 Python 代码,熟悉类、装饰器、异步编程等概念;
  4. AI Agent 基础:了解 LangChain、AutoGPT 等 Agent 框架的基本使用。

如果读者对上述知识不熟悉,可以参考以下学习资源:

  • 《数据库系统概念》(第7版):第14-16章关于事务的内容;
  • 《数据密集型应用系统设计》:第5-7章关于分布式一致性的内容;
  • LangChain 官方文档:https://python.langchain.com/
  • Saga 模式论文:SAGAS

核心原理解析:AI Agent 原子性保证的核心机制

在明确基础概念后,我们需要深入分析 AI Agent 操作的原子性挑战,并逐一讲解对应的解决机制。

AI Agent 操作的原子性挑战

挑战1:操作的异构性与无事务接口

传统数据库事务依赖于数据库提供的 BEGINCOMMITROLLBACK 接口,但 AI Agent 调用的工具往往不支持这类接口:

  • 调用支付 API 扣款:API 通常没有 ROLLBACK 接口,只能通过调用“退款 API”来撤销;
  • 调用 Git 提交代码:提交后可以通过 git reset 撤销,但如果已经推送到远程仓库,撤销会变得复杂;
  • 调用 LLM 生成内容:生成的内容如果已经发送给用户,几乎无法物理撤销,只能通过“告知用户内容有误”来逻辑撤销。

这种异构性导致我们无法用单一的“事务回滚”机制处理所有操作,必须针对不同类型的操作设计不同的“撤销策略”。

挑战2:操作失败的不确定性

AI Agent 操作失败的原因比传统系统更复杂:

  • 工具层面失败:API 超时、数据库连接失败、文件系统权限不足;
  • 推理层面失败:LLM 规划的操作步骤错误、工具调用参数错误;
  • 环境层面失败:外部系统状态变化(如用户在 Agent 操作过程中手动修改了订单状态);
  • Agent 自身失败:Agent 进程崩溃、内存溢出、网络中断。

其中,“推理层面失败”和“环境层面失败”是传统事务处理中较少遇到的,这要求我们的原子性机制不仅能处理“技术故障”,还能处理“逻辑错误”和“并发冲突”。

挑战3:操作流程的动态性

传统事务的操作流程是预先定义的(如“先扣款,再加款”),但 AI Agent 的操作流程可能由 LLM 实时规划:

示例:用户让 Agent“帮我订一张明天从北京到上海的机票,订好后帮我约一辆从家到机场的车”。
Agent 的操作流程可能是:

  1. 查询明天的机票;
  2. 如果有合适的机票,调用订票 API;
  3. 订票成功后,查询家到机场的打车费用;
  4. 如果费用在预算内,调用打车 API。

在这个流程中,步骤3和4是否执行取决于步骤2的结果,且机票的具体信息、打车的具体时间都是动态获取的。这种动态性使得传统的“静态事务编排”难以适配,我们需要一种“动态状态跟踪”机制。

挑战4:状态的分散性

AI Agent 操作的状态可能分散在多个位置:

  • Agent 内存:当前执行到第几个步骤、每个步骤的执行结果;
  • 状态存储:Redis、数据库中持久化的事务状态;
  • 外部工具:支付系统中的扣款记录、Git 仓库中的提交记录;
  • 用户端:已经发送给用户的消息。

状态的分散性导致我们难以统一跟踪事务的执行进度,也难以保证回滚时所有状态都能被正确恢复。


针对上述挑战,我们可以采用以下核心机制来保证 AI Agent 操作的原子性:状态机管理补偿机制(Saga 模式)事件溯源幂等设计快照隔离。下面我们逐一详解这些机制。


核心机制1:状态机管理——动态跟踪事务执行进度

状态机(Finite State Machine, FSM)是一种用来描述系统状态转换的数学模型,它由以下元素组成:

  • 状态集合 Q Q Q:事务可能处于的所有状态;
  • 初始状态 q 0 ∈ Q q_0 \in Q q0Q:事务开始时的状态;
  • 终止状态集合 F ⊆ Q F \subseteq Q FQ:事务结束时的状态(成功或失败);
  • 转换函数 δ : Q × Σ → Q \delta: Q \times \Sigma \rightarrow Q δ:Q×ΣQ:根据当前状态和输入事件,转换到下一个状态。
为什么用状态机管理 Agent 事务?

状态机非常适合管理 AI Agent 的动态事务流程,原因如下:

  1. 清晰的状态跟踪:可以明确知道事务当前处于哪个步骤、每个步骤的执行结果;
  2. 动态流程支持:可以根据上一个步骤的执行结果(输入事件)动态决定下一个状态;
  3. 容错与恢复:如果 Agent 崩溃,可以从状态存储中读取最后一个状态,恢复事务执行;
  4. 可观测性:可以通过状态变化历史,分析事务的执行过程,便于调试和监控。
状态机的状态设计

针对 AI Agent 的事务,我们可以设计以下状态:

状态名称 状态描述
INIT 事务初始化完成,等待执行第一个操作
EXECUTING_OP_i 正在执行第 i i i 个操作( i i i 从1开始)
OP_i_SUCCESS i i i 个操作执行成功,等待执行下一个操作或提交
OP_i_FAILED i i i 个操作执行失败,等待执行补偿操作
COMPENSATING_OP_j 正在执行第 j j j 个补偿操作( j j j k k k 开始, k k k 是最后一个成功的操作序号)
COMPENSATION_SUCCESS 所有补偿操作执行成功,事务回滚到初始状态
COMPENSATION_FAILED 某个补偿操作执行失败,需要人工介入
COMMITTED 所有操作执行成功,事务提交
状态机的转换流程

我们用 mermaid 流程图来描述状态机的转换过程:

事务创建

开始执行操作1

操作1成功

操作1失败

执行操作2

执行补偿1(无则跳过)

操作2成功

操作2失败

所有操作完成,提交

执行补偿2

补偿2成功

补偿1成功

回滚完成

补偿2失败

补偿1失败

人工介入

事务完成

INIT

EXECUTING_OP_1

OP_1_SUCCESS

OP_1_FAILED

EXECUTING_OP_2

COMPENSATING_OP_1

OP_2_SUCCESS

OP_2_FAILED

COMMITTED

COMPENSATING_OP_2

COMPENSATION_SUCCESS

COMPENSATION_FAILED

在这个流程中:

  1. 事务从 INIT 状态开始,依次执行每个操作;
  2. 如果某个操作成功,进入下一个操作的执行状态;
  3. 如果所有操作都成功,进入 COMMITTED 状态,事务完成;
  4. 如果某个操作失败,从最后一个成功的操作开始,逆序执行补偿操作;
  5. 如果所有补偿操作都成功,进入 COMPENSATION_SUCCESS 状态,事务回滚完成;
  6. 如果某个补偿操作失败,进入 COMPENSATION_FAILED 状态,需要人工介入。
状态持久化

为了支持 Agent 崩溃后的恢复,我们需要将状态机的当前状态、每个操作的执行结果、补偿操作的参数等信息持久化到存储中。常用的存储方案有:

  • Redis:适合存储临时状态,支持快速读写;
  • 关系型数据库(如 PostgreSQL):适合存储持久化状态,支持事务和查询;
  • 事件日志(如 Kafka):适合存储状态变化历史,支持事件溯源。

我们将在“系统核心实现”章节详细讲解状态持久化的代码实现。


核心机制2:补偿机制(Saga 模式)——异构操作的“撤销”方案

由于 AI Agent 调用的工具通常不支持传统的 ROLLBACK 接口,我们需要采用补偿机制(也称为 Saga 模式)来实现操作的撤销。

Saga 模式的起源与核心思想

Saga 模式最早由 Hector Garcia-Molina 和 Kenneth Salem 在 1987 年的论文《SAGAS》中提出,最初用于解决长事务(Long-Lived Transaction)的原子性问题。其核心思想是:

将一个长事务拆分成多个短事务(称为“Saga 步骤”),每个 Saga 步骤都有一个对应的补偿步骤。如果所有 Saga 步骤都成功执行,则事务提交;如果某个 Saga 步骤失败,则逆序执行所有已成功步骤的补偿步骤,将系统回滚到初始状态。

Saga 模式与传统事务的区别

Saga 模式与传统数据库事务的最大区别在于:

  • 隔离性:传统事务通过锁或 MVCC 保证隔离性,而 Saga 模式不保证隔离性(Saga 步骤的中间状态可能被其他事务看到),这符合 BASE 理论中的“Basically Available(基本可用)”和“Soft State(软状态)”;
  • 撤销方式:传统事务通过 undo 日志撤销操作,而 Saga 模式通过显式定义的补偿步骤撤销操作;
  • 适用场景:传统事务适合短事务、单系统场景,而 Saga 模式适合长事务、跨系统场景。

对于 AI Agent 来说,Saga 模式的“无隔离性”通常是可以接受的——因为 Agent 的操作通常是用户发起的单用户事务,并发冲突的概率较低;即使发生并发冲突,也可以通过“乐观锁”或“事后修复”来解决。

补偿操作的设计原则

设计补偿操作是 Saga 模式的核心,也是最难的部分。一个好的补偿操作应该遵循以下原则:

  1. 幂等性:补偿操作可以被多次执行,而不会产生额外的副作用(例如:“退款100元”的补偿操作如果执行两次,不应该退款200元);
  2. 语义正确性:补偿操作应该能够正确撤销原操作的语义(例如:原操作是“扣款100元”,补偿操作应该是“退款100元”,而不是“加款100元”——如果原操作是因为“购买商品”扣款,“退款”比“加款”更符合语义);
  3. 可靠性:补偿操作应该比原操作更可靠(例如:可以将补偿操作写入消息队列,保证即使 Agent 崩溃,补偿操作也能被执行);
  4. 可逆性的妥协:有些操作是不可逆的(例如:已经发送给用户的消息),这时可以采用“逻辑补偿”(例如:发送一条“刚才的消息有误,请忽略”的消息)代替“物理补偿”。
常见操作的补偿操作示例

我们可以将 AI Agent 调用的操作分为几类,并给出对应的补偿操作示例:

操作类型 原操作示例 补偿操作示例 注意事项
数据库操作 INSERT INTO orders (...) DELETE FROM orders WHERE id=? 要保证 DELETE 的幂等性(可以先查询是否存在再删除)
支付操作 payment.charge(amount=100) payment.refund(transaction_id=?) 要保存 transaction_id,退款金额要与原扣款金额一致
Git 操作 git commit -m "fix bug" git reset --soft HEAD~1 如果已经推送到远程仓库,需要用 git push --force,但要注意协作冲突
文件操作 write_file("a.txt", content) delete_file("a.txt")restore_file("a.txt", backup) 最好在写入前备份原文件,补偿时恢复备份
消息发送操作 send_message(user_id, msg) send_message(user_id, "刚才的消息有误,请忽略") 逻辑补偿,无法物理撤销,需要用户配合
Saga 模式的两种实现方式

Saga 模式有两种常见的实现方式:协同式(Choreography)编排式(Orchestration)

1. 协同式 Saga

在协同式 Saga 中,没有一个中央协调者,每个 Saga 步骤在执行完成后,通过事件通知下一个步骤执行;如果某个步骤失败,通过事件通知前面的步骤执行补偿。

协同式 Saga 的优点是去中心化,缺点是流程不清晰,调试困难,不适合 AI Agent 的动态流程。

2. 编排式 Saga

在编排式 Saga 中,有一个中央协调者(即我们的 Agent Harness),由协调者负责调用 Saga 步骤、跟踪执行状态、触发补偿操作。

编排式 Saga 的优点是流程清晰、易于调试、支持动态流程,非常适合 AI Agent 场景——我们的状态机就是协调者的核心。


核心机制3:事件溯源——回放历史,恢复状态

事件溯源(Event Sourcing)是一种以“事件”为核心的状态存储方式,其核心思想是:

不存储系统的当前状态,而是存储所有导致状态变化的事件。当需要恢复当前状态时,只需要按顺序回放所有事件即可。

为什么用事件溯源?

事件溯源非常适合与 Saga 模式、状态机结合使用,原因如下:

  1. 完整的历史记录:可以回放事务的所有操作历史,便于调试和审计;
  2. 状态恢复:如果状态存储损坏,可以通过回放事件重新生成状态;
  3. 时间旅行:可以查询任意时间点的系统状态;
  4. 与补偿机制结合:如果需要回滚事务,可以逆序回放“补偿事件”。
事件的设计

针对 AI Agent 的事务,我们可以设计以下事件类型:

事件类型 事件描述 事件属性示例
TransactionCreated 事务创建 transaction_id, created_at, initial_state
OperationStarted 操作开始执行 transaction_id, operation_id, operation_name, started_at
OperationSucceeded 操作执行成功 transaction_id, operation_id, result, succeeded_at
OperationFailed 操作执行失败 transaction_id, operation_id, error, failed_at
CompensationStarted 补偿操作开始执行 transaction_id, compensation_id, compensation_name, started_at
CompensationSucceeded 补偿操作执行成功 transaction_id, compensation_id, result, succeeded_at
CompensationFailed 补偿操作执行失败 transaction_id, compensation_id, error, failed_at
TransactionCommitted 事务提交 transaction_id, committed_at, final_state
TransactionRolledBack 事务回滚 transaction_id, rolled_back_at, initial_state

每个事件都应该是不可变的(Immutable)——一旦写入事件日志,就不能被修改或删除。如果需要撤销某个事件的影响,可以写入一个“补偿事件”。

事件日志的实现

事件日志可以用以下系统实现:

  • Kafka:适合高吞吐量的事件写入,支持事件回放;
  • EventStoreDB:专门为事件溯源设计的数据库;
  • PostgreSQL:可以用表存储事件,支持事务和查询。

核心机制4:幂等设计——避免重复操作的副作用

在 AI Agent 的事务处理中,由于网络超时、Agent 崩溃等原因,同一个操作或补偿操作可能会被执行多次。为了避免重复操作产生副作用,我们需要保证所有操作和补偿操作都是幂等的

幂等性的定义

幂等性(Idempotency)是指一个操作可以被多次执行,而每次执行的结果都与第一次执行的结果相同。用数学语言描述就是:
f ( f ( x ) ) = f ( x ) f(f(x)) = f(x) f(f(x))=f(x)
其中 f f f 是操作函数, x x x 是输入参数。

幂等设计的常见方法

我们可以用以下方法实现操作的幂等性:

  1. 唯一标识符(Idempotency Key):为每个操作分配一个唯一的 Idempotency Key,在执行操作前先检查是否已经用这个 Key 执行过操作,如果是,则直接返回上次的结果;
  2. 乐观锁:在数据库操作中使用版本号(Version)或时间戳(Timestamp),只有当版本号匹配时才执行操作;
  3. 查询前置:在执行操作前先查询当前状态,只有当状态符合预期时才执行操作(例如:在执行“退款”操作前,先查询订单是否已经扣款,且未退款);
  4. 去重表:在数据库中创建一个去重表,记录已经执行过的操作,执行操作前先查去重表。
Idempotency Key 的实现示例

Idempotency Key 是最常用的幂等设计方法,我们可以用以下流程实现:

  1. Agent Harness 在执行操作前,生成一个唯一的 Idempotency Key(可以用 UUID);
  2. 将 Idempotency Key 和操作参数一起传给外部工具/API;
  3. 外部工具/API 在执行操作前,先检查是否已经用这个 Key 执行过操作;
  4. 如果是,直接返回上次的结果;
  5. 如果否,执行操作,并将结果和 Key 一起存储;
  6. 返回结果给 Agent Harness。

如果外部工具/API 不支持 Idempotency Key,我们可以在 Agent Harness 内部实现去重逻辑:将 Idempotency Key 和操作结果存储在 Redis 或数据库中,执行操作前先查存储。


核心机制5:快照隔离——减少并发冲突的影响

虽然 AI Agent 的操作通常是单用户事务,但仍可能出现并发冲突(例如:两个 Agent 同时修改同一个订单的状态)。为了减少并发冲突的影响,我们可以采用快照隔离(Snapshot Isolation)。

快照隔离的定义

快照隔离是一种数据库隔离级别,其核心思想是:

每个事务在执行时,都看到的是数据库在事务开始时的一个“快照”。事务的修改只有在提交时,才会检查是否与其他事务的修改冲突;如果没有冲突,则提交;否则,回滚。

快照隔离可以避免“脏读”、“不可重复读”、“幻读”等问题,同时性能比“串行化”隔离级别高。

在 AI Agent 中应用快照隔离

我们可以在以下场景应用快照隔离:

  1. 数据库操作:将数据库的隔离级别设置为“快照隔离”(PostgreSQL 中用 SET TRANSACTION ISOLATION LEVEL REPEATABLE READ,MySQL 中用 SET TRANSACTION ISOLATION LEVEL READ COMMITTED 加 MVCC);
  2. 外部系统状态读取:在执行事务前,先读取所有需要用到的外部系统状态,保存为快照,后续操作都基于这个快照进行;
  3. 乐观锁:在快照中保存状态的版本号,提交事务时检查版本号是否与当前外部系统的版本号一致;如果一致,则提交;否则,回滚。

数学模型:形式化描述原子性

为了更严谨地理解 AI Agent 事务的原子性,我们可以用数学模型进行形式化描述。本节将介绍状态转换模型失败模型原子性保证的正确性条件

状态转换模型

我们首先定义系统的状态、操作、事务等基本概念:

1. 系统状态

S S S 是系统的所有可能状态构成的集合,称为状态空间。每个状态 s ∈ S s \in S sS 包含了系统中所有需要关注的信息,例如:

  • 账户余额: s . b a l a n c e [ u s e r i d ] s.balance[user_id] s.balance[userid]
  • 订单状态: s . o r d e r s t a t u s [ o r d e r i d ] s.order_status[order_id] s.orderstatus[orderid]
  • 文件内容: s . f i l e c o n t e n t [ f i l e p a t h ] s.file_content[file_path] s.filecontent[filepath]
  • Agent 事务状态: s . t r a n s a c t i o n s t a t e [ t r a n s a c t i o n i d ] s.transaction_state[transaction_id] s.transactionstate[transactionid]
2. 操作

一个操作 o p op op 是一个部分函数(Partial Function):
o p : S ⇀ S × R e s u l t op: S \rightharpoonup S \times Result op:SS×Result
其中:

  • 定义域 dom ( o p ) ⊆ S \text{dom}(op) \subseteq S dom(op)S 是操作可以执行的状态集合;
  • 对于 s ∈ dom ( o p ) s \in \text{dom}(op) sdom(op) o p ( s ) = ( s ′ , r ) op(s) = (s', r) op(s)=(s,r) 表示操作在状态 s s s 下执行成功,系统状态转换到 s ′ s' s,执行结果为 r r r
  • 对于 s ∉ dom ( o p ) s \notin \text{dom}(op) s/dom(op),操作执行失败,系统状态保持不变,返回错误信息。

我们用 o p ( s ) ↓ op(s) \downarrow op(s) 表示操作在状态 s s s 下执行成功, o p ( s ) ↑ op(s) \uparrow op(s) 表示执行失败。

3. 补偿操作

对于每个操作 o p op op,我们定义一个对应的补偿操作 o p − 1 op^{-1} op1,它也是一个部分函数:
o p − 1 : S ⇀ S × R e s u l t op^{-1}: S \rightharpoonup S \times Result op1:SS×Result
补偿操作的作用是撤销原操作的影响。理想情况下,补偿操作应该满足:
∀ s ∈ dom ( o p ) , 若  o p ( s ) = ( s ′ , r ) , 则  s ′ ∈ dom ( o p − 1 )  且  o p − 1 ( s ′ ) = ( s , r − 1 ) \forall s \in \text{dom}(op), \text{若 } op(s) = (s', r), \text{则 } s' \in \text{dom}(op^{-1}) \text{ 且 } op^{-1}(s') = (s, r^{-1}) sdom(op), op(s)=(s,r), sdom(op1)  op1(s)=(s,r1)
其中 r − 1 r^{-1} r1 是补偿操作的结果。

但在实际场景中,这个理想条件很难完全满足(例如:已经发送给用户的消息无法物理撤销),这时我们可以用“逻辑补偿”来近似满足。

4. 事务

一个事务 T T T 是一个有序的操作序列:
T = [ o p 1 , o p 2 , . . . , o p n ] T = [op_1, op_2, ..., op_n] T=[op1,op2,...,opn]
其中每个 o p i op_i opi 都有对应的补偿操作 o p i − 1 op_i^{-1} opi1

我们用 T k T_k Tk 表示事务的前 k k k 个操作: T k = [ o p 1 , . . . , o p k ] T_k = [op_1, ..., op_k] Tk=[op1,...,opk]


事务的执行语义

我们用状态序列来描述事务的执行过程。设事务的初始状态为 s 0 ∈ S s_0 \in S s0S,事务的执行可以分为两种情况:成功执行失败回滚

1. 成功执行

如果事务的所有操作都执行成功,则存在状态序列:
s 0 → o p 1 s 1 → o p 2 s 2 → . . . → o p n s n s_0 \xrightarrow{op_1} s_1 \xrightarrow{op_2} s_2 \xrightarrow{...} \xrightarrow{op_n} s_n s0op1 s1op2 s2... opn sn
其中:

  • s i − 1 ∈ dom ( o p i ) s_{i-1} \in \text{dom}(op_i) si1dom(opi),即第 i i i 个操作可以在状态 s i − 1 s_{i-1} si1 下执行;
  • o p i ( s i − 1 ) = ( s i , r i ) op_i(s_{i-1}) = (s_i, r_i) opi(si1)=(si,ri),即第 i i i 个操作执行成功,状态转换到 s i s_i si,结果为 r i r_i ri

此时,我们称事务 T T T 在初始状态 s 0 s_0 s0提交,最终状态为 s n s_n sn

2. 失败回滚

如果事务的第 k k k 个操作执行失败( k ≤ n k \leq n kn),则前 k − 1 k-1 k1 个操作执行成功,第 k k k 个操作执行失败:
s 0 → o p 1 s 1 → . . . → o p k − 1 s k − 1 → o p k ↑ s_0 \xrightarrow{op_1} s_1 \xrightarrow{...} \xrightarrow{op_{k-1}} s_{k-1} \xrightarrow{op_k} \uparrow s0op1 s1... opk1 sk1opk
此时,我们需要逆序执行前 k − 1 k-1 k1 个操作的补偿操作:
s k − 1 → o p k − 1 − 1 s k − 2 ′ → . . . → o p 1 − 1 s 0 ′ s_{k-1} \xrightarrow{op_{k-1}^{-1}} s_{k-2}' \xrightarrow{...} \xrightarrow{op_1^{-1}} s_0' sk1opk11 sk2... op11 s0
理想情况下, s 0 ′ = s 0 s_0' = s_0 s0=s0,即事务回滚到初始状态。

如果所有补偿操作都执行成功,我们称事务 T T T 在初始状态 s 0 s_0 s0回滚成功;如果某个补偿操作执行失败,我们称事务 T T T 回滚失败,需要人工介入。


失败模型

在实际场景中,操作可能会因为各种原因失败,我们需要用失败模型来描述这些失败情况。常见的失败模型有:

1. 崩溃失败(Crash Failure)

操作在执行过程中崩溃,系统状态保持不变,操作没有产生任何副作用。这是最简单的失败模型,我们可以通过重新执行操作来解决。

2. 遗漏失败(Omission Failure)

操作执行了,但结果没有返回给调用者(例如:网络超时)。此时,调用者无法确定操作是否成功,需要通过“查询操作状态”或“幂等重试”来解决。

3. 时序失败(Timing Failure)

操作执行了,但返回结果的时间超出了预期。这通常可以通过设置超时时间来解决。

4. 拜占庭失败(Byzantine Failure)

操作执行了,但返回了错误的结果,或者产生了预期之外的副作用。这是最复杂的失败模型,在 AI Agent 场景中,LLM 的推理错误可能导致拜占庭失败。我们可以通过“多 Agent 投票”、“结果验证”来解决。


原子性保证的正确性条件

现在,我们可以用形式化语言描述 AI Agent 事务原子性保证的正确性条件

T = [ o p 1 , . . . , o p n ] T = [op_1, ..., op_n] T=[op1,...,opn] 是一个事务, s 0 s_0 s0 是初始状态, A \mathcal{A} A 是我们的原子性保证机制。正确性条件包括以下三个部分:

1. 终止性(Termination)

机制 A \mathcal{A} A 必须保证事务最终会结束,即:要么提交,要么回滚成功,要么回滚失败(人工介入)。不会出现“事务一直挂起”的情况。

2. 原子性(Atomicity)

机制 A \mathcal{A} A 必须保证:

  • 如果事务提交,则所有操作都执行成功,系统状态转换到 s n s_n sn
  • 如果事务回滚成功,则系统状态回到 s 0 s_0 s0(或逻辑上等价的状态);
  • 不会出现“部分操作执行成功,部分操作执行失败,且没有回滚”的情况。
3. 可靠性(Reliability)

机制 A \mathcal{A} A 必须保证:即使在崩溃失败、遗漏失败等情况下,正确性条件仍然成立。


算法流程图与源代码实现

在了解核心机制和数学模型后,我们可以设计并实现一个简单的 AI Agent Harness,用状态机和 Saga 模式保证操作的原子性。本节将首先介绍算法流程图,然后给出 Python 源代码实现。

算法流程图

我们将实现一个编排式 Saga 协调器,其核心算法流程分为两个部分:事务执行流程事务回滚流程

1. 事务执行流程

事务执行流程的输入是“操作列表”和“补偿操作列表”,输出是“事务提交”或“触发回滚”。我们用 mermaid 流程图描述:

渲染错误: Mermaid 渲染失败: Parse error on line 3: ...op=1, succeeded_ops=[]] C --> D[持久化状 -----------------------^ Expecting 'SQE', 'DOUBLECIRCLEEND', 'PE', '-)', 'STADIUMEND', 'SUBROUTINEEND', 'PIPE', 'CYLINDEREND', 'DIAMOND_STOP', 'TAGEND', 'TRAPEND', 'INVTRAPEND', 'UNICODE_TEXT', 'TEXT', 'TAGSTART', got 'SQS'
2. 事务回滚流程

事务回滚流程的输入是“事务ID”,输出是“回滚成功”或“回滚失败”。我们用 mermaid 流程图描述:

渲染错误: Mermaid 渲染失败: Parse error on line 13: ...urrent_compensation^{-1}] M --> N{补偿 -----------------------^ Expecting 'SQE', 'DOUBLECIRCLEEND', 'PE', '-)', 'STADIUMEND', 'SUBROUTINEEND', 'PIPE', 'CYLINDEREND', 'DIAMOND_STOP', 'TAGEND', 'TRAPEND', 'INVTRAPEND', 'UNICODE_TEXT', 'TEXT', 'TAGSTART', got 'DIAMOND_START'

Python 源代码实现

现在,我们用 Python 实现上述算法。我们将使用以下依赖库:

  • uuid:生成唯一的事务ID和幂等键;
  • dataclasses:定义数据类;
  • typing:类型注解;
  • redis:存储状态和幂等键(可选,这里用字典模拟);
  • logging:记录日志。

首先,我们需要安装依赖库(如果用真实的 Redis):

pip install redis

步骤1:定义数据类和枚举

我们首先定义事务状态的枚举、操作基类、补偿操作基类和事务数据类:

import uuid
import logging
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any, Callable
from enum import Enum

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("AgentHarness")

# 事务状态枚举
class TransactionStatus(Enum):
    INIT = "INIT"
    EXECUTING_OP = "EXECUTING_OP"
    OP_SUCCESS = "OP_SUCCESS"
    OP_FAILED = "OP_FAILED"
    COMPENSATING_OP = "COMPENSATING_OP"
    COMPENSATION_SUCCESS = "COMPENSATION_SUCCESS"
    COMPENSATION_FAILED = "COMPENSATION_FAILED"
    COMMITTED = "COMMITTED"

# 操作结果数据类
@dataclass
class OperationResult:
    success: bool
    result: Optional[Any] = None
    error: Optional[str] = None

# 操作基类
class Operation:
    def __init__(self, name: str):
        self.name = name
    
    def execute(self, context: Dict[str, Any]) -> OperationResult:
        """执行操作,子类必须实现这个方法"""
        raise NotImplementedError("Subclasses must implement execute()")
    
    def get_compensation(self) -> Optional['CompensationOperation']:
        """获取补偿操作,子类可以覆盖这个方法"""
        return None

# 补偿操作基类
class CompensationOperation:
    def __init__(self, name: str):
        self.name = name
    
    def execute(self, context: Dict[str, Any], op_result: OperationResult) -> OperationResult:
        """执行补偿操作,子类必须实现这个方法"""
        raise NotImplementedError("Subclasses must implement execute()")

# 事务数据类
@dataclass
class Transaction:
    tx_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    status: TransactionStatus = TransactionStatus.INIT
    current_op_index: int = 0  # 当前执行的操作索引(从0开始)
    succeeded_ops: List[int] = field(default_factory=list)  # 已成功的操作索引列表
    current_compensation_index: int = -1  # 当前执行的补偿操作索引(从succeeded_ops的最后一个开始)
    context: Dict[str, Any] = field(default_factory=dict)  # 事务上下文,用于存储操作参数和结果
    error: Optional[str] = None  # 错误信息
    operations: List[Operation] = field(default_factory=list)  # 操作列表

步骤2:实现状态存储和幂等存储

接下来,我们实现状态存储和幂等存储。这里为了简单,我们用字典模拟 Redis 存储;在实际项目中,可以用真实的 Redis 或数据库。

class StateStore:
    """状态存储类,用于持久化事务状态"""
    def __init__(self):
        self._store: Dict[str, Transaction] = {}
    
    def save(self, tx: Transaction) -> None:
        """保存事务状态"""
        self._store[tx.tx_id] = tx
        logger.info(f"Saved transaction {tx.tx_id} with status {tx.status}")
    
    def get(self, tx_id: str) -> Optional[Transaction]:
        """获取事务状态"""
        return self._store.get(tx_id)
    
    def delete(self, tx_id: str) -> None:
        """删除事务状态(可选,用于清理)"""
        if tx_id in self._store:
            del self._store[tx_id]
            logger.info(f"Deleted transaction {tx_id}")

class IdempotencyStore:
    """幂等存储类,用于存储操作和补偿操作的执行结果"""
    def __init__(self):
        self._store: Dict[str, OperationResult] = {}
    
    def save(self, idempotency_key: str, result: OperationResult) -> None:
        """保存幂等键和结果"""
        self._store[idempotency_key] = result
        logger.info(f"Saved idempotency key {idempotency_key} with result {result}")
    
    def get(self, idempotency_key: str) -> Optional[OperationResult]:
        """获取幂等键对应的结果"""
        return self._store.get(idempotency_key)

步骤3:实现 Saga 协调器

现在,我们实现核心的 Saga 协调器,它包含“执行事务”和“回滚事务”两个方法。

class SagaCoordinator:
    """Saga 协调器,负责执行事务和回滚事务"""
    def __init__(self, state_store: StateStore, idempotency_store: IdempotencyStore):
        self.state_store = state_store
        self.idempotency_store = idempotency_store
    
    def _generate_idempot
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐