一、xxl-job 2.1.2版本调度器启动流程
大家好,我是瓜哥,最近公司由于新接手了一个大型项目,使用到了批量调度任务的需求,通过各种调度开源组件的选型,最终打算使用xxl-job来进行任务调度,本次主要对2.1.2源码进行流程梳理。由于当前项目调度任务需求的特殊要求,需要对xxl-job源码进行二次修改和开发完善。本人将官方源码下载下来后做了相关的流程梳理,下面主要对调度器启动流程做一个完整的总结。
Xxl-job笔记整理
1、xxl-job-admin在启动时spring会扫描到XxlJobAdminConfig类,该类的主要代码如下:
由于实现了spring的InitializingBean和DisposableBean两个接口。InitializingBean接口为bean提供了初始化方法的方式,它只包括afterPropertiesSet方法,凡是继承该接口的类,在初始化bean的时候都会执行该方法。
@Component public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
private static XxlJobAdminConfig adminConfig = null; public static XxlJobAdminConfig getAdminConfig() { return adminConfig; } // ---------------------- XxlJobScheduler ---------------------- private XxlJobScheduler xxlJobScheduler; @Override public void afterPropertiesSet() throws Exception { System.out.println("开始执行afterPropertiesSet"); adminConfig = this; xxlJobScheduler = new XxlJobScheduler(); System.out.println("开始执行xxlJobScheduler.init()"); xxlJobScheduler.init(); }
@Override public void destroy() throws Exception { xxlJobScheduler.destroy(); } // ---------------------- XxlJobScheduler ----------------------
// conf @Value("${xxl.job.i18n}") private String i18n;
@Value("${xxl.job.accessToken}") private String accessToken;
@Value("${spring.mail.username}") private String emailUserName;
@Value("${xxl.job.triggerpool.fast.max}") private int triggerPoolFastMax; @Value("${xxl.job.triggerpool.slow.max}") private int triggerPoolSlowMax; @Value("${xxl.job.logretentiondays}") private int logretentiondays; // dao, service @Resource private XxlJobLogDao xxlJobLogDao; @Resource private XxlJobInfoDao xxlJobInfoDao; @Resource private XxlJobRegistryDao xxlJobRegistryDao; @Resource private XxlJobGroupDao xxlJobGroupDao; @Resource private XxlJobLogReportDao xxlJobLogReportDao; @Resource private JavaMailSender mailSender; @Resource private DataSource dataSource;
|
2、然后在afterPropertiesSet()中调用xxlJobScheduler.init()方法,下面是init方法的代码;
public class XxlJobScheduler { private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class); public void init() throws Exception { // init i18n initI18n(); // admin registry monitor run JobRegistryMonitorHelper.getInstance().start(); // admin monitor run JobFailMonitorHelper.getInstance().start();
// admin trigger pool start JobTriggerPoolHelper.toStart();
// admin log report start JobLogReportHelper.getInstance().start();
// start-schedule // JobScheduleHelper调度器,死循环,在xxl_job_info表里取将要执行的任务,更新下次执行时间的,调用JobTriggerPoolHelper类,来给执行器发送调度任务的 JobScheduleHelper.getInstance().start();
logger.info(">>>>>>>>> init xxl-job admin success."); }
public void destroy() throws Exception {
// stop-schedule JobScheduleHelper.getInstance().toStop(); // admin log report stop JobLogReportHelper.getInstance().toStop(); // admin trigger pool stop JobTriggerPoolHelper.toStop(); // admin monitor stop JobFailMonitorHelper.getInstance().toStop(); // admin registry stop JobRegistryMonitorHelper.getInstance().toStop(); } // ---------------------- I18n ---------------------- private void initI18n(){ for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum.values()) { item.setTitle(I18nUtil.getString("jobconf_block_".concat(item.name()))); } } // ---------------------- executor-client ---------------------- private static ConcurrentMap<String, ExecutorBiz> executorBizRepository = new ConcurrentHashMap<String, ExecutorBiz>(); public static ExecutorBiz getExecutorBiz(String address) throws Exception { // valid if (address==null || address.trim().length()==0) { return null; }
// load-cache address = address.trim(); ExecutorBiz executorBiz = executorBizRepository.get(address); if (executorBiz != null) { return executorBiz; }
// set-cache XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean(); referenceBean.setClient(NettyHttpClient.class); referenceBean.setSerializer(HessianSerializer.class); referenceBean.setCallType(CallType.SYNC); referenceBean.setLoadBalance(LoadBalance.ROUND); referenceBean.setIface(ExecutorBiz.class); referenceBean.setVersion(null); referenceBean.setTimeout(3000); referenceBean.setAddress(address); referenceBean.setAccessToken(XxlJobAdminConfig.getAdminConfig().getAccessToken()); referenceBean.setInvokeCallback(null); referenceBean.setInvokerFactory(null);
executorBiz = (ExecutorBiz) referenceBean.getObject();
executorBizRepository.put(address, executorBiz); return executorBiz; } } |
A、JobRegistryMonitorHelper.getInstance().start();会启动注册线程,该线程start方法主要作用是开通了一个守护线程,每隔30s扫描一次执行器的注册信息表,剔除90s内没有进行健康检查的执行器信息,将自动注册类型的执行器注册信息(XxlJobRegistry)经过处理更新执行器信息(XxlJobGroup)。具体代码如下:
public class JobRegistryMonitorHelper { private static Logger logger = LoggerFactory.getLogger(JobRegistryMonitorHelper.class); private static JobRegistryMonitorHelper instance = new JobRegistryMonitorHelper(); public static JobRegistryMonitorHelper getInstance(){ return instance; } private Thread registryThread; private volatile boolean toStop = false; public void start(){ registryThread = new Thread(new Runnable() { @Override public void run() { while (!toStop) { try { // auto registry group //获取类型为自动注册的执行器(XxlJobGroup)地址列表 List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0); if (groupList!=null && !groupList.isEmpty()) {
// remove dead address (admin/executor) //删除90秒内没有更新的注册机器信息,90秒没有心跳信息返回代表机器已经出现问题,所以移除 List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date()); if (ids!=null && ids.size()>0) { XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids); }
// fresh online address (admin/executor) 查询90秒内有更新的注册机器信息列表 HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>(); List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date()); if (list != null) {//遍历注册信息列表,得到自动注册类型的执行器与其对应的地址信息关系Map for (XxlJobRegistry item: list) { if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) { String appName = item.getRegistryKey(); List<String> registryList = appAddressMap.get(appName); if (registryList == null) { registryList = new ArrayList<String>(); }
if (!registryList.contains(item.getRegistryValue())) { registryList.add(item.getRegistryValue()); } //收集执行器信息,根据执行器appName做区分:appAddressMap的key为appName,value为此执行器的注册地址列表(集群环境下会有多个注册地址) appAddressMap.put(appName, registryList); } } }
// fresh group address for (XxlJobGroup group: groupList) { List<String> registryList = appAddressMap.get(group.getAppName()); String addressListStr = null; if (registryList!=null && !registryList.isEmpty()) { Collections.sort(registryList); addressListStr = ""; for (String item:registryList) { addressListStr += item + ","; } addressListStr = addressListStr.substring(0, addressListStr.length()-1); } group.setAddressList(addressListStr); XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group); } } } catch (Exception e) { if (!toStop) { logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e); } } try {//线程停顿30秒 TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); } catch (InterruptedException e) { if (!toStop) { logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e); } } } logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop"); } }); //将此线程设置成守护线程 registryThread.setDaemon(true); registryThread.setName("xxl-job, admin JobRegistryMonitorHelper"); registryThread.start(); }
public void toStop(){ toStop = true; // interrupt and wait registryThread.interrupt(); try { registryThread.join(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } }
} |
所以总结上面代码最终执行的作用就是:
- 删除无效注册地址,服务自动下线
- 刷新有效注册地址,服务自动上线
- 刷新执行器里的注册地址,这样在远程调用 job 的时候就可以调用到最新的有效的服务
B、JobFailMonitorHelper.getInstance().start();该线程这里主要作用是开通了一个守护线程,每隔10s扫描一次失败日志,如果任务失败可重试次数>0,那么重新触发任务,如果任务执行失败,会进行告警,默认采用邮件形式进行告警。核心代码如下:
public class JobFailMonitorHelper { private static Logger logger = LoggerFactory.getLogger(JobFailMonitorHelper.class); private static JobFailMonitorHelper instance = new JobFailMonitorHelper(); public static JobFailMonitorHelper getInstance(){ return instance; } // ---------------------- monitor ---------------------- private Thread monitorThread; private volatile boolean toStop = false; public void start(){ //创建一个监控线程 monitorThread = new Thread(new Runnable() { @Override public void run() {
// monitor while (!toStop) { try { //从数据库中取出所有执行失败且告警状态为0(默认)的日志ID,虽然这里传了pageSize为1000,实际没有进行分页,取出来的是所有符合条件的失败日志 List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000); if (failLogIds!=null && !failLogIds.isEmpty()) { for (long failLogId: failLogIds) { //将日志的告警状态更新为-1 //告警状态:0-默认、1-无需告警、2-告警成功、3-告警失败 // lock log int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1); if (lockRet < 1) { continue; } //取出失败日志完整信息 XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId); //获取失败日志对应的任务信息 XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
// 1、fail retry monitor // 如果剩余失败可重试次数>0(注意:日志里存的失败重试次数实为剩余可重复次数) if (log.getExecutorFailRetryCount() > 0) { //触发任务执行 JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam()); String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>"; log.setTriggerMsg(log.getTriggerMsg() + retryMsg); XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log); }
// 2、fail alarm monitor int newAlarmStatus = 0; // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败 if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) { boolean alarmResult = true; try { //发送失败告警:默认是邮件通知 alarmResult = failAlarm(info, log); } catch (Exception e) { alarmResult = false; logger.error(e.getMessage(), e); } newAlarmStatus = alarmResult?2:3; } else { newAlarmStatus = 1; } //更新告警状态 XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus); } }
} catch (Exception e) { if (!toStop) { logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e); } }
try { TimeUnit.SECONDS.sleep(10); } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } }
}
logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");
} }); //设置为守护线程 monitorThread.setDaemon(true); monitorThread.setName("xxl-job, admin JobFailMonitorHelper"); monitorThread.start(); }
|
obFailMonitorHelper 的作用是开启一个 Daemon 线程主要管理异常调用的任务。如果任务调用失败,它会判断这个任务是否配置了失败重试(默认不重试)。如果配置重试次数大于0就会执行失败重试,如果一直失败最大重试次数就是你配置的重试次数。
另外一个作用就是如果你这个任务里面配置了报警邮件信息。这个失败的任务就是发送任务失败的相关信息到配置的邮箱。
C、JobTriggerPoolHelper.toStart();该类中区别快线程池 和慢线程池,1分钟窗口期内任务耗时达500ms超过10次,该窗口期内判定为慢任务,慢任务自动降级进入"Slow"线程池,如下面代码:
public class JobTriggerPoolHelper { private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class); // ---------------------- trigger pool ---------------------- // fast/slow thread pool private ThreadPoolExecutor fastTriggerPool = null; private ThreadPoolExecutor slowTriggerPool = null; public void start(){ logger.info("-----开始执行JobTriggerPoolHelper-------start()"); logger.info("-----开始执行JobTriggerPoolHelper-------fastTriggerPool new ThreadPoolExecutor()"); fastTriggerPool = new ThreadPoolExecutor( 10, XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode()); } }); logger.info("-----开始执行JobTriggerPoolHelper-------slowTriggerPool new ThreadPoolExecutor()"); slowTriggerPool = new ThreadPoolExecutor( 10, XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode()); } }); } |
JobTriggerPoolHelper它的作用就是初始化快慢两个线程池,当任务开始执行的时候使用快线程池。然后当任务执行失败次数达到阈值的时候就会使用慢线程池来执行任务。使执行任务的线程池隔离:调度线程池进行隔离拆分,慢任务自动降级进入"Slow"线程池,避免耗尽调度线程,提高系统稳定性。
D、JobLogReportHelper.getInstance().start();主要是报表相关的功能,保存任务执行的日志。并且生成日结报表,并且清理过期日志。支持实时查看运行数据,如任务数量、调度次数、执行器数量等;以及调度报表,如调度日期分布图,调度成功分布图等。代码如下:
public class JobLogReportHelper { private static Logger logger = LoggerFactory.getLogger(JobLogReportHelper.class); private static JobLogReportHelper instance = new JobLogReportHelper(); public static JobLogReportHelper getInstance(){ return instance; } private Thread logrThread; private volatile boolean toStop = false; public void start(){ logrThread = new Thread(new Runnable() { @Override public void run() { // last clean log time long lastCleanLogTime = 0; while (!toStop) {
// 1、log-report refresh: refresh log report in 3 days try {
for (int i = 0; i < 3; i++) {
// today Calendar itemDay = Calendar.getInstance(); itemDay.add(Calendar.DAY_OF_MONTH, -i); itemDay.set(Calendar.HOUR_OF_DAY, 0); itemDay.set(Calendar.MINUTE, 0); itemDay.set(Calendar.SECOND, 0); itemDay.set(Calendar.MILLISECOND, 0);
Date todayFrom = itemDay.getTime();
itemDay.set(Calendar.HOUR_OF_DAY, 23); itemDay.set(Calendar.MINUTE, 59); itemDay.set(Calendar.SECOND, 59); itemDay.set(Calendar.MILLISECOND, 999);
Date todayTo = itemDay.getTime();
// refresh log-report every minute XxlJobLogReport xxlJobLogReport = new XxlJobLogReport(); xxlJobLogReport.setTriggerDay(todayFrom); xxlJobLogReport.setRunningCount(0); xxlJobLogReport.setSucCount(0); xxlJobLogReport.setFailCount(0);
Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo); if (triggerCountMap!=null && triggerCountMap.size()>0) { int triggerDayCount = triggerCountMap.containsKey("triggerDayCount")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))):0; int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))):0; int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))):0; int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;
xxlJobLogReport.setRunningCount(triggerDayCountRunning); xxlJobLogReport.setSucCount(triggerDayCountSuc); xxlJobLogReport.setFailCount(triggerDayCountFail); }
// do refresh int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport); if (ret < 1) { XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport); } }
} catch (Exception e) { if (!toStop) { logger.error(">>>>>>>>>>> xxl-job, job log report thread error:{}", e); } }
// 2、log-clean: switch open & once each day if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays()>0 && System.currentTimeMillis() - lastCleanLogTime > 24*60*60*1000) {
// expire-time Calendar expiredDay = Calendar.getInstance(); expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays()); expiredDay.set(Calendar.HOUR_OF_DAY, 0); expiredDay.set(Calendar.MINUTE, 0); expiredDay.set(Calendar.SECOND, 0); expiredDay.set(Calendar.MILLISECOND, 0); Date clearBeforeTime = expiredDay.getTime();
// clean expired log List<Long> logIds = null; do { logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000); if (logIds!=null && logIds.size()>0) { XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds); } } while (logIds!=null && logIds.size()>0);
// update clean time lastCleanLogTime = System.currentTimeMillis(); }
try { TimeUnit.MINUTES.sleep(1); } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } }
}
logger.info(">>>>>>>>>>> xxl-job, job log report thread stop");
} }); logrThread.setDaemon(true); logrThread.setName("xxl-job, admin JobLogReportHelper"); logrThread.start(); }
public void toStop(){ toStop = true; // interrupt and wait logrThread.interrupt(); try { logrThread.join(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } } }
|
E、JobScheduleHelper.getInstance().start();JobScheduleHelper调度器,死循环,在xxl_job_info表里取将要执行的任务,更新下次执行时间的,调用JobTriggerPoolHelper类,来给执行器发送调度任务的。代码如下:
//这个类就是死循环从xxl_job_info表中取出未来5秒内要执行的任务,进行调度分发。 //我们可以发现调度中心由两个线程完成,第一个线程不停的取最近10s钟待开始的任务, //把任务放入时间轮中,第二个线程从时间轮中获取需要开始的任务,开始执行任务。 public class JobScheduleHelper { private static Logger logger = LoggerFactory.getLogger(JobScheduleHelper.class);
private static JobScheduleHelper instance = new JobScheduleHelper(); public static JobScheduleHelper getInstance(){ return instance; }
public static final long PRE_READ_MS = 5000; // pre read
private Thread scheduleThread; private Thread ringThread; private volatile boolean scheduleThreadToStop = false; private volatile boolean ringThreadToStop = false; private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
public void start(){ logger.info("JobScheduleHelper start开始执行---------!"); // schedule thread scheduleThread = new Thread(new Runnable() { @Override public void run() { logger.info("JobScheduleHelper 开始执行线程---------!"); try { //针对多个节点,防止调度器出现并发调度一个任务,调度器执行频率控制,最长睡眠5秒,最短4秒多点,即(5000-999)毫秒 //如A节点的调度器刚启动,并且获取一个任务,然后加锁,如果同时B节点也启动,也获取到这个任务,防止重复调用,随机睡眠 TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 ); } catch (InterruptedException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } logger.info("------------- init xxl-job admin scheduler success.");
// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20) int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20; logger.info("---------------- scheduleThreadToStop="+scheduleThreadToStop); while (!scheduleThreadToStop) {
// Scan Job long start = System.currentTimeMillis();
Connection conn = null; Boolean connAutoCommit = null; PreparedStatement preparedStatement = null;
boolean preReadSuc = true; try {
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection(); connAutoCommit = conn.getAutoCommit(); conn.setAutoCommit(false); //锁定资源 preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" ); preparedStatement.execute();
// tx start
// 1、pre read long nowTime = System.currentTimeMillis(); //获取5秒内状态为正常运行的任务 List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount); if (scheduleList!=null && scheduleList.size()>0) { // 2、push time-ring for (XxlJobInfo jobInfo: scheduleList) {
//这里的触发时间处理,用一个时间横轴就比较好理解 //当前时间已经超过触发时间+5秒,不调度,直接计算下次调度时间,对应时间段A // time-ring jump if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) { // 2.1、trigger-expire > 5s:pass && make next-trigger-time logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
// fresh next 刷新下次执行时间 refreshNextValidTime(jobInfo, new Date()); //当前时间已超过调度时间,但超过时间在5秒内,直接触发调度,并且更新下次调度时间,即触发时间在当前时间前5秒内 //对应时间段B } else if (nowTime > jobInfo.getTriggerNextTime()) { // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time //将任务放到触发线程池中 // 1、trigger JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null); logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2、fresh next refreshNextValidTime(jobInfo, new Date()); //如果下次触发时间在当前时间之后的5秒内,并且将这个时间段的任务单独放在ringThread线程中处理,即触发时间在当前时间的后5秒内 //特别处理当前时间之后的5秒内,是因为本循环最长5秒循环一次,防止有漏掉的定时任务,对应时间段D //注:scheduleThread和ringThread两个线程的执行评率不一样 // next-trigger-time in 5s, pre-read again if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1、make ring second //此处计算出的ringSecond的值范围是0-59 //对触发时间秒数进行60取模,跟进pushTimeRing方法 int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60); //将job放入ringThread线程 // 2、push time ring pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
} else { // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
// 1、make ring second int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
}
// 3、update trigger info for (XxlJobInfo jobInfo: scheduleList) { XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo); }
} else { preReadSuc = false; }
// tx stop
} catch (Exception e) { if (!scheduleThreadToStop) { logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e); } } finally {
// commit if (conn != null) { try { conn.commit(); } catch (SQLException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } try { conn.setAutoCommit(connAutoCommit); } catch (SQLException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } try { conn.close(); } catch (SQLException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } }
// close PreparedStatement if (null != preparedStatement) { try { preparedStatement.close(); } catch (SQLException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } } } long cost = System.currentTimeMillis()-start;
//preReadSuc 有5秒内正常运行的任务,则睡眠一秒以内,没有则睡眠5-(0至999)秒 // pre-read period: success > scan each second; fail > skip this period; // Wait seconds, align second if (cost < 1000) { // scan-overtime, not wait try { // pre-read period: success > scan each second; fail > skip this period; TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000); } catch (InterruptedException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } }
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop"); } }); scheduleThread.setDaemon(true); scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread"); scheduleThread.start();
//此线程是处理当前时间以后在每秒时是否有定时任务,有则直接启动, // ring thread ringThread = new Thread(new Runnable() { @Override public void run() {
// align second try { TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 ); } catch (InterruptedException e) { if (!ringThreadToStop) { logger.error(e.getMessage(), e); } }
while (!ringThreadToStop) {
try { // second data List<Integer> ringItemData = new ArrayList<>(); int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度; for (int i = 0; i < 2; i++) { List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 ); if (tmpData != null) { ringItemData.addAll(tmpData); } }
// ring trigger logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) ); if (ringItemData.size() > 0) { // do trigger for (int jobId: ringItemData) { // do trigger JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null); } // clear ringItemData.clear(); } } catch (Exception e) { if (!ringThreadToStop) { logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e); } }
// next second, align second try { TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000); } catch (InterruptedException e) { if (!ringThreadToStop) { logger.error(e.getMessage(), e); } } } logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop"); } }); ringThread.setDaemon(true); ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread"); ringThread.start(); }
private void refreshNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws ParseException { Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(fromTime); if (nextValidTime != null) { jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime()); jobInfo.setTriggerNextTime(nextValidTime.getTime()); } else { jobInfo.setTriggerStatus(0); jobInfo.setTriggerLastTime(0); jobInfo.setTriggerNextTime(0); } }
private void pushTimeRing(int ringSecond, int jobId){ // push async ring List<Integer> ringItemData = ringData.get(ringSecond); if (ringItemData == null) { ringItemData = new ArrayList<Integer>(); ringData.put(ringSecond, ringItemData); } ringItemData.add(jobId);
logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) ); }
public void toStop(){
// 1、stop schedule scheduleThreadToStop = true; try { TimeUnit.SECONDS.sleep(1); // wait } catch (InterruptedException e) { logger.error(e.getMessage(), e); } if (scheduleThread.getState() != Thread.State.TERMINATED){ // interrupt and wait scheduleThread.interrupt(); try { scheduleThread.join(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } }
// if has ring data boolean hasRingData = false; if (!ringData.isEmpty()) { for (int second : ringData.keySet()) { List<Integer> tmpData = ringData.get(second); if (tmpData!=null && tmpData.size()>0) { hasRingData = true; break; } } } if (hasRingData) { try { TimeUnit.SECONDS.sleep(8); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } }
// stop ring (wait job-in-memory stop) ringThreadToStop = true; try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } if (ringThread.getState() != Thread.State.TERMINATED){ // interrupt and wait ringThread.interrupt(); try { ringThread.join(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } }
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper stop"); }
}
|
更多推荐
所有评论(0)