基于YARN集群构建运行PySpark Application

Spark Application可以直接运行在YARN集群上,这种运行模式,会将资源的管理与协调统一交给YARN集群去处理,这样能够实现构建于YARN集群之上Application的多样性,比如可以运行MapReduc程序,可以运行HBase集群,也可以运行Storm集群,还可以运行使用Python开发机器学习应用程序,等等。 我们知道,Spark on YARN又分为client模式和cluster模式:在client模式下,Spark Application运行的Driver会在提交程序的节点上,而该节点可能是YARN集群内部节点,也可能不是,一般来说提交Spark Application的客户端节点不是YARN集群内部的节点,那么在客户端节点上可以根据自己的需要安装各种需要的软件和环境,以支撑Spark Application正常运行。在cluster模式下,Spark Application运行时的所有进程都在YARN集群的NodeManager节点上,而且具体在哪些NodeManager上运行是由YARN的调度策略所决定的。 对比这两种模式,最关键的是Spark Application运行时Driver所在的节点不同,而且,如果想要对Driver所在节点的运行环境进行配置,区别很大,但这对于PySpark Application运行

CDH-5.7.0:基于Parcels方式离线安装配置

CDH是Cloudera公司提供的Hadoop发行版,它在原生开源的Apache Hadoop基础之上,针对特定版本的Hadoop以及Hadoop相关的软件,如Zookeeper、HBase、Flume、Sqoop等做了兼容性开发,我们在安装CDH发行版的Hadoop时就无需进行额外繁琐的兼容性测试。 以往安装配置使用Apache Hadoop时,完全需要手动在服务器上,通过命令和脚本进行安装配置,比较复杂而繁琐。使用CDH,我们可以通过Cloudera提供的CM(Cloudera Manager)来进行安装,CM是一个面向Hadoop相关软件的强大SCM工具,它提供了通过Web界面向导的方式进行软件的安装配置,此外还提供了比较基础、友好的监控、预警功能,通过Web UI展示各种已安装软件的资源使用情况、系统运行状态等等。 如果使用CM来管理CDH平台,因为CM使用了监控管理、运行状态数据采集、预警等等很多服务,所以在集群服务器资源使用方面也会比通常的Apache Hadoop版本多很多,如果所需要的Hadoop集群规模超大,比如成百上千个节点,使用CM来安装管理CDH集群能够节省大量时间,而且节省了对整个集群基本的监控的配置管理;如果集群规模比较小,

PB 级海量数据服务平台架构设计实践

基于 PB 级海量数据实现数据服务平台,需要从各个不同的角度去权衡,主要包括实践背景、技术选型、架构设计,我们基于这三个方面进行了架构实践,下面分别从这三个方面进行详细分析讨论: 实践背景 该数据服务平台架构设计之初,实践的背景可以从三个维度来进行说明:当前现状、业务需求、架构需求,分别如下所示: 当前现状 收集了当前已有数据、分工、团队的一些基本情况,如下所示: 数据收集和基础数据加工有专门的 Team 在做,我们是基于收集后并进行过初步加工的基础数据,结合不同行业针对特定数据的需求进行二次加工的。 数据二次加工,会集成基础数据之外的其它有业务属性的数据,比如引入第三方 POI 数据等。 原始数据每天增量大约 30~40TB 左右。 计算集群采用 Spark on YARN 部署模式,大约 400 个节点。 所有数据各种属性、行为信息,都是围绕大约 40亿+ 的移动设备 ID 进行很多倍膨胀,比如每天使用微信 App 的设备的行为信息。 参与该平台的研发人员,对实际数据业务需求了解不会非常深入,因为跨多个行业及其不同数据需求的变化较快。 业务需求 另

基于Spark ML Pipeline构建机器学习应用

使用机器学习的方法可以解决越来越多的实际问题,它在现实世界中的应用越来越广泛,比如智能风控、欺诈检测、个性化推荐、机器翻译、模式识别、智能控制,等等。 机器学习分类 我们都知道,机器学习可以分为三大类:监督学习(Supervised Learning)、无监督学习(Unsupervised Learning)和强化学习(Reinforcement Learning),下面简单介绍一下它们含义: 监督学习 监督学习是根据给定的标签(Label)已知的训练数据集,通过选定的算法在该训练数据集上进行训练学习,最后得到一个可以描述该数据集规律的预测函数,也就是我们所说的模型。有了模型,对于未知标签的输入数据,可以通过该预测函数预测出它的标签。典型的监督学习方法,如分类、回归等。 无监督学习 无监督学习是根据给定的标签(Label)未知的训练数据集,通过训练学习从训练数据集中发现隐藏的模式或结构。典型的无监督学习方法,如聚类分析。 强化学习 强化学习是人工智能中的策略学习的一种,从动物学习、参数扰动自适应控制理论发展而来。这种学习方法是从环境状态到动作映射的学习方法,

