Spark Shuffle过程分析:Map阶段处理流程

默认配置情况下,Spark在Shuffle过程中会使用SortShuffleManager来管理Shuffle过程中需要的基本组件,以及对RDD各个Partition数据的计算。我们可以在Driver和Executor对应的SparkEnv对象创建过程中看到对应的配置,如下代码所示: // Let the user specify short names for shuffle managers val shortShuffleMgrNames = Map( "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName) val shuffleMgrName = conf.get("spark.shuffle.manager", "sort") val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass) 如果需要修改

Spark Block存储管理分析

Apache Spark中,对Block的查询、存储管理,是通过唯一的Block ID来进行区分的。所以,了解Block ID的生成规则,能够帮助我们了解Block查询、存储过程中是如何定位Block以及如何处理互斥存储/读取同一个Block的。可以想到,同一个Spark Application,以及多个运行的Application之间,对应的Block都具有唯一的ID,通过代码可以看到,BlockID包括:RDDBlockId、ShuffleBlockId、ShuffleDataBlockId、ShuffleIndexBlockId、BroadcastBlockId、TaskResultBlockId、TempLocalBlockId、TempShuffleBlockId这8种ID,可以详见如下代码定义: @DeveloperApi case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId { override def name: String = "rdd_" + rddId + "_" + splitIndex } // Format of the shuffle block ids (including data and index) should be kept in sync with // org.apache.spark.network.shuffl

Spring Cloud Netflix构建微服务入门实践

在使用Spring Cloud Netflix构建微服务之前,我们先了解一下Spring Cloud集成的Netflix OSS的基础组件Eureka,对于Netflix的其他微服务组件,像Hystrix、Zuul、Ribbon等等本文暂不涉及,感兴趣可以参考官网文档。这里,我们用最基础的Eureka来构建一个最基础的微服务应用,来演示如何构建微服务,了解微服务的基本特点。 Eureka Eureka是Netflix开源的一个微服务注册组件,提供服务发现特性,它是一个基于REST的服务,主要具有如下功能: 支持服务注册和发现 具有Load Balance和Failover的功能 在进行服务调用过程中,无需知道目标服务的主机(IP)和端口,只要知道服务名就可以实现调用 通过Netfix在Github上的文档,我们看一下Eureka的基本架构,如下图所示: Eureka主要包含如下两个核心组件: Eureka Server Eureka Server是服务注册的服务端组件,负责管理Eureka Client注册的服务,提供服务发现的功能。它支持集群模式部署,集群部署模式中,多个

Spark UnifiedMemoryManager内存管理模型分析

Spark的内存使用,大体上可以分为两类:Execution内存和Storage内存。在Spark 1.5版本之前,内存管理使用的是StaticMemoryManager,该内存管理模型最大的特点就是,可以为Execution内存区与Storage内存区配置一个静态的boundary,这种方式实现起来比较简单,但是存在一些问题: 没有一个合理的默认值能够适应不同计算场景下的Workload 内存调优困难,需要对Spark内部原理非常熟悉才能做好 对不需要Cache的Application的计算场景,只能使用很少一部分内存 为了克服上述提到的问题,尽量提高Spark计算的通用性,降低内存调优难度,减少OOM导致的失败问题,从Spark 1.6版本开始,新增了UnifiedMemoryManager(统一内存管理)内存管理模型的实现。UnifiedMemoryManager依赖的一些组件类及其关系,如下类图所示: 从上图可以看出,最直接最核心的就是StorageMemoryPool 和ExecutionMemoryPool,它们实现了动态内存池(Memory Pool)的功能,能够动态调整Storag

Apache Beam:一个开源的统一的分布式数据处理编程库

