前言

在上一篇文章中,我们详细拆解了 RAGFlow 的 MCP 工具模块,实现了从用户自然语言输入到知识库助手的智能路由。但这些工具只是"散装的能力",要让整个后端真正运转起来,还需要一条完整的主流程——它要能记住对话上下文、能判断用户意图、能选择合适的处理链路并流式返回结果。

本文将继续拆解项目后端的核心模块:基于 LangChain 的主流程构建。我们将从记忆管理、业务链路、智能分发、统一处理四个层面,一步步还原整个后端的拼装过程。


一、整体架构总览

在 LangChain 主流程中,我们按"数据层 → 逻辑层 → 控制层"的层次来组织代码:

层级 文件 职责
配置层 config.py MongoDB 连接参数与环境变量管理
数据层 memory.py 对话记忆的创建、持久化与格式化
逻辑层 chains.py 知识库查询链、一般对话链(流式输出)
控制层 intent.py 基于 Function Calling 的意图智能分类
编排层 processor.py 闭包工厂模式,串联记忆→意图→链路→存储

调用链路:

用户输入 → processor.py (create_unified_processor)
         ├── memory.py (加载历史 → 格式化)
         ├── intent.py  (detect_intent → knowledge_base / general_chat)
         ├── chains.py  (选择链路 → 流式返回)
         └── memory.py  (保存本轮对话)

二、配置层:config.py

这是最基础的一层,负责管理 MongoDB 的连接参数:

load_dotenv()
​
# MongoDB配置
MONGODB_URI = os.getenv("MONGODB_URI", "mongodb://localhost:27017/")
MONGODB_DB = os.getenv("MONGODB_DB", "conversation_db")
MONGODB_COLLECTION = os.getenv("MONGODB_COLLECTION", "chat_history")
​
# 其他配置参数
DEFAULT_SESSION_ID = "default"
SESSION_TTL_DAYS = 30  # 会话过期天数

设计要点:

  • 所有敏感信息通过 .env 文件管理,不硬编码

  • 提供合理的默认值,确保开发环境开箱即用

  • SESSION_TTL_DAYS 用于后续的会话过期清理策略


三、记忆数据层:memory.py

这是整个系统具备"连续对话"能力的基石。没有记忆的对话就像每次都和陌生人聊天,有了记忆才能实现真正的多轮交互。

3.1 创建记忆对象 — create_memory()

def create_memory(session_id: str = DEFAULT_SESSION_ID) -> ConversationBufferMemory:
    # 拼接完整的 MongoDB 连接字符串
    connection_string = f"{MONGODB_URI.rstrip('/')}/{MONGODB_DB}"
​
    # 创建基于 MongoDB 的消息历史持久化
    message_history = MongoDBChatMessageHistory(
        connection_string=connection_string,
        session_id=session_id,
        database_name=MONGODB_DB,
        collection_name=MONGODB_COLLECTION
    )
​
    # 创建 ConversationBufferMemory 记忆对象
    memory = ConversationBufferMemory(
        memory_key="chat_history",
        chat_memory=message_history,
        input_key="question",
        output_key="answer",
        return_messages=True
    )
    return memory

核心设计:

  • MongoDB 持久化:对话历史存储在 MongoDB 中,服务重启不丢失

  • session_id 隔离:不同用户/会话通过 session_id 区分,互不干扰

  • 优雅降级:MongoDB 连接失败时,自动回退到内存存储,保证服务可用

3.2 格式化对话历史 — format_chat_history()

def format_chat_history(messages: List[Any]) -> str:
    formatted = ""
    for msg in messages:
        if isinstance(msg, HumanMessage):
            formatted += f"用户: {msg.content}\n"
        elif isinstance(msg, AIMessage):
            formatted += f"助手: {msg.content}\n"
    return formatted

将 LangChain 的 HumanMessage / AIMessage 对象转换为可读的文本格式,方便注入到 LLM 的 prompt 中作为上下文。


四、业务链路层:chains.py

这一层定义了两条核心处理链路,都支持异步流式输出——用户无需等待完整回答,可以逐字看到结果,体验类似 ChatGPT。

4.1 LLM 流式调用 — call_llm()

