Spark UnifiedMemoryManager 内存管理模型分析

Spark 的内存使用,大体上可以分为两类:Execution 内存和 Storage 内存。在 Spark 1.5 版本之前,内存管理使用的是 StaticMemoryManager,该内存管理模型最大的特点就是,可以为 Execution 内存区与 Storage 内存区配置一个静态的 boundary,这种方式实现起来比较简单,但是存在一些问题:

  1. 没有一个合理的默认值能够适应不同计算场景下的 Workload
  2. 内存调优困难,需要对 Spark 内部原理非常熟悉才能做好
  3. 对不需要 Cache 的 Application 的计算场景,只能使用很少一部分内存

为了克服上述提到的问题,尽量提高 Spark 计算的通用性,降低内存调优难度,减少 OOM 导致的失败问题,从 Spark 1.6 版本开始,新增了 UnifiedMemoryManager(统一内存管理)内存管理模型的实现。UnifiedMemoryManager 依赖的一些组件类及其关系,如下类图所示:
UnifiedMemoryManager
从上图可以看出,最直接最核心的就是 StorageMemoryPool 和 ExecutionMemoryPool,它们实现了动态内存池(Memory Pool)的功能,能够动态调整 Storage 内存区与 Execution 内存区之间的 Soft boundary,使内存管理更加灵活。下面我们从内存布局和内存控制两个方面,来分析 UnifiedMemoryManager 内存管理模型。

内存布局

UnifiedMemoryManager 是 MemoryManager 的一种实现,是基于 StaticMemoryManager 的改进。这种模型也是将某个执行 Task 的 Executor JVM 内存划分为两类内存区域:

  • Storage内存区

Storage 内存,用来缓存 Task 数据、在 Spark 集群中传输(Propagation)内部数据。

  • Execution 内存区

Execution 内存,用于满足 Shuffle、Join、Sort、Aggregation 计算过程中对内存的需求。

这种新的内存管理模型,在 Storage 内存区与 Execution 内存区之间抽象出一个 Soft boundary,能够满足当某一个内存区中内存用量不足的时候,可以从另一个内存区中借用。我们可以理解为,上面 Storage 内存和 Execution 堆内存是受 Spark 管理的,而且每一个内存区是可以动态伸缩的。这样的好处是,当某一个内存区内存使用量达到初始分配值,如果不能够动态伸缩,不能在两类内存区之间进行动态调整(Borrow),或者如果某个 Task 计算的数据量很大超过限制,就会出现 OOM 异常导致 Task 执行失败。应该说,在一定程度上,UnifiedMemoryManager 内存管理模型降低了发生 OOM 的概率。
我们知道,在 Spark Application 提交以后,最终会在 Worker 上启动独立的 Executor JVM,Task 就运行在 Executor 里面。在一个 Executor JVM 内部,基于 UnifiedMemoryManager 这种内存管理模型,堆内存的布局如下图所示:
spark-unified-heap-memory-layout
上图中,systemMemory 是 Executor JVM 的全部堆内存,在全部堆内存基础上 reservedMemory 是预留内存,默认300M,则用于Spark计算使用堆内存大小默认是:

maxHeapMemory = (systemMemory - reservedMemory) * 0.6

受 Spark 管理的堆内存,使用去除预留内存后的、剩余内存的百分比,可以通过参数 spark.memory.fraction 来配置,默认值是 0.6。Executor JVM 堆内存,去除预留的 reservedMemory 内存,默认剩下堆内存的 60% 用于 execution 和 storage 这两类堆内存,默认情况下,Execution 和 Storage 内存区各占 50%,这个也可以通过参数 spark.memory.storageFraction 来配置,默认值是 0.5。比如,在所有参数使用默认值的情况下,我们的 Executor JVM 内存为指定为 2G,那么 Unified Memory大小为(1024 * 2 – 300) * 0.6 = 1048MB,其中,Execution 和 Storage 内存区大小分别为 1048 * 0.5 = 524MB。
另外,还有一个用来保证 Spark Application 能够计算的最小 Executor JVM 内存大小限制,即为 minSystemMemory = reservedMemory * 1.5 = 300 * 1.5 = 450MB,我们假设 Executor JVM 配置了这个默认最小限制值 450MB,则受 Spark 管理的堆内存大小为(450 – 300) * 0.6 = 90MB,其中 Execution 和 Storage 内存大小分别为 90 * 0.5 = 45MB,这种情况对一些小内存用量的 Spark 计算也能够很好的支持。
上面,我们详细说明了受 Spark 管理的堆内存(OnHeap Memory)的布局,UnifiedMemoryManager 也能够对非堆内存(OffHeap Memory)进行管理。Spark 堆内存和非堆内存的布局,如下图所示:
spark-unified-memory-layout
通过上图可以看到,非堆内存(OffHeap Memory)默认大小配置值为 0,表示不使用非堆内存,可以通过参数 spark.memory.offHeap.size 来设置非堆内存的大小。无论是对堆内存,还是对非堆内存,都分为 Execution 内存和 Storage 内存两部分,他们的分配大小比例通过参数 spark.memory.storageFraction 来控制,默认是 0.5。

内存控制

通过上面,我们了解了 UnifiedMemoryManager 这种内存管理模型的内存布局状况。接下来,我们看一下,通过UnifiedMemoryManager的API,如何对内存进行控制(分配/回收)。内存的控制,也对应于 Execution 内存与 Storage 内存,分别有一个 StorageMemoryPool 和 ExecutionMemoryPool,在实现类 UnifiedMemoryManager 中可以看到通过这两个 MemoryPool 实现来控制内存大小的伸缩(Increment/Decrement)。
获取当前堆上的最大可用 Storage 内存,如下 maxOnHeapStorageMemory() 方法所示:

  override def maxOnHeapStorageMemory: Long = synchronized {
    maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed
  }

可以看到,maxHeapMemory 表示堆上可用的 Execution 内存与 Storage 内存总量之和,减去 Execution 内存中已经被占用的内存,剩余的都是堆上的最大可用 Storage 内存。
在 UnifiedMemoryManager 中,两类最核心的操作,就是申请/释放 Storage 内存、申请/释放 Execution 内存,分别说明如下:

  • 申请 Storage 内存

申请Storage内存的逻辑,实现代码如下所示:

  override def acquireStorageMemory(
      blockId: BlockId,
      numBytes: Long,
      memoryMode: MemoryMode): Boolean = synchronized { // 为 blockId 申请 numBytes 字节大小的内存
    assertInvariants()
    assert(numBytes >= 0)
    val (executionPool, storagePool, maxMemory) = memoryMode match { // 根据 memoryMode 值,返回对应的StorageMemoryPool与ExecutionMemoryPool
      case MemoryMode.ON_HEAP => (
        onHeapExecutionMemoryPool,
        onHeapStorageMemoryPool,
        maxOnHeapStorageMemory)
      case MemoryMode.OFF_HEAP => (
        offHeapExecutionMemoryPool,
        offHeapStorageMemoryPool,
        maxOffHeapMemory)
    }
    if (numBytes > maxMemory) { // 如果申请的内存大于最大的 Storage 内存量(对应上面方法 maxOnHeapStorageMemory() 返回的内存大小),申请失败
      // Fail fast if the block simply won't fit
      logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
        s"memory limit ($maxMemory bytes)")
      return false
    }
    if (numBytes > storagePool.memoryFree) { // 如果 Storage 内存块中没有足够可用内存给 blockId 使用,则计算当前 Storage 内存区缺少多少内存,然后从 Execution 内存区中借用
      // There is not enough free memory in the storage pool, so try to borrow free memory from
      // the execution pool.
      val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
      executionPool.decrementPoolSize(memoryBorrowedFromExecution) // Execution内存区减掉借用内存量
      storagePool.incrementPoolSize(memoryBorrowedFromExecution) // Storage内存区增加借用内存量
    }
    storagePool.acquireMemory(blockId, numBytes) // 如果 Storage 内存区可以为 blockId 分配内存,直接成功分配;否则,如果从 Execution 内存区中借用的内存能够满足 blockId,则分配成功,不能满足则分配失败。
  }

如果 Storage 内存区可用内存满足申请大小,则直接成功分配内存;如果 Storage 内存区可用内存大于 0 且小于申请的内存大小,则需要从 Execution 内存区借用满足分配大小的内存,如果借用成功,则直接成功分配内存,否则分配失败;如果申请的内存超过了 Storage 内存区的最大内存量,则分配失败。
另外,UnifiedMemoryManager.acquireUnrollMemory() 方法提供了对 Unroll 内存的申请,Unroll 内存就是 Storage 内存:

  override def acquireUnrollMemory(
      blockId: BlockId,
      numBytes: Long,
      memoryMode: MemoryMode): Boolean = synchronized {
    acquireStorageMemory(blockId, numBytes, memoryMode)
  }

Unroll 内存 ,被用来在 Storage 内存中 Unroll(展开)指定的 Block 数据。

  • 释放 Storage 内存

