本文将基于xxl-job2.4.0版本源码进行代码的梳理

作者:后端小肥肠

目录

1.  前言

2. xxl-job概述

2.1. xxl-job 架构

2.2. xxl-job表结构及接口信息

3. 源码解读(核心!!)

3.1. 调度中心的初始化操作

3.1.1. 初始化线程池

3.1.2. 调度器注册监听

3.1.3. 失败任务线程监听

3.1.4. 任务完成线程监听

3.1.5. 日志线程生成今日任务报告并清除过期日志数据库

3.1.6. 调度任务线程

3.1.6.1 scheduleThread线程逻辑

3.1.6.2. 区间说明

3.1.6.3. ringThread线程逻辑

3.1.6.4. 流程图

3.2. 执行器的注册及回调操作

3.2.1. TriggerCallbackThread.getInstance().start()

3.2.2. initEmbedServer(address, ip, port, appname, accessToken)

3.3.  调度中心如何调度任务

4.结语

5. 参考链接


1.  前言

        在企业级应用中,随着业务的复杂化和规模的扩大,任务调度成为提高效率的关键。传统任务调度工具往往面临着单点故障、难以管理的挑战。为了应对这些问题,xxl-job应运而生。作为一款分布式任务调度平台,xxl-job旨在解决企业任务调度的痛点,提供高可用性、易用性的解决方案。

        xxl-job主要应用于企业信息系统中的定时任务和任务调度管理,其源码地址为:xxl-job: 一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。主要特点如下:

  1. 分布式任务调度: 支持在多台服务器上进行任务的分布式调度,实现了任务的高可用和负载均衡。

  2. 动态扩展和管理: 执行器节点可以动态注册和注销,无需重启应用,实现了系统的动态扩缩和灵活管理。

  3. 易用的任务配置界面: 提供直观、简便的任务配置界面,用户可以轻松配置任务触发规则、执行器选择等。

  4. 丰富的任务监控和日志查看: 提供全面的任务监控功能,用户可以实时查看任务的执行状态和日志,方便问题追踪和排查。

  5. 分片广播任务: 支持将一个任务拆分为多个子任务并行执行,提高了任务的执行效率。

2. xxl-job概述

2.1. xxl-job 架构

       xxl-job作为一款分布式任务调度平台,其架构核心涵盖了调度中心、执行器和注册中心,为任务管理和执行提供了强大的支持,其架构如下图所示:

 调度中心(Admin)

        调度中心充当任务的指挥中心,负责任务的管理、调度和监控。通过可视化的任务配置界面,用户可以轻松配置任务的触发规则和执行器选择。调度中心还承担着任务的日志记录和监控职责,帮助用户实时了解任务执行状态。

执行器(Executor)

        执行器是实际执行任务的节点,接收调度中心分发的任务并执行。执行器通过注册到注册中心,使调度中心能够动态发现和管理执行器节点。这种设计实现了任务的分布式执行,多个执行器可以并行地执行任务,提高了系统的执行效率。

2.2. xxl-job表结构及接口信息

xxl-job(2.4.0)的表结构如下:

表名中文名表内容
xxl_job_lock任务调度锁表
xxl_job_group执行器信息表维护任务执行器信息
xxl_job_info调度扩展信息表用于保存 XXL-JOB 调度任务的扩展信息,如任务分组、任务名、机器地址、执行器、执行入参和报警邮件等等
xxl_job_log调度日志表用于保存 XXL-JOB 任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等
xxxl_job_log_report调度日志报表用于存储 XXL-JOB 任务调度日志的报表,调度中心报表功能页面会用到
xxl_job_logglue任务GLUE日志用于保存 GLUE 更新历史,用于支持 GLUE 的版本回溯功能
xxl_job_registry执行器注册表维护在线的执行器和调度中心机器地址信息
xxl_job_user系统用户表调度中心用户信息

xxl-job(2.4.0)的接口信息结构如下:

接口归类接口名称接口地址
任务管理接口跳转至任务管理界面接口/jobinfo
新增任务接口/jobinfo/add
更新任务接口/jobinfo/update
删除任务接口/jobinfo/remove
触发任务接口/jobinfo/start
停止任务接口/jobinfo/stop
查询任务接口/jobinfo/pageList
/jobinfo/trigger
/jobinfo/nextTriggerTime
执行器管理接口跳转至执行器管理界面接口/jobgroup
新增执行器接口/jobgroup/save
更新执行器接口/jobgroup/update
删除执行器接口/jobgroup/remove
查询执行器接口/jobgroup/pageList
查看执行器详情接口/jobgroup/loadById
日志管理接口查询任务日志接口/joblog/pageList
/joblog/getJobsByGroup
/joblog/logDetailPage
/joblog/logDetailCat
/joblog/logKill
/joblog/clearLog
跳转至日志界面接口/joblog
用户管理接口跳转至用户管理页面接口/user
分页查询用户列表接口/user/pageList
新增用户接口/user/add
更新用户接口/user/update
删除用户接口/remove
更新密码接口/updatePwd

