Flink 批处理生成最佳执行计划(上篇)

生成最佳执行计划,过程比较复杂,我们分成两篇来详细分析:上篇和下篇,本文为上篇。 生成最佳执行计划是一个递归计算的过程:正向从 DataSinkNode 开始直到 DataSourceNode,分别计算每个 OptimizerNode 的最佳计划,然后反向逐步将整个 OptimizerNode DAG 图转换为 PlanNode DAG 图,得到一个最优计划。其中,PlanNode 的类继承结构,如下图所示: 通过与 OptimizerNode 对应的节点结构类图对比,PlanNode 更加抽象了一个层次,更关注 Operator 之间的数据交换策略。 其中,生成最佳执行计划的过程,可以在 Optimizer 类中看到,如下代码所示: // the final step is now to generate the actual plan alternatives List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator); 这是一个递归的过程,每个 OptimizerNode 都会通过获取到其孩子节点的最佳执行计划,从而递归地处理,从 OptimizerNode DAG 转换为 PlanNode DAG。后面,我们会对 SourcePlanNode、SinkPlanNode、SingleInputPlanNode、DualInputPlanNode 创建的处理过程

Flink 批处理生成 OptimizerNode DAG 图

OptimizerNode DAG,是基于程序 Plan 中创建好的 Operator DAG,并以 OptimizerNode 作为 DAG 图节点构建完成的。所以,我们先看一下组成 OptimizerNode DAG,都有哪些类型的 OptimizerNode,如下图所示: 通过上面类图,可以看到其中主要有 SingleInputNode、TwoInputNode、DataSourceNode、DataSinkNode 等这几大类,根据各个 OptimizerNode 具体实现类名称,可以对应到具体操作功能的 Operator 实现类。比如,MapNode 对应 map 操作,JoinNode 对应 join 操作,等等。 OptimizerNode 结构 下面看一下 OptimizerNode 的数据结构,了解它都抽象了哪些内容,如下图所示: 上图中的结构是每个 OptimizerNode 都包含的内容,具体到每个 OptimizerNode 实现,都会根据 Operator 的属性及行为,补充一些其他的信息。上面的 cachedPlans 是一个基于 PlanNode 的 DAG 的列表,在生成 OptimizerNode 过程中并没有用到,而是基于 OptimizerNode DAG 生成最佳执行计划的时候,通过 PlanNode DAG 来表示的,后续文章我们会单独详细分析,这里先跳过。 下面将对 OptimizerNod

Flink 批处理生成执行计划

我们这里所说的优化执行计划,是指从生成 Plan 之后,一直到生成 JobGraph 之前这一期间对 Plan 进行各种转换、增强、优化处理。从用户编程的 Flink 批处理程序开始,一直到生成 JobGraph 的整个过程,这个过程中,会创建很多数据结构来完成执行计划的优化工作,而且,通过分层增强与优化的方式,有利于我们对每层进行了哪些处理有一个很好地理解。 我们从整理流程上来看,都有哪些核心的处理,然后会对每一步进行概要说明,从宏观上理解每一步都做了哪些操作。批处理执行计划生成及优化的主要流程,如下图所示: 根据上图,我们将Flink批处理执行计划生成的流程,分为如下几个阶段进行说明。 构建用户编程 Operator DAG 把用户程序中设置的各种 DataSource、Transformation、DataSink 及其配置信息进行收集,以最原始的形态保存在 ExecutionEnvironment 上线文环境对象中,当然也跟踪了从 DataSource 到 DataSink 之间的操作顺序,最后通过 ExecutionEnvironment 对象,基于 DataSink 就可以从后向前遍历,直接链接到 DataSource。 在上面的处理过程中,涉及到3

Flink 批处理生成 JobGraph 流程分析

我们先通过一个简单的批量处理 WordCount 的程序,来了解一下,在编程 API 层面的基本使用方法,以及 Flink 是如何基于 Pipeline 风格的 API 实现一个完整的应用程序的,还有就是每一步操作的底层,都是如何进行转换的。WordCount 程序代码,示例如下所示: final MultipleParameterTool params = MultipleParameterTool.fromArgs(args); final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(params); // get input data DataSet<String> text = env.readTextFile(input); DataSet<Tuple2<String, Integer>> counts = // split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap(new Tokenizer()) // group by the tuple field "0" and sum up tuple field "1" .groupBy(0) .s

Flink 编程 API 设计分析

