上一篇【第66篇】消息队列——用Redis实现轻量级MQ的四种方案
下一篇【第68篇】HyperLogLog——用极小内存统计超大基数


Redis作者antirez(在还没退休的时候)曾经说:“Stream是Redis有史以来最复杂的命令集。”
开发者:“终于啊!再也不用拿List冒充消息队列了。”
运维:“所以我们以后还要单独部署Kafka吗?”
架构师:“看情况。Stream长得像Kafka,但它骨子里还是Redis。”

如果说之前的List、Pub/Sub、ZSet都只是"借用"Redis来做消息队列,那Stream就是Redis官方亲自下场,直接给你造了一个消息队列数据结构。它在设计上向Kafka致敬,但保持了Redis的轻量和低延迟基因。本文将深入Stream的每一个毛孔,让你从"会用"到"懂为什么这么设计"。

一、Redis Stream的设计背景

1.1 为什么Redis 5.0要引入Stream?

在Stream出现之前,用Redis做消息队列是一场"拼凑游戏":

Redis做MQ的"史前时代":

  方式1:List(BRPOP)
  ┌─────────────┐
  │ 能干活        │  缺点:
  │ 但要自己搞ACK  │  - 消息确认需要自己实现
  │ 要自己搞重试   │  - 不能回溯历史消息
  └─────────────┘  - 多消费者没法按组消费

  方式2:Pub/Sub
  ┌─────────────┐
  │ 能广播        │  缺点:
  │ 但消息不会持久  │  - 掉线期间消息全丢
  │ 离线全丢       │  - 没有消费者组概念
  └─────────────┘

  方式3:ZSet
  ┌─────────────┐
  │ 能做延迟队列   │  缺点:
  │ 但要自己轮询   │  - 高频轮询浪费资源
  └─────────────┘  - 没有原生ACK

antirez在设计Stream时,同时参考了Kafka的分区日志和Redis的简洁哲学,最终选择了一种"追加日志+消费者组"的结构。它不是Kafka的替代品,而是填补了"想要消息队列但不想引入重量级中间件"这个需求空白。

1.2 Stream的数据模型

从宏观上看,一个Stream就是一个**只追加(append-only)**的日志文件:

Stream = 一个逻辑上的消息序列
  ─────────────────────────────────────────────────────────►
  时间轴

  [msg1] [msg2] [msg3] [msg4] [msg5] [msg6] [msg7] [msg8] ...
    │       │      │      │      │      │      │      │
    │       │      │      │      │      │      │      │
    ▼       ▼      ▼      ▼      ▼      ▼      ▼      ▼

  每条消息包含:
  ┌─────────────────────────────┐
  │ ID: 1716700000000-0         │ ← 毫秒时间戳-序号
  │ 字段1: value1               │
  │ 字段2: value2               │  ← 键值对(KV格式)
  │ 字段3: value3               │
  └─────────────────────────────┘

  关键特性:
  ✓ 消息追加到末尾,不修改不删除(除非手动DEL/XTRIM)
  ✓ ID保证了全局顺序
  ✓ 不同消费者以不同进度消费同一条Stream
  ✓ 消费者组可以追踪每个消费者的进度

二、Stream ID:时间与序号的精妙编码

2.1 ID格式

Stream的每条消息都有一个全局唯一的ID,格式为:

Stream ID = <millisecondsTime>-<sequenceNumber>

  示例:1716700000000-0

  ┌──────────────┐ ┌┐
  │ 1716700000000 │0│
  └──────┬───────┘ └┤
         │           │
    UNIX毫秒时间戳    序列号(同毫秒内的微调)
    (2024-05-26      (从0开始自增)
     06:00:00.000)

这个设计有两个精妙之处:

  1. 时间可排序:毫秒时间戳本身就保证了大致的时间顺序
  2. 同毫秒防冲突:序列号保证同一毫秒内的多条消息ID不重复
# ID生成规则演示

# 使用 * 让Redis自动生成ID
127.0.0.1:6379> XADD mystream * name "Tom" age "25"
"1716700000000-0"

# 1ms后的消息
127.0.0.1:6379> XADD mystream * name "Jerry" age "30"
"1716700000001-0"  # 时间戳自动+1

# 同1ms内多条消息
127.0.0.1:6379> XADD mystream * msg "a"  # 序列号=0
"1716700000002-0"
127.0.0.1:6379> XADD mystream * msg "b"  # 序列号=1
"1716700000002-1"
127.0.0.1:6379> XADD mystream * msg "c"  # 序列号=2
"1716700000002-2"

# 也可以手动指定ID(不推荐,除非有特殊需求)
127.0.0.1:6379> XADD mystream 1716700000003-0 msg "manual"
"1716700000003-0"

2.2 ID的约束规则

ID规则:
  1. 新ID必须大于Stream中当前最大ID
     → 保证消息严格追加、不会插入到历史位置
     → 如果客户端指定了一个小于等于最大ID的值,会报错

  2. 时间戳部分必须 >= 当前最大ID的时间戳
     → 防止时钟回拨造成ID错乱

  3. 如果时间戳相同,序列号必须 > 同时间戳的最大序列号
     → 保证同一毫秒内的严格顺序

  4. 最小ID:0-0(XRANGE用 - 表示)
  5. 最大ID:理论上无穷(XRANGE用 + 表示)

三、基础命令:写入与读取

3.1 XADD:添加消息

# 基本语法
XADD stream_key [MAXLEN ~ count] [NOMKSTREAM] <ID> field value [field value ...]

# 添加一条消息
127.0.0.1:6379> XADD orders * order_id "1001" user_id "U001" amount "99.9"
"1716700000000-0"

# 查看Stream长度
127.0.0.1:6379> XLEN orders
(integer) 1

# 限制Stream长度(约等于最近1000条,trim效率优化)
127.0.0.1:6379> XADD orders MAXLEN ~ 1000 * order_id "1002" amount "58.0"
"1716700000001-0"

MAXLEN ~中的~表示"近似裁剪"——Redis可能在Stream节点层面做性能优化,实际保留的消息数可能略多于1000条。如果要求精确,去掉~即可,但性能会下降。

3.2 XRANGE / XREVRANGE:范围查询

# 查询所有消息(-表示最小ID,+表示最大ID)
127.0.0.1:6379> XRANGE orders - + COUNT 5
1) 1) "1716700000000-0"
   2) 1) "order_id"
      2) "1001"
      3) "user_id"
      4) "U001"
      5) "amount"
      6) "99.9"
2) 1) "1716700000001-0"
   2) 1) "order_id"
      2) "1002"
      3) "amount"
      4) "58.0"

# 查询指定时间范围
127.0.0.1:6379> XRANGE orders 1716700000000 1716700001000 COUNT 100

# 反向查询(最新的在前)
127.0.0.1:6379> XREVRANGE orders + - COUNT 10

3.3 XREAD:读取消息(非消费者组模式)

# 从最早的消息开始读取
127.0.0.1:6379> XREAD COUNT 2 STREAMS orders 0-0
#                                        └─ 起始ID:0-0=最早

# 阻塞读取新消息(等5秒)
127.0.0.1:6379> XREAD BLOCK 5000 STREAMS orders $
#                                         └─ 起始ID:$=只读最新

# 同时读取多个Stream
127.0.0.1:6379> XREAD COUNT 5 BLOCK 0 STREAMS orders payments $
XREAD的两种模式:

  模式1:从头读(指定ID如0-0或特定ID)
    用途:消费者首次启动,回溯历史消息
  
  模式2:读新消息(使用$)
    用途:消费者只关心后续消息,不关心历史
  
  注意:XREAD不是消费者组模式!
  它只是"读取"消息,不会追踪消费进度。
  适合一次性消费或手动管理进度的场景。

四、消费者组:Stream的"大师级"功能

4.1 消费者组模型

消费者组是Stream区别于List/PubSub的核心特性。它解决了多消费者协同消费的问题:

消费者组工作模型:

  Stream: orders
  ┌──────────────────────────────────────────────────┐
  │  msg1  msg2  msg3  msg4  msg5  msg6  msg7  ...  │
  └──────────────────┬───────────────────────────────┘
                     │
           消费者组: "order-processors"
                     │
        ┌────────────┼────────────┐
        │            │            │
        ▼            ▼            ▼
   ┌─────────┐ ┌─────────┐ ┌─────────┐
   │Consumer1│ │Consumer2│ │Consumer3│
   │ 消费msg1 │ │ 消费msg2 │ │ 消费msg3 │
   │ 消费msg4 │ │ 消费msg5 │ │ 消费msg6 │
   │ 消费msg7 │ │  ...    │ │  ...    │
   └─────────┘ └─────────┘ └─────────┘
        │            │            │
        └── 每个消费者维护自己的 ─┘
              消费进度(PENDING列表)

  关键规则:
  ✓ 同一条消息只被组内一个消费者消费
  ✓ 不同消费者组可以独立消费同一条Stream
  ✓ 组内消费者可以动态增减(弹性伸缩)

4.2 消费者组命令详解

# === 1. 创建消费者组 ===
# XGROUP CREATE stream group id [MKSTREAM]
127.0.0.1:6379> XGROUP CREATE orders order-processors 0-0 MKSTREAM
#                                   └─ 组名    └─ 从最早的ID开始消费
#                                                      └─ 如果Stream不存在则创建

# 注意:id=0-0 从头消费;id=$ 只消费新消息

# === 2. 消费者组读取消息 ===
# XREADGROUP GROUP group consumer [COUNT n] [BLOCK ms] STREAMS key id
127.0.0.1:6379> XREADGROUP GROUP order-processors consumer-1 COUNT 2 STREAMS orders >
#                                     └─ 组名    └─ 消费者名            └─ > 表示"没确认过的消息"
#                                                                          (也接受指定ID和0)
# > : 只发送从未交付给任何消费者的新消息
# 0 : 发送该消费者pending中但未确认的消息
# 指定ID : 从特定ID开始读取

# === 3. 确认消息 ===
# XACK stream group id [id ...]
127.0.0.1:6379> XACK orders order-processors 1716700000000-0
(integer) 1  # 确认成功

# === 4. 查看Pending列表 ===
127.0.0.1:6379> XPENDING orders order-processors
1) (integer) 5         # Pending消息总数
2) "1716700000000-0"  # 最小Pending ID
3) "1716700000010-0"  # 最大Pending ID
4) 1) 1) "consumer-1"  # 每个消费者的Pending数量
      2) "3"
   2) 1) "consumer-2"
      2) "2"

# 详细查看Pending消息
127.0.0.1:6379> XPENDING orders order-processors - + 10
1) 1) "1716700000000-0"   # ID
   2) "consumer-1"         # 消费者
   3) (integer) 60000      # 空闲时间(毫秒)
   4) (integer) 3          # 被投递次数

# === 5. 转移消息 ===
# XCLAIM stream group consumer min-idle-time id [id ...]
127.0.0.1:6379> XCLAIM orders order-processors consumer-2 60000 1716700000000-0
#                       └─ 组名  └─ 目标消费者 └─ 空闲阈值 └─ 消息ID
# 含义:将空闲超过60秒的消息从consumer-1转移给consumer-2

4.3 消费者组的完整工作流

消息从生产到消费的完整生命周期:

  1. 生产者发送消息
     XADD orders * order_id "1001" amount "99.9"
     → 消息进入Stream,状态: 未分配

  2. 消费者拉取消息
     XREADGROUP GROUP order-processors worker-1 STREAMS orders >
     → 消息分配给worker-1,进入Pending列表
     → 状态: 待确认(pending)

  3. 消费者处理成功
     业务逻辑处理...
     XACK orders order-processors "1716700000000-0"
     → 消息从Pending中移除
     → 状态: 已确认(done)

  4. 消费者处理失败(或者挂了)
     consumer-1 获取消息后崩溃(没有XACK)
     → 消息留在consumer-1的Pending列表中
     → 空闲时间持续增长
     → 状态: 挂起(stuck)

  5. 其他消费者认领
     XPENDING 发现某消息空闲超过阈值
     XCLAIM orders order-processors worker-2 60000 "1716700000000-0"
     → 消息转移给worker-2,投递次数+1
     → 状态: 重新分配

五、Java实战:Spring Data Redis操作Stream

5.1 生产者代码

@Component
public class StreamProducer {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    private static final String STREAM_KEY = "orders";
    
    // 发送消息
    public String sendMessage(Map<String, String> fields) {
        // 构建Record
        MapRecord<String, String, String> record = StreamRecords
                .newRecord()
                .ofMap(fields)
                .withStreamKey(STREAM_KEY);
        
        // XADD,自动分配ID
        RecordId id = redisTemplate.opsForStream().add(record);
        
        // 限制Stream长度(防止内存溢出)
        redisTemplate.opsForStream()
                .trim(STREAM_KEY, 10000); // 保留最新1万条
        
        return id.getValue(); // 返回 "1716700000000-0"
    }
    
    // 批量发送
    public void sendBatch(List<Map<String, String>> messages) {
        messages.forEach(this::sendMessage);
    }
}

5.2 消费者代码(消费者组模式)

@Component
public class StreamConsumer {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    private static final String STREAM_KEY = "orders";
    private static final String GROUP_NAME = "order-processors";
    private static final String CONSUMER_NAME = "worker-" + UUID.randomUUID().toString().substring(0, 8);
    
    @PostConstruct
    public void init() {
        // 创建消费者组(如果不存在)
        try {
            redisTemplate.opsForStream().createGroup(STREAM_KEY, GROUP_NAME);
        } catch (RedisSystemException e) {
            // 组已存在,忽略
            if (!e.getMessage().contains("BUSYGROUP")) {
                throw e;
            }
        }
        // 启动消费循环
        startConsuming();
    }
    
    public void startConsuming() {
        new Thread(() -> {
            while (true) {
                try {
                    // 先处理Pending消息(之前没ACK的)
                    processPending();
                    
                    // 再拉取新消息
                    List<MapRecord<String, Object, Object>> messages = 
                        redisTemplate.opsForStream()
                            .read(Consumer.from(GROUP_NAME, CONSUMER_NAME),
                                  StreamReadOptions.empty().count(10).block(Duration.ofSeconds(5)),
                                  StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()));
                    
                    for (MapRecord<String, Object, Object> message : messages) {
                        try {
                            // 处理业务逻辑
                            processOrder(message.getValue());
                            
                            // 确认消息
                            redisTemplate.opsForStream()
                                .acknowledge(STREAM_KEY, GROUP_NAME, message.getId());
                        } catch (Exception e) {
                            log.error("消息处理失败: {}", message.getId(), e);
                            // 不ACK,等待XCLAIM转移或重试
                        }
                    }
                } catch (Exception e) {
                    log.error("消费循环异常", e);
                    try { Thread.sleep(1000); } catch (InterruptedException ie) {}
                }
            }
        }, "stream-consumer").start();
    }
    
    // 处理Pending消息(兜底机制)
    private void processPending() {
        PendingMessages pendingMessages = redisTemplate.opsForStream()
            .pending(STREAM_KEY, GROUP_NAME, 
                     PendingMessagesOptions.empty().range(Range.unbounded()).count(10));
        
        for (PendingMessage msg : pendingMessages) {
            // 空闲超过60秒的消息,重新处理
            if (msg.getElapsedTimeSinceLastDelivery().compareTo(Duration.ofSeconds(60)) > 0) {
                List<MapRecord<String, Object, Object>> claimed = 
                    redisTemplate.opsForStream()
                        .claim(STREAM_KEY, GROUP_NAME, CONSUMER_NAME,
                               Duration.ofSeconds(60),
                               RecordId.of(msg.getIdAsString()));
                
                for (MapRecord<String, Object, Object> record : claimed) {
                    try {
                        processOrder(record.getValue());
                        redisTemplate.opsForStream()
                            .acknowledge(STREAM_KEY, GROUP_NAME, record.getId());
                    } catch (Exception e) {
                        log.error("Pending消息重试失败: {}", record.getId(), e);
                    }
                }
            }
        }
    }
}

