PySpark综合案例
# Spark是什么
定义:Apache Spark是用于大规模数据(large-scala data) 处理的统一(unified)分析引擎。
简单来说,Spark是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的海量数据。
Spark作为全球顶级的分布式计算框架,支持众多的编程语言进行开发。而Python语言,则是Spark重点支持的方向。
# PySpark
Spark对Python语言的支持,重点体现在,Python第三方库:PySpark之上。
PySpark是由Spark官方开发的Python语言第三方库。
Python开发者可以使用pip程序快速的安装PySpark并像其它第三方库那样直接使用。
# 基础准备
# PySpark库的安装
同其它的第三方库一样,PySpark同样可以使用pip程序进行安装。
在“CMD”命令提示符程序内,输入:
pip install pyspark
或者使用国内代理镜像网站(清华大学源)
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark
# 构建PySpark执行环境入口对象
想要使用PySpark库完成数据库处理,首先需要构建一个执行环境入口对象。
PySpark的执行环境入口对象是:类 SparkContext 的类对象。
# 导包
from pyspark import SparkConf, SparkContext
# 创建SparkConf对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)
# 打印PySpark的运行版本
print(sc.version)
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()
2
3
4
5
6
7
8
9
10
11
12
13
14
注意
第一次运行报错了
原因是我没有安装java的jdk:
- 安装Java:如果未安装,请前往Oracle官网下载并安装Java,我装的17。
- 设置JAVA_HOME环境变量:打开“系统属性”,选择“高级”选项卡,点击“环境变量”。在系统变量中点击“新建”,变量名输入JAVA_HOME,变量值输入Java安装目录的路径,不用带bin。
- 在path中添加 %JAVA_HOME%\bin
- 然后配置JAVA_HOME
再次运行就不报错了。
# PySpark的编程模型
SparkContext类对象,是PySpark编程中一切功能的入口。
PySpark的编程,主要分为如下三大步骤:
数据输入
通过SparkContext类对象的成员方法,完成数据的读取操作,读取后得到RDD类对象。数据处理计算
通过RDD类对象的成员方法,完成各种数据计算的需求。数据输出
将处理完成后的RDD对象,调用各种成员方法完成,写出文件、转换为list等操作。
# 数据输入
# RDD对象
PySpark支持多种数据的输入,在输入完成后,都会得到一个:RDD类的对象。
RDD全程为:弹性分布式数据集(Resilient Distributed Datasets)
PySpark针对数据的处理,都是以RDD对象作为载体,即:
- 数据存储在RDD内
- 各类数据的计算方法,也都是RDD的成员方法
- RDD的数据计算方法,返回值依旧是RDD对象
# Python数据容器转RDD对象
PySpark支持通过SparkContext对象的parallelize成员方法,将:
- list
- tuple
- set
- dict
- str
转换为PySpark的RDD对象
注意
- 字符串会被拆分出1个个的字符,存入RDD对象
- 字典仅有key会被存入RDD对象
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 通过parallelize方法将Python对象加载到Spark内,成为RDD对象
rdd1 = sc.parallelize([1,2,3,4,5])
rdd2 = sc.parallelize((1,2,3,4,5))
rdd3 = sc.parallelize("abcdefg")
rdd4 = sc.parallelize({1,2,3,4,5})
rdd5 = sc.parallelize({"key1":"value1", "key2":"value2"})
# 如果要查看RDD里面有什么内容,需要用collect()方法
print(rdd1.collect()) # [1, 2, 3, 4, 5]
print(rdd2.collect()) # [1, 2, 3, 4, 5]
print(rdd3.collect()) # ['a', 'b', 'c', 'd', 'e', 'f', 'g']
print(rdd4.collect()) # [1, 2, 3, 4, 5]
print(rdd5.collect()) # ['key1', 'key2']
sc.stop()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 读取文件转RDD对象
PySpark也支持通过SparkContext入口对象,来读取文件,来构建出RDD对象。
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 用textFile方法,读取文件数据加载到Spark内,成为RDD对象
rdd = sc.textFile("./hello.txt")
print(rdd.collect()) # ['itheima', 'nihao', 'daodao', 'youyou', 'haha']
sc.stop()
2
3
4
5
6
7
8
9
10
11
# 数据计算
# map方法
PySpark的数据计算,都是基于RDD对象来进行的,那么如何进行呢?
自然是依赖RDD对象内置丰富的:成员方法(算子)
# map算子
功能:map算子,是将RDD的数据,一条条处理(处理的逻辑基于map算子中接收的处理函数),返回新的RDD
语法:
rdd.map(func)
# func: f:(T) -> U
# f: 表示这是一个函数(方法)
# (T) -> U 表示的是方法的定义:
# ()表示传入参数,(T)表示传入1个参数,()表示没有传入参数
# T是泛型的代称,在这里表示 任意类型
# U也是泛型的代称,在这里表示 任意类型
# -> U 表示返回值
# (T) -> U 总结起来的意思是:这是一个方法,这个方法接受一个参数传入,传入参数类型不限,返回一个返回值,返回值类型不限
# (A) -> A 总结起来的意思是:这事一个方法,这个方法接受一个参数传入,传入参数类型不限,返回一个返回值,返回值和传入参数类型一致。
2
3
4
5
6
7
8
9
10
from pyspark import SparkConf, SparkContext
# 这样设置才不会报错,spark才能找到python解释权的位置
import os
os.environ['PYSPARK_PYTHON'] = "D:/python/python3.10.0/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 准备一个RDD
rdd = sc.parallelize([1,2,3,4,5])
# 通过map方法将全部数据都乘以10
def func(data):
return data * 10
rdd2 = rdd.map(func)
print(rdd2.collect())
print(rdd.collect())
# [10, 20, 30, 40, 50]
# [1, 2, 3, 4, 5]
sc.stop()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
注意
一开始一直报错,后来把python版本改成了3.10.0就可以了
# 链式调用
对于返回值是新RDD的算子,可以通过链式调用的方式多次调用算子
# flatMap算子
功能:对rdd执行map操作,然后进行解除嵌套操作
解除嵌套:
# 嵌套的list
lst = [[1,2,3],[4,5,6],[7,8,9]]
# 如果解除了嵌套
lst = [1,2,3,4,5,6,7,8,9]
2
3
4
演示代码:
# coding:utf8
# 演示rdd的flatMap算子
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
# 0. 构建Spark执行环境
conf = SparkConf().setAppName("create rdd").setMaster("local[*]")
sc = SparkContext(conf = conf)
rdd = sc.parallelize(["a b c","a c e","e c a"])
# 按照空格切分数据后,解除嵌套
print(rdd.flatMap(lambda x:x.split(" ")).collect())
2
3
4
5
6
7
8
9
10
11
12
13
# reduceByKey算子
功能: 针对KV型RDD(二元元组),自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作。
用法:
rdd.reduceByKey(func)
# func:(V, V) -> V
# 接受2个传入参数(类型要一致),返回一个返回值,类型和传入要求一致。
2
3
代码:
rdd = sc.parallelize[('a',1),('a',1),('b',1),('b',1),('b',1)]
result = rdd.reduceByKey(lambda a,b: a+b)
print(result.collect())
# [('b',3),('a',2)]
2
3
4
注意
reduceByKey中接收的函数,只负责聚合,不理会分组,分组是自动 by key 来分组的
# Filter
功能:过滤想要的数据进行保留
语法:
rdd.filter(func)
# func:(T) -> bool 传入1个参数进来随意类型,返回值必须是True or False
2
注意
返回值是True的数据被保留,False的数据被丢弃
示例代码:
# coding:utf8
# 演示rdd的filter算子
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("create rdd").setMaster("local[*]")
sc = SparkContext(conf = conf)
rdd = sc.parallelize([1,2,3,4,5])
# 保留奇数
print(rdd.filter(lambda x: x % 2 == 1).collect())
# [2, 4]
2
3
4
5
6
7
8
9
10
11
12
# distinct算子
功能:对RDD数据进行去重,返回新RDD
语法: rdd.distinct() 无需传参
演示代码:
# coding:utf8
# 演示rdd的distinct算子
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("create rdd").setMaster("local[*]")
sc = SparkContext(conf = conf)
rdd = sc.parallelize([1,1,3,3,4,4,6,6,9,9,9])
print(rdd.distinct().collect())
# [1, 9, 3, 4, 6]
2
3
4
5
6
7
8
9
10
11
# sortBy算子
功能:对RDD数据进行排序,基于指定的排序依据
语法:
rdd.sortBy(func, ascending=False, numPartitions=1)
# func:(T) -> U: 告知按照rdd中的哪个数据进行排序,比如 lambda x: x[1] 表示按照rdd中的第二列元素进行排序
# ascending True升序, False降序
# numPartitions: 用多少分区排序
2
3
4
"""
完成练习案例:单词计数统计
"""
# 1.构建执行环境入口对象
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/python/python3.10.0/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("count_app")
sc = SparkContext(conf=conf)
# 2.读取数据文件
rdd = sc.textFile("./hello.txt")
# 3.取出全部单词
rdd1 = rdd.flatMap(lambda x: x.split(" "))
# 4.将所有单词都转换成二元元组,单词为key,value设置为1
rdd2 = rdd1.map(lambda x: (x, 1))
# 5.分组并求和
rdd3 = rdd2.reduceByKey(lambda a, b: a + b)
# 6.对结果进行排序
result = rdd3.sortBy(lambda x:x[1], ascending=False, numPartitions=1)
# 7.打印并输出结果
print(result.collect())
# [('itheima', 8), ('python', 6), ('spark', 4), ('itcast', 3), ('pyspark', 3)]
sc.stop()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 数据输出
数据输入:
- sc.parallelize
- sc.textFile
数据计算: 返回值还是rdd的方法
- rdd.map
- rdd.flatMap
- rdd.reduceByKey
- rdd.filter
- rdd.distinct
- rdd.sortBy
数据输出: 输出Python对象或文件,返回值不是rdd的方法
- collect
- reduce
- take
- count
# collect算子
功能:将RDD各个分区的数据,统一收集到Driver中,形成一个List对象
用法: rdd.collect() 返回值是一个list
# reduce算子
功能:对RDD数据集按照你传入的逻辑进行聚合
语法:
rdd.reduce(func)
# func: (T, T) -> T
# 2参数传入,1个返回值,返回值和参数要求类型一致
2
3
执行流程图:
代码:
rdd = sc.parallelize(range(1,10))
# 将rdd的数据进行累加求和
print(rdd.reduce(lambda a,b: a + b))
2
3
# take算子
功能:取RDD的前N个元素,组合成list返回给你
sc.parallelize([3,2,1,4,5,6]).take(5)
# [3,2,1,4,5]
2
# count算子
功能:计算RDD有多少条数据,返回值是一个数字
sc.parallelize([3,2,1,4,5,6]).count()
# 6
2
3
# saveAsTextFile算子
功能:将RDD的数据写入文本文件中
- 输出的结果是一个文件夹
- 有几个分区就输出多少个结果文件
支持本地写出,hdfs等文件系统
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/python/python3.10.0/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("count_app")
sc = SparkContext(conf=conf)
# 准备RDD1
rdd1 = sc.parallelize([1,2,3,4,5])
# 准备RDD2
rdd2 = sc.parallelize([("Hello", 3),("Spark", 5),("Hi", 7)])
# 准备RDD3
rdd3 = sc.parallelize([[1,3,5],[6,7,9],[11,13,11]])
# 输出到文件中
rdd1.saveAsTextFile("./output1")
rdd2.saveAsTextFile("./output2")
rdd3.saveAsTextFile("./output3")
sc.stop()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
注意事项
调用保存文件的算子,需要配置Hadoop依赖
- 下载Hadoop安装包
- 解压到电脑任意位置
- 在Python代码中使用os模块配置:os.environ['HADOOP_HOME'] = 'HADOOP解压文件夹路径'
- 下载winutils.exe,并放入Hadoop解压文件夹的bin目录内
- 下载hadoop.dll,并放入 C:/Windows/System32 文件夹内
# 构建执行环境入口对象
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/python/python3.10.0/python.exe"
os.environ['HADOOP_HOME'] = "D:/python/hadoop3.0.0"
conf = SparkConf().setMaster("local[*]").setAppName("count_app")
sc = SparkContext(conf=conf)
# 准备RDD1
rdd1 = sc.parallelize([1,2,3,4,5])
# 准备RDD2
rdd2 = sc.parallelize([("Hello", 3),("Spark", 5),("Hi", 7)])
# 准备RDD3
rdd3 = sc.parallelize([[1,3,5],[6,7,9],[11,13,11]])
# 输出到文件中
rdd1.saveAsTextFile("./output1")
rdd2.saveAsTextFile("./output2")
rdd3.saveAsTextFile("./output3")
sc.stop()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
数据被分散到了8个分区中
# 修改rdd分区为1个
- 方式1,SparkConf对象设置属性全局并行度为1:
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
conf.set("spark.default.parallelism", "1")
sc = SparkContext(conf = conf)
2
3
- 方式2,创建RDD的时候设置(parallelize方法传入numSlices参数为1)
rdd1 = sc.parallelize([1,2,3,4,5], numSlices = 1)
rdd1 = sc.parallelize([1,2,3,4,5], 1)
2