用canal做数据同步,业务上分为主子库,相互同步,这样就会出现循环同步的情况。

因为用的开源代码,最简单的办法就是修改源码。

下载最新1.1.7版本代码。

这里遇到了几个问题:

  • 我的MySQL版本是8.0.3x,之前用1.1.6版本链接会提示链接错误,更换1.1.7版本解决。
  • 编辑器是idea,在启动1.1.7版本代码,总是提示错误:
    Could not initialize class com.sun.org.apache.xml.internal.serializer.Encodings

网上查到问题点如下:链接

解决方案,从jdk8换成了jdk17,解决了。

canal有两个入口,一个是deployer模块下的CanalLauncher,一个是client-adaptor的launcher模块下的CanalAdapterApplication,至于admin的就很简单了。

剩下的就是修改源码了。

我这边多库同步是通过的rdb进行的,这里就涉及到两个部分:

DML:数据内容的同步,这里主要针对insert和update,因为不涉及大量数据吞吐的情况,所以通过最简单的query查询,然后去比对是否相同,来判断是否再进行处理。



    /**
     * 插入操作
     *
     * @param config 配置项
     * @param dml DML数据
     */
    private ResultSet query(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {

        Map<String, Object> data = dml.getData();
        DbMapping dbMapping = config.getDbMapping();
        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
        StringBuilder sql = new StringBuilder();
        sql.append("select * FROM ").append(SyncUtil.getDbTableName(dbMapping, dataSource.getDbType())).append(" WHERE ");

        List<Map<String, ?>> values = new ArrayList<>();
        // 拼接主键
        appendCondition(dbMapping, sql, ctype, values, data);
        logger.info("execute query sql => {}",sql);
        return batchExecutor.query(sql.toString(),values);
    }


    insert(){

        //进行校验
        ResultSet resultSet = query(batchExecutor,config,dml);
        boolean hasNext = resultSet.next();
        resultSet.close();
        if(hasNext){
            logger.warn("insert data already exists,pls check!");
            return;
        }

        ......

}


update(){

ResultSet currentRes = query(batchExecutor,config,dml);
        boolean updateFlag = false;
        if (currentRes.next()){
            for (String srcColumnName : old.keySet()) {
                if(!currentRes.getString(srcColumnName).equalsIgnoreCase(data.get(srcColumnName).toString())){
                    updateFlag = true;
                    break;
                }
            }
        }else{
            logger.warn("The UPDATE data is not correct,pls check");
        }
        currentRes.close();
        if(!updateFlag){
            logger.warn("The UPDATE data not changed,no need to update");
            return;
        }


}

DDL:数据结构的同步

这里做了异常捕获处理,分析错误原因,判断是否继续进行操作。

然后给rdb打包,替代原来的就可以了。

GitHub 加速计划 / ca / canal
28.21 K
7.57 K
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:2 个月前 )
1e5b8a20 - 1 个月前
ff82fd65 1 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