ThreadPoolTaskScheduler实现动态管理定时任务
最近,有个项目有需要用到定时任务,所以做了一个动态管理定时任务的模块。本文将从项目背景、需求、选型、思路、具体实现等方面展开介绍。
背景:有个支付类的项目,中间会产生一些中间态的订单,需要有个定时任务轮询确认订单状态。该类项目体量较小,单节点部署,客户比较多,需要简单快速的部署、维护。
需求:定时任务能够通过表达式灵活指定执行计划,并支持动态启动、关闭、修改。定时任务模块最好和业务包在一个jar包内,部署简单。
选型:说到定时任务,当下最火的当属xxl-job,本案为什么不采用xxl-job呢?不是因为它不够强大,是因为需要单独部署组件,并且需要建一系列相关的表,运维小哥哥不会弄或者嫌麻烦。基于上述原因,考虑采用Springboot自己的定时任务,常见的有两种实现方式,一种基于注解,如:
@Configuration //1.主要用于标记配置类,兼备Component的效果。
@EnableScheduling // 2.开启定时任务
public class SaticScheduleTask {
@Scheduled(cron = "0/5 * * * * ?")
private void configureTasks() {
System.out.println("开始执行静态定时任务时间");
}
}
另一种基于接口,主要代码如:
@Configuration //1.主要用于标记配置类,兼备Component的效果。
@EnableScheduling // 2.开启定时任务
public class DynamicScheduleTask implements SchedulingConfigurer {
/**
* 执行定时任务.
*/
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.addTriggerTask(
//1.添加任务内容(Runnable)
() -> System.out.println("执行动态定时任务: " + LocalDateTime.now().toLocalTime()),
//2.设置执行周期(Trigger)
triggerContext -> {
String cron = "0/5 * * * * ?";//这个表达式可以写在配置文件里,也可以从数据库读取
//合法性校验
if (StringUtils.isEmpty(cron)) {
//这里写具体的业务代码
}
//返回执行周期(Date)
return new CronTrigger(cron).nextExecutionTime(triggerContext);
}
);
}
}
但是上述两种都不太灵活,不能动态的启动、关闭和修改。最后,选择通过ThreadPoolTaskScheduler来实现动态管理定时任务。
思路:
1、ThreadPoolTaskScheduler可以实现任务调度,支持基于cron表达式的任务,其提交任务接口如下:
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
//...
}
那么,定时任务业务实现类实现Runnable接口,用cron构造CronTrigger,就可以完成任务提交。提交任务的时候将回调对象ScheduledFuture记录下来,后面可以用它的cancel方法实现任务的关闭。
2、在数据库建一张任务表,主要有任务编号、任务名称、调度计划表达式、任务状态等字段。在项目启动后,自动查询任务表,加载任务状态为启动状态的任务。
3、写一个前端管理页面,主要用于查询展示定时任务,新增、启动、关闭和修改定时任务。通过管理页面对任务进行启动、关闭等操作时,一方面修改数据库中的状态,另一方面也要同步做启动或者关闭任务操作。
具体实现:
核心代码:
@Component
public class DynamicTimedTask implements ApplicationRunner {
@Autowired
TimedTaskService timedTaskService;
private static final Logger logger = LoggerFactory.getLogger(DynamicTimedTask.class);
/**
* @Author yrd
* @Description 项目启动后自动加载数据库里状态为启动的定时任务
* @Date 10:40 2021/12/20
* @Param [args]
* @return void
**/
@Override
public void run(ApplicationArguments args) throws Exception {
//查询数据库中定时任务信息
List<Map> timedTasks = timedTaskService.queryAllTimedTask();
for (Map timedTask : timedTasks) {
String taskId = timedTask.getString("taskid");//任务编号
String cron = timedTask.getString("cron");//表达式
String status = timedTask.getString("status");//任务状态
if ("1".equals(status)) {
startTask(taskId, cron);
}
}
}
//接受任务的返回结果
private ConcurrentHashMap<String, ScheduledFuture> futureMap = new ConcurrentHashMap<>();
@Autowired
private ThreadPoolTaskScheduler threadPoolTaskScheduler;
//实例化一个线程池任务调度类,可以使用自定义的ThreadPoolTaskScheduler
@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
ThreadPoolTaskScheduler schedulerPool = new ThreadPoolTaskScheduler();
schedulerPool.setPoolSize(3);
schedulerPool.setWaitForTasksToCompleteOnShutdown(true);
schedulerPool.setAwaitTerminationSeconds(60);
return schedulerPool;
}
/**
* 启动定时任务
* @return
*/
public boolean startTask(String taskId, String cron) {
boolean flag = false;
ScheduledFuture future = futureMap.get(taskId);
if (future != null) {
if (future.isCancelled()) {
logger.info("任务【{}】已存在但是关闭状态!!!", taskId);
} else {
logger.info("任务【{}】已存在且已启动!!!", taskId);
return true;
}
}
if (SpringBeanUtils.containsBean(taskId)) {
future = threadPoolTaskScheduler.schedule((Runnable) SpringBeanUtils.getBean(taskId), new CronTrigger(cron));
if (future != null){
flag = true;
futureMap.put(taskId, future);
logger.info("任务【{}】启动成功!!!", taskId);
}else {
logger.info("任务【{}】启动失败!!!", taskId);
}
} else {
logger.info("任务【{}】未实现!!!", taskId);
}
return flag;
}
/**
* 停止定时任务
* @return
*/
public boolean stopTask(String taskId) {
boolean flag = false;
ScheduledFuture future = futureMap.get(taskId);
if (future == null) {
logger.info("任务【{}】不在任务队列中!!!", taskId);
flag = true;
} else {
if (future.isCancelled()) {
logger.info("任务【{}】已经是关闭状态!!!", taskId);
flag = true;
} else {
boolean cancel = future.cancel(true);
if (cancel){
flag = true;
logger.info("任务【{}】关闭成功!!!", taskId);
}else {
logger.info("任务【{}】关闭失败!!!", taskId);
}
}
}
return flag;
}
}
说明:上述代码用到了通过bean的ID从应用上下文中获取bean实例,具体实现方法网上有很多介绍,本文不再详述。因此,具体任务的业务实现类bean的ID需要与任务ID一致,上述代码中startTask方法也对次作了判断。如图:
任务管理页面对应的controller类:
@Controller
@RequestMapping("/timedTask")
public class TimedTaskManageController {
@Autowired
TimedTaskService timedTaskService;
/**
* @Author yrd
* @Description 进入定时任务管理页面
* @Date 10:37 2021/12/20
* @Param []
* @return java.lang.String
**/
@RequestMapping("/timedTaskManagePage")
public String timedTaskManagePage() throws AppException {
return "/timedTask/timedTaskManage";
}
/**
* @Author yrd
* @Description 查询定时任务
* @Date 10:38 2021/12/20
* @Param [para]
* @return java.util.List
**/
@RequestMapping("/queryTimedTask")
@ResponseBody
public List queryTimedTask(@RequestBody DataObject para) throws Exception {
return timedTaskService.queryTimedTask(para);
}
/**
* @Author yrd
* @Description 进入新增定时任务页面
* @Date 10:38 2021/12/20
* @Param []
* @return java.lang.String
**/
@RequestMapping("/addTimedTaskPage")
public String addTimedTaskPage() throws Exception {
return "/timedTask/addTimedTask";
}
/**
* @Author yrd
* @Description 根据任务编号检查定时任务
* @Date 10:38 2021/12/20
* @Param [taskId]
* @return java.lang.String
**/
@RequestMapping("/checkTimedTask")
@ResponseBody
public String checkTimedTask(@RequestParam("taskId") String taskId) throws Exception {
Map map = timedTaskService.queryTimedTaskById(taskId);
JSONObject object = new JSONObject();
if (map.isEmpty()) {
if (SpringBeanUtils.containsBean(taskId)) {
object.put("success", "true");
object.put("msg", "");
} else {
object.put("success", "false");
object.put("msg", "该任务编号对应的任务未实现!");
}
} else {
object.put("success", "false");
object.put("msg", "任务编号重复!");
}
return object.toString();
}
/**
* @Author yrd
* @Description 新增定时任务
* @Date 10:39 2021/12/20
* @Param [request]
* @return java.lang.String
**/
@RequestMapping("/addTimedTask")
@ResponseBody
public String addTimedTask(HttpServletRequest request) throws Exception {
Map param = new HashMap();
param.put("taskId", request.getParameter("taskId").toString());
param.put("taskName", request.getParameter("taskName").toString());
param.put("cron", request.getParameter("cron").toString());
return timedTaskService.addTimedTask(param);
}
/**
* @Author yrd
* @Description 进入修改定时任务页面
* @Date 10:39 2021/12/20
* @Param [taskId, model]
* @return java.lang.String
**/
@RequestMapping("/editTimedTaskPage")
public String editTimedTaskPage(@RequestParam("taskId") String taskId , Model model) throws Exception {
Map map = timedTaskService.queryTimedTaskById(taskId);
model.addAttribute("param", map);
return "/timedTask/editTimedTask";
}
/**
* @Author yrd
* @Description 启动定时任务
* @Date 10:39 2021/12/20
* @Param [taskId]
* @return java.lang.String
**/
@RequestMapping("/startTask")
@ResponseBody
public String startTask(@RequestParam("taskId") String taskId) throws Exception {
return timedTaskService.startTask(taskId);
}
/**
* @Author yrd
* @Description 关闭定时任务
* @Date 10:39 2021/12/20
* @Param [taskId]
* @return java.lang.String
**/
@RequestMapping("/stopTask")
@ResponseBody
public String stopTask(@RequestParam("taskId") String taskId) throws Exception {
return timedTaskService.stopTask(taskId);
}
/**
* @Author yrd
* @Description 修改定时任务
* @Date 10:39 2021/12/20
* @Param [request]
* @return java.lang.String
**/
@RequestMapping("/editTimedTask")
@ResponseBody
public String editTimedTask(HttpServletRequest request) throws Exception {
Map param = new HashMap();
param.put("taskId", request.getParameter("taskId").toString());
param.put("taskName", request.getParameter("taskName").toString());
param.put("cron", request.getParameter("cron").toString());
return timedTaskService.editTimedTask(param);
}
}
前端、service及dao层的对应代码本文不再详述,以上功能亲测可以使用,并已运用到实际项目当中去。
更多推荐
所有评论(0)