Flink批处理生成最佳执行计划(上篇)

生成最佳执行计划,过程比较复杂,我们分成两篇来详细分析:上篇和下篇,本文为上篇。 生成最佳执行计划是一个递归计算的过程:正向从DataSinkNode开始直到DataSourceNode,分别计算每个OptimizerNode的最佳计划,然后反向逐步将整个OptimizerNode DAG图转换为PlanNode DAG图,得到一个最优计划。其中,PlanNode的类继承结构,如下图所示: 通过与OptimizerNode对应的节点结构类图对比,PlanNode更加抽象了一个层次,更关注Operator之间的数据交换策略。 其中,生成最佳执行计划的过程,可以在Optimizer类中看到,如下代码所示: // the final step is now to generate the actual plan alternatives List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator); 这是一个递归的过程,每个OptimizerNode都会通过获取到其孩子节点的最佳执行计划,从而递归地处理,从OptimizerNode DAG转换为PlanNode DAG。后面,我们会对SourcePlanNode、SinkPlanNode、SingleInputPlanNode、DualInputPlanNode创建的处理过程进行详细说明。 基本数据

Flink批处理生成OptimizerNode DAG图

OptimizerNode DAG,是基于程序Plan中创建好的Operator DAG,并以OptimizerNode作为DAG图节点构建完成的。所以,我们先看一下组成OptimizerNode DAG,都有哪些类型的OptimizerNode,如下图所示: 通过上面类图,可以看到其中主要有SingleInputNode、TwoInputNode、DataSourceNode、DataSinkNode等这几大类,根据各个OptimizerNode具体实现类名称,可以对应到具体操作功能的Operator实现类。比如,MapNode对应map操作,JoinNode对应join操作,等等。 OptimizerNode结构 下面看一下OptimizerNode的数据结构,了解它都抽象了哪些内容,如下图所示: 上图中的结构是每个OptimizerNode都包含的内容,具体到每个OptimizerNode实现,都会根据Operator的属性及行为,补充一些其他的信息。上面的cachedPlans是一个基于PlanNode的DAG的列表,在生成OptimizerNode过程中并没有用到,而是基于OptimizerNode DAG生成最佳执行计划的时候,通过PlanNode DAG来表示的,后续文章我们会单独详细分析,这里先跳过。 下面将对OptimizerNode节点结构包含的一些重要属性信息进行说明,如

Flink批处理生成执行计划

我们这里所说的优化执行计划,是指从生成Plan之后,一直到生成JobGraph之前这一期间对Plan进行各种转换、增强、优化处理。从用户编程的Flink批处理程序开始,一直到生成JobGraph的整个过程,这个过程中,会创建很多数据结构来完成执行计划的优化工作,而且,通过分层增强与优化的方式,有利于我们对每层进行了哪些处理有一个很好地理解。 我们从整理流程上来看,都有哪些核心的处理,然后会对每一步进行概要说明,从宏观上理解每一步都做了哪些操作。批处理执行计划生成及优化的主要流程,如下图所示: 根据上图,我们将Flink批处理执行计划生成的流程,分为如下几个阶段进行说明。 构建用户编程Operator DAG 把用户程序中设置的各种DataSource、Transformation、DataSink及其配置信息进行收集,以最原始的形态保存在ExecutionEnvironment上线文环境对象中,当然也跟踪了从DataSource到DataSink之间的操作顺序,最后通过ExecutionEnvironment对象,基于DataSink就可以从后向前遍历,直接链接到DataSource。 在上面的处理过程中,涉及到3个比较重要的内容:一是用

Flink批处理生成JobGraph流程分析

我们先通过一个简单的批量处理WordCount的程序,来了解一下,在编程API层面的基本使用方法,以及Flink是如何基于Pipeline风格的API实现一个完整的应用程序的,还有就是每一步操作的底层,都是如何进行转换的。WordCount程序代码,示例如下所示: final MultipleParameterTool params = MultipleParameterTool.fromArgs(args); final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(params); // get input data DataSet<String> text = env.readTextFile(input); DataSet<Tuple2<String, Integer>> counts = // split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap(new Tokenizer()) // group by the tuple field "0" and sum up tuple field "1" .groupBy(0) .sum(1);

Flink编程API设计分析

使用Flink开发批式或流式Job,除了基本的处理逻辑与实际应用场景相关,我们更关心的是Flink提供的基本框架,是如何在API层面进行统一处理的,或者说尽量使API统一,这样有助于我们对Flink框架更深入地理解。目前使用Flink 1.10版本开发批式和流式Job,在API层面来看,大部分还是比较统一的,但是由于批式和流式场景还是有一定的差异,想要完全统一还是有一定难度。 Flink数据流编程模型的分层设计,如下图所示: 这里,我们关注的是上面的Core APIs层,结合批式和流式Job开发与提交的过程进行分析,对比对于批式和流式场景下,如何生成最终的JobGraph,以及生成的过程有什么不同。 编程API设计 Flink编程API中,我们开发的数据处理Job程序,都是始于一个Source,对应的输入数据源,终于一个Sink,对应输出的存储系统,中间是各种丰富的处理算子。但是对于批式和流式编程API,从代码层面对应的抽象基本上是名称不同,具体逻辑功能比较一致:批式编程API对批式Job DAG中每个节点的抽象使用的是DataSet,而流式编程API中对应的是DataStream。 对于批式Job DAG中,Data