1. 概述

AgentLlmNodeAgentToolNodeReactAgent 的两大核心节点,共同实现了 ReAct (Reasoning + Acting) 执行范式:

节点 角色 职责 接口
AgentLlmNode 推理节点 (Reasoning) 消息组装、模板渲染、拦截器链调用、LLM 模型调用、流式/非流式响应处理 NodeActionWithConfig
AgentToolNode 行动节点 (Acting) 工具解析、拦截器链调用、同步/异步工具执行、并行调度、状态收集与合并 NodeActionWithConfig

ReactAgent 中的位置:

START → beforeAgent Hooks → beforeModel Hooks → [AgentLlmNode] → afterModel Hooks
   ↑                                                         ↓
   │                                              (有工具调用?)
   │                                                    ↓ Yes
   └──────────────────── [AgentToolNode] ←──────────────┘
                                ↓ (无工具调用 / return_direct)
                          afterAgent Hooks → END

两者均实现 NodeActionWithConfig 接口:

Map<String, Object> apply(OverAllState state, RunnableConfig config) throws Exception

接收共享状态 OverAllState 和运行配置 RunnableConfig,返回部分状态更新 Map,由图引擎根据 KeyStrategy 合并返回全局状态。

1.1 AgentLlmNode

负责【调用大语言模型】的核心执行节点,实现 NodeActionWithConfig 接口,作为图工作流的可执行节点,接收状态 + 配置并执行 LLM 推理。

核心依赖:

public class AgentLlmNode implements NodeActionWithConfig {

	// 日志对象,用于节点执行、模型调用、异常等日志打印
	private static final Logger logger = LoggerFactory.getLogger(AgentLlmNode.class);

	// 模型迭代次数字段KEY(存储在运行配置上下文,用于记录工具多轮调用/推理轮次)
	public static final String MODEL_ITERATION_KEY = "_MODEL_ITERATION_";

	// ==================== 基础配置字段 ====================
	// 智能体名称(标识当前LLM节点所属的智能体,用于日志、监控区分)
	private String agentName;

	// Spring AI 顾问列表,用于增强LLM请求处理逻辑(如上下文管理、参数增强)
	private List<Advisor> advisors = new ArrayList<>();

	// FIXME: 工具回调集合应仅在 chatOptions 中统一管理
	//  当前通过不可变列表保证不可变性,后续需重构为仅依赖chatOptions管理工具
	// 智能体可调用的工具回调列表(LLM可调用的外部工具集合)
	private List<ToolCallback> toolCallbacks = new ArrayList<>();

	// 模型调用拦截器列表,支持在LLM调用前后做扩展处理(如参数修改、日志、重试、工具选择)
	private List<ModelInterceptor> modelInterceptors = new ArrayList<>();

	// 输出结果存储的状态KEY(默认值:messages,用于指定LLM响应写入全局状态的字段)
	private String outputKey;

	// 输出格式约束(JSON Schema/结构化格式,强制LLM按指定格式返回结果)
	private String outputSchema;

	// Spring AI 核心对话客户端,真正执行大模型调用的底层组件
	private ChatClient chatClient;

	// 系统提示词(LLM的角色定义、行为约束、核心指令)
	private String systemPrompt;

	// 提示词模板渲染器,用于渲染动态变量的提示词模板
	private TemplateRenderer templateRenderer;

	// 智能体执行指令(补充性指令,与systemPrompt配合使用)
	private String instruction;

	// LLM调用配置(温度、top_p、最大token、工具调用开关等模型参数)
	private ToolCallingChatOptions chatOptions;

	// 是否开启推理日志(开启后会打印详细的LLM推理过程、输入输出、工具调用日志,用于调试)
	private boolean enableReasoningLog;

完整配置构建示例:

AgentLlmNode llmNode = AgentLlmNode.builder()
    .agentName(name)                          // Agent 名称
    .chatOptions(effectiveOptions)            // 合并后的 ChatOptions
    .chatClient(chatClient)                   // ChatClient 实例
    .outputKey(outputKey)                     // 可选,自定义输出键
    .systemPrompt(systemPrompt)               // 系统提示词
    .templateRenderer(templateRenderer)       // 模板渲染器
    .instruction(instruction)                 // Agent 指令
    .outputSchema(outputSchema)               // 输出格式约束
    .toolCallbacks(unmodifiableList(allTools)) // 工具列表(不可变)
    .enableReasoningLog(enableLogging)        // 推理日志开关
    .build();

1.2 AgentToolNode

执行助手消息中工具调用的节点。

核心特性:

  • 支持串行与并行两种工具执行模式。开启并行执行后,多个工具可在配置限制内并发运行。
  • 自动识别实现 AsyncToolCallback 的异步工具,并以异步方式执行,同时提供完善的超时机制处理。

核心依赖:

public class AgentToolNode implements NodeActionWithConfig {

	/** 日志记录器,用于工具执行、异常、运行日志输出 */
	private static final Logger logger = LoggerFactory.getLogger(AgentToolNode.class);

	/** 当前工具节点所属智能体名称 */
	private final String agentName;

	/** 是否开启工具行为执行日志 */
	private final boolean enableActingLog;

	/** 是否开启工具并行执行能力 */
	private final boolean parallelToolExecution;

	/** 最大并行执行工具数量上限 */
	private final int maxParallelTools;

	/** 工具全局执行超时时间 */
	private final Duration toolExecutionTimeout;

	/** 是否将同步工具包装为异步统一执行 */
	private final boolean wrapSyncToolsAsAsync;

	/** 已注册的工具回调实现集合 */
	private List<ToolCallback> toolCallbacks;

	/** 工具执行上下文,用于跨工具传递共享参数 */
	private Map<String, Object> toolContext;

	/** 工具调用拦截器集合,用于工具执行前后自定义扩展逻辑 */
	private List<ToolInterceptor> toolInterceptors = new ArrayList<>();

	/** 工具解析器:根据工具名称动态匹配、查找对应工具实现 */
	private ToolCallbackResolver toolCallbackResolver;

	/** 工具执行异常处理器:统一捕获、处理、包装工具运行时异常 */
	private ToolExecutionExceptionProcessor toolExecutionExceptionProcessor;

}

完整配置构建示例:

AgentToolNode toolNode = AgentToolNode.builder()
    .agentName(name)                                   // Agent 名称
    .parallelToolExecution(parallelToolExecution)       // 是否并行执行
    .maxParallelTools(maxParallelTools)                 // 最大并行数(默认5)
    .toolExecutionTimeout(toolExecutionTimeout)         // 工具超时(默认5分钟)
    .wrapSyncToolsAsAsync(wrapSyncToolsAsAsync)         // 同步工具自动包装为异步
    .toolCallbackResolver(resolver)                     // 工具解析器
    .toolCallbacks(allTools)                            // 工具列表
    .enableActingLog(enableLogging)                     // 行动日志开关
    .toolExecutionExceptionProcessor(processor)         // 异常处理器
    .toolContext(toolContext)                            // 工具上下文
    .build();

Builder 默认值:

参数 默认值 说明
parallelToolExecution false 默认串行执行
maxParallelTools 5 并行上限,防止资源耗尽
toolExecutionTimeout 5 minutes 单个工具执行超时
wrapSyncToolsAsAsync false 默认不包装同步工具
toolExecutionExceptionProcessor DefaultToolExecutionExceptionProcessor(alwaysThrow=false) 异常时不抛出,返回错误消息

2. 加载流程

2.1 构建入口

两个节点的构建统一在 DefaultBuilder.build() 中完成,流程如下:

DefaultBuilder.build()
  │
  ├─ 1. 参数校验(name 非空,chatClient/model 至少提供一个)
  │
  ├─ 2. 合并 ChatOptions
  │     mergeSourceOptionsWithAgentOptions(sourceOptions, agentOptions)
  │     → agentOptions 覆盖 sourceOptions 同名字段
  │
  ├─ 3. 构建 ChatClient
  │     - 若无 chatClient:ChatClient.builder(model).defaultOptions(effectiveOptions).build()
  │     - 若有 chatClient:chatClient.mutate().defaultOptions(effectiveOptions).build()
  │
  ├─ 4. 拦截器分类:separateInterceptorsByType()
  │     - 统一 Interceptor 列表 → 按类型分为 ModelInterceptor[] 和 ToolInterceptor[]
  │
  ├─ 5. 工具收集:gatherLocalTools()
  │     - 按优先级合并:Hook 工具 > Interceptor 工具 > 用户工具
  │     - 来源:直接 tools / ToolCallbackProvider / toolNames 解析 / resolver 反射提取
  │
  ├─ 6. 构建 AgentLlmNode(详见 2.2)
  │
  ├─ 7. 构建 AgentToolNode(详见 2.3)
  │
  └─ 8. 创建 ReactAgent(llmNode, toolNode, compileConfig, builder)
        → 注入拦截器到节点
        → 设置 hasTools 标志

2.2 构建 AgentLlmNode

DefaultBuilder.build() 中初始化 AgentLlmNode 建造者,依次设置参数:

// 初始化 AgentLlmNode 建造者,配置基础核心参数
AgentLlmNode.Builder llmNodeBuilder = AgentLlmNode.builder()
		.agentName(this.name)           // 设置智能体名称
		.chatOptions(effectiveOptions)   // 设置生效的大模型调用配置
		.chatClient(chatClient);         // 设置Spring AI对话客户端

// 条件配置:输出结果存储的状态Key(非空时设置)
if (outputKey != null && !outputKey.isEmpty()) {
	llmNodeBuilder.outputKey(outputKey);
}

// 条件配置:系统提示词(非空时设置)
if (systemPrompt != null) {
	llmNodeBuilder.systemPrompt(systemPrompt);
}

// 条件配置:提示词模板渲染器(非空时设置)
if (templateRenderer != null) {
	llmNodeBuilder.templateRenderer(templateRenderer);
}

// 条件配置:智能体执行指令(非空时设置)
if (instruction != null) {
	llmNodeBuilder.instruction(instruction);
}

// 初始化输出格式约束Schema
String outputSchema = null;
// 优先使用自定义的输出Schema
if (StringUtils.hasLength(this.outputSchema)) {
	outputSchema = this.outputSchema;
} 
// 无自定义Schema时,根据输出类型自动生成格式约束
else if (this.outputType != null) {
	FormatProvider formatProvider = new BeanOutputConverter<>(this.outputType);
	outputSchema = formatProvider.getFormat();
}

// 条件配置:输出格式Schema(非空时设置,强制LLM按格式返回)
if (StringUtils.hasLength(outputSchema)) {
	llmNodeBuilder.outputSchema(outputSchema);
}

// 条件配置:工具回调列表(存在可用工具时,设置不可变列表保证线程安全)
if (CollectionUtils.isNotEmpty(allTools)) {
	llmNodeBuilder.toolCallbacks(Collections.unmodifiableList(allTools));
}

// 条件配置:开启推理日志(日志开关开启时启用)
if (enableLogging) {
	llmNodeBuilder.enableReasoningLog(true);
}

// 构建最终的 AgentLlmNode 实例
AgentLlmNode llmNode = llmNodeBuilder.build();

构造函数:

	public AgentLlmNode(Builder builder) {
		this.agentName = builder.agentName;
		this.outputKey = builder.outputKey;
		this.outputSchema = builder.outputSchema;
		this.systemPrompt = builder.systemPrompt;
		this.instruction = builder.instruction;
		this.templateRenderer = builder.templateRenderer;
		if (builder.advisors != null) {
			this.advisors = builder.advisors;
		}
		if (builder.toolCallbacks != null) {
			this.toolCallbacks = builder.toolCallbacks;
		}
		if (builder.modelInterceptors != null) {
			this.modelInterceptors = builder.modelInterceptors;
		}
		this.chatClient = builder.chatClient;
		this.chatOptions = buildChatOptions(builder.chatOptions, this.toolCallbacks);
		this.enableReasoningLog = builder.enableReasoningLog;
	}

其中 buildChatOptions() 是构建阶段的核心方法,合并工具回调与聊天配置,构建最终的工具调用聊天参数

  1. 若原始聊天配置为空且无工具回调,返回 null
  2. 若原始配置是 ToolCallingChatOptions 类型,合并工具回调(传入的 toolCallbacks 优先级更高)
  3. 若原始配置类型不匹配,打印警告并创建新的配置对象
/**
 * 合并工具回调与聊天配置,构建最终的工具调用聊天参数
 * @param chatOptions 原始的聊天配置参数
 * @param toolCallbacks 需要集成的工具回调集合
 * @return 合并完成的工具调用聊天配置
 */
@Nullable
private ToolCallingChatOptions buildChatOptions(ChatOptions chatOptions, List<ToolCallback> toolCallbacks) {
    // 场景1:无任何配置和工具,直接返回null
    if (chatOptions == null && (toolCallbacks == null || toolCallbacks.isEmpty())) {
        return null;
    }

    // 场景2:存在原始聊天配置,执行配置合并逻辑
    if (chatOptions != null) {
        // 判断配置是否为支持工具调用的类型
        if (chatOptions instanceof ToolCallingChatOptions builderToolCallingOptions) {
            // 复制原始配置,避免修改原对象
            ToolCallingChatOptions copiedOptions = builderToolCallingOptions.copy();

            // 初始化合并列表:优先使用传入的 toolCallbacks
            List<ToolCallback> mergedToolCallbacks = new ArrayList<>(toolCallbacks);
            // 遍历原始配置中的工具,将未重复的工具补充到合并列表(保证工具不重复)
            for (ToolCallback callback : builderToolCallingOptions.getToolCallbacks()) {
                boolean exists = mergedToolCallbacks.stream()
                        .anyMatch(tc -> tc.getToolDefinition().name().equals(callback.getToolDefinition().name()));
                if (!exists) {
                    mergedToolCallbacks.add(callback);
                }
            }

            // 设置合并后的工具列表
            copiedOptions.setToolCallbacks(mergedToolCallbacks);
            // 强制禁用Spring AI内部工具执行,由框架统一管理工具调用流程
            copiedOptions.setInternalToolExecutionEnabled(false);
            return copiedOptions;
        } else {
            // 配置类型不匹配,打印警告日志,原始配置不生效
            logger.warn("提供的聊天配置并非 ToolCallingChatOptions 类型(实际类型:{}),该配置不生效。将使用工具回调创建新配置。",
                    chatOptions.getClass().getName());
        }
    }

    // 场景3:无有效原始配置,直接构建新的工具调用配置
    return ToolCallingChatOptions.builder()
            .toolCallbacks(toolCallbacks)
            // 强制禁用内部工具执行
            .internalToolExecutionEnabled(false)
            .build();
}

其中 internalToolExecutionEnabled(false) 确保工具执行由 AgentToolNode 接管,而非 Spring AI 框架内部自动执行。这是 ReAct 模式的关键,模型只决策调用哪些工具,实际执行由工具节点完成。

/**
 * 标识工具执行的责任主体:
 * 是由 {@link ChatModel} 模型侧执行模型请求的工具,
 * 还是由调用方(框架/业务层)直接执行工具。
 */
Builder internalToolExecutionEnabled(@Nullable Boolean internalToolExecutionEnabled);

2.3 构建 AgentToolNode

DefaultBuilder.build() 中初始化 AgentToolNode 建造者,依次设置参数:

// 使用所有可用工具初始化工具执行节点
AgentToolNode toolNode;
// 构建 AgentToolNode 建造者,配置核心工具执行参数
AgentToolNode.Builder toolBuilder = AgentToolNode.builder()
		.agentName(this.name)  // 设置智能体名称
		.parallelToolExecution(this.parallelToolExecution)  // 配置是否开启工具并行执行
		.maxParallelTools(this.maxParallelTools)  // 配置最大并行执行工具数量
		.toolExecutionTimeout(this.toolExecutionTimeout)  // 配置工具执行超时时间
		.wrapSyncToolsAsAsync(this.wrapSyncToolsAsAsync);  // 配置是否将同步工具包装为异步执行

// 条件配置:工具回调解析器(非空时注入)
if (resolver != null) {
	toolBuilder.toolCallbackResolver(resolver);
}
// 条件配置:注册所有可用工具(工具列表非空时设置)
if (CollectionUtils.isNotEmpty(allTools)) {
	toolBuilder.toolCallbacks(allTools);
}

// 条件配置:开启工具执行日志(日志开关开启时启用)
if (enableLogging) {
	toolBuilder.enableActingLog(true);
}

// 配置工具执行异常处理器:无自定义处理器时,使用默认异常处理器
if (toolExecutionExceptionProcessor == null) {
	toolBuilder.toolExecutionExceptionProcessor(DefaultToolExecutionExceptionProcessor.builder()
			.alwaysThrow(false)  // 默认不直接抛出异常
			.build());
} else {
	// 使用自定义的工具执行异常处理器
	toolBuilder.toolExecutionExceptionProcessor(toolExecutionExceptionProcessor);
}

// 条件配置:工具执行上下文(非空时注入共享上下文参数)
if (toolContext != null && !toolContext.isEmpty()) {
	toolBuilder.toolContext(toolContext);
}

// 构建最终的 AgentToolNode 工具执行节点实例
toolNode = toolBuilder.build();

构造函数:

	public AgentToolNode(Builder builder) {
		this.agentName = builder.agentName;
		this.enableActingLog = builder.enableActingLog;
		this.toolCallbackResolver = builder.toolCallbackResolver;
		this.toolCallbacks = builder.toolCallbacks;
		this.toolContext = builder.toolContext;
		this.toolExecutionExceptionProcessor = builder.toolExecutionExceptionProcessor;
		this.parallelToolExecution = builder.parallelToolExecution;
		this.maxParallelTools = builder.maxParallelTools;
		this.toolExecutionTimeout = builder.toolExecutionTimeout;
		this.wrapSyncToolsAsAsync = builder.wrapSyncToolsAsAsync;
	}

2.4 设置拦截器

DefaultBuilder.build() 执行完成后,进入到 ReactAgent 构造函数中,首先会将拦截器在中注入到节点:

// 为节点设置拦截器
// 从钩子中收集拦截器,并与当前已有的拦截器进行合并
List<ModelInterceptor> mergedModelInterceptors = collectAndMergeModelInterceptors();
List<ToolInterceptor> mergedToolInterceptors = collectAndMergeToolInterceptors();

// 若合并后的模型拦截器不为空,则将其设置到LLM节点中
if (mergedModelInterceptors != null && !mergedModelInterceptors.isEmpty()) {
	this.llmNode.setModelInterceptors(mergedModelInterceptors);
}
// 若合并后的工具拦截器不为空,则将其设置到工具节点中
if (mergedToolInterceptors != null && !mergedToolInterceptors.isEmpty()) {
	this.toolNode.setToolInterceptors(mergedToolInterceptors);
}

优先级规则: Agent 直接配置的拦截器优先于 Hook 贡献的拦截器。同名拦截器只保留 Agent 级别的。

2.5 hasTools 标识

最后给工具节点设置 hasTools 标识:

// 根据工具节点的工具回调列表,标记是否存在可用工具
hasTools = toolNode.getToolCallbacks() != null && !toolNode.getToolCallbacks().isEmpty();

3. 执行流程

3.1 初始化执行图

ReactAgent 第一次调用会编译状态图为 CompiledGraph ,在 getGraph() 方法中会进行状态图的初始化 initGraph()

	protected Optional<OverAllState> doInvoke(Map<String, Object> input, RunnableConfig runnableConfig) {
		CompiledGraph compiledGraph = getAndCompileGraph();
		return compiledGraph.invoke(input, buildNonStreamConfig(runnableConfig));
	}
	
	public synchronized CompiledGraph getAndCompileGraph() {
		if (compiledGraph != null) {
			return compiledGraph;
		}

		StateGraph graph = getGraph();
		try {
			if (this.compileConfig == null) {
				this.compiledGraph = graph.compile();
			}
			else {
				this.compiledGraph = graph.compile(this.compileConfig);
			}
		} catch (GraphStateException e) {
			throw new RuntimeException(e);
		}
		return this.compiledGraph;
	}
	
	public StateGraph getGraph() {
		if (this.graph == null) {
			try {
				this.graph = initGraph();
			}
			catch (GraphStateException e) {
				throw new RuntimeException(e);
			}
		}
		return this.graph;
	}

3.1.1 图添加 LLM、Tool 节点

initGraph() 反复方法中会给状态图添加 LLMTool 节点

// 1. 向 StateGraph 中添加 LLM 核心节点
// 节点名称:AGENT_MODEL_NAME,将同步 llmNode 包装为异步节点
graph.addNode(AGENT_MODEL_NAME, node_async(this.llmNode));

// 2. 如果当前 Agent 包含工具,添加 Tool 执行节点
if (hasTools) {
    graph.addNode(AGENT_TOOL_NAME, node_async(this.toolNode));
}

3.1.2 给钩子注入工具

为实现 ToolInjection 接口的钩子初始化并注入工具:

// 为生效的钩子组件配置工具关联
setupToolsForHooks(effectiveHooks, toolNode);

工具从 AgentToolNode 中进行匹配

