MaxKB4j 节点工作流引擎技术原理与实现

项目地址:https://gitee.com/taisan/MaxKB4j | Gitee GVP 认证

一、概述与架构设计

1.1 工作流引擎定位

MaxKB4j是一个基于Java的智能知识库系统,其核心特性之一是可视化工作流编排能力。工作流引擎作为系统的"神经中枢",负责将用户通过拖拽配置的节点图转化为可执行的业务逻辑流程,支持AI对话、知识检索、条件分支、循环迭代等复杂场景。

与传统的工作流引擎(如Activiti、Flowable)不同,MaxKB4j的工作流引擎专门为AI应用场景设计,具有以下特点:

  • 可视化编排:基于LogicFlow的前端图编辑,后端直接解析执行
  • 流式响应:原生支持AI大模型的流式输出,实时推送到前端
  • 多模态支持:支持文本、图片、语音、文档等多种输入输出
  • 智能节点:内置AI对话、知识库检索、意图识别等AI专用节点
  • 动态变量:支持跨节点的变量引用和模板渲染

1.2 整体架构

┌─────────────────────────────────────────────────────────────────┐
│                        前端层 (LogicFlow)                         │
│              ┌─────────────────────────────────┐                 │
│              │    节点拖拽 → JSON配置 → API调用   │                 │
│              └─────────────────────────────────┘                 │
└─────────────────────────────────────────────────────────────────┘
                                │
                                ▼
┌─────────────────────────────────────────────────────────────────┐
│                      maxkb4j-workflow-api                        │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐  │
│  │   AbsNode   │  │  Workflow   │  │   WorkflowContext       │  │
│  │  (节点抽象)  │  │  (工作流模型)│  │   (上下文管理)          │  │
│  └─────────────┘  └─────────────┘  └─────────────────────────┘  │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐  │
│  │NodeFactory  │  │VariableRes. │  │  TemplateRenderer       │  │
│  │  (节点工厂)  │  │ (变量解析)   │  │   (模板渲染)            │  │
│  └─────────────┘  └─────────────┘  └─────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘
                                │
                                ▼
┌─────────────────────────────────────────────────────────────────┐
│                       maxkb4j-workflow                           │
│  ┌─────────────────────┐  ┌─────────────────────────────────┐   │
│  │ AbsWorkflowHandler  │  │        INodeHandler             │   │
│  │    (执行器基类)      │  │    ┌─────────────────────┐     │   │
│  └─────────────────────┘  │    │ LLMNodeHandler      │     │   │
│  ┌─────────────────────┐  │    │ ConditionNodeHandler│     │   │
│  │ChatWorkflowHandler  │  │    │ LoopNodeHandler     │     │   │
│  │ KnowledgeWorkf...   │  │    │ SearchKnowledgeH... │     │   │
│  └─────────────────────┘  │    └─────────────────────┘     │   │
│                           └─────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘

1.3 模块职责划分

模块 路径 职责
maxkb4j-workflow-api 服务API层 定义核心抽象、数据模型、工厂接口,供其他模块依赖
maxkb4j-workflow 服务实现层 实现工作流执行逻辑、节点处理器、条件判断等具体功能

这种分层设计遵循了依赖倒置原则:实现层依赖API层的抽象,便于替换实现和进行单元测试。


二、核心概念与数据模型

2.1 节点抽象设计

节点是工作流的基本执行单元。所有节点类型都继承自AbsNode抽象基类:

// 文件: maxkb4j-workflow-api/src/main/java/com/maxkb4j/workflow/node/AbsNode.java
@Data
public abstract class AbsNode {
    // 节点标识
    private String id;              // 静态ID(来自前端配置)
    private String type;            // 节点类型标识
    private String runtimeNodeId;   // 运行时ID(动态生成)
    private String viewType;        // 视图类型

    // 节点配置
    private JSONObject properties;  // 节点属性(来自前端)
    private List<String> upNodeIdList; // 上游节点ID列表

    // 执行状态
    protected Map<String, Object> context;  // 节点上下文
    protected Map<String, Object> detail;   // 执行详情
    private Integer status;          // 节点状态
    private String errMessage;       // 错误信息

    // 输出
    private String answerText;       // 回答文本

    // 抽象方法:子类实现具体的上下文恢复逻辑
    public abstract void saveContext(Workflow workflow, Map<String, Object> detail);
}

设计要点

  1. 双ID机制

    • id:静态ID,对应前端节点配置,在整个工作流生命周期内不变
    • runtimeNodeId:运行时ID,结合上游节点路径生成,用于区分循环节点的不同迭代
    private String generateRuntimeNodeId() {
        return NodeIdGenerator.generateRuntimeNodeId(id, upNodeIdList);
    }
    
  2. 模板方法模式saveContext()是抽象方法,由各节点类型自行实现上下文恢复逻辑

  3. 状态机:节点状态通过NodeStatus枚举定义:

    public enum NodeStatus {
        READY(0),      // 就绪
        SUCCESS(1),    // 成功
        ERROR(2),      // 错误
        INTERRUPT(3),  // 中断(等待用户输入)
        SKIP(4);       // 跳过
    }
    

2.2 边与图的表示

工作流本质上是一个有向无环图(DAG),节点之间通过边连接:

