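When building multi-agent workflows with human-in-the-loop (HITL) review on Spring AI Alibaba, many developers hit an IllegalStateException: Resume request without a valid checkpoint! at the moment execution is resumed. This article traces the exception to its root cause and gives a workable fix.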

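Symptom

In a multi-agent pipeline (for example a SequentialAgent containing several sub-agents), once a sub-agent triggers a human-in-the-loop (HITL) review and the front end submits the review result to resume execution, the system throws the following exception: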

java.lang.IllegalStateException: Resume request without a valid checkpoint!
	at com.alibaba.cloud.ai.graph.GraphRunnerContext.lambda$initializeFromResume$1(GraphRunnerContext.java:99)
	at java.base/java.util.Optional.orElseThrow(Optional.java:403)
	at com.alibaba.cloud.ai.graph.GraphRunnerContext.initializeFromResume(GraphRunnerContext.java:99)
	at com.alibaba.cloud.ai.graph.GraphRunnerContext.<init>(GraphRunnerContext.java:87)
	at com.alibaba.cloud.ai.graph.GraphRunner.lambda$run$0(GraphRunner.java:51)
	at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:46)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8891)
	at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:238)

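The short answer

Add a lifecycle listener to the parent FlowAgent (a SequentialAgent here; the same applies to other FlowAgent types such as LlmRoutingAgent), and clear the human-feedback metadata in the listener's after method.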

import cn.hutool.core.collection.CollUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.alibaba.cloud.ai.graph.GraphLifecycleListener;
import com.alibaba.cloud.ai.graph.RunnableConfig;
import com.alibaba.cloud.ai.graph.agent.Agent;
import com.alibaba.cloud.ai.graph.agent.flow.agent.FlowAgent;
import com.alibaba.cloud.ai.graph.utils.TypeRef;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Listener that clears the human-in-the-loop feedback metadata.
 *
 * @author Mright
 * @version 1.0
 * @since 2026/4/14
 */
public class ClearInterruptionMetadataMataListener implements GraphLifecycleListener {

    @Override
    public void after(String nodeId, Map<String, Object> state, RunnableConfig config, Long curTime) {
        // Fetch every FlowAgent bean from the Spring context
        Map<String, FlowAgent> flowAgentMap = SpringUtil.getBeansOfType(FlowAgent.class);
        if (CollUtil.isEmpty(flowAgentMap)) {
            return;
        }

        // Collect the sub-agents of all FlowAgents
        List<Agent> subAgents = flowAgentMap.values().stream().flatMap(flowAgent -> flowAgent.subAgents().stream()).collect(Collectors.toList());

        if (CollUtil.isEmpty(subAgents)) {
            return;
        }

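        // If the node that just finished is a sub-agent, remove the human-feedback metadata.
        // Only the feedback metadata needs to be removed, not the checkpoint: first, no method is
        // provided for modifying the checkpoint; second, a subgraph builds a fresh RunnableConfig
        // when it runs and resets checkPointId to null. See
        // com.alibaba.cloud.ai.graph.internal.node.SubCompiledGraphNodeAction.apply() and
        // com.alibaba.cloud.ai.graph.agent.ReactAgent.AgentToSubCompiledGraphNodeAdapter.apply().
        if (subAgents.stream().map(Agent::name).anyMatch(name -> name.equals(nodeId))) {
            // Remove the human-feedback metadata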
            config.getMetadataAndRemove(RunnableConfig.HUMAN_FEEDBACK_METADATA_KEY, new TypeRef<>() {
            });
        }
    }
}

Register the listener on the parent agent

public FlowAgent answerAuditAgent(@Qualifier("openAiChatModel") ChatModel openAiChatModel) {
      // Build the CompileConfig
      CompileConfig compileConfig = CompileConfig.builder()
              // Listener that clears the human-feedback metadata
              .withLifecycleListener(new ClearInterruptionMetadataMataListener())
              .build();

      return SequentialAgent.builder()
              .name("answer audit agent")
              .description("Evaluates the LLM-generated answer, uses the web, and revises the response so that it stays consistent with the real world")
              .subAgents(List.of(criticAgent(openAiChatModel), reviserAgent(openAiChatModel)))
              .saver(new MemorySaver())
              .compileConfig(compileConfig)
              .build();
  }

See the related issue (linked at the end of this article).

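Reproducing the core scenario

Agent configuration

This article uses an answer-audit scenario as the example. It configures two sub-agents (an editor agent and an investigator agent) and runs them in order through a SequentialAgent: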

    @Bean("criticAgent")
    public ReactAgent criticAgent(@Qualifier("openAiChatModel") ChatModel openAiChatModel) {
        WebSearchTool webSearchTool = new WebSearchTool();
        return ReactAgent.builder()
                .model(openAiChatModel)
                .tools(webSearchTool.getFunctionToolCallback())
                .instruction(CRITIC_PROMPT_WORDS)
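                .description("Professional editor agent: change the answer text as little as possible to make it accurate while keeping the overall structure, style, and length close to the original")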
                .name("critic agent")
                .hooks(new CriticAgentModelHook(), new CriticAgentHook(), new HumanInTheLoopHook.Builder()
                        .approvalOn(webSearchTool.getToolName(), ToolConfig.builder()
                                .description("Web search requires approval")
                                .build())
                        .build())
                .outputKey("critic_agent_output")
                .saver(new MemorySaver())
                .build();
    }


    @Bean("reviserAgent")
    public ReactAgent reviserAgent(@Qualifier("openAiChatModel") ChatModel openAiChatModel) {
        return ReactAgent.builder()
                .name("reviser agent")
                .model(openAiChatModel)
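                .description("Professional investigative journalist, skilled in critical thinking and in verifying information before it is published in highly trusted publications")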
                .instruction(REVISER_PROMPT_WORDS)
                .hooks(new ReviserAgentModelHook(), new ReviserAgentHook())
                .outputKey("reviser_agent_output")
                .saver(new MemorySaver())
                .build();
    }

    @Bean("answerAuditAgent")
    public FlowAgent answerAuditAgent(@Qualifier("openAiChatModel") ChatModel openAiChatModel) {
        return SequentialAgent.builder()
                .name("answer audit agent")
                .description("Evaluates the LLM-generated answer, uses the web, and revises the response so that it stays consistent with the real world")
                .subAgents(List.of(criticAgent(openAiChatModel), reviserAgent(openAiChatModel)))
                .saver(new MemorySaver())
                .build();
    }

The code that resumes execution is as follows:

    /**
     * Resumes execution after human review.
     */
    @CrossOrigin(origins = "*")
    @PostMapping(value = "/agent/stream/answerAudit/approval", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> resumeAfterApproval(@RequestBody ApprovalRequest request) {
        try {
       
            ...

            // Build a RunnableConfig that can resume execution
            RunnableConfig runnableConfig = RunnableConfig.builder()
                    .threadId(threadId)
                    .addHumanFeedback(updatedInterruptionMetadata)
                    .build();

            // Run
            return getServerSentEventFlux(answerAuditAgent.stream("", runnableConfig), threadId);
        } catch (Exception e) {
            ...
        }
    }

On resume, the user's review decision is assembled into an InterruptionMetadata and attached to the RunnableConfig via addHumanFeedback.

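How the human-in-the-loop flow executes

The call chain is simplified here; the following code locations are the ones that matter.

Graph construction in the parent agent

This is where SequentialAgent wires the parent and sub-agent nodes together. The shared FlowAgent build logic that precedes it is omitted; the main edge-building logic is shown below:

[Figure: two source-code screenshots of the SequentialAgent graph-building logic]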

Sub-agents build a new RunnableConfig

Below is the core code executed by SubCompiledGraphNode and by BaseAgent.as_node, respectively.

public record SubCompiledGraphNodeAction(String nodeId, CompileConfig parentCompileConfig,
		CompiledGraph subGraph) implements AsyncNodeActionWithConfig, ResumableSubGraphAction {

	@Override
	public CompletableFuture<Map<String, Object>> apply(OverAllState state, RunnableConfig config) {
		final boolean resumeSubgraph = config.metadata(resumeSubGraphId(nodeId), new TypeRef<Boolean>() {
		}).orElse(false);

		// config is the RunnableConfig passed in by the parent agent (SequentialAgent). A new RunnableConfig is
		// built from it here, with checkPointId, nextNode, and context cleared.
		RunnableConfig subGraphRunnableConfig = RunnableConfig.builder(config).checkPointId(null).nextNode(null).build();
		subGraphRunnableConfig.clearContext();
		....
        
        // Run the sub-agent's core logic with the newly built subGraphRunnableConfig
        var fluxStream = subGraph.graphResponseStream(state, subGraphRunnableConfig);
            
        ....

		return future;
	}
}

BaseAgent comes in two kinds: ReactAgent and A2A node (A2aNodeActionWithConfig). Their code follows, ReactAgent first.

public AgentToSubCompiledGraphNodeAdapter(String nodeId, boolean includeContents, boolean returnReasoningContents,
				CompiledGraph childGraph, String instruction, CompileConfig parentCompileConfig) {
			this.nodeId = nodeId;
			this.includeContents = includeContents;
			this.returnReasoningContents = returnReasoningContents;
			this.instruction = instruction;
			this.childGraph = childGraph;
			this.parentCompileConfig = parentCompileConfig;
		}


		@Override
		public Map<String, Object> apply(OverAllState parentState, RunnableConfig config) throws Exception {
			final boolean resumeSubgraph = config.metadata(resumeSubGraphId(nodeId), new TypeRef<Boolean>() {}).orElse(false);

            // Build the sub-agent's RunnableConfig
			RunnableConfig subGraphRunnableConfig = getSubGraphRunnableConfig(config);

            ...
                
            // Run the sub-agent's core logic with the newly built subGraphRunnableConfig
			subGraphResult = childGraph.graphResponseStream(stateForChild, subGraphRunnableConfig);
			...
		}

		/**
		 * Builds the sub-agent's RunnableConfig.
		 */
		private RunnableConfig getSubGraphRunnableConfig(RunnableConfig config) {
			// config is the RunnableConfig passed in by the parent agent (SequentialAgent). A new RunnableConfig is
			// built from it here, with checkPointId, nextNode, and context cleared.
			RunnableConfig subGraphRunnableConfig = RunnableConfig.builder(config)
					.checkPointId(null)
					.nextNode(null)
					.addMetadata("_AGENT_", subGraphId(nodeId)) // subGraphId is the same as the name of the agent that created it
					.build();
			subGraphRunnableConfig.clearContext();
			
			...

			return subGraphRunnableConfig;
		}
	}
}

public class A2aNodeActionWithConfig implements NodeActionWithConfig {

	@Override
	public Map<String, Object> apply(OverAllState state, RunnableConfig config) throws Exception {
        // Build the sub-agent's RunnableConfig
		RunnableConfig subGraphRunnableConfig = getSubGraphRunnableConfig(config);
		if (streaming) {
			AsyncGenerator<NodeOutput> generator = createStreamingGenerator(state, subGraphRunnableConfig);
			// Convert AsyncGenerator to Flux using the new toFlux() method
			Flux<GraphResponse<NodeOutput>> flux = toFlux(generator);
			return Map.of(StringUtils.hasLength(this.outputKeyToParent) ? this.outputKeyToParent : "messages", flux);
		}
		else {
			String requestPayload = buildSendMessageRequest(state, subGraphRunnableConfig);
			String resultText = sendMessageToServer(this.agentCard, requestPayload);
			Map<String, Object> resultMap = autoDetectAndParseResponse(resultText);
			Map<String, Object> result = (Map<String, Object>) resultMap.get("result");
			String responseText = extractResponseText(result);
			return Map.of(this.outputKeyToParent, responseText);
		}
	}

	private RunnableConfig getSubGraphRunnableConfig(RunnableConfig config) {
		if (shareState) {
			return config;
		}
        
		// config is the RunnableConfig passed in by the parent agent (SequentialAgent). A new RunnableConfig is
		// built from it here, with checkPointId and nextNode cleared.
		return RunnableConfig.builder(config)
				.threadId(config.threadId()
						.map(threadId -> format("%s_%s", threadId, subGraphId()))
						.orElseGet(this::subGraphId))
				.nextNode(null)
				.checkPointId(null)
				.build();
	}
}

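Metadata flows from the parent RunnableConfig to the child RunnableConfig

All three ways of building a subgraph create a new RunnableConfig seeded from the parent's RunnableConfig. As a result, the parent RunnableConfig's metadata is carried over into the child RunnableConfig.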

public final class RunnableConfig implements HasMetadata<RunnableConfig.Builder> {
		// Copy constructor of RunnableConfig.Builder
		Builder(RunnableConfig config) {
			// Calls the super constructor, which copies the metadata
			super(requireNonNull(config, "config cannot be null!").metadata);
			this.threadId = config.threadId;
			this.checkPointId = config.checkPointId;
			this.nextNode = config.nextNode;
			this.streamMode = config.streamMode;
			this.store = config.store;
			this.context = new ConcurrentHashMap<>(config.context);
		}

		// Constructor of the parent builder class, invoked via super(...) from RunnableConfig.Builder
		protected Builder(Map<String, Object> metadata) {
			// Copies the metadata into a new map
			if (metadata != null && !metadata.isEmpty()) {
				this.metadata = new HashMap<>(metadata);
			}
		}
}
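
The copy behavior above can be reproduced in isolation. The following is a minimal sketch, not the library's code: SketchConfig, forSubGraph, and the key value "HUMAN_FEEDBACK" are hypothetical stand-ins for RunnableConfig, the RunnableConfig.builder(config).checkPointId(null) call, and HUMAN_FEEDBACK_METADATA_KEY. It only shows that resetting the checkpoint id does nothing to the copied metadata.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Simplified stand-in for RunnableConfig; NOT the real Spring AI Alibaba class.
final class SketchConfig {
    static final String HUMAN_FEEDBACK_KEY = "HUMAN_FEEDBACK"; // hypothetical key value

    final String threadId;
    final String checkPointId;
    final Map<String, Object> metadata;

    SketchConfig(String threadId, String checkPointId, Map<String, Object> metadata) {
        this.threadId = threadId;
        this.checkPointId = checkPointId;
        this.metadata = new HashMap<>(metadata); // copied into a new map, like the copy-builder
    }

    // Models RunnableConfig.builder(config).checkPointId(null).build():
    // the checkpoint id is reset, the metadata is copied unchanged.
    SketchConfig forSubGraph() {
        return new SketchConfig(threadId, null, metadata);
    }

    Optional<Object> metadata(String key) {
        return Optional.ofNullable(metadata.get(key));
    }
}

class MetadataPropagationDemo {
    public static void main(String[] args) {
        SketchConfig parent = new SketchConfig("thread-1", "cp-42",
                Map.of(SketchConfig.HUMAN_FEEDBACK_KEY, "approved"));
        SketchConfig child = parent.forSubGraph();

        System.out.println("child checkPointId = " + child.checkPointId);
        System.out.println("child has feedback = "
                + child.metadata(SketchConfig.HUMAN_FEEDBACK_KEY).isPresent());
    }
}
```

In this sketch the child ends up with a null checkpoint id but still carries the feedback entry, which is the precondition for the failure analyzed later in the article.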

Interruption logic

public class NodeExecutor extends BaseGraphExecutor {

	/**
	 * Executes a node and handles its result.
	 * @param context the graph runner context
	 * @param resultValue the atomic reference to store the result value
	 * @return Flux of GraphResponse with node execution result
	 */
	private Flux<GraphResponse<NodeOutput>> executeNode(GraphRunnerContext context,
			AtomicReference<Object> resultValue) {
		try {
			context.setCurrentNodeId(context.getNextNodeId());
			String currentNodeId = context.getCurrentNodeId();
			AsyncNodeActionWithConfig action = context.getNodeAction(currentNodeId);

			if (action == null) {
				return Flux.just(GraphResponse.error(RunnableErrors.missingNode.exception(currentNodeId)));
			}

			if (action instanceof InterruptableAction) {
				// The node is interruptable
				context.getConfig().metadata(RunnableConfig.STATE_UPDATE_METADATA_KEY).ifPresent(updateFromFeedback -> {
					if (updateFromFeedback instanceof Map<?, ?>) {
						context.mergeIntoCurrentState((Map<String, Object>) updateFromFeedback);
					} else {
						throw new RuntimeException();
					}
				});
				// Decide whether to interrupt by calling InterruptableAction.interrupt. If an interruption is
				// needed, execution ends here and the InterruptionMetadata is returned.
				Optional<InterruptionMetadata> interruptMetadata = ((InterruptableAction) action)
					.interrupt(currentNodeId, context.cloneState(context.getCurrentStateData()), context.getConfig());
				if (interruptMetadata.isPresent()) {
					resultValue.set(interruptMetadata.get());
					return Flux.just(GraphResponse.done(interruptMetadata.get()));
				}
			}

			context.doListeners(NODE_BEFORE, null);

			// Run the node's core logic
			CompletableFuture<Map<String, Object>> future = action.apply(context.getOverallState(),
					context.getConfig());

			// Handle the result
			return Mono.fromFuture(future)
					.flatMapMany(updateState -> handleActionResult(context, updateState, resultValue))
					.onErrorResume(error -> {
						context.doListeners(ERROR, new Exception(error));
						return Flux.just(GraphResponse.error(error));
					});

		}
		catch (Exception e) {
			return Flux.just(GraphResponse.error(e));
		}
	}    
}

public class HumanInTheLoopHook extends ModelHook implements AsyncNodeActionWithConfig, InterruptableAction {

	/**
	 * Checks whether any tool call needs review and whether review feedback is present.
	 *
	 * @param nodeId The identifier of the current node being processed.
	 * @param state The current state of the agent.
	 * @param config The runnable configuration.
	 * @return
	 */
	@Override
	public Optional<InterruptionMetadata> interrupt(String nodeId, OverAllState state, RunnableConfig config) {
		AssistantMessage lastMessage = getLastAssistantMessage(state);

		if (lastMessage == null || !lastMessage.hasToolCalls()) {
			return Optional.empty();
		}

		// Check whether review feedback is present
		Optional<Object> feedback = config.metadata(RunnableConfig.HUMAN_FEEDBACK_METADATA_KEY);
		if (feedback.isPresent()) {
			if (!(feedback.get() instanceof InterruptionMetadata)) {
				throw new IllegalArgumentException("Human feedback metadata must be of type InterruptionMetadata.");
			}

			// Check whether the feedback covers the pending tool calls
			if (!validateFeedback((InterruptionMetadata) feedback.get(), lastMessage.getToolCalls())) {
				// Feedback is missing or incomplete: build the InterruptionMetadata and return it
				return buildInterruptionMetadata(state, lastMessage);
			}
			return Optional.empty();
		}

		// 2. If last message is AssistantMessage
		// Build the InterruptionMetadata
		return buildInterruptionMetadata(state, lastMessage);
	}

	// Builds the InterruptionMetadata
	private Optional<InterruptionMetadata> buildInterruptionMetadata(OverAllState state, AssistantMessage lastMessage) {
		// Whether an interruption is needed
		boolean needsInterruption = false;
		InterruptionMetadata.Builder builder = InterruptionMetadata.builder(Hook.getFullHookName(this), state);
		for (AssistantMessage.ToolCall toolCall : lastMessage.getToolCalls()) {
			// If a requested tool is one that requires human approval, add its review info and mark the step as needing review
			if (approvalOn.containsKey(toolCall.name())) {
				ToolConfig toolConfig = approvalOn.get(toolCall.name());
				String description = toolConfig.getDescription();
				String content = "The AI is requesting to use the tool: " + toolCall.name() + ".\n"
						+ (description != null ? ("Description: " + description + "\n") : "")
						+ "With the following arguments: " + toolCall.arguments() + "\n"
						+ "Do you approve?";
				// TODO, create a designated tool metadata field in InterruptionMetadata?
				builder.addToolFeedback(ToolFeedback.builder().id(toolCall.id())
								.name(toolCall.name()).description(content).arguments(toolCall.arguments()).build())
						.build();
				needsInterruption = true;
			} else {
				builder.addToolsAutomaticallyApproved(toolCall);
			}
		}
		return needsInterruption ? Optional.of(builder.build()) : Optional.empty();
	}

	/**
	 * Validates the review feedback.
	 *
	 * @param feedback		the review feedback
	 * @param toolCalls		the tool calls of the last assistant message
	 * @return
	 */
    private boolean validateFeedback(InterruptionMetadata feedback, List<AssistantMessage.ToolCall> toolCalls) {
        if (feedback == null || feedback.toolFeedbacks() == null || feedback.toolFeedbacks().isEmpty()) {
            return false;
        }

        List<InterruptionMetadata.ToolFeedback> toolFeedbacks = feedback.toolFeedbacks();

        // 1. Tool calls in this step that actually require human approval (names defined in approvalOn)
        // Tool calls from the last step that must be intercepted
        List<AssistantMessage.ToolCall> toolCallsNeedingApproval = toolCalls.stream()
                .filter(tc -> approvalOn.containsKey(tc.name()))
                .toList();

        // If no tool calls in this step require human approval, validation is trivially satisfied
        if (toolCallsNeedingApproval.isEmpty()) {
            return true;
        }

        // 2. For each tool call requiring approval, ensure corresponding feedback exists and its result is non-null
        for (AssistantMessage.ToolCall call : toolCallsNeedingApproval) {
            // Look up the matching feedback
            InterruptionMetadata.ToolFeedback matchedFeedback = toolFeedbacks.stream()
                    .filter(tf -> tf.getName().equals(call.name())
                            // Also validate id if ToolFeedback contains id field
                            && call.id().equals(tf.getId()))
                    .findFirst()
                    .orElse(null);

            if (matchedFeedback == null) {
                log.warn("Missing feedback for tool {} (id={}); waiting for human input.",
                        call.name(), call.id());
                return false;
            }

            // Ensure the feedback result is provided
            // Any non-null result passes this pre-check; the actual decision is evaluated in the later check
            if (matchedFeedback.getResult() == null) {
                log.warn("Feedback result for tool {} (id={}) is null; waiting for human input.",
                        call.name(), call.id());
                return false;
            }
        }

        // 3. Optional: log unexpected or extra feedback entries that do not match any pending approval tool
        for (InterruptionMetadata.ToolFeedback tf : toolFeedbacks) {
            boolean matched = toolCallsNeedingApproval.stream()
                    .anyMatch(call -> call.name().equals(tf.getName()) && call.id().equals(tf.getId()));
            if (!matched) {
                log.warn("Ignoring unexpected tool feedback: name={}, id={}", tf.getName(), tf.getId());
            }
        }

        return true;
    }
}

Saving the checkpoint

public class NodeExecutor extends BaseGraphExecutor {
	private Flux<GraphResponse<NodeOutput>> handleActionResult(GraphRunnerContext context,
			Map<String, Object> updateState, AtomicReference<Object> resultValue) {
		try {

 			...
			// Save the checkpoint to the saver
			NodeOutput output = context.buildNodeOutputAndAddCheckpoint(updateState);

            ...
            
			context.doListeners(NODE_AFTER, null);
			// Recursively call the main execution handler
			return Flux.just(GraphResponse.of(output))
				.concatWith(Flux.defer(() -> mainGraphExecutor.execute(context, resultValue)));
		}
		catch (Exception e) {
			return Flux.just(GraphResponse.error(e));
		}
	}
}



public class GraphRunnerContext {
	/**
	 * FIXME
	 * Below are duplicated methods. Need to have a unified way of streaming output
	 * to end user.
	 */
	public NodeOutput buildNodeOutputAndAddCheckpoint(Map<String, Object> updateStates) throws Exception {
		// Add a checkpoint
		Optional<Checkpoint> cp = addCheckpoint(currentNodeId, nextNodeId);
		// Build the NodeOutput
		return buildOutput(currentNodeId, updateStates, cp, false);
	}


	public Optional<Checkpoint> addCheckpoint(String nodeId, String nextNodeId) throws Exception {
		// Is a checkpoint saver configured?
		if (compiledGraph.compileConfig.checkpointSaver().isPresent()) {
			// Store the current node, the overall state, and the next node in the checkpoint
			var cp = Checkpoint.builder().nodeId(nodeId).state(cloneState(overallState.data())).nextNodeId(nextNodeId)
					.build();
			// Force checkPointId to null to ensure we append a new checkpoint instead of
			// replacing the current one
			// Build a new RunnableConfig
			RunnableConfig appendConfig = RunnableConfig.builder(config).checkPointId(null).build();
			// Add the checkpoint to the saver
			this.config = compiledGraph.compileConfig.checkpointSaver().get().put(appendConfig, cp);
			return Optional.of(cp);
		}
		return Optional.empty();
	}
    
    
}

Resuming after an interruption

	public GraphRunnerContext(OverAllState initialState, RunnableConfig config, CompiledGraph compiledGraph)
			throws Exception {
		this.compiledGraph = compiledGraph;
		this.config = config;

		// Enter the resume path if the RunnableConfig metadata contains the HITL key or checkPointId is set
		if (config.metadata(RunnableConfig.HUMAN_FEEDBACK_METADATA_KEY).isPresent() || config.checkPointId().isPresent()) {
			initializeFromResume(initialState, config);
		} else {
			// Otherwise take the fresh-start path
			initializeFromStart(initialState, config);
		}
	}

	/**
	 * Resumes execution.
	 *
	 * @param initialState
	 * @param config
	 */
	private void initializeFromResume(OverAllState initialState, RunnableConfig config) {
		log.trace("RESUME REQUEST");

		// Get the checkpoint saver from the compiled graph
		var saver = compiledGraph.compileConfig.checkpointSaver()
				.orElseThrow(() -> new IllegalStateException("Resume request without a configured checkpoint saver!"));
		// Load the checkpoint from the saver: the saver looks up the checkpoints stored for the threadId, matches on
		// the checkpoint id if the RunnableConfig carries one, and otherwise falls back to the one returned by getLast
		var checkpoint = saver.get(config)
				.orElseThrow(() -> new IllegalStateException("Resume request without a valid checkpoint!"));

		var startCheckpointNextNodeAction = compiledGraph.getNodeAction(checkpoint.getNextNodeId());
		if (startCheckpointNextNodeAction instanceof ResumableSubGraphAction resumableAction) {
			// RESUME FORM SUBGRAPH DETECTED
			this.config = RunnableConfig.builder(config)
					.checkPointId(null) // Reset checkpoint id
					.addMetadata(resumableAction.getResumeSubGraphId(), true) // add metadata for
					// sub graph
					.build();
			this.config.clearContext();
		} else {
			// Reset checkpoint id
			this.config = config.withCheckPointId(null);
		}

		this.currentNodeId = null;
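		// Use the checkpoint's nextNodeId as the next node, i.e. resume from where execution was interrupted
		this.nextNodeId = checkpoint.getNextNodeId();
		// Merge the stored state into this run's overallState, i.e. restore the state at the interruption point
		this.overallState = initialState.input(checkpoint.getState());
		// Node to resume from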
		this.resumeFrom = checkpoint.getNodeId();

		log.trace("RESUME FROM {}", checkpoint.getNodeId());
	}

How the checkpoint is fetched from the saver

public class MemorySaver implements BaseCheckpointSaver {
	final Map<String, LinkedList<Checkpoint>> _checkpointsByThread = new HashMap<>();

	@Override
	public final Optional<Checkpoint> get(RunnableConfig config) {

		try {
			return loadOrInitCheckpoints(config, checkpoints -> {
				// If the RunnableConfig carries a checkPointId, match on it and return
				if (config.checkPointId().isPresent()) {
					return config.checkPointId()
							.flatMap(id -> checkpoints.stream()
									.filter(checkpoint -> checkpoint.getId().equals(id))
									.findFirst());
				}
                
				// Otherwise return the stored checkpoint selected by getLast
				return getLast(checkpoints, config);

			});
		}
		catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	protected final <T> T loadOrInitCheckpoints(RunnableConfig config,
			TryFunction<LinkedList<Checkpoint>, T, Exception> transformer) throws Exception {
		_lock.lock();
		try {
			var threadId = config.threadId().orElse(THREAD_ID_DEFAULT);
			// Look up the stored checkpoints by the threadId in the RunnableConfig
			return transformer.tryApply(loadedCheckpoints(config, _checkpointsByThread.computeIfAbsent(threadId, k -> new LinkedList<>())));

		}
		finally {
			_lock.unlock();
		}
	}

	protected LinkedList<Checkpoint> loadedCheckpoints(RunnableConfig config, LinkedList<Checkpoint> checkpoints) throws Exception {
		return checkpoints;
	}
}
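
The lookup rules above (per-thread list, exact match when an id is given, fallback otherwise) can be sketched with plain collections. This is an illustrative model only: SketchCheckpointStore and its methods are hypothetical, checkpoints are reduced to their ids, and keeping the newest entry at the head of the list is an assumption of this sketch rather than a statement about MemorySaver's internals.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified per-thread checkpoint store; not the real MemorySaver.
final class SketchCheckpointStore {
    private final Map<String, LinkedList<String>> checkpointIdsByThread = new HashMap<>();

    // Assumption of this sketch: the newest checkpoint id is pushed to the head of the list.
    void put(String threadId, String checkpointId) {
        checkpointIdsByThread.computeIfAbsent(threadId, k -> new LinkedList<>()).push(checkpointId);
    }

    // With an explicit id: exact match only. Without one: the head entry for the thread, if any.
    Optional<String> get(String threadId, String checkpointId) {
        LinkedList<String> ids = checkpointIdsByThread.getOrDefault(threadId, new LinkedList<>());
        if (checkpointId != null) {
            return ids.stream().filter(checkpointId::equals).findFirst();
        }
        return Optional.ofNullable(ids.peek());
    }
}

class CheckpointLookupDemo {
    public static void main(String[] args) {
        SketchCheckpointStore criticStore = new SketchCheckpointStore();
        criticStore.put("thread-1", "cp-1");
        criticStore.put("thread-1", "cp-2");
        SketchCheckpointStore reviserStore = new SketchCheckpointStore(); // never saved anything

        System.out.println(criticStore.get("thread-1", null));   // head entry of the thread
        System.out.println(criticStore.get("thread-1", "cp-1")); // exact match by id
        System.out.println(reviserStore.get("thread-1", null));  // nothing stored in this store
    }
}
```

The point to notice is the last call: a store that has never received a checkpoint for the thread returns an empty Optional, and in initializeFromResume that empty result is what turns into the exception.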

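Root cause in depth

1. The core path: how RunnableConfig travels from parent to child agent

When the parent agent (SequentialAgent) runs, a new RunnableConfig is created for each sub-agent, but the child config receives a full copy of the parent config's metadata, including the HITL entry stored under HUMAN_FEEDBACK_METADATA_KEY: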

// Core logic of the RunnableConfig copy constructor
public final class RunnableConfig implements HasMetadata<RunnableConfig.Builder> {
    Builder(RunnableConfig config) {
        super(requireNonNull(config, "config cannot be null!").metadata); // copies the metadata
        this.threadId = config.threadId;
        this.checkPointId = config.checkPointId;
        this.nextNode = config.nextNode;
        // remaining fields are copied as well...
    }
}

When a sub-agent node creates its new config (SubCompiledGraphNode, ReactAgent, or A2aNode), it clears only checkPointId and nextNode and keeps the parent config's metadata:

// Example: building the sub-agent config (SubCompiledGraphNode)
RunnableConfig subGraphRunnableConfig = RunnableConfig.builder(config)
        .checkPointId(null)
        .nextNode(null)
        .build();
subGraphRunnableConfig.clearContext();

2. How the resume decision is made

When GraphRunnerContext is initialized, it enters the resume path if the config contains HUMAN_FEEDBACK_METADATA_KEY (the marker for human review feedback) or a checkPointId:

public GraphRunnerContext(OverAllState initialState, RunnableConfig config, CompiledGraph compiledGraph) {
    if (config.metadata(HUMAN_FEEDBACK_METADATA_KEY).isPresent() || config.checkPointId().isPresent()) {
        initializeFromResume(initialState, config); // resume path
    } else {
        initializeFromStart(initialState, config); // fresh start
    }
}

The resume path reads the checkpoint for the current threadId from the saver:

private void initializeFromResume(OverAllState initialState, RunnableConfig config) {
    var saver = compiledGraph.compileConfig.checkpointSaver()
            .orElseThrow(() -> new IllegalStateException("Resume request without a configured checkpoint saver!"));
    // Key step: fetch the checkpoint from the saver; if none is found, the exception in question is thrown
    var checkpoint = saver.get(config)
            .orElseThrow(() -> new IllegalStateException("Resume request without a valid checkpoint!"));
    // Restoring the node and state follows...
}

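3. The essence of the error: metadata "pollutes" the next sub-agent

After the first sub-agent (critic agent) has been reviewed and resumed, the HUMAN_FEEDBACK_METADATA_KEY entry in the parent config is still passed into the config of the second sub-agent (reviser agent).

When the second sub-agent initializes its GraphRunnerContext, it wrongly concludes that this is a resume request. Its saver holds no matching checkpoint, because it has never been interrupted and has never saved anything, so Resume request without a valid checkpoint! is thrown.

Summary

The parent agent's HITL metadata is passed to every sub-agent → a sub-agent that was never interrupted enters the resume path → no matching checkpoint exists → the exception is thrown.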
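
The chain summarized above can be replayed with a toy model. Everything in the following sketch is hypothetical (the class, the initContext helper, the "HUMAN_FEEDBACK" key value, and the use of plain maps as savers); it only mirrors the decision "feedback metadata or checkpoint id present means resume" and the lookup that follows.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the failure chain; all names here are hypothetical.
class ResumeMisfireDemo {
    static final String FEEDBACK_KEY = "HUMAN_FEEDBACK"; // stand-in for HUMAN_FEEDBACK_METADATA_KEY

    // Mimics the constructor decision: feedback metadata or a checkpoint id means "resume".
    static String initContext(Map<String, Object> metadata, String checkPointId,
                              Map<String, String> checkpointsByThread, String threadId) {
        boolean resume = metadata.containsKey(FEEDBACK_KEY) || checkPointId != null;
        if (!resume) {
            return "START";
        }
        String checkpoint = checkpointsByThread.get(threadId);
        if (checkpoint == null) {
            throw new IllegalStateException("Resume request without a valid checkpoint!");
        }
        return "RESUME from " + checkpoint;
    }

    public static void main(String[] args) {
        Map<String, Object> parentMetadata = new HashMap<>();
        parentMetadata.put(FEEDBACK_KEY, "approved");

        Map<String, String> criticSaver = new HashMap<>();
        criticSaver.put("thread-1", "cp-critic"); // saved when the critic agent was interrupted
        Map<String, String> reviserSaver = new HashMap<>(); // the reviser agent never saved anything

        // Each sub-agent gets a copy of the parent metadata with checkPointId reset to null.
        System.out.println(initContext(new HashMap<>(parentMetadata), null, criticSaver, "thread-1"));
        try {
            initContext(new HashMap<>(parentMetadata), null, reviserSaver, "thread-1");
        } catch (IllegalStateException e) {
            System.out.println("reviser agent failed: " + e.getMessage());
        }
    }
}
```

In this model the critic agent resumes normally because its saver has an entry for the thread, while the reviser agent takes the same resume branch with an empty saver and fails with the message seen in the stack trace.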

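Solution

Core idea

The source shows that the framework invokes lifecycle listeners at several points during node execution. We can therefore add a check in the after method of a listener on the parent agent: each time a sub-agent node finishes, clear RunnableConfig.HUMAN_FEEDBACK_METADATA_KEY from the parent agent's config, so the resume feedback no longer pollutes the next sub-agent.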

	/**
	 * Handles the action result and returns appropriate response.
	 * @param context the graph runner context
	 * @param updateState the updated state from the action
	 * @param resultValue the atomic reference to store the result value
	 * @return Flux of GraphResponse with action result handling
	 */
	private Flux<GraphResponse<NodeOutput>> handleActionResult(GraphRunnerContext context,
			Map<String, Object> updateState, AtomicReference<Object> resultValue) {
			...

			// Save the checkpoint to the saver
			NodeOutput output = context.buildNodeOutputAndAddCheckpoint(updateState);

			// Invoke Listener.after
			context.doListeners(NODE_AFTER, null);
			
            ...
	}

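Key points

  • The listener's after method fires after each node finishes, so the HITL metadata is cleared exactly when a sub-agent node completes;
  • A sub-agent builds a new RunnableConfig and resets checkPointId when it runs, so no extra handling is needed there;
  • Only the parent agent's metadata is cleared; the sub-agent's own execution logic is unaffected.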
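
The effect of the listener can be illustrated with a small simulation. This is a sketch under stated assumptions, not the framework's execution loop: runSequential, the BiConsumer used as an "after node" callback, and the "HUMAN_FEEDBACK" key value are all hypothetical, and the parent metadata is modeled as a mutable map shared with the callback.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Toy model of the fix; every name here is hypothetical.
class ClearFeedbackListenerDemo {
    static final String FEEDBACK_KEY = "HUMAN_FEEDBACK";

    // Runs sub-agents in order. Each one sees a copy of the parent metadata and is
    // recorded as RESUME or START; afterNode is invoked once the node has finished.
    static List<String> runSequential(List<String> subAgents, Map<String, Object> parentMetadata,
                                      BiConsumer<String, Map<String, Object>> afterNode) {
        List<String> trace = new ArrayList<>();
        for (String agent : subAgents) {
            Map<String, Object> childMetadata = new HashMap<>(parentMetadata);
            trace.add(agent + ":" + (childMetadata.containsKey(FEEDBACK_KEY) ? "RESUME" : "START"));
            afterNode.accept(agent, parentMetadata);
        }
        return trace;
    }

    public static void main(String[] args) {
        List<String> agents = List.of("critic agent", "reviser agent");

        Map<String, Object> withoutListener = new HashMap<>();
        withoutListener.put(FEEDBACK_KEY, "approved");
        System.out.println(runSequential(agents, withoutListener, (node, md) -> { }));

        Map<String, Object> withListener = new HashMap<>();
        withListener.put(FEEDBACK_KEY, "approved");
        // The callback removes the feedback entry from the parent metadata after a sub-agent node.
        System.out.println(runSequential(agents, withListener,
                (node, md) -> { if (agents.contains(node)) md.remove(FEEDBACK_KEY); }));
    }
}
```

Without the callback both sub-agents are classified as RESUME; with it, the critic agent still resumes but the reviser agent is classified as START, which is the behavior the listener in this article aims for.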

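Further questions

1. What about AgentHook.after?

Could we add an AgentHook.after to the sub-agent and clear RunnableConfig.HUMAN_FEEDBACK_METADATA_KEY there?

The answer is no.

The parent agent's RunnableConfig and the sub-agent's RunnableConfig are separate objects. AgentHook.after can only act on the sub-agent's metadata, so it cannot clear RunnableConfig.HUMAN_FEEDBACK_METADATA_KEY in the parent agent's config.

2. What about AgentHook.before?

Could we add an AgentHook.before to the sub-agent and clear RunnableConfig.HUMAN_FEEDBACK_METADATA_KEY there?

The answer is no.

The exception is thrown while GraphRunnerContext is being initialized, and node execution depends on GraphRunnerContext. By the time the exception is thrown, AgentHook.before has not run yet.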

References

  • Spring AI Alibaba official issue: https://github.com/alibaba/spring-ai-alibaba/issues/4466 (I have posted this solution as a reply under the issue)
  • Spring AI Alibaba multi-agent execution source: core classes in the com.alibaba.cloud.ai.graph package (GraphRunnerContext, RunnableConfig, NodeExecutor, and others)