作者:鱼仔

博客首页: https://codeease.top

公众号:Java鱼仔

先跑起来

本文档基于 xxl-job V2.4.0 版本讲解,是写这一篇文章以来最新的一个 release 版本。

1、首先通过 git clone 命令将项目拿到本地

git clone git@github.com:xuxueli/xxl-job.git

2、将分支切换到2.4.0版本

3、在 doc/db 下有个 SQL 文件 tables_xxl_job.sql,在本地的数据库中执行这段 SQL 。

4、调整 xxl-job-admin 中的 application.properties,将 spring.datasource 修改为自己本地的配置。

5、启动 XxlJobAdminApplication ,并在浏览器上输入 http://localhost:8080/xxl-job-admin/ ,如果出现 xxl-job 的登陆界面则表示启动成功,默认账号密码为 admin/123456。

xxl-Job架构设计

项目的目录介绍:

- /doc :文档资料
- /db :“调度数据库”建表脚本
- /xxl-job-admin :管理平台模块,提供任务配置、任务调度管理和执行器管理等功能。
- /xxl-job-core :核心模块,包含任务调度的核心逻辑。
- /xxl-job-executor-samples :执行器,Sample示例项目

8张数据库表:

- xxl_job_lock:任务调度锁表;
- xxl_job_group:执行器信息表,维护任务执行器信息;
- xxl_job_info:调度扩展信息表: 用于保存XXL-JOB调度任务的扩展信息,如任务分组、任务名、机器地址、执行器、执行入参和报警邮件等等;
- xxl_job_log:调度日志表: 用于保存XXL-JOB任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等;
- xxl_job_log_report:调度日志报表:用户存储XXL-JOB任务调度日志的报表,调度中心报表功能页面会用到;
- xxl_job_logglue:任务GLUE日志:用于保存GLUE更新历史,用于支持GLUE的版本回溯功能;
- xxl_job_registry:执行器注册表,维护在线的执行器和调度中心机器地址信息;
- xxl_job_user:系统用户表;

设计思想

将调度行为抽象形成“调度中心”公共平台,而平台自身并不承担业务逻辑,“调度中心”负责发起调度请求。
将任务抽象成分散的JobHandler,交由“执行器”统一管理,“执行器”负责接收调度请求并执行对应的JobHandler中业务逻辑。
因此,“调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性;

系统组成

调度模块(调度中心):

负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;

支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover。

执行模块(执行器):

负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效;
接收“调度中心”的执行请求、终止请求和日志请求等。

架构图

xxl-job 源码解读

xxl-job 调度器的整体初始化流程

XxlJobAdminConfig

XxlJobAdminConfig 实现了 InitializingBean 接口,会在初始化时执行 afterPropertiesSet 方法。

@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {

    private static XxlJobAdminConfig adminConfig = null;
    public static XxlJobAdminConfig getAdminConfig() {
        return adminConfig;
    }


    // ---------------------- XxlJobScheduler ----------------------

    private XxlJobScheduler xxlJobScheduler;

    @Override
    public void afterPropertiesSet() throws Exception {
        adminConfig = this;

        xxlJobScheduler = new XxlJobScheduler();
        xxlJobScheduler.init();
    }

    @Override
    public void destroy() throws Exception {
        xxlJobScheduler.destroy();
    }
    ......
}

xxlJobScheduler.init()

init() 中有一系列的初始化方法,每个初始化方法干了什么,我已经写到上面的流程图中了基本的作用

public class XxlJobScheduler  {
    private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);


    public void init() throws Exception {
        // init i18n
        initI18n();

        // admin trigger pool start
        JobTriggerPoolHelper.toStart();

        // admin registry monitor run
        JobRegistryHelper.getInstance().start();

        // admin fail-monitor run
        JobFailMonitorHelper.getInstance().start();

        // admin lose-monitor run ( depend on JobTriggerPoolHelper )
        JobCompleteHelper.getInstance().start();

        // admin log report start
        JobLogReportHelper.getInstance().start();

        // start-schedule  ( depend on JobTriggerPoolHelper )
        JobScheduleHelper.getInstance().start();

        logger.info(">>>>>>>>> init xxl-job admin success.");
    }
	......
}

