Spark Shuffle过程分析:Reduce阶段处理流程

Spark在Map阶段调度运行的ShuffleMapTask,最后会生成.data和.index文件,可以通过我的这篇文章 Spark Shuffle过程分析:Map阶段处理流程 了解具体流程和详情。同时,在Executor上运行一个ShuffleMapTask,返回了一个MapStatus对象,下面是ShuffleMapTask执行后返回结果的相关代码片段: var writer: ShuffleWriter[Any, Any] = null try { val manager = SparkEnv.get.shuffleManager writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) writer.stop(success = true).get } catch { case e: Exception => try { if (writer != null) { writer.stop(success = false) } } catch { case e: Exception => log.debug("Could not stop writer", e) } throw e

Spark Shuffle过程分析:Map阶段处理流程

默认配置情况下,Spark在Shuffle过程中会使用SortShuffleManager来管理Shuffle过程中需要的基本组件,以及对RDD各个Partition数据的计算。我们可以在Driver和Executor对应的SparkEnv对象创建过程中看到对应的配置,如下代码所示: // Let the user specify short names for shuffle managers val shortShuffleMgrNames = Map( "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName) val shuffleMgrName = conf.get("spark.shuffle.manager", "sort") val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass) 如果需要修改ShuffleManager实现,则只需要修改配置项spark.shuffle.manager即可,默认支持sort和 tungsten-sort,可以指

Spark Block 存储管理分析

Apache Spark 中,对 Block 的查询、存储管理,是通过唯一的 Block ID 来进行区分的。所以,了解 Block ID 的生成规则,能够帮助我们了解 Block 查询、存储过程中是如何定位 Block 以及如何处理互斥存储/读取同一个 Block 的。可以想到,同一个 Spark Application,以及多个运行的 Application 之间,对应的 Block 都具有唯一的 ID,通过代码可以看到,BlockID 包括:RDDBlockId、ShuffleBlockId、ShuffleDataBlockId、ShuffleIndexBlockId、BroadcastBlockId、TaskResultBlockId、TempLocalBlockId、TempShuffleBlockId 这 8 种 ID,可以详见如下代码定义: @DeveloperApi case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId { override def name: String = "rdd_" + rddId + "_" + splitIndex } // Format of the shuffle block ids (including data and index) should be kept in sync with // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData(). @DeveloperApi case class Sh

Spark UnifiedMemoryManager 内存管理模型分析

Spark 的内存使用,大体上可以分为两类:Execution 内存和 Storage 内存。在 Spark 1.5 版本之前,内存管理使用的是 StaticMemoryManager,该内存管理模型最大的特点就是,可以为 Execution 内存区与 Storage 内存区配置一个静态的 boundary,这种方式实现起来比较简单,但是存在一些问题: 没有一个合理的默认值能够适应不同计算场景下的 Workload 内存调优困难,需要对 Spark 内部原理非常熟悉才能做好 对不需要 Cache 的 Application 的计算场景,只能使用很少一部分内存 为了克服上述提到的问题,尽量提高 Spark 计算的通用性,降低内存调优难度,减少 OOM 导致的失败问题,从 Spark 1.6 版本开始,新增了 UnifiedMemoryManager(统一内存管理)内存管理模型的实现。UnifiedMemoryManager 依赖的一些组件类及其关系,如下类图所示: 从上图可以看出,最直接最核心的就是 StorageMemoryPool 和 ExecutionMemoryPool,它们实现了动态内存池(Memory Pool)的功能,能够动态调整 Storage 内存区与 Execution 内存区之间的 Soft boundary,使内存管理更加灵活。下

Spark Standalone架构设计要点分析

Apache Spark是一个开源的通用集群计算系统,它提供了High-level编程API,支持Scala、Java和Python三种编程语言。Spark内核使用Scala语言编写,通过基于Scala的函数式编程特性,在不同的计算层面进行抽象,代码设计非常优秀。 RDD抽象 RDD(Resilient Distributed Datasets),弹性分布式数据集,它是对分布式数据集的一种内存抽象,通过受限的共享内存方式来提供容错性,同时这种内存模型使得计算比传统的数据流模型要高效。RDD具有5个重要的特性,如下图所示: 上图展示了2个RDD进行JOIN操作,体现了RDD所具备的5个主要特性,如下所示: 一组分区 计算每一个数据分片的函数 RDD上的一组依赖 可选,对于键值对RDD,有一个Partitioner(通常是HashPartitioner) 可选,一组Preferred location信息(例如,HDFS文件的Block所在location信息) 有了上述特性,能够非常好地通过RDD来表达分布式数据集,并作为构建DAG图的基础:首先抽象一次分布式计算任务的逻辑表示,最终将任务在实际的物理计算环境中进行处理执行。 计算抽象 在描述Spark中的计算抽象,我们首先需

Spark RPC通信层设计原理分析

Spark将RPC通信层设计的非常巧妙,融合了各种设计/架构模式,将一个分布式集群系统的通信层细节完全屏蔽,这样在上层的计算框架的设计中能够获得很好的灵活性。同时,如果上层想要增加各种新的特性,或者对来自不同企业或组织的程序员贡献的特性,也能够很容易地增加进来,可以避开复杂的通信层而将注意力集中在上层计算框架的处理和优化上,入手难度非常小。另外,对上层计算框架中的各个核心组件的开发、功能增强,以及Bug修复等都会变得更加容易。 Spark RPC层设计概览 Spark RPC层是基于优秀的网络通信框架Netty设计开发的,同时获得了Netty所具有的网络通信的可靠性和高效性。我们先把Spark中与RPC相关的一些类的关系梳理一下,为了能够更直观地表达RPC的设计,我们先从类的设计来看,如下图所示: 通过上图,可以清晰地将RPC设计分离出来,能够对RPC层有一个整体的印象。了解Spark RPC层的几个核心的概念(我们通过Spark源码中对应的类名来标识),能够更好地理解设计: RpcEndpoint RpcEndpoint定义了RPC通信过程中的通信端对象,除了具有管理一个RpcEndp