分布式任务调度平台XXL-JOB--源码解析四:xxl-job-admin调度中心源码解析之job调度过程,调度策略以及rpc通信
admin调度中心的对于job的调度过程, 调度策略以及rpc通信
1.1 job调度过程
RemoteHttpJobBean类实现了QuartzJobBean, 当cron时间片到达时, 就会触发一次quartz调用, 回调executeInternal()方法, 而XxlJobTrigger.trigger(jobId)具体实现细节就是进行了一次网络请求.
// 当cron时间片到时, 就会调用RemoteHttpJobBean的executeInternal触发一次网络请求给执行器
// @DisallowConcurrentExecution
public class RemoteHttpJobBean extends QuartzJobBean {
private static Logger logger = LoggerFactory.getLogger(RemoteHttpJobBean.class);
// 1 当quartz cron执行周期到达时, 就回调executeInternal()方法
// 2 当点击"执行"时, 也会触发executeInternal()回调方法
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
// load jobId
JobKey jobKey = context.getTrigger().getJobKey();
// 从xxl-job-qrtz-job-detail表中获取, jobname的值, 即为jobId
Integer jobId = Integer.valueOf(jobKey.getName());
// 调度中心触发, 发起一次rpc网络请求executor执行
// trigger
XxlJobTrigger.trigger(jobId);
}
}
1.2 XxlJobTrigger.trigger()分析
该类的trigger()方法做的事情比较多, 所以代码量相对比较多一点, 咱们就来一行一行解析, 每段代码就做了什么事情.
① XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId); // 该jobid就是quartz框架触发一次调度时生成的, 这个jobid指的就是具体的哪一个job被执行.所以这行代码根据该jobId从xxl-job-qrtz-trigger-info表中查询本次job执行的一些参数, 这些参数就是在前台页面上新增任务时的一些参数, 如下图:比如路由策略, 运行模式, 阻塞处理策略, cron表达式等, 保存时, 就向该job保存到了xxl-job-qrtz-trigger-info表, 并生成了一个jobId, 当cron时间片到达时, 就会传递该jobId, 触发一次任务调度.
② XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup());// 根据配置, 获取该类型的执行器信息, 根据jobInfo中的job_group字段值, 找到该类型执行器所属的组
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(),
ExecutorBlockStrategyEnum.SERIAL_EXECUTION);// 根据配置, 匹配运行模式, 默认单机串行
ExecutorFailStrategyEnum failStrategy = ExecutorFailStrategyEnum.match(jobInfo.getExecutorFailStrategy(),
ExecutorFailStrategyEnum.FAIL_ALARM); //根据配置, 匹配失败后的处理模式, 默认失败告警
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(),// 根据配置, 设置路由的策略, 获取路由策略, 必填, 没有默认值, 根据配置获取路由策略
ArrayList<String> addressList = (ArrayList<String>) group.getRegistryList();// 获取该执行器的集群机器列表
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum
&& CollectionUtils.isNotEmpty(addressList)) // 判断是否需要惊醒分片广播, 即使, 是否本次任务是否每一个executor执行器都执行一遍本次任务
这里分为了两种情况, 如果进行分片广播的话, 就省去了选择路由策略这一步, 否则下面就得需要根据前端的配置项, 选择路由策略.
首先分析, 需要进行分片广播的形式:
接着, 再分析不是分片广播的情况:其实无非就是多了一个路由策略选择的过程, 如下:
1.3 执行调度时的路由策略选择分析
目前xxl-job支持, 第一个、最后一个、轮询、随机、一致性HASH、最不经常使用、最近最久未使用、故障转移、忙碌转移等这几种路由策略, 以下就是调度时的路由策略类图, 从图中可以看出是一种典型的 策略模式
以上就是XxlJobTrigger类的trigger方法, 做的一些事情以及实现方式, 下面分析当执行分片调度时, 执行的runExecutor()方法, 和不执行分片策略, 而根据具体的路由策略执行的router.routeRun()方法时, 是如何完成rpc远程通信的.
其实router.routeRun()方法最终也是执行runExecutor()方法, 如下图:
附上代码:
public static void trigger(int jobId) {
// 通过JobId从数据库中查询该任务的具体信息
// load data
XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId); // job info
if (jobInfo == null) {
logger.warn(">>>>>>>>>>>> xxl-job trigger fail, jobId invalid,jobId={}", jobId);
return;
}
logger.info("==>XxlJobTrigger.trigger jobInfo:{}", JSON.toJSONString(jobInfo));
// 获取该类型的执行器信息, 根据jobInfo中的job_group字段值, 找到该类型执行器所属的组
XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup()); // group info
logger.info("==>XxlJobTrigger.trigger group:{}", JSON.toJSONString(group));
// 匹配运行模式, 默认单机串行
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(),
ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block
// strategy
// 匹配失败后的处理模式, 默认失败告警
ExecutorFailStrategyEnum failStrategy = ExecutorFailStrategyEnum.match(jobInfo.getExecutorFailStrategy(),
ExecutorFailStrategyEnum.FAIL_ALARM); // fail
// strategy
// 设置路由的策略
// 获取路由策略, 必填, 没有默认值, 根据配置获取路由策略
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(),
null); // route strategy
// 获取该执行器的集群机器列表
ArrayList<String> addressList = (ArrayList<String>) group.getRegistryList();
// broadcast -- 判断路由策略, 而且执行器的机器列表不能为空 // 判断路由策略 是否为 分片广播模式
// 分片广播 : for循环遍历所有地址, 都发送一遍执行
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum
&& CollectionUtils.isNotEmpty(addressList)) {
for (int i = 0; i < addressList.size(); i++) {
String address = addressList.get(i);
// 1、save log-id
XxlJobLog jobLog = new XxlJobLog();
jobLog.setJobGroup(jobInfo.getJobGroup());
jobLog.setJobId(jobInfo.getId());
XxlJobDynamicScheduler.xxlJobLogDao.save(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
// 2、prepare trigger-info
// jobLog.setExecutorAddress(executorAddress);
jobLog.setGlueType(jobInfo.getGlueType());
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam());
jobLog.setTriggerTime(new Date());
ReturnT<String> triggerResult = new ReturnT<String>(null);
StringBuffer triggerMsgSb = new StringBuffer();
triggerMsgSb.append("注册方式:").append((group.getAddressType() == 0) ? "自动注册" : "手动录入");
triggerMsgSb.append("<br>阻塞处理策略:").append(blockStrategy.getTitle());
triggerMsgSb.append("<br>失败处理策略:").append(failStrategy.getTitle());
triggerMsgSb.append("<br>地址列表:").append(group.getRegistryList());
triggerMsgSb.append("<br>路由策略:").append(executorRouteStrategyEnum.getTitle()).append("(" + i + "/"
+ addressList.size()
+ ")"); // update01
// 3、trigger-valid
if (triggerResult.getCode() == ReturnT.SUCCESS_CODE && CollectionUtils.isEmpty(addressList)) {
triggerResult.setCode(ReturnT.FAIL_CODE);
triggerMsgSb.append("<br>----------------------<br>").append("调度失败:").append("执行器地址为空");
}
if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
// 4.1、trigger-param
TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
triggerParam.setGlueType(jobInfo.getGlueType());
triggerParam.setGlueSource(jobInfo.getGlueSource());
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
triggerParam.setBroadcastIndex(i);
triggerParam.setBroadcastTotal(addressList.size()); // update02
// 4.2、trigger-run (route run / trigger remote executor)
// 利用动态代理, admin调度中心发起一次rpc请求executor执行器端, 根据参数执行一次cron job任务
// 并且返回执行结果
triggerResult = runExecutor(triggerParam, address); // update03
triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>触发调度<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
// 重试一次, 且失败策略是配置的"失败重试"
// 4.3、trigger (fail retry)
if (triggerResult.getCode() != ReturnT.SUCCESS_CODE
&& failStrategy == ExecutorFailStrategyEnum.FAIL_RETRY) {
triggerResult = runExecutor(triggerParam, address); // update04
triggerMsgSb.append("<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>失败重试<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
}
}
// 5、save trigger-info
jobLog.setExecutorAddress(triggerResult.getContent());
jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(triggerMsgSb.toString());
XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog);
// 6、monitor triger
//
JobFailMonitorHelper.monitor(jobLog.getId());
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
return;
}
// 否则除分片模式外,其他的路由策略均走这里, 比如First--第一个
// 每触发一次job, 就记录一次log表
// 1、save log-id
XxlJobLog jobLog = new XxlJobLog();
jobLog.setJobGroup(jobInfo.getJobGroup());
jobLog.setJobId(jobInfo.getId());
// 将jobLog信息首先进行保存
XxlJobDynamicScheduler.xxlJobLogDao.save(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
// 2、prepare trigger-info
// jobLog.setExecutorAddress(executorAddress);
jobLog.setGlueType(jobInfo.getGlueType());
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam());
jobLog.setTriggerTime(new Date());
ReturnT<String> triggerResult = new ReturnT<String>(null);
StringBuffer triggerMsgSb = new StringBuffer();
triggerMsgSb.append("注册方式:").append((group.getAddressType() == 0) ? "自动注册" : "手动录入");
triggerMsgSb.append("<br>阻塞处理策略:").append(blockStrategy.getTitle());
triggerMsgSb.append("<br>失败处理策略:").append(failStrategy.getTitle());
triggerMsgSb.append("<br>地址列表:").append(group.getRegistryList());
triggerMsgSb.append("<br>路由策略:").append(executorRouteStrategyEnum.getTitle());
// 3、trigger-valid
if (triggerResult.getCode() == ReturnT.SUCCESS_CODE && CollectionUtils.isEmpty(addressList)) {
triggerResult.setCode(ReturnT.FAIL_CODE);
triggerMsgSb.append("<br>----------------------<br>").append("调度失败:").append("执行器地址为空");
}
if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
// 4.1、trigger-param
TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
triggerParam.setGlueType(jobInfo.getGlueType());
triggerParam.setGlueSource(jobInfo.getGlueSource());
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
triggerParam.setBroadcastIndex(0);
triggerParam.setBroadcastTotal(1);
// 此处使用了策略模式, 根据不同的策略 使用不同的实现类,此处不再详细说明
// 4.2、trigger-run (route run / trigger remote executor)
// 根据不同的路由模式, 解析addressList
// 根据前台的配置从jobInfo获取的策略, 并放入executorRouteStrategyEnum实例中, 调用相应的策略类
// 将下面代码拆分后, 便于debug调试:
// triggerResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList);
ExecutorRouter router = executorRouteStrategyEnum.getRouter();
// 利用动态代理, admin调度中心发起一次rpc请求executor执行器端, 根据参数执行一次cron job任务
// 并且返回执行结果
triggerResult = router.routeRun(triggerParam, addressList);
triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>触发调度<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
// 失败重试一次, 而且失败处理策略为"失败重试"
// 4.3、trigger (fail retry)
if (triggerResult.getCode() != ReturnT.SUCCESS_CODE
&& failStrategy == ExecutorFailStrategyEnum.FAIL_RETRY) {
triggerResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList);
triggerMsgSb.append("<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>失败重试<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
}
}
// 5、save trigger-info
jobLog.setExecutorAddress(triggerResult.getContent());
jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(triggerMsgSb.toString());
// 更新操作, 将操作参数, 执行返回结果值写入log表.
XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog);
// 6、monitor triger
// 将logid写入缓存队列, JobFailMonitorHelper开启一个线程从缓存中取数据, 根据该id从数据库中查执行结果
JobFailMonitorHelper.monitor(jobLog.getId());
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
1.4 admin调度中心和executor执行器之间的rpc远程通信, 是如何进行数据交互的
①首先从缓存中根据address为key获取动态代理类实例,
ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);
jdk的方式实现动态代理:
所以当admin调度中心, 每触发一次调度时, 最终调用executorBiz.run(triggerParam)方法, 其实底层就是利用了jdk的动态代理实现了一次rpc远程调用, 并接收返回的执行结果.而executor执行器接受到admin调度中心传递过来的执行参数, 利用java的反射技术具体调用实现类的具体方法.下一篇将分析executor执行器端, 是如何接受admin调度中心的参数, 和利用该参数使用反射技术调用具体的实现类的.
附上代码:
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address) {
ReturnT<String> runResult = null;
try {
// 返回一个ExcutorBiz动态代理类对象,重点在这个方法里面
// 创建执行器代理对象, 首先从executorBizRepository内存仓库根据根据key=address地址, 获取该executor执行器代理对象
// 首次从executorBizRepository内存仓库中没有该key=address的代理对象, 就创建一个该key=address的executor执行器代理对象
// 以ip地址作为key, 获取executor执行器代理对象
// 当根据ip作为key, 从本地缓存中获取不到executor执行器代理对象时, 就创建一个ip地址的executor执行器代理对象
ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);
// 这个run 方法不会最终执行,仅仅只是为了触发 proxy object 的 invoke方法,同时将目标的类型传送给服务端,
// 因为在代理对象的invoke的方法里面没有执行目标对象的方法
// 通过网络, admin调度中心发起一次rpc请求executor执行器端的jetty server, 执行器端ExecutorBizImpl类执行run()方法
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error(e.getMessage(), e);
runResult = new ReturnT<String>(ReturnT.FAIL_CODE, "" + e);
}
StringBuffer runResultSB = new StringBuffer("触发调度:");
runResultSB.append("<br>address:").append(address);
runResultSB.append("<br>code:").append(runResult.getCode());
runResultSB.append("<br>msg:").append(runResult.getMsg());
runResult.setMsg(runResultSB.toString());
runResult.setContent(address);
return runResult;
}
总结:
一,本章首先分析了job的调度过程, 触发一次调度都经历了什么流程, 都干了什么事:
基于quartz框架在cron时间片到达后, 触发一次调度, 从数据库中获取前台页面配置的job执行参数, 从而选择调度的路由策略.
二 ,其次, 分析了调度策略都有哪些:
第一个、最后一个、轮询、随机、一致性HASH、最不经常使用、最近最久未使用、故障转移、忙碌转移等;
三, 接着分析了admin调度中心与executor执行器之间进行数据通信的方式是什么, 数据传输的形式:
admin调度中心和executor执行器通信的方式是http+hessian的形式, admin调度中心这边采用动态代理技术每触发一次调度请求, 就会发起一次网络请求, 而executor执行器利用反射技术, 执行具体实现类的具体方法, 并返回执行结果.
下一篇将分析executor执行器端, 是如何接受admin调度中心的参数, 以及如何利用该参数, 使用反射技术调用具体实现类的具体方法返回结果的.
更多推荐
所有评论(0)