xxl-job基础组件核心类解解读二:JobTriggerPoolHelper
前言:
在具体解读JobTriggerPoolHelper核心类功能原理的过程中你请大家先熟悉:ThreadPoolExecutor,volatile,ConcurrentMap,AtomicInteger等基础原理,同时请考虑下当采用集群部署时,JobTriggerPoolHelper类在程序运行中会有哪些问题?提示:volatile,AtomicInteger,ConcurrentMap。
摘要:
本次会通过一次任务的执行来解析JobTriggerPoolHelper类,会通过JobScheduleHelper类和XxlJobTrigger类来观察一次简单周期任务执行过程,在结合volatile关键字和基础类ConcurrentMap,AtomicInteger进行串联。
volatile原理:
被volatile关键字修饰的变量,编译器与运行时都会注意到这个变量是共享的,因此不会将该变量上的操作与其他内存操作一起重排序。volatile变量不会被缓存在寄存器或者对其他处理器不可见的地方,因此在读取volatile类型的变量时总会返回最新写入的值。
JobScheduleHelper类主要职能:任务投递和下次执行时间维护
step1: 每5秒扫描一次,条件小于(当前时间+5秒)要的任务,查询数量根据最小默认值计算结果为每次6000条(最小值)
step2: 判断为漏执行的,同时通过调用JobTriggerPoolHelper.trigger方法将任务投递到线程池中
step3:更新任务下次执行时间
JobTriggerPoolHelper类主要职能:线程池异步触发任务
解读一:线程池隔离--快慢分离
目的:部分慢执行的线程,会拖慢整个线程池,因此我们需要将快慢分离。
需要区分出哪些是慢线程,这里给一个依据是一分钟内的慢执行(耗时大于500ms)次数为10次。
注:
1.minTim 变量和jobTimeoutCountMap变量都使用volatile关键字修饰,但在集群环境下此处的计数就会不准确,当然对任务的执行并无影响,只是进来慢线程池的时间可能会延迟一些概率也就降低了
2.哪些情况下会进入慢线程池:
1.任务执行频率高每分钟执行次数要大于10次以上(jobTimeoutCountMap,存放每个任务的执行慢次数,60秒后自动清空该容器)
2.job Client端执行任务耗时较长,形成堆积 或者 网络延迟卡顿严重
3.数据库相关查询出现耗时严重,遇到cpu100%或者拿不到连接等瓶颈问题
3.添加触发任务到线程池中:核心方法,快慢线程池执行逻辑一样(不同点是队列的容量大小)
JobTriggerPoolHelper 类参考源码:
/**
* job trigger thread pool helper
*
* @author xuxueli 2018-07-03 21:08:07
*/
public class JobTriggerPoolHelper {
private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class);
// ---------------------- trigger pool ----------------------
// fast/slow thread pool
private ThreadPoolExecutor fastTriggerPool = null;
private ThreadPoolExecutor slowTriggerPool = null;
public void start(){
fastTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
}
});
slowTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
}
});
}
public void stop() {
//triggerPool.shutdown();
fastTriggerPool.shutdownNow();
slowTriggerPool.shutdownNow();
logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");
}
// job timeout count
private volatile long minTim = System.currentTimeMillis()/60000; // ms > min
private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();
/**
* add trigger
*/
public void addTrigger(final int jobId,
final TriggerTypeEnum triggerType,
final int failRetryCount,
final String executorShardingParam,
final String executorParam,
final String addressList,boolean isUnactive) {
// choose thread pool
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min
triggerPool_ = slowTriggerPool;
}
// trigger
triggerPool_.execute(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
try {
// do trigger
// 开始调度
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList,isUnactive);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
// check timeout-count-map
long minTim_now = System.currentTimeMillis()/60000;
if (minTim != minTim_now) {
minTim = minTim_now;
jobTimeoutCountMap.clear();
}
// incr timeout-count-map
long cost = System.currentTimeMillis()-start;
if (cost > 500) { // ob-timeout threshold 500ms
AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
if (timeoutCount != null) {
timeoutCount.incrementAndGet();
}
}
}
}
});
}
// ---------------------- helper ----------------------
private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();
public static void toStart() {
helper.start();
}
public static void toStop() {
helper.stop();
}
/**
* @param jobId
* @param triggerType
* @param failRetryCount
* >=0: use this param
* <0: use param from job info config
* @param executorShardingParam
* @param executorParam
* null: use job param
* not null: cover job param
*/
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList,boolean isUnactive) {
helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList,isUnactive);
}
}
XxlJobTrigger 类职能:执行任务触发方法
trigger方法解读:
step1.初始化任务执行过程中的相关参数,以及执行方式:路由策略和分片执行
step2.调用真正处理并调度的函数processTrigger进行任务触发
processTrigger方法解读:
step1: 竞争分布式锁,获取到锁的服务才能继续往下执行
step2:创建执行日志 + 选取执行器地址
step3:通过http client 触发任务执行 (此过程如果任务方法是同步方法切比较耗时就会形成耗时点,产生次数累计,形成任务投递到慢线程池中的条件)
参考XxlJobTrigger类源码:(其中锁竞争部分我们改为了redis和zookeeper)
/**
* xxl-job trigger
* Created by xuxueli on 17/7/13.
*/
public class XxlJobTrigger {
private static Logger logger = LoggerFactory.getLogger(XxlJobTrigger.class);
/**
* 默认锁类型
*/
private static final String DEFAULT_LOCK_TYPE = "redis";
/**
* trigger job
*
* @param jobId
* @param triggerType
* @param failRetryCount >=0: use this param
* <0: use param from job info config
* @param executorShardingParam
* @param executorParam null: use job param
* not null: cover job param
* @param addressList null: use executor addressList
* not null: cover
*/
public static void trigger(int jobId,
TriggerTypeEnum triggerType,
int failRetryCount,
String executorShardingParam,
String executorParam,
String addressList, boolean isUnactive) {
// load data
XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
if (isUnactive) {
jobInfo.setTriggerLastTime(System.currentTimeMillis());
}
if (jobInfo == null) {
logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
return;
}
int finalFailRetryCount = failRetryCount >= 0 ? failRetryCount : jobInfo.getExecutorFailRetryCount();
XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
// cover addressList
if (addressList != null && addressList.trim().length() > 0) {
group.setAddressType(1);
group.setAddressList(addressList.trim());
}
// sharding param
int[] shardingParam = null;
if (executorShardingParam != null) {
String[] shardingArr = executorShardingParam.split("/");
if (shardingArr.length == 2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
shardingParam = new int[2];
shardingParam[0] = Integer.valueOf(shardingArr[0]);
shardingParam[1] = Integer.valueOf(shardingArr[1]);
}
}
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
&& group.getRegistryList() != null && !group.getRegistryList().isEmpty()
&& shardingParam == null) {
for (int i = 0; i < group.getRegistryList().size(); i++) {
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
}
} else {
if (shardingParam == null) {
shardingParam = new int[]{0, 1};
}
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
}
}
private static boolean isNumeric(String str) {
try {
int result = Integer.valueOf(str);
return true;
} catch (NumberFormatException e) {
return false;
}
}
/**
* TODO 真正处理并调度的函数
*
* @param group job group, registry list may be empty
* @param jobInfo
* @param finalFailRetryCount
* @param triggerType
* @param index sharding index
* @param total sharding index
*/
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total) {
boolean result = false;
// 判断是否属于API调用方式
if (!TriggerTypeEnum.API.equals(jobInfo.getRealTriggerType())) {
String lockType = XxlJobAdminConfig.getAdminConfig().getLockType();
long time = System.currentTimeMillis() / 1000;
String currentTag = jobInfo.getId() + "-" + jobInfo.getExecutorParam();
if (!DEFAULT_LOCK_TYPE.equals(lockType)) {
result = ZookeeperUtils.lock("/job-id-" + currentTag, jobInfo.getJobDesc());
logger.info("xxl job 抢占zk 分布式锁 ");
} else {
String uid = UUID.randomUUID().toString();
/// RedisLockUtil.tryLock(jobInfo.getId()+"-" + time,uid,5000,0,0);
result = RedisUtils.lock(currentTag, uid, 100);
logger.info("xxl job 抢占redis 分布式锁 ");
}
if (!result) {
logger.info("xxl job 抢占分布式锁失败!锁=" + currentTag + "-" + time);
return;
}
logger.info("xxl job 抢占分布式锁成功! 锁=" + currentTag + "-" + time);
}
// param
// block strategy
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);
// route strategy
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);
String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) ? String.valueOf(index).concat("/").concat(String.valueOf(total)) : null;
// 1、save log-id
XxlJobLog jobLog = new XxlJobLog();
jobLog.setJobGroup(jobInfo.getJobGroup());
jobLog.setJobId(jobInfo.getId());
jobLog.setTriggerTime(new Date());
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
// 2、init trigger-param
ExpandTriggerParam triggerParam = new ExpandTriggerParam();
triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
triggerParam.setGlueType(jobInfo.getGlueType());
triggerParam.setGlueSource(jobInfo.getGlueSource());
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
triggerParam.setBroadcastIndex(index);
triggerParam.setBroadcastTotal(total);
// 最新执行时间(需要精确时间的可参考)
triggerParam.setTriggerLastTime(jobInfo.getTriggerLastTime());
// 3、init address
String address = null;
ReturnT<String> routeAddressResult = null;
if (group.getRegistryList() != null && !group.getRegistryList().isEmpty()) {
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
if (index < group.getRegistryList().size()) {
address = group.getRegistryList().get(index);
} else {
address = group.getRegistryList().get(0);
}
} else {
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
address = routeAddressResult.getContent();
}
}
} else {
routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
}
// 4、trigger remote executor
ReturnT<String> triggerResult = null;
if (address != null) {
triggerResult = runExecutor(triggerParam, address);
} else {
triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
}
// 5、collection trigger info
StringBuilder triggerMsgSb = new StringBuilder();
triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
.append((group.getAddressType() == 0) ? I18nUtil.getString("jobgroup_field_addressType_0") : I18nUtil.getString("jobgroup_field_addressType_1"));
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
if (shardingParam != null) {
triggerMsgSb.append("(").append(shardingParam).append(")");
}
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);
triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>").append(I18nUtil.getString("jobconf_trigger_run")).append("<<<<<<<<<<< </span><br>")
.append((routeAddressResult != null && routeAddressResult.getMsg() != null) ? routeAddressResult.getMsg() + "<br><br>" : "").append(triggerResult.getMsg() != null ? triggerResult.getMsg() : "");
// 6、save log trigger-info
jobLog.setExecutorAddress(address);
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam());
jobLog.setExecutorShardingParam(shardingParam);
jobLog.setExecutorFailRetryCount(finalFailRetryCount);
// 特殊规则重试逻辑,触发客户端请求失败导致的重试
if (triggerResult.getCode() != ReturnT.SUCCESS_CODE && !jobInfo.getExecutorFailRetryStrategy().equals(ExecutorRetryStrategyEnum.DEFAULT_RETRY.name()) && jobLog.getExecutorFailRetryCount() > 0) {
MqMessageSender mqMessageSender = XxlJobAdminConfig.getAdminConfig().getMqMessageSender();
String regexTime = jobInfo.getExecutorFailRetryInterval();
if (jobInfo.getExecutorFailRetryStrategy().equals(ExecutorRetryStrategyEnum.REGEX_RETRY.name())) {
int idx = jobInfo.getExecutorFailRetryCount() - jobLog.getExecutorFailRetryCount();
String[] regexs = jobInfo.getExecutorFailRetryRegex().split(",");
// 剔除空串
List<String> regexList = Arrays.stream(regexs).filter(StrUtil::isNotBlank).collect(Collectors.toList());
if (index < regexList.size()) {
regexTime = regexList.get(idx);
}
}
if (!StringUtil.isNullOrEmpty(regexTime)) {
mqMessageSender.sendDeliverTime(XxlJobAdminConfig.getAdminConfig().getXxlJobMqConfig().getRetryTopic(), (jobLog.getId() + "").getBytes(), regexTime, null);
jobLog.setExecutorFailRetryCount(jobLog.getExecutorFailRetryCount() - 1);
}
}
/// jobLog.setTriggerTime();
jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(triggerMsgSb.toString());
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
/// RedisLockUtil.releaseLock(jobInfo.getId()+"",uid);
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
/**
* run executor
*
* @param triggerParam
* @param address
* @return
*/
public static ReturnT<String> runExecutor(ExpandTriggerParam triggerParam, String address) {
ReturnT<String> runResult = null;
try {
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
}
StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
runResultSB.append("<br>address:").append(address);
runResultSB.append("<br>code:").append(runResult.getCode());
runResultSB.append("<br>msg:").append(runResult.getMsg());
runResult.setMsg(runResultSB.toString());
return runResult;
}
}
更多推荐
所有评论(0)