Spring AI 工作流引擎扩展 Human-in-the-Loop 人工审批功能完整实战

基于 JDK 原生并发工具实现零依赖的工作流暂停与恢复机制


📖 目录


一、项目背景与需求分析

1.1 原有项目概述

Spring AI 可视化编排实战:构建 LangGraph 风格的 YAML DSL 工作流引擎:

https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/161255641

本项目基于 Spring AI + Ollama 构建了一个类似 LangGraph 的 YAML DSL 工作流引擎,支持:

  • ✅ 工具链自动调用(Tool)
  • ✅ LLM 智能对话(LLM)
  • ✅ 条件循环控制(While)
  • ✅ 并行执行(Parallel)
  • ✅ 变量传递与上下文管理

1.2 业务痛点

在真实业务场景中,某些关键决策点需要人工介入

场景 问题
💰 大额支付 金额超过阈值时需人工审核
📝 内容发布 AI 生成内容需人工审核后才能发布
🔐 敏感操作 删除、修改等关键操作需确认
👤 权限申请 用户申请权限需管理员审批

1.3 需求目标

扩展工作流引擎,新增 human_approval 节点类型:

  1. 工作流暂停: 执行到审批节点时自动挂起
  2. 等待审批: 外部通过 API 提交审批结果
  3. 恢复执行: 根据审批结果继续或终止
  4. 零额外依赖: 仅使用 JDK 自带并发工具

二、技术架构设计

2.1 核心组件

┌─────────────────────────────────────────────────────┐
│                  AgentController                     │
│   /api/workflow/approval (异步启动工作流)             │
└──────────────────┬──────────────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────────────┐
│                  WorkflowEngine                      │
│   执行工作流,遇到 human_approval 节点时暂停           │
└──────────────────┬──────────────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────────────┐
│                 ApprovalManager                      │
│   - 管理审批请求(ConcurrentHashMap)                 │
│   - 阻塞等待(CountDownLatch)                        │
│   - 工作流实例状态跟踪                                │
└──────┬───────────────────────────────┬──────────────┘
       │                               │
       ▼                               ▼
┌──────────────┐            ┌────────────────────┐
│ 工作流线程    │            │  HTTP API 线程      │
│ (阻塞等待)    │            │ (提交审批结果)      │
└──────────────┘            └────────┬───────────┘
                                     │
                                     ▼
                          ┌────────────────────┐
                          │ ApprovalController  │
                          │ /api/approval/submit│
                          └────────────────────┘

2.2 文件结构

src/main/java/com/badao/ai/
├── config/
│   └── ToolConfig.java                    # 新增 ApprovalManager Bean
├── controller/
│   ├── AgentController.java               # 新增 /api/workflow/approval
│   └── ApprovalController.java            # 新增:审批 API 控制器
├── workflow/
│   ├── ApprovalManager.java               # 新增:审批管理器
│   ├── WorkflowEngine.java                # 修改:新增 human_approval 节点
│   └── WorkflowDefinition.java            # 修改:新增审批节点字段
└── tools/
    ├── WeatherTool.java
    └── TranslateTool.java

src/main/resources/workflows/
├── weather_agent.yaml                     # 原有示例
├── content_review_workflow.yaml           # 新增:内容审核工作流
└── payment_approval_workflow.yaml         # 新增:支付审批工作流

三、环境准备与搭建

3.1 前置条件

组件 版本 说明
JDK 17+ Java 运行环境
Maven 3.6+ 项目构建工具
Ollama 最新版 本地大模型运行时
模型 qwen2.5:7b-instruct AI 对话模型

3.2 安装 Ollama

Windows:

# 下载安装包
# 访问 https://ollama.com/download
# 运行 OllamaSetup.exe

# 验证安装
ollama --version

Linux/Mac:

curl -fsSL https://ollama.com/install.sh | sh

3.3 下载模型

# 下载 qwen2.5 模型
ollama pull qwen2.5:7b-instruct

# 验证模型
ollama list

四、核心代码实现

4.1 扩展 WorkflowDefinition

文件: WorkflowDefinition.java

新增字段:

public static class Node {
    // ... 原有字段
    
    // human_approval 节点专用字段
    private String approvalMessage;      // 审批说明信息
    private long timeoutSeconds = 300;   // 审批超时时间(秒)
    private String onReject;             // 拒绝后跳转的节点ID
    
    // getter/setter
    public String getApprovalMessage() { return approvalMessage; }
    public void setApprovalMessage(String approvalMessage) { 
        this.approvalMessage = approvalMessage; 
    }
    // ...
}

完整代码:

package com.badao.ai.workflow;

import java.util.List;

public class WorkflowDefinition {
    private String name;
    private String start;
    private List<Node> nodes;
    private List<Edge> edges;

    // getters & setters
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getStart() { return start; }
    public void setStart(String start) { this.start = start; }

    public List<Node> getNodes() { return nodes; }
    public void setNodes(List<Node> nodes) { this.nodes = nodes; }

    public List<Edge> getEdges() { return edges; }
    public void setEdges(List<Edge> edges) { this.edges = edges; }

    // ========== Node ==========
    public static class Node {
        private String id;
        private String type;
        private String tool;
        private String input;
        private String prompt;
        private String condition;
        private int maxIterations = 5;
        private List<Node> body;
        private List<ParallelBranch> branches;  // 改为 ParallelBranch 列表
        
        // human_approval 节点专用字段
        private String approvalMessage;  // 审批说明信息
        private long timeoutSeconds = 300;  // 审批超时时间(秒),默认5分钟
        private String onReject;  // 拒绝后跳转的节点ID,默认为 end

