本文详细介绍了RAG大模型知识库问答系统的搭建过程,从文档切分、向量检索到Prompt工程,深入浅出地讲解了每个关键步骤,并分享了作者在实践过程中踩过的坑和解决方案。对于想要学习大模型应用的小白和程序员来说,本文提供了宝贵的参考和指导。

一、RAG到底在解决什么问题

在动手之前,我想先聊聊RAG这个概念,因为很多刚接触的朋友容易搞混。
image.png

大模型很强,但它有两个致命弱点:

第一,知识有截止日期。 GPT-4的训练数据截止到某个时间点,它不知道你们公司上周发布的新规范,也不知道你们昨天刚修复的那个bug是怎么解决的。

第二,会一本正经地胡说八道。 当大模型遇到它不知道的问题时,它不会老老实实说我不知道,而是会基于它学过的通用知识,给你编一个看起来很合理但其实是错的答案。这就是所谓的幻觉(Hallucination)。

RAG(Retrieval-Augmented Generation,检索增强生成)的核心思路其实很简单:别让大模型靠想象力答题,先帮它把参考资料找出来,让它照着资料回答。
image.png
具体来说分三步:

  1. 把你的私有文档切成小块,转成向量存起来

  2. 用户提问时,先根据问题检索出最相关的文档片段

  3. 把问题和检索到的内容一起喂给大模型,让它基于这些材料生成答案

听起来不复杂对吧?我当时也是这么想的,然后就踩了一堆坑。

二、第一个大坑:文档切分没那么简单


我最初的方案特别粗暴——用LangChain的RecursiveCharacterTextSplitter,设置chunk_size=500,overlap=50,直接把所有文档切成小块。
image.png
代码写起来确实很简单:

from langchain.text_splitter import RecursiveCharacterTextSplitter

def naive_split(text):
    最初的简单切分方案——后来证明这是个坑
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,
        chunk_overlap=50,
        separators=[\n\n, \n, 。, !, ?,  , ]
    )
    chunks = splitter.split_text(text)
return chunks

# 测试一下
sample_text = 
# MySQL主从切换操作手册

## 1. 前置检查
在执行主从切换之前,必须完成以下检查:
- 确认从库同步状态正常(Seconds_Behind_Master = 0)
- 确认没有正在执行的大事务
- 通知相关业务方,确认切换时间窗口

## 2. 切换步骤
2. **1 在主库执行只读设置**

SET GLOBAL read_only = 1;

2. **2 等待从库完全同步**

在从库执行 SHOW SLAVE STATUS,确认 Seconds_Behind_Master = 0

2. **3 停止从库复制**

STOP SLAVE;
RESET SLAVE ALL;

## 3. 回滚方案
如果切换失败,按以下步骤回滚...

chunks = naive_split(sample_text)
for i, chunk in enumerate(chunks):
print(f Chunk {i+1} )
print(chunk[:100] + ... if len(chunk) > 100 else chunk)

看起来没毛病是吧?但实际用起来问题大了。

有一次用户问:MySQL切换前需要做哪些检查?系统返回的文档片段是这样的:

确认没有正在执行的大事务
- 通知相关业务方,确认切换时间窗口

## 2. 切换步骤
2. **1 在主库执行只读设置**

SET GLOBAL read_only = 1;

发现问题了吗?这个片段恰好从检查步骤的中间切开了!第一条检查项确认从库同步状态正常被切到了上一个chunk里。用户问的是需要做哪些检查,结果我们给大模型的参考资料里,第一条检查项就没包含进去。

核心教训:机械地按字数切分,会打断文档的语义完整性。

后来我改成了基于语义结构的切分策略:

import re
from typing import List, Dict

class SmartDocumentSplitter:

    语义感知的文档切分器
    核心思路:尊重文档的原有结构,按标题、段落等语义边界切分

def __init__(self, max_chunk_size=800, min_chunk_size=100):
        self.max_chunk_size = max_chunk_size
        self.min_chunk_size = min_chunk_size

def split_markdown(self, text: str) -> List[Dict]:

        针对Markdown文档的切分
        保持标题层级结构,每个chunk都带上完整的上下文路径

        chunks = []
        current_headers = {1: , 2: , 3: }  # 记录当前的标题层级

# 按行处理,识别标题和内容
        lines = text.split('\n')
        current_content = []

for line in lines:
# 检测Markdown标题
            header_match = re.match(r'^(#{1,3})\s+(.+)$', line)

if header_match:
# 遇到新标题,先保存之前的内容
if current_content:
                    chunk_text = '\n'.join(current_content).strip()
