可以通过配置对源数据库的指定表同步指定字段,无需修改代码

具体配置如下,可通过字段映射关系,自动生成对应sql执行,

canal:
  server: 172.16.4.62:11111
  # canal instance名字
  destination: example
  server-mode: tcp
  # 消费的时间间隔(s)
  timeout: 2
  # canal 的用户名
  username:
  # canal 的密码
  password:
  sync:
    # canal监听的数据库名
    - database: test
      tableInfo:
          # canal监听的表名
        - tableName: t_user
          # 本服务数据库同步的表名
          targetTableName: t_em_user
          # 字段映射,key:value多个逗号隔开   key为监控数据库表的字段,value为从库对应表的字段   *代表同步所有字段并且两个库表字段相同
          fieldMapping: "*"
          # id映射,key:value多个逗号隔开   key为监控数据库表的字段,value为从库对应表的字段,一样可以只写一个
          id: USER_CODE
#        - tableName: t_org
#          targetTableName: t_em_org
#          fieldMapping: "*"
#          id: ORG_ID
        - tableName: t_org
          targetTableName: my_org
          fieldMapping: ORG_ID:id,ORG_NAME:name
          id: ORG_ID:id

部署方式

1.下载canal  执行脚本解压
链接:https://pan.baidu.com/s/1yOPrihHwG9MfkiuiBkOD2Q?pwd=e171
mkdir /opt/canal
cd /opt/canal
tar zxvf canal.deployer-1.1.6.tar.gz
2.修改配置文件
vi conf/example/instance.properties
- 基本连接配置修改如下
canal.instance.master.address=192.168.4.31:3306  #数据库地址
canal.instance.dbUsername=root #数据库账号
canal.instance.dbPassword=1234 #数据库密码
canal.instance.defaultDatabaseName = test  #数据库
canal.instance.connectionCharset = UTF-8 #数据库编码

 监听指定的数据库和表 

#canal.instance.filter.regex=.*\\..*  (默认为所有的库所有的表)
# 只监听icpdb_dev数据库的这两个表
canal.instance.filter.regex=test.t_user,test.t_org


mysql 数据解析关注的表,Perl正则表达式.

多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:
1.  所有表:.*   or  .*\\..*
2.  canal schema下所有表: canal\\..*
3.  canal下的以canal打头的表:canal\\.canal.*
4.  canal schema下的一张表:canal\\.test1

5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)

检查mysql是否开启了bin_log日志

SHOW VARIABLES LIKE '%log_bin%'

没有开启则修改mysql配置 

vi /etc/my.cnf

添加如下

log-bin=mysql-bin
binlog-format=ROW 

重启mysql

service mysqld restart;
进入canal的bin文件夹,启动,默认端口11111
./bin/startup.sh

java部署canal客户端

核心pom


        <dependency>
            <groupId>io.github.xizixuejie</groupId>
            <artifactId>canal-spring-boot-starter</artifactId>
            <version>0.0.2</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    

        <!-- Mysql Connector -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

添加BaseCanalListener统一消费binLog日志



import com.google.common.collect.Maps;
import com.netinfo.cannel.mapper.BaseMapper;
import io.xzxj.canal.core.listener.EntryListener;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.lang.ObjectUtils;
import org.apache.poi.ss.formula.functions.T;
import org.springframework.beans.factory.annotation.Autowired;

import java.text.MessageFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * @author yangguang
 * @date 2023年04月24日 14:57
 */
@Data
@Slf4j
public class BaseCanalListener implements EntryListener<HashMap> {

    private BaseMapper baseMapper;
    /**
     * 数据库名字
     */
    private String schemaName;
    /**
     * 表名字
     */
    private String tableName;
    private String targetTableName;
    private String fieldMapping;
    private String id;

    private final String insertTemplate = "insert into {0} ({1}) values ({2})";
    private final String updateTemplate = "update {0} set {1} where {2}";
    private final String deleteTemplate = "delete from {0} where {1}";

    public void insert(HashMap t) {
        log.info("插入获取的数据\n"+t);
        String sql = getInsertSql(t);
        baseMapper.insert(sql);
    }

