版权声明:本文为博主原创文章,未经博主允许不得转载!!

欢迎访问:https://blog.csdn.net/qq_21439395/article/details/80710180

交流QQ: 824203453

欢迎关注B站,收看更多视频内容:https://space.bilibili.com/383891492 

 

SparkSql 版本为 2.2.0

sparksql解析json格式的数据源

首先,获取操作sparkSql的SparkSession操作实例:

val session = SparkSession.builder()
  .master("local[*]")
  .appName(this.getClass.getSimpleName)
  .getOrCreate()

// 导入隐式转换和functions
import session.implicits._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

 

 

1.1.  根据json数据,创建Dataset

指定嵌套json格式的数据:

val opds = session.createDataset(
  // 三引号中,编写json字符串
  List("""{"name":"xx","address":{"city":"bj"}}""")
)
val otherPeople = session.read.json(opds)
otherPeople.printSchema()

schema如下:

1.2.  读取普通json文件

json数据格式为:

val json1: DataFrame =session.read.json("jsonlog1.json")

json1.printSchema()

获取schema为:

1.3.  读取嵌套json文件

数据格式为:

val json: DataFrame = session.read.json("jsonlog2.json")
json.printSchema()

schema信息如下:

操作嵌套json的方式:

//DSL 语法的查询
json.select("address.province").show()

// 使用sql语法查询
json.createTempView("v_tmp")
session.sql("select address.city from  v_tmp").show()

 

1.4.  操作嵌套json数组-explode函数

数据格式为:

 

读取json数组的数据:

val json3 = session.read.json("jsonlog3array.json")
json3.printSchema()
json3.show()

schema信息为:

示例数据为:


这种结果的展示数据,查询非常不方便。

 

解决方案:

利用explode函数,把数组数据进行展开。

// 导入sparksql中的函数
import org.apache.spark.sql.functions._
// 利用explode函数  把json数组进行展开, 数组中的每一条数据,都是一条记录
val explodeDF = json3.select($"name", explode($"myScore")).toDF("name", "score")

explodeDF.printSchema()

// 再次进行查询  类似于普通的数据库表  默认schema: score1, 可以通过as 指定schema名称
val json3Res: DataFrame = explodeDF.select($"name", $"score.score1",
  $"score.score2" as "score222")
// 创建临时视图
json3Res.createTempView("v_jsonArray")
// 写sql,分别求平均值
session.sql("select name,avg(score1),avg(score222) from v_jsonArray group by name")
  .show() 

explodeDFschema信息为:

最终,查询结果为:

 

1.5.  get_json_object() 方法

get_json_object() 方法 从一个json 字符串中根据指定的json路径抽取一个json 对象

 

根据指定数据,获取一个DataFrame

val json4 = Seq(
  (0, """{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cn": "United States"}"""))
  .toDF("id", "json")
json4.printSchema()

schema信息为:

 

使用get_json_object 从json字符串中提取列:

// 利用get_json_object 从 json字符串中,提取列
val jsDF = json4.select($"id",
  get_json_object($"json", "$.device_type").alias("device_type"),
  get_json_object($"json", "$.ip").alias("ip"),
  get_json_object($"json", "$.cn").alias("cn"))
jsDF.printSchema()

schema信息为:

 

更多复杂操作:可参考:https://cloud.tencent.com/developer/article/1032532

 

版权声明:本文为博主原创文章,未经博主允许不得转载!!

欢迎访问:https://blog.csdn.net/qq_21439395/article/details/80710180

交流QQ: 824203453

欢迎关注B站,收看更多视频内容:https://space.bilibili.com/383891492 

GitHub 加速计划 / js / json
18
5
下载
适用于现代 C++ 的 JSON。
最近提交(Master分支:2 个月前 )
960b763e 5 个月前
8c391e04 8 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