Apache Beam是一个开源的数据处理编程库,由Google贡献给Apache的项目,前不久刚刚成为Apache TLP项目。它提供了一个高级的、统一的编程模型,允许我们通过构建Pipeline的方式实现批量、流数据处理,并且构建好的Pipeline能够运行在底层不同的执行引擎上。刚刚接触该开源项目时,我的第一感觉就是:在编程API的设计上,数据集及其操作的抽象有点类似Apache Crunch(MapReduce Pipeline编程库)项目;而在支持统一数据处理模型上,能够让人想到Apache Flink项目。如果深入了解Apache Beam,你会发现未来Apache Beam很可能成为数据处理领域唯一一个能够将不同的数据应用统一起来的编程库。 Apache Beam架构概览 Apache Beam目前最新版本为0.5.0-SNAPSHOT,最新的Release版本为0.4.0,很多特性还在开发中。在网上找到一个由Andrew Psaltis在2016年6月份演讲的《Apache Beam: The Case for Unifying Streaming API’s》,引用了其中一个Apache Beam的架构图

Spark Standalone架构设计要点分析

Apache Spark是一个开源的通用集群计算系统,它提供了High-level编程API,支持Scala、Java和Python三种编程语言。Spark内核使用Scala语言编写,通过基于Scala的函数式编程特性,在不同的计算层面进行抽象,代码设计非常优秀。 RDD抽象 RDD(Resilient Distributed Datasets),弹性分布式数据集,它是对分布式数据集的一种内存抽象,通过受限的共享内存方式来提供容错性,同时这种内存模型使得计算比传统的数据流模型要高效。RDD具有5个重要的特性,如下图所示: 上图展示了2个RDD进行JOIN操作,体现了RDD所具备的5个主要特性,如下所示: 一组分区 计算每一个数据分片的函数 RDD上的一组依赖 可选,对于键值对RDD,有一个Partitioner(通常是HashPartitioner) 可选,一组Preferred location信息(例如,HDFS文件的Block所在location信息) 有了上述特性,能够非常好地通过RDD来表达分布式数据集,并作为构建DAG图的基础:首先抽象一次分布式计算任务的逻

Spark RPC通信层设计原理分析

Spark将RPC通信层设计的非常巧妙,融合了各种设计/架构模式,将一个分布式集群系统的通信层细节完全屏蔽,这样在上层的计算框架的设计中能够获得很好的灵活性。同时,如果上层想要增加各种新的特性,或者对来自不同企业或组织的程序员贡献的特性,也能够很容易地增加进来,可以避开复杂的通信层而将注意力集中在上层计算框架的处理和优化上,入手难度非常小。另外,对上层计算框架中的各个核心组件的开发、功能增强,以及Bug修复等都会变得更加容易。 Spark RPC层设计概览 Spark RPC层是基于优秀的网络通信框架Netty设计开发的,同时获得了Netty所具有的网络通信的可靠性和高效性。我们先把Spark中与RPC相关的一些类的关系梳理一下,为了能够更直观地表达RPC的设计,我们先从类的设计来看,如下图所示: 通过上图,可以清晰地将RPC设计分离出来,能够对RPC层有一个整体的印象。了解Spark RPC层的几个核心的概念(我们通过Spark源码中对应的类名来标识),能

Apache Flink:特性、概念、组件栈、架构及原理分析