if len(chunk_text) >= self.min_chunk_size:
                        chunks.append({
'content': chunk_text,
'headers': dict(current_headers),
'context_path': self._build_context_path(current_headers)
                        })
                    current_content = []

# 更新标题层级
                level = len(header_match.group(1))
                title = header_match.group(2)
                current_headers[level] = title

# 清除下级标题
for l in range(level + 1, 4):
                    current_headers[l] = 

                current_content.append(line)
else:
                current_content.append(line)

# 如果当前内容超过最大长度,强制切分(但尽量在段落边界)
                content_so_far = '\n'.join(current_content)
if len(content_so_far) > self.max_chunk_size:
                    chunk_text = content_so_far.strip()
                    chunks.append({
'content': chunk_text,
'headers': dict(current_headers),
'context_path': self._build_context_path(current_headers)
                    })
                    current_content = []

# 别忘了最后一段
if current_content:
            chunk_text = '\n'.join(current_content).strip()
if len(chunk_text) >= self.min_chunk_size:
                chunks.append({
'content': chunk_text,
'headers': dict(current_headers),
'context_path': self._build_context_path(current_headers)
                })

return chunks

def _build_context_path(self, headers: Dict) -> str:
        构建层级路径,比如:MySQL主从切换 > 前置检查
        path_parts = [h for h in [headers[1], headers[2], headers[3]] if h]
return' > '.join(path_parts) if path_parts else'未分类'

def enrich_chunk_with_context(self, chunk: Dict) -> str:

        关键技巧:给每个chunk加上上下文前缀
        这样即使单独看这个片段,也能知道它属于哪个章节

        context = f[文档路径:{chunk['context_path']}]\n\n
return context + chunk['content']

# 实际使用示例
splitter = SmartDocumentSplitter(max_chunk_size=800)
chunks = splitter.split_markdown(sample_text)

print(f切分后共 {len(chunks)} 个片段\n)
for i, chunk in enumerate(chunks):
print(f=== Chunk {i+1} ===)
print(f路径:{chunk['context_path']})
print(f内容预览:{chunk['content'][:150]}...)
print()

这样切出来的效果就好多了。每个chunk开头都会带上它的位置信息,大模型在回答时能更准确地理解这段内容的上下文。

不过说实话,这个方案也不是万能的。对于那些格式不规范的老文档(没有清晰的标题结构),切分效果依然一般。后来我又针对不同类型的文档做了差异化处理,这个我们后面再说。

三、第二个大坑:向量检索的语义鸿沟


解决了切分问题,下一步就是向量化和检索了。我用的是开源的BGE模型做Embedding,用Milvus做向量数据库。

第一版的检索代码很直白:

from sentence_transformers import SentenceTransformer
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
import numpy as np

class VectorStore:
    向量存储和检索

def __init__(self, model_name='BAAI/bge-base-zh-v1.5'):
# 加载Embedding模型
        self.model = SentenceTransformer(model_name)
        self.dim = 768# BGE base模型的向量维度

# 连接Milvus
        connections.connect(default, host=localhost, port=19530)

def create_collection(self, collection_name: str):
        创建集合
if utility.has_collection(collection_name):
            utility.drop_collection(collection_name)

        fields = [
            FieldSchema(name=id, dtype=DataType.INT64, is_primary=True, auto_id=True),
            FieldSchema(name=content, dtype=DataType.VARCHAR, max_length=4096),
            FieldSchema(name=context_path, dtype=DataType.VARCHAR, max_length=512),
            FieldSchema(name=embedding, dtype=DataType.FLOAT_VECTOR, dim=self.dim)
        ]
        schema = CollectionSchema(fields, description=知识库文档)
        collection = Collection(collection_name, schema)

# 创建索引
        index_params = {
            metric_type: COSINE,
            index_type: IVF_FLAT,
            params: {nlist: 128}
        }
        collection.create_index(embedding, index_params)
return collection

def insert_documents(self, collection_name: str, chunks: list):
        插入文档
        collection = Collection(collection_name)

        contents = [chunk['content'] for chunk in chunks]
        context_paths = [chunk['context_path'] for chunk in chunks]

# 批量生成Embedding
        embeddings = self.model.encode(contents, normalize_embeddings=True)

        collection.insert([contents, context_paths, embeddings.tolist()])
        collection.flush()
print(f成功插入 {len(chunks)} 条文档)

def search(self, collection_name: str, query: str, top_k: int = 5):
        基础检索
        collection = Collection(collection_name)
        collection.load()