释放 Storage 内存比较简单,只需要更新 Storage 内存计量变量即可,如下所示:

  def releaseMemory(size: Long): Unit = lock.synchronized {
    if (size > _memoryUsed) {
      logWarning(s"Attempted to release $size bytes of storage " +
        s"memory when we only have ${_memoryUsed} bytes")
      _memoryUsed = 0
    } else {
      _memoryUsed -= size
    }
  }
  • 申请Execution内存

申请 Execution 内存,相对复杂一些,调用 acquireExecutionMemory() 方法可能会阻塞,直到 Execution 内存区有可用内存为止。UnifiedMemoryManager 的 acquireExecutionMemory() 方法实现如下所示:

      override private[memory] def acquireExecutionMemory(
      numBytes: Long,
      taskAttemptId: Long,
      memoryMode: MemoryMode): Long = synchronized {
    ... ...
    executionPool.acquireMemory(
      numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
  }

上面代码,调用了 ExecutionMemoryPool 的 acquireMemory() 方法,该方法的参数需要 2 个函数(maybeGrowExecutionPool 函数用来控制如何增加 Execution 内存区对应 Pool 的大小,computeMaxExecutionPoolSize 函数用来获取当前 Execution 内存区对应 Pool 的大小)。ExecutionMemoryPool 的 acquireMemory() 方法签名,如下所示:

  private[memory] def acquireMemory(
      numBytes: Long,
      taskAttemptId: Long,
      maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
      computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {

在 UnifiedMemoryManager 内部,实现了如何动态增加 Execution 内存区对应 Pool 大小的函数,即为 maybeGrowExecutionPool 函数,代码如下所示:

    def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
      if (extraMemoryNeeded > 0) {
        // There is not enough free memory in the execution pool, so try to reclaim memory from
        // storage. We can reclaim any free memory from the storage pool. If the storage pool
        // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
        // the memory that storage has borrowed from execution.
        val memoryReclaimableFromStorage = math.max( storagePool.memoryFree,  storagePool.poolSize - storageRegionSize)
        if (memoryReclaimableFromStorage > 0) { // 这里 memoryReclaimableFromStorage 大于 0,说明当前 Storage 内存区有可用内存,可以 Shrink 该 Pool 的内存,作为 Execution 内存区的可用内存使用
          // Only reclaim as much space as is necessary and available:
          val spaceToReclaim = storagePool.freeSpaceToShrinkPool( math.min(extraMemoryNeeded, memoryReclaimableFromStorage)) // 对 Storage 内存区进行 Shrink 操作,如果可用内存大于请求内存 extraMemoryNeeded,则直接将 Storage 内存区内存 Shrink 大小为 extraMemoryNeeded,否则 Shrink 大小为 Storage 内存区的全部可用内存大小
          storagePool.decrementPoolSize(spaceToReclaim) // Storage 内存区减掉借用内存量
          executionPool.incrementPoolSize(spaceToReclaim)  // Execution 内存区增加借用内存量
        }
      }
    }

需要说明的是,上面的 storagePool.poolSize 的大小可能大于 Storage 内存区初始最大内存大小,主要是通过借用 Execution 内存区的内存导致的。这里,storagePool.freeSpaceToShrinkPool() 方法会 Shrink 掉 Storage 内存区可用内存,我们可以看下StorageMemoryPool 中如何 Shrink Storage 内存,方法如下所示:

  def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
    val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
    val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory // Storage 内存区需要释放 remainingSpaceToFree 大小的内存
    if (remainingSpaceToFree > 0) { // 大于 0 表示当前 Storage 内存区已经无可用内存,需要通过清理 Storage 内存区的 block 来实现 Shrink 操作
      // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
      val spaceFreedByEviction =  memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)  // 通过清理 Storage 内存区的 block 释放的内存大小
      // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
      // not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
      spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
    } else { // remainingSpaceToFree<=0 说明当前 Storage 内存区可用内存足够,不需要通过清理缓存的 Block 来释放内存
      spaceFreedByReleasingUnusedMemory
    }
  }