JobTriggerPoolHelper.toStart()

start 方法中定义了一个快线程池和慢线程池,快线程池的最大线程数量最小为200,慢线程池的最大线程数量最小为100。
快慢线程池的设计思想是为了解决任务执行器并发度控制的问题,让任务执行器在任务量较大时能够快速响应,同时在任务量较小时能够节省资源。具体来说,快线程池和慢线程池的作用如下:

快线程池:用于执行任务量较大的任务,线程数较多,执行速度较快,能够快速响应任务。

慢线程池:用于执行任务量较小的任务,线程数较少,执行速度较慢,能够节省资源。

public void start(){
    fastTriggerPool = new ThreadPoolExecutor(
        10,
        XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
        60L,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(1000),
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
            }
        });

    slowTriggerPool = new ThreadPoolExecutor(
        10,
        XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
        60L,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(2000),
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
            }
        });
}

JobRegistryHelper.getInstance().start()

创建一个线程池 registryOrRemoveThreadPool 用于注册或删除任务,创建一个后台守护线程 registryMonitorThread ,使用了一个死循环每隔30秒执行一次,删除超时的注册表信息,更新xxl_job_group执行器地址列表。

public void start(){

  // for registry or remove
  registryOrRemoveThreadPool = new ThreadPoolExecutor(
      2,
      10,
      30L,
      TimeUnit.SECONDS,
      new LinkedBlockingQueue<Runnable>(2000),
      new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
          return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
        }
      },
      new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
          r.run();
          logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
        }
      });

  // 创建一个守护线程 registryMonitorThread
  registryMonitorThread = new Thread(new Runnable() {
    @Override
    public void run() {
      while (!toStop) {
        try {
          // 查询执行器地址类型是自动注册的执行器信息表
          List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
          if (groupList!=null && !groupList.isEmpty()) {

            // 查找xxl_job_registry表中超时了90秒的注册信息
            List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
            // 如果超时 ids 集合不为空,则直接删除这些数据
            if (ids!=null && ids.size()>0) {
              XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
            }

            HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();

            // 查询所有未过期的注册信息,将注册类型为EXECUTOR的XxlJobRegistry集合改装成appname=>设置触发器的ip地址
            List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
            if (list != null) {
              for (XxlJobRegistry item: list) {
                if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
                  String appname = item.getRegistryKey();
                  List<String> registryList = appAddressMap.get(appname);
                  if (registryList == null) {
                    registryList = new ArrayList<String>();
                  }

                  if (!registryList.contains(item.getRegistryValue())) {
                    registryList.add(item.getRegistryValue());
                  }
                  appAddressMap.put(appname, registryList);
                }
              }
            }

            // 遍历 XxlJobGroup 列表,将 appname 对应的注册器 IP 地址转化为 IP1,IP2,IP3 的形式,
            // 并更新到xxl_job_group表中
            for (XxlJobGroup group: groupList) {
              List<String> registryList = appAddressMap.get(group.getAppname());
              String addressListStr = null;
              if (registryList!=null && !registryList.isEmpty()) {
                Collections.sort(registryList);
                StringBuilder addressListSB = new StringBuilder();
                for (String item:registryList) {
                  addressListSB.append(item).append(",");
                }
                addressListStr = addressListSB.toString();
                addressListStr = addressListStr.substring(0, addressListStr.length()-1);
              }
              group.setAddressList(addressListStr);
              group.setUpdateTime(new Date());

              XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
            }
          }
        } catch (Exception e) {
          if (!toStop) {
            logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
          }
        }
        try {
          // 每 30 秒执行一次
          TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
        } catch (InterruptedException e) {
          if (!toStop) {
            logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
          }
        }
      }
      logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
    }
  });
  registryMonitorThread.setDaemon(true);
  registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
  registryMonitorThread.start();
}

JobFailMonitorHelper.getInstance().start()

创建一个守护线程 monitorThread ,如果失败任务设置了重试机制,则触发重试流程,设置了告警策略,则会根据告警策略触发告警操作

