目录

一、canal服务端

1.1 下载

1.2 解压

1.3 配置

1.4 启动

1.5 查看

二、canal客户端(Java编写业务程序)

2.1 SQL脚本

2.2 写POM

2.3 写Yaml

2.4 写业务类

2.4.1.项目结构

2.4.2 Utils.RedisUtil

2.4.3 biz.RedisCanalClientExample


一、canal服务端

1.1 下载

1.2 解压

tar -zxvf canal.deployer-1.1.6.tar.gz 到mycanal文件夹

1.3 配置

修改/mycan/conf/example/instance.properties文件

  • 换成自己的mysql主机master的IP地址

  • 换成自己的在mysql新建的canal账户

1.4 启动

注意这个地方需要JDK环境支持才能正常启动,那就补充一下安装JDK

  • Centos7安装JDK8

    • 在下载linux64版本的jdk

    • 解压后放到自己指定的文件夹

    • 配置环境变量:vim /ect/profile新增内容后在source /etc/profile 最后java -version 看是否安装成功

export JAVA_HOME=/myjava/jdk1.8.0_371
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=$JRE_HOME/lib/ext:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
  • 启动canal--->在canal的bin目录下执行 ./startup.sh

1.5 查看

  • 查看server日志 在目录mycanal/logs/canal/下执行cat canal.log
  • 查看样例example的日志 在目录mycanal/logs/example/下执行cat example.log

二、canal客户端(Java编写业务程序)

2.1 SQL脚本

CREATE TABLE `t_user`(
  `id` BIGINT(20) NOT NULL AUTO_INCREMENT,
  `userName` VARCHAR(100) NOT NULL,
  PRIMARY KEY(`id`)
)ENGINE=INNODB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4

2.2 写POM

<dependencies>
  <dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
  </dependency>
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
  </dependency>
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid-spring-boot-starter</artifactId>
    <version>1.2.14</version>
  </dependency>
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.16</version>
  </dependency>
  <dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
	</dependency>
</dependencies>

2.3 写Yaml

server:
  port: 5555

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/bigdata?useUnicode=true&characterEncoding=utf-8&useSSL=false
    username: root
    password: 980918
    driver-class-name: com.mysql.jdbc.Driver
    druid:
      test-while-idle: false

2.4 写业务类

2.4.1.项目结构

2.4.2 Utils.RedisUtil

package com.atguigu.canal.utils;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * @author zhumq
 * @date 2024/7/27 9:24
 */
public class RedisUtils
{
    public static final String  REDIS_IP_ADDR = "192.168.111.185";
    public static final String  REDIS_pwd = "111111";
    public static JedisPool jedisPool;

    static {
        JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(20);
        jedisPoolConfig.setMaxIdle(10);
        jedisPool=new JedisPool(jedisPoolConfig,REDIS_IP_ADDR,6379,10000,REDIS_pwd);
    }

    public static Jedis getJedis() throws Exception {
        if(null!=jedisPool){
            return jedisPool.getResource();
        }
        throw new Exception("Jedispool is not ok");
    }

}

2.4.3 biz.RedisCanalClientExample

package com.atguigu.canal.biz;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.atguigu.canal.utils.RedisUtils;
import redis.clients.jedis.Jedis;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @author zhumq
 * @date 2024/7/27 9:26
 */
public class RedisCanalClientExample
{
    /**
     * 表示60秒的常量。
     * 用于定义某些操作的时间间隔。
     */
    public static final Integer _60SECONDS = 60;
    
    /**
     * Redis服务器的IP地址。
     * 用于数据存储和检索操作中定位Redis服务器。
     */
    public static final String REDIS_IP_ADDR = "192.168.111.185";
    
