EventParser WorkFlow

 EventStore负责存储解析后的Binlog事件,而解析动作负责拉取Binlog,它的流程比较复杂。需要和MetaManager进行交互。
比如要记录每次拉取的Position,这样下一次就可以从上一次的最后一个位置继续拉取。所以MetaManager应该是有状态的。

EventParser的流程如下:

  1. Connection获取上一次解析成功的位置 (如果第一次启动,则获取初始指定的位置或者是当前数据库的binlog位点)
  2. Connection建立链接,发送BINLOG_DUMP指令
  3. Mysql开始推送Binaly Log
  4. 接收到的Binaly Log的通过Binlog parser进行协议解析,补充一些特定信息
  5. 传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功
  6. 存储成功后,定时记录Binaly Log位置

上面提到的Connection指的是实现了ErosaConnection接口的MysqlConnection
EventParser的实现类是实现了AbstractEventParserMysqlEventParser

EventParser解析binlog后通过EventSink写入到EventStore,这条链路可以通过EventStore的put方法串联起来;

其实这里还有一个EventTransactionBuffer缓冲区,即Parser解析后先放到缓冲区中,
当事务发生时或者数据超过阈值,就会执行刷新操作:即消费缓冲区的数据,放到EventStore中。
这个缓冲区有两个偏移量指针:putSequence和flushSequence。

Canal HA

单机模拟两个Canal Server,将单机模式复制出两个文件夹,并修改相关配置

canal_m/conf/canal.properties:

canal.id= 2
canal.ip=
canal.port= 11112
canal.zkServers=localhost:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

canal_m/conf/example/instance.properties

canal.instance.mysql.slaveId = 1235

canal_s

canal.id= 3
canal.ip=
canal.port= 11113
canal.zkServers=localhost:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

canal_s/conf/example/instance.properties
canal.instance.mysql.slaveId = 1236

忘记出处了,抱歉

GitHub 加速计划 / ca / canal
53
10
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:4 个月前 )
8a4199a7 * 1. Fix compressed OSS binlog data 2. Fix first second data loss caused by dumping from OSS binlog * Fix CI failed test cases 30 天前
79338be0 - String.format is lower than StringBuilder. Benchmark like below: code snippet: String str = String.format("%s-%s-%s", 0, 1, 10); Benchmark Mode Cnt Score Error Units StringBenchmark.append thrpt 46431458.255 ops/s StringBenchmark.format thrpt 985724.313 ops/s StringBenchmark.append avgt ≈ 10⁻⁸ s/op StringBenchmark.format avgt ≈ 10⁻⁶ s/op StringBenchmark.append sample 364232 ≈ 10⁻⁷ s/op StringBenchmark.append:p0.00 sample ≈ 10⁻⁸ s/op StringBenchmark.append:p0.50 sample ≈ 10⁻⁷ s/op StringBenchmark.append:p0.90 sample ≈ 10⁻⁷ s/op StringBenchmark.append:p0.95 sample ≈ 10⁻⁷ s/op StringBenchmark.append:p0.99 sample ≈ 10⁻⁷ s/op StringBenchmark.append:p0.999 sample ≈ 10⁻⁷ s/op StringBenchmark.append:p0.9999 sample ≈ 10⁻⁵ s/op StringBenchmark.append:p1.00 sample 0.001 s/op StringBenchmark.format sample 336220 ≈ 10⁻⁶ s/op StringBenchmark.format:p0.00 sample ≈ 10⁻⁶ s/op StringBenchmark.format:p0.50 sample ≈ 10⁻⁶ s/op StringBenchmark.format:p0.90 sample ≈ 10⁻⁶ s/op StringBenchmark.format:p0.95 sample ≈ 10⁻⁶ s/op StringBenchmark.format:p0.99 sample ≈ 10⁻⁶ s/op StringBenchmark.format:p0.999 sample ≈ 10⁻⁵ s/op StringBenchmark.format:p0.9999 sample ≈ 10⁻⁴ s/op StringBenchmark.format:p1.00 sample 0.001 s/op StringBenchmark.append ss ≈ 10⁻⁶ s/op StringBenchmark.format ss ≈ 10⁻⁵ s/op 30 天前
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