async def call_llm(question: str, prompt: str) -> AsyncGenerator[str, None]:
    llm = ChatOpenAI(
        model=os.getenv("LLM_MODEL", "qwen2.5-14b-instruct"),
        api_key=os.getenv("DASHSCOPE_API_KEY"),
        base_url=os.getenv("DASHSCOPE_BASE_URL"),
        streaming=True  # 启用流式模式
    )
​
    chat_prompt = ChatPromptTemplate.from_messages([
        ("system", prompt),
        ("human", question)
    ])
​
    chain = chat_prompt | llm
​
    # 使用 astream 进行异步流式调用,逐块产出
    async for chunk in chain.astream({}):
        if hasattr(chunk, 'content') and chunk.content:
            yield chunk.content

这里使用 AsyncGenerator 实现了真正的流式输出——每生成一个 token 就立刻返回,而不是等全部生成完毕。

4.2 知识库查询链 — create_knowledge_chain()

这是工厂函数,返回一个异步处理函数 process_with_knowledge

def create_knowledge_chain(system_prompt: str):
    async def process_with_knowledge(question: str, history: str):
        # 1. 查询增强:结合历史上下文重写问题
        enhanced_question = enhance_query(question, history)
​
        # 2. 调用 RAGFlow MCP 获取知识库检索结果
        rag_answer = RAGFlow_chat(enhanced_question)
​
        # 3. 拼接完整 prompt:系统提示 + 对话历史 + 知识库结果 + 用户问题
        prompt = system_prompt + "\n\n"
        if history:
            prompt += f"对话历史:\n{history}\n\n"
        prompt += f"知识库检索结果:\n{rag_answer}\n\n"
        prompt += f"用户问题: {question}"
​
        # 4. 流式调用 LLM 生成最终回答
        async for chunk in call_llm(question, prompt):
            yield chunk
​
    return process_with_knowledge

处理流程:查询增强 → RAGFlow检索 → Prompt拼接 → LLM流式生成。其中查询增强是一个容易被忽视但非常关键的环节——它能将"那它的风景怎么样"这种依赖上下文的省略表达,改写为"瓦尔登湖的风景怎么样"这样完整的独立查询。

4.3 一般对话链 — create_general_chain()

结构更简单,不经过知识库,直接用 LLM 回答:

def create_general_chain(system_prompt: str):
    async def process_general_chat(question: str, history: str):
        prompt = system_prompt + "\n\n"
        if history:
            prompt += f"对话历史:\n{history}\n\n"
        prompt += f"用户问题: {question}"
​
        async for chunk in call_llm(question, prompt):
            yield chunk
​
    return process_general_chat

两条链路的设计让系统有了"因地制宜"的能力——需要查资料时走知识库链路,日常闲聊时直接对话,效率和体验兼顾。


五、智能分发层:intent.py

这是整个系统的"大脑",负责判断每个用户问题应该走哪条链路。

5.1 核心思想:用 Function Calling 做路由

传统做法是用关键词匹配或规则引擎来判断意图,但面对千变万化的自然语言,规则永远不够用。这里采用了 LangChain 的 Function Calling(MCP) 机制——让 LLM 自己选择应该调用哪个工具:

from langchain_core.tools import tool
​
@tool("knowledge_base")
def knowledge_base_tool(question: str, chat_history: str = "") -> str:
    """当用户问题需要查询知识库时调用此工具。适用于涉及公司信息、规章制度、
    专业领域知识、特定人物或事件等需要查询知识库的内容。"""
    return "knowledge_base"
​
@tool("general_chat")
def general_chat_tool(question: str, chat_history: str = "") -> str:
    """当用户问题是日常问候、常识性问题、数学计算、个人意见等
    无需特殊知识库时调用此工具。"""
    return "general_chat"

两个工具并不真正执行操作——它们只是"信号",告诉系统该走哪条链路。真正的工作由 chains.py 中的处理链完成。

5.2 意图检测主流程 — detect_intent()

def detect_intent(question: str, chat_history: str = "") -> str:
    # 1. 获取助手列表(让 LLM 知道有哪些知识库可用)
    assistants_info = get_assistants()
​
    # 2. 获取上一个意图(用于处理上下文衔接问题)
    last_intent = get_last_intent()
​
    # 3. 创建 LLM 客户端并绑定工具
    llm = ChatOpenAI(model=model, api_key=api_key, base_url=base_url)
    llm_with_tools = llm.bind_tools([knowledge_base_tool, general_chat_tool])