# 生成查询向量
        query_embedding = self.model.encode([query], normalize_embeddings=True)

        results = collection.search(
            data=query_embedding.tolist(),
            anns_field=embedding,
            param={metric_type: COSINE, params: {nprobe: 16}},
            limit=top_k,
            output_fields=[content, context_path]
        )

return results[0]

基本功能是没问题的。但实际跑起来,我发现了一个让人抓狂的现象——用户的口语化提问和文档的正式表述之间存在巨大的语义鸿沟。

举个例子:

  • 用户问:数据库挂了怎么办
  • 文档标题是:MySQL服务异常恢复操作手册

这两个在语义上是相关的,但向量相似度可能并不高。因为用户说的挂了和文档里的异常,用词差异很大。

更坑的是,有时候检索出的Top 5结果里,真正相关的那篇可能只排在第3或第4位,但前两名是一些看起来相关但实际上文不对题的内容。如果我只取Top 3喂给大模型,可能就漏掉了最关键的信息。

后来我采用了一个两阶段检索的策略:先用向量检索做粗筛,再用重排序模型做精排。

from transformers import AutoModelForSequenceClassification, AutoTokenizer
import torch

classEnhancedRetriever:
    增强版检索器:向量检索 + 重排序

def__init__(self, vector_store: VectorStore):
        self.vector_store = vector_store

# 加载重排序模型(BGE Reranker效果不错)
        self.reranker_tokenizer = AutoTokenizer.from_pretrained('BAAI/bge-reranker-base')
        self.reranker_model = AutoModelForSequenceClassification.from_pretrained('BAAI/bge-reranker-base')
        self.reranker_model.eval()

defretrieve_with_rerank(self, collection_name: str, query: str, 
                             initial_top_k: int = 20, final_top_k: int = 5):

        两阶段检索:
1. 向量检索召回 initial_top_k 个候选
2. 用重排序模型精排,返回 final_top_k 个结果

# 第一阶段:向量检索(召回更多候选)
        initial_results = self.vector_store.search(collection_name, query, top_k=initial_top_k)

ifnot initial_results:
return []

# 准备重排序
        candidates = []
for hit in initial_results:
            candidates.append({
'content': hit.entity.get('content'),
'context_path': hit.entity.get('context_path'),
'vector_score': hit.score  # 保留向量检索得分,用于调试
            })

# 第二阶段:重排序
        rerank_scores = self._compute_rerank_scores(query, [c['content'] for c in candidates])

for i, score inenumerate(rerank_scores):
            candidates[i]['rerank_score'] = score

# 按重排序分数排序
        candidates.sort(key=lambda x: x['rerank_score'], reverse=True)

return candidates[:final_top_k]

def_compute_rerank_scores(self, query: str, documents: list) -> list:
        计算query和每个文档的相关性分数
        scores = []

with torch.no_grad():
for doc in documents:
# Reranker的输入格式是 [query, document]
                inputs = self.reranker_tokenizer(
                    [[query, doc]], 
                    padding=True, 
                    truncation=True, 
                    max_length=512, 
                    return_tensors='pt'
                )
                outputs = self.reranker_model(**inputs)
                score = outputs.logits.squeeze().item()
                scores.append(score)

return scores

defretrieve_with_query_expansion(self, collection_name: str, query: str, 
                                       llm_client, top_k: int = 5):

        进阶技巧:查询扩展
        用大模型改写用户问题,生成多个变体,再合并检索结果

# 让大模型帮我们扩展查询
        expansion_prompt = f请将下面这个问题改写成3个不同的表达方式,保持意思相同但用词不同。
每行输出一个改写结果,不要序号,不要其他解释。

原问题:{query}

        expanded_queries = llm_client.generate(expansion_prompt).strip().split('\n')
        expanded_queries = [q.strip() for q in expanded_queries if q.strip()]

# 加上原始查询
        all_queries = [query] + expanded_queries[:3]  # 最多取3个扩展查询

print(f扩展后的查询:{all_queries})  # 调试用

# 对每个查询分别检索
        all_candidates = {}
for q in all_queries:
            results = self.vector_store.search(collection_name, q, top_k=10)
for hit in results:
                content = hit.entity.get('content')
if content notin all_candidates:
                    all_candidates[content] = {
'content': content,
'context_path': hit.entity.get('context_path'),
'best_score': hit.score,
'hit_count': 1
                    }
else:
# 被多个查询命中的文档,增加权重
                    all_candidates[content]['hit_count'] += 1
                    all_candidates[content]['best_score'] = max(
                        all_candidates[content]['best_score'], 
                        hit.score
                    )

# 综合评分:命中次数 * 最高得分
        candidates = list(all_candidates.values())