public void start(){
    monitorThread = new Thread(new Runnable() {

        @Override
        public void run() {

            // monitor
            while (!toStop) {
                try {
                    // 查询前1000条执行器执行失败且告警状态是默认的 xxl_job_log 表的 id 集合
                    List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
                    if (failLogIds!=null && !failLogIds.isEmpty()) {
                        for (long failLogId: failLogIds) {

                            // 以乐观锁的方式将日志的告警状态设置为 -1
                            int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
                            if (lockRet < 1) {
                                continue;
                            }
                            XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
                            XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());

                            // 1、如果设置的失败重复次数大于0,则再次执行触发器,并更新日志信息
                            if (log.getExecutorFailRetryCount() > 0) {
                                JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
                                String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
                                log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
                                XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
                            }

                            int newAlarmStatus = 0;		// 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
                            // 2、如果报警器信息不为空,则触发报警操作,并更新告警状态
                            if (info != null) {
                                boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
                                newAlarmStatus = alarmResult?2:3;
                            } else {
                                newAlarmStatus = 1;
                            }

                            XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
                        }
                    }

                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
                    }
                }

                try {
                    // 每10ms需要执行一次
                    TimeUnit.SECONDS.sleep(10);
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }

            }

            logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");

        }
    });
    monitorThread.setDaemon(true);
    monitorThread.setName("xxl-job, admin JobFailMonitorHelper");
    monitorThread.start();
}

JobCompleteHelper.getInstance().start()

创建一个回调线程池 callbackThreadPool 和一个守护线程monitorThread,守护线程负责将调度记录停留在 “运行中” 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;

public void start(){

    // for callback
    callbackThreadPool = new ThreadPoolExecutor(
            2,
            20,
            30L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(3000),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode());
                }
            },
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    r.run();
                    logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now).");
                }
            });


    // for monitor
    monitorThread = new Thread(new Runnable() {

        @Override
        public void run() {

            // wait for JobTriggerPoolHelper-init
            try {
                TimeUnit.MILLISECONDS.sleep(50);
            } catch (InterruptedException e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }

            // monitor
            while (!toStop) {
                try {
                    // 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
                    Date losedTime = DateUtil.addMinutes(new Date(), -10);
                    List<Long> losedJobIds  = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);

                    if (losedJobIds!=null && losedJobIds.size()>0) {
                        for (Long logId: losedJobIds) {

                            XxlJobLog jobLog = new XxlJobLog();
                            jobLog.setId(logId);

                            jobLog.setHandleTime(new Date());
                            jobLog.setHandleCode(ReturnT.FAIL_CODE);
                            jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );

                            XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
                        }

                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
                    }
                }

                try {
                    TimeUnit.SECONDS.sleep(60);
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }

            }

            logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");

        }
    });
    monitorThread.setDaemon(true);
    monitorThread.setName("xxl-job, admin JobLosedMonitorHelper");
    monitorThread.start();
}

JobLogReportHelper.getInstance().start()

每分钟刷新一次日志报告,包括当天的运行情况,包括运行次数、成功次数、失败次数等,并将这些信息保存到数据库中。
每天执行一次日志清理,删除指定天数前的日志数据。

