docker安装canal数据同步工具

假设一个场景:编写一个博客系统,需要引入elasticsearch搜索引擎实现对文章内容的检索。则需要解决MySQL与elasticsearch数据同步的问题。

此时我们有三种选择:

  • 1、使用业务代码实现同步:
    在操作数据库数据同步操作elasticsearch中的数据。(优点:实现简单,缺点:代码耦合度高,效率低下)

在业务层执行增加、修改、删除改变mysql数据库之后,也执行操作redis的逻辑代码。
优点:操作简单
缺点:与业务操作代码耦合度变高;执行效率低。

  • 2、使用MQ实现同步:

在执行完增加、修改、删除之后, 往MQ中发送一条消息 ;同步程序作为MQ中的消费者,从消息队列中获取消息,然后执行同步elasticsearch数据库的逻辑。
优点:业务代码解耦, 并且可以做到准实时。
缺点:需要在业务代码中加入发送消息到MQ中的代码 , API耦合。

  • 3、binglog来实现同步(canal):

binglog实现同步的方法再细分不止一种,这个笔记主要学习canal,所以以canal为例。而且canal不止可以将数据同步给redis,也可以同步给其他类型的数据库。
优点:与业务代码完全解耦,API完全解耦,可以做到准实时。
缺点:canal是第三方实现的,需要学习成本(学无止尽,技多不压身)。

本章我们学习第三种学习思路,仅实现canal和mysql的数据同步。

1、 创建 canal用户, 并授权

create user canal identified by 'canal';
grant select,replication slave, replication client on *.* to 'canal'@'%';
flush privileges;

# 查看bin-log是否开启 on: 开启 off: 关闭
show variables like 'log_bin'; 

编写my.conf, 挂载容器中的mysql配置文件

[client]
default-character-set=utf8
[mysql]
default-character-set=utf8
[mysqld]
character-set-server=utf8
collation-server=utf8_unicode_ci
skip-character-set-client-handshake
skip-name-resolve

# start binlog
log-bin=mysql-bin
binlog-format=ROW
server_id=1

2、 docker安装canal

docker pull canal/canal-server:v1.1.5
# 创建一个容器
docker run --name canal -d canal/canal-server:v1.1.5
# 复制容器中的配置文件到本地
docker cp canal:/home/admin/canal-server/conf/canal.properties /wuming/canal
docker cp canal:/home/admin/canal-server/conf/example/instance.properties /wuming/canal

3、配置canal配置文件

查看容器mysql-8的ip

docker inspect mysql  # 这里假设此处ip是 177.17.0.1

修改canal.properties配置文件

# 默认端口 11111
# 默认输出model为tcp, 这里根据使用的mq类型进行修改
# tcp, kafka, RocketMQ
canal.serverMode = tcp

#################################################
######### destinations ############# 
#################################################
# canal可以有多个instance,每个实例有独立的配置文件,默认只 有一个example实例。
# 如果需要处理多个mysql数据的话,可以复制出多个example,对其重新命名,
# 命令和配置文件中指定的名称一致。然后修改canal.properties 中的 canal.destinations
# canal.destinations=实例 1,实例 2,实例 3
canal.destinations = example

修改instance.properties配置文件

# 不能和mysql重复
canal.instance.mysql.slaveId=2
# 使用mysql的虚拟ip和端口
canal.instance.master.address=177.17.0.1:3306
# 使用已创建的canal用户
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# canal.instance.defaultDatabaseName =test

# 问题:(原本这样的,值同步test库,此处没能解决,单据指定数据库同步配置)
# canal.instance.filter.regex=.*\\..*
# canal.instance.defaultDatabaseName =test

# 注掉上面,然后添加,同步所有的库。
# .\*\\\\..\*:  表示匹配所有的库所有的表
canal.instance.filter.regex =.\*\\\\..\*

# 目的地,可以认识一个消息队列,不需要更改。
canal.mq.topic=example

# 如果是

4、重新创建canal容器并挂在配置文件。

