Day-1最后我们复盘到了开始节点的装配,所以今天我们就从装配节点逻辑开始继续复盘。

public class RootNode extends AbstractArmorySupport {
    @Resource
    private AiApiNode aiApiNode;
    @Override
    protected AiAgentRegisterVO doApply(ArmoryCommandEntity requestParameter, DefaultArmoryFactory.DynamicContext dynamicContext) throws Exception {
        // 路由到下一个节点
        return router(requestParameter, dynamicContext);
    }

    @Override
    public StrategyHandler<ArmoryCommandEntity, DefaultArmoryFactory.DynamicContext, AiAgentRegisterVO> get(ArmoryCommandEntity armoryCommandEntity, DefaultArmoryFactory.DynamicContext dynamicContext) throws Exception {
        return aiApiNode;
    }
}

一、通过DefaultArmoryFactory返回根节点后,根节点使用apply方法,先进行一步资源加载,加载完成后,然后调用apply方法,装配节点所需要的数据,然后流转到AiApiNode节点。

public class AiApiNode extends AbstractArmorySupport {
    @Resource
    private ChatModelNode chatModelNode;
    @Override
    protected AiAgentRegisterVO doApply(ArmoryCommandEntity requestParameter, DefaultArmoryFactory.DynamicContext dynamicContext) throws Exception {
        // 编写 AiApi 构建
        log.info("Ai Agent 装配操作 - AiApiNode");
        AiAgentConfigTableVO aiAgentConfigTableVO = requestParameter.getAiAgentConfigTableVO();
        AiAgentConfigTableVO.Module.AiApi aiApiConfig = aiAgentConfigTableVO.getModule().getAiApi();

        OpenAiApi openAiApi = OpenAiApi.builder()
                .baseUrl(aiApiConfig.getBaseUrl())
                .apiKey(aiApiConfig.getApiKey())
                .completionsPath(StringUtils.isNotBlank(aiApiConfig.getCompletionsPath()) ? aiApiConfig.getCompletionsPath() : "v1/chat/completions")
                .embeddingsPath(StringUtils.isNotBlank(aiApiConfig.getEmbeddingsPath()) ? aiApiConfig.getEmbeddingsPath() : "v1/embeddings")
                .build();
        dynamicContext.setOpenAiApi(openAiApi);
        // 路由到下一个节点,如果不需要路由了,可以 return 返回结果
        return router(requestParameter, dynamicContext);
    }

    @Override
    public StrategyHandler<ArmoryCommandEntity, DefaultArmoryFactory.DynamicContext, AiAgentRegisterVO> get(ArmoryCommandEntity requestParameter, DefaultArmoryFactory.DynamicContext dynamicContext) throws Exception {
        // 如果不需要下一个节点了,可以配置 defaultStrategyHandler
        return chatModelNode;
    }
}

二、AiApiNode它的目的是和 AI 接口,建立请求连接。

baseUrl:这是AI大模型的服务器地址,这里用的是xfg的第三方中转站

apiKey:调用AI的秘钥(通行证),在baseUrl可以审清。

completionsPath:聊天对话的接口路径,最终拼接成baseUrl + v1/chat/completions。

embeddingsPath:向量生成接口路径,把文字变成i向量,AI理解文字,做知识库的入口。

最终把所有参数组装成一个可用的OpenAiApi客户端,并记录在上下文对象中,然后继续流转到ChatModelNode节点。

public class ChatModelNode extends AbstractArmorySupport {
    @Resource
    private AgentNode agentNode;
    @Resource
    private DefaultMcpClientFactory defaultMcpClientFactory;
    @Resource
    private ToolSkillsCreateService toolSkillsCreateService;
    @Override
    protected AiAgentRegisterVO doApply(ArmoryCommandEntity requestParameter, DefaultArmoryFactory.DynamicContext dynamicContext) throws Exception {
        log.info("Ai Agent 装配操作 - ChatModelNode");
        // 获取上下文对象
        OpenAiApi openAiApi = dynamicContext.getOpenAiApi();

        // 获取配置对象
        AiAgentConfigTableVO aiAgentConfigTableVO = requestParameter.getAiAgentConfigTableVO();
        AiAgentConfigTableVO.Module.ChatModel chatModelConfig = aiAgentConfigTableVO.getModule().getChatModel();
        List<AiAgentConfigTableVO.Module.ChatModel.ToolMcp> toolMcpList = chatModelConfig.getToolMcpList();
        List<AiAgentConfigTableVO.Module.ChatModel.ToolSkills> toolSkillsList = chatModelConfig.getToolSkillsList();

        List<ToolCallback> toolCallbackList = new ArrayList<>();
        // 构建mcp服务(工厂)
        if (toolMcpList !=null && !toolMcpList.isEmpty()) {
            for (AiAgentConfigTableVO.Module.ChatModel.ToolMcp toolMcp : toolMcpList) {
                ToolMcpCreateService toolMcpCreateService = defaultMcpClientFactory.getToolMcpCreateService(toolMcp);
                ToolCallback[] toolCallbacks = toolMcpCreateService.buildToolCallback(toolMcp);
                toolCallbackList.addAll(List.of(toolCallbacks));
            }
        }
        //构建skills服务
        if (toolSkillsList !=null && !toolSkillsList.isEmpty()) {
            for (AiAgentConfigTableVO.Module.ChatModel.ToolSkills toolSkills : toolSkillsList) {
                ToolCallback[] toolCallbacks = toolSkillsCreateService.buildToolCallback(toolSkills);
                toolCallbackList.addAll(List.of(toolCallbacks));
            }
        }

        // 构建对话模型
        ChatModel chatModel = OpenAiChatModel.builder()
                .openAiApi(openAiApi)
                .defaultOptions(OpenAiChatOptions.builder()
                        .model(chatModelConfig.getModel())
                        .toolCallbacks(toolCallbackList)
                        .build())
                .build();
        dynamicContext.setChatModel(chatModel);;
        return router(requestParameter,dynamicContext);
    }

