介绍

  • Spark是Apache基金会旗下的顶级开源项目,用于对海量数据进行大规模分布式计算
  • PySparkSparkPython实现,是Spark为Python开发者提供的编程入口,用于以Python代码完成Spark任务的开发
  • PySpark不仅可以作为Python第三方库使用,也可以将程序提交的Spark集群环境中,调度大规模集群进行执行。

例子1:简单使用

from pyspark import SparkConf, SparkContext

# 创建SparkConf对象
# setMaster 是设置模式
# 链式调用
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")

# 基于 SparkConf 类对象创建 SparkContext 对象
sc = SparkContext(conf = conf)

# 打印出PySpark的当前版本
print(sc.version)

# 停止 SparkContext 对象工作(停止PySpark程序)
sc.stop()

PySpark编程模型

SparkContext类对象是PySpark编程中一切功能的入口:

PySpark编程主要分为如下三大步骤:

在这里插入图片描述

在这里插入图片描述

RDD对象

PySpark支持多种数据的输入,在输入完成后,都会得到一个:RDD类的对象

RDD全称为:弹性分布式数据集(Resilient Distributed Datasets)

PySpark针对数据的处理,都是以RDD对象作为载体,即:

数据存储在RDD内

各类数据的计算方法,也都是RDD的成员方法

RDD的数据计算方法,返回值依旧是RDD对象

例子:Python对象转换为RDD对象

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 通过 parallelize 方法将Python 对象加载到Spark内,成为 RDD 对象
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize((1, 2, 3, 4, 5))
rdd3 = sc.parallelize("abcdefg")
rdd4 = sc.parallelize({1, 2, 3, 4, 5}) # 集合
rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})

print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())

print("----------------------------------")
# 文件内容转换为 RDD 对象
rdd = sc.textFile("./hello.txt")
print(rdd.collect())

sc.stop()

输出结果:

[1, 2, 3, 4, 5]

[1, 2, 3, 4, 5]

[‘a’, ‘b’, ‘c’, ‘d’, ‘e’, ‘f’, ‘g’]

[1, 2, 3, 4, 5]

[‘key1’, ‘key2’]

----------------------------------

[‘itheima itheima itcast itheima’, ‘spark python spark python itheima’, ‘itheima itcast itcast itheima python’, ‘python python spark pyspark pyspark’, ‘itheima python pyspark itcast spark’]

可以看出字典对象转换后只保留key,字符串会被拆成字符列表

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