Author: 鱼仔

Blog home: https://codeease.top

WeChat official account: Java鱼仔

Introduction

The previous two articles covered the initialization of the executor and the scheduler. This article looks at how a job is actually triggered.

Source Code Analysis

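Whether a job is triggered manually or by the scheduler, the trigger ends up in JobTriggerPoolHelper. For a manual trigger, the entry point is the triggerJob method of JobInfoController.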

JobInfoController

This endpoint first checks whether executorParam is null and, if so, replaces it with an empty string. It then calls the trigger method of JobTriggerPoolHelper with the MANUAL trigger type.

@RequestMapping("/trigger")
@ResponseBody
//@PermissionLimit(limit = false)
public ReturnT<String> triggerJob(int id, String executorParam, String addressList) {
    // force cover job param
    if (executorParam == null) {
        executorParam = "";
    }

    JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam, addressList);
    return ReturnT.SUCCESS;
}

JobTriggerPoolHelper

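The trigger method delegates to addTrigger, which does two things. First, it picks the fast or the slow thread pool based on how long the job's recent triggers took. Second, it calls XxlJobTrigger.trigger on that pool to trigger the job.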

public void addTrigger(final int jobId,
                   final TriggerTypeEnum triggerType,
                   final int failRetryCount,
                   final String executorShardingParam,
                   final String executorParam,
                   final String addressList) {

// choose thread pool
// Every job starts in the fast pool; once a job's trigger has taken longer than 500 ms more than 10 times within one minute, it is moved to the slow pool
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
    triggerPool_ = slowTriggerPool;
}

// trigger
triggerPool_.execute(new Runnable() {
    @Override
    public void run() {

        long start = System.currentTimeMillis();

        try {
            // do trigger
            // Call XxlJobTrigger.trigger with the given parameters to trigger the job
            XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {

            // check timeout-count-map
            // Compute the current minute and compare it with the previously recorded minute, minTim. If they differ, a new minute has started, so clear the timeout count map.
            long minTim_now = System.currentTimeMillis()/60000;
            if (minTim != minTim_now) {
                minTim = minTim_now;
                jobTimeoutCountMap.clear();
            }

            // incr timeout-count-map
            // For each trigger that takes longer than 500 ms, increment the job's count in jobTimeoutCountMap
            long cost = System.currentTimeMillis()-start;
            if (cost > 500) {       // job-timeout threshold 500ms
                AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                if (timeoutCount != null) {
                    timeoutCount.incrementAndGet();
                }
            }

        }

    }
});
}
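The fast/slow routing rule can be isolated from the thread pools themselves. The following is a minimal sketch under stated assumptions, not XXL-JOB code: the class PoolChooserSketch and its methods are hypothetical, the pools are represented by the strings "fast" and "slow", and the current time is passed in as a parameter so the behaviour is deterministic. Only the 500 ms threshold, the limit of 10, and the per-minute reset are taken from the listing above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not part of XXL-JOB: isolates the fast/slow routing rule.
class PoolChooserSketch {
    static final long SLOW_THRESHOLD_MS = 500; // threshold used in the listing above
    static final int MAX_SLOW_COUNT = 10;      // limit used in the listing above

    private volatile long currentMinute;
    private final ConcurrentMap<Integer, AtomicInteger> slowCounts = new ConcurrentHashMap<>();

    PoolChooserSketch(long nowMillis) {
        this.currentMinute = nowMillis / 60000;
    }

    /** Returns "slow" once a job has been slow more than 10 times in the current minute. */
    String choosePool(int jobId) {
        AtomicInteger count = slowCounts.get(jobId);
        return (count != null && count.get() > MAX_SLOW_COUNT) ? "slow" : "fast";
    }

    /** Records one finished trigger; the clock value is an argument to keep the sketch testable. */
    void record(int jobId, long costMillis, long nowMillis) {
        long minute = nowMillis / 60000;
        if (minute != currentMinute) {
            // A new minute has started: forget all counts.
            currentMinute = minute;
            slowCounts.clear();
        }
        if (costMillis > SLOW_THRESHOLD_MS) {
            // putIfAbsent returns null when this call inserted the initial count of 1.
            AtomicInteger previous = slowCounts.putIfAbsent(jobId, new AtomicInteger(1));
            if (previous != null) {
                previous.incrementAndGet();
            }
        }
    }

    public static void main(String[] args) {
        PoolChooserSketch chooser = new PoolChooserSketch(0L);
        for (int i = 0; i < 11; i++) {
            chooser.record(7, 600, 1_000L);
        }
        System.out.println(chooser.choosePool(7)); // 11 slow triggers in one minute
        chooser.record(7, 100, 61_000L);           // next minute resets the counts
        System.out.println(chooser.choosePool(7));
    }
}
```

Note that the counts are reset for every job at once when the minute changes, so a job only stays in the slow pool while it keeps exceeding the threshold.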

XxlJobTrigger

In the trigger method, a series of values are initialized from the incoming parameters, and then processTrigger is called to trigger the job.

public static void trigger(int jobId,
                           TriggerTypeEnum triggerType,
                           int failRetryCount,
                           String executorShardingParam,
                           String executorParam,
                           String addressList) {

    // load data
    // Load the XxlJobInfo for the given jobId
    XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
    if (jobInfo == null) {
        logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
        return;
    }
    if (executorParam != null) {
        jobInfo.setExecutorParam(executorParam);
    }
    // If the passed-in failRetryCount is >= 0, use it as the final retry count; otherwise use the retry count configured on the job.
    int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
    XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

    // cover addressList
    // If addressList is neither null nor blank, set the group's address type to 1 (manually entered) and use the passed-in address list.
    if (addressList!=null && addressList.trim().length()>0) {
        group.setAddressType(1);
        group.setAddressList(addressList.trim());
    }

    // sharding param
    int[] shardingParam = null;
    if (executorShardingParam!=null){
        String[] shardingArr = executorShardingParam.split("/");
        if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
            shardingParam = new int[2];
            shardingParam[0] = Integer.valueOf(shardingArr[0]);
            shardingParam[1] = Integer.valueOf(shardingArr[1]);
        }
    }
    // If the route strategy is SHARDING_BROADCAST, the group's registry list is non-empty, and shardingParam is null, trigger the job once for every registered address
    if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
            && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
            && shardingParam==null) {
        for (int i = 0; i < group.getRegistryList().size(); i++) {
            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
        }
    } else {
        if (shardingParam == null) {
            shardingParam = new int[]{0, 1};
        }
        processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
    }

}
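The sharding parameter handling above can be tried in isolation. This is a small sketch, assuming a parameter of the form "index/total"; the class ShardingParamSketch and its isNumeric helper are hypothetical stand-ins written for this example, not the helper used inside XxlJobTrigger.

```java
// Hypothetical sketch, not part of XXL-JOB: parses an "index/total" sharding parameter.
class ShardingParamSketch {

    /** Stand-in for the isNumeric check in the listing above (an assumption of this sketch). */
    static boolean isNumeric(String s) {
        try {
            Integer.parseInt(s);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    /** Returns {index, total}, or null when the input is not two integers separated by "/". */
    static int[] parse(String param) {
        if (param == null) {
            return null;
        }
        String[] parts = param.split("/");
        if (parts.length == 2 && isNumeric(parts[0]) && isNumeric(parts[1])) {
            return new int[]{Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
        }
        return null;
    }

    /** Mirrors the fallback in the listing: no sharding means shard 0 of 1. */
    static int[] orDefault(int[] parsed) {
        return parsed != null ? parsed : new int[]{0, 1};
    }

    public static void main(String[] args) {
        int[] shard = orDefault(parse("1/3"));
        System.out.println(shard[0] + " of " + shard[1]);
        int[] fallback = orDefault(parse("not-a-shard"));
        System.out.println(fallback[0] + " of " + fallback[1]);
    }
}
```

An explicit sharding parameter therefore triggers exactly one shard, while a sharding-broadcast job without one fans out to every registered address.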

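processTrigger holds the core trigger logic: it initializes the trigger parameters, determines the executor address, asks the executor to run the job, and records the trigger information in the job log.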

private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
    // param
    // Block strategy
    ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
    // Route strategy
    ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
    // Sharding parameter
    String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

    // 1、save log-id
    // Save an XxlJobLog record
    XxlJobLog jobLog = new XxlJobLog();
    jobLog.setJobGroup(jobInfo.getJobGroup());
    jobLog.setJobId(jobInfo.getId());
    jobLog.setTriggerTime(new Date());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
    logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

    // 2、init trigger-param
    // Populate the trigger parameters
    TriggerParam triggerParam = new TriggerParam();
    triggerParam.setJobId(jobInfo.getId());
    triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
    triggerParam.setExecutorParams(jobInfo.getExecutorParam());
    triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
    triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
    triggerParam.setLogId(jobLog.getId());
    triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
    triggerParam.setGlueType(jobInfo.getGlueType());
    triggerParam.setGlueSource(jobInfo.getGlueSource());
    triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
    triggerParam.setBroadcastIndex(index);
    triggerParam.setBroadcastTotal(total);

    // 3、init address
    // Determine the executor address according to the route strategy
    String address = null;
    ReturnT<String> routeAddressResult = null;
    if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
            if (index < group.getRegistryList().size()) {
                address = group.getRegistryList().get(index);
            } else {
                address = group.getRegistryList().get(0);
            }
        } else {
            routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
            if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                address = routeAddressResult.getContent();
            }
        }
    } else {
        routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
    }

    // 4、trigger remote executor
    // Using the address from the previous step and the trigger parameters, call runExecutor to ask the remote executor to run the job
    ReturnT<String> triggerResult = null;
    if (address != null) {
        triggerResult = runExecutor(triggerParam, address);
    } else {
        triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
    }

    // 5、collection trigger info
    StringBuffer triggerMsgSb = new StringBuffer();
    triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
            .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
    if (shardingParam != null) {
        triggerMsgSb.append("("+shardingParam+")");
    }
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);

    triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
            .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");

    // 6、save log trigger-info
    // Update the job log with the trigger info: executor address, handler, param, sharding param, and fail retry count
    jobLog.setExecutorAddress(address);
    jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
    jobLog.setExecutorParam(jobInfo.getExecutorParam());
    jobLog.setExecutorShardingParam(shardingParam);
    jobLog.setExecutorFailRetryCount(finalFailRetryCount);
    //jobLog.setTriggerTime();
    jobLog.setTriggerCode(triggerResult.getCode());
    jobLog.setTriggerMsg(triggerMsgSb.toString());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);

    logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
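Step 3 of processTrigger is worth a closer look for sharding broadcast: shard i is sent to the i-th registered address, and an index outside the list falls back to the first address. The sketch below shows just that rule. AddressPickSketch and pickForShard are hypothetical names for this example, and the addresses are made-up placeholders.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, not part of XXL-JOB: address choice for one broadcast shard.
class AddressPickSketch {

    /** Returns the address for the given shard index, or null when nothing is registered. */
    static String pickForShard(List<String> registry, int index) {
        if (registry == null || registry.isEmpty()) {
            return null; // the listing reports "address empty" in this case
        }
        // Out-of-range index falls back to the first address, as in the listing.
        return index < registry.size() ? registry.get(index) : registry.get(0);
    }

    public static void main(String[] args) {
        // Placeholder addresses, invented for the example.
        List<String> registry = Arrays.asList("http://10.0.0.1:9999", "http://10.0.0.2:9999");
        for (int i = 0; i < registry.size(); i++) {
            System.out.println("shard " + i + "/" + registry.size() + " -> " + pickForShard(registry, i));
        }
    }
}
```

For every other route strategy the choice is delegated to the strategy's router, which is outside the scope of this sketch.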

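In the method above, runExecutor(triggerParam, address) triggers the job. Inside runExecutor, the executor at the given address is located through RPC, its run method is called, and the result of the call is returned.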

public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult = null;
    try {
        // Locate the executor at the given address through RPC and call its run method
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }
    // Assemble the trigger result message
    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());

    runResult.setMsg(runResultSB.toString());
    return runResult;
}

ExecutorBizImpl

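executorBiz.run(triggerParam) builds the IJobHandler and JobThread that match the trigger parameters and finally pushes triggerParam into the JobThread's trigger queue, where it waits to be executed. The details are annotated in the comments in the code.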

@Override
public ReturnT<String> run(TriggerParam triggerParam) {
    // load old:jobHandler + jobThread
    // Load the old jobHandler and jobThread for the given job id (once a job has run, its thread is kept in the local jobThreadRepository map)
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;

    // valid:jobHandler + jobThread
    // Build a different IJobHandler depending on the GlueType
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    // If the glue type in the trigger param is BEAN
    if (GlueTypeEnum.BEAN == glueTypeEnum) {

        // new jobhandler
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
        // If an old jobThread exists and its jobHandler differs from the new one, set jobThread and jobHandler to null
        // valid old jobThread
        if (jobThread!=null && jobHandler != newJobHandler) {
            // change handler, need kill old thread
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        // If jobHandler is null, use the newly loaded jobHandler
        if (jobHandler == null) {
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }

    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {

        // valid old jobThread
        // If an old jobThread exists but its handler is not a GlueJobHandler with the same glue update time as the trigger param, discard it
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof GlueJobHandler
                    && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
            // change handler or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // If jobHandler is null, create a new GlueJobHandler
        if (jobHandler == null) {
            try {
                IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
            }
        }
    } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {

        // valid old jobThread
        // If an old jobThread exists but its handler is not a ScriptJobHandler with the same glue update time as the trigger param, discard it
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof ScriptJobHandler
                        && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
            // change script or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // If jobHandler is null, create a new ScriptJobHandler
        if (jobHandler == null) {
            jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
        }
    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
    }

    // executor block strategy
    // Apply the block strategy
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        // If the block strategy is DISCARD_LATER and the jobThread is running or has queued triggers, return a failure
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
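            // If the block strategy is COVER_EARLY and the jobThread is running or has queued triggers, set jobThread to null so the old thread is stopped and replaced below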
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // kill running jobThread
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                jobThread = null;
            }
        } else {
            // just queue trigger
        }
    }

    // replace thread (new or exists invalid)
    // If jobThread is null, register a new job thread
    if (jobThread == null) {
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }

    // push data to queue
    // Push the trigger param onto the job thread's trigger queue, where it waits to be executed
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}
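The block strategy branch at the end of run boils down to a small decision table. The sketch below restates it with hypothetical names (BlockStrategySketch, Strategy, Action), assuming only the three strategies that appear in the listings; it is an illustration of the branching, not the executor's code.

```java
// Hypothetical sketch, not part of XXL-JOB: the block strategy decision as a pure function.
class BlockStrategySketch {

    enum Strategy { SERIAL_EXECUTION, DISCARD_LATER, COVER_EARLY }

    enum Action {
        QUEUE,          // push the trigger onto the existing thread's queue
        REJECT,         // return a failure, the new trigger is dropped
        REPLACE_THREAD  // stop the old thread, register a new one, then queue
    }

    /** busy means the existing job thread is running or still has queued triggers. */
    static Action decide(Strategy strategy, boolean busy) {
        if (busy && strategy == Strategy.DISCARD_LATER) {
            return Action.REJECT;
        }
        if (busy && strategy == Strategy.COVER_EARLY) {
            return Action.REPLACE_THREAD;
        }
        return Action.QUEUE; // serial execution, or the thread is idle
    }

    public static void main(String[] args) {
        for (Strategy s : Strategy.values()) {
            System.out.println(s + " busy=" + decide(s, true) + " idle=" + decide(s, false));
        }
    }
}
```

When the job thread is idle, all three strategies behave the same way: the trigger is simply queued.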

JobThread

The method above ends by putting triggerParam into the trigger queue, so where does the job actually run? Look at the same code again: XxlJobExecutor.registJobThread registers a new JobThread and starts it by calling start.

public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
    // Create a new JobThread and start it
    JobThread newJobThread = new JobThread(jobId, handler);
    newJobThread.start();
    logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});

    JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);	// putIfAbsent | oh my god, map's put method return the old value!!!
    if (oldJobThread != null) {
        oldJobThread.toStop(removeOldReason);
        oldJobThread.interrupt();
    }

    return newJobThread;
}
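registJobThread relies on the fact that Map.put returns the value previously stored under the key, which is how the old thread is found and stopped. Here is a minimal sketch of that replace-and-stop pattern; RegistrySketch and its Worker class are hypothetical stand-ins for the repository and JobThread, and the worker here is a plain object rather than a real thread.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch, not part of XXL-JOB: replace-and-stop using the return value of put.
class RegistrySketch {

    /** Stand-in for JobThread: only records that it was asked to stop. */
    static class Worker {
        volatile boolean stopped;
        volatile String stopReason;

        void toStop(String reason) {
            this.stopped = true;
            this.stopReason = reason;
        }
    }

    static final ConcurrentMap<Integer, Worker> REPO = new ConcurrentHashMap<>();

    static Worker register(int jobId, Worker newWorker, String removeOldReason) {
        // put returns the previous mapping, or null if the key was absent.
        Worker old = REPO.put(jobId, newWorker);
        if (old != null) {
            old.toStop(removeOldReason);
        }
        return newWorker;
    }

    public static void main(String[] args) {
        Worker first = register(1, new Worker(), null);
        Worker second = register(1, new Worker(), "change jobhandler");
        System.out.println("first stopped: " + first.stopped + ", reason: " + first.stopReason);
        System.out.println("second stopped: " + second.stopped);
    }
}
```

Using putIfAbsent here would keep the old worker in the map and return it instead, so the replacement would never take effect.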

Once start has been called, the run method of JobThread begins executing.

@Override
public void run() {

    // init
    // If the IJobHandler has an init method, run it
    try {
        handler.init();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }

    // execute
    while(!toStop){
        running = false;
        idleTimes++;

        TriggerParam triggerParam = null;
        try {
            // to check the toStop signal we need to loop, so we cannot use queue.take(); use poll(timeout) instead
            // Take a TriggerParam from triggerQueue with poll, waiting at most 3 seconds. A non-null result means there is a job to execute
            triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
            if (triggerParam!=null) {
                running = true;
                idleTimes = 0;
                triggerLogIdSet.remove(triggerParam.getLogId());

                // log filename, like "logPath/yyyy-MM-dd/9999.log"
                String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
                // Build the XxlJobContext object
                XxlJobContext xxlJobContext = new XxlJobContext(
                        triggerParam.getJobId(),
                        triggerParam.getExecutorParams(),
                        logFileName,
                        triggerParam.getBroadcastIndex(),
                        triggerParam.getBroadcastTotal());

                // init job context
                XxlJobContext.setXxlJobContext(xxlJobContext);

                // execute
                XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());
                // If the job has a timeout configured, run it in another thread through a FutureTask and wait for the result with that timeout
                if (triggerParam.getExecutorTimeout() > 0) {
                    // limit timeout
                    Thread futureThread = null;
                    try {
                        FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                            @Override
                            public Boolean call() throws Exception {

                                // init job context
                                XxlJobContext.setXxlJobContext(xxlJobContext);
                                // This call is what actually runs the job
                                handler.execute();
                                return true;
                            }
                        });
                        futureThread = new Thread(futureTask);
                        futureThread.start();

                        Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                    } catch (TimeoutException e) {
                        // A timeout ends up in this catch block
                        XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
                        XxlJobHelper.log(e);

                        // handle result
                        XxlJobHelper.handleTimeout("job execute timeout ");
                    } finally {
                        futureThread.interrupt();
                    }
                } else {
                    // just execute
                    // No timeout configured: execute the job directly
                    handler.execute();
                }

                // valid execute handle data
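                // If the job did not leave a valid handle code (the result was lost), call XxlJobHelper.handleFail().
                // Otherwise keep the result, truncating the handle message to 50000 characters.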
                if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
                    XxlJobHelper.handleFail("job handle result lost.");
                } else {
                    String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
                    tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)
                            ?tempHandleMsg.substring(0, 50000).concat("...")
                            :tempHandleMsg;
                    XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
                }
                XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
                        + XxlJobContext.getXxlJobContext().getHandleCode()
                        + ", handleMsg = "
                        + XxlJobContext.getXxlJobContext().getHandleMsg()
                );

            } else {
                // idleTimes counts consecutive empty polls. After more than 30 of them (each poll waits up to 3 seconds) with an empty queue, remove this job thread from the executor's cache
                if (idleTimes > 30) {
                    if(triggerQueue.size() == 0) {	// avoid concurrent trigger causes jobId-lost
                        XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                    }
                }
            }
        } catch (Throwable e) {
            // An exception ended up here: record the error information

            if (toStop) {
                XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
            }

            // handle result
            StringWriter stringWriter = new StringWriter();
            e.printStackTrace(new PrintWriter(stringWriter));
            String errorMsg = stringWriter.toString();

            XxlJobHelper.handleFail(errorMsg);

            XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
        } finally {
            if(triggerParam != null) {
                // callback handler info
                // If the job thread has not been stopped, push the callback info to TriggerCallbackThread
                if (!toStop) {
                    // commonm
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                            triggerParam.getLogId(),
                            triggerParam.getLogDateTime(),
                            XxlJobContext.getXxlJobContext().getHandleCode(),
                            XxlJobContext.getXxlJobContext().getHandleMsg() )
                    );
                } else {
                    // is killed
                    // If the job thread has been stopped, push a failure callback carrying the stop reason
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                            triggerParam.getLogId(),
                            triggerParam.getLogDateTime(),
                            XxlJobContext.HANDLE_CODE_FAIL,
                            stopReason + " [job running, killed]" )
                    );
                }
            }
        }
    }


    // callback trigger request in queue
    // After the thread is stopped, every trigger still in the queue gets a failure callback carrying the stop reason
    while(triggerQueue !=null && triggerQueue.size()>0){
        TriggerParam triggerParam = triggerQueue.poll();
        if (triggerParam!=null) {
            // is killed
            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                    triggerParam.getLogId(),
                    triggerParam.getLogDateTime(),
                    XxlJobContext.HANDLE_CODE_FAIL,
                    stopReason + " [job not executed, in the job queue, killed.]")
            );
        }
    }

    // destroy
    // If the handler has a destroy method configured, run it
    try {
        handler.destroy();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }

    logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}
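The timeout branch in run uses a common pattern: wrap the work in a FutureTask, run it on a helper thread, wait with a bounded get, and interrupt the helper afterwards. The sketch below shows the pattern on its own, assuming a timeout in milliseconds for quick experiments (the listing uses seconds). TimeoutRunSketch and runWithTimeout are hypothetical names, and the string results are an invention of this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch, not part of XXL-JOB: bounded wait on a task running in a helper thread.
class TimeoutRunSketch {

    /** Returns "done", "timeout", "failed" or "interrupted". */
    static String runWithTimeout(Callable<Boolean> task, long timeoutMillis) {
        FutureTask<Boolean> futureTask = new FutureTask<>(task);
        Thread worker = new Thread(futureTask);
        worker.start();
        try {
            // Blocks until the task finishes or the timeout elapses.
            futureTask.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return "done";
        } catch (TimeoutException e) {
            return "timeout";
        } catch (ExecutionException e) {
            return "failed"; // the task itself threw an exception
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the caller's interrupt flag
            return "interrupted";
        } finally {
            // Asks a still-running task to stop; has no effect once the thread has finished.
            worker.interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithTimeout(() -> true, 1000));
        System.out.println(runWithTimeout(() -> {
            Thread.sleep(5000); // simulates a job that runs longer than the limit
            return true;
        }, 100));
    }
}
```

Interruption is cooperative: a task that never blocks and never checks its interrupt flag keeps running after the timeout has been reported.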

The actual execution happens in handler.execute(). In the handler shown here, execute invokes the target method through reflection.

@Override
public void execute() throws Exception {
    Class<?>[] paramTypes = method.getParameterTypes();
    if (paramTypes.length > 0) {
        method.invoke(target, new Object[paramTypes.length]);       // method-param can not be primitive-types
    } else {
        method.invoke(target);
    }
}
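To see why the comment says method parameters cannot be primitive types, the same invoke logic can be run against a toy class. In this sketch, ReflectiveHandlerSketch, DemoJob and executeByName are hypothetical names; only the branch on the parameter count is taken from the listing above. Passing new Object[n] supplies null for every parameter, which reference types accept and primitives do not.

```java
import java.lang.reflect.Method;

// Hypothetical sketch, not part of XXL-JOB: reflective invocation with null-filled arguments.
class ReflectiveHandlerSketch {

    /** Toy job class invented for the example. */
    public static class DemoJob {
        public int runs;

        public void work() {
            runs++;
        }

        public void workWithParam(String param) {
            if (param == null) {
                runs += 10; // the handler always passes null here
            }
        }
    }

    /** Same branching as the execute method in the listing above. */
    static void execute(Object target, Method method) throws Exception {
        Class<?>[] paramTypes = method.getParameterTypes();
        if (paramTypes.length > 0) {
            // One null per parameter; a primitive parameter would make invoke throw.
            method.invoke(target, new Object[paramTypes.length]);
        } else {
            method.invoke(target);
        }
    }

    /** Convenience wrapper that looks the method up by name and wraps checked exceptions. */
    static void executeByName(Object target, String name, Class<?>... paramTypes) {
        try {
            execute(target, target.getClass().getMethod(name, paramTypes));
        } catch (Exception e) {
            throw new IllegalStateException("reflective call failed: " + name, e);
        }
    }

    public static void main(String[] args) {
        DemoJob job = new DemoJob();
        executeByName(job, "work");
        executeByName(job, "workWithParam", String.class);
        System.out.println("runs = " + job.runs);
    }
}
```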

Summary

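This article walked through the XXL-JOB source code involved in triggering and executing a job. Read together with the previous two articles, it should give you a deeper understanding of XXL-JOB.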
