01 Real-Time ETL Development: Converting to POJOs [Approach]
Analyze the data consumed from Kafka (JSON converted into MessageBean objects) and determine which fields the business actually cares about.
- Canal captures MySQL database data: 12 fields per message
- Core fields in the Canal-captured data
When Canal captures MySQL data, the fields the business cares about are basically the same as with OGG-captured data:
- First field: table, the name of the table being operated on
- Second field: type, the operation type, one of three values [INSERT, UPDATE, DELETE]
- Third field: data, the actual row data
As this analysis shows, once the Canal-captured JSON string has been wrapped into a MessageBean, we need to extract these core fields and convert them.
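For reference, a full Canal flat-message envelope looks roughly like the sample below. This is a hand-written illustration, not captured output, and the exact field set varies by Canal version and configuration. Note how a batched insert shows up as multiple entries in the data array:
{
    "id": 2,
    "database": "crm",
    "table": "crm_address",
    "type": "INSERT",
    "isDdl": false,
    "ts": 1580640699476,
    "es": 1580640699000,
    "pkNames": ["id"],
    "old": null,
    "mysqlType": {"id": "bigint(20)", "name": "varchar(50)"},
    "sqlType": {"id": -5, "name": 12},
    "data": [
        {"id": "10001", "name": "葛秋红", "area_id": "130903"},
        {"id": "10002", "name": "张三", "area_id": "130903"}
    ]
}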
02 Real-Time ETL Development: Converting CanalBean to POJO
Task: take the Canal-captured data, extract the field values, and wrap them into POJO entity objects.
step1. Get the table name and use it to decide which concrete POJO entity class to build
step2. Extract the row data and the operation type, and set them on the POJO object
step3. Filter out objects that could not be parsed, i.e. drop null values
Taking the CRM table crm_address as an example, extract the field values from CanalMessageBean and wrap them into AddressBean objects.
Implement the data parser DataParser, which converts a MessageBean into AddressBean objects.
One caveat: when Canal captures MySQL table data, a single insert may write multiple rows at once. In that case the extracted row data must be wrapped into multiple objects, so the method returns a List (see the two-row data array in the envelope sample above).
Implementation method: toAddressBean, which extracts the row fields and wraps them into POJO objects.
// ================== CRM (customer relationship management) business data parsing ==================
// TODO: convert the crm_address table data from the CRM system into POJO objects
/*
"data": [{
    "id": "10001",
    "name": "葛秋红",
    "tel": null,
    "mobile": "17*******47",
    "detail_addr": "恒大影城南侧小金庄",
    "area_id": "130903",
    "gis_addr": null,
    "cdt": "2020-02-02 18:51:39",
    "udt": "2020-02-02 18:51:39",
    "remark": null
}]
*/
// imports used below (CollectionUtils is assumed to be the Apache Commons
// implementation; the project may use a different one)
import java.util
import com.alibaba.fastjson.JSON
import org.apache.commons.collections4.CollectionUtils

def toAddressBean(bean: MessageBean): List[AddressBean] = {
  // a. cast the MessageBean to a CanalMessageBean
  val canalMessageBean: CanalMessageBean = getCanalMessageBean(bean)
  // b. get the operation type
  val opType: String = getOpType(canalMessageBean.getType)
  // c. get the row data, a List of maps (one map per row)
  val datasValue: util.List[util.Map[String, AnyRef]] = canalMessageBean.getData
  // d. wrap the row data and operation type into POJO objects
  // list -> json
  val datasJson = JSON.toJSONString(datasValue, true)
  println(datasJson)
  // json -> pojo
  val list: util.List[AddressBean] = JSON.parseArray(datasJson, classOf[AddressBean])
  // if the list has elements, set the operation type on each bean
  if (!CollectionUtils.isEmpty(list)) {
    // convert the Java list to a Scala list
    import scala.collection.JavaConverters._
    list.asScala.map { bean =>
      bean.setOpType(opType)
      bean
    }.toList
  } else {
    // e. return an empty list when nothing could be parsed
    Nil
  }
}
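The helper methods getCanalMessageBean and getOpType referenced above are not shown in this section. A minimal sketch of what they could look like follows; the method names come from the code above, but the bodies are assumptions (the CRM stream is parsed directly into CanalMessageBean objects, and Canal reports the operation type as INSERT/UPDATE/DELETE):
// Sketch only; the real project implementations may differ.
private def getCanalMessageBean(bean: MessageBean): CanalMessageBean = {
  // CRM messages were parsed as CanalMessageBean in process(), so a downcast suffices
  bean.asInstanceOf[CanalMessageBean]
}

private def getOpType(canalType: String): String = {
  // normalize Canal's operation type; the lowercase target values are an assumption
  canalType.toUpperCase match {
    case "INSERT" => "insert"
    case "UPDATE" => "update"
    case "DELETE" => "delete"
    case other    => other.toLowerCase
  }
}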
Run the streaming program, update and delete rows in the MySQL database, and check the results printed to the console.
03 Real-Time ETL Development: Converting to POJOs [Code Refactoring]
So far we have converted OGG-captured data and Canal-captured data separately, in each case producing the POJO for one specific table:
- 1) OGG-captured data: so far only one table, tbl_areas, is handled
- 2) Canal-captured data: 3 tables in total, only one completed so far
For the CRM system (Canal-captured), all remaining tables are converted the same way: extract the fields into the matching POJO objects, so the data can then be saved to external storage engines.
The code needs to be refactored on two points:
First: Canal-captured data always starts by converting the JSON string into a MessageBean object; the process method handles this conversion.
Second: based on the table name, extract the matching rows, wrap them into POJO objects, and finally save them externally.
- First, extract the row fields and wrap them into a POJO object
- Then, save the POJO objects to an external storage system, such as Kudu, ES, or ClickHouse
For each business system, extract a dedicated method that converts and saves that system's tables:
- Logistics system method: etlLogistics, which selects data by table name, wraps it into POJOs, and saves it
- CRM system method: etlCrm, which selects data by table name, wraps it into POJOs, and saves it
The complete code:
package cn.itcast.logistics.etl.realtime
import cn.itcast.logistics.common.BeanImplicits._
import cn.itcast.logistics.common.beans.crm.AddressBean
import cn.itcast.logistics.common.beans.logistics.AreasBean
import cn.itcast.logistics.common.beans.parser.{CanalMessageBean, OggMessageBean}
import cn.itcast.logistics.common.{SparkUtils, TableMapping}
import cn.itcast.logistics.etl.parse.DataParser
import com.alibaba.fastjson.JSON
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}
/**
 * Kudu data pipeline application: real-time ETL into the Kudu database
 */
object KuduStreamApp extends BasicStreamApp {
  /**
   * Data processing: here it only implements JSON -> MessageBean
   *
   * @param streamDF streaming DataFrame
   * @param category business data category, e.g. logistics system data or CRM system data
   * @return streaming DataFrame
   */
  override def process(streamDF: DataFrame, category: String): DataFrame = {
    // import implicit conversions
    import streamDF.sparkSession.implicits._
    val etlStreamDF: DataFrame = category match {
      // TODO: logistics system business data, captured by OGG
      case "logistics" =>
        val oggBeanStreamDS: Dataset[OggMessageBean] = streamDF
          // only the Kafka value column is consumed, so convert it to a Dataset[String]
          .as[String]
          // filter out empty messages
          .filter(msg => null != msg && msg.trim.length > 0)
          // parse each message
          .map{
            msg => JSON.parseObject(msg, classOf[OggMessageBean])
          }(Encoders.bean(classOf[OggMessageBean])) // TODO: explicit encoder
        // return the converted data
        oggBeanStreamDS.toDF()
      // TODO: CRM system business data
      case "crm" =>
        val canalBeanStreamDS: Dataset[CanalMessageBean] = streamDF
          // filter out null rows
          .filter(row => !row.isNullAt(0))
          // parse the data, operating per partition
          .mapPartitions { iter =>
            iter.map { row =>
              val jsonValue: String = row.getAs[String]("value")
              // parse the JSON string
              JSON.parseObject(jsonValue, classOf[CanalMessageBean])
            }
          }
        // return the converted data
        canalBeanStreamDS.toDF()
      // TODO: data from other business systems
      case _ => streamDF
    }
    // return the ETL-converted data
    etlStreamDF
  }
  /**
   * Save the data; at this stage it only prints the data to the console
   *
   * @param streamDF DataFrame to save
   * @param tableName name of the target table
   * @param isAutoCreateTable whether to auto-create the table (defaults to creating it)
   */
  override def save(streamDF: DataFrame, tableName: String, isAutoCreateTable: Boolean): Unit = {
    streamDF.writeStream
      .queryName(s"query-${tableName}")
      .outputMode(OutputMode.Append())
      .format("console")
      .option("numRows", "100")
      .option("truncate", "false")
      .start()
  }
  /**
   * ETL conversion and external storage of the logistics system business data
   * MessageBean -> POJO -> save
   *
   * @param streamDF streaming DataFrame
   */
  def etlLogistics(streamDF: DataFrame): Unit = {
    // convert the DataFrame to a Dataset
    val oggBeanStreamDS: Dataset[OggMessageBean] = streamDF.as[OggMessageBean]
    /*
      For the logistics system, take the table tbl_areas as an example and convert the data
      (extract the field values and wrap them into POJO objects)
    */
    val areaPojoStreamDS: Dataset[AreasBean] = oggBeanStreamDS
      // 1) select rows by the table field: tbl_areas
      .filter(bean => bean.getTable.equals(TableMapping.AREAS))
      // 2) get the row field values (getValue method) and convert them into a POJO object
      .map(bean => DataParser.toAreaBean(bean))
      // 3) filter out rows whose conversion returned null
      .filter(pojo => null != pojo)
    save(areaPojoStreamDS.toDF(), "logistics-console")
  }
  /**
   * ETL conversion and external storage of the CRM (customer relationship management) system business data
   * MessageBean -> POJO -> save
   *
   * @param streamDF streaming DataFrame
   */
  def etlCrm(streamDF: DataFrame): Unit = {
    // convert the DataFrame to a Dataset
    val canalBeanStreamDS: Dataset[CanalMessageBean] = streamDF.as[CanalMessageBean]
    /*
      Take the CRM table crm_address as an example: extract field values from
      CanalMessageBean and wrap them into AddressBean objects
    */
    val addressPojoStreamDS: Dataset[AddressBean] = canalBeanStreamDS
      // step1. get the table name and use it to decide which POJO entity class to build
      //.filter($"table" === TableMapping.ADDRESS)
      .filter(bean => bean.getTable.equals(TableMapping.ADDRESS))
      // step2. extract the row data and the operation type, and set them on the POJO objects
      .flatMap(bean => DataParser.toAddressBean(bean))
      // step3. filter out objects that could not be parsed (drop nulls)
      .filter(pojo => null != pojo)
    save(addressPojoStreamDS.toDF(), "crm-console")
  }
  /*
    Entry point of the real-time Kudu ETL application. Processing steps:
      step1. create the SparkSession instance, passing in SparkConf
      step2. consume data from the Kafka source in real time
      step3. run the ETL conversion on the consumed JSON data
      step4. save the converted data to external storage
      step5. after startup, wait for the application to terminate
  */
  def main(args: Array[String]): Unit = {
    // step1. create the SparkSession instance, passing in SparkConf
    val spark: SparkSession = SparkUtils.createSparkSession(
      SparkUtils.autoSettingEnv(SparkUtils.sparkConf()), this.getClass
    )
    // step2. consume data from the Kafka source in real time
    // logistics system topic data
    val logisticsDF: DataFrame = load(spark, "logistics")
    val crmDF: DataFrame = load(spark, "crm")
    // step3. ETL conversion of the consumed JSON data: JSON -> MessageBean objects
    val etlLogisticsDF: DataFrame = process(logisticsDF, "logistics")
    val etlCrmDF: DataFrame = process(crmDF, "crm")
    // step4. save the converted data to external storage
    //save(etlLogisticsDF, "logistics-console")
    //save(etlCrmDF, "crm-console")
    /*
      TODO: one method per business system, dedicated to converting that
      system's tables into POJOs and saving them to external storage
    */
    etlLogistics(etlLogisticsDF)
    etlCrm(etlCrmDF)
    // step5. after startup, wait for the application to terminate
    spark.streams.active.foreach(query => println(s"Query Starting: ${query.name} ......"))
    spark.streams.awaitAnyTermination()
  }
}
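DataParser.toAreaBean used in etlLogistics above is not shown in this section. Since OGG emits one row per message, it returns a single bean rather than a list. A minimal sketch, assuming OggMessageBean exposes the row payload as a JSON string through a getValue helper (the after-image for inserts and updates, the before-image for deletes) and the operation type through getOp_type:
// Sketch only; the accessors on OggMessageBean are assumptions.
def toAreaBean(bean: OggMessageBean): AreasBean = {
  // row payload as a JSON string; may be null for malformed messages
  val rowJson: String = bean.getValue
  if (null == rowJson) {
    null
  } else {
    // json -> pojo
    val areaBean: AreasBean = JSON.parseObject(rowJson, classOf[AreasBean])
    if (null != areaBean) {
      // OGG op_type is I/U/D; the real project may normalize it further
      areaBean.setOpType(bean.getOp_type)
    }
    areaBean
  }
}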
04 Real-Time ETL Development: Converting Beans to POJOs [Coding and Testing]
Task: filter out the data of the remaining logistics and CRM tables, extract the field values, wrap them into POJO objects, and save them to external storage.
package cn.itcast.logistics.etl.realtime
import cn.itcast.logistics.common.BeanImplicits._
import cn.itcast.logistics.common.beans.crm._
import cn.itcast.logistics.common.beans.logistics._
import cn.itcast.logistics.common.beans.parser._
import cn.itcast.logistics.common.{SparkUtils, TableMapping}
import cn.itcast.logistics.etl.parse.DataParser
import com.alibaba.fastjson.JSON
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}
/**
 * Kudu data pipeline application: real-time ETL into the Kudu database
 */
object KuduStreamApp extends BasicStreamApp {
  /**
   * Data processing: here it only implements JSON -> MessageBean
   *
   * @param streamDF streaming DataFrame
   * @param category business data category, e.g. logistics system data or CRM system data
   * @return streaming DataFrame
   */
  override def process(streamDF: DataFrame, category: String): DataFrame = {
    // import implicit conversions
    import streamDF.sparkSession.implicits._
    val etlStreamDF: DataFrame = category match {
      // TODO: logistics system business data, captured by OGG
      case "logistics" =>
        val oggBeanStreamDS: Dataset[OggMessageBean] = streamDF
          // only the Kafka value column is consumed, so convert it to a Dataset[String]
          .as[String]
          // filter out empty messages
          .filter(msg => null != msg && msg.trim.length > 0)
          // parse each message
          .map{
            msg => JSON.parseObject(msg, classOf[OggMessageBean])
          }(Encoders.bean(classOf[OggMessageBean])) // TODO: explicit encoder
        // return the converted data
        oggBeanStreamDS.toDF()
      // TODO: CRM system business data
      case "crm" =>
        val canalBeanStreamDS: Dataset[CanalMessageBean] = streamDF
          // filter out null rows
          .filter(row => !row.isNullAt(0))
          // parse the data, operating per partition
          .mapPartitions { iter =>
            iter.map { row =>
              val jsonValue: String = row.getAs[String]("value")
              // parse the JSON string
              JSON.parseObject(jsonValue, classOf[CanalMessageBean])
            }
          }
        // return the converted data
        canalBeanStreamDS.toDF()
      // TODO: data from other business systems
      case _ => streamDF
    }
    // return the ETL-converted data
    etlStreamDF
  }
  /**
   * Save the data; at this stage it only prints the data to the console
   *
   * @param streamDF DataFrame to save
   * @param tableName name of the target table
   * @param isAutoCreateTable whether to auto-create the table (defaults to creating it)
   */
  override def save(streamDF: DataFrame, tableName: String, isAutoCreateTable: Boolean): Unit = {
    streamDF.writeStream
      .queryName(s"query-${tableName}")
      .outputMode(OutputMode.Append())
      .format("console")
      .option("numRows", "100")
      .option("truncate", "false")
      .start()
  }
  /**
   * ETL conversion and external storage of the logistics system business data
   * MessageBean -> POJO -> save
   *
   * @param streamDF streaming DataFrame
   */
  def etlLogistics(streamDF: DataFrame): Unit = {
    // convert the DataFrame to a Dataset
    val oggBeanStreamDS: Dataset[OggMessageBean] = streamDF.as[OggMessageBean]
    /*
      For the logistics system, take the table tbl_areas as an example and convert the data
      (extract the field values and wrap them into POJO objects)
    */
    val areaPojoStreamDS: Dataset[AreasBean] = oggBeanStreamDS
      // 1) select rows by the table field: tbl_areas
      .filter(bean => bean.getTable.equals(TableMapping.AREAS))
      // 2) get the row field values (getValue method) and convert them into a POJO object
      .map(bean => DataParser.toAreaBean(bean))
      // 3) filter out rows whose conversion returned null
      .filter(pojo => null != pojo)
    save(areaPojoStreamDS.toDF(), TableMapping.AREAS)
val warehouseSendVehicleStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.WAREHOUSE_SEND_VEHICLE)
.map(bean => DataParser.toWarehouseSendVehicle(bean))
.filter( pojo => null != pojo)
.toDF()
save(warehouseSendVehicleStreamDF, TableMapping.WAREHOUSE_SEND_VEHICLE)
val waybillLineStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.WAYBILL_LINE)
.map(bean => DataParser.toWaybillLine(bean))
.filter( pojo => null != pojo)
.toDF()
save(waybillLineStreamDF, TableMapping.WAYBILL_LINE)
val chargeStandardStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.CHARGE_STANDARD)
.map(bean => DataParser.toChargeStandard(bean))
.filter( pojo => null != pojo)
.toDF()
save(chargeStandardStreamDF, TableMapping.CHARGE_STANDARD)
val codesStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.CODES)
.map(bean => DataParser.toCodes(bean))
.filter( pojo => null != pojo)
.toDF()
save(codesStreamDF, TableMapping.CODES)
val collectPackageStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.COLLECT_PACKAGE)
.map(bean => DataParser.toCollectPackage(bean))
.filter( pojo => null != pojo)
.toDF()
save(collectPackageStreamDF, TableMapping.COLLECT_PACKAGE)
val companyStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.COMPANY)
.map(bean => DataParser.toCompany(bean))
.filter( pojo => null != pojo)
.toDF()
save(companyStreamDF, TableMapping.COMPANY)
val companyDotMapStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.COMPANY_DOT_MAP)
.map(bean => DataParser.toCompanyDotMap(bean))
.filter( pojo => null != pojo)
.toDF()
save(companyDotMapStreamDF, TableMapping.COMPANY_DOT_MAP)
val companyTransportRouteMaStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.COMPANY_TRANSPORT_ROUTE_MA)
.map(bean => DataParser.toCompanyTransportRouteMa(bean))
.filter( pojo => null != pojo)
.toDF()
save(companyTransportRouteMaStreamDF, TableMapping.COMPANY_TRANSPORT_ROUTE_MA)
val companyWarehouseMapStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.COMPANY_WAREHOUSE_MAP)
.map(bean => DataParser.toCompanyWarehouseMap(bean))
.filter( pojo => null != pojo)
.toDF()
save(companyWarehouseMapStreamDF, TableMapping.COMPANY_WAREHOUSE_MAP)
val consumerSenderInfoStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.CONSUMER_SENDER_INFO)
.map(bean => DataParser.toConsumerSenderInfo(bean))
.filter( pojo => null != pojo)
.toDF()
save(consumerSenderInfoStreamDF, TableMapping.CONSUMER_SENDER_INFO)
val courierStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.COURIER)
.map(bean => DataParser.toCourier(bean))
.filter( pojo => null != pojo)
.toDF()
save(courierStreamDF, TableMapping.COURIER)
val deliverPackageStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.DELIVER_PACKAGE)
.map(bean => DataParser.toDeliverPackage(bean))
.filter( pojo => null != pojo)
.toDF()
save(deliverPackageStreamDF, TableMapping.DELIVER_PACKAGE)
val deliverRegionStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.DELIVER_REGION)
.map(bean => DataParser.toDeliverRegion(bean))
.filter( pojo => null != pojo)
.toDF()
save(deliverRegionStreamDF, TableMapping.DELIVER_REGION)
val deliveryRecordStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.DELIVERY_RECORD)
.map(bean => DataParser.toDeliveryRecord(bean))
.filter( pojo => null != pojo)
.toDF()
save(deliveryRecordStreamDF, TableMapping.DELIVERY_RECORD)
val departmentStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.DEPARTMENT)
.map(bean => DataParser.toDepartment(bean))
.filter( pojo => null != pojo)
.toDF()
save(departmentStreamDF, TableMapping.DEPARTMENT)
val dotStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.DOT)
.map(bean => DataParser.toDot(bean))
.filter( pojo => null != pojo)
.toDF()
save(dotStreamDF, TableMapping.DOT)
val dotTransportToolStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.DOT_TRANSPORT_TOOL)
.map(bean => DataParser.toDotTransportTool(bean))
.filter( pojo => null != pojo)
.toDF()
save(dotTransportToolStreamDF, TableMapping.DOT_TRANSPORT_TOOL)
val driverStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.DRIVER)
.map(bean => DataParser.toDriver(bean))
.filter( pojo => null != pojo)
.toDF()
save(driverStreamDF, TableMapping.DRIVER)
val empStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.EMP)
.map(bean => DataParser.toEmp(bean))
.filter( pojo => null != pojo)
.toDF()
save(empStreamDF, TableMapping.EMP)
val empInfoMapStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.EMP_INFO_MAP)
.map(bean => DataParser.toEmpInfoMap(bean))
.filter( pojo => null != pojo)
.toDF()
save(empInfoMapStreamDF, TableMapping.EMP_INFO_MAP)
val expressBillStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.EXPRESS_BILL)
.map(bean => DataParser.toExpressBill(bean))
.filter( pojo => null != pojo)
.toDF()
save(expressBillStreamDF, TableMapping.EXPRESS_BILL)
val expressPackageStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.EXPRESS_PACKAGE)
.map(bean => DataParser.toExpressPackage(bean))
.filter( pojo => null != pojo)
.toDF()
save(expressPackageStreamDF, TableMapping.EXPRESS_PACKAGE)
val fixedAreaStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.FIXED_AREA)
.map(bean => DataParser.toFixedArea(bean))
.filter( pojo => null != pojo)
.toDF()
save(fixedAreaStreamDF, TableMapping.FIXED_AREA)
val goodsRackStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.GOODS_RACK)
.map(bean => DataParser.toGoodsRack(bean))
.filter( pojo => null != pojo)
.toDF()
save(goodsRackStreamDF, TableMapping.GOODS_RACK)
val jobStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.JOB)
.map(bean => DataParser.toJob(bean))
.filter( pojo => null != pojo)
.toDF()
save(jobStreamDF, TableMapping.JOB)
val outWarehouseStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.OUT_WAREHOUSE)
.map(bean => DataParser.toOutWarehouse(bean))
.filter( pojo => null != pojo)
.toDF()
save(outWarehouseStreamDF, TableMapping.OUT_WAREHOUSE)
val outWarehouseDetailStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.OUT_WAREHOUSE_DETAIL)
.map(bean => DataParser.toOutWarehouseDetail(bean))
.filter( pojo => null != pojo)
.toDF()
save(outWarehouseDetailStreamDF, TableMapping.OUT_WAREHOUSE_DETAIL)
val pkgStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.PKG)
.map(bean => DataParser.toPkg(bean))
.filter( pojo => null != pojo)
.toDF()
save(pkgStreamDF, TableMapping.PKG)
val postalStandardStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.POSTAL_STANDARD)
.map(bean => DataParser.toPostalStandard(bean))
.filter( pojo => null != pojo)
.toDF()
save(postalStandardStreamDF, TableMapping.POSTAL_STANDARD)
val pushWarehouseStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.PUSH_WAREHOUSE)
.map(bean => DataParser.toPushWarehouse(bean))
.filter( pojo => null != pojo)
.toDF()
save(pushWarehouseStreamDF, TableMapping.PUSH_WAREHOUSE)
val pushWarehouseDetailStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.PUSH_WAREHOUSE_DETAIL)
.map(bean => DataParser.toPushWarehouseDetail(bean))
.filter( pojo => null != pojo)
.toDF()
save(pushWarehouseDetailStreamDF, TableMapping.PUSH_WAREHOUSE_DETAIL)
val routeStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.ROUTE)
.map(bean => DataParser.toRoute(bean))
.filter( pojo => null != pojo)
.toDF()
save(routeStreamDF, TableMapping.ROUTE)
val serviceEvaluationStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.SERVICE_EVALUATION)
.map(bean => DataParser.toServiceEvaluation(bean))
.filter( pojo => null != pojo)
.toDF()
save(serviceEvaluationStreamDF, TableMapping.SERVICE_EVALUATION)
val storeGridStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.STORE_GRID)
.map(bean => DataParser.toStoreGrid(bean))
.filter( pojo => null != pojo)
.toDF()
save(storeGridStreamDF, TableMapping.STORE_GRID)
val transportToolStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.TRANSPORT_TOOL)
.map(bean => DataParser.toTransportTool(bean))
.filter( pojo => null != pojo)
.toDF()
save(transportToolStreamDF, TableMapping.TRANSPORT_TOOL)
val vehicleMonitorStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.VEHICLE_MONITOR)
.map(bean => DataParser.toVehicleMonitor(bean))
.filter( pojo => null != pojo)
.toDF()
save(vehicleMonitorStreamDF, TableMapping.VEHICLE_MONITOR)
val warehouseStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.WAREHOUSE)
.map(bean => DataParser.toWarehouse(bean))
.filter( pojo => null != pojo)
.toDF()
save(warehouseStreamDF, TableMapping.WAREHOUSE)
val warehouseEmpStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.WAREHOUSE_EMP)
.map(bean => DataParser.toWarehouseEmp(bean))
.filter( pojo => null != pojo)
.toDF()
save(warehouseEmpStreamDF, TableMapping.WAREHOUSE_EMP)
val warehouseRackMapStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.WAREHOUSE_RACK_MAP)
.map(bean => DataParser.toWarehouseRackMap(bean))
.filter( pojo => null != pojo)
.toDF()
save(warehouseRackMapStreamDF, TableMapping.WAREHOUSE_RACK_MAP)
val warehouseReceiptStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.WAREHOUSE_RECEIPT)
.map(bean => DataParser.toWarehouseReceipt(bean))
.filter( pojo => null != pojo)
.toDF()
save(warehouseReceiptStreamDF, TableMapping.WAREHOUSE_RECEIPT)
val warehouseReceiptDetailStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.WAREHOUSE_RECEIPT_DETAIL)
.map(bean => DataParser.toWarehouseReceiptDetail(bean))
.filter( pojo => null != pojo)
.toDF()
save(warehouseReceiptDetailStreamDF, TableMapping.WAREHOUSE_RECEIPT_DETAIL)
val warehouseTransportToolStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.WAREHOUSE_TRANSPORT_TOOL)
.map(bean => DataParser.toWarehouseTransportTool(bean))
.filter( pojo => null != pojo)
.toDF()
save(warehouseTransportToolStreamDF, TableMapping.WAREHOUSE_TRANSPORT_TOOL)
val warehouseVehicleMapStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.WAREHOUSE_VEHICLE_MAP)
.map(bean => DataParser.toWarehouseVehicleMap(bean))
.filter( pojo => null != pojo)
.toDF()
save(warehouseVehicleMapStreamDF, TableMapping.WAREHOUSE_VEHICLE_MAP)
val waybillStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.WAY_BILL)
.map(bean => DataParser.toWaybill(bean))
.filter( pojo => null != pojo)
.toDF()
save(waybillStreamDF, TableMapping.WAY_BILL)
val waybillStateRecordStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.WAYBILL_STATE_RECORD)
.map(bean => DataParser.toWaybillStateRecord(bean))
.filter( pojo => null != pojo)
.toDF()
save(waybillStateRecordStreamDF, TableMapping.WAYBILL_STATE_RECORD)
val workTimeStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.WORK_TIME)
.map(bean => DataParser.toWorkTime(bean))
.filter( pojo => null != pojo)
.toDF()
save(workTimeStreamDF, TableMapping.WORK_TIME)
val transportRecordStreamDF = oggBeanStreamDS
.filter(bean => bean.getTable == TableMapping.TRANSPORT_RECORD)
.map(bean => DataParser.toTransportRecordBean(bean))
.filter( pojo => null != pojo)
.toDF()
save(transportRecordStreamDF, TableMapping.TRANSPORT_RECORD)
}
  /**
   * ETL conversion and external storage of the CRM (customer relationship management) system business data
   * MessageBean -> POJO -> save
   *
   * @param streamDF streaming DataFrame
   */
  def etlCrm(streamDF: DataFrame): Unit = {
    // convert the DataFrame to a Dataset
    val canalBeanStreamDS: Dataset[CanalMessageBean] = streamDF.as[CanalMessageBean]
    /*
      Take the CRM table crm_address as an example: extract field values from
      CanalMessageBean and wrap them into AddressBean objects
    */
    val addressPojoStreamDS: Dataset[AddressBean] = canalBeanStreamDS
      // step1. get the table name and use it to decide which POJO entity class to build
      //.filter($"table" === TableMapping.ADDRESS)
      .filter(bean => bean.getTable.equals(TableMapping.ADDRESS))
      // step2. extract the row data and the operation type, and set them on the POJO objects
      .flatMap(bean => DataParser.toAddressBean(bean))
      // step3. filter out objects that could not be parsed (drop nulls)
      .filter(pojo => null != pojo)
    save(addressPojoStreamDS.toDF(), TableMapping.ADDRESS)
    // Customer table data
    val customerStreamDS: Dataset[CustomerBean] = canalBeanStreamDS
      .filter(bean => bean.getTable == TableMapping.CUSTOMER)
      .map(bean => DataParser.toCustomer(bean))
      .filter(pojo => null != pojo)
    save(customerStreamDS.toDF(), TableMapping.CUSTOMER)
    // ConsumerAddressMap table data
    val consumerAddressMapStreamDS: Dataset[ConsumerAddressMapBean] = canalBeanStreamDS
      .filter(bean => bean.getTable == TableMapping.CONSUMER_ADDRESS_MAP)
      .map(bean => DataParser.toConsumerAddressMap(bean))
      .filter(pojo => null != pojo)
    save(consumerAddressMapStreamDS.toDF(), TableMapping.CONSUMER_ADDRESS_MAP)
  }
  /*
    Entry point of the real-time Kudu ETL application. Processing steps:
      step1. create the SparkSession instance, passing in SparkConf
      step2. consume data from the Kafka source in real time
      step3. run the ETL conversion on the consumed JSON data
      step4. save the converted data to external storage
      step5. after startup, wait for the application to terminate
  */
  def main(args: Array[String]): Unit = {
    // step1. create the SparkSession instance, passing in SparkConf
    val spark: SparkSession = SparkUtils.createSparkSession(
      SparkUtils.autoSettingEnv(SparkUtils.sparkConf()), this.getClass
    )
    // step2. consume data from the Kafka source in real time
    // logistics system topic data
    val logisticsDF: DataFrame = load(spark, "logistics")
    val crmDF: DataFrame = load(spark, "crm")
    // step3. ETL conversion of the consumed JSON data: JSON -> MessageBean objects
    val etlLogisticsDF: DataFrame = process(logisticsDF, "logistics")
    val etlCrmDF: DataFrame = process(crmDF, "crm")
    // step4. save the converted data to external storage
    //save(etlLogisticsDF, "logistics-console")
    //save(etlCrmDF, "crm-console")
    /*
      TODO: one method per business system, dedicated to converting that
      system's tables into POJOs and saving them to external storage
    */
    etlLogistics(etlLogisticsDF)
    etlCrm(etlCrmDF)
    // step5. after startup, wait for the application to terminate
    spark.streams.active.foreach(query => println(s"Query Starting: ${query.name} ......"))
    spark.streams.awaitAnyTermination()
  }
}
Run the streaming program, perform inserts, updates, or deletes against tables in both the MySQL and Oracle databases, and check whether the data appears on the console.
- MySQL database: update and delete rows in any table
- Oracle database: update and delete rows in any table
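As the listing above makes obvious, every logistics table follows the same filter/map/filter/save pattern. A possible refactor (a sketch, not part of the original code) is a generic helper parameterized by table name and parse function:
// Hypothetical helper to collapse the repeated per-table blocks in etlLogistics.
// Assumes the implicit Encoders from BeanImplicits are in scope
// (org.apache.spark.sql.Encoder must also be imported).
def etlOggTable[T](oggBeanStreamDS: Dataset[OggMessageBean], tableName: String,
                   parse: OggMessageBean => T)(implicit enc: Encoder[T]): Unit = {
  val pojoStreamDF = oggBeanStreamDS
    .filter(bean => bean.getTable == tableName) // select this table's rows
    .map(bean => parse(bean))                   // convert to the POJO type
    .filter(pojo => null != pojo)               // drop failed conversions
    .toDF()
  save(pojoStreamDF, tableName)
}

// usage, replacing one of the blocks above:
// etlOggTable(oggBeanStreamDS, TableMapping.AREAS, DataParser.toAreaBean)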
05 Real-Time ETL Development: Saving to Kudu Tables [save Method]
Task: save the final ETL output (the POJO object data) into Kudu tables by implementing the save method of the KuduStreamApp object.
/**
 * Save the data; this version writes it to a Kudu table
 *
 * @param streamDF DataFrame to save
 * @param tableName name of the target table
 * @param isAutoCreateTable whether to auto-create the table (defaults to creating it)
 */
override def save(streamDF: DataFrame, tableName: String, isAutoCreateTable: Boolean = true): Unit = {
  // 1. if auto-creation is allowed and the table does not exist, create it before saving
  if (isAutoCreateTable) {
    KuduTools.createKuduTable(tableName, streamDF.schema)
  }
  // 2. save the data to the Kudu table
  streamDF.writeStream
    .queryName(s"query-${Configuration.SPARK_KAFKA_FORMAT}-${tableName}")
    .outputMode(OutputMode.Append())
    .format("kudu")
    .option("kudu.master", Configuration.KUDU_RPC_ADDRESS)
    .option("kudu.table", tableName)
    .option("kudu.operation", "upsert")
    .start()
}
Before saving, consider whether the target table exists: if it does not and auto-creation is allowed, create the table first.
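KuduTools.createKuduTable is referenced above but not shown. A minimal sketch using the Kudu Java client is given below; it assumes a single primary-key column named id, hash partitioning into 3 buckets, and one replica, all of which are assumptions rather than the project's actual choices:
import org.apache.kudu.client.{CreateTableOptions, KuduClient}
import org.apache.kudu.{ColumnSchema, Schema, Type}
import org.apache.spark.sql.types._
import scala.collection.JavaConverters._

object KuduTools {
  def createKuduTable(tableName: String, schema: StructType, keys: Seq[String] = Seq("id")): Unit = {
    val client = new KuduClient.KuduClientBuilder(Configuration.KUDU_RPC_ADDRESS).build()
    try {
      if (!client.tableExists(tableName)) {
        // Kudu requires primary-key columns to come first and be non-nullable
        val fields = schema.fields.sortBy(f => !keys.contains(f.name))
        val columns = fields.map { f =>
          new ColumnSchema.ColumnSchemaBuilder(f.name, toKuduType(f.dataType))
            .key(keys.contains(f.name))
            .nullable(!keys.contains(f.name))
            .build()
        }.toList.asJava
        val options = new CreateTableOptions()
          .addHashPartitions(keys.asJava, 3) // 3 hash buckets: an arbitrary choice
          .setNumReplicas(1)                 // single replica: dev-environment assumption
        client.createTable(tableName, new Schema(columns), options)
      }
    } finally {
      client.close()
    }
  }

  // map Spark SQL types to Kudu column types (fallback to STRING)
  private def toKuduType(dt: DataType): Type = dt match {
    case StringType  => Type.STRING
    case IntegerType => Type.INT32
    case LongType    => Type.INT64
    case DoubleType  => Type.DOUBLE
    case BooleanType => Type.BOOL
    case _           => Type.STRING
  }
}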