基于 LangChain 的 RAG(检索增强生成)系统从原理到实战
目录
2.2 在线问答阶段(Retrieval & Generation)
5.5 离线建库 __001__store_data_faiss.py
5.6 在线问答 __002__online_question_qa.py
一、什么是 RAG
1.1 RAG 的定义与核心思想
RAG(Retrieval-Augmented Generation),中文译为"检索增强生成",由 Facebook AI Research(现 Meta AI)的研究人员 Patrick Lewis 等人在 2020 年的论文《Retrieval-Augmented Generation for Knowledge-Intensive NLP Tasks》中首次提出。
其核心思想可以用一句话概括:
先查资料,再回答问题。
具体来说,大语言模型在回答用户问题时,不再单纯依赖自身参数中"记住"的知识,而是会先从一个外部知识库中检索(Retrieve) 与问题最相关的文档片段,然后将这些片段作为上下文,与用户的问题一起输入(Augment) 给大语言模型,最终由模型生成(Generate) 有据可依的答案。
LangChain 官方文档对此的描述也十分明确:
Retrieval is the process of accessing and integrating relevant information from external sources at runtime. RAG builds on this by combining retrieval with generation to produce more grounded and informed responses.
1.2 RAG 的诞生背景
RAG 的出现并非偶然,它解决的是大语言模型固有的架构性缺陷:
| 问题 | 描述 | 具体表现 |
|---|---|---|
| 知识截断 | 模型的训练数据有时间截止点,无法获取最新信息 | 问"今天汇率多少?"模型无法回答 |
| 幻觉(Hallucination) | 模型在缺乏相关知识时,倾向于"编造"看似合理的内容 | 编造不存在的法规条文、虚构统计数据 |
| 私域知识缺失 | 模型没有企业内部文档、行业规范等私有数据 | 无法回答公司内部流程、产品参数等 |
| 无法溯源 | 用户无法验证模型回答的依据来源 | 不知道答案来自哪份文档 |
在 RAG 之前,业界主要通过两种方式解决这些问题:
- 微调(Fine-tuning):用领域数据对模型进行二次训练。成本高、周期长,且每次知识更新都需要重新训练。
- 长上下文窗口:将所有文档塞进 Prompt。受限于模型上下文长度(如 4K、8K、128K tokens),无法处理海量文档。
RAG 提供了第三条路:不在模型参数里存储知识,而是在推理时动态检索知识。 这种方式既不需要重新训练模型,也不受上下文窗口限制,且知识可以随时更新。
1.3 为什么需要 RAG
总结来说,RAG 的核心价值在于:
- 1.降低幻觉率:模型回答基于检索到的真实文档,而非凭空生成。
- 2.知识可更新:只需更新向量数据库中的文档,无需重新训练模型。
- 3.可溯源性:每段回答都可以追溯到具体的源文档片段,便于验证。
- 4.成本低:相比微调,RAG 的部署和维护成本更低。
- 5.私域知识接入:可以将企业内部的 SOP、文档、规则等无缝接入模型。
1.4 RAG 的典型应用场景
以物流行业为例,用户可能会问:
- "我的快递为什么一直停留在'干线运输'?"
- "什么是异常签收?"
- "冷链运输的温控标准是什么?"
- "仓配一体和普通快递有什么区别?"
这些问题的答案往往依赖于企业内部的 SOP 文档、物流术语手册、运单规则说明、售后处理流程等资料。单靠模型本身,并不能保证回答准确。RAG 的价值就在于:把企业自己的知识库接进来,让模型"看着资料回答"。
其他典型场景还包括:
- 智能客服:基于产品手册、FAQ 自动回答用户问题
- 法律助手:基于法规条文、判例库辅助法律检索
- 医疗问诊:基于医学指南、药品说明书提供参考建议
- 代码助手:基于项目文档、API 文档辅助开发
二、RAG 的基本流程
一个完整的 RAG 系统分为两大阶段:离线建库和在线问答。
2.1 离线建库阶段(Indexing)
离线建库是 RAG 的"准备工作",目标是将原始文档处理成可检索的向量知识库。
┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ 读取文档 │───▶│ 文本切分 │───▶│ 向量化 │───▶│ 存入向量数据库 │
│ (Load) │ │ (Split) │ │ (Embed) │ │ (Store) │
└──────────────┘ └──────────────┘ └──────────────┘ └──────────────┘
各步骤说明:
| 步骤 | 说明 | 关键考量 |
|---|---|---|
| 读取文档 | 从文件系统加载 PDF、TXT、Markdown 等格式的文档 | 需要处理不同格式的解析器 |
| 文本切分 | 将长文档拆分为较小的文本块(Chunk) | chunk_size 和 chunk_overlap 的设置直接影响检索质量 |
| 向量化 | 使用 Embedding 模型将文本块转换为高维向量 | 向量维度、模型选择影响语义表征能力 |
| 存储 | 将向量及其对应的原始文本存入向量数据库 | 选择合适的向量数据库(FAISS、Chroma、Milvus 等) |
2.2 在线问答阶段(Retrieval & Generation)
在线问答是 RAG 的"运行时",在用户提问时触发。
┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ 用户提问 │───▶│ 问题向量化 │───▶│ 相似度检索 │───▶│ 拼接 Prompt │───▶│ 大模型生成 │
│ (Query) │ │ (Embed) │ │ (Retrieve) │ │ (Augment) │ │ (Generate) │
└──────────────┘ └──────────────┘ └──────────────┘ └──────────────┘ └──────────────┘
各步骤说明:
| 步骤 | 说明 |
|---|---|
| 用户输入问题 | 用户以自然语言提出问题 |
| 问题向量化 | 使用与建库阶段相同的 Embedding 模型,将问题转换为向量 |
| 相似度检索 | 在向量数据库中找到与问题向量最相似的 Top-K 个文档片段 |
| 拼接 Prompt | 将检索到的文档片段作为上下文,与用户问题组合成完整的 Prompt |
| 大模型生成 | 将 Prompt 发送给大语言模型,生成最终答案 |
2.3 流程图解
下面用文字描述一个完整的 RAG 数据流:
离线阶段 在线阶段
┌──────────────────┐ ┌──────────────────────────┐
│ │ │ │
物流文档.pdf ──▶│ 文档加载 │ │ 用户提问 │
SOP.txt ──▶│ (DocumentLoader) │ │ "冷链运输温控标准是什么?" │
规章手册.md ──▶│ │ │ │
└────────┬─────────┘ └────────────┬─────────────┘
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────────────┐
│ 文本切分 │ │ 问题 Embedding │
│ (TextSplitter) │ │ (同一Embedding模型) │
│ chunk_size=200 │ │ │
└────────┬─────────┘ └────────────┬─────────────┘
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────────────┐
│ 向量化 │ │ 向量相似度检索 │
│ (Embedding) │ │ Top-K = 3 │
└────────┬─────────┘ └────────────┬─────────────┘
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────────────┐
│ 存入 FAISS │ │ Prompt 组装 │
│ 向量数据库 │ │ 问题 + 检索到的上下文 │
└──────────────────┘ └────────────┬─────────────┘
│
▼
┌──────────────────────────┐
│ 大模型生成答案 │
│ (DeepSeek / GPT 等) │
└──────────────────────────┘
三、RAG 核心组件详解
LangChain 将 RAG 的基础构件总结为以下几大模块化组件,每个组件都可以独立替换:
3.1 文档加载器(Document Loader)
作用:将不同格式的原始文件解析为统一的 Document 对象。
LangChain 提供了丰富的文档加载器:
| 加载器 | 支持格式 | 说明 |
|---|---|---|
TextLoader |
.txt, .md |
纯文本加载,最基础的加载器 |
PyPDFLoader |
.pdf |
PDF 文档加载,基于 pypdf 库 |
CSVLoader |
.csv |
CSV 表格加载 |
UnstructuredHTMLLoader |
.html |
HTML 页面加载 |
DirectoryLoader |
文件夹 | 批量加载目录下所有文件 |
WebBaseLoader |
URL | 网页内容抓取 |
一个 Document 对象包含两个核心字段:
Document(
page_content="文档的文本内容...", # 文本内容
metadata={"source": "文件路径"} # 元数据(来源、页码等)
)
3.2 文本分割器(Text Splitter)
作用:将长文档拆分为较小的文本块(Chunk),以便后续向量化和检索。
为什么要切分?
- Embedding 模型通常有最大 token 限制(如 8192 tokens)
- 过长的文本会导致语义稀释,降低向量表征的精度
- 检索时需要精确匹配,小块文本更容易命中相关内容
LangChain 提供了多种分割策略:
| 分割器 | 策略 | 适用场景 |
|---|---|---|
RecursiveCharacterTextSplitter |
按字符递归分割(优先在 \n\n、\n、空格处断开) |
最通用,推荐默认使用 |
CharacterTextSplitter |
按指定字符分割 | 简单场景 |
MarkdownHeaderTextSplitter |
按 Markdown 标题层级分割 | Markdown 文档 |
TokenTextSplitter |
按 Token 数量分割 | 需要精确 token 控制的场景 |
关键参数说明:
chunk_size:每个文本块的最大字符数。设置过大会导致语义稀释,设置过小会丢失上下文。一般建议 200-1000。chunk_overlap:相邻文本块之间的重叠字符数。重叠可以保证切分边界处的语义连贯性。一般建议为 chunk_size 的 10%-20%。
3.3 向量嵌入模型(Embedding Model)
作用:将文本转换为高维数值向量(Embedding),使得语义相近的文本在向量空间中距离更近。
工作原理简述:
"冷链运输温度标准" ──▶ Embedding Model ──▶ [0.023, -0.156, 0.891, ..., 0.334] (1024维)
"冷鲜配送温控要求" ──▶ Embedding Model ──▶ [0.025, -0.148, 0.887, ..., 0.329] (1024维)
"退货退款流程说明" ──▶ Embedding Model ──▶ [-0.712, 0.445, -0.201, ..., 0.087] (1024维)
前两个句子语义相近,它们的向量在空间中距离更近;第三个句子语义不同,向量距离更远。
常见的 Embedding 模型对比:
| 模型 | 来源 | 维度 | 特点 |
|---|---|---|---|
text-embedding-v4 |
阿里百炼 | 1024 | 国内访问快,中文效果好 |
text-embedding-3-small |
OpenAI | 1536 | 性价比高 |
text-embedding-3-large |
OpenAI | 3072 | 精度最高 |
bge-large-zh-v1.5 |
BAAI(智源) | 1024 | 开源中文模型,可本地部署 |
重要提示:离线建库和在线检索时,必须使用同一个 Embedding 模型,否则向量空间不一致,检索结果将毫无意义。
3.4 向量数据库(Vector Store)
作用:存储文档向量,并提供高效的相似度检索能力。
向量数据库的核心能力是近似最近邻搜索(ANN, Approximate Nearest Neighbor),可以在百万甚至亿级向量中快速找到与查询向量最相似的 Top-K 个结果。
常用向量数据库对比:
| 数据库 | 类型 | 特点 | 适用场景 |
|---|---|---|---|
| FAISS | 内存库(可持久化) | Facebook 开源,速度快,纯 Python 接口 | 中小规模数据,快速原型 |
| Chroma | 内存/嵌入式 | 轻量级,API 简洁 | 快速开发、小规模应用 |
| Milvus | 分布式 | 支持大规模数据,云原生架构 | 生产环境、大规模数据 |
| Pinecone | 云服务 | 全托管,无需运维 | 不想自己管理基础设施 |
| Weaviate | 混合检索 | 支持向量+关键词混合检索 | 需要混合检索的场景 |
相似度计算的常见方法:
| 方法 | 原理 | 距离含义 |
|---|---|---|
| 余弦相似度(Cosine Similarity) | 计算两个向量夹角的余弦值 | 值越大(接近1)越相似 |
| 欧氏距离(L2 Distance) | 计算两个向量在空间中的直线距离 | 值越小越相似 |
| 内积(Inner Product) | 计算两个向量的点积 | 值越大越相似(向量已归一化时等价于余弦相似度) |
3.5 检索器(Retriever)
作用:根据用户查询,从向量数据库中返回最相关的文档片段。
LangChain 中的 Retriever 是一个统一接口,底层可以对接不同的向量数据库和检索策略:
# 基础用法
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
# 带相似度阈值
retriever = vectorstore.as_retriever(
search_type="similarity_score_threshold",
search_kwargs={"score_threshold": 0.5, "k": 5}
)
进阶检索策略:
| 策略 | 说明 |
|---|---|
| Similarity Search | 基础的向量相似度检索 |
| MMR(Maximum Marginal Relevance) | 在保证相关性的同时,增加结果的多样性,避免返回过于相似的重复内容 |
| Multi-Query | 将用户问题改写为多个不同表述,分别检索后合并结果,提高召回率 |
| Hybrid Search | 结合向量检索和关键词检索(BM25),兼顾语义理解和精确匹配 |
3.6 大语言模型(LLM)
作用:接收"问题 + 检索上下文",生成最终的自然语言答案。
在 RAG 架构中,LLM 的角色是"基于给定上下文的生成器",而非"知识的存储者"。Prompt 的设计至关重要,通常包含:
- System Prompt:设定角色和约束(如"只基于给定的上下文回答")
- User Prompt:包含用户的问题和检索到的上下文片段
四、LangChain 实现 RAG 的模块化代码
4.0 环境准备与依赖安装
首先安装所有必要的 Python 依赖:
# 核心框架
pip install langchain langchain-core langchain-community langchain-openai langchain-text-splitters
# 文档加载
pip install pypdf
# 向量数据库
pip install faiss-cpu
# 环境变量管理
pip install python-dotenv
注意:如果需要 GPU 加速 FAISS,可以安装
faiss-gpu替代faiss-cpu,但需要配置 CUDA 环境。
4.1 配置模块 config.py
配置模块负责加载 .env 环境变量,将敏感的 API Key 和配置项与代码分离,这是工程化的基本实践。
import os
from dotenv import load_dotenv
def load_project_env() -> None:
"""
Load .env from repo root (same style as other demos in this project).
"""
repo_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
env_path = os.path.join(repo_root, ".env")
load_dotenv(env_path)
设计要点:
- 使用
python-dotenv的load_dotenv自动读取.env文件 - 通过
os.path.abspath(__file__)定位项目根目录,避免硬编码路径 - 在项目根目录创建
.env文件存放所有配置项
4.2 文档加载模块 load_documents.py
该模块负责从指定路径加载文档,支持递归遍历文件夹,自动根据文件后缀选择合适的加载器。
from __future__ import annotations
import os
from typing import List
from langchain_core.documents import Document
from langchain_community.document_loaders import TextLoader
from langchain_community.document_loaders import PyPDFLoader
def load_documents(path: str) -> List[Document]:
"""
RAG API 1/6:文档加载
行为说明:
- 只接收一个路径字符串(可以是文件路径或文件夹路径)
- 如果是文件夹:递归遍历并加载其中支持的文档
- 自动根据后缀判断:`.txt` / `.md` / `.pdf`
参数:
path: 文件或文件夹的路径
返回:
List[Document]: 加载后的文档列表
"""
docs: List[Document] = []
# 收集所有待加载的文件路径
files: list[str] = []
if os.path.isdir(path):
for root, _dirs, filenames in os.walk(path):
for name in filenames:
files.append(os.path.join(root, name))
else:
files = [path]
# 根据后缀选择加载器
for fp in files:
suf = os.path.splitext(fp)[1].lower()
if suf in {".txt", ".md"}:
docs.extend(TextLoader(fp, encoding="utf-8").load())
elif suf == ".pdf":
docs.extend(PyPDFLoader(fp).load())
else:
print(f"[WARN] 不支持的文件格式,已跳过: {fp}")
print(f"[INFO] 共加载 {len(docs)} 个文档片段,来源 {len(files)} 个文件")
return docs
if __name__ == "__main__":
repo_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
data_path = os.path.join(repo_root, "data")
docs = load_documents(data_path)
for doc in docs:
print(doc.page_content[:200]) # 只打印前200字符
print("-" * 100)
代码解读:
os.walk(path)递归遍历目录,收集所有文件路径- 根据文件后缀名选择对应的 Loader:
.txt/.md使用TextLoader,.pdf使用PyPDFLoader - 加载后的每个
Document对象包含page_content(文本内容)和metadata(元数据,如来源文件路径、页码等)
4.3 文本切分模块 split_text.py
from __future__ import annotations
import os
import sys
from typing import Iterable, List
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
def split_documents(
docs: Iterable[Document],
chunk_size: int = 100,
chunk_overlap: int = 20
) -> List[Document]:
"""
RAG API 2/6: Text splitting
使用 RecursiveCharacterTextSplitter 将文档切分为更小的文本块。
该分割器会优先在段落、换行、句号等自然断点处分割,保持语义完整性。
参数:
docs: 待切分的文档列表
chunk_size: 每个文本块的最大字符数(默认 100)
chunk_overlap: 相邻文本块的重叠字符数(默认 20)
返回:
List[Document]: 切分后的文档块列表
"""
splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
add_start_index=True, # 在 metadata 中记录文本块在原文中的起始位置
separators=["\n\n", "\n", "。", "!", "?", ";", " ", ""], # 中文友好的分割符
)
chunks = splitter.split_documents(list(docs))
print(f"[INFO] 切分完成:{len(list(docs))} 个文档 → {len(chunks)} 个文本块")
return chunks
if __name__ == "__main__":
from __005__langchain_rag_components.__001__load_documents import load_documents
repo_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
data_path = os.path.join(repo_root, "data")
docs = load_documents(data_path)
chunks = split_documents(docs, chunk_size=100, chunk_overlap=30)
for i, chunk in enumerate(chunks):
print(f"--- Chunk {i+1} (length={len(chunk.page_content)}) ---")
print(chunk.page_content)
print()
参数调优建议:
| 场景 | chunk_size | chunk_overlap | 说明 |
|---|---|---|---|
| 短 FAQ / 客服问答 | 100-300 | 20-50 | 问答对本身较短 |
| 技术文档 / SOP | 300-600 | 50-100 | 操作步骤需要上下文连贯 |
| 长篇报告 / 论文 | 500-1000 | 80-150 | 需要保留足够的上下文信息 |
4.4 向量嵌入模块 embeddings.py
本项目使用阿里百炼平台(DashScope)的 text-embedding-v4 模型,通过 OpenAI 兼容接口调用。
百炼平台申请 API Key 的地址:https://bailian.console.aliyun.com
.env 配置:
BAILIAN_API_KEY=sk-your-bailian-api-key
BAILIAN_EMBEDDINGS_BASE_URL=https://dashscope.aliyuncs.com/compatible-mode/v1
BAILIAN_EMBEDDINGS_MODEL=text-embedding-v4
代码实现:
from __future__ import annotations
import os
from langchain_openai import OpenAIEmbeddings
from __005__langchain_rag_components.__000__config import load_project_env
load_project_env()
def build_embeddings() -> OpenAIEmbeddings:
"""
RAG API 3/6: Vectorization (embeddings)
使用阿里百炼平台的 text-embedding-v4 模型。
通过 DashScope 的 OpenAI 兼容接口调用,因此使用 langchain_openai 的 OpenAIEmbeddings。
返回:
OpenAIEmbeddings: 配置好的 Embedding 模型实例
"""
return OpenAIEmbeddings(
api_key=os.getenv("BAILIAN_API_KEY"),
base_url=os.getenv("BAILIAN_EMBEDDINGS_BASE_URL"),
model=os.getenv("BAILIAN_EMBEDDINGS_MODEL"),
chunk_size=10, # DashScope 单次 embed 批大小上限为 10
tiktoken_enabled=False, # DashScope 兼容模式下禁用 tiktoken 分词
check_embedding_ctx_length=False, # 跳过本地 token 长度预检查,交给服务端校验
)
if __name__ == "__main__":
embeddings = build_embeddings()
# 单条文本向量化测试
v1 = embeddings.embed_query("你好,我是小明,很高兴认识你。")
print(f"单条向量维度: {len(v1)}")
print(f"向量前5维: {v1[:5]}")
print()
# 多条文本向量化测试
texts = ["你好,我是小明。", "今天天气不错。", "退货流程怎么走?"]
vs = embeddings.embed_documents(texts)
print(f"文档数量: {len(vs)}")
print(f"每条向量维度: {len(vs[0])}")
# 简单的相似度验证
import numpy as np
def cosine_similarity(a, b):
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
sim_01 = cosine_similarity(vs[0], vs[1]) # "我是小明" vs "天气不错"
sim_02 = cosine_similarity(vs[0], vs[2]) # "我是小明" vs "退货流程"
print(f"\n'我是小明' 与 '天气不错' 的余弦相似度: {sim_01:.4f}")
print(f"'我是小明' 与 '退货流程' 的余弦相似度: {sim_02:.4f}")
为什么使用 OpenAIEmbeddings?
阿里百炼的 DashScope 提供了 OpenAI 兼容接口(/compatible-mode/v1),因此可以直接复用 LangChain 的 OpenAIEmbeddings 类,只需将 base_url 指向 DashScope 的端点即可。这是一种常见的"适配器模式"——不需要为每个模型提供商都编写专用的客户端。
关键参数解释:
| 参数 | 值 | 说明 |
|---|---|---|
chunk_size |
10 | DashScope 的 embed 接口单次最多处理 10 条文本,超过会报错 |
tiktoken_enabled |
False | 禁用 OpenAI 的 tiktoken 分词器,因为 DashScope 使用自己的 tokenizer |
check_embedding_ctx_length |
False | 禁用本地 token 长度预检查,避免因 tokenizer 不匹配导致误判 |
4.5 向量存储模块 vector_store.py
本项目使用 FAISS(Facebook AI Similarity Search) 作为向量数据库。FAISS 是 Meta 开源的高效相似度搜索库,支持 CPU 和 GPU 加速。
安装:
pip install faiss-cpu # CPU 版本(推荐入门使用)
# pip install faiss-gpu # GPU 版本(需要 CUDA 环境)
完整代码实现:
from __future__ import annotations
from pathlib import Path
from typing import Iterable, Optional, Sequence
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
def build_faiss(
embeddings: Embeddings,
docs: Optional[Iterable[Document]] = None,
texts: Optional[Sequence[str]] = None,
persist_dir: str = "__005__langchain_rag_components/_faiss_index",
normalize_L2: bool = True,
) -> FAISS:
"""
RAG API 4/6: 向量存储
将文档或文本向量化后存入 FAISS 向量数据库,并持久化到本地目录。
参数:
embeddings: Embedding 模型实例
docs: Document 对象列表(与 texts 二选一)
texts: 纯文本列表(与 docs 二选一)
persist_dir: FAISS 索引的本地存储路径
normalize_L2: 是否对向量做 L2 归一化(推荐开启,使 L2 距离等价于余弦距离)
返回:
FAISS: 构建好的 FAISS 向量数据库实例
"""
if not docs and not texts:
raise ValueError("`docs` 和 `texts` 至少需要提供一个。")
if texts is not None:
vs = FAISS.from_texts(list(texts), embeddings, normalize_L2=normalize_L2)
else:
vs = FAISS.from_documents(list(docs), embeddings, normalize_L2=normalize_L2)
# 持久化保存到本地
vs.save_local(persist_dir)
print(f"[INFO] FAISS 索引已保存到: {persist_dir},共 {vs.index.ntotal} 条向量")
return vs
def load_faiss(persist_dir: str, embeddings: Embeddings) -> FAISS:
"""
从本地加载已保存的 FAISS 索引。
参数:
persist_dir: FAISS 索引的本地存储路径
embeddings: Embedding 模型实例(必须与建库时使用的模型一致)
返回:
FAISS: 加载后的 FAISS 向量数据库实例
"""
if not Path(persist_dir).exists():
raise FileNotFoundError(f"FAISS 索引目录不存在:{persist_dir}")
vs = FAISS.load_local(
persist_dir,
embeddings,
allow_dangerous_deserialization=True, # FAISS 使用 pickle 序列化,需显式授权
)
print(f"[INFO] FAISS 索引已加载,共 {vs.index.ntotal} 条向量")
return vs
def similarity_search(
vs: FAISS,
query: str,
k: int = 2,
score_threshold: float = None
):
"""
相似度检索。
参数:
vs: FAISS 向量数据库实例
query: 用户查询文本
k: 返回的最相似文档数量
score_threshold: 相似度阈值(L2 距离模式下,距离越小越相似)
返回:
List[Tuple[Document, float]]: (文档, 距离分数) 的列表
"""
results = vs.similarity_search_with_score(query, k=k, score_threshold=score_threshold)
return results
if __name__ == "__main__":
from __005__langchain_rag_components.__000__config import load_project_env
from __005__langchain_rag_components.__003__embeddings import build_embeddings
load_project_env()
embeddings_model = build_embeddings()
persist_dir = "_faiss_index_test"
texts = [
"配送一般几天能到?",
"售后赔付规则是什么?",
"如何申请退货退款?",
"我想查询物流进度。",
]
# 构建并保存
vs = build_faiss(
texts=texts,
persist_dir=persist_dir,
embeddings=embeddings_model
)
print(f"Build OK. ntotal={vs.index.ntotal}")
# 加载
vs2 = load_faiss(persist_dir, embeddings_model)
print(f"Load OK. ntotal={vs2.index.ntotal}")
# 检索
query = "如何申请退货退款?"
hits = similarity_search(vs2, query, k=3)
print(f"\n查询: {query}")
print(f"结果:")
for i, (hit, score) in enumerate(hits):
print(f" [{i+1}] {hit.page_content} (L2距离: {score:.4f})")
allow_dangerous_deserialization=True 说明:
FAISS 的 load_local 方法底层使用 Python 的 pickle 模块进行序列化/反序列化。pickle 在反序列化不可信数据时存在安全风险(可能执行任意代码)。LangChain 因此要求显式传入 allow_dangerous_deserialization=True 来确认用户知晓该风险。
在以下场景中这是安全的:
- 加载自己构建并保存的索引文件
- 加载来自可信来源的索引文件
在以下场景中需要警惕:
- 加载来自不明第三方的索引文件
normalize_L2 参数的作用:
当 normalize_L2=True 时,FAISS 会将所有向量做 L2 归一化(即缩放到单位球面上)。归一化后,L2 欧氏距离与余弦相似度之间存在单调关系:L2_distance² = 2 × (1 - cosine_similarity)。这意味着用 L2 距离排序的结果与用余弦相似度排序的结果是一致的,同时 L2 距离的计算效率更高。
4.6 大模型问答模块 rag_answer.py
from __future__ import annotations
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.documents import Document
from typing import Sequence
import os
from __005__langchain_rag_components.__000__config import load_project_env
load_project_env()
llm = ChatOpenAI(
api_key=os.getenv("MODEL_API_KEY"),
base_url=os.getenv("MODEL_BASE_URL"),
model=os.getenv("MODEL_NAME"),
)
def answer_with_llm(
question: str,
docs: Sequence[Document],
) -> str:
"""
RAG API 5/6: 调用大模型生成回答(使用检索到的 docs 作为上下文)。
参数:
question: 用户的问题
docs: 检索到的相关文档列表
返回:
str: 大模型生成的回答
"""
# 将检索到的文档拼接为上下文字符串,带编号方便溯源引用
context = "\n\n".join(f"[{i+1}] {d.page_content}" for i, d in enumerate(docs))
prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"你是一个严谨的中文问答助手。"
"只基于给定的上下文回答问题。"
"如果上下文不足以回答问题,请明确说明"根据已检索到的资料无法确定"。"
"回答时请引用上下文的编号(如 [1]、[2])作为依据。",
),
(
"user",
"问题:{question}\n\n"
"上下文:\n{context}\n\n"
"请给出答案:"
),
]
)
msg = (prompt | llm).invoke({"question": question, "context": context})
return msg.content
def answer_with_llm_stream(
question: str,
docs: Sequence[Document],
):
"""
RAG API 5/6(流式版本): 以流式方式调用大模型生成回答。
适合需要实时输出的交互场景(如 Web 界面),减少用户等待感。
"""
context = "\n\n".join(f"[{i+1}] {d.page_content}" for i, d in enumerate(docs))
prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"你是一个严谨的中文问答助手。"
"只基于给定的上下文回答问题。"
"如果上下文不足以回答问题,请明确说明"根据已检索到的资料无法确定"。"
"回答时请引用上下文的编号(如 [1]、[2])作为依据。",
),
(
"user",
"问题:{question}\n\n"
"上下文:\n{context}\n\n"
"请给出答案:"
),
]
)
chain = prompt | llm
for chunk in chain.stream({"question": question, "context": context}):
yield chunk.content
if __name__ == "__main__":
demo_docs = [
Document(page_content="RAG 是 Retrieval-Augmented Generation,先检索相关资料再生成回答。"),
Document(page_content="它可以降低幻觉,让回答更贴近你提供的知识库。"),
]
print("=== 同步调用 ===")
result = answer_with_llm("用三句话解释什么是 RAG。", demo_docs)
print(result)
print("\n=== 流式调用 ===")
for token in answer_with_llm_stream("用三句话解释什么是 RAG。", demo_docs):
print(token, end="", flush=True)
print()
Prompt 设计要点:
- 1.System Prompt 中的约束:明确要求"只基于给定的上下文回答",这是防止幻觉的关键指令。如果上下文不足以回答,模型应当如实说明,而非编造答案。
- 2.编号引用:要求模型引用
[1]、[2]等编号,增强回答的可溯源性。 - 3.上下文格式化:每个文档片段前加编号,方便模型引用。
五、完整 RAG 项目实战
5.1 项目结构总览
将上述模块组装成一个完整的物流知识库 RAG 问答系统,项目结构如下:
rag_project/
├── .env # 环境变量(API Key 等)
├── data/ # 原始文档目录
│ ├── logistics_faq.txt # 物流常见问题
│ ├── cold_chain_rules.md # 冷链运输规范
│ └── return_policy.pdf # 退货政策文档
├── _faiss_index/ # FAISS 向量索引持久化目录
│ ├── index.faiss
│ └── index.pkl
├── config.py # 配置加载
├── common.py # 公共模型实例(Embedding + LLM)
├── __001__store_data_faiss.py # 离线:文档加载 → 切分 → 向量化 → 存储
└── __002__online_question_qa.py # 在线:用户提问 → 检索 → 生成答案
5.2 环境变量配置 .env
# ===== 大语言模型配置(DeepSeek)=====
MODEL_API_KEY=sk-your-deepseek-api-key
MODEL_BASE_URL=https://api.deepseek.com
MODEL_NAME=deepseek-chat
# ===== Embedding 模型配置(阿里百炼)=====
BAILIAN_API_KEY=sk-your-bailian-api-key
BAILIAN_EMBEDDINGS_BASE_URL=https://dashscope.aliyuncs.com/compatible-mode/v1
BAILIAN_EMBEDDINGS_MODEL=text-embedding-v4
安全提示:
.env文件包含敏感的 API Key,务必将其添加到.gitignore中,不要提交到版本控制系统。
5.3 配置类 config.py
import os
from dotenv import load_dotenv
def load_project_env() -> None:
"""
Load .env from repo root (same style as other demos in this project).
"""
repo_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
env_path = os.path.join(repo_root, ".env")
load_dotenv(env_path)
# 项目启动时自动加载环境变量
load_project_env()
class Config:
"""
统一配置类,集中管理所有环境变量。
"""
def __init__(self):
# 百炼 Embedding 模型
self.BAILIAN_API_KEY = os.getenv("BAILIAN_API_KEY")
self.BAILIAN_EMBEDDINGS_BASE_URL = os.getenv("BAILIAN_EMBEDDINGS_BASE_URL")
self.BAILIAN_EMBEDDINGS_MODEL = os.getenv("BAILIAN_EMBEDDINGS_MODEL")
# 大语言模型
self.MODEL_API_KEY = os.getenv("MODEL_API_KEY")
self.MODEL_BASE_URL = os.getenv("MODEL_BASE_URL")
self.MODEL_NAME = os.getenv("MODEL_NAME")
def validate(self):
"""校验必要配置项是否存在。"""
required = [
"BAILIAN_API_KEY", "BAILIAN_EMBEDDINGS_BASE_URL", "BAILIAN_EMBEDDINGS_MODEL",
"MODEL_API_KEY", "MODEL_BASE_URL", "MODEL_NAME",
]
missing = [k for k in required if not getattr(self, k)]
if missing:
raise EnvironmentError(f"缺少必要的环境变量: {', '.join(missing)}")
5.4 公共模块 common.py
将 Embedding 模型和 LLM 模型的实例化集中管理,避免在多个文件中重复创建:
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from config import Config
conf = Config()
def build_embeddings_model() -> OpenAIEmbeddings:
"""
创建 Embedding 模型实例。
使用阿里百炼的 text-embedding-v4,通过 DashScope 的 OpenAI 兼容接口调用。
"""
return OpenAIEmbeddings(
api_key=conf.BAILIAN_API_KEY,
base_url=conf.BAILIAN_EMBEDDINGS_BASE_URL,
model=conf.BAILIAN_EMBEDDINGS_MODEL,
chunk_size=10, # DashScope 兼容接口单次批大小上限
tiktoken_enabled=False, # DashScope 兼容模式:禁用 tiktoken
check_embedding_ctx_length=False, # 跳过本地 token 长度预检查
)
# 全局单例:Embedding 模型
my_embeddings_model = build_embeddings_model()
# 全局单例:大语言模型
my_llm = ChatOpenAI(
api_key=conf.MODEL_API_KEY,
base_url=conf.MODEL_BASE_URL,
model=conf.MODEL_NAME,
)
为什么使用全局单例?
Embedding 模型和 LLM 的实例化会初始化 HTTP 客户端等资源。如果每次调用都重新创建,会产生不必要的开销。将其设为模块级别的全局变量,利用 Python 模块的单例特性(import 只执行一次),可以确保整个应用共享同一实例。
5.5 离线建库 __001__store_data_faiss.py
"""
离线建库模块
功能:加载原始文档 → 文本切分 → 向量化 → 存入 FAISS 向量数据库
"""
import os
from typing import Iterable, List, Optional, Sequence
from langchain_community.document_loaders import TextLoader, PyPDFLoader
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from common import my_embeddings_model
# ============================
# Step 1: 文档加载
# ============================
def load_documents(path: str) -> List[Document]:
"""
递归加载指定路径下的所有文档。
支持 .txt、.md、.pdf 三种格式。
"""
docs: List[Document] = []
# 收集文件路径
files: List[str] = []
if os.path.isdir(path):
for root, _dirs, filenames in os.walk(path):
for name in filenames:
files.append(os.path.join(root, name))
else:
files = [path]
# 按后缀加载
for fp in files:
ext = os.path.splitext(fp)[1].lower()
if ext in {".txt", ".md"}:
docs.extend(TextLoader(fp, encoding="utf-8").load())
elif ext == ".pdf":
docs.extend(PyPDFLoader(fp).load())
else:
print(f"[WARN] 跳过不支持的文件: {fp}")
print(f"[INFO] 加载完成: {len(files)} 个文件 → {len(docs)} 个文档片段")
return docs
# ============================
# Step 2: 文本切分
# ============================
def split_documents(
documents: List[Document],
chunk_size: int = 200,
chunk_overlap: int = 20
) -> List[Document]:
"""
将文档切分为更小的文本块。
"""
splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
add_start_index=True,
)
chunks = splitter.split_documents(documents)
print(f"[INFO] 切分完成: {len(documents)} 个文档 → {len(chunks)} 个文本块")
return chunks
# ============================
# Step 3: 向量化并存入 FAISS
# ============================
def build_faiss(
embeddings_model: Embeddings,
docs: Optional[Iterable[Document]] = None,
texts: Optional[Sequence[str]] = None,
persist_dir: str = "_faiss_index",
normalize_L2: bool = True,
) -> FAISS:
"""
将文档或文本向量化后存入 FAISS 数据库,并保存到本地。
"""
if not docs and not texts:
raise ValueError("`docs` 和 `texts` 至少需要提供一个。")
if texts is not None:
vs = FAISS.from_texts(list(texts), embeddings_model, normalize_L2=normalize_L2)
else:
vs = FAISS.from_documents(list(docs), embeddings_model, normalize_L2=normalize_L2)
vs.save_local(persist_dir)
print(f"[INFO] FAISS 索引已保存: {persist_dir},共 {vs.index.ntotal} 条向量")
return vs
# ============================
# 主流程:组装以上三步
# ============================
def store_data_main(data_dir: str = "data", persist_dir: str = "_faiss_index"):
"""
离线建库主函数。
参数:
data_dir: 原始文档所在目录
persist_dir: FAISS 索引保存目录
"""
print("=" * 50)
print("开始离线建库...")
print("=" * 50)
# Step 1: 加载文档
document_list = load_documents(data_dir)
if not document_list:
print("[ERROR] 未找到任何文档,请检查 data 目录")
return
# Step 2: 切分文档
chunk_list = split_documents(document_list, chunk_size=200, chunk_overlap=20)
# Step 3: 向量化并存储
build_faiss(my_embeddings_model, docs=chunk_list, persist_dir=persist_dir)
print("=" * 50)
print("离线建库完成!")
print("=" * 50)
if __name__ == "__main__":
store_data_main()
5.6 在线问答 __002__online_question_qa.py
"""
在线问答模块
功能:加载 FAISS 索引 → 用户提问 → 向量检索 → 大模型生成答案
"""
from pathlib import Path
from typing import Sequence
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.prompts import ChatPromptTemplate
from common import my_llm, my_embeddings_model
# ============================
# Step 1: 加载 FAISS 索引
# ============================
def load_faiss(persist_dir: str, embeddings_model: Embeddings) -> FAISS:
"""从本地加载 FAISS 向量数据库。"""
if not Path(persist_dir).exists():
raise FileNotFoundError(f"FAISS 索引目录不存在:{persist_dir},请先运行离线建库。")
vs = FAISS.load_local(
persist_dir,
embeddings_model,
allow_dangerous_deserialization=True,
)
print(f"[INFO] 索引加载成功,共 {vs.index.ntotal} 条向量")
return vs
# ============================
# Step 2: 相似度检索
# ============================
def similarity_search(
vs: FAISS,
query: str,
k: int = 3,
score_threshold: float = None
):
"""
基于向量相似度检索最相关的文档片段。
返回:List[Tuple[Document, float]],其中 float 为 L2 距离(越小越相似)
"""
return vs.similarity_search_with_score(query, k=k, score_threshold=score_threshold)
# ============================
# Step 3: 大模型生成答案(流式输出)
# ============================
def answer_with_llm(question: str, docs: Sequence[Document]):
"""
基于检索到的文档,调用大模型生成答案(流式输出)。
"""
context = "\n\n".join([f"[{i + 1}] {doc.page_content}" for i, doc in enumerate(docs)])
prompt_template = ChatPromptTemplate.from_messages(
[
(
"system",
"你是一个严谨的中文问答助手。"
"只基于给定的上下文回答问题。"
"如果上下文不足以回答问题,请明确说明"根据已检索到的资料无法确定"。"
"回答时请引用上下文的编号作为依据。",
),
(
"user",
"问题:{question}\n\n"
"上下文:\n{context}\n\n"
"请给出答案:"
),
]
)
chain = prompt_template | my_llm
for chunk in chain.stream({"question": question, "context": context}):
yield chunk.content
# ============================
# 主流程:组装以上三步
# ============================
def online_question_qa_main(question: str):
"""
在线问答主函数。
参数:
question: 用户输入的问题
Yields:
str: 大模型生成的答案 token(流式)
"""
# Step 1: 加载索引
vs = load_faiss(persist_dir="_faiss_index", embeddings_model=my_embeddings_model)
# Step 2: 检索
hits = similarity_search(vs, question, k=3)
hit_document_list = []
print(f"\n{'='*50}")
print(f"问题: {question}")
print(f"检索结果:")
for i, (hit, score) in enumerate(hits):
print(f" [{i+1}] (L2距离: {score:.4f}) {hit.page_content[:80]}...")
hit_document_list.append(hit)
print(f"{'='*50}")
print(f"模型回答:")
# Step 3: 生成
for content in answer_with_llm(question=question, docs=hit_document_list):
yield content
# ============================
# 交互式命令行入口
# ============================
if __name__ == "__main__":
print("物流知识库 RAG 问答系统")
print("输入 'exit' 或 'quit' 退出\n")
while True:
question = input("请输入问题: ").strip()
if not question:
continue
if question.lower() in {"exit", "quit"}:
print("再见!")
break
for content in online_question_qa_main(question):
print(content, end="", flush=True)
print("\n")
5.7 运行效果演示
第一步:离线建库
python __001__store_data_faiss.py
预期输出:
==================================================
开始离线建库...
[INFO] 加载完成: 3 个文件 → 15 个文档片段
[INFO] 切分完成: 15 个文档 → 47 个文本块
[INFO] FAISS 索引已保存: _faiss_index,共 47 条向量
==================================================
离线建库完成!
第二步:在线问答
python __002__online_question_qa.py
预期交互:
物流知识库 RAG 问答系统
输入 'exit' 或 'quit' 退出
请输入问题: 冷链运输的温控标准是什么?
==================================================
问题: 冷链运输的温控标准是什么?
检索结果:
[1] (L2距离: 0.2134) 冷链运输要求全程温度控制在2-8摄氏度之间...
[2] (L2距离: 0.3567) 冷链运输车辆需配备GPS温控监测设备...
[3] (L2距离: 0.4123) 生鲜食品的冷链运输温度标准根据品类有所不同...
==================================================
模型回答:
根据上下文,冷链运输的温控标准如下:
1. 冷链运输要求全程温度控制在 2-8 摄氏度之间 [1]。
2. 冷链运输车辆需配备 GPS 温控监测设备,实时记录运输过程中的温度数据 [2]。
3. 生鲜食品的冷链运输温度标准会根据品类有所不同 [3]。
需要注意的是,具体品类的温度要求可能存在差异,建议参考相关品类的专项规范。
六、进阶优化方向
基础 RAG 跑通之后,可以从以下几个方向进行优化:
6.1 检索质量优化
| 优化策略 | 说明 |
|---|---|
| 混合检索(Hybrid Search) | 结合向量检索和 BM25 关键词检索,兼顾语义理解和精确匹配 |
| 查询改写(Query Rewriting) | 使用 LLM 对用户问题进行改写、扩展或分解,提高检索召回率 |
| 重排序(Reranking) | 对初步检索的结果使用 Cross-Encoder 模型进行精排,提高排序质量 |
| 多路召回(Multi-Query) | 将用户问题改写为多个不同表述,分别检索后合并去重 |
6.2 文本切分优化
| 优化策略 | 说明 |
|---|---|
| 语义切分 | 使用 Embedding 计算相邻句子的语义相似度,在语义断裂处切分 |
| Markdown 感知切分 | 使用 MarkdownHeaderTextSplitter 按标题层级切分,保持章节完整性 |
| 递归层级切分 | 先按大段落切分,再对超长段落按句子切分 |
6.3 生成质量优化
| 优化策略 | 说明 |
|---|---|
| Prompt 工程 | 优化 System Prompt,增加角色设定、输出格式约束等 |
| 答案引用验证 | 生成答案后,验证引用的编号是否确实存在于上下文中 |
| 多轮对话 | 维护对话历史,支持追问和上下文引用 |
6.4 工程化优化
| 优化策略 | 说明 |
|---|---|
| 增量更新 | 支持新增文档的增量索引,无需全量重建 |
| 索引缓存 | 将 FAISS 索引加载到内存,避免每次查询都从磁盘读取 |
| 异步处理 | 使用异步接口提升并发查询的吞吐量 |
| 日志与监控 | 记录检索结果、响应时间、用户反馈等指标 |
七、常见问题与排查
| 问题 | 可能原因 | 解决方案 |
|---|---|---|
BadRequestError: batch size > 10 |
DashScope embed 接口单次最多处理 10 条 | 设置 chunk_size=10 |
tiktoken 相关报错 |
DashScope 兼容模式不支持 OpenAI 的 tokenizer | 设置 tiktoken_enabled=False |
| 检索结果为空 | FAISS 索引目录不存在或为空 | 先运行离线建库脚本 |
| 检索结果不相关 | chunk_size 设置过小/过大,或 Embedding 模型不匹配 | 调整切分参数,确保建库和检索使用同一模型 |
ModuleNotFoundError: faiss |
未安装 FAISS | pip install faiss-cpu |
.env 未生效 |
文件路径不对或未调用 load_dotenv |
确认 .env 在项目根目录,且在使用变量前调用加载函数 |
allow_dangerous_deserialization 警告 |
FAISS 使用 pickle 序列化 | 加载自己的索引时传入 allow_dangerous_deserialization=True |
| API Key 无效 | Key 过期或配置错误 | 检查 .env 中的 Key 是否正确且未过期 |
八、总结
本文从 RAG 的理论原理出发,基于 LangChain 框架,完成了一个完整的物流知识库 RAG 问答系统的搭建。核心要点总结如下:
- 1.
RAG 的核心思想是"先检索,再生成",通过在推理时动态引入外部知识,有效解决了大模型的知识时效性和幻觉问题。
- 2.
完整的 RAG 流程包含两大阶段:
- 离线建库:文档加载 → 文本切分 → 向量化 → 存入向量数据库
- 在线问答:问题向量化 → 相似度检索 → Prompt 组装 → 大模型生成答案
- 3.
LangChain 的模块化设计使得每个组件(Loader、Splitter、Embedding、VectorStore、LLM)都可以独立替换,便于根据实际需求灵活选择技术栈。
- 4.
工程化实践要点:
- 使用
.env管理敏感配置 - 使用全局单例避免重复初始化
- 合理设置 chunk_size 和 chunk_overlap
- 确保建库和检索使用同一 Embedding 模型
- 使用
- 5.
进阶优化可以从检索策略(混合检索、重排序)、文本切分(语义切分)、生成质量(Prompt 优化)和工程化(增量更新、异步处理)等多个维度展开。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)