> 文章列表 > Spark----RDD(弹性分布式数据集)

Spark----RDD(弹性分布式数据集)

Spark----RDD(弹性分布式数据集)

RDD

文章目录

  • RDD
    • RDD是什么?
    • 为什么需要RDD?
    • RDD的五大属性
    • WordCount中的RDD的五大属性
    • 如何创建RDD?
    • RDD的操作
      • 两种
      • 基本算子/操作/方法/API
      • 分区操作
      • 重分区操作
      • 聚合操作
        • 四个有key函数的`区别`
      • 关联操作
      • 排序操作
  • RDD的缓存/持久化
    • cache和persist
    • checkpoint检查点
      • APl
  • 共享变量
  • Shuffle本质

RDD是什么?

RDD是弹性分布式数据(Resilient Distributed Dataset)的缩写,它是Spark中最基本的数据抽象,用来表示分布式集合,支持分布式操作。RDD有以下特点:

  • RDD是不可变的,也就是说,一旦创建了一个RDD,就不能对它进行修改或删除,只能通过转换操作(transformation)生成新的RDD。
  • RDD是可分区的,也就是说,一个RDD可以被切分成多个小块(partition),每个小块可以在集群中的不同节点上并行计算。
  • RDD是可并行计算的,也就是说,RDD支持一系列的高级操作(action),比如map、filter、reduce等,这些操作可以在每个分区上并行执行,并返回结果。

为什么需要RDD?

因为RDD可以提高数据处理的效率容错性。相比于传统的MapReduce框架,RDD有以下优势:

  • RDD可以支持内存计算,也就是说,RDD可以将数据缓存在内存中,避免了频繁的磁盘I/O,从而提高了速度。
  • RDD可以支持多次迭代计算,也就是说,RDD可以在多个操作之间重用中间结果,避免了重复的数据读取和计算,从而提高了效率。
  • RDD可以支持容错机制,也就是说,RDD可以通过血统(lineage)记录数据的转换过程,当某个分区发生故障时,可以根据血统重新计算该分区的数据,从而恢复数据。
    • RDD的血统(lineage)是指RDD之间的依赖关系,也可以叫做RDD的逻辑执行计划。RDD的血统是由一系列的转换操作组成的,每个转换操作都会产生一个新的RDD,并记录它的父RDD和转换函数。RDD的血统可以用来追踪数据的来源和变化过程,也可以用来在节点故障时恢复丢失的数据分区。RDD的血统可以通过toDebugString方法查看,它会返回一个字符串,表示RDD的依赖图。

RDD的五大属性

RDD的五大属性是:

  • 一组分片(Partition),即数据集的基本组成单位。每个分片可以在集群中的不同节点上并行计算。每个分片都会被一个计算任务处理,并决定并行计算的粒度。
  • 一个计算每个分区的函数(compute)。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。该函数会对每个分区的数据进行转换或操作,并返回一个迭代器。
  • RDD之间的依赖关系(dependencies)。RDD可以通过转换操作生成新的RDD(每次转换都会生成一个新的RDD),从而形成一个有向无环图(DAG)。当某个分区发生故障时,可以根据依赖关系重新计算该分区的数据。
  • 一个分区器(partitioner),即RDD的分片函数。该函数决定了RDD的分片数量和分片方式。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
  • 一个优先位置列表(preferred locations),即每个分区的最佳计算位置。该列表通常是根据数据源的位置来确定的,比如HDFS文件的块位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置,以减少数据传输开销。

WordCount中的RDD的五大属性

在这里插入图片描述

如何创建RDD?

