LangChain RAG 实战:构建企业级检索增强生成系统

项目概述

本项目是一个基于 LangChain 框架实现的完整 RAG(Retrieval-Augmented Generation)系统,涵盖了从数据加载、向量化存储到智能问答的全流程。项目展示了如何整合多种向量数据库、大语言模型和 LangGraph 工作流编排,构建生产级的 AI 应用。

技术栈

类别 技术选型
核心框架 LangChain, LangGraph
大语言模型 Qwen/Qwen3-8B (SiliconFlow)
嵌入模型 BAAI/bge-m3 (1024 维度)
向量数据库 Milvus, PGVector, Qdrant, Redis
工作流编排 LangGraph + Redis Checkpointer
可观测性 LangSmith

项目结构

rag/
├── vector_run.py    # 数据入库 - 多源文档加载与向量化
├── query_run.py     # 查询服务 - RAG 检索与问答
└── langgraph_run.py # 工作流编排 - Agent 工具调用

核心功能模块

1. 多源数据加载 (vector_run.py)

支持 9 种数据源的并行加载处理:

def run() -> None:
    function_list = [
        sqldatabase_loader,  # PostgreSQL 数据库
        bili_loader,         # B 站视频字幕
        pdf_loader,          # PDF 文档
        md_loader,           # Markdown 文件
        java_loader,         # Java 源代码
        word_loader,         # Word 文档
        directory_loader,    # 目录批量加载
        web_loader,          # 网页爬虫
        json_loader,         # JSON 数据
    ]

关键特性:

  • 使用 ThreadPoolExecutor 实现并发加载
  • 针对不同格式采用专用 Loader(如 Docx2txtLoader, UnstructuredPDFLoader
  • Java 代码采用语法感知的 RecursiveCharacterTextSplitter.from_language(Language.JAVA)
  • 统一分片参数:chunk_size=1024, chunk_overlap=200

2. 多向量库对比实现

Milvus(混合检索)
Milvus(
    embedding_function=embeddings,
    builtin_function=[BM25BuiltInFunction(
        input_field_names="text",
        output_field_names="sparse",
        enable_match=True,
        analyzer_params={"type": "chinese"}
    )],
    vector_field=["dense", "sparse"],  # 稠密 + 稀疏双向量
    collection_name="test"
)
PGVector(PostgreSQL)
PGVector(
    connection=pg_engine,
    embeddings=embeddings,
    collection_name="test",
    embedding_length=1024,
    use_jsonb=True
)
Redis Vector
RedisVectorStore(
    config=RedisConfig(redis_client=redis, index_name="vs-index"),
    embeddings=embeddings,
    ttl=60*5  # 5 分钟过期
)

3. RAG 检索增强链 (query_run.py)

retriever = ContextualCompressionRetriever(
    base_retriever=milvus_vector_store.as_retriever(),
    base_compressor=LLMChainExtractor.from_llm(llm),  # LLM 二次压缩
)

chain = (
    {
        "context": itemgetter("question") | retriever,
        "question": itemgetter("question"),
    }
    | prompt
    | llm
    | StrOutputParser()
)

检索优化策略:

  • 使用 ContextualCompressionRetriever 进行上下文压缩
  • LLMChainExtractor 提取关键信息片段
  • 支持相似度阈值过滤:score_threshold=0.8

4. LangGraph 工作流编排 (langgraph_run.py)

实现了一个支持工具调用的 Agent 工作流:

workflow = StateGraph(MessagesState)
workflow.add_node("call_model", call_model)
workflow.add_node("tool_node", tool_node)
workflow.add_edge(START, "call_model")
workflow.add_conditional_edges("call_model", should_continue, ["tool_node", END])
workflow.add_edge("tool_node", "call_model")

工具定义示例:

@tool
def multiply(a: int, b: int) -> int:
    """乘法,两数乘法."""
    return a * b

@tool  
def search(query: str) -> str:
    """搜索."""
    return f"{query}"

持久化配置:

  • RedisSaver:对话状态检查点
  • RedisStore:长期记忆存储
  • RedisCache:LLM 响应缓存(TTL=5 分钟)
  • CachePolicy(ttl=5*60):节点级缓存策略

关键技术要点

1. 速率限制

rate_limiter = InMemoryRateLimiter(
    requests_per_second=0.1,
    check_every_n_seconds=0.1,
    max_bucket_size=10,
)

2. LangSmith 追踪

os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "test"

3. 会话历史管理

def get_session_history(session_id: str) -> BaseChatMessageHistory:
    if session_id not in store:
        store[session_id] = ChatMessageHistory()
    return store[session_id]

4. 流式响应

responses = chain.stream(
    input={"question": query},
    config={"configurable": {"session_id": session_id}}
)
for part in responses:
    print(part, end='', flush=True)

运行方式

# 1. 数据入库
python vector_run.py

# 2. 执行查询
python query_run.py

# 3. Agent 工作流
python langgraph_run.py

环境依赖

langchain>=0.1.0
langchain-community>=0.0.10
langchain-openai>=0.0.2
langchain-milvus>=0.1.0
langchain-postgres>=0.0.1
langchain-redis>=0.0.1
langgraph>=0.0.20
pymilvus>=2.3.0
redis>=5.0.0
sqlalchemy>=2.0.0
psycopg2-binary>=2.9.0

性能优化建议

  1. 批处理:使用 add_documents 批量插入而非单条添加
  2. 异步加载:利用 AsyncChromiumLoader 提升网页爬取效率
  3. 索引优化:Milvus 使用 HNSW 索引加速检索
  4. 缓存策略:合理设置 TTL 平衡内存与响应速度
  5. 混合检索:结合 Dense + Sparse 向量提升召回率

扩展方向

  • 集成 Query Transformer 进行查询重写
  • 实现 Multi-Retrieval 多路召回
  • 添加 ReRanker 精排层
  • 支持知识库版本管理
  • 实现增量更新机制

参考资料


本文档适用于技术分享与学习参考,实际生产环境请注意密钥安全管理。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