        // getters & setters(请确保所有字段都有)
        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public String getType() { return type; }
        public void setType(String type) { this.type = type; }
        public String getTool() { return tool; }
        public void setTool(String tool) { this.tool = tool; }
        public String getInput() { return input; }
        public void setInput(String input) { this.input = input; }
        public String getPrompt() { return prompt; }
        public void setPrompt(String prompt) { this.prompt = prompt; }
        public String getCondition() { return condition; }
        public void setCondition(String condition) { this.condition = condition; }
        public int getMaxIterations() { return maxIterations; }
        public void setMaxIterations(int maxIterations) { this.maxIterations = maxIterations; }
        public List<Node> getBody() { return body; }
        public void setBody(List<Node> body) { this.body = body; }
        public List<ParallelBranch> getBranches() { return branches; }
        public void setBranches(List<ParallelBranch> branches) { this.branches = branches; }
        
        // human_approval 字段的 getter/setter
        public String getApprovalMessage() { return approvalMessage; }
        public void setApprovalMessage(String approvalMessage) { this.approvalMessage = approvalMessage; }
        public long getTimeoutSeconds() { return timeoutSeconds; }
        public void setTimeoutSeconds(long timeoutSeconds) { this.timeoutSeconds = timeoutSeconds; }
        public String getOnReject() { return onReject; }
        public void setOnReject(String onReject) { this.onReject = onReject; }
    }

    // ========== ParallelBranch ==========
    public static class ParallelBranch {
        private List<Node> nodes;   // 分支内部的节点序列

        public List<Node> getNodes() { return nodes; }
        public void setNodes(List<Node> nodes) { this.nodes = nodes; }
    }

    // ========== Edge ==========
    public static class Edge {
        private String from;
        private String to;

        public String getFrom() { return from; }
        public void setFrom(String from) { this.from = from; }
        public String getTo() { return to; }
        public void setTo(String to) { this.to = to; }
    }
}

4.2 实现 ApprovalManager

文件: ApprovalManager.java

核心功能:

1. 审批请求类
public static class ApprovalRequest {
    private final String workflowInstanceId;
    private final String nodeId;
    private final String message;
    private final Map<String, Object> context;
    private final CountDownLatch latch;  // 并发控制核心
    private volatile ApprovalResult result;
    
    public ApprovalRequest(String instanceId, String nodeId, 
                          String message, Map<String, Object> context) {
        this.workflowInstanceId = instanceId;
        this.nodeId = nodeId;
        this.message = message;
        this.context = context;
        this.latch = new CountDownLatch(1);  // 初始计数为1
        this.result = null;
    }
}
2. 请求审批(阻塞)
public ApprovalResult requestApproval(String instanceId, String nodeId, 
                                     String message, Map<String, Object> context,
                                     long timeoutSeconds) throws InterruptedException {
    
    ApprovalRequest request = new ApprovalRequest(instanceId, nodeId, message, context);
    pendingApprovals.put(instanceId + ":" + nodeId, request);
    
    // 更新工作流状态
    WorkflowInstance instance = workflowInstances.get(instanceId);
    if (instance != null) {
        instance.setStatus("WAITING_APPROVAL");
        instance.setCurrentNode(nodeId);
    }
    
    System.out.println("⏸️  工作流 [" + instanceId + "] 在节点 [" + nodeId + "] 暂停,等待人工审批...");
    
    // 阻塞等待审批结果(核心机制)
    boolean completed = request.getLatch().await(timeoutSeconds, TimeUnit.SECONDS);
    
    if (!completed) {
        throw new RuntimeException("审批超时(" + timeoutSeconds + "秒)");
    }
    
    return request.getResult();
}
3. 提交审批(唤醒)
public boolean submitApproval(String instanceId, String nodeId, ApprovalResult result) {
    String key = instanceId + ":" + nodeId;
    ApprovalRequest request = pendingApprovals.get(key);
    
    if (request == null) {
        return false;
    }
    
    request.setResult(result);
    request.getLatch().countDown();  // 唤醒阻塞的线程(核心机制)
    pendingApprovals.remove(key);
    
    // 更新工作流状态
    WorkflowInstance instance = workflowInstances.get(instanceId);
    if (instance != null) {
        instance.setStatus(result.isApproved() ? "RUNNING" : "REJECTED");
    }
    
    return true;
}

完整代码:

package com.badao.ai.workflow;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * 审批管理器:管理工作流的暂停与恢复
 * 使用 ConcurrentHashMap + CountDownLatch 实现线程安全的审批等待机制
 */
public class ApprovalManager {

    /**
     * 审批请求信息
     */
    public static class ApprovalRequest {
        private final String workflowInstanceId;
        private final String nodeId;
        private final String message;
        private final Map<String, Object> context;
        private final CountDownLatch latch;
        private volatile ApprovalResult result;

        public ApprovalRequest(String workflowInstanceId, String nodeId, String message, Map<String, Object> context) {
            this.workflowInstanceId = workflowInstanceId;
            this.nodeId = nodeId;
            this.message = message;
            this.context = context;
            this.latch = new CountDownLatch(1);
            this.result = null;
        }

        public String getWorkflowInstanceId() {
            return workflowInstanceId;
        }

        public String getNodeId() {
            return nodeId;
        }

        public String getMessage() {
            return message;
        }

        public Map<String, Object> getContext() {
            return context;
        }

        public CountDownLatch getLatch() {
            return latch;
        }

        public ApprovalResult getResult() {
            return result;
        }

        public void setResult(ApprovalResult result) {
            this.result = result;
        }
    }

    /**
     * 审批结果
     */
    public static class ApprovalResult {
        private final boolean approved;
        private final String comment;

