Spark Block 存储管理分析

Apache Spark 中,对 Block 的查询、存储管理,是通过唯一的 Block ID 来进行区分的。所以,了解 Block ID 的生成规则,能够帮助我们了解 Block 查询、存储过程中是如何定位 Block 以及如何处理互斥存储/读取同一个 Block 的。可以想到,同一个 Spark Application,以及多个运行的 Application 之间,对应的 Block 都具有唯一的 ID,通过代码可以看到,BlockID 包括:RDDBlockId、ShuffleBlockId、ShuffleDataBlockId、ShuffleIndexBlockId、BroadcastBlockId、TaskResultBlockId、TempLocalBlockId、TempShuffleBlockId 这 8 种 ID,可以详见如下代码定义:

@DeveloperApi
case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
  override def name: String = "rdd_" + rddId + "_" + splitIndex
}

// Format of the shuffle block ids (including data and index) should be kept in sync with
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData().
@DeveloperApi
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
  override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
}

@DeveloperApi
case class ShuffleDataBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
  override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"
}

@DeveloperApi
case class ShuffleIndexBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
  override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".index"
}

@DeveloperApi
case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
  override def name: String = "broadcast_" + broadcastId + (if (field == "") "" else "_" + field)
}

@DeveloperApi
case class TaskResultBlockId(taskId: Long) extends BlockId {
  override def name: String = "taskresult_" + taskId
}

@DeveloperApi
case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
  override def name: String = "input-" + streamId + "-" + uniqueId
}

/** Id associated with temporary local data managed as blocks. Not serializable. */
private[spark] case class TempLocalBlockId(id: UUID) extends BlockId {
  override def name: String = "temp_local_" + id
}

/** Id associated with temporary shuffle data managed as blocks. Not serializable. */
private[spark] case class TempShuffleBlockId(id: UUID) extends BlockId {
  override def name: String = "temp_shuffle_" + id
}

我们以 RDDBlockId 的生成规则为例,它是以前缀字符串 “rdd_” 为前缀、分配的全局 RDD ID、下 划线 “_”、Partition ID 这 4 部分拼接而成,因为 RDD ID 是唯一的,所以最终构造好的 RDDBlockId 对应的字符串就是唯一的。如果该 Block 存在,查询可以唯一定位到该 Block,存储也不会出现覆盖其他 RDDBlockId 的问题。
下面,我们通过分析 MemoryStore、DiskStore、BlockManager、BlockInfoManager 这 4 个最核心的与 Block 管理相关的实现类,来理解 Spark 对 Block 的管理。全文中,我们主要针对 RDDBlockId 对应的 Block 数据的处理、存储、查询、读取,来分析 Block 的管理。

MemoryStore

先说明一下 MemoryStore,它主要用来在内存中存储 Block 数据,可以避免重复计算同一个 RDD的Partition 数据。一个 Block 对应着一个 RDD 的一个 Partition 的数据。当 StorageLevel 设置为如下值时,都会可能会需要使用 MemoryStore 来存储数据:

MEMORY_ONLY
MEMORY_ONLY_2
MEMORY_ONLY_SER
MEMORY_ONLY_SER_2
MEMORY_AND_DISK
MEMORY_AND_DISK_2
MEMORY_AND_DISK_SER
MEMORY_AND_DISK_SER_2
OFF_HEAP

所以,MemoryStore 提供对 Block 数据的存储、读取等操作 API,MemoryStore 也提供了多种存储方式,下面详细说明每种方式。

  • 以序列化格式保存 Block 数据
  def putBytes[T: ClassTag](
      blockId: BlockId,
      size: Long,
      memoryMode: MemoryMode,
      _bytes: () => ChunkedByteBuffer): Boolean

首先,通过 MemoryManager 来申请 Storage 内存(有关 MemoryManager 如何管理 Storage/Execution 内存分配,详见我的另一篇文章:Spark UnifiedMemoryManager 内存管理模型分析),调用 putBytes 方法,会根据 size 大小去申请 Storage 内存,如果申请成功,则会将 blockId 对应的 Block 数据保存在内部的 LinkedHashMap[BlockId, MemoryEntry[_]] 映射表中,然后以 SerializedMemoryEntry 这种序列化的格式存储,实际 SerializedMemoryEntry 就是简单指向 Buffer 中数据的引用对象:

private case class SerializedMemoryEntry[T](
    buffer: ChunkedByteBuffer,
    memoryMode: MemoryMode,
    classTag: ClassTag[T]) extends MemoryEntry[T] {
  def size: Long = buffer.size
}

如果无法申请到 size 大小的 Storage 内存,则存储失败,对于出现这种失败的情况,需要使用 MemoryStore 存储 API 的调用者去处理异常情况。

  • 基于记录迭代器,以反序列化 Java 对象形式保存 Block 数据
  private[storage] def putIteratorAsValues[T](
      blockId: BlockId,
      values: Iterator[T],
      classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long]

这种方式,调用者希望将 Block 数据记录以反序列化的方式保存在内存中,如果内存中能放得下,则返回最终 Block 数据记录的大小,否则返回一个 PartiallyUnrolledIterator[T] 迭代器,其中对应如下 2 种情况:
第一种,Block 数据记录能够完全放到内存中:和前面的方式类似,能够全部放到内存,但是不同的是,这种方式对应的数据格式是反序列化的 Java 对象格式,对应实现类 DeserializedMemoryEntry[T],它也会被直接存放到 MemoryStore 内部的 LinkedHashMap[BlockId, MemoryEntry[_]] 映射表中。DeserializedMemoryEntry[T] 类定义如下所示:

private case class DeserializedMemoryEntry[T](
    value: Array[T],
    size: Long,
    classTag: ClassTag[T]) extends MemoryEntry[T] {
  val memoryMode: MemoryMode = MemoryMode.ON_HEAP
}

它与 SerializedMemoryEntry 都是 MemoryEntry[T] 的子类,所有被放到同一个映射表 LinkedHashMap[BlockId, MemoryEntry[_]] entries 中。
另外,也存在这种可能,通过 MemoryManager 申请的 Unroll 内存大小大于该 Block 打开需要的内存,则会返回如下结果对象:

        Left(new PartiallyUnrolledIterator( this,  unrollMemoryUsedByThisBlock,
          unrolled = arrayValues.toIterator,  rest = Iterator.empty))

上面 unrolled = arrayValues.toIterator,rest = Iterator.empty,表示在内存中可以打开迭代器中全部的数据记录,打开对象类型为DeserializedMemoryEntry[T]。
第二种,Block 数据记录只能部分放到内存中:也就是说 Driver 或 Executor 上的内存有限,只可以放得下部分记录,另一部分记录内存中放不下。values 记录迭代器对应的全部记录数据无法完全放在内存中,所以为了保证不发生 OOM 异常,首选会调用 MemoryManager的acquireUnrollMemory方 法去申请 Unroll 内存,如果可以申请到,在迭代 values 的过程中,需要累加计算打开(Unroll)的记录对象大小之和,使其大小不能大于申请到的 Unroll 内存,直到还有一部分记录无法放到申请的 Unroll 内存中。 最后,返回的结果对象如下所示:

      Left(new PartiallyUnrolledIterator(
        this, unrollMemoryUsedByThisBlock, unrolled = vector.iterator, rest = values))

上面的 PartiallyUnrolledIterator 中 rest 对应的 values 就是 putIteratorAsValues 方法传进来的迭代器参数值,该迭代器已经迭代出部分记录,放到了内存中,调用者可以继续迭代该迭代器去处理未打开(Unroll)的记录,而 unrolled 对应一个打开记录的迭代器。这里,PartiallyUnrolledIterator 迭代器包装了 vector.iterator 和一个迭代出部分记录的 values 迭代器,调用者对 PartiallyUnrolledIterator 进行统一迭代能够获取到全部记录,里面包含两种类型的记录:DeserializedMemoryEntry[T] 和 T。

  • 基于记录迭代器,以序列化二进制格式保存 Block 数据
  private[storage] def putIteratorAsBytes[T](
      blockId: BlockId,
      values: Iterator[T],
      classTag: ClassTag[T],
      memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long]