RDD中的数据可以来源于2个地方:本地集合或外部数据源

  1. 多种API
    sc.parallelize(本地集合,分区数)
    sc.makeRDD(本地集合,分区数) //底层使用的parallelize
    sc.textFile(本地/HDFS文件/文件夹,分区数) //注意不要用它读取大量小文件

    sc.wholeTextFiles(本地/HDFS文件夹,分区数) //专门用来读取小文件的

  2. 获取RDD分区数
    rdd.getNumPartitions //获取rdd的分区数,底层是partitions.lengthrdd.partitions.length //获取rdd的分区数

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object RDDDemo1 {def main(args: Array[String]): Unit = {//创建环境val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)//创建RDDval rdd1: RDD[Int] = sc.parallelize(1 to 10) //8val rdd2: RDD[Int] = sc.parallelize(1 to 10,3) //3val rdd3: RDD[Int] = sc.makeRDD(1 to 10)//底层是parallelize //8val rdd4: RDD[Int] = sc.makeRDD(1 to 10,4) //4//RDD[一行行的数据]val rdd5: RDD[String] = sc.textFile("data/input/words.txt")//2val rdd6: RDD[String] = sc.textFile("data/input/words.txt",3)//3//RDD[一行行的数据]//ratings10文件夹中有10个小文件val rdd7: RDD[String] = sc.textFile("data/input/ratings10")//10val rdd8: RDD[String] = sc.textFile("data/input/ratings10",3)//10//RDD[(文件名, 一行行的数据),(文件名, 一行行的数据)....]val rdd9: RDD[(String, String)] = sc.wholeTextFiles("data/input/ratings10")//2val rdd10: RDD[(String, String)] = sc.wholeTextFiles("data/input/ratings10",3)//3println(rdd1.getNumPartitions)//8 //底层partitions.lengthprintln(rdd2.partitions.length)//3println(rdd3.getNumPartitions)//8println(rdd4.getNumPartitions)//4println(rdd5.getNumPartitions)//2println(rdd6.getNumPartitions)//3println(rdd7.getNumPartitions)//10println(rdd8.getNumPartitions)//10println(rdd9.getNumPartitions)//2println(rdd10.getNumPartitions)//3}
}

RDD的操作

两种

  1. Transformation转换操作:返回一个新的RDD

    which create a new dataset from an existing one

    所有Transformation函数都是Lazy,不会立即执行,需要Action函数触发

  2. Action动作操作:(返回值不是RDD无返回值或返回其他的)

    which return a value to the driver program after

    running a computation on the datase
    所有Action函数立即执行(Eager),比如count、first.collect、take等

在这里插入图片描述

基本算子/操作/方法/API

map,faltMap,filter,foreach,saveAsTextFile

import org.apache.commons.lang3.StringUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object RDDDemo2 {def main(args: Array[String]): Unit = {// 创建环境val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)sc.setLogLevel("WARN")// 创建RDDval lines: RDD[String] = sc.textFile("data/input/words.txt") //2// transformationval result: RDD[(String, Int)] = lines.filter(StringUtils.isNoneBlank(_)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)// 输出/actionresult.foreach(println)result.saveAsTextFile("data/output/result4")}
}

分区操作

每个RDD由多分区组成的,实际开发中如果涉及到资源相关操作建议对每个分区进行操作,即:
使用mapPartitions代替map函数
使用foreachPartition代替foreache函数

  • map和mapPartitions都是转换操作,它们的区别是map是对每个元素应用一个函数,而mapPartitions是对每个分区的迭代器应用一个函数
  • mapPartitions的优势是可以减少函数调用的次数,提高性能,尤其是在涉及数据库连接等IO操作时,可以在每个分区只建立一次连接。
  • mapPartitions的劣势是可能会造成内存溢出,因为它一次要处理一个分区的所有数据,如果数据量太大,就会占用过多的内存空间。而map是一条一条地处理数据,不会有这个问题。
  • foreach和foreachPartition都是动作操作,它们的区别是foreach是对每个元素执行一个函数,而foreachPartition是对每个分区的迭代器执行一个函数
  • foreachPartition的优势和劣势与mapPartitions类似,都是可以减少函数调用次数,提高性能,但也可能造成内存溢出。
  • foreach和foreachPartition一般用于在程序末尾将数据落地到存储系统中,如mysql,es或hbase中。

代码示例

  • map函数的例子:
//假设有一个RDD[Int],每个元素都加1
val rdd = sc.parallelize (List (1, 2, 3, 4, 5))
val result = rdd.map (x => x + 1) //使用匿名函数
result.collect () // Array (2, 3, 4, 5, 6)
  • mapPartitions函数的例子:
//假设有一个RDD[Int],每个分区的元素都乘以分区号
val rdd = sc.parallelize (List (1, 2, 3, 4, 5), 2) //分成两个分区
val result = rdd.mapPartitions ((iter, pid) => iter.map (x => x * pid)) //使用匿名函数
result.collect () // Array (0, 0, 6, 8, 10)
  • foreach函数的例子:
//假设有一个RDD[String],打印每个元素
val rdd = sc.parallelize (List ("a", "b", "c", "d", "e"))
rdd.foreach (x => println (x)) //使用匿名函数
//输出:
a
b
c
d
e
  • foreachPartition函数的例子:
//假设有一个RDD[String],打印每个分区的元素个数
val rdd = sc.parallelize (List ("a", "b", "c", "d", "e"), 2) //分成两个分区
rdd.foreachPartition (iter => println (iter.size)) //使用匿名函数
//输出:
3
2

使用mapPartitions和foreachPartition的主要目的

mapPartitions和foreachPartition的主要目的是为了减少函数调用的次数,提高性能。因为在Spark中,每个分区都是一个任务,每个任务都会在一个Executor上运行。如果使用map或foreach,那么每个元素都会调用一次函数,这会增加函数调用的开销,尤其是在涉及数据库连接等IO操作时,每个元素都会创建和关闭一个连接,这会降低性能。而如果使用mapPartitions或foreachPartition,那么每个分区只会调用一次函数,这样就可以在每个分区只建立一次连接,减少函数调用的开销,提高性能。

重分区操作

重分区操作是指改变RDD的分区数,有两种常用的方法:repartitioncoalesce

在这里插入图片描述

  • repartition是通过shuffle将数据重新分布到指定数量的分区,可以增加或减少分区数,也可以改变数据的分布。
  • coalesce是通过合并或拆分现有的分区来改变分区数,可以减少分区数,也可以增加分区数(需要开启shuffle),但不会改变数据的分布。
  • repartition的优势是可以平衡数据的负载,避免数据倾斜,也可以根据某些列来进行分区,提高后续操作的效率。
  • repartition的劣势是会触发shuffle操作,消耗网络和磁盘资源,降低性能。
  • coalesce的优势是可以减少分区数,减少任务调度的开销,也可以避免shuffle操作,提高性能。
  • coalesce的劣势是不能平衡数据的负载,可能导致数据倾斜,也不能根据某些列来进行分区。

代码示例

  • repartition的例子:
//假设有一个RDD[Int],分成3个分区
val rdd = sc.parallelize(List(1,2,3,4,5,6),3)
//查看分区数
rdd.partitions.length // 3
//使用repartition增加分区数到6
val rdd2 = rdd.repartition(6)
//查看分区数
rdd2.partitions.length // 6
//使用repartition减少分区数到2
val rdd3 = rdd.repartition(2)
//查看分区数
rdd3.partitions.length // 2
  • coalesce的例子:
//假设有一个RDD[Int],分成3个分区
val rdd = sc.parallelize(List(1,2,3,4,5,6),3)
//查看分区数
rdd.partitions.length // 3
//使用coalesce减少分区数到2,不开启shuffle
val rdd2 = rdd.coalesce(2)
//查看分区数
rdd2.partitions.length // 2
//使用coalesce增加分区数到4,开启shuffle
val rdd3 = rdd.coalesce(4,true)
//查看分区数
rdd3.partitions.length // 4

聚合操作

聚合操作可以按有key的聚合函数和无key的聚合函数分类:

  • 有key的聚合函数是指对PairRDD中的数据,按照key进行分组或合并的函数,例如reduceByKey、groupByKey、foldByKey、aggregateByKey等。这些函数需要传入一个或多个函数作为参数,来指定如何对每个key的value进行聚合。有key的聚合函数返回的结果仍然是PairRDD,即key-value对。

    • 四个有key函数的用法如下:

      • reduceByKey是指对PairRDD中的数据,按照key进行分组,然后对每个key的value使用一个函数进行聚合,分区内和分区间的聚合规则相同。用法是:RDD.reduceByKey(func, numPartitions),其中func是聚合函数,numPartitions是分区数。
      • groupByKey是指对PairRDD中的数据,按照key进行分组,然后返回一个新的RDD,其中每个key对应一个Iterable的value。用法是:RDD.groupByKey(numPartitions),其中numPartitions是分区数。
      • aggregateByKey是指对PairRDD中的数据,按照key进行分组,然后对每个key的value使用一个初始值和两个函数进行聚合,分区内和分区间的聚合规则可以不同。用法是:RDD.aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions),其中zeroValue是初始值,seqFunc是分区内的聚合函数,combFunc是分区间的聚合函数,numPartitions是分区数。
      • foldByKey是指对PairRDD中的数据,按照key进行分组,然后对每个key的value使用一个初始值和一个函数进行聚合,分区内和分区间的聚合规则相同。用法是:RDD.foldByKey(zeroValue, func, numPartitions),其中zeroValue是初始值,func是聚合函数,numPartitions是分区数。
  • 无key的聚合函数是指对RDD中的数据,不区分key,直接进行整体或单列的聚合的函数,例如count、sum、avg等。这些函数不需要传入参数,只需要指定要聚合的列或整个数据集。无key的聚合函数返回的结果不是RDD,而是一个单一的值或数组。

代码示例

  • 有key的聚合函数的代码示例:
//假设有一个PairRDD,表示每个城市的销售额
val sales = sc.parallelize(List(("Beijing", 100), ("Shanghai", 200), ("Guangzhou", 150), ("Beijing", 120), ("Shanghai", 300), ("Guangzhou", 180)))//使用reduceByKey函数,按照城市进行分组,然后对每个城市的销售额求和
val totalSales = sales.reduceByKey((x, y) => x + y)
//输出结果为:(Beijing,220),(Shanghai,500),(Guangzhou,330)//使用groupByKey函数,按照城市进行分组,然后对每个城市的销售额求平均值
val avgSales = sales.groupByKey().mapValues(values => values.sum / values.size)
//输出结果为:(Beijing,110),(Shanghai,250),(Guangzhou,165)//使用foldByKey函数,按照城市进行分组,然后对每个城市的销售额求和
val totalSales = sales.foldByKey(0)(_ + _)
//输出结果为:(Beijing,220),(Shanghai,500),(Guangzhou,330)//使用aggregateByKey函数,按照城市进行分组,然后对每个城市的销售额求最大值和最小值
val maxMinSales = sales.aggregateByKey((Int.MinValue, Int.MaxValue))((acc, value) => (math.max(acc._1, value), math.min(acc._2, value)), //在每个分区内对每个key的value求最大值和最小值(acc1, acc2) => (math.max(acc1._1, acc2._1), math.min(acc1._2, acc2._2)) //在不同分区间对每个key的value求最大值和最小值
)
//输出结果为:(Beijing,(120,100)),(Shanghai,(300,200)),(Guangzhou,(180,150))
  • 无key的聚合函数的代码示例:
//假设有一个RDD,表示每个人的年龄
val ages = sc.parallelize(List(20, 25, 30, 35, 40))//使用count函数,统计RDD中的元素个数
val count = ages.count()
//输出结果为:5//使用sum函数,求RDD中的元素之和
val sum = ages.sum()
//输出结果为:150//使用avg函数,求RDD中的元素的平均值
val avg = ages.mean()
//输出结果为:30.0

四个有key函数的区别

  • reduceByKey和groupByKey的区别是,reduceByKey在分区内和分区间使用相同的函数对value进行聚合,而groupByKey只是按照key进行分组,不对value进行聚合。reduceByKey比groupByKey更高效,因为它可以减少shuffle的数据量。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Y4YBexxe-1681978717408)(img/1609646014985.png)]在这里插入图片描述