    public void update(HashMap before, HashMap after) {
        log.info("更新之前的数据\n"+before);
        log.info("更新之后的数据\n"+after);
        String sql = getUpdateSql(before,after);
        if(sql!=null)
        baseMapper.update(sql);
    }

    public void delete(HashMap t) {
        log.info("删除的数据\n"+t);
        String sql = getDeleteSql(t);
        baseMapper.delete(sql);
    }

    private String getDeleteSql(HashMap t) {
        //"delete from {0} where {2}";
        String[] params = new String[2];
        params[0] = targetTableName;
        params[1] = getWhereSql(t);
        return MessageFormat.format(deleteTemplate,params);
    }


    private String getUpdateSql(HashMap<String,String> before, HashMap after) {
        if(fieldMapping==null){
            fieldMapping = "*";
        }
        Map<String,String> updateFieldMap = Maps.newHashMap();
        Set<String> updateOriginField = before.keySet();
        if(!"*".equals(fieldMapping)){
            for (String kv : fieldMapping.split(",")) {
                String[] kvArr = kv.split(":");
                if(updateOriginField.contains(kvArr[0])){
                    updateFieldMap.put(kvArr[1],toString(after.get(kvArr[0])));
                }
            }
        }else{
            for (Map.Entry<String, String> entry : before.entrySet()) {
                updateFieldMap.put(entry.getKey(),toString(after.get(entry.getKey())));
            }
        }
        if(updateFieldMap.isEmpty()){
            return null;
        }
        //updateTemplate = "update {0} set {1} where {2}";
        String[] params = new String[3];
        params[0] = targetTableName;
        StringBuilder sb = new StringBuilder();
        updateFieldMap.entrySet().forEach(entry->{
            sb.append(entry.getKey());
            sb.append("=");
            sb.append(entry.getValue());
            sb.append(",");
        });
        params[1] = sb.toString().substring(0,sb.length()-1);
        params[2] =getWhereSql(after);

        return MessageFormat.format(updateTemplate,params);

    }

    private String getWhereSql(HashMap<String,String> data) {
        StringBuilder sb = new StringBuilder();
        String originId,currentId;
        for (String idMapper : id.split(",")) {
            if(idMapper.contains(":")){
                String[] split = idMapper.split(":");
                originId=split[0];
                currentId=split[1];
            }else{
                originId=idMapper;
                currentId=idMapper;
            }
            sb.append(currentId+"="+data.get(originId));
            sb.append(" and ");
        }
        return sb.toString().substring(0,sb.length()-5);
    }

    private String getInsertSql(HashMap t) {
        //首先查询插入的字段
        if(fieldMapping==null){
            fieldMapping = "*";
        }
        List<String> originFieldList = Lists.newArrayList();
        List<String> newFieldList = Lists.newArrayList();
        List<String> newFieldValueList = Lists.newArrayList();
        if(!fieldMapping.equals("*")){
            //指定字段
            for (String kv : fieldMapping.split(",")) {
                String[] kvArr = kv.split(":");
                originFieldList.add(kvArr[0]);
                newFieldList.add(kvArr[1]);
                newFieldValueList.add(toString(t.get(kvArr[0])));
            }
        }else{
            for (Object o : t.keySet()) {
                originFieldList.add((String)o);
                newFieldList.add((String)o);
                newFieldValueList.add(toString(t.get(o)));
            }
        }
        String[] params = new String[3];
        params[0] = targetTableName;
        params[1] = String.join(",",newFieldList);
        params[2] = String.join(",",newFieldValueList);
        return MessageFormat.format(insertTemplate,params);
    }


    private String toString(Object obj){
        if(obj == null || obj.toString().length()==0){
            return null;
        }
        return "\""+ObjectUtils.toString(obj)+"\"";
    }
}

添加Mapper操作数据库



import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Update;
import org.springframework.stereotype.Repository;

/**
 * @author yangguang
 * @date 2023年04月25日 9:48
 */
@Repository
public interface BaseMapper {

    @Insert("${sql}")
    void insert(String sql);

    @Update("${sql}")
    void update(String sql);

    @Delete("${sql}")
    void delete(String sql);
}

添加CanalListenerBeanPostProcessor 读取配置文件,生成BaseCanalListener类


import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.*;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.beans.factory.support.GenericBeanDefinition;
import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.List;

/**
 * @author yangguang
 * @date 2023年04月24日 15:11
 */
@Component
@Slf4j
public class CanalListenerBeanPostProcessor implements BeanPostProcessor, BeanDefinitionRegistryPostProcessor, EnvironmentAware {

    private Environment environment;

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {


        return bean;
    }

    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
        int dbNum=0;
        while (true){
            String database = getConfigByKey("canal.sync["+dbNum+"].database");
            if(StringUtils.isEmpty(database)){
                break;
            }
            int tableNum=0;
            while (true){
                String tableName = getConfigByKey("canal.sync["+dbNum+"].tableInfo["+tableNum+"].tableName");
                String targetTableName = getConfigByKey("canal.sync["+dbNum+"].tableInfo["+tableNum+"].targetTableName");
                String fieldMapping = getConfigByKey("canal.sync["+dbNum+"].tableInfo["+tableNum+"].fieldMapping");
                String id = getConfigByKey("canal.sync["+dbNum+"].tableInfo["+tableNum+"].id");
                if(StringUtils.isEmpty(tableName)){
                    break;
                }
                String beanName = database+tableName;
                //一个数据库一个表一个监听类
                GenericBeanDefinition beandefinition=new GenericBeanDefinition();
                beandefinition.setBeanClassName("com.netinfo.cannel.config.BaseCanalListener");
                beandefinition.getPropertyValues().add("schemaName",database);
                beandefinition.getPropertyValues().add("tableName",tableName);
                beandefinition.getPropertyValues().add("targetTableName",targetTableName);
                beandefinition.getPropertyValues().add("fieldMapping",fieldMapping);
                beandefinition.getPropertyValues().add("id",id);
                beandefinition.getPropertyValues().add("baseMapper",new RuntimeBeanReference("baseMapper"));

                registry.registerBeanDefinition(beanName,beandefinition);
                tableNum++;
            }
            dbNum++;
        }
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {

    }

    @Override
    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }

    private String getConfigByKey(String key) {
        try{
            return environment.getProperty(key);
        }catch (Exception e){
            return null;
        }
    }
}
到此客户端搭建完成,但是canal-spring-boot-starter 西子小姐姐封装的不支持Map实体,并且没有重连机制,所以需要修改源码进行覆盖一下

io.xzxj.canal.spring.autoconfigure.TcpClientAutoConfiguration 类

package io.xzxj.canal.spring.autoconfigure;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import io.xzxj.canal.core.client.TcpCanalClient;
import io.xzxj.canal.core.factory.EntryColumnConvertFactory;
import io.xzxj.canal.core.handler.IMessageHandler;
import io.xzxj.canal.core.handler.RowDataHandler;
import io.xzxj.canal.core.handler.impl.AsyncMessageHandlerImpl;
import io.xzxj.canal.core.handler.impl.RowDataHandlerImpl;
import io.xzxj.canal.core.handler.impl.SyncMessageHandlerImpl;
import io.xzxj.canal.core.listener.EntryListener;
import io.xzxj.canal.spring.properties.CanalProperties;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

import java.util.List;
import java.util.concurrent.ExecutorService;

/**
 * @author xzxj
 * @date 2023/3/11 11:37
 */
@Configuration
@EnableConfigurationProperties(CanalProperties.class)
@ConditionalOnProperty(value = "canal.server-mode", havingValue = "tcp")
@Import(value = {TcpCanalClient.class,ThreadPoolAutoConfiguration.class})
public class TcpClientAutoConfiguration {

    private final CanalProperties canalProperties;

    public TcpClientAutoConfiguration(CanalProperties canalProperties) {
        this.canalProperties = canalProperties;
    }

