本文将对canal的server模块进行分析,跟之前一样,我们带着几个问题来看源码:

  • CanalServer有几种使用方式?
  • 控制台Admin、客户端client是如何与CanalServer交互的?
  • CanalServerWithNetty和CanalServerWithEmbedded究竟有什么关系?
  • Canal事件消费的特色协议,异步流式api(get/ack/rollback协议)的设计是如何实现的?

server模块内的结构如下:

主要分为了三个包:

  • admin包:

这个包的CanalAdmin接口定义了canalServer上暴露给canal-admin控制台使用的一些服务接口。

上一篇deployer模块解析中提到的CanalAdminController就是实现了CanalAdmin接口(把这个接口的实现放在deployer模块是挺奇怪的)。 Admin包中使用了netty作为服务端(CanalAdminWithNetty类中实现),接受控制台Admin的请求,返回当前canalServer的一些运行状态。

  • server包:

server模块的核心包,本文重点解析的部分,需要了解CanalServerWithEmbedded 和CanalServerWithNetty。

  • spi包:

定义了canalServer的监控内容 通过spi实现,比如项目中的Prometheus子模块实现了监控能力,我们不展开分析。

1.从CanalServer的架构说起

CanalServer目前支持两种模式:

  • serverMode = tcp的Server-Client模式
  • serverMode = kafak 或 rocketMQ 的 Server-MQ-Client模式

为了大家能充分理解canalServer的结构,这里精心制作了一个canalServer的架构图(如果觉得这图不错,给本文点个赞吧)。

1.1 Server-Client模式

架构如图所示:

我们可以清楚的看到Server模块中各个模块的关系与能力:

  • CanalServerWithEmbedde维护了具体的instance任务,负责对binlog进行订阅、过滤、缓存,就是之前的文章介绍过的parser-sink-store的方式。
  • CanalServerWithNetty作为服务端,接收CanalClient的请求,将binlog的消息发送给client。
  • CanalAdminWithNetty作为admin的服务器,接收控制台Admin的控制操作、查询状态操作等,启停或显示当前CanalServer以及instance的状态。

1.2 Server-MQ-Client模式

架构如图所示:

主体部分与Server-client模式一致,主要区别如下:

  • 不需要CanalServerWithNetty,改为CanalMQProducer投递消息给消息队列
  • 不使用CanalClient,改为MqClient获取消息队列的消息进行消费

这种模式相比于Server-client模式

  • 下游解耦,利用消息队列的特性,可以支持多个客户端广播消费、集群消费、重复消费等
  • 会增加系统的复杂度,增加一些延迟

具体模式的选择,需要根据具体的使用场景来决定。

2.server包

admin包和spi包都不属于核心逻辑,因此我们重点关注server包的代码。

我们看到,server包下面分为了embedded包、exception包、netty包和几个接口类。

其中,最顶层的设计就要从CanalServer接口入手。

它的实现类有两个,CanalServerWithEmbedded 和 CanalServerWithNetty。

它们之间的区别官方文档给了一些说明。

那么,对于官方文档中提到的Embedded(嵌入式)的自主开发是怎么使用呢?

跟我们上面提到的Server-Client模式和Server-MQ-Client模式完全不同,采用了一种无server的架构,如下图所示。

我们可以看到,这种模式没有了Canal-Server,直接在自己的应用中引入canal,然后使用CanalServerWithEmbedded进行数据抓取和订阅。

当然,这种方式开发成本有点高,一般也不会去这样使用。

对于CanalServerWithEmbedded 和 CanalServerWithNetty,官方文档里面实际上没有解释的特别到位,只讲了区别,没有讲联系。

这两个实现类除了官方文档中说明的区别之外,还有很大的联系。

可以看看我们上文介绍的架构图,对于Server-Client模式下的模块联系

实际上,真正的执行逻辑是在CanalServerWithEmbedded中的,CanalServerWithNetty中持有了CanalServerWithEmbedded对象,委托embedded进行相关逻辑处理,CanalServerWithNetty更多的作用是充当服务端与CanalClient进行交互。

3. CanalServerWithNetty类

下面,我们先看看CanalServerWithNetty类。

3.1 单例构建

使用 private构造器 + 静态内部类 来实现一个单例模式,保证了一个CanalServer内部只有一个CanalServerWithNetty。

同时,我们能看到内部持有一个CanalServerWithEmbedded对象,用来处理相关请求,验证了我们上面的说明。

3.2 启动逻辑 start()

源码如下:

主要流程如下:

  • 启动embeddedServer
  • 创建bootstrap实例,设置netty相关配置

