AI Agent Harness Engineering 的事务处理:保证操作的原子性
AI Agent Harness Engineering 的事务处理:保证操作的原子性
引言
背景介绍:AI Agent 时代的到来
近年来,以大语言模型(LLM)为核心的 AI Agent 技术正在深刻改变软件系统的构建方式。从自动化客服处理退款请求、代码助手自动修复并提交代码,到 DevOps Agent 完成环境搭建-测试执行-结果上报的全流程,AI Agent 正从“单一工具调用者”演变为“跨系统事务协调者”。
然而,当 AI Agent 的操作从“单步骤、单系统”扩展到“多步骤、跨工具、多系统”时,一个传统软件工程中早已面临的问题再次浮现:如何保证 Agent 执行的一系列操作是“原子性”的? 即:要么所有操作全部成功,系统状态发生预期变化;要么任一操作失败,所有已执行的操作都能被撤销,系统回到操作前的初始状态。
核心问题:AI Agent 操作的原子性困境
在传统软件系统中,我们可以依赖数据库事务、分布式事务协议(如 2PC、Saga)来保证原子性。但 AI Agent 的操作具有以下独特特点,使得传统方案难以直接套用:
- 操作异构性:Agent 可能调用数据库、API、文件系统、LLM 推理、Git 等完全不同的工具,这些工具通常不支持统一的事务接口;
- 不确定性:LLM 的推理结果、外部 API 的响应可能存在不确定性,导致操作失败的原因更加复杂;
- 动态性:Agent 的操作流程可能由 LLM 实时规划,而非预先定义,传统静态事务编排难以适配;
- 状态分散性:操作的状态可能分散在 Agent 内存、外部工具、数据库中,难以统一跟踪和回滚。
这些特点构成了本文要解决的核心问题:如何在 AI Agent Harness(Agent 的“ harness ”指支撑 Agent 运行、管理其生命周期和操作的框架层)中,设计一套通用的事务处理机制,以保证 Agent 多步骤操作的原子性?
文章脉络
本文将采用“深度剖析+实践落地”的结构,层层递进地讲解 AI Agent Harness 中原子性保证的原理与实现:
- 基础概念:明确 AI Agent、Harness Engineering、事务原子性等核心术语,建立知识基础;
- 核心原理解析:分析 AI Agent 操作的原子性挑战,详解状态机管理、补偿机制、事件溯源等核心保证机制;
- 数学模型与算法:用形式化语言描述原子性,设计状态机和补偿机制的算法流程并实现代码;
- 实践应用:通过退款处理、代码修改等实际场景,演示原子性机制的应用;
- 系统设计与实现:介绍一个开源 Agent Harness 项目的架构、接口和核心代码;
- 最佳实践与行业趋势:总结落地经验,展望 AI Agent 事务处理的未来发展。
基础概念:建立知识坐标系
在深入探讨原子性保证机制之前,我们需要先明确本文涉及的核心概念,为后续讨论建立统一的术语体系。
核心概念定义
1. AI Agent
AI Agent 是指能够感知环境、通过推理做出决策、并采取行动影响环境的自主实体。在工程化场景中,一个典型的 AI Agent 通常包含以下组件:
- 感知模块:收集环境信息(如用户输入、系统状态、外部数据);
- 推理模块:基于 LLM 或规则引擎,根据感知信息规划操作步骤;
- 执行模块:调用工具/API 执行具体操作;
- 记忆模块:存储历史对话、操作状态、环境上下文。
本文中的“Agent 操作”特指执行模块调用多个工具完成的一系列关联动作。
2. Agent Harness Engineering
“ Harness ”原意为“马具”,在软件工程中引申为“支撑系统运行的框架层”。Agent Harness Engineering 是一门研究如何设计、实现、优化 Agent 支撑框架的学科,其核心目标是解决 Agent 工程化落地中的共性问题,包括:
- 生命周期管理(启动、暂停、终止、重启);
- 操作编排与事务处理;
- 状态持久化与恢复;
- 错误处理与容错;
- 监控与可观测性;
- 安全与权限控制。
本文聚焦于 Harness 中的“事务处理”模块,尤其是“原子性保证”功能。
3. 事务处理与 ACID 特性
事务(Transaction)是指一组逻辑上相关的操作,这组操作要么全部成功执行,要么全部不执行。事务的核心特性由 ACID 四个字母概括:
- A(Atomicity,原子性):事务是一个不可分割的工作单元,所有操作要么全部完成,要么全部回滚到初始状态;
- C(Consistency,一致性):事务执行前后,系统从一个一致性状态转换到另一个一致性状态(即数据满足所有预定义的约束);
- I(Isolation,隔离性):多个事务并发执行时,一个事务的中间状态不会被其他事务看到;
- D(Durability,持久性):事务一旦提交,其对系统的修改就是永久的,即使系统崩溃也不会丢失。
在 AI Agent 场景中,我们首先需要保证的是原子性——因为如果操作不是原子的,一致性、隔离性、持久性都将无从谈起。
4. 原子性的形式化定义
为了更准确地理解原子性,我们可以用状态转换模型进行形式化描述(后续会在“数学模型”章节详细展开):
设系统的所有可能状态构成集合 S S S,Agent 的一个事务 T T T 由 n n n 个有序操作 o p 1 , o p 2 , . . . , o p n op_1, op_2, ..., op_n op1,op2,...,opn 组成,每个操作 o p i op_i opi 可以表示为一个状态转换函数 f i : S → S f_i: S \rightarrow S fi:S→S。
事务 T T T 的原子性要求:
要么存在状态序列 s 0 → s 1 → . . . → s n s_0 \rightarrow s_1 \rightarrow ... \rightarrow s_n s0→s1→...→sn,其中 s 0 s_0 s0 是初始状态, s i = f i ( s i − 1 ) s_i = f_i(s_{i-1}) si=fi(si−1) 且所有 f i f_i fi 执行成功(记为 T T T 提交);
要么存在最大的 k < n k < n k<n,使得 f 1 f_1 f1 到 f k f_k fk 执行成功,且存在补偿操作 o p k − 1 , . . . , o p 1 − 1 op_k^{-1}, ..., op_1^{-1} opk−1,...,op1−1,对应的补偿函数 f k − 1 , . . . , f 1 − 1 f_k^{-1}, ..., f_1^{-1} fk−1,...,f1−1 满足 s 0 = f 1 − 1 ( f 2 − 1 ( . . . f k − 1 ( s k ) . . . ) ) s_0 = f_1^{-1}(f_2^{-1}(...f_k^{-1}(s_k)...)) s0=f1−1(f2−1(...fk−1(sk)...))(记为 T T T 回滚)。
前置知识与学习资源
为了更好地理解本文内容,建议读者具备以下前置知识:
- 数据库事务基础:了解关系型数据库的事务机制、undo/redo 日志;
- 分布式系统一致性:了解 CAP 理论、BASE 理论、2PC、3PC、Saga 模式;
- Python 编程:能够阅读和编写 Python 代码,熟悉类、装饰器、异步编程等概念;
- AI Agent 基础:了解 LangChain、AutoGPT 等 Agent 框架的基本使用。
如果读者对上述知识不熟悉,可以参考以下学习资源:
- 《数据库系统概念》(第7版):第14-16章关于事务的内容;
- 《数据密集型应用系统设计》:第5-7章关于分布式一致性的内容;
- LangChain 官方文档:https://python.langchain.com/;
- Saga 模式论文:SAGAS。
核心原理解析:AI Agent 原子性保证的核心机制
在明确基础概念后,我们需要深入分析 AI Agent 操作的原子性挑战,并逐一讲解对应的解决机制。
AI Agent 操作的原子性挑战
挑战1:操作的异构性与无事务接口
传统数据库事务依赖于数据库提供的 BEGIN、COMMIT、ROLLBACK 接口,但 AI Agent 调用的工具往往不支持这类接口:
- 调用支付 API 扣款:API 通常没有
ROLLBACK接口,只能通过调用“退款 API”来撤销; - 调用 Git 提交代码:提交后可以通过
git reset撤销,但如果已经推送到远程仓库,撤销会变得复杂; - 调用 LLM 生成内容:生成的内容如果已经发送给用户,几乎无法物理撤销,只能通过“告知用户内容有误”来逻辑撤销。
这种异构性导致我们无法用单一的“事务回滚”机制处理所有操作,必须针对不同类型的操作设计不同的“撤销策略”。
挑战2:操作失败的不确定性
AI Agent 操作失败的原因比传统系统更复杂:
- 工具层面失败:API 超时、数据库连接失败、文件系统权限不足;
- 推理层面失败:LLM 规划的操作步骤错误、工具调用参数错误;
- 环境层面失败:外部系统状态变化(如用户在 Agent 操作过程中手动修改了订单状态);
- Agent 自身失败:Agent 进程崩溃、内存溢出、网络中断。
其中,“推理层面失败”和“环境层面失败”是传统事务处理中较少遇到的,这要求我们的原子性机制不仅能处理“技术故障”,还能处理“逻辑错误”和“并发冲突”。
挑战3:操作流程的动态性
传统事务的操作流程是预先定义的(如“先扣款,再加款”),但 AI Agent 的操作流程可能由 LLM 实时规划:
示例:用户让 Agent“帮我订一张明天从北京到上海的机票,订好后帮我约一辆从家到机场的车”。
Agent 的操作流程可能是:
- 查询明天的机票;
- 如果有合适的机票,调用订票 API;
- 订票成功后,查询家到机场的打车费用;
- 如果费用在预算内,调用打车 API。
在这个流程中,步骤3和4是否执行取决于步骤2的结果,且机票的具体信息、打车的具体时间都是动态获取的。这种动态性使得传统的“静态事务编排”难以适配,我们需要一种“动态状态跟踪”机制。
挑战4:状态的分散性
AI Agent 操作的状态可能分散在多个位置:
- Agent 内存:当前执行到第几个步骤、每个步骤的执行结果;
- 状态存储:Redis、数据库中持久化的事务状态;
- 外部工具:支付系统中的扣款记录、Git 仓库中的提交记录;
- 用户端:已经发送给用户的消息。
状态的分散性导致我们难以统一跟踪事务的执行进度,也难以保证回滚时所有状态都能被正确恢复。
针对上述挑战,我们可以采用以下核心机制来保证 AI Agent 操作的原子性:状态机管理、补偿机制(Saga 模式)、事件溯源、幂等设计、快照隔离。下面我们逐一详解这些机制。
核心机制1:状态机管理——动态跟踪事务执行进度
状态机(Finite State Machine, FSM)是一种用来描述系统状态转换的数学模型,它由以下元素组成:
- 状态集合 Q Q Q:事务可能处于的所有状态;
- 初始状态 q 0 ∈ Q q_0 \in Q q0∈Q:事务开始时的状态;
- 终止状态集合 F ⊆ Q F \subseteq Q F⊆Q:事务结束时的状态(成功或失败);
- 转换函数 δ : Q × Σ → Q \delta: Q \times \Sigma \rightarrow Q δ:Q×Σ→Q:根据当前状态和输入事件,转换到下一个状态。
为什么用状态机管理 Agent 事务?
状态机非常适合管理 AI Agent 的动态事务流程,原因如下:
- 清晰的状态跟踪:可以明确知道事务当前处于哪个步骤、每个步骤的执行结果;
- 动态流程支持:可以根据上一个步骤的执行结果(输入事件)动态决定下一个状态;
- 容错与恢复:如果 Agent 崩溃,可以从状态存储中读取最后一个状态,恢复事务执行;
- 可观测性:可以通过状态变化历史,分析事务的执行过程,便于调试和监控。
状态机的状态设计
针对 AI Agent 的事务,我们可以设计以下状态:
| 状态名称 | 状态描述 |
|---|---|
INIT |
事务初始化完成,等待执行第一个操作 |
EXECUTING_OP_i |
正在执行第 i i i 个操作( i i i 从1开始) |
OP_i_SUCCESS |
第 i i i 个操作执行成功,等待执行下一个操作或提交 |
OP_i_FAILED |
第 i i i 个操作执行失败,等待执行补偿操作 |
COMPENSATING_OP_j |
正在执行第 j j j 个补偿操作( j j j 从 k k k 开始, k k k 是最后一个成功的操作序号) |
COMPENSATION_SUCCESS |
所有补偿操作执行成功,事务回滚到初始状态 |
COMPENSATION_FAILED |
某个补偿操作执行失败,需要人工介入 |
COMMITTED |
所有操作执行成功,事务提交 |
状态机的转换流程
我们用 mermaid 流程图来描述状态机的转换过程:
在这个流程中:
- 事务从
INIT状态开始,依次执行每个操作; - 如果某个操作成功,进入下一个操作的执行状态;
- 如果所有操作都成功,进入
COMMITTED状态,事务完成; - 如果某个操作失败,从最后一个成功的操作开始,逆序执行补偿操作;
- 如果所有补偿操作都成功,进入
COMPENSATION_SUCCESS状态,事务回滚完成; - 如果某个补偿操作失败,进入
COMPENSATION_FAILED状态,需要人工介入。
状态持久化
为了支持 Agent 崩溃后的恢复,我们需要将状态机的当前状态、每个操作的执行结果、补偿操作的参数等信息持久化到存储中。常用的存储方案有:
- Redis:适合存储临时状态,支持快速读写;
- 关系型数据库(如 PostgreSQL):适合存储持久化状态,支持事务和查询;
- 事件日志(如 Kafka):适合存储状态变化历史,支持事件溯源。
我们将在“系统核心实现”章节详细讲解状态持久化的代码实现。
核心机制2:补偿机制(Saga 模式)——异构操作的“撤销”方案
由于 AI Agent 调用的工具通常不支持传统的 ROLLBACK 接口,我们需要采用补偿机制(也称为 Saga 模式)来实现操作的撤销。
Saga 模式的起源与核心思想
Saga 模式最早由 Hector Garcia-Molina 和 Kenneth Salem 在 1987 年的论文《SAGAS》中提出,最初用于解决长事务(Long-Lived Transaction)的原子性问题。其核心思想是:
将一个长事务拆分成多个短事务(称为“Saga 步骤”),每个 Saga 步骤都有一个对应的补偿步骤。如果所有 Saga 步骤都成功执行,则事务提交;如果某个 Saga 步骤失败,则逆序执行所有已成功步骤的补偿步骤,将系统回滚到初始状态。
Saga 模式与传统事务的区别
Saga 模式与传统数据库事务的最大区别在于:
- 隔离性:传统事务通过锁或 MVCC 保证隔离性,而 Saga 模式不保证隔离性(Saga 步骤的中间状态可能被其他事务看到),这符合 BASE 理论中的“Basically Available(基本可用)”和“Soft State(软状态)”;
- 撤销方式:传统事务通过 undo 日志撤销操作,而 Saga 模式通过显式定义的补偿步骤撤销操作;
- 适用场景:传统事务适合短事务、单系统场景,而 Saga 模式适合长事务、跨系统场景。
对于 AI Agent 来说,Saga 模式的“无隔离性”通常是可以接受的——因为 Agent 的操作通常是用户发起的单用户事务,并发冲突的概率较低;即使发生并发冲突,也可以通过“乐观锁”或“事后修复”来解决。
补偿操作的设计原则
设计补偿操作是 Saga 模式的核心,也是最难的部分。一个好的补偿操作应该遵循以下原则:
- 幂等性:补偿操作可以被多次执行,而不会产生额外的副作用(例如:“退款100元”的补偿操作如果执行两次,不应该退款200元);
- 语义正确性:补偿操作应该能够正确撤销原操作的语义(例如:原操作是“扣款100元”,补偿操作应该是“退款100元”,而不是“加款100元”——如果原操作是因为“购买商品”扣款,“退款”比“加款”更符合语义);
- 可靠性:补偿操作应该比原操作更可靠(例如:可以将补偿操作写入消息队列,保证即使 Agent 崩溃,补偿操作也能被执行);
- 可逆性的妥协:有些操作是不可逆的(例如:已经发送给用户的消息),这时可以采用“逻辑补偿”(例如:发送一条“刚才的消息有误,请忽略”的消息)代替“物理补偿”。
常见操作的补偿操作示例
我们可以将 AI Agent 调用的操作分为几类,并给出对应的补偿操作示例:
| 操作类型 | 原操作示例 | 补偿操作示例 | 注意事项 |
|---|---|---|---|
| 数据库操作 | INSERT INTO orders (...) |
DELETE FROM orders WHERE id=? |
要保证 DELETE 的幂等性(可以先查询是否存在再删除) |
| 支付操作 | payment.charge(amount=100) |
payment.refund(transaction_id=?) |
要保存 transaction_id,退款金额要与原扣款金额一致 |
| Git 操作 | git commit -m "fix bug" |
git reset --soft HEAD~1 |
如果已经推送到远程仓库,需要用 git push --force,但要注意协作冲突 |
| 文件操作 | write_file("a.txt", content) |
delete_file("a.txt") 或 restore_file("a.txt", backup) |
最好在写入前备份原文件,补偿时恢复备份 |
| 消息发送操作 | send_message(user_id, msg) |
send_message(user_id, "刚才的消息有误,请忽略") |
逻辑补偿,无法物理撤销,需要用户配合 |
Saga 模式的两种实现方式
Saga 模式有两种常见的实现方式:协同式(Choreography) 和 编排式(Orchestration)。
1. 协同式 Saga
在协同式 Saga 中,没有一个中央协调者,每个 Saga 步骤在执行完成后,通过事件通知下一个步骤执行;如果某个步骤失败,通过事件通知前面的步骤执行补偿。
协同式 Saga 的优点是去中心化,缺点是流程不清晰,调试困难,不适合 AI Agent 的动态流程。
2. 编排式 Saga
在编排式 Saga 中,有一个中央协调者(即我们的 Agent Harness),由协调者负责调用 Saga 步骤、跟踪执行状态、触发补偿操作。
编排式 Saga 的优点是流程清晰、易于调试、支持动态流程,非常适合 AI Agent 场景——我们的状态机就是协调者的核心。
核心机制3:事件溯源——回放历史,恢复状态
事件溯源(Event Sourcing)是一种以“事件”为核心的状态存储方式,其核心思想是:
不存储系统的当前状态,而是存储所有导致状态变化的事件。当需要恢复当前状态时,只需要按顺序回放所有事件即可。
为什么用事件溯源?
事件溯源非常适合与 Saga 模式、状态机结合使用,原因如下:
- 完整的历史记录:可以回放事务的所有操作历史,便于调试和审计;
- 状态恢复:如果状态存储损坏,可以通过回放事件重新生成状态;
- 时间旅行:可以查询任意时间点的系统状态;
- 与补偿机制结合:如果需要回滚事务,可以逆序回放“补偿事件”。
事件的设计
针对 AI Agent 的事务,我们可以设计以下事件类型:
| 事件类型 | 事件描述 | 事件属性示例 |
|---|---|---|
TransactionCreated |
事务创建 | transaction_id, created_at, initial_state |
OperationStarted |
操作开始执行 | transaction_id, operation_id, operation_name, started_at |
OperationSucceeded |
操作执行成功 | transaction_id, operation_id, result, succeeded_at |
OperationFailed |
操作执行失败 | transaction_id, operation_id, error, failed_at |
CompensationStarted |
补偿操作开始执行 | transaction_id, compensation_id, compensation_name, started_at |
CompensationSucceeded |
补偿操作执行成功 | transaction_id, compensation_id, result, succeeded_at |
CompensationFailed |
补偿操作执行失败 | transaction_id, compensation_id, error, failed_at |
TransactionCommitted |
事务提交 | transaction_id, committed_at, final_state |
TransactionRolledBack |
事务回滚 | transaction_id, rolled_back_at, initial_state |
每个事件都应该是不可变的(Immutable)——一旦写入事件日志,就不能被修改或删除。如果需要撤销某个事件的影响,可以写入一个“补偿事件”。
事件日志的实现
事件日志可以用以下系统实现:
- Kafka:适合高吞吐量的事件写入,支持事件回放;
- EventStoreDB:专门为事件溯源设计的数据库;
- PostgreSQL:可以用表存储事件,支持事务和查询。
核心机制4:幂等设计——避免重复操作的副作用
在 AI Agent 的事务处理中,由于网络超时、Agent 崩溃等原因,同一个操作或补偿操作可能会被执行多次。为了避免重复操作产生副作用,我们需要保证所有操作和补偿操作都是幂等的。
幂等性的定义
幂等性(Idempotency)是指一个操作可以被多次执行,而每次执行的结果都与第一次执行的结果相同。用数学语言描述就是:
f ( f ( x ) ) = f ( x ) f(f(x)) = f(x) f(f(x))=f(x)
其中 f f f 是操作函数, x x x 是输入参数。
幂等设计的常见方法
我们可以用以下方法实现操作的幂等性:
- 唯一标识符(Idempotency Key):为每个操作分配一个唯一的 Idempotency Key,在执行操作前先检查是否已经用这个 Key 执行过操作,如果是,则直接返回上次的结果;
- 乐观锁:在数据库操作中使用版本号(Version)或时间戳(Timestamp),只有当版本号匹配时才执行操作;
- 查询前置:在执行操作前先查询当前状态,只有当状态符合预期时才执行操作(例如:在执行“退款”操作前,先查询订单是否已经扣款,且未退款);
- 去重表:在数据库中创建一个去重表,记录已经执行过的操作,执行操作前先查去重表。
Idempotency Key 的实现示例
Idempotency Key 是最常用的幂等设计方法,我们可以用以下流程实现:
- Agent Harness 在执行操作前,生成一个唯一的 Idempotency Key(可以用 UUID);
- 将 Idempotency Key 和操作参数一起传给外部工具/API;
- 外部工具/API 在执行操作前,先检查是否已经用这个 Key 执行过操作;
- 如果是,直接返回上次的结果;
- 如果否,执行操作,并将结果和 Key 一起存储;
- 返回结果给 Agent Harness。
如果外部工具/API 不支持 Idempotency Key,我们可以在 Agent Harness 内部实现去重逻辑:将 Idempotency Key 和操作结果存储在 Redis 或数据库中,执行操作前先查存储。
核心机制5:快照隔离——减少并发冲突的影响
虽然 AI Agent 的操作通常是单用户事务,但仍可能出现并发冲突(例如:两个 Agent 同时修改同一个订单的状态)。为了减少并发冲突的影响,我们可以采用快照隔离(Snapshot Isolation)。
快照隔离的定义
快照隔离是一种数据库隔离级别,其核心思想是:
每个事务在执行时,都看到的是数据库在事务开始时的一个“快照”。事务的修改只有在提交时,才会检查是否与其他事务的修改冲突;如果没有冲突,则提交;否则,回滚。
快照隔离可以避免“脏读”、“不可重复读”、“幻读”等问题,同时性能比“串行化”隔离级别高。
在 AI Agent 中应用快照隔离
我们可以在以下场景应用快照隔离:
- 数据库操作:将数据库的隔离级别设置为“快照隔离”(PostgreSQL 中用
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ,MySQL 中用SET TRANSACTION ISOLATION LEVEL READ COMMITTED加 MVCC); - 外部系统状态读取:在执行事务前,先读取所有需要用到的外部系统状态,保存为快照,后续操作都基于这个快照进行;
- 乐观锁:在快照中保存状态的版本号,提交事务时检查版本号是否与当前外部系统的版本号一致;如果一致,则提交;否则,回滚。
数学模型:形式化描述原子性
为了更严谨地理解 AI Agent 事务的原子性,我们可以用数学模型进行形式化描述。本节将介绍状态转换模型、失败模型和原子性保证的正确性条件。
状态转换模型
我们首先定义系统的状态、操作、事务等基本概念:
1. 系统状态
设 S S S 是系统的所有可能状态构成的集合,称为状态空间。每个状态 s ∈ S s \in S s∈S 包含了系统中所有需要关注的信息,例如:
- 账户余额: s . b a l a n c e [ u s e r i d ] s.balance[user_id] s.balance[userid];
- 订单状态: s . o r d e r s t a t u s [ o r d e r i d ] s.order_status[order_id] s.orderstatus[orderid];
- 文件内容: s . f i l e c o n t e n t [ f i l e p a t h ] s.file_content[file_path] s.filecontent[filepath];
- Agent 事务状态: s . t r a n s a c t i o n s t a t e [ t r a n s a c t i o n i d ] s.transaction_state[transaction_id] s.transactionstate[transactionid]。
2. 操作
一个操作 o p op op 是一个部分函数(Partial Function):
o p : S ⇀ S × R e s u l t op: S \rightharpoonup S \times Result op:S⇀S×Result
其中:
- 定义域 dom ( o p ) ⊆ S \text{dom}(op) \subseteq S dom(op)⊆S 是操作可以执行的状态集合;
- 对于 s ∈ dom ( o p ) s \in \text{dom}(op) s∈dom(op), o p ( s ) = ( s ′ , r ) op(s) = (s', r) op(s)=(s′,r) 表示操作在状态 s s s 下执行成功,系统状态转换到 s ′ s' s′,执行结果为 r r r;
- 对于 s ∉ dom ( o p ) s \notin \text{dom}(op) s∈/dom(op),操作执行失败,系统状态保持不变,返回错误信息。
我们用 o p ( s ) ↓ op(s) \downarrow op(s)↓ 表示操作在状态 s s s 下执行成功, o p ( s ) ↑ op(s) \uparrow op(s)↑ 表示执行失败。
3. 补偿操作
对于每个操作 o p op op,我们定义一个对应的补偿操作 o p − 1 op^{-1} op−1,它也是一个部分函数:
o p − 1 : S ⇀ S × R e s u l t op^{-1}: S \rightharpoonup S \times Result op−1:S⇀S×Result
补偿操作的作用是撤销原操作的影响。理想情况下,补偿操作应该满足:
∀ s ∈ dom ( o p ) , 若 o p ( s ) = ( s ′ , r ) , 则 s ′ ∈ dom ( o p − 1 ) 且 o p − 1 ( s ′ ) = ( s , r − 1 ) \forall s \in \text{dom}(op), \text{若 } op(s) = (s', r), \text{则 } s' \in \text{dom}(op^{-1}) \text{ 且 } op^{-1}(s') = (s, r^{-1}) ∀s∈dom(op),若 op(s)=(s′,r),则 s′∈dom(op−1) 且 op−1(s′)=(s,r−1)
其中 r − 1 r^{-1} r−1 是补偿操作的结果。
但在实际场景中,这个理想条件很难完全满足(例如:已经发送给用户的消息无法物理撤销),这时我们可以用“逻辑补偿”来近似满足。
4. 事务
一个事务 T T T 是一个有序的操作序列:
T = [ o p 1 , o p 2 , . . . , o p n ] T = [op_1, op_2, ..., op_n] T=[op1,op2,...,opn]
其中每个 o p i op_i opi 都有对应的补偿操作 o p i − 1 op_i^{-1} opi−1。
我们用 T k T_k Tk 表示事务的前 k k k 个操作: T k = [ o p 1 , . . . , o p k ] T_k = [op_1, ..., op_k] Tk=[op1,...,opk]。
事务的执行语义
我们用状态序列来描述事务的执行过程。设事务的初始状态为 s 0 ∈ S s_0 \in S s0∈S,事务的执行可以分为两种情况:成功执行和失败回滚。
1. 成功执行
如果事务的所有操作都执行成功,则存在状态序列:
s 0 → o p 1 s 1 → o p 2 s 2 → . . . → o p n s n s_0 \xrightarrow{op_1} s_1 \xrightarrow{op_2} s_2 \xrightarrow{...} \xrightarrow{op_n} s_n s0op1s1op2s2...opnsn
其中:
- s i − 1 ∈ dom ( o p i ) s_{i-1} \in \text{dom}(op_i) si−1∈dom(opi),即第 i i i 个操作可以在状态 s i − 1 s_{i-1} si−1 下执行;
- o p i ( s i − 1 ) = ( s i , r i ) op_i(s_{i-1}) = (s_i, r_i) opi(si−1)=(si,ri),即第 i i i 个操作执行成功,状态转换到 s i s_i si,结果为 r i r_i ri。
此时,我们称事务 T T T 在初始状态 s 0 s_0 s0 下提交,最终状态为 s n s_n sn。
2. 失败回滚
如果事务的第 k k k 个操作执行失败( k ≤ n k \leq n k≤n),则前 k − 1 k-1 k−1 个操作执行成功,第 k k k 个操作执行失败:
s 0 → o p 1 s 1 → . . . → o p k − 1 s k − 1 → o p k ↑ s_0 \xrightarrow{op_1} s_1 \xrightarrow{...} \xrightarrow{op_{k-1}} s_{k-1} \xrightarrow{op_k} \uparrow s0op1s1...opk−1sk−1opk↑
此时,我们需要逆序执行前 k − 1 k-1 k−1 个操作的补偿操作:
s k − 1 → o p k − 1 − 1 s k − 2 ′ → . . . → o p 1 − 1 s 0 ′ s_{k-1} \xrightarrow{op_{k-1}^{-1}} s_{k-2}' \xrightarrow{...} \xrightarrow{op_1^{-1}} s_0' sk−1opk−1−1sk−2′...op1−1s0′
理想情况下, s 0 ′ = s 0 s_0' = s_0 s0′=s0,即事务回滚到初始状态。
如果所有补偿操作都执行成功,我们称事务 T T T 在初始状态 s 0 s_0 s0 下回滚成功;如果某个补偿操作执行失败,我们称事务 T T T 回滚失败,需要人工介入。
失败模型
在实际场景中,操作可能会因为各种原因失败,我们需要用失败模型来描述这些失败情况。常见的失败模型有:
1. 崩溃失败(Crash Failure)
操作在执行过程中崩溃,系统状态保持不变,操作没有产生任何副作用。这是最简单的失败模型,我们可以通过重新执行操作来解决。
2. 遗漏失败(Omission Failure)
操作执行了,但结果没有返回给调用者(例如:网络超时)。此时,调用者无法确定操作是否成功,需要通过“查询操作状态”或“幂等重试”来解决。
3. 时序失败(Timing Failure)
操作执行了,但返回结果的时间超出了预期。这通常可以通过设置超时时间来解决。
4. 拜占庭失败(Byzantine Failure)
操作执行了,但返回了错误的结果,或者产生了预期之外的副作用。这是最复杂的失败模型,在 AI Agent 场景中,LLM 的推理错误可能导致拜占庭失败。我们可以通过“多 Agent 投票”、“结果验证”来解决。
原子性保证的正确性条件
现在,我们可以用形式化语言描述 AI Agent 事务原子性保证的正确性条件:
设 T = [ o p 1 , . . . , o p n ] T = [op_1, ..., op_n] T=[op1,...,opn] 是一个事务, s 0 s_0 s0 是初始状态, A \mathcal{A} A 是我们的原子性保证机制。正确性条件包括以下三个部分:
1. 终止性(Termination)
机制 A \mathcal{A} A 必须保证事务最终会结束,即:要么提交,要么回滚成功,要么回滚失败(人工介入)。不会出现“事务一直挂起”的情况。
2. 原子性(Atomicity)
机制 A \mathcal{A} A 必须保证:
- 如果事务提交,则所有操作都执行成功,系统状态转换到 s n s_n sn;
- 如果事务回滚成功,则系统状态回到 s 0 s_0 s0(或逻辑上等价的状态);
- 不会出现“部分操作执行成功,部分操作执行失败,且没有回滚”的情况。
3. 可靠性(Reliability)
机制 A \mathcal{A} A 必须保证:即使在崩溃失败、遗漏失败等情况下,正确性条件仍然成立。
算法流程图与源代码实现
在了解核心机制和数学模型后,我们可以设计并实现一个简单的 AI Agent Harness,用状态机和 Saga 模式保证操作的原子性。本节将首先介绍算法流程图,然后给出 Python 源代码实现。
算法流程图
我们将实现一个编排式 Saga 协调器,其核心算法流程分为两个部分:事务执行流程和事务回滚流程。
1. 事务执行流程
事务执行流程的输入是“操作列表”和“补偿操作列表”,输出是“事务提交”或“触发回滚”。我们用 mermaid 流程图描述:
2. 事务回滚流程
事务回滚流程的输入是“事务ID”,输出是“回滚成功”或“回滚失败”。我们用 mermaid 流程图描述:
Python 源代码实现
现在,我们用 Python 实现上述算法。我们将使用以下依赖库:
uuid:生成唯一的事务ID和幂等键;dataclasses:定义数据类;typing:类型注解;redis:存储状态和幂等键(可选,这里用字典模拟);logging:记录日志。
首先,我们需要安装依赖库(如果用真实的 Redis):
pip install redis
步骤1:定义数据类和枚举
我们首先定义事务状态的枚举、操作基类、补偿操作基类和事务数据类:
import uuid
import logging
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any, Callable
from enum import Enum
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("AgentHarness")
# 事务状态枚举
class TransactionStatus(Enum):
INIT = "INIT"
EXECUTING_OP = "EXECUTING_OP"
OP_SUCCESS = "OP_SUCCESS"
OP_FAILED = "OP_FAILED"
COMPENSATING_OP = "COMPENSATING_OP"
COMPENSATION_SUCCESS = "COMPENSATION_SUCCESS"
COMPENSATION_FAILED = "COMPENSATION_FAILED"
COMMITTED = "COMMITTED"
# 操作结果数据类
@dataclass
class OperationResult:
success: bool
result: Optional[Any] = None
error: Optional[str] = None
# 操作基类
class Operation:
def __init__(self, name: str):
self.name = name
def execute(self, context: Dict[str, Any]) -> OperationResult:
"""执行操作,子类必须实现这个方法"""
raise NotImplementedError("Subclasses must implement execute()")
def get_compensation(self) -> Optional['CompensationOperation']:
"""获取补偿操作,子类可以覆盖这个方法"""
return None
# 补偿操作基类
class CompensationOperation:
def __init__(self, name: str):
self.name = name
def execute(self, context: Dict[str, Any], op_result: OperationResult) -> OperationResult:
"""执行补偿操作,子类必须实现这个方法"""
raise NotImplementedError("Subclasses must implement execute()")
# 事务数据类
@dataclass
class Transaction:
tx_id: str = field(default_factory=lambda: str(uuid.uuid4()))
status: TransactionStatus = TransactionStatus.INIT
current_op_index: int = 0 # 当前执行的操作索引(从0开始)
succeeded_ops: List[int] = field(default_factory=list) # 已成功的操作索引列表
current_compensation_index: int = -1 # 当前执行的补偿操作索引(从succeeded_ops的最后一个开始)
context: Dict[str, Any] = field(default_factory=dict) # 事务上下文,用于存储操作参数和结果
error: Optional[str] = None # 错误信息
operations: List[Operation] = field(default_factory=list) # 操作列表
步骤2:实现状态存储和幂等存储
接下来,我们实现状态存储和幂等存储。这里为了简单,我们用字典模拟 Redis 存储;在实际项目中,可以用真实的 Redis 或数据库。
class StateStore:
"""状态存储类,用于持久化事务状态"""
def __init__(self):
self._store: Dict[str, Transaction] = {}
def save(self, tx: Transaction) -> None:
"""保存事务状态"""
self._store[tx.tx_id] = tx
logger.info(f"Saved transaction {tx.tx_id} with status {tx.status}")
def get(self, tx_id: str) -> Optional[Transaction]:
"""获取事务状态"""
return self._store.get(tx_id)
def delete(self, tx_id: str) -> None:
"""删除事务状态(可选,用于清理)"""
if tx_id in self._store:
del self._store[tx_id]
logger.info(f"Deleted transaction {tx_id}")
class IdempotencyStore:
"""幂等存储类,用于存储操作和补偿操作的执行结果"""
def __init__(self):
self._store: Dict[str, OperationResult] = {}
def save(self, idempotency_key: str, result: OperationResult) -> None:
"""保存幂等键和结果"""
self._store[idempotency_key] = result
logger.info(f"Saved idempotency key {idempotency_key} with result {result}")
def get(self, idempotency_key: str) -> Optional[OperationResult]:
"""获取幂等键对应的结果"""
return self._store.get(idempotency_key)
步骤3:实现 Saga 协调器
现在,我们实现核心的 Saga 协调器,它包含“执行事务”和“回滚事务”两个方法。
class SagaCoordinator:
"""Saga 协调器,负责执行事务和回滚事务"""
def __init__(self, state_store: StateStore, idempotency_store: IdempotencyStore):
self.state_store = state_store
self.idempotency_store = idempotency_store
def _generate_idempot
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)