基于canal client 自实现canal同步es(增量,全量)
canal是阿里开源的一款纯java语言的mysql增量订阅工具,主要是伪装成slave节点,向mysql发送 dump协议官方原理图如下:
官方的解释:
server代表一个canal运行实例,对应于一个jvm
instance对应于一个数据队列 (1个server对应1..n个instance)
个人是这么理解的instance当做一个mq中的topic队列(kafka中的broker),canal client相当于mq中的消费者.instance接收到数据通过parse解析之后经过sink进行数据加工,最后存储到sore中,由canal client来进行消费.
由于业务数据量较大,单表超亿,根据实际业务对数据库进行分库分表,es中存储主键和账号对应关系,访问数据库时先送es中获取主键再查db,这样可以直接打到对应分片上秒回
起初打算采用官方提供的工具实现,后续发现官方的不太符合我们的需求(或许是自己没搞明白官方的工具),并切我们是分库分表的,按照官方的配置太过繁琐.一个表到底层就分了几百张表,所以决定就基于canal client自己实现了同步es,全量和增量.不过踩了不少得吭.
采用es的bulk api批量更新 .设置自定义模板,后续翻阅官方的源码,其实大致也是这么实现的,只不过官方的具有通用性而已.
启动方法
public static void main(String[] args) {
// 注册钩子 用于优雅关机
new KillHandler().registerSignal("TERM");
initCanal();
initES();
start();
}
/**
* 启动
*/
private static void start(){
//创建 ES模板
esTemplate=new EsTemplate(transportClient,MAX_BATCH_SIZE);
//执行业务逻辑
esService=new ESService(esTemplate,connector);
esService.execute();
}
private static class KillHandler implements SignalHandler {
public void registerSignal(String signalName) {
Signal signal = new Signal(signalName);
Signal.handle(signal, this);
}
@Override
public void handle(Signal signal) {
Log.info(".......接收到的信号........."+signal.getName());
if(signal.getName().equals("TERM")){
Main.stop=true;
}
}
}
/**
* 销毁 ES和canal 连接
*/
public static void destory(){
Log.info(".......开始释放资源.........");
try {
//将为提交的任务全部提交
esTemplate.commit();
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (esConfigMonitor != null) {
esConfigMonitor.destroy();
}
if(null!=transportClient){
transportClient.close();
Log.info(".......ES连接关闭.........");
}
if(null !=connector){
connector.disconnect();
Log.info(".......canal连接关闭.........");
}
}
连接canal server代码
private static void initCanal(){
Log.info("........开始初始化 canal.........");
Map<String, String> canalConfig = ESUtil.getCanalProperties();
try {
connector = CanalConnectors.newClusterConnector(canalConfig.get("zookeeper.ip"),
canalConfig.get("canal.instence"),
"",
"");
connector.connect();
//订阅所有库中的所有表 canal-server端配置了黑名单,过滤的不需要的表
connector.subscribe(canalConfig.get("canal.subscribe"));
connector.rollback();
} catch (NumberFormatException e) {
e.printStackTrace();
Log.error(".......canal 初始化失败 ..........."+e.getMessage());
System.exit(0);
}
Log.info("........初始化 canal 完毕.........");
}
public void execute(){
int batchSize = 1000;
while (true) {
if(!Main.stop) {
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
try {
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
TimeUnit.SECONDS.sleep(5);
} else {
//根据sql语句解析
// sqlParser.sqlParserBySql(message.getEntries());
//根据数据信息解析
sqlParser.sqlParserByBinlog(message.getEntries());
}
connector.ack(batchId); // 提交确认
} catch (Exception e) {
connector.rollback(batchId);
Log.error(e.getMessage() + "........业务执行出现异常......" + message.toString());
}
}else {
Main.destory();
System.exit(0);
}
}
}
解析binlog的代码也很简单,不过自己实现的不具备通用性,勿喷
public void sqlParserByBinlog(List<CanalEntry.Entry> entrys) throws ExecutionException, InterruptedException {
List<Map<String,String>> list=new ArrayList<>();
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChage;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
//判断类型
CanalEntry.EventType eventType = rowChage.getEventType();
String tableName = entry.getHeader().getTableName();
if (tableName.contains("history") || tableName.contains("log")
|| tableName.contains("auth") || tableName.contains("historical")
|| tableName.contains("Report") || tableName.contains("report")
|| tableName.contains("info") || tableName.contains("21cn")
|| eventType == CanalEntry.EventType.QUERY) {
continue;
}
// Log.info(tableName+"操作:"+eventType+","+rowChage.getSql());
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
//获取更新之后的列数据
List<CanalEntry.Column> afterList = rowData.getAfterColumnsList();
if (eventType == CanalEntry.EventType.INSERT) {
insert(entry.getHeader().getSchemaName(), tableName, list, afterList);
} else if (eventType == CanalEntry.EventType.UPDATE) {
update(entry.getHeader().getSchemaName(), tableName, list, afterList);
} else {
delete(entry.getHeader().getSchemaName(), tableName, list, afterList);
}
}
}
if(list.size()>0) {
//批量执行
esTemplate.bulkIndex(list);
}
}
将数据进行组装批量提交
public void bulkIndex(List<Map<String, String>> list) throws ExecutionException, InterruptedException {
for (int i=0;i<list.size();i++){
Map<String, String> map = list.get(i);
String tableName = map.get(ESCONSTANT.TABLENAME);
map.remove(ESCONSTANT.TABLENAME);
if(map.containsKey(ESCONSTANT.DELETE)){
map.remove(ESCONSTANT.DELETE);
//执行删除操作 即注销 或者解绑
getBulk().add(transportClient.prepareDelete(
tableName,
ESCONSTANT.INDEXTYPE,
map.containsKey(ESCONSTANT.ID)?map.get(ESCONSTANT.ID):map.get(ESCONSTANT.UNIFIEDID)));
}else if(map.containsKey(ESCONSTANT.UPDATE)){
map.remove(ESCONSTANT.UPDATE);
//执行更新操作 即改号
getBulk().add(transportClient.prepareUpdate(
tableName,
ESCONSTANT.INDEXTYPE,
map.containsKey(ESCONSTANT.ID)?map.get(ESCONSTANT.ID):map.get(ESCONSTANT.UNIFIEDID))
.setDoc(map,XContentType.JSON)
);
}
else {//默认新增操作
//执行新增操作 即新增
getBulk().add(transportClient.prepareIndex(tableName, ESCONSTANT.INDEXTYPE)
.setSource(
map,XContentType.JSON
).setId(map.containsKey(ESCONSTANT.ID)?map.get(ESCONSTANT.ID):map.get(ESCONSTANT.UNIFIEDID)));
}
logger.debug("...."+tableName+"........");
}
commitBulk();
}
private void commitBulk(){
if(getBulk().numberOfActions()>=MAX_BATCH_SIZE){
commit();
}
}
public void commit(){
if(getBulk().numberOfActions()>0){
BulkResponse response = getBulk().execute().actionGet();
if (response.hasFailures()) {
for (BulkItemResponse itemResponse : response.getItems()) {
if (!itemResponse.isFailed()) {
continue;
}
if (itemResponse.getFailure().getStatus() == RestStatus.NOT_FOUND) {
logger.error(itemResponse.getFailureMessage());
} else {
throw new RuntimeException("ES sync commit error" + itemResponse.getFailureMessage());
}
}
}
resetBulkRequestBuilder();
}
}
就这个不困扰了.es和msyql总是差那么10-100条不等,后续找到的原因,由于设置批量是1000,而数据库更新操作tps较低,所以造成两者之间数据延时.将批量数逐个降低测试最后调整到50批次,es和mysql数据时延降低到毫级了.
全量同步的较为简单.个人是这样实现的.首先将mysql中20亿数据量导出到文本文件,我们只需要主键和账号两个子弹即可,并且是分库分表的,导出非常快.1亿的数据量30秒以内全部导出.然后通过缓冲流一行一行读取文件,调用bulk api批量入es,es 5个数据节5分片一副本,单节点40核物理机 内存512G jvm设置31G raid0 sas盘,1亿数据量基本5分钟以内就可导入es中,目前线上运行2月没发现异常.
虽然现在开源项目众多,很多东西都兼容大部分业务,条件允许的情况下个人还是建议自己根据实际业务再造轮子.希望可以帮到新接触canal的朋友们
更多推荐
所有评论(0)