大家好,在现在的ai项目中,我们经常涉及大量的长耗时操作,比如在ai音视频处理项目中,我们就要调用asr语音转文字API和大模型API生成视频总结,如果让前端同步等待20-30秒,用户早已关闭页面。
为了实现“上传即返回,后台稳如狗”的体验,我们可以引入RocketMQ。

一、 异步任务场景

假设现有一个音视频智能分析平台,核心功能是:

场景 说明 耗时
视频音频提取 基于 FFmpeg 从上传视频中分离提取音频流 30-60 秒
语音 ASR 转写 将音频完整转写为文本 2-5 分钟
AI 智能总结 调用大模型对转写文本进行智能总结 6-20 秒
知识库构建 对总结文本进行向量化处理,并存储至向量数据库中 8-35 秒

用户感知到的流程:上传 → 等待几分钟 → 看到 AI 总结和测验题目。

实际的系统链路

用户上传视频
    │
    ▼
[HTTP 接口] ──► 保存文件到 MinIO(1秒)
    │
    ▼
[发送 MQ] ──► 立即返回:"处理中,请稍候"
    │
    ▼
[RocketMQ Broker]
    │
    ├──► [消费者 1] FFmpeg 提取音频(30-60秒)
    │
    ├──► [消费者 2] ASR 语音转写(2-5分钟)
    │
    ├──► [消费者 3] AI 分析总结(1-3分钟)
    │
    └──► [消费者 4] 知识库向量化(30秒)

关键挑战:整个处理链路耗时 3-10 分钟,涉及 4 个耗时环节3 个外部 AI 服务


二、不用 RocketMQ 的"噩梦架构"

假设我们不用消息队列,直接同步处理:

// 伪代码:同步处理版本(灾难版)
@PostMapping("/upload")
public Result upload(MultipartFile file) {
    // 1. 保存文件
    MediaEntity media = saveFile(file);
    
    // 2. 提取音频(阻塞 30-60 秒)
    String audioPath = ffmpegService.extractAudio(media);
    
    // 3. 语音转写(阻塞 2-5 分钟)
    String transcript = asrService.transcribe(audioPath);
    
    // 4. AI 分析(阻塞 1-3 分钟)
    AnalysisResult result = aiService.analyze(transcript);
    
    // 5. 保存结果
    saveAnalysisResult(media, result);
    
    return Result.success(result); // 用户等了 5-8 分钟!
}

2.1 同步架构的四大致命问题

问题 现象 后果
HTTP 超时 Nginx/网关默认 60s 超时 大文件处理直接报错
资源耗尽 10 人同时上传,10 个 FFmpeg 进程 CPU 100%,系统卡死
级联失败 AI 接口超时 → 整个请求失败 用户需重新上传,体验极差
无法扩展 新增"视频标签生成"功能 必须修改上传接口,耦合严重

真实场景:一个 500MB 的视频,FFmpeg 提取音频要 1 分钟,ASR 转写要 2 分钟,AI 分析要 2 分钟。用户盯着浏览器等了 5 分钟,最后 AI 接口超时,一切重来。


三、引入 RocketMQ 后的架构蜕变

┌─────────────────────────────────────────────────────────────┐
│                     同步架构(无 MQ)                        │
│                                                             │
│   用户 ──► 上传 ──► [FFmpeg] ──► [ASR] ──► [AI] ──► 结果     │
│              ↑                              ↑               │
│              └──────────── 阻塞 5-10 分钟 ───┘               │
│                                                             │
│   问题:HTTP 超时、资源耗尽、级联失败、无法扩展                │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│                    异步架构(有 MQ)                         │
│                                                             │
│   用户 ──► 上传 ──► [保存文件] ──► 返回"处理中"(1 秒内)     │
│                              │                              │
│                              ▼                              │
│                        [RocketMQ]                           │
│                              │                              │
│              ┌───────────────┼───────────────┐              │
│              ▼               ▼               ▼              │
│         [消费者 1]      [消费者 2]      [消费者 3]           │
│         提取音频        语音转写        AI 分析              │
│         (1 分钟)        (3 分钟)        (2 分钟)             │
│                                                             │
│   优势:快速响应、削峰填谷、失败重试、独立扩展                 │
└─────────────────────────────────────────────────────────────┘

架构流程图:

已有任务执行中

获取成功

用户上传视频 / 链接

MediaUploadService

保存文件到 MinIO / S3

保存媒体记录到 DB

创建 MediaProcessTask
状态=PENDING

AnalyzeRocketMQProducer

