
Lab Manual - Week 3: Spark RDD

Contents

    • 1. Transformation Operators
    • 2. Action Operators
    • 3. Labs
      • Lab 1
      • Lab 2
      • Lab 3
      • Lab 4

Transformation and Action operators used in this lab:

1. Transformation Operators

(1) map

(2) filter

(3) flatMap

(4) sortBy

(5) reduceByKey (for Pair RDDs, i.e., RDDs of key-value pairs): aggregates the values that share the same key, e.g., to compute a maximum, minimum, average, or sum (see the warm-up sketch after the setup code below).

(6) mapValues

2. Action Operators

(1) take

(2) collect

(3) reduce: aggregates all the elements of an RDD, e.g., to compute a maximum, minimum, average, or sum (also shown in the warm-up sketch below).

import findspark
findspark.init()  # locate the local Spark installation and make pyspark importable
from pyspark import SparkContext
sc = SparkContext()  # entry point for creating RDDs
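
As a quick warm-up, here is a minimal sketch of the aggregation operators described above (reduceByKey, mapValues, reduce), using the sc created in the setup block; the pairs data is made up purely for illustration:

pairs = sc.parallelize([("a", 1), ("a", 3), ("b", 2)])   # toy key-value data, for illustration only
pairs.reduceByKey(lambda x, y: x + y).collect()          # sum per key -> [('a', 4), ('b', 2)] (order may vary)
pairs.mapValues(lambda v: v * 10).collect()              # transform values, keep keys -> [('a', 10), ('a', 30), ('b', 20)]
pairs.map(lambda kv: kv[1]).reduce(lambda x, y: x + y)   # aggregate the whole RDD -> 6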

Example 1: Given courses = ["Hadoop", "Spark", "Java"].

Write code for the following:

(1) Sort the courses in ascending alphabetical order. Expected output: ['Hadoop', 'Java', 'Spark']

courses =["Hadoop", "Spark", "Java"]
sc.parallelize(courses).sortBy(lambda x : x).collect()        
['Hadoop', 'Java', 'Spark']

(2) Sort the courses in descending order by their last letter. Expected output: ['Hadoop', 'Spark', 'Java']

sc.parallelize(courses).sortBy(lambda x : x[-1], False).collect()  # x[-1] is the last letter
['Hadoop', 'Spark', 'Java']
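
Note: the second argument to sortBy is ascending (default True), so passing False here sorts in descending order; this pattern recurs throughout the manual.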

(3) Sort the courses by word length in descending order. Expected output: ['Hadoop', 'Spark', 'Java']

sc.parallelize(courses).sortBy(len, False).collect()  
['Hadoop', 'Spark', 'Java']

Example 2: Given scores01 = [("张三", "Hadoop", 80), ("李四", "Hadoop", 88), ("张三", "Spark", 90), ("李四", "Spark", 70)].

Write code for the following:

(1) Sort by score in descending order. Expected output:
[('张三', 'Spark', 90),
('李四', 'Hadoop', 88),
('张三', 'Hadoop', 80),
('李四', 'Spark', 70)]

scores01 =[("张三", "Hadoop", 80), ("李四", "Hadoop", 88), ("张三", "Spark", 90), ("李四", "Spark", 70)]
scoreRDD = sc.parallelize(scores01)
scoreRDD.sortBy(lambda x : x[2], False).collect()   
[('张三', 'Spark', 90),('李四', 'Hadoop', 88),('张三', 'Hadoop', 80),('李四', 'Spark', 70)]

(2) Compute the sum of all scores. Expected output: 328

scoreRDD.map(lambda x : x[2]).reduce(lambda x, y : x + y)
328

(3) Compute each student's total score. Expected output (order may vary): [('李四', 158), ('张三', 170)]

# First map to a Pair RDD of (name, score), then use reduceByKey, which is specific to Pair RDDs
scoreRDD.map(lambda x : (x[0], x[2])).reduceByKey(lambda x, y : x + y).collect()
## Or, equivalently, with operator.add:
from operator import add
scoreRDD.map(lambda x : (x[0], x[2])).reduceByKey(add).collect()
[('张三', 170), ('李四', 158)]

(4) Group each student's per-course scores into a single record. Expected output (order may vary):

[('李四', [('Hadoop', 88), ('Spark', 70)]),
('张三', [('Hadoop', 80), ('Spark', 90)])]

scoreRDD.map(lambda x : (x[0], [(x[1], x[2])])).reduceByKey(lambda x, y : x + y).collect()
[('张三', [('Hadoop', 80), ('Spark', 90)]),('李四', [('Hadoop', 88), ('Spark', 70)])]
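
The trick above is to wrap each (course, score) pair in a one-element list so that reduceByKey can concatenate the lists with +. A groupByKey-based sketch (equivalent on this data) would be:

scoreRDD.map(lambda x : (x[0], (x[1], x[2]))).groupByKey().mapValues(list).collect()  # group values per key, then materialize each group as a list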

(5) Find the highest score in each course. Expected output (order may vary): [('Hadoop', 88), ('Spark', 90)]

scoreRDD.map(lambda x : (x[1], x[2])).reduceByKey(lambda x, y : x if x > y else y).collect()
## Or, with the built-in max:
scoreRDD.map(lambda x : (x[1], x[2])).reduceByKey(max).collect()
[('Spark', 90), ('Hadoop', 88)]

3. Labs

Lab 1

Given words = ["alpha", "Bike", "car", "Food"].

Write code for the following:

(1) Sort alphabetically in descending order, ignoring case. Expected output: ['Food', 'car', 'Bike', 'alpha']

words =["alpha", "Bike", "car", "Food"]
wordsRdd = sc.parallelize(words)
wordsRdd.sortBy(lambda x:x.lower(),False).collect()    
['Food', 'car', 'Bike', 'alpha']

(2) Sort by each word's second letter (index 1) in descending order. Expected output: ['Food', 'alpha', 'Bike', 'car']

wordsRdd.sortBy(lambda x:x[1],False).collect()
['Food', 'alpha', 'Bike', 'car']

(3) Compute the total length of all the words. Expected output: 16

wordsRdd.map(lambda x:len(x)).reduce(lambda x,y:x+y)
16

Lab 2

The files in the SalesCategories directory contain three fields: record number, category ID, and category name.

Write code for the following:

(1) Sort by category name in ascending order. Expected output:

['22,4,Accessories',
'27,5,Accessories',
'40,6,Accessories',
'16,3,As Seen on TV!',
'3,2,Baseball & Softball',
… (omitted) …,
'56,8,World Cup Shop',
'14,3,Yoga & Pilates']

salesRdd = sc.textFile(r'D:\juniortwo\spark\Spark2023-02-20\SalesCategories')
# salesRdd.take(5)
salesRdd.sortBy(lambda x:x.split(',')[2]).collect()
['22,4,Accessories','27,5,Accessories','40,6,Accessories','16,3,As Seen on  TV!','3,2,Baseball & Softball','4,2,Basketball','42,7,Bike & Skate Shop','47,7,Boating','12,3,Boxing & MMA',"25,5,Boys' Apparel",'43,7,Camping & Hiking','9,3,Cardio Equipment','17,4,Cleats','13,3,Electronics','37,6,Electronics','21,4,Featured Shops','45,7,Fishing','11,3,Fitness Accessories','1,2,Football',"26,5,Girls' Apparel",'32,6,Golf Apparel','34,6,Golf Bags & Carts','36,6,Golf Balls','35,6,Golf Gloves','33,6,Golf Shoes','7,2,Hockey','44,7,Hunting & Shooting','46,7,Indoor/Outdoor Games','55,8,International Soccer',"20,4,Kids' Footwear","38,6,Kids' Golf Clubs",'5,2,Lacrosse','49,8,MLB','57,8,MLB Players','54,8,MLS',"23,5,Men's Apparel","18,4,Men's Footwear","30,6,Men's Golf Clubs",'8,2,More Sports','52,8,NBA','53,8,NCAA','50,8,NFL','58,8,NFL Players','51,8,NHL','29,5,Shop By Sport','2,2,Soccer','10,3,Strength Training','39,6,Team Shop','6,2,Tennis & Racquet','28,5,Top Brands','41,6,Trade-In','15,3,Training by Sport','48,7,Water Sports',"24,5,Women's Apparel","19,4,Women's Footwear","31,6,Women's Golf Clubs",'56,8,World Cup Shop','14,3,Yoga & Pilates']

(2) For each category ID, concatenate the category names it contains, separated by "|", and sort by category ID in ascending order.

Expected output:

[('2', 'Football|Soccer|Baseball & Softball|Basketball|Lacrosse|Tennis & Racquet|Hockey|More Sports'),
('3', 'Cardio Equipment|Strength Training|Fitness Accessories|Boxing & MMA|Electronics|Yoga & Pilates|Training by Sport|As Seen on TV!'),
('4', "Cleats|Men's Footwear|Women's Footwear|Kids' Footwear|Featured Shops|Accessories"),
… (omitted) …,
('8', 'MLB|NFL|NHL|NBA|NCAA|MLS|International Soccer|World Cup Shop|MLB Players|NFL Players')]

salesRdd.map(lambda x:(x.split(',')[1],x.split(',')[2])).reduceByKey(lambda x, y : x +'|'+ y).sortBy(lambda x : x[0]).collect()
[('2','Football|Soccer|Baseball & Softball|Basketball|Lacrosse|Tennis & Racquet|Hockey|More Sports'),('3','Cardio Equipment|Strength Training|Fitness Accessories|Boxing & MMA|Electronics|Yoga & Pilates|Training by Sport|As Seen on  TV!'),('4',"Cleats|Men's Footwear|Women's Footwear|Kids' Footwear|Featured Shops|Accessories"),('5',"Men's Apparel|Women's Apparel|Boys' Apparel|Girls' Apparel|Accessories|Top Brands|Shop By Sport"),('6',"Men's Golf Clubs|Women's Golf Clubs|Golf Apparel|Golf Shoes|Golf Bags & Carts|Golf Gloves|Golf Balls|Electronics|Kids' Golf Clubs|Team Shop|Accessories|Trade-In"),('7','Bike & Skate Shop|Camping & Hiking|Hunting & Shooting|Fishing|Indoor/Outdoor Games|Boating|Water Sports'),('8','MLB|NFL|NHL|NBA|NCAA|MLS|International Soccer|World Cup Shop|MLB Players|NFL Players')]

(3) Count the number of category names under each category ID and sort by count in descending order.

Expected output: [('6', 12), ('8', 10), ('2', 8), ('3', 8), ('5', 7), ('7', 7), ('4', 6)]

salesRdd.map(lambda x:(x.split(',')[1],x.split(',')[2])).reduceByKey(lambda x, y : x +'|'+y).mapValues(lambda x : len(x.split('|'))).sortBy(lambda x : x[1],False).collect()
[('6', 12), ('8', 10), ('2', 8), ('3', 8), ('5', 7), ('7', 7), ('4', 6)]
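
Concatenating the names only to count them does unnecessary string work; a more direct sketch (same result) counts one per record:

from operator import add
salesRdd.map(lambda x : (x.split(',')[1], 1)).reduceByKey(add).sortBy(lambda x : x[1], False).collect()  # count records per category ID, sorted by count descending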

Lab 3

Others\food.csv contains student dining records (the file has a header row).

Write code for the following:

(1) Remove the header row.

foodRdd = sc.textFile(r'D:\juniortwo\spark\Spark2023-02-20\Others\food.csv')
firstLine = foodRdd.first()
noHeader = foodRdd.filter(lambda x:x!=firstLine)
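
This keeps every line that differs from the first one; it assumes no data row is identical to the header, which is the case here since every data row starts with a date.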

(2) Find the dining records dated 2021/01/04 and 2021/01/05. Expected output:

['2021/01/04,张三,潼关肉夹馍',
'2021/01/04,李四,麻辣豆腐',
'2021/01/04,王五,麻辣烫',
'2021/01/04,赵柳,麻辣豆腐',
'2021/01/05,张三,潼关肉夹馍',
'2021/01/05,李四,麻辣豆腐',
'2021/01/05,王五,鸡子炒馍',
'2021/01/05,赵柳,麻辣豆腐']

noHeader.filter(lambda x:x.split(',')[0] in ('2021/01/04','2021/01/05')).collect()
['2021/01/04,张三,潼关肉夹馍','2021/01/04,李四,麻辣豆腐','2021/01/04,王五,麻辣烫','2021/01/04,赵柳,麻辣豆腐','2021/01/05,张三,潼关肉夹馍','2021/01/05,李四,麻辣豆腐','2021/01/05,王五,鸡子炒馍','2021/01/05,赵柳,麻辣豆腐']

(3) Count how many times each student ate each food. Expected output (order may vary):

[(('王五', '烧茄子'), 1),
(('赵柳', '焖面'), 2),
(('张三', '潼关肉夹馍'), 3),
(('李四', '麻辣烫'), 1),
(('王五', '鱼香茄子'), 1),
(('赵柳', '鸡蛋灌饼'), 1),
… (omitted) …]

pairRdd = noHeader.map(lambda x : ((x.split(',')[1], x.split(',')[2]), 1))  # ((student, food), 1) for every record
# pairRdd.collect()
pairRdd.reduceByKey(lambda x,y : x+y).collect()
[(('王五', '烧茄子'), 1),(('赵柳', '焖面'), 2),(('张三', '潼关肉夹馍'), 3),(('李四', '麻辣烫'), 1),(('王五', '鱼香茄子'), 1),(('赵柳', '鸡蛋灌饼'), 1),(('张三', '宫保鸡丁'), 3),(('王五', '狮子头'), 2),(('王五', '香菇饺子'), 1),(('李四', '麻辣豆腐'), 3),(('王五', '鸡子炒馍'), 1),(('赵柳', '茄汁面'), 1),(('王五', '扬州炒饭'), 2),(('赵柳', '鱼香茄子'), 1),(('张三', '麻辣豆腐'), 1),(('张三', '扬州炒饭'), 1),(('李四', '茄汁面'), 3),(('李四', '鸡子炒馍'), 1),(('赵柳', '重庆小面'), 2),(('赵柳', '麻辣豆腐'), 3),(('王五', '麻辣烫'), 2),(('张三', '茄汁面'), 1),(('李四', '扬州炒饭'), 2),(('张三', '鱼香茄子'), 1)]

(4) The same counts as in (3), but output grouped per student, in the following form:

[('王五', [('烧茄子', 1), ('鱼香茄子', 1), ('狮子头', 2), ('香菇饺子', 1), ('鸡子炒馍', 1), ('扬州炒饭', 2), ('麻辣烫', 2)]),
('赵柳', [('焖面', 2), ('鸡蛋灌饼', 1), ('茄汁面', 1), ('鱼香茄子', 1), ('重庆小面', 2), ('麻辣豆腐', 3)]),
('张三', [('潼关肉夹馍', 3), ('宫保鸡丁', 3), ('麻辣豆腐', 1), ('扬州炒饭', 1), ('茄汁面', 1), ('鱼香茄子', 1)]),
('李四', [('麻辣烫', 1), ('麻辣豆腐', 3), ('茄汁面', 3), ('鸡子炒馍', 1), ('扬州炒饭', 2)])]

pairRdd.reduceByKey(lambda x,y : x+y).map(lambda x:(x[0][0],[(x[0][1],x[1])])).reduceByKey(lambda x,y : x+y).collect()
[('王五',[('烧茄子', 1),('鱼香茄子', 1),('狮子头', 2),('香菇饺子', 1),('鸡子炒馍', 1),('扬州炒饭', 2),('麻辣烫', 2)]),('赵柳',[('焖面', 2), ('鸡蛋灌饼', 1), ('茄汁面', 1), ('鱼香茄子', 1), ('重庆小面', 2), ('麻辣豆腐', 3)]),('张三',[('潼关肉夹馍', 3),('宫保鸡丁', 3),('麻辣豆腐', 1),('扬州炒饭', 1),('茄汁面', 1),('鱼香茄子', 1)]),('李四', [('麻辣烫', 1), ('麻辣豆腐', 3), ('茄汁面', 3), ('鸡子炒馍', 1), ('扬州炒饭', 2)])]

(5) Find the 3 most popular foods (the ones eaten the most times overall). Expected output: ['麻辣豆腐', '扬州炒饭', '茄汁面']

pairRdd1 = pairRdd.reduceByKey(lambda x,y : x+y).map(lambda x:(x[0][0],[(x[0][1],x[1])])).reduceByKey(lambda x,y : x+y)
pairRdd2 = pairRdd1.map(lambda x : x[1])
# pairRdd2.collect()
pairRdd2.flatMap(lambda x:x).reduceByKey(lambda x,y : x+y).sortBy(lambda x:x[1],False).map(lambda x:x[0]).take(3)
['麻辣豆腐', '扬州炒饭', '茄汁面']
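
The detour through pairRdd1/pairRdd2 simply reuses the grouped result of (4); to rank foods alone, a more direct sketch counts food occurrences straight from the records:

noHeader.map(lambda x : (x.split(',')[2], 1)).reduceByKey(lambda x, y : x + y).sortBy(lambda x : x[1], False).map(lambda x : x[0]).take(3)  # count each food, sort by count descending, keep the top 3 names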

Lab 4

Sales\sales.csv contains the sales records of a small supermarket.
Write code to find the top 3 products by quantity sold, together with their quantities.

Expected output: [('日记本', 25), ('笔芯', 20), ('啤酒', 12)]

rdd = sc.textFile(r'D:\juniortwo\spark\Spark2023-02-20\Sales\sales.csv')
rdd.take(3)
['销售时间,名称,商品编号,单价,销售数量','2021/01/01,茶杯,A0001,10,1','2021/01/01,订书机,B0001,20,1']
firstLine = rdd.first()
noHeader = rdd.filter(lambda x:x != firstLine)
rdd = noHeader.map(lambda x:(x.split(',')[1],x.split(',')[4]))
# rdd.collect()
# The quantity field read from the CSV is a string, so cast it to int before reduceByKey
from operator import add
rdd.map(lambda x:(x[0],int(x[1]))).reduceByKey(add).sortBy(lambda x:x[1],False).take(3)
[('日记本', 25), ('笔芯', 20), ('啤酒', 12)]