canal的工作原理

 

原理相对比较简单:

  1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  3. canal解析binary log对象(原始为byte流)
  • server代表一个canal运行实例,对应于一个jvm
  • instance对应于一个数据队列 (1个server对应1..n个instance)
canal架构设计

instance模块:

  • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
  • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
  • eventStore (数据存储)
  • metaManager (增量订阅&消费信息管理器)

 

EventParser
EventSink
EventStore

 

目前实现了Memory内存、本地file存储以及持久化到zookeeper以保障数据集群共享。
Memory内存的RingBuffer设计:

定义了3个cursor

  • Put : Sink模块进行数据存储的最后一次写入位置
  • Get : 数据订阅获取的最后一次提取位置
  • Ack : 数据消费成功的最后一次消费位置

借鉴Disruptor的RingBuffer的实现

实现说明:

  • Put/Get/Ack cursor用于递增,采用long型存储
  • buffer的get操作,通过取余或者与操作。(与操作: cusor & (size – 1) , size需要为2的指数,效率比较高)

Canal会导致消息重复吗?

    答:会,这从两个大的方面谈起。

        1)Canal instance初始化时,根据“消费者的Cursor”来确定binlog的起始位置,但是Cursor在ZK中的保存是滞后的(间歇性刷新),所以Canal instance获得的起始position一定不会大于消费者真实已见的position,这种情况主要出现在主从切换的时候

        2)Consumer端,因为某种原因的rollback,也可能导致一个batch内的所有消息重发,此时可能导致重复消费。

    我们建议,Consumer端需要保持幂等,对于重复数据可以进行校验或者replace。对于非幂等操作,比如累加、计费,需要慎重

Canal会不会丢失数据?

    答:Canal正常情况下不会丢失数据,比如集群节点失效、重启、Consumer关闭等;但是,存在丢数据的风险可能存在如下几种可能:

        1)ZK的数据可靠性或者安全性被破坏,比如ZK数据丢失,ZK的数据被人为串改,特别是有关Position的值。

        2)MySQL binlog非正常运维,比如binglog迁移、重命名、丢失等。

        3)切换MySQL源,比如原来基于M1实例,后来M1因为某种原因失效,那么Canal将数据源切换为M2,而且M1和M2可能binlog数据存在不一致(非常有可能)。

        4)Consumer端ACK的时机不佳,比如调用get()方法,而不是getWithoutAck(),那么消息有可能尚未完全消费,就已经ACK,那么此时由异常或者Consumer实例失效,则可能导致消息丢失。我们需要在ACK时机上保障“at lease once”。

 

Canal的延迟很大是什么原因?

    答:根据数据流的pipeline,“Master” > "Slave" > "Canal" > "Consumer",每个环节都需要耗时,而且整个管道中都是单线程、串行、阻塞式。(假如网络层面都是良好的)

        1)如果批量insert、update、delete,都可能导致大量的binlog产生,也会加剧Master与slave之间数据同步的延迟。(写入频繁)

        2)“Consumer”消费的效能较低,比如每条event执行耗时很长。这会导致数据变更的消息ACK较慢,那么对于Canal而言也将阻塞,直到Canal内部的store有足够的空间存储新消息、才会继续与Slave进行数据同步。

        3)如果Canal节点ZK的网络联通性不畅,将会导致Canal集群处于动荡状态,大量的时间消耗在ZK状态监测和维护上,而无法对外提供正常服务,包括不能顺畅的dump数据库数据。

Canal如何重置消费的position?

    答:比如当消费者在消费binlog时,数据异常,需要回溯到旧的position重新消费,是这个场景!

    1)我们首先确保,你需要回溯的position所对应的binlog文件仍然存在,可以通过需要回溯的时间点来确定position和binlog文件名,这一点可以通过DBA来确认。

    2)关闭消费者,否则重置位点操作无法生效。(你可以在关闭消费者之前执行unsubscribe,来删除ZK中历史位点的信息)

    3)关闭Canal集群,修改对应的destination下的配置文件中的“canal.instance.master.journal.name = <此position对应的binlog名称>”、“canal.instance.master.position = <此position>”;可以只需要修改一台。

    4)删除zk中此destination的消费者meta信息,“${destination}/1001"此path下所有的子节点,以及“1001”节点。(可以通过消费者执行unsubscribe来实现)

    5)重启2)中的此canal节点,观察日志。

    6)重启消费者。

GitHub 加速计划 / ca / canal
28.22 K
7.57 K
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:4 个月前 )
1e5b8a20 - 3 个月前
ff82fd65 3 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