使用 Flink 开发批式或流式 Job,除了基本的处理逻辑与实际应用场景相关,我们更关心的是 Flink 提供的基本框架,是如何在 API 层面进行统一处理的,或者说尽量使 API 统一,这样有助于我们对 Flink 框架更深入地理解。目前使用 Flink 1.10 版本开发批式和流式 Job,在 API 层面来看,大部分还是比较统一的,但是由于批式和流式场景还是有一定的差异,想要完全统一还是有一定难度。 Flink 数据流编程模型的分层设计,如下图所示: 这里,我们关注的是上面的 Core APIs 层,结合批式和流式 Job 开发与提交的过程进行分析,对比对于批式和流式场景下,如何生成最终的 JobGraph,以及生成的过程有什么不同。 编程 API 设计 Flink 编程 API 中,我们开发的数据处理 Job 程序,都是始于一个 Source,对应的输入数据源,终于一个 Sink,对应输出的存储系统,中间是各种丰富的处理算子。但是对于批式和流式编程 API,从代码层面对应的抽象基本上是名称不同,具体逻辑功能比较一致:批式编程 API 对批式 Job DAG 中每个节点的抽象使用的是 DataSet,而流式编程 API 中对应的

使用 Flink 实现索引数据到 Elasticsearch

使用 Flink 处理数据时,可以基于 Flink 提供的批式处理(Batch Processing)和流式处理(Streaming Processing)API 来实现,分别能够满足不同场景下应用数据的处理。这两种模式下,输入处理都被抽象为 Source Operator,包含对应输入数据的处理逻辑;输出处理都被抽象为 Sink Operator,包含了对应输出数据的处理逻辑。这里,我们只关注输出的 Sink Operator 实现。 Flink 批式处理模式,运行 Flink Batch Job 时作用在有界的输入数据集上,所以 Job 运行的时间是有时限的,一旦 Job 运行完成,对应的整个数据处理应用就已经结束,比如,输入是一个数据文件,或者一个 Hive SQL 查询对应的结果集,等等。在批式处理模式下处理数据的输出时,主要需要实现一个自定义的 OutputFormat,然后基于该 OutputFormat 来构建一个 Sink,下面看下 OutputFormat 接口的定义,如下所示: @Public public interface OutputFormat<IT> extends Serializable { void configure(Configuration parameters); void open(int taskNumber, int numTasks) throws IO

Flink 使用 Broadcast State 实现流处理配置实时更新

Broadcast State 是 Flink 支持的一种 Operator State。使用 Broadcast State,可以在 Flink 程序的一个 Stream 中输入数据记录,然后将这些数据记录广播(Broadcast)到下游的每个 Task 中,使得这些数据记录能够为所有的 Task 所共享,比如一些用于配置的数据记录。这样,每个 Task 在处理其所对应的 Stream 中记录的时候,读取这些配置,来满足实际数据处理需要。 另外,在一定程度上,Broadcast State 能够使得 Flink Job 在运行过程中与外部的其他系统解耦合。比如,通常 Flink 会使用 YARN 来管理计算资源,使用 Broadcast State 就可以不用直接连接 MySQL 数据库读取相关配置信息了,也无需对 MySQL 做额外的授权操作。因为在一些场景下,会使用 Flink on YARN 部署模式,将 Flink Job 运行的资源申请和释放交给 YARN 去管理,那么就存在 Hadoop 集群节点扩缩容的问题,如新加节点可能需要对一些外部系统的访问,如 MySQL 等进行连接操作授权,如果忘记对 MysQL 访问授权,Flink Job 被调度到新增的某个新增节点上连接并读取 MySQL 配置信息就会出错。 Broad

Flink Checkpoint、Savepoint 配置与实践

Flink Checkpoint Checkpoint 是 Flink 实现容错机制最核心的功能,它能够根据配置周期性地基于 Stream 中各个 Operator 的状态来生成 Snapshot,从而将这些状态数据定期持久化存储下来,当 Flink 程序一旦意外崩溃时,重新运行程序时可以有选择地从这些 Snapshot 进行恢复,从而修正因为故障带来的程序数据状态中断。这里,我们简单理解一下 Flink Checkpoin t机制,如官网下图所示: Checkpoint 指定触发生成时间间隔后,每当需要触发 Checkpoint 时,会向 Flink 程序运行时的多个分布式的 Stream Source 中插入一个 Barrier 标记,这些 Barrier 会根据 Stream 中的数据记录一起流向下游的各个 Operator。当一个 Operator 接收到一个 Barrier 时,它会暂停处理 Steam 中新接收到的数据记录。因为一个 Operator 可能存在多个输入的 Stream,而每个 Stream 中都会存在对应的 Barrier,该 Operator 要等到所有的输入 Stream 中的 Barrier 都到达。当所有 Stream 中的 Barrier 都已经到达该 Operator,这时所有的 Barrier 在时间上看来是同一个时刻点(表示已经对齐)

Apache Flink:使用EventTime与WaterMark进行流数据处理

在实际开发过程中,我们可能需要接入各种流数据源,比如在线业务用户点击流数据、监控系实时收集到的事件流数据、从传感器采集到的实时数据,等等,为了处理方便他们可能会写入Kafka消息中间件集群中某个/某些topic中,或者选择其它的缓冲/存储系统。这些数据源中数据元素具有固定的时间属性,是在流数据处理系统之外的其它系统生成的。比如,上亿用户通过手机终端操作触发生成的事件数据,都具有对应的事件时间;再特殊一点,可能我们希望回放(Replay)上一年手机终端用户的历史行为数据,与当前某个流数据集交叉分析才能够得到支持某类业务的特定结果,这种情况下,基于数据所具有的事件时间进行处理,就具有很重要的意义了。 下面,我们先从Flink支持的3个与流数据处理相关的时间概念(Time Notion):ProcessTime、EventTime、IngestionTime。有些系统对时间概念的抽象有其它叫法,比如,Google Cloud Dataflow中称为时间域(Time Domain)。在Flink中,基于不同的Time Notion来处理流数据,具有不同的意义和结果,所以了解这3个Time Notion非常关键。 Time No

Apache Flink:Keyed Window与Non-Keyed Window

Apache Flink中,Window操作在流式数据处理中是非常核心的一种抽象,它把一个无限流数据集分割成一个个有界的Window(或称为Bucket),然后就可以非常方便地定义作用于Window之上的各种计算操作。本文我们主要基于Apache Flink 1.4.0版本,说明Keyed Window与Non-Keyed Window的基本概念,然后分别对与其相关的WindowFunction与WindowAllFunction的类设计进行分析,最后通过编程实践来应用。 基本概念 Flink将Window分为两类,一类叫做Keyed Window,另一类叫做Non-Keyed Window。为了说明这两类Window的不同,我们看下Flink官网给出的,基于这两种类型的Window编写代码的结构说明。 基于Keyed Window进行编程,用户代码基本结构如下所示: stream .keyBy(...) <- keyed versus Non-Keyed windows .window(...) <- required: "assigner" [.trigger(...)] <- optional: "trigger" (else default trigger) [.evictor(...)] <- optional: &quo

Apache Flink 1.4.0:Standalone集群模式实践

Apache Flink是一个开源的流处理框架,提供了分布式的、高性能的、高可用的特性,同时能够为流式应用程序提供多种编程语言的API,更多有关Flink的内容,不再累述,请参考官方文档,本文主要以Flink Standalone集群安装配置、编程实践为主。 Flink集群安装配置 首先,选择一主二从共3个节点来安装配置Flink Standalone集群: Master:ali-bj01-tst-cluster-001.xiweiai.cn Worker:ali-bj01-tst-cluster-002.xiweiai.cn Worker:ali-bj01-tst-cluster-003.xiweiai.cn 为了方便安装文件远程拷贝,单独创建一个hadoop用户,并打通从Master节点使用ssh到Worker节点之间的免密码登录。 在Master节点上下载、准备Flink 1.4.0安装文件,执行如下命令: wget http://mirror.bit.edu.cn/apache/flink/flink-1.4.0/flink-1.4.0-bin-hadoop26-scala_2.11.tgz tar xvzf flink-1.4.0-bin-hadoop26-scala_2.11.tgz 接着,修改Flink配置文件flink-1.4.0/conf/flink-conf.yaml,修改后的内容如下所示: env.java.home: /usr/local/java/jdk1.8.0_131/ jobmanager.rpc.

Apache Flink:特性、概念、组件栈、架构及原理分析

Apache Flink是一个面向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同一个Flink运行时(Flink Runtime),提供支持流处理和批处理两种类型应用的功能。现有的开源计算方案,会把流处理和批处理作为两种不同的应用类型,因为他们它们所提供的SLA是完全不相同的:流处理一般需要支持低延迟、Exactly-once保证,而批处理需要支持高吞吐、高效处理,所以在实现的时候通常是分别给出两套实现方法,或者通过一个独立的开源框架来实现其中每一种处理方案。例如,实现批处理的开源方案有MapReduce、Tez、Crunch、Spark,实现流处理的开源方案有Samza、Storm。 Flink在实现流处理和批处理时,与传统的一些方案完全不同,它从另一个视角看待流处理和批处理,将二者统一起来:Flink是完全支持流处理,也就是说作为流处理看待时输入数据流是无界的;批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。基于同一个Flink运行时(Flink Runtime),分别提供了流处理和批处理API,而这两种API也是实现上层面向流处理、批处理类型应用框架的基础。 基本