redis stream实现消息队列
总思路:(生产者和消费组都要)创建消费组,消费者创建线程池,用子线程创建消费组、 消费消息,最后手动ack。
以大模型解析简历并发送分析简历到消息队列为例
父类抽象
package interview.guide.common.async;
import interview.guide.common.constant.AsyncTaskStreamConstants;
import interview.guide.infrastructure.redis.RedisService;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
/**
* Redis Stream 生产者模板基类。
* 统一消息发送骨架与失败处理逻辑。
*/
@Slf4j
public abstract class AbstractStreamProducer<T> {
private final RedisService redisService;
protected AbstractStreamProducer(RedisService redisService) {
this.redisService = redisService;
}
protected void sendTask(T payload) {
try {
String messageId = redisService.streamAdd(
streamKey(),
buildMessage(payload),
AsyncTaskStreamConstants.STREAM_MAX_LEN
);
log.info("{}任务已发送到Stream: {}, messageId={}",
taskDisplayName(), payloadIdentifier(payload), messageId);
} catch (Exception e) {
log.error("发送{}任务失败: {}, error={}",
taskDisplayName(), payloadIdentifier(payload), e.getMessage(), e);
onSendFailed(payload, "任务入队失败: " + e.getMessage());
}
}
protected String truncateError(String error) {
if (error == null) {
return null;
}
return error.length() > 500 ? error.substring(0, 500) : error;
}
protected abstract String taskDisplayName();
protected abstract String streamKey();
protected abstract Map<String, String> buildMessage(T payload);
protected abstract String payloadIdentifier(T payload);
protected abstract void onSendFailed(T payload, String error);
}
消费者:
package interview.guide.modules.resume.listener;
import interview.guide.common.async.AbstractStreamProducer;
import interview.guide.common.constant.AsyncTaskStreamConstants;
import interview.guide.common.model.AsyncTaskStatus;
import interview.guide.infrastructure.redis.RedisService;
import interview.guide.modules.resume.repository.ResumeRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* 简历分析任务生产者
* 负责发送分析任务到 Redis Stream
*/
@Slf4j
@Component
public class AnalyzeStreamProducer extends AbstractStreamProducer<AnalyzeStreamProducer.AnalyzeTaskPayload> {
private final ResumeRepository resumeRepository;
record AnalyzeTaskPayload(Long resumeId, String content) {}
public AnalyzeStreamProducer(RedisService redisService, ResumeRepository resumeRepository) {
super(redisService);
this.resumeRepository = resumeRepository;
}
/**
* 发送分析任务到 Redis Stream
*
* @param resumeId 简历ID
* @param content 简历内容
*/
public void sendAnalyzeTask(Long resumeId, String content) {
sendTask(new AnalyzeTaskPayload(resumeId, content));
}
@Override
protected String taskDisplayName() {
return "分析";
}
@Override
protected String streamKey() {
return AsyncTaskStreamConstants.RESUME_ANALYZE_STREAM_KEY;
}
@Override
protected Map<String, String> buildMessage(AnalyzeTaskPayload payload) {
return Map.of(
AsyncTaskStreamConstants.FIELD_RESUME_ID, payload.resumeId().toString(),
AsyncTaskStreamConstants.FIELD_CONTENT, payload.content(),
AsyncTaskStreamConstants.FIELD_RETRY_COUNT, "0"
);
}
@Override
protected String payloadIdentifier(AnalyzeTaskPayload payload) {
return "resumeId=" + payload.resumeId();
}
@Override
protected void onSendFailed(AnalyzeTaskPayload payload, String error) {
updateAnalyzeStatus(payload.resumeId(), AsyncTaskStatus.FAILED, truncateError(error));
}
/**
* 更新分析状态
*/
private void updateAnalyzeStatus(Long resumeId, AsyncTaskStatus status, String error) {
resumeRepository.findById(resumeId).ifPresent(resume -> {
resume.setAnalyzeStatus(status);
if (error != null) {
resume.setAnalyzeError(error.length() > 500 ? error.substring(0, 500) : error);
}
resumeRepository.save(resume);
});
}
}
生产者核心代码分析
消息负载定义
record AnalyzeTaskPayload(Long resumeId, String content) {}
使用Java record定义消息负载结构,包含简历ID和内容字段。
任务发送方法
public void sendAnalyzeTask(Long resumeId, String content) {
sendTask(new AnalyzeTaskPayload(resumeId, content));
}
封装简历分析任务的发送入口,将参数转换为消息负载对象。
消息构建逻辑
@Override
protected Map<String, String> buildMessage(AnalyzeTaskPayload payload) {
return Map.of(
AsyncTaskStreamConstants.FIELD_RESUME_ID, payload.resumeId().toString(),
AsyncTaskStreamConstants.FIELD_CONTENT, payload.content(),
AsyncTaskStreamConstants.FIELD_RETRY_COUNT, "0"
);
}
将消息负载转换为Redis Stream消息格式,包含重试次数字段初始化。
基础发送逻辑
protected void sendTask(T payload) {
try {
String messageId = redisService.streamAdd(
streamKey(),
buildMessage(payload),
AsyncTaskStreamConstants.STREAM_MAX_LEN
);
log.info("{}任务已发送到Stream: {}, messageId={}",
taskDisplayName(), payloadIdentifier(payload), messageId);
} catch (Exception e) {
log.error("发送{}任务失败: {}, error={}",
taskDisplayName(), payloadIdentifier(payload), e.getMessage(), e);
onSendFailed(payload, "任务入队失败: " + e.getMessage());
}
}
实现消息发送核心逻辑,包含流控日志记录和异常处理。
消费者核心实现
初始化流程
@PostConstruct
public void init() {
this.consumerName = consumerPrefix() + UUID.randomUUID().toString().substring(0, 8);
this.executorService = new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
r -> {
Thread t = new Thread(r, threadName());
t.setDaemon(true);
return t;
},
new ThreadPoolExecutor.AbortPolicy()
);
running.set(true);
executorService.submit(this::startConsumer);
}
初始化消费者实例,创建单线程处理池并启动消费线程。
消费组准备
private void startConsumer() {
try {
redisService.createStreamGroup(streamKey(), groupName());
log.info("Redis Stream group is ready: {}", groupName());
} catch (Exception e) {
log.warn("Failed to prepare Redis Stream group: groupName={}", groupName(), e);
}
consumeLoop();
}
确保消费组存在后进入消息处理循环。
消息处理循环
private void consumeLoop() {
while (running.get()) {
try {
redisService.streamConsumeMessages(
streamKey(),
groupName(),
consumerName,
AsyncTaskStreamConstants.BATCH_SIZE,
AsyncTaskStreamConstants.POLL_INTERVAL_MS,
this::processMessage
);
} catch (Exception e) {
log.error("Failed to consume message", e);
}
}
}
持续从Stream拉取消息并交给处理器。
消息处理核心
private void processMessage(StreamMessageId messageId, Map<String, String> data) {
T payload = parsePayload(messageId, data);
if (payload == null) {
ackMessage(messageId);
return;
}
int retryCount = parseRetryCount(data);
log.info("Processing {} task: payload={}, messageId={}, retryCount={}",
taskDisplayName(), payloadIdentifier(payload), messageId, retryCount);
try {
markProcessing(payload);
processBusiness(payload);
markCompleted(payload);
ackMessage(messageId);
} catch (Exception e) {
log.error("{} task failed: {}", taskDisplayName(), payloadIdentifier(payload), e);
if (retryCount < AsyncTaskStreamConstants.MAX_RETRY_COUNT) {
retryMessage(payload, retryCount + 1);
} else {
markFailed(payload, truncateError(
taskDisplayName() + " failed after retry " + retryCount + ": " + e.getMessage()
));
}
ackMessage(messageId);
}
}
实现消息处理全生命周期管理,包含状态标记、业务处理、重试机制。
关键交互流程
生产者侧流程
- 构建标准格式消息负载
- 通过streamAdd写入Redis Stream
- 记录发送状态日志
消费者侧流程
- 初始化消费组和线程池
- 持续消费Stream消息
- 处理成功后发送ACK确认
- 失败时执行重试策略
异常处理机制
- 网络异常自动重连
- 业务异常触发重试
- 达到最大重试次数标记失败
性能优化点
消息处理优化
- 单线程顺序处理保证消息有序性
- 异步标记处理状态减少IO等待
- 合理的批次大小和轮询间隔
资源管理
- 守护线程设计避免服务中断阻塞
- 显式ACK机制防止消息丢失
- 流长度控制避免内存溢出
监控指标
- 发送成功率监控
- 消费延迟监控
- 重试次数统计
- 消息积压告警
个人理解
生产者使用stream.add,发送消息到stream,消费者抽象父类,使用@PostConstruct初始化创建线程池,并使用new LinkedBlockingQueue<>()阻塞队列,执行startConsumer方法,消费者实现:先通过redissonClient.getStream(streamKey, StringCodec.INSTANCE);获取stram,stream.createGroup(StreamCreateGroupArgs.name(groupName).makeStream());创建消费组。再执行consumeLoop()方法,是一个whiel循环,判断条件为抽象父类使用AtomicBoolean保证线程中全部消费者线程同一执行或停止
redissonClient.getStream(streamKey, StringCodec.INSTANCE);
获取Map<StreamMessageId, Map<String, String>> messages= stream.readGroup( groupName,
consumerName,
StreamReadGroupArgs
.neverDelivered()
.count(count)
.timeout(Duration.ofMillis(blockTimeoutMs))
);
获取消息,再执行消费逻辑,redisService.streamAck(streamKey(), groupName(), messageId);手动ack消息。
生产者发送消息
生产者端负责将业务数据封装并推送至指定的 Stream 中。
- 发送机制:调用
stream.add方法,将消息体发送至目标 Stream。
️ 消费者抽象父类
作为所有具体消费者的基类,负责统一的线程管理与生命周期控制。
- 初始化配置:使用
@PostConstruct注解,在 Bean 加载完成后自动执行初始化逻辑。 - 线程池构建:创建专用的单线程池(
ThreadPoolExecutor)用于执行后台消费任务。 - 任务队列:使用
new LinkedBlockingQueue<>()作为阻塞队列,用于存放待执行的消费任务。 - 启动消费:初始化完成后,向线程池提交
startConsumer方法,启动具体的消费逻辑。 - 状态控制:定义
AtomicBoolean running标志位,确保在多线程环境下,所有消费者线程能够统一、安全地执行或停止。
️ 消费者具体实现
继承自抽象父类,负责具体的 Stream 监听与业务处理。
- 获取 Stream 实例:通过
redissonClient.getStream(streamKey, StringCodec.INSTANCE)获取指定 Stream 的操作句柄。 - 创建消费组:执行
stream.createGroup,并传入StreamCreateGroupArgs.name(groupName).makeStream(),确保消费组(Consumer Group)已就绪。 - 进入消费循环:执行
consumeLoop()方法,该方法内部是一个while循环,循环条件依赖于父类的AtomicBoolean状态。 - 拉取消息:在循环中调用
stream.readGroup拉取待处理消息。配置参数包括:- 指定消费组名(
groupName)与消费者名(consumerName)。 - 使用
StreamReadGroupArgs.neverDelivered()确保只读取未交付的新消息。 - 设置单次拉取数量(
count)与阻塞超时时间(timeout)。
- 指定消费组名(
- 执行业务与确认:遍历获取到的
Map<StreamMessageId, Map<String, String>>消息集合,执行具体的业务消费逻辑。处理成功后,调用redisService.streamAck手动确认(ACK)消息,防止消息重复消费。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)