        public ApprovalResult(boolean approved, String comment) {
            this.approved = approved;
            this.comment = comment;
        }

        public boolean isApproved() {
            return approved;
        }

        public String getComment() {
            return comment;
        }

        public static ApprovalResult approve(String comment) {
            return new ApprovalResult(true, comment);
        }

        public static ApprovalResult reject(String comment) {
            return new ApprovalResult(false, comment);
        }
    }

    /**
     * 工作流实例状态
     */
    public static class WorkflowInstance {
        private final String instanceId;
        private final String workflowPath;
        private final Map<String, Object> context;
        private volatile String status; // RUNNING, WAITING_APPROVAL, COMPLETED, REJECTED
        private volatile String currentNode;

        public WorkflowInstance(String instanceId, String workflowPath, Map<String, Object> context) {
            this.instanceId = instanceId;
            this.workflowPath = workflowPath;
            this.context = context;
            this.status = "RUNNING";
            this.currentNode = null;
        }

        public String getInstanceId() {
            return instanceId;
        }

        public String getWorkflowPath() {
            return workflowPath;
        }

        public Map<String, Object> getContext() {
            return context;
        }

        public String getStatus() {
            return status;
        }

        public void setStatus(String status) {
            this.status = status;
        }

        public String getCurrentNode() {
            return currentNode;
        }

        public void setCurrentNode(String currentNode) {
            this.currentNode = currentNode;
        }
    }

    // 存储所有待审批的请求
    private final Map<String, ApprovalRequest> pendingApprovals = new ConcurrentHashMap<>();

    // 存储所有工作流实例
    private final Map<String, WorkflowInstance> workflowInstances = new ConcurrentHashMap<>();

    /**
     * 创建工作流实例
     */
    public WorkflowInstance createInstance(String instanceId, String workflowPath, Map<String, Object> context) {
        WorkflowInstance instance = new WorkflowInstance(instanceId, workflowPath, context);
        workflowInstances.put(instanceId, instance);
        return instance;
    }

    /**
     * 获取工作流实例
     */
    public WorkflowInstance getInstance(String instanceId) {
        return workflowInstances.get(instanceId);
    }

    /**
     * 请求人工审批(阻塞当前线程)
     * 
     * @param instanceId 工作流实例ID
     * @param nodeId 当前节点ID
     * @param message 审批说明
     * @param context 当前上下文
     * @param timeoutSeconds 超时时间(秒)
     * @return 审批结果
     */
    public ApprovalResult requestApproval(String instanceId, String nodeId, 
                                         String message, Map<String, Object> context,
                                         long timeoutSeconds) throws InterruptedException {
        
        ApprovalRequest request = new ApprovalRequest(instanceId, nodeId, message, context);
        pendingApprovals.put(instanceId + ":" + nodeId, request);

        // 更新工作流状态
        WorkflowInstance instance = workflowInstances.get(instanceId);
        if (instance != null) {
            instance.setStatus("WAITING_APPROVAL");
            instance.setCurrentNode(nodeId);
        }

        System.out.println("⏸️  工作流 [" + instanceId + "] 在节点 [" + nodeId + "] 暂停,等待人工审批...");
        System.out.println("📋 审批信息: " + message);

        // 阻塞等待审批结果
        boolean completed = request.getLatch().await(timeoutSeconds, TimeUnit.SECONDS);

        if (!completed) {
            // 超时
            pendingApprovals.remove(instanceId + ":" + nodeId);
            throw new RuntimeException("审批超时(" + timeoutSeconds + "秒)");
        }

        // 返回审批结果
        ApprovalResult result = request.getResult();
        System.out.println("✅ 审批完成: " + (result.isApproved() ? "通过" : "拒绝") + 
                          " - " + result.getComment());

        return result;
    }

    /**
     * 提交审批结果(由外部 API 调用)
     * 
     * @param instanceId 工作流实例ID
     * @param nodeId 节点ID
     * @param result 审批结果
     * @return 是否成功提交
     */
    public boolean submitApproval(String instanceId, String nodeId, ApprovalResult result) {
        String key = instanceId + ":" + nodeId;
        ApprovalRequest request = pendingApprovals.get(key);

        if (request == null) {
            System.err.println("❌ 未找到审批请求: " + key);
            return false;
        }

        request.setResult(result);
        request.getLatch().countDown(); // 唤醒等待的线程
        pendingApprovals.remove(key);

        // 更新工作流状态
        WorkflowInstance instance = workflowInstances.get(instanceId);
        if (instance != null) {
            if (result.isApproved()) {
                instance.setStatus("RUNNING");
            } else {
                instance.setStatus("REJECTED");
            }
        }

        return true;
    }

    /**
     * 获取所有待审批的请求
     */
    public Map<String, ApprovalRequest> getPendingApprovals() {
        return new ConcurrentHashMap<>(pendingApprovals);
    }

    /**
     * 获取指定实例的待审批请求
     */
    public ApprovalRequest getPendingApproval(String instanceId, String nodeId) {
        return pendingApprovals.get(instanceId + ":" + nodeId);
    }

    /**
     * 获取所有工作流实例
     */
    public Map<String, WorkflowInstance> getAllInstances() {
        return new ConcurrentHashMap<>(workflowInstances);
    }

    /**
     * 移除工作流实例
     */
    public void removeInstance(String instanceId) {
        workflowInstances.remove(instanceId);
    }
}

4.3 扩展 WorkflowEngine

文件: WorkflowEngine.java

