RAG 业务落地开发指导

本文面向后续把这套 RAG 能力接入业务系统的开发者,重点回答三件事:

  1. 上游业务请求怎么进入 RAG。
  2. RAG 内部各组件怎么串起来。
  3. 数据分别存到 MySQL、文件存储、向量库和搜索引擎的哪里。

1. 总体边界

独立工程保留的是一套完整 RAG 子系统,不是简单 demo。

业务系统
  -> RAG HTTP/API 层
  -> 模型/存储/知识库配置
  -> 文档入库 pipeline
  -> 检索 pipeline
  -> QA prompt + 模型调用

代码入口:

src/main/java/com/aizuda/snail/ai/ragforge/controller/RagForgeController.java

这个 controller 只负责暴露接口和参数转换。真正可复用的业务边界在这些类:

RagDocumentService        接收文件/URL,做去重、资源存储、文档元数据入库
DocumentPipeline          解析、切片、写 chunk、写向量库、写搜索引擎
RagSearchService          检索总入口
RagSearchPipeline         检索流水线编排
RagQAService              检索后拼 Prompt 并调用 Chat 模型
KnowledgeService          知识库配置管理
ModelFactory              Chat / Embedding / Rerank 模型工厂
VectorStoreFactory        向量库工厂
SearchEngineFactory       搜索引擎工厂
ResourceService           原始文件存储

2. 配置链路

业务落地前要先准备四类配置。

模型提供商 -> 模型配置 -> 存储实例 -> 知识库

2.1 模型提供商

表:snail_ai_model_provider

作用:记录提供商,如 OpenAI、Ollama、Gemini 或内部兼容 OpenAI 协议的服务。

关键字段:

id              提供商 ID
provider_name   展示名称
provider_key    代码识别用 key,如 openai
is_enabled      是否启用

2.2 模型配置

表:snail_ai_model_config

作用:记录具体模型实例。

RAG 至少需要:

EMBEDDING   文档入库写向量、向量检索时使用
CHAT        QA 阶段生成答案时使用
RERANKER    可选,检索后重排时使用

关键字段:

id             模型配置 ID,业务配置里引用它
provider_id    所属提供商
model_key      真实模型名,如 text-embedding-3-small
model_type     CHAT / EMBEDDING / RERANKER
api_key        加密后的 key
api_endpoint   模型服务地址
config_json    模型扩展参数,如 dimensions、temperature、timeoutMs

代码读取路径:

ModelFactory.getModel(modelConfigId)
-> ModelConfigHandler.getConfigInfo
-> snail_ai_model_config
-> 具体模型实现

2.3 存储实例

表:snail_ai_store_instance

作用:把外部向量库和搜索引擎的连接信息配置化,不写死在代码里。

关键字段:

id          存储实例 ID
category    1=向量库,2=搜索引擎
type        1=PG_VECTOR,2=MILVUS,3=ELASTICSEARCH,4=PG_FULLTEXT
config      连接参数 JSON
status      是否启用
is_default  是否默认

向量库由 VectorStoreFactory 读取:

snail_ai_rag.vector_store_instance_id
-> snail_ai_store_instance.config
-> PGVector / Milvus / Elasticsearch 向量适配器

搜索引擎由 SearchEngineFactory 读取:

snail_ai_rag.search_engine_instance_id
-> snail_ai_store_instance.config
-> Elasticsearch BM25 适配器

2.4 知识库配置

表:snail_ai_rag

作用:这是 RAG 的主配置表,后续所有入库和检索都以 ragId 为主线。

关键字段:

id                         知识库 ID,也就是接口里的 ragId
name                       知识库名称
embedding_model_id          入库和向量检索使用的 Embedding 模型
rerank_model_id             默认 Rerank 模型
vector_store_instance_id    向量库存储实例
search_engine_enable        是否启用 BM25 搜索引擎
search_engine_instance_id   搜索引擎实例
config                      切片、检索、问答参数 JSON
dedup_strategy              文档去重策略
dedup_action                命中去重后的动作
upload_confirm              上传前是否二次确认

config 对应代码:

src/main/java/com/aizuda/snail/ai/persistence/rag/dataobject/RagConfigDO.java

结构:

chunkParams   切片参数:mode、maxChunkTokens、chunkOverlap、regex、smart 模型等
searchParams  检索参数:resultCount、rerankEnabled、denseWeight、rrfK、threshold 等
modelParams   QA 参数:modelId、nearbySliceCount、prompt

3. 文档入库上下游串联

