假设你用 AI 批量处理 100 个 PDF 文档,程序跑了一半突然崩溃。重启后,你是从头开始还是从断点继续?这篇文章详细聊聊我们的实现方案。


一、问题背景

1.1 一个典型的批处理场景

用户上传了 3 个文档,想用 AI 生成摘要:

文档 名称 内容长度 处理方式
文档1 report.pdf 5,000字 太短了,直接扔给 AI
文档2 thesis.docx 50,000字 需要分块处理
文档3 book.pdf 200,000字 分块更多,处理更久

看起来简单,但有几个问题:

问题1:AI 有长度限制

50,000 字的文档,直接扔给 AI 会超时或报错。必须切成小块,每块 2000 字左右,逐块处理再合并。

问题2:并发太多会崩

如果 100 个分块同时调 AI,API 会拒绝或超时。需要控制并发数量。

问题3:崩溃后怎么办

处理 50,000 字的文档要几分钟,期间程序可能崩溃。重启后,已处理的那些分块怎么办?


二、整体架构设计

2.1 三个层级的关系

我们设计了三层结构:任务 → 文档 → 分块。

任务(用户一次请求)
    │
    ├── 文档1 ── 不分块,直接摘要
    │
    └── 文档2 ── 分块1, 分块2, ... 分块25
    │
    └── 文档3 ── 分块1, 分块2, ... 分块100

为什么要三层?因为:

  • 用户关心的是"任务"(一次请求)
  • 实际处理的是"文档"(每个文件)
  • 大文档要拆成"分块"(AI 能处理的片段)

2.2 核心组件

组件 作用
DocSummaryTaskRunner 总调度器,负责执行、并发控制、崩溃恢复
DocSummaryTaskRecoveryJob 应用启动时扫描未完成任务,自动恢复
DocSummaryTaskService 任务状态管理,比如"处理中"、“已完成”
DocSummaryDocumentService 文档状态管理
DocSummaryChunkMapper 分块的增删改查
DocSummaryAiService 调 AI 生成摘要、合并摘要

2.3 数据库表结构

doc_summary_task          -- 任务表(记录用户请求)
    │ 一对多
    ↓
doc_summary_document      -- 文档表(每个上传的文件)
    │ 一对多
    ↓
doc_summary_chunk         -- 分块表(大文档的每个片段)
    │ 处理完成后
    ↓
doc_summary_result        -- 文档摘要结果表
    │ 合并后
    ↓
doc_summary_task_result   -- 任务整体摘要表

关键设计:分块表里有 summary 字段,用来保存每个分块的摘要。这就是崩溃恢复的基础。


三、正常执行流程

3.1 从任务开始:execute()

当用户提交任务后,系统会调用 execute(taskId)

第一步:原子获取执行权
        tryStartExecution(taskId)
        
        为什么要"原子"?因为可能有两个触发点:
        - RecoveryJob 启动时恢复任务
        - 用户手动点击"重新执行"
        只有状态是 PROCESSING 的任务才能获取执行权
        防止同一个任务被执行两次

第二步:准备资源
        获取任务实体(从数据库)
        获取文档列表(这个任务包含哪些文件)

第三步:创建并发控制资源
        线程池 ExecutorService(控制文档并发数)
        信号量 Semaphore(控制分块并发数)
        为什么要两层?后面详细说

第四步:并发处理每个文档
        for each document:
            CompletableFuture.runAsync(() -> processDocument(doc))
        
        不用等第一个文档处理完,就开始处理第二个

第五步:等所有文档完成
        CompletableFuture.allOf(futures).join()
        
        阻塞等待,直到所有文档都处理完

第六步:清理资源
        关闭线程池
        移除信号量

3.2 处理单个文档:processDocument()

每个文档的处理流程:

第一步:推送事件 DOCUMENT_START
        前端收到后,UI 显示"正在处理 report.pdf"

第二步:更新状态 PENDING → PARSING
        数据库记录:这个文档正在解析

第三步:复制库文件(如果是从知识库选的)
        用户可能选了库里已有的文件
        要先复制到任务目录

第四步:解析文档内容
        用 FileParserFactory 找对应的解析器
        pdf → PDFParser
        docx → DocxParser
        txt → TxtParser
        
        解析后得到纯文本:"本研究报告探讨了..."(50000字)

