1. 核心概念

1.1 时光旅行定义

时光旅行Time-Travel)功能,允许查看和恢复 Graph 执行的历史状态。依托检查点机制实现,智能体 Graph 流程每执行完成一个节点,自动全量快照保存执行上下文状态

系统按执行时序持久化存储核心信息:

  • 全局业务状态数据
  • 当前执行节点、下一跳路由节点
  • 唯一 CheckpointID 、会话隔离 threadId

三大核心能力:

  1. 历史回看:追溯会话全流程状态流转与节点执行记录
  2. 状态回溯:指定任意历史检查点,从该节点位置恢复继续执行
  3. 流程分叉:基于历史快照修改状态,开辟全新执行分支,并行探索多业务路径

底层运行原理:

  • 统一通过Checkpointer实现状态持久化,支持内存/Redis/数据库多存储介质
  • threadId作为会话隔离标识,区分不同用户、不同对话流程
  • 单步执行生成唯一CheckpointID,作为状态回溯唯一锚点
  • StateSnapshot封装完整执行上下文,支持状态重建、流程回放

1.2 典型应用场景

  1. 线上智能体调试排障*:回溯至报错前节点状态,还原完整运行上下文,快速定位路由分支、参数传递、工具调用异常,无需重新构造请求入参。
  2. 对话会话撤销与流程回滚*:对话智能体支持用户撤销指令、回退历史对话轮次;审批、表单填报类流程支持回退重选分支、重新填写内容。
  3. 智能体策略A/B路径实验*:基于同一份历史状态快照,分叉多条执行链路,对比不同提示词、路由规则、工具调用逻辑的执行效果,低成本完成策略调优。
  4. 多版本会话并行生成:基于同一原始会话历史,衍生标准版、精简版、专业版等多套回答流程,各分支独立运行互不干扰,适配文案创作、方案策划场景。
  5. 长会话断点续聊:服务重启、设备切换、对话中断后,通过检查点快速恢复完整会话上下文,实现无缝续聊。
  6. 低代码流程画布联动:搭配前端ReactFlow流程画布,支持流程步骤预览、节点回退编辑、连线重配、多版本流程快照保存。
  7. 合规审计与行为溯源:全流程留存状态快照与节点执行日志,满足业务合规存档、操作行为复盘溯源需求。

1.3 注意事项

  1. 存储选型MemorySaver仅适用于本地测试、开发调试;生产环境必须使用Redis/数据库持久化存储
  2. 序列化规范:所有存入全局状态的业务对象必须实现可序列化,保证检查点正常持久化与恢复
  3. 版本兼容性Graph 流程图结构、节点逻辑修改后,旧版本历史检查点状态存在兼容异常风险
  4. 数据清理:线上业务需配置过期检查点自动清理策略,避免海量历史数据占用存储资源
  5. 重放执行风险:状态回溯重放会重新触发大模型调用、第三方接口请求,二次执行结果存在不一致性

2. 核心基础

2.1 会话与检查点

会话:独立流程运行的唯一标识,所有检查点均归属指定 threadId,通过 threadId 隔离不同用户、不同业务流程历史数据,执行 Graph 流程必须指定会话 ID

检查点:流程执行至每个节点完成后生成的全量状态快照,由 StateSnapshot 实体封装存储,记录当前状态值、执行节点、下一跳节点、任务信息等核心数据,是时光旅行功能的核心载体。

2.2 内置检查点持久化实现

持久化实现类 存储介质 适用场景
MemorySaver 应用内存 本地开发、单元测试
RedisSaver Redis缓存 线上生产、高并发会话
PostgreSqlSaver PostgreSQL数据库 海量历史归档、合规溯源
MongodbSaver MongoDB数据库 非结构化状态数据存储

3. 环境配置与流程初始化

3.1 构建工作流

@Bean
public CompiledGraph checkpointGraph() throws GraphStateException {
    // 定义全局状态字段更新策略
    KeyStrategyFactory keyStrategyFactory = () -> {
        HashMap<String, KeyStrategy> strategies = new HashMap<>();
        strategies.put("messages", new AppendStrategy());
        strategies.put("step", new ReplaceStrategy());
        return strategies;
    };

    // 构建流程节点与路由
    StateGraph builder = new StateGraph(keyStrategyFactory)
            .addNode("step1", node_async(state ->
                    Map.of("messages", "Step 1", "step", 1)))
            .addNode("step2", node_async(state ->
                    Map.of("messages", "Step 2", "step", 2)))
            .addNode("step3", node_async(state ->
                    Map.of("messages", "Step 3", "step", 3)))
            .addEdge(START, "step1")
            .addEdge("step1", "step2")
            .addEdge("step2", "step3")
            .addEdge("step3", END);

    // 注册内存检查点存储器(测试专用)
    MemorySaver memorySaver = new MemorySaver();
    SaverConfig saverConfig = SaverConfig.builder()
            .register(memorySaver)
            .build();

    // 编译开启检查点能力
    CompileConfig compileConfig = CompileConfig.builder()
            .saverConfig(saverConfig)
            .build();

    return builder.compile(compileConfig);
}

3.2 流程执行生成历史检查点

通过指定统一 threadId 绑定同一会话,多次执行自动追加流程历史:

@Test
public void runGraphGenerateCheckpoint() {
    // 绑定唯一会话ID
    RunnableConfig config = RunnableConfig.builder()
            .threadId("conversation-1")
            .build();

    // 首轮流程执行
    Map<String, Object> firstInput = Map.of("query", "Hello");
    checkpointGraph.invoke(firstInput, config);

    // 同会话二次执行,追加历史检查点
    Map<String, Object> secondInput = Map.of("query", "Follow-up question");
    checkpointGraph.invoke(secondInput, config);
}

