路由工作流智能体

实现流程:

  • 我们把原来的单一智能体改成了5个智能体一起协同工作。
  • 当用户提出问题时,首先会发送给【意图分析智能体】,它会判断用户是想让我们推荐课程、查询课程信息还是购买课程。
  • 一旦明确了用户的意图,就会根据不同的需求调用相应的智能体来完成任务,比如推荐课程或购买课程等。
  • 这样做的好处是每个智能体都有明确的任务分工,并且只有在需要时才会调用特定的工具,不需要所有智能体都配备全套工具。这样一来,整个系统变得更加灵活高效了。

实现分析:

将单一的智能体,改造成5个智能体协同工作,每个智能体必然会有一些部分代码是重复的,所以需要定义个interface Agent,用来定义Agent的通用标准方法,并且也需要提供一个抽象类实现,将通用的业务实现写到这个抽象类中。

定义类型枚举:

不同的智能体,是需要通过类型来区分的,比较好的一种方式就是定义类型枚举。

@Getter
public enum AgentTypeEnum {
    ROUTE("ROUTE", "路由智能体"),
    RECOMMEND("RECOMMEND", "课程推荐智能体"),
    CONSULT("CONSULT", "课程咨询智能体"),
    BUY("BUY", "课程购买智能体"),
    KNOWLEDGE("KNOWLEDGE", "知识讲解智能体");

    private final String agentName;
    private final String desc;

    AgentTypeEnum(String agentName, String desc) {
        this.agentName = agentName;
        this.desc = desc;
    }
    @Override
    public String toString() {
        return this.name();
    }

    /**
     * 通过智能体的名称查找枚举
     */
    public static AgentTypeEnum agentNameOf(String agentName) {
        return EnumUtil.getBy(AgentTypeEnum::getAgentName, agentName);
    }

}

定义Agent接口:

我们可以想一下,每个智能体都有什么相关的方法,就把他们抽象出来,形成一个Agent interface,子类只需要实现接口即可。

应该有的方法:

  • process (普通对话)
  • processStream (流式对话)
  • getAgentType (获取智能体类型)
  • stop (停止方法)
  • systemMessage (获取系统提示词方法)

以上这些都是基本的操作方法。实际上,对于一个智能体而言,与大模型或Tools交互,还需要一些设定,比如toolContext、advisors等,所以还需要额外的加一个方法:

  • tools (工具集)
  • toolContext (工具上下文参数)
  • advisors (Advisor列表)
  • advisorParams (Advisor参数列表)
  • systemMessageParams (系统提示词中的参数列表)
public interface Agent {

    /**
     * 表示空参数的预定义数组
     */
    Object[] EMPTY_OBJECTS = new Object[0];

    /**
     * 处理流式请求(如流式回答)
     *
     * @param question  用户输入的问题
     * @param sessionId 会话唯一标识
     * @return 包含中间结果的反应式事件流(Flux)
     */
    Flux<ChatEventVO> processStream(String question, String sessionId);

    /**
     * 处理标准请求(非流式)
     *
     * @param question  用户输入的问题
     * @param sessionId 会话唯一标识
     * @return 最终处理结果字符串
     */
    String process(String question, String sessionId);

    /**
     * 获取智能体类型标识
     *
     * @return 代理类型枚举值(如:ROUTE、RECOMMEND等)
     */
    AgentTypeEnum getAgentType();

    /**
     * 停止指定会话的处理
     *
     * @param sessionId 需要终止的会话ID
     */
    void stop(String sessionId);

    /**
     * 获取系统提示信息模板,默认为空字符串,子类可以覆盖重写该方法以返回自定义的系统提示信息。
     *
     * @return 系统提示的文本模板
     */
    default String systemMessage() {
        return "";
    }


    /**
     * 获取工具列表,默认返回空数组。子类需根据需求覆盖此方法。
     */
    default Object[] tools() {
        return EMPTY_OBJECTS;
    }

