数据异构之 Canal 初探(技巧篇)
基于 MySQL 这种数据同步机制,那 Canal 的设计目标主要就是实现数据的同步,即数据的复制,从上面的图自然而然的想到了如下的设计:
原理相对比较简单:
-
canal 模拟 mysql slave 的交互协议,伪装自己为 mysql slave,向 mysql master 发送 dump 协议
-
mysql master 收到 dump 请求,开始推送 binary log 给 slave (canal)
-
canal解析 binary log 对象(原始为byte流)
接下来我们来看一下 Canale 的整体组成部分:
说明:
-
server代表一个canal运行实例,对应于一个jvm
-
instance对应于一个数据队列 (1个server对应1…n个instance)
instance模块:
-
eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
-
eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
-
eventStore (数据存储)
-
metaManager (增量订阅&消费信息管理器)
这些组件我暂时不打算深入去研究,因为在目前这个阶段我自己也不清楚,但这个是我后续需要学习研究的重点。
3、在 IntelliJ IDEA 中运行 Canal Demo
在 Linux 环境中安装 canal 比较简单,大家可以安装官方手册一步一步操作即可,在这里我就不重复介绍,本节主要的目的是希望在开发工具中运行 Canal 的 Demo,以便后续在研究源码的过程中遇到难题时可以进行 Debug。
温馨提示:大家在学习过程中,可以根据官方文档先安装一遍 canal,对理解 Canal 的核心组件有着非常重要的帮助。
首先先从 canal 源码中寻找官方提供的 Demo,其示例代码在 example 包中,如下图所示:
但是另外稍微遗憾的是 canal 提供提供的示例代码中只包含了 client 端相关的代码,并没有包含服务端(server),故我们将目光放到其单元测试中,如下图所示:
接下来我根据官方的一些提示,结合自己的理解,编写出如下测试代码,在 IDEA 开发工具中实现运行 Canal 相关的 Demo。下面的代码已通过测试,可直接使用。
1、Canal Server Demo
package com.alibaba.otter.canal.server;
import com.alibaba.otter.canal.instance.core.CanalInstance;
import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;
import com.alibaba.otter.canal.instance.manager.model.Canal;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
import com.alibaba.otter.canal.server.netty.CanalServerWithNetty;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
public class CanalServerTestMain {
protected static final String ZK_CLUSTER_ADDRESS = “127.0.0.1:2181”;
protected static final String DESTINATION = “example”;
protected static final String DETECTING_SQL = “select 1”;
protected static final String MYSQL_ADDRESS = “127.0.0.1”;
protected static final String USERNAME = “canal”;
protected static final String PASSWORD = “canal”;
protected static final String FILTER = “.\\\\\…\”;
/** 默认 500s 后关闭 */
protected static final long RUN_TIME = 120 * 1000;
private final ByteBuffer header = ByteBuffer.allocate(4);
private CanalServerWithNetty nettyServer;
public static void main(String[] args) {
CanalServerTestMain test = new CanalServerTestMain();
try {
test.setUp();
System.out.println(“start”);
} catch (Throwable e) {
e.printStackTrace();
} finally {
System.out.println(“sleep”);
try {
Thread.sleep(RUN_TIME);
} catch (Throwable ee) {
}
test.tearDown();
System.out.println(“end”);
}
}
public void setUp() {
CanalServerWithEmbedded embeddedServer = new CanalServerWithEmbedded();
embeddedServer.setCanalInstanceGenerator(new CanalInstanceGenerator() {
public CanalInstance generate(String destination) {
Canal canal = buildCanal();
return new CanalInstanceWithManager(canal, FILTER);
}
});
nettyServer = CanalServerWithNetty.instance();
nettyServer.setEmbeddedServer(embeddedServer);
nettyServer.setPort(11111);
nettyServer.start();
// 启动 instance
embeddedServer.start(“example”);
}
public void tearDown() {
nettyServer.stop();
}
private Canal buildCanal() {
Canal canal = new Canal();
canal.setId(1L);
canal.setName(DESTINATION);
canal.setDesc(“test”);
CanalParameter parameter = new CanalParameter();
//parameter.setZkClusters(Arrays.asList(ZK_CLUSTER_ADDRESS));
parameter.setMetaMode(CanalParameter.MetaMode.MEMORY);
parameter.setHaMode(CanalParameter.HAMode.HEARTBEAT);
parameter.setIndexMode(CanalParameter.IndexMode.MEMORY);
parameter.setStorageMode(CanalParameter.StorageMode.MEMORY);
parameter.setMemoryStorageBufferSize(32 * 1024);
parameter.setSourcingType(CanalParameter.SourcingType.MYSQL);
parameter.setDbAddresses(Arrays.asList(new InetSocketAddress(MYSQL_ADDRESS, 3306),
new InetSocketAddress(MYSQL_ADDRESS, 3306)));
parameter.setDbUsername(USERNAME);
parameter.setDbPassword(PASSWORD);
parameter.setSlaveId(1234L);
parameter.setDefaultConnectionTimeoutInSeconds(30);
parameter.setConnectionCharset(“UTF-8”);
parameter.setConnectionCharsetNumber((byte) 33);
parameter.setReceiveBufferSize(8 * 1024);
parameter.setSendBufferSize(8 * 1024);
parameter.setDetectingEnable(false);
parameter.setDetectingIntervalInSeconds(10);
parameter.setDetectingRetryTimes(3);
parameter.setDetectingSQL(DETECTING_SQL);
canal.setCanalParameter(parameter);
return canal;
}
}
2、Canal Client Demo
package com.alibaba.otter.canal.example;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
public class SimpleCanalClientExample {
public static void main(String[] args) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
11111), “example”, “”, “”);
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(“.\…”);
connector.rollback();
int totalEmptyCount = 3000;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。
深知大多数Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!
由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!
如果你觉得这些内容对你有帮助,可以扫码获取!!(备注Java获取)
最后
《一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码》,点击传送门即可获取!
va获取)**
最后
[外链图片转存中…(img-7IxCdpzG-1712447992844)]
[外链图片转存中…(img-D5DgLCph-1712447992844)]
[外链图片转存中…(img-uCn12MMg-1712447992844)]
《一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码》,点击传送门即可获取!
更多推荐
所有评论(0)