一、中间件

1、ClarificationMiddleware

(1)clarification_middleware.py ,分析下哪些场景会用到这个中间件,具体在整个消息中的流程是怎么运作的,如何阻塞目前lead_agent的流程,以一种什么形式向前端发送消息,收到前端反馈后整个流程又是怎么样的,给出具体代码

这里的格式包括:

  • 在上下文或问题前,根据类型添加icon
  • 每一个选项前添加序号

这里的ToolMessage和Command来自:

from langchain_core.messages import ToolMessage

from langgraph.types import Command

        # Create a ToolMessage with the formatted question
        # This will be added to the message history
        tool_message = ToolMessage(
            content=formatted_message,
            tool_call_id=tool_call_id,
            name="ask_clarification",
        )

        # Return a Command that:
        # 1. Adds the formatted tool message
        # 2. Interrupts execution by going to __end__
        # Note: We don't add an extra AIMessage here - the frontend will detect
        # and display ask_clarification tool messages directly
        return Command(
            update={"messages": [tool_message]},
            goto=END,
        )

来自AI的回复,可能是一些text块。这里值得借鉴。

(2)分析ToolMessage和Command的数据结构,为什么说goto=END能阻断执行

Command 是 LangGraph 中用于控制图执行流的指令对象。

  • 结构 :

    • update (dict): 要合并到图状态(State)中的数据。
      • 这里 {“messages”: [tool_message]} 将上述构造的 ToolMessage 写入对话历史。
    • goto (str): 指定下一步跳转到的节点名称。
      • END (或 end ) : 特殊节点,表示 立即终止 当前图的执行。
  • 为什么 goto=END 能阻断执行 :

    1. 正常流程 : Agent 节点生成工具调用 -> Tools 节点执行工具 -> Agent 节点接收结果并继续推理(循环)。
    2. 中间件拦截 :
      • 中间件在工具执行 前 截获调用。
      • 它构造了一个“假的”执行结果(ToolMessage),内容是澄清问题。
      • 它返回 Command(goto=END) 。
    3. 图执行器行为 :
      • 收到 update ,将 ToolMessage 存入状态。
      • 收到 goto=END ,执行器认为当前 Run 已完成,停止调度后续节点(即不会再次调用 Agent 节点去生成对 ToolMessage 的回复)。
    4. 结果 : 整个 Run 结束。外部调用者(ChannelManager)收到最终状态,发现最后一条是 ToolMessage(澄清问题),于是将其作为响应发送给用户,并等待用户的新输入开启下一个 Run。
      代码参考 :
  • 中间件处理: clarification_middleware.py:L116-L129

  • LangGraph 机制: goto=END 是 LangGraph 状态机终止的标准信号。

(3)对于澄清类消息,用户反馈后,服务端如何保存上下文能够继续对话的,langgraph的图也是新开的吧? 有哪些上一轮对话中的信息会参与到用户反馈后的执行过程中,给出对应的文件和代码位置

2、ThreadDataMiddleware

(1)线程数据是如何保存到ThreadState中的,又是什么时候恢复线程数据的。以及是什么时候触发了Checkpointer的保存和加载,请给出调用的具体文件和代码

沙盒可以仅由路径区分?

仅保存了这三个路径

中间件继承自langchain

steam流式对话入口,接收用户消息,返回langgraph的SSE协议

这个地方,先接收response,再yield流式事件,

values为文件内容,end为结束事件

3、调用时机

(1)create_agent中传入了多个中间件,什么时机触发中间件的调用,after_agent 和before_agent又是什么时候执行的呢? 请从收到消息和回复过程的链路角度,给出具体的文件和代码位置

工具调用异常应该是在dangling进行处理。

中间件顺序要求,比如需要先创建线程id,需要先做总结历史,做文件上传,澄清要在最后

4、TitleMiddleware

首轮对话,使用轻量的小模型生成标题,这里的模型配置从哪里读取?生成的标题可以给客户端存储分类。

5、SubagentLimitMiddleware

仅保留限定子任务数目

6、MemoryMiddlewareState

  • after_agent是等整个流程执行完,生成了给用户的回复的时候触发。仅保存用户问题,模型的最终回复,中间的工具执行等不保存。
  • 中间件的state继承自langchain的AgentState

这里的memory_queue为全局单例

如果队列中有一个该线程的相关的,则去掉,并用新的上下文替代。

