上一篇【第65篇】缓存穿透/击穿/雪崩——三大缓存坑的解决方案
下一篇【第67篇】Redis Stream——终于有了真正的消息队列


架构师:“我们需要一个消息队列,去买一套Kafka集群。”
老板:“预算多少?”
架构师:“服务器+运维,一年大概二十万吧。”
老板:“……我们只是要异步发个短信通知,一天才几百条。”
架构师:“那用Redis就行。”
老板:“Redis还能做消息队列?”
架构师:“不仅能,而且有四种做法。”

没错,Redis虽然本职是缓存和数据结构服务器,但它自带的消息队列能力足以应付很多场景。从简单的任务队列到带消费者组的专业级消息流,Redis都能胜任。本文带你逐一拆解四种方案,帮你在合适的场景做出合适的选择。

一、方案一:List消息队列(LPUSH + BRPOP)

1.1 基本原理

用Redis的List结构做消息队列是最直观的方案。生产者用LPUSH把消息塞到List头部,消费者用BRPOP从List尾部阻塞式获取消息。FIFO语义天然满足。

List消息队列工作模型:

  生产者(Producer)                  消费者(Consumer)
  ┌──────────┐                     ┌──────────┐
  │  APP-A   │──LPUSH msg──┐    ┌──│  Worker1  │
  └──────────┘             │    │  └──────────┘
                           ▼    │
                     ┌──────────┴─┐
                     │  msg_queue  │
                     │ [m3,m2,m1]  │  ← List,FIFO队列
                     └──────────┬─┘
                           ▲    │
  ┌──────────┐             │    └──│  Worker2  │
  │  APP-B   │──LPUSH msg──┘       └──────────┘
  └──────────┘                        (BRPOP阻塞等待)

  LPUSH:从左侧推入(可多个生产者)
  BRPOP:从右侧阻塞弹出(可多个消费者竞争)

1.2 代码实现

// === 生产者 ===
public class ListProducer {
    public void sendMessage(String queueName, String message) {
        jedis.lpush(queueName, message);
        // 可选:限制队列长度,防止内存撑爆
        jedis.ltrim(queueName, 0, 99999); // 只保留最新10万条
    }
}

// === 消费者 ===
public class ListConsumer {
    public void startConsume(String queueName) {
        while (!Thread.currentThread().isInterrupted()) {
            // BRPOP:阻塞直到有消息或超时(0=永不超时)
            List<String> result = jedis.brpop(0, queueName);
            // result = [queueName, message]
            if (result != null) {
                String message = result.get(1);
                processMessage(message); // 业务处理
            }
        }
    }
}
命令详解:
  BRPOP queue_name timeout
  - queue_name:队列名
  - timeout:阻塞超时秒数,0表示无限等待
  - 返回值:[队列名, 消息内容] 的双元素数组
  - 多个消费者BRPOP同一个队列 → 自然的负载均衡

1.3 优缺点

优点 缺点
实现极其简单,几行代码搞定 不支持消息确认(ACK),消费失败就丢了
FIFO语义天然保证 不支持消费者组,无法广播
阻塞读取,不浪费CPU 不支持消息持久化(List在内存中)
天然支持多消费者竞争消费 不支持消息回溯和重复消费
Redis原生支持,无需额外依赖 消息堆积过多会占用大量内存

⚠️ 注意:BRPOP拿到的消息一旦弹出就从List中删除了。如果消费者拿到消息后崩溃,这条消息就永久丢失了。可以用BRPOPLPUSH做一个"备份队列"——从队列A取出消息的同时推到队列B(备份),处理成功后再从队列B删除。

二、方案二:发布订阅(Pub/Sub)

2.1 基本原理

Pub/Sub是Redis的"广播"机制。生产者发布消息到频道(Channel),所有订阅了该频道的消费者都会收到消息。这是一对多的模式,不是队列。

