将mysql数据通过canal+kafka+sparkstructedstreaming写入hudi并同步hive
canal
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
项目地址:https://gitcode.com/gh_mirrors/ca/canal
免费下载资源
·
1 配置canal 读取mysql日志正则将数据分发(动态分区)至对应kafka topic
2 sparkstructedstreaming获取kafka数据 并将数据存储至hudi
本人有大量表名为 document_xxx(document_1,document_2,document_3…)
通过canal将数据存储kafka topic (document)
object SSSHudiETL {
case class Model_Document(table: String, sql_type: String, data1: Documents)
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("sparkstructedstreaming")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.sql.shuffle.partitions", "3")
// .setMaster("local[*]")
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
val document_df = sparkSession.readStream.format("kafka")
.option("kafka.bootstrap.servers", "xxx.xxx.xxx.xxx:6667")
//topic
.option("subscribe", "document")
//如果本地checkpointLocation 后即使配置 也会读取存储的位置开始
.option("startingOffsets", "earliest")
//数据保存7 天如果失败 则没数据
.option("failOnDataLoss", "false")
.option("maxOffsetsPerTrigger", "30000")
.load()
val b2s = new Bean2Sql
import sparkSession.implicits._
/*
* document 表ETL
* */
val document_query = document_df.selectExpr("cast (value as string)").as[String]
.map(item => {
val jsonObject = ParseJsonData.getJsonData(item)
val table = if (jsonObject.containsKey("table")) jsonObject.getString("table") else ""
val data = if (jsonObject.containsKey("data")) ParseJsonData.getJsonData(jsonObject.getJSONArray("data").get(0).toString) else null
val sql_type = if (jsonObject.containsKey("type")) jsonObject.getString("type") else ""
//将json里的字段转为存储同步Hive的分区字段 和hudi的TS时间戳
data.put("DT", castDateFormat(data.get("CREATE_DATE").toString))
data.put("TS", castDateTimeStamp(data.get("CREATE_DATE").toString))
# 工具类 将json数据转为 case document对象
val data1 = b2s.json2document(data)
Model_Document(table, sql_type, data1)
})
.writeStream.foreachBatch { (batchDF: Dataset[Model_Document], _: Long) =>
batchDF.cache() //缓存数据
val upsertData = batchDF.filter("sql_type =='INSERT' or sql_type =='UPDATE'") //新增数据 和修改数据
val deleteData = batchDF.filter("sql_type =='DELETE'") //删除数据
upsertHudiDocument(upsertData, sparkSession)
deleteHudiDocument(deleteData, sparkSession)
batchDF.unpersist().show()
}.option("checkpointLocation", "/save_path/checkpoint/document").start()
document_query.awaitTermination()
}
/**
* 插入更新hudi document
* */
def upsertHudiDocument(batchDF: Dataset[Model_Document], sparkSession: SparkSession) {
import sparkSession.implicits._
val result = batchDF.mapPartitions(partitions => {
partitions.map(item => {
item.data1
})
})
//写入hudi
result.write.format("org.apache.hudi")
.option(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) //选择表的类型 到底是MERGE_ON_READ 还是 COPY_ON_WRITE
//本人字段
.option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "FPQQLSH") //设置主键
.option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "TS") //数据更新时间戳的
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "DT") //hudi分区列
.option("hoodie.table.name", "document") //hudi表名
.option(DataSourceWriteOptions.HIVE_URL.key(), "jdbc:hive2://aisino-master.pro.com:2181,aisino-slave01.pro.com:2181,aisino-slave02.pro.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2") //hiveserver2地址
.option(DataSourceWriteOptions.HIVE_DATABASE.key(), "hudi") //设置hudi与hive同步的数据库
.option(DataSourceWriteOptions.HIVE_TABLE.key(), "document") //设置hudi与hive同步的表名
.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), "DT") //hive表同步的分区列
.option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getName) // 分区提取器 按/ 提取分区
.option(DataSourceWriteOptions.HIVE_SYNC_ENABLED.key(), "true") //设置数据集注册并同步到hive
.option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE.key(), "true") //设置当分区变更时,当前数据的分区目录是否变更
.option(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.GLOBAL_BLOOM.name()) //设置索引类型目前有HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM 四种索引 为了保证分区变更后能找到必须设置全局GLOBAL_BLOOM
.option(HoodieCompactionConfig.INLINE_COMPACT.key(),"true")
// 本来想通过下列方式让 hudi compact 速度快点,但是试了好像没有效果,好像是flink的配置
// .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key(),"true")
// .option(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key(),"NUM_OR_TIME")
// .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "3")
// .option(HoodieCompactionConfig.INLINE_COMPACT_TIME_DELTA_SECONDS.key(),"60")
.option("hoodie.bulkinsert.shuffle.parallelism", "12")
.option("hoodie.insert.shuffle.parallelism", "12")
.option("hoodie.upsert.shuffle.parallelism", "12")
.option("hoodie.bootstrap.parallelism", "12")
.mode(SaveMode.Append)
.save("/save_path/hudi/document")
}
/**
* 删除hudi document
* */
def deleteHudiDocument(batchDF: Dataset[Model_Document], sparkSession: SparkSession) = {
import sparkSession.implicits._
val result = batchDF.mapPartitions(partitions => {
partitions.map(item => {
item.data1
})
})
result.write.format("hudi")
.option(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) //选择表的类型 到底是MERGE_ON_READ 还是 COPY_ON_WRITE
.option(DataSourceWriteOptions.OPERATION.key(), "delete") //删除数据操作
.option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "FPQQLSH") //设置主键
.option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "TS") //数据更新时间戳的
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "DT") //hudi分区列
.option("hoodie.table.name", "document") //hudi表名
.option(DataSourceWriteOptions.HIVE_URL.key(), "jdbc:hive2://aisino-master.pro.com:2181,aisino-slave01.pro.com:2181,aisino-slave02.pro.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2") //hiveserver2地址
.option(DataSourceWriteOptions.HIVE_DATABASE.key(), "hudi") //设置hudi与hive同步的数据库
.option(DataSourceWriteOptions.HIVE_TABLE.key(), "document") //设置hudi与hive同步的表名
.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), "DT") //hive表同步的分区列
.option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getName) // 分区提取器 按/ 提取分区
// .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[NonPartitionedExtractor].getName) // 没有分区
.option(DataSourceWriteOptions.HIVE_SYNC_ENABLED.key(), "true") //设置数据集注册并同步到hive
.option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE.key(), "true") //设置当分区变更时,当前数据的分区目录是否变更
.option(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.GLOBAL_BLOOM.name()) //设置索引类型目前有HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM 四种索引 为了保证分区变更后能找到必须设置全局GLOBAL_BLOOM
.option(HoodieCompactionConfig.INLINE_COMPACT.key(),"true")
// .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key(),"true")
// .option(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key(),"NUM_OR_TIME")
// .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "3")
// .option(HoodieCompactionConfig.INLINE_COMPACT_TIME_DELTA_SECONDS.key(),"60")
.option("hoodie.bulkinsert.shuffle.parallelism", "12")
.option("hoodie.delete.shuffle.parallelism", "12")
.option("hoodie.bootstrap.parallelism", "12")
.mode(SaveMode.Append)
.save("/save_path/hudi/document")
}
def castDateFormat(date: String): String = {
val spf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val spf2 = new SimpleDateFormat("yyyy-MM-dd")
val dateFormat = spf1.parse(date)
spf2.format(dateFormat)
}
def castDateTimeStamp(date: String): Long = {
val spf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val dateFormat = spf1.parse(date)
dateFormat.getTime()
}
}
GitHub 加速计划 / ca / canal
28.22 K
7.57 K
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:3 个月前 )
1e5b8a20 - 2 个月前
ff82fd65
2 个月前
更多推荐
已为社区贡献4条内容
所有评论(0)