在这里插入图片描述

  • aggregateByKey和reduceByKey的区别是,aggregateByKey可以指定一个初始值,并且可以使用不同的函数对分区内和分区间的value进行聚合,而reduceByKey只能使用相同的函数。aggregateByKey比reduceByKey更灵活,因为它可以实现更多的逻辑。
  • foldByKey和aggregateByKey的区别是,foldByKey只能使用相同的函数对分区内和分区间的value进行聚合,而aggregateByKey可以使用不同的函数。foldByKey是aggregateByKey的简化形式,适用于分区内和分区间的聚合规则相同的情况。

关联操作

关联操作是指对两个或多个RDD进行连接,根据不同的条件和方式,产生一个新的RDD。

  • 对于RDD,有以下几种关联操作:
    • join:根据两个RDD中的相同的key进行内连接,返回一个新的PairRDD,其中每个key对应一个元组,包含两个RDD中的value。
    • leftOuterJoin:根据两个RDD中的相同的key进行左外连接,返回一个新的PairRDD,其中每个key对应一个元组,包含左边RDD中的value和右边RDD中的value(如果存在)或None(如果不存在)。
    • rightOuterJoin:根据两个RDD中的相同的key进行右外连接,返回一个新的PairRDD,其中每个key对应一个元组,包含左边RDD中的value(如果存在)或None(如果不存在)和右边RDD中的value。
    • fullOuterJoin:根据两个RDD中的相同的key进行全外连接,返回一个新的PairRDD,其中每个key对应一个元组,包含左边RDD中的value(如果存在)或None(如果不存在)和右边RDD中的value(如果存在)或None(如果不存在)。
    • subtract:根据两个RDD中的不同的key进行差集操作,返回一个新的PairRDD,其中只包含左边RDD中有而右边RDD中没有的键值对。
    • cartesian:根据两个RDD中的所有可能的键值对进行笛卡尔积操作,返回一个新的PairRDD,其中包含所有可能的组合。