1. 注入 ApprovalManager
@Component
public class WorkflowEngine {
    private final ChatClient chatClient;
    private final Map<String, Function<String, String>> toolRegistry;
    private final ApprovalManager approvalManager;  // 新增
    
    public WorkflowEngine(ChatClient chatClient,
                         Map<String, Function<String, String>> toolRegistry,
                         ApprovalManager approvalManager) {
        this.chatClient = chatClient;
        this.toolRegistry = toolRegistry;
        this.approvalManager = approvalManager;
    }
}
2. 新增节点分发
private String executeNode(WorkflowDefinition.Node node, Map<String, Object> context,
                           WorkflowDefinition wf, String instanceId) {
    switch (node.getType()) {
        case "start":
            return getNextNodeId(wf, node.getId());
        case "tool":
            return executeToolNode(node, context, wf);
        case "llm":
            return executeLlmNode(node, context, wf);
        case "while":
            return executeWhileNode(node, context, wf, instanceId);
        case "parallel":
            return executeParallelNode(node, context, wf, instanceId);
        case "human_approval":  // 新增
            return executeHumanApprovalNode(node, context, wf, instanceId);
        default:
            return getNextNodeId(wf, node.getId());
    }
}
3. 实现审批节点执行
private String executeHumanApprovalNode(WorkflowDefinition.Node node, Map<String, Object> context,
                                        WorkflowDefinition wf, String instanceId) {
    String approvalMessage = node.getApprovalMessage();
    if (approvalMessage == null || approvalMessage.trim().isEmpty()) {
        approvalMessage = "请审批节点: " + node.getId();
    }
    
    long timeoutSeconds = node.getTimeoutSeconds();
    if (timeoutSeconds <= 0) {
        timeoutSeconds = 300;  // 默认5分钟
    }
    
    try {
        // 请求审批(这会阻塞当前线程)
        ApprovalManager.ApprovalResult result = approvalManager.requestApproval(
            instanceId, 
            node.getId(), 
            approvalMessage, 
            context,
            timeoutSeconds
        );
        
        // 保存审批结果到上下文
        String approvalOutput = String.format(
            "审批结果: %s | 审批意见: %s",
            result.isApproved() ? "通过" : "拒绝",
            result.getComment()
        );
        saveOutput(node.getId(), approvalOutput, context);
        context.put(node.getId() + ".approved", result.isApproved());
        context.put(node.getId() + ".comment", result.getComment());
        
        // 根据审批结果决定下一步
        if (result.isApproved()) {
            System.out.println("✅ 审批通过,继续执行");
            return getNextNodeId(wf, node.getId());
        } else {
            System.out.println("❌ 审批拒绝");
            String onReject = node.getOnReject();
            return (onReject != null && !onReject.isEmpty()) ? onReject : "end";
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException("审批被中断", e);
    }
}

完整代码:

package com.badao.ai.workflow;

import org.springframework.ai.chat.client.ChatClient;
import org.springframework.stereotype.Component;
import org.yaml.snakeyaml.Yaml;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@Component
public class WorkflowEngine {

    private final ChatClient chatClient;
    private final Map<String, Function<String, String>> toolRegistry;
    private final ApprovalManager approvalManager;

    public WorkflowEngine(ChatClient chatClient,
                          Map<String, Function<String, String>> toolRegistry,
                          ApprovalManager approvalManager) {
        this.chatClient = chatClient;
        this.toolRegistry = toolRegistry;
        this.approvalManager = approvalManager;
    }

    // ---------- 工作流执行入口 ----------
    public String execute(String workflowPath, Map<String, Object> initialContext) {
        return execute(workflowPath, initialContext, "workflow-" + System.currentTimeMillis());
    }

    /**
     * 执行工作流(带实例ID)
     */
    public String execute(String workflowPath, Map<String, Object> initialContext, String instanceId) {
        WorkflowDefinition wf = loadWorkflow(workflowPath);
        Map<String, Object> context = new HashMap<>(initialContext);
        context.put("_nodes", new HashMap<String, Object>());

        // 创建工作流实例
        ApprovalManager.WorkflowInstance instance = 
            approvalManager.createInstance(instanceId, workflowPath, context);

        String currentNode = wf.getStart();
        while (currentNode != null && !currentNode.equals("end")) {
            WorkflowDefinition.Node node = findNodeById(wf, currentNode);
            if (node == null) break;
            System.out.println("Executing node: " + node.getId());
            currentNode = executeNode(node, context, wf, instanceId);
            
            // 检查工作流是否被拒绝
            if (instance.getStatus().equals("REJECTED")) {
                System.out.println("⛔ 工作流 [" + instanceId + "] 被拒绝,终止执行");
                return "工作流被拒绝: " + 
                    (instance.getCurrentNode() != null ? instance.getCurrentNode() : "unknown");
            }
        }

        // 找到工作流中最后一个非 end、非 start 的节点,并返回其输出
        String lastOutput = null;
        Map<String, Object> nodesOutput = (Map<String, Object>) context.get("_nodes");
        if (nodesOutput != null) {
            // 按边的顺序找到最后一个被执行的节点(或直接取最后一个节点的 id)
            for (WorkflowDefinition.Edge edge : wf.getEdges()) {
                if ("end".equals(edge.getTo())) {
                    lastOutput = nodesOutput.getOrDefault(edge.getFrom(), "").toString();
                    break;
                }
            }
        }
        
        // 更新工作流状态为完成
        instance.setStatus("COMPLETED");
        
        return lastOutput != null ? lastOutput : "No output generated";
    }

    // ---------- 节点分发 ----------
    private String executeNode(WorkflowDefinition.Node node, Map<String, Object> context,
                               WorkflowDefinition wf, String instanceId) {
        switch (node.getType()) {
            case "start":
                return getNextNodeId(wf, node.getId());
            case "tool":
                return executeToolNode(node, context, wf);
            case "llm":
                return executeLlmNode(node, context, wf);
            case "while":
                return executeWhileNode(node, context, wf, instanceId);
            case "parallel":
                return executeParallelNode(node, context, wf, instanceId);
            case "human_approval":
                return executeHumanApprovalNode(node, context, wf, instanceId);
            default:
                return getNextNodeId(wf, node.getId());
        }
    }

    // ---------- 工具节点 ----------
    private String executeToolNode(WorkflowDefinition.Node node, Map<String, Object> context,
                                   WorkflowDefinition wf) {
        Function<String, String> tool = toolRegistry.get(node.getTool());
        if (tool == null) throw new RuntimeException("Tool not found: " + node.getTool());
        String resolvedInput = resolveExpression(node.getInput(), context);
        String result = tool.apply(resolvedInput);
        saveOutput(node.getId(), result, context);
        return getNextNodeId(wf, node.getId());
    }

    // ---------- LLM 节点 ----------
    private String executeLlmNode(WorkflowDefinition.Node node, Map<String, Object> context,
                                  WorkflowDefinition wf) {
        String resolvedPrompt = resolveExpression(node.getPrompt(), context);
        String response = chatClient.prompt().user(resolvedPrompt).call().content();
        saveOutput(node.getId(), response, context);
        return getNextNodeId(wf, node.getId());
    }

    // ---------- While 循环 ----------
    private String executeWhileNode(WorkflowDefinition.Node node, Map<String, Object> context,
                                    WorkflowDefinition wf, String instanceId) {
        int count = 0;
        while (count < node.getMaxIterations()) {
            // 先解析条件
            String condition = resolveExpression(node.getCondition(), context);
            // 条件不满足(null、空、false)则直接退出
            if (condition == null || !"true".equalsIgnoreCase(condition)) {
                break;
            }
            // 条件满足,执行循环体
            for (WorkflowDefinition.Node bodyNode : node.getBody()) {
                executeNode(bodyNode, context, wf, instanceId);
            }
            count++;
        }
        return getNextNodeId(wf, node.getId());
    }

    // ---------- 并行节点 ----------
    private String executeParallelNode(WorkflowDefinition.Node node, Map<String, Object> context,
                                       WorkflowDefinition wf, String instanceId) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (WorkflowDefinition.ParallelBranch branch : node.getBranches()) {
            List<WorkflowDefinition.Node> branchNodes = branch.getNodes(); // 获取分支内的节点列表
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                Map<String, Object> branchContext = new HashMap<>(context);
                for (WorkflowDefinition.Node branchNode : branchNodes) {
                    executeNode(branchNode, branchContext, wf, instanceId);
                }
                synchronized (context) {
                    context.putAll(branchContext);
                }
            });
            futures.add(future);
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return getNextNodeId(wf, node.getId());
    }

    // ---------- 人工审批节点 ----------
    private String executeHumanApprovalNode(WorkflowDefinition.Node node, Map<String, Object> context,
                                            WorkflowDefinition wf, String instanceId) {
        String approvalMessage = node.getApprovalMessage();
        if (approvalMessage == null || approvalMessage.trim().isEmpty()) {
            approvalMessage = "请审批节点: " + node.getId();
        }
        
        long timeoutSeconds = node.getTimeoutSeconds();
        if (timeoutSeconds <= 0) {
            timeoutSeconds = 300; // 默认5分钟
        }

        try {
            // 请求审批(这会阻塞当前线程)
            ApprovalManager.ApprovalResult result = approvalManager.requestApproval(
                instanceId, 
                node.getId(), 
                approvalMessage, 
                context,
                timeoutSeconds
            );

            // 保存审批结果到上下文
            String approvalOutput = String.format(
                "审批结果: %s | 审批意见: %s",
                result.isApproved() ? "通过" : "拒绝",
                result.getComment()
            );
            saveOutput(node.getId(), approvalOutput, context);
            context.put(node.getId() + ".approved", result.isApproved());
            context.put(node.getId() + ".comment", result.getComment());

            // 根据审批结果决定下一步
            if (result.isApproved()) {
                System.out.println("✅ 审批通过,继续执行");
                return getNextNodeId(wf, node.getId());
            } else {
                System.out.println("❌ 审批拒绝");
                // 如果配置了 onReject,跳转到指定节点,否则结束工作流
                String onReject = node.getOnReject();
                return (onReject != null && !onReject.isEmpty()) ? onReject : "end";
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("审批被中断", e);
        } catch (RuntimeException e) {
            throw new RuntimeException("审批失败: " + e.getMessage(), e);
        }
    }

    // ---------- 辅助方法 ----------
    private WorkflowDefinition.Node findNodeById(WorkflowDefinition wf, String id) {
        return wf.getNodes().stream().filter(n -> n.getId().equals(id)).findFirst().orElse(null);
    }

    private String getNextNodeId(WorkflowDefinition wf, String currentId) {
        return wf.getEdges().stream()
                .filter(e -> e.getFrom().equals(currentId))
                .map(WorkflowDefinition.Edge::getTo)
                .findFirst().orElse("end");
    }

    private void saveOutput(String nodeId, String output, Map<String, Object> context) {
        ((Map<String, Object>) context.get("_nodes")).put(nodeId, output);
        context.put(nodeId + ".output", output);
    }

    private String resolveExpression(String expr, Map<String, Object> context) {
        if (expr == null) return null;
        Pattern pattern = Pattern.compile("\\$\\{(.+?)\\}");
        Matcher matcher = pattern.matcher(expr);
        StringBuffer sb = new StringBuffer();
        while (matcher.find()) {
            String var = matcher.group(1);
            Object value = context.get(var);
            if (value == null) {
                Map<String, Object> nodes = (Map<String, Object>) context.get("_nodes");
                if (nodes != null) value = nodes.get(var);
            }
            matcher.appendReplacement(sb, value != null ? value.toString() : "");
        }
        matcher.appendTail(sb);
        return sb.toString();
    }

    private WorkflowDefinition loadWorkflow(String path) {
        Yaml yaml = new Yaml();
        InputStream in = getClass().getResourceAsStream(path);
        return yaml.loadAs(in, WorkflowDefinition.class);
    }
}