public void start(){
    logrThread = new Thread(new Runnable() {

        @Override
        public void run() {

            // 最近一次清理日志的时间
            long lastCleanLogTime = 0;


            while (!toStop) {

                // 1、log-report refresh: refresh log report in 3 days
                try {
                    // 分别统计今天、昨天、前天 0到24点的 总调用次数、成功调用次数、失败调用次数以及正在运行的次数
                    for (int i = 0; i < 3; i++) {

                        // today
                        Calendar itemDay = Calendar.getInstance();
                        itemDay.add(Calendar.DAY_OF_MONTH, -i);
                        itemDay.set(Calendar.HOUR_OF_DAY, 0);
                        itemDay.set(Calendar.MINUTE, 0);
                        itemDay.set(Calendar.SECOND, 0);
                        itemDay.set(Calendar.MILLISECOND, 0);

                        Date todayFrom = itemDay.getTime();

                        itemDay.set(Calendar.HOUR_OF_DAY, 23);
                        itemDay.set(Calendar.MINUTE, 59);
                        itemDay.set(Calendar.SECOND, 59);
                        itemDay.set(Calendar.MILLISECOND, 999);

                        Date todayTo = itemDay.getTime();

                        // refresh log-report every minute
                        XxlJobLogReport xxlJobLogReport = new XxlJobLogReport();
                        xxlJobLogReport.setTriggerDay(todayFrom);
                        xxlJobLogReport.setRunningCount(0);
                        xxlJobLogReport.setSucCount(0);
                        xxlJobLogReport.setFailCount(0);

                        Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);
                        if (triggerCountMap!=null && triggerCountMap.size()>0) {
                            int triggerDayCount = triggerCountMap.containsKey("triggerDayCount")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))):0;
                            int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))):0;
                            int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))):0;
                            int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;

                            xxlJobLogReport.setRunningCount(triggerDayCountRunning);
                            xxlJobLogReport.setSucCount(triggerDayCountSuc);
                            xxlJobLogReport.setFailCount(triggerDayCountFail);
                        }

                        // 数据当天数据存在就更新,不存在就新建
                        int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
                        if (ret < 1) {
                            XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);
                        }
                    }

                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(">>>>>>>>>>> xxl-job, job log report thread error:{}", e);
                    }
                }

                // 如果设置了日志保留时间且最近24小时内没有清理过日志,则进入if语句内
                if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays()>0
                        && System.currentTimeMillis() - lastCleanLogTime > 24*60*60*1000) {

                    // 根据日志保留时间计算日志过期的时间
                    Calendar expiredDay = Calendar.getInstance();
                    expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays());
                    expiredDay.set(Calendar.HOUR_OF_DAY, 0);
                    expiredDay.set(Calendar.MINUTE, 0);
                    expiredDay.set(Calendar.SECOND, 0);
                    expiredDay.set(Calendar.MILLISECOND, 0);
                    Date clearBeforeTime = expiredDay.getTime();

                    // 清除过期的日志
                    List<Long> logIds = null;
                    do {
                        logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000);
                        if (logIds!=null && logIds.size()>0) {
                            XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);
                        }
                    } while (logIds!=null && logIds.size()>0);

                    // 更新最近一次清除日志的时间,最近24小时不再执行清理操作
                    lastCleanLogTime = System.currentTimeMillis();
                }

                try {
                    // 每分钟执行一次
                    TimeUnit.MINUTES.sleep(1);
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }

            }

            logger.info(">>>>>>>>>>> xxl-job, job log report thread stop");

        }
    });
    logrThread.setDaemon(true);
    logrThread.setName("xxl-job, admin JobLogReportHelper");
    logrThread.start();
}

JobScheduleHelper.getInstance().start()

在这个类中,定义了两个线程,第一个线程是定时任务扫描处理线程,第二个线程则是一个时间轮线程。

scheduleThread线程中做的事情是将JobInfo中即将要执行的任务取出来(5秒的预读时间),然后根据三种不同的情况分别进行处理:

  1. 如果当前时间大于任务触发时间+5秒,说明这个任务漏触发,根据触发漏发策略决定是否执行任务。
  2. 如果当前时间大于任务的触发时间且小于触发时间+5秒,触发任务,计算下一次任务执行时间,如果下一次任务执行时间在五秒内,则放入时间轮。
  3. 其他情况,任务还没到时间触发,则放入时间轮中。