代码示例

  • 对于RDD,假设有以下两个PairRDD:
val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("c", 3)))
val rdd2 = sc.parallelize(List(("a", 4), ("b", 5), ("d", 6)))
  • 那么可以进行以下关联操作:
// join
rdd1.join(rdd2).collect()
// 输出:Array((a,(1,4)), (b,(2,5)))// leftOuterJoin
rdd1.leftOuterJoin(rdd2).collect()
// 输出:Array((a,(1,Some(4))), (b,(2,Some(5))), (c,(3,None)))// rightOuterJoin
rdd1.rightOuterJoin(rdd2).collect()
// 输出:Array((a,(Some(1),4)), (b,(Some(2),5)), (d,(None,6)))// fullOuterJoin
rdd1.fullOuterJoin(rdd2).collect()
// 输出:Array((a,(Some(1),Some(4))), (b,(Some(2),Some(5))), (c,(Some(3),None)), (d,(None,Some(6))))// subtract
rdd1.subtract(rdd2).collect()
// 输出:Array((c,3))// cartesian
rdd1.cartesian(rdd2).collect()
// 输出:Array(((a,1),(a,4)), ((a,1),(b,5)), ((a,1),(d,6)), ((b,2),(a,4)), ((b,2),(b,5)), ((b,2),(d,6)), ((c,3),(a,4)), ((c,3),(b,5)), ((c,3),(d,6)))

