RocketMQ实战:异步处理长耗时API调用
目录
大家好,在现在的ai项目中,我们经常涉及大量的长耗时操作,比如在ai音视频处理项目中,我们就要调用asr语音转文字API和大模型API生成视频总结,如果让前端同步等待20-30秒,用户早已关闭页面。
为了实现“上传即返回,后台稳如狗”的体验,我们可以引入RocketMQ。
一、 异步任务场景
假设现有一个音视频智能分析平台,核心功能是:
| 场景 | 说明 | 耗时 |
|---|---|---|
| 视频音频提取 | 基于 FFmpeg 从上传视频中分离提取音频流 | 30-60 秒 |
| 语音 ASR 转写 | 将音频完整转写为文本 | 2-5 分钟 |
| AI 智能总结 | 调用大模型对转写文本进行智能总结 | 6-20 秒 |
| 知识库构建 | 对总结文本进行向量化处理,并存储至向量数据库中 | 8-35 秒 |
用户感知到的流程:上传 → 等待几分钟 → 看到 AI 总结和测验题目。
实际的系统链路:
用户上传视频
│
▼
[HTTP 接口] ──► 保存文件到 MinIO(1秒)
│
▼
[发送 MQ] ──► 立即返回:"处理中,请稍候"
│
▼
[RocketMQ Broker]
│
├──► [消费者 1] FFmpeg 提取音频(30-60秒)
│
├──► [消费者 2] ASR 语音转写(2-5分钟)
│
├──► [消费者 3] AI 分析总结(1-3分钟)
│
└──► [消费者 4] 知识库向量化(30秒)
关键挑战:整个处理链路耗时 3-10 分钟,涉及 4 个耗时环节、3 个外部 AI 服务。
二、不用 RocketMQ 的"噩梦架构"
假设我们不用消息队列,直接同步处理:
// 伪代码:同步处理版本(灾难版)
@PostMapping("/upload")
public Result upload(MultipartFile file) {
// 1. 保存文件
MediaEntity media = saveFile(file);
// 2. 提取音频(阻塞 30-60 秒)
String audioPath = ffmpegService.extractAudio(media);
// 3. 语音转写(阻塞 2-5 分钟)
String transcript = asrService.transcribe(audioPath);
// 4. AI 分析(阻塞 1-3 分钟)
AnalysisResult result = aiService.analyze(transcript);
// 5. 保存结果
saveAnalysisResult(media, result);
return Result.success(result); // 用户等了 5-8 分钟!
}
2.1 同步架构的四大致命问题
| 问题 | 现象 | 后果 |
|---|---|---|
| HTTP 超时 | Nginx/网关默认 60s 超时 | 大文件处理直接报错 |
| 资源耗尽 | 10 人同时上传,10 个 FFmpeg 进程 | CPU 100%,系统卡死 |
| 级联失败 | AI 接口超时 → 整个请求失败 | 用户需重新上传,体验极差 |
| 无法扩展 | 新增"视频标签生成"功能 | 必须修改上传接口,耦合严重 |
真实场景:一个 500MB 的视频,FFmpeg 提取音频要 1 分钟,ASR 转写要 2 分钟,AI 分析要 2 分钟。用户盯着浏览器等了 5 分钟,最后 AI 接口超时,一切重来。
三、引入 RocketMQ 后的架构蜕变
┌─────────────────────────────────────────────────────────────┐
│ 同步架构(无 MQ) │
│ │
│ 用户 ──► 上传 ──► [FFmpeg] ──► [ASR] ──► [AI] ──► 结果 │
│ ↑ ↑ │
│ └──────────── 阻塞 5-10 分钟 ───┘ │
│ │
│ 问题:HTTP 超时、资源耗尽、级联失败、无法扩展 │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 异步架构(有 MQ) │
│ │
│ 用户 ──► 上传 ──► [保存文件] ──► 返回"处理中"(1 秒内) │
│ │ │
│ ▼ │
│ [RocketMQ] │
│ │ │
│ ┌───────────────┼───────────────┐ │
│ ▼ ▼ ▼ │
│ [消费者 1] [消费者 2] [消费者 3] │
│ 提取音频 语音转写 AI 分析 │
│ (1 分钟) (3 分钟) (2 分钟) │
│ │
│ 优势:快速响应、削峰填谷、失败重试、独立扩展 │
└─────────────────────────────────────────────────────────────┘
架构流程图:
状态流转图:
PENDING :任务已创建,等待消费
PROCESSING:后台正在执行 AI 处理
RETRYING:执行失败,系统自动重试
SUCCESS:处理完成,通知前端展示结果
FAILED:重试耗尽,任务失败
CANCELED:用户 / 人工取消,终止任务
四、RocketMQ 在项目中的四大核心价值
4.1 价值一:异步解耦,用户体验质变
实现方式:asyncSend 异步发送
// AbstractRocketMQProducer.java 核心代码
public void sendTask(T payload) {
String mqDestination = getDestination(); // topic:tag
// 异步发送:不阻塞主线程
rocketMQTemplate.asyncSend(mqDestination, buildMessage(payload), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("任务发送成功,消息ID:{}", sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
// 发送失败兜底:更新数据库状态为 FAILED
onSendFailed(payload, truncateError(e.getMessage()));
}
});
}
关键设计:即使 MQ 发送失败,也有 onSendFailed 回调更新数据库状态,用户刷新页面能看到"处理失败",可手动重试。
4.2 价值二:削峰填谷,保护系统资源
场景:1000 个用户同时上传视频
流量特征:
- 上传请求瞬时峰值:1000 QPS(持续 10 秒)
- 实际处理能力:FFmpeg 最多 4 并发,ASR 接口限流 10 QPS
无 MQ 时:
1000 个请求 ──► 1000 个 FFmpeg 进程启动 ──► 系统 OOM 崩溃
有 MQ 时:
1000 个请求 ──► 快速入队(Broker 缓冲)──► 消费者按能力消费
│
└── 消费者线程池:core=2, max=4, queue=50
└── FFmpeg 最多 4 个并发,系统稳定
线程池配置:
@Configuration
@EnableAsync
public class AsyncExecutorConfig {
@Bean("mediaProcessExecutor")
public Executor mediaProcessExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2); // 核心 2 线程
executor.setMaxPoolSize(4); // 最大 4 线程(保护 FFmpeg)
executor.setQueueCapacity(50); // 队列缓冲 50 任务
executor.setThreadNamePrefix("media-process-");
return executor;
}
}
4.3 价值三:可靠投递,失败自动重试
三级可靠性保障:
| 层级 | 机制 | 实现 |
|---|---|---|
| Broker 级 | 消息持久化 + 主从复制 | RocketMQ 默认配置 |
| 发送级 | 发送失败回调兜底 | onSendFailed 更新数据库 |
| 消费级 | 业务失败重试 3 次 | 抛异常触发 RocketMQ 重试 |
消费端重试代码:
@Slf4j
@Component
@RocketMQMessageListener(
topic = "video-analysis-topic",
selectorExpression = "analyze-task",
consumerGroup = "media-analyze-consumer-group"
)
public class AnalyzeRocketMQConsumer implements RocketMQListener<Map<String, String>> {
private static final int MAX_RETRY_COUNT = 3;
@Override
public void onMessage(Map<String, String> message) {
int retryCount = parseRetryCount(message);
AnalyzePayload payload = parsePayload(message);
try {
markProcessing(payload); // 状态:PROCESSING
long costMs = processBusiness(payload); // 执行业务
markCompleted(payload); // 状态:SUCCESS
} catch (Exception e) {
if (retryCount < MAX_RETRY_COUNT) {
// 抛异常 → RocketMQ 自动重试(间隔递增:1s, 5s, 10s)
throw new RuntimeException("触发重试", e);
} else {
// 重试耗尽 → 标记失败,用户可手动重试
markFailed(payload, "已重试" + retryCount + "次: " + e.getMessage());
}
}
}
}
重试策略可视化:
第 1 次消费 ──► 失败 ──► 1 秒后重试
第 2 次消费 ──► 失败 ──► 5 秒后重试
第 3 次消费 ──► 失败 ──► 10 秒后重试
第 4 次消费 ──► 失败 ──► 进入死信队列(DLQ)/ 标记 FAILED
4.4 价值四:独立扩展,功能自由叠加
当前消费者组:
| 业务 | Topic | Tag | Consumer Group | 职责 |
|---|---|---|---|---|
| 媒体分析 | video-analysis-topic |
analyze-task |
media-analyze-consumer-group |
转写 + AI 总结 |
| 知识库构建 | knowledgebase-vectorize-topic |
vectorize |
knowledgebase-vectorize-consumer-group |
向量化入库 |
| 测验评估 | quiz-evaluate-topic |
evaluate |
quiz-evaluate-consumer-group |
AI 批改答案 |
扩展场景:新增"智能封面生成"功能
// 新增生产者(无需改上传接口)
@Component
public class CoverGenerateProducer extends AbstractRocketMQProducer<CoverPayload> {
@Override protected String topic() { return "cover-generate-topic"; }
@Override protected String tag() { return "generate"; }
}
// 新增消费者(独立部署,互不影响)
@RocketMQMessageListener(
topic = "cover-generate-topic",
selectorExpression = "generate",
consumerGroup = "cover-generate-consumer-group"
)
public class CoverGenerateConsumer implements RocketMQListener<Map<String, String>> {
// 调用 Stable Diffusion / DALL-E 生成封面
}
价值:上传接口完全不用改,新增功能独立开发、独立部署、独立扩缩容。
五、关键设计细节
5.1 问题一:异步重度依赖 MQ,消息积压、消费者消费不过来怎么监控和处理?
在 项目中,异步链路完全依赖 RocketMQ,一旦出现消息积压(消费者消费速度低于生产者发送速度),会导致用户任务处理延迟、超时,严重影响体验。核心解决思路是「监控先行,分层处理」,具体落地方案如下:
5.1.1 消息积压监控(提前预警,避免爆发)
基于 RocketMQ 自带监控能力 + 自定义告警,实现全链路监控,重点关注 3 个核心指标:
表格
| 监控指标 | 说明 | 阈值建议 |
|---|---|---|
| 队列堆积数 | 每个 Topic 的每个 Queue 中未消费消息数 | 消费者最大并发数 × 10(如 4 并发则设 40) |
| 消费堆积时间 | 最早未消费消息的存储时间 | 30 分钟(超过则用户任务延迟明显) |
| 消费速率 | 消费者每秒消费消息数 | 低于发送速率的 80% 时触发预警 |
实现方式:
-
利用 RocketMQ Console 可视化监控,实时查看队列堆积、消费速率等指标;
-
通过 RocketMQ 提供的 API 自定义监控采集,代码示例:
// 采集队列堆积数示例
@Autowired
private MQAdminExt mqAdminExt;
public void collectQueueBacklog() throws MQClientException {
String topic = "video-analysis-topic";
List<MessageQueue> queues = mqAdminExt.fetchMessageQueuesInTopic(topic);
for (MessageQueue queue : queues) {
// 获取队列最大偏移量(已发送消息数)
long maxOffset = mqAdminExt.maxOffset(queue);
// 获取队列已消费偏移量(已处理消息数)
long consumeOffset = mqAdminExt.getConsumerOffset(
queue.getTopic(),
queue.getConsumerGroup()
);
// 堆积数 = 最大偏移量 - 已消费偏移量
long backlog = maxOffset - consumeOffset;
// 触发告警(如钉钉、企业微信通知)
if (backlog > 40) {
alarmService.sendAlarm(
"MQ消息积压告警",
"Topic:" + topic + ", Queue:" + queue.getQueueId() + ", 堆积数:" + backlog
);
}
}
}
5.1.2 消息积压处理
根据积压程度,分 3 级 处理,避免小问题扩大为系统故障:
表格
| 级别 | 积压范围 | 处理策略 |
|---|---|---|
| 轻度积压 | 堆积数 < 100 | 扩容消费者并发数,动态调整线程池配置(无需重启服务) |
| 中度积压 | 100 ≤ 堆积数 < 1000 | 部署临时消费者节点,新增相同 Consumer Group 的消费者分担压力 |
| 重度积压 | 堆积数 ≥ 1000 | 暂停非核心生产者,优先处理核心任务;同时优化消费逻辑 |
轻度积压处理代码示例:
// 动态调整消费者线程池(核心代码)
@Autowired
private ThreadPoolTaskExecutor mediaProcessExecutor;
public void scaleConsumerThreadPool(int coreSize, int maxSize) {
mediaProcessExecutor.setCorePoolSize(coreSize); // 动态增加核心线程
mediaProcessExecutor.setMaxPoolSize(maxSize); // 动态增加最大线程
}
说明: 中度积压时,RocketMQ 会自动负载均衡,将 Queue 分配给新增节点;重度积压时需优先保障核心链路(音频提取、ASR 转写)。
5.2 问题二:使用 RocketMQ 做业务解耦,视频上传多步骤是否需要顺序处理?如何实现 RocketMQ 消息顺序消费?
核心结论: 视频上传的多步骤(音频提取 → ASR 转写 → AI 总结 → 知识库构建)必须保证顺序处理,但无需全局顺序,只需保证「同一视频」的步骤顺序即可;不同视频的步骤可并行处理,兼顾顺序性和吞吐量。
5.2.1 为什么必须保证顺序?
视频处理的多步骤存在强依赖关系:
音频提取 → ASR 转写 → AI 总结 → 知识库构建
↑ ↑ ↑
未完成则无法进行下一步,若顺序错乱会导致:
- 任务失败(如用未转写完成的空文本调用 AI 接口)
- 产生脏数据
5.2.2 RocketMQ 顺序消费实现方案
RocketMQ 顺序消费的核心是「同一 Sharding Key 的消息进入同一 Queue,单线程消费」,结合项目场景,最优实现方案如下:
| 步骤 | 实现要点 | 具体说明 |
|---|---|---|
| 1. 选择 Sharding Key | 以 mediaId(视频 ID)作为 Sharding Key |
确保同一视频的所有步骤消息进入同一 Queue;不同视频的消息可进入不同 Queue,实现并行处理 |
| 2. 生产者发送顺序消息 | 发送消息时指定 Sharding Key | msg.setKeys(mediaId) 或使用顺序消息 API |
| 3. 消费者配置顺序消费 | 设置消费模式为 ORDERLY |
确保同一 Queue 单线程消费 |
| 4. 优化:避免单 Queue 阻塞 | Topic 配置多个 Queue(建议 8 个) | RocketMQ 根据 mediaId 哈希值分配 Queue,避免单个视频长时间任务阻塞全局处理;同时设置消费超时时间,防止单个任务卡住 Queue |
5.3 问题三:RocketMQ 分片处理视频、保证顺序一致,会对 AI 解析总结结果产生影响吗?
核心结论: 只要保证「同一视频」的处理步骤顺序一致,RocketMQ 分片处理(多 Queue 并行)不会对 AI 解析总结结果产生任何影响;反而能提升整体处理效率,避免单视频处理阻塞全局。
5.3.1 分片处理的核心逻辑(不影响顺序)
┌─────────────────────────────────────────────────────────┐
│ RocketMQ 分片处理架构 │
├─────────────────────────────────────────────────────────┤
│ Queue 1 (mediaId=hash(A)) → 视频A: 提取→转写→总结→构建 │
│ Queue 2 (mediaId=hash(B)) → 视频B: 提取→转写→总结→构建 │
│ Queue 3 (mediaId=hash(C)) → 视频C: 提取→转写→总结→构建 │
│ ... │
│ 同一 mediaId 始终分配到同一个 Queue,保证步骤顺序 │
│ 不同 Queue 之间并行处理,互不影响 │
└─────────────────────────────────────────────────────────┘
-
分片依据:通过
mediaId的哈希值分配 Queue,同一mediaId始终分配到同一个 Queue,保证其步骤顺序; -
并行逻辑:不同 Queue 之间并行处理,互不影响(如 Queue1 处理视频A的步骤,Queue2 处理视频B的步骤);
-
顺序保证:同一 Queue 内的消息单线程消费,确保视频A的「音频提取 → ASR 转写 → AI 总结」步骤依次执行。
5.3.2 对 AI 解析总结结果无影响的关键原因
| 原因 | 说明 |
|---|---|
| 数据隔离 | 每个视频的处理数据(音频文件、转写文本、AI 总结结果)都通过 mediaId 关联,存储在独立的数据库记录中,分片处理不会导致数据混淆 |
| 顺序闭环 | AI 解析总结依赖的前置数据(ASR 转写文本),会在顺序消费中确保已完成生成,不会出现「AI 总结先执行、转写未完成」的情况 |
| 无状态消费 | AI 解析总结是无状态操作,仅依赖当前视频的转写文本,与其他视频的处理进度、分片位置无关,不会因分片并行产生干扰 |
5.3.3 潜在风险与规避方案
⚠️ 唯一潜在风险:若 Queue 分配不均(如某 Queue 分配的视频数量过多、单个视频处理耗时过长),可能导致该 Queue 内的视频 AI 总结延迟,但不会影响结果准确性。
规避方案:
-
合理配置 Queue 数量:建议等于消费者节点数 × 2,确保负载均衡;
-
长耗时视频隔离:对超过 10 分钟的视频单独分配 Topic 或 Queue,避免阻塞普通视频;
-
实时监控调整:通过监控实时关注各 Queue 的处理进度,及时调整消费者并发。
六、替代方案对比:为什么选 RocketMQ?
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| Spring @Async | 简单,无外部依赖 | 应用重启任务丢失、无重试、无削峰 | 轻量级异步 |
| 数据库轮询 | 简单可靠 | 延迟高(秒级)、数据库压力大 | 低频任务 |
| Redis 队列 | 轻量、高性能 | 无消费确认、无重试、无轨迹 | 简单队列 |
| Kafka | 吞吐极高、生态丰富 | 运维重、延迟较高、Spring 集成较复杂 | 日志、大数据 |
| RocketMQ | 功能完备、延迟低、Spring 集成好 | 社区较 Kafka 小 | 金融级业务消息 |
项目选择 RocketMQ 的关键原因:
- 功能匹配:事务消息、顺序消息、延时消息、消息轨迹,未来都可能用到
- 延迟敏感:AI 分析任务需要低延迟调度(Kafka 更适合高吞吐日志)
- Spring 生态:
rocketmq-spring-boot-starter注解驱动,开发效率高 - 阿里云原生:若上云,无缝对接阿里云 RocketMQ 托管服务
七、总结:RocketMQ 重构前后对比
┌─────────────────────────────────────────────────────────────┐
│ 没有 RocketMQ 的 项目 │
│ ├── 用户上传 500MB 视频,等待 8 分钟,最后超时失败 │
│ ├── 10 人同时上传,服务器 CPU 100%,全部失败 │
│ ├── AI 接口 1 次超时,整个任务重来,用户体验极差 │
│ └── 新增"智能封面"功能,必须修改上传接口,回归测试两周 │
├─────────────────────────────────────────────────────────────┤
│ 有 RocketMQ 的 项目 │
│ ├── 用户 1 秒得到响应,关闭页面,稍后收到完成通知 │
│ ├── 1000 人同时上传,系统稳如泰山,按序处理 │
│ ├── AI 接口失败自动重试 3 次,最终失败用户可一键重试 │
│ └── 新增功能独立扩展,2 天开发上线,零影响现有功能 │
└─────────────────────────────────────────────────────────────┘
RocketMQ 在 项目中的定位:不是"锦上添花",而是支撑整个 AI 处理链路的骨架。没有它,这个系统无法从"Demo 级"进化到"生产级"。
参考代码
完整项目代码结构:
src/main/java/com/example/videoseek/
├── common/
│ ├── async/
│ │ └── AbstractRocketMQProducer.java # 通用生产者抽象
│ └── constant/
│ └── AsyncTaskStreamConstants.java # MQ 常量配置
├── modules/
│ ├── media/
│ │ └── listener/
│ │ ├── AnalyzeRocketMQProducer.java # 媒体分析生产者
│ │ └── AnalyzeRocketMQConsumer.java # 媒体分析消费者
│ ├── knowledgebase/
│ │ └── listener/
│ │ ├── VectorizeRocketMQProducer.java # 向量化生产者
│ │ └── VectorizeRocketMQConsumer.java # 向量化消费者
│ └── quiz/
│ └── listener/
│ ├── QuizEvaluateRocketMQProducer.java # 测验评估生产者
│ └── QuizEvaluateRocketMQConsumer.java # 测验评估消费者
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)