MaxKB4j智能体平台- 节点工作流引擎技术原理与实现
MaxKB4j 节点工作流引擎技术原理与实现
项目地址:https://gitee.com/taisan/MaxKB4j | Gitee GVP 认证
一、概述与架构设计
1.1 工作流引擎定位
MaxKB4j是一个基于Java的智能知识库系统,其核心特性之一是可视化工作流编排能力。工作流引擎作为系统的"神经中枢",负责将用户通过拖拽配置的节点图转化为可执行的业务逻辑流程,支持AI对话、知识检索、条件分支、循环迭代等复杂场景。
与传统的工作流引擎(如Activiti、Flowable)不同,MaxKB4j的工作流引擎专门为AI应用场景设计,具有以下特点:
- 可视化编排:基于LogicFlow的前端图编辑,后端直接解析执行
- 流式响应:原生支持AI大模型的流式输出,实时推送到前端
- 多模态支持:支持文本、图片、语音、文档等多种输入输出
- 智能节点:内置AI对话、知识库检索、意图识别等AI专用节点
- 动态变量:支持跨节点的变量引用和模板渲染
1.2 整体架构
┌─────────────────────────────────────────────────────────────────┐
│ 前端层 (LogicFlow) │
│ ┌─────────────────────────────────┐ │
│ │ 节点拖拽 → JSON配置 → API调用 │ │
│ └─────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ maxkb4j-workflow-api │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ AbsNode │ │ Workflow │ │ WorkflowContext │ │
│ │ (节点抽象) │ │ (工作流模型)│ │ (上下文管理) │ │
│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
│ │NodeFactory │ │VariableRes. │ │ TemplateRenderer │ │
│ │ (节点工厂) │ │ (变量解析) │ │ (模板渲染) │ │
│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ maxkb4j-workflow │
│ ┌─────────────────────┐ ┌─────────────────────────────────┐ │
│ │ AbsWorkflowHandler │ │ INodeHandler │ │
│ │ (执行器基类) │ │ ┌─────────────────────┐ │ │
│ └─────────────────────┘ │ │ LLMNodeHandler │ │ │
│ ┌─────────────────────┐ │ │ ConditionNodeHandler│ │ │
│ │ChatWorkflowHandler │ │ │ LoopNodeHandler │ │ │
│ │ KnowledgeWorkf... │ │ │ SearchKnowledgeH... │ │ │
│ └─────────────────────┘ │ └─────────────────────┘ │ │
│ └─────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
1.3 模块职责划分
| 模块 | 路径 | 职责 |
|---|---|---|
maxkb4j-workflow-api |
服务API层 | 定义核心抽象、数据模型、工厂接口,供其他模块依赖 |
maxkb4j-workflow |
服务实现层 | 实现工作流执行逻辑、节点处理器、条件判断等具体功能 |
这种分层设计遵循了依赖倒置原则:实现层依赖API层的抽象,便于替换实现和进行单元测试。
二、核心概念与数据模型
2.1 节点抽象设计
节点是工作流的基本执行单元。所有节点类型都继承自AbsNode抽象基类:
// 文件: maxkb4j-workflow-api/src/main/java/com/maxkb4j/workflow/node/AbsNode.java
@Data
public abstract class AbsNode {
// 节点标识
private String id; // 静态ID(来自前端配置)
private String type; // 节点类型标识
private String runtimeNodeId; // 运行时ID(动态生成)
private String viewType; // 视图类型
// 节点配置
private JSONObject properties; // 节点属性(来自前端)
private List<String> upNodeIdList; // 上游节点ID列表
// 执行状态
protected Map<String, Object> context; // 节点上下文
protected Map<String, Object> detail; // 执行详情
private Integer status; // 节点状态
private String errMessage; // 错误信息
// 输出
private String answerText; // 回答文本
// 抽象方法:子类实现具体的上下文恢复逻辑
public abstract void saveContext(Workflow workflow, Map<String, Object> detail);
}
设计要点:
-
双ID机制:
id:静态ID,对应前端节点配置,在整个工作流生命周期内不变runtimeNodeId:运行时ID,结合上游节点路径生成,用于区分循环节点的不同迭代
private String generateRuntimeNodeId() { return NodeIdGenerator.generateRuntimeNodeId(id, upNodeIdList); } -
模板方法模式:
saveContext()是抽象方法,由各节点类型自行实现上下文恢复逻辑 -
状态机:节点状态通过
NodeStatus枚举定义:public enum NodeStatus { READY(0), // 就绪 SUCCESS(1), // 成功 ERROR(2), // 错误 INTERRUPT(3), // 中断(等待用户输入) SKIP(4); // 跳过 }
2.2 边与图的表示
工作流本质上是一个有向无环图(DAG),节点之间通过边连接:
// 文件: maxkb4j-workflow-api/src/main/java/com/maxkb4j/workflow/logic/LfEdge.java
@Data
public class LfEdge {
private String id; // 边ID
private String type; // 边类型
private String sourceNodeId; // 源节点ID
private String targetNodeId; // 目标节点ID
private String sourceAnchorId; // 源锚点ID(用于条件分支)
private String targetAnchorId; // 目标锚点ID
private List<LfPoint> pointsList; // 绘制点列表
private JSONObject properties; // 边属性
}
条件分支的实现:通过sourceAnchorId标识条件分支。例如,条件节点有两个分支,分别对应sourceNodeId_right和sourceNodeId_left:
┌── right ──→ Node A
[Condition Node] ───┤
└── left ───→ Node B
LogicFlow类封装了完整的图结构:
@Data
public class LogicFlow {
private List<LfNode> nodes; // 节点列表
private List<LfEdge> edges; // 边列表
public static LogicFlow newInstance(JSONObject flowJson) {
return JSON.parseObject(flowJson.toJSONString(), new TypeReference<>() {});
}
}
2.3 工作流上下文管理
WorkflowContext实现了三级作用域的变量管理:
// 文件: maxkb4j-workflow-api/src/main/java/com/maxkb4j/workflow/model/WorkflowContext.java
@Data
public class WorkflowContext {
// 全局变量:跨会话共享的配置变量
private final Map<String, Object> globalContext;
// 聊天变量:当前会话的变量(如用户信息、会话参数)
private final Map<String, Object> chatContext;
// 节点变量:各节点的执行结果
private final List<AbsNode> nodeContext;
public void appendNode(AbsNode currentNode) {
// 支持更新已存在的节点上下文
for (int i = 0; i < this.nodeContext.size(); i++) {
AbsNode node = this.nodeContext.get(i);
if (currentNode.getId().equals(node.getId())
&& currentNode.getRuntimeNodeId().equals(node.getRuntimeNodeId())) {
this.nodeContext.set(i, currentNode);
return;
}
}
this.nodeContext.add(currentNode);
}
}
变量作用域层级:
┌─────────────────────────────────────────┐
│ Global Context │ ← 应用级配置
│ (全局变量:模型配置、系统参数等) │
├─────────────────────────────────────────┤
│ Chat Context │ ← 会话级变量
│ (聊天变量:用户信息、会话参数等) │
├─────────────────────────────────────────┤
│ Node Context │ ← 节点级变量
│ (节点变量:各节点的执行结果) │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Node 1 │ │ Node 2 │ │ Node N │ │
│ │ answer │ │ result │ │ output │ │
│ └─────────┘ └─────────┘ └─────────┘ │
└─────────────────────────────────────────┘
2.4 变量解析机制
VariableResolver负责将各级上下文变量解析为模板可用的格式:
// 文件: maxkb4j-workflow-api/src/main/java/com/maxkb4j/workflow/model/VariableResolver.java
public class VariableResolver {
private final WorkflowContext context;
private final Map<String, Object> loopContext; // 循环变量(可选)
// 获取所有模板变量:{scope.variable: value} 格式
public Map<String, Object> getPromptVariables() {
Map<String, Object> result = new HashMap<>();
// 全局变量: global.xxx
if (context.getGlobalContext() != null) {
for (Map.Entry<String, Object> entry : context.getGlobalContext().entrySet()) {
result.put("global." + entry.getKey(), entry.getValue());
}
}
// 聊天变量: chat.xxx
if (context.getChatContext() != null) {
for (Map.Entry<String, Object> entry : context.getChatContext().entrySet()) {
result.put("chat." + entry.getKey(), entry.getValue());
}
}
// 循环变量: loop.xxx
if (loopContext != null) {
for (Map.Entry<String, Object> entry : loopContext.entrySet()) {
result.put("loop." + entry.getKey(), entry.getValue());
}
}
// 节点变量: nodeName.variableName
if (context.getNodeContext() != null) {
for (AbsNode node : context.getNodeContext()) {
result.putAll(getNodeVariables(node));
}
}
return result;
}
}
变量引用示例:
在AI对话节点的Prompt中,可以这样引用变量:
你是一个智能助手,用户名是 {{chat.userName}}。
上一轮对话结果是:{{AI对话节点.answer}}
当前循环索引:{{loop.index}}
2.5 模板渲染
TemplateRenderer基于LangChain4j的PromptTemplate实现变量替换:
// 文件: maxkb4j-workflow-api/src/main/java/com/maxkb4j/workflow/model/TemplateRenderer.java
public record TemplateRenderer(VariableResolver variableResolver) {
public String render(String prompt) {
return render(prompt, variableResolver.getPromptVariables());
}
public String render(String prompt, Map<String, Object> addVariables) {
if (StringUtils.isBlank(prompt)) {
return "";
}
Map<String, Object> variables = new HashMap<>(variableResolver.getPromptVariables());
if (addVariables != null) {
variables.putAll(addVariables); // 额外变量可覆盖
}
PromptTemplate promptTemplate = PromptTemplate.from(prompt);
return promptTemplate.apply(variables).text();
}
}
三、节点类型体系
3.1 节点类型枚举
系统内置了丰富的节点类型,覆盖AI应用的常见场景:
// 文件: maxkb4j-workflow-api/src/main/java/com/maxkb4j/workflow/enums/NodeType.java
@Getter
@AllArgsConstructor
public enum NodeType {
// 基础节点
BASE("base-node", "基础节点"),
START("start-node", "开始节点"),
// 流程控制节点
CONDITION("condition-node", "条件节点"),
LOOP("loop-node", "循环节点"),
LOOP_START("loop-start-node", "循环开始"),
LOOP_BREAK("loop-break-node", "循环跳出"),
LOOP_CONTINUE("loop-continue-node", "循环继续"),
// AI相关节点
AI_CHAT("ai-chat-node", "智能聊天节点"),
IMAGE_GENERATE("image-generate-node", "图片生成"),
IMAGE_UNDERSTAND("image-understand-node", "图片理解"),
SPEECH_TO_TEXT("speech-to-text-node", "语音转文字"),
TEXT_TO_SPEECH("text-to-speech-node", "文字转语音"),
// 知识库节点
SEARCH_KNOWLEDGE("search-knowledge-node", "知识库搜索节点"),
RERANKER("reranker-node", "多路召回"),
KNOWLEDGE_WRITE("knowledge-write-node", "知识库写入节点"),
// 数据处理节点
VARIABLE_ASSIGN("variable-assign-node", "变量赋值"),
VARIABLE_AGGREGATE("variable-aggregation-node", "变量聚合"),
PARAMETER_EXTRACTION("parameter-extraction-node", "参数提取节点"),
NL2SQL("nl2sql-node", "自然语言转SQL节点"),
// 工具节点
TOOL("tool-node", "自定义函数节点"),
TOOL_LIB("tool-lib-node", "工具库节点"),
HTTP_CLIENT("http-node", "HTTP请求节点"),
MCP("mcp-node", "MCP节点"),
// 交互节点
FORM("form-node", "表单收集"),
USER_SELECT("user-select-node", "用户选择节点"),
QUESTION("question-node", "问题节点"),
REPLY("reply-node", "回复节点");
private final String key;
private final String name;
// O(1)查找优化
private static final Map<String, NodeType> KEY_MAP;
static {
KEY_MAP = Arrays.stream(values())
.collect(Collectors.toUnmodifiableMap(NodeType::getKey, Function.identity()));
}
}
3.2 节点工厂与注册表模式
采用工厂模式 + 注册表模式实现节点的动态创建,符合开闭原则:
// 文件: maxkb4j-workflow-api/src/main/java/com/maxkb4j/workflow/factory/NodeFactory.java
@Getter
public class NodeFactory {
@FunctionalInterface
public interface NodeCreator {
AbsNode create(String id, JSONObject properties);
}
private final NodeRegistry registry;
public NodeFactory() {
this.registry = new NodeRegistry();
registerDefaultNodes();
}
// 注册默认节点类型(避免switch语句)
private void registerDefaultNodes() {
register(NodeType.START, StartNode::new);
register(NodeType.AI_CHAT, AiChatNode::new);
register(NodeType.CONDITION, ConditionNode::new);
register(NodeType.LOOP, LoopNode::new);
register(NodeType.SEARCH_KNOWLEDGE, SearchKnowledgeNode::new);
// ... 其他节点类型
}
public void register(NodeType nodeType, NodeCreator creator) {
registry.register(nodeType.getKey(), creator);
}
public AbsNode createNode(LfNode lfNode) {
NodeCreator creator = registry.getCreator(lfNode.getType());
return creator.create(lfNode.getId(), lfNode.getProperties());
}
}
注册表实现:
// 文件: maxkb4j-workflow-api/src/main/java/com/maxkb4j/workflow/factory/NodeRegistry.java
public class NodeRegistry {
private final Map<String, NodeFactory.NodeCreator> creators;
public NodeRegistry() {
this.creators = new ConcurrentHashMap<>(); // 线程安全
}
public void register(String nodeType, NodeFactory.NodeCreator creator) {
creators.put(nodeType, creator);
}
public NodeFactory.NodeCreator getCreator(String nodeType) {
return creators.get(nodeType);
}
}
优势:
- 新增节点类型只需注册,无需修改工厂代码
- 支持运行时动态注册(如插件机制)
- 线程安全的注册表设计
3.3 核心节点类型详解
3.3.1 开始节点 (StartNode)
作为工作流的入口节点,负责初始化变量和加载上下文:
public class StartNode extends AbsNode {
public StartNode(String id, JSONObject properties) {
super(id, properties);
super.setType(START.getKey());
}
@Override
public void saveContext(Workflow workflow, Map<String, Object> detail) {
// 保存用户输入
context.put("question", detail.get("question"));
context.put("image", detail.get("imageList"));
context.put("document", detail.get("documentList"));
context.put("audio", detail.get("audioList"));
// 加载全局变量到工作流上下文
JSONArray globalFields = (JSONArray) detail.get("globalFields");
for (int i = 0; i < globalFields.size(); i++) {
JSONObject globalField = globalFields.getJSONObject(i);
workflow.getContext().put(globalField.getString("key"), globalField.get("value"));
}
// 加载聊天变量
String chatId = (String) workflow.getContext().get("chatId");
ChatInfo chatInfo = ChatCache.get(chatId);
workflow.getChatContext().putAll(chatInfo.getChatVariables());
}
}
3.3.2 AI对话节点 (AiChatNode)
与LLM交互的核心节点,支持流式输出和工具调用:
public class AiChatNode extends AbsNode {
public AiChatNode(String id, JSONObject properties) {
super(id, properties);
super.setType(AI_CHAT.getKey());
}
@Override
public void saveContext(Workflow workflow, Map<String, Object> detail) {
context.put("answer", detail.get("answer"));
context.put("reasoningContent", detail.get("reasoningContent"));
}
@Data
public static class NodeParams {
private String modelId; // 模型ID
private String system; // 系统提示词
private String prompt; // 用户提示词
private int dialogueNumber; // 历史对话轮数
private Boolean isResult; // 是否返回结果
private JSONObject modelParamsSetting; // 模型参数
private List<String> toolIds; // 工具ID列表
private List<String> imageList; // 图片字段引用
}
}
3.3.3 条件节点 (ConditionNode)
实现条件分支逻辑:
public class ConditionNode extends AbsNode {
public ConditionNode(String id, JSONObject properties) {
super(id, properties);
this.setType(CONDITION.getKey());
}
@Data
public static class NodeParams {
private List<Branch> branch;
}
@Data
public static class Branch {
private String id;
private String type; // "and" 或 "or"
private String condition; // 条件组合类型
private List<Condition> conditions; // 条件列表
}
}
3.3.4 循环节点 (LoopNode)
支持数组遍历和固定次数循环:
public class LoopNode extends AbsNode {
public LoopNode(String id, JSONObject properties) {
super(id, properties);
this.setType(LOOP.getKey());
}
@Data
public static class NodeParams {
private String loopType; // "ARRAY" / "LOOP" / "NUMBER"
private List<String> array; // 数组引用
private Integer number; // 固定次数
private JSONObject loopBody; // 循环体工作流JSON
}
}
四、工作流执行引擎
4.1 执行器抽象设计
AbsWorkflowHandler采用模板方法模式定义工作流执行骨架:
// 文件: maxkb4j-workflow/src/main/java/com/maxkb4j/workflow/handler/AbsWorkflowHandler.java
public abstract class AbsWorkflowHandler implements IWorkflowHandler {
// 核心执行方法:递归执行节点链
protected void runChainNodes(Workflow workflow, List<AbsNode> nodeList) {
if (nodeList == null || nodeList.isEmpty()) {
return;
}
if (nodeList.size() == 1) {
// 单节点:顺序执行
List<AbsNode> nextNodeList = runChainNode(workflow, nodeList.get(0));
runChainNodes(workflow, nextNodeList); // 递归
} else {
// 多节点:并行执行
List<CompletableFuture<List<AbsNode>>> futureList = new ArrayList<>();
for (AbsNode node : nodeList) {
futureList.add(CompletableFuture.supplyAsync(
() -> runChainNode(workflow, node)
));
}
// 等待所有并行节点完成
List<List<AbsNode>> nextNodeLists = futureList.stream()
.map(CompletableFuture::join)
.toList();
// 继续执行后续节点
for (List<AbsNode> nextNodeList : nextNodeLists) {
runChainNodes(workflow, nextNodeList);
}
}
}
// 执行单个节点
protected List<AbsNode> runChainNode(Workflow workflow, AbsNode node) {
if (NodeStatus.READY.getStatus() == node.getStatus()
|| NodeStatus.INTERRUPT.getStatus() == node.getStatus()) {
if (workflow.dependentNodeBeenExecuted(node)) {
// 执行节点
NodeResultFuture resultFuture = runNodeFuture(workflow, node);
node.setStatus(resultFuture.getStatus());
// 处理执行结果
NodeResult nodeResult = resultFuture.getResult();
if (nodeResult != null) {
nodeResult.writeContext(node, workflow);
nodeResult.writeDetail(node);
}
// 获取下游节点
return workflow.getNextNodeList(node, nodeResult);
}
} else if (NodeStatus.SKIP.getStatus() == node.getStatus()) {
// 跳过节点:传递跳过状态
List<AbsNode> nextNodeList = workflow.getNextNodeList(node, new NodeResult(Map.of()));
nextNodeList.forEach(nextNode -> {
if (!workflow.isReadyJoinNode(nextNode)) {
nextNode.setStatus(NodeStatus.SKIP.getStatus());
}
});
return nextNodeList;
}
return List.of();
}
// 执行节点并返回Future
protected NodeResultFuture runNodeFuture(Workflow workflow, AbsNode node) {
try {
long startTime = System.currentTimeMillis();
onNodeStart(workflow, node); // 钩子方法
// 获取节点处理器并执行
INodeHandler nodeHandler = NodeHandlerBuilder.getHandler(node.getType());
NodeResult result = nodeHandler.execute(workflow, node);
float runTime = (System.currentTimeMillis() - startTime) / 1000F;
node.getDetail().put("runTime", runTime);
onNodeSuccess(workflow, node, result); // 钩子方法
return new NodeResultFuture(result, null, NodeStatus.SUCCESS.getStatus());
} catch (Exception ex) {
return handleNodeError(workflow, node, ex);
}
}
// 钩子方法:子类可覆写
protected void onNodeStart(Workflow workflow, AbsNode node) {}
protected void onNodeSuccess(Workflow workflow, AbsNode node, NodeResult result) {}
protected NodeResultFuture handleNodeError(Workflow workflow, AbsNode node, Exception ex) {
node.setErrMessage(ex.getMessage());
return new NodeResultFuture(null, ex, NodeStatus.ERROR.getStatus());
}
}
4.2 节点调度策略
4.2.1 依赖检查
在执行节点前,检查其上游节点是否已执行:
// 文件: maxkb4j-workflow-api/src/main/java/com/maxkb4j/workflow/model/Workflow.java
public boolean dependentNodeBeenExecuted(AbsNode node) {
List<String> upNodeIdList = edges.stream()
.filter(edge -> node.getId().equals(edge.getTargetNodeId()))
.map(LfEdge::getSourceNodeId)
.toList();
if (CollectionUtils.isEmpty(upNodeIdList)) {
return true; // 开始节点无上游依赖
}
Set<String> upNodeIdSet = new HashSet<>(upNodeIdList);
return nodes.stream()
.filter(e -> upNodeIdSet.contains(e.getId()))
.allMatch(e -> NodeStatus.SUCCESS.getStatus() == e.getStatus()
|| NodeStatus.SKIP.getStatus() == e.getStatus());
}
4.2.2 并行分支处理
当工作流存在多个并行分支时,使用CompletableFuture实现并行执行:
// 并行执行示例
List<CompletableFuture<List<AbsNode>>> futureList = new ArrayList<>();
for (AbsNode node : nodeList) {
futureList.add(CompletableFuture.supplyAsync(() -> runChainNode(workflow, node)));
}
List<List<AbsNode>> results = futureList.stream()
.map(CompletableFuture::join)
.toList();
4.2.3 Join节点处理
当多个分支汇聚到一个节点时,需要判断是否所有上游分支都已完成:
public boolean isReadyJoinNode(AbsNode node) {
List<String> upNodeIdList = findUpstreamNodeIds(node.getId());
if (upNodeIdList.size() > 1) {
Set<String> upNodeIdSet = new HashSet<>(upNodeIdList);
// 检查是否所有上游节点都是SKIP状态
return !nodes.stream()
.filter(e -> upNodeIdSet.contains(e.getId()))
.allMatch(e -> NodeStatus.SKIP.getStatus() == e.getStatus());
}
return false;
}
4.3 节点处理器机制
采用策略模式,每个节点类型有对应的Handler实现:
// 文件: maxkb4j-workflow/src/main/java/com/maxkb4j/workflow/handler/node/INodeHandler.java
public interface INodeHandler {
NodeResult execute(Workflow workflow, AbsNode node) throws Exception;
}
4.4 注解驱动的自动注册
通过@NodeHandlerType注解声明处理器支持的节点类型:
// 文件: maxkb4j-workflow/src/main/java/com/maxkb4j/workflow/annotation/NodeHandlerType.java
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface NodeHandlerType {
NodeType[] value();
}
// 使用示例
@NodeHandlerType({NodeType.AI_CHAT, NodeType.IMAGE_UNDERSTAND})
@Component
public class LLMNodeHandler implements INodeHandler {
// ...
}
自动注册器:利用Spring的BeanPostProcessor在Bean初始化后自动注册:
// 文件: maxkb4j-workflow/src/main/java/com/maxkb4j/workflow/processor/NodeHandlerAutoRegistrar.java
@Component
public class NodeHandlerAutoRegistrar implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (bean instanceof INodeHandler) {
NodeHandlerType annotation = bean.getClass().getAnnotation(NodeHandlerType.class);
if (annotation != null) {
for (NodeType nodeType : annotation.value()) {
NodeHandlerBuilder.registerHandler(nodeType, (INodeHandler) bean);
}
}
}
return bean;
}
}
五、控制流实现
5.1 条件分支原理
条件节点的执行流程:
// 文件: maxkb4j-workflow/src/main/java/com/maxkb4j/workflow/handler/node/impl/ConditionNodeHandler.java
@NodeHandlerType(NodeType.CONDITION)
@Component
public class ConditionNodeHandler implements INodeHandler {
@Override
public NodeResult execute(Workflow workflow, AbsNode node) throws Exception {
ConditionNode.NodeParams nodeParams = node.getNodeData()
.toJavaObject(ConditionNode.NodeParams.class);
ConditionNode.Branch branch = _execute(workflow, nodeParams.getBranch());
return new NodeResult(Map.of("branchId", branch.getId(), "branchName", branch.getType()));
}
private ConditionNode.Branch _execute(Workflow workflow, List<ConditionNode.Branch> branchList) {
for (ConditionNode.Branch branch : branchList) {
if (ConditionUtil.assertion(workflow, branch.getCondition(), branch.getConditions())) {
return branch;
}
}
return null;
}
}
条件判断工具类:
// 文件: maxkb4j-workflow/src/main/java/com/maxkb4j/workflow/util/ConditionUtil.java
public class ConditionUtil {
public static boolean assertion(Workflow workflow, String conditionType, List<Condition> conditionList) {
if (conditionList == null || conditionList.isEmpty()) {
return true;
}
boolean isAnd = "and".equals(conditionType);
boolean result = isAnd;
for (Condition cond : conditionList) {
boolean conditionResult = assertion(workflow, cond.getField(), cond.getCompare(), cond.getValue());
if (isAnd) {
result = conditionResult;
if (!result) break; // AND短路
} else {
result = conditionResult;
if (result) break; // OR短路
}
}
return result;
}
private static boolean assertion(Workflow workflow, List<String> fieldList, String compare, String valueToCompare) {
Object fieldValue = workflow.getReferenceField(fieldList);
Compare handler = CompareBuilder.getHandler(compare);
return handler.compare(fieldValue, valueToCompare);
}
}
比较器接口:
// 文件: maxkb4j-workflow/src/main/java/com/maxkb4j/workflow/compare/Compare.java
public interface Compare {
boolean compare(Object sourceValue, String targetValue);
}
// 实现示例:等于比较器
@CompareType("equals")
@Component
public class EqualCompare implements Compare {
@Override
public boolean compare(Object sourceValue, String targetValue) {
return String.valueOf(sourceValue).equals(targetValue);
}
}
分支路由:
在Workflow.getNextNodeList()中,根据branchId确定下游节点:
private boolean isAssertionNode(String nodeId, NodeResult currentNodeResult, List<LfEdge> sourceEdges) {
List<String> assertionNodeIds = sourceEdges.stream().filter(edge -> {
String branchId = (String) currentNodeResult.getNodeVariable().getOrDefault("branchId", "");
String expectedAnchorId = String.format("%s_%s_right", edge.getSourceNodeId(), branchId);
return expectedAnchorId.equals(edge.getSourceAnchorId());
}).map(LfEdge::getTargetNodeId).toList();
return assertionNodeIds.contains(nodeId);
}
5.2 循环迭代机制
LoopNodeHandler实现了复杂的循环逻辑:
@NodeHandlerType(NodeType.LOOP)
@Component
@RequiredArgsConstructor
public class LoopNodeHandler implements INodeHandler {
private final IWorkFlowActuator workFlowActuator;
@Override
public NodeResult execute(Workflow workflow, AbsNode node) throws Exception {
LoopNode.NodeParams nodeParams = node.getNodeData().toJavaObject(LoopNode.NodeParams.class);
String loopType = nodeParams.getLoopType();
List<JSONObject> loopDetails = new ArrayList<>();
if ("ARRAY".equals(loopType)) {
// 数组遍历
Object value = workflow.getReferenceField(nodeParams.getArray());
List<Object> list = parseLoopArray(value);
loopDetails = generateLoopArray(list, workflow, nodeParams.getLoopBody(), node);
} else if ("LOOP".equals(loopType)) {
// 无限循环(最多1000次)
loopDetails = generateLoopNumber(1000, workflow, nodeParams.getLoopBody(), node);
} else {
// 固定次数
loopDetails = generateLoopNumber(nodeParams.getNumber(), workflow, nodeParams.getLoopBody(), node);
}
node.getDetail().put("loop_node_data", loopDetails);
return new NodeResult(Map.of(), true, this::isInterrupt);
}
private List<JSONObject> generateLoop(List<Object> list, Workflow workflow, JSONObject loopBody, AbsNode node) {
List<JSONObject> loopDetails = new ArrayList<>();
AtomicBoolean breakOuter = new AtomicBoolean(false);
int startIndex = 0;
do {
// 创建循环工作流
Sinks.Many<ChatMessageVO> nodeSink = Sinks.many().unicast().onBackpressureBuffer();
LogicFlow logicFlow = LogicFlow.newInstance(loopBody);
List<AbsNode> nodes = logicFlow.getNodes().stream()
.map(NodeBuilder::getNode)
.filter(Objects::nonNull)
.toList();
LoopParams loopParams = new LoopParams(startIndex, list.get(startIndex));
LoopWorkFlow loopWorkflow = new LoopWorkFlow(workflow, nodes, logicFlow.getEdges(), loopParams, details, nodeSink);
// 异步执行循环体
CompletableFuture<Void> future = CompletableFuture.runAsync(
() -> workFlowActuator.execute(workflow)
);
// 订阅输出并处理中断信号
AtomicBoolean isInterruptExec = new AtomicBoolean(false);
nodeSink.asFlux().subscribe(e -> {
if (LOOP_BREAK.getKey().equals(e.getNodeType()) && "BREAK".equals(e.getContent())) {
breakOuter.set(true); // 捕获break信号
}
// ... 处理输出消息
});
future.join();
loopDetails.add(loopWorkflow.getRuntimeDetails());
if (shouldBreakLoop(breakOuter.get(), isInterruptExec.get())) {
break;
}
startIndex++;
} while (startIndex < list.size());
return loopDetails;
}
}
LoopWorkFlow:循环体内的子工作流,继承自Workflow:
public class LoopWorkFlow extends Workflow {
private LoopParams loopParams;
private Map<String, Object> loopContext;
public LoopWorkFlow(Workflow workflow, List<AbsNode> nodes, List<LfEdge> edges,
LoopParams loopParams, JSONObject details, Sinks.Many<ChatMessageVO> sink) {
super(workflow.getWorkflowMode(), nodes, edges, workflow.getChatParams(), details, sink);
this.loopParams = loopParams;
this.loopContext = new HashMap<>();
// 重写变量解析器,增加loop作用域
this.setVariableResolver(new VariableResolver(this.getWorkflowContext(), this.loopContext));
}
@Override
public AbsNode getStartNode() {
return getNodeClsById(NodeType.LOOP_START.getKey(), List.of(), null);
}
}
5.3 中断与跳过逻辑
中断(INTERRUPT):节点需要等待外部输入(如表单提交)时设置,工作流暂停:
// NodeResult中的中断判断
public boolean isInterruptExec(AbsNode currentNode) {
return this.isInterrupt.apply(currentNode);
}
跳过(SKIP):条件分支未命中的节点被跳过:
// 在条件分支处理中
if (currentNodeResult.isAssertionResult()) {
List<AbsNode> targetNodes = buildNodes(targetNodeIds, currentNode);
targetNodes.forEach(e -> {
if (!isAssertionNode(e.getId(), currentNodeResult, sourceEdges)) {
e.setStatus(NodeStatus.SKIP.getStatus()); // 设置跳过状态
}
});
return targetNodes;
}
六、流式输出与实时通信
6.1 Reactor Sink机制
工作流使用Reactor的Sinks.Many实现实时消息推送:
// 在Workflow中定义
@JsonIgnore
private Sinks.Many<ChatMessageVO> sink;
// 初始化
this.sink = Sinks.many().unicast().onBackpressureBuffer();
6.2 流式AI响应处理
LLMNodeHandler实现了流式Token的接收和推送:
private NodeResult writeContextStream(AiChatNode.NodeParams nodeParams, TokenStream tokenStream,
Workflow workflow, AbsNode node) {
AtomicReference<String> errorMessage = new AtomicReference<>("");
CompletableFuture<ChatResponse> chatResponseFuture = new CompletableFuture<>();
tokenStream
.onPartialThinking(thinking -> {
// 推送推理内容
if (reasoningContentEnable) {
emitMessage(workflow, node, "", thinking.text());
}
})
.onPartialResponse(content -> {
// 推送响应内容
if (isResult) {
emitMessage(workflow, node, content, "");
}
})
.onCompleteResponse(chatResponseFuture::complete)
.onError(error -> {
errorMessage.set(error.getMessage());
chatResponseFuture.complete(ChatResponse.builder().build());
})
.start();
ChatResponse response = chatResponseFuture.join();
return handleChatResponse(response, node, errorMessage.get());
}
private void emitMessage(Workflow workflow, AbsNode node, String content, String reasoning) {
if (workflow.needsSinkOutput()) {
ChatMessageVO vo = node.toChatMessageVO(
workflow.getChatParams().getChatId(),
workflow.getChatParams().getChatRecordId(),
content, reasoning, null, false
);
workflow.getSink().tryEmitNext(vo);
}
}
6.3 前端实时推送
前端通过SSE(Server-Sent Events)或WebSocket订阅Sink,实现实时消息展示:
后端 Sink ──→ SSE/WebSocket ──→ 前端实时渲染
七、设计模式应用分析
7.1 工厂模式 + 注册表模式
应用场景:节点创建
优势:
- 避免大量
switch-case语句 - 符合开闭原则:新增节点只需注册
- 支持运行时动态扩展
代码位置:NodeFactory.java + NodeRegistry.java
7.2 策略模式
应用场景:节点执行逻辑
实现:
INodeHandler接口定义策略- 各
XxxNodeHandler实现具体策略 NodeHandlerBuilder作为策略上下文
优势:
- 节点执行逻辑可独立变化
- 易于测试和维护
7.3 模板方法模式
应用场景:工作流执行流程
实现:
AbsWorkflowHandler定义执行骨架runChainNodes()、runNodeFuture()等模板方法onNodeStart()、onNodeSuccess()等钩子方法
优势:
- 统一的执行流程
- 子类可定制特定行为
7.4 Builder模式
应用场景:节点执行结果构建
实现:
NodeResult result = NodeResult.builder()
.nodeVariable(Map.of("answer", answer))
.streamOutput(true)
.writeContextFunc(this::customWriteContext)
.isInterrupt(this::checkInterrupt)
.build();
优势:
- 灵活构建复杂对象
- 支持默认值和自定义函数
7.5 注解驱动设计
应用场景:处理器自动注册
实现:
@NodeHandlerType声明支持的节点类型NodeHandlerAutoRegistrar自动扫描注册
优势:
- 声明式编程
- 减少配置代码
八、扩展性设计
8.1 自定义节点扩展
步骤:
- 创建节点数据类继承
AbsNode:
public class CustomNode extends AbsNode {
public CustomNode(String id, JSONObject properties) {
super(id, properties);
this.setType("custom-node");
}
@Override
public void saveContext(Workflow workflow, Map<String, Object> detail) {
// 实现上下文保存逻辑
}
}
- 创建处理器实现
INodeHandler:
@NodeHandlerType(NodeType.CUSTOM)
@Component
public class CustomNodeHandler implements INodeHandler {
@Override
public NodeResult execute(Workflow workflow, AbsNode node) throws Exception {
// 实现执行逻辑
return NodeResult.builder().build();
}
}
- 注册节点类型:
NodeBuilder.getFactory().register(NodeType.CUSTOM, CustomNode::new);
8.2 自定义比较器扩展
步骤:
- 实现
Compare接口并添加注解:
@CompareType("regex")
@Component
public class RegexCompare implements Compare {
@Override
public boolean compare(Object sourceValue, String targetValue) {
return Pattern.matches(targetValue, String.valueOf(sourceValue));
}
}
- 系统自动扫描注册
8.3 插件化架构思想
当前架构已具备插件化基础:
- 节点工厂支持运行时注册
- 处理器通过注解自动发现
- 比较器通过注解自动发现
未来可扩展:
- 定义插件接口和生命周期
- 实现插件加载器
- 支持热插拔
九、性能优化考量
9.1 并行执行优化
工作流引擎自动识别并行分支并使用CompletableFuture并行执行:
if (nodeList.size() > 1) {
List<CompletableFuture<List<AbsNode>>> futureList = new ArrayList<>();
for (AbsNode node : nodeList) {
futureList.add(CompletableFuture.supplyAsync(() -> runChainNode(workflow, node)));
}
// 等待所有分支完成
}
注意事项:
- 共享状态的线程安全(如
WorkflowContext使用CopyOnWriteArrayList) - 异常处理和传播
9.2 缓存策略
节点查找优化:
// Workflow中使用Map替代List遍历
private Map<String, AbsNode> nodeMap; // O(1)查找
// NodeType枚举中使用静态Map
private static final Map<String, NodeType> KEY_MAP;
模板渲染缓存:可考虑对相同Prompt的渲染结果进行缓存
9.3 内存管理
上下文清理:工作流执行完成后,清理不必要的上下文引用
大对象处理:对于大文件(如图片、文档),使用流式处理而非全量加载
十、总结与展望
10.1 架构亮点
- 清晰的分层设计:API层与实现层分离,职责明确
- 丰富的设计模式:工厂、策略、模板方法、Builder等模式综合运用
- 高度可扩展:通过注册表和注解驱动,支持自定义扩展
- 流式优先:原生支持AI流式输出,用户体验优秀
- 多级作用域:四级变量作用域,满足复杂业务场景
10.2 适用场景
- AI对话应用
- 智能客服系统
- 知识库问答
- 复杂业务流程编排
- 多模态AI应用
10.3 未来展望
- 可视化调试:支持节点断点、变量监控
- 版本管理:工作流版本控制和回滚
- 性能监控:节点执行耗时统计和瓶颈分析
- 更多AI能力:支持更多模型和多模态能力
- 插件生态:构建第三方节点插件市场
附录:关键类索引
| 类名 | 路径 | 说明 |
|---|---|---|
AbsNode |
workflow-api/node/AbsNode.java |
节点抽象基类 |
Workflow |
workflow-api/model/Workflow.java |
工作流模型 |
WorkflowContext |
workflow-api/model/WorkflowContext.java |
上下文管理 |
VariableResolver |
workflow-api/model/VariableResolver.java |
变量解析 |
TemplateRenderer |
workflow-api/model/TemplateRenderer.java |
模板渲染 |
NodeFactory |
workflow-api/factory/NodeFactory.java |
节点工厂 |
NodeRegistry |
workflow-api/factory/NodeRegistry.java |
节点注册表 |
INodeHandler |
workflow/handler/node/INodeHandler.java |
节点处理器接口 |
AbsWorkflowHandler |
workflow/handler/AbsWorkflowHandler.java |
执行器基类 |
NodeResult |
workflow-api/model/NodeResult.java |
节点执行结果 |
ConditionUtil |
workflow/util/ConditionUtil.java |
条件判断工具 |
Compare |
workflow/compare/Compare.java |
比较器接口 |
LogicFlow |
workflow-api/logic/LogicFlow.java |
图结构表示 |
LfEdge |
workflow-api/logic/LfEdge.java |
边定义 |
NodeType |
workflow-api/enums/NodeType.java |
节点类型枚举 |
NodeStatus |
workflow-api/enums/NodeStatus.java |
节点状态枚举 |
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)