SparkSql中DataFrame与json、csv、mysql、hive数据的互操作

1.RDD转换成DataFrame

1.1 RDD to DataFrame

RDD转成DataFrame有2种方式,一种是通过隐式转换,一种是通过SparkSession来进行创建。

1.1.1 RDD隐式转换成DataFrame(推荐)

object RDDToDF {
  def main(args: Array[String]): Unit = {
    // 创建一个SparkSession
    val session = SparkSession.builder().master("local[2]").appName("RDDToDF").getOrCreate()
    // 通过session获取SparkContext
    val sc = session.sparkContext

    // 将文件中的数据转换成RDD
    val rdd = sc.textFile("file:///D:/workspacescala/sparkdemo/sparksqldemo/people.txt").map(_.split(" ")).map(x => Person(x(0), x(1).toLong))
    rdd.foreach(println)

    // 导入隐式转换(****重要****)
    import session.implicits._

    // 将rdd转换成DataFrame
    val df = rdd.toDF()
    df.show()

    // 打印 DataFrame 的 Schema约束信息
    df.printSchema()
  }
}

// 创建一个 case 类,Person封装RDD中的数据,需要给DataFrame形成一个Schema约束
case class Person(name: String, age: Long)

输出结果:

image

image

1.1.2 通过SaprkSession创建的方式

需要通过SchemaType来进行约束

object RDDToDF2 {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local[2]").appName("RDDToDF2").getOrCreate()
    val sc = session.sparkContext

    // 这里不使用case class类,使用spark 提供的 Row类
    val rdd = sc.textFile("file:///D:/workspacescala/sparkdemo/sparksqldemo/people.txt").map(_.split(" ")).map(x => Row(x(0), x(1).trim))

    val schemaString = "name age"
    val splits = schemaString.split(" ")
    // 约束字段
    val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))

    // 约束类型
    val schema = StructType(fields)

    // 将RDD转换成DataFrame
    val df = session.createDataFrame(rdd, schema)
    // 显示DataFrame的数据
    df.show()

    // 打印schema
    df.printSchema()
  }
}

输出结果:

image

1.2 DataFrame几个api介绍

  • show():将DataFrame的数据直接输出

  • printSchema():打印schema约束信息

  • select():选择字段

  • filter():对DataFrame字段进行过滤

  • groupby():对字段进行分组

  • count():求数量

DataFrame还有一些其他的api,作用和sql语句类似

eg:

df.select("name").show()

结果:

image

# 这里$"age"需要导入隐式转换才行
df.select("age").filter($"age" >=28).show()

结果:

image

df.groupBy("name").count().show()

结果:

image


2.json文件转DataFrame

2.1 json转DataFrame

// 创建session对象
val session = SparkSession.builder().appName("JsonToDF").master("local[2]").getOrCreate()
// 通过session 读取 json数据,转换成DataFrame对象
val df = session.read.json("file:///D:/workspacescala/sparkdemo/sparksqldemo/person.json")
// 展示数据
df.show()

结果:

image

2.2 DataFrame创建temp view,使用sql语句查询

object JsonToDF {
  def main(args: Array[String]): Unit = {
    // 创建session对象
    val session = SparkSession.builder().appName("JsonToDF").master("local[2]").getOrCreate()
    // 通过session 读取 json数据,转换成DataFrame对象
    val df = session.read.json("file:///D:/workspacescala/sparkdemo/sparksqldemo/person.json")

    // 将DataFrame的数据,创建成一个临时的"表",表名:person,这个只能在当前的session域中有效
    df.createOrReplaceTempView("person")
    // 通过sql语句进行查询
    session.sql("select * from person").show()

    // 创建一个全局的temp“表”,这个可以跨session会话有效
    df.createGlobalTempView("person")
    session.newSession().sql("select * from global_temp.person where age>=28").show()

    session.stop()
  }
}

结果:

image

image


3.csv文件转DataFrame

3.1 csv文件转DataFrame

val session = SparkSession.builder().appName("CsvToDF").master("local[2]").getOrCreate()

// 通过指定格式,load数据,option:可以给文件格式添加参数,如是否有header。如果没有header,生成的DataFrame会默认生成如:_c1,_c2
//val df = session.read.format("csv").option("header", "true").load("file:///D:/workspacescala/sparkdemo/sparksqldemo/people2.csv")

// df封装的csv方法
val df = session.read.option("header", "true").csv("file:///D:/workspacescala/sparkdemo/sparksqldemo/people2.csv")

df.show()

image

3.2 保存DataFrame数据到文件