// 文件: maxkb4j-workflow-api/src/main/java/com/maxkb4j/workflow/logic/LfEdge.java
@Data
public class LfEdge {
    private String id;              // 边ID
    private String type;            // 边类型
    private String sourceNodeId;    // 源节点ID
    private String targetNodeId;    // 目标节点ID
    private String sourceAnchorId;  // 源锚点ID(用于条件分支)
    private String targetAnchorId;  // 目标锚点ID
    private List<LfPoint> pointsList; // 绘制点列表
    private JSONObject properties;  // 边属性
}

条件分支的实现:通过sourceAnchorId标识条件分支。例如,条件节点有两个分支,分别对应sourceNodeId_rightsourceNodeId_left

                    ┌── right ──→ Node A
[Condition Node] ───┤
                    └── left ───→ Node B

LogicFlow类封装了完整的图结构:

@Data
public class LogicFlow {
    private List<LfNode> nodes;  // 节点列表
    private List<LfEdge> edges;  // 边列表

    public static LogicFlow newInstance(JSONObject flowJson) {
        return JSON.parseObject(flowJson.toJSONString(), new TypeReference<>() {});
    }
}

2.3 工作流上下文管理

WorkflowContext实现了三级作用域的变量管理:

// 文件: maxkb4j-workflow-api/src/main/java/com/maxkb4j/workflow/model/WorkflowContext.java
@Data
public class WorkflowContext {
    // 全局变量:跨会话共享的配置变量
    private final Map<String, Object> globalContext;

    // 聊天变量:当前会话的变量(如用户信息、会话参数)
    private final Map<String, Object> chatContext;

    // 节点变量:各节点的执行结果
    private final List<AbsNode> nodeContext;

    public void appendNode(AbsNode currentNode) {
        // 支持更新已存在的节点上下文
        for (int i = 0; i < this.nodeContext.size(); i++) {
            AbsNode node = this.nodeContext.get(i);
            if (currentNode.getId().equals(node.getId())
                && currentNode.getRuntimeNodeId().equals(node.getRuntimeNodeId())) {
                this.nodeContext.set(i, currentNode);
                return;
            }
        }
        this.nodeContext.add(currentNode);
    }
}

变量作用域层级

┌─────────────────────────────────────────┐
│           Global Context                 │  ← 应用级配置
│  (全局变量:模型配置、系统参数等)          │
├─────────────────────────────────────────┤
│           Chat Context                   │  ← 会话级变量
│  (聊天变量:用户信息、会话参数等)          │
├─────────────────────────────────────────┤
│           Node Context                   │  ← 节点级变量
│  (节点变量:各节点的执行结果)              │
│  ┌─────────┐ ┌─────────┐ ┌─────────┐   │
│  │ Node 1  │ │ Node 2  │ │ Node N  │   │
│  │ answer  │ │ result  │ │ output  │   │
│  └─────────┘ └─────────┘ └─────────┘   │
└─────────────────────────────────────────┘

2.4 变量解析机制

VariableResolver负责将各级上下文变量解析为模板可用的格式:

// 文件: maxkb4j-workflow-api/src/main/java/com/maxkb4j/workflow/model/VariableResolver.java
public class VariableResolver {
    private final WorkflowContext context;
    private final Map<String, Object> loopContext; // 循环变量(可选)

    // 获取所有模板变量:{scope.variable: value} 格式
    public Map<String, Object> getPromptVariables() {
        Map<String, Object> result = new HashMap<>();

        // 全局变量: global.xxx
        if (context.getGlobalContext() != null) {
            for (Map.Entry<String, Object> entry : context.getGlobalContext().entrySet()) {
                result.put("global." + entry.getKey(), entry.getValue());
            }
        }

        // 聊天变量: chat.xxx
        if (context.getChatContext() != null) {
            for (Map.Entry<String, Object> entry : context.getChatContext().entrySet()) {
                result.put("chat." + entry.getKey(), entry.getValue());
            }
        }

        // 循环变量: loop.xxx
        if (loopContext != null) {
            for (Map.Entry<String, Object> entry : loopContext.entrySet()) {
                result.put("loop." + entry.getKey(), entry.getValue());
            }
        }

        // 节点变量: nodeName.variableName
        if (context.getNodeContext() != null) {
            for (AbsNode node : context.getNodeContext()) {
                result.putAll(getNodeVariables(node));
            }
        }

        return result;
    }
}

变量引用示例

在AI对话节点的Prompt中,可以这样引用变量:

你是一个智能助手,用户名是 {{chat.userName}}。

上一轮对话结果是:{{AI对话节点.answer}}

当前循环索引:{{loop.index}}

2.5 模板渲染

TemplateRenderer基于LangChain4j的PromptTemplate实现变量替换:

// 文件: maxkb4j-workflow-api/src/main/java/com/maxkb4j/workflow/model/TemplateRenderer.java
public record TemplateRenderer(VariableResolver variableResolver) {

    public String render(String prompt) {
        return render(prompt, variableResolver.getPromptVariables());
    }

    public String render(String prompt, Map<String, Object> addVariables) {
        if (StringUtils.isBlank(prompt)) {
            return "";
        }
        Map<String, Object> variables = new HashMap<>(variableResolver.getPromptVariables());
        if (addVariables != null) {
            variables.putAll(addVariables); // 额外变量可覆盖
        }
        PromptTemplate promptTemplate = PromptTemplate.from(prompt);
        return promptTemplate.apply(variables).text();
    }
}

