🏆本文收录于《滚雪球学SpringBoot 3.x》,专门攻坚指数提升,本年度国内最系统+最专业+最详细(永久更新)。
  
该专栏致力打造最硬核 SpringBoot3 从零基础到进阶系列学习内容,🚀均为全网独家首发,打造精品专栏,专栏持续更新中…欢迎大家订阅持续学习。 如果想快速定位学习,可以看这篇【SpringBoot3教程导航帖】,你想学习的都被收集在内,快速投入学习!!两不误。
  
若还想学习更多,可直接订阅 《Spring Boot实战合集》,一次订阅,持续学习,后续更新内容无需重复付费,适合长期收藏与系统进阶。

演示环境说明:

  • 开发工具:IDEA 2021.3
  • JDK版本: JDK 17(推荐使用 JDK 17 或更高版本,因为 Spring Boot 3.x 系列要求 Java 17,Spring Boot 3.5.4 基于 Spring Framework 6.x 和 Jakarta EE 9,它们都要求至少 JDK 17。)
  • Spring Boot版本:3.5.4(于25年7月24日发布)
  • Maven版本:3.8.2 (或更高)
  • Gradle:(如果使用 Gradle 构建工具的话):推荐使用 Gradle 7.5 或更高版本,确保与 JDK 17 兼容。
  • 操作系统:Windows 11

全文目录:

开篇:为什么“下单成功了,但消息没发出去”会让人睡不着觉?

在单体应用里,很多事情看起来理所当然:

  • 用户提交一个订单;
  • 数据库写入成功;
  • 发送一条“订单已创建”的消息;
  • 下游服务收到消息,扣减库存、发放积分、发送短信。

如果每一步都成功,系统就像一条顺滑的流水线。但只要其中任意一步失败,问题就来了:

  • 数据库已经提交,消息却丢了;
  • 消息已经发出,数据库却回滚了;
  • 消息重复投递了,下游重复扣减库存;
  • 补偿任务跑了两次,把订单取消了两次;
  • 订单状态变成“已支付”,库存却没有减,财务和业务对不上。

这些问题的根源只有一个:本地事务和消息发送,不天然具备原子性
而“消息驱动一致性”的目标,就是在不使用昂贵且脆弱的强一致分布式事务前提下,让系统最终达到一致,并且把失败窗口控制在可接受范围内。

这篇文章会从零开始,把下面四件事讲透:

  1. 本地事务与消息发送为什么会失去原子性;
  2. Outbox / 事务消息为什么能解决大部分工程问题;
  3. 补偿任务应该怎么设计才不乱;
  4. 如何防止重复补偿、重复消费、重复发送。

你会看到完整的概念图、状态图、序列图,以及一套能够直接落地的 Spring Boot 3.x 示例代码。

一、先把业务场景说清楚:一致性问题到底发生在哪里?

我们先设定一个非常典型的场景:电商下单。

当用户点击“提交订单”后,订单服务需要完成这些动作:

  • 写入订单主表;
  • 写入一条“订单已创建”的事件;
  • 通知库存服务预占库存;
  • 通知积分服务冻结积分;
  • 如果后续任一步骤失败,系统需要做出补救。

从业务上看,这些动作是一个整体;但从技术上看,它们分散在多个边界里:

  • 订单库:本地数据库事务;
  • 消息中间件:异步消息;
  • 库存服务:独立消费;
  • 补偿逻辑:可能由定时任务触发,也可能由人工触发。

只要把这四个边界拆开,就会发现一个本质问题:

数据库事务只能保护数据库本身,不能自动把消息中间件、远程服务调用、定时任务一起包进来。

所以,真正困难的不是“如何写订单”,而是:

  • 订单写成功后,如何确保消息一定被发送出去;
  • 消息发送成功后,如何确保不会因为程序崩溃而丢失;
  • 下游消费成功后,如何确保不会重复执行;
  • 发现异常后,如何安全地补偿,而不是越补越乱。

二、先看最容易犯错的写法:把数据库和消息硬塞进一个方法

很多初学者第一次做消息驱动时,会写出这样的代码:

@Transactional
public void createOrder(CreateOrderRequest request) {
    // 1. 保存订单
    orderRepository.save(order);

    // 2. 发送消息
    rabbitTemplate.convertAndSend("order.exchange", "order.created", payload);

    // 3. 继续做其他业务
}

从表面上看,这段代码“很完整”。

但它有至少四个致命窗口:

1)数据库提交成功,消息发送失败

订单已经入库,但消息中间件抖了一下,网络超时,程序抛异常。
如果你把异常向上抛,事务回滚,订单也没了;
如果你捕获异常继续返回,订单在,消息没了。
两种结果都不对。

2)消息发送成功,数据库却回滚

这会更糟。
消息已经被下游看见,下游可能已经开始扣库存、发积分、发短信;
但订单主库因为后续代码报错而回滚,最终业务状态和消息状态完全脱节。

3)程序在“提交数据库”和“发消息”之间崩溃

这是一种非常常见的宕机窗口:

  • 数据库已经提交;
  • 消息还没来得及发;
  • JVM 直接挂了。

等系统恢复后,没有任何机制知道“那条消息本来应该发”。

4)消息发送成功,但消费端失败或重复消费

消息中间件通常只保证“至少一次”或“至多一次”的某些语义,但几乎不会替你保证“刚好一次”。
换句话说,下游服务必须自己具备幂等能力,否则重复消息会把库存、积分、余额全部搞乱。

所以,真正靠谱的方案,不是把所有事情硬塞进一个大事务,而是:

  • 在本地事务里先把“业务事实”和“待发送消息”一起落库;
  • 再由独立的投递器把消息搬运出去;
  • 下游消费端做幂等;
  • 必要时再加补偿任务兜底。

这就是 Outbox 模式的核心思想。

三、先认清几个概念:事务消息、Outbox、补偿模型分别是什么?

很多文章喜欢把这些词混在一起讲,结果读完更糊涂。我们把它们拆开。

1)事务消息

事务消息通常指消息中间件提供的一种能力:

  • 先发半消息或预消息;
  • 执行本地事务;
  • 如果本地事务成功,再提交消息;
  • 如果本地事务失败,回滚消息。

典型代表是某些 MQ 的事务消息能力。它的优点是:

  • 接入后看起来“很像原子操作”;
  • 业务代码相对简洁;
  • 中间件会帮你做一些事务回查。

它的问题也很明显:

  • 强依赖具体 MQ 能力;
  • 事务回查逻辑复杂;
  • 运维和排障门槛更高;
  • 一旦业务过程复杂,事务边界会很难维护。

2)Outbox 模式

Outbox 的核心不是“让消息和数据库强绑定”,而是:

把“消息记录”也当成业务数据,和业务事实一起写进同一个本地数据库事务。

也就是说:

  • 订单写进订单表;
  • 事件写进 outbox 表;
  • 两者同一事务提交;
  • 后台投递器轮询 outbox 表,把事件投递到 MQ 或事件总线。

这样,数据库能保证“订单和事件一起成功或者一起失败”。
至于消息真正发出去,是后续异步步骤,失败了可以重试。

