生成最佳执行计划,过程比较复杂,我们分成两篇来详细分析:上篇和下篇,本文为上篇。 生成最佳执行计划是一个递归计算的过程:正向从DataSinkNode开始直到DataSourceNode,分别计算每个OptimizerNode的最佳计划,然后反向逐步将整个OptimizerNode DAG图转换为PlanNode DAG图,得到一个最优计划。其中,PlanNode的类继承结构,如下图所示: 通过与OptimizerNode对应的节点结构类图对比,PlanNode更加抽象了一个层次,更关注Operator之间的数据交换策略。 其中,生成最佳执行计划的过程,可以在Optimizer类中看到,如下代码所示: // the final step is now to generate the actual plan alternatives List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator); 这是一个递归的过程,每个OptimizerNode都会通过获取到其孩子节点的最佳执行计划,从而递归地处理,从OptimizerNode DAG转换为PlanNode DAG。后面,我们会对SourcePlanNode、SinkPlanNode、SingleInputPlanNode、DualInputPlanNode创建的处理过程进行详细说明。 基本数据
按分类浏览文章: Flink
Flink批处理生成OptimizerNode DAG图
OptimizerNode DAG,是基于程序Plan中创建好的Operator DAG,并以OptimizerNode作为DAG图节点构建完成的。所以,我们先看一下组成OptimizerNode DAG,都有哪些类型的OptimizerNode,如下图所示: 通过上面类图,可以看到其中主要有SingleInputNode、TwoInputNode、DataSourceNode、DataSinkNode等这几大类,根据各个OptimizerNode具体实现类名称,可以对应到具体操作功能的Operator实现类。比如,MapNode对应map操作,JoinNode对应join操作,等等。 OptimizerNode结构 下面看一下OptimizerNode的数据结构,了解它都抽象了哪些内容,如下图所示: 上图中的结构是每个OptimizerNode都包含的内容,具体到每个OptimizerNode实现,都会根据Operator的属性及行为,补充一些其他的信息。上面的cachedPlans是一个基于PlanNode的DAG的列表,在生成OptimizerNode过程中并没有用到,而是基于OptimizerNode DAG生成最佳执行计划的时候,通过PlanNode DAG来表示的,后续文章我们会单独详细分析,这里先跳过。 下面将对OptimizerNode节点结构包含的一些重要属性信息进行说明,如
Flink批处理生成执行计划
我们这里所说的优化执行计划,是指从生成Plan之后,一直到生成JobGraph之前这一期间对Plan进行各种转换、增强、优化处理。从用户编程的Flink批处理程序开始,一直到生成JobGraph的整个过程,这个过程中,会创建很多数据结构来完成执行计划的优化工作,而且,通过分层增强与优化的方式,有利于我们对每层进行了哪些处理有一个很好地理解。 我们从整理流程上来看,都有哪些核心的处理,然后会对每一步进行概要说明,从宏观上理解每一步都做了哪些操作。批处理执行计划生成及优化的主要流程,如下图所示: 根据上图,我们将Flink批处理执行计划生成的流程,分为如下几个阶段进行说明。 构建用户编程Operator DAG 把用户程序中设置的各种DataSource、Transformation、DataSink及其配置信息进行收集,以最原始的形态保存在ExecutionEnvironment上线文环境对象中,当然也跟踪了从DataSource到DataSink之间的操作顺序,最后通过ExecutionEnvironment对象,基于DataSink就可以从后向前遍历,直接链接到DataSource。 在上面的处理过程中,涉及到3个比较重要的内容:一是用
Flink批处理生成JobGraph流程分析
我们先通过一个简单的批量处理WordCount的程序,来了解一下,在编程API层面的基本使用方法,以及Flink是如何基于Pipeline风格的API实现一个完整的应用程序的,还有就是每一步操作的底层,都是如何进行转换的。WordCount程序代码,示例如下所示: final MultipleParameterTool params = MultipleParameterTool.fromArgs(args); final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(params); // get input data DataSet<String> text = env.readTextFile(input); DataSet<Tuple2<String, Integer>> counts = // split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap(new Tokenizer()) // group by the tuple field "0" and sum up tuple field "1" .groupBy(0) .sum(1);
Flink编程API设计分析
使用Flink开发批式或流式Job,除了基本的处理逻辑与实际应用场景相关,我们更关心的是Flink提供的基本框架,是如何在API层面进行统一处理的,或者说尽量使API统一,这样有助于我们对Flink框架更深入地理解。目前使用Flink 1.10版本开发批式和流式Job,在API层面来看,大部分还是比较统一的,但是由于批式和流式场景还是有一定的差异,想要完全统一还是有一定难度。 Flink数据流编程模型的分层设计,如下图所示: 这里,我们关注的是上面的Core APIs层,结合批式和流式Job开发与提交的过程进行分析,对比对于批式和流式场景下,如何生成最终的JobGraph,以及生成的过程有什么不同。 编程API设计 Flink编程API中,我们开发的数据处理Job程序,都是始于一个Source,对应的输入数据源,终于一个Sink,对应输出的存储系统,中间是各种丰富的处理算子。但是对于批式和流式编程API,从代码层面对应的抽象基本上是名称不同,具体逻辑功能比较一致:批式编程API对批式Job DAG中每个节点的抽象使用的是DataSet,而流式编程API中对应的是DataStream。 对于批式Job DAG中,Data
使用Flink实现索引数据到Elasticsearch
使用Flink处理数据时,可以基于Flink提供的批式处理(Batch Processing)和流式处理(Streaming Processing)API来实现,分别能够满足不同场景下应用数据的处理。这两种模式下,输入处理都被抽象为Source Operator,包含对应输入数据的处理逻辑;输出处理都被抽象为Sink Operator,包含了对应输出数据的处理逻辑。这里,我们只关注输出的Sink Operator实现。 Flink批式处理模式,运行Flink Batch Job时作用在有界的输入数据集上,所以Job运行的时间是有时限的,一旦Job运行完成,对应的整个数据处理应用就已经结束,比如,输入是一个数据文件,或者一个Hive SQL查询对应的结果集,等等。在批式处理模式下处理数据的输出时,主要需要实现一个自定义的OutputFormat,然后基于该OutputFormat来构建一个Sink,下面看下OutputFormat接口的定义,如下所示: @Public public interface OutputFormat<IT> extends Serializable { void configure(Configuration parameters); void open(int taskNumber, int numTasks) throws IOException; void wri
Flink使用Broadcast State实现流处理配置实时更新
Broadcast State是Flink支持的一种Operator State。使用Broadcast State,可以在Flink程序的一个Stream中输入数据记录,然后将这些数据记录广播(Broadcast)到下游的每个Task中,使得这些数据记录能够为所有的Task所共享,比如一些用于配置的数据记录。这样,每个Task在处理其所对应的Stream中记录的时候,读取这些配置,来满足实际数据处理需要。 另外,在一定程度上,Broadcast State能够使得Flink Job在运行过程中与外部的其他系统解耦合。比如,通常Flink会使用YARN来管理计算资源,使用Broadcast State就可以不用直接连接MySQL数据库读取相关配置信息了,也无需对MySQL做额外的授权操作。因为在一些场景下,会使用Flink on YARN部署模式,将Flink Job运行的资源申请和释放交给YARN去管理,那么就存在Hadoop集群节点扩缩容的问题,如新加节点可能需要对一些外部系统的访问,如MySQL等进行连接操作授权,如果忘记对MysQL访问授权,Flink Job被调度到新增的某个新增节点上连接并读取MySQL配置信息就会出错。 Broadcast State API 通常,我们首先会创建一个Keyed或
Flink Checkpoint、Savepoint配置与实践
Flink Checkpoint Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些Snapshot进行恢复,从而修正因为故障带来的程序数据状态中断。这里,我们简单理解一下Flink Checkpoint机制,如官网下图所示: Checkpoint指定触发生成时间间隔后,每当需要触发Checkpoint时,会向Flink程序运行时的多个分布式的Stream Source中插入一个Barrier标记,这些Barrier会根据Stream中的数据记录一起流向下游的各个Operator。当一个Operator接收到一个Barrier时,它会暂停处理Steam中新接收到的数据记录。因为一个Operator可能存在多个输入的Stream,而每个Stream中都会存在对应的Barrier,该Operator要等到所有的输入Stream中的Barrier都到达。当所有Stream中的Barrier都已经到达该Operator,这时所有的Barrier在时间上看来是同一个时刻点(表示已经对齐),在等待所有Barrier到达的过程中,Operator的Buffer中可
Apache Flink:使用EventTime与WaterMark进行流数据处理
在实际开发过程中,我们可能需要接入各种流数据源,比如在线业务用户点击流数据、监控系实时收集到的事件流数据、从传感器采集到的实时数据,等等,为了处理方便他们可能会写入Kafka消息中间件集群中某个/某些topic中,或者选择其它的缓冲/存储系统。这些数据源中数据元素具有固定的时间属性,是在流数据处理系统之外的其它系统生成的。比如,上亿用户通过手机终端操作触发生成的事件数据,都具有对应的事件时间;再特殊一点,可能我们希望回放(Replay)上一年手机终端用户的历史行为数据,与当前某个流数据集交叉分析才能够得到支持某类业务的特定结果,这种情况下,基于数据所具有的事件时间进行处理,就具有很重要的意义了。 下面,我们先从Flink支持的3个与流数据处理相关的时间概念(Time Notion):ProcessTime、EventTime、IngestionTime。有些系统对时间概念的抽象有其它叫法,比如,Google Cloud Dataflow中称为时间域(Time Domain)。在Flink中,基于不同的Time Notion来处理流数据,具有不同的意义和结果,所以了解这3个Time Notion非常关键。 Time No
Apache Flink:Keyed Window与Non-Keyed Window
Apache Flink中,Window操作在流式数据处理中是非常核心的一种抽象,它把一个无限流数据集分割成一个个有界的Window(或称为Bucket),然后就可以非常方便地定义作用于Window之上的各种计算操作。本文我们主要基于Apache Flink 1.4.0版本,说明Keyed Window与Non-Keyed Window的基本概念,然后分别对与其相关的WindowFunction与WindowAllFunction的类设计进行分析,最后通过编程实践来应用。 基本概念 Flink将Window分为两类,一类叫做Keyed Window,另一类叫做Non-Keyed Window。为了说明这两类Window的不同,我们看下Flink官网给出的,基于这两种类型的Window编写代码的结构说明。 基于Keyed Window进行编程,用户代码基本结构如下所示: stream .keyBy(...) <- keyed versus Non-Keyed windows .window(...) <- required: "assigner" [.trigger(...)] <- optional: "trigger" (else default trigger) [.evictor(...)] <- optional: &quo
Apache Flink 1.4.0:Standalone集群模式实践
Apache Flink是一个开源的流处理框架,提供了分布式的、高性能的、高可用的特性,同时能够为流式应用程序提供多种编程语言的API,更多有关Flink的内容,不再累述,请参考官方文档,本文主要以Flink Standalone集群安装配置、编程实践为主。 Flink集群安装配置 首先,选择一主二从共3个节点来安装配置Flink Standalone集群: Master:ali-bj01-tst-cluster-001.xiweiai.cn Worker:ali-bj01-tst-cluster-002.xiweiai.cn Worker:ali-bj01-tst-cluster-003.xiweiai.cn 为了方便安装文件远程拷贝,单独创建一个hadoop用户,并打通从Master节点使用ssh到Worker节点之间的免密码登录。 在Master节点上下载、准备Flink 1.4.0安装文件,执行如下命令: wget http://mirror.bit.edu.cn/apache/flink/flink-1.4.0/flink-1.4.0-bin-hadoop26-scala_2.11.tgz tar xvzf flink-1.4.0-bin-hadoop26-scala_2.11.tgz 接着,修改Flink配置文件flink-1.4.0/conf/flink-conf.yaml,修改后的内容如下所示: env.java.home: /usr/local/java/jdk1.8.0_131/ jobmanager.rpc.
Apache Flink:特性、概念、组件栈、架构及原理分析
Apache Flink是一个面向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同一个Flink运行时(Flink Runtime),提供支持流处理和批处理两种类型应用的功能。现有的开源计算方案,会把流处理和批处理作为两种不同的应用类型,因为他们它们所提供的SLA是完全不相同的:流处理一般需要支持低延迟、Exactly-once保证,而批处理需要支持高吞吐、高效处理,所以在实现的时候通常是分别给出两套实现方法,或者通过一个独立的开源框架来实现其中每一种处理方案。例如,实现批处理的开源方案有MapReduce、Tez、Crunch、Spark,实现流处理的开源方案有Samza、Storm。 Flink在实现流处理和批处理时,与传统的一些方案完全不同,它从另一个视角看待流处理和批处理,将二者统一起来:Flink是完全支持流处理,也就是说作为流处理看待时输入数据流是无界的;批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。基于同一个Flink运行时(Flink Runtime),分别提供了流处理和批处理API,而这两种API也是实现上层面向流处理、批处理类型应用框架的基础。 基本