构建一个完整的基于 Spring AI 框架的购物场景多智能体协作系统。这个系统将严格按照 "下单 - 支付 - 物流 - 交付 - 客服" 的业务流程,通过 5 个独立智能体的协同工作,实现全链路智能化、低耦合、高扩展的购物流程管理。

一、系统架构设计

1.1 整体架构概述

本系统采用分层架构设计,严格遵循 Spring 生态的设计原则,确保系统的可维护性和可扩展性。整体架构分为四个核心层次:

┌─────────────────┐

│ Controller层 │ 对外暴露REST接口,接收请求并触发流程

├─────────────────┤

│ Service层 │ 核心业务层:5个智能体+StateGraph流程编排

├─────────────────┤

│ Config层 │ 配置OpenAI/支付宝客户端,依赖注入

├─────────────────┤

│ DTO层 │ 定义上下文/请求模型,统一数据传递格式

└─────────────────┘

系统的核心设计理念是 **"分工协作,智能优化"**。每个智能体专注于单一业务领域的处理,通过标准化的接口进行通信和数据交换。这种设计不仅降低了系统的耦合度,还使得每个智能体可以独立开发、测试和维护。

1.2 多智能体架构模式

系统采用 **"1 个 Supervisor 智能体 + 5 个专业智能体"** 的架构模式(9)。Supervisor 作为核心调度节点,负责接收用户需求、分配任务、协调各智能体交互,并监控任务执行状态。5 个专业智能体分别对应购物流程的 5 个环节:商品智能体、购买智能体、支付智能体、订单智能体、物流智能体、客服智能体。

同时,系统还采用了 **"去中心化 + 中心化" 混合架构 **(12)。既保留各智能体的独立决策能力,又通过一个 "协同中枢" 统一接收、分发任务。这种设计使得系统在保持灵活性的同时,又能确保整体流程的可控性。

1.3 线程池架构设计

系统采用分层线程池架构,针对不同类型任务设计专用线程池,避免资源竞争和任务阻塞。核心线程池配置如下:

线程池名称

核心线程数

最大线程数

队列容量

适用场景

DEFAULT_TASK_EXECUTOR

100

200

1024

通用任务处理

taskExecutorService

100

200

100

工作流任务执行

nodeExecutorService

100

200

100

工作流节点执行

TOOL_TASK_EXECUTOR

40

50

50

工具调用任务

这种设计的优势在于:

资源隔离:工具调用任务(如 API 请求)与工作流执行任务分离,避免某类任务异常影响整体系统

弹性伸缩:通过动态线程数适应负载变化

上下文传递:所有线程池均通过 ThreadLocal 传递请求上下文

1.4 StateGraph 流程编排

系统采用 Spring AI Alibaba Graph 作为 Agent 编排背后的核心引擎(18)。StateGraph 的核心概念包括:

StateGraph(状态图):用于定义节点和边

Node(节点):封装具体操作或模型调用

Edge(边):表示节点间的跳转关系

OverAllState(全局状态):贯穿流程共享数据

StateGraph 采用 **"构建→编译→执行" 三阶段模式 **(22)

构建阶段:定义 StateGraph,注册节点和边

编译阶段:将 StateGraph 编译为 CompiledGraph,进行结构校验和优化

执行阶段:CompiledGraph 驱动状态流转,执行节点逻辑

1.5 低耦合高扩展设计原则

系统严格遵循高内聚低耦合的设计原则(40)。低耦合意味着:

模块之间依赖关系应该尽量少

模块之间的变更应该不会影响到其他模块

模块之间通过定义良好的接口进行通信

高扩展设计通过以下机制实现:

Skill 设计模式:每个智能体作为独立的 Skill 实现,支持动态加载和替换

接口抽象:通过统一的 Agent 接口屏蔽不同智能体的实现差异

事件驱动:智能体间通过事件机制通信,避免直接依赖

二、核心技术选型

系统采用的核心技术栈包括:

技术层

技术选型

版本

说明

基础框架

Spring Boot

3.2+

快速开发框架

AI 框架

Spring AI

1.0.0-M1

AI 能力集成

流程编排

Spring AI StateGraph

1.0.0-M1

智能体流程管理

AI 服务

OpenAI 原生 API

-

大语言模型服务

支付工具

支付宝 MCP

4.38.0.ALL

支付处理

知识库

RAG + 向量数据库

-

智能问答

三、智能体设计与实现

完整可运行的多智能体协作系统代码(含Skill调用)

一、核心接口定义

1.1 Agent 接口

import com.shop.multiagent.dto.ShopAgentContext;

/**

 * 智能体统一接口

 */

public interface Agent {

    ShopAgentContext execute(ShopAgentContext context);

}

1.2 Skill 接口

import java.util.Map;

/**

 * Skill统一接口(插件化调度核心)

 */

public interface Skill {

    /**

     * Skill名称(唯一标识)

     */

    String getName();

    /**

     * Skill描述

     */

    String getDescription();

    /**

     * 执行Skill(统一参数格式)

     * @param parameters 入参(标准化Map格式)

     * @return 执行结果

     */

    Object execute(Map parameters);

    /**

     * Skill元数据(版本、超时时间等)

     */

    Map<String, Object> getMetadata();

}

1.3 Skill 注解

import java.lang.annotation.*;

/**

 * Skill注解(用于标识智能体为Skill组件)

 */

@Target(ElementType.TYPE)

@Retention(RetentionPolicy.RUNTIME)

@Documented

public @interface Skill {

    /**

     * Skill名称(唯一)

     */

    String value();

    /**

     * Skill描述

     */

    String description() default "";

    /**

     * 版本号

     */

    String version() default "1.0.0";

}

二、数据传输对象(DTO)

2.1 ShopAgentContext(共享上下文)

import lombok.Data;

import java.util.HashMap;

import java.util.Map;

/**

 * 智能体共享上下文(传递订单号、状态等共享数据)

 */

@Data

public class ShopAgentContext {

    // 核心共享字段

    private String orderId;          // 订单ID(全局共享)

    private String productInfo;      // 商品信息

    private String userId;           // 用户ID

    private String tradeNo;          // 支付宝交易号

    // 各智能体输出结果

    private String orderInfo;        // 下单智能体输出(JSON格式)

    private boolean paymentSuccess;  // 支付智能体结果(成功/失败)

    private String logisticsInfo;    // 物流智能体结果(物流轨迹)

    private String deliveryInfo;     // 交付智能体输出(预计送达时间)

    private String customerServiceReply; // 客服智能体输出(应答内容)

    // 扩展字段(存储临时数据)

    private Map<String, Object> attributes = new HashMap

2.2 错误响应 DTO

import lombok.Data;

/**

 * 全局错误响应

 */

@Data

public class ErrorResponse {

    private String errorCode;

    private String errorMessage;

    private String details;

}

三、智能体实现(完整语法正确)

3.1 下单智能体(OrderAgentService)

import com.alibaba.fastjson.JSON;

import com.shop.multiagent.dto.ShopAgentContext;

import com.shop.multiagent.framework.Agent;

import com.shop.multiagent.framework.Skill;

import lombok.extern.slf4j.Slf4j;

import org.springframework.ai.openai.OpenAiCompletionsClient;

import org.springframework.stereotype.Service;

import reactor.core.publisher.Flux;

import java.time.Duration;

import java.util.HashMap;

import java.util.Map;

@Service

@Slf4j

@Skill(

    value = "order-agent",

    description = "生成订单信息的智能体(含商品明细、金额、收货地址等)",

    version = "1.0.0"

)

public class OrderAgentService implements Agent, Skill {

    private final OpenAiCompletionsClient completionsClient;

    // 构造器注入OpenAI客户端

    public OrderAgentService(OpenAiCompletionsClient completionsClient) {

        this.completionsClient = completionsClient;

    }

    // ====== Agent接口实现(核心业务逻辑)======

    @Override