Outbox 的优点是:

  • 与 MQ 厂商解耦;
  • 比事务消息更容易落地和调试;
  • 非常适合 Spring Boot 3.x 这种业务分层清晰的应用;
  • 只要数据库可靠,消息就不会凭空丢失。

3)补偿模型

补偿不是回滚。这个点非常重要。

回滚发生在“还没提交”的时候;
补偿发生在“已经提交,但后续发现不一致”的时候。

补偿模型的本质是:

  • 识别出某个业务对象处于异常状态;
  • 执行一个反向或修复动作;
  • 让系统最终回到一个可接受的状态。

比如:

  • 库存预占失败,取消订单;
  • 订单超时未支付,释放库存;
  • 远程服务连续失败,触发重试或人工处理;
  • 已经发出去的消息重复了,利用幂等表跳过。

补偿模型是 Outbox 的天然搭档。
Outbox 负责把“事情一定被记录下来”,补偿负责把“异常事情尽量修回来”。

四、Spring Boot 3.x 里做这件事,最适合怎样的设计?

Spring Boot 3.x 的优势在于:

  • Java 17 作为基础版本,recordsealedswitch 语法更顺手;
  • jakarta.* 统一了新一代规范包名;
  • Spring Data JPA、Validation、TaskScheduler、EventPublisher 都很成熟;
  • 你可以把复杂一致性问题拆成几个很小的组件,而不是堆在一个超大 Service 里。

本文推荐的落地架构如下:

示意图绘制如下,仅供参考:

这个图里最核心的思想只有一句:

业务写库和事件写库同事务,事件投递和业务处理异步化,失败后再通过重试与补偿兜底。

你会发现,这种设计非常符合 Spring Boot 3.x 的“组件化、声明式、职责单一”风格。

五、Outbox 的状态机应该怎么设计,才能不把自己绕晕?

如果一个事件记录只靠“已发送 / 未发送”两个状态,后面一定会乱。
因为真实世界里,事件至少会经历这些阶段:

  • 刚写入,等待投递;
  • 正在投递,但尚未确认;
  • 投递成功;
  • 投递失败,准备重试;
  • 重试多次仍失败,进入死信或人工处理;
  • 有些事件本身就是补偿事件,也要被再次投递。

所以,状态机最好清晰一些:

示意图绘制如下,仅供参考:

这里每个状态的含义如下:

  • NEW:新建,尚未开始投递;
  • PROCESSING:已经被某个投递线程“认领”,正在发送;
  • SENT:已经发送成功;
  • RETRY:发送失败,等待下一次重试;
  • DEAD:重试次数耗尽,进入人工介入或告警流程。

为什么要有 PROCESSING
因为只要有多线程、多实例并发投递,就必须避免两台机器同时投递同一条消息。
PROCESSING 配合版本号或状态 CAS,可以很好地解决这个问题。

为什么要有 DEAD
因为系统里总会有少量“不可修复失败”,比如 payload 格式错误、下游参数缺失、数据已经物理删除。
这种时候不能无限重试,否则只会把数据库和日志打爆。

六、先把运行环境搭起来:Spring Boot 3.x 示例项目的基础配置

为了让你可以直接运行,本文示例使用:

  • Spring Boot 3.x;
  • Java 17;
  • Spring Web;
  • Spring Data JPA;
  • Validation;
  • H2 数据库;
  • Spring Event 作为本地消息总线;
  • Lombok 简化实体与服务代码。

你只需要知道:

  • 真实项目里可以把 H2 换成 MySQL 或 PostgreSQL;
  • Spring Event 可以换成 RabbitMQ、Kafka、RocketMQ;
  • 业务代码主体不需要大改,主要改消息适配层。

下面是示例配置:

spring:
  datasource:
    url: jdbc:h2:mem:demo;MODE=MySQL;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false
    driver-class-name: org.h2.Driver
    username: sa
    password:
  jpa:
    hibernate:
      ddl-auto: update
    open-in-view: false
    properties:
      hibernate:
        format_sql: true
        show_sql: true
  h2:
    console:
      enabled: true
logging:
  level:
    org.hibernate.SQL: debug
    org.hibernate.orm.jdbc.bind: trace
app:
  outbox:
    batch-size: 20
    retry-delay-seconds: 30
  compensation:
    timeout-minutes: 10

这一份配置值得注意的地方

  • ddl-auto: update 适合演示,生产环境一般建议使用显式建表脚本;
  • open-in-view: false 是一个好习惯,避免把 JPA Session 一直拖到 Web 层;
  • MODE=MySQL 可以让 H2 在演示中尽量接近 MySQL 的语法习惯;
  • batch-sizeretry-delay-seconds 是投递器最重要的两个参数之一;
  • timeout-minutes 是补偿任务判断“超时未完成”的关键阈值。

七、业务模型:订单、Outbox、消费幂等、补偿日志

在真正动手之前,先把表设计清楚。
这里最重要的不是“多建几张表”,而是“每张表只承担一个职责”。

1)订单表 t_order

订单表记录业务事实。
它不应该兼顾“消息投递状态”,因为消息状态变化很频繁,生命周期也不同。

2)Outbox 表 t_outbox_event

Outbox 表记录“待投递事件”以及投递过程中的状态。
它的职责是把消息变成数据库中的可追踪记录。

3)消费幂等表 t_consume_log

消费幂等表记录“这个消息我已经处理过了”。
一旦消息重复投递,下游可以通过唯一键快速跳过。

4)补偿日志表 t_compensation_log

补偿日志表记录“某个业务对象的某种补偿动作已经执行过”。
它是防止重复补偿的第一道防线。

下面给出一份直观的建表思路。你可以把它理解成文章版的 DDL 说明。

CREATE TABLE t_order (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    order_no VARCHAR(64) NOT NULL UNIQUE,
    user_id VARCHAR(64) NOT NULL,
    amount DECIMAL(18,2) NOT NULL,
    status VARCHAR(32) NOT NULL,
    version BIGINT NOT NULL DEFAULT 0,
    created_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP NOT NULL
);

CREATE TABLE t_outbox_event (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    event_id VARCHAR(64) NOT NULL UNIQUE,
    biz_type VARCHAR(64) NOT NULL,
    biz_key VARCHAR(64) NOT NULL,
    event_type VARCHAR(64) NOT NULL,
    payload CLOB NOT NULL,
    status VARCHAR(32) NOT NULL,
    attempt_count INT NOT NULL DEFAULT 0,
    next_retry_at TIMESTAMP NOT NULL,
    last_error VARCHAR(1000),
    trace_id VARCHAR(64),
    version BIGINT NOT NULL DEFAULT 0,
    created_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP NOT NULL,
    UNIQUE (biz_type, biz_key, event_type)
);

CREATE TABLE t_consume_log (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    event_id VARCHAR(64) NOT NULL UNIQUE,
    order_no VARCHAR(64) NOT NULL,
    consume_status VARCHAR(32) NOT NULL,
    created_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP NOT NULL
);

CREATE TABLE t_compensation_log (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    biz_type VARCHAR(64) NOT NULL,
    biz_key VARCHAR(64) NOT NULL,
    action_type VARCHAR(64) NOT NULL,
    compensate_status VARCHAR(32) NOT NULL,
    created_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP NOT NULL,
    UNIQUE (biz_type, biz_key, action_type)
);

这组表设计为什么合理

  • t_order 只记录订单本身,不混杂消息状态;
  • t_outbox_event 只记录待投递事件和投递过程;
  • t_consume_log 只负责消费幂等;
  • t_compensation_log 只负责补偿防重。

这样拆开之后,系统会非常清晰:

  • 订单成功不代表消息成功;
  • 消息成功不代表消费成功;
  • 消费成功不代表补偿永远不会发生;
  • 每一步都要有自己的可追踪、可重试、可防重机制。

八、核心代码一:枚举、请求对象、事件载荷

先把一些基础类型定义好。
这里我们大量使用 Spring Boot 3.x 和 Java 17 的现代写法,比如 record

package com.example.demo.domain;

import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Positive;

import java.math.BigDecimal;
import java.time.LocalDateTime;

public record CreateOrderRequest(
        @NotBlank(message = "用户ID不能为空") String userId,
        @NotNull(message = "金额不能为空") @Positive(message = "金额必须大于0") BigDecimal amount
) {
}

public enum OrderStatus {
    CREATED,
    CONFIRMED,
    CANCELLED
}

public enum OutboxStatus {
    NEW,
    PROCESSING,
    RETRY,
    SENT,
    DEAD
}

public enum ConsumeStatus {
    DONE,
    SKIPPED
}

public enum CompensationStatus {
    RUNNING,
    DONE,
    FAILED
}

public record OrderCreatedPayload(
        String eventId,
        String orderNo,
        String userId,
        BigDecimal amount,
        LocalDateTime occurredAt
) {
}

这一段代码的价值

  • CreateOrderRequest 用校验注解把非法参数挡在入口;
  • OrderStatusOutboxStatusCompensationStatus 把状态写死成枚举,避免魔法字符串;
  • OrderCreatedPayload 使用 record,非常适合做消息载荷和事件 DTO。

在 Spring Boot 3.x 里,record 的优势很明显:

  • 不需要手写 getter、setter、equals、hashCode;
  • 数据对象更干净;
  • 作为消息载荷非常合适;
  • 也更方便 JSON 序列化。

九、核心代码二:实体设计——订单、Outbox、消费日志、补偿日志

下面是示例里的核心实体。为了让文章更容易阅读,我把一些重复性的 getter/setter 交给 Lombok 处理。你在实际项目里也可以按自己的风格改成手写版本。

package com.example.demo.entity;

import com.example.demo.domain.*;
import jakarta.persistence.*;
import lombok.Getter;
import lombok.Setter;

import java.math.BigDecimal;
import java.time.LocalDateTime;

@Entity
@Table(name = "t_order")
@Getter
@Setter
public class OrderEntity {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true, length = 64)
    private String orderNo;

    @Column(nullable = false, length = 64)
    private String userId;

    @Column(nullable = false, precision = 18, scale = 2)
    private BigDecimal amount;

    @Enumerated(EnumType.STRING)
    @Column(nullable = false, length = 32)
    private OrderStatus status;

    @Version
    private Long version;

    @Column(nullable = false)
    private LocalDateTime createdAt;

    @Column(nullable = false)
    private LocalDateTime updatedAt;

    @PrePersist
    public void prePersist() {
        LocalDateTime now = LocalDateTime.now();
        if (createdAt == null) {
            createdAt = now;
        }
        updatedAt = now;
        if (status == null) {
            status = OrderStatus.CREATED;
        }
    }

    @PreUpdate
    public void preUpdate() {
        updatedAt = LocalDateTime.now();
    }
}

@Entity
@Table(
        name = "t_outbox_event",
        uniqueConstraints = {
                @UniqueConstraint(name = "uk_outbox_biz", columnNames = {"biz_type", "biz_key", "event_type"}),
                @UniqueConstraint(name = "uk_outbox_event_id", columnNames = {"event_id"})
        }
)
@Getter
@Setter
public class OutboxEventEntity {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(name = "event_id", nullable = false, length = 64)
    private String eventId;

    @Column(name = "biz_type", nullable = false, length = 64)
    private String bizType;

    @Column(name = "biz_key", nullable = false, length = 64)
    private String bizKey;

    @Column(name = "event_type", nullable = false, length = 64)
    private String eventType;

    @Lob
    @Column(nullable = false)
    private String payload;

    @Enumerated(EnumType.STRING)
    @Column(nullable = false, length = 32)
    private OutboxStatus status;

    @Column(nullable = false)
    private Integer attemptCount;

    @Column(nullable = false)
    private LocalDateTime nextRetryAt;

    @Column(length = 1000)
    private String lastError;

    @Column(length = 64)
    private String traceId;

    @Version
    private Long version;

    @Column(nullable = false)
    private LocalDateTime createdAt;

    @Column(nullable = false)
    private LocalDateTime updatedAt;

    @PrePersist
    public void prePersist() {
        LocalDateTime now = LocalDateTime.now();
        if (createdAt == null) {
            createdAt = now;
        }
        updatedAt = now;
        if (status == null) {
            status = OutboxStatus.NEW;
        }
        if (attemptCount == null) {
            attemptCount = 0;
        }
        if (nextRetryAt == null) {
            nextRetryAt = now;
        }
    }

    @PreUpdate
    public void preUpdate() {
        updatedAt = LocalDateTime.now();
    }
}

@Entity
@Table(
        name = "t_consume_log",
        uniqueConstraints = {
                @UniqueConstraint(name = "uk_consume_event_id", columnNames = {"event_id"})
        }
)
@Getter
@Setter
public class ConsumeLogEntity {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(name = "event_id", nullable = false, length = 64)
    private String eventId;

    @Column(name = "order_no", nullable = false, length = 64)
    private String orderNo;

    @Enumerated(EnumType.STRING)
    @Column(name = "consume_status", nullable = false, length = 32)
    private ConsumeStatus consumeStatus;

    @Column(nullable = false)
    private LocalDateTime createdAt;

    @Column(nullable = false)
    private LocalDateTime updatedAt;

    @PrePersist
    public void prePersist() {
        LocalDateTime now = LocalDateTime.now();
        if (createdAt == null) {
            createdAt = now;
        }
        updatedAt = now;
        if (consumeStatus == null) {
            consumeStatus = ConsumeStatus.DONE;
        }
    }

    @PreUpdate
    public void preUpdate() {
        updatedAt = LocalDateTime.now();
    }
}

@Entity
@Table(
        name = "t_compensation_log",
        uniqueConstraints = {
                @UniqueConstraint(name = "uk_comp_biz_action", columnNames = {"biz_type", "biz_key", "action_type"})
        }
)
@Getter
@Setter
public class CompensationLogEntity {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(name = "biz_type", nullable = false, length = 64)
    private String bizType;

    @Column(name = "biz_key", nullable = false, length = 64)
    private String bizKey;

    @Column(name = "action_type", nullable = false, length = 64)
    private String actionType;