Apache Flink是一个面向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同一个Flink运行时(Flink Runtime),提供支持流处理和批处理两种类型应用的功能。现有的开源计算方案,会把流处理和批处理作为两种不同的应用类型,因为他们它们所提供的SLA是完全不相同的:流处理一般需要支持低延迟、Exactly-once保证,而批处理需要支持高吞吐、高效处理,所以在实现的时候通常是分别给出两套实现方法,或者通过一个独立的开源框架来实现其中每一种处理方案。例如,实现批处理的开源方案有MapReduce、Tez、Crunch、Spark,实现流处理的开源方案有Samza、Storm。 Flink在实现流处理和批处理时,与传统的一些方案完全不同,它从另一个视角看待流处理和批处理,将二者统一起来:Flink是完全支持流处理,也就是说作为流处理看待时输入数据流是无界的;批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。基于同一个Flink运行时(Flink Runti

Flume日志收集分层架构应用实践

Flume作为一个日志收集工具,非常轻量级,基于一个个Flume Agent,能够构建一个很复杂很强大的日志收集系统,它的灵活性和优势,主要体现在如下几点: 模块化设计:在其Flume Agent内部可以定义三种组件:Source、Channel、Sink 组合式设计:可以在Flume Agent中根据业务需要组合Source、Channel、Sink三种组件,构建相对复杂的日志流管道 插件式设计:可以通过配置文件来编排收集日志管道的流程,减少对Flume代码的侵入性 可扩展性:我们可以根据自己业务的需要来定制实现某些组件(Source、Channel、Sink) 支持集成各种主流系统和框架:像Hadoop、HBase、Hive、Kafka、ElasticSearch、Thrift、Avro等,都能够很好的和Flume集成 高级特性:Failover、Load balancing、Interceptor等 有关Flume的相关内容,可以参考官网文档,或者通过阅读我之前写的文章《Flume(NG)架构设计要点及配置实践》来快速了解。 为什么要对Flume日志收集系统进行分层设计 基于Fl

Apache Storm内部原理分析

本文算是个人对Storm应用和学习的一个总结,由于不太懂Clojure语言,所以无法更多地从源码分析,但是参考了官网、好多朋友的文章,以及《Storm Applied: Strategies for real-time event processing》这本书,以及结合自己使用Storm的经历,希望对于想深入一点了解Storm原理的朋友能有所帮助,有不足之处欢迎拍砖交流。 Storm集群架构 Storm集群采用主从架构方式,主节点是Nimbus,从节点是Supervisor,有关调度相关的信息存储到ZooKeeper集群中,架构如下图所示: 具体描述,如下所示: Nimbus Storm集群的Master节点,负责分发用户代码,指派给具体的Supervisor节点上的Worker节点,去运行Topology对应的组件(Spout/Bolt)的Task。 Supervisor Storm集群的从节点,负责管理运行在Supervisor节点上的每一个Worker进程的启动和终止。通过Storm的配置文件中的supervisor.slots.ports配置项,可以指定在一个Supervisor上最大允许多少个Slot,每个Slot通过

MapReduce V1:MapTask执行流程分析

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程。 在文章《MapReduce V1:TaskTracker设计要点概要分析》中我们已经了解了org.apache.hadoop.mapred.Child启动的基本流程,在Child VM启动的过程中会运行MapTask,实际是运行用户编写的MapReduce程序中的map方法中的处理逻辑,我们首先看一下,在Child类中,Child基于TaskUmbilicalProtocol协议与TaskTracker通信,获取到该Child VM需要加载的Task相关数据,包括Task本身,代码如下所示: final TaskUmbilicalProtocol umbilical = taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() { @Override public TaskUmbilicalProtocol run() throws Exception { // 建立Child到TaskTracker的RPC连接 return (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class, TaskUmbilicalProtocol.versionID,

基于协同过滤的推荐方法

协同过滤(Collaborative Filtering, CF)是推荐系统广泛使用的一种技术,它主要通过考虑用户(User)与用户之间、物品(Item)与物品之间的相似度(Similarity),来向用户推荐物品,常被用在电商网站中。其中,在推荐系统中最常使用的协同过滤方法,有如下4种: 基于用户的协同过滤推荐 基于物品的协同过滤推荐 基于模型的协同过滤推荐 混合协同过滤推荐 上面4种方法中,基于用户的协同过滤推荐、基于物品的协同过滤推荐都是基于内存的协同过滤推荐,一般在数据量较小的应用场景下,可以直接在线使用的实时推荐方法;基于模型的协同过滤推荐一般用于离线计算,它采用机器学习的方法,一般首相将用户偏好行为数据分成2个数据集(有时可能会将数据集分成k个子集,采用交叉验证的方式来提高模型精度),一个为训练集,一个为测试集,使用训练集数据来训练出推荐模型,然后使用测试集数据来评估模型的精度,当满足特定精度时,可以将得到的推荐模型应用于实