入口:

POST /demo/rag/document/upload-and-process
POST /demo/rag/document/import-url-and-process

业务上游只需要提供:

ragId       写入哪个知识库
file/url    文档来源
可选去重参数 dedupStrategy / dedupAction

完整调用链:

RagForgeController
-> RagDocumentService.upload / importFromUrl
-> DocumentImportFactory
-> ResourceService
-> snail_ai_resource
-> snail_ai_rag_document(PENDING)
-> DocumentPipeline.processDocument
-> ResourceService.load(resourceId)
-> DocumentParserFactory
-> DocumentChunkingService
-> snail_ai_rag_chunk
-> VectorStoreFactory
-> 外部向量库 index: rag_{ragId}
-> SearchEngineFactory
-> 外部搜索引擎 index: rag_{ragId}
-> snail_ai_rag_document.status = SUCCESS / FAILED

3.1 原始文件怎么存

原始文件不直接塞到 snail_ai_rag_document.content

存储路径:

ResourceService.upload
-> LOCAL 或 MINIO
-> snail_ai_resource
-> snail_ai_rag_document.resource_id

核心表:snail_ai_resource

id             资源 ID
storage_key    本地相对路径或对象存储 key
original_name  原始文件名
file_size      文件大小
mime_type      MIME 类型
storage_type   LOCAL / MINIO
access_url     预览或访问 URL
biz_type       DOCUMENT
biz_id         ragId
creator_id     上传人,可为空

3.2 文档元数据怎么存

表:snail_ai_rag_document

一份上传文档对应一行。

关键字段:

id            documentId
rag_id        所属知识库
name          文件名
file_type     pdf/docx/xlsx/txt/md/html 等
source_type   UPLOAD / URL
status        0=PENDING 1=PARSING 2=PROCESSING 3=SUCCESS 4=FAILED
error_msg     失败原因
chunk_count   切片数量
content_hash  原始内容 SHA-256,用于文档级去重
resource_id   指向 snail_ai_resource.id

推荐业务用法:

上传后返回 documentId
业务系统保存 documentId 与自身业务单据的关系
前端或后台轮询 document.status 判断是否处理完成
失败时展示 error_msg 并允许重试 processDocument

3.3 切片怎么存

表:snail_ai_rag_chunk

一份文档会拆成多行 chunk。

关键字段:

id               chunkId
rag_id           所属知识库
document_id      所属文档
paragraph_index  段落序号
chunk_index      文档内切片序号
content          切片文本
token_count      估算 token 数
vector_id        外部向量库里的向量 ID
content_hash     chunk 文本 SHA-256,用于 chunk 级向量复用

为什么 MySQL 还要存 content

向量库和搜索引擎负责召回,不负责业务主数据。
最终展示、拼 Prompt、权限过滤、来源展示,都应回到 MySQL chunk/document 做补全。

3.4 向量怎么存

向量不存 MySQL,存外部向量库。

索引命名:

rag_{ragId}

代码:

IndexNameBuilder.KNOWLEDGE.build(Map.of("ragId", ragId))

写入内容:

id        vectorId,对应 snail_ai_rag_chunk.vector_id
content   chunk 文本
metadata  ragId、documentId、chunkId
vector    embedding 后的向量

关系:

snail_ai_rag_chunk.id
  -> metadata.chunkId

snail_ai_rag_chunk.vector_id
  -> 向量库 document/vector id

3.5 搜索引擎怎么存

搜索引擎用于 BM25/关键词召回。当前实现使用 Elasticsearch。

索引命名同样是:

rag_{ragId}

写入内容:

id        chunkId
content   chunk 文本
metadata  ragId、documentId、chunkId

注意:

search_engine_enable=false 时不会写搜索引擎,也不会走 BM25。
业务对编号、术语、错误码召回敏感时,建议开启 BM25。

4. 检索上下游串联

入口:

POST /demo/rag/search

请求:

{
  "ragId": 1,
  "query": "设备报警 E32 怎么处理?",
  "debug": true
}

完整调用链:

RagForgeController.search
-> RagSearchService.search
-> RagSearchPipeline
   -> ConfigResolveHandler
   -> QueryRewriteHandler
   -> VectorSearchHandler
   -> Bm25SearchHandler
   -> HybridFusionHandler
   -> RerankHandler
   -> FinalizeHandler

4.1 ConfigResolveHandler

输入:

ragId

读取:

snail_ai_rag
snail_ai_rag.config

输出到上下文:

knowledge
searchParams
modelParams
query

4.2 VectorSearchHandler

读取:

knowledge.vector_store_instance_id
knowledge.embedding_model_id
searchParams.resultCount

下游:

VectorStoreFactory
-> 外部向量库 rag_{ragId}

输出:

vectorResults: List<SearchResult>

其中 SearchResult.chunkId 来自向量 metadata,后面用于回查 MySQL。

4.3 Bm25SearchHandler

前提:

snail_ai_rag.search_engine_enable = true

读取:

knowledge.search_engine_instance_id
searchParams.resultCount

下游:

SearchEngineFactory
-> Elasticsearch rag_{ragId}

输出:

bm25Results: List<SearchResult>

4.4 HybridFusionHandler

作用:融合向量召回和 BM25 召回。

支持策略:

RRF           默认推荐,按排名倒数融合,比较稳定
WEIGHTED_SUM  按 denseWeight 做加权,适合调参后固定业务场景

配置来源:

snail_ai_rag.config.searchParams.fusionStrategy
snail_ai_rag.config.searchParams.denseWeight
snail_ai_rag.config.searchParams.rrfK

4.5 RerankHandler

作用:用 Reranker 模型对融合后的候选重新排序。

配置来源:

rerankEnabled
rerankModelId
enterRerankCount
resultCount

成本控制点:

只有前 enterRerankCount 个候选会送入 rerank。
最终只保留 resultCount 个结果。

4.6 FinalizeHandler

作用:把召回结果变成可展示、可拼 Prompt 的最终结果。

处理:

按 chunkId 回查 snail_ai_rag_chunk
按 documentId 回查 snail_ai_rag_document
补齐 content、documentName、documentId
按 nearbySliceCount 合并相邻切片
重排结果,缓解 lost-in-the-middle

5. 问答上下游串联

入口:

POST /demo/rag/qa/stream

完整调用链:

RagForgeController.qaStream
-> RagQAService.qaStream
-> snail_ai_rag.config.modelParams
-> RagSearchService.search
-> buildDocumentsText
-> buildSystemPrompt
-> ModelFactory.getModel(modelParams.modelId)
-> ChatModel.chatStreamModel
-> ResponseBodyEmitter

Prompt 拼装规则:

modelParams.prompt 中必须保留 <Documents>
RagQAService 会把检索结果拼成 documentsText,然后替换 <Documents>

示例:

请只根据以下参考资料回答用户问题。资料不足时明确说明不足,不要编造。

<Documents>

如果没有配置 prompt,则默认使用:

请根据以下参考资料回答用户的问题:

{documentsText}

6. 最小落地表集

docs/sql/snail_ai_schema.sql 是从原项目带出的全量建表脚本,里面包含 agent、memory、skill、openapi 等外围表。

如果只落地 RAG,最小核心表是:

snail_ai_model_provider
snail_ai_model_config
snail_ai_store_instance
snail_ai_rag
snail_ai_resource
snail_ai_rag_document
snail_ai_rag_chunk

建议保留但不是 RAG 主链路强依赖:

snail_ai_user
snail_ai_model_usage_stat

可以裁剪的外围表:

snail_ai_agent*
snail_ai_mcp_server
snail_ai_skill*
snail_ai_app
snail_ai_client_node
snail_ai_openapi_user
memory 相关表

如果裁剪表,也要同步删除对应 mapper/PO 或限制 Spring 扫描范围,否则 MyBatis/Spring 仍可能加载不需要的组件。

7. 核心表关系

snail_ai_model_provider 1 ---- N snail_ai_model_config

snail_ai_store_instance 1 ---- N snail_ai_rag.vector_store_instance_id
snail_ai_store_instance 1 ---- N snail_ai_rag.search_engine_instance_id

snail_ai_model_config 1 ---- N snail_ai_rag.embedding_model_id
snail_ai_model_config 1 ---- N snail_ai_rag.rerank_model_id
snail_ai_model_config 1 ---- N snail_ai_rag.config.modelParams.modelId

snail_ai_rag 1 ---- N snail_ai_rag_document
snail_ai_rag_document 1 ---- N snail_ai_rag_chunk
snail_ai_resource 1 ---- 1 snail_ai_rag_document.resource_id

snail_ai_rag_chunk.vector_id ---- 外部向量库 rag_{ragId}
snail_ai_rag_chunk.id ---- 外部搜索引擎 rag_{ragId} 文档 ID 或 metadata.chunkId

当前 SQL 没有强制声明所有外键,主要靠代码维护关系。这样便于迁移和清理外部索引,但业务落地时要自己保证删除顺序。

8. 写入一致性和失败处理

文档状态:

0 PENDING     已创建文档行,等待处理
1 PARSING     预留状态
2 PROCESSING  解析/切片/写外部索引中
3 SUCCESS     入库完成
4 FAILED      入库失败,error_msg 保存原因

写入顺序:

1. 原始文件写 Resource
2. 文档行写 RagDocument,PENDING
3. 解析文件
4. 切片
5. 写 RagChunk
6. 写向量库,并回填 vector_id
7. 写搜索引擎
8. 更新 RagDocument 状态

注意:

向量库写入失败会导致文档处理失败。
搜索引擎写入当前是非致命失败,失败时向量检索仍可用,但 BM25 召回会缺数据。
重跑同一 documentId 前会清理旧 chunk 和不再被引用的向量。

业务落地建议:

上传接口只返回 documentId,不要立即假设可检索。
前端轮询 document.status,或后端加异步任务/消息通知。
失败时展示 error_msg,并提供重新处理入口。

9. 去重策略

文档级去重字段:

snail_ai_rag_document.rag_id
snail_ai_rag_document.name
snail_ai_rag_document.content_hash

策略:

0 NONE                不去重
1 BY_NAME             同库同名重复
2 BY_CONTENT          同库同内容重复
3 BY_NAME_OR_CONTENT  同名或同内容重复

冲突动作:

0 REJECT     拒收并报错
1 SKIP       跳过本次上传,返回旧文档
2 OVERWRITE  删除旧文档、chunk、向量和资源后重新入库

Chunk 级去重:

snail_ai_rag_chunk.content_hash

如果同一知识库里已有相同 chunk 且已有 vector_id,新 chunk 会复用旧 vector_id,避免重复 embedding。

10. 业务系统接入建议

10.1 上游业务对象和 ragId 的关系

推荐业务侧建自己的关联表,例如:

business_id
rag_id
document_id
owner_id
permission_scope
created_at

RAG 子系统只关心 ragId/documentId/chunkId,业务权限、租户、栏目、产品线建议放在业务侧或扩展 metadata。

10.2 权限过滤放哪里

可选位置:

上传前:限制谁能写某个 ragId
检索前:限制谁能查某个 ragId
FinalizeHandler:按 documentId/chunkId 做结果过滤
向量/搜索 metadata:写入 tenantId、deptId、bizId 后在检索时过滤

如果权限是强要求,建议不要只在前端控制,至少在 search/qa 入口和 FinalizeHandler 做后端校验。

10.3 同步和异步

当前 demo 的 upload-and-process 是同步处理,方便学习完整链路。

生产建议:

上传接口:只写资源和文档行,返回 documentId
后台任务:异步调用 DocumentPipeline.processDocument(documentId)
查询接口:根据 document.status 展示处理进度

这样可以避免大文件解析、embedding、外部索引写入导致 HTTP 超时。

10.4 业务可扩展点

新增文件解析器      实现 DocumentParser,并注册到 DocumentParserFactory
新增切片策略        实现 ChunkStrategy,并接入 DocumentChunkingService
新增向量库          实现 SnailAiVectorStore,并注册 VectorStoreFactory.REGISTER
新增搜索引擎        实现 SearchEngine,并注册 SearchEngineFactory.REGISTER
新增模型提供商      实现 Chat/Embedding/Rerank 对应构建逻辑
检索后权限过滤      扩展 FinalizeHandler
答案引用格式        扩展 RagQAService.buildDocumentsText 或 buildSystemPrompt

11. 落地检查清单

上线前逐项确认:

MySQL 已执行核心表 SQL
snail_ai_model_provider 已有提供商
snail_ai_model_config 已有 EMBEDDING/CHAT/RERANKER
snail_ai_store_instance 已有向量库实例
需要 BM25 时已有搜索引擎实例
snail_ai_rag 已绑定 embeddingModelId/vectorStoreInstanceId/searchEngineInstanceId
snail_ai_rag.config 已配置 searchParams/modelParams/chunkParams
原始文件存储 LOCAL/MINIO 可读写
向量库 rag_{ragId} 可写可查
搜索引擎 rag_{ragId} 可写可查
Prompt 模板保留 <Documents>
业务权限已在入口或 FinalizeHandler 处理

本项目完整源码已上传至 Gitee,需要的朋友可自行下载学习:
👉 源码地址:https://gitee.com/ww_qq_22/ragforge

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