运维同事搭建好 canal 服务之后,程序在监听 Kafka 中的 canal 数据时,发现收到的消息全是乱码,根本没法正常往下处理,贴张截图:
在这里插入图片描述
网上找了一圈也没找到有效的处理办法,后来一位做过这个的前辈指导了我一下,原来是消费端的反序列化方式不对:value 需要使用 canal 提供的 MessageDeserializer 来反序列化,直接贴代码:

import com.alibaba.otter.canal.client.kafka.MessageDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;

import java.util.HashMap;
import java.util.Map;

/**
 * Spring Kafka consumer configuration for reading canal messages from Kafka.
 *
 * <p>The important setting is the value deserializer: canal's
 * {@link MessageDeserializer} is used instead of a string deserializer so that
 * record values arrive as canal messages rather than unreadable text.
 *
 * <p>NOTE(review): the container factory is declared with {@code String} as its
 * value type while the configured value deserializer is canal's
 * {@code MessageDeserializer}. The declared bean type is kept unchanged for
 * compatibility; listeners should accept {@code ConsumerRecord<?, ?>} and check
 * the runtime type of the value.
 */
@Configuration
@EnableKafka
public class KafkaConfig {

    /** Records whose string rendering is at least this many bytes (5 MiB) are discarded. */
    private static final long MAX_RECORD_BYTES = 5L * 1024 * 1024;

    /** Consumer poll timeout handed to the listener container, in milliseconds. */
    private static final long POLL_TIMEOUT_MS = 1500L;

    @Value("${kafka.consumer.servers}")
    private String servers;

    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;

    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;

    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;

    @Value("${kafka.consumer.group.id}")
    private String groupId;

    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;

    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    /**
     * Builds the listener container factory used by {@code @KafkaListener} methods.
     *
     * <p>Containers are created with auto-startup disabled, so they have to be
     * started explicitly. Oversized records are filtered out, and filtered
     * records are acknowledged automatically.
     *
     * @return the configured container factory
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(POLL_TIMEOUT_MS);
        factory.setAutoStartup(false);

        // Acknowledge records that the filter below throws away.
        factory.setAckDiscarded(true);
        factory.setRecordFilterStrategy(KafkaConfig::isOversized);
        return factory;
    }

    /**
     * Creates the consumer factory from {@link #consumerConfigs()}.
     *
     * @return a new consumer factory
     */
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Assembles the Kafka consumer properties from the injected settings.
     *
     * @return a mutable map of consumer properties
     */
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Key point: values must be deserialized with canal's own deserializer.
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MessageDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }

    /**
     * Decides whether a record is too large to be processed.
     *
     * <p>The size is measured on the UTF-8 encoding of the record's
     * {@code toString()} rendering, not on the raw payload. The charset is given
     * explicitly so the threshold does not depend on the JVM's default encoding.
     *
     * @param record the consumed record
     * @return {@code true} if the record should be discarded
     */
    private static boolean isOversized(ConsumerRecord<?, ?> record) {
        long length = record.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
        return length >= MAX_RECORD_BYTES;
    }

}
## kafka配置
#kafka.consumer.servers=192.168.3.2:9092
#kafka.consumer.enable.auto.commit=true
#kafka.consumer.session.timeout=6000
#kafka.consumer.auto.commit.interval=1000
#kafka.consumer.auto.offset.reset=earliest
#kafka.consumer.topic=canal-test-topic
#kafka.consumer.group.id=canal-test-group
#kafka.consumer.concurrency=20

这样配置之后,反序列化出来的消息就正常了。

后续处理的一段伪代码:


/**
 * Kafka listener that turns canal binlog messages into {@link EsData} events and
 * hands them to the handler bean configured for each table.
 *
 * <p>NOTE(review): {@code mysqlEsMappings} is referenced below but is not declared
 * in this class as shown; it has to be supplied (a collection of table-to-handler
 * {@code EsData} mappings) before this compiles.
 */
