前言

AI 为什么需要“记忆”?

在构建复杂的 AI Agent 时,我们不仅需要它能干活(Node),更需要它能记住自己干了什么(State),甚至在出错时能回到过去重新开始(Checkpoint)。今天我们通过一段简洁的代码,拆解 LangGraph 的核心灵魂。

深入

作为深入理解的基石,我们可以把这几个点再点透一点:

1. 概念微调:Checkpointer vs Checkpoint

区分这两者,我们可以将它们理解为:一个是“管家”,一个是“档案”:

  • Checkpointer (存储器/管家):它是物理存在。比如我们接下来用的 InMemorySaver(存内存,断电即失)或者生产环境用的 SqliteSaver / PostgresSaver(存数据库,永久保存)。

  • Checkpoint (快照/档案):它是逻辑存在。是某一时刻 State 的完整镜像 + 路由信息(下一步去哪)。

2. 核心灵魂:thread_id(线程 ID)

Checkpoint 由 thread_id 管理,这点至关重要。

  • 你可以把它理解为**“平行宇宙的坐标”**。

  • 如果没有 thread_id,AI 就分不清谁是谁。有了它,同一个 AI 模型可以同时和 100 个人聊天,每个人都有属于自己的“档案袋”,互不干扰。

3. 作用详解

我们可以给到四个关键词并且加上最直白的解释:

  • 记忆管理(Memory):

    • 直白说:让 AI 记住刚才聊了什么。即使程序关闭再打开,只要 thread_id 一样,它就能接上话。

  • 时间旅行(Time Travel):

    • 直白说:你可以通过 graph.get_state_history(config) 查到 Step 1 的状态,甚至可以强行把状态改写,然后从那个时间点开始“分叉”运行。

  • 人在回路(Human-in-the-loop):

    • 直白说:比如 AI 要执行“删除数据库”的操作,你可以设置一个中断。AI 运行到这一步会产生一个 Checkpoint 然后挂起。直到你(人类)看了一眼,点击“批准”,它才从这个 Checkpoint 继续往下走。

  • 容错(Fault Tolerance):

    • 直白说:如果 node_b 运行的时候服务器停电了,任务没完成。重启后,它不需要从 node_a 重新跑(省钱省时间),而是直接从 Step 1 的 Checkpoint 恢复。

补充一个进阶点:Checkpoint 到底存了什么?

 Checkpoint 不仅存了 State(代码里的 foobar),它还存了 next(下一步该去哪个 Node)。

这也是为什么接下来代码里调用 get_state(config) 的结果里会有 next=() 或者 next=('node_b',) 的原因。

编码

先贴上全部代码,后面会做详细的解释

from langgraph.graph import StateGraph,START,END
from langgraph.checkpoint.memory import InMemorySaver
from langchain_core.runnables import RunnableConfig
from typing import Annotated
from typing_extensions import TypedDict
from operator import add

#状态
class State(TypedDict):
    foo:str
    bar:Annotated[list[str],add]

def node_a(state:State):
    return {"foo":"a","bar":["a"]}

def node_b(state:State):
    return {"foo":"b","bar":["b"]}

#构建状态图
workflow = StateGraph(State)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START,"node_a")
workflow.add_edge("node_a","node_b")
workflow.add_edge("node_b",END)

#检查点管理器
checkpointer = InMemorySaver()

#编译
graph = workflow.compile(checkpointer=checkpointer)

#配置
config: RunnableConfig={
    "configurable":{"thread_id":"1"}
}

#调用
results = graph.invoke({"foo":""},config)
print(results)

#状态查看
print(graph.get_state(config))

for checkpoint_tuple in checkpointer.list(config):
    print()
    print(checkpoint_tuple)

for checkpoint_tuple in checkpointer.list(config):
    print()
    print(checkpoint_tuple[2]["step"])
    print(checkpoint_tuple[2]["source"])
    print(checkpoint_tuple[1]["channel_values"])

代码详解
初始化

我们先撇开复杂的概念,逐步拆解这段代码。

把这段代码看作是在设计一个“自动化流水线”。

class State(TypedDict):
    foo:str
    bar:Annotated[list[str],add]

第一步:定义“传单”(State)

这里的 State 就是流水线上一直在传递的那张信息单

foo: str:这是一个普通的字符串。规则是:后来者居上。如果节点 A 写了 "a",节点 B 写了 "b",那空格里最后就只剩下 "b"

bar: Annotated[list[str], add]:这是一个特殊的列表。add 是它的合并规则。规则是:只加不删。不管谁处理,都只能往列表末尾追加新内容。

第二步:定义“工人”(Nodes)

def node_a(state:State):
    return {"foo":"a","bar":["a"]}

def node_b(state:State):
    return {"foo":"b","bar":["b"]}

这两个函数就是流水线上的两个工人:

  • node_a:它的任务很简单。不管传单之前写了什么,它都要在 foo 那里填上 "a",在 bar 列表里塞进一个 ["a"]

  • node_b:同理,它填 "b"foo,塞 ["b"]bar