MemoryStore 如何 evictBlocksToFreeSpace,可以查阅 MemoryStore 类源码,这里暂时不做说明。
最后,我们说明 ExecutionMemoryPool.acquireMemory() 方法与 ExecutionMemoryPool.releaseMemory() 方法的实现。在说明方法实现逻辑之前,我们先说明一下 Execution 内存区内存分配的基本原则:
如果有 N 个活跃(Active)的 Task 在运行,ExecutionMemoryPool 需要保证每个 Task 在将中间结果数据 Spill 到磁盘之前,至少能够申请到当前 Execution 内存区对应的 Pool 中 1/2N 大小的内存量,至多是 1/N 大小的内存。这里 N 是动态变化的,因为可能有新的 Task 被启动,也有可能 Task 运行完成释放资源,所以 ExecutionMemoryPool 会持续跟踪 ExecutionMemoryPool 内部 Task 集合 memoryForTask 的变化,并不断地重新计算分配给每个 Task 的这两个内存量的值:1/2N 和 1/N。
为了代码紧凑清晰,我把 ExecutionMemoryPool.acquireMemory() 方法源码中不必要的注释去掉了,代码如下所示:

  private[memory] def acquireMemory(
      numBytes: Long,
      taskAttemptId: Long,
      maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
      computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
    assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")

    if (!memoryForTask.contains(taskAttemptId)) { // ExecutionMemoryPool内部维护了一个 HashMap<TaskAttempID, 内存占用字节数>
      memoryForTask(taskAttemptId) = 0L
      // This will later cause waiting tasks to wake up and check numTasks again
      lock.notifyAll()
    }

    while (true) {
      val numActiveTasks = memoryForTask.keys.size // 当前活跃的 Task 数量
      val curMem = memoryForTask(taskAttemptId) // 当前 Task 使用的内存量

      maybeGrowPool(numBytes - memoryFree) // 如果需要,通过 Shrink Storage 内存区对应的 Pool 内存来增加 Execution 内存区内存大小

      val maxPoolSize = computeMaxPoolSize() // 计算当前 Execution 内存区对应 Pool 的大小
      val maxMemoryPerTask = maxPoolSize / numActiveTasks // 计算 1/N:将当前 Execution 内存区对应 Pool 的大小,平均分配给所有活跃的 Task,得到每个 Task 能够获取到的最大内存大小
      val minMemoryPerTask = poolSize / (2 * numActiveTasks) // 计算 1/2N:每个 Task 能够获取到的最小内存大小

      val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem)) // 允许当前 Task 获取到最大内存(范围:0 <= X <= 1 / numActiveTasks )
      val toGrant = math.min(maxToGrant, memoryFree) // 计算能分配给当前 Task 的内存大小

      // 如果当前 Task 无法获取到 1 / (2 * numActiveTasks) 的内存,并且能分配给当前 Task 的内存大小无法满足申请的内存量,则阻塞等待其他 Task 释放内存后在 lock 上通知
      if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
        logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
        lock.wait() // 在 ExecutionMemoryPool.releaseMemory() 方法中会通知其他申请内存并在 lock上wait 的 Task,内存已经释放
      } else {
        memoryForTask(taskAttemptId) += toGrant // 当前 Task 获取到内存,需要登记到 memoryForTask 表中
        return toGrant
      }
    }
    0L  // Never reached
  }
  • 释放 Execution 内存

相对应的,ExecutionMemoryPool.releaseMemory() 方法实现了对 Execution 内存的释放操作,方法实现代码如下所示:

  def releaseMemory(numBytes: Long, taskAttemptId: Long): Unit = lock.synchronized {
    val curMem = memoryForTask.getOrElse(taskAttemptId, 0L)
    var memoryToFree = if (curMem < numBytes) { // 计算释放内存大小
      logWarning( s"Internal error: release called on $numBytes bytes but task only has $curMem bytes " +
          s"of memory from the $poolName pool")
      curMem
    } else {
      numBytes
    }
    if (memoryForTask.contains(taskAttemptId)) { // Task 执行完成,从内部维护的 memoryForTask 中移除
      memoryForTask(taskAttemptId) -= memoryToFree
      if (memoryForTask(taskAttemptId) <= 0) {
        memoryForTask.remove(taskAttemptId)
      }
    }
    lock.notifyAll() // 通知调用 acquireMemory() 方法申请内存的 Task 内存已经释放
  }

总结

需要注意的,每个 Executor JVM 中只存在一个 UnifiedMemoryManager 实例,该对象统一控制该 JVM 内对 Storage 和 Execution 内存的申请和释放操作。
通过上面的分析,UnifiedMemoryManager 可以看做一个统一的内存管理控制器,底层通过 StorageMemoryPool 与 ExecutionMemoryPool 提供的申请内存、释放内存的功能,实现最基本的 bookkeeping 功能。再向底层,实际操作 Block 及其 Java 对象等数据的功能,都是在 MemoryStore 中进行的,MemoryStore 被用来在内存中存储数据,主要包括 block、反序列化的 Java 对象数组、序列化的 ByteBuffer,同时它提供了存取内存中各种格式数据的操作。关于 MemoryStore 的基本结构和原理,我们后续会单独分析。

参考资源

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

评论(3): “Spark UnifiedMemoryManager 内存管理模型分析

    • 不好意思,请稍后再看吧。
      近期网站多次遭到攻击,对异常请求增加了防护与限制,可能会误判,稍后就会恢复的。

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>