canal介绍和使用docker安装canal
概述
1.1 背景
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括
数据库镜像
数据库实时备份
索引构建和实时维护(拆分异构索引、倒排索引等)
业务 cache 刷新
带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
1.2 工作原理
MySQL主从复制原理
MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events)
MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal 工作原理
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
canal 解析 binary log 对象(原始为 byte 流)
架构
server 代表一个canal服务,管理多个instance
instance 伪装成一个slave,从mysql dump数据
instance模块:
eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
eventStore (数据存储)
metaManager (增量订阅&消费信息管理器)
1.4 HA机制设计
canal的高可用HA(High Availability)
为了减少对mysql dump的请求,要求同一时间只能有一个处于running,其他的处于standby状态
如下图所示:
大致步骤:
canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断 (实现:创建短暂的节点,谁创建成功就允许谁启动)
创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于备用状态
一旦zookeeper发现canal server A创建的节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance.
canal client每次进行连接时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect
1.5 docker上安装canal
创建mysql容器
docker run -id --name canal_mysql -v /mnt/canal_mysql:/var/lib/mysql
-p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 mysql:5.7
安装vim
需要在MySQL容器中修改配置文件,但是容器中默认没有vim命令,需要进行安装。
直接执行命令安装vim速度会很慢,因为使用的是国外的源,需要更新Debian源以提高速度。
#在宿主机创建sources.list配置文件
vi sources.list
#内容为:
deb http://mirrors.tuna.tsinghua.edu.cn/debian/ buster main
deb http://mirrors.tuna.tsinghua.edu.cn/debian-security buster/updates main
deb http://mirrors.tuna.tsinghua.edu.cn/debian/ buster-updates main
#复制宿主机的配置到MySQL容器中
docker cp sources.list canal_mysql:/etc/apt/
#进入MySQL容器
docker exec -it canal_mysql /bin/bash
#执行安装命令
apt-get update && apt-get install vim -y
修改MySQL配置
需要让canal伪装成salve并正确获取mysql中的binary log,首先要开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,命令如下:
#修改MySQL配置文件
vim /etc/mysql/mysql.conf.d/mysqld.cnf
#添加的内容如下:
log-bin=mysql-bin
binlog-format=ROW
server_id=1
#开启binlog 选择ROW模式
#server_id不要和canal的slaveId重复
重启MySQL
docker restart canal_mysql
远程登录MySQL,查看配置状态,执行以下sql:
show variables like 'log_bin';
show variables like 'binlog_format';
show master status;
创建Canal账号
创建连接MySQL的账号canal并授予作为 MySQL slave 的权限,执行以下sql:
创建账号
CREATE USER canal IDENTIFIED BY 'canal';
授予权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
刷新并应用
FLUSH PRIVILEGES
创建canal-server容器
docker run -d --name canal-server -p 11111:11111 canal/canal-server:v1.1.4
配置canal-server
#进入canal-server容器
docker exec -it canal-server /bin/bash
#编辑canal-server的配置
vi canal-server/conf/example/instance.properties
重启canal-server
修改完成后重启canal-server,并查看日志:
#按ctrl+D退出容器,并重启容器
docker restart canal-server
可以看到现在已经有两个镜像在启动:
#重启成功后进入容器
docker exec -it canal-server /bin/bash
#查看日志
tail -100f canal-server/logs/example/example.log
1.6 简单使用
在数据库服务中创建数据库canal_test并创建表:
CREATE TABLE `student` (
`id` varchar(20) NOT NULL,
`name` varchar(50) DEFAULT NULL,
`age` int(11) DEFAULT NULL,
`sex` varchar(5) DEFAULT NULL,
`city` varchar(20) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
创建Maven工程canal-demo,在pom.xml中添加依赖:
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
</dependencies>
编写代码获取canal数据:
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class CanalTest {
public static void main(String[] args) {
String ip = "自己ip地址";
String destination = "example";
//创建连接对象
CanalConnector canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress(ip, 11111), destination, "", ""
);
//进行连接
canalConnector.connect();
//进行订阅
canalConnector.subscribe();
int batchSize = 5 * 1024;
//使用死循环不断的获取canal信息
while (true) {
//获取Message对象
Message message = canalConnector.getWithoutAck(batchSize);
long id = message.getId();
int size = message.getEntries().size();
System.out.println("当前监控到的binLog消息数量是:" + size);
//判断是否有数据
if (id == -1 || size == 0) {
//如果没有数据,等待1秒
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
//如果有数据,进行数据解析
List<Entry> entries = message.getEntries();
//遍历获取到的Entry集合
for (Entry entry : entries) {
System.out.println("----------------------------------------");
System.out.println("当前的二进制日志的条目(entry)类型是:" + entry.getEntryType());
//如果属于原始数据ROWDATA,进行打印内容
if (entry.getEntryType() == EntryType.ROWDATA) {
try {
//获取存储的内容
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
//打印事件的类型,增删改查哪种 eventType
System.out.println("事件类型是:" + rowChange.getEventType());
//打印改变的内容(增量数据)
for (RowData rowData : rowChange.getRowDatasList()) {
System.out.println("改变前的数据:" + rowData.getBeforeColumnsList());
System.out.println("改变后的数据:" + rowData.getAfterColumnsList());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
//消息确认已经处理了
canalConnector.ack(id);
}
}
}
}
更多推荐
所有评论(0)