xxl-job版本:2.3.0
Xxl-Job分为执行器、调度器。而我们平时的客户端就属于一个执行器,执行器启动的时候会自动注册到调度器上,然后调度器进行远程调度。
 
执行器初始化过程步骤如下
1 通过加了@Conguration注解的XxlJobConfig初始化,并生成beanName=xxlJobExecutor的Bean
 
2 注册的BeanName=XxlJobConfig,会进行初始化,步骤如下:
- 扫描所有bean,加载加了@XxlJob注解类,并记录在jobHandlerRepository
- 选择工厂类:GlueFactory 或 SpringGlueFactory
- 启动,其步骤如下:
--- 1  初始化存放执行日志目录文件
--- 2  初始化执行者,管理客户端
--- 3  初始化日志清除线程,一天执行一次,默认清除N天(可配置)前数据
--- 4  初始化回调触发器线程,线程执行完会把数据回调调度器接口告诉他结果
--- 5  初始化执行服务器,初始化netty服务器,并发客户端信息注册到调度器上
 

接下来对各个源码进行一定的解析
1 通过加了@Conguration注解的XxlJobConfig初始化,并生成beanName=xxlJobExecutor的Bean
@Configuration
public class XxlJobConfig {// 注入XxlJob相关的配置信息,并生成Bean xxlJobExecutor
    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;
    ...........省略
    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        ................省略
        return xxlJobSpringExecutor;
    }
}
 