4.4 创建 ApprovalController

文件: ApprovalController.java

@RestController
@RequestMapping("/api/approval")
public class ApprovalController {
    
    private final ApprovalManager approvalManager;
    
    public ApprovalController(ApprovalManager approvalManager) {
        this.approvalManager = approvalManager;
    }
    
    // 提交审批结果
    @PostMapping("/submit")
    public ResponseEntity<Map<String, Object>> submitApproval(
            @RequestBody ApprovalRequest request) {
        // ...
    }
    
    // 查看待审批列表
    @GetMapping("/pending")
    public ResponseEntity<Map<String, Object>> getPendingApprovals() {
        // ...
    }
    
    // 查看工作流实例
    @GetMapping("/instances")
    public ResponseEntity<Map<String, Object>> getInstances() {
        // ...
    }
}

4.5 配置 Bean

文件: ToolConfig.java

@Configuration
public class ToolConfig {
    
    @Bean
    public ApprovalManager approvalManager() {
        return new ApprovalManager();
    }
    
    // ... 其他 Bean
}

五、关键知识点解析

5.1 CountDownLatch 原理

什么是 CountDownLatch?

CountDownLatch 是 JDK java.util.concurrent 包中的同步辅助类,允许一个或多个线程等待其他线程完成操作。

核心方法
方法 说明
CountDownLatch(int count) 构造函数,设置初始计数
await() 阻塞等待,直到计数为0
await(long timeout, TimeUnit unit) 带超时的阻塞等待
countDown() 计数减1
getCount() 获取当前计数
工作原理
初始状态: count = 1

