1. 前言

在调用 CompiledGraph#invoke 方法执行时:

	/**
	 * 同步执行状态图流程,阻塞等待最终结果
	 * 底层基于响应式 stream 实现,阻塞获取最后一个输出结果,并提取最终状态
	 * @param inputs 图执行输入参数
	 * @param config 运行时配置(线程、中断、持久化等)
	 * @return 包装最终状态的 Optional,执行异常/无结果时返回空
	 */
	public Optional<OverAllState> invoke(Map<String, Object> inputs, RunnableConfig config) {
		return Optional.ofNullable(stream(inputs, config).last().map(NodeOutput::state).block());
	}

同步执行的底层也是基于响应式 stream 实现:

	/**
	 * 响应式流式执行状态图(基于 Project Reactor)
	 * 是执行图的核心响应式入口,将输入数据构建为全局状态后开始执行流
	 * @param inputs 图执行输入参数
	 * @param config 运行时配置
	 * @return 响应式流 Flux,持续发射节点执行结果 NodeOutput
	 */
	public Flux<NodeOutput> stream(Map<String, Object> inputs, RunnableConfig config) {
		return streamFromInitialNode(stateCreate(inputs), config);
	}

会先创建并初始化图执行的全局状态对象 OverAllState

/**
 * 创建并初始化图执行的全局状态对象 OverAllState
 * 处理输入参数、自动生成执行ID、装配键策略和数据存储
 * @param inputs 图执行的输入参数
 * @return 构建完成的全局状态对象
 */
private OverAllState stateCreate(Map<String, Object> inputs) {
	// 处理空输入:如果输入为null,初始化为空Map(用于恢复执行场景)
	if (inputs == null) {
		inputs = new HashMap<>();
	}

	// 确保执行ID存在:如果没有执行ID,自动生成UUID作为唯一执行标识
	if (!inputs.containsKey(GraphLifecycleListener.EXECUTION_ID_KEY)) {
		Map<String, Object> newInputs = new HashMap<>(inputs);
		newInputs.put(GraphLifecycleListener.EXECUTION_ID_KEY, java.util.UUID.randomUUID().toString());
		inputs = newInputs;
	}

	// 使用建造者模式构建全局状态
	// 1. 设置键策略映射  2. 设置输入数据  3. 设置全局存储  4. 构建最终状态
	return OverAllStateBuilder.builder()
			.withKeyStrategies(getKeyStrategyMap())    // 绑定图的键策略
			.withData(inputs)                         // 绑定输入参数
			.withStore(compileConfig.getStore())       // 绑定全局存储
			.build();
}

工作流真正的执行是从 GraphRunner 开始:


	/**
	 * 从初始全局状态开始,创建响应式执行流
	 * 真正启动 GraphRunner 执行图逻辑,并处理执行中的正常/完成/异常三种信号
	 * @param overAllState 初始全局状态(包含输入、策略、存储)
	 * @param config 运行时配置
	 * @return 节点执行输出流 Flux<NodeOutput>
	 */
	public Flux<NodeOutput> streamFromInitialNode(OverAllState overAllState, RunnableConfig config) {
		// 校验配置不能为空
		Objects.requireNonNull(config, "config cannot be null");
		try {
			// 创建图执行器
			GraphRunner runner = new GraphRunner(this, config);
			// 运行执行器,处理执行结果流
			return runner.run(overAllState).flatMap(data -> {
				// 情况1:执行完成 → 输出最终结果
				if (data.isDone()) {
					if (data.resultValue().isPresent() && data.resultValue().get() instanceof NodeOutput) {
						return Flux.just((NodeOutput) data.resultValue().get());
					} else {
						return Flux.empty();
					}
				}
				// 情况2:执行异常 → 发射异常信号
				if (data.isError()) {
					return Mono.fromFuture(data.getOutput()).onErrorMap(throwable -> throwable).flux();
				}
				// 情况3:正常执行中 → 发射当前节点输出
				return Mono.fromFuture(data.getOutput()).flux();
			});
		} catch (Exception e) {
			// 启动异常 → 直接返回错误流
			return Flux.error(e);
		}
	}