ringThread线程将时间轮中的任务按秒触发。

   public void start(){

        // schedule thread
        scheduleThread = new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                    // 使线程的执行时间对齐到整秒
                    TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
                } catch (InterruptedException e) {
                    if (!scheduleThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
                logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
                // 计算预读的任务数,这个数量由参数控制
                // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
                int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

                while (!scheduleThreadToStop) {

                    // Scan Job
                    long start = System.currentTimeMillis();

                    Connection conn = null;
                    Boolean connAutoCommit = null;
                    PreparedStatement preparedStatement = null;

                    boolean preReadSuc = true;
                    try {
                        // 获取连接,设置自动提交为False
                        conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                        connAutoCommit = conn.getAutoCommit();
                        conn.setAutoCommit(false);
                        // 通过for update语句锁住行,如果集群部署情况下同一时间只有一个线程可以执行后续的操作
                        preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                        preparedStatement.execute();

                        // tx start

                        // 1、pre read
                        long nowTime = System.currentTimeMillis();
                        // 将当前时间+5秒要触发的任务全部读取出来
                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                        if (scheduleList!=null && scheduleList.size()>0) {
                            // 2、push time-ring
                            for (XxlJobInfo jobInfo: scheduleList) {
                                // 如果当前时间大于任务触发时间+5秒,则进入第一个IF,说明任务到期但未触发,判断MisfireStrategy是立刻执行(FIRE_ONCE_NOW)
                                // 还是不做任何事情(DO_NOTHING),如果是FIRE_ONCE_NOW则立刻触发一次
                                // 然后刷新下一次的触发时间
                                // time-ring jump
                                if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                    // 2.1、trigger-expire > 5s:pass && make next-trigger-time
                                    logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

                                    // 1、misfire match
                                    MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                    if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                        // FIRE_ONCE_NOW 》 trigger
                                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                        logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                                    }

                                    // 2、fresh next
                                    refreshNextValidTime(jobInfo, new Date());

                                } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                    // 如果当前时间大于任务的触发时间且小于触发时间+5秒,进入这个代码模块中
                                    // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time

                                    // 直接触发一次任务
                                    // 1、trigger
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

                                    // 刷新下一次触发时间
                                    // 2、fresh next
                                    refreshNextValidTime(jobInfo, new Date());

                                    // 如果下一次触发时间在5秒内,则把任务放入时间轮中
                                    // next-trigger-time in 5s, pre-read again
                                    if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

                                        // 1、make ring second
                                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                                        // 2、push time ring
                                        pushTimeRing(ringSecond, jobInfo.getId());

                                        // 3、fresh next
                                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                                    }

                                } else {
                                    // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
                                    // 除了上面两种情况,其他情况都加入到时间轮中
                                    // 1、make ring second
                                    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                                    // 2、push time ring
                                    pushTimeRing(ringSecond, jobInfo.getId());

                                    // 3、fresh next
                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                                }

                            }
                            // 更新任务触发情况
                            // 3、update trigger info
                            for (XxlJobInfo jobInfo: scheduleList) {
                                XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                            }

                        } else {
                            preReadSuc = false;
                        }

                        // tx stop


                    } catch (Exception e) {
                        if (!scheduleThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                        }
                    } finally {

                        // commit
                        if (conn != null) {
                            try {
                                // 将事务提交
                                conn.commit();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            try {
                                conn.setAutoCommit(connAutoCommit);
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            try {
                                conn.close();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }

                        // close PreparedStatement
                        if (null != preparedStatement) {
                            try {
                                preparedStatement.close();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }
                    }
                    long cost = System.currentTimeMillis()-start;

                    // 如果花费时间小于1s,则等待1s后
                    // Wait seconds, align second
                    if (cost < 1000) {  // scan-overtime, not wait
                        try {
                            // pre-read period: success > scan each second; fail > skip this period;
                            TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
                        } catch (InterruptedException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }

                }

                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
            }
        });
        scheduleThread.setDaemon(true);
        scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
        scheduleThread.start();

        // 时间轮线程
        // ring thread
        ringThread = new Thread(new Runnable() {
            @Override
            public void run() {

                while (!ringThreadToStop) {

                    // align second
                    try {
                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                    } catch (InterruptedException e) {
                        if (!ringThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                    try {
                        // 每次从时间轮中取出当前秒和前一秒的数据进行处理
                        // second data
                        List<Integer> ringItemData = new ArrayList<>();
                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
                        for (int i = 0; i < 2; i++) {
                            List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                            if (tmpData != null) {
                                ringItemData.addAll(tmpData);
                            }
                        }

                        // ring trigger
                        logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                        if (ringItemData.size() > 0) {
                            // do trigger
                            for (int jobId: ringItemData) {
                                // do trigger
                                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                            }
                            // clear
                            ringItemData.clear();
                        }
                    } catch (Exception e) {
                        if (!ringThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
            }
        });
        ringThread.setDaemon(true);
        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
        ringThread.start();
    }

总结

本节主要针对调度器的初始化进行了源码的解读,这里面包含了大量对线程的应用,以及一个时间轮的设计思路,还是很值得学习的。

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