for c in candidates:
            c['combined_score'] = c['hit_count'] * c['best_score']

        candidates.sort(key=lambda x: x['combined_score'], reverse=True)
return candidates[:top_k]

查询扩展这招特别好用。比如用户问数据库挂了怎么办,大模型可能会扩展成:

  • MySQL服务故障如何处理
  • 数据库无法连接的解决方案
  • 数据库宕机恢复步骤

这几个查询一起检索,能覆盖更多的相关文档。

四、第三个大坑:Prompt工程的门道比想象中深


检索的问题解决了,接下来就是把检索到的内容和用户问题一起喂给大模型了。这一步我本以为最简单,没想到也踩了不少坑。

最初的Prompt特别朴素:


defbuild_naive_prompt(query: str, context_docs: list) -> str:
    最初的简单Prompt——后来证明太天真了
    context = /n/n.join([doc['content'] for doc in context_docs])

    prompt = f根据以下参考资料回答用户问题。

参考资料:
{context}

用户问题:{query}

请回答:

return prompt

这个Prompt有几个严重问题:

问题一:大模型不知道什么时候该说不知道。 当参考资料里确实没有答案时,它还是会编一个出来。

问题二:没有引导大模型说明信息来源。 用户看到答案,不知道是从哪篇文档里来的,无法追溯和验证。

问题三:对于复杂问题,回答的结构不够清晰。

后来迭代了很多版,最终稳定下来的Prompt是这样的:


defbuild_rag_prompt(query: str, context_docs: list, 
                     include_sources: bool = True) -> str:

    生产环境使用的Prompt模板
    关键设计:明确角色定位、限制回答范围、要求标注来源

# 格式化上下文,每段都标注来源

    context_parts = []
for i, doc inenumerate(context_docs, 1):
        source = doc.get('context_path', '未知来源')
        context_parts.append(f【资料{i},来源:{source}】/n{doc['content']})

    context = /n/n/n/n.join(context_parts)

    prompt = f你是一个企业内部知识库助手,专门帮助员工查找和理解公司内部文档。

## 你的工作准则

1. **只根据提供的参考资料回答问题,不要使用你自己的知识。**

2. **如果参考资料中没有相关信息,请明确说根据现有资料,我无法找到关于这个问题的信息,并建议用户联系相关部门或换个关键词搜索。**

3. **回答时请标注信息来源,格式如【资料1】,方便用户追溯原文。**

4. **对于操作类问题,请按步骤清晰地列出;对于概念类问题,先给出简明定义再展开解释。**

5. **如果不同资料中的信息有冲突,请指出差异并说明各自的适用场景。**

## 参考资料

{context}

## 用户问题

{query}

## 回答要求

请根据上述参考资料回答用户问题。记住:

- 只使用参考资料中的信息
- 标注信息来源
- 没有把握的内容不要编造

return prompt

defbuild_conversational_prompt(query: str, context_docs: list, 
                                 chat_history: list = None) -> str:

    支持多轮对话的Prompt
    需要带上历史对话记录,让大模型理解上下文

    context_parts = []
for i, doc inenumerate(context_docs, 1):
        source = doc.get('context_path', '未知来源')
        context_parts.append(f【资料{i},来源:{source}】/n{doc['content']})
    context = /n/n/n/n.join(context_parts)

# 格式化历史对话

    history_text = 
if chat_history:
        history_parts = []
for turn in chat_history[-5:]:  # 只保留最近5轮,避免太长
            history_parts.append(f用户:{turn['user']})
            history_parts.append(f助手:{turn['assistant']})
        history_text = /n.join(history_parts)

    prompt = f你是一个企业内部知识库助手。

## 参考资料

{context}

## 对话历史

{history_text if history_text else (这是对话的开始)}

## 当前问题

用户:{query}

## 回答准则

1. **优先根据参考资料回答,如无相关信息请明确说明**

2. **考虑对话历史的上下文(如用户说它可能指代之前提到的概念)**

3. **标注信息来源**

助手:

return prompt

关于Prompt,我还想分享一个很重要的经验:不要试图在一个Prompt里塞太多指令。

一开始我把各种要求都写进去:回答要准确、要简洁、要友好、要专业、要标注来源、要分步骤、遇到不确定要说不知道……结果发现模型反而被绕晕了,有时候顾了这个忘了那个。

后来我的做法是:区分核心指令和优化指令,核心指令必须保留,优化指令可以根据问题类型动态调整。


classPromptBuilder:
    Prompt构建器:根据问题类型动态调整

# 核心指令——任何情况都必须包含

    CORE_INSTRUCTIONS = 