第三步:铺设“传送带”(Graph & Edges)

workflow = StateGraph(State)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START,"node_a")
workflow.add_edge("node_a","node_b")
workflow.add_edge("node_b",END)

这部分就是在工厂车间里画地线、摆机器

  • StateGraph(State):告诉工厂,我们这张“传单”的格式已经定好了。

  • add_node:把工人(函数)放到指定的工位上。

  • add_edge:连接传送带。箭头指向上一个工位结束后,传单该送往哪。

到这里我们预期一下,这几行代码跑起来会发生什么?

想象一张空白传单:

  1. START:传单进入流水线。

  2. 抵达 node_a:foo 变成 "a"bar 变成 ["a"]

  3. 抵达 node_b:foo 被擦掉重写成了 "b"bar 发现已经有 ["a"] 了,于是在后面加上 ["b"],变成了 ["a", "b"]

  4. END:任务结束,你拿到了最终那张写着 {'foo': 'b', 'bar': ['a', 'b']} 的传单。

记录结果

接下去我们使用内存记录每一步的左右的状态快照

checkpointer = InMemorySaver()
编译和配置

然后开始编辑,相当于我们定义的工作流程正式运行

graph = workflow.compile(checkpointer=checkpointer)

compile() 会把所有节点和连线逻辑转化成一个可执行的 Runnable 对象。而且把checkpointer加入到了流程中。现在,相当于每当传单经过一个工位,checkpointer都会自动冲上去拍一张照。

配置:给即将开始的任务一个id,如上集所说,checkpointer是根据thread_id来管理流程的。

config: RunnableConfig = {
    "configurable": {"thread_id": "1"}
}
调用

下达指令并开跑,把一张只写了 foo 是空字符串的原始传单(输入数据),配合订单号(config),扔进流水线。

results = graph.invoke({"foo":""},config)
print(results)
结果输出

我们看看为什么会这么打印

初始化(foo 是空的字符串 "",bar 是空列表)---------->到达 node_a节点( 把 foo 改成了 "a",bar 里塞了一个 ["a"])---------->到达 node_b节点(foo 改成 b,往bar中塞了一个 ["b"])---------->结束输出结果{'foo': 'b', 'bar': ['a', 'b']}

StateSnapshot详解

我们接下去打印它的详细

print(graph.get_state(config)) 

可以看到返回结果是一个快照对象

这里面values: 核心数据,代表了当前时刻状态机里存储的所有数据。

next: 待办事项(路由信息)

  • 这里是一个元组。如果它是空的 (),说明流程已经彻底结束(跑到了 END)。

  • 如果你设置了断点(比如在 node_a 后执行暂停),这里会显示 ('node_b',),意思是:“数据已经停在路口了,下一站是 node_b”。

config: 当前坐标

这是当前这个状态的唯一标识:

  • thread_id: 订单号(你在同一个 ID 下做的所有事都会被串起来)。

  • checkpoint_id: 当前这一步的唯一指纹。如果你想让程序精准跳回到这一秒钟的状态,就需要用到这个 ID。

  • checkpoint_ns: 命名空间,通常在复杂的“子图(Subgraph)”嵌套时使用,现在可以暂时忽略。

metadata: 操作日志

  • source: 来源。loop 表示这是由状态图自动循环运行产生的。如果是你手动修改了状态,这里可能会显示 update

  • step: 步数计数器。从 -1(输入)开始,到 2(完成 node_b),清晰地记录了这是第几次操作。

parent_config: 溯源信息(父级指针)

这是 Checkpointer 实现“时间旅行”的核心

  • 它保存了上一个步骤的 checkpoint_id

  • 意义:就像链条一样,当前的步骤指向了上一步。LangGraph 就是通过这个字段,把分散在数据库里的快照串成了一个完整的“历史长河”。

tasksinterrupts: 运行状态

  • tasks: 如果当前有节点正在并发执行,这里会显示正在运行的任务。因为你的代码是串行的且已经运行完,所以它是空的。

  • interrupts: 记录当前的“中断”。如果你设置了人工审批流程(Human-in-the-loop),当程序停下来等你点击“批准”时,这里会显示中断的原因。

StateSnapshot 不只是数据的集合,它是一个带有‘时空属性’的信封。它记录了:我是谁(config)、我从哪来(parent_config)、我要去哪(next),以及我肚子里装了什么(values)。

这里可以试一下,获取上一步的状态(node_a 刚结束时的状态)

parent_id = graph.get_state(config).parent_config['configurable']['checkpoint_id']
old_config = {"configurable": {"thread_id": "1", "checkpoint_id": parent_id}}
print(graph.get_state(old_config).values)

打印出的是 {'foo': 'a', 'bar': ['a']}。这就是利用 parent_config 实现的回溯

CheckpointTuple详解

将checkpointer从后往前,一步一步的翻出来看看。在 LangGraph 中,每一步操作(Step)都会生成一个独立的 CheckpointTuple。运行这个 for 循环,你会看到整个程序的进化史。

