LangChain RAG 实战:构建企业级检索增强生成系统
·
LangChain RAG 实战:构建企业级检索增强生成系统
项目概述
本项目是一个基于 LangChain 框架实现的完整 RAG(Retrieval-Augmented Generation)系统,涵盖了从数据加载、向量化存储到智能问答的全流程。项目展示了如何整合多种向量数据库、大语言模型和 LangGraph 工作流编排,构建生产级的 AI 应用。
技术栈
| 类别 | 技术选型 |
|---|---|
| 核心框架 | LangChain, LangGraph |
| 大语言模型 | Qwen/Qwen3-8B (SiliconFlow) |
| 嵌入模型 | BAAI/bge-m3 (1024 维度) |
| 向量数据库 | Milvus, PGVector, Qdrant, Redis |
| 工作流编排 | LangGraph + Redis Checkpointer |
| 可观测性 | LangSmith |
项目结构
rag/
├── vector_run.py # 数据入库 - 多源文档加载与向量化
├── query_run.py # 查询服务 - RAG 检索与问答
└── langgraph_run.py # 工作流编排 - Agent 工具调用
核心功能模块
1. 多源数据加载 (vector_run.py)
支持 9 种数据源的并行加载处理:
def run() -> None:
function_list = [
sqldatabase_loader, # PostgreSQL 数据库
bili_loader, # B 站视频字幕
pdf_loader, # PDF 文档
md_loader, # Markdown 文件
java_loader, # Java 源代码
word_loader, # Word 文档
directory_loader, # 目录批量加载
web_loader, # 网页爬虫
json_loader, # JSON 数据
]
关键特性:
- 使用
ThreadPoolExecutor实现并发加载 - 针对不同格式采用专用 Loader(如
Docx2txtLoader,UnstructuredPDFLoader) - Java 代码采用语法感知的
RecursiveCharacterTextSplitter.from_language(Language.JAVA) - 统一分片参数:
chunk_size=1024,chunk_overlap=200
2. 多向量库对比实现
Milvus(混合检索)
Milvus(
embedding_function=embeddings,
builtin_function=[BM25BuiltInFunction(
input_field_names="text",
output_field_names="sparse",
enable_match=True,
analyzer_params={"type": "chinese"}
)],
vector_field=["dense", "sparse"], # 稠密 + 稀疏双向量
collection_name="test"
)
PGVector(PostgreSQL)
PGVector(
connection=pg_engine,
embeddings=embeddings,
collection_name="test",
embedding_length=1024,
use_jsonb=True
)
Redis Vector
RedisVectorStore(
config=RedisConfig(redis_client=redis, index_name="vs-index"),
embeddings=embeddings,
ttl=60*5 # 5 分钟过期
)
3. RAG 检索增强链 (query_run.py)
retriever = ContextualCompressionRetriever(
base_retriever=milvus_vector_store.as_retriever(),
base_compressor=LLMChainExtractor.from_llm(llm), # LLM 二次压缩
)
chain = (
{
"context": itemgetter("question") | retriever,
"question": itemgetter("question"),
}
| prompt
| llm
| StrOutputParser()
)
检索优化策略:
- 使用
ContextualCompressionRetriever进行上下文压缩 LLMChainExtractor提取关键信息片段- 支持相似度阈值过滤:
score_threshold=0.8
4. LangGraph 工作流编排 (langgraph_run.py)
实现了一个支持工具调用的 Agent 工作流:
workflow = StateGraph(MessagesState)
workflow.add_node("call_model", call_model)
workflow.add_node("tool_node", tool_node)
workflow.add_edge(START, "call_model")
workflow.add_conditional_edges("call_model", should_continue, ["tool_node", END])
workflow.add_edge("tool_node", "call_model")
工具定义示例:
@tool
def multiply(a: int, b: int) -> int:
"""乘法,两数乘法."""
return a * b
@tool
def search(query: str) -> str:
"""搜索."""
return f"{query}"
持久化配置:
RedisSaver:对话状态检查点RedisStore:长期记忆存储RedisCache:LLM 响应缓存(TTL=5 分钟)CachePolicy(ttl=5*60):节点级缓存策略
关键技术要点
1. 速率限制
rate_limiter = InMemoryRateLimiter(
requests_per_second=0.1,
check_every_n_seconds=0.1,
max_bucket_size=10,
)
2. LangSmith 追踪
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "test"
3. 会话历史管理
def get_session_history(session_id: str) -> BaseChatMessageHistory:
if session_id not in store:
store[session_id] = ChatMessageHistory()
return store[session_id]
4. 流式响应
responses = chain.stream(
input={"question": query},
config={"configurable": {"session_id": session_id}}
)
for part in responses:
print(part, end='', flush=True)
运行方式
# 1. 数据入库
python vector_run.py
# 2. 执行查询
python query_run.py
# 3. Agent 工作流
python langgraph_run.py
环境依赖
langchain>=0.1.0
langchain-community>=0.0.10
langchain-openai>=0.0.2
langchain-milvus>=0.1.0
langchain-postgres>=0.0.1
langchain-redis>=0.0.1
langgraph>=0.0.20
pymilvus>=2.3.0
redis>=5.0.0
sqlalchemy>=2.0.0
psycopg2-binary>=2.9.0
性能优化建议
- 批处理:使用
add_documents批量插入而非单条添加 - 异步加载:利用
AsyncChromiumLoader提升网页爬取效率 - 索引优化:Milvus 使用 HNSW 索引加速检索
- 缓存策略:合理设置 TTL 平衡内存与响应速度
- 混合检索:结合 Dense + Sparse 向量提升召回率
扩展方向
- 集成 Query Transformer 进行查询重写
- 实现 Multi-Retrieval 多路召回
- 添加 ReRanker 精排层
- 支持知识库版本管理
- 实现增量更新机制
参考资料
本文档适用于技术分享与学习参考,实际生产环境请注意密钥安全管理。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)