    public ShopAgentContext execute(ShopAgentContext context) {

        // 1. 构造OpenAI提示词

        String prompt = String.format(

            "生成订单号【%s】的JSON格式下单信息,要求如下:" +

            "1. 用户ID:%s" +

            "2. 商品信息:%s" +

            "3. 必须包含字段:商品明细(名称、规格、单价、数量)、总金额(保留2位小数)、收货地址(省/市/区/详细地址)、下单时间(格式:yyyy-MM-dd HH:mm:ss)" +

            "4. 金额随机生成(合理范围:100-10000元)" +

            "5. 仅返回JSON,不包含其他内容",

            context.getOrderId(), context.getUserId(), context.getProductInfo()

        );

        // 2. 调用OpenAI流式接口

        FluxionsClient.stream(prompt);

        StringBuilder orderInfoBuilder = new StringBuilder();

        // 3. 订阅流式结果

        stream.subscribe(

            chunk -> orderInfoBuilder.append(chunk),  // 接收流数据

            e -> log.error("下单智能体OpenAI调用异常,订单号:{}", context.getOrderId(), e), // 异常处理

            () -> log.info("下单智能体OpenAI调用完成,订单号:{}", context.getOrderId()) // 流结束回调

        );

        // 4. 等待流结束(超时3秒)

        try {

            stream.blockLast(Duration.ofSeconds(3));

        } catch (Exception e) {

            log.error("下单智能体超时,订单号:{}", context.getOrderId(), e);

            throw new RuntimeException("订单生成超时", e);

        }

        // 5. 校验结果并设置到上下文

        String orderInfo = orderInfoBuilder.toString().trim();

        if (!isValidJson(orderInfo)) {

            throw new RuntimeException("订单信息格式错误,非JSON:" + orderInfo);

        }

        context.setOrderInfo(orderInfo);

        return context;

    }

    // ====== Skill接口实现(适配Skill框架)======

    @Override

    public String getName() {

        return this.getClass().getAnnotation(Skill.class).value();

    }

    @Override

    public String getDescription() {

        return this.getClass().getAnnotation(Skill.class).description();

    }

    @Override

    public Object execute(Map, Object> parameters) {

        // 将Skill标准化参数转换为上下文对象

        ShopAgentContext context = convertParamsToContext(parameters);

        // 复用Agent核心逻辑

        return execute(context);

    }

    @Override

    public Map getMetadata() {

        Map = new HashMap        metadata.put("version", this.getClass().getAnnotation(Skill.class).version());

        metadata.put("supportAsync", true);  // 支持异步执行

        metadata.put("timeout", 3000);       // 超时时间(3秒)

        metadata.put("type", "AI_GENERATION");// Skill类型(AI生成类)

        return metadata;

    }

    // ====== 辅助方法 ======

    /**

     * 校验字符串是否为合法JSON

     */

    private boolean isValidJson(String json) {

        try {

            JSON.parseObject(json);

            return true;

        } catch (Exception e) {

            return false;

        }

    }

    /**

     * 将Skill参数转换为上下文对象

     */

    private ShopAgentContext convertParamsToContext(Map parameters) {

        ShopAgentContext context = new ShopAgentContext();

        context.setOrderId((String) parameters.get("orderId"));

        context.setUserId((String) parameters.get("userId"));

        context.setProductInfo((String) parameters.get("productInfo"));

        context.setTradeNo((String) parameters.get("tradeNo"));

        // 兼容扩展字段

        if (parameters.containsKey("attributes") && parameters.get("attributes") instanceof Map) {

            context.setAttributes((Map parameters.get("attributes"));

        }

        // 校验必填参数

        validateRequiredParams(context);

        return context;

    }

    /**

     * 校验必填参数

     */

    private void validateRequiredParams(ShopAgentContext context) {

        if (context.getOrderId() == null) {

            throw new IllegalArgumentException("参数缺失:orderId");

        }

        if (context.getUserId() == null) {

            throw new IllegalArgumentException("参数缺失:userId");

        }

        if (context.getProductInfo() == null) {

            throw new IllegalArgumentException("参数缺失:productInfo");

        }

    }

}

3.2 支付智能体(PaymentAgentService)

import com.alibaba.fastjson.JSON;

import com.alipay.api.AlipayApiException;

import com.alipay.api.AlipayClient;

import com.alipay.api.request.AlipayTradeQueryRequest;

import com.alipay.api.response.AlipayTradeQueryResponse;

import com.shop.multiagent.dto.ShopAgentContext;

import com.shop.multiagent.framework.Agent;

import com.shop.multiagent.framework.Skill;

import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Service;

import java.util.HashMap;

import java.util.Map;

@Service

@Slf4j

@Skill(

    value = "payment-agent",

    description = "支付宝支付状态校验智能体",

    version = "1.0.0"

)

public class PaymentAgentService implements Agent, Skill {

    private final AlipayClient alipayClient;

    // 构造器注入支付宝客户端

    public PaymentAgentService(AlipayClient alipayClient) {

        this.alipayClient = alipayClient;

    }

    // ====== Agent接口实现(核心业务逻辑)======

    @Override

    public ShopAgentContext execute(ShopAgentContext context) {

        log.info("支付智能体执行,订单号:{},交易号:{}", context.getOrderId(), context.getTradeNo());

        // 1. 校验必填参数

        if (context.getTradeNo() == null) {

            throw new IllegalArgumentException("支付智能体参数缺失:tradeNo");

        }

        // 2. 构造支付宝查询请求

        AlipayTradeQueryRequest request = new AlipayTradeQueryRequest();

        Map> bizContent = new HashMap<>();

        bizContent.put("out_trade_no", context.getOrderId()); // 商户订单号

        bizContent.put("trade_no", context.getTradeNo());     // 支付宝交易号

        request.setBizContent(JSON.toJSONString(bizContent));

        try {

            // 3. 调用支付宝MCP接口

            AlipayTradeQueryResponse response = alipayClient.execute(request);

            // 4. 解析响应结果

            if (response.isSuccess()) {

                String tradeStatus = response.getTradeStatus();

                log.info("支付宝查询成功,订单号:{},交易状态:{}", context.getOrderId(), tradeStatus);

                // 5. 判断支付是否成功(支付宝交易状态:TRADE_SUCCESS/TRADE_FINISHED)

                boolean paymentSuccess = "TRADE_SUCCESS".equals(tradeStatus) || "TRADE_FINISHED".equals(tradeStatus);

                context.setPaymentSuccess(paymentSuccess);

                // 6. 存储支付详情到扩展字段

                Map> paymentDetails = new HashMap<>();

                paymentDetails.put("tradeStatus", tradeStatus);

                paymentDetails.put("totalAmount", response.getTotalAmount());

                paymentDetails.put("payTime", response.getSendPayDate());

                context.getAttributes().put("paymentDetails", paymentDetails);

            } else {

                log.error("支付宝查询失败,订单号:{},错误码:{},错误信息:{}",

                    context.getOrderId(), response.getCode(), response.getMsg());

                context.setPaymentSuccess(false);

                context.getAttributes().put("paymentError", response.getMsg());

            }

        } catch (AlipayApiException e) {

            log.error("支付宝接口调用异常,订单号:{}", context.getOrderId(), e);

            context.setPaymentSuccess(false);

            context.getAttributes().put("paymentError", e.getErrMsg());

            throw new RuntimeException("支付校验失败", e);

        }

        return context;

    }

    // ====== Skill接口实现(适配Skill框架)======

    @Override

    public String getName() {

        return this.getClass().getAnnotation(Skill.class).value();

    }

    @Override

    public String getDescription() {

        return this.getClass().getAnnotation(Skill.class).description();

    }

    @Override

    public Object execute(Map parameters) {

        ShopAgentContext context = convertParamsToContext(parameters);

        return execute(context);

    }

    @Override

    public Map Object> getMetadata() {

        Map> metadata = new HashMap();

        metadata.put("version", this.getClass().getAnnotation(Skill.class).version());

        metadata.put("supportAsync", false); // 支付操作需同步执行

        metadata.put("timeout", 5000);       // 超时时间(5秒)

        metadata.put("type", "PAYMENT");     // Skill类型(支付类)

        return metadata;

    }

    // ====== 辅助方法 ======

    private ShopAgentContext convertParamsToContext(Map, Object> parameters) {

        ShopAgentContext context = new ShopAgentContext();

        context.setOrderId((String) parameters.get("orderId"));

        context.setTradeNo((String) parameters.get("tradeNo"));

        context.setOrderInfo((String) parameters.get("orderInfo"));

        // 校验必填参数

        if (context.getOrderId() == null) {

            throw new IllegalArgumentException("参数缺失:orderId");

        }

        if (context.getTradeNo() == null) {

            throw new IllegalArgumentException("参数缺失:tradeNo");

        }

        return context;

    }

}

3.3 物流智能体(LogisticsAgentService)

import com.shop.multiagent.dto.ShopAgentContext;

import com.shop.multiagent.framework.Agent;

import com.shop.multiagent.framework.Skill;

import lombok.extern.slf4j.Slf4j;

import org.springframework.ai.openai.OpenAiChatClient;

import org.springframework.stereotype.Service;

import java.util.*;

@Service

@Slf4j

@Skill(

    value = "logistics-agent",

    description = "物流轨迹查询与格式化智能体",

    version = "1.0.0"

)

public class LogisticsAgentService implements Agent, Skill {