排序操作

排序操作是指对RDD中的数据按照某种规则进行升序或降序的操作。

  • 对于RDD,有以下几种排序操作:
    • sortBy:根据指定的函数对RDD中的元素进行排序,可以指定升序或降序,默认为升序。
    • sortByKey:对PairRDD中的键进行排序,可以指定升序或降序,默认为升序。
    • top:返回RDD中最大的n个元素,可以指定排序规则,默认为自然顺序。
    • takeOrdered:返回RDD中最小的n个元素,可以指定排序规则,默认为自然顺序。

代码示例

val rdd = sc.parallelize(List(5, 3, 7, 1, 9, 4))
val pairRdd = sc.parallelize(List(("a", 5), ("b", 3), ("c", 7), ("d", 1), ("e", 9), ("f", 4)))
  • 可以进行以下排序操作:
// sortBy
rdd.sortBy(x => x).collect() // 输出:Array(1, 3, 4, 5, 7, 9)
rdd.sortBy(x => x, false).collect() // 输出:Array(9, 7, 5, 4, 3, 1)// sortByKey
pairRdd.sortByKey().collect() // 输出:Array((a,5), (b,3), (c,7), (d,1), (e,9), (f,4))
pairRdd.sortByKey(false).collect() // 输出:Array((f,4), (e,9), (d,1), (c,7), (b,3), (a,5))// top
rdd.top(3) // 输出:Array(9, 7, 5)
pairRdd.top(3) // 输出:Array((f,4), (e,9), (d,1))// takeOrdered
rdd.takeOrdered(3) // 输出:Array(1, 3, 4)
pairRdd.takeOrdered(3) // 输出:Array((a,5), (b,3), (c,7))
  • 对pairRdd的排序都是按照字符abcd的大小来排的,因为默认的排序规则是按照字典序。如果想按照数字大小来排,可以指定一个自定义的排序规则:
// 定义一个Ordering对象,用于比较两个元组的第二个元素
object ValueOrdering extends Ordering[(String, Int)] {def compare(a: (String, Int), b: (String, Int)) = a._2 compare b._2
}// 使用自定义的排序规则进行排序
pairRdd.sortByKey()(ValueOrdering).collect() // 输出:Array((d,1), (b,3), (f,4), (a,5), (c,7), (e,9))
pairRdd.sortByKey(false)(ValueOrdering).collect() // 输出:Array((e,9), (c,7), (a,5), (f,4), (b,3), (d,1))
pairRdd.top(3)(ValueOrdering) // 输出:Array((e,9), (c,7), (a,5))
pairRdd.takeOrdered(3)(ValueOrdering) // 输出:Array((d,1), (b,3), (f,4))
  • 也可以直接使用Ordering类提供的by方法,根据指定的函数生成一个Ordering对象。这样的话,你就不用自己定义一个Ordering对象了。
pairRdd.sortByKey()(top(3)(Ordering.by(_._2))).collect() // 输出:Array((d,1), (b,3), (f,4), (a,5), (c,7), (e,9))
pairRdd.sortByKey(false)(top(3)(Ordering.by(_._2))).collect() // 输出:Array((e,9), (c,7), (a,5), (f,4), (b,3), (d,1))
pairRdd.top(3)(top(3)(Ordering.by(_._2))) // 输出:Array((e,9), (c,7), (a,5))
pairRdd.takeOrdered(3)(top(3)(Ordering.by(_._2))) // 输出:Array((d,1), (b,3), (f,4))
      • 这个函数就是_._2,表示取元组的第二个元素。所以这个排序规则就是按照元组的第二个元素进行比较。这样的写法更简洁,但是也更难理解,所以要看自己的喜好。😊

RDD的缓存/持久化

缓存解决什么问题?–解决的是热点数据频繁访问的效率问题
在Spark开发中某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到,那么可以将这些RDD进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高了程序运行的效率。

