目录

Canal的工作原理:

准备工作

开启mysql的日志功能

Docker安装Canal,连接Mysql

有关表结构变化的配置

​编辑

SpringBoot整合Canal(同步MySQL到Redis)

springboot第二种整合方式 

canal的正则表达式

消息投递到RocketMQ(监听MySQL变化,投递MQ)

配置文件介绍

canal.properties配置文件介绍

instance.properties参数列表


Canal的工作原理:

  1. canal模拟mysql slave与mysql master的交互协议,伪装自己是一个mysql slave,向mysql master发送dump协议;
  2. mysql master收到mysql slave(canal)发送的dump请求,开始推送binlog增量日志给slave(也就是canal)
  3. mysql slave(canal伪装的)收到binlog增量日志后,就可以对这部分日志进行解析,获取主库的结构及数据变更;

准备工作

        没有安装Docker的点击这里安装

        没有安装MySQL的点击这里安装

开启mysql的日志功能

   注意:我这里是docker 启动的mysql

//随便一个目录创建 canal.cnf 配置文件,在该文件编辑一下信息---------------------
[mysqld]
log_bin
server_id=1
binlog-format=ROW

//保存后复制到mysql的/etc/mysql/conf.d 文件夹下
docker cp canal.cnf mysql:/etc/mysql/conf.d/

//重启MySQL
docker restart mysql

Docker安装Canal,连接Mysql

//拉取Canal
docker pull canal/canal-server:v1.1.4

//在root目录下创建canal的工作目录
mkdir /root/canal

//运行canal,canal启动后canal会生产example.log(日志)文件
//这里你要是想链接canal-admin(canal的web页面试图画工具)的话多映射俩端口
//分别是 -p 11110:11110 -p 11112:11112
docker run --name canal-server -p 11111:11111 \
-v /root/canal/log:/home/admin/canal-server/logs/example \
-d canal/canal-server:v1.1.4

//将instance.properties(配置文件)文件拷贝到canal的工作目录下
docker cp canal-server:/home/admin/canal-server/conf/example/instance.properties /root/canal/

//---------------然后修改instance.properties中的一些配置,修改以下部分即可------------------
#这里随边写千万不要和mysql的重复
canal.instance.mysql.slaveId=12
#你mysql数据库的地址(不要写localhost和127.0.0.1,写实际IP)
canal.instance.master.address=192.168.11.110:3306
#数据库的用户名和密码
canal.instance.dbUsername=root
canal.instance.dbPassword=123456

//修改完成后,在instance.properties目录下,将该文件复制回去
docker cp instance.properties canal-server:/home/admin/canal-server/conf/example/instance.properties

//然后后重启canal
docker restart canal-server

//上面工作都完成后,查看启动日志是否又报错,没报错在往下进行
/**刚才我们使用-v /root/canal/log:/home/admin/canal-server/logs/example,命令挂载了日志文件,此时/root/canal/log目录下就会有一个叫example.log的日志文件,查看该日志中是否有报错**/



有关表结构变化的配置

        下面是有关表结构变化的配置,也是在 instance.properties 文件中,可选配置,不配置也行但是当你的表结构有所变化的时候需要你手动去同步结构变化。

//tsdb这几项是为预防表结构发生变化从而需要配置的
canal.instance.tsdb.enable=true
//这里的数据源连接信息是你存放下面两张表的数据库
canal.instance.tsdb.url=jdbc:mysql://192.168.11.110:3306/canal
canal.instance.tsdb.dbUsername=root
canal.instance.tsdb.dbPassword=123456
//固定写法
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

