欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

源码解析Spark各个ShuffleWriter的实现机制(一)——ShuffleWriter的选择

时间:2023-06-26
概述 什么是Shuffle?

在讲到Spark Shuffle实现机制之前,需要了解下什么是Shuffle。Shuffle按字面意思,也就是洗牌,把牌视作数据,那么洗的过程也就是按照某种规则改变数据的次序,接着发牌员发牌给玩家,发牌的过程就对应着通过网络I/O分发数据。
假设在一场牌局中,有一位发牌员和两位玩家。一般情况下,发牌员在开始游戏之前,需要将牌随机打乱,再按序发给两位玩家,接着两位玩家开始处理牌。发牌员洗牌、发牌、玩家拿牌这三个过程中,洗牌和拿牌在Spark中分别对应Shuffle Write, Shuffle Read,文章关注的则是Shuffle Write。

Shuffle发生在哪些实例上?

在Spark中,driver是负责切分task并序列化task,协调资源,并调度派发到executor上,driver不负责处理数据,具体数据处理是由executor完成的,也就是说Shuffle是发生在executor上的。回到上边的牌局,这时候executor兼具发牌员和玩家两个职能,一般大家和朋友打扑克,总不能有人专门洗牌发牌但不玩吧~在Spark中稍复杂些,并不是一个executor在Shuffle Write,而是每个executor都在做,直到所有executor都完成了Shuffle Write,都通知driver已完成,才会进入到下一个Stage,进行Shuffle Read。

为什么需要Shuffle?

在数据量小时,一般一个单体进程即可完成加工处理,但面对海量数据处理,一台单体进程是难以胜任的。随着互联网发展,许多分布式计算框架被提出,这些框架总的来说,都是在多个分布式进程中处理不同的数据。在对数据处理过程中,时常需要:

group, join数据:比如根据相同key聚类,每个分布式数据处理进程处理后,将特定key的数据发往特定的分布式进程上进行聚类;数据倾斜时重分布:数据倾斜在少数分布式进程,导致其他进程空跑等待,既是浪费资源,也会影响整体处理效率,因此需要将数据发往其他分布式进程进行处理。

这两种场景就涉及到如何“洗牌”,将数据按某种规则分布到其他进程中。在Spark中有哪些操作会触发到Shuffle呢?

有哪些对RDD/DF的操作会触发到Shuffle呢?

主要是这四类:

.*ByKey: groupByKey, countByKey, reduceByKey等聚类算法.*By: distributeBy, clusterBy等聚类算法repartition: round robin重分布数据join: 可能触发,当需要连接的数据广播到各个executor时,就不会触发shuffle,直接在内存中进行join Spark中对Shuffle实现的演进历史

这部分我倒是没有细看,我开始接触时Spark就已经迭代到3.2的版本了,只是了解到在2.0之前,Shuffle的实现变化很多,主要是为了解决非功能问题,有兴趣可以了解一下,对解决日常非功能问题也有一定启发。在2.0及之后版本,Shuffle Write的实现就已经稳定了,只有以下三种:

UnsafeShuffleWriterBypassMergeSortShuffleWriterSortShuffleWriter

那么Spark是如何决定使用哪种实现的呢?

使用各个Shuffle Writer的条件 源码分析

在Spark driver构建RDD之间的血缘依赖时,便根据以下条件选择构建具体的Shuffle依赖:

// SortShuffleManager override def registerShuffle[K, V, C]( shuffleId: Int, dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) { new BypassMergeSortShuffleHandle[K, V]( shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) { new SerializedShuffleHandle[K, V]( shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) } else { new baseShuffleHandle(shuffleId, dependency) } }

接着在getWriter中,根据上个方法确定的handle,选择对应的shuffle writer:

//override def getWriter[K, V]( handle: ShuffleHandle, mapId: Long, context: TaskContext, metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = { val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent( handle.shuffleId, _ => new OpenHashSet[Long](16)) mapTaskIds.synchronized { mapTaskIds.add(context.taskAttemptId()) } val env = SparkEnv.get handle match { case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] => new UnsafeShuffleWriter( env.blockManager, context.taskMemoryManager(), unsafeShuffleHandle, mapId, context, env.conf, metrics, shuffleExecutorComponents) case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] => new BypassMergeSortShuffleWriter( env.blockManager, bypassMergeSortHandle, mapId, env.conf, metrics, shuffleExecutorComponents) case other: baseShuffleHandle[K @unchecked, V @unchecked, _] => new SortShuffleWriter( shuffleBlockResolver, other, mapId, context, shuffleExecutorComponents) }}

由此得到Handle与Shuffle Writer的关系:

HandleWriterunsafeShuffleHandleUnsafeShuffleWriterbypassMergeSortHandleBypassMergeSortShuffleHandlebaseShuffleHandleSortShuffleWriter

需要进一步查看其中SortShuffleWriter#shouldBypassMergeSort, SortShuffleManager#canUseSerializedShuffle的实现,确认使用各个Handle的条件。

对于bypassMergeSortHandle 的使用条件有:

需要在父RDD没有开启mapSideCombine1分区数量 <= shuffle.sort.baypass.merge.threshold(默认200)的情况下才适用

// SortShuffleWriterdef shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = { if (dep.mapSideCombine) { false } else { val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD) dep.partitioner.numPartitions <= bypassMergeThreshold }}

对于unsafeShuffleHandle的使用条件有:

父RDD使用的序列化器需要支持重排序序列化对象2父RDD没有开启mapSideCombine分区数量 <= 16777216(一般情况不会出现大于这个数量的分区,这是个magic number,不必关注为什么是这个值,只需要知道1600万+的分区数量本身就不合理)

// SortShuffleManagerdef canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = { val shufId = dependency.shuffleId val numPartitions = dependency.partitioner.numPartitions if (!dependency.serializer.supportsRelocationOfSerializedObjects) { log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " + s"${dependency.serializer.getClass.getName}, does not support object relocation") false } else if (dependency.mapSideCombine) { log.debug(s"Can't use serialized shuffle for shuffle $shufId because we need to do " + s"map-side aggregation") false } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) { log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " + s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions") false } else { log.debug(s"Can use serialized shuffle for shuffle $shufId") true }}

题外话,RDD支持的序列化器有两种:

序列化器支持重排序序列化对象UnsafeRowSerializertrueKryoSerializer取决于是否开启了auto-reset,默认开启,则true,用户显式定义关闭,则false。当关闭时,Kryo可能会存储重复对象的引用,而不是往序列化流中写入对象的序列化字节,这会破坏对象的重排序2小结

由此得到了Handle与Shuffle Writer的关系,在后续篇章中,将会讲解三种Shuffle Writer的实现,建议从最简单的BypassMergeSortShuffleWriter开始读起。


mapSideCombine: 即在map端对数据进行合并,减少shuffle的数据量,以及减少reducer端处理的数据量。 ↩︎

重排序序列化对象:对序列化对象排序的结果,与排序序列化前原对象的结果一致。基于shuffle数据时需要序列化数据对象的背景,这是一种避免排序时反序列化开销的技术。 ↩︎ ↩︎

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。