2. GraphRunner

GraphRunnerSpring AI Alibaba Graph 的【图执行引擎】,自己不执行逻辑,只负责接收初始状态 → 交给执行器 → 流式返回结果。

源码类注释:

这是一个基于 Reactor 的响应式图执行引擎
为了代码更清晰、职责更分离,使用了面向对象(封装、继承、多态)重构。

2.1 成员变量

public class GraphRunner {

	private final CompiledGraph compiledGraph;
	private final RunnableConfig config;
	private final AtomicReference<Object> resultValue = new AtomicReference<>();
	private final MainGraphExecutor mainGraphExecutor;

逐个解释:

  • CompiledGraph: 已经编译好的可执行流程图
  • RunnableConfig:运行时配置类
  • resultValue线程安全存放最终执行结果
  • MainGraphExecutor真正干活的核心执行器GraphRunner 只是外壳,执行逻辑都在这

2.2 构造方法

传入编译好的图 + 运行配置,并初始化执行器:

public GraphRunner(CompiledGraph compiledGraph, RunnableConfig config) {
	this.compiledGraph = compiledGraph;
	this.config = config;
	this.mainGraphExecutor = new MainGraphExecutor();
}

2.3 run()

核心逻辑:

  • 初始化 GraphRunnerContext (图运行上下文)
  • 调用真正的执行器运行图流程
  • 响应式流式返回
public Flux<GraphResponse<NodeOutput>> run(OverAllState initialState) {
	return Flux.defer(() -> {
		try {
			GraphRunnerContext context = new GraphRunnerContext(initialState, config, compiledGraph);
			return mainGraphExecutor.execute(context, resultValue);
		}
		catch (Exception e) {
			return Flux.error(e);
		}
	});
}

3. BaseGraphExecutor

BaseGraphExecutor所有图执行器的统一抽象父类,为所有图执行器提供通用的执行逻辑、完成处理、线程释放、结果封装等能力,采用抽象类设计,子类必须实现核心执行方法,体现多态与封装特性。

可用实现类:

在这里插入图片描述

整体层级关系:

  • 继承关系:两个执行器都继承 BaseGraphExecutor
  • 协作关系
    • GraphRunner 调用 MainGraphExecutor
    • MainGraphExecutor 内部调用 NodeExecutor

3.1 execute()

定义了抽象执行方法,由子类实现具体的图执行逻辑:

public abstract class BaseGraphExecutor {
	/**
	 * 抽象执行方法:由子类实现具体的图执行逻辑
	 * 不同执行器(同步/异步/流式)提供各自的执行实现,体现多态设计
	 *
	 * @param context      图执行上下文(包含编译图、配置、状态等)
	 * @param resultValue  结果存储引用,用于保存最终执行结果
	 * @return             图执行响应流 Flux<GraphResponse<NodeOutput>>
	 */
	public abstract Flux<GraphResponse<NodeOutput>> execute(GraphRunnerContext context,
			AtomicReference<Object> resultValue);
}

3.2 handleCompletion()

定义了处理图执行完成逻辑(子类可复用):

	/**
	 * 统一处理图执行完成逻辑(子类可复用)
	 * 封装通用完成逻辑:释放线程、保存检查点、封装最终结果
	 *
	 * @param context      图执行上下文
	 * @param resultValue  结果引用,用于存储最终返回值
	 * @return             完成状态的响应流
	 */
	protected Flux<GraphResponse<NodeOutput>> handleCompletion(GraphRunnerContext context,
			AtomicReference<Object> resultValue) {
		return Flux.defer(() -> {
			try {
				// 判断是否需要释放线程 + 存在检查点保存器
				if (context.getCompiledGraph().compileConfig.releaseThread()
						&& context.getCompiledGraph().compileConfig.checkpointSaver().isPresent()) {
					// 执行线程释放,获取释放标记
					BaseCheckpointSaver.Tag tag = context
						.getCompiledGraph().compileConfig.checkpointSaver()
						.get()
						.release(context.getConfig());
					resultValue.set(tag);
				} else {
					// 无释放配置:直接返回当前状态数据副本
					resultValue.set(new HashMap<>(context.getOverallState().data()));
				}
				// 封装完成响应并返回
				return Flux.just(GraphResponse.done(resultValue.get()));
			}
			catch (Exception e) {
				// 异常时封装错误响应
				return Flux.just(GraphResponse.error(e));
			}
		});
	}

}