三、节点类型体系

3.1 节点类型枚举

系统内置了丰富的节点类型,覆盖AI应用的常见场景:

// 文件: maxkb4j-workflow-api/src/main/java/com/maxkb4j/workflow/enums/NodeType.java
@Getter
@AllArgsConstructor
public enum NodeType {
    // 基础节点
    BASE("base-node", "基础节点"),
    START("start-node", "开始节点"),

    // 流程控制节点
    CONDITION("condition-node", "条件节点"),
    LOOP("loop-node", "循环节点"),
    LOOP_START("loop-start-node", "循环开始"),
    LOOP_BREAK("loop-break-node", "循环跳出"),
    LOOP_CONTINUE("loop-continue-node", "循环继续"),

    // AI相关节点
    AI_CHAT("ai-chat-node", "智能聊天节点"),
    IMAGE_GENERATE("image-generate-node", "图片生成"),
    IMAGE_UNDERSTAND("image-understand-node", "图片理解"),
    SPEECH_TO_TEXT("speech-to-text-node", "语音转文字"),
    TEXT_TO_SPEECH("text-to-speech-node", "文字转语音"),

    // 知识库节点
    SEARCH_KNOWLEDGE("search-knowledge-node", "知识库搜索节点"),
    RERANKER("reranker-node", "多路召回"),
    KNOWLEDGE_WRITE("knowledge-write-node", "知识库写入节点"),

    // 数据处理节点
    VARIABLE_ASSIGN("variable-assign-node", "变量赋值"),
    VARIABLE_AGGREGATE("variable-aggregation-node", "变量聚合"),
    PARAMETER_EXTRACTION("parameter-extraction-node", "参数提取节点"),
    NL2SQL("nl2sql-node", "自然语言转SQL节点"),

    // 工具节点
    TOOL("tool-node", "自定义函数节点"),
    TOOL_LIB("tool-lib-node", "工具库节点"),
    HTTP_CLIENT("http-node", "HTTP请求节点"),
    MCP("mcp-node", "MCP节点"),

    // 交互节点
    FORM("form-node", "表单收集"),
    USER_SELECT("user-select-node", "用户选择节点"),
    QUESTION("question-node", "问题节点"),
    REPLY("reply-node", "回复节点");

    private final String key;
    private final String name;

    // O(1)查找优化
    private static final Map<String, NodeType> KEY_MAP;
    static {
        KEY_MAP = Arrays.stream(values())
            .collect(Collectors.toUnmodifiableMap(NodeType::getKey, Function.identity()));
    }
}

3.2 节点工厂与注册表模式

采用工厂模式 + 注册表模式实现节点的动态创建,符合开闭原则

// 文件: maxkb4j-workflow-api/src/main/java/com/maxkb4j/workflow/factory/NodeFactory.java
@Getter
public class NodeFactory {
    @FunctionalInterface
    public interface NodeCreator {
        AbsNode create(String id, JSONObject properties);
    }

    private final NodeRegistry registry;

    public NodeFactory() {
        this.registry = new NodeRegistry();
        registerDefaultNodes();
    }

    // 注册默认节点类型(避免switch语句)
    private void registerDefaultNodes() {
        register(NodeType.START, StartNode::new);
        register(NodeType.AI_CHAT, AiChatNode::new);
        register(NodeType.CONDITION, ConditionNode::new);
        register(NodeType.LOOP, LoopNode::new);
        register(NodeType.SEARCH_KNOWLEDGE, SearchKnowledgeNode::new);
        // ... 其他节点类型
    }

    public void register(NodeType nodeType, NodeCreator creator) {
        registry.register(nodeType.getKey(), creator);
    }

    public AbsNode createNode(LfNode lfNode) {
        NodeCreator creator = registry.getCreator(lfNode.getType());
        return creator.create(lfNode.getId(), lfNode.getProperties());
    }
}

注册表实现

// 文件: maxkb4j-workflow-api/src/main/java/com/maxkb4j/workflow/factory/NodeRegistry.java
public class NodeRegistry {
    private final Map<String, NodeFactory.NodeCreator> creators;

    public NodeRegistry() {
        this.creators = new ConcurrentHashMap<>(); // 线程安全
    }

    public void register(String nodeType, NodeFactory.NodeCreator creator) {
        creators.put(nodeType, creator);
    }

    public NodeFactory.NodeCreator getCreator(String nodeType) {
        return creators.get(nodeType);
    }
}

优势

  • 新增节点类型只需注册,无需修改工厂代码
  • 支持运行时动态注册(如插件机制)
  • 线程安全的注册表设计

3.3 核心节点类型详解

3.3.1 开始节点 (StartNode)

作为工作流的入口节点,负责初始化变量和加载上下文:

public class StartNode extends AbsNode {
    public StartNode(String id, JSONObject properties) {
        super(id, properties);
        super.setType(START.getKey());
    }