docker run --name canal -p 11111:11111 \
-v /wuming/canal/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
-v /wuming/canal/canal.properties:/home/admin/canal-server/conf/canal.properties \
-d canal/canal-server:v1.1.5

5、使用java程序连接canal验证是否可以实现同步

引入依赖

<dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.5</version>
        </dependency>

        <!-- MessageCanalEntry.Entry等来自此安装包 -->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.protocol</artifactId>
            <version>1.1.5</version>
        </dependency>

程序连接

@Slf4j
public class CanalTest {

    //Canal服务地址 使用自己虚拟机的ip地址
    private static final String SERVER_ADDRESS = "127.0.0.1";

    //Canal Server 服务端口号
    private static final Integer PORT = 11111;

    //目的地,其实Canal Service内部有一个队列,和配置文件中一致即可,参考【修改instance.properties】图中
    private static final String DESTINATION = "example";

    //用户名和密码,但是目前不支持,只能为空
    private static final String USERNAME = "";

    //用户名和密码,但是目前不支持,只能为空
    private static final String PASSWORD= "";

    public static void main(String[] args){
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(SERVER_ADDRESS, PORT), DESTINATION, USERNAME, PASSWORD);
        canalConnector.connect();
        //订阅所有消息
        canalConnector.subscribe(".*\\..*");
        // 只订阅test数据库下的所有表
        //canalConnector.subscribe("test");
        //恢复到之前同步的那个位置
        canalConnector.rollback();

        for(;;){
            //获取指定数量的数据,但是不做确认标记,下一次取还会取到这些信息。 注:不会阻塞,若不够100,则有多少返回多少
            Message message = canalConnector.getWithoutAck(100);
            //获取消息id
            long batchId = message.getId();
            if(batchId != -1){
                log.info("msgId -> " + batchId);
                printEnity(message.getEntries());
                //提交确认
                //canalConnector.ack(batchId);
                //处理失败,回滚数据
                //canalConnector.rollback(batchId);
            }
        }
    }

    private static void printEnity(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            if(entry.getEntryType() != CanalEntry.EntryType.ROWDATA){
                continue;
            }
            try{
                // 序列化数据
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    System.out.println(rowChange.getEventType());
                    switch (rowChange.getEventType()){
                        //如果希望监听多种事件,可以手动增加case
                        case INSERT:
                            // 表名
                            String tableName = entry.getHeader().getTableName();
                            //System.out.println("表名:"+tableName);
                            //测试users表进行映射处
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            //for(CanalEntry.Column c:afterColumnsList){
                            //	System.out.println("字段:"+c.getName()+"值:"+c.getValue());
                            //}

                            System.out.println(afterColumnsList);
                            break;
                        case UPDATE:
                            List<CanalEntry.Column> afterColumnsList2 = rowData.getAfterColumnsList();
                            System.out.println("新插入的数据是:" + afterColumnsList2);
                            break;
                        case DELETE:
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            System.out.println("被删除的数据是:" + beforeColumnsList);
                            break;
                        default:
                    }
                }
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        }
    }
}

执行程序并,使用navicat操作数据,查看程序是否读取bin-log。

6、canal是什么及工作原理

6.1、canal是什么

Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前。Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 来处理获得的相关数据。(数据库同步需要阿里的 Otter 中间件,基于 Canal)。

6.2、Canal 的工作原理
很简单,就是把自己伪装成 Slave,假装从 Master 复制数据。

1)Master 主库将改变记录,写到二进制日志(Binary Log)中;
2)Slave 从库向 MySQL Master 发送 dump 协议,将 Master 主库的 binary log events 拷贝到它的中继日志(relay log);
3)Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。

补充:查看mysql的binlog日志情况

# 查看binlog文件列表
show binary logs;
# 查看当前正在写入的binlog文件
show master status;

# 查看指定binlog文件的内容
show binlog events [in 'log_name'] [FROM pos] [limit [offset,] row_count]
GitHub 加速计划 / ca / canal
27
8
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:5 个月前 )
c15129b0 - 6 天前
0741ccda - 6 天前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