Java版本工作流图执行引擎(GraphEngine)
工作流图执行引擎(GraphEngine)—— 源码解析与面试指南
本文档面向两类读者:
- 学习者:系统了解基于 DAG 的工作流引擎的完整设计思路与实现
注:文档中的代码已做脱敏处理,包名统一改为
com.workflow,内部工具类替换为标准实现。
目录
- 一、整体架构概览
- 二、核心类清单
- 三、核心类源码解析
- 3.1 EdgeManage — 边管理与倒排索引
- 3.2 GraphState — 运行时状态管理
- 3.3 NodeInteractiveType — 节点交互类型枚举
- 3.4 BaseNode — 节点抽象基类
- 3.5 NodeFactory — 节点工厂
- 3.6 RunGraphNodeThreadPool — 节点并行执行线程池
- 3.7 RunGraphNodeTask — 节点执行任务
- 3.8 GraphEngineFactory — 引擎工厂
- 3.9 GraphEngine — 图执行引擎核心
- 3.10 RunGraphEngineThreadPool — 引擎级线程池
- 3.11 RunGraphEngineTask — 工作流启动任务
- 3.12 GraphEngineServiceImpl — 会话管理层
- 3.13 CheckGraphEngineThread — 看门狗线程
- 四、完整调用链路
- 五、关键设计亮点总结
一、整体架构概览
1.1 系统位置
前端设计器 (React Flow)
│ 保存工作流 JSON
▼
后端 API 层
│ 解析 FlowWsDTO
▼
GraphEngineFactory ← 唯一入口,工厂 + 缓存
│ clone() 深拷贝
▼
GraphEngine ← 图执行引擎核心
├── EdgeManage ← 边的倒排索引(DAG 结构)
├── GraphState ← 运行时共享状态
├── NodeFactory ← 按类型实例化节点
├── RunGraphNodeThreadPool ← 并行执行线程池
└── BaseNode 子类群 ← 各种具体节点实现
1.2 工作流执行状态机
首次调用 run()
│
▼
WAITING(初始)
│ startRun()
▼
RUNNING(运行中)
/ │ \ \
/ │ \ \
WAITING COMPLETED FAILED STOPPED
(遇到交互 (队列为空 (节点异常 (用户主动
节点) 正常结束) 或超步数) 停止)
│
│ continueRun(inputData)
▼
RUNNING
1.3 DAG 执行时序示例
工作流结构:
[START] → [LLM节点A] → [RAG节点B] ─┐
→ [代码节点C] ─┤→ [聚合节点D] → [END]
→ [SQL节点E] ─┘
执行时序(主线程视角):
轮次1: runQueue=[START] → 执行START → 入队[A]
轮次2: runQueue=[A] → 执行A → 入队[B,C,E]
轮次3: runQueue=[B,C,E] → B/C/E并行 → 各自入队[D](去重)
轮次4: runQueue=[D(WAITING)]→ 前驱全完 → 执行D → 入队[END]
轮次5: runQueue=[END] → 执行END → COMPLETED
二、核心类清单
| 层级 | 类名 | 作用简述 |
|---|---|---|
| 核心引擎 | GraphEngine |
图调度引擎,驱动整个工作流运行 |
| 核心引擎 | GraphEngineFactory |
工厂单例,预加载模板+深拷贝创建实例 |
| 核心引擎 | GraphState |
运行时全局状态,节点间变量共享 |
| 边管理 | EdgeManage |
维护正向/反向倒排索引,支持 DAG 遍历 |
| 边管理 | EdgeBase |
边数据结构(源节点、目标节点、条件等) |
| 节点层 | BaseNode |
节点抽象基类,定义生命周期钩子和路由接口 |
| 节点层 | NodeFactory |
策略模式节点工厂,按类型创建具体节点 |
| 并行执行 | RunGraphNodeThreadPool |
Spring Bean 线程池,负责节点并行调度 |
| 并行执行 | RunGraphNodeTask |
节点执行任务封装(Callable) |
| 枚举/模型 | NodeInteractiveType |
节点交互类型(5种),决定调度策略 |
| 枚举/模型 | GraphEngineStatus |
引擎状态枚举 |
| 枚举/模型 | GraphEngineMonitor |
运行监控信息(步数、耗时等) |
三、核心类源码解析
3.1 EdgeManage — 边管理与倒排索引
设计思路:在引擎初始化时,一次性将所有边构建成两个 HashMap,空间换时间,运行期 O(1) 查询。
package com.workflow.engine.edges;
/**
* 边管理类 —— 核心是两个倒排索引
*
* 设计目的:避免每次查找边时遍历全量边列表(O(n)),
* 预先建立 HashMap 后运行期 O(1) 查询
*/
public class EdgeManage {
/**
* 正向索引:sourceNodeId → List<出边>
* 用途:节点执行完毕后,查找它的所有后继节点
*/
private Map<String, List<EdgeBase>> nodeTargetEdges = new HashMap<>();
/**
* 反向索引:targetNodeId → List<入边>
* 用途:节点被创建时,注入其所有入边(前驱信息)
* 节点内部可用来判断前驱是否全部执行完成
*/
private Map<String, List<EdgeBase>> nodeSourceEdges = new HashMap<>();
public EdgeManage(List<EdgeBase> edges) {
buildEdgeCache(edges);
}
/** 构建时一次性建立两个索引 */
private void buildEdgeCache(List<EdgeBase> edges) {
for (EdgeBase edge : edges) {
// 正向:source → [edge1, edge2, ...]
nodeTargetEdges.computeIfAbsent(edge.getSourceId(), k -> new ArrayList<>()).add(edge);
// 反向:target → [edge1, edge2, ...]
nodeSourceEdges.computeIfAbsent(edge.getTargetId(), k -> new ArrayList<>()).add(edge);
}
}
/** 获取某节点的所有出边(找后继) */
public List<EdgeBase> getTargetEdges(String sourceId) {
return nodeTargetEdges.getOrDefault(sourceId, new ArrayList<>());
}
/** 获取某节点的所有入边(找前驱) */
public List<EdgeBase> getSourceEdges(String targetId) {
return nodeSourceEdges.getOrDefault(targetId, new ArrayList<>());
}
/** 按条件(sourceHandle)过滤出边,用于条件分支路由 */
public List<EdgeBase> getTargetEdgesByOutputType(String sourceId, String outputType) {
return getTargetEdges(sourceId).stream()
.filter(edge -> outputType == null || outputType.equals(edge.getSourceHandleType()))
.collect(Collectors.toList());
}
}
3.2 GraphState — 运行时状态管理
设计思路:以 nodeId 为一级 Key 分区存储,天然隔离各节点的变量,同时使用 ConcurrentHashMap 保证并行节点写入的线程安全。
package com.workflow.engine.graph;
/**
* 图状态类 —— 工作流运行期间的全局共享状态
*
* 核心设计:
* 1. 双层 ConcurrentHashMap:外层 key=nodeId,内层 key=变量名
* → 各节点变量天然分区,并行节点写入互不干扰
* 2. transferVariables:主流程向子流程传参的专用通道
* 3. referGraphEnginList:持有子流程 GraphEngine 引用,用于 close() 时递归释放
*/
public class GraphState {
/** 节点变量存储:nodeId → (varName → value) */
private Map<String, Map<String, Object>> nodeVariables = new ConcurrentHashMap<>();
/** 对话上下文历史 */
private List<Map<String, String>> chatContext = new ArrayList<>();
/** 主流程向子流程传输的变量:nodeId → data */
private Map<String, Map<String, Object>> transferVariables = new ConcurrentHashMap<>();
/** 本流程引用的子流程引擎列表(用于 close() 时递归释放资源) */
private List<GraphEngine> referGraphEnginList = new ArrayList<>();
/** 设置变量,以 nodeId 为分区键 */
public void setVariable(String nodeId, String name, Object value) {
nodeVariables.computeIfAbsent(nodeId, k -> new ConcurrentHashMap<>()).put(name, value);
}
/**
* 获取变量,优先从 nodeVariables 找,找不到再从 transferVariables 找
* transferVariables 存放的是主流程传入的参数
*/
public Object getVariable(String nodeId, String name) {
Map<String, Object> vars = nodeVariables.getOrDefault(nodeId,
transferVariables.get(nodeId));
return vars != null ? vars.get(name) : null;
}
/**
* 通过变量路径字符串获取值
* 格式:"nodeId.varName",例如 "llm_001.output"
*/
public Object getVariableByStr(String variablePath) {
String[] parts = variablePath.split("\\.");
if (parts.length != 2) return null;
return getVariable(parts[0], parts[1]);
}
/** 保存对话上下文(human/ai 角色) */
public void saveContext(String message, RoleType role) {
Map<String, String> item = new HashMap<>();
item.put("message", message);
item.put("role", role.getType());
item.put("timestamp", String.valueOf(System.currentTimeMillis()));
chatContext.add(item);
}
/** 释放所有状态(引擎 close() 时调用) */
public void clear() {
nodeVariables.clear();
chatContext.clear();
transferVariables.clear();
referGraphEnginList.forEach(GraphEngine::close);
}
}
3.3 NodeInteractiveType — 节点交互类型枚举
设计思路:用枚举定义 5 种调度策略,引擎通过多态(getInteractiveType())而非 instanceof 判断决定如何调度,符合开闭原则。
package com.workflow.engine.common.enums;
/**
* 节点交互类型枚举 —— 决定节点在图引擎中的调度策略
*
* 这是整个调度系统的核心分类依据,引擎根据此类型决定:
* - 串行 or 并行
* - 是否挂起等待用户输入
* - 执行顺序优先级
*/
public enum NodeInteractiveType {
/**
* 前置交互节点
* 需要先收集用户输入,再执行节点
* 典型代表:InputNode(用户输入节点)、OutputNode(选择/输入模式)
* 调度行为:优先处理,立即挂起引擎(status=WAITING),等待用户输入后恢复
*/
PERPOSE(1, "prepose", "交互前置节点"),
/**
* 后置交互节点
* 先执行节点,执行过程中可能触发内部等待
* 典型代表:SubWorkFlowNode(子工作流节点)、ParamCollectorNode(参数收集节点)
* 调度行为:串行执行,执行完后检查是否有内部等待状态
*/
POSTPOSITION(2, "postposition", "交互后置节点"),
/**
* 非交互节点(默认类型)
* 无需用户交互,纯计算型节点
* 典型代表:LLMNode、RagNode、CodeNode、SqlExecutorNode、ToolNode 等
* 调度行为:同批节点全部提交线程池并行执行
*/
NOINTERACTIVE(3, "noInteractive", "非交互节点"),
/**
* 结束节点
* 工作流的终止节点
* 典型代表:EndNode
* 调度行为:始终加入队尾,在所有其他节点完成后最后串行执行
*/
END(4, "end", "结束节点"),
/**
* 等待/聚合节点(合流节点)
* 需要等待所有上游并行分支全部完成后才能执行
* 典型代表:RunWaitingNode(聚合节点)
* 调度行为:每轮检查前驱是否完成,未完成则放回队尾(忙等策略)
*/
WAITING(5, "polymerize", "等待节点");
private final Integer value;
private final String type;
private final String desc;
NodeInteractiveType(Integer value, String type, String desc) {
this.value = value;
this.type = type;
this.desc = desc;
}
// getter 方法省略
}
各节点类型与 InteractiveType 对应关系:
| 节点类型 | InteractiveType | 是否重写 getInteractiveType() | 特殊说明 |
|---|---|---|---|
| StartNode | NOINTERACTIVE |
否(继承默认值) | — |
| EndNode | END |
是 | 固定返回 END |
| InputNode | PERPOSE / NOINTERACTIVE |
是 | 动态:无预设值→PERPOSE,有预设值→NOINTERACTIVE |
| OutputNode | PERPOSE / NOINTERACTIVE |
是 | 动态:选择/输入模式→PERPOSE,纯展示→NOINTERACTIVE |
| LLMNode | NOINTERACTIVE |
否 | — |
| RagNode | NOINTERACTIVE |
否 | — |
| CodeNode | NOINTERACTIVE |
否 | — |
| SubWorkFlowNode | POSTPOSITION |
是 | 子流程内部可能触发等待 |
| ParamCollectorNode | POSTPOSITION |
是 | 运行中动态收集参数 |
| RunWaitingNode | WAITING |
是 | 合流等待节点 |
3.4 BaseNode — 节点抽象基类
设计思路:模板方法模式,run() 定义执行骨架(生命周期回调、变量保存、日志),_run() 由子类实现具体逻辑。生命周期钩子(lifecycle_*)允许节点自定义交互行为,消除引擎中的 instanceof 判断。
package com.workflow.engine.nodes;
/**
* 节点抽象基类
*
* 核心设计模式:
* 1. 模板方法模式:run() 定义执行骨架,_run() 由子类实现
* 2. 生命周期钩子:lifecycle_* 方法允许子类自定义行为,引擎通过多态调用
* 3. 多态路由:routeNode() 默认返回所有出边目标,条件节点可重写为按条件路由
*/
public abstract class BaseNode {
protected String id; // 节点ID
protected String type; // 节点类型字符串
protected String name; // 节点名称
protected List<EdgeBase> targetEdges; // 出边列表(正向索引注入)
protected List<EdgeBase> sourceEdges; // 入边列表(反向索引注入)
protected GraphState graphState; // 全局状态(共享)
protected BaseNodeData nodeData; // 节点配置数据
protected Map<String, Object> nodeParams = new HashMap<>(); // 展平后的参数
protected BaseCallback callback; // 回调接口(WebSocket推送)
protected String runSourceNodeId; // 执行来源节点ID(由引擎设置)
protected boolean stopFlag = false; // 停止标志
/**
* 节点执行入口 —— 模板方法
* 定义固定骨架:回调通知 → 执行 → 保存变量 → 日志
*/
public Map<String, Object> run(Map<String, Object> state) {
if (stopFlag) throw new RuntimeException("节点已被停止");
String execId = UUID.randomUUID().toString().replaceAll("-", "");
// 1. 通知回调:节点开始(用于 WebSocket 实时推送)
callback.onNodeStart(NodeStartData.builder().nodeId(id).name(name).build());
Map<String, Object> result = null;
try {
// 2. 执行子类实现的具体逻辑(模板方法核心)
result = _run(execId);
// 3. 将执行结果保存到 GraphState(以 nodeId 分区)
if (result != null) {
result.forEach((key, value) -> {
graphState.setVariable(id, key, value);
state.put(key, value);
});
}
} catch (Exception e) {
callback.onNodeError(this, e.getMessage());
throw e;
} finally {
// 4. 通知回调:节点结束(附带日志和耗时)
callback.onNodeEnd(NodeEndData.builder()
.nodeId(id).logData(parseLog(execId, result)).build());
}
return state;
}
/**
* 子类实现具体执行逻辑 —— 模板方法的变化点
*/
public abstract Map<String, Object> _run(String uniqueId);
/**
* 路由决策 —— 决定下一步执行哪些节点
*
* 默认实现:返回所有出边的目标节点(普通顺序节点)
* 重写场景:条件节点、意图识别节点 —— 根据运行结果只返回满足条件的分支
*
* 对应 LangGraph 的 conditional_edge function
*/
public List<String> routeNode() {
if (targetEdges != null && !targetEdges.isEmpty()) {
return targetEdges.stream()
.map(EdgeBase::getTargetId)
.collect(Collectors.toList());
}
return new ArrayList<>();
}
/**
* 返回节点的调度类型 —— 多态替代 instanceof
* 默认:NOINTERACTIVE(非交互节点)
* 子类按需重写以声明不同的调度行为
*/
public NodeInteractiveType getInteractiveType() {
return NodeInteractiveType.NOINTERACTIVE;
}
// ==================== 生命周期钩子 ====================
// 引擎在特定时机调用这些钩子,子类可重写以实现自定义行为
// 好处:消除引擎中的 if-else/instanceof 类型判断,符合开闭原则
/** 是否需要用户输入 */
public boolean lifecycle_needsUserInput() { return false; }
/** 获取等待原因描述(用于前端展示) */
public String lifecycle_getWaitingReason() { return "等待用户输入"; }
/** 准备用户输入(发送交互消息、设置输入模式等) */
public void lifecycle_prepareUserInput() {}
/** 用户输入完成后的后处理(验证输入、更新状态等) */
public void lifecycle_afterUserInput() {}
/** 是否需要特殊路由处理(条件节点、意图节点等) */
public boolean lifecycle_needsSpecialRouting() { return false; }
/** 是否可以继续执行(合流节点检查前驱是否完成) */
public boolean canContinueExecution() { return true; }
/** 节点执行前的准备工作 */
public Map<String, Object> lifecycle_beforeExecution(Map<String, Object> state) { return state; }
/** 节点执行后的后处理 */
public Map<String, Object> lifecycle_afterExecution(Map<String, Object> state,
Map<String, Object> result) { return result; }
}
3.5 NodeFactory — 节点工厂
设计思路:策略模式 + 函数式编程,用 EnumMap<NodeTypeEnum, Function<Context, BaseNode>> 替代 if-else/switch,新增节点类型只需在 static{} 块中加一行注册,引擎代码零改动。
package com.workflow.engine.nodes;
/**
* 节点工厂类 —— 策略模式
*
* 核心设计:
* EnumMap 存储 "节点类型 → 创建函数" 的映射
* 新增节点:只需在 static 块加一行注册,无需修改任何其他代码(开闭原则)
*/
public class NodeFactory {
/**
* 节点创建器注册表
* key: 节点类型枚举, value: 创建该类型节点的 Lambda 函数
*/
private static final Map<NodeTypeEnum, Function<NodeCreationContext, BaseNode>> NODE_CREATORS
= new EnumMap<>(NodeTypeEnum.class);
static {
// 基础节点
NODE_CREATORS.put(NodeTypeEnum.START, ctx -> new StartNode(ctx.getNodeData(), ...));
NODE_CREATORS.put(NodeTypeEnum.END, ctx -> new EndNode(ctx.getNodeData(), ...));
NODE_CREATORS.put(NodeTypeEnum.INPUT, ctx -> new InputNode(ctx.getNodeData(), ...));
NODE_CREATORS.put(NodeTypeEnum.OUTPUT, ctx -> new OutputNode(ctx.getNodeData(), ...));
// AI 能力节点
NODE_CREATORS.put(NodeTypeEnum.LLM, ctx -> new LLMNode(ctx.getNodeData(), ...));
NODE_CREATORS.put(NodeTypeEnum.RAG, ctx -> new RagNode(ctx.getNodeData(), ...));
NODE_CREATORS.put(NodeTypeEnum.AGENT, ctx -> new AgentNode(ctx.getNodeData(), ...));
NODE_CREATORS.put(NodeTypeEnum.TOOL, ctx -> new ToolNode(ctx.getNodeData(), ...));
// 逻辑控制节点
NODE_CREATORS.put(NodeTypeEnum.CONDITION, ctx -> new ConditionNode(ctx.getNodeData(), ...));
NODE_CREATORS.put(NodeTypeEnum.INTENTION, ctx -> new IntentRecognitionNode(ctx.getNodeData(), ...));
NODE_CREATORS.put(NodeTypeEnum.RUNWAIT, ctx -> new RunWaitingNode(ctx.getNodeData(), ...));
// 数据处理节点
NODE_CREATORS.put(NodeTypeEnum.CODE, ctx -> new CodeNode(ctx.getNodeData(), ...));
NODE_CREATORS.put(NodeTypeEnum.SQL_EXECUTOR, ctx -> new SqlExecutorNode(ctx.getNodeData(), ...));
NODE_CREATORS.put(NodeTypeEnum.VAR_EXTRACTOR, ctx -> new VarExtractorNode(ctx.getNodeData(), ...));
// 工作流节点
NODE_CREATORS.put(NodeTypeEnum.WORKFLOW, ctx -> new SubWorkFlowNode(ctx.getNodeData(), ...));
// ... 其他节点类型
}
/**
* 创建节点实例
* @param nodeType 节点类型整数值
* @param params 包含创建节点所需全部信息的 Map
*/
public static BaseNode instanceNode(Integer nodeType, Map<String, Object> params) {
NodeTypeEnum type = NodeTypeEnum.fromValue(nodeType);
if (type == null) {
throw new RuntimeException("不支持的节点类型: " + nodeType);
}
Function<NodeCreationContext, BaseNode> creator = NODE_CREATORS.get(type);
if (creator == null) {
throw new RuntimeException("未找到节点创建器: " + type);
}
NodeCreationContext context = new NodeCreationContext(
(BaseNodeData) params.get("nodeData"),
(String) params.get("workflowId"),
(String) params.get("userId"),
(GraphState) params.get("graphState"),
(List<EdgeBase>) params.get("targetEdges"),
(List<EdgeBase>) params.get("sourceEdges"),
(Integer) params.getOrDefault("maxSteps", 100),
(BaseCallback) params.get("callback")
);
return creator.apply(context);
}
}
3.6 RunGraphNodeThreadPool — 节点并行执行线程池
设计思路:Spring 单例 Bean,全局唯一线程池,所有并行节点共享,避免为每个工作流创建独立线程池导致的资源浪费。
package com.workflow.engine.graph.thread;
/**
* 工作流节点并行执行线程池
*
* 设计要点:
* 1. Spring @Component 单例,全局共享,避免重复创建
* 2. 参数外部化配置,支持生产环境调优
* 3. @PreDestroy 优雅关闭,等待任务完成后再停止
*/
@Component
public class RunGraphNodeThreadPool {
@Value("${graph.node.thread.pool.coreSize:16}")
private Integer corePoolSize;
@Value("${graph.node.thread.pool.maxSize:32}")
private Integer maxPoolSize;
@Value("${graph.node.thread.pool.keepAliveTime:60}")
private Integer keepAliveTime;
@Value("${graph.node.thread.pool.workQueueSize:5000}")
private Integer workQueueSize;
private ExecutorService threadPool;
@PostConstruct
public void init() {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("run-graph-node-thread-%d") // 线程命名,方便排查问题
.build();
threadPool = new ThreadPoolExecutor(
corePoolSize, // 核心线程数:16
maxPoolSize, // 最大线程数:32
keepAliveTime, TimeUnit.SECONDS, // 空闲存活时间:60s
new LinkedBlockingQueue<>(workQueueSize), // 有界队列:5000
threadFactory,
new ThreadPoolExecutor.AbortPolicy() // 队满直接拒绝,避免无限堆积
);
}
/**
* 提交节点任务,返回 CompletableFuture
* 调用方可以 .thenAccept() 异步处理结果,或 allOf().get() 阻塞等待
*/
public CompletableFuture<NodeRunResult> submitNodeTask(RunGraphNodeTask task) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, threadPool);
}
@PreDestroy
public void destroy() {
threadPool.shutdown();
try {
if (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) {
threadPool.shutdownNow(); // 超时后强制关闭
}
} catch (InterruptedException e) {
threadPool.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
3.7 RunGraphNodeTask — 节点执行任务
设计思路:将节点执行封装为 Callable<NodeRunResult>,与线程池解耦。注意跨线程的国际化语言传递:主线程的 Locale 必须手动传入子线程。
package com.workflow.engine.graph.thread;
/**
* 节点执行任务 —— 线程池的执行单元
*
* 关键点:跨线程的 Locale 传递
* Spring 的 LocaleContextHolder 是 ThreadLocal 实现,
* 子线程无法自动继承,需要在创建任务时捕获,在 call() 中手动设置
*/
public class RunGraphNodeTask implements Callable<NodeRunResult> {
private final BaseNode node;
private final String workflowId;
private final Locale locale; // 从主线程捕获,传入子线程
public RunGraphNodeTask(String workflowId, BaseNode node, Locale locale) {
this.workflowId = workflowId;
this.node = node;
this.locale = locale;
}
@Override
public NodeRunResult call() {
try {
// 关键:将主线程的语言设置传递到当前子线程
LocaleContextHolder.setLocale(locale);
// 执行节点生命周期
Map<String, Object> state = new HashMap<>();
state = node.lifecycle_beforeExecution(state); // 前置钩子
Map<String, Object> result = node.run(state); // 执行节点
node.lifecycle_afterExecution(state, result); // 后置钩子
return new NodeRunResult(NodeRunStatus.COMPLETED, "执行成功");
} catch (Exception e) {
log.error("节点执行失败: workflowId={}, nodeId={}", workflowId, node.getId(), e);
return new NodeRunResult(NodeRunStatus.FAILED, e.getMessage());
} finally {
// 重置线程局部变量,避免线程复用时污染下一个任务
LocaleContextHolder.resetLocaleContext();
}
}
}
3.8 GraphEngineFactory — 引擎工厂
设计思路:双重检查锁单例 + 模板缓存池。工作流发布后,引擎模板预加载进内存;用户调用时通过深拷贝创建全新实例,既复用解析结果,又保证状态隔离。
package com.workflow.engine.graph;
/**
* 图引擎工厂类 —— 唯一创建 GraphEngine 实例的入口
*
* 核心设计:
* 1. 单例工厂(双重检查锁)
* 2. 模板缓存池:已发布工作流的引擎模板预加载,避免重复解析
* 3. 深拷贝创建实例:每个用户请求获得独立实例,状态完全隔离
* 4. LRU 淘汰:缓存超过 100 个时,淘汰最久未使用的模板
*/
public class GraphEngineFactory {
private static final int CACHE_MAX_SIZE = 100;
/** 工作流模板缓存:workflowId → 模板引擎(只读,不执行) */
private static final ConcurrentHashMap<String, GraphEngine> templateCache
= new ConcurrentHashMap<>();
private static volatile GraphEngineFactory instance;
public static GraphEngineFactory getInstance() {
if (instance == null) {
synchronized (GraphEngineFactory.class) {
if (instance == null) {
instance = new GraphEngineFactory();
startCacheMonitorThread(); // 启动缓存淘汰后台线程
}
}
}
return instance;
}
/**
* 获取可执行的 GraphEngine 新实例(生产环境入口)
*
* 流程:查缓存 → 未命中则从数据库加载 → 深拷贝创建新实例
* 双重检查锁保证并发下同一工作流只加载一次
*/
public GraphEngine getNewEngineInstanceById(String userId, String workflowId,
boolean asyncMode, BaseCallback callback) {
GraphEngine template = templateCache.get(workflowId);
if (template == null) {
synchronized (workflowId.intern()) { // 以 workflowId 为粒度加锁,避免全局锁
template = templateCache.get(workflowId);
if (template == null) {
template = loadEngineFromDb(workflowId); // 从数据库加载并解析
templateCache.put(workflowId, template);
}
}
}
// 深拷贝:模板不可变,每次返回全新实例
GraphEngine instance = template.clone();
instance.setAsyncMode(asyncMode);
instance.setUserId(userId);
if (callback != null) instance.setCallback(callback);
return instance;
}
/**
* 创建 GraphEngine —— 解析 FlowWsDTO 为引擎所需数据结构
*
* 关键步骤:
* 1. 解析边 → EdgeManage(建立倒排索引)
* 2. 解析节点 → allNodeDataMap(节点配置扁平化)
* 特别处理:groupParams(分组参数)→ allNodeParams(展平Map),
* 方便节点运行时 O(1) 取参
*/
private GraphEngine createEngine(String userId, boolean asyncMode,
GraphRunType runType, FlowWsDTO flowDTO,
BaseCallback callback) {
// 解析边,建立倒排索引
EdgeManage edgeManage = EdgeManage.fromEdges(
flowDTO.getEdges() != null ? flowDTO.getEdges() : new ArrayList<>());
// 解析节点,扁平化参数
Map<String, BaseNodeData> allNodeDataMap = new HashMap<>();
for (NodeDataBO nodeDataBO : flowDTO.getNodes()) {
BaseNodeData nodeData = nodeDataBO.getData();
if (nodeData == null) continue;
// 将分组参数结构扁平化为 Map<key, NodeParam>
Map<String, NodeParam> allNodeParams = new HashMap<>();
if (nodeData.getGroupParams() != null) {
nodeData.getGroupParams().forEach(group ->
group.getParams().forEach(param -> {
allNodeParams.put(param.getKey(), param);
// 递归处理子参数
if (param.getChildren() != null) {
param.getChildren().forEach(child ->
allNodeParams.put(child.getKey(), child));
}
})
);
}
nodeData.setParams(allNodeParams);
allNodeDataMap.put(nodeData.getId(), nodeData);
}
return new GraphEngine(userId, asyncMode, runType, flowDTO,
edgeManage, allNodeDataMap, callback);
}
}
3.9 GraphEngine — 图执行引擎核心
设计思路:状态机驱动的 DAG 调度器,通过 runQueue(双端队列)+ 节点分类执行,实现 DAG 的动态拓扑推进和并行调度。
package com.workflow.engine.graph;
/**
* 图执行引擎 —— 工作流调度核心
*
* 核心设计:
* 1. 状态机:WAITING/RUNNING/COMPLETED/FAILED/STOPPED
* 2. 动态 DAG 遍历:节点完成后才将后继节点入队(BFS 变体)
* 3. 节点五分类调度:不同类型节点采用不同执行策略
* 4. 并行执行:NOINTERACTIVE 节点提交线程池并行,allOf 等待
* 5. Cloneable:支持深拷贝,多用户实例隔离
* 6. AutoCloseable:支持 try-with-resources,资源自动释放
*/
public class GraphEngine implements Cloneable, AutoCloseable {
private final FlowWsDTO workflowDTO;
private final EdgeManage edgeManage;
private final Map<String, BaseNodeData> allNodeDataMap; // 节点配置(只读)
private final GraphState graphState; // 运行时状态(可写,线程安全)
private final Map<String, BaseNode> nodeInstanceMap = new ConcurrentHashMap<>(); // 节点实例缓存
private final Deque<String> runQueue = new LinkedList<>(); // 待执行队列(双端)
private List<BaseNode> lastRunNodeList = new ArrayList<>(); // 最近一批执行节点(用于判断合流)
private final Set<String> visitedNodeSet = new HashSet<>(); // 已访问节点(防重复)
private GraphEngineStatus status = GraphEngineStatus.WAITING;
private String pendingInputNodeId; // 当前等待用户输入的节点 ID
private final int graphMaxSteps; // 最大步骤数(防死循环)
private final int nodeMaxSteps; // 单节点最大步骤数
private RunGraphNodeThreadPool nodeThreadPool; // 并行执行线程池(Spring Bean)
// ==================== 公开 API ====================
/**
* 运行工作流(外部调用入口)
* @param inputData 用户输入,首次调用传 null,后续续跑传用户数据
*/
public GraphRunResult run(Map<String, Object> inputData) {
if (inputData != null && !inputData.isEmpty()) {
saveUserInput(inputData); // 保存输入到对话上下文
continueRun(inputData); // 处理用户输入并恢复执行
} else {
monitor.setStartRunTime(System.currentTimeMillis());
startRun(); // 首次启动,找开始节点
}
// 同步阻塞循环,直到状态不再是 RUNNING
while (GraphEngineStatus.RUNNING == status) {
continueRun(null);
}
return new GraphRunResult(status, reason);
}
/** 停止工作流 */
public void stop() {
nodeInstanceMap.values().forEach(BaseNode::stop);
status = GraphEngineStatus.STOPPED;
}
// ==================== 内部调度逻辑 ====================
/**
* 启动工作流
* 遍历 allNodeDataMap 找到 START 节点,入队后进入调度循环
*/
private void startRun() {
status = GraphEngineStatus.RUNNING;
// 遍历节点配置 Map,找 START 类型节点
for (Map.Entry<String, BaseNodeData> entry : allNodeDataMap.entrySet()) {
if (entry.getValue().getType().startsWith(NodeTypeEnum.START.getType())) {
runQueue.add(entry.getKey()); // 只入队开始节点 ID
break;
}
}
if (runQueue.isEmpty()) {
throw new IllegalStateException("工作流没有开始节点");
}
continueRun(null);
}
/**
* 核心调度循环 —— DAG 遍历引擎
*
* 每轮循环处理 runQueue 中当前所有节点,按五种类型分别处理:
* 优先级:前置交互 > 普通节点 > 后置交互 > 等待节点 > 结束节点
*/
private void runGraph() {
while (!runQueue.isEmpty() && status == GraphEngineStatus.RUNNING) {
// 防无限循环保护
if (monitor.getDoCount() >= graphMaxSteps) {
status = GraphEngineStatus.FAILED;
reason = "已达到最大执行步数: " + graphMaxSteps;
break;
}
// ── 第一步:扫描分类 ──────────────────────────────────────
List<String> preposeNodes = new ArrayList<>(); // 前置交互节点
List<String> noInteractiveNodes = new ArrayList<>(); // 普通节点
List<String> postpositionNodes = new ArrayList<>(); // 后置交互节点
List<String> waitingNodes = new ArrayList<>(); // 等待/聚合节点
List<String> endNodes = new ArrayList<>(); // 结束节点
for (String nodeId : runQueue) {
BaseNode node = getOrCreateNodeInstance(nodeId, allNodeDataMap.get(nodeId));
if (node == null) { status = GraphEngineStatus.FAILED; return; }
switch (node.getInteractiveType()) {
case PERPOSE: preposeNodes.add(nodeId); break;
case POSTPOSITION: postpositionNodes.add(nodeId); break;
case WAITING: waitingNodes.add(nodeId); break;
case END: endNodes.add(nodeId); break;
default: noInteractiveNodes.add(nodeId); break;
}
}
// ── 第二步:前置交互节点(最高优先级,立即挂起)────────────
if (!preposeNodes.isEmpty()) {
String nodeId = preposeNodes.get(0); // 每次只处理一个
BaseNode node = getOrCreateNodeInstance(nodeId, allNodeDataMap.get(nodeId));
runQueue.remove(nodeId);
handleUserInputNode(node); // 设置 WAITING 状态
return; // 立即返回,等待用户输入
}
// ── 第三步:普通节点(线程池并行执行)────────────────────────
if (!noInteractiveNodes.isEmpty()) {
executeNodesInThreadPool(noInteractiveNodes); // 并行,allOf 等待
for (String nodeId : noInteractiveNodes) {
BaseNode node = getOrCreateNodeInstance(nodeId, allNodeDataMap.get(nodeId));
runQueue.remove(nodeId);
List<String> nextIds = handleNodeRunQueue(node); // 后继节点入队
addToLastRunNodeList(nextIds);
}
}
// ── 第四步:后置交互节点(串行,检测内部等待)────────────────
if (!postpositionNodes.isEmpty()) {
executePostpositionNodes(postpositionNodes);
}
// ── 第五步:等待/聚合节点(忙等策略)────────────────────────
for (String nodeId : waitingNodes) {
if (executeWaitingNode(nodeId)) {
// 前驱已全部完成,正常执行
BaseNode node = getOrCreateNodeInstance(nodeId, allNodeDataMap.get(nodeId));
runQueue.remove(nodeId);
List<String> nextIds = handleNodeRunQueue(node);
addToLastRunNodeList(nextIds);
} else {
// 前驱未完成,放回队尾,下轮再试(忙等)
runQueue.remove(nodeId);
runQueue.addLast(nodeId);
}
}
// ── 第六步:结束节点(最低优先级,串行执行)─────────────────
for (String nodeId : endNodes) {
executeEndNode(nodeId);
if (status != GraphEngineStatus.RUNNING) break;
}
}
if (runQueue.isEmpty() && GraphEngineStatus.RUNNING == status) {
status = GraphEngineStatus.COMPLETED;
}
}
/**
* 并行执行普通节点 —— 线程池 + CompletableFuture
*/
private void executeNodesInThreadPool(List<String> nodeIds) {
List<CompletableFuture<NodeRunResult>> futures = new ArrayList<>();
for (String nodeId : nodeIds) {
BaseNode node = getOrCreateNodeInstance(nodeId, allNodeDataMap.get(nodeId));
Locale locale = LocaleContextHolder.getLocale(); // 捕获主线程 Locale
RunGraphNodeTask task = new RunGraphNodeTask(workflowDTO.getId(), node, locale);
CompletableFuture<NodeRunResult> future = nodeThreadPool.submitNodeTask(task);
futures.add(future);
// 异步处理结果(非阻塞回调)
future.thenAccept(result -> {
if (result.getStatus() == NodeRunStatus.COMPLETED) {
synchronized (this) { // 写共享状态需要同步
graphNodeLogs.addLogs(node.getNodeLogs());
monitor.incrementDoCount();
visitedNodeSet.add(nodeId);
}
} else {
synchronized (this) {
status = GraphEngineStatus.FAILED;
reason = "节点执行失败: " + result.getReason();
}
}
});
}
try {
// 主线程阻塞等待所有并行任务完成(最长 30 分钟)
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.get(30, TimeUnit.MINUTES);
} catch (TimeoutException e) {
futures.forEach(f -> { if (!f.isDone()) f.cancel(true); });
status = GraphEngineStatus.FAILED;
reason = "并行节点执行超时(30分钟)";
} catch (Exception e) {
status = GraphEngineStatus.FAILED;
reason = "并行节点执行异常: " + e.getMessage();
}
}
/**
* 路由推进 —— 节点执行完毕后,将后继节点入队
*
* 规则:
* - 普通节点 → addFirst(队首,优先处理)
* - 结束节点 → addLast(队尾,最后处理)
* - 已在队列中的节点不重复入队(防止并行分支重复添加同一个合流节点)
*/
private List<String> handleNodeRunQueue(BaseNode node) {
List<String> addedIds = new ArrayList<>();
for (String targetId : node.routeNode()) {
if (!runQueue.contains(targetId)) {
if (targetId.startsWith(NodeTypeEnum.END.getType())) {
runQueue.addLast(targetId);
} else {
runQueue.addFirst(targetId);
}
addedIds.add(targetId);
}
// 设置后继节点的来源,供条件判断使用
getOrCreateNodeInstance(targetId, allNodeDataMap.get(targetId))
.setRunSourceNodeId(node.getId());
}
return addedIds;
}
/**
* 节点懒加载 —— 首次访问时创建,之后复用缓存实例
*
* 创建时注入:出边列表(targetEdges)+ 入边列表(sourceEdges)
* 出边用于路由决策,入边用于前驱完成检测
*/
private BaseNode getOrCreateNodeInstance(String nodeId, BaseNodeData nodeData) {
if (nodeInstanceMap.containsKey(nodeId)) {
return nodeInstanceMap.get(nodeId); // 缓存命中
}
List<EdgeBase> targetEdges = edgeManage.getTargetEdges(nodeId); // 正向索引
List<EdgeBase> sourceEdges = edgeManage.getSourceEdges(nodeId); // 反向索引
Map<String, Object> params = new HashMap<>();
params.put("nodeData", nodeData);
params.put("workflowId", workflowDTO.getId());
params.put("userId", userId);
params.put("graphState", graphState);
params.put("targetEdges", targetEdges);
params.put("sourceEdges", sourceEdges);
params.put("maxSteps", nodeMaxSteps);
params.put("callback", callback);
BaseNode node = NodeFactory.instanceNode(
Integer.parseInt(nodeData.getType()), params);
nodeInstanceMap.put(nodeId, node);
return node;
}
/**
* 深拷贝 —— 为每个用户创建独立实例
*
* 深拷贝的字段:workflowDTO、allNodeDataMap(通过 JSON 序列化实现)
* 清空的字段:nodeInstanceMap、runQueue、visitedNodeSet(全新执行)
* 共享的字段:nodeThreadPool(线程池单例,全局共享)、callback
*/
@Override
public GraphEngine clone() {
GraphEngine copy = new GraphEngine(this); // 调用私有拷贝构造函数
this.monitor.setLastCloneTime(System.currentTimeMillis());
return copy;
}
private GraphEngine(GraphEngine source) {
this.userId = source.userId;
this.asyncMode = source.asyncMode;
this.graphRunType = source.graphRunType;
this.graphMaxSteps = source.graphMaxSteps;
this.nodeMaxSteps = source.nodeMaxSteps;
this.callback = source.callback;
this.nodeThreadPool = source.nodeThreadPool; // 共享线程池
// JSON 深拷贝(隔离数据)
this.workflowDTO = deepCopy(source.workflowDTO, FlowWsDTO.class);
this.allNodeDataMap = deepCopyNodeMap(source.allNodeDataMap);
this.edgeManage = EdgeManage.fromEdges(source.workflowDTO.getEdges());
this.graphState = new GraphState(); // 全新状态,完全隔离
this.status = source.status;
// nodeInstanceMap、runQueue、visitedNodeSet 均为空(全新执行)
}
/**
* 释放资源(AutoCloseable,支持 try-with-resources)
*/
@Override
public void close() {
// 递归释放子流程引擎
graphState.getReferGraphEnginList().forEach(GraphEngine::close);
// 批量持久化节点运行日志
graphNodeLogs.saveLogs();
// 清理内存
nodeInstanceMap.clear();
runQueue.clear();
visitedNodeSet.clear();
graphState.clear();
}
}
3.10 RunGraphEngineThreadPool — 引擎级线程池
设计思路:与 RunGraphNodeThreadPool 对称设计,但职责不同。一个负责整个工作流的异步执行,一个负责批量节点的并行执行,两者共同构成二级线程池架构。
package com.workflow.engine.graph.thread;
/**
* 工作流引擎级执行线程池
*
* 与 RunGraphNodeThreadPool 的区别:
* - 本类:每个用户会话的整个工作流在此线程中运行(引擎级)
* - 节点池:单个工作流内多个并行节点在此线程中运行(节点级)
*
* 线程命名:run-graph-engine-thread-%d
*/
@Slf4j
@Component
public class RunGraphEngineThreadPool {
@Value("${graph.engine.thread.pool.coreSize:16}")
private Integer corePoolSize;
@Value("${graph.engine.thread.pool.maxSize:32}")
private Integer maxPoolSize;
@Value("${graph.engine.thread.pool.keepAliveTime:60}")
private Integer keepAliveTime;
@Value("${graph.engine.thread.pool.workQueueSize:5000}")
private Integer workQueueSize;
private ExecutorService threadPool;
@PostConstruct
public void init() {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("run-graph-engine-thread-%d")
.build();
threadPool = new ThreadPoolExecutor(
corePoolSize, maxPoolSize,
keepAliveTime, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(workQueueSize),
threadFactory,
new ThreadPoolExecutor.AbortPolicy()
);
}
public CompletableFuture<GraphRunResult> submitTask(RunGraphEngineTask task) {
return CompletableFuture.supplyAsync(() -> {
try { return task.call(); }
catch (Exception e) { throw new RuntimeException(e); }
}, threadPool);
}
@PreDestroy
public void destroy() {
threadPool.shutdown();
try {
if (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) {
threadPool.shutdownNow();
}
} catch (InterruptedException e) {
threadPool.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
两级线程池对比:
| 对比项 | RunGraphEngineThreadPool |
RunGraphNodeThreadPool |
|---|---|---|
| 线程名 | run-graph-engine-thread-%d |
run-graph-node-thread-%d |
| 执行粒度 | 整个工作流(一个会话一个线程) | 单个节点(一批并行节点多个线程) |
| 任务类型 | RunGraphEngineTask |
RunGraphNodeTask |
| 调用位置 | GraphEngineServiceImpl.run() |
GraphEngine.executeNodesInThreadPool() |
| 并发来源 | 多用户同时发起工作流 | 同一工作流内的并行节点 |
3.11 RunGraphEngineTask — 工作流启动任务
设计思路:工作流级别的门面,将 graphEngine.run() 的阻塞调用封装为 Callable,统一处理三个工作流生命周期回调事件,并解决跨线程的国际化和日志追踪问题。
package com.workflow.engine.graph.thread;
/**
* 工作流执行任务 —— 引擎级 Callable
*
* 与 RunGraphNodeTask 的对比:
* - 本类:包裹整个工作流,触发 onWorkflowStart/End/Error
* - 节点任务:包裹单个节点,触发 onNodeStart/End/Error
*
* 两者都需要解决同一个问题:跨线程的 Locale 传递
*/
@Slf4j
public class RunGraphEngineTask implements Callable<GraphRunResult> {
private final GraphEngine graphEngine;
private final String chatId;
private final Map<String, Object> inputData;
private final Locale locale; // 从主线程捕获,防止国际化错乱
@Override
public GraphRunResult call() {
try {
// ① 传递国际化语言到当前线程
LocaleContextHolder.setLocale(locale);
// ② 传递日志追踪 ID(chatId/webId 用于 MDC 日志链路追踪)
LogMdcUtil.putChatId(graphEngine.getCallback().getChatId());
// ③ 回调:通知前端工作流已启动(WebSocket 推送)
graphEngine.getCallback().onWorkflowStart(chatId);
// ④ 阻塞运行整个工作流(内部是 DAG 调度循环)
GraphRunResult runResult = graphEngine.run(inputData);
// ⑤ 根据最终状态触发对应回调
GraphEngineStatus status = runResult.getStatus();
if (GraphEngineStatus.FAILED == status) {
graphEngine.getCallback().onWorkflowError(chatId, runResult.getReason());
} else if (GraphEngineStatus.COMPLETED == status) {
graphEngine.getCallback().onWorkflowEnd(chatId, status, runResult.getReason());
}
// WAITING 状态不触发结束回调,等待用户继续输入
return runResult;
} catch (Exception e) {
log.error("工作流执行异常: chatId={}", chatId, e);
graphEngine.getCallback().onWorkflowError(chatId, e.getMessage());
return new GraphRunResult(GraphEngineStatus.FAILED, e.getMessage());
} finally {
// ⑥ 清理线程局部变量,防止线程复用时污染下一个任务
LogMdcUtil.clear();
LocaleContextHolder.resetLocaleContext();
}
}
}
三种结束状态的回调行为:
| 状态 | 触发回调 | 说明 |
|---|---|---|
COMPLETED |
onWorkflowEnd(COMPLETED) |
正常结束,通知前端关闭进度条 |
FAILED |
onWorkflowError(reason) |
异常结束,通知前端展示错误信息 |
WAITING |
不触发结束回调 | 挂起等待,前端保持输入态 |
| 抛出异常 | onWorkflowError(e.getMessage()) |
未捕获异常兜底处理 |
3.12 GraphEngineServiceImpl — 会话管理层
设计思路:介于外部 API 和 GraphEngine 之间的服务层,核心职责是以 chatId 为键管理所有正在运行的工作流实例,提供启动、续跑、停止、状态查询四个对外接口,并承担超时检测的被调用方。
package com.workflow.engine.graph.service;
/**
* 工作流引擎服务实现 —— 会话生命周期管理
*
* 核心数据结构:
* graphEnginesMap: chatId → GraphEngine
* 存放所有【正在运行中】的引擎实例
* 工作流结束(COMPLETED/FAILED)或超时后,自动从 Map 中移除并释放资源
*/
@Service
public class GraphEngineServiceImpl {
private static final long TIMEOUT_MS = 180 * 60 * 1000L; // 超时阈值:3小时
private static final int WARN_COUNT = 5000; // 预警阈值:5000个会话
/** 运行中会话注册表 */
private final ConcurrentHashMap<String, GraphEngine> graphEnginesMap = new ConcurrentHashMap<>();
@Autowired
private RunGraphEngineThreadPool runGraphEngineThreadPool;
// ==================== 对外接口 ====================
/** 生产模式:通过 workflowId 从缓存/数据库加载工作流并运行 */
public void startRunById(String workflowId, String userId, String chatId, BaseCallback callback) {
GraphEngine engine = getOrCreateGraphEngine(PUBLISH, workflowId, userId, chatId, null, callback);
run(chatId, engine, null);
}
/** 调试模式:直接传入工作流 JSON 数据运行 */
public void startRunByData(FlowWsDTO workflowData, String userId, String chatId, BaseCallback callback) {
GraphEngine engine = getOrCreateGraphEngine(DEBUG, null, userId, chatId, workflowData, callback);
run(chatId, engine, null);
}
/** 续跑:用户输入后继续执行挂起的工作流 */
public void continueRun(String chatId, Map<String, Object> inputData) {
GraphEngine engine = graphEnginesMap.get(chatId);
if (engine == null) { log.error("工作流不存在: {}", chatId); return; }
run(chatId, engine, inputData);
}
/** 主动停止工作流 */
public boolean stopGraphEngine(String chatId) {
GraphEngine engine = graphEnginesMap.get(chatId);
if (engine == null) return false;
engine.stop(); // 设置 stopFlag,通知所有节点停止
closeWorkflow(chatId); // 从注册表移除 + 释放资源
engine.getCallback().onWorkflowEnd(chatId, FAILED, "用户主动停止");
return true;
}
// ==================== 内部实现 ====================
/**
* 提交到引擎级线程池异步执行
* 执行完成后(COMPLETED/FAILED)自动 closeWorkflow()
*/
private void run(String chatId, GraphEngine engine, Map<String, Object> inputData) {
Locale locale = LocaleContextHolder.getLocale();
RunGraphEngineTask task = new RunGraphEngineTask(chatId, engine, inputData, locale);
CompletableFuture<GraphRunResult> future = runGraphEngineThreadPool.submitTask(task);
// 异步回调:执行完成后清理资源
future.thenAccept(result -> {
if (result.getStatus() == FAILED || result.getStatus() == COMPLETED) {
closeWorkflow(chatId);
}
}).exceptionallyAsync(ex -> {
closeWorkflow(chatId);
return null;
});
}
/**
* 创建引擎实例 —— 双重检查锁,同一 chatId 只创建一次
*/
private GraphEngine getOrCreateGraphEngine(GraphRunType type, String workflowId,
String userId, String chatId, FlowWsDTO data, BaseCallback callback) {
if (graphEnginesMap.containsKey(chatId)) {
return graphEnginesMap.get(chatId); // 续跑场景:直接返回已有实例
}
synchronized (chatId.intern()) {
GraphEngine engine = graphEnginesMap.get(chatId);
if (engine == null) {
GraphEngineFactory factory = GraphEngineFactory.getInstance();
engine = (type == PUBLISH)
? factory.getNewEngineInstanceById(userId, workflowId, false, callback)
: factory.getNewEngineInstanceByData(userId, data, false, callback);
graphEnginesMap.put(chatId, engine);
}
return engine;
}
}
/**
* 关闭工作流 —— 从注册表移除 + 释放引擎资源
*/
private void closeWorkflow(String chatId) {
synchronized (chatId.intern()) {
GraphEngine engine = graphEnginesMap.remove(chatId);
if (engine != null) engine.close(); // 释放线程、状态、日志等资源
}
}
/**
* 健康检查 —— 由 CheckGraphEngineThread 每 10 分钟调用一次
*/
public void checkGrapEngineHealth() {
// 预警检查(不强制停止,只打日志)
if (graphEnginesMap.size() > WARN_COUNT) {
log.warn("当前会话数 {} 超过预警值 {}", graphEnginesMap.size(), WARN_COUNT);
}
// 超时强制关闭
graphEnginesMap.forEach((chatId, engine) -> {
long lifeTime = System.currentTimeMillis() - engine.getMonitor().getCreateTime();
if (lifeTime > TIMEOUT_MS) {
engine.getCallback().onWorkflowEnd(chatId, FAILED, "工作流超时(3小时)");
closeWorkflow(chatId);
log.info("超时会话已强制关闭: chatId={}, lifeTime={}min", chatId, lifeTime/60000);
}
});
}
}
3.13 CheckGraphEngineThread — 看门狗线程
设计思路:独立的守护线程,每 10 分钟调用一次 checkGrapEngineHealth(),配合 CheckGraphEngineException(未捕获异常处理器)实现自愈式看门狗——看门狗线程自己崩了会自动重启。
package com.workflow.engine.graph.thread;
/**
* 工作流超时检测线程 —— 看门狗
*
* 两个特性:
* 1. 无限循环:永不退出,持续监控所有运行中的工作流
* 2. 自愈:注册 UncaughtExceptionHandler,异常退出后自动重启新线程
*/
public class CheckGraphEngineThread extends Thread {
@Override
public void run() {
// 关键:注册未捕获异常处理器,保证线程崩了能自动重启
Thread.currentThread().setUncaughtExceptionHandler(new CheckGraphEngineException());
while (true) {
try {
// 调用健康检查(超时 → 强制关闭 + 通知前端)
GraphEngineServiceImpl service = SpringContextHolder.getBean(GraphEngineServiceImpl.class);
if (service != null) {
service.checkGrapEngineHealth();
}
Thread.sleep(10 * 60 * 1000); // 每 10 分钟检查一次
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 保留中断标志
}
}
}
}
/**
* 看门狗线程的异常处理器 —— 实现自愈
*
* 当 CheckGraphEngineThread 因未捕获异常退出时,
* 此处理器自动创建并启动一个新的看门狗线程
*/
@Slf4j
public class CheckGraphEngineException implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
log.error("看门狗线程异常退出,正在重启: {}", e.getMessage());
// 重启新的看门狗线程
new CheckGraphEngineThread().start();
}
}
看门狗完整工作流程:
应用启动(@PostConstruct 或 ApplicationRunner)
│
▼
CheckGraphEngineThread.start()
│
└── while(true)
│
├── checkGrapEngineHealth()
│ ├── graphEnginesMap.size() > 5000 → log.warn(预警)
│ └── 遍历所有引擎
│ lifeTime > 3小时
│ → onWorkflowEnd(FAILED, "超时")
│ → closeWorkflow()(释放资源)
│
└── sleep(10分钟)
如果 while 内部抛出 RuntimeException 等未捕获异常
↓
UncaughtExceptionHandler.uncaughtException()
↓
new CheckGraphEngineThread().start() ← 自动重启!
四、完整调用链路
4.1 整体架构分层图
┌─────────────────────────────────────────────────────────────────────┐
│ 外部请求(HTTP / WebSocket) │
└─────────────────────────────┬───────────────────────────────────────┘
│ startRunById / startRunByData / continueRun
▼
┌─────────────────────────────────────────────────────────────────────┐
│ GraphEngineServiceImpl │
│ 会话注册表:graphEnginesMap(chatId → GraphEngine) │
│ 职责:会话生命周期管理、超时健康检查的被调用方 │
└──────────────────────────────┬──────────────────────────────────────┘
│ submitTask(RunGraphEngineTask)
▼
┌────────────────────────────────┐
│ RunGraphEngineThreadPool │ ← 引擎级线程池
│ run-graph-engine-thread-%d │ 每会话一线程
└───────────────┬────────────────┘
│ call()
▼
┌─────────────────────────────┐
│ RunGraphEngineTask │ ← 工作流门面
│ onStart / onEnd / onError │ Locale + MDC 跨线程
└──────────────┬──────────────┘
│ graphEngine.run()
▼
┌──────────────────────────────────────┐
│ GraphEngine │ ← DAG 调度核心
│ runQueue / graphState │
│ 六步分类 / handleNodeRunQueue │
└───────────────┬──────────────────────┘
│ executeNodesInThreadPool()
▼
┌────────────────────────────────┐
│ RunGraphNodeThreadPool │ ← 节点级线程池
│ run-graph-node-thread-%d │ 批量并行节点
└───────────────┬────────────────┘
│ call()
▼
┌─────────────────────────────┐
│ RunGraphNodeTask │ ← 节点执行单元
│ lifecycle钩子 / node.run() │ Locale + MDC 跨线程
└─────────────────────────────┘
─────────────────────── 独立监控链路 ─────────────────────────────────
CheckGraphEngineThread(每 10 分钟)
└── checkGrapEngineHealth()
├── 会话数 > 5000 → log.warn 预警
└── 存活 > 3小时 → onWorkflowEnd(FAILED) + closeWorkflow()
└── CheckGraphEngineException(崩了自动重启新线程)
4.2 单次工作流执行详细流程
外部请求 startRunById(workflowId, userId, chatId, callback)
│
▼ GraphEngineServiceImpl
├── getOrCreateGraphEngine()
│ ├── graphEnginesMap.containsKey(chatId)? → 直接返回(续跑)
│ └── 否则 双重检查锁创建:
│ GraphEngineFactory.getInstance()
│ └── templateCache.get(workflowId)
│ 命中 → clone() 深拷贝新实例
│ 未命中 → loadFromDb() → createEngine() → 放缓存 → clone()
│
└── run(chatId, engine, null)
└── 封装 RunGraphEngineTask,提交引擎级线程池(非阻塞返回)
resultFuture.thenAccept: COMPLETED/FAILED → closeWorkflow()
在 run-graph-engine-thread-N 线程中:
RunGraphEngineTask.call()
├── LocaleContextHolder.setLocale(locale) // 传递国际化
├── callback.onWorkflowStart(chatId) // 通知前端启动
├── graphEngine.run(null) // 阻塞运行
│ ├── startRun()
│ │ └── 遍历 allNodeDataMap 找 START 节点
│ │ runQueue.add(startNodeId)
│ │
│ └── while(RUNNING) → runGraph()
│ │
│ ├─ 扫描 runQueue,按 getInteractiveType() 分类
│ │
│ ├─ [PERPOSE] → handleUserInputNode()
│ │ status=WAITING, return ← 挂起等用户
│ │ (下次 continueRun(inputData) 恢复)
│ │
│ ├─ [NOINTERACTIVE] → executeNodesInThreadPool()
│ │ ├── 每节点封装 RunGraphNodeTask
│ │ ├── 提交节点级线程池(run-graph-node-thread-N)
│ │ │ call(): lifecycle_before → run() → lifecycle_after
│ │ │ 结果写入 graphState.setVariable(nodeId, key, value)
│ │ └── allOf().get(30min) 主线程阻塞等待所有并行完成
│ │ 完成 → handleNodeRunQueue() → 后继节点入队
│ │
│ ├─ [POSTPOSITION] → 串行执行,检测内部等待
│ │
│ ├─ [WAITING] → canContinueExecution()?
│ │ true → 执行 → 后继入队
│ │ false → 放回队尾(忙等)
│ │
│ └─ [END] → executeEndNode()
│ status = COMPLETED
│
├── status==COMPLETED → callback.onWorkflowEnd()
├── status==FAILED → callback.onWorkflowError()
└── finally: LogMdcUtil.clear() + resetLocaleContext()
五、关键设计亮点总结
5.1 工厂模式 + 模板缓存 + 深拷贝(解决多用户并发)
问题:每个用户都要运行同一个工作流,如果每次都解析 JSON 很慢,
如果共享同一个引擎实例又会有状态污染。
方案:工厂维护"模板缓存池"(workflowId → 已解析的 GraphEngine)
每个用户请求时,从模板 clone() 出全新实例
→ 解析只做一次(O(1) 查缓存),状态完全隔离
5.2 动态推进式 DAG 遍历(懒加载 + BFS 变体)
问题:工作流结构复杂,有条件分支、循环、子流程,
无法提前预计算完整拓扑序。
方案:只将开始节点入队,每个节点执行完后才将后继节点入队
→ 支持运行时动态决策(条件节点、意图节点)
→ 自然支持循环(节点可以将自身前驱再次入队)
5.3 节点五分类调度(策略模式替代 if-else)
问题:不同节点需要不同的调度策略(并行、挂起、合流),
如果在引擎里写大量 instanceof 判断,耦合度高难以扩展。
方案:NodeInteractiveType 枚举 + BaseNode.getInteractiveType() 多态
→ 新增节点只需重写 getInteractiveType(),引擎代码零修改(开闭原则)
5.4 生命周期钩子(模板方法模式)
问题:不同节点在执行前后、用户输入前后,有各自的初始化、
校验、清理逻辑,但这些逻辑又不应该写进引擎里。
方案:BaseNode 定义 lifecycle_* 系列钩子,默认实现为空,
子类按需重写
→ 引擎调用统一接口,节点自己决定行为
5.5 线程安全的状态共享
问题:并行执行的节点需要读写共享的 GraphState,
如何保证线程安全且不相互干扰?
方案:
1. ConcurrentHashMap 保证并发写入安全
2. nodeId 作为一级 Key 分区,各节点变量天然隔离
3. 读写共享计数器(doCount、visitedNodeSet)时加 synchronized
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)