1 配置canal 读取mysql日志正则将数据分发(动态分区)至对应kafka topic

2 sparkstructedstreaming获取kafka数据 并将数据存储至hudi

本人有大量表名为 document_xxx(document_1,document_2,document_3…)
通过canal将数据存储kafka topic (document)

object SSSHudiETL {

  case class Model_Document(table: String, sql_type: String, data1: Documents)

  def main(args: Array[String]): Unit = {


    val sparkConf = new SparkConf().setAppName("sparkstructedstreaming")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.sql.shuffle.partitions", "3")
    //      .setMaster("local[*]")

    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()


    val document_df = sparkSession.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "xxx.xxx.xxx.xxx:6667")
      //topic
      .option("subscribe", "document")
      //如果本地checkpointLocation 后即使配置 也会读取存储的位置开始
      .option("startingOffsets", "earliest")
      //数据保存7 天如果失败 则没数据
      .option("failOnDataLoss", "false")
      .option("maxOffsetsPerTrigger", "30000")
      .load()
	
    val b2s = new Bean2Sql
    import sparkSession.implicits._

    /*
    * document 表ETL
    * */
    val document_query = document_df.selectExpr("cast (value as string)").as[String]
      .map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val table = if (jsonObject.containsKey("table")) jsonObject.getString("table") else ""
        val data = if (jsonObject.containsKey("data")) ParseJsonData.getJsonData(jsonObject.getJSONArray("data").get(0).toString) else null
        val sql_type = if (jsonObject.containsKey("type")) jsonObject.getString("type") else ""
		//将json里的字段转为存储同步Hive的分区字段 和hudi的TS时间戳
        data.put("DT", castDateFormat(data.get("CREATE_DATE").toString))
        data.put("TS", castDateTimeStamp(data.get("CREATE_DATE").toString))
		# 工具类 将json数据转为 case document对象
        val data1 = b2s.json2document(data)
        Model_Document(table, sql_type, data1)

      })
      .writeStream.foreachBatch { (batchDF: Dataset[Model_Document], _: Long) =>
      batchDF.cache() //缓存数据

      val upsertData = batchDF.filter("sql_type =='INSERT' or sql_type =='UPDATE'") //新增数据 和修改数据
      val deleteData = batchDF.filter("sql_type =='DELETE'") //删除数据

      upsertHudiDocument(upsertData, sparkSession)
      deleteHudiDocument(deleteData, sparkSession)

      batchDF.unpersist().show()

    }.option("checkpointLocation", "/save_path/checkpoint/document").start()

    document_query.awaitTermination()

  }

  /**
   * 插入更新hudi document
   * */

  def upsertHudiDocument(batchDF: Dataset[Model_Document], sparkSession: SparkSession) {


    import sparkSession.implicits._
    val result = batchDF.mapPartitions(partitions => {
      partitions.map(item => {
        item.data1

      })
    })

    //写入hudi
    result.write.format("org.apache.hudi")
      .option(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) //选择表的类型 到底是MERGE_ON_READ 还是 COPY_ON_WRITE
      //本人字段
      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "FPQQLSH") //设置主键
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "TS") //数据更新时间戳的
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "DT") //hudi分区列
      .option("hoodie.table.name", "document") //hudi表名
      .option(DataSourceWriteOptions.HIVE_URL.key(), "jdbc:hive2://aisino-master.pro.com:2181,aisino-slave01.pro.com:2181,aisino-slave02.pro.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2") //hiveserver2地址
      .option(DataSourceWriteOptions.HIVE_DATABASE.key(), "hudi") //设置hudi与hive同步的数据库
      .option(DataSourceWriteOptions.HIVE_TABLE.key(), "document") //设置hudi与hive同步的表名
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), "DT") //hive表同步的分区列
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getName) // 分区提取器 按/ 提取分区
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED.key(), "true") //设置数据集注册并同步到hive
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE.key(), "true") //设置当分区变更时,当前数据的分区目录是否变更
      .option(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.GLOBAL_BLOOM.name()) //设置索引类型目前有HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM 四种索引 为了保证分区变更后能找到必须设置全局GLOBAL_BLOOM
      .option(HoodieCompactionConfig.INLINE_COMPACT.key(),"true")
      // 本来想通过下列方式让 hudi compact 速度快点,但是试了好像没有效果,好像是flink的配置
      //      .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key(),"true")
//      .option(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key(),"NUM_OR_TIME")
//      .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "3")
//      .option(HoodieCompactionConfig.INLINE_COMPACT_TIME_DELTA_SECONDS.key(),"60")
      .option("hoodie.bulkinsert.shuffle.parallelism", "12")
      .option("hoodie.insert.shuffle.parallelism", "12")
      .option("hoodie.upsert.shuffle.parallelism", "12")
      .option("hoodie.bootstrap.parallelism", "12")
      .mode(SaveMode.Append)
      .save("/save_path/hudi/document")

  }

  /**
   * 删除hudi document
   * */
  def deleteHudiDocument(batchDF: Dataset[Model_Document], sparkSession: SparkSession) = {
    import sparkSession.implicits._
    val result = batchDF.mapPartitions(partitions => {
      partitions.map(item => {
        item.data1
      })
    })
    result.write.format("hudi")
      .option(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) //选择表的类型 到底是MERGE_ON_READ 还是 COPY_ON_WRITE
      .option(DataSourceWriteOptions.OPERATION.key(), "delete") //删除数据操作
      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "FPQQLSH") //设置主键
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "TS") //数据更新时间戳的
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "DT") //hudi分区列
      .option("hoodie.table.name", "document") //hudi表名
      .option(DataSourceWriteOptions.HIVE_URL.key(), "jdbc:hive2://aisino-master.pro.com:2181,aisino-slave01.pro.com:2181,aisino-slave02.pro.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2") //hiveserver2地址
      .option(DataSourceWriteOptions.HIVE_DATABASE.key(), "hudi") //设置hudi与hive同步的数据库
      .option(DataSourceWriteOptions.HIVE_TABLE.key(), "document") //设置hudi与hive同步的表名
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), "DT") //hive表同步的分区列
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getName) // 分区提取器 按/ 提取分区
      //      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[NonPartitionedExtractor].getName) // 没有分区
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED.key(), "true") //设置数据集注册并同步到hive
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE.key(), "true") //设置当分区变更时,当前数据的分区目录是否变更
      .option(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.GLOBAL_BLOOM.name()) //设置索引类型目前有HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM 四种索引 为了保证分区变更后能找到必须设置全局GLOBAL_BLOOM
      .option(HoodieCompactionConfig.INLINE_COMPACT.key(),"true")
      //      .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key(),"true")
//      .option(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key(),"NUM_OR_TIME")
//      .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "3")
//      .option(HoodieCompactionConfig.INLINE_COMPACT_TIME_DELTA_SECONDS.key(),"60")
      .option("hoodie.bulkinsert.shuffle.parallelism", "12")
      .option("hoodie.delete.shuffle.parallelism", "12")
      .option("hoodie.bootstrap.parallelism", "12")
      .mode(SaveMode.Append)
      .save("/save_path/hudi/document")
  }

  def castDateFormat(date: String): String = {
    val spf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val spf2 = new SimpleDateFormat("yyyy-MM-dd")
    val dateFormat = spf1.parse(date)
    spf2.format(dateFormat)
  }

  def castDateTimeStamp(date: String): Long = {
    val spf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

    val dateFormat = spf1.parse(date)
    dateFormat.getTime()

  }
}

GitHub 加速计划 / ca / canal
28.22 K
7.57 K
下载
alibaba/canal: Canal 是由阿里巴巴开源的分布式数据库同步系统,主要用于实现MySQL数据库的日志解析和实时增量数据订阅与消费,广泛应用于数据库变更消息的捕获、数据迁移、缓存更新等场景。
最近提交(Master分支:3 个月前 )
1e5b8a20 - 2 个月前
ff82fd65 2 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