    private final OpenAiChatClient chatClient;

    // 构造器注入OpenAI Chat客户端

    public LogisticsAgentService(OpenAiChatClient chatClient) {

        this.chatClient = chatClient;

    }

    // 模拟物流快递公司列表

    private static final List EXPRESS_COMPANIES = Arrays.asList("顺丰速运", "京东物流", "中通快递", "圆通快递", "韵达快递");

    // ====== Agent接口实现(核心业务逻辑)======

    @Override

    public ShopAgentContext execute(ShopAgentContext context) {

        log.info("物流智能体执行,订单号:{}", context.getOrderId());

        // 1. 支付失败直接返回

        if (!context.isPaymentSuccess()) {

            log.warn("订单{}支付未成功,跳过物流查询", context.getOrderId());

            context.setLogisticsInfo("支付未完成,暂无法查询物流信息");

            return context;

        }

        try {

            // 2. 模拟调用物流接口(真实场景替换为支付宝物流API或第三方物流接口)

            String logisticsRawData = mockLogisticsQuery(context.getOrderId());

            log.info("订单{}物流原始数据:{}", context.getOrderId(), logisticsRawData);

            // 3. 调用OpenAI格式化物流信息(用户友好型输出)

            String formattedLogistics = formatLogisticsInfo(logisticsRawData);

            context.setLogisticsInfo(formattedLogistics);

            // 4. 存储物流详情到扩展字段

            context.getAttributes().put("logisticsRawData", logisticsRawData);

        } catch (Exception e) {

            log.error("物流智能体执行异常,订单号:{}", context.getOrderId(), e);

            context.setLogisticsInfo("物流信息查询失败,请稍后重试");

            context.getAttributes().put("logisticsError", e.getMessage());

        }

        return context;

    }

    // ====== Skill接口实现(适配Skill框架)======

    @Override

    public String getName() {

        return this.getClass().getAnnotation(Skill.class).value();

    }

    @Override

    public String getDescription() {

        return this.getClass().getAnnotation(Skill.class).description();

    }

    @Override

    public Object execute(Map> parameters) {

        ShopAgentContext context = convertParamsToContext(parameters);

        return execute(context);

    }

    @Override

    public Map, Object> getMetadata() {

        Map Object> metadata = new HashMap<>();

        metadata.put("version", this.getClass().getAnnotation(Skill.class).version());

        metadata.put("supportAsync", true);  // 支持异步执行

        metadata.put("timeout", 4000);       // 超时时间(4秒)

        metadata.put("type", "LOGISTICS");   // Skill类型(物流类)

        return metadata;

    }

    // ====== 辅助方法 ======

    /**

     * 模拟物流查询接口返回

     */

    private String mockLogisticsQuery(String orderId) {

        // 随机选择快递公司

        String expressCompany = EXPRESS_COMPANIES.get(new Random().nextInt(EXPRESS_COMPANIES.size()));

        // 生成随机运单号

        String trackingNumber = "YT" + System.currentTimeMillis() + new Random().nextInt(1000);

        // 模拟物流轨迹(3-5个节点)

        List<Map String>> tracks = new ArrayList<>();

        int trackCount = 3 + new Random().nextInt(3);

        for (int i = 0; i  {

            Map String> track = new HashMap<>();

            String time = generateRandomTime(i);

            String status = getTrackStatus(i, trackCount);

            track.put("time", time);

            track.put("status", status);

            tracks.add(track);

        }

        // 构造原始数据JSON

        Map Object> rawData = new HashMapData.put("expressCompany", expressCompany);

        rawData.put("trackingNumber", trackingNumber);

        rawData.put("tracks", tracks);

        rawData.put("queryTime", new Date().toString());

        return com.alibaba.fastjson.JSON.toJSONString(rawData);

    }

    /**

     * 生成随机时间(模拟物流节点时间顺序)

     */

    private String generateRandomTime(int index) {

        Calendar calendar = Calendar.getInstance();

        calendar.add(Calendar.HOUR, - (index * 2 + new Random().nextInt(3)));

        return String.format("%tF %tT", calendar, calendar);

    }

    /**

     * 获取物流节点状态

     */

    private String getTrackStatus(int index, int total) {

        if (index == 0) {

            return "订单已创建,等待揽收";

        } else if (index == 1) {

            return "快递员已揽收,正在前往分拣中心";

        } else if (index  1) {

            return "包裹已到达" + getRandomCity() + "分拣中心,正在分拣";

        } else {

            return "包裹已发出,预计" + (1 + new Random().nextInt(3)) + "天后送达";

        }

    }

    /**

     * 生成随机城市(模拟物流中转)

     */

    private String getRandomCity() {

        List = Arrays.asList("北京", "上海", "广州", "深圳", "杭州", "成都", "武汉");

        return cities.get(new Random().nextInt(cities.size()));

    }

    /**

     * 调用OpenAI格式化物流信息

     */

    private String formatLogisticsInfo(String rawData) {

        String prompt = String.format(

            "以下是物流原始数据(JSON格式),请将其转换为用户友好的自然语言描述:" +

            "1. 先说明快递公司和运单号" +

            "2. 按时间顺序列出物流轨迹(最新状态在前)" +

            "3. 语言简洁明了,避免技术术语" +

            "4. 最后给出预计送达时间" +

            "原始数据:%s",

            rawData

        );

        return chatClient.call(prompt);

    }

    /**

     * 参数转换

     */

    private ShopAgentContext convertParamsToContext(Map parameters) {

        ShopAgentContext context = new ShopAgentContext();

        context.setOrderId((String) parameters.get("orderId"));

        context.setPaymentSuccess((Boolean) parameters.getOrDefault("paymentSuccess", false));

        context.setOrderInfo((String) parameters.get("orderInfo"));

        if (context.getOrderId() == null) {

            throw new IllegalArgumentException("参数缺失:orderId");

        }

        return context;

    }

}

3.4 交付智能体(DeliveryAgentService)

import com.shop.multiagent.dto.ShopAgentContext;

import com.shop.multiagent.framework.Agent;

import com.shop.multiagent.framework.Skill;

import lombok.extern.slf4j.Slf4j;

import org.springframework.ai.openai.OpenAiCompletionsClient;

import org.springframework.stereotype.Service;

import reactor.core.publisher.Flux;

import java.time.Duration;

import java.util.HashMap;

import java.util.Map;

@Service

@Slf4j

@Skill(

    value = "delivery-agent",

    description = "交付预估与签收提醒智能体",

    version = "1.0.0"

)

public class DeliveryAgentService implements Agent, Skill {

    private final OpenAiCompletionsClient completionsClient;

    public DeliveryAgentService(OpenAiCompletionsClient completionsClient) {

        this.completionsClient = completionsClient;

    }

    // ====== Agent接口实现(核心业务逻辑)======

    @Override

    public ShopAgentContext execute(ShopAgentContext context) {

        log.info("交付智能体执行,订单号:{}", context.getOrderId());

        // 1. 支付失败直接返回

        if (!context.isPaymentSuccess()) {

            context.setDeliveryInfo("支付未完成,无法生成交付信息");

            return context;

        }

        // 2. 校验物流信息

        if (context.getLogisticsInfo() == null || context.getLogisticsInfo().contains("失败")) {

            context.setDeliveryInfo("物流信息异常,暂无法预估交付时间");

            return context;

        }

        try {

            // 3. 调用OpenAI生成交付预估信息

            String prompt = String.format(

                "基于以下信息生成交付提醒:" +

                "1. 订单号:%s" +

                "2. 物流信息:%s" +

                "3. 要求:" +

                "   - 明确预计送达时间(精确到日期)" +

                "   - 给出签收注意事项(如:请保持电话畅通、支持代收等)" +

                "   - 说明异常处理方式(如:未收到货如何联系客服)" +

                "   - 语言友好,简洁实用",

                context.getOrderId(), context.getLogisticsInfo()

            );

            Flux completionsClient.stream(prompt);

            StringBuilder deliveryInfoBuilder = new StringBuilder();

            stream.subscribe(

                chunk -> deliveryInfoBuilder.append(chunk),

                e -> log.error("交付智能体OpenAI调用异常,订单号:{}", context.getOrderId(), e),

                () -> log.info("交付智能体OpenAI调用完成,订单号:{}", context.getOrderId())

            );

            stream.blockLast(Duration.ofSeconds(3));

            String deliveryInfo = deliveryInfoBuilder.toString().trim();

            context.setDeliveryInfo(deliveryInfo);

            // 4. 存储交付详情到扩展字段

            context.getAttributes().put("deliveryGeneratedTime", System.currentTimeMillis());

        } catch (Exception e) {

            log.error("交付智能体执行异常,订单号:{}", context.getOrderId(), e);

            context.setDeliveryInfo("交付信息生成失败,请稍后查询");

        }

        return context;

    }

