Flink 批处理生成执行计划

我们这里所说的优化执行计划,是指从生成 Plan 之后,一直到生成 JobGraph 之前这一期间对 Plan 进行各种转换、增强、优化处理。从用户编程的 Flink 批处理程序开始,一直到生成 JobGraph 的整个过程,这个过程中,会创建很多数据结构来完成执行计划的优化工作,而且,通过分层增强与优化的方式,有利于我们对每层进行了哪些处理有一个很好地理解。
我们从整理流程上来看,都有哪些核心的处理,然后会对每一步进行概要说明,从宏观上理解每一步都做了哪些操作。批处理执行计划生成及优化的主要流程,如下图所示:
Flink-Generating-Execution-Plan
根据上图,我们将Flink批处理执行计划生成的流程,分为如下几个阶段进行说明。

  • 构建用户编程 Operator DAG

把用户程序中设置的各种 DataSource、Transformation、DataSink 及其配置信息进行收集,以最原始的形态保存在 ExecutionEnvironment 上线文环境对象中,当然也跟踪了从 DataSource 到 DataSink 之间的操作顺序,最后通过 ExecutionEnvironment 对象,基于 DataSink 就可以从后向前遍历,直接链接到 DataSource。
在上面的处理过程中,涉及到3个比较重要的内容:一是用户根据 Flink 定义的各种 Transformation 接口实现并传入的处理函数,也包括 Flink 为了完成处理内部直接创建的函数对象;二是创建位于 org.apache.flink.api.java.operators 包下面的 Operator 具体实现类对象;三是根据用户编写的程序配置,比如并行度、Broadcast 变量等等,收集这些配置并设置到对应的 Operator 对象上。

  • 生成通用 Operator DAG

根据 ExecutionEnvironment 中收集的信息,生成程序 Plan。Flink 批处理在生成程序 Plan 的过程中,比较容易理解,就是把原生的 Operator(与org.apache.flink.api.java.operators包下面的具体实现相对应),转换成语义更加丰富的 Operator(与 org.apache.flink.api.common.operators包下面的具体实现相对应),也就是通过调用 org.apache.flink.api.java.operators.translateToDataflow(),生成 org.apache.flink.api.common.operators.Operator。
这一阶段,生成的 Operator DAG 中的节点,基本上与上一阶段的 Operator DAG 中的节点一一对应,不过不同的是会使用一些中间结构来辅助表示一些复杂的操作,像 groupBy、join、coGroup 等操作,最后还是会转换成一个对应的 Operator(也就是 DataSet)。
举个例子,下面代码片段是一个join操作,需要指定关联条件及关联后取哪些字段作为结果集:

        DataSet<Tuple3<Integer, String, Integer>> joinDocsRanks =
                filterDocs.join(filterRanks)
                            .where(0).equalTo(1)
                            .projectSecond(0, 1, 2);

首先,join()返回一个 JoinOperatorSets,它并不是一个 Operator,而是一个中间结构。然后,where() 和 equalTo() 确定了进行关联的 2 个数据集对应的关联字段,其中 where() 返回 JoinOperatorSetsPredicate 也是一个中间结构,equalTo() 返回的是一个 Operator。最后,projectSecond() 返回一个 Operator。
另外,这一阶段还会根据需要对 Operator 内部的 Function 进行处理,可能会生成新的 Function,用来在最终物理执行的时候进行调用,处理数据,比如 sum() 操作,在 translateToDataflow() 中会创建一个内部的 AggregatingUdf 函数,实现求和的操作。

  • 优化器级别 DAG 表示

得到 Plan 以后,使用 OptimizerNode 来对 Plan 进一步编译。这一阶段,主要通过 GraphCreatingVisitor 构建基本 OptimizerNode DAG,在创建对应的 OptimizerNode 的同时,还对各个 OptimizerNode 之间的连接关系通过抽象的 DagConnection 来表示。另外,这些创建的DagConnection是什么类型的,也会确定下来,主要包括InComing连接、OutGoing连接。
而对于 Broadcast DataSet,同样也会建立与 Operator 之间的对应连接关系,也通过 DagConnection 来类似去表示。与其他 Operator 之间连接不同的是,这种连接会设置 OptimizerNode 对应 Operator 的输出数据的传输策略,值为 ShipStrategyType.BROADCAST 类型,而其他连接类型包括 ShipStrategyType.FORWARD、ShipStrategyType.PARTITION_HASH、ShipStrategyType.PARTITION_RANDOM 等等。
构建好 OptimizerNode DAG 以后,再经过 IdAndEstimatesVisitor、UnionParallelismAndForwardEnforcer、BranchesVisitor、InterestingPropertyVisitor 来进行增强,具体每一步都增强了哪些内容,可以从各个 Visitor 实现类的名称大概有所了解。

  • 生成最佳执行计划

经过上一步做的比较重要之一的处理,是对每个 Operator 的数据输出数量(记录数、数据大小)通过 Optimizer 进行了估算。这一阶段会根据这些信息来生成一个最佳执行计划,这个过程是通过递归方式来实现的。在生成最佳执行计划过程中,还会对生成的 DAG 执行剪枝优化操作,以使最终生成 PlanNode DAG 执行的代价最小。
生成最佳执行计划以后,这时就可以创建生成 OptimizedPlan 了。最后,OptimizedPlan 经过 BinaryUnionReplacer、RangePartitionRewriter、JavaApiPostPass 这几步增强处理后,就可以通过调用 JobGraphGenerator.compileJobGraph() 来生成 JobGraph。

下面,我们通过一个简单的 WordCount 批处理程序,来展示在生成和优化执行计划过程中,各个阶段对应的 DAG 图的节点类型。示例代码如下所示:

        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataSet<String> text = env.readTextFile(input);
        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                // group by the tuple field "0" and sum up tuple field "1"
                .groupBy(0)
                .sum(1);
        counts.writeAsCsv(params.get("output"), "\n", " ");
        // execute program
        env.execute("WordCount Example");

这个简单的 Flink 批处理程序,经过上面的翻译优化过程,分别通过下面的线性连接关系来表达,抽象出各层中 DAG 关系图,如下所示:

readTextFile() → flatMap() → groupBy() → sum() → writeAsCsv()
DataSource → FlatMapOperator → UnsortedGrouping → AggregateOperator→DataSink
GenericDataSourceBase → FlatMapOperatorBase → GroupReduceOperatorBase → GenericDataSinkBase
DataSourceNode → FlatMapNode → GroupReduceNode → DataSinkNode
SourcePlanNode → SingleInputPlanNode → SingleInputPlanNode → SinkPlanNode

上面5个简单的线性 DAG,其中,第一行表示用户编程 API 中调用各个算子所得到的操作 DAG,第二行表示通过 org.apache.flink.api.java.operators 包下面的 Operator 结构来构建最原始的 Operator DAG,第三行表示通过 org.apache.flink.api.common.operators 包下面的 Operator 结构来构建 Operator DAG,第四行表示通过 Optimizer 生成的 OptimizerNode DAG,第五行表示通过翻译优化得到的最优执行计划 PlanNode DAG。

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>