    @Override
    public void saveContext(Workflow workflow, Map<String, Object> detail) {
        // 保存用户输入
        context.put("question", detail.get("question"));
        context.put("image", detail.get("imageList"));
        context.put("document", detail.get("documentList"));
        context.put("audio", detail.get("audioList"));

        // 加载全局变量到工作流上下文
        JSONArray globalFields = (JSONArray) detail.get("globalFields");
        for (int i = 0; i < globalFields.size(); i++) {
            JSONObject globalField = globalFields.getJSONObject(i);
            workflow.getContext().put(globalField.getString("key"), globalField.get("value"));
        }

        // 加载聊天变量
        String chatId = (String) workflow.getContext().get("chatId");
        ChatInfo chatInfo = ChatCache.get(chatId);
        workflow.getChatContext().putAll(chatInfo.getChatVariables());
    }
}
3.3.2 AI对话节点 (AiChatNode)

与LLM交互的核心节点,支持流式输出和工具调用:

public class AiChatNode extends AbsNode {
    public AiChatNode(String id, JSONObject properties) {
        super(id, properties);
        super.setType(AI_CHAT.getKey());
    }

    @Override
    public void saveContext(Workflow workflow, Map<String, Object> detail) {
        context.put("answer", detail.get("answer"));
        context.put("reasoningContent", detail.get("reasoningContent"));
    }

    @Data
    public static class NodeParams {
        private String modelId;           // 模型ID
        private String system;            // 系统提示词
        private String prompt;            // 用户提示词
        private int dialogueNumber;       // 历史对话轮数
        private Boolean isResult;         // 是否返回结果
        private JSONObject modelParamsSetting; // 模型参数
        private List<String> toolIds;     // 工具ID列表
        private List<String> imageList;   // 图片字段引用
    }
}
3.3.3 条件节点 (ConditionNode)

实现条件分支逻辑:

public class ConditionNode extends AbsNode {
    public ConditionNode(String id, JSONObject properties) {
        super(id, properties);
        this.setType(CONDITION.getKey());
    }

    @Data
    public static class NodeParams {
        private List<Branch> branch;
    }

    @Data
    public static class Branch {
        private String id;
        private String type;           // "and" 或 "or"
        private String condition;      // 条件组合类型
        private List<Condition> conditions; // 条件列表
    }
}
3.3.4 循环节点 (LoopNode)

支持数组遍历和固定次数循环:

public class LoopNode extends AbsNode {
    public LoopNode(String id, JSONObject properties) {
        super(id, properties);
        this.setType(LOOP.getKey());
    }

    @Data
    public static class NodeParams {
        private String loopType;       // "ARRAY" / "LOOP" / "NUMBER"
        private List<String> array;    // 数组引用
        private Integer number;        // 固定次数
        private JSONObject loopBody;   // 循环体工作流JSON
    }
}

四、工作流执行引擎

4.1 执行器抽象设计

AbsWorkflowHandler采用模板方法模式定义工作流执行骨架:

// 文件: maxkb4j-workflow/src/main/java/com/maxkb4j/workflow/handler/AbsWorkflowHandler.java
public abstract class AbsWorkflowHandler implements IWorkflowHandler {

    // 核心执行方法:递归执行节点链
    protected void runChainNodes(Workflow workflow, List<AbsNode> nodeList) {
        if (nodeList == null || nodeList.isEmpty()) {
            return;
        }

        if (nodeList.size() == 1) {
            // 单节点:顺序执行
            List<AbsNode> nextNodeList = runChainNode(workflow, nodeList.get(0));
            runChainNodes(workflow, nextNodeList); // 递归
        } else {
            // 多节点:并行执行
            List<CompletableFuture<List<AbsNode>>> futureList = new ArrayList<>();
            for (AbsNode node : nodeList) {
                futureList.add(CompletableFuture.supplyAsync(
                    () -> runChainNode(workflow, node)
                ));
            }
            // 等待所有并行节点完成
            List<List<AbsNode>> nextNodeLists = futureList.stream()
                .map(CompletableFuture::join)
                .toList();
            // 继续执行后续节点
            for (List<AbsNode> nextNodeList : nextNodeLists) {
                runChainNodes(workflow, nextNodeList);
            }
        }
    }

    // 执行单个节点
    protected List<AbsNode> runChainNode(Workflow workflow, AbsNode node) {
        if (NodeStatus.READY.getStatus() == node.getStatus()
            || NodeStatus.INTERRUPT.getStatus() == node.getStatus()) {

            if (workflow.dependentNodeBeenExecuted(node)) {
                // 执行节点
                NodeResultFuture resultFuture = runNodeFuture(workflow, node);
                node.setStatus(resultFuture.getStatus());

                // 处理执行结果
                NodeResult nodeResult = resultFuture.getResult();
                if (nodeResult != null) {
                    nodeResult.writeContext(node, workflow);
                    nodeResult.writeDetail(node);
                }

                // 获取下游节点
                return workflow.getNextNodeList(node, nodeResult);
            }
        } else if (NodeStatus.SKIP.getStatus() == node.getStatus()) {
            // 跳过节点:传递跳过状态
            List<AbsNode> nextNodeList = workflow.getNextNodeList(node, new NodeResult(Map.of()));
            nextNodeList.forEach(nextNode -> {
                if (!workflow.isReadyJoinNode(nextNode)) {
                    nextNode.setStatus(NodeStatus.SKIP.getStatus());
                }
            });
            return nextNodeList;
        }
        return List.of();
    }

