xxl-job学习笔记
目录
一、路由策略:(相当于执行器集群部署下的负载均衡)
执行器集群部署时,几点要求和建议:
- 执行器回调地址(xxl.job.admin.addresses)需要保持一致;执行器根据该配置进行执行器自动注册等操作。
- 同一个执行器集群内AppName(xxl.job.executor.appname)需要保持一致;调度中心根据该配置动态发现不同集群的在线执行器列表。
执行器集群部署时提供丰富的路由策略,包括:第一个、最后一个、轮询、随机、一致性HASH、最不经常使用、最近最久未使用、故障转移、忙碌转移等。
1、一个枚举类里配置了不同的路由和路由实现类,一个判断匹配路由的方法:
public enum ExecutorRouteStrategyEnum {
FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()),
LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()),
ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()),
RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()),
CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()),
LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()),
LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),
FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),
BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),
SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);
ExecutorRouteStrategyEnum(String title, ExecutorRouter router) {
this.title = title;
this.router = router;
}
private String title;
private ExecutorRouter router;
public String getTitle() {
return title;
}
public ExecutorRouter getRouter() {
return router;
}
//由name参数匹配路由返回对应枚举值。
public static ExecutorRouteStrategyEnum match(String name, ExecutorRouteStrategyEnum defaultItem){
if (name != null) {
for (ExecutorRouteStrategyEnum item: ExecutorRouteStrategyEnum.values()) {
if (item.name().equals(name)) {
return item;
}
}
}
return defaultItem;
}
}
2、一个路由抽象类规定了统一的路由接口:
public abstract class ExecutorRouter {
protected static Logger logger = LoggerFactory.getLogger(ExecutorRouter.class);
/**
* route address
*
* @param addressList
* @return ReturnT.content=address
*/
public abstract ReturnT<String> route(TriggerParam triggerParam, List<String> addressList);
}
3、不同的路由实现类继承了上述抽象接口,并做了对应策略的实现:
//第一个策略实现,取机器list里的第一个。
public class ExecutorRouteFirst extends ExecutorRouter {
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList){
return new ReturnT<String>(addressList.get(0));
}
}
//最后一个策略实现
public class ExecutorRouteLast extends ExecutorRouter {
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
return new ReturnT<String>(addressList.get(addressList.size()-1));
}
}
//轮询策略实现
public class ExecutorRouteRound extends ExecutorRouter {
private static ConcurrentMap<Integer, Integer> routeCountEachJob = new ConcurrentHashMap<Integer, Integer>();
private static long CACHE_VALID_TIME = 0;
private static int count(int jobId) {
// cache clear
if (System.currentTimeMillis() > CACHE_VALID_TIME) {//一轮轮询只在24小时内,超过重新开始。
routeCountEachJob.clear();
CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
}
// count++
Integer count = routeCountEachJob.get(jobId);
count = (count==null || count>1000000)?(new Random().nextInt(100)):++count; // 初始化时主动Random一次,缓解首次压力。不从固定的机器开始,实际也缓解了每一次的压力。如果同一个工程里开发了不同任务,集群部署,如果任务配置的调度时间一样,则每次调度这些任务都去跑同一台机器而其他机器空闲。所以不同任务的初始机器随机,然后各自挨个往下轮。
routeCountEachJob.put(jobId, count);
return count;
}
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = addressList.get(count(triggerParam.getJobId())%addressList.size());//通过每次count++出的数和总数取余实现轮询。
return new ReturnT<String>(address);
}
}
//随机策略实现:每次从总数里随机出一个整数。
public class ExecutorRouteRandom extends ExecutorRouter {
private static Random localRandom = new Random();
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = addressList.get(localRandom.nextInt(addressList.size()));
return new ReturnT<String>(address);
}
}
其余路由策略看git上源码。
分片策略的使用:当需要集群处理一大批数据时,为了保证数据不被重复处理,土办法需要用redis分布式锁来锁定多个程序在同一时刻只能有一个应用消费数据,保证每个应用拿到不同的数据。
利用分片任务的分配总数和当前分片数巧妙实现,主要是表的id得是自增的,用该id的值对总分片数进行求余,求余后的数正好等于应用的当前分片数,巧妙的实现了该分布式任务,直接通过SQL取余获取不同数据:
sql每次从表中取100条数据:
SELECT id,name,password
FROM t_push
WHERE `status` = 0
AND mod(id,#{number}) = #{index} //number 分片总数,index当前分片数
order by id desc
LIMIT 100;
4、路由判断及调用实现类:
调用等主要实现在XxlJobTrigger类的trigger方法里。
//通过参数获得对应枚举值
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);
//通过route公关接口获得地址,其中分片广播没有路由策略实现类
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
if (index < group.getRegistryList().size()) {
address = group.getRegistryList().get(index);
} else {
address = group.getRegistryList().get(0);
}
} else {
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
address = routeAddressResult.getContent();
}
}
这样实现不同的策略,让代码更规整更易读。更利于优化扩展。
详细可参考https://blog.csdn.net/u012394095/article/details/79552533,版本稍有差异。
二、调度中心集群部署如何保证一次任务调度被消费一次
分布式任务调度有个很重要的点,就是怎样保证一次任务调度被消费一次?也就是怎样保证幂等性?
调度中心支持集群部署,集群情况下各节点务必连接同一个mysql实例;
如果mysql做主从,调度中心集群节点务必强制走主库;
调度中心集群部署时,几点要求和建议:
- DB配置保持一致;
- 集群机器时钟保持一致(单机集群忽视);
- 建议:推荐通过nginx为调度中心集群做负载均衡,分配域名。调度中心访问、执行器回调配置、调用API服务等操作均通过该域名进行。
xxl-job 使用数据库悲观锁的形式,来保证任务调度的幂等性。如下SQL
select * from xxl_job_lock where lock_name = 'schedule_lock' for update
select ...for update对满足条件的数据进行了加锁,本次事务提交之前(事务提交时会释放事务过程中的锁),外界无法修改这些记录。在此处代码处就是该sql不能再次执行,相当于加锁。
也就是通过该数据库悲观锁,让集群部署的调度中心不会同时去处理调度。再通过其他代码细节和时间算法保证任务的处理。
使用该SQL的源码:
// schedule thread
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
while (!scheduleThreadToStop) {
// Scan Job
long start = System.currentTimeMillis();
Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;
boolean preReadSuc = true;
try {
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
// tx start
// 1、pre read
long nowTime = System.currentTimeMillis();
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
// 2、push time-ring
for (XxlJobInfo jobInfo: scheduleList) {
// time-ring jump
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、trigger-expire > 5s:pass && make next-trigger-time
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
// fresh next
refreshNextValidTime(jobInfo, new Date());
} else if (nowTime > jobInfo.getTriggerNextTime()) {
// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
// 1、trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2、fresh next
refreshNextValidTime(jobInfo, new Date());
// next-trigger-time in 5s, pre-read again
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
} else {
// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
}
// 3、update trigger info
for (XxlJobInfo jobInfo: scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
} else {
preReadSuc = false;
}
// tx stop
} catch (Exception e) {
if (!scheduleThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
}
} finally {
// commit
if (conn != null) {
try {
conn.commit();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.setAutoCommit(connAutoCommit);
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
// close PreparedStatement
if (null != preparedStatement) {
try {
preparedStatement.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
long cost = System.currentTimeMillis()-start;
// Wait seconds, align second
if (cost < 1000) { // scan-overtime, not wait
try {
// pre-read period: success > scan each second; fail > skip this period;
TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
}
});
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();
详细代码分析可参考:https://blog.csdn.net/yuanshangshenghuo/article/details/98110509
三、系统远程调用思想
常见的dubbo类的本地注入代理对象直接调用接口,生成的代理对象里面的 InvocationHandler 中的invoke方法,并没有真实的执行方法,而是将类名,方法名,参数等信息发送给远程服务器,由远程服务器去执行。
旧版xxljob可参考https://blog.csdn.net/u012394095/article/details/79626846,新版2.1.0的已经升级改造。此处RPC框架用的netty。
2.1.0XxlJobTrigger类里的runExecutor方法入口:
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
ReturnT<String> runResult = null;
try {
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
}
…………
return runResult;
}
public static ExecutorBiz getExecutorBiz(String address) throws Exception {
// valid
if (address==null || address.trim().length()==0) {
return null;
}
// load-cache
address = address.trim();
ExecutorBiz executorBiz = executorBizRepository.get(address);
if (executorBiz != null) {
return executorBiz;
}
// set-cache
XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean();
referenceBean.setClient(NettyHttpClient.class);
referenceBean.setSerializer(HessianSerializer.class);
referenceBean.setCallType(CallType.SYNC);
referenceBean.setLoadBalance(LoadBalance.ROUND);
referenceBean.setIface(ExecutorBiz.class);
referenceBean.setVersion(null);
referenceBean.setTimeout(3000);
referenceBean.setAddress(address);
referenceBean.setAccessToken(XxlJobAdminConfig.getAdminConfig().getAccessToken());
referenceBean.setInvokeCallback(null);
referenceBean.setInvokerFactory(null);
executorBiz = (ExecutorBiz) referenceBean.getObject();//注意去看此getObject方法实现
executorBizRepository.put(address, executorBiz);
return executorBiz;
}
详细看git源码。
更多详细分析参考https://blog.csdn.net/u012394095/category_9279585.html
https://blog.csdn.net/yuanshangshenghuo/category_9293338.html
四、代码在线编辑,动态执行。
1、多种类型代码在线编辑:
xxl-job中运行模式用到了在线编辑执行器代码动态执行,且支持多种脚本语言:
其中前端用到了FreeMarker,可参考使用。
2、后端动态执行:
具体编译Java源码为class文件的代码实现:
package com.xxl.job.core.glue;
import com.xxl.job.core.glue.impl.SpringGlueFactory;
import com.xxl.job.core.handler.IJobHandler;
import groovy.lang.GroovyClassLoader;
import java.math.BigInteger;
import java.security.MessageDigest;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* glue factory, product class/object by name
*
* @author xuxueli 2016-1-2 20:02:27
*/
public class GlueFactory {
private static GlueFactory glueFactory = new GlueFactory();
public static GlueFactory getInstance(){
return glueFactory;
}
public static void refreshInstance(int type){
if (type == 0) {
glueFactory = new GlueFactory();
} else if (type == 1) {
glueFactory = new SpringGlueFactory();
}
}
/**
* groovy class loader
*/
private GroovyClassLoader groovyClassLoader = new GroovyClassLoader();
private ConcurrentMap<String, Class<?>> CLASS_CACHE = new ConcurrentHashMap<>();
/**
* load new instance, prototype
*
* @param codeSource
* @return
* @throws Exception
*/
public IJobHandler loadNewInstance(String codeSource) throws Exception{
if (codeSource!=null && codeSource.trim().length()>0) {
Class<?> clazz = getCodeSourceClass(codeSource);
if (clazz != null) {
Object instance = clazz.newInstance();
if (instance!=null) {
if (instance instanceof IJobHandler) {
this.injectService(instance);
return (IJobHandler) instance;
} else {
throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadNewInstance error, "
+ "cannot convert from instance["+ instance.getClass() +"] to IJobHandler");
}
}
}
}
throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadNewInstance error, instance is null");
}
private Class<?> getCodeSourceClass(String codeSource){
try {
// md5
byte[] md5 = MessageDigest.getInstance("MD5").digest(codeSource.getBytes());
String md5Str = new BigInteger(1, md5).toString(16);
Class<?> clazz = CLASS_CACHE.get(md5Str);
if(clazz == null){
clazz = groovyClassLoader.parseClass(codeSource);
CLASS_CACHE.putIfAbsent(md5Str, clazz);
}
return clazz;
} catch (Exception e) {
return groovyClassLoader.parseClass(codeSource);
}
}
/**
* inject service of bean field
*
* @param instance
*/
public void injectService(Object instance) {
// do something
}
}
其他脚本语言的:
具体脚本文件处理和执行参见:(直接去看源码)
package com.xxl.job.core.handler.impl;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.glue.GlueTypeEnum;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.util.ScriptUtil;
import com.xxl.job.core.util.ShardingUtil;
import java.io.File;
/**
* Created by xuxueli on 17/4/27.
*/
public class ScriptJobHandler extends IJobHandler {
private int jobId;
private long glueUpdatetime;
private String gluesource;
private GlueTypeEnum glueType;
public ScriptJobHandler(int jobId, long glueUpdatetime, String gluesource, GlueTypeEnum glueType){
this.jobId = jobId;
this.glueUpdatetime = glueUpdatetime;
this.gluesource = gluesource;
this.glueType = glueType;
// clean old script file
File glueSrcPath = new File(XxlJobFileAppender.getGlueSrcPath());
if (glueSrcPath.exists()) {
File[] glueSrcFileList = glueSrcPath.listFiles();
if (glueSrcFileList!=null && glueSrcFileList.length>0) {
for (File glueSrcFileItem : glueSrcFileList) {
if (glueSrcFileItem.getName().startsWith(String.valueOf(jobId)+"_")) {
glueSrcFileItem.delete();
}
}
}
}
}
public long getGlueUpdatetime() {
return glueUpdatetime;
}
@Override
public ReturnT<String> execute(String param) throws Exception {
if (!glueType.isScript()) {
return new ReturnT<String>(IJobHandler.FAIL.getCode(), "glueType["+ glueType +"] invalid.");
}
// cmd
String cmd = glueType.getCmd();
// make script file
String scriptFileName = XxlJobFileAppender.getGlueSrcPath()
.concat(File.separator)
.concat(String.valueOf(jobId))
.concat("_")
.concat(String.valueOf(glueUpdatetime))
.concat(glueType.getSuffix());
File scriptFile = new File(scriptFileName);
if (!scriptFile.exists()) {
ScriptUtil.markScriptFile(scriptFileName, gluesource);
}
// log file
String logFileName = XxlJobFileAppender.contextHolder.get();
// script params:0=param、1=分片序号、2=分片总数
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
String[] scriptParams = new String[3];
scriptParams[0] = param;
scriptParams[1] = String.valueOf(shardingVO.getIndex());
scriptParams[2] = String.valueOf(shardingVO.getTotal());
// invoke
XxlJobLogger.log("----------- script file:"+ scriptFileName +" -----------");
int exitValue = ScriptUtil.execToFile(cmd, scriptFileName, logFileName, scriptParams);
if (exitValue == 0) {
return IJobHandler.SUCCESS;
} else {
return new ReturnT<String>(IJobHandler.FAIL.getCode(), "script exit value("+exitValue+") is failed");
}
}
}
五、使用注意点及常见问题:
1、
任务终止时通过 "interrupt" 执行线程的方式实现, 将会触发 "InterruptedException" 异常。因此如果JobHandler内部catch到了该异常并消化掉的话, 任务终止功能将不可用。
因此, 如果遇到上述任务终止不可用的情况, 需要在JobHandler中应该针对 "InterruptedException" 异常进行特殊处理 (向上抛出) , 正确逻辑如下:
```
try{
// do something
} catch (Exception e) {
if (e instanceof InterruptedException) {
throw e;
}
logger.warn("{}", e);
}
```
而且,在JobHandler中开启子线程时,子线程也不可catch处理"InterruptedException",应该主动向上抛出。
任务终止时会执行对应JobHandler的"destroy()"方法,可以借助该方法处理一些资源回收的逻辑。
2、源码中的md官方文档## 五、总体设计
这部分有一些xxl-job的设计思路、复杂情况的设计说明及操作建议等。包括:
5.14 执行器灰度上线、
5.20 避免任务重复执行。针对调度密集或者耗时任务的。
3、当执行器服务有多个,且部署在同一台服务器的时候,注意执行器的端口不要配置成一样的!!!且不同服务的执行器appname务必唯一!!!如下图位置的端口。
执行器会通过jobadmin地址将自己的ip和端口写到admin_qrtz_trigger_registry表里。jobadmin会通过该ip和端口去调用执行器。如果两个服务中的执行器端口配成一样的,且部署在同一台服务器上,则会有一个执行器无法注册到jobadmin,则无法调度。
有问题可排查一下这两张表里的数据:(下图中的数据是一个执行器服务集群部署了两台的数据结果。)
xxl开源社区(还有一些其他开源项目):https://www.xuxueli.com/xxl-job/
https://www.cnblogs.com/xuxueli/p/5021979.html
更多推荐
所有评论(0)