LangChain RAG实战——实战与优化篇:完整案例与系统优化

作者:资深程序员 | 平台:LangChain + 阿里云百炼(Qwen)+ ChromaDB
本篇介绍RAG系统的完整实战案例与高级优化策略,涵盖项目结构设计、检索优化、生成优化、系统优化、效果评估等内容。


目录

  1. 完整实战案例
  2. RAG系统优化
  3. 总结

一、完整实战案例

1.1 项目结构

rag_project/
├── config.py          # 配置
├── document_loader.py # 文档加载
├── text_splitter.py   # 文本分割
├── vector_store.py    # 向量存储
├── retriever.py       # 检索器
├── rag_chain.py       # RAG Chain
└── main.py            # 主程序

1.2 配置文件

# config.py
import os
from dotenv import load_dotenv

load_dotenv()

# API配置
DASHSCOPE_API_KEY = os.getenv("DASHSCOPE_API_KEY")
OPENAI_API_BASE = "https://dashscope.aliyuncs.com/compatible-mode/v1"

# 模型配置
LLM_MODEL = "qwen-plus"
EMBEDDING_MODEL = "text-embedding-3-small"

# 向量数据库配置
PERSIST_DIRECTORY = "./chroma_db"
COLLECTION_NAME = "rag_docs"

# 检索配置
DEFAULT_K = 5
CHUNK_SIZE = 500
CHUNK_OVERLAP = 50

1.3 完整RAG实现

# main.py
from langchain_community.document_loaders import PyPDFLoader, DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

class RAGSystem:
    def __init__(self):
        # 初始化组件
        self.llm = ChatOpenAI(
            model="qwen-plus",
            api_key=os.getenv("DASHSCOPE_API_KEY"),
            openai_api_base="https://dashscope.aliyuncs.com/compatible-mode/v1"
        )

        self.embeddings = OpenAIEmbeddings(
            model="text-embedding-3-small",
            openai_api_key=os.getenv("DASHSCOPE_API_KEY"),
            openai_api_base="https://dashscope.aliyuncs.com/compatible-mode/v1"
        )

        self.vectorstore = None
        self.chain = None

    def load_documents(self, path: str, glob: str = "*.pdf"):
        """加载文档"""
        loader = DirectoryLoader(path, glob=glob, loader_cls=PyPDFLoader)
        return loader.load()

    def split_documents(self, documents, chunk_size: int = 500, chunk_overlap: int = 50):
        """分割文档"""
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len
        )
        return splitter.split_documents(documents)

    def create_vectorstore(self, documents, persist_directory: str = "./chroma_db"):
        """创建向量数据库"""
        self.vectorstore = Chroma.from_documents(
            documents=documents,
            embedding=self.embeddings,
            persist_directory=persist_directory
        )
        self.vectorstore.persist()
        return self.vectorstore

    def load_vectorstore(self, persist_directory: str = "./chroma_db"):
        """加载已存在的向量数据库"""
        self.vectorstore = Chroma(
            persist_directory=persist_directory,
            embedding_function=self.embeddings
        )
        return self.vectorstore

    def create_chain(self):
        """创建RAG Chain"""
        prompt_template = """基于以下参考内容回答用户问题。

参考内容:
{context}

用户问题: {question}

回答:"""

        prompt = ChatPromptTemplate.from_template(prompt_template)

        def format_docs(docs):
            return "\n\n".join([doc.page_content for doc in docs])

        retriever = self.vectorstore.as_retriever(search_kwargs={"k": 5})

        self.chain = (
            {"context": retriever | format_docs, "question": RunnablePassthrough()}
            | prompt
            | self.llm
            | StrOutputParser()
        )
        return self.chain

    def query(self, question: str) -> str:
        """执行查询"""
        if self.chain is None:
            self.create_chain()
        return self.chain.invoke(question)

    def get_sources(self, question: str, k: int = 3):
        """获取源文档"""
        docs = self.vectorstore.similarity_search(question, k=k)
        return [(doc.page_content, doc.metadata) for doc in docs]


# 使用示例
def main():
    rag = RAGSystem()

    # 第一次运行:加载文档并创建向量库
    docs = rag.load_documents("./documents/")
    chunks = rag.split_documents(docs)
    rag.create_vectorstore(chunks)
    rag.create_chain()

    # 查询
    question = "本文档的主要内容是什么?"
    answer = rag.query(question)
    print(f"问题: {question}")
    print(f"答案: {answer}")

    # 获取来源
    sources = rag.get_sources(question)
    print("\n参考来源:")
    for i, (content, metadata) in enumerate(sources, 1):
        print(f"{i}. {metadata.get('source', 'Unknown')} - {content[:100]}...")


if __name__ == "__main__":
    main()

1.4 批量文档处理

# batch_process.py
from langchain_community.document_loaders import PyPDFLoader, Docx2txtLoader, TextLoader

class DocumentProcessor:
    """支持多种格式的文档处理器"""

    LOADERS = {
        ".pdf": PyPDFLoader,
        ".docx": Docx2txtLoader,
        ".txt": TextLoader,
    }

    @classmethod
    def load_file(cls, file_path: str):
        """加载单个文件"""
        ext = os.path.splitext(file_path)[1].lower()
        if ext in cls.LOADERS:
            loader = cls.LOADERS[ext](file_path)
            return loader.load()
        return []

    @classmethod
    def load_directory(cls, directory: str):
        """加载目录下的所有文档"""
        all_docs = []
        for ext, loader_cls in cls.LOADERS.items():
            loader = DirectoryLoader(
                directory,
                glob=f"*{ext}",
                loader_cls=loader_cls
            )
            all_docs.extend(loader.load())
        return all_docs

二、RAG系统优化

在这里插入图片描述

2.1 检索优化

2.1.1 混合检索

结合关键词检索(BM25)和向量检索,提升检索效果:

# optimization.py
from langchain_community.retrievers import BM25Retriever

# 创建BM25检索器
bm25_retriever = BM25Retriever.from_documents(chunks)

# 向量检索器
vector_retriever = vectorstore.as_retriever()

# 组合检索(Ensemble Retriever)
from langchain_community.retrievers import EnsembleRetriever

ensemble_retriever = EnsembleRetriever(
    retrievers=[bm25_retriever, vector_retriever],
    weights=[0.3, 0.7]  # BM25权重30%,向量检索权重70%
)
2.1.2 重排序

使用Cross-Encoder对检索结果进行重排序:

# optimization.py
from langchain_community.cross_encoders import HuggingFaceCrossEncoder

# 使用BGE重排序模型
cross_encoder = HuggingFaceCrossEncoder(
    model_name="BAAI/bge-reranker-base"
)

def rerank_results(query, docs, top_k=3):
    """重排序检索结果"""
    doc_texts = [doc.page_content for doc in docs]
    scores = cross_encoder.predict([(query, doc) for doc in doc_texts])

    # 按分数排序
    scored_docs = list(zip(docs, scores))
    scored_docs.sort(key=lambda x: x[1], reverse=True)

    return [doc for doc, score in scored_docs[:top_k]]
2.1.3 查询扩展

使用HyDE(假设性文档嵌入)改进查询:

# optimization.py
from langchain_core.prompts import ChatPromptTemplate

hyde_prompt = ChatPromptTemplate.from_messages([
    ("human", """请根据用户问题,生成一个假设性的回答。
这个假设性回答应该包含问题的可能答案。

用户问题: {question}

假设性回答:""")
])

hyde_chain = hyde_prompt | llm | StrOutputParser()

def hyde_retrieval(query, vectorstore, k=5):
    """HyDE检索"""
    # 生成假设性回答
    hypothetical_answer = hyde_chain.invoke({"question": query})

    # 使用假设性回答进行检索
    docs = vectorstore.similarity_search(hypothetical_answer, k=k)
    return docs

2.2 生成优化

2.2.1 上下文压缩

减少无关上下文,提高生成质量:

# optimization.py
from langchain.retrievers import ContextualCompressionRetriever
from langchain_community.document_compressors import LLMChainExtractor

# 创建压缩器
compressor = LLMChainExtractor.from_llm(llm)

# 创建压缩检索器
compression_retriever = ContextualCompressionRetriever(
    base_retriever=vectorstore.as_retriever(),
    compressors=compressor
)

# 压缩后的检索
compressed_docs = compression_retriever.invoke("用户问题")
2.2.2 Self-RAG风格的自适应检索
# optimization.py
from langchain_core.prompts import ChatPromptTemplate

self_rag_prompt = ChatPromptTemplate.from_messages([
    ("system", """你是一个严格的事实核查助手。对于每个问题:
1. 首先判断是否需要检索外部知识
2. 如果需要,提供准确的参考信息
3. 如果不确定,明确说明

记住:不要编造信息,准确比完整更重要。"""),
    ("human", "{question}")
])

self_rag_chain = self_rag_prompt | llm | StrOutputParser()

2.3 系统优化

2.3.1 缓存策略
# optimization.py
from functools import lru_cache
from langchain_core.runnables import RunnableLambda

@lru_cache(maxsize=100)
def cached_embedding(text: str) -> list:
    """缓存嵌入结果"""
    return embeddings.embed_query(text)

# 使用缓存检索器
class CachedRetriever:
    def __init__(self, vectorstore):
        self.vectorstore = vectorstore

    def similarity_search(self, query: str, k: int = 5):
        # 查询向量使用缓存
        query_vector = cached_embedding(query)
        return self.vectorstore.similarity_search_by_vector(query_vector, k=k)
2.3.2 异步处理
# optimization.py
import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

async def async_retrieve(query: str, k: int = 5):
    """异步检索"""
    loop = asyncio.get_event_loop()
    docs = await loop.run_in_executor(
        executor,
        vectorstore.similarity_search,
        query, k
    )
    return docs

# 批量异步检索
async def batch_retrieve(queries: list[str]):
    tasks = [async_retrieve(q) for q in queries]
    results = await asyncio.gather(*tasks)
    return results

三、总结

本文系统介绍了基于LangChain的RAG技术实战,涵盖从文档加载到检索增强生成的完整流程:

模块 核心组件 作用
文档加载 PyPDFLoader、Docx2txtLoader 解析多种格式文档
文本分割 RecursiveCharacterTextSplitter 智能分块
嵌入模型 OpenAI Embeddings、BGE 文本向量化
向量存储 ChromaDB、FAISS 高效向量检索
语义检索 similarity_search、MMR 精准信息获取
RAG Chain LCEL表达式 端到端流水线

核心优势

  1. 知识实时性:向量库更新即时生效,无需重新训练模型
  2. 可追溯性:明确返回参考来源,增强答案可信度
  3. 领域适配:通过注入领域文档,实现垂直知识增强
  4. 成本效益:比微调更经济,比长上下文更高效

最佳实践

  1. 文档预处理:清洗HTML标签、表格、代码块等特殊内容
  2. 块大小选择:根据内容类型选择合适的chunk_size(代码块建议小,段落建议大)
  3. 重叠设计:适当的重叠减少边界信息丢失
  4. 元数据保存:保留来源、页码、更新时间等关键信息
  5. 检索评估:定期评估检索准确率,持续优化分割策略

RAG系列文章索引

篇目 标题 内容
第一篇 LangChain RAG实战——数据预处理篇:从文档加载到嵌入向量化 文档加载、文本分割、嵌入模型
第二篇 LangChain RAG实战——向量存储与检索篇:从向量数据库到Chain构建 向量数据库、语义检索、RAG Chain
第三篇 LangChain RAG实战——实战与优化篇:完整案例与系统优化 完整实战、系统优化、效果评估

参考资源

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