工作流图执行引擎(GraphEngine)—— 源码解析与面试指南

本文档面向两类读者:

  • 学习者:系统了解基于 DAG 的工作流引擎的完整设计思路与实现

:文档中的代码已做脱敏处理,包名统一改为 com.workflow,内部工具类替换为标准实现。


目录


一、整体架构概览

1.1 系统位置

前端设计器 (React Flow)
      │ 保存工作流 JSON
      ▼
  后端 API 层
      │ 解析 FlowWsDTO
      ▼
 GraphEngineFactory          ← 唯一入口,工厂 + 缓存
      │ clone() 深拷贝
      ▼
   GraphEngine                ← 图执行引擎核心
      ├── EdgeManage           ← 边的倒排索引(DAG 结构)
      ├── GraphState           ← 运行时共享状态
      ├── NodeFactory          ← 按类型实例化节点
      ├── RunGraphNodeThreadPool ← 并行执行线程池
      └── BaseNode 子类群      ← 各种具体节点实现

1.2 工作流执行状态机

           首次调用 run()
                │
                ▼
           WAITING(初始)
                │ startRun()
                ▼
           RUNNING(运行中)
          /     │     \     \
         /      │      \     \
    WAITING  COMPLETED FAILED STOPPED
   (遇到交互  (队列为空  (节点异常  (用户主动
     节点)    正常结束)  或超步数)    停止)
        │
        │ continueRun(inputData)
        ▼
       RUNNING

1.3 DAG 执行时序示例

工作流结构:
  [START] → [LLM节点A] → [RAG节点B] ─┐
                        → [代码节点C] ─┤→ [聚合节点D] → [END]
                        → [SQL节点E]  ─┘

执行时序(主线程视角):
  轮次1: runQueue=[START]     → 执行START → 入队[A]
  轮次2: runQueue=[A]         → 执行A     → 入队[B,C,E]
  轮次3: runQueue=[B,C,E]     → B/C/E并行 → 各自入队[D](去重)
  轮次4: runQueue=[D(WAITING)]→ 前驱全完 → 执行D → 入队[END]
  轮次5: runQueue=[END]       → 执行END  → COMPLETED

二、核心类清单

层级 类名 作用简述
核心引擎 GraphEngine 图调度引擎,驱动整个工作流运行
核心引擎 GraphEngineFactory 工厂单例,预加载模板+深拷贝创建实例
核心引擎 GraphState 运行时全局状态,节点间变量共享
边管理 EdgeManage 维护正向/反向倒排索引,支持 DAG 遍历
边管理 EdgeBase 边数据结构(源节点、目标节点、条件等)
节点层 BaseNode 节点抽象基类,定义生命周期钩子和路由接口
节点层 NodeFactory 策略模式节点工厂,按类型创建具体节点
并行执行 RunGraphNodeThreadPool Spring Bean 线程池,负责节点并行调度
并行执行 RunGraphNodeTask 节点执行任务封装(Callable)
枚举/模型 NodeInteractiveType 节点交互类型(5种),决定调度策略
枚举/模型 GraphEngineStatus 引擎状态枚举
枚举/模型 GraphEngineMonitor 运行监控信息(步数、耗时等)

三、核心类源码解析

3.1 EdgeManage — 边管理与倒排索引

设计思路:在引擎初始化时,一次性将所有边构建成两个 HashMap,空间换时间,运行期 O(1) 查询。

package com.workflow.engine.edges;

/**
 * 边管理类 —— 核心是两个倒排索引
 *
 * 设计目的:避免每次查找边时遍历全量边列表(O(n)),
 * 预先建立 HashMap 后运行期 O(1) 查询
 */
public class EdgeManage {

    /**
     * 正向索引:sourceNodeId → List<出边>
     * 用途:节点执行完毕后,查找它的所有后继节点
     */
    private Map<String, List<EdgeBase>> nodeTargetEdges = new HashMap<>();

    /**
     * 反向索引:targetNodeId → List<入边>
     * 用途:节点被创建时,注入其所有入边(前驱信息)
     *       节点内部可用来判断前驱是否全部执行完成
     */
    private Map<String, List<EdgeBase>> nodeSourceEdges = new HashMap<>();

    public EdgeManage(List<EdgeBase> edges) {
        buildEdgeCache(edges);
    }

    /** 构建时一次性建立两个索引 */
    private void buildEdgeCache(List<EdgeBase> edges) {
        for (EdgeBase edge : edges) {
            // 正向:source → [edge1, edge2, ...]
            nodeTargetEdges.computeIfAbsent(edge.getSourceId(), k -> new ArrayList<>()).add(edge);
            // 反向:target → [edge1, edge2, ...]
            nodeSourceEdges.computeIfAbsent(edge.getTargetId(), k -> new ArrayList<>()).add(edge);
        }
    }

    /** 获取某节点的所有出边(找后继) */
    public List<EdgeBase> getTargetEdges(String sourceId) {
        return nodeTargetEdges.getOrDefault(sourceId, new ArrayList<>());
    }

    /** 获取某节点的所有入边(找前驱) */
    public List<EdgeBase> getSourceEdges(String targetId) {
        return nodeSourceEdges.getOrDefault(targetId, new ArrayList<>());
    }

    /** 按条件(sourceHandle)过滤出边,用于条件分支路由 */
    public List<EdgeBase> getTargetEdgesByOutputType(String sourceId, String outputType) {
        return getTargetEdges(sourceId).stream()
                .filter(edge -> outputType == null || outputType.equals(edge.getSourceHandleType()))
                .collect(Collectors.toList());
    }
}

3.2 GraphState — 运行时状态管理

设计思路:以 nodeId 为一级 Key 分区存储,天然隔离各节点的变量,同时使用 ConcurrentHashMap 保证并行节点写入的线程安全。

package com.workflow.engine.graph;

/**
 * 图状态类 —— 工作流运行期间的全局共享状态
 *
 * 核心设计:
 * 1. 双层 ConcurrentHashMap:外层 key=nodeId,内层 key=变量名
 *    → 各节点变量天然分区,并行节点写入互不干扰
 * 2. transferVariables:主流程向子流程传参的专用通道
 * 3. referGraphEnginList:持有子流程 GraphEngine 引用,用于 close() 时递归释放
 */
public class GraphState {

    /** 节点变量存储:nodeId → (varName → value) */
    private Map<String, Map<String, Object>> nodeVariables = new ConcurrentHashMap<>();

    /** 对话上下文历史 */
    private List<Map<String, String>> chatContext = new ArrayList<>();

    /** 主流程向子流程传输的变量:nodeId → data */
    private Map<String, Map<String, Object>> transferVariables = new ConcurrentHashMap<>();

    /** 本流程引用的子流程引擎列表(用于 close() 时递归释放资源) */
    private List<GraphEngine> referGraphEnginList = new ArrayList<>();

    /** 设置变量,以 nodeId 为分区键 */
    public void setVariable(String nodeId, String name, Object value) {
        nodeVariables.computeIfAbsent(nodeId, k -> new ConcurrentHashMap<>()).put(name, value);
    }

    /**
     * 获取变量,优先从 nodeVariables 找,找不到再从 transferVariables 找
     * transferVariables 存放的是主流程传入的参数
     */
    public Object getVariable(String nodeId, String name) {
        Map<String, Object> vars = nodeVariables.getOrDefault(nodeId,
                transferVariables.get(nodeId));
        return vars != null ? vars.get(name) : null;
    }

    /**
     * 通过变量路径字符串获取值
     * 格式:"nodeId.varName",例如 "llm_001.output"
     */
    public Object getVariableByStr(String variablePath) {
        String[] parts = variablePath.split("\\.");
        if (parts.length != 2) return null;
        return getVariable(parts[0], parts[1]);
    }