Pub/Sub工作模型:

  生产者(Publishers)
  ┌──────────┐
  │  APP-A   │──PUBLISH order:created "new order #123"
  └──────────┘
  ┌──────────┐
  │  APP-B   │──PUBLISH order:created "new order #456"
  └──────────┘
       │                             渠道: order:created
       │                           ┌─────────────────┐
       └──────────────────────────►│   Redis Pub/Sub  │
                                   └────────┬────────┘
                                            │
                          ┌─────────────────┼─────────────────┐
                          │                 │                 │
                          ▼                 ▼                 ▼
                    ┌──────────┐     ┌──────────┐     ┌──────────┐
                    │ 发送短信  │     │ 更新统计  │     │ WebSocket │
                    │ Worker   │     │ Worker   │     │  推送     │
                    └──────────┘     └──────────┘     └──────────┘
                        所有订阅者同时收到消息!

2.2 代码实现

// === 发布者 ===
public class Publisher {
    public void publish(String channel, String message) {
        jedis.publish(channel, message);
    }
}

// === 订阅者 ===
public class Subscriber extends JedisPubSub {
    
    @Override
    public void onMessage(String channel, String message) {
        System.out.printf("[%s] 收到消息: %s%n", channel, message);
        processMessage(message);
    }
    
    public void subscribe(String... channels) {
        // 注意:subscribe是阻塞方法
        jedis.subscribe(this, channels);
    }
    
    // 启动订阅(在独立线程中运行)
    public void startInBackground() {
        new Thread(() -> {
            try (Jedis jedis = pool.getResource()) {
                jedis.subscribe(this, "order:created", "user:registered");
            }
        }).start();
    }
}

2.3 致命缺陷

Pub/Sub有三个致命问题,使其不适合作为可靠的消息队列:

缺陷1:消息"即发即忘"
  ┌─────────────────────────────────┐
  │ 发布时如果没有订阅者在监听        │
  │ → 消息直接丢弃,永不送达          │
  │                                  │
  │ 类比:你对着空房间喊话,声音消失了  │
  └─────────────────────────────────┘

缺陷2:没有消息持久化
  ┌─────────────────────────────────┐
  │ Redis宕机 → 所有未处理消息丢失    │
  │ Redis重启 → 历史消息全部清零     │
  │                                  │
  │ 类比:广播电台停电,节目中断       │
  └─────────────────────────────────┘

缺陷3:消费者离线不积压
  ┌─────────────────────────────────┐
  │ 消费者断开连接 → 期间消息全部丢失  │
  │ 消费者重连 → 只收到重连后的消息   │
  │                                  │
  │ 类比:你关掉收音机 → 错过的节目    │
  │       永远听不到                  │
  └─────────────────────────────────┘

⚠️ 注意:Pub/Sub适合实时性要求高、允许消息丢失的场景。比如:实时聊天消息广播、配置变更通知、集群节点上下线通知。千万不要用它做订单通知——订单丢了可不是闹着玩的。

三、方案三:ZSet延迟队列

3.1 基本原理

利用ZSet的score作为"待处理时间戳",消费者定时轮询score <= 当前时间的消息,实现延迟队列。

ZSet延迟队列模型:

  ZSet结构:
  ┌────────────────────────────────────┐
  │ Key: delay:order:queue              │
  │                                     │
  │ Member (消息体)        Score (时间戳) │
  │ ─────────────────    ───────────────│
  │ "order:1001"          1716710400   │ ← 1小时后处理
  │ "order:1002"          1716714000   │ ← 2小时后处理
  │ "order:1003"          1716706800   │ ← 现在就该处理
  │ "order:1004"          1716721200   │ ← 4小时后处理
  │ "order:1005"          1716706800   │ ← 现在就该处理
  └────────────────────────────────────┘

  消费者轮询逻辑:
    每分钟执行一次:
      ZRANGEBYSCORE delay:order:queue 0 <NOW> LIMIT 0 10
      → 获取score <= 当前时间的10条消息
      → 处理这些消息
      → 处理成功后 ZREM 删除

