黑马点评 Redis 消息队列四:Stream 如何改造异步秒杀下单?

本文继续整理黑马点评 Redis 实战篇第 7 章「Redis 消息队列」。

前三篇讲完了为什么需要 Redis MQ、List 和 PubSub 为什么不够、Stream 消费者组强在哪里。

这一篇回到黑马点评最终业务:如何用 Redis Stream 改造第 6 章 BlockingQueue 异步秒杀下单。


1. 这篇文章解决什么问题

第 7 章最终落点是:

把第 6 章的本地 BlockingQueue 换成 Redis Stream。

也就是说,订单任务不再这样交接:

请求线程 -> JVM BlockingQueue -> 当前 JVM 后台线程

而是变成:

Lua 脚本 -> Redis Stream -> 消费者组后台线程

这篇文章主要讲清楚:

1. Lua 脚本如何把订单消息写入 stream.orders。
2. 请求线程为什么不再手动 add 到 BlockingQueue。
3. 后台线程如何用 XREADGROUP 读取订单消息。
4. 消费成功后为什么必须 XACK。
5. 异常时为什么要处理 pending-list。

先给结论:

Stream 版异步秒杀中,请求线程只负责生成订单 id 并执行 Lua。Lua 在 Redis 内部完成库存判断、一人一单判断、扣 Redis 库存、记录用户,并通过 XADD 把订单消息写入 stream.orders。后台线程以消费者组身份阻塞读取 Stream 消息,解析成 VoucherOrder 后执行数据库落库,成功后 ACK;如果处理异常,消息会留在 pending-list 中,再由异常恢复逻辑重新处理。


2. 最终链路总览

Stream 版秒杀下单可以拆成两段。

第一段是请求线程 + Lua:

前端请求 /voucher-order/seckill/{id}
    ↓
Controller 调用 seckillVoucher(voucherId)
    ↓
获取当前用户 userId
    ↓
生成 orderId
    ↓
执行 seckill.lua
    ↓
Lua 判断库存和一人一单
    ↓
Lua 扣 Redis 库存、记录用户、XADD 写入 stream.orders
    ↓
Java 根据返回值返回结果

第二段是后台消费者:

项目启动后创建后台线程
    ↓
后台线程 XREADGROUP 读取 stream.orders 新消息
    ↓
解析消息为 VoucherOrder
    ↓
执行数据库扣库存和保存订单
    ↓
成功后 XACK
    ↓
异常时处理 pending-list

完整流程图

1 库存不足

2 重复下单

0 成功

前端秒杀请求

VoucherOrderController

seckillVoucher(voucherId)

生成 userId 和 orderId

执行 seckill.lua

Lua 返回值

返回库存不足

返回不能重复下单

Lua XADD 写入 stream.orders

请求线程返回 orderId

后台线程 VoucherOrderHandler

XREADGROUP 读取 Stream

解析 MapRecord 为 VoucherOrder

createVoucherOrder 落库

XACK 确认消息

异常时 handlePendingList

注意这张图里最重要的变化:

订单消息不是 Java 请求线程 add 到 BlockingQueue。
订单消息是 Lua 脚本直接 XADD 到 Redis Stream。

3. 秒杀入口:Controller 仍然很简单

秒杀入口仍然是:

@PostMapping("/seckill/{id}")
public Result seckillVoucher(@PathVariable("id") Long voucherId) {
    return voucherOrderService.seckillVoucher(voucherId);
}

Controller 不关心你下面是 BlockingQueue 还是 Stream。

它只负责:

接收 voucherId
调用 Service
返回 Result

真正变化都在 VoucherOrderServiceImplseckill.lua


4. 请求线程:生成订单 id 并执行 Lua

Service 中秒杀方法的核心:

@Override
public Result seckillVoucher(Long voucherId) {
    // 获取用户
    Long userId = UserHolder.getUser().getId();
    long orderId = redisIdWorker.nextId("order");
    // 1.执行lua脚本
    Long result = stringRedisTemplate.execute(
            SECKILL_SCRIPT,
            Collections.emptyList(),
            voucherId.toString(), userId.toString(), String.valueOf(orderId)
    );
    int r = result.intValue();
    // 2.判断结果是否为0
    if (r != 0) {
        return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
    }

    // 3.获取代理对象
    proxy = (IVoucherOrderService) AopContext.currentProxy();
    // 4.返回订单id
    return Result.ok(orderId);
}

