SparkSql中DataFrame与json、csv、mysql、hive数据的互操作
SparkSql中DataFrame与json、csv、mysql、hive数据的互操作
1.RDD转换成DataFrame
1.1 RDD to DataFrame
RDD转成DataFrame有2种方式,一种是通过隐式转换,一种是通过SparkSession来进行创建。
1.1.1 RDD隐式转换成DataFrame(推荐)
object RDDToDF {
def main(args: Array[String]): Unit = {
// 创建一个SparkSession
val session = SparkSession.builder().master("local[2]").appName("RDDToDF").getOrCreate()
// 通过session获取SparkContext
val sc = session.sparkContext
// 将文件中的数据转换成RDD
val rdd = sc.textFile("file:///D:/workspacescala/sparkdemo/sparksqldemo/people.txt").map(_.split(" ")).map(x => Person(x(0), x(1).toLong))
rdd.foreach(println)
// 导入隐式转换(****重要****)
import session.implicits._
// 将rdd转换成DataFrame
val df = rdd.toDF()
df.show()
// 打印 DataFrame 的 Schema约束信息
df.printSchema()
}
}
// 创建一个 case 类,Person封装RDD中的数据,需要给DataFrame形成一个Schema约束
case class Person(name: String, age: Long)
输出结果:
1.1.2 通过SaprkSession创建的方式
需要通过SchemaType来进行约束
object RDDToDF2 {
def main(args: Array[String]): Unit = {
val session = SparkSession.builder().master("local[2]").appName("RDDToDF2").getOrCreate()
val sc = session.sparkContext
// 这里不使用case class类,使用spark 提供的 Row类
val rdd = sc.textFile("file:///D:/workspacescala/sparkdemo/sparksqldemo/people.txt").map(_.split(" ")).map(x => Row(x(0), x(1).trim))
val schemaString = "name age"
val splits = schemaString.split(" ")
// 约束字段
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
// 约束类型
val schema = StructType(fields)
// 将RDD转换成DataFrame
val df = session.createDataFrame(rdd, schema)
// 显示DataFrame的数据
df.show()
// 打印schema
df.printSchema()
}
}
输出结果:
1.2 DataFrame几个api介绍
show():将DataFrame的数据直接输出
printSchema():打印schema约束信息
select():选择字段
filter():对DataFrame字段进行过滤
groupby():对字段进行分组
count():求数量
DataFrame还有一些其他的api,作用和sql语句类似
eg:
df.select("name").show()
结果:
# 这里$"age"需要导入隐式转换才行
df.select("age").filter($"age" >=28).show()
结果:
df.groupBy("name").count().show()
结果:
2.json文件转DataFrame
2.1 json转DataFrame
// 创建session对象
val session = SparkSession.builder().appName("JsonToDF").master("local[2]").getOrCreate()
// 通过session 读取 json数据,转换成DataFrame对象
val df = session.read.json("file:///D:/workspacescala/sparkdemo/sparksqldemo/person.json")
// 展示数据
df.show()
结果:
2.2 DataFrame创建temp view,使用sql语句查询
object JsonToDF {
def main(args: Array[String]): Unit = {
// 创建session对象
val session = SparkSession.builder().appName("JsonToDF").master("local[2]").getOrCreate()
// 通过session 读取 json数据,转换成DataFrame对象
val df = session.read.json("file:///D:/workspacescala/sparkdemo/sparksqldemo/person.json")
// 将DataFrame的数据,创建成一个临时的"表",表名:person,这个只能在当前的session域中有效
df.createOrReplaceTempView("person")
// 通过sql语句进行查询
session.sql("select * from person").show()
// 创建一个全局的temp“表”,这个可以跨session会话有效
df.createGlobalTempView("person")
session.newSession().sql("select * from global_temp.person where age>=28").show()
session.stop()
}
}
结果:
3.csv文件转DataFrame
3.1 csv文件转DataFrame
val session = SparkSession.builder().appName("CsvToDF").master("local[2]").getOrCreate()
// 通过指定格式,load数据,option:可以给文件格式添加参数,如是否有header。如果没有header,生成的DataFrame会默认生成如:_c1,_c2
//val df = session.read.format("csv").option("header", "true").load("file:///D:/workspacescala/sparkdemo/sparksqldemo/people2.csv")
// df封装的csv方法
val df = session.read.option("header", "true").csv("file:///D:/workspacescala/sparkdemo/sparksqldemo/people2.csv")
df.show()
3.2 保存DataFrame数据到文件
object CsvToDF {
def main(args: Array[String]): Unit = {
val session = SparkSession.builder().appName("CsvToDF").master("local[2]").getOrCreate()
val df = session.read.csv("file:///D:/workspacescala/sparkdemo/sparksqldemo/people.csv")
// 导入隐式转换
import session.implicits._
// 将df的数据保存到本地文件,这里因为csv没有头信息,没有对应的schema
df.filter($"_c1" > 28).write.csv("d:/csvfile")
}
}
csv文件的储存和内容:
4.mysql数据与DataFrame的互操作
4.1 DataFrame将数据写入Mysql
object DFToMysql {
def main(args: Array[String]): Unit = {
val session = SparkSession.builder().appName("DFToMysql").master("local[2]").getOrCreate()
val df = session.read.json("file:///D:/workspacescala/sparkdemo/sparksqldemo/person.json")
df.show()
// df写入mysql的第一种方式
val p = new Properties()
p.put("user", "root")
p.put("password", "root")
// 将df数据写入mysql,这里mode的模式有4种:append:追加,overwrite:覆盖,error , ignore
// overwrite:如果表不存在,会自动创建,覆盖会将之间定义好字段类型,进行修改
df.write.mode("append").jdbc("jdbc:mysql://localhost:3306/db_people", "tb_people", p)
//df.write.mode("overwrite").jdbc("jdbc:mysql://localhost:3306/db_people", "tb_people", p)
// df写入mysql的第二种方式
/*df.write.mode("append")
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/db_people")
.option("dbtable", "tb_people")
.option("user", "root")
.option("password", "root")
.save()*/
}
}
结果:
4.2 Mysql数据导入为DataFrame
object MySqlToDF {
def main(args: Array[String]): Unit = {
val session = SparkSession.builder().master("local[2]").appName("MySqlToDF").getOrCreate()
// jdbc加载的第一种方式
val p = new Properties()
p.put("user", "root")
p.put("password", "root")
// 通过加载jdbc,获取DataFrame数据
val df = session.read.format("jdbc").jdbc("jdbc:mysql://localhost:3306/db_people", "tb_people", properties = p)
df.show()
// 加载jdbc的第2中方式
/*val df2 = session.read.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/db_people")
.option("dbtable", "tb_people")
.option("user", "root")
.option("password", "root")
.load()
df2.show()*/
}
}
5 Spark使用HQL查询hive中的数据
前提:需要 安装Hive
5.1 将hive-site.xml放入resource目录
hive-site.xml内容
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://server01:3306/hive?createDatabaseIfNotExist=true</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>root</value>
<description>password to use against metastore database</description>
</property>
</configuration>
5.2 Spark操作Hive数据
Spark新的api实现,使用SparkSession实现
object HiveToDF {
def main(args: Array[String]): Unit = {
// 新版本
val session = SparkSession.builder().appName("hivetosql").master("local").enableHiveSupport().getOrCreate()
session.sqlContext.sql("use default")
session.sqlContext.sql("select * from student").show()
}
}
使用HiveContext实现
object HiveToDF {
def main(args: Array[String]): Unit = {
// 老版本
val conf = new SparkConf().setAppName("HiveToDF").setMaster("local")
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)
hiveContext.sql("use default")
hiveContext.sql("select * from student").show()
}
}
补充:spark-shell直接操作Hive数据
将hive-site.xml文件放入$SPARK_HOME/conf目录下
启动spark-shell时指定mysql连接驱动位置
spark-shell \
--master spark://server01:7077 \
--executor-memory 1g \
--total-executor-cores 2 \
--driver-class-path /hadoop/hive/lib/mysql-connector-java-5.1.35-bin.jar
- 操作hive数据
# spark是SparkSession对象
spark.sqlContext.sql("select * from student").show()
或者
import org.apache.spark.sql.hive.HiveContext
val hiveContext = new HiveContext(sc)
hiveContext.sql("select * from student").show()
补充2:Spark-sql直接操作Hive
将hive-site.xml文件放入$SPARK_HOME/conf目录下
启动spark-shell时指定mysql连接驱动位置
spark-sql \
--master spark://server01:7077 \
--executor-memory 1g \
--total-executor-cores 2 \
--driver-class-path /hadoop/hive/lib/mysql-connector-java-5.1.35-bin.jar
3.操作hive数据
show tables;
select * from person;
更多推荐
所有评论(0)