第五步:判断大小级别
        如果 ≤ 8000字 → SMALL(直接摘要)
        如果 ≤ 100000字 → MEDIUM(分块处理)
        如果 > 100000字 → LARGE(分块处理)

第六步:根据大小选择策略
        SMALL → processSmallDocument()
        MEDIUM/LARGE → processChunkedDocument()

第七步:推送事件 DOCUMENT_DONE
        前端显示"report.pdf 处理完成"

3.3 小文档:直接扔给 AI

小文档处理流程:
    │
    ├─► 调用 AI 服务
    │       aiService.generateSummary(content)
    │       AI 返回:"本报告主要结论是..."
    │
    ├─► 保存结果
    │       插入 doc_summary_result 表
    │       记录:docId=1, summary="本报告主要..."
    │
    └─► 检查任务是否全部完成
            checkAndSaveTaskResult()
            如果所有文档都处理完了,生成任务整体摘要

小文档很简单,一步到位。

3.4 大文档:分块处理

这才是重点。假设文档有 50,000 字:

分块处理流程:
    │
    ├─► 第一步:查询已有分块
    │       chunkMapper.selectList(documentId)
    │       
    │       为什么先查?因为可能是崩溃恢复!
    │       如果数据库里已经有分块记录,说明之前处理过
    │       
    │       如果 existingChunks 是空 → 正常流程
    │       如果 existingChunks 不空 → 崩溃恢复流程

    │
    ├─► 正常流程:
    │       │
    │       ├─► 更新文档状态: PARSING → CHUNKING
    │       │       数据库记录:文档正在分块
    │       │
    │       ├─► 执行分块策略
    │       │       StructureChunkStrategy.chunk(content, config)
    │       │       chunkSize=2000(每块2000字)
    │       │       overlap=200(前后重叠200字保持连贯)
    │       │       
    │       │       返回 25 个分块
    │       │
    │       ├─► 创建分块实体,插入数据库
    │       │       for each chunk:
    │       │           DocSummaryChunk entity = new DocSummaryChunk()
    │       │           entity.setContent(chunk.getContent())
    │       │           entity.setOverlapBefore(chunk.getOverlapBefore())
    │       │           entity.setBoundaryContext(chunk.getBoundaryContext())
    │       │           entity.setStatus(PENDING)
    │       │           chunkMapper.insert(entity)  ← 立即落盘!
    │       │
    │       │       为什么立即插入?因为要支持崩溃恢复
    │       │       如果崩溃,分块记录还在数据库里
    │       │
    │       ├─► 推送事件 CHUNK_CREATED
    │       │       前端显示:"创建分块 1/25"
    │       │       前端显示:"创建分块 2/25"
    │       │       ...
    │       │
    │       ├─► 更新文档状态: CHUNKING → SUMMARIZING
    │       │       分块创建完了,开始处理摘要
    │       │
    │       ├─► 异步处理每个分块(并发控制)
    │       │       for each chunk:
    │       │           semaphore.acquire()  ← 获取许可(限制并发)
    │       │           CompletableFuture.supplyAsync(() -> processChunk(chunk))
    │       │           semaphore.release()  ← 释放许可
    │       │
    │       ├─► 等待所有分块完成
    │       │       futures.stream().map(CompletableFuture::join)
    │       │       阻塞等待,收集所有摘要
    │       │
    │       ├─► AI 合并所有摘要
    │       │       25个分块摘要 → 用分隔符连接
    │       │       "第一章...\n\n---\n\n第二章...\n\n---\n\n..."
    │       │       调 AI 再总结一次 → "本文主要研究了..."
    │       │
    │       └─► 保存文档结果
    │               saveDocumentResult()

    │
    └─► 崩溃恢复流程:(后面详细说)

3.5 处理单个分块:processChunk()

这是最底层的方法,每个分块都要经过这个流程:

processChunk(chunk)
    │
    ├─► 更新分块状态: PENDING → SUMMARIZING
    │       chunkMapper.updateById(chunk)
    │       数据库记录:这个分块正在处理
    │
    ├─► 构建完整内容(含上下文)
    │       StringBuilder sb = new StringBuilder()
    │       
    │       if (overlapBefore != null):
    │           sb.append("[前文上下文]\n" + overlapBefore + "\n\n")
    │       
    │       sb.append(content)  ← 分块主要内容
    │       
    │       if (boundaryContext != null):
    │           sb.append("\n\n[边界上下文]\n" + boundaryContext)
    │       
    │       为什么要有上下文?保持语义连贯
    │       AI 看到的不只是这块内容,还有前后关联
    │
    ├─► AI 生成摘要
    │       aiService.generateSummaryWithPrompt(fullContent, CHUNK_SUMMARY_PROMPT)
    │       返回:"本章节介绍了..."
    │
    ├─► 更新状态并保存摘要
    │       chunk.setStatus(COMPLETED)
    │       chunk.setSummary(summary)  ← 关键!摘要落盘
    │       chunkMapper.updateById(chunk)
    │       
    │       摘要保存到数据库,崩溃后可以复用
    │
    ├─► 推送事件 CHUNK_SUMMARY
    │       前端显示:"分块 3 摘要完成"
    │
    └─► 返回摘要
            后续用来合并

关键点:chunk.setSummary(summary) 把摘要存到数据库


四、崩溃恢复流程

4.1 应用启动时自动扫描

Spring 有个注解 @PostConstruct,标记的方法会在 Bean 初始化后自动执行。我们用这个来扫描未完成任务:

@Component
public class DocSummaryTaskRecoveryJob {
    
    @PostConstruct  // 应用启动后自动执行
    public void recoverUnfinishedTasks() {
        // 查询未完成任务
        // status 是 PENDING、PROCESSING、MERGING、FINALIZING 的都算未完成
        
        for (DocSummaryTask task : unfinishedTasks) {
            // 检查是否过期
            // 如果 lastActiveTime 超过 24 小时,标记为过期
            
            if (isTaskExpired(task)) {
                taskService.markExpired(task.getId());
                // 过期了就不恢复,可能数据已经失效
            } else {
                taskRunner.resumeFromCheckpoint(task.getId());
                // 恢复执行
            }
        }
    }
}

为什么要检查过期?因为:

  • 可能任务创建后根本没开始执行
  • 可能任务的数据源已经变了
  • 超过 24 小时,认为数据时效性过期,不再恢复

4.2 任务恢复入口:resumeFromCheckpoint()

resumeFromCheckpoint(taskId)
    │
    ├─► 原子获取执行权
    │       tryStartExecution(taskId)
    │       
    │       为什么又要原子操作?
    │       因为 RecoveryJob 和用户手动点击可能同时触发
    │       只有一个能成功获取执行权

    │
    ├─► 查询未完成的文档
    │       documentMapper.selectList()
    │       WHERE task_id = ? AND status NOT IN ('COMPLETED', 'FAILED')
    │       
    │       跳过已完成的,跳过失败的
    │       只恢复那些"处理中途"的文档

    │
    ├─► 检查是否全部完成
    │       if (unfinishedDocs.isEmpty()):
    │           checkAndSaveTaskResult()
    │           markCompleted()
    │           return
    │       
    │       特殊场景:所有文档都完成了,但任务结果没生成
    │       可能是崩溃时刚好在生成任务结果
    │       补上这一步就行

    │
    ├─► 创建并发资源
    │       ExecutorService(线程池)
    │       Semaphore(信号量)

    │
    ├─► 并发处理未完成的文档
    │       for each doc in unfinishedDocs:
    │           CompletableFuture.runAsync(() -> processDocument(doc))

    │
    └─► 等待完成,标记任务完成

4.3 分块级崩溃恢复:关键逻辑

processDocument() 处理一个文档时,会调用 processChunkedDocument()。这个方法会先查数据库:

查询已有分块
    │
    │   chunkMapper.selectList(documentId)
    │   返回:Chunk 1-10(已有记录)
    │
    ├─► 判断是否是崩溃恢复
    │       if (existingChunks.isEmpty()) → 正常流程
    │       else → 崩溃恢复流程

    │
    ├─► 崩溃恢复流程:
    │       │
    │       ├─► 收集已完成分块的摘要
    │       │       allSummaries = []
    │       │       
    │       │       for each chunk in existingChunks:
    │       │           if (chunk.status == COMPLETED && chunk.summary != null):
    │       │               allSummaries.add(chunk.getSummary())  ← 复用数据库保存的
    │       │       
    │       │       关键:summary 字段是之前保存的
    │       │       直接读出来,不用重新处理
    │       │
    │       ├─► 筛选未完成的分块
    │       │       pendingChunks = existingChunks.filter(c -> c.status != COMPLETED)
    │       │       
    │       │       可能包括:
    │       │       - status=SUMMARIZING(崩溃时正在处理)
    │       │       - status=PENDING(还没开始)
    │       │       - status=FAILED(之前失败)
    │       │
    │       ├─► 处理未完成的分块
    │       │       for each pendingChunk:
    │       │           semaphore.acquire()
    │       │           processChunk(pendingChunk)  ← 重新处理
    │       │           semaphore.release()
    │       │       
    │       │       处理完成后,每个分块的 summary 被保存
    │       │
    │       ├─► 合并所有摘要
    │       │       newSummaries = 处理未完成分块得到的新摘要
    │       │       allSummaries.addAll(newSummaries)
    │       │       
    │       │       最终有完整的摘要集合
    │       │
    │       ├─► AI 合并
    │       │       mergeSummariesWithAi(allSummaries)
    │       │       
    │       └─► 保存文档结果

五、并发控制:两层"关卡"

5.1 为什么需要两层控制?

假设一个任务有 3 个文档:

  • 文档1:不分块,直接处理
  • 文档2:25 个分块
  • 文档3:100 个分块

如果只控制文档并发(比如同时处理 2 个文档),那文档2 和文档3 开始后,会有 125 个分块同时调 AI。AI API 会受不了。

所以需要两层:

第一层:文档并发控制
        ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentDocs)
        
        比如 maxConcurrentDocs = 2
        同时最多处理 2 个文档

第二层:分块并发控制
        Semaphore semaphore = new Semaphore(maxConcurrentChunks)
        
        比如 maxConcurrentChunks = 10
        同一个任务里,最多 10 个分块同时调 AI

5.2 信号量的使用方式

CompletableFuture.supplyAsync(() -> {
    try {
        semaphore.acquire();  // 获取许可,如果没有就等待
        
        return processChunk(taskId, chunk);
        
    } finally {
        semaphore.release();  // 处理完一定要释放
    }
});

为什么用 try-finally?因为 processChunk() 可能抛异常,如果不释放信号量,后续分块会永远等待。

5.3 并发资源的管理

每个任务开始时创建:

taskExecutors.put(taskId, executor);     // 缓存线程池
chunkSemaphores.put(taskId, semaphore);  // 缓存信号量

任务结束时清理:

private void cleanup(Long taskId) {
    ExecutorService executor = taskExecutors.remove(taskId);
    if (executor != null) {
        executor.shutdown();  // 关闭线程池
    }
    
    chunkSemaphores.remove(taskId);  // 移除信号量
}

六、状态流转:像流水线一样

6.1 任务状态

PENDING     → 刚创建,还没开始
PROCESSING  → 正在处理
COMPLETED   → 全部完成
FAILED      → 处理失败
EXPIRED     → 中断超过24小时,不再恢复

6.2 文档状态

PENDING      → 还没处理
PARSING      → 正在解析文件内容
CHUNKING     → 正在分块(大文档)
SUMMARIZING  → 正在生成摘要
COMPLETED    → 处理完成
FAILED       → 处理失败

为什么这么多状态?因为崩溃恢复需要知道"停在哪里":

  • 文档是 SUMMARIZING → 分块已创建,直接查分块恢复
  • 文档是 CHUNKING → 分块正在创建,重新解析创建分块
  • 文档是 PARSING → 刚开始,重新解析文档

6.3 分块状态

PENDING      → 分块已创建,还没处理
SUMMARIZING  → 正在调用 AI
COMPLETED    → 处理完成,summary 已保存
FAILED       → 处理失败

七、崩溃恢复实例:完整演示

7.1 场景设定

用户上传了 2 个大文档,每个文档分 10 块:

任务 ID=123
    │
    ├── Doc1(10个分块)
    │       Chunk 1-10
    │
    └── Doc2(10个分块)
            Chunk 1-10

7.2 处理到一半崩溃

假设 Doc1 处理到 Chunk 5 时,应用突然关闭:

数据库状态(崩溃时):

doc_summary_chunk 表(Doc1 的分块):