    /** 保存对话上下文(human/ai 角色) */
    public void saveContext(String message, RoleType role) {
        Map<String, String> item = new HashMap<>();
        item.put("message", message);
        item.put("role", role.getType());
        item.put("timestamp", String.valueOf(System.currentTimeMillis()));
        chatContext.add(item);
    }

    /** 释放所有状态(引擎 close() 时调用) */
    public void clear() {
        nodeVariables.clear();
        chatContext.clear();
        transferVariables.clear();
        referGraphEnginList.forEach(GraphEngine::close);
    }
}

3.3 NodeInteractiveType — 节点交互类型枚举

设计思路:用枚举定义 5 种调度策略,引擎通过多态(getInteractiveType())而非 instanceof 判断决定如何调度,符合开闭原则。

package com.workflow.engine.common.enums;

/**
 * 节点交互类型枚举 —— 决定节点在图引擎中的调度策略
 *
 * 这是整个调度系统的核心分类依据,引擎根据此类型决定:
 * - 串行 or 并行
 * - 是否挂起等待用户输入
 * - 执行顺序优先级
 */
public enum NodeInteractiveType {

    /**
     * 前置交互节点
     * 需要先收集用户输入,再执行节点
     * 典型代表:InputNode(用户输入节点)、OutputNode(选择/输入模式)
     * 调度行为:优先处理,立即挂起引擎(status=WAITING),等待用户输入后恢复
     */
    PERPOSE(1, "prepose", "交互前置节点"),

    /**
     * 后置交互节点
     * 先执行节点,执行过程中可能触发内部等待
     * 典型代表:SubWorkFlowNode(子工作流节点)、ParamCollectorNode(参数收集节点)
     * 调度行为:串行执行,执行完后检查是否有内部等待状态
     */
    POSTPOSITION(2, "postposition", "交互后置节点"),

    /**
     * 非交互节点(默认类型)
     * 无需用户交互,纯计算型节点
     * 典型代表:LLMNode、RagNode、CodeNode、SqlExecutorNode、ToolNode 等
     * 调度行为:同批节点全部提交线程池并行执行
     */
    NOINTERACTIVE(3, "noInteractive", "非交互节点"),

    /**
     * 结束节点
     * 工作流的终止节点
     * 典型代表:EndNode
     * 调度行为:始终加入队尾,在所有其他节点完成后最后串行执行
     */
    END(4, "end", "结束节点"),

    /**
     * 等待/聚合节点(合流节点)
     * 需要等待所有上游并行分支全部完成后才能执行
     * 典型代表:RunWaitingNode(聚合节点)
     * 调度行为:每轮检查前驱是否完成,未完成则放回队尾(忙等策略)
     */
    WAITING(5, "polymerize", "等待节点");

    private final Integer value;
    private final String type;
    private final String desc;

    NodeInteractiveType(Integer value, String type, String desc) {
        this.value = value;
        this.type = type;
        this.desc = desc;
    }

    // getter 方法省略
}

各节点类型与 InteractiveType 对应关系:

节点类型 InteractiveType 是否重写 getInteractiveType() 特殊说明
StartNode NOINTERACTIVE 否(继承默认值)
EndNode END 固定返回 END
InputNode PERPOSE / NOINTERACTIVE 动态:无预设值→PERPOSE,有预设值→NOINTERACTIVE
OutputNode PERPOSE / NOINTERACTIVE 动态:选择/输入模式→PERPOSE,纯展示→NOINTERACTIVE
LLMNode NOINTERACTIVE
RagNode NOINTERACTIVE
CodeNode NOINTERACTIVE
SubWorkFlowNode POSTPOSITION 子流程内部可能触发等待
ParamCollectorNode POSTPOSITION 运行中动态收集参数
RunWaitingNode WAITING 合流等待节点

3.4 BaseNode — 节点抽象基类

设计思路:模板方法模式,run() 定义执行骨架(生命周期回调、变量保存、日志),_run() 由子类实现具体逻辑。生命周期钩子(lifecycle_*)允许节点自定义交互行为,消除引擎中的 instanceof 判断。

package com.workflow.engine.nodes;

/**
 * 节点抽象基类
 *
 * 核心设计模式:
 * 1. 模板方法模式:run() 定义执行骨架,_run() 由子类实现
 * 2. 生命周期钩子:lifecycle_* 方法允许子类自定义行为,引擎通过多态调用
 * 3. 多态路由:routeNode() 默认返回所有出边目标,条件节点可重写为按条件路由
 */
public abstract class BaseNode {

    protected String id;           // 节点ID
    protected String type;         // 节点类型字符串
    protected String name;         // 节点名称
    protected List<EdgeBase> targetEdges;  // 出边列表(正向索引注入)
    protected List<EdgeBase> sourceEdges;  // 入边列表(反向索引注入)
    protected GraphState graphState;       // 全局状态(共享)
    protected BaseNodeData nodeData;       // 节点配置数据
    protected Map<String, Object> nodeParams = new HashMap<>(); // 展平后的参数
    protected BaseCallback callback;       // 回调接口(WebSocket推送)
    protected String runSourceNodeId;      // 执行来源节点ID(由引擎设置)
    protected boolean stopFlag = false;    // 停止标志

    /**
     * 节点执行入口 —— 模板方法
     * 定义固定骨架:回调通知 → 执行 → 保存变量 → 日志
     */
    public Map<String, Object> run(Map<String, Object> state) {
        if (stopFlag) throw new RuntimeException("节点已被停止");
        String execId = UUID.randomUUID().toString().replaceAll("-", "");

        // 1. 通知回调:节点开始(用于 WebSocket 实时推送)
        callback.onNodeStart(NodeStartData.builder().nodeId(id).name(name).build());

        Map<String, Object> result = null;
        try {
            // 2. 执行子类实现的具体逻辑(模板方法核心)
            result = _run(execId);

            // 3. 将执行结果保存到 GraphState(以 nodeId 分区)
            if (result != null) {
                result.forEach((key, value) -> {
                    graphState.setVariable(id, key, value);
                    state.put(key, value);
                });
            }
        } catch (Exception e) {
            callback.onNodeError(this, e.getMessage());
            throw e;
        } finally {
            // 4. 通知回调:节点结束(附带日志和耗时)
            callback.onNodeEnd(NodeEndData.builder()
                    .nodeId(id).logData(parseLog(execId, result)).build());
        }
        return state;
    }

    /**
     * 子类实现具体执行逻辑 —— 模板方法的变化点
     */
    public abstract Map<String, Object> _run(String uniqueId);

    /**
     * 路由决策 —— 决定下一步执行哪些节点
     *
     * 默认实现:返回所有出边的目标节点(普通顺序节点)
     * 重写场景:条件节点、意图识别节点 —— 根据运行结果只返回满足条件的分支
     *
     * 对应 LangGraph 的 conditional_edge function
     */
    public List<String> routeNode() {
        if (targetEdges != null && !targetEdges.isEmpty()) {
            return targetEdges.stream()
                    .map(EdgeBase::getTargetId)
                    .collect(Collectors.toList());
        }
        return new ArrayList<>();
    }

    /**
     * 返回节点的调度类型 —— 多态替代 instanceof
     * 默认:NOINTERACTIVE(非交互节点)
     * 子类按需重写以声明不同的调度行为
     */
    public NodeInteractiveType getInteractiveType() {
        return NodeInteractiveType.NOINTERACTIVE;
    }

    // ==================== 生命周期钩子 ====================
    // 引擎在特定时机调用这些钩子,子类可重写以实现自定义行为
    // 好处:消除引擎中的 if-else/instanceof 类型判断,符合开闭原则

    /** 是否需要用户输入 */
    public boolean lifecycle_needsUserInput() { return false; }

    /** 获取等待原因描述(用于前端展示) */
    public String lifecycle_getWaitingReason() { return "等待用户输入"; }

    /** 准备用户输入(发送交互消息、设置输入模式等) */
    public void lifecycle_prepareUserInput() {}

