canal使用了Rocketmq来接收mysql采集的binlog的事件,做到采集和处理的解耦。同时满足一次采集多方消费的需求。那么既然使用到Rocketmq就一定会存在MQ消费超时或是处理失败MQ重发的问题。

那么canal是如何处理MQ重复消费幂等性问题的呢

一般,在业务上我们都会为每个消息生成一个uuid来标记这条消息的唯一性。在消费时业务表增加uuid字段或是MQ唯一表来判断是否已经处理过这条消息,如果消费过了就直接回给MQ ack。
但我们定义的t_user表中并没有用于检查唯一性的uuid字段。那canal是如何做的呢。
首先从canal接收RocketMQ的代码开始分析。
canal正对消费方做了不同的adapter实现,例:RdbAdapter、ESAdapter、HbaseAdapter
我们使用的mysql数据库,直接分析RdbAdapter

    /**
     * 同步方法
     *
     * @param dmls 数据包
     */
    @Override
    public void sync(List<Dml> dmls) {
        if (dmls == null || dmls.isEmpty()) {
            return;
        }
        try {
            //rdb同步服务
            rdbSyncService.sync(mappingConfigCache, dmls, envProperties);
            rdbMirrorDbSyncService.sync(dmls);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

RdbSyncService

    //dmlsPartition这里不展开分析,下文做分析
    futures.add(executorThreads[i].submit(() -> {
        try {
            //通过多线程并行执行dmlsPartition的里dml
            dmlsPartition[j].forEach(syncItem -> sync(batchExecutors[j],
                syncItem.config,
                syncItem.singleDml));
            dmlsPartition[j].clear();
            batchExecutors[j].commit();
            return true;
        } catch (Throwable e) {
            dmlsPartition[j].clear();
            batchExecutors[j].rollback();
            throw new RuntimeException(e);
        }
    }));
    /**
     * 单条 dml 同步
     *
     * @param batchExecutor 批量事务执行器
     * @param config 对应配置对象
     * @param dml DML
     */
    public void sync(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
        if (config != null) {
            try {
                String type = dml.getType();
                if (type != null && type.equalsIgnoreCase("INSERT")) {
                    //直接分析insert
                    insert(batchExecutor, config, dml);
                } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
                    update(batchExecutor, config, dml);
                } else if (type != null && type.equalsIgnoreCase("DELETE")) {
                    delete(batchExecutor, config, dml);
                } else if (type != null && type.equalsIgnoreCase("TRUNCATE")) {
                    truncate(batchExecutor, config);
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
                }
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        }
    }

canal在insert时,出现主键冲突时走了SQLException。skipDupException默认是=true,直接忽略了这个异常

    /**
     * 插入操作
     *
     * @param config 配置项
     * @param dml DML数据
     */
    private void insert(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
        Map<String, Object> data = dml.getData();
        if (data == null || data.isEmpty()) {
            return;
        }

        DbMapping dbMapping = config.getDbMapping();

        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);

        StringBuilder insertSql = new StringBuilder();
        insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");

        columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append("`")
            .append(targetColumnName)
            .append("`")
            .append(","));
        int len = insertSql.length();
        insertSql.delete(len - 1, len).append(") VALUES (");
        int mapLen = columnsMap.size();
        for (int i = 0; i < mapLen; i++) {
            insertSql.append("?,");
        }
        len = insertSql.length();
        insertSql.delete(len - 1, len).append(")");

        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);

        List<Map<String, ?>> values = new ArrayList<>();
        for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
            String targetColumnName = entry.getKey();
            String srcColumnName = entry.getValue();
            if (srcColumnName == null) {
                srcColumnName = Util.cleanColumn(targetColumnName);
            }

            Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
            if (type == null) {
                throw new RuntimeException("Target column: " + targetColumnName + " not matched");
            }
            Object value = data.get(srcColumnName);
            BatchExecutor.setValue(values, type, value);
        }

        try {
            batchExecutor.execute(insertSql.toString(), values);
        } catch (SQLException e) {
            if (skipDupException
                && (e.getMessage().contains("Duplicate entry") || e.getMessage().startsWith("ORA-00001:"))) {
                // ignore
                // TODO 增加更多关系数据库的主键冲突的错误码
            } else {
                throw e;
            }
        }
        if (logger.isTraceEnabled()) {
            logger.trace("Insert into target table, sql: {}", insertSql);
        }

    }

结论

canal在处理MQ重复消费insert事件时,使用的是忽略的方式。当数据库存在这条数据时,数据库会返回Duplicate entry告诉canal这条数据已经在数据库里了。canal直接回复MQ ack就行了。

扩展: insert的批量插入

在源数据库中执行一条批量插入的sql,canal是怎么进行同步的。

insert into t_user (username,password,create_time,sex)
values ('1','1','2020-10-10',1) , ('1','1','2020-10-10',1);

回到canal的RdbAdapter的批量同步方法

    /**
     * 批量同步
     *
     * @param mappingConfig 配置集合
     * @param dmls 批量 DML
     */
    public void sync(Map<String, Map<String, MappingConfig>> mappingConfig, List<Dml> dmls, Properties envProperties) {
        sync(dmls, dml -> {
            if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
                // DDL
            columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
            return false;
        } else {
            // DML
            ......

            for (MappingConfig config : configMap.values()) {
                boolean caseInsensitive = config.getDbMapping().isCaseInsensitive();
                if (config.getConcurrent()) {
                    //将批量的多个values数据转换成了一条条单个的insert
                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml, caseInsensitive);
                    singleDmls.forEach(singleDml -> {
                        int hash = pkHash(config.getDbMapping(), singleDml.getData());
                        SyncItem syncItem = new SyncItem(config, singleDml);
                        dmlsPartition[hash].add(syncItem);
                    });
                } else {
                    int hash = 0;
                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml, caseInsensitive);
                    singleDmls.forEach(singleDml -> {
                        SyncItem syncItem = new SyncItem(config, singleDml);
                        dmlsPartition[hash].add(syncItem);
                    });
                }
            }
            return true;
        }
    }   );
    }

在这里插入图片描述

继续分析dmlsPartition的作用

关键代码

    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml, caseInsensitive);
    singleDmls.forEach(singleDml -> {
        int hash = pkHash(config.getDbMapping(), singleDml.getData());
        SyncItem syncItem = new SyncItem(config, singleDml);
        dmlsPartition[hash].add(syncItem);
    });

canal将单条批量insert的sql,转换成了多条单个的insert。并将每条的主键pk和处理线程数threads做hash(pk % threads)放入不通的分区,多线程执行提高canal的处理能力。

GitHub 加速计划 / ca / canal
28.22 K
7.57 K
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:3 个月前 )
1e5b8a20 - 2 个月前
ff82fd65 2 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