maven依赖

    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.0</version>
    </dependency>

核心类图

在这里插入图片描述

  • 抓取Binlog日志底层封装成一个Message 对应多个SQL的执行结果
  • Message中包含多个Entry
  • Entry中包含了具体的执行的影响的结果
    • tableName: 表明
    • EntryType: 操作类型 一般关心ROWDATA 数据改变
    • storeValue: 具体binlog日志中的数据
    • storeValue经过反序列之后对应RowChange对象
    • RowChange对象封装了数据的改变
      • EventType 具体SQL类型 如INSERT UPDATE DELETE ALTER等
      • BeforeColumnsList: 变更前的数据 INSERT空 为空
      • AfterColumnsList: 变更之后的数据 DELETE时为空
      • 具体每一个字段对应了一个一个的Column 每一个Column包含了name value index等分别对应了该字段在MySQL的字段名 具体值 位置等等信息

核心API

  • CanalConnectors.newSingleConnector 创建连接
  • connector.connect / disconnect 连接和断开
  • CanalConnector#subscribe(java.lang.String) 订阅具体的数据库 格式可以是dbName.tableName 或者使用通配符和 test_db.* 表示test_db数据库中所有的表
  • CanalConnector#get(int) 获取X条message不会阻塞等待,比如传入100不会等到存满100条才返回,有多少返回多少
  • Message#getEntries 从message中获取entry
  • CanalEntry.Entry#getStoreValue 从entry中拿到binlog日志中的数据
  • RowChange#parseFrom 将storeValue 反序列化成可以使用的数据
  • RowChange#getRowDatasList 具体的数据变更

实例代码

@Slf4j
public class CanalTest {

    public static void main(String[] args) throws InterruptedException {
        // 创建链接
        /**
         * address: MySQL 链接
         * destination: 对应canal中的dest配置
         * username: canal客户端链接canal服务端的账号密码不是MySQL的密码!!! 默认为空
         * password:
         */
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.209.100",
                11111), "example", "", "");
        // 客户端链接canal服务器
        connector.connect();
        // 订阅监听的数据库名称
        connector.subscribe("canal_test.*");
        while (true) {
            // 每次请求100条变更
            Message message = connector.get(100);
            // 如果没有获取到信息 休眠3sec
            // 变更的记录封装在Message的entry中
            List<CanalEntry.Entry> entries = message.getEntries();
            if (entries.size() == 0) {
                log.info("没有获取到数据");
                Thread.sleep(3000);
            }
            entries.forEach(entry -> {
                // 如果是数据变更
                /**
                 * ROWDATA 数据变更
                 * TRANSACTIONBEGIN 事务开启
                 * TRANSACTIONEND 事务关闭
                 * 一般我们关心的是ROWDATA
                 */
                if (entry.getEntryType() == ROWDATA) {
                    // entry中的storeValue是对应binlog日志中的序列化之后的数据 使用需要反序列化
                    ByteString storeValue = entry.getStoreValue();
                    try {
                        // 调用RowChange.parseFrom 对数据进行反序列化
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                        printRowChange(rowChange);
                    } catch (InvalidProtocolBufferException e) {
                        e.printStackTrace();
                    }
                }
            });
        }

    }

    public static void printRowChange(CanalEntry.RowChange rowChange) {
        // rowChange中的事件包含了INSERT DELETE UPDATE ALTER等等..一般关心增删改
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        if (rowChange.getEventType() == INSERT) {
            log.info("=================== 插入操作 ===================");
        } else if (rowChange.getEventType() == DELETE) {
            log.info("=================== 删除操作 ===================");
        } else if (rowChange.getEventType() == UPDATE) {
            log.info("=================== 更新操作 ===================");
        }
        rowDatasList.forEach(data -> {
            // 变更前的数据集合 RowData.getBeforeColumnsList 封装出来封装成column 中的name是MySQL的字段名 value就是对应的数据
            List<CanalEntry.Column> beforeColumnsList = data.getBeforeColumnsList();
            log.info("=================== before ===================");
            Map<String, String> beforeData = beforeColumnsList.stream().collect(Collectors.toMap(CanalEntry.Column::getName, CanalEntry.Column::getValue));
            log.info("变更前的数据:{}", beforeData.toString());
            log.info("=================== before end ===================");
            // 变更后的数据集合
            List<CanalEntry.Column> afterColumnsList = data.getAfterColumnsList();
            log.info("=================== after ===================");
            Map<String, String> afterData = afterColumnsList.stream().collect(Collectors.toMap(CanalEntry.Column::getName, CanalEntry.Column::getValue));
            log.info("变更后的数据:{}", afterData.toString());
            log.info("=================== after end ===================");
        });
    }

}
11:42:56.237 [main] INFO com.corn.canal.CanalTest - =================== 删除操作 ===================
11:42:56.237 [main] INFO com.corn.canal.CanalTest - =================== before ===================
11:42:56.237 [main] INFO com.corn.canal.CanalTest - 变更前的数据:{name=555, id=4}
11:42:56.237 [main] INFO com.corn.canal.CanalTest - =================== before end ===================
11:42:56.237 [main] INFO com.corn.canal.CanalTest - =================== after ===================

11:42:47.190 [main] INFO com.corn.canal.CanalTest - =================== 更新操作 ===================
11:42:47.190 [main] INFO com.corn.canal.CanalTest - =================== before ===================
11:42:47.190 [main] INFO com.corn.canal.CanalTest - 变更前的数据:{name=464, id=2}
11:42:47.190 [main] INFO com.corn.canal.CanalTest - =================== before end ===================
11:42:47.190 [main] INFO com.corn.canal.CanalTest - =================== after ===================
11:42:47.190 [main] INFO com.corn.canal.CanalTest - 变更后的数据:{name=555, id=2}
11:42:47.190 [main] INFO com.corn.canal.CanalTest - =================== after end ===================

11:42:41.144 [main] INFO com.corn.canal.CanalTest - =================== 插入操作 ===================
11:42:41.144 [main] INFO com.corn.canal.CanalTest - =================== before ===================
11:42:41.144 [main] INFO com.corn.canal.CanalTest - 变更前的数据:{}
11:42:41.144 [main] INFO com.corn.canal.CanalTest - =================== before end ===================
11:42:41.144 [main] INFO com.corn.canal.CanalTest - =================== after ===================
11:42:41.144 [main] INFO com.corn.canal.CanalTest - 变更后的数据:{name=555, id=4}
11:42:41.144 [main] INFO com.corn.canal.CanalTest - =================== after end ===================
GitHub 加速计划 / ca / canal
25
8
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:4 个月前 )
2d0dcf0e - 3 天前
515c05c8 - 3 天前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