1. **只使用参考资料中的信息回答,不要编造**

2. **资料中没有的信息,明确说无法找到相关信息**

3. **标注信息来源【资料X】**

# 操作类问题的额外指令

    PROCEDURE_INSTRUCTIONS = 
回答格式要求:

- 按步骤编号列出(第一步、第二步...)
- 每个步骤要明确操作对象和操作动作
- 重要的警告或注意事项用⚠️标出

# 概念解释类问题的额外指令

    CONCEPT_INSTRUCTIONS = 
回答格式要求:

- 先用一句话给出核心定义
- 再详细解释关键点
- 如有必要,举例说明

# 故障排查类问题的额外指令

    TROUBLESHOOT_INSTRUCTIONS = 
回答格式要求:

- 先列出可能的原因
- 针对每个原因给出排查方法
- 给出解决方案或规避建议

    @classmethod
defbuild(cls, query: str, context_docs: list, question_type: str = general) -> str:
        根据问题类型构建Prompt

# 简单的问题分类逻辑(实际项目中可以用分类模型)

if question_type == auto:
            question_type = cls._classify_question(query)

        extra_instructions = 
if question_type == procedure:
            extra_instructions = cls.PROCEDURE_INSTRUCTIONS
elif question_type == concept:
            extra_instructions = cls.CONCEPT_INSTRUCTIONS
elif question_type == troubleshoot:
            extra_instructions = cls.TROUBLESHOOT_INSTRUCTIONS

        context = cls._format_context(context_docs)

        prompt = f你是企业内部知识库助手。

## 必须遵守的规则

{cls.CORE_INSTRUCTIONS}