object CsvToDF {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("CsvToDF").master("local[2]").getOrCreate()
    val df = session.read.csv("file:///D:/workspacescala/sparkdemo/sparksqldemo/people.csv")

    // 导入隐式转换
    import session.implicits._
    // 将df的数据保存到本地文件,这里因为csv没有头信息,没有对应的schema
    df.filter($"_c1" > 28).write.csv("d:/csvfile")
  }
}

csv文件的储存和内容:

image

image


4.mysql数据与DataFrame的互操作

4.1 DataFrame将数据写入Mysql

object DFToMysql {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("DFToMysql").master("local[2]").getOrCreate()
    val df = session.read.json("file:///D:/workspacescala/sparkdemo/sparksqldemo/person.json")
    df.show()

    // df写入mysql的第一种方式
    val p = new Properties()
    p.put("user", "root")
    p.put("password", "root")
    // 将df数据写入mysql,这里mode的模式有4种:append:追加,overwrite:覆盖,error , ignore
    // overwrite:如果表不存在,会自动创建,覆盖会将之间定义好字段类型,进行修改
    df.write.mode("append").jdbc("jdbc:mysql://localhost:3306/db_people", "tb_people", p)
    //df.write.mode("overwrite").jdbc("jdbc:mysql://localhost:3306/db_people", "tb_people", p)


    // df写入mysql的第二种方式
    /*df.write.mode("append")
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/db_people")
      .option("dbtable", "tb_people")
      .option("user", "root")
      .option("password", "root")
      .save()*/
  }
}

结果:

image

4.2 Mysql数据导入为DataFrame

object MySqlToDF {

  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local[2]").appName("MySqlToDF").getOrCreate()

    // jdbc加载的第一种方式
    val p = new Properties()
    p.put("user", "root")
    p.put("password", "root")
    // 通过加载jdbc,获取DataFrame数据
    val df = session.read.format("jdbc").jdbc("jdbc:mysql://localhost:3306/db_people", "tb_people", properties = p)
    df.show()

    // 加载jdbc的第2中方式
    /*val df2 = session.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/db_people")
      .option("dbtable", "tb_people")
      .option("user", "root")
      .option("password", "root")
      .load()

    df2.show()*/
  }
}

image

5 Spark使用HQL查询hive中的数据

前提:需要 安装Hive

5.1 将hive-site.xml放入resource目录

image

hive-site.xml内容

<configuration>
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://server01:3306/hive?createDatabaseIfNotExist=true</value>
        <description>JDBC connect string for a JDBC metastore</description>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
        <description>Driver class name for a JDBC metastore</description>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
        <description>username to use against metastore database</description>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>root</value>
        <description>password to use against metastore database</description>
    </property>
</configuration>

5.2 Spark操作Hive数据

Spark新的api实现,使用SparkSession实现

object HiveToDF {
  def main(args: Array[String]): Unit = {
    // 新版本
    val session = SparkSession.builder().appName("hivetosql").master("local").enableHiveSupport().getOrCreate()
    session.sqlContext.sql("use default")
    session.sqlContext.sql("select * from student").show()
  }
}

使用HiveContext实现

object HiveToDF {
  def main(args: Array[String]): Unit = {
    // 老版本
    val conf = new SparkConf().setAppName("HiveToDF").setMaster("local")
    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)
    hiveContext.sql("use default")
    hiveContext.sql("select * from student").show()
  }
}

image

补充:spark-shell直接操作Hive数据
  1. 将hive-site.xml文件放入$SPARK_HOME/conf目录下

  2. 启动spark-shell时指定mysql连接驱动位置

spark-shell \
--master spark://server01:7077 \
--executor-memory 1g \
--total-executor-cores 2 \
--driver-class-path /hadoop/hive/lib/mysql-connector-java-5.1.35-bin.jar

image

  1. 操作hive数据
# spark是SparkSession对象
spark.sqlContext.sql("select * from student").show()

image

或者

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

hiveContext.sql("select * from student").show()

image

补充2:Spark-sql直接操作Hive
  1. 将hive-site.xml文件放入$SPARK_HOME/conf目录下

  2. 启动spark-shell时指定mysql连接驱动位置

spark-sql \
--master spark://server01:7077 \
--executor-memory 1g \
--total-executor-cores 2 \
--driver-class-path /hadoop/hive/lib/mysql-connector-java-5.1.35-bin.jar

image

3.操作hive数据

show tables;

image

select * from person;

image

GitHub 加速计划 / js / json
41.72 K
6.61 K
下载
适用于现代 C++ 的 JSON。
最近提交(Master分支:1 个月前 )
960b763e 4 个月前
8c391e04 7 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