使用canal同步mysql数据
canal
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
项目地址:https://gitcode.com/gh_mirrors/ca/canal
免费下载资源
·
1.canal相当于mysql的一个连接的从库,canal监听指定库和指定表的数据变更,从而发出默认的广播模式mq,
2.然后业务端监听mq的交换机,去用队列去绑定交换机,来监听收到消息,来同步到es库上
一、服务器部分
1. admin:canal的客户端控制台 2.deployer 是canal的服务器 http://172.16.8.35:8089/#/canalServer/canalInstances
2.服务端配置文件
canal.properties
/home/devops/canal/deployer/conf/dst_vehicle/instance.properties
二、然后通过java代码mq监听去处理对应表数据的事务操作增删改
package com.lx.utils.essync.mq;
import com.lx.designPattern.strategypattern.workorderbase.service.sysField.JsonUtil;
import com.lx.utils.essync.canal.CanalEntity;
import com.lx.utils.essync.constant.CanalConstant;
import com.lx.utils.essync.constant.MqConstant;
import com.lx.utils.essync.handler.SyncHandler;
import com.lx.utils.essync.handler.SyncHandlerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
/**
* canal同步数据消费者
*/
@Component
@Slf4j
public class CanalMessageConsumer {
@Autowired
private SyncHandlerFactory handlerFactory;
private static final String LOG_PREFIX = "canal数据同步";
@RabbitListener(queues = MqConstant.CANAL_VEHICLE_QUEUE + "-${spring.profiles.active}")
public void processMessage(Message msg) {
String message = new String(msg.getBody(), StandardCharsets.UTF_8);
log.debug("收到{}消息:{}", LOG_PREFIX, message);
try {
final CanalEntity canalEntity = JsonUtil.json2Bean(message, CanalEntity.class);
if (Boolean.TRUE.equals(canalEntity.getIsDdl())) {
log.info("{}消息接受到DDL,忽略", LOG_PREFIX);
return;
}
SyncHandler handler = handlerFactory.getSyncHandler(canalEntity.getTable());
if (Objects.isNull(handler)) {
log.info("{}未匹配到业务处理类[{}]", LOG_PREFIX, message);
return;
}
switch (canalEntity.getType()) {
case CanalConstant.TYPE_INSERT:
handler.handlerInsert(canalEntity);
break;
case CanalConstant.TYPE_DELETE:
handler.handlerDelete(canalEntity);
break;
case CanalConstant.TYPE_UPDATE:
handler.handlerUpdate(canalEntity);
break;
}
// 广播车辆变更的信息
handler.fanoutChangeMessage(canalEntity);
} catch (Exception e) {
log.error("{}消息消费异常{}", LOG_PREFIX, e);
}
}
}
1.使用策略模式处理对应的表数据变更业务
package com.lx.utils.essync.handler;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
@Component
public class SyncHandlerFactory {
private final Map<String, SyncHandler> syncHandlerMap;
public SyncHandlerFactory(List<SyncHandler> syncHandlers) {
this.syncHandlerMap = syncHandlers.stream()
.filter(it -> StringUtils.isNotBlank(it.getTableName()))
.collect(Collectors.toMap(SyncHandler::getTableName, Function.identity()));
}
public SyncHandler getSyncHandler(String tableName) {
return syncHandlerMap.get(tableName);
}
}
GitHub 加速计划 / ca / canal
27
8
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:5 个月前 )
02fffc9e - 1 天前
5ca4c393 - 1 天前
更多推荐
已为社区贡献1条内容
所有评论(0)