工作流线程:                    HTTP API 线程:
  ↓                              ↓
latch.await()  ←── 阻塞等待 ───┐
  ↓                           │
  │                    latch.countDown()
  │                           │
  │                    count = 0
  │                           │
  │                    唤醒等待的线程
  ↓                           ↓
继续执行...
代码示例
// 1. 创建 CountDownLatch,初始计数为1
CountDownLatch latch = new CountDownLatch(1);

// 2. 工作流线程阻塞等待
new Thread(() -> {
    System.out.println("等待审批...");
    latch.await();  // 阻塞,直到 countDown 被调用
    System.out.println("审批完成,继续执行");
}).start();

// 3. HTTP API 线程提交审批
Thread.sleep(2000);  // 模拟审批时间
System.out.println("提交审批结果");
latch.countDown();  // 计数减1,唤醒阻塞的线程

5.2 ConcurrentHashMap 原理

为什么使用 ConcurrentHashMap?
对比项 HashMap ConcurrentHashMap
线程安全 ❌ 否 ✅ 是
并发读写 ❌ 会出现问题 ✅ 支持
性能 较高(分段锁/CAS)
Null 值 允许 不允许
JDK 8+ 实现原理
ConcurrentHashMap 结构:

┌─────────────────────────────────────┐
│            数组 (Node[])             │
├──────────┬──────────┬───────────────┤
│ Node[0]  │ Node[1]  │ Node[2] ...   │
│    ↓     │    ↓     │               │
│ 链表/树  │ 链表/树  │               │
└──────────┴──────────┴───────────────┘

并发控制机制:
- 读操作: 无锁(volatile 保证可见性)
- 写操作: CAS + synchronized(只锁当前桶)
- 扩容: 多线程协助扩容
在审批系统中的应用
// 存储待审批请求
private final Map<String, ApprovalRequest> pendingApprovals = new ConcurrentHashMap<>();

// 存储工作流实例
private final Map<String, WorkflowInstance> workflowInstances = new ConcurrentHashMap<>();

// 多线程安全访问
pendingApprovals.put(key, request);    // HTTP 线程写入
ApprovalRequest req = pendingApprovals.get(key);  // 工作流线程读取

5.3 volatile 关键字

作用
  1. 保证可见性: 一个线程修改了 volatile 变量,其他线程立即可见
  2. 禁止指令重排序: 防止编译器和处理器对指令重排序
在审批系统中的应用
public static class ApprovalRequest {
    private volatile ApprovalResult result;  // 使用 volatile
    
    public void setResult(ApprovalResult result) {
        this.result = result;  // HTTP 线程写入
    }
    
    public ApprovalResult getResult() {
        return result;  // 工作流线程读取,保证看到最新值
    }
}
为什么需要 volatile?
没有 volatile 的问题:

HTTP 线程:                    工作流线程:
  ↓                              ↓
result = new ApprovalResult()   │
  ↓                              │
(修改在 CPU 缓存中)            │
                                 ↓
                              result == null?  ← 可能看不到更新!

使用 volatile 后:

HTTP 线程:                    工作流线程:
  ↓                              ↓
result = new ApprovalResult()   │
  ↓                              │
(强制刷新到主内存)             │
                                 ↓
                              result != null  ← 一定能看到更新!

5.4 异步执行机制