/**创建数据库名字自定义,在该数据库下创建这两张表,这两张表记录了你的表结构变化。
CREATE TABLE IF NOT EXISTS `meta_snapshot` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
  `gmt_create` datetime NOT NULL COMMENT '创建时间',
  `gmt_modified` datetime NOT NULL COMMENT '修改时间',
  `destination` varchar(128) DEFAULT NULL COMMENT '通道名称',
  `binlog_file` varchar(64) DEFAULT NULL COMMENT 'binlog文件名',
  `binlog_offest` bigint(20) DEFAULT NULL COMMENT 'binlog偏移量',
  `binlog_master_id` varchar(64) DEFAULT NULL COMMENT 'binlog节点id',
  `binlog_timestamp` bigint(20) DEFAULT NULL COMMENT 'binlog应用的时间戳',
  `data` longtext DEFAULT NULL COMMENT '表结构数据',
  `extra` text DEFAULT NULL COMMENT '额外的扩展信息',
  PRIMARY KEY (`id`),
  UNIQUE KEY binlog_file_offest(`destination`,`binlog_master_id`,`binlog_file`,`binlog_offest`),
  KEY `destination` (`destination`),
  KEY `destination_timestamp` (`destination`,`binlog_timestamp`),
  KEY `gmt_modified` (`gmt_modified`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='表结构记录表快照表';

CREATE TABLE IF NOT EXISTS `meta_history` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
  `gmt_create` datetime NOT NULL COMMENT '创建时间',
  `gmt_modified` datetime NOT NULL COMMENT '修改时间',
  `destination` varchar(128) DEFAULT NULL COMMENT '通道名称',
  `binlog_file` varchar(64) DEFAULT NULL COMMENT 'binlog文件名',
  `binlog_offest` bigint(20) DEFAULT NULL COMMENT 'binlog偏移量',
  `binlog_master_id` varchar(64) DEFAULT NULL COMMENT 'binlog节点id',
  `binlog_timestamp` bigint(20) DEFAULT NULL COMMENT 'binlog应用的时间戳',
  `use_schema` varchar(1024) DEFAULT NULL COMMENT '执行sql时对应的schema',
  `sql_schema` varchar(1024) DEFAULT NULL COMMENT '对应的schema',
  `sql_table` varchar(1024) DEFAULT NULL COMMENT '对应的table',
  `sql_text` longtext DEFAULT NULL COMMENT '执行的sql',
  `sql_type` varchar(256) DEFAULT NULL COMMENT 'sql类型',
  `extra` text DEFAULT NULL COMMENT '额外的扩展信息',
  PRIMARY KEY (`id`),
  UNIQUE KEY binlog_file_offest(`destination`,`binlog_master_id`,`binlog_file`,`binlog_offest`),
  KEY `destination` (`destination`),
  KEY `destination_timestamp` (`destination`,`binlog_timestamp`),
  KEY `gmt_modified` (`gmt_modified`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='表结构变化明细表';
**/

修改后的案例: 

        假如说我们表结构哪几项配置了的话,其后修改表结构的SQL都会存放到meta_history这个表中,sql_text这一列 ,我们只需要监听这个表就看可以获取修改表结构的SQL。

注意有坑:

Canal不能通过Nginx等这样的代理服务器同步MySQL的数据。

Canal通过代理服务器访问MySQL会出现Canal获取不到MySQL的binlog日志的问题。

SpringBoot整合Canal(同步MySQL到Redis)

        pom依赖

<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>

        yml配置

canal:
  server: 192.168.11.110:11111  #你canal的地址
  destination: example         

destination:跟配置文件中的这里写相同

        案例

package com.zx.baibaoxiangfront.config;

import com.zx.baibaoxiangfront.front.entity.UserTest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

/**
 * 实现该接口EntryHandler,泛型是监听表的实体类
 * 监听者user_test表的变化
 */
@Component
@CanalTable("user_test")
public class CanalConfig implements EntryHandler<UserTest> {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    //user_test有新增就会调用此方法,获取到新增的数据
    @Override
    public void insert(UserTest userTest) {
        stringRedisTemplate.opsForValue().set("userTest", userTest.getName());
        EntryHandler.super.insert(userTest);
    }

    //修改
    @Override
    public void update(UserTest before, UserTest after) {
        EntryHandler.super.update(before, after);
    }

    //删除
    @Override
    public void delete(UserTest userTest) {
        EntryHandler.super.delete(userTest);
    }
}

springboot第二种整合方式 

        上面那种整合方式虽然较为简单整洁,但是每监听一个表就要实现一个监听器,较为繁琐,所以介绍第二种整合方式。

 pom依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>

java代码 


@Component
public class CanalListener{
    