JobApiController :对外开放api,提供调度完成,执行器注册或移除的回调操作

JobCodeController:保存glue模式的源代码接口

        根据表和接口信息,大致可以看出接口和表的对应关系,本文的重点不在这块,大家稍作了解即可。

3. 源码解读(核心!!)

3.1. 调度中心的初始化操作

首先找到xxl-job-admin包下的XxlJobAdminConfig,如下图所示:

       打开 XxlJobAdminConfig的类图:

        从XxlJobAdminConfig的类图中可以看出其实现了InitializingBean。InitializingBean 是 Spring 框架中的一个接口,它定义了一个单一的方法 afterPropertiesSet(),用于在 bean 的所有属性被设置后执行一些初始化操作。当一个 bean 实现了 InitializingBean 接口时,在 Spring 容器实例化该 bean 并设置其所有属性后,容器会调用 afterPropertiesSet() 方法,以便让 bean 在完成基本属性设置后执行自定义的初始化逻辑。

        那我们直接来看一下XxlJobAdminConfig的afterPropertiesSet()方法:

上述代码一共做了三件事:

  1.  初始一个单例对象(XxlJobAdminConfig
  2. 初始化xxljob调度器
  3. 调度器init()操作

继续往下追init()函数:

上述代码一共做了7件事:

1. 国际化

initI18n();

2. 初始化线程池

JobTriggerPoolHelper.toStart();

3. 调度器注册监听

JobRegistryHelper.getInstance().start();

4. 失败任务线程监听

JobFailMonitorHelper.getInstance().start();

5. 任务完成现场监听·

JobCompleteHelper.getInstance().start();

6.日志线程生成今日任务报告并清除过期日志数据库

JobLogReportHelper.getInstance().start();

7.调度任务线程

JobScheduleHelper.getInstance().start();

除了国际化以外我们一步一步往下看一下其中的代码内容。

3.1.1. 初始化线程池
JobTriggerPoolHelper.toStart();

上述代码中初始花了两个线程池,一个是fastTriggerPool,一个是slowTriggerPool。

3.1.2. 调度器注册监听
JobRegistryHelper.getInstance().start();

        在start()方法中做了两件事,分别是初始化线程池(registryOrRemoveThreadPool)和启动一个守护线程(registryMonitorThread)。

         守护线程的作用在于监听并刷新xxl_job_group中的数据。

		registryMonitorThread = new Thread(new Runnable() {
			@Override
			public void run() {
				while (!toStop) {
					try {
						// 1. 从xxl_job_group中找adress_type=0的数据,即自动注册的调度器
						List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
						if (groupList!=null && !groupList.isEmpty()) {

							// 2. 从xxl_job_registry中根据update_time寻找超过心跳时间3倍(90s)的调度器,并将他们从表中移除
							List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
							if (ids!=null && ids.size()>0) {
								XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
							}

							// 3. 定义局部变量appAddressMap
							HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
							//4. 获取xxl_job_registry中的有效数据,registry_key就是appname,将所有EXECUTOR类型的数据放在Map<appname,List>appAddressMap中,让所有同名调度器在一个集合中。
							List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
							if (list != null) {
								for (XxlJobRegistry item: list) {
									if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
										String appname = item.getRegistryKey();
										List<String> registryList = appAddressMap.get(appname);
										if (registryList == null) {
											registryList = new ArrayList<String>();
										}

										if (!registryList.contains(item.getRegistryValue())) {
											registryList.add(item.getRegistryValue());
										}
										appAddressMap.put(appname, registryList);
									}
								}
							}

							// 5. 将appAddressMap中的地址按照xxl_job_group中的app_name获取,把地址拼接,逗号分割后保存在xxl_job_group.adress_list中,这样就保存了多个调度器地址
							for (XxlJobGroup group: groupList) {
								List<String> registryList = appAddressMap.get(group.getAppname());
								String addressListStr = null;
								if (registryList!=null && !registryList.isEmpty()) {
									Collections.sort(registryList);
									StringBuilder addressListSB = new StringBuilder();
									for (String item:registryList) {
										addressListSB.append(item).append(",");
									}
									addressListStr = addressListSB.toString();
									addressListStr = addressListStr.substring(0, addressListStr.length()-1);
								}
								group.setAddressList(addressListStr);
								group.setUpdateTime(new Date());

								XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
							}
						}
					} catch (Exception e) {
						if (!toStop) {
							logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
						}
					}
					try {
						TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
					} catch (InterruptedException e) {
						if (!toStop) {
							logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
						}
					}
				}
				logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
			}
		});
