总思路:(生产者和消费组都要)创建消费组,消费者创建线程池,用子线程创建消费组、 消费消息,最后手动ack。

以大模型解析简历并发送分析简历到消息队列为例

父类抽象

package interview.guide.common.async;

import interview.guide.common.constant.AsyncTaskStreamConstants;
import interview.guide.infrastructure.redis.RedisService;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;

/**
 * Redis Stream 生产者模板基类。
 * 统一消息发送骨架与失败处理逻辑。
 */
@Slf4j
public abstract class AbstractStreamProducer<T> {

    private final RedisService redisService;

    protected AbstractStreamProducer(RedisService redisService) {
        this.redisService = redisService;
    }

    protected void sendTask(T payload) {
        try {
            String messageId = redisService.streamAdd(
                streamKey(),
                buildMessage(payload),
                AsyncTaskStreamConstants.STREAM_MAX_LEN
            );
            log.info("{}任务已发送到Stream: {}, messageId={}",
                taskDisplayName(), payloadIdentifier(payload), messageId);
        } catch (Exception e) {
            log.error("发送{}任务失败: {}, error={}",
                taskDisplayName(), payloadIdentifier(payload), e.getMessage(), e);
            onSendFailed(payload, "任务入队失败: " + e.getMessage());
        }
    }

    protected String truncateError(String error) {
        if (error == null) {
            return null;
        }
        return error.length() > 500 ? error.substring(0, 500) : error;
    }

    protected abstract String taskDisplayName();

    protected abstract String streamKey();

    protected abstract Map<String, String> buildMessage(T payload);

    protected abstract String payloadIdentifier(T payload);

    protected abstract void onSendFailed(T payload, String error);
}

消费者:

package interview.guide.modules.resume.listener;

import interview.guide.common.async.AbstractStreamProducer;
import interview.guide.common.constant.AsyncTaskStreamConstants;
import interview.guide.common.model.AsyncTaskStatus;
import interview.guide.infrastructure.redis.RedisService;
import interview.guide.modules.resume.repository.ResumeRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * 简历分析任务生产者
 * 负责发送分析任务到 Redis Stream
 */
@Slf4j
@Component
public class AnalyzeStreamProducer extends AbstractStreamProducer<AnalyzeStreamProducer.AnalyzeTaskPayload> {

    private final ResumeRepository resumeRepository;

    record AnalyzeTaskPayload(Long resumeId, String content) {}

    public AnalyzeStreamProducer(RedisService redisService, ResumeRepository resumeRepository) {
        super(redisService);
        this.resumeRepository = resumeRepository;
    }

    /**
     * 发送分析任务到 Redis Stream
     *
     * @param resumeId 简历ID
     * @param content  简历内容
     */
    public void sendAnalyzeTask(Long resumeId, String content) {
        sendTask(new AnalyzeTaskPayload(resumeId, content));
    }

    @Override
    protected String taskDisplayName() {
        return "分析";
    }

    @Override
    protected String streamKey() {
        return AsyncTaskStreamConstants.RESUME_ANALYZE_STREAM_KEY;
    }

    @Override
    protected Map<String, String> buildMessage(AnalyzeTaskPayload payload) {
        return Map.of(
            AsyncTaskStreamConstants.FIELD_RESUME_ID, payload.resumeId().toString(),
            AsyncTaskStreamConstants.FIELD_CONTENT, payload.content(),
            AsyncTaskStreamConstants.FIELD_RETRY_COUNT, "0"
        );
    }

    @Override
    protected String payloadIdentifier(AnalyzeTaskPayload payload) {
        return "resumeId=" + payload.resumeId();
    }

    @Override
    protected void onSendFailed(AnalyzeTaskPayload payload, String error) {
        updateAnalyzeStatus(payload.resumeId(), AsyncTaskStatus.FAILED, truncateError(error));
    }

    /**
     * 更新分析状态
     */
    private void updateAnalyzeStatus(Long resumeId, AsyncTaskStatus status, String error) {
        resumeRepository.findById(resumeId).ifPresent(resume -> {
            resume.setAnalyzeStatus(status);
            if (error != null) {
                resume.setAnalyzeError(error.length() > 500 ? error.substring(0, 500) : error);
            }
            resumeRepository.save(resume);
        });
    }
}

