黑马点评-Redis 消息队列-04_stream_seckill_order
黑马点评 Redis 消息队列四:Stream 如何改造异步秒杀下单?
本文继续整理黑马点评 Redis 实战篇第 7 章「Redis 消息队列」。
前三篇讲完了为什么需要 Redis MQ、List 和 PubSub 为什么不够、Stream 消费者组强在哪里。
这一篇回到黑马点评最终业务:如何用 Redis Stream 改造第 6 章 BlockingQueue 异步秒杀下单。
1. 这篇文章解决什么问题
第 7 章最终落点是:
把第 6 章的本地 BlockingQueue 换成 Redis Stream。
也就是说,订单任务不再这样交接:
请求线程 -> JVM BlockingQueue -> 当前 JVM 后台线程
而是变成:
Lua 脚本 -> Redis Stream -> 消费者组后台线程
这篇文章主要讲清楚:
1. Lua 脚本如何把订单消息写入 stream.orders。
2. 请求线程为什么不再手动 add 到 BlockingQueue。
3. 后台线程如何用 XREADGROUP 读取订单消息。
4. 消费成功后为什么必须 XACK。
5. 异常时为什么要处理 pending-list。
先给结论:
Stream 版异步秒杀中,请求线程只负责生成订单 id 并执行 Lua。Lua 在 Redis 内部完成库存判断、一人一单判断、扣 Redis 库存、记录用户,并通过 XADD 把订单消息写入 stream.orders。后台线程以消费者组身份阻塞读取 Stream 消息,解析成 VoucherOrder 后执行数据库落库,成功后 ACK;如果处理异常,消息会留在 pending-list 中,再由异常恢复逻辑重新处理。
2. 最终链路总览
Stream 版秒杀下单可以拆成两段。
第一段是请求线程 + Lua:
前端请求 /voucher-order/seckill/{id}
↓
Controller 调用 seckillVoucher(voucherId)
↓
获取当前用户 userId
↓
生成 orderId
↓
执行 seckill.lua
↓
Lua 判断库存和一人一单
↓
Lua 扣 Redis 库存、记录用户、XADD 写入 stream.orders
↓
Java 根据返回值返回结果
第二段是后台消费者:
项目启动后创建后台线程
↓
后台线程 XREADGROUP 读取 stream.orders 新消息
↓
解析消息为 VoucherOrder
↓
执行数据库扣库存和保存订单
↓
成功后 XACK
↓
异常时处理 pending-list
完整流程图
注意这张图里最重要的变化:
订单消息不是 Java 请求线程 add 到 BlockingQueue。
订单消息是 Lua 脚本直接 XADD 到 Redis Stream。
3. 秒杀入口:Controller 仍然很简单
秒杀入口仍然是:
@PostMapping("/seckill/{id}")
public Result seckillVoucher(@PathVariable("id") Long voucherId) {
return voucherOrderService.seckillVoucher(voucherId);
}
Controller 不关心你下面是 BlockingQueue 还是 Stream。
它只负责:
接收 voucherId
调用 Service
返回 Result
真正变化都在 VoucherOrderServiceImpl 和 seckill.lua。
4. 请求线程:生成订单 id 并执行 Lua
Service 中秒杀方法的核心:
@Override
public Result seckillVoucher(Long voucherId) {
// 获取用户
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
// 1.执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
int r = result.intValue();
// 2.判断结果是否为0
if (r != 0) {
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// 3.获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
// 4.返回订单id
return Result.ok(orderId);
}
这段代码和第 6 章 BlockingQueue 版本相比,最明显的变化是:
没有 orderTasks.add(voucherOrder) 了。
为什么?
因为订单消息已经在 Lua 脚本里写入 Redis Stream 了。
请求线程不需要再把任务放进本地队列。
5. Lua 脚本:XADD 写入订单消息
seckill.lua 中最后几步:
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*',
'userId', userId,
'voucherId', voucherId,
'id', orderId)
return 0
这段 Lua 做了三件事:
1. Redis 库存 -1。
2. 把 userId 加入已下单 Set。
3. 向 stream.orders 写入订单消息。
第 3 步相当于 Redis 命令:
XADD stream.orders * userId 101 voucherId 10 id 123456789
这里的消息字段刚好能组成一个订单:
userId:谁下单
voucherId:买哪张券
id:订单 id
为什么 XADD 放在 Lua 里
如果 Java 先执行 Lua,然后 Lua 返回成功后,Java 再单独执行 XADD,会有一个风险:
Lua 已经扣了 Redis 库存、记录了用户
Java 还没来得及 XADD
服务宕机
订单消息没进入队列
把 XADD 放到 Lua 中,就能让这些动作在 Redis 内部作为一个整体执行:
判断资格
扣 Redis 库存
记录用户
写入 Stream 消息
要么脚本没执行成功。
要么这些 Redis 内部动作一起完成。
这就是 Stream 版比 BlockingQueue 版更可靠的关键点之一。
6. 后台线程什么时候启动
和第 6 章类似,项目启动后会启动一个后台任务:
private static final ExecutorService SECKILL_ORDER_EXECUTOR =
Executors.newSingleThreadExecutor();
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
这里的含义是:
Spring 创建好 Service Bean 后,自动执行 init。
init 把 VoucherOrderHandler 提交给单线程线程池。
后台线程开始循环消费订单消息。
它不是用户请求来了才启动。
它是项目启动时就准备好了。
7. 后台线程如何读取 Stream 消息
后台任务核心代码:
List<MapRecord<String, Object, Object>> list =
stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty()
.count(1)
.block(Duration.ofSeconds(2)),
StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
);
这段 Java 对应 Redis 命令:
XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >
逐个解释:
Consumer.from("g1", "c1")
表示消费者 c1 属于消费者组 g1。
count(1)
表示一次最多读取 1 条消息。
block(Duration.ofSeconds(2))
表示没有消息时最多阻塞等待 2 秒。
stream.orders
表示读取订单消息队列。
ReadOffset.lastConsumed()
在消费者组正常消费场景中,对应读取新消息的语义。
所以这段代码的业务含义是:
后台消费者 c1 从 g1 消费者组中读取 stream.orders 的新订单消息。
如果暂时没有消息,就阻塞等待 2 秒。
8. MapRecord 是什么
opsForStream().read(...) 返回:
List<MapRecord<String, Object, Object>>
可以把 MapRecord 理解成:
从 Stream 里读出来的一条消息记录。
它包含:
1. Stream key
2. 消息 ID
3. 消息内容 Map
代码:
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder =
BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
这里的 value 大概长这样:
{
"userId": "101",
"voucherId": "10",
"id": "123456789"
}
BeanUtil.fillBeanWithMap(...) 的作用是:
把 Map 中的字段填充到 VoucherOrder 对象中。
得到:
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setUserId(101L);
voucherOrder.setVoucherId(10L);
voucherOrder.setId(123456789L);
这个对象就可以交给数据库落库方法处理了。
9. createVoucherOrder 仍然负责数据库落库
读取到订单消息后:
createVoucherOrder(voucherOrder);
事务方法中仍然会:
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
int count = query()
.eq("user_id", userId)
.eq("voucher_id", voucherOrder.getVoucherId())
.count();
if (count > 0) {
log.error("用户已经购买过了");
return;
}
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherOrder.getVoucherId())
.gt("stock", 0)
.update();
if (!success) {
log.error("库存不足");
return;
}
save(voucherOrder);
}
这说明:
Stream 只是负责传递订单任务。
真正的数据库落库逻辑仍然在 createVoucherOrder。
Lua 成功不等于数据库订单已经创建。
Stream 消息被消费并成功执行 createVoucherOrder 后,数据库订单才真正落地。
10. 为什么处理成功后必须 ACK
正常处理成功后:
stringRedisTemplate.opsForStream()
.acknowledge("stream.orders", "g1", record.getId());
对应 Redis 命令:
XACK stream.orders g1 消息ID
它告诉 Redis:
这条消息我已经处理成功了。
可以从 g1 消费者组的 pending-list 中移除。
如果不 ACK,会发生什么?
消息会一直留在 pending-list。
系统会认为:
这条消息已经投递过,但还没确认成功。
后续异常恢复逻辑可能会反复处理它。
所以 ACK 必须放在业务处理成功之后。
不能先 ACK 再落库。
如果先 ACK,再落库失败,就会变成:
Redis 认为消息完成了
但数据库订单没保存
消息也不会进入 pending-list 等待恢复
这就是典型的消息丢失风险。
11. 为什么异常时要处理 pending-list
后台线程外层有异常捕获:
try {
// 读取消息
// 解析消息
// 创建订单
// ACK
} catch (Exception e) {
log.error("处理订单异常", e);
handlePendingList();
}
如果异常发生在 ACK 之前,这条消息会留在 pending-list。
比如:
消费者读到了消息
消息进入 pending-list
消费者执行 createVoucherOrder 时异常
还没来得及 ACK
这时如果不处理 pending-list,消息就卡在那里。
所以要调用:
handlePendingList();
把未确认的消息重新拿出来处理。
12. handlePendingList 如何恢复异常消息
异常恢复代码核心:
List<MapRecord<String, Object, Object>> list =
stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create("stream.orders", ReadOffset.from("0"))
);
对应 Redis 命令:
XREADGROUP GROUP g1 c1 COUNT 1 STREAMS stream.orders 0
注意最后是:
0
不是:
>
在消费者组中:
>:读取新消息
0:读取 pending-list 中未确认的旧消息
所以 handlePendingList() 做的是:
不断从 pending-list 中读取未确认消息。
读到后重新解析、重新创建订单、成功后 ACK。
直到 pending-list 为空才退出。
代码结构是:
while (true) {
try {
// 1.读取 pending-list
// 2.如果为空,break
// 3.解析消息
// 4.创建订单
// 5.ACK
} catch (Exception e) {
// 出错后稍微休眠,再继续重试
Thread.sleep(20);
}
}
pending-list 恢复流程图
13. 为什么 Stream 版比 BlockingQueue 版更可靠
对比第 6 章和第 7 章:
第 6 章 BlockingQueue
Lua 成功
↓
Java 把 VoucherOrder 放入 JVM BlockingQueue
↓
后台线程 take
↓
落库
问题:
队列在 JVM 内存中。
服务宕机可能丢任务。
多实例队列不共享。
第 7 章 Stream
Lua 成功
↓
Lua 直接 XADD 写入 Redis Stream
↓
后台消费者组读取
↓
落库
↓
ACK
优势:
订单消息存在 Redis。
服务实例之间共享队列。
消费者处理失败后消息进入 pending-list。
成功后 ACK,失败可恢复。
对比图
14. 这一版仍然要注意什么
Stream 版比 BlockingQueue 更可靠,但它不是“绝对不会出问题”。
仍然有几个点要注意。
注意 1:消费者组需要提前创建
使用 XREADGROUP 前,需要有消费者组。
通常要提前执行:
XGROUP CREATE stream.orders g1 0 MKSTREAM
否则消费者组不存在时,读取会失败。
注意 2:ACK 必须在业务成功后
正确顺序是:
读取消息
处理数据库落库
业务成功
ACK
不能先 ACK 再落库。
注意 3:createVoucherOrder 要具备幂等兜底
Stream 消息可能因为异常恢复被重复处理。
所以数据库层仍然要保留:
一人一单查询
stock > 0 条件扣库存
事务
否则重复消息可能导致重复订单或库存异常。
注意 4:这仍然是 Redis 方案,不是专业 MQ
Redis Stream 已经比 List 和 PubSub 更强。
但在更复杂的生产场景里,仍可能选择 RabbitMQ、Kafka、RocketMQ 这类专业 MQ。
黑马点评这里主要是用 Redis Stream 把异步秒杀链路讲通。
15. 本篇最容易混淆的几个点
1. Stream 版中 Java 请求线程还要不要 add 到 BlockingQueue
不需要。
订单消息已经由 Lua 脚本通过 XADD 写入 stream.orders。
2. Lua 返回 0 是不是订单已经进数据库
不是。
Lua 返回 0 表示 Redis 资格判断成功,并且订单消息已经写入 Stream。
数据库落库由后台消费者完成。
3. ACK 是不是可以提前做
不能。
ACK 必须在数据库业务处理成功后执行。
提前 ACK 后如果业务失败,消息就失去了恢复机会。
4. pending-list 处理的是新消息吗
不是。
pending-list 处理的是已经投递给消费者但还没有 ACK 的旧消息。
正常读取新消息用 >。
异常恢复读取 pending-list 用 0。
5. Stream 版还需要数据库兜底吗
需要。
Stream 解决的是消息可靠投递和消费恢复。
数据库仍然是最终事实,需要事务、一人一单兜底、库存条件扣减。
16. 面试怎么回答
如果面试官问:黑马点评如何用 Redis Stream 实现异步秒杀下单?
可以这样回答:
秒杀请求进入后,Service 先获取当前用户 id 并生成订单 id,然后执行 Lua 脚本。Lua 在 Redis 中原子判断库存和一人一单,如果库存不足返回 1,如果重复下单返回 2;如果通过,就扣减 Redis 库存、把 userId 加入已下单 Set,并通过 XADD 向
stream.orders写入包含 userId、voucherId、orderId 的订单消息。请求线程根据 Lua 返回值快速返回订单 id。项目启动时会启动后台线程,使用XREADGROUP以消费者组方式从stream.orders读取消息,解析成 VoucherOrder 后执行数据库扣库存和保存订单,成功后执行 XACK;如果处理异常,消息会留在 pending-list,再通过读取 pending-list 进行恢复处理。
如果面试官问:为什么 Stream 版比 BlockingQueue 版更可靠?
可以这样回答:
BlockingQueue 是 JVM 本地内存队列,服务宕机时队列中的订单任务会丢失,多实例部署时队列也不共享。Redis Stream 把订单消息存到 Redis 中,多个服务实例可以共享同一个队列,并且消费者组支持 ACK 和 pending-list。消费者处理成功后 ACK,处理失败或宕机时消息会留在 pending-list 中,后续可以重新读取处理,因此可靠性比本地阻塞队列更好。
如果面试官问:pending-list 在秒杀下单里有什么作用?
可以这样回答:
pending-list 用来保存已经投递给消费者但还没有 ACK 的消息。后台消费者读取订单消息后,如果数据库落库成功,就执行 XACK,消息从 pending-list 中移除;如果处理过程中异常或服务宕机,没有 ACK,这条消息会留在 pending-list 中。后续异常恢复逻辑可以用
XREADGROUP ... 0读取 pending-list 中的消息重新处理,避免订单消息因为消费失败而丢失。
17. 总结
第 7 章最终版异步秒杀链路可以这样记:
请求线程生成 orderId
↓
执行 Lua
↓
Lua 判断资格、扣 Redis 库存、记录用户
↓
Lua XADD 写入 stream.orders
↓
请求线程返回 orderId
↓
后台线程 XREADGROUP 读取消息
↓
解析成 VoucherOrder
↓
数据库落库
↓
成功后 XACK
↓
异常时从 pending-list 恢复
最核心的一句话:
第 7 章不是推翻第 6 章的异步思想,而是把第 6 章的本地 BlockingQueue 升级成 Redis Stream,让订单消息从 JVM 内存任务变成 Redis 中可确认、可恢复、可被多实例共享的消息。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)