这段代码和第 6 章 BlockingQueue 版本相比,最明显的变化是:

没有 orderTasks.add(voucherOrder) 了。

为什么?

因为订单消息已经在 Lua 脚本里写入 Redis Stream 了。

请求线程不需要再把任务放进本地队列。


5. Lua 脚本:XADD 写入订单消息

seckill.lua 中最后几步:

-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*',
    'userId', userId,
    'voucherId', voucherId,
    'id', orderId)
return 0

这段 Lua 做了三件事:

1. Redis 库存 -1。
2. 把 userId 加入已下单 Set。
3. 向 stream.orders 写入订单消息。

第 3 步相当于 Redis 命令:

XADD stream.orders * userId 101 voucherId 10 id 123456789

这里的消息字段刚好能组成一个订单:

userId:谁下单
voucherId:买哪张券
id:订单 id

为什么 XADD 放在 Lua 里

如果 Java 先执行 Lua,然后 Lua 返回成功后,Java 再单独执行 XADD,会有一个风险:

Lua 已经扣了 Redis 库存、记录了用户
Java 还没来得及 XADD
服务宕机
订单消息没进入队列

把 XADD 放到 Lua 中,就能让这些动作在 Redis 内部作为一个整体执行:

判断资格
扣 Redis 库存
记录用户
写入 Stream 消息

要么脚本没执行成功。

要么这些 Redis 内部动作一起完成。

这就是 Stream 版比 BlockingQueue 版更可靠的关键点之一。


6. 后台线程什么时候启动

和第 6 章类似,项目启动后会启动一个后台任务:

private static final ExecutorService SECKILL_ORDER_EXECUTOR =
        Executors.newSingleThreadExecutor();

@PostConstruct
private void init() {
    SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

这里的含义是:

Spring 创建好 Service Bean 后,自动执行 init。
init 把 VoucherOrderHandler 提交给单线程线程池。
后台线程开始循环消费订单消息。

它不是用户请求来了才启动。

它是项目启动时就准备好了。


7. 后台线程如何读取 Stream 消息

后台任务核心代码:

List<MapRecord<String, Object, Object>> list =
        stringRedisTemplate.opsForStream().read(
                Consumer.from("g1", "c1"),
                StreamReadOptions.empty()
                        .count(1)
                        .block(Duration.ofSeconds(2)),
                StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
        );

这段 Java 对应 Redis 命令:

XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >

逐个解释:

Consumer.from("g1", "c1")
表示消费者 c1 属于消费者组 g1。

count(1)
表示一次最多读取 1 条消息。

block(Duration.ofSeconds(2))
表示没有消息时最多阻塞等待 2 秒。

stream.orders
表示读取订单消息队列。

ReadOffset.lastConsumed()
在消费者组正常消费场景中,对应读取新消息的语义。

所以这段代码的业务含义是:

后台消费者 c1 从 g1 消费者组中读取 stream.orders 的新订单消息。
如果暂时没有消息,就阻塞等待 2 秒。

8. MapRecord 是什么

opsForStream().read(...) 返回:

List<MapRecord<String, Object, Object>>

可以把 MapRecord 理解成:

从 Stream 里读出来的一条消息记录。

它包含:

1. Stream key
2. 消息 ID
3. 消息内容 Map

代码:

MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder =
        BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);

这里的 value 大概长这样:

{
  "userId": "101",
  "voucherId": "10",
  "id": "123456789"
}

BeanUtil.fillBeanWithMap(...) 的作用是:

把 Map 中的字段填充到 VoucherOrder 对象中。

得到:

VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setUserId(101L);
voucherOrder.setVoucherId(10L);
voucherOrder.setId(123456789L);

这个对象就可以交给数据库落库方法处理了。


9. createVoucherOrder 仍然负责数据库落库

读取到订单消息后:

createVoucherOrder(voucherOrder);

事务方法中仍然会:

@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
    Long userId = voucherOrder.getUserId();
    int count = query()
            .eq("user_id", userId)
            .eq("voucher_id", voucherOrder.getVoucherId())
            .count();
    if (count > 0) {
       log.error("用户已经购买过了");
       return;
    }

    boolean success = seckillVoucherService.update()
            .setSql("stock = stock - 1")
            .eq("voucher_id", voucherOrder.getVoucherId())
            .gt("stock", 0)
            .update();
    if (!success) {
        log.error("库存不足");
        return;
    }
    save(voucherOrder);
}

这说明:

Stream 只是负责传递订单任务。
真正的数据库落库逻辑仍然在 createVoucherOrder。

Lua 成功不等于数据库订单已经创建。

Stream 消息被消费并成功执行 createVoucherOrder 后,数据库订单才真正落地。


10. 为什么处理成功后必须 ACK

正常处理成功后:

stringRedisTemplate.opsForStream()
        .acknowledge("stream.orders", "g1", record.getId());

对应 Redis 命令:

XACK stream.orders g1 消息ID

它告诉 Redis:

这条消息我已经处理成功了。
可以从 g1 消费者组的 pending-list 中移除。

如果不 ACK,会发生什么?

消息会一直留在 pending-list。

系统会认为:

这条消息已经投递过,但还没确认成功。

后续异常恢复逻辑可能会反复处理它。

所以 ACK 必须放在业务处理成功之后。

不能先 ACK 再落库。

如果先 ACK,再落库失败,就会变成:

Redis 认为消息完成了
但数据库订单没保存
消息也不会进入 pending-list 等待恢复

这就是典型的消息丢失风险。


11. 为什么异常时要处理 pending-list

后台线程外层有异常捕获:

try {
    // 读取消息
    // 解析消息
    // 创建订单
    // ACK
} catch (Exception e) {
    log.error("处理订单异常", e);
    handlePendingList();
}

如果异常发生在 ACK 之前,这条消息会留在 pending-list。

比如:

消费者读到了消息
消息进入 pending-list
消费者执行 createVoucherOrder 时异常
还没来得及 ACK

这时如果不处理 pending-list,消息就卡在那里。

所以要调用:

handlePendingList();

把未确认的消息重新拿出来处理。


12. handlePendingList 如何恢复异常消息

异常恢复代码核心:

List<MapRecord<String, Object, Object>> list =
        stringRedisTemplate.opsForStream().read(
                Consumer.from("g1", "c1"),
                StreamReadOptions.empty().count(1),
                StreamOffset.create("stream.orders", ReadOffset.from("0"))
        );

对应 Redis 命令:

XREADGROUP GROUP g1 c1 COUNT 1 STREAMS stream.orders 0

注意最后是:

0

不是:

>

在消费者组中:

>:读取新消息
0:读取 pending-list 中未确认的旧消息

所以 handlePendingList() 做的是:

不断从 pending-list 中读取未确认消息。
读到后重新解析、重新创建订单、成功后 ACK。
直到 pending-list 为空才退出。

代码结构是:

while (true) {
    try {
        // 1.读取 pending-list
        // 2.如果为空,break
        // 3.解析消息
        // 4.创建订单
        // 5.ACK
    } catch (Exception e) {
        // 出错后稍微休眠,再继续重试
        Thread.sleep(20);
    }
}

pending-list 恢复流程图

正常消费异常

消息未 ACK

消息留在 pending-list

handlePendingList

XREADGROUP ... STREAMS stream.orders 0

是否读到消息?

pending-list 为空,结束

解析为 VoucherOrder

重新执行 createVoucherOrder

成功后 XACK


13. 为什么 Stream 版比 BlockingQueue 版更可靠

对比第 6 章和第 7 章:

第 6 章 BlockingQueue

Lua 成功
    ↓
Java 把 VoucherOrder 放入 JVM BlockingQueue
    ↓
后台线程 take
    ↓
落库

问题:

队列在 JVM 内存中。
服务宕机可能丢任务。
多实例队列不共享。

第 7 章 Stream

Lua 成功
    ↓
Lua 直接 XADD 写入 Redis Stream
    ↓
后台消费者组读取
    ↓
落库
    ↓
ACK

优势:

订单消息存在 Redis。
服务实例之间共享队列。
消费者处理失败后消息进入 pending-list。
成功后 ACK,失败可恢复。

对比图

第 6 章 BlockingQueue

消息在 JVM 内存

宕机可能丢任务

多实例队列不共享

第 7 章 Redis Stream

消息在 Redis

多实例共享 stream.orders

消费者组协作消费

ACK + pending-list 异常恢复


14. 这一版仍然要注意什么

Stream 版比 BlockingQueue 更可靠,但它不是“绝对不会出问题”。

仍然有几个点要注意。

注意 1:消费者组需要提前创建

使用 XREADGROUP 前,需要有消费者组。

通常要提前执行:

XGROUP CREATE stream.orders g1 0 MKSTREAM

否则消费者组不存在时,读取会失败。

注意 2:ACK 必须在业务成功后

