1. canal

阿里巴巴的产品,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

安装包下载地址,以及官方文档地址:官方github主页

2. canal 工作原理

模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议

MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

canal 解析 binary log 对象(原始为 byte 流)

3. 修改mysql binlog配置

修改mysql my.conf文件

log-bin=mysql-bin

binlog_format=row

server-id=1

4. 在mysql中,给canal添加权限

mysql> create user canal identified by ‘canal’;

mysql> grant select,replication slave,replication client on *.* to ‘canal’@’%’;

mysql> flush privileges;

5. 修改canal的配置文件

# conf/example/instance.properties

canal.instance.master.address=127.0.0.1:3306 #mysql的地址端口

canal.instance.dbUsername=canal #mysql中赋予权限的用户

canal.instance.dbPassword=canal #mysql中赋予权限的密码

canal.instance.connectionCharset = UTF-8

canal.instance.filter.regex=.*\\..* #过滤模式,默认不过滤

canal.mq.topic=example

# conf/canal.properties

canal.serverMode = kafka

canal.mq.servers = node01:9092,node02:9092.node03:9093

canal.mq.producerGroup = test

6. 测试

启动canal

sh bin/startup.sh

启动kafka消费者

代码参见:MaxWell+kafka解析mysql binlog 将其中 topic 改为 ‘example’ 后运行即可。

在mysql中新增数据

结果:

# idea 控制台打印如下:

消费的数据为:{"data":[{"id":"9","name":"fak","money":"30.0"}],"database":"db01","es":1575958431000,"id":4,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(20)","money":"double"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"money":8},"table":"account2","ts":1575958431511,"type":"INSERT"}

GitHub 加速计划 / ca / canal
53
10
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:4 个月前 )
8a4199a7 * 1. Fix compressed OSS binlog data 2. Fix first second data loss caused by dumping from OSS binlog * Fix CI failed test cases 29 天前
79338be0 - String.format is lower than StringBuilder. Benchmark like below: code snippet: String str = String.format("%s-%s-%s", 0, 1, 10); Benchmark Mode Cnt Score Error Units StringBenchmark.append thrpt 46431458.255 ops/s StringBenchmark.format thrpt 985724.313 ops/s StringBenchmark.append avgt ≈ 10⁻⁸ s/op StringBenchmark.format avgt ≈ 10⁻⁶ s/op StringBenchmark.append sample 364232 ≈ 10⁻⁷ s/op StringBenchmark.append:p0.00 sample ≈ 10⁻⁸ s/op StringBenchmark.append:p0.50 sample ≈ 10⁻⁷ s/op StringBenchmark.append:p0.90 sample ≈ 10⁻⁷ s/op StringBenchmark.append:p0.95 sample ≈ 10⁻⁷ s/op StringBenchmark.append:p0.99 sample ≈ 10⁻⁷ s/op StringBenchmark.append:p0.999 sample ≈ 10⁻⁷ s/op StringBenchmark.append:p0.9999 sample ≈ 10⁻⁵ s/op StringBenchmark.append:p1.00 sample 0.001 s/op StringBenchmark.format sample 336220 ≈ 10⁻⁶ s/op StringBenchmark.format:p0.00 sample ≈ 10⁻⁶ s/op StringBenchmark.format:p0.50 sample ≈ 10⁻⁶ s/op StringBenchmark.format:p0.90 sample ≈ 10⁻⁶ s/op StringBenchmark.format:p0.95 sample ≈ 10⁻⁶ s/op StringBenchmark.format:p0.99 sample ≈ 10⁻⁶ s/op StringBenchmark.format:p0.999 sample ≈ 10⁻⁵ s/op StringBenchmark.format:p0.9999 sample ≈ 10⁻⁴ s/op StringBenchmark.format:p1.00 sample 0.001 s/op StringBenchmark.append ss ≈ 10⁻⁶ s/op StringBenchmark.format ss ≈ 10⁻⁵ s/op 29 天前
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