| chunk_index | status      | summary     |
|-------------|-------------|-------------|
| 1           | COMPLETED   | "摘要内容1" | ← 已完成,有摘要
| 2           | COMPLETED   | "摘要内容2" | ← 已完成,有摘要
| 3           | COMPLETED   | "摘要内容3" | ← 已完成,有摘要
| 4           | COMPLETED   | "摘要内容4" | ← 已完成,有摘要
| 5           | SUMMARIZING | NULL        | ← 崩溃时正在处理,没有摘要
| 6           | PENDING     | NULL        | ← 还没开始
| 7           | PENDING     | NULL        | ← 还没开始
| 8           | PENDING     | NULL        | ← 还没开始
| 9           | PENDING     | NULL        | ← 还没开始
| 10          | PENDING     | NULL        | ← 还没开始

doc_summary_document 表:

| doc_id | status        |
|--------|---------------|
| Doc1   | SUMMARIZING   | ← 处理中途
| Doc2   | PENDING       | ← 还没开始

doc_summary_task 表:

| task_id | status      | last_active_time        |
|---------|-------------|-------------------------|
| 123     | PROCESSING  | 2026-05-03 10:30:00     |

7.3 应用重启后的恢复流程

应用启动
    ↓
@PostConstruct: DocSummaryTaskRecoveryJob.recoverUnfinishedTasks()
    ↓
查询未完成任务: task_id=123, status=PROCESSING
    ↓
检查过期: lastActiveTime 是 10:30,现在是 10:45
         中断 15 分钟,小于 24 小时,未过期
    ↓
taskRunner.resumeFromCheckpoint(123)
    ↓
【第一阶段:查询未完成文档】
    │
    │   SELECT * FROM doc_summary_document
    │   WHERE task_id=123 AND status NOT IN ('COMPLETED', 'FAILED')
    │   
    │   结果:Doc1(SUMMARIZING), Doc2(PENDING)
    │
    ↓
