一、Workflow 的直观类比

这里不再展开底层定义(如 states、nodes、edges 的标准术语),先用一个不恰当但好理解的比喻:

  • 把食物在人体中的经历看作一个工作流过程。
  • states 类比为“食物在各阶段的状态”。
  • edges 类比为“预先规划好的流转路线”。
  • nodes 类比为“执行加工的器官”。

一个简化路线(edges)可以理解为:

嘴 -> 胃 -> 小肠 -> 膀胱/大肠 -> 排出体外

对应nodes是:嘴、胃、小肠、膀胱、大肠等。

例如“吃了一个苹果”后,可以这样理解:

  1. 苹果先进入嘴(节点)被咀嚼加工。
  2. 判断当前结果是否达到目标状态(类比最终排泄状态),若否,沿着边继续流转。
  3. 进入胃继续消化加工,再判断,若否继续。
  4. 进入小肠进行吸收,再判断,若否继续。
  5. 进入大肠/膀胱进行后续加工,判断为是时,执行“排出体外”。

这个比喻只是帮助理解。实际系统里会更复杂,常见情况是某些 state 在流程中途就已经满足目标,直接提前返回,而不是一定走完整条路径。

二、为什么要在单 Agent 项目中引入 Workflow

场景背景

有一个智能养老项目,对家属开放能力,方便家属随时了解老人情况。

家属提问:

“帮我查询我爸近几天的健康情况。”

单体 Agent 的典型做法是识别到对应工具后直接调用,例如:

   @Tool(description = "家属查询老人健康记录。仅支持已登录家属身份查询其绑定的老人。")
    public String queryElderHealth(Long elderlyId,String startDate,String endDate) {  
            //业务逻辑...
    }

这类单轮需求没有问题,但需求一旦叠加动作,就会迅速暴露单体流程的扩展问题。

需求逐步升级时,单体 Agent 的问题

  1. “查询我爸近几天的健康情况,如果出现问题通知我。”
    • 需要改代码接入通知模块。
  2. “查询我爸近几天的健康情况,如果出现问题通知我,并尝试帮我在医院挂号。”
    • 需要继续改代码接入挂号模块。
  3. “查询我爸近几天的健康情况,如果出现问题不用通知我,直接给我爸挂号。”
    • 逻辑分支再变化,又要继续改动流程。

最终会出现:

  • 分支组合越来越多(只通知、只挂号、通知+挂号、不同触发条件)。
  • 业务策略变化频繁,主流程代码频繁修改。
  • 功能可复用性差,维护成本不断增加。

三、Workflow 的处理方式

采用 Workflow 后,可以由 Agent 先做裁决,再按图执行节点:

  1. 查询节点:先获取老人近几天健康数据。
  2. 风险判断节点:判断是否“出现问题”。
  3. 通知节点:只有在用户允许通知时才执行通知服务。
  4. 挂号节点:根据用户意图和判断结果决定是否尝试挂号。
  5. 汇总节点:统一整合执行结果并回复家属。

核心区别是:

  • 业务能力拆成可组合节点。
  • 路由由 edges 控制,按条件走不同分支。
  • 节点是否执行由状态与策略决定,而不是写死在一段线性代码里。

总之在这个智能养老案例里,Workflow 的价值不是“替代查询”,而是“组织复杂动作”:

  • 同一套流程可同时支持“只通知 / 只挂号 / 通知+挂号”等组合。
  • 新需求多数通过新增节点或调整连线实现,不必反复改主干代码。
  • 流程具备更强的可扩展性、可维护性与策略灵活性。

四.基础使用

4.1首先还是万年不变的Controller层实现接口


import cn.lc.sunnyside.Auth.FamilyLoginContextHolder;
import cn.lc.sunnyside.Workflow.Health.HealthWorkflowService;
import cn.lc.sunnyside.Workflow.SunnySideWorkflowService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;

import java.util.Map;

/**
 * 工作流测试控制器
 */
@RestController
@RequestMapping("/api/workflow")
public class WorkflowController {

	//这里是构造函数注入(@Autowired也可以)
    private final HealthWorkflowService healthWorkflowService;

    public WorkflowController(HealthWorkflowService healthWorkflowService) {

        this.healthWorkflowService = healthWorkflowService;
    }

    /**
     * 体验家属健康查询专属工作流
     * @param query 用户输入,例如:“查一下我爸昨天的健康状况”
     * @param phone 手动传入的手机号(测试用),如果不传则尝试从登录上下文中获取
     * @return 工作流执行结果
     */
    @GetMapping("/health-chat")
    public String healthWorkflowChat(
            @RequestParam(name = "query", defaultValue = "查一下我爸昨天的健康状况") String query,
            @RequestParam(name = "phone", required = false) String phone) {
        
        String familyPhone = phone;
        // 如果未手动传入手机号,尝试从全局登录态中获取
        if (familyPhone == null || familyPhone.isBlank()) {
            familyPhone = FamilyLoginContextHolder.get().map(ctx -> ctx.phone()).orElse(null);
        }
        

        return healthWorkflowService.executeWorkflow(query, familyPhone);
    }
}

4.2服务层的实现

主要是定义工作流定义节点并执行


import com.alibaba.cloud.ai.graph.CompiledGraph;
import com.alibaba.cloud.ai.graph.OverAllState;
import com.alibaba.cloud.ai.graph.StateGraph;
import com.alibaba.cloud.ai.graph.action.AsyncNodeAction;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

@Service
public class HealthWorkflowService {

	//就根我们使用redis一样引用工作流的客户端
    private final CompiledGraph workflow;

	//构造函数定义工作流
    public HealthWorkflowService(AnalyzeIntentNode analyzeNode,
            FetchHealthDataNode fetchNode,
            GenerateReplyNode replyNode) {
        // 1. 初始化图结构
        StateGraph graph = new StateGraph();
        try {
            // 2. 注册节点
            graph.addNode("analyzeNode", AsyncNodeAction.node_async(analyzeNode));
            graph.addNode("fetchNode", AsyncNodeAction.node_async(fetchNode));
            graph.addNode("replyNode", AsyncNodeAction.node_async(replyNode));

            // 3. 编排边(定义执行的先后顺序)
            // 流转:START -> analyzeNode -> fetchNode -> replyNode -> END
            graph.addEdge(StateGraph.START, "analyzeNode");
            graph.addEdge("analyzeNode", "fetchNode");
            graph.addEdge("fetchNode", "replyNode");
            graph.addEdge("replyNode", StateGraph.END);

            // 4. 编译工作流
            this.workflow = graph.compile();
        } catch (Exception e) {
            throw new RuntimeException("构建健康查询工作流失败", e);
        }
    }

    /**
     * 触发并执行工作流
     * 
     * @param query       用户输入
     * @param familyPhone 家属手机号(作为上下文传入 State)
     * @return 最终回复
     */
    public String executeWorkflow(String query, String familyPhone) {
        // 构造初始State(也就是我们之前提到的食物)
        Map<String, Object> inputs = new HashMap<>();
        inputs.put("query", query);

        // 此时就开始执行工作流了,建议下面的返回结果代码暂时跳过,直接看下一部分的代码段
        Optional<OverAllState> stateOpt = workflow.invoke(inputs);

        if (stateOpt.isPresent()) {
            // 从返回的 OverAllState 中提取输出字段 "final_reply"
            OverAllState state = stateOpt.get();
            Optional<Object> finalReplyOpt = state.value("final_reply");

            if (finalReplyOpt.isPresent()) {
                Object finalReplyObj = finalReplyOpt.get();
                return finalReplyObj.toString();
            } else {
                return "工作流执行异常,无返回结果。";
            }
        }
        return "工作流执行失败。";
    }
}

4.3数据的加工流程

上面的代码我们知道工作流是

analyzeNode.java -> fetchNode.java -> replyNode.java -> END

下面我会提供这三个文件(分析用户意图->是否执行业务代码->整理返回结果)个人理解均在注释中记得看注释


import com.alibaba.cloud.ai.graph.OverAllState;
import com.alibaba.cloud.ai.graph.action.NodeAction;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;

/**
 * 节点1:意图分析节点
 * 专门使用大模型分析用户的输入,提取是否需要查询健康数据以及目标日期
 */
@Component
public class AnalyzeIntentNode implements NodeAction {

	//内置一个agent判断该节点是否加工
    private final ChatClient chatClient;
    
    private final ObjectMapper objectMapper = new ObjectMapper();

    public AnalyzeIntentNode(ChatClient.Builder builder) {
        this.chatClient = builder.build();
    }

    @Override
    public Map<String, Object> apply(OverAllState state){
        // 从状态中获取用户输入
        String query = state.value("query").map(Object::toString).orElse("");
        Map<String, Object> result = new HashMap<>();

        // 构造专用的 Prompt 让大模型输出 JSON 格式
        String prompt = String.format(
                "请分析用户的输入,判断是否在询问老人的健康状况。\n" +
                        "请严格返回如下JSON格式,不要包含任何Markdown标记或其他说明文本:\n" +
                        "{\"is_health_query\": true/false, \"target_date\": \"yyyy-MM-dd\"}\n" +
                        "如果用户没有明确说明日期,默认使用今天(%s)。\n" +
                        "用户输入:%s",
                LocalDate.now().toString(), query);

        // 调用 ChatClient 生成回复
        String jsonResponse = chatClient.prompt()
                                        .user(prompt)
                                        .call()
                                        .content();

        try {
            // 解析Agent返回的JSON字段
            JsonNode isHealthQueryNode = jsonNode.get("is_health_query");
            // 如果JSON中没有is_health_query字段,默认设为 false(agent判断该问题不是询问健康方面的)
            boolean isHealthQuery = isHealthQueryNode != null && isHealthQueryNode.asBoolean();
            // 将提取出来的结构化参数写入 State 传递给下一个节点
            result.put("is_health_query", isHealthQuery);
        } catch (Exception e) {
            // 解析失败时,默认降级为非健康查询(走普通聊天流)
            result.put("is_health_query", false);
        }

        return result;
    }
}


import cn.lc.sunnyside.Service.FamilyAccessService;
import cn.lc.sunnyside.Service.HealthRecordService;
import com.alibaba.cloud.ai.graph.OverAllState;
import com.alibaba.cloud.ai.graph.action.NodeAction;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;

/**
 * 节点2:健康数据查询节点(纯 Java 业务逻辑,不涉及大模型)
 * 根据上一个节点解析出的参数,执行严格的数据库查询
 */
@Component
public class FetchHealthDataNode implements NodeAction {
   

    @Override
    public Map<String, Object> apply(OverAllState state)  {
        Map<String, Object> result = new HashMap<>();

        boolean isHealthQuery = state.value("is_health_query").map(o -> (Boolean) o).orElse(false);

        // 如果上一个节点判断为【查健康】,则执行查询逻辑
        if (isHealthQuery) {
            /*
            传统的sql查询逻辑这里省略,假设成功得到实体healthData.....
			*/
            // 将查询到的硬核数据放入 State
            result.put("health_data", healthData);
        }

        return result;
    }
}

import com.alibaba.cloud.ai.graph.OverAllState;
import com.alibaba.cloud.ai.graph.action.NodeAction;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * 节点3:最终回复生成节点
 * 结合用户的原始问题和上一步查出的真实健康数据,让大模型生成带有情感关怀的回复。
 */
@Component
public class GenerateReplyNode implements NodeAction {
	//构建聊天模块
    private final ChatClient chatClient;

    public GenerateReplyNode(ChatClient.Builder builder) {
        this.chatClient = builder.build();
    }

    @Override
    public Map<String, Object> apply(OverAllState state) {
        Map<String, Object> result = new HashMap<>();

   		// 从 State 中获取是否是健康查询的标志位
        boolean isHealthQuery = state.value("is_health_query").map(o -> (Boolean) o).orElse(false);

        String answer;
        
        if (isHealthQuery) {
        	// 从 State 中获取健康数据
            String healthData = state.value("health_data").map(Object::toString).orElse("");

            // 组装专门用于回复生成的 Prompt
            String prompt = String.format(
                    "你是一个养老院的专属健康助手,语气要温柔、关切、专业。\n" +
                            "请根据以下由系统查询到的老人真实健康数据,回答家属的提问。\n\n" +
                            "家属提问:%s\n" +
                            "健康数据:%s\n\n" +
                            "请直接给出回复,不要说多余的废话,不要暴露系统查询的底层细节。",
                    query, healthData);
            answer = chatClient.prompt().user(prompt).call().content();
        } else {
            // 如果不是健康查询,退化为普通聊天(例如只是问好)
            answer = chatClient.prompt().user(query).call().content();
        }

        // 将最终答案写入 State
        result.put("final_reply", answer);
        return result;
    }
}

至此一个查询健康功能实现了,参考案例可以写出通知/挂号功能在节点2纯java逻辑中实现

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