作者:鱼仔

博客首页: https://codeease.top

公众号:Java鱼仔

前言

在XxlJob源码的samples代码中,配置文件里可以看到这样一段配置

在这段配置里配置了Xxl所需要的配置信息,包括地址、IP、端口等信息,这里的XxlJobSpringExecutor是一个任务执行器,本文将会介绍xxl-job中的任务执行器源码。

源码分析

xxl-job任务执行器涉及到的类主要三个:XxlJobExecutor、XxlJobSimpleExecutor和XxlJobSpringExecutor。其中XxlJobSimpleExecutor和XxlJobSpringExecutor继承自XxlJobExecutor,两个类分别用于处理普通任务和Spring任务。先来看看这个基础类XxlJobExecutor

XxlJobExecutor

进入这个类首先看到的是参数

// 调度中心配置的地址
private String adminAddresses;
// 访问控制的token
private String accessToken;
// 本地应用的名字
private String appname;
// 本地的地址,也就是注册地址,如果设置了值就忽略ip+端口的地址
private String address;
// 本地的ip
private String ip;
// 本地的端口
private int port;
// 日志路径
private String logPath;
// 保留日志的天数
private int logRetentionDays;

接着定义了一个start方法和destroy方法,来看看start方法

public void start() throws Exception {

    // init logpath
    // 初始化日志路径
    XxlJobFileAppender.initLogPath(logPath);

    // init invoker, admin-client
    // 初始化客户端地址信息
    initAdminBizList(adminAddresses, accessToken);


    // init JobLogFileCleanThread
    // 初始化日志文件清理线程
    JobLogFileCleanThread.getInstance().start(logRetentionDays);

    // init TriggerCallbackThread
    // 初始化触发回调接口
    TriggerCallbackThread.getInstance().start();

    // init executor-server
	// 初始化执行器服务器端
    initEmbedServer(address, ip, port, appname, accessToken);
}

start()方法中定义了五个方法的初始化流程,我们依次来看一下

initLogPath

这个方法是初始化日志相关的信息,初始化了日志文件的路径和glue文件的路径。

private static String logBasePath = "/data/applogs/xxl-job/jobhandler";
private static String glueSrcPath = logBasePath.concat("/gluesource");
public static void initLogPath(String logPath){
    // init
    if (logPath!=null && logPath.trim().length()>0) {
        logBasePath = logPath;
    }
    // mk base dir
    File logPathDir = new File(logBasePath);
    if (!logPathDir.exists()) {
        logPathDir.mkdirs();
    }
    logBasePath = logPathDir.getPath();

    // mk glue dir
    File glueBaseDir = new File(logPathDir, "gluesource");
    if (!glueBaseDir.exists()) {
        glueBaseDir.mkdirs();
    }
    glueSrcPath = glueBaseDir.getPath();
}

initAdminBizList

    private static List<AdminBiz> adminBizList;
    private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
        if (adminAddresses!=null && adminAddresses.trim().length()>0) {
            for (String address: adminAddresses.trim().split(",")) {
                if (address!=null && address.trim().length()>0) {

                    AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);

                    if (adminBizList == null) {
                        adminBizList = new ArrayList<AdminBiz>();
                    }
                    adminBizList.add(adminBiz);
                }
            }
        }
    }

这个方法主要做了下面几件事情

  1. 通过解析传入的adminAddresses参数,将调度中心的地址拆分为多个地址字符串。
  2. 遍历每个地址字符串,创建一个AdminBizClient对象,该对象负责与对应的调度中心地址建立连接和进行通信。AdminBizClient是XxlJob提供的RPC调用客户端,用于与调度中心进行交互。
  3. 将创建的AdminBizClient对象添加到adminBizList列表中,以便后续使用。

总结起来,这段代码的作用是根据传入的调度中心地址和访问令牌,创建与调度中心的连接,并将连接对象存储在adminBizList列表中,以便后续使用。通过这个列表,可以实现与调度中心的交互,例如获取任务、上报任务执行结果等操作。

JobLogFileCleanThread

JobLogFileCleanThread 方法初始化了一个清理日志的线程

这个方法实现了一个定时清理过期日志文件的功能,通过启动一个后台线程,在每天固定时间执行清理操作,删除过期的日志文件。

