0 Canal介绍

Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 来处理获得的相关数据。(数据库同步需要阿里的 Otter 中间件,基于 Canal)

1 MySQL 的 Binlog

1.1 什么是 Binlog

MySQL 的二进制日志,它记录了所有的 DDL 和 DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL 的二进制日志是事务安全型的。二进制有两个最重要的使用场景:

① MySQL Replication 在 Master 端开启 Binlog,Master 把它的二进制日志传递给 Slaves来达到 Master-Slave 数据一致的目的。

② 数据恢复,通过使用 MySQL Binlog 工具来使恢复数据。二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的 DDL 和 DML(除了数据查询语句)语句事件

1.2 Binlog分类

MySQL Binlog 的格式有三种,分别是 STATEMENT,MIXED,ROW。在配置文件中可以选择配
置 binlog_format= statement|mixed|row。三种格式的区别
1)statement
语句级,binlog 会记录每次一执行写操作的语句。相对 row 模式节省空
间,但是可能产生不一致性,比如“update tt set create_date=now()”,如果用 binlog 日志进行恢复,由于执行时间不同可能产生的数据就不同。

优点:节省空间。
缺点:有可能造成数据不一致。

2)row
行级, binlog 会记录每次操作后每行记录的变化。

优点:保持数据的绝对一致性。因为不管 sql 是什么,引用了什么函数,只记录执行后的效果。
缺点:占用较大空间。

3)mixed
statement 的升级版,一定程度上解决了因为一些情况而造成的 statement
模式不一致问题,默认还是 statement,在某些情况下譬如:当函数中包含 UUID() 时;包含AUTO_INCREMENT 字段的表被更新时;执行 INSERT DELAYED 语句时;用 UDF 时;会按照ROW 的方式进行处理

优点:节省空间,同时兼顾了一定的一致性。
缺点:还有些极个别情况依旧会造成不一致,另外 statement 和 mixed 对于需要对binlog 的监控的情况都不方便。

综合上面对比,Canal 想做监控分析,选择 row 格式比较合适

1.3 Mysql主从复制

1)Master 主库将改变记录,写到二进制日志(Binary Log)中;
2)Slave 从库向 MySQL Master 发送 dump 协议,将 Master 主库的 binary log events 拷贝到它的中继日志(relay log);
3)Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库
在这里插入图片描述
Canal原理很简单,即伪装成mysql的slave节点,假装从Master拷贝数据。使用场景如下:
在这里插入图片描述

1.4 修改mysql配置文件

修改/etc/my.cnf文件如下内容
在这里插入图片描述

#开启binlog
log_bin = mysql-bin
#binlog日志类型
binlog_format = row
#MySQL服务器唯一id
server_id = 1
#设置需要同步的库
binlog-do-db=canal

binlog-do-db 根据自己的情况进行修改,指定具体要同步的数据库,如果不配置则表示所有数据库均开启 Binlog。修改完成后重启mysql生效

service mysql restart

1.5 测试binlog是否生效

建表

create table student (
	id varchar(20)
	,name varchar(20)
	,age int
	,sex varchar(5)
) ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci;

插入数据

insert into student values('1001','zhangsan',18,'male');

对比查看未插入数据前binlog
在这里插入图片描述
可以看到binlog发生了变化,说明配置生效了。

1.6 Mysql中新建canal用户

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

2 安装Canal

2.1 下载canal

进入官网下载canal安装包,这里以1.1.2版本为例

https://github.com/alibaba/canal/releases

2.2 创建canal文件夹并解压

这里的canal文件包解压后是很多文件,因此建议建立一个独立的文件夹用于存放解压后的文件。

mkdir canal
tar -zxvf canal.deployer-1.1.2.tar.gz -C ./canal

2.3 修改canal.properties 的配置

说明:
① 这个文件是 canal 的基本通用配置,canal 端口号默认就是 11111,修改 canal 的输出 model,默认 tcp,改为输出到 kafka.
② 多实例配置如果创建多个实例,通过前面 canal 架构,我们可以知道,一个 canal 服务中可以有多个 instance,conf/下的每一个 example 即是一个实例,每个实例下面都有独立的配置文件。默认只有一个实例 example,如果需要多个实例处理不同的 MySQL 数据的话,直接拷贝出多个 example,并对其重新命名,命名和配置文件中指定的名称一致,然后修改canal.properties 中的 canal.destinations=实例 1,实例 2,实例 3。
在这里插入图片描述

2.4 修改 instance.properties

我们这里只读取一个 MySQL 数据,所以只有一个实例,这个实例的配置文件在conf/example 目录下

vim instance.properties
  1. 配置Mysql服务器地址
#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=10
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=wavehouse-3:3306

在这里插入图片描述

注意:这里的canal.instance.mysql.slaveId=10,只要和my.cnf中的server_id不一样即可,因为在这里canal是伪装成mysql的slave,因此id要不一样。

2)配置连接 MySQL 的用户名和密码,默认就是我们前面授权的 canal

canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName =canal
# enable druid Decrypt database password
canal.instance.enableDruid=false

在这里插入图片描述

2.5 启动canal

./startup.sh

在这里插入图片描述

3 实时监控

3.1 创建maven项目

3.2 添加依赖

<dependencies>
 <dependency>
 <groupId>com.alibaba.otter</groupId>
 <artifactId>canal.client</artifactId>
 <version>1.1.2</version>
</dependency>
 <dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>2.4.1</version>
 </dependency>
</dependencies>

3.3 根据Canal的架构写代码

在这里插入图片描述

package com.chen.canal;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalClient {
    public static void main(String[] args) throws InvalidProtocolBufferException {
        //1.获取Canal连接对象
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.2.202", 11111),
                "example", "", "");
        while (true){
            //2.获取连接
            canalConnector.connect();
            //3.指定需要监控的数据库
            canalConnector.subscribe("canal.*");
            //4.获取message
            Message message = canalConnector.get(100);
            //4.1获取entries
            List<CanalEntry.Entry> entries = message.getEntries();
            //4.2 判断是否有数据
            if(entries.size() <= 0){
                System.out.println("当前没有数据,休息一下~~~~");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }else{
                //如果有数据,则进行遍历
                for (CanalEntry.Entry entry : entries) {
                    //5.获取表名
                    String tableName = entry.getHeader().getTableName();
                    //6.获取entryType
                    CanalEntry.EntryType entryType = entry.getEntryType();
                    //7.判断entryType类型是否未ROWDATA
                    if(CanalEntry.EntryType.ROWDATA.equals(entryType)){
                        //7.1如果是则序列化数据
                        ByteString storeValue = entry.getStoreValue();
                        //8.反序列化
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                        //9.获取事件类型
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        //10.获取具体的数据
                        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                        //11.遍历打印数据
                        for (CanalEntry.RowData rowData : rowDatasList) {
                            //11.1获取before的数据
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            //11.2新建JSON存放before数据
                            JSONObject beforeData = new JSONObject();
                            for (CanalEntry.Column column : beforeColumnsList) {
                                beforeData.put(column.getName(),column.getValue());
                            }
                            //11.3获取after数据
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            //11.4新建JSON存放after数据
                            JSONObject afterData = new JSONObject();
                            for (CanalEntry.Column column : afterColumnsList) {
                                afterData.put(column.getName(),column.getValue());
                            }
                            //12.打印
                            System.out.println("TableName: "+tableName+" ,EvenType: "+eventType+
                                    " ,Before: "+beforeData+",After: "+afterData);

                        }
                    }
                }
                
            }
        }
    }
}

3.3.1 插入数据

insert into student values('1002','lisi',28,'fe');

插入数据后我们可以发现是After更新,Before不更新,因为Before之前没有数据为空
在这里插入图片描述

3.3.2 插入多行数据

insert into student values('1003','wangwu',29,'fe'),('1004','zhaoliu',38,'male'),('1005','zhuqi',8,'male')

在这里插入图片描述

3.3.3 更新数据

update student set age=22 where id=1002;

在这里插入图片描述

更新数据会在before和after中均有数据,before是修改前数据,after则是修改后数据

3.3. 删除数据

delete from student where id=1003;

在这里插入图片描述
删除后数据不存在,所以before中有数据,而after中没有数据

3.4 Kafka模式测试

1)修改 canal.properties 中 canal 的输出 model,默认 tcp,改为输出到 kafka
在这里插入图片描述

2)修改 Kafka 集群的地址
在这里插入图片描述

3)修改 instance.properties 输出到 Kafka 的主题以及分区数
注意:默认还是输出到指定 Kafka 主题的一个 kafka 分区,因为多个分区并行可能会打乱 binlog 的顺序 , 如 果 要 提 高 并 行 度 , 首 先 设 置 kafka 的 分 区 数 >1, 然 后 设 置canal.mq.partitionHash 属性
在这里插入图片描述
4)启动 Canal
启动Canal之前,先启动kafka,启动kafka需要先启动zookeeper

bin/zkServer.sh start
bin/kafka-server-start.sh -daemon config/server.properties

在这里插入图片描述
5)看到 CanalLauncher 你表示启动成功,同时会创建 canal_test 主题
在这里插入图片描述

6)启动 Kafka 消费客户端测试,查看消费情况

bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.200:9092 --topic canal_test

7)向 MySQL 中插入数据后查看消费者控制台

insert into student values('1008','zhuba',29,'fe'),('1009','yangjiu',38,'male');

在这里插入图片描述
kafka是将插入后的多条数据写在json数组中

  1. 更新MySQL数据查看消费者控制台
update student set name='zhuba2' where id=1008;

在这里插入图片描述
更新后Kafka将old字段展示为旧的数据,data字段展示为新的数据
9)删除MySQL数据查看消费控制台

delete from student where id=1005;

在这里插入图片描述

GitHub 加速计划 / ca / canal
27
8
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:5 个月前 )
f350fbdc - 4 小时前
2d0dcf0e - 14 天前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