Azkaban 集群内部调度原理分析

Azkaban 是一个非常简单实用,而且开源的作业调度系统。在 2.x 版本中不支持集群模式部署,在 3.x 版本中支持集群模式部署,适用于作业量比较大一些的应用场景。有关 Azkaban 更多详细信息,如特点、功能、特性、作业定义等,可以参考官方文档,这里不再详述。 Azkaban 集群架构 下面我们看一下 Azkaban 集群模式的架构,如下图所示: 从上图可见,Azkaban 集群部署模式,主要有 3 个核心的组件: Azkaban WebServer Azkaban WebServer,是整个调度集群的核心,负责所有作业的管理和调度。 Azkaban ExecutorServer Azkaban ExecutorServer,整个调度集群中实际运行作业的节点,该类节点可能是作为一个作业提交的客户端,比如 Spark on YARN 部署模式下,cluster 运行模式时只作为客户端使用,client 运行模式时会有部分计算逻辑;比如普通的 Java 程序需要处理量级较小的数据作业,这时Executor Server 节点可能有较大的工作负载,占用较多节点资源(内存、CPU)。 DB DB,是集群中所有节点运行共用的数据存储,包含作业信息、各种调度元数据等等。 核心调度

Spark Join 处理流程分析

为了更好的分析 Spark Join 处理流程,我们选择具有 Shuffle 操作的示例来进行说明,这比没有 Shuffle 操作的处理流程要复杂一些。本文主要通过实现一个 Join 操作的 Spark 程序,提交运行该程序,并通过 Spark UI 上的各种运行信息来讨论 Spark Join 处理流程。 Spark Join 示例程序 我们先给出一个简单的 Spark Application 程序代码,这里处理的数据使用了 MovieLens 数据集。其中,小表 movies(约 1.4m)、大表 genome-scores(约 323.5m),对这两个表进行 Join 操作。具体的实现代码,如下所示: def main(args: Array[String]): Unit = { val sc = new SparkContext() // movieId,title,genres val movieRdd = sc.textFile("/data/ml-20m/movies.csv") .filter(line => !line.startsWith("movieId")) .map(line => line.split(",")) .map(a => (a(0), a(1))) // movieId,tagId,relevance val scoreRdd = sc.textFile("/data/ml-20m/genome-sc

Spark Shuffle过程分析:Reduce阶段处理流程

Spark在Map阶段调度运行的ShuffleMapTask,最后会生成.data和.index文件,可以通过我的这篇文章 Spark Shuffle过程分析:Map阶段处理流程 了解具体流程和详情。同时,在Executor上运行一个ShuffleMapTask,返回了一个MapStatus对象,下面是ShuffleMapTask执行后返回结果的相关代码片段: var writer: ShuffleWriter[Any, Any] = null try { val manager = SparkEnv.get.shuffleManager writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) writer.stop(success = true).get } catch { case e: Exception => try { if (writer != null) { writer.stop(success = false) } } catch { case e: Exception => log.debug("Could not stop writer", e) } throw e

Apache Flink:使用EventTime与WaterMark进行流数据处理

在实际开发过程中,我们可能需要接入各种流数据源,比如在线业务用户点击流数据、监控系实时收集到的事件流数据、从传感器采集到的实时数据,等等,为了处理方便他们可能会写入Kafka消息中间件集群中某个/某些topic中,或者选择其它的缓冲/存储系统。这些数据源中数据元素具有固定的时间属性,是在流数据处理系统之外的其它系统生成的。比如,上亿用户通过手机终端操作触发生成的事件数据,都具有对应的事件时间;再特殊一点,可能我们希望回放(Replay)上一年手机终端用户的历史行为数据,与当前某个流数据集交叉分析才能够得到支持某类业务的特定结果,这种情况下,基于数据所具有的事件时间进行处理,就具有很重要的意义了。 下面,我们先从Flink支持的3个与流数据处理相关的时间概念(Time Notion):ProcessTime、EventTime、IngestionTime。有些系统对时间概念的抽象有其它叫法,比如,Google Cloud Dataflow中称为时间域(Time Domain)。在Flink中,基于不同的Time Notion来处理流数据,具有不同的意义和结果,所以了解这3个Time Notion非常关键。 Time No

Apache Flink:Keyed Window与Non-Keyed Window

Apache Flink中,Window操作在流式数据处理中是非常核心的一种抽象,它把一个无限流数据集分割成一个个有界的Window(或称为Bucket),然后就可以非常方便地定义作用于Window之上的各种计算操作。本文我们主要基于Apache Flink 1.4.0版本,说明Keyed Window与Non-Keyed Window的基本概念,然后分别对与其相关的WindowFunction与WindowAllFunction的类设计进行分析,最后通过编程实践来应用。 基本概念 Flink将Window分为两类,一类叫做Keyed Window,另一类叫做Non-Keyed Window。为了说明这两类Window的不同,我们看下Flink官网给出的,基于这两种类型的Window编写代码的结构说明。 基于Keyed Window进行编程,用户代码基本结构如下所示: stream .keyBy(...) <- keyed versus Non-Keyed windows .window(...) <- required: "assigner" [.trigger(...)] <- optional: "trigger" (else default trigger) [.evictor(...)] <- optional: &quo

Nexus Repository OSS 3安装配置使用

Nexus Repository OSS 3是一个开源的仓库管理系统,提供了更加丰富的功能,而且安装、配置、使用起来也更加简单方便。OSS 3版本主要支持的仓库(Repository)包括如下: bower docker maven npm nuget pypi raw rubygems yum 其中,对于上述每种类型的Nexus仓库,都分别具有如下主要3种类型: hosted:本地仓库,可以将我们内部使用的一些Maven项目,发布到该类型仓库,供内部开发人员使用。 proxy:代理仓库,用来代理远程公共仓库,比如Maven中央仓库。 group:仓库组,用来合并多个类型(hosted/proxy)的仓库。 这里,我们主要以支持Java编程的Maven项目依赖管理和构建进行实践,Nexus版本为nexus-3.7.0-04。 安装配置 下载Nexus Repository Manager软件包: wget https://sonatype-download.global.ssl.fastly.net/nexus/3/nexus-3.7.0-04-unix.tar.gz tar xvzf nexus-3.7.0-04-unix.tar.gz 解压缩后可以看到,生成nexus-3.7.0-04和sonatype-work两个目录: [root@ali-bj01-tst-cluster-004 nexus]# ls nexus-3.7.0-04 sonatype-work 是这两个目录

Apache Flink 1.4.0:Standalone集群模式实践

Apache Flink是一个开源的流处理框架,提供了分布式的、高性能的、高可用的特性,同时能够为流式应用程序提供多种编程语言的API,更多有关Flink的内容,不再累述,请参考官方文档,本文主要以Flink Standalone集群安装配置、编程实践为主。 Flink集群安装配置 首先,选择一主二从共3个节点来安装配置Flink Standalone集群: Master:ali-bj01-tst-cluster-001.xiweiai.cn Worker:ali-bj01-tst-cluster-002.xiweiai.cn Worker:ali-bj01-tst-cluster-003.xiweiai.cn 为了方便安装文件远程拷贝,单独创建一个hadoop用户,并打通从Master节点使用ssh到Worker节点之间的免密码登录。 在Master节点上下载、准备Flink 1.4.0安装文件,执行如下命令: wget http://mirror.bit.edu.cn/apache/flink/flink-1.4.0/flink-1.4.0-bin-hadoop26-scala_2.11.tgz tar xvzf flink-1.4.0-bin-hadoop26-scala_2.11.tgz 接着,修改Flink配置文件flink-1.4.0/conf/flink-conf.yaml,修改后的内容如下所示: env.java.home: /usr/local/java/jdk1.8.0_131/ jobmanager.rpc.

使用Apache Beam实现实时监控统计

网站使用Apache服务器,对于请求网站资源的事件被记录到日志中,我们需要基于该日志文件的数据进行实时监控统计,通过读取IP库数据可以得到每个访问IP的所属地域(国内按城市,国外按国家,IP库中没有的按未知处理)。其实,整个流程我们可以通过Flume收集聚合多个子站日志文件数据,并写入到下游的Kafka消息中间件集群中,然后可以直接从Kafka中进行消费,实现实时监控统计,最后结果更新到Redis中去。 为了简单,我们这里只是通过输入的日志文件作为数据源,下游直接通过Apache Beam来进行实时分析处理,结果输出到多个按时间分组的文件中。我们实现的实时监控功能目标,如下所示: 输入事件日志文件,以及IP库文件; 基于日志文件中的事件时间,每间隔5分钟输出一个统计文件,结果文件中包含“地域”和“访问次数”。 下面是文件格式示例。 事件日志文件的格式,示例如下所示: 113.246.155.26 - - [10/Dec/2017:01:03:28 +0800] "GET /wp-content/themes/media-maven/library/images/bg.jpg HTTP/1.1" 200 8113 "http://shiyanjun.cn/archiv

基于YARN集群构建运行PySpark Application

Spark Application可以直接运行在YARN集群上,这种运行模式,会将资源的管理与协调统一交给YARN集群去处理,这样能够实现构建于YARN集群之上Application的多样性,比如可以运行MapReduc程序,可以运行HBase集群,也可以运行Storm集群,还可以运行使用Python开发机器学习应用程序,等等。 我们知道,Spark on YARN又分为client模式和cluster模式:在client模式下,Spark Application运行的Driver会在提交程序的节点上,而该节点可能是YARN集群内部节点,也可能不是,一般来说提交Spark Application的客户端节点不是YARN集群内部的节点,那么在客户端节点上可以根据自己的需要安装各种需要的软件和环境,以支撑Spark Application正常运行。在cluster模式下,Spark Application运行时的所有进程都在YARN集群的NodeManager节点上,而且具体在哪些NodeManager上运行是由YARN的调度策略所决定的。 对比这两种模式,最关键的是Spark Application运行时Driver所在的节点不同,而且,如果想要对Driver所在节点的运行环境进行配置,区别很大,但这对于PySpark Application运行

CDH-5.7.0:基于Parcels方式离线安装配置

CDH是Cloudera公司提供的Hadoop发行版,它在原生开源的Apache Hadoop基础之上,针对特定版本的Hadoop以及Hadoop相关的软件,如Zookeeper、HBase、Flume、Sqoop等做了兼容性开发,我们在安装CDH发行版的Hadoop时就无需进行额外繁琐的兼容性测试。 以往安装配置使用Apache Hadoop时,完全需要手动在服务器上,通过命令和脚本进行安装配置,比较复杂而繁琐。使用CDH,我们可以通过Cloudera提供的CM(Cloudera Manager)来进行安装,CM是一个面向Hadoop相关软件的强大SCM工具,它提供了通过Web界面向导的方式进行软件的安装配置,此外还提供了比较基础、友好的监控、预警功能,通过Web UI展示各种已安装软件的资源使用情况、系统运行状态等等。 如果使用CM来管理CDH平台,因为CM使用了监控管理、运行状态数据采集、预警等等很多服务,所以在集群服务器资源使用方面也会比通常的Apache Hadoop版本多很多,如果所需要的Hadoop集群规模超大,比如成百上千个节点,使用CM来安装管理CDH集群能够节省大量时间,而且节省了对整个集群基本的监控的配置管理;如果集群规模比较小,

PB 级海量数据服务平台架构设计实践

基于 PB 级海量数据实现数据服务平台,需要从各个不同的角度去权衡,主要包括实践背景、技术选型、架构设计,我们基于这三个方面进行了架构实践,下面分别从这三个方面进行详细分析讨论: 实践背景 该数据服务平台架构设计之初,实践的背景可以从三个维度来进行说明:当前现状、业务需求、架构需求,分别如下所示: 当前现状 收集了当前已有数据、分工、团队的一些基本情况,如下所示: 数据收集和基础数据加工有专门的 Team 在做,我们是基于收集后并进行过初步加工的基础数据,结合不同行业针对特定数据的需求进行二次加工的。 数据二次加工,会集成基础数据之外的其它有业务属性的数据,比如引入第三方 POI 数据等。 原始数据每天增量大约 30~40TB 左右。 计算集群采用 Spark on YARN 部署模式,大约 400 个节点。 所有数据各种属性、行为信息,都是围绕大约 40亿+ 的移动设备 ID 进行很多倍膨胀,比如每天使用微信 App 的设备的行为信息。 参与该平台的研发人员,对实际数据业务需求了解不会非常深入,因为跨多个行业及其不同数据需求的变化较快。 业务需求 另

基于Spark ML Pipeline构建机器学习应用

使用机器学习的方法可以解决越来越多的实际问题,它在现实世界中的应用越来越广泛,比如智能风控、欺诈检测、个性化推荐、机器翻译、模式识别、智能控制,等等。 机器学习分类 我们都知道,机器学习可以分为三大类:监督学习(Supervised Learning)、无监督学习(Unsupervised Learning)和强化学习(Reinforcement Learning),下面简单介绍一下它们含义: 监督学习 监督学习是根据给定的标签(Label)已知的训练数据集,通过选定的算法在该训练数据集上进行训练学习,最后得到一个可以描述该数据集规律的预测函数,也就是我们所说的模型。有了模型,对于未知标签的输入数据,可以通过该预测函数预测出它的标签。典型的监督学习方法,如分类、回归等。 无监督学习 无监督学习是根据给定的标签(Label)未知的训练数据集,通过训练学习从训练数据集中发现隐藏的模式或结构。典型的无监督学习方法,如聚类分析。 强化学习 强化学习是人工智能中的策略学习的一种,从动物学习、参数扰动自适应控制理论发展而来。这种学习方法是从环境状态到动作映射的学习方法,