    /**
     * 创建并返回一个工具上下文的空Map对象。
     *
     * @param sessionId 会话标识符
     * @param requestId 请求标识符
     * @return 默认返回一个空的Map对象,子类可以覆盖重写该方法以返回自定义的工具上下文。
     */
    default Map<String, Object> toolContext(String sessionId, String requestId) {
        return Map.of();
    }

    /**
     * Advisor列表,默认返回空对象
     */
    default List<Advisor> advisors() {
        return List.of();
    }

    /**
     * 创建并返回一个Advisor的空Map对象。
     *
     * @param sessionId 会话标识符
     * @param requestId 请求标识符
     * @return 默认返回一个空的Map对象,子类可以覆盖重写该方法以返回自定义的工具上下文。
     */
    default Map<String, Object> advisorParams(String sessionId, String requestId) {
        return Map.of();
    }

    /**
     * 获取系统提示信息模板的参数,默认为空Map,子类可以覆盖重写该方法以返回自定义的系统提示信息参数。
     */
    default Map<String, Object> systemMessageParams() {
        return Map.of();
    }

}

 编写抽象类:

@Slf4j
public abstract class AbstractAgent implements Agent {

    @Resource
    private ChatSessionService chatSessionService;
    @Resource
    private ChatClient chatClient;
    @Resource
    private ChatMemory chatMemory;

    // 输出结束的标记
    public static final ChatEventVO STOP_EVENT = ChatEventVO.builder().eventType(ChatEventTypeEnum.STOP.getValue()).build();

    // 存储大模型的生成状态,这里采用ConcurrentHashMap是确保线程安全
    // 目前的版本暂时用Map实现,如果考虑分布式环境的话,可以考虑用redis来实现
    public static final Map<String, Boolean> GENERATE_STATUS = new ConcurrentHashMap<>();

    @Override
    public String process(String question, String sessionId) {
        // 获取用户id
        var userId = UserContext.getUser();
        var requestId = this.generateRequestId();

        //更新会话时间
        this.chatSessionService.update(sessionId, question, userId);

        return this.getChatClientRequest(sessionId, requestId, question)
                .call()
                .content();
    }

    public Flux<ChatEventVO> processStream(String question, String sessionId) {
        // 获取用户id
        var userId = UserContext.getUser();
        var requestId = this.generateRequestId();
        // 大模型输出内容的缓存器,用于在输出中断后的数据存储
        var outputBuilder = new StringBuilder();
        // 获取对话id
        var conversationId = ChatService.getConversationId(sessionId);

        //更新会话时间
        this.chatSessionService.update(sessionId, question, userId);

        return this.getChatClientRequest(sessionId, requestId, question)
                .stream()
                .chatResponse()
                .doFirst(() -> GENERATE_STATUS.put(sessionId, true)) // 第一次输出内容时执行
                .doOnError(throwable -> GENERATE_STATUS.remove(sessionId)) // 出现异常时,删除标识
                .doOnComplete(() -> GENERATE_STATUS.remove(sessionId)) // 完成时执行,删除标识
                .doOnCancel(() -> {
                    // 当输出被取消时,保存输出的内容到历史记录中
                    this.saveStopHistoryRecord(conversationId, outputBuilder.toString());
                })
                .takeWhile(response -> { // 通过返回值来控制Flux流是否继续,true:继续,false:终止
                    return GENERATE_STATUS.getOrDefault(sessionId, false);
                })
                .map(chatResponse -> {
                    var finishReason = chatResponse.getResult().getMetadata().getFinishReason();
                    if (StrUtil.equals(Constant.STOP, finishReason)) {
                        var messageId = chatResponse.getMetadata().getId();
                        ToolResultHolder.put(messageId, Constant.REQUEST_ID, requestId);
                    }

                    // 获取大模型的输出的内容
                    var text = chatResponse.getResult().getOutput().getText();
                    // 追加到输出内容中
                    outputBuilder.append(text);
                    // 封装响应对象
                    return ChatEventVO.builder()
                            .eventData(text)
                            .eventType(ChatEventTypeEnum.DATA.getValue())
                            .build();
                })
                .concatWith(Flux.defer(() -> {
                    // 通过请求id获取到参数列表,如果不为空,就将其追加到返回结果中
                    var map = ToolResultHolder.get(requestId);
                    if (CollUtil.isNotEmpty(map)) {
                        ToolResultHolder.remove(requestId); // 清除参数列表

                        // 响应给前端的参数数据
                        var chatEventVO = ChatEventVO.builder()
                                .eventData(map)
                                .eventType(ChatEventTypeEnum.PARAM.getValue())
                                .build();
                        return Flux.just(chatEventVO, STOP_EVENT);
                    }
                    return Flux.just(STOP_EVENT);
                }));
    }

