canal详解
canal
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
项目地址:https://gitcode.com/gh_mirrors/ca/canal
·
EventParser WorkFlow
EventStore负责存储解析后的Binlog事件,而解析动作负责拉取Binlog,它的流程比较复杂。需要和MetaManager进行交互。
比如要记录每次拉取的Position,这样下一次就可以从上一次的最后一个位置继续拉取。所以MetaManager应该是有状态的。
EventParser的流程如下:
- Connection获取上一次解析成功的位置 (如果第一次启动,则获取初始指定的位置或者是当前数据库的binlog位点)
- Connection建立链接,发送BINLOG_DUMP指令
- Mysql开始推送Binaly Log
- 接收到的Binaly Log的通过Binlog parser进行协议解析,补充一些特定信息
- 传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功
- 存储成功后,定时记录Binaly Log位置
上面提到的Connection指的是实现了ErosaConnection接口的MysqlConnection。EventParser的实现类是实现了AbstractEventParser的MysqlEventParser。
EventParser解析binlog后通过EventSink写入到EventStore,这条链路可以通过EventStore的put方法串联起来;
其实这里还有一个EventTransactionBuffer缓冲区,即Parser解析后先放到缓冲区中,
当事务发生时或者数据超过阈值,就会执行刷新操作:即消费缓冲区的数据,放到EventStore中。
这个缓冲区有两个偏移量指针:putSequence和flushSequence。
Canal HA
单机模拟两个Canal Server,将单机模式复制出两个文件夹,并修改相关配置
canal_m/conf/canal.properties:
canal.id= 2
canal.ip=
canal.port= 11112
canal.zkServers=localhost:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
canal_m/conf/example/instance.properties
canal.instance.mysql.slaveId = 1235
canal_s
canal.id= 3
canal.ip=
canal.port= 11113
canal.zkServers=localhost:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
canal_s/conf/example/instance.properties
canal.instance.mysql.slaveId = 1236
忘记出处了,抱歉
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:4 个月前 )
8a4199a7
* 1. Fix compressed OSS binlog data
2. Fix first second data loss caused by dumping from OSS binlog
* Fix CI failed test cases 30 天前
79338be0
- String.format is lower than StringBuilder. Benchmark like below:
code snippet:
String str = String.format("%s-%s-%s", 0, 1, 10);
Benchmark Mode Cnt Score Error Units
StringBenchmark.append thrpt 46431458.255 ops/s
StringBenchmark.format thrpt 985724.313 ops/s
StringBenchmark.append avgt ≈ 10⁻⁸ s/op
StringBenchmark.format avgt ≈ 10⁻⁶ s/op
StringBenchmark.append sample 364232 ≈ 10⁻⁷ s/op
StringBenchmark.append:p0.00 sample ≈ 10⁻⁸ s/op
StringBenchmark.append:p0.50 sample ≈ 10⁻⁷ s/op
StringBenchmark.append:p0.90 sample ≈ 10⁻⁷ s/op
StringBenchmark.append:p0.95 sample ≈ 10⁻⁷ s/op
StringBenchmark.append:p0.99 sample ≈ 10⁻⁷ s/op
StringBenchmark.append:p0.999 sample ≈ 10⁻⁷ s/op
StringBenchmark.append:p0.9999 sample ≈ 10⁻⁵ s/op
StringBenchmark.append:p1.00 sample 0.001 s/op
StringBenchmark.format sample 336220 ≈ 10⁻⁶ s/op
StringBenchmark.format:p0.00 sample ≈ 10⁻⁶ s/op
StringBenchmark.format:p0.50 sample ≈ 10⁻⁶ s/op
StringBenchmark.format:p0.90 sample ≈ 10⁻⁶ s/op
StringBenchmark.format:p0.95 sample ≈ 10⁻⁶ s/op
StringBenchmark.format:p0.99 sample ≈ 10⁻⁶ s/op
StringBenchmark.format:p0.999 sample ≈ 10⁻⁵ s/op
StringBenchmark.format:p0.9999 sample ≈ 10⁻⁴ s/op
StringBenchmark.format:p1.00 sample 0.001 s/op
StringBenchmark.append ss ≈ 10⁻⁶ s/op
StringBenchmark.format ss ≈ 10⁻⁵ s/op 30 天前
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)