1. 概述

OverAllState 作为核心容器,用于承载和管理基于图的处理流水线中各节点之间的共享数据。支持键值形式的数据存储,并且可为每个键配置自定义更新策略 KeyStrategy,实现灵活的数据合并或覆盖逻辑。该类实现了序列化,适用于持久化存储、检查点留存以及跨节点通信场景。

主要特性:

  • 数据管理:通过 Map 结构存储任意类型数据。
  • 键策略支持:为每个键关联专属策略,控制新值与已有值的合并规则。
  • 恢复模式:提供恢复标记,标识当前状态是否用于断点续执。
  • 不可变视图:通过 data()keyStrategies() 提供数据与策略的只读视图。
  • 状态快照:可通过 snapShot() 生成当前状态的快照。
  • 人工反馈:支持在流程执行中接入人工反馈与中断消息。

构造方式:

  • 默认构造器:自动注册默认输入键与默认覆盖策略。
  • 含数据构造器:基于已有状态数据 Map 进行初始化。
  • 含恢复标记构造器:创建标记为断点续执专用的状态实例。
  • 全参构造器:可完整自定义数据、键策略及恢复状态标识。

该类非线程安全,若存在并发访问场景,需由外部自行做同步控制。

2. OverAllState 类

2.1 核心常量 & 字段

公共常量:

常量名 类型 作用
DEFAULT_INPUT_KEY String 默认输入键,固定值:input
MARK_FOR_REMOVAL Object 标记对象,用于标识状态键需要被删除

核心成员变量:

变量名 类型 修饰符 作用
data Map<String, Object> private final 存储全局状态的键值数据(核心容器)
keyStrategies Map<String, KeyStrategy> private final 存储每个数据键对应的更新策略
store Store private 长期记忆存储实例,用于跨执行流程持久化

2.2 构造函数

构造函数 权限 功能说明
OverAllState() public 默认构造器:初始化空数据/策略,自动注册input键+替换策略
OverAllState(Map<String, Object> data) public 传入初始数据,初始化状态
OverAllState(Store store) public 传入持久化存储实例,初始化状态
OverAllState(Map<String, Object> data, Store store) public 同时传入初始数据+存储实例,初始化状态
OverAllState(Map<String, Object> data, Map<String, KeyStrategy> keyStrategies) protected 受保护构造器:自定义数据+策略初始化
OverAllState(Map<String, Object> data, Map<String, KeyStrategy> keyStrategies, Store store) protected 全参构造器:完整自定义数据、策略、存储实例

2.3 方法分类介绍

2.3.1 状态基础操作

对全局状态进行重置、清空、覆盖、快照等基础管理:

方法 返回值 功能说明
reset() void 清空所有状态数据
clear() void 清空状态数据(保留策略、存储实例)
cover(OverAllState overAllState) void 用目标状态完整覆盖当前状态(数据+策略+存储)
snapShot() Optional<OverAllState> 创建当前状态的快照副本(深拷贝)

2.3.2 键策略注册与校验

管理数据键对应的更新策略(合并/替换规则):

方法 返回值 功能说明
registerKeyAndStrategy(String key, KeyStrategy strategy) OverAllState 注册单个键+更新策略,支持链式调用
registerKeyAndStrategy(Map<String, KeyStrategy> keyStrategies) OverAllState 批量注册键策略,支持链式调用
containStrategy(String key) boolean 判断是否存在指定键的更新策略
keyStrategies() Map<String, KeyStrategy> 获取所有键策略(原始集合)

2.3.3 状态数据更新

核心方法:根据策略合并/替换/删除状态数据:

方法 返回值 功能说明
input(Map<String, Object> input) OverAllState 写入输入数据,仅更新已注册策略的键
updateState(Map<String, Object> partialState) Map<String, Object> 基于默认策略更新状态,支持删除标记
updateStateWithKeyStrategies(...) void 使用自定义策略映射更新状态
updateStateBySchema(...) void 基于Schema规则+策略更新状态

2.3.4 静态状态更新工具

独立于实例的静态工具方法,用于状态合并计算:

静态方法 返回值 功能说明
updateState(state, partialState) Map<String, Object> 基础状态更新,支持删除标记
updateState(state, partialState, keyStrategies) Map<String, Object> 带策略的状态合并更新

