你好,游客 登录
背景:
阅读新闻

Spark(1.6 版本)系列:Shuffle读写数据的源码解析

[日期:2018-09-02] 来源:sohu  作者:偷功 [字体: ]

读写数据的源码解析

1. Shuffle写数据的源码解析

SparkShuffle的整体框架中可以看到,在ShuffleManager提供了Shuffle相关数据块的写入与读取,即,对应的接口getWritergetReader

在解析Shuffle框架数据读取过程中,可以构建一个具有ShuffleDependencyRDD,查看执行过程中,Shuffle框架中的数据读写接口getWritergetReader如何使用,通过这种具体案例的方式来加深对源码的理解。

Spark中具体的执行机制可以参考本书的其他章节,在此仅分析与Shuffle直接相关的内容。通过DAG调度机制的解析,可以知道Spark中一个作业可以根据宽依赖切分Stages,而在Stages中,相应的Tasks也包含两种,即ResultTaskShuffleMapTask。其中,一个ShuffleMapTask会基于ShuffleDependency中指定的分区器,将一个RDD的元素拆分到多个buckets中,此时通过ShuffleManagergetWriter接口来获取数据与buckets的映射关系。而ResultTask对应的是一个将输出返回给应用程序Driver端的Task,在该Task执行过程中,最终都会调用RDDcompute对内部数据进行计算,而在带有ShuffleDependencyRDD中,在compute计算时,会通过ShuffleManagergetReader接口,获取上一个StageShuffle输出结果作为本次Task的输入数据。

首先查看ShuffleMapTask中的数据写流程,具体代码如下所示:

1.override def runTask(context: TaskContext): MapStatus = {

2.……

3.// 首先从SparkEnv获取ShuffleManager

4.// 然后从ShuffleDependency中获取注册到ShuffleManager时所得到的shuffleHandle

5.// 根据shuffleHandle和当前Task对应的分区ID,获取ShuffleWriter

6.// 最后根据获取的ShuffleWriter,调用其write接口,写入当前分区的数据。

7.var writer: ShuffleWriter[Any, Any] = null

8.try {

9.val manager = SparkEnv.get.shuffleManager

10.writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)

11.writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])

12.writer.stop(success = true).get

13.} catch {

14.……

15.}

16.}

对应具体的Shuffle读数据的实现机制,现有支持的三种方式在细节上都有所差异,具体源码解析可以参考后续针对这几种方式的各章节。

1.Shuffle读数据的源码解析

对应的数据读取器,从RDD5个抽象接口可知,RDD的数据流最终会经过算子操作,即RDD中的compute方法,下面以包含宽依赖的RDDCoGroupedRDD为例,查看如何获取Shuffle的数据。具体代码如下所示:

1.// 对指定分区进行计算的抽象接口,以下为CoGroupedRDD具体子类中该方法的实现

2.override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {

3.val split = s.asInstanceOf[CoGroupPartition]

4.val numRdds = dependencies.length

5.

6.// A list of (rdd iterator, dependency number) pairs

7.val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]

8.for ((dep, depNum) <- dependencies.zipWithIndex) dep match {

9.case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked =>

10.val dependencyPartition = split.narrowDeps(depNum).get.split

11.// Read them from the parent

12.val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)

13.rddIterators += ((it, depNum))

14.

15.case shuffleDependency: ShuffleDependency[_, _, _] =>

17.// 首先从SparkEnv获取ShuffleManager

18.// 然后从ShuffleDependency中获取注册到ShuffleManager

19.//所得到的shuffleHandle。根据shuffleHandle和当前Task对应的分区ID

20.// 获取ShuffleWriter

21.// 最后根据获取的ShuffleReader,调用其read接口,读取ShuffleMap输出。

16.val it = SparkEnv.get.shuffleManager

17..getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context)

18..read()

19.rddIterators += ((it, depNum))

20.}

21.

22.val map = createExternalMap(numRdds)

23.for ((it, depNum) <- rddIterators) {

24.map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))

