MysqlEventParser#findStartPositionInternalStep2:这里分如下两种情况

  • 如果日志位点管理器(LogPositionManager)中并未存储相关的位点信息,例如初次启动时的处理逻辑。

  • 如果日志位点管理器中已存储相关的位点信息的处理逻辑。

由于初次启动时日志位点管理器并没有存储其位点信息,故我们先看位点管理器并未存储位点的情况。

MysqlEventParser#findStartPositionInternal

Step3:如果当前连接的是主节点,则尝试使用 masterPosition,如果当前连接的是从节点(发生了切换),即使用 standbyPosition,那这两个位点信息是从哪来的的呢?原来在 Canal Instance 实例启动之前,可以手动通过 positions 属性手动设置开始解析位点。

MysqlEventParser#findStartPositionInternalStep4:如果在启动时未手动设置初始解析位点,则从当前 binlog 日志最后的位点开始同步,其实现原理是向 MySQL 服务器发送 show master status\G 命令,其命令输出结果如下图所示:

接下来再关注一下如果从日志位点管理器中查找到位点的处理逻辑,在进入该流程的探究之前,先看一下表示位点的实体类,一睹其结构。

在这里插入图片描述

会在 LogIdentity 中记录该日志位点是由哪个 slaveId 以及所连接的 MySQL 服务器信息。

MysqlEventParser#findStartPositionInternal

Step5:如果从日志位点管理器中查询到位点,则需要判断当前连接的服务器地址与日志位点中记录的是否一致,如果不一致则说明发生了故障切换,为了确保数据不丢失,提供了回退时间的机制,其具体实现关键点如下:

  • 如果解析 dump 出现的次数超过其阔值,可能是基于VIP模式发生了漂移,此时可以根据 serverId 来判断是否发生了切换,如何切换了,则按时间回退来重新寻找位点。

  • 如果查找到的位点连接的信息与当前连接的信息不符合,说明发生了切换,则需要回退指定的时间,即根据时间区重新定位位点,至于回退多久的时间,可以通过参数 fallbackIntervalInSeconds 进行设置,默认为 60s。

Canal Instance 启动时如何定位同步位点的流程就介绍到这里了,接下来我们再来看一下 Canal 如何基于时间戳来定位 binlog 位点。

为了流程的完整性,在学习如何根据时间戳查找binlog位点之前,我们先来看一下从位点管理器中查询到对应的位点信息后的处理流程。

MysqlEventParser#findStartPositionInternal如果从位点管理器中查询到位点信息,首先判断当前连接的MySQL服务器(主或从)与位点信息是否一致,如果不一致,说明发生了主从切换,为了保证数据的完整性,需要对位点进行前移,默认为回退到60s之前的位点,

1.2 基于时间戳从查找 binlog 位点

基于时间戳查找 binlog 位点的实现方法为 MysqlEventParser 的 findByStartTimeStamp,接下来我们来看一下其实现原理。

MysqlEventParser#findByStartTimeStamp

Step1:首先先查询最大的位点与最小位点,最小位点可发送SQL:show binlog events limit 1。

MysqlEventParser#findByStartTimeStamp

Step2:然后从最后一个文件开始,尝试根据开始时间戳进行日志查找,等下会详细介绍如果从一个binlog日志定位 endposition。

MysqlEventParser#findByStartTimeStamp

Step3:如果找到一个合适的endposition,则结束寻找。如果没有找到一个合适的endposition,则尝试向前一个文件进行解析,首先解析出要查找的最小文件的名称,例如(mysql-bin.000036),从文件名称序号,然后减1,再判断该文件名是否小于这次可查找的最小文件名,如果不大于,则向前继续选择,否则结束查找,返回null。

接下来我们看一下如果在一个binlog文件中根据时间戳查找合适的位点。

MysqlEventParser#findAsPerTimestampInSpecificLogFile

通过向 MySQL Master 发送 dump 命令,建立连接,一条一条从 binlog 日志中解析事件,一条日志日志进行匹配,每从master获取一个logevent,调用 SinkFunction 的 seek 方法。

在这里插入图片描述

Step1:如果 justForPositionTimestamp 参数为 true,表示在查询位点时只考虑时间戳,并不考虑事务,在按开始时间戳寻找的方法中该参数为 false,即不会进入该方法。

SinkFunction#sink

Step2:获取当前日志的基本信息,例如所在的binlog日志文件、日志偏移量、日志写入时间戳、master serverId。

SinkFunction#sink

Step3:如果记录日志的时间戳大于等于待查找的时间戳,返回 false,停止在文件中的停止,是否继续查找其他文件取决在在当前文件中是否已查到符合条件的日志(LogEvent),即是否查找到小于或等于要查找的时间戳。

温馨提示:按照时间戳去查找,其设计理念就是查找小于待查找时间戳中的最大时间戳的LogEvent。

SinkFunction#sink

Step4:如果当前的解析的日志偏移量小于此次待查找的最大偏移量,同样结束本文件的查找(针对查找的第一个文件)。因为在查询的时候,首先会查询当前最大偏移量,即查找时的快照,新的内容不在本次查找范围内。

在这里插入图片描述

Step5:重点查找事件类型为TRANSACTIONEND与TRANSACTIONBEGIN ,即事务结束与事务开始的事件,并将其存储在 logPostion 中,表示该文件中满足查找条件的事件,但并不是只要找到一条就退出,而是继续向后找,直到找到最合适的事件。

由于源码剖析不够直观,为了更好的理解按照时间戳查找日志位点,再给出其流程图,如下:

在这里插入图片描述

2、总结


阅读源码不那么直观,故先来一张流程图对其进行一个总结,辅助大家了解定位位点的核心步骤。

在这里插入图片描述

最后

金三银四到了,送上一个小福利!

image.png

image.png

专题+大厂.jpg
[外链图片转存中…(img-oSN92lRu-1721832773984)]

[外链图片转存中…(img-8ptp0Ngq-1721832773985)]

[外链图片转存中…(img-sHgNDJHw-1721832773985)]

GitHub 加速计划 / ca / canal
28.21 K
7.57 K
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:2 个月前 )
1e5b8a20 - 2 个月前
ff82fd65 2 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