    @Enumerated(EnumType.STRING)
    @Column(name = "compensate_status", nullable = false, length = 32)
    private CompensationStatus compensateStatus;

    @Column(nullable = false)
    private LocalDateTime createdAt;

    @Column(nullable = false)
    private LocalDateTime updatedAt;

    @PrePersist
    public void prePersist() {
        LocalDateTime now = LocalDateTime.now();
        if (createdAt == null) {
            createdAt = now;
        }
        updatedAt = now;
        if (compensateStatus == null) {
            compensateStatus = CompensationStatus.RUNNING;
        }
    }

    @PreUpdate
    public void preUpdate() {
        updatedAt = LocalDateTime.now();
    }
}

这一组实体体现了什么设计原则

第一,业务表、事件表、幂等表、补偿表分离
第二,每张表都有明确职责,不会互相污染。
第三,每张表都带状态字段,这样后面可以做重试、审计和人工排障。
第四,统一加 @Version,为并发更新与状态 CAS 留出空间。
第五,所有时间字段都显式保存,便于补偿任务判断超时。

如果你之前一直习惯把状态写在一个 status 字段里随便改,这里是你必须升级认知的地方:

在一致性系统里,状态不是装饰品,状态本身就是控制流程的基础设施。

十、核心代码三:Repository 设计——查询、条件更新、版本控制

实体有了,接下来就是仓储层。
这里最关键的不是“CRUD 能不能跑”,而是“并发条件下是否安全”。

package com.example.demo.repository;

import com.example.demo.domain.*;
import com.example.demo.entity.*;
import org.springframework.data.jpa.repository.*;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;

import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
import java.util.Optional;

@Repository
public interface OrderRepository extends JpaRepository<OrderEntity, Long> {
    Optional<OrderEntity> findByOrderNo(String orderNo);

    List<OrderEntity> findTop50ByStatusAndCreatedAtBeforeOrderByCreatedAtAsc(OrderStatus status,
                                                                             LocalDateTime before);
}

@Repository
public interface OutboxEventRepository extends JpaRepository<OutboxEventEntity, Long> {

    List<OutboxEventEntity> findTop50ByStatusInAndNextRetryAtLessThanEqualOrderByCreatedAtAsc(
            Collection<OutboxStatus> statuses,
            LocalDateTime now
    );

    @Modifying
    @Query("""
        update OutboxEventEntity e
           set e.status = :toStatus,
               e.version = e.version + 1,
               e.updatedAt = :now
         where e.id = :id
           and e.status = :fromStatus
           and e.version = :version
    """)
    int compareAndSetStatus(@Param("id") Long id,
                            @Param("fromStatus") OutboxStatus fromStatus,
                            @Param("toStatus") OutboxStatus toStatus,
                            @Param("version") Long version,
                            @Param("now") LocalDateTime now);

    @Modifying
    @Query("""
        update OutboxEventEntity e
           set e.status = :toStatus,
               e.attemptCount = e.attemptCount + 1,
               e.nextRetryAt = :nextRetryAt,
               e.lastError = :lastError,
               e.version = e.version + 1,
               e.updatedAt = :now
         where e.id = :id
           and e.status = :fromStatus
           and e.version = :version
    """)
    int markRetry(@Param("id") Long id,
                  @Param("fromStatus") OutboxStatus fromStatus,
                  @Param("toStatus") OutboxStatus toStatus,
                  @Param("version") Long version,
                  @Param("nextRetryAt") LocalDateTime nextRetryAt,
                  @Param("lastError") String lastError,
                  @Param("now") LocalDateTime now);

    @Modifying
    @Query("""
        update OutboxEventEntity e
           set e.status = :toStatus,
               e.version = e.version + 1,
               e.updatedAt = :now,
               e.lastError = null
         where e.id = :id
           and e.status = :fromStatus
           and e.version = :version
    """)
    int markSent(@Param("id") Long id,
                 @Param("fromStatus") OutboxStatus fromStatus,
                 @Param("toStatus") OutboxStatus toStatus,
                 @Param("version") Long version,
                 @Param("now") LocalDateTime now);
}

@Repository
public interface ConsumeLogRepository extends JpaRepository<ConsumeLogEntity, Long> {
    boolean existsByEventId(String eventId);
}

@Repository
public interface CompensationLogRepository extends JpaRepository<CompensationLogEntity, Long> {
    boolean existsByBizTypeAndBizKeyAndActionType(String bizType, String bizKey, String actionType);
}

Repository 层最重要的不是“查出来”,而是“改得安全”

这里有两个非常值得记住的点。

1)不要用“先查再改”来处理并发状态

很多人一开始会写成:

  • 先查一条 outbox;
  • 判断 status 是否正确;
  • 再更新 status。

这样在单线程下没问题。
但一旦多实例并发,两台机器可能同时查到同一条记录,随后都去发送消息,重复就发生了。

所以更稳妥的方式是:

  • 先读出来拿到 idversion
  • 再用条件更新做 CAS;
  • 更新成功才说明这条记录真正属于你。
2)existsBy... 只是辅助判断,不能单独作为防重手段

existsBy... 很方便,但它不是原子操作。
多个线程可能同时看到“不存在”,然后同时插入。
真正可靠的防重,必须依赖数据库唯一键或条件更新。

这也是为什么本文在消费日志和补偿日志上都加了唯一约束。

十一、核心代码四:订单接口与订单应用服务

接下来是用户真正会碰到的入口。

package com.example.demo.web;

import com.example.demo.domain.CreateOrderRequest;
import com.example.demo.service.OrderApplicationService;
import jakarta.validation.Valid;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;

import java.util.Map;

@RestController
@RequestMapping("/orders")
@RequiredArgsConstructor
public class OrderController {

    private final OrderApplicationService orderApplicationService;

    @PostMapping
    public Map<String, Object> create(@Valid @RequestBody CreateOrderRequest request) {
        String orderNo = orderApplicationService.placeOrder(request);
        return Map.of(
                "orderNo", orderNo,
                "status", "CREATED",
                "message", "订单已创建,事件已写入 Outbox"
        );
    }
}

订单应用服务是整个流程的起点,它的任务非常明确:

  • 接收合法请求;
  • 生成订单号;
  • 写入订单;
  • 写入 outbox;
  • 提交本地事务;
  • 返回订单号。

代码如下:

package com.example.demo.service;

import com.example.demo.domain.*;
import com.example.demo.entity.*;
import com.example.demo.repository.*;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.UUID;

@Service
@RequiredArgsConstructor
public class OrderApplicationService {

    private final OrderRepository orderRepository;
    private final OutboxEventRepository outboxEventRepository;
    private final ObjectMapper objectMapper;

    @Transactional
    public String placeOrder(CreateOrderRequest request) {
        String orderNo = generateOrderNo();
        String eventId = UUID.randomUUID().toString().replace("-", "");

        // 1. 保存订单
        OrderEntity order = new OrderEntity();
        order.setOrderNo(orderNo);
        order.setUserId(request.userId());
        order.setAmount(request.amount());
        order.setStatus(OrderStatus.CREATED);
        orderRepository.save(order);

        // 2. 构建事件载荷
        OrderCreatedPayload payload = new OrderCreatedPayload(
                eventId,
                orderNo,
                request.userId(),
                request.amount(),
                LocalDateTime.now()
        );

        // 3. 保存 outbox 事件
        OutboxEventEntity outbox = new OutboxEventEntity();
        outbox.setEventId(eventId);
        outbox.setBizType("ORDER");
        outbox.setBizKey(orderNo);
        outbox.setEventType("ORDER_CREATED");
        outbox.setPayload(toJson(payload));
        outbox.setStatus(OutboxStatus.NEW);
        outbox.setAttemptCount(0);
        outbox.setNextRetryAt(LocalDateTime.now());
        outbox.setTraceId(UUID.randomUUID().toString().replace("-", ""));
        outboxEventRepository.save(outbox);

        return orderNo;
    }

    private String generateOrderNo() {
        return "ORD-" + UUID.randomUUID().toString().replace("-", "").substring(0, 18).toUpperCase();
    }

    private String toJson(Object obj) {
        try {
            return objectMapper.writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("事件序列化失败", e);
        }
    }
}

这段代码为什么是 Outbox 的灵魂

你会发现它只做了一件事:在同一个数据库事务里,写订单和写事件。

这就是 Outbox 成功的根本原因。

如果后面的消息投递失败,订单和事件都已经在数据库里,至少不会丢;
如果订单创建失败,Outbox 也不会单独存在;
如果程序崩了,只要数据库提交成功,后台投递器下一次还能继续处理。

这比把“保存订单”和“发消息”绑在一个 try-catch 里靠谱得多。

十二、核心代码五:Outbox 投递器——把事件从数据库搬到消息总线

Outbox 模式不是“把事件存在数据库里就完了”,而是还要有一个投递器。
这个投递器的作用就是:

  • 周期性扫描待发送事件;
  • 认领一条事件;
  • 发送出去;
  • 成功后标记 SENT;
  • 失败后标记 RETRY;
  • 到达阈值后标记 DEAD。

为了避免“一个类里既有事务又有投递又有状态更新”导致 Spring 自调用失效,我们拆成两个 Bean:

  • OutboxStateService 负责状态 CAS;
  • OutboxRelayJob 负责调度和发送。

先看状态服务:

package com.example.demo.service;

import com.example.demo.domain.OutboxStatus;
import com.example.demo.entity.OutboxEventEntity;
import com.example.demo.repository.OutboxEventRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;

@Service
@RequiredArgsConstructor
public class OutboxStateService {

    private final OutboxEventRepository outboxEventRepository;

    @Transactional
    public boolean claim(OutboxEventEntity event) {
        int updated = outboxEventRepository.compareAndSetStatus(
                event.getId(),
                event.getStatus(),
                OutboxStatus.PROCESSING,
                event.getVersion(),
                LocalDateTime.now()
        );
        if (updated == 1) {
            event.setStatus(OutboxStatus.PROCESSING);
            event.setVersion(event.getVersion() + 1);
            return true;
        }
        return false;
    }

    @Transactional
    public void markSent(OutboxEventEntity event) {
        int updated = outboxEventRepository.markSent(
                event.getId(),
                OutboxStatus.PROCESSING,
                OutboxStatus.SENT,
                event.getVersion(),
                LocalDateTime.now()
        );
        if (updated != 1) {
            throw new IllegalStateException("标记发送成功失败,说明事件已被其他线程处理");
        }
        event.setStatus(OutboxStatus.SENT);
        event.setVersion(event.getVersion() + 1);
    }

    @Transactional
    public void markRetry(OutboxEventEntity event, String error, LocalDateTime nextRetryAt) {
        int updated = outboxEventRepository.markRetry(
                event.getId(),
                OutboxStatus.PROCESSING,
                OutboxStatus.RETRY,
                event.getVersion(),
                nextRetryAt,
                truncate(error),
                LocalDateTime.now()
        );
        if (updated != 1) {
            throw new IllegalStateException("标记重试失败,说明事件已被其他线程处理");
        }
        event.setStatus(OutboxStatus.RETRY);
        event.setVersion(event.getVersion() + 1);
    }

    private String truncate(String text) {
        if (text == null) {
            return null;
        }
        return text.length() <= 1000 ? text : text.substring(0, 1000);
    }
}

再看投递器:

package com.example.demo.job;

import com.example.demo.domain.OutboxStatus;
import com.example.demo.entity.OutboxEventEntity;
import com.example.demo.repository.OutboxEventRepository;
import com.example.demo.service.OutboxStateService;
import com.example.demo.service.DomainEventBus;
import com.example.demo.domain.OrderCreatedPayload;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.util.List;

@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxRelayJob {

    private final OutboxEventRepository outboxEventRepository;
    private final OutboxStateService outboxStateService;
    private final DomainEventBus domainEventBus;
    private final ObjectMapper objectMapper;

    @Value("${app.outbox.batch-size:20}")
    private int batchSize;

    @Value("${app.outbox.retry-delay-seconds:30}")
    private long retryDelaySeconds;

    @Scheduled(fixedDelayString = "${app.outbox.poll-interval-ms:3000}")
    public void dispatch() {
        List<OutboxEventEntity> batch = outboxEventRepository
                .findTop50ByStatusInAndNextRetryAtLessThanEqualOrderByCreatedAtAsc(
                        List.of(OutboxStatus.NEW, OutboxStatus.RETRY),
                        LocalDateTime.now()
                );

        int count = Math.min(batchSize, batch.size());
        for (int i = 0; i < count; i++) {
            OutboxEventEntity event = batch.get(i);

            if (!outboxStateService.claim(event)) {
                continue;
            }

            try {
                OrderCreatedPayload payload = objectMapper.readValue(event.getPayload(), OrderCreatedPayload.class);
                // 真正项目里,这里可以替换成 RabbitTemplate / KafkaTemplate / RocketMQTemplate
                domainEventBus.publish(payload);
                outboxStateService.markSent(event);
                log.info("Outbox 事件发送成功:eventId={}, bizKey={}", event.getEventId(), event.getBizKey());
            } catch (Exception ex) {
                LocalDateTime nextRetryAt = LocalDateTime.now().plusSeconds(retryDelaySeconds);
                outboxStateService.markRetry(event, ex.getMessage(), nextRetryAt);
                log.warn("Outbox 事件发送失败,准备重试:eventId={}, err={}", event.getEventId(), ex.getMessage());
            }
        }
    }
}

这段代码真正解决了什么问题

它解决的是:谁来投递、如何避免重复投递、失败后如何重试

1)先认领,再发送

claim() 是关键。只有成功把状态从 NEW / RETRY 原子地改成 PROCESSING,这个投递线程才有资格继续发送。

2)发送成功后再标记 SENT

这样即使程序崩了,至少 PROCESSING 状态还在,后续可以通过定时任务扫描超时状态再修复。

3)发送失败后进入 RETRY

这是一种“有序重试”,比把异常直接吞掉要强得多。

4)不用把投递器写成一个大事务

投递器的本质是“搬运工”,不是“业务处理器”。
它不应该承担过多复杂的业务逻辑,否则维护会非常痛苦。

十三、核心代码六:消息总线与消费端幂等处理

到这里,事件已经从 Outbox 表被搬出来了。
接下来要做的是消费。

本文为了让你直接跑起来,使用 Spring 的 ApplicationEventPublisher 作为本地事件总线。
它的作用是模拟消息总线:

  • 发布端只负责发布;
  • 消费端只负责处理;
  • 代码结构和真实 MQ 很像;
  • 你之后替换成 RabbitMQ / Kafka 时,改动会很小。

先看事件总线适配层:

package com.example.demo.service;

import com.example.demo.domain.OrderCreatedPayload;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;

public interface DomainEventBus {
    void publish(OrderCreatedPayload payload);
}

@Service
@RequiredArgsConstructor
class SpringDomainEventBus implements DomainEventBus {

    private final ApplicationEventPublisher publisher;

    @Override
    public void publish(OrderCreatedPayload payload) {
        publisher.publishEvent(payload);
    }
}

消费端代码如下:

package com.example.demo.consumer;

import com.example.demo.domain.*;
import com.example.demo.entity.*;
import com.example.demo.repository.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Slf4j
@Component
@RequiredArgsConstructor
public class OrderCreatedConsumer {

    private final ConsumeLogRepository consumeLogRepository;
    private final OrderRepository orderRepository;

    @EventListener
    @Transactional
    public void onMessage(OrderCreatedPayload payload) {
        // 1. 先写幂等日志,利用唯一键保证重复消息不重复执行
        ConsumeLogEntity logEntity = new ConsumeLogEntity();
        logEntity.setEventId(payload.eventId());
        logEntity.setOrderNo(payload.orderNo());
        logEntity.setConsumeStatus(ConsumeStatus.DONE);

        try {
            consumeLogRepository.saveAndFlush(logEntity);
        } catch (DataIntegrityViolationException duplicate) {
            log.info("消息重复,直接跳过:eventId={}", payload.eventId());
            return;
        }

        // 2. 执行业务逻辑:这里以“订单确认”为例
        OrderEntity order = orderRepository.findByOrderNo(payload.orderNo())
                .orElseThrow(() -> new IllegalStateException("订单不存在:" + payload.orderNo()));
        if (order.getStatus() == OrderStatus.CONFIRMED) {
            log.info("订单已确认,无需重复处理:orderNo={}", payload.orderNo());
            return;
        }

        order.setStatus(OrderStatus.CONFIRMED);
        orderRepository.save(order);

        log.info("消息消费成功,订单已确认:orderNo={}, eventId={}", payload.orderNo(), payload.eventId());
    }
}

消费幂等为什么一定要做,而且最好让数据库帮你做

消息系统几乎都要面对“重复投递”这个事实。
所以消费端不能假设消息只来一次。

这里的策略很简单,也很有效:

  • 每条消息都带 eventId
  • 消费前先写一条幂等记录;
  • eventId 上加唯一约束;
  • 如果重复,数据库会直接拦住;
  • 重复消息返回成功,避免消息反复重试。

这个设计优于“先查数据库再执行”的原因很简单:

先查后做不是原子操作;唯一约束才是。

这也是消息驱动系统里最值得记住的一条工程经验。

十四、核心代码七:补偿日志与超时订单补偿任务

补偿模型的关键不在“写一个定时任务”,而在于:

  • 什么时候认为需要补偿;
  • 补偿动作是否可重复执行;
  • 补偿成功后如何留下证据;
  • 补偿和原业务是否会互相抢状态。

我们先定义补偿服务:

package com.example.demo.service;

import com.example.demo.domain.*;
import com.example.demo.entity.*;
import com.example.demo.repository.*;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;
import java.util.UUID;

@Service
@RequiredArgsConstructor
public class OrderCompensationService {

    private final OrderRepository orderRepository;
    private final OutboxEventRepository outboxEventRepository;
    private final CompensationLogRepository compensationLogRepository;
    private final ObjectMapper objectMapper;

    @Transactional
    public boolean cancelTimeoutOrder(OrderEntity order) {
        // 1. 先写补偿日志,利用唯一键防止重复补偿
        CompensationLogEntity log = new CompensationLogEntity();
        log.setBizType("ORDER");
        log.setBizKey(order.getOrderNo());
        log.setActionType("TIMEOUT_CANCEL");
        log.setCompensateStatus(CompensationStatus.RUNNING);

        try {
            compensationLogRepository.saveAndFlush(log);
        } catch (DataIntegrityViolationException duplicate) {
            // 已经有线程/任务在做这件事,直接跳过
            return false;
        }

        // 2. 只有还处于 CREATED 状态的订单才允许取消
        OrderEntity latest = orderRepository.findByOrderNo(order.getOrderNo())
                .orElseThrow(() -> new IllegalStateException("订单不存在:" + order.getOrderNo()));

        if (latest.getStatus() != OrderStatus.CREATED) {
            log.setCompensateStatus(CompensationStatus.FAILED);
            return false;
        }

        // 3. 执行补偿动作:取消订单
        latest.setStatus(OrderStatus.CANCELLED);
        orderRepository.save(latest);

        // 4. 同时写入补偿事件,后续也可以继续走 Outbox 投递
        OrderCreatedPayload cancelPayload = new OrderCreatedPayload(
                UUID.randomUUID().toString().replace("-", ""),
                latest.getOrderNo(),
                latest.getUserId(),
                latest.getAmount(),
                LocalDateTime.now()
        );

        OutboxEventEntity outbox = new OutboxEventEntity();
        outbox.setEventId(UUID.randomUUID().toString().replace("-", ""));
        outbox.setBizType("ORDER");
        outbox.setBizKey(latest.getOrderNo());
        outbox.setEventType("ORDER_CANCELLED");
        outbox.setPayload(toJson(cancelPayload));
        outbox.setStatus(OutboxStatus.NEW);
        outbox.setAttemptCount(0);
        outbox.setNextRetryAt(LocalDateTime.now());
        outbox.setTraceId(UUID.randomUUID().toString().replace("-", ""));
        outboxEventRepository.save(outbox);

        log.setCompensateStatus(CompensationStatus.DONE);
        return true;
    }

    private String toJson(Object obj) {
        try {
            return objectMapper.writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("补偿事件序列化失败", e);
        }
    }
}

定时任务如下:

package com.example.demo.job;

import com.example.demo.domain.OrderStatus;
import com.example.demo.entity.OrderEntity;
import com.example.demo.repository.OrderRepository;
import com.example.demo.service.OrderCompensationService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.util.List;

@Slf4j
@Component
@RequiredArgsConstructor
public class OrderCompensationJob {

    private final OrderRepository orderRepository;
    private final OrderCompensationService compensationService;

    @Value("${app.compensation.timeout-minutes:10}")
    private long timeoutMinutes;