2.3.5 数据获取

安全获取状态数据,支持泛型、类型转换、默认值

方法 返回值 功能说明
data() Map<String, Object> 获取不可修改的状态数据视图(防篡改)
value(String key) Optional<T> 按键获取数据,返回Optional容器
value(String key, Class<T> type) Optional<T> 按键+类型获取数据,转换失败返回空
value(String key, T defaultValue) T 按键获取数据,为空则返回默认值

2.3.6 持久化存储操作:

操作长期记忆存储实例:

方法 返回值 功能说明
getStore() Store 获取持久化存储实例

2.3.7 内部辅助方法

仅类内部使用,用于校验、数据转换

方法 权限 功能
keyVerify() protected 校验数据与策略是否存在公共键
hasCommonKey(...) private 判断两个Map是否存在公共键
updatePartialStateFromSchema(...) private static 基于Schema更新局部状态
toMapRemovingNulls/toMapAllowingNulls private static 自定义收集器,处理空值映射

2.3.8 重写方法

方法 返回值 功能说明
toString() String 序列化为JSON格式字符串,异常时返回简洁文本

3. 执行流程

OverAllStateGraph 引擎的全局状态中枢,贯穿状态图执行全流程,负责存储、传递、合并所有节点共享的数据,支持键策略自定义状态快照持久化,是节点间数据流转的唯一载体。

本次分析只涉及普通的流程,对于中断恢复等机制,在对应的章节再讲解

图执行中涉及到状态的流程

在这里插入图片描述


3.1 初始化 Key 更新策略

Key 更新策略(KeyStrategy)是状态图全局状态管理的核心规则,绑定在 OverAllState 中,决定了「节点执行后输出的新数据」如何与「全局状态中的旧数据」合并,是状态图数据流转的「游戏规则」。

状态图有多个节点(LLM思考、工具调用、结果生成等),所有节点共用 OverAllState 这一个全局数据容器。如果没有策略,节点更新数据会直接覆盖,导致历史数据丢失,而策略就是给每个数据键(key)定义新数据来了,是追加、替换、还是合并?

具体作用

  1. 差异化管理状态数据:不同业务数据用不同规则,比如聊天消息必须追加保留历史,执行步骤只需保留最新值
  2. 保障状态一致性与安全性:所有状态更新必须通过策略执行,节点无法随意篡改全局状态,OverAllState 统一管控。
  3. 简化节点开发:节点只需要输出「需要更新的部分数据」,无需手动处理新旧数据合并,策略自动完成。
  4. 支撑复杂工作流: 适配多节点串行/并行执行、断点恢复、人工反馈等场景,保证状态流转正确。

ReactAgent 在构建状态图时生成的默认策略:

private KeyStrategyFactory buildMessagesKeyStrategyFactory(List<? extends Hook> hooks) {
	// KeyStrategyFactory 是函数式接口,用 Lambda 实现
	return () -> {
		// 1. 创建空的策略集合,存储 key → 更新策略
		HashMap<String, KeyStrategy> keyStrategyHashMap = new HashMap<>();

		// 2. 配置【输出键】策略(Agent 最终输出结果的 key)
		if (outputKey != null && !outputKey.isEmpty()) {
			// 有自定义策略用自定义,无则默认使用【替换策略】
			keyStrategyHashMap.put(outputKey, 
				outputKeyStrategy == null ? new ReplaceStrategy() : outputKeyStrategy
			);
		}

		// 3. 【固定核心配置】对话消息 messages 强制使用【追加策略】
		keyStrategyHashMap.put("messages", new AppendStrategy());

		// 4. 遍历所有扩展钩子,合并钩子中自定义的键策略
		if (hooks != null) {
			for (Hook hook : hooks) {
				// 获取单个钩子的自定义策略
				Map<String, KeyStrategy> hookStrategies = hook.getKeyStrategys();
				// 非空则合并到总策略中(钩子策略可覆盖默认策略)
				if (hookStrategies != null && !hookStrategies.isEmpty()) {
					keyStrategyHashMap.putAll(hookStrategies);
				}
			}
		}

		// 5. 返回最终完整的键策略集合,交给 OverAllState 使用
		return keyStrategyHashMap;
	};
}

