admin调度中心的对于job的调度过程, 调度策略以及rpc通信

1.1 job调度过程

RemoteHttpJobBean类实现了QuartzJobBean, 当cron时间片到达时, 就会触发一次quartz调用, 回调executeInternal()方法, 而XxlJobTrigger.trigger(jobId)具体实现细节就是进行了一次网络请求.

// 当cron时间片到时, 就会调用RemoteHttpJobBean的executeInternal触发一次网络请求给执行器
// @DisallowConcurrentExecution
public class RemoteHttpJobBean extends QuartzJobBean {

    private static Logger logger = LoggerFactory.getLogger(RemoteHttpJobBean.class);

    // 1 当quartz cron执行周期到达时, 就回调executeInternal()方法
    // 2 当点击"执行"时, 也会触发executeInternal()回调方法
    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {

        // load jobId
        JobKey jobKey = context.getTrigger().getJobKey();

        // 从xxl-job-qrtz-job-detail表中获取, jobname的值, 即为jobId
        Integer jobId = Integer.valueOf(jobKey.getName());

        // 调度中心触发, 发起一次rpc网络请求executor执行
        // trigger
        XxlJobTrigger.trigger(jobId);
    }

}

1.2 XxlJobTrigger.trigger()分析

该类的trigger()方法做的事情比较多, 所以代码量相对比较多一点, 咱们就来一行一行解析, 每段代码就做了什么事情.

① XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId); // 该jobid就是quartz框架触发一次调度时生成的, 这个jobid指的就是具体的哪一个job被执行.所以这行代码根据该jobId从xxl-job-qrtz-trigger-info表中查询本次job执行的一些参数, 这些参数就是在前台页面上新增任务时的一些参数, 如下图:比如路由策略, 运行模式, 阻塞处理策略, cron表达式等, 保存时, 就向该job保存到了xxl-job-qrtz-trigger-info表, 并生成了一个jobId, 当cron时间片到达时, 就会传递该jobId, 触发一次任务调度.

②  XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup());// 根据配置, 获取该类型的执行器信息, 根据jobInfo中的job_group字段值, 找到该类型执行器所属的组
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(),
                                                                                  ExecutorBlockStrategyEnum.SERIAL_EXECUTION);// 根据配置, 匹配运行模式, 默认单机串行


        ExecutorFailStrategyEnum failStrategy = ExecutorFailStrategyEnum.match(jobInfo.getExecutorFailStrategy(),
                                                                               ExecutorFailStrategyEnum.FAIL_ALARM); //根据配置,  匹配失败后的处理模式, 默认失败告警
        ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(),// 根据配置, 设置路由的策略, 获取路由策略, 必填, 没有默认值, 根据配置获取路由策略
        ArrayList<String> addressList = (ArrayList<String>) group.getRegistryList();// 获取该执行器的集群机器列表
        

if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum
            && CollectionUtils.isNotEmpty(addressList)) // 判断是否需要惊醒分片广播, 即使, 是否本次任务是否每一个executor执行器都执行一遍本次任务

这里分为了两种情况, 如果进行分片广播的话, 就省去了选择路由策略这一步, 否则下面就得需要根据前端的配置项, 选择路由策略.

首先分析, 需要进行分片广播的形式:

接着, 再分析不是分片广播的情况:其实无非就是多了一个路由策略选择的过程, 如下:

1.3 执行调度时的路由策略选择分析

目前xxl-job支持, 第一个、最后一个、轮询、随机、一致性HASH、最不经常使用、最近最久未使用、故障转移、忙碌转移等这几种路由策略, 以下就是调度时的路由策略类图, 从图中可以看出是一种典型的 策略模式

以上就是XxlJobTrigger类的trigger方法, 做的一些事情以及实现方式, 下面分析当执行分片调度时, 执行的runExecutor()方法, 和不执行分片策略, 而根据具体的路由策略执行的router.routeRun()方法时, 是如何完成rpc远程通信的.

其实router.routeRun()方法最终也是执行runExecutor()方法, 如下图:

附上代码:

public static void trigger(int jobId) {

        // 通过JobId从数据库中查询该任务的具体信息
        // load data
        XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId); // job info
        if (jobInfo == null) {
            logger.warn(">>>>>>>>>>>> xxl-job trigger fail, jobId invalid,jobId={}", jobId);
            return;
        }

        logger.info("==>XxlJobTrigger.trigger jobInfo:{}", JSON.toJSONString(jobInfo));

        // 获取该类型的执行器信息, 根据jobInfo中的job_group字段值, 找到该类型执行器所属的组
        XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup()); // group info

        logger.info("==>XxlJobTrigger.trigger group:{}", JSON.toJSONString(group));
        // 匹配运行模式, 默认单机串行
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(),
                                                                                  ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block
                                                                                                                               // strategy
        // 匹配失败后的处理模式, 默认失败告警
        ExecutorFailStrategyEnum failStrategy = ExecutorFailStrategyEnum.match(jobInfo.getExecutorFailStrategy(),
                                                                               ExecutorFailStrategyEnum.FAIL_ALARM); // fail
                                                                                                                     // strategy

        // 设置路由的策略
        // 获取路由策略, 必填, 没有默认值, 根据配置获取路由策略
        ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(),
                                                                                              null); // route strategy
        // 获取该执行器的集群机器列表
        ArrayList<String> addressList = (ArrayList<String>) group.getRegistryList();

        // broadcast -- 判断路由策略, 而且执行器的机器列表不能为空 // 判断路由策略 是否为 分片广播模式
        // 分片广播 : for循环遍历所有地址, 都发送一遍执行
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum
            && CollectionUtils.isNotEmpty(addressList)) {
            for (int i = 0; i < addressList.size(); i++) {
                String address = addressList.get(i);

                // 1、save log-id
                XxlJobLog jobLog = new XxlJobLog();
                jobLog.setJobGroup(jobInfo.getJobGroup());
                jobLog.setJobId(jobInfo.getId());
                XxlJobDynamicScheduler.xxlJobLogDao.save(jobLog);
                logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

                // 2、prepare trigger-info
                // jobLog.setExecutorAddress(executorAddress);
                jobLog.setGlueType(jobInfo.getGlueType());
                jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
                jobLog.setExecutorParam(jobInfo.getExecutorParam());
                jobLog.setTriggerTime(new Date());

                ReturnT<String> triggerResult = new ReturnT<String>(null);
                StringBuffer triggerMsgSb = new StringBuffer();
                triggerMsgSb.append("注册方式:").append((group.getAddressType() == 0) ? "自动注册" : "手动录入");
                triggerMsgSb.append("<br>阻塞处理策略:").append(blockStrategy.getTitle());
                triggerMsgSb.append("<br>失败处理策略:").append(failStrategy.getTitle());
                triggerMsgSb.append("<br>地址列表:").append(group.getRegistryList());
                triggerMsgSb.append("<br>路由策略:").append(executorRouteStrategyEnum.getTitle()).append("(" + i + "/"
                                                                                                     + addressList.size()
                                                                                                     + ")"); // update01

                // 3、trigger-valid
                if (triggerResult.getCode() == ReturnT.SUCCESS_CODE && CollectionUtils.isEmpty(addressList)) {
                    triggerResult.setCode(ReturnT.FAIL_CODE);
                    triggerMsgSb.append("<br>----------------------<br>").append("调度失败:").append("执行器地址为空");
                }

                if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
                    // 4.1、trigger-param
                    TriggerParam triggerParam = new TriggerParam();
                    triggerParam.setJobId(jobInfo.getId());
                    triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
                    triggerParam.setExecutorParams(jobInfo.getExecutorParam());
                    triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
                    triggerParam.setLogId(jobLog.getId());
                    triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
                    triggerParam.setGlueType(jobInfo.getGlueType());
                    triggerParam.setGlueSource(jobInfo.getGlueSource());
                    triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
                    triggerParam.setBroadcastIndex(i);
                    triggerParam.setBroadcastTotal(addressList.size()); // update02

                    // 4.2、trigger-run (route run / trigger remote executor)
                    // 利用动态代理, admin调度中心发起一次rpc请求executor执行器端, 根据参数执行一次cron job任务
                    // 并且返回执行结果
                    triggerResult = runExecutor(triggerParam, address); // update03
                    triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>触发调度<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());

                    // 重试一次, 且失败策略是配置的"失败重试"
                    // 4.3、trigger (fail retry)
                    if (triggerResult.getCode() != ReturnT.SUCCESS_CODE
                        && failStrategy == ExecutorFailStrategyEnum.FAIL_RETRY) {
                        triggerResult = runExecutor(triggerParam, address); // update04
                        triggerMsgSb.append("<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>失败重试<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
                    }
                }

                // 5、save trigger-info
                jobLog.setExecutorAddress(triggerResult.getContent());
                jobLog.setTriggerCode(triggerResult.getCode());
                jobLog.setTriggerMsg(triggerMsgSb.toString());
                XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog);

                // 6、monitor triger
                //
                JobFailMonitorHelper.monitor(jobLog.getId());
                logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());

            }
            return;
        }

        // 否则除分片模式外,其他的路由策略均走这里, 比如First--第一个
        // 每触发一次job, 就记录一次log表
        // 1、save log-id
        XxlJobLog jobLog = new XxlJobLog();
        jobLog.setJobGroup(jobInfo.getJobGroup());
        jobLog.setJobId(jobInfo.getId());

        // 将jobLog信息首先进行保存
        XxlJobDynamicScheduler.xxlJobLogDao.save(jobLog);
        logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

        // 2、prepare trigger-info
        // jobLog.setExecutorAddress(executorAddress);
        jobLog.setGlueType(jobInfo.getGlueType());
        jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
        jobLog.setExecutorParam(jobInfo.getExecutorParam());
        jobLog.setTriggerTime(new Date());

        ReturnT<String> triggerResult = new ReturnT<String>(null);
        StringBuffer triggerMsgSb = new StringBuffer();
        triggerMsgSb.append("注册方式:").append((group.getAddressType() == 0) ? "自动注册" : "手动录入");
        triggerMsgSb.append("<br>阻塞处理策略:").append(blockStrategy.getTitle());
        triggerMsgSb.append("<br>失败处理策略:").append(failStrategy.getTitle());
        triggerMsgSb.append("<br>地址列表:").append(group.getRegistryList());
        triggerMsgSb.append("<br>路由策略:").append(executorRouteStrategyEnum.getTitle());

        // 3、trigger-valid
        if (triggerResult.getCode() == ReturnT.SUCCESS_CODE && CollectionUtils.isEmpty(addressList)) {
            triggerResult.setCode(ReturnT.FAIL_CODE);
            triggerMsgSb.append("<br>----------------------<br>").append("调度失败:").append("执行器地址为空");
        }

        if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
            // 4.1、trigger-param
            TriggerParam triggerParam = new TriggerParam();
            triggerParam.setJobId(jobInfo.getId());
            triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
            triggerParam.setExecutorParams(jobInfo.getExecutorParam());
            triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
            triggerParam.setLogId(jobLog.getId());
            triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
            triggerParam.setGlueType(jobInfo.getGlueType());
            triggerParam.setGlueSource(jobInfo.getGlueSource());
            triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
            triggerParam.setBroadcastIndex(0);
            triggerParam.setBroadcastTotal(1);

            // 此处使用了策略模式, 根据不同的策略 使用不同的实现类,此处不再详细说明
            // 4.2、trigger-run (route run / trigger remote executor)
            // 根据不同的路由模式, 解析addressList

            // 根据前台的配置从jobInfo获取的策略, 并放入executorRouteStrategyEnum实例中, 调用相应的策略类
            // 将下面代码拆分后, 便于debug调试:
            // triggerResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList);

            ExecutorRouter router = executorRouteStrategyEnum.getRouter();

            // 利用动态代理, admin调度中心发起一次rpc请求executor执行器端, 根据参数执行一次cron job任务
            // 并且返回执行结果
            triggerResult = router.routeRun(triggerParam, addressList);

            triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>触发调度<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());

            // 失败重试一次, 而且失败处理策略为"失败重试"
            // 4.3、trigger (fail retry)
            if (triggerResult.getCode() != ReturnT.SUCCESS_CODE
                && failStrategy == ExecutorFailStrategyEnum.FAIL_RETRY) {
                triggerResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList);
                triggerMsgSb.append("<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>失败重试<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
            }
        }

        // 5、save trigger-info
        jobLog.setExecutorAddress(triggerResult.getContent());
        jobLog.setTriggerCode(triggerResult.getCode());
        jobLog.setTriggerMsg(triggerMsgSb.toString());

        // 更新操作, 将操作参数, 执行返回结果值写入log表.
        XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog);

        // 6、monitor triger
        // 将logid写入缓存队列, JobFailMonitorHelper开启一个线程从缓存中取数据, 根据该id从数据库中查执行结果
        JobFailMonitorHelper.monitor(jobLog.getId());
        logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
    }

