Canal数据同步多库循环问题,修改源码解决
canal
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
项目地址:https://gitcode.com/gh_mirrors/ca/canal
免费下载资源
·
用canal做数据同步,业务上分为主子库,相互同步,这样就会出现循环同步的情况。
因为用的开源代码,最简单的办法就是修改源码。
下载最新1.1.7版本代码。
这里遇到了几个问题:
- 我的MySQL版本是8.0.3x,之前用1.1.6版本链接会提示链接错误,更换1.1.7版本解决。
- 编辑器是idea,在启动1.1.7版本代码,总是提示错误:
Could not initialize class com.sun.org.apache.xml.internal.serializer.Encodings
网上查到问题点如下:链接
解决方案,从jdk8换成了jdk17,解决了。
canal有两个入口,一个是deployer模块下的CanalLauncher,一个是client-adaptor的launcher模块下的CanalAdapterApplication,至于admin的就很简单了。
剩下的就是修改源码了。
我这边多库同步是通过的rdb进行的,这里就涉及到两个部分:
DML:数据内容的同步,这里主要针对insert和update,因为不涉及大量数据吞吐的情况,所以通过最简单的query查询,然后去比对是否相同,来判断是否再进行处理。
/**
* 插入操作
*
* @param config 配置项
* @param dml DML数据
*/
private ResultSet query(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
Map<String, Object> data = dml.getData();
DbMapping dbMapping = config.getDbMapping();
Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
StringBuilder sql = new StringBuilder();
sql.append("select * FROM ").append(SyncUtil.getDbTableName(dbMapping, dataSource.getDbType())).append(" WHERE ");
List<Map<String, ?>> values = new ArrayList<>();
// 拼接主键
appendCondition(dbMapping, sql, ctype, values, data);
logger.info("execute query sql => {}",sql);
return batchExecutor.query(sql.toString(),values);
}
insert(){
//进行校验
ResultSet resultSet = query(batchExecutor,config,dml);
boolean hasNext = resultSet.next();
resultSet.close();
if(hasNext){
logger.warn("insert data already exists,pls check!");
return;
}
......
}
update(){
ResultSet currentRes = query(batchExecutor,config,dml);
boolean updateFlag = false;
if (currentRes.next()){
for (String srcColumnName : old.keySet()) {
if(!currentRes.getString(srcColumnName).equalsIgnoreCase(data.get(srcColumnName).toString())){
updateFlag = true;
break;
}
}
}else{
logger.warn("The UPDATE data is not correct,pls check");
}
currentRes.close();
if(!updateFlag){
logger.warn("The UPDATE data not changed,no need to update");
return;
}
}
DDL:数据结构的同步
这里做了异常捕获处理,分析错误原因,判断是否继续进行操作。
然后给rdb打包,替代原来的就可以了。
GitHub 加速计划 / ca / canal
28.21 K
7.57 K
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:2 个月前 )
1e5b8a20 - 1 个月前
ff82fd65
1 个月前
更多推荐
已为社区贡献1条内容
所有评论(0)