25.}

26.context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)

27.context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)

28.context.internalMetricsToAccumulators(

29.InternalAccumulator.PEAK_EXECUTION_MEMORY).add(map.peakMemoryUsedBytes)

30.new InterruptibleIterator(context,

31.map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])

32.}

从代码中可以看到,带宽依赖的RDDcompute操作中,最终是通过SparkEnv中的ShuffleManager实例的getReader方法,获取数据的读取器的,然后再次调用读取器的read读取指定分区范围的Shuffle数据。

注意,是带宽依赖的RDD,而非ShuffleRDD,除了ShuffleRDD之外,还有其他RDD也可以带上宽依赖的,比如前面给出的CoGroupedRDD

目前支持的几种具体Shuffle实现机制在读取数据的处理上都是一样的,从源码角度可以看到,当前继承了ShuffleReader这一数据读取器的接口的具体子类,只有BlockStoreShuffleReader,因此本章内容仅在此对各种Shuffle实现机制的数据读取进行解析,后续各实现机制中不再重复描述。

源码解析的第一步仍然是查看该类的描述信息,具体如下所示:

1./**

2.* 通过从其他节点上请求读取Shuffle数据来接收并读取指定范围[起始分区, 结束分区) ——对应为左闭右开区间。

3.*

4.* requesting them from other nodes' block stores.

5.*/

从注释上可以看出,读取器负责上一Stage为下一Stage输出数据块的读取。从前面对ShuffleReader接口的解析可知,继承的具体子类需要实现真正的数据读取操作,即实现read方法。因此该方法便是需要重点关注的源码,一些关键的代码如下所示:

1./** 为该Reduce任务读取并合并key-values 值。

2.* Read the combined key-values for this reduce task */

3.override def read(): Iterator[Product2[K, C]] = {

4.// 真正的数据Iterator读取是通过ShuffleBlockFetcherIterator来完成的。

5.val blockFetcherItr = new ShuffleBlockFetcherIterator(

6.context,

7.blockManager.shuffleClient,

8.blockManager,

9.// 可以看到,当ShuffleMapTask完成后注册到mapOutputTracker的元数据信息

10.// 同样会通过mapOutputTracker来获取,在此同时还指定了获取的分区范围

11.// 通过该方法的返回值类型,

(, startPartition, endPartition),

13.// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility

14.// 默认读取时的数据大小限制为48m,对应后续并行的读取,

15.// 都是一种数据读取的控制策略,一方面可以避免目标机器占用过多带宽,

16.// 同时也可以启动并行机制,加快读取速度。

17.SparkEnv.get.conf.getSizeAsMb("spark.Reduce.maxSizeInFlight", "48m") * 1024 * 1024)

18.

19.// Wrap the streams for compression based on configuration

20.// 在此针对前面获取的各个数据块唯一标识ID信息极其对应的输入流进行处理

21.val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) =>

22.blockManager.wrapForCompression(blockId, inputStream)

23.}

24.……

25.// 对读取到的数据进行聚合处理

26.val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {

27.// 如果在Map端已经做了聚合的优化操作,则对读取到的聚合结果进行聚合,

28.// 注意此时的聚合的操作与数据类型和Map端未做优化的时候是不同的。

29.if (dep.mapSideCombine) {

30.// We are reading values that are already combined

31.val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]

32.// 针对Map端各分区针对Key进行合并后的结果再次聚合,

33.// Map的合并可以大大减少网络传输的数据量

34.dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)

35.} else {

36.// We don't know the value type, but also don't care -- the dependency *should*

37.// have made sure its compatible w/ this aggregator, which will convert the value

38.// type to the combined type C

39.val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]

40.// 针对未合并的keyValues的值进行聚合

41.dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)

42.}

43.} else {

44.require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")

45.interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]

46.}

47.

48.// 在基于SortShuffle实现过程中,默认仅仅是基于PartitionId进行排序,

49.// 在分区的内部数据是没有排序的,因此添加了keyOrdering变量,

