Notes on a pitfall: integrating Spring Boot 2.x with Canal + Kafka
canal
alibaba/canal: Canal is a distributed database synchronization system open-sourced by Alibaba. It parses MySQL binlogs to provide real-time incremental data subscription and consumption, and is widely used for capturing database change events, data migration, and cache updates.
Project page: https://gitcode.com/gh_mirrors/ca/canal
After the ops team had set up the service, the Canal data consumed from Kafka turned out to be completely garbled, and the program could not move forward at all. Here is a screenshot:
[screenshot: garbled Canal payload as consumed from Kafka]
I searched around online without finding a working fix. Eventually a more senior colleague who had been through this pointed me in the right direction: it was a serialization problem. Here is the code:
import com.alibaba.otter.canal.client.kafka.MessageDeserializer;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${kafka.consumer.servers}")
    private String servers;
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Message>> kafkaListenerContainerFactory() {
        // The value type is canal's Message (what MessageDeserializer produces), not String.
        ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        // The listener is started manually later, so do not start it with the application context.
        factory.setAutoStartup(false);
        // Auto-acknowledge the records that the filter strategy discards.
        factory.setAckDiscarded(true);
        factory.setRecordFilterStrategy(new RecordFilterStrategy<String, Message>() {
            @Override
            public boolean filter(ConsumerRecord<String, Message> consumerRecord) {
                long length = consumerRecord.toString().getBytes().length;
                long maxLength = 5 * 1024 * 1024; // 5 MB
                // Returning true discards the record; drop anything larger than 5 MB.
                return length >= maxLength;
            }
        });
        return factory;
    }

    public ConsumerFactory<String, Message> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // [Key point] The value must be deserialized with canal's own deserializer,
        // because the Canal server writes protobuf-serialized Message objects to Kafka.
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MessageDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }
}
## Kafka consumer configuration (required by the @Value injections above)
kafka.consumer.servers=192.168.3.2:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=1000
kafka.consumer.auto.offset.reset=earliest
kafka.consumer.topic=canal-test-topic
kafka.consumer.group.id=canal-test-group
kafka.consumer.concurrency=20
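As context: whether the Kafka payload is protobuf or flat JSON is decided on the Canal server side, not in the consumer. The MessageDeserializer above matches Canal's non-flat protobuf format, which as far as I know corresponds to a server configured roughly like this (hypothetical canal.properties excerpt; the exact values of this deployment are an assumption):

# canal.properties (server side), hypothetical excerpt
canal.serverMode = kafka
# false: protobuf Message on the topic (pairs with MessageDeserializer)
# true: flat JSON messages (readable with a plain StringDeserializer)
canal.mq.flatMessage = false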
With this change, the deserialized messages finally come out correctly.
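One more thing to watch: the factory above calls setAutoStartup(false), so the @KafkaListener registered under CanalConstants.CANAL_KAFKA_ID does not start with the application context. A minimal sketch of starting it explicitly via Spring's KafkaListenerEndpointRegistry (the ApplicationRunner is just an illustrative trigger; start the container wherever it fits your lifecycle):

import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;

@Configuration
public class CanalListenerStarter {

    // Starts the canal listener once the application is ready.
    @Bean
    public ApplicationRunner canalListenerRunner(KafkaListenerEndpointRegistry registry) {
        return args -> {
            // Look up the container by the id given to @KafkaListener.
            MessageListenerContainer container = registry.getListenerContainer(CanalConstants.CANAL_KAFKA_ID);
            if (container != null && !container.isRunning()) {
                container.start();
            }
        };
    }
}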
And a sketch of the downstream processing:
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

@Component
public class BaselineCanalKafkaListener {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    // Table-to-ES mapping configuration; assumed to be populated elsewhere (e.g. from config).
    private List<EsData> mysqlEsMappings;

    @KafkaListener(id = CanalConstants.CANAL_KAFKA_ID, topics = {"canal-wp-baseline"})
    public void listen(ConsumerRecord<?, ?> record) {
        if (!(record.value() instanceof Message)) {
            return;
        }
        Message message = (Message) record.value();
        int size = message.getEntries().size();
        if (message.getId() == -1 || size == 0) {
            return;
        }
        List<EsData> infos = createEventInfos(message.getEntries());
        if (CollectionUtils.isEmpty(infos)) {
            return;
        }
        // key: handler bean name, value: the EsData objects routed to that handler
        Map<String, List<EsData>> listMap = infos.stream().collect(Collectors.groupingBy(EsData::getHandlerBean));
        for (Map.Entry<String, List<EsData>> entry : listMap.entrySet()) {
            ThreadPoolUtils.submit(() -> {
                try {
                    SpringContextUtils.getBean(entry.getKey(), EsDataHandler.class).handle(entry.getValue());
                } catch (Exception exception) {
                    log.error("EsDataHandler failed:", exception);
                }
            });
        }
    }

    /**
     * Builds the event info list: converts the data carried by each entry into EsData objects.
     *
     * @param entries canal entries from the message
     * @return the EsData objects extracted from the entries
     */
    private List<EsData> createEventInfos(List<CanalEntry.Entry> entries) {
        List<EsData> eventInfoList = new ArrayList<>(entries.size());
        for (CanalEntry.Entry entry : entries) {
            // Skip transaction boundary markers; only row changes matter here.
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                log.error("Failed to parse canal entry into a RowChange, entry: " + entry.toString(), e);
                continue;
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            String tableName = entry.getHeader().getTableName();
            log.info("===========binlog[{} : {}], tableName[{}], eventType: {}",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), tableName, eventType);
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                List<CanalEntry.Column> columns = getChangeColumns(eventType, rowData);
                Set<EsData> dataSet = mysqlEsMappings.stream().filter(e -> e.getTableName().equals(tableName)).collect(Collectors.toSet());
                if (CollectionUtils.isEmpty(dataSet)) {
                    log.error("No mapping configured for table [{}]", tableName);
                    continue;
                }
                for (EsData esData : dataSet) {
                    String esDataIdName = esData.getEsDataIdName();
                    Optional<CanalEntry.Column> optional = columns.stream().filter(e -> e.getName().equals(esDataIdName)).findAny();
                    if (!optional.isPresent()) {
                        log.error("Table [{}]: dataId column [{}] not found, raw columns: {}", tableName, esDataIdName, columns.toString());
                        continue;
                    }
                    eventInfoList.add(new EsData(tableName, esDataIdName, optional.get().getValue(), esData.getHandlerBean(), eventType, columns));
                }
            }
        }
        return eventInfoList;
    }

    /**
     * Returns the changed columns: the after-image for INSERT/UPDATE, the before-image for DELETE.
     *
     * @param eventType only INSERT, UPDATE and DELETE are supported
     * @param rowData   the row data of the change
     * @return the relevant column list, or an empty list for unsupported event types
     */
    private List<CanalEntry.Column> getChangeColumns(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        if (eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE) {
            return rowData.getAfterColumnsList();
        } else if (eventType == CanalEntry.EventType.DELETE) {
            return rowData.getBeforeColumnsList();
        }
        log.error("Unsupported event type: {}", eventType);
        return Collections.emptyList();
    }
}
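The original post does not show EsDataHandler; the listener only requires a Spring bean that can consume a batch of EsData. A minimal sketch of what such a contract and a handler bean might look like (the interface shape and the bean name are assumptions for illustration, not the original implementation):

import java.util.List;

import org.springframework.stereotype.Component;

// Assumed contract: each handler bean processes the EsData batch routed to it by handlerBean name.
public interface EsDataHandler {
    void handle(List<EsData> dataList);
}

// Hypothetical handler; "orderEsDataHandler" would be the value carried in EsData.handlerBean.
@Component("orderEsDataHandler")
class OrderEsDataHandler implements EsDataHandler {
    @Override
    public void handle(List<EsData> dataList) {
        for (EsData data : dataList) {
            switch (data.getEventType()) {
                case INSERT:
                case UPDATE:
                    // upsert the ES document keyed by data.getEsDataIdValue() ...
                    break;
                case DELETE:
                    // delete the ES document keyed by data.getEsDataIdValue() ...
                    break;
                default:
                    break;
            }
        }
    }
}

The EsData carrier class referenced above: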
import com.alibaba.otter.canal.protocol.CanalEntry;
import lombok.Getter;
import lombok.Setter;

import java.util.List;

@Getter
@Setter
public class EsData {

    public EsData(String tableName, String esDataIdName, Object esDataIdValue, String handlerBean, CanalEntry.EventType eventType, List<CanalEntry.Column> changeColumns) {
        this.esDataIdName = esDataIdName;
        this.esDataIdValue = esDataIdValue;
        this.tableName = tableName;
        this.handlerBean = handlerBean;
        this.eventType = eventType;
        this.changeColumns = changeColumns;
    }

    /**
     * Name of the database column that backs the ES document id. For example, if the
     * ark_order table uses insure_no as the ES document id, this holds "insure_no".
     */
    private String esDataIdName;

    /**
     * Value of that column for this row. For example, if ark_order uses insure_no as
     * the ES document id, this holds the row's insure_no value, e.g. 4541564132.
     */
    private Object esDataIdValue;

    /**
     * Name of the changed table.
     */
    private String tableName;

    /**
     * Name of the bean that handles this data.
     */
    private String handlerBean;

    /**
     * Event type of the change.
     */
    private CanalEntry.EventType eventType;

    /**
     * The raw canal columns of this row.
     */
    private List<CanalEntry.Column> changeColumns;

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        EsData esData = (EsData) o;
        if (tableName != null ? !tableName.equals(esData.tableName) : esData.tableName != null) return false;
        return handlerBean != null ? handlerBean.equals(esData.handlerBean) : esData.handlerBean == null;
    }

    @Override
    public int hashCode() {
        int result = tableName != null ? tableName.hashCode() : 0;
        result = 31 * result + (handlerBean != null ? handlerBean.hashCode() : 0);
        return result;
    }

    @Override
    public String toString() {
        return "EsData{" +
                "esDataIdName='" + esDataIdName + '\'' +
                ", esDataIdValue=" + esDataIdValue +
                ", tableName='" + tableName + '\'' +
                ", handlerBean='" + handlerBean + '\'' +
                ", eventType=" + eventType +
                '}';
    }
}
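Downstream, a handler ultimately needs field/value pairs rather than Canal's protobuf column objects. A small helper along these lines (a sketch, assuming column names are unique within a row) flattens changeColumns into a map for building the ES document:

import com.alibaba.otter.canal.protocol.CanalEntry;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class CanalColumnUtils {

    private CanalColumnUtils() {
    }

    /**
     * Flattens canal columns into a name -> value map, preserving column order.
     * Columns flagged as SQL NULL map to null rather than Canal's empty-string placeholder.
     */
    public static Map<String, String> toFieldMap(List<CanalEntry.Column> columns) {
        Map<String, String> fields = new LinkedHashMap<>(columns.size() * 2);
        for (CanalEntry.Column column : columns) {
            fields.put(column.getName(), column.getIsNull() ? null : column.getValue());
        }
        return fields;
    }
}

A handler can then build its document with Map<String, String> doc = CanalColumnUtils.toFieldMap(esData.getChangeColumns()); before writing to ES.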