3.1.3. 失败任务线程监听
JobFailMonitorHelper.getInstance().start();

        在start()方法中,初始化了监听任务执行失败的线程,将其设置为守护线程并启动。

	public void start(){
		monitorThread = new Thread(new Runnable() {

			@Override
			public void run() {

				// monitor
				while (!toStop) {
					try {

						//1.获取xxl_job_log中执行失败的任务
						List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
						if (failLogIds!=null && !failLogIds.isEmpty()) {
							for (long failLogId: failLogIds) {

								// lock log
								//2. 更新xxl_job.alarm.status=-1
								int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
								if (lockRet < 1) {
									continue;
								}
								XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
								XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());

								// fail retry monitor
								//3. 获取xxl_job_info中任务信息,如果xxl_jon_log.executor_fail_retry_count>0则重试,然后executor_fail_retry_count-1更新到表中
								if (log.getExecutorFailRetryCount() > 0) {
									JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
									String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
									log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
									XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
								}

								// fail alarm monitor
								//4. 失败任务发送告警邮件
								int newAlarmStatus = 0;		// 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
								if (info != null) {
									boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
									newAlarmStatus = alarmResult?2:3;
								} else {
									newAlarmStatus = 1;
								}
                                //5. 更新xxl_job_log中alarm_status
								XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
							}
						}

					} catch (Exception e) {
						if (!toStop) {
							logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
						}
					}

                    try {
                        TimeUnit.SECONDS.sleep(10);
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                }

				logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");

			}
		});
		monitorThread.setDaemon(true);
		monitorThread.setName("xxl-job, admin JobFailMonitorHelper");
		monitorThread.start();
	}
3.1.4. 任务完成线程监听
JobCompleteHelper.getInstance().start();

        在start()方法中做了两件事,分别注册一个callbacl线程池和初始化监听线程。

        初始化监听线程,设置为守护线程并启动。

	monitorThread = new Thread(new Runnable() {

			@Override
			public void run() {

				// wait for JobTriggerPoolHelper-init
				try {
					TimeUnit.MILLISECONDS.sleep(50);
				} catch (InterruptedException e) {
					if (!toStop) {
						logger.error(e.getMessage(), e);
					}
				}

				// monitor
				while (!toStop) {
					try {
						// 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
						Date losedTime = DateUtil.addMinutes(new Date(), -10);
						List<Long> losedJobIds  = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);

						if (losedJobIds!=null && losedJobIds.size()>0) {
							for (Long logId: losedJobIds) {

								XxlJobLog jobLog = new XxlJobLog();
								jobLog.setId(logId);

								jobLog.setHandleTime(new Date());
								jobLog.setHandleCode(ReturnT.FAIL_CODE);
								jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );

								XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
							}

						}
					} catch (Exception e) {
						if (!toStop) {
							logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
						}
					}

                    try {
                        TimeUnit.SECONDS.sleep(60);
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                }

				logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");

			}
		});
		monitorThread.setDaemon(true);
		monitorThread.setName("xxl-job, admin JobLosedMonitorHelper");
		monitorThread.start();

