【xxl-job系列】xxl-job2.4.0版本源码梳理及解读
本文将基于xxl-job2.4.0版本源码进行代码的梳理
作者:后端小肥肠
目录
3.2.1. TriggerCallbackThread.getInstance().start()
3.2.2. initEmbedServer(address, ip, port, appname, accessToken)
1. 前言
在企业级应用中,随着业务的复杂化和规模的扩大,任务调度成为提高效率的关键。传统任务调度工具往往面临着单点故障、难以管理的挑战。为了应对这些问题,xxl-job应运而生。作为一款分布式任务调度平台,xxl-job旨在解决企业任务调度的痛点,提供高可用性、易用性的解决方案。
xxl-job主要应用于企业信息系统中的定时任务和任务调度管理,其源码地址为:xxl-job: 一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。主要特点如下:
-
分布式任务调度: 支持在多台服务器上进行任务的分布式调度,实现了任务的高可用和负载均衡。
-
动态扩展和管理: 执行器节点可以动态注册和注销,无需重启应用,实现了系统的动态扩缩和灵活管理。
-
易用的任务配置界面: 提供直观、简便的任务配置界面,用户可以轻松配置任务触发规则、执行器选择等。
-
丰富的任务监控和日志查看: 提供全面的任务监控功能,用户可以实时查看任务的执行状态和日志,方便问题追踪和排查。
-
分片广播任务: 支持将一个任务拆分为多个子任务并行执行,提高了任务的执行效率。
2. xxl-job概述
2.1. xxl-job 架构
xxl-job作为一款分布式任务调度平台,其架构核心涵盖了调度中心、执行器和注册中心,为任务管理和执行提供了强大的支持,其架构如下图所示:
调度中心(Admin)
调度中心充当任务的指挥中心,负责任务的管理、调度和监控。通过可视化的任务配置界面,用户可以轻松配置任务的触发规则和执行器选择。调度中心还承担着任务的日志记录和监控职责,帮助用户实时了解任务执行状态。
执行器(Executor)
执行器是实际执行任务的节点,接收调度中心分发的任务并执行。执行器通过注册到注册中心,使调度中心能够动态发现和管理执行器节点。这种设计实现了任务的分布式执行,多个执行器可以并行地执行任务,提高了系统的执行效率。
2.2. xxl-job表结构及接口信息
xxl-job(2.4.0)的表结构如下:
表名 | 中文名 | 表内容 |
---|---|---|
xxl_job_lock | 任务调度锁表 | |
xxl_job_group | 执行器信息表 | 维护任务执行器信息 |
xxl_job_info | 调度扩展信息表 | 用于保存 XXL-JOB 调度任务的扩展信息,如任务分组、任务名、机器地址、执行器、执行入参和报警邮件等等 |
xxl_job_log | 调度日志表 | 用于保存 XXL-JOB 任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等 |
xxxl_job_log_report | 调度日志报表 | 用于存储 XXL-JOB 任务调度日志的报表,调度中心报表功能页面会用到 |
xxl_job_logglue | 任务GLUE日志 | 用于保存 GLUE 更新历史,用于支持 GLUE 的版本回溯功能 |
xxl_job_registry | 执行器注册表 | 维护在线的执行器和调度中心机器地址信息 |
xxl_job_user | 系统用户表 | 调度中心用户信息 |
xxl-job(2.4.0)的接口信息结构如下:
接口归类 | 接口名称 | 接口地址 |
---|---|---|
任务管理接口 | 跳转至任务管理界面接口 | /jobinfo |
新增任务接口 | /jobinfo/add | |
更新任务接口 | /jobinfo/update | |
删除任务接口 | /jobinfo/remove | |
触发任务接口 | /jobinfo/start | |
停止任务接口 | /jobinfo/stop | |
查询任务接口 | /jobinfo/pageList | |
/jobinfo/trigger | ||
/jobinfo/nextTriggerTime | ||
执行器管理接口 | 跳转至执行器管理界面接口 | /jobgroup |
新增执行器接口 | /jobgroup/save | |
更新执行器接口 | /jobgroup/update | |
删除执行器接口 | /jobgroup/remove | |
查询执行器接口 | /jobgroup/pageList | |
查看执行器详情接口 | /jobgroup/loadById | |
日志管理接口 | 查询任务日志接口 | /joblog/pageList |
/joblog/getJobsByGroup | ||
/joblog/logDetailPage | ||
/joblog/logDetailCat | ||
/joblog/logKill | ||
/joblog/clearLog | ||
跳转至日志界面接口 | /joblog | |
用户管理接口 | 跳转至用户管理页面接口 | /user |
分页查询用户列表接口 | /user/pageList | |
新增用户接口 | /user/add | |
更新用户接口 | /user/update | |
删除用户接口 | /remove | |
更新密码接口 | /updatePwd |
JobApiController :对外开放api,提供调度完成,执行器注册或移除的回调操作
JobCodeController:保存glue模式的源代码接口
根据表和接口信息,大致可以看出接口和表的对应关系,本文的重点不在这块,大家稍作了解即可。
3. 源码解读(核心!!)
3.1. 调度中心的初始化操作
首先找到xxl-job-admin包下的XxlJobAdminConfig,如下图所示:
打开 XxlJobAdminConfig的类图:
从XxlJobAdminConfig的类图中可以看出其实现了InitializingBean。InitializingBean
是 Spring 框架中的一个接口,它定义了一个单一的方法 afterPropertiesSet()
,用于在 bean 的所有属性被设置后执行一些初始化操作。当一个 bean 实现了 InitializingBean
接口时,在 Spring 容器实例化该 bean 并设置其所有属性后,容器会调用 afterPropertiesSet()
方法,以便让 bean 在完成基本属性设置后执行自定义的初始化逻辑。
那我们直接来看一下XxlJobAdminConfig的afterPropertiesSet()方法:
上述代码一共做了三件事:
- 初始一个单例对象(XxlJobAdminConfig)
- 初始化xxljob调度器
- 调度器init()操作
继续往下追init()函数:
上述代码一共做了7件事:
1. 国际化
initI18n();
2. 初始化线程池
JobTriggerPoolHelper.toStart();
3. 调度器注册监听
JobRegistryHelper.getInstance().start();
4. 失败任务线程监听
JobFailMonitorHelper.getInstance().start();
5. 任务完成现场监听·
JobCompleteHelper.getInstance().start();
6.日志线程生成今日任务报告并清除过期日志数据库
JobLogReportHelper.getInstance().start();
7.调度任务线程
JobScheduleHelper.getInstance().start();
除了国际化以外我们一步一步往下看一下其中的代码内容。
3.1.1. 初始化线程池
JobTriggerPoolHelper.toStart();
上述代码中初始花了两个线程池,一个是fastTriggerPool,一个是slowTriggerPool。
3.1.2. 调度器注册监听
JobRegistryHelper.getInstance().start();
在start()方法中做了两件事,分别是初始化线程池(registryOrRemoveThreadPool)和启动一个守护线程(registryMonitorThread)。
守护线程的作用在于监听并刷新xxl_job_group中的数据。
registryMonitorThread = new Thread(new Runnable() {
@Override
public void run() {
while (!toStop) {
try {
// 1. 从xxl_job_group中找adress_type=0的数据,即自动注册的调度器
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
if (groupList!=null && !groupList.isEmpty()) {
// 2. 从xxl_job_registry中根据update_time寻找超过心跳时间3倍(90s)的调度器,并将他们从表中移除
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
if (ids!=null && ids.size()>0) {
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
}
// 3. 定义局部变量appAddressMap
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
//4. 获取xxl_job_registry中的有效数据,registry_key就是appname,将所有EXECUTOR类型的数据放在Map<appname,List>appAddressMap中,让所有同名调度器在一个集合中。
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
if (list != null) {
for (XxlJobRegistry item: list) {
if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
String appname = item.getRegistryKey();
List<String> registryList = appAddressMap.get(appname);
if (registryList == null) {
registryList = new ArrayList<String>();
}
if (!registryList.contains(item.getRegistryValue())) {
registryList.add(item.getRegistryValue());
}
appAddressMap.put(appname, registryList);
}
}
}
// 5. 将appAddressMap中的地址按照xxl_job_group中的app_name获取,把地址拼接,逗号分割后保存在xxl_job_group.adress_list中,这样就保存了多个调度器地址
for (XxlJobGroup group: groupList) {
List<String> registryList = appAddressMap.get(group.getAppname());
String addressListStr = null;
if (registryList!=null && !registryList.isEmpty()) {
Collections.sort(registryList);
StringBuilder addressListSB = new StringBuilder();
for (String item:registryList) {
addressListSB.append(item).append(",");
}
addressListStr = addressListSB.toString();
addressListStr = addressListStr.substring(0, addressListStr.length()-1);
}
group.setAddressList(addressListStr);
group.setUpdateTime(new Date());
XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
try {
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
}
});
3.1.3. 失败任务线程监听
JobFailMonitorHelper.getInstance().start();
在start()方法中,初始化了监听任务执行失败的线程,将其设置为守护线程并启动。
public void start(){
monitorThread = new Thread(new Runnable() {
@Override
public void run() {
// monitor
while (!toStop) {
try {
//1.获取xxl_job_log中执行失败的任务
List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
if (failLogIds!=null && !failLogIds.isEmpty()) {
for (long failLogId: failLogIds) {
// lock log
//2. 更新xxl_job.alarm.status=-1
int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
if (lockRet < 1) {
continue;
}
XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
// fail retry monitor
//3. 获取xxl_job_info中任务信息,如果xxl_jon_log.executor_fail_retry_count>0则重试,然后executor_fail_retry_count-1更新到表中
if (log.getExecutorFailRetryCount() > 0) {
JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
}
// fail alarm monitor
//4. 失败任务发送告警邮件
int newAlarmStatus = 0; // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
if (info != null) {
boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
newAlarmStatus = alarmResult?2:3;
} else {
newAlarmStatus = 1;
}
//5. 更新xxl_job_log中alarm_status
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
}
}
try {
TimeUnit.SECONDS.sleep(10);
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");
}
});
monitorThread.setDaemon(true);
monitorThread.setName("xxl-job, admin JobFailMonitorHelper");
monitorThread.start();
}
3.1.4. 任务完成线程监听
JobCompleteHelper.getInstance().start();
在start()方法中做了两件事,分别注册一个callbacl线程池和初始化监听线程。
初始化监听线程,设置为守护线程并启动。
monitorThread = new Thread(new Runnable() {
@Override
public void run() {
// wait for JobTriggerPoolHelper-init
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
// monitor
while (!toStop) {
try {
// 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
Date losedTime = DateUtil.addMinutes(new Date(), -10);
List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
if (losedJobIds!=null && losedJobIds.size()>0) {
for (Long logId: losedJobIds) {
XxlJobLog jobLog = new XxlJobLog();
jobLog.setId(logId);
jobLog.setHandleTime(new Date());
jobLog.setHandleCode(ReturnT.FAIL_CODE);
jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );
XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
}
}
try {
TimeUnit.SECONDS.sleep(60);
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");
}
});
monitorThread.setDaemon(true);
monitorThread.setName("xxl-job, admin JobLosedMonitorHelper");
monitorThread.start();
3.1.5. 日志线程生成今日任务报告并清除过期日志数据库
JobLogReportHelper.getInstance().start();
在start()方法中初始化了日志线程,设置为守护线程后启动生成今日任务报告,并删除过期日志。
public void start(){
logrThread = new Thread(new Runnable() {
@Override
public void run() {
// last clean log time
long lastCleanLogTime = 0;
while (!toStop) {
// 1、log-report refresh: refresh log report in 3 days
try {
for (int i = 0; i < 3; i++) {
// today
Calendar itemDay = Calendar.getInstance();
itemDay.add(Calendar.DAY_OF_MONTH, -i);
itemDay.set(Calendar.HOUR_OF_DAY, 0);
itemDay.set(Calendar.MINUTE, 0);
itemDay.set(Calendar.SECOND, 0);
itemDay.set(Calendar.MILLISECOND, 0);
Date todayFrom = itemDay.getTime();
itemDay.set(Calendar.HOUR_OF_DAY, 23);
itemDay.set(Calendar.MINUTE, 59);
itemDay.set(Calendar.SECOND, 59);
itemDay.set(Calendar.MILLISECOND, 999);
Date todayTo = itemDay.getTime();
// refresh log-report every minute
XxlJobLogReport xxlJobLogReport = new XxlJobLogReport();
xxlJobLogReport.setTriggerDay(todayFrom);
xxlJobLogReport.setRunningCount(0);
xxlJobLogReport.setSucCount(0);
xxlJobLogReport.setFailCount(0);
//查询xxl_job表,获取今天任务总数,今日任务在运行数,今日任务运行成功数,失败姝=总数-运转数-成功数
Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);
if (triggerCountMap!=null && triggerCountMap.size()>0) {
int triggerDayCount = triggerCountMap.containsKey("triggerDayCount")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))):0;
int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))):0;
int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))):0;
int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;
xxlJobLogReport.setRunningCount(triggerDayCountRunning);
xxlJobLogReport.setSucCount(triggerDayCountSuc);
xxlJobLogReport.setFailCount(triggerDayCountFail);
}
// do refresh
//更新xxl_job_report表,如果今日数据不存在则插入,否则更新
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
if (ret < 1) {
XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job log report thread error:{}", e);
}
}
// 2、log-clean: switch open & once each day
if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays()>0
&& System.currentTimeMillis() - lastCleanLogTime > 24*60*60*1000) {
// expire-time
Calendar expiredDay = Calendar.getInstance();
expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays());
expiredDay.set(Calendar.HOUR_OF_DAY, 0);
expiredDay.set(Calendar.MINUTE, 0);
expiredDay.set(Calendar.SECOND, 0);
expiredDay.set(Calendar.MILLISECOND, 0);
Date clearBeforeTime = expiredDay.getTime();
// clean expired log
List<Long> logIds = null;
do {
logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000);
if (logIds!=null && logIds.size()>0) {
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);
}
} while (logIds!=null && logIds.size()>0);
// update clean time
lastCleanLogTime = System.currentTimeMillis();
}
try {
TimeUnit.MINUTES.sleep(1);
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job log report thread stop");
}
});
logrThread.setDaemon(true);
logrThread.setName("xxl-job, admin JobLogReportHelper");
logrThread.start();
}
3.1.6. 调度任务线程
JobScheduleHelper.getInstance().start();
在star()t方法中初始化了ringThread和scheduleThread。
3.1.6.1 scheduleThread线程逻辑
1. 查询并锁表xxl_job_lock;
2. 获取xxl_job_info中trigger_status=1(可调度)的数据,trigger_next_time小于当前时间5秒后的任务信息:
查询的sql是一个limit条件,limit的数据为
(XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20
getTriggerPoolFastMax()最小返回200,getTriggerPoolSlowMax()最小返回100。
3. 这里分几种情况
3.1 nowTime>trigger_next_time+5s,这种时间需要看任务调度策略,是FIRE_ONCE_NOW的话就要立刻执行一次然后更新;
3.2 第二种是nowTime>trigger_next_time,这种情况就是正常的cron调度,调度完毕后更新trigger_next_time;
3.2.1 任务执行时间超过5秒。同时任务下次调度时间又在5秒内,这时就把任务交给ringThread线程调度,更新trigger_next_time。
3.3 else分支放入ringData中,交给ringThread线程调度,这时因为查询是查询nowTime+5s的数据,有一种任务nowTime+5s>trigger_next_time>nowTime。
4. 更新xxl_job_info数据。
3.1.6.2. 区间说明
上述的if-else-if 三个分支其实对应三种区间。
上图中三个分割线,分割成了4个区间:
if走的是第一区间jobInfo.getTriggerNextTime()<nowTime-5s
else if走的是第二区间 nowTime-5s<jobInfo.getTriggerNextTime()<nowTime
else走的是第三区间nowTime<jobInfo.getTriggerNextTime()<nowTime+5s的
第四区间jobInfo.getTriggerNextTime()>nowTime+5s由于在查询sql时就控制了,所有不存在这个区间
3.1.6.3. ringThread线程逻辑
将ringData中数据读取出来,并调度任务。
3.1.6.4. 流程图
3.2. 执行器的注册及回调操作
首先找到xxl-job-executor-sample-springboot包将其展开,找到其核心配置类XxlJobConfig:
在此配置类中主要对XxlJobSpringExecutor 类型的Bean进行了配置:
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
进入XxlJobSpringExecutor 类后看到其实现了SmartInitializingSingleton接口,SmartInitializingSingleton接口的作用是在所有单例bean实例化完成之后进行回调操作。该接口只有一个方法afterSingletonsInstantiated(),在该方法中可以执行一些需要在所有单例bean实例化完成后进行的初始化操作。
@Override
public void afterSingletonsInstantiated() {
// init JobHandler Repository
/*initJobHandlerRepository(applicationContext);*/
// init JobHandler Repository (for method)
initJobHandlerMethodRepository(applicationContext);
// refresh GlueFactory
GlueFactory.refreshInstance(1);
// super start
try {
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
以上代码一共做了三件事:
1. initJobHandlerMethodRepository(applicationContext)
此方法是搜索所有使用@XxlJob的方法,将此信息封装成MethodJobHandler放入JobHandlerRespository(ConcurrentMap)
2. GlueFactory.refreshInstance(1)
参数0表示初始化类GlueFactory
参数1表示初始化类SpringGlueFactory(SpringGlueFactory extends GlueFactory)
3. super.start()-执行器和调度中心交互核心
XxlJobFileAppender.initLogPath(logPath)和 initAdminBizList(adminAddresses, accessToke方法较为简单,其方法所做的工作我直接写在注释里面了。
public void start() throws Exception {
// init logpath 初始化日志路径
XxlJobFileAppender.initLogPath(logPath);
// init invoker, admin-client
//初始化类-AdminBizClient, 并放入adminBizList集合中
//adminAddresses 调度中心地址
//accessToken调度中心token
initAdminBizList(adminAddresses, accessToken);
// init JobLogFileCleanThread
//初始化一个线程,用来清理过期的本地日志目录文件
//清理条件logRetentionDays>3
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// init TriggerCallbackThread
TriggerCallbackThread.getInstance().start();
// init executor-server
initEmbedServer(address, ip, port, appname, accessToken);
}
3.2.1. TriggerCallbackThread.getInstance().start()
在start()方法中初始化了两个线程triggerCallbackThread(负责任务完成后的回调)和triggerRetryCallbackThread(负责任务失败后重试)。
triggerCallbackThread:
所有需要回调的任务都放在callBackQueue队列中,并由AdminBizClient像调度中心发送https请求,将数据传输给调度中心。
@Override
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
}
上述调用路径对应着调度中心JobApiController中的回调接口:
triggerRetryCallbackThread:triggerRetryCallbackThread失败重试也是由AdminBizClient像调度中心发送https请求,将数据传输给调度中心。
由triggerCallbackThread和triggerRetryCallbackThread的回调操作可以,具体回调方法是由调度中心执行,执行器只负责发送回调请求。
3.2.2. initEmbedServer(address, ip, port, appname, accessToken)
此方法就是初始化嵌入在客户端的服务器。ip和port都可以自己指定,如果ip没有指定则获取本机ip。adress就是注册在调度中心的地址,供调度中心发送数据给执行器。
具体流图图如下:
3.3. 调度中心如何调度任务
找到com.xxl.job.core.biz包下的ExecutorBiz,该接口的实现类ExecutorBizImpl中的run方法的作用为任务调度。其调度流程图如下:
4.结语
本文针对xxl-job2.4.0版本的源码进行了梳理,梳理和解读内容包括调度中心的初始化操作,执行器的注册操作及任务回调操作及任务调度流程,针对本文的内容如您有更好观点,欢迎在评论区留言探讨~~
5. 参考链接
更多推荐
所有评论(0)