参数NioServerSocketChannelFactory也是Netty的API,接受2个线程池参数,第一个线程池是Accept线程池,第二个线程池是woker线程池,Accept线程池接收到client连接请求后,会将代表client的对象转发给worker线程池处理。这里属于netty的知识,不熟悉的可以暂时不必深究,简单认为netty使用线程来处理客户端的高并发请求即可。

  • 构造对应的pipeline,包括解码处理、身份验证、创建netty的 seesionHandler(真正处理客户端请求,seesionHandler的实现是核心逻辑)

pipeline实际上就是netty对客户端请求的处理器链,可以类比JAVA EE编程中Filter的责任链模式,上一个filter处理完成之后交给下一个filter处理,只不过在netty中,不再是filter,而是ChannelHandler。

  • 启动netty,监听port端口,然后客户端对 这个端口的请求可以被接收到

对于 netty的相关知识 ,本文 不深入展开,简单理解 为一个高性能服务器即可,可以监听 端口请求,并 进行相应的处理。

重点在于sessionHandler的处理。

3.3 逻辑分发SessionHandler类

canalServer的处理逻辑显然都在sessionHandler里面,而这个handler在构建时,传入了embeddedServer。

前面我们提过,serverWithNetty的处理逻辑是委派给embeddedServer的,所以这里就非常顺理成章了,让handler维护embeddedServer实例,进行逻辑处理。

sessionHandler继承了netty的SimpleChannelHandler类,重写了messageReceived方法,接收到不同请求后,委托embeddedServer用不同方法进行处理 。

这个方法里面的代码非常冗长,而本质都是委托给embeddedServer去处理,因此,我们看下主干逻辑即可。

可以看到,根据不同的packet类型,最终都是委托给embeddedServer进行处理,这里只是做一个逻辑的判断和分发。

3.4 CanalServerWithNetty小结

到此,我们已经了解了CanalServerWithNetty是如何启动的。

并且,它的主要定位就是充当服务器,接收客户端的请求,然后做消息分发,委托给CanalServerEmbedded进行处理。

下面,我们来看下CanalServerEmbedded的相关实现。

4. CanalServerEmbedded类

4.1 基本认识

  • 非完全单例模式,这里使用public的构造器,用户还是有机会自己new对象出来的,应用是用来独立引入进行开发的时候使用。
  • 维护了instance的对象容器
  • 继承了CanalServer和CanalService接口

CannalServer接口其实就是就是start()和stop()方法,没有特别的地方,主要是start()配置了一个MigrateMap.makeComputingMap,

当需要某个instance的时候,就会调用apply方法用instanceGenerator创建对应的instance。

我们重点看下CanalService接口定义的方法。

每个方法的入参都带来clientIdentity,这个是客户端的身份标示

目前canal只支持一个客户端对一个instance进行订阅,clientId全部写死为1001,据说以后可能会支持多用户订阅。

了解CanalService定义的方法在CanalServerEmbedded中如何实现,基本也就能看清CanalServerEmbedded的全貌了。

尤其是,你能理解官网wiki中介绍的canal核心功能——异步消费流式api(get/ack/rollback协议) 设计。

4.2 subscribe方法

主要步骤:

  • 根据客户端标识clientIdentity中的destination,找到对应的instance
  • 通过instance的metaManager记录下当前这个客户端在订阅
  • 通过instace的metaManage获取当前订阅binlog的position位置。如果是第一次订阅,那么metaManage没有position信息,就从eventStore获取第一个binlog的position,然后更新到metaManager
  • 通知下订阅关系变化

这里需要注意一下metaManager,这是一个接口,有多种实现方式,包括基于内存、基于文件、基于内存+zookeeper混合、基于zookeeper等,都在meta模块中,这里就简单了解下概念即可。

  • MemoryMetaManager:位点信息保存在内存中
  • ZookeeperMetaManage:位点信息保存在zk上
  • PeriodMixedMetaManager:前面两种的混合,保存在内存中,然后位点信息定期刷新到zk上

我们在集群模式下,default-instance.xml使用的是基于PeriodMixedMetaManager的实现。

4.3 unsubscribe方法

这个方法比较简单,就不放源码了。

就是找到instance对应的metaManager,然后调用unsubscribe方法取消这个客户端的订阅。

需要注意的是,取消订阅,instance本身仍然是在运行的,可以有新的client来订阅这个instance。

4.4 getWithoutAck方法

先解释几个概念。

我们用的集群版canalServer,默认是使用PeriodMixedMetaManager来管理位点信息,也就是MemoryMetaManager + zookeeperMetaManager。

其中,对于客户端消费instance消息的情况,内部维护了一个对象MemoryClientIdentityBatch进行记录