RDD的缓存/持久化是Spark的一个重要特性,它可以提高迭代计算和交互式查询的性能,避免重复计算。

RDD的缓存/持久化有三种方式:cachepersistcheckpoint

cache和persist

  • cache和persist都是用于将一个RDD进行缓存的,这样在之后使用的过程中就不需要重新计算了,可以大大节省程序运行时间。

    • cache是persist的一种简化方式,cache()的底层就是调用的persist()的无参版本,同时就是调用persist(MEMORY_ONLY)将数据持久化到内存中。

      在这里插入图片描述

    • persist可以指定不同的存储级别,比如MEMORY_ONLY,MEMORY_AND_DISK,MEMORY_ONLY_SER等,这些存储级别可以根据数据是否使用硬盘,是否使用堆外内存,是否序列化,是否有副本等因素进行选择。

      • 缓存级别
        在这里插入图片描述
    • cache和persist都是lazy的算子,需要触发action操作才会执行持久化。

    在这里插入图片描述

代码示例

cache的代码示例:

// 创建一个RDD
val rdd = sc.parallelize(1 to 10)// 对RDD进行一些转换操作
val rdd2 = rdd.map(_ * 2)// 对RDD进行缓存,使用默认的存储级别MEMORY_ONLY
rdd2.cache()// 对RDD进行一些action操作,触发缓存
rdd2.count()
rdd2.collect()//清空缓存
rdd2.unpersist()

persist的代码示例:

// 创建一个RDD
val rdd = sc.parallelize(1 to 10)// 对RDD进行一些转换操作
val rdd2 = rdd.map(_ * 2)// 对RDD进行持久化,指定存储级别为MEMORY_AND_DISK_SER
rdd2.persist(StorageLevel.MEMORY_AND_DISK_SER)// 对RDD进行一些action操作,触发持久化
rdd2.count()
rdd2.collect()//清空缓存
rdd2.unpersist()

如果想手动移除一个RDD的缓存,而不是等待该RDD被Spark自动移除,可以使用RDD.unpersist()方法。

checkpoint检查点

  • checkpoint: 将数据持久化到磁盘,并切断RDD的依赖关系,使得后续的操作可以从检查点处读取数据,而不需要从源头处重新计算。
    • checkpoint适用于计算代价特别大或者依赖链特别长的RDD。

    • checkpoint需要触发两次action操作才会执行持久化,一次是为了标记需要检查点的RDD,另一次是为了启动一个新的job来写入检查点数据。

    • checkpoint之后,RDD的依赖关系会被切断,而cache和persist之后,RDD的依赖关系还是存在的。

    • checkpoint之后,缓存的数据会被保存到HDFS中,不会丢失,而cache和persist之后,如果内存或磁盘不足,缓存的数据可能会丢失或被删除。

    • 一般来说,在使用checkpoint之前最好先使用cache或persist进行缓存,这样可以避免重新计算被标记的RDD。

APl

sc.setCheckpointDir(HDFS路径)//设置checkpoint路径,开发中一般设置为HDFS的目录rdd.checkpoint//对计算复杂且后续会被频繁使用的RDD进行checkpoint

代码示例

checkpoint()的代码示例

// 创建一个SparkContext对象
val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)// 设置检查点目录
sc.setCheckpointDir("hdfs://localhost:9000/checkpoint")// 创建一个RDD
val rdd = sc.parallelize(1 to 10)// 对RDD进行一些转换操作
val rdd2 = rdd.map(_ * 2)// 对RDD进行检查点,将数据持久化到磁盘,并切断依赖关系
rdd2.checkpoint()// 对RDD进行一些action操作,触发检查点
rdd2.count()
rdd2.collect()

checkpoint没有清除缓存的方法

共享变量

Spark共享变量是指可以在多个任务或节点之间共享的变量,它们可以用来实现一些特定的功能或优化性能。

Spark有两种共享变量,分别是:

  • 广播变量(broadcast variable),用来把一个变量在所有节点的内存之间进行共享,避免每个任务都拷贝一份变量的副本,节省网络和内存资源。
  • 累加器(accumulator),用来对信息进行聚合,比如计数或求和,只能由驱动程序读取,由任务更新。

Spark共享变量的使用方法是

  • 广播变量可以通过调用SparkContext.broadcast(v)方法从一个普通变量v中创建,然后通过调用value方法获取广播变量的值。

    在这里插入图片描述

  • 累加器可以通过调用SparkContext.longAccumulator(name)SparkContext.doubleAccumulator(name)方法创建一个数值类型的累加器,然后通过调用add(n)方法更新累加器的值,或者通过调用value方法获取累加器的值。

Hello   world  how are you ?
Hello   world  how are you ?
Hello   world  how are you ?
Hello   world  how are you ?
Hello   world  how are you ?
Hello   world  how are you ?
Hello   world  how are you ?
Hello   world  how are you ?
    //创建环境val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)sc.setLogLevel("WARN")//需求:// 以词频统计WordCount程序为例,处理的数据word2.txt所示,包括非单词符号,// 做WordCount的同时统计出特殊字符的数量//创建一个计数器/累加器val mycounter: LongAccumulator = sc.longAccumulator("mycounter")//定义一个特殊字符集合val ruleList: List[String] = List(",", ".", "!", "#", "$", "%", "?")//将集合作为广播变量广播到各个节点val broadcast: Broadcast[List[String]] = sc.broadcast(ruleList)//创建RDDval lines: RDD[String] = sc.textFile("data/input/words2.txt")//transformationval wordcountResult: RDD[(String, Int)] = lines.filter(StringUtils.isNoneBlank(_)).flatMap(_.split("\\\\s+")).filter(ch => {//获取广播数据val list: List[String] = broadcast.valueif (list.contains(ch)) { //如果是特殊字符mycounter.add(1)false} else { //是单词true}}).map((_, 1)).reduceByKey(_ + _)//输出wordcountResult.foreach(println)val chResult: lang.Long = mycounter.valueprintln("特殊字符的数量:"+chResult)

Shuffle本质

shuffle本质是洗牌

Shuffle的本质是指在Spark中进行一些需要跨分区交换数据的操作时,比如reduceByKey,join,groupByKey等,需要将属于同一个key的数据发送到同一个分区中,这个过程就涉及到了数据的重新分区和网络传输,这就是Shuffle。

Shuffle的本质可以用以下几个步骤来描述:

  • 在Shuffle操作之前的阶段,称为map阶段,每个任务会根据分区器(Partitioner)和聚合器(Aggregator)对自己处理的数据进行本地分区和聚合,并将结果写入本地磁盘文件中,这些文件称为map输出文件或Shuffle文件。
  • 在Shuffle操作之后的阶段,称为reduce阶段,每个任务会根据自己的分区号向map阶段的所有任务发送请求,获取属于自己分区的数据,并将这些数据拉取到自己的内存或磁盘中,然后进行后续的处理。
  • 在整个Shuffle过程中,Spark会通过一些机制来优化性能和资源利用率,比如使用序列化和压缩技术减少数据量,使用缓存和溢写策略控制内存占用,使用推测执行和动态资源分配机制提高任务执行效率等。

Shuffle的本质是Spark中最核心也最复杂的机制之一,它决定了Spark作业的性能和稳定性。

一个Shuffle文件的结构图如下:

----------------- Shuffle文件
分区0的数据 每个分区的数据由多个记录组成,每个记录由长度、键、值三部分构成
-----------------
分区1的数据
-----------------
-----------------
分区n的数据
-----------------
分区0的索引 每个分区的索引由偏移量和长度两部分构成,用于定位分区在文件中的位置
-----------------
分区1的索引
-----------------
-----------------
分区n的索引
-----------------

在这里插入图片描述

在这里插入图片描述