【Redis从入门到精通】第66篇:消息队列——用Redis实现轻量级MQ的四种方案
上一篇【第65篇】缓存穿透/击穿/雪崩——三大缓存坑的解决方案
下一篇【第67篇】Redis Stream——终于有了真正的消息队列
架构师:“我们需要一个消息队列,去买一套Kafka集群。”
老板:“预算多少?”
架构师:“服务器+运维,一年大概二十万吧。”
老板:“……我们只是要异步发个短信通知,一天才几百条。”
架构师:“那用Redis就行。”
老板:“Redis还能做消息队列?”
架构师:“不仅能,而且有四种做法。”
没错,Redis虽然本职是缓存和数据结构服务器,但它自带的消息队列能力足以应付很多场景。从简单的任务队列到带消费者组的专业级消息流,Redis都能胜任。本文带你逐一拆解四种方案,帮你在合适的场景做出合适的选择。
一、方案一:List消息队列(LPUSH + BRPOP)
1.1 基本原理
用Redis的List结构做消息队列是最直观的方案。生产者用LPUSH把消息塞到List头部,消费者用BRPOP从List尾部阻塞式获取消息。FIFO语义天然满足。
List消息队列工作模型:
生产者(Producer) 消费者(Consumer)
┌──────────┐ ┌──────────┐
│ APP-A │──LPUSH msg──┐ ┌──│ Worker1 │
└──────────┘ │ │ └──────────┘
▼ │
┌──────────┴─┐
│ msg_queue │
│ [m3,m2,m1] │ ← List,FIFO队列
└──────────┬─┘
▲ │
┌──────────┐ │ └──│ Worker2 │
│ APP-B │──LPUSH msg──┘ └──────────┘
└──────────┘ (BRPOP阻塞等待)
LPUSH:从左侧推入(可多个生产者)
BRPOP:从右侧阻塞弹出(可多个消费者竞争)
1.2 代码实现
// === 生产者 ===
public class ListProducer {
public void sendMessage(String queueName, String message) {
jedis.lpush(queueName, message);
// 可选:限制队列长度,防止内存撑爆
jedis.ltrim(queueName, 0, 99999); // 只保留最新10万条
}
}
// === 消费者 ===
public class ListConsumer {
public void startConsume(String queueName) {
while (!Thread.currentThread().isInterrupted()) {
// BRPOP:阻塞直到有消息或超时(0=永不超时)
List<String> result = jedis.brpop(0, queueName);
// result = [queueName, message]
if (result != null) {
String message = result.get(1);
processMessage(message); // 业务处理
}
}
}
}
命令详解:
BRPOP queue_name timeout
- queue_name:队列名
- timeout:阻塞超时秒数,0表示无限等待
- 返回值:[队列名, 消息内容] 的双元素数组
- 多个消费者BRPOP同一个队列 → 自然的负载均衡
1.3 优缺点
| 优点 | 缺点 |
|---|---|
| 实现极其简单,几行代码搞定 | 不支持消息确认(ACK),消费失败就丢了 |
| FIFO语义天然保证 | 不支持消费者组,无法广播 |
| 阻塞读取,不浪费CPU | 不支持消息持久化(List在内存中) |
| 天然支持多消费者竞争消费 | 不支持消息回溯和重复消费 |
| Redis原生支持,无需额外依赖 | 消息堆积过多会占用大量内存 |
⚠️ 注意:BRPOP拿到的消息一旦弹出就从List中删除了。如果消费者拿到消息后崩溃,这条消息就永久丢失了。可以用BRPOPLPUSH做一个"备份队列"——从队列A取出消息的同时推到队列B(备份),处理成功后再从队列B删除。
二、方案二:发布订阅(Pub/Sub)
2.1 基本原理
Pub/Sub是Redis的"广播"机制。生产者发布消息到频道(Channel),所有订阅了该频道的消费者都会收到消息。这是一对多的模式,不是队列。
Pub/Sub工作模型:
生产者(Publishers)
┌──────────┐
│ APP-A │──PUBLISH order:created "new order #123"
└──────────┘
┌──────────┐
│ APP-B │──PUBLISH order:created "new order #456"
└──────────┘
│ 渠道: order:created
│ ┌─────────────────┐
└──────────────────────────►│ Redis Pub/Sub │
└────────┬────────┘
│
┌─────────────────┼─────────────────┐
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ 发送短信 │ │ 更新统计 │ │ WebSocket │
│ Worker │ │ Worker │ │ 推送 │
└──────────┘ └──────────┘ └──────────┘
所有订阅者同时收到消息!
2.2 代码实现
// === 发布者 ===
public class Publisher {
public void publish(String channel, String message) {
jedis.publish(channel, message);
}
}
// === 订阅者 ===
public class Subscriber extends JedisPubSub {
@Override
public void onMessage(String channel, String message) {
System.out.printf("[%s] 收到消息: %s%n", channel, message);
processMessage(message);
}
public void subscribe(String... channels) {
// 注意:subscribe是阻塞方法
jedis.subscribe(this, channels);
}
// 启动订阅(在独立线程中运行)
public void startInBackground() {
new Thread(() -> {
try (Jedis jedis = pool.getResource()) {
jedis.subscribe(this, "order:created", "user:registered");
}
}).start();
}
}
2.3 致命缺陷
Pub/Sub有三个致命问题,使其不适合作为可靠的消息队列:
缺陷1:消息"即发即忘"
┌─────────────────────────────────┐
│ 发布时如果没有订阅者在监听 │
│ → 消息直接丢弃,永不送达 │
│ │
│ 类比:你对着空房间喊话,声音消失了 │
└─────────────────────────────────┘
缺陷2:没有消息持久化
┌─────────────────────────────────┐
│ Redis宕机 → 所有未处理消息丢失 │
│ Redis重启 → 历史消息全部清零 │
│ │
│ 类比:广播电台停电,节目中断 │
└─────────────────────────────────┘
缺陷3:消费者离线不积压
┌─────────────────────────────────┐
│ 消费者断开连接 → 期间消息全部丢失 │
│ 消费者重连 → 只收到重连后的消息 │
│ │
│ 类比:你关掉收音机 → 错过的节目 │
│ 永远听不到 │
└─────────────────────────────────┘
⚠️ 注意:Pub/Sub适合实时性要求高、允许消息丢失的场景。比如:实时聊天消息广播、配置变更通知、集群节点上下线通知。千万不要用它做订单通知——订单丢了可不是闹着玩的。
三、方案三:ZSet延迟队列
3.1 基本原理
利用ZSet的score作为"待处理时间戳",消费者定时轮询score <= 当前时间的消息,实现延迟队列。
ZSet延迟队列模型:
ZSet结构:
┌────────────────────────────────────┐
│ Key: delay:order:queue │
│ │
│ Member (消息体) Score (时间戳) │
│ ───────────────── ───────────────│
│ "order:1001" 1716710400 │ ← 1小时后处理
│ "order:1002" 1716714000 │ ← 2小时后处理
│ "order:1003" 1716706800 │ ← 现在就该处理
│ "order:1004" 1716721200 │ ← 4小时后处理
│ "order:1005" 1716706800 │ ← 现在就该处理
└────────────────────────────────────┘
消费者轮询逻辑:
每分钟执行一次:
ZRANGEBYSCORE delay:order:queue 0 <NOW> LIMIT 0 10
→ 获取score <= 当前时间的10条消息
→ 处理这些消息
→ 处理成功后 ZREM 删除
3.2 代码实现
// === 延迟消息生产者 ===
public class DelayProducer {
public void sendDelayMessage(String queueKey, String message, long delaySeconds) {
long executeTime = System.currentTimeMillis() / 1000 + delaySeconds;
jedis.zadd(queueKey, executeTime, message);
}
}
// === 延迟消息消费者 ===
public class DelayConsumer {
private static final String QUEUE_KEY = "delay:order:queue";
private static final String PROCESSING_KEY = "delay:order:processing";
@Scheduled(fixedDelay = 1000) // 每秒轮询一次
public void consume() {
long now = System.currentTimeMillis() / 1000;
// 1. 获取所有到期的消息(原子操作)
// ZRANGEBYSCORE获取到期的消息,转移到处理中的列表
Set<String> messages = jedis.zrangeByScore(QUEUE_KEY, 0, now, 0, 10);
for (String message : messages) {
// 2. 原子地将消息从待处理移到处理中(防止重复消费)
Long result = jedis.zrem(QUEUE_KEY, message);
if (result == 0) {
continue; // 被其他消费者抢走了
}
// 3. 处理消息
try {
processMessage(message);
} catch (Exception e) {
// 4. 处理失败 → 重新放回队列(延迟重试)
long retryTime = now + 60; // 60秒后重试
jedis.zadd(QUEUE_KEY, retryTime, message);
log.error("消息处理失败,60秒后重试: {}", message, e);
}
}
}
}
经典应用场景:
1. 订单超时取消
下单时 ZADD delay:order 1716714000 "order:1001"
30分钟后消费者扫描到该订单
→ 检查支付状态 → 未支付 → 取消订单
2. 红包24小时过期退款
发红包时 ZADD delay:redpacket <24小时后> "packet:abc"
到期后消费者处理 → 退回余额
3. 定时提醒
设置提醒 ZADD delay:reminder <提醒时间> "user:123:remind:meeting"
到期触发通知推送
4. 异步任务调度
提交异步任务,指定执行时间
3.3 优缺点
| 优点 | 缺点 |
|---|---|
| 原生支持延迟/定时任务 | 需要轮询,延迟受轮询间隔影响 |
| 无需额外中间件 | 高并发下ZREM竞争激烈 |
| 消息可靠(不消费成功不删除) | 大量消息时ZSet性能会有影响 |
| 支持优先级(score越小越优先) | 不支持ACK和消费者组 |
四、方案四:Redis Stream(真正的消息队列)
4.1 Stream的核心特性
Redis 5.0引入的Stream 数据结构,让Redis真正拥有了"专业级消息队列"的能力。
Redis Stream的本质:
Stream = 日志 + 消费者组
┃
┃ 像Kafka的Topic分区
┃ 像RabbitMQ的消费者组
┃ 但更轻量、更低延迟
┃
▼
┌─────────────────────────────────────────┐
│ Stream: orders │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 1700000001│→│ 1700000002│→│ 1700000003│ │
│ │-0 │ │-0 │ │-0 │ │
│ │{ │ │{ │ │{ │ │
│ │ order:100│ │ order:101│ │ order:102│ │
│ │ amount:99│ │ amount:58│ │ amount:20│ │
│ │} │ │} │ │} │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ 消费者组: order-processors │
│ ┌─────────┐ ┌──────────┐ │
│ │Consumer1│ │ Consumer2│ │
│ │ pending │ │ pending │ │
│ └─────────┘ └──────────┘ │
└─────────────────────────────────────────┘
# 基础命令演示
# 1. 添加消息
XADD orders * order_id "1001" amount "99.9"
# 返回: "1716700000001-0" (Redis自动生成的ID)
# 2. 查看消息数量
XLEN orders
# 返回: (integer) 1
# 3. 读取消息(从最早开始)
XREAD COUNT 2 STREAMS orders 0-0
# 返回所有消息
# 4. 读取新消息(阻塞模式)
XREAD BLOCK 5000 STREAMS orders $
# 阻塞5秒等待新消息($ = 只读最新消息)
# 5. 范围查询
XRANGE orders - + COUNT 10
# 查询最早到最新的10条消息
# 6. 反向查询
XREVRANGE orders + - COUNT 10
# 查询最新到最早的10条消息
Stream是本文的重点,下一篇(第067篇)会有完整的深入讲解,这里先建立基本概念。
五、四种方案完整对比
5.1 多维对比总表
| 维度 | List(BRPOP) | Pub/Sub | ZSet延迟队列 | Stream |
|---|---|---|---|---|
| Redis版本 | 1.0+ | 2.0+ | 1.2+ | 5.0+ |
| 消费模式 | 竞争消费 | 广播消费 | 竞争消费 | 竞争消费+消费者组 |
| 消息ACK | 不支持 | 不支持 | 手动(ZREM) | 支持(XACK) |
| 消息持久化 | 否(纯内存) | 否(纯内存) | 是(RDB/AOF) | 是(RDB/AOF) |
| 消息回溯 | 不支持 | 不支持 | 支持 | 支持 |
| 消息堆积 | 受内存限制 | 无堆积 | 受内存限制 | 受内存限制(可设MAXLEN) |
| 延迟能力 | 实时 | 实时 | 原生支持延迟 | 原生不支持延迟 |
| 消费者组 | 不支持 | 不支持 | 不支持 | 原生支持 |
| Pending管理 | 无 | 无 | 无 | XPENDING/XCLAIM |
| 吞吐量 | 高(10万+/s) | 极高(50万+/s) | 中(1万+/s) | 高(10万+/s) |
| 实现复杂度 | ★☆☆☆☆ | ★☆☆☆☆ | ★★☆☆☆ | ★★★★☆ |
| 可靠性 | 低 | 极低 | 中 | 高 |
5.2 选型建议
你的需求是什么?
│
├─ 实时通知、配置下发、聊天广播
│ → Pub/Sub
│ 优势:零延迟,所有订阅者同时收到
│ 代价:消息可能丢失,不支持回溯
│
├─ 简单的异步任务队列,消息不能丢
│ → List + BRPOP + 备份队列
│ 优势:实现简单,天然FIFO
│ 代价:没有ACK,需要自己处理失败重试
│
├─ 延迟任务(订单超时、定时提醒)
│ → ZSet延迟队列
│ 优势:原生支持延迟,不依赖额外组件
│ 代价:需要轮询,大量任务时有性能开销
│
├─ 真正需要可靠消息队列的场景
│ → Stream
│ 优势:ACK、消费者组、消息回溯,接近专业MQ
│ 代价:实现复杂度较高,需要Redis 5.0+
│
└─ 需要事务消息、严格顺序、海量堆积
→ 别用Redis了,上Kafka/RabbitMQ吧
原因:专业MQ在这些方面有不可替代的优势
六、Redis作为MQ与Kafka/RabbitMQ的横向对比
6.1 定位对比
Kafka:
┌─────────────────────────────────────────────┐
│ 定位:分布式流处理平台 │
│ 优势:海量数据(百万QPS)、持久化到磁盘、分区并行 │
│ 生态:Kafka Connect、KSQL、Kafka Streams │
│ 场景:日志收集、埋点、大数据管道 │
│ 部署:需要ZooKeeper/KRaft + 多节点集群 │
└─────────────────────────────────────────────┘
RabbitMQ:
┌─────────────────────────────────────────────┐
│ 定位:企业级消息中间件 │
│ 优势:AMQP标准、灵活路由、事务消息 │
│ 生态:管理后台、联邦/Shovel插件、死信队列 │
│ 场景:订单系统、支付回调、微服务异步通信 │
│ 部署:Erlang环境 + RabbitMQ集群 │
└─────────────────────────────────────────────┘
Redis(作为MQ):
┌─────────────────────────────────────────────┐
│ 定位:轻量级消息队列 │
│ 优势:部署极简(你本来就装了Redis)、亚毫秒延迟 │
│ 场景:中小规模异步任务、缓存失效通知、轻量解耦 │
│ 部署:单个Redis实例(你可能已经跑着了) │
│ 局限:内存限制、持久化靠RDB/AOF、无事务消息 │
└─────────────────────────────────────────────┘
6.2 关键能力对比
| 能力 | Redis(Stream) | Kafka | RabbitMQ |
|---|---|---|---|
| 消息确认(ACK) | 支持 | 支持 | 支持 |
| 消息持久化 | RDB/AOF(异步) | 磁盘(同步) | 磁盘+内存 |
| 堆积容量 | 受内存限制(GB级) | 无限(磁盘,TB级) | 受磁盘限制 |
| 严格有序 | 单Stream保证 | 分区内保证 | 队列内保证 |
| 事务消息 | 不支持 | 有限支持 | 完整支持 |
| 延迟消息 | 需ZSet配合 | 不支持原生 | 插件支持 |
| 死信队列 | 需手动实现 | 需手动实现 | 原生支持 |
| 运维复杂度 | 极低 | 高 | 中 |
| 最低延迟 | <1ms | 2-10ms | 1-5ms |
| 最大吞吐(单节点) | 10万+/s | 百万+/s | 5万+/s |
6.3 Redis作为MQ的局限性
Redis做MQ,以下场景请三思:
1. 严格的消息顺序保证
→ Redis Stream只保证单个Stream内的顺序
→ 分布式环境下还有主从延迟问题
2. 海量堆积(TB级别)
→ Redis数据在内存中,堆积=内存爆炸
→ Kafka靠磁盘,TB级别堆积毫无压力
3. 事务消息(发消息 + 数据库操作 = 原子性)
→ Redis的事务不支持回滚
→ RabbitMQ有Channel事务和Publisher Confirm
4. 严格的发布-确认模型
→ 生产者的消息必须到达且不丢失
→ Kafka和RabbitMQ都有成熟的确认机制
5. 复杂的路由规则
→ 按topic+tags+headers路由
→ RabbitMQ的Exchange体系更成熟
6. 企业级运维要求
→ 监控大盘、消息追踪、死信处理、审计日志
→ 专业MQ开箱即用,Redis需要大量自研
⚠️ 注意:一个常见误区是"Redis速度快,用它做MQ肯定也快"。速度快不代表适合。消息队列的核心诉求是可靠和有序,而Redis的核心优势是快速和简单。用百米短跑选手去跑马拉松,姿势可能帅,但未必能跑完全程。
七、总结:什么时候用Redis做MQ?
给你一份"Redis MQ使用决策树":
决策树:
你的日均消息量 < 1000万 ?
├─ YES →
│ 你的消息可以接受丢失吗?
│ ├─ YES → Pub/Sub
│ └─ NO →
│ 你的消息需要延迟投递吗?
│ ├─ YES → ZSet延迟队列
│ └─ NO →
│ 你需要消费者组和消息重试吗?
│ ├─ NO → List + BRPOP
│ └─ YES → Stream
│
└─ NO →
你需要事务消息、海量堆积或复杂路由?
├─ 至少一项YES → 上Kafka/RabbitMQ
└─ 都不需要 → Stream(做好容量规划)
Redis用好了可以做很多事,但永远记住:合适的工具解决合适的问题。如果你只需要一天发几百条短信通知,别大费周章部署Kafka。反过来,如果你在做电商核心交易链路,别拿Redis Stream去碰运气——该用的中间件还是得用。
下一篇文章,我们将深入Redis Stream的每一个细节——消费者组、ACK机制、消息重试、死信处理,敬请期待。
上一篇【第65篇】缓存穿透/击穿/雪崩——三大缓存坑的解决方案
下一篇【第67篇】Redis Stream——终于有了真正的消息队列
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)