Spring AI Alibaba 1.x 系列【45】Graph 模块 Action 相关接口和实现类概览
文章目录
1. Spring AI Alibaba Graph
Spring AI Alibaba Graph 是底层运行时模块,Graph API 可以让开发者对流程有更全面的控制,开发者可以独立定义每个 Node 的逻辑、每条边的逻辑,最终按照业务需要编排成完成的流程图,除了流程编排之外,还原生支持 Streaming、Human In the Loop、Memory 等智能体核心能力。
2. Action
Spring AI Alibaba Graph 中的 Action 定为构成图的动作,此处的动作代表与图节点关联的可执行逻辑(节点),以及决定图边流转的条件逻辑(边),它们基于 OverAllState(状态) 运行,并可选择性地与 RunnableConfig (运行时配置)交互。
action 包中包含了所有动作(Action)相关的核心接口,这些接口的实现是定义基于图构建的有状态智能体工作流行为与控制流程的基础。本包明确区分同步操作与异步操作,支持灵活的图构建方式,以适配多种执行模型。

同步接口:
| 接口名称 | 功能说明 |
|---|---|
| NodeAction | 基础同步节点动作,处理流程状态并返回状态更新数据 |
| NodeActionWithConfig | 支持运行时配置的同步节点动作,可动态自定义节点行为 |
| EdgeAction | 基础同步路由边动作,根据状态判断流程的下一个跳转节点 |
| EdgeActionWithConfig | 支持运行时配置的同步路由边动作,灵活控制流程分支 |
| CommandAction | 结构化同步节点动作,统一封装状态更新与流程跳转指令 |
| MultiCommandAction | 同步多命令节点动作,接收状态与配置,返回多命令执行对象 |
| InterruptableAction | 支持执行前后双钩子的同步中断动作,可控制流程中断时机 |
| InterruptableActionWithConfig | 支持运行配置的同步中断动作,定义流程中断核心规则 |
异步接口:
| 接口名称 | 功能说明 |
|---|---|
| AsyncNodeAction | 异步非阻塞节点动作,适配耗时/IO密集型任务 |
| AsyncNodeActionWithConfig | 支持运行时配置的异步节点动作,非阻塞执行自定义逻辑 |
| AsyncEdgeAction | 异步非阻塞路由边动作,无阻塞判断流程下一步执行路径 |
| AsyncEdgeActionWithConfig | 支持运行时配置的异步路由边动作,灵活控制异步流程分支 |
| AsyncCommandAction | 结构化异步节点动作,异步封装状态更新与流程跳转指令 |
| AsyncMultiCommandAction | 异步多命令节点动作,异步返回多命令执行对象 |
3. Node(节点执行接口)
Node 是工作流的基本执行单元,每个 Node 负责处理特定的业务逻辑。接收状态(State)、运行时配置(RunnableConfig )作为输入,并返回更新后的状态。
3.1 NodeAction
NodeAction 只接收状态作为参数,适用于简单的业务逻辑:
@FunctionalInterface
public interface NodeAction {
Map<String, Object> apply(OverAllState state) throws Exception;
}
3.1.1 ConditionEvaluator(条件评估节点)
根据用户输入的关键词,自动判断下一个执行分支,将结果存入状态 _condition_result,用于图的路由跳转。
关键特性:
- 采用模板方法模式:
evaluateCondition为protected方法,支持子类重写自定义条件逻辑 - 默认关键词路由:检测
error/exception/data/analyze/report/summary,返回对应分支名 - 无侵入式状态更新:仅返回条件结果,不修改原有状态
适用场景:工作流的分支网关,根据用户输入自动分发到不同处理节点(错误处理、数据处理、报告生成等)。
/**
* 条件评估节点
* 实现NodeAction接口,用于执行条件判断逻辑,确定工作流的下一个执行路径
* 该节点会解析全局状态,设置条件标记,为工作流路由决策提供依据
*/
public class ConditionEvaluator implements NodeAction {
/**
* 条件判断结果的存储键名(固定值)
*/
private static final String CONDITION_KEY = "_condition_result";
/**
* 重写节点执行方法
* 执行条件评估,并将结果存入状态Map中返回
* @param state 工作流全局状态对象
* @return 更新后的状态数据
* @throws Exception 执行过程中可能抛出的异常
*/
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
// 定义用于更新的状态集合
Map<String, Object> updatedState = new HashMap<>();
// 执行条件判断逻辑(默认实现,可通过重写方法扩展自定义逻辑)
String conditionResult = evaluateCondition(state);
// 将条件判断结果存入状态
updatedState.put(CONDITION_KEY, conditionResult);
return updatedState;
}
/**
* 条件判断核心方法
* 可被子类重写,实现自定义的条件判断逻辑
* @param state 当前工作流的全局状态
* @return 条件判断结果(工作流分支路由标识)
*/
protected String evaluateCondition(OverAllState state) {
// 获取用户输入并转为小写,用于关键词匹配
String input = state.value("input", "").toString().toLowerCase();
// 根据输入关键词匹配对应的处理分支
if (input.contains("error") || input.contains("exception")) {
// 包含错误/异常关键词 -> 错误处理分支
return "error_handling";
}
else if (input.contains("data") || input.contains("analyze")) {
// 包含数据/分析关键词 -> 数据处理分支
return "data_processing";
}
else if (input.contains("report") || input.contains("summary")) {
// 包含报告/总结关键词 -> 报告生成分支
return "report_generation";
}
else {
// 无匹配关键词 -> 默认处理分支
return "default";
}
}
/**
* 建造者模式入口方法
* @return Builder构建器对象
*/
public static Builder builder() {
return new Builder();
}
/**
* 建造者内部类
* 用于创建ConditionEvaluator实例
*/
public static class Builder {
/**
* 构建ConditionEvaluator对象
* @return 条件评估节点实例
*/
public ConditionEvaluator build() {
return new ConditionEvaluator();
}
}
}
3.1.2 RoutingMergeNode(路由结果合并节点)
收集并行 / 路由执行的子代理输出,通过 LLM 合成一份简洁、无冗余的最终答案。
关键特性:
- 自动提取子代理输出:兼容
Message/GraphResponse等多种数据格式 LLM智能合成:使用系统提示词整合信息、去重、标注来源、处理冲突- 可配置输出键:默认
merged_result,支持自定义 - 日志完善:记录合并过程,便于调试
适用场景:简单并行任务,仅需要展示所有执行结果,无需智能合并的轻量化场景。
/**
* 路由结果合并节点
* 专用工作流节点:收集由路由代理分发的子代理执行结果,通过大语言模型(LLM)将结果合成为连贯的答案,
* 并将合并后的最终结果写入工作流全局状态中
*/
public class RoutingMergeNode implements NodeAction {
/** 日志对象 */
private static final Logger logger = LoggerFactory.getLogger(RoutingMergeNode.class);
/** 合并结果在全局状态中的默认存储键名 */
public static final String DEFAULT_MERGED_OUTPUT_KEY = "merged_result";
/**
* LLM 结果合成系统提示词模板
* 功能:指导大模型整合多源结果,去重、突出重点、标注冲突、简洁排版
*/
private static final String SYNTHESIZE_SYSTEM_TEMPLATE = """
Synthesize these search results to answer the original question: "%s"
- Combine information from multiple sources without redundancy
- Highlight the most relevant and actionable information
- Note any discrepancies between sources
- Keep the response concise and well-organized
""";
/** Spring AI 大模型接口,用于执行结果合成 */
private final ChatModel chatModel;
/** 子代理列表,需要收集这些代理的执行结果 */
private final List<BaseAgent> subAgents;
/** 合并结果的存储键名,可自定义 */
private final String mergedOutputKey;
/**
* 构造方法:使用默认的结果存储键名
* @param chatModel 大模型实例
* @param subAgents 子代理列表
*/
public RoutingMergeNode(ChatModel chatModel, List<BaseAgent> subAgents) {
this(chatModel, subAgents, DEFAULT_MERGED_OUTPUT_KEY);
}
/**
* 全参构造方法:支持自定义合并结果的存储键名
* @param chatModel 大模型实例
* @param subAgents 子代理列表
* @param mergedOutputKey 自定义合并结果存储键名
*/
public RoutingMergeNode(ChatModel chatModel, List<BaseAgent> subAgents, String mergedOutputKey) {
this.chatModel = chatModel;
this.subAgents = subAgents;
// 空值保护:未指定则使用默认键名
this.mergedOutputKey = mergedOutputKey != null ? mergedOutputKey : DEFAULT_MERGED_OUTPUT_KEY;
}
/**
* 节点核心执行方法
* 1. 收集所有子代理的执行结果
* 2. 提取用户原始问题
* 3. 调用LLM合成最终答案
* 4. 将合并结果返回给工作流状态
* @param state 工作流全局状态
* @return 包含合并结果的状态数据
* @throws Exception 执行异常
*/
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
logger.debug("RoutingMergeNode: 开始合并 {} 个子代理的执行结果", subAgents.size());
// 存储格式化后的子代理结果
List<String> formattedResults = new ArrayList<>();
// 遍历所有子代理,收集执行结果
for (BaseAgent subAgent : subAgents) {
// 获取子代理的输出键名
String outputKey = subAgent.getOutputKey();
if (outputKey == null) {
continue;
}
// 从全局状态中获取子代理的输出结果
Optional<Object> outputOpt = state.value(outputKey);
if (outputOpt.isPresent()) {
// 提取结果中的文本内容
String text = extractText(outputOpt.get(), outputKey);
if (text != null && !text.isBlank()) {
// 格式化结果,标注来源代理名称
String source = capitalize(subAgent.name());
formattedResults.add("**From " + source + ":**\n" + text);
logger.debug("已收集子代理 {} 的结果 (键名: {})", subAgent.name(), outputKey);
}
}
}
// 提取用户的原始问题
String query = extractOriginalQuery(state);
// 调用LLM合成最终答案
String finalAnswer = synthesize(query, formattedResults);
logger.debug("RoutingMergeNode: 已将 {} 个结果源合成为最终答案", formattedResults.size());
// 将最终合并结果写入状态并返回
return Map.of(mergedOutputKey, finalAnswer);
}
/**
* 从工作流状态中提取用户的原始提问
* 逻辑:获取消息列表中的最后一条用户消息
* @param state 全局状态
* @return 用户原始问题文本
*/
private String extractOriginalQuery(OverAllState state) {
// 从状态中获取消息列表,空值保护
@SuppressWarnings("unchecked")
List<Message> messages = (List<Message>) state.value("messages").orElse(List.of());
if (messages.isEmpty()) {
return "";
}
// 获取最后一条消息(用户提问)
Message last = messages.get(messages.size() - 1);
return last.getText() != null ? last.getText() : "";
}
/**
* 调用大模型,将多源结果合成为统一答案
* @param query 用户原始问题
* @param formattedResults 格式化后的子代理结果列表
* @return LLM合成后的最终答案
*/
private String synthesize(String query, List<String> formattedResults) {
// 无结果时直接返回提示
if (formattedResults == null || formattedResults.isEmpty()) {
return "No results found from any knowledge source.";
}
// 拼接所有子代理结果
String formatted = String.join("\n\n", formattedResults);
// 格式化系统提示词
String systemPrompt = SYNTHESIZE_SYSTEM_TEMPLATE.formatted(query);
// 构建LLM对话提示
Prompt prompt = new Prompt(List.of(
new SystemMessage(systemPrompt),
new UserMessage(formatted)));
// 调用大模型并返回结果文本
ChatResponse response = chatModel.call(prompt);
return response.getResult().getOutput().getText();
}
/**
* 通用文本提取工具方法
* 兼容 Message、GraphResponse、普通对象等多种数据格式,提取纯文本内容
* @param output 待提取的结果对象
* @param outputKey 输出键名(用于Map类型数据提取)
* @return 提取后的文本内容
*/
public static String extractText(Object output, String outputKey) {
// 对象是消息类型,直接获取文本
if (output instanceof Message message) {
return message.getText();
}
// 对象是工作流响应类型,递归提取结果
if (output instanceof GraphResponse<?> gr) {
Optional<?> val = gr.resultValue();
if (val.isPresent()) {
Object v = val.get();
// 如果是Map结构,根据输出键取值
if (v instanceof Map<?, ?> map) {
v = map.get(outputKey);
}
// 消息对象提取文本
if (v instanceof Message m) {
return m.getText();
}
// 普通对象转为字符串
return v.toString();
}
return "";
}
// 普通对象空值保护并转字符串
return output != null ? output.toString() : "";
}
/**
* 字符串首字母大写,其余小写格式化工具方法
* 用于格式化代理名称,提升展示效果
* @param s 原始字符串
* @return 格式化后的字符串
*/
private static String capitalize(String s) {
if (s == null || s.isEmpty()) return s;
return s.substring(0, 1).toUpperCase() + s.substring(1).toLowerCase();
}
}
3.1.3 ParallelResultAggregator(基础版并行结果聚合器)
简单收集状态中所有非消息数据,拼接为文本字符串。
关键特性:
- 极简实现:遍历状态数据,排除
messages后拼接结果 - 强制校验:必须指定
outputKey,避免空值
适用场景:简单并行任务,仅需要展示所有执行结果,无需智能合并的轻量化场景。
/**
* 并行执行结果聚合节点
* 实现NodeAction接口,用于收集多个并行代理的执行结果,
* 将所有结果合并为一个总结性的字符串,并写入工作流全局状态
*/
public class ParallelResultAggregator implements NodeAction {
/** 聚合结果的存储键名 */
private final String outputKey;
/**
* 构造方法
* @param outputKey 聚合结果的存储键名
*/
public ParallelResultAggregator(String outputKey) {
this.outputKey = outputKey;
}
/**
* 节点核心执行方法
* 1. 遍历全局状态,收集所有非系统消息的代理执行结果
* 2. 将结果拼接为格式化的字符串
* 3. 将拼接结果存入状态并返回
* @param state 工作流全局状态对象
* @return 包含聚合结果的状态数据
* @throws Exception 执行过程中可能抛出的异常
*/
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
// 定义更新后的状态集合
Map<String, Object> updatedState = new HashMap<>();
// 用于存储所有收集到的代理执行结果
Map<String, Object> aggregatedResults = new HashMap<>();
// 遍历全局状态中的所有数据,筛选代理输出结果
Map<String, Object> stateData = state.data();
for (Map.Entry<String, Object> entry : stateData.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
// 跳过空值和系统消息键,只收集业务执行结果
if (value != null && !key.equals("messages")) {
aggregatedResults.put(key, value);
}
}
// 拼接聚合结果,生成总结性文本
StringBuilder combinedResult = new StringBuilder();
combinedResult.append("Parallel execution results:\n");
// 遍历聚合结果,格式化拼接
for (Map.Entry<String, Object> entry : aggregatedResults.entrySet()) {
combinedResult.append("- ").append(entry.getKey()).append(": ").append(entry.getValue()).append("\n");
}
// 将聚合后的结果存入状态
updatedState.put(this.outputKey, combinedResult.toString());
return updatedState;
}
/**
* 建造者模式入口方法
* @return Builder构建器对象
*/
public static Builder builder() {
return new Builder();
}
/**
* 建造者内部类
* 用于链式创建ParallelResultAggregator实例,增加代码可读性
*/
public static class Builder {
/** 聚合结果的存储键名 */
private String outputKey;
/**
* 设置结果存储键名
* @param outputKey 键名
* @return Builder构建器
*/
public Builder outputKey(String outputKey) {
this.outputKey = outputKey;
return this;
}
/**
* 构建ParallelResultAggregator实例
* 校验outputKey不能为空,否则抛出异常
* @return 并行结果聚合节点实例
*/
public ParallelResultAggregator build() {
if (outputKey == null || outputKey.trim().isEmpty()) {
throw new IllegalArgumentException("outputKey must not be null or empty");
}
return new ParallelResultAggregator(outputKey);
}
}
}
3.1.4 EnhancedParallelResultAggregator(增强版并行结果聚合器)
支持自定义合并策略、并发控制、状态键策略,是 ParallelAgent 的配套聚合节点。
关键特性:
- 策略模式:支持
ParallelAgent.MergeStrategy自定义合并逻辑 - 状态智能更新:使用
KeyStrategy.REPLACE管理状态覆盖 - 健壮性处理:兼容
GraphResponse嵌套数据结构 - 空值保护:无输出键时不存储结果,避免冗余
适用场景:复杂并行任务(多 Agent 并发执行),需要灵活合并结果、控制并发、精细化管理状态的场景。
在这里插入代码片
3.1.5 TransparentNode(透明节点)
执行后返回空 Map,不修改任何状态,仅作为流程占位符。
public class TransparentNode implements NodeAction {
public TransparentNode() {
}
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
return Map.of();
}
}
3.2 NodeActionWithConfig
NodeActionWithConfig 额外接收运行配置,可以访问元数据、线程 ID 等信息,适用于需要上下文信息的场景:
@FunctionalInterface
public interface NodeActionWithConfig {
Map<String, Object> apply(OverAllState state, RunnableConfig config) throws Exception;
}
3.2.1 AgentLlmNode(大模型执行节点)
实现 NodeActionWithConfig 接口,作为多智能体工作流底层基础组件,负责大模型推理、工具调用、消息编排等核心执行能力。
核心能力:
- 双模式大模型调用:支持同步阻塞调用和流式响应调用,适配不同业务场景(实时对话 / 批量处理);
- 全链路工具调用管理:集成
Spring AI工具回调,支持工具筛选、动态工具注入、工具执行配置,关闭框架内部工具执行,适配AI工作流的工具调度; - 可扩展拦截器机制:内置模型拦截器链,支持在模型调用前后做自定义处理(如路由、参数修改、结果增强);
- 提示词工程增强:支持系统提示词、指令模板渲染、输出
Schema自动追加,保证大模型输出格式标准化; - 状态与上下文管理:读取工作流全局状态,自动构建对话消息,支持线程
ID隔离、推理轮次计数; - 可观测性保障:内置推理日志、令牌用量统计、异常捕获,方便调试与监控;
- 高度可配置化:支持自定义输出键、模型参数、顾问(
Advisor)、模板渲染器等。
3.2.2 AgentToolNode(工具执行节点)
实现 NodeActionWithConfig 接口,负责解析智能体助手消息中的工具调用指令,完成同步/异步/并行工具调度执行,是智能体工作流中工具调用能力的底层核心组件。
核心能力:
- 双模式工具执行:支持串行执行、并行执行两种模式,并行模式支持信号量限流,避免资源耗尽;
- 全类型工具兼容:无缝适配
Spring AI同步工具、异步工具(AsyncToolCallback)、可取消异步工具,自动路由执行逻辑; - 异步增强封装:支持将同步工具自动包装为异步执行,统一异步执行规范,降低接入成本;
- 可扩展拦截器机制:内置工具拦截器链,支持工具调用前后自定义处理(参数修改、结果增强、日志埋点);
- 安全状态管理:串行/并行模式均实现状态隔离,并行模式支持
KeyStrategy状态合并规则,保证线程安全; - 完善容错机制:支持工具执行超时控制、优雅取消、异常统一处理、错误信息标准化;
- 动态工具解析:支持静态工具注册 + 动态工具注入,兼容配置元数据传递的动态工具回调;
- 部分响应处理:支持工具部分执行结果续跑,完成剩余工具调用,适配分步执行场景;
- 可观测性保障:内置线程隔离执行日志、执行结果日志,支持调试与线上监控。
3.2.3 AgentToSubCompiledGraphNodeAdapter(智能体转子图节点适配器)
实现 NodeActionWithConfig(节点执行)+ ResumableSubGraphAction(子图可恢复)双接口,将子编译图(子工作流)封装为父工作流的普通执行节点,实现多智能体工作流的分层嵌套、调用与数据互通。
核心能力:
- 父子工作流嵌套适配:无缝将子
CompiledGraph封装为父图节点,实现复杂工作流的分层拆分与组合调用; - 子图可恢复执行:支持子工作流断点续跑,通过元数据标记恢复状态,适配长流程执行场景;
- 状态隔离与透传:父图状态自动透传给子图,支持自定义是否保留消息上下文,避免数据污染;
- 流式响应处理:基于
Reactor Flux实现子图流式响应,滑动窗口缓冲保证消息有序输出; - 智能消息裁剪:支持推理内容开关配置,可选择返回完整推理链或仅最终结果;
- 运行环境隔离:自动生成子图独立线程
ID,隔离父子图检查点(Checkpoint)数据; - 配置独立管理:为子图构建专属运行配置,上下文、检查点、元数据完全隔离,互不干扰。
3.2.4 A2aNodeActionWithConfig(A2A执行节点)
多智能体图引擎(A2A)顶层节点执行核心契约接口,定义了工作流节点带运行配置的标准化执行规范,是所有业务节点、智能体节点、工具节点、子图节点的基础父接口,为整个图引擎提供统一的节点执行契约。
核心能力:
- 带配置标准化执行:定义统一的
apply(OverAllState, RunnableConfig)执行方法,绑定全局状态+运行时配置双入参,是引擎调度节点的核心标准; - 运行上下文全透传:支持线程ID、元数据、检查点、上下文信息透传,适配多智能体分布式、断点续跑、隔离执行场景;
- 统一结果输出规范:固定
Map<String, Object>出参格式,保证工作流上下游节点数据互通、状态流转标准化; - 全类型节点兼容:统一适配
LLM推理节点、工具执行节点、子图嵌套节点、条件判断节点、自定义业务节点等所有节点类型; - 引擎生态深度集成:与图引擎的状态管理、调度器、持久化、拦截器、流式响应等核心能力无缝对接。
3.3 AsyncNodeAction
异步执行节点,支持非阻塞 IO 操作:
@FunctionalInterface
public interface AsyncNodeAction extends Function<OverAllState, CompletableFuture<Map<String, Object>>> {
/**
* 执行节点的异步动作
* 将当前动作应用到指定的代理状态上
*
* @param state 工作流的全局状态对象(所有节点共享的状态)
* @return CompletableFuture 异步执行结果,封装了状态更新的键值对数据
*/
CompletableFuture<Map<String, Object>> apply(OverAllState state);
/**
* 【静态工具方法】将同步节点动作 转换为 异步节点动作
* 用于快速把实现了 NodeAction 的同步节点,包装为 Graph 支持的异步节点
*
* @param syncAction 同步节点动作(同步执行的业务逻辑)
* @return 包装后的异步节点动作
*/
static AsyncNodeAction node_async(NodeAction syncAction) {
// Lambda 实现 AsyncNodeAction 接口
return state -> {
// 获取当前线程上下文(用于保持线程局部变量,如请求上下文、国际化信息等)
Context context = Context.current();
// 创建异步结果对象,用于封装执行结果/异常
CompletableFuture<Map<String, Object>> result = new CompletableFuture<>();
try {
// 同步执行节点业务逻辑
Map<String, Object> stateUpdate = syncAction.apply(state);
// 将同步执行的结果,赋值给异步Future
result.complete(stateUpdate);
} catch (Exception e) {
// 捕获执行异常,将异常信息封装到异步Future中
result.completeExceptionally(e);
}
// 返回异步Future对象
return result;
};
}
}
3.4 AsyncNodeActionWithConfig
【带运行配置】的异步节点动作接口,接收【全局状态】+【运行配置】两个参数,执行异步节点逻辑,并返回状态更新的异步结果:
@FunctionalInterface
public interface AsyncNodeActionWithConfig
extends BiFunction<OverAllState, RunnableConfig, CompletableFuture<Map<String, Object>>> {
/**
* 执行带配置的异步节点动作
*
* @param state 工作流全局共享状态(所有节点可读写)
* @param config 节点运行时配置(包含线程ID、元数据、配置参数等)
* @return CompletableFuture 异步执行结果,封装状态更新的键值对
*/
CompletableFuture<Map<String, Object>> apply(OverAllState state, RunnableConfig config);
/**
* 静态工具方法:将【同步带配置节点】包装为【异步带配置节点】
* 核心作用:快速把自定义的同步 NodeActionWithConfig 转为 Graph 框架支持的异步节点
*
* @param syncAction 同步实现的带配置节点动作
* @return 异步包装后的 AsyncNodeActionWithConfig 实例
*/
static AsyncNodeActionWithConfig node_async(NodeActionWithConfig syncAction) {
return (state, config) -> {
// 获取当前线程上下文(保持请求上下文、国际化等线程局部变量)
Context context = Context.current();
// 创建异步结果对象,承载执行结果或异常
CompletableFuture<Map<String, Object>> result = new CompletableFuture<>();
try {
// 同步执行业务逻辑,完成异步结果
result.complete(syncAction.apply(state, config));
} catch (Exception e) {
// 捕获异常,封装到异步Future中
result.completeExceptionally(e);
}
return result;
};
}
/**
* 静态适配方法:将普通异步节点(AsyncNodeAction)适配为带配置的异步节点
* 特殊处理:如果原始节点支持中断(InterruptableAction),则使用包装类保证中断能力
*
* @param action 普通无配置的异步节点动作
* @return 适配后的带配置异步节点动作
*/
static AsyncNodeActionWithConfig of(AsyncNodeAction action) {
// 判断是否为可中断节点,是则使用中断包装器
if (action instanceof InterruptableAction) {
return new InterruptableAsyncNodeActionWrapper(action, (InterruptableAction) action);
}
// 普通节点直接忽略配置参数,调用原异步节点逻辑
return (state, config) -> action.apply(state);
}
/**
* 可中断异步节点包装器(内部工具类)
* 作用:为支持中断的 AsyncNodeAction 提供配置兼容包装,保留中断能力
* 同时实现 AsyncNodeActionWithConfig 和 InterruptableAction 接口
*/
class InterruptableAsyncNodeActionWrapper implements AsyncNodeActionWithConfig, InterruptableAction {
/**
* 被代理的原始异步节点
*/
private final AsyncNodeAction delegate;
/**
* 可中断动作接口(用于调用中断逻辑)
*/
private final InterruptableAction interruptable;
/**
* 构造方法:注入代理对象和中断接口对象
*
* @param delegate 原始异步节点
* @param interruptable 中断能力接口实现
*/
public InterruptableAsyncNodeActionWrapper(AsyncNodeAction delegate, InterruptableAction interruptable) {
this.delegate = delegate;
this.interruptable = interruptable;
}
/**
* 执行异步节点逻辑(委托给原始节点执行,忽略配置参数)
*/
@Override
public CompletableFuture<Map<String, Object>> apply(OverAllState state, RunnableConfig config) {
return delegate.apply(state);
}
/**
* 节点执行前中断逻辑(委托给原始中断接口)
*/
@Override
public java.util.Optional<InterruptionMetadata> interrupt(String nodeId, OverAllState state, RunnableConfig config) {
return interruptable.interrupt(nodeId, state, config);
}
/**
* 节点执行后中断逻辑(委托给原始中断接口)
*/
@Override
public java.util.Optional<InterruptionMetadata> interruptAfter(String nodeId, OverAllState state,
Map<String, Object> actionResult, RunnableConfig config) {
return interruptable.interruptAfter(nodeId, state, actionResult, config);
}
}
}
4. Edge(边路由接口)
Edge 定义 Node 间的控制流,可为固定连接,也可依据状态条件动态决定下一步执行路径,实现分支逻辑。通过组合 Node 和 Edge,开发者可以创建复杂的循环工作流,随着时间的推移不断更新 State 状态。
简而言之:Node 完成工作,Edge 告诉下一步该做什么。
4.1 EdgeAction
EdgeAction 用于定义 AI 状态图中【边(Edge)】的执行逻辑,接收全局状态并返回状态流转的目标节点 ID :
@FunctionalInterface
public interface EdgeAction {
/**
* 执行边的动作逻辑
* 基于当前全局状态,计算并返回下一个要执行的节点ID
*
* @param state AI状态图的全局状态对象,包含所有上下文数据
* @return 下一个执行节点的唯一标识ID(字符串类型)
* @throws Exception 执行过程中发生的异常,由框架统一处理
*/
String apply(OverAllState state) throws Exception;
}
框架中未提供相关实现类:

4.2 EdgeActionWithConfig
EdgeActionWithConfig 相比普通 EdgeAction,额外支持接收运行时配置参数,适用于需要动态配置的状态流转场景:
@FunctionalInterface
public interface EdgeActionWithConfig {
/**
* 执行带运行配置的边动作逻辑
* 基于AI全局状态和运行时配置,计算并返回状态流转的目标节点ID
*
* @param state AI状态图全局状态对象,存储所有上下文、对话、中间计算结果等数据
* @param runnableConfig 运行时配置对象,包含执行过程中的动态配置参数
* @return 字符串类型的下一个节点唯一ID,框架将根据该值路由到对应的节点
* @throws Exception 执行逻辑时发生的异常,由框架统一捕获和处理
*/
String apply(OverAllState state, RunnableConfig runnableConfig) throws Exception;
}
框架中未提供相关实现类:

4.3 AsyncEdgeAction
异步边动作接口,表示一个操作【代理状态】并返回【新路由节点名称】的异步边执行逻辑:
@FunctionalInterface
public interface AsyncEdgeAction extends Function<OverAllState, CompletableFuture<String>> {
/**
* 执行异步边路由动作
* 根据当前全局状态,异步计算并返回下一个要执行的节点名称
*
* @param state 工作流的全局状态对象(所有节点共享的状态)
* @return CompletableFuture 异步执行结果,封装了下一个节点的路由名称
*/
CompletableFuture<String> apply(OverAllState state);
/**
* 【静态工具方法】将同步边动作 转换为 异步边动作
* 用于快速把实现了 EdgeAction 的同步路由逻辑,包装为 Graph 支持的异步边动作
* 是工作流中定义**条件路由**的核心方法
*
* @param syncAction 同步边动作(同步执行的路由判断逻辑)
* @return 包装后的异步边动作
*/
static AsyncEdgeAction edge_async(EdgeAction syncAction) {
// Lambda 实现 AsyncEdgeAction 接口
return state -> {
// 获取当前线程上下文(保持线程局部变量,如请求上下文、国际化信息等)
Context context = Context.current();
// 创建异步结果对象,用于封装路由结果/异常
CompletableFuture<String> result = new CompletableFuture<>();
try {
// 同步执行路由判断逻辑
String nextNode = syncAction.apply(state);
// 将同步执行的路由结果,赋值给异步Future
result.complete(nextNode);
}
catch (Exception e) {
// 捕获执行异常,将异常信息封装到异步Future中
result.completeExceptionally(e);
}
// 返回异步Future对象
return result;
};
}
}
4.4 AsyncEdgeAction
【带运行配置】的异步边动作接口,接收【全局状态】+【运行配置】,异步执行路由逻辑,返回下一个节点的名称:
@FunctionalInterface
public interface AsyncEdgeActionWithConfig extends BiFunction<OverAllState, RunnableConfig, CompletableFuture<String>> {
/**
* 执行异步路由动作(带运行配置)
* 根据全局状态和运行时配置,异步计算工作流的下一个执行节点
*
* @param state 工作流全局共享状态
* @param runnableConfig 节点运行时配置(包含线程标识、上下文元数据等)
* @return CompletableFuture 异步结果,封装下一个节点的名称字符串
*/
CompletableFuture<String> apply(OverAllState state, RunnableConfig runnableConfig);
/**
* 静态工具方法:将【同步带配置边动作】转换为【异步带配置边动作】
* 核心作用:快速将自定义的同步路由逻辑(EdgeActionWithConfig)包装为框架支持的异步对象
*
* @param syncAction 同步实现的带配置路由动作
* @return 异步包装后的 AsyncEdgeActionWithConfig 实例
*/
static AsyncEdgeActionWithConfig edge_async(EdgeActionWithConfig syncAction) {
return (state, runnableConfig) -> {
// 获取当前线程上下文,保证线程局部变量传递
Context context = Context.current();
// 创建异步结果对象,承载路由结果或异常信息
CompletableFuture<String> result = new CompletableFuture<>();
try {
// 同步执行路由逻辑,完成异步结果
result.complete(syncAction.apply(state, runnableConfig));
} catch (Exception e) {
// 捕获执行异常,封装到异步Future中
result.completeExceptionally(e);
}
return result;
};
}
}
5. Command (命令式路由)
Command(不可变记录类)封装了状态图下一步的执行指令,包含跳转目标节点 + 全局状态更新数据。
/**
* 命令实体(不可变记录类)
* @param gotoNode 下一个要执行的节点名称(不能为空)
* @param update 要合并到全局状态的键值对数据,空集合表示不更新状态
*/
public record Command(String gotoNode, Map<String, Object> update) {
/**
* 紧凑构造方法(自动校验参数非空)
* 强制要求 gotoNode 和 update 都不能为 null,保证数据合法性
*/
public Command {
Objects.requireNonNull(gotoNode, "跳转节点名称(gotoNode)不能为空");
Objects.requireNonNull(update, "状态更新集合(update)不能为空");
}
/**
* 简化构造方法
* 仅指定跳转节点,不修改全局状态(自动传入空的更新集合)
* @param gotoNode 下一个要执行的节点名称
*/
public Command(String gotoNode) {
this(gotoNode, Map.of());
}
}
MultiCommand 表示可路由到多个节点并行执行的命令,适用于条件边动作需要返回多个目标节点的场景(AI 状态图并行分支):
/**
* 【多命令实体(并行执行)】
* @param gotoNodes 需要并行执行的节点标识ID列表
* @param update 待合并到当前全局状态的键值对更新数据,空集合表示无更新
*/
public record MultiCommand(List<String> gotoNodes, Map<String, Object> update) {
/**
* 紧凑构造方法(自动校验参数合法性)
* 强制校验:节点列表和更新集合不能为null,节点列表不能为空集合
*/
public MultiCommand {
Objects.requireNonNull(gotoNodes, "并行执行节点列表(gotoNodes)不能为空");
Objects.requireNonNull(update, "状态更新集合(update)不能为空");
if (gotoNodes.isEmpty()) {
throw new IllegalArgumentException("并行执行节点列表(gotoNodes)不能为空集合");
}
}
/**
* 简化构造方法
* 仅指定并行执行的节点列表,不修改全局状态(自动使用空的更新集合)
* @param gotoNodes 需要并行执行的节点标识ID列表,不可为空
*/
public MultiCommand(List<String> gotoNodes) {
this(gotoNodes, Map.of());
}
/**
* 判断当前多命令是否仅指向单个节点
* 用于兼容仅需单节点执行的旧版逻辑(向后兼容)
* @return 仅包含一个节点时返回true,否则返回false
*/
public boolean isSingleNode() {
return gotoNodes.size() == 1;
}
/**
* 将【单节点】的多命令转换为标准命令(Command)
*
* @return 转换后的标准命令对象
* @throws IllegalStateException 如果当前命令包含多个节点,无法转换时抛出异常
*/
public Command toCommand() {
if (!isSingleNode()) {
throw new IllegalStateException("无法将包含多个节点的MultiCommand转换为单一Command");
}
return new Command(gotoNodes.get(0), update);
}
}
5.1 CommandAction
命令动作接口,接收全局状态和运行配置,生成包含【跳转节点+状态更新】的命令指令:
@FunctionalInterface
public interface CommandAction {
/**
* 执行命令动作
* 根据当前全局状态和运行时配置,生成状态图的下一步执行命令
*
* @param state 状态图全局状态对象,存储所有上下文数据
* @param config 执行运行时配置,包含动态参数、执行配置等信息
* @return Command 命令对象,封装了跳转节点和全局状态更新数据
* @throws Exception 执行过程中发生的异常,由框架统一处理
*/
Command apply(OverAllState state, RunnableConfig config) throws Exception;
}
5.2 MultiCommandAction
同步版本的多命令执行动作接口,用于状态图中,支持返回多个执行命令的节点业务逻辑定义:
/**
* 【多命令动作接口】
*
* @see AsyncMultiCommandAction 对应的异步版本接口
*/
@FunctionalInterface
public interface MultiCommandAction {
/**
* 执行多命令动作(同步)
* 根据全局状态和运行时配置,生成并返回多个状态图执行命令
*
* @param state 状态图全局状态对象,存储所有上下文数据
* @param config 执行运行时配置,包含动态参数、执行配置等信息
* @return MultiCommand 多命令对象,封装批量的节点跳转和状态更新指令
* @throws Exception 执行过程中发生的异常,由框架统一处理
*/
MultiCommand apply(OverAllState state, RunnableConfig config) throws Exception;
}
5.3 AsyncCommandAction
函数式接口,继承 BiFunction,用于工作流中执行异步命令操作,接收全局状态(OverAllState) + 运行配置(RunnableConfig),返回异步执行的命令结果(CompletableFuture<Command>):
public interface AsyncCommandAction extends BiFunction<OverAllState, RunnableConfig, CompletableFuture<Command>> {
/**
* 静态工具方法:将【同步命令动作】转换为【异步命令动作】
* 核心作用:快速把自定义的同步 CommandAction 包装为框架支持的异步命令动作
* 保持线程上下文,捕获执行异常并封装到异步结果中
*
* @param syncAction 同步实现的命令动作
* @return 异步包装后的 AsyncCommandAction 实例
*/
static AsyncCommandAction node_async(CommandAction syncAction) {
return (state, config) -> {
// 获取当前线程上下文,保证线程局部变量传递(如请求上下文、国际化)
Context context = Context.current();
// 创建异步结果对象,承载命令执行结果或异常
CompletableFuture<Command> result = new CompletableFuture<>();
try {
// 同步执行命令逻辑,完成异步结果
result.complete(syncAction.apply(state, config));
} catch (Exception e) {
// 捕获异常,封装到异步Future中
result.completeExceptionally(e);
}
return result;
};
}
/**
* 静态适配方法:将【普通异步边动作】适配为【异步命令动作】
* 功能:将边动作的路由结果(节点名称),封装为 Command 对象
* 实现边路由与命令模式的兼容转换
*
* @param action 普通无配置的异步边动作
* @return 适配后的异步命令动作
*/
static AsyncCommandAction of(AsyncEdgeAction action) {
return (state, config) -> action.apply(state).thenApply(Command::new);
}
/**
* 静态适配方法:将【带配置异步边动作】适配为【异步命令动作】
* 功能:将带配置的边路由结果,封装为 Command 对象
* 支持运行时配置的边动作兼容转换
*
* @param action 带配置的异步边动作
* @return 适配后的异步命令动作
*/
static AsyncCommandAction of(AsyncEdgeActionWithConfig action) {
return (state, config) -> action.apply(state, config).thenApply(Command::new);
}
}
5.4 AsyncMultiCommandAction
【异步多命令动作】接口,支持返回多个目标节点,实现工作流的并行执行能力,条件边需要同时路由到多个节点,并发执行多个分支逻辑场景:
/**
* 【异步多命令动作】接口
* @see AsyncCommandAction 单节点路由的异步命令动作(与之对应)
* @see MultiCommand 本接口的返回值类型,封装多个并行执行的节点ID
*/
public interface AsyncMultiCommandAction extends BiFunction<OverAllState, RunnableConfig, CompletableFuture<MultiCommand>> {
/**
* 静态工具方法:将【同步多命令动作】转换为【异步多命令动作】
* 作用:快速把自定义的同步并行路由逻辑,包装为框架支持的异步并行执行动作
*
* @param syncAction 同步实现的多命令并行路由动作
* @return 包装后的异步多命令动作实例
*/
static AsyncMultiCommandAction node_async(MultiCommandAction syncAction) {
return (state, config) -> {
// 保存当前线程上下文,保证线程局部变量正常传递
Context context = Context.current();
// 创建异步结果对象,承载多命令执行结果或异常
CompletableFuture<MultiCommand> result = new CompletableFuture<>();
try {
// 同步执行并行路由逻辑,完成异步结果
result.complete(syncAction.apply(state, config));
} catch (Exception e) {
// 捕获执行异常,封装到异步Future中
result.completeExceptionally(e);
}
return result;
};
}
/**
* 静态适配方法:将【无配置异步函数】适配为异步多命令动作
* 适配函数:仅依赖全局状态,返回待并行执行的节点ID列表
* 自动将节点ID列表包装为 MultiCommand 对象
*
* @param action 接收全局状态、返回节点ID列表的异步函数
* @return 适配后的异步多命令动作
*/
static AsyncMultiCommandAction of(Function<OverAllState, CompletableFuture<List<String>>> action) {
return (state, config) -> action.apply(state)
.thenApply(nodeIds -> new MultiCommand(nodeIds));
}
/**
* 静态适配方法:将【带配置异步函数】适配为异步多命令动作
* 适配函数:依赖全局状态 + 运行配置,返回待并行执行的节点ID列表
* 自动将节点ID列表包装为 MultiCommand 对象
*
* @param action 接收状态与配置、返回节点ID列表的异步函数
* @return 适配后的异步多命令动作
*/
static AsyncMultiCommandAction of(BiFunction<OverAllState, RunnableConfig, CompletableFuture<List<String>>> action) {
return (state, config) -> action.apply(state, config)
.thenApply(MultiCommand::new);
}
}
6. Interruptable(中断支持)
6.1 InterruptableAction
【可中断动作接口】定义状态图执行过程中支持中断操作的动作契约,提供两个中断钩子方法,分别在节点执行前后触发,控制状态图的暂停/中断逻辑:
public interface InterruptableAction {
/**
* 【节点执行前中断检查】
* 在当前节点的动作执行(apply方法)**之前**调用,判断是否需要中断状态图执行
* @param nodeId 当前正在处理的节点唯一标识
* @param state 智能体当前的全局状态
* @param config 运行时配置参数
* @return 包含中断元数据的Optional对象:
* 非空 → 中断状态图执行;
* 空 → 继续执行节点逻辑
*/
Optional<InterruptionMetadata> interrupt(String nodeId, OverAllState state, RunnableConfig config);
/**
* 【节点执行后中断检查】
* 在当前节点的动作执行(apply方法)**完成后**、结果合并到状态前调用
* 可通过节点执行结果判断是否需要中断,支持更灵活的中断逻辑
* <p>
* 若返回中断信息:执行结果会合并到状态,并创建检查点后再触发中断
* @param nodeId 当前正在处理的节点唯一标识
* @param state 智能体当前的全局状态(结果未合并前)
* @param actionResult 节点动作执行返回的结果数据
* @param config 运行时配置参数
* @return 包含中断元数据的Optional对象,默认返回空(不中断)
*/
default Optional<InterruptionMetadata> interruptAfter(
String nodeId,
OverAllState state,
Map<String, Object> actionResult,
RunnableConfig config) {
return Optional.empty();
}
}
6.2 InterruptableActionWithConfig
【简化版可中断动作接口(带配置)】仅保留节点执行前的中断检查能力,适用于无需后置中断检查的简化场景:
public interface InterruptableActionWithConfig {
/**
* 【节点执行前中断检查】
* 判断状态图在当前节点是否需要中断执行
* @param nodeId 当前正在处理的节点唯一标识
* @param state 智能体当前的全局状态
* @param config 运行时配置参数
* @return 包含中断元数据的Optional对象:
* 非空 → 中断状态图执行;
* 空 → 继续执行节点逻辑
*/
Optional<InterruptionMetadata> interrupt(String nodeId, OverAllState state, RunnableConfig config);
}
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)