Flink 批处理生成最佳执行计划(上篇)

生成最佳执行计划,过程比较复杂,我们分成两篇来详细分析:上篇和下篇,本文为上篇。 生成最佳执行计划是一个递归计算的过程:正向从 DataSinkNode 开始直到 DataSourceNode,分别计算每个 OptimizerNode 的最佳计划,然后反向逐步将整个 OptimizerNode DAG 图转换为 PlanNode DAG 图,得到一个最优计划。其中,PlanNode 的类继承结构,如下图所示: 通过与 OptimizerNode 对应的节点结构类图对比,PlanNode 更加抽象了一个层次,更关注 Operator 之间的数据交换策略。 其中,生成最佳执行计划的过程,可以在 Optimizer 类中看到,如下代码所示: // the final step is now to generate the actual plan alternatives List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator); 这是一个递归的过程,每个 OptimizerNode 都会通过获取到其孩子节点的最佳执行计划,从而递归地处理,从 OptimizerNode DAG 转换为 PlanNode DAG。后面,我们会对 SourcePlanNode、SinkPlanNode、SingleInputPlanNode、DualInputPlanNode 创建的处理过程

Flink 批处理生成 OptimizerNode DAG 图

OptimizerNode DAG,是基于程序 Plan 中创建好的 Operator DAG,并以 OptimizerNode 作为 DAG 图节点构建完成的。所以,我们先看一下组成 OptimizerNode DAG,都有哪些类型的 OptimizerNode,如下图所示: 通过上面类图,可以看到其中主要有 SingleInputNode、TwoInputNode、DataSourceNode、DataSinkNode 等这几大类,根据各个 OptimizerNode 具体实现类名称,可以对应到具体操作功能的 Operator 实现类。比如,MapNode 对应 map 操作,JoinNode 对应 join 操作,等等。 OptimizerNode 结构 下面看一下 OptimizerNode 的数据结构,了解它都抽象了哪些内容,如下图所示: 上图中的结构是每个 OptimizerNode 都包含的内容,具体到每个 OptimizerNode 实现,都会根据 Operator 的属性及行为,补充一些其他的信息。上面的 cachedPlans 是一个基于 PlanNode 的 DAG 的列表,在生成 OptimizerNode 过程中并没有用到,而是基于 OptimizerNode DAG 生成最佳执行计划的时候,通过 PlanNode DAG 来表示的,后续文章我们会单独详细分析,这里先跳过。 下面将对 OptimizerNod

Flink 批处理生成执行计划

我们这里所说的优化执行计划,是指从生成 Plan 之后,一直到生成 JobGraph 之前这一期间对 Plan 进行各种转换、增强、优化处理。从用户编程的 Flink 批处理程序开始,一直到生成 JobGraph 的整个过程,这个过程中,会创建很多数据结构来完成执行计划的优化工作,而且,通过分层增强与优化的方式,有利于我们对每层进行了哪些处理有一个很好地理解。 我们从整理流程上来看,都有哪些核心的处理,然后会对每一步进行概要说明,从宏观上理解每一步都做了哪些操作。批处理执行计划生成及优化的主要流程,如下图所示: 根据上图,我们将Flink批处理执行计划生成的流程,分为如下几个阶段进行说明。 构建用户编程 Operator DAG 把用户程序中设置的各种 DataSource、Transformation、DataSink 及其配置信息进行收集,以最原始的形态保存在 ExecutionEnvironment 上线文环境对象中,当然也跟踪了从 DataSource 到 DataSink 之间的操作顺序,最后通过 ExecutionEnvironment 对象,基于 DataSink 就可以从后向前遍历,直接链接到 DataSource。 在上面的处理过程中,涉及到3

Flink 批处理生成 JobGraph 流程分析

我们先通过一个简单的批量处理 WordCount 的程序,来了解一下,在编程 API 层面的基本使用方法,以及 Flink 是如何基于 Pipeline 风格的 API 实现一个完整的应用程序的,还有就是每一步操作的底层,都是如何进行转换的。WordCount 程序代码,示例如下所示: final MultipleParameterTool params = MultipleParameterTool.fromArgs(args); final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(params); // get input data DataSet<String> text = env.readTextFile(input); DataSet<Tuple2<String, Integer>> counts = // split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap(new Tokenizer()) // group by the tuple field "0" and sum up tuple field "1" .groupBy(0) .s

Flink 编程 API 设计分析

使用 Flink 开发批式或流式 Job,除了基本的处理逻辑与实际应用场景相关,我们更关心的是 Flink 提供的基本框架,是如何在 API 层面进行统一处理的,或者说尽量使 API 统一,这样有助于我们对 Flink 框架更深入地理解。目前使用 Flink 1.10 版本开发批式和流式 Job,在 API 层面来看,大部分还是比较统一的,但是由于批式和流式场景还是有一定的差异,想要完全统一还是有一定难度。 Flink 数据流编程模型的分层设计,如下图所示: 这里,我们关注的是上面的 Core APIs 层,结合批式和流式 Job 开发与提交的过程进行分析,对比对于批式和流式场景下,如何生成最终的 JobGraph,以及生成的过程有什么不同。 编程 API 设计 Flink 编程 API 中,我们开发的数据处理 Job 程序,都是始于一个 Source,对应的输入数据源,终于一个 Sink,对应输出的存储系统,中间是各种丰富的处理算子。但是对于批式和流式编程 API,从代码层面对应的抽象基本上是名称不同,具体逻辑功能比较一致:批式编程 API 对批式 Job DAG 中每个节点的抽象使用的是 DataSet,而流式编程 API 中对应的