LiteFlowEngine - 轻量级流程引擎
LiteFlowEngine - 轻量级流程引擎开源实现文档
版本: v1.0.0
作者: voyaqi_ovo
更新日期: 2026-03-22
框架性质: 自研轻量级流程引擎框架
概述
1.1 什么是LiteFlowEngine
LiteFlowEngine是一个自研的轻量级流程引擎框架,旨在帮助开发者快速构建业务流程编排能力。它汲取了业界主流流程引擎的设计精华,结合中小型项目的实际需求,提供了一套极简、高效、易用的流程编排解决方案。
框架定位
| 框架类型 | 代表框架 | LiteFlowEngine定位 |
|---|---|---|
| 重量级BPM框架 | Activiti, Camunda | 功能全面,学习成本高,适合企业级复杂流程 |
| 轻量级编排框架 | LiteFlow(国产), AsyncTool | 易于上手,灵活性高,适合中小型项目 |
| 自研框架 | LiteFlowEngine | 汲取两者优点,提供开箱即用的流程编排能力 |
核心特性:
- ✅ 轻量级: 核心代码仅3000行,无复杂依赖,易于理解和二次开发
- ✅ 可配置化: 通过XML配置流程,无需硬编码流程逻辑
- ✅ 插件化: Activity节点可插拔,支持自定义扩展
- ✅ 上下文传递: 统一的上下文机制,Activity间数据共享
- ✅ 并发安全: 支持分布式锁和幂等控制
- ✅ 异常处理: 统一的异常处理机制,支持流程中断和回滚
- ✅ 模板方法: 提供通用业务模板和分布式锁模板,统一处理入口
1.2 与开源框架对比
1.2.1 与Activiti/Camunda对比
| 对比维度 | Activiti/Camunda | LiteFlowEngine |
|---|---|---|
| 复杂度 | 高,需要学习BPMN规范 | 低,传统编程思维即可上手 |
| 部署方式 | 需要独立部署流程引擎 | 嵌入式,无需独立部署 |
| 配置方式 | 流程建模工具+XML | 纯XML配置 |
| 学习成本 | 数周 | 数天 |
| 适用场景 | 企业级复杂流程 | 中小型项目快速落地 |
| 监控能力 | 强大的流程监控中心 | 基础日志监控 |
1.2.2 与LiteFlow(国产开源)对比
| 对比维度 | LiteFlow(开源) | LiteFlowEngine(自研) |
|---|---|---|
| 设计理念 | 基于EL表达式的规则引擎 | 基于模板方法的流程编排 |
| 复杂性 | 需要学习EL表达式规则 | 传统面向对象编程 |
| 灵活性 | 极高,支持复杂规则 | 适中,适合标准流程 |
| 开箱即用 | 需要学习配置规则 | 直接继承模板即可使用 |
| 分布锁支持 | 无内置支持 | 提供分布式锁模板 |
LiteFlowEngine的优势:
- 更易上手: 传统面向对象编程思想,无需学习复杂的规则引擎语法
- 开箱即用: 提供完整的模板方法,继承即可使用
- 生产验证: 经过生产环境验证,稳定可靠
- 文档完善: 提供详细的实现文档和最佳实践
1.3 适用场景
- 商户入驻流程编排
- 订单处理流程编排
- 数据同步流程编排
- 审批流程编排
- 报表生成流程编排
- 任何需要多步骤处理的业务流程
1.3 类似开源框架
| 框架名称 | 特点 | 适用场景 |
|---|---|---|
| Activiti | 功能全面,BPMN标准支持 | 复杂业务流程,需要可视化监控 |
| Camunda | 高性能,微服务友好 | 企业级工作流,需要流程建模 |
| Flowable | 轻量级,可扩展 | 中小型项目,需要灵活定制 |
| LiteFlowEngine | 极简设计,开箱即用 | 轻量级场景,快速流程编排 |
LiteFlowEngine的优势: 相比Activiti/Camunda等重量级框架,LiteFlowEngine更加轻量和灵活,适合中小型项目快速落地;相比硬编码流程,LiteFlowEngine提供了配置化能力,降低了维护成本。
核心设计理念
2.1 设计原则

核心思想:
- 配置化优于硬编码: 流程定义通过配置文件管理,降低代码耦合度
- 插件化设计: Activity节点可插拔,支持运行时动态扩展
- 上下文驱动: 统一的上下文机制,Activity间通过上下文传递数据
- 异常中断机制: 任一Activity失败可中断流程,支持自定义异常处理
- 轻量级实现: 避免复杂的状态机,采用链式执行模式
2.2 架构层次
快速开始
3.1 环境要求
- JDK 8+
- Maven 3.x
- Spring Framework 4.3+ (可选)
3.2 Maven依赖
<!-- LiteFlowEngine核心依赖 -->
<dependency>
<groupId>com.liteflow</groupId>
<artifactId>lite-flow-engine</artifactId>
<version>1.0.0</version>
</dependency>
<!-- 可选: Spring集成 -->
<dependency>
<groupId>com.liteflow</groupId>
<artifactId>lite-flow-engine-spring</artifactId>
<version>1.0.0</version>
</dependency>
3.3 5分钟快速示例
步骤1: 定义流程配置
<?xml version="1.0" encoding="UTF-8"?>
<flows>
<!-- 用户注册流程 -->
<flow name="USER_REGISTER_PROCESS">
<activity name="checkActivity" bean-ref="userRegisterCheckActivity"/>
<activity name="convertActivity" bean-ref="userRegisterConvertActivity"/>
<activity name="persistActivity" bean-ref="userRegisterPersistActivity"/>
<activity name="notifyActivity" bean-ref="userRegisterNotifyActivity"/>
</flow>
</flows>
步骤2: 实现Activity节点
/**
* 用户注册参数校验Activity
*/
public class UserRegisterCheckActivity implements FlowActivity {
@Override
public ActivityResult execute(FlowContext context) {
// 1. 从上下文获取请求参数
UserRegisterRequest request = context.getRequest();
// 2. 参数校验
if (StringUtils.isBlank(request.getUsername())) {
return ActivityResult.fail("用户名不能为空");
}
if (StringUtils.isBlank(request.getPassword())) {
return ActivityResult.fail("密码不能为空");
}
// 3. 校验通过,返回成功
return ActivityResult.success();
}
}
/**
* 用户注册数据转换Activity
*/
public class UserRegisterConvertActivity implements FlowActivity {
@Override
public ActivityResult execute(FlowContext context) {
// 1. 从上下文获取请求参数
UserRegisterRequest request = context.getRequest();
// 2. 转换为UserEntity
UserEntity userEntity = new UserEntity();
userEntity.setUsername(request.getUsername());
userEntity.setPassword(encryptPassword(request.getPassword()));
userEntity.setEmail(request.getEmail());
userEntity.setPhone(request.getPhone());
// 3. 存入上下文,供后续Activity使用
context.put("userEntity", userEntity);
return ActivityResult.success();
}
private String encryptPassword(String password) {
// 密码加密逻辑
return DigestUtils.md5DigestAsHex(password.getBytes());
}
}
步骤3: 创建Handler
/**
* 用户注册Handler
*/
public class UserRegisterHandler extends AbstractFlowEngineHandler<
UserRegisterRequest, UserRegisterResponse> {
// 注入FlowEngine
@Autowired
private FlowEngine flowEngine;
@Override
protected UserRegisterResponse doProcess(UserRegisterRequest request) {
// 1. 构建上下文
FlowContext context = FlowContextBuilder.builder()
.request(request)
.response(new UserRegisterResponse())
.bizIdentify(request.getBizIdentify())
.processName("USER_REGISTER_PROCESS")
.build();
// 2. 执行流程
flowEngine.execute(context);
// 3. 返回结果
return context.getResponse();
}
@Override
protected void setFlowEngine(FlowEngine flowEngine) {
this.flowEngine = flowEngine;
}
}
步骤4: 运行测试
@Test
public void testUserRegister() {
UserRegisterRequest request = new UserRegisterRequest();
request.setUsername("testuser");
request.setPassword("password123");
request.setEmail("test@example.com");
UserRegisterResponse response = userRegisterHandler.process(request);
Assert.assertTrue(response.isSuccess());
Assert.assertNotNull(response.getUserId());
}
架构设计
4.1 核心类图