    // 执行节点并返回Future
    protected NodeResultFuture runNodeFuture(Workflow workflow, AbsNode node) {
        try {
            long startTime = System.currentTimeMillis();
            onNodeStart(workflow, node); // 钩子方法

            // 获取节点处理器并执行
            INodeHandler nodeHandler = NodeHandlerBuilder.getHandler(node.getType());
            NodeResult result = nodeHandler.execute(workflow, node);

            float runTime = (System.currentTimeMillis() - startTime) / 1000F;
            node.getDetail().put("runTime", runTime);

            onNodeSuccess(workflow, node, result); // 钩子方法
            return new NodeResultFuture(result, null, NodeStatus.SUCCESS.getStatus());
        } catch (Exception ex) {
            return handleNodeError(workflow, node, ex);
        }
    }

    // 钩子方法:子类可覆写
    protected void onNodeStart(Workflow workflow, AbsNode node) {}
    protected void onNodeSuccess(Workflow workflow, AbsNode node, NodeResult result) {}
    protected NodeResultFuture handleNodeError(Workflow workflow, AbsNode node, Exception ex) {
        node.setErrMessage(ex.getMessage());
        return new NodeResultFuture(null, ex, NodeStatus.ERROR.getStatus());
    }
}

4.2 节点调度策略

4.2.1 依赖检查

在执行节点前,检查其上游节点是否已执行:

// 文件: maxkb4j-workflow-api/src/main/java/com/maxkb4j/workflow/model/Workflow.java
public boolean dependentNodeBeenExecuted(AbsNode node) {
    List<String> upNodeIdList = edges.stream()
        .filter(edge -> node.getId().equals(edge.getTargetNodeId()))
        .map(LfEdge::getSourceNodeId)
        .toList();

    if (CollectionUtils.isEmpty(upNodeIdList)) {
        return true; // 开始节点无上游依赖
    }

    Set<String> upNodeIdSet = new HashSet<>(upNodeIdList);
    return nodes.stream()
        .filter(e -> upNodeIdSet.contains(e.getId()))
        .allMatch(e -> NodeStatus.SUCCESS.getStatus() == e.getStatus()
                    || NodeStatus.SKIP.getStatus() == e.getStatus());
}
4.2.2 并行分支处理

当工作流存在多个并行分支时,使用CompletableFuture实现并行执行:

// 并行执行示例
List<CompletableFuture<List<AbsNode>>> futureList = new ArrayList<>();
for (AbsNode node : nodeList) {
    futureList.add(CompletableFuture.supplyAsync(() -> runChainNode(workflow, node)));
}
List<List<AbsNode>> results = futureList.stream()
    .map(CompletableFuture::join)
    .toList();
4.2.3 Join节点处理

当多个分支汇聚到一个节点时,需要判断是否所有上游分支都已完成:

public boolean isReadyJoinNode(AbsNode node) {
    List<String> upNodeIdList = findUpstreamNodeIds(node.getId());
    if (upNodeIdList.size() > 1) {
        Set<String> upNodeIdSet = new HashSet<>(upNodeIdList);
        // 检查是否所有上游节点都是SKIP状态
        return !nodes.stream()
            .filter(e -> upNodeIdSet.contains(e.getId()))
            .allMatch(e -> NodeStatus.SKIP.getStatus() == e.getStatus());
    }
    return false;
}

4.3 节点处理器机制

采用策略模式,每个节点类型有对应的Handler实现:

// 文件: maxkb4j-workflow/src/main/java/com/maxkb4j/workflow/handler/node/INodeHandler.java
public interface INodeHandler {
    NodeResult execute(Workflow workflow, AbsNode node) throws Exception;
}

4.4 注解驱动的自动注册

通过@NodeHandlerType注解声明处理器支持的节点类型:

// 文件: maxkb4j-workflow/src/main/java/com/maxkb4j/workflow/annotation/NodeHandlerType.java
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface NodeHandlerType {
    NodeType[] value();
}

// 使用示例
@NodeHandlerType({NodeType.AI_CHAT, NodeType.IMAGE_UNDERSTAND})
@Component
public class LLMNodeHandler implements INodeHandler {
    // ...
}

自动注册器:利用Spring的BeanPostProcessor在Bean初始化后自动注册:

// 文件: maxkb4j-workflow/src/main/java/com/maxkb4j/workflow/processor/NodeHandlerAutoRegistrar.java
@Component
public class NodeHandlerAutoRegistrar implements BeanPostProcessor {

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
        if (bean instanceof INodeHandler) {
            NodeHandlerType annotation = bean.getClass().getAnnotation(NodeHandlerType.class);
            if (annotation != null) {
                for (NodeType nodeType : annotation.value()) {
                    NodeHandlerBuilder.registerHandler(nodeType, (INodeHandler) bean);
                }
            }
        }
        return bean;
    }
}

五、控制流实现

5.1 条件分支原理

条件节点的执行流程:

// 文件: maxkb4j-workflow/src/main/java/com/maxkb4j/workflow/handler/node/impl/ConditionNodeHandler.java
@NodeHandlerType(NodeType.CONDITION)
@Component
public class ConditionNodeHandler implements INodeHandler {

