启动工程

拉取源码

配置并启动服务端

配置depolyer模块下example.instance.properties中的数据库,然后启动CanalLauncher,服务端启动完成,并监听了刚配置的数据库的binlog。

配置客户端

创建简单表

CREATE TABLE `dailylog` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `content` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=143 DEFAULT CHARSET=utf8;

client-adapter模块中launcher模块,配置application.yml中srcDataSources的数据源,以及es相关配置。

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es
        hosts: 127.0.0.1:9300
        properties:
          cluster.name: elasticsearch

找到aclient-adapter模块elasticsearch模块中resource文件夹下的es文件夹,里面有各种配置示例。
配置一个my.yml,具体如下:

dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: cetc_moniwa_es_sta_rmo_vim_camera
  _type: _doc
  _id: _id
  upsert: true
  sql: "select a.id _id, a.id, a.name, concat(IFNULL(latitude_84,0), ',', IFNULL(longitude_84,0)) as location, a.camera_code, a.type, a.state, a.group_code from sta_rmo_vim_camera a LEFT JOIN sta_rmo_vim_video_label_relation b on a.id = b.video_id LEFT JOIN sta_rmo_vim_label c on b.label_id = c.label_id"
  etlCondition:
  commitBatch: 3000
  #objFields:
  #  labels: array:;

idea运行client-adapter中的launcher中的CanalAdapterApplication,然后将刚刚配置的文件拷贝至canal\client-adapter\launcher\target\classes\es下,然后重启client,便可以开始愉快地调试了。

重要类

第一感觉如下:

  • ESAdapter 重要入口
  • ESConfigMonitor 监听配置文件变化
  • ESSyncService 同步ES的服务
  • LoggerAdapterExample 日志打印
  • ESTemplate 生成ES语句
  • Dml 存放数据,用于传递
  • BulkRequestBuilder 对应es bulk类型 是es client里面的org.elasticsearch.action.bulk

新增一条数据后,ESSyncService首先收到新增信息,然后
123行insert 函数中判断类型,实例如下:

if (schemaItem.getAliasTableItems().size() == 1 && schemaItem.isAllFieldsSimple()) {
   // ------单表 & 所有字段都为简单字段------
    singleTableSimpleFiledInsert(config, dml, data);
} else {
    // ------是主表 查询sql来插入------
    if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
        mainTableInsert(config, dml, data);
    }

最终执行insert操作的是70行insert函数

说明与ES交互,还是用client好一些,用http的rest接口,不好控制。

重点在于ESSyncService类,看如何解析。
ESEtlService相当于线程调度以及拼装工作。
ESSyncService中用到DataSource,应该根据不同逻辑查了数据库

ESSyncService

首先看使用了DataSource的函数
455行:private void mainTableInsert(ESSyncConfig config, Dml dml, Map<String, Object> data)
主表(单表)复杂字段insert的主要逻辑为:

  1. 使用Util.sqlRS()方法,根据数据源执行sql,获取resultset,再遍历数据
  2. 每一行数据通过ESTemplate.getESDataFromRS(),将一行数据转换为es的数据格式。
  3. 然后执行ESTemplate.insert()方法,插入数据。

配置更新

当es文件夹下的配置更新时,首先ESConfigMonitor会监测到变化,然后执行
SqlParser.pase()->…->SchemaItem.getRelationTableFields()
经常报错Relation condition column must in select columns 就是来自这里,他会检测,
用于关联的字段是否被最上层的select选择了,没有则报错。

存在的问题

目前canal不支持将 多对多表 从 mysql 同步至es。canal server本身只是提供解析binlog的功能。
主要sql解析和处理逻辑在client,因此需要阅读canal adapter client源码逻辑。

重要类

  • CanalAdapterLoader 用于加载外部适配器
  • CanalAdapterService 适配器启动业务类,启动和销毁外部适配器、CanalAdapterWorker。
  • CanalAdapterWorker 终于找到干活的类了。
  • CanalClientConfig 里面有同步的配置信息,例如超时时间、同步分批提交大小默认1000
  • Dml DML操作转换对象,包含多条数据
  • Message

CanalAdapterWorker

CanalAdapterWorker,继承了AbstractCanalAdapterWorker(适配器工作线程抽象类)
该类本身只提供了不同模式下,worker的构造方法、销毁方法、执行方法(一直循环、1000ms sleep)。
正真的逻辑在于AbstractCanalAdapterWorker。经过下面对AbstractCanalAdapterWorker研究,
真正的逻辑还是在这一层。尴尬= =
关键在于process()方法逻辑的理解。

process()

首先阻塞等待,1000ms获取一次状态,running=false开始执行以下逻辑:

  1. 获取一些配置,例如重试次数等。
  2. 执行调度逻辑,包括重试次数。

在每次尝试的循环内部:

  1. 通过connect 向mysql获取指定数量的数据。大致原理是利用binlog的position位移。存储在Message变量中。
  2. Message再获取size,这就是里面真实的数据条数。
  3. 执行writeOut(message);应该就是向外写数据,里面干活的是batchSync(dmls, adapter); 最终是调用OuterAdapter.sync(List dmls); 去干活,而这个抽象方法,在ESAdapter中被实现了。看来重点要研究ESAdapter。在研究ESAdapter之前,先调试,确认传入的格式里面具体数据是什么。以便后面调试ESAdapter更清楚外部交互,更清楚找那些方法,就不会零散了。

貌似逻辑完了。但好像少了点什么?
理清canal 中mysql同步es的逻辑需要以下三个步骤:
1.canal的触发机制(比较难,需要理解binlog的设计)
2.canal传递的数据格式,如何解析(中等,需要工作量)
3.如何与es同步(这个相对简单,用transport client)

好像不知道他何时触发?Message获取的是什么格式?这些需要动态调试,来分析确定。

AbstractCanalAdapterWorker

提供了同步方法、向外写数据方法、批处理提交方法(根据dml的size判断聚集大小,也就是说dml里面包含多条数据)。
重点是process()抽象方法,就是执行同步逻辑的入口。因此需要找到es相关process实现方法。结果只找到CanalAdapterWorker实现了该方法。

关于更新配置后保存,同步不执行问题

AbstractCanalAdapterWorker.batchSync()可以看到,第一次进来的时候adapter.sync,是LoggerAdapterExample,第二次进来才是ESAdapter,直接进入(162行)ESAdapter.sync(Dml dml),在这里面获取configMap的时候判断为空,因此没有执行同步逻辑。
所需需要分析dbTableEsSyncConfig变量里面为什么相关配置被移除了。

根据分析dbTableEsSyncConfig变量,是hashmap,里面的key是表名,复杂sql里面有多张表的话,这里面就有多个key,value就是对应的配置文件名,例如my.yml

所以整个新增同步逻辑是,数据库变化后,canal client获取到变化的表,然后获取到了id,然后client通过主键id,执行配置文件中的sql向数据库获取数据,最终通过esmapping和获取的数据,向es新增数据。

ESAdapter

ESAdapter.sync逐步添加配置信息,最终执行ESSyncService.sync方法。
现在可以确定一点,就是所有的判断是否同步的逻辑就在ESAdapter中,上层只做调度,具体逻辑都在最底层。这样上层就抽象出来了,具体干活一定是底层,否则就会出现乱统的现象。一线最了解对应的需求。
一句话,ESAdapter中的sync包含了一切逻辑,包括解析、决定是否要同步数据、同步逻辑。

ESAdapter.sync接收配置和dml数据及描述。然后根据dml获取操作类型,分为三种逻辑:insert、update、delete。
下午重点是理清楚三种操作的具体分析逻辑。

mainTableInsert()

主表(单表)复杂字段insert,同步insert操作进入之后,拼接where语句,Util.sqlRS()去数据库查询对应数据,然后再调用esTemplate.insert()去同步至es中。
70行ESTemplate.insert(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData)执行向es插入数据

Dml

示例:

Dml{destination=‘example’, database=‘test’, table=‘tb_mo_st_rmo_cam_label_rel’, type=‘UPDATE’, es=1563421830000, ts=1563421831430, sql=’’, data=[{id=12, camera_code=13, label_code=1, create_by=, create_time=2019-07-18 11:50:30.0, modify_by=, modify_time=2019-07-18 11:50:30.0, is_del=null}], old=[{id=11, create_time=2019-07-16 11:19:44.0, modify_time=2019-07-16 11:19:44.0}]}

GitHub 加速计划 / ca / canal
28.21 K
7.57 K
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:2 个月前 )
1e5b8a20 - 1 个月前
ff82fd65 1 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