2 注册的BeanName=XxlJobConfig,会进行初始化,步骤如下
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
    private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);
 
    // start
    @Override
    public void afterSingletonsInstantiated() {
        // init JobHandler Repository
        /*initJobHandlerRepository(applicationContext);*/
        // 扫描所有bean,加载加了@XxlJob注解类,并记录在jobHandlerRepository    init JobHandler Repository (for method)
        initJobHandlerMethodRepository(applicationContext);
        // 选择工厂类:GlueFactory 或 SpringGlueFactory   选择工厂类:GlueFactory 或 SpringGlueFactory   refresh GlueFactory
        GlueFactory.refreshInstance(1);
        // 启动,其步骤如下:
        //--- 1 初始化存放执行日志目录文件
        //--- 2 初始化执行者,管理客户端
        //--- 3 初始化日志清除线程
        //--- 4 初始化回调触发器线程
        //--- 5 初始化执行服务器
        try {
            super.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
 
上面我们对启动super.start()步骤来做具体的分析,其分析代码如下:
public void start() throws Exception {
    // 1 初始化存放执行日志目录文件 init logpath
    XxlJobFileAppender.initLogPath(logPath);
    // 2 初始化执行者,管理客户端 init invoker, admin-client
    initAdminBizList(adminAddresses, accessToken);
    // 3 初始化日志清除线程 init JobLogFileCleanThread
    JobLogFileCleanThread.getInstance().start(logRetentionDays);
    // 4 初始化回调触发器线程 init TriggerCallbackThread
    TriggerCallbackThread.getInstance().start();
    // 5 初始化执行服务器 init executor-server
    initEmbedServer(address, ip, port, appname, accessToken);
}
 
启动super.start() ->  初始化存放执行日志目录文件
private static String logBasePath = "/data/applogs/xxl-job/jobhandler";
private static String glueSrcPath = logBasePath.concat("/gluesource");
public static void initLogPath(String logPath){
    // init 判断有没有自定义目录
    if (logPath!=null && logPath.trim().length()>0) {
        logBasePath = logPath;
    }
    // mk base dir 如果当前目录为空,就创建一个
    File logPathDir = new File(logBasePath);
    if (!logPathDir.exists()) {
        logPathDir.mkdirs();
    }
    logBasePath = logPathDir.getPath();// 获取创建的目录路径
 
// mk glue dir 创建glue目录路径,没有就创建然后获取
File glueBaseDir = new File(logPathDir, "gluesource");
if (!glueBaseDir.exists()) {
glueBaseDir.mkdirs();
}
glueSrcPath = glueBaseDir.getPath();
}
 
 
启动super.start() ->  初始化执行者,管理客户端
-- 把调度管理器的地址写入adminBizList中
 
启动super.start() ->  初始化日志清除线程
public void start(final long logRetentionDays){
 
    // 日志最多也只能清除三天前的 limit min value
    if (logRetentionDays < 3 ) {
        return;
    }
    // 启动一个本地线程,用于处理日志清除
    localThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (!toStop) {
                try {
                    // 清除日志目录中超过logRetentionDays天的日志文件 clean log dir, over logRetentionDays
                    File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();
                    if (childDirs!=null && childDirs.length>0) {
                    // today
                    Calendar todayCal = Calendar.getInstance();
                    todayCal.set(Calendar.HOUR_OF_DAY,0);
                    todayCal.set(Calendar.MINUTE,0);
                    todayCal.set(Calendar.SECOND,0);
                    todayCal.set(Calendar.MILLISECOND,0);
                    Date todayDate = todayCal.getTime();
                    for (File childFile: childDirs) {
                        // valid
                        if (!childFile.isDirectory()) {
                            continue;
                        }
                        if (childFile.getName().indexOf("-") == -1) {
                            continue;
                        }
                        // file create date
                        Date logFileCreateDate = null;
                        try {
                            // 将日志文件名转为时间
                            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
                            logFileCreateDate = simpleDateFormat.parse(childFile.getName());
                        } catch (ParseException e) {
                            logger.error(e.getMessage(), e);
                        }
                        if (logFileCreateDate == null) {
                            continue;
                        }
                        // 如果文件时间超过logRetentionDays天,就进行删除
                        if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) {
                            FileUtil.deleteRecursively(childFile);
                        }
                    }
                }
            } catch (Exception e) {
                。。。。。。日志打印
            }
    localThread.setDaemon(true);// 后台守护线程运行
    localThread.setName("xxl-job, executor JobLogFileCleanThread");
    localThread.start();
}
 
启动super.start() -> 初始化回调触发器线程
回调job执行结果给调度器,告诉调度器已执行完
/**
* 启动执行结果回调线程
*/
public void start() {
    // 判断是否合法的admin地址 valid
    if (XxlJobExecutor.getAdminBizList() == null) {
        logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
        return;
    }
    // 启动回调触发线程 callback
    triggerCallbackThread = new Thread(new Runnable() {
        @Override
        public void run() {
            // 进入回调循环处理中 normal callback
            while(!toStop){
                try {
                    // 任务执行完,会把执行结果塞入 LinkedBlockingQueue 中,然后LinkedBlockingQueue.tack()是阻塞形的,会阻塞等待执行结果
                    HandleCallbackParam callback = getInstance().callBackQueue.take();
                    if (callback != null) {
 
                        // 把执行结果全部搞出来,然后塞入callbackParamList,然后批量回调处理 callback list param
                        List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                        int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                        callbackParamList.add(callback);
                        // callback, will retry if error
                        if (callbackParamList!=null && callbackParamList.size()>0) {
                            doCallback(callbackParamList);// 回调执行结果,告诉admin,请求的结果是 addressUrl+"api/callback"
                        }
                    }
                } catch (Exception e) {......}
                    // 进行回到后的回归操作 last callback
                    try {
                        List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                        int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                        if (callbackParamList!=null && callbackParamList.size()>0) {
                            doCallback(callbackParamList);
                        }
                    } catch (Exception e) {..........}
                    logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");
                }
            });
            triggerCallbackThread.setDaemon(true);
            triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");
            triggerCallbackThread.start();
 
            // 上面的回调失败重试 线程(回调失败后会把数据存在文件中,然后xxl-job从文件中获取并从新发起doCallback() retry
            triggerRetryCallbackThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while(!toStop){
                        try {
                            retryFailCallbackFile();
                        } catch (Exception e) {
                            if (!toStop) {
                                logger.error(e.getMessage(), e);
                        }
                    }
                    try {
                        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                    } catch (InterruptedException e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory.");
            }
        });
        triggerRetryCallbackThread.setDaemon(true);
        triggerRetryCallbackThread.start();
}
 
启动super.start() ->初始化执行服务器
初始化netty服务器,并发客户端信息注册到调度器上
// 初始化执行服务器
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
    // 填充ip和端口号 fill ip port
    port = port>0?port: NetUtil.findAvailablePort(9999);
    ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
    // 地址为空,就根据ip:port生成新的地址 generate address
    if (address==null || address.trim().length()==0) {
        String ip_port_address = IpUtil.getIpPort(ip, port); // registry-addressdefault use address to registry , otherwise use         ip:port if address is null
        address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
    }
    // accessToken
    if (accessToken==null || accessToken.trim().length()==0) {
        logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
    }
    // 实例化一个网络server start
    embedServer = new EmbedServer();
    embedServer.start(address, port, appname, accessToken);// 启动server,基于netty
}
// embedServer.start 调用的就是以下方法,启动nettyServer服务器,然后请求注册到调度器上,调度器会通过netty通知来调度job
public void start(final String address, final int port, final String appname, final String accessToken) {
    executorBiz = new ExecutorBizImpl();
    thread = new Thread(new Runnable() {
        @Override
        public void run() {
            // param
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(020060LTimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(2000)new ThreadFactory() {xxxxx});
            try {
                // start server
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    ............省略.................
                    .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                    }}).childOption(ChannelOption.SO_KEEPALIVE, true);
 
                // bind
                ChannelFuture future = bootstrap.bind(port).sync();
                logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
                // 启动注册线程,启动后自动把服务器的信息注册到admin,然后每隔30s重新注册一次,调用api/registry
                // -- (如果停止执行会执行服务注销 api/registryRemove start registry
                startRegistry(appname, address);
                // 启动netty服务器 wait util stop
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {...........省略..............}
    });
    thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    thread.start();
}
上面启动了nettyServer,并且对应的实现handler为EmbedHttpServerHandler,通过监听,最终调用如下代码:
--  路径为:EmbedHttpServerHandler->channelRead0->process()->executorBiz.run(triggerParam)->ExecutorBizImpl#run()
/**
* 执行job任务最终会调用到这里
* @param triggerParam 执行参数
*/
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
    // 加载出对应的jobHandler + jobThread load oldjobHandler + jobThread
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;
    // validjobHandler + jobThread
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {// bean的方式调用,现在一般都是这个方式,其他方式是怎么处理的就不解析了
        // new jobhandler
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
        // 验证旧的任务处理器,如果不相等,就创建一个新的 valid old jobThread
        if (jobThread!=null && jobHandler != newJobHandler) {
            // change handler, need kill old thread
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
            jobThread = null;// 任务线程
            jobHandler = null;// 任务处理器
        }
        // 验证处理器并再次赋值 valid handler
        if (jobHandler == null) {
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }
    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {......省略多种其他方式......}
 
    // 执行block策略 executor block strategy
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {// 废弃后面来的
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {// 如果有运行中线程,就终止当前调用
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {// 覆盖前面的
            // kill running jobThread
            if (jobThread.isRunningOrHasQueue()) {// 如果有运行中线程,就废弃前面的任务线程 jobThread = null
                removeOldReason = "block strategy effect" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
                jobThread = null;
            }
        } else {
            // just queue trigger
        }
    }
 
    // replace thread (new or exists invalid)
    if (jobThread == null) {//如果线程为空(创建了新的 或者 kill原来旧的)
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }
 
    // 把数据入队处理 push data to queue,这里还有一个JobThread的守护进程会从queue里面取数据处理
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}
 
 
 
 
 
 
 
 
 
 
GitHub 加速计划 / xx / xxl-job
27.16 K
10.79 K
下载
xxl-job: 是一个分布式任务调度平台,核心设计目标是开发迅速、学习简单、轻量级、易扩展。
最近提交(Master分支:3 个月前 )
e5d26ba2 - 4 个月前
977ad87b - 4 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