3.2 代码实现

// === 延迟消息生产者 ===
public class DelayProducer {
    public void sendDelayMessage(String queueKey, String message, long delaySeconds) {
        long executeTime = System.currentTimeMillis() / 1000 + delaySeconds;
        jedis.zadd(queueKey, executeTime, message);
    }
}

// === 延迟消息消费者 ===
public class DelayConsumer {
    private static final String QUEUE_KEY = "delay:order:queue";
    private static final String PROCESSING_KEY = "delay:order:processing";
    
    @Scheduled(fixedDelay = 1000) // 每秒轮询一次
    public void consume() {
        long now = System.currentTimeMillis() / 1000;
        
        // 1. 获取所有到期的消息(原子操作)
        // ZRANGEBYSCORE获取到期的消息,转移到处理中的列表
        Set<String> messages = jedis.zrangeByScore(QUEUE_KEY, 0, now, 0, 10);
        
        for (String message : messages) {
            // 2. 原子地将消息从待处理移到处理中(防止重复消费)
            Long result = jedis.zrem(QUEUE_KEY, message);
            if (result == 0) {
                continue; // 被其他消费者抢走了
            }
            
            // 3. 处理消息
            try {
                processMessage(message);
            } catch (Exception e) {
                // 4. 处理失败 → 重新放回队列(延迟重试)
                long retryTime = now + 60; // 60秒后重试
                jedis.zadd(QUEUE_KEY, retryTime, message);
                log.error("消息处理失败,60秒后重试: {}", message, e);
            }
        }
    }
}
经典应用场景:

  1. 订单超时取消
     下单时 ZADD delay:order 1716714000 "order:1001"
     30分钟后消费者扫描到该订单
     → 检查支付状态 → 未支付 → 取消订单

  2. 红包24小时过期退款
     发红包时 ZADD delay:redpacket <24小时后> "packet:abc"
     到期后消费者处理 → 退回余额

  3. 定时提醒
     设置提醒 ZADD delay:reminder <提醒时间> "user:123:remind:meeting"
     到期触发通知推送

  4. 异步任务调度
     提交异步任务,指定执行时间

3.3 优缺点

优点 缺点
原生支持延迟/定时任务 需要轮询,延迟受轮询间隔影响
无需额外中间件 高并发下ZREM竞争激烈
消息可靠(不消费成功不删除) 大量消息时ZSet性能会有影响
支持优先级(score越小越优先) 不支持ACK和消费者组

四、方案四:Redis Stream(真正的消息队列)

4.1 Stream的核心特性

Redis 5.0引入的Stream 数据结构,让Redis真正拥有了"专业级消息队列"的能力。

Redis Stream的本质:

  Stream = 日志 + 消费者组
           ┃
           ┃ 像Kafka的Topic分区
           ┃ 像RabbitMQ的消费者组
           ┃ 但更轻量、更低延迟
           ┃
           ▼
  ┌─────────────────────────────────────────┐
  │ Stream: orders                           │
  │                                          │
  │ ┌──────────┐ ┌──────────┐ ┌──────────┐  │
  │ │ 1700000001│→│ 1700000002│→│ 1700000003│  │
  │ │-0        │ │-0        │ │-0        │  │
  │ │{         │ │{         │ │{         │  │
  │ │ order:100│ │ order:101│ │ order:102│  │
  │ │ amount:99│ │ amount:58│ │ amount:20│  │
  │ │}         │ │}         │ │}         │  │
  │ └──────────┘ └──────────┘ └──────────┘  │
  │                                          │
  │ 消费者组: order-processors                │
  │ ┌─────────┐  ┌──────────┐               │
  │ │Consumer1│  │ Consumer2│               │
  │ │ pending │  │ pending  │               │
  │ └─────────┘  └──────────┘               │
  └─────────────────────────────────────────┘
