[2.1] Spark DataFrame Operations (1): Reading and Filtering a JSON File
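Scenario
DataFrame overview
Four stages in the evolution of data storage and processing in the IT industry
1. Code + file system
2. J2EE + DB (problem: the database cannot perform distributed computation)
3. Hive
4. SparkSQL + Hive => SparkSQL + Hive + DataFrame -> SparkSQL + Hive + DataFrame + DataSet (DataSet is still experimental as of Spark 1.6)
Hive + SparkSQL + DataFrame: the golden combination
1. Hive: inexpensive data warehouse storage
2. SparkSQL: high-speed computation
3. DataFrame: complex data mining
DataFrame basic operations
Read the people.json file that ships with Spark and run some basic SQL-style operations on it. The original contents of the file are: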
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
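The first record has no age field, so once the three records are viewed as one table, Michael's age has to be empty. The sketch below is a plain-Java model of that idea, not Spark code. The class PeopleSchemaSketch and its methods are hypothetical names, and using the sorted union of field names as the column list is an assumption made for illustration (it agrees with the age, name column order in the execution result shown in this post).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Plain-Java model (no Spark involved) of how three records with different
// fields can be viewed as one table. All names here are hypothetical.
class PeopleSchemaSketch {

    // The three records of people.json, written out by hand.
    static List<Map<String, Object>> people() {
        Map<String, Object> michael = new LinkedHashMap<>();
        michael.put("name", "Michael");            // this record has no "age"

        Map<String, Object> andy = new LinkedHashMap<>();
        andy.put("name", "Andy");
        andy.put("age", 30);

        Map<String, Object> justin = new LinkedHashMap<>();
        justin.put("name", "Justin");
        justin.put("age", 19);

        return Arrays.asList(michael, andy, justin);
    }

    // Column list = union of the field names seen in any record, sorted by name.
    static List<String> inferColumns(List<Map<String, Object>> records) {
        TreeSet<String> names = new TreeSet<>();
        for (Map<String, Object> record : records) {
            names.addAll(record.keySet());
        }
        return new ArrayList<>(names);
    }

    // One table row per record; a field the record lacks becomes null.
    static List<List<Object>> toRows(List<Map<String, Object>> records, List<String> columns) {
        List<List<Object>> rows = new ArrayList<>();
        for (Map<String, Object> record : records) {
            List<Object> row = new ArrayList<>();
            for (String column : columns) {
                row.add(record.get(column));   // Map.get returns null for a missing key
            }
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> columns = inferColumns(people());
        System.out.println(columns);
        for (List<Object> row : toRows(people(), columns)) {
            System.out.println(row);
        }
    }
}
```

Running main prints the column list followed by one row per record, with null in the age position of the Michael row.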
Experiment
Java version
package cool.pengych.spark.sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

// Written against the Spark 1.6 API; from Spark 2.0 on, the Java DataFrame
// class is replaced by Dataset<Row>.
public class DataFrameOps {
    public static void main(String[] args) {
        /*
         * 1. Create the SQLContext
         */
        SparkConf conf = new SparkConf().setAppName("My Frame Ops").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sqlc = new SQLContext(jsc);

        /*
         * 2. Create the DataFrame
         */
        DataFrame df = sqlc.read().json("file:///home/hadoop/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json");

        /*
         * 3. SQL-style operations
         */
        // select * from table
        df.show();
        // desc table
        df.printSchema();
        // select name from table
        df.select("name").show();
        // select name, age + 10 from table
        df.select(df.col("name"), df.col("age").plus(10)).show();
        // select * from table where age > 10
        df.filter(df.col("age").gt(10)).show();
        // select age, count(*) from table group by age
        df.groupBy(df.col("age")).count().show();
    }
}
Scala version
package main.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object DataFrameOps {
  def main(args: Array[String]): Unit = {
    // 1. Create the SQLContext
    val conf = new SparkConf().setMaster("local[*]").setAppName("DataFrame Ops")
    val sqlContext = new SQLContext(new SparkContext(conf))

    // 2. Create the DataFrame
    val df = sqlContext.read.json("file:///home/hadoop/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json")

    // 3. SQL-style operations
    df.show                                   // select * from table
    df.printSchema                            // desc table
    df.select("name").show                    // select name from table
    df.select("name", "age").show             // select name, age from table
    df.select(df("name"), df("age") + 10).show // select name, age + 10 from table
    df.filter(df("age") > 10).show            // select * from table where age > 10
  }
}
Execution result
Result of the statement df.filter(df("age")>10).show:
16/05/25 22:27:44 INFO DAGScheduler: Job 10 finished: show at DataFrameOps.scala:18, took 0.037776 s
+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+
16/05/25 22:27:44 INFO SparkContext: Invoking stop() from shutdown hook
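Michael does not appear in this result because his record has no age: comparing a missing value with 10 does not yield true, so the row is not kept. The following is a plain-Java model of that behaviour, not Spark code. The class AgeFilterSketch, its Person type and its method names are hypothetical, and representing a missing age as a null Integer is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (no Spark involved) of a filter on a nullable column.
// Class and method names are hypothetical.
class AgeFilterSketch {

    static final class Person {
        final String name;
        final Integer age;   // null when the JSON record has no "age"

        Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        }
    }

    static List<Person> people() {
        List<Person> list = new ArrayList<>();
        list.add(new Person("Michael", null));
        list.add(new Person("Andy", 30));
        list.add(new Person("Justin", 19));
        return list;
    }

    // SQL-style comparison: the result is unknown (null) when age is null.
    static Boolean greaterThan(Integer age, int threshold) {
        if (age == null) {
            return null;
        }
        return age > threshold;
    }

    // Keep a row only when the predicate is definitely true.
    static List<String> namesWithAgeAbove(List<Person> rows, int threshold) {
        List<String> kept = new ArrayList<>();
        for (Person p : rows) {
            if (Boolean.TRUE.equals(greaterThan(p.age, threshold))) {
                kept.add(p.name);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        System.out.println(namesWithAgeAbove(people(), 10));   // prints [Andy, Justin]
    }
}
```

Keeping only the rows whose predicate is definitely true is what makes the model agree with the table above, where only Andy and Justin remain.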
Summary
“A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood”
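1. A DataFrame is column-based and carries metadata (the schema) describing every record. In other words, DataFrame optimizations work inside the columns, using that schema, whereas an RDD is handled row by row.
2. In form, what most distinguishes a Spark DataFrame from a data frame in R or Python is that it is distributed by nature; you can simply think of a DataFrame in Spark as a distributed table.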
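To make the "named columns plus metadata" idea in point 1 concrete, here is a small conceptual sketch in plain Java. It is not how Spark stores data internally and it is not distributed. The class NamedColumnsSketch and its methods are hypothetical, and holding each column as its own list is an assumption chosen only to show that a projection such as select("name") can be answered from the column names alone.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Conceptual model only: a table held as named columns plus column metadata.
// This is not Spark's internal storage format; all names are hypothetical.
class NamedColumnsSketch {
    final List<String> names;          // metadata: the column names
    final List<List<Object>> values;   // values.get(i) holds every value of column i

    NamedColumnsSketch(List<String> names, List<List<Object>> values) {
        this.names = names;
        this.values = values;
    }

    // The people.json data laid out column by column.
    static NamedColumnsSketch people() {
        List<Object> ages = Arrays.<Object>asList(null, 30, 19);
        List<Object> personNames = Arrays.<Object>asList("Michael", "Andy", "Justin");
        return new NamedColumnsSketch(Arrays.asList("age", "name"),
                                      Arrays.asList(ages, personNames));
    }

    // Look a column up by its name.
    List<Object> column(String name) {
        int i = names.indexOf(name);
        if (i < 0) {
            throw new IllegalArgumentException("no such column: " + name);
        }
        return values.get(i);
    }

    // Projection: build a new table that contains only the requested columns.
    NamedColumnsSketch select(String... wanted) {
        List<List<Object>> picked = new ArrayList<>();
        for (String w : wanted) {
            picked.add(column(w));
        }
        return new NamedColumnsSketch(Arrays.asList(wanted), picked);
    }

    public static void main(String[] args) {
        NamedColumnsSketch onlyNames = people().select("name");
        System.out.println(onlyNames.names + " " + onlyNames.values);
    }
}
```

Because the column names travel with the data, the projection never has to inspect the age values, which is the kind of information a row-by-row collection of opaque objects does not expose.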