Spark Join 处理流程分析

为了更好的分析 Spark Join 处理流程,我们选择具有 Shuffle 操作的示例来进行说明,这比没有 Shuffle 操作的处理流程要复杂一些。本文主要通过实现一个 Join 操作的 Spark 程序,提交运行该程序,并通过 Spark UI 上的各种运行信息来讨论 Spark Join 处理流程。 Spark Join 示例程序 我们先给出一个简单的 Spark Application 程序代码,这里处理的数据使用了 MovieLens 数据集。其中,小表 movies(约 1.4m)、大表 genome-scores(约 323.5m),对这两个表进行 Join 操作。具体的实现代码,如下所示: def main(args: Array[String]): Unit = { val sc = new SparkContext() // movieId,title,genres val movieRdd = sc.textFile("/data/ml-20m/movies.csv") .filter(line => !line.startsWith("movieId")) .map(line => line.split(",")) .map(a => (a(0), a(1))) // movieId,tagId,relevance val scoreRdd = sc.textFile("/data/ml-20m/genome-sc

Spark Shuffle过程分析:Reduce阶段处理流程

Spark在Map阶段调度运行的ShuffleMapTask,最后会生成.data和.index文件,可以通过我的这篇文章 Spark Shuffle过程分析:Map阶段处理流程 了解具体流程和详情。同时,在Executor上运行一个ShuffleMapTask,返回了一个MapStatus对象,下面是ShuffleMapTask执行后返回结果的相关代码片段: var writer: ShuffleWriter[Any, Any] = null try { val manager = SparkEnv.get.shuffleManager writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) writer.stop(success = true).get } catch { case e: Exception => try { if (writer != null) { writer.stop(success = false) } } catch { case e: Exception => log.debug("Could not stop writer", e) } throw e

基于YARN集群构建运行PySpark Application

Spark Application可以直接运行在YARN集群上,这种运行模式,会将资源的管理与协调统一交给YARN集群去处理,这样能够实现构建于YARN集群之上Application的多样性,比如可以运行MapReduc程序,可以运行HBase集群,也可以运行Storm集群,还可以运行使用Python开发机器学习应用程序,等等。 我们知道,Spark on YARN又分为client模式和cluster模式:在client模式下,Spark Application运行的Driver会在提交程序的节点上,而该节点可能是YARN集群内部节点,也可能不是,一般来说提交Spark Application的客户端节点不是YARN集群内部的节点,那么在客户端节点上可以根据自己的需要安装各种需要的软件和环境,以支撑Spark Application正常运行。在cluster模式下,Spark Application运行时的所有进程都在YARN集群的NodeManager节点上,而且具体在哪些NodeManager上运行是由YARN的调度策略所决定的。 对比这两种模式,最关键的是Spark Application运行时Driver所在的节点不同,而且,如果想要对Driver所在节点的运行环境进行配置,区别很大,但这对于PySpark Application运行

PB 级海量数据服务平台架构设计实践

基于 PB 级海量数据实现数据服务平台,需要从各个不同的角度去权衡,主要包括实践背景、技术选型、架构设计,我们基于这三个方面进行了架构实践,下面分别从这三个方面进行详细分析讨论: 实践背景 该数据服务平台架构设计之初,实践的背景可以从三个维度来进行说明:当前现状、业务需求、架构需求,分别如下所示: 当前现状 收集了当前已有数据、分工、团队的一些基本情况,如下所示: 数据收集和基础数据加工有专门的 Team 在做,我们是基于收集后并进行过初步加工的基础数据,结合不同行业针对特定数据的需求进行二次加工的。 数据二次加工,会集成基础数据之外的其它有业务属性的数据,比如引入第三方 POI 数据等。 原始数据每天增量大约 30~40TB 左右。 计算集群采用 Spark on YARN 部署模式,大约 400 个节点。 所有数据各种属性、行为信息,都是围绕大约 40亿+ 的移动设备 ID 进行很多倍膨胀,比如每天使用微信 App 的设备的行为信息。 参与该平台的研发人员,对实际数据业务需求了解不会非常深入,因为跨多个行业及其不同数据需求的变化较快。 业务需求 另

Spark Shuffle过程分析:Map阶段处理流程

默认配置情况下,Spark在Shuffle过程中会使用SortShuffleManager来管理Shuffle过程中需要的基本组件,以及对RDD各个Partition数据的计算。我们可以在Driver和Executor对应的SparkEnv对象创建过程中看到对应的配置,如下代码所示: // Let the user specify short names for shuffle managers val shortShuffleMgrNames = Map( "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName) val shuffleMgrName = conf.get("spark.shuffle.manager", "sort") val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass) 如果需要修改ShuffleManager实现,则只需要修改配置项spark.shuffle.manager即可,默认支持sort和 tungsten-sort,可以指

Spark Block 存储管理分析

Apache Spark 中,对 Block 的查询、存储管理,是通过唯一的 Block ID 来进行区分的。所以,了解 Block ID 的生成规则,能够帮助我们了解 Block 查询、存储过程中是如何定位 Block 以及如何处理互斥存储/读取同一个 Block 的。可以想到,同一个 Spark Application,以及多个运行的 Application 之间,对应的 Block 都具有唯一的 ID,通过代码可以看到,BlockID 包括:RDDBlockId、ShuffleBlockId、ShuffleDataBlockId、ShuffleIndexBlockId、BroadcastBlockId、TaskResultBlockId、TempLocalBlockId、TempShuffleBlockId 这 8 种 ID,可以详见如下代码定义: @DeveloperApi case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId { override def name: String = "rdd_" + rddId + "_" + splitIndex } // Format of the shuffle block ids (including data and index) should be kept in sync with // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData(). @DeveloperApi case class Sh

Spark UnifiedMemoryManager 内存管理模型分析

Spark 的内存使用,大体上可以分为两类:Execution 内存和 Storage 内存。在 Spark 1.5 版本之前,内存管理使用的是 StaticMemoryManager,该内存管理模型最大的特点就是,可以为 Execution 内存区与 Storage 内存区配置一个静态的 boundary,这种方式实现起来比较简单,但是存在一些问题: 没有一个合理的默认值能够适应不同计算场景下的 Workload 内存调优困难,需要对 Spark 内部原理非常熟悉才能做好 对不需要 Cache 的 Application 的计算场景,只能使用很少一部分内存 为了克服上述提到的问题,尽量提高 Spark 计算的通用性,降低内存调优难度,减少 OOM 导致的失败问题,从 Spark 1.6 版本开始,新增了 UnifiedMemoryManager(统一内存管理)内存管理模型的实现。UnifiedMemoryManager 依赖的一些组件类及其关系,如下类图所示: 从上图可以看出,最直接最核心的就是 StorageMemoryPool 和 ExecutionMemoryPool,它们实现了动态内存池(Memory Pool)的功能,能够动态调整 Storage 内存区与 Execution 内存区之间的 Soft boundary,使内存管理更加灵活。下

Spark Standalone架构设计要点分析

Apache Spark是一个开源的通用集群计算系统,它提供了High-level编程API,支持Scala、Java和Python三种编程语言。Spark内核使用Scala语言编写,通过基于Scala的函数式编程特性,在不同的计算层面进行抽象,代码设计非常优秀。 RDD抽象 RDD(Resilient Distributed Datasets),弹性分布式数据集,它是对分布式数据集的一种内存抽象,通过受限的共享内存方式来提供容错性,同时这种内存模型使得计算比传统的数据流模型要高效。RDD具有5个重要的特性,如下图所示: 上图展示了2个RDD进行JOIN操作,体现了RDD所具备的5个主要特性,如下所示: 一组分区 计算每一个数据分片的函数 RDD上的一组依赖 可选,对于键值对RDD,有一个Partitioner(通常是HashPartitioner) 可选,一组Preferred location信息(例如,HDFS文件的Block所在location信息) 有了上述特性,能够非常好地通过RDD来表达分布式数据集,并作为构建DAG图的基础:首先抽象一次分布式计算任务的逻辑表示,最终将任务在实际的物理计算环境中进行处理执行。 计算抽象 在描述Spark中的计算抽象,我们首先需

Spark RPC通信层设计原理分析

Spark将RPC通信层设计的非常巧妙,融合了各种设计/架构模式,将一个分布式集群系统的通信层细节完全屏蔽,这样在上层的计算框架的设计中能够获得很好的灵活性。同时,如果上层想要增加各种新的特性,或者对来自不同企业或组织的程序员贡献的特性,也能够很容易地增加进来,可以避开复杂的通信层而将注意力集中在上层计算框架的处理和优化上,入手难度非常小。另外,对上层计算框架中的各个核心组件的开发、功能增强,以及Bug修复等都会变得更加容易。 Spark RPC层设计概览 Spark RPC层是基于优秀的网络通信框架Netty设计开发的,同时获得了Netty所具有的网络通信的可靠性和高效性。我们先把Spark中与RPC相关的一些类的关系梳理一下,为了能够更直观地表达RPC的设计,我们先从类的设计来看,如下图所示: 通过上图,可以清晰地将RPC设计分离出来,能够对RPC层有一个整体的印象。了解Spark RPC层的几个核心的概念(我们通过Spark源码中对应的类名来标识),能够更好地理解设计: RpcEndpoint RpcEndpoint定义了RPC通信过程中的通信端对象,除了具有管理一个RpcEndp

Spark-1.3.1与Hive整合实现查询分析

在大数据应用场景下,使用过Hive做查询统计分析的应该知道,计算的延迟性非常大,可能一个非常复杂的统计分析需求,需要运行1个小时以上,但是比之于使用MySQL之类关系数据库做分析,执行速度快很多很多。使用HiveQL写类似SQL的查询分析语句,最终经过Hive查询解析器,翻译成Hadoop平台上的MapReduce程序进行运行,这也是MapReduce计算引擎的特点带来的延迟问题:Map中间结果写文件。如果一个HiveQL语句非常复杂,会被翻译成多个MapReduce Job,那么就会有很多的Map输出中间结果数据到文件中,基本没有数据的共享。 如果使用Spark计算平台,基于Spark RDD数据集模型计算,可以减少计算过程中产生中间结果数据写文件的开销,Spark会把数据直接放到内存中供后续操作共享数据,减少了读写磁盘I/O操作带来的延时。另外,如果基于Spark on YARN部署模式,可以充分利用数据在Hadoop集群DataNode节点的本地性(Locality)特点,减少数据传输的通信开销。 软件准备 我把使用的相关软件的版本在这里列出来,以便测试验证,如下所示: CentOS-6.6 (Final) JDK-1.7.0_25 Maven

Kafka+Spark Streaming+Redis实时计算整合实践

基于Spark通用计算平台,可以很好地扩展各种计算类型的应用,尤其是Spark提供了内建的计算库支持,像Spark Streaming、Spark SQL、MLlib、GraphX,这些内建库都提供了高级抽象,可以用非常简洁的代码实现复杂的计算逻辑、这也得益于Scala编程语言的简洁性。这里,我们基于1.3.0版本的Spark搭建了计算平台,实现基于Spark Streaming的实时计算。 我们的应用场景是分析用户使用手机App的行为,描述如下所示: 手机客户端会收集用户的行为事件(我们以点击事件为例),将数据发送到数据服务器,我们假设这里直接进入到Kafka消息队列 后端的实时服务会从Kafka消费数据,将数据读出来并进行实时分析,这里选择Spark Streaming,因为Spark Streaming提供了与Kafka整合的内置支持 经过Spark Streaming实时计算程序分析,将结果写入Redis,可以实时获取用户的行为数据,并可以导出进行离线综合统计分析 Spark Streaming介绍 Spark Streaming提供了一个叫做DStream(Discretized Stream)的高级抽象,DStream表示一个持续不断输入的数据流,可以基于Kafka、TCP Socket、Flu

Shark-0.9.0安装配置运行实践

Shark(Hive on Spark)是UC Lab为Spark设计并开源的一款数据仓库系统,提供了分布式SQL查询引擎,它能够完全兼容Hive。首先,我们通过下面的图,看一下Shark与Hive的关系(http://shark.cs.berkeley.edu/img/shark-hive-integration.png): 以前我们使用Hive分析HDFS中数据时,通过将HQL翻译成MapReduce作业(Job)在Hadoop集群上运行;而使用Shark可以像使用Hive一样容易,如HQL、Metastore、序列化格式、UDF等Shark都支持,不同的是Shark运行在Spark集群上执行计算,基于Spark系统所使用的RDD模型。官方文档给出的性能方面的数据是,使用Shark查询分析HDFS数据,能比Hive快30多倍,如图所示(http://shark.cs.berkeley.edu/img/perf.png): 下面,我们通过安装配置Shark来简单地体验一下。 准备软件包 jdk-7u25-linux-x64.tar.gz scala-2.10.3.tgz apache-maven-3.2.1-bin.tar.gz hadoop-1.2.1.tar.gz spark-0.9.0-incubating-bin-hadoop1.tgz hive-0.11-shark-0.9.0.tar.gz 环境变量配置 针对上述准备软件包,我们需要安装配置好JDK、Scala环境,保证Hado