    @Scheduled(fixedDelayString = "${app.compensation.poll-interval-ms:10000}")
    public void compensateTimeoutOrders() {
        LocalDateTime cutoff = LocalDateTime.now().minusMinutes(timeoutMinutes);
        List<OrderEntity> list = orderRepository
                .findTop50ByStatusAndCreatedAtBeforeOrderByCreatedAtAsc(OrderStatus.CREATED, cutoff);

        for (OrderEntity order : list) {
            try {
                boolean done = compensationService.cancelTimeoutOrder(order);
                if (done) {
                    log.info("超时订单补偿成功:orderNo={}", order.getOrderNo());
                }
            } catch (Exception ex) {
                log.error("超时订单补偿失败:orderNo={}, err={}", order.getOrderNo(), ex.getMessage(), ex);
            }
        }
    }
}

这一套补偿方案的关键点

1)补偿不是“只删数据”

很多人一听补偿,就想成“把数据回滚掉”。
实际上,大多数业务补偿都不是删除,而是:

  • 改状态;
  • 发反向消息;
  • 释放资源;
  • 记录人工待处理任务;
  • 触发后续修复流程。
2)补偿一定要有唯一标识

这里我们用 bizType + bizKey + actionType 做唯一约束。
这样哪怕定时任务跑两次、两台机器同时跑,也只会有一个线程真正写入补偿日志。

3)补偿动作本身也可能触发消息

例如订单取消后,你可能还要给库存服务发“释放库存”消息。
所以补偿并不是终点,它常常只是另一段一致性链路的起点。

十五、如何防止重复补偿:不要只靠“查一查再执行”?

重复补偿比重复消费更隐蔽,因为它往往不是立刻报错,而是慢慢把业务状态改乱。

比如一个订单超时取消流程可能这样出问题:

  • 任务 A 读到订单 CREATED
  • 任务 B 也读到订单 CREATED
  • A 把订单改成 CANCELLED
  • B 也准备改 CANCELLED
  • 两边都各自发出取消消息;
  • 下游又处理了两次释放库存。

所以,防止重复补偿至少要有四道防线:

第一层:业务状态校验

只有处于某个明确状态的对象才允许补偿。
例如订单只有 CREATED 才能取消,CONFIRMED 就不能取消。

第二层:补偿日志唯一键

bizType + bizKey + actionType 组合唯一。
这能挡住绝大多数“同一补偿动作执行两次”的情况。

第三层:条件更新或版本控制

在必要时,对订单状态使用版本号或状态 CAS。
这能防止并发线程同时把同一条状态改掉。

第四层:补偿幂等

真正稳妥的补偿动作,最好天然可幂等。
例如:

  • 释放库存时根据“已预占数量”而不是直接减库存;
  • 取消订单时只允许从 CREATED 进入 CANCELLED
  • 发退款时用退款单号做唯一键;
  • 发消息时用事件 eventId 做唯一键。

这四层并不是互相替代,而是互相叠加。
工程里最怕的就是“只做一层,就觉得足够了”。

十六、把失败窗口展开看:你会知道为什么要这样设计?

现在我们把整个链路展开,看看每个阶段的失败会发生什么。

场景 A:订单写库成功,Outbox 写库成功,投递失败

结果是:

  • 订单还在;
  • 事件也在;
  • 投递器下一轮重试;
  • 最终消息仍有机会送达。

这是 Outbox 最核心的优势。

场景 B:订单写库成功,Outbox 写库失败

由于订单和 Outbox 在同一个事务里,整个事务会回滚。
最终数据库里既没有订单,也没有事件。
这就避免了“订单有了,事件没了”的不一致。

场景 C:消息已经送出,下游消费失败

由于消费端有幂等日志,下游可以重试;
如果多次失败,就把消息记为失败并上报监控;
如果本质是业务异常,则进入人工排障或补偿链路。

场景 D:补偿任务执行到一半挂了

如果补偿任务有独立补偿日志、唯一键、状态约束,那么下次重试时可以继续恢复,而不会重复执行完全相同的动作。

场景 E:多实例同时投递或补偿

只要你设计了:

  • 条件更新;
  • 唯一约束;
  • 状态 CAS;
  • 幂等判断;

那么并发本身不是问题,重复才是问题,而重复是可以控制的。

十七、很多人容易踩的坑:这些错误最好一开始就避开

坑 1:把 Outbox 当成“消息备份表”

Outbox 不是简单日志,更不是备份。
它是业务流程的一部分。
如果你只是把消息插进去但从不治理状态、重试、死信、补偿,它很快就会变成垃圾表。

坑 2:补偿任务里直接写死复杂业务逻辑

补偿任务应该做调度、识别、触发;
真正的业务动作放到独立的 Service 里。
否则你会得到一个越来越大的 @Scheduled 巨无霸。

坑 3:先查后插,误以为自己做了幂等

这个坑前面已经说过,但它非常重要,值得再强调一次。
并发系统里,先查后插不是防重,只是碰运气

坑 4:消费者只做日志,不做唯一约束

如果没有唯一约束,日志最多只能给你排查线索,不能给你一致性保证。

坑 5:所有错误都重试

不是所有错误都值得重试。
例如:

  • JSON 反序列化失败;
  • 配置缺失;
  • 业务参数非法;
  • 数据已经不存在。

这种问题重试一万次也不会成功,应该尽快进入 DEAD 或人工介入。

坑 6:补偿之后不留下证据

如果补偿只改状态不留日志,后面很难回答:

  • 谁补偿的?
  • 什么时候补偿的?
  • 为什么补偿?
  • 是否已经补偿过?

生产系统里,证据链就是稳定性的一部分

十八、如果把本文示例换成真实 MQ,应该改哪里

你可能会问:既然本文用的是 Spring Event,那和真实 MQ 有什么区别?

答案是:核心模式几乎不变,变的只是传输层。

你需要替换的主要有三块:

1)消息总线实现

当前示例使用的是:

  • ApplicationEventPublisher

真实项目里可以替换成:

  • RabbitTemplate
  • KafkaTemplate
  • RocketMQTemplate

只要你的 DomainEventBus 接口不变,业务层几乎不需要动。

2)消费者入口

当前示例使用:

  • @EventListener

真实项目里可替换为:

  • @RabbitListener
  • @KafkaListener
  • RocketMQ 的消息监听器

消费端的幂等、事务、状态判断仍然保留。

3)消息确认与重试策略

真实 MQ 会多出这些问题:

  • ack / nack;
  • 手动确认还是自动确认;
  • 消费重试次数;
  • 死信队列;
  • 顺序消息;
  • 消息过期。

但这些都是“运输层细节”,不会改变 Outbox 和补偿的根本设计。

换句话说:

Outbox 负责“别丢”,幂等负责“别重”,补偿负责“别乱”。

这三件事才是消息驱动一致性的三根柱子。

十九、进一步拓展:为什么 Spring Boot 3.x 这套写法特别适合做一致性设计?

Spring Boot 3.x 之所以很适合讲消息驱动一致性,是因为它把很多复杂机制都变成了简单的约定。

1)声明式事务依然好用

@Transactional 依然是最常见的事务边界描述方式。
你不需要手动管理连接、提交、回滚,就能把“订单 + Outbox”放在同一个原子边界里。

2)校验、Web、持久层分工更清晰

@Valid 负责入口校验,Service 负责业务编排,Repository 负责数据访问。
结构清晰之后,一致性问题更容易被隔离与测试。

3)Java 17 的 record 让消息载荷更轻

