Spring AI Alibaba 1.x 系列【38】AgentLlmNode、AgentToolNode 构建、执行流程分析
1. 概述
AgentLlmNode 和 AgentToolNode 是 ReactAgent 的两大核心节点,共同实现了 ReAct (Reasoning + Acting) 执行范式:
| 节点 | 角色 | 职责 | 接口 |
|---|---|---|---|
| AgentLlmNode | 推理节点 (Reasoning) | 消息组装、模板渲染、拦截器链调用、LLM 模型调用、流式/非流式响应处理 | NodeActionWithConfig |
| AgentToolNode | 行动节点 (Acting) | 工具解析、拦截器链调用、同步/异步工具执行、并行调度、状态收集与合并 | NodeActionWithConfig |
在 ReactAgent 中的位置:
START → beforeAgent Hooks → beforeModel Hooks → [AgentLlmNode] → afterModel Hooks
↑ ↓
│ (有工具调用?)
│ ↓ Yes
└──────────────────── [AgentToolNode] ←──────────────┘
↓ (无工具调用 / return_direct)
afterAgent Hooks → END
两者均实现 NodeActionWithConfig 接口:
Map<String, Object> apply(OverAllState state, RunnableConfig config) throws Exception
接收共享状态 OverAllState 和运行配置 RunnableConfig,返回部分状态更新 Map,由图引擎根据 KeyStrategy 合并返回全局状态。
1.1 AgentLlmNode
负责【调用大语言模型】的核心执行节点,实现 NodeActionWithConfig 接口,作为图工作流的可执行节点,接收状态 + 配置并执行 LLM 推理。
核心依赖:
public class AgentLlmNode implements NodeActionWithConfig {
// 日志对象,用于节点执行、模型调用、异常等日志打印
private static final Logger logger = LoggerFactory.getLogger(AgentLlmNode.class);
// 模型迭代次数字段KEY(存储在运行配置上下文,用于记录工具多轮调用/推理轮次)
public static final String MODEL_ITERATION_KEY = "_MODEL_ITERATION_";
// ==================== 基础配置字段 ====================
// 智能体名称(标识当前LLM节点所属的智能体,用于日志、监控区分)
private String agentName;
// Spring AI 顾问列表,用于增强LLM请求处理逻辑(如上下文管理、参数增强)
private List<Advisor> advisors = new ArrayList<>();
// FIXME: 工具回调集合应仅在 chatOptions 中统一管理
// 当前通过不可变列表保证不可变性,后续需重构为仅依赖chatOptions管理工具
// 智能体可调用的工具回调列表(LLM可调用的外部工具集合)
private List<ToolCallback> toolCallbacks = new ArrayList<>();
// 模型调用拦截器列表,支持在LLM调用前后做扩展处理(如参数修改、日志、重试、工具选择)
private List<ModelInterceptor> modelInterceptors = new ArrayList<>();
// 输出结果存储的状态KEY(默认值:messages,用于指定LLM响应写入全局状态的字段)
private String outputKey;
// 输出格式约束(JSON Schema/结构化格式,强制LLM按指定格式返回结果)
private String outputSchema;
// Spring AI 核心对话客户端,真正执行大模型调用的底层组件
private ChatClient chatClient;
// 系统提示词(LLM的角色定义、行为约束、核心指令)
private String systemPrompt;
// 提示词模板渲染器,用于渲染动态变量的提示词模板
private TemplateRenderer templateRenderer;
// 智能体执行指令(补充性指令,与systemPrompt配合使用)
private String instruction;
// LLM调用配置(温度、top_p、最大token、工具调用开关等模型参数)
private ToolCallingChatOptions chatOptions;
// 是否开启推理日志(开启后会打印详细的LLM推理过程、输入输出、工具调用日志,用于调试)
private boolean enableReasoningLog;
完整配置构建示例:
AgentLlmNode llmNode = AgentLlmNode.builder()
.agentName(name) // Agent 名称
.chatOptions(effectiveOptions) // 合并后的 ChatOptions
.chatClient(chatClient) // ChatClient 实例
.outputKey(outputKey) // 可选,自定义输出键
.systemPrompt(systemPrompt) // 系统提示词
.templateRenderer(templateRenderer) // 模板渲染器
.instruction(instruction) // Agent 指令
.outputSchema(outputSchema) // 输出格式约束
.toolCallbacks(unmodifiableList(allTools)) // 工具列表(不可变)
.enableReasoningLog(enableLogging) // 推理日志开关
.build();
1.2 AgentToolNode
执行助手消息中工具调用的节点。
核心特性:
- 支持串行与并行两种工具执行模式。开启并行执行后,多个工具可在配置限制内并发运行。
- 自动识别实现
AsyncToolCallback的异步工具,并以异步方式执行,同时提供完善的超时机制处理。
核心依赖:
public class AgentToolNode implements NodeActionWithConfig {
/** 日志记录器,用于工具执行、异常、运行日志输出 */
private static final Logger logger = LoggerFactory.getLogger(AgentToolNode.class);
/** 当前工具节点所属智能体名称 */
private final String agentName;
/** 是否开启工具行为执行日志 */
private final boolean enableActingLog;
/** 是否开启工具并行执行能力 */
private final boolean parallelToolExecution;
/** 最大并行执行工具数量上限 */
private final int maxParallelTools;
/** 工具全局执行超时时间 */
private final Duration toolExecutionTimeout;
/** 是否将同步工具包装为异步统一执行 */
private final boolean wrapSyncToolsAsAsync;
/** 已注册的工具回调实现集合 */
private List<ToolCallback> toolCallbacks;
/** 工具执行上下文,用于跨工具传递共享参数 */
private Map<String, Object> toolContext;
/** 工具调用拦截器集合,用于工具执行前后自定义扩展逻辑 */
private List<ToolInterceptor> toolInterceptors = new ArrayList<>();
/** 工具解析器:根据工具名称动态匹配、查找对应工具实现 */
private ToolCallbackResolver toolCallbackResolver;
/** 工具执行异常处理器:统一捕获、处理、包装工具运行时异常 */
private ToolExecutionExceptionProcessor toolExecutionExceptionProcessor;
}
完整配置构建示例:
AgentToolNode toolNode = AgentToolNode.builder()
.agentName(name) // Agent 名称
.parallelToolExecution(parallelToolExecution) // 是否并行执行
.maxParallelTools(maxParallelTools) // 最大并行数(默认5)
.toolExecutionTimeout(toolExecutionTimeout) // 工具超时(默认5分钟)
.wrapSyncToolsAsAsync(wrapSyncToolsAsAsync) // 同步工具自动包装为异步
.toolCallbackResolver(resolver) // 工具解析器
.toolCallbacks(allTools) // 工具列表
.enableActingLog(enableLogging) // 行动日志开关
.toolExecutionExceptionProcessor(processor) // 异常处理器
.toolContext(toolContext) // 工具上下文
.build();
Builder 默认值:
| 参数 | 默认值 | 说明 |
|---|---|---|
parallelToolExecution |
false |
默认串行执行 |
maxParallelTools |
5 |
并行上限,防止资源耗尽 |
toolExecutionTimeout |
5 minutes |
单个工具执行超时 |
wrapSyncToolsAsAsync |
false |
默认不包装同步工具 |
toolExecutionExceptionProcessor |
DefaultToolExecutionExceptionProcessor(alwaysThrow=false) |
异常时不抛出,返回错误消息 |
2. 加载流程
2.1 构建入口
两个节点的构建统一在 DefaultBuilder.build() 中完成,流程如下:
DefaultBuilder.build()
│
├─ 1. 参数校验(name 非空,chatClient/model 至少提供一个)
│
├─ 2. 合并 ChatOptions
│ mergeSourceOptionsWithAgentOptions(sourceOptions, agentOptions)
│ → agentOptions 覆盖 sourceOptions 同名字段
│
├─ 3. 构建 ChatClient
│ - 若无 chatClient:ChatClient.builder(model).defaultOptions(effectiveOptions).build()
│ - 若有 chatClient:chatClient.mutate().defaultOptions(effectiveOptions).build()
│
├─ 4. 拦截器分类:separateInterceptorsByType()
│ - 统一 Interceptor 列表 → 按类型分为 ModelInterceptor[] 和 ToolInterceptor[]
│
├─ 5. 工具收集:gatherLocalTools()
│ - 按优先级合并:Hook 工具 > Interceptor 工具 > 用户工具
│ - 来源:直接 tools / ToolCallbackProvider / toolNames 解析 / resolver 反射提取
│
├─ 6. 构建 AgentLlmNode(详见 2.2)
│
├─ 7. 构建 AgentToolNode(详见 2.3)
│
└─ 8. 创建 ReactAgent(llmNode, toolNode, compileConfig, builder)
→ 注入拦截器到节点
→ 设置 hasTools 标志
2.2 构建 AgentLlmNode
DefaultBuilder.build() 中初始化 AgentLlmNode 建造者,依次设置参数:
// 初始化 AgentLlmNode 建造者,配置基础核心参数
AgentLlmNode.Builder llmNodeBuilder = AgentLlmNode.builder()
.agentName(this.name) // 设置智能体名称
.chatOptions(effectiveOptions) // 设置生效的大模型调用配置
.chatClient(chatClient); // 设置Spring AI对话客户端
// 条件配置:输出结果存储的状态Key(非空时设置)
if (outputKey != null && !outputKey.isEmpty()) {
llmNodeBuilder.outputKey(outputKey);
}
// 条件配置:系统提示词(非空时设置)
if (systemPrompt != null) {
llmNodeBuilder.systemPrompt(systemPrompt);
}
// 条件配置:提示词模板渲染器(非空时设置)
if (templateRenderer != null) {
llmNodeBuilder.templateRenderer(templateRenderer);
}
// 条件配置:智能体执行指令(非空时设置)
if (instruction != null) {
llmNodeBuilder.instruction(instruction);
}
// 初始化输出格式约束Schema
String outputSchema = null;
// 优先使用自定义的输出Schema
if (StringUtils.hasLength(this.outputSchema)) {
outputSchema = this.outputSchema;
}
// 无自定义Schema时,根据输出类型自动生成格式约束
else if (this.outputType != null) {
FormatProvider formatProvider = new BeanOutputConverter<>(this.outputType);
outputSchema = formatProvider.getFormat();
}
// 条件配置:输出格式Schema(非空时设置,强制LLM按格式返回)
if (StringUtils.hasLength(outputSchema)) {
llmNodeBuilder.outputSchema(outputSchema);
}
// 条件配置:工具回调列表(存在可用工具时,设置不可变列表保证线程安全)
if (CollectionUtils.isNotEmpty(allTools)) {
llmNodeBuilder.toolCallbacks(Collections.unmodifiableList(allTools));
}
// 条件配置:开启推理日志(日志开关开启时启用)
if (enableLogging) {
llmNodeBuilder.enableReasoningLog(true);
}
// 构建最终的 AgentLlmNode 实例
AgentLlmNode llmNode = llmNodeBuilder.build();
构造函数:
public AgentLlmNode(Builder builder) {
this.agentName = builder.agentName;
this.outputKey = builder.outputKey;
this.outputSchema = builder.outputSchema;
this.systemPrompt = builder.systemPrompt;
this.instruction = builder.instruction;
this.templateRenderer = builder.templateRenderer;
if (builder.advisors != null) {
this.advisors = builder.advisors;
}
if (builder.toolCallbacks != null) {
this.toolCallbacks = builder.toolCallbacks;
}
if (builder.modelInterceptors != null) {
this.modelInterceptors = builder.modelInterceptors;
}
this.chatClient = builder.chatClient;
this.chatOptions = buildChatOptions(builder.chatOptions, this.toolCallbacks);
this.enableReasoningLog = builder.enableReasoningLog;
}
其中 buildChatOptions() 是构建阶段的核心方法,合并工具回调与聊天配置,构建最终的工具调用聊天参数
- 若原始聊天配置为空且无工具回调,返回
null - 若原始配置是
ToolCallingChatOptions类型,合并工具回调(传入的toolCallbacks优先级更高) - 若原始配置类型不匹配,打印警告并创建新的配置对象
/**
* 合并工具回调与聊天配置,构建最终的工具调用聊天参数
* @param chatOptions 原始的聊天配置参数
* @param toolCallbacks 需要集成的工具回调集合
* @return 合并完成的工具调用聊天配置
*/
@Nullable
private ToolCallingChatOptions buildChatOptions(ChatOptions chatOptions, List<ToolCallback> toolCallbacks) {
// 场景1:无任何配置和工具,直接返回null
if (chatOptions == null && (toolCallbacks == null || toolCallbacks.isEmpty())) {
return null;
}
// 场景2:存在原始聊天配置,执行配置合并逻辑
if (chatOptions != null) {
// 判断配置是否为支持工具调用的类型
if (chatOptions instanceof ToolCallingChatOptions builderToolCallingOptions) {
// 复制原始配置,避免修改原对象
ToolCallingChatOptions copiedOptions = builderToolCallingOptions.copy();
// 初始化合并列表:优先使用传入的 toolCallbacks
List<ToolCallback> mergedToolCallbacks = new ArrayList<>(toolCallbacks);
// 遍历原始配置中的工具,将未重复的工具补充到合并列表(保证工具不重复)
for (ToolCallback callback : builderToolCallingOptions.getToolCallbacks()) {
boolean exists = mergedToolCallbacks.stream()
.anyMatch(tc -> tc.getToolDefinition().name().equals(callback.getToolDefinition().name()));
if (!exists) {
mergedToolCallbacks.add(callback);
}
}
// 设置合并后的工具列表
copiedOptions.setToolCallbacks(mergedToolCallbacks);
// 强制禁用Spring AI内部工具执行,由框架统一管理工具调用流程
copiedOptions.setInternalToolExecutionEnabled(false);
return copiedOptions;
} else {
// 配置类型不匹配,打印警告日志,原始配置不生效
logger.warn("提供的聊天配置并非 ToolCallingChatOptions 类型(实际类型:{}),该配置不生效。将使用工具回调创建新配置。",
chatOptions.getClass().getName());
}
}
// 场景3:无有效原始配置,直接构建新的工具调用配置
return ToolCallingChatOptions.builder()
.toolCallbacks(toolCallbacks)
// 强制禁用内部工具执行
.internalToolExecutionEnabled(false)
.build();
}
其中 internalToolExecutionEnabled(false) 确保工具执行由 AgentToolNode 接管,而非 Spring AI 框架内部自动执行。这是 ReAct 模式的关键,模型只决策调用哪些工具,实际执行由工具节点完成。
/**
* 标识工具执行的责任主体:
* 是由 {@link ChatModel} 模型侧执行模型请求的工具,
* 还是由调用方(框架/业务层)直接执行工具。
*/
Builder internalToolExecutionEnabled(@Nullable Boolean internalToolExecutionEnabled);
2.3 构建 AgentToolNode
DefaultBuilder.build() 中初始化 AgentToolNode 建造者,依次设置参数:
// 使用所有可用工具初始化工具执行节点
AgentToolNode toolNode;
// 构建 AgentToolNode 建造者,配置核心工具执行参数
AgentToolNode.Builder toolBuilder = AgentToolNode.builder()
.agentName(this.name) // 设置智能体名称
.parallelToolExecution(this.parallelToolExecution) // 配置是否开启工具并行执行
.maxParallelTools(this.maxParallelTools) // 配置最大并行执行工具数量
.toolExecutionTimeout(this.toolExecutionTimeout) // 配置工具执行超时时间
.wrapSyncToolsAsAsync(this.wrapSyncToolsAsAsync); // 配置是否将同步工具包装为异步执行
// 条件配置:工具回调解析器(非空时注入)
if (resolver != null) {
toolBuilder.toolCallbackResolver(resolver);
}
// 条件配置:注册所有可用工具(工具列表非空时设置)
if (CollectionUtils.isNotEmpty(allTools)) {
toolBuilder.toolCallbacks(allTools);
}
// 条件配置:开启工具执行日志(日志开关开启时启用)
if (enableLogging) {
toolBuilder.enableActingLog(true);
}
// 配置工具执行异常处理器:无自定义处理器时,使用默认异常处理器
if (toolExecutionExceptionProcessor == null) {
toolBuilder.toolExecutionExceptionProcessor(DefaultToolExecutionExceptionProcessor.builder()
.alwaysThrow(false) // 默认不直接抛出异常
.build());
} else {
// 使用自定义的工具执行异常处理器
toolBuilder.toolExecutionExceptionProcessor(toolExecutionExceptionProcessor);
}
// 条件配置:工具执行上下文(非空时注入共享上下文参数)
if (toolContext != null && !toolContext.isEmpty()) {
toolBuilder.toolContext(toolContext);
}
// 构建最终的 AgentToolNode 工具执行节点实例
toolNode = toolBuilder.build();
构造函数:
public AgentToolNode(Builder builder) {
this.agentName = builder.agentName;
this.enableActingLog = builder.enableActingLog;
this.toolCallbackResolver = builder.toolCallbackResolver;
this.toolCallbacks = builder.toolCallbacks;
this.toolContext = builder.toolContext;
this.toolExecutionExceptionProcessor = builder.toolExecutionExceptionProcessor;
this.parallelToolExecution = builder.parallelToolExecution;
this.maxParallelTools = builder.maxParallelTools;
this.toolExecutionTimeout = builder.toolExecutionTimeout;
this.wrapSyncToolsAsAsync = builder.wrapSyncToolsAsAsync;
}
2.4 设置拦截器
DefaultBuilder.build() 执行完成后,进入到 ReactAgent 构造函数中,首先会将拦截器在中注入到节点:
// 为节点设置拦截器
// 从钩子中收集拦截器,并与当前已有的拦截器进行合并
List<ModelInterceptor> mergedModelInterceptors = collectAndMergeModelInterceptors();
List<ToolInterceptor> mergedToolInterceptors = collectAndMergeToolInterceptors();
// 若合并后的模型拦截器不为空,则将其设置到LLM节点中
if (mergedModelInterceptors != null && !mergedModelInterceptors.isEmpty()) {
this.llmNode.setModelInterceptors(mergedModelInterceptors);
}
// 若合并后的工具拦截器不为空,则将其设置到工具节点中
if (mergedToolInterceptors != null && !mergedToolInterceptors.isEmpty()) {
this.toolNode.setToolInterceptors(mergedToolInterceptors);
}
优先级规则: Agent 直接配置的拦截器优先于 Hook 贡献的拦截器。同名拦截器只保留 Agent 级别的。
2.5 hasTools 标识
最后给工具节点设置 hasTools 标识:
// 根据工具节点的工具回调列表,标记是否存在可用工具
hasTools = toolNode.getToolCallbacks() != null && !toolNode.getToolCallbacks().isEmpty();
3. 执行流程
3.1 初始化执行图
在 ReactAgent 第一次调用会编译状态图为 CompiledGraph ,在 getGraph() 方法中会进行状态图的初始化 initGraph() :
protected Optional<OverAllState> doInvoke(Map<String, Object> input, RunnableConfig runnableConfig) {
CompiledGraph compiledGraph = getAndCompileGraph();
return compiledGraph.invoke(input, buildNonStreamConfig(runnableConfig));
}
public synchronized CompiledGraph getAndCompileGraph() {
if (compiledGraph != null) {
return compiledGraph;
}
StateGraph graph = getGraph();
try {
if (this.compileConfig == null) {
this.compiledGraph = graph.compile();
}
else {
this.compiledGraph = graph.compile(this.compileConfig);
}
} catch (GraphStateException e) {
throw new RuntimeException(e);
}
return this.compiledGraph;
}
public StateGraph getGraph() {
if (this.graph == null) {
try {
this.graph = initGraph();
}
catch (GraphStateException e) {
throw new RuntimeException(e);
}
}
return this.graph;
}
3.1.1 图添加 LLM、Tool 节点
initGraph() 反复方法中会给状态图添加 LLM、Tool 节点
// 1. 向 StateGraph 中添加 LLM 核心节点
// 节点名称:AGENT_MODEL_NAME,将同步 llmNode 包装为异步节点
graph.addNode(AGENT_MODEL_NAME, node_async(this.llmNode));
// 2. 如果当前 Agent 包含工具,添加 Tool 执行节点
if (hasTools) {
graph.addNode(AGENT_TOOL_NAME, node_async(this.toolNode));
}
3.1.2 给钩子注入工具
为实现 ToolInjection 接口的钩子初始化并注入工具:
// 为生效的钩子组件配置工具关联
setupToolsForHooks(effectiveHooks, toolNode);
工具从 AgentToolNode 中进行匹配
/**
* 为实现ToolInjection接口的钩子初始化并注入工具
* 仅注入与钩子要求的工具名称/类型相匹配的工具
*
* @param hooks 钩子列表
* @param toolNode 承载可用工具的代理工具节点
*/
private void setupToolsForHooks(List<? extends Hook> hooks, AgentToolNode toolNode) {
// 校验参数:钩子集合或工具节点为空,直接退出
if (hooks == null || hooks.isEmpty() || toolNode == null) {
return;
}
// 获取工具节点中所有可用的工具回调
List<ToolCallback> availableTools = toolNode.getToolCallbacks();
// 无可用工具时直接退出
if (availableTools == null || availableTools.isEmpty()) {
return;
}
// 遍历所有钩子,匹配并注入工具
for (Hook hook : hooks) {
// 仅处理支持工具注入的钩子(实现ToolInjection接口)
if (hook instanceof ToolInjection toolInjection) {
// 根据钩子配置查找匹配的工具
ToolCallback toolToInject = findToolForHook(toolInjection, availableTools);
// 找到匹配工具后,执行注入逻辑
if (toolToInject != null) {
toolInjection.injectTool(toolToInject);
}
}
}
}
3.2 AgentLlmNode 执行流程
3.2.1 执行入口
执行到 AgentLlmNode 节点时,会进入其 apply 方法,核心逻辑:
- 处理对话上下文
- 构建模型请求
- 调用LLM(流式/非流式)
- 返回更新后的全局状态
/**
* 图节点执行入口方法
* @param state 全局状态(存储对话历史、输入等数据)
* @param config 运行配置(线程ID、元数据、上下文参数)
* @return 更新后的全局状态数据
* @throws Exception 执行过程中抛出的异常
*/
@Override
public Map<String, Object> apply(OverAllState state, RunnableConfig config) throws Exception {
// 开启推理日志时,打印智能体开始推理的调试日志
if (enableReasoningLog && logger.isDebugEnabled()) {
logger.debug("[ThreadId {}] Agent {} start reasoning.", config.threadId()
.orElse(THREAD_ID_DEFAULT), agentName);
}
// ==================== 1. 管理模型迭代计数器(记录工具多轮调用次数) ====================
final AtomicInteger iterations;
// 上下文无迭代计数器,初始化并放入配置上下文
if (!config.context().containsKey(MODEL_ITERATION_KEY)) {
iterations = new AtomicInteger(0);
config.context().put(MODEL_ITERATION_KEY, iterations);
} else {
// 已存在计数器,迭代次数+1
iterations = (AtomicInteger) config.context().get(MODEL_ITERATION_KEY);
iterations.incrementAndGet();
}
// ==================== 2. 加载并处理对话消息上下文 ====================
List<Message> messages = new ArrayList<>();
// 全局状态无历史消息,从input获取用户输入
if (state.value("messages").isEmpty()) {
// 智能体作为图节点时,input是最常用的输入字段
if (state.value("input").isPresent()) {
messages.add(new UserMessage(state.value("input").get().toString()));
} else {
// 无指令/输入时抛出异常
throw new IllegalArgumentException("Either 'instruction' or 'includeContents' must be set for Agent.");
}
} else {
// 读取全局状态中的历史对话消息
messages = (List<Message>) state.value("messages").get();
}
// 为用户消息追加输出格式约束(outputSchema)
augmentUserMessage(messages, outputSchema);
// 渲染消息中的模板变量(替换动态参数)
renderTemplatedUserMessage(messages, state.data(), config.metadata());
// ==================== 3. 构建模型请求对象 ModelRequest ====================
// 将全局状态和配置元数据合并为上下文,供拦截器读取使用
Map<String, Object> contextMap = new HashMap<>(state.data());
Map<String, Object> metadata = config.metadata().orElse(new HashMap<>());
if (!metadata.isEmpty()) {
contextMap.putAll(metadata);
}
// 构建请求:消息、模型配置、上下文
ModelRequest.Builder requestBuilder = ModelRequest.builder()
.messages(messages)
.options(this.chatOptions != null ? this.chatOptions.copy() : null)
.context(contextMap);
// 从工具回调中提取工具名称和描述,注入模型请求
if (toolCallbacks != null && !toolCallbacks.isEmpty()) {
List<String> toolNames = new ArrayList<>();
Map<String, String> toolDescriptions = new HashMap<>();
for (ToolCallback callback : toolCallbacks) {
String name = callback.getToolDefinition().name();
String description = callback.getToolDefinition().description();
toolNames.add(name);
if (description != null && !description.isEmpty()) {
toolDescriptions.put(name, description);
}
}
requestBuilder.tools(toolNames);
requestBuilder.toolDescriptions(toolDescriptions);
}
// 注入系统提示词
if (StringUtils.hasLength(this.systemPrompt)) {
requestBuilder.systemMessage(new SystemMessage(this.systemPrompt));
}
// 构建最终的模型请求对象
ModelRequest modelRequest = requestBuilder.build();
// ==================== 4. 流式/非流式 调用分支 ====================
// 从配置中获取流式开关,默认开启流式
boolean stream = config.metadata("_stream_", new TypeRef<Boolean>(){}).orElse(true);
if (stream) {
// ==================== 流式调用模式 ====================
// 创建基础调用处理器:真正执行流式模型调用
ModelCallHandler baseHandler = request -> {
try {
// 打印系统提示词日志
if (enableReasoningLog) {
String systemPrompt = request.getSystemMessage() != null ? request.getSystemMessage().getText() : "";
if (logger.isDebugEnabled()) {
logger.debug("[ThreadId {}] Agent {} reasoning with system prompt: {}", config.threadId()
.orElse(THREAD_ID_DEFAULT), agentName, systemPrompt);
}
}
// 构建请求并执行流式调用,获取响应流
Flux<ChatResponse> chatResponseFlux = buildChatClientRequestSpec(request, config).stream().chatResponse();
// 开启日志时,打印流式输出结果
if (enableReasoningLog) {
chatResponseFlux = chatResponseFlux.doOnNext(chatResponse -> {
if (chatResponse != null && chatResponse.getResult() != null && chatResponse.getResult().getOutput() != null) {
// 区分工具调用/普通文本输出日志
if (chatResponse.getResult().getOutput().hasToolCalls()) {
logger.info("[ThreadId {}] Agent {} reasoning round {} streaming output: {}",
config.threadId().orElse(THREAD_ID_DEFAULT), agentName, iterations.get(), chatResponse.getResult().getOutput().getToolCalls());
} else {
logger.info("[ThreadId {}] Agent {} reasoning round {} streaming output: {}",
config.threadId()
.orElse(THREAD_ID_DEFAULT), agentName, iterations.get(), chatResponse.getResult()
.getOutput().getText());
}
}
});
}
return ModelResponse.of(chatResponseFlux);
} catch (Exception e) {
// 捕获流式调用异常,返回错误消息
logger.error("Exception during streaming model call: ", e);
return ModelResponse.of(new AssistantMessage("Exception: " + e.getMessage()));
}
};
// 组装模型拦截器链
ModelCallHandler chainedHandler = InterceptorChain.chainModelInterceptors(
modelInterceptors, baseHandler);
// 执行拦截器链+模型调用
ModelResponse modelResponse = chainedHandler.call(modelRequest);
// 返回结果:自定义outputKey或默认messages
return Map.of(StringUtils.hasLength(this.outputKey) ? this.outputKey : "messages", modelResponse.getMessage());
} else {
// ==================== 非流式(同步)调用模式 ====================
// 创建基础调用处理器:真正执行同步模型调用
ModelCallHandler baseHandler = request -> {
try {
// 打印推理日志
if (enableReasoningLog) {
String systemPrompt = request.getSystemMessage() != null ? request.getSystemMessage().getText() : "";
logger.info("[ThreadId {}] Agent {} reasoning round {} with system prompt: {}.", config.threadId().orElse(THREAD_ID_DEFAULT), agentName, iterations.get(), systemPrompt);
}
// 构建请求并执行同步调用
ChatResponse response = buildChatClientRequestSpec(request, config).call().chatResponse();
// 处理模型响应,空响应默认赋值
AssistantMessage responseMessage = new AssistantMessage("Empty response from model for unknown reason");
if (response != null && response.getResult() != null) {
responseMessage = response.getResult().getOutput();
}
// 打印响应结果日志
if (enableReasoningLog) {
logger.info("[ThreadId {}] Agent {} reasoning round {} returned: {}.", config.threadId().orElse(THREAD_ID_DEFAULT), agentName, iterations.get(), responseMessage);
}
return ModelResponse.of(responseMessage, response);
} catch (Exception e) {
// 捕获调用异常,返回错误消息
logger.error("Exception during invoking model call: ", e);
return ModelResponse.of(new AssistantMessage("Exception: " + e.getMessage()));
}
};
// 组装模型拦截器链
ModelCallHandler chainedHandler = InterceptorChain.chainModelInterceptors(
modelInterceptors, baseHandler);
// 打印拦截器链启动日志
if (enableReasoningLog) {
logger.info("[ThreadId {}] Agent {} reasoning round {} model chain has started.", config.threadId().orElse(THREAD_ID_DEFAULT), agentName, iterations.get());
}
// 执行拦截器链+模型调用
ModelResponse modelResponse = chainedHandler.call(modelRequest);
// 统计Token使用量,无响应时使用空统计对象
Usage tokenUsage = modelResponse.getChatResponse() != null ? modelResponse.getChatResponse().getMetadata()
.getUsage() : new EmptyUsage();
// 构建更新后的全局状态
Map<String, Object> updatedState = new HashMap<>();
updatedState.put("_TOKEN_USAGE_", tokenUsage);
updatedState.put("messages", modelResponse.getMessage());
// 配置自定义输出Key时,额外写入结果
if (StringUtils.hasLength(this.outputKey)) {
updatedState.put(this.outputKey, modelResponse.getMessage());
}
return updatedState;
}
}
3.2.2 推理日志 & 迭代计数器管理
记录每轮推理轮次,开启日志时打印日志:
if (enableReasoningLog && logger.isDebugEnabled()) {
logger.debug("[ThreadId {}] Agent {} start reasoning.", config.threadId()
.orElse(THREAD_ID_DEFAULT), agentName);
}
在 config.context() 维护 _MODEL_ITERATION_ 原子迭代计数器,
// Check and manage iteration counter
final AtomicInteger iterations;
if (!config.context().containsKey(MODEL_ITERATION_KEY)) {
iterations = new AtomicInteger(0);
config.context().put(MODEL_ITERATION_KEY, iterations);
} else {
iterations = (AtomicInteger) config.context().get(MODEL_ITERATION_KEY);
iterations.incrementAndGet();
}
3.2.3 会话消息获取
处理逻辑:
- 优先读取全局状态
messages对话历史列表; - 若无历史消息,则读取
input输入内容并封装为UserMessage; - 无任何有效输入时,直接抛出参数非法异常。
List<Message> messages = new ArrayList<>();
// 全局状态无历史消息,从input获取用户输入
if (state.value("messages").isEmpty()) {
// 智能体作为图节点时,input是最常用的输入字段
if (state.value("input").isPresent()) {
messages.add(new UserMessage(state.value("input").get().toString()));
} else {
// 无指令/输入时抛出异常
throw new IllegalArgumentException("Either 'instruction' or 'includeContents' must be set for Agent.");
}
} else {
// 读取全局状态中的历史对话消息
messages = (List<Message>) state.value("messages").get();
}
// 为用户消息追加输出格式约束(outputSchema)
augmentUserMessage(messages, outputSchema);
// 渲染消息中的模板变量(替换动态参数)
renderTemplatedUserMessage(messages, state.data(), config.metadata());
3.2.4 结构化输出增强
处理逻辑:
- 消息增强:在最后一条
UserMessage/AgentInstructionMessage末尾追加outputSchema(避免重复追加,已包含则跳过) - 模板渲染:遍历消息列表,找到最后一个未渲染的
AgentInstructionMessage,使用TemplateRenderer渲染模板变量,标记rendered = true
augmentUserMessage 方法:
public void augmentUserMessage(List<Message> messages, String outputSchema) {
// 无有效输出格式,直接返回
if (!StringUtils.hasText(outputSchema)) {
return;
}
// 倒序遍历消息列表,找到最后一条用户消息/指令消息
for (int i = messages.size() - 1; i >= 0; i--) {
Message message = messages.get(i);
// 处理普通用户消息
if (message instanceof UserMessage userMessage) {
// 校验:避免重复追加输出格式
if (!userMessage.getText().contains(outputSchema)) {
// 拼接Schema并替换原消息
messages.set(i, userMessage.mutate().text(userMessage.getText() + System.lineSeparator() + outputSchema).build());
}
break;
}
// 处理框架自定义指令消息
if (message instanceof AgentInstructionMessage templatedUserMessage) {
// 转义大括号,避免模板解析冲突
String newOutputSchema = outputSchema.replace("{", "\\{").replace("}", "\\}");
// 校验:避免重复追加输出格式
if (!templatedUserMessage.getText().contains(newOutputSchema)) {
messages.set(i, templatedUserMessage.mutate().text(templatedUserMessage.getText() + System.lineSeparator() + newOutputSchema).build());
}
break;
}
// 遍历到第一条仍无用户消息,直接新增一条用户消息存储Schema
if (i == 0) {
messages.add(new UserMessage(outputSchema));
}
}
}
renderTemplatedUserMessage 方法:
public void renderTemplatedUserMessage(List<Message> messages, Map<String, Object> params, Optional<Map<String, Object>> metadata) {
// 预处理参数:过滤无效参数,格式化参数类型
Map<String, Object> processedParams = new HashMap<>();
if (params != null) {
for (Map.Entry<String, Object> entry : params.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
// 排除messages关键字,避免循环引用
if ("messages".equals(key)) {
continue;
}
// 排除集合类型参数
if (value instanceof List) {
continue;
}
// 将消息对象转换为纯文本
if (value instanceof Message message) {
processedParams.put(key, message.getText());
} else {
// 其他类型参数直接保留
processedParams.put(key, value);
}
}
}
// 倒序遍历,找到最后一条未渲染的指令消息进行模板渲染
for (int i = messages.size() - 1; i >= 0; i--) {
Message message = messages.get(i);
// 仅渲染未执行过模板替换的 AgentInstructionMessage
if (message instanceof AgentInstructionMessage instructionMessage && !instructionMessage.isRendered()) {
// 渲染模板变量,标记为已渲染,替换原消息
AgentInstructionMessage newMessage = instructionMessage.mutate()
.text(renderPromptTemplate(instructionMessage.getText(), processedParams))
.rendered(true)
.build();
messages.set(i, newMessage);
break;
}
}
}
3.2.5 构建 ModelRequest
处理逻辑:
- 整合完整消息列表、拷贝模型配置、合并全局状态与运行元数据;
- 自动提取所有
ToolCallback工具名称与描述,注入请求上下文; - 挂载系统提示词,生成完整模型调用请求。
// 将全局状态和配置元数据合并为上下文,供拦截器读取使用
Map<String, Object> contextMap = new HashMap<>(state.data());
Map<String, Object> metadata = config.metadata().orElse(new HashMap<>());
if (!metadata.isEmpty()) {
contextMap.putAll(metadata);
}
// 构建请求:消息、模型配置、上下文
ModelRequest.Builder requestBuilder = ModelRequest.builder()
.messages(messages)
.options(this.chatOptions != null ? this.chatOptions.copy() : null)
.context(contextMap);
// 从工具回调中提取工具名称和描述,注入模型请求
if (toolCallbacks != null && !toolCallbacks.isEmpty()) {
List<String> toolNames = new ArrayList<>();
Map<String, String> toolDescriptions = new HashMap<>();
for (ToolCallback callback : toolCallbacks) {
String name = callback.getToolDefinition().name();
String description = callback.getToolDefinition().description();
toolNames.add(name);
if (description != null && !description.isEmpty()) {
toolDescriptions.put(name, description);
}
}
requestBuilder.tools(toolNames);
requestBuilder.toolDescriptions(toolDescriptions);
}
// 注入系统提示词
if (StringUtils.hasLength(this.systemPrompt)) {
requestBuilder.systemMessage(new SystemMessage(this.systemPrompt));
}
// 构建最终的模型请求对象
ModelRequest modelRequest = requestBuilder.build();
3.2.6 判断流式/非流式模式
默认为 true :
boolean stream = config.metadata("_stream_", new TypeRef<Boolean>(){}).orElse(true);
3.2.7 流式执行
流式执行链路:
- 构建底层流式模型调用处理器;
- 通过
InterceptorChain串联所有ModelInterceptor拦截器; - 执行拦截器链并发起流式 LLM 调用;
- 最终返回以
outputKey或默认messages为Key的流式消息结果。
// ==================== 流式调用分支 ====================
if (stream) {
// 创建基础模型调用处理器:真正执行流式大模型调用的核心逻辑
ModelCallHandler baseHandler = request -> {
try {
// 开启推理日志时,打印系统提示词调试信息
if (enableReasoningLog) {
String systemPrompt = request.getSystemMessage() != null ? request.getSystemMessage().getText() : "";
if (logger.isDebugEnabled()) {
logger.debug("[ThreadId {}] Agent {} reasoning with system prompt: {}", config.threadId()
.orElse(THREAD_ID_DEFAULT), agentName, systemPrompt);
}
}
// 构建模型请求并发起流式调用,获取响应流(Flux)
Flux<ChatResponse> chatResponseFlux = buildChatClientRequestSpec(request, config).stream().chatResponse();
// 开启推理日志时,监听流式响应,实时打印输出内容
if (enableReasoningLog) {
chatResponseFlux = chatResponseFlux.doOnNext(chatResponse -> {
// 校验响应非空,避免空指针
if (chatResponse != null && chatResponse.getResult() != null && chatResponse.getResult().getOutput() != null) {
// 区分输出类型:工具调用 / 普通文本
if (chatResponse.getResult().getOutput().hasToolCalls()) {
// 打印工具调用日志
logger.info("[ThreadId {}] Agent {} reasoning round {} streaming output: {}",
config.threadId().orElse(THREAD_ID_DEFAULT), agentName, iterations.get(), chatResponse.getResult().getOutput().getToolCalls());
} else {
// 打印文本输出日志
logger.info("[ThreadId {}] Agent {} reasoning round {} streaming output: {}",
config.threadId()
.orElse(THREAD_ID_DEFAULT), agentName, iterations.get(), chatResponse.getResult()
.getOutput().getText());
}
}
});
}
// 封装流式响应并返回
return ModelResponse.of(chatResponseFlux);
} catch (Exception e) {
// 捕获流式调用异常,打印错误日志并返回异常消息
logger.error("Exception during streaming model call: ", e);
return ModelResponse.of(new AssistantMessage("Exception: " + e.getMessage()));
}
};
// 构建拦截器链:将所有模型拦截器与基础调用处理器串联
ModelCallHandler chainedHandler = InterceptorChain.chainModelInterceptors(
modelInterceptors, baseHandler);
// 执行拦截器链 + 模型调用,获取最终响应
ModelResponse modelResponse = chainedHandler.call(modelRequest);
// 返回更新后的状态:使用自定义outputKey,无则使用默认messages
return Map.of(StringUtils.hasLength(this.outputKey) ? this.outputKey : "messages", modelResponse.getMessage());
}
3.2.8 非流式执行
非流式执行链路:
- 构建同步阻塞式模型调用处理器;
- 串联模型拦截器,完整执行前置、后置扩展逻辑;
- 同步获取
ChatResponse完整响应,统计Token消耗; - 组装更新全局状态:令牌用量、模型回复、自定义输出字段,最终返回状态集合。
{
// ==================== 非流式(同步)调用分支 ====================
// 创建基础模型调用处理器:真正执行同步大模型调用的核心逻辑
ModelCallHandler baseHandler = request -> {
try {
// 开启推理日志时,打印当前轮次的系统提示词信息
if (enableReasoningLog) {
String systemPrompt = request.getSystemMessage() != null ? request.getSystemMessage().getText() : "";
logger.info("[ThreadId {}] Agent {} reasoning round {} with system prompt: {}.",
config.threadId().orElse(THREAD_ID_DEFAULT), agentName, iterations.get(), systemPrompt);
}
// 构建模型请求并发起同步调用,获取完整响应结果
ChatResponse response = buildChatClientRequestSpec(request, config).call().chatResponse();
// 初始化默认响应消息,处理模型无返回的异常情况
AssistantMessage responseMessage = new AssistantMessage("Empty response from model for unknown reason");
// 校验响应非空,获取模型真正的输出消息
if (response != null && response.getResult() != null) {
responseMessage = response.getResult().getOutput();
}
// 开启推理日志时,打印模型返回的结果信息
if (enableReasoningLog) {
logger.info("[ThreadId {}] Agent {} reasoning round {} returned: {}.",
config.threadId().orElse(THREAD_ID_DEFAULT), agentName, iterations.get(), responseMessage);
}
// 封装模型响应消息和原始响应对象并返回
return ModelResponse.of(responseMessage, response);
} catch (Exception e) {
// 捕获同步调用异常,打印错误日志并返回异常提示消息
logger.error("Exception during invoking model call: ", e);
return ModelResponse.of(new AssistantMessage("Exception: " + e.getMessage()));
}
};
// 构建拦截器链:将所有注册的模型拦截器与基础调用处理器串联执行
ModelCallHandler chainedHandler = InterceptorChain.chainModelInterceptors(
modelInterceptors, baseHandler);
// 开启推理日志时,打印模型调用拦截器链启动日志
if (enableReasoningLog) {
logger.info("[ThreadId {}] Agent {} reasoning round {} model chain has started.",
config.threadId().orElse(THREAD_ID_DEFAULT), agentName, iterations.get());
}
// 执行完整的拦截器链 + 模型调用,获取最终处理后的模型响应
ModelResponse modelResponse = chainedHandler.call(modelRequest);
// 统计Token使用量:有响应则获取真实用量,无响应则使用空统计对象
Usage tokenUsage = modelResponse.getChatResponse() != null ?
modelResponse.getChatResponse().getMetadata().getUsage() : new EmptyUsage();
// 构建更新后的全局状态集合
Map<String, Object> updatedState = new HashMap<>();
// 存入Token消耗统计数据
updatedState.put("_TOKEN_USAGE_", tokenUsage);
// 存入模型返回的助手消息,更新对话历史
updatedState.put("messages", modelResponse.getMessage());
// 配置了自定义输出键时,额外存入结果数据
if (StringUtils.hasLength(this.outputKey)) {
updatedState.put(this.outputKey, modelResponse.getMessage());
}
// 返回更新后的全局状态,驱动图工作流继续执行
return updatedState;
}
其中 buildChatClientRequestSpec 是实际构建模型调用规格的关键方法:
-
执行
appendSystemPromptIfNeeded处理消息- 将系统提示消息插入到消息列表的开头
- 检测到多个系统提示消息时自动发出警告
-
执行
filterToolCallbacks过滤工具回调- 优先从模型请求的配置中获取工具回调列表
- 按照模型请求的工具名称列表完成工具过滤
- 支持拦截器对可用工具集进行动态修改
-
处理动态工具回调
- 将动态工具回调追加到过滤后的工具列表中
- 把动态工具回调存入配置上下文,供
AgentToolNode解析使用
-
构建
ChatClient最终请求规范- 初始化基础请求,绑定消息列表与顾问拦截器
- 存在自定义配置时:拷贝配置对象,设置过滤后的工具列表,强制禁用模型内部工具执行,绑定配置到请求
- 无自定义配置时:检查
ChatClient默认配置,无默认则创建新配置并绑定工具+禁用内部执行;有默认配置则更新工具并禁用内部执行;其他情况直接绑定工具列表
/**
* 构建 ChatClient 最终请求规范
* 整合消息、工具、模型配置,统一处理工具执行权限,是LLM调用的前置核心逻辑
* @param modelRequest 模型请求对象(封装消息、工具、配置)
* @param config 运行时配置
* @return ChatClient 可执行的请求规范
*/
private ChatClient.ChatClientRequestSpec buildChatClientRequestSpec(ModelRequest modelRequest, RunnableConfig config) {
// 1. 拼接系统提示词到消息列表(按需追加)
List<Message> messages = appendSystemPromptIfNeeded(modelRequest);
// 注意!如果ModelRequest中同时自定义了工具(ToolSelectionInterceptor)和配置选项,工具配置会覆盖选项中的工具调用设置
// 2. 过滤出当前请求可用的工具回调列表
List<ToolCallback> filteredToolCallbacks = filterToolCallbacks(modelRequest);
// 3. 处理动态工具回调:追加动态工具,并存入配置上下文(供工具节点使用)
if (!CollectionUtils.isEmpty(modelRequest.getDynamicToolCallbacks())) {
filteredToolCallbacks.addAll(modelRequest.getDynamicToolCallbacks());
// FIXME:使用 RunnableConfig 通过配置上下文将动态工具回调传递给工具节点(内部使用)
config.context().put(RunnableConfig.DYNAMIC_TOOL_CALLBACKS_METADATA_KEY, modelRequest.getDynamicToolCallbacks());
}
// 4. 构建基础请求:绑定消息列表 + 拦截顾问
var promptSpec = this.chatClient.prompt()
.messages(messages)
.advisors(this.advisors);
// 5. 获取模型请求中的配置
ToolCallingChatOptions requestOptions = modelRequest.getOptions();
// 6. 存在自定义配置:拷贝配置、绑定过滤后的工具、强制禁用内部工具执行
if (requestOptions != null) {
ToolCallingChatOptions copiedOptions = requestOptions.copy();
// 设置过滤后的工具列表
copiedOptions.setToolCallbacks(filteredToolCallbacks);
// 强制禁用模型内部工具执行,避免与Agent框架的工具管理逻辑冲突
copiedOptions.setInternalToolExecutionEnabled(false);
// 绑定配置到请求
promptSpec.options(copiedOptions);
}
// 7. 无自定义配置:兜底处理,保证工具配置和执行权限正确
else {
// 检查是否为默认请求规范,获取底层默认配置
if (promptSpec instanceof DefaultChatClient.DefaultChatClientRequestSpec defaultChatClientRequestSpec) {
ChatOptions options = defaultChatClientRequestSpec.getChatOptions();
// 场景1:无任何默认配置 → 创建新配置,绑定工具+禁用内部执行
if (options == null) {
options = ToolCallingChatOptions.builder()
.toolCallbacks(filteredToolCallbacks)
.internalToolExecutionEnabled(false)
.build();
defaultChatClientRequestSpec.options(options);
}
// 场景2:默认配置是工具调用配置 → 拷贝配置、更新工具、禁用内部执行
else if (options instanceof ToolCallingChatOptions toolCallingChatOptions) {
ToolCallingChatOptions copiedOptions = toolCallingChatOptions.copy();
copiedOptions.setToolCallbacks(filteredToolCallbacks);
copiedOptions.setInternalToolExecutionEnabled(false);
defaultChatClientRequestSpec.options(copiedOptions);
}
}
// 场景3:非默认请求规范 + 存在工具 → 直接绑定工具列表
else if (!filteredToolCallbacks.isEmpty()) {
promptSpec.tools(filteredToolCallbacks);
}
}
// 8. 返回最终构建完成的请求规范
return promptSpec;
}
filterToolCallbacks :
/**
* 根据 ModelRequest 中指定的工具,过滤工具回调列表
* @param modelRequest 包含待过滤工具名称列表的模型请求对象
* @return 匹配请求工具的过滤后工具回调列表
*/
private List<ToolCallback> filterToolCallbacks(ModelRequest modelRequest) {
// 初始化最终返回的工具回调集合
List<ToolCallback> toolCallbacks = new ArrayList<>();
// 模型请求对象为空,直接使用当前节点默认的全部工具回调
if (modelRequest == null) {
toolCallbacks.addAll(this.toolCallbacks);
return toolCallbacks;
}
// 优先使用:模型请求配置中携带的工具回调列表
if (modelRequest.getOptions() != null && modelRequest.getOptions().getToolCallbacks() != null) {
toolCallbacks.addAll(modelRequest.getOptions().getToolCallbacks());
} else {
// 无配置时不执行操作
// 默认情况下 buildChatOptions() 会确保工具回调被初始化
// 支持用户通过设置空列表来禁用所有工具
}
// 获取请求中指定的工具名称集合
List<String> requestedTools = modelRequest.getTools();
// 未指定具体工具,直接返回当前工具列表
if (requestedTools == null || requestedTools.isEmpty()) {
return toolCallbacks;
}
// 流式过滤:仅保留名称匹配的工具回调,封装为新列表返回
return new ArrayList<>(toolCallbacks.stream()
.filter(callback -> requestedTools.contains(callback.getToolDefinition().name()))
.toList());
}
关键设计:
filterToolCallbacks()允许ModelInterceptor通过修改ModelRequest.tools和ModelRequest.options来动态控制哪些工具对当前模型调用可见。这实现了工具选择(Tool Selection)能力。
3.3 AgentToolNode 执行流程
3.3.1 执行入口
如果大模型返回工具调用,会路由执行到 AgentToolNode 节点,进入其 apply 方法,核心逻辑:
- 获取消息上下文:从全局状态中读取对话消息,定位最后一条关键消息
- 消息类型判断
- 助手消息:提取工具调用列表,根据配置选择并行 / 串行执行工具
- 工具响应消息:处理未完成的部分工具响应
- 其他类型:直接抛出非法状态异常
- 执行模式分发:支持多工具并行执行提升效率,单工具默认串行执行
- 日志记录:开启日志时打印工具执行的基础信息,便于调试追踪
/**
* 工具节点核心执行方法:解析助手消息中的工具调用指令,分发执行工具
* @param state 全局状态(包含对话消息、上下文数据)
* @param config 运行配置(线程ID、元数据等)
* @return 工具执行完成后的更新状态
* @throws Exception 工具执行/状态处理异常
*/
@Override
public Map<String, Object> apply(OverAllState state, RunnableConfig config) throws Exception {
// 从全局状态中获取对话消息列表,无消息时直接抛出异常
List<Message> messages = (List<Message>) state.value("messages").orElseThrow();
// 获取消息列表中的最后一条消息(工具调用指令一定在最后一条助手消息中)
Message lastMessage = messages.get(messages.size() - 1);
// 场景1:最后一条消息是【助手消息】(包含工具调用请求)
if (lastMessage instanceof AssistantMessage assistantMessage) {
// 从助手消息中提取工具调用列表
List<AssistantMessage.ToolCall> toolCalls = assistantMessage.getToolCalls();
// 开启日志时,打印工具执行信息(线程ID、智能体名称、工具数量)
if (enableActingLog) {
logger.info("[ThreadId {}] Agent {} acting with {} tools.", config.threadId().orElse(THREAD_ID_DEFAULT),
agentName, toolCalls.size());
}
// 根据配置选择执行模式:
// 开启并行执行 + 工具数量>1 → 并行执行工具
if (parallelToolExecution && toolCalls.size() > 1) {
return executeToolCallsParallel(toolCalls, state, config);
}
// 否则 → 串行执行工具
else {
return executeToolCallsSequential(toolCalls, state, config);
}
}
// 场景2:最后一条消息是【工具响应消息】(处理部分工具响应)
else if (lastMessage instanceof ToolResponseMessage toolResponseMessage) {
return handlePartialToolResponses(toolResponseMessage, messages, state, config);
}
// 异常场景:最后一条消息类型不合法,无法执行工具
else {
throw new IllegalStateException("Last message is neither an AssistantMessage nor a ToolResponseMessage");
}
}
3.3.2 获取消息上下文
从全局状态中获取对话消息列表,无消息时直接抛出异常。获取消息列表中的最后一条消息(工具调用指令一定在最后一条助手消息中):
List<Message> messages = (List<Message>) state.value("messages").orElseThrow();
Message lastMessage = messages.get(messages.size() - 1);
3.3.3 串行执行 or 并行执行
满足以下两个条件时,进入并行执行,否则进行串行执行:
- 配置开启了
parallelToolExecution(并行工具执行开关) - 工具调用数量 >
1(多个工具才需要并行)
// 根据配置选择执行模式:
// 开启并行执行 + 工具数量>1 → 并行执行工具
if (parallelToolExecution && toolCalls.size() > 1) {
return executeToolCallsParallel(toolCalls, state, config);
}
// 否则 → 串行执行工具
else {
return executeToolCallsSequential(toolCalls, state, config);
}
并行 vs 串行状态合并差异:
| 维度 | 串行模式 | 并行模式 |
|---|---|---|
| 合并方式 | Map.putAll() — last-write-wins |
ToolStateCollector.mergeAll() — 按 KeyStrategy |
| 顺序确定性 | 确定(按 toolCalls 顺序) | 确定索引顺序,但并行执行顺序不确定 |
| APPEND 策略支持 | 不支持(被 putAll 覆盖) | 支持(按索引顺序追加) |
| 状态隔离 | ConcurrentHashMap + 立即合并 |
ToolStateCollector + 延迟合并 |
3.3.4 串行执行
在串行执行 executeToolCallsSequential 方法中,会循环调用所有工具:
for (AssistantMessage.ToolCall toolCall : toolCalls) {
// Each tool gets its own isolated update map
// If this tool times out, clear() only affects this map, not mergedUpdates
Map<String, Object> toolSpecificUpdate = new ConcurrentHashMap<>();
ToolCallResponse response = executeToolCallWithInterceptors(toolCall, state, config, toolSpecificUpdate,
false);
toolResponses.add(response.toToolResponse());
returnDirect = shouldReturnDirect(toolCall, returnDirect, config);
// Merge immediately - subsequent timeout clear() won't affect already-merged data
mergedUpdates.putAll(toolSpecificUpdate);
}
注意:串行、并行执行,最后都是调用 executeToolCallWithInterceptors 方法进行执行。
3.3.5 并行执行
executeToolCallsParallel 多工具调用并行执行方法(支持并发限流、超时、优雅取消、线程安全状态管理),包含了所有并行执行逻辑:
- 将所有异步任务提交到线程池
- 等待所有并行工具执行完成
- 将所有结果写入到
OverAllState中
注意事项:
- 通过
state.snapShot()创建的状态快照为浅拷贝,状态中的可变对象(如List/Map)是共享的。 - 工具禁止直接修改从状态中获取的集合,必须通过
extraStateFromToolCall写入更新。 - 并行状态合并语义:
-
并行模式使用
ToolStateCollector,严格遵循每个状态Key配置的KeyStrategy合并策略:- APPEND 策略:所有工具的结果会追加合并
- REPLACE 策略:最后写入的结果覆盖(并行顺序不保证,非确定性)
-
与串行模式区别:串行使用
Map.putAll()简单覆盖,并行支持高级合并策略,保证并发安全。
-
处理逻辑:
/**
* 【核心底层方法】多工具调用并行执行(支持并发限流、超时、优雅取消、线程安全状态管理)
* @param toolCalls LLM 下发的工具调用列表
* @param state Agent 全局状态
* @param config 运行时配置
* @return 所有工具执行后的合并状态
*/
private Map<String, Object> executeToolCallsParallel(List<AssistantMessage.ToolCall> toolCalls, OverAllState state,
RunnableConfig config) {
// ===================== 1. 并行模式兼容提示 =====================
// 并行模式下:忽略「同步转异步」配置,避免线程池饥饿/死锁(核心安全设计)
if (wrapSyncToolsAsAsync && logger.isDebugEnabled()) {
logger.debug("并行执行模式:自动忽略 wrapSyncToolsAsAsync,同步工具直接在并行线程中执行,防止线程池耗尽");
}
// 获取工具执行线程池(来自 ParallelNode 全局线程池)
Executor executor = getToolExecutor(config);
// ===================== 2. 线程安全:创建状态快照 =====================
// 浅拷贝状态,避免并行工具修改原始状态导致线程不安全
OverAllState stateSnapshot = state.snapShot().orElse(state);
// 工具状态收集器:支持按 KeyStrategy 并发安全合并工具结果
ToolStateCollector stateCollector = new ToolStateCollector(toolCalls.size(), state.keyStrategies());
// ===================== 3. 并发安全数据结构 =====================
// 原子引用数组:CAS 无锁更新工具执行结果,避免线程竞争
AtomicReferenceArray<ToolCallResponse> orderedResponses = new AtomicReferenceArray<>(toolCalls.size());
// 同步集合:收集执行异常
List<Throwable> failures = java.util.Collections.synchronizedList(new ArrayList<>());
// 并发Map:存储每个工具的取消令牌,支持超时/手动取消可取消异步工具
Map<Integer, DefaultCancellationToken> cancellationTokens = new ConcurrentHashMap<>();
// ===================== 4. 并发限流:信号量控制最大并行工具数 =====================
// 限制同时执行的工具数量,防止服务器压力过大
Semaphore semaphore = new Semaphore(maxParallelTools);
// ===================== 5. 构建并行任务流 =====================
List<CompletableFuture<Void>> futures = IntStream.range(0, toolCalls.size()).mapToObj(index -> {
// 获取当前工具
AssistantMessage.ToolCall toolCall = toolCalls.get(index);
// 为当前工具创建独立的状态更新Map
Map<String, Object> toolSpecificUpdate = stateCollector.createToolUpdateMap(index);
// 提交异步任务到线程池
return CompletableFuture.runAsync(() -> {
try {
// 获取信号量许可,限流并发
semaphore.acquire();
try {
// ===================== 6. 执行工具(带拦截器、状态注入、取消令牌) =====================
// 内部调用 executeToolByType → 执行同步/异步/可取消工具
ToolCallResponse response = executeToolCallWithInterceptors(
toolCall, stateSnapshot, config,
toolSpecificUpdate, true, cancellationTokens, index
);
// CAS 原子更新结果:仅当结果为空时写入(避免超时覆盖正常结果)
orderedResponses.compareAndSet(index, null, response);
} finally {
// 释放信号量
semaphore.release();
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
failures.add(e);
// 中断异常:写入错误结果
orderedResponses.compareAndSet(index, null,
ToolCallResponse.error(toolCall.id(), toolCall.name(), "工具执行被中断"));
}
}, executor)
// 全局超时控制
.orTimeout(toolExecutionTimeout.toMillis(), TimeUnit.MILLISECONDS)
// 异常/超时处理
.exceptionally(ex -> {
Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
ToolCallResponse errorResponse = ToolCallResponse.error(toolCall.id(), toolCall.name(),
extractErrorMessage(cause));
// CAS 原子写入错误结果
if (orderedResponses.compareAndSet(index, null, errorResponse)) {
// 超时:丢弃脏状态 + 取消工具执行
if (cause instanceof TimeoutException) {
// 丢弃超时工具的状态更新,避免脏数据
stateCollector.discardToolUpdateMap(index);
// 触发协作式取消:通知可取消工具优雅停止
DefaultCancellationToken token = cancellationTokens.get(index);
if (token != null) {
token.cancel();
logger.debug("工具[{}]索引[{}]超时,已发送取消信号", toolCall.name(), index);
}
}
failures.add(ex);
}
return null;
});
}).toList();
// ===================== 7. 等待所有并行工具执行完成 =====================
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// ===================== 8. 组装执行结果 =====================
Map<String, Object> updatedState = new HashMap<>();
List<ToolResponseMessage.ToolResponse> toolResponses = new ArrayList<>();
Boolean returnDirect = null;
// 遍历原子数组,收集所有工具结果
for (int i = 0; i < orderedResponses.length(); i++) {
ToolCallResponse response = orderedResponses.get(i);
// 兜底:空结果自动构造错误响应
if (response == null) {
AssistantMessage.ToolCall toolCall = toolCalls.get(i);
response = ToolCallResponse.error(toolCall.id(), toolCall.name(), "工具未返回有效结果");
logger.warn("工具[{}]索引[{}]结果为空,使用兜底错误", toolCall.name(), i);
}
toolResponses.add(response.toToolResponse());
returnDirect = shouldReturnDirect(toolCalls.get(i), returnDirect, config);
}
// 构建消息体,写入Agent状态
ToolResponseMessage.Builder builder = ToolResponseMessage.builder().responses(toolResponses);
if (returnDirect != null && returnDirect) {
builder.metadata(Map.of(FINISH_REASON_METADATA_KEY, FINISH_REASON));
}
updatedState.put("messages", builder.build());
// 合并所有工具的状态更新(按 KeyStrategy 策略)
updatedState.putAll(stateCollector.mergeAll());
// 执行日志
if (enableActingLog) {
logger.info("[ThreadId {}] Agent {} 并行工具执行完成,总工具数:{},失败数:{}",
config.threadId().orElse(THREAD_ID_DEFAULT), agentName, toolCalls.size(), failures.size());
}
return updatedState;
}
3.3.6 使用拦截器执行
执行带拦截器链的工具调用,支持同步/异步两种工具回调:
/**
* 执行带拦截器链的工具调用,支持同步/异步两种工具回调
* @param toolCall 待执行的工具调用对象
* @param state 当前全局状态
* @param config 运行时配置
* @param extraStateFromToolCall 用于收集工具执行产生的状态更新的Map
* @param inParallelExecution 是否处于并行执行上下文:true=并行,false=串行
* @return 工具调用响应结果
*/
private ToolCallResponse executeToolCallWithInterceptors(AssistantMessage.ToolCall toolCall, OverAllState state,
RunnableConfig config, Map<String, Object> extraStateFromToolCall, boolean inParallelExecution) {
// 重载方法:调用完整参数版本,取消令牌传null,工具索引传-1(表示无索引)
return executeToolCallWithInterceptors(toolCall, state, config, extraStateFromToolCall, inParallelExecution,
null, -1);
}
executeToolCallWithInterceptors 核心逻辑:
- 构建工具调用请求对象,封装调用参数、上下文、运行环境
- 定义基础执行处理器:真正执行工具调用的核心逻辑
- 组装拦截器链:将所有工具拦截器与基础处理器串联
- 执行拦截器链 + 工具调用,返回最终响应
/**
* 执行带拦截器链的工具调用,并行执行时支持可选的取消令牌追踪
* @param toolCall 待执行的工具调用对象
* @param state 当前全局状态
* @param config 运行时配置
* @param extraStateFromToolCall 用于收集工具执行产生的状态更新的Map
* @param inParallelExecution 是否处于并行执行上下文:true=并行,false=串行
* @param cancellationTokens 并行执行专用的取消令牌Map(可为null)
* @param toolIndex 并行执行中当前工具的索引(作为取消令牌Map的key)
* @return 工具调用响应结果
*/
private ToolCallResponse executeToolCallWithInterceptors(AssistantMessage.ToolCall toolCall, OverAllState state,
RunnableConfig config, Map<String, Object> extraStateFromToolCall, boolean inParallelExecution,
Map<Integer, DefaultCancellationToken> cancellationTokens, int toolIndex) {
// 1. 构建工具调用请求对象,封装调用参数、上下文、运行环境
ToolCallRequest request = ToolCallRequest.builder()
.toolCall(toolCall)
.context(config.metadata().orElse(new HashMap<>()))
.executionContext(new ToolCallExecutionContext(config, state))
.build();
// 2. 定义基础执行处理器:真正执行工具调用的核心逻辑
ToolCallHandler baseHandler = req -> {
// 根据工具名称匹配对应的工具回调器
ToolCallback toolCallback = resolve(req.getToolName(), config);
// 未找到对应工具:打印警告并抛出异常
if (toolCallback == null) {
logger.warn(POSSIBLE_LLM_TOOL_NAME_CHANGE_WARNING, req.getToolName());
throw new IllegalStateException("未找到对应名称的工具回调器:" + req.getToolName());
}
// 开启执行日志时:打印线程、代理、执行工具信息
if (enableActingLog) {
logger.info("[线程ID {}] 代理 {} 执行操作,调用工具 {}",
config.threadId().orElse(THREAD_ID_DEFAULT), agentName, req.getToolName());
}
// 构造工具执行上下文:合并默认上下文 + 请求上下文
Map<String, Object> toolContextMap = new HashMap<>(toolContext);
toolContextMap.putAll(req.getContext());
// 对需要状态注入的工具类型,注入全局状态、配置、可更新状态
// 支持类型:状态感知工具/函数工具/方法工具
if (toolCallback instanceof StateAwareToolCallback || toolCallback instanceof FunctionToolCallback<?, ?>
|| toolCallback instanceof MethodToolCallback) {
toolContextMap.putAll(Map.of(
AGENT_STATE_CONTEXT_KEY, state, // 代理状态
AGENT_CONFIG_CONTEXT_KEY, config, // 代理配置
AGENT_STATE_FOR_UPDATE_CONTEXT_KEY, extraStateFromToolCall // 可更新状态
));
}
// 根据回调器类型(同步/异步)路由执行逻辑
return executeToolByType(toolCallback, req, toolContextMap, config, extraStateFromToolCall,
inParallelExecution, cancellationTokens, toolIndex);
};
// 3. 组装拦截器链:将所有工具拦截器与基础处理器串联
ToolCallHandler chainedHandler = InterceptorChain.chainToolInterceptors(toolInterceptors, baseHandler);
// 4. 执行拦截器链 + 工具调用,返回最终响应
return chainedHandler.call(request);
}
根据工具名称查找对应的工具回调实现(三级查找策略)
- 优先从本地注册的
toolCallbacks中匹配 - 其次从运行配置元数据中获取动态注册的工具
- 最后通过全局工具回调解析器兜底查找
/**
* 根据工具名称查找对应的工具回调实现(三级查找策略)
* 1. 优先从本地注册的 toolCallbacks 中匹配
* 2. 其次从运行配置元数据中获取动态注册的工具
* 3. 最后通过全局工具回调解析器兜底查找
* @param toolName 工具名称
* @param config 运行时配置
* @return 匹配到的 ToolCallback,未找到则返回 null
*/
private ToolCallback resolve(String toolName, RunnableConfig config) {
// 1. 从本地注册的工具回调列表中精确匹配名称
if (toolCallbacks != null) {
var fromNode = toolCallbacks.stream()
.filter(callback -> callback.getToolDefinition().name().equals(toolName))
.findFirst();
if (fromNode.isPresent()) {
return fromNode.get();
}
}
// 2. 从配置上下文获取动态注册的工具(AgentLlmNode/拦截器注入)
ToolCallback fromDynamic = resolveFromConfigMetadata(toolName, config);
if (fromDynamic != null) {
return fromDynamic;
}
// 3. 全局解析器兜底查找
return toolCallbackResolver == null ? null : toolCallbackResolver.resolve(toolName);
}
/**
* 从运行配置的上下文元数据中,查找动态注册的工具回调
* @param toolName 工具名称
* @param config 运行时配置
* @return 匹配的动态工具回调,未找到返回 null
*/
private ToolCallback resolveFromConfigMetadata(String toolName, RunnableConfig config) {
return Optional.ofNullable(config.context().get(RunnableConfig.DYNAMIC_TOOL_CALLBACKS_METADATA_KEY))
.filter(v -> v instanceof List)
.map(v -> (List<ToolCallback>) v)
.flatMap(list -> list.stream()
.filter(tc -> tc != null && toolName.equals(tc.getToolDefinition().name()))
.findFirst())
.orElse(null);
}
3.3.7 根据工具回调类型路由执行
根据工具回调类型路由执行逻辑,并行执行时支持可选的取消令牌追踪:
- 如果是异步工具回调,直接执行异步逻辑
- 开启同步转异步 + 串行执行模式:将同步工具包装为异步执行
- 其他场景:直接执行同步工具
/**
* 根据工具回调类型路由执行逻辑,并行执行时支持可选的取消令牌追踪
* @param toolCallback 待执行的工具回调实例
* @param request 工具调用请求对象
* @param toolContextMap 工具执行上下文
* @param config 运行时配置
* @param extraStateFromToolCall 用于收集状态更新的Map
* @param inParallelExecution 是否处于并行执行上下文
* @param cancellationTokens 并行执行专用的取消令牌Map(可为null)
* @param toolIndex 并行执行中当前工具的索引
* @return 工具调用响应结果
*/
private ToolCallResponse executeToolByType(ToolCallback toolCallback, ToolCallRequest request,
Map<String, Object> toolContextMap, RunnableConfig config, Map<String, Object> extraStateFromToolCall,
boolean inParallelExecution, Map<Integer, DefaultCancellationToken> cancellationTokens, int toolIndex) {
// 1. 如果是异步工具回调,直接执行异步逻辑
if (toolCallback instanceof AsyncToolCallback async) {
return executeAsyncTool(async, request, toolContextMap, config, extraStateFromToolCall, cancellationTokens,
toolIndex);
}
// 2. 开启同步转异步 + 串行执行模式:将同步工具包装为异步执行
// 注意:仅在串行模式下包装,并行模式外层已有并发,包装会导致线程池耗尽/死锁
else if (wrapSyncToolsAsAsync && !inParallelExecution) {
// 获取工具执行线程池
Executor executor = getToolExecutor(config);
// 将同步工具适配包装为异步工具
AsyncToolCallback wrappedAsync = AsyncToolCallbackAdapter.wrapIfNeeded(toolCallback, executor,
toolExecutionTimeout);
// 执行包装后的异步工具
return executeAsyncTool(wrappedAsync, request, toolContextMap, config, extraStateFromToolCall,
cancellationTokens, toolIndex);
}
// 3. 其他场景:直接执行同步工具
else {
return executeSyncTool(toolCallback, request, toolContextMap, config);
}
}
三种路由:
executeToolByType(toolCallback, request, toolContextMap, ..., inParallelExecution)
│
├─ toolCallback instanceof AsyncToolCallback?
│ └─ Yes → executeAsyncTool(async, ...)
│
├─ wrapSyncToolsAsAsync && !inParallelExecution?
│ ├─ Yes → AsyncToolCallbackAdapter.wrapIfNeeded(toolCallback, executor, timeout)
│ │ → executeAsyncTool(wrappedAsync, ...)
│ └─ No(并行模式下不包装,避免线程池饥饿/死锁)
│
└─ 默认 → executeSyncTool(toolCallback, ...)
当 wrapSyncToolsAsAsync=true 且不在并行执行模式中时,同步工具通过 AsyncToolCallbackAdapter 包装为异步执行:
AsyncToolCallbackAdapter.wrapIfNeeded(syncCallback, executor, timeout)
│
├─ 已经是 AsyncToolCallback → 直接返回
└─ 同步工具 → 包装为 AsyncToolCallback
callAsync(arguments, context):
CompletableFuture.supplyAsync(() -> delegate.call(arguments, context), executor)
为什么并行模式下禁用? 并行模式已经通过外层
runAsync提供了并发。如果在并行模式中再包装同步工具为异步,外层任务占用线程等待内层异步任务,而内层异步任务又需要同一线程池 → 线程池饥饿/死锁。
3.3.8 执行异步工具
执行异步工具,内置超时处理,支持外部取消令牌追踪。
为 CancellableAsyncToolCallback 提供协作式取消支持:当工具实现该接口时,会创建真实的取消令牌并传递给工具;超时发生时,会主动取消令牌,让工具优雅终止。
并行执行场景下:取消令牌会存入传入的 cancellationTokens 集合,便于外层超时处理器统一触发取消操作。
/**
* 执行异步工具,内置超时处理,支持外部取消令牌追踪
* @param callback 待执行的异步工具回调
* @param request 工具调用请求
* @param toolContextMap 工具执行上下文
* @param config 运行时配置
* @param extraStateFromToolCall 收集工具执行产生的状态更新
* @param cancellationTokens 并行执行专用的取消令牌Map(可为null)
* @param toolIndex 并行执行中当前工具的索引(作为Map的key)
* @return 工具调用响应结果
*/
private ToolCallResponse executeAsyncTool(AsyncToolCallback callback, ToolCallRequest request,
Map<String, Object> toolContextMap, RunnableConfig config, Map<String, Object> extraStateFromToolCall,
Map<Integer, DefaultCancellationToken> cancellationTokens, int toolIndex) {
// 构建工具上下文对象
ToolContext context = new ToolContext(toolContextMap);
// 声明取消令牌(仅可取消异步工具会使用)
DefaultCancellationToken cancellationToken = null;
try {
CompletableFuture<String> future;
// 根据回调类型分发执行逻辑:可取消工具使用真实令牌
if (callback instanceof CancellableAsyncToolCallback cancellable) {
// 创建取消令牌
cancellationToken = new DefaultCancellationToken();
// 并行执行时:将令牌存入外部Map,供外层统一管理取消
if (cancellationTokens != null && toolIndex >= 0) {
cancellationTokens.put(toolIndex, cancellationToken);
}
// 调用可取消的异步方法,传入令牌
future = cancellable.callAsync(request.getArguments(), context, cancellationToken);
}
else {
// 普通异步工具:直接执行
future = callback.callAsync(request.getArguments(), context);
}
// 防御性判断:异步返回null
if (future == null) {
return ToolCallResponse.error(request.getToolCallId(), request.getToolName(),
"异步工具返回了空的Future");
}
// 为异步任务设置超时时间,阻塞等待执行结果
String result = future.orTimeout(callback.getTimeout().toMillis(), TimeUnit.MILLISECONDS).join();
// 执行成功日志
if (enableActingLog) {
logger.info("[线程ID {}] 代理 {} 执行完成,异步工具 {} 已结束",
config.threadId().orElse(THREAD_ID_DEFAULT), agentName, request.getToolName());
if (logger.isDebugEnabled()) {
logger.debug("工具 {} 返回结果:{}", request.getToolName(), result);
}
}
// 封装并返回成功响应
return ToolCallResponse.of(request.getToolCallId(), request.getToolName(), result);
}
catch (CompletionException e) {
Throwable cause = e.getCause() != null ? e.getCause() : e;
// 1. 超时异常:清空状态更新,取消令牌,返回错误
if (cause instanceof TimeoutException) {
if (cancellationToken != null) {
cancellationToken.cancel();
}
// 超时后清空脏状态,避免污染合并结果
extraStateFromToolCall.clear();
logger.warn("异步工具 {} 执行超时,已丢弃所有状态更新", request.getToolName());
return ToolCallResponse.error(request.getToolCallId(), request.getToolName(), extractErrorMessage(cause));
}
// 2. 工具执行异常:使用异常处理器生成友好结果
else if (cause instanceof ToolExecutionException toolExecutionException) {
logger.error("异步工具 {} 执行失败,异常处理器:{}", request.getToolName(),
toolExecutionExceptionProcessor.getClass().getName(), toolExecutionException);
String result = toolExecutionExceptionProcessor.process(toolExecutionException);
return ToolCallResponse.of(request.getToolCallId(), request.getToolName(), result);
}
// 3. 其他执行异常
else {
logger.error("异步工具 {} 执行失败:{}", request.getToolName(), cause.getMessage(), cause);
return ToolCallResponse.error(request.getToolCallId(), request.getToolName(), extractErrorMessage(cause));
}
}
// 任务被主动取消
catch (CancellationException e) {
logger.warn("异步工具 {} 执行被取消", request.getToolName(), e);
return ToolCallResponse.error(request.getToolCallId(), request.getToolName(), extractErrorMessage(e));
}
// 兜底异常捕获
catch (Exception e) {
logger.error("异步工具 {} 执行失败:{}", request.getToolName(), e.getMessage(), e);
return ToolCallResponse.error(request.getToolCallId(), request.getToolName(), extractErrorMessage(e));
}
}
3.3.9 执行同步工具
/**
* 执行同步工具调用,并统一处理异常
* @param callback 待执行的同步工具回调
* @param request 工具调用请求对象
* @param toolContextMap 工具执行上下文
* @param config 运行时配置
* @return 工具调用响应结果
*/
private ToolCallResponse executeSyncTool(ToolCallback callback, ToolCallRequest request,
Map<String, Object> toolContextMap, RunnableConfig config) {
// 构建工具执行上下文
ToolContext context = new ToolContext(toolContextMap);
try {
// 同步执行工具调用,直接获取返回结果
String result = callback.call(request.getArguments(), context);
// 开启执行日志时打印信息
if (enableActingLog) {
logger.info("[线程ID {}] 代理 {} 执行操作,工具 {} 调用完成",
config.threadId().orElse(THREAD_ID_DEFAULT), agentName, request.getToolName());
if (logger.isDebugEnabled()) {
logger.debug("工具 {} 返回结果:{}", request.getToolName(), result);
}
}
// 封装并返回成功响应
return ToolCallResponse.of(request.getToolCallId(), request.getToolName(), result);
}
// 捕获工具执行异常,使用异常处理器生成友好结果
catch (ToolExecutionException e) {
logger.error("工具 {} 执行失败,使用异常处理器处理:{}", request.getToolName(),
toolExecutionExceptionProcessor.getClass().getName(), e);
String result = toolExecutionExceptionProcessor.process(e);
return ToolCallResponse.of(request.getToolCallId(), request.getToolName(), result);
}
// 捕获其他所有异常,返回错误响应
catch (Exception e) {
logger.error("工具 {} 执行失败:{}", request.getToolName(), e.getMessage(), e);
return ToolCallResponse.error(request.getToolCallId(), request.getToolName(), e);
}
}
3.3.10 部分执行处理
当 lastMessage 是 ToolResponseMessage 时(表示部分工具已执行),处理剩余工具:
handlePartialToolResponses(toolResponseMessage, messages, state, config)
│
├─ 取倒数第二条消息(应为 AssistantMessage)
├─ 计算已执行工具 ID 集合:existingResponses.stream().map(id)
├─ 过滤出未执行工具:remainingToolCalls = assistantMessage.toolCalls - executedToolIds
│
├─ remainingToolCalls 为空?
│ └─ Yes → return Map.of() // 全部已执行,避免重复追加
│
├─ 执行剩余工具(支持并行/串行)
│
├─ 合并 existingResponses + newResponses
├─ 计算 returnDirect(基于所有工具调用)
│
└─ 返回合并后的状态:
├─ "messages" → [merged ToolResponseMessage, RemoveByHash(oldToolResponseMessage)]
└─ ...newResults
/**
* 处理分段/增量式工具响应:补全未执行完的工具调用
* 用于支持部分工具已执行、剩余工具需要继续执行的场景
* @param toolResponseMessage 当前工具响应消息
* @param messages 完整消息列表
* @param state 全局状态
* @param config 运行时配置
* @return 包含更新后消息与状态的结果集
*/
private Map<String, Object> handlePartialToolResponses(ToolResponseMessage toolResponseMessage,
List<Message> messages, OverAllState state, RunnableConfig config) {
// 校验消息长度:必须能找到前一条助手消息
if (messages.size() < 2) {
throw new IllegalStateException("无法在工具响应消息之前找到助手消息");
}
// 获取倒数第二条消息,必须是 AssistantMessage(工具调用来源)
Message secondLastMessage = messages.get(messages.size() - 2);
if (!(secondLastMessage instanceof AssistantMessage assistantMessage)) {
throw new IllegalStateException("工具响应消息的前一条消息不是助手消息");
}
// 获取已执行完成的工具响应,并收集已执行的工具ID
List<ToolResponseMessage.ToolResponse> existingResponses = toolResponseMessage.getResponses();
Set<String> executedToolIds = existingResponses.stream()
.map(ToolResponseMessage.ToolResponse::id)
.collect(Collectors.toSet());
// 过滤出**尚未执行**的工具调用
List<AssistantMessage.ToolCall> remainingToolCalls = assistantMessage.getToolCalls()
.stream()
.filter(tc -> !executedToolIds.contains(tc.id()))
.toList();
// 所有工具都已执行完毕:返回空Map,避免重复追加
if (remainingToolCalls.isEmpty()) {
return Map.of();
}
// 打印执行日志:总工具数 / 已完成数
if (enableActingLog) {
logger.info("[线程ID {}] 代理 {} 开始执行剩余工具,共 {} 个,已完成 {} 个",
config.threadId().orElse(THREAD_ID_DEFAULT), agentName,
assistantMessage.getToolCalls().size(), existingResponses.size());
}
// 执行剩余工具:并行 / 串行
Map<String, Object> newResults;
if (parallelToolExecution && remainingToolCalls.size() > 1) {
newResults = executeToolCallsParallel(remainingToolCalls, state, config);
}
else {
newResults = executeToolCallsSequential(remainingToolCalls, state, config);
}
// 合并:旧响应 + 新执行的响应
ToolResponseMessage newToolResponseMessage = (ToolResponseMessage) newResults.get("messages");
List<ToolResponseMessage.ToolResponse> allResponses = new ArrayList<>(existingResponses);
allResponses.addAll(newToolResponseMessage.getResponses());
// 统一计算 returnDirect:遍历所有工具(已执行+未执行)
Boolean returnDirect = null;
for (AssistantMessage.ToolCall toolCall : assistantMessage.getToolCalls()) {
returnDirect = shouldReturnDirect(toolCall, returnDirect, config);
}
// 构建最终状态:替换旧的工具响应,追加新的完整响应
Map<String, Object> updatedState = new HashMap<>(newResults);
List<Object> newMessages = new ArrayList<>();
// 构建完整工具响应
ToolResponseMessage.Builder builder = ToolResponseMessage.builder().responses(allResponses);
if (returnDirect != null && returnDirect) {
builder.metadata(Map.of(FINISH_REASON_METADATA_KEY, FINISH_REASON));
}
// 消息更新规则:添加新响应 + 移除旧的工具响应
newMessages.add(builder.build());
newMessages.add(new RemoveByHash<>(toolResponseMessage));
updatedState.put("messages", newMessages);
// 执行完成日志
if (enableActingLog) {
logger.info("[线程ID {}] 代理 {} 分段工具执行完成并返回",
config.threadId().orElse(THREAD_ID_DEFAULT), agentName);
}
return updatedState;
}
RemoveByHash是一个特殊标记,告诉状态引擎用新的合并消息替换旧的ToolResponseMessage,而不是追加。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)