Spark Join 处理流程分析

为了更好的分析 Spark Join 处理流程,我们选择具有 Shuffle 操作的示例来进行说明,这比没有 Shuffle 操作的处理流程要复杂一些。本文主要通过实现一个 Join 操作的 Spark 程序,提交运行该程序,并通过 Spark UI 上的各种运行信息来讨论 Spark Join 处理流程。

Spark Join 示例程序

我们先给出一个简单的 Spark Application 程序代码,这里处理的数据使用了 MovieLens 数据集。其中,小表 movies(约 1.4m)、大表 genome-scores(约 323.5m),对这两个表进行 Join 操作。具体的实现代码,如下所示:

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext()
    // movieId,title,genres
    val movieRdd = sc.textFile("/data/ml-20m/movies.csv")
          .filter(line => !line.startsWith("movieId"))
          .map(line =>  line.split(","))
          .map(a => (a(0), a(1)))
    // movieId,tagId,relevance
    val scoreRdd = sc.textFile("/data/ml-20m/genome-scores.csv")
          .filter(line => !line.startsWith("movieId"))
          .map(line =>  line.split(","))
          .map(a => (a(0), (a(1), a(2))))
    // movieId  title  tagId  relevance
    val finalRdd = movieRdd.join(scoreRdd)
          .map(r => Seq(r._1, r._2._1, r._2._2._1, r._2._2._2)
          .mkString("\t"))
    finalRdd.toDebugString
    finalRdd.saveAsTextFile("/temp/join")
  }

上面代码,我们直接将两个表进行 Join 操作(实际更优的做法是:将小表进行 Broadcast 后,再进行连接操作,能够避免昂贵低效的 Shuffle 操作)。我们这么做,主要是为了使程序处理过程中能够进行 Shuffle 操作,从而更深入地理解 Join 操作的内部处理流程。
我们通过如下命令,查看一下输入数据表 genome-scores 表在 HDFS 上存储的 Block 情况,执行如下命令:

hdfs fsck /data/ml-20m/movies.csv -files -racks -locations -blocks
hdfs fsck /data/ml-20m/genome-scores.csv -files -racks -locations -blocks

可以看到输入数据集的存储情况,如下所示:

/data/ml-20m/movies.csv 1397542 bytes, 1 block(s):  OK
0. BP-893796349-172.16.117.62-1504161181581:blk_1074121675_380924 len=1397542 Live_repl=3 [/default/172.16.117.64:50010, /default/172.16.117.63:50010, /default/172.16.117.65:50010]

/data/ml-20m/genome-scores.csv 323544381 bytes, 3 block(s):  OK
0. BP-893796349-172.16.117.62-1504161181581:blk_1074121670_380919 len=134217728 Live_repl=3 [/default/172.16.117.63:50010, /default/172.16.117.65:50010, /default/172.16.117.64:50010]
1. BP-893796349-172.16.117.62-1504161181581:blk_1074121671_380920 len=134217728 Live_repl=3 [/default/172.16.117.65:50010, /default/172.16.117.64:50010, /default/172.16.117.63:50010]
2. BP-893796349-172.16.117.62-1504161181581:blk_1074121672_380921 len=55108925 Live_repl=3 [/default/172.16.117.64:50010, /default/172.16.117.65:50010, /default/172.16.117.63:50010]

通过 RDD 的 toDebugString() 方法,打印调试信息:

res1: String = 
(3) MapPartitionsRDD[40] at map at <console>:38 []
 |  MapPartitionsRDD[39] at join at <console>:38 []
 |  MapPartitionsRDD[38] at join at <console>:38 []
 |  CoGroupedRDD[37] at join at <console>:38 []
 +-(2) MapPartitionsRDD[31] at map at <console>:34 []
 |  |  MapPartitionsRDD[30] at map at <console>:34 []
 |  |  MapPartitionsRDD[29] at filter at <console>:34 []
 |  |  /data/ml-20m/movies.csv MapPartitionsRDD[28] at textFile at <console>:34 []
 |  |  /data/ml-20m/movies.csv HadoopRDD[14] at textFile at <console>:34 [] 
 +-(3) MapPartitionsRDD[27] at map at <console>:36 [] 
 |  |  MapPartitionsRDD[26] at map at <console>:36 []
 |  |  MapPartitionsRDD[25] at filter at <console>:36 []
 |  |  /data/ml-20m/genome-scores.csv MapPartitionsRDD[24] at textFile at <console>:36 []
 |  |  /data/ml-20m/genome-scores.csv HadoopRDD[13] at textFile at <console>:36 [] 

上面信息可以非常清晰地看到,我们的 Spark 程序都创建了哪些 RDD,以及这些 RDD 之间的大致顺序关系。

分析 Spark Join 处理流程

Spark 程序运行过程中,我们可以通过 Web UI 看到对应的 DAG 生成的 Stage 的情况,一共生成了 3 个 Stage。为了更加清晰地了解各个 Stage(对应的各组 Task)都被调度到了哪个 Executor 上运行,我们通过 Spark UI 看一下对于该 Spark 程序创建 Executor 的具体情况,如下图所示:
Executors
通过上图可见,为了运行我们提交的 Spark 程序,在 Spark 集群中的 3 个节点上,总计启动了 3 个 Executor 和 1 个 Driver。其中,图中 ID 为 3 和 driver 的 Executor 是在同一个物理节点上。我们知道,在同一个 Spark 程序运行过程中,可能会复用已经启动的 Executor,这里的 3 个 Executor 在程序运行过程中都会被复用,可以在后面各个 Stage 运行过程中看到。
下面,分别对各个 Stage 的运行情况进行分析:

  • Stage 0 运行过程

对数据表 /data/ml-20m/movies.csv,进行 filter->map->map 操作,对应于 Stage 0,如下图所示:
Stage-0
经过前面我们通过 toDebugString() 打印出 DAG 关系图,可以知道 /data/ml-20m/movies.csv 具有两个 Partition,所以对应的 Stage 0 包含 2 个可调度运行的 ShuffleMapTask。通过 Spark Web UI 可以看到,如下图所示:
Stage-0.Tasks
上图中,Locality Level 为 NODE_LOCAL,在 2 个节点上各启动了 1 个 Executor,Stage 0 包含 2 个 ShuffleMapTask。

  • Stage 1 运行过程

同样,对数据表 /data/ml-20m/genome-scores.csv 也进行 filter->map->map 操作,对应于 Stage 1,如下图所示:
Stage-1
/data/ml-20m/genome-scores.csv 具有 3 个 Partition,所以同样对于 Stage 1 具有 3 个可调度运行的 ShuffleMapTask,如下图所示:
Stage-1.Tasks
可见,在 3 个节点上各启动了 1 个 Executor,用来运行 Stage 1 的 ShuffleMapTask。其中,Locality Level 也为 NODE_LOCAL。

  • Stage 2 运行过程

该 Stage 对应的 DAG 图表达,如下图所示:
Stage-2
可见,Join 操作实际包含了一组操作的集合:cogroup、map、map。这里,我们通过 Spark 源码来看一下,调用 join 操作,其内部具体都做了哪些事情,我们实现的 Spark 程序中调用了带有一个 RDD 参数的 join 方法,代码如下所示:

  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
    join(other, defaultPartitioner(self, other))
  }

继续调用了另一个多了 Partitioner 参数的 join 方法,代码如下所示:

  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
    this.cogroup(other, partitioner).flatMapValues( pair =>
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
    )
  }

上面代码中,调用了最核心的 cogroup 方法,如下所示:

  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
      : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
    if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
    cg.mapValues { case Array(vs, w1s) =>
      (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
    }
  }

这里面创建了 CoGroupedRDD,所以可以直接分析 CoGroupedRDD 的实现。根据前面我们实现的 Spark 程序,应该存在 Shuffle 过程,所以 CoGroupedRDD 与 Stage 0 和S tage 1 中的两个 RDD 之间,必然应该存在 ShuffledDependency 依赖关系,通过 CoGroupedRDD 类的代码验证,如下所示:

  override def getDependencies: Seq[Dependency[_]] = {
    rdds.map { rdd: RDD[_] =>
      if (rdd.partitioner == Some(part)) {
        logDebug("Adding one-to-one dependency with " + rdd)
        new OneToOneDependency(rdd)
      } else {
        logDebug("Adding shuffle dependency with " + rdd)
        new ShuffleDependency[K, Any, CoGroupCombiner](
          rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
      }
    }
  }

在我们示例程序实际运行过程中,执行了创建 ShuffleDependency 的分支,在这里一共生成了 2 个 ShuffleDependency。现在,我们需要知道 CoGroupedRDD 是如何计算的,也就是需要知道在调用 join 操作过程中如何进行 Shuffle 操作的,查看 CoGroupedRDD 的 compute 方法,基于依赖的 ShuffleDependency 进行计算,计算过程代码如下所示:

    val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
    for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
      case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked =>
        val dependencyPartition = split.narrowDeps(depNum).get.split
        // Read them from the parent
        val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
        rddIterators += ((it, depNum))

      case shuffleDependency: ShuffleDependency[_, _, _] =>
        // Read map outputs of shuffle
        val it = SparkEnv.get.shuffleManager
          .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context)
          .read()
        rddIterators += ((it, depNum))
    }