RocketMQ
Topic: video-analysis-topic
Tag: analyze-task

AnalyzeRocketMQConsumer

获取 Redisson 分布式锁

忽略重复消费

更新任务状态
RUNNING / ANALYZING

MediaSummaryService

AI 转写

AI 摘要

生成章节 / 标签 / 内容分析

保存分析结果到 MediaAnalysis 表

更新 MediaFile.summaryStatus = SUCCESS

更新 MediaProcessTask = SUCCESS

通知前端 SSE 推送

前端刷新详情页

是否需要构建知识库?

VectorizeRocketMQProducer

RocketMQ
Topic: knowledgebase-vectorize-topic
Tag: vectorize

VectorizeRocketMQConsumer

KnowledgeIndexBuildService

切片 transcript / summary

向量化 Embedding

写入 Chroma / 知识库

更新 KnowledgeBase 状态 COMPLETED

用户发起答题

QuizSessionService 创建答题会话

QuizEvaluateRocketMQProducer

RocketMQ
Topic: quiz-evaluate-topic
Tag: evaluate

QuizEvaluateRocketMQConsumer

更新评估状态 PROCESSING

QuizEvaluationService

AI 批改答案

生成 QuizReport

更新 QuizSession 状态 EVALUATED / SUCCESS

前端查看测评结果

状态流转图:

创建任务 / 保存数据库记录

RocketMQ Consumer 开始消费

MQ 发送失败

用户取消任务

业务执行完成

调用 AI / ASR / 向量库失败

超过最大重试次数

人工终止任务

RocketMQ 重新投递

重试次数耗尽

某次重试成功

PENDING

PROCESSING

FAILED

CANCELED

SUCCESS

RETRYING

PENDING :任务已创建,等待消费
PROCESSING:后台正在执行 AI 处理
RETRYING:执行失败,系统自动重试
SUCCESS:处理完成,通知前端展示结果
FAILED:重试耗尽,任务失败
CANCELED:用户 / 人工取消,终止任务

四、RocketMQ 在项目中的四大核心价值

4.1 价值一:异步解耦,用户体验质变

实现方式asyncSend 异步发送

// AbstractRocketMQProducer.java 核心代码
public void sendTask(T payload) {
    String mqDestination = getDestination(); // topic:tag
    
    // 异步发送:不阻塞主线程
    rocketMQTemplate.asyncSend(mqDestination, buildMessage(payload), new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            log.info("任务发送成功,消息ID:{}", sendResult.getMsgId());
        }
        @Override
        public void onException(Throwable e) {
            // 发送失败兜底:更新数据库状态为 FAILED
            onSendFailed(payload, truncateError(e.getMessage()));
        }
    });
}

关键设计:即使 MQ 发送失败,也有 onSendFailed 回调更新数据库状态,用户刷新页面能看到"处理失败",可手动重试。


4.2 价值二:削峰填谷,保护系统资源

场景:1000 个用户同时上传视频

流量特征:
- 上传请求瞬时峰值:1000 QPS(持续 10 秒)
- 实际处理能力:FFmpeg 最多 4 并发,ASR 接口限流 10 QPS

无 MQ 时:
1000 个请求 ──► 1000 个 FFmpeg 进程启动 ──► 系统 OOM 崩溃

有 MQ 时:
1000 个请求 ──► 快速入队(Broker 缓冲)──► 消费者按能力消费
                │
                └── 消费者线程池:core=2, max=4, queue=50
                └── FFmpeg 最多 4 个并发,系统稳定

线程池配置

@Configuration
@EnableAsync
public class AsyncExecutorConfig {
    @Bean("mediaProcessExecutor")
    public Executor mediaProcessExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);      // 核心 2 线程
        executor.setMaxPoolSize(4);       // 最大 4 线程(保护 FFmpeg)
        executor.setQueueCapacity(50);    // 队列缓冲 50 任务
        executor.setThreadNamePrefix("media-process-");
        return executor;
    }
}

4.3 价值三:可靠投递,失败自动重试

三级可靠性保障

层级 机制 实现
Broker 级 消息持久化 + 主从复制 RocketMQ 默认配置
发送级 发送失败回调兜底 onSendFailed 更新数据库
消费级 业务失败重试 3 次 抛异常触发 RocketMQ 重试

消费端重试代码

@Slf4j
@Component
@RocketMQMessageListener(
    topic = "video-analysis-topic",
    selectorExpression = "analyze-task",
    consumerGroup = "media-analyze-consumer-group"
)
public class AnalyzeRocketMQConsumer implements RocketMQListener<Map<String, String>> {
    