    @Bean
    public RowDataHandler<CanalEntry.RowData> rowDataHandler() {
        return new RowDataHandlerImpl(new EntryColumnConvertFactory());
    }

    @Bean
    @ConditionalOnProperty(value = "canal.async", havingValue = "true", matchIfMissing = true)
    public IMessageHandler<Message> asyncMessageHandler(List<EntryListener<?>> entryListenerList,
                                                        RowDataHandler<CanalEntry.RowData> rowDataHandler,
                                                        ExecutorService executorService) {
        return new AsyncMessageHandlerImpl(entryListenerList, rowDataHandler, executorService);
    }

    @Bean
    @ConditionalOnProperty(value = "canal.async", havingValue = "false")
    public IMessageHandler<Message> syncMessageHandler(List<EntryListener<?>> entryListenerList,
                                                       RowDataHandler<CanalEntry.RowData> rowDataHandler) {
        return new SyncMessageHandlerImpl(entryListenerList, rowDataHandler);
    }

}

io.xzxj.canal.core.client.AbstractCanalClient

package io.xzxj.canal.core.client;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import io.xzxj.canal.core.handler.IMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/**
 * @author xzxj
 * @date 2023/3/11 10:33
 */
public abstract class AbstractCanalClient implements ICanalClient {

    private static final Logger log = LoggerFactory.getLogger(AbstractCanalClient.class);

    protected volatile boolean runStatus;

    private Thread thread;

    protected CanalConnector connector;

    protected IMessageHandler messageHandler;

    protected String filter = "";

    protected Integer batchSize = 1;

    protected Long timeout = 1L;

    protected TimeUnit unit = TimeUnit.SECONDS;

    @Override
    public void init() {
        startConnect();
    }
    private void startConnect(){
        log.info("canal client init");
        this.connectCanal();
        thread = new Thread(this::handleListening);
        thread.setName("canal-client-thread");
        runStatus = true;
        thread.start();
    }

    private void connectCanal() {
        try {
            log.info("canal client connecting");
            connector.connect();
            this.subscribe();
            log.info("canal client connect success");
        } catch (CanalClientException e) {
            log.error("canal client connect error: {}", e.getMessage(), e);
            this.destroy();
        }
    }

    public void subscribe() {
        connector.subscribe(filter);
    }

    @Override
    public void destroy() {
        try {
            log.info("canal client destroy");
            connector.unsubscribe();
        }catch (Exception e){
            log.error("canal client connect exception");
        }
        if (thread != null) {
            thread.interrupt();
        }
        runStatus = false;
    }

    private void sleep(int num){
        try {
            Thread.sleep(num);
        }catch (Exception e){
        }
    }
}
io.xzxj.canal.core.client.TcpCanalClient
package io.xzxj.canal.core.client;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import io.xzxj.canal.core.handler.IMessageHandler;
import io.xzxj.canal.spring.properties.CanalProperties;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

/**
 * @author xzxj
 * @date 2023/3/11 10:51
 */
public class TcpCanalClient extends AbstractCanalClient {

    private static final Logger log = LoggerFactory.getLogger(TcpCanalClient.class);

    @Autowired
    private CanalProperties canalProperties;
    @Autowired
    private IMessageHandler messageHandler;

    private static TcpCanalClient runClient ;
    @PostConstruct
    public void initClient(){
        String server = canalProperties.getServer();
        String[] array = server.split(":");
        TcpCanalClient.builder()
                .hostname(array[0])
                .port(Integer.parseInt(array[1]))
                .destination(canalProperties.getDestination())
                .username(canalProperties.getUsername())
                .password(canalProperties.getPassword())
                .messageHandler(messageHandler)
                .batchSize(canalProperties.getBatchSize())
                .filter(canalProperties.getFilter())
                .timeout(canalProperties.getTimeout())
                .unit(canalProperties.getUnit())
                .build().init();
        runClient = this;
    }

