pai-smart架构流程详解(文件上传模块)
文件上传与解析模块实现了大文件的分片上传、断点续传、文件合并以及文档解析功能。
通过 Redis 和 MinIO 的结合,确保大文件上传的可靠性;并通过 Kafka 实现异步处理。模块支持多种文档格式(PDF、Word、Excel)的解析,并提取文本内容用于后续向量化处理。文本向量化通过调用豆包 API 实现,生成的向量数据目前存储在 Elasticsearch 中
一、核心功能设计

二、数据流转与存储设计
文件从上传到向量化完成的完整流程:
- 客户端计算文件 MD5,发起上传请求→服务端验证文件是否已存在,返回分片策略
- 客户端根据策略分片上传文件
- 服务端接收分片,存入 MinIO 并更新 Redis 状态
- 所有分片上传完成后,触发合并操作
- 合并完成后发送解析任务到 Kafka→解析服务消费任务,根据文件类型选择相应解析器提取文本
- 文本分块后发送向量化任务到 Kafka→向量化服务消费任务,调用豆包 API 将文本转换为向量表示
- 向量数据写入 Elasticsearch ,通知用户处理完成

-
01、MySQL
文件主表(file_upload):存储文件元信息,如 MD5、名称、大小、状态
分片表(chunk_info):记录每个分片的信息,包括索引、MD5、存储路径
解析结果表(document_vectors):存储文本分块和向量化结果的元数据
02、Redis
使用 BitSet 记录已上传分片的位图(SETBIT命令); 存储上传任务的临时状态和进度; 缓存热点文件的元数据,减轻数据库压力
03、MinIO
临时分片:存储上传的文件分片,路径结构为/temp/{fileMd5}/{chunkIndex}完整文件:合并后的文件存储在/documents/{userId}/{fileName}
存储策略:实现热冷数据分离
04、 Elasticsearch
存储文本向量数据和原始文本内容,索引基于文件 MD5 和分块 ID 组织
-
三、关键流程
-
1.分片上传流程:
1.前端计算上传文件的md5,将文件分片(5MB为一个分片),将分片数据,分片索引上传到后端
2.后端接受分片,并落存:
2.1.调用UploadService.uploadChunk
2.2.获取文件的类型和信息,检查文件状态(是否可以上传),检查分片是否上传,检查Redis,数据库中是否存在分片信息,检查MinIo是否存在该分片

如果Redis里存在,但数据库没有则重新上传数据库或者MinIo没有,则将标记设为false,触发未上传链路
2.3.计算分片的md5,构建存储路径,将分片上传到数据库和MinIo

每次分片上传完成都需要在Redis里的BitMap标记,这是项目实现断点重连的关键:
我们会用文件的 MD5 作为 Redis 的 key,然后用一串“0”和“1”的位图来记录每个分片的状态,比如第 0 个位代表第 0 个分片,第 1 个位代表第 1 个分片……上传一个分片就把对应的 bit 位标记为 1。
2.分片合并流程:
1.前端发现分片上传完之后,触发合并请求
2.后端收到请求做权限与完整性检验,确保用户有权限操作该文件,文件分片上传完成
3.后端检验完成之后,调用MinIo的合并api(依据需要相同的fileMD5)(使用 MD5 作为 MinIO 对象路径,确保同名不同内容的文件不会互相覆盖)

4.合并完成之后调用删除接口,将Redis和MinIo里的分片数据删除
5.发起异步请求处理
5.1调用向量模型估算文件需要的token

5.2发送到kafka中处理

3.文件处理流程:
1.发送到kafka处理:

2.获取MinIo里的文件数据
消费者拿到 filePath 后,把对象存储里的合并文件读出来,得到可读取的数据流

3.将这个流发送给tika解析

4.解析后的数据,再用向量模型生成向量

5.文档向量化,并存入es

5.1es操作:

用户上传文件 → MinIO存储文件 → ParseService解析分块 → DocumentVector表存储分块 → VectorizationService向量化 → Elasticsearch存储向量
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)