/**
 * 1.获取日志文件所在的目录,并获取该目录下的所有子目录(即日期目录)。
 * 2.获取当前日期(不包含时间)。
 * 3.遍历每个子目录,进行以下判断和操作:
 *  - 如果子目录不是一个目录(即不是有效的日期目录),则跳过。
 *  - 如果子目录名称中不包含"-"字符,也跳过。
 *  - 解析子目录名称为日期对象,如果解析失败,则跳过。
 *  - 计算当前日期与子目录创建日期的时间差,如果超过logRetentionDays指定的天数,则删除该子目录及其下的所有文件。
 * 4.捕获异常并记录日志,如果不是停止状态,则记录异常信息。
 * 5.休眠1天的时间。
 * 6.重复执行上述步骤直到线程被停止。
 * @param logRetentionDays
 */
public void start(final long logRetentionDays){

    // limit min value
    if (logRetentionDays < 3 ) {
        return;
    }

    localThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (!toStop) {
                try {
                    // clean log dir, over logRetentionDays
                    File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();
                    if (childDirs!=null && childDirs.length>0) {

                        // today
                        Calendar todayCal = Calendar.getInstance();
                        todayCal.set(Calendar.HOUR_OF_DAY,0);
                        todayCal.set(Calendar.MINUTE,0);
                        todayCal.set(Calendar.SECOND,0);
                        todayCal.set(Calendar.MILLISECOND,0);

                        Date todayDate = todayCal.getTime();

                        for (File childFile: childDirs) {

                            // valid
                            if (!childFile.isDirectory()) {
                                continue;
                            }
                            if (childFile.getName().indexOf("-") == -1) {
                                continue;
                            }

                            // file create date
                            Date logFileCreateDate = null;
                            try {
                                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
                                logFileCreateDate = simpleDateFormat.parse(childFile.getName());
                            } catch (ParseException e) {
                                logger.error(e.getMessage(), e);
                            }
                            if (logFileCreateDate == null) {
                                continue;
                            }

                            if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) {
                                FileUtil.deleteRecursively(childFile);
                            }

                        }
                    }

                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }

                }

                try {
                    TimeUnit.DAYS.sleep(1);
                } catch (InterruptedException e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destroy.");

        }
    });
    localThread.setDaemon(true);
    localThread.setName("xxl-job, executor JobLogFileCleanThread");
    localThread.start();
}

TriggerCallbackThread

这是一个触发回调的方法,在这个回调方法中创建了两个回调线程,第一个是回调线程

// callback
triggerCallbackThread = new Thread(new Runnable() {

    @Override
    public void run() {

        // normal callback
        while(!toStop){
            try {
                HandleCallbackParam callback = getInstance().callBackQueue.take();
                if (callback != null) {

                    // callback list param
                    // 将回调任务放到回调集合中
                    List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                    int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                    callbackParamList.add(callback);

                    // callback, will retry if error
                    // 如果集合不为空,就触发一次回调
                    if (callbackParamList!=null && callbackParamList.size()>0) {
                        doCallback(callbackParamList);
                    }
                }
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }
        }

        // last callback
        // 在线程将要停止之后,将回调队列中剩下的回调任务全部执行
        try {
            List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
            int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
            if (callbackParamList!=null && callbackParamList.size()>0) {
                doCallback(callbackParamList);
            }
        } catch (Exception e) {
            if (!toStop) {
                logger.error(e.getMessage(), e);
            }
        }
        logger.info(">>>>>>>>>>> xxl-job, executor callback thread destroy.");

    }
});
triggerCallbackThread.setDaemon(true);
triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");
triggerCallbackThread.start();

在上面线程中,在正常运行情况下,一直会通过循环取出回调任务,然后通过doCallback进行回调。当线程将要被关闭的时候,会有一段代码将回调队列中剩下的回调任务全部执行。

doCallback

执行回调调用了doCallback方法

private void doCallback(List<HandleCallbackParam> callbackParamList){
    boolean callbackRet = false;
    // callback, will retry if error
    // 遍历所有的调度器服务,通过rpc执行回调方法
    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
        try {
            ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
            if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
                callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
                callbackRet = true;
                break;
            } else {
                callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);
            }
        } catch (Exception e) {
            callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
        }
    }
    // 如果callback结果是失败的,则写入失败文件中
    if (!callbackRet) {
        appendFailCallbackFile(callbackParamList);
    }
}