@Component
public class BaselineCanalKafkaListener {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    /**
     * Receives one Kafka record, converts its canal entries to {@link EsData}
     * objects and submits one task per handler bean.
     *
     * <p>Records are ignored when the value is not a canal {@code Message}, when
     * the message id is {@code -1}, or when the message carries no entries.
     *
     * @param record the consumed Kafka record
     */
    @KafkaListener(id = CanalConstants.CANAL_KAFKA_ID, topics = {"canal-wp-baseline"})
    public void listen(ConsumerRecord<?, ?> record) {
        if (!(record.value() instanceof Message)) {
            return;
        }
        Message message = (Message) record.value();
        if (message.getId() == -1 || message.getEntries().isEmpty()) {
            return;
        }
        List<EsData> infos = createEventInfos(message.getEntries());
        if (CollectionUtils.isEmpty(infos)) {
            return;
        }
        // key: handler bean name, value: the events that bean has to process
        Map<String, List<EsData>> eventsByHandler = infos.stream().collect(Collectors.groupingBy(EsData::getHandlerBean));
        for (Map.Entry<String, List<EsData>> entry : eventsByHandler.entrySet()) {
            dispatch(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Submits one batch of events to the thread pool for the named handler bean.
     * Failures inside the handler are logged and do not propagate.
     *
     * @param handlerBeanName name of the {@code EsDataHandler} bean to look up
     * @param batch           events to pass to the handler
     */
    private void dispatch(String handlerBeanName, List<EsData> batch) {
        ThreadPoolUtils.submit(() -> {
            try {
                SpringContextUtils.getBean(handlerBeanName, EsDataHandler.class).handle(batch);
            } catch (Exception exception) {
                log.error("EsDataHandler执行异常:", exception);
            }
        });
    }

    /**
     * Converts canal entries into {@link EsData} events.
     *
     * <p>Transaction begin/end entries and entries whose payload cannot be parsed
     * are skipped. For every changed row, one event is produced per mapping
     * configured for the entry's table, provided the mapping's id column is
     * present in the row.
     *
     * @param entrys canal entries taken from one message
     * @return the events built from the entries, possibly empty
     */
    private List<EsData> createEventInfos(List<CanalEntry.Entry> entrys) {
        List<EsData> eventInfoList = new ArrayList<>(entrys.size());
        for (CanalEntry.Entry entry : entrys) {
            CanalEntry.EntryType entryType = entry.getEntryType();
            if (entryType == CanalEntry.EntryType.TRANSACTIONBEGIN || entryType == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                log.error("canal 数据转化 RowChange 对象出错, 数据:{}", entry, e);
                continue;
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            String tableName = entry.getHeader().getTableName();
            log.info("===========binlog[{} : {}] ,tableName[{}] , eventType : {}",
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), tableName, eventType);

            List<CanalEntry.RowData> rows = rowChange.getRowDatasList();
            if (rows.isEmpty()) {
                continue;
            }

            // The mappings depend only on the table name, so resolve them once per
            // entry instead of once per row; a missing mapping is reported once.
            Set<EsData> mappings = mysqlEsMappings.stream()
                    .filter(mapping -> mapping.getTableName().equals(tableName))
                    .collect(Collectors.toSet());
            if (CollectionUtils.isEmpty(mappings)) {
                log.error("未找到表【{}】的配置处理", tableName);
                continue;
            }

            for (CanalEntry.RowData rowData : rows) {
                List<CanalEntry.Column> columns = getChangeColumns(eventType, rowData);
                for (EsData mapping : mappings) {
                    String esDataIdName = mapping.getEsDataIdName();
                    Optional<CanalEntry.Column> idColumn = columns.stream()
                            .filter(column -> column.getName().equals(esDataIdName))
                            .findAny();
                    if (!idColumn.isPresent()) {
                        log.error("表:【{}】,dataId:【{}】字段不存在,原始数据:{}", tableName, esDataIdName, columns);
                        continue;
                    }
                    eventInfoList.add(new EsData(tableName, esDataIdName, idColumn.get().getValue(),
                            mapping.getHandlerBean(), eventType, columns));
                }
            }
        }
        return eventInfoList;
    }

    /**
     * Selects the column list that describes the change for a row.
     *
     * @param eventType the kind of change; only INSERT, UPDATE and DELETE are supported
     * @param rowData   the changed row
     * @return the after-image columns for INSERT/UPDATE, the before-image columns
     *         for DELETE, or an empty list for any other event type
     */
    private List<CanalEntry.Column> getChangeColumns(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        switch (eventType) {
            case INSERT:
            case UPDATE:
                return rowData.getAfterColumnsList();
            case DELETE:
                return rowData.getBeforeColumnsList();
            default:
                log.error("不支持的操作类型");
                return Collections.emptyList();
        }
    }

}

import com.alibaba.otter.canal.protocol.CanalEntry;
import lombok.Getter;
import lombok.Setter;

import java.util.List;

/**
 * One table-change event (or table-to-handler mapping) destined for an ES data handler.
 *
 * <p>Equality and hashing consider only {@code tableName} and {@code handlerBean}.
 */
@Getter
@Setter
public class EsData {

    /**
     * Name of the database column used as the ES document id, e.g. for table
     * ark_order, which uses insure_no as the ES primary key, this is "insure_no".
     */
    private String esDataIdName;

    /**
     * Value of the database column used as the ES document id, e.g. for table
     * ark_order keyed by insure_no, the insure_no value of this row such as 4541564132.
     */
    private Object esDataIdValue;

    /** Name of the table that changed. */
    private String tableName;

    /** Name of the bean that processes this data. */
    private String handlerBean;

    /** Kind of operation. */
    private CanalEntry.EventType eventType;

    /** Raw canal column data for the changed row. */
    private List<CanalEntry.Column> changeColumns;

    /**
     * Creates a fully populated instance.
     *
     * @param tableName     name of the table that changed
     * @param esDataIdName  name of the column used as the ES document id
     * @param esDataIdValue value of that column for this row
     * @param handlerBean   name of the bean that processes this data
     * @param eventType     kind of operation
     * @param changeColumns raw canal columns of the changed row
     */
    public EsData(String tableName, String esDataIdName, Object esDataIdValue, String handlerBean, CanalEntry.EventType eventType, List<CanalEntry.Column> changeColumns) {
        this.tableName = tableName;
        this.esDataIdName = esDataIdName;
        this.esDataIdValue = esDataIdValue;
        this.handlerBean = handlerBean;
        this.eventType = eventType;
        this.changeColumns = changeColumns;
    }

    /** Null-safe string comparison used by {@link #equals(Object)}. */
    private static boolean sameText(String left, String right) {
        return left == null ? right == null : left.equals(right);
    }

    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        EsData other = (EsData) o;
        return sameText(tableName, other.tableName) && sameText(handlerBean, other.handlerBean);
    }

    @Override
    public int hashCode() {
        int tableHash = tableName == null ? 0 : tableName.hashCode();
        int handlerHash = handlerBean == null ? 0 : handlerBean.hashCode();
        return 31 * tableHash + handlerHash;
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("EsData{");
        text.append("esDataIdName='").append(esDataIdName).append('\'');
        text.append(", esDataIdValue=").append(esDataIdValue);
        text.append(", tableName='").append(tableName).append('\'');
        text.append(", handlerBean='").append(handlerBean).append('\'');
        text.append(", eventType=").append(eventType);
        text.append('}');
        return text.toString();
    }
}
GitHub 加速计划 / ca / canal
27
8
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:5 个月前 )
02fffc9e - 2 天前
5ca4c393 - 2 天前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