50.// 提供是否需要针对分区内的数据进行排序的标识信息

51.// Sort the output if there is a sort ordering defined.

52.dep.keyOrdering match {

53.case Some(keyOrd: Ordering[K]) =>

54.// 为了减少内存的压力,避免GC开销,引入了外部排序器对数据进行排序

55.// 当内存不足以容纳排序的数据量时,会根据配置的spark.shuffle.spill属性

56.// 来决定是否需要spill到磁盘中,默认情况下会打开spill开关,

57.// 不打开的话在数据量比较大时会引发内存溢出问题(Out of MemoryOOM

58.// Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,

59.// the ExternalSorter won't spill to disk.

60.val sorter =

61.new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = Some(ser))

62.……

63.case None =>

64.// 不需要排序分区内部数据时直接返回

65.aggregatedIter

66.}

67.}

下面进一步解析数据读取的部分细节,首先是数据块获取、读取的ShuffleBlockFetcherIterator类,在类的构造体中调用了initialize方法(构造体中的表达式会在构造实例时执行),该方法中会根据数据块所在位置(本地节点或远程节点)分别进行读取,其中关键代码如下所示:

1.private[this] def initialize(): Unit = {

2.……

3.// 本地与远程的数据读取方式不同,因此先进行拆分,

4.// 注意拆分时会考虑一次获取的数据大小(拆分时会同时考虑并行数)封装请求,

5.// 最后会将剩余不足该大小的数据获取也封装为一个请求

6.// Split local and remote blocks.

7.val remoteRequests = ()

8.// Add the remote requests into our queue in a random order

9.// 存入需要远程读取的数据块请求信息

10.fetchRequests ++= Utils.randomize(remoteRequests)

11.

12.

13.// Send out initial requests for blocks, up to our maxBytesInFlight

14.// 发送数据获取请求

15.fetchUpToMaxBytes()

16.……

17.// 除了远程数据获取之外,下面是获取本地数据块的方法调用

18.// Get Local Blocks

19.fetchLocalBlocks()

20.……

21.}

Hadoop一样,Spark计算框架也是基于数据本地性,即移动数据而非计算的原则,因此在获取数据块时,也会考虑数据本地性,尽量从本地读取已有的数据块,然后再远程读取。

另外,数据块的本地性是通过ShuffleBlockFetcherIterator实例构建时所传入的位置信息来判断的,而该信息由MapOutputTracker实例的getMapSizesByExecutorId方法提供,可以参考该方法的返回值类型查看相关的位置信息,返回值类型为:Seq[(BlockManagerId, Seq[(BlockId, Long)])],其中BlockManagerIdBlockManager的唯一标识信息,BlockId是数据块的唯一信息,对应的Seq[(BlockId,Long)])表示一组数据块标识ID及其数据块大小的元组信息。

最后简单分析下如何设置分区内部的排序标识,当需要对分区内的数据进行排序时,会设置RDD中的宽依赖(ShuffleDependency)实例的keyOrdering变量,下面以基于排序的OrderedRDDFunctions提供的sortByKey方法给出解析,具体代码如下所示:

1.def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)

2.: RDD[(K, V)] = self.withScope

3.{

4.// 注意,这里设置了该方法构建的RDD所使用的分区器

5.// 根据Range而非Hash进行分区,对应的Range信息需要计算并将结果

6.// 反馈到Driver端,因此对应调用RDD中的Action,即会触发一个Job的执行

7.val part = new RangePartitioner(numPartitions, self, ascending)

8.// 在构建RDD实例之后,设置Key的排序算法,即Ordering实例

9.new ShuffledRDD[K, V, V](self, part)

10..setKeyOrdering(if (ascending) ordering else ordering.reverse)

11.}

当需要对分区内部的数据进行排序时,构建RDD的同时会设置Key值的排序算法,结合前面的read方法中的第52行代码,当指定Key值的排序算法时,就会使用外部排序器对分区内的数据进行排序。

收藏 推荐 打印 | 阅读: