xxl-job源码阅读——(六)调度线程与时间轮算法
本章介绍init()最后一个步骤,初始化调度线程。
另外 第六步的JobLogReportHelper.getInstance().start()只是做了一个日志整理收集,最终在页面图表展示的工作,这里不再深入。
JobScheduleHelper.getInstance().start()开启了两个线程,基础调度scheduleThread,与时间轮调度ringThread
一. 时间对齐
scheduleThread作为时间调度线程,自身的时间是如何对齐到整秒上的呢?
下图可见,在线程启动的同时,sleep了5000-System.currentTimeMillis()%1000 个毫秒。
举例: 现在是17:37:05的100ms,那么上述公式 = 4900ms,从现在开始睡眠4900ms,唤醒的时刻,便是整秒的17:37:10。
那么为什么是5s呢?再看预读时间变量,一致。预读时间变量的作用是每次读取现在开始的未来5s内的任务,用于处理执行。
shceduled线程在while循环的最后还有一次时间对齐:如果预读处理了一些数据,那么就等待到下一个整s,如果没有预读到数据说明当前无任务,直接等待下一个5s。
时间轮线程也有同样的时间对齐,只不过不是5s,而是1s,不再展开。
二. scheduleThread 调度线程
跳过时间对齐,往后看
- 计算预读数据,这里的数据是作者根据qps平均计算得到的,正常case下5s内能够处理的数量。这里的时间计算只涉及调度过程,实际trigger业务已经被快慢线程池接手,所以这里的数量预估理论上是没问题的。
- 悲观锁,这一步感觉目前没有意义,调度器本身并没有支持多节点部署。
- 根据预读数量和预读时间,取出即将要处理的任务。
- 如果当前时间已经超过了任务原定计划时间+5s的范围,则跳过,本次不再执行。
- 如果当前时间已经超过原定计划时间但是未超过5s,还能抢救一下,执行任务。
并且如果下一次执行时间再未来5s内,那么直接将任务塞给时间轮线程,让时间轮线程负责下一次执行
- 还未到执行时间,直接扔给时间轮线程。
- 更新任务信息,(下一次执行时间,任务状态等)。
由上可见,未过期的任务,在5s的时间范围内,精确的调度都被交给了时间轮线程
,下面我们就继续深入,了解一下时间轮算法的实现。
三. ringThread 时间轮(算法)线程
原理
思考一下,我们实现一个遵循cron表达式的调度功能会怎么做?
方案1
,启动一个线程,计算将要执行时间到当前时间的秒数,直接sleep这个秒数。当执行完一次任务后,再计算下次执行时间到当前时间的秒数,继续sleep。
这个方法想想也不是不行,但是缺点是,当我们需要多个cron任务时,需要开启多个线程,造成资源的浪费。
方案2
,只用一个守护线程,任务死循环扫任务数据,拿执行时间距离当前最近的任务,如果该任务时间等同于当前时间(或者在当前之间很小的一个范围内),则执行,否则不执行,等待下一个循环。
此方案似乎解决了线程数量爆炸的问题,但是又会引入一个新的问题,如果某一个任务执行时间太长,显然会阻塞其他任务,导致其他任务不能及时执行。
方案3
,在方案2
的基础上,责任拆分,一个线程为调度线程,另外有一个线程池为执行线程池,这样便可以一定程度避免长任务阻塞的问题。
但是,毫无限制的死循环查询数据,无论这个任务数据存在数据库还是其他地方,似乎都不是一个优雅的方案。那么有没有一种方式,能如同时钟一般,指针到了才执行对应时间的任务。
- 时间轮算法
顾名思义,时间轮其实很简单,就是用实际的时钟刻度槽位来存储任务,如下图,我们以小时为单位,9:00执行A任务,10:00执行B,C任务。
这里的刻度当然也可以更细致,比如把一天切分成246060个秒的刻度,秒的刻度上挂任务。
我们只需要在方案3
的基础上改造:- 声明一个变量Map<时间刻度,所属任务集合>。
- 任务增加时,只需要增加到对应的时间轮上。
- 仍然有一个线程在死循环,按照秒的刻度1秒执行一次(如何对齐时间请看第一部分),到达这一秒时从Map中取出对应任务,使用线程池进行执行。
时间轮算法也不是完美的,如果某一个刻度上的任务太多,即便任务的执行使用线程池处理,仍然可能会导致执行到下一秒还没完成。毕竟我们对任务的调度,总要对任务的状态等细节进行处理,尤其是这些状态的更新依赖数据库等外部数据源时。
源码实现
xxl-job 的时间轮算法实现与上述有所区别,通过之前的描述我们已经知道scheduleThread已经做了调度的一部分工作,包括取出任务,对过期/到期任务进行执行。
而对将来5秒内将要执行的任务,scheduleThread则是通过下图0
的pushTimRing
方法扔给了时间轮Map:
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
ringData是一个秒级的时间轮,时间轮的范围是0~59.
- 第一步,线程启动时,对齐这一秒。
- 第二步,通过当前秒获取ringData中的任务,同时为了防止之前有延时产生,也检查一下前一秒的刻度中是否还存在未处理的任务。
- 第三步,触发任务,扔到快慢线程池去处理。
- 第四步,清理临时变量。
- 第五步,对齐这一秒。
总结: xxl-job实现的是一个5s一次的定时任务调度,同时对未来5s将被执行的任务,使用一个范围为一分钟,刻度为秒的时间轮算法来执行。
欢迎关注微信公众号 【JAVA技术分享官】,公众号首发,持续输出原创高质量JAVA开发者知识点
更多推荐
所有评论(0)