搜索引擎日志分析

更新时间: 2024-04-02 14:09:56

# 需求

读取文件转换成RDD,并完成:

  • 打印输出:热门搜索时间段(小时精度) Top3
  • 打印输出:热门搜索词Top3
  • 打印输出:统计黑马程序员关键字在哪个时段被搜索最多
  • 将数据转换为JSON格式,写出为文件

# 代码

"""
演示PySpark综合案例
"""
# 构建执行环境入口对象
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/python/python3.10.0/python.exe"
os.environ['HADOOP_HOME'] = "D:/python/hadoop3.0.0"

conf = SparkConf().setMaster("local[*]").setAppName("count_app")
conf.set("spark.default.parallelism", "1")
sc = SparkContext(conf=conf)

# 读取文件转换成RDD
value_list_rdd = sc.textFile("C:/Users/Administrator/Downloads/search_log.txt").map(lambda x:x.split('\t'))

# TODO1 需求1: 热门搜索时间段Top3(小时精度)
# 1.1 取出全部的时间并转换为小时
# 1.2 转换为(小时, 1)的二元元组
# 1.3 key分组聚合value
# 1.4 排序(降序)
# 1.5 取前三
result_1 = value_list_rdd.map(lambda x:x[0].split(':')[0])\
                            .map(lambda x:(x, 1))\
                            .reduceByKey(lambda a, b: a + b)\
                            .sortBy(lambda x:x[1], ascending=False, numPartitions=1)\
                            .take(3)
print("热门搜索时间段Top3",result_1)

# TODO 需求2: 热门搜索词 Top3
# 2.1 取出全部的搜索词
# 2.2 (词,1)二元数组
# 2.3 分组聚合
# 2.4 排序
# 2.5 Top3
result_2 = value_list_rdd.map(lambda x:(x[2], 1))\
                        .reduceByKey(lambda a, b: a + b)\
                        .sortBy(lambda x:x[1], ascending=False, numPartitions=1)\
                        .take(3)
print("热门搜索词Top3",result_2)

# TODO 需求3: 统计黑马程序员关键字在什么时间段被搜索的最多
# 3.1 过滤内容,只保留黑马程序员关键词
# 3.2 转换为(小时,1)的二元元组
# 3.3 key分组聚合value
# 3.4 排序(降序)
# 3.5 取前1
result_3 = value_list_rdd.filter(lambda x:x[2] == '黑马程序员')\
                        .map(lambda x:(x[0].split(':')[0],1))\
                        .reduceByKey(lambda a, b: a + b)\
                        .sortBy(lambda x:x[1], ascending=False, numPartitions=1)\
                        .take(1)
print("统计黑马程序员关键字在什么时间段被搜索的最多", result_3)

# TODO 需求4: 将数据转换为JSON格式,写出到文件中
# 4.1 转换为JSON格式的RDD
# 4.2 写出为文件
value_list_rdd.map(lambda x:{"time":x[0],"user_id":x[1],"key_word":x[2],"rank1":x[3],"rank2":x[4],"url":x[5]})\
            .saveAsTextFile("./output_json")

sc.stop()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61