4. MainGraphExecutor

继承 BaseGraphExecutor 是图执行核心处理器(主执行器),实现完整的状态图执行流程:启动节点 → 业务节点 → 中断/恢复 → 结束节点 → 完成处理。

public class MainGraphExecutor extends BaseGraphExecutor {

	/** 节点执行器:负责单个节点的实际逻辑运行 */
	private final NodeExecutor nodeExecutor;

	/**
	 * 构造方法:初始化节点执行器
	 */
	public MainGraphExecutor() {
		this.nodeExecutor = new NodeExecutor(this);
	}

4.1 execute()

核心职责

  1. 判断当前节点类型(START / END / 普通节点)
  2. 处理中断、恢复、循环、最大迭代次数
  3. 调度节点执行器(NodeExecutor)运行节点

【核心执行方法】重写抽象执行逻辑:

	/**
	 * 【核心执行方法】重写抽象执行逻辑
	 * 编排整个图执行流程:启动、中断、恢复、节点执行、结束、完成
	 * @param context 图执行上下文(状态、配置、节点、边)
	 * @param resultValue 执行结果引用
	 * @return 响应式执行流
	 */
	@Override
	public Flux<GraphResponse<NodeOutput>> execute(GraphRunnerContext context, AtomicReference<Object> resultValue) {
		try {
			// ===================== 1. 终止条件:停止/达到最大迭代次数 =====================
			if (context.shouldStop() || context.isMaxIterationsReached()) {
				return handleCompletion(context, resultValue);
			}

			// ===================== 2. 从嵌入模式返回处理 =====================
			final var returnFromEmbed = context.getReturnFromEmbedAndReset();
			if (returnFromEmbed.isPresent()) {
				var interruption = returnFromEmbed.get().value(new TypeRef<InterruptionMetadata>() {
				});
				if (interruption.isPresent()) {
					return Flux.just(GraphResponse.done(interruption.get()));
				}
				return Flux.just(GraphResponse.done(context.buildNodeOutputAndAddCheckpoint(Map.of())));
			}

			// ===================== 3. 中断恢复:当前节点已中断,标记为已恢复 =====================
			if (context.getCurrentNodeId() != null && context.getConfig().isInterrupted(context.getCurrentNodeId())) {
				context.getConfig().withNodeResumed(context.getCurrentNodeId());
				return Flux.just(GraphResponse.done(context.getCurrentStateData()));
			}

			// ===================== 4. 处理 START 启动节点 =====================
			if (context.isStartNode()) {
				return handleStartNode(context);
			}

			// ===================== 5. 处理 END 结束节点 =====================
			if (context.isEndNode()) {
				return handleEndNode(context, resultValue);
			}

			// ===================== 6. 从中断状态恢复 =====================
			final var resumeFrom = context.getResumeFromAndReset();
			if (resumeFrom.isPresent()) {
				if (context.getCompiledGraph().compileConfig.interruptBeforeEdge()
						&& java.util.Objects.equals(context.getNextNodeId(), INTERRUPT_AFTER)) {
					var nextNodeCommand = context.nextNodeId(resumeFrom.get(), context.getCurrentStateData());
					context.setNextNodeId(nextNodeCommand.gotoNode());
					context.setCurrentNodeId(null);
				}
			}

			// ===================== 7. 执行中断:需要暂停,返回中断元数据 =====================
			if (context.shouldInterrupt()) {
				try {
					InterruptionMetadata metadata = InterruptionMetadata
						.builder(context.getCurrentNodeId(), context.cloneState(context.getCurrentStateData()))
						.build();
					return Flux.just(GraphResponse.done(metadata));
				}
				catch (Exception e) {
					return Flux.just(GraphResponse.error(e));
				}
			}

			// ===================== 8. 正常执行:交给节点执行器运行业务节点 =====================
			return nodeExecutor.execute(context, resultValue);
		}
		catch (Exception e) {
			// 全局异常捕获:触发错误监听器 + 打印日志 + 返回错误流
			context.doListeners(ERROR, e);
			org.slf4j.LoggerFactory.getLogger(com.alibaba.cloud.ai.graph.GraphRunner.class)
				.error("Error during graph execution", e);
			return Flux.just(GraphResponse.error(e));
		}
	}

处理图启动节点(START):