正确顺序是:

读取消息
处理数据库落库
业务成功
ACK

不能先 ACK 再落库。

注意 3:createVoucherOrder 要具备幂等兜底

Stream 消息可能因为异常恢复被重复处理。

所以数据库层仍然要保留:

一人一单查询
stock > 0 条件扣库存
事务

否则重复消息可能导致重复订单或库存异常。

注意 4:这仍然是 Redis 方案,不是专业 MQ

Redis Stream 已经比 List 和 PubSub 更强。

但在更复杂的生产场景里,仍可能选择 RabbitMQ、Kafka、RocketMQ 这类专业 MQ。

黑马点评这里主要是用 Redis Stream 把异步秒杀链路讲通。


15. 本篇最容易混淆的几个点

1. Stream 版中 Java 请求线程还要不要 add 到 BlockingQueue

不需要。

订单消息已经由 Lua 脚本通过 XADD 写入 stream.orders

2. Lua 返回 0 是不是订单已经进数据库

不是。

Lua 返回 0 表示 Redis 资格判断成功,并且订单消息已经写入 Stream。

数据库落库由后台消费者完成。

3. ACK 是不是可以提前做

不能。

ACK 必须在数据库业务处理成功后执行。

提前 ACK 后如果业务失败,消息就失去了恢复机会。

4. pending-list 处理的是新消息吗

不是。

pending-list 处理的是已经投递给消费者但还没有 ACK 的旧消息。

正常读取新消息用 >

异常恢复读取 pending-list 用 0

5. Stream 版还需要数据库兜底吗

需要。

Stream 解决的是消息可靠投递和消费恢复。

数据库仍然是最终事实,需要事务、一人一单兜底、库存条件扣减。


16. 面试怎么回答

如果面试官问:黑马点评如何用 Redis Stream 实现异步秒杀下单?

可以这样回答:

秒杀请求进入后,Service 先获取当前用户 id 并生成订单 id,然后执行 Lua 脚本。Lua 在 Redis 中原子判断库存和一人一单,如果库存不足返回 1,如果重复下单返回 2;如果通过,就扣减 Redis 库存、把 userId 加入已下单 Set,并通过 XADD 向 stream.orders 写入包含 userId、voucherId、orderId 的订单消息。请求线程根据 Lua 返回值快速返回订单 id。项目启动时会启动后台线程,使用 XREADGROUP 以消费者组方式从 stream.orders 读取消息,解析成 VoucherOrder 后执行数据库扣库存和保存订单,成功后执行 XACK;如果处理异常,消息会留在 pending-list,再通过读取 pending-list 进行恢复处理。

如果面试官问:为什么 Stream 版比 BlockingQueue 版更可靠?

可以这样回答:

BlockingQueue 是 JVM 本地内存队列,服务宕机时队列中的订单任务会丢失,多实例部署时队列也不共享。Redis Stream 把订单消息存到 Redis 中,多个服务实例可以共享同一个队列,并且消费者组支持 ACK 和 pending-list。消费者处理成功后 ACK,处理失败或宕机时消息会留在 pending-list 中,后续可以重新读取处理,因此可靠性比本地阻塞队列更好。

如果面试官问:pending-list 在秒杀下单里有什么作用?

可以这样回答:

pending-list 用来保存已经投递给消费者但还没有 ACK 的消息。后台消费者读取订单消息后,如果数据库落库成功,就执行 XACK,消息从 pending-list 中移除;如果处理过程中异常或服务宕机,没有 ACK,这条消息会留在 pending-list 中。后续异常恢复逻辑可以用 XREADGROUP ... 0 读取 pending-list 中的消息重新处理,避免订单消息因为消费失败而丢失。


17. 总结

第 7 章最终版异步秒杀链路可以这样记:

请求线程生成 orderId
    ↓
执行 Lua
    ↓
Lua 判断资格、扣 Redis 库存、记录用户
    ↓
Lua XADD 写入 stream.orders
    ↓
请求线程返回 orderId
    ↓
后台线程 XREADGROUP 读取消息
    ↓
解析成 VoucherOrder
    ↓
数据库落库
    ↓
成功后 XACK
    ↓
异常时从 pending-list 恢复

最核心的一句话:

第 7 章不是推翻第 6 章的异步思想,而是把第 6 章的本地 BlockingQueue 升级成 Redis Stream,让订单消息从 JVM 内存任务变成 Redis 中可确认、可恢复、可被多实例共享的消息。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