Analyzing CanalRocketMQProducer.send
When Canal sends messages to RocketMQ, two settings come into play: partitionNum and partitionHash.
partitionHash routes each message to a specific RocketMQ partition (message queue); since a single partition is consumed in order, rows that hash to the same partition are consumed in the order they were produced.

    public void send(final MQDestination destination, String topicName, com.alibaba.otter.canal.protocol.Message message) {
        // get the partition count for the current topic
        Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName,
            destination.getDynamicTopicPartitionNum());
        if (partitionNum == null) {
            partitionNum = destination.getPartitionsNum();
        }
        if (!mqProperties.isFlatMessage()) {
            ......
        } else {
            // build the message data concurrently
            MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor);
            // convert to flat messages serially
            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(datas, message.getId());
            // initialize one merge queue per partition
            if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
                List<List<FlatMessage>> partitionFlatMessages = new ArrayList<>();
                for (int i = 0; i < destination.getPartitionsNum(); i++) {
                    partitionFlatMessages.add(new ArrayList<>());
                }

                for (FlatMessage flatMessage : flatMessages) {
                    FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                        partitionNum,
                        destination.getPartitionHash(),
                        mqProperties.isDatabaseHash());
                    int length = partitionFlatMessage.length;
                    for (int i = 0; i < length; i++) {
                        // null check added for issue #3267
                        if (partitionFlatMessage[i] != null) {
                            partitionFlatMessages.get(i).add(partitionFlatMessage[i]);
                        }
                    }
                }

                ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor);
                for (int i = 0; i < partitionFlatMessages.size(); i++) {
                    final List<FlatMessage> flatMessagePart = partitionFlatMessages.get(i);
                    if (flatMessagePart != null && flatMessagePart.size() > 0) {
                        final int index = i;
                        template.submit(() -> {
                            List<Message> messages = flatMessagePart.stream()
                                .map(flatMessage -> new Message(topicName,
                                    ((RocketMQProducerConfig) this.mqProperties).getTag(),
                                    JSON.toJSONBytes(flatMessage, SerializerFeature.WriteMapNullValue)))
                                .collect(Collectors.toList());
                            // batch send
                            sendMessage(messages, index);
                        });
                    }
                }

                // wait for the results of all partitions
                template.waitForResult();
            } else {
                final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
                List<Message> messages = flatMessages.stream()
                    .map(flatMessage -> new Message(topicName,
                        ((RocketMQProducerConfig) this.mqProperties).getTag(),
                        JSON.toJSONBytes(flatMessage, SerializerFeature.WriteMapNullValue)))
                    .collect(Collectors.toList());
                // batch send
                sendMessage(messages, partition);
            }
        }
    }
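
The partitionHash branch above boils down to: bucket every flat message by its computed partition index, then send each non-empty bucket on its own thread and wait for all of them. Below is a minimal, self-contained sketch of that pattern (plain strings stand in for FlatMessage, a JDK ExecutorService stands in for Canal's ExecutorTemplate, and the partitionFor helper is my own, not Canal's):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;

    public class PartitionFanOutSketch {

        // hypothetical stand-in for MQMessageUtils.messagePartition: one bucket index per record key
        static int partitionFor(String key, int partitionsNum) {
            return Math.abs(Math.abs(key.hashCode()) % partitionsNum);
        }

        public static void main(String[] args) throws Exception {
            int partitionsNum = 4;
            List<String> flatMessages = List.of("pk-1", "pk-2", "pk-3", "pk-1", "pk-2");

            // one merge queue per partition, mirroring partitionFlatMessages above
            List<List<String>> buckets = new ArrayList<>();
            for (int i = 0; i < partitionsNum; i++) {
                buckets.add(new ArrayList<>());
            }
            for (String msg : flatMessages) {
                buckets.get(partitionFor(msg, partitionsNum)).add(msg);
            }

            // send each non-empty bucket on its own thread, then wait for all of them
            ExecutorService pool = Executors.newFixedThreadPool(partitionsNum);
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < buckets.size(); i++) {
                final int index = i;
                final List<String> part = buckets.get(i);
                if (!part.isEmpty()) {
                    futures.add(pool.submit(() ->
                        System.out.println("send to partition " + index + ": " + part)));
                }
            }
            for (Future<?> f : futures) {
                f.get(); // the equivalent of template.waitForResult()
            }
            pool.shutdown();
        }
    }

Because the hash is deterministic, rows with the same key always land in the same bucket, so per-key ordering survives the parallel send.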

The partitionHash formula (per row; the primary-key hash is optionally XORed with the database name's hashCode first):

Math.abs(pk.hashCode) % partitionsNum

Let's look at how MQMessageUtils.messagePartition implements this formula:

    List<String> pkNames = hashMode.pkNames;
    if (hashMode.autoPkHash) {
        pkNames = flatMessage.getPkNames();
    }

    int idx = 0;
    for (Map<String, String> row : flatMessage.getData()) {
        int hashCode = 0;
        if (databaseHash) {
            hashCode = database.hashCode();
        }
        if (pkNames != null) {
            for (String pkName : pkNames) {
                String value = row.get(pkName);
                if (value == null) {
                    value = "";
                }
                hashCode = hashCode ^ value.hashCode();
            }
        }

        int pkHash = Math.abs(hashCode) % partitionsNum;
        // Math.abs may return a negative value (for Integer.MIN_VALUE), so take abs again here;
        // rows that produced a negative value still go to a fixed partition, preserving consumption order
        pkHash = Math.abs(pkHash);
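
So the partition index is abs(abs(databaseHash ^ pk1Hash ^ pk2Hash ...) % partitionsNum). The second Math.abs is needed because Math.abs(Integer.MIN_VALUE) is still negative, so the first modulo can produce a negative remainder. A small standalone check of that arithmetic (the helper name partitionFor is mine, not Canal's):

    public class PkHashDemo {

        // same arithmetic as the snippet above: abs, modulo, abs again
        static int partitionFor(int hashCode, int partitionsNum) {
            int pkHash = Math.abs(hashCode) % partitionsNum;
            return Math.abs(pkHash);
        }

        public static void main(String[] args) {
            int partitionsNum = 3;

            // ordinary case: database hash XOR primary-key value hash
            int hashCode = "mytest".hashCode() ^ "42".hashCode();
            System.out.println(partitionFor(hashCode, partitionsNum)); // some value in [0, 3)

            // edge case: Math.abs(Integer.MIN_VALUE) == Integer.MIN_VALUE (still negative),
            // so the first modulo yields -2 here and the second abs maps it back to 2
            System.out.println(Math.abs(Integer.MIN_VALUE));                    // -2147483648
            System.out.println(partitionFor(Integer.MIN_VALUE, partitionsNum)); // 2
        }
    }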

When the messages reach the RocketMQ client, sendMessage selects the message queue from the partition index:

    private void sendMessage(Message message, int partition) {
        try {
            SendResult sendResult = this.defaultMQProducer.send(message, (mqs, msg, arg) -> {
                if (partition >= mqs.size()) {
                    return mqs.get(partition % mqs.size());
                } else {
                    return mqs.get(partition);
                }
            }, null);

            if (logger.isDebugEnabled()) {
                logger.debug("Send Message Result: {}", sendResult);
            }
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
    }
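
The MessageQueueSelector lambda only does index arithmetic: if the computed partition index exceeds the number of message queues the topic actually has, it wraps around with a modulo, so the message still lands deterministically on a real queue. Isolated from the RocketMQ client, the mapping is just:

    public class QueueIndexDemo {

        // same selection rule as the lambda above, expressed over a queue count
        // instead of a List<MessageQueue>
        static int selectQueue(int partition, int queueCount) {
            return partition >= queueCount ? partition % queueCount : partition;
        }

        public static void main(String[] args) {
            int queueCount = 4; // the topic only has 4 queues
            System.out.println(selectQueue(2, queueCount)); // 2 -> queue 2
            System.out.println(selectQueue(6, queueCount)); // 6 -> queue 2 (wraps around)
        }
    }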

How is the partitionHash expression configured?
MQMessageUtils.getPartitionHashColumns reads the configuration from partitionDatas:

    public static HashMode getPartitionHashColumns(String name, String pkHashConfigs) {
        if (StringUtils.isEmpty(pkHashConfigs)) {
            return null;
        }

        List<PartitionData> datas = partitionDatas.get(pkHashConfigs);
        for (PartitionData data : datas) {
            if (data.simpleName != null) {
                if (data.simpleName.equalsIgnoreCase(name)) {
                    return data.hashMode;
                }
            } else {
                if (data.regexFilter.filter(name)) {
                    return data.hashMode;
                }
            }
        }

        return null;
    }

The parser treats everything after the last colon in an entry as the pkHash expression. The special variable $pk$ means "hash by the table's primary key"; a custom column expression can also be supplied.

Map<String, List<PartitionData>> partitionDatas = MigrateMap.makeComputingMap(CacheBuilder.newBuilder()
  .softValues(),
  pkHashConfigs -> {
      List<PartitionData> datas = Lists.newArrayList();

      String[] pkHashConfigArray = StringUtils.split(StringUtils.replace(pkHashConfigs,
          ",",
          ";"),
          ";");
      // schema.table:id^name
      for (String pkHashConfig : pkHashConfigArray) {
          PartitionData data = new PartitionData();
          int i = pkHashConfig.lastIndexOf(":");
          if (i > 0) {
              String pkStr = pkHashConfig.substring(i + 1);
              // the $pk$ variable name
              if (pkStr.equalsIgnoreCase("$pk$")) {
                  data.hashMode.autoPkHash = true;
              } else {
                  // custom expression: val1^val2^val3
                  data.hashMode.pkNames = Lists.newArrayList(StringUtils.split(pkStr,
                      '^'));
              }

              pkHashConfig = pkHashConfig.substring(0,
                  i);
          } else {
              data.hashMode.tableHash = true;
          }

          if (!isWildCard(pkHashConfig)) {
              data.simpleName = pkHashConfig;
          } else {
              data.regexFilter = new AviaterRegexFilter(pkHashConfig);
          }
          datas.add(data);
      }

      return datas;
  });
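
To make the parsing rules concrete, here is a standalone re-implementation of the per-entry split (my own sketch, not Canal code): everything after the last colon is the pk expression, $pk$ switches on autoPkHash, a ^-separated list becomes explicit pk columns, and an entry with no colon falls back to table-level hashing.

    import java.util.Arrays;
    import java.util.List;

    public class PkHashConfigParseSketch {

        public static void main(String[] args) {
            // the three supported shapes: explicit columns, $pk$, and table-level hash
            List<String> configs = Arrays.asList("mytest.person:id^name", ".*\\..*:$pk$", "mytest.role");
            for (String config : configs) {
                int i = config.lastIndexOf(":");
                if (i > 0) {
                    String pkStr = config.substring(i + 1);
                    String table = config.substring(0, i);
                    if (pkStr.equalsIgnoreCase("$pk$")) {
                        System.out.println(table + " -> hash by the table's own primary key");
                    } else {
                        System.out.println(table + " -> hash by columns " + Arrays.toString(pkStr.split("\\^")));
                    }
                } else {
                    System.out.println(config + " -> hash by table name (all rows of that table share one partition)");
                }
            }
        }
    }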

So a generic expression that partitions every table by its primary key is:

.*\\..*:$pk$
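
For reference, a sketch of where this goes in the instance configuration, assuming the stock canal.mq.* keys from instance.properties (adjust names and values to your deployment):

    # number of partitions (RocketMQ queues) to spread messages over
    canal.mq.partitionsNum=4
    # partition every table by its primary key
    canal.mq.partitionHash=.*\\..*:$pk$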

Hands-on

Adjust the RocketMQ topic's partition (queue) count (screenshot)
Update the instance configuration (screenshot)
Check how messages are distributed across partitions after the change (screenshot)
Query RocketMQ to confirm the result (screenshot)
