使用 Flink 开发批式或流式 Job,除了基本的处理逻辑与实际应用场景相关,我们更关心的是 Flink 提供的基本框架,是如何在 API 层面进行统一处理的,或者说尽量使 API 统一,这样有助于我们对 Flink 框架更深入地理解。目前使用 Flink 1.10 版本开发批式和流式 Job,在 API 层面来看,大部分还是比较统一的,但是由于批式和流式场景还是有一定的差异,想要完全统一还是有一定难度。 Flink 数据流编程模型的分层设计,如下图所示: 这里,我们关注的是上面的 Core APIs 层,结合批式和流式 Job 开发与提交的过程进行分析,对比对于批式和流式场景下,如何生成最终的 JobGraph,以及生成的过程有什么不同。 编程 API 设计 Flink 编程 API 中,我们开发的数据处理 Job 程序,都是始于一个 Source,对应的输入数据源,终于一个 Sink,对应输出的存储系统,中间是各种丰富的处理算子。但是对于批式和流式编程 API,从代码层面对应的抽象基本上是名称不同,具体逻辑功能比较一致:批式编程 API 对批式 Job DAG 中每个节点的抽象使用的是 DataSet,而流式编程 API 中对应的
使用 Flink 实现索引数据到 Elasticsearch
使用 Flink 处理数据时,可以基于 Flink 提供的批式处理(Batch Processing)和流式处理(Streaming Processing)API 来实现,分别能够满足不同场景下应用数据的处理。这两种模式下,输入处理都被抽象为 Source Operator,包含对应输入数据的处理逻辑;输出处理都被抽象为 Sink Operator,包含了对应输出数据的处理逻辑。这里,我们只关注输出的 Sink Operator 实现。 Flink 批式处理模式,运行 Flink Batch Job 时作用在有界的输入数据集上,所以 Job 运行的时间是有时限的,一旦 Job 运行完成,对应的整个数据处理应用就已经结束,比如,输入是一个数据文件,或者一个 Hive SQL 查询对应的结果集,等等。在批式处理模式下处理数据的输出时,主要需要实现一个自定义的 OutputFormat,然后基于该 OutputFormat 来构建一个 Sink,下面看下 OutputFormat 接口的定义,如下所示: @Public public interface OutputFormat<IT> extends Serializable { void configure(Configuration parameters); void open(int taskNumber, int numTasks) throws IO
Flink 使用 Broadcast State 实现流处理配置实时更新
Broadcast State 是 Flink 支持的一种 Operator State。使用 Broadcast State,可以在 Flink 程序的一个 Stream 中输入数据记录,然后将这些数据记录广播(Broadcast)到下游的每个 Task 中,使得这些数据记录能够为所有的 Task 所共享,比如一些用于配置的数据记录。这样,每个 Task 在处理其所对应的 Stream 中记录的时候,读取这些配置,来满足实际数据处理需要。 另外,在一定程度上,Broadcast State 能够使得 Flink Job 在运行过程中与外部的其他系统解耦合。比如,通常 Flink 会使用 YARN 来管理计算资源,使用 Broadcast State 就可以不用直接连接 MySQL 数据库读取相关配置信息了,也无需对 MySQL 做额外的授权操作。因为在一些场景下,会使用 Flink on YARN 部署模式,将 Flink Job 运行的资源申请和释放交给 YARN 去管理,那么就存在 Hadoop 集群节点扩缩容的问题,如新加节点可能需要对一些外部系统的访问,如 MySQL 等进行连接操作授权,如果忘记对 MysQL 访问授权,Flink Job 被调度到新增的某个新增节点上连接并读取 MySQL 配置信息就会出错。 Broad
Flink Checkpoint、Savepoint 配置与实践
Flink Checkpoint Checkpoint 是 Flink 实现容错机制最核心的功能,它能够根据配置周期性地基于 Stream 中各个 Operator 的状态来生成 Snapshot,从而将这些状态数据定期持久化存储下来,当 Flink 程序一旦意外崩溃时,重新运行程序时可以有选择地从这些 Snapshot 进行恢复,从而修正因为故障带来的程序数据状态中断。这里,我们简单理解一下 Flink Checkpoin t机制,如官网下图所示: Checkpoint 指定触发生成时间间隔后,每当需要触发 Checkpoint 时,会向 Flink 程序运行时的多个分布式的 Stream Source 中插入一个 Barrier 标记,这些 Barrier 会根据 Stream 中的数据记录一起流向下游的各个 Operator。当一个 Operator 接收到一个 Barrier 时,它会暂停处理 Steam 中新接收到的数据记录。因为一个 Operator 可能存在多个输入的 Stream,而每个 Stream 中都会存在对应的 Barrier,该 Operator 要等到所有的输入 Stream 中的 Barrier 都到达。当所有 Stream 中的 Barrier 都已经到达该 Operator,这时所有的 Barrier 在时间上看来是同一个时刻点(表示已经对齐)
Azkaban 集群内部调度原理分析
Azkaban 是一个非常简单实用,而且开源的作业调度系统。在 2.x 版本中不支持集群模式部署,在 3.x 版本中支持集群模式部署,适用于作业量比较大一些的应用场景。有关 Azkaban 更多详细信息,如特点、功能、特性、作业定义等,可以参考官方文档,这里不再详述。 Azkaban 集群架构 下面我们看一下 Azkaban 集群模式的架构,如下图所示: 从上图可见,Azkaban 集群部署模式,主要有 3 个核心的组件: Azkaban WebServer Azkaban WebServer,是整个调度集群的核心,负责所有作业的管理和调度。 Azkaban ExecutorServer Azkaban ExecutorServer,整个调度集群中实际运行作业的节点,该类节点可能是作为一个作业提交的客户端,比如 Spark on YARN 部署模式下,cluster 运行模式时只作为客户端使用,client 运行模式时会有部分计算逻辑;比如普通的 Java 程序需要处理量级较小的数据作业,这时Executor Server 节点可能有较大的工作负载,占用较多节点资源(内存、CPU)。 DB DB,是集群中所有节点运行共用的数据存储,包含作业信息、各种调度元数据等等。 核心调度
Spark Join 处理流程分析
为了更好的分析 Spark Join 处理流程,我们选择具有 Shuffle 操作的示例来进行说明,这比没有 Shuffle 操作的处理流程要复杂一些。本文主要通过实现一个 Join 操作的 Spark 程序,提交运行该程序,并通过 Spark UI 上的各种运行信息来讨论 Spark Join 处理流程。 Spark Join 示例程序 我们先给出一个简单的 Spark Application 程序代码,这里处理的数据使用了 MovieLens 数据集。其中,小表 movies(约 1.4m)、大表 genome-scores(约 323.5m),对这两个表进行 Join 操作。具体的实现代码,如下所示: def main(args: Array[String]): Unit = { val sc = new SparkContext() // movieId,title,genres val movieRdd = sc.textFile("/data/ml-20m/movies.csv") .filter(line => !line.startsWith("movieId")) .map(line => line.split(",")) .map(a => (a(0), a(1))) // movieId,tagId,relevance val scoreRdd = sc.textFile("/data/ml-20m/genome-sc
Spark Shuffle过程分析:Reduce阶段处理流程
Spark在Map阶段调度运行的ShuffleMapTask,最后会生成.data和.index文件,可以通过我的这篇文章 Spark Shuffle过程分析:Map阶段处理流程 了解具体流程和详情。同时,在Executor上运行一个ShuffleMapTask,返回了一个MapStatus对象,下面是ShuffleMapTask执行后返回结果的相关代码片段: var writer: ShuffleWriter[Any, Any] = null try { val manager = SparkEnv.get.shuffleManager writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) writer.stop(success = true).get } catch { case e: Exception => try { if (writer != null) { writer.stop(success = false) } } catch { case e: Exception => log.debug("Could not stop writer", e) } throw e
Apache Flink:使用EventTime与WaterMark进行流数据处理
在实际开发过程中,我们可能需要接入各种流数据源,比如在线业务用户点击流数据、监控系实时收集到的事件流数据、从传感器采集到的实时数据,等等,为了处理方便他们可能会写入Kafka消息中间件集群中某个/某些topic中,或者选择其它的缓冲/存储系统。这些数据源中数据元素具有固定的时间属性,是在流数据处理系统之外的其它系统生成的。比如,上亿用户通过手机终端操作触发生成的事件数据,都具有对应的事件时间;再特殊一点,可能我们希望回放(Replay)上一年手机终端用户的历史行为数据,与当前某个流数据集交叉分析才能够得到支持某类业务的特定结果,这种情况下,基于数据所具有的事件时间进行处理,就具有很重要的意义了。 下面,我们先从Flink支持的3个与流数据处理相关的时间概念(Time Notion):ProcessTime、EventTime、IngestionTime。有些系统对时间概念的抽象有其它叫法,比如,Google Cloud Dataflow中称为时间域(Time Domain)。在Flink中,基于不同的Time Notion来处理流数据,具有不同的意义和结果,所以了解这3个Time Notion非常关键。 Time No
Apache Flink:Keyed Window与Non-Keyed Window
Apache Flink中,Window操作在流式数据处理中是非常核心的一种抽象,它把一个无限流数据集分割成一个个有界的Window(或称为Bucket),然后就可以非常方便地定义作用于Window之上的各种计算操作。本文我们主要基于Apache Flink 1.4.0版本,说明Keyed Window与Non-Keyed Window的基本概念,然后分别对与其相关的WindowFunction与WindowAllFunction的类设计进行分析,最后通过编程实践来应用。 基本概念 Flink将Window分为两类,一类叫做Keyed Window,另一类叫做Non-Keyed Window。为了说明这两类Window的不同,我们看下Flink官网给出的,基于这两种类型的Window编写代码的结构说明。 基于Keyed Window进行编程,用户代码基本结构如下所示: stream .keyBy(...) <- keyed versus Non-Keyed windows .window(...) <- required: "assigner" [.trigger(...)] <- optional: "trigger" (else default trigger) [.evictor(...)] <- optional: &quo
Nexus Repository OSS 3安装配置使用
Nexus Repository OSS 3是一个开源的仓库管理系统,提供了更加丰富的功能,而且安装、配置、使用起来也更加简单方便。OSS 3版本主要支持的仓库(Repository)包括如下: bower docker maven npm nuget pypi raw rubygems yum 其中,对于上述每种类型的Nexus仓库,都分别具有如下主要3种类型: hosted:本地仓库,可以将我们内部使用的一些Maven项目,发布到该类型仓库,供内部开发人员使用。 proxy:代理仓库,用来代理远程公共仓库,比如Maven中央仓库。 group:仓库组,用来合并多个类型(hosted/proxy)的仓库。 这里,我们主要以支持Java编程的Maven项目依赖管理和构建进行实践,Nexus版本为nexus-3.7.0-04。 安装配置 下载Nexus Repository Manager软件包: wget https://sonatype-download.global.ssl.fastly.net/nexus/3/nexus-3.7.0-04-unix.tar.gz tar xvzf nexus-3.7.0-04-unix.tar.gz 解压缩后可以看到,生成nexus-3.7.0-04和sonatype-work两个目录: [root@ali-bj01-tst-cluster-004 nexus]# ls nexus-3.7.0-04 sonatype-work 是这两个目录
Apache Flink 1.4.0:Standalone集群模式实践
Apache Flink是一个开源的流处理框架,提供了分布式的、高性能的、高可用的特性,同时能够为流式应用程序提供多种编程语言的API,更多有关Flink的内容,不再累述,请参考官方文档,本文主要以Flink Standalone集群安装配置、编程实践为主。 Flink集群安装配置 首先,选择一主二从共3个节点来安装配置Flink Standalone集群: Master:ali-bj01-tst-cluster-001.xiweiai.cn Worker:ali-bj01-tst-cluster-002.xiweiai.cn Worker:ali-bj01-tst-cluster-003.xiweiai.cn 为了方便安装文件远程拷贝,单独创建一个hadoop用户,并打通从Master节点使用ssh到Worker节点之间的免密码登录。 在Master节点上下载、准备Flink 1.4.0安装文件,执行如下命令: wget http://mirror.bit.edu.cn/apache/flink/flink-1.4.0/flink-1.4.0-bin-hadoop26-scala_2.11.tgz tar xvzf flink-1.4.0-bin-hadoop26-scala_2.11.tgz 接着,修改Flink配置文件flink-1.4.0/conf/flink-conf.yaml,修改后的内容如下所示: env.java.home: /usr/local/java/jdk1.8.0_131/ jobmanager.rpc.
使用Apache Beam实现实时监控统计
网站使用Apache服务器,对于请求网站资源的事件被记录到日志中,我们需要基于该日志文件的数据进行实时监控统计,通过读取IP库数据可以得到每个访问IP的所属地域(国内按城市,国外按国家,IP库中没有的按未知处理)。其实,整个流程我们可以通过Flume收集聚合多个子站日志文件数据,并写入到下游的Kafka消息中间件集群中,然后可以直接从Kafka中进行消费,实现实时监控统计,最后结果更新到Redis中去。 为了简单,我们这里只是通过输入的日志文件作为数据源,下游直接通过Apache Beam来进行实时分析处理,结果输出到多个按时间分组的文件中。我们实现的实时监控功能目标,如下所示: 输入事件日志文件,以及IP库文件; 基于日志文件中的事件时间,每间隔5分钟输出一个统计文件,结果文件中包含“地域”和“访问次数”。 下面是文件格式示例。 事件日志文件的格式,示例如下所示: 113.246.155.26 - - [10/Dec/2017:01:03:28 +0800] "GET /wp-content/themes/media-maven/library/images/bg.jpg HTTP/1.1" 200 8113 "http://shiyanjun.cn/archiv