它是一个包含三个核心组件的“档案袋”:

  1. config:这一页账目的“页码”(包含唯一的 checkpoint_id)。

  2. checkpoint:这一页记下的“原始原始数据”(包含版本号、时间戳、所有通道的内部值)。

  3. metadata:这一页的“备注”(比如是哪一秒存的、是哪个节点跑完存的)。

我们看一下CheckpointTuple 的详细结构

CheckpointTuple(
	config={
		'configurable': {
			'thread_id': '1', 
			'checkpoint_ns': '', 
			'checkpoint_id': '1f127352-80fd-6ca4-8002-d762516fa820'
		}
	}, 
	checkpoint={
		'v': 4, 
		'ts': '2026-03-24T03:54:28.407414+00:00', 
		'id': '1f127352-80fd-6ca4-8002-d762516fa820', 
		'channel_versions': {
			'__start__': '00000000000000000000000000000002.0.5958868265123454', 
			'foo': '00000000000000000000000000000004.0.4198389292977224', 
			'branch:to:node_a': '00000000000000000000000000000003.0.4096430761596095', 
			'bar': '00000000000000000000000000000004.0.4198389292977224', 
			'branch:to:node_b': '00000000000000000000000000000004.0.4198389292977224'
		}, 
		'versions_seen': {
			'__input__': {}, 
			'__start__': {
				'__start__': '00000000000000000000000000000001.0.8405408819711978'
			}, 
			'node_a': {
				'branch:to:node_a': '00000000000000000000000000000002.0.5958868265123454'
			}, 
			'node_b': {
				'branch:to:node_b': '00000000000000000000000000000003.0.4096430761596095'
				}
		}, 
		'updated_channels': ['bar', 'foo'], 
		'channel_values': {
			'foo': 'b', 
			'bar': ['a', 'b']
		}
	}, 
	metadata={
		'source': 'loop', 
		'step': 2, 
		'parents': {}
	}, 
	parent_config={
		'configurable': {
			'thread_id': '1', 
			'checkpoint_ns': '', 
			'checkpoint_id': '1f127352-80f1-668a-8001-6ff0371fbcc6'
		}
	}, 
	pending_writes=[]
)

我们从数据里面最核心的四个维度来讲

1. configparent_config“时空链路”

这是实现“断点续传”和“回溯”的钥匙。

  • checkpoint_id: 当前这一秒钟状态的唯一“身份证号”。

  • parent_config: 记录了上一秒钟(上一个节点)的“身份证号”。

  • 直白解释:就像链条一样,每一环都勾住了上一环。通过它们,LangGraph 才能把分散的记录串成一个完整的“故事线”。

2. channel_values“当前账本实况”

  • 核心定义:这是用户最关心的业务数据

  • 直白解释:这是此时此刻“传单”上写着的最终内容。它展示了经过所有节点处理后,foo 变成了什么,bar 累加到了什么程度。

3. channel_versionsversions_seen“版本控制器”

这是 CheckpointTuple 里看起来最乱但技术含量最高的部分。

  • 直白解释:LangGraph 为了防止数据冲突,给每个状态字段(如 foo, bar)都发了一个版本号(那串长长的 00000...)。

  • 关键作用:它记录了哪个节点(Node)看过哪个版本的数据。

    • 为什么重要? 如果你的图里有并行执行(比如两个节点同时运行),LangGraph 就是靠这些版本号来判断谁先谁后,以及是否需要根据 Reducer(如 add)进行合并,确保数据不会乱套。

4. pending_writes“任务待办单”

  • 核心定义:记录那些**“已经计算出结果但还没正式写入账本”**的数据。

  • 直白解释:在你的结果里它是空的,说明这一步已经稳稳地写进了存档。

  • 容错价值:如果在写入过程中程序突然崩溃,pending_writes 里的内容就像是“未保存的草稿”,系统重启后会先去翻这里,确保数据一丁点都不会丢失。

在 Step 1 的快照里,可以看到: pending_writes=[(..., 'foo', 'b'), (..., 'bar', ['b'])]。意味着在 Step 1 存档时,node_a 已经写完了(foo 是 "a")。但是,node_b 的指令已经发出了,它“正准备”写入 'b'Checkpointer 连这种“正在路上”的状态都记录了下来,这就是为什么如果程序在 node_b 运行时断电,重启后它能完美接上的原因。

总结来说

CheckpointTuple 不仅仅是数据的简单堆砌,它是一套严密的、带有版本追踪链式溯源能力的持久化方案。channel_values 存储了结果,而 channel_versionsparent_config 则保证了结果的来源可追溯、更新可控。

简化信息展示
for checkpoint_tuple in checkpointer.list(config):
    print()
    print(checkpoint_tuple[2]["step"])
    print(checkpoint_tuple[2]["source"])
    print(checkpoint_tuple[1]["channel_values"])

拿出最关键的 “步数、来源、数据内容” 展示出来。形成一个透明的易读的运行过程,

总结

通过一份代码,完整地拆解了 LangGraph 的“大脑”是如何工作的。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