	/**
	 * 为实现ToolInjection接口的钩子初始化并注入工具
	 * 仅注入与钩子要求的工具名称/类型相匹配的工具
	 *
	 * @param hooks 钩子列表
	 * @param toolNode 承载可用工具的代理工具节点
	 */
	private void setupToolsForHooks(List<? extends Hook> hooks, AgentToolNode toolNode) {
		// 校验参数:钩子集合或工具节点为空,直接退出
		if (hooks == null || hooks.isEmpty() || toolNode == null) {
			return;
		}

		// 获取工具节点中所有可用的工具回调
		List<ToolCallback> availableTools = toolNode.getToolCallbacks();
		// 无可用工具时直接退出
		if (availableTools == null || availableTools.isEmpty()) {
			return;
		}

		// 遍历所有钩子,匹配并注入工具
		for (Hook hook : hooks) {
			// 仅处理支持工具注入的钩子(实现ToolInjection接口)
			if (hook instanceof ToolInjection toolInjection) {
				// 根据钩子配置查找匹配的工具
				ToolCallback toolToInject = findToolForHook(toolInjection, availableTools);
				// 找到匹配工具后,执行注入逻辑
				if (toolToInject != null) {
					toolInjection.injectTool(toolToInject);
				}
			}
		}
	}

3.2 AgentLlmNode 执行流程

3.2.1 执行入口

执行到 AgentLlmNode 节点时,会进入其 apply 方法,核心逻辑:

  1. 处理对话上下文
  2. 构建模型请求
  3. 调用LLM(流式/非流式)
  4. 返回更新后的全局状态
/**
 * 图节点执行入口方法
 * @param state 全局状态(存储对话历史、输入等数据)
 * @param config 运行配置(线程ID、元数据、上下文参数)
 * @return 更新后的全局状态数据
 * @throws Exception 执行过程中抛出的异常
 */
@Override
public Map<String, Object> apply(OverAllState state, RunnableConfig config) throws Exception {
	// 开启推理日志时,打印智能体开始推理的调试日志
	if (enableReasoningLog && logger.isDebugEnabled()) {
		logger.debug("[ThreadId {}] Agent {} start reasoning.", config.threadId()
				.orElse(THREAD_ID_DEFAULT), agentName);
	}

	// ==================== 1. 管理模型迭代计数器(记录工具多轮调用次数) ====================
	final AtomicInteger iterations;
	// 上下文无迭代计数器,初始化并放入配置上下文
	if (!config.context().containsKey(MODEL_ITERATION_KEY)) {
		iterations = new AtomicInteger(0);
		config.context().put(MODEL_ITERATION_KEY, iterations);
	} else {
		// 已存在计数器,迭代次数+1
		iterations = (AtomicInteger) config.context().get(MODEL_ITERATION_KEY);
		iterations.incrementAndGet();
	}

	// ==================== 2. 加载并处理对话消息上下文 ====================
	List<Message> messages = new ArrayList<>();
	// 全局状态无历史消息,从input获取用户输入
	if (state.value("messages").isEmpty()) {
		// 智能体作为图节点时,input是最常用的输入字段
		if (state.value("input").isPresent()) {
			messages.add(new UserMessage(state.value("input").get().toString()));
		} else {
			// 无指令/输入时抛出异常
			throw new IllegalArgumentException("Either 'instruction' or 'includeContents' must be set for Agent.");
		}
	} else {
		// 读取全局状态中的历史对话消息
		messages = (List<Message>) state.value("messages").get();
	}

	// 为用户消息追加输出格式约束(outputSchema)
	augmentUserMessage(messages, outputSchema);
	// 渲染消息中的模板变量(替换动态参数)
	renderTemplatedUserMessage(messages, state.data(), config.metadata());

	// ==================== 3. 构建模型请求对象 ModelRequest ====================
	// 将全局状态和配置元数据合并为上下文,供拦截器读取使用
	Map<String, Object> contextMap = new HashMap<>(state.data());
	Map<String, Object> metadata = config.metadata().orElse(new HashMap<>());
	if (!metadata.isEmpty()) {
		contextMap.putAll(metadata);
	}
	// 构建请求:消息、模型配置、上下文
	ModelRequest.Builder requestBuilder = ModelRequest.builder()
			.messages(messages)
			.options(this.chatOptions != null ? this.chatOptions.copy() : null)
			.context(contextMap);

	// 从工具回调中提取工具名称和描述,注入模型请求
	if (toolCallbacks != null && !toolCallbacks.isEmpty()) {
		List<String> toolNames = new ArrayList<>();
		Map<String, String> toolDescriptions = new HashMap<>();
		for (ToolCallback callback : toolCallbacks) {
			String name = callback.getToolDefinition().name();
			String description = callback.getToolDefinition().description();
			toolNames.add(name);
			if (description != null && !description.isEmpty()) {
				toolDescriptions.put(name, description);
			}
		}
		requestBuilder.tools(toolNames);
		requestBuilder.toolDescriptions(toolDescriptions);
	}

	// 注入系统提示词
	if (StringUtils.hasLength(this.systemPrompt)) {
		requestBuilder.systemMessage(new SystemMessage(this.systemPrompt));
	}

	// 构建最终的模型请求对象
	ModelRequest modelRequest = requestBuilder.build();

	// ==================== 4. 流式/非流式 调用分支 ====================
	// 从配置中获取流式开关,默认开启流式
	boolean stream = config.metadata("_stream_", new TypeRef<Boolean>(){}).orElse(true);
	if (stream) {
		// ==================== 流式调用模式 ====================
		// 创建基础调用处理器:真正执行流式模型调用
		ModelCallHandler baseHandler = request -> {
			try {
				// 打印系统提示词日志
				if (enableReasoningLog) {
					String systemPrompt = request.getSystemMessage() != null ? request.getSystemMessage().getText() : "";
					if (logger.isDebugEnabled()) {
						logger.debug("[ThreadId {}] Agent {} reasoning with system prompt: {}", config.threadId()
								.orElse(THREAD_ID_DEFAULT), agentName, systemPrompt);
					}
				}
				// 构建请求并执行流式调用,获取响应流
				Flux<ChatResponse> chatResponseFlux = buildChatClientRequestSpec(request, config).stream().chatResponse();
				// 开启日志时,打印流式输出结果
				if (enableReasoningLog) {
					chatResponseFlux = chatResponseFlux.doOnNext(chatResponse -> {
						if (chatResponse != null && chatResponse.getResult() != null && chatResponse.getResult().getOutput() != null) {
							// 区分工具调用/普通文本输出日志
							if (chatResponse.getResult().getOutput().hasToolCalls()) {
								logger.info("[ThreadId {}] Agent {} reasoning round {} streaming output: {}",
										config.threadId().orElse(THREAD_ID_DEFAULT), agentName, iterations.get(), chatResponse.getResult().getOutput().getToolCalls());
							} else {
								logger.info("[ThreadId {}] Agent {} reasoning round {} streaming output: {}",
										config.threadId()
												.orElse(THREAD_ID_DEFAULT), agentName, iterations.get(), chatResponse.getResult()
												.getOutput().getText());
							}
						}
					});
				}
				return ModelResponse.of(chatResponseFlux);
			} catch (Exception e) {
				// 捕获流式调用异常,返回错误消息
				logger.error("Exception during streaming model call: ", e);
				return ModelResponse.of(new AssistantMessage("Exception: " + e.getMessage()));
			}
		};

		// 组装模型拦截器链
		ModelCallHandler chainedHandler = InterceptorChain.chainModelInterceptors(
				modelInterceptors, baseHandler);

		// 执行拦截器链+模型调用
		ModelResponse modelResponse = chainedHandler.call(modelRequest);
		// 返回结果:自定义outputKey或默认messages
		return Map.of(StringUtils.hasLength(this.outputKey) ? this.outputKey : "messages", modelResponse.getMessage());
	} else {
		// ==================== 非流式(同步)调用模式 ====================
		// 创建基础调用处理器:真正执行同步模型调用
		ModelCallHandler baseHandler = request -> {
			try {
				// 打印推理日志
				if (enableReasoningLog) {
					String systemPrompt = request.getSystemMessage() != null ? request.getSystemMessage().getText() : "";
					logger.info("[ThreadId {}] Agent {} reasoning round {} with system prompt: {}.", config.threadId().orElse(THREAD_ID_DEFAULT), agentName, iterations.get(), systemPrompt);
				}

				// 构建请求并执行同步调用
				ChatResponse response = buildChatClientRequestSpec(request, config).call().chatResponse();

				// 处理模型响应,空响应默认赋值
				AssistantMessage responseMessage = new AssistantMessage("Empty response from model for unknown reason");
				if (response != null && response.getResult() != null) {
					responseMessage = response.getResult().getOutput();
				}

				// 打印响应结果日志
				if (enableReasoningLog) {
					logger.info("[ThreadId {}] Agent {} reasoning round {} returned: {}.", config.threadId().orElse(THREAD_ID_DEFAULT), agentName, iterations.get(), responseMessage);
				}

				return ModelResponse.of(responseMessage, response);
			} catch (Exception e) {
				// 捕获调用异常,返回错误消息
				logger.error("Exception during invoking model call: ", e);
				return ModelResponse.of(new AssistantMessage("Exception: " + e.getMessage()));
			}
		};

		// 组装模型拦截器链
		ModelCallHandler chainedHandler = InterceptorChain.chainModelInterceptors(
				modelInterceptors, baseHandler);

		// 打印拦截器链启动日志
		if (enableReasoningLog) {
			logger.info("[ThreadId {}] Agent {} reasoning round {} model chain has started.", config.threadId().orElse(THREAD_ID_DEFAULT), agentName, iterations.get());
		}

		// 执行拦截器链+模型调用
		ModelResponse modelResponse = chainedHandler.call(modelRequest);
		// 统计Token使用量,无响应时使用空统计对象
		Usage tokenUsage = modelResponse.getChatResponse() != null ? modelResponse.getChatResponse().getMetadata()
				.getUsage() : new EmptyUsage();

		// 构建更新后的全局状态
		Map<String, Object> updatedState = new HashMap<>();
		updatedState.put("_TOKEN_USAGE_", tokenUsage);
		updatedState.put("messages", modelResponse.getMessage());
		// 配置自定义输出Key时,额外写入结果
		if (StringUtils.hasLength(this.outputKey)) {
			updatedState.put(this.outputKey, modelResponse.getMessage());
		}

		return updatedState;
	}
}

3.2.2 推理日志 & 迭代计数器管理

记录每轮推理轮次,开启日志时打印日志:

		if (enableReasoningLog && logger.isDebugEnabled()) {
			logger.debug("[ThreadId {}] Agent {} start reasoning.", config.threadId()
					.orElse(THREAD_ID_DEFAULT), agentName);
		}

config.context() 维护 _MODEL_ITERATION_ 原子迭代计数器,

		// Check and manage iteration counter
		final AtomicInteger iterations;
		if (!config.context().containsKey(MODEL_ITERATION_KEY)) {
			iterations = new AtomicInteger(0);
			config.context().put(MODEL_ITERATION_KEY, iterations);
		} else {
			iterations = (AtomicInteger) config.context().get(MODEL_ITERATION_KEY);
			iterations.incrementAndGet();
		}

3.2.3 会话消息获取

处理逻辑:

  1. 优先读取全局状态 messages 对话历史列表;
  2. 若无历史消息,则读取 input 输入内容并封装为 UserMessage
  3. 无任何有效输入时,直接抛出参数非法异常。
	List<Message> messages = new ArrayList<>();
	// 全局状态无历史消息,从input获取用户输入
	if (state.value("messages").isEmpty()) {
		// 智能体作为图节点时,input是最常用的输入字段
		if (state.value("input").isPresent()) {
			messages.add(new UserMessage(state.value("input").get().toString()));
		} else {
			// 无指令/输入时抛出异常
			throw new IllegalArgumentException("Either 'instruction' or 'includeContents' must be set for Agent.");
		}
	} else {
		// 读取全局状态中的历史对话消息
		messages = (List<Message>) state.value("messages").get();
	}

	// 为用户消息追加输出格式约束(outputSchema)
	augmentUserMessage(messages, outputSchema);
	// 渲染消息中的模板变量(替换动态参数)
	renderTemplatedUserMessage(messages, state.data(), config.metadata());

3.2.4 结构化输出增强

处理逻辑:

  • 消息增强:在最后一条 UserMessage/AgentInstructionMessage 末尾追加 outputSchema (避免重复追加,已包含则跳过)
  • 模板渲染:遍历消息列表,找到最后一个未渲染的 AgentInstructionMessage ,使用 TemplateRenderer 渲染模板变量,标记 rendered = true

augmentUserMessage 方法:

public void augmentUserMessage(List<Message> messages, String outputSchema) {
	// 无有效输出格式,直接返回
	if (!StringUtils.hasText(outputSchema)) {
		return;
	}

	// 倒序遍历消息列表,找到最后一条用户消息/指令消息
	for (int i = messages.size() - 1; i >= 0; i--) {
		Message message = messages.get(i);
		// 处理普通用户消息
		if (message instanceof UserMessage userMessage) {
			// 校验:避免重复追加输出格式
			if (!userMessage.getText().contains(outputSchema)) {
				// 拼接Schema并替换原消息
				messages.set(i, userMessage.mutate().text(userMessage.getText() + System.lineSeparator() + outputSchema).build());
			}
			break;
		}
		// 处理框架自定义指令消息
		if (message instanceof AgentInstructionMessage templatedUserMessage) {
			// 转义大括号,避免模板解析冲突
			String newOutputSchema = outputSchema.replace("{", "\\{").replace("}", "\\}");
			// 校验:避免重复追加输出格式
			if (!templatedUserMessage.getText().contains(newOutputSchema)) {
				messages.set(i, templatedUserMessage.mutate().text(templatedUserMessage.getText() + System.lineSeparator() + newOutputSchema).build());
			}
			break;
		}

		// 遍历到第一条仍无用户消息,直接新增一条用户消息存储Schema
		if (i == 0) {
			messages.add(new UserMessage(outputSchema));
		}
	}
}

renderTemplatedUserMessage 方法:

public void renderTemplatedUserMessage(List<Message> messages, Map<String, Object> params, Optional<Map<String, Object>> metadata) {
	// 预处理参数:过滤无效参数,格式化参数类型
	Map<String, Object> processedParams = new HashMap<>();
	if (params != null) {
		for (Map.Entry<String, Object> entry : params.entrySet()) {
			String key = entry.getKey();
			Object value = entry.getValue();
			
			// 排除messages关键字,避免循环引用
			if ("messages".equals(key)) {
				continue;
			}
			// 排除集合类型参数
			if (value instanceof List) {
				continue;
			}
			// 将消息对象转换为纯文本
			if (value instanceof Message message) {
				processedParams.put(key, message.getText());
			} else {
				// 其他类型参数直接保留
				processedParams.put(key, value);
			}
		}
	}

	// 倒序遍历,找到最后一条未渲染的指令消息进行模板渲染
	for (int i = messages.size() - 1; i >= 0; i--) {
		Message message = messages.get(i);
		// 仅渲染未执行过模板替换的 AgentInstructionMessage
		if (message instanceof AgentInstructionMessage instructionMessage && !instructionMessage.isRendered()) {
			// 渲染模板变量,标记为已渲染,替换原消息
			AgentInstructionMessage newMessage = instructionMessage.mutate()
					.text(renderPromptTemplate(instructionMessage.getText(), processedParams))
					.rendered(true)
					.build();
			messages.set(i, newMessage);
			break;
		}
	}
}

3.2.5 构建 ModelRequest

处理逻辑:

  1. 整合完整消息列表、拷贝模型配置、合并全局状态与运行元数据;
  2. 自动提取所有 ToolCallback 工具名称与描述,注入请求上下文;
  3. 挂载系统提示词,生成完整模型调用请求。
	// 将全局状态和配置元数据合并为上下文,供拦截器读取使用
	Map<String, Object> contextMap = new HashMap<>(state.data());
	Map<String, Object> metadata = config.metadata().orElse(new HashMap<>());
	if (!metadata.isEmpty()) {
		contextMap.putAll(metadata);
	}
	// 构建请求:消息、模型配置、上下文
	ModelRequest.Builder requestBuilder = ModelRequest.builder()
			.messages(messages)
			.options(this.chatOptions != null ? this.chatOptions.copy() : null)
			.context(contextMap);

	// 从工具回调中提取工具名称和描述,注入模型请求
	if (toolCallbacks != null && !toolCallbacks.isEmpty()) {
		List<String> toolNames = new ArrayList<>();
		Map<String, String> toolDescriptions = new HashMap<>();
		for (ToolCallback callback : toolCallbacks) {
			String name = callback.getToolDefinition().name();
			String description = callback.getToolDefinition().description();
			toolNames.add(name);
			if (description != null && !description.isEmpty()) {
				toolDescriptions.put(name, description);
			}
		}
		requestBuilder.tools(toolNames);
		requestBuilder.toolDescriptions(toolDescriptions);
	}

	// 注入系统提示词
	if (StringUtils.hasLength(this.systemPrompt)) {
		requestBuilder.systemMessage(new SystemMessage(this.systemPrompt));
	}

	// 构建最终的模型请求对象
	ModelRequest modelRequest = requestBuilder.build();

3.2.6 判断流式/非流式模式

默认为 true

boolean stream = config.metadata("_stream_", new TypeRef<Boolean>(){}).orElse(true);

3.2.7 流式执行

流式执行链路:

  • 构建底层流式模型调用处理器;
  • 通过 InterceptorChain 串联所有 ModelInterceptor 拦截器;
  • 执行拦截器链并发起流式 LLM 调用;
  • 最终返回以 outputKey 或默认 messagesKey 的流式消息结果。
// ==================== 流式调用分支 ====================
if (stream) {
    // 创建基础模型调用处理器:真正执行流式大模型调用的核心逻辑
    ModelCallHandler baseHandler = request -> {
        try {
            // 开启推理日志时,打印系统提示词调试信息
            if (enableReasoningLog) {
                String systemPrompt = request.getSystemMessage() != null ? request.getSystemMessage().getText() : "";
                if (logger.isDebugEnabled()) {
                    logger.debug("[ThreadId {}] Agent {} reasoning with system prompt: {}", config.threadId()
                            .orElse(THREAD_ID_DEFAULT), agentName, systemPrompt);
                }
            }
            // 构建模型请求并发起流式调用,获取响应流(Flux)
            Flux<ChatResponse> chatResponseFlux = buildChatClientRequestSpec(request, config).stream().chatResponse();

            // 开启推理日志时,监听流式响应,实时打印输出内容
            if (enableReasoningLog) {
                chatResponseFlux = chatResponseFlux.doOnNext(chatResponse -> {
                    // 校验响应非空,避免空指针
                    if (chatResponse != null && chatResponse.getResult() != null && chatResponse.getResult().getOutput() != null) {
                        // 区分输出类型:工具调用 / 普通文本
                        if (chatResponse.getResult().getOutput().hasToolCalls()) {
                            // 打印工具调用日志
                            logger.info("[ThreadId {}] Agent {} reasoning round {} streaming output: {}",
                                    config.threadId().orElse(THREAD_ID_DEFAULT), agentName, iterations.get(), chatResponse.getResult().getOutput().getToolCalls());
                        } else {
                            // 打印文本输出日志
                            logger.info("[ThreadId {}] Agent {} reasoning round {} streaming output: {}",
                                    config.threadId()
                                            .orElse(THREAD_ID_DEFAULT), agentName, iterations.get(), chatResponse.getResult()
                                            .getOutput().getText());
                        }
                    }
                });
            }
            // 封装流式响应并返回
            return ModelResponse.of(chatResponseFlux);
        } catch (Exception e) {
            // 捕获流式调用异常,打印错误日志并返回异常消息
            logger.error("Exception during streaming model call: ", e);
            return ModelResponse.of(new AssistantMessage("Exception: " + e.getMessage()));
        }
    };

    // 构建拦截器链:将所有模型拦截器与基础调用处理器串联
    ModelCallHandler chainedHandler = InterceptorChain.chainModelInterceptors(
            modelInterceptors, baseHandler);

    // 执行拦截器链 + 模型调用,获取最终响应
    ModelResponse modelResponse = chainedHandler.call(modelRequest);

    // 返回更新后的状态:使用自定义outputKey,无则使用默认messages
    return Map.of(StringUtils.hasLength(this.outputKey) ? this.outputKey : "messages", modelResponse.getMessage());
}

3.2.8 非流式执行

非流式执行链路:

  • 构建同步阻塞式模型调用处理器;
  • 串联模型拦截器,完整执行前置、后置扩展逻辑;
  • 同步获取 ChatResponse 完整响应,统计 Token 消耗;
  • 组装更新全局状态:令牌用量、模型回复、自定义输出字段,最终返回状态集合。
{
    // ==================== 非流式(同步)调用分支 ====================
    // 创建基础模型调用处理器:真正执行同步大模型调用的核心逻辑
    ModelCallHandler baseHandler = request -> {
        try {
            // 开启推理日志时,打印当前轮次的系统提示词信息
            if (enableReasoningLog) {
                String systemPrompt = request.getSystemMessage() != null ? request.getSystemMessage().getText() : "";
                logger.info("[ThreadId {}] Agent {} reasoning round {} with system prompt: {}.",
                        config.threadId().orElse(THREAD_ID_DEFAULT), agentName, iterations.get(), systemPrompt);
            }

            // 构建模型请求并发起同步调用,获取完整响应结果
            ChatResponse response = buildChatClientRequestSpec(request, config).call().chatResponse();

            // 初始化默认响应消息,处理模型无返回的异常情况
            AssistantMessage responseMessage = new AssistantMessage("Empty response from model for unknown reason");
            // 校验响应非空,获取模型真正的输出消息
            if (response != null && response.getResult() != null) {
                responseMessage = response.getResult().getOutput();
            }

            // 开启推理日志时,打印模型返回的结果信息
            if (enableReasoningLog) {
                logger.info("[ThreadId {}] Agent {} reasoning round {} returned: {}.",
                        config.threadId().orElse(THREAD_ID_DEFAULT), agentName, iterations.get(), responseMessage);
            }

            // 封装模型响应消息和原始响应对象并返回
            return ModelResponse.of(responseMessage, response);
        } catch (Exception e) {
            // 捕获同步调用异常,打印错误日志并返回异常提示消息
            logger.error("Exception during invoking model call: ", e);
            return ModelResponse.of(new AssistantMessage("Exception: " + e.getMessage()));
        }
    };

    // 构建拦截器链:将所有注册的模型拦截器与基础调用处理器串联执行
    ModelCallHandler chainedHandler = InterceptorChain.chainModelInterceptors(
            modelInterceptors, baseHandler);

    // 开启推理日志时,打印模型调用拦截器链启动日志
    if (enableReasoningLog) {
        logger.info("[ThreadId {}] Agent {} reasoning round {} model chain has started.",
                config.threadId().orElse(THREAD_ID_DEFAULT), agentName, iterations.get());
    }

    // 执行完整的拦截器链 + 模型调用,获取最终处理后的模型响应
    ModelResponse modelResponse = chainedHandler.call(modelRequest);

    // 统计Token使用量:有响应则获取真实用量,无响应则使用空统计对象
    Usage tokenUsage = modelResponse.getChatResponse() != null ?
            modelResponse.getChatResponse().getMetadata().getUsage() : new EmptyUsage();

    // 构建更新后的全局状态集合
    Map<String, Object> updatedState = new HashMap<>();
    // 存入Token消耗统计数据
    updatedState.put("_TOKEN_USAGE_", tokenUsage);
    // 存入模型返回的助手消息,更新对话历史
    updatedState.put("messages", modelResponse.getMessage());
    // 配置了自定义输出键时,额外存入结果数据
    if (StringUtils.hasLength(this.outputKey)) {
        updatedState.put(this.outputKey, modelResponse.getMessage());
    }

    // 返回更新后的全局状态,驱动图工作流继续执行
    return updatedState;
}

其中 buildChatClientRequestSpec 是实际构建模型调用规格的关键方法:

  1. 执行 appendSystemPromptIfNeeded 处理消息

    • 将系统提示消息插入到消息列表的开头
    • 检测到多个系统提示消息时自动发出警告
  2. 执行 filterToolCallbacks 过滤工具回调

    • 优先从模型请求的配置中获取工具回调列表
    • 按照模型请求的工具名称列表完成工具过滤
    • 支持拦截器对可用工具集进行动态修改
  3. 处理动态工具回调

    • 将动态工具回调追加到过滤后的工具列表中
    • 把动态工具回调存入配置上下文,供 AgentToolNode 解析使用
  4. 构建 ChatClient 最终请求规范

    • 初始化基础请求,绑定消息列表与顾问拦截器
    • 存在自定义配置时:拷贝配置对象,设置过滤后的工具列表,强制禁用模型内部工具执行,绑定配置到请求
    • 无自定义配置时:检查 ChatClient 默认配置,无默认则创建新配置并绑定工具+禁用内部执行;有默认配置则更新工具并禁用内部执行;其他情况直接绑定工具列表
/**
 * 构建 ChatClient 最终请求规范
 * 整合消息、工具、模型配置,统一处理工具执行权限,是LLM调用的前置核心逻辑
 * @param modelRequest 模型请求对象(封装消息、工具、配置)
 * @param config 运行时配置
 * @return ChatClient 可执行的请求规范
 */
private ChatClient.ChatClientRequestSpec buildChatClientRequestSpec(ModelRequest modelRequest, RunnableConfig config) {
	// 1. 拼接系统提示词到消息列表(按需追加)
	List<Message> messages = appendSystemPromptIfNeeded(modelRequest);

	// 注意!如果ModelRequest中同时自定义了工具(ToolSelectionInterceptor)和配置选项,工具配置会覆盖选项中的工具调用设置
	// 2. 过滤出当前请求可用的工具回调列表
	List<ToolCallback> filteredToolCallbacks = filterToolCallbacks(modelRequest);

	// 3. 处理动态工具回调:追加动态工具,并存入配置上下文(供工具节点使用)
	if (!CollectionUtils.isEmpty(modelRequest.getDynamicToolCallbacks())) {
		filteredToolCallbacks.addAll(modelRequest.getDynamicToolCallbacks());
		// FIXME:使用 RunnableConfig 通过配置上下文将动态工具回调传递给工具节点(内部使用)
		config.context().put(RunnableConfig.DYNAMIC_TOOL_CALLBACKS_METADATA_KEY, modelRequest.getDynamicToolCallbacks());
	}

	// 4. 构建基础请求:绑定消息列表 + 拦截顾问
	var promptSpec = this.chatClient.prompt()
		.messages(messages)
		.advisors(this.advisors);

	// 5. 获取模型请求中的配置
	ToolCallingChatOptions requestOptions = modelRequest.getOptions();

	// 6. 存在自定义配置:拷贝配置、绑定过滤后的工具、强制禁用内部工具执行
	if (requestOptions != null) {
		ToolCallingChatOptions copiedOptions = requestOptions.copy();
		// 设置过滤后的工具列表
		copiedOptions.setToolCallbacks(filteredToolCallbacks);
		// 强制禁用模型内部工具执行,避免与Agent框架的工具管理逻辑冲突
		copiedOptions.setInternalToolExecutionEnabled(false);
		// 绑定配置到请求
		promptSpec.options(copiedOptions);
	}
	// 7. 无自定义配置:兜底处理,保证工具配置和执行权限正确
	else {
		// 检查是否为默认请求规范,获取底层默认配置
		if (promptSpec instanceof DefaultChatClient.DefaultChatClientRequestSpec defaultChatClientRequestSpec) {
			ChatOptions options = defaultChatClientRequestSpec.getChatOptions();
			// 场景1:无任何默认配置 → 创建新配置,绑定工具+禁用内部执行
			if (options == null) {
				options = ToolCallingChatOptions.builder()
					.toolCallbacks(filteredToolCallbacks)
					.internalToolExecutionEnabled(false)
					.build();
				defaultChatClientRequestSpec.options(options);
			}
			// 场景2:默认配置是工具调用配置 → 拷贝配置、更新工具、禁用内部执行
			else if (options instanceof ToolCallingChatOptions toolCallingChatOptions) {
				ToolCallingChatOptions copiedOptions = toolCallingChatOptions.copy();
				copiedOptions.setToolCallbacks(filteredToolCallbacks);
				copiedOptions.setInternalToolExecutionEnabled(false);
				defaultChatClientRequestSpec.options(copiedOptions);
			}
		}
		// 场景3:非默认请求规范 + 存在工具 → 直接绑定工具列表
		else if (!filteredToolCallbacks.isEmpty()) {
			promptSpec.tools(filteredToolCallbacks);
		}
	}

	// 8. 返回最终构建完成的请求规范
	return promptSpec;
}

filterToolCallbacks

/**
 * 根据 ModelRequest 中指定的工具,过滤工具回调列表
 * @param modelRequest 包含待过滤工具名称列表的模型请求对象
 * @return 匹配请求工具的过滤后工具回调列表
 */
private List<ToolCallback> filterToolCallbacks(ModelRequest modelRequest) {
    // 初始化最终返回的工具回调集合
    List<ToolCallback> toolCallbacks = new ArrayList<>();

    // 模型请求对象为空,直接使用当前节点默认的全部工具回调
    if (modelRequest == null) {
        toolCallbacks.addAll(this.toolCallbacks);
        return toolCallbacks;
    }

    // 优先使用:模型请求配置中携带的工具回调列表
    if (modelRequest.getOptions() != null && modelRequest.getOptions().getToolCallbacks() != null) {
        toolCallbacks.addAll(modelRequest.getOptions().getToolCallbacks());
    } else {
        // 无配置时不执行操作
        // 默认情况下 buildChatOptions() 会确保工具回调被初始化
        // 支持用户通过设置空列表来禁用所有工具
    }

    // 获取请求中指定的工具名称集合
    List<String> requestedTools = modelRequest.getTools();
    // 未指定具体工具,直接返回当前工具列表
    if (requestedTools == null || requestedTools.isEmpty()) {
        return toolCallbacks;
    }

    // 流式过滤:仅保留名称匹配的工具回调,封装为新列表返回
    return new ArrayList<>(toolCallbacks.stream()
            .filter(callback -> requestedTools.contains(callback.getToolDefinition().name()))
            .toList());
}

关键设计: filterToolCallbacks() 允许 ModelInterceptor 通过修改 ModelRequest.toolsModelRequest.options 来动态控制哪些工具对当前模型调用可见。这实现了工具选择(Tool Selection)能力。

3.3 AgentToolNode 执行流程

3.3.1 执行入口

如果大模型返回工具调用,会路由执行到 AgentToolNode 节点,进入其 apply 方法,核心逻辑:

  1. 获取消息上下文:从全局状态中读取对话消息,定位最后一条关键消息
  2. 消息类型判断
    • 助手消息:提取工具调用列表,根据配置选择并行 / 串行执行工具
    • 工具响应消息:处理未完成的部分工具响应
    • 其他类型:直接抛出非法状态异常
  3. 执行模式分发:支持多工具并行执行提升效率,单工具默认串行执行
  4. 日志记录:开启日志时打印工具执行的基础信息,便于调试追踪
/**
 * 工具节点核心执行方法:解析助手消息中的工具调用指令,分发执行工具
 * @param state 全局状态(包含对话消息、上下文数据)
 * @param config 运行配置(线程ID、元数据等)
 * @return 工具执行完成后的更新状态
 * @throws Exception 工具执行/状态处理异常
 */
@Override
public Map<String, Object> apply(OverAllState state, RunnableConfig config) throws Exception {
	// 从全局状态中获取对话消息列表,无消息时直接抛出异常
	List<Message> messages = (List<Message>) state.value("messages").orElseThrow();
	// 获取消息列表中的最后一条消息(工具调用指令一定在最后一条助手消息中)
	Message lastMessage = messages.get(messages.size() - 1);

	// 场景1:最后一条消息是【助手消息】(包含工具调用请求)
	if (lastMessage instanceof AssistantMessage assistantMessage) {
		// 从助手消息中提取工具调用列表
		List<AssistantMessage.ToolCall> toolCalls = assistantMessage.getToolCalls();

		// 开启日志时,打印工具执行信息(线程ID、智能体名称、工具数量)
		if (enableActingLog) {
			logger.info("[ThreadId {}] Agent {} acting with {} tools.", config.threadId().orElse(THREAD_ID_DEFAULT),
					agentName, toolCalls.size());
		}

		// 根据配置选择执行模式:
		// 开启并行执行 + 工具数量>1 → 并行执行工具
		if (parallelToolExecution && toolCalls.size() > 1) {
			return executeToolCallsParallel(toolCalls, state, config);
		}
		// 否则 → 串行执行工具
		else {
			return executeToolCallsSequential(toolCalls, state, config);
		}
	}
	// 场景2:最后一条消息是【工具响应消息】(处理部分工具响应)
	else if (lastMessage instanceof ToolResponseMessage toolResponseMessage) {
		return handlePartialToolResponses(toolResponseMessage, messages, state, config);
	}
	// 异常场景:最后一条消息类型不合法,无法执行工具
	else {
		throw new IllegalStateException("Last message is neither an AssistantMessage nor a ToolResponseMessage");
	}
}

3.3.2 获取消息上下文

从全局状态中获取对话消息列表,无消息时直接抛出异常。获取消息列表中的最后一条消息(工具调用指令一定在最后一条助手消息中):