⚠️ 注意:Spring Data Redis 3.x与2.x的Stream API变化较大。上面用的是3.x的API。如果你还在用2.x,需要参考对应版本的文档,核心逻辑是一样的,只是类名和方法签名略有不同。

六、Stream vs Kafka:相似与不同

6.1 架构对比

Redis Stream:
  ┌────────────────────────────────────────────┐
  │              Redis 单节点(或Cluster)         │
  │                                              │
  │  Stream: orders         消费者组: proc-1      │
  │  ┌─────────────────┐   ┌──────┐ ┌──────┐   │
  │  │ msg1 msg2 msg3  │   │Consumer1│Consumer2│  │
  │  └─────────────────┘   └─────────┘       │
  │                                外部存储      │
  └────────────────────────────────────────────┘

  优势:部署简单,低延迟
  劣势:单Stream,无分区,受单机内存限制

Kafka:
  ┌────────────────────────────────────────────┐
  │            Kafka Broker集群                  │
  │                                              │
  │  Topic: orders (3分区)                       │
  │  ┌─────────────┐ ┌─────────────┐            │
  │  │ Partition 0 │ │ Partition 1 │  ...       │
  │  │ 磁盘持久化    │ │ 磁盘持久化    │            │
  │  └─────────────┘ └─────────────┘            │
  │        磁盘 = TB级存储                         │
  │        消费者组 = 跨分区消费                    │
  └────────────────────────────────────────────┘

  优势:海量堆积、分区并行、持久化到磁盘
  劣势:运维复杂、延迟略高

6.2 关键差异

维度 Redis Stream Kafka
分区/分片 无原生分区 分区是核心概念
存储 内存(RDB/AOF备份) 磁盘(顺序写入)
消息删除 MAXLEN/XTRIM/手动DEL 按时间或大小自动清理
单Stream吞吐 10万+/s 百万+/s
延迟 <1ms 2-10ms
消费者进度 保存在Redis中 保存在Kafka内部topic
消息回溯 支持(只要没被trim) 支持(可配置保留时间)
部署成本 低(你可能已有Redis) 高(需独立集群)

6.3 XTRIM:控制Stream长度

# 精确裁剪:保留最近1000条
127.0.0.1:6379> XTRIM orders MAXLEN 1000

# 近似裁剪:性能更高,但可能多保留几十条
127.0.0.1:6379> XTRIM orders MAXLEN ~ 1000

# 按最小ID裁剪:删除小于指定ID的消息
127.0.0.1:6379> XTRIM orders MINID 1716700001000-0

# XADD时同时限制长度(一步到位)
127.0.0.1:6379> XADD orders MAXLEN ~ 1000 * field value

⚠️ 注意:XTRIM会删除消息!如果有消费者还没消费到这些消息,消息就永久丢失了。在设置MAXLEN时要确保所有消费者组都能在消息被trim之前消费完毕。建议给Stream预留足够的长度余量,比如你日均消费1000条,设置MAXLEN 5000比1000更安全。

七、总结

Redis Stream让Redis真正成为了一款"可以当消息队列用"的数据结构服务器。它的核心价值在于:

  1. 零额外成本:你很可能已经在用Redis了,Stream不需要新的部署
  2. 消费者组:解决了多消费者协同消费的难题
  3. 消息ACK:消费确认机制保障了消息可靠投递
  4. 低延迟:内存操作,延迟可以做到亚毫秒级

但它不是Kafka的替代品。面对TB级堆积、分区并行消费、严格顺序保证等场景,专业的消息中间件仍然是最佳选择。

记住一个判断标准:如果你的团队已经在维护Kafka/RabbitMQ,就别折腾Stream了;如果你没有MQ,且消息量不大(百万级/天以下),Stream是性价比最高的选择。

下一篇文章,我们将聊聊HyperLogLog——一个用12KB内存就能统计1亿用户的神奇数据结构。


上一篇【第66篇】消息队列——用Redis实现轻量级MQ的四种方案
下一篇【第68篇】HyperLogLog——用极小内存统计超大基数


Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