    @PostConstruct
    public void run() throws Exception {
        CanalConnector conn = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", username, password);
        while (true) {
            conn.connect();
            //订阅实例中所有的数据库和表
            /*
             这里注意下:
               conn.subscribe(".*\\..*"); 会导致服务端的canal.instance.filter.regex=.*\\..* 失效。
               更严重的是canal会一直向你的example.log日志文件写入日志,
               测了一下大概12小时会写入20M大小的日志。
            */
            //conn.subscribe(".*\\..*");
            // 回滚到未进行ack的地方
            conn.rollback();
            // 获取数据 每次获取一百条改变数据
            Message message = conn.getWithoutAck(100);
            //获取这条消息的id
            long id = message.getId();
            int size = message.getEntries().size();
            if (id != -1 && size > 0) {
                // 数据解析
                analysis(message.getEntries());
            }else {
                //暂停1秒防止重复链接数据库
                Thread.sleep(1000);
            }
            // 确认消费完成这条消息
            conn.ack(message.getId());
            // 关闭连接
            conn.disconnect();
        }
    }

    /**
     * 数据解析
     */
    private void analysis(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            // 解析binlog
            CanalEntry.RowChange rowChange = null;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("解析出现异常 data:" + entry.toString(), e);
            }
            if (rowChange != null) {
                // 获取操作类型
                CanalEntry.EventType eventType = rowChange.getEventType();
                // 获取当前操作所属的数据库
                String dbName = entry.getHeader().getSchemaName();
                // 获取当前操作所属的表
                String tableName = entry.getHeader().getTableName();
                // 事务提交时间
                long timestamp = entry.getHeader().getExecuteTime();
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    dataDetails(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList(), dbName, tableName, eventType, timestamp);

                }
            }
        }
    }


    /**
     * 解析数据
     * @param beforeColumns 修改、删除后的数据
     * @param afterColumns  新增、修改、删除前的数据
     * @param dbName 数据库名字
     * @param tableName  表大的名字
     * @param eventType  操作类型(INSERT,UPDATE,DELETE)
     * @param timestamp 消耗时间
     */
    private static void dataDetails(List<CanalEntry.Column> beforeColumns, List<CanalEntry.Column> afterColumns, String dbName, String tableName, CanalEntry.EventType eventType, long timestamp) {

        System.out.println("数据库:" + dbName);
        System.out.println("表名:" + tableName);
        System.out.println("操作类型:" + eventType);
        if (CanalEntry.EventType.INSERT.equals(eventType)) {
            System.out.println("这是一条新增的数据");
        } else if (CanalEntry.EventType.DELETE.equals(eventType)) {
            System.out.println("删除数据:"+afterColumns);
        } else {
            System.out.println("更新数据:更新前数据--"+afterColumns);
            System.out.println("更新数据:更新后数据--"+beforeColumns);

        }
        System.out.println("操作时间:" + timestamp);
    }
    
}

canal的正则表达式

表示监听canal数据库下所有表:                      canal\\..*

表示监听canal数据库下canal打头的表:          canal\\.canal.*

表示监听canal数据库下test1表:                     canal.test1

消息投递到RocketMQ(监听MySQL变化,投递MQ)

向MQ投递消息需要修改两个配置文件,我们接着上面的继续改

1. 修改 instance.properties ,文件在容器内 /home/admin/canal-server/conf/example文件夹下

#根据自己业务修改具体向哪个主题下投递消息
# 这里代表向RocketMQ的哪个主题投递消息
canal.mq.topic=example

2. 修改 canal.properties, 文件在容器内 /home/admin/canal-server/conf/ 文件夹下

# 服务模式只能选择一种模式,选择投递到MQ,代码就不能实现监听了
#tcp:  canal客户端自己实现监听(代码监听模式)
#kafka:  直接投递消息到kafka
#RocketMQ: 直接投递消息到RocketMQ
canal.serverMode = RocketMQ
#MQ的地址,写实际地址,不要写,localhost和127.0.0.1
#端口是RocketMQ的Broker的端口
canal.mq.servers = 192.168.11.110:9876


------------------------------一下配置不是必须修改,只为介绍作用-----------------------------
# aliyun账号的ak/sk信息,如果使用阿里云的RocketMQ服务必填,
canal.aliyun.accessKey =
canal.aliyun.secretKey =

canal疯狂输出日志,导致占用磁盘过大的问题 