​
    # 4. 构建丰富的 system prompt
    system_prompt = """分析用户问题,判断是否需要查询专业知识库来回答。
​
以下是当前系统中所有可用的知识助手及其掌握的知识内容:
{assistants_info}
​
{last_intent_info}
​
根据以上助手信息、对话历史和用户问题的内容,判断:
1. 如果问题涉及以上任何助手掌握的知识领域,或涉及具体的公司信息、
   规章制度、专业领域知识,应返回intent=knowledge_base。
2. 如果是日常问候、常识性问题、数学计算等无需特殊知识库的问题,
   应返回intent=general_chat。
3. 特别注意对话历史上下文,判断当前问题是否是对前文的跟进。
""".format(assistants_info=assistants_info, last_intent_info=last_intent_info)
​
    # 5. 调用 LLM,让它自行选择工具
    response = llm_with_tools.invoke(messages)
​
    # 6. 解析 tool_calls,获取意图
    if hasattr(response, 'tool_calls') and response.tool_calls:
        tool_call = response.tool_calls[0]
        intent = tool_call["name"]  # "knowledge_base" 或 "general_chat"
​
        # 更新意图历史(保留最近5条)
        _intent_history.append(intent)
        if len(_intent_history) > 5:
            _intent_history = _intent_history[-5:]
​
        return intent
​
    # 默认回退到一般对话
    return "general_chat"

5.3 意图历史的巧妙运用

这个设计有一个非常精妙的地方——意图历史

_intent_history = []  # 保留最近5次意图
​
def get_last_intent() -> Optional[str]:
    if _intent_history:
        return _intent_history[-1]
    return None

为什么需要它?考虑这个场景:

用户:公司有哪些福利政策? → 系统走知识库查询(knowledge_base)用户:还有什么其他的吗? → 这是个模糊的跟进问题

如果没有意图历史,LLM 可能会把"还有什么其他的吗"判断为 general_chat。但有了上一个意图 knowledge_base 作为上下文,LLM 就能正确判断这是一个知识库跟进问题。

这本质上是一种轻量级的对话状态追踪,不需要额外的状态管理框架,用最简单的列表就解决了上下文衔接问题。


六、统一处理模块:processor.py

这是整个后端流程的"总装车间"——将所有零件拼装成一个可用的处理器。

6.1 闭包工厂模式

def create_unified_processor(system_prompt, session_id):
    # 在工厂函数内部创建所有依赖(闭包捕获)
    memory = create_memory(session_id=session_id)
    knowledge_chain = create_knowledge_chain(system_prompt)
    general_chain = create_general_chain(system_prompt)
​
    stats = {"total_queries": 0, "knowledge_base_queries": 0, "general_chat_queries": 0}
​
    async def processor(question: str):
        # 1. 加载并格式化历史
        memory_vars = memory.load_memory_variables({})
        chat_history = memory_vars.get("chat_history", [])
        formatted_history = format_chat_history(chat_history)
​
        # 2. 意图分类
        intent = detect_intent(question, formatted_history)
​
        # 3. 查询增强(知识库模式)
        metadata = {"intent": intent, "original_query": question, "session_id": session_id}
        if intent == "knowledge_base":
            enhanced_query = enhance_query(question, formatted_history)
            metadata["enhanced_query"] = enhanced_query
​
        # 4. 发送元数据
        yield {"type": "metadata", "data": metadata}
​
        # 5. 选择链路并流式输出
        full_answer = ""
        if intent == "knowledge_base":
            async for chunk in knowledge_chain(question, formatted_history):
                full_answer += chunk
                yield {"type": "content", "data": chunk}
        else:
            async for chunk in general_chain(question, formatted_history):
                full_answer += chunk
                yield {"type": "content", "data": chunk}
​
        # 6. 保存本轮对话到记忆
        memory.chat_memory.add_messages([
            HumanMessage(content=question),
            AIMessage(content=full_answer)
        ])
​
        # 7. 发送结束标记
        yield {"type": "end", "data": {"full_answer": full_answer, "metadata": metadata}}
​
    # 附加工具函数
    processor.get_stats = lambda: stats.copy()
    processor.clear_memory = lambda: memory.clear()
​
    return processor

6.2 为什么用闭包而不用类?

闭包工厂模式的优势:

  • 依赖封闭:memory、chains、stats 都在闭包作用域内,外部无法意外修改

  • 调用简洁:外部只需 processor = create_unified_processor() 然后 await processor(question),无需理解内部结构

  • 会话隔离:每个 session_id 创建一个处理器实例,状态天然隔离

6.3 流式输出的消息协议

processor 的流式输出采用了结构化的消息协议:

type 含义 data 内容
metadata 元信息 intent、original_query、enhanced_query 等
content 回答片段 LLM 生成的 token 级别文本
end 结束标记 full_answer 完整回答 + metadata

这样前端可以:

  • 收到 metadata 时显示"正在查询知识库..."的提示

  • 收到 content 时逐字渲染到屏幕

  • 收到 end 时标记回答结束,展示完整内容


七、完整数据流演示

以一个真实请求为例,追踪数据在系统中的流转:

用户输入:"瓦尔登湖的风景怎么样?"
​
1. processor.py → memory.load_memory_variables()
   → 加载历史:无(首次对话)
​
2. processor.py → detect_intent("瓦尔登湖的风景怎么样?")
   → intent.py → get_assistants() → 获取6个助手信息
   → intent.py → llm_with_tools.invoke() → LLM判断为 knowledge_base
   → 返回 "knowledge_base"
​
3. processor.py → enhance_query()
   → "瓦尔登湖的风景怎么样?" (无需增强,查询已完整)
​
4. processor.py → knowledge_chain()
   → RAGFlow_chat() → MCP工具自动匹配"文学知识助手"
   → RAGFlow检索 → 返回瓦尔登湖相关文档片段
   → call_llm() → 拼接prompt → 流式生成回答
​
5. processor.py → memory.chat_memory.add_messages()
   → 保存 HumanMessage + AIMessage 到 MongoDB

日志输出完整反映了这个流程:

[INFO] 使用知识库处理问题: 瓦尔登湖的风景怎么样?...
[INFO] 查询增强: '瓦尔登湖的风景怎么样?' -> '瓦尔登湖的风景怎么样?'
[INFO] 分析用户意图 → auto_assistant_answer
[INFO] 自动选择助手回答问题
[INFO] LLM选择助手 → 文学知识助理
[INFO] RAGFlow检索 → 回答长度: 386
[INFO] 调用LLM → 流式生成最终回答
[INFO] 总耗时: 27.08秒

八、核心设计理念总结

1. 分层解耦

配置、记忆、链路、意图、编排五层各司其职。每层都可以独立测试和替换——比如想换一个意图分类策略,只需修改 intent.py,其他层不受影响。

2. Function Calling 作为智能路由器

不用硬编码 if-else,让 LLM 自己决定走哪条链路。系统 prompt 中注入的知识助手列表,让 LLM 能根据实际可用的知识库来做判断,而不是拍脑袋。

3. 流式优先

全链路异步流式:RAGFlow 有流式 API,LLM 调用使用 astream,processor 通过 yield 逐块转发。用户不需要等待几十秒才能看到完整回答。

4. 闭包工厂模式

用函数闭包替代类,减少了样板代码,让作用域天然隔离会话状态。每个 create_unified_processor() 返回的 processor 函数都是一个自带记忆、链路和统计能力的完整处理单元。

5. 多层容错

  • MongoDB 挂了 → 回退内存存储

  • 意图分类异常 → 默认走 general_chat

  • 指定助手失败 → 自动切换到智能匹配

每一层都有兜底,不会因为局部故障导致整个服务不可用。


九、技术栈一览

组件 技术选型
LLM 框架 LangChain
大模型 阿里百炼 · 通义千问 2.5 (14B)
记忆存储 MongoDB + langchain-mongodb
知识库 RAGFlow (API调用)
异步流式 Python AsyncGenerator
工具注册 LangChain @tool 装饰器 + bind_tools

总结

本文完整拆解了基于 LangChain 的后端主流程构建,从 MongoDB 记忆管理、双链路设计、Function Calling 智能分发、到闭包工厂模式统一编排,形成了一套可落地、可扩展的企业级 RAG 对话系统架构。

这套架构的核心价值在于:将 LLM 的"智能"用到了系统的每个环节——不只是生成回答,还包括意图判断、助手匹配、查询增强。让 AI 不仅会回答问题,还会"思考"该怎么回答。


Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