生产者核心代码分析

消息负载定义

record AnalyzeTaskPayload(Long resumeId, String content) {}

使用Java record定义消息负载结构,包含简历ID和内容字段。

任务发送方法

public void sendAnalyzeTask(Long resumeId, String content) {
  sendTask(new AnalyzeTaskPayload(resumeId, content));
}

封装简历分析任务的发送入口,将参数转换为消息负载对象。

消息构建逻辑

@Override
protected Map<String, String> buildMessage(AnalyzeTaskPayload payload) {
  return Map.of(
    AsyncTaskStreamConstants.FIELD_RESUME_ID, payload.resumeId().toString(),
    AsyncTaskStreamConstants.FIELD_CONTENT, payload.content(),
    AsyncTaskStreamConstants.FIELD_RETRY_COUNT, "0"
  );
}

将消息负载转换为Redis Stream消息格式,包含重试次数字段初始化。

基础发送逻辑

protected void sendTask(T payload) {
  try {
    String messageId = redisService.streamAdd(
      streamKey(),
      buildMessage(payload),
      AsyncTaskStreamConstants.STREAM_MAX_LEN
    );
    log.info("{}任务已发送到Stream: {}, messageId={}",
      taskDisplayName(), payloadIdentifier(payload), messageId);
  } catch (Exception e) {
    log.error("发送{}任务失败: {}, error={}",
      taskDisplayName(), payloadIdentifier(payload), e.getMessage(), e);
    onSendFailed(payload, "任务入队失败: " + e.getMessage());
  }
}

实现消息发送核心逻辑,包含流控日志记录和异常处理。

消费者核心实现

初始化流程

@PostConstruct
public void init() {
  this.consumerName = consumerPrefix() + UUID.randomUUID().toString().substring(0, 8);
  this.executorService = new ThreadPoolExecutor(
    1, 1, 0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<>(),
    r -> {
      Thread t = new Thread(r, threadName());
      t.setDaemon(true);
      return t;
    },
    new ThreadPoolExecutor.AbortPolicy()
  );
  running.set(true);
  executorService.submit(this::startConsumer);
}

初始化消费者实例,创建单线程处理池并启动消费线程。

消费组准备

private void startConsumer() {
  try {
    redisService.createStreamGroup(streamKey(), groupName());
    log.info("Redis Stream group is ready: {}", groupName());
  } catch (Exception e) {
    log.warn("Failed to prepare Redis Stream group: groupName={}", groupName(), e);
  }
  consumeLoop();
}

确保消费组存在后进入消息处理循环。

消息处理循环

private void consumeLoop() {
  while (running.get()) {
    try {
      redisService.streamConsumeMessages(
        streamKey(),
        groupName(),
        consumerName,
        AsyncTaskStreamConstants.BATCH_SIZE,
        AsyncTaskStreamConstants.POLL_INTERVAL_MS,
        this::processMessage
      );
    } catch (Exception e) {
      log.error("Failed to consume message", e);
    }
  }
}

持续从Stream拉取消息并交给处理器。

消息处理核心

private void processMessage(StreamMessageId messageId, Map<String, String> data) {
  T payload = parsePayload(messageId, data);
  if (payload == null) {
    ackMessage(messageId);
    return;
  }
  
  int retryCount = parseRetryCount(data);
  log.info("Processing {} task: payload={}, messageId={}, retryCount={}",
    taskDisplayName(), payloadIdentifier(payload), messageId, retryCount);

  try {
    markProcessing(payload);
    processBusiness(payload);
    markCompleted(payload);
    ackMessage(messageId);
  } catch (Exception e) {
    log.error("{} task failed: {}", taskDisplayName(), payloadIdentifier(payload), e);
    if (retryCount < AsyncTaskStreamConstants.MAX_RETRY_COUNT) {
      retryMessage(payload, retryCount + 1);
    } else {
      markFailed(payload, truncateError(
        taskDisplayName() + " failed after retry " + retryCount + ": " + e.getMessage()
      ));
    }
    ackMessage(messageId);
  }
}