【第二阶段:并发处理这两个文档】
    │
    ├─► Thread-1: processDocument(123, Doc1)
    │       │
    │       ├─► 推送事件 DOCUMENT_START
    │       │       前端:"正在恢复处理 Doc1"
    │       │
    │       ├─► 更新状态: SUMMARIZING → PARSING
    │       │       注意:这里会重新解析文档内容
    │       │       (这是当前的实现,实际上可以优化)
    │       │
    │       ├─► 解析文档内容(读取文件)
    │       │       得到 50000 字的文本
    │       │
    │       ├─► 判断大小级别: MEDIUM(需要分块)
    │       │
    │       ├─► processChunkedDocument(123, Doc1, content)
    │       │       │
    │       │       ├─► 查询已有分块
    │       │       │       chunkMapper.selectList(documentId=Doc1)
    │       │       │       返回 10 个分块记录
    │       │       │
    │       │       ├─► 崩溃恢复路径(existingChunks 不为空)
    │       │       │       │
    │       │       │       ├─► 收集已完成摘要
    │       │       │       │       allSummaries = []
    │       │       │       │       
    │       │       │       │       Chunk 1: status=COMPLETED, summary="摘要1"
    │       │       │       │               → allSummaries.add("摘要1") ✓
    │       │       │       │       
    │       │       │       │       Chunk 2: status=COMPLETED, summary="摘要2"
    │       │       │       │               → allSummaries.add("摘要2") ✓
    │       │       │       │       
    │       │       │       │       Chunk 3: status=COMPLETED, summary="摘要3"
    │       │       │       │               → allSummaries.add("摘要3") ✓
    │       │       │       │       
    │       │       │       │       Chunk 4: status=COMPLETED, summary="摘要4"
    │       │       │       │               → allSummaries.add("摘要4") ✓
    │       │       │       │       
    │       │       │       │       Chunk 5: status=SUMMARIZING, summary=NULL
    │       │       │       │               → 不添加,没有摘要
    │       │       │       │       
    │       │       │       │       Chunk 6-10: status=PENDING, summary=NULL
    │       │       │       │               → 不添加,没有摘要
    │       │       │       │       
    │       │       │       │       最终:allSummaries = ["摘要1", "摘要2", "摘要3", "摘要4"]
    │       │       │       │       复用了 4 个已保存的摘要!
    │       │       │       │
    │       │       │       ├─► 筛选未完成的分块
    │       │       │       │       pendingChunks = existingChunks.filter(c -> c.status != COMPLETED)
    │       │       │       │       
    │       │       │       │       Chunk 5: status=SUMMARIZING → 需要重新处理
    │       │       │       │       Chunk 6-10: status=PENDING → 需要处理
    │       │       │       │       
    │       │       │       │       pendingChunks = [Chunk 5, Chunk 6, Chunk 7, Chunk 8, Chunk 9, Chunk 10]
    │       │       │       │       共 6 个分块需要处理
    │       │       │       │
    │       │       │       ├─► 处理未完成的分块
    │       │       │       │       for each pendingChunk:
    │       │       │       │           semaphore.acquire()
    │       │       │       │           processChunk(pendingChunk)
    │       │       │       │           semaphore.release()
    │       │       │       │       
    │       │       │       │       processChunk(Chunk 5):
    │       │       │       │           - 状态: SUMMARIZING → COMPLETED
    │       │       │       │           - 调 AI 生成摘要 → "摘要5"
    │       │       │       │           - chunk.setSummary("摘要5")
    │       │       │       │           - chunkMapper.updateById(chunk)
    │       │       │       │       
    │       │       │       │       processChunk(Chunk 6-10):
    │       │       │       │           - 状态: PENDING → SUMMARIZING → COMPLETED
    │       │       │       │           - 生成摘要 → "摘要6" ~ "摘要10"
    │       │       │       │           - 保存摘要
    │       │       │       │       
    │       │       │       │       处理完成,得到 6 个新摘要
    │       │       │       │
    │       │       │       ├─► 合并所有摘要
    │       │       │       │       allSummaries.addAll(newSummaries)
    │       │       │       │       
    │       │       │       │       allSummaries = [
    │       │       │       │           "摘要1", "摘要2", "摘要3", "摘要4",  ← 已完成的(复用)
    │       │       │       │           "摘要5", "摘要6", "摘要7", "摘要8", "摘要9", "摘要10"  ← 新处理的
    │       │       │       │       ]
    │       │       │       │       共 10 个摘要,完整了
    │       │       │       │
    │       │       │       ├─► AI 最终合并
    │       │       │       │       mergedContent = String.join("\n\n---\n\n", allSummaries)
    │       │       │       │       "摘要1\n\n---\n\n摘要2\n\n---\n\n..."
    │       │       │       │       
    │       │       │       │       aiService.generateSummaryWithPrompt(mergedContent, MERGE_PROMPT)
    │       │       │       │       → "本文是一篇关于...的研究论文"
    │       │       │       │
    │       │       │       └─► 保存文档结果
    │       │       │               saveDocumentResult(123, Doc1, "本文是一篇...")
    │       │       │               插入 doc_summary_result 表
    │       │       │               Doc1 完成
    │       │
    │       ├─► 更新状态: SUMMARIZING → COMPLETED
    │       │
    │       └─► 推送事件 DOCUMENT_DONE
    │               前端:"Doc1 处理完成"
    │
    ├─► Thread-2: processDocument(123, Doc2)
    │       │
    │       ├─► 正常流程(没有已有分块)
    │       │       existingChunks 为空 → 创建新分块
    │       │
    │       ├─► 创建 10 个分块,插入数据库
    │       │
    │       ├─► 处理所有 10 个分块
    │       │       Chunk 1-10: PENDING → SUMMARIZING → COMPLETED
    │       │       生成摘要并保存
    │       │
    │       ├─► AI 合并 → 文档摘要
    │       │
    │       └─► 保存结果
    │               Doc2 完成
    │
    ↓
【第三阶段:任务完成】
    │
    │   所有文档都处理完了
    │
    ├─► checkAndSaveTaskResult(123)
    │       │
    │       ├─► 查询所有文档摘要
    │       │       doc_summary_result: Doc1摘要, Doc2摘要
    │       │
    │       ├─► AI 合成任务整体摘要
    │       │       "本项目包含两个文档..."
    │       │
    │       └─► 保存 doc_summary_task_result
    │
    ├─► taskService.markCompleted(123)
    │       status = COMPLETED
    │
    └─► 推送事件 TASK_DONE
            前端:"任务处理完成"

7.4 改进效果对比

之前的实现(摘要不落盘):

崩溃时:
    Chunk 1-4: status=COMPLETED,但摘要只存在内存里
    崩溃后摘要丢失

