LangChain 主要功能构建实战:记忆、链路与智能分发
前言
在上一篇文章中,我们详细拆解了 RAGFlow 的 MCP 工具模块,实现了从用户自然语言输入到知识库助手的智能路由。但这些工具只是"散装的能力",要让整个后端真正运转起来,还需要一条完整的主流程——它要能记住对话上下文、能判断用户意图、能选择合适的处理链路并流式返回结果。
本文将继续拆解项目后端的核心模块:基于 LangChain 的主流程构建。我们将从记忆管理、业务链路、智能分发、统一处理四个层面,一步步还原整个后端的拼装过程。
一、整体架构总览
在 LangChain 主流程中,我们按"数据层 → 逻辑层 → 控制层"的层次来组织代码:
| 层级 | 文件 | 职责 |
|---|---|---|
| 配置层 | config.py |
MongoDB 连接参数与环境变量管理 |
| 数据层 | memory.py |
对话记忆的创建、持久化与格式化 |
| 逻辑层 | chains.py |
知识库查询链、一般对话链(流式输出) |
| 控制层 | intent.py |
基于 Function Calling 的意图智能分类 |
| 编排层 | processor.py |
闭包工厂模式,串联记忆→意图→链路→存储 |
调用链路:
用户输入 → processor.py (create_unified_processor) ├── memory.py (加载历史 → 格式化) ├── intent.py (detect_intent → knowledge_base / general_chat) ├── chains.py (选择链路 → 流式返回) └── memory.py (保存本轮对话)
二、配置层:config.py
这是最基础的一层,负责管理 MongoDB 的连接参数:
load_dotenv()
# MongoDB配置
MONGODB_URI = os.getenv("MONGODB_URI", "mongodb://localhost:27017/")
MONGODB_DB = os.getenv("MONGODB_DB", "conversation_db")
MONGODB_COLLECTION = os.getenv("MONGODB_COLLECTION", "chat_history")
# 其他配置参数
DEFAULT_SESSION_ID = "default"
SESSION_TTL_DAYS = 30 # 会话过期天数
设计要点:
-
所有敏感信息通过
.env文件管理,不硬编码 -
提供合理的默认值,确保开发环境开箱即用
-
SESSION_TTL_DAYS用于后续的会话过期清理策略
三、记忆数据层:memory.py
这是整个系统具备"连续对话"能力的基石。没有记忆的对话就像每次都和陌生人聊天,有了记忆才能实现真正的多轮交互。
3.1 创建记忆对象 — create_memory()
def create_memory(session_id: str = DEFAULT_SESSION_ID) -> ConversationBufferMemory:
# 拼接完整的 MongoDB 连接字符串
connection_string = f"{MONGODB_URI.rstrip('/')}/{MONGODB_DB}"
# 创建基于 MongoDB 的消息历史持久化
message_history = MongoDBChatMessageHistory(
connection_string=connection_string,
session_id=session_id,
database_name=MONGODB_DB,
collection_name=MONGODB_COLLECTION
)
# 创建 ConversationBufferMemory 记忆对象
memory = ConversationBufferMemory(
memory_key="chat_history",
chat_memory=message_history,
input_key="question",
output_key="answer",
return_messages=True
)
return memory
核心设计:
-
MongoDB 持久化:对话历史存储在 MongoDB 中,服务重启不丢失
-
session_id 隔离:不同用户/会话通过
session_id区分,互不干扰 -
优雅降级:MongoDB 连接失败时,自动回退到内存存储,保证服务可用
3.2 格式化对话历史 — format_chat_history()
def format_chat_history(messages: List[Any]) -> str:
formatted = ""
for msg in messages:
if isinstance(msg, HumanMessage):
formatted += f"用户: {msg.content}\n"
elif isinstance(msg, AIMessage):
formatted += f"助手: {msg.content}\n"
return formatted
将 LangChain 的 HumanMessage / AIMessage 对象转换为可读的文本格式,方便注入到 LLM 的 prompt 中作为上下文。
四、业务链路层:chains.py
这一层定义了两条核心处理链路,都支持异步流式输出——用户无需等待完整回答,可以逐字看到结果,体验类似 ChatGPT。
4.1 LLM 流式调用 — call_llm()
async def call_llm(question: str, prompt: str) -> AsyncGenerator[str, None]:
llm = ChatOpenAI(
model=os.getenv("LLM_MODEL", "qwen2.5-14b-instruct"),
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url=os.getenv("DASHSCOPE_BASE_URL"),
streaming=True # 启用流式模式
)
chat_prompt = ChatPromptTemplate.from_messages([
("system", prompt),
("human", question)
])
chain = chat_prompt | llm
# 使用 astream 进行异步流式调用,逐块产出
async for chunk in chain.astream({}):
if hasattr(chunk, 'content') and chunk.content:
yield chunk.content
这里使用 AsyncGenerator 实现了真正的流式输出——每生成一个 token 就立刻返回,而不是等全部生成完毕。
4.2 知识库查询链 — create_knowledge_chain()
这是工厂函数,返回一个异步处理函数 process_with_knowledge:
def create_knowledge_chain(system_prompt: str):
async def process_with_knowledge(question: str, history: str):
# 1. 查询增强:结合历史上下文重写问题
enhanced_question = enhance_query(question, history)
# 2. 调用 RAGFlow MCP 获取知识库检索结果
rag_answer = RAGFlow_chat(enhanced_question)
# 3. 拼接完整 prompt:系统提示 + 对话历史 + 知识库结果 + 用户问题
prompt = system_prompt + "\n\n"
if history:
prompt += f"对话历史:\n{history}\n\n"
prompt += f"知识库检索结果:\n{rag_answer}\n\n"
prompt += f"用户问题: {question}"
# 4. 流式调用 LLM 生成最终回答
async for chunk in call_llm(question, prompt):
yield chunk
return process_with_knowledge
处理流程:查询增强 → RAGFlow检索 → Prompt拼接 → LLM流式生成。其中查询增强是一个容易被忽视但非常关键的环节——它能将"那它的风景怎么样"这种依赖上下文的省略表达,改写为"瓦尔登湖的风景怎么样"这样完整的独立查询。
4.3 一般对话链 — create_general_chain()
结构更简单,不经过知识库,直接用 LLM 回答:
def create_general_chain(system_prompt: str):
async def process_general_chat(question: str, history: str):
prompt = system_prompt + "\n\n"
if history:
prompt += f"对话历史:\n{history}\n\n"
prompt += f"用户问题: {question}"
async for chunk in call_llm(question, prompt):
yield chunk
return process_general_chat
两条链路的设计让系统有了"因地制宜"的能力——需要查资料时走知识库链路,日常闲聊时直接对话,效率和体验兼顾。
五、智能分发层:intent.py
这是整个系统的"大脑",负责判断每个用户问题应该走哪条链路。
5.1 核心思想:用 Function Calling 做路由
传统做法是用关键词匹配或规则引擎来判断意图,但面对千变万化的自然语言,规则永远不够用。这里采用了 LangChain 的 Function Calling(MCP) 机制——让 LLM 自己选择应该调用哪个工具:
from langchain_core.tools import tool
@tool("knowledge_base")
def knowledge_base_tool(question: str, chat_history: str = "") -> str:
"""当用户问题需要查询知识库时调用此工具。适用于涉及公司信息、规章制度、
专业领域知识、特定人物或事件等需要查询知识库的内容。"""
return "knowledge_base"
@tool("general_chat")
def general_chat_tool(question: str, chat_history: str = "") -> str:
"""当用户问题是日常问候、常识性问题、数学计算、个人意见等
无需特殊知识库时调用此工具。"""
return "general_chat"
两个工具并不真正执行操作——它们只是"信号",告诉系统该走哪条链路。真正的工作由 chains.py 中的处理链完成。
5.2 意图检测主流程 — detect_intent()
def detect_intent(question: str, chat_history: str = "") -> str:
# 1. 获取助手列表(让 LLM 知道有哪些知识库可用)
assistants_info = get_assistants()
# 2. 获取上一个意图(用于处理上下文衔接问题)
last_intent = get_last_intent()
# 3. 创建 LLM 客户端并绑定工具
llm = ChatOpenAI(model=model, api_key=api_key, base_url=base_url)
llm_with_tools = llm.bind_tools([knowledge_base_tool, general_chat_tool])
# 4. 构建丰富的 system prompt
system_prompt = """分析用户问题,判断是否需要查询专业知识库来回答。
以下是当前系统中所有可用的知识助手及其掌握的知识内容:
{assistants_info}
{last_intent_info}
根据以上助手信息、对话历史和用户问题的内容,判断:
1. 如果问题涉及以上任何助手掌握的知识领域,或涉及具体的公司信息、
规章制度、专业领域知识,应返回intent=knowledge_base。
2. 如果是日常问候、常识性问题、数学计算等无需特殊知识库的问题,
应返回intent=general_chat。
3. 特别注意对话历史上下文,判断当前问题是否是对前文的跟进。
""".format(assistants_info=assistants_info, last_intent_info=last_intent_info)
# 5. 调用 LLM,让它自行选择工具
response = llm_with_tools.invoke(messages)
# 6. 解析 tool_calls,获取意图
if hasattr(response, 'tool_calls') and response.tool_calls:
tool_call = response.tool_calls[0]
intent = tool_call["name"] # "knowledge_base" 或 "general_chat"
# 更新意图历史(保留最近5条)
_intent_history.append(intent)
if len(_intent_history) > 5:
_intent_history = _intent_history[-5:]
return intent
# 默认回退到一般对话
return "general_chat"
5.3 意图历史的巧妙运用
这个设计有一个非常精妙的地方——意图历史:
_intent_history = [] # 保留最近5次意图 def get_last_intent() -> Optional[str]: if _intent_history: return _intent_history[-1] return None
为什么需要它?考虑这个场景:
用户:公司有哪些福利政策? → 系统走知识库查询(knowledge_base)用户:还有什么其他的吗? → 这是个模糊的跟进问题
如果没有意图历史,LLM 可能会把"还有什么其他的吗"判断为 general_chat。但有了上一个意图 knowledge_base 作为上下文,LLM 就能正确判断这是一个知识库跟进问题。
这本质上是一种轻量级的对话状态追踪,不需要额外的状态管理框架,用最简单的列表就解决了上下文衔接问题。
六、统一处理模块:processor.py
这是整个后端流程的"总装车间"——将所有零件拼装成一个可用的处理器。
6.1 闭包工厂模式
def create_unified_processor(system_prompt, session_id):
# 在工厂函数内部创建所有依赖(闭包捕获)
memory = create_memory(session_id=session_id)
knowledge_chain = create_knowledge_chain(system_prompt)
general_chain = create_general_chain(system_prompt)
stats = {"total_queries": 0, "knowledge_base_queries": 0, "general_chat_queries": 0}
async def processor(question: str):
# 1. 加载并格式化历史
memory_vars = memory.load_memory_variables({})
chat_history = memory_vars.get("chat_history", [])
formatted_history = format_chat_history(chat_history)
# 2. 意图分类
intent = detect_intent(question, formatted_history)
# 3. 查询增强(知识库模式)
metadata = {"intent": intent, "original_query": question, "session_id": session_id}
if intent == "knowledge_base":
enhanced_query = enhance_query(question, formatted_history)
metadata["enhanced_query"] = enhanced_query
# 4. 发送元数据
yield {"type": "metadata", "data": metadata}
# 5. 选择链路并流式输出
full_answer = ""
if intent == "knowledge_base":
async for chunk in knowledge_chain(question, formatted_history):
full_answer += chunk
yield {"type": "content", "data": chunk}
else:
async for chunk in general_chain(question, formatted_history):
full_answer += chunk
yield {"type": "content", "data": chunk}
# 6. 保存本轮对话到记忆
memory.chat_memory.add_messages([
HumanMessage(content=question),
AIMessage(content=full_answer)
])
# 7. 发送结束标记
yield {"type": "end", "data": {"full_answer": full_answer, "metadata": metadata}}
# 附加工具函数
processor.get_stats = lambda: stats.copy()
processor.clear_memory = lambda: memory.clear()
return processor
6.2 为什么用闭包而不用类?
闭包工厂模式的优势:
-
依赖封闭:memory、chains、stats 都在闭包作用域内,外部无法意外修改
-
调用简洁:外部只需
processor = create_unified_processor()然后await processor(question),无需理解内部结构 -
会话隔离:每个 session_id 创建一个处理器实例,状态天然隔离
6.3 流式输出的消息协议
processor 的流式输出采用了结构化的消息协议:
| type | 含义 | data 内容 |
|---|---|---|
metadata |
元信息 | intent、original_query、enhanced_query 等 |
content |
回答片段 | LLM 生成的 token 级别文本 |
end |
结束标记 | full_answer 完整回答 + metadata |
这样前端可以:
-
收到
metadata时显示"正在查询知识库..."的提示 -
收到
content时逐字渲染到屏幕 -
收到
end时标记回答结束,展示完整内容
七、完整数据流演示
以一个真实请求为例,追踪数据在系统中的流转:
用户输入:"瓦尔登湖的风景怎么样?"
1. processor.py → memory.load_memory_variables()
→ 加载历史:无(首次对话)
2. processor.py → detect_intent("瓦尔登湖的风景怎么样?")
→ intent.py → get_assistants() → 获取6个助手信息
→ intent.py → llm_with_tools.invoke() → LLM判断为 knowledge_base
→ 返回 "knowledge_base"
3. processor.py → enhance_query()
→ "瓦尔登湖的风景怎么样?" (无需增强,查询已完整)
4. processor.py → knowledge_chain()
→ RAGFlow_chat() → MCP工具自动匹配"文学知识助手"
→ RAGFlow检索 → 返回瓦尔登湖相关文档片段
→ call_llm() → 拼接prompt → 流式生成回答
5. processor.py → memory.chat_memory.add_messages()
→ 保存 HumanMessage + AIMessage 到 MongoDB
日志输出完整反映了这个流程:
[INFO] 使用知识库处理问题: 瓦尔登湖的风景怎么样?... [INFO] 查询增强: '瓦尔登湖的风景怎么样?' -> '瓦尔登湖的风景怎么样?' [INFO] 分析用户意图 → auto_assistant_answer [INFO] 自动选择助手回答问题 [INFO] LLM选择助手 → 文学知识助理 [INFO] RAGFlow检索 → 回答长度: 386 [INFO] 调用LLM → 流式生成最终回答 [INFO] 总耗时: 27.08秒
八、核心设计理念总结
1. 分层解耦
配置、记忆、链路、意图、编排五层各司其职。每层都可以独立测试和替换——比如想换一个意图分类策略,只需修改 intent.py,其他层不受影响。
2. Function Calling 作为智能路由器
不用硬编码 if-else,让 LLM 自己决定走哪条链路。系统 prompt 中注入的知识助手列表,让 LLM 能根据实际可用的知识库来做判断,而不是拍脑袋。
3. 流式优先
全链路异步流式:RAGFlow 有流式 API,LLM 调用使用 astream,processor 通过 yield 逐块转发。用户不需要等待几十秒才能看到完整回答。
4. 闭包工厂模式
用函数闭包替代类,减少了样板代码,让作用域天然隔离会话状态。每个 create_unified_processor() 返回的 processor 函数都是一个自带记忆、链路和统计能力的完整处理单元。
5. 多层容错
-
MongoDB 挂了 → 回退内存存储
-
意图分类异常 → 默认走 general_chat
-
指定助手失败 → 自动切换到智能匹配
每一层都有兜底,不会因为局部故障导致整个服务不可用。
九、技术栈一览
| 组件 | 技术选型 |
|---|---|
| LLM 框架 | LangChain |
| 大模型 | 阿里百炼 · 通义千问 2.5 (14B) |
| 记忆存储 | MongoDB + langchain-mongodb |
| 知识库 | RAGFlow (API调用) |
| 异步流式 | Python AsyncGenerator |
| 工具注册 | LangChain @tool 装饰器 + bind_tools |
总结
本文完整拆解了基于 LangChain 的后端主流程构建,从 MongoDB 记忆管理、双链路设计、Function Calling 智能分发、到闭包工厂模式统一编排,形成了一套可落地、可扩展的企业级 RAG 对话系统架构。
这套架构的核心价值在于:将 LLM 的"智能"用到了系统的每个环节——不只是生成回答,还包括意图判断、助手匹配、查询增强。让 AI 不仅会回答问题,还会"思考"该怎么回答。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)