4. 历史状态查询

4.1 查询会话最新状态

// 指定会话ID
RunnableConfig config = RunnableConfig.builder().threadId("conversation-1").build();
// 获取当前会话最后一次执行的状态快照
StateSnapshot latestSnapshot = checkpointGraph.getState(config);
// 打印状态与执行节点
System.out.println("最新状态:" + latestSnapshot.state());
System.out.println("当前执行节点:" + latestSnapshot.node());

4.2 查询会话全量执行历史

框架提供getStateHistory方法,按倒序返回所有历史快照(最新记录置顶):

RunnableConfig config = RunnableConfig.builder().threadId("conversation-1").build();
// 获取全量历史状态快照
List<StateSnapshot> historyList = (List<StateSnapshot>) checkpointGraph.getStateHistory(config);

// 遍历打印所有执行步骤
for (int i = 0; i < historyList.size(); i++) {
    StateSnapshot snapshot = historyList.get(i);
    System.out.printf("执行步骤%d \n状态数据:%s \n检查点ID:%s \n执行节点:%s\n",
            i,
            snapshot.state(),
            snapshot.config().checkPointId().orElse("无"),
            snapshot.node());
}

4.3 源码核心查询逻辑

/**
 * 时光旅行-获取指定会话所有历史状态快照
 * @param config 运行配置(携带threadId会话标识)
 * @return 全量历史状态快照集合
 */
public Collection<StateSnapshot> getStateHistory(RunnableConfig config) {
    // 校验是否配置检查点存储器
    BaseCheckpointSaver saver = compileConfig.checkpointSaver()
            .orElseThrow(() -> new IllegalStateException("未配置CheckpointSaver检查点存储器!"));
    // 按会话ID查询所有检查点,封装为业务可用状态快照
    return saver.list(config)
            .stream()
            .map(checkpoint -> StateSnapshot.of(
                    keyStrategyMap,
                    checkpoint,
                    config,
                    stateGraph.getStateFactory()
            ))
            .collect(toList());
}

5 状态回溯重放(时光旅行)

5.1 回溯执行原理

  1. 传入原会话 threadId + 历史 CheckpointID 构建回溯配置
  2. 框架自动恢复该检查点对应的完整上下文状态
  3. 仅执行该检查点之后未完成的流程节点,前置已执行节点不再重复运行
  4. 流程执行完成后自动生成全新后续检查点,不篡改原始会话历史

5.2 完整回溯重放代码

// 1. 定义原会话ID
String threadId = "conversation-1";
RunnableConfig originConfig = RunnableConfig.builder().threadId(threadId).build();

// 2. 查询历史快照,选定回溯节点
List<StateSnapshot> history = (List<StateSnapshot>) checkpointGraph.getStateHistory(originConfig);
// 选定指定历史步骤快照
StateSnapshot targetSnapshot = history.get(1);
String targetCheckpointId = targetSnapshot.config().checkPointId().orElse(null);

// 3. 构建回溯执行配置
RunnableConfig replayConfig = RunnableConfig.builder()
        .threadId(threadId)
        .checkPointId(targetCheckpointId)
        .build();

// 4. 从历史检查点位置继续执行后续流程
Optional<OverAllState> result = checkpointGraph.invoke(
        Map.of("query", "从历史状态发起新请求"),
        replayConfig
);

6. 状态修改与流程分支创建

6.1 分支创建原理

基于原有会话历史检查点,更换全新 threadId 构建独立分支,修改状态数据后执行流程,生成完全独立的分支执行历史,原始主会话历史不受任何影响

6.2 标准分支创建代码

// 1. 原主会话配置
String mainThreadId = "conversation-1";
RunnableConfig mainConfig = RunnableConfig.builder().threadId(mainThreadId).build();
checkpointGraph.invoke(Map.of("query", "Hello"), mainConfig);

// 2. 获取历史基准检查点
List<StateSnapshot> mainHistory = (List<StateSnapshot>) checkpointGraph.getStateHistory(mainConfig);
StateSnapshot baseSnapshot = mainHistory.get(1);
String baseCheckpointId = baseSnapshot.config().checkPointId().orElse(null);

// 3. 新建独立分支会话ID
String branchThreadId = "conversation-1-branch";
RunnableConfig branchConfig = RunnableConfig.builder()
        .threadId(branchThreadId)
        .checkPointId(baseCheckpointId)
        .build();

// 4. 分支独立执行,生成全新流程历史
checkpointGraph.invoke(Map.of("query", "分支流程独立请求"), branchConfig);

6.3 分支创建 BUG 说明

使用 MemorySaver 内存检查点存储器创建跨会话分支时,会抛出异常:

java.lang.IllegalStateException: Resume request without a valid checkpoint!

异常原因MemorySaver内存存储仅绑定原始会话ID,不支持跨 threadId 读取复用检查点。

解决方案:生产环境替换为RedisSaver/数据库持久化存储器,即可正常实现跨会话分支创建。

在这里插入图片描述

7 动态修改全局状态

7.1 状态更新方法

通过 updateState 方法动态修改指定会话状态,同时可指定下一跳执行节点,灵活篡改流程走向:

// 绑定目标会话
RunnableConfig config = RunnableConfig.builder().threadId("conversation-1").build();
// 定义需要修改的状态数据
Map<String, Object> updateParam = Map.of("step", 999, "messages", List.of("手动修改状态"));
// 动态更新状态,指定下一跳执行节点
checkpointGraph.updateState(config, updateParam, "step2");

7.2 状态更新规则

状态修改严格匹配字段绑定的更新策略:

  • ReplaceStrategy:直接覆盖原有状态值
  • AppendStrategy:向集合尾部追加数据,保留原始历史数据
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