Flink 批处理生成执行计划

我们这里所说的优化执行计划,是指从生成Plan之后,一直到生成JobGraph之前这一期间对Plan进行各种转换、增强、优化处理。从用户编程的Flink批处理程序开始,一直到生成JobGraph的整个过程,这个过程中,会创建很多数据结构来完成执行计划的优化工作,而且,通过分层增强与优化的方式,有利于我们对每层进行了哪些处理有一个很好地理解。
我们从整理流程上来看,都有哪些核心的处理,然后会对每一步进行概要说明,从宏观上理解每一步都做了哪些操作。批处理执行计划生成及优化的主要流程,如下图所示:
Flink-Generating-Execution-Plan
根据上图,我们将Flink批处理执行计划生成的流程,分为如下几个阶段进行说明。

  • 构建用户编程Operator DAG

把用户程序中设置的各种DataSource、Transformation、DataSink及其配置信息进行收集,以最原始的形态保存在ExecutionEnvironment上线文环境对象中,当然也跟踪了从DataSource到DataSink之间的操作顺序,最后通过ExecutionEnvironment对象,基于DataSink就可以从后向前遍历,直接链接到DataSource。
在上面的处理过程中,涉及到3个比较重要的内容:一是用户根据Flink定义的各种Transformation接口实现并传入的处理函数,也包括Flink为了完成处理内部直接创建的函数对象;二是创建位于org.apache.flink.api.java.operators包下面的Operator具体实现类对象;三是根据用户编写的程序配置,比如并行度、Broadcast变量等等,收集这些配置并设置到对应的Operator对象上。

  • 生成通用Operator DAG

根据ExecutionEnvironment中收集的信息,生成程序Plan。Flink批处理在生成程序Plan的过程中,比较容易理解,就是把原生的Operator(与org.apache.flink.api.java.operators包下面的具体实现相对应),转换成语义更加丰富的Operator(与org.apache.flink.api.common.operators包下面的具体实现相对应),也就是通过调用org.apache.flink.api.java.operators.translateToDataflow(),生成org.apache.flink.api.common.operators.Operator。
这一阶段,生成的Operator DAG中的节点,基本上与上一阶段的Operator DAG中的节点一一对应,不过不同的是会使用一些中间结构来辅助表示一些复杂的操作,像groupBy、join、coGroup等操作,最后还是会转换成一个对应的Operator(也就是DataSet)。
举个例子,下面代码片段是一个join操作,需要指定关联条件及关联后取哪些字段作为结果集:

        DataSet<Tuple3<Integer, String, Integer>> joinDocsRanks =
                filterDocs.join(filterRanks)
                            .where(0).equalTo(1)
                            .projectSecond(0, 1, 2);

首先,join()返回一个JoinOperatorSets,它并不是一个Operator,而是一个中间结构。然后,where()和equalTo()确定了进行关联的2个数据集对应的关联字段,其中where()返回JoinOperatorSetsPredicate也是一个中间结构,equalTo()返回的是一个Operator。最后,projectSecond()返回一个Operator。
另外,这一阶段还会根据需要对Operator内部的Function进行处理,可能会生成新的Function,用来在最终物理执行的时候进行调用,处理数据,比如sum()操作,在translateToDataflow()中会创建一个内部的AggregatingUdf函数,实现求和的操作。

  • 优化器级别DAG表示

得到Plan以后,使用OptimizerNode来对Plan进一步编译。这一阶段,主要通过GraphCreatingVisitor构建基本OptimizerNode DAG,在创建对应的OptimizerNode的同时,还对各个OptimizerNode之间的连接关系通过抽象的DagConnection来表示。另外,这些创建的DagConnection是什么类型的,也会确定下来,主要包括InComing连接、OutGoing连接。
而对于Broadcast DataSet,同样也会建立与Operator之间的对应连接关系,也通过DagConnection来类似去表示。与其他Operator之间连接不同的是,这种连接会设置OptimizerNode对应Operator的输出数据的传输策略,值为ShipStrategyType.BROADCAST类型,而其他连接类型包括ShipStrategyType.FORWARD、ShipStrategyType.PARTITION_HASH、ShipStrategyType..PARTITION_RANDOM等等。
构建好OptimizerNode DAG以后,再经过IdAndEstimatesVisitor、UnionParallelismAndForwardEnforcer、BranchesVisitor、InterestingPropertyVisitor来进行增强,具体每一步都增强了哪些内容,可以从各个Visitor实现类的名称大概有所了解。

  • 生成最佳执行计划

经过上一步做的比较重要之一的处理,是对每个Operator的数据输出数量(记录数、数据大小)通过Optimizer进行了估算。这一阶段会根据这些信息来生成一个最佳执行计划,这个过程是通过递归方式来实现的。在生成最佳执行计划过程中,还会对生成的DAG执行剪枝优化操作,以使最终生成PlanNode DAG执行的代价最小。
生成最佳执行计划以后,这时就可以创建生成OptimizedPlan了。最后,OptimizedPlan经过BinaryUnionReplacer、RangePartitionRewriter、JavaApiPostPass这几步增强处理后,就可以通过调用JobGraphGenerator.compileJobGraph()来生成JobGraph。

下面,我们通过一个简单的WordCount批处理程序,来展示在生成和优化执行计划过程中,各个阶段对应的DAG图的节点类型。示例代码如下所示:

        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataSet<String> text = env.readTextFile(input);
        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                // group by the tuple field "0" and sum up tuple field "1"
                .groupBy(0)
                .sum(1);
        counts.writeAsCsv(params.get("output"), "\n", " ");
        // execute program
        env.execute("WordCount Example");

这个简单的Flink批处理程序,经过上面的翻译优化过程,分别通过下面的线性连接关系来表达,抽象出各层中DAG关系图,如下所示:

readTextFile() → flatMap() → groupBy() → sum() → writeAsCsv()
DataSource → FlatMapOperator → UnsortedGrouping → AggregateOperator→DataSink
GenericDataSourceBase → FlatMapOperatorBase → GroupReduceOperatorBase → GenericDataSinkBase
DataSourceNode → FlatMapNode → GroupReduceNode → DataSinkNode
SourcePlanNode → SingleInputPlanNode → SingleInputPlanNode → SinkPlanNode

上面5个简单的线性DAG,其中,第一行表示用户编程API中调用各个算子所得到的操作DAG,第二行表示通过org.apache.flink.api.java.operators包下面的Operator结构来构建最原始的Operator DAG,第三行表示通过org.apache.flink.api.common.operators包下面的Operator结构来构建Operator DAG,第四行表示通过Optimizer生成的OptimizerNode DAG,第五行表示通过翻译优化得到的最优执行计划PlanNode DAG。

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>