	List<Message> messages = (List<Message>) state.value("messages").orElseThrow();

	Message lastMessage = messages.get(messages.size() - 1);

3.3.3 串行执行 or 并行执行

满足以下两个条件时,进入并行执行,否则进行串行执行:

  • 配置开启了 parallelToolExecution(并行工具执行开关)
  • 工具调用数量 > 1(多个工具才需要并行)
		// 根据配置选择执行模式:
		// 开启并行执行 + 工具数量>1 → 并行执行工具
		if (parallelToolExecution && toolCalls.size() > 1) {
			return executeToolCallsParallel(toolCalls, state, config);
		}
		// 否则 → 串行执行工具
		else {
			return executeToolCallsSequential(toolCalls, state, config);
		}

并行 vs 串行状态合并差异:

维度 串行模式 并行模式
合并方式 Map.putAll() — last-write-wins ToolStateCollector.mergeAll() — 按 KeyStrategy
顺序确定性 确定(按 toolCalls 顺序) 确定索引顺序,但并行执行顺序不确定
APPEND 策略支持 不支持(被 putAll 覆盖) 支持(按索引顺序追加)
状态隔离 ConcurrentHashMap + 立即合并 ToolStateCollector + 延迟合并

3.3.4 串行执行

在串行执行 executeToolCallsSequential 方法中,会循环调用所有工具:

		for (AssistantMessage.ToolCall toolCall : toolCalls) {
			// Each tool gets its own isolated update map
			// If this tool times out, clear() only affects this map, not mergedUpdates
			Map<String, Object> toolSpecificUpdate = new ConcurrentHashMap<>();
			ToolCallResponse response = executeToolCallWithInterceptors(toolCall, state, config, toolSpecificUpdate,
					false);
			toolResponses.add(response.toToolResponse());
			returnDirect = shouldReturnDirect(toolCall, returnDirect, config);
			// Merge immediately - subsequent timeout clear() won't affect already-merged data
			mergedUpdates.putAll(toolSpecificUpdate);
		}

注意:串行、并行执行,最后都是调用 executeToolCallWithInterceptors 方法进行执行。

3.3.5 并行执行

executeToolCallsParallel 多工具调用并行执行方法(支持并发限流、超时、优雅取消、线程安全状态管理),包含了所有并行执行逻辑:

  • 将所有异步任务提交到线程池
  • 等待所有并行工具执行完成
  • 将所有结果写入到 OverAllState

注意事项:

  • 通过 state.snapShot() 创建的状态快照为浅拷贝,状态中的可变对象(如List/Map)是共享的。
  • 工具禁止直接修改从状态中获取的集合,必须通过 extraStateFromToolCall 写入更新。
  • 并行状态合并语义:
    • 并行模式使用 ToolStateCollector,严格遵循每个状态 Key 配置的 KeyStrategy 合并策略:

      • APPEND 策略:所有工具的结果会追加合并
      • REPLACE 策略:最后写入的结果覆盖(并行顺序不保证,非确定性)
    • 与串行模式区别:串行使用 Map.putAll() 简单覆盖,并行支持高级合并策略,保证并发安全。

处理逻辑:

/**
 * 【核心底层方法】多工具调用并行执行(支持并发限流、超时、优雅取消、线程安全状态管理)
 * @param toolCalls LLM 下发的工具调用列表
 * @param state Agent 全局状态
 * @param config 运行时配置
 * @return 所有工具执行后的合并状态
 */
private Map<String, Object> executeToolCallsParallel(List<AssistantMessage.ToolCall> toolCalls, OverAllState state,
			RunnableConfig config) {

	// ===================== 1. 并行模式兼容提示 =====================
	// 并行模式下:忽略「同步转异步」配置,避免线程池饥饿/死锁(核心安全设计)
	if (wrapSyncToolsAsAsync && logger.isDebugEnabled()) {
		logger.debug("并行执行模式:自动忽略 wrapSyncToolsAsAsync,同步工具直接在并行线程中执行,防止线程池耗尽");
	}

	// 获取工具执行线程池(来自 ParallelNode 全局线程池)
	Executor executor = getToolExecutor(config);

	// ===================== 2. 线程安全:创建状态快照 =====================
	// 浅拷贝状态,避免并行工具修改原始状态导致线程不安全
	OverAllState stateSnapshot = state.snapShot().orElse(state);

	// 工具状态收集器:支持按 KeyStrategy 并发安全合并工具结果
	ToolStateCollector stateCollector = new ToolStateCollector(toolCalls.size(), state.keyStrategies());

	// ===================== 3. 并发安全数据结构 =====================
	// 原子引用数组:CAS 无锁更新工具执行结果,避免线程竞争
	AtomicReferenceArray<ToolCallResponse> orderedResponses = new AtomicReferenceArray<>(toolCalls.size());
	// 同步集合:收集执行异常
	List<Throwable> failures = java.util.Collections.synchronizedList(new ArrayList<>());

	// 并发Map:存储每个工具的取消令牌,支持超时/手动取消可取消异步工具
	Map<Integer, DefaultCancellationToken> cancellationTokens = new ConcurrentHashMap<>();

	// ===================== 4. 并发限流:信号量控制最大并行工具数 =====================
	// 限制同时执行的工具数量,防止服务器压力过大
	Semaphore semaphore = new Semaphore(maxParallelTools);

	// ===================== 5. 构建并行任务流 =====================
	List<CompletableFuture<Void>> futures = IntStream.range(0, toolCalls.size()).mapToObj(index -> {
		// 获取当前工具
		AssistantMessage.ToolCall toolCall = toolCalls.get(index);
		// 为当前工具创建独立的状态更新Map
		Map<String, Object> toolSpecificUpdate = stateCollector.createToolUpdateMap(index);

		// 提交异步任务到线程池
		return CompletableFuture.runAsync(() -> {
				try {
					// 获取信号量许可,限流并发
					semaphore.acquire();
					try {
						// ===================== 6. 执行工具(带拦截器、状态注入、取消令牌) =====================
						// 内部调用 executeToolByType → 执行同步/异步/可取消工具
						ToolCallResponse response = executeToolCallWithInterceptors(
								toolCall, stateSnapshot, config,
								toolSpecificUpdate, true, cancellationTokens, index
						);
						// CAS 原子更新结果:仅当结果为空时写入(避免超时覆盖正常结果)
						orderedResponses.compareAndSet(index, null, response);
					} finally {
						// 释放信号量
						semaphore.release();
					}
				}
				catch (InterruptedException e) {
					Thread.currentThread().interrupt();
					failures.add(e);
					// 中断异常:写入错误结果
					orderedResponses.compareAndSet(index, null,
							ToolCallResponse.error(toolCall.id(), toolCall.name(), "工具执行被中断"));
				}
			}, executor)
			// 全局超时控制
			.orTimeout(toolExecutionTimeout.toMillis(), TimeUnit.MILLISECONDS)
			// 异常/超时处理
			.exceptionally(ex -> {
				Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
				ToolCallResponse errorResponse = ToolCallResponse.error(toolCall.id(), toolCall.name(),
						extractErrorMessage(cause));

				// CAS 原子写入错误结果
				if (orderedResponses.compareAndSet(index, null, errorResponse)) {
					// 超时:丢弃脏状态 + 取消工具执行
					if (cause instanceof TimeoutException) {
						// 丢弃超时工具的状态更新,避免脏数据
						stateCollector.discardToolUpdateMap(index);
						// 触发协作式取消:通知可取消工具优雅停止
						DefaultCancellationToken token = cancellationTokens.get(index);
						if (token != null) {
							token.cancel();
							logger.debug("工具[{}]索引[{}]超时,已发送取消信号", toolCall.name(), index);
						}
					}
					failures.add(ex);
				}
				return null;
			});
	}).toList();

	// ===================== 7. 等待所有并行工具执行完成 =====================
	CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

	// ===================== 8. 组装执行结果 =====================
	Map<String, Object> updatedState = new HashMap<>();
	List<ToolResponseMessage.ToolResponse> toolResponses = new ArrayList<>();
	Boolean returnDirect = null;

	// 遍历原子数组,收集所有工具结果
	for (int i = 0; i < orderedResponses.length(); i++) {
		ToolCallResponse response = orderedResponses.get(i);
		// 兜底:空结果自动构造错误响应
		if (response == null) {
			AssistantMessage.ToolCall toolCall = toolCalls.get(i);
			response = ToolCallResponse.error(toolCall.id(), toolCall.name(), "工具未返回有效结果");
			logger.warn("工具[{}]索引[{}]结果为空,使用兜底错误", toolCall.name(), i);
		}
		toolResponses.add(response.toToolResponse());
		returnDirect = shouldReturnDirect(toolCalls.get(i), returnDirect, config);
	}

	// 构建消息体,写入Agent状态
	ToolResponseMessage.Builder builder = ToolResponseMessage.builder().responses(toolResponses);
	if (returnDirect != null && returnDirect) {
		builder.metadata(Map.of(FINISH_REASON_METADATA_KEY, FINISH_REASON));
	}
	updatedState.put("messages", builder.build());
	// 合并所有工具的状态更新(按 KeyStrategy 策略)
	updatedState.putAll(stateCollector.mergeAll());

	// 执行日志
	if (enableActingLog) {
		logger.info("[ThreadId {}] Agent {} 并行工具执行完成,总工具数:{},失败数:{}",
				config.threadId().orElse(THREAD_ID_DEFAULT), agentName, toolCalls.size(), failures.size());
	}

	return updatedState;
}

3.3.6 使用拦截器执行

执行带拦截器链的工具调用,支持同步/异步两种工具回调:

/**
 * 执行带拦截器链的工具调用,支持同步/异步两种工具回调
 * @param toolCall 待执行的工具调用对象
 * @param state 当前全局状态
 * @param config 运行时配置
 * @param extraStateFromToolCall 用于收集工具执行产生的状态更新的Map
 * @param inParallelExecution 是否处于并行执行上下文:true=并行,false=串行
 * @return 工具调用响应结果
 */
private ToolCallResponse executeToolCallWithInterceptors(AssistantMessage.ToolCall toolCall, OverAllState state,
        RunnableConfig config, Map<String, Object> extraStateFromToolCall, boolean inParallelExecution) {
    // 重载方法:调用完整参数版本,取消令牌传null,工具索引传-1(表示无索引)
    return executeToolCallWithInterceptors(toolCall, state, config, extraStateFromToolCall, inParallelExecution,
            null, -1);
}

executeToolCallWithInterceptors 核心逻辑:

  1. 构建工具调用请求对象,封装调用参数、上下文、运行环境
  2. 定义基础执行处理器:真正执行工具调用的核心逻辑
  3. 组装拦截器链:将所有工具拦截器与基础处理器串联
  4. 执行拦截器链 + 工具调用,返回最终响应
/**
 * 执行带拦截器链的工具调用,并行执行时支持可选的取消令牌追踪
 * @param toolCall 待执行的工具调用对象
 * @param state 当前全局状态
 * @param config 运行时配置
 * @param extraStateFromToolCall 用于收集工具执行产生的状态更新的Map
 * @param inParallelExecution 是否处于并行执行上下文:true=并行,false=串行
 * @param cancellationTokens 并行执行专用的取消令牌Map(可为null)
 * @param toolIndex 并行执行中当前工具的索引(作为取消令牌Map的key)
 * @return 工具调用响应结果
 */
private ToolCallResponse executeToolCallWithInterceptors(AssistantMessage.ToolCall toolCall, OverAllState state,
        RunnableConfig config, Map<String, Object> extraStateFromToolCall, boolean inParallelExecution,
        Map<Integer, DefaultCancellationToken> cancellationTokens, int toolIndex) {

    // 1. 构建工具调用请求对象,封装调用参数、上下文、运行环境
    ToolCallRequest request = ToolCallRequest.builder()
            .toolCall(toolCall)
            .context(config.metadata().orElse(new HashMap<>()))
            .executionContext(new ToolCallExecutionContext(config, state))
            .build();

    // 2. 定义基础执行处理器:真正执行工具调用的核心逻辑
    ToolCallHandler baseHandler = req -> {
        // 根据工具名称匹配对应的工具回调器
        ToolCallback toolCallback = resolve(req.getToolName(), config);

        // 未找到对应工具:打印警告并抛出异常
        if (toolCallback == null) {
            logger.warn(POSSIBLE_LLM_TOOL_NAME_CHANGE_WARNING, req.getToolName());
            throw new IllegalStateException("未找到对应名称的工具回调器:" + req.getToolName());
        }

        // 开启执行日志时:打印线程、代理、执行工具信息
        if (enableActingLog) {
            logger.info("[线程ID {}] 代理 {} 执行操作,调用工具 {}",
                    config.threadId().orElse(THREAD_ID_DEFAULT), agentName, req.getToolName());
        }

        // 构造工具执行上下文:合并默认上下文 + 请求上下文
        Map<String, Object> toolContextMap = new HashMap<>(toolContext);
        toolContextMap.putAll(req.getContext());

        // 对需要状态注入的工具类型,注入全局状态、配置、可更新状态
        // 支持类型:状态感知工具/函数工具/方法工具
        if (toolCallback instanceof StateAwareToolCallback || toolCallback instanceof FunctionToolCallback<?, ?>
                || toolCallback instanceof MethodToolCallback) {
            toolContextMap.putAll(Map.of(
                    AGENT_STATE_CONTEXT_KEY, state,                // 代理状态
                    AGENT_CONFIG_CONTEXT_KEY, config,              // 代理配置
                    AGENT_STATE_FOR_UPDATE_CONTEXT_KEY, extraStateFromToolCall  // 可更新状态
            ));
        }

        // 根据回调器类型(同步/异步)路由执行逻辑
        return executeToolByType(toolCallback, req, toolContextMap, config, extraStateFromToolCall,
                inParallelExecution, cancellationTokens, toolIndex);
    };

    // 3. 组装拦截器链:将所有工具拦截器与基础处理器串联
    ToolCallHandler chainedHandler = InterceptorChain.chainToolInterceptors(toolInterceptors, baseHandler);

    // 4. 执行拦截器链 + 工具调用,返回最终响应
    return chainedHandler.call(request);
}

根据工具名称查找对应的工具回调实现(三级查找策略)

  1. 优先从本地注册的 toolCallbacks 中匹配
  2. 其次从运行配置元数据中获取动态注册的工具
  3. 最后通过全局工具回调解析器兜底查找
/**
 * 根据工具名称查找对应的工具回调实现(三级查找策略)
 * 1. 优先从本地注册的 toolCallbacks 中匹配
 * 2. 其次从运行配置元数据中获取动态注册的工具
 * 3. 最后通过全局工具回调解析器兜底查找
 * @param toolName 工具名称
 * @param config 运行时配置
 * @return 匹配到的 ToolCallback,未找到则返回 null
 */
private ToolCallback resolve(String toolName, RunnableConfig config) {
    // 1. 从本地注册的工具回调列表中精确匹配名称
    if (toolCallbacks != null) {
        var fromNode = toolCallbacks.stream()
            .filter(callback -> callback.getToolDefinition().name().equals(toolName))
            .findFirst();
        if (fromNode.isPresent()) {
            return fromNode.get();
        }
    }

    // 2. 从配置上下文获取动态注册的工具(AgentLlmNode/拦截器注入)
    ToolCallback fromDynamic = resolveFromConfigMetadata(toolName, config);
    if (fromDynamic != null) {
        return fromDynamic;
    }

    // 3. 全局解析器兜底查找
    return toolCallbackResolver == null ? null : toolCallbackResolver.resolve(toolName);
}

/**
 * 从运行配置的上下文元数据中,查找动态注册的工具回调
 * @param toolName 工具名称
 * @param config 运行时配置
 * @return 匹配的动态工具回调,未找到返回 null
 */
private ToolCallback resolveFromConfigMetadata(String toolName, RunnableConfig config) {
    return Optional.ofNullable(config.context().get(RunnableConfig.DYNAMIC_TOOL_CALLBACKS_METADATA_KEY))
        .filter(v -> v instanceof List)
        .map(v -> (List<ToolCallback>) v)
        .flatMap(list -> list.stream()
            .filter(tc -> tc != null && toolName.equals(tc.getToolDefinition().name()))
            .findFirst())
        .orElse(null);
}

3.3.7 根据工具回调类型路由执行

根据工具回调类型路由执行逻辑,并行执行时支持可选的取消令牌追踪:

  1. 如果是异步工具回调,直接执行异步逻辑
  2. 开启同步转异步 + 串行执行模式:将同步工具包装为异步执行
  3. 其他场景:直接执行同步工具
/**
 * 根据工具回调类型路由执行逻辑,并行执行时支持可选的取消令牌追踪
 * @param toolCallback 待执行的工具回调实例
 * @param request 工具调用请求对象
 * @param toolContextMap 工具执行上下文
 * @param config 运行时配置
 * @param extraStateFromToolCall 用于收集状态更新的Map
 * @param inParallelExecution 是否处于并行执行上下文
 * @param cancellationTokens 并行执行专用的取消令牌Map(可为null)
 * @param toolIndex 并行执行中当前工具的索引
 * @return 工具调用响应结果
 */
private ToolCallResponse executeToolByType(ToolCallback toolCallback, ToolCallRequest request,
        Map<String, Object> toolContextMap, RunnableConfig config, Map<String, Object> extraStateFromToolCall,
        boolean inParallelExecution, Map<Integer, DefaultCancellationToken> cancellationTokens, int toolIndex) {

    // 1. 如果是异步工具回调,直接执行异步逻辑
    if (toolCallback instanceof AsyncToolCallback async) {
        return executeAsyncTool(async, request, toolContextMap, config, extraStateFromToolCall, cancellationTokens,
                toolIndex);
    }
    // 2. 开启同步转异步 + 串行执行模式:将同步工具包装为异步执行
    // 注意:仅在串行模式下包装,并行模式外层已有并发,包装会导致线程池耗尽/死锁
    else if (wrapSyncToolsAsAsync && !inParallelExecution) {
        // 获取工具执行线程池
        Executor executor = getToolExecutor(config);
        // 将同步工具适配包装为异步工具
        AsyncToolCallback wrappedAsync = AsyncToolCallbackAdapter.wrapIfNeeded(toolCallback, executor,
                toolExecutionTimeout);
        // 执行包装后的异步工具
        return executeAsyncTool(wrappedAsync, request, toolContextMap, config, extraStateFromToolCall,
                cancellationTokens, toolIndex);
    }
    // 3. 其他场景:直接执行同步工具
    else {
        return executeSyncTool(toolCallback, request, toolContextMap, config);
    }
}

三种路由:

executeToolByType(toolCallback, request, toolContextMap, ..., inParallelExecution)
  │
  ├─ toolCallback instanceof AsyncToolCallback?
  │     └─ Yes → executeAsyncTool(async, ...)
  │
  ├─ wrapSyncToolsAsAsync && !inParallelExecution?
  │     ├─ Yes → AsyncToolCallbackAdapter.wrapIfNeeded(toolCallback, executor, timeout)
  │     │        → executeAsyncTool(wrappedAsync, ...)
  │     └─ No(并行模式下不包装,避免线程池饥饿/死锁)
  │
  └─ 默认 → executeSyncTool(toolCallback, ...)

wrapSyncToolsAsAsync=true 且不在并行执行模式中时,同步工具通过 AsyncToolCallbackAdapter 包装为异步执行:

AsyncToolCallbackAdapter.wrapIfNeeded(syncCallback, executor, timeout)
  │
  ├─ 已经是 AsyncToolCallback → 直接返回
  └─ 同步工具 → 包装为 AsyncToolCallback
        callAsync(arguments, context):
          CompletableFuture.supplyAsync(() -> delegate.call(arguments, context), executor)

为什么并行模式下禁用? 并行模式已经通过外层 runAsync 提供了并发。如果在并行模式中再包装同步工具为异步,外层任务占用线程等待内层异步任务,而内层异步任务又需要同一线程池 → 线程池饥饿/死锁

3.3.8 执行异步工具

执行异步工具,内置超时处理,支持外部取消令牌追踪。

CancellableAsyncToolCallback 提供协作式取消支持:当工具实现该接口时,会创建真实的取消令牌并传递给工具;超时发生时,会主动取消令牌,让工具优雅终止。

并行执行场景下:取消令牌会存入传入的 cancellationTokens 集合,便于外层超时处理器统一触发取消操作。

/**
 * 执行异步工具,内置超时处理,支持外部取消令牌追踪
 * @param callback 待执行的异步工具回调
 * @param request 工具调用请求
 * @param toolContextMap 工具执行上下文
 * @param config 运行时配置
 * @param extraStateFromToolCall 收集工具执行产生的状态更新
 * @param cancellationTokens 并行执行专用的取消令牌Map(可为null)
 * @param toolIndex 并行执行中当前工具的索引(作为Map的key)
 * @return 工具调用响应结果
 */
private ToolCallResponse executeAsyncTool(AsyncToolCallback callback, ToolCallRequest request,
        Map<String, Object> toolContextMap, RunnableConfig config, Map<String, Object> extraStateFromToolCall,
        Map<Integer, DefaultCancellationToken> cancellationTokens, int toolIndex) {

    // 构建工具上下文对象
    ToolContext context = new ToolContext(toolContextMap);

    // 声明取消令牌(仅可取消异步工具会使用)
    DefaultCancellationToken cancellationToken = null;

    try {
        CompletableFuture<String> future;

        // 根据回调类型分发执行逻辑:可取消工具使用真实令牌
        if (callback instanceof CancellableAsyncToolCallback cancellable) {
            // 创建取消令牌
            cancellationToken = new DefaultCancellationToken();
            // 并行执行时:将令牌存入外部Map,供外层统一管理取消
            if (cancellationTokens != null && toolIndex >= 0) {
                cancellationTokens.put(toolIndex, cancellationToken);
            }
            // 调用可取消的异步方法,传入令牌
            future = cancellable.callAsync(request.getArguments(), context, cancellationToken);
        }
        else {
            // 普通异步工具:直接执行
            future = callback.callAsync(request.getArguments(), context);
        }

        // 防御性判断:异步返回null
        if (future == null) {
            return ToolCallResponse.error(request.getToolCallId(), request.getToolName(),
                    "异步工具返回了空的Future");
        }

        // 为异步任务设置超时时间,阻塞等待执行结果
        String result = future.orTimeout(callback.getTimeout().toMillis(), TimeUnit.MILLISECONDS).join();

        // 执行成功日志
        if (enableActingLog) {
            logger.info("[线程ID {}] 代理 {} 执行完成,异步工具 {} 已结束",
                    config.threadId().orElse(THREAD_ID_DEFAULT), agentName, request.getToolName());
            if (logger.isDebugEnabled()) {
                logger.debug("工具 {} 返回结果:{}", request.getToolName(), result);
            }
        }

        // 封装并返回成功响应
        return ToolCallResponse.of(request.getToolCallId(), request.getToolName(), result);
    }
    catch (CompletionException e) {
        Throwable cause = e.getCause() != null ? e.getCause() : e;

        // 1. 超时异常:清空状态更新,取消令牌,返回错误
        if (cause instanceof TimeoutException) {
            if (cancellationToken != null) {
                cancellationToken.cancel();
            }
            // 超时后清空脏状态,避免污染合并结果
            extraStateFromToolCall.clear();
            logger.warn("异步工具 {} 执行超时,已丢弃所有状态更新", request.getToolName());
            return ToolCallResponse.error(request.getToolCallId(), request.getToolName(), extractErrorMessage(cause));
        }
        // 2. 工具执行异常:使用异常处理器生成友好结果
        else if (cause instanceof ToolExecutionException toolExecutionException) {
            logger.error("异步工具 {} 执行失败,异常处理器:{}", request.getToolName(),
                    toolExecutionExceptionProcessor.getClass().getName(), toolExecutionException);
            String result = toolExecutionExceptionProcessor.process(toolExecutionException);
            return ToolCallResponse.of(request.getToolCallId(), request.getToolName(), result);
        }
        // 3. 其他执行异常
        else {
            logger.error("异步工具 {} 执行失败:{}", request.getToolName(), cause.getMessage(), cause);
            return ToolCallResponse.error(request.getToolCallId(), request.getToolName(), extractErrorMessage(cause));
        }
    }
    // 任务被主动取消
    catch (CancellationException e) {
        logger.warn("异步工具 {} 执行被取消", request.getToolName(), e);
        return ToolCallResponse.error(request.getToolCallId(), request.getToolName(), extractErrorMessage(e));
    }
    // 兜底异常捕获
    catch (Exception e) {
        logger.error("异步工具 {} 执行失败:{}", request.getToolName(), e.getMessage(), e);
        return ToolCallResponse.error(request.getToolCallId(), request.getToolName(), extractErrorMessage(e));
    }
}

3.3.9 执行同步工具

/**
 * 执行同步工具调用,并统一处理异常
 * @param callback 待执行的同步工具回调
 * @param request 工具调用请求对象
 * @param toolContextMap 工具执行上下文
 * @param config 运行时配置
 * @return 工具调用响应结果
 */
private ToolCallResponse executeSyncTool(ToolCallback callback, ToolCallRequest request,
        Map<String, Object> toolContextMap, RunnableConfig config) {

    // 构建工具执行上下文
    ToolContext context = new ToolContext(toolContextMap);

    try {
        // 同步执行工具调用,直接获取返回结果
        String result = callback.call(request.getArguments(), context);

        // 开启执行日志时打印信息
        if (enableActingLog) {
            logger.info("[线程ID {}] 代理 {} 执行操作,工具 {} 调用完成",
                    config.threadId().orElse(THREAD_ID_DEFAULT), agentName, request.getToolName());
            if (logger.isDebugEnabled()) {
                logger.debug("工具 {} 返回结果:{}", request.getToolName(), result);
            }
        }

        // 封装并返回成功响应
        return ToolCallResponse.of(request.getToolCallId(), request.getToolName(), result);
    }
    // 捕获工具执行异常,使用异常处理器生成友好结果
    catch (ToolExecutionException e) {
        logger.error("工具 {} 执行失败,使用异常处理器处理:{}", request.getToolName(),
                toolExecutionExceptionProcessor.getClass().getName(), e);
        String result = toolExecutionExceptionProcessor.process(e);
        return ToolCallResponse.of(request.getToolCallId(), request.getToolName(), result);
    }
    // 捕获其他所有异常,返回错误响应
    catch (Exception e) {
        logger.error("工具 {} 执行失败:{}", request.getToolName(), e.getMessage(), e);
        return ToolCallResponse.error(request.getToolCallId(), request.getToolName(), e);
    }
}

3.3.10 部分执行处理

lastMessageToolResponseMessage 时(表示部分工具已执行),处理剩余工具:

handlePartialToolResponses(toolResponseMessage, messages, state, config)
  │
  ├─ 取倒数第二条消息(应为 AssistantMessage)
  ├─ 计算已执行工具 ID 集合:existingResponses.stream().map(id)
  ├─ 过滤出未执行工具:remainingToolCalls = assistantMessage.toolCalls - executedToolIds
  │
  ├─ remainingToolCalls 为空?
  │     └─ Yes → return Map.of()  // 全部已执行,避免重复追加
  │
  ├─ 执行剩余工具(支持并行/串行)
  │
  ├─ 合并 existingResponses + newResponses
  ├─ 计算 returnDirect(基于所有工具调用)
  │
  └─ 返回合并后的状态:
        ├─ "messages" → [merged ToolResponseMessage, RemoveByHash(oldToolResponseMessage)]
        └─ ...newResults
/**
 * 处理分段/增量式工具响应:补全未执行完的工具调用
 * 用于支持部分工具已执行、剩余工具需要继续执行的场景
 * @param toolResponseMessage 当前工具响应消息
 * @param messages 完整消息列表
 * @param state 全局状态
 * @param config 运行时配置
 * @return 包含更新后消息与状态的结果集
 */
private Map<String, Object> handlePartialToolResponses(ToolResponseMessage toolResponseMessage,
        List<Message> messages, OverAllState state, RunnableConfig config) {

    // 校验消息长度:必须能找到前一条助手消息
    if (messages.size() < 2) {
        throw new IllegalStateException("无法在工具响应消息之前找到助手消息");
    }

    // 获取倒数第二条消息,必须是 AssistantMessage(工具调用来源)
    Message secondLastMessage = messages.get(messages.size() - 2);
    if (!(secondLastMessage instanceof AssistantMessage assistantMessage)) {
        throw new IllegalStateException("工具响应消息的前一条消息不是助手消息");
    }

    // 获取已执行完成的工具响应,并收集已执行的工具ID
    List<ToolResponseMessage.ToolResponse> existingResponses = toolResponseMessage.getResponses();
    Set<String> executedToolIds = existingResponses.stream()
        .map(ToolResponseMessage.ToolResponse::id)
        .collect(Collectors.toSet());

    // 过滤出**尚未执行**的工具调用
    List<AssistantMessage.ToolCall> remainingToolCalls = assistantMessage.getToolCalls()
        .stream()
        .filter(tc -> !executedToolIds.contains(tc.id()))
        .toList();

    // 所有工具都已执行完毕:返回空Map,避免重复追加
    if (remainingToolCalls.isEmpty()) {
        return Map.of();
    }

    // 打印执行日志:总工具数 / 已完成数
    if (enableActingLog) {
        logger.info("[线程ID {}] 代理 {} 开始执行剩余工具,共 {} 个,已完成 {} 个",
                config.threadId().orElse(THREAD_ID_DEFAULT), agentName,
                assistantMessage.getToolCalls().size(), existingResponses.size());
    }

    // 执行剩余工具:并行 / 串行
    Map<String, Object> newResults;
    if (parallelToolExecution && remainingToolCalls.size() > 1) {
        newResults = executeToolCallsParallel(remainingToolCalls, state, config);
    }
    else {
        newResults = executeToolCallsSequential(remainingToolCalls, state, config);
    }

    // 合并:旧响应 + 新执行的响应
    ToolResponseMessage newToolResponseMessage = (ToolResponseMessage) newResults.get("messages");
    List<ToolResponseMessage.ToolResponse> allResponses = new ArrayList<>(existingResponses);
    allResponses.addAll(newToolResponseMessage.getResponses());

    // 统一计算 returnDirect:遍历所有工具(已执行+未执行)
    Boolean returnDirect = null;
    for (AssistantMessage.ToolCall toolCall : assistantMessage.getToolCalls()) {
        returnDirect = shouldReturnDirect(toolCall, returnDirect, config);
    }

    // 构建最终状态:替换旧的工具响应,追加新的完整响应
    Map<String, Object> updatedState = new HashMap<>(newResults);
    List<Object> newMessages = new ArrayList<>();

    // 构建完整工具响应
    ToolResponseMessage.Builder builder = ToolResponseMessage.builder().responses(allResponses);
    if (returnDirect != null && returnDirect) {
        builder.metadata(Map.of(FINISH_REASON_METADATA_KEY, FINISH_REASON));
    }

    // 消息更新规则:添加新响应 + 移除旧的工具响应
    newMessages.add(builder.build());
    newMessages.add(new RemoveByHash<>(toolResponseMessage));
    updatedState.put("messages", newMessages);

    // 执行完成日志
    if (enableActingLog) {
        logger.info("[线程ID {}] 代理 {} 分段工具执行完成并返回",
                config.threadId().orElse(THREAD_ID_DEFAULT), agentName);
    }

    return updatedState;
}

RemoveByHash 是一个特殊标记,告诉状态引擎用新的合并消息替换旧的 ToolResponseMessage,而不是追加。


Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