canal动态监控mysql的数据表
·
1、需求:使用 Canal 动态监控 MySQL 中 gmall2 数据库的 order_info 表,将新增(INSERT)的行数据转成 JSON 字符串发送到 Kafka 中,具体实现如下:
CanalClient —— 监控 order_info 单表的代码
package com.zyj.gmall.canal

import java.net.{InetSocketAddress, SocketAddress}
import java.util

import com.alibaba.fastjson.JSONObject
import com.alibaba.otter.canal.client.CanalConnectors
import com.alibaba.otter.canal.protocol.CanalEntry
import com.alibaba.otter.canal.protocol.CanalEntry.{EventType, RowChange}
import com.zyj.gmall.common.Constant

// Explicit `.asScala` decorators replace the deprecated implicit JavaConversions.
import scala.collection.JavaConverters._

/**
 * Canal client that watches the `order_info` table of the `gmall2` database and
 * forwards every inserted row to Kafka as one JSON string per row.
 */
object CanalClient {

  /**
   * Connects to the Canal server, subscribes to all `gmall2` tables and polls for
   * changes forever. When a poll returns nothing, waits 2 seconds before retrying.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // 1. Connect to Canal.
    val address = new InetSocketAddress("hadoop103", 11111)
    val connector = CanalConnectors.newSingleConnector(address, "example", "", "")
    connector.connect()
    try {
      // 1.1 Subscribe: "gmall2.*" means every table of the gmall2 database.
      connector.subscribe("gmall2.*")

      // 2. Read and parse the data in an endless polling loop.
      while (true) {
        // 2.1 Pull the changes caused by at most 100 SQL statements per call.
        val msg = connector.get(100)

        // 2.2 Null-safe extraction: neither the message nor its entry list is
        //     dereferenced unless it is non-null.
        val entries = Option(msg)
          .flatMap(m => Option(m.getEntries))
          .fold(List.empty[CanalEntry.Entry])(_.asScala.toList)

        if (entries.nonEmpty) {
          // 2.3 One entry wraps the result of one SQL statement. Only ROWDATA
          //     entries are parsed as RowChange; transaction begin/end markers
          //     are skipped, as in the official Canal client example.
          for (entry <- entries if entry.getEntryType == CanalEntry.EntryType.ROWDATA) {
            // 2.4 Decode the entry's storeValue into a RowChange.
            val rowChange = RowChange.parseFrom(entry.getStoreValue)
            // 2.5 A RowChange holds several RowData, one per changed row.
            handleData(entry.getHeader.getTableName, rowChange.getRowDatasList, rowChange.getEventType)
          }
        } else {
          println("没有拉取到数据,2秒后重试。。。")
          Thread.sleep(2000)
        }
      }
    } finally {
      // Release the Canal connection if the polling loop is left by an exception.
      connector.disconnect()
    }
  }

  /**
   * Sends the rows of one change set to Kafka when they are INSERTs into `order_info`;
   * every other table or event type is ignored.
   *
   * @param tableName name of the table the change belongs to
   * @param rowDatas  changed rows; may be null or empty
   * @param eventType kind of change (INSERT, UPDATE, ...)
   */
  private def handleData(tableName: String,
                         rowDatas: util.List[CanalEntry.RowData],
                         eventType: CanalEntry.EventType): Unit = {
    val isOrderInsert = "order_info" == tableName && eventType == EventType.INSERT
    if (isOrderInsert && rowDatas != null) {
      for (rowData <- rowDatas.asScala) {
        val result = new JSONObject()
        // 1. Copy every column of the row as it looks after the change:
        //    column name -> column value.
        for (column <- rowData.getAfterColumnsList.asScala) {
          result.put(column.getName, column.getValue)
        }
        // 2. One row becomes one Kafka record: {name: value, name: value, ...}
        val content = result.toJSONString
        println(content)
        MyKafkaUtil.send(Constant.TOPIC_ORDER_INFO, content)
      }
    }
  }
}
CanalClient —— 监控 order_info 和 order_detail 多表的代码,对代码做封装
package com.zyj.gmall.canal

import java.net.{InetSocketAddress, SocketAddress}
import java.util

import com.alibaba.fastjson.JSONObject
import com.alibaba.otter.canal.client.CanalConnectors
import com.alibaba.otter.canal.protocol.CanalEntry
import com.alibaba.otter.canal.protocol.CanalEntry.{EventType, RowChange}
import com.zyj.gmall.common.Constant

// Explicit `.asScala` decorators replace the deprecated implicit JavaConversions.
import scala.collection.JavaConverters._

/**
 * Canal client that watches the `order_info` and `order_detail` tables of the
 * `gmall2` database and forwards every inserted row to the matching Kafka topic
 * as one JSON string per row.
 */
object CanalClient {

