LangChain RAG实战——实战与优化篇:完整案例与系统优化
·
LangChain RAG实战——实战与优化篇:完整案例与系统优化
作者:资深程序员 | 平台:LangChain + 阿里云百炼(Qwen)+ ChromaDB
本篇介绍RAG系统的完整实战案例与高级优化策略,涵盖项目结构设计、检索优化、生成优化、系统优化、效果评估等内容。
目录
一、完整实战案例
1.1 项目结构
rag_project/
├── config.py # 配置
├── document_loader.py # 文档加载
├── text_splitter.py # 文本分割
├── vector_store.py # 向量存储
├── retriever.py # 检索器
├── rag_chain.py # RAG Chain
└── main.py # 主程序
1.2 配置文件
# config.py
import os
from dotenv import load_dotenv
load_dotenv()
# API配置
DASHSCOPE_API_KEY = os.getenv("DASHSCOPE_API_KEY")
OPENAI_API_BASE = "https://dashscope.aliyuncs.com/compatible-mode/v1"
# 模型配置
LLM_MODEL = "qwen-plus"
EMBEDDING_MODEL = "text-embedding-3-small"
# 向量数据库配置
PERSIST_DIRECTORY = "./chroma_db"
COLLECTION_NAME = "rag_docs"
# 检索配置
DEFAULT_K = 5
CHUNK_SIZE = 500
CHUNK_OVERLAP = 50
1.3 完整RAG实现
# main.py
from langchain_community.document_loaders import PyPDFLoader, DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
class RAGSystem:
def __init__(self):
# 初始化组件
self.llm = ChatOpenAI(
model="qwen-plus",
api_key=os.getenv("DASHSCOPE_API_KEY"),
openai_api_base="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
self.embeddings = OpenAIEmbeddings(
model="text-embedding-3-small",
openai_api_key=os.getenv("DASHSCOPE_API_KEY"),
openai_api_base="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
self.vectorstore = None
self.chain = None
def load_documents(self, path: str, glob: str = "*.pdf"):
"""加载文档"""
loader = DirectoryLoader(path, glob=glob, loader_cls=PyPDFLoader)
return loader.load()
def split_documents(self, documents, chunk_size: int = 500, chunk_overlap: int = 50):
"""分割文档"""
splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len
)
return splitter.split_documents(documents)
def create_vectorstore(self, documents, persist_directory: str = "./chroma_db"):
"""创建向量数据库"""
self.vectorstore = Chroma.from_documents(
documents=documents,
embedding=self.embeddings,
persist_directory=persist_directory
)
self.vectorstore.persist()
return self.vectorstore
def load_vectorstore(self, persist_directory: str = "./chroma_db"):
"""加载已存在的向量数据库"""
self.vectorstore = Chroma(
persist_directory=persist_directory,
embedding_function=self.embeddings
)
return self.vectorstore
def create_chain(self):
"""创建RAG Chain"""
prompt_template = """基于以下参考内容回答用户问题。
参考内容:
{context}
用户问题: {question}
回答:"""
prompt = ChatPromptTemplate.from_template(prompt_template)
def format_docs(docs):
return "\n\n".join([doc.page_content for doc in docs])
retriever = self.vectorstore.as_retriever(search_kwargs={"k": 5})
self.chain = (
{"context": retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| self.llm
| StrOutputParser()
)
return self.chain
def query(self, question: str) -> str:
"""执行查询"""
if self.chain is None:
self.create_chain()
return self.chain.invoke(question)
def get_sources(self, question: str, k: int = 3):
"""获取源文档"""
docs = self.vectorstore.similarity_search(question, k=k)
return [(doc.page_content, doc.metadata) for doc in docs]
# 使用示例
def main():
rag = RAGSystem()
# 第一次运行:加载文档并创建向量库
docs = rag.load_documents("./documents/")
chunks = rag.split_documents(docs)
rag.create_vectorstore(chunks)
rag.create_chain()
# 查询
question = "本文档的主要内容是什么?"
answer = rag.query(question)
print(f"问题: {question}")
print(f"答案: {answer}")
# 获取来源
sources = rag.get_sources(question)
print("\n参考来源:")
for i, (content, metadata) in enumerate(sources, 1):
print(f"{i}. {metadata.get('source', 'Unknown')} - {content[:100]}...")
if __name__ == "__main__":
main()
1.4 批量文档处理
# batch_process.py
from langchain_community.document_loaders import PyPDFLoader, Docx2txtLoader, TextLoader
class DocumentProcessor:
"""支持多种格式的文档处理器"""
LOADERS = {
".pdf": PyPDFLoader,
".docx": Docx2txtLoader,
".txt": TextLoader,
}
@classmethod
def load_file(cls, file_path: str):
"""加载单个文件"""
ext = os.path.splitext(file_path)[1].lower()
if ext in cls.LOADERS:
loader = cls.LOADERS[ext](file_path)
return loader.load()
return []
@classmethod
def load_directory(cls, directory: str):
"""加载目录下的所有文档"""
all_docs = []
for ext, loader_cls in cls.LOADERS.items():
loader = DirectoryLoader(
directory,
glob=f"*{ext}",
loader_cls=loader_cls
)
all_docs.extend(loader.load())
return all_docs
二、RAG系统优化

2.1 检索优化
2.1.1 混合检索
结合关键词检索(BM25)和向量检索,提升检索效果:
# optimization.py
from langchain_community.retrievers import BM25Retriever
# 创建BM25检索器
bm25_retriever = BM25Retriever.from_documents(chunks)
# 向量检索器
vector_retriever = vectorstore.as_retriever()
# 组合检索(Ensemble Retriever)
from langchain_community.retrievers import EnsembleRetriever
ensemble_retriever = EnsembleRetriever(
retrievers=[bm25_retriever, vector_retriever],
weights=[0.3, 0.7] # BM25权重30%,向量检索权重70%
)
2.1.2 重排序
使用Cross-Encoder对检索结果进行重排序:
# optimization.py
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
# 使用BGE重排序模型
cross_encoder = HuggingFaceCrossEncoder(
model_name="BAAI/bge-reranker-base"
)
def rerank_results(query, docs, top_k=3):
"""重排序检索结果"""
doc_texts = [doc.page_content for doc in docs]
scores = cross_encoder.predict([(query, doc) for doc in doc_texts])
# 按分数排序
scored_docs = list(zip(docs, scores))
scored_docs.sort(key=lambda x: x[1], reverse=True)
return [doc for doc, score in scored_docs[:top_k]]
2.1.3 查询扩展
使用HyDE(假设性文档嵌入)改进查询:
# optimization.py
from langchain_core.prompts import ChatPromptTemplate
hyde_prompt = ChatPromptTemplate.from_messages([
("human", """请根据用户问题,生成一个假设性的回答。
这个假设性回答应该包含问题的可能答案。
用户问题: {question}
假设性回答:""")
])
hyde_chain = hyde_prompt | llm | StrOutputParser()
def hyde_retrieval(query, vectorstore, k=5):
"""HyDE检索"""
# 生成假设性回答
hypothetical_answer = hyde_chain.invoke({"question": query})
# 使用假设性回答进行检索
docs = vectorstore.similarity_search(hypothetical_answer, k=k)
return docs
2.2 生成优化
2.2.1 上下文压缩
减少无关上下文,提高生成质量:
# optimization.py
from langchain.retrievers import ContextualCompressionRetriever
from langchain_community.document_compressors import LLMChainExtractor
# 创建压缩器
compressor = LLMChainExtractor.from_llm(llm)
# 创建压缩检索器
compression_retriever = ContextualCompressionRetriever(
base_retriever=vectorstore.as_retriever(),
compressors=compressor
)
# 压缩后的检索
compressed_docs = compression_retriever.invoke("用户问题")
2.2.2 Self-RAG风格的自适应检索
# optimization.py
from langchain_core.prompts import ChatPromptTemplate
self_rag_prompt = ChatPromptTemplate.from_messages([
("system", """你是一个严格的事实核查助手。对于每个问题:
1. 首先判断是否需要检索外部知识
2. 如果需要,提供准确的参考信息
3. 如果不确定,明确说明
记住:不要编造信息,准确比完整更重要。"""),
("human", "{question}")
])
self_rag_chain = self_rag_prompt | llm | StrOutputParser()
2.3 系统优化
2.3.1 缓存策略
# optimization.py
from functools import lru_cache
from langchain_core.runnables import RunnableLambda
@lru_cache(maxsize=100)
def cached_embedding(text: str) -> list:
"""缓存嵌入结果"""
return embeddings.embed_query(text)
# 使用缓存检索器
class CachedRetriever:
def __init__(self, vectorstore):
self.vectorstore = vectorstore
def similarity_search(self, query: str, k: int = 5):
# 查询向量使用缓存
query_vector = cached_embedding(query)
return self.vectorstore.similarity_search_by_vector(query_vector, k=k)
2.3.2 异步处理
# optimization.py
import asyncio
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=4)
async def async_retrieve(query: str, k: int = 5):
"""异步检索"""
loop = asyncio.get_event_loop()
docs = await loop.run_in_executor(
executor,
vectorstore.similarity_search,
query, k
)
return docs
# 批量异步检索
async def batch_retrieve(queries: list[str]):
tasks = [async_retrieve(q) for q in queries]
results = await asyncio.gather(*tasks)
return results
三、总结
本文系统介绍了基于LangChain的RAG技术实战,涵盖从文档加载到检索增强生成的完整流程:
| 模块 | 核心组件 | 作用 |
|---|---|---|
| 文档加载 | PyPDFLoader、Docx2txtLoader | 解析多种格式文档 |
| 文本分割 | RecursiveCharacterTextSplitter | 智能分块 |
| 嵌入模型 | OpenAI Embeddings、BGE | 文本向量化 |
| 向量存储 | ChromaDB、FAISS | 高效向量检索 |
| 语义检索 | similarity_search、MMR | 精准信息获取 |
| RAG Chain | LCEL表达式 | 端到端流水线 |
核心优势
- 知识实时性:向量库更新即时生效,无需重新训练模型
- 可追溯性:明确返回参考来源,增强答案可信度
- 领域适配:通过注入领域文档,实现垂直知识增强
- 成本效益:比微调更经济,比长上下文更高效
最佳实践
- 文档预处理:清洗HTML标签、表格、代码块等特殊内容
- 块大小选择:根据内容类型选择合适的chunk_size(代码块建议小,段落建议大)
- 重叠设计:适当的重叠减少边界信息丢失
- 元数据保存:保留来源、页码、更新时间等关键信息
- 检索评估:定期评估检索准确率,持续优化分割策略
RAG系列文章索引
| 篇目 | 标题 | 内容 |
|---|---|---|
| 第一篇 | LangChain RAG实战——数据预处理篇:从文档加载到嵌入向量化 | 文档加载、文本分割、嵌入模型 |
| 第二篇 | LangChain RAG实战——向量存储与检索篇:从向量数据库到Chain构建 | 向量数据库、语义检索、RAG Chain |
| 第三篇 | LangChain RAG实战——实战与优化篇:完整案例与系统优化 | 完整实战、系统优化、效果评估 |
参考资源
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)