xxl-job分布任务调度框架-2源码解析
xxl-job
xxl-job: 是一个分布式任务调度平台,核心设计目标是开发迅速、学习简单、轻量级、易扩展。
项目地址:https://gitcode.com/gh_mirrors/xx/xxl-job
免费下载资源
·
文章目录
一、执行器项目
1、执行器组件配置类
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.executor.appname}")
private String appName;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppName(appName);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
}
2、XxlJobSpringExecutor 类
InitializingBean接口:初始化后就会调用afterPropertiesSet()方法
ApplicationContextAware接口:有了获取applicationContext能务
DisposableBean接口:IOC销毁时 调用destory方法
3、查看XxlJobSpringExecutor 类的afterPropertiesSet()方法
@Override
public void afterPropertiesSet() throws Exception {
//获取类上有@JobHandler注解的bean,
//放到jobHandlerRepository变量map中
initJobHandlerRepository(applicationContext);
//将方法上有@XxlJob注解的方法,打包成MethodJobHandler类,加入到jobHandlerRepository变量map中
initJobHandlerMethodRepository(applicationContext);
//初始化GlueFactory=new SpringGlueFactory()
GlueFactory.refreshInstance(1);
super.start();
}
3.1、查看super.start();
public void start() throws Exception {
// 初始化日志路径logpath
XxlJobFileAppender.initLogPath(logPath);
//初始化调度中心客户端对象 AdminBizClient
//加入到List<AdminBiz> adminBizList变量中
initAdminBizList(adminAddresses, accessToken);
//启动日志清理线程,定时清理时间范围内的日志文件
JobLogFileCleanThread.getInstance().start(logRetentionDays);
//启动一个线程,消费callBackQueue队列,
//遍历adminBizList调用adminBiz.callback(callbackParamList)方法
TriggerCallbackThread.getInstance().start();
//初始化监听服务
port = port>0?port: NetUtil.findAvailablePort(9999);
ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
initRpcProvider(ip, port, appName, accessToken);
}
3.2、查看initRpcProvider(ip, port, appName, accessToken);方法
private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {
// init, provider factory
String address = IpUtil.getIpPort(ip, port);
Map<String, String> serviceRegistryParam = new HashMap<String, String>();
serviceRegistryParam.put("appName", appName);
serviceRegistryParam.put("address", address);
xxlRpcProviderFactory = new XxlRpcProviderFactory();
xxlRpcProviderFactory.setServer(NettyHttpServer.class);
xxlRpcProviderFactory.setSerializer(HessianSerializer.class);
xxlRpcProviderFactory.setCorePoolSize(20);
xxlRpcProviderFactory.setMaxPoolSize(200);
xxlRpcProviderFactory.setIp(ip);
xxlRpcProviderFactory.setPort(port);
xxlRpcProviderFactory.setAccessToken(accessToken);
xxlRpcProviderFactory.setServiceRegistry(ExecutorServiceRegistry.class);
xxlRpcProviderFactory.setServiceRegistryParam(serviceRegistryParam);
// add services,为个service是重点,
//后面netty处理业务handler会获取这个service中的bean,
//根据接收到的参数执行不同的方法
xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());
// start
xxlRpcProviderFactory.start();
}
3.3、xxlRpcProviderFactory.start();方法
只列出核心代码
public void start() throws Exception {
this.serializerInstance = (Serializer)this.serializer.newInstance();
this.serviceAddress = IpUtil.getIpPort(this.ip, this.port);
//实例化server对象,也就是NettyHttpServer对象
this.serverInstance = (Server)this.server.newInstance();
//设置NettyHttpServer的startedCallback方法
this.serverInstance.setStartedCallback(new BaseCallback() {
public void run() throws Exception {
if (XxlRpcProviderFactory.this.serviceRegistry != null) {
XxlRpcProviderFactory.this.serviceRegistryInstance = (ServiceRegistry)XxlRpcProviderFactory.this.serviceRegistry.newInstance();
XxlRpcProviderFactory.this.serviceRegistryInstance.start(XxlRpcProviderFactory.this.serviceRegistryParam);
if (XxlRpcProviderFactory.this.serviceData.size() > 0) {
XxlRpcProviderFactory.this.serviceRegistryInstance.registry(XxlRpcProviderFactory.this.serviceData.keySet(), XxlRpcProviderFactory.this.serviceAddress);
}
}
}
});
//设置NettyHttpServer的StopedCallback方法
this.serverInstance.setStopedCallback(new BaseCallback() {
public void run() {
if (XxlRpcProviderFactory.this.serviceRegistryInstance != null) {
if (XxlRpcProviderFactory.this.serviceData.size() > 0) {
XxlRpcProviderFactory.this.serviceRegistryInstance.remove(XxlRpcProviderFactory.this.serviceData.keySet(), XxlRpcProviderFactory.this.serviceAddress);
}
XxlRpcProviderFactory.this.serviceRegistryInstance.stop();
XxlRpcProviderFactory.this.serviceRegistryInstance = null;
}
}
});
//调用NettyHttpServer的start()方法
this.serverInstance.start(this);
}
3.4、查看NettyHttpServer.start()方法
这个方法,启动了一个 线程,启动netty服务监听9999端口,将NettyHttpServerHandler加到管道,这个类会接收输入流,处理业务逻辑
public void start(final XxlRpcProviderFactory xxlRpcProviderFactory) throws Exception {
this.thread = new Thread(new Runnable() {
public void run() {
final ThreadPoolExecutor serverHandlerPool = ThreadPoolUtil.makeServerThreadPool(NettyHttpServer.class.getSimpleName(), xxlRpcProviderFactory.getCorePoolSize(), xxlRpcProviderFactory.getMaxPoolSize());
EventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
((ServerBootstrap)bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)).childHandler(new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new ChannelHandler[]{new IdleStateHandler(0L, 0L, 90L, TimeUnit.SECONDS)}).addLast(new ChannelHandler[]{new HttpServerCodec()}).addLast(new ChannelHandler[]{new HttpObjectAggregator(5242880)}).addLast(new ChannelHandler[]{new NettyHttpServerHandler(xxlRpcProviderFactory, serverHandlerPool)});
}
}).childOption(ChannelOption.SO_KEEPALIVE, true);
//启动了一个netty服务,监听xxlRpcProviderFactory.getPort()这个端口,也就是9999端口
ChannelFuture future = bootstrap.bind(xxlRpcProviderFactory.getPort()).sync();
NettyHttpServer.logger.info(">>>>>>>>>>> xxl-rpc remoting server start success, nettype = {}, port = {}", NettyHttpServer.class.getName(), xxlRpcProviderFactory.getPort());
NettyHttpServer.this.onStarted();
future.channel().closeFuture().sync();
} catch (InterruptedException var18) {
if (var18 instanceof InterruptedException) {
NettyHttpServer.logger.info(">>>>>>>>>>> xxl-rpc remoting server stop.");
} else {
NettyHttpServer.logger.error(">>>>>>>>>>> xxl-rpc remoting server error.", var18);
}
} finally {
try {
serverHandlerPool.shutdown();
} catch (Exception var17) {
NettyHttpServer.logger.error(var17.getMessage(), var17);
}
try {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
} catch (Exception var16) {
NettyHttpServer.logger.error(var16.getMessage(), var16);
}
}
}
});
this.thread.setDaemon(true);
//启动线程
this.thread.start();
}
3.5、NettyHttpServerHandler.channelRead0()方法
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
final byte[] requestBytes = ByteBufUtil.getBytes(msg.content());
final String uri = msg.uri();
final boolean keepAlive = HttpUtil.isKeepAlive(msg);
//启动一个线程,提交给线程池执行
this.serverHandlerPool.execute(new Runnable() {
public void run() {
NettyHttpServerHandler.this.process(ctx, uri, requestBytes, keepAlive);
}
});
}
NettyHttpServerHandler.this.process方法
private void process(ChannelHandlerContext ctx, String uri, byte[] requestBytes, boolean keepAlive) {
String requestId = null;
XxlRpcResponse xxlRpcResponse;
byte[] responseBytes;
try {
if ("/services".equals(uri)) {
StringBuffer stringBuffer = new StringBuffer("<ui>");
Iterator var11 = this.xxlRpcProviderFactory.getServiceData().keySet().iterator();
while(var11.hasNext()) {
String serviceKey = (String)var11.next();
stringBuffer.append("<li>").append(serviceKey).append(": ").append(this.xxlRpcProviderFactory.getServiceData().get(serviceKey)).append("</li>");
}
stringBuffer.append("</ui>");
byte[] responseBytes = stringBuffer.toString().getBytes("UTF-8");
this.writeResponse(ctx, keepAlive, responseBytes);
} else {
if (requestBytes.length == 0) {
throw new XxlRpcException("xxl-rpc request data empty.");
}
XxlRpcRequest xxlRpcRequest = (XxlRpcRequest)this.xxlRpcProviderFactory.getSerializerInstance().deserialize(requestBytes, XxlRpcRequest.class);
requestId = xxlRpcRequest.getRequestId();
if ("BEAT_PING_PONG".equalsIgnoreCase(xxlRpcRequest.getRequestId())) {
logger.debug(">>>>>>>>>>> xxl-rpc provider netty_http server read beat-ping.");
return;
}
//主要看这个方法,invokeService,
xxlRpcResponse = this.xxlRpcProviderFactory.invokeService(xxlRpcRequest);
responseBytes = this.xxlRpcProviderFactory.getSerializerInstance().serialize(xxlRpcResponse);
this.writeResponse(ctx, keepAlive, responseBytes);
}
} catch (Exception var9) {
logger.error(var9.getMessage(), var9);
xxlRpcResponse = new XxlRpcResponse();
xxlRpcResponse.setRequestId(requestId);
xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(var9));
responseBytes = this.xxlRpcProviderFactory.getSerializerInstance().serialize(xxlRpcResponse);
this.writeResponse(ctx, keepAlive, responseBytes);
}
}
3.6、invokeService
只列出核心代码
public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) {
XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());
//接收className和version
String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());
//从serviceDate 根据key获取bean,这个serviceData对象是3.2中初始化的,会获取到ExecutorBizImpl对象
Object serviceBean = this.serviceData.get(serviceKey);
try {
Class<?> serviceClass = serviceBean.getClass();
//获取接收到的方法名
String methodName = xxlRpcRequest.getMethodName();
Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
Object[] parameters = xxlRpcRequest.getParameters();
Method method = serviceClass.getMethod(methodName, parameterTypes);
method.setAccessible(true);
Object result = method.invoke(serviceBean, parameters);
xxlRpcResponse.setResult(result);
} catch (Throwable var11) {
logger.error("xxl-rpc provider invokeService error.", var11);
xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(var11));
}
}
3.7、ExecutorBizImpl对象
3.8、执行调度任务的核心方法–run()方法
public ReturnT<String> run(TriggerParam triggerParam) {
// load old:jobHandler + jobThread
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
String removeOldReason = null;
// valid:jobHandler + jobThread
GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
//如果是定义在本的bean对象,那么从jobHandlerRepository对象中获取newJobHandler
if (GlueTypeEnum.BEAN == glueTypeEnum) {
// new jobhandler
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
// valid old jobThread
if (jobThread!=null && jobHandler != newJobHandler) {
// change handler, need kill old thread
removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// valid handler
if (jobHandler == null) {
jobHandler = newJobHandler;
if (jobHandler == null) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
}
}
} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
//如果是GLUE(Java),那么获取triggerParam.getGlueSource()源码
//反射实例化
// valid old jobThread
if (jobThread != null &&
!(jobThread.getHandler() instanceof GlueJobHandler
&& ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
// change handler or gluesource updated, need kill old thread
removeOldReason = "change job source or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// valid handler
if (jobHandler == null) {
try {
IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
}
}
} else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
// valid old jobThread
if (jobThread != null &&
!(jobThread.getHandler() instanceof ScriptJobHandler
&& ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
// change script or gluesource updated, need kill old thread
removeOldReason = "change job source or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// valid handler
if (jobHandler == null) {
jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
}
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
}
// executor block strategy
if (jobThread != null) {
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
// discard when running
if (jobThread.isRunningOrHasQueue()) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
}
} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
// kill running jobThread
if (jobThread.isRunningOrHasQueue()) {
removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
jobThread = null;
}
} else {
// just queue trigger
}
}
// replace thread (new or exists invalid)
if (jobThread == null) {
//执行handler方法
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
}
// push data to queue
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
return pushResult;
}
3.9、registJobThread
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
JobThread newJobThread = new JobThread(jobId, handler);
//调用线程
newJobThread.start();
logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!!
if (oldJobThread != null) {
oldJobThread.toStop(removeOldReason);
oldJobThread.interrupt();
}
return newJobThread;
}
查看run()方法:
//执行handler的init()方法
try {
handler.init();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
//执行jobHandler的execute方法,这里会调用methodHandler的execute()方法,里面反射调了任务逻辑的方法
executeResult = handler.execute(triggerParam.getExecutorParams());
//将执行结果,推送到callBackQueue队列中
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
二、调用中心项目
1、查看页面执行一次方法
在配置好定时任务后,点击执行一次,那么就会调用后台方法,执行一次任务方法,这个方法调用了trigger这个方法如下:
@RequestMapping("/trigger")
@ResponseBody
//@PermissionLimit(limit = false)
public ReturnT<String> triggerJob(int id, String executorParam) {
// force cover job param
if (executorParam == null) {
executorParam = "";
}
JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam);
return ReturnT.SUCCESS;
}
2、XxlJobTrigger.trigger
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
3、processTrigger
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
triggerResult = runExecutor(triggerParam, address);
runExecutor方法核心代码
try {
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
}
getExecutorBiz
XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean();
referenceBean.setClient(NettyHttpClient.class);
referenceBean.setSerializer(HessianSerializer.class);
referenceBean.setCallType(CallType.SYNC);
referenceBean.setLoadBalance(LoadBalance.ROUND);
referenceBean.setIface(ExecutorBiz.class);
referenceBean.setVersion(null);
referenceBean.setTimeout(3000);
referenceBean.setAddress(address);
referenceBean.setAccessToken(XxlJobAdminConfig.getAdminConfig().getAccessToken());
referenceBean.setInvokeCallback(null);
referenceBean.setInvokerFactory(null);
//返回了referenceBean.getObject方法返回的对象
executorBiz = (ExecutorBiz) referenceBean.getObject();
executorBizRepository.put(address, executorBiz);
return executorBiz;
getOjbect()方法,返回了一代理对象,最终会调用代理类的invoke方法
public Object getObject() throws Exception {
//实例化了NettyHttpClient对象
this.initClient();
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{this.iface}, new InvocationHandler() {});
}
invoke方法,核心代码如下:
//发送请求到服务端
XxlRpcReferenceBean.this.clientInstance.asyncSend(finalAddress, xxlRpcRequest);
XxlRpcResponse xxlRpcResponse = futureResponse.get(XxlRpcReferenceBean.this.timeout, TimeUnit.MILLISECONDS);
if (xxlRpcResponse.getErrorMsg() != null) {
throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
}
var31 = xxlRpcResponse.getResult();
public void asyncSend(String address, XxlRpcRequest xxlRpcRequest) throws Exception {
ConnectClient.asyncSend(xxlRpcRequest, address, this.connectClientImpl, this.xxlRpcReferenceBean);
}
private Class<? extends ConnectClient> connectClientImpl = NettyHttpConnectClient.class;
//传入的是NettyHttpConnectClient.class
public static void asyncSend(XxlRpcRequest xxlRpcRequest, String address, Class<? extends ConnectClient> connectClientImpl, XxlRpcReferenceBean xxlRpcReferenceBean) throws Exception {
//获取一个连接
ConnectClient clientPool = getPool(address, connectClientImpl, xxlRpcReferenceBean);
try {
clientPool.send(xxlRpcRequest);
} catch (Exception var6) {
throw var6;
}
}
ConnectClient connectClient_new = (ConnectClient)connectClientImpl.newInstance();
try {
connectClient_new.init(address, xxlRpcReferenceBean.getSerializerInstance(), xxlRpcReferenceBean.getInvokerFactory());
connectClientMap.put(address, connectClient_new);
} catch (Exception var9) {
connectClient_new.close();
throw var9;
}
connectClient_new.init方法,启动了一个netty客端,发送请求到服务端
this.address = address;
URL url = new URL(address);
this.host = url.getHost();
int port = url.getPort() > -1 ? url.getPort() : 80;
this.group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)bootstrap.group(this.group)).channel(NioSocketChannel.class)).handler(new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new ChannelHandler[]{new IdleStateHandler(0L, 0L, 30L, TimeUnit.SECONDS)}).addLast(new ChannelHandler[]{new HttpClientCodec()}).addLast(new ChannelHandler[]{new HttpObjectAggregator(5242880)}).addLast(new ChannelHandler[]{new NettyHttpClientHandler(xxlRpcInvokerFactory, serializer, NettyHttpConnectClient.this)});
}
})).option(ChannelOption.SO_KEEPALIVE, true)).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
this.channel = bootstrap.connect(this.host, port).sync().channel();
this.serializer = serializer;
三、总结
执行器项目启了一个netty服务端接收数据,根据接收参数执行不同的任务方法
调用中心项目启了一个Netty客户发送数据
GitHub 加速计划 / xx / xxl-job
27.15 K
10.79 K
下载
xxl-job: 是一个分布式任务调度平台,核心设计目标是开发迅速、学习简单、轻量级、易扩展。
最近提交(Master分支:3 个月前 )
e5d26ba2 - 3 个月前
977ad87b - 3 个月前
更多推荐
已为社区贡献3条内容
所有评论(0)