Spring AI 工作流引擎扩展 Human-in-the-Loop 人工审批功能完整实战
Spring AI 工作流引擎扩展 Human-in-the-Loop 人工审批功能完整实战
基于 JDK 原生并发工具实现零依赖的工作流暂停与恢复机制
📖 目录
一、项目背景与需求分析
1.1 原有项目概述
Spring AI 可视化编排实战:构建 LangGraph 风格的 YAML DSL 工作流引擎:
https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/161255641
本项目基于 Spring AI + Ollama 构建了一个类似 LangGraph 的 YAML DSL 工作流引擎,支持:
- ✅ 工具链自动调用(Tool)
- ✅ LLM 智能对话(LLM)
- ✅ 条件循环控制(While)
- ✅ 并行执行(Parallel)
- ✅ 变量传递与上下文管理
1.2 业务痛点
在真实业务场景中,某些关键决策点需要人工介入:
| 场景 | 问题 |
|---|---|
| 💰 大额支付 | 金额超过阈值时需人工审核 |
| 📝 内容发布 | AI 生成内容需人工审核后才能发布 |
| 🔐 敏感操作 | 删除、修改等关键操作需确认 |
| 👤 权限申请 | 用户申请权限需管理员审批 |
1.3 需求目标
扩展工作流引擎,新增 human_approval 节点类型:
- 工作流暂停: 执行到审批节点时自动挂起
- 等待审批: 外部通过 API 提交审批结果
- 恢复执行: 根据审批结果继续或终止
- 零额外依赖: 仅使用 JDK 自带并发工具
二、技术架构设计
2.1 核心组件
┌─────────────────────────────────────────────────────┐
│ AgentController │
│ /api/workflow/approval (异步启动工作流) │
└──────────────────┬──────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────┐
│ WorkflowEngine │
│ 执行工作流,遇到 human_approval 节点时暂停 │
└──────────────────┬──────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────┐
│ ApprovalManager │
│ - 管理审批请求(ConcurrentHashMap) │
│ - 阻塞等待(CountDownLatch) │
│ - 工作流实例状态跟踪 │
└──────┬───────────────────────────────┬──────────────┘
│ │
▼ ▼
┌──────────────┐ ┌────────────────────┐
│ 工作流线程 │ │ HTTP API 线程 │
│ (阻塞等待) │ │ (提交审批结果) │
└──────────────┘ └────────┬───────────┘
│
▼
┌────────────────────┐
│ ApprovalController │
│ /api/approval/submit│
└────────────────────┘
2.2 文件结构
src/main/java/com/badao/ai/
├── config/
│ └── ToolConfig.java # 新增 ApprovalManager Bean
├── controller/
│ ├── AgentController.java # 新增 /api/workflow/approval
│ └── ApprovalController.java # 新增:审批 API 控制器
├── workflow/
│ ├── ApprovalManager.java # 新增:审批管理器
│ ├── WorkflowEngine.java # 修改:新增 human_approval 节点
│ └── WorkflowDefinition.java # 修改:新增审批节点字段
└── tools/
├── WeatherTool.java
└── TranslateTool.java
src/main/resources/workflows/
├── weather_agent.yaml # 原有示例
├── content_review_workflow.yaml # 新增:内容审核工作流
└── payment_approval_workflow.yaml # 新增:支付审批工作流
三、环境准备与搭建
3.1 前置条件
| 组件 | 版本 | 说明 |
|---|---|---|
| JDK | 17+ | Java 运行环境 |
| Maven | 3.6+ | 项目构建工具 |
| Ollama | 最新版 | 本地大模型运行时 |
| 模型 | qwen2.5:7b-instruct | AI 对话模型 |
3.2 安装 Ollama
Windows:
# 下载安装包
# 访问 https://ollama.com/download
# 运行 OllamaSetup.exe
# 验证安装
ollama --version
Linux/Mac:
curl -fsSL https://ollama.com/install.sh | sh
3.3 下载模型
# 下载 qwen2.5 模型
ollama pull qwen2.5:7b-instruct
# 验证模型
ollama list
四、核心代码实现
4.1 扩展 WorkflowDefinition
文件: WorkflowDefinition.java
新增字段:
public static class Node {
// ... 原有字段
// human_approval 节点专用字段
private String approvalMessage; // 审批说明信息
private long timeoutSeconds = 300; // 审批超时时间(秒)
private String onReject; // 拒绝后跳转的节点ID
// getter/setter
public String getApprovalMessage() { return approvalMessage; }
public void setApprovalMessage(String approvalMessage) {
this.approvalMessage = approvalMessage;
}
// ...
}
完整代码:
package com.badao.ai.workflow;
import java.util.List;
public class WorkflowDefinition {
private String name;
private String start;
private List<Node> nodes;
private List<Edge> edges;
// getters & setters
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getStart() { return start; }
public void setStart(String start) { this.start = start; }
public List<Node> getNodes() { return nodes; }
public void setNodes(List<Node> nodes) { this.nodes = nodes; }
public List<Edge> getEdges() { return edges; }
public void setEdges(List<Edge> edges) { this.edges = edges; }
// ========== Node ==========
public static class Node {
private String id;
private String type;
private String tool;
private String input;
private String prompt;
private String condition;
private int maxIterations = 5;
private List<Node> body;
private List<ParallelBranch> branches; // 改为 ParallelBranch 列表
// human_approval 节点专用字段
private String approvalMessage; // 审批说明信息
private long timeoutSeconds = 300; // 审批超时时间(秒),默认5分钟
private String onReject; // 拒绝后跳转的节点ID,默认为 end
// getters & setters(请确保所有字段都有)
public String getId() { return id; }
public void setId(String id) { this.id = id; }
public String getType() { return type; }
public void setType(String type) { this.type = type; }
public String getTool() { return tool; }
public void setTool(String tool) { this.tool = tool; }
public String getInput() { return input; }
public void setInput(String input) { this.input = input; }
public String getPrompt() { return prompt; }
public void setPrompt(String prompt) { this.prompt = prompt; }
public String getCondition() { return condition; }
public void setCondition(String condition) { this.condition = condition; }
public int getMaxIterations() { return maxIterations; }
public void setMaxIterations(int maxIterations) { this.maxIterations = maxIterations; }
public List<Node> getBody() { return body; }
public void setBody(List<Node> body) { this.body = body; }
public List<ParallelBranch> getBranches() { return branches; }
public void setBranches(List<ParallelBranch> branches) { this.branches = branches; }
// human_approval 字段的 getter/setter
public String getApprovalMessage() { return approvalMessage; }
public void setApprovalMessage(String approvalMessage) { this.approvalMessage = approvalMessage; }
public long getTimeoutSeconds() { return timeoutSeconds; }
public void setTimeoutSeconds(long timeoutSeconds) { this.timeoutSeconds = timeoutSeconds; }
public String getOnReject() { return onReject; }
public void setOnReject(String onReject) { this.onReject = onReject; }
}
// ========== ParallelBranch ==========
public static class ParallelBranch {
private List<Node> nodes; // 分支内部的节点序列
public List<Node> getNodes() { return nodes; }
public void setNodes(List<Node> nodes) { this.nodes = nodes; }
}
// ========== Edge ==========
public static class Edge {
private String from;
private String to;
public String getFrom() { return from; }
public void setFrom(String from) { this.from = from; }
public String getTo() { return to; }
public void setTo(String to) { this.to = to; }
}
}
4.2 实现 ApprovalManager
文件: ApprovalManager.java
核心功能:
1. 审批请求类
public static class ApprovalRequest {
private final String workflowInstanceId;
private final String nodeId;
private final String message;
private final Map<String, Object> context;
private final CountDownLatch latch; // 并发控制核心
private volatile ApprovalResult result;
public ApprovalRequest(String instanceId, String nodeId,
String message, Map<String, Object> context) {
this.workflowInstanceId = instanceId;
this.nodeId = nodeId;
this.message = message;
this.context = context;
this.latch = new CountDownLatch(1); // 初始计数为1
this.result = null;
}
}
2. 请求审批(阻塞)
public ApprovalResult requestApproval(String instanceId, String nodeId,
String message, Map<String, Object> context,
long timeoutSeconds) throws InterruptedException {
ApprovalRequest request = new ApprovalRequest(instanceId, nodeId, message, context);
pendingApprovals.put(instanceId + ":" + nodeId, request);
// 更新工作流状态
WorkflowInstance instance = workflowInstances.get(instanceId);
if (instance != null) {
instance.setStatus("WAITING_APPROVAL");
instance.setCurrentNode(nodeId);
}
System.out.println("⏸️ 工作流 [" + instanceId + "] 在节点 [" + nodeId + "] 暂停,等待人工审批...");
// 阻塞等待审批结果(核心机制)
boolean completed = request.getLatch().await(timeoutSeconds, TimeUnit.SECONDS);
if (!completed) {
throw new RuntimeException("审批超时(" + timeoutSeconds + "秒)");
}
return request.getResult();
}
3. 提交审批(唤醒)
public boolean submitApproval(String instanceId, String nodeId, ApprovalResult result) {
String key = instanceId + ":" + nodeId;
ApprovalRequest request = pendingApprovals.get(key);
if (request == null) {
return false;
}
request.setResult(result);
request.getLatch().countDown(); // 唤醒阻塞的线程(核心机制)
pendingApprovals.remove(key);
// 更新工作流状态
WorkflowInstance instance = workflowInstances.get(instanceId);
if (instance != null) {
instance.setStatus(result.isApproved() ? "RUNNING" : "REJECTED");
}
return true;
}
完整代码:
package com.badao.ai.workflow;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* 审批管理器:管理工作流的暂停与恢复
* 使用 ConcurrentHashMap + CountDownLatch 实现线程安全的审批等待机制
*/
public class ApprovalManager {
/**
* 审批请求信息
*/
public static class ApprovalRequest {
private final String workflowInstanceId;
private final String nodeId;
private final String message;
private final Map<String, Object> context;
private final CountDownLatch latch;
private volatile ApprovalResult result;
public ApprovalRequest(String workflowInstanceId, String nodeId, String message, Map<String, Object> context) {
this.workflowInstanceId = workflowInstanceId;
this.nodeId = nodeId;
this.message = message;
this.context = context;
this.latch = new CountDownLatch(1);
this.result = null;
}
public String getWorkflowInstanceId() {
return workflowInstanceId;
}
public String getNodeId() {
return nodeId;
}
public String getMessage() {
return message;
}
public Map<String, Object> getContext() {
return context;
}
public CountDownLatch getLatch() {
return latch;
}
public ApprovalResult getResult() {
return result;
}
public void setResult(ApprovalResult result) {
this.result = result;
}
}
/**
* 审批结果
*/
public static class ApprovalResult {
private final boolean approved;
private final String comment;
public ApprovalResult(boolean approved, String comment) {
this.approved = approved;
this.comment = comment;
}
public boolean isApproved() {
return approved;
}
public String getComment() {
return comment;
}
public static ApprovalResult approve(String comment) {
return new ApprovalResult(true, comment);
}
public static ApprovalResult reject(String comment) {
return new ApprovalResult(false, comment);
}
}
/**
* 工作流实例状态
*/
public static class WorkflowInstance {
private final String instanceId;
private final String workflowPath;
private final Map<String, Object> context;
private volatile String status; // RUNNING, WAITING_APPROVAL, COMPLETED, REJECTED
private volatile String currentNode;
public WorkflowInstance(String instanceId, String workflowPath, Map<String, Object> context) {
this.instanceId = instanceId;
this.workflowPath = workflowPath;
this.context = context;
this.status = "RUNNING";
this.currentNode = null;
}
public String getInstanceId() {
return instanceId;
}
public String getWorkflowPath() {
return workflowPath;
}
public Map<String, Object> getContext() {
return context;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getCurrentNode() {
return currentNode;
}
public void setCurrentNode(String currentNode) {
this.currentNode = currentNode;
}
}
// 存储所有待审批的请求
private final Map<String, ApprovalRequest> pendingApprovals = new ConcurrentHashMap<>();
// 存储所有工作流实例
private final Map<String, WorkflowInstance> workflowInstances = new ConcurrentHashMap<>();
/**
* 创建工作流实例
*/
public WorkflowInstance createInstance(String instanceId, String workflowPath, Map<String, Object> context) {
WorkflowInstance instance = new WorkflowInstance(instanceId, workflowPath, context);
workflowInstances.put(instanceId, instance);
return instance;
}
/**
* 获取工作流实例
*/
public WorkflowInstance getInstance(String instanceId) {
return workflowInstances.get(instanceId);
}
/**
* 请求人工审批(阻塞当前线程)
*
* @param instanceId 工作流实例ID
* @param nodeId 当前节点ID
* @param message 审批说明
* @param context 当前上下文
* @param timeoutSeconds 超时时间(秒)
* @return 审批结果
*/
public ApprovalResult requestApproval(String instanceId, String nodeId,
String message, Map<String, Object> context,
long timeoutSeconds) throws InterruptedException {
ApprovalRequest request = new ApprovalRequest(instanceId, nodeId, message, context);
pendingApprovals.put(instanceId + ":" + nodeId, request);
// 更新工作流状态
WorkflowInstance instance = workflowInstances.get(instanceId);
if (instance != null) {
instance.setStatus("WAITING_APPROVAL");
instance.setCurrentNode(nodeId);
}
System.out.println("⏸️ 工作流 [" + instanceId + "] 在节点 [" + nodeId + "] 暂停,等待人工审批...");
System.out.println("📋 审批信息: " + message);
// 阻塞等待审批结果
boolean completed = request.getLatch().await(timeoutSeconds, TimeUnit.SECONDS);
if (!completed) {
// 超时
pendingApprovals.remove(instanceId + ":" + nodeId);
throw new RuntimeException("审批超时(" + timeoutSeconds + "秒)");
}
// 返回审批结果
ApprovalResult result = request.getResult();
System.out.println("✅ 审批完成: " + (result.isApproved() ? "通过" : "拒绝") +
" - " + result.getComment());
return result;
}
/**
* 提交审批结果(由外部 API 调用)
*
* @param instanceId 工作流实例ID
* @param nodeId 节点ID
* @param result 审批结果
* @return 是否成功提交
*/
public boolean submitApproval(String instanceId, String nodeId, ApprovalResult result) {
String key = instanceId + ":" + nodeId;
ApprovalRequest request = pendingApprovals.get(key);
if (request == null) {
System.err.println("❌ 未找到审批请求: " + key);
return false;
}
request.setResult(result);
request.getLatch().countDown(); // 唤醒等待的线程
pendingApprovals.remove(key);
// 更新工作流状态
WorkflowInstance instance = workflowInstances.get(instanceId);
if (instance != null) {
if (result.isApproved()) {
instance.setStatus("RUNNING");
} else {
instance.setStatus("REJECTED");
}
}
return true;
}
/**
* 获取所有待审批的请求
*/
public Map<String, ApprovalRequest> getPendingApprovals() {
return new ConcurrentHashMap<>(pendingApprovals);
}
/**
* 获取指定实例的待审批请求
*/
public ApprovalRequest getPendingApproval(String instanceId, String nodeId) {
return pendingApprovals.get(instanceId + ":" + nodeId);
}
/**
* 获取所有工作流实例
*/
public Map<String, WorkflowInstance> getAllInstances() {
return new ConcurrentHashMap<>(workflowInstances);
}
/**
* 移除工作流实例
*/
public void removeInstance(String instanceId) {
workflowInstances.remove(instanceId);
}
}
4.3 扩展 WorkflowEngine
文件: WorkflowEngine.java
1. 注入 ApprovalManager
@Component
public class WorkflowEngine {
private final ChatClient chatClient;
private final Map<String, Function<String, String>> toolRegistry;
private final ApprovalManager approvalManager; // 新增
public WorkflowEngine(ChatClient chatClient,
Map<String, Function<String, String>> toolRegistry,
ApprovalManager approvalManager) {
this.chatClient = chatClient;
this.toolRegistry = toolRegistry;
this.approvalManager = approvalManager;
}
}
2. 新增节点分发
private String executeNode(WorkflowDefinition.Node node, Map<String, Object> context,
WorkflowDefinition wf, String instanceId) {
switch (node.getType()) {
case "start":
return getNextNodeId(wf, node.getId());
case "tool":
return executeToolNode(node, context, wf);
case "llm":
return executeLlmNode(node, context, wf);
case "while":
return executeWhileNode(node, context, wf, instanceId);
case "parallel":
return executeParallelNode(node, context, wf, instanceId);
case "human_approval": // 新增
return executeHumanApprovalNode(node, context, wf, instanceId);
default:
return getNextNodeId(wf, node.getId());
}
}
3. 实现审批节点执行
private String executeHumanApprovalNode(WorkflowDefinition.Node node, Map<String, Object> context,
WorkflowDefinition wf, String instanceId) {
String approvalMessage = node.getApprovalMessage();
if (approvalMessage == null || approvalMessage.trim().isEmpty()) {
approvalMessage = "请审批节点: " + node.getId();
}
long timeoutSeconds = node.getTimeoutSeconds();
if (timeoutSeconds <= 0) {
timeoutSeconds = 300; // 默认5分钟
}
try {
// 请求审批(这会阻塞当前线程)
ApprovalManager.ApprovalResult result = approvalManager.requestApproval(
instanceId,
node.getId(),
approvalMessage,
context,
timeoutSeconds
);
// 保存审批结果到上下文
String approvalOutput = String.format(
"审批结果: %s | 审批意见: %s",
result.isApproved() ? "通过" : "拒绝",
result.getComment()
);
saveOutput(node.getId(), approvalOutput, context);
context.put(node.getId() + ".approved", result.isApproved());
context.put(node.getId() + ".comment", result.getComment());
// 根据审批结果决定下一步
if (result.isApproved()) {
System.out.println("✅ 审批通过,继续执行");
return getNextNodeId(wf, node.getId());
} else {
System.out.println("❌ 审批拒绝");
String onReject = node.getOnReject();
return (onReject != null && !onReject.isEmpty()) ? onReject : "end";
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("审批被中断", e);
}
}
完整代码:
package com.badao.ai.workflow;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.stereotype.Component;
import org.yaml.snakeyaml.Yaml;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@Component
public class WorkflowEngine {
private final ChatClient chatClient;
private final Map<String, Function<String, String>> toolRegistry;
private final ApprovalManager approvalManager;
public WorkflowEngine(ChatClient chatClient,
Map<String, Function<String, String>> toolRegistry,
ApprovalManager approvalManager) {
this.chatClient = chatClient;
this.toolRegistry = toolRegistry;
this.approvalManager = approvalManager;
}
// ---------- 工作流执行入口 ----------
public String execute(String workflowPath, Map<String, Object> initialContext) {
return execute(workflowPath, initialContext, "workflow-" + System.currentTimeMillis());
}
/**
* 执行工作流(带实例ID)
*/
public String execute(String workflowPath, Map<String, Object> initialContext, String instanceId) {
WorkflowDefinition wf = loadWorkflow(workflowPath);
Map<String, Object> context = new HashMap<>(initialContext);
context.put("_nodes", new HashMap<String, Object>());
// 创建工作流实例
ApprovalManager.WorkflowInstance instance =
approvalManager.createInstance(instanceId, workflowPath, context);
String currentNode = wf.getStart();
while (currentNode != null && !currentNode.equals("end")) {
WorkflowDefinition.Node node = findNodeById(wf, currentNode);
if (node == null) break;
System.out.println("Executing node: " + node.getId());
currentNode = executeNode(node, context, wf, instanceId);
// 检查工作流是否被拒绝
if (instance.getStatus().equals("REJECTED")) {
System.out.println("⛔ 工作流 [" + instanceId + "] 被拒绝,终止执行");
return "工作流被拒绝: " +
(instance.getCurrentNode() != null ? instance.getCurrentNode() : "unknown");
}
}
// 找到工作流中最后一个非 end、非 start 的节点,并返回其输出
String lastOutput = null;
Map<String, Object> nodesOutput = (Map<String, Object>) context.get("_nodes");
if (nodesOutput != null) {
// 按边的顺序找到最后一个被执行的节点(或直接取最后一个节点的 id)
for (WorkflowDefinition.Edge edge : wf.getEdges()) {
if ("end".equals(edge.getTo())) {
lastOutput = nodesOutput.getOrDefault(edge.getFrom(), "").toString();
break;
}
}
}
// 更新工作流状态为完成
instance.setStatus("COMPLETED");
return lastOutput != null ? lastOutput : "No output generated";
}
// ---------- 节点分发 ----------
private String executeNode(WorkflowDefinition.Node node, Map<String, Object> context,
WorkflowDefinition wf, String instanceId) {
switch (node.getType()) {
case "start":
return getNextNodeId(wf, node.getId());
case "tool":
return executeToolNode(node, context, wf);
case "llm":
return executeLlmNode(node, context, wf);
case "while":
return executeWhileNode(node, context, wf, instanceId);
case "parallel":
return executeParallelNode(node, context, wf, instanceId);
case "human_approval":
return executeHumanApprovalNode(node, context, wf, instanceId);
default:
return getNextNodeId(wf, node.getId());
}
}
// ---------- 工具节点 ----------
private String executeToolNode(WorkflowDefinition.Node node, Map<String, Object> context,
WorkflowDefinition wf) {
Function<String, String> tool = toolRegistry.get(node.getTool());
if (tool == null) throw new RuntimeException("Tool not found: " + node.getTool());
String resolvedInput = resolveExpression(node.getInput(), context);
String result = tool.apply(resolvedInput);
saveOutput(node.getId(), result, context);
return getNextNodeId(wf, node.getId());
}
// ---------- LLM 节点 ----------
private String executeLlmNode(WorkflowDefinition.Node node, Map<String, Object> context,
WorkflowDefinition wf) {
String resolvedPrompt = resolveExpression(node.getPrompt(), context);
String response = chatClient.prompt().user(resolvedPrompt).call().content();
saveOutput(node.getId(), response, context);
return getNextNodeId(wf, node.getId());
}
// ---------- While 循环 ----------
private String executeWhileNode(WorkflowDefinition.Node node, Map<String, Object> context,
WorkflowDefinition wf, String instanceId) {
int count = 0;
while (count < node.getMaxIterations()) {
// 先解析条件
String condition = resolveExpression(node.getCondition(), context);
// 条件不满足(null、空、false)则直接退出
if (condition == null || !"true".equalsIgnoreCase(condition)) {
break;
}
// 条件满足,执行循环体
for (WorkflowDefinition.Node bodyNode : node.getBody()) {
executeNode(bodyNode, context, wf, instanceId);
}
count++;
}
return getNextNodeId(wf, node.getId());
}
// ---------- 并行节点 ----------
private String executeParallelNode(WorkflowDefinition.Node node, Map<String, Object> context,
WorkflowDefinition wf, String instanceId) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (WorkflowDefinition.ParallelBranch branch : node.getBranches()) {
List<WorkflowDefinition.Node> branchNodes = branch.getNodes(); // 获取分支内的节点列表
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
Map<String, Object> branchContext = new HashMap<>(context);
for (WorkflowDefinition.Node branchNode : branchNodes) {
executeNode(branchNode, branchContext, wf, instanceId);
}
synchronized (context) {
context.putAll(branchContext);
}
});
futures.add(future);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
return getNextNodeId(wf, node.getId());
}
// ---------- 人工审批节点 ----------
private String executeHumanApprovalNode(WorkflowDefinition.Node node, Map<String, Object> context,
WorkflowDefinition wf, String instanceId) {
String approvalMessage = node.getApprovalMessage();
if (approvalMessage == null || approvalMessage.trim().isEmpty()) {
approvalMessage = "请审批节点: " + node.getId();
}
long timeoutSeconds = node.getTimeoutSeconds();
if (timeoutSeconds <= 0) {
timeoutSeconds = 300; // 默认5分钟
}
try {
// 请求审批(这会阻塞当前线程)
ApprovalManager.ApprovalResult result = approvalManager.requestApproval(
instanceId,
node.getId(),
approvalMessage,
context,
timeoutSeconds
);
// 保存审批结果到上下文
String approvalOutput = String.format(
"审批结果: %s | 审批意见: %s",
result.isApproved() ? "通过" : "拒绝",
result.getComment()
);
saveOutput(node.getId(), approvalOutput, context);
context.put(node.getId() + ".approved", result.isApproved());
context.put(node.getId() + ".comment", result.getComment());
// 根据审批结果决定下一步
if (result.isApproved()) {
System.out.println("✅ 审批通过,继续执行");
return getNextNodeId(wf, node.getId());
} else {
System.out.println("❌ 审批拒绝");
// 如果配置了 onReject,跳转到指定节点,否则结束工作流
String onReject = node.getOnReject();
return (onReject != null && !onReject.isEmpty()) ? onReject : "end";
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("审批被中断", e);
} catch (RuntimeException e) {
throw new RuntimeException("审批失败: " + e.getMessage(), e);
}
}
// ---------- 辅助方法 ----------
private WorkflowDefinition.Node findNodeById(WorkflowDefinition wf, String id) {
return wf.getNodes().stream().filter(n -> n.getId().equals(id)).findFirst().orElse(null);
}
private String getNextNodeId(WorkflowDefinition wf, String currentId) {
return wf.getEdges().stream()
.filter(e -> e.getFrom().equals(currentId))
.map(WorkflowDefinition.Edge::getTo)
.findFirst().orElse("end");
}
private void saveOutput(String nodeId, String output, Map<String, Object> context) {
((Map<String, Object>) context.get("_nodes")).put(nodeId, output);
context.put(nodeId + ".output", output);
}
private String resolveExpression(String expr, Map<String, Object> context) {
if (expr == null) return null;
Pattern pattern = Pattern.compile("\\$\\{(.+?)\\}");
Matcher matcher = pattern.matcher(expr);
StringBuffer sb = new StringBuffer();
while (matcher.find()) {
String var = matcher.group(1);
Object value = context.get(var);
if (value == null) {
Map<String, Object> nodes = (Map<String, Object>) context.get("_nodes");
if (nodes != null) value = nodes.get(var);
}
matcher.appendReplacement(sb, value != null ? value.toString() : "");
}
matcher.appendTail(sb);
return sb.toString();
}
private WorkflowDefinition loadWorkflow(String path) {
Yaml yaml = new Yaml();
InputStream in = getClass().getResourceAsStream(path);
return yaml.loadAs(in, WorkflowDefinition.class);
}
}
4.4 创建 ApprovalController
文件: ApprovalController.java
@RestController
@RequestMapping("/api/approval")
public class ApprovalController {
private final ApprovalManager approvalManager;
public ApprovalController(ApprovalManager approvalManager) {
this.approvalManager = approvalManager;
}
// 提交审批结果
@PostMapping("/submit")
public ResponseEntity<Map<String, Object>> submitApproval(
@RequestBody ApprovalRequest request) {
// ...
}
// 查看待审批列表
@GetMapping("/pending")
public ResponseEntity<Map<String, Object>> getPendingApprovals() {
// ...
}
// 查看工作流实例
@GetMapping("/instances")
public ResponseEntity<Map<String, Object>> getInstances() {
// ...
}
}
4.5 配置 Bean
文件: ToolConfig.java
@Configuration
public class ToolConfig {
@Bean
public ApprovalManager approvalManager() {
return new ApprovalManager();
}
// ... 其他 Bean
}
五、关键知识点解析
5.1 CountDownLatch 原理
什么是 CountDownLatch?
CountDownLatch 是 JDK java.util.concurrent 包中的同步辅助类,允许一个或多个线程等待其他线程完成操作。
核心方法
| 方法 | 说明 |
|---|---|
CountDownLatch(int count) |
构造函数,设置初始计数 |
await() |
阻塞等待,直到计数为0 |
await(long timeout, TimeUnit unit) |
带超时的阻塞等待 |
countDown() |
计数减1 |
getCount() |
获取当前计数 |
工作原理
初始状态: count = 1
工作流线程: HTTP API 线程:
↓ ↓
latch.await() ←── 阻塞等待 ───┐
↓ │
│ latch.countDown()
│ │
│ count = 0
│ │
│ 唤醒等待的线程
↓ ↓
继续执行...
代码示例
// 1. 创建 CountDownLatch,初始计数为1
CountDownLatch latch = new CountDownLatch(1);
// 2. 工作流线程阻塞等待
new Thread(() -> {
System.out.println("等待审批...");
latch.await(); // 阻塞,直到 countDown 被调用
System.out.println("审批完成,继续执行");
}).start();
// 3. HTTP API 线程提交审批
Thread.sleep(2000); // 模拟审批时间
System.out.println("提交审批结果");
latch.countDown(); // 计数减1,唤醒阻塞的线程
5.2 ConcurrentHashMap 原理
为什么使用 ConcurrentHashMap?
| 对比项 | HashMap | ConcurrentHashMap |
|---|---|---|
| 线程安全 | ❌ 否 | ✅ 是 |
| 并发读写 | ❌ 会出现问题 | ✅ 支持 |
| 性能 | 高 | 较高(分段锁/CAS) |
| Null 值 | 允许 | 不允许 |
JDK 8+ 实现原理
ConcurrentHashMap 结构:
┌─────────────────────────────────────┐
│ 数组 (Node[]) │
├──────────┬──────────┬───────────────┤
│ Node[0] │ Node[1] │ Node[2] ... │
│ ↓ │ ↓ │ │
│ 链表/树 │ 链表/树 │ │
└──────────┴──────────┴───────────────┘
并发控制机制:
- 读操作: 无锁(volatile 保证可见性)
- 写操作: CAS + synchronized(只锁当前桶)
- 扩容: 多线程协助扩容
在审批系统中的应用
// 存储待审批请求
private final Map<String, ApprovalRequest> pendingApprovals = new ConcurrentHashMap<>();
// 存储工作流实例
private final Map<String, WorkflowInstance> workflowInstances = new ConcurrentHashMap<>();
// 多线程安全访问
pendingApprovals.put(key, request); // HTTP 线程写入
ApprovalRequest req = pendingApprovals.get(key); // 工作流线程读取
5.3 volatile 关键字
作用
- 保证可见性: 一个线程修改了 volatile 变量,其他线程立即可见
- 禁止指令重排序: 防止编译器和处理器对指令重排序
在审批系统中的应用
public static class ApprovalRequest {
private volatile ApprovalResult result; // 使用 volatile
public void setResult(ApprovalResult result) {
this.result = result; // HTTP 线程写入
}
public ApprovalResult getResult() {
return result; // 工作流线程读取,保证看到最新值
}
}
为什么需要 volatile?
没有 volatile 的问题:
HTTP 线程: 工作流线程:
↓ ↓
result = new ApprovalResult() │
↓ │
(修改在 CPU 缓存中) │
↓
result == null? ← 可能看不到更新!
使用 volatile 后:
HTTP 线程: 工作流线程:
↓ ↓
result = new ApprovalResult() │
↓ │
(强制刷新到主内存) │
↓
result != null ← 一定能看到更新!
5.4 异步执行机制
为什么需要异步?
同步执行的问题:
HTTP 请求 → 启动工作流 → 遇到审批节点 → 阻塞等待 → ... (可能几分钟)
↓
HTTP 超时!客户端等待太久
异步执行的解决方案:
HTTP 请求 → 启动工作流(新线程) → 立即返回
↓
工作流在后台运行
↓
遇到审批节点 → 阻塞等待(不影响 HTTP 响应)
↓
外部 API 提交审批 → 工作流继续
实现代码
@PostMapping("/workflow/approval")
public Map<String, Object> runWorkflowWithApproval(@RequestBody Map<String, String> input) {
String instanceId = input.getOrDefault("instanceId", "workflow-" + System.currentTimeMillis());
// 在新线程中执行工作流
Thread workflowThread = new Thread(() -> {
try {
String result = engine.execute(workflow, context, instanceId);
System.out.println("✅ 工作流 [" + instanceId + "] 执行完成");
} catch (Exception e) {
System.err.println("❌ 工作流执行失败: " + e.getMessage());
}
});
workflowThread.setName("workflow-" + instanceId);
workflowThread.start();
// 立即返回响应
Map<String, Object> response = new HashMap<>();
response.put("success", true);
response.put("instanceId", instanceId);
response.put("message", "工作流已启动");
return response;
}
5.5 YAML DSL 解析
SnakeYAML 库
Spring Boot 默认包含 SnakeYAML 依赖,用于解析 YAML 文件。
解析流程
YAML 文件: Java 对象:
name: "content_review" → WorkflowDefinition
start: "start_node" ├─ name: "content_review"
nodes: ├─ start: "start_node"
- id: "start_node" └─ nodes:
type: "start" ├─ Node(id="start_node", type="start")
- id: "review" └─ Node(id="review", type="human_approval")
type: "human_approval"
approvalMessage: "请审核"
代码实现
private WorkflowDefinition loadWorkflow(String path) {
Yaml yaml = new Yaml();
InputStream in = getClass().getResourceAsStream(path);
return yaml.loadAs(in, WorkflowDefinition.class);
}
六、完整使用流程
6.1 启动项目
# 1. 启动 Ollama
ollama serve
# 2. 启动 Spring Boot 应用
mvn spring-boot:run
6.2 启动工作流
Windows PowerShell:
Invoke-RestMethod -Uri "http://localhost:886/api/workflow/approval" `
-Method POST -ContentType "application/json" -Body @{
workflow = "/workflows/content_review_workflow.yaml"
instanceId = "test-001"
product_name = "智能手表Pro"
} | ConvertTo-Json
Linux/Mac:
curl -X POST http://localhost:886/api/workflow/approval \
-H "Content-Type: application/json" \
-d '{
"workflow": "/workflows/content_review_workflow.yaml",
"instanceId": "test-001",
"product_name": "智能手表Pro"
}'
响应:
{
"instanceId": "test-001",
"success": true,
"message": "工作流已启动,请通过 /api/approval/pending 查看待审批请求"
}

6.3 查看待审批
curl http://localhost:886/api/approval/pending
控制台输出:
Executing node: start_node
Executing node: generate_content
Executing node: content_review
⏸️ 工作流 [test-001] 在节点 [content_review] 暂停,等待人工审批...
📋 审批信息: 请审核AI生成的营销文案是否合适发布

6.4 提交审批
审批通过:
curl -X POST http://localhost:886/api/approval/submit \
-H "Content-Type: application/json" \
-d '{
"instanceId": "test-001",
"nodeId": "content_review",
"approved": true,
"comment": "文案很好,可以发布"
}'

审批拒绝:
curl -X POST http://localhost:886/api/approval/submit \
-H "Content-Type: application/json" \
-d '{
"instanceId": "test-001",
"nodeId": "content_review",
"approved": false,
"comment": "语气过于夸张,需要更专业"
}'
6.5 观察执行
审批通过后:
✅ 审批完成: 通过 - 文案很好,可以发布
✅ 审批通过,继续执行
Executing node: translate_to_english
Executing node: final_output
✅ 工作流 [test-001] 执行完成: ...

七、实战场景示例
7.1 内容审核工作流
文件: content_review_workflow.yaml
name: "content_review_workflow"
start: "start_node"
nodes:
- id: "start_node"
type: "start"
# AI 生成内容
- id: "generate_content"
type: "llm"
prompt: "请为产品 ${product_name} 生成营销文案..."
# 人工审核
- id: "content_review"
type: "human_approval"
approvalMessage: "请审核AI生成的营销文案是否合适发布"
timeoutSeconds: 300
onReject: "regenerate_content"
# 审批通过:翻译为英文
- id: "translate_to_english"
type: "llm"
prompt: "请将以下中文翻译为英文:${generate_content.output}"
# 审批拒绝:重新生成
- id: "regenerate_content"
type: "llm"
prompt: "根据拒绝意见重新生成:${content_review.comment}"
edges:
- from: "start_node"
to: "generate_content"
- from: "generate_content"
to: "content_review"
- from: "content_review"
to: "translate_to_english"
- from: "translate_to_english"
to: "end"
7.2 大额支付审批
文件: payment_approval_workflow.yaml
name: "payment_approval_workflow"
nodes:
# 查询订单
- id: "get_order_info"
type: "llm"
prompt: "查询订单 ${order_id} 详情..."
# 检查金额
- id: "check_amount"
type: "llm"
prompt: "订单金额是否超过1000元?回复 true 或 false"
# 条件审批
- id: "need_approval_check"
type: "while"
condition: "${check_amount.output}"
maxIterations: 1
body:
- id: "human_approval"
type: "human_approval"
approvalMessage: "订单金额较大,请人工审批"
timeoutSeconds: 600
onReject: "reject_notification"
# 审批通过:处理支付
- id: "process_payment"
type: "llm"
prompt: "订单已通过审批,处理支付..."
八、最佳实践与注意事项
8.1 超时设置
# ✅ 推荐:根据业务场景设置合理超时
- id: "content_review"
type: "human_approval"
timeoutSeconds: 300 # 5分钟
# ❌ 不推荐:超时时间过短
- id: "content_review"
timeoutSeconds: 10 # 10秒太短
# ❌ 不推荐:不设超时(会永久阻塞)
- id: "content_review"
# 缺少 timeoutSeconds
8.2 拒绝后跳转
# ✅ 推荐:配置拒绝后的处理逻辑
- id: "human_approval"
onReject: "regenerate_content"
# ❌ 不推荐:不配置(工作流直接结束)
- id: "human_approval"
# 缺少 onReject
8.3 并发安全
// ✅ 推荐:使用 ConcurrentHashMap
private final Map<String, ApprovalRequest> pendingApprovals = new ConcurrentHashMap<>();
// ❌ 不推荐:使用 HashMap(线程不安全)
private final Map<String, ApprovalRequest> pendingApprovals = new HashMap<>();
8.4 状态可见性
// ✅ 推荐:使用 volatile
private volatile ApprovalResult result;
// ❌ 不推荐:不使用 volatile(可能看不到更新)
private ApprovalResult result;
8.5 异步执行
// ✅ 推荐:异步执行工作流
Thread workflowThread = new Thread(() -> {
engine.execute(workflow, context, instanceId);
});
workflowThread.start();
// ❌ 不推荐:同步执行(会阻塞 HTTP 响应)
engine.execute(workflow, context, instanceId);
九、扩展方向
9.1 持久化
// 将审批记录存储到数据库
public void recordApproval(String instanceId, String nodeId, ApprovalResult result) {
ApprovalRecord record = new ApprovalRecord();
record.setInstanceId(instanceId);
record.setNodeId(nodeId);
record.setApproved(result.isApproved());
record.setComment(result.getComment());
record.setCreateTime(LocalDateTime.now());
approvalRecordRepository.save(record);
}
9.2 消息通知
// 发送邮件通知
public void notifyApprover(String instanceId, String message) {
SimpleMailMessage mailMessage = new SimpleMailMessage();
mailMessage.setTo("approver@example.com");
mailMessage.setSubject("工作流审批请求");
mailMessage.setText("实例ID: " + instanceId + "\n审批信息: " + message);
mailSender.send(mailMessage);
}
9.3 多级审批
// 支持多级审批流程
public ApprovalResult requestMultiLevelApproval(
String instanceId,
List<String> approverRoles) {
for (String role : approverRoles) {
ApprovalResult result = waitForRoleApproval(instanceId, role);
if (!result.isApproved()) {
return result;
}
}
return ApprovalResult.approve("多级审批全部通过");
}
9.4 WebSocket 推送
// 实时推送审批请求
@Autowired
private SimpMessagingTemplate messagingTemplate;
public void notifyApprover(String instanceId, String message) {
messagingTemplate.convertAndSend(
"/topic/approval/" + instanceId,
new ApprovalNotification(instanceId, message)
);
}
十、总结
10.1 核心成果
通过本次实战,我们成功实现了:
✅ 工作流暂停与恢复: 使用 CountDownLatch 实现线程安全的阻塞/唤醒
✅ 灵活配置: YAML 中配置审批信息和拒绝后跳转
✅ REST API: 完整的审批管理接口
✅ 状态跟踪: 实时查看工作流实例状态
✅ 超时处理: 防止无限期等待
✅ 零额外依赖: 仅使用 JDK 自带并发工具
10.2 技术亮点
| 技术点 | 说明 |
|---|---|
| CountDownLatch | JDK 并发工具,实现线程阻塞/唤醒 |
| ConcurrentHashMap | 线程安全的 Map,支持高并发访问 |
| volatile | 保证变量可见性,防止指令重排序 |
| 异步执行 | 新线程运行工作流,避免阻塞 HTTP 响应 |
| YAML DSL | 声明式配置,易于理解和维护 |
10.3 适用场景
- 💰 大额支付审批
- 📝 内容审核发布
- 🔐 敏感操作确认
- 👤 权限申请审批
- 📋 订单人工复核
10.4 学习价值
通过本项目,你可以掌握:
- JDK 并发编程: CountDownLatch、ConcurrentHashMap、volatile
- Spring Boot 开发: REST API、Bean 配置、依赖注入
- Spring AI 集成: ChatClient、工具链、工作流引擎
- YAML DSL 设计: 声明式配置、节点编排、变量传递
- 异步编程模式: 线程管理、阻塞/唤醒、状态同步
📚 相关资源
- Spring AI 官方文档: https://docs.spring.io/spring-ai/reference
- JDK 并发工具: https://docs.oracle.com/javase/tutorial/essential/concurrency
技术栈: Spring Boot 3.3.3 + Spring AI 1.1.2 + Ollama
并发工具: JDK ConcurrentHashMap + CountDownLatch
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)