Canal in Five Parts: How Message Ordering Is Guaranteed
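alibaba/canal: Canal is a distributed database synchronization system open-sourced by Alibaba. It parses MySQL binlogs and provides real-time incremental data subscription and consumption, and is widely used for capturing database change events, data migration, cache refresh, and similar scenarios.
Project address: https://gitcode.com/gh_mirrors/ca/canal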
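Analyzing CanalRocketMQProducer.send
When canal sends messages to RocketMQ it relies on two settings: partitionNum and partitionHash.
partitionHash decides which RocketMQ partition (message queue) each message goes to. Rows with the same hash key always land in the same partition, and messages within one partition are consumed in order, so changes to the same row keep their order.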
public void send(final MQDestination destination, String topicName, com.alibaba.otter.canal.protocol.Message message) {
    // Get the number of partitions for the current topic
    Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName,
        destination.getDynamicTopicPartitionNum());
    if (partitionNum == null) {
        partitionNum = destination.getPartitionsNum();
    }
    if (!mqProperties.isFlatMessage()) {
        ......
    } else {
        // Build the row data concurrently
        MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor);
        // Convert to flat messages serially
        List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(datas, message.getId());
        // Initialize the per-partition merge queues
        if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
            List<List<FlatMessage>> partitionFlatMessages = new ArrayList<>();
            for (int i = 0; i < destination.getPartitionsNum(); i++) {
                partitionFlatMessages.add(new ArrayList<>());
            }
            for (FlatMessage flatMessage : flatMessages) {
                FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                    partitionNum,
                    destination.getPartitionHash(),
                    mqProperties.isDatabaseHash());
                int length = partitionFlatMessage.length;
                for (int i = 0; i < length; i++) {
                    // Null check added, issue #3267
                    if (partitionFlatMessage[i] != null) {
                        partitionFlatMessages.get(i).add(partitionFlatMessage[i]);
                    }
                }
            }
            ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor);
            for (int i = 0; i < partitionFlatMessages.size(); i++) {
                final List<FlatMessage> flatMessagePart = partitionFlatMessages.get(i);
                if (flatMessagePart != null && flatMessagePart.size() > 0) {
                    final int index = i;
                    template.submit(() -> {
                        List<Message> messages = flatMessagePart.stream()
                            .map(flatMessage -> new Message(topicName,
                                ((RocketMQProducerConfig) this.mqProperties).getTag(),
                                JSON.toJSONBytes(flatMessage, SerializerFeature.WriteMapNullValue)))
                            .collect(Collectors.toList());
                        // Send as one batch
                        sendMessage(messages, index);
                    });
                }
            }
            // Wait for the results of all partitions
            template.waitForResult();
        } else {
            final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
            List<Message> messages = flatMessages.stream()
                .map(flatMessage -> new Message(topicName,
                    ((RocketMQProducerConfig) this.mqProperties).getTag(),
                    JSON.toJSONBytes(flatMessage, SerializerFeature.WriteMapNullValue)))
                .collect(Collectors.toList());
            // Send as one batch
            sendMessage(messages, partition);
        }
    }
}
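The bucketing step in send can be sketched in isolation. The class below is a hypothetical illustration, not canal code: PartitionBucketsDemo, partitionOf and bucket are made-up names, and each event is just a {primaryKey, changeType} pair. It only shows that splitting a batch by key hash keeps the arrival order of events that share a key, which is what makes it safe to send the buckets in parallel.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of per-partition bucketing; not part of canal. */
class PartitionBucketsDemo {

    /** Same shape as the formula discussed in this article: abs(hash) % partitions. */
    static int partitionOf(String key, int partitions) {
        return Math.abs(Math.abs(key.hashCode()) % partitions);
    }

    /** Splits events into one list per partition, keeping arrival order inside each list. */
    static List<List<String[]>> bucket(List<String[]> events, int partitions) {
        List<List<String[]>> buckets = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            buckets.add(new ArrayList<>());
        }
        for (String[] event : events) {
            // event[0] is the primary key, event[1] is the change type
            buckets.get(partitionOf(event[0], partitions)).add(event);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<String[]> events = Arrays.asList(
            new String[] {"1", "INSERT"},
            new String[] {"2", "INSERT"},
            new String[] {"1", "UPDATE"},
            new String[] {"1", "DELETE"});
        List<List<String[]>> buckets = bucket(events, 3);
        for (int i = 0; i < buckets.size(); i++) {
            StringBuilder line = new StringBuilder("partition " + i + ":");
            for (String[] e : buckets.get(i)) {
                line.append(' ').append(e[0]).append('/').append(e[1]);
            }
            System.out.println(line);
        }
    }
}
```

All three changes to key 1 end up in the same bucket in their original order. Ordering across different buckets is not preserved, and it does not need to be as long as consumers only care about per-row order.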
The partitionHash formula
Math.abs(pk.hashCode()) % partitionsNum
How the formula is implemented in MQMessageUtils.messagePartition
List<String> pkNames = hashMode.pkNames;
if (hashMode.autoPkHash) {
    pkNames = flatMessage.getPkNames();
}
int idx = 0;
for (Map<String, String> row : flatMessage.getData()) {
    int hashCode = 0;
    if (databaseHash) {
        hashCode = database.hashCode();
    }
    if (pkNames != null) {
        for (String pkName : pkNames) {
            String value = row.get(pkName);
            if (value == null) {
                value = "";
            }
            hashCode = hashCode ^ value.hashCode();
        }
    }
    int pkHash = Math.abs(hashCode) % partitionsNum;
    // Math.abs can still return a negative value (for Integer.MIN_VALUE), so take the
    // absolute value again; rows that hit this case still go to one fixed partition,
    // which keeps the consumption order intact
    pkHash = Math.abs(pkHash);
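To see why the second Math.abs matters, here is a minimal standalone sketch of the same arithmetic. PkHashDemo, xorHash and partitionOf are hypothetical names used only for this illustration; the logic follows the excerpt above but leaves out the databaseHash branch.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of the XOR hash and the double Math.abs; not part of canal. */
class PkHashDemo {

    /** XORs the hash codes of the chosen column values, as the excerpt above does. */
    static int xorHash(Map<String, String> row, List<String> pkNames) {
        int hashCode = 0;
        for (String pkName : pkNames) {
            String value = row.get(pkName);
            if (value == null) {
                value = ""; // a missing column contributes "".hashCode(), which is 0
            }
            hashCode ^= value.hashCode();
        }
        return hashCode;
    }

    /** abs, modulo, then abs again so the index is never negative. */
    static int partitionOf(int hashCode, int partitionsNum) {
        int pkHash = Math.abs(hashCode) % partitionsNum;
        return Math.abs(pkHash);
    }

    public static void main(String[] args) {
        Map<String, String> row = new LinkedHashMap<>();
        row.put("id", "42");
        row.put("name", "alice");
        System.out.println(partitionOf(xorHash(row, Arrays.asList("id", "name")), 4));

        // Math.abs(Integer.MIN_VALUE) overflows and stays Integer.MIN_VALUE.
        System.out.println(Math.abs(Integer.MIN_VALUE) == Integer.MIN_VALUE); // true
        // With 3 partitions the first modulo is negative; the second abs repairs it.
        System.out.println(Integer.MIN_VALUE % 3);             // -2
        System.out.println(partitionOf(Integer.MIN_VALUE, 3)); // 2
    }
}
```

One property of XOR worth keeping in mind: it is symmetric, so two rows whose chosen columns hold the same values in swapped positions hash to the same partition. That only affects load distribution, not ordering.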
The RocketMQ client sends each message to the specified partition
private void sendMessage(Message message, int partition) {
    try {
        SendResult sendResult = this.defaultMQProducer.send(message, (mqs, msg, arg) -> {
            if (partition >= mqs.size()) {
                return mqs.get(partition % mqs.size());
            } else {
                return mqs.get(partition);
            }
        }, null);
        if (logger.isDebugEnabled()) {
            logger.debug("Send Message Result: {}", sendResult);
        }
    } catch (Throwable e) {
        throw new RuntimeException(e);
    }
}
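The selector lambda above boils down to a modulo over the queue list. The sketch below reproduces just that rule on a plain list of queue names; QueueSelectDemo and select are hypothetical names, and no RocketMQ classes are involved.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of the queue selection rule; not part of canal or RocketMQ. */
class QueueSelectDemo {

    /**
     * For a non-negative partition this matches the if/else in the selector above:
     * in range returns queues.get(partition), out of range wraps around.
     */
    static <T> T select(List<T> queues, int partition) {
        return queues.get(partition % queues.size());
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("q0", "q1", "q2", "q3");
        System.out.println(select(queues, 2)); // q2
        System.out.println(select(queues, 5)); // q1, because 5 % 4 == 1
    }
}
```

If the topic has fewer queues than partitionsNum, several partitions wrap onto the same queue. Per-key ordering still holds in that case, assuming the queue list itself stays stable between sends.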
How to configure the partitionHash expression
MQMessageUtils.getPartitionHashColumns looks up the configuration in partitionDatas:
public static HashMode getPartitionHashColumns(String name, String pkHashConfigs) {
    if (StringUtils.isEmpty(pkHashConfigs)) {
        return null;
    }
    List<PartitionData> datas = partitionDatas.get(pkHashConfigs);
    for (PartitionData data : datas) {
        if (data.simpleName != null) {
            if (data.simpleName.equalsIgnoreCase(name)) {
                return data.hashMode;
            }
        } else {
            if (data.regexFilter.filter(name)) {
                return data.hashMode;
            }
        }
    }
    return null;
}
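The code treats whatever follows the last colon as the pkHash expression. The variable name $pk$ means "hash on the table's primary key"; a custom expression such as id^name can be used instead. An entry with no colon falls back to hashing by table.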
Map<String, List<PartitionData>> partitionDatas = MigrateMap.makeComputingMap(CacheBuilder.newBuilder()
    .softValues(),
    pkHashConfigs -> {
        List<PartitionData> datas = Lists.newArrayList();
        String[] pkHashConfigArray = StringUtils.split(StringUtils.replace(pkHashConfigs,
            ",",
            ";"),
            ";");
        // schema.table:id^name
        for (String pkHashConfig : pkHashConfigArray) {
            PartitionData data = new PartitionData();
            int i = pkHashConfig.lastIndexOf(":");
            if (i > 0) {
                String pkStr = pkHashConfig.substring(i + 1);
                // Variable name
                if (pkStr.equalsIgnoreCase("$pk$")) {
                    data.hashMode.autoPkHash = true;
                } else {
                    // Custom expression: val1 ^ val2 ^ val3
                    data.hashMode.pkNames = Lists.newArrayList(StringUtils.split(pkStr,
                        '^'));
                }
                pkHashConfig = pkHashConfig.substring(0,
                    i);
            } else {
                data.hashMode.tableHash = true;
            }
            if (!isWildCard(pkHashConfig)) {
                data.simpleName = pkHashConfig;
            } else {
                data.regexFilter = new AviaterRegexFilter(pkHashConfig);
            }
            datas.add(data);
        }
        return datas;
    });
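The parsing rules are easier to follow without the Guava and commons-lang plumbing. The sketch below is a simplified, hypothetical re-implementation using only the JDK: PartitionHashConfigDemo and Rule are made-up names standing in for canal's PartitionData and HashMode, and the wildcard-versus-exact-name distinction is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified parser for a partitionHash string; not canal's own code. */
class PartitionHashConfigDemo {

    /** Stand-in for canal's PartitionData plus HashMode. */
    static final class Rule {
        String tablePattern;                      // text before the last colon
        boolean autoPkHash;                       // true for "$pk$"
        boolean tableHash;                        // true when the entry has no colon
        List<String> pkNames = new ArrayList<>(); // custom columns, e.g. id^name

        @Override
        public String toString() {
            return tablePattern + " autoPk=" + autoPkHash + " tableHash=" + tableHash + " cols=" + pkNames;
        }
    }

    static List<Rule> parse(String pkHashConfigs) {
        List<Rule> rules = new ArrayList<>();
        // Both ',' and ';' separate entries, as in the original code.
        for (String item : pkHashConfigs.replace(',', ';').split(";")) {
            if (item.isEmpty()) {
                continue;
            }
            Rule rule = new Rule();
            int i = item.lastIndexOf(':');
            if (i > 0) {
                String pkStr = item.substring(i + 1);
                if (pkStr.equalsIgnoreCase("$pk$")) {
                    rule.autoPkHash = true;
                } else {
                    rule.pkNames.addAll(Arrays.asList(pkStr.split("\\^")));
                }
                item = item.substring(0, i);
            } else {
                rule.tableHash = true;
            }
            rule.tablePattern = item;
            rules.add(rule);
        }
        return rules;
    }

    public static void main(String[] args) {
        for (Rule rule : parse("db.t1:id^name,db.t2:$pk$;db.t3")) {
            System.out.println(rule);
        }
    }
}
```

Rules are matched in the order they are written, so in a real configuration a specific table entry should come before a catch-all pattern.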
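So a general expression that partitions every table by its primary key looks like this (.*\\..* matches any schema.table):
.*\\..*:$pk$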
Practice
Change the number of RocketMQ partitions
Modify the instance configuration
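As a rough sketch of what this step involves, the fragment below uses the MQ property names from canal's instance.properties template. The topic name and the partition count are placeholder values assumed for illustration; they are not taken from the author's setup.

```properties
# Illustrative values only; adjust to your own topic and queue count.
canal.mq.topic=example
# Should match the number of queues configured for the topic in RocketMQ
canal.mq.partitionsNum=3
# Hash every schema.table by its primary key
canal.mq.partitionHash=.*\\..*:$pk$
```

In a properties file the backslash is itself an escape character, which is why the dot in the pattern is written as \\. here.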
Test: inspect how the messages are laid out after partitioning
Query RocketMQ