    @Override
    public NodeResult execute(Workflow workflow, AbsNode node) throws Exception {
        ConditionNode.NodeParams nodeParams = node.getNodeData()
            .toJavaObject(ConditionNode.NodeParams.class);
        ConditionNode.Branch branch = _execute(workflow, nodeParams.getBranch());
        return new NodeResult(Map.of("branchId", branch.getId(), "branchName", branch.getType()));
    }

    private ConditionNode.Branch _execute(Workflow workflow, List<ConditionNode.Branch> branchList) {
        for (ConditionNode.Branch branch : branchList) {
            if (ConditionUtil.assertion(workflow, branch.getCondition(), branch.getConditions())) {
                return branch;
            }
        }
        return null;
    }
}

条件判断工具类

// 文件: maxkb4j-workflow/src/main/java/com/maxkb4j/workflow/util/ConditionUtil.java
public class ConditionUtil {

    public static boolean assertion(Workflow workflow, String conditionType, List<Condition> conditionList) {
        if (conditionList == null || conditionList.isEmpty()) {
            return true;
        }
        boolean isAnd = "and".equals(conditionType);
        boolean result = isAnd;

        for (Condition cond : conditionList) {
            boolean conditionResult = assertion(workflow, cond.getField(), cond.getCompare(), cond.getValue());
            if (isAnd) {
                result = conditionResult;
                if (!result) break; // AND短路
            } else {
                result = conditionResult;
                if (result) break; // OR短路
            }
        }
        return result;
    }

    private static boolean assertion(Workflow workflow, List<String> fieldList, String compare, String valueToCompare) {
        Object fieldValue = workflow.getReferenceField(fieldList);
        Compare handler = CompareBuilder.getHandler(compare);
        return handler.compare(fieldValue, valueToCompare);
    }
}

比较器接口

// 文件: maxkb4j-workflow/src/main/java/com/maxkb4j/workflow/compare/Compare.java
public interface Compare {
    boolean compare(Object sourceValue, String targetValue);
}

// 实现示例:等于比较器
@CompareType("equals")
@Component
public class EqualCompare implements Compare {
    @Override
    public boolean compare(Object sourceValue, String targetValue) {
        return String.valueOf(sourceValue).equals(targetValue);
    }
}

分支路由

Workflow.getNextNodeList()中,根据branchId确定下游节点:

private boolean isAssertionNode(String nodeId, NodeResult currentNodeResult, List<LfEdge> sourceEdges) {
    List<String> assertionNodeIds = sourceEdges.stream().filter(edge -> {
        String branchId = (String) currentNodeResult.getNodeVariable().getOrDefault("branchId", "");
        String expectedAnchorId = String.format("%s_%s_right", edge.getSourceNodeId(), branchId);
        return expectedAnchorId.equals(edge.getSourceAnchorId());
    }).map(LfEdge::getTargetNodeId).toList();
    return assertionNodeIds.contains(nodeId);
}

5.2 循环迭代机制

LoopNodeHandler实现了复杂的循环逻辑:

@NodeHandlerType(NodeType.LOOP)
@Component
@RequiredArgsConstructor
public class LoopNodeHandler implements INodeHandler {

    private final IWorkFlowActuator workFlowActuator;

    @Override
    public NodeResult execute(Workflow workflow, AbsNode node) throws Exception {
        LoopNode.NodeParams nodeParams = node.getNodeData().toJavaObject(LoopNode.NodeParams.class);
        String loopType = nodeParams.getLoopType();

        List<JSONObject> loopDetails = new ArrayList<>();

        if ("ARRAY".equals(loopType)) {
            // 数组遍历
            Object value = workflow.getReferenceField(nodeParams.getArray());
            List<Object> list = parseLoopArray(value);
            loopDetails = generateLoopArray(list, workflow, nodeParams.getLoopBody(), node);
        } else if ("LOOP".equals(loopType)) {
            // 无限循环(最多1000次)
            loopDetails = generateLoopNumber(1000, workflow, nodeParams.getLoopBody(), node);
        } else {
            // 固定次数
            loopDetails = generateLoopNumber(nodeParams.getNumber(), workflow, nodeParams.getLoopBody(), node);
        }

        node.getDetail().put("loop_node_data", loopDetails);
        return new NodeResult(Map.of(), true, this::isInterrupt);
    }

    private List<JSONObject> generateLoop(List<Object> list, Workflow workflow, JSONObject loopBody, AbsNode node) {
        List<JSONObject> loopDetails = new ArrayList<>();
        AtomicBoolean breakOuter = new AtomicBoolean(false);
        int startIndex = 0;

        do {
            // 创建循环工作流
            Sinks.Many<ChatMessageVO> nodeSink = Sinks.many().unicast().onBackpressureBuffer();
            LogicFlow logicFlow = LogicFlow.newInstance(loopBody);

            List<AbsNode> nodes = logicFlow.getNodes().stream()
                .map(NodeBuilder::getNode)
                .filter(Objects::nonNull)
                .toList();

            LoopParams loopParams = new LoopParams(startIndex, list.get(startIndex));
            LoopWorkFlow loopWorkflow = new LoopWorkFlow(workflow, nodes, logicFlow.getEdges(), loopParams, details, nodeSink);

            // 异步执行循环体
            CompletableFuture<Void> future = CompletableFuture.runAsync(
                () -> workFlowActuator.execute(workflow)
            );

            // 订阅输出并处理中断信号
            AtomicBoolean isInterruptExec = new AtomicBoolean(false);
            nodeSink.asFlux().subscribe(e -> {
                if (LOOP_BREAK.getKey().equals(e.getNodeType()) && "BREAK".equals(e.getContent())) {
                    breakOuter.set(true); // 捕获break信号
                }
                // ... 处理输出消息
            });

            future.join();
            loopDetails.add(loopWorkflow.getRuntimeDetails());

            if (shouldBreakLoop(breakOuter.get(), isInterruptExec.get())) {
                break;
            }
            startIndex++;
        } while (startIndex < list.size());

        return loopDetails;
    }
}