通过定时器,定期处理队列中的所有记忆,包括不同用户的

  • 获取有效的对话,去掉uploads等相关信息,控制对话长度
  • 提示词使用当前记忆和对话信息进行更新
  • 调用小模型进行处理,可以通过memory config中配置对应的模型。
  • 最后通过factory中的create_chat_model创建该模型的单例。

get_memory_data:优先从cache(内存缓存)中by agent name(每个用户可以有不同的agent)获取记忆,如果记忆文件中的时间比这个新,则从文件中读取,更新缓存。

对现有内存进行更新,包括去掉事实,增加事实等。

7、ViewImageMiddleware和UploadsMiddleware

两个类似,都需要将文件或图片追加到上一条消息,组装为完整的图文问题或文件文本描述。将原本的只含有文字+(参数如图片文件路径)的消息,转为含有完整图片base64或文件描述(路径、名称、size、扩展名)。

8、TodoMiddleware

(1)分析下这个文件,作用和调用时机,是否会直接给前端展示

追加了一条人类消息,提醒todolist,而且要求任意时候有代办事项变动,要调用write_todos

(2)分析什么时候写入todo list,todo中间件中的state tools属性是什么时候设置的

这个工具在agent.py文件中有自己的提示词和工具描述

二、消息和命令

1、ToolMessage

from langchain_core.messages import ToolMessage

三、记忆和检索

1、目前对技能和长久记忆的检索是怎么做的,是否用到了向量检索,如果用到了向量化和检索又实用了什么模型和机制,给出具体代码

结论:两者均未使用向量检索
Skills(技能)检索
完全没有向量化,检索方式如下:

加载链:

四、工具调用

五、前端最简洁版实现

1、webui的sse协议是通过哪种方式与服务端建立连接的,如何进行通讯,有哪些事件,这些事件是如何产生的

六、demo启动

七、langchain和langgraph学习

项目中有使用到这两个框架的部分

from langchain.agents import create_agent
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
from langchain_core.runnables import RunnableConfig

from langchain.agents import AgentState


from langchain.agents.middleware import AgentMiddleware

from langchain.agents.middleware.types import ModelCallResult, ModelRequest, ModelResponse

from langchain.agents.middleware import TodoListMiddleware
from langchain.agents.middleware.todo import PlanningState, Todo
from langchain.tools import tool
from langchain_core.tools import BaseTool
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain.chat_models import BaseChatModel

from langchain_core.tracers.langchain import LangChainTracer

from langchain.tools import InjectedToolCallId, ToolRuntime, tool
from langgraph.types import Checkpointer
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

from langgraph.graph import END
from langgraph.prebuilt.tool_node import ToolCallRequest
from langgraph.types import Command

from langgraph.runtime import Runtime

from langgraph_sdk import get_client

from langgraph.typing import ContextT


from langgraph.config import get_stream_writer

可以看到大量使用了关于langchain的组件,相关组件可以深入学习下

1、模型配置化:

  • 对于工具中用到的模型(如投研)都可以进行配置,在yaml中配置即可。
  • 调用factory中的create_chat_model创建单例,并进行思考模式等配置
  • title、summarization、memory(超过多少token可以配置,保持10轮)支持分别配置,models中支持配置多个。可以指定,这种模型管理还是非常方便的

2、可以通过中间件添加在模型调用、工具调用、agent(整个流程)开始或完成前后的操作,根据添加顺序依次执行。

3、澄清中间件值得参考

检测到澄清工具后,将工具调用的参数转为格式化的澄清文字消息,组装为可给用户看的工具回复消息。并将流程路由到结束,并通过goto=END终止agent图。

4、TodoMiddleware可模拟构造一条人类消息。在模型执行之前添加todolist,值得参考

5、TodoListMiddleware(langchain.agents.middleware自带的),在before_agent中可以注入write_todos工具,也可以参考,相当于为某个流程定制化注入工具。

6、沙箱。可以在config.yaml中配置本地沙箱,此部分代码不用改。

7、Checkpointer存储机制。将thread_state保存,新请求来的时候根据thread_id加载恢复。

代办:

1、Checkpointer需要存储到db

问题:

(1)在agent文件中单独定义todo list的系统提示词和工具描述,感觉不太友好。

(2)langgraph中有humaninloop机制,语控场景“我有一笔工资到账,请结合我的持仓、风险等级,给出投资建议,可以推荐理财产品”。 deerflow需要调用语控接口(查询最近一笔工资),实际执行是由客户端执行的,服务端必然存在等待和中断,langchain等语控执行完后如何继续进行下一步规划和查询?

(3)langchain其他中间件学习

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