    @Override
    public void handleListening() {
        try {
            while (runStatus) {
                Message message = connector.getWithoutAck(batchSize, timeout, unit);
                log.debug("receive message={}", message);
                long batchId = message.getId();
                if (message.getId() != -1 && message.getEntries().size() != 0) {
                    messageHandler.handleMessage(message);
                }
                connector.ack(batchId);
            }
        } catch (Exception e) {
            log.error("canal client exception", e);
            runClient.reConnect();
        }
    }

    private void reConnect() {
        log.error("canal服务端断开连接,10s后准备重连");
        try {
            Thread.sleep(10000);
        }catch (Exception e){
        }
        initClient();
    }

    public static Builder builder() {
        return new Builder();
    }

    public static class Builder {
        private String filter = StringUtils.EMPTY;
        private Integer batchSize = 1;
        private Long timeout = 1L;
        private TimeUnit unit = TimeUnit.SECONDS;
        private String hostname;
        private Integer port;
        private String destination;
        private String username;
        private String password;
        private IMessageHandler<?> messageHandler;

        private Builder() {
        }

        public Builder hostname(String hostname) {
            this.hostname = hostname;
            return this;
        }

        public Builder port(Integer port) {
            this.port = port;
            return this;
        }

        public Builder destination(String destination) {
            this.destination = destination;
            return this;
        }

        public Builder username(String username) {
            this.username = username;
            return this;
        }

        public Builder password(String password) {
            this.password = password;
            return this;
        }


        public Builder filter(String filter) {
            this.filter = filter;
            return this;
        }

        public Builder batchSize(Integer batchSize) {
            this.batchSize = batchSize;
            return this;
        }

        public Builder timeout(Long timeout) {
            this.timeout = timeout;
            return this;
        }

        public Builder unit(TimeUnit unit) {
            this.unit = unit;
            return this;
        }

        public Builder messageHandler(IMessageHandler<?> messageHandler) {
            this.messageHandler = messageHandler;
            return this;
        }

        public TcpCanalClient build() {
            CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(hostname, port), destination, username, password);
            TcpCanalClient tcpCanalClient = new TcpCanalClient();
            tcpCanalClient.connector = canalConnector;
            tcpCanalClient.messageHandler = messageHandler;
            tcpCanalClient.filter = this.filter;
            tcpCanalClient.unit = this.unit;
            tcpCanalClient.batchSize = this.batchSize;
            tcpCanalClient.timeout = this.timeout;
            return tcpCanalClient;
        }
    }

}

io.xzxj.canal.core.factory.EntryColumnConvertFactory

package io.xzxj.canal.core.factory;

import com.alibaba.otter.canal.protocol.CanalEntry;
import io.xzxj.canal.core.listener.EntryListener;
import io.xzxj.canal.core.util.TableFieldUtil;
import io.xzxj.canal.core.util.TableInfoUtil;
import org.apache.commons.lang3.StringUtils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * @author xzxj
 * @date 2023/3/12 12:10
 */
public class EntryColumnConvertFactory extends AbstractConvertFactory<List<CanalEntry.Column>> {

    @Override
    <R> R newInstance(Class<R> clazz, List<CanalEntry.Column> columnList) throws InstantiationException, IllegalAccessException, NoSuchFieldException {
        R object = clazz.newInstance();

        if(object instanceof HashMap){
            for (CanalEntry.Column column : columnList) {
                ((Map)object).put(column.getName().toUpperCase(),column.getValue());
            }
            return object;
        }

        Map<String, String> fieldMap = TableFieldUtil.getFieldMap(object.getClass());
        for (CanalEntry.Column column : columnList) {
            String fieldName = fieldMap.get(column.getName());
            if (StringUtils.isNotEmpty(fieldName)) {
                TableFieldUtil.setFieldValue(object, fieldName, column.getValue());
            }
        }
        return object;
    }

