1. canal

阿里巴巴的产品,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

安装包下载地址,以及官方文档地址:官方github主页

2. canal 工作原理

模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议

MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

canal 解析 binary log 对象(原始为 byte 流)

3. 修改mysql binlog配置

修改mysql my.conf文件

log-bin=mysql-bin

binlog_format=row

server-id=1

4. 在mysql中,给canal添加权限

mysql> create user canal identified by ‘canal’;

mysql> grant select,replication slave,replication client on *.* to ‘canal’@’%’;

mysql> flush privileges;

5. 修改canal的配置文件

# conf/example/instance.properties

canal.instance.master.address=127.0.0.1:3306 #mysql的地址端口

canal.instance.dbUsername=canal #mysql中赋予权限的用户

canal.instance.dbPassword=canal #mysql中赋予权限的密码

canal.instance.connectionCharset = UTF-8

canal.instance.filter.regex=.*\\..* #过滤模式,默认不过滤

canal.mq.topic=example

# conf/canal.properties

canal.serverMode = kafka

canal.mq.servers = node01:9092,node02:9092.node03:9093

canal.mq.producerGroup = test

6. 测试

启动canal

sh bin/startup.sh

启动kafka消费者

代码参见:MaxWell+kafka解析mysql binlog 将其中 topic 改为 ‘example’ 后运行即可。

在mysql中新增数据

结果:

# idea 控制台打印如下:

消费的数据为:{"data":[{"id":"9","name":"fak","money":"30.0"}],"database":"db01","es":1575958431000,"id":4,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(20)","money":"double"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"money":8},"table":"account2","ts":1575958431511,"type":"INSERT"}

GitHub 加速计划 / ca / canal
28.21 K
7.57 K
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:1 个月前 )
1e5b8a20 - 21 天前
ff82fd65 21 天前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