原因 1

  •         由于我们在代码中配置了  connector.subscribe("xxxxxxx"); 导致覆盖掉了canal-server 配置文件 instance.properties 中的 canal.instance.filter.regex=.*\\..* 从而导致过滤失效.
  •          说白了这行代码执行一次  connector.subscribe("xxxxxxx"); canal就行向日志文件 example.log中一直写入一次日志。


配置文件介绍

canal.properties配置文件介绍

在 /home/admin/canal-server/conf/ 文件夹下

参数名字参数说明默认值
canal.destinations当前server上部署的instance列表
canal.conf.dirconf/目录所在的路径../conf
canal.auto.scan开启instance自动扫描
如果配置为true,canal.conf.dir目录下的instance配置变化会自动触发:
a. instance目录新增: 触发instance配置载入,lazy为true时则自动启动
b. instance目录删除:卸载对应instance配置,如已启动则进行关闭
c. instance.properties文件变化:reload instance配置,如已启动自动进行重启操作
true
canal.auto.scan.intervalinstance自动扫描的间隔时间,单位秒5
canal.instance.global.mode全局配置加载方式spring
canal.instance.global.lazy全局lazy模式false
canal.instance.global.manager.address全局的manager配置方式的链接信息
canal.instance.global.spring.xml全局的spring配置方式的组件文件classpath:spring/memory-instance.xml 
 (spring目录相对于canal.conf.dir)
canal.instance.example.mode
canal.instance.example.lazy
canal.instance.example.spring.xml
.....
instance级别的配置定义,如有配置,会自动覆盖全局配置定义模式
命名规则:canal.instance.{name}.xxx
canal.instance.tsdb.spring.xmlv1.0.25版本新增,全局的tsdb配置方式的组件文件classpath:spring/tsdb/h2-tsdb.xml (spring目录相对于canal.conf.dir)
参数名字参数说明默认值
canal.id每个canal server实例的唯一标识,暂无实际意义1
canal.ipcanal server绑定的本地IP信息,如果不配置,默认选择一个本机IP进行启动服务
canal.portcanal server提供socket服务的端口11111
canal.zkServerscanal server链接zookeeper集群的链接信息
例子:10.20.144.22:2181,10.20.144.51:2181
canal.zookeeper.flush.periodcanal持久化数据到zookeeper上的更新频率,单位毫秒1000
canal.instance.memory.batch.modecanal内存store中数据缓存模式
1. ITEMSIZE : 根据buffer.size进行限制,只限制记录的数量
2. MEMSIZE : 根据buffer.size  * buffer.memunit的大小,限制缓存记录的大小
MEMSIZE
canal.instance.memory.buffer.sizecanal内存store中可缓存buffer记录数,需要为2的指数16384
canal.instance.memory.buffer.memunit内存记录的单位大小,默认1KB,和buffer.size组合决定最终的内存使用大小1024
canal.instance.transactionn.size最大事务完整解析的长度支持
超过该长度后,一个事务可能会被拆分成多次提交到canal store中,无法保证事务的完整可见性
1024
canal.instance.fallbackIntervalInSecondscanal发生mysql切换时,在新的mysql库上查找binlog时需要往前查找的时间,单位秒
说明:mysql主备库可能存在解析延迟或者时钟不统一,需要回退一段时间,保证数据不丢
60
canal.instance.detecting.enable是否开启心跳检查false
canal.instance.detecting.sql心跳检查sqlinsert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.interval.time心跳检查频率,单位秒3
canal.instance.detecting.retry.threshold心跳检查失败重试次数3
canal.instance.detecting.heartbeatHaEnable心跳检查失败后,是否开启自动mysql自动切换
说明:比如心跳检查失败超过阀值后,如果该配置为true,canal就会自动链到mysql备库获取binlog数据
false
canal.instance.network.receiveBufferSize网络链接参数,SocketOptions.SO_RCVBUF16384
canal.instance.network.sendBufferSize网络链接参数,SocketOptions.SO_SNDBUF16384
canal.instance.network.soTimeout网络链接参数,SocketOptions.SO_TIMEOUT30
canal.instance.filter.druid.ddl是否使用druid处理所有的ddl解析来获取库和表名