这种方式,调用这种希望将 Block 数据记录以二进制的格式保存在内存中。如果内存中能放得下,则返回最终的大小,否则返回一个 PartiallySerializedBlock[T] 迭代器。
如果 Block 数据记录能够完全放到内存中,则以 SerializedMemoryEntry[T] 格式放到内存的映射表中。如果 Block 数据记录只能部分放到内存中,则返回如下对象:

      Left(
        new PartiallySerializedBlock(
          this,
          serializerManager,
          blockId,
          serializationStream,
          redirectableStream,
          unrollMemoryUsedByThisBlock,
          memoryMode,
          bbos.toChunkedByteBuffer,
          values,
          classTag))

类似地,返回结果对象对调用者保持统一的迭代API视图。

DiskStore

DiskStore 提供了将 Block 数据写入到磁盘的基本操作,它是通过 DiskBlockManager 来管理逻辑上 Block 到物理磁盘上 Block 文件路径的映射关系。当 StorageLevel 设置为如下值时,都可能会需要使用 DiskStore 来存储数据:

DISK_ONLY
DISK_ONLY_2
MEMORY_AND_DISK
MEMORY_AND_DISK_2
MEMORY_AND_DISK_SER
MEMORY_AND_DISK_SER_2
OFF_HEAP

DiskBlockManager 管理了每个 Block 数据存储位置的信息,包括从 Block ID 到磁盘上文件的映射关系。DiskBlockManager 主要有如下几个功能:

  • 负责创建一个本地节点上的指定磁盘目录,用来存储 Block 数据到指定文件中
  • 如果 Block 数据想要落盘,需要通过调用 getFile 方法来分配一个唯一的文件路径
  • 如果想要查询一个 Block 是否在磁盘上,通过调用 containsBlock 方法来查询
  • 查询当前节点上管理的全部 Block 文件
  • 通过调用 createTempLocalBlock 方法,生成一个唯一 Block ID,并创建一个唯一的临时文件,用来存储中间结果数据
  • 通过调用 createTempShuffleBlock 方法,生成一个唯一 Block ID,并创建一个唯一的临时文件,用来存储 Shuffle 过程的中间结果数据

DiskStore 提供的基本操作接口,与 MemoryStore 类似,比较简单,如下所示:

  • 通过文件流写 Block 数据

该种方式对应的接口方法,如下所示:

  def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit

参数指定 Block ID,还有一个写 Block 数据到打开的文件流的函数,在调用 put 方法时,首先会从 DiskBlockManager 分配一个 Block ID 对应的磁盘文件路径,然后将数据写入到该文件中。

  • 将二进制 Block 数据写入文件

putBytes 方法实现了,将一个 Buffer 中的 Block 数据写入指定的 Block ID 对应的文件中,方法定义如下所示:

  def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit

实际上,它是调用上面的 put 方法,将 bytes 中的 Block 二进制数据写入到 Block 文件中。

  • 从磁盘文件读取 Block 数据

对应方法如下所示:

  def getBytes(blockId: BlockId): ChunkedByteBuffer

通过给定的 blockId,获取磁盘上对应的 Block 文件的数据,以 ChunkedByteBuffer 的形式返回。

  • 删除 Block 文件

对应的删除方法定义,如下所示:

  def remove(blockId: BlockId): Boolean

通过 DiskBlockManager 查找到 blockId 对应的 Block 文件,然后删除掉。

BlockManager