    /**
     * 将数据插入Redis。
     *
     * @param columns 列数据列表,每项包含列名、值和更新标志。
     *                本方法首先将列数据列表转换为JSON对象,
     *                然后使用第一列的值作为键,在Redis中存储JSON字符串。
     *
     *                设计目的可能是以结构化方式在Redis中存储实体的相关属性信息,
     *                以便于快速检索和使用。
     */
    private static void redisInsert(List<Column> columns)
    {
        // 创建一个JSON对象来存储列数据
        JSONObject jsonObject = new JSONObject();
        
        // 遍历列数据列表,填充JSON对象
        for (Column column : columns)
        {
            // 打印列信息,用于调试或日志记录
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            
            // 将列名和值添加到JSON对象中
            jsonObject.put(column.getName(), column.getValue());
        }
        
        // 如果列数据列表不为空,则执行Redis插入操作
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                // 使用第一列的值作为键,序列化的JSON对象作为值,存储到Redis中
                jedis.set(columns.get(0).getValue(), jsonObject.toJSONString());
            } catch (Exception e) {
                // 打印异常堆栈跟踪,用于错误处理或日志记录
                e.printStackTrace();
            }
        }
    }
    
    
    /**
     * 删除Redis中的键值对。
     *
     * 此方法通过接收一列列名和对应的值,构建一个JSON对象。然后,它从这个JSON对象中提取第一个列的值,
     * 并使用这个值作为Redis键来删除对应的条目。这个方法假设Redis已经连接,并且通过RedisUtils.getJedis()
     * 提供了Jedis实例。
     *
     * @param columns 列表,包含要删除的Redis键对应的列名和值。
     */
    private static void redisDelete(List<Column> columns)
    {
        // 构建一个JSON对象,用于存储列名和对应的值
        JSONObject jsonObject = new JSONObject();
        for (Column column : columns)
        {
            // 将每一列的名称和值添加到JSON对象中
            jsonObject.put(column.getName(),column.getValue());
        }
        // 当列表不为空时,尝试删除Redis中的条目
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                // 使用列表中第一个列的值作为键,删除Redis中的对应条目
                jedis.del(columns.get(0).getValue());
            }catch (Exception e){
                // 打印堆栈跟踪,以记录任何异常
                e.printStackTrace();
            }
        }
    }
    
    
    /**
     * 更新Redis中的数据。
     * 该方法接收一个列的列表,将这些列的名称和值存储到JSON对象中,然后将这个JSON对象存储到Redis中。
     * 此方法主要用于在数据更新后,将更新的列及其值同步到Redis缓存中,以保持数据的一致性。
     *
     * @param columns 列的列表,每个列包含一个名称、一个值和一个标志位表示该列是否被更新。
     */
    private static void redisUpdate(List<Column> columns)
    {
        // 创建一个JSON对象,用于存储列的名称和值。
        JSONObject jsonObject = new JSONObject();
        for (Column column : columns)
        {
            // 打印列的名称、值和更新状态,用于调试和日志记录。
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            // 将列的名称和值添加到JSON对象中。
            jsonObject.put(column.getName(),column.getValue());
        }
        // 检查列表是否为空,如果不为空,则更新Redis。
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                // 使用列列表中第一个列的值作为键,将JSON对象序列化为字符串后存储到Redis中。
                jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
                // 打印更新后的Redis数据,用于调试和确认更新是否成功。
                System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));
            }catch (Exception e){
                // 捕获并打印任何异常,确保方法在异常情况下不会中断执行。
                e.printStackTrace();
            }
        }
    }

    
    /**
     * 打印日志条目中的变更信息。
     * 此方法忽略事务开始和结束的日志条目,因为它只对实际的数据变更感兴趣。
     * 它解析每条日志条目中的RowChange数据,并根据变更类型(插入、删除、更新)执行相应的操作。
     *
     * @param entrys 日志条目的列表,这些条目包含数据库变更的信息。
     */
    public static void printEntry(List<Entry> entrys) {
        // 遍历每个日志条目
        for (Entry entry : entrys) {
            // 跳过事务开始和结束的日志条目
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            
            RowChange rowChage = null;
            try {
                // 从日志条目的存储值中解析RowChange对象
                // 获取变更的row数据
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                // 如果解析失败,抛出运行时异常,并包含错误详情
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(), e);
            }
            // 获取事件类型 获取变动类型
            EventType eventType = rowChage.getEventType();
            // 打印日志条目的基本信息,包括日志文件名、偏移量、模式名、表名和事件类型
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
            
            // 遍历RowData列表,根据事件类型执行相应的操作(插入、删除、更新)
            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.INSERT) {
                    // 对于插入事件,调用redisInsert方法处理后的列数据
                    redisInsert(rowData.getAfterColumnsList());
                } else if (eventType == EventType.DELETE) {
                    // 对于删除事件,调用redisDelete方法处理前的列数据
                    redisDelete(rowData.getBeforeColumnsList());
                } else {// EventType.UPDATE
                    // 对于更新事件,调用redisUpdate方法处理后的列数据
                    redisUpdate(rowData.getAfterColumnsList());
                }
            }
        }
    }
    
    
    /**
     * 程序入口主方法,用于初始化并连接Canal服务器,以监听MySQL数据库的变化。
     * @param args 命令行参数
     */
    public static void main(String[] args)
    {
        // 初始化时的提示信息
        System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");
        
        // 创建Canal客户端连接器,用于连接和通信
        //=================================
        // 创建链接canal服务端
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR, 11111),
                "example",
                "",
                "");
        
        // 定义每次获取的记录数量
        int batchSize = 1000;
        // 定义空闲循环计数器,用于判断是否需要重新连接
        // 空闲空转计数器
        int emptyCount = 0;
        
        // 连接初始化完成的提示信息
        System.out.println("---------------------canal init OK,开始监听mysql变化------");
        try {
            // 连接Canal服务器
            connector.connect();
            // 订阅指定的数据库表变更事件
            //connector.subscribe(".*\\..*");
            connector.subscribe("bigdata.t_user");
            // 回滚事务,确保数据一致性
            connector.rollback();
            // 定义空闲循环的总次数
            int totalEmptyCount = 10 * _60SECONDS;
            while (emptyCount < totalEmptyCount) {
                // 每秒打印一次监控信息
                System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());
                // 获取一批变更记录
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                // 获取批次ID
                long batchId = message.getId();
                // 获取记录数量
                int size = message.getEntries().size();
                // 如果批次ID为-1或记录数量为0,表示没有数据变更
                if (batchId == -1 || size == 0) {
                    // 空闲计数器加1
                    emptyCount++;
                    // 每秒检查一次
                    try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
                } else {
                    // 有数据变更,重置空闲计数器
                    //计数器重新置零
                    emptyCount = 0;
                    // 打印变更记录
                    printEntry(message.getEntries());
                }
                // 确认处理完成,提交批次
                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
            // 监听结束的提示信息
            System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");
        } finally {
            // 断开与Canal服务器的连接
            connector.disconnect();
        }
    }
}

题外话:

  • Java程序下connector.subscribe配置的过滤正则

关闭资源简写

  • try-with-resources释放资源

        

GitHub 加速计划 / ca / canal
28.22 K
7.57 K
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:2 个月前 )
1e5b8a20 - 2 个月前
ff82fd65 2 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