3.1.5. 日志线程生成今日任务报告并清除过期日志数据库
JobLogReportHelper.getInstance().start();

        在start()方法中初始化了日志线程,设置为守护线程后启动生成今日任务报告,并删除过期日志。

    public void start(){
        logrThread = new Thread(new Runnable() {

            @Override
            public void run() {

                // last clean log time
                long lastCleanLogTime = 0;


                while (!toStop) {

                    // 1、log-report refresh: refresh log report in 3 days
                    try {

                        for (int i = 0; i < 3; i++) {

                            // today
                            Calendar itemDay = Calendar.getInstance();
                            itemDay.add(Calendar.DAY_OF_MONTH, -i);
                            itemDay.set(Calendar.HOUR_OF_DAY, 0);
                            itemDay.set(Calendar.MINUTE, 0);
                            itemDay.set(Calendar.SECOND, 0);
                            itemDay.set(Calendar.MILLISECOND, 0);

                            Date todayFrom = itemDay.getTime();

                            itemDay.set(Calendar.HOUR_OF_DAY, 23);
                            itemDay.set(Calendar.MINUTE, 59);
                            itemDay.set(Calendar.SECOND, 59);
                            itemDay.set(Calendar.MILLISECOND, 999);

                            Date todayTo = itemDay.getTime();

                            // refresh log-report every minute
                            XxlJobLogReport xxlJobLogReport = new XxlJobLogReport();
                            xxlJobLogReport.setTriggerDay(todayFrom);
                            xxlJobLogReport.setRunningCount(0);
                            xxlJobLogReport.setSucCount(0);
                            xxlJobLogReport.setFailCount(0);
                            //查询xxl_job表,获取今天任务总数,今日任务在运行数,今日任务运行成功数,失败姝=总数-运转数-成功数
                            Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);
                            if (triggerCountMap!=null && triggerCountMap.size()>0) {
                                int triggerDayCount = triggerCountMap.containsKey("triggerDayCount")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))):0;
                                int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))):0;
                                int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))):0;
                                int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;

                                xxlJobLogReport.setRunningCount(triggerDayCountRunning);
                                xxlJobLogReport.setSucCount(triggerDayCountSuc);
                                xxlJobLogReport.setFailCount(triggerDayCountFail);
                            }

                            // do refresh
                            //更新xxl_job_report表,如果今日数据不存在则插入,否则更新
                            int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
                            if (ret < 1) {
                                XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);
                            }
                        }

                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(">>>>>>>>>>> xxl-job, job log report thread error:{}", e);
                        }
                    }

                    // 2、log-clean: switch open & once each day
                    if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays()>0
                            && System.currentTimeMillis() - lastCleanLogTime > 24*60*60*1000) {

                        // expire-time
                        Calendar expiredDay = Calendar.getInstance();
                        expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays());
                        expiredDay.set(Calendar.HOUR_OF_DAY, 0);
                        expiredDay.set(Calendar.MINUTE, 0);
                        expiredDay.set(Calendar.SECOND, 0);
                        expiredDay.set(Calendar.MILLISECOND, 0);
                        Date clearBeforeTime = expiredDay.getTime();

                        // clean expired log
                        List<Long> logIds = null;
                        do {
                            logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000);
                            if (logIds!=null && logIds.size()>0) {
                                XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);
                            }
                        } while (logIds!=null && logIds.size()>0);

                        // update clean time
                        lastCleanLogTime = System.currentTimeMillis();
                    }

                    try {
                        TimeUnit.MINUTES.sleep(1);
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                }

                logger.info(">>>>>>>>>>> xxl-job, job log report thread stop");

            }
        });
        logrThread.setDaemon(true);
        logrThread.setName("xxl-job, admin JobLogReportHelper");
        logrThread.start();
    }
3.1.6. 调度任务线程
JobScheduleHelper.getInstance().start();

        在star()t方法中初始化了ringThread和scheduleThread。

3.1.6.1 scheduleThread线程逻辑

        1. 查询并锁表xxl_job_lock;

        2. 获取xxl_job_info中trigger_status=1(可调度)的数据,trigger_next_time小于当前时间5秒后的任务信息:

        查询的sql是一个limit条件,limit的数据为

(XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20
        getTriggerPoolFastMax()最小返回200,getTriggerPoolSlowMax()最小返回100。

        3. 这里分几种情况

          3.1 nowTime>trigger_next_time+5s,这种时间需要看任务调度策略,是FIRE_ONCE_NOW的话就要立刻执行一次然后更新;

                   3.2 第二种是nowTime>trigger_next_time,这种情况就是正常的cron调度,调度完毕后更新trigger_next_time;

                        3.2.1 任务执行时间超过5秒。同时任务下次调度时间又在5秒内,这时就把任务交给ringThread线程调度,更新trigger_next_time。

             3.3 else分支放入ringData中,交给ringThread线程调度,这时因为查询是查询nowTime+5s的数据,有一种任务nowTime+5s>trigger_next_time>nowTime。

        4. 更新xxl_job_info数据。

3.1.6.2. 区间说明

        上述的if-else-if 三个分支其实对应三种区间。

上图中三个分割线,分割成了4个区间:

if走的是第一区间jobInfo.getTriggerNextTime()<nowTime-5s

else if走的是第二区间 nowTime-5s<jobInfo.getTriggerNextTime()<nowTime

else走的是第三区间nowTime<jobInfo.getTriggerNextTime()<nowTime+5s的

第四区间jobInfo.getTriggerNextTime()>nowTime+5s由于在查询sql时就控制了,所有不存在这个区间

3.1.6.3. ringThread线程逻辑

         将ringData中数据读取出来,并调度任务。

3.1.6.4. 流程图

3.2. 执行器的注册及回调操作

首先找到xxl-job-executor-sample-springboot包将其展开,找到其核心配置类XxlJobConfig

在此配置类中主要对XxlJobSpringExecutor 类型的Bean进行了配置:

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }

        进入XxlJobSpringExecutor 类后看到其实现了SmartInitializingSingleton接口,SmartInitializingSingleton接口的作用是在所有单例bean实例化完成之后进行回调操作。该接口只有一个方法afterSingletonsInstantiated(),在该方法中可以执行一些需要在所有单例bean实例化完成后进行的初始化操作

    @Override
    public void afterSingletonsInstantiated() {

        // init JobHandler Repository
        /*initJobHandlerRepository(applicationContext);*/

        // init JobHandler Repository (for method)
        initJobHandlerMethodRepository(applicationContext);

        // refresh GlueFactory
        GlueFactory.refreshInstance(1);

        // super start
        try {
            super.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

以上代码一共做了三件事:

1. initJobHandlerMethodRepository(applicationContext)

        此方法是搜索所有使用@XxlJob的方法,将此信息封装成MethodJobHandler放入JobHandlerRespository(ConcurrentMap)

2. GlueFactory.refreshInstance(1)

参数0表示初始化类GlueFactory

参数1表示初始化类SpringGlueFactory(SpringGlueFactory extends GlueFactory)

3. super.start()-执行器和调度中心交互核心

         XxlJobFileAppender.initLogPath(logPath)和 initAdminBizList(adminAddresses, accessToke方法较为简单,其方法所做的工作我直接写在注释里面了。

    public void start() throws Exception {

        // init logpath 初始化日志路径
        XxlJobFileAppender.initLogPath(logPath);

        // init invoker, admin-client
        //初始化类-AdminBizClient, 并放入adminBizList集合中
        //adminAddresses 调度中心地址
        //accessToken调度中心token
        initAdminBizList(adminAddresses, accessToken);


        // init JobLogFileCleanThread
        //初始化一个线程,用来清理过期的本地日志目录文件
        //清理条件logRetentionDays>3
        JobLogFileCleanThread.getInstance().start(logRetentionDays);

        // init TriggerCallbackThread
        TriggerCallbackThread.getInstance().start();

        // init executor-server
        initEmbedServer(address, ip, port, appname, accessToken);
    }
3.2.1. TriggerCallbackThread.getInstance().start()

        在start()方法中初始化了两个线程triggerCallbackThread(负责任务完成后的回调)和triggerRetryCallbackThread(负责任务失败后重试)。

        triggerCallbackThread:

        所有需要回调的任务都放在callBackQueue队列中,并由AdminBizClient像调度中心发送https请求,将数据传输给调度中心。

    @Override
    public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
        return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
    }

 上述调用路径对应着调度中心JobApiController中的回调接口:

        triggerRetryCallbackThread:triggerRetryCallbackThread失败重试也是由AdminBizClient像调度中心发送https请求,将数据传输给调度中心。

        由triggerCallbackThread和triggerRetryCallbackThread的回调操作可以,具体回调方法是由调度中心执行,执行器只负责发送回调请求。

3.2.2. initEmbedServer(address, ip, port, appname, accessToken)

        此方法就是初始化嵌入在客户端的服务器。ip和port都可以自己指定,如果ip没有指定则获取本机ip。adress就是注册在调度中心的地址,供调度中心发送数据给执行器。

具体流图图如下:

3.3.  调度中心如何调度任务

        找到com.xxl.job.core.biz包下的ExecutorBiz,该接口的实现类ExecutorBizImpl中的run方法的作用为任务调度。其调度流程图如下:

4.结语

        本文针对xxl-job2.4.0版本的源码进行了梳理,梳理和解读内容包括调度中心的初始化操作,执行器的注册操作及任务回调操作及任务调度流程,针对本文的内容如您有更好观点,欢迎在评论区留言探讨~~

5. 参考链接

xxl-job源码解析(看这一篇就够了,超简约且详细)-CSDN博客

XXL-JOB 极简入门_xxl-job-core2.3.1-CSDN博客

GitHub 加速计划 / xx / xxl-job
27.16 K
10.79 K
下载
xxl-job: 是一个分布式任务调度平台,核心设计目标是开发迅速、学习简单、轻量级、易扩展。
最近提交(Master分支:3 个月前 )
e5d26ba2 - 4 个月前
977ad87b - 4 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