在callback方法中,会遍历所有的调度器集合,然后每个调度器执行callback方法,这里的callback方法会通过rpc去调用调度器的方法。然后根据回调拿到的结果,执行不同的写日志的操作。

@Override
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
    return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
}

具体的执行过程,我们在后续的文章里聊。
doCallback方法的最后会将错误写入到失败文件中,这个失败文件会用作失败重试。

private void appendFailCallbackFile(List<HandleCallbackParam> callbackParamList){
    // valid
    if (callbackParamList==null || callbackParamList.size()==0) {
        return;
    }
    // append file
    // 将callback参数序列化成byte数组
    byte[] callbackParamList_bytes = JdkSerializeTool.serialize(callbackParamList);
    File callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis())));
    if (callbackLogFile.exists()) {
        for (int i = 0; i < 100; i++) {
            callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()).concat("-").concat(String.valueOf(i)) ));
            if (!callbackLogFile.exists()) {
                break;
            }
        }
    }
    // 将byte数组写入文件
    FileUtil.writeFileContent(callbackLogFile, callbackParamList_bytes);
}

第一个回调线程做的事情都讲清楚了,接下来介绍TriggerCallbackThread中的第二个线程,重试线程。

triggerRetryCallbackThread = new Thread(new Runnable() {
    @Override
    public void run() {
        while(!toStop){
            try {
                retryFailCallbackFile();
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }

            }
            try {
                TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
            } catch (InterruptedException e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
        logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destroy.");
    }
});
triggerRetryCallbackThread.setDaemon(true);
triggerRetryCallbackThread.start();

这个线程就做一件事情,每隔30秒执行一次retryFailCallbackFile()方法,在这个方法中,会扫描上面一步写入的失败文件,然后取出入参后再次调用doCallback方法,实现重试。

private void retryFailCallbackFile(){

    // valid
    // 对失败回调文件做一系列校验
    File callbackLogPath = new File(failCallbackFilePath);
    if (!callbackLogPath.exists()) {
        return;
    }
    if (callbackLogPath.isFile()) {
        callbackLogPath.delete();
    }
    if (!(callbackLogPath.isDirectory() && callbackLogPath.list()!=null && callbackLogPath.list().length>0)) {
        return;
    }

    // load and clear file, retry
    // 遍历回调文件,取出被序列化为byte数组的参数,执行doCallback
    for (File callbaclLogFile: callbackLogPath.listFiles()) {
        byte[] callbackParamList_bytes = FileUtil.readFileContent(callbaclLogFile);

        // avoid empty file
        if(callbackParamList_bytes == null || callbackParamList_bytes.length < 1){
            callbaclLogFile.delete();
            continue;
        }

        List<HandleCallbackParam> callbackParamList = (List<HandleCallbackParam>) JdkSerializeTool.deserialize(callbackParamList_bytes, List.class);

        callbaclLogFile.delete();
        doCallback(callbackParamList);
    }

initEmbedServer

这个方法负责初始化 xxl-job 的 EmbedServer,并配置服务器的地址、端口、应用名称和访问令牌等参数,然后启动该服务器。

private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {

    // fill ip port
    // 如果传入的端口大于0,则使用传入的端口;否则,使用 NetUtil.findAvailablePort(9999) 方法查找一个可用的端口。
    port = port>0?port: NetUtil.findAvailablePort(9999);
    // 如果传入的 IP 不为空,则使用传入的 IP;否则,使用 IpUtil.getIp() 方法获取本机 IP。
    ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();

    // generate address
    // 如果传入的地址为空,则根据 IP 和端口生成地址,格式为 http://{ip_port}/,其中 {ip_port} 会被替换为 IP 和端口的组合。
    if (address==null || address.trim().length()==0) {
        String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is null
        address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
    }

    // accessToken
    // 如果传入的访问令牌为空,则输出警告日志,提醒用户设置访问令牌以确保系统安全。
    if (accessToken==null || accessToken.trim().length()==0) {
        logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
    }

    // start
    // 创建一个 EmbedServer 实例,并调用 start() 方法启动嵌入式服务器。
    embedServer = new EmbedServer();
    embedServer.start(address, port, appname, accessToken);
}

这个EmbedServer服务是基于Netty实现的NIO服务,主要为了实现包括任务下发、执行结果上报、心跳等功能。同时,通过嵌入式的 Netty HTTP 服务器,可以方便地处理任务调度中心的 HTTP 请求,并执行相应的任务操作。

XxlJobSimpleExecutor

XxlJobExecutor有两个子类,其中一个是XxlJobSimpleExecutor,用于处理简单的Java程序,不依赖Spring容器。

@Override
public void start() {

    // init JobHandler Repository (for method)
    initJobHandlerMethodRepository(xxlJobBeanList);

    // super start
    try {
        super.start();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

首先执行Job任务的注册,然后执行父类的start方法。

private void initJobHandlerMethodRepository(List<Object> xxlJobBeanList) {
    if (xxlJobBeanList==null || xxlJobBeanList.size()==0) {
        return;
    }

    // init job handler from method
    for (Object bean: xxlJobBeanList) {
        // method
        Method[] methods = bean.getClass().getDeclaredMethods();
        if (methods.length == 0) {
            continue;
        }
        for (Method executeMethod : methods) {
            XxlJob xxlJob = executeMethod.getAnnotation(XxlJob.class);
            // registry
            registJobHandler(xxlJob, bean, executeMethod);
        }

    }

}

注册方法也比较简单,如果用@XxlJob注解了,就注册到JobHandler中。

XxlJobSpringExecutor

XxlJobSpringExecutor是XxlJobExecutor的另一个子类,是 xxl-job 框架提供的基于 Spring 的执行器实现类,它是一个 Spring Bean,在 Spring 容器中进行管理。

public void afterSingletonsInstantiated() {

    // init JobHandler Repository (for method)
    initJobHandlerMethodRepository(applicationContext);

    // refresh GlueFactory
    GlueFactory.refreshInstance(1);

    // super start
    try {
        super.start();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

XxlJobSpringExecutor的初始化也是先执行Job任务的注册,然后执行父类的start方法。中间有一个刷新GlueFactory实例的方法,Glue是xxljob支持的一种脚本执行模式。注册方法的注释已经放在代码内了。

private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
// 检查传入的 ApplicationContext 是否为空,如果为空,则直接返回。
if (applicationContext == null) {
    return;
}
// init job handler from method
// 从 ApplicationContext 中获取所有的 bean 定义名称,并遍历每个 bean。
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
// 对于每个 bean,首先检查是否有 @Lazy 注解,如果有,则跳过该 bean 的处理。否则,继续下一步。
for (String beanDefinitionName : beanDefinitionNames) {

    // get bean
    Object bean = null;
    Lazy onBean = applicationContext.findAnnotationOnBean(beanDefinitionName, Lazy.class);
    if (onBean!=null){
        logger.debug("xxl-job annotation scan, skip @Lazy Bean:{}", beanDefinitionName);
        continue;
    }else {
        bean = applicationContext.getBean(beanDefinitionName);
    }

    // filter method
    Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
    try {
        // 使用 Spring 的 MethodIntrospector 工具类,检查 bean 类中的方法,并找到被 @XxlJob 注解修饰的方法。
        annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                new MethodIntrospector.MetadataLookup<XxlJob>() {
                    @Override
                    public XxlJob inspect(Method method) {
                        return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                    }
                });
    } catch (Throwable ex) {
        logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
    }
    if (annotatedMethods==null || annotatedMethods.isEmpty()) {
        continue;
    }

    // generate and regist method job handler
    for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
        // 获取方法对象和 @XxlJob 注解的信息。
        Method executeMethod = methodXxlJobEntry.getKey();
        XxlJob xxlJob = methodXxlJobEntry.getValue();
        // regist
        // 调用 registJobHandler() 方法,注册任务处理器,将 @XxlJob 注解和方法信息传递给注册方法。
        registJobHandler(xxlJob, bean, executeMethod);
    }
}
}

总结

本文主要对执行器进行了源码分析,源码类型的文章还是建议大家跟着代码一起走一遍,记忆会更加深刻。

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