Flink批处理生成最佳执行计划(上篇)

生成最佳执行计划,过程比较复杂,我们分成两篇来详细分析:上篇和下篇,本文为上篇。
生成最佳执行计划是一个递归计算的过程:正向从DataSinkNode开始直到DataSourceNode,分别计算每个OptimizerNode的最佳计划,然后反向逐步将整个OptimizerNode DAG图转换为PlanNode DAG图,得到一个最优计划。其中,PlanNode的类继承结构,如下图所示:
CD.PlanNode
通过与OptimizerNode对应的节点结构类图对比,PlanNode更加抽象了一个层次,更关注Operator之间的数据交换策略。
其中,生成最佳执行计划的过程,可以在Optimizer类中看到,如下代码所示:

// the final step is now to generate the actual plan alternatives
List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);

这是一个递归的过程,每个OptimizerNode都会通过获取到其孩子节点的最佳执行计划,从而递归地处理,从OptimizerNode DAG转换为PlanNode DAG。后面,我们会对SourcePlanNode、SinkPlanNode、SingleInputPlanNode、DualInputPlanNode创建的处理过程进行详细说明。

基本数据结构和定义

首先,我们把与生成最佳执行计划相关的数据结构,以及一些常用的处理逻辑简单介绍一下,方便后续详细说明最佳执行计划的生成过程。

  • PlanNode数据结构

PlanNode的数据结构,如下图所示:
Flink-PlanNode-Structure
(1)对于Channel,对应属性说明如下:
source:在PlanNode DAG图中,当前PlanNode的直接上游PlanNode的引用
target:在PlanNode DAG图中的当前PlanNode的引用
partitioner:根据指定的Key,计算分区,它对应的接口需要实现方法int partition(K key, int numPartitions)
dataDistribution:用来描述数据分布的配置,主要包括数据被分为多少个Bucket、Bucket的Boundary为多少,目前只有在PartitioningProperty.RANGE_PARTITIONED的情况下才会用到该属性配置
tmpMode:在两个Operator连接之间,需要对数据进行临时物化处理的模式,枚举类包含NONE、PIPELINE_BREAKER、CACHED、CACHING_PIPELINE_BREAKER这4个取值
dataExchangeMode:数据交换模式,包含PIPELINED、PIPELINED_FORCED、BATCH、BATCH_FORCED这4种取值
localProps:局部属性,比如单个分区内部的属性配置
globalProps:全局属性,比如跨分区(Partition)的属性配置
(2)对于PlanNode,对应属性说明如下:
template:它是一个OptimizerNode,用来基于它作为模板,创建新的PlanNode
broadcastInputs:用来描述当前PlanNode具有哪些输入的Broadcast DataSet,建立从输入Broadcast DataSet到当前PlanNode对应Operator的连接
outChannels:用来描述当前PlanNode对应Operator,有哪些输出的下游Operator,建立到直接下游Operator之间的连接

  • 资源代价数据结构

在递归调用getAlternativePlans()方法的处理过程中,会使用到一个名称为Costs的类,它内部定义了一个Operator相关的代价属性,如下所示:

    private double networkCost;                // network cost, in transferred bytes
    private double diskCost;        // bytes to be written and read, in bytes
    private double cpuCost;                    // CPU costs
    private double heuristicNetworkCost;
    private double heuristicDiskCost;
    private double heuristicCpuCost;

其中,目前包括网络、I/O、CPU这三种主要的代价计算。在这里,主要是基于OptimizerNode DAG生成过程中对Operator计算所产生的代价进行处理的。

  • 数据交换模式(DataExchangeMode)