    private static final int MAX_RETRY_COUNT = 3;
    
    @Override
    public void onMessage(Map<String, String> message) {
        int retryCount = parseRetryCount(message);
        AnalyzePayload payload = parsePayload(message);
        
        try {
            markProcessing(payload);        // 状态:PROCESSING
            long costMs = processBusiness(payload);  // 执行业务
            markCompleted(payload);          // 状态:SUCCESS
            
        } catch (Exception e) {
            if (retryCount < MAX_RETRY_COUNT) {
                // 抛异常 → RocketMQ 自动重试(间隔递增:1s, 5s, 10s)
                throw new RuntimeException("触发重试", e);
            } else {
                // 重试耗尽 → 标记失败,用户可手动重试
                markFailed(payload, "已重试" + retryCount + "次: " + e.getMessage());
            }
        }
    }
}

重试策略可视化

第 1 次消费 ──► 失败 ──► 1 秒后重试
第 2 次消费 ──► 失败 ──► 5 秒后重试  
第 3 次消费 ──► 失败 ──► 10 秒后重试
第 4 次消费 ──► 失败 ──► 进入死信队列(DLQ)/ 标记 FAILED

4.4 价值四:独立扩展,功能自由叠加

当前消费者组

业务 Topic Tag Consumer Group 职责
媒体分析 video-analysis-topic analyze-task media-analyze-consumer-group 转写 + AI 总结
知识库构建 knowledgebase-vectorize-topic vectorize knowledgebase-vectorize-consumer-group 向量化入库
测验评估 quiz-evaluate-topic evaluate quiz-evaluate-consumer-group AI 批改答案

扩展场景:新增"智能封面生成"功能

// 新增生产者(无需改上传接口)
@Component
public class CoverGenerateProducer extends AbstractRocketMQProducer<CoverPayload> {
    @Override protected String topic() { return "cover-generate-topic"; }
    @Override protected String tag() { return "generate"; }
}

// 新增消费者(独立部署,互不影响)
@RocketMQMessageListener(
    topic = "cover-generate-topic",
    selectorExpression = "generate",
    consumerGroup = "cover-generate-consumer-group"
)
public class CoverGenerateConsumer implements RocketMQListener<Map<String, String>> {
    // 调用 Stable Diffusion / DALL-E 生成封面
}

价值:上传接口完全不用改,新增功能独立开发、独立部署、独立扩缩容。


五、关键设计细节

5.1 问题一:异步重度依赖 MQ,消息积压、消费者消费不过来怎么监控和处理?

在 项目中,异步链路完全依赖 RocketMQ,一旦出现消息积压(消费者消费速度低于生产者发送速度),会导致用户任务处理延迟、超时,严重影响体验。核心解决思路是「监控先行,分层处理」,具体落地方案如下:

5.1.1 消息积压监控(提前预警,避免爆发)

基于 RocketMQ 自带监控能力 + 自定义告警,实现全链路监控,重点关注 3 个核心指标:

表格

监控指标 说明 阈值建议
队列堆积数 每个 Topic 的每个 Queue 中未消费消息数 消费者最大并发数 × 10(如 4 并发则设 40)
消费堆积时间 最早未消费消息的存储时间 30 分钟(超过则用户任务延迟明显)
消费速率 消费者每秒消费消息数 低于发送速率的 80% 时触发预警

实现方式:

  1. 利用 RocketMQ Console 可视化监控,实时查看队列堆积、消费速率等指标;

  2. 通过 RocketMQ 提供的 API 自定义监控采集,代码示例:

// 采集队列堆积数示例
@Autowired
private MQAdminExt mqAdminExt;

public void collectQueueBacklog() throws MQClientException {
    String topic = "video-analysis-topic";
    List<MessageQueue> queues = mqAdminExt.fetchMessageQueuesInTopic(topic);
    
    for (MessageQueue queue : queues) {
        // 获取队列最大偏移量(已发送消息数)
        long maxOffset = mqAdminExt.maxOffset(queue);
        // 获取队列已消费偏移量(已处理消息数)
        long consumeOffset = mqAdminExt.getConsumerOffset(
            queue.getTopic(), 
            queue.getConsumerGroup()
        );
        // 堆积数 = 最大偏移量 - 已消费偏移量
        long backlog = maxOffset - consumeOffset;
        
        // 触发告警(如钉钉、企业微信通知)
        if (backlog > 40) {
            alarmService.sendAlarm(
                "MQ消息积压告警", 
                "Topic:" + topic + ", Queue:" + queue.getQueueId() + ", 堆积数:" + backlog
            );
        }
    }
}

5.1.2 消息积压处理

根据积压程度,分 3 级 处理,避免小问题扩大为系统故障:

表格

级别 积压范围 处理策略
轻度积压 堆积数 < 100 扩容消费者并发数,动态调整线程池配置(无需重启服务)
中度积压 100 ≤ 堆积数 < 1000 部署临时消费者节点,新增相同 Consumer Group 的消费者分担压力
重度积压 堆积数 ≥ 1000 暂停非核心生产者,优先处理核心任务;同时优化消费逻辑

轻度积压处理代码示例:

// 动态调整消费者线程池(核心代码)
@Autowired
private ThreadPoolTaskExecutor mediaProcessExecutor;

public void scaleConsumerThreadPool(int coreSize, int maxSize) {
    mediaProcessExecutor.setCorePoolSize(coreSize);  // 动态增加核心线程
    mediaProcessExecutor.setMaxPoolSize(maxSize);    // 动态增加最大线程
}

说明: 中度积压时,RocketMQ 会自动负载均衡,将 Queue 分配给新增节点;重度积压时需优先保障核心链路(音频提取、ASR 转写)。


5.2 问题二:使用 RocketMQ 做业务解耦,视频上传多步骤是否需要顺序处理?如何实现 RocketMQ 消息顺序消费?

核心结论: 视频上传的多步骤(音频提取 → ASR 转写 → AI 总结 → 知识库构建)必须保证顺序处理,但无需全局顺序,只需保证「同一视频」的步骤顺序即可;不同视频的步骤可并行处理,兼顾顺序性和吞吐量。

5.2.1 为什么必须保证顺序?

视频处理的多步骤存在强依赖关系

音频提取 → ASR 转写 → AI 总结 → 知识库构建
   ↑           ↑          ↑
未完成则无法进行下一步,若顺序错乱会导致:
- 任务失败(如用未转写完成的空文本调用 AI 接口)
- 产生脏数据

5.2.2 RocketMQ 顺序消费实现方案

RocketMQ 顺序消费的核心是「同一 Sharding Key 的消息进入同一 Queue,单线程消费」,结合项目场景,最优实现方案如下:

步骤 实现要点 具体说明
1. 选择 Sharding Key mediaId(视频 ID)作为 Sharding Key 确保同一视频的所有步骤消息进入同一 Queue;不同视频的消息可进入不同 Queue,实现并行处理
2. 生产者发送顺序消息 发送消息时指定 Sharding Key msg.setKeys(mediaId) 或使用顺序消息 API
3. 消费者配置顺序消费 设置消费模式为 ORDERLY 确保同一 Queue 单线程消费
4. 优化:避免单 Queue 阻塞 Topic 配置多个 Queue(建议 8 个) RocketMQ 根据 mediaId 哈希值分配 Queue,避免单个视频长时间任务阻塞全局处理;同时设置消费超时时间,防止单个任务卡住 Queue

5.3 问题三:RocketMQ 分片处理视频、保证顺序一致,会对 AI 解析总结结果产生影响吗?

核心结论: 只要保证「同一视频」的处理步骤顺序一致,RocketMQ 分片处理(多 Queue 并行)不会对 AI 解析总结结果产生任何影响;反而能提升整体处理效率,避免单视频处理阻塞全局。

5.3.1 分片处理的核心逻辑(不影响顺序)

┌─────────────────────────────────────────────────────────┐
│                    RocketMQ 分片处理架构                 │
├─────────────────────────────────────────────────────────┤
│  Queue 1 (mediaId=hash(A))  →  视频A: 提取→转写→总结→构建  │
│  Queue 2 (mediaId=hash(B))  →  视频B: 提取→转写→总结→构建  │
│  Queue 3 (mediaId=hash(C))  →  视频C: 提取→转写→总结→构建  │
│  ...                                                    │
│  同一 mediaId 始终分配到同一个 Queue,保证步骤顺序          │
│  不同 Queue 之间并行处理,互不影响                         │
└─────────────────────────────────────────────────────────┘
  • 分片依据:通过 mediaId 的哈希值分配 Queue,同一 mediaId 始终分配到同一个 Queue,保证其步骤顺序;

  • 并行逻辑:不同 Queue 之间并行处理,互不影响(如 Queue1 处理视频A的步骤,Queue2 处理视频B的步骤);

  • 顺序保证:同一 Queue 内的消息单线程消费,确保视频A的「音频提取 → ASR 转写 → AI 总结」步骤依次执行。