    // ====== Skill接口实现 ======

    @Override

    public String getName() {

        return this.getClass().getAnnotation(Skill.class).value();

    }

    @Override

    public String getDescription() {

        return this.getClass().getAnnotation(Skill.class).description();

    }

    @Override

    public Object execute(Map Object> parameters) {

        ShopAgentContext context = convertParamsToContext(parameters);

        return execute(context);

    }

    @Override

    public Map<String, Object> getMetadata() {

        Map, Object> metadata = new HashMap.put("version", this.getClass().getAnnotation(Skill.class).version());

        metadata.put("supportAsync", true);

        metadata.put("timeout", 3000);

        metadata.put("type", "DELIVERY");

        return metadata;

    }

    // ====== 辅助方法 ======

    private ShopAgentContext convertParamsToContext(Map<String, Object> parameters) {

        ShopAgentContext context = new ShopAgentContext();

        context.setOrderId((String) parameters.get("orderId"));

        context.setPaymentSuccess((Boolean) parameters.getOrDefault("paymentSuccess", false));

        context.setLogisticsInfo((String) parameters.get("logisticsInfo"));

        if (context.getOrderId() == null) {

            throw new IllegalArgumentException("参数缺失:orderId");

        }

        return context;

    }

}

3.5 客服智能体(CustomerServiceAgentService)

import com.shop.multiagent.dto.ShopAgentContext;

import com.shop.multiagent.framework.Agent;

import com.shop.multiagent.framework.Skill;

import lombok.extern.slf4j.Slf4j;

import org.springframework.ai.openai.OpenAiChatClient;

import org.springframework.ai.openai.OpenAiEmbeddingClient;

import org.springframework.stereotype.Service;

import org.springframework.web.client.RestTemplate;

import java.util.*;

@Service

@Slf4j

@Skill(

    value = "customer-service-agent",

    description = "基于RAG知识库的客服应答智能体",

    version = "1.0.0"

)

public class CustomerServiceAgentService implements Agent, Skill {

    private final OpenAiEmbeddingClient embeddingClient;

    private final OpenAiChatClient chatClient;

    private final RestTemplate restTemplate;

    private final String RAG_API_URL = "http://localhost:8080/api/rag/search"; // RAG知识库接口

    public CustomerServiceAgentService(OpenAiEmbeddingClient embeddingClient, OpenAiChatClient chatClient) {

        this.embeddingClient = embeddingClient;

        this.chatClient = chatClient;

        this.restTemplate = new RestTemplate();

    }

    // ====== Agent接口实现(核心业务逻辑)======

    @Override

    public ShopAgentContext execute(ShopAgentContext context) {

        log.info("客服智能体执行,订单号:{}", context.getOrderId());

        try {

            // 1. 构造用户咨询问题(基于订单全链路信息)

            String userQuestion = String.format(

                "用户咨询订单%s的相关问题,订单信息:%s,支付状态:%s,物流信息:%s,交付信息:%s",

                context.getOrderId(),

                context.getOrderInfo(),

                context.isPaymentSuccess() ? "已支付" : "未支付",

                context.getLogisticsInfo(),

                context.getDeliveryInfo()

            );

            log.info("客服智能体用户问题:{}", userQuestion);

            // 2. 生成问题向量(用于RAG知识库检索)

            float[] questionEmbedding = embeddingClient.embed(userQuestion).getEmbedding();

            // 3. 调用RAG知识库检索相关文档

            Map Object> ragRequest = new HashMapRequest.put("orderId", context.getOrderId());

            ragRequest.put("embedding", questionEmbedding);

            ragRequest.put("topK", 3); // 取Top3相关文档

            Map Object> ragResponse = restTemplate.postForObject(RAG_API_URL, ragRequest, Map.class);

            String knowledgeContext = extractKnowledgeContext(ragResponse);

            log.info("订单{}RAG检索结果:{}", context.getOrderId(), knowledgeContext);

            // 4. 调用OpenAI生成客服应答

            String prompt = String.format(

                "作为电商平台客服,基于以下知识库和订单信息回复用户:" +

                "1. 知识库内容:%s" +

                "2. 订单信息:%s" +

                "3. 回复要求:" +

                "   - 语气友好、专业" +

                "   - 直接回答用户问题,不冗余" +

                "   - 若有异常情况,提供解决方案" +

                "   - 结尾引导用户有疑问随时联系",

                knowledgeContext, userQuestion

            );

            String csReply = chatClient.call(prompt);

            context.setCustomerServiceReply(csReply);

            // 5. 存储客服应答详情

            context.getAttributes().put("csReplyGeneratedTime", System.currentTimeMillis());

            context.getAttributes().put("ragSearchCount", ragResponse != null ? ragResponse.get("count") : 0);

        } catch (Exception e) {

            log.error("客服智能体执行异常,订单号:{}", context.getOrderId(), e);

            context.setCustomerServiceReply("非常抱歉,当前客服系统繁忙,请稍后再试(订单号:" + context.getOrderId() + ")");

        }

        return context;

    }

    // ====== Skill接口实现 ======

    @Override

    public String getName() {

        return this.getClass().getAnnotation(Skill.class).value();

    }

    @Override

    public String getDescription() {

        return this.getClass().getAnnotation(Skill.class).description();

    }

    @Override

    public Object execute(Map Object> parameters) {

        ShopAgentContext context = convertParamsToContext(parameters);

        return execute(context);

    }

    @Override

    public Map<String, Object> getMetadata() {

        Map, Object> metadata = new HashMap.put("version", this.getClass().getAnnotation(Skill.class).version());

        metadata.put("supportAsync", true);

        metadata.put("timeout", 6000); // RAG+AI生成超时时间较长(6秒)

        metadata.put("type", "CUSTOMER_SERVICE");

        return metadata;

    }

    // ====== 辅助方法 ======

    /**

     * 提取RAG知识库返回的上下文

     */

    private String extractKnowledgeContext(Map, Object> ragResponse) {

        if (ragResponse == null || !ragResponse.containsKey("documents")) {

            return "知识库暂无相关信息";

        }

        List<Map, String>> documents = (List<Map>>) ragResponse.get("documents");

        StringBuilder contextBuilder = new StringBuilder();

        for (Map String> doc : documents) {

            contextBuilder.append("- ").append(doc.get("content")).append("\n");

        }

        return contextBuilder.toString().trim();

    }

    /**

     * 参数转换

     */

    private ShopAgentContext convertParamsToContext(Map) {

        ShopAgentContext context = new ShopAgentContext();

        context.setOrderId((String) parameters.get("orderId"));

        context.setOrderInfo((String) parameters.get("orderInfo"));

        context.setPaymentSuccess((Boolean) parameters.getOrDefault("paymentSuccess", false));

        context.setLogisticsInfo((String) parameters.get("logisticsInfo"));

        context.setDeliveryInfo((String) parameters.get("deliveryInfo"));

        if (context.getOrderId() == null) {

            throw new IllegalArgumentException("参数缺失:orderId");

        }

        return context;

    }

}

四、Skill 框架核心组件

4.1 Skill 工厂(SkillFactory)

import org.springframework.context.ApplicationContext;

import org.springframework.stereotype.Component;

import java.util.HashMap;

import java.util.Map;

import java.util.Collection;

/**

 * Skill工厂(动态加载、管理所有Skill组件)

 */

@Component

public class SkillFactory {