在自定义状态图时,需要自己构建KeyStrategyFactory 传入策略:

        KeyStrategyFactory keyStrategyFactory = () -> {
            HashMap<String, KeyStrategy> strategies = new HashMap<>();
            strategies.put("messages", new AppendStrategy());
            strategies.put("step", new ReplaceStrategy());
            return strategies;
        };
        
        StateGraph stateGraph= new StateGraph(keyStrategyFactory);

创建 StateGraph 时策略工厂会赋值给 keyStrategyFactory 属性:

	public StateGraph(String name, KeyStrategyFactory keyStrategyFactory, StateSerializer stateSerializer) {
		this.name = name;
		this.keyStrategyFactory = keyStrategyFactory;
		this.stateSerializer = Objects.requireNonNull(stateSerializer, "stateSerializer cannot be null");
	}

编译为可执行状态图,又会把动态定义的键策略工厂,在编译阶段固化为静态的策略表,为 OverAllState 提供状态更新的最终规则:

protected CompiledGraph(StateGraph stateGraph, CompileConfig compileConfig) throws GraphStateException {
    // 1. 从编译配置中获取【最大迭代次数】(防止状态图无限循环执行)
    this.maxIterations = compileConfig.recursionLimit();

    // 2. 持有原始状态图对象(保留节点、边、流程定义)
    this.stateGraph = stateGraph;

    // 3. 核心:从状态图的策略工厂中,生成【最终固定的键策略映射表】
    this.keyStrategyMap = stateGraph.getKeyStrategyFactory()
            // 执行策略工厂的 Lambda 表达式(就是你之前看到的 buildMessagesKeyStrategyFactory)
            .apply()
            // 获取工厂生成的策略集合 Entry
            .entrySet()
            .stream()
            // 流式转换(标准Entry映射)
            .map(e -> Map.entry(e.getKey(), e.getValue()))
            // 收集为最终的 Map:Key(键名) → Value(更新策略)
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
		//...............
}

最终存储在 CompiledGraph#keyStrategyMap 中:

在这里插入图片描述

3.2 执行阶段

这是 OverAllState 最活跃的阶段,遵循状态驱动→节点执行→状态更新→路径选择的循环,直至到达 END 节点或中断点。

执行入口:

        // 执行
        Map<String, Object> input = Map.of("query", "Hello");
        compiledGraph.invoke(input);

3.2.1 初始化状态

执行 CompiledGraph#invoke 执行时,会进行 OverAllState 初始化:

	/**
	 * 创建全局状态实例(OverAllState)
	 * 处理输入参数、生成唯一执行ID,并基于编译后的策略配置初始化状态
	 * @param inputs 外部输入的初始数据
	 * @return 初始化完成的全局状态对象
	 */
	private OverAllState stateCreate(Map<String, Object> inputs) {
		// 处理空输入场景(主要用于状态恢复/断点续执场景)
		if (inputs == null) {
			inputs = new HashMap<>();
		}

		// 强制保证执行ID存在:执行ID是状态图执行链路的唯一标识
		if (!inputs.containsKey(GraphLifecycleListener.EXECUTION_ID_KEY)) {
			// 复制原输入数据,避免修改外部传入的集合
			Map<String, Object> newInputs = new HashMap<>(inputs);
			// 生成UUID作为执行ID,注入到输入参数中
			newInputs.put(GraphLifecycleListener.EXECUTION_ID_KEY, java.util.UUID.randomUUID().toString());
			inputs = newInputs;
		}

		// ===================== 核心逻辑 =====================
		// 基于编译后的配置,构建全新的全局状态OverAllState:
		// 1. 绑定编译完成的键更新策略(keyStrategyMap)
		// 2. 注入外部输入的初始数据
		// 3. 绑定持久化存储实例(用于检查点、状态恢复)
		// ====================================================
		return OverAllStateBuilder.builder()
				.withKeyStrategies(getKeyStrategyMap())
				.withData(inputs)
				.withStore(compileConfig.getStore())
				.build();
	}

初始化完成的 OverAllState 中包含了:

  • data:执行 ID、调用 invoke 传入的数据
  • keyStrategiesKey 策略集合
  • store:长期记忆存储器
    在这里插入图片描述

3.2.2 初始化执行上下文

