【Introduction】

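The previous post analyzed the source code the xxl-job admin (scheduling center) uses to trigger tasks. This post looks at how the executor handles a task after receiving it.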

【Implementation】

The demo in the first post of this xxl-job series was integrated with Spring Boot. Its xxl-job configuration class contains the following code:

    @Bean(initMethod = "start", destroyMethod = "destroy")
    public XxlJobExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobExecutor xxlJobExecutor = new XxlJobExecutor();
        xxlJobExecutor.setAdminAddresses(adminAddresses);
        xxlJobExecutor.setAppName(appName);
        xxlJobExecutor.setIp(ip);
        xxlJobExecutor.setPort(port);
        xxlJobExecutor.setAccessToken(accessToken);
        xxlJobExecutor.setLogPath(logPath);
        xxlJobExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobExecutor;
    }

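Because the bean is declared with initMethod = "start", Spring calls start() once, when it initializes the XxlJobExecutor bean at application startup, not each time a task arrives. The code is as follows: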

    public void start() throws Exception {
        // init admin-client
        // initialize the admin (scheduling center) address list and create the adminBiz instances via NetComClientProxy
        initAdminBizList(adminAddresses, accessToken);

        // init executor-jobHandlerRepository
        // register every bean annotated with @JobHandler in a ConcurrentHashMap keyed by its handler name
        initJobHandlerRepository(applicationContext);

        // init logpath
        // initialize the local log directory
        XxlJobFileAppender.initLogPath(logPath);

        // init executor-server
        // start the embedded Jetty server
        initExecutorServer(port, ip, appName, accessToken);

        // init JobLogFileCleanThread
        // start a thread that cleans up old local log files; how long logs are kept is set by logRetentionDays
        JobLogFileCleanThread.getInstance().start(logRetentionDays);
    }
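The handler repository built by initJobHandlerRepository() boils down to a name-to-handler map that run() later queries by handler name. Below is a minimal sketch under that assumption; SimpleJobHandler and JobHandlerRegistry are hypothetical names, not xxl-job classes, and rejecting duplicate names is this sketch's own choice.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for IJobHandler: receives the executor param and returns a result.
interface SimpleJobHandler {
    String execute(String param);
}

// Minimal name -> handler registry, similar in spirit to what initJobHandlerRepository() builds.
final class JobHandlerRegistry {
    private static final Map<String, SimpleJobHandler> HANDLERS = new ConcurrentHashMap<>();

    private JobHandlerRegistry() {
    }

    // Register a handler under its name; putIfAbsent makes the duplicate check atomic.
    static void register(String name, SimpleJobHandler handler) {
        if (HANDLERS.putIfAbsent(name, handler) != null) {
            throw new IllegalStateException("duplicate job handler name: " + name);
        }
    }

    // Returns null when nothing is registered under the name (run() then reports "not found").
    static SimpleJobHandler load(String name) {
        return HANDLERS.get(name);
    }
}
```

A ConcurrentHashMap keeps lookups safe when several Jetty request threads call run() at the same time.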

Of the five calls above, the fourth one, initExecutorServer(), is the key. Its source is as follows:

    private void initExecutorServer(int port, String ip, String appName, String accessToken) throws Exception {
        // valid param
        // if no valid port is configured (port <= 0), look for an available port for the Jetty server starting from 9999
        port = port>0?port: NetUtil.findAvailablePort(9999);

        // start server
        // register an ExecutorBizImpl instance in a map keyed by ExecutorBiz's class name; incoming requests later look it up by class name and invoke its run() method
        NetComServerFactory.putService(ExecutorBiz.class, new ExecutorBizImpl());   // rpc-service, base on jetty
        NetComServerFactory.setAccessToken(accessToken);
        // start the Jetty server
        serverFactory.start(port, ip, appName); // jetty + registry
    }
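The port fallback in initExecutorServer() can be illustrated with plain java.net.ServerSocket. This sketch assumes a simple upward scan from the default port; PortFallback is a hypothetical class, and NetUtil.findAvailablePort() in xxl-job may search in a different order.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Hypothetical sketch of "port > 0 ? port : findAvailablePort(9999)".
final class PortFallback {

    private PortFallback() {
    }

    // True if a ServerSocket can bind the port right now; the socket is closed immediately.
    static boolean isPortFree(int port) {
        try (ServerSocket socket = new ServerSocket(port)) {
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Assumption: scan upward from defaultPort and return the first bindable port.
    static int findAvailablePort(int defaultPort) {
        for (int port = defaultPort; port <= 65535; port++) {
            if (isPortFree(port)) {
                return port;
            }
        }
        throw new IllegalStateException("no available port from " + defaultPort);
    }

    // Mirrors the ternary in initExecutorServer(): keep a configured port, otherwise search.
    static int resolvePort(int configuredPort) {
        return configuredPort > 0 ? configuredPort : findAvailablePort(9999);
    }
}
```

Checking a port and binding it later is inherently racy, since another process can take the port in between, so the result is only a best-effort choice.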

The Jetty server's start method is as follows:

public void start(final int port, final String ip, final String appName) throws Exception {
		thread = new Thread(new Runnable() {
			@Override
			public void run() {

				// The Server
				server = new Server(new ExecutorThreadPool(32, 256, 60L * 1000));  // request thread pool: 32 core threads, up to 256, 60 s keep-alive

				// HTTP connector
				ServerConnector connector = new ServerConnector(server);
				if (ip!=null && ip.trim().length()>0) {
					//connector.setHost(ip);	// The network interface this connector binds to as an IP address or a hostname.  If null or 0.0.0.0, then bind to all interfaces.
				}
				connector.setPort(port);
				server.setConnectors(new Connector[]{connector});

				// Set a handler
				HandlerCollection handlerc =new HandlerCollection();
				handlerc.setHandlers(new Handler[]{new JettyServerHandler()});
				server.setHandler(handlerc);

				try {
					// Start server
					server.start();
					logger.info(">>>>>>>>>>> xxl-job jetty server start success at port:{}.", port);

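					// Start Registry-Server
					// Start the executor registry thread. On its first run it registers this executor with the admin, which stores the record in the xxl_job_qrtz_trigger_registry table.
					// After that, the executor refreshes its record every 30 seconds to signal that it is still alive.
					// On the admin side, a thread periodically scans the table, removes machines whose records have not been refreshed in time, and adds newly registered executors to the cluster list.
					ExecutorRegistryThread.getInstance().start(port, ip, appName);

					// Start Callback-Server
					// Start the callback thread. It holds a queue: whenever a task finishes, its log ID and result are put into the queue,
					// and the thread takes them off the queue and reports the result to the admin by calling adminBiz's callback method.
					TriggerCallbackThread.getInstance().start();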

					server.join();	// block until thread stopped
					logger.info(">>>>>>>>>>> xxl-rpc server join success, netcon={}, port={}", JettyServer.class.getName(), port);
				} catch (Exception e) {
					logger.error(e.getMessage(), e);
				} finally {
					//destroy();
				}
			}
		});
		thread.setDaemon(true);	// daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
		thread.start();
	}
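The registry behaviour described in the comments above (register once, refresh every 30 seconds, drop stale entries on the admin side) can be modelled as a map of last-heartbeat timestamps. This is a hypothetical in-memory sketch, not the xxl-job implementation, which goes through the admin and the xxl_job_qrtz_trigger_registry table; timestamps are passed in explicitly so the expiry rule can be checked without waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of heartbeat registration and admin-side expiry.
final class HeartbeatRegistry {
    // key: "appName@address", value: time of the last heartbeat in milliseconds
    private final Map<String, Long> lastBeat = new ConcurrentHashMap<>();
    private final long deadTimeoutMillis;

    HeartbeatRegistry(long deadTimeoutMillis) {
        this.deadTimeoutMillis = deadTimeoutMillis;
    }

    // Executor side: called on first registration and again on every beat.
    void beat(String appName, String address, long nowMillis) {
        lastBeat.put(appName + "@" + address, nowMillis);
    }

    // Admin side: drop entries not refreshed within the timeout, then list an app's live addresses.
    List<String> liveAddresses(String appName, long nowMillis) {
        lastBeat.entrySet().removeIf(e -> nowMillis - e.getValue() > deadTimeoutMillis);
        List<String> result = new ArrayList<>();
        String prefix = appName + "@";
        for (String key : lastBeat.keySet()) {
            if (key.startsWith(prefix)) {
                result.add(key.substring(prefix.length()));
            }
        }
        result.sort(null); // natural order, so the output is deterministic
        return result;
    }
}
```

For example, with a 90-second timeout (an illustrative value, not taken from the source), an address that last beat 100 seconds ago is dropped while one that beat 40 seconds ago stays.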

After JettyServerHandler receives a request, the core processing is done by invokeService() in NetComServerFactory:

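	public static RpcResponse invokeService(RpcRequest request, Object serviceBean) {
		// request carries the parameters of the HTTP request sent by the admin, as described at the end of the previous post
		if (serviceBean==null) {
			// serviceBean is the ExecutorBiz instance registered by initExecutorServer() at executor startup; it is looked up here by class name
			serviceBean = serviceMap.get(request.getClassName());
		}
		if (serviceBean == null) {
			// TODO (not handled in this version: a null serviceBean makes serviceBean.getClass() below throw a NullPointerException, which the catch block turns into an error response)
		}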

		RpcResponse response = new RpcResponse();

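		// reject the request if the admin's and executor's timestamps differ by more than 180000 ms (3 minutes)
		if (System.currentTimeMillis() - request.getCreateMillisTime() > 180000) {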
			response.setResult(new ReturnT<String>(ReturnT.FAIL_CODE, "The timestamp difference between admin and executor exceeds the limit."));
			return response;
		}
		if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.trim().equals(request.getAccessToken())) {
			response.setResult(new ReturnT<String>(ReturnT.FAIL_CODE, "The access token[" + request.getAccessToken() + "] is wrong."));
			return response;
		}

		try {
			Class<?> serviceClass = serviceBean.getClass();
			String methodName = request.getMethodName();
			Class<?>[] parameterTypes = request.getParameterTypes();
			Object[] parameters = request.getParameters();

			FastClass serviceFastClass = FastClass.create(serviceClass);
			FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);

			Object result = serviceFastMethod.invoke(serviceBean, parameters);

			response.setResult(result);
		} catch (Throwable t) {
			t.printStackTrace();
			response.setError(t.getMessage());
		}

		return response;
	}
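The dispatch in invokeService() (find the bean by class name, check the timestamp window and access token, then call the named method) can be sketched with plain java.lang.reflect.Method in place of cglib's FastClass. SimpleRpcRequest, MiniRpcServer, DemoBiz and DemoBizImpl are hypothetical names, and results and errors are flattened to strings instead of RpcResponse/ReturnT.

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical service interface and implementation standing in for ExecutorBiz / ExecutorBizImpl.
interface DemoBiz {
    String run(String param);
}

class DemoBizImpl implements DemoBiz {
    @Override
    public String run(String param) {
        return "ran:" + param;
    }
}

// Hypothetical request carrying the fields that invokeService() reads from RpcRequest.
class SimpleRpcRequest {
    String className;
    String methodName;
    Class<?>[] parameterTypes;
    Object[] parameters;
    long createMillisTime;
    String accessToken;
}

// Sketch of invokeService(): bean lookup, timestamp and token checks, then a reflective call.
class MiniRpcServer {
    private static final long MAX_AGE_MILLIS = 180_000L; // same 3-minute window as the source
    private final Map<String, Object> serviceMap = new ConcurrentHashMap<>();
    private final String accessToken;

    MiniRpcServer(String accessToken) {
        this.accessToken = accessToken;
    }

    // Like putService() in the source: the key is the interface's class name.
    void putService(Class<?> iface, Object bean) {
        serviceMap.put(iface.getName(), bean);
    }

    String invoke(SimpleRpcRequest request, long nowMillis) {
        Object bean = serviceMap.get(request.className);
        if (bean == null) {
            return "FAIL: service not found: " + request.className;
        }
        if (nowMillis - request.createMillisTime > MAX_AGE_MILLIS) {
            return "FAIL: timestamp difference exceeds the limit";
        }
        if (accessToken != null && !accessToken.trim().isEmpty()
                && !accessToken.trim().equals(request.accessToken)) {
            return "FAIL: wrong access token";
        }
        try {
            Method method = bean.getClass().getMethod(request.methodName, request.parameterTypes);
            return String.valueOf(method.invoke(bean, request.parameters));
        } catch (ReflectiveOperationException e) {
            return "FAIL: " + e;
        }
    }
}
```

Unlike the quoted source, this sketch returns an explicit error when the service is missing, which is one possible way to fill the TODO branch.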

Given the trigger request and the analysis above, the method invoked next is run() in ExecutorBizImpl:

@Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        // load old:jobHandler + jobThread
        JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
        IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
        String removeOldReason = null;

        // valid:jobHandler + jobThread
        GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
        // match the glue type: BEAN mode
        if (GlueTypeEnum.BEAN == glueTypeEnum) {

            // new jobhandler
            // look up the handler instance in local memory by handler name (at executor startup, every @JobHandler bean was put into a map keyed by name)
            IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

            // valid old jobThread
            if (jobThread!=null && jobHandler != newJobHandler) {
                // change handler, need kill old thread
                removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                jobHandler = newJobHandler;
                if (jobHandler == null) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
                }
            }
        // GLUE (Groovy) mode
        } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
			...
        } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
            // other script modes
           ...
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
        }

        // executor block strategy
        if (jobThread != null) {
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
            if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
                // discard when running
                if (jobThread.isRunningOrHasQueue()) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
                }
            } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
                // kill running jobThread
                if (jobThread.isRunningOrHasQueue()) {
                    removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                    jobThread = null;
                }
            } else {
                // just queue trigger
            }
        }

        // replace thread (new or exists invalid)
        // if jobThread is null, register a new thread in the local thread repository and start it
        if (jobThread == null) {
            jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
        }

        // push data to queue
        // push this trigger's parameters into the thread's queue for execution
        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
        return pushResult;
    }
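The block-strategy handling in run() can be reduced to a pure decision function, which makes the three possible outcomes easier to see. The DISCARD_LATER and COVER_EARLY constants follow the source; SERIAL_EXECUTION stands for the "just queue trigger" branch, and BlockStrategy, Outcome and BlockStrategyDecider are hypothetical names.

```java
// Hypothetical mirror of the strategies handled in run().
enum BlockStrategy { SERIAL_EXECUTION, DISCARD_LATER, COVER_EARLY }

enum Outcome {
    QUEUE,       // push the trigger into the existing thread's queue
    DISCARD,     // reject the new trigger; the running one continues
    NEW_THREAD   // register a new JobThread (the old one, if any, is told to stop) and queue there
}

final class BlockStrategyDecider {

    private BlockStrategyDecider() {
    }

    static Outcome decide(boolean threadExists, boolean handlerChanged,
                          boolean runningOrHasQueue, BlockStrategy strategy) {
        // no thread yet, or the handler changed (the source sets jobThread = null)
        if (!threadExists || handlerChanged) {
            return Outcome.NEW_THREAD;
        }
        if (strategy == BlockStrategy.DISCARD_LATER && runningOrHasQueue) {
            return Outcome.DISCARD;
        }
        if (strategy == BlockStrategy.COVER_EARLY && runningOrHasQueue) {
            return Outcome.NEW_THREAD;
        }
        // serial execution, an unmatched strategy (null), or an idle thread: just queue
        return Outcome.QUEUE;
    }
}
```

Using if/else rather than a switch keeps a null strategy (what match(..., null) returns for an unknown value) on the queueing path, as in the source.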

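The process above first looks up the job's thread in the local thread repository by jobId. If the job's JobHandler has changed, the latest handler is used and the old thread is discarded. Next, the job's block strategy decides what happens: DISCARD_LATER rejects the new trigger while the thread is busy, COVER_EARLY replaces the busy thread, and any other strategy simply queues the trigger. Finally, if the job is running for the first time (or its old thread was discarded), the system assigns it a new thread and starts that thread. Lastly, let's look at the task execution method, JobThread.run():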

@Override
	public void run() {

    	// init
    	try {
			// call IJobHandler.init(); override it for any initialization work needed before the handler executes
			handler.init();
		} catch (Throwable e) {
    		logger.error(e.getMessage(), e);
		}

		// execute
		while(!toStop){
			running = false;
			idleTimes++;

            TriggerParam triggerParam = null;
            ReturnT<String> executeResult = null;
            try {
				// to check the toStop signal we need to loop, so we cannot use queue.take(); use poll(timeout) instead
				triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
				if (triggerParam!=null) {
					running = true;
					idleTimes = 0;
					triggerLogIdSet.remove(triggerParam.getLogId());

					// log filename, like "logPath/yyyy-MM-dd/9999.log"
					String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId());
					XxlJobFileAppender.contextHolder.set(logFileName);
					// set sharding info: store this machine's shard index and the total shard count in ShardingUtil
					ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));

					// execute
					XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());

					if (triggerParam.getExecutorTimeout() > 0) {
						// limit timeout
						Thread futureThread = null;
						try {
							final TriggerParam triggerParamTmp = triggerParam;
							FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
								@Override
								public ReturnT<String> call() throws Exception {
									// execute the handler
									return handler.execute(triggerParamTmp.getExecutorParams());
								}
							});
							futureThread = new Thread(futureTask);
							futureThread.start();

							executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
						} catch (TimeoutException e) {

							XxlJobLogger.log("<br>----------- xxl-job job execute timeout");
							XxlJobLogger.log(e);

							executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");
						} finally {
							futureThread.interrupt();
						}
					} else {
						// just execute
						executeResult = handler.execute(triggerParam.getExecutorParams());
					}

					if (executeResult == null) {
						executeResult = IJobHandler.FAIL;
					}
					XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);

				} else {
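					// each empty poll waits up to 3 s, so more than 30 empty polls in a row (roughly 90 s idle) removes this thread
					if (idleTimes > 30) {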
						XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
					}
				}
			} catch (Throwable e) {
				if (toStop) {
					XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
				}

				StringWriter stringWriter = new StringWriter();
				e.printStackTrace(new PrintWriter(stringWriter));
				String errorMsg = stringWriter.toString();
				executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);

				XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
			} finally {
                if(triggerParam != null) {
                    // callback handler info
                    if (!toStop) {
                        // common
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), executeResult));
                    } else {
                        // is killed
                        ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running,killed]");
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), stopResult));
                    }
                }
            }
        }

		// callback trigger request in queue
		while(triggerQueue !=null && triggerQueue.size()>0){
			TriggerParam triggerParam = triggerQueue.poll();
			if (triggerParam!=null) {
				// is killed
				ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
				// push the result and logId into the queue of the callback thread created at executor startup;
				// that thread reports them to the admin asynchronously
				TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), stopResult));
			}
		}

		// destroy
		try {
			handler.destroy();
		} catch (Throwable e) {
			logger.error(e.getMessage(), e);
		}

		logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
	}
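The core of JobThread.run() (poll with a timeout so the stop flag is re-checked, run each trigger in a FutureTask when a timeout is set, report every result through a callback queue, and report still-queued triggers as killed when stopping) can be reproduced in a small self-contained class. MiniJobThread, the millisecond timeouts and the result strings are hypothetical simplifications; the real class works with TriggerParam, ReturnT and TriggerCallbackThread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical, simplified version of the JobThread loop.
final class MiniJobThread extends Thread {
    final BlockingQueue<String> triggerQueue = new LinkedBlockingQueue<>();
    final BlockingQueue<String> callbackQueue = new LinkedBlockingQueue<>();
    private final Function<String, String> handler;
    private final long timeoutMillis;   // <= 0 means no timeout, like executorTimeout == 0
    private final int maxIdlePolls;     // stop after this many consecutive empty polls
    private volatile boolean toStop = false;

    MiniJobThread(Function<String, String> handler, long timeoutMillis, int maxIdlePolls) {
        this.handler = handler;
        this.timeoutMillis = timeoutMillis;
        this.maxIdlePolls = maxIdlePolls;
    }

    void requestStop() {
        toStop = true;
    }

    @Override
    public void run() {
        int idleTimes = 0;
        while (!toStop) {
            String param;
            try {
                // poll with a timeout instead of take(), so the loop can re-check toStop
                param = triggerQueue.poll(100, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                break; // treat interruption as a stop signal
            }
            if (param == null) {
                if (++idleTimes > maxIdlePolls) {
                    toStop = true; // the source calls XxlJobExecutor.removeJobThread(...) here
                }
                continue;
            }
            idleTimes = 0;
            String result;
            try {
                result = execute(param);
            } catch (RuntimeException e) {
                result = "FAIL: " + e; // mirrors the catch (Throwable) in the source
            }
            callbackQueue.add(param + " -> " + result);
        }
        // triggers still waiting in the queue are reported as killed, not executed
        String left;
        while ((left = triggerQueue.poll()) != null) {
            callbackQueue.add(left + " -> FAIL: killed, not executed");
        }
    }

    private String execute(String param) {
        if (timeoutMillis <= 0) {
            return handler.apply(param);
        }
        FutureTask<String> task = new FutureTask<>(() -> handler.apply(param));
        Thread worker = new Thread(task);
        worker.start();
        try {
            return task.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "FAIL: timeout";
        } catch (ExecutionException e) {
            return "FAIL: " + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the flag so the next poll() ends the loop
            return "FAIL: interrupted";
        } finally {
            worker.interrupt(); // like futureThread.interrupt() in the source
        }
    }
}
```

As in the source, interrupting the worker thread after a timeout only asks the handler to stop; a handler that ignores interruption keeps running in the background.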

【Summary】

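The analysis above gives only a rough picture of how the executor processes tasks, and together with the earlier posts, a general understanding of xxl-job. Besides the official documentation, I consulted quite a few other blog posts to help me understand it.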

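Going from the first demo to the source analysis, I learned a lot over this series. I could not follow every part of the source, but I recognized plenty of familiar code along the way, such as the use of threads, queues and various utility classes. That accumulated experience will be useful in future projects.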
