Spark Shuffle过程分析:Reduce阶段处理流程

Spark在Map阶段调度运行的ShuffleMapTask,最后会生成.data和.index文件,可以通过我的这篇文章 Spark Shuffle过程分析:Map阶段处理流程 了解具体流程和详情。同时,在Executor上运行一个ShuffleMapTask,返回了一个MapStatus对象,下面是ShuffleMapTask执行后返回结果的相关代码片段: var writer: ShuffleWriter[Any, Any] = null try { val manager = SparkEnv.get.shuffleManager writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) writer.stop(success = true).get } catch { case e: Exception => try { if (writer != null) { writer.stop(success = false) } } catch { case e: Excep

Apache Flink:使用EventTime与WaterMark进行流数据处理

在实际开发过程中,我们可能需要接入各种流数据源,比如在线业务用户点击流数据、监控系实时收集到的事件流数据、从传感器采集到的实时数据,等等,为了处理方便他们可能会写入Kafka消息中间件集群中某个/某些topic中,或者选择其它的缓冲/存储系统。这些数据源中数据元素具有固定的时间属性,是在流数据处理系统之外的其它系统生成的。比如,上亿用户通过手机终端操作触发生成的事件数据,都具有对应的事件时间;再特殊一点,可能我们希望回放(Replay)上一年手机终端用户的历史行为数据,与当前某个流数据集交叉分析才能够得到支持某类业务的特定结果,这种情况下,基于数据所具有的事件时间进行处理,就具有很重要的意义了。 下面,我们先从Flink支持的3个与流数据处理相关的时间概念(Time Notion):ProcessTime、EventTime、IngestionTime。有些系统对时间概念的抽象有其它叫法,比如,Google Cloud Dataflow中称为时间域(Time Domain)。在Flink中,

Apache Flink:Keyed Window与Non-Keyed Window

Apache Flink中,Window操作在流式数据处理中是非常核心的一种抽象,它把一个无限流数据集分割成一个个有界的Window(或称为Bucket),然后就可以非常方便地定义作用于Window之上的各种计算操作。本文我们主要基于Apache Flink 1.4.0版本,说明Keyed Window与Non-Keyed Window的基本概念,然后分别对与其相关的WindowFunction与WindowAllFunction的类设计进行分析,最后通过编程实践来应用。 基本概念 Flink将Window分为两类,一类叫做Keyed Window,另一类叫做Non-Keyed Window。为了说明这两类Window的不同,我们看下Flink官网给出的,基于这两种类型的Window编写代码的结构说明。 基于Keyed Window进行编程,用户代码基本结构如下所示: stream .keyBy(...) <- keyed versus Non-Keyed windows .window(...) <- required: "assigner" [.trigger(...)] <- optional

Nexus Repository OSS 3安装配置使用

Nexus Repository OSS 3是一个开源的仓库管理系统,提供了更加丰富的功能,而且安装、配置、使用起来也更加简单方便。OSS 3版本主要支持的仓库(Repository)包括如下: bower docker maven npm nuget pypi raw rubygems yum 其中,对于上述每种类型的Nexus仓库,都分别具有如下主要3种类型: hosted:本地仓库,可以将我们内部使用的一些Maven项目,发布到该类型仓库,供内部开发人员使用。 proxy:代理仓库,用来代理远程公共仓库,比如Maven中央仓库。 group:仓库组,用来合并多个类型(hosted/proxy)的仓库。 这里,我们主要以支持Java编程的Maven项目依赖管理和构建进行实践,Nexus版本为nexus-3.7.0-04。 安装配置 下载Nexus Repository Manager软件包: wget https://sonatype-download.global.ssl.fastly.net/nexus/3/nexus-3.7.0-04-unix.tar.gz tar xvzf nexus-3.7.0-04-unix.tar.gz 解压缩后可以看到,生成nexus-3.7.0-04和sonatype-wor

Apache Flink 1.4.0:Standalone集群模式实践

Apache Flink是一个开源的流处理框架,提供了分布式的、高性能的、高可用的特性,同时能够为流式应用程序提供多种编程语言的API,更多有关Flink的内容,不再累述,请参考官方文档,本文主要以Flink Standalone集群安装配置、编程实践为主。 Flink集群安装配置 首先,选择一主二从共3个节点来安装配置Flink Standalone集群: Master:ali-bj01-tst-cluster-001.xiweiai.cn Worker:ali-bj01-tst-cluster-002.xiweiai.cn Worker:ali-bj01-tst-cluster-003.xiweiai.cn 为了方便安装文件远程拷贝,单独创建一个hadoop用户,并打通从Master节点使用ssh到Worker节点之间的免密码登录。 在Master节点上下载、准备Flink 1.4.0安装文件,执行如下命令: wget http://mirror.bit.edu.cn/apache/flink/flink-1.4.0/flink-1.4.0-bin-hadoop26-scala_2.11.tgz tar xvzf flink-1.4.0-bin-hadoop26-scala_2.11.tgz 接着,修改Flink配置文件flink-1.4.0/conf/fli

使用Apache Beam实现实时监控统计

网站使用Apache服务器,对于请求网站资源的事件被记录到日志中,我们需要基于该日志文件的数据进行实时监控统计,通过读取IP库数据可以得到每个访问IP的所属地域(国内按城市,国外按国家,IP库中没有的按未知处理)。其实,整个流程我们可以通过Flume收集聚合多个子站日志文件数据,并写入到下游的Kafka消息中间件集群中,然后可以直接从Kafka中进行消费,实现实时监控统计,最后结果更新到Redis中去。 为了简单,我们这里只是通过输入的日志文件作为数据源,下游直接通过Apache Beam来进行实时分析处理,结果输出到多个按时间分组的文件中。我们实现的实时监控功能目标,如下所示: 输入事件日志文件,以及IP库文件; 基于日志文件中的事件时间,每间隔5分钟输出一个统计文件,结果文件中包含“地域”和“访问次数”。 下面是文件格式示例。 事件日志文件的格式,示例如下所示: 113.246.155.26 - - [10/Dec/2017:01:03:28 +0800] "GET /wp-conte

基于YARN集群构建运行PySpark Application

Spark Application可以直接运行在YARN集群上,这种运行模式,会将资源的管理与协调统一交给YARN集群去处理,这样能够实现构建于YARN集群之上Application的多样性,比如可以运行MapReduc程序,可以运行HBase集群,也可以运行Storm集群,还可以运行使用Python开发机器学习应用程序,等等。 我们知道,Spark on YARN又分为client模式和cluster模式:在client模式下,Spark Application运行的Driver会在提交程序的节点上,而该节点可能是YARN集群内部节点,也可能不是,一般来说提交Spark Application的客户端节点不是YARN集群内部的节点,那么在客户端节点上可以根据自己的需要安装各种需要的软件和环境,以支撑Spark Application正常运行。在cluster模式下,Spark Application运行时的所有进程都在YARN集群的NodeManager节点上,而且具体在哪些NodeManager上运行是由YARN的调度策略所决定的。 对比这两种模式,最关键的是Spark Application运行时Driver所在的节

CDH-5.7.0:基于Parcels方式离线安装配置

CDH是Cloudera公司提供的Hadoop发行版,它在原生开源的Apache Hadoop基础之上,针对特定版本的Hadoop以及Hadoop相关的软件,如Zookeeper、HBase、Flume、Sqoop等做了兼容性开发,我们在安装CDH发行版的Hadoop时就无需进行额外繁琐的兼容性测试。 以往安装配置使用Apache Hadoop时,完全需要手动在服务器上,通过命令和脚本进行安装配置,比较复杂而繁琐。使用CDH,我们可以通过Cloudera提供的CM(Cloudera Manager)来进行安装,CM是一个面向Hadoop相关软件的强大SCM工具,它提供了通过Web界面向导的方式进行软件的安装配置,此外还提供了比较基础、友好的监控、预警功能,通过Web UI展示各种已安装软件的资源使用情况、系统运行状态等等。 如果使用CM来管理CDH平台,因为CM使用了监控管理、运行状态数据采集、预警等等很多服务,所以在集群服务器资源使用方面也会比通常的Apache Hadoop版本多很多,如果所需要的Hadoop集群规模超大,比如成百上千个节点,使

PB级海量数据服务平台架构设计实践

基于PB级海量数据实现数据服务平台,需要从各个不同的角度去权衡,主要包括实践背景、技术选型、架构设计,我们基于这三个方面进行了架构实践,下面分别从这三个方面进行详细分析讨论: 实践背景 该数据服务平台架构设计之初,实践的背景可以从三个维度来进行说明:当前现状、业务需求、架构需求,分别如下所示: 当前现状 收集了当前已有数据、分工、团队的一些基本情况,如下所示: 数据收集和基础数据加工有专门的Team在做,我们是基于收集后并进行过初步加工的基础数据,结合不同行业针对特定数据的需求进行二次加工的。 数据二次加工,会集成基础数据之外的其它有业务属性的数据,比如引入第三方POI数据等。 原始数据每天增量大约30~40TB左右。 计算集群采用Spark on YARN部署模式,大约400个节点。 所有数据各种属性、行为信息,都是围绕大约40亿的移动设备ID进行很多倍膨胀,比如每天使用微信App的设备的行为信息。 参与该平台的研发人员,对实际数据

基于Spark ML Pipeline构建机器学习应用

使用机器学习的方法可以解决越来越多的实际问题,它在现实世界中的应用越来越广泛,比如智能风控、欺诈检测、个性化推荐、机器翻译、模式识别、智能控制,等等。 机器学习分类 我们都知道,机器学习可以分为三大类:监督学习(Supervised Learning)、无监督学习(Unsupervised Learning)和强化学习(Reinforcement Learning),下面简单介绍一下它们含义: 监督学习 监督学习是根据给定的标签(Label)已知的训练数据集,通过选定的算法在该训练数据集上进行训练学习,最后得到一个可以描述该数据集规律的预测函数,也就是我们所说的模型。有了模型,对于未知标签的输入数据,可以通过该预测函数预测出它的标签。典型的监督学习方法,如分类、回归等。 无监督学习 无监督学习是根据给定的标签(Label)未知的训练数据集,通过训练学习从训练数据集中发现隐藏的模式或结构。典型的无监督学习方法,如聚类分析。 强化学习 强化学习是人工智能中的策略学

Kubernetes基础篇:主要特性、基本概念与总体架构

本文试图将Kubernetes的基础相关知识描述清楚,让一个从来没有Kubernetes实践的开发人员,能够非常容易地理解Kubernetes是什么,能够做哪些事情,以及使用它能带来的好处是什么。 Kubernetes是什么 Kubernetes是一个开源的容器编排引擎,它支持自动化部署、大规模可伸缩、应用容器化管理。我们在完成一个应用程序的开发时,需要冗余部署该应用的多个实例,同时需要支持对应用的请求进行负载均衡,在Kubernetes中,我们可以把这个应用的多个实例分别启动一个容器,每个容器里面运行一个应用实例,然后通过内置的负载均衡策略,实现对这一组应用实例的管理、发现、访问,而这些细节都不需要应用开发和运维人员去进行复杂的手工配置和处理。 Kubernetes是Google基于内部Borg开源的容器编排引擎,关于Borg的设计,可以查看对应的论文《Large-scale cluster management at Google with Borg》。这里,我们先说一说Borg系统,它是Google内部的集群管理系统,使用它

Spark Shuffle过程分析:Map阶段处理流程

默认配置情况下,Spark在Shuffle过程中会使用SortShuffleManager来管理Shuffle过程中需要的基本组件,以及对RDD各个Partition数据的计算。我们可以在Driver和Executor对应的SparkEnv对象创建过程中看到对应的配置,如下代码所示: // Let the user specify short names for shuffle managers val shortShuffleMgrNames = Map( "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName) val shuffleMgrName = conf.get("spark.shuffle.manager", "sort") val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass) 如果需要修改