基于Skill框架实现多智能体协作的订单Agent
构建一个完整的基于 Spring AI 框架的购物场景多智能体协作系统。这个系统将严格按照 "下单 - 支付 - 物流 - 交付 - 客服" 的业务流程,通过 5 个独立智能体的协同工作,实现全链路智能化、低耦合、高扩展的购物流程管理。
一、系统架构设计
1.1 整体架构概述
本系统采用分层架构设计,严格遵循 Spring 生态的设计原则,确保系统的可维护性和可扩展性。整体架构分为四个核心层次:
|
┌─────────────────┐ │ Controller层 │ 对外暴露REST接口,接收请求并触发流程 ├─────────────────┤ │ Service层 │ 核心业务层:5个智能体+StateGraph流程编排 ├─────────────────┤ │ Config层 │ 配置OpenAI/支付宝客户端,依赖注入 ├─────────────────┤ │ DTO层 │ 定义上下文/请求模型,统一数据传递格式 └─────────────────┘ |
系统的核心设计理念是 **"分工协作,智能优化"**。每个智能体专注于单一业务领域的处理,通过标准化的接口进行通信和数据交换。这种设计不仅降低了系统的耦合度,还使得每个智能体可以独立开发、测试和维护。
1.2 多智能体架构模式
系统采用 **"1 个 Supervisor 智能体 + 5 个专业智能体"** 的架构模式(9)。Supervisor 作为核心调度节点,负责接收用户需求、分配任务、协调各智能体交互,并监控任务执行状态。5 个专业智能体分别对应购物流程的 5 个环节:商品智能体、购买智能体、支付智能体、订单智能体、物流智能体、客服智能体。
同时,系统还采用了 **"去中心化 + 中心化" 混合架构 **(12)。既保留各智能体的独立决策能力,又通过一个 "协同中枢" 统一接收、分发任务。这种设计使得系统在保持灵活性的同时,又能确保整体流程的可控性。
1.3 线程池架构设计
系统采用分层线程池架构,针对不同类型任务设计专用线程池,避免资源竞争和任务阻塞。核心线程池配置如下:
|
线程池名称 |
核心线程数 |
最大线程数 |
队列容量 |
适用场景 |
|
DEFAULT_TASK_EXECUTOR |
100 |
200 |
1024 |
通用任务处理 |
|
taskExecutorService |
100 |
200 |
100 |
工作流任务执行 |
|
nodeExecutorService |
100 |
200 |
100 |
工作流节点执行 |
|
TOOL_TASK_EXECUTOR |
40 |
50 |
50 |
工具调用任务 |
这种设计的优势在于:
资源隔离:工具调用任务(如 API 请求)与工作流执行任务分离,避免某类任务异常影响整体系统
弹性伸缩:通过动态线程数适应负载变化
上下文传递:所有线程池均通过 ThreadLocal 传递请求上下文
1.4 StateGraph 流程编排
系统采用 Spring AI Alibaba Graph 作为 Agent 编排背后的核心引擎(18)。StateGraph 的核心概念包括:
StateGraph(状态图):用于定义节点和边
Node(节点):封装具体操作或模型调用
Edge(边):表示节点间的跳转关系
OverAllState(全局状态):贯穿流程共享数据
StateGraph 采用 **"构建→编译→执行" 三阶段模式 **(22):
构建阶段:定义 StateGraph,注册节点和边
编译阶段:将 StateGraph 编译为 CompiledGraph,进行结构校验和优化
执行阶段:CompiledGraph 驱动状态流转,执行节点逻辑
1.5 低耦合高扩展设计原则
系统严格遵循高内聚低耦合的设计原则(40)。低耦合意味着:
模块之间依赖关系应该尽量少
模块之间的变更应该不会影响到其他模块
模块之间通过定义良好的接口进行通信
高扩展设计通过以下机制实现:
Skill 设计模式:每个智能体作为独立的 Skill 实现,支持动态加载和替换
接口抽象:通过统一的 Agent 接口屏蔽不同智能体的实现差异
事件驱动:智能体间通过事件机制通信,避免直接依赖
二、核心技术选型
系统采用的核心技术栈包括:
|
技术层 |
技术选型 |
版本 |
说明 |
|
基础框架 |
Spring Boot |
3.2+ |
快速开发框架 |
|
AI 框架 |
Spring AI |
1.0.0-M1 |
AI 能力集成 |
|
流程编排 |
Spring AI StateGraph |
1.0.0-M1 |
智能体流程管理 |
|
AI 服务 |
OpenAI 原生 API |
- |
大语言模型服务 |
|
支付工具 |
支付宝 MCP |
4.38.0.ALL |
支付处理 |
|
知识库 |
RAG + 向量数据库 |
- |
智能问答 |
三、智能体设计与实现
完整可运行的多智能体协作系统代码(含Skill调用)
一、核心接口定义
1.1 Agent 接口
|
import com.shop.multiagent.dto.ShopAgentContext; /** * 智能体统一接口 */ public interface Agent { ShopAgentContext execute(ShopAgentContext context); } |
1.2 Skill 接口
|
import java.util.Map; /** * Skill统一接口(插件化调度核心) */ public interface Skill { /** * Skill名称(唯一标识) */ String getName(); /** * Skill描述 */ String getDescription(); /** * 执行Skill(统一参数格式) * @param parameters 入参(标准化Map格式) * @return 执行结果 */ Object execute(Map parameters); /** * Skill元数据(版本、超时时间等) */ Map<String, Object> getMetadata(); } |
1.3 Skill 注解
|
import java.lang.annotation.*; /** * Skill注解(用于标识智能体为Skill组件) */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface Skill { /** * Skill名称(唯一) */ String value(); /** * Skill描述 */ String description() default ""; /** * 版本号 */ String version() default "1.0.0"; } |
二、数据传输对象(DTO)
2.1 ShopAgentContext(共享上下文)
|
import lombok.Data; import java.util.HashMap; import java.util.Map; /** * 智能体共享上下文(传递订单号、状态等共享数据) */ @Data public class ShopAgentContext { // 核心共享字段 private String orderId; // 订单ID(全局共享) private String productInfo; // 商品信息 private String userId; // 用户ID private String tradeNo; // 支付宝交易号 // 各智能体输出结果 private String orderInfo; // 下单智能体输出(JSON格式) private boolean paymentSuccess; // 支付智能体结果(成功/失败) private String logisticsInfo; // 物流智能体结果(物流轨迹) private String deliveryInfo; // 交付智能体输出(预计送达时间) private String customerServiceReply; // 客服智能体输出(应答内容) // 扩展字段(存储临时数据) private Map<String, Object> attributes = new HashMap |
2.2 错误响应 DTO
|
import lombok.Data; /** * 全局错误响应 */ @Data public class ErrorResponse { private String errorCode; private String errorMessage; private String details; } |
三、智能体实现(完整语法正确)
3.1 下单智能体(OrderAgentService)
|
import com.alibaba.fastjson.JSON; import com.shop.multiagent.dto.ShopAgentContext; import com.shop.multiagent.framework.Agent; import com.shop.multiagent.framework.Skill; import lombok.extern.slf4j.Slf4j; import org.springframework.ai.openai.OpenAiCompletionsClient; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import java.time.Duration; import java.util.HashMap; import java.util.Map; @Service @Slf4j @Skill( value = "order-agent", description = "生成订单信息的智能体(含商品明细、金额、收货地址等)", version = "1.0.0" ) public class OrderAgentService implements Agent, Skill { private final OpenAiCompletionsClient completionsClient; // 构造器注入OpenAI客户端 public OrderAgentService(OpenAiCompletionsClient completionsClient) { this.completionsClient = completionsClient; } // ====== Agent接口实现(核心业务逻辑)====== @Override public ShopAgentContext execute(ShopAgentContext context) { // 1. 构造OpenAI提示词 String prompt = String.format( "生成订单号【%s】的JSON格式下单信息,要求如下:" + "1. 用户ID:%s" + "2. 商品信息:%s" + "3. 必须包含字段:商品明细(名称、规格、单价、数量)、总金额(保留2位小数)、收货地址(省/市/区/详细地址)、下单时间(格式:yyyy-MM-dd HH:mm:ss)" + "4. 金额随机生成(合理范围:100-10000元)" + "5. 仅返回JSON,不包含其他内容", context.getOrderId(), context.getUserId(), context.getProductInfo() ); // 2. 调用OpenAI流式接口 FluxionsClient.stream(prompt); StringBuilder orderInfoBuilder = new StringBuilder(); // 3. 订阅流式结果 stream.subscribe( chunk -> orderInfoBuilder.append(chunk), // 接收流数据 e -> log.error("下单智能体OpenAI调用异常,订单号:{}", context.getOrderId(), e), // 异常处理 () -> log.info("下单智能体OpenAI调用完成,订单号:{}", context.getOrderId()) // 流结束回调 ); // 4. 等待流结束(超时3秒) try { stream.blockLast(Duration.ofSeconds(3)); } catch (Exception e) { log.error("下单智能体超时,订单号:{}", context.getOrderId(), e); throw new RuntimeException("订单生成超时", e); } // 5. 校验结果并设置到上下文 String orderInfo = orderInfoBuilder.toString().trim(); if (!isValidJson(orderInfo)) { throw new RuntimeException("订单信息格式错误,非JSON:" + orderInfo); } context.setOrderInfo(orderInfo); return context; } // ====== Skill接口实现(适配Skill框架)====== @Override public String getName() { return this.getClass().getAnnotation(Skill.class).value(); } @Override public String getDescription() { return this.getClass().getAnnotation(Skill.class).description(); } @Override public Object execute(Map, Object> parameters) { // 将Skill标准化参数转换为上下文对象 ShopAgentContext context = convertParamsToContext(parameters); // 复用Agent核心逻辑 return execute(context); } @Override public Map getMetadata() { Map = new HashMap metadata.put("version", this.getClass().getAnnotation(Skill.class).version()); metadata.put("supportAsync", true); // 支持异步执行 metadata.put("timeout", 3000); // 超时时间(3秒) metadata.put("type", "AI_GENERATION");// Skill类型(AI生成类) return metadata; } // ====== 辅助方法 ====== /** * 校验字符串是否为合法JSON */ private boolean isValidJson(String json) { try { JSON.parseObject(json); return true; } catch (Exception e) { return false; } } /** * 将Skill参数转换为上下文对象 */ private ShopAgentContext convertParamsToContext(Map parameters) { ShopAgentContext context = new ShopAgentContext(); context.setOrderId((String) parameters.get("orderId")); context.setUserId((String) parameters.get("userId")); context.setProductInfo((String) parameters.get("productInfo")); context.setTradeNo((String) parameters.get("tradeNo")); // 兼容扩展字段 if (parameters.containsKey("attributes") && parameters.get("attributes") instanceof Map) { context.setAttributes((Map parameters.get("attributes")); } // 校验必填参数 validateRequiredParams(context); return context; } /** * 校验必填参数 */ private void validateRequiredParams(ShopAgentContext context) { if (context.getOrderId() == null) { throw new IllegalArgumentException("参数缺失:orderId"); } if (context.getUserId() == null) { throw new IllegalArgumentException("参数缺失:userId"); } if (context.getProductInfo() == null) { throw new IllegalArgumentException("参数缺失:productInfo"); } } } |
3.2 支付智能体(PaymentAgentService)
|
import com.alibaba.fastjson.JSON; import com.alipay.api.AlipayApiException; import com.alipay.api.AlipayClient; import com.alipay.api.request.AlipayTradeQueryRequest; import com.alipay.api.response.AlipayTradeQueryResponse; import com.shop.multiagent.dto.ShopAgentContext; import com.shop.multiagent.framework.Agent; import com.shop.multiagent.framework.Skill; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import java.util.HashMap; import java.util.Map; @Service @Slf4j @Skill( value = "payment-agent", description = "支付宝支付状态校验智能体", version = "1.0.0" ) public class PaymentAgentService implements Agent, Skill { private final AlipayClient alipayClient; // 构造器注入支付宝客户端 public PaymentAgentService(AlipayClient alipayClient) { this.alipayClient = alipayClient; } // ====== Agent接口实现(核心业务逻辑)====== @Override public ShopAgentContext execute(ShopAgentContext context) { log.info("支付智能体执行,订单号:{},交易号:{}", context.getOrderId(), context.getTradeNo()); // 1. 校验必填参数 if (context.getTradeNo() == null) { throw new IllegalArgumentException("支付智能体参数缺失:tradeNo"); } // 2. 构造支付宝查询请求 AlipayTradeQueryRequest request = new AlipayTradeQueryRequest(); Map> bizContent = new HashMap<>(); bizContent.put("out_trade_no", context.getOrderId()); // 商户订单号 bizContent.put("trade_no", context.getTradeNo()); // 支付宝交易号 request.setBizContent(JSON.toJSONString(bizContent)); try { // 3. 调用支付宝MCP接口 AlipayTradeQueryResponse response = alipayClient.execute(request); // 4. 解析响应结果 if (response.isSuccess()) { String tradeStatus = response.getTradeStatus(); log.info("支付宝查询成功,订单号:{},交易状态:{}", context.getOrderId(), tradeStatus); // 5. 判断支付是否成功(支付宝交易状态:TRADE_SUCCESS/TRADE_FINISHED) boolean paymentSuccess = "TRADE_SUCCESS".equals(tradeStatus) || "TRADE_FINISHED".equals(tradeStatus); context.setPaymentSuccess(paymentSuccess); // 6. 存储支付详情到扩展字段 Map> paymentDetails = new HashMap<>(); paymentDetails.put("tradeStatus", tradeStatus); paymentDetails.put("totalAmount", response.getTotalAmount()); paymentDetails.put("payTime", response.getSendPayDate()); context.getAttributes().put("paymentDetails", paymentDetails); } else { log.error("支付宝查询失败,订单号:{},错误码:{},错误信息:{}", context.getOrderId(), response.getCode(), response.getMsg()); context.setPaymentSuccess(false); context.getAttributes().put("paymentError", response.getMsg()); } } catch (AlipayApiException e) { log.error("支付宝接口调用异常,订单号:{}", context.getOrderId(), e); context.setPaymentSuccess(false); context.getAttributes().put("paymentError", e.getErrMsg()); throw new RuntimeException("支付校验失败", e); } return context; } // ====== Skill接口实现(适配Skill框架)====== @Override public String getName() { return this.getClass().getAnnotation(Skill.class).value(); } @Override public String getDescription() { return this.getClass().getAnnotation(Skill.class).description(); } @Override public Object execute(Map parameters) { ShopAgentContext context = convertParamsToContext(parameters); return execute(context); } @Override public Map Object> getMetadata() { Map> metadata = new HashMap(); metadata.put("version", this.getClass().getAnnotation(Skill.class).version()); metadata.put("supportAsync", false); // 支付操作需同步执行 metadata.put("timeout", 5000); // 超时时间(5秒) metadata.put("type", "PAYMENT"); // Skill类型(支付类) return metadata; } // ====== 辅助方法 ====== private ShopAgentContext convertParamsToContext(Map, Object> parameters) { ShopAgentContext context = new ShopAgentContext(); context.setOrderId((String) parameters.get("orderId")); context.setTradeNo((String) parameters.get("tradeNo")); context.setOrderInfo((String) parameters.get("orderInfo")); // 校验必填参数 if (context.getOrderId() == null) { throw new IllegalArgumentException("参数缺失:orderId"); } if (context.getTradeNo() == null) { throw new IllegalArgumentException("参数缺失:tradeNo"); } return context; } } |
3.3 物流智能体(LogisticsAgentService)
|
import com.shop.multiagent.dto.ShopAgentContext; import com.shop.multiagent.framework.Agent; import com.shop.multiagent.framework.Skill; import lombok.extern.slf4j.Slf4j; import org.springframework.ai.openai.OpenAiChatClient; import org.springframework.stereotype.Service; import java.util.*; @Service @Slf4j @Skill( value = "logistics-agent", description = "物流轨迹查询与格式化智能体", version = "1.0.0" ) public class LogisticsAgentService implements Agent, Skill { private final OpenAiChatClient chatClient; // 构造器注入OpenAI Chat客户端 public LogisticsAgentService(OpenAiChatClient chatClient) { this.chatClient = chatClient; } // 模拟物流快递公司列表 private static final List EXPRESS_COMPANIES = Arrays.asList("顺丰速运", "京东物流", "中通快递", "圆通快递", "韵达快递"); // ====== Agent接口实现(核心业务逻辑)====== @Override public ShopAgentContext execute(ShopAgentContext context) { log.info("物流智能体执行,订单号:{}", context.getOrderId()); // 1. 支付失败直接返回 if (!context.isPaymentSuccess()) { log.warn("订单{}支付未成功,跳过物流查询", context.getOrderId()); context.setLogisticsInfo("支付未完成,暂无法查询物流信息"); return context; } try { // 2. 模拟调用物流接口(真实场景替换为支付宝物流API或第三方物流接口) String logisticsRawData = mockLogisticsQuery(context.getOrderId()); log.info("订单{}物流原始数据:{}", context.getOrderId(), logisticsRawData); // 3. 调用OpenAI格式化物流信息(用户友好型输出) String formattedLogistics = formatLogisticsInfo(logisticsRawData); context.setLogisticsInfo(formattedLogistics); // 4. 存储物流详情到扩展字段 context.getAttributes().put("logisticsRawData", logisticsRawData); } catch (Exception e) { log.error("物流智能体执行异常,订单号:{}", context.getOrderId(), e); context.setLogisticsInfo("物流信息查询失败,请稍后重试"); context.getAttributes().put("logisticsError", e.getMessage()); } return context; } // ====== Skill接口实现(适配Skill框架)====== @Override public String getName() { return this.getClass().getAnnotation(Skill.class).value(); } @Override public String getDescription() { return this.getClass().getAnnotation(Skill.class).description(); } @Override public Object execute(Map> parameters) { ShopAgentContext context = convertParamsToContext(parameters); return execute(context); } @Override public Map, Object> getMetadata() { Map Object> metadata = new HashMap<>(); metadata.put("version", this.getClass().getAnnotation(Skill.class).version()); metadata.put("supportAsync", true); // 支持异步执行 metadata.put("timeout", 4000); // 超时时间(4秒) metadata.put("type", "LOGISTICS"); // Skill类型(物流类) return metadata; } // ====== 辅助方法 ====== /** * 模拟物流查询接口返回 */ private String mockLogisticsQuery(String orderId) { // 随机选择快递公司 String expressCompany = EXPRESS_COMPANIES.get(new Random().nextInt(EXPRESS_COMPANIES.size())); // 生成随机运单号 String trackingNumber = "YT" + System.currentTimeMillis() + new Random().nextInt(1000); // 模拟物流轨迹(3-5个节点) List<Map String>> tracks = new ArrayList<>(); int trackCount = 3 + new Random().nextInt(3); for (int i = 0; i { Map String> track = new HashMap<>(); String time = generateRandomTime(i); String status = getTrackStatus(i, trackCount); track.put("time", time); track.put("status", status); tracks.add(track); } // 构造原始数据JSON Map Object> rawData = new HashMapData.put("expressCompany", expressCompany); rawData.put("trackingNumber", trackingNumber); rawData.put("tracks", tracks); rawData.put("queryTime", new Date().toString()); return com.alibaba.fastjson.JSON.toJSONString(rawData); } /** * 生成随机时间(模拟物流节点时间顺序) */ private String generateRandomTime(int index) { Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.HOUR, - (index * 2 + new Random().nextInt(3))); return String.format("%tF %tT", calendar, calendar); } /** * 获取物流节点状态 */ private String getTrackStatus(int index, int total) { if (index == 0) { return "订单已创建,等待揽收"; } else if (index == 1) { return "快递员已揽收,正在前往分拣中心"; } else if (index 1) { return "包裹已到达" + getRandomCity() + "分拣中心,正在分拣"; } else { return "包裹已发出,预计" + (1 + new Random().nextInt(3)) + "天后送达"; } } /** * 生成随机城市(模拟物流中转) */ private String getRandomCity() { List = Arrays.asList("北京", "上海", "广州", "深圳", "杭州", "成都", "武汉"); return cities.get(new Random().nextInt(cities.size())); } /** * 调用OpenAI格式化物流信息 */ private String formatLogisticsInfo(String rawData) { String prompt = String.format( "以下是物流原始数据(JSON格式),请将其转换为用户友好的自然语言描述:" + "1. 先说明快递公司和运单号" + "2. 按时间顺序列出物流轨迹(最新状态在前)" + "3. 语言简洁明了,避免技术术语" + "4. 最后给出预计送达时间" + "原始数据:%s", rawData ); return chatClient.call(prompt); } /** * 参数转换 */ private ShopAgentContext convertParamsToContext(Map parameters) { ShopAgentContext context = new ShopAgentContext(); context.setOrderId((String) parameters.get("orderId")); context.setPaymentSuccess((Boolean) parameters.getOrDefault("paymentSuccess", false)); context.setOrderInfo((String) parameters.get("orderInfo")); if (context.getOrderId() == null) { throw new IllegalArgumentException("参数缺失:orderId"); } return context; } } |
3.4 交付智能体(DeliveryAgentService)
|
import com.shop.multiagent.dto.ShopAgentContext; import com.shop.multiagent.framework.Agent; import com.shop.multiagent.framework.Skill; import lombok.extern.slf4j.Slf4j; import org.springframework.ai.openai.OpenAiCompletionsClient; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import java.time.Duration; import java.util.HashMap; import java.util.Map; @Service @Slf4j @Skill( value = "delivery-agent", description = "交付预估与签收提醒智能体", version = "1.0.0" ) public class DeliveryAgentService implements Agent, Skill { private final OpenAiCompletionsClient completionsClient; public DeliveryAgentService(OpenAiCompletionsClient completionsClient) { this.completionsClient = completionsClient; } // ====== Agent接口实现(核心业务逻辑)====== @Override public ShopAgentContext execute(ShopAgentContext context) { log.info("交付智能体执行,订单号:{}", context.getOrderId()); // 1. 支付失败直接返回 if (!context.isPaymentSuccess()) { context.setDeliveryInfo("支付未完成,无法生成交付信息"); return context; } // 2. 校验物流信息 if (context.getLogisticsInfo() == null || context.getLogisticsInfo().contains("失败")) { context.setDeliveryInfo("物流信息异常,暂无法预估交付时间"); return context; } try { // 3. 调用OpenAI生成交付预估信息 String prompt = String.format( "基于以下信息生成交付提醒:" + "1. 订单号:%s" + "2. 物流信息:%s" + "3. 要求:" + " - 明确预计送达时间(精确到日期)" + " - 给出签收注意事项(如:请保持电话畅通、支持代收等)" + " - 说明异常处理方式(如:未收到货如何联系客服)" + " - 语言友好,简洁实用", context.getOrderId(), context.getLogisticsInfo() ); Flux completionsClient.stream(prompt); StringBuilder deliveryInfoBuilder = new StringBuilder(); stream.subscribe( chunk -> deliveryInfoBuilder.append(chunk), e -> log.error("交付智能体OpenAI调用异常,订单号:{}", context.getOrderId(), e), () -> log.info("交付智能体OpenAI调用完成,订单号:{}", context.getOrderId()) ); stream.blockLast(Duration.ofSeconds(3)); String deliveryInfo = deliveryInfoBuilder.toString().trim(); context.setDeliveryInfo(deliveryInfo); // 4. 存储交付详情到扩展字段 context.getAttributes().put("deliveryGeneratedTime", System.currentTimeMillis()); } catch (Exception e) { log.error("交付智能体执行异常,订单号:{}", context.getOrderId(), e); context.setDeliveryInfo("交付信息生成失败,请稍后查询"); } return context; } // ====== Skill接口实现 ====== @Override public String getName() { return this.getClass().getAnnotation(Skill.class).value(); } @Override public String getDescription() { return this.getClass().getAnnotation(Skill.class).description(); } @Override public Object execute(Map Object> parameters) { ShopAgentContext context = convertParamsToContext(parameters); return execute(context); } @Override public Map<String, Object> getMetadata() { Map, Object> metadata = new HashMap.put("version", this.getClass().getAnnotation(Skill.class).version()); metadata.put("supportAsync", true); metadata.put("timeout", 3000); metadata.put("type", "DELIVERY"); return metadata; } // ====== 辅助方法 ====== private ShopAgentContext convertParamsToContext(Map<String, Object> parameters) { ShopAgentContext context = new ShopAgentContext(); context.setOrderId((String) parameters.get("orderId")); context.setPaymentSuccess((Boolean) parameters.getOrDefault("paymentSuccess", false)); context.setLogisticsInfo((String) parameters.get("logisticsInfo")); if (context.getOrderId() == null) { throw new IllegalArgumentException("参数缺失:orderId"); } return context; } } |
3.5 客服智能体(CustomerServiceAgentService)
|
import com.shop.multiagent.dto.ShopAgentContext; import com.shop.multiagent.framework.Agent; import com.shop.multiagent.framework.Skill; import lombok.extern.slf4j.Slf4j; import org.springframework.ai.openai.OpenAiChatClient; import org.springframework.ai.openai.OpenAiEmbeddingClient; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; import java.util.*; @Service @Slf4j @Skill( value = "customer-service-agent", description = "基于RAG知识库的客服应答智能体", version = "1.0.0" ) public class CustomerServiceAgentService implements Agent, Skill { private final OpenAiEmbeddingClient embeddingClient; private final OpenAiChatClient chatClient; private final RestTemplate restTemplate; private final String RAG_API_URL = "http://localhost:8080/api/rag/search"; // RAG知识库接口 public CustomerServiceAgentService(OpenAiEmbeddingClient embeddingClient, OpenAiChatClient chatClient) { this.embeddingClient = embeddingClient; this.chatClient = chatClient; this.restTemplate = new RestTemplate(); } // ====== Agent接口实现(核心业务逻辑)====== @Override public ShopAgentContext execute(ShopAgentContext context) { log.info("客服智能体执行,订单号:{}", context.getOrderId()); try { // 1. 构造用户咨询问题(基于订单全链路信息) String userQuestion = String.format( "用户咨询订单%s的相关问题,订单信息:%s,支付状态:%s,物流信息:%s,交付信息:%s", context.getOrderId(), context.getOrderInfo(), context.isPaymentSuccess() ? "已支付" : "未支付", context.getLogisticsInfo(), context.getDeliveryInfo() ); log.info("客服智能体用户问题:{}", userQuestion); // 2. 生成问题向量(用于RAG知识库检索) float[] questionEmbedding = embeddingClient.embed(userQuestion).getEmbedding(); // 3. 调用RAG知识库检索相关文档 Map Object> ragRequest = new HashMapRequest.put("orderId", context.getOrderId()); ragRequest.put("embedding", questionEmbedding); ragRequest.put("topK", 3); // 取Top3相关文档 Map Object> ragResponse = restTemplate.postForObject(RAG_API_URL, ragRequest, Map.class); String knowledgeContext = extractKnowledgeContext(ragResponse); log.info("订单{}RAG检索结果:{}", context.getOrderId(), knowledgeContext); // 4. 调用OpenAI生成客服应答 String prompt = String.format( "作为电商平台客服,基于以下知识库和订单信息回复用户:" + "1. 知识库内容:%s" + "2. 订单信息:%s" + "3. 回复要求:" + " - 语气友好、专业" + " - 直接回答用户问题,不冗余" + " - 若有异常情况,提供解决方案" + " - 结尾引导用户有疑问随时联系", knowledgeContext, userQuestion ); String csReply = chatClient.call(prompt); context.setCustomerServiceReply(csReply); // 5. 存储客服应答详情 context.getAttributes().put("csReplyGeneratedTime", System.currentTimeMillis()); context.getAttributes().put("ragSearchCount", ragResponse != null ? ragResponse.get("count") : 0); } catch (Exception e) { log.error("客服智能体执行异常,订单号:{}", context.getOrderId(), e); context.setCustomerServiceReply("非常抱歉,当前客服系统繁忙,请稍后再试(订单号:" + context.getOrderId() + ")"); } return context; } // ====== Skill接口实现 ====== @Override public String getName() { return this.getClass().getAnnotation(Skill.class).value(); } @Override public String getDescription() { return this.getClass().getAnnotation(Skill.class).description(); } @Override public Object execute(Map Object> parameters) { ShopAgentContext context = convertParamsToContext(parameters); return execute(context); } @Override public Map<String, Object> getMetadata() { Map, Object> metadata = new HashMap.put("version", this.getClass().getAnnotation(Skill.class).version()); metadata.put("supportAsync", true); metadata.put("timeout", 6000); // RAG+AI生成超时时间较长(6秒) metadata.put("type", "CUSTOMER_SERVICE"); return metadata; } // ====== 辅助方法 ====== /** * 提取RAG知识库返回的上下文 */ private String extractKnowledgeContext(Map, Object> ragResponse) { if (ragResponse == null || !ragResponse.containsKey("documents")) { return "知识库暂无相关信息"; } List<Map, String>> documents = (List<Map>>) ragResponse.get("documents"); StringBuilder contextBuilder = new StringBuilder(); for (Map String> doc : documents) { contextBuilder.append("- ").append(doc.get("content")).append("\n"); } return contextBuilder.toString().trim(); } /** * 参数转换 */ private ShopAgentContext convertParamsToContext(Map) { ShopAgentContext context = new ShopAgentContext(); context.setOrderId((String) parameters.get("orderId")); context.setOrderInfo((String) parameters.get("orderInfo")); context.setPaymentSuccess((Boolean) parameters.getOrDefault("paymentSuccess", false)); context.setLogisticsInfo((String) parameters.get("logisticsInfo")); context.setDeliveryInfo((String) parameters.get("deliveryInfo")); if (context.getOrderId() == null) { throw new IllegalArgumentException("参数缺失:orderId"); } return context; } } |
四、Skill 框架核心组件
4.1 Skill 工厂(SkillFactory)
|
import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; import java.util.Collection; /** * Skill工厂(动态加载、管理所有Skill组件) */ @Component public class SkillFactory { // 存储Skill:key=Skill名称,value=Skill实例 private final MapMap = new HashMap /** * 构造器注入Spring上下文,自动扫描所有@Skill注解的Bean */ public SkillFactory(ApplicationContext applicationContext) { // 扫描所有标注@Skill注解的Bean Map skillBeans = applicationContext.getBeansWithAnnotation(com.shop.multiagent.framework.Skill.class); // 初始化SkillMap for (Map.Entry, Object> entry : skillBeans.entrySet()) { Object bean = entry.getValue(); // 校验Bean是否实现Skill接口 if (bean instanceof Skill) { Skill skill = (Skill) bean; com.shop.multiagent.framework.Skill skillAnnotation = bean.getClass().getAnnotation(com.shop.multiagent.framework.Skill.class); String skillName = skillAnnotation.value(); // 校验Skill名称唯一性 if (skillMap.containsKey(skillName)) { throw new IllegalStateException("Skill名称重复:" + skillName); } skillMap.put(skillName, skill); log.info("Skill工厂加载成功:{}(描述:{},版本:{})", skillName, skillAnnotation.description(), skillAnnotation.version()); } else { log.warn("Bean {} 标注了@Skill注解,但未实现Skill接口,跳过加载", entry.getKey()); } } log.info("Skill工厂初始化完成,共加载{}个Skill", skillMap.size()); } /** * 根据名称获取Skill实例 */ public Skill getSkill(String skillName) { if (skillName == null) { throw new IllegalArgumentException("Skill名称不能为空"); } return skillMap.get(skillName); } /** * 检查Skill是否存在 */ public boolean hasSkill(String skillName) { return skillMap.containsKey(skillName); } /** * 获取所有Skill名称 */ public Collection> getSkillNames() { return skillMap.keySet(); } /** * 获取所有Skill实例 */ public Collection> getAllSkills() { return skillMap.values(); } } |
4.2 线程池选择器(SkillThreadPoolSelector)
|
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.Executors; /** * 根据Skill元数据动态选择线程池 */ @Component public class SkillThreadPoolSelector { // 通用线程池(处理异步非工具类任务) private final ThreadPoolTaskExecutor defaultExecutor; // 工具类线程池(处理支付、物流等第三方接口调用) private final ThreadPoolTaskExecutor toolExecutor; @Autowired public SkillThreadPoolSelector( @Qualifier("defaultTaskExecutor") ThreadPoolTaskExecutor defaultExecutor, @Qualifier("toolTaskExecutor") ThreadPoolTaskExecutor toolExecutor) { this.defaultExecutor = defaultExecutor; this.toolExecutor = toolExecutor; } /** * 根据Skill选择合适的线程池 */ public Executor selectExecutor(Skill skill) { if (skill == null) { throw new IllegalArgumentException("Skill不能为空"); } Map> metadata = skill.getMetadata(); String skillName = skill.getName(); String skillType = (String) metadata.getOrDefault("type", "DEFAULT"); // 1. 支付、物流类Skill使用工具线程池(第三方接口调用) if ("PAYMENT".equals(skillType) || "LOGISTICS".equals(skillType)) { log.debug("Skill {} 选择工具线程池(toolTaskExecutor)", skillName); return toolExecutor; } // 2. 支持异步的Skill使用通用线程池 if (Boolean.TRUE.equals(metadata.getOrDefault("supportAsync", false))) { log.debug("Skill {} 选择通用线程池(defaultTaskExecutor)", skillName); return defaultExecutor; } // 3. 其他情况使用单线程池(同步执行) log.debug("Skill {} 选择同步线程池(SingleThreadExecutor)", skillName); return Executors.newSingleThreadExecutor(); } } |
五、流程编排(StateGraph)
5.1 工作流服务(ShopWorkflowService)
|
import com.shop.multiagent.dto.ShopAgentContext; import com.shop.multiagent.framework.Skill; import com.shop.multiagent.framework.SkillFactory; import com.shop.multiagent.framework.SkillThreadPoolSelector; import lombok.extern.slf4j.Slf4j; import org.springframework.ai.stateflow.StateGraph; import org.springframework.stereotype.Service; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; /** * 购物流程编排服务(基于StateGraph) */ @Service @Slf4j public class ShopWorkflowService { private final SkillFactory skillFactory; private final SkillThreadPoolSelector threadPoolSelector; @Autowired public ShopWorkflowService(SkillFactory skillFactory, SkillThreadPoolSelector threadPoolSelector) { this.skillFactory = skillFactory; this.threadPoolSelector = threadPoolSelector; } /** * 构建购物流程StateGraph */ public StateGraph> buildWorkflow() { return StateGraph.AgentContext>builder() // 1. 下单节点(调用order-agent Skill) .step("order", context -> { log.info("流程执行:下单节点,订单号:{}", context.getOrderId()); return executeSkill("order-agent", context); }) // 2. 支付节点(调用payment-agent Skill) .step("payment", context -> { log.info("流程执行:支付节点,订单号:{}", context.getOrderId()); return executeSkill("payment-agent", context); }) // 3. 物流节点(调用logistics-agent Skill) .step("logistics", context -> { log.info("流程执行:物流节点,订单号:{}", context.getOrderId()); return executeSkill("logistics-agent", context); }) // 4. 交付节点(调用delivery-agent Skill) .step("delivery", context -> { log.info("流程执行:交付节点,订单号:{}", context.getOrderId()); return executeSkill("delivery-agent", context); }) // 5. 客服节点(调用customer-service-agent Skill) .step("customerService", context -> { log.info("流程执行:客服节点,订单号:{}", context.getOrderId()); return executeSkill("customer-service-agent", context); }) // 流程顺序定义 .edge("order", "payment") // 下单 → 支付 .edge("payment", "logistics") // 支付 → 物流 .edge("logistics", "delivery") // 物流 → 交付 .edge("delivery", "customerService") // 交付 → 客服 // 起始节点 .start("order") // 错误处理(每个节点单独配置错误处理器) .onError("order", this::handleOrderError) .onError("payment", this::handlePaymentError) .onError("logistics", this::handleLogisticsError) .onError("delivery", this::handleDeliveryError) .onError("customerService", this::handleCustomerServiceError) // 构建StateGraph .build(); } /** * 执行Skill(统一调度逻辑) */ private ShopAgentContext executeSkill(String skillName, ShopAgentContext context) { // 1. 获取Skill实例 Skill skill = skillFactory.getSkill(skillName); if (skill == null) { throw new RuntimeException("未找到Skill:" + skillName); } // 2. 转换上下文为Skill参数 Map> params = convertContextToParams(context); // 3. 选择线程池 Executor executor = threadPoolSelector.selectExecutor(skill); // 4. 异步执行Skill(根据Skill元数据决定是否异步) CompletableFutureAgentContext> future = CompletableFuture.supplyAsync( () -> { log.debug("Skill {} 开始执行,订单号:{}", skillName, context.getOrderId()); try { // 执行Skill并转换结果 Object result = skill.execute(params); if (!(result instanceof ShopAgentContext)) { throw new RuntimeException("Skill " + skillName + " 执行结果类型错误,应为ShopAgentContext"); } return (ShopAgentContext) result; } catch (Exception e) { log.error("Skill {} 执行异常,订单号:{}", skillName, context.getOrderId(), e); throw e; } }, executor ); // 5. 等待执行结果(同步阻塞,确保流程顺序执行) return future.join(); } /** * 上下文转换为Skill参数 */ private MapContextToParams(ShopAgentContext context) { Map = new HashMap params.put("orderId", context.getOrderId()); params.put("userId", context.getUserId()); params.put("productInfo", context.getProductInfo()); params.put("tradeNo", context.getTradeNo()); params.put("orderInfo", context.getOrderInfo()); params.put("paymentSuccess", context.isPaymentSuccess()); params.put("logisticsInfo", context.getLogisticsInfo()); params.put("deliveryInfo", context.getDeliveryInfo()); params.put("attributes", context.getAttributes()); return params; } /** * 节点错误处理器(下单节点) */ private void handleOrderError(ShopAgentContext context, Throwable throwable) { log.error("下单节点执行失败,订单号:{}", context.getOrderId(), throwable); context.getAttributes().put("errorNode", "order"); context.getAttributes().put("errorMessage", throwable.getMessage()); // 下单失败直接终止流程 throw new RuntimeException("下单失败:" + throwable.getMessage(), throwable); } /** * 支付节点错误处理器 */ private void handlePaymentError(ShopAgentContext context, Throwable throwable) { log.error("支付节点执行失败,订单号:{}", context.getOrderId(), throwable); context.setPaymentSuccess(false); context.getAttributes().put("errorNode", "payment"); context.getAttributes().put("errorMessage", throwable.getMessage()); // 支付失败仍继续流程(后续节点会处理未支付状态) } /** * 物流节点错误处理器 */ private void handleLogisticsError(ShopAgentContext context, Throwable throwable) { log.error("物流节点执行失败,订单号:{}", context.getOrderId(), throwable); context.setLogisticsInfo("物流查询失败:" + throwable.getMessage()); context.getAttributes().put("errorNode", "logistics"); context.getAttributes().put("errorMessage", throwable.getMessage()); } /** * 交付节点错误处理器 */ private void handleDeliveryError(ShopAgentContext context, Throwable throwable) { log.error("交付节点执行失败,订单号:{}", context.getOrderId(), throwable); context.setDeliveryInfo("交付信息生成失败:" + throwable.getMessage()); context.getAttributes().put("errorNode", "delivery"); context.getAttributes().put("errorMessage", throwable.getMessage()); } /** * 客服节点错误处理器 */ private void handleCustomerServiceError(ShopAgentContext context, Throwable throwable) { log.error("客服节点执行失败,订单号:{}", context.getOrderId(), throwable); context.setCustomerServiceReply("客服应答生成失败:" + throwable.getMessage()); context.getAttributes().put("errorNode", "customerService"); context.getAttributes().put("errorMessage", throwable.getMessage()); } /** * 执行完整购物流程 */ public ShopAgentContext runWorkflow(ShopAgentContext initialContext) { log.info("开始执行购物流程,订单号:{}", initialContext.getOrderId()); // 校验初始参数 validateInitialContext(initialContext); // 构建并执行StateGraph StateGraph> stateGraph = buildWorkflow(); ShopAgentContext result = stateGraph.execute(initialContext); log.info("购物流程执行完成,订单号:{}", result.getOrderId()); return result; } /** * 校验初始上下文参数 */ private void validateInitialContext(ShopAgentContext context) { if (context.getOrderId() == null) { throw new IllegalArgumentException("初始参数缺失:orderId"); } if (context.getUserId() == null) { throw new IllegalArgumentException("初始参数缺失:userId"); } if (context.getProductInfo() == null) { throw new IllegalArgumentException("初始参数缺失:productInfo"); } if (context.getTradeNo() == null) { throw new IllegalArgumentException("初始参数缺失:tradeNo"); } } } |
六、配置类(OpenAI、支付宝、线程池)
6.1 线程池配置(ThreadPoolConfig)
|
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ThreadPoolExecutor; /** * 线程池配置 */ @Configuration public class ThreadPoolConfig { /** * 通用任务线程池(处理异步非工具类任务) */ @Bean("defaultTaskExecutor") public ThreadPoolTaskExecutor defaultTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); // 核心线程数 executor.setMaxPoolSize(20); // 最大线程数 executor.setQueueCapacity(100); // 队列容量 executor.setThreadNamePrefix("default-task-"); // 线程名称前缀 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略(调用者执行) executor.setKeepAliveSeconds(60); // 空闲线程存活时间 executor.initialize(); return executor; } /** * 工具类任务线程池(处理支付、物流等第三方接口调用) */ @Bean("toolTaskExecutor") public ThreadPoolTaskExecutor toolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(8); // 核心线程数(工具调用并发不宜过高) executor.setMaxPoolSize(15); // 最大线程数 executor.setQueueCapacity(50); // 队列容量 executor.setThreadNamePrefix("tool-task-"); // 线程名称前缀 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // 拒绝策略(直接抛出异常) executor.setKeepAliveSeconds(30); // 空闲线程存活时间 executor.initialize(); return executor; } } |
6.2 OpenAI 配置(OpenAiConfig)
|
import org.springframework.ai.openai.OpenAiApi; import org.springframework.ai.openai.OpenAiChatClient; import org.springframework.ai.openai.OpenAiCompletionsClient; import org.springframework.ai.openai.OpenAiEmbeddingClient; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * OpenAI客户端配置 */ @Configuration public class OpenAiConfig { @Value("${spring.ai.openai.api-key}") private String apiKey; @Value("${spring.ai.openai.api-endpoint:https://api.openai.com/v1}") private String apiEndpoint; /** * OpenAI API客户端 */ @Bean public OpenAiApi openAiApi() { return new OpenAiApi(apiEndpoint, apiKey); } /** * 文本补全客户端(用于流式生成) */ @Bean public OpenAiCompletionsClient openAiCompletionsClient() { return new OpenAiCompletionsClient(openAiApi()); } /** * 聊天客户端(用于格式化、客服应答) */ @Bean public OpenAiChatClient openAiChatClient() { return new OpenAiChatClient(openAiApi()); } /** * 嵌入客户端(用于RAG知识库检索) */ @Bean public OpenAiEmbeddingClient openAiEmbeddingClient() { return new OpenAiEmbeddingClient(openAiApi()); } } |
6.3 支付宝配置(AlipayConfig)
|
import com.alipay.api.AlipayClient; import com.alipay.api.DefaultAlipayClient; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 支付宝MCP客户端配置 */ @Configuration public class AlipayConfig { @Value("${alipay.app-id}") private String appId; @Value("${alipay.private-key}") private String privateKey; // 商户私钥 @Value("${alipay.alipay-public-key}") private String alipayPublicKey; // 支付宝公钥 @Value("${alipay.gateway-url:https://openapi.alipay.com/gateway.do}") private String gatewayUrl; @Value("${alipay.charset:UTF-8}") private String charset; @Value("${alipay.sign-type:RSA2}") private String signType; /** * 支付宝客户端实例 */ @Bean public AlipayClient alipayClient() { return new DefaultAlipayClient( gatewayUrl, appId, privateKey, "json", charset, alipayPublicKey, signType ); } } |
七、控制器(对外接口)
7.1 购物流程控制器(ShopAgentController)
|
import com.shop.multiagent.dto.ShopAgentContext; import com.shop.multiagent.dto.ErrorResponse; import com.shop.multiagent.service.ShopWorkflowService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * 购物流程对外接口 */ @RestController @RequestMapping("/api/shop") @Slf4j public class ShopAgentController { private final |
八、RAG 知识库集成
8.1 RAG 服务接口
创建 RAG 知识库服务接口:
|
public interface RagService { void initializeKnowledgeBase();
void addDocument(String documentId, String content, Map<String, Object> metadata);
List<Document> search(String query, int topK);
String generateResponse(String query, List<Document> relatedDocuments); } |
8.2 向量数据库配置
配置向量数据库(以 Redis 为例):
|
@Configuration public class VectorDbConfig { @Value("${vector.db.type:redis}") private String dbType;
@Value("${vector.db.host:localhost}") private String host;
@Value("${vector.db.port:6379}") private int port;
@Value("${vector.db.password:}") private String password;
@Value("${vector.db.database:0}") private int database;
@Bean public VectorStore vectorStore() { switch (dbType) { case "redis": return new RedisVectorStore(host, port, password, database); case "milvus": return new MilvusVectorStore(); default: return new InMemoryVectorStore(); } } } |
8.3 RAG 实现类
实现 RAG 核心逻辑:
|
@Service public class RagServiceImpl implements RagService { private final VectorStore vectorStore; private final OpenAiEmbeddingClient embeddingClient;
public RagServiceImpl(VectorStore vectorStore, OpenAiEmbeddingClient embeddingClient) { this.vectorStore = vectorStore; this.embeddingClient = embeddingClient; }
@Override public void initializeKnowledgeBase() { // 初始化知识库,加载预定义文档 List<Document> initialDocs = Arrays.asList( new Document("cs_faq_001", "顺丰物流江浙沪1天达", Collections.singletonMap("type", "logistics")), new Document("cs_faq_002", "交付异常可联系400-123-4567", Collections.singletonMap("type", "support")), new Document("cs_faq_003", "签收后7天无理由退换", Collections.singletonMap("type", "return")) );
for (Document doc : initialDocs) { addDocument(doc.getId(), doc.getContent(), doc.getMetadata()); } }
@Override public void addDocument(String documentId, String content, Map<String, Object> metadata) { try { float[] embedding = embeddingClient.embed(content).getEmbedding(); vectorStore.addDocument(documentId, content, embedding, metadata); } catch (Exception e) { log.error("文档添加失败:", e); } }
@Override public List<Document> search(String query, int topK) { try { float[] queryEmbedding = embeddingClient.embed(query).getEmbedding(); return vectorStore.search(queryEmbedding, topK); } catch (Exception e) { log.error("搜索失败:", e); return Collections.emptyList(); } }
@Override public String generateResponse(String query, List<Document> relatedDocuments) { // 构建上下文 StringBuilder contextBuilder = new StringBuilder(); contextBuilder.append("以下是相关知识库内容:\n");
for (Document doc : relatedDocuments) { contextBuilder.append("- ").append(doc.getContent()).append("\n"); }
contextBuilder.append("\n用户问题:").append(query);
// 使用OpenAI生成回答 String prompt = String.format( "基于以上知识库内容,回答用户的问题:%s", contextBuilder.toString() );
return openAiChatClient.call(prompt); } } |
九、Skill 框架设计
9.1 Skill 接口定义
定义 Skill 接口规范:
|
public interface Skill { String getName();
String getDescription();
Object execute(Map<String, Object> parameters);
Map<String, Object> getMetadata(); } |
9.2 Skill 工厂类
创建 Skill 工厂用于动态加载和管理:
|
@Component public class SkillFactory { private final Map<String, Skill> skills = new HashMap<>();
@Autowired public SkillFactory(ApplicationContext applicationContext) { // 自动扫描所有标注@Skill注解的Bean Map<String, Object> skillBeans = applicationContext.getBeansWithAnnotation(Skill.class);
for (Map.Entry<String, Object> entry : skillBeans.entrySet()) { Object bean = entry.getValue(); if (bean instanceof Skill) { Skill skill = (Skill) bean; Skill annotation = bean.getClass().getAnnotation(Skill.class); skills.put(annotation.value(), skill); } } }
public Skill getSkill(String name) { return skills.get(name); }
public boolean hasSkill(String name) { return skills.containsKey(name); }
public Collection<Skill> getAllSkills() { return skills.values(); } } |
9.3 Skill 注解定义
创建自定义 Skill 注解:
|
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) public @interface Skill { String value();
String description() default "";
String version() default "1.0.0"; } |
十、异常处理与监控
10.1 全局异常处理器
实现全局异常处理:
|
@RestControllerAdvice public class GlobalExceptionHandler { @ExceptionHandler(Exception.class) public ResponseEntity<ErrorResponse> handleException(Exception e) { ErrorResponse error = new ErrorResponse(); error.setErrorCode("INTERNAL_SERVER_ERROR"); error.setErrorMessage("系统内部错误,请稍后重试"); error.setDetails(e.getMessage());
log.error("全局异常处理:", e); return ResponseEntity.status(500).body(error); }
@ExceptionHandler(AlipayApiException.class) public ResponseEntity<ErrorResponse> handleAlipayException(AlipayApiException e) { ErrorResponse error = new ErrorResponse(); error.setErrorCode("ALIPAY_ERROR"); error.setErrorMessage("支付宝接口调用失败"); error.setDetails(e.getErrMsg());
return ResponseEntity.status(400).body(error); }
@ExceptionHandler(OpenAiException.class) public ResponseEntity<ErrorResponse> handleOpenAiException(OpenAiException e) { ErrorResponse error = new ErrorResponse(); error.setErrorCode("OPENAI_ERROR"); error.setErrorMessage("OpenAI接口调用失败"); error.setDetails(e.getMessage());
return ResponseEntity.status(400).body(error); } } |
10.2 监控与日志配置
配置系统监控:
|
@Configuration public class MonitoringConfig { @Bean public MeterRegistryCustomizer<MeterRegistry> metricsCommonTags() { return registry -> registry.config().commonTags("application", "shop-multi-agent", "version", "1.0.0"); }
@Bean public HealthIndicator paymentHealthIndicator(AlipayClient alipayClient) { return () -> { try { AlipaySystemOauthTokenRequest request = new AlipaySystemOauthTokenRequest(); request.setGrantType("client_credentials"); AlipayClient alipayClient = new DefaultAlipayClient( "https://openapi.alipay.com/gateway.do", "app_id", "private_key", "json", "UTF-8", "public_key", "RSA2" ); AlipaySystemOauthTokenResponse response = alipayClient.execute(request); return Health.up().withDetail("status", "OK").build(); } catch (Exception e) { return Health.down().withDetail("error", e.getMessage()).build(); } }; } } |
十一、性能优化策略
11.1 缓存机制
实现智能缓存优化:
|
@Component public class ResultCache { private final LoadingCache<String, Object> cache;
public ResultCache() { this.cache = Caffeine.newBuilder() .maximumSize(1000) .expireAfterWrite(1, TimeUnit.HOURS) .recordStats() .build(key -> loadResult(key)); }
private Object loadResult(String key) { // 从数据库或其他数据源加载 return null; }
public void put(String key, Object value) { cache.put(key, value); }
public Optional<Object> get(String key) { return Optional.ofNullable(cache.getIfPresent(key)); }
public void invalidate(String key) { cache.invalidate(key); }
public CacheStats stats() { return cache.stats(); } } |
11.2 批处理优化
实现智能体批处理:
|
@Service public class BatchProcessingService { private final ThreadPoolExecutor batchExecutor;
public BatchProcessingService() { batchExecutor = new ThreadPoolExecutor( 10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy() ); }
public void processBatch(List<ShopAgentContext> contexts) { for (ShopAgentContext context : contexts) { batchExecutor.submit(() -> { try { ShopWorkflowService workflowService = SpringContextHolder.getBean(ShopWorkflowService.class); workflowService.runWorkflow(context); } catch (Exception e) { log.error("批处理异常:", e); } }); } } } |
十二、测试用例设计
12.1 单元测试
编写智能体单元测试:
|
@RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest public class AgentTest { @Autowired private OrderAgentService orderAgent;
@Autowired private PaymentAgentService paymentAgent;
@Test public void testOrderAgent() { ShopAgentContext context = new ShopAgentContext(); context.setOrderId("SHOP_001"); context.setUserId("USER_1001"); context.setProductInfo("华为Mate60 Pro 128G");
ShopAgentContext result = orderAgent.execute(context);
assertNotNull(result.getOrderInfo()); assertTrue(result.getOrderInfo().startsWith("{")); }
@Test public void testPaymentAgentSuccess() { ShopAgentContext context = new ShopAgentContext(); context.setOrderId("SHOP_001"); context.setTradeNo("2024060122001410086000000001");
// 使用Mockito模拟支付宝响应 AlipayTradeQueryResponse response = mock(AlipayTradeQueryResponse.class); when(response.isSuccess()).thenReturn(true); when(response.getTradeStatus()).thenReturn("TRADE_SUCCESS");
ShopAgentContext result = paymentAgent.execute(context);
assertTrue(result.isPaymentSuccess()); } } |
12.2 集成测试
编写完整流程集成测试:
|
@RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public class IntegrationTest { @Autowired private TestRestTemplate restTemplate;
@Test public void testFullWorkflow() { ShopAgentContext initialContext = new ShopAgentContext(); initialContext.setOrderId("SHOP_001"); initialContext.setUserId("USER_1001"); initialContext.setProductInfo("华为Mate60 Pro 128G"); initialContext.setTradeNo("2024060122001410086000000001");
ResponseEntity<ShopAgentContext> response = restTemplate.postForEntity( "/api/shop/run-workflow", initialContext, ShopAgentContext.class );
ShopAgentContext result = response.getBody();
assertNotNull(result); assertNotNull(result.getOrderInfo()); assertTrue(result.isPaymentSuccess()); assertNotNull(result.getLogisticsInfo()); assertNotNull(result.getDeliveryInfo()); assertNotNull(result.getCustomerServiceReply()); } } |
十三、部署与运行
13.1 部署配置
创建 Dockerfile 用于容器化部署:
|
FROM openjdk:17-slim WORKDIR /app COPY target/shop-multi-agent-1.0.0.jar app.jar EXPOSE 8080 ENTRYPOINT ["java", "-jar", "app.jar", "--spring.profiles.active=prod"] |
13.2 运行脚本
创建启动脚本:
|
#!/bin/bash set -e # 设置环境变量 export OPENAI_API_KEY="your-openai-api-key" export ALIPAY_APP_ID="your-alipay-app-id" export ALIPAY_PRIVATE_KEY="your-alipay-private-key" export ALIPAY_PUBLIC_KEY="your-alipay-public-key" # 启动应用 java -jar target/shop-multi-agent-1.0.0.jar \ --server.port=8080 \ --spring.profiles.active=prod |
13.3 配置文件
application-prod.yml 配置:
|
server: port: 8080 servlet: context-path: /api spring: profiles: prod application: name: shop-multi-agent main: allow-bean-definition-overriding: true # 数据库配置(如果有) # datasource: # url: jdbc:mysql://localhost:3306/shop # username: root # password: password # 日志配置 logging: level: root: INFO com.shop.multiagent: DEBUG file: name: /var/log/shop-multi-agent.log max-size: 10MB total-size-capacity: 100MB |
十四、架构设计文档
14.1 系统架构总图
|
系统架构总图 ├── 表现层 │ ├── REST API接口 │ └── 监控界面 ├── 应用层 │ ├── 控制器层 │ ├── 服务层 │ │ ├── 智能体服务 │ │ ├── 流程编排服务 │ │ └── 工具服务 │ └── 业务逻辑层 ├── 数据层 │ ├── 订单数据 │ ├── 用户数据 │ ├── 商品数据 │ └── 支付数据 └── 基础设施层 ├── 线程池 ├── 缓存 ├── 日志 └── 监控 |
14.2 核心组件交互图
|
核心组件交互图 用户 → 控制器 → 工作流服务 → StateGraph ↑ ↑ ↑ │ │ └─ 节点执行器 │ └─ 智能体管理器 └─ 响应处理器 |
14.3 智能体协作时序图
|
智能体协作时序图 用户发送请求 ↓ 控制器接收 ↓ 工作流服务启动 ↓ StateGraph开始执行 ↓ OrderAgent执行 ↓ PaymentAgent执行 ↓ LogisticsAgent执行 ↓ DeliveryAgent执行 ↓ CustomerServiceAgent执行 ↓ 返回最终结果 |
14.4 技术选型决策矩阵
|
技术选型 |
优势 |
劣势 |
风险 |
决策 |
|
Spring AI |
与 Spring 生态无缝集成 |
仍在快速迭代 |
版本兼容性 |
采用 |
|
OpenAI |
成熟稳定 |
成本较高 |
服务依赖 |
采用 |
|
支付宝 MCP |
官方支持 |
文档较少 |
接口变更 |
采用 |
|
Redis 向量数据库 |
性能优秀 |
内存占用 |
数据持久化 |
采用 |
|
StateGraph |
流程清晰 |
学习成本 |
复杂性 |
采用 |
14.5 性能指标预期
|
指标 |
目标值 |
测量方法 |
|
响应时间 |
< 2 秒(95%) |
监控 API 调用耗时 |
|
并发能力 |
1000 QPS |
压力测试 |
|
成功率 |
> 99.9% |
统计成功 / 失败次数 |
|
资源占用 |
< 2GB 内存 |
监控系统资源 |
|
延迟 |
< 500ms |
测量端到端延迟 |
14.6 扩展性设计说明
系统的扩展性通过以下机制实现:
插件化架构:每个智能体都是独立的插件,可以动态加载和替换
接口抽象:通过统一的 Agent 接口和 Skill 接口,支持不同实现
事件驱动:智能体间通过事件通信,降低直接依赖
配置驱动:大部分业务规则可以通过配置文件修改
模块化设计:系统按功能模块划分,各模块独立开发和部署
14.7 安全性设计
系统安全性考虑:
认证授权:所有 API 接口需要身份认证
数据加密:敏感数据如支付信息需要加密存储和传输
访问控制:不同用户有不同的访问权限
审计日志:所有关键操作都需要记录审计日志
限流保护:防止恶意攻击和滥用
总结
本系统基于 Spring AI 框架构建了一个完整的购物场景多智能体协作系统。通过 5 个独立智能体的协同工作,实现了 "下单 - 支付 - 物流 - 交付 - 客服" 全流程的智能化管理。系统采用了先进的技术架构,包括 StateGraph 流程编排、线程池管理、OpenAI 原生接口集成、支付宝 MCP 工具集成、RAG 知识库等核心技术。
系统设计严格遵循低耦合高扩展的原则,通过 Skill 设计模式、事件驱动架构、模块化设计等手段,确保系统具有良好的可维护性和可扩展性。每个智能体都可以独立开发、测试和部署,大大提高了开发效率和系统的可靠性。
在实际应用中,该系统可以处理高并发的购物请求,提供个性化的购物体验,同时通过智能化的客服系统提升用户满意度。通过完善的监控和告警机制,确保系统的稳定运行。
未来,可以通过增加更多的智能体、优化 AI 模型、扩展知识库等方式,进一步提升系统的智能化水平和用户体验。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)