回到这个方法来说,这个方法用于客户端获取binlog消息,大致流程如下:

  • 根据clientIdentity的destination获取对应的instance
  • 获取到流式数据中的最后一批获取的位置positionRanges(跟batchId有关联,就是上面那个map里面的)
  • 从cananlEventStore里面获取binlog,转化为event。一般是从最后的一个batchId位置开始,如果之前没有batchId,那么就从cursor记录的消费位点开始;如果cursor为空,那只能从eventStore的第一条消息开始。
  • event转化为entry,并生成新的batchId,组合成message返回给客户端

注意在eventStore获取event的时候,用户可以自己设置batchSize和超时时间timeout。为了尽量提高效率,一般一次获取一批binlog,而不是获取一条。这个批次的大小(batchSize)由客户端指定。同时客户端可以指定超时时间,在超时时间内,如果获取到了batchSize的binlog,会立即返回。 如果超时了还没有获取到batchSize指定的binlog个数,也会立即返回。特别的,如果没有设置超时时间,如果没有获取到binlog也立即返回。具体eventStore的获取逻辑,我们下次讲到这个模块再展开。

4.5 get方法

这个方法主要是用于客户端获取binlog消息,与getWithoutAck基本一致。

主要区别在于,客户端获取batch后,自动ack,这样相对来说肯定更快,但是无法保证可靠性。

在项目中看起来暂时没有使用,我们就不展开了。

4.6 ack方法

进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。

  • 从metaManager中移除batchId对应的记录
  • 记录已经成功消费到的binlog位置,以便下一次获取的时候可以从这个位置开始
  • 已经ack的数据,在eventStore中清除

4.7 rollback

rollback有两个方法,回滚所有和回滚指定batchId,不过从源码来看,目前回滚指定指定batchId也是回滚所有。

回滚的本质,就是把所有还没ack的batchId都清空,流式api被get但是还没ack的消息会被重新get。

5.canalMQStarter

在第一节的架构模式中我们分析过了,在启动过程中,如果serverMode选择tcp,会启动canalServerWithNetty,如果serverMode选择了mq,就会启动cannalMQStarter。

所以从模块组成来说,canalMQStarter跟canalServerWithNetty是比较相似的。

canalMQStarter也是委托embeddedCanal做处理,同时委托CanalMQProducer把消息投递到mq集群。

canalServerWithNetty也是委托embeddedCanal做处理,然后通过netty来跟canal-client做交互。

如果我们以后应用中要内嵌embeddedCanal,完全可以参照canalMQStarter和canalServerWithNetty的模式来写。

主要组成如下:

  • 工作线程池executorService,对每个instance起一个worker线程
  • canalMQWorks,记录了destination(instance的标识)和worker线程的关系
  • CanalServerWithEmbedded
  • CanalMQProducer投递mq消息

5.1 start方法

这个方法就是前面canalStarter类里面的start()方法中,对CanalMQStarter.start()的调用。

具体做了三件事情:

  • 获取CanalServerWithEmbedded的单例对象
  • 对应每个instance启动一个worker线程CanalMQRunnable
  • 注册ShutdownHook,退出时关闭线程池和mqProducer

这里主要看看CanalMQRunnable做了些什么。

5.2 CanalMQRunnable

这是一个内部类,就是看看worker里面做了什么

只有一个worker方法,主要逻辑非常清晰:

  • 给自己创建一个身份标识,作为client
  • 根据destination获取对应instance,如果没有就sleep,等待产生(比如从别的server那边HA过来一个instance)
  • 构建一个MQ的destination对象,加载相关mq的配置信息,用作mqProducer的入参
  • 在embeddedCanal中注册这个订阅客户端
  • 开始运行,并通过embededCanal进行流式get/ack/rollback协议,进行数据消费

6.总结

回到开头的几个问题,相信文中都已经做了解答。

  • CanalServer有几种使用方式?

可以独立部署(推荐),可以使用Server-Client模式 和 Server-MQ-Client模式两种。

可以内嵌部署开发(embedded,难度较高)。

  • 控制台Admin、客户端client是如何与CanalServer交互的?

控制台Admin通过CanalAdminWithNetty与服务端交互 客户端client通过CanalServerWithNetty与服务端交互。

  • CanalServerWithNetty和CanalServerWithEmbedded究竟有什么关系?

CanalServerWithEmbedded是真正核心逻辑(parser-sink-store)处理的地方 。CanalServerWithNetty持有CanalServerWithEmbedded对象,接收client的请求然后转发给CanalServerWithEmbedded对象处理。

  • Canal事件消费的特色协议,异步流式api(get/ack/rollback协议)的设计是如何实现的?

CanalServerWithEmbedded集成了CanalService接口,实现了具体的get/ack/rollback协议

GitHub 加速计划 / ca / canal
28.22 K
7.57 K
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:3 个月前 )
1e5b8a20 - 2 个月前
ff82fd65 2 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