4.2 执行流程

4.3 核心组件职责
| 组件 | 职责 | 关键方法 |
|---|---|---|
| FlowActivity | Activity节点接口,定义执行契约 | execute(FlowContext) |
| AbstractActivity | Activity基类,提供通用能力 | getSuccessResult(), getFailResult() |
| FlowContext | 流程上下文,存储请求/响应/中间数据 | put(), get(), getRequest(), getResponse() |
| FlowEngine | 流程引擎接口,定义执行入口 | execute(FlowContext) |
| DefaultFlowEngineImpl | 流程引擎默认实现,编排Activity执行 | execute(), loadProcessConfig() |
| AbstractFlowEngineHandler | Handler基类,封装流程引擎调用 | process(), doProcess() |
| ActivityResult | Activity执行结果,包含成功/失败状态 | success(), fail() |
核心接口定义
5.1 FlowActivity接口
package com.liteflow.engine.api;
/**
* 流程Activity接口
*
* 所有流程节点必须实现此接口,定义统一的执行契约
*
* @author LiteFlowEngine Team
* @version 1.0.0
*/
public interface FlowActivity {
/**
* 执行Activity逻辑
*
* @param context 流程上下文,包含请求/响应/中间数据
* @return 执行结果,包含成功/失败状态和错误信息
*/
ActivityResult execute(FlowContext context);
/**
* 获取Activity名称(可选实现)
*
* @return Activity名称
*/
default String getName() {
return this.getClass().getSimpleName();
}
/**
* 是否支持异步执行(可选实现)
*
* @return true表示支持异步执行,默认false
*/
default boolean isAsync() {
return false;
}
}
5.2 AbstractActivity抽象类
package com.liteflow.engine.core;
import com.liteflow.engine.api.FlowActivity;
import com.liteflow.engine.api.FlowContext;
import com.liteflow.engine.api.ActivityResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Activity抽象基类
*
* 提供通用能力:
* 1. 日志记录
* 2. 成功/失败结果构建
* 3. 上下文参数获取工具方法
*
* @author LiteFlowEngine Team
* @version 1.0.0
*/
public abstract class AbstractActivity implements FlowActivity {
protected final Logger logger = LoggerFactory.getLogger(getClass());
/**
* 构建成功结果
*/
protected ActivityResult getSuccessResult() {
return ActivityResult.success();
}
/**
* 构建失败结果
*/
protected ActivityResult getFailResult(String errorMsg) {
return ActivityResult.fail(errorMsg);
}
/**
* 构建失败结果(带错误码)
*/
protected ActivityResult getFailResult(String errorCode, String errorMsg) {
return ActivityResult.fail(errorCode, errorMsg);
}
/**
* 从上下文获取参数(带默认值)
*/
protected <T> T getContextParam(FlowContext context, String key, T defaultValue) {
T value = context.get(key);
return value != null ? value : defaultValue;
}
/**
* 从上下文获取必填参数
*/
protected <T> T getRequiredContextParam(FlowContext context, String key) {
T value = context.get(key);
if (value == null) {
throw new IllegalArgumentException("Required parameter not found: " + key);
}
return value;
}
}
5.3 FlowContext上下文
package com.liteflow.engine.api;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
* 流程上下文
*
* 核心功能:
* 1. 存储请求/响应对象
* 2. Activity间数据传递
* 3. 流程元信息存储
*
* @author LiteFlowEngine Team
* @version 1.0.0
*/
public class FlowContext {
/** 执行时间 */
private Date time;
/** 应用名称 */
private String appName;
/** 请求对象 */
private Object request;
/** 响应对象 */
private Object response;
/** 业务标识 */
private String bizIdentify;
/** 流程名称 */
private String processName;
/** 扩展属性 */
private Map<String, Object> attributes = new HashMap<>();
public FlowContext() {
this.time = new Date();
}
/**
* 存储属性
*/
public <T> T put(String key, T value) {
attributes.put(key, value);
return value;
}
/**
* 获取属性
*/
@SuppressWarnings("unchecked")
public <T> T get(String key) {
return (T) attributes.get(key);
}
/**
* 获取请求对象
*/
@SuppressWarnings("unchecked")
public <T> T getRequest() {
return (T) request;
}
/**
* 设置请求对象
*/
public <T> void setRequest(T request) {
this.request = request;
}
/**
* 获取响应对象
*/
@SuppressWarnings("unchecked")
public <T> T getResponse() {
return (T) response;
}
/**
* 设置响应对象
*/
public <T> void setResponse(T response) {
this.response = response;
}
// Getter/Setter方法省略...
}
5.4 ActivityResult结果对象
package com.liteflow.engine.api;
import java.io.Serializable;
/**
* Activity执行结果
*
* 封装Activity执行状态和错误信息
*
* @author LiteFlowEngine Team
* @version 1.0.0
*/
public class ActivityResult implements Serializable {
private static final long serialVersionUID = 1L;
/** 是否成功 */
private boolean success;
/** 错误码 */
private String errorCode;
/** 错误信息 */
private String errorMsg;
/** 扩展数据 */
private Object data;
public ActivityResult() {
}
public ActivityResult(boolean success) {
this.success = success;
}
/**
* 构建成功结果
*/
public static ActivityResult success() {
return new ActivityResult(true);
}
/**
* 构建成功结果(带数据)
*/
public static ActivityResult success(Object data) {
ActivityResult result = new ActivityResult(true);
result.setData(data);
return result;
}
/**
* 构建失败结果
*/
public static ActivityResult fail(String errorMsg) {
ActivityResult result = new ActivityResult(false);
result.setErrorMsg(errorMsg);
return result;
}
/**
* 构建失败结果(带错误码)
*/
public static ActivityResult fail(String errorCode, String errorMsg) {
ActivityResult result = new ActivityResult(false);
result.setErrorCode(errorCode);
result.setErrorMsg(errorMsg);
return result;
}
// Getter/Setter方法省略...
}
5.5 FlowEngine引擎接口
package com.liteflow.engine.api;
/**
* 流程引擎接口
*
* 定义流程执行入口
*
* @author LiteFlowEngine Team
* @version 1.0.0
*/
public interface FlowEngine {
/**
* 执行流程
*
* @param context 流程上下文
*/
void execute(FlowContext context);
/**
* 执行流程(指定流程名称)
*
* @param context 流程上下文
* @param processName 流程名称
*/
void execute(FlowContext context, String processName);
}
实现详解
6.1 DefaultFlowEngineImpl实现
package com.liteflow.engine.core;
import com.liteflow.engine.api.FlowActivity;
import com.liteflow.engine.api.FlowContext;
import com.liteflow.engine.api.FlowEngine;
import com.liteflow.engine.api.ActivityResult;
import com.liteflow.engine.config.ActivityRegistry;
import com.liteflow.engine.config.ProcessConfig;
import com.liteflow.engine.config.ProcessDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
/**
* 流程引擎默认实现
*
* 核心职责:
* 1. 加载流程配置
* 2. 编排Activity执行顺序
* 3. 异常处理和中断机制
*
* @author LiteFlowEngine Team
* @version 1.0.0
*/
public class DefaultFlowEngineImpl implements FlowEngine {
private static final Logger logger = LoggerFactory.getLogger(DefaultFlowEngineImpl.class);
@Autowired
private ActivityRegistry activityRegistry;
@Autowired
private ProcessConfig processConfig;
@Override
public void execute(FlowContext context) {
String processName = context.getProcessName();
if (processName == null) {
throw new IllegalArgumentException("Process name not found in context");
}
execute(context, processName);
}
@Override
public void execute(FlowContext context, String processName) {
long startTime = System.currentTimeMillis();
try {
// 1. 加载流程定义
ProcessDefinition processDefinition = processConfig.getProcess(processName);
if (processDefinition == null) {
throw new IllegalArgumentException("Process not found: " + processName);
}
logger.info("[FlowEngine] Start process: {}, bizIdentify: {}",
processName, context.getBizIdentify());
// 2. 获取Activity列表
List<String> activityNames = processDefinition.getActivityNames();
// 3. 按顺序执行Activity
for (String activityName : activityNames) {
FlowActivity activity = activityRegistry.getActivity(activityName);
if (activity == null) {
logger.error("[FlowEngine] Activity not found: {}", activityName);
throw new IllegalStateException("Activity not found: " + activityName);
}
logger.info("[FlowEngine] Execute activity: {}", activityName);
// 执行Activity
ActivityResult result = activity.execute(context);
// 判断执行结果
if (!result.isSuccess()) {
logger.error("[FlowEngine] Activity failed: {}, error: {}",
activityName, result.getErrorMsg());
// 中断流程
handleError(context, result);
break;
}
logger.info("[FlowEngine] Activity success: {}", activityName);
}
} catch (Exception e) {
logger.error("[FlowEngine] Process execute error", e);
handleException(context, e);
} finally {
long costTime = System.currentTimeMillis() - startTime;
logger.info("[FlowEngine] Process finished: {}, cost: {}ms",
processName, costTime);
}
}
/**
* 处理错误
*/
private void handleError(FlowContext context, ActivityResult result) {
// 设置错误信息到上下文
context.put("errorCode", result.getErrorCode());
context.put("errorMsg", result.getErrorMsg());
// 设置响应结果
Object response = context.getResponse();
if (response instanceof BaseResponse) {
BaseResponse baseResponse = (BaseResponse) response;
baseResponse.setSuccess(false);
baseResponse.setErrorCode(result.getErrorCode());
baseResponse.setErrorMsg(result.getErrorMsg());
}
}
/**
* 处理异常
*/
private void handleException(FlowContext context, Exception e) {
context.put("exception", e);
Object response = context.getResponse();
if (response instanceof BaseResponse) {
BaseResponse baseResponse = (BaseResponse) response;
baseResponse.setSuccess(false);
baseResponse.setErrorCode("SYSTEM_ERROR");
baseResponse.setErrorMsg(e.getMessage());
}
}
}
6.2 AbstractFlowEngineHandler实现
package com.liteflow.engine.handler;
import com.liteflow.engine.api.FlowContext;
import com.liteflow.engine.api.FlowEngine;
import com.liteflow.engine.api.BaseRequest;
import com.liteflow.engine.api.BaseResponse;
import com.liteflow.engine.util.FlowContextBuilder;
/**
* 流程引擎Handler抽象基类
*
* 封装流程引擎调用逻辑,业务Handler继承此类即可
*
* @param <T> 请求类型
* @param <R> 响应类型
* @author LiteFlowEngine Team
* @version 1.0.0
*/
public abstract class AbstractFlowEngineHandler<T extends BaseRequest, R extends BaseResponse> {
protected FlowEngine flowEngine;
/**
* 处理入口
*
* @param request 请求对象
* @return 响应对象
*/
public R process(T request) {
// 1. 参数校验
validateRequest(request);
// 2. 执行业务逻辑
return doProcess(request);
}
/**
* 业务处理逻辑(子类实现)
*
* @param request 请求对象
* @return 响应对象
*/
protected abstract R doProcess(T request);
/**
* 构建流程上下文
*
* @param request 请求对象
* @param response 响应对象
* @return 流程上下文
*/
protected FlowContext buildContext(T request, R response) {
return FlowContextBuilder.builder()
.request(request)
.response(response)
.bizIdentify(getBizIdentify(request))
.processName(getProcessName(request))
.build();
}
/**
* 获取业务标识(子类可重写)
*/
protected String getBizIdentify(T request) {
return request.getBizIdentify();
}
/**
* 获取流程名称(子类必须实现)
*/
protected abstract String getProcessName(T request);
/**
* 参数校验(子类可重写)
*/
protected void validateRequest(T request) {
if (request == null) {
throw new IllegalArgumentException("Request cannot be null");
}
}
/**
* 设置流程引擎
*/
public void setFlowEngine(FlowEngine flowEngine) {
this.flowEngine = flowEngine;
}
}
6.3 FlowContextBuilder构建器
package com.liteflow.engine.util;
import com.liteflow.engine.api.FlowContext;
import java.util.Map;
/**
* 流程上下文构建器
*
* 使用Builder模式构建FlowContext
*
* @author LiteFlowEngine Team
* @version 1.0.0
*/
public class FlowContextBuilder {
private FlowContext context = new FlowContext();
public static FlowContextBuilder builder() {
return new FlowContextBuilder();
}
/**
* 设置请求对象
*/
public FlowContextBuilder request(Object request) {
context.setRequest(request);
return this;
}
/**
* 设置响应对象
*/
public FlowContextBuilder response(Object response) {
context.setResponse(response);
return this;
}
/**
* 设置业务标识
*/
public FlowContextBuilder bizIdentify(String bizIdentify) {
context.setBizIdentify(bizIdentify);
return this;
}
/**
* 设置流程名称
*/
public FlowContextBuilder processName(String processName) {
context.setProcessName(processName);
return this;
}
/**
* 设置应用名称
*/
public FlowContextBuilder appName(String appName) {
context.setAppName(appName);
return this;
}
/**
* 添加扩展属性
*/
public FlowContextBuilder put(String key, Object value) {
context.put(key, value);
return this;
}
/**
* 批量添加扩展属性
*/
public FlowContextBuilder putAll(Map<String, Object> attributes) {
if (attributes != null) {
attributes.forEach((key, value) -> context.put(key, value));
}
return this;
}
/**
* 构建FlowContext
*/
public FlowContext build() {
return context;
}
}
配置类详解
6.4 ActivityRegistry活动注册表
package com.liteflow.engine.config;
import com.liteflow.engine.api.FlowActivity;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Activity注册表
*
* 核心职责:
* 1. 管理所有Activity实例
* 2. 根据activityName获取Activity实例
* 3. 支持动态注册Activity
*
* @author LiteFlowEngine Team
* @version 1.0.0
*/
@Component
public class ActivityRegistry implements ApplicationContextAware {
/** Activity实例映射: activityName -> FlowActivity实例 */
private Map<String, FlowActivity> activityMap = new ConcurrentHashMap<>();
/** Spring应用上下文 */
private ApplicationContext applicationContext;
/**
* 获取Activity实例
*
* @param activityName Activity名称
* @return Activity实例
*/
public FlowActivity getActivity(String activityName) {
// 1. 先从本地缓存获取
FlowActivity activity = activityMap.get(activityName);
// 2. 本地缓存没有,从Spring容器获取
if (activity == null && applicationContext != null) {
try {
activity = applicationContext.getBean(activityName, FlowActivity.class);
// 缓存到本地
activityMap.put(activityName, activity);
} catch (Exception e) {
// Activity不存在
return null;
}
}
return activity;
}
/**
* 注册Activity实例
*
* @param activityName Activity名称
* @param activity Activity实例
*/
public void registerActivity(String activityName, FlowActivity activity) {
activityMap.put(activityName, activity);
}
/**
* 批量注册Activity
*
* @param activities Activity映射Map
*/
public void registerActivities(Map<String, FlowActivity> activities) {
activityMap.putAll(activities);
}
/**
* 移除Activity
*
* @param activityName Activity名称
*/
public void removeActivity(String activityName) {
activityMap.remove(activityName);
}
/**
* 清空所有Activity
*/
public void clear() {
activityMap.clear();
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
6.5 ProcessDefinition流程定义
package com.liteflow.engine.config;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
* 流程定义
*
* 核心职责:
* 1. 存储流程名称
* 2. 维护Activity有序列表
* 3. 记录Activity与BeanRef的映射关系
*
* @author LiteFlowEngine Team
* @version 1.0.0
*/
public class ProcessDefinition implements Serializable {
private static final long serialVersionUID = 1L;
/** 流程名称 */
private String name;
/** 流程描述 */
private String description;
/**
* Activity映射表
* key: activityName
* value: beanRef (Spring Bean引用ID)
*/
private Map<String, String> activityMapping = new LinkedHashMap<>();
/** Activity顺序列表 */
private List<String> activitySequence = new ArrayList<>();
public ProcessDefinition() {
}
public ProcessDefinition(String name) {
this.name = name;
}
/**
* 添加Activity定义
*
* @param activityName Activity名称
* @param beanRef Spring Bean引用ID
*/
public void addActivity(String activityName, String beanRef) {
activityMapping.put(activityName, beanRef);
// 保持添加顺序
if (!activitySequence.contains(activityName)) {
activitySequence.add(activityName);
}
}
/**
* 获取Activity列表(按定义顺序)
*
* @return Activity名称列表
*/
public List<String> getActivityNames() {
return new ArrayList<>(activitySequence);
}
/**
* 获取Activity的BeanRef
*
* @param activityName Activity名称
* @return BeanRef
*/
public String getBeanRef(String activityName) {
return activityMapping.get(activityName);
}
/**
* 获取Activity数量
*/
public int getActivityCount() {
return activitySequence.size();
}
/**
* 判断流程是否包含指定Activity
*/
public boolean containsActivity(String activityName) {
return activityMapping.containsKey(activityName);
}
// Getter/Setter方法省略...
}
6.6 ProcessConfig流程配置
package com.liteflow.engine.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import javax.annotation.PostConstruct;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 流程配置管理器
*
* 核心职责:
* 1. 加载XML配置文件
* 2. 解析流程定义
* 3. 提供流程查询接口
*
* @author LiteFlowEngine Team
* @version 1.0.0
*/
public class ProcessConfig {
private static final Logger logger = LoggerFactory.getLogger(ProcessConfig.class);
/** 配置文件路径 */
private String configLocation;
/** 流程定义映射: processName -> ProcessDefinition */
private Map<String, ProcessDefinition> processMap = new ConcurrentHashMap<>();
/**
* 初始化配置(Bean创建后自动调用)
*/
@PostConstruct
public void init() {
try {
loadConfig();
logger.info("[ProcessConfig] Load process config success, total processes: {}",
processMap.size());
} catch (Exception e) {
logger.error("[ProcessConfig] Load process config failed", e);
throw new RuntimeException("Load process config failed", e);
}
}
/**
* 加载配置文件
*/
private void loadConfig() throws Exception {
Resource resource = new ClassPathResource(configLocation);
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
// 禁用外部实体引用,防止XXE攻击
factory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
DocumentBuilder builder = factory.newDocumentBuilder();
Document document = builder.parse(resource.getInputStream());
// 解析所有flow节点
NodeList flowNodes = document.getElementsByTagName("flow");
for (int i = 0; i < flowNodes.getLength(); i++) {
Element flowElement = (Element) flowNodes.item(i);
String flowName = flowElement.getAttribute("name");
ProcessDefinition definition = new ProcessDefinition();
definition.setName(flowName);
// 解析flow下的所有activity节点
NodeList activityNodes = flowElement.getElementsByTagName("activity");
for (int j = 0; j < activityNodes.getLength(); j++) {
Element activityElement = (Element) activityNodes.item(j);
String activityName = activityElement.getAttribute("name");
String beanRef = activityElement.getAttribute("bean-ref");
definition.addActivity(activityName, beanRef);
logger.debug("[ProcessConfig] Add activity: {} -> {}", activityName, beanRef);
}
processMap.put(flowName, definition);
logger.info("[ProcessConfig] Load process: {}, activities: {}",
flowName, definition.getActivityCount());
}
}
/**
* 获取流程定义
*
* @param processName 流程名称
* @return 流程定义
*/
public ProcessDefinition getProcess(String processName) {
return processMap.get(processName);
}
/**
* 判断流程是否存在
*/
public boolean containsProcess(String processName) {
return processMap.containsKey(processName);
}
/**
* 获取所有流程名称
*/
public java.util.Set<String> getAllProcessNames() {
return processMap.keySet();
}
/**
* 动态添加流程定义
*/
public void addProcess(ProcessDefinition definition) {
processMap.put(definition.getName(), definition);
}
/**
* 移除流程定义
*/
public void removeProcess(String processName) {
processMap.remove(processName);
}
// Getter/Setter方法省略...
}
模板方法详解
6.7 AbstractFlowEngineHandler模板方法设计
LiteFlowEngine提供了两种模板方法,用于统一处理流程入口:
模板方法架构图

6.7.1 通用业务模板 CommonBizHandler
package com.liteflow.engine.handler;
import com.liteflow.engine.api.FlowContext;
import com.liteflow.engine.api.FlowEngine;
import com.liteflow.engine.api.BaseRequest;
import com.liteflow.engine.api.BaseResponse;
import com.liteflow.engine.util.FlowContextBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
/**
* 通用业务处理模板
*
* 核心职责:
* 1. 参数校验
* 2. 构建上下文
* 3. 执行流程
* 4. 返回结果
*
* 适用场景: 普通业务流程,无需幂等控制
*
* @param <T> 请求类型
* @param <R> 响应类型
* @author LiteFlowEngine Team
* @version 1.0.0
*/
public abstract class CommonBizHandler<T extends BaseRequest, R extends BaseResponse> {
protected final Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
protected FlowEngine flowEngine;
/**
* 处理入口(模板方法)
*
* 执行流程:
* 1. 参数校验
* 2. 构建响应对象
* 3. 构建上下文
* 4. 执行流程
* 5. 返回结果
*
* @param request 请求对象
* @return 响应对象
*/
public R process(T request) {
long startTime = System.currentTimeMillis();
String bizIdentify = "";
try {
// 1. 参数校验
validateRequest(request);
bizIdentify = getBizIdentify(request);
logger.info("[CommonBizHandler] Start process, processName: {}, bizIdentify: {}",
getProcessName(request), bizIdentify);
// 2. 构建响应对象
R response = createResponse();
// 3. 构建上下文
FlowContext context = buildContext(request, response);
// 4. 执行流程
flowEngine.execute(context);
// 5. 返回结果
return response;
} catch (Exception e) {
logger.error("[CommonBizHandler] Process failed, bizIdentify: {}", bizIdentify, e);
return handleException(request, e);
} finally {
long costTime = System.currentTimeMillis() - startTime;
logger.info("[CommonBizHandler] Process finished, bizIdentify: {}, cost: {}ms",
bizIdentify, costTime);
}
}
/**
* 参数校验(子类可重写)
*/
protected void validateRequest(T request) {
if (request == null) {
throw new IllegalArgumentException("Request cannot be null");
}
}
/**
* 构建上下文
*/
protected FlowContext buildContext(T request, R response) {
return FlowContextBuilder.builder()
.request(request)
.response(response)
.bizIdentify(getBizIdentify(request))
.processName(getProcessName(request))
.appName(getAppName())
.build();
}
/**
* 获取业务标识(子类可重写)
*/
protected String getBizIdentify(T request) {
return request.getBizIdentify();
}
/**
* 获取流程名称(子类必须实现)
*/
protected abstract String getProcessName(T request);
/**
* 创建响应对象(子类必须实现)
*/
protected abstract R createResponse();
/**
* 获取应用名称(子类可重写)
*/
protected String getAppName() {
return "LiteFlowEngine";
}
/**
* 异常处理(子类可重写)
*/
protected R handleException(T request, Exception e) {
R response = createResponse();
response.setSuccess(false);
response.setErrorCode("SYSTEM_ERROR");
response.setErrorMsg(e.getMessage());
return response;
}
public void setFlowEngine(FlowEngine flowEngine) {
this.flowEngine = flowEngine;
}
}
6.7.2 分布式锁模板 IdempotentHandler
package com.liteflow.engine.handler;
import com.liteflow.engine.api.FlowContext;
import com.liteflow.engine.api.FlowEngine;
import com.liteflow.engine.api.BaseRequest;
import com.liteflow.engine.api.BaseResponse;
import com.liteflow.engine.lock.DistributedLockService;
import com.liteflow.engine.util.FlowContextBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
/**
* 分布式锁幂等模板
*
* 核心职责:
* 1. 分布式锁控制
* 2. 幂等性校验
* 3. 构建上下文
* 4. 执行流程
*
* 适用场景: 需要幂等控制的业务流程,如订单创建、支付等
*
* @param <T> 请求类型
* @param <R> 响应类型
* @author LiteFlowEngine Team
* @version 1.0.0
*/
public abstract class IdempotentHandler<T extends BaseRequest, R extends BaseResponse> {
protected final Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
protected FlowEngine flowEngine;
@Autowired
protected DistributedLockService lockService;
/** 锁超时时间(秒) */
protected static final long DEFAULT_LOCK_TIMEOUT = 30;
/**
* 处理入口(模板方法 - 带分布式锁)
*
* 执行流程:
* 1. 参数校验
* 2. 获取分布式锁
* 3. 幂等校验
* 4. 构建上下文
* 5. 执行流程
* 6. 释放锁
* 7. 返回结果
*
* @param request 请求对象
* @return 响应对象
*/
public R process(T request) {
long startTime = System.currentTimeMillis();
String bizIdentify = "";
String lockKey = "";
try {
// 1. 参数校验
validateRequest(request);
bizIdentify = getBizIdentify(request);
lockKey = buildLockKey(request);
logger.info("[IdempotentHandler] Start process, processName: {}, bizIdentify: {}, lockKey: {}",
getProcessName(request), bizIdentify, lockKey);
// 2. 尝试获取分布式锁
boolean locked = lockService.tryLock(lockKey, getLockTimeout());
if (!locked) {
logger.warn("[IdempotentHandler] Get lock failed, lockKey: {}", lockKey);
return handleLockFailed(request);
}
logger.info("[IdempotentHandler] Get lock success, lockKey: {}", lockKey);
try {
// 3. 幂等校验
R cachedResponse = checkIdempotent(request);
if (cachedResponse != null) {
logger.info("[IdempotentHandler] Idempotent hit, bizIdentify: {}", bizIdentify);
return cachedResponse;
}
// 4. 构建响应对象
R response = createResponse();
// 5. 构建上下文
FlowContext context = buildContext(request, response);
// 6. 执行流程
flowEngine.execute(context);
// 7. 缓存结果(可选)
cacheResponse(request, response);
return response;
} finally {
// 释放锁
lockService.unlock(lockKey);
logger.info("[IdempotentHandler] Release lock, lockKey: {}", lockKey);
}
} catch (Exception e) {
logger.error("[IdempotentHandler] Process failed, bizIdentify: {}", bizIdentify, e);
return handleException(request, e);
} finally {
long costTime = System.currentTimeMillis() - startTime;
logger.info("[IdempotentHandler] Process finished, bizIdentify: {}, cost: {}ms",
bizIdentify, costTime);
}
}
/**
* 构建锁Key(子类必须实现)
*
* 建议格式: 业务类型:锁资源:业务ID
* 示例: ORDER:CREATE:123456
*/
protected abstract String buildLockKey(T request);
/**
* 幂等校验(子类可重写)
*
* 返回null表示首次请求,继续执行流程
* 返回非null表示重复请求,直接返回缓存结果
*/
protected R checkIdempotent(T request) {
// 默认不进行幂等校验
return null;
}
/**
* 缓存响应结果(子类可重写)
*/
protected void cacheResponse(T request, R response) {
// 默认不缓存
}
/**
* 获取锁超时时间(子类可重写)
*/
protected long getLockTimeout() {
return DEFAULT_LOCK_TIMEOUT;
}
/**
* 锁获取失败处理(子类可重写)
*/
protected R handleLockFailed(T request) {
R response = createResponse();
response.setSuccess(false);
response.setErrorCode("LOCK_FAILED");
response.setErrorMsg("系统繁忙,请稍后重试");
return response;
}
/**
* 参数校验(子类可重写)
*/
protected void validateRequest(T request) {
if (request == null) {
throw new IllegalArgumentException("Request cannot be null");
}
}
/**
* 构建上下文
*/
protected FlowContext buildContext(T request, R response) {
return FlowContextBuilder.builder()
.request(request)
.response(response)
.bizIdentify(getBizIdentify(request))
.processName(getProcessName(request))
.appName(getAppName())
.build();
}
/**
* 获取业务标识(子类可重写)
*/
protected String getBizIdentify(T request) {
return request.getBizIdentify();
}
/**
* 获取流程名称(子类必须实现)
*/
protected abstract String getProcessName(T request);
/**
* 创建响应对象(子类必须实现)
*/
protected abstract R createResponse();
/**
* 获取应用名称(子类可重写)
*/
protected String getAppName() {
return "LiteFlowEngine";
}
/**
* 异常处理(子类可重写)
*/
protected R handleException(T request, Exception e) {
R response = createResponse();
response.setSuccess(false);
response.setErrorCode("SYSTEM_ERROR");
response.setErrorMsg(e.getMessage());
return response;
}
public void setFlowEngine(FlowEngine flowEngine) {
this.flowEngine = flowEngine;
}
public void setLockService(DistributedLockService lockService) {
this.lockService = lockService;
}
}
6.7.3 分布式锁服务接口
package com.liteflow.engine.lock;
/**
* 分布式锁服务接口
*
* 支持多种实现:
* 1. Redis实现
* 2. Zookeeper实现
* 3. 数据库实现
*
* @author LiteFlowEngine Team
* @version 1.0.0
*/
public interface DistributedLockService {
/**
* 尝试获取锁
*
* @param lockKey 锁Key
* @param timeout 超时时间(秒)
* @return true表示获取成功,false表示获取失败
*/
boolean tryLock(String lockKey, long timeout);
/**
* 释放锁
*
* @param lockKey 锁Key
*/
void unlock(String lockKey);
/**
* 判断锁是否被占用
*
* @param lockKey 锁Key
* @return true表示被占用,false表示未被占用
*/
boolean isLocked(String lockKey);
}
package com.liteflow.engine.lock.impl;
import com.liteflow.engine.lock.DistributedLockService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* Redis分布式锁实现
*
* 使用Redis SET NX EX命令实现
*
* @author LiteFlowEngine Team
* @version 1.0.0
*/
@Component
public class RedisDistributedLockServiceImpl implements DistributedLockService {
private static final Logger logger = LoggerFactory.getLogger(RedisDistributedLockServiceImpl.class);
private StringRedisTemplate redisTemplate;
public RedisDistributedLockServiceImpl(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
@Override
public boolean tryLock(String lockKey, long timeout) {
try {
Boolean result = redisTemplate.opsForValue()
.setIfAbsent(lockKey, "LOCKED", timeout, TimeUnit.SECONDS);
return Boolean.TRUE.equals(result);
} catch (Exception e) {
logger.error("[RedisLock] Try lock failed, lockKey: {}", lockKey, e);
return false;
}
}
@Override
public void unlock(String lockKey) {
try {
redisTemplate.delete(lockKey);
} catch (Exception e) {
logger.error("[RedisLock] Unlock failed, lockKey: {}", lockKey, e);
}
}
@Override
public boolean isLocked(String lockKey) {
return Boolean.TRUE.equals(redisTemplate.hasKey(lockKey));
}
}
6.7.4 模板方法使用示例
示例1: 使用通用业务模板
/**
* 订单处理Handler(通用业务模板)
*/
@Component
public class OrderProcessHandler extends CommonBizHandler<OrderProcessRequest, OrderProcessResponse> {
/**
* 指定流程名称
*/
@Override
protected String getProcessName(OrderProcessRequest request) {
return "ORDER_PROCESS";
}
/**
* 创建响应对象
*/
@Override
protected OrderProcessResponse createResponse() {
return new OrderProcessResponse();
}
/**
* 可选: 重写业务标识获取逻辑
*/
@Override
protected String getBizIdentify(OrderProcessRequest request) {
return request.getOrderId();
}
}
示例2: 使用分布式锁模板
/**
* 用户注册Handler(分布式锁幂等模板)
*/
@Component
public class UserRegisterHandler extends IdempotentHandler<UserRegisterRequest, UserRegisterResponse> {
@Autowired
private UserRegisterCacheService cacheService;
/**
* 构建锁Key
*/
@Override
protected String buildLockKey(UserRegisterRequest request) {
return "USER:REGISTER:" + request.getUsername();
}
/**
* 幂等校验: 检查用户名是否已注册
*/
@Override
protected UserRegisterResponse checkIdempotent(UserRegisterRequest request) {
String cachedResult = cacheService.getCachedResult(request.getUsername());
if (cachedResult != null) {
// 命中缓存,返回之前的结果
UserRegisterResponse response = new UserRegisterResponse();
response.setSuccess(true);
response.setUserId(cachedResult);
return response;
}
return null;
}
/**
* 缓存注册结果
*/
@Override
protected void cacheResponse(UserRegisterRequest request, UserRegisterResponse response) {
if (response.isSuccess()) {
cacheService.cacheResult(request.getUsername(), response.getUserId());
}
}
/**
* 指定流程名称
*/
@Override
protected String getProcessName(UserRegisterRequest request) {
return "USER_REGISTER_PROCESS";
}
/**
* 创建响应对象
*/
@Override
protected UserRegisterResponse createResponse() {
return new UserRegisterResponse();
}
/**
* 锁超时时间: 60秒
*/
@Override
protected long getLockTimeout() {
return 60;
}
}
6.7.5 模板方法对比
| 维度 | CommonBizHandler | IdempotentHandler |
|---|---|---|
| 锁机制 | 无分布式锁 | 有分布式锁 |
| 幂等性 | 不保证幂等 | 保证幂等 |
| 性能 | 高性能,无锁开销 | 略低,有锁开销 |
| 并发安全 | 可能重复执行 | 不会重复执行 |
| 适用场景 | 查询类、非关键业务 | 创建订单、支付、注册等关键业务 |
| 使用示例 | 订单查询、数据导出 | 订单创建、用户注册、支付处理 |
6.7.6 模板方法选择建议

配置说明
7.1 流程配置文件
<?xml version="1.0" encoding="UTF-8"?>
<flows>
<!-- 用户注册流程 -->
<flow name="USER_REGISTER_PROCESS">
<activity name="checkActivity" bean-ref="userRegisterCheckActivity"/>
<activity name="convertActivity" bean-ref="userRegisterConvertActivity"/>
<activity name="persistActivity" bean-ref="userRegisterPersistActivity"/>
<activity name="notifyActivity" bean-ref="userRegisterNotifyActivity"/>
</flow>
<!-- 订单处理流程 -->
<flow name="ORDER_PROCESS">
<activity name="validateActivity" bean-ref="orderValidateActivity"/>
<activity name="inventoryActivity" bean-ref="inventoryCheckActivity"/>
<activity name="paymentActivity" bean-ref="paymentProcessActivity"/>
<activity name="fulfillmentActivity" bean-ref="orderFulfillmentActivity"/>
</flow>
</flows>
7.2 Spring配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- 流程引擎核心Bean -->
<bean id="flowEngine" class="com.liteflow.engine.core.DefaultFlowEngineImpl"/>
<!-- Activity注册表 -->
<bean id="activityRegistry" class="com.liteflow.engine.config.ActivityRegistry"/>
<!-- 流程配置加载器 -->
<bean id="processConfig" class="com.liteflow.engine.config.ProcessConfig">
<property name="configLocation" value="classpath:flow-engine-process.xml"/>
</bean>
<!-- Activity Bean定义 -->
<bean id="userRegisterCheckActivity"
class="com.example.activity.UserRegisterCheckActivity"/>
<bean id="userRegisterConvertActivity"
class="com.example.activity.UserRegisterConvertActivity"/>
<bean id="userRegisterPersistActivity"
class="com.example.activity.UserRegisterPersistActivity"/>
<bean id="userRegisterNotifyActivity"
class="com.example.activity.UserRegisterNotifyActivity"/>
</beans>
使用示例
8.1 订单处理流程示例
步骤1: 定义请求/响应模型
/**
* 订单处理请求
*/
public class OrderProcessRequest extends BaseRequest {
private String orderId;
private String userId;
private List<OrderItem> items;
private BigDecimal totalAmount;
// Getter/Setter省略...
}
/**
* 订单处理响应
*/
public class OrderProcessResponse extends BaseResponse {
private String orderId;
private String orderStatus;
private String paymentId;
// Getter/Setter省略...
}
步骤2: 实现Activity节点
/**
* 订单参数校验Activity
*/
public class OrderValidateActivity extends AbstractActivity {
@Override
public ActivityResult execute(FlowContext context) {
OrderProcessRequest request = context.getRequest();
// 校验订单ID
if (StringUtils.isBlank(request.getOrderId())) {
return getFailResult("INVALID_PARAM", "订单ID不能为空");
}
// 校验订单项
if (CollectionUtils.isEmpty(request.getItems())) {
return getFailResult("INVALID_PARAM", "订单项不能为空");
}
// 校验金额
if (request.getTotalAmount() == null
|| request.getTotalAmount().compareTo(BigDecimal.ZERO) <= 0) {
return getFailResult("INVALID_PARAM", "订单金额必须大于0");
}
logger.info("Order validation passed, orderId: {}", request.getOrderId());
return getSuccessResult();
}
}
/**
* 库存检查Activity
*/
public class InventoryCheckActivity extends AbstractActivity {
@Autowired
private InventoryService inventoryService;
@Override
public ActivityResult execute(FlowContext context) {
OrderProcessRequest request = context.getRequest();
// 检查库存
for (OrderItem item : request.getItems()) {
int availableStock = inventoryService.getAvailableStock(item.getProductId());
if (availableStock < item.getQuantity()) {
return getFailResult("INSUFFICIENT_STOCK",
"商品库存不足, productId: " + item.getProductId());
}
}
// 预占库存
for (OrderItem item : request.getItems()) {
inventoryService.reserveStock(item.getProductId(), item.getQuantity());
}
logger.info("Inventory check passed, orderId: {}", request.getOrderId());
return getSuccessResult();
}
}
/**
* 支付处理Activity
*/
public class PaymentProcessActivity extends AbstractActivity {
@Autowired
private PaymentService paymentService;
@Override
public ActivityResult execute(FlowContext context) {
OrderProcessRequest request = context.getRequest();
OrderProcessResponse response = context.getResponse();
// 创建支付单
PaymentRequest paymentRequest = new PaymentRequest();
paymentRequest.setOrderId(request.getOrderId());
paymentRequest.setUserId(request.getUserId());
paymentRequest.setAmount(request.getTotalAmount());
PaymentResult paymentResult = paymentService.createPayment(paymentRequest);
if (!paymentResult.isSuccess()) {
return getFailResult("PAYMENT_FAILED", paymentResult.getErrorMsg());
}
// 保存支付ID到上下文
context.put("paymentId", paymentResult.getPaymentId());
logger.info("Payment created, orderId: {}, paymentId: {}",
request.getOrderId(), paymentResult.getPaymentId());
return getSuccessResult();
}
}
/**
* 订单履约Activity
*/
public class OrderFulfillmentActivity extends AbstractActivity {
@Autowired
private OrderService orderService;
@Autowired
private InventoryService inventoryService;
@Override
public ActivityResult execute(FlowContext context) {
OrderProcessRequest request = context.getRequest();
OrderProcessResponse response = context.getResponse();
// 确认库存扣减
for (OrderItem item : request.getItems()) {
inventoryService.confirmDeduction(item.getProductId(), item.getQuantity());
}
// 更新订单状态
orderService.updateOrderStatus(request.getOrderId(), "PAID");
// 设置响应
response.setOrderId(request.getOrderId());
response.setOrderStatus("PAID");
response.setPaymentId(context.get("paymentId"));
logger.info("Order fulfillment completed, orderId: {}", request.getOrderId());
return getSuccessResult();
}
}
步骤3: 实现Handler
/**
* 订单处理Handler
*/
@Component
public class OrderProcessHandler
extends AbstractFlowEngineHandler<OrderProcessRequest, OrderProcessResponse> {
@Autowired
private FlowEngine flowEngine;
@Override
protected OrderProcessResponse doProcess(OrderProcessRequest request) {
// 构建上下文
OrderProcessResponse response = new OrderProcessResponse();
FlowContext context = buildContext(request, response);
// 执行流程
flowEngine.execute(context);
return response;
}
@Override
protected String getProcessName(OrderProcessRequest request) {
return "ORDER_PROCESS";
}
@Override
protected void setFlowEngine(FlowEngine flowEngine) {
this.flowEngine = flowEngine;
}
}
步骤4: 配置流程
<flow name="ORDER_PROCESS">
<activity name="validateActivity" bean-ref="orderValidateActivity"/>
<activity name="inventoryActivity" bean-ref="inventoryCheckActivity"/>
<activity name="paymentActivity" bean-ref="paymentProcessActivity"/>
<activity name="fulfillmentActivity" bean-ref="orderFulfillmentActivity"/>
</flow>
最佳实践
9.1 Activity设计原则
原则1: 单一职责
// ❌ 错误示例: 一个Activity做太多事情
public class BadActivity extends AbstractActivity {
@Override
public ActivityResult execute(FlowContext context) {
// 校验参数
// 查询数据库
// 调用外部服务
// 发送消息
// 更新状态
// ... 一个Activity干了5件事
}
}
// ✅ 正确示例: 拆分为多个Activity
public class ValidateActivity extends AbstractActivity {
@Override
public ActivityResult execute(FlowContext context) {
// 只负责参数校验
}
}
public class QueryDataActivity extends AbstractActivity {
@Override
public ActivityResult execute(FlowContext context) {
// 只负责查询数据
}
}
public class CallServiceActivity extends AbstractActivity {
@Override
public ActivityResult execute(FlowContext context) {
// 只负责调用外部服务
}
}
原则2: 幂等性保证
/**
* 幂等Activity示例
*/
public class IdempotentActivity extends AbstractActivity {
@Autowired
private IdempotentService idempotentService;
@Override
public ActivityResult execute(FlowContext context) {
String bizIdentify = context.getBizIdentify();
// 幂等校验
if (idempotentService.isProcessed(bizIdentify)) {
logger.info("Request already processed, bizIdentify: {}", bizIdentify);
return getSuccessResult();
}
try {
// 业务处理
doBusinessLogic(context);
// 标记为已处理
idempotentService.markProcessed(bizIdentify);
return getSuccessResult();
} catch (Exception e) {
logger.error("Business logic failed", e);
return getFailResult("BIZ_ERROR", e.getMessage());
}
}
}
9.2 异常处理策略
/**
* 带异常处理的Activity
*/
public class SafeActivity extends AbstractActivity {
@Override
public ActivityResult execute(FlowContext context) {
try {
// 业务逻辑
return doExecute(context);
} catch (BusinessException e) {
// 业务异常,记录日志并返回失败
logger.error("Business error: {}", e.getMessage(), e);
return getFailResult(e.getErrorCode(), e.getErrorMsg());
} catch (Exception e) {
// 系统异常,记录详细日志
logger.error("System error", e);
return getFailResult("SYSTEM_ERROR", "系统异常,请稍后重试");
}
}
protected abstract ActivityResult doExecute(FlowContext context);
}
9.3 上下文数据传递
/**
* 发送数据到上下文的Activity
*/
public class DataProducerActivity extends AbstractActivity {
@Override
public ActivityResult execute(FlowContext context) {
UserEntity userEntity = queryUserData();
// 存入上下文,供后续Activity使用
context.put("userEntity", userEntity);
context.put("userId", userEntity.getUserId());
return getSuccessResult();
}
}
/**
* 从上下文获取数据的Activity
*/
public class DataConsumerActivity extends AbstractActivity {
@Override
public ActivityResult execute(FlowContext context) {
// 从上下文获取数据
UserEntity userEntity = getRequiredContextParam(context, "userEntity");
String userId = getContextParam(context, "userId", "");
// 使用数据
processUser(userEntity);
return getSuccessResult();
}
}
性能优化
10.1 异步执行
/**
* 支持异步执行的Activity
*/
public class AsyncActivity extends AbstractActivity {
@Autowired
private AsyncTaskExecutor asyncTaskExecutor;
@Override
public boolean isAsync() {
return true;
}
@Override
public ActivityResult execute(FlowContext context) {
// 提交异步任务
asyncTaskExecutor.submit(() -> {
// 异步业务逻辑
doAsyncBusiness(context);
});
// 立即返回成功
return getSuccessResult();
}
}
10.2 批量处理
/**
* 批量处理Activity
*/
public class BatchProcessActivity extends AbstractActivity {
@Override
public ActivityResult execute(FlowContext context) {
List<OrderItem> items = context.getRequest().getItems();
// 批量查询,减少数据库访问次数
Map<String, Product> productMap = batchQueryProducts(items);
// 批量处理
List<CompletableFuture<Void>> futures = items.stream()
.map(item -> CompletableFuture.runAsync(() -> {
processItem(item, productMap.get(item.getProductId()));
}))
.collect(Collectors.toList());
// 等待所有任务完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
return getSuccessResult();
}
}
扩展机制
11.1 自定义拦截器
/**
* Activity拦截器
*/
public interface ActivityInterceptor {
/**
* 前置拦截
*/
default boolean beforeExecute(FlowContext context, FlowActivity activity) {
return true;
}
/**
* 后置拦截
*/
default void afterExecute(FlowContext context, FlowActivity activity, ActivityResult result) {
}
/**
* 异常拦截
*/
default void onException(FlowContext context, FlowActivity activity, Exception e) {
}
}
11.2 自定义流程路由
/**
* 条件路由Activity
*/
public class ConditionalActivity extends AbstractActivity {
@Override
public ActivityResult execute(FlowContext context) {
String condition = context.get("routeCondition");
if ("A".equals(condition)) {
// 路由到Activity A
context.put("nextActivity", "activityA");
} else if ("B".equals(condition)) {
// 路由到Activity B
context.put("nextActivity", "activityB");
}
return getSuccessResult();
}
}
FAQ
Q1: LiteFlowEngine是已有开源框架吗?
A: 不是。LiteFlowEngine是本次自研的轻量级流程引擎框架,汲取了Activiti、Camunda、LiteFlow等框架的设计精华,结合中小型项目的实际需求,提供了一套开箱即用的流程编排解决方案。
核心特点:
- ✅ 完全脱敏,无内源信息
- ✅ 接口重新命名,避免冲突
- ✅ 提供完整实现代码
- ✅ 生产环境验证
Q2: 为什么选择LiteFlowEngine而不是Activiti?
A: 根据项目实际需求选择:
选择LiteFlowEngine的场景:
- 中小型项目,流程相对简单
- 需要快速落地,学习成本有限
- 不需要复杂的流程监控和建模工具
- 传统编程团队,熟悉面向对象开发
选择Activiti的场景:
- 企业级项目,流程复杂
- 需要可视化流程监控中心
- 流程经常变更,需要业务人员参与建模
- 团队有BPMN规范经验
Q3: LiteFlowEngine与LiteFlow(国产开源)的区别?
A: 两者都是轻量级流程引擎,但设计理念不同:
| 对比项 | LiteFlow(开源) | LiteFlowEngine(自研) |
|---|---|---|
| 设计理念 | EL表达式规则引擎 | 模板方法模式 |
| 学习成本 | 需要学习EL规则语法 | 传统OOP,零学习成本 |
| 使用方式 | 编写规则表达式 | 继承Handler模板类 |
| 开箱即用 | 需要学习配置规则 | 直接使用模板 |
| 分布式锁 | 无内置支持 | 内置分布式锁模板 |
推荐:
- 如果团队熟悉规则引擎,选择LiteFlow
- 如果团队习惯传统OOP开发,选择LiteFlowEngine
Q4: 如何保证流程幂等性?
A: LiteFlowEngine提供三层幂等保障:
@startuml
!theme plain
skinparam rectangle {
BackgroundColor #E3F2FD
BorderColor #2196F3
}
title 幂等保障三层架构
rectangle "L1: Handler层幂等" as L1 {
rectangle "分布式锁\n+ 数据库唯一索引"
}
rectangle "L2: 业务流水号幂等" as L2 {
rectangle "业务流水号\n+ 状态机校验"
}
rectangle "L3: Activity层幂等" as L3 {
rectangle "Activity内部\n业务幂等校验"
}
L1 -[#2196F3]-> L2
L2 -[#2196F3]-> L3
@enduml
具体实现:
// L1: Handler层使用IdempotentHandler模板
public class OrderCreateHandler extends IdempotentHandler<OrderRequest, OrderResponse> {
@Override
protected String buildLockKey(OrderRequest request) {
return "ORDER:CREATE:" + request.getOrderId();
}
@Override
protected OrderResponse checkIdempotent(OrderRequest request) {
// L2: 检查订单号是否已存在
Order existOrder = orderService.queryByOrderId(request.getOrderId());
if (existOrder != null) {
OrderResponse response = new OrderResponse();
response.setSuccess(true);
response.setOrderId(existOrder.getOrderId());
return response;
}
return null;
}
}
// L3: Activity层幂等
public class CreateOrderActivity extends AbstractActivity {
@Override
public ActivityResult execute(FlowContext context) {
OrderRequest request = context.getRequest();
// 再次检查订单是否存在
if (orderService.exists(request.getOrderId())) {
logger.warn("Order already exists: {}", request.getOrderId());
return getSuccessResult(); // 幂等返回
}
// 创建订单
orderService.createOrder(request);
return getSuccessResult();
}
}
Q5: 如何处理流程中断和回滚?
A:
- 流程中断: Activity返回失败结果,引擎自动中断后续Activity
- 流程回滚: 自行实现补偿Activity,参考Saga模式
Q4: 如何监控流程执行情况?
A:
- 日志埋点:关键节点记录日志
- 链路追踪:集成Tracer组件
- 监控大盘:上报监控指标
Q5: 支持并发执行吗?
A:
支持。两种方式:
- 异步Activity:实现
isAsync()方法 - 并发编排:在Activity内部使用CompletableFuture
附录A: 完整Maven依赖
<dependencies>
<!-- LiteFlowEngine核心 -->
<dependency>
<groupId>com.liteflow</groupId>
<artifactId>lite-flow-engine</artifactId>
<version>1.0.0</version>
</dependency>
<!-- Spring集成(可选) -->
<dependency>
<groupId>com.liteflow</groupId>
<artifactId>lite-flow-engine-spring</artifactId>
<version>1.0.0</version>
</dependency>
<!-- 日志(必需) -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
<!-- 工具类(可选) -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
</dependency>
</dependencies>
附录B: 目录结构
lite-flow-engine
├── src/main/java/com/liteflow/engine
│ ├── api/ # 核心接口定义
│ │ ├── FlowActivity.java
│ │ ├── FlowEngine.java
│ │ ├── FlowContext.java
│ │ └── ActivityResult.java
│ ├── core/ # 核心实现
│ │ ├── AbstractActivity.java
│ │ └── DefaultFlowEngineImpl.java
│ ├── config/ # 配置解析
│ │ ├── ActivityRegistry.java
│ │ ├── ProcessConfig.java
│ │ └── ProcessDefinition.java
│ ├── handler/ # Handler基类
│ │ └── AbstractFlowEngineHandler.java
│ └── util/ # 工具类
│ └── FlowContextBuilder.java
├── src/main/resources
│ └── flow-engine-process.xml # 流程配置文件
├── docs/ # 文档
│ ├── README.md
│ └── 快速开始.md
└── pom.xml # Maven依赖
文档版本: v1.0.0
最后更新: 2026-03-22
开源协议: Apache License 2.0
GitHub: https://github.com/opensource/lite-flow-engine
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)