Xxl-Job调度器原理解析
xxl-job
xxl-job: 是一个分布式任务调度平台,核心设计目标是开发迅速、学习简单、轻量级、易扩展。
项目地址:https://gitcode.com/gh_mirrors/xx/xxl-job
免费下载资源
·
xxl-job版本:2.3.0
Xxl-Job分为执行器、调度器。而我们平时的客户端就属于一个执行器,执行器启动的时候会自动注册到调度器上,然后调度器进行远程调度。
调度器初始化过程步骤如下
1 国际化相关
配置参数: xxl.job.i18n=zh_CN, 这里设置为中文简体
2 初始化快线程fastTriggerPool、慢线程池slowTriggerPool
配置参数:xxl.job.triggerpool.fast.max=200, 这里设置为fastTriggerPool的最大线程数=200, 不能小于200
xxl.job.triggerpool.slow.max=100, 这里设置为slowTriggerPool的最大线程数=100, 不能小于100
3 启动注册监听线程
3.1 初始化registryOrRemoveThreadPool线程池:用于
注册或者移除的线程池,客户端调用api/registry或api/registryRemove接口时,会用这个线程池进行注册或注销
3.2 启动监听注册的线程registryMonitorThread:
清除心跳超过90s的注册信息,并且刷新分组注册信息
4 启动失败任务监听线程(重试、告警)
配置参数:spring.mail.from=xxx@qq.com, 告警邮箱
5 启动监控线程
5.1 初始化callbackThreadPool线程池:用于callback
回调的线程池,客户端调用api/callback接口时会使用这个线程池
5.2 启动监控线monitorThread:调度记录停留在 "运行中" 状态
超过10min,且对应执行器心跳注册
失败不在线,则将本地调度主动标记失败
6 启动日志统计和清除线程logrThread
-- 日志记录刷新,刷新
最近三天的日志Report(即统计每天的失败、成功、运行次数等)
-- 每天清除一次
失效过期的日志数据
配置参数:xxl.job.logretentiondays=30, 清除xxl-job数据库日志的过期时间, 小于7天则不清除
7 启动任务调度(很重要!!主要靠这两个线程进行塞数据到时间轮,然后时间轮取数调度任务)
7.1 scheduleThread线程-取待执行任务数据入时间轮(塞数据)
-- 第一步:用select for update 数据库作为分布式锁加锁,避免多个xxl-job admin调度器节点同时执行
-- 第二步:预读数据,从数据库中读取当前截止到五秒后内会执行的job信息,并且读取分页大小为preReadCount=6000条数据
---- preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
-- 第三步:将当前时间与下次调度时间对比,有如下三种情况
**** 当前时间 大于 (任务的下一次触发时间 + PRE_READ_MS(5s)):可能是查询太久了,然后下面的代码刷新了任务下次执行时间,导致超过五秒,所以就需要特殊处理
-------- 1、匹配过期失效的策略:DO_NOTHING=过期啥也不干,废弃;FIRE_ONCE_NOW=过期立即触发一次
-------- 2、刷新上一次触发 和 下一次待触发时间
**** 当前时间 大于 任务的下一次触发时间 并且是没有过期的:
-------- 1、直接触发任务执行器
-------- 2、刷新上一次触发 和 下一次待触发时间
-------- 3、如果下一次触发在五秒内,直接放进时间轮里面待调度
---------------- 1、求当前任务下一次触发时间所处一分钟的第N秒
---------------- 2、将当前任务ID和ringSecond放进时间轮里面
---------------- 3、刷新上一次触发 和 下一次待触发时间
**** 当前时间 小于 下一次触发时间:
-------- 1、求当前任务下一次触发时间所处一分钟的第N秒
-------- 2、将当前任务ID和ringSecond放进时间轮里面
-------- 3、刷新上一次触发 和 下一次待触发时间
-- 第四步:更新数据库执行器信息,如trigger_last_time、trigger_next_time
-- 第五步:提交数据库事务,释放数据库select for update排它锁
7.2 ringThread线程
-根据时间轮执行job任务 (取数据执行)
首先时间轮数据格式为:Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>()
-- 第一步:获取当前所处的一分钟第几秒,然后for两次,第二次是为了重跑前面一个刻度没有被执行的的job list,避免前面的刻度遗漏了
-- 第二步:执行触发器
-- 第三步:清除当前刻度列表的数据
**** 执行的过程中还会选择对应的策略,如下:
-------- 阻塞策略:串行、废弃后面、覆盖前面
-------- 路由策略:取第一个、取最后一个、最小分发、一致性hash、快速失败、LFU最不常用、LRU最近最少使用、随机、轮询
初始化的入口代码为
XxlJobAdminConfig,代码如下:
@Component
public class
XxlJobAdminConfig
implements
InitializingBean, DisposableBean
{
private static
XxlJobAdminConfig
adminConfig
=
null;
public static
XxlJobAdminConfig
getAdminConfig
() {
return
adminConfig
;
}
// ---------------------- XxlJobScheduler ----------------------
private
XxlJobScheduler
xxlJobScheduler
;
@Override
public void
afterPropertiesSet
()
throws
Exception {
// 生命周期中的属性注入来对xxlJobScheduler初始化
adminConfig
=
this;
//
初始化
xxl-job
定时任务
xxlJobScheduler
=
new
XxlJobScheduler()
;
xxlJobScheduler
.init()
;
}
@Override
public void
destroy
()
throws
Exception {
// 生命周期中的销毁来对xxlJobScheduler销毁
xxlJobScheduler
.destroy()
;
}
..............省略..............
}
xxlJobScheduler.init()进行初始化会执行如下过程:
public class
XxlJobScheduler {
private static final
Logger
logger
= LoggerFactory.
getLogger
(XxlJobScheduler.
class
)
;
public void
init
()
throws
Exception {
// 1
国际化相关
init i18n
initI18n()
;
// 2
初始化快线程池
fastTriggerPool
、慢线程池
slowTriggerPool admin trigger pool start
JobTriggerPoolHelper.
toStart
()
;
// 3
启动注册监听线程
admin registry monitor run
JobRegistryHelper.
getInstance
().start()
;
// 4
启动失败任务监听线程
(
重试、告警
) admin fail-monitor run
JobFailMonitorHelper.
getInstance
().start()
;
// 5
启动监控线程(调度记录停留在
"
运行中
"
状态超过
10min
,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败)
admin lose-monitor run ( depend on JobTriggerPoolHelper )
JobCompleteHelper.
getInstance
().start()
;
// 6
启动日志统计和清除线程(日志记录刷新,刷新最近三天的日志
Report
(即统计每天的失败、成功、运行次数等);每天清除一次失效过期的日志数据)
admin log report start
JobLogReportHelper.
getInstance
().start()
;
// 7
启动任务调度
(scheduleThread-
取待执行任务数据入时间轮;
ringThread-
根据时间轮执行
job
任务
) start-schedule ( depend on JobTriggerPoolHelper )
JobScheduleHelper.
getInstance
().start()
;
logger
.info(
">>>>>>>>> init xxl-job admin success."
)
;
}
................省略........................
}
上面初始化的7个步骤拆分如下==============
1 国际化相关
private void
initI18n
(){
// 根据环境设置title为中文、英文等
for
(ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum.
values
()) {
item.setTitle(I18nUtil.
getString
(
"jobconf_block_"
.concat(item.name())))
;
}
}
2 初始化快线程fastTriggerPool、慢线程池slowTriggerPool
这个步骤初始化了两个线程池
fastTriggerPool和
slowTriggerPool
在触发调度的时候会有一个选择快慢线程池的过程,如果job在一分钟内超过超过10次,就用slowTriggerPool来处理,如下:
ThreadPoolExecutor
triggerPool_
=
fastTriggerPool
;
AtomicInteger jobTimeoutCount =
jobTimeoutCountMap
.get(jobId)
;
if
(jobTimeoutCount!=
null
&& jobTimeoutCount.get() >
10
) {
// job
在一分钟内超过超过
10
次,就用
slowTriggerPool
来处理
job-timeout 10 times in 1 min
triggerPool_ =
slowTriggerPool
;
}
triggerPool_.execute(
new
Runnable() {.........省略............}
3 启动注册监听线程
3.1 初始化registryOrRemoveThreadPool线程池:用于
注册或者移除的线程池,客户端调用api/registry或api/registryRemove接口时,会用这个线程池进行注册或注销
3.2 启动监听注册的线程registryMonitorThread:
清除心跳超过90s的注册信息,并且刷新分组注册信息
public void start(){
// 用于注册或者移除的线程池,客户端调用api/registry或api/registryRemove接口时,会用这个线程池进行注册或注销 for registry or remove
registryOrRemoveThreadPool = new ThreadPoolExecutor(
2,
10,
30L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(
2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
r.run();
logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
}
});
// 启动监听注册的线程 for monitor
registryMonitorThread = new Thread(new Runnable() {
@Override
public void run() {
while (!toStop) {
try {
// 获取自动注册的执行器组(执行器地址类型:0=自动注册、1=手动录入) auto registry group
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
if (groupList!=null && !groupList.isEmpty())
{// group组集合不为空
// 移除死掉的调用地址(心跳时间超过90秒,就当线程挂掉了。默认是30s做一次心跳) remove dead address (admin/executor)
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
if (ids!=null && ids.size()>0) {
// 移除挂掉的注册地址信息
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
}
// fresh online address (admin/executor)
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
// 找出所有正常没死掉的注册地址
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
if (list != null) {
for (XxlJobRegistry item: list) {
// 确保是 EXECUTOR 执行器类型
if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
String appname = item.getRegistryKey();
List<String> registryList = appAddressMap.get(appname);
if (registryList == null) {
registryList = new ArrayList<String>();
}
if (!registryList.contains(item.getRegistryValue())) {
registryList.add(item.getRegistryValue());
}
appAddressMap.put(appname, registryList);
}
}
}
// 刷新分组注册地址信息 fresh group address
for (XxlJobGroup group: groupList) {
List<String> registryList = appAddressMap.get(group.getAppname());
String addressListStr = null;
if (registryList!=null && !registryList.isEmpty()) {
Collections.sort(registryList);
StringBuilder addressListSB = new StringBuilder();
for (String item:registryList) {
addressListSB.append(item).append(",");
}
addressListStr = addressListSB.toString();
addressListStr = addressListStr.substring(0, addressListStr.length()-1);
}
group.setAddressList(addressListStr);
group.setUpdateTime(new Date());
XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
try {
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
}
});
registryMonitorThread.setDaemon(true);
registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
registryMonitorThread.start();
}
4 启动失败任务监听线程(重试、告警)
这部分逻辑比较简单,就是重试 + 告警,核心代码如下
// 获取执行失败的job信息
List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
if (failLogIds!=null && !failLogIds.isEmpty()) {
for (long failLogId: failLogIds) {
// lock log
int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
if (lockRet < 1) {
continue;
}
XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
// 1、失败塞回重试 fail retry monitor
if (log.getExecutorFailRetryCount() > 0) {
JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
String retryMsg = "<span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span>";
log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
}
// 2、进行失败告警 fail alarm monitor
int newAlarmStatus = 0;
// 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
newAlarmStatus = alarmResult?2:3;
} else {
newAlarmStatus = 1;
}
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
}
}
5 启动监控线程
5.1 初始化callbackThreadPool线程池:用于callback
回调的线程池,客户端调用api/callback接口时会使用这个线程池
5.2 启动监控线monitorThread:调度记录停留在 "运行中" 状态
超过10min,且对应执行器心跳注册
失败不在线,则将本地调度主动标记失败
逻辑较简单,如上两点
6 启动日志统计和清除线程logrThread
-- 日志记录刷新,刷新
最近三天的日志Report(即统计每天的失败、成功、运行次数等)
-- 每天清除一次
失效过期的日志数据
配置参数:xxl.job.logretentiondays=30, 清除xxl-job数据库日志的过期时间, 小于7天则不清除
逻辑较简单,如上两点
7 启动任务调度
7.1 scheduleThread-取待执行任务数据入时间轮
-- 第一步:用select for update 数据库作为分布式锁加锁,避免多个xxl-job admin调度器节点同时执行
-- 第二步:预读数据,从数据库中读取当前截止到五秒后内会执行的job信息,并且读取分页大小为preReadCount=6000条数据
---- preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
-- 第三步:将当前时间与下次调度时间对比,有如下三种情况
**** 当前时间 大于 (任务的下一次触发时间 + PRE_READ_MS(5s)):可能是查询太久了,然后下面的代码刷新了任务下次执行时间,导致超过五秒,所以就需要特殊处理
-------- 1、匹配过期失效的策略:DO_NOTHING=过期啥也不干,废弃;FIRE_ONCE_NOW=过期立即触发一次
-------- 2、刷新上一次触发 和 下一次待触发时间
**** 当前时间 大于 任务的下一次触发时间 并且是没有过期的:
-------- 1、直接触发任务执行器
-------- 2、刷新上一次触发 和 下一次待触发时间
-------- 3、如果下一次触发在五秒内,直接放进时间轮里面待调度
---------------- 1、求当前任务下一次触发时间所处一分钟的第N秒
---------------- 2、将当前任务ID和ringSecond放进时间轮里面
---------------- 3、刷新上一次触发 和 下一次待触发时间
**** 当前时间 小于 下一次触发时间:
-------- 1、求当前任务下一次触发时间所处一分钟的第N秒
-------- 2、将当前任务ID和ringSecond放进时间轮里面
-------- 3、刷新上一次触发 和 下一次待触发时间
-- 第四步:更新数据库执行器信息,如trigger_last_time、trigger_next_time
-- 第五步:提交数据库事务,释放数据库select for update排它锁
7.2 ringThread-根据时间轮执行job任务
首先时间轮数据格式为:Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>()
-- 第一步:获取当前所处的一分钟第几秒,然后for两次,第二次是为了重跑前面一个刻度没有被执行的的job list,避免前面的刻度遗漏了
-- 第二步:执行触发器
-- 第三步:清除当前刻度列表的数据
**** 执行的过程中还会选择对应的策略,如下:
-------- 阻塞策略:串行、废弃后面、覆盖前面
-------- 路由策略:取第一个、取最后一个、最小分发、一致性hash、快速失败、LFU最不常用、LRU最近最少使用、随机、轮询
启动两个线程解析的核心源码如下:
public void
start
(){
//
启动调度线程,这些线程是用来取数据的
schedule thread
scheduleThread
=
new
Thread(
new
Runnable() {
@Override
public void
run
() {
try
{
//
不知道为啥要休眠
4-5
秒 时间,然后再启动
TimeUnit.
MILLISECONDS
.sleep(
5000
- System.
currentTimeMillis
()%
1000
)
;
}
catch
(InterruptedException e) {
if
(!
scheduleThreadToStop
) {
logger
.error(e.getMessage()
,
e)
;
}
}
logger
.info(
">>>>>>>>> init xxl-job admin scheduler success."
)
;
//
这里是预读数量
pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
int
preReadCount = (XxlJobAdminConfig.
getAdminConfig
().getTriggerPoolFastMax() + XxlJobAdminConfig.
getAdminConfig
().getTriggerPoolSlowMax()) *
20
;
while
(!
scheduleThreadToStop
) {
//
扫描任务
Scan Job
long
start = System.
currentTimeMillis
()
;
Connection conn =
null;
Boolean connAutoCommit =
null;
PreparedStatement preparedStatement =
null
boolean
preReadSuc =
true;
try
{
conn = XxlJobAdminConfig.
getAdminConfig
().getDataSource().getConnection()
;
connAutoCommit = conn.getAutoCommit()
;
conn.setAutoCommit(
false
)
;
//
采用
select for update
,是排它锁。说白了
xxl-job
用一张数据库表来当分布式锁了,确保多个
xxl-job admin
节点下,依旧只能同时执行一个调度线程任务
preparedStatement = conn.prepareStatement(
"select
*
from xxl_job_lock where lock_name = 'schedule_lock' for update"
)
;
preparedStatement.execute()
;
// tx start
// 1
、预读数据
pre read
long
nowTime = System.
currentTimeMillis
()
;
// --
从数据库中读取截止到五秒后未执行的
job
,并且读取
preReadCount=6000
条
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.
getAdminConfig
().getXxlJobInfoDao().scheduleJobQuery(nowTime +
PRE_READ_MS
,
preReadCount)
;
if
(scheduleList!=
null
&& scheduleList.size()>
0
) {
// 2
、
push
压进 时间轮
push time-ring
for
(XxlJobInfo jobInfo: scheduleList) {
// time-ring jump
if
(nowTime > jobInfo.getTriggerNextTime() +
PRE_READ_MS
) {
//
当前时间 大于 (任务的下一次触发时间
+ PRE_READ_MS
(
5s
))
,
可能是查询太久了,然后下面的代码刷新了任务下次执行时间,导致超过五秒,所以就需要特殊处理
// 2.1
、
trigger-expire > 5s
:
pass && make next-trigger-time
logger
.warn(
">>>>>>>>>>> xxl-job, schedule misfire, jobId = "
+ jobInfo.getId())
;
// 1
、匹配过期失效的策略:
DO_NOTHING=
过期啥也不干,废弃;
FIRE_ONCE_NOW=
过期立即触发一次
misfire match
MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.
match
(jobInfo.getMisfireStrategy()
,
MisfireStrategyEnum.
DO_NOTHING
)
;
if
(MisfireStrategyEnum.
FIRE_ONCE_NOW
== misfireStrategyEnum) {
// FIRE_ONCE_NOW
》
trigger
JobTriggerPoolHelper.
trigger
(jobInfo.getId()
,
TriggerTypeEnum.
MISFIRE
,
-
1
, null, null, null
)
;
logger
.debug(
">>>>>>>>>>> xxl-job, schedule push trigger : jobId = "
+ jobInfo.getId() )
;
}
// 2
、刷新上一次触发 和 下一次待触发时间
fresh next
refreshNextValidTime(jobInfo
, new
Date())
;
}
else if
(nowTime > jobInfo.getTriggerNextTime()) {
//
当前时间 大于 任务的下一次触发时间 并且是没有过期的
// 2.2
、
trigger-expire < 5s
:
direct-trigger && make next-trigger-time
// 1
、直接触发任务执行器
trigger
JobTriggerPoolHelper.
trigger
(jobInfo.getId()
,
TriggerTypeEnum.
CRON
,
-
1
, null, null, null
)
;
logger
.debug(
">>>>>>>>>>> xxl-job, schedule push trigger : jobId = "
+ jobInfo.getId() )
;
// 2
、刷新上一次触发 和 下一次待触发时间
fresh next
refreshNextValidTime(jobInfo
, new
Date())
;
//
如果下一次触发在五秒内,直接放进时间轮里面待调度
next-trigger-time in 5s, pre-read again
if
(jobInfo.getTriggerStatus()==
1
&& nowTime +
PRE_READ_MS
> jobInfo.getTriggerNextTime()) {
// 1
、求当前任务下一次触发时间所处一分钟的第
N
秒
make ring second
int
ringSecond = (
int
)((jobInfo.getTriggerNextTime()/
1000
)%
60
)
;
// 2
、将当前任务
ID
和
ringSecond
放进时间轮里面
push time ring
pushTimeRing(ringSecond
,
jobInfo.getId())
;
// 3
、刷新上一次触发 和 下一次待触发时间
fresh next
refreshNextValidTime(jobInfo
, new
Date(jobInfo.getTriggerNextTime()))
;
}
}
else
{
//
当前时间 小于 下一次触发时间
// 2.3
、
trigger-pre-read
:
time-ring trigger && make next-trigger-time
// 1
、求当前任务下一次触发时间所处一分钟的第
N
秒
make ring second
int
ringSecond = (
int
)((jobInfo.getTriggerNextTime()/
1000
)%
60
)
;
// 2
、将当前任务
ID
和
ringSecond
放进时间轮里面
push time ring
pushTimeRing(ringSecond
,
jobInfo.getId())
;
// 3
、刷新上一次触发 和 下一次待触发时间
fresh next
refreshNextValidTime(jobInfo
, new
Date(jobInfo.getTriggerNextTime()))
;
}
}
// 3
、更新数据库执行器信息,如
trigger_last_time
、
trigger_next_time update trigger info
for
(XxlJobInfo jobInfo: scheduleList) {
XxlJobAdminConfig.
getAdminConfig
().getXxlJobInfoDao().scheduleUpdate(jobInfo)
;
}
}
else
{
preReadSuc =
false;
}
// tx stop
}
catch
(Exception e) {
if
(!
scheduleThreadToStop
) {
logger
.error(
">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}"
,
e)
;
}
}
finally
{
//
提交事务,释放数据库
select for update
的锁
commit
.......................省略.............
}
long
cost = System.
currentTimeMillis
()-start
;
//
如果执行太快了,就稍微
sleep
等待一下
Wait seconds, align second
if
(cost <
1000
) {
// scan-overtime, not wait
try
{
// pre-read period: success > scan each second; fail > skip this period;
TimeUnit.
MILLISECONDS
.sleep((preReadSuc?
1000
:
PRE_READ_MS
) - System.
currentTimeMillis
()%
1000
)
;
}
catch
(InterruptedException e) {
if
(!
scheduleThreadToStop
) {
logger
.error(e.getMessage()
,
e)
;
}
}
})
;
scheduleThread
.setDaemon(
true
)
;
scheduleThread
.setName(
"xxl-job, admin JobScheduleHelper#scheduleThread"
)
;
scheduleThread
.start()
;
//
时间轮线程,用于取出每秒的数据,然后处理
ring thread
ringThread
=
new
Thread(
new
Runnable() {
@Override
public void
run
() {
while
(!
ringThreadToStop
) {
// align second
try
{
TimeUnit.
MILLISECONDS
.sleep(
1000
- System.
currentTimeMillis
() %
1000
)
;
}
catch
(InterruptedException e) {
if
(!
ringThreadToStop
) {
logger
.error(e.getMessage()
,
e)
;
}
}
try
{
// second data
List<Integer> ringItemData =
new
ArrayList<>()
;
//
获取当前所处的一分钟第几秒,然后
for
两次,第二次是为了重跑前面一个刻度没有被执行的的
job list
,避免前面的刻度遗漏了
int
nowSecond = Calendar.
getInstance
().get(Calendar.
SECOND
)
;
//
避免处理耗时太长,跨过刻度,向前校验一个刻度;
for
(
int
i =
0
;
i <
2
;
i++) {
List<Integer> tmpData =
ringData
.remove( (nowSecond+
60
-i)%
60
)
;
if
(tmpData !=
null
) {
ringItemData.addAll(tmpData)
;
}
}
// ring trigger
logger
.debug(
">>>>>>>>>>> xxl-job, time-ring beat : "
+ nowSecond +
" = "
+ Arrays.
asList
(ringItemData) )
;
if
(ringItemData.size() >
0
) {
// do trigger
for
(
int
jobId: ringItemData) {
//
执行触发器
do trigger
JobTriggerPoolHelper.
trigger
(jobId
,
TriggerTypeEnum.
CRON
,
-
1
, null, null, null
)
;
}
//
清除当前刻度列表的数据
clear
ringItemData.clear()
;
}
}
catch
(Exception e) {
if
(!
ringThreadToStop
) {
logger
.error(
">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}"
,
e)
;
}
}
}
logger
.info(
">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop"
)
;
}
})
;
ringThread
.setDaemon(
true
)
;
ringThread
.setName(
"xxl-job, admin JobScheduleHelper#ringThread"
)
;
ringThread
.start()
;
}
GitHub 加速计划 / xx / xxl-job
23
10
下载
xxl-job: 是一个分布式任务调度平台,核心设计目标是开发迅速、学习简单、轻量级、易扩展。
最近提交(Master分支:4 个月前 )
e5d26ba2 - 5 个月前
977ad87b - 5 个月前
更多推荐
已为社区贡献2条内容
所有评论(0)