1.4 admin调度中心和executor执行器之间的rpc远程通信, 是如何进行数据交互的

①首先从缓存中根据address为key获取动态代理类实例,

ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);

jdk的方式实现动态代理:

所以当admin调度中心, 每触发一次调度时, 最终调用executorBiz.run(triggerParam)方法, 其实底层就是利用了jdk的动态代理实现了一次rpc远程调用, 并接收返回的执行结果.而executor执行器接受到admin调度中心传递过来的执行参数, 利用java的反射技术具体调用实现类的具体方法.下一篇将分析executor执行器端, 是如何接受admin调度中心的参数, 和利用该参数使用反射技术调用具体的实现类的.

附上代码:

 public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address) {
        ReturnT<String> runResult = null;
        try {
            // 返回一个ExcutorBiz动态代理类对象,重点在这个方法里面
            // 创建执行器代理对象, 首先从executorBizRepository内存仓库根据根据key=address地址, 获取该executor执行器代理对象
            // 首次从executorBizRepository内存仓库中没有该key=address的代理对象, 就创建一个该key=address的executor执行器代理对象

            // 以ip地址作为key, 获取executor执行器代理对象
            // 当根据ip作为key, 从本地缓存中获取不到executor执行器代理对象时, 就创建一个ip地址的executor执行器代理对象
            ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);

            // 这个run 方法不会最终执行,仅仅只是为了触发 proxy object 的 invoke方法,同时将目标的类型传送给服务端,
            // 因为在代理对象的invoke的方法里面没有执行目标对象的方法

            // 通过网络, admin调度中心发起一次rpc请求executor执行器端的jetty server, 执行器端ExecutorBizImpl类执行run()方法
            runResult = executorBiz.run(triggerParam);

        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            runResult = new ReturnT<String>(ReturnT.FAIL_CODE, "" + e);
        }

        StringBuffer runResultSB = new StringBuffer("触发调度:");
        runResultSB.append("<br>address:").append(address);
        runResultSB.append("<br>code:").append(runResult.getCode());
        runResultSB.append("<br>msg:").append(runResult.getMsg());

        runResult.setMsg(runResultSB.toString());
        runResult.setContent(address);
        return runResult;
    }