事件载荷本来就应该是“纯数据对象”。
record 非常贴合这个角色,减少样板代码,也降低理解成本。

4)jakarta.* 让依赖升级更统一

虽然包名迁移会让老项目痛一下,但从长期看,它让你在 Boot 3.x 里更容易和现代规范对齐。
尤其是在实体、校验、Servlet 这类基础接口上,风格更加统一。

5)定时任务、事件机制、事务传播都很成熟

补偿任务、Outbox 轮询、消费处理、状态更新,都是 Spring 很擅长的事情。
所以你完全可以把精力放在业务设计上,而不是被框架细节牵着走。

二十、把这篇文章真正读懂,你应该记住的不是代码,而是三条原则

如果你只想记住三句话,那就记住这三句:

原则一:不要幻想数据库事务可以天然覆盖消息系统

数据库事务只管数据库。
消息系统、远程服务、补偿任务,都不在它的天然边界里。

原则二:Outbox 解决“消息别丢”,幂等解决“消息别重”,补偿解决“异常别乱”

这三件事各自负责一块,缺一不可。

原则三:补偿不是补救心态,而是系统设计的一部分

真正成熟的系统,不会假装异常不存在。
它会把异常写进模型、写进状态机、写进日志、写进重试和补偿里。

当你把“异常”当成系统的一等公民,系统才会越来越稳。

二十一、完整流程回放:从下单到补偿,整条链路长什么样?

我们最后把整个流程串起来看一遍。

正常路径

  1. 用户提交订单;
  2. OrderApplicationService 在一个本地事务里写订单和 outbox;
  3. 事务提交后,OutboxRelayJob 扫描到这条事件;
  4. 投递器先把事件从 NEW 改成 PROCESSING
  5. 通过消息总线发布事件;
  6. OrderCreatedConsumer 收到事件,先写消费幂等日志;
  7. 消费端把订单状态改成 CONFIRMED
  8. 投递器把 outbox 标记为 SENT

异常路径

  1. 投递器发布事件失败;
  2. outbox 标记为 RETRY
  3. 下一轮任务继续重试;
  4. 多次失败后进入 DEAD
  5. 监控或人工介入。

补偿路径

  1. 订单长时间停留在 CREATED
  2. OrderCompensationJob 扫描到超时订单;
  3. OrderCompensationService 先写补偿日志,利用唯一键防重;
  4. 将订单改成 CANCELLED
  5. 同时写入 ORDER_CANCELLED 的 outbox 事件;
  6. 后续继续通过 Outbox 投递取消消息。

这三条路径合在一起,才是一个完整的消息驱动一致性系统。

二十二、如果你要把它用于真实项目,建议再补上这几件事

本文示例已经足够你理解和跑通核心逻辑,但真实项目里,建议再增加下面这些能力:

1)监控指标

至少要统计:

  • outbox 待发送数;
  • 发送成功率;
  • 重试次数;
  • DEAD 数量;
  • 补偿成功率;
  • 消费重复率。

2)告警

DEAD 事件过多、补偿失败过多、消费者重复过多时,应该自动告警。

3)追踪链路

traceIdbizKeyeventIdorderNo 最好能在日志里贯穿始终。
排障时,你会感谢自己当初多写了这几个字段。

4)补偿策略分级

不是所有问题都用同一种补偿。
有些问题可以自动重试;
有些问题只能人工确认;
有些问题应该直接冻结流程,等待业务介入。

5)消息版本化

当事件格式要变更时,建议考虑版本号。
不要让一个事件结构在系统里默默演化到谁都不敢改。

结语:一致性不是“保证绝对不出错”,而是“出错后也能收回来”

消息驱动系统最迷人的地方,也是最容易误解的地方,就是它从来不是在承诺“绝对不失败”。

它真正承诺的是:

  • 失败可见;
  • 失败可重试;
  • 失败可补偿;
  • 失败不会悄悄把系统状态撕裂成几块。

如果你已经理解了本文的这套思路,那么你对 Spring Boot 3.x 的掌握,就不再只是“会写 CRUD 接口”,而是开始进入“会设计可靠业务流程”的阶段。

这也是零基础进阶到工程实战最关键的一步。

记住一句话就够了:

事务负责把事实写进去,Outbox 负责把消息带出去,幂等负责把重复挡下来,补偿负责把异常拉回来。

当你能用这四句话解释自己的系统时,你就已经不只是会用 Spring Boot 3.x 了,你是在用它设计真正可靠的业务系统。

ok,同学们,本节课就上到这儿,下课~

🧧 学习福利 · 限时开放 🧧

当然,无论你是计算机专业在读学生,还是对编程充满兴趣的入门者,都强烈建议系统学习SpringBoot全体系专栏:👉 「滚雪球学 Spring Boot」;涵盖SpringBoot所有教学内容。

该专栏以“循序渐进 + 实战驱动”为核心理念,从基础到进阶到就业到架构师逐层展开,帮助你快速建立完整的 Spring Boot 技术体系,带你玩转SpringBoot框架。

📌 学习承诺:
通过该专栏,你将能够:

  • 快速掌握 Spring Boot 核心开发能力
  • 构建完整的后端项目认知体系
  • 实现从“入门”到“独立开发”的跃迁

就像“滚雪球”一样,知识不断积累、能力持续放大,实现指数级成长 🚀

最后,如果这篇文章对你有所帮助,帮忙给作者来个一键三连,关注、点赞、收藏,您的支持就是我坚持写作最大的动力。

同时欢迎大家关注技术号:「猿圈奇妙屋」 ,以便学习更多同类型的技术文章,免费白嫖最新BAT互联网公司面试题、4000G PDF编程电子书、简历模板、技术文章Markdown文档等海量资料。

ps:本文涉及所有源代码,均已上传至Gitee开源,供同学们直接对照学习 Gitee传送门,同时,原创开源不易,欢迎给个star🌟,想体验下被🌟的感jio,非常感谢❗

🫵 Who am I?

我是 bug菌,一名深耕 Java 后端领域数十年的一线研发老兵,曾担任独角兽企业后端技术经理、研发架构师等职位,长期专注于 Java 后端、分布式架构、微服务治理、高并发系统、工程效能与研发管理等方向。

目前活跃于多个主流技术社区,包括:

CSDN稀土掘金InfoQ51CTO华为云开发者社区阿里云开发者社区腾讯云开发者社区开源中国博客园墨天轮 等平台。

曾获得:

  • CSDN 博客之星 Top30
  • 华为云多年度十佳博主 & 卓越贡献奖
  • 掘金多年度人气作者 Top40
  • CSDN、掘金、InfoQ、51CTO 等平台签约作者 / 优质作者

截至目前,全网技术内容累计影响读者众多,全网粉丝已超过 30w+

如果你也关注 Java 后端、架构设计、技术成长、职场进阶与研发管理,欢迎关注我的技术内容合集入口:👉 点击查看 👈️

硬核技术号 「猿圈奇妙屋」 期待你的加入。

这里不仅分享技术干货,也记录一线研发人的成长、踩坑、思考与进阶路径。

愿我们一起打怪升级,在技术路上持续进阶。

- End -

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