如何让 AI 优雅的批量处理文档摘要?详细实战流程图带你搞懂批量文档处理(带崩溃恢复详细过程)
假设你用 AI 批量处理 100 个 PDF 文档,程序跑了一半突然崩溃。重启后,你是从头开始还是从断点继续?这篇文章详细聊聊我们的实现方案。
一、问题背景
1.1 一个典型的批处理场景
用户上传了 3 个文档,想用 AI 生成摘要:
| 文档 | 名称 | 内容长度 | 处理方式 |
|---|---|---|---|
| 文档1 | report.pdf | 5,000字 | 太短了,直接扔给 AI |
| 文档2 | thesis.docx | 50,000字 | 需要分块处理 |
| 文档3 | book.pdf | 200,000字 | 分块更多,处理更久 |
看起来简单,但有几个问题:
问题1:AI 有长度限制
50,000 字的文档,直接扔给 AI 会超时或报错。必须切成小块,每块 2000 字左右,逐块处理再合并。
问题2:并发太多会崩
如果 100 个分块同时调 AI,API 会拒绝或超时。需要控制并发数量。
问题3:崩溃后怎么办
处理 50,000 字的文档要几分钟,期间程序可能崩溃。重启后,已处理的那些分块怎么办?
二、整体架构设计
2.1 三个层级的关系
我们设计了三层结构:任务 → 文档 → 分块。
任务(用户一次请求)
│
├── 文档1 ── 不分块,直接摘要
│
└── 文档2 ── 分块1, 分块2, ... 分块25
│
└── 文档3 ── 分块1, 分块2, ... 分块100
为什么要三层?因为:
- 用户关心的是"任务"(一次请求)
- 实际处理的是"文档"(每个文件)
- 大文档要拆成"分块"(AI 能处理的片段)
2.2 核心组件
| 组件 | 作用 |
|---|---|
| DocSummaryTaskRunner | 总调度器,负责执行、并发控制、崩溃恢复 |
| DocSummaryTaskRecoveryJob | 应用启动时扫描未完成任务,自动恢复 |
| DocSummaryTaskService | 任务状态管理,比如"处理中"、“已完成” |
| DocSummaryDocumentService | 文档状态管理 |
| DocSummaryChunkMapper | 分块的增删改查 |
| DocSummaryAiService | 调 AI 生成摘要、合并摘要 |
2.3 数据库表结构
doc_summary_task -- 任务表(记录用户请求)
│ 一对多
↓
doc_summary_document -- 文档表(每个上传的文件)
│ 一对多
↓
doc_summary_chunk -- 分块表(大文档的每个片段)
│ 处理完成后
↓
doc_summary_result -- 文档摘要结果表
│ 合并后
↓
doc_summary_task_result -- 任务整体摘要表
关键设计:分块表里有 summary 字段,用来保存每个分块的摘要。这就是崩溃恢复的基础。
三、正常执行流程
3.1 从任务开始:execute()
当用户提交任务后,系统会调用 execute(taskId):
第一步:原子获取执行权
tryStartExecution(taskId)
为什么要"原子"?因为可能有两个触发点:
- RecoveryJob 启动时恢复任务
- 用户手动点击"重新执行"
只有状态是 PROCESSING 的任务才能获取执行权
防止同一个任务被执行两次
第二步:准备资源
获取任务实体(从数据库)
获取文档列表(这个任务包含哪些文件)
第三步:创建并发控制资源
线程池 ExecutorService(控制文档并发数)
信号量 Semaphore(控制分块并发数)
为什么要两层?后面详细说
第四步:并发处理每个文档
for each document:
CompletableFuture.runAsync(() -> processDocument(doc))
不用等第一个文档处理完,就开始处理第二个
第五步:等所有文档完成
CompletableFuture.allOf(futures).join()
阻塞等待,直到所有文档都处理完
第六步:清理资源
关闭线程池
移除信号量
3.2 处理单个文档:processDocument()
每个文档的处理流程:
第一步:推送事件 DOCUMENT_START
前端收到后,UI 显示"正在处理 report.pdf"
第二步:更新状态 PENDING → PARSING
数据库记录:这个文档正在解析
第三步:复制库文件(如果是从知识库选的)
用户可能选了库里已有的文件
要先复制到任务目录
第四步:解析文档内容
用 FileParserFactory 找对应的解析器
pdf → PDFParser
docx → DocxParser
txt → TxtParser
解析后得到纯文本:"本研究报告探讨了..."(50000字)
第五步:判断大小级别
如果 ≤ 8000字 → SMALL(直接摘要)
如果 ≤ 100000字 → MEDIUM(分块处理)
如果 > 100000字 → LARGE(分块处理)
第六步:根据大小选择策略
SMALL → processSmallDocument()
MEDIUM/LARGE → processChunkedDocument()
第七步:推送事件 DOCUMENT_DONE
前端显示"report.pdf 处理完成"
3.3 小文档:直接扔给 AI
小文档处理流程:
│
├─► 调用 AI 服务
│ aiService.generateSummary(content)
│ AI 返回:"本报告主要结论是..."
│
├─► 保存结果
│ 插入 doc_summary_result 表
│ 记录:docId=1, summary="本报告主要..."
│
└─► 检查任务是否全部完成
checkAndSaveTaskResult()
如果所有文档都处理完了,生成任务整体摘要
小文档很简单,一步到位。
3.4 大文档:分块处理
这才是重点。假设文档有 50,000 字:
分块处理流程:
│
├─► 第一步:查询已有分块
│ chunkMapper.selectList(documentId)
│
│ 为什么先查?因为可能是崩溃恢复!
│ 如果数据库里已经有分块记录,说明之前处理过
│
│ 如果 existingChunks 是空 → 正常流程
│ 如果 existingChunks 不空 → 崩溃恢复流程
│
├─► 正常流程:
│ │
│ ├─► 更新文档状态: PARSING → CHUNKING
│ │ 数据库记录:文档正在分块
│ │
│ ├─► 执行分块策略
│ │ StructureChunkStrategy.chunk(content, config)
│ │ chunkSize=2000(每块2000字)
│ │ overlap=200(前后重叠200字保持连贯)
│ │
│ │ 返回 25 个分块
│ │
│ ├─► 创建分块实体,插入数据库
│ │ for each chunk:
│ │ DocSummaryChunk entity = new DocSummaryChunk()
│ │ entity.setContent(chunk.getContent())
│ │ entity.setOverlapBefore(chunk.getOverlapBefore())
│ │ entity.setBoundaryContext(chunk.getBoundaryContext())
│ │ entity.setStatus(PENDING)
│ │ chunkMapper.insert(entity) ← 立即落盘!
│ │
│ │ 为什么立即插入?因为要支持崩溃恢复
│ │ 如果崩溃,分块记录还在数据库里
│ │
│ ├─► 推送事件 CHUNK_CREATED
│ │ 前端显示:"创建分块 1/25"
│ │ 前端显示:"创建分块 2/25"
│ │ ...
│ │
│ ├─► 更新文档状态: CHUNKING → SUMMARIZING
│ │ 分块创建完了,开始处理摘要
│ │
│ ├─► 异步处理每个分块(并发控制)
│ │ for each chunk:
│ │ semaphore.acquire() ← 获取许可(限制并发)
│ │ CompletableFuture.supplyAsync(() -> processChunk(chunk))
│ │ semaphore.release() ← 释放许可
│ │
│ ├─► 等待所有分块完成
│ │ futures.stream().map(CompletableFuture::join)
│ │ 阻塞等待,收集所有摘要
│ │
│ ├─► AI 合并所有摘要
│ │ 25个分块摘要 → 用分隔符连接
│ │ "第一章...\n\n---\n\n第二章...\n\n---\n\n..."
│ │ 调 AI 再总结一次 → "本文主要研究了..."
│ │
│ └─► 保存文档结果
│ saveDocumentResult()
│
└─► 崩溃恢复流程:(后面详细说)
3.5 处理单个分块:processChunk()
这是最底层的方法,每个分块都要经过这个流程:
processChunk(chunk)
│
├─► 更新分块状态: PENDING → SUMMARIZING
│ chunkMapper.updateById(chunk)
│ 数据库记录:这个分块正在处理
│
├─► 构建完整内容(含上下文)
│ StringBuilder sb = new StringBuilder()
│
│ if (overlapBefore != null):
│ sb.append("[前文上下文]\n" + overlapBefore + "\n\n")
│
│ sb.append(content) ← 分块主要内容
│
│ if (boundaryContext != null):
│ sb.append("\n\n[边界上下文]\n" + boundaryContext)
│
│ 为什么要有上下文?保持语义连贯
│ AI 看到的不只是这块内容,还有前后关联
│
├─► AI 生成摘要
│ aiService.generateSummaryWithPrompt(fullContent, CHUNK_SUMMARY_PROMPT)
│ 返回:"本章节介绍了..."
│
├─► 更新状态并保存摘要
│ chunk.setStatus(COMPLETED)
│ chunk.setSummary(summary) ← 关键!摘要落盘
│ chunkMapper.updateById(chunk)
│
│ 摘要保存到数据库,崩溃后可以复用
│
├─► 推送事件 CHUNK_SUMMARY
│ 前端显示:"分块 3 摘要完成"
│
└─► 返回摘要
后续用来合并
关键点:chunk.setSummary(summary) 把摘要存到数据库。
四、崩溃恢复流程
4.1 应用启动时自动扫描
Spring 有个注解 @PostConstruct,标记的方法会在 Bean 初始化后自动执行。我们用这个来扫描未完成任务:
@Component
public class DocSummaryTaskRecoveryJob {
@PostConstruct // 应用启动后自动执行
public void recoverUnfinishedTasks() {
// 查询未完成任务
// status 是 PENDING、PROCESSING、MERGING、FINALIZING 的都算未完成
for (DocSummaryTask task : unfinishedTasks) {
// 检查是否过期
// 如果 lastActiveTime 超过 24 小时,标记为过期
if (isTaskExpired(task)) {
taskService.markExpired(task.getId());
// 过期了就不恢复,可能数据已经失效
} else {
taskRunner.resumeFromCheckpoint(task.getId());
// 恢复执行
}
}
}
}
为什么要检查过期?因为:
- 可能任务创建后根本没开始执行
- 可能任务的数据源已经变了
- 超过 24 小时,认为数据时效性过期,不再恢复
4.2 任务恢复入口:resumeFromCheckpoint()
resumeFromCheckpoint(taskId)
│
├─► 原子获取执行权
│ tryStartExecution(taskId)
│
│ 为什么又要原子操作?
│ 因为 RecoveryJob 和用户手动点击可能同时触发
│ 只有一个能成功获取执行权
│
├─► 查询未完成的文档
│ documentMapper.selectList()
│ WHERE task_id = ? AND status NOT IN ('COMPLETED', 'FAILED')
│
│ 跳过已完成的,跳过失败的
│ 只恢复那些"处理中途"的文档
│
├─► 检查是否全部完成
│ if (unfinishedDocs.isEmpty()):
│ checkAndSaveTaskResult()
│ markCompleted()
│ return
│
│ 特殊场景:所有文档都完成了,但任务结果没生成
│ 可能是崩溃时刚好在生成任务结果
│ 补上这一步就行
│
├─► 创建并发资源
│ ExecutorService(线程池)
│ Semaphore(信号量)
│
├─► 并发处理未完成的文档
│ for each doc in unfinishedDocs:
│ CompletableFuture.runAsync(() -> processDocument(doc))
│
└─► 等待完成,标记任务完成
4.3 分块级崩溃恢复:关键逻辑
当 processDocument() 处理一个文档时,会调用 processChunkedDocument()。这个方法会先查数据库:
查询已有分块
│
│ chunkMapper.selectList(documentId)
│ 返回:Chunk 1-10(已有记录)
│
├─► 判断是否是崩溃恢复
│ if (existingChunks.isEmpty()) → 正常流程
│ else → 崩溃恢复流程
│
├─► 崩溃恢复流程:
│ │
│ ├─► 收集已完成分块的摘要
│ │ allSummaries = []
│ │
│ │ for each chunk in existingChunks:
│ │ if (chunk.status == COMPLETED && chunk.summary != null):
│ │ allSummaries.add(chunk.getSummary()) ← 复用数据库保存的
│ │
│ │ 关键:summary 字段是之前保存的
│ │ 直接读出来,不用重新处理
│ │
│ ├─► 筛选未完成的分块
│ │ pendingChunks = existingChunks.filter(c -> c.status != COMPLETED)
│ │
│ │ 可能包括:
│ │ - status=SUMMARIZING(崩溃时正在处理)
│ │ - status=PENDING(还没开始)
│ │ - status=FAILED(之前失败)
│ │
│ ├─► 处理未完成的分块
│ │ for each pendingChunk:
│ │ semaphore.acquire()
│ │ processChunk(pendingChunk) ← 重新处理
│ │ semaphore.release()
│ │
│ │ 处理完成后,每个分块的 summary 被保存
│ │
│ ├─► 合并所有摘要
│ │ newSummaries = 处理未完成分块得到的新摘要
│ │ allSummaries.addAll(newSummaries)
│ │
│ │ 最终有完整的摘要集合
│ │
│ ├─► AI 合并
│ │ mergeSummariesWithAi(allSummaries)
│ │
│ └─► 保存文档结果
五、并发控制:两层"关卡"
5.1 为什么需要两层控制?
假设一个任务有 3 个文档:
- 文档1:不分块,直接处理
- 文档2:25 个分块
- 文档3:100 个分块
如果只控制文档并发(比如同时处理 2 个文档),那文档2 和文档3 开始后,会有 125 个分块同时调 AI。AI API 会受不了。
所以需要两层:
第一层:文档并发控制
ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentDocs)
比如 maxConcurrentDocs = 2
同时最多处理 2 个文档
第二层:分块并发控制
Semaphore semaphore = new Semaphore(maxConcurrentChunks)
比如 maxConcurrentChunks = 10
同一个任务里,最多 10 个分块同时调 AI
5.2 信号量的使用方式
CompletableFuture.supplyAsync(() -> {
try {
semaphore.acquire(); // 获取许可,如果没有就等待
return processChunk(taskId, chunk);
} finally {
semaphore.release(); // 处理完一定要释放
}
});
为什么用 try-finally?因为 processChunk() 可能抛异常,如果不释放信号量,后续分块会永远等待。
5.3 并发资源的管理
每个任务开始时创建:
taskExecutors.put(taskId, executor); // 缓存线程池
chunkSemaphores.put(taskId, semaphore); // 缓存信号量
任务结束时清理:
private void cleanup(Long taskId) {
ExecutorService executor = taskExecutors.remove(taskId);
if (executor != null) {
executor.shutdown(); // 关闭线程池
}
chunkSemaphores.remove(taskId); // 移除信号量
}
六、状态流转:像流水线一样
6.1 任务状态
PENDING → 刚创建,还没开始
PROCESSING → 正在处理
COMPLETED → 全部完成
FAILED → 处理失败
EXPIRED → 中断超过24小时,不再恢复
6.2 文档状态
PENDING → 还没处理
PARSING → 正在解析文件内容
CHUNKING → 正在分块(大文档)
SUMMARIZING → 正在生成摘要
COMPLETED → 处理完成
FAILED → 处理失败
为什么这么多状态?因为崩溃恢复需要知道"停在哪里":
- 文档是
SUMMARIZING→ 分块已创建,直接查分块恢复 - 文档是
CHUNKING→ 分块正在创建,重新解析创建分块 - 文档是
PARSING→ 刚开始,重新解析文档
6.3 分块状态
PENDING → 分块已创建,还没处理
SUMMARIZING → 正在调用 AI
COMPLETED → 处理完成,summary 已保存
FAILED → 处理失败
七、崩溃恢复实例:完整演示
7.1 场景设定
用户上传了 2 个大文档,每个文档分 10 块:
任务 ID=123
│
├── Doc1(10个分块)
│ Chunk 1-10
│
└── Doc2(10个分块)
Chunk 1-10
7.2 处理到一半崩溃
假设 Doc1 处理到 Chunk 5 时,应用突然关闭:
数据库状态(崩溃时):
doc_summary_chunk 表(Doc1 的分块):
| chunk_index | status | summary |
|-------------|-------------|-------------|
| 1 | COMPLETED | "摘要内容1" | ← 已完成,有摘要
| 2 | COMPLETED | "摘要内容2" | ← 已完成,有摘要
| 3 | COMPLETED | "摘要内容3" | ← 已完成,有摘要
| 4 | COMPLETED | "摘要内容4" | ← 已完成,有摘要
| 5 | SUMMARIZING | NULL | ← 崩溃时正在处理,没有摘要
| 6 | PENDING | NULL | ← 还没开始
| 7 | PENDING | NULL | ← 还没开始
| 8 | PENDING | NULL | ← 还没开始
| 9 | PENDING | NULL | ← 还没开始
| 10 | PENDING | NULL | ← 还没开始
doc_summary_document 表:
| doc_id | status |
|--------|---------------|
| Doc1 | SUMMARIZING | ← 处理中途
| Doc2 | PENDING | ← 还没开始
doc_summary_task 表:
| task_id | status | last_active_time |
|---------|-------------|-------------------------|
| 123 | PROCESSING | 2026-05-03 10:30:00 |
7.3 应用重启后的恢复流程
应用启动
↓
@PostConstruct: DocSummaryTaskRecoveryJob.recoverUnfinishedTasks()
↓
查询未完成任务: task_id=123, status=PROCESSING
↓
检查过期: lastActiveTime 是 10:30,现在是 10:45
中断 15 分钟,小于 24 小时,未过期
↓
taskRunner.resumeFromCheckpoint(123)
↓
【第一阶段:查询未完成文档】
│
│ SELECT * FROM doc_summary_document
│ WHERE task_id=123 AND status NOT IN ('COMPLETED', 'FAILED')
│
│ 结果:Doc1(SUMMARIZING), Doc2(PENDING)
│
↓
【第二阶段:并发处理这两个文档】
│
├─► Thread-1: processDocument(123, Doc1)
│ │
│ ├─► 推送事件 DOCUMENT_START
│ │ 前端:"正在恢复处理 Doc1"
│ │
│ ├─► 更新状态: SUMMARIZING → PARSING
│ │ 注意:这里会重新解析文档内容
│ │ (这是当前的实现,实际上可以优化)
│ │
│ ├─► 解析文档内容(读取文件)
│ │ 得到 50000 字的文本
│ │
│ ├─► 判断大小级别: MEDIUM(需要分块)
│ │
│ ├─► processChunkedDocument(123, Doc1, content)
│ │ │
│ │ ├─► 查询已有分块
│ │ │ chunkMapper.selectList(documentId=Doc1)
│ │ │ 返回 10 个分块记录
│ │ │
│ │ ├─► 崩溃恢复路径(existingChunks 不为空)
│ │ │ │
│ │ │ ├─► 收集已完成摘要
│ │ │ │ allSummaries = []
│ │ │ │
│ │ │ │ Chunk 1: status=COMPLETED, summary="摘要1"
│ │ │ │ → allSummaries.add("摘要1") ✓
│ │ │ │
│ │ │ │ Chunk 2: status=COMPLETED, summary="摘要2"
│ │ │ │ → allSummaries.add("摘要2") ✓
│ │ │ │
│ │ │ │ Chunk 3: status=COMPLETED, summary="摘要3"
│ │ │ │ → allSummaries.add("摘要3") ✓
│ │ │ │
│ │ │ │ Chunk 4: status=COMPLETED, summary="摘要4"
│ │ │ │ → allSummaries.add("摘要4") ✓
│ │ │ │
│ │ │ │ Chunk 5: status=SUMMARIZING, summary=NULL
│ │ │ │ → 不添加,没有摘要
│ │ │ │
│ │ │ │ Chunk 6-10: status=PENDING, summary=NULL
│ │ │ │ → 不添加,没有摘要
│ │ │ │
│ │ │ │ 最终:allSummaries = ["摘要1", "摘要2", "摘要3", "摘要4"]
│ │ │ │ 复用了 4 个已保存的摘要!
│ │ │ │
│ │ │ ├─► 筛选未完成的分块
│ │ │ │ pendingChunks = existingChunks.filter(c -> c.status != COMPLETED)
│ │ │ │
│ │ │ │ Chunk 5: status=SUMMARIZING → 需要重新处理
│ │ │ │ Chunk 6-10: status=PENDING → 需要处理
│ │ │ │
│ │ │ │ pendingChunks = [Chunk 5, Chunk 6, Chunk 7, Chunk 8, Chunk 9, Chunk 10]
│ │ │ │ 共 6 个分块需要处理
│ │ │ │
│ │ │ ├─► 处理未完成的分块
│ │ │ │ for each pendingChunk:
│ │ │ │ semaphore.acquire()
│ │ │ │ processChunk(pendingChunk)
│ │ │ │ semaphore.release()
│ │ │ │
│ │ │ │ processChunk(Chunk 5):
│ │ │ │ - 状态: SUMMARIZING → COMPLETED
│ │ │ │ - 调 AI 生成摘要 → "摘要5"
│ │ │ │ - chunk.setSummary("摘要5")
│ │ │ │ - chunkMapper.updateById(chunk)
│ │ │ │
│ │ │ │ processChunk(Chunk 6-10):
│ │ │ │ - 状态: PENDING → SUMMARIZING → COMPLETED
│ │ │ │ - 生成摘要 → "摘要6" ~ "摘要10"
│ │ │ │ - 保存摘要
│ │ │ │
│ │ │ │ 处理完成,得到 6 个新摘要
│ │ │ │
│ │ │ ├─► 合并所有摘要
│ │ │ │ allSummaries.addAll(newSummaries)
│ │ │ │
│ │ │ │ allSummaries = [
│ │ │ │ "摘要1", "摘要2", "摘要3", "摘要4", ← 已完成的(复用)
│ │ │ │ "摘要5", "摘要6", "摘要7", "摘要8", "摘要9", "摘要10" ← 新处理的
│ │ │ │ ]
│ │ │ │ 共 10 个摘要,完整了
│ │ │ │
│ │ │ ├─► AI 最终合并
│ │ │ │ mergedContent = String.join("\n\n---\n\n", allSummaries)
│ │ │ │ "摘要1\n\n---\n\n摘要2\n\n---\n\n..."
│ │ │ │
│ │ │ │ aiService.generateSummaryWithPrompt(mergedContent, MERGE_PROMPT)
│ │ │ │ → "本文是一篇关于...的研究论文"
│ │ │ │
│ │ │ └─► 保存文档结果
│ │ │ saveDocumentResult(123, Doc1, "本文是一篇...")
│ │ │ 插入 doc_summary_result 表
│ │ │ Doc1 完成
│ │
│ ├─► 更新状态: SUMMARIZING → COMPLETED
│ │
│ └─► 推送事件 DOCUMENT_DONE
│ 前端:"Doc1 处理完成"
│
├─► Thread-2: processDocument(123, Doc2)
│ │
│ ├─► 正常流程(没有已有分块)
│ │ existingChunks 为空 → 创建新分块
│ │
│ ├─► 创建 10 个分块,插入数据库
│ │
│ ├─► 处理所有 10 个分块
│ │ Chunk 1-10: PENDING → SUMMARIZING → COMPLETED
│ │ 生成摘要并保存
│ │
│ ├─► AI 合并 → 文档摘要
│ │
│ └─► 保存结果
│ Doc2 完成
│
↓
【第三阶段:任务完成】
│
│ 所有文档都处理完了
│
├─► checkAndSaveTaskResult(123)
│ │
│ ├─► 查询所有文档摘要
│ │ doc_summary_result: Doc1摘要, Doc2摘要
│ │
│ ├─► AI 合成任务整体摘要
│ │ "本项目包含两个文档..."
│ │
│ └─► 保存 doc_summary_task_result
│
├─► taskService.markCompleted(123)
│ status = COMPLETED
│
└─► 推送事件 TASK_DONE
前端:"任务处理完成"
7.4 改进效果对比
之前的实现(摘要不落盘):
崩溃时:
Chunk 1-4: status=COMPLETED,但摘要只存在内存里
崩溃后摘要丢失
恢复后:
需要重新处理 Chunk 1-10(10个分块全部重做)
等于浪费了 4 次 AI 调用
现在的实现(摘要落盘):
崩溃时:
Chunk 1-4: status=COMPLETED, summary="摘要1~4"
摘要已保存到数据库
恢复后:
直接复用 Chunk 1-4 的摘要
只处理 Chunk 5-10(6个分块)
节省了 4 次 AI 调用
八、实时进度推送
处理过程中,前端需要知道进度。我们用 SSE(Server-Sent Events)推送事件:
| 事件类型 | 触发时机 | 前端显示 |
|---|---|---|
| TASK_START | 任务开始 | “开始处理任务,共3个文档” |
| DOCUMENT_START | 文档开始 | “正在处理 report.pdf” |
| PARSE_DONE | 解析完成 | “文档解析完成,50000字符” |
| CHUNK_CREATED | 分块创建 | “创建分块 1/25” |
| CHUNK_SUMMARY | 分块完成 | “分块 3 摘要完成” |
| DOCUMENT_DONE | 文档完成 | “report.pdf 处理完成” |
| TASK_DONE | 任务完成 | “任务处理完成” |
| TASK_FAILED | 任务失败 | “任务失败: xxx” |
前端收到这些事件,就能实时显示进度条和状态。
九、核心方法一览
| 方法 | 作用 |
|---|---|
execute(taskId) |
同步执行任务(阻塞等待完成) |
executeAsync(taskId) |
异步执行任务(不阻塞) |
processDocument(taskId, document) |
处理单个文档 |
processSmallDocument() |
小文档直接摘要 |
processChunkedDocument() |
大文档分块处理,支持崩溃恢复 |
processChunk(chunk) |
处理单个分块,摘要落盘 |
mergeSummariesWithAi() |
AI 合并多个摘要 |
checkAndSaveTaskResult() |
检查任务是否全部完成,生成整体摘要 |
resumeFromCheckpoint(taskId) |
崩溃恢复入口 |
stop(taskId) |
停止任务执行 |
十、数据库表详解
10.1 doc_summary_task(任务表)
| 字段 | 作用 |
|---|---|
| id | 任务ID |
| user_id | 用户ID |
| title | 任务标题 |
| status | 任务状态:PENDING/PROCESSING/COMPLETED/FAILED/EXPIRED |
| total_count | 文档总数 |
| completed_count | 已完成文档数 |
| last_active_time | 最后活动时间(判断过期) |
10.2 doc_summary_document(文档表)
| 字段 | 作用 |
|---|---|
| id | 文档ID |
| task_id | 所属任务ID |
| document_name | 文件名 |
| source_type | 来源:LOCAL/KNOWLEDGE_BASE |
| source_path | 源文件路径 |
| upload_path | 上传后的路径 |
| status | 文档状态 |
| size_level | 大小级别:SMALL/MEDIUM/LARGE |
10.3 doc_summary_chunk(分块表)
| 字段 | 作用 |
|---|---|
| id | 分块ID |
| task_id | 所属任务ID |
| document_id | 所属文档ID |
| chunk_index | 分块索引(第几块) |
| content | 分块内容 |
| summary | 分块摘要(关键字段) |
| status | 分块状态 |
| overlap_before | 前文重叠内容 |
| boundary_context | 边界上下文 |
10.4 doc_summary_result(文档摘要表)
| 字段 | 作用 |
|---|---|
| id | 结果ID |
| task_id | 所属任务ID |
| document_id | 文档ID |
| summary | 文档摘要(分块合并后的) |
| create_time | 创建时间 |
10.5 doc_summary_task_result(任务整体摘要表)
| 字段 | 作用 |
|---|---|
| id | 结果ID |
| task_id | 任务ID |
| overall_summary | 任务整体摘要(所有文档合并) |
| create_time | 创建时间 |
十一、总结:核心设计思想
11.1 分层并发
- 文档层:线程池控制,防止同时处理太多文档
- 分块层:信号量控制,防止同时调太多 AI API
11.2 原子操作
tryStartExecution():用数据库条件更新,防止重复执行- 只有一条线程能成功更新状态,获取执行权
11.3 状态落盘
- 每一步状态都保存到数据库
- 崩溃恢复时,根据状态判断"停在哪里"
11.4 摘要落盘
- 分块完成时,摘要保存到
chunk.summary字段 - 崩溃恢复时,直接读取已保存的摘要,复用而不重新处理
11.5 数据库统计
- 判断任务完成时,直接查数据库统计
- 不依赖内存计数,避免并发时的竞态问题
十二、适用场景
这个设计适用于:
- 批量文档处理
- 批量图片识别
- 批量数据导入/转换
- 任何"长任务 + 可分块 + 崩溃恢复"的场景
核心思想:把中间结果持久化,恢复时从断点继续。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)