    // 存储Skill:key=Skill名称,value=Skill实例

    private final MapMap = new HashMap

    /**

     * 构造器注入Spring上下文,自动扫描所有@Skill注解的Bean

     */

    public SkillFactory(ApplicationContext applicationContext) {

        // 扫描所有标注@Skill注解的Bean

        Map skillBeans = applicationContext.getBeansWithAnnotation(com.shop.multiagent.framework.Skill.class);

        // 初始化SkillMap

        for (Map.Entry, Object> entry : skillBeans.entrySet()) {

            Object bean = entry.getValue();

            // 校验Bean是否实现Skill接口

            if (bean instanceof Skill) {

                Skill skill = (Skill) bean;

                com.shop.multiagent.framework.Skill skillAnnotation = bean.getClass().getAnnotation(com.shop.multiagent.framework.Skill.class);

                String skillName = skillAnnotation.value();

                // 校验Skill名称唯一性

                if (skillMap.containsKey(skillName)) {

                    throw new IllegalStateException("Skill名称重复:" + skillName);

                }

                skillMap.put(skillName, skill);

                log.info("Skill工厂加载成功:{}(描述:{},版本:{})",

                    skillName, skillAnnotation.description(), skillAnnotation.version());

            } else {

                log.warn("Bean {} 标注了@Skill注解,但未实现Skill接口,跳过加载", entry.getKey());

            }

        }

        log.info("Skill工厂初始化完成,共加载{}个Skill", skillMap.size());

    }

    /**

     * 根据名称获取Skill实例

     */

    public Skill getSkill(String skillName) {

        if (skillName == null) {

            throw new IllegalArgumentException("Skill名称不能为空");

        }

        return skillMap.get(skillName);

    }

    /**

     * 检查Skill是否存在

     */

    public boolean hasSkill(String skillName) {

        return skillMap.containsKey(skillName);

    }

    /**

     * 获取所有Skill名称

     */

    public Collection> getSkillNames() {

        return skillMap.keySet();

    }

    /**

     * 获取所有Skill实例

     */

    public Collection> getAllSkills() {

        return skillMap.values();

    }

}

4.2 线程池选择器(SkillThreadPoolSelector)

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import org.springframework.stereotype.Component;

import java.util.Map;

import java.util.concurrent.Executor;

import java.util.concurrent.Executors;

/**

 * 根据Skill元数据动态选择线程池

 */

@Component

public class SkillThreadPoolSelector {

    // 通用线程池(处理异步非工具类任务)

    private final ThreadPoolTaskExecutor defaultExecutor;

    // 工具类线程池(处理支付、物流等第三方接口调用)

    private final ThreadPoolTaskExecutor toolExecutor;

    @Autowired

    public SkillThreadPoolSelector(

            @Qualifier("defaultTaskExecutor") ThreadPoolTaskExecutor defaultExecutor,

            @Qualifier("toolTaskExecutor") ThreadPoolTaskExecutor toolExecutor) {

        this.defaultExecutor = defaultExecutor;

        this.toolExecutor = toolExecutor;

    }

    /**

     * 根据Skill选择合适的线程池

     */

    public Executor selectExecutor(Skill skill) {

        if (skill == null) {

            throw new IllegalArgumentException("Skill不能为空");

        }

        Map> metadata = skill.getMetadata();

        String skillName = skill.getName();

        String skillType = (String) metadata.getOrDefault("type", "DEFAULT");

        // 1. 支付、物流类Skill使用工具线程池(第三方接口调用)

        if ("PAYMENT".equals(skillType) || "LOGISTICS".equals(skillType)) {

            log.debug("Skill {} 选择工具线程池(toolTaskExecutor)", skillName);

            return toolExecutor;

        }

        // 2. 支持异步的Skill使用通用线程池

        if (Boolean.TRUE.equals(metadata.getOrDefault("supportAsync", false))) {

            log.debug("Skill {} 选择通用线程池(defaultTaskExecutor)", skillName);

            return defaultExecutor;

        }

        // 3. 其他情况使用单线程池(同步执行)

        log.debug("Skill {} 选择同步线程池(SingleThreadExecutor)", skillName);

        return Executors.newSingleThreadExecutor();

    }

}

五、流程编排(StateGraph)

5.1 工作流服务(ShopWorkflowService)

import com.shop.multiagent.dto.ShopAgentContext;

import com.shop.multiagent.framework.Skill;

import com.shop.multiagent.framework.SkillFactory;

import com.shop.multiagent.framework.SkillThreadPoolSelector;

import lombok.extern.slf4j.Slf4j;

import org.springframework.ai.stateflow.StateGraph;

import org.springframework.stereotype.Service;

import java.util.HashMap;

import java.util.Map;

import java.util.concurrent.CompletableFuture;

import java.util.concurrent.Executor;

/**

 * 购物流程编排服务(基于StateGraph)

 */

@Service

@Slf4j

public class ShopWorkflowService {

    private final SkillFactory skillFactory;

    private final SkillThreadPoolSelector threadPoolSelector;

    @Autowired

    public ShopWorkflowService(SkillFactory skillFactory, SkillThreadPoolSelector threadPoolSelector) {

        this.skillFactory = skillFactory;

        this.threadPoolSelector = threadPoolSelector;

    }

    /**

     * 构建购物流程StateGraph

     */

    public StateGraph> buildWorkflow() {

        return StateGraph.AgentContext>builder()

            // 1. 下单节点(调用order-agent Skill)

            .step("order", context -> {

                log.info("流程执行:下单节点,订单号:{}", context.getOrderId());

                return executeSkill("order-agent", context);

            })

            // 2. 支付节点(调用payment-agent Skill)

            .step("payment", context -> {

                log.info("流程执行:支付节点,订单号:{}", context.getOrderId());

                return executeSkill("payment-agent", context);

            })

            // 3. 物流节点(调用logistics-agent Skill)

            .step("logistics", context -> {

                log.info("流程执行:物流节点,订单号:{}", context.getOrderId());

                return executeSkill("logistics-agent", context);

            })

            // 4. 交付节点(调用delivery-agent Skill)

            .step("delivery", context -> {

                log.info("流程执行:交付节点,订单号:{}", context.getOrderId());

                return executeSkill("delivery-agent", context);

            })

            // 5. 客服节点(调用customer-service-agent Skill)

            .step("customerService", context -> {

                log.info("流程执行:客服节点,订单号:{}", context.getOrderId());

                return executeSkill("customer-service-agent", context);

            })

            // 流程顺序定义

            .edge("order", "payment")        // 下单 → 支付

            .edge("payment", "logistics")    // 支付 → 物流

            .edge("logistics", "delivery")   // 物流 → 交付

            .edge("delivery", "customerService") // 交付 → 客服

            // 起始节点

            .start("order")

            // 错误处理(每个节点单独配置错误处理器)

            .onError("order", this::handleOrderError)

            .onError("payment", this::handlePaymentError)

            .onError("logistics", this::handleLogisticsError)

            .onError("delivery", this::handleDeliveryError)

            .onError("customerService", this::handleCustomerServiceError)

            // 构建StateGraph

            .build();

    }

    /**

     * 执行Skill(统一调度逻辑)

     */

    private ShopAgentContext executeSkill(String skillName, ShopAgentContext context) {

        // 1. 获取Skill实例

        Skill skill = skillFactory.getSkill(skillName);

        if (skill == null) {

            throw new RuntimeException("未找到Skill:" + skillName);

        }

        // 2. 转换上下文为Skill参数

        Map> params = convertContextToParams(context);

        // 3. 选择线程池

        Executor executor = threadPoolSelector.selectExecutor(skill);

        // 4. 异步执行Skill(根据Skill元数据决定是否异步)

        CompletableFutureAgentContext> future = CompletableFuture.supplyAsync(

            () -> {

                log.debug("Skill {} 开始执行,订单号:{}", skillName, context.getOrderId());

                try {

                    // 执行Skill并转换结果

                    Object result = skill.execute(params);

                    if (!(result instanceof ShopAgentContext)) {

                        throw new RuntimeException("Skill " + skillName + " 执行结果类型错误,应为ShopAgentContext");

                    }

                    return (ShopAgentContext) result;

                } catch (Exception e) {

                    log.error("Skill {} 执行异常,订单号:{}", skillName, context.getOrderId(), e);

                    throw e;

                }

            },

            executor

        );

        // 5. 等待执行结果(同步阻塞,确保流程顺序执行)

        return future.join();

    }

