基于SpringBoot集成DATAX、XXL-JOB实现离线数据定时同步实践
xxl-job
xxl-job: 是一个分布式任务调度平台,核心设计目标是开发迅速、学习简单、轻量级、易扩展。
项目地址:https://gitcode.com/gh_mirrors/xx/xxl-job
免费下载资源
·
在很多实际应用中,需要实现离线数据定时同步,比如生产库和运营库分离之后,在运营库实现离线数据对账。
本文介绍一种实现方式,完成基于SpringBoot集成DATAX、XXL-JOB实现离线数据定时同步。
一、集成XXL-JOB
集成XXL-JOB,需要添加xxl-job-admin、xxl-job-core模块。在主工程配置中添加:
<modules>
<module>xxl-job-admin</module>
<module>xxl-job-core</module>
</modules>
二、集成DATAX
在主工程配置中添加
<modules>
<module>datax-integration</module>
</modules>
添加相关依赖,主工程配置最终版本如下:
<dependencies>
<!--JSON处理-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.56</version>
</dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.3</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.6</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.4</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>fluent-hc</artifactId>
<version>4.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.12</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.15</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.18</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>16.0.1</version>
</dependency>
<!-- slf4j -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.28</version>
</dependency>
</dependencies>
三、在数据同步服务datax-integration工程中,添加数据同步的相关组件依赖
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>system</scope>
<systemPath>${project.basedir}/src/main/webapp/WEB-INF/lib/datax-common-0.0.1-SNAPSHOT.jar</systemPath>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>system</scope>
<systemPath>${project.basedir}/src/main/webapp/WEB-INF/lib/datax-core-0.0.1-SNAPSHOT.jar</systemPath>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>mysqlreader</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>system</scope>
<systemPath>${project.basedir}/src/main/webapp/WEB-INF/lib/mysqlreader-0.0.1-SNAPSHOT.jar</systemPath>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>mysqlwriter</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>system</scope>
<systemPath>${project.basedir}/src/main/webapp/WEB-INF/lib/mysqlwriter-0.0.1-SNAPSHOT.jar</systemPath>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>plugin-rdbms-util</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>system</scope>
<systemPath>${project.basedir}/src/main/webapp/WEB-INF/lib/plugin-rdbms-util-0.0.1-SNAPSHOT.jar</systemPath>
</dependency>
<!-- xxl-job-core -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
四、在数据同步服务datax-integration工程中,添加分布式定时调度注册配置信息,这样服务启动时会注册到xxl-job
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.executor.appname}")
private String appName;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean(initMethod = "start", destroyMethod = "destroy")
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppName(appName);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
五、编写数据同步配置文件
本文为验证实际结果,分别建立两个库datax1、datax2;datax1建立表test,预先录制相应数据;datax2建立表test,为空;
编写datax配置文件如下:
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "****",
"column": [
"orderId",
"status",
"settle"
],
"splitPk": "id",
"connection": [
{
"table": [
"test"
],
"jdbcUrl": [
"jdbc:mysql://localhost:3306/datax1?useUnicode=true&characterEncoding=utf-8"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "****",
"column": [
"orderId",
"status",
"settle"
],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
"delete from test"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://localhost:3306/datax2?useUnicode=true&characterEncoding=utf-8",
"table": [
"test"
]
}
]
}
}
}
]
}
}
六、编写数据同步任务代码
七、启动分布式调度任务管理平台
八、根据datax-integration工程配置信息,新增执行器
九、在任务管理栏目,选择刚刚新建的执行器,新增定时任务
在JobHandler参数中,填写之前在第六步编写代码的任务信息,各参数填写好后,保存。
十、执行一次任务,检验数据同步结果
验证结果,发现数据已由生产库同步到运营库。
GitHub 加速计划 / xx / xxl-job
27.15 K
10.79 K
下载
xxl-job: 是一个分布式任务调度平台,核心设计目标是开发迅速、学习简单、轻量级、易扩展。
最近提交(Master分支:3 个月前 )
e5d26ba2 - 3 个月前
977ad87b - 3 个月前
更多推荐
已为社区贡献1条内容
所有评论(0)