true

canal.instance.filter.query.dcl是否忽略dcl语句false
canal.instance.filter.query.dml是否忽略dml语句
(mysql5.6之后,在row模式下每条DML语句也会记录SQL到binlog中,可参考MySQL文档)
false
canal.instance.filter.query.ddl是否忽略ddl语句false
canal.instance.filter.table.error

是否忽略binlog表结构获取失败的异常

(主要解决回溯binlog时,对应表已被删除或者表结构和binlog不一致的情况)

false
canal.instance.filter.rows

是否dml的数据变更事件

(主要针对用户只订阅ddl/dcl的操作)

false
canal.instance.filter.transaction.entry是否忽略事务头和尾,比如针对写入kakfa的消息时,不需要写入TransactionBegin/Transactionend事件false
canal.instance.binlog.format支持的binlog format格式列表
(otter会有支持format格式限制)
ROW,STATEMENT,MIXED
canal.instance.binlog.image支持的binlog image格式列表
(otter会有支持format格式限制)
FULL,MINIMAL,NOBLOB
canal.instance.get.ddl.isolation

ddl语句是否单独一个batch返回

(比如下游dml/ddl如果做batch内无序并发处理,会导致结构不一致)

false
canal.instance.parser.parallel

是否开启binlog并行解析模式

(串行解析资源占用少,但性能有瓶颈, 并行解析可以提升近2.5倍+)

true
canal.instance.parser.parallelBufferSizebinlog并行解析的异步ringbuffer队列
(必须为2的指数)

256

canal.instance.tsdb.enable是否开启tablemeta的tsdb能力true
canal.instance.tsdb.dir主要针对h2-tsdb.xml时对应h2文件的存放目录,默认为conf/xx/h2.mv.db${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url

jdbc url的配置

(h2的地址为默认值,如果是mysql需要自行定义)

jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername

jdbc url的配置

(h2的地址为默认值,如果是mysql需要自行定义)

canal
canal.instance.tsdb.dbPassword

jdbc url的配置

(h2的地址为默认值,如果是mysql需要自行定义)

canal
canal.instance.rds.accesskey

aliyun账号的ak信息 (如果不需要在本地binlog超过18小时被清理后自动下载oss上的binlog,可以忽略该值)

canal.instance.rds.secretkey

aliyun账号的sk信息

(如果不需要在本地binlog超过18小时被清理后自动下载oss上的binlog,可以忽略该值)

instance.properties参数列表

在 /home/admin/canal-server/conf/example 目录下

参数名字参数说明默认值
canal.instance.mysql.slaveIdmysql集群配置中的serverId概念,需要保证和当前mysql集群中id唯一 
(v1.1.x版本之后canal会自动生成,不需要手工指定)
canal.instance.master.addressmysql主库链接地址127.0.0.1:3306
canal.instance.master.journal.namemysql主库链接时起始的binlog文件
canal.instance.master.positionmysql主库链接时起始的binlog偏移量
canal.instance.master.timestampmysql主库链接时起始的binlog的时间戳
canal.instance.gtidon是否启用mysql gtid的订阅模式false
canal.instance.master.gtidmysql主库链接时对应的gtid位点
canal.instance.dbUsernamemysql数据库帐号canal
canal.instance.dbPasswordmysql数据库密码canal
canal.instance.defaultDatabaseNamemysql链接时默认schema
canal.instance.connectionCharsetmysql 数据解析编码UTF-8
canal.instance.filter.regex

mysql 数据解析关注的表,Perl正则表达式.

多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)


常见例子:

1.  所有表:.*   or  .*\\..*
2.  canal schema下所有表: canal\\..*
3.  canal下的以canal打头的表:canal\\.canal.*
4.  canal schema下的一张表:canal\\.test1

5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)

.*\\..*
canal.instance.filter.black.regex

mysql 数据解析表的黑名单,表达式规则见白名单的规则

canal.instance.rds.instanceId

aliyun rds对应的实例id信息

(如果不需要在本地binlog超过18小时被清理后自动下载oss上的binlog,可以忽略该值)

GitHub 加速计划 / ca / canal
25
8
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:4 个月前 )
923675aa - 5 天前
ebb54100 - 5 天前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