Canal学习(1)Canal环境搭建以及多数据库配置和Java代码整合Canal
canal
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
项目地址:https://gitcode.com/gh_mirrors/ca/canal
免费下载资源
·
Canal环境搭建以及多数据库配置和Java代码整合Canal
1、Canal基本了解
Canal是阿里巴巴出的一个基于binlog增量订阅的一个组件。主要是做mysql数据库增量或全量同步的。
1、github地址
https://github.com/alibaba/canal
2、官方的快速使用
https://github.com/alibaba/canal/wiki/QuickStart
2、环境搭建
1、安装mysql
地址:
https://blog.csdn.net/leeue/article/details/103920335
2、开启binlog写入功能。
配置binglog-format为row模式,在my.cnf中配置
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1
# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复,这个server_id在后面配置canal会用上
3、授权,可以不授权,可以用root账号做个demo
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
4、启动
下载canal,这个是一个开箱即用的。
wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz
进入 vi conf/example/instance.properties 修改一些参数
## mysql serverId 这个修改成你自己配置的数据库的server_id
## show variables like '%server_id%'; 注意配置的监听要查看
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息 必须要配置
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,需要改成自己的数据库信息 必须要配置
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*
# mq config 如果你用默认的队列,这里不需要配置的
canal.mq.topic=example_topic
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
然后就是进入bin目录
./start.sh 就启动了
3、Canal自定义数据库和多数据库配置
1、首先把以前所有生成的log里面都清空
2、删除example文件夹下,meta.dat文件,这个文件是记录上次同步的位置信息。
如果不删除,会报一些错误,就是id不对。什么的。
3、复制example文件夹
我这里想同时监听,demo01数据源,demo02数据源,和demo03数据源。注意这里说的是数据源,不是某个数据库。
4、修改canal.properties文件
vi canal.properties
其他配置不变
## 如果你这样配置了,说明是指定了
canal.destinations = demo01,demo02,demo03
## 如果你这样配置了,说明是动态扫描的,会自动扫描该文件统计目录下你配置的一些数据源。文件夹名称就是destinations名称。
canal.destinations =
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
5、再重新启动就行了。
5、java代码整合canal,只做参考
1、pom.xml
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
2、CanalClient
package com.carelinker.datax.biz.utils;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
/**
* @author liyue
* @date 2020-04-13 16:28
*/
@Slf4j
@Component
public class CanalClient {
private static String SERVER_ADDRESS = "127.0.0.1";
/**
* canal server 端口号
*/
private static Integer PORT = 11111;
/**
* DESTINATION 你配置的监听文件名称 这个是你canal下面的 文件夹的名称。如果你没写名称的话
*/
private static String DESTINATION = "demo01";
/**
* canal 用户名
*/
private static String USERNAME = "";
/**
* canal 密码
*/
private static String PASSWORD = "";
private static CanalConnector canalConnector;
private static CanalClient canalClient;
@PostConstruct
public void init() {
canalClient = this;
}
public static CanalConnector getConnect() {
if (canalConnector == null) {
canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(SERVER_ADDRESS,
PORT), DESTINATION, USERNAME, PASSWORD);
canalConnector.connect();
}
System.out.println(canalConnector.checkValid());
return canalConnector;
}
public static void disconnect() {
canalConnector.disconnect();
canalConnector = null;
}
public static void subscribe(String filter) {
//订阅 所有的变化都会获取
//canalConnector.subscribe(".*\\..*");
//canalConnector.subscribe("tpdata\\\\.bi.*");
// canalConnector.subscribe("tpdata.bi_base_data_age_group");
canalConnector.subscribe(filter);
//回到以前状态,回到之前同步的位置
canalConnector.rollback();
}
}
```java
##### 2、解析
````json
package com.carelinker.datax.biz.job;
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.carelinker.datax.biz.service.ICanalSubscribeTableService;
import com.carelinker.datax.biz.utils.CanalClient;
import com.carelinker.datax.biz.utils.ParseMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import javax.annotation.Resource;
import java.util.List;
/**
* @author liyue
* @date 2020-04-20 18:11
*/
@Slf4j
@Configuration
@EnableScheduling
public class CanalJob {
private Boolean connectStatus = false;
private CanalConnector canalConnector;
private int subscribeTableCount = 0;
@Resource
private ParseMessage parseMessage;
@Resource
private ICanalSubscribeTableService canalSubscribeTableService;
@Scheduled(cron = "0/5 * * * * ?")
private void canalJob() {
//1、建立连接
this.getCanalConnect();
//2.获取需要订阅的表
this.canalSubscribeTable();
//3.获取消息
Message message = this.getMessage();
//4.解析消息
this.parseMessage(message);
}
private void getCanalConnect() {
if (!connectStatus) {
// //获取指定数量的数据,但是不做确认,下次取还是会取到
this.canalConnector = CanalClient.getConnect();
this.connectStatus = true;
}
}
private void canalSubscribeTable() {
List<String> canalSubscribeTableList = canalSubscribeTableService.listTableName();
String join = StringUtils.join(canalSubscribeTableList.toArray(), ",");
if (CollectionUtil.isNotEmpty(canalSubscribeTableList) && canalSubscribeTableList.size() != subscribeTableCount) {
this.canalConnector.subscribe(join);
this.subscribeTableCount = canalSubscribeTableList.size();
}
}
private Message getMessage() {
Message message;
try {
message = canalConnector.getWithoutAck(5);
} catch (Exception e) {
e.printStackTrace();
log.error("canal估计是连接挂了", e);
// 尝试重新获取连接
CanalClient.disconnect();
this.connectStatus = false;
this.getCanalConnect();
message = canalConnector.getWithoutAck(5);
}
return message;
}
private void parseMessage(Message message) {
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId < 0 || size == 0) {
return;
}
if (batchId != -1) {
try {
//传入变化数据
parseMessage.printEntity(message.getEntries());
//提交确认
canalConnector.ack(batchId);
} catch (Exception e) {
//处理失败 回滚数据
canalConnector.rollback();
}
}
}
}
3、插入数据库,监听状态操作
package com.carelinker.datax.biz.utils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.carelinker.canal.api.enums.EntityEnums;
import com.carelinker.datax.biz.handle.ContentStrategy;
import com.carelinker.datax.biz.service.ICanalSubscribeTableService;
import com.carelinker.datax.biz.strategy.BiBaseDataAgeGroupStrategy;
import com.carelinker.datax.biz.strategy.BiBaseDataStoreMemberStrategy;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
/**
* @author liyue
* @date 2020-04-26 10:57
*/
@Slf4j
@Component
public class ParseMessage {
@Resource(name = "biBaseDataAgeGroupStrategy")
private BiBaseDataAgeGroupStrategy biBaseDataAgeGroupStrategy;
@Resource(name = "biBaseDataStoreMemberStrategy")
private BiBaseDataStoreMemberStrategy biBaseDataStoreMemberStrategy;
@Resource
private ICanalSubscribeTableService canalSubscribeTableService;
/**
* 接受变化的数据
*
* @param entryList
*/
public void printEntity(List<CanalEntry.Entry> entryList) {
for (CanalEntry.Entry entry : entryList) {
if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
continue;
}
if (!entry.getHeader().getSchemaName().equals("tpdata")) {
continue;
}
try {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
String tableName = entry.getHeader().getTableName();
List<String> canalSubscribeTableList = canalSubscribeTableService.listTableName();
if (!canalSubscribeTableList.contains(entry.getHeader().getSchemaName() + "." + tableName)) {
continue;
}
log.info("表名为{}有数据开始进来了" + tableName);
ContentStrategy contentStrategy;
switch (Enum.valueOf(EntityEnums.class, tableName.toUpperCase())) {
case BI_BASE_DATA_AGE_GROUP:
contentStrategy = new ContentStrategy(biBaseDataAgeGroupStrategy);
contentStrategy.excute(rowChange);
break;
case BI_BASE_DATA_STORE_MEMBER:
contentStrategy = new ContentStrategy(biBaseDataStoreMemberStrategy);
contentStrategy.excute(rowChange);
break;
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
}
}
4、TypeConvertHandler
package com.carelinker.datax.biz.utils;
import com.carelinker.common.utils.NumberUtils;
import org.apache.commons.lang.StringUtils;
import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
public class TypeConvertHandler {
public static final Map<Class, String> BEAN_FIELD_TYPE;
static {
BEAN_FIELD_TYPE = new HashMap<>(8);
BEAN_FIELD_TYPE.put(Integer.class, "Integer");
BEAN_FIELD_TYPE.put(Long.class, "Long");
BEAN_FIELD_TYPE.put(Double.class, "Double");
BEAN_FIELD_TYPE.put(String.class, "String");
BEAN_FIELD_TYPE.put(Date.class, "java.util.Date");
BEAN_FIELD_TYPE.put(java.sql.Date.class, "java.sql.Date");
BEAN_FIELD_TYPE.put(java.sql.Timestamp.class, "java.sql.Timestamp");
BEAN_FIELD_TYPE.put(java.sql.Time.class, "java.sql.Time");
BEAN_FIELD_TYPE.put(LocalDateTime.class, "java.time.LocalDateTime");
BEAN_FIELD_TYPE.put(BigDecimal.class, "java.math.BigDecimal");
}
protected static final Integer parseToInteger(String source) {
if (isSourceNull(source)) {
return null;
}
return Integer.valueOf(source);
}
protected static final Long parseToLong(String source) {
if (isSourceNull(source)) {
return null;
}
return Long.valueOf(source);
}
protected static final Double parseToDouble(String source) {
if (isSourceNull(source)) {
return null;
}
return Double.valueOf(source);
}
protected static final Date parseToDate(String source) {
if (isSourceNull(source)) {
return null;
}
if (source.length() == 10) {
source = source + "00:00:00";
}
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = null;
try {
date = sdf.parse(source);
} catch (ParseException e) {
return null;
}
return date;
}
protected static final java.sql.Date parseToSqlDate(String source) {
if (isSourceNull(source)) {
return null;
}
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
java.sql.Date sqlDate;
Date date;
try {
date = sdf.parse(source);
} catch (ParseException e) {
return null;
}
sqlDate = new java.sql.Date(date.getTime());
return sqlDate;
}
protected static final java.sql.Timestamp parseToTimestamp(String source) {
if (isSourceNull(source)) {
return null;
}
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date;
java.sql.Timestamp timestamp;
try {
date = sdf.parse(source);
} catch (ParseException e) {
return null;
}
timestamp = new java.sql.Timestamp(date.getTime());
return timestamp;
}
protected static final java.sql.Time parseToTime(String source) {
if (isSourceNull(source)) {
return null;
}
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
Date date;
java.sql.Time time;
try {
date = sdf.parse(source);
} catch (ParseException e) {
return null;
}
time = new java.sql.Time(date.getTime());
return time;
}
public static LocalDateTime parseStringToDateTime(String time) {
if (StringUtils.isEmpty(time)) {
return null;
}
return LocalDateTime.parse(time, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
}
public static BigDecimal parseStringToBigDecimal(String source) {
if (StringUtils.isBlank(source)) {
return null;
}
return NumberUtils.mul(source, "1");
}
private static boolean isSourceNull(String source) {
if (source == "" || source == null) {
return true;
}
return false;
}
}
5、CanalDataHandler.java
package com.carelinker.datax.biz.utils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class CanalDataHandler extends TypeConvertHandler {
public static <T> T ConvertToBean(List<CanalEntry.Column> columnList, Class<T> clazz) {
T bean = null;
try {
bean = clazz.newInstance();
Field[] fields = clazz.getDeclaredFields();
Field.setAccessible(fields, true);
Map<String, Field> fieldMap = new HashMap<>(fields.length);
for (Field field : fields) {
if (field.getName().equals("storeDeptId")) {
fieldMap.put("drug_store_id", field);
} else if (field.getName().equals("updateTime")) {
fieldMap.put("update_date", field);
} else if (field.getName().equals("createTime")) {
fieldMap.put("create_date", field);
} else {
fieldMap.put(camelToUnderline(field.getName(), 0), field);
}
}
if (fieldMap.containsKey("serialVersionUID")) {
fieldMap.remove("serialVersionUID".toLowerCase());
}
for (CanalEntry.Column column : columnList) {
String columnName = column.getName();
String columnValue = column.getValue();
if (fieldMap.containsKey(columnName)) {
Field field = fieldMap.get(columnName);
Class<?> type = field.getType();
if (BEAN_FIELD_TYPE.containsKey(type)) {
switch (BEAN_FIELD_TYPE.get(type)) {
case "Integer":
field.set(bean, parseToInteger(columnValue));
break;
case "Long":
field.set(bean, parseToLong(columnValue));
break;
case "Double":
field.set(bean, parseToDouble(columnValue));
break;
case "String":
field.set(bean, columnValue);
break;
case "java.util.Date":
field.set(bean, parseToDate(columnValue));
break;
case "java.sql.Date":
field.set(bean, parseToSqlDate(columnValue));
break;
case "java.sql.Timestamp":
field.set(bean, parseToTimestamp(columnValue));
break;
case "java.sql.Time":
field.set(bean, parseToTime(columnValue));
break;
case "java.time.LocalDateTime":
field.set(bean, parseStringToDateTime(columnValue));
break;
case "java.math.BigDecimal":
field.set(bean, parseStringToBigDecimal(columnValue));
break;
default:
break;
}
} else {
}
}
}
} catch (InstantiationException | IllegalAccessException e) {
System.err.println("无法转换对象");
}
return bean;
}
//驼峰转下划线
public static String camelToUnderline(String param, Integer charType) {
char UNDERLINE = '_';
if (param == null || "".equals(param.trim())) {
return "";
}
int len = param.length();
StringBuilder sb = new StringBuilder(len);
for (int i = 0; i < len; i++) {
char c = param.charAt(i);
if (Character.isUpperCase(c)) {
sb.append(UNDERLINE);
}
if (charType == 2) {
//统一都转大写
sb.append(Character.toUpperCase(c));
} else {
//统一都转小写
sb.append(Character.toLowerCase(c));
}
}
return sb.toString();
}
}
GitHub 加速计划 / ca / canal
28.22 K
7.57 K
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:3 个月前 )
1e5b8a20 - 2 个月前
ff82fd65
2 个月前
更多推荐
已为社区贡献2条内容
所有评论(0)