基于canal根据配置实现数据库数据的同步
canal
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
项目地址:https://gitcode.com/gh_mirrors/ca/canal
免费下载资源
·
可以通过配置对源数据库的指定表同步指定字段,无需修改代码
具体配置如下,可通过字段映射关系,自动生成对应sql执行,
canal:
server: 172.16.4.62:11111
# canal instance名字
destination: example
server-mode: tcp
# 消费的时间间隔(s)
timeout: 2
# canal 的用户名
username:
# canal 的密码
password:
sync:
# canal监听的数据库名
- database: test
tableInfo:
# canal监听的表名
- tableName: t_user
# 本服务数据库同步的表名
targetTableName: t_em_user
# 字段映射,key:value多个逗号隔开 key为监控数据库表的字段,value为从库对应表的字段 *代表同步所有字段并且两个库表字段相同
fieldMapping: "*"
# id映射,key:value多个逗号隔开 key为监控数据库表的字段,value为从库对应表的字段,一样可以只写一个
id: USER_CODE
# - tableName: t_org
# targetTableName: t_em_org
# fieldMapping: "*"
# id: ORG_ID
- tableName: t_org
targetTableName: my_org
fieldMapping: ORG_ID:id,ORG_NAME:name
id: ORG_ID:id
部署方式
1.下载canal 执行脚本解压 链接:https://pan.baidu.com/s/1yOPrihHwG9MfkiuiBkOD2Q?pwd=e171
mkdir /opt/canal
cd /opt/canal
tar zxvf canal.deployer-1.1.6.tar.gz
2.修改配置文件
vi conf/example/instance.properties
- 基本连接配置修改如下
canal.instance.master.address=192.168.4.31:3306 #数据库地址
canal.instance.dbUsername=root #数据库账号
canal.instance.dbPassword=1234 #数据库密码
canal.instance.defaultDatabaseName = test #数据库
canal.instance.connectionCharset = UTF-8 #数据库编码
监听指定的数据库和表
#canal.instance.filter.regex=.*\\..* (默认为所有的库所有的表)
# 只监听icpdb_dev数据库的这两个表
canal.instance.filter.regex=test.t_user,test.t_org
mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:
1. 所有表:.* or .*\\..*
2. canal schema下所有表: canal\\..*
3. canal下的以canal打头的表:canal\\.canal.*
4. canal schema下的一张表:canal\\.test1
5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
检查mysql是否开启了bin_log日志
SHOW VARIABLES LIKE '%log_bin%'
没有开启则修改mysql配置
vi /etc/my.cnf
添加如下
log-bin=mysql-bin
binlog-format=ROW
重启mysql
service mysqld restart;
进入canal的bin文件夹,启动,默认端口11111
./bin/startup.sh
java部署canal客户端
核心pom
<dependency>
<groupId>io.github.xizixuejie</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>0.0.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Mysql Connector -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
添加BaseCanalListener统一消费binLog日志
import com.google.common.collect.Maps;
import com.netinfo.cannel.mapper.BaseMapper;
import io.xzxj.canal.core.listener.EntryListener;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.lang.ObjectUtils;
import org.apache.poi.ss.formula.functions.T;
import org.springframework.beans.factory.annotation.Autowired;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* @author yangguang
* @date 2023年04月24日 14:57
*/
@Data
@Slf4j
public class BaseCanalListener implements EntryListener<HashMap> {
private BaseMapper baseMapper;
/**
* 数据库名字
*/
private String schemaName;
/**
* 表名字
*/
private String tableName;
private String targetTableName;
private String fieldMapping;
private String id;
private final String insertTemplate = "insert into {0} ({1}) values ({2})";
private final String updateTemplate = "update {0} set {1} where {2}";
private final String deleteTemplate = "delete from {0} where {1}";
public void insert(HashMap t) {
log.info("插入获取的数据\n"+t);
String sql = getInsertSql(t);
baseMapper.insert(sql);
}
public void update(HashMap before, HashMap after) {
log.info("更新之前的数据\n"+before);
log.info("更新之后的数据\n"+after);
String sql = getUpdateSql(before,after);
if(sql!=null)
baseMapper.update(sql);
}
public void delete(HashMap t) {
log.info("删除的数据\n"+t);
String sql = getDeleteSql(t);
baseMapper.delete(sql);
}
private String getDeleteSql(HashMap t) {
//"delete from {0} where {2}";
String[] params = new String[2];
params[0] = targetTableName;
params[1] = getWhereSql(t);
return MessageFormat.format(deleteTemplate,params);
}
private String getUpdateSql(HashMap<String,String> before, HashMap after) {
if(fieldMapping==null){
fieldMapping = "*";
}
Map<String,String> updateFieldMap = Maps.newHashMap();
Set<String> updateOriginField = before.keySet();
if(!"*".equals(fieldMapping)){
for (String kv : fieldMapping.split(",")) {
String[] kvArr = kv.split(":");
if(updateOriginField.contains(kvArr[0])){
updateFieldMap.put(kvArr[1],toString(after.get(kvArr[0])));
}
}
}else{
for (Map.Entry<String, String> entry : before.entrySet()) {
updateFieldMap.put(entry.getKey(),toString(after.get(entry.getKey())));
}
}
if(updateFieldMap.isEmpty()){
return null;
}
//updateTemplate = "update {0} set {1} where {2}";
String[] params = new String[3];
params[0] = targetTableName;
StringBuilder sb = new StringBuilder();
updateFieldMap.entrySet().forEach(entry->{
sb.append(entry.getKey());
sb.append("=");
sb.append(entry.getValue());
sb.append(",");
});
params[1] = sb.toString().substring(0,sb.length()-1);
params[2] =getWhereSql(after);
return MessageFormat.format(updateTemplate,params);
}
private String getWhereSql(HashMap<String,String> data) {
StringBuilder sb = new StringBuilder();
String originId,currentId;
for (String idMapper : id.split(",")) {
if(idMapper.contains(":")){
String[] split = idMapper.split(":");
originId=split[0];
currentId=split[1];
}else{
originId=idMapper;
currentId=idMapper;
}
sb.append(currentId+"="+data.get(originId));
sb.append(" and ");
}
return sb.toString().substring(0,sb.length()-5);
}
private String getInsertSql(HashMap t) {
//首先查询插入的字段
if(fieldMapping==null){
fieldMapping = "*";
}
List<String> originFieldList = Lists.newArrayList();
List<String> newFieldList = Lists.newArrayList();
List<String> newFieldValueList = Lists.newArrayList();
if(!fieldMapping.equals("*")){
//指定字段
for (String kv : fieldMapping.split(",")) {
String[] kvArr = kv.split(":");
originFieldList.add(kvArr[0]);
newFieldList.add(kvArr[1]);
newFieldValueList.add(toString(t.get(kvArr[0])));
}
}else{
for (Object o : t.keySet()) {
originFieldList.add((String)o);
newFieldList.add((String)o);
newFieldValueList.add(toString(t.get(o)));
}
}
String[] params = new String[3];
params[0] = targetTableName;
params[1] = String.join(",",newFieldList);
params[2] = String.join(",",newFieldValueList);
return MessageFormat.format(insertTemplate,params);
}
private String toString(Object obj){
if(obj == null || obj.toString().length()==0){
return null;
}
return "\""+ObjectUtils.toString(obj)+"\"";
}
}
添加Mapper操作数据库
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Update;
import org.springframework.stereotype.Repository;
/**
* @author yangguang
* @date 2023年04月25日 9:48
*/
@Repository
public interface BaseMapper {
@Insert("${sql}")
void insert(String sql);
@Update("${sql}")
void update(String sql);
@Delete("${sql}")
void delete(String sql);
}
添加CanalListenerBeanPostProcessor 读取配置文件,生成BaseCanalListener类
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.*;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.beans.factory.support.GenericBeanDefinition;
import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.List;
/**
* @author yangguang
* @date 2023年04月24日 15:11
*/
@Component
@Slf4j
public class CanalListenerBeanPostProcessor implements BeanPostProcessor, BeanDefinitionRegistryPostProcessor, EnvironmentAware {
private Environment environment;
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
int dbNum=0;
while (true){
String database = getConfigByKey("canal.sync["+dbNum+"].database");
if(StringUtils.isEmpty(database)){
break;
}
int tableNum=0;
while (true){
String tableName = getConfigByKey("canal.sync["+dbNum+"].tableInfo["+tableNum+"].tableName");
String targetTableName = getConfigByKey("canal.sync["+dbNum+"].tableInfo["+tableNum+"].targetTableName");
String fieldMapping = getConfigByKey("canal.sync["+dbNum+"].tableInfo["+tableNum+"].fieldMapping");
String id = getConfigByKey("canal.sync["+dbNum+"].tableInfo["+tableNum+"].id");
if(StringUtils.isEmpty(tableName)){
break;
}
String beanName = database+tableName;
//一个数据库一个表一个监听类
GenericBeanDefinition beandefinition=new GenericBeanDefinition();
beandefinition.setBeanClassName("com.netinfo.cannel.config.BaseCanalListener");
beandefinition.getPropertyValues().add("schemaName",database);
beandefinition.getPropertyValues().add("tableName",tableName);
beandefinition.getPropertyValues().add("targetTableName",targetTableName);
beandefinition.getPropertyValues().add("fieldMapping",fieldMapping);
beandefinition.getPropertyValues().add("id",id);
beandefinition.getPropertyValues().add("baseMapper",new RuntimeBeanReference("baseMapper"));
registry.registerBeanDefinition(beanName,beandefinition);
tableNum++;
}
dbNum++;
}
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
}
@Override
public void setEnvironment(Environment environment) {
this.environment = environment;
}
private String getConfigByKey(String key) {
try{
return environment.getProperty(key);
}catch (Exception e){
return null;
}
}
}
到此客户端搭建完成,但是canal-spring-boot-starter 西子小姐姐封装的不支持Map实体,并且没有重连机制,所以需要修改源码进行覆盖一下
io.xzxj.canal.spring.autoconfigure.TcpClientAutoConfiguration 类
package io.xzxj.canal.spring.autoconfigure;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import io.xzxj.canal.core.client.TcpCanalClient;
import io.xzxj.canal.core.factory.EntryColumnConvertFactory;
import io.xzxj.canal.core.handler.IMessageHandler;
import io.xzxj.canal.core.handler.RowDataHandler;
import io.xzxj.canal.core.handler.impl.AsyncMessageHandlerImpl;
import io.xzxj.canal.core.handler.impl.RowDataHandlerImpl;
import io.xzxj.canal.core.handler.impl.SyncMessageHandlerImpl;
import io.xzxj.canal.core.listener.EntryListener;
import io.xzxj.canal.spring.properties.CanalProperties;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import java.util.List;
import java.util.concurrent.ExecutorService;
/**
* @author xzxj
* @date 2023/3/11 11:37
*/
@Configuration
@EnableConfigurationProperties(CanalProperties.class)
@ConditionalOnProperty(value = "canal.server-mode", havingValue = "tcp")
@Import(value = {TcpCanalClient.class,ThreadPoolAutoConfiguration.class})
public class TcpClientAutoConfiguration {
private final CanalProperties canalProperties;
public TcpClientAutoConfiguration(CanalProperties canalProperties) {
this.canalProperties = canalProperties;
}
@Bean
public RowDataHandler<CanalEntry.RowData> rowDataHandler() {
return new RowDataHandlerImpl(new EntryColumnConvertFactory());
}
@Bean
@ConditionalOnProperty(value = "canal.async", havingValue = "true", matchIfMissing = true)
public IMessageHandler<Message> asyncMessageHandler(List<EntryListener<?>> entryListenerList,
RowDataHandler<CanalEntry.RowData> rowDataHandler,
ExecutorService executorService) {
return new AsyncMessageHandlerImpl(entryListenerList, rowDataHandler, executorService);
}
@Bean
@ConditionalOnProperty(value = "canal.async", havingValue = "false")
public IMessageHandler<Message> syncMessageHandler(List<EntryListener<?>> entryListenerList,
RowDataHandler<CanalEntry.RowData> rowDataHandler) {
return new SyncMessageHandlerImpl(entryListenerList, rowDataHandler);
}
}
io.xzxj.canal.core.client.AbstractCanalClient
package io.xzxj.canal.core.client;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import io.xzxj.canal.core.handler.IMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
/**
* @author xzxj
* @date 2023/3/11 10:33
*/
public abstract class AbstractCanalClient implements ICanalClient {
private static final Logger log = LoggerFactory.getLogger(AbstractCanalClient.class);
protected volatile boolean runStatus;
private Thread thread;
protected CanalConnector connector;
protected IMessageHandler messageHandler;
protected String filter = "";
protected Integer batchSize = 1;
protected Long timeout = 1L;
protected TimeUnit unit = TimeUnit.SECONDS;
@Override
public void init() {
startConnect();
}
private void startConnect(){
log.info("canal client init");
this.connectCanal();
thread = new Thread(this::handleListening);
thread.setName("canal-client-thread");
runStatus = true;
thread.start();
}
private void connectCanal() {
try {
log.info("canal client connecting");
connector.connect();
this.subscribe();
log.info("canal client connect success");
} catch (CanalClientException e) {
log.error("canal client connect error: {}", e.getMessage(), e);
this.destroy();
}
}
public void subscribe() {
connector.subscribe(filter);
}
@Override
public void destroy() {
try {
log.info("canal client destroy");
connector.unsubscribe();
}catch (Exception e){
log.error("canal client connect exception");
}
if (thread != null) {
thread.interrupt();
}
runStatus = false;
}
private void sleep(int num){
try {
Thread.sleep(num);
}catch (Exception e){
}
}
}
io.xzxj.canal.core.client.TcpCanalClient
package io.xzxj.canal.core.client;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import io.xzxj.canal.core.handler.IMessageHandler;
import io.xzxj.canal.spring.properties.CanalProperties;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
/**
* @author xzxj
* @date 2023/3/11 10:51
*/
public class TcpCanalClient extends AbstractCanalClient {
private static final Logger log = LoggerFactory.getLogger(TcpCanalClient.class);
@Autowired
private CanalProperties canalProperties;
@Autowired
private IMessageHandler messageHandler;
private static TcpCanalClient runClient ;
@PostConstruct
public void initClient(){
String server = canalProperties.getServer();
String[] array = server.split(":");
TcpCanalClient.builder()
.hostname(array[0])
.port(Integer.parseInt(array[1]))
.destination(canalProperties.getDestination())
.username(canalProperties.getUsername())
.password(canalProperties.getPassword())
.messageHandler(messageHandler)
.batchSize(canalProperties.getBatchSize())
.filter(canalProperties.getFilter())
.timeout(canalProperties.getTimeout())
.unit(canalProperties.getUnit())
.build().init();
runClient = this;
}
@Override
public void handleListening() {
try {
while (runStatus) {
Message message = connector.getWithoutAck(batchSize, timeout, unit);
log.debug("receive message={}", message);
long batchId = message.getId();
if (message.getId() != -1 && message.getEntries().size() != 0) {
messageHandler.handleMessage(message);
}
connector.ack(batchId);
}
} catch (Exception e) {
log.error("canal client exception", e);
runClient.reConnect();
}
}
private void reConnect() {
log.error("canal服务端断开连接,10s后准备重连");
try {
Thread.sleep(10000);
}catch (Exception e){
}
initClient();
}
public static Builder builder() {
return new Builder();
}
public static class Builder {
private String filter = StringUtils.EMPTY;
private Integer batchSize = 1;
private Long timeout = 1L;
private TimeUnit unit = TimeUnit.SECONDS;
private String hostname;
private Integer port;
private String destination;
private String username;
private String password;
private IMessageHandler<?> messageHandler;
private Builder() {
}
public Builder hostname(String hostname) {
this.hostname = hostname;
return this;
}
public Builder port(Integer port) {
this.port = port;
return this;
}
public Builder destination(String destination) {
this.destination = destination;
return this;
}
public Builder username(String username) {
this.username = username;
return this;
}
public Builder password(String password) {
this.password = password;
return this;
}
public Builder filter(String filter) {
this.filter = filter;
return this;
}
public Builder batchSize(Integer batchSize) {
this.batchSize = batchSize;
return this;
}
public Builder timeout(Long timeout) {
this.timeout = timeout;
return this;
}
public Builder unit(TimeUnit unit) {
this.unit = unit;
return this;
}
public Builder messageHandler(IMessageHandler<?> messageHandler) {
this.messageHandler = messageHandler;
return this;
}
public TcpCanalClient build() {
CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(hostname, port), destination, username, password);
TcpCanalClient tcpCanalClient = new TcpCanalClient();
tcpCanalClient.connector = canalConnector;
tcpCanalClient.messageHandler = messageHandler;
tcpCanalClient.filter = this.filter;
tcpCanalClient.unit = this.unit;
tcpCanalClient.batchSize = this.batchSize;
tcpCanalClient.timeout = this.timeout;
return tcpCanalClient;
}
}
}
io.xzxj.canal.core.factory.EntryColumnConvertFactory
package io.xzxj.canal.core.factory;
import com.alibaba.otter.canal.protocol.CanalEntry;
import io.xzxj.canal.core.listener.EntryListener;
import io.xzxj.canal.core.util.TableFieldUtil;
import io.xzxj.canal.core.util.TableInfoUtil;
import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* @author xzxj
* @date 2023/3/12 12:10
*/
public class EntryColumnConvertFactory extends AbstractConvertFactory<List<CanalEntry.Column>> {
@Override
<R> R newInstance(Class<R> clazz, List<CanalEntry.Column> columnList) throws InstantiationException, IllegalAccessException, NoSuchFieldException {
R object = clazz.newInstance();
if(object instanceof HashMap){
for (CanalEntry.Column column : columnList) {
((Map)object).put(column.getName().toUpperCase(),column.getValue());
}
return object;
}
Map<String, String> fieldMap = TableFieldUtil.getFieldMap(object.getClass());
for (CanalEntry.Column column : columnList) {
String fieldName = fieldMap.get(column.getName());
if (StringUtils.isNotEmpty(fieldName)) {
TableFieldUtil.setFieldValue(object, fieldName, column.getValue());
}
}
return object;
}
@Override
public <R> R newInstance(EntryListener<?> entryHandler, List<CanalEntry.Column> columnList, Set<String> updateColumn) throws InstantiationException, IllegalAccessException, NoSuchFieldException {
Class<R> tableClass = TableInfoUtil.getTableClass(entryHandler);
if (tableClass == null) {
return null;
}
R r = tableClass.newInstance();
if(r instanceof HashMap){
for (CanalEntry.Column column : columnList) {
if (!updateColumn.contains(column.getName())) {
continue;
}
((Map)r).put(column.getName().toUpperCase(),column.getValue());
}
return r;
}
Map<String, String> columnNames = TableFieldUtil.getFieldMap(r.getClass());
for (CanalEntry.Column column : columnList) {
if (!updateColumn.contains(column.getName())) {
continue;
}
String fieldName = columnNames.get(column.getName());
if (StringUtils.isNotEmpty(fieldName)) {
TableFieldUtil.setFieldValue(r, fieldName, column.getValue());
}
}
return r;
}
}
io.xzxj.canal.core.util.TableInfoUtil
package io.xzxj.canal.core.util;
import com.baomidou.mybatisplus.annotation.TableName;
import com.google.common.base.CaseFormat;
import com.netinfo.cannel.config.BaseCanalListener;
import io.xzxj.canal.core.annotation.CanalListener;
import io.xzxj.canal.core.listener.EntryListener;
import org.apache.commons.lang3.StringUtils;
import javax.annotation.Nullable;
import javax.persistence.Table;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author xzxj
* @date 2023/3/11 16:19
*/
public class TableInfoUtil {
private static Map<Class<? extends EntryListener>, Class>
CLASS_LISTENER_CACHE_MAP = new ConcurrentHashMap<>();
@Nullable
public static String getTableName(EntryListener<?> entryListener) {
if(entryListener instanceof BaseCanalListener){
BaseCanalListener listener = (BaseCanalListener)entryListener;
return listener.getSchemaName()+"."+listener.getTableName();
}
CanalListener annotation = entryListener.getClass().getAnnotation(CanalListener.class);
if (annotation == null) {
return null;
}
StringBuilder fullName = new StringBuilder();
if (StringUtils.isNotBlank(annotation.schemaName())) {
fullName.append(annotation.schemaName()).append(".");
}
if (StringUtils.isNotBlank(annotation.tableName())) {
fullName.append(annotation.tableName());
}else {
String tableName = findTableName(entryListener);
fullName.append(tableName);
}
return fullName.toString();
}
@Nullable
private static String findTableName(EntryListener<?> entryListener) {
Class<Object> tableClass = getTableClass(entryListener);
if (tableClass == null) {
return null;
}
TableName tableName = tableClass.getAnnotation(TableName.class);
if (tableName != null && StringUtils.isNotBlank(tableName.value())) {
return tableName.value();
}
Table table = tableClass.getAnnotation(Table.class);
if (table != null && StringUtils.isNotBlank(table.name())) {
return table.name();
}
return CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, tableClass.getName());
}
/**
* 找到EntryListener泛型中的数据库实体类
*
* @param object
* @param <T>
* @return
*/
@Nullable
public static <T> Class<T> getTableClass(EntryListener<?> object) {
Class<? extends EntryListener> listenerClass = object.getClass();
Class<T> tableClass = CLASS_LISTENER_CACHE_MAP.get(listenerClass);
if (tableClass != null) {
return tableClass;
}
Type[] interfacesTypes = listenerClass.getGenericInterfaces();
for (Type type : interfacesTypes) {
Class<?> c = (Class<?>) ((ParameterizedType) type).getRawType();
if (c.equals(EntryListener.class)) {
tableClass = (Class<T>) ((ParameterizedType) type).getActualTypeArguments()[0];
CLASS_LISTENER_CACHE_MAP.putIfAbsent(listenerClass, tableClass);
return tableClass;
}
}
return null;
}
}
覆盖完后,就可以根据项目配置,自动进行表的同步了
GitHub 加速计划 / ca / canal
28.21 K
7.57 K
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:2 个月前 )
1e5b8a20 - 2 个月前
ff82fd65
2 个月前
更多推荐
已为社区贡献5条内容
所有评论(0)