LoopWorkFlow:循环体内的子工作流,继承自Workflow

public class LoopWorkFlow extends Workflow {
    private LoopParams loopParams;
    private Map<String, Object> loopContext;

    public LoopWorkFlow(Workflow workflow, List<AbsNode> nodes, List<LfEdge> edges,
                        LoopParams loopParams, JSONObject details, Sinks.Many<ChatMessageVO> sink) {
        super(workflow.getWorkflowMode(), nodes, edges, workflow.getChatParams(), details, sink);
        this.loopParams = loopParams;
        this.loopContext = new HashMap<>();
        // 重写变量解析器,增加loop作用域
        this.setVariableResolver(new VariableResolver(this.getWorkflowContext(), this.loopContext));
    }

    @Override
    public AbsNode getStartNode() {
        return getNodeClsById(NodeType.LOOP_START.getKey(), List.of(), null);
    }
}

5.3 中断与跳过逻辑

中断(INTERRUPT):节点需要等待外部输入(如表单提交)时设置,工作流暂停:

// NodeResult中的中断判断
public boolean isInterruptExec(AbsNode currentNode) {
    return this.isInterrupt.apply(currentNode);
}

跳过(SKIP):条件分支未命中的节点被跳过:

// 在条件分支处理中
if (currentNodeResult.isAssertionResult()) {
    List<AbsNode> targetNodes = buildNodes(targetNodeIds, currentNode);
    targetNodes.forEach(e -> {
        if (!isAssertionNode(e.getId(), currentNodeResult, sourceEdges)) {
            e.setStatus(NodeStatus.SKIP.getStatus()); // 设置跳过状态
        }
    });
    return targetNodes;
}

六、流式输出与实时通信

6.1 Reactor Sink机制

工作流使用Reactor的Sinks.Many实现实时消息推送:

// 在Workflow中定义
@JsonIgnore
private Sinks.Many<ChatMessageVO> sink;

// 初始化
this.sink = Sinks.many().unicast().onBackpressureBuffer();

6.2 流式AI响应处理

LLMNodeHandler实现了流式Token的接收和推送:

private NodeResult writeContextStream(AiChatNode.NodeParams nodeParams, TokenStream tokenStream,
                                      Workflow workflow, AbsNode node) {
    AtomicReference<String> errorMessage = new AtomicReference<>("");
    CompletableFuture<ChatResponse> chatResponseFuture = new CompletableFuture<>();

    tokenStream
        .onPartialThinking(thinking -> {
            // 推送推理内容
            if (reasoningContentEnable) {
                emitMessage(workflow, node, "", thinking.text());
            }
        })
        .onPartialResponse(content -> {
            // 推送响应内容
            if (isResult) {
                emitMessage(workflow, node, content, "");
            }
        })
        .onCompleteResponse(chatResponseFuture::complete)
        .onError(error -> {
            errorMessage.set(error.getMessage());
            chatResponseFuture.complete(ChatResponse.builder().build());
        })
        .start();

    ChatResponse response = chatResponseFuture.join();
    return handleChatResponse(response, node, errorMessage.get());
}

private void emitMessage(Workflow workflow, AbsNode node, String content, String reasoning) {
    if (workflow.needsSinkOutput()) {
        ChatMessageVO vo = node.toChatMessageVO(
            workflow.getChatParams().getChatId(),
            workflow.getChatParams().getChatRecordId(),
            content, reasoning, null, false
        );
        workflow.getSink().tryEmitNext(vo);
    }
}

6.3 前端实时推送

前端通过SSE(Server-Sent Events)或WebSocket订阅Sink,实现实时消息展示:

后端 Sink ──→ SSE/WebSocket ──→ 前端实时渲染

七、设计模式应用分析

7.1 工厂模式 + 注册表模式

应用场景:节点创建

优势

  • 避免大量switch-case语句
  • 符合开闭原则:新增节点只需注册
  • 支持运行时动态扩展

代码位置NodeFactory.java + NodeRegistry.java

7.2 策略模式

应用场景:节点执行逻辑

实现

  • INodeHandler接口定义策略
  • XxxNodeHandler实现具体策略
  • NodeHandlerBuilder作为策略上下文

优势

  • 节点执行逻辑可独立变化
  • 易于测试和维护

7.3 模板方法模式

应用场景:工作流执行流程

实现

  • AbsWorkflowHandler定义执行骨架
  • runChainNodes()runNodeFuture()等模板方法
  • onNodeStart()onNodeSuccess()等钩子方法

优势

  • 统一的执行流程
  • 子类可定制特定行为

7.4 Builder模式

应用场景:节点执行结果构建

