为了更好的分析 Spark Join 处理流程,我们选择具有 Shuffle 操作的示例来进行说明,这比没有 Shuffle 操作的处理流程要复杂一些。本文主要通过实现一个 Join 操作的 Spark 程序,提交运行该程序,并通过 Spark UI 上的各种运行信息来讨论 Spark Join 处理流程。
Spark Join 示例程序
我们先给出一个简单的 Spark Application 程序代码,这里处理的数据使用了 MovieLens 数据集。其中,小表 movies(约 1.4m)、大表 genome-scores(约 323.5m),对这两个表进行 Join 操作。具体的实现代码,如下所示:
def main(args: Array[String]): Unit = { val sc = new SparkContext() // movieId,title,genres val movieRdd = sc.textFile("/data/ml-20m/movies.csv") .filter(line => !line.startsWith("movieId")) .map(line => line.split(",")) .map(a => (a(0), a(1))) // movieId,tagId,relevance val scoreRdd = sc.textFile("/data/ml-20m/genome-scores.csv") .filter(line => !line.startsWith("movieId")) .map(line => line.split(",")) .map(a => (a(0), (a(1), a(2)))) // movieId title tagId relevance val finalRdd = movieRdd.join(scoreRdd) .map(r => Seq(r._1, r._2._1, r._2._2._1, r._2._2._2) .mkString("\t")) finalRdd.toDebugString finalRdd.saveAsTextFile("/temp/join") }
上面代码,我们直接将两个表进行 Join 操作(实际更优的做法是:将小表进行 Broadcast 后,再进行连接操作,能够避免昂贵低效的 Shuffle 操作)。我们这么做,主要是为了使程序处理过程中能够进行 Shuffle 操作,从而更深入地理解 Join 操作的内部处理流程。
我们通过如下命令,查看一下输入数据表 genome-scores 表在 HDFS 上存储的 Block 情况,执行如下命令:
hdfs fsck /data/ml-20m/movies.csv -files -racks -locations -blocks hdfs fsck /data/ml-20m/genome-scores.csv -files -racks -locations -blocks
可以看到输入数据集的存储情况,如下所示:
/data/ml-20m/movies.csv 1397542 bytes, 1 block(s): OK 0. BP-893796349-172.16.117.62-1504161181581:blk_1074121675_380924 len=1397542 Live_repl=3 [/default/172.16.117.64:50010, /default/172.16.117.63:50010, /default/172.16.117.65:50010] /data/ml-20m/genome-scores.csv 323544381 bytes, 3 block(s): OK 0. BP-893796349-172.16.117.62-1504161181581:blk_1074121670_380919 len=134217728 Live_repl=3 [/default/172.16.117.63:50010, /default/172.16.117.65:50010, /default/172.16.117.64:50010] 1. BP-893796349-172.16.117.62-1504161181581:blk_1074121671_380920 len=134217728 Live_repl=3 [/default/172.16.117.65:50010, /default/172.16.117.64:50010, /default/172.16.117.63:50010] 2. BP-893796349-172.16.117.62-1504161181581:blk_1074121672_380921 len=55108925 Live_repl=3 [/default/172.16.117.64:50010, /default/172.16.117.65:50010, /default/172.16.117.63:50010]
通过 RDD 的 toDebugString() 方法,打印调试信息:
res1: String = (3) MapPartitionsRDD[40] at map at <console>:38 [] | MapPartitionsRDD[39] at join at <console>:38 [] | MapPartitionsRDD[38] at join at <console>:38 [] | CoGroupedRDD[37] at join at <console>:38 [] +-(2) MapPartitionsRDD[31] at map at <console>:34 [] | | MapPartitionsRDD[30] at map at <console>:34 [] | | MapPartitionsRDD[29] at filter at <console>:34 [] | | /data/ml-20m/movies.csv MapPartitionsRDD[28] at textFile at <console>:34 [] | | /data/ml-20m/movies.csv HadoopRDD[14] at textFile at <console>:34 [] +-(3) MapPartitionsRDD[27] at map at <console>:36 [] | | MapPartitionsRDD[26] at map at <console>:36 [] | | MapPartitionsRDD[25] at filter at <console>:36 [] | | /data/ml-20m/genome-scores.csv MapPartitionsRDD[24] at textFile at <console>:36 [] | | /data/ml-20m/genome-scores.csv HadoopRDD[13] at textFile at <console>:36 []
上面信息可以非常清晰地看到,我们的 Spark 程序都创建了哪些 RDD,以及这些 RDD 之间的大致顺序关系。
分析 Spark Join 处理流程
Spark 程序运行过程中,我们可以通过 Web UI 看到对应的 DAG 生成的 Stage 的情况,一共生成了 3 个 Stage。为了更加清晰地了解各个 Stage(对应的各组 Task)都被调度到了哪个 Executor 上运行,我们通过 Spark UI 看一下对于该 Spark 程序创建 Executor 的具体情况,如下图所示:
通过上图可见,为了运行我们提交的 Spark 程序,在 Spark 集群中的 3 个节点上,总计启动了 3 个 Executor 和 1 个 Driver。其中,图中 ID 为 3 和 driver 的 Executor 是在同一个物理节点上。我们知道,在同一个 Spark 程序运行过程中,可能会复用已经启动的 Executor,这里的 3 个 Executor 在程序运行过程中都会被复用,可以在后面各个 Stage 运行过程中看到。
下面,分别对各个 Stage 的运行情况进行分析:
- Stage 0 运行过程
对数据表 /data/ml-20m/movies.csv,进行 filter->map->map 操作,对应于 Stage 0,如下图所示:
经过前面我们通过 toDebugString() 打印出 DAG 关系图,可以知道 /data/ml-20m/movies.csv 具有两个 Partition,所以对应的 Stage 0 包含 2 个可调度运行的 ShuffleMapTask。通过 Spark Web UI 可以看到,如下图所示:
上图中,Locality Level 为 NODE_LOCAL,在 2 个节点上各启动了 1 个 Executor,Stage 0 包含 2 个 ShuffleMapTask。
- Stage 1 运行过程
同样,对数据表 /data/ml-20m/genome-scores.csv 也进行 filter->map->map 操作,对应于 Stage 1,如下图所示:
/data/ml-20m/genome-scores.csv 具有 3 个 Partition,所以同样对于 Stage 1 具有 3 个可调度运行的 ShuffleMapTask,如下图所示:
可见,在 3 个节点上各启动了 1 个 Executor,用来运行 Stage 1 的 ShuffleMapTask。其中,Locality Level 也为 NODE_LOCAL。
- Stage 2 运行过程
该 Stage 对应的 DAG 图表达,如下图所示:
可见,Join 操作实际包含了一组操作的集合:cogroup、map、map。这里,我们通过 Spark 源码来看一下,调用 join 操作,其内部具体都做了哪些事情,我们实现的 Spark 程序中调用了带有一个 RDD 参数的 join 方法,代码如下所示:
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope { join(other, defaultPartitioner(self, other)) }
继续调用了另一个多了 Partitioner 参数的 join 方法,代码如下所示:
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope { this.cogroup(other, partitioner).flatMapValues( pair => for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w) ) }
上面代码中,调用了最核心的 cogroup 方法,如下所示:
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope { if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) { throw new SparkException("HashPartitioner cannot partition array keys.") } val cg = new CoGroupedRDD[K](Seq(self, other), partitioner) cg.mapValues { case Array(vs, w1s) => (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]]) } }
这里面创建了 CoGroupedRDD,所以可以直接分析 CoGroupedRDD 的实现。根据前面我们实现的 Spark 程序,应该存在 Shuffle 过程,所以 CoGroupedRDD 与 Stage 0 和S tage 1 中的两个 RDD 之间,必然应该存在 ShuffledDependency 依赖关系,通过 CoGroupedRDD 类的代码验证,如下所示:
override def getDependencies: Seq[Dependency[_]] = { rdds.map { rdd: RDD[_] => if (rdd.partitioner == Some(part)) { logDebug("Adding one-to-one dependency with " + rdd) new OneToOneDependency(rdd) } else { logDebug("Adding shuffle dependency with " + rdd) new ShuffleDependency[K, Any, CoGroupCombiner]( rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer) } } }
在我们示例程序实际运行过程中,执行了创建 ShuffleDependency 的分支,在这里一共生成了 2 个 ShuffleDependency。现在,我们需要知道 CoGroupedRDD 是如何计算的,也就是需要知道在调用 join 操作过程中如何进行 Shuffle 操作的,查看 CoGroupedRDD 的 compute 方法,基于依赖的 ShuffleDependency 进行计算,计算过程代码如下所示:
val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)] for ((dep, depNum) <- dependencies.zipWithIndex) dep match { case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked => val dependencyPartition = split.narrowDeps(depNum).get.split // Read them from the parent val it = oneToOneDependency.rdd.iterator(dependencyPartition, context) rddIterators += ((it, depNum)) case shuffleDependency: ShuffleDependency[_, _, _] => // Read map outputs of shuffle val it = SparkEnv.get.shuffleManager .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context) .read() rddIterators += ((it, depNum)) }
上述的 compute 方法实现了对 CoGroupedRDD 的某一个 Partition 的计算处理,实际会执行 ShuffleDependency 这个 case 分支,它会通过 ShuffleManager 来读取依赖的上游 2 个 RDD 的计算结果,从而得到 CoGroupedRDD 的某个 Partition 依赖于上游 RDD 的计算结果,通过 (Iterator[Product2[K, Any]], Int) 对的方式表达,表示计算 CoGroupedRDD 需要上游 RDD 结果迭代器(it)和对应的依赖编号(depNum)。
Stage 2 可调度运行的是一组 ResultTask,每个 ResultTask 处理最终结果的一个 Partition,该 Partition 执行上述代码后,已经将 Stage 1 运行过程的计算结果(map 输出)读取到对应的 Reduce 端,也就是说属于该 Partition 的所有 map 输出结果都已经读取过来了。而且,我们应该知道,具有相同的 key 的记录,一定会在 Shuffle 过程中读取到同一个 Executor 中(同一个 ResultTask 中),可以想到下面需要进行实际的 Join 操作流程了,具体代码如下所示:
val map = createExternalMap(numRdds) for ((it, depNum) <- rddIterators) { map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum)))) } context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled) context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled) context.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes) new InterruptibleIterator(context, map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
首先创建一个 ExternalAppendOnlyMap 存储结构,然后遍历上面得到的 rddIterators,将该 Partition 依赖的 RDD 结果记录 insert 到 ExternalAppendOnlyMap 中。ExternalAppendOnlyMap 能够对具有相同 key 的一组记录执行 merge 操作,最终得到一个按相同 key 进行连接操作的结果。上面代码,通过创建的 InterruptibleIterator 就可以迭代出 Join 后的结果。当然,这里 Join 完的结果还不是最终我们需要的结果,通过在 RDD 的 join 和 cogroup 方法中还需要对这里得到的结果进行后续处理,才得到我们 Spark 程序最终需要的 Join 结果。
下面,看一下 Stage 2 对应的 3 个 ResultTask 的运行情况,如下图所示:
示例图中的 Task 还在运行中,运行过程中需要进行 Shuffle 读取操作。
Stage 2 对应 DAG 图中,最后一步是 saveAsTextFile,上面计算得到的结果就是 Join 操作的最终结果,结果由 Spark 程序运行过程中的各个 BlockManager 来管理,结果或者保存在内存中,或者存储在磁盘上,我们这个示例程序结果都存放在内存中,当调用 saveAsTextFile 方法时,会直接将 Join 结果写入到 HDFS 指定文件中。
总结
基于提供的 Spark 程序示例,我们从源码的角度,将对应的 RDD 及其作用于 RDD 之上的操作,以及对应的顺序关系,通过如下 RDD DAG 图来更加清晰地表现出来,如下图所示:
根据每个 RDD 对应的 Partition 情况,将上图细化到 Partition 级别,我们能够看出每个 RDD 的各个 Partition 之间是如何关联的,如下图所示:
在我们示例程序的 Join 过程中,一定存在 Shuffle 处理,最终处理流程可以在上图中各个 RDD 的各个 Partition 之间表现出来。扩展一点思考,假如没有 Join 过程没有发生 Shuffle 处理,那么在 cogroup 处理中就不会创建 ShuffleDependency,而是会创建对应的 OneToOneDependency。那么上图中,处理完成 Stage 0、Stage 1 后得到的 RDD 的各个 Partition,已经使用相同的 Partitioner 基于记录的 Key 对数据进行了 Partition 操作。所以,在生成 Stage 2 中的 RDD 的某个 Partition 时,与上游 Stage 0、Stage 1 中每个 RDD 中某个(而不是某些,不是与 RDD 中各个 Partition 都有关系)Partition 是一对一的关系,这样计算起来就简单多了。
本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系。