Spring AI Alibaba 1.x 系列【46】Graph 工作流编排指南
·
1. 前言
Spring AI Alibaba Graph 可以改变您构建智能代理的思维方式。使用 Graph 构建代理时,首先把业务流程分解为节点(nodes)离散步骤;然后定义每个节点的决策与流转关系;最后通过共享状态(state)串联所有节点,每个节点均可读取、写入状态。
本教程以客服邮件处理代理为例,完整讲解 Graph 工作流编排落地流程。
2. 业务流程需求
2.1 能力要求
邮件处理 AI 智能体需完成以下能力:
- 读取客户邮件
- 按紧急程度、业务主题做意图分类
- 检索知识库文档解答问题
- 提交
Bug工单跟踪 - 复杂/高优问题上报人工客服
- 按需生成邮件回复、安排后续跟进
2.2 典型业务场景
- 简单产品问题:
如何重置我的密码? Bug报告:导出功能选择 PDF 格式时程序崩溃- 紧急账单问题:
订阅被重复扣费! - 功能请求:
APP 能否增加暗黑模式? - 复杂技术问题:
API 集成间歇性 504 超时错误
3. 流程编排
编排核心五步法:
- 把业务流程拆解为离散节点步骤
- 定义每个节点的职责、入参、上下文
- 设计全局共享
State状态结构与更新策略 - 逐个实现自定义
Node节点,完善异常处理 - 组装
StateGraph,配置边、条件路由、人工中断与持久化
3.1 步骤一:工作流拆解为离散节点
首先识别流程中的不同步骤。每个步骤将成为一个节点(执行特定任务的函数)。然后勾画这些步骤如何相互连接。
流程链路:

各节点职责说明:
| 节点 | 核心职责 |
|---|---|
| 读取邮件 | 提取、解析原始邮件内容与发件人信息 |
| 分类意图 | LLM 分析邮件紧急度、业务主题,决定流程路由 |
| 文档搜索 | 检索产品知识库,获取答疑参考资料 |
| Bug 跟踪 | 在工单系统创建/更新问题票据 |
| 起草回复 | 基于分类、文档、客户信息生成邮件草稿 |
| 人工审核 | 高优/复杂问题流转人工审批编辑 |
| 发送回复 | 最终邮件内容正式下发给客户 |
提示:
- 决策型节点:分类意图、起草回复、人工审核,自主决定下一跳节点;
- 串行固定节点:读取邮件→分类意图、文档搜索→起草回复,路径固定无分支。
3.2 步骤二:定义各节点类型与上下文
对于图中的每个节点,确定它代表什么类型的操作以及它需要什么上下文才能正常工作。
3.2.1 LLM 推理类节点
适用于文本理解、分类、生成、决策推理场景。
分类意图节点:
- 静态上下文:分类枚举、紧急度定义、输出格式规范
- 动态上下文:邮件正文、发件人信息
- 输出结果:结构化分类对象,用于流程路由
起草回复节点:
- 静态上下文:回复语气、公司规范、邮件模板
- 动态上下文:分类结果、知识库检索结果、客户历史信息
- 输出结果:可审核的专业邮件草稿
3.2.2 数据检索类节点
外部数据源查询、接口调用场景。
- 文档搜索节点:基于意图+主题构建查询词,异常容错、结果缓存可扩展
- 客户历史查询节点:通过客户邮箱/ID 拉取档案,异常降级兜底
3.2.3 外部操作类节点
执行外部业务动作。
- 发送回复节点:审批完成后触发邮件下发,不做缓存、保证幂等
Bug跟踪节点:Bug类意图自动创建工单,返回工单编号写入状态
3.2.4 人工干预类节点
需要人工审批、编辑、补充信息场景。
- 人工审核节点:携带原始邮件、草稿、紧急度等上下文
- 触发条件:高紧急度、复杂问题、回复质量存疑
3.3 步骤三:设计全局共享 State 与更新策略
设计原则:
- 跨步骤需要复用的数据,存入
State; - 可由现有字段计算派生的数据,不重复存储;
State只存原始结构化数据,不存格式化提示文案、模板;- 提示文本在节点内按需动态拼接,解耦状态与
prompt。
需持久化字段:
- 原始邮件内容、发件人、邮件
ID - 邮件分类结构化结果
- 知识库搜索结果、客户档案数据
- 邮件草稿、审核数据
- 流程日志、状态标识、下一跳节点标识
状态实体与键策略定义:
// 邮件分类结构化实体
public static class EmailClassification {
private String intent; // question / bug / billing / feature / complex
private String urgency; // low / medium / high / critical
private String topic;
private String summary;
public EmailClassification() {}
public EmailClassification(String intent, String urgency, String topic, String summary) {
this.intent = intent;
this.urgency = urgency;
this.topic = topic;
this.summary = summary;
}
// getter & setter
public String getIntent() { return intent; }
public void setIntent(String intent) { this.intent = intent; }
public String getUrgency() { return urgency; }
public void setUrgency(String urgency) { this.urgency = urgency; }
public String getTopic() { return topic; }
public void setTopic(String topic) { this.topic = topic; }
public String getSummary() { return summary; }
public void setSummary(String summary) { this.summary = summary; }
@Override
public String toString() {
return String.format("EmailClassification{intent='%s', urgency='%s', topic='%s', summary='%s'}",
intent, urgency, topic, summary);
}
}
// 配置State各字段更新策略
public static KeyStrategyFactory createKeyStrategyFactory() {
return () -> {
HashMap<String, KeyStrategy> strategies = new HashMap<>();
strategies.put("email_content", new ReplaceStrategy());
strategies.put("sender_email", new ReplaceStrategy());
strategies.put("email_id", new ReplaceStrategy());
strategies.put("classification", new ReplaceStrategy());
strategies.put("search_results", new ReplaceStrategy());
strategies.put("customer_history", new ReplaceStrategy());
strategies.put("draft_response", new ReplaceStrategy());
strategies.put("messages", new AppendStrategy());
strategies.put("next_node", new ReplaceStrategy());
strategies.put("status", new ReplaceStrategy());
strategies.put("review_data", new ReplaceStrategy());
return strategies;
};
}
3.4 步骤四:自定义 Node 节点开发与异常处理
3.4.1 异常处理分类策略
| 错误类型 | 责任方 | 处理策略 | 适用场景 |
|---|---|---|---|
| 瞬时错误(网络/限流) | 系统自动 | 重试机制 | 临时可自愈故障 |
| LLM 可恢复错误(解析/工具调用失败) | LLM 自愈 | 错误写入状态,流程回环重试 | LLM 可感知错误并调整逻辑 |
| 用户可修复错误(缺失参数/信息不明) | 人工介入 | interruptBefore 暂停流程 |
需用户补充信息、审批确认 |
| 意外未知错误 | 开发调试 | 直接抛出异常冒泡 | 不可预知、需线下排查问题 |
3.4.2 典型异常处理示例
LLM 可恢复错误示例:
import com.alibaba.cloud.ai.graph.action.NodeAction;
import com.alibaba.cloud.ai.graph.OverAllState;
import java.util.Map;
public class ExecuteToolNode implements NodeAction {
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
try {
String toolCall = state.value("tool_call")
.map(v -> (String) v)
.orElse("");
String result = runTool(toolCall);
return Map.of("tool_result", result, "next_node", "agent");
} catch (ToolException e) {
// 错误写入状态,回流给LLM重试
return Map.of("tool_result", "Tool error: " + e.getMessage(), "next_node", "agent");
}
}
private String runTool(String toolCall) { return ""; }
}
用户可修复错误示例:
import com.alibaba.cloud.ai.graph.action.NodeAction;
import com.alibaba.cloud.ai.graph.OverAllState;
import java.util.Map;
public class LookupCustomerHistory implements NodeAction {
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
String customerId = state.value("customer_id")
.map(v -> (String) v)
.orElse(null);
if (customerId == null) {
// 标记状态,框架配置interruptBefore实现流程暂停
return Map.of(
"status", "需要客户ID",
"message", "请提供客户账户ID以查询订阅历史"
);
}
Map<String, Object> customerData = fetchCustomerHistory(customerId);
return Map.of("customer_history", customerData, "next_node", "draft_response");
}
private Map<String, Object> fetchCustomerHistory(String customerId) {
return Map.of("tier", "premium", "since", "2020-01-01");
}
}
意外错误直接冒泡示例:
import com.alibaba.cloud.ai.graph.action.NodeAction;
import com.alibaba.cloud.ai.graph.OverAllState;
import java.util.Map;
public class SendReplyNodeExample implements NodeAction {
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
// 缺失参数直接抛出,不强行捕获
String response = state.value("draft_response")
.map(v -> (String) v)
.orElseThrow(() -> new IllegalStateException("No draft response"));
emailService.send(response);
return Map.of("status", "sent");
}
private final EmailService emailService = new EmailService();
static class EmailService { public void send(String s) {} }
}
3.4.3 业务节点完整实现
读取邮件 & 意图分类节点
import com.alibaba.cloud.ai.graph.action.NodeAction;
import com.alibaba.cloud.ai.graph.OverAllState;
import org.springframework.ai.chat.client.ChatClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
private static final Logger log = LoggerFactory.getLogger(QuickStartExample.class);
// 读取邮件节点
public static class ReadEmailNode implements NodeAction {
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
String emailContent = state.value("email_content")
.map(v -> (String) v)
.orElse("");
log.info("Processing email: {}", emailContent);
List<String> messages = new ArrayList<>();
messages.add("Processing email: " + emailContent);
return Map.of("messages", messages);
}
}
// 意图分类节点
public static class ClassifyIntentNode implements NodeAction {
private final ChatClient chatClient;
public ClassifyIntentNode(ChatClient.Builder chatClientBuilder) {
this.chatClient = chatClientBuilder.build();
}
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
String emailContent = state.value("email_content")
.map(v -> (String) v)
.orElseThrow(() -> new IllegalStateException("No email content"));
String senderEmail = state.value("sender_email")
.map(v -> (String) v)
.orElse("unknown");
// 节点内动态拼接Prompt,不存入State
String classificationPrompt = String.format("""
分析这封客户邮件并进行分类:
邮件: %s
发件人: %s
意图可选: question, bug, billing, feature, complex
紧急度可选: low, medium, high, critical
以JSON格式返回: {"intent":"","urgency":"","topic":"","summary":""}
""", emailContent, senderEmail);
String response = chatClient.prompt()
.user(classificationPrompt)
.call()
.content();
EmailClassification classification = parseClassification(response);
String nextNode = routeByClassification(classification);
return Map.of("classification", classification, "next_node", nextNode);
}
// 路由分支判断
private String routeByClassification(EmailClassification classification) {
if ("billing".equals(classification.getIntent()) || "critical".equals(classification.getUrgency())) {
return "human_review";
} else if (List.of("question", "feature").contains(classification.getIntent())) {
return "search_documentation";
} else if ("bug".equals(classification.getIntent())) {
return "bug_tracking";
}
return "draft_response";
}
// 简易JSON正则解析
private EmailClassification parseClassification(String jsonResponse) {
EmailClassification classification = new EmailClassification();
Pattern intentPattern = Pattern.compile("\"intent\"\\s*:\\s*\"([^\"]+)\"");
Pattern urgencyPattern = Pattern.compile("\"urgency\"\\s*:\\s*\"([^\"]+)\"");
Pattern topicPattern = Pattern.compile("\"topic\"\\s*:\\s*\"([^\"]+)\"");
Pattern summaryPattern = Pattern.compile("\"summary\"\\s*:\\s*\"([^\"]+)\"");
Matcher matcher = intentPattern.matcher(jsonResponse);
if (matcher.find()) classification.setIntent(matcher.group(1));
matcher = urgencyPattern.matcher(jsonResponse);
if (matcher.find()) classification.setUrgency(matcher.group(1));
matcher = topicPattern.matcher(jsonResponse);
if (matcher.find()) classification.setTopic(matcher.group(1));
matcher = summaryPattern.matcher(jsonResponse);
if (matcher.find()) classification.setSummary(matcher.group(1));
// 默认兜底
if (classification.getIntent() == null) classification.setIntent("question");
if (classification.getUrgency() == null) classification.setUrgency("medium");
if (classification.getTopic() == null) classification.setTopic("general");
if (classification.getSummary() == null) classification.setSummary("需要处理的客户邮件");
return classification;
}
}
文档搜索 & Bug 跟踪节点
import com.alibaba.cloud.ai.graph.action.NodeAction;
import com.alibaba.cloud.ai.graph.OverAllState;
import java.util.Map;
import java.util.List;
// 知识库文档搜索节点
public static class SearchDocumentationNode implements NodeAction {
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
EmailClassification classification = state.value("classification")
.map(v -> (EmailClassification) v)
.orElse(new EmailClassification());
String query = classification.getIntent() + " " + classification.getTopic();
try {
List<String> searchResults = List.of(
"通过设置 > 安全 > 更改密码重置密码",
"密码必须至少12个字符",
"包含大小写字母、数字和特殊符号"
);
log.info("Searching documentation for: {}", query);
return Map.of("search_results", searchResults, "next_node", "draft_response");
} catch (Exception e) {
log.warn("Search error: {}", e.getMessage());
List<String> errorResult = List.of("搜索暂时不可用: " + e.getMessage());
return Map.of("search_results", errorResult, "next_node", "draft_response");
}
}
}
// Bug工单跟踪节点
public static class BugTrackingNode implements NodeAction {
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
String ticketId = "BUG-12345";
log.info("Created bug ticket: {}", ticketId);
return Map.of(
"search_results", List.of("已创建Bug票据 " + ticketId),
"current_step", "bug_tracked",
"next_node", "draft_response"
);
}
}
回复起草 & 人工审核 & 发送节点
import com.alibaba.cloud.ai.graph.action.NodeAction;
import com.alibaba.cloud.ai.graph.OverAllState;
import org.springframework.ai.chat.client.ChatClient;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import java.util.Optional;
import java.util.stream.Collectors;
// 邮件草稿生成节点
public static class DraftResponseNode implements NodeAction {
private final ChatClient chatClient;
public DraftResponseNode(ChatClient.Builder chatClientBuilder) {
this.chatClient = chatClientBuilder.build();
}
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
EmailClassification classification = state.value("classification")
.map(v -> (EmailClassification) v)
.orElse(new EmailClassification());
String emailContent = state.value("email_content")
.map(v -> (String) v)
.orElse("");
List<String> contextSections = new ArrayList<>();
Optional<List<String>> searchResults = state.value("search_results").map(v -> (List<String>) v);
if (searchResults.isPresent()) {
String formattedDocs = searchResults.get().stream()
.map(doc -> "- " + doc)
.collect(Collectors.joining("\n"));
contextSections.add("相关文档:\n" + formattedDocs);
}
Optional<Map<String, Object>> customerHistory = state.value("customer_history").map(v -> (Map<String, Object>) v);
if (customerHistory.isPresent()) {
Map<String, Object> history = customerHistory.get();
contextSections.add("客户等级: " + history.getOrDefault("tier", "standard"));
}
String draftPrompt = String.format("""
为这封客户邮件起草专业回复:
%s
邮件意图: %s
紧急程度: %s
%s
要求:专业友好、针对性解答、合理引用参考文档
""",
emailContent,
classification.getIntent(),
classification.getUrgency(),
String.join("\n", contextSections)
);
String response = chatClient.prompt().user(draftPrompt).call().content();
boolean needsReview = List.of("high", "critical").contains(classification.getUrgency())
|| "complex".equals(classification.getIntent());
String nextNode = needsReview ? "human_review" : "send_reply";
return Map.of("draft_response", response, "next_node", nextNode);
}
}
// 人工审核节点
public static class HumanReviewNode implements NodeAction {
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
EmailClassification classification = state.value("classification")
.map(v -> (EmailClassification) v)
.orElse(new EmailClassification());
Map<String, Object> reviewData = Map.of(
"email_id", state.value("email_id").map(v -> (String) v).orElse(""),
"original_email", state.value("email_content").map(v -> (String) v).orElse(""),
"draft_response", state.value("draft_response").map(v -> (String) v).orElse(""),
"urgency", classification.getUrgency(),
"intent", classification.getIntent(),
"action", "请审核并批准/编辑此响应"
);
log.info("Waiting for human review: {}", reviewData);
return Map.of(
"review_data", reviewData,
"status", "waiting_for_review",
"next_node", "send_reply"
);
}
}
// 发送回复节点
public static class SendReplyNode implements NodeAction {
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
String draftResponse = state.value("draft_response")
.map(v -> (String) v)
.orElse("");
log.info("Sending reply: {}...",
draftResponse.length() > 100 ? draftResponse.substring(0, 100) : draftResponse);
return Map.of("status", "sent");
}
}
3.5 步骤五:组装 StateGraph 工作流
构建 Graph 并配置人工中断、状态持久化:
import com.alibaba.cloud.ai.graph.StateGraph;
import com.alibaba.cloud.ai.graph.CompiledGraph;
import com.alibaba.cloud.ai.graph.checkpoint.config.SaverConfig;
import com.alibaba.cloud.ai.graph.checkpoint.savers.MemorySaver;
import com.alibaba.cloud.ai.graph.CompileConfig;
import com.alibaba.cloud.ai.graph.exception.GraphStateException;
import org.springframework.ai.chat.model.ChatModel;
import org.springframework.ai.chat.client.ChatClient;
import static com.alibaba.cloud.ai.graph.StateGraph.END;
import static com.alibaba.cloud.ai.graph.StateGraph.START;
import static com.alibaba.cloud.ai.graph.action.AsyncEdgeAction.edge_async;
import static com.alibaba.cloud.ai.graph.action.AsyncNodeAction.node_async;
import java.util.Map;
public static CompiledGraph createEmailAgentGraph(ChatModel chatModel) throws GraphStateException {
ChatClient.Builder chatClientBuilder = ChatClient.builder(chatModel);
// 初始化所有节点
var readEmail = node_async(new ReadEmailNode());
var classifyIntent = node_async(new ClassifyIntentNode(chatClientBuilder));
var searchDocumentation = node_async(new SearchDocumentationNode());
var bugTracking = node_async(new BugTrackingNode());
var draftResponse = node_async(new DraftResponseNode(chatClientBuilder));
var humanReview = node_async(new HumanReviewNode());
var sendReply = node_async(new SendReplyNode());
// 构建图
StateGraph workflow = new StateGraph(createKeyStrategyFactory())
.addNode("read_email", readEmail)
.addNode("classify_intent", classifyIntent)
.addNode("search_documentation", searchDocumentation)
.addNode("bug_tracking", bugTracking)
.addNode("draft_response", draftResponse)
.addNode("human_review", humanReview)
.addNode("send_reply", sendReply);
// 基础固定边
workflow.addEdge(START, "read_email");
workflow.addEdge("read_email", "classify_intent");
workflow.addEdge("send_reply", END);
workflow.addEdge("search_documentation", "draft_response");
workflow.addEdge("bug_tracking", "draft_response");
// 条件路由边
workflow.addConditionalEdges("classify_intent",
edge_async(state -> (String) state.value("next_node").orElse("draft_response")),
Map.of(
"search_documentation", "search_documentation",
"bug_tracking", "bug_tracking",
"human_review", "human_review",
"draft_response", "draft_response"
));
workflow.addConditionalEdges("draft_response",
edge_async(state -> (String) state.value("next_node").orElse("send_reply")),
Map.of("human_review", "human_review", "send_reply", "send_reply"));
workflow.addConditionalEdges("human_review",
edge_async(state -> (String) state.value("next_node").orElse("send_reply")),
Map.of("send_reply", "send_reply"));
// 配置内存持久化 + 人工审核前中断
var memory = new MemorySaver();
var compileConfig = CompileConfig.builder()
.saverConfig(SaverConfig.builder().register(memory).build())
.interruptBefore("human_review")
.build();
return workflow.compile(compileConfig);
}
4. 工作流测试
import com.alibaba.cloud.ai.graph.CompiledGraph;
import com.alibaba.cloud.ai.graph.RunnableConfig;
import com.alibaba.cloud.ai.graph.NodeOutput;
import reactor.core.publisher.Flux;
import java.util.ArrayList;
import java.util.Map;
public static void testBillingIssue(CompiledGraph app) throws Exception {
log.info("=== 测试紧急账单问题 ===");
// 初始入参状态
Map<String, Object> initialState = Map.of(
"email_content", "我的订阅被收费两次了!这很紧急!",
"sender_email", "customer@example.com",
"email_id", "email_123",
"messages", new ArrayList<String>()
);
// 指定线程ID,实现状态持久化隔离
var config = RunnableConfig.builder()
.threadId("customer_123")
.build();
// 流式执行,自动在人工审核节点中断
Flux<NodeOutput> stream = app.stream(initialState, config);
stream
.doOnNext(output -> log.info("节点输出: {}", output))
.doOnError(error -> log.error("执行错误: {}", error.getMessage()))
.doOnComplete(() -> log.info("流完成"))
.blockLast();
// 获取中断后的当前状态
var currentState = app.getState(config);
Map<String, Object> stateData = currentState.state().data();
String draftResponse = (String) stateData.get("draft_response");
if (draftResponse != null) {
log.info("Draft ready for review: {}...",
draftResponse.length() > 100 ? draftResponse.substring(0, 100) : draftResponse);
}
// 人工审批编辑,更新状态并恢复流程
var updatedConfig = app.updateState(config, Map.of(
"approved", true,
"edited_response", "我们对重复收费深表歉意,已立即为您启动退款流程..."
), null);
// 继续执行剩余流程
app.stream(null, updatedConfig)
.doOnNext(output -> log.info("节点输出: {}", output))
.doOnError(error -> log.error("执行错误: {}", error.getMessage()))
.doOnComplete(() -> log.info("流完成"))
.blockLast();
// 查看最终结果
var finalState = app.getState(updatedConfig);
String status = (String) finalState.state().data().get("status");
log.info("Email sent successfully! Status: {}", status);
}
5. 核心总结
- 流程拆解:把复杂业务拆分为单一职责的离散
Node,便于调试、复用、断点续跑; - 状态设计:
State只存原始结构化数据,Prompt在节点内动态拼接,解耦更灵活; - 节点即函数:
Node接收状态、执行业务、返回状态更新与路由标识; - 异常工程化:区分瞬时/
LLM/用户/未知四类错误,差异化处理; - 人工介入原生支持:通过
interruptBefore+ 检查点持久化,实现流程暂停、状态保存、随时恢复; - 条件路由轻量化:节点内部决策下一跳,
Graph仅配置基础边与条件映射,结构简洁易维护。
6. 高级扩展方向
- 人工介入进阶:工具调用审批、批量审批、多级审核流程;
- 子图编排:复杂逻辑抽离为子图,嵌套复用;
LLM流式输出:节点实时流式返回内容;- 函数工具集成:对接网络搜索、数据库、第三方
API; - 节点重试与降级:框架级重试、故障自动降级;
- 状态时光旅行:基于检查点回溯历史流程状态;
- 并行分支:互不依赖节点并行执行,提升流程吞吐量。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)