5.3.2 对 AI 解析总结结果无影响的关键原因

原因 说明
数据隔离 每个视频的处理数据(音频文件、转写文本、AI 总结结果)都通过 mediaId 关联,存储在独立的数据库记录中,分片处理不会导致数据混淆
顺序闭环 AI 解析总结依赖的前置数据(ASR 转写文本),会在顺序消费中确保已完成生成,不会出现「AI 总结先执行、转写未完成」的情况
无状态消费 AI 解析总结是无状态操作,仅依赖当前视频的转写文本,与其他视频的处理进度、分片位置无关,不会因分片并行产生干扰

5.3.3 潜在风险与规避方案

⚠️ 唯一潜在风险:若 Queue 分配不均(如某 Queue 分配的视频数量过多、单个视频处理耗时过长),可能导致该 Queue 内的视频 AI 总结延迟,但不会影响结果准确性

规避方案:

  1. 合理配置 Queue 数量:建议等于消费者节点数 × 2,确保负载均衡;

  2. 长耗时视频隔离:对超过 10 分钟的视频单独分配 Topic 或 Queue,避免阻塞普通视频;

  3. 实时监控调整:通过监控实时关注各 Queue 的处理进度,及时调整消费者并发。


六、替代方案对比:为什么选 RocketMQ?

方案 优点 缺点 适用场景
Spring @Async 简单,无外部依赖 应用重启任务丢失、无重试、无削峰 轻量级异步
数据库轮询 简单可靠 延迟高(秒级)、数据库压力大 低频任务
Redis 队列 轻量、高性能 无消费确认、无重试、无轨迹 简单队列
Kafka 吞吐极高、生态丰富 运维重、延迟较高、Spring 集成较复杂 日志、大数据
RocketMQ 功能完备、延迟低、Spring 集成好 社区较 Kafka 小 金融级业务消息

项目选择 RocketMQ 的关键原因

  1. 功能匹配:事务消息、顺序消息、延时消息、消息轨迹,未来都可能用到
  2. 延迟敏感:AI 分析任务需要低延迟调度(Kafka 更适合高吞吐日志)
  3. Spring 生态rocketmq-spring-boot-starter 注解驱动,开发效率高
  4. 阿里云原生:若上云,无缝对接阿里云 RocketMQ 托管服务

七、总结:RocketMQ 重构前后对比

┌─────────────────────────────────────────────────────────────┐
│  没有 RocketMQ 的 项目                                       │
│  ├── 用户上传 500MB 视频,等待 8 分钟,最后超时失败            │
│  ├── 10 人同时上传,服务器 CPU 100%,全部失败                 │
│  ├── AI 接口 1 次超时,整个任务重来,用户体验极差              │
│  └── 新增"智能封面"功能,必须修改上传接口,回归测试两周        │
├─────────────────────────────────────────────────────────────┤
│  有 RocketMQ 的 项目                                         │
│  ├── 用户 1 秒得到响应,关闭页面,稍后收到完成通知            │
│  ├── 1000 人同时上传,系统稳如泰山,按序处理                  │
│  ├── AI 接口失败自动重试 3 次,最终失败用户可一键重试         │
│  └── 新增功能独立扩展,2 天开发上线,零影响现有功能            │
└─────────────────────────────────────────────────────────────┘

RocketMQ 在 项目中的定位:不是"锦上添花",而是支撑整个 AI 处理链路的骨架。没有它,这个系统无法从"Demo 级"进化到"生产级"。


参考代码

完整项目代码结构:

src/main/java/com/example/videoseek/
├── common/
│   ├── async/
│   │   └── AbstractRocketMQProducer.java      # 通用生产者抽象
│   └── constant/
│       └── AsyncTaskStreamConstants.java      # MQ 常量配置
├── modules/
│   ├── media/
│   │   └── listener/
│   │       ├── AnalyzeRocketMQProducer.java   # 媒体分析生产者
│   │       └── AnalyzeRocketMQConsumer.java   # 媒体分析消费者
│   ├── knowledgebase/
│   │   └── listener/
│   │       ├── VectorizeRocketMQProducer.java # 向量化生产者
│   │       └── VectorizeRocketMQConsumer.java # 向量化消费者
│   └── quiz/
│       └── listener/
│           ├── QuizEvaluateRocketMQProducer.java # 测验评估生产者
│           └── QuizEvaluateRocketMQConsumer.java # 测验评估消费者

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