{f## 回答格式{extra_instructions} if extra_instructions else }

## 参考资料

{context}

## 用户问题

{query}

请回答:

return prompt

    @classmethod
def_classify_question(cls, query: str) -> str:
        简单的问题分类(基于关键词)
        procedure_keywords = [怎么做, 如何操作, 步骤, 流程, 怎样]
        concept_keywords = [是什么, 什么是, 定义, 解释, 区别]
        troubleshoot_keywords = [为什么, 报错, 失败, 异常, 问题, 故障]

        query_lower = query.lower()

ifany(kw in query_lower for kw in procedure_keywords):
return procedure
elifany(kw in query_lower for kw in concept_keywords):
return concept
elifany(kw in query_lower for kw in troubleshoot_keywords):
return troubleshoot
else:
return general

    @classmethod
def_format_context(cls, context_docs: list) -> str:
        parts = []
for i, doc inenumerate(context_docs, 1):
            source = doc.get('context_path', '未知来源')
            parts.append(f【资料{i},来源:{source}】/n{doc['content']})
return /n/n/n/n.join(parts)

五、串起来:完整的RAG Pipeline


前面说了一堆细节,现在把它们串成一个完整的Pipeline:
Mermaid Chart - Create complex, visual diagrams with text.-2026-01-13-113354.png


from openai import OpenAI
from typing importList, Dict, Optional
import json

classRAGPipeline:

    完整的RAG处理流程
    文档切分 -> 向量化存储 -> 检索 -> 重排序 -> 生成回答

def__init__(self,
llm_base_url: str = https://api.deepseek.com,
                 llm_api_key: str = your-api-key,
                 llm_model: str = deepseek-chat):

# 初始化各个组件

        self.splitter = SmartDocumentSplitter(max_chunk_size=800)
        self.vector_store = VectorStore()
        self.retriever = EnhancedRetriever(self.vector_store)

# 初始化LLM客户端(这里用DeepSeek,也可以换成其他的)

        self.llm_client = OpenAI(base_url=llm_base_url, api_key=llm_api_key)
        self.llm_model = llm_model

        self.collection_name = knowledge_base

defingest_documents(self, documents: List[Dict]):

        文档入库
        documents格式:[{title: 文档标题, content: 文档内容, source: 来源}]

print(f开始处理 {len(documents)} 篇文档...)

        all_chunks = []
for doc in documents:

# 在内容前加上标题,帮助切分器识别结构

            full_content = f# {doc['title']}/n/n{doc['content']}

            chunks = self.splitter.split_markdown(full_content)

# 给每个chunk加上文档来源信息

for chunk in chunks:
                chunk['source_doc'] = doc.get('source', doc['title'])

            all_chunks.extend(chunks)

print(f切分后共 {len(all_chunks)} 个片段)

# 创建集合并插入

        self.vector_store.create_collection(self.collection_name)
        self.vector_store.insert_documents(self.collection_name, all_chunks)

print(文档入库完成!)

defquery(self, 
              question: str, 
              chat_history: Optional[List[Dict]] = None,
              top_k: int = 5,
              use_rerank: bool = True) -> Dict:

        处理用户查询
        返回:{answer: 回答内容, sources: [引用的来源], retrieved_docs: [检索到的文档]}

# 1. 检索相关文档

if use_rerank:
            retrieved_docs = self.retriever.retrieve_with_rerank(
                self.collection_name, question, 
                initial_top_k=20, final_top_k=top_k
            )
else:
            results = self.vector_store.search(self.collection_name, question, top_k=top_k)
            retrieved_docs = [{
'content': hit.entity.get('content'),
'context_path': hit.entity.get('context_path'),
'score': hit.score
            } for hit in results]

ifnot retrieved_docs:
return {
                answer: 抱歉,我没有找到与您问题相关的资料。您可以尝试换个关键词,或联系相关部门获取帮助。,
                sources: [],
                retrieved_docs: []
            }

# 2. 构建Prompt

if chat_history:
            prompt = build_conversational_prompt(question, retrieved_docs, chat_history)
else:
            prompt = PromptBuilder.build(question, retrieved_docs, question_type=auto)

# 3. 调用LLM生成回答

        response = self.llm_client.chat.completions.create(
            model=self.llm_model,
            messages=[{role: user, content: prompt}],
            temperature=0.3,  # 知识库问答用较低的temperature
            max_tokens=2000
        )

        answer = response.choices[0].message.content

# 4. 提取引用的来源

        sources = list(set([doc.get('context_path', '未知来源') for doc in retrieved_docs]))

return {
            answer: answer,
            sources: sources,
            retrieved_docs: retrieved_docs
        }

defevaluate_response(self, question: str, answer: str, 
                          ground_truth: str = None) -> Dict:

        回答质量评估(可选)
        用LLM评估回答的质量,方便持续优化

        eval_prompt = f请评估以下问答的质量。

问题:{question}

回答:{answer}

{f参考答案:{ground_truth} if ground_truth else }

请从以下维度评分(1-5分)并说明理由:

1. **相关性:回答是否切题**

2. **准确性:信息是否正确**

3. **完整性:是否完整解答了问题**

4. **可读性:表述是否清晰易懂**

请用JSON格式输出:{{relevance: 分数, accuracy: 分数, completeness: 分数, readability: 分数, comments: 评价说明}}

        response = self.llm_client.chat.completions.create(
            model=self.llm_model,
            messages=[{role: user, content: eval_prompt}],
            temperature=0
        )

try:
            eval_result = json.loads(response.choices[0].message.content)
return eval_result
except:
return {error: 评估结果解析失败}

# 使用示例

if __name__ == __main__:

# 初始化Pipeline

    rag = RAGPipeline(
        llm_base_url=https://api.deepseek.com,
        llm_api_key=your-api-key,
        llm_model=deepseek-chat
    )

# 准备测试文档

    test_documents = [
        {
            title: MySQL主从切换操作手册,
            content: """

 ## 1. 前置检查

 在执行主从切换之前,必须完成以下检查:

 - 确认从库同步状态正常(Seconds_Behind_Master = 0)
 - 确认没有正在执行的大事务
 - 通知相关业务方,确认切换时间窗口

 ## 2. 切换步骤

 ### 2.1 在主库执行只读设置

 SET GLOBAL read_only = 1;

 ### 2.2 等待从库完全同步

 在从库执行 SHOW SLAVE STATUS,确认 Seconds_Behind_Master = 0

 ### 2.3 停止从库复制并提升为主库

 STOP SLAVE;
 RESET SLAVE ALL;
 SET GLOBAL read_only = 0;

 ## 3. 切换后验证

 - 确认新主库可以正常写入
 - 确认应用连接已切换到新主库
 - 监控新主库的性能指标
   """,
               source: DBA团队文档
           }
       ]

# 入库

    rag.ingest_documents(test_documents)

# 测试查询

    result = rag.query(MySQL切换前需要做哪些检查?)

print(= * 50)
print(问题:MySQL切换前需要做哪些检查?)
print(= * 50)
print(f/n回答:/n{result['answer']})
print(f/n参考来源:{result['sources']})

六、上线后的一些经验教训


系统上线到现在差不多两个月了,期间又踩了不少坑,这里挑几个印象最深的说说。

教训一:用户的问题千奇百怪

我们在设计时假设用户会问MySQL怎么做主从切换这种正常问题。但实际上呢?

有人问:上次那个事故怎么处理的来着?——没有任何上下文,系统根本不知道那个事故是哪个。

有人问:帮我写个SQL。——这根本不是知识库问答,这是让大模型帮写代码。

还有人问:在吗?——我也不知道他想干啥。

后来我加了一个意图识别层,先判断用户的问题是否属于知识库问答的范畴:


defclassify_intent(self, query: str) -> str:
"""识别用户意图"""
    intent_prompt = f"""判断用户输入的意图类别,只输出类别名称:

- knowledge_query:查询知识库信息(如询问流程、规范、操作方法)
- code_request:请求生成代码
- chitchat:闲聊或无明确意图
- other:其他

用户输入:{query}

意图类别:"""

    response = self.llm_client.chat.completions.create(
        model=self.llm_model,
        messages=[{"role": "user", "content": intent_prompt}],
        temperature=0,
        max_tokens=20
    )

return response.choices[0].message.content.strip()

对于非知识库问答的意图,给用户一个友好的提示而不是硬着头皮检索。

教训二:冷启动时的尴尬

系统刚上线时,知识库里的文档不多,覆盖的场景有限。用户问了几个问题都答不上来,体验特别差,于是就不来用了。

后来的解决办法:

  1. 上线前先梳理高频问题,确保至少这些问题能回答好

  2. 搞了一个问题收集功能,对于答不上来的问题,记录下来反馈给内容团队,让他们补充相关文档

  3. 做了一个兜底策略——如果检索不到高相关度的内容,就展示相关推荐,把一些相似度尚可的文档标题列出来,引导用户自己去看

教训三:文档更新的同步问题

知识库的文档是会更新的。老版本的操作手册废弃了,新版本发布了。但如果向量数据库里还存着老版本的内容,用户检索到的可能是过时信息。

这个问题说起来简单,做起来挺麻烦的。我们最后的方案是:

  1. 每个文档入库时记录版本号和更新时间

  2. 定期全量重新入库(我们是每周一次)

  3. 对于紧急更新的重要文档,支持手动触发单篇重入库

七、性能优化:让系统不那么慢


RAG系统有个让人头疼的问题——慢。

整个流程跑一遍:Embedding编码、向量检索、重排序、LLM生成,全部加起来可能要好几秒。用户体验就很差,问一个问题要等半天。

image.png

几个优化措施:


import asyncio
from functools import lru_cache
import hashlib

classOptimizedRAG:
    性能优化版RAG

def__init__(self):

# 缓存热门查询的结果

        self.query_cache = {}
        self.cache_ttl = 3600# 1小时过期

    @lru_cache(maxsize=1000)
def_compute_query_embedding(self, query: str):

        Embedding结果缓存
        同样的问题不用重复计算向量

return self.model.encode([query], normalize_embeddings=True)[0]

def_get_cache_key(self, query: str) -> str:
        生成缓存key
return hashlib.md5(query.lower().strip().encode()).hexdigest()

asyncdefstream_query(self, question: str):

        流式输出
        不用等整个回答生成完,边生成边输出

        retrieved_docs = await asyncio.to_thread(
            self.retriever.retrieve_with_rerank,
            self.collection_name, question, 20, 5
        )

        prompt = PromptBuilder.build(question, retrieved_docs, question_type=auto)

# 使用流式API

        stream = self.llm_client.chat.completions.create(
            model=self.llm_model,
            messages=[{role: user, content: prompt}],
            temperature=0.3,
            stream=True# 开启流式
        )

for chunk in stream:
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content

流式输出这一点特别重要。用户问完问题后,马上就能看到回答在打字,心理上就不会觉得那么慢了。

八、回顾与思考


把这套系统从被骂下线到成为部门标配,前后折腾了将近一个月。趟过的坑挺多,但收获也很大。

几点核心总结:

  1. RAG不是万能的,选好适用场景

RAG适合有明确知识库、答案可追溯的场景。如果你的需求是让大模型发挥创造力(比如写文案、做创意),那RAG反而是个约束。

  1. 切分和检索是根基

大家往往把注意力放在大模型本身,觉得用更强的模型就能解决问题。但实际上,如果前面的切分和检索做得不好,再强的模型也是巧妇难为无米之炊。

  1. Prompt工程真的是门手艺

同样的检索结果,不同的Prompt可能带来天壤之别的回答效果。这个没什么捷径,就是多试、多看、多迭代。

  1. 上线只是开始

真正的挑战在上线之后。用户的各种奇葩输入、文档的持续更新、性能的优化、效果的监控……每一项都是持续的工作。

image.png

最后,附上这套系统目前的一些核心指标:

  • 日均查询量:200+次
  • 平均响应时间:2.3秒(开启流式后首字符延迟约0.8秒)
  • 用户满意度(通过回答后的点赞/点踩收集):约72%
  • 无法回答的比例:约22%(这部分会定期分析,推动补充文档)

数字不算特别亮眼,但比起之前那个没人用的关键词搜索,已经是质的飞跃了。如果你也在做类似的项目,希望这篇文章能帮你少踩一些坑。有问题欢迎评论区交流!

如何学习大模型 AI ?

由于新岗位的生产效率,要优于被取代岗位的生产效率,所以实际上整个社会的生产效率是提升的。

但是具体到个人,只能说是:

“最先掌握AI的人,将会比较晚掌握AI的人有竞争优势”。

这句话,放在计算机、互联网、移动互联网的开局时期,都是一样的道理。

我在一线科技企业深耕十二载,见证过太多因技术卡位而跃迁的案例。那些率先拥抱 AI 的同事,早已在效率与薪资上形成代际优势,我意识到有很多经验和知识值得分享给大家,也可以通过我们的能力和经验解答大家在大模型的学习中的很多困惑。我们整理出这套 AI 大模型突围资料包

  • ✅ 从零到一的 AI 学习路径图
  • ✅ 大模型调优实战手册(附医疗/金融等大厂真实案例)
  • ✅ 百度/阿里专家闭门录播课
  • ✅ 大模型当下最新行业报告
  • ✅ 真实大厂面试真题
  • ✅ 2026 最新岗位需求图谱

所有资料 ⚡️ ,朋友们如果有需要 《AI大模型入门+进阶学习资源包》下方扫码获取~
在这里插入图片描述

① 全套AI大模型应用开发视频教程

(包含提示工程、RAG、LangChain、Agent、模型微调与部署、DeepSeek等技术点)
在这里插入图片描述

② 大模型系统化学习路线

作为学习AI大模型技术的新手,方向至关重要。 正确的学习路线可以为你节省时间,少走弯路;方向不对,努力白费。这里我给大家准备了一份最科学最系统的学习成长路线图和学习规划,带你从零基础入门到精通!
在这里插入图片描述

③ 大模型学习书籍&文档

学习AI大模型离不开书籍文档,我精选了一系列大模型技术的书籍和学习文档(电子版),它们由领域内的顶尖专家撰写,内容全面、深入、详尽,为你学习大模型提供坚实的理论基础。
在这里插入图片描述

④ AI大模型最新行业报告

2025最新行业报告,针对不同行业的现状、趋势、问题、机会等进行系统地调研和评估,以了解哪些行业更适合引入大模型的技术和应用,以及在哪些方面可以发挥大模型的优势。
在这里插入图片描述

⑤ 大模型项目实战&配套源码

学以致用,在项目实战中检验和巩固你所学到的知识,同时为你找工作就业和职业发展打下坚实的基础。
在这里插入图片描述

⑥ 大模型大厂面试真题

面试不仅是技术的较量,更需要充分的准备。在你已经掌握了大模型技术之后,就需要开始准备面试,我精心整理了一份大模型面试题库,涵盖当前面试中可能遇到的各种技术问题,让你在面试中游刃有余

图片

以上资料如何领取?

在这里插入图片描述

为什么大家都在学大模型?

最近科技巨头英特尔宣布裁员2万人,传统岗位不断缩减,但AI相关技术岗疯狂扩招,有3-5年经验,大厂薪资就能给到50K*20薪!

图片

不出1年,“有AI项目经验”将成为投递简历的门槛。

风口之下,与其像“温水煮青蛙”一样坐等被行业淘汰,不如先人一步,掌握AI大模型原理+应用技术+项目实操经验,“顺风”翻盘!
在这里插入图片描述
在这里插入图片描述

这些资料真的有用吗?

这份资料由我和鲁为民博士(北京清华大学学士和美国加州理工学院博士)共同整理,现任上海殷泊信息科技CEO,其创立的MoPaaS云平台获Forrester全球’强劲表现者’认证,服务航天科工、国家电网等1000+企业,以第一作者在IEEE Transactions发表论文50+篇,获NASA JPL火星探测系统强化学习专利等35项中美专利。本套AI大模型课程由清华大学-加州理工双料博士、吴文俊人工智能奖得主鲁为民教授领衔研发。

资料内容涵盖了从入门到进阶的各类视频教程和实战项目,无论你是小白还是有些技术基础的技术人员,这份资料都绝对能帮助你提升薪资待遇,转行大模型岗位。
在这里插入图片描述
在这里插入图片描述

以上全套大模型资料如何领取?

在这里插入图片描述

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