上述的 compute 方法实现了对 CoGroupedRDD 的某一个 Partition 的计算处理,实际会执行 ShuffleDependency 这个 case 分支,它会通过 ShuffleManager 来读取依赖的上游 2 个 RDD 的计算结果,从而得到 CoGroupedRDD 的某个 Partition 依赖于上游 RDD 的计算结果,通过 (Iterator[Product2[K, Any]], Int) 对的方式表达,表示计算 CoGroupedRDD 需要上游 RDD 结果迭代器(it)和对应的依赖编号(depNum)。
Stage 2 可调度运行的是一组 ResultTask,每个 ResultTask 处理最终结果的一个 Partition,该 Partition 执行上述代码后,已经将 Stage 1 运行过程的计算结果(map 输出)读取到对应的 Reduce 端,也就是说属于该 Partition 的所有 map 输出结果都已经读取过来了。而且,我们应该知道,具有相同的 key 的记录,一定会在 Shuffle 过程中读取到同一个 Executor 中(同一个 ResultTask 中),可以想到下面需要进行实际的 Join 操作流程了,具体代码如下所示:

    val map = createExternalMap(numRdds)
    for ((it, depNum) <- rddIterators) {
      map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
    }
    context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
    context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
    context.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)
    new InterruptibleIterator(context,
      map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])

首先创建一个 ExternalAppendOnlyMap 存储结构,然后遍历上面得到的 rddIterators,将该 Partition 依赖的 RDD 结果记录 insert 到 ExternalAppendOnlyMap 中。ExternalAppendOnlyMap 能够对具有相同 key 的一组记录执行 merge 操作,最终得到一个按相同 key 进行连接操作的结果。上面代码,通过创建的 InterruptibleIterator 就可以迭代出 Join 后的结果。当然,这里 Join 完的结果还不是最终我们需要的结果,通过在 RDD 的 join 和 cogroup 方法中还需要对这里得到的结果进行后续处理,才得到我们 Spark 程序最终需要的 Join 结果。
下面,看一下 Stage 2 对应的 3 个 ResultTask 的运行情况,如下图所示:
Stage-2.Tasks
示例图中的 Task 还在运行中,运行过程中需要进行 Shuffle 读取操作。
Stage 2 对应 DAG 图中,最后一步是 saveAsTextFile,上面计算得到的结果就是 Join 操作的最终结果,结果由 Spark 程序运行过程中的各个 BlockManager 来管理,结果或者保存在内存中,或者存储在磁盘上,我们这个示例程序结果都存放在内存中,当调用 saveAsTextFile 方法时,会直接将 Join 结果写入到 HDFS 指定文件中。

总结

基于提供的 Spark 程序示例,我们从源码的角度,将对应的 RDD 及其作用于 RDD 之上的操作,以及对应的顺序关系,通过如下 RDD DAG 图来更加清晰地表现出来,如下图所示:
Spark-Join-DAG-RDDs
根据每个 RDD 对应的 Partition 情况,将上图细化到 Partition 级别,我们能够看出每个 RDD 的各个 Partition 之间是如何关联的,如下图所示:
Spark-Join-DAG-RddPartitions
在我们示例程序的 Join 过程中,一定存在 Shuffle 处理,最终处理流程可以在上图中各个 RDD 的各个 Partition 之间表现出来。扩展一点思考,假如没有 Join 过程没有发生 Shuffle 处理,那么在 cogroup 处理中就不会创建 ShuffleDependency,而是会创建对应的 OneToOneDependency。那么上图中,处理完成 Stage 0、Stage 1 后得到的 RDD 的各个 Partition,已经使用相同的 Partitioner 基于记录的 Key 对数据进行了 Partition 操作。所以,在生成 Stage 2 中的 RDD 的某个 Partition 时,与上游 Stage 0、Stage 1 中每个 RDD 中某个(而不是某些,不是与 RDD 中各个 Partition 都有关系)Partition 是一对一的关系,这样计算起来就简单多了。

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>