xxl-job分布式任务调度的使用
为什么我们需要定时任务
很多业务场景需要我们某一特定的时刻去做某件任务,定时任务解决的就是这种业务场景。一般来说,系统可以使用消息传递代替部分定时任务,两者有很多相似之处,可以相互替换场景。如,上面发货成功发短信通知客户的业务场景,我们可以在发货成功后发送MQ消息到队列,然后去消费mq消息,发送短信。
但在某些场景下不能互换:
a)时间驱动/事件驱动:内部系统一般可以通过时间来驱动,但涉及到外部系统,则只能使用时间驱动。如怕取外部网站价格,每小时爬一次
b)批量处理/逐条处理:批量处理堆积的数据更加高效,在不需要实时性的情况下比消息中间件更有优势。而且有的业务逻辑只能批量处理。如移动每个月结算我们的话费
c)实时性/非实时性:消息中间件能够做到实时处理数据,但是有些情况下并不需要实时,比如:vip升级
d)系统内部/系统解耦:定时任务调度一般是在系统内部,而消息中间件可用于两个系统间
如果你的项目后期部署到集群环境下,如果不做处理,就会出现意想不到的问题,原因:由于我们项目同时部署在多台集群机器上,因此到达指定的定时时间时,多台机器上的定时器可能会同时启动,造成重复数据或者程序异常等问题
java有哪些定时任务的框架
单机
- timer:是一个定时器类,通过该类可以为指定的定时任务进行配置。TimerTask类是一个定时任务类,该类实现了Runnable接口,缺点异常未检查会中止线程
- ScheduledExecutorService:相对延迟或者周期作为定时任务调度,缺点没有绝对的日期或者时间
- spring定时框架:配置简单功能较多,如果系统使用单机的话可以优先考虑spring定时器
分布
- Quartz:Java事实上的定时任务标准。但Quartz关注点在于定时任务而非数据,并无一套根据数据处理而定制化的流程。虽然Quartz可以基于数据库实现作业的高可用,但缺少分布式并行调度的功能
- TBSchedule:阿里早期开源的分布式任务调度系统。代码略陈旧,使用timer而非线程池执行任务调度。众所周知,timer在处理异常状况时是有缺陷的。而且TBSchedule作业类型较为单一,只能是获取/处理数据一种模式。还有就是文档缺失比较严重
- elastic-job:当当开发的弹性分布式任务调度系统,功能丰富强大,采用zookeeper实现分布式协调,实现任务高可用以及分片,目前是版本2.15,并且可以支持云开发
- Saturn:是唯品会自主研发的分布式的定时任务的调度平台,基于当当的elastic-job 版本1开发,并且可以很好的部署到docker容器上。
- xxl-job: 是大众点评员工徐雪里于2015年发布的分布式任务调度平台,是一个轻量级分布式任务调度框架,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。
elastic-job的上次更新时间是两年前, 所以我们果断使用xxl-job了.
- 下载xxl-job的源码, http://www.xuxueli.com/xxl-job/
这次下载的是2.1.0的版本
- 解压源码, 导入源码到idea中去
- 执行数据库的文件
- 打开xxl-job-admin 项目,修改配置文件,修改application.properties中的数据库的配置信息, 然后启动xxl-job-admin 项目, 访问 http://localhost:8080/xxl-job-admin 就可以了, 默认登录账号 "admin/123456"
- 把xxl-job-core 项目达成一个jar 包, 放入到一个新的springboot 暂时名字叫做max-demo项目中去, 添加依赖到pom文件中去,
<!-- xxl-job-core -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.1.0</version>
</dependency>
- 修改max-demo的配置文件
# web port
server.port=8081
# log config
logging.config=classpath:logback.xml
### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### xxl-job executor address
xxl.job.executor.appname=xxl-job-executor-sample
xxl.job.executor.ip=
xxl.job.executor.port=9999
### xxl-job, access token
xxl.job.accessToken=
### xxl-job log path
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### xxl-job log retention days
xxl.job.executor.logretentiondays=-1
- 添加日志的配置文件: logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false" scan="true" scanPeriod="1 seconds">
<contextName>logback</contextName>
<property name="log.path" value="/data/applogs/xxl-job/xxl-job-executor-sample-springboot.log"/>
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${log.path}.%d{yyyy-MM-dd}.zip</fileNamePattern>
</rollingPolicy>
<encoder>
<pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n
</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="console"/>
<appender-ref ref="file"/>
</root>
</configuration>
- 添加配置文件:
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* xxl-job config
*
* @author xuxueli 2017-04-28
*/
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.executor.appname}")
private String appName;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean(initMethod = "start", destroyMethod = "destroy")
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppName(appName);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
/**
* 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
*
* 1、引入依赖:
* <dependency>
* <groupId>org.springframework.cloud</groupId>
* <artifactId>spring-cloud-commons</artifactId>
* <version>${version}</version>
* </dependency>
*
* 2、配置文件,或者容器启动变量
* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
*
* 3、获取IP
* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
*/
}
- 添加自己的定时任务处理类
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.JobHandler;
import org.springframework.stereotype.Component;
/**
* @author zk
* @Description:
* @date 2019-10-09 9:47
*/
@Component
@JobHandler("myJobhandler2")
public class MyJobhandler extends IJobHandler {
@Override
public ReturnT<String> execute(String param) throws Exception {
System.out.println("这是自定义的第二个任务" + param);
return ReturnT.SUCCESS;
}
}
- 在网页上新建自己的定时任务.
分别是cron 表达式, jobhandker, 就是自定义的一个jobhandler, 名字对应就可以了.
执行就是把这个任务执行一次, 启动就是直接启动这个定时任务了, 根据cron表达式去执行.
调度中心集群
调度中心支持集群部署,提升调度系统容灾和可用性。
调度中心集群部署时,几点要求和建议:
- DB配置保持一致;
- 登陆账号配置保持一致;
- 集群机器时钟保持一致(单机集群忽视);
- 建议:推荐通过nginx为调度中心集群做负载均衡,分配域名。调度中心访问、执行器回调配置、调用API服务等操作均通过该域名进行。
在原来服务的基础上新建一个启动文件, 注释掉原项目中的端口信息,才用配置文件的形式.增加token,其他不改动.
就可以启动两个调度中心了.在任意一个调度中心上新建一个任务, 另一个都会有的.
执行器集群
执行器支持集群部署,提升调度系统可用性,同时提升任务处理能力。
执行器集群部署时,几点要求和建议:
- 执行器回调地址(xxl.job.admin.addresses)需要保持一致;执行器根据该配置进行执行器自动注册等操作。
- 同一个执行器集群内AppName(xxl.job.executor.appname)需要保持一致;调度中心根据该配置动态发现不同集群的在线执行器列表。
Web 的端口号和executor的端口号都不能重复.
修改配置文件,添加多个调度中心的地址, 以及token,修改端口号,注释掉端口号
新建一个springboot 启动类,然后启动两个服务,开始执行定时任务,执行了3次任务, 都发到同一个客户端执行了, 没有问题,在关闭了一个客户端之后,再次执行,任务失败. 过了一会,再执行,任务成功, 已经自动切换到另外一台应用上了,关闭一个调度中心也是没有问题的.
更多推荐
所有评论(0)