  /**
   * Connects to the Canal server, subscribes to all `gmall2` tables and polls for
   * changes forever. When a poll returns nothing, waits 2 seconds before retrying.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // 1. Connect to Canal.
    val address = new InetSocketAddress("hadoop103", 11111)
    val connector = CanalConnectors.newSingleConnector(address, "example", "", "")
    connector.connect()
    try {
      // 1.1 Subscribe: "gmall2.*" means every table of the gmall2 database.
      connector.subscribe("gmall2.*")

      // 2. Read and parse the data in an endless polling loop.
      while (true) {
        // 2.1 Pull the changes caused by at most 100 SQL statements per call.
        val msg = connector.get(100)

        // 2.2 Null-safe extraction: neither the message nor its entry list is
        //     dereferenced unless it is non-null.
        val entries = Option(msg)
          .flatMap(m => Option(m.getEntries))
          .fold(List.empty[CanalEntry.Entry])(_.asScala.toList)

        if (entries.nonEmpty) {
          // 2.3 One entry wraps the result of one SQL statement. Only ROWDATA
          //     entries are parsed as RowChange; transaction begin/end markers
          //     are skipped, as in the official Canal client example.
          for (entry <- entries if entry.getEntryType == CanalEntry.EntryType.ROWDATA) {
            // 2.4 Decode the entry's storeValue into a RowChange.
            val rowChange = RowChange.parseFrom(entry.getStoreValue)
            // 2.5 A RowChange holds several RowData, one per changed row.
            handleData(entry.getHeader.getTableName, rowChange.getRowDatasList, rowChange.getEventType)
          }
        } else {
          println("没有拉取到数据,2秒后重试。。。")
          Thread.sleep(2000)
        }
      }
    } finally {
      // Release the Canal connection if the polling loop is left by an exception.
      connector.disconnect()
    }
  }

  /**
   * Routes the INSERTed rows of one change set to the Kafka topic of their table.
   * Only `order_info` and `order_detail` are forwarded; other tables, other event
   * types and null/empty row lists are ignored.
   *
   * @param tableName name of the table the change belongs to
   * @param rowDatas  changed rows; may be null or empty
   * @param eventType kind of change (INSERT, UPDATE, ...)
   */
  private def handleData(tableName: String,
                         rowDatas: util.List[CanalEntry.RowData],
                         eventType: CanalEntry.EventType): Unit = {
    if (eventType == EventType.INSERT && rowDatas != null && !rowDatas.isEmpty) {
      tableName match {
        case "order_info"   => sendToKafka(Constant.TOPIC_ORDER_INFO, rowDatas)
        case "order_detail" => sendToKafka(Constant.TOPIC_ORDER_DETAIL, rowDatas)
        case _              => () // table is not monitored
      }
    }
  }

  /**
   * Sends every row to Kafka as one JSON record and echoes it to stdout.
   *
   * @param topic    Kafka topic to write to
   * @param rowDatas rows to send
   */
  private def sendToKafka(topic: String, rowDatas: util.List[CanalEntry.RowData]): Unit = {
    for (rowData <- rowDatas.asScala) {
      val content = toJson(rowData)
      println(content)
      MyKafkaUtil.send(topic, content)
    }
  }

  /** Serialises the after-change columns of a row as {name: value, name: value, ...}. */
  private def toJson(rowData: CanalEntry.RowData): String = {
    val result = new JSONObject()
    for (column <- rowData.getAfterColumnsList.asScala) {
      result.put(column.getName, column.getValue)
    }
    result.toJSONString
  }
}
MyKafkaUtil
package com.zyj.gmall.canal

import java.util.Properties
import java.util.concurrent.{Future => JFuture}

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}

/**
 * Holds one shared Kafka producer with String keys and values, and a helper to
 * send a message to a topic.
 */
object MyKafkaUtil {

  /** Producer configuration: broker list plus String serializers for key and value. */
  val prop: Properties = new Properties()
  prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092")
  prop.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  prop.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

  /** The single producer instance used by every `send` call. */
  val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](prop)

  // Close the producer on JVM shutdown so that records still waiting in its
  // buffer are flushed instead of being dropped.
  sys.addShutdownHook {
    producer.close()
  }

  /**
   * Sends `content` to `topic` asynchronously, without a record key.
   * A failed send is reported on stderr instead of being dropped silently.
   *
   * @param topic   Kafka topic to write to
   * @param content message value
   * @return the future returned by the producer for this record
   */
  def send(topic: String, content: String): JFuture[RecordMetadata] = {
    val record = new ProducerRecord[String, String](topic, content)
    // Anonymous class rather than a lambda, so it does not depend on SAM conversion.
    producer.send(record, new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        if (exception != null) {
          System.err.println(s"Failed to send record to topic '$topic': ${exception.getMessage}")
        }
      }
    })
  }
}
pom(fastjson 依赖引用自父模块,因此此处未单独声明)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <!-- Parent project this module inherits from -->
    <parent>
        <artifactId>gmall1015</artifactId>
        <groupId>com.zyj.gmall</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <modelVersion>4.0.0</modelVersion>

    <artifactId>gmall-canal</artifactId>

    <dependencies>
        <!-- Canal client: reads data from the Canal server -->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.2</version>
        </dependency>

        <!-- Kafka client -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.2</version>
        </dependency>

        <!-- Shared project module (presumably where com.zyj.gmall.common lives; confirm) -->
        <dependency>
            <groupId>com.zyj.gmall</groupId>
            <artifactId>gmall-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
</project>
更多推荐
所有评论(0)