    private ChatClient.ChatClientRequestSpec getChatClientRequest(String sessionId, String requestId, String question) {
        return this.chatClient.prompt()
                .system(promptSystem -> promptSystem.text(this.systemMessage()).params(this.systemMessageParams()))
                .advisors(advisor -> advisor.advisors(this.advisors()).params(this.advisorParams(sessionId, requestId)))
                .tools(this.tools())
                .toolContext(this.toolContext(sessionId, requestId))
                .user(question);
    }

    /**
     * 保存停止输出的记录
     *
     * @param conversationId 对话id
     * @param content   大模型输出的内容
     */
    private void saveStopHistoryRecord(String conversationId, String content) {
        this.chatMemory.add(conversationId, new AssistantMessage(content));
    }

    private String generateRequestId() {
        return IdUtil.fastSimpleUUID();
    }

    @Override
    public Map<String, Object> advisorParams(String sessionId, String requestId) {
        var conversationId = ChatService.getConversationId(sessionId);
        return Map.of(ChatMemory.CONVERSATION_ID, conversationId);
    }

    @Override
    public void stop(String sessionId) {
        GENERATE_STATUS.remove(sessionId);
    }
}

路由智能体

设置系统提示词:

路由Agent的提示词:

# 角色
天机AI意图分析师

## 能力
1. 识别用户意图并匹配对应编号:
   - RECOMMEND(课程推荐)
   - BUY(课程购买)
   - CONSULT(课程咨询)
   - KNOWLEDGE(知识讲解)
2. 特殊场景处理:
   - 识别关键词触发意图:
     - BUY: 确认购买/下单/是的确认
     - RECOMMEND: 包含年龄/学历/兴趣信息
   - 识别问候语并礼貌回应:你好/您好
3. 非相关提问时礼貌拒答

## 约束
精准识别,避免误判

## 输出
- 匹配意图时返回编号
- 问候语场景返回「您好!有什么可以帮您?」
- 无匹配时用自然语言回复

## 示例
输入:20岁本科想学Java → RECOMMEND  
输入:现在要下单 → BUY  
输入:这个课程多少钱 → CONSULT
输入:java是什么 → KNOWLEDGE
输入:你好 → 您好!有什么可以帮您?  
输入:今天天气 → 抱歉我只处理课程相关问题

读取nacos配置(提示词):

tj:
  ai:
    prompt:
      system:
        chat:
          data-id: system-chat-message.txt
          group: DEFAULT_GROUP
          timeout-ms: 20000
        route-agent:
          data-id: route-agent-system-message.txt

@Data
@Configuration
@ConfigurationProperties(prefix = "tj.ai.prompt")
public class AIProperties {

    private System system; // 系统提示语,用于课程推荐、购买业务

    @Data
    public static class System {
        private Chat chat; // 系统提示语,用于课程推荐、购买业务
        private Chat routeAgent; // 路由智能体系统提示词

        @Data
        public static class Chat {
            private String dataId;
            private String group = "DEFAULT_GROUP";
            private long timeoutMs = 20000L; // 读取的超时时间,单位毫秒
        }
    }
}

