持久性

检查点(Checkpointing)是 LangGraph 持久性的核心机制。它允许你在图执行过程中的任何点保存状态,并在需要时恢复。

核心概念

  • 检查点(Checkpoint): 图状态的快照,它记录了工作流在某一时刻的完整状态(State),包括所有的变量、对话历史和执行进度

  • 线程(Thread): 用于访问检查点的唯一标识

  • 检查点保存器(Checkpointer): 负责保存和恢复状态的组件

    线程Thread

    线程是检查点保存器保存的每个检查点分配的唯一 ID 或线程标识符,可以理解为线程是一个带唯一 ID 的用来存历史状态、记忆、对话上下文的会话记录容器

    当使用检查点调用图表时,必须指定thread_id作为configurable配置部分的一部分:

    # 调用图时必须指定 thread_id
    config = {"configurable": {"thread_id": "unique_thread_id"}}
    result = graph.invoke(input_data, config=config)

    这样的作用是告诉 LangGraph 的 Checkpointer(检查点保存器)去加载或创建一个特定的会话历史

    如果是thread_id:开启一个全新的、记忆为空白的会话;如果是旧的thread_id:加载该会话的最后一次 Checkpoint,AI 就能“记住”之前聊过的所有内容。

    特点

    • 每个线程代表一个独立的对话或执行上下文

    • 线程允许在图执行后访问图的状态

    • 支持多个并发线程

    from langchain.chat_models import init_chat_model
    from langgraph.checkpoint.memory import InMemorySaver
    from langgraph.graph import StateGraph, MessagesState, START, END
    import os
    from dotenv import load_dotenv
    
    load_dotenv()
    
    class MyState(MessagesState):
        result: str
    
    llm = init_chat_model(api_key=os.getenv("DASHSCOPE_API_KEY"),
                          base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
                          model_provider="openai",
                          model='MiniMax-M2.1')
    
    
    async def process_message(state: MessagesState):
        response = await llm.ainvoke(state["messages"])
        return {"messages": response}
    
    async def optimize_message(state: MessagesState):
        messages = state["messages"] + [{"role": "system", "content": "请用幽默的形式回复用户"}]
        response = await llm.ainvoke(messages)
        return {"messages": response}
    
    
    builder = StateGraph(state_schema=MyState)
    builder.add_node("process_message", process_message)
    builder.add_node("optimize_message", optimize_message)
    builder.add_edge(START, "process_message")
    builder.add_edge("process_message", "optimize_message")
    builder.add_edge("optimize_message", END)
    
    
    
    # 没有使用持久性
    # graph = builder.compile()
    
    # input_message = {"role": "user", "content": "你好呀!我的名字叫jran"}
    # result = graph.invoke({"messages": [input_message]})
    # result["messages"][-1].pretty_print()
    #
    # input_message = {"role": "user", "content": "我的名字叫什么?"}
    # result = graph.invoke({"messages": [input_message]})
    # result["messages"][-1].pretty_print()
    
    async def main():
        # 创建内存检查点进行持久化存储
        checkpointer = InMemorySaver()
        graph = builder.compile(checkpointer=checkpointer)
        config = {"configurable": {"thread_id": "user_123"}}
    
        input_message = {"role": "user", "content": "你好呀!我的名字叫jran"}
        result = await graph.ainvoke({"messages": [input_message]}, config)
        result["messages"][-1].pretty_print()
    
    
        input_message = {"role": "user", "content": "我的名字叫什么?"}
        result = await graph.ainvoke({"messages": [input_message]}, config)
        result["messages"][-1].pretty_print()
    
    if __name__ == '__main__':
        import asyncio
    
        asyncio.run(main())
    • graph = builder.compile(checkpointer=InMemorySaver())表明checkpointer是一个巨大的会话仓库,它里面存储着所有thread_id的所有checkpoint

    • config是一个标识符,目的是根据里面的thread_id找到对应的checkpoint

    • 每次调用这样的代码:graph.invoke(input, config),系统就会根据thread_id找到所有的checkpoint并加载最新的checkpoint,运行完过后把现在的对话记录成新的checkpoint放在checkpointer里面,也就是checkpoint随对话轮次数增加而增加

    检查点checkpoint

    检查点是在每个超级步骤中保存的图状态的快照,由StateSnapshot具有以下关键属性的对象表示:

    • config:与此检查点相关的配置。

    • metadata:与此检查点相关的元数据。

    • values:当前State的值。也就是图执行到目前为止,所有变量的状态值(如 "messages", "steps", "results" 等字段的值)。

    • next:图中接下来要执行的节点名称的元组。

    • tasks:包含具体要执行的任务的详细信息,用PregeTask类型表示。比next更详细

    checkpoint的作用
    功能 描述
    🌟 容错恢复 如果执行中断(如容器崩溃、任务超时),可以从上次保存的状态恢复,不用重跑整个流程
    💾 状态追踪/审计 可以记录每一步节点执行时的中间状态,方便 Debug、回溯和监控
    🔁 实现有状态的异步/长流程图 对于多轮对话、多阶段任务,检查点使 LangGraph 支持状态持久化和任务跟踪

    本质上来讲,checkpoint就是把某个节点运行完后的State存储起来(数据库或者磁盘),如果出现中断,加载这个State从后续节点开始运行。

    更新state

    graph.update_state()方法可以让我们在不经过节点的条件下更新state里的值

    更新对应状态,就是手动修改线程里的状态数据。它不会删除历史,只会生成新的检查点。你可以改消息、改工具结果、加用户信息、删内容、覆盖内容。更新完之后,下一次 run 就从新状态继续执行。这是实现人工介入、调试、纠错的核心功能

    graph.update_state()依赖于cheakpoint机制,因此图在编译时必须传入checkpointer;本质上graph.update_state()就是创建一个新的checkpoint

    方法参数

    config

    • 必须包含thread_id指定要更新的线程

    • 可选包含checkpoint_id来分叉选定的检查点

    values

    • 用于更新状态的值

    • 更新会传递给 reducer 函数(如果定义了)

    • 没有 reducer 的通道会被覆盖

    as_node

    • 可选参数,指定更新来自哪个节点

    • 影响下一步执行的节点

    记忆存储

    store接口

    如果说chekpoint是“短期记忆”(InMemorysaver),即在同一个thread_id下面保存上下文,那么store就是“长期记忆”(InMemoryStore),记录用户的身份、偏好等等

    • 检查点保存器单独无法跨线程共享信息

    • Store 接口解决了这个问题

    • 可以在所有聊天对话中保留用户特定信息

    用法

    每种内存类型都是一个具有特定属性的 Python 类(Item)。我们可以通过上述转换将其作为字典访问dict。它具有以下属性:

    • value:此内存的值(本身就是一个字典)

    • key:此命名空间中此内存的唯一键(随便起名字)

    • namespace:字符串列表,此内存类型的命名空间(随便起名字)

    • created_at:此内存创建的时间戳

    • updated_at:此内存更新的时间戳

    核心操作

    • 存储数据:store.put(namespace, key, value)

    • 读取数据:store.get(namespace, key)

    • 搜索数据:store.search(namespace, query)

    • 删除数据:store.delete(namespace, key)

    from typing import Annotated, List
    from typing_extensions import TypedDict
    from operator import add
    import uuid
    from langgraph.graph import StateGraph, START, END
    from langgraph.checkpoint.memory import InMemorySaver
    from langgraph.store.memory import InMemoryStore
    from langchain_core.messages import HumanMessage, AIMessage, BaseMessage
    from langchain_core.runnables import RunnableConfig
    from langgraph.store.base import BaseStore
    
    from langchain.chat_models import init_chat_model
    import os
    from dotenv import load_dotenv
    
    load_dotenv()
    
    # 初始化大模型
    llm = init_chat_model(api_key=os.getenv("DASHSCOPE_API_KEY"),
                          base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
                          model_provider="openai",
                          model='qwen-plus-2025-01-25')
    
    
    # 定义状态结构
    class MessagesState(TypedDict):
        messages: Annotated[List[BaseMessage], add]
    
    
    # 创建检查点保存器和内存存储
    checkpointer = InMemorySaver()
    in_memory_store = InMemoryStore()
    
    
    # 聊天机器人节点  *代表后面的参数必须使用显示写出参数名称  store=in_memory_store
    def chatbot(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
        """主聊天机器人节点,处理用户消息并生成回复"""
    
        # 获取用户ID和最新消息
        user_id = config["configurable"]["user_id"]
        last_message = state["messages"][-1]
    
        # 定义内存命名空间
        namespace = (user_id, "memories")
    
        # 简单的聊天逻辑
        user_input = last_message.content.lower()
        # 将聊天历史获取并组装提示词
        memories = store.search(namespace)
        memory_text = "\n".join(m.value["memory"] for m in memories)
        prompt = f"请参考聊天记录:{memory_text}\n\nHuman: {user_input}\nAI:"
    
        response = llm.invoke(prompt).content
    
        # 存储对应的问题和答案
        memory = f"问题:{user_input} --- 答案:{response}"
        memory_id = str(uuid.uuid4())
        store.put(namespace, memory_id, {"memory": memory})
        # 返回AI消息
        return {"messages": [AIMessage(content=response)]}
    
    
    # 创建图
    def create_persistent_graph():
        """创建持久化的聊天机器人图"""
    
        # 创建状态图
        workflow = StateGraph(MessagesState)
        # 添加节点
        workflow.add_node("chatbot", chatbot)
        # 添加边
        workflow.add_edge(START, "chatbot")
        workflow.add_edge("chatbot", END)
    
        # 编译图,使用检查点保存器和存储
        graph = workflow.compile(checkpointer=checkpointer, store=in_memory_store)
    
        return graph
    
    
    # 工具函数:显示状态历史
    def show_state_history(graph, config):
        """显示状态历史"""
        print("\n=== 状态历史 ===")
        history = graph.get_state_history(config)
        for i, snapshot in enumerate(history):
            print(f"\n步骤 {i}:")
            print(f"  配置: {snapshot.config}")
            print(f"  值: {snapshot.values}")
            print(f"  下一步: {snapshot.next}")
            print(f"  元数据: {snapshot.metadata}")
    
    
    # 工具函数:显示存储的记忆
    def show_memories(store, user_id):
        """显示用户的所有记忆"""
        print(f"\n=== 用户 {user_id} 的记忆 ===")
        namespace = (user_id, "memories")
        memories = store.search(namespace)
    
        if memories:
            for memory in memories:
                print(f"记忆ID: {memory.key}")
                print(f"内容: {memory.value}")
                print(f"创建时间: {memory.created_at}")
                print(f"更新时间: {memory.updated_at}")
                print("---")
        else:
            print("没有找到记忆")
    
    
    # 主程序
    def main():
        # 创建图
        graph = create_persistent_graph()
    
        # 用户配置
        user_id = "user_123"
        thread_id = "conversation_1"
    
        config = {
            "configurable": {
                "thread_id": thread_id,
                "user_id": user_id
            }
        }
    
        print("=== LangGraph 持久化聊天机器人 ===")
        print("输入 'quit' 退出,'history' 查看状态历史,'memories' 查看记忆")
    
        while True:
            user_input = input("\n用户: ").strip()
    
            if user_input.lower() == 'quit':
                break
            elif user_input.lower() == 'history':
                show_state_history(graph, config)
                continue
            elif user_input.lower() == 'memories':
                show_memories(in_memory_store, user_id)
                continue
    
            # 创建用户消息
            initial_state = {
                "messages": [HumanMessage(content=user_input)]
            }
    
            # 运行图
            try:
                result = graph.invoke(initial_state, config)
    
                # 显示AI回复
                ai_message = result["messages"][-1]
                print(f"AI: {ai_message.content}")
    
            except Exception as e:
                print(f"错误: {e}")
    
        print("\n=== 最终状态 ===")
        final_state = graph.get_state(config)
        print(f"最终状态: {final_state.values}")
    
        print("\n=== 所有记忆 ===")
        show_memories(in_memory_store, user_id)
    
    
    # 演示不同线程间的记忆共享
    def demo_cross_thread_memory():
        """演示跨线程记忆共享"""
        print("\n=== 跨线程记忆共享演示 ===")
    
        graph = create_persistent_graph()
        user_id = "user_456"
    
        # 第一个对话线程
        config1 = {
            "configurable": {
                "thread_id": "thread_1",
                "user_id": user_id
            }
        }
    
        print("线程1 - 建立记忆:")
        result1 = graph.invoke({
            "messages": [HumanMessage(content="我叫jran,我喜欢音乐")]
        }, config1)
        print(f"AI: {result1['messages'][-1].content}")
    
        # 第二个对话线程(相同用户)
        config2 = {
            "configurable": {
                "thread_id": "thread_2",
                "user_id": user_id
            }
        }
    
        print("\n线程2 - 访问记忆:")
        result2 = graph.invoke({
            "messages": [HumanMessage(content="你还记得我吗?")]
        }, config2)
        print(f"AI: {result2['messages'][-1].content}")
    
        # 显示共享的记忆
        show_memories(in_memory_store, user_id)
    
    
    if __name__ == "__main__":
        # 运行主程序
        main()
    
        # 演示跨线程记忆共享
        demo_cross_thread_memory()

    注意:命名空间必须是元组tuple,因为元组是不可改变的。

    namespace = ("users", "user_123", "preferences")
    namespace = ("global",) 

    这两种都是对的

    Memory记忆

    记忆对agent来说至关重要,短期记忆可以让模型记住与用户的交互记录,使模型可以从反馈中学习与完善;长期记忆可以使模型记住用户的相关信息,为用户提供针对性的解答方案

    • 短期记忆(或线程范围的记忆)通过维护会话中的消息历史记录来跟踪正在进行的对话。LangGraph 将短期记忆作为代理状态的一部分进行管理。状态使用检查点持久化到数据库中,以便线程可以随时恢复。短期记忆会在图被调用或某个步骤完成时更新,并且在每个步骤开始时读取状态。

    • 长期记忆跨会话存储用户特定或应用程序级别的数据,并在对话线程之间共享。它可以在任何时间、任何线程中调用。记忆的作用域是任何自定义命名空间,而不仅仅是单个线程 ID。LangGraph 提供存储,方便您保存和调用长期记忆。

    通常在生产中,短期记忆用redis + 长期记忆用postgres

    管理短期记忆

    启用短期记忆后,长对话可能会超出 LLM 的上下文窗口。常见的解决方案如下:

    • 修剪消息:删除前 N 条或后 N 条消息(在调用 LLM 之前)

    • 从 LangGraph 状态中永久删除消息

    • 总结消息:总结历史记录中较早的消息,并用摘要替换它们

    • 管理检查点以存储和检索消息历史记录

    • 自定义策略(例如,消息过滤等)

    修剪消息

    大多数 LLM 都有一个最大支持的上下文窗口(以 token 为单位)。决定何时截断消息的一种方法是计算消息历史记录中的 token 数量,并在接近该限制时进行截断。

    trim_messages()是 LangChain/LangGraph 提供的一个消息裁剪工具函数,专门用于在将对话历史发送给大模型之前,智能地删减掉一部分内容,以确保消息总长度不超过模型的上下文窗口限制

    • max_tokens:目标最大token数

    • strategy:保留哪部分信息;“last”表示保留最近的对话

    • token_counter:用于计数的函数或模型,用来计算token数量

    • start_on:保留的内容必须从这类消息开始;“human”表示从用户消息开始保留

    • end_on:选择从哪一类消息结束

    • include_system:保证系统提示词(prompt)不被删掉

    def call_llm(state: MessagesState):
        messages = trim_messages(
            state["messages"],
            strategy="last",  # 修剪策略(last从末尾,first从开头, middle从中间)
            token_counter=count_tokens_approximately,  # 用来估算token数量
            max_tokens=100,  # 修剪后的消息总 token 不超过 200
            start_on="human",  # 控制从哪一类消息开始截取(从最后一个 human 消息开始往前保留)
            end_on=("human", "ai"),  # 允许哪些角色作为修剪终点
        )
        print("修剪后的消息:", messages)
        response = llm.invoke(messages)
        return {"messages": [response]}
    
    
    checkpointer = InMemorySaver()
    builder = StateGraph(MessagesState)
    builder.add_node(call_llm)
    builder.add_edge(START, "call_llm")
    graph = builder.compile(checkpointer=checkpointer)
    
    config = {"configurable": {"thread_id": "1"}}
    graph.invoke({"messages": "我的名字叫jran"}, config)
    graph.invoke({"messages": "帮我家的猫写一首诗"}, config)
    graph.invoke({"messages": "现在对狗做一样的事情"}, config)
    final_response = graph.invoke({"messages": "我的名字叫什么?"}, config)
    
    print("最终消息:")
    print(final_response["messages"])
    删除消息

    trim_message()是“临时裁剪”,让传给llm的文本数量变少了,但消息本身还在;而Remove_message()是“永久删除”,根据thread_id从state[message]里面删除多条消息。

    from langchain_core.messages import RemoveMessage
    from langgraph.graph import MessagesState
    
    def delete_old_messages(state: MessagesState):
        messages = state["messages"]
        # 删掉最早的 2 条
        to_remove = [RemoveMessage(id=m.id) for m in messages[:2]]
        return {"messages": to_remove}
    总结消息

    修剪或删除消息的问题在于,可能会因剔除消息队列而丢失信息。因此,一些应用程序受益于一种更复杂的方法,即使用聊天模型来汇总消息历史记录。

    SummarizationNode是 LangGraph 提供的一个预置节点,专门用于在对话过长时自动将历史消息压缩为摘要,从而解决 LLM 上下文窗口超限的问题

    SummarizationNode虽然不是一个函数,但是它是一个可调用对象;这个类中实现了__call__方法让它可以向函数一样被调用。

    下载模块

    pip install langmem

    使用方法

    from langchain_openai import ChatOpenAI
    from langgraph.prebuilt import create_react_agent
    from langgraph.prebuilt.chat_agent_executor import AgentState
    from langmem.short_term import SummarizationNode
    from langchain_core.messages.utils import count_tokens_approximately
    
    # 1. 模型
    model = ChatOpenAI(model="gpt-4o")
    summarization_model = model.bind(max_tokens=128)  # 摘要用小输出
    
    # 2. 摘要节点(核心)
    summarization_node = SummarizationNode(
        token_counter=count_tokens_approximately,
        model=summarization_model,
        max_tokens=384,        # 给主模型的总token上限
        max_summary_tokens=128,# 摘要本身最大长度
        output_messages_key="llm_input_messages"
    )
    
    # 3. 扩展状态(存摘要)
    class State(AgentState):
        context: dict
    
    # 4. 集成到 Agent(pre_model_hook 自动触发)
    agent = create_react_agent(
        model=model,
        tools=[],
        state_schema=State,
        pre_model_hook=summarization_node  # 每次调用LLM前执行摘要
    )

    这里summarization_model是需要为摘要专门创建一个模型

    • max_tokens:在进入摘要模型之前:已有的摘要+用户问题<=384

    •  max_tokens_before_summary:当前对话[AI, Human, tools, AI, Human, tools]tokens长度大于50就触发摘要

    •  max_summary_tokens:摘要的长度 128 tokens

    • 摘要 + 用户的问题 -> llm摘要 # 摘要 + 用户的问题在重新进行摘要不能超过384token,超过的会被舍弃掉

    还有两个很重要的参数initial_summary_prompt和existing_summary_prompt

    initial_summary_prompt是第一次生成摘要时的提示词模板

    existing_summary_prompt是后续更新摘要时的提示词模板,在后续更新摘要时,应该在prompt里写明把旧的摘要和新的信息结合形成一个新的摘要

    工具Tool

    工具封装了可调用函数及其输入模式。这些可以传递给兼容的聊天模型,让模型决定是否调用工具以及使用哪些参数。

    LangChain 为常见的外部系统(包括 API、数据库、文件系统和 Web 数据)提供预构建的工具集成。

    浏览集成目录以查找可用的工具。

    常见类别:

    • 搜索:Bing、SerpAPI、Tavily

    • 代码执行:Python REPL、Node.js REPL

    • 数据库:SQL、MongoDB、Redis

    • Web 数据:抓取和浏览

    • API:OpenWeatherMap、NewsAPI等

    from langchain.tools import tool
    
    
    @tool
    def multiply(a: int, b: int) -> int:
        """将两个数目相乘."""
        return a * b
    
    
    # 运行工具
    print(multiply.invoke({"a": 6, "b": 7}))  # returns 42
    
    tool_call = {
        "type": "tool_call",
        "id": "1",
        "args": {"a": 42, "b": 7}
    }
    print(multiply.invoke(tool_call))
    
    print("=" * 8, "在工作流中使用", "=" * 8)
    
    from langgraph.prebuilt import ToolNode
    from langgraph.graph import StateGraph, MessagesState, START, END
    from langchain_tavily import TavilySearch
    
    
    @tool
    def tavily_search_tool(query: str) -> str:
        """这是一个搜索工具"""
        tool_instance = TavilySearch()
        return tool_instance.run(query)
    
    # 执行工具的节点
    tool_node = ToolNode([tavily_search_tool])
    # 绑定工具到模型
    model_with_tools = llm.bind_tools([tavily_search_tool])
    
    
    def should_continue(state: MessagesState):
        messages = state["messages"]
        last_message = messages[-1]
        if last_message.tool_calls:
            return "tools"
        return END
    
    
    def call_model(state: MessagesState):
        messages = state["messages"]
        response = model_with_tools.invoke(messages)
        return {"messages": [response]}
    
    
    builder = StateGraph(MessagesState)
    
    # 定义节点和边
    builder.add_node("call_model", call_model)  
    builder.add_node("tools", tool_node)
    
    builder.add_edge(START, "call_model")  # 模型因为绑定了工具,决定是否要使用工具,返回:tool message
    builder.add_conditional_edges("call_model", should_continue, ["tools", END])
    builder.add_edge("tools", "call_model")
    
    graph = builder.compile()
    
    print(graph.invoke({"messages": [{"role": "user", "content": "上海的天气?"}]}))

    TavilySearch()Python SDK 中一个专门用于联网搜索的工具,调用该tool后,它会帮你给Tavily Search API发送HTTP请求并返回搜索结果,类似于Web RAG

    上下文管理

    LangGraph 中的工具有时需要上下文数据,例如仅在运行时使用的参数(例如,用户 ID 或会话详细信息、状态等),这些数据不应由模型控制。LangGraph 提供了三种方法来管理此类上下文:

    类型 使用场景 可变的 寿命
    配置 静态、不可变的运行时数据 单次调用
    短期记忆 调用期间动态变化的数据 单次调用
    长期记忆 持久的跨会话数据 跨多个会话
    短期记忆

    短期记忆保持在单次执行期间发生变化的动态状态

    一般的工具函数接收的是用户的输入Query;ToolRuntime是 LangChain/LangGraph 专门给「工具函数」用的运行时对象,ToolRuntime负责把所有的上下文、状态、用户信息、调用工具id等全部传入给函数

    # 1. 定义状态 (State)
    class CustomState(MessagesState):
        user_name: str
    
    
    # 2. 定义工具
    @tool
    def get_user_name(runtime: ToolRuntime) -> str:
        """从状态中检索当前用户名。"""
        # 在 LangGraph 中,可以通过 InjectedState 注入整个状态
        return runtime.state.get("user_name", "未知用户")
    
    
    @tool
    def update_user_name(new_name: str, runtime: ToolRuntime) -> Command:
        """更新短期记忆中的用户名。"""
        print(f"--- 触发更新用户名工具: {new_name} ---")
        # Command 会在工具执行完后直接作用于 Graph 的 State
        return Command(
            update={
                "user_name": new_name,
                "messages": [
                    ToolMessage(
                        content=f"姓名已经更改成: {new_name}.",
                        tool_call_id=runtime.tool_call_id,
                    )
                ],
            }
        )
    
    
    tools = [get_user_name, update_user_name]
    llm_with_tools = llm.bind_tools(tools)
    
    
    # 3. 定义节点函数
    def call_model(state: CustomState):
        """模型决策节点"""
        response = llm_with_tools.invoke(state["messages"])
        return {"messages": [response]}
    
    
    # 4. 构建图 (Workflow)
    workflow = StateGraph(CustomState)
    
    # 添加处理节点
    workflow.add_node("agent", call_model)
    workflow.add_node("tools", ToolNode(tools))
    
    # 设置连线
    workflow.add_edge(START, "agent")
    
    # 动态决定:是去执行工具还是直接结束
    workflow.add_conditional_edges(
        "agent",
        tools_condition,  # 内置函数:判断消息中是否有 tool_calls
        {"tools": "tools", "__end__": END},
    )
    
    # 工具执行完后回到 agent,让模型根据工具结果说话
    workflow.add_edge("tools", "agent")
    
    # 5. 编译并运行
    checkpointer = InMemorySaver()
    app = workflow.compile(checkpointer=checkpointer)
    
    # --- 测试运行 ---
    config = {"configurable": {"thread_id": "1"}}
    
    print("\n--- 第一次对话 ---")
    input_1 = {"messages": [{"role": "user", "content": "我的名字是jran"}]}
    for event in app.stream(input_1, config, stream_mode="values"):
        event["messages"][-1].pretty_print()
    
    print("\n--- 查看当前 State 中的 user_name ---")
    print(f"State Name: {app.get_state(config).values.get('user_name')}")
    
    print("\n--- 第二次对话 ---")
    input_2 = {"messages": [{"role": "user", "content": "我的名字是什么?"}]}
    for event in app.stream(input_2, config, stream_mode="values"):
        event["messages"][-1].pretty_print()
    长期记忆

    使用长期记忆来存储对话中特定于用户或应用程序的数据。这对于像聊天机器人这样的应用程序非常有用。

    要使用长期记忆,需要:

    1. 配置存储以在调用之间保留数据。

    2. 使用该ToolRuntime功能从工具或提示中访问store。

    class UserInfo(TypedDict):
        name: str
        language: str
    
    
    @tool
    def update_user_info(user_info: UserInfo, runtime: ToolRuntime) -> str:
        """更新用户信息"""
        print("工具被调用,接收到的 user_info:", user_info)
        user_id = runtime.config.get("configurable").get("user_id")
    
        runtime.store.put(("users",), user_id, user_info)
        return "用户信息更新成功"
    
    
    @tool
    def get_user_info(runtime: ToolRuntime) -> str:
        """查找用户信息."""
        user_id = runtime.config.get("configurable").get("user_id")
        user_info = runtime.store.get(("users",), user_id)
        return str(user_info.value) if user_info else "Unknown user"
    
    
    tools = [update_user_info, get_user_info]
    llm_with_tools = llm.bind_tools(tools)
    
    
    # 4. 定义节点
    def call_model(state: MessagesState):
        return {"messages": [llm_with_tools.invoke(state["messages"])]}
    
    
    # 5. 构建图
    workflow = StateGraph(MessagesState)
    
    workflow.add_node("agent", call_model)
    workflow.add_node("tools", ToolNode(tools))  # ToolNode 会自动处理 store 的传递
    
    workflow.add_edge(START, "agent")
    workflow.add_conditional_edges("agent", tools_condition)
    workflow.add_edge("tools", "agent")
    
    # 6. 初始化存储与持久化器
    # 创建内存存储对象
    store = InMemoryStore()
    # 创建内存持久化器
    checkpointer = InMemorySaver()
    
    # 存储初始化用户信息
    store.put(
        ("users",),
        "user_123",
        {
            "name": "jran",
            "language": "中文",
        }
    )
    
    # 预存初始数据
    store.put(("users",), "user_123", {"name": "jran", "language": "中文"})
    
    # 编译图 (关键:传入 store)
    app = workflow.compile(checkpointer=checkpointer, store=store)
    
    # --- 模拟运行 ---
    config = {"configurable": {"thread_id": "thread_1", "user_id": "user_123"}}
    
    print("\n=== 任务 1: 查询初始信息 ===")
    for event in app.stream({"messages": [{"role": "user", "content": "查询用户信息"}]}, config, stream_mode="values"):
        event["messages"][-1].pretty_print()
    
    print("\n=== 任务 2: 更新信息 ===")
    # 模拟用户说:我的名字叫李铭,使用的语言是西班牙语
    for event in app.stream({"messages": [{"role": "user", "content": "我的名字叫李铭,使用的语言是西班牙语"}]}, config,
                            stream_mode="values"):
        event["messages"][-1].pretty_print()
    
    print("\n=== 任务 3: 再次查询 (验证长期记忆) ===")
    for event in app.stream({"messages": [{"role": "user", "content": "我现在的名字和语言是什么?"}]}, config,
                            stream_mode="values"):
        event["messages"][-1].pretty_print()

    Logo

    AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

    更多推荐