首先,我们介绍几个和设置数据交换模式有关的概念:
(1)执行模式(ExecutionMode):主要描述一个批处理程序是Batched执行,还是Pipelined执行,它是从数据交换角度来确定的,使用枚举类ExecutionMode来定义的,目前取值包含PIPELINED、PIPELINED_FORCED、BATCH、BATCH_FORCED。
(2)传输策略(ShipStrategyType):主要描述一个Operator输出结果的传输策略,比如本地FORWARD,HASH重分区、RANGE重分区,使用枚举类ShipStrategyType来定义的,该枚举类的取值包括NONE、FORWARD、PARTITION_RANDOM、PARTITION_HASH、PARTITION_RANGE、PARTITION_FORCED_REBALANCE、BROADCAST、PARTITION_CUSTOM。其中,NONE和FORWARD这两个取值表示不经过网络传输,直接本地传输数据。
(3)中断Pipeline:breakPipeline是一个变量,它标识了数据交换过程中需要中断Pipeline,主要是为了避免潜在的死锁(Deadlock)问题。设置该标志,实际上在数据交换过程中,对Operator之间的数据Connection做了一个解耦,比如不使用Pipeline方式,而是使用Batch方式,同样能够达到数据交换的目的。
设置数据交换模式的值,主要是通过下面的代码来进行处理:

            DataExchangeMode em = DataExchangeMode.select(exchangeMode, shipStrategy, breakPipeline);