实现消息处理全生命周期管理,包含状态标记、业务处理、重试机制。

    关键交互流程

    生产者侧流程

    1. 构建标准格式消息负载
    2. 通过streamAdd写入Redis Stream
    3. 记录发送状态日志

    消费者侧流程

    1. 初始化消费组和线程池
    2. 持续消费Stream消息
    3. 处理成功后发送ACK确认
    4. 失败时执行重试策略

    异常处理机制

    • 网络异常自动重连
    • 业务异常触发重试
    • 达到最大重试次数标记失败

    性能优化点

    消息处理优化

    • 单线程顺序处理保证消息有序性
    • 异步标记处理状态减少IO等待
    • 合理的批次大小和轮询间隔

    资源管理

    • 守护线程设计避免服务中断阻塞
    • 显式ACK机制防止消息丢失
    • 流长度控制避免内存溢出

    监控指标

    • 发送成功率监控
    • 消费延迟监控
    • 重试次数统计
    • 消息积压告警

    个人理解

    生产者使用stream.add,发送消息到stream,消费者抽象父类,使用@PostConstruct初始化创建线程池,并使用new LinkedBlockingQueue<>()阻塞队列,执行startConsumer方法,消费者实现:先通过redissonClient.getStream(streamKey, StringCodec.INSTANCE);获取stram,stream.createGroup(StreamCreateGroupArgs.name(groupName).makeStream());创建消费组。再执行consumeLoop()方法,是一个whiel循环,判断条件为抽象父类使用AtomicBoolean保证线程中全部消费者线程同一执行或停止

    redissonClient.getStream(streamKey, StringCodec.INSTANCE);
    获取Map<StreamMessageId, Map<String, String>> messages= stream.readGroup(                 groupName,
                    consumerName,                 
                    StreamReadGroupArgs
                    .neverDelivered()                     
                    .count(count)
                    .timeout(Duration.ofMillis(blockTimeoutMs))
    );

    获取消息,再执行消费逻辑,redisService.streamAck(streamKey(), groupName(), messageId);手动ack消息。

    生产者发送消息

    生产者端负责将业务数据封装并推送至指定的 Stream 中。

    • 发送机制:调用 stream.add 方法,将消息体发送至目标 Stream。

    ️ 消费者抽象父类

    作为所有具体消费者的基类,负责统一的线程管理与生命周期控制。

    • 初始化配置:使用 @PostConstruct 注解,在 Bean 加载完成后自动执行初始化逻辑。
    • 线程池构建:创建专用的单线程池(ThreadPoolExecutor)用于执行后台消费任务。
    • 任务队列:使用 new LinkedBlockingQueue<>() 作为阻塞队列,用于存放待执行的消费任务。
    • 启动消费:初始化完成后,向线程池提交 startConsumer 方法,启动具体的消费逻辑。
    • 状态控制:定义 AtomicBoolean running 标志位,确保在多线程环境下,所有消费者线程能够统一、安全地执行或停止。

    ️ 消费者具体实现

    继承自抽象父类,负责具体的 Stream 监听与业务处理。

    1. 获取 Stream 实例:通过 redissonClient.getStream(streamKey, StringCodec.INSTANCE) 获取指定 Stream 的操作句柄。
    2. 创建消费组:执行 stream.createGroup,并传入 StreamCreateGroupArgs.name(groupName).makeStream(),确保消费组(Consumer Group)已就绪。
    3. 进入消费循环:执行 consumeLoop() 方法,该方法内部是一个 while 循环,循环条件依赖于父类的 AtomicBoolean 状态。
    4. 拉取消息:在循环中调用 stream.readGroup 拉取待处理消息。配置参数包括:
      • 指定消费组名(groupName)与消费者名(consumerName)。
      • 使用 StreamReadGroupArgs.neverDelivered() 确保只读取未交付的新消息。
      • 设置单次拉取数量(count)与阻塞超时时间(timeout)。
    5. 执行业务与确认:遍历获取到的 Map<StreamMessageId, Map<String, String>> 消息集合,执行具体的业务消费逻辑。处理成功后,调用 redisService.streamAck 手动确认(ACK)消息,防止消息重复消费。
    Logo

    AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

    更多推荐