加载配置:

package com.tianji.aigc.config;
// 省略一些代码........
public class SystemPromptConfig {

    // 省略一些代码........

    // 使用原子引用,保证线程安全
    private final AtomicReference<String> chatSystemMessage = new AtomicReference<>();
    private final AtomicReference<String> routeAgentSystemMessage = new AtomicReference<>();
    
    @PostConstruct // 初始化时加载配置
    public void init() {
        // 读取配置文件
        loadConfig(aiProperties.getSystem().getChat(), chatSystemMessage);
        loadConfig(aiProperties.getSystem().getRouteAgent(), routeAgentSystemMessage);
    }
// 省略一些代码........
}

 编写智能体:

package com.tianji.aigc.agent;

import com.tianji.aigc.config.SystemPromptConfig;
import com.tianji.aigc.enums.AgentTypeEnum;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

/**
 * 路由智能体
 */
@Component
@RequiredArgsConstructor
public class RouteAgent extends AbstractAgent {

    private final SystemPromptConfig systemPromptConfig;

    @Override
    public String systemMessage() {
        return this.systemPromptConfig.getRouteAgentSystemMessage().get();
    }

    @Override
    public AgentTypeEnum getAgentType() {
        return AgentTypeEnum.ROUTE;
    }

}

课程推荐智能体:

 编写智能体:

package com.tianji.aigc.agent;

import com.tianji.aigc.config.SystemPromptConfig;
import com.tianji.aigc.constants.Constant;
import com.tianji.aigc.enums.AgentTypeEnum;
import com.tianji.aigc.tools.CourseTools;
import com.tianji.common.utils.UserContext;
import lombok.RequiredArgsConstructor;
import org.springframework.ai.chat.client.advisor.api.Advisor;
import org.springframework.ai.chat.client.advisor.vectorstore.QuestionAnswerAdvisor;
import org.springframework.ai.vectorstore.SearchRequest;
import org.springframework.ai.vectorstore.VectorStore;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

@Component
@RequiredArgsConstructor
public class RecommendAgent extends AbstractAgent {

    private final SystemPromptConfig systemPromptConfig;
    private final VectorStore vectorStore;
    private final CourseTools courseTools;

    @Override
    public String systemMessage() {
        return this.systemPromptConfig.getRecommendAgentSystemMessage().get();
    }

    @Override
    public AgentTypeEnum getAgentType() {
        return AgentTypeEnum.RECOMMEND;
    }

    @Override
    public List<Advisor> advisors() {
        // 创建RAG增强
        var qaAdvisor = QuestionAnswerAdvisor.builder(this.vectorStore)
                .searchRequest(SearchRequest.builder().similarityThreshold(0.6d).topK(6).build())
                .build();
        return List.of(qaAdvisor);
    }

    @Override
    public Object[] tools() {
        return new Object[]{courseTools};
    }

    @Override
    public Map<String, Object> toolContext(String sessionId, String requestId) {
        var userId = UserContext.getUser();
        return Map.of(
                Constant.USER_ID, userId, // 设置用户id参数
                Constant.REQUEST_ID, requestId  // 设置请求id参数
        );
    }

}

课程咨询智能体:

 编写智能体:

package com.tianji.aigc.agent;

import cn.hutool.core.date.DateUtil;
import com.tianji.aigc.config.SystemPromptConfig;
import com.tianji.aigc.constants.Constant;
import com.tianji.aigc.enums.AgentTypeEnum;
import com.tianji.aigc.tools.CourseTools;
import com.tianji.common.utils.UserContext;
import lombok.RequiredArgsConstructor;
import org.springframework.ai.chat.client.advisor.api.Advisor;
import org.springframework.ai.chat.client.advisor.vectorstore.QuestionAnswerAdvisor;
import org.springframework.ai.vectorstore.SearchRequest;
import org.springframework.ai.vectorstore.VectorStore;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