如何根据exchangeMode、shipStrategy、breakPipeline来设置DataExchangeMode呢?其实,在DataExchangeMode中定义了4组映射关系,能够基于Operator翻译处理过程中exchangeMode、shipStrategy、breakPipeline的取值不同,根据设置的ExecutionMode的值,直接映射到对应的DataExchangeMode,代码片段如下所示:

    private static final DataExchangeMode[] FORWARD = new DataExchangeMode[ExecutionMode.values().length];
    private static final DataExchangeMode[] SHUFFLE = new DataExchangeMode[ExecutionMode.values().length];
    private static final DataExchangeMode[] BREAKING = new DataExchangeMode[ExecutionMode.values().length];

    // initialize the map between execution modes and exchange modes in
    static {
        FORWARD[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED;
        SHUFFLE[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED;
        BREAKING[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED;

        FORWARD[ExecutionMode.PIPELINED.ordinal()] = PIPELINED;
        SHUFFLE[ExecutionMode.PIPELINED.ordinal()] = PIPELINED;
        BREAKING[ExecutionMode.PIPELINED.ordinal()] = BATCH;

        FORWARD[ExecutionMode.BATCH.ordinal()] = PIPELINED;
        SHUFFLE[ExecutionMode.BATCH.ordinal()] = BATCH;
        BREAKING[ExecutionMode.BATCH.ordinal()] = BATCH;

        FORWARD[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH;
        SHUFFLE[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH;
        BREAKING[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH;
    }

这是一个预先初始化设置的配置选项矩阵,分别对应FORWARD、SHUFFLE、BREAKING三种场景下ExecutionMode与DataExchangeMode的映射关系,为了更加直观的看清上述矩阵,我们画了一个图,如下图所示:
Flink-Batch-Data-Exchange-Mode-Matrix
上图中,三种连接场景与四种执行模式相互组合,非常方便得到对应的数据交换模式(DataExchangeMode)。根据不同情况能够直接获取对应DataExchangeMode的值,例如,Flink中计算DataExchangeMode值的代码片段,如下所示:

        if (breakPipeline) {
            return getPipelineBreakingExchange(executionMode);
        }
        else if (shipStrategy == ShipStrategyType.FORWARD) {
            return getForForwardExchange(executionMode);
        }
        else {
            return getForShuffleOrBroadcast(executionMode);
        }

创建SourcePlanNode

因为构建OptimizerNode DAG的过程中,都是采用从后向前的方式对Operator进行遍历,从而生成了对应的计算DAG图表示。所以,生成执行计划过程中,也是首先会从创建SourcePlanNode后开始返回,对应的代码,如下所示:

        SourcePlanNode candidate = new SourcePlanNode(this, "DataSource ("+this.getOperator().getName()+")",
                this.gprops, this.lprops);

        if(!replicatedInput) {
            candidate.updatePropertiesWithUniqueSets(getUniqueFields());
            final Costs costs = new Costs();
            if (FileInputFormat.class.isAssignableFrom(getOperator().getFormatWrapper().getUserCodeClass()) &&
                    this.estimatedOutputSize >= 0) {
                estimator.addFileInputCost(this.estimatedOutputSize, costs);
            }
            candidate.setCosts(costs);
        } else {
            // replicated input
            final Costs costs = new Costs();
            InputFormat<?,?> inputFormat =
                    ((ReplicatingInputFormat<?,?>) getOperator().getFormatWrapper().getUserCodeObject()).getReplicatedInputFormat();
            if (FileInputFormat.class.isAssignableFrom(inputFormat.getClass()) &&
                    this.estimatedOutputSize >= 0) {
                estimator.addFileInputCost(this.estimatedOutputSize * this.getParallelism(), costs);
            }
            candidate.setCosts(costs);
        }

上面代码,首先,创建了一个SourcePlanNode;然后,针对Source是否是replicatedInput,取值不同与对应下游Operator的数据交换代价是不同的:ReplicatedInput是从Source开始,会有多个并行的Operator同时以该ReplicatedInput DataSet作为输入,那么对应的代价估计就需要乘以并行度(this.estimatedOutputSize * this.getParallelism());最后,返回了一个包含该SourcePlanNode的List。

创建SingleInputPlanNode

如果一个OptimizerNode是SingleInputNode类型,它主要包括MapNode、FilterNode、FlatMapNode、ReduceNode、GroupCombineNode、GroupReduceNode、MapPartitionNode、PartitionNode、SortPartitionNode这几种具体实现,那么都会创建一个SingleInputPlanNode(它是一个PlanNode)来进行抽象表示。
因为每个SingleInputNode对应的Operator都允许设置Broadcast输入DataSet,所以,对于Broadcast DataSet会做如下处理:

        // calculate alternative sub-plans for broadcast inputs
        final List<Set<? extends NamedChannel>> broadcastPlanChannels = new ArrayList<Set<? extends NamedChannel>>();
        List<DagConnection> broadcastConnections = getBroadcastConnections();
        List<String> broadcastConnectionNames = getBroadcastConnectionNames();

        for (int i = 0; i < broadcastConnections.size(); i++ ) {
            DagConnection broadcastConnection = broadcastConnections.get(i);
            String broadcastConnectionName = broadcastConnectionNames.get(i);
            List<PlanNode> broadcastPlanCandidates = broadcastConnection.getSource().getAlternativePlans(estimator);

            // wrap the plan candidates in named channels
            HashSet<NamedChannel> broadcastChannels = new HashSet<NamedChannel>(broadcastPlanCandidates.size());
            for (PlanNode plan: broadcastPlanCandidates) {
                NamedChannel c = new NamedChannel(broadcastConnectionName, plan);
                DataExchangeMode exMode = DataExchangeMode.select(broadcastConnection.getDataExchangeMode(),
                                        ShipStrategyType.BROADCAST, broadcastConnection.isBreakingPipeline());
                c.setShipStrategy(ShipStrategyType.BROADCAST, exMode);
                broadcastChannels.add(c);
            }
            broadcastPlanChannels.add(broadcastChannels);
        }

上面代码中,最核心的数据结构就是NamedChannel,它是设置名称的Channel,正好能够将用户编程设置的Broadcast DataSet与对应的字符串名称对应起来。NamedChannel中,name是Broadcast DataSet的字符串名称,value是一个Broadcast DataSet对应的PlanNode对象,同时还包含了对应的数据传输策略ShipStrategyType等其他属性。
继续看代码,需要对递归调用getAlternativePlans()返回的PlanNode的List进行处理,分别对每个PlanNode进行处理,代码如下所示:

        // create all candidates
        for (PlanNode child : subPlans) {
            if (child.getGlobalProperties().isFullyReplicated()) {
                // fully replicated input is always locally forwarded if the parallelism is not changed
                if (parallelismChange) {
                    // can not continue with this child
                    childrenSkippedDueToReplicatedInput = true;
                    continue;
                } else {
                    this.inConn.setShipStrategy(ShipStrategyType.FORWARD);
                }
            }

            if (this.inConn.getShipStrategy() == null) {
                // pick the strategy ourselves
                for (RequestedGlobalProperties igps: intGlobal) {
                    final Channel c = new Channel(child, this.inConn.getMaterializationMode());
                    igps.parameterizeChannel(c, parallelismChange, executionMode, breaksPipeline);
                    
                    // if the parallelism changed, make sure that we cancel out properties, unless the
                    // ship strategy preserves/establishes them even under changing parallelisms
                    if (parallelismChange && !c.getShipStrategy().isNetworkStrategy()) {
                        c.getGlobalProperties().reset();
                    }
                    
                    // check whether we meet any of the accepted properties
                    // we may remove this check, when we do a check to not inherit
                    // requested global properties that are incompatible with all possible
                    // requested properties
                    for (RequestedGlobalProperties rgps: allValidGlobals) {
                        if (rgps.isMetBy(c.getGlobalProperties())) {
                            c.setRequiredGlobalProps(rgps);
                            addLocalCandidates(c, broadcastPlanChannels, igps, outputPlans, estimator);
                            break;
                        }
                    }
                }
            } else {
                // hint fixed the strategy
                final Channel c = new Channel(child, this.inConn.getMaterializationMode());
                final ShipStrategyType shipStrategy = this.inConn.getShipStrategy();
                final DataExchangeMode exMode = DataExchangeMode.select(executionMode, shipStrategy, breaksPipeline);

                if (this.keys != null) {
                    c.setShipStrategy(shipStrategy, this.keys.toFieldList(), exMode);
                } else {
                    c.setShipStrategy(shipStrategy, exMode);
                }
                
                if (parallelismChange) {
                    c.adjustGlobalPropertiesForFullParallelismChange();
                }

                // check whether we meet any of the accepted properties
                for (RequestedGlobalProperties rgps: allValidGlobals) {
                    if (rgps.isMetBy(c.getGlobalProperties())) {
                        addLocalCandidates(c, broadcastPlanChannels, rgps, outputPlans, estimator);
                        break;
                    }
                }
            }
        }

如果输入是一个ReplicatedInput,表示当前这个PlanNode是ReplicatedInput下游多个并行Operator中的一个,这种情况下并行度和非ReplicatedInput情况的上游Operator是不相同的,否则直接使用本地的ShipStrategyType.FORWARD数据传输策略即可。
如果当前SingleInputNode对应的输入DagConnection没有设置ShipStrategyType,则会读取它所持有的RequestedGlobalProperties配置信息(如果设置了ShipStrategyType,处理逻辑基本相差不多,后面不再累述):

        final Set<RequestedGlobalProperties> intGlobal = this.inConn.getInterestingProperties().getGlobalProperties();

同时,还会创建Channel对象,上面igps.parameterizeChannel(c, parallelismChange, executionMode, breaksPipeline)主要是为Channel设置对应的属性,从而使Channel能够得到它需要的GlobalProperties设置,主要有如下几种情况:
(1)如果Channel对应的Source是ReplicatedInput,则对应的分区属性必须是PartitioningProperty.FULL_REPLICATION和PartitioningProperty.ANY_DISTRIBUTION
(2)如果PartitioningProperty设置为PartitioningProperty.RANDOM_PARTITIONED(或没有设置PartitioningProperty的值),或者值为PartitioningProperty.ANY_DISTRIBUTION,则ShipStrategyType的值需要根据是否对上游(发送数据)和下游(接收数据)的并行度进行了修改来设置:若并行度未变,则为ShipStrategyType.FORWARD,否则为ShipStrategyType.PARTITION_RANDOM
(3)如果并行度没有修改,并且检查GlobalProperties的内容是否满足要求,满足的话设置为ShipStrategyType.FORWARD
(4)除了上面的情况,其他都会根据PartitioningProperty的枚举值来设置ShipStrategyType,主要包括:

PartitioningProperty.FULL_REPLICATION → ShipStrategyType.BROADCAST
PartitioningProperty.ANY_PARTITIONING || PartitioningProperty.HASH_PARTITIONED → ShipStrategyType.PARTITION_HASH
PartitioningProperty.RANGE_PARTITIONED → ShipStrategyType.PARTITION_RANGE
PartitioningProperty.FORCED_REBALANCED → ShipStrategyType.PARTITION_FORCED_REBALANCE
PartitioningProperty.CUSTOM_PARTITIONING → ShipStrategyType.PARTITION_CUSTOM

通过上面处理,已经对Channel进行了正确的配置,那么最后还需要检查Channel设置的GlobalProperties与RequestedGlobalProperties的兼容性,代码片段如下所示:

                    for (RequestedGlobalProperties rgps: allValidGlobals) {
                        if (rgps.isMetBy(c.getGlobalProperties())) {
                            c.setRequiredGlobalProps(rgps);
                            addLocalCandidates(c, broadcastPlanChannels, igps, outputPlans, estimator);
                            break;
                        }
                    }

如果rgps与c.getGlobalProperties()兼容,则上面addLocalCandidates()方法会处理RequestedLocalProperties,这里会创建(基于前面的Channel对象克隆)一个新的Channel对象,来对其进行LocalProperties的设置。其中,主要是对枚举类型LocalStrategy的变量进行设置,主要包括3个取值:NONE(没有设置任何Local Strategy)、SORT(输入是经过排序的)、COMBININGSORT(输入是经过排序的,并且排序过程中应用了Combiner)。
经过前面的处理,已经得到了一个PlanNode的列表,即对应outputPlans列表。最后会对outputPlans中每个PlanNode对应的Operator进行代价计算,对得到的执行计划进行剪枝处理,代码如下所示:

        // cost and prune the plans
        for (PlanNode node : outputPlans) {
            estimator.costOperator(node);
        }
        prunePlanAlternatives(outputPlans);
        outputPlans.trimToSize();


        this.cachedPlans = outputPlans;
        return outputPlans;

上面代码中代价计算与剪枝处理,我们不在这里展开,后续单独进行详细分析。这样,处理到当前SingleInputNode时,我们得到了需要的执行计划outputPlans列表。

创建SinkPlanNode

遍历整个OptimizerNode DAG过程中,最后返回处理的最后一个OptimizerNode应该是DataSinkNode,它也是实现了getAlternativePlans()方法来最终创建生成SinkPlanNode,对应代码如下所示:

        // calculate alternative sub-plans for predecessor
        List<? extends PlanNode> subPlans = getPredecessorNode().getAlternativePlans(estimator);
        List<PlanNode> outputPlans = new ArrayList<PlanNode>();

        … …

        InterestingProperties ips = this.input.getInterestingProperties();
        for (PlanNode p : subPlans) {
            for (RequestedGlobalProperties gp : ips.getGlobalProperties()) {
                for (RequestedLocalProperties lp : ips.getLocalProperties()) {
                    Channel c = new Channel(p);
                    gp.parameterizeChannel(c, dopChange, executionMode, breakPipeline);
                    lp.parameterizeChannel(c);
                    c.setRequiredLocalProps(lp);
                    c.setRequiredGlobalProps(gp);
                    outputPlans.add(new SinkPlanNode(this, "DataSink ("+this.getOperator().getName()+")" ,c));
                }
            }
        }
        
        // cost and prune the plans
        for (PlanNode node : outputPlans) {
            estimator.costOperator(node);
        }
        prunePlanAlternatives(outputPlans);

通过上述代码可以看出,也对outputPlans中每一个PlanNode对应的Operator进行了代价计算,以及对生成的执行计划进行剪枝处理,最后得到了最终的执行计划outputPlans。

创建DualInputPlanNode

对TwoInputNode进行处理,和SingleInputNode的大体逻辑类似,只是TwoInputNode具有两个DagConnection来连接输入的两个OptimizerNode,处理起来比SingleInputNode更为复杂一些,最终也会包括后面的代价计算和剪枝处理,这里不做过多累述。

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>