    /**

     * 上下文转换为Skill参数

     */

    private MapContextToParams(ShopAgentContext context) {

        Map = new HashMap        params.put("orderId", context.getOrderId());

        params.put("userId", context.getUserId());

        params.put("productInfo", context.getProductInfo());

        params.put("tradeNo", context.getTradeNo());

        params.put("orderInfo", context.getOrderInfo());

        params.put("paymentSuccess", context.isPaymentSuccess());

        params.put("logisticsInfo", context.getLogisticsInfo());

        params.put("deliveryInfo", context.getDeliveryInfo());

        params.put("attributes", context.getAttributes());

        return params;

    }

    /**

     * 节点错误处理器(下单节点)

     */

    private void handleOrderError(ShopAgentContext context, Throwable throwable) {

        log.error("下单节点执行失败,订单号:{}", context.getOrderId(), throwable);

        context.getAttributes().put("errorNode", "order");

        context.getAttributes().put("errorMessage", throwable.getMessage());

        // 下单失败直接终止流程

        throw new RuntimeException("下单失败:" + throwable.getMessage(), throwable);

    }

    /**

     * 支付节点错误处理器

     */

    private void handlePaymentError(ShopAgentContext context, Throwable throwable) {

        log.error("支付节点执行失败,订单号:{}", context.getOrderId(), throwable);

        context.setPaymentSuccess(false);

        context.getAttributes().put("errorNode", "payment");

        context.getAttributes().put("errorMessage", throwable.getMessage());

        // 支付失败仍继续流程(后续节点会处理未支付状态)

    }

    /**

     * 物流节点错误处理器

     */

    private void handleLogisticsError(ShopAgentContext context, Throwable throwable) {

        log.error("物流节点执行失败,订单号:{}", context.getOrderId(), throwable);

        context.setLogisticsInfo("物流查询失败:" + throwable.getMessage());

        context.getAttributes().put("errorNode", "logistics");

        context.getAttributes().put("errorMessage", throwable.getMessage());

    }

    /**

     * 交付节点错误处理器

     */

    private void handleDeliveryError(ShopAgentContext context, Throwable throwable) {

        log.error("交付节点执行失败,订单号:{}", context.getOrderId(), throwable);

        context.setDeliveryInfo("交付信息生成失败:" + throwable.getMessage());

        context.getAttributes().put("errorNode", "delivery");

        context.getAttributes().put("errorMessage", throwable.getMessage());

    }

    /**

     * 客服节点错误处理器

     */

    private void handleCustomerServiceError(ShopAgentContext context, Throwable throwable) {

        log.error("客服节点执行失败,订单号:{}", context.getOrderId(), throwable);

        context.setCustomerServiceReply("客服应答生成失败:" + throwable.getMessage());

        context.getAttributes().put("errorNode", "customerService");

        context.getAttributes().put("errorMessage", throwable.getMessage());

    }

    /**

     * 执行完整购物流程

     */

    public ShopAgentContext runWorkflow(ShopAgentContext initialContext) {

        log.info("开始执行购物流程,订单号:{}", initialContext.getOrderId());

        // 校验初始参数

        validateInitialContext(initialContext);

        // 构建并执行StateGraph

        StateGraph> stateGraph = buildWorkflow();

        ShopAgentContext result = stateGraph.execute(initialContext);

        log.info("购物流程执行完成,订单号:{}", result.getOrderId());

        return result;

    }

    /**

     * 校验初始上下文参数

     */

    private void validateInitialContext(ShopAgentContext context) {

        if (context.getOrderId() == null) {

            throw new IllegalArgumentException("初始参数缺失:orderId");

        }

        if (context.getUserId() == null) {

            throw new IllegalArgumentException("初始参数缺失:userId");

        }

        if (context.getProductInfo() == null) {

            throw new IllegalArgumentException("初始参数缺失:productInfo");

        }

        if (context.getTradeNo() == null) {

            throw new IllegalArgumentException("初始参数缺失:tradeNo");

        }

    }

}

六、配置类(OpenAI、支付宝、线程池)

6.1 线程池配置(ThreadPoolConfig)

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**

 * 线程池配置

 */

@Configuration

public class ThreadPoolConfig {

    /**

     * 通用任务线程池(处理异步非工具类任务)

     */

    @Bean("defaultTaskExecutor")

    public ThreadPoolTaskExecutor defaultTaskExecutor() {

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        executor.setCorePoolSize(10);          // 核心线程数

        executor.setMaxPoolSize(20);           // 最大线程数

        executor.setQueueCapacity(100);         // 队列容量

        executor.setThreadNamePrefix("default-task-"); // 线程名称前缀

        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略(调用者执行)

        executor.setKeepAliveSeconds(60);       // 空闲线程存活时间

        executor.initialize();

        return executor;

    }

    /**

     * 工具类任务线程池(处理支付、物流等第三方接口调用)

     */

    @Bean("toolTaskExecutor")

    public ThreadPoolTaskExecutor toolTaskExecutor() {

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        executor.setCorePoolSize(8);           // 核心线程数(工具调用并发不宜过高)

        executor.setMaxPoolSize(15);           // 最大线程数

        executor.setQueueCapacity(50);          // 队列容量

        executor.setThreadNamePrefix("tool-task-"); // 线程名称前缀

        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // 拒绝策略(直接抛出异常)

        executor.setKeepAliveSeconds(30);       // 空闲线程存活时间

        executor.initialize();

        return executor;

    }

}

6.2 OpenAI 配置(OpenAiConfig)

import org.springframework.ai.openai.OpenAiApi;

import org.springframework.ai.openai.OpenAiChatClient;

import org.springframework.ai.openai.OpenAiCompletionsClient;

import org.springframework.ai.openai.OpenAiEmbeddingClient;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**

 * OpenAI客户端配置

 */

@Configuration

public class OpenAiConfig {

    @Value("${spring.ai.openai.api-key}")

    private String apiKey;

    @Value("${spring.ai.openai.api-endpoint:https://api.openai.com/v1}")

    private String apiEndpoint;

    /**

     * OpenAI API客户端

     */

    @Bean

    public OpenAiApi openAiApi() {

        return new OpenAiApi(apiEndpoint, apiKey);

    }

    /**

     * 文本补全客户端(用于流式生成)

     */

    @Bean

    public OpenAiCompletionsClient openAiCompletionsClient() {

        return new OpenAiCompletionsClient(openAiApi());

    }

    /**

     * 聊天客户端(用于格式化、客服应答)

     */

    @Bean

    public OpenAiChatClient openAiChatClient() {

        return new OpenAiChatClient(openAiApi());

    }

    /**

     * 嵌入客户端(用于RAG知识库检索)

     */

    @Bean

    public OpenAiEmbeddingClient openAiEmbeddingClient() {

        return new OpenAiEmbeddingClient(openAiApi());

    }

}

6.3 支付宝配置(AlipayConfig)

import com.alipay.api.AlipayClient;

import com.alipay.api.DefaultAlipayClient;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**

 * 支付宝MCP客户端配置

 */

@Configuration

public class AlipayConfig {

    @Value("${alipay.app-id}")

    private String appId;

    @Value("${alipay.private-key}")

    private String privateKey; // 商户私钥

    @Value("${alipay.alipay-public-key}")

    private String alipayPublicKey; // 支付宝公钥

    @Value("${alipay.gateway-url:https://openapi.alipay.com/gateway.do}")

    private String gatewayUrl;

    @Value("${alipay.charset:UTF-8}")

    private String charset;

    @Value("${alipay.sign-type:RSA2}")

    private String signType;

    /**

     * 支付宝客户端实例

     */

    @Bean

    public AlipayClient alipayClient() {

        return new DefaultAlipayClient(

            gatewayUrl,

            appId,

            privateKey,

            "json",

            charset,

            alipayPublicKey,

            signType

        );

    }

}

七、控制器(对外接口)

7.1 购物流程控制器(ShopAgentController)

import com.shop.multiagent.dto.ShopAgentContext;

import com.shop.multiagent.dto.ErrorResponse;

import com.shop.multiagent.service.ShopWorkflowService;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.http.HttpStatus;

import org.springframework.http.ResponseEntity;

import org.springframework.web.bind.annotation.PostMapping;

import org.springframework.web.bind.annotation.RequestBody;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

/**

 * 购物流程对外接口

 */

@RestController

@RequestMapping("/api/shop")

@Slf4j

public class ShopAgentController {

