基于 MySQL 这种数据同步机制,那 Canal 的设计目标主要就是实现数据的同步,即数据的复制,从上面的图自然而然的想到了如下的设计:

在这里插入图片描述

原理相对比较简单:

  • canal 模拟 mysql slave 的交互协议,伪装自己为 mysql slave,向 mysql master 发送 dump 协议

  • mysql master 收到 dump 请求,开始推送 binary log 给 slave (canal)

  • canal解析 binary log 对象(原始为byte流)

接下来我们来看一下 Canale 的整体组成部分:

在这里插入图片描述

说明:

  • server代表一个canal运行实例,对应于一个jvm

  • instance对应于一个数据队列 (1个server对应1…n个instance)

instance模块:

  • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)

  • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)

  • eventStore (数据存储)

  • metaManager (增量订阅&消费信息管理器)

这些组件我暂时不打算深入去研究,因为在目前这个阶段我自己也不清楚,但这个是我后续需要学习研究的重点。

3、在 IntelliJ IDEA 中运行 Canal Demo


在 Linux 环境中安装 canal 比较简单,大家可以安装官方手册一步一步操作即可,在这里我就不重复介绍,本节主要的目的是希望在开发工具中运行 Canal 的 Demo,以便后续在研究源码的过程中遇到难题时可以进行 Debug。

温馨提示:大家在学习过程中,可以根据官方文档先安装一遍 canal,对理解 Canal 的核心组件有着非常重要的帮助。

首先先从 canal 源码中寻找官方提供的 Demo,其示例代码在 example 包中,如下图所示:

在这里插入图片描述

但是另外稍微遗憾的是 canal 提供提供的示例代码中只包含了 client 端相关的代码,并没有包含服务端(server),故我们将目光放到其单元测试中,如下图所示:

在这里插入图片描述

接下来我根据官方的一些提示,结合自己的理解,编写出如下测试代码,在 IDEA 开发工具中实现运行 Canal 相关的 Demo。下面的代码已通过测试,可直接使用。

1、Canal Server Demo

package com.alibaba.otter.canal.server;

import com.alibaba.otter.canal.instance.core.CanalInstance;

import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;

import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;

import com.alibaba.otter.canal.instance.manager.model.Canal;

import com.alibaba.otter.canal.instance.manager.model.CanalParameter;

import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;

import com.alibaba.otter.canal.server.netty.CanalServerWithNetty;

import java.net.InetSocketAddress;

import java.nio.ByteBuffer;

import java.util.Arrays;

public class CanalServerTestMain {

protected static final String ZK_CLUSTER_ADDRESS = “127.0.0.1:2181”;

protected static final String DESTINATION = “example”;

protected static final String DETECTING_SQL = “select 1”;

protected static final String MYSQL_ADDRESS = “127.0.0.1”;

protected static final String USERNAME = “canal”;

protected static final String PASSWORD = “canal”;

protected static final String FILTER = “.\\\\\…\”;

/** 默认 500s 后关闭 */

protected static final long RUN_TIME = 120 * 1000;

private final ByteBuffer header = ByteBuffer.allocate(4);

private CanalServerWithNetty nettyServer;

public static void main(String[] args) {

CanalServerTestMain test = new CanalServerTestMain();

try {

test.setUp();

System.out.println(“start”);

} catch (Throwable e) {

e.printStackTrace();

} finally {

System.out.println(“sleep”);

try {

Thread.sleep(RUN_TIME);

} catch (Throwable ee) {

}

test.tearDown();

System.out.println(“end”);

}

}

public void setUp() {

CanalServerWithEmbedded embeddedServer = new CanalServerWithEmbedded();

embeddedServer.setCanalInstanceGenerator(new CanalInstanceGenerator() {

public CanalInstance generate(String destination) {

Canal canal = buildCanal();

return new CanalInstanceWithManager(canal, FILTER);

}

});

nettyServer = CanalServerWithNetty.instance();

nettyServer.setEmbeddedServer(embeddedServer);

nettyServer.setPort(11111);

nettyServer.start();

// 启动 instance

embeddedServer.start(“example”);

}

public void tearDown() {

nettyServer.stop();

}

private Canal buildCanal() {

Canal canal = new Canal();

canal.setId(1L);

canal.setName(DESTINATION);

canal.setDesc(“test”);

CanalParameter parameter = new CanalParameter();

//parameter.setZkClusters(Arrays.asList(ZK_CLUSTER_ADDRESS));

parameter.setMetaMode(CanalParameter.MetaMode.MEMORY);

parameter.setHaMode(CanalParameter.HAMode.HEARTBEAT);

parameter.setIndexMode(CanalParameter.IndexMode.MEMORY);

parameter.setStorageMode(CanalParameter.StorageMode.MEMORY);

parameter.setMemoryStorageBufferSize(32 * 1024);

parameter.setSourcingType(CanalParameter.SourcingType.MYSQL);

parameter.setDbAddresses(Arrays.asList(new InetSocketAddress(MYSQL_ADDRESS, 3306),

new InetSocketAddress(MYSQL_ADDRESS, 3306)));

parameter.setDbUsername(USERNAME);

parameter.setDbPassword(PASSWORD);

parameter.setSlaveId(1234L);

parameter.setDefaultConnectionTimeoutInSeconds(30);

parameter.setConnectionCharset(“UTF-8”);

parameter.setConnectionCharsetNumber((byte) 33);

parameter.setReceiveBufferSize(8 * 1024);

parameter.setSendBufferSize(8 * 1024);

parameter.setDetectingEnable(false);

parameter.setDetectingIntervalInSeconds(10);

parameter.setDetectingRetryTimes(3);

parameter.setDetectingSQL(DETECTING_SQL);

canal.setCanalParameter(parameter);

return canal;

}

}

2、Canal Client Demo

package com.alibaba.otter.canal.example;

import java.net.InetSocketAddress;

import java.util.List;

import com.alibaba.otter.canal.client.CanalConnectors;

import com.alibaba.otter.canal.client.CanalConnector;

import com.alibaba.otter.canal.common.utils.AddressUtils;

import com.alibaba.otter.canal.protocol.CanalEntry;

import com.alibaba.otter.canal.protocol.Message;

import com.alibaba.otter.canal.protocol.CanalEntry.Column;

import com.alibaba.otter.canal.protocol.CanalEntry.EventType;

public class SimpleCanalClientExample {

public static void main(String[] args) {

// 创建链接

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),

11111), “example”, “”, “”);

int batchSize = 1000;

int emptyCount = 0;

try {

connector.connect();

connector.subscribe(“.\…”);

connector.rollback();

int totalEmptyCount = 3000;

while (emptyCount < totalEmptyCount) {

Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据

long batchId = message.getId();

int size = message.getEntries().size();

if (batchId == -1 || size == 0) {

自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

深知大多数Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。img

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!

如果你觉得这些内容对你有帮助,可以扫码获取!!(备注Java获取)

img

最后

《一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码》点击传送门即可获取!
va获取)**

img

最后

[外链图片转存中…(img-7IxCdpzG-1712447992844)]

[外链图片转存中…(img-D5DgLCph-1712447992844)]

[外链图片转存中…(img-uCn12MMg-1712447992844)]

《一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码》点击传送门即可获取!

GitHub 加速计划 / ca / canal
28.21 K
7.57 K
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:2 个月前 )
1e5b8a20 - 1 个月前
ff82fd65 1 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