Kubernetes基础篇:主要特性、基本概念与总体架构

本文试图将Kubernetes的基础相关知识描述清楚,让一个从来没有Kubernetes实践的开发人员,能够非常容易地理解Kubernetes是什么,能够做哪些事情,以及使用它能带来的好处是什么。 Kubernetes是什么 Kubernetes是一个开源的容器编排引擎,它支持自动化部署、大规模可伸缩、应用容器化管理。我们在完成一个应用程序的开发时,需要冗余部署该应用的多个实例,同时需要支持对应用的请求进行负载均衡,在Kubernetes中,我们可以把这个应用的多个实例分别启动一个容器,每个容器里面运行一个应用实例,然后通过内置的负载均衡策略,实现对这一组应用实例的管理、发现、访问,而这些细节都不需要应用开发和运维人员去进行复杂的手工配置和处理。 Kubernetes是Google基于内部Borg开源的容器编排引擎,关于Borg的设计,可以查看对应的论文《Large-scale cluster management at Google with Borg》。这里,我们先说一说Borg系统,它是Google内部的集群管理系统,使用它能够获得如下好处: 在一个分布式系统中进行资源管理是非常复杂的,构建于Borg系统之上的其他系统,无需关

Spark Shuffle过程分析:Map阶段处理流程

默认配置情况下,Spark在Shuffle过程中会使用SortShuffleManager来管理Shuffle过程中需要的基本组件,以及对RDD各个Partition数据的计算。我们可以在Driver和Executor对应的SparkEnv对象创建过程中看到对应的配置,如下代码所示: // Let the user specify short names for shuffle managers val shortShuffleMgrNames = Map( "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName) val shuffleMgrName = conf.get("spark.shuffle.manager", "sort") val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass) 如果需要修改ShuffleManager实现,则只需要修改配置项spark.shuffle.manager即可,默认支持sort和 tungsten-sort,可以指

Spark Block 存储管理分析

Apache Spark 中,对 Block 的查询、存储管理,是通过唯一的 Block ID 来进行区分的。所以,了解 Block ID 的生成规则,能够帮助我们了解 Block 查询、存储过程中是如何定位 Block 以及如何处理互斥存储/读取同一个 Block 的。可以想到,同一个 Spark Application,以及多个运行的 Application 之间,对应的 Block 都具有唯一的 ID,通过代码可以看到,BlockID 包括:RDDBlockId、ShuffleBlockId、ShuffleDataBlockId、ShuffleIndexBlockId、BroadcastBlockId、TaskResultBlockId、TempLocalBlockId、TempShuffleBlockId 这 8 种 ID,可以详见如下代码定义: @DeveloperApi case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId { override def name: String = "rdd_" + rddId + "_" + splitIndex } // Format of the shuffle block ids (including data and index) should be kept in sync with // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData(). @DeveloperApi case class Sh

Docker Swarm 架构、特性与基本实践

Docker 集群管理和编排的特性是通过 SwarmKit 进行构建的, 其中 Swarm mode 是 Docker Engine 内置支持的一种默认实现。Docker 1.12 以及更新的版本,都支持 Swarm mode,我们可以基于 Docker Engine 来构建 Swarm 集群,然后就可以将我们的应用服务(Application Service)部署到 Swarm 集群中。创建 Swarm 集群的方式很简单,先初始化一个 Swarm 集群,然后将其他的 Node 加入到该集群即可。本文主要基于 Docker Swarm 官网文档,学习总结。 基本特性 Docker Swarm 具有如下基本特性: 集群管理集成进 Docker Engine 使用内置的集群管理功能,我们可以直接通过 Docker CLI 命令来创建 Swarm 集群,然后去部署应用服务,而不再需要其它外部的软件来创建和管理一个 Swarm 集群。 去中心化设计 Swarm 集群中包含 Manager 和 Worker 两类 Node,我们可以直接基于 Docker Engine 来部署任何类型的 Node。而且,在 Swarm 集群运行期间,我们既可以对其作出任何改变,实现对集群的扩容和缩容等,如添加 Manager Node,如删除 Worker Node,而做这些操作不需要暂停或

CentOS 7 Docker基本特性入门实践