	/**
	 * 处理图启动节点(START)
	 * 触发启动监听器 → 获取入口节点 → 设置下一个节点 → 生成检查点 → 递归执行
	 */
	private Flux<GraphResponse<NodeOutput>> handleStartNode(GraphRunnerContext context) {
		try {
			// 触发 START 监听器
			context.doListeners(START, null);
			// 获取入口节点命令
			Command nextCommand = context.getEntryPoint();
			context.setNextNodeId(nextCommand.gotoNode());

			// 保存检查点
			Optional<Checkpoint> cp = context.addCheckpoint(START, context.getNextNodeId());
			NodeOutput output = context.buildOutput(START, cp);

			// 切换当前节点为入口节点,继续递归执行
			context.setCurrentNodeId(context.getNextNodeId());
			return Flux.just(GraphResponse.of(output))
				.concatWith(Flux.defer(() -> execute(context, new AtomicReference<>())));
		}
		catch (Exception e) {
			return Flux.just(GraphResponse.error(e));
		}
	}

处理图结束节点(END):

	/**
	 * 处理图结束节点(END)
	 * 触发结束监听器 → 构建输出 → 执行完成逻辑
	 */
	private Flux<GraphResponse<NodeOutput>> handleEndNode(GraphRunnerContext context,
			AtomicReference<Object> resultValue) {
		try {
			// 触发 END 监听器
			context.doListeners(END, null);
			NodeOutput output = context.buildNodeOutput(END);
			// 输出结束信号,然后执行完成逻辑
			return Flux.just(GraphResponse.of(output))
				.concatWith(Flux.defer(() -> handleCompletion(context, resultValue)));
		}
		catch (Exception e) {
			return Flux.just(GraphResponse.error(e));
		}
	}


5. NodeExecutor

单个 Node 真正执行的执行者,负责跑每一个具体业务节点,不负责全局流程,只专注「当前节点干完活」。

核心职责

  1. 根据节点 ID 获取 NodeActionLLM节点/检索节点/工具节点/自定义节点)
  2. 执行节点异步逻辑 action.apply()
  3. 支持 InterruptableAction 前置/后置中断拦截
  4. 合并节点输出增量 State全局 OverAllState
  5. 根据 Edge 规则计算下一个节点 ID
  6. 处理高级能力:
    • 流式 Flux 输出
    • Parallel 并行节点
    • GraphFlux / ParallelGraphFlux 子流
    • 流式分片合并、推理内容合并、工具调用合并
  7. 执行完节点后,交回 MainGraphExecutor 继续下一轮调度

内部持有主执行器,执行完节点后递归回去继续调度:

public class NodeExecutor extends BaseGraphExecutor {

	private static final Logger log = LoggerFactory.getLogger(NodeExecutor.class);

	// 持有主执行器,执行完节点后递归回去继续调度
	private final MainGraphExecutor mainGraphExecutor;

	public NodeExecutor(MainGraphExecutor mainGraphExecutor) {
		this.mainGraphExecutor = mainGraphExecutor;
	}
	// ..............
}

MainGraphExecutor 的构造函数中会将自己本身传递给 NodeExecutor

	public MainGraphExecutor() {
		this.nodeExecutor = new NodeExecutor(this);
	}


