> 文章列表 > Spark 实现重新分区 partitionBy、coalesce、repartition(附代码演示)

Spark 实现重新分区 partitionBy、coalesce、repartition(附代码演示)

Spark 实现重新分区 partitionBy、coalesce、repartition(附代码演示)

文章目录

 

1、partitionBy

源码中的定义(部分)

调用方式

2、coalesce

源码中的定义

调用方式

3、repartition

源码中的定义

调用方式 

repartition和coalesce的区别

代码演示 (跳转代码)


实现重新分区,本质上就是重新配置并行度,也就是说,如果我们将分区数设置为n,那么Spark作业的并行度就是n

1、partitionBy

传入自定义的分区策略 默认为按照key进行hash

源码中的定义(部分):

def partitionBy(self: "RDD[Tuple[K, V]]",numPartitions: Optional[int],partitionFunc: Callable[[K], int] = portable_hash,
) -> "RDD[Tuple[K, V]]":

调用方式

rdd.partitionBy(参数1,参数2)

参数1,分区数

参数2,传入自定义的分区策略 默认为按照key进行hash


2、coalesce

coalesce既可以实现RDD分区的合并缩小,也可以实现RDD分区的扩大

源码中的定义:

def coalesce(self: "RDD[T]", numPartitions: int, shuffle: bool = False) -> "RDD[T]":

调用方式

rdd.coalesce(参数1,参数2)

参数1,传入分区个数

参数2,传入 shuffle,默认为 False,为False则不进行shuffle,带有分区捆绑进行重新分区

若shuffle = True, 则进行shuffle操作,不带有分区捆绑进行重新分区 分区更加均匀(避免数据倾斜)


3、repartition

返回一个新的RDD,它刚好有numPartitions(参数1)个分区。

源码中的定义

def repartition(self: "RDD[T]", numPartitions: int) -> "RDD[T]":

调用方式 

rdd.repartition(参数1)

参数1,分区数

可以增加或减少此RDD中的分区。在内部,它使用shuffle来重新分发数据。

如果正在减少这个RDD中的分区数量,考虑使用' coalesce',这可以避免执行shuffle。


repartition和coalesce的区别:

repartition默认开启shuffle,

coalesce默认不开启,但可用参数配置,

实际上repartition底层调用的就是coalesce


代码演示 

# coding:utf8
import timefrom pyspark import SparkConf, SparkContextif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5, 1, 7, 8, 9, 10], 3)print(rdd.glom().collect())print(rdd.coalesce(2).glom().collect())print(rdd.coalesce(2, shuffle=True).glom().collect())print(rdd.repartition(2).glom().collect())

制作不易,点个赞吧~