Harness Engineering:分布式事务管控
Harness Engineering:分布式事务管控
关键词: 分布式系统、事务管理、一致性模型、CAP理论、两阶段提交、三阶段提交、Saga模式、TCC模式
摘要
在现代分布式系统架构中,事务管控是确保数据一致性和系统可靠性的核心挑战之一。随着微服务架构的普及和系统规模的不断扩大,传统的单机事务模型已无法满足复杂业务场景的需求。本文将深入探讨分布式事务管控的理论基础、实现机制和实际应用,从第一性原理出发,系统分析各类分布式事务解决方案的优劣,并提供生产环境中的最佳实践指南。
1. 概念基础
1.1 领域背景化
在计算机科学的发展历程中,事务管理始终是数据处理系统的核心组成部分。从早期的关系型数据库管理系统(RDBMS)到如今的分布式系统架构,事务概念不断演化,但其核心目标始终如一:确保数据操作的可靠性和一致性。
随着互联网应用的爆发式增长,系统面临的并发量和数据量呈指数级上升。单体应用架构逐渐暴露出扩展性瓶颈,微服务架构应运而生。在微服务架构中,一个业务流程往往需要跨多个服务、多个数据库进行操作,这就对事务管理提出了全新的挑战。
1.2 历史轨迹
事务管理的发展历程可以分为三个主要阶段:
-
单机事务阶段(1970s-1990s):
- 1970年代,IBM研究人员在System R项目中提出了事务的ACID特性
- 1980年代,关系型数据库开始广泛支持事务机制
- 锁机制、日志技术和恢复算法逐渐成熟
-
分布式事务萌芽阶段(1990s-2010s):
- 1990年代,X/Open组织提出了XA规范,定义了分布式事务的标准接口
- 两阶段提交(2PC)协议成为分布式事务的经典解决方案
- CORBA、DCOM等中间件技术开始提供分布式事务支持
-
云原生分布式事务阶段(2010s至今):
- 微服务架构普及,柔性事务概念兴起
- Saga模式、TCC模式等最终一致性方案广泛应用
- 各大云厂商推出分布式事务服务(如AWS X-Ray、阿里云GTS)
- 服务网格技术(如Istio)开始集成分布式事务能力
1.3 问题空间定义
在分布式系统中,事务管控面临着一系列独特的挑战,这些挑战构成了我们需要解决的问题空间:
-
网络不确定性:
- 网络延迟、分区和消息丢失是分布式系统的常态
- 无法通过简单的超时机制区分"服务处理慢"和"服务已故障"
-
节点故障:
- 分布式系统中节点故障是不可避免的
- 需要处理部分节点失败而其他节点正常的情况
-
数据一致性与可用性的权衡:
- CAP理论揭示了在分区容错的前提下,一致性和可用性无法同时满足
- 需要根据业务场景在两者之间做出合理权衡
-
性能开销:
- 传统的强一致性分布式事务协议往往带来显著的性能开销
- 在高并发场景下,需要在一致性保证和系统性能之间找到平衡点
-
异构系统集成:
- 现代分布式系统往往包含多种类型的数据存储(关系型数据库、NoSQL数据库、消息队列等)
- 需要在这些异构系统之间协调事务操作
1.4 术语精确性
在深入探讨分布式事务管控之前,我们需要明确一些核心术语的定义:
-
事务(Transaction):
- 一个逻辑工作单元,由一组操作组成,这些操作要么全部成功执行,要么全部不执行
- 通常具有ACID特性(原子性、一致性、隔离性、持久性)
-
分布式事务(Distributed Transaction):
- 涉及多个独立节点或资源的事务
- 需要在分布式环境下保证事务的ACID特性(或其变体)
-
资源管理器(Resource Manager, RM):
- 管理事务性资源的系统组件(如数据库、消息队列)
- 负责资源的锁定、提交和回滚
-
事务管理器(Transaction Manager, TM):
- 协调分布式事务执行的核心组件
- 负责事务的启动、提交、回滚和故障恢复
-
一致性模型(Consistency Model):
- 定义了系统中数据一致性的保证级别
- 包括强一致性、最终一致性等多种模型
-
柔性事务(Flexible Transaction):
- 放宽了传统ACID特性要求的事务模型
- 通常追求最终一致性而非实时一致性
2. 理论框架
2.1 第一性原理分析
要深入理解分布式事务管控,我们需要从最基本的原理出发,构建完整的理论体系。让我们从分布式系统的核心挑战开始:
2.1.1 分布式系统的基本困境
分布式系统的本质是一组通过网络连接的独立计算机,它们在用户看来是一个单一的连贯系统。这种设计带来了可扩展性和容错性的优势,但也引入了基本的困境:
- 不确定性:在分布式系统中,我们无法确定网络延迟、节点故障或消息丢失的发生时间和方式
- 部分失败:系统可能出现部分组件失败而其他组件正常工作的情况
- 并发控制:多个节点同时访问共享资源时需要协调,以避免数据不一致
这些困境是分布式事务管控需要解决的根本问题,也是所有分布式事务理论和协议的出发点。
2.1.2 事务的本质:状态机视角
从状态机的角度来看,事务可以被理解为将系统从一个一致状态转换到另一个一致状态的操作序列。这个视角帮助我们更清晰地理解事务的核心要求:
- 原子性:状态转换要么完全发生,要么完全不发生
- 一致性:转换前后系统都处于有效状态
- 隔离性:并发事务的执行不会相互干扰
- 持久性:一旦状态转换完成,其效果是永久的
在分布式环境中,状态被分散在多个节点上,这使得状态转换的协调变得异常复杂。我们需要设计机制来确保即使在部分节点失败的情况下,整个系统的状态转换仍然能够满足事务的基本要求。
2.2 数学形式化
为了更精确地描述分布式事务,我们引入一些数学形式化表达。
2.2.1 分布式系统模型
我们首先定义一个分布式系统的数学模型:
D=(N,C,T,S)\mathcal{D} = (\mathcal{N}, \mathcal{C}, \mathcal{T}, \mathcal{S})D=(N,C,T,S)
其中:
- N={n1,n2,…,nk}\mathcal{N} = \{n_1, n_2, \ldots, n_k\}N={n1,n2,…,nk} 是系统中的节点集合
- C⊆N×N\mathcal{C} \subseteq \mathcal{N} \times \mathcal{N}C⊆N×N 是节点间的通信通道集合
- T\mathcal{T}T 是全局时间域,我们假设存在一个全局时钟(尽管在实际系统中这很难实现)
- S=∏n∈NSn\mathcal{S} = \prod_{n \in \mathcal{N}} \mathcal{S}_nS=∏n∈NSn 是全局状态空间,其中 Sn\mathcal{S}_nSn 是节点 nnn 的局部状态空间
节点 nnn 的状态转换函数定义为:
δn:Sn×M→Sn×M∗\delta_n: \mathcal{S}_n \times \mathcal{M} \rightarrow \mathcal{S}_n \times \mathcal{M}^*δn:Sn×M→Sn×M∗
其中 M\mathcal{M}M 是消息集合,M∗\mathcal{M}^*M∗ 是有限消息序列集合。这个函数表示节点在某一状态下接收一条消息后,会转换到新的状态并可能发送一些消息。
2.2.2 事务的形式化定义
在上述分布式系统模型基础上,我们可以形式化定义分布式事务:
X=(O,≺,R,C)\mathcal{X} = (\mathcal{O}, \prec, \mathcal{R}, \mathcal{C})X=(O,≺,R,C)
其中:
- O={o1,o2,…,om}\mathcal{O} = \{o_1, o_2, \ldots, o_m\}O={o1,o2,…,om} 是事务操作集合,每个操作 oio_ioi 在特定节点上执行
- ≺⊆O×O\prec \subseteq \mathcal{O} \times \mathcal{O}≺⊆O×O 是操作间的偏序关系,表示操作的执行顺序
- R\mathcal{R}R 是事务涉及的资源集合
- C:S→{true,false}\mathcal{C}: \mathcal{S} \rightarrow \{\text{true}, \text{false}\}C:S→{true,false} 是一致性谓词,定义了系统的有效状态
事务的执行是一个从初始状态 s0s_0s0 到最终状态 sfs_fsf 的状态转换序列,满足:
- 所有操作按照偏序关系 ≺\prec≺ 执行
- 如果事务成功提交,则 C(sf)=true\mathcal{C}(s_f) = \text{true}C(sf)=true
- 如果事务中止,则系统状态等价于 s0s_0s0
2.2.3 一致性的层级模型
在分布式系统中,一致性是一个多层面的概念,我们可以从不同角度对其进行分类和建模。以下是一个一致性层级模型:
-
线性一致性(Linearizability):
最强的一致性模型,要求每个操作看起来是原子执行的,并且操作的全局顺序与实时顺序一致。形式化定义如下:对于任意历史记录 HHH,存在一个合法的顺序历史记录 SSS,满足:
- SSS 是 HHH 的一个线性化(linearization)
- 对于任意两个操作 o1o_1o1 和 o2o_2o2,如果 o1o_1o1 在 HHH 中实时先于 o2o_2o2,则在 SSS 中 o1o_1o1 也先于 o2o_2o2
-
顺序一致性(Sequential Consistency):
要求所有进程看到的操作顺序是一致的,但不需要与实时顺序一致:对于任意历史记录 HHH,存在一个合法的顺序历史记录 SSS,满足:
- SSS 是 HHH 的一个串行化
- 对于每个进程 ppp,SSS 中 ppp 的操作顺序与 HHH 中 ppp 的操作顺序一致
-
因果一致性(Causal Consistency):
只要求有因果关系的操作顺序在所有进程看来是一致的:我们定义操作间的因果关系 →\rightarrow→ 为满足以下条件的最小关系:
- 如果 o1o_1o1 和 o2o_2o2 是同一进程的操作,且 o1o_1o1 在 o2o_2o2 之前执行,则 o1→o2o_1 \rightarrow o_2o1→o2
- 如果 o1o_1o1 是发送消息的操作,o2o_2o2 是接收该消息的操作,则 o1→o2o_1 \rightarrow o_2o1→o2
- 如果 o1→o2o_1 \rightarrow o_2o1→o2 且 o2→o3o_2 \rightarrow o_3o2→o3,则 o1→o3o_1 \rightarrow o_3o1→o3
因果一致性要求对于任意两个有因果关系的操作 o1→o2o_1 \rightarrow o_2o1→o2,所有进程都看到 o1o_1o1 在 o2o_2o2 之前执行。
-
最终一致性(Eventual Consistency):
最弱的一致性模型之一,只保证如果没有新的更新操作,最终所有访问都会返回最后更新的值:设 UUU 是系统中的更新操作集合,tut_utu 是更新操作 uuu 的执行时间,TTT 是一个全局时间点。最终一致性保证存在一个时间点 TfT_fTf,对于所有 T>TfT > T_fT>Tf 和任意读操作 rrr 在时间 TTT 执行,rrr 返回的值反映了所有满足 tu<Tft_u < T_ftu<Tf 的更新操作 uuu 的效果。
这些一致性模型构成了一个从强到弱的层级,每一层都为系统设计者提供了不同的权衡点。在实际应用中,选择合适的一致性模型是分布式事务管控的关键决策之一。
2.3 理论局限性
没有任何一种分布式事务解决方案能够完美解决所有问题,理解这些理论局限性对于做出合理的技术选型至关重要。
2.3.1 CAP定理
CAP定理是分布式系统领域最基本的理论之一,由Eric Brewer在2000年的PODC会议上提出,后来由Gilbert和Lynch在2002年给出了形式化证明。CAP定理指出,在一个异步网络模型中,一个分布式系统无法同时满足以下三个特性:
- 一致性(Consistency):所有节点在同一时刻看到的数据是相同的
- 可用性(Availability):每个请求都能收到一个非错误的响应
- 分区容错性(Partition Tolerance):系统在任意网络分区的情况下仍能继续运行
CAP定理的形式化证明基于以下假设:
- 异步网络模型:没有全局时钟,节点只能通过消息传递进行通信
- 消息可能丢失、延迟或乱序
- 节点可能在任意时间失败
在实际系统中,网络分区是不可避免的,因此我们通常需要在一致性和可用性之间做出权衡。这一定理直接影响了分布式事务解决方案的设计空间:强一致性的事务方案往往会牺牲部分可用性,而高可用性的方案则通常只能提供最终一致性保证。
2.3.2 FLP不可能性结果
FLP不可能性结果是分布式系统领域的另一个基本理论,由Fischer、Lynch和Paterson在1985年提出。该结果指出,在一个异步网络模型中,即使只有一个节点可能崩溃,也不存在一个确定性算法能够解决一致性问题。
FLP结果的核心证明思路是构造一个"二价"(bivalent)初始配置,即从该配置出发,系统可能达成两个不同的决策值。然后证明存在无限长的执行路径,系统永远无法达成一致决策。
这一结果对分布式事务管控有重要影响:它告诉我们,在实际系统中,我们必须依赖一些额外的假设(如超时机制、故障检测器等)来解决一致性问题,而这些假设在某些情况下可能不成立。因此,任何分布式事务解决方案都无法在所有情况下保证正确性,我们需要仔细设计故障检测和恢复机制。
2.3.3 拜占庭将军问题
拜占庭将军问题是由Lamport、Shostak和Pease在1982年提出的一个思想实验,用于描述分布式系统中的故障节点可能发送错误或恶意信息的情况。问题描述如下:
几个师的拜占庭军队包围了一座城市,每个师由一个将军指挥。将军们只能通过信使进行通信。他们需要达成一个统一的作战计划(进攻或撤退)。然而,可能有一些将军是叛徒,他们会试图阻止忠诚的将军们达成一致。
问题的目标是设计一个算法,使得:
- 所有忠诚的将军达成相同的决策
- 决策必须基于忠诚将军的合理判断
Lamport等人证明,在存在 mmm 个叛徒的情况下,至少需要 3m+13m+13m+1 个将军才能解决拜占庭将军问题。他们还提出了几种解决算法,包括口头消息算法和书面消息算法。
在分布式事务管控的场景中,拜占庭故障模型通常过于严格,我们通常假设故障节点要么停止工作(崩溃故障模型),要么可能发送任意错误信息(遗漏-执行故障模型)。但在一些对安全性要求极高的场景(如区块链、金融系统),拜占庭容错技术仍然具有重要应用价值。
2.4 竞争范式分析
在分布式事务管控领域,存在多种竞争的解决方案范式,每种范式都有其优缺点和适用场景。让我们对这些范式进行系统分析。
2.4.1 强一致性范式
强一致性范式追求在任何时刻所有节点上的数据都是一致的,通常通过严格的同步机制来实现。主要代表包括:
-
两阶段提交(2PC):
- 优点:概念简单,能保证强一致性
- 缺点:同步阻塞、单点故障、数据不一致风险
-
三阶段提交(3PC):
- 优点:解决了2PC的单点故障问题,减少了阻塞时间
- 缺点:仍然可能阻塞,实现复杂,不能完全解决数据不一致问题
-
Paxos/Raft算法:
- 优点:能在异步网络中保证一致性,具有较好的容错性
- 缺点:理解和实现复杂,在有大量冲突的场景下性能下降
强一致性范式适用于对数据一致性要求极高的场景,如金融交易、核心库存管理等。
2.4.2 最终一致性范式
最终一致性范式放宽了对实时一致性的要求,只保证在没有新更新的情况下,系统最终会达到一致状态。主要代表包括:
-
Saga模式:
- 优点:无锁、高可用、高吞吐
- 缺点:实现复杂,需要处理补偿事务,隔离性较弱
-
TCC模式:
- 优点:性能较好,能够保证数据最终一致性
- 缺点:侵入性强,开发成本高,需要处理各种异常情况
-
本地消息表:
- 优点:实现简单,可靠性高
- 缺点:与业务耦合,需要额外的消息处理逻辑
最终一致性范式适用于对可用性和性能要求较高,但对实时一致性要求不那么严格的场景,如电商下单、社交网络更新等。
2.4.3 混合范式
混合范式试图结合强一致性和最终一致性的优点,根据不同的业务需求选择合适的一致性模型。主要代表包括:
-
Percolator模型:
- Google提出的分布式事务解决方案,基于两阶段锁和多版本并发控制
- 优点:提供了类似单机数据库的事务语义,同时保持了良好的扩展性
- 缺点:实现复杂,依赖特定的底层存储系统
-
Spanner的TrueTime API:
- Google Spanner使用原子钟和GPS接收器提供全局一致的时间API
- 优点:能够在全球范围内提供强一致性和外部一致性
- 缺点:依赖特殊硬件,实现和部署成本高
混合范式通常适用于对一致性和性能都有较高要求的大规模系统,但实现和运维成本也相应较高。
3. 架构设计
3.1 系统分解
分布式事务管控系统可以从多个维度进行分解,每种分解方式都反映了系统的不同视角。
3.1.1 功能维度分解
从功能角度看,一个完整的分布式事务管控系统通常包含以下核心组件:
-
事务发起者(Transaction Originator):
- 负责启动分布式事务的组件
- 定义事务边界和事务操作序列
- 处理事务的最终结果(提交或回滚)
-
事务参与者(Transaction Participant):
- 执行事务中具体业务逻辑的组件
- 通常与特定的资源管理器相关联
- 实现事务的分支操作(提交、回滚等)
-
事务管理器(Transaction Manager):
- 分布式事务的核心协调组件
- 负责事务的生命周期管理(启动、提交、回滚)
- 处理事务协调过程中的各种异常情况
-
事务日志(Transaction Log):
- 持久化存储事务状态信息的组件
- 用于故障恢复和事务状态追踪
- 通常实现为Write-Ahead Logging(WAL)机制
-
资源管理器(Resource Manager):
- 管理事务性资源的组件(如数据库、消息队列)
- 提供资源的锁定、提交和回滚能力
- 通常实现XA接口或类似的事务接口
3.1.2 部署维度分解
从部署角度看,分布式事务管控系统可以有多种部署架构:
-
集中式架构:
- 事务管理器部署在单一节点上
- 优点:实现简单,协调效率高
- 缺点:单点故障风险,可扩展性受限
-
分布式架构:
- 事务管理器分布在多个节点上
- 优点:高可用,可扩展性好
- 缺点:实现复杂,协调成本高
-
嵌入式架构:
- 事务管控功能嵌入到应用程序或服务框架中
- 优点:部署简单,性能开销小
- 缺点:与应用耦合,难以统一管理
3.2 组件交互模型
不同的分布式事务解决方案有不同的组件交互模型,下面我们介绍几种常见的交互模型。
3.2.1 两阶段提交(2PC)交互模型
两阶段提交的交互过程分为两个阶段:
-
准备阶段:
- 事务管理器向所有资源管理器发送准备请求
- 每个资源管理器执行事务操作,但不提交
- 资源管理器将执行结果(同意或中止)返回给事务管理器
-
提交阶段:
- 如果所有资源管理器都同意提交,事务管理器发送提交请求
- 否则,事务管理器发送回滚请求
- 资源管理器执行相应操作并返回结果
3.2.2 Saga模式交互模型
Saga模式的交互过程基于一系列本地事务和补偿事务:
-
正常执行流程:
- Saga协调器按顺序调用各个服务的本地事务
- 每个本地事务执行完成后,协调器调用下一个服务
- 所有本地事务执行成功后,Saga事务完成
-
异常处理流程:
- 如果某个本地事务执行失败,协调器按相反顺序调用已执行事务的补偿事务
- 补偿事务执行完成后,Saga事务中止
3.2.3 TCC模式交互模型
TCC模式的交互过程分为三个阶段:
-
Try阶段:
- 协调器调用所有服务的Try接口
- Try接口完成业务检查和资源预留
- 所有Try接口执行成功后,进入Confirm阶段
-
Confirm阶段:
- 协调器调用所有服务的Confirm接口
- Confirm接口执行实际的业务操作
- 所有Confirm接口执行成功后,TCC事务完成
-
Cancel阶段(异常情况):
- 如果任何Try接口执行失败,协调器调用所有服务的Cancel接口
- Cancel接口释放预留的资源
- 所有Cancel接口执行成功后,TCC事务中止
3.3 设计模式应用
在分布式事务管控系统的设计中,我们可以应用多种经典设计模式来提高系统的可扩展性、可维护性和可靠性。
3.3.1 命令模式(Command Pattern)
命令模式将请求封装为对象,从而允许我们使用不同的请求对客户端进行参数化。在分布式事务管控中,我们可以使用命令模式来封装事务操作:
命令模式的应用使得我们可以轻松实现事务的回滚和重试机制,同时也便于记录事务操作日志。
3.3.2 状态模式(State Pattern)
状态模式允许对象在内部状态改变时改变其行为。在分布式事务管控中,事务有多个状态(如活动、准备、提交、回滚等),我们可以使用状态模式来管理事务状态转换:
状态模式的应用使得事务状态转换逻辑清晰可见,便于实现和维护复杂的状态转换规则。
3.3.3 观察者模式(Observer Pattern)
观察者模式定义了对象之间的一对多依赖关系,当一个对象状态改变时,所有依赖它的对象都会得到通知并自动更新。在分布式事务管控中,我们可以使用观察者模式来实现事务状态变化的通知机制:
观察者模式的应用使得我们可以轻松添加新的事务状态监听器,而无需修改事务核心逻辑。
4. 实现机制
4.1 算法复杂度分析
在评估分布式事务解决方案时,算法复杂度是一个重要的考虑因素。我们将从时间复杂度、消息复杂度和空间复杂度三个维度分析常见分布式事务算法。
4.1.1 两阶段提交(2PC)复杂度分析
-
时间复杂度:
- 准备阶段:O(1)O(1)O(1) 轮消息交换(假设网络延迟为主要时间因素)
- 提交阶段:O(1)O(1)O(1) 轮消息交换
- 总时间复杂度:O(1)O(1)O(1) 轮消息交换
-
消息复杂度:
- 准备阶段:事务管理器向 nnn 个资源管理器发送 nnn 条准备消息,收到 nnn 条回复
- 提交阶段:事务管理器向 nnn 个资源管理器发送 nnn 条提交/回滚消息,收到 nnn 条回复
- 总消息复杂度:O(n)O(n)O(n)
-
空间复杂度:
- 事务管理器需要存储 O(n)O(n)O(n) 个资源管理器的状态信息
- 每个资源管理器需要存储事务状态信息
- 总空间复杂度:O(n)O(n)O(n)
4.1.2 Paxos算法复杂度分析
Paxos算法是一种经典的分布式一致性算法,我们分析其基本版本的复杂度:
-
时间复杂度:
- 最好情况:O(1)O(1)O(1) 轮消息交换(如果提案者能够直接获得多数接受者的支持)
- 最坏情况:无界(由于活锁问题)
- 实际情况:通常在 O(1)O(1)O(1) 到 O(logn)O(\log n)O(logn) 轮之间
-
消息复杂度:
- 准备阶段:提案者向 nnn 个接受者发送准备消息,收到 nnn 条回复
- 接受阶段:提案者向 nnn 个接受者发送接受消息,收到 nnn 条回复
- 总消息复杂度:O(n)O(n)O(n)(单次成功的提案)
-
空间复杂度:
- 每个接受者需要存储已接受的提案信息
- 总空间复杂度:O(n)O(n)O(n)
4.1.3 Saga模式复杂度分析
Saga模式是一种基于补偿事务的最终一致性方案:
-
时间复杂度:
- 正常情况:O(n)O(n)O(n) 轮消息交换(串行执行 nnn 个本地事务)
- 异常情况:O(n)O(n)O(n) 轮消息交换(执行 kkk 个本地事务和 kkk 个补偿事务,k≤nk \leq nk≤n)
- 如果可以并行执行部分本地事务,时间复杂度可以优化到 O(1)O(1)O(1) 或 O(logn)O(\log n)O(logn)
-
消息复杂度:
- 正常情况:O(n)O(n)O(n) 条消息
- 异常情况:O(n)O(n)O(n) 条消息
- 总消息复杂度:O(n)O(n)O(n)
-
空间复杂度:
- Saga协调器需要存储每个本地事务的执行状态和补偿信息
- 总空间复杂度:O(n)O(n)O(n)
4.2 优化代码实现
为了更具体地理解分布式事务的实现机制,我们将提供几个核心组件的Python实现代码。
4.2.1 两阶段提交(2PC)实现
import threading
import time
from enum import Enum
from typing import List, Dict, Any, Optional
class TransactionState(Enum):
"""事务状态枚举"""
ACTIVE = "active"
PREPARING = "preparing"
PREPARED = "prepared"
COMMITTING = "committing"
COMMITTED = "committed"
ABORTING = "aborting"
ABORTED = "aborted"
class Vote(Enum):
"""资源管理器投票枚举"""
YES = "yes"
NO = "no"
class ResourceManager:
"""资源管理器模拟实现"""
def __init__(self, name: str):
self.name = name
self.data: Dict[str, Any] = {}
self.prepared_data: Dict[str, Any] = {}
self.lock = threading.Lock()
def prepare(self, transaction_id: str, operations: List[Dict[str, Any]]) -> Vote:
"""
准备阶段:执行事务操作但不提交
返回YES表示可以提交,NO表示需要中止
"""
with self.lock:
print(f"[{self.name}] 准备事务 {transaction_id}")
try:
# 保存当前状态用于回滚
self.prepared_data = self.data.copy()
# 模拟执行操作
for op in operations:
if op["type"] == "set":
self.data[op["key"]] = op["value"]
elif op["type"] == "delete":
if op["key"] in self.data:
del self.data[op["key"]]
print(f"[{self.name}] 事务 {transaction_id} 准备成功")
return Vote.YES
except Exception as e:
print(f"[{self.name}] 事务 {transaction_id} 准备失败: {e}")
# 恢复到准备前的状态
self.data = self.prepared_data.copy()
return Vote.NO
def commit(self, transaction_id: str) -> bool:
"""提交阶段:正式提交事务"""
with self.lock:
print(f"[{self.name}] 提交事务 {transaction_id}")
try:
# 清除准备数据,表示事务已提交
self.prepared_data.clear()
print(f"[{self.name}] 事务 {transaction_id} 提交成功")
return True
except Exception as e:
print(f"[{self.name}] 事务 {transaction_id} 提交失败: {e}")
return False
def abort(self, transaction_id: str) -> bool:
"""回滚阶段:回滚事务"""
with self.lock:
print(f"[{self.name}] 回滚事务 {transaction_id}")
try:
# 恢复到准备前的状态
self.data = self.prepared_data.copy()
self.prepared_data.clear()
print(f"[{self.name}] 事务 {transaction_id} 回滚成功")
return True
except Exception as e:
print(f"[{self.name}] 事务 {transaction_id} 回滚失败: {e}")
return False
class TransactionManager:
"""事务管理器实现"""
def __init__(self):
self.resource_managers: List[ResourceManager] = []
self.transaction_states: Dict[str, TransactionState] = {}
self.transaction_operations: Dict[str, Dict[str, List[Dict[str, Any]]]] = {}
self.lock = threading.Lock()
self.transaction_counter = 0
def register_resource_manager(self, rm: ResourceManager):
"""注册资源管理器"""
with self.lock:
self.resource_managers.append(rm)
print(f"[事务管理器] 注册资源管理器: {rm.name}")
def begin_transaction(self) -> str:
"""开始一个新事务"""
with self.lock:
self.transaction_counter += 1
transaction_id = f"txn_{self.transaction_counter}"
self.transaction_states[transaction_id] = TransactionState.ACTIVE
self.transaction_operations[transaction_id] = {}
print(f"[事务管理器] 开始事务: {transaction_id}")
return transaction_id
def add_operations(self, transaction_id: str, rm_name: str, operations: List[Dict[str, Any]]):
"""向事务添加操作"""
with self.lock:
if transaction_id not in self.transaction_states:
raise ValueError(f"事务 {transaction_id} 不存在")
if self.transaction_states[transaction_id] != TransactionState.ACTIVE:
raise ValueError(f"事务 {transaction_id} 已不是活动状态")
self.transaction_operations[transaction_id][rm_name] = operations
print(f"[事务管理器] 为事务 {transaction_id} 添加操作到 {rm_name}")
def commit_transaction(self, transaction_id: str) -> bool:
"""提交事务"""
with self.lock:
if transaction_id not in self.transaction_states:
raise ValueError(f"事务 {transaction_id} 不存在")
if self.transaction_states[transaction_id] != TransactionState.ACTIVE:
raise ValueError(f"事务 {transaction_id} 已不是活动状态")
# 第一阶段:准备
self.transaction_states[transaction_id] = TransactionState.PREPARING
print(f"[事务管理器] 开始第一阶段:准备事务 {transaction_id}")
votes: Dict[str, Vote] = {}
for rm in self.resource_managers:
if rm.name in self.transaction_operations[transaction_id]:
operations = self.transaction_operations[transaction_id][rm.name]
vote = rm.prepare(transaction_id, operations)
votes[rm.name] = vote
# 检查所有投票
all_yes = all(vote == Vote.YES for vote in votes.values())
if all_yes:
# 第二阶段:提交
self.transaction_states[transaction_id] = TransactionState.COMMITTING
print(f"[事务管理器] 开始第二阶段:提交事务 {transaction_id}")
all_committed = True
for rm in self.resource_managers:
if rm.name in self.transaction_operations[transaction_id]:
committed = rm.commit(transaction_id)
if not committed:
all_committed = False
if all_committed:
self.transaction_states[transaction_id] = TransactionState.COMMITTED
print(f"[事务管理器] 事务 {transaction_id} 提交成功")
return True
else:
# 这里简化处理,实际系统中需要更复杂的恢复逻辑
self.transaction_states[transaction_id] = TransactionState.COMMITTED
print(f"[事务管理器] 事务 {transaction_id} 部分提交成功")
return True
else:
# 第二阶段:回滚
self.transaction_states[transaction_id] = TransactionState.ABORTING
print(f"[事务管理器] 开始第二阶段:回滚事务 {transaction_id}")
for rm in self.resource_managers:
if rm.name in self.transaction_operations[transaction_id]:
rm.abort(transaction_id)
self.transaction_states[transaction_id] = TransactionState.ABORTED
print(f"[事务管理器] 事务 {transaction_id} 回滚成功")
return False
def demo_two_phase_commit():
"""演示两阶段提交的使用"""
# 创建资源管理器
rm1 = ResourceManager("数据库A")
rm2 = ResourceManager("数据库B")
# 创建事务管理器
tm = TransactionManager()
tm.register_resource_manager(rm1)
tm.register_resource_manager(rm2)
# 示例1:成功提交的事务
print("\n===== 示例1:成功提交的事务 =====")
txn1 = tm.begin_transaction()
tm.add_operations(txn1, "数据库A", [
{"type": "set", "key": "user_id", "value": 123},
{"type": "set", "key": "balance", "value": 1000}
])
tm.add_operations(txn1, "数据库B", [
{"type": "set", "key": "order_id", "value": "ORD-001"},
{"type": "set", "key": "user_id", "value": 123}
])
result1 = tm.commit_transaction(txn1)
print(f"事务1提交结果: {result1}")
print(f"数据库A状态: {rm1.data}")
print(f"数据库B状态: {rm2.data}")
# 示例2:模拟失败事务(为了演示,我们暂时修改rm2的prepare方法)
print("\n===== 示例2:模拟失败事务 =====")
original_prepare = rm2.prepare
def failing_prepare(transaction_id, operations):
print(f"[{rm2.name}] 模拟准备失败")
return Vote.NO
rm2.prepare = failing_prepare
txn2 = tm.begin_transaction()
tm.add_operations(txn2, "数据库A", [
{"type": "set", "key": "balance", "value": 500}
])
tm.add_operations(txn2, "数据库B", [
{"type": "set", "key": "order_id", "value": "ORD-002"}
])
result2 = tm.commit_transaction(txn2)
print(f"事务2提交结果: {result2}")
print(f"数据库A状态: {rm1.data}") # 应该保持不变
print(f"数据库B状态: {rm2.data}") # 应该保持不变
# 恢复原始方法
rm2.prepare = original_prepare
if __name__ == "__main__":
demo_two_phase_commit()
4.2.2 Saga模式实现
import threading
import time
from enum import Enum
from typing import List, Dict, Any, Callable, Optional
class SagaState(Enum):
"""Saga事务状态枚举"""
STARTED = "started"
RUNNING = "running"
COMPENSATING = "compensating"
COMPLETED = "completed"
FAILED = "failed"
class SagaStep:
"""Saga步骤定义"""
def __init__(self, name: str,
action: Callable[[], Any],
compensation: Optional[Callable[[], Any]] = None):
self.name = name
self.action = action
self.compensation = compensation
self.executed = False
self.compensated = False
class SagaTransaction:
"""Saga事务实现"""
def __init__(self, name: str):
self.name = name
self.steps: List[SagaStep] = []
self.state = SagaState.STARTED
self.lock = threading.Lock()
self.compensation_error: Optional[Exception] = None
def add_step(self, step: SagaStep):
"""添加Saga步骤"""
with self.lock:
if self.state != SagaState.STARTED:
raise ValueError("只能在事务开始时添加步骤")
self.steps.append(step)
def execute(self) -> bool:
"""执行Saga事务"""
with self.lock:
if self.state != SagaState.STARTED:
raise ValueError("事务已执行或正在执行")
self.state = SagaState.RUNNING
print(f"[Saga] 开始执行事务: {self.name}")
try:
for i, step in enumerate(self.steps):
print(f"[Saga] 执行步骤 {i+1}/{len(self.steps)}: {step.name}")
try:
step.action()
step.executed = True
print(f"[Saga] 步骤 {step.name} 执行成功")
except Exception as e:
print(f"[Saga] 步骤 {step.name} 执行失败: {e}")
self._compensate(i)
return False
with self.lock:
self.state = SagaState.COMPLETED
print(f"[Saga] 事务 {self.name} 执行成功")
return True
except Exception as e:
print(f"[Saga] 事务执行过程中发生未知错误: {e}")
# 找到最后一个已执行的步骤
last_executed = -1
for i, step in enumerate(self.steps):
if step.executed:
last_executed = i
if last_executed >= 0:
self._compensate(last_executed)
return False
def _compensate(self, from_step: int):
"""执行补偿操作"""
with self.lock:
self.state = SagaState.COMPENS
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)