5.1 executeNode()



	@Override
	public Flux<GraphResponse<NodeOutput>> execute(GraphRunnerContext context, AtomicReference<Object> resultValue) {
		return executeNode(context, resultValue);
	}

	/**
	 * 【核心】执行单个节点
	 */
	private Flux<GraphResponse<NodeOutput>> executeNode(
			GraphRunnerContext context,
			AtomicReference<Object> resultValue
	) {
		try {
			// 1. 设置当前节点
			context.setCurrentNodeId(context.getNextNodeId());
			String currentNodeId = context.getCurrentNodeId();

			// 2. 获取节点要执行的动作(LLM/检索/自定义)
			AsyncNodeActionWithConfig action = context.getNodeAction(currentNodeId);

			if (action == null) {
				return Flux.just(GraphResponse.error(RunnableErrors.missingNode.exception(currentNodeId)));
			}

			// 3. 处理可中断动作
			if (action instanceof InterruptableAction interruptable) {
				context.getConfig().metadata(RunnableConfig.STATE_UPDATE_METADATA_KEY).ifPresent(update -> {
					if (update instanceof Map<?, ?> map) {
						context.mergeIntoCurrentState((Map<String, Object>) map);
					}
				});

				Optional<InterruptionMetadata> interruptMetadata = interruptable.interrupt(
						currentNodeId,
						context.cloneState(context.getCurrentStateData()),
						context.getConfig()
				);

				if (interruptMetadata.isPresent()) {
					resultValue.set(interruptMetadata.get());
					return Flux.just(GraphResponse.done(interruptMetadata.get()));
				}
			}

			// 4. 触发节点前置监听器
			context.doListeners(NODE_BEFORE, null);

			// 5. 【执行节点】异步执行
			var future = action.apply(context.getOverallState(), context.getConfig());

			// 6. 处理执行结果
			return Mono.fromFuture(future)
					.flatMapMany(updateState -> handleActionResult(context, updateState, resultValue))
					.onErrorResume(error -> {
						context.doListeners(ERROR, new Exception(error));
						return Flux.just(GraphResponse.error(error));
					});

		}
		catch (Exception e) {
			return Flux.just(GraphResponse.error(e));
		}
	}

5.2 handleActionResult()

	/**
	 * 处理节点执行结果:状态合并 + 路由下一个节点
	 */
	private Flux<GraphResponse<NodeOutput>> handleActionResult(
			GraphRunnerContext context,
			Map<String, Object> updateState,
			AtomicReference<Object> resultValue
	) {
		try {
			// 处理流式输出 Flux
			Optional<Flux<GraphResponse<NodeOutput>>> embedFlux = getEmbedFlux(context, updateState);
			if (embedFlux.isPresent()) {
				return handleEmbeddedFlux(mainGraphExecutor, context, embedFlux.get(), updateState, resultValue);
			}

			// 处理并行流
			Optional<ParallelGraphFlux> parallelFlux = getEmbedParallelGraphFlux(updateState);
			if (parallelFlux.isPresent()) {
				return handleParallelGraphFlux(context, parallelFlux.get(), updateState, resultValue);
			}

			// 处理中断
			String currentNodeId = context.getCurrentNodeId();
			AsyncNodeActionWithConfig action = context.getNodeAction(currentNodeId);
			if (action instanceof InterruptableAction interruptable) {
				Optional<InterruptionMetadata> interruptMetadata = interruptable.interruptAfter(
						currentNodeId,
						context.cloneState(context.getCurrentStateData()),
						updateState,
						context.getConfig()
				);
				if (interruptMetadata.isPresent()) {
					context.mergeIntoCurrentState(updateState);
					Command nextCmd = context.nextNodeId(currentNodeId, context.getCurrentStateData());
					context.setNextNodeId(nextCmd.gotoNode());
					context.buildNodeOutputAndAddCheckpoint(updateState);
					context.doListeners(NODE_AFTER, null);
					resultValue.set(interruptMetadata.get());
					return Flux.just(GraphResponse.done(interruptMetadata.get()));
				}
			}

			// --------------------------
			// 【核心】合并状态到全局State
			// --------------------------
			context.mergeIntoCurrentState(updateState);

			// --------------------------
			// 计算下一个节点
			// --------------------------
			if (context.getCompiledGraph().compileConfig.interruptBeforeEdge()
					&& context.getCompiledGraph().compileConfig.interruptsAfter().contains(currentNodeId)) {
				context.setNextNodeId(INTERRUPT_AFTER);
			}
			else {
				Command nextCmd = context.nextNodeId(currentNodeId, context.getCurrentStateData());
				context.setNextNodeId(nextCmd.gotoNode());
			}

			// 构建输出 & 保存 checkpoint
			NodeOutput output = context.buildNodeOutputAndAddCheckpoint(updateState);
			context.doListeners(NODE_AFTER, null);

			// --------------------------
			// 递归回到主执行器,继续跑下一个节点
			// --------------------------
			return Flux.just(GraphResponse.of(output))
					.concatWith(Flux.defer(() -> mainGraphExecutor.execute(context, resultValue)));
		}
		catch (Exception e) {
			return Flux.just(GraphResponse.error(e));
		}
	}

5.3 流式、并行、中断处理


	// ==================== 工具方法:流式、并行、中断处理 ====================
	public Optional<Flux<GraphResponse<NodeOutput>>> getEmbedFlux(GraphRunnerContext context, Map<String, Object> partialState) {
		return partialState.entrySet().stream()
				.filter(e -> e.getValue() instanceof Flux<?>)
				.findFirst()
				.map(e -> transformFluxToGraphResponse(context, (Flux<?>) e.getValue(), e.getKey(), context.getCurrentNodeId()));
	}

	private Optional<ParallelGraphFlux> getEmbedParallelGraphFlux(Map<String, Object> partialState) {
		return partialState.entrySet().stream()
				.filter(e -> e.getValue() instanceof ParallelGraphFlux)
				.findFirst()
				.map(e -> (ParallelGraphFlux) e.getValue());
	}

	// 流式输出转换(省略大量流式细节,保留核心逻辑)
	private Flux<GraphResponse<NodeOutput>> transformFluxToGraphResponse(
			GraphRunnerContext context, Flux<?> rawFlux, String key, String nodeId) {
		return rawFlux.map(element -> GraphResponse.of(context.buildStreamingOutput(element, nodeId, true)));
	}

	// 处理嵌入式流
	public Flux<GraphResponse<NodeOutput>> handleEmbeddedFlux(
			MainGraphExecutor main, GraphRunnerContext context,
			Flux<GraphResponse<NodeOutput>> flux, Map<String, Object> state, AtomicReference<Object> result
	) {
		return flux.concatWith(Flux.defer(() -> main.execute(context, result)));
	}

	// 处理并行流
	private Flux<GraphResponse<NodeOutput>> handleParallelGraphFlux(
			GraphRunnerContext context, ParallelGraphFlux flux,
			Map<String, Object> state, AtomicReference<Object> result
	) throws Exception {
		return Flux.merge(flux.getGraphFluxes().stream()
						.map(f -> transformFluxToGraphResponse(context, f.getFlux(), f.getKey(), f.getNodeId()))
						.toList())
				.concatWith(Flux.defer(() -> mainGraphExecutor.execute(context, result)));
	}

	// 中断后处理
	private Optional<InterruptionMetadata> interruptAfterForStreaming(GraphRunnerContext context, Map<String, Object> result) {
		var action = context.getNodeAction(context.getCurrentNodeId());
		if (action instanceof InterruptableAction ia) {
			return ia.interruptAfter(context.getCurrentNodeId(), context.cloneState(context.getCurrentStateData()), result, context.getConfig());
		}
		return Optional.empty();
	}
}

6. 总结

6.1 类的关系

  1. GraphRunner:启动入口
  2. MainGraphExecutor:判断节点、处理中断、循环、递归调度
  3. NodeExecutor:执行节点、合并状态、找下一个节点
  4. BaseGraphExecutor:公共完成逻辑

GraphRunner(入口)

MainGraphExecutor(大脑:调度)

NodeExecutor(手脚:执行节点)

BaseGraphExecutor(基类:公共逻辑)


6.2 Graph 引擎执行流程图

在这里插入图片描述

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