Docker是一个开源的应用容器引擎,开发人员可以非常容易地打包已经开发好的应用,同时将应用相关的依赖包也打包到这样一个可移植的容器中,然后发布到任意的Linux主机系统上。Docker是基于Linux Container(LXC)技术实现的一个轻量级虚拟化解决方案,用户可以直接使用容器(Container),来构建自己的应用程序,应用开发人员无需将注意力集中在容器的管理上。Docker的目标是“Build, Ship and Run Any App, Anywhere”,这说明了使用Docker能够实现应用运行的可移植性、便捷性,对开发人员非常友好,只要你的应用是基于Docker进行构建和部署的,在任何时候任何支持Docker的Linux发行版操作系统上都可以运行你的应用程序。 Docker是基于Go语言开发的, 代码开源,可以在Github上查看对应的源码:https://github.com/docker/docker.git。 基本构架 Docker基于Client-Server架构,Docker daemon是服务端,Docker client是客户端。Docker的基本架构,如下图所示: 上图中,除了展现了Docker的Client、Server、Containers、Images、Registry之间的关系,我们主要说明Do

Spring Cloud Netflix构建微服务入门实践

在使用Spring Cloud Netflix构建微服务之前,我们先了解一下Spring Cloud集成的Netflix OSS的基础组件Eureka,对于Netflix的其他微服务组件,像Hystrix、Zuul、Ribbon等等本文暂不涉及,感兴趣可以参考官网文档。这里,我们用最基础的Eureka来构建一个最基础的微服务应用,来演示如何构建微服务,了解微服务的基本特点。 Eureka Eureka是Netflix开源的一个微服务注册组件,提供服务发现特性,它是一个基于REST的服务,主要具有如下功能: 支持服务注册和发现 具有Load Balance和Failover的功能 在进行服务调用过程中,无需知道目标服务的主机(IP)和端口,只要知道服务名就可以实现调用 通过Netfix在Github上的文档,我们看一下Eureka的基本架构,如下图所示: Eureka主要包含如下两个核心组件: Eureka Server Eureka Server是服务注册的服务端组件,负责管理Eureka Client注册的服务,提供服务发现的功能。它支持集群模式部署,集群部署模式中,多个Eureka Server之间会同步服务注册数据,能够保证某一个Eureka Server因为故障挂掉,仍能对外提供注册服务的

Spark UnifiedMemoryManager 内存管理模型分析

Spark 的内存使用,大体上可以分为两类:Execution 内存和 Storage 内存。在 Spark 1.5 版本之前,内存管理使用的是 StaticMemoryManager,该内存管理模型最大的特点就是,可以为 Execution 内存区与 Storage 内存区配置一个静态的 boundary,这种方式实现起来比较简单,但是存在一些问题: 没有一个合理的默认值能够适应不同计算场景下的 Workload 内存调优困难,需要对 Spark 内部原理非常熟悉才能做好 对不需要 Cache 的 Application 的计算场景,只能使用很少一部分内存 为了克服上述提到的问题,尽量提高 Spark 计算的通用性,降低内存调优难度,减少 OOM 导致的失败问题,从 Spark 1.6 版本开始,新增了 UnifiedMemoryManager(统一内存管理)内存管理模型的实现。UnifiedMemoryManager 依赖的一些组件类及其关系,如下类图所示: 从上图可以看出,最直接最核心的就是 StorageMemoryPool 和 ExecutionMemoryPool,它们实现了动态内存池(Memory Pool)的功能,能够动态调整 Storage 内存区与 Execution 内存区之间的 Soft boundary,使内存管理更加灵活。下

Apache Beam:一个开源的统一的分布式数据处理编程库

Apache Beam 是一个开源的数据处理编程库,由 Google 贡献给 Apache 的项目,前不久刚刚成为 Apache TLP 项目。它提供了一个高级的、统一的编程模型,允许我们通过构建 Pipeline 的方式实现批量、流数据处理,并且构建好的 Pipeline 能够运行在底层不同的执行引擎上。刚刚接触该开源项目时,我的第一感觉就是:在编程 API 的设计上,数据集及其操作的抽象有点类似Apache Crunch(MapReduce Pipeline编程库)项目;而在支持统一数据处理模型上,能够让人想到 Apache Flink 项目。如果深入了解 Apache Beam,你会发现未来 Apache Beam 很可能成为数据处理领域唯一一个能够将不同的数据应用统一起来的编程库。 Apache Beam 架构概览 Apache Beam 目前最新版本为 0.5.0-SNAPSHOT,最新的 Release 版本为 0.4.0,很多特性还在开发中。在网上找到一个由 Andrew Psaltis 在 2016 年 6 月份演讲的《Apache Beam: The Case for Unifying Streaming API’s》,引用了其中一个 Apache Beam 的架构图,如下图所示: 上图中,我们可以看到,Apache Beam 核心的主要有两层: