Spring AI Alibaba学习记录(工作流/Graph)
一、Workflow 的直观类比
这里不再展开底层定义(如 states、nodes、edges 的标准术语),先用一个不恰当但好理解的比喻:
- 把食物在人体中的经历看作一个工作流过程。
- states 类比为“食物在各阶段的状态”。
- edges 类比为“预先规划好的流转路线”。
- nodes 类比为“执行加工的器官”。
一个简化路线(edges)可以理解为:
嘴 -> 胃 -> 小肠 -> 膀胱/大肠 -> 排出体外
对应nodes是:嘴、胃、小肠、膀胱、大肠等。
例如“吃了一个苹果”后,可以这样理解:
- 苹果先进入嘴(节点)被咀嚼加工。
- 判断当前结果是否达到目标状态(类比最终排泄状态),若否,沿着边继续流转。
- 进入胃继续消化加工,再判断,若否继续。
- 进入小肠进行吸收,再判断,若否继续。
- 进入大肠/膀胱进行后续加工,判断为是时,执行“排出体外”。
这个比喻只是帮助理解。实际系统里会更复杂,常见情况是某些 state 在流程中途就已经满足目标,直接提前返回,而不是一定走完整条路径。
二、为什么要在单 Agent 项目中引入 Workflow
场景背景
有一个智能养老项目,对家属开放能力,方便家属随时了解老人情况。
家属提问:
“帮我查询我爸近几天的健康情况。”
单体 Agent 的典型做法是识别到对应工具后直接调用,例如:
@Tool(description = "家属查询老人健康记录。仅支持已登录家属身份查询其绑定的老人。")
public String queryElderHealth(Long elderlyId,String startDate,String endDate) {
//业务逻辑...
}
这类单轮需求没有问题,但需求一旦叠加动作,就会迅速暴露单体流程的扩展问题。
需求逐步升级时,单体 Agent 的问题
- “查询我爸近几天的健康情况,如果出现问题通知我。”
- 需要改代码接入通知模块。
- “查询我爸近几天的健康情况,如果出现问题通知我,并尝试帮我在医院挂号。”
- 需要继续改代码接入挂号模块。
- “查询我爸近几天的健康情况,如果出现问题不用通知我,直接给我爸挂号。”
- 逻辑分支再变化,又要继续改动流程。
最终会出现:
- 分支组合越来越多(只通知、只挂号、通知+挂号、不同触发条件)。
- 业务策略变化频繁,主流程代码频繁修改。
- 功能可复用性差,维护成本不断增加。
三、Workflow 的处理方式
采用 Workflow 后,可以由 Agent 先做裁决,再按图执行节点:
- 查询节点:先获取老人近几天健康数据。
- 风险判断节点:判断是否“出现问题”。
- 通知节点:只有在用户允许通知时才执行通知服务。
- 挂号节点:根据用户意图和判断结果决定是否尝试挂号。
- 汇总节点:统一整合执行结果并回复家属。
核心区别是:
- 业务能力拆成可组合节点。
- 路由由 edges 控制,按条件走不同分支。
- 节点是否执行由状态与策略决定,而不是写死在一段线性代码里。
总之在这个智能养老案例里,Workflow 的价值不是“替代查询”,而是“组织复杂动作”:
- 同一套流程可同时支持“只通知 / 只挂号 / 通知+挂号”等组合。
- 新需求多数通过新增节点或调整连线实现,不必反复改主干代码。
- 流程具备更强的可扩展性、可维护性与策略灵活性。
四.基础使用
4.1首先还是万年不变的Controller层实现接口
import cn.lc.sunnyside.Auth.FamilyLoginContextHolder;
import cn.lc.sunnyside.Workflow.Health.HealthWorkflowService;
import cn.lc.sunnyside.Workflow.SunnySideWorkflowService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import java.util.Map;
/**
* 工作流测试控制器
*/
@RestController
@RequestMapping("/api/workflow")
public class WorkflowController {
//这里是构造函数注入(@Autowired也可以)
private final HealthWorkflowService healthWorkflowService;
public WorkflowController(HealthWorkflowService healthWorkflowService) {
this.healthWorkflowService = healthWorkflowService;
}
/**
* 体验家属健康查询专属工作流
* @param query 用户输入,例如:“查一下我爸昨天的健康状况”
* @param phone 手动传入的手机号(测试用),如果不传则尝试从登录上下文中获取
* @return 工作流执行结果
*/
@GetMapping("/health-chat")
public String healthWorkflowChat(
@RequestParam(name = "query", defaultValue = "查一下我爸昨天的健康状况") String query,
@RequestParam(name = "phone", required = false) String phone) {
String familyPhone = phone;
// 如果未手动传入手机号,尝试从全局登录态中获取
if (familyPhone == null || familyPhone.isBlank()) {
familyPhone = FamilyLoginContextHolder.get().map(ctx -> ctx.phone()).orElse(null);
}
return healthWorkflowService.executeWorkflow(query, familyPhone);
}
}
4.2服务层的实现
主要是定义工作流定义节点并执行
import com.alibaba.cloud.ai.graph.CompiledGraph;
import com.alibaba.cloud.ai.graph.OverAllState;
import com.alibaba.cloud.ai.graph.StateGraph;
import com.alibaba.cloud.ai.graph.action.AsyncNodeAction;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@Service
public class HealthWorkflowService {
//就根我们使用redis一样引用工作流的客户端
private final CompiledGraph workflow;
//构造函数定义工作流
public HealthWorkflowService(AnalyzeIntentNode analyzeNode,
FetchHealthDataNode fetchNode,
GenerateReplyNode replyNode) {
// 1. 初始化图结构
StateGraph graph = new StateGraph();
try {
// 2. 注册节点
graph.addNode("analyzeNode", AsyncNodeAction.node_async(analyzeNode));
graph.addNode("fetchNode", AsyncNodeAction.node_async(fetchNode));
graph.addNode("replyNode", AsyncNodeAction.node_async(replyNode));
// 3. 编排边(定义执行的先后顺序)
// 流转:START -> analyzeNode -> fetchNode -> replyNode -> END
graph.addEdge(StateGraph.START, "analyzeNode");
graph.addEdge("analyzeNode", "fetchNode");
graph.addEdge("fetchNode", "replyNode");
graph.addEdge("replyNode", StateGraph.END);
// 4. 编译工作流
this.workflow = graph.compile();
} catch (Exception e) {
throw new RuntimeException("构建健康查询工作流失败", e);
}
}
/**
* 触发并执行工作流
*
* @param query 用户输入
* @param familyPhone 家属手机号(作为上下文传入 State)
* @return 最终回复
*/
public String executeWorkflow(String query, String familyPhone) {
// 构造初始State(也就是我们之前提到的食物)
Map<String, Object> inputs = new HashMap<>();
inputs.put("query", query);
// 此时就开始执行工作流了,建议下面的返回结果代码暂时跳过,直接看下一部分的代码段
Optional<OverAllState> stateOpt = workflow.invoke(inputs);
if (stateOpt.isPresent()) {
// 从返回的 OverAllState 中提取输出字段 "final_reply"
OverAllState state = stateOpt.get();
Optional<Object> finalReplyOpt = state.value("final_reply");
if (finalReplyOpt.isPresent()) {
Object finalReplyObj = finalReplyOpt.get();
return finalReplyObj.toString();
} else {
return "工作流执行异常,无返回结果。";
}
}
return "工作流执行失败。";
}
}
4.3数据的加工流程
上面的代码我们知道工作流是
analyzeNode.java -> fetchNode.java -> replyNode.java -> END
下面我会提供这三个文件(分析用户意图->是否执行业务代码->整理返回结果)个人理解均在注释中记得看注释
import com.alibaba.cloud.ai.graph.OverAllState;
import com.alibaba.cloud.ai.graph.action.NodeAction;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;
/**
* 节点1:意图分析节点
* 专门使用大模型分析用户的输入,提取是否需要查询健康数据以及目标日期
*/
@Component
public class AnalyzeIntentNode implements NodeAction {
//内置一个agent判断该节点是否加工
private final ChatClient chatClient;
private final ObjectMapper objectMapper = new ObjectMapper();
public AnalyzeIntentNode(ChatClient.Builder builder) {
this.chatClient = builder.build();
}
@Override
public Map<String, Object> apply(OverAllState state){
// 从状态中获取用户输入
String query = state.value("query").map(Object::toString).orElse("");
Map<String, Object> result = new HashMap<>();
// 构造专用的 Prompt 让大模型输出 JSON 格式
String prompt = String.format(
"请分析用户的输入,判断是否在询问老人的健康状况。\n" +
"请严格返回如下JSON格式,不要包含任何Markdown标记或其他说明文本:\n" +
"{\"is_health_query\": true/false, \"target_date\": \"yyyy-MM-dd\"}\n" +
"如果用户没有明确说明日期,默认使用今天(%s)。\n" +
"用户输入:%s",
LocalDate.now().toString(), query);
// 调用 ChatClient 生成回复
String jsonResponse = chatClient.prompt()
.user(prompt)
.call()
.content();
try {
// 解析Agent返回的JSON字段
JsonNode isHealthQueryNode = jsonNode.get("is_health_query");
// 如果JSON中没有is_health_query字段,默认设为 false(agent判断该问题不是询问健康方面的)
boolean isHealthQuery = isHealthQueryNode != null && isHealthQueryNode.asBoolean();
// 将提取出来的结构化参数写入 State 传递给下一个节点
result.put("is_health_query", isHealthQuery);
} catch (Exception e) {
// 解析失败时,默认降级为非健康查询(走普通聊天流)
result.put("is_health_query", false);
}
return result;
}
}
import cn.lc.sunnyside.Service.FamilyAccessService;
import cn.lc.sunnyside.Service.HealthRecordService;
import com.alibaba.cloud.ai.graph.OverAllState;
import com.alibaba.cloud.ai.graph.action.NodeAction;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;
/**
* 节点2:健康数据查询节点(纯 Java 业务逻辑,不涉及大模型)
* 根据上一个节点解析出的参数,执行严格的数据库查询
*/
@Component
public class FetchHealthDataNode implements NodeAction {
@Override
public Map<String, Object> apply(OverAllState state) {
Map<String, Object> result = new HashMap<>();
boolean isHealthQuery = state.value("is_health_query").map(o -> (Boolean) o).orElse(false);
// 如果上一个节点判断为【查健康】,则执行查询逻辑
if (isHealthQuery) {
/*
传统的sql查询逻辑这里省略,假设成功得到实体healthData.....
*/
// 将查询到的硬核数据放入 State
result.put("health_data", healthData);
}
return result;
}
}
import com.alibaba.cloud.ai.graph.OverAllState;
import com.alibaba.cloud.ai.graph.action.NodeAction;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* 节点3:最终回复生成节点
* 结合用户的原始问题和上一步查出的真实健康数据,让大模型生成带有情感关怀的回复。
*/
@Component
public class GenerateReplyNode implements NodeAction {
//构建聊天模块
private final ChatClient chatClient;
public GenerateReplyNode(ChatClient.Builder builder) {
this.chatClient = builder.build();
}
@Override
public Map<String, Object> apply(OverAllState state) {
Map<String, Object> result = new HashMap<>();
// 从 State 中获取是否是健康查询的标志位
boolean isHealthQuery = state.value("is_health_query").map(o -> (Boolean) o).orElse(false);
String answer;
if (isHealthQuery) {
// 从 State 中获取健康数据
String healthData = state.value("health_data").map(Object::toString).orElse("");
// 组装专门用于回复生成的 Prompt
String prompt = String.format(
"你是一个养老院的专属健康助手,语气要温柔、关切、专业。\n" +
"请根据以下由系统查询到的老人真实健康数据,回答家属的提问。\n\n" +
"家属提问:%s\n" +
"健康数据:%s\n\n" +
"请直接给出回复,不要说多余的废话,不要暴露系统查询的底层细节。",
query, healthData);
answer = chatClient.prompt().user(prompt).call().content();
} else {
// 如果不是健康查询,退化为普通聊天(例如只是问好)
answer = chatClient.prompt().user(query).call().content();
}
// 将最终答案写入 State
result.put("final_reply", answer);
return result;
}
}
至此一个查询健康功能实现了,参考案例可以写出通知/挂号功能在节点2纯java逻辑中实现
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)