【Redis从入门到精通】第67篇:Redis Stream——终于有了真正的消息队列
上一篇【第66篇】消息队列——用Redis实现轻量级MQ的四种方案
下一篇【第68篇】HyperLogLog——用极小内存统计超大基数
Redis作者antirez(在还没退休的时候)曾经说:“Stream是Redis有史以来最复杂的命令集。”
开发者:“终于啊!再也不用拿List冒充消息队列了。”
运维:“所以我们以后还要单独部署Kafka吗?”
架构师:“看情况。Stream长得像Kafka,但它骨子里还是Redis。”
如果说之前的List、Pub/Sub、ZSet都只是"借用"Redis来做消息队列,那Stream就是Redis官方亲自下场,直接给你造了一个消息队列数据结构。它在设计上向Kafka致敬,但保持了Redis的轻量和低延迟基因。本文将深入Stream的每一个毛孔,让你从"会用"到"懂为什么这么设计"。
一、Redis Stream的设计背景
1.1 为什么Redis 5.0要引入Stream?
在Stream出现之前,用Redis做消息队列是一场"拼凑游戏":
Redis做MQ的"史前时代":
方式1:List(BRPOP)
┌─────────────┐
│ 能干活 │ 缺点:
│ 但要自己搞ACK │ - 消息确认需要自己实现
│ 要自己搞重试 │ - 不能回溯历史消息
└─────────────┘ - 多消费者没法按组消费
方式2:Pub/Sub
┌─────────────┐
│ 能广播 │ 缺点:
│ 但消息不会持久 │ - 掉线期间消息全丢
│ 离线全丢 │ - 没有消费者组概念
└─────────────┘
方式3:ZSet
┌─────────────┐
│ 能做延迟队列 │ 缺点:
│ 但要自己轮询 │ - 高频轮询浪费资源
└─────────────┘ - 没有原生ACK
antirez在设计Stream时,同时参考了Kafka的分区日志和Redis的简洁哲学,最终选择了一种"追加日志+消费者组"的结构。它不是Kafka的替代品,而是填补了"想要消息队列但不想引入重量级中间件"这个需求空白。
1.2 Stream的数据模型
从宏观上看,一个Stream就是一个**只追加(append-only)**的日志文件:
Stream = 一个逻辑上的消息序列
─────────────────────────────────────────────────────────►
时间轴
[msg1] [msg2] [msg3] [msg4] [msg5] [msg6] [msg7] [msg8] ...
│ │ │ │ │ │ │ │
│ │ │ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼
每条消息包含:
┌─────────────────────────────┐
│ ID: 1716700000000-0 │ ← 毫秒时间戳-序号
│ 字段1: value1 │
│ 字段2: value2 │ ← 键值对(KV格式)
│ 字段3: value3 │
└─────────────────────────────┘
关键特性:
✓ 消息追加到末尾,不修改不删除(除非手动DEL/XTRIM)
✓ ID保证了全局顺序
✓ 不同消费者以不同进度消费同一条Stream
✓ 消费者组可以追踪每个消费者的进度
二、Stream ID:时间与序号的精妙编码
2.1 ID格式
Stream的每条消息都有一个全局唯一的ID,格式为:
Stream ID = <millisecondsTime>-<sequenceNumber>
示例:1716700000000-0
┌──────────────┐ ┌┐
│ 1716700000000 │0│
└──────┬───────┘ └┤
│ │
UNIX毫秒时间戳 序列号(同毫秒内的微调)
(2024-05-26 (从0开始自增)
06:00:00.000)
这个设计有两个精妙之处:
- 时间可排序:毫秒时间戳本身就保证了大致的时间顺序
- 同毫秒防冲突:序列号保证同一毫秒内的多条消息ID不重复
# ID生成规则演示
# 使用 * 让Redis自动生成ID
127.0.0.1:6379> XADD mystream * name "Tom" age "25"
"1716700000000-0"
# 1ms后的消息
127.0.0.1:6379> XADD mystream * name "Jerry" age "30"
"1716700000001-0" # 时间戳自动+1
# 同1ms内多条消息
127.0.0.1:6379> XADD mystream * msg "a" # 序列号=0
"1716700000002-0"
127.0.0.1:6379> XADD mystream * msg "b" # 序列号=1
"1716700000002-1"
127.0.0.1:6379> XADD mystream * msg "c" # 序列号=2
"1716700000002-2"
# 也可以手动指定ID(不推荐,除非有特殊需求)
127.0.0.1:6379> XADD mystream 1716700000003-0 msg "manual"
"1716700000003-0"
2.2 ID的约束规则
ID规则:
1. 新ID必须大于Stream中当前最大ID
→ 保证消息严格追加、不会插入到历史位置
→ 如果客户端指定了一个小于等于最大ID的值,会报错
2. 时间戳部分必须 >= 当前最大ID的时间戳
→ 防止时钟回拨造成ID错乱
3. 如果时间戳相同,序列号必须 > 同时间戳的最大序列号
→ 保证同一毫秒内的严格顺序
4. 最小ID:0-0(XRANGE用 - 表示)
5. 最大ID:理论上无穷(XRANGE用 + 表示)
三、基础命令:写入与读取
3.1 XADD:添加消息
# 基本语法
XADD stream_key [MAXLEN ~ count] [NOMKSTREAM] <ID> field value [field value ...]
# 添加一条消息
127.0.0.1:6379> XADD orders * order_id "1001" user_id "U001" amount "99.9"
"1716700000000-0"
# 查看Stream长度
127.0.0.1:6379> XLEN orders
(integer) 1
# 限制Stream长度(约等于最近1000条,trim效率优化)
127.0.0.1:6379> XADD orders MAXLEN ~ 1000 * order_id "1002" amount "58.0"
"1716700000001-0"
MAXLEN ~中的~表示"近似裁剪"——Redis可能在Stream节点层面做性能优化,实际保留的消息数可能略多于1000条。如果要求精确,去掉~即可,但性能会下降。
3.2 XRANGE / XREVRANGE:范围查询
# 查询所有消息(-表示最小ID,+表示最大ID)
127.0.0.1:6379> XRANGE orders - + COUNT 5
1) 1) "1716700000000-0"
2) 1) "order_id"
2) "1001"
3) "user_id"
4) "U001"
5) "amount"
6) "99.9"
2) 1) "1716700000001-0"
2) 1) "order_id"
2) "1002"
3) "amount"
4) "58.0"
# 查询指定时间范围
127.0.0.1:6379> XRANGE orders 1716700000000 1716700001000 COUNT 100
# 反向查询(最新的在前)
127.0.0.1:6379> XREVRANGE orders + - COUNT 10
3.3 XREAD:读取消息(非消费者组模式)
# 从最早的消息开始读取
127.0.0.1:6379> XREAD COUNT 2 STREAMS orders 0-0
# └─ 起始ID:0-0=最早
# 阻塞读取新消息(等5秒)
127.0.0.1:6379> XREAD BLOCK 5000 STREAMS orders $
# └─ 起始ID:$=只读最新
# 同时读取多个Stream
127.0.0.1:6379> XREAD COUNT 5 BLOCK 0 STREAMS orders payments $
XREAD的两种模式:
模式1:从头读(指定ID如0-0或特定ID)
用途:消费者首次启动,回溯历史消息
模式2:读新消息(使用$)
用途:消费者只关心后续消息,不关心历史
注意:XREAD不是消费者组模式!
它只是"读取"消息,不会追踪消费进度。
适合一次性消费或手动管理进度的场景。
四、消费者组:Stream的"大师级"功能
4.1 消费者组模型
消费者组是Stream区别于List/PubSub的核心特性。它解决了多消费者协同消费的问题:
消费者组工作模型:
Stream: orders
┌──────────────────────────────────────────────────┐
│ msg1 msg2 msg3 msg4 msg5 msg6 msg7 ... │
└──────────────────┬───────────────────────────────┘
│
消费者组: "order-processors"
│
┌────────────┼────────────┐
│ │ │
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│Consumer1│ │Consumer2│ │Consumer3│
│ 消费msg1 │ │ 消费msg2 │ │ 消费msg3 │
│ 消费msg4 │ │ 消费msg5 │ │ 消费msg6 │
│ 消费msg7 │ │ ... │ │ ... │
└─────────┘ └─────────┘ └─────────┘
│ │ │
└── 每个消费者维护自己的 ─┘
消费进度(PENDING列表)
关键规则:
✓ 同一条消息只被组内一个消费者消费
✓ 不同消费者组可以独立消费同一条Stream
✓ 组内消费者可以动态增减(弹性伸缩)
4.2 消费者组命令详解
# === 1. 创建消费者组 ===
# XGROUP CREATE stream group id [MKSTREAM]
127.0.0.1:6379> XGROUP CREATE orders order-processors 0-0 MKSTREAM
# └─ 组名 └─ 从最早的ID开始消费
# └─ 如果Stream不存在则创建
# 注意:id=0-0 从头消费;id=$ 只消费新消息
# === 2. 消费者组读取消息 ===
# XREADGROUP GROUP group consumer [COUNT n] [BLOCK ms] STREAMS key id
127.0.0.1:6379> XREADGROUP GROUP order-processors consumer-1 COUNT 2 STREAMS orders >
# └─ 组名 └─ 消费者名 └─ > 表示"没确认过的消息"
# (也接受指定ID和0)
# > : 只发送从未交付给任何消费者的新消息
# 0 : 发送该消费者pending中但未确认的消息
# 指定ID : 从特定ID开始读取
# === 3. 确认消息 ===
# XACK stream group id [id ...]
127.0.0.1:6379> XACK orders order-processors 1716700000000-0
(integer) 1 # 确认成功
# === 4. 查看Pending列表 ===
127.0.0.1:6379> XPENDING orders order-processors
1) (integer) 5 # Pending消息总数
2) "1716700000000-0" # 最小Pending ID
3) "1716700000010-0" # 最大Pending ID
4) 1) 1) "consumer-1" # 每个消费者的Pending数量
2) "3"
2) 1) "consumer-2"
2) "2"
# 详细查看Pending消息
127.0.0.1:6379> XPENDING orders order-processors - + 10
1) 1) "1716700000000-0" # ID
2) "consumer-1" # 消费者
3) (integer) 60000 # 空闲时间(毫秒)
4) (integer) 3 # 被投递次数
# === 5. 转移消息 ===
# XCLAIM stream group consumer min-idle-time id [id ...]
127.0.0.1:6379> XCLAIM orders order-processors consumer-2 60000 1716700000000-0
# └─ 组名 └─ 目标消费者 └─ 空闲阈值 └─ 消息ID
# 含义:将空闲超过60秒的消息从consumer-1转移给consumer-2
4.3 消费者组的完整工作流
消息从生产到消费的完整生命周期:
1. 生产者发送消息
XADD orders * order_id "1001" amount "99.9"
→ 消息进入Stream,状态: 未分配
2. 消费者拉取消息
XREADGROUP GROUP order-processors worker-1 STREAMS orders >
→ 消息分配给worker-1,进入Pending列表
→ 状态: 待确认(pending)
3. 消费者处理成功
业务逻辑处理...
XACK orders order-processors "1716700000000-0"
→ 消息从Pending中移除
→ 状态: 已确认(done)
4. 消费者处理失败(或者挂了)
consumer-1 获取消息后崩溃(没有XACK)
→ 消息留在consumer-1的Pending列表中
→ 空闲时间持续增长
→ 状态: 挂起(stuck)
5. 其他消费者认领
XPENDING 发现某消息空闲超过阈值
XCLAIM orders order-processors worker-2 60000 "1716700000000-0"
→ 消息转移给worker-2,投递次数+1
→ 状态: 重新分配
五、Java实战:Spring Data Redis操作Stream
5.1 生产者代码
@Component
public class StreamProducer {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String STREAM_KEY = "orders";
// 发送消息
public String sendMessage(Map<String, String> fields) {
// 构建Record
MapRecord<String, String, String> record = StreamRecords
.newRecord()
.ofMap(fields)
.withStreamKey(STREAM_KEY);
// XADD,自动分配ID
RecordId id = redisTemplate.opsForStream().add(record);
// 限制Stream长度(防止内存溢出)
redisTemplate.opsForStream()
.trim(STREAM_KEY, 10000); // 保留最新1万条
return id.getValue(); // 返回 "1716700000000-0"
}
// 批量发送
public void sendBatch(List<Map<String, String>> messages) {
messages.forEach(this::sendMessage);
}
}
5.2 消费者代码(消费者组模式)
@Component
public class StreamConsumer {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String STREAM_KEY = "orders";
private static final String GROUP_NAME = "order-processors";
private static final String CONSUMER_NAME = "worker-" + UUID.randomUUID().toString().substring(0, 8);
@PostConstruct
public void init() {
// 创建消费者组(如果不存在)
try {
redisTemplate.opsForStream().createGroup(STREAM_KEY, GROUP_NAME);
} catch (RedisSystemException e) {
// 组已存在,忽略
if (!e.getMessage().contains("BUSYGROUP")) {
throw e;
}
}
// 启动消费循环
startConsuming();
}
public void startConsuming() {
new Thread(() -> {
while (true) {
try {
// 先处理Pending消息(之前没ACK的)
processPending();
// 再拉取新消息
List<MapRecord<String, Object, Object>> messages =
redisTemplate.opsForStream()
.read(Consumer.from(GROUP_NAME, CONSUMER_NAME),
StreamReadOptions.empty().count(10).block(Duration.ofSeconds(5)),
StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()));
for (MapRecord<String, Object, Object> message : messages) {
try {
// 处理业务逻辑
processOrder(message.getValue());
// 确认消息
redisTemplate.opsForStream()
.acknowledge(STREAM_KEY, GROUP_NAME, message.getId());
} catch (Exception e) {
log.error("消息处理失败: {}", message.getId(), e);
// 不ACK,等待XCLAIM转移或重试
}
}
} catch (Exception e) {
log.error("消费循环异常", e);
try { Thread.sleep(1000); } catch (InterruptedException ie) {}
}
}
}, "stream-consumer").start();
}
// 处理Pending消息(兜底机制)
private void processPending() {
PendingMessages pendingMessages = redisTemplate.opsForStream()
.pending(STREAM_KEY, GROUP_NAME,
PendingMessagesOptions.empty().range(Range.unbounded()).count(10));
for (PendingMessage msg : pendingMessages) {
// 空闲超过60秒的消息,重新处理
if (msg.getElapsedTimeSinceLastDelivery().compareTo(Duration.ofSeconds(60)) > 0) {
List<MapRecord<String, Object, Object>> claimed =
redisTemplate.opsForStream()
.claim(STREAM_KEY, GROUP_NAME, CONSUMER_NAME,
Duration.ofSeconds(60),
RecordId.of(msg.getIdAsString()));
for (MapRecord<String, Object, Object> record : claimed) {
try {
processOrder(record.getValue());
redisTemplate.opsForStream()
.acknowledge(STREAM_KEY, GROUP_NAME, record.getId());
} catch (Exception e) {
log.error("Pending消息重试失败: {}", record.getId(), e);
}
}
}
}
}
}
⚠️ 注意:Spring Data Redis 3.x与2.x的Stream API变化较大。上面用的是3.x的API。如果你还在用2.x,需要参考对应版本的文档,核心逻辑是一样的,只是类名和方法签名略有不同。
六、Stream vs Kafka:相似与不同
6.1 架构对比
Redis Stream:
┌────────────────────────────────────────────┐
│ Redis 单节点(或Cluster) │
│ │
│ Stream: orders 消费者组: proc-1 │
│ ┌─────────────────┐ ┌──────┐ ┌──────┐ │
│ │ msg1 msg2 msg3 │ │Consumer1│Consumer2│ │
│ └─────────────────┘ └─────────┘ │
│ 外部存储 │
└────────────────────────────────────────────┘
优势:部署简单,低延迟
劣势:单Stream,无分区,受单机内存限制
Kafka:
┌────────────────────────────────────────────┐
│ Kafka Broker集群 │
│ │
│ Topic: orders (3分区) │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Partition 0 │ │ Partition 1 │ ... │
│ │ 磁盘持久化 │ │ 磁盘持久化 │ │
│ └─────────────┘ └─────────────┘ │
│ 磁盘 = TB级存储 │
│ 消费者组 = 跨分区消费 │
└────────────────────────────────────────────┘
优势:海量堆积、分区并行、持久化到磁盘
劣势:运维复杂、延迟略高
6.2 关键差异
| 维度 | Redis Stream | Kafka |
|---|---|---|
| 分区/分片 | 无原生分区 | 分区是核心概念 |
| 存储 | 内存(RDB/AOF备份) | 磁盘(顺序写入) |
| 消息删除 | MAXLEN/XTRIM/手动DEL | 按时间或大小自动清理 |
| 单Stream吞吐 | 10万+/s | 百万+/s |
| 延迟 | <1ms | 2-10ms |
| 消费者进度 | 保存在Redis中 | 保存在Kafka内部topic |
| 消息回溯 | 支持(只要没被trim) | 支持(可配置保留时间) |
| 部署成本 | 低(你可能已有Redis) | 高(需独立集群) |
6.3 XTRIM:控制Stream长度
# 精确裁剪:保留最近1000条
127.0.0.1:6379> XTRIM orders MAXLEN 1000
# 近似裁剪:性能更高,但可能多保留几十条
127.0.0.1:6379> XTRIM orders MAXLEN ~ 1000
# 按最小ID裁剪:删除小于指定ID的消息
127.0.0.1:6379> XTRIM orders MINID 1716700001000-0
# XADD时同时限制长度(一步到位)
127.0.0.1:6379> XADD orders MAXLEN ~ 1000 * field value
⚠️ 注意:XTRIM会删除消息!如果有消费者还没消费到这些消息,消息就永久丢失了。在设置MAXLEN时要确保所有消费者组都能在消息被trim之前消费完毕。建议给Stream预留足够的长度余量,比如你日均消费1000条,设置MAXLEN 5000比1000更安全。
七、总结
Redis Stream让Redis真正成为了一款"可以当消息队列用"的数据结构服务器。它的核心价值在于:
- 零额外成本:你很可能已经在用Redis了,Stream不需要新的部署
- 消费者组:解决了多消费者协同消费的难题
- 消息ACK:消费确认机制保障了消息可靠投递
- 低延迟:内存操作,延迟可以做到亚毫秒级
但它不是Kafka的替代品。面对TB级堆积、分区并行消费、严格顺序保证等场景,专业的消息中间件仍然是最佳选择。
记住一个判断标准:如果你的团队已经在维护Kafka/RabbitMQ,就别折腾Stream了;如果你没有MQ,且消息量不大(百万级/天以下),Stream是性价比最高的选择。
下一篇文章,我们将聊聊HyperLogLog——一个用12KB内存就能统计1亿用户的神奇数据结构。
上一篇【第66篇】消息队列——用Redis实现轻量级MQ的四种方案
下一篇【第68篇】HyperLogLog——用极小内存统计超大基数
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)