进入到 GraphRunner#run 执行方法,在初始化执行上下文时,会传入初始化的 OverAllState

/**
 * 运行状态图(响应式执行入口)
 * 接收初始全局状态,以响应式流的方式驱动整个状态图节点执行
 * @param initialState 初始的全局状态 OverAllState(由 stateCreate 方法创建)
 * @return 响应式流 Flux,包含状态图执行过程中所有节点的输出结果
 */
public Flux<GraphResponse<NodeOutput>> run(OverAllState initialState) {
	// Flux.defer 延迟执行:确保每次订阅时才创建执行上下文,保证线程安全和隔离性
	return Flux.defer(() -> {
		try {
			// 创建状态图运行上下文:封装【初始状态】、【运行配置】、【编译后的状态图】三大核心资源
			GraphRunnerContext context = new GraphRunnerContext(initialState, config, compiledGraph);

			// 驱动状态图节点流转、状态更新、结果输出的核心逻辑
			return mainGraphExecutor.execute(context, resultValue);
		}
		catch (Exception e) {
			// 捕获执行过程中的异常,封装为响应式流的错误信号返回
			return Flux.error(e);
		}
	});
}

在构造 GraphRunnerContext 时会有两个分支情况:

	public GraphRunnerContext(OverAllState initialState, RunnableConfig config, CompiledGraph compiledGraph)
			throws Exception {
		this.compiledGraph = compiledGraph;
		this.config = config;

		if (config.metadata(RunnableConfig.HUMAN_FEEDBACK_METADATA_KEY).isPresent() || config.checkPointId().isPresent()) {
			initializeFromResume(initialState, config);
		} else {
			initializeFromStart(initialState, config);
		}
	}

3.2.3 第一次更新状态(初始化执行上下文时)

CompiledGraph#getInitialState 时会进行第一次状态更新。

第一次执行分支的处理逻辑:

  1. 从初始全局状态中获取输入数据
  2. 创建最终可执行的全局状态
/**
 * 从【起始节点】初始化状态图执行环境
 * 加载初始状态、配置全局执行状态、重置节点指针
 * @param initialState 外部传入的初始全局状态
 * @param config 运行时配置
 */
private void initializeFromStart(OverAllState initialState, RunnableConfig config) {
	// 日志:标记状态图开始执行(START节点启动)
	log.trace("START");

	// 从初始全局状态中获取输入数据
	Map<String, Object> inputs = initialState.data();
	// 如果输入数据非空,打印调试日志(记录初始化的输入键集合)
	if (!CollectionUtils.isEmpty(inputs)) {
		log.debug("Initializing with inputs: {}", inputs.keySet());
	}

	// ===================== 核心逻辑 =====================
	// 1. 调用编译状态图的方法,生成标准化的初始状态
	// 2. 调用 stateCreate 方法,创建最终可执行的全局状态 OverAllState
	// 3. 赋值给执行器的全局状态成员变量,作为整个流程的状态载体
	this.overallState = stateCreate(compiledGraph.getInitialState(inputs, config), initialState);

	// 将当前执行节点设置为【状态图起始节点(START)】
	this.currentNodeId = START;
	// 初始化下一个节点ID为null(执行前无预设下一跳节点)
	this.nextNodeId = null;
}

CompiledGraph#getInitialState 处理逻辑:

  1. 根据 threadId(会话 ID)从检查点保存器中加载历史检查点,取最后一条
  2. 如果存在检查点:调用 updateState 基于【键更新策略】和状态规则,合并【当前全局状态】与【局部更新状态】
  3. 如果不存在检查点:传入空 Map 调用 updateState
public Map<String, Object> getInitialState(Map<String, Object> inputs, RunnableConfig config) {

    // 1. 尝试从检查点保存器中加载历史检查点
    // 2. 如果存在检查点:将【历史状态】与【新输入】按照键策略合并
    // 3. 如果不存在检查点:将【空状态】与【新输入】按照键策略合并
    return compileConfig.checkpointSaver()
            .flatMap(saver -> saver.get(config))
            .map(checkpoint -> OverAllState.updateState(checkpoint.getState(), inputs, keyStrategyMap))
            .orElseGet(() -> OverAllState.updateState(new HashMap<>(), inputs, keyStrategyMap));
}

updatePartialStateFromSchema 是更新的核心方法:

  • 循环【执行时输入的状态数据】的所有 KEY
  • 获取对应的 KEY 策略,默认为 REPLACE
  • 调用 KEY 策略,传入【执行时输入的状态数据】、【检查点状态数据】,默认 REPLACE 则会直接使用传入的值,没有变化
  • 如果添加了【追加策略】,则会将【检查点状态数据】追加到【执行时输入的状态数据】
/**
 * 基于【键更新策略】和状态规则,合并【当前全局状态】与【局部更新状态】
 * 这是 OverAllState 状态合并的核心实现,所有键策略(追加/替换)都在此执行
 * @param state        当前全局状态(旧数据)
 * @param partialState 节点输出的局部状态(新数据/增量数据)
 * @param keyStrategies 键策略映射(key -> 追加/替换/合并策略)
 * @return 合并完成后的最终局部状态
 */
private static Map<String, Object> updatePartialStateFromSchema(Map<String, Object> state,
        Map<String, Object> partialState, Map<String, KeyStrategy> keyStrategies) {
    // 空值安全:如果局部状态为空,直接返回,无需合并
    if (partialState == null || partialState.isEmpty()) {
        return partialState;
    }

    // 流式遍历局部状态的每一个键值对,执行策略合并
    return partialState.entrySet().stream().map(entry -> {
        String key = entry.getKey();
        Object value = entry.getValue();

        // ============== 1. 处理【删除标记】逻辑 ==============
        // 如果值是 MARK_FOR_REMOVAL 常量,标记该键需要被删除(最终设为null)
        if (value == MARK_FOR_REMOVAL) {
            return entryOf(key, null);
        }

        // ============== 2. 获取当前键对应的更新策略 ==============
        // 从策略集合中获取对应key的策略,无则为null
        KeyStrategy channel = keyStrategies != null ? keyStrategies.get(key) : null;

        // 策略兜底:未配置策略 → 默认使用【替换策略(REPLACE)】
        if (channel == null) {
            channel = KeyStrategy.REPLACE;
        }

        // ============== 3. 执行策略:合并旧值 + 新值 ==============
        // state.get(key):全局状态中的旧值
        // value:局部状态中的新值
        // channel.apply():执行追加/替换/自定义合并逻辑
        Object newValue = channel.apply(state.get(key), value);

        // 返回合并后的键值对
        return entryOf(key, newValue);
    })
    // 收集结果:生成允许null值的Map(支持删除键)
    .collect(toMapAllowingNulls(Map.Entry::getKey, Map.Entry::getValue));
}

CompiledGraph#getInitialState 方法负责加载历史检查点状态,并结合用户输入、键更新策略,合并生成状态图执行的「标准化初始状态数据」,如果是用同一个线程 ID ,就算对话结束了,第二次调用工作流时(非恢复执行),还去查历史一次历史状态进行合并,这是为了实现短期记忆… 但是使用工作流就要注意了,如果最后一个历史状态的本次输入的 KEY 一样,又是追加策略的话,本次输入的值将会被追加!!!

短期记忆message 消息列表在检查点中是累加的,在最后一个节点的检查点会保存本次所有的对话历史,当 getInitialState 执行时,就会拿到最后一条检查点中的对话历史,它包含了线程 ID 绑定的所有历史:

在这里插入图片描述

【短期记忆】这种实现方式会导致每个检查点包含全部的对话历史,如果是长文本对话,那数据量…

3.2.4 创建可执行的全局状态

在 执行了 getInitialState 进行第一次状态更新后,开始创建正式的运行时状态:

	// ===================== 核心逻辑 =====================
	// 1. 调用编译状态图的方法,生成标准化的初始状态
	// 2. 调用 stateCreate 方法,创建最终可执行的全局状态 OverAllState
	// 3. 赋值给执行器的全局状态成员变量,作为整个流程的状态载体
	this.overallState = stateCreate(compiledGraph.getInitialState(inputs, config), initialState);

使用图结构中的键策略、外部传入的输入数据(经过了第一个状态更新),创建一个全新的 OverAllState 实例:

// FIXME 提示:该方法与 CompiledGraph.stateCreate 方法重复
// 需要统一规定 OverallState 的创建时机与创建方式
// 临时修复方案:确保用户传入的消息永远位于 messages 列表的最后一位
private OverAllState stateCreate(Map<String, Object> inputs, OverAllState initialState) {
    // 使用图结构中的键策略 + 外部传入的输入数据,创建一个全新的 OverAllState 实例
    return OverAllStateBuilder.builder()
            // 从初始状态中复制 状态键(key)的合并/管理策略
            .withKeyStrategies(initialState.keyStrategies())
            // 注入外部传入的输入数据(如用户消息、参数等)
            .withData(inputs)
            // 从初始状态中复制 存储对象(Store,用于状态持久化/临时存储)
            .withStore(initialState.getStore())
            // 构建并返回新的 OverAllState 状态对象
            .build();
}

构建完成后赋值给 GraphRunnerContext

在这里插入图片描述

3.2.5 处理开始节点

进入 MainGraphExecutor#handleStartNode 处理开始节点:

/**
 * 处理【图执行的起始节点】逻辑(Agent流程启动入口)
 * @param context 图运行上下文(包含状态、配置、会话、检查点等全部信息)
 * @return 包含起始节点处理结果的 GraphResponse 流
 */
private Flux<GraphResponse<NodeOutput>> handleStartNode(GraphRunnerContext context) {
	try {
		// 1. 触发 START 节点的监听器(生命周期钩子:流程开始时执行)
		context.doListeners(START, null);

		// 2. 获取图的入口命令(决定从哪个节点开始执行)
		Command nextCommand = context.getEntryPoint();

		// 3. 将下一个要执行的节点ID 设置到上下文
		context.setNextNodeId(nextCommand.gotoNode());

		// 4. 创建并保存【起始节点检查点】(短期记忆核心:快照当前状态)
		//    检查点 = 会话状态快照,用于断点续跑、多轮对话上下文恢复
		Optional<Checkpoint> cp = context.addCheckpoint(START, context.getNextNodeId());

		// 5. 构建起始节点的输出结果(包含节点执行状态+检查点信息)
		NodeOutput output = context.buildOutput(START, cp);

		// 6. 将当前执行节点 更新为 下一个节点(推进执行流程)
		context.setCurrentNodeId(context.getNextNodeId());

		// 7. 返回起始节点结果 + 递归执行后续节点(核心:启动Agent主执行流程)
		//    concatWith:先返回起始节点响应,再执行真正的节点流程
		//    defer:保证执行逻辑延迟调用,实现异步流式处理
		return Flux.just(GraphResponse.of(output))
			.concatWith(Flux.defer(() -> execute(context, new AtomicReference<>())));
	}
	catch (Exception e) {
		// 异常捕获:执行失败时返回错误响应
		return Flux.just(GraphResponse.error(e));
	}
}

获取图的下一个入口命令时,会将状态中的 data 数据赋值给 Command 对象:

在这里插入图片描述

addCheckpoint 方法中添加并持久化一个检查点,会使用克隆(直接创建一个新的状态对象)进行保存:

/**
 * 添加并持久化一个检查点(短期记忆核心方法)
 * 作用:创建当前状态快照 → 写入存储器(内存/Redis/DB)→ 实现会话记忆
 * @param nodeId 当前执行的节点ID
 * @param nextNodeId 下一个要执行的节点ID
 * @return 封装好的 Checkpoint 检查点对象
 * @throws Exception 存储/序列化异常
 */
public Optional<Checkpoint> addCheckpoint(String nodeId, String nextNodeId) throws Exception {
    // 1. 判断是否配置了检查点存储器(CheckpointSaver:Memory/Redis/JDBC)
    // 只有配置了 Saver 才会开启短期记忆
    if (compiledGraph.compileConfig.checkpointSaver().isPresent()) {

        // 2. 构建 Checkpoint 检查点对象(短期记忆的实体)
        var cp = Checkpoint.builder()
                .nodeId(nodeId)                // 当前节点ID
                .state(cloneState(overallState.data())) // 克隆并保存【全局状态】(对话历史+中间变量)
                .nextNodeId(nextNodeId)        // 下一个节点ID(用于断点续跑)
                .build();

        // 3. 强制将 checkPointId 设为 null
        // 目的:确保是【追加新检查点】,而不是覆盖旧检查点
        // 这是实现多轮对话、历史不断累加的关键设计
        RunnableConfig appendConfig = RunnableConfig.builder(config)
                .checkPointId(null)
                .build();

        // 4. 调用存储器保存检查点(写入内存/Redis/DB)
        // 短期记忆真正落地存储的地方
        this.config = compiledGraph.compileConfig.checkpointSaver().get().put(appendConfig, cp);

        // 5. 返回已保存的检查点
        return Optional.of(cp);
    }

    // 未配置 CheckpointSaver → 不开启短期记忆,返回空
    return Optional.empty();
}

buildOutput 构建输出时,又会克隆当前全局状态(OverAllState)返回:

/**
 * 构建【普通节点输出】(非快照模式)
 * 作用:封装节点执行后的结果、状态、令牌消耗,返回给上层调用
 * @param nodeId 当前执行的节点ID(如 LLM节点、工具节点)
 * @return 标准化的节点输出对象
 * @throws Exception 克隆状态异常
 */
public NodeOutput buildNodeOutput(String nodeId) throws Exception {
    return NodeOutput.of(
        // 1. 当前节点ID:标识这个输出来自哪个节点
        nodeId,

        // 2. 从配置元数据中获取 Agent 标识(若无则为空字符串)
        // 用于区分多个 Agent 实例/场景
        (String) config.metadata("_AGENT_").orElse(""),

        // 3. 【核心】深拷贝当前全局状态(OverAllState)
        // 包含:对话历史 messages、中间变量、工具执行结果等
        // 这是上下文传递、短期记忆的核心数据
        cloneState(this.overallState.data()),

        // 4. LLM 调用令牌消耗统计(输入tokens + 输出tokens)
        this.tokenUsage
    );
}

3.2.6 进入业务执行节点

NodeExecutor 中会获取业务执行节点对象,传入调用其 apply 方法,接收当前的全局状态、运行时配置作为输入,执行一些业务逻辑,并返回更新后的全局状态:

// 执行动作(如调用LLM、调用函数/工具),返回异步未来结果
CompletableFuture<Map<String, Object>> future = action.apply(context.getOverallState(),
					context.getConfig());

3.2.7 节点执行完成处理

节点执行完成后,调用 handleActionResult 进行处理:

// 将 JDK 异步 Future 转为 Reactor 异步流 Mono
return Mono.fromFuture(future)
    // 动作执行成功 → 处理结果,更新状态,继续执行图流程
	.flatMapMany(updateState -> handleActionResult(context, updateState, resultValue))
	// 异常处理:执行出错时触发错误监听器,返回错误响应
	.onErrorResume(error -> {
		context.doListeners(ERROR, new Exception(error));
		return Flux.just(GraphResponse.error(error));
	});

核心执行步骤:

  • 优先级处理响应流:依次检查并处理:标准嵌入式流 → 并行图流 → 兼容版图流,命中则直接返回对应处理结果。
  • 执行后中断判断:若节点支持中断,触发中断钩子,满足条件则合并状态、创建检查点、返回中断响应。
  • 标准流程处理:合并状态 → 判断配置化中断 → 计算下一个节点 → 构建输出 + 检查点 → 触发监听器。
  • 递归继续执行:封装当前节点响应,串联执行下一个节点,实现图的链式执行。
  • 全局异常捕获:所有异常统一封装为错误响应,保证流程不崩溃。
    在这里插入图片描述
    最后递归调用 MainGraphExecutor 进入下一个节点… 也是一样的处理逻辑,直到 END

2.2.8 第二次更新状态(节点执行完成时)

handleActionResult 中会进行合并状态更新:

context.mergeIntoCurrentState(updateState);

合并状态更新方法:

/**
 * 合并状态更新:同时更新当前上下文状态 和 全局总状态
 * 作用:将节点执行产生的状态变更,安全合并到执行上下文的状态中
 * @param updateState 待合并的状态更新数据
 */
public void mergeIntoCurrentState(Map<String, Object> updateState) {
    // 过滤状态数据:剔除令牌使用量(_TOKEN_USAGE_),单独存储
    Map<String, Object> filteredState = findTokenUsageInDeltaState(updateState);

    // 将过滤后的正常状态,更新到全局总状态中
    this.overallState.updateState(filteredState);
}

