Kafka stores the MySQL binlog changes relayed by Canal as Message records; this section covers how those Messages are parsed and how the results are saved.

01 - Real-Time ETL Development: Converting to POJOs (Approach)

Analyze the data consumed from Kafka (JSON converted into MessageBean objects) and identify which fields are the key values the business cares about.

  • Canal-captured MySQL data: 12 fields in total

When Canal captures MySQL data, the fields the business cares about are essentially the same as for OGG-captured data:

  • First field: table, the name of the table being operated on
  • Second field: type, the data operation type, one of [INSERT, UPDATE, DELETE]
  • Third field: data, the actual row data

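For reference, a Canal message forwarded to Kafka looks roughly like the snippet below; only the three fields of interest are shown, the values are illustrative, and the remaining metadata fields (database name, timestamps, etc.) are omitted:

	{
		"table": "crm_address",
		"type": "INSERT",
		"data": [{
			"id": "10001",
			"name": "葛秋红",
			"mobile": "17*******47"
		}]
	}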

As the analysis above shows, after the Canal-captured JSON string has been wrapped into a MessageBean, the core fields must be extracted and converted.

Extract the [type] operation type and the [data] payload and wrap them into the POJO entity class of the specific table, which makes subsequent processing straightforward.


02 - Real-Time ETL Development: Converting CanalBean to POJO

Task: extract the field values from the Canal-captured data and wrap them into POJO entity objects.

step1. Get the table name, check it, and decide which concrete POJO entity class to use

step2. Extract the data fields and the data operation type and wrap them into the POJO object

step3. Filter out objects that were not parsed and wrapped successfully, i.e. drop null values

Taking the CRM table crm_address as an example, extract the field values from the CanalMessageBean and wrap them into an AddressBean object.

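The concrete AddressBean class lives in the course project (cn.itcast.logistics.common.beans.crm) and is not shown in this section. A minimal sketch of what it plausibly contains, with field names inferred from the sample data payload further below (the real class may differ), is:

	import scala.beans.BeanProperty
	
	// Hypothetical sketch only; field names follow the JSON payload of crm_address.
	class AddressBean extends Serializable {
		@BeanProperty var id: String = _
		@BeanProperty var name: String = _
		@BeanProperty var tel: String = _
		@BeanProperty var mobile: String = _
		@BeanProperty var detail_addr: String = _
		@BeanProperty var area_id: String = _
		@BeanProperty var gis_addr: String = _
		@BeanProperty var cdt: String = _
		@BeanProperty var udt: String = _
		@BeanProperty var remark: String = _
		// data operation type (insert / update / delete), set after parsing
		@BeanProperty var opType: String = _
	}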

Implement the data parser DataParser, which converts a MessageBean into AddressBean objects.
Note: when Canal captures inserts into a MySQL table, a single message may carry several rows at once.


In that case the extracted field values must be wrapped into multiple records, so the method returns a List.

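A hedged illustration of why a List is needed, assuming the fastjson API already used in the course and the AddressBean sketched above: a single Canal message whose data array holds two rows parses into two beans.

	import com.alibaba.fastjson.JSON
	
	// Illustrative only: a Canal "data" payload carrying two inserted rows.
	val dataJson =
		"""[
		  |  {"id": "10001", "name": "张三", "mobile": "138****0001"},
		  |  {"id": "10002", "name": "李四", "mobile": "139****0002"}
		  |]""".stripMargin
	
	// Each element of the JSON array becomes one AddressBean, hence the List return type.
	val beans: java.util.List[AddressBean] = JSON.parseArray(dataJson, classOf[AddressBean])
	println(beans.size()) // 2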

Implement the method toAddressBean: extract the data fields and wrap them into POJO objects.

	// ================== CRM (customer relationship management) business data parsing ==================
	// TODO: convert the crm_address table data of the CRM system into POJO objects
	/*
		"data": [{
		"id": "10001",
		"name": "葛秋红",
		"tel": null,
		"mobile": "17*******47",
		"detail_addr": "恒大影城南侧小金庄",
		"area_id": "130903",
		"gis_addr": null,
		"cdt": "2020-02-02 18:51:39",
		"udt": "2020-02-02 18:51:39",
		"remark": null
		}]
	*/
	def toAddressBean(bean: MessageBean): List[AddressBean] = {
		// a. convert the MessageBean into a CanalMessageBean
		val canalMessageBean: CanalMessageBean = getCanalMessageBean(bean)
		// b. get the data operation type
		val opType: String = getOpType(canalMessageBean.getType)
		// c. get the data values; the type is a List
		val datasValue: util.List[util.Map[String, AnyRef]] = canalMessageBean.getData
		// d. wrap the data values and the operation type into POJO objects
		// list -> json
		val datasJson = JSON.toJSONString(datasValue, true)
		println(datasJson)
		// json -> pojo
		val list: util.List[AddressBean] = JSON.parseArray(datasJson, classOf[AddressBean])
		// if the list is not empty, set the operation type on each bean
		if(! CollectionUtils.isEmpty(list)){
			// convert the Java list into a Scala list
			import scala.collection.JavaConverters._
			list.asScala.map{bean =>
				bean.setOpType(opType)
				bean
			}.toList
		} else {
			// e. return the POJO objects
			Nil // empty list
		}
	}
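
The helpers getCanalMessageBean and getOpType are referenced above but defined elsewhere in DataParser; the following is only a guess at their shape (an assumption, not the course's actual implementation):

	// Hypothetical sketches; the real helpers in DataParser may differ.
	def getCanalMessageBean(bean: MessageBean): CanalMessageBean =
		bean.asInstanceOf[CanalMessageBean]
	
	// Normalize Canal's operation type (INSERT / UPDATE / DELETE) to the value stored on the POJO.
	def getOpType(canalType: String): String = canalType match {
		case "INSERT" => "insert"
		case "UPDATE" => "update"
		case "DELETE" => "delete"
		case other    => other
	}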

Run the streaming program, then update and delete rows in the MySQL database and check the output printed to the console.


03 - Real-Time ETL Development: Converting to POJOs (Code Refactoring)

So far, the OGG-captured data and the Canal-captured data have each been converted into the POJO object of one specific table:

  • Canal-captured data: 3 tables in total, only one of them handled so far

For the CRM system (Canal capture), all remaining tables are converted the same way: extract the fields and wrap them into the corresponding POJO objects so the data can be saved to external storage engines.

The code therefore needs to be refactored:

  • Point 1: for Canal-captured data, the JSON string is always converted into a MessageBean object first

    CRM system: 3 tables

    The existing process method already handles this conversion

  • Point 2: based on the table name, extract the corresponding data, wrap it into POJO objects, and finally save it externally

    • First, extract the data fields and wrap them into POJO objects
    • Then, save the POJO objects to an external storage system such as Kudu, ES, or CK (ClickHouse)

    For each business system, extract a dedicated method; inside it, convert and save the data of each table

    • Logistics system method: etlLogistics, which gets the data by table name, wraps it into POJOs, then saves it
    • CRM system method: etlCrm, which gets the data by table name, wraps it into POJOs, then saves it


The complete code is as follows:

package cn.itcast.logistics.etl.realtime

import cn.itcast.logistics.common.BeanImplicits._
import cn.itcast.logistics.common.beans.crm.AddressBean
import cn.itcast.logistics.common.beans.logistics.AreasBean
import cn.itcast.logistics.common.beans.parser.{CanalMessageBean, OggMessageBean}
import cn.itcast.logistics.common.{SparkUtils, TableMapping}
import cn.itcast.logistics.etl.parse.DataParser
import com.alibaba.fastjson.JSON
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}

/**
 * Kudu data pipeline application: performs real-time ETL into the Kudu database
 */
object KuduStreamApp extends BasicStreamApp {
	
	/**
	 * Data processing: only converts JSON -> MessageBean
	 *
	 * @param streamDF streaming DataFrame
	 * @param category business data category, e.g. logistics system data or CRM system data
	 * @return streaming DataFrame
	 */
	override def process(streamDF: DataFrame, category: String): DataFrame = {
		// import implicit conversions
		import streamDF.sparkSession.implicits._
		
		val etlStreamDF: DataFrame = category match {
			// TODO: logistics system business data, captured by OGG
			case "logistics" =>
				val oggBeanStreamDS: Dataset[OggMessageBean] = streamDF
					// only the value column is consumed from Kafka, so convert it to a Dataset[String]
					.as[String]
					// filter out null or empty messages
					.filter(msg => null != msg && msg.trim.length > 0)
					// parse each message
					.map{
						msg => JSON.parseObject(msg, classOf[OggMessageBean])
					}(Encoders.bean(classOf[OggMessageBean])) // TODO: specify the encoder explicitly
				
				// return the converted data
				oggBeanStreamDS.toDF()
			// TODO: CRM system business data, captured by Canal
			case "crm" =>
				val canalBeanStreamDS: Dataset[CanalMessageBean] = streamDF
					// filter out null values
					.filter(row => !row.isNullAt(0))
					// parse the data, operating on each partition
					.mapPartitions { iter =>
						iter.map { row =>
							val jsonValue: String = row.getAs[String]("value")
							// parse the JSON string
							JSON.parseObject(jsonValue, classOf[CanalMessageBean])
						}
					}
			
				// return the converted data
				canalBeanStreamDS.toDF()
				
			// TODO: data from other business systems
			case _ => streamDF
		}
		// return the ETL-converted data
		etlStreamDF
	}
	
	/**
	 * Data saving: for now, just print the data to the console
	 *
	 * @param streamDF          DataFrame to save
	 * @param tableName         name of the target table
	 * @param isAutoCreateTable whether to create the table automatically (created by default)
	 */
	override def save(streamDF: DataFrame, tableName: String, isAutoCreateTable: Boolean): Unit = {
		streamDF.writeStream
			.queryName(s"query-${tableName}")
			.outputMode(OutputMode.Append())
			.format("console")
			.option("numRows", "100")
			.option("truncate", "false")
			.start()
	}
	
	/**
	 * ETL conversion of logistics system business data and saving to external storage
	 *      MessageBean -> POJO  -> save
	 *
	 * @param streamDF streaming DataFrame
	 */
	def etlLogistics(streamDF: DataFrame): Unit = {
		
		// convert the DataFrame into a Dataset
		val oggBeanStreamDS: Dataset[OggMessageBean] = streamDF.as[OggMessageBean]
		
		/*
			For the logistics system, take [tbl_areas] as the example for the conversion (extract the field values and wrap them into POJO objects)
		*/
		val areaPojoStreamDS: Dataset[AreasBean] = oggBeanStreamDS
			// 1) check the table field: tbl_areas
			.filter(bean => bean.getTable.equals(TableMapping.AREAS))
			// 2) get the data field values (getValue) and convert them into a POJO object
			.map(bean => DataParser.toAreaBean(bean))
			// 3) filter out records whose conversion returned null
			.filter(pojo => null != pojo)
		save(areaPojoStreamDS.toDF(), "logistics-console")
		
		
	}
	
	/**
	 * ETL conversion of CRM (customer relationship management) system business data and saving to external storage
	 *      MessageBean -> POJO  -> save
	 *
	 * @param streamDF streaming DataFrame
	 */
	def etlCrm(streamDF: DataFrame): Unit = {
		
		// convert the DataFrame into a Dataset
		val canalBeanStreamDS: Dataset[CanalMessageBean] = streamDF.as[CanalMessageBean]
		/*
			Taking the CRM table crm_address as an example, extract the field values from the CanalMessageBean and wrap them into AddressBean objects
		 */
		val addressPojoStreamDS: Dataset[AddressBean] = canalBeanStreamDS
			// step1. get the table name and decide which concrete POJO entity class to use
			//.filter($"table" === TableMapping.ADDRESS)
			.filter(bean => bean.getTable.equals(TableMapping.ADDRESS))
			// step2. extract the data fields and the operation type and wrap them into POJO objects
			.flatMap(bean => DataParser.toAddressBean(bean))
			// step3. filter out objects that failed to parse (drop null values)
			.filter(pojo => null != pojo)
		save(addressPojoStreamDS.toDF(), "crm-console")
		
	}
	
	/*
			Entry point of the real-time Kudu ETL application; processing steps:
				step1. create the SparkSession instance, passing in SparkConf
				step2. consume data from the Kafka data source in real time
				step3. perform ETL conversion on the JSON data
				step4. save the converted data to external storage
				step5. after the application starts, wait for termination
			*/
	def main(args: Array[String]): Unit = {
		// step1. create the SparkSession instance, passing in SparkConf
		val spark: SparkSession = SparkUtils.createSparkSession(
			SparkUtils.autoSettingEnv(SparkUtils.sparkConf()), this.getClass
		)
		
		// step2. consume data from the Kafka data source in real time
		// logistics system topic data
		val logisticsDF: DataFrame = load(spark, "logistics")
		val crmDF: DataFrame = load(spark, "crm")
		
		// step3. perform ETL conversion on the JSON data: JSON -> MessageBean objects
		val etlLogisticsDF: DataFrame = process(logisticsDF, "logistics")
		val etlCrmDF: DataFrame = process(crmDF, "crm")
		
		// step4. save the converted data to external storage
		//save(etlLogisticsDF, "logistics-console")
		//save(etlCrmDF, "crm-console")
		
		/*
			TODO: for each business system, provide one method that converts that system's table data into POJOs and saves it to external storage
		 */
		etlLogistics(etlLogisticsDF)
		etlCrm(etlCrmDF)
		
		// step5. after the application starts, wait for termination
		spark.streams.active.foreach(query => println(s"Query Starting: ${query.name} ......"))
		spark.streams.awaitAnyTermination()
	}
}
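
The load method called in main is inherited from BasicStreamApp and is not shown in this section. A hedged sketch of what it presumably does, reading the configured Kafka topic and keeping only the value column (the topic names and broker address below are illustrative assumptions, the real implementation reads them from the project's Configuration), is:

	import org.apache.spark.sql.{DataFrame, SparkSession}
	
	// Hypothetical sketch of BasicStreamApp.load; the real implementation may differ.
	def load(spark: SparkSession, category: String): DataFrame = {
		// pick the topic of the business system; the names are illustrative
		val topic = category match {
			case "logistics" => "logistics_topic"
			case "crm"       => "crm_topic"
		}
		spark.readStream
			.format("kafka")
			.option("kafka.bootstrap.servers", "node1:9092") // illustrative broker address
			.option("subscribe", topic)
			.load()
			.selectExpr("CAST(value AS STRING)") // keep only the message value as a string
	}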

04 - Real-Time ETL Development: Converting Beans to POJOs (Coding and Testing)

Task: filter out the data of the remaining tables in the logistics and CRM systems, extract the field values, wrap them into POJO objects, and save them to external storage.

package cn.itcast.logistics.etl.realtime

import cn.itcast.logistics.common.BeanImplicits._
import cn.itcast.logistics.common.beans.crm._
import cn.itcast.logistics.common.beans.logistics._
import cn.itcast.logistics.common.beans.parser._
import cn.itcast.logistics.common.{SparkUtils, TableMapping}
import cn.itcast.logistics.etl.parse.DataParser
import com.alibaba.fastjson.JSON
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}

/**
 * Kudu data pipeline application: performs real-time ETL into the Kudu database
 */
object KuduStreamApp extends BasicStreamApp {
	
	/**
	 * Data processing: only converts JSON -> MessageBean
	 *
	 * @param streamDF streaming DataFrame
	 * @param category business data category, e.g. logistics system data or CRM system data
	 * @return streaming DataFrame
	 */
	override def process(streamDF: DataFrame, category: String): DataFrame = {
		// import implicit conversions
		import streamDF.sparkSession.implicits._
		
		val etlStreamDF: DataFrame = category match {
			// TODO: logistics system business data, captured by OGG
			case "logistics" =>
				val oggBeanStreamDS: Dataset[OggMessageBean] = streamDF
					// only the value column is consumed from Kafka, so convert it to a Dataset[String]
					.as[String]
					// filter out null or empty messages
					.filter(msg => null != msg && msg.trim.length > 0)
					// parse each message
					.map{
						msg => JSON.parseObject(msg, classOf[OggMessageBean])
					}(Encoders.bean(classOf[OggMessageBean])) // TODO: specify the encoder explicitly
				
				// return the converted data
				oggBeanStreamDS.toDF()
			// TODO: CRM system business data, captured by Canal
			case "crm" =>
				val canalBeanStreamDS: Dataset[CanalMessageBean] = streamDF
					// filter out null values
					.filter(row => !row.isNullAt(0))
					// parse the data, operating on each partition
					.mapPartitions { iter =>
						iter.map { row =>
							val jsonValue: String = row.getAs[String]("value")
							// parse the JSON string
							JSON.parseObject(jsonValue, classOf[CanalMessageBean])
						}
					}
			
				// return the converted data
				canalBeanStreamDS.toDF()
				
			// TODO: data from other business systems
			case _ => streamDF
		}
		// return the ETL-converted data
		etlStreamDF
	}
	
	/**
	 * Data saving: for now, just print the data to the console
	 *
	 * @param streamDF          DataFrame to save
	 * @param tableName         name of the target table
	 * @param isAutoCreateTable whether to create the table automatically (created by default)
	 */
	override def save(streamDF: DataFrame, tableName: String, isAutoCreateTable: Boolean): Unit = {
		streamDF.writeStream
			.queryName(s"query-${tableName}")
			.outputMode(OutputMode.Append())
			.format("console")
			.option("numRows", "100")
			.option("truncate", "false")
			.start()
	}
	
	/**
	 * ETL conversion of logistics system business data and saving to external storage
	 *      MessageBean -> POJO  -> save
	 *
	 * @param streamDF streaming DataFrame
	 */
	def etlLogistics(streamDF: DataFrame): Unit = {
		
		// convert the DataFrame into a Dataset
		val oggBeanStreamDS: Dataset[OggMessageBean] = streamDF.as[OggMessageBean]
		
		/*
			For the logistics system, take [tbl_areas] as the example for the conversion (extract the field values and wrap them into POJO objects)
		*/
		val areaPojoStreamDS: Dataset[AreasBean] = oggBeanStreamDS
			// 1) check the table field: tbl_areas
			.filter(bean => bean.getTable.equals(TableMapping.AREAS))
			// 2) get the data field values (getValue) and convert them into a POJO object
			.map(bean => DataParser.toAreaBean(bean))
			// 3) filter out records whose conversion returned null
			.filter(pojo => null != pojo)
		save(areaPojoStreamDS.toDF(), TableMapping.AREAS)
		
		val warehouseSendVehicleStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.WAREHOUSE_SEND_VEHICLE)
			.map(bean => DataParser.toWarehouseSendVehicle(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(warehouseSendVehicleStreamDF, TableMapping.WAREHOUSE_SEND_VEHICLE)
		
		val waybillLineStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.WAYBILL_LINE)
			.map(bean => DataParser.toWaybillLine(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(waybillLineStreamDF, TableMapping.WAYBILL_LINE)
		
		val chargeStandardStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.CHARGE_STANDARD)
			.map(bean => DataParser.toChargeStandard(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(chargeStandardStreamDF, TableMapping.CHARGE_STANDARD)
		
		val codesStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.CODES)
			.map(bean => DataParser.toCodes(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(codesStreamDF, TableMapping.CODES)
		
		val collectPackageStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.COLLECT_PACKAGE)
			.map(bean => DataParser.toCollectPackage(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(collectPackageStreamDF, TableMapping.COLLECT_PACKAGE)
		
		val companyStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.COMPANY)
			.map(bean => DataParser.toCompany(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(companyStreamDF, TableMapping.COMPANY)
		
		val companyDotMapStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.COMPANY_DOT_MAP)
			.map(bean => DataParser.toCompanyDotMap(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(companyDotMapStreamDF, TableMapping.COMPANY_DOT_MAP)
		
		val companyTransportRouteMaStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.COMPANY_TRANSPORT_ROUTE_MA)
			.map(bean => DataParser.toCompanyTransportRouteMa(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(companyTransportRouteMaStreamDF, TableMapping.COMPANY_TRANSPORT_ROUTE_MA)
		
		val companyWarehouseMapStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.COMPANY_WAREHOUSE_MAP)
			.map(bean => DataParser.toCompanyWarehouseMap(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(companyWarehouseMapStreamDF, TableMapping.COMPANY_WAREHOUSE_MAP)
		
		val consumerSenderInfoStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.CONSUMER_SENDER_INFO)
			.map(bean => DataParser.toConsumerSenderInfo(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(consumerSenderInfoStreamDF, TableMapping.CONSUMER_SENDER_INFO)
		
		val courierStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.COURIER)
			.map(bean => DataParser.toCourier(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(courierStreamDF, TableMapping.COURIER)
		
		val deliverPackageStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.DELIVER_PACKAGE)
			.map(bean => DataParser.toDeliverPackage(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(deliverPackageStreamDF, TableMapping.DELIVER_PACKAGE)
		
		val deliverRegionStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.DELIVER_REGION)
			.map(bean => DataParser.toDeliverRegion(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(deliverRegionStreamDF, TableMapping.DELIVER_REGION)
		
		val deliveryRecordStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.DELIVERY_RECORD)
			.map(bean => DataParser.toDeliveryRecord(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(deliveryRecordStreamDF, TableMapping.DELIVERY_RECORD)
		
		val departmentStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.DEPARTMENT)
			.map(bean => DataParser.toDepartment(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(departmentStreamDF, TableMapping.DEPARTMENT)
		
		val dotStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.DOT)
			.map(bean => DataParser.toDot(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(dotStreamDF, TableMapping.DOT)
		
		val dotTransportToolStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.DOT_TRANSPORT_TOOL)
			.map(bean => DataParser.toDotTransportTool(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(dotTransportToolStreamDF, TableMapping.DOT_TRANSPORT_TOOL)
		
		val driverStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.DRIVER)
			.map(bean => DataParser.toDriver(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(driverStreamDF, TableMapping.DRIVER)
		
		val empStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.EMP)
			.map(bean => DataParser.toEmp(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(empStreamDF, TableMapping.EMP)
		
		val empInfoMapStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.EMP_INFO_MAP)
			.map(bean => DataParser.toEmpInfoMap(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(empInfoMapStreamDF, TableMapping.EMP_INFO_MAP)
		
		val expressBillStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.EXPRESS_BILL)
			.map(bean => DataParser.toExpressBill(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(expressBillStreamDF, TableMapping.EXPRESS_BILL)
		
		val expressPackageStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.EXPRESS_PACKAGE)
			.map(bean => DataParser.toExpressPackage(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(expressPackageStreamDF, TableMapping.EXPRESS_PACKAGE)
		
		val fixedAreaStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.FIXED_AREA)
			.map(bean => DataParser.toFixedArea(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(fixedAreaStreamDF, TableMapping.FIXED_AREA)
		
		val goodsRackStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.GOODS_RACK)
			.map(bean => DataParser.toGoodsRack(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(goodsRackStreamDF, TableMapping.GOODS_RACK)
		
		val jobStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.JOB)
			.map(bean => DataParser.toJob(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(jobStreamDF, TableMapping.JOB)
		
		val outWarehouseStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.OUT_WAREHOUSE)
			.map(bean => DataParser.toOutWarehouse(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(outWarehouseStreamDF, TableMapping.OUT_WAREHOUSE)
		
		val outWarehouseDetailStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.OUT_WAREHOUSE_DETAIL)
			.map(bean => DataParser.toOutWarehouseDetail(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(outWarehouseDetailStreamDF, TableMapping.OUT_WAREHOUSE_DETAIL)
		
		val pkgStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.PKG)
			.map(bean => DataParser.toPkg(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(pkgStreamDF, TableMapping.PKG)
		
		val postalStandardStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.POSTAL_STANDARD)
			.map(bean => DataParser.toPostalStandard(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(postalStandardStreamDF, TableMapping.POSTAL_STANDARD)
		
		val pushWarehouseStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.PUSH_WAREHOUSE)
			.map(bean => DataParser.toPushWarehouse(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(pushWarehouseStreamDF, TableMapping.PUSH_WAREHOUSE)
		
		val pushWarehouseDetailStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.PUSH_WAREHOUSE_DETAIL)
			.map(bean => DataParser.toPushWarehouseDetail(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(pushWarehouseDetailStreamDF, TableMapping.PUSH_WAREHOUSE_DETAIL)
		
		val routeStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.ROUTE)
			.map(bean => DataParser.toRoute(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(routeStreamDF, TableMapping.ROUTE)
		
		val serviceEvaluationStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.SERVICE_EVALUATION)
			.map(bean => DataParser.toServiceEvaluation(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(serviceEvaluationStreamDF, TableMapping.SERVICE_EVALUATION)
		
		val storeGridStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.STORE_GRID)
			.map(bean => DataParser.toStoreGrid(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(storeGridStreamDF, TableMapping.STORE_GRID)
		
		val transportToolStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.TRANSPORT_TOOL)
			.map(bean => DataParser.toTransportTool(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(transportToolStreamDF, TableMapping.TRANSPORT_TOOL)
		
		val vehicleMonitorStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.VEHICLE_MONITOR)
			.map(bean => DataParser.toVehicleMonitor(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(vehicleMonitorStreamDF, TableMapping.VEHICLE_MONITOR)
		
		val warehouseStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.WAREHOUSE)
			.map(bean => DataParser.toWarehouse(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(warehouseStreamDF, TableMapping.WAREHOUSE)
		
		val warehouseEmpStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.WAREHOUSE_EMP)
			.map(bean => DataParser.toWarehouseEmp(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(warehouseEmpStreamDF, TableMapping.WAREHOUSE_EMP)
		
		val warehouseRackMapStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.WAREHOUSE_RACK_MAP)
			.map(bean => DataParser.toWarehouseRackMap(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(warehouseRackMapStreamDF, TableMapping.WAREHOUSE_RACK_MAP)
		
		val warehouseReceiptStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.WAREHOUSE_RECEIPT)
			.map(bean => DataParser.toWarehouseReceipt(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(warehouseReceiptStreamDF, TableMapping.WAREHOUSE_RECEIPT)
		
		val warehouseReceiptDetailStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.WAREHOUSE_RECEIPT_DETAIL)
			.map(bean => DataParser.toWarehouseReceiptDetail(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(warehouseReceiptDetailStreamDF, TableMapping.WAREHOUSE_RECEIPT_DETAIL)
		
		val warehouseTransportToolStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.WAREHOUSE_TRANSPORT_TOOL)
			.map(bean => DataParser.toWarehouseTransportTool(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(warehouseTransportToolStreamDF, TableMapping.WAREHOUSE_TRANSPORT_TOOL)
		
		val warehouseVehicleMapStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.WAREHOUSE_VEHICLE_MAP)
			.map(bean => DataParser.toWarehouseVehicleMap(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(warehouseVehicleMapStreamDF, TableMapping.WAREHOUSE_VEHICLE_MAP)
		
		val waybillStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.WAY_BILL)
			.map(bean => DataParser.toWaybill(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(waybillStreamDF, TableMapping.WAY_BILL)
		
		val waybillStateRecordStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.WAYBILL_STATE_RECORD)
			.map(bean => DataParser.toWaybillStateRecord(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(waybillStateRecordStreamDF, TableMapping.WAYBILL_STATE_RECORD)
		
		val workTimeStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.WORK_TIME)
			.map(bean => DataParser.toWorkTime(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(workTimeStreamDF, TableMapping.WORK_TIME)
		
		val transportRecordStreamDF = oggBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.TRANSPORT_RECORD)
			.map(bean => DataParser.toTransportRecordBean(bean))
			.filter( pojo => null != pojo)
			.toDF()
		save(transportRecordStreamDF, TableMapping.TRANSPORT_RECORD)
		
	}
	
	/**
	 * ETL conversion of CRM (customer relationship management) system business data and saving to external storage
	 *      MessageBean -> POJO  -> save
	 *
	 * @param streamDF streaming DataFrame
	 */
	def etlCrm(streamDF: DataFrame): Unit = {
		
		// convert the DataFrame into a Dataset
		val canalBeanStreamDS: Dataset[CanalMessageBean] = streamDF.as[CanalMessageBean]
		/*
			Taking the CRM table crm_address as an example, extract the field values from the CanalMessageBean and wrap them into AddressBean objects
		 */
		val addressPojoStreamDS: Dataset[AddressBean] = canalBeanStreamDS
			// step1. get the table name and decide which concrete POJO entity class to use
			//.filter($"table" === TableMapping.ADDRESS)
			.filter(bean => bean.getTable.equals(TableMapping.ADDRESS))
			// step2. extract the data fields and the operation type and wrap them into POJO objects
			.flatMap(bean => DataParser.toAddressBean(bean))
			// step3. filter out objects that failed to parse (drop null values)
			.filter(pojo => null != pojo)
		save(addressPojoStreamDS.toDF(), TableMapping.ADDRESS)
		
		// Customer table data
		val customerStreamDS: Dataset[CustomerBean] = canalBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.CUSTOMER)
			.map(bean => DataParser.toCustomer(bean))
			.filter( pojo => null != pojo)
		save(customerStreamDS.toDF(), TableMapping.CUSTOMER)
		// ConsumerAddressMap table data
		val consumerAddressMapStreamDS: Dataset[ConsumerAddressMapBean] = canalBeanStreamDS
			.filter(bean => bean.getTable == TableMapping.CONSUMER_ADDRESS_MAP)
			.map(bean => DataParser.toConsumerAddressMap(bean))
			.filter( pojo => null != pojo)
		save(consumerAddressMapStreamDS.toDF(), TableMapping.CONSUMER_ADDRESS_MAP)
	}
	
	/*
			Entry point of the real-time Kudu ETL application; processing steps:
				step1. create the SparkSession instance, passing in SparkConf
				step2. consume data from the Kafka data source in real time
				step3. perform ETL conversion on the JSON data
				step4. save the converted data to external storage
				step5. after the application starts, wait for termination
			*/
	def main(args: Array[String]): Unit = {
		// step1. create the SparkSession instance, passing in SparkConf
		val spark: SparkSession = SparkUtils.createSparkSession(
			SparkUtils.autoSettingEnv(SparkUtils.sparkConf()), this.getClass
		)
		
		// step2. consume data from the Kafka data source in real time
		// logistics system topic data
		val logisticsDF: DataFrame = load(spark, "logistics")
		val crmDF: DataFrame = load(spark, "crm")
		
		// step3. perform ETL conversion on the JSON data: JSON -> MessageBean objects
		val etlLogisticsDF: DataFrame = process(logisticsDF, "logistics")
		val etlCrmDF: DataFrame = process(crmDF, "crm")
		
		// step4. save the converted data to external storage
		//save(etlLogisticsDF, "logistics-console")
		//save(etlCrmDF, "crm-console")
		
		/*
			TODO: for each business system, provide one method that converts that system's table data into POJOs and saves it to external storage
		 */
		etlLogistics(etlLogisticsDF)
		etlCrm(etlCrmDF)
		
		// step5. after the application starts, wait for termination
		spark.streams.active.foreach(query => println(s"Query Starting: ${query.name} ......"))
		spark.streams.awaitAnyTermination()
	}
}
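
Every per-table block inside etlLogistics and etlCrm above repeats the same filter / map (or flatMap) / filter-null / save pattern. As a hedged refactoring sketch (not part of the course code), the repetition could be factored into a small generic helper inside KuduStreamApp:

	import org.apache.spark.sql.{Dataset, Encoder}
	
	// Hypothetical helper: converts the messages of one table into POJOs and saves them.
	// parse returns zero or more POJOs per message, which also covers Canal's multi-row inserts;
	// a single-bean parser can be adapted with: bean => Option(DataParser.toXxx(bean)).toSeq
	def etlTable[B, P: Encoder](beanDS: Dataset[B], tableName: String)
	                           (getTable: B => String)(parse: B => Seq[P]): Unit = {
		val pojoDS: Dataset[P] = beanDS
			.filter(bean => getTable(bean) == tableName) // keep only this table's messages
			.flatMap(bean => parse(bean))                // MessageBean -> zero or more POJOs
			.filter(pojo => null != pojo)                // drop records that failed to parse
		save(pojoDS.toDF(), tableName)
	}
	
	// Example usage inside etlCrm, equivalent to the crm_address block above:
	// etlTable(canalBeanStreamDS, TableMapping.ADDRESS)(_.getTable)(DataParser.toAddressBean(_))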

Run the streaming program, then insert, update, or delete rows in the MySQL and Oracle database tables and check whether the data shows up on the console.

  • MySQL database: update and delete rows in any table


  • Oracle database: update and delete rows in any table


05 - Real-Time ETL Development: Saving to Kudu Tables (save Method)

Task: save the POJO objects produced by the ETL into Kudu tables by implementing the save method of the KuduStreamApp object.

	/**
	 * Data saving: write the streaming dataset to the corresponding Kudu table
	 *
	 * @param streamDF          DataFrame to save
	 * @param tableName         name of the target table
	 * @param isAutoCreateTable whether to create the table automatically (created by default)
	 */
	override def save(streamDF: DataFrame, tableName: String, isAutoCreateTable: Boolean = true): Unit = {
		// 1. if auto-creation is allowed and the table does not exist, create it before saving the data
		if(isAutoCreateTable) {
			KuduTools.createKuduTable(tableName, streamDF.schema)
		}
		
		// 2. save the data to the Kudu table
		streamDF.writeStream
			.queryName(s"query-${Configuration.SPARK_KAFKA_FORMAT}-${tableName}")
			.outputMode(OutputMode.Append())
			.format("kudu")
			.option("kudu.master", Configuration.KUDU_RPC_ADDRESS)
			.option("kudu.table", tableName)
			.option("kudu.operation", "upsert")
			.start()
	}

Before saving, check whether the table exists; if it does not and auto-creation is allowed, create the table first.
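
KuduTools.createKuduTable belongs to the course's common module and is not shown here. A minimal sketch of the idea, assuming the kudu-spark KuduContext API, a primary-key column named id, and illustrative partitioning and replication settings (all of these are assumptions, not the course's actual implementation), could look like this:

	import scala.collection.JavaConverters._
	
	import org.apache.kudu.client.CreateTableOptions
	import org.apache.kudu.spark.kudu.KuduContext
	import org.apache.spark.sql.SparkSession
	import org.apache.spark.sql.types.StructType
	
	object KuduTools {
	
		// Hypothetical sketch: create the Kudu table if it does not exist yet.
		def createKuduTable(tableName: String, schema: StructType,
		                    keys: Seq[String] = Seq("id")): Unit = {
			val spark = SparkSession.active
			val kuduContext = new KuduContext(Configuration.KUDU_RPC_ADDRESS, spark.sparkContext)
	
			if (!kuduContext.tableExists(tableName)) {
				// primary-key columns must be non-nullable in Kudu
				val kuduSchema = StructType(
					schema.fields.map(f => if (keys.contains(f.name)) f.copy(nullable = false) else f)
				)
				val options = new CreateTableOptions()
					.addHashPartitions(keys.asJava, 3) // illustrative hash partitioning
					.setNumReplicas(1)                 // illustrative replica count
				kuduContext.createTable(tableName, kuduSchema, keys, options)
			}
		}
	}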
