mysql同步至es,canal源码调试【临时笔记】
启动工程
拉取源码
配置并启动服务端
配置depolyer模块下example.instance.properties中的数据库,然后启动CanalLauncher,服务端启动完成,并监听了刚配置的数据库的binlog。
配置客户端
创建简单表
CREATE TABLE `dailylog` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`content` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=143 DEFAULT CHARSET=utf8;
client-adapter模块中launcher模块,配置application.yml中srcDataSources的数据源,以及es相关配置。
srcDataSources:
defaultDS:
url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true
username: root
password: 123456
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: es
hosts: 127.0.0.1:9300
properties:
cluster.name: elasticsearch
找到aclient-adapter模块elasticsearch模块中resource文件夹下的es文件夹,里面有各种配置示例。
配置一个my.yml,具体如下:
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
_index: cetc_moniwa_es_sta_rmo_vim_camera
_type: _doc
_id: _id
upsert: true
sql: "select a.id _id, a.id, a.name, concat(IFNULL(latitude_84,0), ',', IFNULL(longitude_84,0)) as location, a.camera_code, a.type, a.state, a.group_code from sta_rmo_vim_camera a LEFT JOIN sta_rmo_vim_video_label_relation b on a.id = b.video_id LEFT JOIN sta_rmo_vim_label c on b.label_id = c.label_id"
etlCondition:
commitBatch: 3000
#objFields:
# labels: array:;
idea运行client-adapter中的launcher中的CanalAdapterApplication,然后将刚刚配置的文件拷贝至canal\client-adapter\launcher\target\classes\es
下,然后重启client,便可以开始愉快地调试了。
重要类
第一感觉如下:
- ESAdapter 重要入口
- ESConfigMonitor 监听配置文件变化
- ESSyncService 同步ES的服务
- LoggerAdapterExample 日志打印
- ESTemplate 生成ES语句
- Dml 存放数据,用于传递
- BulkRequestBuilder 对应es bulk类型 是es client里面的org.elasticsearch.action.bulk
新增一条数据后,ESSyncService首先收到新增信息,然后
123行insert 函数中判断类型,实例如下:
if (schemaItem.getAliasTableItems().size() == 1 && schemaItem.isAllFieldsSimple()) {
// ------单表 & 所有字段都为简单字段------
singleTableSimpleFiledInsert(config, dml, data);
} else {
// ------是主表 查询sql来插入------
if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
mainTableInsert(config, dml, data);
}
最终执行insert操作的是70行insert函数
说明与ES交互,还是用client好一些,用http的rest接口,不好控制。
重点在于ESSyncService类,看如何解析。
ESEtlService相当于线程调度以及拼装工作。
ESSyncService中用到DataSource,应该根据不同逻辑查了数据库
ESSyncService
首先看使用了DataSource的函数
455行:private void mainTableInsert(ESSyncConfig config, Dml dml, Map<String, Object> data)
主表(单表)复杂字段insert的主要逻辑为:
- 使用Util.sqlRS()方法,根据数据源执行sql,获取resultset,再遍历数据
- 每一行数据通过ESTemplate.getESDataFromRS(),将一行数据转换为es的数据格式。
- 然后执行ESTemplate.insert()方法,插入数据。
配置更新
当es文件夹下的配置更新时,首先ESConfigMonitor会监测到变化,然后执行
SqlParser.pase()->…->SchemaItem.getRelationTableFields()
经常报错Relation condition column must in select columns
就是来自这里,他会检测,
用于关联的字段是否被最上层的select选择了,没有则报错。
存在的问题
目前canal不支持将 多对多表 从 mysql 同步至es。canal server本身只是提供解析binlog的功能。
主要sql解析和处理逻辑在client,因此需要阅读canal adapter client源码逻辑。
重要类
- CanalAdapterLoader 用于加载外部适配器
- CanalAdapterService 适配器启动业务类,启动和销毁外部适配器、CanalAdapterWorker。
- CanalAdapterWorker 终于找到干活的类了。
- CanalClientConfig 里面有同步的配置信息,例如超时时间、同步分批提交大小默认1000
- Dml DML操作转换对象,包含多条数据
- Message
CanalAdapterWorker
CanalAdapterWorker,继承了AbstractCanalAdapterWorker(适配器工作线程抽象类)
该类本身只提供了不同模式下,worker的构造方法、销毁方法、执行方法(一直循环、1000ms sleep)。
正真的逻辑在于AbstractCanalAdapterWorker。经过下面对AbstractCanalAdapterWorker研究,
真正的逻辑还是在这一层。尴尬= =
关键在于process()方法逻辑的理解。
process()
首先阻塞等待,1000ms获取一次状态,running=false开始执行以下逻辑:
- 获取一些配置,例如重试次数等。
- 执行调度逻辑,包括重试次数。
在每次尝试的循环内部:
- 通过connect 向mysql获取指定数量的数据。大致原理是利用binlog的position位移。存储在Message变量中。
- Message再获取size,这就是里面真实的数据条数。
- 执行writeOut(message);应该就是向外写数据,里面干活的是batchSync(dmls, adapter); 最终是调用OuterAdapter.sync(List dmls); 去干活,而这个抽象方法,在ESAdapter中被实现了。看来重点要研究ESAdapter。在研究ESAdapter之前,先调试,确认传入的格式里面具体数据是什么。以便后面调试ESAdapter更清楚外部交互,更清楚找那些方法,就不会零散了。
貌似逻辑完了。但好像少了点什么?
理清canal 中mysql同步es的逻辑需要以下三个步骤:
1.canal的触发机制(比较难,需要理解binlog的设计)
2.canal传递的数据格式,如何解析(中等,需要工作量)
3.如何与es同步(这个相对简单,用transport client)
好像不知道他何时触发?Message获取的是什么格式?这些需要动态调试,来分析确定。
AbstractCanalAdapterWorker
提供了同步方法、向外写数据方法、批处理提交方法(根据dml的size判断聚集大小,也就是说dml里面包含多条数据)。
重点是process()抽象方法,就是执行同步逻辑的入口。因此需要找到es相关process实现方法。结果只找到CanalAdapterWorker实现了该方法。
关于更新配置后保存,同步不执行问题
AbstractCanalAdapterWorker.batchSync()可以看到,第一次进来的时候adapter.sync,是LoggerAdapterExample,第二次进来才是ESAdapter,直接进入(162行)ESAdapter.sync(Dml dml),在这里面获取configMap的时候判断为空,因此没有执行同步逻辑。
所需需要分析dbTableEsSyncConfig变量里面为什么相关配置被移除了。
根据分析dbTableEsSyncConfig变量,是hashmap,里面的key是表名,复杂sql里面有多张表的话,这里面就有多个key,value就是对应的配置文件名,例如my.yml
所以整个新增同步逻辑是,数据库变化后,canal client获取到变化的表,然后获取到了id,然后client通过主键id,执行配置文件中的sql向数据库获取数据,最终通过esmapping和获取的数据,向es新增数据。
ESAdapter
ESAdapter.sync逐步添加配置信息,最终执行ESSyncService.sync方法。
现在可以确定一点,就是所有的判断是否同步的逻辑就在ESAdapter中,上层只做调度,具体逻辑都在最底层。这样上层就抽象出来了,具体干活一定是底层,否则就会出现乱统的现象。一线最了解对应的需求。
一句话,ESAdapter中的sync包含了一切逻辑,包括解析、决定是否要同步数据、同步逻辑。
ESAdapter.sync接收配置和dml数据及描述。然后根据dml获取操作类型,分为三种逻辑:insert、update、delete。
下午重点是理清楚三种操作的具体分析逻辑。
mainTableInsert()
主表(单表)复杂字段insert,同步insert操作进入之后,拼接where语句,Util.sqlRS()去数据库查询对应数据,然后再调用esTemplate.insert()去同步至es中。
70行ESTemplate.insert(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData)执行向es插入数据
Dml
示例:
Dml{destination=‘example’, database=‘test’, table=‘tb_mo_st_rmo_cam_label_rel’, type=‘UPDATE’, es=1563421830000, ts=1563421831430, sql=’’, data=[{id=12, camera_code=13, label_code=1, create_by=, create_time=2019-07-18 11:50:30.0, modify_by=, modify_time=2019-07-18 11:50:30.0, is_del=null}], old=[{id=11, create_time=2019-07-16 11:19:44.0, modify_time=2019-07-16 11:19:44.0}]}
更多推荐
所有评论(0)