从状态更新中分离出【令牌使用量】数据:

/**
 * 临时修复方法:从状态更新中分离出【令牌使用量】数据
 * 备注:FIXME 临时方案,专门配合 AgentLlmNode 非流式节点使用
 * 功能:遍历状态数据,识别 _TOKEN_USAGE_ 对应的 Usage 对象,单独赋值给成员变量,不进入正常状态
 * @param updateState 原始状态更新数据
 * @return 过滤掉令牌使用量后的纯净状态数据
 */
private Map<String, Object> findTokenUsageInDeltaState(Map<String, Object> updateState) {
    // 存储过滤后的状态(不包含_TOKEN_USAGE_)
    Map<String, Object> filteredState = new HashMap<>();
    
    // 遍历所有待更新的状态键值对
    for (Map.Entry<String, Object> entry : updateState.entrySet()) {
        String key = entry.getKey();
        Object value = entry.getValue();
        
        // 判断:key 是 _TOKEN_USAGE_ 且 value 是 Usage 类型
        if (Objects.equals(key, "_TOKEN_USAGE_") && value instanceof Usage) {
            // 单独保存令牌使用量,不存入状态map
            this.tokenUsage = (Usage) value;
        } else {
            // 普通状态数据,放入过滤后的map中
            filteredState.put(key, value);
        }
    }
    
    return filteredState;
}

根据键策略合并/替换/删除状态数据:

/**
 * 执行状态更新:根据键策略合并/替换/删除状态数据
 * 核心:支持按key配置更新策略(默认覆盖),支持标记删除键
 * @param partialState 部分状态数据(增量更新)
 * @return 更新完成后的完整状态map
 */
public Map<String, Object> updateState(Map<String, Object> partialState) {
    // 获取当前状态的key更新策略配置(不同key可配置不同合并规则)
    Map<String, KeyStrategy> keyStrategies = keyStrategies();
    
    // 遍历所有需要更新的key
    partialState.keySet().forEach(key -> {
        // 获取该key对应的更新策略,无则使用默认 REPLACE(覆盖)
        KeyStrategy strategy = (keyStrategies != null) ? keyStrategies.get(key) : null;
        if (strategy == null) {
            strategy = KeyStrategy.REPLACE;
        }

        // 处理逻辑1:如果值是 MARK_FOR_REMOVAL 标记 → 删除该key
        if (partialState.get(key) == MARK_FOR_REMOVAL) {
            this.data.remove(key);
        }
        // 处理逻辑2:正常更新 → 使用策略合并新旧值
        else {
            // 获取旧值 → 应用策略 → 存入新值
            Object oldValue = value(key, null);
            Object newValue = strategy.apply(oldValue, partialState.get(key));
            this.data.put(key, newValue);
        }
    });
    
    // 返回更新后的完整状态
    return data();
}

3.2.9 处理结束节点

在最后的 END 节点中,使用最后的状态构建输出,但是不会保存检查点:

/**
 * 处理【图执行的结束节点】
 * 作用:Agent 执行完成、任务结束时调用,做收尾工作
 * @param context 图运行上下文(状态、配置、记忆)
 * @param resultValue 用于存储最终返回结果的原子引用
 * @return 包含结束节点结果的 GraphResponse 流
 */
private Flux<GraphResponse<NodeOutput>> handleEndNode(GraphRunnerContext context,
			AtomicReference<Object> resultValue) {
	try {
		// 1. 触发 END 生命周期监听器(流程结束钩子:日志、监控、收尾回调)
		context.doListeners(END, null);

		// 2. 构建结束节点输出(打包最终状态、token消耗、最终上下文)
		// 这里会返回最终的 OverAllState(包含完整对话历史 = 短期记忆最终状态)
		NodeOutput output = context.buildNodeOutput(END);

		// 3. 返回结束节点响应
		// 先输出结束节点结果 → 再执行完成收尾逻辑 handleCompletion
		return Flux.just(GraphResponse.of(output))
			.concatWith(Flux.defer(() -> handleCompletion(context, resultValue)));
	}
	catch (Exception e) {
		// 异常:流程结束出错时返回错误
		return Flux.just(GraphResponse.error(e));
	}
}
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