# 基础命令演示
# 1. 添加消息
XADD orders * order_id "1001" amount "99.9"
# 返回: "1716700000001-0"  (Redis自动生成的ID)

# 2. 查看消息数量
XLEN orders
# 返回: (integer) 1

# 3. 读取消息(从最早开始)
XREAD COUNT 2 STREAMS orders 0-0
# 返回所有消息

# 4. 读取新消息(阻塞模式)
XREAD BLOCK 5000 STREAMS orders $
# 阻塞5秒等待新消息($ = 只读最新消息)

# 5. 范围查询
XRANGE orders - + COUNT 10
# 查询最早到最新的10条消息

# 6. 反向查询
XREVRANGE orders + - COUNT 10
# 查询最新到最早的10条消息

Stream是本文的重点,下一篇(第067篇)会有完整的深入讲解,这里先建立基本概念。

五、四种方案完整对比

5.1 多维对比总表

维度 List(BRPOP) Pub/Sub ZSet延迟队列 Stream
Redis版本 1.0+ 2.0+ 1.2+ 5.0+
消费模式 竞争消费 广播消费 竞争消费 竞争消费+消费者组
消息ACK 不支持 不支持 手动(ZREM) 支持(XACK)
消息持久化 否(纯内存) 否(纯内存) 是(RDB/AOF) 是(RDB/AOF)
消息回溯 不支持 不支持 支持 支持
消息堆积 受内存限制 无堆积 受内存限制 受内存限制(可设MAXLEN)
延迟能力 实时 实时 原生支持延迟 原生不支持延迟
消费者组 不支持 不支持 不支持 原生支持
Pending管理 XPENDING/XCLAIM
吞吐量 高(10万+/s) 极高(50万+/s) 中(1万+/s) 高(10万+/s)
实现复杂度 ★☆☆☆☆ ★☆☆☆☆ ★★☆☆☆ ★★★★☆
可靠性 极低

5.2 选型建议

你的需求是什么?
│
├─ 实时通知、配置下发、聊天广播
│  → Pub/Sub
│  优势:零延迟,所有订阅者同时收到
│  代价:消息可能丢失,不支持回溯
│
├─ 简单的异步任务队列,消息不能丢
│  → List + BRPOP + 备份队列
│  优势:实现简单,天然FIFO
│  代价:没有ACK,需要自己处理失败重试
│
├─ 延迟任务(订单超时、定时提醒)
│  → ZSet延迟队列
│  优势:原生支持延迟,不依赖额外组件
│  代价:需要轮询,大量任务时有性能开销
│
├─ 真正需要可靠消息队列的场景
│  → Stream
│  优势:ACK、消费者组、消息回溯,接近专业MQ
│  代价:实现复杂度较高,需要Redis 5.0+
│
└─ 需要事务消息、严格顺序、海量堆积
   → 别用Redis了,上Kafka/RabbitMQ吧
   原因:专业MQ在这些方面有不可替代的优势

六、Redis作为MQ与Kafka/RabbitMQ的横向对比

6.1 定位对比

Kafka:
  ┌─────────────────────────────────────────────┐
  │ 定位:分布式流处理平台                         │
  │ 优势:海量数据(百万QPS)、持久化到磁盘、分区并行   │
  │ 生态:Kafka Connect、KSQL、Kafka Streams       │
  │ 场景:日志收集、埋点、大数据管道                │
  │ 部署:需要ZooKeeper/KRaft + 多节点集群          │
  └─────────────────────────────────────────────┘

RabbitMQ:
  ┌─────────────────────────────────────────────┐
  │ 定位:企业级消息中间件                         │
  │ 优势:AMQP标准、灵活路由、事务消息              │
  │ 生态:管理后台、联邦/Shovel插件、死信队列        │
  │ 场景:订单系统、支付回调、微服务异步通信          │
  │ 部署:Erlang环境 + RabbitMQ集群                │
  └─────────────────────────────────────────────┘

Redis(作为MQ):
  ┌─────────────────────────────────────────────┐
  │ 定位:轻量级消息队列                           │
  │ 优势:部署极简(你本来就装了Redis)、亚毫秒延迟    │
  │ 场景:中小规模异步任务、缓存失效通知、轻量解耦    │
  │ 部署:单个Redis实例(你可能已经跑着了)          │
  │ 局限:内存限制、持久化靠RDB/AOF、无事务消息      │
  └─────────────────────────────────────────────┘

6.2 关键能力对比

能力 Redis(Stream) Kafka RabbitMQ
消息确认(ACK) 支持 支持 支持
消息持久化 RDB/AOF(异步) 磁盘(同步) 磁盘+内存
堆积容量 受内存限制(GB级) 无限(磁盘,TB级) 受磁盘限制
严格有序 单Stream保证 分区内保证 队列内保证
事务消息 不支持 有限支持 完整支持
延迟消息 需ZSet配合 不支持原生 插件支持
死信队列 需手动实现 需手动实现 原生支持
运维复杂度 极低
最低延迟 <1ms 2-10ms 1-5ms
最大吞吐(单节点) 10万+/s 百万+/s 5万+/s

6.3 Redis作为MQ的局限性

Redis做MQ,以下场景请三思:

  1. 严格的消息顺序保证
     → Redis Stream只保证单个Stream内的顺序
     → 分布式环境下还有主从延迟问题

  2. 海量堆积(TB级别)
     → Redis数据在内存中,堆积=内存爆炸
     → Kafka靠磁盘,TB级别堆积毫无压力

  3. 事务消息(发消息 + 数据库操作 = 原子性)
     → Redis的事务不支持回滚
     → RabbitMQ有Channel事务和Publisher Confirm

  4. 严格的发布-确认模型
     → 生产者的消息必须到达且不丢失
     → Kafka和RabbitMQ都有成熟的确认机制

  5. 复杂的路由规则
     → 按topic+tags+headers路由
     → RabbitMQ的Exchange体系更成熟

  6. 企业级运维要求
     → 监控大盘、消息追踪、死信处理、审计日志
     → 专业MQ开箱即用,Redis需要大量自研

⚠️ 注意:一个常见误区是"Redis速度快,用它做MQ肯定也快"。速度快不代表适合。消息队列的核心诉求是可靠有序,而Redis的核心优势是快速简单。用百米短跑选手去跑马拉松,姿势可能帅,但未必能跑完全程。

七、总结:什么时候用Redis做MQ?

给你一份"Redis MQ使用决策树":

决策树:
  你的日均消息量 < 1000万 ?
  ├─ YES →
  │   你的消息可以接受丢失吗?
  │   ├─ YES → Pub/Sub
  │   └─ NO →
  │       你的消息需要延迟投递吗?
  │       ├─ YES → ZSet延迟队列
  │       └─ NO →
  │           你需要消费者组和消息重试吗?
  │           ├─ NO → List + BRPOP
  │           └─ YES → Stream
  │
  └─ NO → 
      你需要事务消息、海量堆积或复杂路由?
      ├─ 至少一项YES → 上Kafka/RabbitMQ
      └─ 都不需要 → Stream(做好容量规划)

Redis用好了可以做很多事,但永远记住:合适的工具解决合适的问题。如果你只需要一天发几百条短信通知,别大费周章部署Kafka。反过来,如果你在做电商核心交易链路,别拿Redis Stream去碰运气——该用的中间件还是得用。

下一篇文章,我们将深入Redis Stream的每一个细节——消费者组、ACK机制、消息重试、死信处理,敬请期待。


上一篇【第65篇】缓存穿透/击穿/雪崩——三大缓存坑的解决方案
下一篇【第67篇】Redis Stream——终于有了真正的消息队列


Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