    private final

八、RAG 知识库集成

8.1 RAG 服务接口

创建 RAG 知识库服务接口:

public interface RagService {

    void initializeKnowledgeBase();

    

    void addDocument(String documentId, String content, Map<String, Object> metadata);

    

    List<Document> search(String query, int topK);

    

    String generateResponse(String query, List<Document> relatedDocuments);

}

8.2 向量数据库配置

配置向量数据库(以 Redis 为例):

@Configuration

public class VectorDbConfig {

    @Value("${vector.db.type:redis}")

    private String dbType;

    

    @Value("${vector.db.host:localhost}")

    private String host;

    

    @Value("${vector.db.port:6379}")

    private int port;

    

    @Value("${vector.db.password:}")

    private String password;

    

    @Value("${vector.db.database:0}")

    private int database;

    

    @Bean

    public VectorStore vectorStore() {

        switch (dbType) {

            case "redis":

                return new RedisVectorStore(host, port, password, database);

            case "milvus":

                return new MilvusVectorStore();

            default:

                return new InMemoryVectorStore();

        }

    }

}

8.3 RAG 实现类

实现 RAG 核心逻辑:

@Service

public class RagServiceImpl implements RagService {

    private final VectorStore vectorStore;

    private final OpenAiEmbeddingClient embeddingClient;

    

    public RagServiceImpl(VectorStore vectorStore, OpenAiEmbeddingClient embeddingClient) {

        this.vectorStore = vectorStore;

        this.embeddingClient = embeddingClient;

    }

    

    @Override

    public void initializeKnowledgeBase() {

        // 初始化知识库,加载预定义文档

        List<Document> initialDocs = Arrays.asList(

            new Document("cs_faq_001", "顺丰物流江浙沪1天达", Collections.singletonMap("type", "logistics")),

            new Document("cs_faq_002", "交付异常可联系400-123-4567", Collections.singletonMap("type", "support")),

            new Document("cs_faq_003", "签收后7天无理由退换", Collections.singletonMap("type", "return"))

        );

        

        for (Document doc : initialDocs) {

            addDocument(doc.getId(), doc.getContent(), doc.getMetadata());

        }

    }

    

    @Override

    public void addDocument(String documentId, String content, Map<String, Object> metadata) {

        try {

            float[] embedding = embeddingClient.embed(content).getEmbedding();

            vectorStore.addDocument(documentId, content, embedding, metadata);

        } catch (Exception e) {

            log.error("文档添加失败:", e);

        }

    }

    

    @Override

    public List<Document> search(String query, int topK) {

        try {

            float[] queryEmbedding = embeddingClient.embed(query).getEmbedding();

            return vectorStore.search(queryEmbedding, topK);

        } catch (Exception e) {

            log.error("搜索失败:", e);

            return Collections.emptyList();

        }

    }

    

    @Override

    public String generateResponse(String query, List<Document> relatedDocuments) {

        // 构建上下文

        StringBuilder contextBuilder = new StringBuilder();

        contextBuilder.append("以下是相关知识库内容:\n");

        

        for (Document doc : relatedDocuments) {

            contextBuilder.append("- ").append(doc.getContent()).append("\n");

        }

        

        contextBuilder.append("\n用户问题:").append(query);

        

        // 使用OpenAI生成回答

        String prompt = String.format(

            "基于以上知识库内容,回答用户的问题:%s",

            contextBuilder.toString()

        );

        

        return openAiChatClient.call(prompt);

    }

}

九、Skill 框架设计

9.1 Skill 接口定义

定义 Skill 接口规范:

public interface Skill {

    String getName();

    

    String getDescription();

    

    Object execute(Map<String, Object> parameters);

    

    Map<String, Object> getMetadata();

}

9.2 Skill 工厂类

创建 Skill 工厂用于动态加载和管理:

@Component

public class SkillFactory {

    private final Map<String, Skill> skills = new HashMap<>();

    

    @Autowired

    public SkillFactory(ApplicationContext applicationContext) {

        // 自动扫描所有标注@Skill注解的Bean

        Map<String, Object> skillBeans = applicationContext.getBeansWithAnnotation(Skill.class);

        

        for (Map.Entry<String, Object> entry : skillBeans.entrySet()) {

            Object bean = entry.getValue();

            if (bean instanceof Skill) {

                Skill skill = (Skill) bean;

                Skill annotation = bean.getClass().getAnnotation(Skill.class);

                skills.put(annotation.value(), skill);

            }

        }

    }

    

    public Skill getSkill(String name) {

        return skills.get(name);

    }

    

    public boolean hasSkill(String name) {

        return skills.containsKey(name);

    }

    

    public Collection<Skill> getAllSkills() {

        return skills.values();

    }

}

9.3 Skill 注解定义

创建自定义 Skill 注解:

@Target(ElementType.TYPE)

@Retention(RetentionPolicy.RUNTIME)

public @interface Skill {

    String value();

    

    String description() default "";

    

    String version() default "1.0.0";

}

十、异常处理与监控

10.1 全局异常处理器

实现全局异常处理:

@RestControllerAdvice

public class GlobalExceptionHandler {

    @ExceptionHandler(Exception.class)

    public ResponseEntity<ErrorResponse> handleException(Exception e) {

        ErrorResponse error = new ErrorResponse();

        error.setErrorCode("INTERNAL_SERVER_ERROR");

        error.setErrorMessage("系统内部错误,请稍后重试");

        error.setDetails(e.getMessage());

        

        log.error("全局异常处理:", e);

        return ResponseEntity.status(500).body(error);

    }

    

    @ExceptionHandler(AlipayApiException.class)

    public ResponseEntity<ErrorResponse> handleAlipayException(AlipayApiException e) {

        ErrorResponse error = new ErrorResponse();

        error.setErrorCode("ALIPAY_ERROR");

        error.setErrorMessage("支付宝接口调用失败");

        error.setDetails(e.getErrMsg());

        

        return ResponseEntity.status(400).body(error);

    }

    

    @ExceptionHandler(OpenAiException.class)

    public ResponseEntity<ErrorResponse> handleOpenAiException(OpenAiException e) {

        ErrorResponse error = new ErrorResponse();

        error.setErrorCode("OPENAI_ERROR");

        error.setErrorMessage("OpenAI接口调用失败");

        error.setDetails(e.getMessage());

        

        return ResponseEntity.status(400).body(error);

    }

}

10.2 监控与日志配置

配置系统监控:

@Configuration

public class MonitoringConfig {

    @Bean

    public MeterRegistryCustomizer<MeterRegistry> metricsCommonTags() {

        return registry -> registry.config().commonTags("application", "shop-multi-agent", "version", "1.0.0");

    }

    

    @Bean

    public HealthIndicator paymentHealthIndicator(AlipayClient alipayClient) {

        return () -> {

            try {

                AlipaySystemOauthTokenRequest request = new AlipaySystemOauthTokenRequest();

                request.setGrantType("client_credentials");

                AlipayClient alipayClient = new DefaultAlipayClient(

                    "https://openapi.alipay.com/gateway.do",

                    "app_id",

                    "private_key",

                    "json",

                    "UTF-8",

                    "public_key",

                    "RSA2"

                );

                AlipaySystemOauthTokenResponse response = alipayClient.execute(request);

                return Health.up().withDetail("status", "OK").build();

            } catch (Exception e) {

                return Health.down().withDetail("error", e.getMessage()).build();

            }

        };

    }

}

十一、性能优化策略

11.1 缓存机制

实现智能缓存优化:

@Component

public class ResultCache {

    private final LoadingCache<String, Object> cache;

    

    public ResultCache() {

        this.cache = Caffeine.newBuilder()

            .maximumSize(1000)

            .expireAfterWrite(1, TimeUnit.HOURS)

            .recordStats()

            .build(key -> loadResult(key));

    }

    

    private Object loadResult(String key) {

        // 从数据库或其他数据源加载

        return null;

    }

    

    public void put(String key, Object value) {

        cache.put(key, value);

    }

    

    public Optional<Object> get(String key) {

        return Optional.ofNullable(cache.getIfPresent(key));

    }

    

    public void invalidate(String key) {

        cache.invalidate(key);

    }

    

    public CacheStats stats() {

        return cache.stats();

    }

}

11.2 批处理优化

实现智能体批处理:

@Service

public class BatchProcessingService {

