2.使用javaAPI连接canal订阅MySQL日志
canal
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
项目地址:https://gitcode.com/gh_mirrors/ca/canal
免费下载资源
·
maven依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
核心类图
- 抓取Binlog日志底层封装成一个Message 对应多个SQL的执行结果
- Message中包含多个Entry
- Entry中包含了具体的执行的影响的结果
- tableName: 表明
- EntryType: 操作类型 一般关心ROWDATA 数据改变
- storeValue: 具体binlog日志中的数据
- storeValue经过反序列之后对应RowChange对象
- RowChange对象封装了数据的改变
- EventType 具体SQL类型 如INSERT UPDATE DELETE ALTER等
- BeforeColumnsList: 变更前的数据 INSERT空 为空
- AfterColumnsList: 变更之后的数据 DELETE时为空
- 具体每一个字段对应了一个一个的Column 每一个Column包含了name value index等分别对应了该字段在MySQL的字段名 具体值 位置等等信息
核心API
- CanalConnectors.newSingleConnector 创建连接
- connector.connect / disconnect 连接和断开
- CanalConnector#subscribe(java.lang.String) 订阅具体的数据库 格式可以是dbName.tableName 或者使用通配符和 test_db.* 表示test_db数据库中所有的表
- CanalConnector#get(int) 获取X条message不会阻塞等待,比如传入100不会等到存满100条才返回,有多少返回多少
- Message#getEntries 从message中获取entry
- CanalEntry.Entry#getStoreValue 从entry中拿到binlog日志中的数据
- RowChange#parseFrom 将storeValue 反序列化成可以使用的数据
- RowChange#getRowDatasList 具体的数据变更
实例代码
@Slf4j
public class CanalTest {
public static void main(String[] args) throws InterruptedException {
// 创建链接
/**
* address: MySQL 链接
* destination: 对应canal中的dest配置
* username: canal客户端链接canal服务端的账号密码不是MySQL的密码!!! 默认为空
* password:
*/
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.209.100",
11111), "example", "", "");
// 客户端链接canal服务器
connector.connect();
// 订阅监听的数据库名称
connector.subscribe("canal_test.*");
while (true) {
// 每次请求100条变更
Message message = connector.get(100);
// 如果没有获取到信息 休眠3sec
// 变更的记录封装在Message的entry中
List<CanalEntry.Entry> entries = message.getEntries();
if (entries.size() == 0) {
log.info("没有获取到数据");
Thread.sleep(3000);
}
entries.forEach(entry -> {
// 如果是数据变更
/**
* ROWDATA 数据变更
* TRANSACTIONBEGIN 事务开启
* TRANSACTIONEND 事务关闭
* 一般我们关心的是ROWDATA
*/
if (entry.getEntryType() == ROWDATA) {
// entry中的storeValue是对应binlog日志中的序列化之后的数据 使用需要反序列化
ByteString storeValue = entry.getStoreValue();
try {
// 调用RowChange.parseFrom 对数据进行反序列化
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
printRowChange(rowChange);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
});
}
}
public static void printRowChange(CanalEntry.RowChange rowChange) {
// rowChange中的事件包含了INSERT DELETE UPDATE ALTER等等..一般关心增删改
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
if (rowChange.getEventType() == INSERT) {
log.info("=================== 插入操作 ===================");
} else if (rowChange.getEventType() == DELETE) {
log.info("=================== 删除操作 ===================");
} else if (rowChange.getEventType() == UPDATE) {
log.info("=================== 更新操作 ===================");
}
rowDatasList.forEach(data -> {
// 变更前的数据集合 RowData.getBeforeColumnsList 封装出来封装成column 中的name是MySQL的字段名 value就是对应的数据
List<CanalEntry.Column> beforeColumnsList = data.getBeforeColumnsList();
log.info("=================== before ===================");
Map<String, String> beforeData = beforeColumnsList.stream().collect(Collectors.toMap(CanalEntry.Column::getName, CanalEntry.Column::getValue));
log.info("变更前的数据:{}", beforeData.toString());
log.info("=================== before end ===================");
// 变更后的数据集合
List<CanalEntry.Column> afterColumnsList = data.getAfterColumnsList();
log.info("=================== after ===================");
Map<String, String> afterData = afterColumnsList.stream().collect(Collectors.toMap(CanalEntry.Column::getName, CanalEntry.Column::getValue));
log.info("变更后的数据:{}", afterData.toString());
log.info("=================== after end ===================");
});
}
}
11:42:56.237 [main] INFO com.corn.canal.CanalTest - =================== 删除操作 ===================
11:42:56.237 [main] INFO com.corn.canal.CanalTest - =================== before ===================
11:42:56.237 [main] INFO com.corn.canal.CanalTest - 变更前的数据:{name=555, id=4}
11:42:56.237 [main] INFO com.corn.canal.CanalTest - =================== before end ===================
11:42:56.237 [main] INFO com.corn.canal.CanalTest - =================== after ===================
11:42:47.190 [main] INFO com.corn.canal.CanalTest - =================== 更新操作 ===================
11:42:47.190 [main] INFO com.corn.canal.CanalTest - =================== before ===================
11:42:47.190 [main] INFO com.corn.canal.CanalTest - 变更前的数据:{name=464, id=2}
11:42:47.190 [main] INFO com.corn.canal.CanalTest - =================== before end ===================
11:42:47.190 [main] INFO com.corn.canal.CanalTest - =================== after ===================
11:42:47.190 [main] INFO com.corn.canal.CanalTest - 变更后的数据:{name=555, id=2}
11:42:47.190 [main] INFO com.corn.canal.CanalTest - =================== after end ===================
11:42:41.144 [main] INFO com.corn.canal.CanalTest - =================== 插入操作 ===================
11:42:41.144 [main] INFO com.corn.canal.CanalTest - =================== before ===================
11:42:41.144 [main] INFO com.corn.canal.CanalTest - 变更前的数据:{}
11:42:41.144 [main] INFO com.corn.canal.CanalTest - =================== before end ===================
11:42:41.144 [main] INFO com.corn.canal.CanalTest - =================== after ===================
11:42:41.144 [main] INFO com.corn.canal.CanalTest - 变更后的数据:{name=555, id=4}
11:42:41.144 [main] INFO com.corn.canal.CanalTest - =================== after end ===================
GitHub 加速计划 / ca / canal
28.22 K
7.57 K
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:3 个月前 )
1e5b8a20 - 2 个月前
ff82fd65
2 个月前
更多推荐
已为社区贡献1条内容
所有评论(0)