/**
 * 课程咨询智能体
 */
@Component
@RequiredArgsConstructor
public class ConsultAgent extends AbstractAgent {

    private final SystemPromptConfig systemPromptConfig;
    private final VectorStore vectorStore;
    private final CourseTools courseTools;

    @Override
    public String systemMessage() {
        return this.systemPromptConfig.getConsultAgentSystemMessage().get();
    }

    @Override
    public AgentTypeEnum getAgentType() {
        return AgentTypeEnum.CONSULT;
    }

    @Override
    public List<Advisor> advisors() {
        // 创建RAG增强
        var qaAdvisor = QuestionAnswerAdvisor.builder(this.vectorStore)
                .searchRequest(SearchRequest.builder().similarityThreshold(0.6d).topK(6).build())
                .build();
        return List.of(qaAdvisor);
    }

    @Override
    public Object[] tools() {
        return new Object[]{courseTools};
    }

    @Override
    public Map<String, Object> toolContext(String sessionId, String requestId) {
        var userId = UserContext.getUser();
        return Map.of(
                Constant.USER_ID, userId, // 设置用户id参数
                Constant.REQUEST_ID, requestId  // 设置请求id参数
        );
    }

    @Override
    public Map<String, Object> systemMessageParams() {
        return Map.of("now", DateUtil.now());
    }
}

课程购买智能体:

 编写智能体:

@Component
@RequiredArgsConstructor
public class BuyAgent extends AbstractAgent {

    private final SystemPromptConfig systemPromptConfig;
    private final OrderTools orderTools;

    @Override
    public String systemMessage() {
        return this.systemPromptConfig.getBuyAgentSystemMessage().get();
    }

    @Override
    public AgentTypeEnum getAgentType() {
        return AgentTypeEnum.BUY;
    }

    @Override
    public Object[] tools() {
        return new Object[]{orderTools};
    }

    @Override
    public Map<String, Object> toolContext(String sessionId, String requestId) {
        var userId = UserContext.getUser();
        return Map.of(
                Constant.USER_ID, userId, // 设置用户id参数
                Constant.REQUEST_ID, requestId  // 设置请求id参数
        );
    }
}

问题讲解智能体:

 编写智能体:

@Component
@RequiredArgsConstructor
public class KnowledgeAgent extends AbstractAgent {

    private final SystemPromptConfig systemPromptConfig;

    @Override
    public String systemMessage() {
        return this.systemPromptConfig.getKnowledgeAgentSystemMessage().get();
    }

    @Override
    public AgentTypeEnum getAgentType() {
        return AgentTypeEnum.KNOWLEDGE;
    }

}

总结:

上述分智能体提示词工程各有所同,仅需要独立进行agent的配置:

  • agent类型
  • 提示词及其参数
  • 工具及其参数
  • 知识库RAG的调用

多智能体协调工作:

已经实现了多个智能体,这些智能体都是独立运行,接下来我们就需要把他们整合起来,一起协调工作,完成天机AI助理。

编写AgentServiceImpl实现类:

  1. 根据路由代理类型查询到路由Agent的实例。
  2. 路由Agent分析用户意图返回对应Agent的代理类型。
  3. 通过代理类型查到对应的Agent实例。
  4. 执行任务。
  5. 若没查到对应Agent的代理类型,则直接输出返回路由Agent返回的内容进行封装。

在 Spring 框架中,SpringUtil.getBeansOfType(Agent.class) 的作用:

  • 如果 Agent 是接口:会找到所有实现了该接口的类的 Bean

  • 如果 Agent 是:会找到所有该类或其子类的 Bean

  • 也会考虑 @Primary@Priority 等注解,但 getBeansOfType 会返回所有匹配的 Bean,不只是主要的那个


@Service
@RequiredArgsConstructor
@ConditionalOnProperty(prefix = "tj.ai", name = "chat-type", havingValue = "ROUTE")
public class AgentServiceImpl implements ChatService {