    @Override
    public <R> R newInstance(EntryListener<?> entryHandler, List<CanalEntry.Column> columnList, Set<String> updateColumn) throws InstantiationException, IllegalAccessException, NoSuchFieldException {
        Class<R> tableClass = TableInfoUtil.getTableClass(entryHandler);
        if (tableClass == null) {
            return null;
        }
        R r = tableClass.newInstance();
        if(r instanceof HashMap){
            for (CanalEntry.Column column : columnList) {
                if (!updateColumn.contains(column.getName())) {
                    continue;
                }
                ((Map)r).put(column.getName().toUpperCase(),column.getValue());
            }
            return r;
        }

        Map<String, String> columnNames = TableFieldUtil.getFieldMap(r.getClass());
        for (CanalEntry.Column column : columnList) {
            if (!updateColumn.contains(column.getName())) {
                continue;
            }
            String fieldName = columnNames.get(column.getName());
            if (StringUtils.isNotEmpty(fieldName)) {
                TableFieldUtil.setFieldValue(r, fieldName, column.getValue());
            }
        }
        return r;
    }

}

io.xzxj.canal.core.util.TableInfoUtil

package io.xzxj.canal.core.util;

import com.baomidou.mybatisplus.annotation.TableName;
import com.google.common.base.CaseFormat;
import com.netinfo.cannel.config.BaseCanalListener;
import io.xzxj.canal.core.annotation.CanalListener;
import io.xzxj.canal.core.listener.EntryListener;
import org.apache.commons.lang3.StringUtils;

import javax.annotation.Nullable;
import javax.persistence.Table;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author xzxj
 * @date 2023/3/11 16:19
 */
public class TableInfoUtil {

    private static Map<Class<? extends EntryListener>, Class>
            CLASS_LISTENER_CACHE_MAP = new ConcurrentHashMap<>();

    @Nullable
    public static String getTableName(EntryListener<?> entryListener) {
        if(entryListener instanceof BaseCanalListener){
            BaseCanalListener listener = (BaseCanalListener)entryListener;
            return listener.getSchemaName()+"."+listener.getTableName();
        }
        CanalListener annotation = entryListener.getClass().getAnnotation(CanalListener.class);
        if (annotation == null) {
            return null;
        }
        StringBuilder fullName = new StringBuilder();
        if (StringUtils.isNotBlank(annotation.schemaName())) {
            fullName.append(annotation.schemaName()).append(".");
        }
        if (StringUtils.isNotBlank(annotation.tableName())) {
            fullName.append(annotation.tableName());
        }else {
            String tableName = findTableName(entryListener);
            fullName.append(tableName);
        }
        return fullName.toString();
    }

    @Nullable
    private static String findTableName(EntryListener<?> entryListener) {
        Class<Object> tableClass = getTableClass(entryListener);
        if (tableClass == null) {
            return null;
        }
        TableName tableName = tableClass.getAnnotation(TableName.class);
        if (tableName != null && StringUtils.isNotBlank(tableName.value())) {
            return tableName.value();
        }
        Table table = tableClass.getAnnotation(Table.class);
        if (table != null && StringUtils.isNotBlank(table.name())) {
            return table.name();
        }
        return CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, tableClass.getName());
    }

    /**
     * 找到EntryListener泛型中的数据库实体类
     *
     * @param object
     * @param <T>
     * @return
     */
    @Nullable
    public static <T> Class<T> getTableClass(EntryListener<?> object) {
        Class<? extends EntryListener> listenerClass = object.getClass();
        Class<T> tableClass = CLASS_LISTENER_CACHE_MAP.get(listenerClass);
        if (tableClass != null) {
            return tableClass;
        }
        Type[] interfacesTypes = listenerClass.getGenericInterfaces();
        for (Type type : interfacesTypes) {
            Class<?> c = (Class<?>) ((ParameterizedType) type).getRawType();
            if (c.equals(EntryListener.class)) {
                tableClass = (Class<T>) ((ParameterizedType) type).getActualTypeArguments()[0];
                CLASS_LISTENER_CACHE_MAP.putIfAbsent(listenerClass, tableClass);
                return tableClass;
            }
        }
        return null;
    }

}

覆盖完后,就可以根据项目配置,自动进行表的同步了

GitHub 加速计划 / ca / canal
28.21 K
7.57 K
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:2 个月前 )
1e5b8a20 - 2 个月前
ff82fd65 2 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