PySpark综合案例

更新时间: 2024-04-01 11:28:35

# Spark是什么

定义:Apache Spark是用于大规模数据(large-scala data) 处理的统一(unified)分析引擎。
简单来说,Spark是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的海量数据。 Spark作为全球顶级的分布式计算框架,支持众多的编程语言进行开发。而Python语言,则是Spark重点支持的方向。

# PySpark

Spark对Python语言的支持,重点体现在,Python第三方库:PySpark之上。

PySpark是由Spark官方开发的Python语言第三方库。
Python开发者可以使用pip程序快速的安装PySpark并像其它第三方库那样直接使用。

# 基础准备

# PySpark库的安装

同其它的第三方库一样,PySpark同样可以使用pip程序进行安装。

在“CMD”命令提示符程序内,输入:
pip install pyspark
或者使用国内代理镜像网站(清华大学源)
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark

# 构建PySpark执行环境入口对象

想要使用PySpark库完成数据库处理,首先需要构建一个执行环境入口对象。
PySpark的执行环境入口对象是:类 SparkContext 的类对象。

# 导包 
from pyspark import SparkConf, SparkContext

# 创建SparkConf对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

# 打印PySpark的运行版本
print(sc.version)

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()
1
2
3
4
5
6
7
8
9
10
11
12
13
14

注意

第一次运行报错了

原因是我没有安装java的jdk:

  1. 安装Java:如果未安装,请前往Oracle官网下载并安装Java,我装的17。
  2. 设置JAVA_HOME环境变量:打开“系统属性”,选择“高级”选项卡,点击“环境变量”。在系统变量中点击“新建”,变量名输入JAVA_HOME,变量值输入Java安装目录的路径,不用带bin。
  3. 在path中添加 %JAVA_HOME%\bin
  4. 然后配置JAVA_HOME

再次运行就不报错了。

# PySpark的编程模型

SparkContext类对象,是PySpark编程中一切功能的入口。
PySpark的编程,主要分为如下三大步骤:

  • 数据输入
    通过SparkContext类对象的成员方法,完成数据的读取操作,读取后得到RDD类对象。

  • 数据处理计算
    通过RDD类对象的成员方法,完成各种数据计算的需求。

  • 数据输出
    将处理完成后的RDD对象,调用各种成员方法完成,写出文件、转换为list等操作。

# 数据输入

# RDD对象

PySpark支持多种数据的输入,在输入完成后,都会得到一个:RDD类的对象。
RDD全程为:弹性分布式数据集(Resilient Distributed Datasets)
PySpark针对数据的处理,都是以RDD对象作为载体,即:

  • 数据存储在RDD内
  • 各类数据的计算方法,也都是RDD的成员方法
  • RDD的数据计算方法,返回值依旧是RDD对象

# Python数据容器转RDD对象

PySpark支持通过SparkContext对象的parallelize成员方法,将:

  • list
  • tuple
  • set
  • dict
  • str
    转换为PySpark的RDD对象

注意

  • 字符串会被拆分出1个个的字符,存入RDD对象
  • 字典仅有key会被存入RDD对象
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")

sc = SparkContext(conf=conf)

# 通过parallelize方法将Python对象加载到Spark内,成为RDD对象
rdd1 = sc.parallelize([1,2,3,4,5])
rdd2 = sc.parallelize((1,2,3,4,5))
rdd3 = sc.parallelize("abcdefg")
rdd4 = sc.parallelize({1,2,3,4,5})
rdd5 = sc.parallelize({"key1":"value1", "key2":"value2"})

# 如果要查看RDD里面有什么内容,需要用collect()方法
print(rdd1.collect()) # [1, 2, 3, 4, 5]
print(rdd2.collect()) # [1, 2, 3, 4, 5]
print(rdd3.collect()) # ['a', 'b', 'c', 'd', 'e', 'f', 'g']
print(rdd4.collect()) # [1, 2, 3, 4, 5]
print(rdd5.collect()) # ['key1', 'key2']

sc.stop()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 读取文件转RDD对象

PySpark也支持通过SparkContext入口对象,来读取文件,来构建出RDD对象。

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")

sc = SparkContext(conf=conf)

# 用textFile方法,读取文件数据加载到Spark内,成为RDD对象
rdd = sc.textFile("./hello.txt")
print(rdd.collect()) # ['itheima', 'nihao', 'daodao', 'youyou', 'haha']

sc.stop()
1
2
3
4
5
6
7
8
9
10
11

# 数据计算

# map方法

PySpark的数据计算,都是基于RDD对象来进行的,那么如何进行呢?
自然是依赖RDD对象内置丰富的:成员方法(算子)

# map算子

功能:map算子,是将RDD的数据,一条条处理(处理的逻辑基于map算子中接收的处理函数),返回新的RDD
语法:

rdd.map(func)
# func: f:(T) -> U
# f: 表示这是一个函数(方法)
# (T) -> U 表示的是方法的定义:
#    ()表示传入参数,(T)表示传入1个参数,()表示没有传入参数
# T是泛型的代称,在这里表示 任意类型 
# U也是泛型的代称,在这里表示 任意类型
# -> U 表示返回值
# (T) -> U 总结起来的意思是:这是一个方法,这个方法接受一个参数传入,传入参数类型不限,返回一个返回值,返回值类型不限
# (A) -> A 总结起来的意思是:这事一个方法,这个方法接受一个参数传入,传入参数类型不限,返回一个返回值,返回值和传入参数类型一致。
1
2
3
4
5
6
7
8
9
10
from pyspark import SparkConf, SparkContext
# 这样设置才不会报错,spark才能找到python解释权的位置
import os
os.environ['PYSPARK_PYTHON'] = "D:/python/python3.10.0/python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")

sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([1,2,3,4,5])

# 通过map方法将全部数据都乘以10
def func(data):
    return data * 10


rdd2 = rdd.map(func)


print(rdd2.collect())
print(rdd.collect())

# [10, 20, 30, 40, 50]
# [1, 2, 3, 4, 5]

sc.stop()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

注意

一开始一直报错,后来把python版本改成了3.10.0就可以了

# 链式调用

对于返回值是新RDD的算子,可以通过链式调用的方式多次调用算子

# flatMap算子

功能:对rdd执行map操作,然后进行解除嵌套操作
解除嵌套:

# 嵌套的list
lst = [[1,2,3],[4,5,6],[7,8,9]]
# 如果解除了嵌套
lst = [1,2,3,4,5,6,7,8,9]
1
2
3
4

演示代码:

# coding:utf8
# 演示rdd的flatMap算子
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    # 0. 构建Spark执行环境
    conf = SparkConf().setAppName("create rdd").setMaster("local[*]")
    sc = SparkContext(conf = conf)

    rdd = sc.parallelize(["a b c","a c e","e c a"])

    # 按照空格切分数据后,解除嵌套
    print(rdd.flatMap(lambda x:x.split(" ")).collect())
1
2
3
4
5
6
7
8
9
10
11
12
13

# reduceByKey算子

功能: 针对KV型RDD(二元元组),自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作。
用法:

rdd.reduceByKey(func)
# func:(V, V) -> V
# 接受2个传入参数(类型要一致),返回一个返回值,类型和传入要求一致。
1
2
3

代码:

rdd = sc.parallelize[('a',1),('a',1),('b',1),('b',1),('b',1)]
result = rdd.reduceByKey(lambda a,b: a+b)
print(result.collect())
# [('b',3),('a',2)]
1
2
3
4

注意

reduceByKey中接收的函数,只负责聚合,不理会分组,分组是自动 by key 来分组的

# Filter

功能:过滤想要的数据进行保留
语法:

rdd.filter(func)
# func:(T) -> bool 传入1个参数进来随意类型,返回值必须是True or False
1
2

注意

返回值是True的数据被保留,False的数据被丢弃

示例代码:

# coding:utf8
# 演示rdd的filter算子
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("create rdd").setMaster("local[*]")
sc = SparkContext(conf = conf)

rdd = sc.parallelize([1,2,3,4,5])

# 保留奇数
print(rdd.filter(lambda x: x % 2 == 1).collect()) 
# [2, 4]
1
2
3
4
5
6
7
8
9
10
11
12

# distinct算子

功能:对RDD数据进行去重,返回新RDD
语法: rdd.distinct() 无需传参

演示代码:

# coding:utf8
# 演示rdd的distinct算子
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("create rdd").setMaster("local[*]")
sc = SparkContext(conf = conf)

rdd = sc.parallelize([1,1,3,3,4,4,6,6,9,9,9])

print(rdd.distinct().collect())
# [1, 9, 3, 4, 6]
1
2
3
4
5
6
7
8
9
10
11

# sortBy算子

功能:对RDD数据进行排序,基于指定的排序依据
语法:

rdd.sortBy(func, ascending=False, numPartitions=1)
# func:(T) -> U: 告知按照rdd中的哪个数据进行排序,比如 lambda x: x[1] 表示按照rdd中的第二列元素进行排序
# ascending True升序, False降序
# numPartitions: 用多少分区排序 
1
2
3
4
"""
完成练习案例:单词计数统计
"""
# 1.构建执行环境入口对象
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/python/python3.10.0/python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("count_app")
sc = SparkContext(conf=conf)

# 2.读取数据文件
rdd = sc.textFile("./hello.txt")

# 3.取出全部单词
rdd1 = rdd.flatMap(lambda x: x.split(" "))

# 4.将所有单词都转换成二元元组,单词为key,value设置为1
rdd2 = rdd1.map(lambda x: (x, 1))

# 5.分组并求和
rdd3 = rdd2.reduceByKey(lambda a, b: a + b)

# 6.对结果进行排序
result = rdd3.sortBy(lambda x:x[1], ascending=False, numPartitions=1)

# 7.打印并输出结果
print(result.collect())
# [('itheima', 8), ('python', 6), ('spark', 4), ('itcast', 3), ('pyspark', 3)]

sc.stop()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

# 数据输出

  • 数据输入:

    • sc.parallelize
    • sc.textFile
  • 数据计算: 返回值还是rdd的方法

    • rdd.map
    • rdd.flatMap
    • rdd.reduceByKey
    • rdd.filter
    • rdd.distinct
    • rdd.sortBy
  • 数据输出: 输出Python对象或文件,返回值不是rdd的方法

    • collect
    • reduce
    • take
    • count

# collect算子

功能:将RDD各个分区的数据,统一收集到Driver中,形成一个List对象
用法: rdd.collect() 返回值是一个list

# reduce算子

功能:对RDD数据集按照你传入的逻辑进行聚合
语法:

rdd.reduce(func)
# func: (T, T) -> T
# 2参数传入,1个返回值,返回值和参数要求类型一致
1
2
3

执行流程图:

代码:

rdd = sc.parallelize(range(1,10))
# 将rdd的数据进行累加求和
print(rdd.reduce(lambda a,b: a + b))
1
2
3

# take算子

功能:取RDD的前N个元素,组合成list返回给你

sc.parallelize([3,2,1,4,5,6]).take(5)
 # [3,2,1,4,5]
1
2

# count算子

功能:计算RDD有多少条数据,返回值是一个数字

sc.parallelize([3,2,1,4,5,6]).count()

# 6
1
2
3

# saveAsTextFile算子

功能:将RDD的数据写入文本文件中

  • 输出的结果是一个文件夹
  • 有几个分区就输出多少个结果文件
    支持本地写出,hdfs等文件系统
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/python/python3.10.0/python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("count_app")
sc = SparkContext(conf=conf)

# 准备RDD1
rdd1 = sc.parallelize([1,2,3,4,5])

# 准备RDD2
rdd2 = sc.parallelize([("Hello", 3),("Spark", 5),("Hi", 7)])

# 准备RDD3
rdd3 = sc.parallelize([[1,3,5],[6,7,9],[11,13,11]])

# 输出到文件中
rdd1.saveAsTextFile("./output1")
rdd2.saveAsTextFile("./output2")
rdd3.saveAsTextFile("./output3")

sc.stop()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

注意事项

调用保存文件的算子,需要配置Hadoop依赖

# 构建执行环境入口对象
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/python/python3.10.0/python.exe"
os.environ['HADOOP_HOME'] = "D:/python/hadoop3.0.0"

conf = SparkConf().setMaster("local[*]").setAppName("count_app")
sc = SparkContext(conf=conf)

# 准备RDD1
rdd1 = sc.parallelize([1,2,3,4,5])

# 准备RDD2
rdd2 = sc.parallelize([("Hello", 3),("Spark", 5),("Hi", 7)])

# 准备RDD3
rdd3 = sc.parallelize([[1,3,5],[6,7,9],[11,13,11]])

# 输出到文件中
rdd1.saveAsTextFile("./output1")
rdd2.saveAsTextFile("./output2")
rdd3.saveAsTextFile("./output3")

sc.stop()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

数据被分散到了8个分区中

# 修改rdd分区为1个

  • 方式1,SparkConf对象设置属性全局并行度为1:
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
conf.set("spark.default.parallelism", "1")
sc = SparkContext(conf = conf)
1
2
3
  • 方式2,创建RDD的时候设置(parallelize方法传入numSlices参数为1)
rdd1 = sc.parallelize([1,2,3,4,5], numSlices = 1)
rdd1 = sc.parallelize([1,2,3,4,5], 1)
1
2