    @Override
    public StrategyHandler<ArmoryCommandEntity, DefaultArmoryFactory.DynamicContext, AiAgentRegisterVO> get(ArmoryCommandEntity requestParameter, DefaultArmoryFactory.DynamicContext dynamicContext) throws Exception {
        return agentNode;
    }
}

三、首先从上下文对象中获取到OpenAiApi对象,然后拿到配置里的两种工具mcp和skills。

这里我们最初只引入了sse和stdio类型的mcp服务,所以我们将实例化不同类型的MCP服务都封装到了该类中的createMcpSyncClient方法中。但是之后我们想到如果我们要扩展自己的一些MCP服务呢,比如,做一些智能体客服场景,或者公司内部的系统巡检场景,这些场景并没有提供统一的 mcp 服务,则可能都要自己扩展,扩展后装配到整个智能体中。

所以如果我们还编写到这个方法中,我们必须通过一层一层的if···else 判断的方式进行处理,那这个方法只会越写越长。

所以我们对这块工具进行了增强装配,通过工厂+策略接口来扩展实现,并区分出 mcp 客户端、服务端。客户端负责策略装配,服务端负责用户自己扩展新的 mcp 服务。

public interface ToolMcpCreateService {
    ToolCallback[] buildToolCallback(AiAgentConfigTableVO.Module.ChatModel.ToolMcp toolMcp) throws Exception;
}

策略接口是一种抽象设计,通过定义统一的接口,让所有实现的接口,都按照统一的方式使用入参,完成出参。中间过程自己处理,这样就屏蔽了差异。

MCPSSE实现:
 

public class SSEToolMcpCreateService implements ToolMcpCreateService {
    @Override
    public ToolCallback[] buildToolCallback(AiAgentConfigTableVO.Module.ChatModel.ToolMcp toolMcp) throws Exception{
        AiAgentConfigTableVO.Module.ChatModel.ToolMcp.SSEServerParameters sseConfig = toolMcp.getSse();
        String originalBaseUri  = sseConfig.getBaseUri();
        String baseUri=originalBaseUri;
        String sseEndpoint = sseConfig.getSseEndpoint();
        if(StringUtils.isBlank(sseEndpoint)){
            URL url = new URL(originalBaseUri);
            String protocol = url.getProtocol();
            String host = url.getHost();
            int port = url.getPort();
            String baseUrl=port==-1 ? protocol+"://"+host : protocol+"://"+host+":"+port;
            int index = originalBaseUri.indexOf(baseUrl);
            if (index!=-1) {
                sseEndpoint=originalBaseUri.substring(index+baseUrl.length());
            }
            baseUri=baseUrl;
        }
        sseEndpoint=StringUtils.isBlank(sseEndpoint) ? "/sse" :sseEndpoint;
        HttpClientSseClientTransport sseClientTransport = HttpClientSseClientTransport
                .builder(baseUri)
                .sseEndpoint(sseEndpoint)
                .build();

        McpSyncClient mcpSyncClient = McpClient
                .sync(sseClientTransport)
                .requestTimeout(Duration.ofMillis(sseConfig.getRequestTimeout())).build();
        McpSchema.InitializeResult initialize = mcpSyncClient.initialize();
        log.info("Tool SSE MCP Initialized {}", initialize);
        return SyncMcpToolCallbackProvider.builder()
                .mcpClients(mcpSyncClient).build()
                .getToolCallbacks();
    }
}

首先从配置里拿到SSE连接参数,然后解析地址,拆分出baseUrl和路径(把地址拆成:协议,主机,端口),拼接干净的baseUri,接着自动补全sseEndpoint。

创建SSE传输器,建立和远程MCP服务器的SSE长连接,服务器会通过这个连接主动推送消息给 AI。

创建MCP同步客户端,得到一个可以调用远程MCP工具的客户端,初始化MCP连接。

最终返回AI可用的工具回调。

MCPSTDIO实现:

public class StdioToolMcpCreateService implements ToolMcpCreateService {
    @Override
    public ToolCallback[] buildToolCallback(AiAgentConfigTableVO.Module.ChatModel.ToolMcp toolMcp) {
        AiAgentConfigTableVO.Module.ChatModel.ToolMcp.StdioServerParameters stdioConfig = toolMcp.getStdio();
        AiAgentConfigTableVO.Module.ChatModel.ToolMcp.StdioServerParameters.ServerParameters serverParameters = stdioConfig.getServerParameters();

        ServerParameters stdioParams = ServerParameters.builder(serverParameters.getCommand())
                .args(serverParameters.getArgs())
                .env(serverParameters.getEnv())
                .build();

        McpSyncClient mcpSyncClient = McpClient.sync(new StdioClientTransport(stdioParams, new JacksonMcpJsonMapper(new ObjectMapper())))
                .requestTimeout(Duration.ofSeconds(stdioConfig.getRequestTimeout())).build();

        McpSchema.InitializeResult init_stdio = mcpSyncClient.initialize();
        log.info("Tool Stdio MCP Initialized {}", init_stdio);
        return SyncMcpToolCallbackProvider.builder()
                .mcpClients(mcpSyncClient).build()
                .getToolCallbacks();
    }
}

首先从配置里拿到stdio配置,构建stdio启动参数把(命令、参数、环境变量打包成启动本地进程的参数)。

创建stdio传输器(核心)直接启动本地子进程通过 stdin/stdout 和工具进程通信  → 不走网络、不占端口、纯本地通信。

创建MCP同步客户端,初始化本地MCP工具,最后生成AI能使用的工具回调。

MCPLOCAL实现:

public class LocalToolMcpCreateService implements ToolMcpCreateService {
    @Resource
    protected ApplicationContext applicationContext;
    @Override
    public ToolCallback[] buildToolCallback(AiAgentConfigTableVO.Module.ChatModel.ToolMcp toolMcp) {
        AiAgentConfigTableVO.Module.ChatModel.ToolMcp.LocalParameters local = toolMcp.getLocal();
        ToolCallbackProvider localToolCallbackProvider  = (ToolCallbackProvider) applicationContext.getBean(local.getName());
        log.info("Tool Local MCP Initialized {}", local.getName());
        return localToolCallbackProvider.getToolCallbacks();
    }
}

本地local的核心就是拿到Bean的名称,也就拿到了getToolCallbacks,返回即可。

@SpringBootApplication
@Configurable
public class Application {

    public static void main(String[] args){
        SpringApplication.run(Application.class);
    }

    @Bean("myToolCallbackProvider")
    public ToolCallbackProvider testTools(MyTestMcpService testService) {
        return MethodToolCallbackProvider.builder().toolObjects(testService).build();
    }
}

在服务端我们实现好自己需要的mcp工具后,在Application中完成装配处理即可,bean 的名称 myToolCallbackProvider 就是作为本地的 mcp 服务进行使用的。

到这里我们关于mcp服务的构建就算完成了,紧接着我们通过同样的构建逻辑构建skills服务。

两个服务都构建好后,ChatModelNode节点的装配就算完成了,继续流转到AgentNode。

public class AgentNode extends AbstractArmorySupport {
    @Resource
    private AgentWorkflowNode agentWorkflowNode;
    @Override
    protected AiAgentRegisterVO doApply(ArmoryCommandEntity requestParameter, DefaultArmoryFactory.DynamicContext dynamicContext) throws Exception {
        // 1. 上下文获取数据
        ChatModel chatModel = dynamicContext.getChatModel();
        AiAgentConfigTableVO aiAgentConfigTableVO = requestParameter.getAiAgentConfigTableVO();

        List<AiAgentConfigTableVO.Module.Agent> agents = aiAgentConfigTableVO.getModule().getAgents();
        for (AiAgentConfigTableVO.Module.Agent agentConfig : agents) {
            LlmAgent agent=LlmAgent.builder()
                    .model(new MySpringAI(chatModel))
                    .name(agentConfig.getName())
                    .description(agentConfig.getDescription())
                    .instruction(agentConfig.getInstruction())
                    .outputKey(agentConfig.getOutputKey())
                    .build();
            dynamicContext.getAgentGroup().put(agentConfig.getName(),agent);
        }
        return router(requestParameter,dynamicContext);
    }

    @Override
    public StrategyHandler<ArmoryCommandEntity, DefaultArmoryFactory.DynamicContext, AiAgentRegisterVO> get(ArmoryCommandEntity requestParameter, DefaultArmoryFactory.DynamicContext dynamicContext) throws Exception {
        return agentWorkflowNode;
    }
}

四、首先从上下文拿到关键对象,遍历配置,一个一个创建Agent,并且存到上下文对象中。然后继续流转到AgentWorkflowNode节点。

model:大语言模型实例。

name:智能体名称。

description:智能体能力描述(智能体是做什么的)

instruction:系统指令/系统提示词(定规则,人设)

outputKey:输出结果键名(结果存在哪个字段)

还有几个节点的装配下次再发,累了

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