    @Override
    public Flux<ChatEventVO> chat(String question, String sessionId) {
        // 先通过路由智能体,分析用户的意图,再执行后面的逻辑
        var result = this.findAgentByType(AgentTypeEnum.ROUTE).process(question, sessionId);
        var agentTypeEnum = AgentTypeEnum.agentNameOf(result);

        var agent = this.findAgentByType(agentTypeEnum);
        if (agent == null) {
            // 找不到对应的智能体,直接返回结果
            var chatEventVO = ChatEventVO.builder()
                    .eventType(ChatEventTypeEnum.DATA.getValue())
                    .eventData(result)
                    .build();
            return Flux.just(chatEventVO, AbstractAgent.STOP_EVENT);
        }
        // 执行智能体的逻辑
        return agent.processStream(question, sessionId);
    }

    /**
     * 根据代理类型查找对应的Agent实例
     *
     * @param agentTypeEnum 要查找的代理类型
     * @return 与给定类型匹配的Agent实例,如果未找到或类型为null则返回null
     */
    private Agent findAgentByType(AgentTypeEnum agentTypeEnum) {
        if (agentTypeEnum == null) {
            return null;
        }
        var beans = SpringUtil.getBeansOfType(Agent.class);
        // 遍历所有Agent Bean查找匹配类型
        for (var agent : beans.values()) {
            if (agentTypeEnum == agent.getAgentType()) {
                return agent;
            }
        }
        return null;
    }

    /**
     * 停止生成
     *
     * @param sessionId 会话ID
     */
    @Override
    public void stop(String sessionId) {
        this.findAgentByType(AgentTypeEnum.ROUTE).stop(sessionId);
    }
}

改造ChatServiceImpl:

tj:
  ai:
    chat-type: ROUTE # ROUTE / ENHANCE / APP

基于yaml配置类进行配置,来确定是单增强智能体或者多智能体交互路由工作流进行实现IChatService接口,区分实现类。

@ConditionalOnProperty(prefix = "tj.ai", name = "chat-type", havingValue = "ROUTE")
@ConditionalOnProperty(prefix = "tj.ai", name = "chat-type", havingValue = "EXCHANGE")

改造SpringAIConfig:

在SpringAIConfig中,就不需要设置默认的Tool了,需要改造下,如下:

    /**
     * 配置 ChatClient
     */
    @Bean
    public ChatClient chatClient(ChatClient.Builder chatClientBuilder,
                                 Advisor loggerAdvisor,
                                 Advisor messageChatMemoryAdvisor,
                                 CourseTools courseTools, // 课程工具
                                 OrderTools orderTools // 预下单工具
    ) {  // 日志记录器
        return chatClientBuilder
                .defaultAdvisors(loggerAdvisor, messageChatMemoryAdvisor) //添加 Advisor 功能增强
                // .defaultTools(courseTools, orderTools) //添加默认工具
                .build();
    }

Bug修复:

关于最后的BUG,就是调用了process和processStream,所以存入了两条记录

1. 那么可以在SpringAIConfig里,取消【会话记录器】的全局注册注册,在processStream独自注册【会话记录保持器】

2. 那么process因为没有注册,所以提交大模型没有携带会话记录,可以手动查询记录

// 1. 这里没有注入【会话记录器】,所以手动获取会话记录
String conversationId = ChatService.getConversationId(sessionId);
List<Message> messages = chatMemoryRepository.findByConversationId(conversationId);
if (CollUtil.isNotEmpty(messages)) {
    messages.add(new UserMessage(question));
    List<String> list = messages.stream()
    .map(MessageUtil::toJson)
    .toList();
    question = JSONUtil.toJsonStr(list);
}
// 2. 将全部会话记录一起发送给大模型
return getChatClientRequest(sessionId, requestId, question)
.call().content();

这样可以不用记录优化器。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