    /** 用户输入完成后的后处理(验证输入、更新状态等) */
    public void lifecycle_afterUserInput() {}

    /** 是否需要特殊路由处理(条件节点、意图节点等) */
    public boolean lifecycle_needsSpecialRouting() { return false; }

    /** 是否可以继续执行(合流节点检查前驱是否完成) */
    public boolean canContinueExecution() { return true; }

    /** 节点执行前的准备工作 */
    public Map<String, Object> lifecycle_beforeExecution(Map<String, Object> state) { return state; }

    /** 节点执行后的后处理 */
    public Map<String, Object> lifecycle_afterExecution(Map<String, Object> state,
                                                         Map<String, Object> result) { return result; }
}

3.5 NodeFactory — 节点工厂

设计思路:策略模式 + 函数式编程,用 EnumMap<NodeTypeEnum, Function<Context, BaseNode>> 替代 if-else/switch,新增节点类型只需在 static{} 块中加一行注册,引擎代码零改动。

package com.workflow.engine.nodes;

/**
 * 节点工厂类 —— 策略模式
 *
 * 核心设计:
 * EnumMap 存储 "节点类型 → 创建函数" 的映射
 * 新增节点:只需在 static 块加一行注册,无需修改任何其他代码(开闭原则)
 */
public class NodeFactory {

    /**
     * 节点创建器注册表
     * key: 节点类型枚举, value: 创建该类型节点的 Lambda 函数
     */
    private static final Map<NodeTypeEnum, Function<NodeCreationContext, BaseNode>> NODE_CREATORS
            = new EnumMap<>(NodeTypeEnum.class);

    static {
        // 基础节点
        NODE_CREATORS.put(NodeTypeEnum.START,   ctx -> new StartNode(ctx.getNodeData(), ...));
        NODE_CREATORS.put(NodeTypeEnum.END,     ctx -> new EndNode(ctx.getNodeData(), ...));
        NODE_CREATORS.put(NodeTypeEnum.INPUT,   ctx -> new InputNode(ctx.getNodeData(), ...));
        NODE_CREATORS.put(NodeTypeEnum.OUTPUT,  ctx -> new OutputNode(ctx.getNodeData(), ...));

        // AI 能力节点
        NODE_CREATORS.put(NodeTypeEnum.LLM,     ctx -> new LLMNode(ctx.getNodeData(), ...));
        NODE_CREATORS.put(NodeTypeEnum.RAG,     ctx -> new RagNode(ctx.getNodeData(), ...));
        NODE_CREATORS.put(NodeTypeEnum.AGENT,   ctx -> new AgentNode(ctx.getNodeData(), ...));
        NODE_CREATORS.put(NodeTypeEnum.TOOL,    ctx -> new ToolNode(ctx.getNodeData(), ...));

        // 逻辑控制节点
        NODE_CREATORS.put(NodeTypeEnum.CONDITION,  ctx -> new ConditionNode(ctx.getNodeData(), ...));
        NODE_CREATORS.put(NodeTypeEnum.INTENTION,  ctx -> new IntentRecognitionNode(ctx.getNodeData(), ...));
        NODE_CREATORS.put(NodeTypeEnum.RUNWAIT,    ctx -> new RunWaitingNode(ctx.getNodeData(), ...));

        // 数据处理节点
        NODE_CREATORS.put(NodeTypeEnum.CODE,          ctx -> new CodeNode(ctx.getNodeData(), ...));
        NODE_CREATORS.put(NodeTypeEnum.SQL_EXECUTOR,  ctx -> new SqlExecutorNode(ctx.getNodeData(), ...));
        NODE_CREATORS.put(NodeTypeEnum.VAR_EXTRACTOR, ctx -> new VarExtractorNode(ctx.getNodeData(), ...));

        // 工作流节点
        NODE_CREATORS.put(NodeTypeEnum.WORKFLOW,      ctx -> new SubWorkFlowNode(ctx.getNodeData(), ...));
        // ... 其他节点类型
    }

    /**
     * 创建节点实例
     * @param nodeType 节点类型整数值
     * @param params   包含创建节点所需全部信息的 Map
     */
    public static BaseNode instanceNode(Integer nodeType, Map<String, Object> params) {
        NodeTypeEnum type = NodeTypeEnum.fromValue(nodeType);
        if (type == null) {
            throw new RuntimeException("不支持的节点类型: " + nodeType);
        }

        Function<NodeCreationContext, BaseNode> creator = NODE_CREATORS.get(type);
        if (creator == null) {
            throw new RuntimeException("未找到节点创建器: " + type);
        }

        NodeCreationContext context = new NodeCreationContext(
                (BaseNodeData)   params.get("nodeData"),
                (String)         params.get("workflowId"),
                (String)         params.get("userId"),
                (GraphState)     params.get("graphState"),
                (List<EdgeBase>) params.get("targetEdges"),
                (List<EdgeBase>) params.get("sourceEdges"),
                (Integer)        params.getOrDefault("maxSteps", 100),
                (BaseCallback)   params.get("callback")
        );

        return creator.apply(context);
    }
}

3.6 RunGraphNodeThreadPool — 节点并行执行线程池

设计思路:Spring 单例 Bean,全局唯一线程池,所有并行节点共享,避免为每个工作流创建独立线程池导致的资源浪费。

package com.workflow.engine.graph.thread;

/**
 * 工作流节点并行执行线程池
 *
 * 设计要点:
 * 1. Spring @Component 单例,全局共享,避免重复创建
 * 2. 参数外部化配置,支持生产环境调优
 * 3. @PreDestroy 优雅关闭,等待任务完成后再停止
 */
@Component
public class RunGraphNodeThreadPool {

    @Value("${graph.node.thread.pool.coreSize:16}")
    private Integer corePoolSize;

    @Value("${graph.node.thread.pool.maxSize:32}")
    private Integer maxPoolSize;

    @Value("${graph.node.thread.pool.keepAliveTime:60}")
    private Integer keepAliveTime;

    @Value("${graph.node.thread.pool.workQueueSize:5000}")
    private Integer workQueueSize;

    private ExecutorService threadPool;

    @PostConstruct
    public void init() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("run-graph-node-thread-%d")  // 线程命名,方便排查问题
                .build();

        threadPool = new ThreadPoolExecutor(
                corePoolSize,                           // 核心线程数:16
                maxPoolSize,                            // 最大线程数:32
                keepAliveTime, TimeUnit.SECONDS,        // 空闲存活时间:60s
                new LinkedBlockingQueue<>(workQueueSize), // 有界队列:5000
                threadFactory,
                new ThreadPoolExecutor.AbortPolicy()    // 队满直接拒绝,避免无限堆积
        );
    }

    /**
     * 提交节点任务,返回 CompletableFuture
     * 调用方可以 .thenAccept() 异步处理结果,或 allOf().get() 阻塞等待
     */
    public CompletableFuture<NodeRunResult> submitNodeTask(RunGraphNodeTask task) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, threadPool);
    }

    @PreDestroy
    public void destroy() {
        threadPool.shutdown();
        try {
            if (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) {
                threadPool.shutdownNow();  // 超时后强制关闭
            }
        } catch (InterruptedException e) {
            threadPool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

3.7 RunGraphNodeTask — 节点执行任务

设计思路:将节点执行封装为 Callable<NodeRunResult>,与线程池解耦。注意跨线程的国际化语言传递:主线程的 Locale 必须手动传入子线程。

package com.workflow.engine.graph.thread;

/**
 * 节点执行任务 —— 线程池的执行单元
 *
 * 关键点:跨线程的 Locale 传递
 * Spring 的 LocaleContextHolder 是 ThreadLocal 实现,
 * 子线程无法自动继承,需要在创建任务时捕获,在 call() 中手动设置
 */
public class RunGraphNodeTask implements Callable<NodeRunResult> {

    private final BaseNode node;
    private final String workflowId;
    private final Locale locale;    // 从主线程捕获,传入子线程

    public RunGraphNodeTask(String workflowId, BaseNode node, Locale locale) {
        this.workflowId = workflowId;
        this.node = node;
        this.locale = locale;
    }

    @Override
    public NodeRunResult call() {
        try {
            // 关键:将主线程的语言设置传递到当前子线程
            LocaleContextHolder.setLocale(locale);

            // 执行节点生命周期
            Map<String, Object> state = new HashMap<>();
            state = node.lifecycle_beforeExecution(state);   // 前置钩子
            Map<String, Object> result = node.run(state);    // 执行节点
            node.lifecycle_afterExecution(state, result);    // 后置钩子

            return new NodeRunResult(NodeRunStatus.COMPLETED, "执行成功");

        } catch (Exception e) {
            log.error("节点执行失败: workflowId={}, nodeId={}", workflowId, node.getId(), e);
            return new NodeRunResult(NodeRunStatus.FAILED, e.getMessage());
        } finally {
            // 重置线程局部变量,避免线程复用时污染下一个任务
            LocaleContextHolder.resetLocaleContext();
        }
    }
}

3.8 GraphEngineFactory — 引擎工厂

设计思路:双重检查锁单例 + 模板缓存池。工作流发布后,引擎模板预加载进内存;用户调用时通过深拷贝创建全新实例,既复用解析结果,又保证状态隔离。

package com.workflow.engine.graph;

/**
 * 图引擎工厂类 —— 唯一创建 GraphEngine 实例的入口
 *
 * 核心设计:
 * 1. 单例工厂(双重检查锁)
 * 2. 模板缓存池:已发布工作流的引擎模板预加载,避免重复解析
 * 3. 深拷贝创建实例:每个用户请求获得独立实例,状态完全隔离
 * 4. LRU 淘汰:缓存超过 100 个时,淘汰最久未使用的模板
 */
public class GraphEngineFactory {

    private static final int CACHE_MAX_SIZE = 100;

    /** 工作流模板缓存:workflowId → 模板引擎(只读,不执行) */
    private static final ConcurrentHashMap<String, GraphEngine> templateCache
            = new ConcurrentHashMap<>();

    private static volatile GraphEngineFactory instance;

    public static GraphEngineFactory getInstance() {
        if (instance == null) {
            synchronized (GraphEngineFactory.class) {
                if (instance == null) {
                    instance = new GraphEngineFactory();
                    startCacheMonitorThread();  // 启动缓存淘汰后台线程
                }
            }
        }
        return instance;
    }

    /**
     * 获取可执行的 GraphEngine 新实例(生产环境入口)
     *
     * 流程:查缓存 → 未命中则从数据库加载 → 深拷贝创建新实例
     * 双重检查锁保证并发下同一工作流只加载一次
     */
    public GraphEngine getNewEngineInstanceById(String userId, String workflowId,
                                                 boolean asyncMode, BaseCallback callback) {
        GraphEngine template = templateCache.get(workflowId);

        if (template == null) {
            synchronized (workflowId.intern()) {    // 以 workflowId 为粒度加锁,避免全局锁
                template = templateCache.get(workflowId);
                if (template == null) {
                    template = loadEngineFromDb(workflowId);  // 从数据库加载并解析
                    templateCache.put(workflowId, template);
                }
            }
        }

        // 深拷贝:模板不可变,每次返回全新实例
        GraphEngine instance = template.clone();
        instance.setAsyncMode(asyncMode);
        instance.setUserId(userId);
        if (callback != null) instance.setCallback(callback);

        return instance;
    }

    /**
     * 创建 GraphEngine —— 解析 FlowWsDTO 为引擎所需数据结构
     *
     * 关键步骤:
     * 1. 解析边 → EdgeManage(建立倒排索引)
     * 2. 解析节点 → allNodeDataMap(节点配置扁平化)
     *    特别处理:groupParams(分组参数)→ allNodeParams(展平Map),
     *    方便节点运行时 O(1) 取参
     */
    private GraphEngine createEngine(String userId, boolean asyncMode,
                                      GraphRunType runType, FlowWsDTO flowDTO,
                                      BaseCallback callback) {
        // 解析边,建立倒排索引
        EdgeManage edgeManage = EdgeManage.fromEdges(
                flowDTO.getEdges() != null ? flowDTO.getEdges() : new ArrayList<>());

        // 解析节点,扁平化参数
        Map<String, BaseNodeData> allNodeDataMap = new HashMap<>();
        for (NodeDataBO nodeDataBO : flowDTO.getNodes()) {
            BaseNodeData nodeData = nodeDataBO.getData();
            if (nodeData == null) continue;

            // 将分组参数结构扁平化为 Map<key, NodeParam>
            Map<String, NodeParam> allNodeParams = new HashMap<>();
            if (nodeData.getGroupParams() != null) {
                nodeData.getGroupParams().forEach(group ->
                    group.getParams().forEach(param -> {
                        allNodeParams.put(param.getKey(), param);
                        // 递归处理子参数
                        if (param.getChildren() != null) {
                            param.getChildren().forEach(child ->
                                allNodeParams.put(child.getKey(), child));
                        }
                    })
                );
            }
            nodeData.setParams(allNodeParams);
            allNodeDataMap.put(nodeData.getId(), nodeData);
        }

        return new GraphEngine(userId, asyncMode, runType, flowDTO,
                edgeManage, allNodeDataMap, callback);
    }
}

3.9 GraphEngine — 图执行引擎核心

设计思路:状态机驱动的 DAG 调度器,通过 runQueue(双端队列)+ 节点分类执行,实现 DAG 的动态拓扑推进和并行调度。

package com.workflow.engine.graph;

/**
 * 图执行引擎 —— 工作流调度核心
 *
 * 核心设计:
 * 1. 状态机:WAITING/RUNNING/COMPLETED/FAILED/STOPPED
 * 2. 动态 DAG 遍历:节点完成后才将后继节点入队(BFS 变体)
 * 3. 节点五分类调度:不同类型节点采用不同执行策略
 * 4. 并行执行:NOINTERACTIVE 节点提交线程池并行,allOf 等待
 * 5. Cloneable:支持深拷贝,多用户实例隔离
 * 6. AutoCloseable:支持 try-with-resources,资源自动释放
 */
public class GraphEngine implements Cloneable, AutoCloseable {

    private final FlowWsDTO workflowDTO;
    private final EdgeManage edgeManage;
    private final Map<String, BaseNodeData> allNodeDataMap;  // 节点配置(只读)

    private final GraphState graphState;                      // 运行时状态(可写,线程安全)
    private final Map<String, BaseNode> nodeInstanceMap = new ConcurrentHashMap<>();  // 节点实例缓存

    private final Deque<String> runQueue = new LinkedList<>();  // 待执行队列(双端)
    private List<BaseNode> lastRunNodeList = new ArrayList<>();  // 最近一批执行节点(用于判断合流)
    private final Set<String> visitedNodeSet = new HashSet<>();  // 已访问节点(防重复)

    private GraphEngineStatus status = GraphEngineStatus.WAITING;
    private String pendingInputNodeId;  // 当前等待用户输入的节点 ID

    private final int graphMaxSteps;    // 最大步骤数(防死循环)
    private final int nodeMaxSteps;     // 单节点最大步骤数

    private RunGraphNodeThreadPool nodeThreadPool;  // 并行执行线程池(Spring Bean)

    // ==================== 公开 API ====================

    /**
     * 运行工作流(外部调用入口)
     * @param inputData 用户输入,首次调用传 null,后续续跑传用户数据
     */
    public GraphRunResult run(Map<String, Object> inputData) {
        if (inputData != null && !inputData.isEmpty()) {
            saveUserInput(inputData);   // 保存输入到对话上下文
            continueRun(inputData);     // 处理用户输入并恢复执行
        } else {
            monitor.setStartRunTime(System.currentTimeMillis());
            startRun();                 // 首次启动,找开始节点
        }

        // 同步阻塞循环,直到状态不再是 RUNNING
        while (GraphEngineStatus.RUNNING == status) {
            continueRun(null);
        }

        return new GraphRunResult(status, reason);
    }

    /** 停止工作流 */
    public void stop() {
        nodeInstanceMap.values().forEach(BaseNode::stop);
        status = GraphEngineStatus.STOPPED;
    }

    // ==================== 内部调度逻辑 ====================

    /**
     * 启动工作流
     * 遍历 allNodeDataMap 找到 START 节点,入队后进入调度循环
     */
    private void startRun() {
        status = GraphEngineStatus.RUNNING;

        // 遍历节点配置 Map,找 START 类型节点
        for (Map.Entry<String, BaseNodeData> entry : allNodeDataMap.entrySet()) {
            if (entry.getValue().getType().startsWith(NodeTypeEnum.START.getType())) {
                runQueue.add(entry.getKey());  // 只入队开始节点 ID
                break;
            }
        }

        if (runQueue.isEmpty()) {
            throw new IllegalStateException("工作流没有开始节点");
        }

        continueRun(null);
    }

    /**
     * 核心调度循环 —— DAG 遍历引擎
     *
     * 每轮循环处理 runQueue 中当前所有节点,按五种类型分别处理:
     * 优先级:前置交互 > 普通节点 > 后置交互 > 等待节点 > 结束节点
     */
    private void runGraph() {
        while (!runQueue.isEmpty() && status == GraphEngineStatus.RUNNING) {

            // 防无限循环保护
            if (monitor.getDoCount() >= graphMaxSteps) {
                status = GraphEngineStatus.FAILED;
                reason = "已达到最大执行步数: " + graphMaxSteps;
                break;
            }

            // ── 第一步:扫描分类 ──────────────────────────────────────
            List<String> preposeNodes      = new ArrayList<>();  // 前置交互节点
            List<String> noInteractiveNodes = new ArrayList<>();  // 普通节点
            List<String> postpositionNodes  = new ArrayList<>();  // 后置交互节点
            List<String> waitingNodes       = new ArrayList<>();  // 等待/聚合节点
            List<String> endNodes           = new ArrayList<>();  // 结束节点

            for (String nodeId : runQueue) {
                BaseNode node = getOrCreateNodeInstance(nodeId, allNodeDataMap.get(nodeId));
                if (node == null) { status = GraphEngineStatus.FAILED; return; }

                switch (node.getInteractiveType()) {
                    case PERPOSE:       preposeNodes.add(nodeId);       break;
                    case POSTPOSITION:  postpositionNodes.add(nodeId);  break;
                    case WAITING:       waitingNodes.add(nodeId);       break;
                    case END:           endNodes.add(nodeId);           break;
                    default:            noInteractiveNodes.add(nodeId); break;
                }
            }

            // ── 第二步:前置交互节点(最高优先级,立即挂起)────────────
            if (!preposeNodes.isEmpty()) {
                String nodeId = preposeNodes.get(0);  // 每次只处理一个
                BaseNode node = getOrCreateNodeInstance(nodeId, allNodeDataMap.get(nodeId));
                runQueue.remove(nodeId);
                handleUserInputNode(node);             // 设置 WAITING 状态
                return;                                // 立即返回,等待用户输入
            }

            // ── 第三步:普通节点(线程池并行执行)────────────────────────
            if (!noInteractiveNodes.isEmpty()) {
                executeNodesInThreadPool(noInteractiveNodes);  // 并行,allOf 等待
                for (String nodeId : noInteractiveNodes) {
                    BaseNode node = getOrCreateNodeInstance(nodeId, allNodeDataMap.get(nodeId));
                    runQueue.remove(nodeId);
                    List<String> nextIds = handleNodeRunQueue(node);  // 后继节点入队
                    addToLastRunNodeList(nextIds);
                }
            }

            // ── 第四步:后置交互节点(串行,检测内部等待)────────────────
            if (!postpositionNodes.isEmpty()) {
                executePostpositionNodes(postpositionNodes);
            }

            // ── 第五步:等待/聚合节点(忙等策略)────────────────────────
            for (String nodeId : waitingNodes) {
                if (executeWaitingNode(nodeId)) {
                    // 前驱已全部完成,正常执行
                    BaseNode node = getOrCreateNodeInstance(nodeId, allNodeDataMap.get(nodeId));
                    runQueue.remove(nodeId);
                    List<String> nextIds = handleNodeRunQueue(node);
                    addToLastRunNodeList(nextIds);
                } else {
                    // 前驱未完成,放回队尾,下轮再试(忙等)
                    runQueue.remove(nodeId);
                    runQueue.addLast(nodeId);
                }
            }

            // ── 第六步:结束节点(最低优先级,串行执行)─────────────────
            for (String nodeId : endNodes) {
                executeEndNode(nodeId);
                if (status != GraphEngineStatus.RUNNING) break;
            }
        }

        if (runQueue.isEmpty() && GraphEngineStatus.RUNNING == status) {
            status = GraphEngineStatus.COMPLETED;
        }
    }

    /**
     * 并行执行普通节点 —— 线程池 + CompletableFuture
     */
    private void executeNodesInThreadPool(List<String> nodeIds) {
        List<CompletableFuture<NodeRunResult>> futures = new ArrayList<>();

        for (String nodeId : nodeIds) {
            BaseNode node = getOrCreateNodeInstance(nodeId, allNodeDataMap.get(nodeId));
            Locale locale = LocaleContextHolder.getLocale();  // 捕获主线程 Locale
            RunGraphNodeTask task = new RunGraphNodeTask(workflowDTO.getId(), node, locale);

            CompletableFuture<NodeRunResult> future = nodeThreadPool.submitNodeTask(task);
            futures.add(future);

            // 异步处理结果(非阻塞回调)
            future.thenAccept(result -> {
                if (result.getStatus() == NodeRunStatus.COMPLETED) {
                    synchronized (this) {  // 写共享状态需要同步
                        graphNodeLogs.addLogs(node.getNodeLogs());
                        monitor.incrementDoCount();
                        visitedNodeSet.add(nodeId);
                    }
                } else {
                    synchronized (this) {
                        status = GraphEngineStatus.FAILED;
                        reason = "节点执行失败: " + result.getReason();
                    }
                }
            });
        }

        try {
            // 主线程阻塞等待所有并行任务完成(最长 30 分钟)
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                    .get(30, TimeUnit.MINUTES);
        } catch (TimeoutException e) {
            futures.forEach(f -> { if (!f.isDone()) f.cancel(true); });
            status = GraphEngineStatus.FAILED;
            reason = "并行节点执行超时(30分钟)";
        } catch (Exception e) {
            status = GraphEngineStatus.FAILED;
            reason = "并行节点执行异常: " + e.getMessage();
        }
    }

    /**
     * 路由推进 —— 节点执行完毕后,将后继节点入队
     *
     * 规则:
     * - 普通节点 → addFirst(队首,优先处理)
     * - 结束节点 → addLast(队尾,最后处理)
     * - 已在队列中的节点不重复入队(防止并行分支重复添加同一个合流节点)
     */
    private List<String> handleNodeRunQueue(BaseNode node) {
        List<String> addedIds = new ArrayList<>();

        for (String targetId : node.routeNode()) {
            if (!runQueue.contains(targetId)) {
                if (targetId.startsWith(NodeTypeEnum.END.getType())) {
                    runQueue.addLast(targetId);
                } else {
                    runQueue.addFirst(targetId);
                }
                addedIds.add(targetId);
            }
            // 设置后继节点的来源,供条件判断使用
            getOrCreateNodeInstance(targetId, allNodeDataMap.get(targetId))
                    .setRunSourceNodeId(node.getId());
        }
        return addedIds;
    }

    /**
     * 节点懒加载 —— 首次访问时创建,之后复用缓存实例
     *
     * 创建时注入:出边列表(targetEdges)+ 入边列表(sourceEdges)
     * 出边用于路由决策,入边用于前驱完成检测
     */
    private BaseNode getOrCreateNodeInstance(String nodeId, BaseNodeData nodeData) {
        if (nodeInstanceMap.containsKey(nodeId)) {
            return nodeInstanceMap.get(nodeId);  // 缓存命中
        }

        List<EdgeBase> targetEdges = edgeManage.getTargetEdges(nodeId);  // 正向索引
        List<EdgeBase> sourceEdges = edgeManage.getSourceEdges(nodeId);  // 反向索引

        Map<String, Object> params = new HashMap<>();
        params.put("nodeData",    nodeData);
        params.put("workflowId",  workflowDTO.getId());
        params.put("userId",      userId);
        params.put("graphState",  graphState);
        params.put("targetEdges", targetEdges);
        params.put("sourceEdges", sourceEdges);
        params.put("maxSteps",    nodeMaxSteps);
        params.put("callback",    callback);

        BaseNode node = NodeFactory.instanceNode(
                Integer.parseInt(nodeData.getType()), params);
        nodeInstanceMap.put(nodeId, node);
        return node;
    }

    /**
     * 深拷贝 —— 为每个用户创建独立实例
     *
     * 深拷贝的字段:workflowDTO、allNodeDataMap(通过 JSON 序列化实现)
     * 清空的字段:nodeInstanceMap、runQueue、visitedNodeSet(全新执行)
     * 共享的字段:nodeThreadPool(线程池单例,全局共享)、callback
     */
    @Override
    public GraphEngine clone() {
        GraphEngine copy = new GraphEngine(this);  // 调用私有拷贝构造函数
        this.monitor.setLastCloneTime(System.currentTimeMillis());
        return copy;
    }

    private GraphEngine(GraphEngine source) {
        this.userId        = source.userId;
        this.asyncMode     = source.asyncMode;
        this.graphRunType  = source.graphRunType;
        this.graphMaxSteps = source.graphMaxSteps;
        this.nodeMaxSteps  = source.nodeMaxSteps;
        this.callback      = source.callback;
        this.nodeThreadPool = source.nodeThreadPool;  // 共享线程池

        // JSON 深拷贝(隔离数据)
        this.workflowDTO    = deepCopy(source.workflowDTO, FlowWsDTO.class);
        this.allNodeDataMap = deepCopyNodeMap(source.allNodeDataMap);
        this.edgeManage     = EdgeManage.fromEdges(source.workflowDTO.getEdges());

        this.graphState = new GraphState();  // 全新状态,完全隔离
        this.status     = source.status;
        // nodeInstanceMap、runQueue、visitedNodeSet 均为空(全新执行)
    }

    /**
     * 释放资源(AutoCloseable,支持 try-with-resources)
     */
    @Override
    public void close() {
        // 递归释放子流程引擎
        graphState.getReferGraphEnginList().forEach(GraphEngine::close);
        // 批量持久化节点运行日志
        graphNodeLogs.saveLogs();
        // 清理内存
        nodeInstanceMap.clear();
        runQueue.clear();
        visitedNodeSet.clear();
        graphState.clear();
    }
}

3.10 RunGraphEngineThreadPool — 引擎级线程池

设计思路:与 RunGraphNodeThreadPool 对称设计,但职责不同。一个负责整个工作流的异步执行,一个负责批量节点的并行执行,两者共同构成二级线程池架构。

package com.workflow.engine.graph.thread;

/**
 * 工作流引擎级执行线程池
 *
 * 与 RunGraphNodeThreadPool 的区别:
 * - 本类:每个用户会话的整个工作流在此线程中运行(引擎级)
 * - 节点池:单个工作流内多个并行节点在此线程中运行(节点级)
 *
 * 线程命名:run-graph-engine-thread-%d
 */
@Slf4j
@Component
public class RunGraphEngineThreadPool {

    @Value("${graph.engine.thread.pool.coreSize:16}")
    private Integer corePoolSize;

    @Value("${graph.engine.thread.pool.maxSize:32}")
    private Integer maxPoolSize;

    @Value("${graph.engine.thread.pool.keepAliveTime:60}")
    private Integer keepAliveTime;

    @Value("${graph.engine.thread.pool.workQueueSize:5000}")
    private Integer workQueueSize;

    private ExecutorService threadPool;

    @PostConstruct
    public void init() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("run-graph-engine-thread-%d")
                .build();

        threadPool = new ThreadPoolExecutor(
                corePoolSize, maxPoolSize,
                keepAliveTime, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(workQueueSize),
                threadFactory,
                new ThreadPoolExecutor.AbortPolicy()
        );
    }

    public CompletableFuture<GraphRunResult> submitTask(RunGraphEngineTask task) {
        return CompletableFuture.supplyAsync(() -> {
            try { return task.call(); }
            catch (Exception e) { throw new RuntimeException(e); }
        }, threadPool);
    }

    @PreDestroy
    public void destroy() {
        threadPool.shutdown();
        try {
            if (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) {
                threadPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            threadPool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

两级线程池对比:

对比项 RunGraphEngineThreadPool RunGraphNodeThreadPool
线程名 run-graph-engine-thread-%d run-graph-node-thread-%d
执行粒度 整个工作流(一个会话一个线程) 单个节点(一批并行节点多个线程)
任务类型 RunGraphEngineTask RunGraphNodeTask
调用位置 GraphEngineServiceImpl.run() GraphEngine.executeNodesInThreadPool()
并发来源 多用户同时发起工作流 同一工作流内的并行节点

3.11 RunGraphEngineTask — 工作流启动任务

设计思路:工作流级别的门面,将 graphEngine.run() 的阻塞调用封装为 Callable,统一处理三个工作流生命周期回调事件,并解决跨线程的国际化和日志追踪问题。

package com.workflow.engine.graph.thread;

/**
 * 工作流执行任务 —— 引擎级 Callable
 *
 * 与 RunGraphNodeTask 的对比:
 * - 本类:包裹整个工作流,触发 onWorkflowStart/End/Error
 * - 节点任务:包裹单个节点,触发 onNodeStart/End/Error
 *
 * 两者都需要解决同一个问题:跨线程的 Locale 传递
 */
@Slf4j
public class RunGraphEngineTask implements Callable<GraphRunResult> {

    private final GraphEngine graphEngine;
    private final String chatId;
    private final Map<String, Object> inputData;
    private final Locale locale;          // 从主线程捕获,防止国际化错乱

    @Override
    public GraphRunResult call() {
        try {
            // ① 传递国际化语言到当前线程
            LocaleContextHolder.setLocale(locale);
            // ② 传递日志追踪 ID(chatId/webId 用于 MDC 日志链路追踪)
            LogMdcUtil.putChatId(graphEngine.getCallback().getChatId());

            // ③ 回调:通知前端工作流已启动(WebSocket 推送)
            graphEngine.getCallback().onWorkflowStart(chatId);

            // ④ 阻塞运行整个工作流(内部是 DAG 调度循环)
            GraphRunResult runResult = graphEngine.run(inputData);

            // ⑤ 根据最终状态触发对应回调
            GraphEngineStatus status = runResult.getStatus();
            if (GraphEngineStatus.FAILED == status) {
                graphEngine.getCallback().onWorkflowError(chatId, runResult.getReason());
            } else if (GraphEngineStatus.COMPLETED == status) {
                graphEngine.getCallback().onWorkflowEnd(chatId, status, runResult.getReason());
            }
            // WAITING 状态不触发结束回调,等待用户继续输入

            return runResult;

        } catch (Exception e) {
            log.error("工作流执行异常: chatId={}", chatId, e);
            graphEngine.getCallback().onWorkflowError(chatId, e.getMessage());
            return new GraphRunResult(GraphEngineStatus.FAILED, e.getMessage());
        } finally {
            // ⑥ 清理线程局部变量,防止线程复用时污染下一个任务
            LogMdcUtil.clear();
            LocaleContextHolder.resetLocaleContext();
        }
    }
}

三种结束状态的回调行为:

状态 触发回调 说明
COMPLETED onWorkflowEnd(COMPLETED) 正常结束,通知前端关闭进度条
FAILED onWorkflowError(reason) 异常结束,通知前端展示错误信息
WAITING 不触发结束回调 挂起等待,前端保持输入态
抛出异常 onWorkflowError(e.getMessage()) 未捕获异常兜底处理

3.12 GraphEngineServiceImpl — 会话管理层

设计思路:介于外部 API 和 GraphEngine 之间的服务层,核心职责是chatId 为键管理所有正在运行的工作流实例,提供启动、续跑、停止、状态查询四个对外接口,并承担超时检测的被调用方。

package com.workflow.engine.graph.service;

/**
 * 工作流引擎服务实现 —— 会话生命周期管理
 *
 * 核心数据结构:
 * graphEnginesMap: chatId → GraphEngine
 * 存放所有【正在运行中】的引擎实例
 * 工作流结束(COMPLETED/FAILED)或超时后,自动从 Map 中移除并释放资源
 */
@Service
public class GraphEngineServiceImpl {

    private static final long TIMEOUT_MS      = 180 * 60 * 1000L; // 超时阈值:3小时
    private static final int  WARN_COUNT      = 5000;              // 预警阈值:5000个会话

    /** 运行中会话注册表 */
    private final ConcurrentHashMap<String, GraphEngine> graphEnginesMap = new ConcurrentHashMap<>();

    @Autowired
    private RunGraphEngineThreadPool runGraphEngineThreadPool;

    // ==================== 对外接口 ====================

    /** 生产模式:通过 workflowId 从缓存/数据库加载工作流并运行 */
    public void startRunById(String workflowId, String userId, String chatId, BaseCallback callback) {
        GraphEngine engine = getOrCreateGraphEngine(PUBLISH, workflowId, userId, chatId, null, callback);
        run(chatId, engine, null);
    }

    /** 调试模式:直接传入工作流 JSON 数据运行 */
    public void startRunByData(FlowWsDTO workflowData, String userId, String chatId, BaseCallback callback) {
        GraphEngine engine = getOrCreateGraphEngine(DEBUG, null, userId, chatId, workflowData, callback);
        run(chatId, engine, null);
    }

    /** 续跑:用户输入后继续执行挂起的工作流 */
    public void continueRun(String chatId, Map<String, Object> inputData) {
        GraphEngine engine = graphEnginesMap.get(chatId);
        if (engine == null) { log.error("工作流不存在: {}", chatId); return; }
        run(chatId, engine, inputData);
    }

    /** 主动停止工作流 */
    public boolean stopGraphEngine(String chatId) {
        GraphEngine engine = graphEnginesMap.get(chatId);
        if (engine == null) return false;
        engine.stop();                    // 设置 stopFlag,通知所有节点停止
        closeWorkflow(chatId);            // 从注册表移除 + 释放资源
        engine.getCallback().onWorkflowEnd(chatId, FAILED, "用户主动停止");
        return true;
    }

    // ==================== 内部实现 ====================

    /**
     * 提交到引擎级线程池异步执行
     * 执行完成后(COMPLETED/FAILED)自动 closeWorkflow()
     */
    private void run(String chatId, GraphEngine engine, Map<String, Object> inputData) {
        Locale locale = LocaleContextHolder.getLocale();
        RunGraphEngineTask task = new RunGraphEngineTask(chatId, engine, inputData, locale);

        CompletableFuture<GraphRunResult> future = runGraphEngineThreadPool.submitTask(task);

        // 异步回调:执行完成后清理资源
        future.thenAccept(result -> {
            if (result.getStatus() == FAILED || result.getStatus() == COMPLETED) {
                closeWorkflow(chatId);
            }
        }).exceptionallyAsync(ex -> {
            closeWorkflow(chatId);
            return null;
        });
    }

    /**
     * 创建引擎实例 —— 双重检查锁,同一 chatId 只创建一次
     */
    private GraphEngine getOrCreateGraphEngine(GraphRunType type, String workflowId,
            String userId, String chatId, FlowWsDTO data, BaseCallback callback) {
        if (graphEnginesMap.containsKey(chatId)) {
            return graphEnginesMap.get(chatId); // 续跑场景:直接返回已有实例
        }
        synchronized (chatId.intern()) {
            GraphEngine engine = graphEnginesMap.get(chatId);
            if (engine == null) {
                GraphEngineFactory factory = GraphEngineFactory.getInstance();
                engine = (type == PUBLISH)
                        ? factory.getNewEngineInstanceById(userId, workflowId, false, callback)
                        : factory.getNewEngineInstanceByData(userId, data, false, callback);
                graphEnginesMap.put(chatId, engine);
            }
            return engine;
        }
    }

    /**
     * 关闭工作流 —— 从注册表移除 + 释放引擎资源
     */
    private void closeWorkflow(String chatId) {
        synchronized (chatId.intern()) {
            GraphEngine engine = graphEnginesMap.remove(chatId);
            if (engine != null) engine.close(); // 释放线程、状态、日志等资源
        }
    }

    /**
     * 健康检查 —— 由 CheckGraphEngineThread 每 10 分钟调用一次
     */
    public void checkGrapEngineHealth() {
        // 预警检查(不强制停止,只打日志)
        if (graphEnginesMap.size() > WARN_COUNT) {
            log.warn("当前会话数 {} 超过预警值 {}", graphEnginesMap.size(), WARN_COUNT);
        }
        // 超时强制关闭
        graphEnginesMap.forEach((chatId, engine) -> {
            long lifeTime = System.currentTimeMillis() - engine.getMonitor().getCreateTime();
            if (lifeTime > TIMEOUT_MS) {
                engine.getCallback().onWorkflowEnd(chatId, FAILED, "工作流超时(3小时)");
                closeWorkflow(chatId);
                log.info("超时会话已强制关闭: chatId={}, lifeTime={}min", chatId, lifeTime/60000);
            }
        });
    }
}

3.13 CheckGraphEngineThread — 看门狗线程

设计思路:独立的守护线程,每 10 分钟调用一次 checkGrapEngineHealth(),配合 CheckGraphEngineException(未捕获异常处理器)实现自愈式看门狗——看门狗线程自己崩了会自动重启。

package com.workflow.engine.graph.thread;

/**
 * 工作流超时检测线程 —— 看门狗
 *
 * 两个特性:
 * 1. 无限循环:永不退出,持续监控所有运行中的工作流
 * 2. 自愈:注册 UncaughtExceptionHandler,异常退出后自动重启新线程
 */
public class CheckGraphEngineThread extends Thread {

    @Override
    public void run() {
        // 关键:注册未捕获异常处理器,保证线程崩了能自动重启
        Thread.currentThread().setUncaughtExceptionHandler(new CheckGraphEngineException());

        while (true) {
            try {
                // 调用健康检查(超时 → 强制关闭 + 通知前端)
                GraphEngineServiceImpl service = SpringContextHolder.getBean(GraphEngineServiceImpl.class);
                if (service != null) {
                    service.checkGrapEngineHealth();
                }
                Thread.sleep(10 * 60 * 1000); // 每 10 分钟检查一次
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // 保留中断标志
            }
        }
    }
}

/**
 * 看门狗线程的异常处理器 —— 实现自愈
 *
 * 当 CheckGraphEngineThread 因未捕获异常退出时,
 * 此处理器自动创建并启动一个新的看门狗线程
 */
@Slf4j
public class CheckGraphEngineException implements Thread.UncaughtExceptionHandler {

    @Override
    public void uncaughtException(Thread t, Throwable e) {
        log.error("看门狗线程异常退出,正在重启: {}", e.getMessage());
        // 重启新的看门狗线程
        new CheckGraphEngineThread().start();
    }
}

看门狗完整工作流程:

应用启动(@PostConstruct 或 ApplicationRunner)
         │
         ▼
CheckGraphEngineThread.start()
         │
         └── while(true)
               │
               ├── checkGrapEngineHealth()
               │     ├── graphEnginesMap.size() > 5000 → log.warn(预警)
               │     └── 遍历所有引擎
               │           lifeTime > 3小时
               │             → onWorkflowEnd(FAILED, "超时")
               │             → closeWorkflow()(释放资源)
               │
               └── sleep(10分钟)

         如果 while 内部抛出 RuntimeException 等未捕获异常
                    ↓
         UncaughtExceptionHandler.uncaughtException()
                    ↓
         new CheckGraphEngineThread().start()  ← 自动重启!

四、完整调用链路

4.1 整体架构分层图

┌─────────────────────────────────────────────────────────────────────┐
│                      外部请求(HTTP / WebSocket)                     │
└─────────────────────────────┬───────────────────────────────────────┘
                               │ startRunById / startRunByData / continueRun
                               ▼
┌─────────────────────────────────────────────────────────────────────┐
│                    GraphEngineServiceImpl                            │
│   会话注册表:graphEnginesMap(chatId → GraphEngine)                │
│   职责:会话生命周期管理、超时健康检查的被调用方                       │
└──────────────────────────────┬──────────────────────────────────────┘
                                │ submitTask(RunGraphEngineTask)
                                ▼
               ┌────────────────────────────────┐
               │   RunGraphEngineThreadPool      │  ← 引擎级线程池
               │   run-graph-engine-thread-%d    │    每会话一线程
               └───────────────┬────────────────┘
                                │ call()
                                ▼
                  ┌─────────────────────────────┐
                  │     RunGraphEngineTask       │  ← 工作流门面
                  │   onStart / onEnd / onError  │    Locale + MDC 跨线程
                  └──────────────┬──────────────┘
                                  │ graphEngine.run()
                                  ▼
               ┌──────────────────────────────────────┐
               │            GraphEngine               │  ← DAG 调度核心
               │  runQueue / graphState               │
               │  六步分类 / handleNodeRunQueue        │
               └───────────────┬──────────────────────┘
                                │ executeNodesInThreadPool()
                                ▼
               ┌────────────────────────────────┐
               │   RunGraphNodeThreadPool        │  ← 节点级线程池
               │   run-graph-node-thread-%d      │    批量并行节点
               └───────────────┬────────────────┘
                                │ call()
                                ▼
                  ┌─────────────────────────────┐
                  │      RunGraphNodeTask        │  ← 节点执行单元
                  │   lifecycle钩子 / node.run() │    Locale + MDC 跨线程
                  └─────────────────────────────┘

─────────────────────── 独立监控链路 ─────────────────────────────────

  CheckGraphEngineThread(每 10 分钟)
       └── checkGrapEngineHealth()
             ├── 会话数 > 5000 → log.warn 预警
             └── 存活 > 3小时 → onWorkflowEnd(FAILED) + closeWorkflow()
       └── CheckGraphEngineException(崩了自动重启新线程)

4.2 单次工作流执行详细流程

外部请求 startRunById(workflowId, userId, chatId, callback)
    │
    ▼ GraphEngineServiceImpl
    ├── getOrCreateGraphEngine()
    │     ├── graphEnginesMap.containsKey(chatId)? → 直接返回(续跑)
    │     └── 否则 双重检查锁创建:
    │           GraphEngineFactory.getInstance()
    │             └── templateCache.get(workflowId)
    │                   命中 → clone() 深拷贝新实例
    │                   未命中 → loadFromDb() → createEngine() → 放缓存 → clone()
    │
    └── run(chatId, engine, null)
          └── 封装 RunGraphEngineTask,提交引擎级线程池(非阻塞返回)
                resultFuture.thenAccept: COMPLETED/FAILED → closeWorkflow()

                在 run-graph-engine-thread-N 线程中:
                RunGraphEngineTask.call()
                  ├── LocaleContextHolder.setLocale(locale)  // 传递国际化
                  ├── callback.onWorkflowStart(chatId)       // 通知前端启动
                  ├── graphEngine.run(null)                  // 阻塞运行
                  │       ├── startRun()
                  │       │     └── 遍历 allNodeDataMap 找 START 节点
                  │       │         runQueue.add(startNodeId)
                  │       │
                  │       └── while(RUNNING) → runGraph()
                  │               │
                  │               ├─ 扫描 runQueue,按 getInteractiveType() 分类
                  │               │
                  │               ├─ [PERPOSE] → handleUserInputNode()
                  │               │   status=WAITING, return ← 挂起等用户
                  │               │   (下次 continueRun(inputData) 恢复)
                  │               │
                  │               ├─ [NOINTERACTIVE] → executeNodesInThreadPool()
                  │               │   ├── 每节点封装 RunGraphNodeTask
                  │               │   ├── 提交节点级线程池(run-graph-node-thread-N)
                  │               │   │     call(): lifecycle_before → run() → lifecycle_after
                  │               │   │     结果写入 graphState.setVariable(nodeId, key, value)
                  │               │   └── allOf().get(30min) 主线程阻塞等待所有并行完成
                  │               │       完成 → handleNodeRunQueue() → 后继节点入队
                  │               │
                  │               ├─ [POSTPOSITION] → 串行执行,检测内部等待
                  │               │
                  │               ├─ [WAITING] → canContinueExecution()?
                  │               │   true  → 执行 → 后继入队
                  │               │   false → 放回队尾(忙等)
                  │               │
                  │               └─ [END] → executeEndNode()
                  │                           status = COMPLETED
                  │
                  ├── status==COMPLETED → callback.onWorkflowEnd()
                  ├── status==FAILED    → callback.onWorkflowError()
                  └── finally: LogMdcUtil.clear() + resetLocaleContext()

五、关键设计亮点总结

5.1 工厂模式 + 模板缓存 + 深拷贝(解决多用户并发)

问题:每个用户都要运行同一个工作流,如果每次都解析 JSON 很慢,
      如果共享同一个引擎实例又会有状态污染。

方案:工厂维护"模板缓存池"(workflowId → 已解析的 GraphEngine)
      每个用户请求时,从模板 clone() 出全新实例
      → 解析只做一次(O(1) 查缓存),状态完全隔离

5.2 动态推进式 DAG 遍历(懒加载 + BFS 变体)

问题:工作流结构复杂,有条件分支、循环、子流程,
      无法提前预计算完整拓扑序。

方案:只将开始节点入队,每个节点执行完后才将后继节点入队
      → 支持运行时动态决策(条件节点、意图节点)
      → 自然支持循环(节点可以将自身前驱再次入队)

5.3 节点五分类调度(策略模式替代 if-else)

问题:不同节点需要不同的调度策略(并行、挂起、合流),
      如果在引擎里写大量 instanceof 判断,耦合度高难以扩展。

方案:NodeInteractiveType 枚举 + BaseNode.getInteractiveType() 多态
      → 新增节点只需重写 getInteractiveType(),引擎代码零修改(开闭原则)

5.4 生命周期钩子(模板方法模式)

问题:不同节点在执行前后、用户输入前后,有各自的初始化、
      校验、清理逻辑,但这些逻辑又不应该写进引擎里。

方案:BaseNode 定义 lifecycle_* 系列钩子,默认实现为空,
      子类按需重写
      → 引擎调用统一接口,节点自己决定行为

5.5 线程安全的状态共享

问题:并行执行的节点需要读写共享的 GraphState,
      如何保证线程安全且不相互干扰?

方案:
  1. ConcurrentHashMap 保证并发写入安全
  2. nodeId 作为一级 Key 分区,各节点变量天然隔离
  3. 读写共享计数器(doCount、visitedNodeSet)时加 synchronized
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