谈到 Spark 中的 Block 数据存储,我们很容易能够想到 BlockManager,他负责管理在每个 Dirver 和 Executor 上的 Block 数据,可能是本地或者远程的。具体操作包括查询 Block、将 Block 保存在指定的存储中,如内存、磁盘、堆外(Off-heap)。而 BlockManager 依赖的后端,对 Block 数据进行内存、磁盘存储访问,都是基于前面讲到的 MemoryStore、DiskStore。
在 Spark 集群中,当提交一个 Application 执行时,该 Application 对应的 Driver 以及所有的 Executor 上,都存在一个 BlockManager、BlockManagerMaster,而 BlockManagerMaster 是负责管理各个 BlockManager 之间通信,这个 BlockManager 管理集群,如下图所示:
blockmanager-communications
关于一个 Application 运行过程中 Block 的管理,主要是基于该 Application 所关联的一个 Driver 和多个 Executor 构建了一个 Block 管理集群:Driver 上的 (BlockManagerMaster, BlockManagerMasterEndpoint) 是集群的 Master 角色,所有 Executor上的 (BlockManagerMaster, RpcEndpointRef) 作为集群的 Slave 角色。当 Executor 上的 Task 运行时,会查询对应的 RDD 的某个 Partition 对应的 Block 数据是否处理过,这个过程中会触发多个 BlockManager 之间的通信交互。我们以 ShuffleMapTask 的运行为例,对应代码如下所示:

  override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val deserializeStartTime = System.currentTimeMillis()
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) // 处理RDD的Partition的数据
      writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
  }

一个 RDD 的 Partition 对应一个 ShuffleMapTask,一个 ShuffleMapTask 会在一个 Executor 上运行,它负责处理 RDD 的一个 Partition 对应的数据,基本处理流程,如下所示:

  1. 根据该 Partition 的数据,创建一个 RDDBlockId(由 RDD ID 和 Partition Index 组成),即得到一个稳定的 blockId(如果该 Partition 数据被处理过,则可能本地或者远程 Executor 存储了对应的 Block 数据)。
  2. 先从 BlockManager 获取该 blockId 对应的数据是否存在,如果本地存在(已经处理过),则直接返回 Block 结果(BlockResult);否则,查询远程的 Executor 是否已经处理过该 Block,处理过则直接通过网络传输到当前 Executor 本地,并根据 StorageLevel 设置,保存 Block 数据到本地 MemoryStore 或 DiskStore,同时通过 BlockManagerMaster 上报 Block 数据状态(通知 Driver 当前的 Block 状态,亦即,该 Block 数据存储在哪个 BlockManager 中)。
  3. 如果本地及远程 Executor 都没有处理过该 Partition 对应的 Block 数据,则调用 RDD 的 compute 方法进行计算处理,并将处理的 Block 数据,根据 StorageLevel 设置,存储到本地 MemoryStore 或 DiskStore。
  4. 根据 ShuffleManager 对应的 ShuffleWriter,将返回的该 Partition 的 Block 数据进行 Shuffle 写入操作

下面,我们基于上面逻辑,详细分析在这个处理过程中重要的交互逻辑:

  • 根据 RDD 获取一个 Partition 对应数据的记录迭代器

用户提交的 Spark Application 程序,会设置对应的 StorageLevel,所以设置与不设置对该处理逻辑有一定影响,具有两种情况,如下图所示:
RDD.iterator()
如果用户程序设置了 StorageLevel,可能该 Partition 的数据已经处理过,那么对应的处理结果 Block 数据可能已经存储。一般设置的 StorageLevel,或者将 Block 存储在内存中,或者存储在磁盘上,这里会尝试调用 getOrElseUpdate() 方法获取对应的 Block 数据,如果存在则直接返回 Block 对应的记录的迭代器实例,就不需要重新计算了,如果没有找到对应的已经处理过的 Block 数据,则调用 RDD 的 compute() 方法进行处理,处理结果根据 StorageLevel 设置,将Block数据存储在内存或磁盘上,缓存供后续 Task 重复使用。
如果用户程序没有设置 StorageLevel,那么 RDD 对应的该 Partition 的数据一定没有进行处理过,即使处理过,如果没有进行 Checkpointing,也需要重新计算(如果进行了 Checkpointing,可以直接从缓存中获取),直接调用 RDD 的 compute() 方法进行处理。

  • 从 BlockManager 查询获取 Block 数据