    private final ThreadPoolExecutor batchExecutor;

    

    public BatchProcessingService() {

        batchExecutor = new ThreadPoolExecutor(

            10, 20, 60, TimeUnit.SECONDS,

            new ArrayBlockingQueue<>(100),

            new ThreadPoolExecutor.CallerRunsPolicy()

        );

    }

    

    public void processBatch(List<ShopAgentContext> contexts) {

        for (ShopAgentContext context : contexts) {

            batchExecutor.submit(() -> {

                try {

                    ShopWorkflowService workflowService = SpringContextHolder.getBean(ShopWorkflowService.class);

                    workflowService.runWorkflow(context);

                } catch (Exception e) {

                    log.error("批处理异常:", e);

                }

            });

        }

    }

}

十二、测试用例设计

12.1 单元测试

编写智能体单元测试:

@RunWith(SpringJUnit4ClassRunner.class)

@SpringBootTest

public class AgentTest {

    @Autowired

    private OrderAgentService orderAgent;

    

    @Autowired

    private PaymentAgentService paymentAgent;

    

    @Test

    public void testOrderAgent() {

        ShopAgentContext context = new ShopAgentContext();

        context.setOrderId("SHOP_001");

        context.setUserId("USER_1001");

        context.setProductInfo("华为Mate60 Pro 128G");

        

        ShopAgentContext result = orderAgent.execute(context);

        

        assertNotNull(result.getOrderInfo());

        assertTrue(result.getOrderInfo().startsWith("{"));

    }

    

    @Test

    public void testPaymentAgentSuccess() {

        ShopAgentContext context = new ShopAgentContext();

        context.setOrderId("SHOP_001");

        context.setTradeNo("2024060122001410086000000001");

        

        // 使用Mockito模拟支付宝响应

        AlipayTradeQueryResponse response = mock(AlipayTradeQueryResponse.class);

        when(response.isSuccess()).thenReturn(true);

        when(response.getTradeStatus()).thenReturn("TRADE_SUCCESS");

        

        ShopAgentContext result = paymentAgent.execute(context);

        

        assertTrue(result.isPaymentSuccess());

    }

}

12.2 集成测试

编写完整流程集成测试:

@RunWith(SpringJUnit4ClassRunner.class)

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)

public class IntegrationTest {

    @Autowired

    private TestRestTemplate restTemplate;

    

    @Test

    public void testFullWorkflow() {

        ShopAgentContext initialContext = new ShopAgentContext();

        initialContext.setOrderId("SHOP_001");

        initialContext.setUserId("USER_1001");

        initialContext.setProductInfo("华为Mate60 Pro 128G");

        initialContext.setTradeNo("2024060122001410086000000001");

        

        ResponseEntity<ShopAgentContext> response = restTemplate.postForEntity(

            "/api/shop/run-workflow", initialContext, ShopAgentContext.class

        );

        

        ShopAgentContext result = response.getBody();

        

        assertNotNull(result);

        assertNotNull(result.getOrderInfo());

        assertTrue(result.isPaymentSuccess());

        assertNotNull(result.getLogisticsInfo());

        assertNotNull(result.getDeliveryInfo());

        assertNotNull(result.getCustomerServiceReply());

    }

}

十三、部署与运行

13.1 部署配置

创建 Dockerfile 用于容器化部署:

FROM openjdk:17-slim

WORKDIR /app

COPY target/shop-multi-agent-1.0.0.jar app.jar

EXPOSE 8080

ENTRYPOINT ["java", "-jar", "app.jar", "--spring.profiles.active=prod"]

13.2 运行脚本

创建启动脚本:

#!/bin/bash

set -e

# 设置环境变量

export OPENAI_API_KEY="your-openai-api-key"

export ALIPAY_APP_ID="your-alipay-app-id"

export ALIPAY_PRIVATE_KEY="your-alipay-private-key"

export ALIPAY_PUBLIC_KEY="your-alipay-public-key"

# 启动应用

java -jar target/shop-multi-agent-1.0.0.jar \

  --server.port=8080 \

  --spring.profiles.active=prod

13.3 配置文件

application-prod.yml 配置:

server:

  port: 8080

  servlet:

    context-path: /api

spring:

  profiles: prod

  application:

    name: shop-multi-agent

  main:

    allow-bean-definition-overriding: true

# 数据库配置(如果有)

# datasource:

#   url: jdbc:mysql://localhost:3306/shop

#   username: root

#   password: password

# 日志配置

logging:

  level:

    root: INFO

    com.shop.multiagent: DEBUG

  file:

    name: /var/log/shop-multi-agent.log

    max-size: 10MB

    total-size-capacity: 100MB

十四、架构设计文档

14.1 系统架构总图

系统架构总图

├── 表现层

│   ├── REST API接口

│   └── 监控界面

├── 应用层

│   ├── 控制器层

│   ├── 服务层

│   │   ├── 智能体服务

│   │   ├── 流程编排服务

│   │   └── 工具服务

│   └── 业务逻辑层

├── 数据层

│   ├── 订单数据

│   ├── 用户数据

│   ├── 商品数据

│   └── 支付数据

└── 基础设施层

    ├── 线程池

    ├── 缓存

    ├── 日志

    └── 监控

14.2 核心组件交互图

核心组件交互图

用户 → 控制器 → 工作流服务 → StateGraph

       ↑         ↑            ↑

       │         │            └─ 节点执行器

       │         └─ 智能体管理器

       └─ 响应处理器

14.3 智能体协作时序图

智能体协作时序图

用户发送请求

    ↓

控制器接收

    ↓

工作流服务启动

    ↓

StateGraph开始执行

    ↓

OrderAgent执行

    ↓

PaymentAgent执行

    ↓

LogisticsAgent执行

    ↓

DeliveryAgent执行

    ↓

CustomerServiceAgent执行

    ↓

返回最终结果

14.4 技术选型决策矩阵

技术选型

优势

劣势

风险

决策

Spring AI

与 Spring 生态无缝集成

仍在快速迭代

版本兼容性

采用

OpenAI

成熟稳定

成本较高

服务依赖

采用

支付宝 MCP

官方支持

文档较少

接口变更

采用

Redis 向量数据库

性能优秀

内存占用

数据持久化

采用

StateGraph

流程清晰

学习成本

复杂性

采用

14.5 性能指标预期

指标

目标值

测量方法

响应时间

< 2 秒(95%)

监控 API 调用耗时

并发能力

1000 QPS

压力测试

成功率

> 99.9%

统计成功 / 失败次数

资源占用

< 2GB 内存

监控系统资源

延迟

< 500ms

测量端到端延迟

14.6 扩展性设计说明

系统的扩展性通过以下机制实现:

插件化架构:每个智能体都是独立的插件,可以动态加载和替换

接口抽象:通过统一的 Agent 接口和 Skill 接口,支持不同实现

事件驱动:智能体间通过事件通信,降低直接依赖

配置驱动:大部分业务规则可以通过配置文件修改

模块化设计:系统按功能模块划分,各模块独立开发和部署

14.7 安全性设计

系统安全性考虑:

认证授权:所有 API 接口需要身份认证

数据加密:敏感数据如支付信息需要加密存储和传输

访问控制:不同用户有不同的访问权限

审计日志:所有关键操作都需要记录审计日志

限流保护:防止恶意攻击和滥用

总结

本系统基于 Spring AI 框架构建了一个完整的购物场景多智能体协作系统。通过 5 个独立智能体的协同工作,实现了 "下单 - 支付 - 物流 - 交付 - 客服" 全流程的智能化管理。系统采用了先进的技术架构,包括 StateGraph 流程编排、线程池管理、OpenAI 原生接口集成、支付宝 MCP 工具集成、RAG 知识库等核心技术。

系统设计严格遵循低耦合高扩展的原则,通过 Skill 设计模式、事件驱动架构、模块化设计等手段,确保系统具有良好的可维护性和可扩展性。每个智能体都可以独立开发、测试和部署,大大提高了开发效率和系统的可靠性。

在实际应用中,该系统可以处理高并发的购物请求,提供个性化的购物体验,同时通过智能化的客服系统提升用户满意度。通过完善的监控和告警机制,确保系统的稳定运行。

未来,可以通过增加更多的智能体、优化 AI 模型、扩展知识库等方式,进一步提升系统的智能化水平和用户体验。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