为了更好的分析Spark Join处理流程,我们选择具有Shuffle操作的示例来进行说明,这比没有Shuffle操作的处理流程要复杂一些。本文主要通过实现一个Join操作的Spark程序,提交运行该程序,并通过Spark UI上的各种运行信息来讨论Spark Join处理流程。
Spark Join示例程序
我们先给出一个简单的Spark Application程序代码,这里处理的数据使用了MovieLens数据集。其中,小表movies(约1.4m)、大表genome-scores(约323.5m),对这两个表进行Join操作。具体的实现代码,如下所示:
def main(args: Array[String]): Unit = { val sc = new SparkContext() // movieId,title,genres val movieRdd = sc.textFile("/data/ml-20m/movies.csv") .filter(line => !line.startsWith("movieId")) .map(line => line.split(",")) .map(a => (a(0), a(1))) // movieId,tagId,relevance val scoreRdd = sc.textFile("/data/ml-20m/genome-scores.csv") .filter(line => !line.startsWith("movieId")) .map(line => line.split(",")) .map(a => (a(0), (a(1), a(2)))) // movieId title tagId relevance val finalRdd = movieRdd.join(scoreRdd) .map(r => Seq(r._1, r._2._1, r._2._2._1, r._2._2._2) .mkString("\t")) finalRdd.toDebugString finalRdd.saveAsTextFile("/temp/join") }
上面代码,我们直接将两个表进行Join操作(实际更优的做法是:将小表进行Broadcast后,再进行连接操作,能够避免昂贵低效的Shuffle操作)。我们这么做,主要是为了使程序处理过程中能够进行Shuffle操作,从而更深入地理解Join操作的内部处理流程。
我们通过如下命令,查看一下输入数据表genome-scores表在HDFS上存储的Block情况,执行如下命令:
hdfs fsck /data/ml-20m/movies.csv -files -racks -locations -blocks hdfs fsck /data/ml-20m/genome-scores.csv -files -racks -locations -blocks
可以看到输入数据集的存储情况,如下所示:
/data/ml-20m/movies.csv 1397542 bytes, 1 block(s): OK 0. BP-893796349-172.16.117.62-1504161181581:blk_1074121675_380924 len=1397542 Live_repl=3 [/default/172.16.117.64:50010, /default/172.16.117.63:50010, /default/172.16.117.65:50010] /data/ml-20m/genome-scores.csv 323544381 bytes, 3 block(s): OK 0. BP-893796349-172.16.117.62-1504161181581:blk_1074121670_380919 len=134217728 Live_repl=3 [/default/172.16.117.63:50010, /default/172.16.117.65:50010, /default/172.16.117.64:50010] 1. BP-893796349-172.16.117.62-1504161181581:blk_1074121671_380920 len=134217728 Live_repl=3 [/default/172.16.117.65:50010, /default/172.16.117.64:50010, /default/172.16.117.63:50010] 2. BP-893796349-172.16.117.62-1504161181581:blk_1074121672_380921 len=55108925 Live_repl=3 [/default/172.16.117.64:50010, /default/172.16.117.65:50010, /default/172.16.117.63:50010]
通过RDD的toDebugString()方法,打印调试信息:
res1: String = (3) MapPartitionsRDD[40] at map at <console>:38 [] | MapPartitionsRDD[39] at join at <console>:38 [] | MapPartitionsRDD[38] at join at <console>:38 [] | CoGroupedRDD[37] at join at <console>:38 [] +-(2) MapPartitionsRDD[31] at map at <console>:34 [] | | MapPartitionsRDD[30] at map at <console>:34 [] | | MapPartitionsRDD[29] at filter at <console>:34 [] | | /data/ml-20m/movies.csv MapPartitionsRDD[28] at textFile at <console>:34 [] | | /data/ml-20m/movies.csv HadoopRDD[14] at textFile at <console>:34 [] +-(3) MapPartitionsRDD[27] at map at <console>:36 [] | | MapPartitionsRDD[26] at map at <console>:36 [] | | MapPartitionsRDD[25] at filter at <console>:36 [] | | /data/ml-20m/genome-scores.csv MapPartitionsRDD[24] at textFile at <console>:36 [] | | /data/ml-20m/genome-scores.csv HadoopRDD[13] at textFile at <console>:36 []
上面信息可以非常清晰地看到,我们的Spark程序都创建了哪些RDD,以及这些RDD之间的大致顺序关系。
分析Spark Join处理流程
Spark程序运行过程中,我们可以通过Web UI看到对应的DAG生成的Stage的情况,一共生成了3个Stage。为了更加清晰地了解各个Stage(对应的各组Task)都被调度到了哪个Executor上运行,我们通过Spark UI看一下对于该Spark程序创建Executor的具体情况,如下图所示:
通过上图可见,为了运行我们提交的Spark程序,在Spark集群中的3个节点上,总计启动了3个Executor和1个Driver。其中,图中ID为3和driver的Executor是在同一个物理节点上。我们知道,在同一个Spark程序运行过程中,可能会复用已经启动的Executor,这里的3个Executor在程序运行过程中都会被复用,可以在后面各个Stage运行过程中看到。
下面,分别对各个Stage的运行情况进行分析:
- Stage 0运行过程
对数据表/data/ml-20m/movies.csv,进行filter->map->map操作,对应于Stage 0,如下图所示:
经过前面我们通过toDebugString()打印出DAG关系图,可以知道/data/ml-20m/movies.csv具有两个Partition,所以对应的Stage 0包含2个可调度运行的ShuffleMapTask。通过Spark Web UI可以看到,如下图所示:
上图中,Locality Level为NODE_LOCAL,在2个节点上各启动了1个Executor,Stage 0包含2个ShuffleMapTask。
- Stage 1运行过程
同样,对数据表/data/ml-20m/genome-scores.csv也进行filter->map->map操作,对应于Stage 1,如下图所示:
/data/ml-20m/genome-scores.csv具有3个Partition,所以同样对于Stage 1具有3个可调度运行的ShuffleMapTask,如下图所示:
可见,在3个节点上各启动了1个Executor,用来运行Stage 1的ShuffleMapTask。其中,Locality Level也为NODE_LOCAL。
- Stage 2运行过程
该Stage对应的DAG图表达,如下图所示:
可见,Join操作实际包含了一组操作的集合:cogroup、map、map。这里,我们通过Spark源码来看一下,调用join操作,其内部具体都做了哪些事情,我们实现的Spark程序中调用了带有一个RDD参数的join方法,代码如下所示:
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope { join(other, defaultPartitioner(self, other)) }
继续调用了另一个多了Partitioner参数的join方法,代码如下所示:
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope { this.cogroup(other, partitioner).flatMapValues( pair => for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w) ) }
上面代码中,调用了最核心的cogroup方法,如下所示:
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope { if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) { throw new SparkException("HashPartitioner cannot partition array keys.") } val cg = new CoGroupedRDD[K](Seq(self, other), partitioner) cg.mapValues { case Array(vs, w1s) => (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]]) } }
这里面创建了CoGroupedRDD,所以可以直接分析CoGroupedRDD的实现。根据前面我们实现的Spark程序,应该存在Shuffle过程,所以CoGroupedRDD与Stage 0和Stage 1中的两个RDD之间,必然应该存在ShuffledDependency依赖关系,通过CoGroupedRDD类的代码验证,如下所示:
override def getDependencies: Seq[Dependency[_]] = { rdds.map { rdd: RDD[_] => if (rdd.partitioner == Some(part)) { logDebug("Adding one-to-one dependency with " + rdd) new OneToOneDependency(rdd) } else { logDebug("Adding shuffle dependency with " + rdd) new ShuffleDependency[K, Any, CoGroupCombiner]( rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer) } } }
在我们示例程序实际运行过程中,执行了创建ShuffleDependency的分支,在这里一共生成了2个ShuffleDependency。现在,我们需要知道CoGroupedRDD是如何计算的,也就是需要知道在调用join操作过程中如何进行Shuffle操作的,查看CoGroupedRDD的compute方法,基于依赖的ShuffleDependency进行计算,计算过程代码如下所示:
val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)] for ((dep, depNum) <- dependencies.zipWithIndex) dep match { case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked => val dependencyPartition = split.narrowDeps(depNum).get.split // Read them from the parent val it = oneToOneDependency.rdd.iterator(dependencyPartition, context) rddIterators += ((it, depNum)) case shuffleDependency: ShuffleDependency[_, _, _] => // Read map outputs of shuffle val it = SparkEnv.get.shuffleManager .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context) .read() rddIterators += ((it, depNum)) }
上述的compute方法实现了对CoGroupedRDD的某一个Partition的计算处理,实际会执行ShuffleDependency这个case分支,它会通过ShuffleManager来读取依赖的上游2个RDD的计算结果,从而得到CoGroupedRDD的某个Partition依赖于上游RDD的计算结果,通过(Iterator[Product2[K, Any]], Int)对的方式表达,表示计算CoGroupedRDD需要上游RDD结果迭代器(it)和对应的依赖编号(depNum)。
Stage 2可调度运行的是一组ResultTask,每个ResultTask处理最终结果的一个Partition,该Partition执行上述代码后,已经将Stage 1运行过程的计算结果(map输出)读取到对应的Reduce端,也就是说属于该Partition的所有map输出结果都已经读取过来了。而且,我们应该知道,具有相同的key的记录,一定会在Shuffle过程中读取到同一个Executor中(同一个ResultTask中),可以想到下面需要进行实际的Join操作流程了,具体代码如下所示:
val map = createExternalMap(numRdds) for ((it, depNum) <- rddIterators) { map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum)))) } context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled) context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled) context.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes) new InterruptibleIterator(context, map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
首先创建一个ExternalAppendOnlyMap存储结构,然后遍历上面得到的rddIterators,将该Partition依赖的RDD结果记录insert到ExternalAppendOnlyMap中。ExternalAppendOnlyMap能够对具有相同key的一组记录执行merge操作,最终得到一个按相同key进行连接操作的结果。上面代码,通过创建的InterruptibleIterator就可以迭代出Join后的结果。当然,这里Join完的结果还不是最终我们需要的结果,通过在RDD的join和cogroup方法中还需要对这里得到的结果进行后续处理,才得到我们Spark程序最终需要的Join结果。
下面,看一下Stage 2对应的3个ResultTask的运行情况,如下图所示:
示例图中的Task还在运行中,运行过程中需要进行Shuffle读取操作。
Stage 2对应DAG图中,最后一步是saveAsTextFile,上面计算得到的结果就是Join操作的最终结果,结果由Spark程序运行过程中的各个BlockManager来管理,结果或者保存在内存中,或者存储在磁盘上,我们这个示例程序结果都存放在内存中,当调用saveAsTextFile方法时,会直接将Join结果写入到HDFS指定文件中。
总结
基于提供的Spark程序示例,我们从源码的角度,将对应的RDD及其作用于RDD之上的操作,以及对应的顺序关系,通过如下RDD DAG图来更加清晰地表现出来,如下图所示:
根据每个RDD对应的Partition情况,将上图细化到Partition级别,我们能够看出每个RDD的各个Partition之间是如何关联的,如下图所示:
在我们示例程序的Join过程中,一定存在Shuffle处理,最终处理流程可以在上图中各个RDD的各个Partition之间表现出来。扩展一点思考,假如没有Join过程没有发生Shuffle处理,那么在cogroup处理中就不会创建ShuffleDependency,而是会创建对应的OneToOneDependency。那么上图中,处理完成Stage 0、Stage 1后得到的RDD的各个Partition,已经使用相同的Partitioner基于记录的Key对数据进行了Partition操作。所以,在生成Stage 2中的RDD的某个Partition时,与上游Stage 0、Stage 1中每个RDD中某个(而不是某些,不是与RDD中各个Partition都有关系)Partition是一对一的关系,这一计算起来就简单多了。

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系。