每个 Executor 上都有一个 BlockManager 实例,负责管理用户提交的该 Application 计算过程中产生的 Block。很有可能当前 Executor 上存储在 RDD 对应 Partition 的经过处理后得到的 Block 数据,也有可能当前 Executor 上没有,但是其他 Executor 上已经处理过并缓存了 Block 数据,所以对应着本地获取、远程获取两种可能,本地获取交互逻辑如下图所示:
BlockManager.getLocalValues
从本地获取,根据 StorageLevel 设置,如果是存储在内存中,从本地的 MemoryStore 中查询,存在则读取并返回;如果是存储在磁盘上,则从本地的 DiskStore 中查询,存在则读取并返回。本地不存在,则会从远程的 Executor 读取,对应的组件交互逻辑,如下图所示:
BlockManager.getRemoteValues
远程获取交互逻辑相对比较复杂:当前 Executor 上的 BlockManager 通过 BlockManagerMaster,向远程的 Driver 的 BlockManagerMasterEndpoint 查询对应 Block ID,有哪些 Executor 已经保存了该 Block 数据,Dirver 返回一个包含了该 Block 数据的 Location 列表,如果对应的 Location 信息与当前 ShuffleMapTask 执行所在 Executor 在同一台节点上,则会优先使用该 Location,因为同一节点上的多个 Executor 之间传输 Block 数据效率更高。
这里需要说明的是,如果对应的 Block 数据的 StorageLevel 设置为写磁盘,通过前面我们知道,DiskStore 是通过 DiskBlockManager 进行管理存储到磁盘上的 Block 数据文件的,在同一个节点上的多个 Executor 共享相同的磁盘文件路径,相同的 Block 数据文件也就会被同一个节点上的多个 Executor 所共享。而对应 MemoryStore,因为每个 Executor 对应独立的 JVM 实例,从而具有独立的 Storage/Execution 内存管理,所以使用 MemoryStore 不能共享同一个 Block 数据,但是同一个节点上的多个 Executor 之间的 MemoryStore 之间拷贝数据,比跨网络传输要高效的多。

BlockInfoManager

用户提交一个 Spark Application 程序,如果程序对应的 DAG 图相对复杂,其中很多 Task 计算的结果 Block 数据都有可能被重复使用,这种情况下如何去控制某个 Executor 上的 Task 线程去读写 Block 数据呢?其实,BlockInfoManager 就是用来控制 Block 数据读写操作,并且跟踪Task读写了哪些 Block 数据的映射关系,这样如果两个 Task 都想去处理同一个 RDD 的同一个 Partition 数据,如果没有锁来控制,很可能两个 Task 都会计算并写同一个 Block 数据,从而造成混乱。我们分析每种情况下,BlockInfoManager 是如何管理 Block 数据(同一个 RDD 的同一个 Partition)读写的:

  • 第一个 Task 请求写 Block 数据

这种情况下,没有其他 Task 写 Block 数据,第一个 Task 直接获取到写锁,并启动写 Block 数据到本地 MemoryStore 或 DiskStore。如果其他写 Block 数据的 Task 也请求写锁,则该 Task 会阻塞,等待第一个获取写锁的 Task 完成写 Block 数据,直到第一个 Task 写完成,并通知其他阻塞的 Task,然后其他 Task 需要再次获取到读锁来读取该 Block 数据。

  • 第一个 Task 正在写 Block 数据,其他 Task 请求读 Block 数据

这种情况,Block 数据没有完成写操作,其他读 Block 数据的 Task 只能阻塞,等待写 Block 的 Task 完成并通知读 Task 去读取 Block 数据。

  • Task 请求读取 Block 数据

如果该 Block 数据不存在,则直接返回空,表示当前 RDD 的该 Partition 并没有被处理过。如果当前 Block 数据存在,并且没有其他 Task 在写,表示已经完成了写 Block 数据操作,则该 Task 直接读取该 Block 数据。

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

评论(3): “Spark Block 存储管理分析

  1. 写的很好,想请教您一个问题,您在看源码的时候,是先把整体的架构整理出来在看细节,还是从细节中理出整体的架构,对这个问题一直很困惑

苗清进行回复 取消回复

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>