实现

NodeResult result = NodeResult.builder()
    .nodeVariable(Map.of("answer", answer))
    .streamOutput(true)
    .writeContextFunc(this::customWriteContext)
    .isInterrupt(this::checkInterrupt)
    .build();

优势

  • 灵活构建复杂对象
  • 支持默认值和自定义函数

7.5 注解驱动设计

应用场景:处理器自动注册

实现

  • @NodeHandlerType声明支持的节点类型
  • NodeHandlerAutoRegistrar自动扫描注册

优势

  • 声明式编程
  • 减少配置代码

八、扩展性设计

8.1 自定义节点扩展

步骤

  1. 创建节点数据类继承AbsNode
public class CustomNode extends AbsNode {
    public CustomNode(String id, JSONObject properties) {
        super(id, properties);
        this.setType("custom-node");
    }

    @Override
    public void saveContext(Workflow workflow, Map<String, Object> detail) {
        // 实现上下文保存逻辑
    }
}
  1. 创建处理器实现INodeHandler
@NodeHandlerType(NodeType.CUSTOM)
@Component
public class CustomNodeHandler implements INodeHandler {
    @Override
    public NodeResult execute(Workflow workflow, AbsNode node) throws Exception {
        // 实现执行逻辑
        return NodeResult.builder().build();
    }
}
  1. 注册节点类型:
NodeBuilder.getFactory().register(NodeType.CUSTOM, CustomNode::new);

8.2 自定义比较器扩展

步骤

  1. 实现Compare接口并添加注解:
@CompareType("regex")
@Component
public class RegexCompare implements Compare {
    @Override
    public boolean compare(Object sourceValue, String targetValue) {
        return Pattern.matches(targetValue, String.valueOf(sourceValue));
    }
}
  1. 系统自动扫描注册

8.3 插件化架构思想

当前架构已具备插件化基础:

  • 节点工厂支持运行时注册
  • 处理器通过注解自动发现
  • 比较器通过注解自动发现

未来可扩展:

  • 定义插件接口和生命周期
  • 实现插件加载器
  • 支持热插拔

九、性能优化考量

9.1 并行执行优化

工作流引擎自动识别并行分支并使用CompletableFuture并行执行:

if (nodeList.size() > 1) {
    List<CompletableFuture<List<AbsNode>>> futureList = new ArrayList<>();
    for (AbsNode node : nodeList) {
        futureList.add(CompletableFuture.supplyAsync(() -> runChainNode(workflow, node)));
    }
    // 等待所有分支完成
}

注意事项

  • 共享状态的线程安全(如WorkflowContext使用CopyOnWriteArrayList
  • 异常处理和传播

9.2 缓存策略

节点查找优化

// Workflow中使用Map替代List遍历
private Map<String, AbsNode> nodeMap; // O(1)查找

// NodeType枚举中使用静态Map
private static final Map<String, NodeType> KEY_MAP;

模板渲染缓存:可考虑对相同Prompt的渲染结果进行缓存

9.3 内存管理

上下文清理:工作流执行完成后,清理不必要的上下文引用

大对象处理:对于大文件(如图片、文档),使用流式处理而非全量加载


十、总结与展望

10.1 架构亮点

  1. 清晰的分层设计:API层与实现层分离,职责明确
  2. 丰富的设计模式:工厂、策略、模板方法、Builder等模式综合运用
  3. 高度可扩展:通过注册表和注解驱动,支持自定义扩展
  4. 流式优先:原生支持AI流式输出,用户体验优秀
  5. 多级作用域:四级变量作用域,满足复杂业务场景

10.2 适用场景

  • AI对话应用
  • 智能客服系统
  • 知识库问答
  • 复杂业务流程编排
  • 多模态AI应用

10.3 未来展望

  1. 可视化调试:支持节点断点、变量监控
  2. 版本管理:工作流版本控制和回滚
  3. 性能监控:节点执行耗时统计和瓶颈分析
  4. 更多AI能力:支持更多模型和多模态能力
  5. 插件生态:构建第三方节点插件市场

附录:关键类索引

类名 路径 说明
AbsNode workflow-api/node/AbsNode.java 节点抽象基类
Workflow workflow-api/model/Workflow.java 工作流模型
WorkflowContext workflow-api/model/WorkflowContext.java 上下文管理
VariableResolver workflow-api/model/VariableResolver.java 变量解析
TemplateRenderer workflow-api/model/TemplateRenderer.java 模板渲染
NodeFactory workflow-api/factory/NodeFactory.java 节点工厂
NodeRegistry workflow-api/factory/NodeRegistry.java 节点注册表
INodeHandler workflow/handler/node/INodeHandler.java 节点处理器接口
AbsWorkflowHandler workflow/handler/AbsWorkflowHandler.java 执行器基类
NodeResult workflow-api/model/NodeResult.java 节点执行结果
ConditionUtil workflow/util/ConditionUtil.java 条件判断工具
Compare workflow/compare/Compare.java 比较器接口
LogicFlow workflow-api/logic/LogicFlow.java 图结构表示
LfEdge workflow-api/logic/LfEdge.java 边定义
NodeType workflow-api/enums/NodeType.java 节点类型枚举
NodeStatus workflow-api/enums/NodeStatus.java 节点状态枚举
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