总结:

一,本章首先分析了job的调度过程, 触发一次调度都经历了什么流程, 都干了什么事:

基于quartz框架在cron时间片到达后, 触发一次调度, 从数据库中获取前台页面配置的job执行参数, 从而选择调度的路由策略.

二 ,其次, 分析了调度策略都有哪些:

第一个、最后一个、轮询、随机、一致性HASH、最不经常使用、最近最久未使用、故障转移、忙碌转移等;

三, 接着分析了admin调度中心与executor执行器之间进行数据通信的方式是什么, 数据传输的形式:

admin调度中心和executor执行器通信的方式是http+hessian的形式, admin调度中心这边采用动态代理技术每触发一次调度请求, 就会发起一次网络请求, 而executor执行器利用反射技术, 执行具体实现类的具体方法, 并返回执行结果.

下一篇将分析executor执行器端, 是如何接受admin调度中心的参数, 以及如何利用该参数, 使用反射技术调用具体实现类的具体方法返回结果的.

 

GitHub 加速计划 / xx / xxl-job
23
10
下载
xxl-job: 是一个分布式任务调度平台,核心设计目标是开发迅速、学习简单、轻量级、易扩展。
最近提交(Master分支:4 个月前 )
e5d26ba2 - 5 个月前
977ad87b - 5 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