恢复后:
    需要重新处理 Chunk 1-10(10个分块全部重做)
    等于浪费了 4 次 AI 调用

现在的实现(摘要落盘):

崩溃时:
    Chunk 1-4: status=COMPLETED, summary="摘要1~4"
    摘要已保存到数据库

恢复后:
    直接复用 Chunk 1-4 的摘要
    只处理 Chunk 5-10(6个分块)
    节省了 4 次 AI 调用

八、实时进度推送

处理过程中,前端需要知道进度。我们用 SSE(Server-Sent Events)推送事件:

事件类型 触发时机 前端显示
TASK_START 任务开始 “开始处理任务,共3个文档”
DOCUMENT_START 文档开始 “正在处理 report.pdf”
PARSE_DONE 解析完成 “文档解析完成,50000字符”
CHUNK_CREATED 分块创建 “创建分块 1/25”
CHUNK_SUMMARY 分块完成 “分块 3 摘要完成”
DOCUMENT_DONE 文档完成 “report.pdf 处理完成”
TASK_DONE 任务完成 “任务处理完成”
TASK_FAILED 任务失败 “任务失败: xxx”

前端收到这些事件,就能实时显示进度条和状态。


九、核心方法一览

方法 作用
execute(taskId) 同步执行任务(阻塞等待完成)
executeAsync(taskId) 异步执行任务(不阻塞)
processDocument(taskId, document) 处理单个文档
processSmallDocument() 小文档直接摘要
processChunkedDocument() 大文档分块处理,支持崩溃恢复
processChunk(chunk) 处理单个分块,摘要落盘
mergeSummariesWithAi() AI 合并多个摘要
checkAndSaveTaskResult() 检查任务是否全部完成,生成整体摘要
resumeFromCheckpoint(taskId) 崩溃恢复入口
stop(taskId) 停止任务执行

十、数据库表详解

10.1 doc_summary_task(任务表)

字段 作用
id 任务ID
user_id 用户ID
title 任务标题
status 任务状态:PENDING/PROCESSING/COMPLETED/FAILED/EXPIRED
total_count 文档总数
completed_count 已完成文档数
last_active_time 最后活动时间(判断过期)

10.2 doc_summary_document(文档表)

字段 作用
id 文档ID
task_id 所属任务ID
document_name 文件名
source_type 来源:LOCAL/KNOWLEDGE_BASE
source_path 源文件路径
upload_path 上传后的路径
status 文档状态
size_level 大小级别:SMALL/MEDIUM/LARGE

10.3 doc_summary_chunk(分块表)

字段 作用
id 分块ID
task_id 所属任务ID
document_id 所属文档ID
chunk_index 分块索引(第几块)
content 分块内容
summary 分块摘要(关键字段)
status 分块状态
overlap_before 前文重叠内容
boundary_context 边界上下文

10.4 doc_summary_result(文档摘要表)

字段 作用
id 结果ID
task_id 所属任务ID
document_id 文档ID
summary 文档摘要(分块合并后的)
create_time 创建时间

10.5 doc_summary_task_result(任务整体摘要表)

字段 作用
id 结果ID
task_id 任务ID
overall_summary 任务整体摘要(所有文档合并)
create_time 创建时间

十一、总结:核心设计思想

11.1 分层并发

  • 文档层:线程池控制,防止同时处理太多文档
  • 分块层:信号量控制,防止同时调太多 AI API

11.2 原子操作

  • tryStartExecution():用数据库条件更新,防止重复执行
  • 只有一条线程能成功更新状态,获取执行权

11.3 状态落盘

  • 每一步状态都保存到数据库
  • 崩溃恢复时,根据状态判断"停在哪里"

11.4 摘要落盘

  • 分块完成时,摘要保存到 chunk.summary 字段
  • 崩溃恢复时,直接读取已保存的摘要,复用而不重新处理

11.5 数据库统计

  • 判断任务完成时,直接查数据库统计
  • 不依赖内存计数,避免并发时的竞态问题

十二、适用场景

这个设计适用于:

  • 批量文档处理
  • 批量图片识别
  • 批量数据导入/转换
  • 任何"长任务 + 可分块 + 崩溃恢复"的场景

核心思想:把中间结果持久化,恢复时从断点继续。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