
Lab Manual - Week 5: Pair RDDs and Partitioning


Contents

    • Experiment 1
    • Experiment 2
    • Experiment 3
    • Experiment 4
    • Experiment 5

import findspark
findspark.init()                  # locate the local Spark installation
from pyspark import SparkContext
sc = SparkContext()               # one SparkContext shared by all the experiments below

Experiment 1

Experiment 1: Given the in-memory data source list01 = [1, 2, 3, 4, 11, 12, 13, 14, 21, 22, 23, 24, 31, 32, 33, 34, 41, 42, 43, 44, 51, 52, 53, 54]

(1) Get the default number of partitions of the RDD

list01 = [1, 2, 3, 4, 11, 12, 13, 14, 21, 22, 23, 24,
          31, 32, 33, 34, 41, 42, 43, 44, 51, 52, 53, 54]
sc.parallelize(list01).getNumPartitions()
12
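The default partition count of parallelize() comes from sc.defaultParallelism (for a local master, typically the number of CPU cores), so the 12 above simply reflects the machine this lab was run on. As a small sketch, not part of the original lab, the count can also be set explicitly with the second argument of parallelize():

sc.defaultParallelism                          # machine/master dependent; 12 on the machine used above
sc.parallelize(list01, 4).getNumPartitions()   # request 4 partitions explicitly -> 4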

(2) View the data in each partition, returned as a list

sc.parallelize(list01).glom().collect()
[[1, 2],[3, 4],[11, 12],[13, 14],[21, 22],[23, 24],[31, 32],[33, 34],[41, 42],[43, 44],[51, 52],[53, 54]]

(3) Count the number of rows of data in each partition, returned as a list

sc.parallelize(list01).glom().map(lambda x: [x, len(x)]).collect()
[[[1, 2], 2],[[3, 4], 2],[[11, 12], 2],[[13, 14], 2],[[21, 22], 2],[[23, 24], 2],[[31, 32], 2],[[33, 34], 2],[[41, 42], 2],[[43, 44], 2],[[51, 52], 2],[[53, 54], 2]]
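As an alternative sketch (not required by the lab), mapPartitions with a generator counts the rows of each partition without materializing the whole partition as a list the way glom() does:

# One count per partition; with the 12 default partitions above this yields [2, 2, ..., 2]
sc.parallelize(list01).mapPartitions(lambda it: [sum(1 for _ in it)]).collect()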

(4) Reduce the number of partitions to 2 and view the data in each partition again

sc.parallelize(list01).coalesce(2).glom().collect()
[[1, 2, 3, 4, 11, 12, 13, 14, 21, 22, 23, 24],[31, 32, 33, 34, 41, 42, 43, 44, 51, 52, 53, 54]]
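coalesce(2) narrows the partition count without a shuffle, which is why each new partition above is simply a concatenation of consecutive old ones. repartition(2) reaches the same count via a full shuffle; a quick comparison sketch (rdd01 is just a throwaway name for this sketch, and the element order after the shuffle may differ from the coalesce result):

rdd01 = sc.parallelize(list01)
rdd01.coalesce(2).getNumPartitions()     # -> 2, no shuffle
rdd01.repartition(2).getNumPartitions()  # -> 2, full shuffle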

Experiment 2

Experiment 2: Given scores = [("Jim", ("Hadoop", 80)), ("Jack", ("Hadoop", 69)), ("Mike", ("Hadoop", 88)), ("Jackson", ("Hadoop", 69)),
("Jim", ("Spark", 66)), ("Jack", ("Spark", 91)), ("Mike", ("Spark", 77)), ("Jackson", ("Spark", 79)),
("Jim", ("NoSQL", 62)), ("Jack", ("NoSQL", 72)), ("Mike", ("NoSQL", 89)), ("Jackson", ("NoSQL", 99))]

(1) View the data in each partition

scores = [("Jim", ("Hadoop", 80)), ("Jack", ("Hadoop", 69)), ("Mike", ("Hadoop", 88)), ("Jackson", ("Hadoop", 69)),
          ("Jim", ("Spark", 66)), ("Jack", ("Spark", 91)), ("Mike", ("Spark", 77)), ("Jackson", ("Spark", 79)),
          ("Jim", ("NoSQL", 62)), ("Jack", ("NoSQL", 72)), ("Mike", ("NoSQL", 89)), ("Jackson", ("NoSQL", 99))]
sc.parallelize(scores).glom().collect()
[[('Jim', ('Hadoop', 80))],[('Jack', ('Hadoop', 69))],[('Mike', ('Hadoop', 88))],[('Jackson', ('Hadoop', 69))],[('Jim', ('Spark', 66))],[('Jack', ('Spark', 91))],[('Mike', ('Spark', 77))],[('Jackson', ('Spark', 79))],[('Jim', ('NoSQL', 62))],[('Jack', ('NoSQL', 72))],[('Mike', ('NoSQL', 89))],[('Jackson', ('NoSQL', 99))]]

(2) Set the number of partitions to 2, and place records with the same key in the same partition

# coalesce(2) only merges existing partitions; it does NOT guarantee that equal keys end up together.
# partitionBy(2) (commented below) hash-partitions by key, so identical keys always land in the same partition.
sc.parallelize(scores).coalesce(2).glom().collect()
# sc.parallelize(scores).partitionBy(2).glom().collect()
[[('Jim', ('Hadoop', 80)),('Jack', ('Hadoop', 69)),('Mike', ('Hadoop', 88)),('Jackson', ('Hadoop', 69)),('Jim', ('Spark', 66)),('Jack', ('Spark', 91))],[('Mike', ('Spark', 77)),('Jackson', ('Spark', 79)),('Jim', ('NoSQL', 62)),('Jack', ('NoSQL', 72)),('Mike', ('NoSQL', 89)),('Jackson', ('NoSQL', 99))]]
# A custom partitioner: send every 'Mike' record to partition 1 and all other keys to partition 0.
# The coalesce(2) here is redundant, since partitionBy reshuffles the data anyway.
sc.parallelize(scores).coalesce(2)\
    .partitionBy(2, lambda x: x == 'Mike')\
    .glom().collect()
[[('Jim', ('Hadoop', 80)),('Jack', ('Hadoop', 69)),('Jackson', ('Hadoop', 69)),('Jim', ('Spark', 66)),('Jack', ('Spark', 91)),('Jackson', ('Spark', 79)),('Jim', ('NoSQL', 62)),('Jack', ('NoSQL', 72)),('Jackson', ('NoSQL', 99))],[('Mike', ('Hadoop', 88)), ('Mike', ('Spark', 77)), ('Mike', ('NoSQL', 89))]]
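A quick way to check that a partitioning really keeps each key on a single partition is to reduce every partition to its set of keys; each key should then appear in exactly one of the sets. This is only an illustrative sketch, not part of the original answer:

# Default hash partitioning into 2 partitions, then the set of keys found on each partition
sc.parallelize(scores).partitionBy(2)\
    .glom().map(lambda part: {k for k, _ in part}).collect()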

(3) Keep the number of partitions at 2, but place the records whose name is 4 characters long (Mike and Jack) in the same partition

Expected output:

[[('Jim', ('Hadoop', 80)), ('Jackson', ('Hadoop', 69)), ('Jim', ('Spark', 66)), ('Jackson', ('Spark', 79)), ('Jim', ('NoSQL', 62)),
('Jackson', ('NoSQL', 99))],

[('Jack', ('Hadoop', 69)), ('Mike', ('Hadoop', 88)), ('Jack', ('Spark', 91)), ('Mike', ('Spark', 77)), ('Jack', ('NoSQL', 72)),
('Mike', ('NoSQL', 89))]]

# Partition by "name length == 4" (True -> partition 1, False -> partition 0) and sort by key within each partition
sc.parallelize(scores)\
    .repartitionAndSortWithinPartitions(2, lambda x: len(x) == 4)\
    .glom().collect()
[[('Jackson', ('Hadoop', 69)),('Jackson', ('Spark', 79)),('Jackson', ('NoSQL', 99)),('Jim', ('Hadoop', 80)),('Jim', ('Spark', 66)),('Jim', ('NoSQL', 62))],[('Jack', ('Hadoop', 69)),('Jack', ('Spark', 91)),('Jack', ('NoSQL', 72)),('Mike', ('Hadoop', 88)),('Mike', ('Spark', 77)),('Mike', ('NoSQL', 89))]]
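repartitionAndSortWithinPartitions also takes ascending and keyfunc arguments, so the within-partition order can be changed without an extra sortBy. A minimal sketch that reuses the same partition function but sorts the names in descending order:

sc.parallelize(scores)\
    .repartitionAndSortWithinPartitions(2, lambda x: len(x) == 4, ascending=False)\
    .glom().collect()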

Experiment 3

Experiment 3:

(1) Upload the genome-scores.csv file to the /input directory of HDFS on the Ubuntu virtual machine, setting 64 MB as the HDFS block size for the upload

 hdfs dfs -Ddfs.blocksize=64M -put genome-scores.csv /input

(2) Check how many HDFS blocks genome-scores.csv was split into

 hdfs fsck /input/genome-scores.csv
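The block count reported by fsck should be roughly ceil(file size / 64 MB). A small arithmetic sketch; the file_size value below is hypothetical and should be replaced with the real size reported by hdfs dfs -du /input/genome-scores.csv:

import math
block_size = 64 * 1024 * 1024             # 64 MB, as set when the file was uploaded
file_size  = 120 * 1024 * 1024            # hypothetical; substitute the real size from hdfs dfs -du
print(math.ceil(file_size / block_size))  # -> 2 for this hypothetical size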


(3) After reading the genome-scores.csv file from HDFS on Windows, get the number of partitions of the resulting RDD

sc.textFile("hdfs://hadoop-server-00:9000/input/genome-scores.csv")\
    .getNumPartitions()
2
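textFile() creates at least one partition per HDFS block, which is why the 2 here matches the block count from step (2). The optional minPartitions argument can only raise that number; a sketch assuming the same path:

sc.textFile("hdfs://hadoop-server-00:9000/input/genome-scores.csv", minPartitions=4)\
    .getNumPartitions()   # >= 4: more input splits than blocks are allowed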

(4) After reading the genome-scores.csv file from HDFS on Windows, reduce it to a single partition and save it to the /input/scores directory, compressed with "org.apache.hadoop.io.compress.GzipCodec"

sc.textFile(r"hdfs://hadoop-server-00:9000/input/genome-scores.csv")\
    .coalesce(1)\
    .saveAsTextFile(r"hdfs://hadoop-server-00:9000/input/scores", "org.apache.hadoop.io.compress.GzipCodec")
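To verify the result, the compressed output can be read back directly, since textFile() decompresses .gz part files transparently; note that gzip is not splittable, so each compressed part is read as a single split. A sketch assuming the same output path:

sc.textFile(r"hdfs://hadoop-server-00:9000/input/scores").take(3)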

Experiment 4

Experiment 4: Given list01 = ["Java", "HBase", "MongoDB", "Hive", "Python", "Hadoop"], write a program that finds the courses in the list that start with "H" and returns them as a list. At the same time, use an accumulator variable to return the number of courses starting with "H".

# Create an accumulator variable
count01 = sc.accumulator(0)
list01 = ["Java", "HBase", "MongoDB", "Hive", "Python", "Hadoop"]
rdd = sc.parallelize(list01)
# rdd1 = rdd.filter(lambda x: x.startswith('H'))   # filter only, without counting
def f(x):
    global count01
    if x.startswith('H'):        # courses starting with "H"
        count01.add(1)           # equivalently: count01 += 1
        return True
    return False
print(rdd.filter(f).collect(), count01, sep="\n")
['HBase', 'Hive', 'Hadoop']
3
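One caveat with accumulators updated inside a transformation such as filter(): the update only happens when an action runs, and if the RDD is recomputed (for example by collecting an uncached RDD twice) the accumulator is incremented again. A minimal sketch of a safer pattern, using the hypothetical names count02 and g and caching the filtered RDD so the filter body normally runs only once per element:

count02 = sc.accumulator(0)      # hypothetical second counter for this sketch
def g(x):
    if x.startswith('H'):
        count02.add(1)
        return True
    return False
hits = rdd.filter(g).cache()     # cache so later actions do not rerun g and double count
hits.count()                     # first action: count02 becomes 3
hits.count()                     # served from the cache: count02 stays 3
print(count02.value)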

Experiment 5

Experiment 5: Given:

gender = {0: "男", 1: "女"}

address = [("张三", 0, "郑州"), ("李四", 0, "洛阳"), ("王五", 0, "许昌"), ("赵柳", 1, "开封")]

Using a broadcast variable, join the address list with the gender dictionary to produce the output:

[("张三", "男", "郑州"), ("李四", "男", "洛阳"), ("王五", "男", "许昌"), ("赵柳", "女", "开封")]

gender = {0: "男", 1: "女"}
address = [("张三", 0, "郑州"), ("李四", 0, "洛阳"), ("王五", 0, "许昌"), ("赵柳", 1, "开封")]
# Create a broadcast variable holding the gender lookup table
broadcast_states = sc.broadcast(gender)
rdd = sc.parallelize(address)
def state_convert(code):
    # Look up the gender label in the broadcast dictionary
    return broadcast_states.value[code]
result = rdd.map(lambda x: (x[0], state_convert(x[1]), x[2])).collect()
result
[('张三', '男', '郑州'), ('李四', '男', '洛阳'), ('王五', '男', '许昌'), ('赵柳', '女', '开封')]
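Broadcast variables stay cached on the executors until they are released; once the lookup table is no longer needed, unpersist() frees the executor copies (destroy() additionally removes it from the driver, after which it can no longer be used). A minimal cleanup sketch:

broadcast_states.unpersist()
# broadcast_states.destroy()   # only once the broadcast will never be used again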