为什么需要异步?
同步执行的问题:

HTTP 请求 → 启动工作流 → 遇到审批节点 → 阻塞等待 → ... (可能几分钟)
                ↓
            HTTP 超时!客户端等待太久

异步执行的解决方案:

HTTP 请求 → 启动工作流(新线程) → 立即返回
                ↓
            工作流在后台运行
                ↓
            遇到审批节点 → 阻塞等待(不影响 HTTP 响应)
                ↓
            外部 API 提交审批 → 工作流继续
实现代码
@PostMapping("/workflow/approval")
public Map<String, Object> runWorkflowWithApproval(@RequestBody Map<String, String> input) {
    String instanceId = input.getOrDefault("instanceId", "workflow-" + System.currentTimeMillis());
    
    // 在新线程中执行工作流
    Thread workflowThread = new Thread(() -> {
        try {
            String result = engine.execute(workflow, context, instanceId);
            System.out.println("✅ 工作流 [" + instanceId + "] 执行完成");
        } catch (Exception e) {
            System.err.println("❌ 工作流执行失败: " + e.getMessage());
        }
    });
    workflowThread.setName("workflow-" + instanceId);
    workflowThread.start();
    
    // 立即返回响应
    Map<String, Object> response = new HashMap<>();
    response.put("success", true);
    response.put("instanceId", instanceId);
    response.put("message", "工作流已启动");
    return response;
}

5.5 YAML DSL 解析

SnakeYAML 库

Spring Boot 默认包含 SnakeYAML 依赖,用于解析 YAML 文件。

解析流程
YAML 文件:                    Java 对象:
                                   
name: "content_review"    →    WorkflowDefinition
start: "start_node"              ├─ name: "content_review"
nodes:                           ├─ start: "start_node"
  - id: "start_node"             └─ nodes:
    type: "start"                    ├─ Node(id="start_node", type="start")
  - id: "review"                     └─ Node(id="review", type="human_approval")
    type: "human_approval"
    approvalMessage: "请审核"
代码实现
private WorkflowDefinition loadWorkflow(String path) {
    Yaml yaml = new Yaml();
    InputStream in = getClass().getResourceAsStream(path);
    return yaml.loadAs(in, WorkflowDefinition.class);
}

六、完整使用流程

6.1 启动项目

# 1. 启动 Ollama
ollama serve

# 2. 启动 Spring Boot 应用
mvn spring-boot:run

6.2 启动工作流

Windows PowerShell:

Invoke-RestMethod -Uri "http://localhost:886/api/workflow/approval" `
  -Method POST -ContentType "application/json" -Body @{
    workflow = "/workflows/content_review_workflow.yaml"
    instanceId = "test-001"
    product_name = "智能手表Pro"
} | ConvertTo-Json

Linux/Mac:

curl -X POST http://localhost:886/api/workflow/approval \
  -H "Content-Type: application/json" \
  -d '{
    "workflow": "/workflows/content_review_workflow.yaml",
    "instanceId": "test-001",
    "product_name": "智能手表Pro"
  }'

响应:

{
  "instanceId": "test-001",
  "success": true,
  "message": "工作流已启动,请通过 /api/approval/pending 查看待审批请求"
}

请添加图片描述

6.3 查看待审批

curl http://localhost:886/api/approval/pending

控制台输出:

Executing node: start_node
Executing node: generate_content
Executing node: content_review
⏸️  工作流 [test-001] 在节点 [content_review] 暂停,等待人工审批...
📋 审批信息: 请审核AI生成的营销文案是否合适发布

请添加图片描述

6.4 提交审批

审批通过:

curl -X POST http://localhost:886/api/approval/submit \
  -H "Content-Type: application/json" \
  -d '{
    "instanceId": "test-001",
    "nodeId": "content_review",
    "approved": true,
    "comment": "文案很好,可以发布"
  }'

请添加图片描述

审批拒绝:

curl -X POST http://localhost:886/api/approval/submit \
  -H "Content-Type: application/json" \
  -d '{
    "instanceId": "test-001",
    "nodeId": "content_review",
    "approved": false,
    "comment": "语气过于夸张,需要更专业"
  }'

6.5 观察执行

审批通过后:

✅ 审批完成: 通过 - 文案很好,可以发布
✅ 审批通过,继续执行
Executing node: translate_to_english
Executing node: final_output
✅ 工作流 [test-001] 执行完成: ...

请添加图片描述

七、实战场景示例

7.1 内容审核工作流

文件: content_review_workflow.yaml

name: "content_review_workflow"
start: "start_node"
nodes:
  - id: "start_node"
    type: "start"

  # AI 生成内容
  - id: "generate_content"
    type: "llm"
    prompt: "请为产品 ${product_name} 生成营销文案..."

  # 人工审核
  - id: "content_review"
    type: "human_approval"
    approvalMessage: "请审核AI生成的营销文案是否合适发布"
    timeoutSeconds: 300
    onReject: "regenerate_content"

  # 审批通过:翻译为英文
  - id: "translate_to_english"
    type: "llm"
    prompt: "请将以下中文翻译为英文:${generate_content.output}"

  # 审批拒绝:重新生成
  - id: "regenerate_content"
    type: "llm"
    prompt: "根据拒绝意见重新生成:${content_review.comment}"

edges:
  - from: "start_node"
    to: "generate_content"
  - from: "generate_content"
    to: "content_review"
  - from: "content_review"
    to: "translate_to_english"
  - from: "translate_to_english"
    to: "end"

7.2 大额支付审批

文件: payment_approval_workflow.yaml

name: "payment_approval_workflow"
nodes:
  # 查询订单
  - id: "get_order_info"
    type: "llm"
    prompt: "查询订单 ${order_id} 详情..."

  # 检查金额
  - id: "check_amount"
    type: "llm"
    prompt: "订单金额是否超过1000元?回复 true 或 false"

  # 条件审批
  - id: "need_approval_check"
    type: "while"
    condition: "${check_amount.output}"
    maxIterations: 1
    body:
      - id: "human_approval"
        type: "human_approval"
        approvalMessage: "订单金额较大,请人工审批"
        timeoutSeconds: 600
        onReject: "reject_notification"

  # 审批通过:处理支付
  - id: "process_payment"
    type: "llm"
    prompt: "订单已通过审批,处理支付..."

八、最佳实践与注意事项

8.1 超时设置

# ✅ 推荐:根据业务场景设置合理超时
- id: "content_review"
  type: "human_approval"
  timeoutSeconds: 300  # 5分钟

# ❌ 不推荐:超时时间过短
- id: "content_review"
  timeoutSeconds: 10  # 10秒太短

# ❌ 不推荐:不设超时(会永久阻塞)
- id: "content_review"
  # 缺少 timeoutSeconds

8.2 拒绝后跳转

# ✅ 推荐:配置拒绝后的处理逻辑
- id: "human_approval"
  onReject: "regenerate_content"

# ❌ 不推荐:不配置(工作流直接结束)
- id: "human_approval"
  # 缺少 onReject

8.3 并发安全

// ✅ 推荐:使用 ConcurrentHashMap
private final Map<String, ApprovalRequest> pendingApprovals = new ConcurrentHashMap<>();

// ❌ 不推荐:使用 HashMap(线程不安全)
private final Map<String, ApprovalRequest> pendingApprovals = new HashMap<>();

8.4 状态可见性

// ✅ 推荐:使用 volatile
private volatile ApprovalResult result;

// ❌ 不推荐:不使用 volatile(可能看不到更新)
private ApprovalResult result;

8.5 异步执行

// ✅ 推荐:异步执行工作流
Thread workflowThread = new Thread(() -> {
    engine.execute(workflow, context, instanceId);
});
workflowThread.start();

// ❌ 不推荐:同步执行(会阻塞 HTTP 响应)
engine.execute(workflow, context, instanceId);

九、扩展方向

9.1 持久化

// 将审批记录存储到数据库
public void recordApproval(String instanceId, String nodeId, ApprovalResult result) {
    ApprovalRecord record = new ApprovalRecord();
    record.setInstanceId(instanceId);
    record.setNodeId(nodeId);
    record.setApproved(result.isApproved());
    record.setComment(result.getComment());
    record.setCreateTime(LocalDateTime.now());
    
    approvalRecordRepository.save(record);
}

9.2 消息通知

// 发送邮件通知
public void notifyApprover(String instanceId, String message) {
    SimpleMailMessage mailMessage = new SimpleMailMessage();
    mailMessage.setTo("approver@example.com");
    mailMessage.setSubject("工作流审批请求");
    mailMessage.setText("实例ID: " + instanceId + "\n审批信息: " + message);
    
    mailSender.send(mailMessage);
}

9.3 多级审批

// 支持多级审批流程
public ApprovalResult requestMultiLevelApproval(
        String instanceId, 
        List<String> approverRoles) {
    
    for (String role : approverRoles) {
        ApprovalResult result = waitForRoleApproval(instanceId, role);
        if (!result.isApproved()) {
            return result;
        }
    }
    
    return ApprovalResult.approve("多级审批全部通过");
}

9.4 WebSocket 推送

// 实时推送审批请求
@Autowired
private SimpMessagingTemplate messagingTemplate;

public void notifyApprover(String instanceId, String message) {
    messagingTemplate.convertAndSend(
        "/topic/approval/" + instanceId,
        new ApprovalNotification(instanceId, message)
    );
}

十、总结

10.1 核心成果

通过本次实战,我们成功实现了:

工作流暂停与恢复: 使用 CountDownLatch 实现线程安全的阻塞/唤醒
灵活配置: YAML 中配置审批信息和拒绝后跳转
REST API: 完整的审批管理接口
状态跟踪: 实时查看工作流实例状态
超时处理: 防止无限期等待
零额外依赖: 仅使用 JDK 自带并发工具

10.2 技术亮点

技术点 说明
CountDownLatch JDK 并发工具,实现线程阻塞/唤醒
ConcurrentHashMap 线程安全的 Map,支持高并发访问
volatile 保证变量可见性,防止指令重排序
异步执行 新线程运行工作流,避免阻塞 HTTP 响应
YAML DSL 声明式配置,易于理解和维护

10.3 适用场景

  • 💰 大额支付审批
  • 📝 内容审核发布
  • 🔐 敏感操作确认
  • 👤 权限申请审批
  • 📋 订单人工复核

10.4 学习价值

通过本项目,你可以掌握:

  1. JDK 并发编程: CountDownLatch、ConcurrentHashMap、volatile
  2. Spring Boot 开发: REST API、Bean 配置、依赖注入
  3. Spring AI 集成: ChatClient、工具链、工作流引擎
  4. YAML DSL 设计: 声明式配置、节点编排、变量传递
  5. 异步编程模式: 线程管理、阻塞/唤醒、状态同步

📚 相关资源

  • Spring AI 官方文档: https://docs.spring.io/spring-ai/reference
  • JDK 并发工具: https://docs.oracle.com/javase/tutorial/essential/concurrency

技术栈: Spring Boot 3.3.3 + Spring AI 1.1.2 + Ollama
并发工具: JDK ConcurrentHashMap + CountDownLatch

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